Skip to content

Limit Risk Rule Score Stacking

These will help reduce the maximum amount of risk which can be added from noisy Risk Rules.

There are two methods for limiting score stacking

- Skill Level Pros Cons
Method I Beginner Easy to get started with Less context around what was capped and why
Method II Intermediate More precise deduplication and additional information Additional understanding of SPL

Method I

This caps the risk score contribution of a single source by 3x the highest score from that source.

1
2
3
4
5
6
7
8
9
| tstats summariesonly=true sum(All_Risk.calculated_risk_score) as summed_risk_score max(All_Risk.calculated_risk_score) as single_risk_score dc(source) as source_count count
 FROM datamodel=Risk.All_Risk
 WHERE All_Risk.risk_object_type="*" (All_Risk.risk_object="*" OR risk_object="*")
 BY All_Risk.risk_object All_Risk.risk_object_type source
| eval capped_risk_score=if(summed_risk_score < single_risk_score*3, summed_risk_score, single_risk_score*3)
| stats sum(capped_risk_score) as capped_risk_score sum(summed_risk_score) as summed_risk_score dc(source) as source sum(count) as count
 BY All_Risk.risk_object All_Risk.risk_object_type
| sort 1000 - risk_score
...

Note

You may want to limit this to particular sources, but this is extra handy for noisy sources like EDR, DLP, or IDS.

Thanks David Dorsey!

Method II

This option adds some complexity, however, provides more information and better deduplication. The full write-up of how to accomplish this method can be found on gabs website.

Visit Website

Deduplicate Notable Events *reference: https://www.gabrielvasseur.com/post/rba-a-better-way-to-dedup-risk-events

Final SPL from blog post with re-addition of threat object
| tstats summariesonly=t count latest(_time) as _time
    values(All_Risk.annotations.mitre_attack.mitre_tactic_id) as mitre_tactic_id
    values(All_Risk.annotations.mitre_attack.mitre_technique_id) as mitre_technique_id
    from datamodel=Risk 
    by All_Risk.normalized_risk_object  All_Risk.risk_object_type  All_Risk.calculated_risk_score source All_Risk.risk_message All_Risk.threat_object All_Risk.threat_object_type
``` Initial query -- notice we split BY risk_score, risk_message, source, and threat object so we can get find every unique combination ```    
| rename All_Risk.* as *
| rename normalized_risk_object AS risk_object
| eval risk_score=round( calculated_risk_score, 0 )
``` Get breakdown per risk_message ``` 
| eventstats sum(count) as count_msg
    by risk_object risk_object_type risk_score source risk_message 
| eventstats values(eval(count_msg."*".risk_score)) as breakdown_msg
    by risk_object risk_object_type source risk_message
``` Get breakdown per source ```
| eventstats sum(count) as count_src
    by risk_object risk_object_type risk_score source 
| eventstats values(eval(count_src."*".risk_score)) as breakdown_src
    by risk_object risk_object_type source
``` Connect threat objects with their type so they can be retained ```
| eval threat_object_combo = threat_object_type.": ".threat_object
```Reduce to unique risk_message and retain highest score ```
| stats sum(count) as risk_event_count values(breakdown_src) as breakdown_src values(breakdown_msg) as breakdown_msg sum(eval(risk_score*count)) as total_score max(risk_score) as max_score latest(_time) as _time values(mitre_*) as mitre_*
    by risk_object risk_object_type source risk_message threat_object_combo
``` START limit to a maximum of 10 contributions per source ```
| eval risk_message= mvjoin(breakdown_msg,"+")."=".max_score
    . if( total_score!=max_score, " (!" . total_score . ")", "") . " " .risk_message
``` Only the lowest scores will be dedup'd ```
| sort 0 risk_object risk_object_type source - max_score 
| eventstats dc(risk_message) as dc_msg_per_source by risk_object risk_object_type source 
| streamstats count as rank_per_source by risk_object risk_object_type source 
| eval risk_message=case( 
    rank_per_source <= 10, risk_message,
    rank_per_source = 11, "...+" . ( dc_msg_per_source - 10 ) . " others from '" . source . "'..." ,
    1==1, null() ) 
| eval max_score=if( rank_per_source <= 10, max_score, 0 )
``` END limit to a maximum of 10 contributions per source ```
``` Reduce to unique source ```
| stats sum(risk_event_count) as risk_event_count values(breakdown_src) as breakdown_src values(threat_object_combo) as threat_object_combo list(risk_message) as risk_message sum(max_score) as risk_score sum(total_score) as risk_score_nodedup latest(_time) as _time values(mitre_*) as mitre_*
    by risk_object risk_object_type source 
``` create field we can split for a summary field as well as separate individual threat objects back to expected fields ```
| eval threat_object_single = mvjoin(threat_object_combo,"|||")
| eval threat_object_combo = source."----- ".mvjoin(threat_object_combo,"----- ")
| eval breakdown_src = mvjoin(breakdown_src,"+") ."=".risk_score
    . if( risk_score!=risk_score_nodedup, " (!" . risk_score_nodedup . ")", "" ) . " ".source
``` Reduce to unique object ```
| stats sum(risk_event_count) as risk_event_count list(source) as source dc(source) as source_count list(breakdown_src) as srcs values(threat_object_combo) as threat_object_combo values(threat_object_single) as threat_object_single list(risk_message) as risk_message sum(risk_score) as risk_score sum(risk_score_nodedup) as risk_score_nodedup latest(_time) as _time values(mitre_*) as mitre_* dc( mitre_tactic_id) as mitre_tactic_id_count dc(mitre_technique_id) as mitre_technique_id_count
    by risk_object risk_object_type
| eval normalized_risk_object = risk_object , threat_object_combo = split(mvjoin(threat_object_combo,"---"),"---") , threat_object_single = split(mvjoin(threat_object_single,"|||"),"|||")

Authors

@7thdrxn - Haylee Mills
@gabs - Gabriel Vasseur