CimTests

Test generation mechanism to verify the CIM compatibility of an Add-on

TestTemplates

Includes the test scenarios to check the CIM compatibility of an Add-on.

CIMTestTemplates

Bases: object

Test scenarios to check the CIM compatibility of an Add-on. Supported test scenarios:

- The eventtype should extract all required fields of the data model
- One eventtype should not be mapped with more than one data model
- Field clusters should be verified (included with the required field test)
- Verify whether CIM is installed or not
- Not-allowed fields should not be extracted
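Each scenario is generated as a parametrized pytest test and tagged with the markers visible in the source below. A minimal sketch of selecting them programmatically (it assumes the Splunk connection and the add-on under test are already configured through the plugin's options):

import pytest

# Run every CIM compatibility scenario.
pytest.main(["-m", "splunk_searchtime_cim"])

# Or run only the required/conditional field checks.
pytest.main(["-m", "splunk_searchtime_cim_fields"])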
Source code in pytest_splunk_addon/cim_tests/test_templates.py
class CIMTestTemplates(object):
    """
    Test scenarios to check the CIM compatibility of an Add-on.
    Supported test scenarios:

        - The eventtype should extract all required fields of the data model
        - One eventtype should not be mapped with more than one data model
        - Field clusters should be verified (included with the required field test)
        - Verify whether CIM is installed or not
        - Not-allowed fields should not be extracted
    """

    logger = logging.getLogger("pytest-splunk-addon-cim-tests")

    @pytest.mark.splunk_searchtime_cim
    @pytest.mark.splunk_searchtime_cim_fields
    def test_cim_required_fields(
        self,
        splunk_search_util,
        splunk_ingest_data,
        splunk_setup,
        splunk_searchtime_cim_fields,
        record_property,
    ):
        """
        Test that the required fields in the data models are extracted with valid values.
        Supports 3 scenarios. The test order is maintained for a better test report.

            - Check that there is at least 1 event mapped with the data model
            - Check that each required field is extracted in all of the events mapped with the data model.
            - Check that if there are interdependent fields, either all fields should be
              extracted or none of them should be extracted.
        """

        cim_data_set = splunk_searchtime_cim_fields["data_set"]
        cim_fields = splunk_searchtime_cim_fields["fields"]
        cim_tag_stanza = splunk_searchtime_cim_fields["tag_stanza"]

        cim_single_field = ", ".join(map(str, cim_fields))
        cim_fields_type = ", ".join(map(lambda f: f.get_type(), cim_fields))
        cim_data_model = cim_data_set[-1].data_model
        data_set = str(cim_data_set[-1])
        index_list = (
            "(index="
            + " OR index=".join(splunk_search_util.search_index.split(","))
            + ")"
        )

        # Search Query
        base_search = "| search {}".format(index_list)
        for each_set in cim_data_set:
            base_search += " | search {}".format(each_set.search_constraints)

        base_search += " | search {}".format(cim_tag_stanza)

        test_helper = FieldTestHelper(
            splunk_search_util,
            cim_fields,
            interval=splunk_search_util.search_interval,
            retries=splunk_search_util.search_retry,
        )
        record_property("tag_stanza", cim_tag_stanza)
        record_property("data_model", cim_data_model)
        record_property("data_set", data_set)
        record_property("fields", cim_single_field)
        record_property("fields_type", cim_fields_type)
        # Execute the query and get the results
        results = test_helper.test_field(base_search, record_property)
        # All assertions are made in the same test to keep the report in a clear
        # order of scenarios. With this approach, a user can quickly identify
        # what went wrong.

        if len(cim_fields) == 0:
            # If no fields are there, check that the events are mapped
            # with the data model
            assert results, (
                "\n0 Events mapped with the dataset.\n"
                f"\n{test_helper.format_exc_message()}"
            )
        if len(cim_fields) == 1:
            test_field = cim_fields[0]
            # If the field is required,
            #   there should be events mapped with the data model
            # If the field is conditional,
            #   It's fine if no events matched the condition
            if test_field.type != "conditional":
                assert results, (
                    "\n0 Events mapped with the dataset.\n"
                    f"\n{test_helper.format_exc_message()}"
                )
            # The field should be extracted if event count > 0
            for each_field in results:
                assert each_field["field_count"] != 0, (
                    f"\nField {test_field} is not extracted in any events."
                    f"\n{test_helper.format_exc_message()}"
                )
                if each_field["field_count"] > each_field["event_count"]:
                    raise AssertionError(
                        f"\nField {test_field} should not be multi-value."
                        f"\n{test_helper.format_exc_message()}"
                    )
                elif each_field["field_count"] < each_field["event_count"]:
                    # The field should be extracted in all events mapped
                    raise AssertionError(
                        f"\nField {test_field} is not extracted in some events."
                        f"\n{test_helper.format_exc_message()}"
                    )
                assert each_field["field_count"] == each_field["valid_field_count"], (
                    f"\nField {test_field} has invalid values."
                    f"\n{test_helper.format_exc_message()}"
                )
        elif len(cim_fields) > 1:
            # Check that the count for all the fields in the cluster is the same.
            # If none of the fields are extracted in an event, that is a passing scenario.
            # The count of a field may or may not match the event count.
            sourcetype_fields = dict()
            for each_result in results:
                sourcetype_fields.setdefault(
                    (each_result["source"], each_result["sourcetype"]), list()
                ).extend([each_result["field_count"], each_result["valid_field_count"]])
            for field_counts in sourcetype_fields.values():
                assert len(set(field_counts)) == 1, (
                    "All fields from the field-cluster should be extracted with valid values if any one field is extracted."
                    f"\n{test_helper.format_exc_message()}"
                )

    @pytest.mark.splunk_searchtime_cim
    @pytest.mark.splunk_searchtime_cim_fields_not_allowed_in_search
    def test_cim_fields_not_allowed_in_search(
        self,
        splunk_ingest_data,
        splunk_search_util,
        splunk_setup,
        splunk_searchtime_cim_fields_not_allowed_in_search,
        record_property,
    ):
        """
        This test case checks the event_count for the CIM fields of type
        ["not_allowed_in_search_and_props", "not_allowed_in_search"].
        The expected event_count for these fields is zero.
        """
        cim_dataset = splunk_searchtime_cim_fields_not_allowed_in_search["data_set"]
        cim_fields = splunk_searchtime_cim_fields_not_allowed_in_search["fields"]
        cim_tag_stanza = splunk_searchtime_cim_fields_not_allowed_in_search[
            "tag_stanza"
        ]
        cim_data_model = cim_dataset[-1].data_model
        data_set = str(cim_dataset[-1])

        # Search Query
        index_list = (
            "(index="
            + " OR index=".join(splunk_search_util.search_index.split(","))
            + ")"
        )

        base_search = "search {}".format(index_list)
        for each_set in cim_dataset:
            base_search += " | search {}".format(each_set.search_constraints)

        base_search += " | search {}".format(cim_tag_stanza)

        base_search += " AND ("
        for each_field in cim_fields:
            base_search += " ({}=*) OR".format(each_field.name)

        # To remove the extra OR at the end of search
        base_search = base_search[:-2]
        base_search += ")"

        if not cim_tag_stanza:
            base_search = base_search.replace("search OR", "search")

        base_search += " | stats "

        for each_field in cim_fields:
            base_search += " count({fname}) AS {fname}".format(fname=each_field.name)

        base_search += " by source, sourcetype"

        record_property("search", base_search)
        record_property("tag_stanza", cim_tag_stanza)
        record_property("data_model", cim_data_model)
        record_property("data_set", data_set)
        record_property("fields", ", ".join(map(str, cim_fields)))

        self.logger.info("base_search: %s", base_search)
        results = list(
            splunk_search_util.getFieldValuesList(
                base_search,
                interval=splunk_search_util.search_interval,
                retries=splunk_search_util.search_retry,
            )
        )

        violations = []
        if results:
            violations = [
                [
                    each_elem["source"],
                    each_elem["sourcetype"],
                    field.name,
                    each_elem.get(field.name),
                ]
                for each_elem in results
                for field in cim_fields
                if each_elem.get(field.name) != "0"
                and each_elem.get(field.name) != each_elem["sourcetype"]
            ]

            violation_str = (
                "The fields should not be extracted in the dataset"
                "\nThese fields are automatically provided by asset and identity"
                " correlation features of applications like Splunk Enterprise Security."
                "\nDo not define extractions for these fields when writing add-ons."
                "\nExpected eventcount: 0 \n\n"
            )
            violation_str += get_table_output(
                headers=["Source", "Sourcetype", "Fields", "Event Count"],
                value_list=violations,
            )

        assert not violations, violation_str

    @pytest.mark.splunk_searchtime_cim
    @pytest.mark.splunk_searchtime_cim_fields_not_allowed_in_props
    def test_cim_fields_not_allowed_in_props(
        self,
        splunk_ingest_data,
        splunk_setup,
        splunk_searchtime_cim_fields_not_allowed_in_props,
        record_property,
    ):
        """
        This test case checks, for CIM fields of type ["not_allowed_in_search_and_props",
        "not_allowed_in_props"], whether an extraction is defined in the configuration files.
        """
        result_str = (
            "The field extractions are not allowed in the configuration files"
            "\nThese fields are automatically provided by asset and identity"
            " correlation features of applications like Splunk Enterprise Security."
            "\nDo not define extractions for these fields when writing add-ons.\n\n"
        )

        result_str += get_table_output(
            headers=["Stanza", "Classname", "Fieldname"],
            value_list=[
                [data["stanza"], data["classname"], data["name"]]
                for data in splunk_searchtime_cim_fields_not_allowed_in_props["fields"]
            ],
        )

        assert not splunk_searchtime_cim_fields_not_allowed_in_props[
            "fields"
        ], result_str

    @pytest.mark.splunk_searchtime_cim
    @pytest.mark.splunk_searchtime_cim_mapped_datamodel
    def test_eventtype_mapped_multiple_cim_datamodel(
        self,
        splunk_search_util,
        splunk_ingest_data,
        splunk_setup,
        splunk_searchtime_cim_mapped_datamodel,
        record_property,
        caplog,
    ):
        """
        This test case checks that an event type is not mapped with more than one data model.

        Args:
            splunk_search_util (SearchUtil): Object that helps to search on Splunk.
            splunk_searchtime_cim_mapped_datamodel: Object which contain eventtype list
            record_property (fixture): Document facts of test cases.
            caplog (fixture): fixture to capture logs.
        """

        data_models = [
            {"name": "Alerts", "tags": [["alert"]]},
            {
                "name": "Authentication",
                "tags": [
                    ["authentication"],
                    ["authentication", "default"],
                    ["authentication", "insecure"],
                    ["authentication", "privileged"],
                ],
            },
            {"name": "Certificates", "tags": [["certificate"], ["certificate", "ssl"]]},
            {
                "name": "Change",
                "tags": [
                    ["change"],
                    ["change", "audit"],
                    ["change", "endpoint"],
                    ["change", "network"],
                    ["change", "account"],
                ],
            },
            {
                "name": "Compute_Inventory",
                "tags": [
                    ["inventory", "cpu"],
                    ["inventory", "memory"],
                    ["inventory", "network"],
                    ["inventory", "storage"],
                    ["inventory", "system", "version"],
                    ["inventory", "user"],
                    ["inventory", "user", "default"],
                    ["inventory", "virtual"],
                    ["inventory", "virtual", "snapshot"],
                    ["inventory", "virtual", "tools"],
                ],
            },
            {"name": "DLP", "tags": [["dlp", "incident"]]},
            {
                "name": "Databases",
                "tags": [
                    ["database"],
                    ["database", "instance"],
                    ["database", "instance", "stats"],
                    ["database", "instance", "session"],
                    ["database", "instance", "lock"],
                    ["database", "query"],
                    ["database", "query", "tablespace"],
                    ["database", "query", "stats"],
                ],
            },
            {
                "name": "Email",
                "tags": [
                    ["email"],
                    ["email", "delivery"],
                    ["email", "content"],
                    ["email", "filter"],
                ],
            },
            {
                "name": "Endpoint",
                "tags": [
                    ["listening", "port"],
                    ["process", "report"],
                    ["service", "report"],
                    ["endpoint", "filesystem"],
                    ["endpoint", "registry"],
                ],
            },
            {"name": "Event_Signatures", "tags": [["track_event_signatures"]]},
            {"name": "Interprocess_Messaging", "tags": [["messaging"]]},
            {"name": "Intrusion_Detection", "tags": [["ids", "attack"]]},
            {
                "name": "JVM",
                "tags": [
                    ["jvm"],
                    ["jvm", "threading"],
                    ["jvm", "runtime"],
                    ["jvm", "os"],
                    ["jvm", "compilation"],
                    ["jvm", "classloading"],
                    ["jvm", "memory"],
                ],
            },
            {
                "name": "Malware",
                "tags": [["malware", "attack"], ["malware", "operations"]],
            },
            {"name": "Network_Resolution", "tags": [["network", "resolution", "dns"]]},
            {
                "name": "Network_Sessions",
                "tags": [
                    ["network", "session"],
                    ["network", "session", "start"],
                    ["network", "session", "end"],
                    ["network", "session", "dhcp"],
                    ["network", "session", "vpn"],
                ],
            },
            {"name": "Network_Traffic", "tags": [["network", "communicate"]]},
            {
                "name": "Performance",
                "tags": [
                    ["performance", "cpu"],
                    ["performance", "facilities"],
                    ["performance", "memory"],
                    ["performance", "storage"],
                    ["performance", "network"],
                    ["performance", "os"],
                    ["performance", "os", "time", "synchronize"],
                    ["performance", "os", "uptime"],
                ],
            },
            {
                "name": "Splunk_Audit",
                "tags": [["modaction"], ["modaction", "invocation"]],
            },
            {
                "name": "Ticket_Management",
                "tags": [
                    ["ticketing"],
                    ["ticketing", "change"],
                    ["ticketing", "incident"],
                    ["ticketing", "problem"],
                ],
            },
            {"name": "Updates", "tags": [["update", "status"], ["update", "error"]]},
            {"name": "Vulnerabilities", "tags": [["report", "vulnerability"]]},
            {"name": "Web", "tags": [["web"], ["web", "proxy"]]},
        ]
        index_list = (
            "(index="
            + " OR index=".join(splunk_search_util.search_index.split(","))
            + ")"
        )
        search = "search {} ".format(index_list)
        # search = "search "
        search += " OR ".join(
            "eventtype={} \n".format(eventtype)
            for eventtype in splunk_searchtime_cim_mapped_datamodel["eventtypes"]
        )
        search += " | fields eventtype,tag \n"

        for data_model in data_models:
            search += "| appendpipe [ | search "
            search += " OR ".join(
                "({})".format((" ".join("tag={}".format(tag) for tag in tags_list)))
                for tags_list in data_model.get("tags")
            )
            search += f" | eval dm_type=\"{data_model.get('name')}\"]\n"

        search += """| stats delim=", " dc(dm_type) as datamodel_count, values(dm_type) as datamodels by eventtype | nomv datamodels
        | where datamodel_count > 1 and NOT eventtype IN ("err0r")
        """

        record_property("search", search)

        results = list(
            splunk_search_util.getFieldValuesList(
                search,
                splunk_search_util.search_interval,
                splunk_search_util.search_retry,
            )
        )
        if results:
            record_property("results", results)
            result_str = get_table_output(
                headers=["Count", "Eventtype", "Datamodels"],
                value_list=[
                    [
                        each_result["datamodel_count"],
                        each_result["eventtype"],
                        each_result["datamodels"],
                    ]
                    for each_result in results
                ],
            )

        assert not results, (
            "Multiple data models are mapped with an eventtype."
            "\nQuery returned more than 0 results."
            f"{format_search_query_log(search)}"
            f"\nEvent types associated with multiple data models:\n{result_str}"
        )

    @pytest.mark.splunk_searchtime_cim
    @pytest.mark.splunk_requirements
    @pytest.mark.splunk_requirements_unit
    def test_cim_fields_recommended(
        self, splunk_dm_recommended_fields, splunk_searchtime_cim_fields_recommended
    ):
        datamodel = splunk_searchtime_cim_fields_recommended["datamodel"]
        datasets = splunk_searchtime_cim_fields_recommended["datasets"]
        fields = splunk_searchtime_cim_fields_recommended["fields"]
        cim_version = splunk_searchtime_cim_fields_recommended["cim_version"]

        fields_from_model_definition = splunk_dm_recommended_fields(
            datamodel, datasets, cim_version
        )
        self.logger.debug(f"Fields from Splunk: {fields_from_model_definition}")

        model_key = f"{cim_version}:{datamodel}:{':'.join(datasets)}".strip(":")

        model_fields = fields_from_model_definition.get(model_key, [])
        self.logger.debug(f"Fields from CIM definition: {model_fields}")

        missing_fields = []
        for field in set(model_fields):
            if field not in fields:
                missing_fields.append(field)

        assert (
            missing_fields == []
        ), f"Not all fields from datamodel found for event definition. Missing fields: {', '.join(missing_fields)}"

test_cim_fields_not_allowed_in_props(splunk_ingest_data, splunk_setup, splunk_searchtime_cim_fields_not_allowed_in_props, record_property)

This test case checks, for CIM fields of type ["not_allowed_in_search_and_props", "not_allowed_in_props"], whether an extraction is defined in the configuration files.
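
The fixture consumed here is a dict with a "fields" list, and the test fails whenever that list is non-empty. A sketch of the payload shape with hypothetical stanza, classname, and field values:

# Hypothetical payload; each entry names an extraction defined in props.conf
# for a field that is not allowed to be extracted.
splunk_searchtime_cim_fields_not_allowed_in_props = {
    "fields": [
        {
            "stanza": "ta_example:sourcetype",
            "classname": "EXTRACT-user",
            "name": "user_bunit",
        },
    ]
}

# The test renders these rows into a Stanza/Classname/Fieldname table and
# fails because the list is non-empty.
violating = splunk_searchtime_cim_fields_not_allowed_in_props["fields"]
print(f"{len(violating)} disallowed extraction(s) found")  # 1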


test_cim_fields_not_allowed_in_search(splunk_ingest_data, splunk_search_util, splunk_setup, splunk_searchtime_cim_fields_not_allowed_in_search, record_property)

This test case checks the event_count for the CIM fields of type ["not_allowed_in_search_and_props", "not_allowed_in_search"]. The expected event_count for these fields is zero.
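
A standalone sketch of the stats query this test builds; the field names below are hypothetical examples of not-allowed-in-search fields, and the index and tag stanza are illustrative:

field_names = ["user_bunit", "src_pci_domain"]  # hypothetical not-allowed fields
tag_stanza = "eventtype=ta_example_auth"        # illustrative tag stanza

base_search = "search (index=main) | search {}".format(tag_stanza)
base_search += " AND (" + " OR".join(" ({}=*)".format(f) for f in field_names) + ")"
base_search += " | stats" + "".join(
    " count({fname}) AS {fname}".format(fname=f) for f in field_names
)
base_search += " by source, sourcetype"

print(base_search)
# Any per-source/sourcetype count other than "0" is reported as a violation.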


test_cim_required_fields(splunk_search_util, splunk_ingest_data, splunk_setup, splunk_searchtime_cim_fields, record_property)

Test that the required fields in the data models are extracted with valid values. Supports 3 scenarios. The test order is maintained for a better test report.

- Check that there is at least 1 event mapped with the data model
- Check that each required field is extracted in all of the events mapped with the data model.
- Check that if there are interdependent fields, either all fields should be
  extracted or none of them should be extracted.
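
Concretely, each result row in the single-field scenario carries three counters that the assertions compare. A sketch of the decision order with illustrative numbers:

# Illustrative counters for one source/sourcetype combination.
row = {"event_count": 10, "field_count": 8, "valid_field_count": 7}

if row["field_count"] == 0:
    print("fail: field is not extracted in any events")
elif row["field_count"] > row["event_count"]:
    print("fail: field should not be multi-value")
elif row["field_count"] < row["event_count"]:
    print("fail: field is not extracted in some events")  # triggers here (8 < 10)
elif row["field_count"] != row["valid_field_count"]:
    print("fail: field has invalid values")
else:
    print("pass")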
test_eventtype_mapped_multiple_cim_datamodel(splunk_search_util, splunk_ingest_data, splunk_setup, splunk_searchtime_cim_mapped_datamodel, record_property, caplog)

This test case checks that an event type is not mapped with more than one data model.

Parameters:

- splunk_search_util (SearchUtil): Object that helps to search on Splunk. Required.
- splunk_searchtime_cim_mapped_datamodel: Object which contains the eventtype list. Required.
- record_property (fixture): Documents facts of test cases. Required.
- caplog (fixture): Fixture to capture logs. Required.
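
The search appends one subsearch per data model and then counts distinct dm_type values per eventtype; any eventtype with datamodel_count > 1 fails. A standalone sketch of the clause built for the first data model entry (taken from the list in the source):

data_model = {"name": "Alerts", "tags": [["alert"]]}  # first entry of data_models

clause = "| appendpipe [ | search "
clause += " OR ".join(
    "({})".format(" ".join("tag={}".format(tag) for tag in tags_list))
    for tags_list in data_model["tags"]
)
clause += ' | eval dm_type="{}"]'.format(data_model["name"])

print(clause)  # | appendpipe [ | search (tag=alert) | eval dm_type="Alerts"]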

TestGenerator

Generates test cases to verify the CIM compatibility.

CIMTestGenerator

Bases: object

Generates test cases to verify the CIM compatibility.

Parameters:

- addon_path (str): Relative or absolute path to the add-on. Required.
- data_model_path (str): Relative or absolute path to the data model JSON files. Required.
- tokenized_events: Tokenized events used to generate the recommended-fields test cases. Required.
- test_field_type (list): The types of fields for which the test cases should be generated. Default: ["required", "conditional"].
- common_fields_path (str): Relative or absolute path of the JSON file with common fields. Default: None.
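
The plugin normally constructs this generator itself from the pytest options. A minimal instantiation sketch with hypothetical paths (tokenized_events is left empty, so the recommended-fields tests would yield nothing):

from pytest_splunk_addon.cim_tests.test_generator import CIMTestGenerator

# Hypothetical paths; in a real session these come from the pytest options.
generator = CIMTestGenerator(
    addon_path="package/",
    data_model_path="data_models/",
    tokenized_events=[],
)

# Each yielded pytest.param carries the tag stanza, dataset list, and fields.
for param in generator.generate_tests("splunk_searchtime_cim_fields"):
    print(param.id)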
Source code in pytest_splunk_addon/cim_tests/test_generator.py
class CIMTestGenerator(object):
    """
    Generates test cases to verify the CIM compatibility.

    Args:
        addon_path (str):
            Relative or absolute path to the add-on
        data_model_path (str):
            Relative or absolute path to the data model json files
        tokenized_events:
            Tokenized events used to generate the recommended-fields test cases
        test_field_type (list):
            For which types of fields, the test cases should be generated
        common_fields_path (str):
            Relative or absolute path of the json file with common fields
    """

    COMMON_FIELDS_PATH = "CommonFields.json"

    def __init__(
        self,
        addon_path,
        data_model_path,
        tokenized_events,
        test_field_type=["required", "conditional"],
        common_fields_path=None,
    ):

        self.data_model_handler = DataModelHandler(data_model_path)
        self.addon_parser = AddonParser(addon_path)
        self.tokenized_events = tokenized_events
        self.test_field_type = test_field_type
        self.common_fields_path = common_fields_path or op.join(
            op.dirname(op.abspath(__file__)), self.COMMON_FIELDS_PATH
        )

    def generate_tests(self, fixture):
        """
        Generate the test cases based on the fixture provided.
        Supported fixtures:

            * splunk_searchtime_cim_fields
            * splunk_searchtime_cim_fields_not_allowed_in_props
            * splunk_searchtime_cim_fields_not_allowed_in_search
            * splunk_searchtime_cim_mapped_datamodel
            * splunk_searchtime_cim_fields_recommended

        Args:
            fixture(str): fixture name
        """

        if fixture.endswith("fields"):
            yield from self.generate_cim_fields_tests()
        elif fixture.endswith("not_allowed_in_props"):
            yield from self.generate_field_extractions_test()
        elif fixture.endswith("not_allowed_in_search"):
            yield from self.generate_fields_event_count_test()
        elif fixture.endswith("mapped_datamodel"):
            yield from self.generate_mapped_datamodel_tests()
        elif fixture.endswith("fields_recommended"):
            yield from self.generate_recommended_fields_tests()

    def get_mapped_datasets(self):
        """
        Get all mapped data_sets for each tags stanza from an add-on

        Yields:
            tuple: Tag Stanza, mapped DataSet
        """
        yield from self.data_model_handler.get_mapped_data_models(self.addon_parser)

    def generate_cim_fields_tests(self):
        """
        Generates the test cases for required/conditional/cluster fields.

        1. List CIM mapped models
        2. Iterate through each field in CIM data model
        3. Generate & Yield pytest.param for each test case
        4. Include the cluster test case as well.
        """
        LOGGER.info("Generating cim fields tests")
        for tag_stanza, dataset_list in self.get_mapped_datasets():
            test_dataset = dataset_list[-1]
            LOGGER.info(
                "Generating cim tests for tag_stanza=%s, dataset_list=%s",
                tag_stanza,
                test_dataset,
            )
            # Test to check there is at least one event in the dataset
            yield pytest.param(
                {"tag_stanza": tag_stanza, "data_set": dataset_list, "fields": []},
                id=f"{tag_stanza}::{test_dataset}",
            )
            # Test for each required fields
            for each_field in test_dataset.fields:
                if each_field.type in self.test_field_type:
                    yield pytest.param(
                        {
                            "tag_stanza": tag_stanza,
                            "data_set": dataset_list,
                            "fields": [each_field],
                        },
                        id=f"{tag_stanza}::{test_dataset}::{each_field}",
                    )

            # Test for the field cluster
            for each_fields_cluster in test_dataset.fields_cluster:
                yield pytest.param(
                    {
                        "tag_stanza": tag_stanza,
                        "data_set": dataset_list,
                        "fields": each_fields_cluster,
                    },
                    id=(
                        f"{tag_stanza}::{test_dataset}::"
                        f"{'+'.join([each_field.name for each_field in each_fields_cluster])}"
                    ),
                )

    def generate_field_extractions_test(self):
        """
        Generate tests for the fields whose extractions are not allowed in props.conf

        1. Get a list of fields of type in ["not_allowed_in_search_and_props", "not_allowed_in_props"] from common fields json.
        2. Get a list of fields whose extractions are defined in props.
        3. Compare and get the list of fields whose extractions are not allowed but defined.
        4. yield the field list
        """
        common_fields_list = self.get_common_fields(
            test_type=["not_allowed_in_search_and_props", "not_allowed_in_props"]
        )

        for _, dataset_list in self.get_mapped_datasets():
            test_dataset = dataset_list[-1]
            common_fields_list.extend(
                [
                    each_field
                    for each_field in test_dataset.fields
                    if each_field.type
                    in ["not_allowed_in_search_and_props", "not_allowed_in_props"]
                    and each_field not in common_fields_list
                ]
            )

        addon_stanzas = self.addon_parser.get_props_fields()
        not_allowed_fields = []
        for field_group in addon_stanzas:
            test_group = field_group.copy()
            not_allowed_fields.extend(
                [
                    {
                        "name": each_common_field.name,
                        "stanza": test_group.get("stanza"),
                        "classname": test_group.get("classname"),
                    }
                    for each in test_group["fields"]
                    for each_common_field in common_fields_list
                    if each_common_field.name == each.name
                    and each_common_field not in not_allowed_fields
                ]
            )

        yield pytest.param(
            {"fields": not_allowed_fields},
            id=f"searchtime_cim_fields",
        )

    def generate_fields_event_count_test(self):
        """
        Generates tests for the fields which should not be extracted in an add-on

        1. Get the list of type=["not_allowed_in_search_and_props", "not_allowed_in_search"] fields from common fields json.
        2. Get the list of type=["not_allowed_in_search_and_props", "not_allowed_in_search"] fields from mapped datasets.
        3. Combine list1 and list2
        4. yield the field list
        5. Expected event_count for the fields: 0
        """

        not_allowed_fields = self.get_common_fields(
            test_type=["not_allowed_in_search_and_props", "not_allowed_in_search"]
        )

        for tag_stanza, dataset_list in self.get_mapped_datasets():
            test_dataset = dataset_list[-1]
            if not test_dataset.fields:
                continue
            test_fields = not_allowed_fields[:]
            test_fields.extend(
                [
                    each_field
                    for each_field in test_dataset.fields
                    if each_field.type
                    in ["not_allowed_in_search_and_props", "not_allowed_in_search"]
                    and each_field not in test_fields
                ]
            )
            yield pytest.param(
                {
                    "tag_stanza": tag_stanza,
                    "data_set": dataset_list,
                    "fields": test_fields,
                },
                id=f"{tag_stanza}::{test_dataset}",
            )

    def get_common_fields(self, test_type=[]):
        """
        Obtain a list of the common fields defined in COMMON_FIELDS_PATH
        """
        with open(self.common_fields_path, "r") as cf_json:
            common_fields_json = json.load(cf_json)
        common_fields_list = list(Field.parse_fields(common_fields_json["fields"]))
        return [
            each_field
            for each_field in common_fields_list
            if each_field.type in test_type
        ]

    def generate_mapped_datamodel_tests(self):
        """
        Generates the tests to check that an event type is not mapped with more than one data model

        1. Get a list of eventtype which defined in eventtype configuration.
        2. yield the eventtype list
        """
        eventtypes = []
        for each_eventtype in self.addon_parser.get_eventtypes():
            eventtypes.append(each_eventtype.get("stanza"))

        yield pytest.param(
            {"eventtypes": eventtypes},
            id=f"mapped_datamodel_tests",
        )

    def generate_recommended_fields_tests(self):
        for event in self.tokenized_events:
            if not event.requirement_test_data:
                continue
            for _, datamodels in event.requirement_test_data["datamodels"].items():
                if type(datamodels) is not list:
                    datamodels = [datamodels]
                for datamodel in datamodels:
                    model, *datasets = datamodel.split(":")
                    model = model.replace(" ", "_")
                    if datasets:
                        datasets = [dataset.replace(" ", "_") for dataset in datasets]

                    fields = (
                        list(event.requirement_test_data["cim_fields"].keys())
                        + event.requirement_test_data["missing_recommended_fields"]
                    )
                    for exception, _ in event.requirement_test_data[
                        "exceptions"
                    ].items():
                        fields.append(exception)

                    yield pytest.param(
                        {
                            "datamodel": model,
                            "datasets": datasets,
                            "fields": fields,
                            "cim_version": event.requirement_test_data["cim_version"],
                        },
                        id=f"{model}-{'-'.join(datasets)}::sample_name::{event.sample_name}::host::{event.metadata.get('host')}",
                    )

generate_cim_fields_tests()

Generates the test cases for required/conditional/cluster fields.

  1. List CIM mapped models
  2. Iterate through each field in CIM data model
  3. Generate & Yield pytest.param for each test case
  4. Include the cluster test case as well.
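
For one mapped dataset the generator therefore yields three kinds of parameters, whose ids follow the tag_stanza::dataset[::field] pattern. A sketch with hypothetical stanza, dataset, and field names:

import pytest

tag_stanza, test_dataset = "eventtype=ta_example_auth", "Authentication"

params = [
    # at least one event is mapped with the dataset
    pytest.param({"tag_stanza": tag_stanza, "fields": []},
                 id=f"{tag_stanza}::{test_dataset}"),
    # one test per required/conditional field
    pytest.param({"tag_stanza": tag_stanza, "fields": ["user"]},
                 id=f"{tag_stanza}::{test_dataset}::user"),
    # one test per field cluster
    pytest.param({"tag_stanza": tag_stanza, "fields": ["src", "src_ip"]},
                 id=f"{tag_stanza}::{test_dataset}::src+src_ip"),
]
for p in params:
    print(p.id)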
Source code in pytest_splunk_addon/cim_tests/test_generator.py
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
def generate_cim_fields_tests(self):
    """
    Generates the test cases for required/conditional/cluster fields.

    1. List CIM mapped models
    2. Iterate through each field in CIM data model
    3. Generate & Yield pytest.param for each test case
    4. Include the cluster test case as well.
    """
    LOGGER.info("Generating cim fields tests")
    for tag_stanza, dataset_list in self.get_mapped_datasets():
        test_dataset = dataset_list[-1]
        LOGGER.info(
            "Generating cim tests for tag_stanza=%s, dataset_list=%s",
            tag_stanza,
            test_dataset,
        )
        # Test to check there is at least one event in the dataset
        yield pytest.param(
            {"tag_stanza": tag_stanza, "data_set": dataset_list, "fields": []},
            id=f"{tag_stanza}::{test_dataset}",
        )
        # Test for each required fields
        for each_field in test_dataset.fields:
            if each_field.type in self.test_field_type:
                yield pytest.param(
                    {
                        "tag_stanza": tag_stanza,
                        "data_set": dataset_list,
                        "fields": [each_field],
                    },
                    id=f"{tag_stanza}::{test_dataset}::{each_field}",
                )

        # Test for the field cluster
        for each_fields_cluster in test_dataset.fields_cluster:
            yield pytest.param(
                {
                    "tag_stanza": tag_stanza,
                    "data_set": dataset_list,
                    "fields": each_fields_cluster,
                },
                id=(
                    f"{tag_stanza}::{test_dataset}::"
                    f"{'+'.join([each_field.name for each_field in each_fields_cluster])}"
                ),
            )
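
For illustration, a minimal sketch of consuming this generator (assuming an already initialized generator object named `generator`; `pytest.param` returns a ParameterSet whose `values` and `id` attributes are read here)::

for param in generator.generate_cim_fields_tests():
    case = param.values[0]  # the dict passed to pytest.param above
    print(param.id, case["tag_stanza"], [str(f) for f in case["fields"]])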

generate_field_extractions_test()

Generate tests for the fields whose extractions are not allowed in props.conf

  1. Get a list of fields of type in ["not_allowed_in_search_and_props", "not_allowed_in_props"] from the common fields json.
  2. Get a list of fields whose extractions are defined in props.
  3. Compare the two and get the list of fields whose extractions are not allowed but defined.
  4. Yield the field list
Source code in pytest_splunk_addon/cim_tests/test_generator.py
def generate_field_extractions_test(self):
    """
    Generate tests for the fields which the extractions are not allowed in props.conf

    1. Get a list of fields of type in ["not_allowed_in_search_and_props", "not_allowed_in_props"] from common fields json.
    2. Get a list of fields whose extractions are defined in props.
    3. Compare and get the list of fields whose extractions are not allowed but defined.
    4. yield the field list
    """
    common_fields_list = self.get_common_fields(
        test_type=["not_allowed_in_search_and_props", "not_allowed_in_props"]
    )

    for _, dataset_list in self.get_mapped_datasets():
        test_dataset = dataset_list[-1]
        common_fields_list.extend(
            [
                each_field
                for each_field in test_dataset.fields
                if each_field.type
                in ["not_allowed_in_search_and_props", "not_allowed_in_props"]
                and each_field not in common_fields_list
            ]
        )

    addon_stanzas = self.addon_parser.get_props_fields()
    not_allowed_fields = []
    for field_group in addon_stanzas:
        test_group = field_group.copy()
        not_allowed_fields.extend(
            [
                {
                    "name": each_common_field.name,
                    "stanza": test_group.get("stanza"),
                    "classname": test_group.get("classname"),
                }
                for each in test_group["fields"]
                for each_common_field in common_fields_list
                if each_common_field.name == each.name
                and each_common_field not in not_allowed_fields
            ]
        )

    yield pytest.param(
        {"fields": not_allowed_fields},
        id=f"searchtime_cim_fields",
    )
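
At its core this test is a comparison between two collections: fields whose extraction is disallowed and fields actually defined in props.conf. A standalone sketch of the same idea (all values illustrative, not the library's API)::

not_allowed = {"event_id", "vendor_product"}          # from the common fields json (illustrative)
defined_in_props = {"vendor_product", "src", "dest"}  # parsed from props.conf (illustrative)
violations = sorted(not_allowed & defined_in_props)
print(violations)  # ['vendor_product'] -> these extractions should be removed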

generate_fields_event_count_test()

Generates tests for the fields which should not be extracted in an add-on

  1. Get the list of type=["not_allowed_in_search_and_props", "not_allowed_in_search"] fields from the common fields json.
  2. Get the list of type=["not_allowed_in_search_and_props", "not_allowed_in_search"] fields from the mapped datasets.
  3. Combine list 1 and list 2
  4. Yield the field list
  5. Expected event_count for the fields: 0
Source code in pytest_splunk_addon/cim_tests/test_generator.py
def generate_fields_event_count_test(self):
    """
    Generates the tests which should not be extracted in an add-on

    1. Get the list of type=["not_allowed_in_search_and_props", "not_allowed_in_search"] fields from common fields json.
    2. Get the list of type=["not_allowed_in_search_and_props", "not_allowed_in_search"] fields from mapped datasets.
    3. Combine list1 and list2
    4. yield the field list
    5. Expected event_count for the fields: 0
    """

    not_allowed_fields = self.get_common_fields(
        test_type=["not_allowed_in_search_and_props", "not_allowed_in_search"]
    )

    for tag_stanza, dataset_list in self.get_mapped_datasets():
        test_dataset = dataset_list[-1]
        if not test_dataset.fields:
            continue
        test_fields = not_allowed_fields[:]
        test_fields.extend(
            [
                each_field
                for each_field in test_dataset.fields
                if each_field.type
                in ["not_allowed_in_search_and_props", "not_allowed_in_search"]
                and each_field not in test_fields
            ]
        )
        yield pytest.param(
            {
                "tag_stanza": tag_stanza,
                "data_set": dataset_list,
                "fields": test_fields,
            },
            id=f"{tag_stanza}::{test_dataset}",
        )
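
Note the per-dataset copy `test_fields = not_allowed_fields[:]`; without it, fields appended for one dataset would leak into the test case of the next. A minimal demonstration of the pattern::

base = ["dest_ip"]
for dataset_fields in (["extra_a"], ["extra_b"]):
    test_fields = base[:]  # shallow copy per dataset
    test_fields.extend(dataset_fields)
    print(test_fields)     # ['dest_ip', 'extra_a'] then ['dest_ip', 'extra_b']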

generate_mapped_datamodel_tests()

Generates the tests to check that an eventtype is not mapped with more than one data model

  1. Get the list of eventtypes defined in the eventtypes configuration.
  2. Yield the eventtype list
Source code in pytest_splunk_addon/cim_tests/test_generator.py
def generate_mapped_datamodel_tests(self):
    """
    Generates the tests to check event type is not be mapped with more than one data model

    1. Get a list of eventtype which defined in eventtype configuration.
    2. yield the eventtype list
    """
    eventtypes = []
    for each_eventtype in self.addon_parser.get_eventtypes():
        eventtypes.append(each_eventtype.get("stanza"))

    yield pytest.param(
        {"eventtypes": eventtypes},
        id=f"mapped_datamodel_tests",
    )
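
The downstream test asserts that each eventtype maps to at most one data model. A sketch of that invariant with made-up data (not the library's API)::

eventtype_models = {"sample_eventtype": ["Authentication"]}  # illustrative mapping
multi_mapped = {e: m for e, m in eventtype_models.items() if len(m) > 1}
# a second model in the list above would trip this assert
assert not multi_mapped, f"Eventtypes mapped to multiple data models: {multi_mapped}"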

generate_tests(fixture)

Generate the test cases based on the fixture provided. Supported fixtures:

* splunk_searchtime_cim_fields
* splunk_searchtime_cim_fields_not_allowed
* splunk_searchtime_cim_fields_not_extracted

Parameters:

    Name       Type    Description     Default
    fixture    str     fixture name    required
Source code in pytest_splunk_addon/cim_tests/test_generator.py
def generate_tests(self, fixture):
    """
    Generate the test cases based on the fixture provided
    supported fixtures:

        * splunk_searchtime_cim_fields
        * splunk_searchtime_cim_fields_not_allowed
        * splunk_searchtime_cim_fields_not_extracted

    Args:
        fixture(str): fixture name
    """

    if fixture.endswith("fields"):
        yield from self.generate_cim_fields_tests()
    elif fixture.endswith("not_allowed_in_props"):
        yield from self.generate_field_extractions_test()
    elif fixture.endswith("not_allowed_in_search"):
        yield from self.generate_fields_event_count_test()
    elif fixture.endswith("mapped_datamodel"):
        yield from self.generate_mapped_datamodel_tests()
    elif fixture.endswith("fields_recommended"):
        yield from self.generate_recommended_fields_tests()
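
Dispatch is purely by fixture-name suffix, so for example `splunk_searchtime_cim_fields` is routed to `generate_cim_fields_tests()`. A hedged usage sketch (assuming an already initialized generator object)::

for fixture in ("splunk_searchtime_cim_fields",
                "splunk_searchtime_cim_mapped_datamodel"):
    tests = list(generator.generate_tests(fixture))
    print(fixture, len(tests))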

get_common_fields(test_type=[])

Obtain the list of common fields defined in COMMON_FIELDS_PATH, filtered by test_type

Source code in pytest_splunk_addon/cim_tests/test_generator.py
def get_common_fields(self, test_type=[]):
    """
    To obtain list object of common fields mentioned in COMMON_FIELDS_PATH
    """
    with open(self.common_fields_path, "r") as cf_json:
        common_fields_json = json.load(cf_json)
    common_fields_list = list(Field.parse_fields(common_fields_json["fields"]))
    return [
        each_field
        for each_field in common_fields_list
        if each_field.type in test_type
    ]
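
A standalone sketch of the type filter applied above, using a stand-in for the Field class (illustrative names only)::

from dataclasses import dataclass

@dataclass
class FakeField:
    name: str
    type: str

fields = [FakeField("event_id", "not_allowed_in_props"), FakeField("dest", "required")]
wanted = [f for f in fields if f.type in ["not_allowed_in_props"]]
print([f.name for f in wanted])  # ['event_id']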

get_mapped_datasets()

Get all mapped data_sets for each tag stanza of an add-on

Yields:

    Type     Description
    tuple    Tag Stanza, mapped DataSet

Source code in pytest_splunk_addon/cim_tests/test_generator.py
def get_mapped_datasets(self):
    """
    Get all mapped data_sets for each tags stanza from an add-on

    Yields:
        tuple: Tag Stanza, mapped DataSet
    """
    yield from self.data_model_handler.get_mapped_data_models(self.addon_parser)

DataModelHandler

Provides Data Model handling functionalities, such as:

  * Parse all the data model JSON files
  * Get the mapped data model for an eventtype

DataModelHandler

Bases: object

Provides Data Model handling functionalities, such as:

  * Parse all the data model JSON files
  * Get the mapped data model for an eventtype

Parameters:

    Name               Type    Description                          Default
    data_model_path    str     path to the data model JSON files    required
Source code in pytest_splunk_addon/cim_tests/data_model_handler.py
class DataModelHandler(object):
    """
    Provides Data Model handling functionalities. Such as

    * Parse all the data model JSON files
    * Get Mapped data model for an eventtype

    Args:
        data_model_path (str): path to the data model JSON files
    """

    def __init__(self, data_model_path):
        self.data_model_path = data_model_path
        self._data_models = None

    @property
    def data_models(self):
        if not self._data_models:
            self._data_models = list(self.load_data_models(self.data_model_path))
        return self._data_models

    def _get_all_tags_per_stanza(self, addon_parser):
        """
        Get list of all tags mapped with single stanza in tags.conf

        Args:
            addon_parser (addon_parser.AddonParser): Object of Addon_parser

        Returns:
            tags mapped with stanzas in tags.conf

                {
                    stanza_name: [List of tags mapped to the stanza]
                }
        """

        tag_stanzas = {}
        for each_tag in addon_parser.get_tags():
            stanza_name = each_tag["stanza"]
            tags = each_tag["tag"]

            tag_stanzas.setdefault(stanza_name, []).append(tags)

        return tag_stanzas

    def load_data_models(self, data_model_path):
        """
        Parse all the data model JSON files one by one

        Yields:
            (cim_tests.data_model.DataModel): parsed data model object
        """
        # Parse each fields and load data models
        json_list = [
            each for each in os.listdir(data_model_path) if each.endswith(".json")
        ]
        for each_json in json_list:
            yield DataModel(
                JSONSchema.parse_data_model(os.path.join(data_model_path, each_json))
            )

    def get_mapped_data_models(self, addon_parser):
        """
        Get list of eventtypes mapped with Data-Sets.
        The reason addon_parser is an argument & not attribute of the class
        is that, the loaded handler should be used with multiple addons.

        Args:
            addon_parser (addon_parser.AddonParser): Object of Addon_parser

        Yields:
            tag stanza mapped with list of data sets

                "eventtype=sample", DataSet(performance)
        """

        tags_in_each_stanza = self._get_all_tags_per_stanza(addon_parser)
        for eventtype, tags in tags_in_each_stanza.items():
            is_mapped_datasets = False
            for each_data_model in self.data_models:
                mapped_datasets = list(each_data_model.get_mapped_datasets(tags))
                if mapped_datasets:
                    is_mapped_datasets = True
                    LOGGER.info(
                        "Data Model=%s mapped for %s", each_data_model, eventtype
                    )
                    for each_mapped_dataset in mapped_datasets:
                        yield eventtype, each_mapped_dataset
            if not is_mapped_datasets:
                LOGGER.info("No Data Model mapped for %s", eventtype)

get_mapped_data_models(addon_parser)

Get a list of eventtypes mapped with Data-Sets. addon_parser is an argument rather than an attribute of the class so that the loaded handler can be reused with multiple add-ons.

Parameters:

    Name            Type           Description              Default
    addon_parser    AddonParser    Object of AddonParser    required

Yields:

    Type     Description
    tuple    tag stanza mapped with a list of data sets, e.g. "eventtype=sample", DataSet(performance)

Source code in pytest_splunk_addon/cim_tests/data_model_handler.py
def get_mapped_data_models(self, addon_parser):
    """
    Get list of eventtypes mapped with Data-Sets.
    The reason addon_parser is an argument & not attribute of the class
    is that, the loaded handler should be used with multiple addons.

    Args:
        addon_parser (addon_parser.AddonParser): Object of Addon_parser

    Yields:
        tag stanza mapped with list of data sets

            "eventtype=sample", DataSet(performance)
    """

    tags_in_each_stanza = self._get_all_tags_per_stanza(addon_parser)
    for eventtype, tags in tags_in_each_stanza.items():
        is_mapped_datasets = False
        for each_data_model in self.data_models:
            mapped_datasets = list(each_data_model.get_mapped_datasets(tags))
            if mapped_datasets:
                is_mapped_datasets = True
                LOGGER.info(
                    "Data Model=%s mapped for %s", each_data_model, eventtype
                )
                for each_mapped_dataset in mapped_datasets:
                    yield eventtype, each_mapped_dataset
        if not is_mapped_datasets:
            LOGGER.info("No Data Model mapped for %s", eventtype)

load_data_models(data_model_path)

Parse all the data model JSON files one by one

Yields:

    Type         Description
    DataModel    parsed data model object

Source code in pytest_splunk_addon/cim_tests/data_model_handler.py
def load_data_models(self, data_model_path):
    """
    Parse all the data model JSON files one by one

    Yields:
        (cim_tests.data_model.DataModel): parsed data model object
    """
    # Parse each fields and load data models
    json_list = [
        each for each in os.listdir(data_model_path) if each.endswith(".json")
    ]
    for each_json in json_list:
        yield DataModel(
            JSONSchema.parse_data_model(os.path.join(data_model_path, each_json))
        )

DataModel

Includes DataModel class which handles the DataSets within a data model.

DataModel

Bases: object

Handles the DataSets within a data model.

Parameters:

    Name               Type    Description                               Default
    data_model_json    dict    Dictionary of the data model Json file    required
Source code in pytest_splunk_addon/cim_tests/data_model.py
class DataModel(object):
    """
    Handles the DataSets within a data model.

    Args:
        data_model_json(dict): Dictionary of the data model Json file
    """

    def __init__(self, data_model_json):

        self.name = data_model_json.get("model_name")
        self.root_data_set = list(
            DataSet.load_dataset(data_model_json.get("objects"), self.name)
        )

    def _get_mapped_datasets(self, addon_tags, data_sets, mapped_datasets=[]):
        """
        Recursive function to get the data_sets mapped with addon_tags
        If the parent data_set is mapped, check the child data_sets too

        Args:
            addon_tags(list): Contains tags mapped to a stanza
            data_sets(list): list of data sets to check with

        Yields:
            data_set.DataSet: data set object mapped with the tags
        """
        for each_data_set in data_sets:
            if each_data_set.match_tags(addon_tags):
                current_mapped_ds = mapped_datasets[:]
                current_mapped_ds.append(each_data_set)
                yield current_mapped_ds
                yield from self._get_mapped_datasets(
                    addon_tags, each_data_set.child_dataset, current_mapped_ds
                )

    def get_mapped_datasets(self, addon_tags):
        """
        Get all mapped dataSets for an Add-on's tags stanza

        Args:
            addon_tags(list): Contains tags mapped to a stanza

        Yields:
            data_set.DataSet: data set object mapped with the tags
        """
        yield from self._get_mapped_datasets(addon_tags, self.root_data_set)

    def __str__(self):
        return str(self.name)
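
The recursion in `_get_mapped_datasets()` yields the parent chain first, then every matching child chain. A toy reproduction with a simplified `match_tags` (a single tag list per node instead of tag groups; all names illustrative)::

class Node:
    def __init__(self, name, tags, children=()):
        self.name, self.tags, self.child_dataset = name, tags, list(children)

    def match_tags(self, addon_tags):
        return set(self.tags) <= set(addon_tags)

leaf = Node("Failed_Authentication", ["authentication", "failure"])
root = Node("Authentication", ["authentication"], [leaf])

def mapped(addon_tags, nodes, acc=()):
    for node in nodes:
        if node.match_tags(addon_tags):
            chain = (*acc, node)
            yield list(chain)
            yield from mapped(addon_tags, node.child_dataset, chain)

for chain in mapped(["authentication", "failure"], [root]):
    print([node.name for node in chain])
# ['Authentication']
# ['Authentication', 'Failed_Authentication']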

get_mapped_datasets(addon_tags)

Get all mapped DataSets for an Add-on's tags stanza

Parameters:

    Name          Type    Description                         Default
    addon_tags    list    Contains tags mapped to a stanza    required

Yields:

    Type                Description
    data_set.DataSet    data set object mapped with the tags

Source code in pytest_splunk_addon/cim_tests/data_model.py
def get_mapped_datasets(self, addon_tags):
    """
    Get all mapped dataSets for an Add-on's tags stanza

    Args:
        addon_tags(list): Contains tags mapped to a stanza

    Yields:
        data_set.DataSet: data set object mapped with the tags
    """
    yield from self._get_mapped_datasets(addon_tags, self.root_data_set)

DataSet

Includes DataSet class which handles a single data set

DataSet

Bases: object

Handles a single data set

Parameters:

    Name             Type    Description                 Default
    data_set_json    dict    Json of a single DataSet    required
Source code in pytest_splunk_addon/cim_tests/data_set.py
class DataSet(object):
    """
    Handles a single data set

    Args:
        data_set_json(dict): Json of a single DataSet
    """

    def __init__(self, data_set_json, data_model):
        self.name = data_set_json.get("name")
        self.tags = data_set_json.get("tags")
        self.data_model = data_model
        self.child_dataset = list(
            self.load_dataset(data_set_json.get("child_dataset"), self.data_model)
        )
        self.fields = list(
            Field.parse_fields(
                data_set_json.get("fields"),
                expected_values=[],
                negative_values=["", "-", "unknown", "null", "(null)"],
            )
        )
        self.fields_cluster = self._parse_fields_cluster(
            data_set_json.get("fields_cluster")
        )
        self.search_constraints = self._parse_constraint(
            data_set_json.get("search_constraints")
        )

    @classmethod
    def load_dataset(cls, dataset_list, data_model):
        """
        Parse all the fields from the data_model_json

        Args:
            dataset_list(list): Contains list of datasets
            data_model: Name of the data model

        Yields:
            data_set.DataSet: Dataset object for the given list
        """
        if dataset_list is not None:
            for each_dataset in dataset_list:
                yield cls(each_dataset, data_model)

    @classmethod
    def _parse_constraint(cls, constraint_search):
        """
        For future implementation when
        Constraint parsing mechanism should be added.
        This would come in picture while we parse data model Json.
        """
        return constraint_search

    def _parse_fields_cluster(self, fields_clusters):
        """
        Parse all the fields from the data_model_json
        """
        parsed_fields_clusters = []
        for each_cluster in fields_clusters:
            parsed_cluster = list(filter(lambda f: f.name in each_cluster, self.fields))
            assert len(each_cluster) == len(
                parsed_cluster
            ), f"Dataset={self.name}, Each cluster field should be included in fields list"
            parsed_fields_clusters.append(parsed_cluster)
        return parsed_fields_clusters

    def match_tags(self, addon_tag_list):
        """
        Check if the tags are mapped with this data set
        """
        for each_tag_group in self.tags:
            if set(each_tag_group).issubset(set(addon_tag_list)):
                return True

    def __str__(self):
        return str(self.name)

load_dataset(dataset_list, data_model) classmethod

Parse all the fields from the data_model_json

Parameters:

    Name            Type    Description                  Default
    dataset_list    list    Contains list of datasets    required
    data_model              Name of the data model       required

Yields:

    Type                Description
    data_set.DataSet    Dataset object for the given list

Source code in pytest_splunk_addon/cim_tests/data_set.py
@classmethod
def load_dataset(cls, dataset_list, data_model):
    """
    Parse all the fields from the data_model_json

    Args:
        dataset_list(list): Contains list of datasets
        data_model: Name of the data model

    Yields:
        data_set.DataSet: Dataset object for the given list
    """
    if dataset_list is not None:
        for each_dataset in dataset_list:
            yield cls(each_dataset, data_model)

match_tags(addon_tag_list)

Check if the tags are mapped with this data set

Source code in pytest_splunk_addon/cim_tests/data_set.py
def match_tags(self, addon_tag_list):
    """
    Check if the tags are mapped with this data set
    """
    for each_tag_group in self.tags:
        if set(each_tag_group).issubset(set(addon_tag_list)):
            return True
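
`self.tags` is a list of tag groups, and the data set matches if any one group is a subset of the add-on's tags. Equivalently (illustrative values)::

tags = [["authentication", "default"], ["authentication", "cleartext"]]
addon_tags = ["authentication", "default", "os"]
print(any(set(group) <= set(addon_tags) for group in tags))  # True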

FieldTestAdapter

FieldTestAdapter

Bases: Field

Field adapter that adds testing-related properties on top of Field

Properties:

  * valid_field (str): New field generated which can only have the valid values
  * invalid_field (str): New field generated which can only have the invalid values
  * validity_query (str): The query which extracts the valid_field out of the field
Source code in pytest_splunk_addon/cim_tests/field_test_adapter.py
class FieldTestAdapter(Field):
    """
    Field adapter to include the testing related properties on top of Field

    Properties:

    * valid_field (str): New field generated which can only have the valid values
    * invalid_field (str): New field generated which can only have the invalid values
    * validity_query (str): The query which extracts the valid_field out of the field

    """

    VALID_FIELD = "{}_valid"
    INVALID_FIELD = "{}_invalid"
    FIELD_COUNT = "{}_count"
    VALID_FIELD_COUNT = "{}_valid_count"
    INVALID_FIELD_VALUES = "{}_invalid_values"

    def __init__(self, field):
        self.__dict__ = field.__dict__.copy()
        self.valid_field = self.VALID_FIELD.format(field)
        self.invalid_field = self.INVALID_FIELD.format(field)
        self.validity_query = None

    @staticmethod
    def get_query_from_values(values):
        """
        List of values into SPL list

        Example::
            ["a", "b"] to '\"a\", \"b\"'

        Args:
            values (list): List of str values

        Returns:
            str: SPL query list
        """
        query = '\\", \\"'.join(values)
        return f'\\"{query}\\"'

    def gen_validity_query(self):
        """
        Generate validation search query::

            | eval valid_field = <validity>
            | eval valid_field = if(searchmatch(valid_field in <expected_values>), valid_field, null())
            | eval valid_field = if(searchmatch(valid_field in <negative_values>), null(), valid_field)
            | eval invalid_field=if(isnull(valid_field),field, null())

        """
        if not self.validity_query is None:
            return self.validity_query
        else:
            self.validity_query = ""
            if self.multi_value:
                self.validity_query += "\n" f"| nomv {self.name}"
            self.validity_query += "\n" f"| eval {self.valid_field}={self.validity}"
            if self.expected_values:
                self.validity_query += (
                    "\n"
                    '| eval {valid_field}=if(searchmatch("{valid_field} IN ({values})"), {valid_field}, null())'.format(
                        valid_field=self.valid_field,
                        values=self.get_query_from_values(self.expected_values),
                    )
                )
            if self.negative_values:
                self.validity_query += (
                    "\n"
                    '| eval {valid_field}=if(searchmatch("{valid_field} IN ({values})"), null(), {valid_field})'.format(
                        valid_field=self.valid_field,
                        values=self.get_query_from_values(self.negative_values),
                    )
                )
            self.validity_query += (
                "\n"
                f"| eval {self.invalid_field}=if(isnull({self.valid_field}), {self.name}, null())"
            )
            return self.validity_query

    def get_stats_query(self):
        """
        Generate stats search query::

            count(field) as field_count, count(valid_field) as valid_field_count,
                values(invalid_field) as invalid_values
        """
        query = f", count({self.name}) as {self.FIELD_COUNT.format(self.name)}"
        if self.gen_validity_query():
            query += f", count({self.valid_field}) as {self.VALID_FIELD_COUNT.format(self.name)}"
            query += f", values({self.invalid_field}) as {self.INVALID_FIELD_VALUES.format(self.name)}"
        return query

    @classmethod
    def get_test_fields(cls, fields):
        return [cls(each_field) for each_field in fields]

gen_validity_query()

Generate validation search query::

| eval valid_field = <validity>
| eval valid_field = if(searchmatch(valid_field in <expected_values>), valid_field, null())
| eval valid_field = if(searchmatch(valid_field in <negative_values>), null(), valid_field)
| eval invalid_field=if(isnull(valid_field),field, null())
Source code in pytest_splunk_addon/cim_tests/field_test_adapter.py
def gen_validity_query(self):
    """
    Generate validation search query::

        | eval valid_field = <validity>
        | eval valid_field = if(searchmatch(valid_field in <expected_values>), valid_field, null())
        | eval valid_field = if(searchmatch(valid_field in <negative_values>), null(), valid_field)
        | eval invalid_field=if(isnull(valid_field),field, null())

    """
    if not self.validity_query is None:
        return self.validity_query
    else:
        self.validity_query = ""
        if self.multi_value:
            self.validity_query += "\n" f"| nomv {self.name}"
        self.validity_query += "\n" f"| eval {self.valid_field}={self.validity}"
        if self.expected_values:
            self.validity_query += (
                "\n"
                '| eval {valid_field}=if(searchmatch("{valid_field} IN ({values})"), {valid_field}, null())'.format(
                    valid_field=self.valid_field,
                    values=self.get_query_from_values(self.expected_values),
                )
            )
        if self.negative_values:
            self.validity_query += (
                "\n"
                '| eval {valid_field}=if(searchmatch("{valid_field} IN ({values})"), null(), {valid_field})'.format(
                    valid_field=self.valid_field,
                    values=self.get_query_from_values(self.negative_values),
                )
            )
        self.validity_query += (
            "\n"
            f"| eval {self.invalid_field}=if(isnull({self.valid_field}), {self.name}, null())"
        )
        return self.validity_query

get_query_from_values(values) staticmethod

Convert a list of values into an SPL list

Example:: ["a", "b"] to '\"a\", \"b\"'

Parameters:

    Name      Type    Description           Default
    values    list    List of str values    required

Returns:

    Type    Description
    str     SPL query list

Source code in pytest_splunk_addon/cim_tests/field_test_adapter.py
@staticmethod
def get_query_from_values(values):
    """
    List of values into SPL list

    Example::
        ["a", "b"] to '\"a\", \"b\"'

    Args:
        values (list): List of str values

    Returns:
        str: SPL query list
    """
    query = '\\", \\"'.join(values)
    return f'\\"{query}\\"'
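
For example, the escaped SPL list produced for two values (assuming FieldTestAdapter is imported; the expected output is shown in the comment)::

print(FieldTestAdapter.get_query_from_values(["success", "failure"]))
# \"success\", \"failure\"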

get_stats_query()

Generate stats search query::

count(field) as field_count, count(valid_field) as valid_field_count,
    values(invalid_field) as invalid_values
Source code in pytest_splunk_addon/cim_tests/field_test_adapter.py
def get_stats_query(self):
    """
    Generate stats search query::

        count(field) as field_count, count(valid_field) as valid_field_count,
            values(invalid_field) as invalid_values
    """
    query = f", count({self.name}) as {self.FIELD_COUNT.format(self.name)}"
    if self.gen_validity_query():
        query += f", count({self.valid_field}) as {self.VALID_FIELD_COUNT.format(self.name)}"
        query += f", values({self.invalid_field}) as {self.INVALID_FIELD_VALUES.format(self.name)}"
    return query

FieldTestHelper

Provides the helper methods to test addon_parser.Field object

FieldTestHelper

Bases: object

Provides the helper methods to test addon_parser.Field object

Parameters:

    Name           Type                          Description                                         Default
    search_util    SearchUtil                    the util class to search on the Splunk instance    required
    fields         list of addon_parser.Field    The fields to be tested                             required
    interval       int                           at what interval each retry should be made          10
    retries        int                           number of retries to make if no results found      4
Source code in pytest_splunk_addon/cim_tests/field_test_helper.py
class FieldTestHelper(object):
    """
    Provides the helper methods to test addon_parser.Field object

    Args:
        search_util (SearchUtil): the util class to search on the Splunk instance
        fields (list addon_parser.Field): The field to be tested
        interval (int): at what interval each retry should be made
        retries (int): number of retries to make if no results found
    """

    logger = logging.getLogger("pytest-splunk-addon-tests")

    def __init__(self, search_util, fields, interval=10, retries=4):
        self.search_util = search_util
        self.fields = FieldTestAdapter.get_test_fields(fields)
        self.interval = interval
        self.retries = retries

    def test_field(self, base_search, record_property=None):
        """
        Generate a query for the list of fields and return the result

        Format of the query is::

            <condition>
            | eval <validity>
            | eval <expected_values>
            | eval <not negative_values>
            | eval <invalid_fields>
            | stats count as event_count, count(field) as field_count,
                count(valid_field) as valid_field_count,
                values(invalid_field) by sourcetype, source

        Args:
            base_search (str): Base search. Must be a search command.
            record_property (fixture): Document facts of test cases.

        Yields:
            dict: with source, sourcetype, field, event_count, field_count,
             valid_field_count, invalid_values keys
        """
        self._make_search_query(base_search)
        self.logger.info(f"Executing the search query: {self.search}")
        if record_property:
            record_property("search", " ".join(self.search.splitlines()))
        self.results = list(
            self.search_util.getFieldValuesList(
                self.search, self.interval, self.retries
            )
        )
        return self._parse_result(self.results)

    def _make_search_query(self, base_search):
        """
        Make the search query by using the list of fields::

            <base_search> <condition>
            | eval valid_field=<validity>
            | eval valid_field = if(field in <expected_values>)
            | eval valid_field = if(field not in <not negative_values>)
            | eval invalid_field = field if isnull(valid_field)
            | stats count as event_count, count(field) as field_count,
                count(valid_field) as valid_field_count,
                values(invalid_field) by sourcetype, source

        Args:
            base_search (str): The base search
        """
        self.search = f"{base_search} {self._gen_condition()}"
        self.search_event = self.search
        for each_field in self.fields:
            self.search += each_field.gen_validity_query()

        self.search += " \n| stats count as event_count"
        for each_field in self.fields:
            self.search += each_field.get_stats_query()
        self.search += " by sourcetype, source"

    def _parse_result(self, results):
        """
        Flatten the result into the following format::

            [{
                "sourcetype": str,
                "source:: str,
                "event_count": int,
                "field": Field,
                "field_count": int,
                "valid_field_count": int
                "invalid_values": list
            }]
        """
        self.parsed_result = list()
        for each_result in results:
            sourcetype = each_result.get("sourcetype")
            source = each_result.get("source")
            event_count = int(each_result.get("event_count"))
            for each_field in self.fields:
                field_dict = {
                    "field": each_field,
                    "field_count": int(
                        each_result.get(
                            FieldTestAdapter.FIELD_COUNT.format(each_field.name)
                        )
                    ),
                }
                if each_field.gen_validity_query():
                    field_dict["valid_field_count"] = int(
                        each_result.get(
                            FieldTestAdapter.VALID_FIELD_COUNT.format(each_field.name)
                        )
                    )
                    field_dict["invalid_values"] = each_result.get(
                        FieldTestAdapter.INVALID_FIELD_VALUES.format(each_field.name),
                        "-",
                    )
                field_dict.update(
                    {
                        "sourcetype": sourcetype,
                        "event_count": event_count,
                        "source": source,
                    }
                )
                self.parsed_result.append(field_dict)
            if not self.fields:
                self.parsed_result.append(
                    {
                        "sourcetype": sourcetype,
                        "event_count": event_count,
                        "source": source,
                    }
                )
        return self.parsed_result

    def _gen_condition(self):
        return " OR ".join(
            [each_field.condition for each_field in self.fields if each_field.condition]
        )

    def format_exc_message(self):
        """
        Format the exception message to display

        1) There's no field in the result::

            Source          Sourcetype      Event Count
            -------------------------------------------
            splunkd.log     splunkd         10
            scheduler.log   scheduler       0
            -------------------------------------------
            Search = <search query>

        2) There are multiple fields in the result::

            Source          Sourcetype  Field  Event Count  Field Count  Invalid Field Count  Invalid Values
            ------------------------------------------------------------------------------------------------
            splunkd.log     splunkd     One    10           10           5                   'unknown'
            scheduler.log   scheduler   Two    20           20           7                   '-', 'invalid'
            ------------------------------------------------------------------------------------------------
            Event count = 20
            Search = <search_query>

            Properties for the field :: One
            . . .

        """
        if not self.fields:
            exc_message = get_table_output(
                headers=["Source", "Sourcetype", "Event Count"],
                value_list=[
                    [
                        each_result["source"],
                        each_result["sourcetype"],
                        each_result["event_count"],
                    ]
                    for each_result in self.parsed_result
                ],
            )
        elif len(self.fields) >= 1:
            exc_message = get_table_output(
                headers=[
                    "Source",
                    "Sourcetype",
                    "Field",
                    "Event Count",
                    "Field Count",
                    "Invalid Field Count",
                    "Invalid Values",
                ],
                value_list=[
                    [
                        each_result["source"],
                        each_result["sourcetype"],
                        each_result["field"].name,
                        each_result["event_count"],
                        each_result["field_count"],
                        each_result["field_count"]
                        - each_result.get(
                            "valid_field_count", each_result["field_count"]
                        ),
                        (
                            each_result["invalid_values"][:200]
                            if each_result["invalid_values"]
                            else "-"
                        ),
                    ]
                    for each_result in self.parsed_result
                ],
            )
        exc_message += f"{format_search_query_log(self.search)}"
        for each_field in self.fields:
            exc_message += (
                f"\n\nProperties for the field :: {each_field.get_properties()}"
            )
        return exc_message
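
A hedged usage sketch (assumes an initialized SearchUtil instance and parsed Field objects; the base search string is illustrative)::

helper = FieldTestHelper(search_util, fields, interval=10, retries=4)
results = helper.test_field('search index=* sourcetype="my:sourcetype"')
for row in results:
    assert row["field_count"] == row.get("valid_field_count", row["field_count"]), \
        helper.format_exc_message()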

format_exc_message()

Format the exception message to display

1) There's no field in the result::

Source          Sourcetype      Event Count
-------------------------------------------
splunkd.log     splunkd         10
scheduler.log   scheduler       0
-------------------------------------------
Search = <search query>

2) There are multiple fields in the result::

Source          Sourcetype  Field  Event Count  Field Count  Invalid Field Count  Invalid Values
------------------------------------------------------------------------------------------------
splunkd.log     splunkd     One    10           10           5                   'unknown'
scheduler.log   scheduler   Two    20           20           7                   '-', 'invalid'
------------------------------------------------------------------------------------------------
Event count = 20
Search = <search_query>

Properties for the field :: One
. . .
Source code in pytest_splunk_addon/cim_tests/field_test_helper.py
def format_exc_message(self):
    """
    Format the exception message to display

    1) There's no field in the result::

        Source          Sourcetype      Event Count
        -------------------------------------------
        splunkd.log     splunkd         10
        scheduler.log   scheduler       0
        -------------------------------------------
        Search = <search query>

    2) There are multiple fields in the result::

        Source          Sourcetype  Field  Event Count  Field Count  Invalid Field Count  Invalid Values
        ------------------------------------------------------------------------------------------------
        splunkd.log     splunkd     One    10           10           5                   'unknown'
        scheduler.log   scheduler   Two    20           20           7                   '-', 'invalid'
        ------------------------------------------------------------------------------------------------
        Event count = 20
        Search = <search_query>

        Properties for the field :: One
        . . .

    """
    if not self.fields:
        exc_message = get_table_output(
            headers=["Source", "Sourcetype", "Event Count"],
            value_list=[
                [
                    each_result["source"],
                    each_result["sourcetype"],
                    each_result["event_count"],
                ]
                for each_result in self.parsed_result
            ],
        )
    elif len(self.fields) >= 1:
        exc_message = get_table_output(
            headers=[
                "Source",
                "Sourcetype",
                "Field",
                "Event Count",
                "Field Count",
                "Invalid Field Count",
                "Invalid Values",
            ],
            value_list=[
                [
                    each_result["source"],
                    each_result["sourcetype"],
                    each_result["field"].name,
                    each_result["event_count"],
                    each_result["field_count"],
                    each_result["field_count"]
                    - each_result.get(
                        "valid_field_count", each_result["field_count"]
                    ),
                    (
                        each_result["invalid_values"][:200]
                        if each_result["invalid_values"]
                        else "-"
                    ),
                ]
                for each_result in self.parsed_result
            ],
        )
    exc_message += f"{format_search_query_log(self.search)}"
    for each_field in self.fields:
        exc_message += (
            f"\n\nProperties for the field :: {each_field.get_properties()}"
        )
    return exc_message

test_field(base_search, record_property=None)

Generate a query for the list of fields and return the result

Format of the query is::

<condition>
| eval <validity>
| eval <expected_values>
| eval <not negative_values>
| eval <invalid_fields>
| stats count as event_count, count(field) as field_count,
    count(valid_field) as valid_field_count,
    values(invalid_field) by sourcetype, source

Parameters:

    Name               Type       Description                                Default
    base_search        str        Base search. Must be a search command.    required
    record_property    fixture    Document facts of test cases.             None

Yields:

    Type    Description
    dict    with source, sourcetype, field, event_count, field_count, valid_field_count, invalid_values keys

Source code in pytest_splunk_addon/cim_tests/field_test_helper.py
def test_field(self, base_search, record_property=None):
    """
    Generate a query for the list of fields and return the result

    Format of the query is::

        <condition>
        | eval <validity>
        | eval <expected_values>
        | eval <not negative_values>
        | eval <invalid_fields>
        | stats count as event_count, count(field) as field_count,
            count(valid_field) as valid_field_count,
            values(invalid_field) by sourcetype, source

    Args:
        base_search (str): Base search. Must be a search command.
        record_property (fixture): Document facts of test cases.

    Yields:
        dict: with source, sourcetype, field, event_count, field_count,
         valid_field_count, invalid_values keys
    """
    self._make_search_query(base_search)
    self.logger.info(f"Executing the search query: {self.search}")
    if record_property:
        record_property("search", " ".join(self.search.splitlines()))
    self.results = list(
        self.search_util.getFieldValuesList(
            self.search, self.interval, self.retries
        )
    )
    return self._parse_result(self.results)

JsonSchema

Includes JSON schema for data models

JSONSchema

Bases: BaseSchema

JsonSchema + Parser of the Data model json files

Parameters:

    Name           Type    Description                                      Default
    schema_path    str     Relative or absolute path of the schema file    None
Source code in pytest_splunk_addon/cim_tests/json_schema.py
class JSONSchema(BaseSchema):
    """
    JsonSchema + Parser of the Data model json files

    Args:
        schema_path (str): Relative or absolute path of the schema file
    """

    SCHEMA_FILE = "DatamodelSchema.json"

    def __init__(
        self,
        schema_path=None,
    ):
        self.schema_path = schema_path or op.join(
            op.dirname(op.abspath(__file__)), self.SCHEMA_FILE
        )

    @classmethod
    def parse_data_model(cls, file_path):
        """
        Parse and validate the Json file

        Args:
            schema_path (str): Relative or absolute path of the data model json file
        """
        try:
            with open(
                cls().schema_path,
                "r",
            ) as schema_f:
                json_schema = json.load(schema_f)
            with open(file_path, "r") as json_f:
                json_data = json.load(json_f)
                errors = Draft7Validator(json_schema).iter_errors(json_data)
                error_location, exc = "", ""
                LOGGER.info("Validating {}".format(file_path))
                for error in errors:
                    for error_index in error.path:
                        error_location = error_location + "[{}]".format(error_index)
                    if type(error.instance) == dict:
                        exc = exc + "\n{} for {}".format(error.message, error_location)
                    elif type(error.instance) in [str, list]:
                        exc = exc + "\nType mismatch: {} in property {}".format(
                            error.message, error_location
                        )
                    else:
                        exc = exc + "\n{}".format(error)
                if not error_location:
                    LOGGER.info("Valid Json")
                    return json_data
                else:
                    LOGGER.exception(exc)
                    raise Exception(exc)

        except json.decoder.JSONDecodeError as err:
            LOGGER.error("Json Decoding error in {} ".format(file_path))
            raise Exception("{} in file {}".format(err.args[0], file_path))
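
A hedged usage sketch (the file name is illustrative; `model_name` is the key that DataModel reads from the parsed json)::

data = JSONSchema.parse_data_model("Authentication.json")
print(data["model_name"])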

parse_data_model(file_path) classmethod

Parse and validate the Json file

Parameters:

    Name         Type    Description                                               Default
    file_path    str     Relative or absolute path of the data model json file    required
Source code in pytest_splunk_addon/cim_tests/json_schema.py
@classmethod
def parse_data_model(cls, file_path):
    """
    Parse and validate the Json file

    Args:
        schema_path (str): Relative or absolute path of the data model json file
    """
    try:
        with open(
            cls().schema_path,
            "r",
        ) as schema_f:
            json_schema = json.load(schema_f)
        with open(file_path, "r") as json_f:
            json_data = json.load(json_f)
            errors = Draft7Validator(json_schema).iter_errors(json_data)
            error_location, exc = "", ""
            LOGGER.info("Validating {}".format(file_path))
            for error in errors:
                for error_index in error.path:
                    error_location = error_location + "[{}]".format(error_index)
                if type(error.instance) == dict:
                    exc = exc + "\n{} for {}".format(error.message, error_location)
                elif type(error.instance) in [str, list]:
                    exc = exc + "\nType mismatch: {} in property {}".format(
                        error.message, error_location
                    )
                else:
                    exc = exc + "\n{}".format(error)
            if not error_location:
                LOGGER.info("Valid Json")
                return json_data
            else:
                LOGGER.exception(exc)
                raise Exception(exc)

    except json.decoder.JSONDecodeError as err:
        LOGGER.error("Json Decoding error in {} ".format(file_path))
        raise Exception("{} in file {}".format(err.args[0], file_path))

BaseSchema

Includes base class for data model schema.

BaseSchema

Bases: ABC

Abstract class to parse the data model files. The possible formats are JSON, YML, CSV, Cim_json

Source code in pytest_splunk_addon/cim_tests/base_schema.py
class BaseSchema(ABC):
    """
    Abstract class to parse the Data model files.
    The possible format can be JSON, YML, CSV, Cim_json
    """

    @abstractclassmethod
    def parse_data_model(cls, file_path):
        """
        Parse the DataModel file
        Convert it to JSON

        Expected Output::

            {
                "name":"Default_Authentication",
                "tags": ["authentication","default"],
                "fields_cluster":[],
                "fields":[
                    {
                        "fieldname": "action",
                        "field_type": "required",
                        "condition": "action IN ('success','failure','error')",
                        "comment":"The action performed on the resource."
                    },
                    ],
                "child_dataset": [
                    {
                        "name":"SuccessFul_Default_Authentication",
                        "tags": ["authentication","default"],
                        "fields_cluster":[],
                        "fields":[]
                        "child_dataset":[],
                        "search_constraints": "action='success'"
                    }
                ],
                "search_constraints":"action='failure'"
            }
        """
        pass
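
A minimal sketch of a hypothetical concrete subclass (assuming BaseSchema is imported; JSONSchema above is the parser the package actually ships)::

import json

class PreconvertedJsonSchema(BaseSchema):
    @classmethod
    def parse_data_model(cls, file_path):
        # Hypothetical: the data model was already converted to json elsewhere
        with open(file_path) as json_f:
            return json.load(json_f)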

parse_data_model(file_path)

Parse the DataModel file and convert it to JSON

Expected Output::

{
    "name":"Default_Authentication",
    "tags": ["authentication","default"],
    "fields_cluster":[],
    "fields":[
        {
            "fieldname": "action",
            "field_type": "required",
            "condition": "action IN ('success','failure','error')",
            "comment":"The action performed on the resource."
        },
        ],
    "child_dataset": [
        {
            "name":"SuccessFul_Default_Authentication",
            "tags": ["authentication","default"],
            "fields_cluster":[],
            "fields":[]
            "child_dataset":[],
            "search_constraints": "action='success'"
        }
    ],
    "search_constraints":"action='failure'"
}
Source code in pytest_splunk_addon/cim_tests/base_schema.py
@abstractclassmethod
def parse_data_model(cls, file_path):
    """
    Parse the DataModel file
    Convert it to JSON

    Expected Output::

        {
            "name":"Default_Authentication",
            "tags": ["authentication","default"],
            "fields_cluster":[],
            "fields":[
                {
                    "fieldname": "action",
                    "field_type": "required",
                    "condition": "action IN ('success','failure','error')",
                    "comment":"The action performed on the resource."
                },
                ],
            "child_dataset": [
                {
                    "name":"SuccessFul_Default_Authentication",
                    "tags": ["authentication","default"],
                    "fields_cluster":[],
                    "fields":[]
                    "child_dataset":[],
                    "search_constraints": "action='success'"
                }
            ],
            "search_constraints":"action='failure'"
        }
    """
    pass