Skip to content

AddonParser

The module provides the Add-on parsing mechanism. It can parse the knowledge objects from an Add-on’s configuration files.

Supports: fields from props & transforms, tags, eventtypes

AddonParser

Bases: object

Parse the knowledge objects from an Add-on’s configuration files. Supports: fields from props & transforms, tags, eventtypes

Parameters:

Name Type Description Default
splunk_app_path str

Path to the Splunk App

required
Source code in pytest_splunk_addon/addon_parser/__init__.py
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
class AddonParser(object):
    """
    Facade over the per-file parsers of a Splunk Add-on.
    Supports: fields from props & transforms, tags, eventtypes

    Every underlying parser is created on first access through its property
    and is reused afterwards.

    Args:
        splunk_app_path (str): Path to the Splunk App
    """

    def __init__(self, splunk_app_path):
        self.splunk_app_path = splunk_app_path
        # Parser instances are built lazily by the properties below.
        self._props_parser = None
        self._tags_parser = None
        self._eventtype_parser = None
        self._savedsearch_parser = None

    @property
    def props_parser(self):
        """PropsParser for the app, created on first access."""
        parser = self._props_parser
        if not parser:
            parser = self._props_parser = PropsParser(self.splunk_app_path)
        return parser

    @property
    def tags_parser(self):
        """TagsParser for the app, created on first access."""
        parser = self._tags_parser
        if not parser:
            parser = self._tags_parser = TagsParser(self.splunk_app_path)
        return parser

    @property
    def eventtype_parser(self):
        """EventTypeParser for the app, created on first access."""
        parser = self._eventtype_parser
        if not parser:
            parser = self._eventtype_parser = EventTypeParser(self.splunk_app_path)
        return parser

    @property
    def savedsearch_parser(self):
        """SavedSearchParser for the app, created on first access."""
        parser = self._savedsearch_parser
        if not parser:
            parser = self._savedsearch_parser = SavedSearchParser(self.splunk_app_path)
        return parser

    def get_props_fields(self):
        """
        Delegate to the props parser to obtain the supported fields

        Returns:
            result of ``props_parser.get_props_fields()``
        """
        parser = self.props_parser
        return parser.get_props_fields()

    def get_tags(self):
        """
        Delegate to the tags parser to obtain the tag stanzas

        Returns:
            result of ``tags_parser.get_tags()``
        """
        parser = self.tags_parser
        return parser.get_tags()

    def get_eventtypes(self):
        """
        Delegate to the eventtype parser to obtain the eventtypes

        Returns:
            result of ``eventtype_parser.get_eventtypes()``
        """
        parser = self.eventtype_parser
        return parser.get_eventtypes()

    def get_savedsearches(self):
        """
        Delegate to the savedsearch parser to obtain the saved searches

        Returns:
            result of ``savedsearch_parser.get_savedsearches()``
        """
        parser = self.savedsearch_parser
        return parser.get_savedsearches()

get_eventtypes()

Parse the App configuration files & yield eventtypes

Yields:

Type Description

generator of list of eventtypes

Source code in pytest_splunk_addon/addon_parser/__init__.py
 95
 96
 97
 98
 99
100
101
102
def get_eventtypes(self):
    """
    Delegate to the eventtype parser to obtain the eventtypes

    Returns:
        result of ``eventtype_parser.get_eventtypes()``
    """
    parser = self.eventtype_parser
    return parser.get_eventtypes()

get_props_fields()

Parse the props.conf and yield all supported fields

Yields:

Type Description

generator of all the supported fields

Source code in pytest_splunk_addon/addon_parser/__init__.py
77
78
79
80
81
82
83
84
def get_props_fields(self):
    """
    Delegate to the props parser to obtain the supported fields

    Returns:
        result of ``props_parser.get_props_fields()``
    """
    parser = self.props_parser
    return parser.get_props_fields()

get_savedsearches()

Parse the App configuration files & yield saved searches

Yields:

Type Description

generator of list of saved searches

Source code in pytest_splunk_addon/addon_parser/__init__.py
104
105
106
107
108
109
110
111
def get_savedsearches(self):
    """
    Delegate to the savedsearch parser to obtain the saved searches

    Returns:
        result of ``savedsearch_parser.get_savedsearches()``
    """
    parser = self.savedsearch_parser
    return parser.get_savedsearches()

get_tags()

Parse the tags.conf of the App & yield stanzas

Yields:

Type Description

generator of stanzas from the tags

Source code in pytest_splunk_addon/addon_parser/__init__.py
86
87
88
89
90
91
92
93
def get_tags(self):
    """
    Delegate to the tags parser to obtain the tag stanzas

    Returns:
        result of ``tags_parser.get_tags()``
    """
    parser = self.tags_parser
    return parser.get_tags()

PropsParser

Provides props.conf parsing mechanism

PropsParser

Bases: object

Parses props.conf and extracts the fields.

Parameters:

Name Type Description Default
splunk_app_path str

Path of the Splunk app

required
Source code in pytest_splunk_addon/addon_parser/props_parser.py
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
class PropsParser(object):
    """
    Parses props.conf and extracts the fields.

    Args:
        splunk_app_path (str): Path of the Splunk app
    """

    def __init__(self, splunk_app_path: str):
        self._conf_parser = conf_parser.TABConfigParser()
        self.splunk_app_path = splunk_app_path
        self._props = None
        self.transforms_parser = TransformsParser(self.splunk_app_path)

    @property
    def props(self) -> Optional[Dict]:
        """
        Stanzas of ``default/props.conf``, parsed on first access and cached.

        Returns:
            The parsed stanzas. The access that performs the parsing returns
            None when the parsed result is empty.
        """
        if self._props is not None:
            return self._props
        props_conf_path = os.path.join(self.splunk_app_path, "default", "props.conf")
        LOGGER.info("Parsing props.conf")
        self._conf_parser.read(props_conf_path)
        self._props = self._conf_parser.item_dict()
        return self._props if self._props else None

    def get_props_fields(self):
        """
        Parse the props.conf and yield all supported fields

        Yields:
            dict with the keys ``stanza``, ``stanza_type``, ``classname`` and
            ``fields`` for every setting that produced at least one field
        """
        for stanza_type, stanza_name, stanza_values in self._get_props_stanzas():
            for key, value in stanza_values.items():
                LOGGER.info(f"Parsing parameter={key} of stanza={stanza_name}")
                if not re.match("REPORT", key, re.IGNORECASE):
                    LOGGER.info(f"Trying to parse classname={key}")
                    parsing_method = self._get_props_method(key)
                    if parsing_method:
                        field_list = list(parsing_method(key, value))
                        if field_list:
                            yield {
                                "stanza": stanza_name,
                                "stanza_type": stanza_type,
                                "classname": key,
                                "fields": field_list,
                            }
                else:
                    # A REPORT setting may reference several transforms; each
                    # one is reported as its own "<key>::<transform>" class.
                    for transform_stanza, fields in self._get_report_fields(key, value):
                        field_list = list(fields)
                        if field_list:
                            yield {
                                "stanza": stanza_name,
                                "stanza_type": stanza_type,
                                "classname": f"{key}::{transform_stanza}",
                                "fields": field_list,
                            }

    def _get_props_method(self, class_name: str):
        """
        Get the parsing method depending on classname

        Args:
            class_name (str): class name of the props property

        Returns:
            instance method to parse the property, or None when no parser
            matches the class name
        """
        method_mapping = {
            "EXTRACT": self._get_extract_fields,
            "EVAL": self._get_eval_fields,
            "FIELDALIAS": self._get_fieldalias_fields,
            "LOOKUP": self._get_lookup_fields,
        }
        for each_type, method in method_mapping.items():
            if re.match(each_type, class_name, re.IGNORECASE):
                LOGGER.info(f"Matched method of type={each_type}")
                return method
        # Only reached when nothing matched.
        if utils.check_first_worker():
            LOGGER.warning(f"No parser available for {class_name}. Skipping...")
        return None

    def _get_props_stanzas(self) -> Optional[Generator]:
        """
        Parse the props.conf of the App & yield stanzas.
        For source with | (OR), it will return all combinations

        Yields:
            tuple of (stanza_type, stanza_name, stanza_values) where
            stanza_type is "source" or "sourcetype"
        """
        if not self.props:
            return
        for stanza_name, stanza_values in self.props.items():
            if stanza_name.startswith("host::"):
                LOGGER.warning("Host stanza is not supported. Skipping..")
                continue
            if stanza_name.startswith("source::"):
                LOGGER.info(f"Parsing Source based stanza: {stanza_name}")
                for each_source in self.get_list_of_sources(stanza_name):
                    yield "source", each_source, stanza_values
            else:
                LOGGER.info(f"Parsing Sourcetype based stanza: {stanza_name}")
                yield "sourcetype", stanza_name, stanza_values

    @staticmethod
    def get_list_of_sources(source: str) -> Generator:
        """
        For source with | (OR), it will return all combinations.
        Uses itertools.product to list the combinations

        Example::

            input "(preA|preB)str(postX|postY)"
            output [
                preAstrpostX
                preAstrpostY
                preBstrpostX
                preBstrpostY
            ]

        Args:
            source (str): Source name

        Yields:
            generator of source name
        """
        LOGGER.debug("Finding combinations of a source..")
        group_pattern = r"\([^\)]+\)"
        match_obj = re.search(r"source::(.*)", source)
        value = match_obj.group(1).replace("...", "*")
        sub_group_list = [
            each_group.strip("()").split("|")
            for each_group in re.findall(group_pattern, value)
        ]
        # Escape literal braces so that str.format only substitutes the
        # placeholders inserted for the (a|b) groups.
        escaped_value = value.replace("{", "{{").replace("}", "}}")
        template = re.sub(group_pattern, "{}", escaped_value)
        count = 0
        for each_permutation in product(*sub_group_list):
            count += 1
            yield template.format(*each_permutation)
        LOGGER.debug("Found %d combinations", count)

    @convert_to_fields
    def _get_extract_fields(self, name: str, value: str):
        """
        Returns the fields parsed from EXTRACT

        Example::

            EXTRACT-one = regex with (?<capturing_group>.*)

        Args:
            name (str): key in the configuration settings
            value (str): value of the respective name in the configuration

        Regex:
            Parse the fields from a regex. Examples,

            * (?<name>regex)
            * (?'name'regex)
            * (?P<name>regex)

        Yields:
            generator of fields
        """
        regex = r"\(\?P?(?:[<'])([^\>'\s]+)[\>']"
        for field in re.findall(regex, value):
            if not field.startswith(("_KEY_", "_VAL_")):
                yield field

        # If SOURCE_KEY is used in EXTRACT, generate the test for the same.
        # Case-insensitivity is passed as a flag: an inline (?i) that is not
        # at the start of the pattern is rejected by Python 3.11+.
        regex_for_source_key = r"(?:in\s+(\w+))\s*$"
        extract_source_key = re.search(
            regex_for_source_key, value, re.MULTILINE | re.IGNORECASE
        )
        if extract_source_key:
            LOGGER.info(f"Found a source key in {name}")
            yield extract_source_key.group(1)

    @convert_to_fields
    def _get_eval_fields(self, name, value):
        """
        Return the fields parsed from EVAL

        Example::

            EVAL-action = if(isnull(action), "unknown", action)

        Args:
            name (str): key in the configuration settings
            value (str): value of the respective name in the configuration

        Yields:
            generator of fields
        """
        regex = r"EVAL-(?P<FIELD>.*)"
        # A field evaluated to the literal null() is not reported.
        if not value == "null()":
            yield from re.findall(regex, name, re.IGNORECASE)

    @convert_to_fields
    def _get_fieldalias_fields(self, name: str, value: str):
        """
        Return the fields parsed from FIELDALIAS

        Example::

            FIELDALIAS-class = source AS dest, sc2 AS dest2

        Args:
            name (str): key in the configuration settings
            value (str): value of the respective name in the configuration

        Regex:
            Description:

            * Find all field alias group separated by space or comma

            Examples:

            * field_source AS field_destination
            * "Field Source" as "Field Destination"
            * field_source ASNEW 'Field Destination'
            * field_source asnew field_destination

        Returns:
            de-duplicated list of every source and destination field
        """
        # No inline (?i) here: re.IGNORECASE below already covers the whole
        # pattern, and a mid-pattern global flag is an error on Python 3.11+.
        regex = (
            r"(\"(?:\\\"|[^\"])*\"|\'(?:\\\'|[^\'])*\'|[^\s,]+)"
            r"\s+(?:as(?:new)?)\s+"
            r"(\"(?:\\\"|[^\"])*\"|\'(?:\\\'|[^\'])*\'|[^\s,]+)"
        )
        fields_tuples = re.findall(regex, value, re.IGNORECASE)
        return list({item for t in fields_tuples for item in t})

    def _get_report_fields(self, name: str, value: str):
        """
        Returns the fields parsed from REPORT

        In order to parse the fields REPORT, the method parses the
        transforms.conf and returns the list

        Args:
            name (str): key in the configuration settings
            value (str): value of the respective name in the configuration

        Yields:
            generator of (transform_stanza ,fields) parsed from transforms.conf
        """

        transforms_itr = (each_stanza.strip() for each_stanza in value.split(","))
        for transforms_section in transforms_itr:
            yield (
                transforms_section,
                self.transforms_parser.get_transform_fields(transforms_section),
            )

    @convert_to_fields
    def _get_lookup_fields(self, name: str, value: str):
        """
        Extracts the lookup fields

        Args:
            name (str): key in the configuration settings
            value (str): value of the respective name in the configuration

        Returns:
            List of lookup fields
        """
        parsed_fields = self._parse_lookup(value)
        lookup_field_list = (
            parsed_fields["input_fields"] + parsed_fields["output_fields"]
        )

        # If the OUTPUT or OUTPUTNEW argument is never used, then get the fields from the csv file
        if not parsed_fields["output_fields"]:
            LOGGER.info(
                "OUTPUT fields not found classname=%s. Parsing the lookup csv file",
                name,
            )
            lookup_field_list += list(
                self.transforms_parser.get_lookup_csv_fields(
                    parsed_fields["lookup_stanza"]
                )
            )
        return list(set(lookup_field_list))

    def _parse_lookup(self, lookup: str):
        """
        Get list of lookup fields by parsing the lookup string.
        If a field is aliased to another field, take the aliased field into consideration

        Example::

            LOOKUP-class = lookup_stanza input_field OUTPUT output_field

        Args:
            lookup (str): Lookup string from props.conf

        Regex:
            Parse the fields from the lookup string. Examples,

            * field1 AS field2, field3 field4 as field5

        Returns:
            (dict):
                lookup_stanza (str): The stanza name for the lookup in question in transforms.conf
                input_fields (list): The fields in the input of the lookup
                output_fields (list): The fields in the output of the lookup
        """

        input_output_field_list = []
        lookup_stanza = lookup.split(" ")[0]
        lookup_str = " ".join(lookup.split(" ")[1:])

        # 0: Take the left side of the OUTPUT as input fields
        # -1: Take the right side of the OUTPUT as output fields
        for input_output_index in [0, -1]:
            if "OUTPUT" not in lookup_str:
                lookup_str += " OUTPUT "

            # Take input fields or output fields depending on the input_output_index
            input_output_str = lookup_str.split("OUTPUTNEW")[input_output_index].split(
                "OUTPUT"
            )[input_output_index]

            field_parser = r"(\"(?:\\\"|[^\"])*\"|\'(?:\\\'|[^\'])*\'|[^\s,]+)\s*(?:[aA][sS]\s+(\"(?:\\\"|[^\"])*\"|\'(?:\\\'|[^\'])*\'|[^\s,]+))?"
            # field_groups: Group of max 2 fields - (source, destination) for "source as destination"
            field_groups = re.findall(field_parser, input_output_str)

            field_list = []
            # Take the last non-empty field from a field group.
            # Taking last non-empty field ensures that the aliased value will have
            # higher priority
            for each_group in field_groups:
                field_list.append(
                    [each_field for each_field in reversed(each_group) if each_field][0]
                )

            input_output_field_list.append(field_list)
        return {
            "input_fields": input_output_field_list[0],
            "output_fields": input_output_field_list[1],
            "lookup_stanza": lookup_stanza,
        }

get_list_of_sources(source) staticmethod

For source with | (OR), it will return all combinations. Uses itertools.product to list the combinations

Example::

input "(preA|preB)str(postX|postY)"
output [
    preAstrpostX
    preAstrpostY
    preBstrpostX
    preBstrpostY
]

Parameters:

Name Type Description Default
source str

Source name

required

Yields:

Type Description
Generator

generator of source name

Source code in pytest_splunk_addon/addon_parser/props_parser.py
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
@staticmethod
def get_list_of_sources(source: str) -> Generator:
    """
    For source with | (OR), it will return all combinations.
    Uses itertools.product to list the combinations

    Example::

        input "(preA|preB)str(postX|postY)"
        output [
            preAstrpostX
            preAstrpostY
            preBstrpostX
            preBstrpostY
        ]

    Args:
        source (str): Source name

    Yields:
        generator of source name
    """
    LOGGER.debug("Finding combinations of a source..")
    group_pattern = r"\([^\)]+\)"
    match_obj = re.search(r"source::(.*)", source)
    value = match_obj.group(1).replace("...", "*")
    sub_group_list = [
        each_group.strip("()").split("|")
        for each_group in re.findall(group_pattern, value)
    ]
    # Escape literal braces so that str.format only substitutes the
    # placeholders inserted for the (a|b) groups.
    escaped_value = value.replace("{", "{{").replace("}", "}}")
    template = re.sub(group_pattern, "{}", escaped_value)
    count = 0
    for each_permutation in product(*sub_group_list):
        count += 1
        yield template.format(*each_permutation)
    LOGGER.debug("Found %d combinations", count)

get_props_fields()

Parse the props.conf and yield all supported fields

Yields:

Type Description

generator of all the supported fields

Source code in pytest_splunk_addon/addon_parser/props_parser.py
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
def get_props_fields(self):
    """
    Walk every props stanza and yield the fields found per setting

    REPORT settings are resolved through ``_get_report_fields`` and produce
    one entry per referenced transform; every other setting is handled by the
    method returned from ``_get_props_method``. Settings that yield no fields
    are skipped.

    Yields:
        dict with the keys ``stanza``, ``stanza_type``, ``classname`` and
        ``fields``
    """
    for stanza_type, stanza_name, stanza_values in self._get_props_stanzas():
        for key, value in stanza_values.items():
            LOGGER.info(f"Parsing parameter={key} of stanza={stanza_name}")

            if re.match("REPORT", key, re.IGNORECASE):
                for transform_stanza, fields in self._get_report_fields(key, value):
                    report_fields = list(fields)
                    if not report_fields:
                        continue
                    yield {
                        "stanza": stanza_name,
                        "stanza_type": stanza_type,
                        "classname": f"{key}::{transform_stanza}",
                        "fields": report_fields,
                    }
                continue

            LOGGER.info(f"Trying to parse classname={key}")
            parsing_method = self._get_props_method(key)
            if not parsing_method:
                continue
            parsed_fields = list(parsing_method(key, value))
            if parsed_fields:
                yield {
                    "stanza": stanza_name,
                    "stanza_type": stanza_type,
                    "classname": key,
                    "fields": parsed_fields,
                }

EventtypeParser

Provides eventtypes.conf parsing mechanism

EventTypeParser

Bases: object

Parses eventtypes.conf and extracts eventtypes

Parameters:

Name Type Description Default
splunk_app_path str

Path of the Splunk app

required
Source code in pytest_splunk_addon/addon_parser/eventtype_parser.py
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
class EventTypeParser(object):
    """
    Reads eventtypes.conf of an app and exposes its stanzas as eventtypes

    Args:
        splunk_app_path (str): Path of the Splunk app
    """

    def __init__(self, splunk_app_path: str):
        self._conf_parser = conf_parser.TABConfigParser()
        self.splunk_app_path = splunk_app_path
        # Filled by the first access to ``eventtypes``.
        self._eventtypes = None

    @property
    def eventtypes(self) -> Optional[Dict]:
        """
        Stanzas of ``default/eventtypes.conf``, parsed once and cached.

        Returns:
            The parsed stanzas. The access that performs the parsing returns
            None when the parsed result is empty.
        """
        if self._eventtypes is None:
            conf_path = os.path.join(
                self.splunk_app_path, "default", "eventtypes.conf"
            )
            LOGGER.info("Parsing eventtypes.conf")
            self._conf_parser.read(conf_path)
            self._eventtypes = self._conf_parser.item_dict()
            return self._eventtypes or None
        return self._eventtypes

    def get_eventtypes(self) -> Optional[Generator]:
        """
        Yield one entry per stanza found in eventtypes.conf

        Yields:
            dict of the form ``{"stanza": <stanza name>}``
        """
        if self.eventtypes:
            for stanza_name in self.eventtypes.keys():
                LOGGER.info("Parsing eventtype stanza=%s", stanza_name)
                yield {"stanza": stanza_name}

get_eventtypes()

Parse the App configuration files & yield eventtypes

Yields:

Type Description
Optional[Generator]

generator of list of eventtypes

Source code in pytest_splunk_addon/addon_parser/eventtype_parser.py
55
56
57
58
59
60
61
62
63
64
65
66
def get_eventtypes(self) -> Optional[Generator]:
    """
    Yield one entry per stanza found in eventtypes.conf

    Yields:
        dict of the form ``{"stanza": <stanza name>}``
    """
    if self.eventtypes:
        for stanza_name in self.eventtypes.keys():
            LOGGER.info("Parsing eventtype stanza=%s", stanza_name)
            yield {"stanza": stanza_name}

Field

Provides the Field class containing all the field properties and a decorator to convert a list to field list

Field

Bases: object

Contains the field properties

  • name (str): name of the field
  • type (str): Field type. Supported [required, conditional, optional]
  • multi_value (bool): True if field is multi value field
  • expected_values (list): The field should have these expected values
  • negative_values (list): The field should not have negative values
  • condition (spl): The field should only be checked if the condition satisfies
  • validity (eval): eval statement to extract the valid fields only

Parameters:

Name Type Description Default
field_json dict

dictionary containing field properties

None
Source code in pytest_splunk_addon/addon_parser/fields.py
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
class Field(object):
    """
    Contains the field properties

    * name (str): name of the field
    * type (str): Field type. Supported [required, conditional, optional]
    * multi_value (bool): True if field is multi value field
    * expected_values (list): The field should have these expected values
    * negative_values (list): The field should not have negative values
    * condition (spl): The field should only be checked if the condition satisfies
    * validity (eval): eval statement to extract the valid fields only

    Args:
        field_json (dict): dictionary containing field properties. When omitted
            or None, every property takes its default value.
    """

    SUPPORTED_TYPES = ["required", "conditional", "optional"]

    def __init__(self, field_json=None):
        # The signature advertises None as the default, so treat it as
        # "no properties given" instead of failing on None.get(...).
        field_json = field_json or {}
        self.name = field_json.get("name")
        self.type = field_json.get("type") or "required"
        self.multi_value = field_json.get("multi_value") or False
        self.expected_values = field_json.get("expected_values", ["*"])
        self.negative_values = field_json.get("negative_values", ["-", ""])
        self.condition = field_json.get("condition") or ""
        # Without an explicit validity expression the field name itself is used.
        self.validity = field_json.get("validity") or self.name

    def __str__(self):
        return str(self.name)

    def __eq__(self, other: "Field"):
        # Objects without instance attributes (int, str, None, ...) cannot be
        # compared by state; defer to Python's default handling instead of
        # raising AttributeError.
        other_state = getattr(other, "__dict__", None)
        if other_state is None:
            return NotImplemented
        return self.__dict__ == other_state

    def __lt__(self, other: "Field"):
        return self.name < other.name

    def __repr__(self):
        return f"<Field name={self.name}>"

    def get_type(self):
        """Return the field type (``self.type``)."""
        return self.type

    @classmethod
    def parse_fields(cls, field_list, **kwargs):
        """
        Parse the fields from a list

        Args:
            field_list (list): list of dicts with field properties
            **kwargs: properties applied to every field; a value present in
                an individual field dict takes precedence

        Yields:
            one instance of ``cls`` per entry of field_list
        """
        for each_fields in field_list:
            yield cls(dict(kwargs, **each_fields))

    def get_properties(self):
        """Return a multi-line ``key=value`` summary of the field, name first."""
        return (
            f"{self.name}"
            f"\ntype={self.type}"
            f"\nmulti_value={self.multi_value}"
            f"\ncondition={self.condition}"
            f"\nvalidity={self.validity}"
            f"\nexpected_values={self.expected_values}"
            f"\nnegative_values={self.negative_values}"
        )

parse_fields(field_list, **kwargs) classmethod

Parse the fields from a list

Parameters:

Name Type Description Default
field_list list

list of field names

required
Source code in pytest_splunk_addon/addon_parser/fields.py
67
68
69
70
71
72
73
74
75
76
@classmethod
def parse_fields(cls, field_list, **kwargs):
    """
    Parse the fields from a list

    Args:
        field_list (list): list of dicts with field properties
        **kwargs: properties applied to every field; a value present in
            an individual field dict takes precedence

    Yields:
        one instance of ``cls`` per entry of field_list
    """
    # Build through ``cls`` rather than a hard-coded class so the
    # classmethod honours the class it is invoked on.
    for each_fields in field_list:
        yield cls(dict(kwargs, **each_fields))

convert_to_fields(func)

Decorator to initialize the list of fields

Source code in pytest_splunk_addon/addon_parser/fields.py
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
def convert_to_fields(func):
    """
    Decorator that turns the names produced by ``func`` into Field objects

    The wrapped callable becomes a generator: every truthy item returned or
    yielded by ``func`` is wrapped as ``Field({"name": item})``; falsy items
    are dropped.
    """

    @wraps(func)
    def wrapper(*args, **kwargs):
        field_names = func(*args, **kwargs)
        yield from (Field({"name": name}) for name in field_names if name)

    return wrapper

TagsParser

Provides tags.conf parsing mechanism

TagsParser

Parses tags.conf and extracts tags

Parameters:

Name Type Description Default
splunk_app_path str

Path of the Splunk app

required
Source code in pytest_splunk_addon/addon_parser/tags_parser.py
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
class TagsParser:
    """
    Reads tags.conf of an app and exposes the tags of each stanza

    Args:
        splunk_app_path (str): Path of the Splunk app
    """

    def __init__(self, splunk_app_path: str):
        self._conf_parser = conf_parser.TABConfigParser()
        self.splunk_app_path = splunk_app_path
        # Filled by the first access to ``tags``.
        self._tags = None

    @property
    def tags(self) -> Optional[Dict]:
        """
        Stanzas of ``default/tags.conf``, parsed once and cached.

        Returns:
            The parsed stanzas. The access that performs the parsing returns
            None when the parsed result is empty.
        """
        if self._tags is None:
            conf_path = os.path.join(self.splunk_app_path, "default", "tags.conf")
            LOGGER.info("Parsing tags.conf")
            self._conf_parser.read(conf_path)
            self._tags = self._conf_parser.item_dict()
            return self._tags or None
        return self._tags

    def get_tags(self) -> Optional[Generator]:
        """
        Yield one entry per tag of every stanza in tags.conf

        The stanza name gets its value part quoted (``key=value`` becomes
        ``key="value"``) and is then URL-unquoted.

        Yields:
            dict with the keys ``stanza``, ``tag`` and ``enabled``
        """
        if not self.tags:
            return
        for raw_stanza, stanza_values in self.tags.items():
            LOGGER.info(f"Parsing tags of stanza={raw_stanza}")
            stanza_key = unquote(raw_stanza.replace("=", '="') + '"')
            LOGGER.debug(f"Parsed tags-stanza={stanza_key}")
            for tag_name, state in stanza_values.items():
                LOGGER.info(
                    f"Parsing tag={tag_name} enabled={state} of stanza={stanza_key}"
                )
                yield {
                    "stanza": stanza_key,
                    "tag": tag_name,
                    "enabled": state == "enabled",
                }

get_tags()

Parse the tags.conf of the App & yield stanzas

Yields:

Type Description
Optional[Generator]

generator of stanzas from the tags

Source code in pytest_splunk_addon/addon_parser/tags_parser.py
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
def get_tags(self) -> Optional[Generator]:
    """
    Walk every stanza of tags.conf and emit one record per tag entry.

    Yields:
        dict with the keys ``stanza`` (the stanza name with its value
        wrapped in double quotes and URL-decoded), ``tag`` (the tag name)
        and ``enabled`` (True only when the conf value is the string
        "enabled").
    """
    parsed = self.tags
    if not parsed:
        return
    for stanza_key, tag_states in parsed.items():
        LOGGER.info(f"Parsing tags of stanza={stanza_key}")
        # Quote first, decode second: an encoded "=" inside the value
        # must not be touched by the replace.
        stanza_key = unquote(stanza_key.replace("=", '="') + '"')
        LOGGER.debug(f"Parsed tags-stanza={stanza_key}")
        for key, value in tag_states.items():
            LOGGER.info(f"Parsing tag={key} enabled={value} of stanza={stanza_key}")
            yield {
                "stanza": stanza_key,
                "tag": key,
                "enabled": value == "enabled",
            }

TransformsParser

Provides transforms.conf parsing mechanism

TransformsParser

Bases: object

Parses transforms.conf and extracts fields

Parameters:

Name Type Description Default
splunk_app_path str

Path of the Splunk app

required
Source code in pytest_splunk_addon/addon_parser/transforms_parser.py
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
class TransformsParser(object):
    """
    Reader for the ``default/transforms.conf`` file of a Splunk app.

    The parsed stanzas are available through :attr:`transforms`; two
    generators list the field names declared by a stanza or by the CSV
    file a lookup stanza points to.

    Args:
        splunk_app_path (str): Path of the Splunk app
    """

    def __init__(self, splunk_app_path: str):
        self._conf_parser = conf_parser.TABConfigParser()
        self.splunk_app_path = splunk_app_path
        # Filled in by the ``transforms`` property on first access.
        self._transforms = None

    @property
    def transforms(self) -> Optional[Dict]:
        """
        Stanzas of transforms.conf.

        Returns:
            The dictionary stored on the instance when one is already
            present. Otherwise the file is read, the result is stored, and
            that result is returned (``None`` if it is empty).
        """
        if self._transforms is None:
            conf_file = os.path.join(
                self.splunk_app_path, "default", "transforms.conf"
            )
            LOGGER.info("Parsing transforms.conf")
            self._conf_parser.read(conf_file)
            self._transforms = self._conf_parser.item_dict()
            return self._transforms or None
        return self._transforms

    @convert_to_fields
    def get_transform_fields(self, transforms_stanza: str) -> Optional[Generator]:
        """
        List the field names declared by one transforms.conf stanza.

        The stanza keys are inspected in this order:

        * SOURCE_KEY: the value itself is yielded
        * REGEX: the name of every named capturing group, written as
          (?<name>...), (?'name'...) or (?P<name>...), except names
          starting with _KEY or _VAL
        * FIELDS: every comma-separated entry
        * FORMAT: every whitespace-free token directly followed by "::"
          that does not contain "$"

        A stanza that is missing from transforms.conf is logged as an
        error and nothing is yielded for it.

        Args:
            transforms_stanza (str):
                The stanza of which the fields should be extracted

        Yields:
            generator of fields
        """
        try:
            if not self.transforms:
                return
            transforms_values = self.transforms[transforms_stanza]

            if "SOURCE_KEY" in transforms_values:
                LOGGER.info(f"Parsing source_key of {transforms_stanza}")
                yield transforms_values["SOURCE_KEY"]

            if "REGEX" in transforms_values:
                LOGGER.info(f"Parsing REGEX of {transforms_stanza}")
                named_group = r"\(\?P?[<'](?!_KEY|_VAL)([A-Za-z0-9_]+)[>']"
                for group_name in re.findall(named_group, transforms_values["REGEX"]):
                    # The lookahead in the pattern already rejects these
                    # prefixes; the explicit check is kept as a safeguard.
                    if group_name.startswith(("_KEY_", "_VAL_")):
                        continue
                    yield group_name.strip()

            if "FIELDS" in transforms_values:
                LOGGER.info(f"Parsing FIELDS of {transforms_stanza}")
                for listed_name in transforms_values["FIELDS"].split(","):
                    yield listed_name.strip()

            if "FORMAT" in transforms_values:
                LOGGER.info(f"Parsing FORMAT of {transforms_stanza}")
                for target_name in re.findall(r"(\S*)::", transforms_values["FORMAT"]):
                    # Tokens such as "$1" refer to capture groups, not to
                    # literal field names.
                    if "$" in target_name:
                        continue
                    yield target_name.strip()
        except KeyError:
            LOGGER.error(
                f"The stanza {transforms_stanza} does not exists in transforms.conf."
            )

    def get_lookup_csv_fields(self, lookup_stanza: str) -> Optional[Generator]:
        """
        List the column names of the CSV file referenced by a lookup stanza.

        The file is looked up in the ``lookups`` directory of the app using
        the stanza's ``filename`` value. Nothing is yielded when
        transforms.conf is empty, the stanza is unknown, or the stanza has
        no ``filename`` key.

        Args:
            lookup_stanza (str): A lookup stanza mentioned in transforms.conf

        Yields:
            string of field names
        """
        if not self.transforms:
            return
        if lookup_stanza not in self.transforms:
            return
        stanza_values = self.transforms[lookup_stanza]
        if "filename" not in stanza_values:
            return
        lookup_file = stanza_values["filename"]
        try:
            location = os.path.join(self.splunk_app_path, "lookups", lookup_file)
            with open(location) as csv_file:
                for column in csv.DictReader(csv_file).fieldnames:
                    yield column.strip()
        # Read problems are logged instead of raised, so the generator just
        # ends with whatever was yielded so far and the caller keeps going.
        except (OSError, IOError, UnboundLocalError, TypeError) as e:
            LOGGER.error(
                "Could not read the lookup file, skipping test. error=%s",
                str(e),
            )

get_lookup_csv_fields(lookup_stanza)

Parse the fields from a lookup file for a specific lookup_stanza

Parameters:

Name Type Description Default
lookup_stanza str

A lookup stanza mentioned in transforms.conf

required

Yields:

Type Description
Optional[Generator]

string of field names

Source code in pytest_splunk_addon/addon_parser/transforms_parser.py
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
def get_lookup_csv_fields(self, lookup_stanza: str) -> Optional[Generator]:
    """
    List the column names of the CSV file referenced by a lookup stanza.

    The file is looked up in the ``lookups`` directory of the app using the
    stanza's ``filename`` value. Nothing is yielded when transforms.conf is
    empty, the stanza is unknown, or the stanza has no ``filename`` key.

    Args:
        lookup_stanza (str): A lookup stanza mentioned in transforms.conf

    Yields:
        string of field names
    """
    if not self.transforms:
        return
    if lookup_stanza not in self.transforms:
        return
    stanza_values = self.transforms[lookup_stanza]
    if "filename" not in stanza_values:
        return
    lookup_file = stanza_values["filename"]
    try:
        location = os.path.join(self.splunk_app_path, "lookups", lookup_file)
        with open(location) as csv_file:
            for column in csv.DictReader(csv_file).fieldnames:
                yield column.strip()
    # Read problems are logged instead of raised, so the generator just
    # ends with whatever was yielded so far and the caller keeps going.
    except (OSError, IOError, UnboundLocalError, TypeError) as e:
        LOGGER.error(
            "Could not read the lookup file, skipping test. error=%s",
            str(e),
        )

get_transform_fields(transforms_stanza)

Parse the transforms.conf of the App & yield fields of a specific stanza.

Supported extractions from transforms.conf are

  • SOURCE_KEY = _raw
  • REGEX = some regex with (capturing_group)
  • FIELDS = one,

Parameters:

Name Type Description Default
transforms_stanza str

The stanza of which the fields should be extracted

required
Regex

Parse the fields from a regex. Examples:

(?<name>regex)
(?'name'regex)
(?P<name>regex)

Yields:

Type Description
Optional[Generator]

generator of fields

Source code in pytest_splunk_addon/addon_parser/transforms_parser.py
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
@convert_to_fields
def get_transform_fields(self, transforms_stanza: str) -> Optional[Generator]:
    """
    List the field names declared by one transforms.conf stanza.

    The stanza keys are inspected in this order:

    * SOURCE_KEY: the value itself is yielded
    * REGEX: the name of every named capturing group, written as
      (?<name>...), (?'name'...) or (?P<name>...), except names starting
      with _KEY or _VAL
    * FIELDS: every comma-separated entry
    * FORMAT: every whitespace-free token directly followed by "::" that
      does not contain "$"

    A stanza that is missing from transforms.conf is logged as an error
    and nothing is yielded for it.

    Args:
        transforms_stanza (str):
            The stanza of which the fields should be extracted

    Yields:
        generator of fields
    """
    try:
        if not self.transforms:
            return
        transforms_values = self.transforms[transforms_stanza]

        if "SOURCE_KEY" in transforms_values:
            LOGGER.info(f"Parsing source_key of {transforms_stanza}")
            yield transforms_values["SOURCE_KEY"]

        if "REGEX" in transforms_values:
            LOGGER.info(f"Parsing REGEX of {transforms_stanza}")
            named_group = r"\(\?P?[<'](?!_KEY|_VAL)([A-Za-z0-9_]+)[>']"
            for group_name in re.findall(named_group, transforms_values["REGEX"]):
                # The lookahead in the pattern already rejects these
                # prefixes; the explicit check is kept as a safeguard.
                if group_name.startswith(("_KEY_", "_VAL_")):
                    continue
                yield group_name.strip()

        if "FIELDS" in transforms_values:
            LOGGER.info(f"Parsing FIELDS of {transforms_stanza}")
            for listed_name in transforms_values["FIELDS"].split(","):
                yield listed_name.strip()

        if "FORMAT" in transforms_values:
            LOGGER.info(f"Parsing FORMAT of {transforms_stanza}")
            for target_name in re.findall(r"(\S*)::", transforms_values["FORMAT"]):
                # Tokens such as "$1" refer to capture groups, not to
                # literal field names.
                if "$" in target_name:
                    continue
                yield target_name.strip()
    except KeyError:
        LOGGER.error(
            f"The stanza {transforms_stanza} does not exists in transforms.conf."
        )

SavedsearchesParser

Provides savedsearches.conf parsing mechanism

SavedSearchParser

Bases: object

Parses savedsearches.conf and extracts savedsearches

Parameters:

Name Type Description Default
splunk_app_path str

Path of the Splunk app

required
Source code in pytest_splunk_addon/addon_parser/savedsearches_parser.py
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
class SavedSearchParser(object):
    """
    Reader for the ``default/savedsearches.conf`` file of a Splunk app.

    The file is parsed the first time :attr:`savedsearches` is accessed and
    the parsed dictionary is kept on the instance for later accesses.

    Args:
        splunk_app_path (str): Path of the Splunk app
    """

    def __init__(self, splunk_app_path: str):
        self._conf_parser = conf_parser.TABConfigParser()
        self.splunk_app_path = splunk_app_path
        # Filled in by the ``savedsearches`` property on first access.
        self._savedsearches = None

    @property
    def savedsearches(self) -> Optional[Dict]:
        """
        Stanzas of savedsearches.conf.

        Returns:
            The dictionary stored on the instance when one is already
            present. Otherwise the file is read, the result is stored, and
            that result is returned (``None`` if it is empty).
        """
        if self._savedsearches is None:
            conf_file = os.path.join(
                self.splunk_app_path, "default", "savedsearches.conf"
            )
            LOGGER.info("Parsing savedsearches.conf")
            self._conf_parser.read(conf_file)
            self._savedsearches = self._conf_parser.item_dict()
            return self._savedsearches or None
        return self._savedsearches

    def get_savedsearches(self) -> Optional[Generator]:
        """
        Emit one record per stanza of savedsearches.conf.

        Each record starts from default values for ``search``,
        ``dispatch.earliest_time`` and ``dispatch.latest_time``; a default
        is replaced by the stanza's own value unless that value is "None",
        an empty string or a single space.

        Yields:
            dict with the keys ``stanza``, ``search``,
            ``dispatch.earliest_time`` and ``dispatch.latest_time``
        """
        parsed = self.savedsearches
        if not parsed:
            return None
        overridable = ("search", "dispatch.earliest_time", "dispatch.latest_time")
        empty_value = ["None", "", " "]
        for stanza_key, stanza_values in parsed.items():
            LOGGER.info(f"Parsing savedsearches of stanza={stanza_key}")
            savedsearch_container = {
                "stanza": stanza_key,
                "search": 'index = "main"',
                "dispatch.earliest_time": "0",
                "dispatch.latest_time": "now",
            }
            savedsearch_container.update(
                (key, value)
                for key, value in stanza_values.items()
                if key in overridable and value not in empty_value
            )
            yield savedsearch_container

get_savedsearches()

Parse the App configuration files & yield savedsearches

Yields:

Type Description
Optional[Generator]

generator of list of savedsearches

Source code in pytest_splunk_addon/addon_parser/savedsearches_parser.py
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
def get_savedsearches(self) -> Optional[Generator]:
    """
    Emit one record per stanza of savedsearches.conf.

    Each record starts from default values for ``search``,
    ``dispatch.earliest_time`` and ``dispatch.latest_time``; a default is
    replaced by the stanza's own value unless that value is "None", an
    empty string or a single space.

    Yields:
        dict with the keys ``stanza``, ``search``,
        ``dispatch.earliest_time`` and ``dispatch.latest_time``
    """
    parsed = self.savedsearches
    if not parsed:
        return None
    overridable = ("search", "dispatch.earliest_time", "dispatch.latest_time")
    empty_value = ["None", "", " "]
    for stanza_key, stanza_values in parsed.items():
        LOGGER.info(f"Parsing savedsearches of stanza={stanza_key}")
        savedsearch_container = {
            "stanza": stanza_key,
            "search": 'index = "main"',
            "dispatch.earliest_time": "0",
            "dispatch.latest_time": "now",
        }
        savedsearch_container.update(
            (key, value)
            for key, value in stanza_values.items()
            if key in overridable and value not in empty_value
        )
        yield savedsearch_container