Skip to content

FieldsTests

Test generation mechanism to verify the field extractions of an Add-on

TestTemplates

Includes the test scenarios to check the field extractions of an Add-on.

FieldTestTemplates

Bases: object

Test templates to test the knowledge objects of an App

Source code in pytest_splunk_addon/fields_tests/test_templates.py
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
class FieldTestTemplates(object):
    """
    Test templates to test the knowledge objects of an App

    Every ``test_*`` method builds a Splunk search string from the fixture it
    is parametrized with, runs it through ``splunk_search_util`` and asserts on
    the outcome. The executed search is attached to the test report through
    ``record_property`` so that a failure can be reproduced by hand.
    """

    # Shared logger for all templates.
    logger = logging.getLogger("pytest-splunk-addon")

    @pytest.mark.splunk_searchtime_fields
    @pytest.mark.splunk_searchtime_internal_errors
    def test_splunk_internal_errors(
        self, splunk_search_util, ignore_internal_errors, record_property, caplog
    ):
        """
        This test case checks that there are not any unexpected internal errors

        Searches ``index=_internal`` for ``log_level=ERROR`` events, excluding a
        fixed list of sourcetypes and every phrase in ``ignore_internal_errors``,
        and fails if anything is returned.

        Args:
            splunk_search_util (SearchUtil): Object that helps to search on Splunk.
            ignore_internal_errors (fixture): common list of errors to be ignored
            record_property (fixture): Document facts of test cases to provide more info in the test failure reports.
            caplog (fixture): fixture to capture logs.
        """
        search = """
            search index=_internal log_level=ERROR
            sourcetype!=splunkd_ui_access
            AND sourcetype!=splunk_web_access
            AND sourcetype!=splunk_web_service
            AND sourcetype!=splunkd_access
            AND sourcetype!=splunkd
        """
        for each in ignore_internal_errors:
            # json.dumps wraps the phrase in double quotes and escapes any
            # embedded quotes/backslashes before it is put into the query.
            search += " NOT " + json.dumps(each)
        search += " | table _raw"
        record_property("search", search)
        result, results = splunk_search_util.checkQueryCountIsZero(search)
        if not result:
            record_property("results", results.as_list)
            pp = pprint.PrettyPrinter(indent=4)
            # Only the first 10 offending events go into the failure message.
            result_str = pp.pformat(results.as_list[:10])
        # The assert message is evaluated only when the assertion fails, i.e.
        # exactly when ``result_str`` has been bound by the branch above.
        assert result, (
            f"\nQuery result greater than 0."
            f"{format_search_query_log(search)}"
            f"\nfound result={result_str}"
        )

    @pytest.mark.splunk_searchtime_fields
    @pytest.mark.splunk_searchtime_fields_positive
    def test_props_fields(
        self,
        splunk_search_util,
        splunk_ingest_data,
        splunk_setup,
        splunk_searchtime_fields_positive,
        record_property,
    ):
        """
        This test case checks that a field value has the expected values.

        For every field of the stanza the search requires the field to be one
        of its expected values and none of its negative values; the test passes
        when at least one event matches.

        Args:
            splunk_search_util (SearchUtil): Object that helps to search on Splunk.
            splunk_ingest_data (fixture): Ensure data was ingested before running test
            splunk_setup (fixture): Ensure that test environment was set up before running test
            splunk_searchtime_fields_positive (fixture): fields data of the event to be tested
            record_property (fixture): Document facts of test cases to provide more info in the test failure reports.
        """

        # Search Query
        record_property("stanza_name", splunk_searchtime_fields_positive["stanza"])
        record_property("stanza_type", splunk_searchtime_fields_positive["stanza_type"])
        record_property("fields", splunk_searchtime_fields_positive["fields"])

        # search_index is a comma-separated list of index names.
        index_list = (
            "(index="
            + " OR index=".join(splunk_search_util.search_index.split(","))
            + ")"
        )
        search = (
            f"search {index_list}"
            f" {splunk_searchtime_fields_positive['stanza_type']}=\""
            f"{splunk_searchtime_fields_positive['stanza']}\""
        )
        for field_dict in splunk_searchtime_fields_positive["fields"]:
            field = Field(field_dict)
            expected_values = ", ".join(f'"{each}"' for each in field.expected_values)
            negative_values = ", ".join(f'"{each}"' for each in field.negative_values)

            search += (
                f" AND ({field} IN ({expected_values})"
                f" AND NOT {field} IN ({negative_values}))"
            )
        search += COUNT_BY_SOURCE_TYPE_SEARCH_QUERY_PART

        self.logger.info("Executing the search query: %s", search)

        # run search
        result = splunk_search_util.checkQueryCountIsGreaterThanZero(
            search,
            interval=splunk_search_util.search_interval,
            retries=splunk_search_util.search_retry,
        )
        record_property("search", search)

        assert result, (
            f"\nNo result found for the search."
            f"{format_search_query_log(search)}"
            f"\ninterval={splunk_search_util.search_interval}, retries={splunk_search_util.search_retry}"
        )

    @pytest.mark.splunk_searchtime_fields
    @pytest.mark.splunk_requirements
    @pytest.mark.splunk_searchtime_fields_requirements
    def test_requirements_fields(
        self,
        splunk_search_util,
        splunk_ingest_data,
        splunk_setup,
        splunk_searchtime_fields_requirements,
        record_property,
    ):
        """
        This test case checks that a field value has the expected values.

        The event is looked up by its escaped raw text (plus any non-``None``
        modinput parameters) and every expected field is compared with the
        value Splunk extracted. The test fails when a field is absent from the
        returned event or when its value differs from the expected one.

        Args:
            splunk_search_util (SearchUtil): Object that helps to search on Splunk.
            splunk_ingest_data (fixture): Ensure data was ingested before running test
            splunk_setup (fixture): Ensure that test environment was set up before running test
            splunk_searchtime_fields_requirements (fixture): fields data of the event to be tested
            record_property (fixture): Document facts of test cases to provide more info in the test failure reports.
        """

        # Search Query
        record_property(
            "stanza_name", splunk_searchtime_fields_requirements["escaped_event"]
        )
        record_property("fields", splunk_searchtime_fields_requirements["fields"])
        record_property(
            "modinput_params", splunk_searchtime_fields_requirements["modinput_params"]
        )

        escaped_event = splunk_searchtime_fields_requirements["escaped_event"]
        fields = splunk_searchtime_fields_requirements["fields"]
        modinput_params = splunk_searchtime_fields_requirements["modinput_params"]

        index_list = (
            "(index="
            + " OR index=".join(splunk_search_util.search_index.split(","))
            + ")"
        )

        # Parameters without a value are left out of the query.
        basic_search = "".join(
            f" {param}={param_value}"
            for param, param_value in modinput_params.items()
            if param_value is not None
        )

        search = f"search {index_list} {basic_search} {escaped_event} | fields *"

        self.logger.info("Executing the search query: %s", search)

        fields_from_splunk = splunk_search_util.getFieldValuesDict(
            search,
            interval=splunk_search_util.search_interval,
            retries=splunk_search_util.search_retry,
        )

        assert fields_from_splunk, f"Event was not returned with search: {search}"

        # Fields that Splunk did not return at all. They are tracked separately
        # so that an absent field fails the test even when its expected value
        # happens to compare equal to ``None``.
        missing_fields = [field for field in fields if field not in fields_from_splunk]
        # Maps field name -> value found in Splunk (``None`` when absent).
        wrong_value_fields = {
            field: fields_from_splunk.get(field)
            for field, value in fields.items()
            if value != fields_from_splunk.get(field)
        }

        wrong_values_table = get_table_output(
            headers=["Field", "Splunk value", "Expected value"],
            value_list=[
                [
                    str(field),
                    str(value),
                    str(fields[field]),
                ]
                for field, value in wrong_value_fields.items()
            ],
        )
        missing_fields_str = ", ".join(str(field) for field in missing_fields) or "none"

        if wrong_value_fields:
            self.logger.error("Wrong field values:\n%s", wrong_values_table)
        if missing_fields:
            self.logger.error("Missing fields: %s", missing_fields_str)

        assert not wrong_value_fields and not missing_fields, (
            f"\nNot all required fields have correct values or some fields are missing in Splunk. Wrong field values:\n{wrong_values_table}"
            f"\nMissing fields: {missing_fields_str}"
            f"{format_search_query_log(search)}"
        )

    @pytest.mark.splunk_searchtime_fields
    @pytest.mark.splunk_searchtime_fields_negative
    def test_props_fields_no_dash_not_empty(
        self,
        splunk_search_util,
        splunk_ingest_data,
        splunk_setup,
        splunk_searchtime_fields_negative,
        record_property,
    ):
        """
        This test case checks negative scenario for the field value.

        The search matches events of the stanza in which any of the fields
        holds one of its negative values; the test passes when no such event
        exists. On failure a second search is run to attach a sample of the
        offending events to the assertion message.

        Args:
            splunk_search_util (SearchUtil): Object that helps to search on Splunk.
            splunk_ingest_data (fixture): Ensure data was ingested before running test
            splunk_setup (fixture): Ensure that test environment was set up before running test
            splunk_searchtime_fields_negative (fixture): fields data of the event to be tested
            record_property (fixture): Document facts of test cases to provide more info in the test failure reports.
        """

        # Search Query
        record_property("stanza_name", splunk_searchtime_fields_negative["stanza"])
        record_property("stanza_type", splunk_searchtime_fields_negative["stanza_type"])
        record_property("fields", splunk_searchtime_fields_negative["fields"])

        index_list = (
            "(index="
            + " OR index=".join(splunk_search_util.search_index.split(","))
            + ")"
        )
        base_search = (
            f"search {index_list}"
            f" {splunk_searchtime_fields_negative['stanza_type']}=\""
            f"{splunk_searchtime_fields_negative['stanza']}\""
        )

        fields_search = []
        for field_dict in splunk_searchtime_fields_negative["fields"]:
            field = Field(field_dict)
            negative_values = ", ".join(f'"{each}"' for each in field.negative_values)

            fields_search.append(f"({field} IN ({negative_values}))")
        # One bad value in any field is enough to match, hence OR.
        base_search += " AND ({})".format(" OR ".join(fields_search))
        search = base_search + COUNT_BY_SOURCE_TYPE_SEARCH_QUERY_PART

        self.logger.info("Executing the search query: %s", search)

        # run search
        result, results = splunk_search_util.checkQueryCountIsZero(search)
        record_property("search", search)
        if not result:
            record_property("results", results.as_list)
            pp = pprint.PrettyPrinter(indent=4)
            result_str = pp.pformat(results.as_list[:10])

            # Follow-up query, executed only on failure, for the report.
            query_for_unique_events = (
                base_search + TOP_FIVE_STRUCTURALLY_UNIQUE_EVENTS_QUERY_PART
            )
            query_results = splunk_search_util.get_search_results(
                query_for_unique_events
            )
            results_formatted_str = pp.pformat(query_results.as_list)
        # The message below is evaluated only when the assertion fails, which
        # is exactly when the names bound in the branch above exist.
        assert result, (
            f"\nQuery result greater than 0."
            f"{format_search_query_log(search)}"
            f"\nfound result={result_str}\n"
            " === STRUCTURALLY UNIQUE EVENTS:\n"
            f"query={query_for_unique_events}\n"
            f"events= {results_formatted_str}"
        )

    @pytest.mark.splunk_searchtime_fields
    @pytest.mark.splunk_searchtime_fields_tags
    def test_tags(
        self,
        splunk_search_util,
        splunk_ingest_data,
        splunk_setup,
        splunk_searchtime_fields_tags,
        record_property,
        caplog,
    ):
        """
        Test case to check tags mentioned in tags.conf

        This test case checks if a tag is assigned to the event if enabled,
        and also checks that a tag is not assigned to the event if disabled.
        A parameter set without an ``enabled`` key is treated as enabled.

        Args:
            splunk_search_util (splunksplwrapper.SearchUtil.SearchUtil): object that helps to search on Splunk.
            splunk_ingest_data (fixture): Ensure data was ingested before running test
            splunk_setup (fixture): Ensure that test environment was set up before running test
            splunk_searchtime_fields_tags (fixture): pytest parameters to test.
            record_property (fixture): Document facts of test cases to provide more info in the test failure reports.
            caplog (fixture): fixture to capture logs.
        """

        is_tag_enabled = splunk_searchtime_fields_tags.get("enabled", True)
        tag_query = splunk_searchtime_fields_tags["stanza"]
        tag = splunk_searchtime_fields_tags["tag"]
        self.logger.info("Testing for tag %s with tag_query %s", tag, tag_query)

        record_property("Event_with", tag_query)
        record_property("tag", tag)
        record_property("is_tag_enabled", is_tag_enabled)

        index_list = (
            "(index="
            + " OR index=".join(splunk_search_util.search_index.split(","))
            + ")"
        )
        search = f"search {index_list} {tag_query} AND tag={tag}"
        search += COUNT_BY_SOURCE_TYPE_SEARCH_QUERY_PART

        self.logger.info("Search: %s", search)

        result = splunk_search_util.checkQueryCountIsGreaterThanZero(
            search,
            interval=splunk_search_util.search_interval,
            retries=splunk_search_util.search_retry,
        )

        record_property("search", search)

        if is_tag_enabled:
            assert result, (
                f"\nNo events found for the enabled Tag={tag}."
                f"{format_search_query_log(search)}"
                f"\ninterval={splunk_search_util.search_interval}, retries={splunk_search_util.search_retry}"
            )
        else:
            assert not result, (
                f"\nEvents found for the disabled Tag={tag}."
                f"{format_search_query_log(search)}"
                f"\ninterval={splunk_search_util.search_interval}, retries={splunk_search_util.search_retry}"
            )

    @pytest.mark.splunk_searchtime_fields
    @pytest.mark.splunk_requirements
    @pytest.mark.splunk_searchtime_fields_datamodels
    def test_datamodels(
        self,
        splunk_search_util,
        splunk_ingest_data,
        splunk_setup,
        splunk_searchtime_fields_datamodels,
        record_property,
        caplog,
    ):
        """
        Test case to check if correct datamodels are assigned to the event.

        This test case checks if tags assigned to the event match assigned datamodel
        and also checks if there is no additional wrongly assigned datamodel.

        Args:
            splunk_search_util (splunksplwrapper.SearchUtil.SearchUtil):
                object that helps to search on Splunk.
            splunk_ingest_data (fixture): Ensure data was ingested before running test
            splunk_setup (fixture): Ensure that test environment was set up before running test
            splunk_searchtime_fields_datamodels (fixture): pytest parameters to test.
            record_property (fixture): Document facts of test cases to provide more info in the test failure reports.
            caplog (fixture): fixture to capture logs.
        """
        escaped_event = splunk_searchtime_fields_datamodels["stanza"]
        datamodels = splunk_searchtime_fields_datamodels["datamodels"]
        self.logger.info(
            "Testing for datamodels %s with event %s", datamodels, escaped_event
        )

        record_property("Event_with", escaped_event)
        record_property("datamodels", datamodels)

        index_list = (
            "(index="
            + " OR index=".join(splunk_search_util.search_index.split(","))
            + ")"
        )
        search = f"search {index_list} {escaped_event} | fields *"

        self.logger.info("Search: %s", search)

        fields_from_splunk = splunk_search_util.getFieldValuesDict(
            search,
            interval=splunk_search_util.search_interval,
            retries=splunk_search_util.search_retry,
        )

        assert fields_from_splunk, f"Event was not returned with search: {search}"

        # The "tag" value is parsed as the textual form of a list, e.g.
        # "['a', 'b']": strip the brackets, split on ", ", drop the quotes.
        raw_tags = fields_from_splunk.get("tag", "")
        extracted_tags = [
            tag.replace("'", "") for tag in raw_tags.strip("][").split(", ")
        ]
        # An absent or empty tag value would otherwise be parsed as [''].
        extracted_tags = [tag for tag in extracted_tags if tag]
        # Every tag required by any of the expected datamodels (reporting only).
        dm_tags = list(
            chain.from_iterable(
                [tags for dm, tags in dict_datamodel_tag.items() if dm in datamodels]
            )
        )
        self.logger.info("Tags extracted from Splunk %s", extracted_tags)
        self.logger.info("Tags assigned to datamodels %s", dm_tags)

        # Datamodels all of whose tags are present on the event.
        matched_datamodels = {
            dm: tags
            for dm, tags in dict_datamodel_tag.items()
            if all(tag in extracted_tags for tag in tags)
        }
        # Drop a matched datamodel when its tags are a subset of the tags of a
        # different matched datamodel, keeping only the most specific matches.
        assigned_datamodels = {
            dm: tags
            for dm, tags in matched_datamodels.items()
            if not any(
                set(tags).issubset(set(matched_tags)) and dm != matched_datamodel
                for matched_datamodel, matched_tags in matched_datamodels.items()
            )
        }

        record_property("search", search)

        missing_datamodels = [dm for dm in datamodels if dm not in assigned_datamodels]
        wrong_datamodels = [dm for dm in assigned_datamodels if dm not in datamodels]

        exc_message = get_table_output(
            headers=[
                "Expected datamodel",
                "Expected tags",
                "Found datamodel",
                "Found tags",
            ],
            value_list=[
                [
                    ",".join(datamodels),
                    ",".join(dm_tags),
                    ",".join(assigned_datamodels.keys()),
                    ",".join(extracted_tags),
                ]
            ],
        )

        assert (
            missing_datamodels == [] and wrong_datamodels == []
        ), f"Incorrect datamodels found:\n{exc_message}"

    @pytest.mark.splunk_searchtime_fields
    @pytest.mark.splunk_searchtime_fields_eventtypes
    def test_eventtype(
        self,
        splunk_search_util,
        splunk_ingest_data,
        splunk_setup,
        splunk_searchtime_fields_eventtypes,
        record_property,
        caplog,
    ):
        """
        Tests if all eventtypes in eventtypes.conf are generated in Splunk.

        Args:
            splunk_search_util (splunksplwrapper.SearchUtil.SearchUtil):
                object that helps to search on Splunk.
            splunk_ingest_data (fixture): Ensure data was ingested before running test
            splunk_setup (fixture): Ensure that test environment was set up before running test
            splunk_searchtime_fields_eventtypes (fixture): Fixture containing list of eventtypes
            record_property (fixture): Document facts of test cases to provide more info in the test failure reports.
            caplog (fixture): Access and control log capturing

        Returns:
            Asserts whether test case passes or fails.
        """
        record_property("eventtype", splunk_searchtime_fields_eventtypes["stanza"])
        index_list = (
            "(index="
            + " OR index=".join(splunk_search_util.search_index.split(","))
            + ")"
        )
        search = (
            f"search {index_list} AND "
            f"eventtype="
            f"\"{splunk_searchtime_fields_eventtypes['stanza']}\""
        )
        search += COUNT_BY_SOURCE_TYPE_SEARCH_QUERY_PART

        self.logger.info(
            "Testing eventtype =%s", splunk_searchtime_fields_eventtypes["stanza"]
        )

        self.logger.info("Search query for testing =%s", search)

        # run search
        result = splunk_search_util.checkQueryCountIsGreaterThanZero(
            search,
            interval=splunk_search_util.search_interval,
            retries=splunk_search_util.search_retry,
        )
        record_property("search", search)
        assert result, (
            f"\nNo result found for the search."
            f"{format_search_query_log(search)}"
            f"\ninterval={splunk_search_util.search_interval}, retries={splunk_search_util.search_retry}"
        )

    @pytest.mark.splunk_searchtime_fields
    @pytest.mark.splunk_searchtime_fields_savedsearches
    def test_savedsearches(
        self,
        splunk_search_util,
        splunk_ingest_data,
        splunk_setup,
        splunk_searchtime_fields_savedsearches,
        record_property,
        caplog,
    ):
        """
        Tests if all savedsearches in savedsearches.conf are being executed properly to generate proper results.

        When neither of the first two pipe-separated segments of the saved
        search mentions ``savedsearch``, the dispatch time range is appended to
        the first segment and the query is prefixed with ``search``; otherwise
        the query is run unchanged.

        Args:
            splunk_search_util (splunksplwrapper.SearchUtil.SearchUtil):
                object that helps to search on Splunk.
            splunk_ingest_data (fixture): Ensure data was ingested before running test
            splunk_setup (fixture): Ensure that test environment was set up before running test
            splunk_searchtime_fields_savedsearches (fixture): Fixture containing list of savedsearches
            record_property (fixture): Document facts of test cases to provide more info in the test failure reports.
            caplog (fixture): Access and control log capturing

        Returns:
            Asserts whether test case passes or fails.
        """
        search_query = splunk_searchtime_fields_savedsearches["search"]
        earliest_time = splunk_searchtime_fields_savedsearches["dispatch.earliest_time"]
        latest_time = splunk_searchtime_fields_savedsearches["dispatch.latest_time"]

        temp_search_query = search_query.split("|")
        if "savedsearch" not in temp_search_query[0] and (
            len(temp_search_query) < 2 or "savedsearch" not in temp_search_query[1]
        ):
            temp_search_query[
                0
            ] += f" earliest_time = {earliest_time} latest_time = {latest_time} "
            search_query = "|".join(temp_search_query)
            search = f"search {search_query}"
        else:
            search = "|".join(temp_search_query)

        self.logger.info("Search: %s", search)

        result = splunk_search_util.checkQueryCountIsGreaterThanZero(
            search,
            interval=splunk_search_util.search_interval,
            retries=splunk_search_util.search_retry,
        )

        record_property("search", search)
        assert result, (
            f"\nNo result found for the search."
            f"{format_search_query_log(search)}"
            f"\ninterval={splunk_search_util.search_interval}, retries={splunk_search_util.search_retry}"
        )

test_datamodels(splunk_search_util, splunk_ingest_data, splunk_setup, splunk_searchtime_fields_datamodels, record_property, caplog)

Test case to check if correct datamodels are assigned to the event.

This test case checks if tags assigned to the event match assigned datamodel and also checks if there is no additional wrongly assigned datamodel.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| splunk_search_util | SearchUtil | object that helps to search on Splunk. | required |
| splunk_ingest_data | fixture | Ensure data was ingested before running test | required |
| splunk_setup | fixture | Ensure that test environment was set up before running test | required |
| splunk_searchtime_fields_datamodels | fixture | pytest parameters to test. | required |
| record_property | fixture | Document facts of test cases to provide more info in the test failure reports. | required |
| caplog | fixture | fixture to capture logs. | required |
Source code in pytest_splunk_addon/fields_tests/test_templates.py
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
@pytest.mark.splunk_searchtime_fields
@pytest.mark.splunk_requirements
@pytest.mark.splunk_searchtime_fields_datamodels
def test_datamodels(
    self,
    splunk_search_util,
    splunk_ingest_data,
    splunk_setup,
    splunk_searchtime_fields_datamodels,
    record_property,
    caplog,
):
    """
    Test case to check if correct datamodels are assigned to the event.

    This test case checks if tags assigned to the event match assigned datamodel
    and also checks if there is no additional wrongly assigned datamodel.

    Args:
        splunk_search_util (splunksplwrapper.SearchUtil.SearchUtil):
            object that helps to search on Splunk.
        splunk_ingest_data (fixture): Ensure data was ingested before running test
        splunk_setup (fixture): Ensure that test environment was set up before running test
        splunk_searchtime_fields_datamodels (fixture): pytest parameters to test.
        record_property (fixture): Document facts of test cases to provide more info in the test failure reports.
        caplog (fixture): fixture to capture logs.
    """
    escaped_event = splunk_searchtime_fields_datamodels["stanza"]
    datamodels = splunk_searchtime_fields_datamodels["datamodels"]
    self.logger.info(
        f"Testing for datamodels {datamodels} with event {escaped_event}"
    )

    record_property("Event_with", escaped_event)
    record_property("datamodels", datamodels)

    index_list = (
        "(index="
        + " OR index=".join(splunk_search_util.search_index.split(","))
        + ")"
    )
    search = f"search {index_list} {escaped_event} | fields *"

    self.logger.info(f"Search: {search}")

    fields_from_splunk = splunk_search_util.getFieldValuesDict(
        search,
        interval=splunk_search_util.search_interval,
        retries=splunk_search_util.search_retry,
    )

    assert fields_from_splunk, f"Event was not returned with search: {search}"

    # The "tag" value is parsed as the textual form of a list, e.g.
    # "['a', 'b']": strip the brackets, split on ", ", drop the quotes.
    raw_tags = fields_from_splunk.get("tag", "")
    extracted_tags = [
        tag.replace("'", "") for tag in raw_tags.strip("][").split(", ")
    ]
    # An absent or empty tag value would otherwise be parsed as [''].
    extracted_tags = [tag for tag in extracted_tags if tag]
    # Every tag required by any of the expected datamodels (reporting only).
    dm_tags = list(
        chain.from_iterable(
            [tags for dm, tags in dict_datamodel_tag.items() if dm in datamodels]
        )
    )
    self.logger.info(f"Tags extracted from Splunk {extracted_tags}")
    self.logger.info(f"Tags assigned to datamodels {dm_tags}")

    # Datamodels all of whose tags are present on the event.
    matched_datamodels = {
        dm: tags
        for dm, tags in dict_datamodel_tag.items()
        if all(tag in extracted_tags for tag in tags)
    }
    # Drop a matched datamodel when its tags are a subset of the tags of a
    # different matched datamodel, keeping only the most specific matches.
    assigned_datamodels = {
        dm: tags
        for dm, tags in matched_datamodels.items()
        if not any(
            set(tags).issubset(set(matched_tags)) and dm != matched_datamodel
            for matched_datamodel, matched_tags in matched_datamodels.items()
        )
    }

    record_property("search", search)

    missing_datamodels = [dm for dm in datamodels if dm not in assigned_datamodels]
    wrong_datamodels = [dm for dm in assigned_datamodels if dm not in datamodels]

    exc_message = get_table_output(
        headers=[
            "Expected datamodel",
            "Expected tags",
            "Found datamodel",
            "Found tags",
        ],
        value_list=[
            [
                ",".join(datamodels),
                ",".join(dm_tags),
                ",".join(assigned_datamodels.keys()),
                ",".join(extracted_tags),
            ]
        ],
    )

    assert (
        missing_datamodels == [] and wrong_datamodels == []
    ), f"Incorrect datamodels found:\n{exc_message}"

test_eventtype(splunk_search_util, splunk_ingest_data, splunk_setup, splunk_searchtime_fields_eventtypes, record_property, caplog)

Tests if all eventtypes in eventtypes.conf are generated in Splunk.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| splunk_search_util | SearchUtil | object that helps to search on Splunk. | required |
| splunk_ingest_data | fixture | Ensure data was ingested before running test | required |
| splunk_setup | fixture | Ensure that test environment was set up before running test | required |
| splunk_searchtime_fields_eventtypes | fixture | Fixture containing list of eventtypes | required |
| record_property | fixture | Document facts of test cases to provide more info in the test failure reports. | required |
| caplog | fixture | Access and control log capturing | required |

Returns:

| Type | Description |
| --- | --- |
| | Asserts whether test case passes or fails. |

Source code in pytest_splunk_addon/fields_tests/test_templates.py
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
@pytest.mark.splunk_searchtime_fields
@pytest.mark.splunk_searchtime_fields_eventtypes
def test_eventtype(
    self,
    splunk_search_util,
    splunk_ingest_data,
    splunk_setup,
    splunk_searchtime_fields_eventtypes,
    record_property,
    caplog,
):
    """
    Verify that an eventtype from eventtypes.conf matches at least one event.

    The search restricts the configured indexes to events carrying the
    eventtype named by the ``stanza`` entry of the parametrized fixture and
    the test passes as soon as the search returns a non-zero count.

    Args:
        splunk_search_util (splunksplwrapper.SearchUtil.SearchUtil):
            object that helps to search on Splunk.
        splunk_ingest_data (fixture): Ensure data was ingested before running test
        splunk_setup (fixture): Ensure that test environment was set up before running test
        splunk_searchtime_fields_eventtypes (fixture): Fixture containing list of eventtypes
        record_property (fixture): Document facts of test cases to provide more info in the test failure reports.
        caplog (fixture): Access and control log capturing

    Returns:
        Asserts whether test case passes or fails.
    """
    eventtype_name = splunk_searchtime_fields_eventtypes["stanza"]
    record_property("eventtype", eventtype_name)

    # search_index is a comma-separated list; OR the individual indexes together.
    index_names = splunk_search_util.search_index.split(",")
    index_clause = "(index=" + " OR index=".join(index_names) + ")"

    query = f'search {index_clause} AND eventtype="{eventtype_name}"'
    query = query + COUNT_BY_SOURCE_TYPE_SEARCH_QUERY_PART

    self.logger.info("Testing eventtype =%s", eventtype_name)
    self.logger.info("Search query for testing =%s", query)

    # Poll Splunk using the interval/retry budget configured on the search util.
    found = splunk_search_util.checkQueryCountIsGreaterThanZero(
        query,
        interval=splunk_search_util.search_interval,
        retries=splunk_search_util.search_retry,
    )
    record_property("search", query)

    assert found, (
        f"\nNo result found for the search."
        f"{format_search_query_log(query)}"
        f"\ninterval={splunk_search_util.search_interval}, retries={splunk_search_util.search_retry}"
    )

test_props_fields(splunk_search_util, splunk_ingest_data, splunk_setup, splunk_searchtime_fields_positive, record_property)

This test case checks that a field value has the expected values.

Parameters:

Name Type Description Default
splunk_search_util SearchUtil

Object that helps to search on Splunk.

required
splunk_ingest_data fixture

Ensure data was ingested before running test

required
splunk_setup fixture

Ensure that test environment was set up before running test

required
splunk_searchtime_fields_positive fixture

fields data of the event to be tested

required
record_property fixture

Document facts of test cases to provide more info in the test failure reports.

required
Source code in pytest_splunk_addon/fields_tests/test_templates.py
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
@pytest.mark.splunk_searchtime_fields
@pytest.mark.splunk_searchtime_fields_positive
def test_props_fields(
    self,
    splunk_search_util,
    splunk_ingest_data,
    splunk_setup,
    splunk_searchtime_fields_positive,
    record_property,
):
    """
    Check that events of a stanza carry the expected field values.

    The search selects events of the given stanza and, for every field under
    test, requires the field to be IN its expected values and NOT IN its
    negative values. The test passes when at least one event matches.

    Args:
        splunk_search_util (SearchUtil): Object that helps to search on Splunk.
        splunk_ingest_data (fixture): Ensure data was ingested before running test
        splunk_setup (fixture): Ensure that test environment was set up before running test
        splunk_searchtime_fields_positive (fixture): fields data of the event to be tested
            (keys read here: ``stanza``, ``stanza_type``, ``fields``).
        record_property (fixture): Document facts of test cases to provide more info in the test failure reports.
    """
    stanza_name = splunk_searchtime_fields_positive["stanza"]
    stanza_type = splunk_searchtime_fields_positive["stanza_type"]
    field_dicts = splunk_searchtime_fields_positive["fields"]

    record_property("stanza_name", stanza_name)
    record_property("stanza_type", stanza_type)
    record_property("fields", field_dicts)

    # search_index is a comma-separated list; build "(index=a OR index=b)".
    index_terms = [
        f"index={each_index}"
        for each_index in splunk_search_util.search_index.split(",")
    ]
    index_list = "(" + " OR ".join(index_terms) + ")"

    query_parts = [f'search {index_list} {stanza_type}="{stanza_name}"']
    for field_dict in field_dicts:
        field = Field(field_dict)
        accepted = ", ".join(f'"{value}"' for value in field.expected_values)
        rejected = ", ".join(f'"{value}"' for value in field.negative_values)
        # One AND-ed condition per field: must match an expected value and
        # must not match any negative value.
        query_parts.append(
            f" AND ({field} IN ({accepted}) AND NOT {field} IN ({rejected}))"
        )
    query_parts.append(COUNT_BY_SOURCE_TYPE_SEARCH_QUERY_PART)
    search = "".join(query_parts)

    self.logger.info(f"Executing the search query: {search}")

    # Execute the search using the interval/retry settings of the search util.
    has_events = splunk_search_util.checkQueryCountIsGreaterThanZero(
        search,
        interval=splunk_search_util.search_interval,
        retries=splunk_search_util.search_retry,
    )
    record_property("search", search)

    # The message is only built when the assertion fails.
    assert has_events, (
        f"\nNo result found for the search."
        f"{format_search_query_log(search)}"
        f"\ninterval={splunk_search_util.search_interval}, retries={splunk_search_util.search_retry}"
    )

test_props_fields_no_dash_not_empty(splunk_search_util, splunk_ingest_data, splunk_setup, splunk_searchtime_fields_negative, record_property)

This test case checks negative scenario for the field value.

Parameters:

Name Type Description Default
splunk_search_util SearchUtil

Object that helps to search on Splunk.

required
splunk_ingest_data fixture

Ensure data was ingested before running test

required
splunk_setup fixture

Ensure that test environment was set up before running test

required
splunk_searchtime_fields_negative fixture

fields data of the event to be tested

required
record_property fixture

Document facts of test cases to provide more info in the test failure reports.

required
Source code in pytest_splunk_addon/fields_tests/test_templates.py
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
@pytest.mark.splunk_searchtime_fields
@pytest.mark.splunk_searchtime_fields_negative
def test_props_fields_no_dash_not_empty(
    self,
    splunk_search_util,
    splunk_ingest_data,
    splunk_setup,
    splunk_searchtime_fields_negative,
    record_property,
):
    """
    Check that no event of a stanza has a field set to one of its negative values.

    The search selects events of the given stanza where any of the fields
    under test is IN its negative values. The test passes when the search
    returns zero events; on failure the first results and a sample of
    structurally unique events are added to the assertion message.

    Args:
        splunk_search_util (SearchUtil): Object that helps to search on Splunk.
        splunk_ingest_data (fixture): Ensure data was ingested before running test
        splunk_setup (fixture): Ensure that test environment was set up before running test
        splunk_searchtime_fields_negative (fixture): fields data of the event to be tested
            (keys read here: ``stanza``, ``stanza_type``, ``fields``).
        record_property (fixture): Document facts of test cases to provide more info in the test failure reports.
    """
    stanza_name = splunk_searchtime_fields_negative["stanza"]
    stanza_type = splunk_searchtime_fields_negative["stanza_type"]
    field_dicts = splunk_searchtime_fields_negative["fields"]

    record_property("stanza_name", stanza_name)
    record_property("stanza_type", stanza_type)
    record_property("fields", field_dicts)

    # search_index is a comma-separated list; build "(index=a OR index=b)".
    index_terms = [
        f"index={each_index}"
        for each_index in splunk_search_util.search_index.split(",")
    ]
    index_list = "(" + " OR ".join(index_terms) + ")"

    def _negative_clause(field):
        # "(<field> IN ("v1", "v2"))" built from the field's negative values.
        rejected = ", ".join(f'"{value}"' for value in field.negative_values)
        return f"({field} IN ({rejected}))"

    field_clauses = [_negative_clause(Field(each)) for each in field_dicts]
    base_search = (
        f'search {index_list} {stanza_type}="{stanza_name}"'
        f" AND ({' OR '.join(field_clauses)})"
    )
    search = base_search + COUNT_BY_SOURCE_TYPE_SEARCH_QUERY_PART

    self.logger.info(f"Executing the search query: {search}")

    # Run the search; a falsy first element means events were found.
    result, results = splunk_search_util.checkQueryCountIsZero(search)
    record_property("search", search)
    if not result:
        # Collect diagnostics only on failure: the first ten result rows plus
        # a second query built from the same base search.
        record_property("results", results.as_list)
        result_str = pprint.pformat(results.as_list[:10], indent=4)

        query_for_unique_events = (
            base_search + TOP_FIVE_STRUCTURALLY_UNIQUE_EVENTS_QUERY_PART
        )
        unique_events = splunk_search_util.get_search_results(
            query_for_unique_events
        )
        results_formatted_str = pprint.pformat(unique_events.as_list, indent=4)

        assert result, (
            f"\nQuery result greater than 0."
            f"{format_search_query_log(search)}"
            f"\nfound result={result_str}\n"
            " === STRUCTURALLY UNIQUE EVENTS:\n"
            f"query={query_for_unique_events}\n"
            f"events= {results_formatted_str}"
        )

test_requirements_fields(splunk_search_util, splunk_ingest_data, splunk_setup, splunk_searchtime_fields_requirements, record_property)

This test case checks that a field value has the expected values.

Parameters:

Name Type Description Default
splunk_search_util SearchUtil

Object that helps to search on Splunk.

required
splunk_ingest_data fixture

Ensure data was ingested before running test

required
splunk_setup fixture

Ensure that test environment was set up before running test

required
splunk_searchtime_fields_requirements fixture

fields data of the event to be tested

required
record_property fixture

Document facts of test cases to provide more info in the test failure reports.

required
Source code in pytest_splunk_addon/fields_tests/test_templates.py
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
@pytest.mark.splunk_searchtime_fields
@pytest.mark.splunk_requirements
@pytest.mark.splunk_searchtime_fields_requirements
def test_requirements_fields(
    self,
    splunk_search_util,
    splunk_ingest_data,
    splunk_setup,
    splunk_searchtime_fields_requirements,
    record_property,
):
    """
    Check that the fields extracted for a sample event have the required values.

    The event is searched by its escaped raw text (plus any non-None
    ``modinput_params`` as ``key=value`` filters) and the field values
    returned by Splunk are compared with the expected ones. A field that is
    absent from the Splunk result is reported as a wrong value whose Splunk
    value is ``None``.

    Args:
        splunk_search_util (SearchUtil): Object that helps to search on Splunk.
        splunk_ingest_data (fixture): Ensure data was ingested before running test
        splunk_setup (fixture): Ensure that test environment was set up before running test
        splunk_searchtime_fields_requirements (fixture): fields data of the event to be tested
            (keys read here: ``escaped_event``, ``fields``, ``modinput_params``).
        record_property (fixture): Document facts of test cases to provide more info in the test failure reports.
    """
    escaped_event = splunk_searchtime_fields_requirements["escaped_event"]
    fields = splunk_searchtime_fields_requirements["fields"]
    modinput_params = splunk_searchtime_fields_requirements["modinput_params"]

    record_property("stanza_name", escaped_event)
    record_property("fields", fields)
    record_property("modinput_params", modinput_params)

    # search_index is a comma-separated list; build "(index=a OR index=b)".
    index_list = (
        "(index="
        + " OR index=".join(splunk_search_util.search_index.split(","))
        + ")"
    )

    # Only parameters that actually have a value become search filters.
    basic_search = "".join(
        f" {param}={param_value}"
        for param, param_value in modinput_params.items()
        if param_value is not None
    )

    search = f"search {index_list} {basic_search} {escaped_event} | fields *"
    # Recorded like in the sibling tests so the query shows up in the report.
    record_property("search", search)

    self.logger.info(f"Executing the search query: {search}")

    fields_from_splunk = splunk_search_util.getFieldValuesDict(
        search,
        interval=splunk_search_util.search_interval,
        retries=splunk_search_util.search_retry,
    )

    assert fields_from_splunk, f"Event was not returned with search: {search}"

    # Maps each mismatching field to the value Splunk returned (None when the
    # field is missing from the result).
    wrong_value_fields = {
        field: fields_from_splunk.get(field)
        for field, expected_value in fields.items()
        if expected_value != fields_from_splunk.get(field)
    }

    wrong_values_table = get_table_output(
        headers=["Field", "Splunk value", "Expected value"],
        value_list=[
            [
                str(field),
                str(splunk_value),
                str(fields[field]),
            ]
            for field, splunk_value in wrong_value_fields.items()
        ],
    )

    if wrong_value_fields:
        self.logger.error("Wrong field values:\n" + wrong_values_table)

    assert wrong_value_fields == {}, (
        f"\nNot all required fields have correct values or some fields are missing in Splunk. Wrong field values:\n{wrong_values_table}"
        f"{format_search_query_log(search)}"
    )

test_savedsearches(splunk_search_util, splunk_ingest_data, splunk_setup, splunk_searchtime_fields_savedsearches, record_property, caplog)

Tests if all savedsearches in savedsearches.conf are being executed properly to generate proper results.

Parameters:

Name Type Description Default
splunk_search_util SearchUtil

object that helps to search on Splunk.

required
splunk_ingest_data fixture

Ensure data was ingested before running test

required
splunk_setup fixture

Ensure that test environment was set up before running test

required
splunk_searchtime_fields_savedsearches fixture

Fixture containing list of savedsearches

required
record_property fixture

Document facts of test cases to provide more info in the test failure reports.

required
caplog fixture

Access and control log capturing

required

Returns:

Type Description

Asserts whether test case passes or fails.

Source code in pytest_splunk_addon/fields_tests/test_templates.py
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
@pytest.mark.splunk_searchtime_fields
@pytest.mark.splunk_searchtime_fields_savedsearches
def test_savedsearches(
    self,
    splunk_search_util,
    splunk_ingest_data,
    splunk_setup,
    splunk_searchtime_fields_savedsearches,
    record_property,
    caplog,
):
    """
    Check that a saved search from savedsearches.conf returns results.

    When neither of the first two pipe-separated segments of the query
    mentions ``savedsearch``, the dispatch time range is appended to the
    first segment and the query is prefixed with ``search``; otherwise the
    query is executed unchanged.

    Args:
        splunk_search_util (splunksplwrapper.SearchUtil.SearchUtil):
            object that helps to search on Splunk.
        splunk_ingest_data (fixture): Ensure data was ingested before running test
        splunk_setup (fixture): Ensure that test environment was set up before running test
        splunk_searchtime_fields_savedsearches (fixture): Test parameters; keys read
            here: ``search``, ``dispatch.earliest_time``, ``dispatch.latest_time``.
        record_property (fixture): Document facts of test cases to provide more info in the test failure reports.
        caplog (fixture): Access and control log capturing

    Returns:
        Asserts whether test case passes or fails.
    """
    search_query = splunk_searchtime_fields_savedsearches["search"]
    earliest_time = splunk_searchtime_fields_savedsearches["dispatch.earliest_time"]
    latest_time = splunk_searchtime_fields_savedsearches["dispatch.latest_time"]

    segments = search_query.split("|")
    # Only the first two segments are inspected for a "savedsearch" mention.
    mentions_savedsearch = any(
        "savedsearch" in segment for segment in segments[:2]
    )

    if mentions_savedsearch:
        search = "|".join(segments)
    else:
        segments[0] += (
            f" earliest_time = {earliest_time} latest_time = {latest_time} "
        )
        search = "search " + "|".join(segments)

    self.logger.info(f"Search: {search}")

    has_results = splunk_search_util.checkQueryCountIsGreaterThanZero(
        search,
        interval=splunk_search_util.search_interval,
        retries=splunk_search_util.search_retry,
    )

    record_property("search", search)
    # The message is only built when the assertion fails.
    assert has_results, (
        f"\nNo result found for the search."
        f"{format_search_query_log(search)}"
        f"\ninterval={splunk_search_util.search_interval}, retries={splunk_search_util.search_retry}"
    )

test_splunk_internal_errors(splunk_search_util, ignore_internal_errors, record_property, caplog)

This test case checks that there are not any unexpected internal errors

Parameters:

Name Type Description Default
splunk_search_util SearchUtil

Object that helps to search on Splunk.

required
ignore_internal_errors fixture

common list of errors to be ignored

required
record_property fixture

Document facts of test cases to provide more info in the test failure reports.

required
caplog fixture

fixture to capture logs.

required
Source code in pytest_splunk_addon/fields_tests/test_templates.py
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
@pytest.mark.splunk_searchtime_fields
@pytest.mark.splunk_searchtime_internal_errors
def test_splunk_internal_errors(
    self, splunk_search_util, ignore_internal_errors, record_property, caplog
):
    """
    Check that the ``_internal`` index holds no unexpected ERROR events.

    Searches ``index=_internal log_level=ERROR`` excluding a fixed set of
    sourcetypes and every entry of ``ignore_internal_errors``; the test
    passes when the search returns zero events.

    Args:
        splunk_search_util (SearchUtil): Object that helps to search on Splunk.
        ignore_internal_errors (fixture): common list of errors to be ignored
        record_property (fixture): Document facts of test cases to provide more info in the test failure reports.
        caplog (fixture): fixture to capture logs.
    """
    base_query = """
        search index=_internal log_level=ERROR
        sourcetype!=splunkd_ui_access
        AND sourcetype!=splunk_web_access
        AND sourcetype!=splunk_web_service
        AND sourcetype!=splunkd_access
        AND sourcetype!=splunkd
    """
    # json.dumps quotes and escapes each ignored message so it can be used
    # as a literal search term.
    exclusions = "".join(
        " NOT " + json.dumps(ignored_error)
        for ignored_error in ignore_internal_errors
    )
    search = base_query + exclusions + " | table _raw"
    record_property("search", search)

    result, results = splunk_search_util.checkQueryCountIsZero(search)
    if not result:
        # Attach the offending events to the report and show the first ten.
        record_property("results", results.as_list)
        result_str = pprint.pformat(results.as_list[:10], indent=4)
        assert result, (
            f"\nQuery result greater than 0."
            f"{format_search_query_log(search)}"
            f"\nfound result={result_str}"
        )

test_tags(splunk_search_util, splunk_ingest_data, splunk_setup, splunk_searchtime_fields_tags, record_property, caplog)

Test case to check tags mentioned in tags.conf

This test case checks if a tag is assigned to the event if enabled, and also checks that a tag is not assigned to the event if disabled.

Parameters:

Name Type Description Default
splunk_search_util SearchUtil

object that helps to search on Splunk.

required
splunk_ingest_data fixture

Ensure data was ingested before running test

required
splunk_setup fixture

Ensure that test environment was set up before running test

required
splunk_searchtime_fields_tags fixture

pytest parameters to test.

required
record_property fixture

Document facts of test cases to provide more info in the test failure reports.

required
caplog fixture

fixture to capture logs.

required
Source code in pytest_splunk_addon/fields_tests/test_templates.py
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
@pytest.mark.splunk_searchtime_fields
@pytest.mark.splunk_searchtime_fields_tags
def test_tags(
    self,
    splunk_search_util,
    splunk_ingest_data,
    splunk_setup,
    splunk_searchtime_fields_tags,
    record_property,
    caplog,
):
    """
    Test case to check tags mentioned in tags.conf

    For an enabled tag the search for events carrying the tag must return
    results; for a disabled tag the same search must return none.

    Args:
        splunk_search_util (splunksplwrapper.SearchUtil.SearchUtil): object that helps to search on Splunk.
        splunk_ingest_data (fixture): Ensure data was ingested before running test
        splunk_setup (fixture): Ensure that test environment was set up before running test
        splunk_searchtime_fields_tags (fixture): pytest parameters to test
            (keys read here: ``stanza``, ``tag`` and optional ``enabled``,
            which defaults to True).
        record_property (fixture): Document facts of test cases to provide more info in the test failure reports.
        caplog (fixture): fixture to capture logs.
    """
    params = splunk_searchtime_fields_tags
    is_tag_enabled = params.get("enabled", True)
    tag_query = params["stanza"]
    tag = params["tag"]
    self.logger.info(f"Testing for tag {tag} with tag_query {tag_query}")

    for property_name, property_value in (
        ("Event_with", tag_query),
        ("tag", tag),
        ("is_tag_enabled", is_tag_enabled),
    ):
        record_property(property_name, property_value)

    # search_index is a comma-separated list; build "(index=a OR index=b)".
    index_terms = [
        f"index={each_index}"
        for each_index in splunk_search_util.search_index.split(",")
    ]
    index_list = "(" + " OR ".join(index_terms) + ")"

    search = (
        f"search {index_list} {tag_query} AND tag={tag}"
        + COUNT_BY_SOURCE_TYPE_SEARCH_QUERY_PART
    )

    self.logger.info(f"Search: {search}")

    has_events = splunk_search_util.checkQueryCountIsGreaterThanZero(
        search,
        interval=splunk_search_util.search_interval,
        retries=splunk_search_util.search_retry,
    )

    record_property("search", search)

    if not is_tag_enabled:
        # A disabled tag must not be attached to any event.
        assert not has_events, (
            f"\nEvents found for the disabled Tag={tag}."
            f"{format_search_query_log(search)}"
            f"\ninterval={splunk_search_util.search_interval}, retries={splunk_search_util.search_retry}"
        )
    else:
        # An enabled tag must be attached to at least one event.
        assert has_events, (
            f"\nNo events found for the enabled Tag={tag}."
            f"{format_search_query_log(search)}"
            f"\ninterval={splunk_search_util.search_interval}, retries={splunk_search_util.search_retry}"
        )

TestGenerator

Module that includes the class used to generate the test cases for the knowledge objects of an Add-on.

FieldTestGenerator

Bases: object

Generates test cases to test the knowledge objects of an Add-on.

  • Provides the pytest parameters to the test templates.
  • Supports field_bank: List of fields with patterns and expected values which should be tested for the Add-on.

Parameters:

Name Type Description Default
app_path str

Path of the app package

required
tokenized_events list

list of tokenized events

required
field_bank str

Path of the fields JSON file

None
Source code in pytest_splunk_addon/fields_tests/test_generator.py
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
class FieldTestGenerator(object):
    """
    Generates test cases to test the knowledge objects of an Add-on.

    * Provides the pytest parameters to the test templates.
    * Supports field_bank: List of fields with patterns and expected
      values which should be tested for the Add-on.

    Args:
        app_path (str): Path of the app package
        tokenized_events (list): list of tokenized events
        field_bank (str): Path of the fields JSON file
    """

    def __init__(self, app_path, tokenized_events, field_bank=None):
        LOGGER.debug("initializing AddonParser to parse the app")
        self.app_path = app_path
        self.addon_parser = AddonParser(self.app_path)
        self.tokenized_events = tokenized_events
        self.field_bank = field_bank

    def generate_tests(self, fixture):
        """
        Generate the test cases based on the fixture provided.

        The generator is selected by the suffix of the fixture name.
        Supported fixtures:

            * splunk_searchtime_fields_positive
            * splunk_searchtime_fields_negative
            * splunk_searchtime_fields_tags
            * splunk_searchtime_fields_eventtypes
            * splunk_searchtime_fields_savedsearches
            * splunk_searchtime_fields_requirements
            * any fixture whose name ends with ``datamodels``

        A fixture name matching none of the suffixes yields nothing.

        Args:
            fixture (str): fixture name

        Yields:
            pytest.params for the test templates
        """
        if fixture.endswith("positive"):
            yield from self.generate_field_tests(is_positive=True)
        elif fixture.endswith("negative"):
            yield from self.generate_field_tests(is_positive=False)
        elif fixture.endswith("tags"):
            yield from self.generate_tag_tests()
        elif fixture.endswith("eventtypes"):
            yield from self.generate_eventtype_tests()
        elif fixture.endswith("savedsearches"):
            yield from self.generate_savedsearches_tests()
        elif fixture.endswith("requirements"):
            yield from self.generate_requirements_tests()
        elif fixture.endswith("datamodels"):
            yield from self.generate_requirements_datamodels_tests()

    def generate_field_tests(self, is_positive):
        """
        Generate test case for fields

        For every group of fields (field bank entries first, then the fields
        parsed from props) this yields, in order: a stanza-only case
        (positive tests only), one case with all fields of the group when the
        classname starts with EXTRACT, REPORT or LOOKUP, and one case per
        individual field.

        Args:
            is_positive (bool): Test type to generate

        Yields:
            pytest.params for the test templates
        """
        LOGGER.info("generating field tests")
        field_itr = chain(
            FieldBank.init_field_bank_tests(self.field_bank),
            self.addon_parser.get_props_fields(),
        )
        for fields_group in field_itr:
            # Generate test case for the stanza
            # Do not generate if it is a negative test case
            if is_positive:
                stanza_test_group = fields_group.copy()
                stanza_test_group["fields"] = []
                yield pytest.param(
                    stanza_test_group, id="{stanza}".format(**fields_group)
                )

            # Generate a test case for all the fields in the classname
            if self._contains_classname(fields_group, ["EXTRACT", "REPORT", "LOOKUP"]):
                # ACD-4136: Convert the Field objects to dictionary to resolve the shared
                # memory issue with pytest-xdist parallel execution
                test_group = fields_group.copy()
                test_group["fields"] = [each.__dict__ for each in test_group["fields"]]
                yield pytest.param(
                    test_group, id="{stanza}::{classname}".format(**test_group)
                )

            # For each field mentioned in field_bank, a separate
            # test should be generated.
            # Counter to make the test_id unique
            field_bank_id = 0

            # Generate test-cases for each field in classname one by one
            for each_field in fields_group["fields"]:
                # Create a dictionary for a single field with classname and stanza
                # ACD-4136: Convert the Field object to dictionary to resolve the shared
                # memory issue with pytest-xdist parallel execution
                one_field_group = fields_group.copy()
                one_field_group["fields"] = [each_field.__dict__]
                if fields_group["classname"] != "field_bank":
                    test_type = "field"
                else:
                    field_bank_id += 1
                    test_type = f"field_bank_{field_bank_id}"

                stanza = fields_group["stanza"]
                yield pytest.param(
                    one_field_group, id=f"{stanza}::{test_type}::{each_field}"
                )

    def generate_tag_tests(self):
        """
        Generate test case for tags

        Yields:
            pytest.params for the test templates
        """
        for each_tag_group in self.addon_parser.get_tags():
            yield pytest.param(
                each_tag_group, id="{stanza}::tag::{tag}".format(**each_tag_group)
            )

    def generate_requirements_datamodels_tests(self):
        """
        Generate test case for datamodels

        Events without requirement data, or whose requirement data only
        contains ``other_fields``, are skipped, as are syslog events whose
        header cannot be stripped. Spaces and colons in datamodel names are
        replaced with underscores.

        Yields:
            pytest.params for the test templates
        """
        for event in self.tokenized_events:
            if (
                not event.requirement_test_data
                or event.requirement_test_data.keys() == {"other_fields"}
            ):
                continue
            if event.metadata.get("input_type", "").startswith("syslog"):
                stripped_event = xml_event_parser.strip_syslog_header(event.event)
                if stripped_event is None:
                    LOGGER.error(
                        "Syslog event do not match CEF, RFC_3164, RFC_5424 format"
                    )
                    continue
            else:
                stripped_event = event.event

            escaped_event = xml_event_parser.escape_char_event(stripped_event)
            datamodels = event.requirement_test_data.get("datamodels")
            if not datamodels:
                datamodels = []
            elif isinstance(datamodels, dict):
                # isinstance (rather than an exact type check) also accepts
                # dict/list subclasses such as OrderedDict, which would
                # otherwise fall through and be iterated by key.
                models = datamodels["model"]
                datamodels = models if isinstance(models, list) else [models]
            datamodels = [
                datamodel.replace(" ", "_").replace(":", "_")
                for datamodel in datamodels
            ]
            yield pytest.param(
                {
                    "datamodels": datamodels,
                    "stanza": escaped_event,
                },
                id=f"{'-'.join(datamodels)}::sample_name::{event.sample_name}::host::{event.metadata.get('host')}",
            )

    def generate_eventtype_tests(self):
        """
        Generate test case for eventtypes

        Yields:
            pytest.params for the test templates

        """
        for each_eventtype in self.addon_parser.get_eventtypes():
            yield pytest.param(
                each_eventtype, id="eventtype::{stanza}".format(**each_eventtype)
            )

    def generate_savedsearches_tests(self):
        """
        Generate test case for savedsearches

        Yields:
            pytest.params for the test templates
        """
        for each_savedsearch in self.addon_parser.get_savedsearches():
            yield pytest.param(
                each_savedsearch, id="{stanza}".format(**each_savedsearch)
            )

    def generate_requirements_tests(self):
        """
        Generate test cases for fields defined for datamodel
        This function generates tests previously covered by requirement tests

        Events without requirement data and syslog events whose header cannot
        be stripped are skipped. The expected fields are the union of
        ``cim_fields`` and ``other_fields`` minus the fields listed under
        ``exceptions``; events with no expected fields before that filtering
        yield nothing.

        Yields:
            pytest.params for the test templates
        """
        for event in self.tokenized_events:
            if not event.requirement_test_data:
                continue
            if event.metadata.get("input_type", "").startswith("syslog"):
                stripped_event = xml_event_parser.strip_syslog_header(event.event)
                if stripped_event is None:
                    LOGGER.error(
                        "Syslog event do not match CEF, RFC_3164, RFC_5424 format"
                    )
                    continue
            else:
                stripped_event = event.event

            escaped_event = xml_event_parser.escape_char_event(stripped_event)
            exceptions = event.requirement_test_data.get("exceptions", {})
            metadata = event.metadata
            modinput_params = {
                "sourcetype": metadata.get("sourcetype_to_search"),
            }

            cim_fields = event.requirement_test_data.get("cim_fields", {})
            other_fields = event.requirement_test_data.get("other_fields", {})
            # other_fields win when a field is present in both mappings.
            requirement_fields = {**cim_fields, **other_fields}

            if requirement_fields:
                requirement_fields = {
                    field: value
                    for field, value in requirement_fields.items()
                    if field not in exceptions
                }
                yield pytest.param(
                    {
                        "escaped_event": escaped_event,
                        "fields": requirement_fields,
                        "modinput_params": modinput_params,
                    },
                    id=f"sample_name::{event.sample_name}::host::{event.metadata.get('host')}",
                )

    def _contains_classname(self, fields_group, criteria):
        """
        Check if the classname of the field_group dictionary starts with
        any of the prefixes given in criteria

        Args:
            fields_group (dict): group of fields; its ``classname`` is checked
            criteria (list): classname prefixes to look for

        Returns:
            bool: True if the classname starts with at least one prefix
        """
        return any(fields_group["classname"].startswith(each) for each in criteria)

generate_eventtype_tests()

Generate test case for eventtypes

Yields:

Type Description

pytest.params for the test templates

Source code in pytest_splunk_addon/fields_tests/test_generator.py
201
202
203
204
205
206
207
208
209
210
211
212
def generate_eventtype_tests(self):
    """
    Produce one test case for every eventtype reported by the addon parser

    Yields:
        pytest.params for the test templates; each carries the eventtype
        and an id of the form ``eventtype::<stanza>``
    """
    eventtypes = self.addon_parser.get_eventtypes()
    for eventtype in eventtypes:
        test_id = "eventtype::{stanza}".format(**eventtype)
        yield pytest.param(eventtype, id=test_id)

generate_field_tests(is_positive)

Generate test case for fields

Parameters:

Name Type Description Default
is_positive bool

Test type to generate

required

Yields:

Type Description

pytest.params for the test templates

Source code in pytest_splunk_addon/fields_tests/test_generator.py
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
def generate_field_tests(self, is_positive):
    """
    Generate test case for fields

    Field groups are read first from the field bank and then from the
    props fields of the add-on. For every group this yields, in order:
    a stanza-level case (positive runs only), a case covering all the
    fields of the classname (EXTRACT, REPORT and LOOKUP classnames only)
    and one case per individual field.

    Args:
        is_positive (bool): Test type to generate

    Yields:
        pytest.params for the test templates
    """
    LOGGER.info("generating field tests")
    groups = chain(
        FieldBank.init_field_bank_tests(self.field_bank),
        self.addon_parser.get_props_fields(),
    )
    for group in groups:
        # Stanza-level case: not generated for negative test runs.
        if is_positive:
            stanza_only = group.copy()
            stanza_only["fields"] = []
            stanza_id = "{stanza}".format(**group)
            yield pytest.param(stanza_only, id=stanza_id)

        # Class-level case covering every field of the classname at once.
        if self._contains_classname(group, ["EXTRACT", "REPORT", "LOOKUP"]):
            # ACD-4136: Field objects are converted to dictionaries to resolve
            # the shared memory issue with pytest-xdist parallel execution.
            whole_class = group.copy()
            whole_class["fields"] = [
                field.__dict__ for field in whole_class["fields"]
            ]
            class_id = "{stanza}::{classname}".format(**whole_class)
            yield pytest.param(whole_class, id=class_id)

        # One case per field. For field_bank groups the 1-based position of
        # the field is embedded in the id to keep the test ids unique.
        for position, field in enumerate(group["fields"], start=1):
            # ACD-4136: same dictionary conversion as above, for one field.
            single_field = group.copy()
            single_field["fields"] = [field.__dict__]
            if group["classname"] == "field_bank":
                test_type = f"field_bank_{position}"
            else:
                test_type = "field"

            stanza = group["stanza"]
            yield pytest.param(
                single_field, id=f"{stanza}::{test_type}::{field}"
            )

generate_requirements_datamodels_tests()

Generate test case for datamodels

Yields:

Type Description

pytest.params for the test templates

Source code in pytest_splunk_addon/fields_tests/test_generator.py
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
def generate_requirements_datamodels_tests(self):
    """
    Generate test case for datamodels

    Events without requirement test data, or whose requirement test data
    only holds "other_fields", are skipped. Syslog events whose header
    cannot be stripped are logged and skipped as well.

    Yields:
        pytest.params for the test templates; each carries the list of
        datamodel names (spaces and colons replaced by underscores) and
        the escaped event under the "stanza" key
    """
    for event in self.tokenized_events:
        requirement_data = event.requirement_test_data
        if not requirement_data or requirement_data.keys() == {"other_fields"}:
            continue
        if event.metadata.get("input_type", "").startswith("syslog"):
            stripped_event = xml_event_parser.strip_syslog_header(event.event)
            if stripped_event is None:
                LOGGER.error(
                    "Syslog event do not match CEF, RFC_3164, RFC_5424 format"
                )
                continue
        else:
            stripped_event = event.event

        escaped_event = xml_event_parser.escape_char_event(stripped_event)
        datamodels = requirement_data.get("datamodels") or []
        # isinstance (rather than an exact type comparison) also accepts dict
        # and list subclasses such as OrderedDict; with an exact comparison a
        # subclassed mapping would fall through and its keys would be treated
        # as datamodel names.
        if isinstance(datamodels, dict):
            models = datamodels["model"]
            datamodels = models if isinstance(models, list) else [models]
        datamodels = [
            datamodel.replace(" ", "_").replace(":", "_")
            for datamodel in datamodels
        ]
        yield pytest.param(
            {
                "datamodels": datamodels,
                "stanza": escaped_event,
            },
            id=f"{'-'.join(datamodels)}::sample_name::{event.sample_name}::host::{event.metadata.get('host')}",
        )

generate_requirements_tests()

Generate test cases for fields defined for datamodel. This function generates tests previously covered by requirement tests.

Yields:

Type Description

pytest.params for the test templates

Source code in pytest_splunk_addon/fields_tests/test_generator.py
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
def generate_requirements_tests(self):
    """
    Generate test cases for fields defined for datamodel.
    This function generates tests previously covered by requirement tests.

    Events without requirement test data are skipped, as are syslog events
    whose header cannot be stripped (an error is logged for those). A case
    is produced only when the event declares cim_fields or other_fields;
    fields listed under "exceptions" are left out of the expected fields.

    Yields:
        pytest.params for the test templates
    """
    for event in self.tokenized_events:
        test_data = event.requirement_test_data
        if not test_data:
            continue

        is_syslog = event.metadata.get("input_type", "").startswith("syslog")
        payload = event.event
        if is_syslog:
            payload = xml_event_parser.strip_syslog_header(payload)
            if payload is None:
                LOGGER.error(
                    "Syslog event do not match CEF, RFC_3164, RFC_5424 format"
                )
                continue

        escaped_event = xml_event_parser.escape_char_event(payload)
        ignored_fields = test_data.get("exceptions", {})
        search_params = {
            "sourcetype": event.metadata.get("sourcetype_to_search"),
        }

        # other_fields take precedence over cim_fields on duplicate names.
        declared_fields = {
            **test_data.get("cim_fields", {}),
            **test_data.get("other_fields", {}),
        }
        if not declared_fields:
            continue

        expected_fields = {
            name: expected
            for name, expected in declared_fields.items()
            if name not in ignored_fields
        }
        yield pytest.param(
            {
                "escaped_event": escaped_event,
                "fields": expected_fields,
                "modinput_params": search_params,
            },
            id=f"sample_name::{event.sample_name}::host::{event.metadata.get('host')}",
        )

generate_savedsearches_tests()

Generate test case for savedsearches

Yields:

Type Description

pytest.params for the test templates

Source code in pytest_splunk_addon/fields_tests/test_generator.py
214
215
216
217
218
219
220
221
222
223
224
def generate_savedsearches_tests(self):
    """
    Produce one test case for every savedsearch reported by the addon parser

    Yields:
        pytest.params for the test templates; the id of each case is the
        stanza of the savedsearch
    """
    savedsearches = self.addon_parser.get_savedsearches()
    for savedsearch in savedsearches:
        test_id = "{stanza}".format(**savedsearch)
        yield pytest.param(savedsearch, id=test_id)

generate_tag_tests()

Generate test case for tags

Yields:

Type Description

pytest.params for the test templates

Source code in pytest_splunk_addon/fields_tests/test_generator.py
143
144
145
146
147
148
149
150
151
152
153
def generate_tag_tests(self):
    """
    Produce one test case for every tag group reported by the addon parser

    Yields:
        pytest.params for the test templates; each id has the form
        ``<stanza>::tag::<tag>``
    """
    tag_groups = self.addon_parser.get_tags()
    for tag_group in tag_groups:
        test_id = "{stanza}::tag::{tag}".format(**tag_group)
        yield pytest.param(tag_group, id=test_id)

generate_tests(fixture)

Generate the test cases based on the fixture provided. Supported fixtures:

* splunk_searchtime_fields_positive
* splunk_searchtime_fields_negative
* splunk_searchtime_fields_tags
* splunk_searchtime_fields_eventtypes
* splunk_searchtime_fields_savedsearches
* splunk_searchtime_fields_requirements

Parameters:

Name Type Description Default
fixture str

fixture name

required
Source code in pytest_splunk_addon/fields_tests/test_generator.py
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
def generate_tests(self, fixture):
    """
    Generate the test cases for the fixture provided.

    The generator is selected by the suffix of the fixture name:

        * positive       -> field tests (positive)
        * negative       -> field tests (negative)
        * tags           -> tag tests
        * eventtypes     -> eventtype tests
        * savedsearches  -> savedsearch tests
        * requirements   -> requirement tests
        * datamodels     -> requirement datamodel tests

    A fixture name with any other suffix yields no test cases.

    Args:
        fixture (str): fixture name

    """
    # (suffix, generator method name, keyword arguments). The method is
    # looked up only for the matching suffix.
    routes = (
        ("positive", "generate_field_tests", {"is_positive": True}),
        ("negative", "generate_field_tests", {"is_positive": False}),
        ("tags", "generate_tag_tests", {}),
        ("eventtypes", "generate_eventtype_tests", {}),
        ("savedsearches", "generate_savedsearches_tests", {}),
        ("requirements", "generate_requirements_tests", {}),
        ("datamodels", "generate_requirements_datamodels_tests", {}),
    )
    for suffix, method_name, kwargs in routes:
        if fixture.endswith(suffix):
            yield from getattr(self, method_name)(**kwargs)
            return

FieldBank

To enhance the test cases while verifying the field extractions.

FieldBank

Bases: object

List of fields with patterns and expected values which should be tested for the Add-on.

Steps to use:

  1. Create a json file with the list of fields.

    Example::

    {
        "stanza_name": [    # Key should be stanza_name
            {
                "name": "action",
                "condition": "| regex _raw=\"success\"",
                "validity": "action=if(action=\"unknown\", null(), action)",
                "expected_values": ["success", "failure"],
                "negative_values": ["", "-", "unknown"]
            }
        ]
    }
    

    Parameter, Description

    condition, A filtering SPL command.
    validity, An EVAL statement. Filter out invalid value of a field
    expected_values, List of expected values
    negative_values, The list of values the field should not have
    

    supported stanza_type:

    1. source
    2. sourcetype
    
  2. Provide path of the json file with --field-bank=path parameter

Source code in pytest_splunk_addon/fields_tests/field_bank.py
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
class FieldBank(object):
    """
    Supports field_bank: List of fields with patterns and expected
        values which should be tested for the Add-on.

    Steps to use:

    1. Create a json file with the list of fields.

        Example::

            {
                "stanza_name": [    # Key should be stanza_name
                    {
                        "name": "action",
                        "condition": "| regex _raw=\\"success\\"",
                        "validity": "action=if(action=\\"unknown\\", null(), action)",
                        "expected_values": ["success", "failure"],
                        "negative_values": ["", "-", "unknown"]
                    }
                ]
            }


        .. csv-table::
            :header: Parameter, Description

            condition, A filtering SPL command.
            validity, An EVAL statement. Filter out invalid value of a field
            expected_values, List of expected values
            negative_values, The list of values the field should not have

        supported stanza_type:

            1. source
            2. sourcetype

    2. Provide path of the json file with --field-bank=path parameter
    """

    @classmethod
    def init_field_bank_tests(cls, field_bank_path):
        """
        Parse the field JSON file and return the list of fields

        Stanzas starting with "host::" are ignored. A stanza starting with
        "source::" yields one entry per source returned by
        PropsParser.get_list_of_sources; any other stanza is yielded once
        with stanza_type "sourcetype". Nothing is yielded when
        field_bank_path is empty or None.

        Args:
            field_bank_path (str): Path of the field JSON file

        Yields:
            dict: details of the fields including stanza and stanza_type
        """
        if not field_bank_path:
            return
        # JSON text is UTF-8; an explicit encoding keeps the parsing
        # independent of the platform's default locale encoding.
        with open(field_bank_path, encoding="utf-8") as field_file:
            stanza_list = json.load(field_file)
        for each_stanza, stanza_fields in stanza_list.items():
            if each_stanza.startswith("host::"):
                continue
            field_list = Field.parse_fields(stanza_fields)
            if each_stanza.startswith("source::"):
                for each_source in PropsParser.get_list_of_sources(each_stanza):
                    yield {
                        "stanza": each_source,
                        "stanza_type": "source",
                        "classname": "field_bank",
                        "fields": field_list,
                    }
            else:
                yield {
                    "stanza": each_stanza,
                    "stanza_type": "sourcetype",
                    "classname": "field_bank",
                    "fields": field_list,
                }

init_field_bank_tests(field_bank_path) classmethod

Parse the field JSON file and return the list of fields

Parameters:

Name Type Description Default
field_bank_path str

Path of the field JSON file

required

Yields:

Name Type Description
dict

details of the fields including stanza and stanza_type

Source code in pytest_splunk_addon/fields_tests/field_bank.py
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
@classmethod
def init_field_bank_tests(cls, field_bank_path):
    """
    Parse the field JSON file and return the list of fields

    Stanzas starting with "host::" are ignored. A stanza starting with
    "source::" yields one entry per source returned by
    PropsParser.get_list_of_sources; any other stanza is yielded once
    with stanza_type "sourcetype". Nothing is yielded when
    field_bank_path is empty or None.

    Args:
        field_bank_path (str): Path of the field JSON file

    Yields:
        dict: details of the fields including stanza and stanza_type
    """
    if not field_bank_path:
        return
    # JSON text is UTF-8; an explicit encoding keeps the parsing
    # independent of the platform's default locale encoding.
    with open(field_bank_path, encoding="utf-8") as field_file:
        stanza_list = json.load(field_file)
    for each_stanza, stanza_fields in stanza_list.items():
        if each_stanza.startswith("host::"):
            continue
        field_list = Field.parse_fields(stanza_fields)
        if each_stanza.startswith("source::"):
            for each_source in PropsParser.get_list_of_sources(each_stanza):
                yield {
                    "stanza": each_source,
                    "stanza_type": "source",
                    "classname": "field_bank",
                    "fields": field_list,
                }
        else:
            yield {
                "stanza": each_stanza,
                "stanza_type": "sourcetype",
                "classname": "field_bank",
                "fields": field_list,
            }