Skip to content

AddonParser

The module provides the Add-on parsing mechanism. It can parse the knowledge objects from an Add-on’s configuration files.

Supports: fields from props & transforms, tags, eventtypes, savedsearches

AddonParser

Bases: object

Parse the knowledge objects from an Add-on’s configuration files. Supports: fields from props & transforms, tags, eventtypes, savedsearches

Parameters:

Name Type Description Default
splunk_app_path str

Path to the Splunk App

required
Source code in pytest_splunk_addon/addon_parser/__init__.py
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
class AddonParser(object):
    """
    Parse the knowledge objects from an Add-on's configuration files.
    Supports: fields from props & transforms, tags, eventtypes, savedsearches

    Args:
        splunk_app_path (str): Path to the Splunk App
    """

    def __init__(self, splunk_app_path):
        self.splunk_app_path = splunk_app_path
        # Individual parsers are built lazily on first property access;
        # parsed data is shared between runs through the ParserCache.
        self._props_parser = None
        self._tags_parser = None
        self._eventtype_parser = None
        self._savedsearch_parser = None
        self._parser_cache = ParserCache()

    @property
    def props_parser(self):
        if not self._props_parser:
            cached_props = self._parser_cache.get_or_parse(
                lambda: PropsParser(self.splunk_app_path).props, "props"
            )
            self._props_parser = PropsParser(
                self.splunk_app_path, props_data=cached_props
            )
        return self._props_parser

    @property
    def tags_parser(self):
        if not self._tags_parser:
            cached_tags = self._parser_cache.get_or_parse(
                lambda: TagsParser(self.splunk_app_path).tags, "tags"
            )
            self._tags_parser = TagsParser(
                self.splunk_app_path, tags_data=cached_tags
            )
        return self._tags_parser

    @property
    def eventtype_parser(self):
        if not self._eventtype_parser:
            cached_eventtypes = self._parser_cache.get_or_parse(
                lambda: EventTypeParser(self.splunk_app_path).eventtypes,
                "eventtypes",
            )
            self._eventtype_parser = EventTypeParser(
                self.splunk_app_path, eventtypes_data=cached_eventtypes
            )
        return self._eventtype_parser

    @property
    def savedsearch_parser(self):
        if not self._savedsearch_parser:
            cached_savedsearches = self._parser_cache.get_or_parse(
                lambda: SavedSearchParser(self.splunk_app_path).savedsearches,
                "savedsearches",
            )
            self._savedsearch_parser = SavedSearchParser(
                self.splunk_app_path, savedsearches_data=cached_savedsearches
            )
        return self._savedsearch_parser

    def get_props_fields(self):
        """
        Parse the props.conf and yield all supported fields

        Yields:
            generator of all the supported fields
        """

        def _build_props_fields():
            LOGGER.info("Building props_fields cache")
            return list(self.props_parser.get_props_fields())

        cached_fields = self._parser_cache.get_or_parse(
            _build_props_fields, "props_fields"
        )
        return iter(cached_fields if cached_fields else [])

    def get_tags(self):
        """
        Parse the tags.conf of the App & yield stanzas

        Yields:
            generator of stanzas from the tags
        """
        return self.tags_parser.get_tags()

    def get_eventtypes(self):
        """
        Parse the App configuration files & yield eventtypes

        Yields:
            generator of list of eventtypes
        """
        return self.eventtype_parser.get_eventtypes()

    def get_savedsearches(self):
        """
        Parse the App configuration files & yield savedsearches

        Yields:
            generator of list of savedsearches
        """
        return self.savedsearch_parser.get_savedsearches()

get_eventtypes()

Parse the App configuration files & yield eventtypes

Yields:

Type Description

generator of list of eventtypes

Source code in pytest_splunk_addon/addon_parser/__init__.py
140
141
142
143
144
145
146
147
def get_eventtypes(self):
    """
    Parse the App configuration files & yield eventtypes

    Yields:
        generator of list of eventtypes
    """
    parser = self.eventtype_parser
    return parser.get_eventtypes()

get_props_fields()

Parse the props.conf and yield all supported fields

Yields:

Type Description

generator of all the supported fields

Source code in pytest_splunk_addon/addon_parser/__init__.py
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
def get_props_fields(self):
    """
    Parse the props.conf and yield all supported fields

    Yields:
        generator of all the supported fields
    """

    def _build_props_fields():
        LOGGER.info("Building props_fields cache")
        return list(self.props_parser.get_props_fields())

    cached_fields = self._parser_cache.get_or_parse(
        _build_props_fields, "props_fields"
    )
    # A missing/empty cache entry still yields an (empty) iterator.
    return iter(cached_fields if cached_fields else [])

get_savedsearches()

Parse the App configuration files & yield savedsearches

Yields:

Type Description

generator of list of savedsearches

Source code in pytest_splunk_addon/addon_parser/__init__.py
149
150
151
152
153
154
155
156
def get_savedsearches(self):
    """
    Parse the App configuration files & yield savedsearches

    Yields:
        generator of list of savedsearches
    """
    parser = self.savedsearch_parser
    return parser.get_savedsearches()

get_tags()

Parse the tags.conf of the App & yield stanzas

Yields:

Type Description

generator of stanzas from the tags

Source code in pytest_splunk_addon/addon_parser/__init__.py
131
132
133
134
135
136
137
138
def get_tags(self):
    """
    Parse the tags.conf of the App & yield stanzas

    Yields:
        generator of stanzas from the tags
    """
    parser = self.tags_parser
    return parser.get_tags()

PropsParser

Provides props.conf parsing mechanism

PropsParser

Bases: object

Parses props.conf and extracts the fields.

Parameters:

Name Type Description Default
splunk_app_path str

Path of the Splunk app

required
Source code in pytest_splunk_addon/addon_parser/props_parser.py
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
class PropsParser(object):
    """
    Parses props.conf and extracts the fields.

    Args:
        splunk_app_path (str): Path of the Splunk app
        props_data (Optional[Dict]): Pre-parsed props stanzas. When provided,
            props.conf is not read from disk.
    """

    def __init__(self, splunk_app_path: str, props_data: Optional[Dict] = None):
        self._conf_parser = conf_parser.TABConfigParser()
        self.splunk_app_path = splunk_app_path
        self._props = props_data
        self.transforms_parser = TransformsParser(self.splunk_app_path)

    @property
    def props(self) -> Optional[Dict]:
        # Lazily parse props.conf once; injected props_data takes precedence.
        if self._props is not None:
            return self._props
        props_conf_path = os.path.join(self.splunk_app_path, "default", "props.conf")
        LOGGER.info("Parsing props.conf")
        self._conf_parser.read(props_conf_path)
        self._props = self._conf_parser.item_dict()
        return self._props if self._props else None

    def get_props_fields(self):
        """
        Parse the props.conf and yield all supported fields

        Yields:
            generator of all the supported fields
        """
        for stanza_type, stanza_name, stanza_values in self._get_props_stanzas():
            # BUGFIX: _get_transforms_sourcetypes() scans *every* TRANSFORMS
            # key of the stanza by itself, so it must run at most once per
            # stanza. Previously it was invoked once per matching key, which
            # yielded each sourcetype group N times for N TRANSFORMS keys.
            transforms_handled = False
            for key, value in stanza_values.items():
                LOGGER.info(f"Parsing parameter={key} of stanza={stanza_name}")
                if re.match("REPORT", key, re.IGNORECASE):
                    for transform_stanza, fields in self._get_report_fields(key, value):
                        field_list = list(fields)
                        if field_list:
                            yield {
                                "stanza": stanza_name,
                                "stanza_type": stanza_type,
                                "classname": f"{key}::{transform_stanza}",
                                "fields": field_list,
                            }
                elif re.match("TRANSFORM", key, re.IGNORECASE):
                    if not transforms_handled:
                        transforms_handled = True
                        yield from self._get_transforms_sourcetypes(
                            stanza_name, stanza_values
                        )
                else:
                    LOGGER.info(f"Trying to parse classname={key}")
                    parsing_method = self._get_props_method(key)
                    if parsing_method:
                        field_list = list(parsing_method(key, value))
                        if field_list:
                            yield {
                                "stanza": stanza_name,
                                "stanza_type": stanza_type,
                                "classname": key,
                                "fields": field_list,
                            }

    def _get_props_method(self, class_name: str):
        """
        Get the parsing method depending on classname

        Args:
            class_name (str): class name of the props property

        Returns:
            instance method to parse the property, or None when the
            classname is not supported
        """
        method_mapping = {
            "EXTRACT": self._get_extract_fields,
            "EVAL": self._get_eval_fields,
            "FIELDALIAS": self._get_fieldalias_fields,
            "LOOKUP": self._get_lookup_fields,
        }
        for each_type, parse_method in method_mapping.items():
            if re.match(each_type, class_name, re.IGNORECASE):
                LOGGER.info(f"Matched method of type={each_type}")
                return parse_method
        # Only the first worker logs the warning to avoid duplicated noise
        # under pytest-xdist.
        if utils.check_first_worker():
            LOGGER.warning(f"No parser available for {class_name}. Skipping...")
        return None

    def _get_props_stanzas(self) -> Optional[Generator]:
        """
        Parse the props.conf of the App & yield stanzas.
        For source with | (OR), it will return all combinations

        Yields:
            generator of (stanza_type, stanza_name, stanza_values) tuples,
            where stanza_type is "source" or "sourcetype"
        """
        if not self.props:
            return
        for stanza_name, stanza_values in self.props.items():
            if stanza_name.startswith("host::"):
                LOGGER.warning("Host stanza is not supported. Skipping..")
                continue
            if stanza_name.startswith("source::"):
                LOGGER.info(f"Parsing Source based stanza: {stanza_name}")
                for each_source in self.get_list_of_sources(stanza_name):
                    yield "source", each_source, stanza_values
            else:
                LOGGER.info(f"Parsing Sourcetype based stanza: {stanza_name}")
                yield "sourcetype", stanza_name, stanza_values

    @staticmethod
    def get_list_of_sources(source: str) -> Generator:
        """
        For source with | (OR), it will return all combinations.
        Uses itertools.product to list the combinations

        Example::

            input "(preA|preB)str(postX|postY)"
            output [
                preAstrpostX
                preBstrpostX
                preAstrpostY
                preBstrpostY
            ]

        Args:
            source (str): Source name

        Yields:
            generator of source name
        """
        LOGGER.debug("Finding combinations of a source..")
        match_obj = re.search(r"source::(.*)", source)
        value = match_obj.group(1).replace("...", "*")
        sub_groups = re.findall(r"\([^\)]+\)", value)
        sub_group_list = []
        for each_group in sub_groups:
            sub_group_list.append(each_group.strip("()").split("|"))
        template = re.sub(r"\([^\)]+\)", "{}", value)
        count = 0
        for each_permutation in product(*sub_group_list):
            count += 1
            yield template.format(*each_permutation)
        LOGGER.debug("Found %d combinations", count)

    @convert_to_fields
    def _get_extract_fields(self, name: str, value: str):
        """
        Returns the fields parsed from EXTRACT

        Example::

            EXTRACT-one = regex with (?<capturing_group>.*)

        Args:
            name (str): key in the configuration settings
            value (str): value of the respective name in the configuration

        Regex:
            Parse the fields from a regex. Examples,

            * (?<name>regex)
            * (?'name'regex)
            * (?P<name>regex)

        Yields:
            generator of fields
        """
        regex = r"\(\?P?(?:[<'])([^\>'\s]+)[\>']"
        for field in re.findall(regex, value):
            # _KEY_/_VAL_ captures are Splunk extraction plumbing,
            # not event fields.
            if not field.startswith(("_KEY_", "_VAL_")):
                yield field

        # If SOURCE_KEY is used in EXTRACT, generate the test for the same.
        regex_for_source_key = r"(?i)(?:in\s+(\w+))\s*$"
        extract_source_key = re.search(regex_for_source_key, value, re.MULTILINE)
        if extract_source_key:
            LOGGER.info(f"Found a source key in {name}")
            yield extract_source_key.group(1)

    @convert_to_fields
    def _get_eval_fields(self, name, value):
        """
        Return the fields parsed from EVAL

        Example::

            EVAL-action = if(isnull(action), "unknown", action)

        Args:
            name (str): key in the configuration settings
            value (str): value of the respective name in the configuration

        Yields:
            generator of fields
        """
        regex = r"EVAL-(?P<FIELD>.*)"
        # EVAL-x = null() removes the field, so no test should be generated.
        if not value == "null()":
            yield from re.findall(regex, name, re.IGNORECASE)

    @convert_to_fields
    def _get_fieldalias_fields(self, name: str, value: str):
        """
        Return the fields parsed from FIELDALIAS

        Example::

            FIELDALIAS-class = source AS dest, sc2 AS dest2

        Args:
            name (str): key in the configuration settings
            value (str): value of the respective name in the configuration

        Regex:
            Description:

            * Find all field alias group separated by space or comma

            Examples:

            * field_source AS field_destination
            * "Field Source" as "Field Destination"
            * field_source ASNEW 'Field Destination'
            * field_source asnew field_destination

        Yields:
            generator of fields
        """
        regex = (
            r"(\"(?:\\\"|[^\"])*\"|\'(?:\\\'|[^\'])*\'|[^\s,]+)"
            r"\s+(?:as(?:new)?)\s+"
            r"(\"(?:\\\"|[^\"])*\"|\'(?:\\\'|[^\'])*\'|[^\s,]+)"
        )
        fields_tuples = re.findall(regex, value, re.IGNORECASE)
        # Both the source and the aliased field are returned, de-duplicated.
        return list(set([item for t in fields_tuples for item in t]))

    def _get_report_fields(self, name: str, value: str):
        """
        Returns the fields parsed from REPORT

        In order to parse the fields REPORT, the method parses the
        transforms.conf and returns the list

        Args:
            name (str): key in the configuration settings
            value (str): value of the respective name in the configuration

        Yields:
            generator of (transform_stanza ,fields) parsed from transforms.conf
        """

        transforms_itr = (each_stanza.strip() for each_stanza in value.split(","))
        for transforms_section in transforms_itr:
            yield (
                transforms_section,
                self.transforms_parser.get_transform_fields(transforms_section),
            )

    @convert_to_fields
    def _get_lookup_fields(self, name: str, value: str):
        """
        Extracts the lookup fields

        Args:
            name (str): key in the configuration settings
            value (str): value of the respective name in the configuration

        Returns:
            List of lookup fields
        """
        parsed_fields = self._parse_lookup(value)
        lookup_field_list = (
            parsed_fields["input_fields"] + parsed_fields["output_fields"]
        )

        # If the OUTPUT or OUTPUTNEW argument is never used, then get the fields from the csv file
        if not parsed_fields["output_fields"]:
            LOGGER.info(
                "OUTPUT fields not found classname=%s. Parsing the lookup csv file",
                name,
            )
            lookup_field_list += list(
                self.transforms_parser.get_lookup_csv_fields(
                    parsed_fields["lookup_stanza"]
                )
            )
        return list(set(lookup_field_list))

    def _parse_lookup(self, lookup: str):
        """
        Get list of lookup fields by parsing the lookup string.
        If a field is aliased to another field, take the aliased field into consideration

        Example::

            LOOKUP-class = lookup_stanza input_field OUTPUT output_field

        Args:
            lookup (str): Lookup string from props.conf

        Regex:
            Parse the fields from the lookup string. Examples,

            * field1 AS field2, field3 field4 as field5

        Returns:
            (dict):
                lookup_stanza (str): The stanza name for the lookup in question in transforms.conf
                input_fields (list): The fields in the input of the lookup
                output_fields (list): The fields in the output of the lookup
        """

        input_output_field_list = []
        lookup_stanza = lookup.split(" ")[0]
        lookup_str = " ".join(lookup.split(" ")[1:])

        # 0: Take the left side of the OUTPUT as input fields
        # -1: Take the right side of the OUTPUT as output fields
        for input_output_index in [0, -1]:
            # Ensure an OUTPUT separator exists so both splits below work even
            # when the lookup declares no explicit OUTPUT clause.
            if "OUTPUT" not in lookup_str:
                lookup_str += " OUTPUT "

            # Take input fields or output fields depending on the input_output_index
            input_output_str = lookup_str.split("OUTPUTNEW")[input_output_index].split(
                "OUTPUT"
            )[input_output_index]

            field_parser = r"(\"(?:\\\"|[^\"])*\"|\'(?:\\\'|[^\'])*\'|[^\s,]+)\s*(?:[aA][sS]\s+(\"(?:\\\"|[^\"])*\"|\'(?:\\\'|[^\'])*\'|[^\s,]+))?"
            # field_groups: Group of max 2 fields - (source, destination) for "source as destination"
            field_groups = re.findall(field_parser, input_output_str)

            field_list = []
            # Take the last non-empty field from a field group.
            # Taking last non-empty field ensures that the aliased value will have
            # higher priority
            for each_group in field_groups:
                field_list.append(
                    [each_field for each_field in reversed(each_group) if each_field][0]
                )

            input_output_field_list.append(field_list)
        return {
            "input_fields": input_output_field_list[0],
            "output_fields": input_output_field_list[1],
            "lookup_stanza": lookup_stanza,
        }

    def _get_transforms_sourcetypes(self, stanza_name, stanza_values):
        """
        Extract sourcetypes defined via TRANSFORMS directives.

        Scans every TRANSFORMS-* key of the stanza and extracts the
        sourcetypes from the referenced transform stanzas. Because the whole
        stanza is scanned here, callers must invoke this at most once per
        stanza.

        Args:
            stanza_name (str): Name of the props.conf stanza
            stanza_values (dict): Dictionary of stanza key-value pairs

        Yields:
            Field group dictionaries with empty fields list for sourcetype coverage testing
        """
        LOGGER.info("Getting transforms sourcetypes for stanza: %s", stanza_name)
        sourcetype_transforms_pattern = re.compile(r"TRANSFORMS-.*", re.IGNORECASE)
        seen_sourcetypes = set()

        for key, value in stanza_values.items():
            if not sourcetype_transforms_pattern.match(key):
                continue

            LOGGER.debug(
                "Found TRANSFORMS sourcetype directive: %s=%s in stanza %s",
                key,
                value,
                stanza_name,
            )

            transform_stanzas = [s.strip() for s in value.split(",")]

            for transform_stanza in transform_stanzas:
                if not transform_stanza:
                    continue

                sourcetype = self.transforms_parser.get_sourcetype_from_transform(
                    transform_stanza
                )

                if sourcetype and sourcetype not in seen_sourcetypes:
                    seen_sourcetypes.add(sourcetype)
                    LOGGER.info(
                        "Found TRANSFORMS-defined sourcetype: %s (from transform %s)",
                        sourcetype,
                        transform_stanza,
                    )
                    yield {
                        "stanza": sourcetype,
                        "stanza_type": "sourcetype",
                        "classname": f"TRANSFORMS-sourcetype::{transform_stanza}",
                        "fields": [],  # Empty fields triggers coverage test
                    }

get_list_of_sources(source) staticmethod

For source with | (OR), it will return all combinations. Uses itertools.product to list the combinations

Example::

input "(preA|preB)str(postX|postY)"
output [
    preAstrpostX
    preBstrpostX
    preAstrpostY
    preBstrpostY
]

Parameters:

Name Type Description Default
source str

Source name

required

Yields:

Type Description
Generator

generator of source name

Source code in pytest_splunk_addon/addon_parser/props_parser.py
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
@staticmethod
def get_list_of_sources(source: str) -> Generator:
    """
    For source with | (OR), it will return all combinations.
    Uses itertools.product to list the combinations

    Example::

        input "(preA|preB)str(postX|postY)"
        output [
            preAstrpostX
            preBstrpostX
            preAstrpostY
            preBstrpostY
        ]

    Args:
        source (str): Source name

    Yields:
        generator of source name
    """
    LOGGER.debug("Finding combinations of a source..")
    raw_value = re.search(r"source::(.*)", source).group(1).replace("...", "*")
    group_pattern = r"\([^\)]+\)"
    alternatives = [
        grp.strip("()").split("|") for grp in re.findall(group_pattern, raw_value)
    ]
    template = re.sub(group_pattern, "{}", raw_value)
    emitted = 0
    for combo in product(*alternatives):
        emitted += 1
        yield template.format(*combo)
    LOGGER.debug("Found %d combinations", emitted)

get_props_fields()

Parse the props.conf and yield all supported fields

Yields:

Type Description

generator of all the supported fields

Source code in pytest_splunk_addon/addon_parser/props_parser.py
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
def get_props_fields(self):
    """
    Parse the props.conf and yield all supported fields

    Yields:
        generator of all the supported fields
    """
    for stanza_type, stanza_name, stanza_values in self._get_props_stanzas():
        # BUGFIX: _get_transforms_sourcetypes() scans *every* TRANSFORMS key
        # of the stanza by itself, so it must run at most once per stanza.
        # Previously it was invoked once per matching key, which yielded each
        # sourcetype group N times for a stanza with N TRANSFORMS keys.
        transforms_handled = False
        for key, value in stanza_values.items():
            LOGGER.info(f"Parsing parameter={key} of stanza={stanza_name}")
            if re.match("REPORT", key, re.IGNORECASE):
                for transform_stanza, fields in self._get_report_fields(key, value):
                    field_list = list(fields)
                    if field_list:
                        yield {
                            "stanza": stanza_name,
                            "stanza_type": stanza_type,
                            "classname": f"{key}::{transform_stanza}",
                            "fields": field_list,
                        }
            elif re.match("TRANSFORM", key, re.IGNORECASE):
                if not transforms_handled:
                    transforms_handled = True
                    yield from self._get_transforms_sourcetypes(
                        stanza_name, stanza_values
                    )
            else:
                LOGGER.info(f"Trying to parse classname={key}")
                parsing_method = self._get_props_method(key)
                if parsing_method:
                    field_list = list(parsing_method(key, value))
                    if field_list:
                        yield {
                            "stanza": stanza_name,
                            "stanza_type": stanza_type,
                            "classname": key,
                            "fields": field_list,
                        }

EventtypeParser

Provides eventtypes.conf parsing mechanism

EventTypeParser

Bases: object

Parses eventtypes.conf and extracts eventtypes

Parameters:

Name Type Description Default
splunk_app_path str

Path of the Splunk app

required
Source code in pytest_splunk_addon/addon_parser/eventtype_parser.py
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
class EventTypeParser(object):
    """
    Parses eventtypes.conf and extracts eventtypes

    Args:
        splunk_app_path (str): Path of the Splunk app
        eventtypes_data (Optional[Dict]): Pre-parsed eventtypes stanzas; when
            provided, eventtypes.conf is not read from disk.
    """

    def __init__(self, splunk_app_path: str, eventtypes_data: Optional[Dict] = None):
        self._conf_parser = conf_parser.TABConfigParser()
        self.splunk_app_path = splunk_app_path
        self._eventtypes = eventtypes_data

    @property
    def eventtypes(self) -> Optional[Dict]:
        # Injected/previously-parsed data short-circuits the disk read.
        if self._eventtypes is not None:
            return self._eventtypes
        conf_path = os.path.join(self.splunk_app_path, "default", "eventtypes.conf")
        LOGGER.info("Parsing eventtypes.conf")
        self._conf_parser.read(conf_path)
        self._eventtypes = self._conf_parser.item_dict()
        return self._eventtypes or None

    def get_eventtypes(self) -> Optional[Generator]:
        """
        Parse the App configuration files & yield eventtypes

        Yields:
            generator of list of eventtypes
        """
        stanzas = self.eventtypes
        if not stanzas:
            return None
        for stanza_key in stanzas:
            LOGGER.info("Parsing eventtype stanza=%s", stanza_key)
            yield {"stanza": stanza_key}

get_eventtypes()

Parse the App configuration files & yield eventtypes

Yields:

Type Description
Optional[Generator]

generator of list of eventtypes

Source code in pytest_splunk_addon/addon_parser/eventtype_parser.py
55
56
57
58
59
60
61
62
63
64
65
66
def get_eventtypes(self) -> Optional[Generator]:
    """
    Parse the App configuration files & yield eventtypes

    Yields:
        generator of list of eventtypes
    """
    stanzas = self.eventtypes
    if not stanzas:
        return None
    for stanza_key in stanzas:
        LOGGER.info("Parsing eventtype stanza=%s", stanza_key)
        yield {"stanza": stanza_key}

Field

Provides the Field class containing all the field properties and a decorator to convert a list to field list

Field

Bases: object

Contains the field properties

  • name (str): name of the field
  • type (str): Field type. Supported [required, conditional, optional]
  • multi_value (bool): True if field is multi value field
  • expected_values (list): The field should have this expected values
  • negative_values (list): The field should not have negative values
  • condition (spl): The field should only be checked if the condition satisfies
  • validity (eval): eval statement to extract the valid fields only

Parameters:

Name Type Description Default
field_json dict

dictionary containing field properties

None
Source code in pytest_splunk_addon/addon_parser/fields.py
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
class Field(object):
    """
    Contains the field properties

    * name (str): name of the field
    * type (str): Field type. Supported [required, conditional, optional]
    * multi_value (bool): True if field is multi value field
    * expected_values (list): The field should have this expected values
    * negative_values (list): The field should not have negative values
    * condition (spl): The field should only be checked if the condition satisfies
    * validity (eval): eval statement to extract the valid fields only

    Args:
        field_json (dict): dictionary containing field properties
    """

    SUPPORTED_TYPES = ["required", "conditional", "optional"]

    def __init__(self, field_json=None):
        # Treat a missing dict as empty so Field() does not raise
        # AttributeError on None.get(...).
        field_json = field_json or {}
        self.name = field_json.get("name")
        self.type = field_json.get("type") or "required"
        self.multi_value = field_json.get("multi_value") or False
        self.expected_values = field_json.get("expected_values", ["*"])
        self.negative_values = field_json.get("negative_values", ["-", ""])
        self.condition = field_json.get("condition") or ""
        # validity defaults to the field's own name when not provided
        self.validity = field_json.get("validity") or self.name

    def __str__(self):
        return str(self.name)

    def __eq__(self, other):
        # Compare the full property set; return NotImplemented for
        # non-Field operands instead of raising AttributeError.
        if not isinstance(other, Field):
            return NotImplemented
        return self.__dict__ == other.__dict__

    def __lt__(self, other: "Field"):
        return self.name < other.name

    def __repr__(self):
        return f"<Field name={self.name}>"

    def get_type(self):
        return self.type

    @classmethod
    def parse_fields(cls, field_list, **kwargs):
        """
        Parse the fields from a list

        Args:
            field_list (list): list of field property dicts
            **kwargs: defaults applied to every field; per-field values
                take precedence

        Yields:
            Field: one instance per entry in field_list
        """
        # Use cls (not Field) so subclasses yield instances of themselves.
        for each_field in field_list:
            yield cls(dict(kwargs, **each_field))

    def get_properties(self):
        """Return a human-readable, multi-line summary of all properties."""
        return (
            f"{self.name}"
            f"\ntype={self.type}"
            f"\nmulti_value={self.multi_value}"
            f"\ncondition={self.condition}"
            f"\nvalidity={self.validity}"
            f"\nexpected_values={self.expected_values}"
            f"\nnegative_values={self.negative_values}"
        )

parse_fields(field_list, **kwargs) classmethod

Parse the fields from a list

Parameters:

Name Type Description Default
field_list list

list of field names

required
Source code in pytest_splunk_addon/addon_parser/fields.py
67
68
69
70
71
72
73
74
75
76
@classmethod
def parse_fields(cls, field_list, **kwargs):
    """
    Parse the fields from a list

    Args:
        field_list (list): list of field dicts; each entry's keys
            override the common **kwargs properties

    Yields:
        Field: one instance per entry in field_list
    """
    for each_fields in field_list:
        yield Field(dict(kwargs, **each_fields))

convert_to_fields(func)

Decorator to initialize the list of fields

Source code in pytest_splunk_addon/addon_parser/fields.py
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
def convert_to_fields(func):
    """
    Decorator to initialize the list of fields

    Wraps a generator of field-name strings and yields a Field object
    for every truthy name it produces.
    """

    @wraps(func)
    def inner_func(*args, **kwargs):
        for field_name in func(*args, **kwargs):
            if not field_name:
                continue
            yield Field({"name": field_name})

    return inner_func

TagsParser

Provides tags.conf parsing mechanism

TagsParser

Parses tags.conf and extracts tags

Parameters:

Name Type Description Default
splunk_app_path str

Path of the Splunk app

required
Source code in pytest_splunk_addon/addon_parser/tags_parser.py
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
class TagsParser:
    """
    Parses tags.conf and extracts tags

    Args:
        splunk_app_path (str): Path of the Splunk app
        tags_data (dict, optional): pre-parsed tags.conf content; when
            given, the conf file is never read
    """

    def __init__(self, splunk_app_path: str, tags_data: Optional[Dict] = None):
        self.splunk_app_path = splunk_app_path
        self._tags = tags_data
        self._conf_parser = conf_parser.TABConfigParser()

    @property
    def tags(self) -> Optional[Dict]:
        # Lazily parse default/tags.conf on first access and memoize.
        if self._tags is not None:
            return self._tags
        conf_path = os.path.join(self.splunk_app_path, "default", "tags.conf")
        LOGGER.info("Parsing tags.conf")
        self._conf_parser.read(conf_path)
        self._tags = self._conf_parser.item_dict()
        return self._tags or None

    def get_tags(self) -> Optional[Generator]:
        """
        Parse the tags.conf of the App & yield stanzas

        Yields:
            generator of stanzas from the tags
        """
        parsed = self.tags
        if not parsed:
            return
        for raw_stanza, tag_map in parsed.items():
            LOGGER.info(f"Parsing tags of stanza={raw_stanza}")
            # Quote the stanza value (eventtype=foo -> eventtype="foo")
            # and URL-decode it.
            stanza_key = unquote(raw_stanza.replace("=", '="') + '"')
            LOGGER.debug(f"Parsed tags-stanza={stanza_key}")
            for tag_name, status in tag_map.items():
                LOGGER.info(f"Parsing tag={tag_name} enabled={status} of stanza={stanza_key}")
                yield {
                    "stanza": stanza_key,
                    "tag": tag_name,
                    "enabled": status == "enabled",
                }

get_tags()

Parse the tags.conf of the App & yield stanzas

Yields:

Type Description
Optional[Generator]

generator of stanzas from the tags

Source code in pytest_splunk_addon/addon_parser/tags_parser.py
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
def get_tags(self) -> Optional[Generator]:
    """
    Parse the tags.conf of the App & yield stanzas

    Yields:
        generator of stanzas from the tags; each item is a dict with
        "stanza", "tag" and "enabled" keys
    """
    if not self.tags:
        return
    for stanza_key, stanza_values in self.tags.items():
        LOGGER.info(f"Parsing tags of stanza={stanza_key}")
        # Quote the stanza value (eventtype=foo -> eventtype="foo") and URL-decode it
        stanza_key = stanza_key.replace("=", '="') + '"'
        stanza_key = unquote(stanza_key)
        LOGGER.debug(f"Parsed tags-stanza={stanza_key}")
        for key, value in stanza_values.items():
            LOGGER.info(f"Parsing tag={key} enabled={value} of stanza={stanza_key}")
            tag_container = {
                "stanza": stanza_key,
                "tag": key,
                # tags.conf marks each tag as "enabled" or "disabled"
                "enabled": True if value == "enabled" else False,
            }
            yield tag_container

TransformsParser

Provides transforms.conf parsing mechanism

TransformsParser

Bases: object

Parses transforms.conf and extracts fields

Parameters:

Name Type Description Default
splunk_app_path str

Path of the Splunk app

required
Source code in pytest_splunk_addon/addon_parser/transforms_parser.py
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
class TransformsParser(object):
    """
    Parses transforms.conf and extracts fields

    Args:
        splunk_app_path (str): Path of the Splunk app
        transforms_data (dict, optional): pre-parsed transforms.conf
            content; when provided, the conf file is not read
    """

    def __init__(self, splunk_app_path: str, transforms_data: Optional[Dict] = None):
        self._conf_parser = conf_parser.TABConfigParser()
        self.splunk_app_path = splunk_app_path
        self._transforms = transforms_data
        self._parser_cache = ParserCache()

    @property
    def transforms(self) -> Optional[Dict]:
        """Lazily parsed transforms.conf as a dict, or None when empty."""
        if self._transforms is not None:
            return self._transforms

        def _parse_transforms():
            # Read default/transforms.conf only once; the result is
            # memoized through the shared ParserCache.
            transforms_conf_path = os.path.join(
                self.splunk_app_path, "default", "transforms.conf"
            )
            LOGGER.info("Parsing transforms.conf")
            self._conf_parser.read(transforms_conf_path)
            return self._conf_parser.item_dict()

        self._transforms = self._parser_cache.get_or_parse(
            _parse_transforms, "transforms"
        )
        return self._transforms if self._transforms else None

    @convert_to_fields
    def get_transform_fields(self, transforms_stanza: str) -> Optional[Generator]:
        """
        Parse the transforms.conf of the App & yield fields of
        a specific stanza.

        Supported extractions from transforms.conf are

        * SOURCE_KEY = _raw
        * REGEX = some regex with (capturing_group)
        * FIELDS = one, two, three (comma-separated list)
        * FORMAT = field1::value1 field2::value2

        Args:
            transforms_stanza (str):
                The stanza of which the fields should be extracted

        Regex:
            Parse the fields from a regex. Examples::

                (?<name>regex)
                (?'name'regex)
                (?P<name>regex)

        Yields:
            generator of fields
        """

        try:
            if not self.transforms:
                return
            transforms_values = self.transforms[transforms_stanza]
            if "SOURCE_KEY" in transforms_values:
                LOGGER.info(f"Parsing source_key of {transforms_stanza}")
                yield transforms_values["SOURCE_KEY"]
            if "REGEX" in transforms_values:
                LOGGER.info(f"Parsing REGEX of {transforms_stanza}")

                # Named capture groups in any of the three syntaxes; the
                # lookahead skips Splunk's _KEY/_VAL bookkeeping groups.
                regex = r"\(\?P?[<'](?!_KEY|_VAL)([A-Za-z0-9_]+)[>']"
                match_fields = re.findall(regex, transforms_values["REGEX"])
                for each_field in match_fields:
                    # Defensive re-check; the regex lookahead should
                    # already have excluded these prefixes.
                    if not each_field.startswith(("_KEY_", "_VAL_")):
                        yield each_field.strip()
            if "FIELDS" in transforms_values:
                LOGGER.info(f"Parsing FIELDS of {transforms_stanza}")
                fields_values = transforms_values["FIELDS"]
                for each_field in fields_values.split(","):
                    yield each_field.strip()
            if "FORMAT" in transforms_values:
                LOGGER.info(f"Parsing FORMAT of {transforms_stanza}")
                regex = r"(\S*)::"
                match_fields = re.findall(regex, transforms_values["FORMAT"])
                for each_field in match_fields:
                    # Skip positional references such as $1::$2
                    if "$" not in each_field:
                        yield each_field.strip()
        except KeyError:
            LOGGER.error(
                f"The stanza {transforms_stanza} does not exist in transforms.conf."
            )

    def get_lookup_csv_fields(self, lookup_stanza: str) -> Optional[Generator]:
        """
        Parse the fields from a lookup file for a specific lookup_stanza

        Args:
            lookup_stanza (str): A lookup stanza mentioned in transforms.conf

        Yields:
            string of field names
        """
        if not self.transforms:
            return
        if lookup_stanza in self.transforms.keys():
            stanza_values = self.transforms[lookup_stanza]
            if "filename" in stanza_values:
                lookup_file = stanza_values["filename"]
                try:
                    # Lookup CSVs live under <app>/lookups/
                    location = os.path.join(
                        self.splunk_app_path, "lookups", lookup_file
                    )
                    with open(location) as csv_file:
                        reader = csv.DictReader(csv_file)
                        # The CSV header row provides the field names;
                        # fieldnames is None for an empty file (TypeError below)
                        fieldnames = reader.fieldnames
                        for items in fieldnames:
                            yield items.strip()
                # If there is an error, the test should fail with the current fields.
                # This makes sure the test doesn't exit prematurely.
                # (IOError is an alias of OSError since Python 3.3, so only
                # OSError is listed.)
                except (OSError, UnboundLocalError, TypeError) as e:
                    LOGGER.error(
                        "Could not read the lookup file, skipping test. error=%s",
                        str(e),
                    )

    def get_sourcetype_from_transform(self, transform_stanza: str) -> Optional[str]:
        """
        Extract sourcetype from a transform stanza's FORMAT field.

        Looks for FORMAT field with pattern: sourcetype::<sourcetype_name>

        Args:
            transform_stanza (str): Name of the transform stanza in transforms.conf

        Returns:
            Extracted sourcetype name or None if not found

        Example:
            If transforms.conf has:
            [gcp_pubsub_activity_sourcetype]
            FORMAT = sourcetype::google:gcp:pubsub:audit:admin_activity

            Then get_sourcetype_from_transform("gcp_pubsub_activity_sourcetype")
            returns "google:gcp:pubsub:audit:admin_activity"
        """
        if not self.transforms:
            return None

        try:
            transforms_values = self.transforms[transform_stanza]
            if "FORMAT" not in transforms_values:
                return None

            format_value = transforms_values["FORMAT"]

            # Skip if format contains $ variables (like $1, $2, etc.)
            if "$" in format_value:
                LOGGER.debug(
                    "Skipping transform %s: FORMAT contains variables (%s)",
                    transform_stanza,
                    format_value,
                )
                return None

            # Match pattern: sourcetype::<sourcetype_name>
            # Case-insensitive, handles whitespace, handles quoted values
            regex = r"(?i)sourcetype\s*::\s*([^\s]+)"
            match = re.search(regex, format_value)

            if match:
                sourcetype = match.group(1).strip()
                # Remove quotes if present
                sourcetype = sourcetype.strip("\"'")
                LOGGER.debug(
                    "Extracted sourcetype %s from transform %s",
                    sourcetype,
                    transform_stanza,
                )
                return sourcetype

            return None
        except KeyError:
            LOGGER.warning(
                "Transform stanza %s not found in transforms.conf", transform_stanza
            )
            return None
        except Exception as e:
            # Broad catch is deliberate: sourcetype extraction is best-effort
            # and must never break the caller.
            LOGGER.warning(
                "Error extracting sourcetype from transform %s: %s",
                transform_stanza,
                str(e),
            )
            return None

get_lookup_csv_fields(lookup_stanza)

Parse the fields from a lookup file for a specific lookup_stanza

Parameters:

Name Type Description Default
lookup_stanza str

A lookup stanza mentioned in transforms.conf

required

Yields:

Type Description
Optional[Generator]

string of field names

Source code in pytest_splunk_addon/addon_parser/transforms_parser.py
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
def get_lookup_csv_fields(self, lookup_stanza: str) -> Optional[Generator]:
    """
    Parse the fields from a lookup file for a specific lookup_stanza

    Args:
        lookup_stanza (str): A lookup stanza mentioned in transforms.conf

    Yields:
        string of field names
    """
    if not self.transforms:
        return
    if lookup_stanza in self.transforms.keys():
        stanza_values = self.transforms[lookup_stanza]
        if "filename" in stanza_values:
            lookup_file = stanza_values["filename"]
            try:
                # Lookup CSVs live under <app>/lookups/
                location = os.path.join(
                    self.splunk_app_path, "lookups", lookup_file
                )
                with open(location) as csv_file:
                    reader = csv.DictReader(csv_file)
                    # Header row of the CSV provides the field names
                    fieldnames = reader.fieldnames
                    for items in fieldnames:
                        yield items.strip()
            # If there is an error. the test should fail with the current fields
            # This makes sure the test doesn't exit prematurely
            except (OSError, IOError, UnboundLocalError, TypeError) as e:
                LOGGER.error(
                    "Could not read the lookup file, skipping test. error=%s",
                    str(e),
                )

get_sourcetype_from_transform(transform_stanza)

Extract sourcetype from a transform stanza’s FORMAT field.

Looks for FORMAT field with pattern: sourcetype::<sourcetype_name>

Parameters:

Name Type Description Default
transform_stanza str

Name of the transform stanza in transforms.conf

required

Returns:

Type Description
Optional[str]

Extracted sourcetype name or None if not found

Example

If transforms.conf has: [gcp_pubsub_activity_sourcetype] FORMAT = sourcetype::google:gcp:pubsub:audit:admin_activity

Then get_sourcetype_from_transform(“gcp_pubsub_activity_sourcetype”) returns “google:gcp:pubsub:audit:admin_activity”

Source code in pytest_splunk_addon/addon_parser/transforms_parser.py
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
def get_sourcetype_from_transform(self, transform_stanza: str) -> Optional[str]:
    """
    Extract sourcetype from a transform stanza's FORMAT field.

    Looks for FORMAT field with pattern: sourcetype::<sourcetype_name>

    Args:
        transform_stanza (str): Name of the transform stanza in transforms.conf

    Returns:
        Extracted sourcetype name or None if not found

    Example:
        If transforms.conf has:
        [gcp_pubsub_activity_sourcetype]
        FORMAT = sourcetype::google:gcp:pubsub:audit:admin_activity

        Then get_sourcetype_from_transform("gcp_pubsub_activity_sourcetype")
        returns "google:gcp:pubsub:audit:admin_activity"
    """
    # No parsed transforms.conf content -> nothing to extract
    if not self.transforms:
        return None

    try:
        transforms_values = self.transforms[transform_stanza]
        if "FORMAT" not in transforms_values:
            return None

        format_value = transforms_values["FORMAT"]

        # Skip if format contains $ variables (like $1, $2, etc.)
        if "$" in format_value:
            LOGGER.debug(
                "Skipping transform %s: FORMAT contains variables (%s)",
                transform_stanza,
                format_value,
            )
            return None

        # Match pattern: sourcetype::<sourcetype_name>
        # Case-insensitive, handles whitespace, handles quoted values
        regex = r"(?i)sourcetype\s*::\s*([^\s]+)"
        match = re.search(regex, format_value)

        if match:
            sourcetype = match.group(1).strip()
            # Remove quotes if present
            sourcetype = sourcetype.strip("\"'")
            LOGGER.debug(
                "Extracted sourcetype %s from transform %s",
                sourcetype,
                transform_stanza,
            )
            return sourcetype

        return None
    except KeyError:
        LOGGER.warning(
            "Transform stanza %s not found in transforms.conf", transform_stanza
        )
        return None
    except Exception as e:
        # Best-effort extraction: any unexpected failure is logged, not raised
        LOGGER.warning(
            "Error extracting sourcetype from transform %s: %s",
            transform_stanza,
            str(e),
        )
        return None

get_transform_fields(transforms_stanza)

Parse the transforms.conf of the App & yield fields of a specific stanza.

Supported extractions from transforms.conf are

  • SOURCE_KEY = _raw
  • REGEX = some regex with (capturing_group)
  • FIELDS = one,

Parameters:

Name Type Description Default
transforms_stanza str

The stanza of which the fields should be extracted

required
Regex

Parse the fields from a regex. Examples::

(?<name>regex)
(?'name'regex)
(?P<name>regex)

Yields:

Type Description
Optional[Generator]

generator of fields

Source code in pytest_splunk_addon/addon_parser/transforms_parser.py
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
@convert_to_fields
def get_transform_fields(self, transforms_stanza: str) -> Optional[Generator]:
    """
    Parse the transforms.conf of the App & yield fields of
    a specific stanza.

    Supported extractions from transforms.conf are

    * SOURCE_KEY = _raw
    * REGEX = some regex with (capturing_group)
    * FIELDS = one,

    Args:
        transforms_stanza (str):
            The stanza of which the fields should be extracted

    Regex:
        Parse the fields from a regex. Examples::

            (?<name>regex)
            (?'name'regex)
            (?P<name>regex)

    Yields:
        generator of fields
    """

    try:
        if not self.transforms:
            return
        transforms_values = self.transforms[transforms_stanza]
        if "SOURCE_KEY" in transforms_values:
            LOGGER.info(f"Parsing source_key of {transforms_stanza}")
            yield transforms_values["SOURCE_KEY"]
        if "REGEX" in transforms_values:
            LOGGER.info(f"Parsing REGEX of {transforms_stanza}")

            # Named capture groups; the lookahead skips Splunk's
            # _KEY/_VAL bookkeeping groups.
            regex = r"\(\?P?[<'](?!_KEY|_VAL)([A-Za-z0-9_]+)[>']"
            match_fields = re.findall(regex, transforms_values["REGEX"])
            for each_field in match_fields:
                if not each_field.startswith(("_KEY_", "_VAL_")):
                    yield each_field.strip()
        if "FIELDS" in transforms_values:
            LOGGER.info(f"Parsing FIELDS of {transforms_stanza}")
            fields_values = transforms_values["FIELDS"]
            for each_field in fields_values.split(","):
                yield each_field.strip()
        if "FORMAT" in transforms_values:
            LOGGER.info(f"Parsing FORMAT of {transforms_stanza}")
            regex = r"(\S*)::"
            match_fields = re.findall(regex, transforms_values["FORMAT"])
            for each_field in match_fields:
                # Skip positional references such as $1::$2
                if "$" not in each_field:
                    yield each_field.strip()
    except KeyError:
        LOGGER.error(
            f"The stanza {transforms_stanza} does not exists in transforms.conf."
        )

SavedsearchesParser

Provides savedsearches.conf parsing mechanism

SavedSearchParser

Bases: object

Parses savedsearches.conf and extracts savedsearches

Parameters:

Name Type Description Default
splunk_app_path str

Path of the Splunk app

required
Source code in pytest_splunk_addon/addon_parser/savedsearches_parser.py
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
class SavedSearchParser(object):
    """
    Parses savedsearches.conf and extracts savedsearches

    Args:
        splunk_app_path (str): Path of the Splunk app
        savedsearches_data (dict, optional): pre-parsed savedsearches.conf
            content; when given, the conf file is never read
    """

    def __init__(self, splunk_app_path: str, savedsearches_data: Optional[Dict] = None):
        self.splunk_app_path = splunk_app_path
        self._savedsearches = savedsearches_data
        self._conf_parser = conf_parser.TABConfigParser()

    @property
    def savedsearches(self) -> Optional[Dict]:
        # Lazily read default/savedsearches.conf on first access and memoize.
        if self._savedsearches is not None:
            return self._savedsearches
        conf_path = os.path.join(
            self.splunk_app_path, "default", "savedsearches.conf"
        )
        LOGGER.info("Parsing savedsearches.conf")
        self._conf_parser.read(conf_path)
        self._savedsearches = self._conf_parser.item_dict()
        return self._savedsearches or None

    def get_savedsearches(self) -> Optional[Generator]:
        """
        Parse the App configuration files & yield savedsearches

        Yields:
            generator of list of savedsearches
        """
        if not self.savedsearches:
            return None
        keys_of_interest = ("search", "dispatch.earliest_time", "dispatch.latest_time")
        blank_markers = ["None", "", " "]
        for stanza_key, stanza_values in self.savedsearches.items():
            LOGGER.info(f"Parsing savedsearches of stanza={stanza_key}")
            # Start from defaults; real values from the stanza override them.
            container = {
                "stanza": stanza_key,
                "search": 'index = "main"',
                "dispatch.earliest_time": "0",
                "dispatch.latest_time": "now",
            }
            for key, value in stanza_values.items():
                if key in keys_of_interest and value not in blank_markers:
                    container[key] = value
            yield container

get_savedsearches()

Parse the App configuration files & yield savedsearches

Yields:

Type Description
Optional[Generator]

generator of list of savedsearches

Source code in pytest_splunk_addon/addon_parser/savedsearches_parser.py
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
def get_savedsearches(self) -> Optional[Generator]:
    """
    Parse the App configuration files & yield savedsearches

    Yields:
        generator of list of savedsearches; each item is a dict with
        "stanza", "search" and dispatch time-range keys
    """
    if not self.savedsearches:
        return None
    for stanza_key, stanza_values in self.savedsearches.items():
        LOGGER.info(f"Parsing savedsearches of stanza={stanza_key}")
        # Defaults used when the stanza omits (or blanks) these keys
        savedsearch_container = {
            "stanza": stanza_key,
            "search": 'index = "main"',
            "dispatch.earliest_time": "0",
            "dispatch.latest_time": "now",
        }
        empty_value = ["None", "", " "]
        for key, value in stanza_values.items():
            if key in ("search", "dispatch.earliest_time", "dispatch.latest_time"):
                if value not in empty_value:
                    savedsearch_container[key] = value
        yield savedsearch_container