Limit Risk Rule Score Stacking¶
These will help reduce the maximum amount of risk which can be added from noisy Risk Rules.
Navigation¶
There are two methods for limiting score stacking
- | Skill Level | Pros | Cons |
---|---|---|---|
Method I | Beginner | Easy to get started with | Less context around what was capped and why |
Method II | Intermediate | More precise deduplication and additional information | Additional understanding of SPL |
Method I¶
This caps the risk score contribution of a single source by 3x the highest score from that source.
Note
You may want to limit this to particular sources, but this is extra handy for noisy sources like EDR, DLP, or IDS.
Thanks David Dorsey!
Method II¶
This option adds some complexity, however, provides more information and better deduplication. The full write-up of how to accomplish this method can be found on gabs website.
*reference: https://www.gabrielvasseur.com/post/rba-a-better-way-to-dedup-risk-events
Final SPL from blog post with re-addition of threat object
| tstats summariesonly=t count latest(_time) as _time
values(All_Risk.annotations.mitre_attack.mitre_tactic_id) as mitre_tactic_id
values(All_Risk.annotations.mitre_attack.mitre_technique_id) as mitre_technique_id
from datamodel=Risk
by All_Risk.normalized_risk_object All_Risk.risk_object_type All_Risk.calculated_risk_score source All_Risk.risk_message All_Risk.threat_object All_Risk.threat_object_type
``` Initial query -- notice we split BY risk_score, risk_message, source, and threat object so we can get find every unique combination ```
| rename All_Risk.* as *
| rename normalized_risk_object AS risk_object
| eval risk_score=round( calculated_risk_score, 0 )
``` Get breakdown per risk_message ```
| eventstats sum(count) as count_msg
by risk_object risk_object_type risk_score source risk_message
| eventstats values(eval(count_msg."*".risk_score)) as breakdown_msg
by risk_object risk_object_type source risk_message
``` Get breakdown per source ```
| eventstats sum(count) as count_src
by risk_object risk_object_type risk_score source
| eventstats values(eval(count_src."*".risk_score)) as breakdown_src
by risk_object risk_object_type source
``` Connect threat objects with their type so they can be retained ```
| eval threat_object_combo = threat_object_type.": ".threat_object
```Reduce to unique risk_message and retain highest score ```
| stats sum(count) as risk_event_count values(breakdown_src) as breakdown_src values(breakdown_msg) as breakdown_msg sum(eval(risk_score*count)) as total_score max(risk_score) as max_score latest(_time) as _time values(mitre_*) as mitre_*
by risk_object risk_object_type source risk_message threat_object_combo
``` START limit to a maximum of 10 contributions per source ```
| eval risk_message= mvjoin(breakdown_msg,"+")."=".max_score
. if( total_score!=max_score, " (!" . total_score . ")", "") . " " .risk_message
``` Only the lowest scores will be dedup'd ```
| sort 0 risk_object risk_object_type source - max_score
| eventstats dc(risk_message) as dc_msg_per_source by risk_object risk_object_type source
| streamstats count as rank_per_source by risk_object risk_object_type source
| eval risk_message=case(
rank_per_source <= 10, risk_message,
rank_per_source = 11, "...+" . ( dc_msg_per_source - 10 ) . " others from '" . source . "'..." ,
1==1, null() )
| eval max_score=if( rank_per_source <= 10, max_score, 0 )
``` END limit to a maximum of 10 contributions per source ```
``` Reduce to unique source ```
| stats sum(risk_event_count) as risk_event_count values(breakdown_src) as breakdown_src values(threat_object_combo) as threat_object_combo list(risk_message) as risk_message sum(max_score) as risk_score sum(total_score) as risk_score_nodedup latest(_time) as _time values(mitre_*) as mitre_*
by risk_object risk_object_type source
``` create field we can split for a summary field as well as separate individual threat objects back to expected fields ```
| eval threat_object_single = mvjoin(threat_object_combo,"|||")
| eval threat_object_combo = source."----- ".mvjoin(threat_object_combo,"----- ")
| eval breakdown_src = mvjoin(breakdown_src,"+") ."=".risk_score
. if( risk_score!=risk_score_nodedup, " (!" . risk_score_nodedup . ")", "" ) . " ".source
``` Reduce to unique object ```
| stats sum(risk_event_count) as risk_event_count list(source) as source dc(source) as source_count list(breakdown_src) as srcs values(threat_object_combo) as threat_object_combo values(threat_object_single) as threat_object_single list(risk_message) as risk_message sum(risk_score) as risk_score sum(risk_score_nodedup) as risk_score_nodedup latest(_time) as _time values(mitre_*) as mitre_* dc( mitre_tactic_id) as mitre_tactic_id_count dc(mitre_technique_id) as mitre_technique_id_count
by risk_object risk_object_type
| eval normalized_risk_object = risk_object , threat_object_combo = split(mvjoin(threat_object_combo,"---"),"---") , threat_object_single = split(mvjoin(threat_object_single,"|||"),"|||")
Authors