DataGenerator

PytestSplunkAddonDataParser

This class parses the pytest-splunk-addon-data.conf file.

Parameters:

* addon_path (str): Path to the Splunk App. Required.
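A minimal usage sketch (the addon_path and config_path values below are placeholders; the module path matches the source location shown below):

from pytest_splunk_addon.sample_generation.pytest_splunk_addon_data_parser import (
    PytestSplunkAddonDataParser,
)

# Both paths are illustrative; config_path is the directory that holds
# pytest-splunk-addon-data.conf (and, typically, the samples folder).
parser = PytestSplunkAddonDataParser(addon_path="package", config_path="tests/data")
for stanza in parser.get_sample_stanzas():
    print(stanza.sample_name, stanza.input_type)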
Source code in pytest_splunk_addon/sample_generation/pytest_splunk_addon_data_parser.py
class PytestSplunkAddonDataParser:
    """
    This class parses pytest-splunk-addon-data.conf file.

    Args:
        addon_path: Path to the Splunk App
    """

    conf_name = " "

    def __init__(self, addon_path: str, config_path: str):
        self._conf_parser = conf_parser.TABConfigParser()
        self.config_path = config_path
        self._psa_data = None
        self.addon_path = addon_path
        self.match_stanzas = set()
        self._path_to_samples = self._get_path_to_samples()

    def _get_path_to_samples(self):
        if os.path.exists(os.path.join(self.config_path, "samples")):
            LOGGER.info(
                "Samples path is: {}".format(os.path.join(self.config_path, "samples"))
            )
            return os.path.join(self.config_path, "samples")
        elif os.path.exists(
            os.path.join(
                os.path.abspath(os.path.join(self.config_path, os.pardir)), "samples"
            )
        ):
            LOGGER.info(
                "Samples path is: {}".format(
                    os.path.join(
                        os.path.abspath(os.path.join(self.config_path, os.pardir)),
                        "samples",
                    )
                )
            )
            return os.path.join(
                os.path.abspath(os.path.join(self.config_path, os.pardir)), "samples"
            )
        else:
            LOGGER.info(
                "Samples path is: {}".format(os.path.join(self.addon_path, "samples"))
            )
            return os.path.join(self.addon_path, "samples")

    @property
    def psa_data(self):
        psa_data_path = os.path.join(self.config_path, PSA_DATA_CONFIG_FILE)
        if os.path.exists(psa_data_path):
            self._conf_parser.read(psa_data_path)
            self.conf_name = "psa-data-gen"
            self._psa_data = self._conf_parser.item_dict()
            return self._psa_data
        else:
            LOGGER.warning(f"{PSA_DATA_CONFIG_FILE} not found")
            raise FileNotFoundError(f"{PSA_DATA_CONFIG_FILE} not found")

    def get_sample_stanzas(self):
        """
        Converts a stanza in pytest-splunk-addon-data.conf to an object of SampleStanza.

        Returns:
            List of SampleStanza objects.
        """
        _psa_data = self._get_psa_data_stanzas()
        self._check_samples()
        results = []
        for sample_name, stanza_params in sorted(_psa_data.items()):
            sample_path = os.path.join(self._path_to_samples, sample_name)
            results.append(SampleStanza(sample_path, stanza_params))
        return results

    def _get_psa_data_stanzas(self):
        """
        Parses the pytest-splunk-addon-data.conf file and converts it into a dictionary.

        Format::

            {
                "sample_file_name": # Not Stanza name
                {
                    "input_type": "str",
                    "tokens":
                    {
                        1:
                        {
                            token: #One#
                            replacementType: random
                            replacement: static
                        }
                    }
                }
            }

        Return:
            Dictionary representing pytest-splunk-addon-data.conf in the above format.
        """
        psa_data_dict = {}
        schema = XMLSchema(SCHEMA_PATH)
        if os.path.exists(self._path_to_samples):
            for sample_file in os.listdir(self._path_to_samples):
                for stanza, fields in sorted(self.psa_data.items()):
                    stanza_match_obj = re.search(stanza, sample_file)
                    if stanza_match_obj and stanza_match_obj.group(0) == sample_file:
                        self.match_stanzas.add(stanza)
                        if (
                            "requirement_test_sample" in self.psa_data[stanza].keys()
                            and int(self.psa_data[stanza]["requirement_test_sample"])
                            > 0
                        ):
                            filename = os.path.join(self._path_to_samples, sample_file)
                            schema.validate(filename)
                            test_unicode_char(filename)
                        psa_data_dict.setdefault(sample_file, {"tokens": {}})
                        for key, value in fields.items():
                            if key.startswith("token"):
                                _, token_id, token_param = key.split(".")
                                token_key = f"{stanza}_{token_id}"
                                if (
                                    not token_key
                                    in psa_data_dict[sample_file]["tokens"].keys()
                                ):
                                    psa_data_dict[sample_file]["tokens"][token_key] = {}
                                psa_data_dict[sample_file]["tokens"][token_key][
                                    token_param
                                ] = value
                            else:
                                psa_data_dict[sample_file][key] = value
        return psa_data_dict

    def _check_samples(self):
        """
        Gives a user warning when sample file is not found for the stanza
        present in the configuration file.
        """
        if os.path.exists(self._path_to_samples):
            for stanza in self.psa_data.keys():
                if stanza not in self.match_stanzas:
                    raise_warning(f"No sample file found for stanza : {stanza}")
                LOGGER.info(f"Sample file found for stanza : {stanza}")

get_sample_stanzas()

Converts a stanza in pytest-splunk-addon-data.conf to an object of SampleStanza.

Returns:

* list: List of SampleStanza objects.

Source code in pytest_splunk_addon/sample_generation/pytest_splunk_addon_data_parser.py
def get_sample_stanzas(self):
    """
    Converts a stanza in pytest-splunk-addon-data.conf to an object of SampleStanza.

    Returns:
        List of SampleStanza objects.
    """
    _psa_data = self._get_psa_data_stanzas()
    self._check_samples()
    results = []
    for sample_name, stanza_params in sorted(_psa_data.items()):
        sample_path = os.path.join(self._path_to_samples, sample_name)
        results.append(SampleStanza(sample_path, stanza_params))
    return results

SampleStanza

Bases: object

This class represents a stanza of the pytest-splunk-addon-data.conf. It contains all the parameters for the stanza such as:

* Sample Name
* Tokens
* Sample file's raw data
* Tokenized events
* Sample ingestion type

Parameters:

* sample_path (str): Path to the sample file. Required.
* psa_data_params (dict): Dictionary representing pytest-splunk-addon-data.conf. Required.
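A short, hedged sketch of how a stanza is typically driven once the parser has built it (the stanza variable is assumed to be one of the objects returned by PytestSplunkAddonDataParser.get_sample_stanzas()):

# stanza: a SampleStanza produced by the parser above
stanza.tokenize("psa-data-gen")              # replace all tokens in the raw events
for event in stanza.get_tokenized_events():  # yields SampleEvent objects
    print(event.sample_name, event.metadata.get("host"))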
Source code in pytest_splunk_addon/sample_generation/sample_stanza.py
class SampleStanza(object):
    """
    This class represents a stanza of the pytest-splunk-addon-data.conf.
    It contains all the parameters for the stanza such as:

        * Sample Name
        * Tokens
        * Sample file's raw data
        * Tokenized events
        * Sample ingestion type

    Args:
        sample_path (str): Path to the sample file
        psa_data_params (dict): Dictionary representing pytest-splunk-addon-data.conf
    """

    def __init__(self, sample_path, psa_data_params):
        self.sample_path = sample_path
        self.sample_name = os.path.basename(sample_path)
        self.metadata = self._parse_meta(psa_data_params)
        self.sample_rules = list(self._parse_rules(psa_data_params, self.sample_path))
        self.input_type = self.metadata.get("input_type", "default")
        self.host_count = 0

    def get_raw_events(self):
        """
        Gets the raw events from the sample file.
        """
        # self.sample_raw_data = list(self._get_raw_sample())
        self.tokenized_events = self._get_raw_sample()

    def get_tokenized_events(self):
        """
        Yields the tokenized events
        """
        for event in self.tokenized_events:
            event.event, event.metadata, event.key_fields = SampleEvent.update_metadata(
                self, event.event, event.metadata, event.key_fields
            )
            yield event

    def tokenize(self, conf_name):
        """
        Tokenizes the raw events by replacing all the tokens in it.

        Args:
            conf_name (str): Name of the conf file, "psa-data-gen"
        """
        if conf_name == "eventgen":
            required_event_count = self.metadata.get("count")
        else:
            required_event_count = 1

        if (
            required_event_count is None
            or int(required_event_count) == 0
            or int(required_event_count) > BULK_EVENT_COUNT
        ):
            required_event_count = BULK_EVENT_COUNT

        bulk_event = []
        raw_event = []
        event_counter = 0
        while (int(required_event_count)) > len((bulk_event)):
            raw_event.insert(event_counter, list(self._get_raw_sample()))
            if not raw_event[-1]:
                break
            for each_rule in self.sample_rules:
                if each_rule:
                    raw_event[event_counter] = each_rule.apply(raw_event[event_counter])
            for event in raw_event[event_counter]:
                host_value = event.metadata.get("host")
                host = token_value(key=host_value, value=host_value)
                event.update_requirement_test_field("host", "##host##", host)
            bulk_event.extend(raw_event[event_counter])
            event_counter = event_counter + 1

        if self.metadata.get("breaker") is not None:
            self.metadata.update(sample_count=1)
            for each in bulk_event:
                each.metadata.update(sample_count=1)

        if self.metadata.get("expected_event_count") is None:
            breaker = self.metadata.get("breaker")
            if breaker is not None:
                expected_events = 0
                for each_event in bulk_event:
                    expected_events += len(
                        list(filter(lambda x: x, self.break_events(each_event.event)))
                    )
            else:
                expected_events = len(bulk_event)
            self.metadata.update(expected_event_count=expected_events)
            for each in bulk_event:
                each.metadata.update(expected_event_count=expected_events)
        else:
            self.metadata.update(sample_count=1)
            for each in bulk_event:
                each.metadata.update(sample_count=1)

        self.tokenized_events = bulk_event

    def _parse_rules(self, psa_data_params, sample_path):
        """
        Yield the rule instance based token replacement type.

        Args:
            psa_data_params (dict): PSA data stanzas dictionary
            sample_path (str): Path to the sample file
        """
        token_list = self._sort_tokens_by_replacement_type_all(
            psa_data_params["tokens"]
        )
        for each_token, token_value in token_list:
            applied_rule = Rule.parse_rule(token_value, psa_data_params, sample_path)
            if not applied_rule:
                raise_warning(
                    "Unidentified Rule: '{}' for token '{}'".format(
                        token_value["replacement"], token_value["token"]
                    )
                )
            else:
                yield applied_rule

    def _parse_meta(self, psa_data_params):
        """
        Return the metadata from PSA data stanzas.

        Args:
            psa_data_params (dict): PSA data stanzas dictionary
        """
        metadata = {
            key: psa_data_params[key] for key in psa_data_params if key != "tokens"
        }
        host = metadata.get("host") or self.sample_name
        metadata.update(host=host)
        if (
            metadata.get("input_type")
            not in [
                "modinput",
                "windows_input",
                "file_monitor",
                "uf_file_monitor",
                "scripted_input",
                "syslog_tcp",
                "syslog_udp",
                "default",
            ]
            and not None
        ):
            raise_warning(
                "Invalid value for input_type found: '{}' using default input_type".format(
                    metadata.get("input_type")
                )
            )
            metadata.update(input_type="default")
        if metadata.get("host_type") not in ["event", "plugin", None]:
            raise_warning(
                "Invalid value for host_type: '{}' using host_type = plugin.".format(
                    metadata.get("host_type")
                )
            )
            metadata.update(host_type="plugin")
        if metadata.get("timestamp_type") not in ["event", "plugin", None]:
            raise_warning(
                "Invalid value for timestamp_type: '{}' using timestamp_type = plugin.".format(
                    metadata.get("timestamp_type")
                )
            )
            metadata.update(timestamp_type="plugin")
        if metadata.get("timezone") not in ["local", "0000", None] and not re.match(
            TIMEZONE_REX, metadata.get("timezone")
        ):
            raise_warning(
                "Invalid value for timezone: '{}' using timezone = 0000.".format(
                    metadata.get("timezone")
                )
            )
            metadata.update(timezone="0000")
            psa_data_params.update(timezone="0000")
        if metadata.get("timestamp_type") not in ["event", "plugin", None]:
            raise_warning(
                "Invalid value for timestamp_type: '{}' using timestamp_type = plugin.".format(
                    metadata.get("timestamp_type")
                )
            )
            metadata.update(timestamp_type="plugin")
        if (
            metadata.get("sample_count")
            and not metadata.get("sample_count").isnumeric()
        ):
            raise_warning(
                "Invalid value for sample_count: '{}' using sample_count = 1.".format(
                    metadata.get("sample_count")
                )
            )
            metadata.update(sample_count="1")
        if (
            metadata.get("expected_event_count")
            and not metadata.get("expected_event_count").isnumeric()
        ):
            raise_warning(
                "Invalid value for expected_event_count: '{}' using expected_event_count = 1.".format(
                    metadata.get("expected_event_count")
                )
            )
            metadata.update(expected_event_count="1")
        if metadata.get("count") and not metadata.get("count").isnumeric():
            raise_warning(
                "Invalid value for count: '{}' using count = 1.".format(
                    metadata.get("count")
                )
            )
            metadata.update(count="100")
        if metadata.get("index") is not None and metadata.get("input_type") in [
            "syslog_tcp",
            "tcp",
            "udp",
        ]:
            raise_warning(
                "For input_type '{}', there should be no index set".format(
                    metadata.get("input_type")
                )
            )
        if metadata.get("input_type") == "uf_file_monitor":
            metadata["host"] = metadata.get("host").replace("_", "-").replace(".", "-")
        return metadata

    def get_eventmetadata(self):
        """
        Return the unique host metadata for event.
        """
        self.host_count += 1
        event_host = self.metadata.get("host") + "_" + str(self.host_count)
        event_metadata = copy.deepcopy(self.metadata)
        event_metadata.update(host=event_host)
        LOGGER.info("event metadata: {}".format(event_metadata))
        return event_metadata

    def _get_raw_sample(self):
        """
        Converts a sample file into raw events based on the input type and breaker.
        Input: Name of the sample file for which events have to be generated.
        Output: Yields object of SampleEvent.

        If the input type is in ["modinput", "windows_input"], a new event will be generated for each line in the file.
        If the input type is in below categories, a single event will be generated for the entire file.
            [
                "file_monitor",
                "scripted_input",
                "syslog_tcp",
                "syslog_udp",
                "default"
            ]
        """
        with open(self.sample_path, "r", encoding="utf-8") as sample_file:
            sample_raw = sample_file.read()

        if self.metadata.get("requirement_test_sample"):
            samples = xmltodict.parse(sample_raw)
            events = (
                samples["device"]["event"]
                if type(samples["device"]["event"]) == list
                else [samples["device"]["event"]]
            )
            if self.metadata.get("sample_count") is None:
                self.metadata.update(sample_count="1")
            for each_event in events:
                event = each_event["raw"].strip()
                event_metadata = self.get_eventmetadata()
                requirement_test_data = self.populate_requirement_test_data(each_event)
                if "transport" in each_event.keys():
                    static_host = each_event["transport"].get("@host")
                    if static_host:
                        event_metadata.update(host=static_host)
                    static_source = each_event["transport"].get("@source")
                    if static_source:
                        event_metadata.update(source=static_source)
                yield SampleEvent(
                    event, event_metadata, self.sample_name, requirement_test_data
                )
        elif self.metadata.get("breaker"):
            for each_event in self.break_events(sample_raw):
                if each_event:
                    event_metadata = self.get_eventmetadata()
                    yield SampleEvent(each_event, event_metadata, self.sample_name)
        elif self.input_type in ["modinput", "windows_input"]:
            for each_line in sample_raw.split("\n"):
                if each_line:
                    event_metadata = self.get_eventmetadata()
                    yield SampleEvent(each_line, event_metadata, self.sample_name)
        elif self.input_type in [
            "file_monitor",
            "uf_file_monitor",
            "scripted_input",
            "syslog_tcp",
            "syslog_udp",
            "default",
        ]:
            event = sample_raw.strip()
            if not event:
                raise_warning("sample file: '{}' is empty".format(self.sample_path))
            else:
                yield SampleEvent(event, self.metadata, self.sample_name)
        if not self.input_type:
            # TODO: input_type not found scenario
            pass
        # More input types to be added here.

    def break_events(self, sample_raw):
        """
        Break sample file into list of raw events using breaker

        Args:
            sample_raw (str): Raw sample

        Return:
            event_list (list): List of raw events
        """

        sample_match = re.finditer(
            self.metadata.get("breaker"), sample_raw, flags=re.MULTILINE
        )
        pos = 0
        try:
            match_obj = next(sample_match)
            event_list = list()
            if match_obj.start() != 0:
                event_list.append(sample_raw[pos : match_obj.start()].strip())
                pos = match_obj.start()
            for _, match in enumerate(sample_match):
                event_list.append(sample_raw[pos : match.start()].strip())
                pos = match.start()
            event_list.append(sample_raw[pos:].strip())
            return event_list
        except:
            raise_warning("Invalid breaker for stanza {}".format(self.sample_name))
            return [sample_raw]

    def _sort_tokens_by_replacement_type_all(self, tokens_dict):
        """
        Return the sorted token list by replacementType=all first in list.

        Args:
            tokens_dict (dict): tokens dictionary
        """
        token_list = []
        for token in tokens_dict.items():
            if token[1]["replacementType"] == "all":
                token_list.insert(0, token)
            else:
                token_list.append(token)
        return token_list

    @staticmethod
    def populate_requirement_test_data(event):
        """
        Analyze event's datamodels, cim_fields, missing_recommended_fields, exception

        Args:
            event (dict): event data from xml file

        Return:
            requirement_test_data (dict): datamodels, cim_fields, missing_recommended_fields, exception
        """
        requirement_test_data = {}
        cim = event.get("cim")
        other_mappings = event.get("other_mappings")
        if other_mappings:
            other_fields = {}
            fields = other_mappings["field"]
            if type(fields) == list:
                for field in fields:
                    other_fields[field["@name"]] = field["@value"]
            elif type(fields) == dict:
                other_fields[fields["@name"]] = fields["@value"]
            requirement_test_data["other_fields"] = other_fields
        if cim:
            requirement_test_data["cim_version"] = cim.get("@version", "latest")
            requirement_test_data["datamodels"] = cim.get("models") or {}

            defined_fields = cim.get("cim_fields") or {}
            cim_fields = {}
            if defined_fields:
                fields = defined_fields["field"]
                if type(fields) == list:
                    for field in fields:
                        cim_fields[field["@name"]] = field["@value"]
                elif type(fields) == dict:
                    cim_fields[fields["@name"]] = fields["@value"]
            requirement_test_data["cim_fields"] = cim_fields

            missing_recommended_fields = cim.get("missing_recommended_fields") or []
            if missing_recommended_fields:
                missing_recommended_fields = (
                    missing_recommended_fields.get("field") or []
                )
                if type(missing_recommended_fields) != list:
                    missing_recommended_fields = [missing_recommended_fields]
            requirement_test_data[
                "missing_recommended_fields"
            ] = missing_recommended_fields

            defined_exceptions = cim.get("exceptions") or {}
            exceptions = {}
            if defined_exceptions:
                defined_fields = defined_exceptions["field"]
                defined_fields = (
                    defined_fields if type(defined_fields) == list else [defined_fields]
                )
                for field in defined_fields:
                    exceptions[field["@name"]] = field["@value"]
            requirement_test_data["exceptions"] = exceptions
        return requirement_test_data

break_events(sample_raw)

Breaks the sample file into a list of raw events using the breaker regex.

Parameters:

* sample_raw (str): Raw sample. Required.

Returns:

* event_list (list): List of raw events.

Source code in pytest_splunk_addon/sample_generation/sample_stanza.py
def break_events(self, sample_raw):
    """
    Break sample file into list of raw events using breaker

    Args:
        sample_raw (str): Raw sample

    Return:
        event_list (list): List of raw events
    """

    sample_match = re.finditer(
        self.metadata.get("breaker"), sample_raw, flags=re.MULTILINE
    )
    pos = 0
    try:
        match_obj = next(sample_match)
        event_list = list()
        if match_obj.start() != 0:
            event_list.append(sample_raw[pos : match_obj.start()].strip())
            pos = match_obj.start()
        for _, match in enumerate(sample_match):
            event_list.append(sample_raw[pos : match.start()].strip())
            pos = match.start()
        event_list.append(sample_raw[pos:].strip())
        return event_list
    except:
        raise_warning("Invalid breaker for stanza {}".format(self.sample_name))
        return [sample_raw]
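For intuition, a simplified sketch of the breaker behaviour above (the breaker pattern and sample text are made-up illustrations, and the leading-text edge case handled by the method is omitted):

import re

breaker = r"\d{4}-\d{2}-\d{2}"  # hypothetical breaker: an ISO-style date starts each event
sample_raw = "2023-01-01 first event\n2023-01-02 second event"

starts = [m.start() for m in re.finditer(breaker, sample_raw, flags=re.MULTILINE)]
events = [
    sample_raw[begin:end].strip()
    for begin, end in zip(starts, starts[1:] + [len(sample_raw)])
]
print(events)  # ['2023-01-01 first event', '2023-01-02 second event']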

get_eventmetadata()

Return the unique host metadata for event.

Source code in pytest_splunk_addon/sample_generation/sample_stanza.py
def get_eventmetadata(self):
    """
    Return the unique host metadata for event.
    """
    self.host_count += 1
    event_host = self.metadata.get("host") + "_" + str(self.host_count)
    event_metadata = copy.deepcopy(self.metadata)
    event_metadata.update(host=event_host)
    LOGGER.info("event metadata: {}".format(event_metadata))
    return event_metadata

get_raw_events()

Gets the raw events from the sample file.

Source code in pytest_splunk_addon/sample_generation/sample_stanza.py
def get_raw_events(self):
    """
    Gets the raw events from the sample file.
    """
    # self.sample_raw_data = list(self._get_raw_sample())
    self.tokenized_events = self._get_raw_sample()

get_tokenized_events()

Yields the tokenized events

Source code in pytest_splunk_addon/sample_generation/sample_stanza.py
def get_tokenized_events(self):
    """
    Yields the tokenized events
    """
    for event in self.tokenized_events:
        event.event, event.metadata, event.key_fields = SampleEvent.update_metadata(
            self, event.event, event.metadata, event.key_fields
        )
        yield event

populate_requirement_test_data(event) staticmethod

Analyzes the event's datamodels, cim_fields, missing_recommended_fields, and exceptions.

Parameters:

* event (dict): Event data from the XML file. Required.

Returns:

* requirement_test_data (dict): datamodels, cim_fields, missing_recommended_fields, exceptions.

Source code in pytest_splunk_addon/sample_generation/sample_stanza.py
@staticmethod
def populate_requirement_test_data(event):
    """
    Analyze event's datamodels, cim_fields, missing_recommended_fields, exception

    Args:
        event (dict): event data from xml file

    Return:
        requirement_test_data (dict): datamodels, cim_fields, missing_recommended_fields, exception
    """
    requirement_test_data = {}
    cim = event.get("cim")
    other_mappings = event.get("other_mappings")
    if other_mappings:
        other_fields = {}
        fields = other_mappings["field"]
        if type(fields) == list:
            for field in fields:
                other_fields[field["@name"]] = field["@value"]
        elif type(fields) == dict:
            other_fields[fields["@name"]] = fields["@value"]
        requirement_test_data["other_fields"] = other_fields
    if cim:
        requirement_test_data["cim_version"] = cim.get("@version", "latest")
        requirement_test_data["datamodels"] = cim.get("models") or {}

        defined_fields = cim.get("cim_fields") or {}
        cim_fields = {}
        if defined_fields:
            fields = defined_fields["field"]
            if type(fields) == list:
                for field in fields:
                    cim_fields[field["@name"]] = field["@value"]
            elif type(fields) == dict:
                cim_fields[fields["@name"]] = fields["@value"]
        requirement_test_data["cim_fields"] = cim_fields

        missing_recommended_fields = cim.get("missing_recommended_fields") or []
        if missing_recommended_fields:
            missing_recommended_fields = (
                missing_recommended_fields.get("field") or []
            )
            if type(missing_recommended_fields) != list:
                missing_recommended_fields = [missing_recommended_fields]
        requirement_test_data[
            "missing_recommended_fields"
        ] = missing_recommended_fields

        defined_exceptions = cim.get("exceptions") or {}
        exceptions = {}
        if defined_exceptions:
            defined_fields = defined_exceptions["field"]
            defined_fields = (
                defined_fields if type(defined_fields) == list else [defined_fields]
            )
            for field in defined_fields:
                exceptions[field["@name"]] = field["@value"]
        requirement_test_data["exceptions"] = exceptions
    return requirement_test_data
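A hedged illustration of the expected input structure (the keys mirror the attribute access in the code; the concrete values are invented):

event = {
    "cim": {
        "@version": "4.20",
        "models": {"model": "Authentication"},
        "cim_fields": {"field": [{"@name": "action", "@value": "success"}]},
    }
}
data = SampleStanza.populate_requirement_test_data(event)
print(data["cim_fields"])  # {'action': 'success'}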

tokenize(conf_name)

Tokenizes the raw events by replacing all the tokens in it.

Parameters:

* conf_name (str): Name of the conf file, "psa-data-gen". Required.
Source code in pytest_splunk_addon/sample_generation/sample_stanza.py
def tokenize(self, conf_name):
    """
    Tokenizes the raw events by replacing all the tokens in it.

    Args:
        conf_name (str): Name of the conf file, "psa-data-gen"
    """
    if conf_name == "eventgen":
        required_event_count = self.metadata.get("count")
    else:
        required_event_count = 1

    if (
        required_event_count is None
        or int(required_event_count) == 0
        or int(required_event_count) > BULK_EVENT_COUNT
    ):
        required_event_count = BULK_EVENT_COUNT

    bulk_event = []
    raw_event = []
    event_counter = 0
    while (int(required_event_count)) > len((bulk_event)):
        raw_event.insert(event_counter, list(self._get_raw_sample()))
        if not raw_event[-1]:
            break
        for each_rule in self.sample_rules:
            if each_rule:
                raw_event[event_counter] = each_rule.apply(raw_event[event_counter])
        for event in raw_event[event_counter]:
            host_value = event.metadata.get("host")
            host = token_value(key=host_value, value=host_value)
            event.update_requirement_test_field("host", "##host##", host)
        bulk_event.extend(raw_event[event_counter])
        event_counter = event_counter + 1

    if self.metadata.get("breaker") is not None:
        self.metadata.update(sample_count=1)
        for each in bulk_event:
            each.metadata.update(sample_count=1)

    if self.metadata.get("expected_event_count") is None:
        breaker = self.metadata.get("breaker")
        if breaker is not None:
            expected_events = 0
            for each_event in bulk_event:
                expected_events += len(
                    list(filter(lambda x: x, self.break_events(each_event.event)))
                )
        else:
            expected_events = len(bulk_event)
        self.metadata.update(expected_event_count=expected_events)
        for each in bulk_event:
            each.metadata.update(expected_event_count=expected_events)
    else:
        self.metadata.update(sample_count=1)
        for each in bulk_event:
            each.metadata.update(sample_count=1)

    self.tokenized_events = bulk_event

SampleEvent

Bases: object

This class represents an event which will be ingested in Splunk.

Parameters:

* event_string (str): Event content. Required.
* metadata (dict): Contains metadata for the event. Required.
* sample_name (str): Name of the file containing this event. Required.
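A minimal sketch of constructing and tokenizing an event by hand (the event text, metadata keys, and token pattern are illustrative only):

event = SampleEvent(
    event_string="src=##src_ip## action=allowed",
    metadata={"host": "sample_host", "input_type": "default"},
    sample_name="sample.log",
)
print(event.get_token_count(r"##src_ip##"))  # -> 1
event.replace_token(r"##src_ip##", "10.0.0.1")
print(event.event)                           # -> src=10.0.0.1 action=allowed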
Source code in pytest_splunk_addon/sample_generation/sample_event.py
class SampleEvent(object):
    """
    This class represents an event which will be ingested in Splunk.

    Args:
        event_string (str): Event content
        metadata (dict): Contains metadata for the event
        sample_name (str): Name of the file containing this event
    """

    def __init__(self, event_string, metadata, sample_name, requirement_test_data=None):
        self.event = event_string
        self.key_fields = dict()
        self.time_values = list()
        self.metadata = metadata
        self.sample_name = sample_name
        self.host_count = 0
        self.requirement_test_data = requirement_test_data

    def update(self, new_event):
        """
        This method updates the event content

        Args:
            new_event (str): Event content
        """
        LOGGER.debug("Updated the event {} with {}".format(self.event, new_event))
        self.event = new_event

    def get_host(self):
        """
        Returns a unique host value
        """
        global host_count
        host_count += 1
        LOGGER.debug(
            "Creating host value: {}-{}-{}".format(
                "host", self.sample_name, str(host_count)
            )
        )
        return "{}-{}-{}".format("host", self.sample_name, str(host_count))

    def get_field_host(self, rule):
        """
        Returns unique host value for the key fields src, dest, host, dvc

        Args:
            rule (str): Type of rule either src, host, dest, dvc
        """
        global host_count
        host_count += 1
        LOGGER.debug(
            "Creating field with value: {}-{}{}".format(rule, "sample_host", host_count)
        )
        return "{}-{}{}".format(rule, "sample_host", host_count)

    def get_field_fqdn(self, rule):
        """
        Returns unique fqdn value for the key fields src, dest, host, dvc

        Args:
            rule (str): Type of rule either src, host, dest, dvc
        """
        global fqdn_count
        fqdn_count += 1
        LOGGER.debug(
            "Creating fgdn field with value: {}_{}.{}{}.com".format(
                rule, "sample_host", "sample_domain", fqdn_count
            )
        )
        return "{}_{}.{}{}.com".format(rule, "sample_host", "sample_domain", fqdn_count)

    def get_ipv4(self, rule):
        """
        Returns Ipv4 Address as per the rule.

        Args:
            rule (str): Type of rule either src, host, dest, dvc.
            If the value is not one of the key field it will return a randomly generated Ipv4 address.
        """
        if rule == "src":
            global src_ipv4
            src_ipv4 += 1
            addr = [int(src_ipv4 / 256) % 256, src_ipv4 % 256]
            LOGGER.debug(
                "Creating ipv4 field with value: {}".format(
                    "".join(
                        [ip_rules.get(rule)["ipv4"], str(addr[0]), ".", str(addr[1])]
                    )
                )
            )
            return "".join(
                [ip_rules.get(rule)["ipv4"], str(addr[0]), ".", str(addr[1])]
            )
        elif rule == "host":
            global host_ipv4, host_ipv4_octet_count
            host_ipv4_octet_count += 1
            if host_ipv4_octet_count > 255:
                host_ipv4 += 1
                host_ipv4_octet_count = host_ipv4_octet_count % 256
            if host_ipv4 == 101:
                host_ipv4 = 51
            LOGGER.debug(
                "Creating ipv4 field with value: {}".format(
                    "".join(
                        [
                            ip_rules.get(rule)["ipv4"],
                            str(host_ipv4 % 101),
                            ".",
                            str(host_ipv4_octet_count % 256),
                        ]
                    )
                )
            )
            return "".join(
                [
                    ip_rules.get(rule)["ipv4"],
                    str(host_ipv4 % 101),
                    ".",
                    str(host_ipv4_octet_count % 256),
                ]
            )
        elif rule == "dvc":
            global dvc_ipv4, dvc_ipv4_octet_count
            dvc_ipv4 += 1
            dvc_ipv4_octet_count += 1
            LOGGER.debug(
                "Creating ipv4 field with value: {}".format(
                    "".join(
                        [
                            ip_rules.get(rule)["ipv4"],
                            str(dvc_ipv4 % 51),
                            ".",
                            str(dvc_ipv4_octet_count % 256),
                        ]
                    )
                )
            )
            return "".join(
                [
                    ip_rules.get(rule)["ipv4"],
                    str(dvc_ipv4 % 51),
                    ".",
                    str(dvc_ipv4_octet_count % 256),
                ]
            )
        elif rule == "dest":
            global dest_ipv4
            dest_ipv4 += 1
            addr = [int(dest_ipv4 / 256) % 256, dest_ipv4 % 256]
            LOGGER.debug(
                "Creating ipv4 field with value: {}".format(
                    "".join(
                        [ip_rules.get(rule)["ipv4"], str(addr[0]), ".", str(addr[1])]
                    )
                )
            )
            return "".join(
                [ip_rules.get(rule)["ipv4"], str(addr[0]), ".", str(addr[1])]
            )
        elif rule == "url":
            global url_ip_count
            url_ip_count += 1
            addr = [int(url_ip_count / 256) % 256, url_ip_count % 256]
            LOGGER.debug(
                "Creating ipv4 field with value: {}".format(
                    "".join(
                        [ip_rules.get(rule)["ip_host"], str(addr[0]), ".", str(addr[1])]
                    )
                )
            )
            return "".join(
                [ip_rules.get(rule)["ip_host"], str(addr[0]), ".", str(addr[1])]
            )
        else:
            temp_ipv4 = Faker().ipv4()
            LOGGER.debug("Creating ipv4 field with value: {}".format(temp_ipv4))
            return temp_ipv4

    def get_ipv6(self, rule):
        """
        Returns Ipv6 Address as per the rule.

        Args:
            rule (str): Type of rule either src, host, dest, dvc.
            If the value is not one of the key field it will return a randomly generated Ipv6 address.
        """
        if rule == "src":
            global src_ipv6
            ipv6 = src_ipv6 % (int("ffffffffffffffff", 16))
            src_ipv6 += 1
        elif rule == "host":
            global host_ipv6
            ipv6 = host_ipv6 % (int("ffffffffffffffff", 16))
            host_ipv6 += 1
        elif rule == "dvc":
            global dvc_ipv6
            ipv6 = dvc_ipv6 % (int("ffffffffffffffff", 16))
            dvc_ipv6 += 1
        elif rule == "dest":
            global dest_ipv6
            ipv6 = dest_ipv6 % (int("ffffffffffffffff", 16))
            dest_ipv6 += 1
        else:
            temp_ipv4 = Faker().ipv6()
            LOGGER.debug("Creating ipv6 field with value: {}".format(temp_ipv4))
            return temp_ipv4

        hex_count = hex(ipv6)
        non_zero_cnt = len(hex_count[2:])
        addr = "{}{}".format("0" * (16 - non_zero_cnt), hex_count[2:])
        LOGGER.debug(
            "Creating ipv6 field with value: {}:{}".format(
                ip_rules.get(rule)["ipv6"],
                ":".join(addr[i : i + 4] for i in range(0, len(addr), 4)),
            )
        )
        return "{}:{}".format(
            ip_rules.get(rule)["ipv6"],
            ":".join(addr[i : i + 4] for i in range(0, len(addr), 4)),
        )

    def get_token_count(self, token):
        """
        Returns the token count in event

        Args:
            token (str): Token name
        """
        return len(re.findall(token, self.event, flags=re.MULTILINE))

    def get_token_extractions_count(self, token):
        """
        Returns minimum number of occurrence count if token not found in event but is in extracted fields

        Args:
            token (str): Token name
        """
        tokens_in_extractions = 0
        if (
            self.requirement_test_data is not None
            and "cim_fields" in self.requirement_test_data.keys()
        ):
            for extracted_field in self.requirement_test_data["cim_fields"].values():
                if isinstance(extracted_field, str):
                    tokens_in_extractions += len(re.findall(token, extracted_field))
                elif isinstance(extracted_field, list):
                    for each_filed in extracted_field:
                        tokens_in_extractions += len(re.findall(token, each_filed))
        return 1 if tokens_in_extractions > 0 else 0

    def replace_token(self, token, token_values):
        """
        Replaces the token value in event

        Args:
            token (str): Token name
            token_values (list/str): Value(s) to be replaced in the token
        """
        # TODO: How to handle dependent Values with list of token_values
        if isinstance(token_values, list):
            sample_tokens = re.finditer(token, self.event, flags=re.MULTILINE)

            for _, token_value in enumerate(token_values):
                token_value = token_value.value
                match_object = next(sample_tokens)
                match_str = (
                    match_object.group(0)
                    if len(match_object.groups()) == 0
                    else match_object.group(1)
                )
                match_str = re.escape(match_str)
                self.event = re.sub(
                    match_str,
                    lambda x: str(token_value),
                    self.event,
                    1,
                    flags=re.MULTILINE,
                )
        else:
            self.event = re.sub(
                token, lambda x: str(token_values), self.event, flags=re.MULTILINE
            )

    def register_field_value(self, field, token_values):
        """
        Registers the value for the key fields in its SampleEvent object

        Args:
            field (str): Token field name
            token_values (list/str): Token value(s) which are replaced in the key fields
        """
        if field == "_time":
            time_list = (
                token_values if isinstance(token_values, list) else [token_values]
            )
            self.time_values.extend([i.key for i in time_list])
        elif field in key_fields.KEY_FIELDS:
            if isinstance(token_values, list):
                for token_value in token_values:
                    self.key_fields.setdefault(field, []).append(str(token_value.key))
            else:
                self.key_fields.setdefault(field, []).append(str(token_values.key))

    def update_requirement_test_field(self, field, token, token_values):
        if field != "_time":
            if (
                self.requirement_test_data is not None
                and "cim_fields" in self.requirement_test_data.keys()
            ):
                for cim_field, value in self.requirement_test_data[
                    "cim_fields"
                ].items():
                    if token in value:
                        if isinstance(token_values, list):
                            if len(token_values) == 1:
                                self.requirement_test_data["cim_fields"][
                                    cim_field
                                ] = value.replace(token, str(token_values[0].key))
                            else:
                                self.requirement_test_data["cim_fields"][cim_field] = [
                                    value.replace(token, str(token_value.key))
                                    for token_value in token_values
                                ]
                        else:
                            self.requirement_test_data["cim_fields"][
                                cim_field
                            ] = value.replace(token, str(token_values.key))

    def get_key_fields(self):
        """
        Returns the key field value from event
        """
        return self.key_fields

    @classmethod
    def copy(cls, event):
        """
        Copies the SampleEvent object into a new one.
        Args:
            event (SampleEvent): Event object which has to be copied

        Returns:
            Copy of the SampleEvent object
        """
        new_event = cls("", {}, "")
        new_event.__dict__ = event.__dict__.copy()
        new_event.key_fields = event.key_fields.copy()
        new_event.time_values = event.time_values[:]
        new_event.metadata = deepcopy(event.metadata)
        new_event.requirement_test_data = deepcopy(event.requirement_test_data)
        return new_event

    def update_metadata(self, event, metadata, key_fields):
        """
        Processes the syslog formated samples
        Format::

            '***SPLUNK*** source=<source> sourcetype=<sourcetype> \
            field_1       field2        field3 \
            ##value1##    ##value2##   ##value3##'

        Args:
            event (str): event string containing raw syslog data
            metadata (dict): Contains metadata for the event

        Returns:
            Syslog event and the updated metadata
        """
        try:
            if isinstance(event, str) and event.startswith("***SPLUNK***"):
                header, event = event.split("\n", 1)

                for meta_field in re.findall(r"[\w]+=[^\s]+", header):
                    field, value = meta_field.split("=")
                    if field == "host":
                        metadata[field] = f"host_{metadata[field]}"
                        key_fields["host"] = list([metadata["host"]])
                    else:
                        metadata[field] = value

            return event, metadata, key_fields

        except KeyError as error:
            LOGGER.error(f"Unexpected data found. Error: {error}")
            raise error

copy(event) classmethod

Copies the SampleEvent object into a new one.

Parameters:

* event (SampleEvent): Event object which has to be copied. Required.

Returns:

* SampleEvent: Copy of the SampleEvent object.

Source code in pytest_splunk_addon/sample_generation/sample_event.py
@classmethod
def copy(cls, event):
    """
    Copies the SampleEvent object into a new one.
    Args:
        event (SampleEvent): Event object which has to be copied

    Returns:
        Copy of the SampleEvent object
    """
    new_event = cls("", {}, "")
    new_event.__dict__ = event.__dict__.copy()
    new_event.key_fields = event.key_fields.copy()
    new_event.time_values = event.time_values[:]
    new_event.metadata = deepcopy(event.metadata)
    new_event.requirement_test_data = deepcopy(event.requirement_test_data)
    return new_event
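A brief, hedged illustration of the copy semantics (event is assumed to be an existing SampleEvent, e.g. the one built in the sketch above):

clone = SampleEvent.copy(event)
clone.metadata["host"] = "another_host"
print(event.metadata["host"])  # original is unchanged: metadata is deep-copied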

get_field_fqdn(rule)

Returns unique fqdn value for the key fields src, dest, host, dvc

Parameters:

* rule (str): Type of rule either src, host, dest, dvc. Required.
Source code in pytest_splunk_addon/sample_generation/sample_event.py
def get_field_fqdn(self, rule):
    """
    Returns unique fqdn value for the key fields src, dest, host, dvc

    Args:
        rule (str): Type of rule either src, host, dest, dvc
    """
    global fqdn_count
    fqdn_count += 1
    LOGGER.debug(
        "Creating fgdn field with value: {}_{}.{}{}.com".format(
            rule, "sample_host", "sample_domain", fqdn_count
        )
    )
    return "{}_{}.{}{}.com".format(rule, "sample_host", "sample_domain", fqdn_count)

get_field_host(rule)

Returns unique host value for the key fields src, dest, host, dvc

Parameters:

* rule (str): Type of rule either src, host, dest, dvc. Required.
Source code in pytest_splunk_addon/sample_generation/sample_event.py
def get_field_host(self, rule):
    """
    Returns unique host value for the key fields src, dest, host, dvc

    Args:
        rule (str): Type of rule either src, host, dest, dvc
    """
    global host_count
    host_count += 1
    LOGGER.debug(
        "Creating field with value: {}-{}{}".format(rule, "sample_host", host_count)
    )
    return "{}-{}{}".format(rule, "sample_host", host_count)

get_host()

Returns a unique host value

Source code in pytest_splunk_addon/sample_generation/sample_event.py
def get_host(self):
    """
    Returns a unique host value
    """
    global host_count
    host_count += 1
    LOGGER.debug(
        "Creating host value: {}-{}-{}".format(
            "host", self.sample_name, str(host_count)
        )
    )
    return "{}-{}-{}".format("host", self.sample_name, str(host_count))

get_ipv4(rule)

Returns Ipv4 Address as per the rule.

Parameters:

* rule (str): Type of rule either src, host, dest, dvc. Required.
Source code in pytest_splunk_addon/sample_generation/sample_event.py
def get_ipv4(self, rule):
    """
    Returns Ipv4 Address as per the rule.

    Args:
        rule (str): Type of rule either src, host, dest, dvc.
        If the value is not one of the key field it will return a randomly generated Ipv4 address.
    """
    if rule == "src":
        global src_ipv4
        src_ipv4 += 1
        addr = [int(src_ipv4 / 256) % 256, src_ipv4 % 256]
        LOGGER.debug(
            "Creating ipv4 field with value: {}".format(
                "".join(
                    [ip_rules.get(rule)["ipv4"], str(addr[0]), ".", str(addr[1])]
                )
            )
        )
        return "".join(
            [ip_rules.get(rule)["ipv4"], str(addr[0]), ".", str(addr[1])]
        )
    elif rule == "host":
        global host_ipv4, host_ipv4_octet_count
        host_ipv4_octet_count += 1
        if host_ipv4_octet_count > 255:
            host_ipv4 += 1
            host_ipv4_octet_count = host_ipv4_octet_count % 256
        if host_ipv4 == 101:
            host_ipv4 = 51
        LOGGER.debug(
            "Creating ipv4 field with value: {}".format(
                "".join(
                    [
                        ip_rules.get(rule)["ipv4"],
                        str(host_ipv4 % 101),
                        ".",
                        str(host_ipv4_octet_count % 256),
                    ]
                )
            )
        )
        return "".join(
            [
                ip_rules.get(rule)["ipv4"],
                str(host_ipv4 % 101),
                ".",
                str(host_ipv4_octet_count % 256),
            ]
        )
    elif rule == "dvc":
        global dvc_ipv4, dvc_ipv4_octet_count
        dvc_ipv4 += 1
        dvc_ipv4_octet_count += 1
        LOGGER.debug(
            "Creating ipv4 field with value: {}".format(
                "".join(
                    [
                        ip_rules.get(rule)["ipv4"],
                        str(dvc_ipv4 % 51),
                        ".",
                        str(dvc_ipv4_octet_count % 256),
                    ]
                )
            )
        )
        return "".join(
            [
                ip_rules.get(rule)["ipv4"],
                str(dvc_ipv4 % 51),
                ".",
                str(dvc_ipv4_octet_count % 256),
            ]
        )
    elif rule == "dest":
        global dest_ipv4
        dest_ipv4 += 1
        addr = [int(dest_ipv4 / 256) % 256, dest_ipv4 % 256]
        LOGGER.debug(
            "Creating ipv4 field with value: {}".format(
                "".join(
                    [ip_rules.get(rule)["ipv4"], str(addr[0]), ".", str(addr[1])]
                )
            )
        )
        return "".join(
            [ip_rules.get(rule)["ipv4"], str(addr[0]), ".", str(addr[1])]
        )
    elif rule == "url":
        global url_ip_count
        url_ip_count += 1
        addr = [int(url_ip_count / 256) % 256, url_ip_count % 256]
        LOGGER.debug(
            "Creating ipv4 field with value: {}".format(
                "".join(
                    [ip_rules.get(rule)["ip_host"], str(addr[0]), ".", str(addr[1])]
                )
            )
        )
        return "".join(
            [ip_rules.get(rule)["ip_host"], str(addr[0]), ".", str(addr[1])]
        )
    else:
        temp_ipv4 = Faker().ipv4()
        LOGGER.debug("Creating ipv4 field with value: {}".format(temp_ipv4))
        return temp_ipv4
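
The src and dest branches above map a monotonically increasing counter onto the last two octets of a fixed prefix taken from ip_rules. A minimal standalone sketch of that counter arithmetic, using an illustrative "10.1." prefix rather than the library's actual ip_rules values:

def sequential_ipv4(counter, prefix="10.1."):
    # Same arithmetic as the src/dest branches: the counter fills the
    # last two octets, each wrapping at 256.
    third_octet = (counter // 256) % 256
    fourth_octet = counter % 256
    return "{}{}.{}".format(prefix, third_octet, fourth_octet)

print(sequential_ipv4(300))  # 10.1.1.44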

get_ipv6(rule)

Returns Ipv6 Address as per the rule.

Parameters:

Name Type Description Default
rule str

Type of rule either src, host, dest, dvc.

required
Source code in pytest_splunk_addon/sample_generation/sample_event.py
def get_ipv6(self, rule):
    """
    Returns Ipv6 Address as per the rule.

    Args:
        rule (str): Type of rule either src, host, dest, dvc.
        If the value is not one of the key field it will return a randomly generated Ipv6 address.
    """
    if rule == "src":
        global src_ipv6
        ipv6 = src_ipv6 % (int("ffffffffffffffff", 16))
        src_ipv6 += 1
    elif rule == "host":
        global host_ipv6
        ipv6 = host_ipv6 % (int("ffffffffffffffff", 16))
        host_ipv6 += 1
    elif rule == "dvc":
        global dvc_ipv6
        ipv6 = dvc_ipv6 % (int("ffffffffffffffff", 16))
        dvc_ipv6 += 1
    elif rule == "dest":
        global dest_ipv6
        ipv6 = dest_ipv6 % (int("ffffffffffffffff", 16))
        dest_ipv6 += 1
    else:
        temp_ipv4 = Faker().ipv6()
        LOGGER.debug("Creating ipv6 field with value: {}".format(temp_ipv4))
        return temp_ipv4

    hex_count = hex(ipv6)
    non_zero_cnt = len(hex_count[2:])
    addr = "{}{}".format("0" * (16 - non_zero_cnt), hex_count[2:])
    LOGGER.debug(
        "Creating ipv6 field with value: {}:{}".format(
            ip_rules.get(rule)["ipv6"],
            ":".join(addr[i : i + 4] for i in range(0, len(addr), 4)),
        )
    )
    return "{}:{}".format(
        ip_rules.get(rule)["ipv6"],
        ":".join(addr[i : i + 4] for i in range(0, len(addr), 4)),
    )
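
The tail of the address is produced by zero-padding the counter to 16 hexadecimal digits and joining it in 4-character groups. A small sketch of just that formatting step (the prefix supplied by ip_rules is omitted here):

def ipv6_suffix(counter):
    # Keep the counter within 64 bits, as the branches above do.
    counter %= int("ffffffffffffffff", 16)
    hex_str = hex(counter)[2:]          # strip the "0x" prefix
    addr = hex_str.zfill(16)            # pad to 16 hex digits
    return ":".join(addr[i : i + 4] for i in range(0, len(addr), 4))

print(ipv6_suffix(1))  # 0000:0000:0000:0001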

get_key_fields()

Returns the key field value from event

Source code in pytest_splunk_addon/sample_generation/sample_event.py
def get_key_fields(self):
    """
    Returns the key field value from event
    """
    return self.key_fields

get_token_count(token)

Returns the token count in event

Parameters:

Name Type Description Default
token str

Token name

required
Source code in pytest_splunk_addon/sample_generation/sample_event.py
def get_token_count(self, token):
    """
    Returns the token count in event

    Args:
        token (str): Token name
    """
    return len(re.findall(token, self.event, flags=re.MULTILINE))

get_token_extractions_count(token)

Returns the minimum occurrence count (1) if the token is not found in the event but is present in the extracted fields

Parameters:

Name Type Description Default
token str

Token name

required
Source code in pytest_splunk_addon/sample_generation/sample_event.py
def get_token_extractions_count(self, token):
    """
    Returns minimum number of occurrence count if token not found in event but is in extracted fields

    Args:
        token (str): Token name
    """
    tokens_in_extractions = 0
    if (
        self.requirement_test_data is not None
        and "cim_fields" in self.requirement_test_data.keys()
    ):
        for extracted_field in self.requirement_test_data["cim_fields"].values():
            if isinstance(extracted_field, str):
                tokens_in_extractions += len(re.findall(token, extracted_field))
            elif isinstance(extracted_field, list):
                for each_field in extracted_field:
                    tokens_in_extractions += len(re.findall(token, each_field))
    return 1 if tokens_in_extractions > 0 else 0
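
Both counters are thin wrappers around re.findall: get_token_count counts literal matches of the token inside the raw event, while get_token_extractions_count only checks whether the token appears anywhere in the cim_fields values of the requirement test data and caps the result at 1. A hedged sketch with made-up data:

import re

# Hypothetical event and requirement data, for illustration only.
event = "user=##USER## action=login src=##SRC##"
cim_fields = {"user": "##USER##", "src": ["##SRC##", "10.0.0.1"]}
token = "##USER##"

token_count = len(re.findall(token, event, flags=re.MULTILINE))    # 1

tokens_in_extractions = 0
for value in cim_fields.values():
    values = value if isinstance(value, list) else [value]
    tokens_in_extractions += sum(len(re.findall(token, v)) for v in values)
extractions_count = 1 if tokens_in_extractions > 0 else 0           # 1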

register_field_value(field, token_values)

Registers the value for the key fields in its SampleEvent object

Parameters:

Name Type Description Default
field str

Token field name

required
token_values list / str

Token value(s) which are replaced in the key fields

required
Source code in pytest_splunk_addon/sample_generation/sample_event.py
def register_field_value(self, field, token_values):
    """
    Registers the value for the key fields in its SampleEvent object

    Args:
        field (str): Token field name
        token_values (list/str): Token value(s) which are replaced in the key fields
    """
    if field == "_time":
        time_list = (
            token_values if isinstance(token_values, list) else [token_values]
        )
        self.time_values.extend([i.key for i in time_list])
    elif field in key_fields.KEY_FIELDS:
        if isinstance(token_values, list):
            for token_value in token_values:
                self.key_fields.setdefault(field, []).append(str(token_value.key))
        else:
            self.key_fields.setdefault(field, []).append(str(token_values.key))
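
The token values handed to this method are small objects exposing a .key attribute; for key fields the stringified keys are accumulated per field name, while _time values are collected into time_values. A minimal sketch of the key-field bookkeeping, using a hypothetical stand-in for the token value object:

from collections import namedtuple

# Hypothetical stand-in for the rule token value objects used by the library.
TokenValue = namedtuple("TokenValue", ["key", "value"])

key_fields = {}
field = "src"
token_values = [TokenValue("10.0.0.5", "10.0.0.5"), TokenValue("10.0.0.6", "10.0.0.6")]

for token_value in token_values:
    key_fields.setdefault(field, []).append(str(token_value.key))

print(key_fields)  # {'src': ['10.0.0.5', '10.0.0.6']}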

replace_token(token, token_values)

Replaces the token value in event

Parameters:

Name Type Description Default
token str

Token name

required
token_values list / str

Value(s) to be replaced in the token

required
Source code in pytest_splunk_addon/sample_generation/sample_event.py
def replace_token(self, token, token_values):
    """
    Replaces the token value in event

    Args:
        token (str): Token name
        token_values (list/str): Value(s) to be replaced in the token
    """
    # TODO: How to handle dependent Values with list of token_values
    if isinstance(token_values, list):
        sample_tokens = re.finditer(token, self.event, flags=re.MULTILINE)

        for _, token_value in enumerate(token_values):
            token_value = token_value.value
            match_object = next(sample_tokens)
            match_str = (
                match_object.group(0)
                if len(match_object.groups()) == 0
                else match_object.group(1)
            )
            match_str = re.escape(match_str)
            self.event = re.sub(
                match_str,
                lambda x: str(token_value),
                self.event,
                1,
                flags=re.MULTILINE,
            )
    else:
        self.event = re.sub(
            token, lambda x: str(token_values), self.event, flags=re.MULTILINE
        )
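
When a list of values is supplied, each regex match of the token in the event is replaced with the next value in turn; a single value is substituted into every occurrence at once. A small sketch of both branches on a made-up event:

import re

# Single value: every occurrence of the token is replaced with it.
event = "src=##IP## dest=##IP##"
event = re.sub("##IP##", lambda _: "192.0.2.7", event, flags=re.MULTILINE)
# -> "src=192.0.2.7 dest=192.0.2.7"

# List of values: each match is consumed and replaced one at a time.
event = "src=##IP## dest=##IP##"
matches = re.finditer("##IP##", event, flags=re.MULTILINE)
for value in ["10.0.0.1", "10.0.0.2"]:
    match_str = re.escape(next(matches).group(0))
    event = re.sub(match_str, lambda _: value, event, count=1, flags=re.MULTILINE)
# -> "src=10.0.0.1 dest=10.0.0.2"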

update(new_event)

This method updates the event content

Parameters:

Name Type Description Default
new_event str

Event content

required
Source code in pytest_splunk_addon/sample_generation/sample_event.py
def update(self, new_event):
    """
    This method updates the event content

    Args:
        new_event (str): Event content
    """
    LOGGER.debug("Updated the event {} with {}".format(self.event, new_event))
    self.event = new_event

update_metadata(event, metadata, key_fields)

Processes the syslog formatted samples. Format::

'***SPLUNK*** source=<source> sourcetype=<sourcetype>             field_1       field2        field3             ##value1##    ##value2##   ##value3##'

Parameters:

Name Type Description Default
event str

event string containing raw syslog data

required
metadata dict

Contains metadata for the event

required

Returns:

Type Description

Syslog event, the updated metadata, and the key fields

Source code in pytest_splunk_addon/sample_generation/sample_event.py
def update_metadata(self, event, metadata, key_fields):
    """
    Processes the syslog formatted samples
    Format::

        '***SPLUNK*** source=<source> sourcetype=<sourcetype> \
        field_1       field2        field3 \
        ##value1##    ##value2##   ##value3##'

    Args:
        event (str): event string containing raw syslog data
        metadata (dict): Contains metadata for the event

    Returns:
        Syslog event and the updated metadata
    """
    try:
        if isinstance(event, str) and event.startswith("***SPLUNK***"):
            header, event = event.split("\n", 1)

            for meta_field in re.findall(r"[\w]+=[^\s]+", header):
                field, value = meta_field.split("=")
                if field == "host":
                    metadata[field] = f"host_{metadata[field]}"
                    key_fields["host"] = list([metadata["host"]])
                else:
                    metadata[field] = value

        return event, metadata, key_fields

    except KeyError as error:
        LOGGER.error(f"Unexpected data found. Error: {error}")
        raise error
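
For samples routed through syslog, the first line carries a ***SPLUNK*** header whose key=value pairs override the stanza metadata; the host value additionally receives a host_ prefix and is recorded as a key field. A hedged sketch of that header parsing with illustrative values:

import re

raw = (
    "***SPLUNK*** source=syslog sourcetype=example:syslog host=sample1\n"
    "Jan  1 00:00:00 sample1 sshd[42]: Accepted password for admin"
)
metadata, key_fields = {"host": "sample1"}, {}

if raw.startswith("***SPLUNK***"):
    header, event = raw.split("\n", 1)
    for meta_field in re.findall(r"[\w]+=[^\s]+", header):
        field, value = meta_field.split("=")
        if field == "host":
            metadata["host"] = "host_{}".format(metadata["host"])
            key_fields["host"] = [metadata["host"]]
        else:
            metadata[field] = value

print(metadata["host"], metadata["sourcetype"])  # host_sample1 example:syslog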

Rule

Provides Rules for all possible replacements for tokens.

DestPortRule

Bases: Rule

DestPortRule

Source code in pytest_splunk_addon/sample_generation/rule.py
class DestPortRule(Rule):
    """
    DestPortRule
    """

    def replace(self, sample, token_count):
        """
        Yields a random port value from [80, 443, 25, 22, 21]

        Args:
            sample (SampleEvent): Instance containing event info
            token_count (int): No. of token in sample event where rule is applicable
        """
        DEST_PORT = [80, 443, 25, 22, 21]
        for _ in range(token_count):
            yield self.token_value(*([choice(DEST_PORT)] * 2))

replace(sample, token_count)

Yields a random port value from [80, 443, 25, 22, 21]

Parameters:

Name Type Description Default
sample SampleEvent

Instance containing event info

required
token_count int

No. of token in sample event where rule is applicable

required
Source code in pytest_splunk_addon/sample_generation/rule.py
def replace(self, sample, token_count):
    """
    Yields a random port value from [80, 443, 25, 22, 21]

    Args:
        sample (SampleEvent): Instance containing event info
        token_count (int): No. of token in sample event where rule is applicable
    """
    DEST_PORT = [80, 443, 25, 22, 21]
    for _ in range(token_count):
        yield self.token_value(*([choice(DEST_PORT)] * 2))
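
Every Rule subclass follows the same contract: replace() is a generator that yields one token_value pair per token occurrence, and the *([x] * 2) idiom simply passes the same value as both the key and the display value. A sketch of that pattern with a hypothetical token_value container:

from collections import namedtuple
from random import choice

# Hypothetical stand-in for the library's token_value container.
token_value = namedtuple("token_value", ["key", "value"])

def dest_port_values(token_count):
    DEST_PORT = [80, 443, 25, 22, 21]
    for _ in range(token_count):
        port = choice(DEST_PORT)
        yield token_value(*([port] * 2))   # expands to token_value(port, port)

print([tv.value for tv in dest_port_values(3)])  # e.g. [443, 22, 80]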

DestRule

Bases: Rule

DestRule

Source code in pytest_splunk_addon/sample_generation/rule.py
class DestRule(Rule):
    """
    DestRule
    """

    def replace(self, sample, token_count):
        """
        Yields a random dest replacement value from the list
        of values mentioned in token.
        Possible values: ["host", "ipv4", "ipv6", "fqdn"]

        Args:
            sample (SampleEvent): Instance containing event info
            token_count (int): No. of token in sample event where rule is applicable
        """
        value_match = re.match(r"[dD]est(\[.*?\])", self.replacement)
        if value_match:
            value_list_str = value_match.group(1)
            value_list = eval(value_list_str)

            for _ in range(token_count):
                csv_row = self.get_rule_replacement_values(
                    sample, value_list, rule="dest"
                )
                if csv_row:
                    yield self.token_value(*([choice(csv_row)] * 2))
                else:
                    raise_warning(
                        "Invalid Value: '{}' in stanza '{}'.\n Accepted values: ['host','ipv4','ipv6','fqdn']".format(
                            self.replacement, sample.sample_name
                        )
                    )
        else:
            raise_warning(
                "Non-supported format: '{}' in stanza '{}'.\n Try  dest['host','ipv4','ipv6','fqdn']".format(
                    self.replacement, sample.sample_name
                )
            )

replace(sample, token_count)

Yields a random dest replacement value from the list of values mentioned in token. Possible values: ["host", "ipv4", "ipv6", "fqdn"]

Parameters:

Name Type Description Default
sample SampleEvent

Instance containing event info

required
token_count int

No. of token in sample event where rule is applicable

required
Source code in pytest_splunk_addon/sample_generation/rule.py
def replace(self, sample, token_count):
    """
    Yields a random dest replacement value from the list
    of values mentioned in token.
    Possible values: ["host", "ipv4", "ipv6", "fqdn"]

    Args:
        sample (SampleEvent): Instance containing event info
        token_count (int): No. of token in sample event where rule is applicable
    """
    value_match = re.match(r"[dD]est(\[.*?\])", self.replacement)
    if value_match:
        value_list_str = value_match.group(1)
        value_list = eval(value_list_str)

        for _ in range(token_count):
            csv_row = self.get_rule_replacement_values(
                sample, value_list, rule="dest"
            )
            if csv_row:
                yield self.token_value(*([choice(csv_row)] * 2))
            else:
                raise_warning(
                    "Invalid Value: '{}' in stanza '{}'.\n Accepted values: ['host','ipv4','ipv6','fqdn']".format(
                        self.replacement, sample.sample_name
                    )
                )
    else:
        raise_warning(
            "Non-supported format: '{}' in stanza '{}'.\n Try  dest['host','ipv4','ipv6','fqdn']".format(
                self.replacement, sample.sample_name
            )
        )
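
The replacement string itself is expected to look like dest['host','ipv4',...]: the regex captures the bracketed list, which the library then eval()s into a Python list before picking a value per token. A sketch of that parsing step on an illustrative replacement (ast.literal_eval is used here as a safer equivalent of eval):

import ast
import re

replacement = "dest['ipv4', 'fqdn']"   # illustrative token replacement

value_match = re.match(r"[dD]est(\[.*?\])", replacement)
if value_match:
    value_list = ast.literal_eval(value_match.group(1))
    print(value_list)  # ['ipv4', 'fqdn']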

DvcRule

Bases: Rule

DvcRule

Source code in pytest_splunk_addon/sample_generation/rule.py
class DvcRule(Rule):
    """
    DvcRule
    """

    def replace(self, sample, token_count):
        """
        Yields a random dvc replacement value from the list
        of values mentioned in token.
        Possible values: ["host", "ipv4", "ipv6", "fqdn"]

        Args:
            sample (SampleEvent): Instance containing event info
            token_count (int): No. of token in sample event where rule is applicable
        """
        value_match = re.match(r"[dD]vc(\[.*?\])", self.replacement)
        if value_match:
            value_list_str = value_match.group(1)
            value_list = eval(value_list_str)
            for _ in range(token_count):
                csv_row = self.get_rule_replacement_values(
                    sample, value_list, rule="dvc"
                )
                if csv_row:
                    yield self.token_value(*([choice(csv_row)] * 2))
                else:
                    raise_warning(
                        "Invalid Value: '{}' in stanza '{}'.\n Accepted values: ['host','ipv4','ipv6','fqdn']".format(
                            self.replacement, sample.sample_name
                        )
                    )
        else:
            raise_warning(
                "Non-supported format: '{}' in stanza '{}'.\n Try  dvc['host','ipv4','ipv6','fqdn']".format(
                    self.replacement, sample.sample_name
                )
            )

replace(sample, token_count)

Yields a random dvc replacement value from the list of values mentioned in token. Possible values: ["host", "ipv4", "ipv6", "fqdn"]

Parameters:

Name Type Description Default
sample SampleEvent

Instance containing event info

required
token_count int

No. of token in sample event where rule is applicable

required
Source code in pytest_splunk_addon/sample_generation/rule.py
def replace(self, sample, token_count):
    """
    Yields a random dvc replacement value from the list
    of values mentioned in token.
    Possible values: ["host", "ipv4", "ipv6", "fqdn"]

    Args:
        sample (SampleEvent): Instance containing event info
        token_count (int): No. of token in sample event where rule is applicable
    """
    value_match = re.match(r"[dD]vc(\[.*?\])", self.replacement)
    if value_match:
        value_list_str = value_match.group(1)
        value_list = eval(value_list_str)
        for _ in range(token_count):
            csv_row = self.get_rule_replacement_values(
                sample, value_list, rule="dvc"
            )
            if csv_row:
                yield self.token_value(*([choice(csv_row)] * 2))
            else:
                raise_warning(
                    "Invalid Value: '{}' in stanza '{}'.\n Accepted values: ['host','ipv4','ipv6','fqdn']".format(
                        self.replacement, sample.sample_name
                    )
                )
    else:
        raise_warning(
            "Non-supported format: '{}' in stanza '{}'.\n Try  dvc['host','ipv4','ipv6','fqdn']".format(
                self.replacement, sample.sample_name
            )
        )

EmailRule

Bases: Rule

EmailRule

Source code in pytest_splunk_addon/sample_generation/rule.py
class EmailRule(Rule):
    """
    EmailRule
    """

    def replace(self, sample, token_count):
        """
        Yields a random email from lookups\\user_email.csv file.

        Args:
            sample (SampleEvent): Instance containing event info
            token_count (int): No. of token in sample event where rule is applicable
        """

        for i in range(token_count):
            if (
                hasattr(sample, "replacement_map")
                and "user" in sample.replacement_map
                and i < len(sample.replacement_map["user"])
            ):
                csv_rows = sample.replacement_map["user"]
                yield self.token_value(
                    *([csv_rows[i][self.user_header.index("email")]] * 2)
                )
            else:
                index_list, csv_row = self.get_lookup_value(
                    sample,
                    "email",
                    self.user_header,
                    ["email"],
                )
                yield self.token_value(
                    *([csv_row[self.user_header.index("email")]] * 2)
                )

replace(sample, token_count)

Yields a random email from lookups\user_email.csv file.

Parameters:

Name Type Description Default
sample SampleEvent

Instance containing event info

required
token_count int

No. of token in sample event where rule is applicable

required
Source code in pytest_splunk_addon/sample_generation/rule.py
def replace(self, sample, token_count):
    """
    Yields a random email from lookups\\user_email.csv file.

    Args:
        sample (SampleEvent): Instance containing event info
        token_count (int): No. of token in sample event where rule is applicable
    """

    for i in range(token_count):
        if (
            hasattr(sample, "replacement_map")
            and "user" in sample.replacement_map
            and i < len(sample.replacement_map["user"])
        ):
            csv_rows = sample.replacement_map["user"]
            yield self.token_value(
                *([csv_rows[i][self.user_header.index("email")]] * 2)
            )
        else:
            index_list, csv_row = self.get_lookup_value(
                sample,
                "email",
                self.user_header,
                ["email"],
            )
            yield self.token_value(
                *([csv_row[self.user_header.index("email")]] * 2)
            )

FileRule

Bases: Rule

FileRule

Source code in pytest_splunk_addon/sample_generation/rule.py
class FileRule(Rule):
    """
    FileRule
    """

    every_replacement_types = []

    def replace(self, sample, token_count):
        """
        Yields the values of token by reading files.

        Args:
            sample (SampleEvent): Instance containing event info
            token_count (int): No. of token in sample event where rule is applicable
        """
        relative_file_path, index = self.get_file_path()

        if index:
            try:
                index = int(index)
                for i in self.indexed_sample_file(
                    sample, relative_file_path, index, token_count
                ):
                    yield self.token_value(*([i] * 2))

            except ValueError:
                for i in self.lookupfile(
                    sample, relative_file_path, index, token_count
                ):
                    yield self.token_value(*([i] * 2))

        else:
            try:
                with open(relative_file_path) as f:
                    txt = f.read()
                    lines = [each.strip() for each in txt.split("\n") if each]
                    if (
                        self.replacement_type == "random"
                        or self.replacement_type == "file"
                    ):
                        for _ in range(token_count):
                            yield self.token_value(*([choice(lines)] * 2))
                    elif self.replacement_type == "all":
                        for each_value in lines:
                            yield self.token_value(*([each_value] * 2))
            except IOError:
                LOGGER.warning("File not found : {}".format(relative_file_path))

    def get_file_path(self):
        """
        Returns the relative sample file path and index value
        """

        if self.replacement.startswith(("file", "File")):
            sample_file_path = re.match(r"[fF]ile\[(.*?)\]", self.replacement).group(1)
        else:
            sample_file_path = self.replacement

        sample_file_path = sample_file_path.replace("/", os.sep)
        relative_file_path = self.sample_path.split(f"{os.sep}samples")[0]
        try:
            # get the relative_file_path and index value from filepath
            # mentioned in the token if the filepath matches the pattern
            # pattern like: <directory_path>/apps/<addon_name>/<file_path> or
            # pattern like:
            # <directory_path>/apps/<addon_name>/<file_path>:<index>
            _, splitter, file_path = re.search(
                r"(.*)(\\?\/?apps\\?\/?[a-zA-Z-_0-9.*]+\\?\/?)(.*)", sample_file_path
            ).groups()
            relative_file_path = os.path.join(
                relative_file_path, file_path.split(":")[0]
            )
            file_index = file_path.split(":")
            index = file_index[1] if len(file_index) > 1 else None

            if not os.path.isfile(relative_file_path):
                raise AttributeError

        except AttributeError:
            # get the relative_file_path and index value from filepath
            # mentioned in the token if the filepath matches the pattern
            # pattern like: <directory_path>/<file_path> or
            # pattern like: <directory_path>/<file_path>:<index>
            file_path = sample_file_path
            index = None
            if file_path.count(":") > 0:
                file_index = file_path.rsplit(":", 1)
                index = file_index[1] if len(file_index) > 1 else None
                file_path = file_path.rsplit(":", 1)[0]
            relative_file_path = file_path

        return relative_file_path, index

    def indexed_sample_file(self, sample, file_path, index, token_count):
        """
        Yields the column value of token by reading files.

        Args:
            sample (SampleEvent): Instance containing event info
            file_path (str): path of the file mentioned in token.
            index (int): index value mentioned in file_path i.e. <file_path>:<index>
            token_count (int): No. of token in sample event where rule is applicable
        """
        all_data = []
        try:
            with open(file_path, "r") as _file:
                selected_sample_lines = _file.readlines()
                for i in selected_sample_lines:
                    if i.strip() != "":
                        all_data.append(i.strip())

                if (
                    hasattr(sample, "replacement_map")
                    and file_path in sample.replacement_map
                ):
                    index = int(index)
                    file_values = sample.replacement_map[file_path]["data"][
                        self.file_count
                    ].split(",")
                    if sample.replacement_map[file_path].get("find_all"):
                        # if condition to increase the line no. of sample data
                        # when the replacement_type = all provided in token for indexed file
                        if self.file_count == len(all_data) - 1:
                            # reset the file count when count reaches to pick value corresponding to
                            # length of the sample data
                            self.file_count = 0
                        else:
                            self.file_count += 1
                    for _ in range(token_count):
                        yield file_values[index - 1]
                else:
                    if self.replacement_type == "all":
                        sample.__setattr__(
                            "replacement_map",
                            {file_path: {"data": all_data, "find_all": True}},
                        )
                        for i in all_data:
                            file_values = i.split(",")
                            yield file_values[index - 1]
                    else:
                        random_line = random.randint(0, len(all_data) - 1)
                        if hasattr(sample, "replacement_map"):
                            sample.replacement_map.update(
                                {file_path: {"data": [all_data[random_line]]}}
                            )
                        else:
                            sample.__setattr__(
                                "replacement_map",
                                {file_path: {"data": [all_data[random_line]]}},
                            )
                        file_values = all_data[random_line].split(",")
                        for _ in range(token_count):
                            yield file_values[index - 1]
        except IndexError:
            LOGGER.error(
                f"Index for column {index} in replacement "
                f"file {file_path} is out of bounds"
            )
        except IOError:
            LOGGER.warning("File not found : {}".format(file_path))

    def lookupfile(self, sample, file_path, index, token_count):
        """
        Yields the column value of token by reading files.

        Args:
            sample (SampleEvent): Instance containing event info
            file_path (str): path of the file mentioned in token.
            index (int): index value mentioned in file_path i.e. <file_path>:<index>
            token_count (int): No. of token in sample event where rule is applicable
        """
        all_data = []
        header = ""
        try:
            with open(file_path, "r") as _file:
                header = next(_file)
                for line in _file:
                    if line.strip() != "":
                        all_data.append(line.strip())
            for _ in range(token_count):
                if (
                    hasattr(sample, "replacement_map")
                    and file_path in sample.replacement_map
                ):
                    index = (
                        sample.replacement_map[file_path][0]
                        .strip()
                        .split(",")
                        .index(index)
                    )
                    file_values = sample.replacement_map[file_path][1].split(",")
                    for _ in range(token_count):
                        yield file_values[index]
                else:
                    if (
                        hasattr(sample, "replacement_map")
                        and file_path in sample.replacement_map
                    ):
                        sample.replacement_map[file_path].append(all_data)
                    else:
                        if (
                            self.replacement_type == "random"
                            or self.replacement_type == "file"
                        ):
                            self.file_count = random.randint(0, len(all_data) - 1)
                            sample.__setattr__(
                                "replacement_map",
                                {file_path: [header, all_data[self.file_count]]},
                            )
                            index = header.strip().split(",").index(index)
                            file_values = all_data[self.file_count].split(",")
                            for _ in range(token_count):
                                yield file_values[index]
                        else:
                            LOGGER.warning(
                                f"'replacement_type = {self.replacement_type}' is not supported for the lookup files. Please use 'random' or 'file'"
                            )
                            yield self.token
        except ValueError:
            LOGGER.error(
                "Column '%s' is not present in replacement file '%s'" % (index, file_path)
            )
        except IOError:
            LOGGER.warning("File not found : {}".format(file_path))

get_file_path()

Returns the relative sample file path and index value

Source code in pytest_splunk_addon/sample_generation/rule.py
def get_file_path(self):
    """
    Returns the relative sample file path and index value
    """

    if self.replacement.startswith(("file", "File")):
        sample_file_path = re.match(r"[fF]ile\[(.*?)\]", self.replacement).group(1)
    else:
        sample_file_path = self.replacement

    sample_file_path = sample_file_path.replace("/", os.sep)
    relative_file_path = self.sample_path.split(f"{os.sep}samples")[0]
    try:
        # get the relative_file_path and index value from filepath
        # mentioned in the token if the filepath matches the pattern
        # pattern like: <directory_path>/apps/<addon_name>/<file_path> or
        # pattern like:
        # <directory_path>/apps/<addon_name>/<file_path>:<index>
        _, splitter, file_path = re.search(
            r"(.*)(\\?\/?apps\\?\/?[a-zA-Z-_0-9.*]+\\?\/?)(.*)", sample_file_path
        ).groups()
        relative_file_path = os.path.join(
            relative_file_path, file_path.split(":")[0]
        )
        file_index = file_path.split(":")
        index = file_index[1] if len(file_index) > 1 else None

        if not os.path.isfile(relative_file_path):
            raise AttributeError

    except AttributeError:
        # get the relative_file_path and index value from filepath
        # mentioned in the token if the filepath matches the pattern
        # pattern like: <directory_path>/<file_path> or
        # pattern like: <directory_path>/<file_path>:<index>
        file_path = sample_file_path
        index = None
        if file_path.count(":") > 0:
            file_index = file_path.rsplit(":", 1)
            index = file_index[1] if len(file_index) > 1 else None
            file_path = file_path.rsplit(":", 1)[0]
        relative_file_path = file_path

    return relative_file_path, index
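
In short, the replacement either names a file directly or wraps it in file[...], and an optional :<index> suffix selects a column or lookup header. A simplified sketch of the suffix handling on illustrative replacements (the real method additionally resolves paths relative to the samples directory and the apps/<addon_name> layout):

import re

for replacement in (
    "file[/tmp/addon/samples/users.csv:2]",
    "/tmp/addon/samples/hosts.sample",
):
    if replacement.startswith(("file", "File")):
        sample_file_path = re.match(r"[fF]ile\[(.*?)\]", replacement).group(1)
    else:
        sample_file_path = replacement

    # Split off an optional trailing :<index>
    file_path, index = sample_file_path, None
    if ":" in sample_file_path:
        file_path, index = sample_file_path.rsplit(":", 1)

    print(file_path, index)
# /tmp/addon/samples/users.csv 2
# /tmp/addon/samples/hosts.sample None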

indexed_sample_file(sample, file_path, index, token_count)

Yields the column value of token by reading files.

Parameters:

Name Type Description Default
sample SampleEvent

Instance containing event info

required
file_path str

path of the file mentioned in token.

required
index int

index value mentioned in file_path i.e. <file_path>:<index>

required
token_count int

No. of token in sample event where rule is applicable

required
Source code in pytest_splunk_addon/sample_generation/rule.py
def indexed_sample_file(self, sample, file_path, index, token_count):
    """
    Yields the column value of token by reading files.

    Args:
        sample (SampleEvent): Instance containing event info
        file_path (str): path of the file mentioned in token.
        index (int): index value mentioned in file_path i.e. <file_path>:<index>
        token_count (int): No. of token in sample event where rule is applicable
    """
    all_data = []
    try:
        with open(file_path, "r") as _file:
            selected_sample_lines = _file.readlines()
            for i in selected_sample_lines:
                if i.strip() != "":
                    all_data.append(i.strip())

            if (
                hasattr(sample, "replacement_map")
                and file_path in sample.replacement_map
            ):
                index = int(index)
                file_values = sample.replacement_map[file_path]["data"][
                    self.file_count
                ].split(",")
                if sample.replacement_map[file_path].get("find_all"):
                    # if condition to increase the line no. of sample data
                    # when the replacement_type = all provided in token for indexed file
                    if self.file_count == len(all_data) - 1:
                        # reset the file count when count reaches to pick value corresponding to
                        # length of the sample data
                        self.file_count = 0
                    else:
                        self.file_count += 1
                for _ in range(token_count):
                    yield file_values[index - 1]
            else:
                if self.replacement_type == "all":
                    sample.__setattr__(
                        "replacement_map",
                        {file_path: {"data": all_data, "find_all": True}},
                    )
                    for i in all_data:
                        file_values = i.split(",")
                        yield file_values[index - 1]
                else:
                    random_line = random.randint(0, len(all_data) - 1)
                    if hasattr(sample, "replacement_map"):
                        sample.replacement_map.update(
                            {file_path: {"data": [all_data[random_line]]}}
                        )
                    else:
                        sample.__setattr__(
                            "replacement_map",
                            {file_path: {"data": [all_data[random_line]]}},
                        )
                    file_values = all_data[random_line].split(",")
                    for _ in range(token_count):
                        yield file_values[index - 1]
    except IndexError:
        LOGGER.error(
            f"Index for column {index} in replacement "
            f"file {file_path} is out of bounds"
        )
    except IOError:
        LOGGER.warning("File not found : {}".format(file_path))

lookupfile(sample, file_path, index, token_count)

Yields the column value of token by reading files.

Parameters:

Name Type Description Default
sample SampleEvent

Instance containing event info

required
file_path str

path of the file mentioned in token.

required
index int

index value mentioned in file_path i.e. <file_path>:<index>

required
token_count int

No. of token in sample event where rule is applicable

required
Source code in pytest_splunk_addon/sample_generation/rule.py
def lookupfile(self, sample, file_path, index, token_count):
    """
    Yields the column value of token by reading files.

    Args:
        sample (SampleEvent): Instance containing event info
        file_path (str): path of the file mentioned in token.
        index (int): index value mentioned in file_path i.e. <file_path>:<index>
        token_count (int): No. of token in sample event where rule is applicable
    """
    all_data = []
    header = ""
    try:
        with open(file_path, "r") as _file:
            header = next(_file)
            for line in _file:
                if line.strip() != "":
                    all_data.append(line.strip())
        for _ in range(token_count):
            if (
                hasattr(sample, "replacement_map")
                and file_path in sample.replacement_map
            ):
                index = (
                    sample.replacement_map[file_path][0]
                    .strip()
                    .split(",")
                    .index(index)
                )
                file_values = sample.replacement_map[file_path][1].split(",")
                for _ in range(token_count):
                    yield file_values[index]
            else:
                if (
                    hasattr(sample, "replacement_map")
                    and file_path in sample.replacement_map
                ):
                    sample.replacement_map[file_path].append(all_data)
                else:
                    if (
                        self.replacement_type == "random"
                        or self.replacement_type == "file"
                    ):
                        self.file_count = random.randint(0, len(all_data) - 1)
                        sample.__setattr__(
                            "replacement_map",
                            {file_path: [header, all_data[self.file_count]]},
                        )
                        index = header.strip().split(",").index(index)
                        file_values = all_data[self.file_count].split(",")
                        for _ in range(token_count):
                            yield file_values[index]
                    else:
                        LOGGER.warning(
                            f"'replacement_type = {self.replacement_type}' is not supported for the lookup files. Please use 'random' or 'file'"
                        )
                        yield self.token
    except ValueError:
        LOGGER.error(
            "Column '%s' is not present in replacement file '%s'" % (index, file_path)
        )
    except IOError:
        LOGGER.warning("File not found : {}".format(file_path))

replace(sample, token_count)

Yields the values of token by reading files.

Parameters:

Name Type Description Default
sample SampleEvent

Instance containing event info

required
token_count int

No. of token in sample event where rule is applicable

required
Source code in pytest_splunk_addon/sample_generation/rule.py
def replace(self, sample, token_count):
    """
    Yields the values of token by reading files.

    Args:
        sample (SampleEvent): Instance containing event info
        token_count (int): No. of token in sample event where rule is applicable
    """
    relative_file_path, index = self.get_file_path()

    if index:
        try:
            index = int(index)
            for i in self.indexed_sample_file(
                sample, relative_file_path, index, token_count
            ):
                yield self.token_value(*([i] * 2))

        except ValueError:
            for i in self.lookupfile(
                sample, relative_file_path, index, token_count
            ):
                yield self.token_value(*([i] * 2))

    else:
        try:
            with open(relative_file_path) as f:
                txt = f.read()
                lines = [each.strip() for each in txt.split("\n") if each]
                if (
                    self.replacement_type == "random"
                    or self.replacement_type == "file"
                ):
                    for _ in range(token_count):
                        yield self.token_value(*([choice(lines)] * 2))
                elif self.replacement_type == "all":
                    for each_value in lines:
                        yield self.token_value(*([each_value] * 2))
        except IOError:
            LOGGER.warning("File not found : {}".format(relative_file_path))

FloatRule

Bases: Rule

FloatRule

Source code in pytest_splunk_addon/sample_generation/rule.py
class FloatRule(Rule):
    """
    FloatRule
    """

    def replace(self, sample, token_count):
        """
        Yields a random float no. between the range mentioned in token.

        Args:
            sample (SampleEvent): Instance containing event info
            token_count (int): No. of token in sample event where rule is applicable
        """
        float_match = re.match(r"[Ff]loat\[(-?[\d\.]+):(-?[\d\.]+)\]", self.replacement)
        if float_match:
            lower_limit, upper_limit = float_match.groups()
            precision = re.search("\[-?\d+\.?(\d*):", self.replacement).group(1)
            if not precision:
                precision = str(1)
            for _ in range(token_count):
                yield self.token_value(
                    *(
                        [
                            round(
                                uniform(float(lower_limit), float(upper_limit)),
                                len(precision),
                            )
                        ]
                        * 2
                    )
                )
        else:
            raise_warning(
                "Non-supported format: '{}' in stanza '{}'.\n i.e float[0.00:70.00]".format(
                    self.replacement, sample.sample_name
                )
            )

replace(sample, token_count)

Yields a random float no. between the range mentioned in token.

Parameters:

Name Type Description Default
sample SampleEvent

Instance containing event info

required
token_count int

No. of token in sample event where rule is applicable

required
Source code in pytest_splunk_addon/sample_generation/rule.py
def replace(self, sample, token_count):
    """
    Yields a random float no. between the range mentioned in token.

    Args:
        sample (SampleEvent): Instance containing event info
        token_count (int): No. of token in sample event where rule is applicable
    """
    float_match = re.match(r"[Ff]loat\[(-?[\d\.]+):(-?[\d\.]+)\]", self.replacement)
    if float_match:
        lower_limit, upper_limit = float_match.groups()
        precision = re.search("\[-?\d+\.?(\d*):", self.replacement).group(1)
        if not precision:
            precision = str(1)
        for _ in range(token_count):
            yield self.token_value(
                *(
                    [
                        round(
                            uniform(float(lower_limit), float(upper_limit)),
                            len(precision),
                        )
                    ]
                    * 2
                )
            )
    else:
        raise_warning(
            "Non-supported format: '{}' in stanza '{}'.\n i.e float[0.00:70.00]".format(
                self.replacement, sample.sample_name
            )
        )
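
Note that the rounding precision is not a separate parameter: it is taken from the number of decimal digits in the lower bound, so float[0.001:10.000] rounds to three places while float[1:10] falls back to one. A worked sketch of that derivation:

import re
from random import uniform

replacement = "float[0.001:10.000]"   # illustrative token replacement

lower, upper = re.match(
    r"[Ff]loat\[(-?[\d\.]+):(-?[\d\.]+)\]", replacement
).groups()
precision = re.search(r"\[-?\d+\.?(\d*):", replacement).group(1) or "1"

value = round(uniform(float(lower), float(upper)), len(precision))
print(len(precision), value)  # 3 and e.g. 4.728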

GuidRule

Bases: Rule

GuidRule

Source code in pytest_splunk_addon/sample_generation/rule.py
class GuidRule(Rule):
    """
    GuidRule
    """

    def replace(self, sample, token_count):
        """
        Yields a random guid.

        Args:
            sample (SampleEvent): Instance containing event info
            token_count (int): No. of token in sample event where rule is applicable
        """
        for _ in range(token_count):
            yield self.token_value(*([str(uuid.uuid4())] * 2))

replace(sample, token_count)

Yields a random guid.

Parameters:

Name Type Description Default
sample SampleEvent

Instance containing event info

required
token_count int

No. of token in sample event where rule is applicable

required
Source code in pytest_splunk_addon/sample_generation/rule.py
def replace(self, sample, token_count):
    """
    Yields a random guid.

    Args:
        sample (SampleEvent): Instance containing event info
        token_count (int): No. of token in sample event where rule is applicable
    """
    for _ in range(token_count):
        yield self.token_value(*([str(uuid.uuid4())] * 2))

HexRule

Bases: Rule

HexRule

Source code in pytest_splunk_addon/sample_generation/rule.py
class HexRule(Rule):
    """
    HexRule
    """

    def replace(self, sample, token_count):
        """
        Yields a random hex value.

        Args:
            sample (SampleEvent): Instance containing event info
            token_count (int): No. of token in sample event where rule is applicable
        """
        hex_match = re.match(r"[Hh]ex\((.*?)\)", self.replacement)
        if hex_match:
            hex_range = hex_match.group(1)
            if hex_range.isnumeric():
                hex_digits = [
                    "0",
                    "1",
                    "2",
                    "3",
                    "4",
                    "5",
                    "6",
                    "7",
                    "8",
                    "9",
                    "a",
                    "b",
                    "c",
                    "d",
                    "e",
                    "f",
                ]
                hex_array = []
                for _ in range(token_count):
                    for i in range(int(hex_range)):
                        hex_array.append(hex_digits[randint(0, 15)])
                    hex_value = "".join(hex_array)
                    yield self.token_value(*([hex_value] * 2))
            else:
                raise_warning(
                    "Invalid Value: '{}' in stanza '{}'.\n '{}' is not an integer value".format(
                        self.replacement, sample.sample_name, hex_range
                    )
                )
        else:
            raise_warning(
                "Invalid Hex value: '{}' in stanza '{}'. Try hex(<i>) where i is an integer".format(
                    self.replacement, sample.sample_name
                )
            )

replace(sample, token_count)

Yields a random hex value.

Parameters:

Name Type Description Default
sample SampleEvent

Instance containing event info

required
token_count int

No. of token in sample event where rule is applicable

required
Source code in pytest_splunk_addon/sample_generation/rule.py
def replace(self, sample, token_count):
    """
    Yields a random hex value.

    Args:
        sample (SampleEvent): Instance containing event info
        token_count (int): No. of token in sample event where rule is applicable
    """
    hex_match = re.match(r"[Hh]ex\((.*?)\)", self.replacement)
    if hex_match:
        hex_range = hex_match.group(1)
        if hex_range.isnumeric():
            hex_digits = [
                "0",
                "1",
                "2",
                "3",
                "4",
                "5",
                "6",
                "7",
                "8",
                "9",
                "a",
                "b",
                "c",
                "d",
                "e",
                "f",
            ]
            hex_array = []
            for _ in range(token_count):
                for i in range(int(hex_range)):
                    hex_array.append(hex_digits[randint(0, 15)])
                hex_value = "".join(hex_array)
                yield self.token_value(*([hex_value] * 2))
        else:
            raise_warning(
                "Invalid Value: '{}' in stanza '{}'.\n '{}' is not an integer value".format(
                    self.replacement, sample.sample_name, hex_range
                )
            )
    else:
        raise_warning(
            "Invalid Hex value: '{}' in stanza '{}'. Try hex(<i>) where i is an integer".format(
                self.replacement, sample.sample_name
            )
        )
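
The accepted form is hex(<i>), where <i> is the number of random hexadecimal digits to emit per token. A minimal sketch of a single value being generated:

import re
from random import randint

replacement = "hex(8)"   # illustrative token replacement

hex_match = re.match(r"[Hh]ex\((.*?)\)", replacement)
if hex_match and hex_match.group(1).isnumeric():
    length = int(hex_match.group(1))
    hex_value = "".join("0123456789abcdef"[randint(0, 15)] for _ in range(length))
    print(hex_value)  # e.g. 3fa91c07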

HostRule

Bases: Rule

HostRule

Source code in pytest_splunk_addon/sample_generation/rule.py
class HostRule(Rule):
    """
    HostRule
    """

    def replace(self, sample, token_count):
        """
        Yields a random host replacement value from the list
        of values mentioned in token.
        Possible values: ["host", "ipv4", "ipv6", "fqdn"]

        Args:
            sample (SampleEvent): Instance containing event info
            token_count (int): No. of token in sample event where rule is applicable
        """
        value_match = re.match(r"[hH]ost(\[.*?\])", self.replacement)
        if value_match:
            value_list_str = value_match.group(1)
            value_list = eval(value_list_str)
            for _ in range(token_count):
                csv_row = self.get_rule_replacement_values(
                    sample, value_list, rule="host"
                )
                if csv_row:
                    if "host" in value_list:
                        if sample.metadata.get("input_type") in [
                            "modinput",
                            "windows_input",
                            "syslog_tcp",
                            "syslog_udp",
                        ]:
                            csv_row[0] = sample.metadata.get("host")
                        elif sample.metadata.get("input_type") in [
                            "file_monitor",
                            "scripted_input",
                            "default",
                        ]:
                            csv_row[0] = sample.get_host()
                    yield self.token_value(*([choice(csv_row)] * 2))
                else:
                    raise_warning(
                        "Invalid Value: '{}' in stanza '{}'.\n Accepted values: ['host','ipv4','ipv6','fqdn']".format(
                            self.replacement, sample.sample_name
                        )
                    )
        else:
            raise_warning(
                "Non-supported format: '{}' in stanza '{}'.\n Try  host['host','ipv4','ipv6','fqdn']".format(
                    self.replacement, sample.sample_name
                )
            )

replace(sample, token_count)

Yields a random host replacement value from the list of values mentioned in token. Possible values: ["host", "ipv4", "ipv6", "fqdn"]

Parameters:

Name Type Description Default
sample SampleEvent

Instance containing event info

required
token_count int

No. of token in sample event where rule is applicable

required
Source code in pytest_splunk_addon/sample_generation/rule.py
def replace(self, sample, token_count):
    """
    Yields a random host replacement value from the list
    of values mentioned in token.
    Possible values: ["host", "ipv4", "ipv6", "fqdn"]

    Args:
        sample (SampleEvent): Instance containing event info
        token_count (int): No. of token in sample event where rule is applicable
    """
    value_match = re.match(r"[hH]ost(\[.*?\])", self.replacement)
    if value_match:
        value_list_str = value_match.group(1)
        value_list = eval(value_list_str)
        for _ in range(token_count):
            csv_row = self.get_rule_replacement_values(
                sample, value_list, rule="host"
            )
            if csv_row:
                if "host" in value_list:
                    if sample.metadata.get("input_type") in [
                        "modinput",
                        "windows_input",
                        "syslog_tcp",
                        "syslog_udp",
                    ]:
                        csv_row[0] = sample.metadata.get("host")
                    elif sample.metadata.get("input_type") in [
                        "file_monitor",
                        "scripted_input",
                        "default",
                    ]:
                        csv_row[0] = sample.get_host()
                yield self.token_value(*([choice(csv_row)] * 2))
            else:
                raise_warning(
                    "Invalid Value: '{}' in stanza '{}'.\n Accepted values: ['host','ipv4','ipv6','fqdn']".format(
                        self.replacement, sample.sample_name
                    )
                )
    else:
        raise_warning(
            "Non-supported format: '{}' in stanza '{}'.\n Try  host['host','ipv4','ipv6','fqdn']".format(
                self.replacement, sample.sample_name
            )
        )

IntRule

Bases: Rule

IntRule

Source code in pytest_splunk_addon/sample_generation/rule.py
class IntRule(Rule):
    """
    IntRule
    """

    def replace(self, sample, token_count):
        """
        Yields a random int between the range mentioned in token.

        Args:
            sample (SampleEvent): Instance containing event info
            token_count (int): No. of token in sample event where rule is applicable
        """
        limits_match = re.match(r"[Ii]nteger\[(-?\d+):(-?\d+)\]", self.replacement)
        if limits_match:
            lower_limit, upper_limit = limits_match.groups()
            if self.replacement_type == "random":
                for _ in range(token_count):
                    yield self.token_value(
                        *([randint(int(lower_limit), int(upper_limit))] * 2)
                    )
            else:
                for each_int in range(int(lower_limit), int(upper_limit)):
                    yield self.token_value(*([str(each_int)] * 2))
        else:
            raise_warning(
                "Non-supported format: '{}' in stanza '{}'.\n Try integer[0:10]".format(
                    self.replacement, sample.sample_name
                )
            )

replace(sample, token_count)

Yields a random int between the range mentioned in token.

Parameters:

Name Type Description Default
sample SampleEvent

Instance containing event info

required
token_count int

Number of tokens in the sample event to which the rule applies

required
Source code in pytest_splunk_addon/sample_generation/rule.py (lines 288-312)
def replace(self, sample, token_count):
    """
    Yields a random int between the range mentioned in token.

    Args:
        sample (SampleEvent): Instance containing event info
        token_count (int): No. of token in sample event where rule is applicable
    """
    limits_match = re.match(r"[Ii]nteger\[(-?\d+):(-?\d+)\]", self.replacement)
    if limits_match:
        lower_limit, upper_limit = limits_match.groups()
        if self.replacement_type == "random":
            for _ in range(token_count):
                yield self.token_value(
                    *([randint(int(lower_limit), int(upper_limit))] * 2)
                )
        else:
            for each_int in range(int(lower_limit), int(upper_limit)):
                yield self.token_value(*([str(each_int)] * 2))
    else:
        raise_warning(
            "Non-supported format: '{}' in stanza '{}'.\n Try integer[0:10]".format(
                self.replacement, sample.sample_name
            )
        )
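
A hedged sketch of integer token behaviour under both replacement types; the import path and the SimpleNamespace stub are assumptions for illustration. Note, as the code above shows, that the non-random branch iterates a half-open range (the upper bound is excluded), while the random branch uses randint, which includes it.

from types import SimpleNamespace
from pytest_splunk_addon.sample_generation.rule import IntRule

sample = SimpleNamespace(sample_name="sample.log")  # only sample_name is used (in warnings)

all_rule = IntRule({"token": "##count##", "replacement": "integer[1:5]", "replacementType": "all"})
print([tv.value for tv in all_rule.replace(sample, token_count=1)])
# one value per integer in [1, 5): ['1', '2', '3', '4']

random_rule = IntRule({"token": "##count##", "replacement": "integer[1:5]", "replacementType": "random"})
print([tv.value for tv in random_rule.replace(sample, token_count=3)])
# three random ints between 1 and 5 (randint is inclusive)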

Ipv4Rule

Bases: Rule

Ipv4Rule

Source code in pytest_splunk_addon/sample_generation/rule.py (lines 724-738)
class Ipv4Rule(Rule):
    """
    Ipv4Rule
    """

    def replace(self, sample, token_count):
        """
        Yields a random ipv4 address.

        Args:
            sample (SampleEvent): Instance containing event info
            token_count (int): No. of token in sample event where rule is applicable
        """
        for _ in range(token_count):
            yield self.token_value(*([self.fake.ipv4()] * 2))

replace(sample, token_count)

Yields a random ipv4 address.

Parameters:

Name Type Description Default
sample SampleEvent

Instance containing event info

required
token_count int

Number of tokens in the sample event to which the rule applies

required
Source code in pytest_splunk_addon/sample_generation/rule.py (lines 729-738)
def replace(self, sample, token_count):
    """
    Yields a random ipv4 address.

    Args:
        sample (SampleEvent): Instance containing event info
        token_count (int): No. of token in sample event where rule is applicable
    """
    for _ in range(token_count):
        yield self.token_value(*([self.fake.ipv4()] * 2))

Ipv6Rule

Bases: Rule

Ipv6Rule

Source code in pytest_splunk_addon/sample_generation/rule.py (lines 741-755)
class Ipv6Rule(Rule):
    """
    Ipv6Rule
    """

    def replace(self, sample, token_count):
        """
        Yields a random ipv6 address

        Args:
            sample (SampleEvent): Instance containing event info
            token_count (int): No. of token in sample event where rule is applicable
        """
        for _ in range(token_count):
            yield self.token_value(*([self.fake.ipv6()] * 2))

replace(sample, token_count)

Yields a random ipv6 address

Parameters:

Name Type Description Default
sample SampleEvent

Instance containing event info

required
token_count int

Number of tokens in the sample event to which the rule applies

required
Source code in pytest_splunk_addon/sample_generation/rule.py (lines 746-755)
def replace(self, sample, token_count):
    """
    Yields a random ipv6 address

    Args:
        sample (SampleEvent): Instance containing event info
        token_count (int): No. of token in sample event where rule is applicable
    """
    for _ in range(token_count):
        yield self.token_value(*([self.fake.ipv6()] * 2))

ListRule

Bases: Rule

ListRule

Source code in pytest_splunk_addon/sample_generation/rule.py (lines 354-383)
class ListRule(Rule):
    """
    ListRule
    """

    def replace(self, sample, token_count):
        """
        Yields a random value from the list mentioned in token.

        Args:
            sample (SampleEvent): Instance containing event info
            token_count (int): No. of token in sample event where rule is applicable
        """
        value_match = re.match(r"[lL]ist(\[.*?\])", self.replacement)
        if value_match:
            value_list_str = value_match.group(1)
            value_list = eval(value_list_str)

            if self.replacement_type == "random":
                for _ in range(token_count):
                    yield self.token_value(*([str(choice(value_list))] * 2))
            else:
                for each_value in value_list:
                    yield self.token_value(*([str(each_value)] * 2))
        else:
            raise_warning(
                "Non-supported format: '{}' in stanza '{}'.\n Try  list['value1','value2']".format(
                    self.replacement, sample.sample_name
                )
            )

replace(sample, token_count)

Yields a random value from the list mentioned in token.

Parameters:

Name Type Description Default
sample SampleEvent

Instance containing event info

required
token_count int

Number of tokens in the sample event to which the rule applies

required
Source code in pytest_splunk_addon/sample_generation/rule.py (lines 359-383)
def replace(self, sample, token_count):
    """
    Yields a random value from the list mentioned in token.

    Args:
        sample (SampleEvent): Instance containing event info
        token_count (int): No. of token in sample event where rule is applicable
    """
    value_match = re.match(r"[lL]ist(\[.*?\])", self.replacement)
    if value_match:
        value_list_str = value_match.group(1)
        value_list = eval(value_list_str)

        if self.replacement_type == "random":
            for _ in range(token_count):
                yield self.token_value(*([str(choice(value_list))] * 2))
        else:
            for each_value in value_list:
                yield self.token_value(*([str(each_value)] * 2))
    else:
        raise_warning(
            "Non-supported format: '{}' in stanza '{}'.\n Try  list['value1','value2']".format(
                self.replacement, sample.sample_name
            )
        )
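
A short sketch of a list token, assuming the import path above; the token name and values are illustrative.

from types import SimpleNamespace
from pytest_splunk_addon.sample_generation.rule import ListRule

sample = SimpleNamespace(sample_name="sample.log")  # only sample_name is used (in warnings)
token = {"token": "##severity##", "replacement": "list['low','medium','high']", "replacementType": "all"}
print([tv.value for tv in ListRule(token).replace(sample, token_count=1)])
# replacementType = "all" yields every listed value once: ['low', 'medium', 'high']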

MacRule

Bases: Rule

MacRule

Source code in pytest_splunk_addon/sample_generation/rule.py (lines 758-772)
class MacRule(Rule):
    """
    MacRule
    """

    def replace(self, sample, token_count):
        """
        Yields a random mac address

        Args:
            sample (SampleEvent): Instance containing event info
            token_count (int): No. of token in sample event where rule is applicable
        """
        for _ in range(token_count):
            yield self.token_value(*([self.fake.mac_address()] * 2))

replace(sample, token_count)

Yields a random mac address

Parameters:

Name Type Description Default
sample SampleEvent

Instance containing event info

required
token_count int

Number of tokens in the sample event to which the rule applies

required
Source code in pytest_splunk_addon/sample_generation/rule.py (lines 763-772)
def replace(self, sample, token_count):
    """
    Yields a random mac address

    Args:
        sample (SampleEvent): Instance containing event info
        token_count (int): No. of token in sample event where rule is applicable
    """
    for _ in range(token_count):
        yield self.token_value(*([self.fake.mac_address()] * 2))
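
Ipv4Rule, Ipv6Rule and MacRule all delegate to the Faker instance created in Rule.__init__. A small combined sketch, assuming the import path above; the token names are made up.

from types import SimpleNamespace
from pytest_splunk_addon.sample_generation.rule import Ipv4Rule, Ipv6Rule, MacRule

sample = SimpleNamespace(sample_name="sample.log")  # unused by these rules, kept for the signature
for rule_cls, name in [(Ipv4Rule, "src_ip"), (Ipv6Rule, "src_ipv6"), (MacRule, "src_mac")]:
    rule = rule_cls({"token": "##%s##" % name, "replacement": name, "replacementType": "random"})
    print(name, next(rule.replace(sample, token_count=1)).value)  # one Faker-generated value each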

Rule

Base class for all the rules.

Parameters:

Name Type Description Default
token dict

Dictionary containing token and its data

required
psa_data_params dict

PSA data stanzas dictionary

None
sample_path str

Path to the samples directory

None
Source code in pytest_splunk_addon/sample_generation/rule.py (lines 60-280)
class Rule:
    """
    Base class for all the rules.

    Args:
        token (dict): Dictionary containing token and its data
        psa_data_params (dict): PSA data stanzas dictionary
        sample_path (str): Path to the samples directory
    """

    user_header = ["name", "email", "domain_user", "distinquised_name"]
    src_header = ["host", "ipv4", "ipv6", "fqdn"]
    token_value = namedtuple("token_value", ["key", "value"])

    def __init__(self, token, psa_data_params=None, sample_path=None):
        self.token = token["token"]
        self.replacement = token["replacement"]
        self.replacement_type = token["replacementType"]
        self.field = token.get("field", self.token.strip("#"))
        self.psa_data_params = psa_data_params
        self.sample_path = sample_path
        self.fake = Faker()
        self.file_count = 0

    @classmethod
    def parse_rule(cls, token, psa_data_params, sample_path):
        """
        Returns appropriate Rule object as per replacement type of token.

        Args:
            token (dict): Dictionary containing token and its data
            psa_data_params (dict): PSA data stanzas dictionary
            sample_path (str): Path to the samples directory
        """
        rule_book = {
            "integer": IntRule,
            "list": ListRule,
            "ipv4": Ipv4Rule,
            "float": FloatRule,
            "ipv6": Ipv6Rule,
            "mac": MacRule,
            "file": FileRule,
            "url": UrlRule,
            "user": UserRule,
            "email": EmailRule,
            "host": HostRule,
            "hex": HexRule,
            "src_port": SrcPortRule,
            "dest_port": DestPortRule,
            "src": SrcRule,
            "dest": DestRule,
            "dvc": DvcRule,
            "guid": GuidRule,
        }
        rule_all_support = ["integer", "list", "file"]
        LOGGER.info(
            "The replacement type given is: '{}' for token:'{}'".format(
                token.get("replacementType"), token.get("token")
            )
        )
        if token.get("replacementType") not in [
            "static",
            "all",
            "random",
            "timestamp",
            "mvfile",
            "file",
        ]:
            raise_warning(
                "Invalid replacementType: '{}' for token:'{}' using 'random' as replacementType".format(
                    token.get("replacementType"), token.get("token")
                )
            )
            token["replacement"] = "random"
        replacement_type = token["replacementType"]
        replacement = token["replacement"]
        if replacement_type == "static":
            return StaticRule(token)
        elif replacement_type == "timestamp":
            return TimeRule(token, psa_data_params)
        elif replacement_type == "random" or replacement_type == "all":
            for each_rule in rule_book:
                if replacement.lower().startswith(each_rule):
                    if replacement_type == "all" and each_rule not in rule_all_support:
                        token["replacementType"] = "random"
                        LOGGER.warning(
                            "replacement_type=all is not supported for {} rule applied to {} token.".format(
                                each_rule, token.get("token")
                            )
                        )
                        warnings.warn(
                            UserWarning(
                                "replacement_type=all is not supported for {} rule applied to {} token.".format(
                                    each_rule, token.get("token")
                                )
                            )
                        )
                    return rule_book[each_rule](token, sample_path=sample_path)
        elif replacement_type == "file" or replacement_type == "mvfile":
            return FileRule(token, sample_path=sample_path)

    def apply(self, events):
        """
        Replaces the token with appropriate values as per rules mapped with the tokens in the event.
        For replacement_type = all it will generate an event for each replacement value.
        i.e. integer[1:50] => will generate 50 events

        Args:
            events (list): List of events(SampleEvent)
        """
        new_events = []
        for each_event in events:
            token_count = each_event.get_token_count(
                self.token
            ) or each_event.get_token_extractions_count(self.token)
            token_values = list(self.replace(each_event, token_count))
            if token_count > 0:
                if self.replacement_type == "all":
                    # NOTE: If replacement_type is all and same token is more than
                    #       one time in event then replace all tokens with same
                    #       value in that event
                    for each_token_value in token_values:
                        new_event = SampleEvent.copy(each_event)
                        global event_host_count
                        event_host_count += 1
                        host = (
                            each_event.metadata.get("host")
                            .replace("_", "-")
                            .replace(".", "-")
                        )
                        host_split = host.split("-")
                        if re.match("\d+", host_split[-1]):
                            host = "-".join(host_split[:-1])
                        new_event.metadata["host"] = "{}-{}".format(
                            host, event_host_count
                        )
                        new_event.metadata["id"] = "{}_{}".format(
                            each_event.sample_name,
                            event_host_count,
                        )
                        new_event.replace_token(self.token, each_token_value.value)
                        new_event.register_field_value(self.field, each_token_value)
                        new_event.update_requirement_test_field(
                            self.field, self.token, each_token_value
                        )
                        new_events.append(new_event)
                else:
                    each_event.replace_token(self.token, token_values)

                    if not (
                        each_event.metadata.get("timestamp_type") != "event"
                        and self.field == "_time"
                    ):
                        each_event.register_field_value(self.field, token_values)
                        each_event.update_requirement_test_field(
                            self.field, self.token, token_values
                        )
                    new_events.append(each_event)
            else:
                new_events.append(each_event)
        return new_events

    def get_lookup_value(self, sample, key, headers, value_list):
        """
        Common method to read csv and get a random row.

        Args:
            sample (SampleEvent): Instance containing event info
            key (str): fieldname i.e. host, src, user, dvc etc
            headers (list): Headers of csv file in list format
            value_list (list): list of replacement values mentioned in configuration file.

        Returns:
            index_list (list): list of mapped columns(int) as per value_list
            csv_row (list): list of replacement values for the rule.
        """
        csv_row = []
        global user_email_count
        user_email_count += 1
        name = "user{}".format(user_email_count)
        email = "user{}@email.com".format(user_email_count)
        domain_user = r"sample_domain.com\user{}".format(user_email_count)
        distinguished_name = "CN=user{}".format(user_email_count)
        csv_row.extend([name, email, domain_user, distinguished_name])
        index_list = [i for i, item in enumerate(headers) if item in value_list]
        if hasattr(sample, "replacement_map") and key in sample.replacement_map:
            sample.replacement_map[key].append(csv_row)
        else:
            sample.__setattr__("replacement_map", {key: [csv_row]})
        return index_list, csv_row

    def get_rule_replacement_values(self, sample, value_list, rule):
        """
        Common method for replacement values of
        SrcRule, Destrule, DvcRule, HostRule.

        Args:
            sample (SampleEvent): Instance containing event info
            value_list (list): list of replacement values mentioned in configuration file.
            rule (str): fieldname i.e. host, src, user, dvc etc

        Returns:
            index_list (list): list of mapped columns(int) as per value_list
            csv_row (list): list of replacement values for the rule.
        """
        csv_row = []
        for each in value_list:
            if each == "host":
                csv_row.append(sample.get_field_host(rule))
            elif each == "ipv4":
                csv_row.append(sample.get_ipv4(rule))
            elif each == "ipv6":
                csv_row.append(sample.get_ipv6(rule))
            elif each == "fqdn":
                csv_row.append(sample.get_field_fqdn(rule))
        return csv_row

    @staticmethod
    def clean_rules():
        global event_host_count
        event_host_count = 0
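
The token dictionary consumed by the constructor carries the keys visible in __init__; a sketch with illustrative values (only the "field" key is optional).

from pytest_splunk_addon.sample_generation.rule import Rule

token = {
    "token": "##dest_ip##",          # placeholder text as it appears in the sample file
    "replacement": "dest['ipv4']",   # what to substitute it with
    "replacementType": "random",     # static | timestamp | random | all | file | mvfile
    "field": "dest",                 # optional; defaults to the token name stripped of '#'
}
rule = Rule(token)
print(rule.field, rule.replacement_type)  # -> dest random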

apply(events)

Replaces the token with appropriate values as per the rules mapped to the tokens in the event. For replacement_type = all, an event is generated for each replacement value, e.g. integer[1:50] generates one event per value in the range.

Parameters:

Name Type Description Default
events list

List of events(SampleEvent)

required
Source code in pytest_splunk_addon/sample_generation/rule.py (lines 161-220)
def apply(self, events):
    """
    Replaces the token with appropriate values as per rules mapped with the tokens in the event.
    For replacement_type = all it will generate an event for each replacement value.
    i.e. integer[1:50] => will generate 50 events

    Args:
        events (list): List of events(SampleEvent)
    """
    new_events = []
    for each_event in events:
        token_count = each_event.get_token_count(
            self.token
        ) or each_event.get_token_extractions_count(self.token)
        token_values = list(self.replace(each_event, token_count))
        if token_count > 0:
            if self.replacement_type == "all":
                # NOTE: If replacement_type is all and same token is more than
                #       one time in event then replace all tokens with same
                #       value in that event
                for each_token_value in token_values:
                    new_event = SampleEvent.copy(each_event)
                    global event_host_count
                    event_host_count += 1
                    host = (
                        each_event.metadata.get("host")
                        .replace("_", "-")
                        .replace(".", "-")
                    )
                    host_split = host.split("-")
                    if re.match("\d+", host_split[-1]):
                        host = "-".join(host_split[:-1])
                    new_event.metadata["host"] = "{}-{}".format(
                        host, event_host_count
                    )
                    new_event.metadata["id"] = "{}_{}".format(
                        each_event.sample_name,
                        event_host_count,
                    )
                    new_event.replace_token(self.token, each_token_value.value)
                    new_event.register_field_value(self.field, each_token_value)
                    new_event.update_requirement_test_field(
                        self.field, self.token, each_token_value
                    )
                    new_events.append(new_event)
            else:
                each_event.replace_token(self.token, token_values)

                if not (
                    each_event.metadata.get("timestamp_type") != "event"
                    and self.field == "_time"
                ):
                    each_event.register_field_value(self.field, token_values)
                    each_event.update_requirement_test_field(
                        self.field, self.token, token_values
                    )
                new_events.append(each_event)
        else:
            new_events.append(each_event)
    return new_events
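
The host renaming that apply() performs for replacement_type = all is easy to miss, so here it is re-run standalone with a made-up host name and counter value.

import re

event_host_count = 3  # stand-in for the module-level counter incremented per generated event
host = "splunk_host.example.1".replace("_", "-").replace(".", "-")
host_split = host.split("-")
if re.match(r"\d+", host_split[-1]):      # drop an existing trailing numeric suffix
    host = "-".join(host_split[:-1])
print("{}-{}".format(host, event_host_count))  # -> splunk-host-example-3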

get_lookup_value(sample, key, headers, value_list)

Common method to read csv and get a random row.

Parameters:

Name Type Description Default
sample SampleEvent

Instance containing event info

required
key str

field name, e.g. host, src, user, dvc, etc.

required
headers list

Headers of csv file in list format

required
value_list list

list of replacement values mentioned in configuration file.

required

Returns:

Name Type Description
index_list list

list of mapped columns (int) as per value_list

csv_row list

list of replacement values for the rule.

Source code in pytest_splunk_addon/sample_generation/rule.py (lines 222-249)
def get_lookup_value(self, sample, key, headers, value_list):
    """
    Common method to read csv and get a random row.

    Args:
        sample (SampleEvent): Instance containing event info
        key (str): fieldname i.e. host, src, user, dvc etc
        headers (list): Headers of csv file in list format
        value_list (list): list of replacement values mentioned in configuration file.

    Returns:
        index_list (list): list of mapped columns(int) as per value_list
        csv_row (list): list of replacement values for the rule.
    """
    csv_row = []
    global user_email_count
    user_email_count += 1
    name = "user{}".format(user_email_count)
    email = "user{}@email.com".format(user_email_count)
    domain_user = r"sample_domain.com\user{}".format(user_email_count)
    distinguished_name = "CN=user{}".format(user_email_count)
    csv_row.extend([name, email, domain_user, distinguished_name])
    index_list = [i for i, item in enumerate(headers) if item in value_list]
    if hasattr(sample, "replacement_map") and key in sample.replacement_map:
        sample.replacement_map[key].append(csv_row)
    else:
        sample.__setattr__("replacement_map", {key: [csv_row]})
    return index_list, csv_row

get_rule_replacement_values(sample, value_list, rule)

Common method for replacement values of SrcRule, DestRule, DvcRule, HostRule.

Parameters:

Name Type Description Default
sample SampleEvent

Instance containing event info

required
value_list list

list of replacement values mentioned in configuration file.

required
rule str

field name, e.g. host, src, user, dvc, etc.

required

Returns:

Name Type Description
index_list list

list of mapped columns (int) as per value_list

csv_row list

list of replacement values for the rule.

Source code in pytest_splunk_addon/sample_generation/rule.py (lines 251-275)
def get_rule_replacement_values(self, sample, value_list, rule):
    """
    Common method for replacement values of
    SrcRule, Destrule, DvcRule, HostRule.

    Args:
        sample (SampleEvent): Instance containing event info
        value_list (list): list of replacement values mentioned in configuration file.
        rule (str): fieldname i.e. host, src, user, dvc etc

    Returns:
        index_list (list): list of mapped columns(int) as per value_list
        csv_row (list): list of replacement values for the rule.
    """
    csv_row = []
    for each in value_list:
        if each == "host":
            csv_row.append(sample.get_field_host(rule))
        elif each == "ipv4":
            csv_row.append(sample.get_ipv4(rule))
        elif each == "ipv6":
            csv_row.append(sample.get_ipv6(rule))
        elif each == "fqdn":
            csv_row.append(sample.get_field_fqdn(rule))
    return csv_row

parse_rule(token, psa_data_params, sample_path) classmethod

Returns appropriate Rule object as per replacement type of token.

Parameters:

Name Type Description Default
token dict

Dictionary containing token and its data

required
psa_data_params dict

PSA data stanzas dictionary

required
sample_path str

Path to the samples directory

required
Source code in pytest_splunk_addon/sample_generation/rule.py (lines 84-159)
@classmethod
def parse_rule(cls, token, psa_data_params, sample_path):
    """
    Returns appropriate Rule object as per replacement type of token.

    Args:
        token (dict): Dictionary containing token and its data
        psa_data_params (dict): PSA data stanzas dictionary
        sample_path (str): Path to the samples directory
    """
    rule_book = {
        "integer": IntRule,
        "list": ListRule,
        "ipv4": Ipv4Rule,
        "float": FloatRule,
        "ipv6": Ipv6Rule,
        "mac": MacRule,
        "file": FileRule,
        "url": UrlRule,
        "user": UserRule,
        "email": EmailRule,
        "host": HostRule,
        "hex": HexRule,
        "src_port": SrcPortRule,
        "dest_port": DestPortRule,
        "src": SrcRule,
        "dest": DestRule,
        "dvc": DvcRule,
        "guid": GuidRule,
    }
    rule_all_support = ["integer", "list", "file"]
    LOGGER.info(
        "The replacement type given is: '{}' for token:'{}'".format(
            token.get("replacementType"), token.get("token")
        )
    )
    if token.get("replacementType") not in [
        "static",
        "all",
        "random",
        "timestamp",
        "mvfile",
        "file",
    ]:
        raise_warning(
            "Invalid replacementType: '{}' for token:'{}' using 'random' as replacementType".format(
                token.get("replacementType"), token.get("token")
            )
        )
        token["replacement"] = "random"
    replacement_type = token["replacementType"]
    replacement = token["replacement"]
    if replacement_type == "static":
        return StaticRule(token)
    elif replacement_type == "timestamp":
        return TimeRule(token, psa_data_params)
    elif replacement_type == "random" or replacement_type == "all":
        for each_rule in rule_book:
            if replacement.lower().startswith(each_rule):
                if replacement_type == "all" and each_rule not in rule_all_support:
                    token["replacementType"] = "random"
                    LOGGER.warning(
                        "replacement_type=all is not supported for {} rule applied to {} token.".format(
                            each_rule, token.get("token")
                        )
                    )
                    warnings.warn(
                        UserWarning(
                            "replacement_type=all is not supported for {} rule applied to {} token.".format(
                                each_rule, token.get("token")
                            )
                        )
                    )
                return rule_book[each_rule](token, sample_path=sample_path)
    elif replacement_type == "file" or replacement_type == "mvfile":
        return FileRule(token, sample_path=sample_path)
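
A sketch of the dispatch performed by parse_rule, assuming the import path above; the token dicts are illustrative.

from pytest_splunk_addon.sample_generation.rule import Rule

tokens = [
    {"token": "##count##", "replacement": "integer[0:10]", "replacementType": "random"},
    {"token": "##origin##", "replacement": "host['ipv4']", "replacementType": "random"},
    {"token": "##app##", "replacement": "my_addon", "replacementType": "static"},
]
for token in tokens:
    rule = Rule.parse_rule(token, psa_data_params={}, sample_path="samples")
    print(token["token"], type(rule).__name__)
# -> ##count## IntRule / ##origin## HostRule / ##app## StaticRule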

SrcPortRule

Bases: Rule

SrcPortRule

Source code in pytest_splunk_addon/sample_generation/rule.py (lines 1021-1035)
class SrcPortRule(Rule):
    """
    SrcPortRule
    """

    def replace(self, sample, token_count):
        """
        Yields a random port value from the range 4000-5000

        Args:
            sample (SampleEvent): Instance containing event info
            token_count (int): No. of token in sample event where rule is applicable
        """
        for _ in range(token_count):
            yield self.token_value(*([randint(4000, 5000)] * 2))

replace(sample, token_count)

Yields a random port value from the range 4000-5000

Parameters:

Name Type Description Default
sample SampleEvent

Instance containing event info

required
token_count int

Number of tokens in the sample event to which the rule applies

required
Source code in pytest_splunk_addon/sample_generation/rule.py (lines 1026-1035)
def replace(self, sample, token_count):
    """
    Yields a random port value from the range 4000-5000

    Args:
        sample (SampleEvent): Instance containing event info
        token_count (int): No. of token in sample event where rule is applicable
    """
    for _ in range(token_count):
        yield self.token_value(*([randint(4000, 5000)] * 2))

SrcRule

Bases: Rule

SrcRule

Source code in pytest_splunk_addon/sample_generation/rule.py (lines 1077-1113)
class SrcRule(Rule):
    """
    SrcRule
    """

    def replace(self, sample, token_count):
        """
        Yields a random src replacement value from the list
        of values mentioned in token.
        Possible values: ["host", "ipv4", "ipv6", "fqdn"]

        Args:
            sample (SampleEvent): Instance containing event info
            token_count (int): No. of token in sample event where rule is applicable
        """
        value_match = re.match(r"[sS]rc(\[.*?\])", self.replacement)
        if value_match:
            value_list_str = value_match.group(1)
            value_list = eval(value_list_str)
            for _ in range(token_count):
                csv_row = self.get_rule_replacement_values(
                    sample, value_list, rule="src"
                )
                if csv_row:
                    yield self.token_value(*([choice(csv_row)] * 2))
                else:
                    raise_warning(
                        "Invalid Value: '{}' in stanza '{}'.\n Accepted values: ['host','ipv4','ipv6','fqdn']".format(
                            self.replacement, sample.sample_name
                        )
                    )
        else:
            raise_warning(
                "Non-supported format: '{}' in stanza '{}'.\n Try  src['host','ipv4','ipv6','fqdn']".format(
                    self.replacement, sample.sample_name
                )
            )

replace(sample, token_count)

Yields a random src replacement value from the list of values mentioned in token. Possible values: [“host”, “ipv4”, “ipv6”, “fqdn”]

Parameters:

Name Type Description Default
sample SampleEvent

Instance containing event info

required
token_count int

Number of tokens in the sample event to which the rule applies

required
Source code in pytest_splunk_addon/sample_generation/rule.py (lines 1082-1113)
def replace(self, sample, token_count):
    """
    Yields a random src replacement value from the list
    of values mentioned in token.
    Possible values: ["host", "ipv4", "ipv6", "fqdn"]

    Args:
        sample (SampleEvent): Instance containing event info
        token_count (int): No. of token in sample event where rule is applicable
    """
    value_match = re.match(r"[sS]rc(\[.*?\])", self.replacement)
    if value_match:
        value_list_str = value_match.group(1)
        value_list = eval(value_list_str)
        for _ in range(token_count):
            csv_row = self.get_rule_replacement_values(
                sample, value_list, rule="src"
            )
            if csv_row:
                yield self.token_value(*([choice(csv_row)] * 2))
            else:
                raise_warning(
                    "Invalid Value: '{}' in stanza '{}'.\n Accepted values: ['host','ipv4','ipv6','fqdn']".format(
                        self.replacement, sample.sample_name
                    )
                )
    else:
        raise_warning(
            "Non-supported format: '{}' in stanza '{}'.\n Try  src['host','ipv4','ipv6','fqdn']".format(
                self.replacement, sample.sample_name
            )
        )
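
SrcRule (and the similar dest/dvc rules) follows the same pattern as HostRule but without the input_type-specific host handling. A minimal sketch with a stubbed sample; the import path and values are assumptions.

from types import SimpleNamespace
from pytest_splunk_addon.sample_generation.rule import SrcRule

token = {"token": "##src##", "replacement": "src['ipv4', 'fqdn']", "replacementType": "random"}
sample = SimpleNamespace(
    sample_name="sample.log",
    get_ipv4=lambda rule: "10.2.0.5",
    get_field_fqdn=lambda rule: "src05.sample.com",
)
for tv in SrcRule(token).replace(sample, token_count=1):
    print(tv.value)  # one of the stubbed ipv4/fqdn candidates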

StaticRule

Bases: Rule

StaticRule

Source code in pytest_splunk_addon/sample_generation/rule.py (lines 386-400)
class StaticRule(Rule):
    """
    StaticRule
    """

    def replace(self, sample, token_count):
        """
        Yields the static value mentioned in token.

        Args:
            sample (SampleEvent): Instance containing event info
            token_count (int): No. of token in sample event where rule is applicable
        """
        for _ in range(token_count):
            yield self.token_value(*([self.replacement] * 2))

replace(sample, token_count)

Yields the static value mentioned in token.

Parameters:

Name Type Description Default
sample SampleEvent

Instance containing event info

required
token_count int

Number of tokens in the sample event to which the rule applies

required
Source code in pytest_splunk_addon/sample_generation/rule.py (lines 391-400)
def replace(self, sample, token_count):
    """
    Yields the static value mentioned in token.

    Args:
        sample (SampleEvent): Instance containing event info
        token_count (int): No. of token in sample event where rule is applicable
    """
    for _ in range(token_count):
        yield self.token_value(*([self.replacement] * 2))

TimeRule

Bases: Rule

Source code in pytest_splunk_addon/sample_generation/rule.py (lines 629-721)
class TimeRule(Rule):
    def replace(self, sample, token_count):
        """
        Returns time according to the parameters specified in the input.

        Args:
            sample (SampleEvent): Instance containing event info
            token_count (int): No. of token in sample event where rule is applicable
        """
        earliest = self.psa_data_params.get("earliest")
        latest = self.psa_data_params.get("latest")
        timezone_time = self.psa_data_params.get("timezone", "0000")
        random_time = datetime.utcnow()
        time_parser = time_parse()
        time_delta = datetime.now().timestamp() - datetime.utcnow().timestamp()

        if earliest != "now" and earliest is not None:

            earliest_match = re.match(r"([+-])(\d{1,})(.*)", earliest)
            if earliest_match:
                sign, num, unit = earliest_match.groups()
                earliest = time_parser.convert_to_time(sign, num, unit)
            else:
                raise_warning(
                    "Invalid value found in earliest: '{}' for stanza '{}'. using earliest = now".format(
                        earliest, sample.sample_name
                    )
                )
                earliest = datetime.utcnow()
        else:
            earliest = datetime.utcnow()

        if latest != "now" and latest is not None:

            latest_match = re.match(r"([+-])(\d{1,})(.*)", latest)
            if latest_match:
                sign, num, unit = latest_match.groups()
                latest = time_parser.convert_to_time(sign, num, unit)
            else:
                raise_warning(
                    "Invalid value found in latest: '{}' for stanza '{}'. using latest = now".format(
                        latest, sample.sample_name
                    )
                )
                latest = datetime.utcnow()
        else:
            latest = datetime.utcnow()

        earliest_in_epoch = mktime(earliest.timetuple())
        latest_in_epoch = mktime(latest.timetuple())

        if earliest_in_epoch > latest_in_epoch:
            LOGGER.info("Latest time is earlier than earliest time.")
            yield self.token
        for _ in range(token_count):
            random_time = datetime.fromtimestamp(
                randint(earliest_in_epoch, latest_in_epoch)
            )
            if timezone_time in ["local", '"local"', "'local'"]:
                random_time = random_time.replace(tzinfo=timezone.utc).astimezone(
                    tz=None
                )

            elif timezone_time and timezone_time.strip("'").strip('"') != r"0000":
                random_time = time_parser.get_timezone_time(random_time, timezone_time)

            if r"%s" == self.replacement.strip("'").strip('"'):
                time_in_sec = self.replacement.replace(
                    r"%s", str(int(mktime(random_time.timetuple())))
                )
                yield self.token_value(float(time_in_sec), time_in_sec)

            else:
                if timezone_time not in (None, "0000"):
                    modified_random_time = time_parser.get_timezone_time(
                        random_time, self.invert_timezone(timezone_time)
                    )
                else:
                    modified_random_time = random_time
                yield self.token_value(
                    float(mktime(modified_random_time.timetuple())) + time_delta,
                    random_time.strftime(self.replacement.replace(r"%e", r"%d")),
                )

    def invert_timezone(self, timezone_time):
        if timezone_time == "0000":
            return "0000"
        elif timezone_time[0] == "-":
            return "+" + timezone_time[-4:]
        elif timezone_time[0] == "+":
            return "-" + timezone_time[-4:]
        else:
            raise Exception("Invalid timezone value found.")

replace(sample, token_count)

Returns time according to the parameters specified in the input.

Parameters:

Name Type Description Default
sample SampleEvent

Instance containing event info

required
token_count int

Number of tokens in the sample event to which the rule applies

required
Source code in pytest_splunk_addon/sample_generation/rule.py (lines 630-711)
def replace(self, sample, token_count):
    """
    Returns time according to the parameters specified in the input.

    Args:
        sample (SampleEvent): Instance containing event info
        token_count (int): No. of token in sample event where rule is applicable
    """
    earliest = self.psa_data_params.get("earliest")
    latest = self.psa_data_params.get("latest")
    timezone_time = self.psa_data_params.get("timezone", "0000")
    random_time = datetime.utcnow()
    time_parser = time_parse()
    time_delta = datetime.now().timestamp() - datetime.utcnow().timestamp()

    if earliest != "now" and earliest is not None:

        earliest_match = re.match(r"([+-])(\d{1,})(.*)", earliest)
        if earliest_match:
            sign, num, unit = earliest_match.groups()
            earliest = time_parser.convert_to_time(sign, num, unit)
        else:
            raise_warning(
                "Invalid value found in earliest: '{}' for stanza '{}'. using earliest = now".format(
                    earliest, sample.sample_name
                )
            )
            earliest = datetime.utcnow()
    else:
        earliest = datetime.utcnow()

    if latest != "now" and latest is not None:

        latest_match = re.match(r"([+-])(\d{1,})(.*)", latest)
        if latest_match:
            sign, num, unit = latest_match.groups()
            latest = time_parser.convert_to_time(sign, num, unit)
        else:
            raise_warning(
                "Invalid value found in latest: '{}' for stanza '{}'. using latest = now".format(
                    latest, sample.sample_name
                )
            )
            latest = datetime.utcnow()
    else:
        latest = datetime.utcnow()

    earliest_in_epoch = mktime(earliest.timetuple())
    latest_in_epoch = mktime(latest.timetuple())

    if earliest_in_epoch > latest_in_epoch:
        LOGGER.info("Latest time is earlier than earliest time.")
        yield self.token
    for _ in range(token_count):
        random_time = datetime.fromtimestamp(
            randint(earliest_in_epoch, latest_in_epoch)
        )
        if timezone_time in ["local", '"local"', "'local'"]:
            random_time = random_time.replace(tzinfo=timezone.utc).astimezone(
                tz=None
            )

        elif timezone_time and timezone_time.strip("'").strip('"') != r"0000":
            random_time = time_parser.get_timezone_time(random_time, timezone_time)

        if r"%s" == self.replacement.strip("'").strip('"'):
            time_in_sec = self.replacement.replace(
                r"%s", str(int(mktime(random_time.timetuple())))
            )
            yield self.token_value(float(time_in_sec), time_in_sec)

        else:
            if timezone_time not in (None, "0000"):
                modified_random_time = time_parser.get_timezone_time(
                    random_time, self.invert_timezone(timezone_time)
                )
            else:
                modified_random_time = random_time
            yield self.token_value(
                float(mktime(modified_random_time.timetuple())) + time_delta,
                random_time.strftime(self.replacement.replace(r"%e", r"%d")),
            )
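
A sketch of a timestamp token, assuming the import path above and that the module's time parser resolves relative offsets such as -60s (as the code above relies on). The earliest/latest/timezone values come from the stanza's parameters and are illustrative here.

from types import SimpleNamespace
from pytest_splunk_addon.sample_generation.rule import TimeRule

token = {
    "token": "##timestamp##",
    "replacement": "%Y-%m-%dT%H:%M:%S",   # strftime format; "%s" would yield epoch seconds instead
    "replacementType": "timestamp",
}
psa_data_params = {"earliest": "-60s", "latest": "now", "timezone": "0000"}
sample = SimpleNamespace(sample_name="sample.log")
for tv in TimeRule(token, psa_data_params).replace(sample, token_count=1):
    print(tv.key, tv.value)  # epoch-style key plus a formatted time within the last minute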

UrlRule

Bases: Rule

UrlRule

Source code in pytest_splunk_addon/sample_generation/rule.py (lines 883-978)
class UrlRule(Rule):
    """
    UrlRule
    """

    def replace(self, sample, token_count):
        """
        Yields a random url replacement value from the list
        of values mentioned in token.

        Possible values: ["ip_host", "fqdn_host", "path", "query", "protocol", "full"]

        Args:
            sample (SampleEvent): Instance containing event info
            token_count (int): No. of token in sample event where rule is applicable
        """
        replace_token = True
        value_match = re.match(r"[uU]rl(\[.*?\])", self.replacement)
        if value_match:
            value_list_str = value_match.group(1)
            value_list = eval(value_list_str)
            for each in value_list:
                if each not in [
                    "ip_host",
                    "fqdn_host",
                    "path",
                    "query",
                    "protocol",
                    "full",
                ]:
                    raise_warning(
                        'Invalid Value for url: "{}" for replacement {} in stanza "{}".\n Accepted values: ["ip_host", "fqdn_host", "path", "query", "protocol"]'.format(
                            each, self.replacement, sample.sample_name
                        )
                    )
                    replace_token = False
            if replace_token:
                for _ in range(token_count):
                    if bool(
                        set(["ip_host", "fqdn_host", "full"]).intersection(value_list)
                    ):
                        url = ""
                        domain_name = []
                        if bool(set(["full", "protocol"]).intersection(value_list)):
                            url = url + choice(["http://", "https://"])
                        if bool(set(["full", "ip_host"]).intersection(value_list)):
                            domain_name.append(sample.get_ipv4("url"))
                        if bool(set(["full", "fqdn_host"]).intersection(value_list)):
                            domain_name.append(self.fake.hostname())
                        url = url + choice(domain_name)
                    else:
                        url = self.fake.url()

                    if bool(set(["full", "path"]).intersection(value_list)):
                        if value_list == ["path"]:
                            url = ""
                        url = (
                            url
                            + "/"
                            + choice(
                                [
                                    self.fake.uri_path(),
                                    self.fake.uri_page() + self.fake.uri_extension(),
                                ]
                            )
                        )
                    if bool(set(["full", "query"]).intersection(value_list)):
                        if value_list == ["query"]:
                            url = ""
                        url = url + self.generate_url_query_params()
                    yield self.token_value(*([str(url)] * 2))
        else:
            raise_warning(
                'Unidentified format: "{}" in stanza "{}".\n Expected values: ["ip_host", "fqdn_host", "path", "query", "protocol", "full"]'.format(
                    self.replacement, sample.sample_name
                )
            )

    def generate_url_query_params(self):
        """
        Generates random query params for url

        Returns:
            Return the query param string
        """
        url_params = "?"
        for _ in range(randint(1, 4)):
            field = "".join(
                choice(string.ascii_lowercase) for _ in range(randint(2, 5))
            )
            value = "".join(
                choice(string.ascii_lowercase + string.digits)
                for _ in range(randint(2, 5))
            )
            url_params = url_params + field + "=" + value + "&"
        return url_params[:-1]

generate_url_query_params()

Generates random query params for url

Returns:

Type Description

Return the query param string

Source code in pytest_splunk_addon/sample_generation/rule.py (lines 961-978)
def generate_url_query_params(self):
    """
    Generates random query params for url

    Returns:
        Return the query param string
    """
    url_params = "?"
    for _ in range(randint(1, 4)):
        field = "".join(
            choice(string.ascii_lowercase) for _ in range(randint(2, 5))
        )
        value = "".join(
            choice(string.ascii_lowercase + string.digits)
            for _ in range(randint(2, 5))
        )
        url_params = url_params + field + "=" + value + "&"
    return url_params[:-1]

replace(sample, token_count)

Yields a random url replacement value from the list of values mentioned in token.

Possible values: [“ip_host”, “fqdn_host”, “path”, “query”, “protocol”, “full”]

Parameters:

Name Type Description Default
sample SampleEvent

Instance containing event info

required
token_count int

Number of tokens in the sample event to which the rule applies

required
Source code in pytest_splunk_addon/sample_generation/rule.py (lines 888-959)
def replace(self, sample, token_count):
    """
    Yields a random url replacement value from the list
    of values mentioned in token.

    Possible values: ["ip_host", "fqdn_host", "path", "query", "protocol", "full"]

    Args:
        sample (SampleEvent): Instance containing event info
        token_count (int): No. of token in sample event where rule is applicable
    """
    replace_token = True
    value_match = re.match(r"[uU]rl(\[.*?\])", self.replacement)
    if value_match:
        value_list_str = value_match.group(1)
        value_list = eval(value_list_str)
        for each in value_list:
            if each not in [
                "ip_host",
                "fqdn_host",
                "path",
                "query",
                "protocol",
                "full",
            ]:
                raise_warning(
                    'Invalid Value for url: "{}" for replacement {} in stanza "{}".\n Accepted values: ["ip_host", "fqdn_host", "path", "query", "protocol"]'.format(
                        each, self.replacement, sample.sample_name
                    )
                )
                replace_token = False
        if replace_token:
            for _ in range(token_count):
                if bool(
                    set(["ip_host", "fqdn_host", "full"]).intersection(value_list)
                ):
                    url = ""
                    domain_name = []
                    if bool(set(["full", "protocol"]).intersection(value_list)):
                        url = url + choice(["http://", "https://"])
                    if bool(set(["full", "ip_host"]).intersection(value_list)):
                        domain_name.append(sample.get_ipv4("url"))
                    if bool(set(["full", "fqdn_host"]).intersection(value_list)):
                        domain_name.append(self.fake.hostname())
                    url = url + choice(domain_name)
                else:
                    url = self.fake.url()

                if bool(set(["full", "path"]).intersection(value_list)):
                    if value_list == ["path"]:
                        url = ""
                    url = (
                        url
                        + "/"
                        + choice(
                            [
                                self.fake.uri_path(),
                                self.fake.uri_page() + self.fake.uri_extension(),
                            ]
                        )
                    )
                if bool(set(["full", "query"]).intersection(value_list)):
                    if value_list == ["query"]:
                        url = ""
                    url = url + self.generate_url_query_params()
                yield self.token_value(*([str(url)] * 2))
    else:
        raise_warning(
            'Unidentified format: "{}" in stanza "{}".\n Expected values: ["ip_host", "fqdn_host", "path", "query", "protocol", "full"]'.format(
                self.replacement, sample.sample_name
            )
        )
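
A sketch of a url token that avoids ip_host (which would need a real SampleEvent for get_ipv4); the import path and token values are assumptions.

from types import SimpleNamespace
from pytest_splunk_addon.sample_generation.rule import UrlRule

token = {
    "token": "##url##",
    "replacement": "url['fqdn_host', 'path', 'query']",
    "replacementType": "random",
}
sample = SimpleNamespace(sample_name="sample.log")  # only sample_name is used (in warnings)
rule = UrlRule(token)
for tv in rule.replace(sample, token_count=2):
    print(tv.value)  # hostname + "/" + path + "?" + query params, all Faker-generated
print(rule.generate_url_query_params())  # the query-string helper can also be called directly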

UserRule

Bases: Rule

UserRule

Source code in pytest_splunk_addon/sample_generation/rule.py (lines 792-844)
class UserRule(Rule):
    """
    UserRule
    """

    def replace(self, sample, token_count):
        """
        Yields a random user replacement value from the list of values mentioned in token.
        Possible values: ["name", "email", "domain_user", "distinquised_name"]

        Args:
            sample (SampleEvent): Instance containing event info
            token_count (int): No. of token in sample event where rule is applicable
        """
        value_match = re.match(r"[uU]ser(\[.*?\])", self.replacement)
        if value_match:
            value_list_str = value_match.group(1)
            value_list = eval(value_list_str)

            for i in range(token_count):
                if (
                    hasattr(sample, "replacement_map")
                    and "email" in sample.replacement_map
                    and i < len(sample.replacement_map["email"])
                ):
                    index_list = [
                        i
                        for i, item in enumerate(self.user_header)
                        if item in value_list
                    ]
                    csv_rows = sample.replacement_map["email"]
                    yield self.token_value(*([csv_rows[i][choice(index_list)]] * 2))
                else:
                    index_list, csv_row = self.get_lookup_value(
                        sample,
                        "user",
                        self.user_header,
                        value_list,
                    )
                    if index_list:
                        yield self.token_value(*([csv_row[choice(index_list)]] * 2))
                    else:
                        raise_warning(
                            "Invalid Value: '{}' in stanza '{}'.\n Accepted values: ['name','email','domain_name','distinquised_name']".format(
                                self.replacement, sample.sample_name
                            )
                        )
        else:
            raise_warning(
                "Unidentified format: '{}' in stanza '{}'.\n Try  user['name','email','domain_name','distinquised_name']".format(
                    self.replacement, sample.sample_name
                )
            )

replace(sample, token_count)

Yields a random user replacement value from the list of values mentioned in token. Possible values: [“name”, “email”, “domain_user”, “distinquised_name”]

Parameters:

Name Type Description Default
sample SampleEvent

Instance containing event info

required
token_count int

Number of tokens in the sample event to which the rule applies

required
Source code in pytest_splunk_addon/sample_generation/rule.py (lines 797-844)
def replace(self, sample, token_count):
    """
    Yields a random user replacement value from the list of values mentioned in token.
    Possible values: ["name", "email", "domain_user", "distinquised_name"]

    Args:
        sample (SampleEvent): Instance containing event info
        token_count (int): No. of token in sample event where rule is applicable
    """
    value_match = re.match(r"[uU]ser(\[.*?\])", self.replacement)
    if value_match:
        value_list_str = value_match.group(1)
        value_list = eval(value_list_str)

        for i in range(token_count):
            if (
                hasattr(sample, "replacement_map")
                and "email" in sample.replacement_map
                and i < len(sample.replacement_map["email"])
            ):
                index_list = [
                    i
                    for i, item in enumerate(self.user_header)
                    if item in value_list
                ]
                csv_rows = sample.replacement_map["email"]
                yield self.token_value(*([csv_rows[i][choice(index_list)]] * 2))
            else:
                index_list, csv_row = self.get_lookup_value(
                    sample,
                    "user",
                    self.user_header,
                    value_list,
                )
                if index_list:
                    yield self.token_value(*([csv_row[choice(index_list)]] * 2))
                else:
                    raise_warning(
                        "Invalid Value: '{}' in stanza '{}'.\n Accepted values: ['name','email','domain_name','distinquised_name']".format(
                            self.replacement, sample.sample_name
                        )
                    )
    else:
        raise_warning(
            "Unidentified format: '{}' in stanza '{}'.\n Try  user['name','email','domain_name','distinquised_name']".format(
                self.replacement, sample.sample_name
            )
        )

raise_warning(warning_string)

To raise a pytest user warning along with a log.

Parameters:

Name Type Description Default
warning_string str

warning string

required
Source code in pytest_splunk_addon/sample_generation/rule.py (lines 49-57)
def raise_warning(warning_string):
    """
    To raise a pytest user warning along with a log.

    Args:
        warning_string(str): warning string
    """
    LOGGER.warning(warning_string)
    warnings.warn(UserWarning(warning_string))