Skip to content

IndexTimeTests

Test generation mechanism to verify the index time extractions of an Add-on

TestTemplates

Includes the test scenarios to check the index time properties of an Add-on.

IndexTimeTestTemplate

Bases: object

Test templates to test the index time fields of an App

Source code in pytest_splunk_addon/index_tests/test_templates.py
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
class IndexTimeTestTemplate(object):
    """
    Test templates to test the index time fields of an App

    Every template builds a Splunk search from the parametrized fixture data,
    runs it through ``splunk_search_util`` and compares the search results
    with the values expected from the tokenized sample events.
    """

    logger = logging.getLogger("pytest-splunk-addon-tests")

    @pytest.mark.first
    @pytest.mark.splunk_indextime
    def test_indextime_key_fields(
        self,
        splunk_search_util,
        splunk_ingest_data,
        splunk_setup,
        splunk_indextime_key_fields,
        record_property,
        caplog,
    ):
        """
        This test case checks that a key_field has the expected values.
        The key fields are as follows:

            * src
            * src_port
            * dest
            * dest_port
            * dvc
            * host
            * user
            * url

        Args:
            splunk_search_util (SearchUtil): Object that helps to search on Splunk.
            splunk_ingest_data (fixture): To ingest data into splunk.
            splunk_setup (fixture): Requested as a dependency; not referenced
                in the test body.
            splunk_indextime_key_fields (fixture): Test for key fields
            record_property (fixture): Document facts of test cases.
            caplog (fixture): fixture to capture logs.
        """
        index_list = "(index={})".format(
            " OR index=".join(splunk_search_util.search_index.split(","))
        )

        identifier = splunk_indextime_key_fields.get("identifier")
        hosts = splunk_indextime_key_fields.get("hosts")
        assert (
            identifier or hosts
        ), "Host or identifier fields cannot be determined from the config file.."

        if identifier:
            extra_filter = identifier
        else:
            # sorted() keeps the generated (and recorded) query text stable
            # between runs; plain set iteration order is not guaranteed.
            extra_filter = 'host IN ("{}")'.format('","'.join(sorted(set(hosts))))

        fields_to_check = copy.deepcopy(
            splunk_indextime_key_fields["tokenized_event"].key_fields
        )

        query = "sourcetype={} {} | table {}".format(
            splunk_indextime_key_fields.get("sourcetype"),
            extra_filter,
            ",".join(fields_to_check),
        )
        search = "search {} {}".format(index_list, query)
        record_property("Query", search)
        LOGGER.debug("Base search for indextime key field test: {}".format(search))

        results = list(
            splunk_search_util.getFieldValuesList(
                search,
                interval=splunk_search_util.search_interval,
                retries=splunk_search_util.search_retry,
            )
        )
        LOGGER.debug("Results:{}".format(results))

        assert results, (
            f"\nNo Events found for query." f"{format_search_query_log(search)}"
        )

        # Values are compared as sets because a token may be replaced only once
        # while its value is assigned to n events.
        # Example syslog: all the headers are only tokenized once hence
        #   key_fields = {'host': ['dummy_host']}
        #   result_dict = {'host': ['dummy_host']*n}
        actual_values = {}
        for result in results:
            for key, val in result.items():
                actual_values.setdefault(key, set()).add(val)
        expected_values = {key: set(value) for key, value in fields_to_check.items()}

        if actual_values == expected_values:
            return

        # Expected fields that were returned but with a different set of values.
        mismatched = [
            [field, str(expected), str(actual_values[field])]
            for field, expected in expected_values.items()
            if field in actual_values and expected != actual_values[field]
        ]
        # Expected fields that are absent from the search results.
        missing_keys = [
            [field, str(expected)]
            for field, expected in expected_values.items()
            if field not in actual_values
        ]

        final_str = ""
        if mismatched:
            result_str = get_table_output(
                headers=["Key_field", "Expected_values", "Actual_values"],
                value_list=mismatched,
            )
            final_str += f"Some values for the following key fields are missing\n\n{result_str}"

        if missing_keys:
            missing_keys_result_str = get_table_output(
                headers=["Key_field", "Expected_values"],
                value_list=missing_keys,
            )
            final_str += f"\n\nSome key fields are not found in search results\n\n{missing_keys_result_str}"
        LOGGER.info(final_str)

        assert (
            not mismatched and not missing_keys
        ), f"\nSearch query: {search}{final_str}\n"

    @pytest.mark.first
    @pytest.mark.splunk_indextime
    def test_indextime_time(
        self,
        splunk_search_util,
        splunk_ingest_data,
        splunk_setup,
        splunk_indextime_time,
        record_property,
        caplog,
    ):
        """
        This test case checks that the _time value in the events has the
        expected values.

        Every expected timestamp (rounded up to a whole number) must be
        present among the rounded-up ``_time`` values returned by the search.

        Args:
            splunk_search_util (SearchUtil): Object that helps to search on Splunk.
            splunk_ingest_data (fixture): To ingest data into splunk.
            splunk_setup (fixture): Requested as a dependency; not referenced
                in the test body.
            splunk_indextime_time (fixture): Test for _time field
            record_property (fixture): Document facts of test cases.
            caplog (fixture): fixture to capture logs.
        """
        index_list = "(index={})".format(
            " OR index=".join(splunk_search_util.search_index.split(","))
        )

        identifier = splunk_indextime_time.get("identifier")
        hosts = splunk_indextime_time.get("hosts")
        assert (
            identifier or hosts
        ), "Host or identifier fields cannot be determined from the config file.."

        time_values = splunk_indextime_time["tokenized_event"].time_values
        assert time_values, "_time field cannot be determined from the config file."

        if identifier:
            extra_filter = identifier
        else:
            # sorted() keeps the generated (and recorded) query text stable
            # between runs; plain set iteration order is not guaranteed.
            extra_filter = 'host IN ("{}")'.format('","'.join(sorted(set(hosts))))

        # time_values is asserted non-empty above, so _time is always exposed
        # as e_time and tabulated.
        query = "sourcetype={} {} | eval e_time=_time | table {}".format(
            splunk_indextime_time.get("sourcetype"),
            extra_filter,
            "e_time",
        )
        search = "search {} {}".format(index_list, query)

        record_property("Query", search)
        LOGGER.debug("Base search for indextime time field test: {}".format(search))
        results = list(
            splunk_search_util.getFieldValuesList(
                search,
                interval=splunk_search_util.search_interval,
                retries=splunk_search_util.search_retry,
            )
        )
        LOGGER.debug("Results:{}".format(results))
        assert results, (
            f"\nNo Events found for query." f"{format_search_query_log(search)}"
        )

        # Column-wise view of the results, each value rounded up to an integer.
        result_fields = {
            key: [ceil(float(item[key])) for item in results] for key in results[0]
        }
        result_fields["e_time"].sort()
        key_time = sorted(ceil(t) for t in time_values)

        record_property("time_values", key_time)
        record_property("result_time", result_fields)

        assert all(
            timestamp in result_fields["e_time"] for timestamp in key_time
        ), "Actual time {} :: Time in result {}".format(
            key_time, result_fields["e_time"]
        )

    @pytest.mark.first
    @pytest.mark.splunk_indextime
    def test_indextime_line_breaker(
        self,
        splunk_search_util,
        splunk_ingest_data,
        splunk_setup,
        splunk_indextime_line_breaker,
        record_property,
        caplog,
    ):
        """
        This test case checks that the number of events is as expected.

        Args:
            splunk_search_util (SearchUtil): Object that helps to search on Splunk.
            splunk_ingest_data (fixture): To ingest data into splunk.
            splunk_setup (fixture): Requested as a dependency; not referenced
                in the test body.
            splunk_indextime_line_breaker (fixture): Test for event count
            record_property (fixture): Document facts of test cases.
            caplog (fixture): fixture to capture logs.
        """
        expected_events_count = int(
            splunk_indextime_line_breaker["expected_event_count"]
        )
        index_list = "(index={})".format(
            " OR index=".join(splunk_search_util.search_index.split(","))
        )
        host = '("' + '","'.join(splunk_indextime_line_breaker.get("host")) + '")'
        query = "search {} sourcetype={} host IN {} | stats count".format(
            index_list, splunk_indextime_line_breaker.get("sourcetype"), host
        )
        record_property("Query", query)

        LOGGER.debug("Base search for indextime line breaker test: {}".format(query))
        results = list(
            splunk_search_util.getFieldValuesList(
                query,
                interval=splunk_search_util.search_interval,
                retries=splunk_search_util.search_retry,
            )
        )
        # Fail with the query in the message instead of an IndexError when the
        # search yields no rows at all.
        assert results, (
            f"\nNo Events found for query." f"{format_search_query_log(query)}"
        )
        count_from_results = int(results[0].get("count"))
        LOGGER.debug("Resulting count:{}".format(count_from_results))
        assert count_from_results == expected_events_count, (
            f"{format_search_query_log(query)}"
            f"\nExpected count: {expected_events_count} Actual Count: {count_from_results}"
        )

test_indextime_key_fields(splunk_search_util, splunk_ingest_data, splunk_setup, splunk_indextime_key_fields, record_property, caplog)

This test case checks that a key_field has the expected values. The key fields are as follows:

* src
* src_port
* dest
* dest_port
* dvc
* host
* user
* url

Parameters:

Name Type Description Default
splunk_search_util SearchUtil

Object that helps to search on Splunk.

required
splunk_ingest_data fixture

To ingest data into splunk.

required
splunk_indextime_key_fields fixture

Test for key fields

required
record_property fixture

Document facts of test cases.

required
caplog fixture

fixture to capture logs.

required
Source code in pytest_splunk_addon/index_tests/test_templates.py
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
@pytest.mark.first
@pytest.mark.splunk_indextime
def test_indextime_key_fields(
    self,
    splunk_search_util,
    splunk_ingest_data,
    splunk_setup,
    splunk_indextime_key_fields,
    record_property,
    caplog,
):
    """
    This test case checks that a key_field has the expected values.
    The key fields are as follows:

        * src
        * src_port
        * dest
        * dest_port
        * dvc
        * host
        * user
        * url

    Args:
        splunk_search_util (SearchUtil): Object that helps to search on Splunk.
        splunk_ingest_data (fixture): To ingest data into splunk.
        splunk_setup (fixture): Requested as a dependency; not referenced
            in the test body.
        splunk_indextime_key_fields (fixture): Test for key fields
        record_property (fixture): Document facts of test cases.
        caplog (fixture): fixture to capture logs.
    """
    index_list = "(index={})".format(
        " OR index=".join(splunk_search_util.search_index.split(","))
    )

    identifier = splunk_indextime_key_fields.get("identifier")
    hosts = splunk_indextime_key_fields.get("hosts")
    assert (
        identifier or hosts
    ), "Host or identifier fields cannot be determined from the config file.."

    if identifier:
        extra_filter = identifier
    else:
        # sorted() keeps the generated (and recorded) query text stable
        # between runs; plain set iteration order is not guaranteed.
        extra_filter = 'host IN ("{}")'.format('","'.join(sorted(set(hosts))))

    fields_to_check = copy.deepcopy(
        splunk_indextime_key_fields["tokenized_event"].key_fields
    )

    query = "sourcetype={} {} | table {}".format(
        splunk_indextime_key_fields.get("sourcetype"),
        extra_filter,
        ",".join(fields_to_check),
    )
    search = "search {} {}".format(index_list, query)
    record_property("Query", search)
    LOGGER.debug("Base search for indextime key field test: {}".format(search))

    results = list(
        splunk_search_util.getFieldValuesList(
            search,
            interval=splunk_search_util.search_interval,
            retries=splunk_search_util.search_retry,
        )
    )
    LOGGER.debug("Results:{}".format(results))

    assert results, (
        f"\nNo Events found for query." f"{format_search_query_log(search)}"
    )

    # Values are compared as sets because a token may be replaced only once
    # while its value is assigned to n events.
    # Example syslog: all the headers are only tokenized once hence
    #   key_fields = {'host': ['dummy_host']}
    #   result_dict = {'host': ['dummy_host']*n}
    actual_values = {}
    for result in results:
        for key, val in result.items():
            actual_values.setdefault(key, set()).add(val)
    expected_values = {key: set(value) for key, value in fields_to_check.items()}

    if actual_values == expected_values:
        return

    # Expected fields that were returned but with a different set of values.
    mismatched = [
        [field, str(expected), str(actual_values[field])]
        for field, expected in expected_values.items()
        if field in actual_values and expected != actual_values[field]
    ]
    # Expected fields that are absent from the search results.
    missing_keys = [
        [field, str(expected)]
        for field, expected in expected_values.items()
        if field not in actual_values
    ]

    final_str = ""
    if mismatched:
        result_str = get_table_output(
            headers=["Key_field", "Expected_values", "Actual_values"],
            value_list=mismatched,
        )
        final_str += f"Some values for the following key fields are missing\n\n{result_str}"

    if missing_keys:
        missing_keys_result_str = get_table_output(
            headers=["Key_field", "Expected_values"],
            value_list=missing_keys,
        )
        final_str += f"\n\nSome key fields are not found in search results\n\n{missing_keys_result_str}"
    LOGGER.info(final_str)

    assert (
        not mismatched and not missing_keys
    ), f"\nSearch query: {search}{final_str}\n"

test_indextime_line_breaker(splunk_search_util, splunk_ingest_data, splunk_setup, splunk_indextime_line_breaker, record_property, caplog)

This test case checks that the number of events is as expected.

Parameters:

Name Type Description Default
splunk_search_util SearchUtil

Object that helps to search on Splunk.

required
splunk_ingest_data fixture

To ingest data into splunk.

required
splunk_indextime_line_breaker fixture

Test for event count

required
record_property fixture

Document facts of test cases.

required
caplog fixture

fixture to capture logs.

required
Source code in pytest_splunk_addon/index_tests/test_templates.py
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
@pytest.mark.first
@pytest.mark.splunk_indextime
def test_indextime_line_breaker(
    self,
    splunk_search_util,
    splunk_ingest_data,
    splunk_setup,
    splunk_indextime_line_breaker,
    record_property,
    caplog,
):
    """
    This test case checks that the number of events is as expected.

    Args:
        splunk_search_util (SearchUtil): Object that helps to search on Splunk.
        splunk_ingest_data (fixture): To ingest data into splunk.
        splunk_setup (fixture): Requested as a dependency; not referenced
            in the test body.
        splunk_indextime_line_breaker (fixture): Test for event count
        record_property (fixture): Document facts of test cases.
        caplog (fixture): fixture to capture logs.
    """
    expected_events_count = int(
        splunk_indextime_line_breaker["expected_event_count"]
    )
    index_list = "(index={})".format(
        " OR index=".join(splunk_search_util.search_index.split(","))
    )
    host = '("' + '","'.join(splunk_indextime_line_breaker.get("host")) + '")'
    query = "search {} sourcetype={} host IN {} | stats count".format(
        index_list, splunk_indextime_line_breaker.get("sourcetype"), host
    )
    record_property("Query", query)

    LOGGER.debug("Base search for indextime line breaker test: {}".format(query))
    results = list(
        splunk_search_util.getFieldValuesList(
            query,
            interval=splunk_search_util.search_interval,
            retries=splunk_search_util.search_retry,
        )
    )
    # Fail with the query in the message instead of an IndexError when the
    # search yields no rows at all.
    assert results, (
        f"\nNo Events found for query." f"{format_search_query_log(query)}"
    )
    count_from_results = int(results[0].get("count"))
    LOGGER.debug("Resulting count:{}".format(count_from_results))
    assert count_from_results == expected_events_count, (
        f"{format_search_query_log(query)}"
        f"\nExpected count: {expected_events_count} Actual Count: {count_from_results}"
    )

test_indextime_time(splunk_search_util, splunk_ingest_data, splunk_setup, splunk_indextime_time, record_property, caplog)

This test case checks that the _time values of the events match the expected values.

Parameters:

Name Type Description Default
splunk_search_util SearchUtil

Object that helps to search on Splunk.

required
splunk_ingest_data fixture

To ingest data into splunk.

required
splunk_indextime_time fixture

Test for _time field

required
record_property fixture

Document facts of test cases.

required
caplog fixture

fixture to capture logs.

required
Source code in pytest_splunk_addon/index_tests/test_templates.py
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
@pytest.mark.first
@pytest.mark.splunk_indextime
def test_indextime_time(
    self,
    splunk_search_util,
    splunk_ingest_data,
    splunk_setup,
    splunk_indextime_time,
    record_property,
    caplog,
):
    """
    This test case checks that the _time value in the events has the
    expected values.

    Every expected timestamp (rounded up to a whole number) must be
    present among the rounded-up ``_time`` values returned by the search.

    Args:
        splunk_search_util (SearchUtil): Object that helps to search on Splunk.
        splunk_ingest_data (fixture): To ingest data into splunk.
        splunk_setup (fixture): Requested as a dependency; not referenced
            in the test body.
        splunk_indextime_time (fixture): Test for _time field
        record_property (fixture): Document facts of test cases.
        caplog (fixture): fixture to capture logs.
    """
    index_list = "(index={})".format(
        " OR index=".join(splunk_search_util.search_index.split(","))
    )

    identifier = splunk_indextime_time.get("identifier")
    hosts = splunk_indextime_time.get("hosts")
    assert (
        identifier or hosts
    ), "Host or identifier fields cannot be determined from the config file.."

    time_values = splunk_indextime_time["tokenized_event"].time_values
    assert time_values, "_time field cannot be determined from the config file."

    if identifier:
        extra_filter = identifier
    else:
        # sorted() keeps the generated (and recorded) query text stable
        # between runs; plain set iteration order is not guaranteed.
        extra_filter = 'host IN ("{}")'.format('","'.join(sorted(set(hosts))))

    # time_values is asserted non-empty above, so _time is always exposed
    # as e_time and tabulated.
    query = "sourcetype={} {} | eval e_time=_time | table {}".format(
        splunk_indextime_time.get("sourcetype"),
        extra_filter,
        "e_time",
    )
    search = "search {} {}".format(index_list, query)

    record_property("Query", search)
    LOGGER.debug("Base search for indextime time field test: {}".format(search))
    results = list(
        splunk_search_util.getFieldValuesList(
            search,
            interval=splunk_search_util.search_interval,
            retries=splunk_search_util.search_retry,
        )
    )
    LOGGER.debug("Results:{}".format(results))
    assert results, (
        f"\nNo Events found for query." f"{format_search_query_log(search)}"
    )

    # Column-wise view of the results, each value rounded up to an integer.
    result_fields = {
        key: [ceil(float(item[key])) for item in results] for key in results[0]
    }
    result_fields["e_time"].sort()
    key_time = sorted(ceil(t) for t in time_values)

    record_property("time_values", key_time)
    record_property("result_time", result_fields)

    assert all(
        timestamp in result_fields["e_time"] for timestamp in key_time
    ), "Actual time {} :: Time in result {}".format(
        key_time, result_fields["e_time"]
    )

TestGenerator

IndexTimeTestGenerator

Bases: object

Generates test cases to test the index time extraction of an Add-on.

  • Provides the pytest parameters to the test templates.
  • Supports key_fields: List of fields which should be tested for the Add-on.
Source code in pytest_splunk_addon/index_tests/test_generator.py
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
class IndexTimeTestGenerator(object):
    """
    Generates test cases to test the index time extraction of an Add-on.

    * Provides the pytest parameters to the test templates.
    * Supports key_fields: List of fields which should be tested
      for the Add-on.
    """

    def generate_tests(self, store_events, app_path, config_path, test_type):
        """
        Generates the test cases based on test_type

        Args:
            store_events: Forwarded unchanged to
                ``SampleXdistGenerator.get_samples``
            app_path (str): Path of the app package
            config_path (str): Path of package which contains pytest-splunk-addon-data.conf
            test_type (str): Type of test case. ``"line_breaker"``,
                ``"key_fields"`` and ``"_time"`` produce parameters; any
                other value yields nothing.

        Yields:
            pytest.params for the test templates

        """
        sample_generator = SampleXdistGenerator(app_path, config_path)
        store_sample = sample_generator.get_samples(store_events)
        tokenized_events = store_sample.get("tokenized_events")
        if store_sample.get("conf_name") != "psa-data-gen":
            msg = (
                "Index time tests cannot be executed without "
                "pytest-splunk-addon-data.conf"
            )
            LOGGER.warning(msg)
            # This method is a generator, so the message only travels on
            # StopIteration; a caller iterating normally just gets no params.
            return msg

        if test_type == "line_breaker":
            LOGGER.info("Generating line breaker test")
            yield from self.generate_line_breaker_tests(tokenized_events)
            return

        for tokenized_event in tokenized_events:
            metadata = tokenized_event.metadata
            identifier_key = metadata.get("identifier")
            hosts = self.get_hosts(tokenized_event)

            # Generate test params only if key_fields
            if test_type == "key_fields" and tokenized_event.key_fields:
                # Work on a copy so the prefixed host does not leak back
                # into the shared tokenized event.
                event = SampleEvent.copy(tokenized_event)
                key_field_host = tokenized_event.key_fields.get("host")
                host_prefix = metadata.get("host_prefix")
                if key_field_host and host_prefix:
                    event.key_fields["host"] = self.add_host_prefix(
                        host_prefix, key_field_host
                    )
                LOGGER.debug(
                    "Generating Key field test with the following params:\nevent={e}\nidentifier_key={k}\nhosts={h}".format(
                        e=event, k=identifier_key, h=hosts
                    )
                )
                yield from self.generate_params(event, identifier_key, hosts)

            # Generate test only if time_values
            elif test_type == "_time" and metadata.get("timestamp_type") == "event":
                # Requirement-test samples without any extracted time value
                # have nothing to compare against, so they are skipped.
                if (
                    int(metadata.get("requirement_test_sample", 0)) > 0
                    and tokenized_event.time_values == []
                ):
                    continue
                LOGGER.debug(
                    "Generating time field test with the following params:\ntokenized_event={e}\nidentifier_key={k}\nhosts={h}".format(
                        e=tokenized_event, k=identifier_key, h=hosts
                    )
                )
                yield from self.generate_params(
                    tokenized_event, identifier_key, hosts
                )

    def generate_line_breaker_tests(self, tokenized_events):
        """
        Generates test case for testing line breaker

        Events are grouped by ``sample_name``; one parameter set is yielded
        per sample with the union of its hosts, its sourcetype and the
        expected event count.

        Args:
            tokenized_events (list): SampleEvent instances to group

        Yields:
            pytest.params for the test templates
        """
        line_breaker_params = {}

        for event in tokenized_events:
            # Start from the defaults for every event so an unparsable value
            # cannot silently reuse the counts of the previous event.
            sample_count = 1
            expected_count = 1
            try:
                sample_count = int(event.metadata.get("sample_count", 1))
                expected_count = int(event.metadata.get("expected_event_count", 1))
            except ValueError as e:
                raise_warning("Invalid value  {}".format(e))
            else:
                LOGGER.info("Sample Count: {}".format(sample_count))
                LOGGER.info("Expected Count: {}".format(expected_count))

            params = line_breaker_params.setdefault(event.sample_name, {})

            if not params.get("sourcetype"):
                params["sourcetype"] = self.get_sourcetype(event)

            if not params.get("expected_event_count"):
                # For every input type other than modinput/windows_input the
                # configured count is multiplied by the sample count.
                if event.metadata.get("input_type") not in [
                    "modinput",
                    "windows_input",
                ]:
                    expected_count = expected_count * sample_count
                params["expected_event_count"] = expected_count

            params.setdefault("host", set())

            event_host = self.get_hosts(event)
            if event_host:
                params["host"] |= set(event_host)

        for sample_name, params in line_breaker_params.items():
            LOGGER.debug(
                "Generating Line Breaker test with the following params:\nhost:{h}\nsourcetype:{s}\nexpected_event_count:{e}".format(
                    h=params["host"],
                    s=params["sourcetype"],
                    e=params["expected_event_count"],
                )
            )
            yield pytest.param(
                {
                    "host": params["host"],
                    "sourcetype": params["sourcetype"],
                    "expected_event_count": params["expected_event_count"],
                },
                id="{}::{}".format(params["sourcetype"].replace(" ", "-"), sample_name),
            )

    def get_hosts(self, tokenized_event):
        """
        Returns value of host for event

        The host is read from the stanza metadata when ``host_type`` is
        "plugin" or unset, and from the event key_fields when it is "event".
        Any other ``host_type`` is logged as an error and yields no host.

        Args:
            tokenized_event (SampleEvent): Instance containing event info

        Returns:
            List of hosts (with ``host_prefix`` applied when configured),
            or None when no host could be determined
        """
        metadata = tokenized_event.metadata
        host_type = metadata.get("host_type")
        if host_type in ("plugin", None):
            hosts = metadata.get("host")
        elif host_type == "event":
            hosts = tokenized_event.key_fields.get("host")
        else:
            hosts = None
            LOGGER.error(
                "Invalid 'host_type' for stanza {}".format(tokenized_event.sample_name)
            )
        if isinstance(hosts, str):
            hosts = [hosts]
        host_prefix = metadata.get("host_prefix")
        # Only prefix when there is something to prefix; iterating None
        # would raise TypeError.
        if host_prefix and hosts is not None:
            hosts = self.add_host_prefix(str(host_prefix), hosts)
        LOGGER.info(
            "Returning host with value {} for stanza {}".format(
                hosts, tokenized_event.sample_name
            )
        )
        return hosts

    def add_host_prefix(self, host_prefix, hosts):
        """
        Returns value of host with prefix

        Args:
            host_prefix (str): Prefix value to be added in host
            hosts (list): List of hosts; a bare string is treated as a
                single host

        Returns:
            List with the prefix prepended to the string form of each host
        """
        # A plain string would otherwise be iterated character by character.
        if isinstance(hosts, str):
            hosts = [hosts]
        return [host_prefix + str(host) for host in hosts]

    def get_sourcetype(self, sample_event):
        """
        Returns value of sourcetype for event

        Args:
            sample_event (SampleEvent): Instance containing event info

        Returns:
            ``sourcetype_to_search`` from the metadata if present, otherwise
            ``sourcetype``, otherwise "*"
        """
        metadata = sample_event.metadata
        return metadata.get("sourcetype_to_search", metadata.get("sourcetype", "*"))

    def get_source(self, sample_event):
        """
        Returns value of source for event

        Args:
            sample_event (SampleEvent): Instance containing event info

        Returns:
            ``source_to_search`` from the metadata if present, otherwise
            ``source``, otherwise "*"
        """
        metadata = sample_event.metadata
        return metadata.get("source_to_search", metadata.get("source", "*"))

    def generate_params(self, tokenized_event, identifier_key, hosts):
        """
        Generates test case based on parameters

        Args:
            tokenized_event (SampleEvent): Instance containing event info
            identifier_key (str): Identifier key, if mentioned in the conf file
            hosts (list): List of hosts for the event

        Yields:
            pytest.params for the test templates
        """
        # An identifier takes precedence over host based parameters.
        if identifier_key:
            yield from self.generate_identifier_params(tokenized_event, identifier_key)
        else:
            yield from self.generate_hosts_params(tokenized_event, hosts)

    def generate_identifier_params(self, tokenized_event, identifier_key):
        """
        Generates test case based on Identifier key mentioned in conf file

        One parameter set is yielded per value found in the event key_fields
        under ``identifier_key``.

        Args:
            tokenized_event (SampleEvent): Instance containing event info
            identifier_key (str): Identifier key, if mentioned in the conf file

        Yields:
            pytest.params for the test templates
        """
        identifier_val = tokenized_event.key_fields.get(identifier_key)
        if identifier_val is None:
            # Nothing to parametrize on; report it instead of failing with
            # a TypeError while iterating None.
            LOGGER.error(
                "No key_fields value found for identifier '{}' in stanza {}".format(
                    identifier_key, tokenized_event.sample_name
                )
            )
            return
        if isinstance(identifier_val, str):
            # A bare string is a single identifier, not a sequence of chars.
            identifier_val = [identifier_val]

        sourcetype = self.get_sourcetype(tokenized_event)
        source = self.get_source(tokenized_event)
        for identifier in identifier_val:
            yield pytest.param(
                {
                    # format() keeps non-string identifier values usable.
                    "identifier": "{}={}".format(identifier_key, identifier),
                    "sourcetype": sourcetype,
                    "source": source,
                    "tokenized_event": tokenized_event,
                },
                id="{}::{}:{}".format(sourcetype, identifier_key, identifier),
            )

    def generate_hosts_params(self, tokenized_event, hosts):
        """
        Generates test case based on host value of the event

        The test id uses the single host, or "<first>_to_<last>" for several
        hosts, and falls back to the sample name when there are no hosts.

        Args:
            tokenized_event (SampleEvent): Instance containing event info
            hosts (list): List of hosts for event

        Yields:
            pytest.params for the test templates
        """
        id_host = tokenized_event.sample_name

        if hosts:
            if len(hosts) == 1:
                id_host = hosts[0]
            else:
                id_host = hosts[0] + "_to_" + hosts[-1]

        sourcetype = self.get_sourcetype(tokenized_event)
        yield pytest.param(
            {
                "hosts": hosts,
                "sourcetype": sourcetype,
                "source": self.get_source(tokenized_event),
                "tokenized_event": tokenized_event,
            },
            id="{}::{}".format(sourcetype, id_host),
        )

add_host_prefix(host_prefix, hosts)

Returns value of host with prefix

Parameters:

Name Type Description Default
host_prefix str

Prefix value to be added in host

required
hosts list

List of hosts

required

Returns:

Type Description

Value of host with prefix

Source code in pytest_splunk_addon/index_tests/test_generator.py
212
213
214
215
216
217
218
219
220
221
222
223
224
def add_host_prefix(self, host_prefix, hosts):
    """
    Returns value of host with prefix

    Args:
        host_prefix (str): Prefix value to be added in host
        hosts (list): List of hosts; a bare string is treated as a
            single host

    Returns:
        List with the prefix prepended to the string form of each host
    """
    # A plain string would otherwise be iterated character by character.
    if isinstance(hosts, str):
        hosts = [hosts]
    return [host_prefix + str(host) for host in hosts]

generate_hosts_params(tokenized_event, hosts)

Generates test case based on host value of the event

Parameters:

Name Type Description Default
tokenized_event SampleEvent

Instance containing event info

required
hosts list

List of hosts for event

required

Yields:

Type Description

pytest.params for the test templates

Source code in pytest_splunk_addon/index_tests/test_generator.py
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
def generate_hosts_params(self, tokenized_event, hosts):
    """
    Build the single host-based parameter set for an event.

    The test id is the lone host, "<first>_to_<last>" when there are
    several hosts, or the sample name when ``hosts`` is empty/None.

    Args:
        tokenized_event (SampleEvent): Instance containing event info
        hosts (list): List of hosts for event

    Yields:
        pytest.params for the test templates
    """
    # Default label; replaced below when at least one host is known.
    label = tokenized_event.sample_name
    if hosts:
        label = hosts[0] if len(hosts) == 1 else hosts[0] + "_to_" + hosts[-1]

    sourcetype = self.get_sourcetype(tokenized_event)
    test_params = {
        "hosts": hosts,
        "sourcetype": sourcetype,
        "source": self.get_source(tokenized_event),
        "tokenized_event": tokenized_event,
    }
    yield pytest.param(test_params, id="{}::{}".format(sourcetype, label))

generate_identifier_params(tokenized_event, identifier_key)

Generates test case based on Identifier key mentioned in conf file

Parameters:

Name Type Description Default
tokenized_event SampleEvent

Instance containing event info

required
identifier_key str

Identifier key, if mentioned in the conf file

required

Yields:

Type Description

pytest.params for the test templates

Source code in pytest_splunk_addon/index_tests/test_generator.py
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
def generate_identifier_params(self, tokenized_event, identifier_key):
    """
    Generates test case based on Identifier key mentioned in conf file

    One parameter set is yielded per value found in the event key_fields
    under ``identifier_key``.

    Args:
        tokenized_event (SampleEvent): Instance containing event info
        identifier_key (str): Identifier key, if mentioned in the conf file

    Yields:
        pytest.params for the test templates
    """
    identifier_val = tokenized_event.key_fields.get(identifier_key)
    if identifier_val is None:
        # Nothing to parametrize on; report it instead of failing with
        # a TypeError while iterating None.
        LOGGER.error(
            "No key_fields value found for identifier '{}' in stanza {}".format(
                identifier_key, tokenized_event.sample_name
            )
        )
        return
    if isinstance(identifier_val, str):
        # A bare string is a single identifier, not a sequence of chars.
        identifier_val = [identifier_val]

    sourcetype = self.get_sourcetype(tokenized_event)
    source = self.get_source(tokenized_event)
    for identifier in identifier_val:
        yield pytest.param(
            {
                # format() keeps non-string identifier values usable.
                "identifier": "{}={}".format(identifier_key, identifier),
                "sourcetype": sourcetype,
                "source": source,
                "tokenized_event": tokenized_event,
            },
            id="{}::{}:{}".format(sourcetype, identifier_key, identifier),
        )

generate_line_breaker_tests(tokenized_events)

Generates test case for testing line breaker

Yields:

Type Description

pytest.params for the test templates

Source code in pytest_splunk_addon/index_tests/test_generator.py
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
def generate_line_breaker_tests(self, tokenized_events):
    """
    Generates test case for testing line breaker

    Events are grouped by ``sample_name``; one parameter set is yielded
    per sample with the union of its hosts, its sourcetype and the
    expected event count.

    Args:
        tokenized_events (list): SampleEvent instances to group

    Yields:
        pytest.params for the test templates
    """
    line_breaker_params = {}

    for event in tokenized_events:
        # Start from the defaults for every event so an unparsable value
        # cannot silently reuse the counts of the previous event.
        sample_count = 1
        expected_count = 1
        try:
            sample_count = int(event.metadata.get("sample_count", 1))
            expected_count = int(event.metadata.get("expected_event_count", 1))
        except ValueError as e:
            raise_warning("Invalid value  {}".format(e))
        else:
            LOGGER.info("Sample Count: {}".format(sample_count))
            LOGGER.info("Expected Count: {}".format(expected_count))

        params = line_breaker_params.setdefault(event.sample_name, {})

        if not params.get("sourcetype"):
            params["sourcetype"] = self.get_sourcetype(event)

        if not params.get("expected_event_count"):
            # For every input type other than modinput/windows_input the
            # configured count is multiplied by the sample count.
            if event.metadata.get("input_type") not in [
                "modinput",
                "windows_input",
            ]:
                expected_count = expected_count * sample_count
            params["expected_event_count"] = expected_count

        params.setdefault("host", set())

        event_host = self.get_hosts(event)
        if event_host:
            params["host"] |= set(event_host)

    for sample_name, params in line_breaker_params.items():
        LOGGER.debug(
            "Generating Line Breaker test with the following params:\nhost:{h}\nsourcetype:{s}\nexpected_event_count:{e}".format(
                h=params["host"],
                s=params["sourcetype"],
                e=params["expected_event_count"],
            )
        )
        yield pytest.param(
            {
                "host": params["host"],
                "sourcetype": params["sourcetype"],
                "expected_event_count": params["expected_event_count"],
            },
            id="{}::{}".format(params["sourcetype"].replace(" ", "-"), sample_name),
        )

generate_params(tokenized_event, identifier_key, hosts)

Generates test case based on parameters

Parameters:

Name Type Description Default
tokenized_event SampleEvent

Instance containing event info

required
identifier_key str

Identifier key, if mentioned in the conf file

required
hosts list

List of hosts for the event

required

Yields:

Type Description

pytest.params for the test templates

Source code in pytest_splunk_addon/index_tests/test_generator.py
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
def generate_params(self, tokenized_event, identifier_key, hosts):
    """
    Dispatch to identifier-based or host-based parameter generation.

    Args:
        tokenized_event (SampleEvent): Instance containing event info
        identifier_key (str): Identifier key from the conf file; when it is
            truthy the identifier-based generator is used
        hosts (list): List of hosts for the event, used only when there is
            no identifier key

    Yields:
        pytest.params for the test templates
    """
    # Only the selected generator is ever created.
    chosen = (
        self.generate_identifier_params(tokenized_event, identifier_key)
        if identifier_key
        else self.generate_hosts_params(tokenized_event, hosts)
    )
    yield from chosen

generate_tests(store_events, app_path, config_path, test_type)

Generates the test cases based on test_type

Parameters:

Name Type Description Default
app_path str

Path of the app package

required
config_path str

Path of package which contains pytest-splunk-addon-data.conf

required
test_type str

Type of test case

required

Yields:

Type Description

pytest.params for the test templates

Source code in pytest_splunk_addon/index_tests/test_generator.py
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
def generate_tests(self, store_events, app_path, config_path, test_type):
    """
    Produce the pytest parameters for the requested index time test type.

    Args:
        store_events: Forwarded unchanged to
            ``SampleXdistGenerator.get_samples``
        app_path (str): Path of the app package
        config_path (str): Path of package which contains pytest-splunk-addon-data.conf
        test_type (str): Type of test case. ``"line_breaker"``,
            ``"key_fields"`` and ``"_time"`` produce parameters; any other
            value yields nothing.

    Yields:
        pytest.params for the test templates

    """
    store_sample = SampleXdistGenerator(app_path, config_path).get_samples(
        store_events
    )
    tokenized_events = store_sample.get("tokenized_events")
    if store_sample.get("conf_name") != "psa-data-gen":
        msg = (
            "Index time tests cannot be executed without "
            "pytest-splunk-addon-data.conf"
        )
        LOGGER.warning(msg)
        # Generator function: the message only travels on StopIteration.
        return msg

    if test_type == "line_breaker":
        LOGGER.info("Generating line breaker test")
        yield from self.generate_line_breaker_tests(tokenized_events)
        return

    for tokenized_event in tokenized_events:
        metadata = tokenized_event.metadata
        identifier_key = metadata.get("identifier")
        hosts = self.get_hosts(tokenized_event)

        # Key field tests need at least one key field on the event.
        if test_type == "key_fields" and tokenized_event.key_fields:
            event = SampleEvent.copy(tokenized_event)
            key_field_host = tokenized_event.key_fields.get("host")
            host_prefix = metadata.get("host_prefix")
            if key_field_host and host_prefix:
                event.key_fields["host"] = self.add_host_prefix(
                    host_prefix, key_field_host
                )
            LOGGER.debug(
                "Generating Key field test with the following params:\nevent={e}\nidentifier_key={k}\nhosts={h}".format(
                    e=event, k=identifier_key, h=hosts
                )
            )
            yield from self.generate_params(event, identifier_key, hosts)

        # Time tests only apply to events whose timestamp comes from the event.
        elif test_type == "_time" and metadata.get("timestamp_type") == "event":
            # Requirement-test samples with no time values are skipped.
            if (
                int(metadata.get("requirement_test_sample", 0)) > 0
                and tokenized_event.time_values == []
            ):
                continue
            LOGGER.debug(
                "Generating time field test with the following params:\ntokenized_event={e}\nidentifier_key={k}\nhosts={h}".format(
                    e=tokenized_event, k=identifier_key, h=hosts
                )
            )
            yield from self.generate_params(tokenized_event, identifier_key, hosts)

get_hosts(tokenized_event)

Returns value of host for event

Parameters:

Name Type Description Default
tokenized_event SampleEvent

Instance containing event info

required

Returns:

Type Description

Value of host for event

Source code in pytest_splunk_addon/index_tests/test_generator.py
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
def get_hosts(self, tokenized_event):
    """
    Returns value of host for event

    The host is read from the stanza metadata when ``host_type`` is
    "plugin" or unset, and from the event key_fields when it is "event".
    Any other ``host_type`` is logged as an error and yields no host.

    Args:
        tokenized_event (SampleEvent): Instance containing event info

    Returns:
        List of hosts (with ``host_prefix`` applied when configured),
        or None when no host could be determined
    """
    metadata = tokenized_event.metadata
    host_type = metadata.get("host_type")
    if host_type in ("plugin", None):
        hosts = metadata.get("host")
    elif host_type == "event":
        hosts = tokenized_event.key_fields.get("host")
    else:
        hosts = None
        LOGGER.error(
            "Invalid 'host_type' for stanza {}".format(tokenized_event.sample_name)
        )
    if isinstance(hosts, str):
        hosts = [hosts]
    host_prefix = metadata.get("host_prefix")
    # Only prefix when there is something to prefix; iterating None
    # would raise TypeError.
    if host_prefix and hosts is not None:
        hosts = self.add_host_prefix(str(host_prefix), hosts)
    LOGGER.info(
        "Returning host with value {} for stanza {}".format(
            hosts, tokenized_event.sample_name
        )
    )
    return hosts

get_source(sample_event)

Returns value of source for event

Parameters:

Name Type Description Default
sample_event SampleEvent

Instance containing event info

required

Returns:

Type Description

Value of source for event

Source code in pytest_splunk_addon/index_tests/test_generator.py
241
242
243
244
245
246
247
248
249
250
251
252
253
def get_source(self, sample_event):
    """
    Look up the source to search for an event.

    Args:
        sample_event (SampleEvent): Instance containing event info

    Returns:
        ``source_to_search`` from the metadata if present, otherwise
        ``source``, otherwise "*"
    """
    metadata = sample_event.metadata
    fallback = metadata.get("source", "*")
    return metadata.get("source_to_search", fallback)

get_sourcetype(sample_event)

Returns value of sourcetype for event

Parameters:

Name Type Description Default
sample_event SampleEvent

Instance containing event info

required

Returns:

Type Description

Value of sourcetype for event

Source code in pytest_splunk_addon/index_tests/test_generator.py
226
227
228
229
230
231
232
233
234
235
236
237
238
239
def get_sourcetype(self, sample_event):
    """
    Look up the sourcetype to search for an event.

    Args:
        sample_event (SampleEvent): Instance containing event info

    Returns:
        ``sourcetype_to_search`` from the metadata if present, otherwise
        ``sourcetype``, otherwise "*"
    """
    metadata = sample_event.metadata
    fallback = metadata.get("sourcetype", "*")
    return metadata.get("sourcetype_to_search", fallback)