Skip to content

EventIngestor

HEC Event Ingestor

HECEventIngestor

Bases: EventIngestor

Class to ingest event via HEC Event

The format for required_configs is::

{
    hec_uri: {splunk_hec_scheme}://{splunk_host}:{hec_port}/services/collector,
    session_headers(dict):
    {
        "Authorization": f"Splunk <hec-token>",
    }
}

Parameters:

Name Type Description Default
required_configs dict

Dictionary containing hec_uri and session headers

required
Source code in pytest_splunk_addon/event_ingestors/hec_event_ingestor.py
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
class HECEventIngestor(EventIngestor):
    """
    Class to ingest event via HEC Event

    The format for required_configs is::

        {
            hec_uri: {splunk_hec_scheme}://{splunk_host}:{hec_port}/services/collector,
            session_headers(dict):
            {
                "Authorization": f"Splunk <hec-token>",
            }
        }

    Args:
        required_configs (dict): Dictionary containing hec_uri and session headers

    """

    def __init__(self, required_configs):
        self.hec_uri = required_configs.get("splunk_hec_uri")
        self.session_headers = required_configs.get("session_headers")

    def ingest(self, events, thread_count):
        """
        Ingests event and metric data into splunk using HEC token via event endpoint.

        For batch ingestion of events in a single request at event endpoint provide a list of event dict to be ingested.

        The format of dictionary for ingesting a single event::

            {
                "sourcetype": "sample_HEC",
                "source": "sample_source",
                "host": "sample_host",
                "event": "event_str"
            }

        The format of dictionary for ingesting a batch of events::

            [
                {
                    "sourcetype": "sample_HEC",
                    "source": "sample_source",
                    "host": "sample_host",
                    "event": "event_str1"
                },
                {
                    "sourcetype": "sample_HEC",
                    "source": "sample_source",
                    "host": "sample_host",
                    "event": "event_str2"
                },
            ]

        Args:
            events (list): List of events (SampleEvent) to be ingested

        """
        data = list()
        for event in events:

            event_dict = {
                "sourcetype": event.metadata.get("sourcetype", "pytest_splunk_addon"),
                "source": event.metadata.get("source", "pytest_splunk_addon:hec:event"),
                "event": event.event,
                "index": event.metadata.get("index", "main"),
            }

            if event.metadata.get("host_type") in ("plugin", None):
                host = event.metadata.get("host")
            else:
                host = event.key_fields.get("host")[0]
            if host:
                event_dict["host"] = host

            if event.metadata.get("timestamp_type").lower() == "event":
                if event.time_values:
                    event_dict["time"] = event.time_values[0]
            data.append(event_dict)

        batch_event_list = []
        for i in range(0, len(data), 100):
            batch_event_list.append(data[i : i + 100])

        with concurrent.futures.ThreadPoolExecutor(max_workers=20) as executor:
            _ = list(executor.map(self.__ingest, batch_event_list))

    def __ingest(self, data):
        try:
            LOGGER.info(
                "Making a HEC event request with the following params:\nhec_uri:{}\nheaders:{}".format(
                    str(self.hec_uri), str(self.session_headers)
                )
            )
            LOGGER.debug(
                "Creating the following sample event to be ingested via HEC event endoipnt:{}".format(
                    str(data)
                )
            )
            response = requests.post(  # nosemgrep: splunk.disabled-cert-validation
                "{}/{}".format(self.hec_uri, "event"),
                auth=None,
                json=data,
                headers=self.session_headers,
                verify=False,
            )
            LOGGER.debug("Status code: {}".format(response.status_code))
            if response.status_code not in (200, 201):
                raise Exception(
                    "\nStatus code: {} \nReason: {} \ntext:{}".format(
                        response.status_code, response.reason, response.text
                    )
                )

        except Exception as e:
            LOGGER.error("\n\nAn error occurred while data ingestion.{}".format(e))
            raise type(e)("An error occurred while data ingestion.{}".format(e))

ingest(events, thread_count)

Ingests event and metric data into splunk using HEC token via event endpoint.

For batch ingestion of events in a single request at event endpoint provide a list of event dict to be ingested.

The format of dictionary for ingesting a single event::

{
    "sourcetype": "sample_HEC",
    "source": "sample_source",
    "host": "sample_host",
    "event": "event_str"
}

The format of dictionary for ingesting a batch of events::

[
    {
        "sourcetype": "sample_HEC",
        "source": "sample_source",
        "host": "sample_host",
        "event": "event_str1"
    },
    {
        "sourcetype": "sample_HEC",
        "source": "sample_source",
        "host": "sample_host",
        "event": "event_str2"
    },
]

Parameters:

Name Type Description Default
events list

List of events (SampleEvent) to be ingested

required
Source code in pytest_splunk_addon/event_ingestors/hec_event_ingestor.py
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
def ingest(self, events, thread_count):
    """
    Ingests event and metric data into splunk using HEC token via event endpoint.

    For batch ingestion of events in a single request at event endpoint provide a list of event dict to be ingested.

    The format of dictionary for ingesting a single event::

        {
            "sourcetype": "sample_HEC",
            "source": "sample_source",
            "host": "sample_host",
            "event": "event_str"
        }

    The format of dictionary for ingesting a batch of events::

        [
            {
                "sourcetype": "sample_HEC",
                "source": "sample_source",
                "host": "sample_host",
                "event": "event_str1"
            },
            {
                "sourcetype": "sample_HEC",
                "source": "sample_source",
                "host": "sample_host",
                "event": "event_str2"
            },
        ]

    Args:
        events (list): List of events (SampleEvent) to be ingested

    """
    data = list()
    for event in events:

        event_dict = {
            "sourcetype": event.metadata.get("sourcetype", "pytest_splunk_addon"),
            "source": event.metadata.get("source", "pytest_splunk_addon:hec:event"),
            "event": event.event,
            "index": event.metadata.get("index", "main"),
        }

        if event.metadata.get("host_type") in ("plugin", None):
            host = event.metadata.get("host")
        else:
            host = event.key_fields.get("host")[0]
        if host:
            event_dict["host"] = host

        if event.metadata.get("timestamp_type").lower() == "event":
            if event.time_values:
                event_dict["time"] = event.time_values[0]
        data.append(event_dict)

    batch_event_list = []
    for i in range(0, len(data), 100):
        batch_event_list.append(data[i : i + 100])

    with concurrent.futures.ThreadPoolExecutor(max_workers=20) as executor:
        _ = list(executor.map(self.__ingest, batch_event_list))

HEC Raw Ingestor

HECRawEventIngestor

Bases: EventIngestor

Class to ingest event via HEC Raw

The format for required_configs is::

{
    hec_uri: {splunk_hec_scheme}://{splunk_host}:{hec_port}/services/collector,
    session_headers(dict):
    {
        "Authorization": f"Splunk <hec-token>",
    }
}

Parameters:

Name Type Description Default
required_configs(dict)

Dictionary containing hec_uri and session headers

required
Source code in pytest_splunk_addon/event_ingestors/hec_raw_ingestor.py
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
class HECRawEventIngestor(EventIngestor):
    """
    Class to ingest event via HEC Raw

    The format for required_configs is::

        {
            hec_uri: {splunk_hec_scheme}://{splunk_host}:{hec_port}/services/collector,
            session_headers(dict):
            {
                "Authorization": f"Splunk <hec-token>",
            }
        }


    Args:
        required_configs(dict): Dictionary containing hec_uri and session headers
    """

    def __init__(self, required_configs):
        self.hec_uri = required_configs["splunk_hec_uri"]
        self.session_headers = required_configs["session_headers"]

    def ingest(self, events, thread_count):
        """
        Ingests data into splunk via raw endpoint.

        For batch ingestion of events in a single request at raw endpoint provide a string of events in data to be ingested.

        The format of event and params for ingesting a single event::

            '127.0.0.1 - admin [28/Sep/2016:09:05:26.875 -0700] "GET /servicesNS/admin/launcher/data/ui/views?count=-1 HTTP/1.0" 200 126721 - - - 6ms'

            {
                "sourcetype": "sample_HEC",
                "source": "sample_source",
                "host": "sample_host",
            }

        The format of event and params for ingesting a batch of events::

                '''127.0.0.1 - admin [28/Sep/2016:09:05:26.875 -0700] "GET /servicesNS/admin/launcher/data/ui/views?count=-1 HTTP/1.0" 200 126721 - - - 6ms
                127.0.0.1 - admin [28/Sep/2016:09:05:26.917 -0700] "GET /servicesNS/admin/launcher/data/ui/nav/default HTTP/1.0" 200 4367 - - - 6ms
                127.0.0.1 - admin [28/Sep/2016:09:05:26.941 -0700] "GET /services/apps/local?search=disabled%3Dfalse&count=-1 HTTP/1.0" 200 31930 - - - 4ms'''

            {
                "sourcetype": "sample_HEC",
                "source": "sample_source",
                "host": "sample_host",
            }

        Args:
            events (list): List of events (SampleEvent) to be ingested
            params (dict): dict with the info of the data to be ingested.

        """
        main_event = []
        param_list = []
        for event in events:
            event_dict = {
                "sourcetype": event.metadata.get("sourcetype", "pytest_splunk_addon"),
                "source": event.metadata.get("source", "pytest_splunk_addon:hec:raw"),
                "index": event.metadata.get("index", "main"),
            }

            if event.metadata.get("host"):
                event_dict["host"] = event.metadata.get("host")

            param_list.append(event_dict)

            main_event.append(event.event)
        with concurrent.futures.ThreadPoolExecutor(max_workers=20) as executor:
            _ = list(executor.map(self.__ingest, main_event, param_list))

    def __ingest(self, event, params):
        try:
            LOGGER.info(
                "Making a HEC raw endpoint request with the following params:\nhec_uri:{}\nheaders:{}".format(
                    str(self.hec_uri), str(self.session_headers)
                )
            )
            LOGGER.debug(
                "Creating the following sample event to be ingested via HEC RAW endpoint:\nEvents: {}\nParams:{}".format(
                    str(event), str(params)
                )
            )
            response = requests.post(  # nosemgrep: splunk.disabled-cert-validation
                "{}/{}".format(self.hec_uri, "raw"),
                auth=None,
                data=event,
                params=params,
                headers=self.session_headers,
                verify=False,
            )
            LOGGER.debug("Status code: {}".format(response.status_code))
            if response.status_code not in (200, 201):
                raise Exception(
                    "\nStatus code: {} \nReason: {} \ntext:{}".format(
                        response.status_code, response.reason, response.text
                    )
                )

        except Exception as e:
            LOGGER.error("\n\nAn error occurred while data ingestion.{}".format(e))
            raise type(e)("An error occurred while data ingestion.{}".format(e))

ingest(events, thread_count)

Ingests data into splunk via raw endpoint.

For batch ingestion of events in a single request at raw endpoint provide a string of events in data to be ingested.

The format of event and params for ingesting a single event::

'127.0.0.1 - admin [28/Sep/2016:09:05:26.875 -0700] "GET /servicesNS/admin/launcher/data/ui/views?count=-1 HTTP/1.0" 200 126721 - - - 6ms'

{
    "sourcetype": "sample_HEC",
    "source": "sample_source",
    "host": "sample_host",
}

The format of event and params for ingesting a batch of events::

    '''127.0.0.1 - admin [28/Sep/2016:09:05:26.875 -0700] "GET /servicesNS/admin/launcher/data/ui/views?count=-1 HTTP/1.0" 200 126721 - - - 6ms
    127.0.0.1 - admin [28/Sep/2016:09:05:26.917 -0700] "GET /servicesNS/admin/launcher/data/ui/nav/default HTTP/1.0" 200 4367 - - - 6ms
    127.0.0.1 - admin [28/Sep/2016:09:05:26.941 -0700] "GET /services/apps/local?search=disabled%3Dfalse&count=-1 HTTP/1.0" 200 31930 - - - 4ms'''

{
    "sourcetype": "sample_HEC",
    "source": "sample_source",
    "host": "sample_host",
}

Parameters:

Name Type Description Default
events list

List of events (SampleEvent) to be ingested

required
params dict

dict with the info of the data to be ingested.

required
Source code in pytest_splunk_addon/event_ingestors/hec_raw_ingestor.py
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
def ingest(self, events, thread_count):
    """
    Ingests data into splunk via raw endpoint.

    For batch ingestion of events in a single request at raw endpoint provide a string of events in data to be ingested.

    The format of event and params for ingesting a single event::

        '127.0.0.1 - admin [28/Sep/2016:09:05:26.875 -0700] "GET /servicesNS/admin/launcher/data/ui/views?count=-1 HTTP/1.0" 200 126721 - - - 6ms'

        {
            "sourcetype": "sample_HEC",
            "source": "sample_source",
            "host": "sample_host",
        }

    The format of event and params for ingesting a batch of events::

            '''127.0.0.1 - admin [28/Sep/2016:09:05:26.875 -0700] "GET /servicesNS/admin/launcher/data/ui/views?count=-1 HTTP/1.0" 200 126721 - - - 6ms
            127.0.0.1 - admin [28/Sep/2016:09:05:26.917 -0700] "GET /servicesNS/admin/launcher/data/ui/nav/default HTTP/1.0" 200 4367 - - - 6ms
            127.0.0.1 - admin [28/Sep/2016:09:05:26.941 -0700] "GET /services/apps/local?search=disabled%3Dfalse&count=-1 HTTP/1.0" 200 31930 - - - 4ms'''

        {
            "sourcetype": "sample_HEC",
            "source": "sample_source",
            "host": "sample_host",
        }

    Args:
        events (list): List of events (SampleEvent) to be ingested
        params (dict): dict with the info of the data to be ingested.

    """
    main_event = []
    param_list = []
    for event in events:
        event_dict = {
            "sourcetype": event.metadata.get("sourcetype", "pytest_splunk_addon"),
            "source": event.metadata.get("source", "pytest_splunk_addon:hec:raw"),
            "index": event.metadata.get("index", "main"),
        }

        if event.metadata.get("host"):
            event_dict["host"] = event.metadata.get("host")

        param_list.append(event_dict)

        main_event.append(event.event)
    with concurrent.futures.ThreadPoolExecutor(max_workers=20) as executor:
        _ = list(executor.map(self.__ingest, main_event, param_list))

SC4S Event Ingestor

SC4SEventIngestor

Bases: EventIngestor

Class to Ingest Events via SC4S

The format for required_configs is::

{
    sc4s_host (str): Address of the Splunk Server. Do not provide http scheme in the host.
    sc4s_port (int): Port number of the above host address
}

Parameters:

Name Type Description Default
required_configs dict

Dictionary containing splunk host and sc4s port

required
Source code in pytest_splunk_addon/event_ingestors/sc4s_event_ingestor.py
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
class SC4SEventIngestor(EventIngestor):
    """
    Class to Ingest Events via SC4S

    The format for required_configs is::

        {
            sc4s_host (str): Address of the Splunk Server. Do not provide http scheme in the host.
            sc4s_port (int): Port number of the above host address
        }

    Args:
        required_configs (dict): Dictionary containing splunk host and sc4s port
    """

    def __init__(self, required_configs):
        self.sc4s_host = required_configs["sc4s_host"]
        self.sc4s_port = required_configs["sc4s_port"]
        self.server_address = (
            required_configs["sc4s_host"],
            required_configs["sc4s_port"],
        )

    def ingest(self, events, thread_count):
        """
        Ingests events in the splunk via sc4s (Single/Batch of Events)

        Args:
            events (list): Events with newline character or LineBreaker as separator

        """

        # This loop just checks for a viable remote connection
        tried = 0
        sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        while True:
            try:
                sock.connect(self.server_address)
                break
            except Exception as e:
                tried += 1
                LOGGER.debug("Attempt {} to ingest data with SC4S".format(str(tried)))
                if tried > 90:
                    LOGGER.error(
                        "Failed to ingest event with SC4S {} times".format(str(tried))
                    )
                    raise e
                sleep(1)
            finally:
                sock.close()

        raw_events = list()
        sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        sock.connect(self.server_address)
        for event in events:
            # raw_events.extend()
            for se in event.event.splitlines():
                try:
                    sock.sendall(str.encode(se + "\n"))
                except Exception as e:
                    LOGGER.debug("Attempt ingest data with SC4S=".format(se))
                    LOGGER.exception(e)
                    sleep(1)
        sock.close()

ingest(events, thread_count)

Ingests events in the splunk via sc4s (Single/Batch of Events)

Parameters:

Name Type Description Default
events list

Events with newline character or LineBreaker as separator

required
Source code in pytest_splunk_addon/event_ingestors/sc4s_event_ingestor.py
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
def ingest(self, events, thread_count):
    """
    Ingests events in the splunk via sc4s (Single/Batch of Events)

    Args:
        events (list): Events with newline character or LineBreaker as separator

    """

    # This loop just checks for a viable remote connection
    tried = 0
    sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    while True:
        try:
            sock.connect(self.server_address)
            break
        except Exception as e:
            tried += 1
            LOGGER.debug("Attempt {} to ingest data with SC4S".format(str(tried)))
            if tried > 90:
                LOGGER.error(
                    "Failed to ingest event with SC4S {} times".format(str(tried))
                )
                raise e
            sleep(1)
        finally:
            sock.close()

    raw_events = list()
    sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    sock.connect(self.server_address)
    for event in events:
        # raw_events.extend()
        for se in event.event.splitlines():
            try:
                sock.sendall(str.encode(se + "\n"))
            except Exception as e:
                LOGGER.debug("Attempt ingest data with SC4S=".format(se))
                LOGGER.exception(e)
                sleep(1)
    sock.close()

File Monitor Ingestor

FileMonitorEventIngestor

Bases: EventIngestor

Class to ingest event via File monitor This ingestor will only work if splunk_type is docker and container of universal forwarder is linked with container of splunk instance as ‘splunk’ service.

The format for required_configs is::

{
    uf_host: Host of universal forwarder
    uf_port: Management port of universal forwarder
    uf_username: Name of user for universal forwarder
    uf_password: Password of universal forwarder
}

Parameters:

Name Type Description Default
required_configs dict

Dictionary containing information about universal forwarder

required
Source code in pytest_splunk_addon/event_ingestors/file_monitor_ingestor.py
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
class FileMonitorEventIngestor(EventIngestor):
    """
    Class to ingest event via File monitor
    This ingestor will only work if splunk_type is docker and container of universal forwarder is linked with container
    of splunk instance as 'splunk' service.

    The format for required_configs is::

        {
            uf_host: Host of universal forwarder
            uf_port: Management port of universal forwarder
            uf_username: Name of user for universal forwarder
            uf_password: Password of universal forwarder
        }

    Args:
        required_configs (dict): Dictionary containing information about universal forwarder

    """

    def __init__(self, required_configs):
        self.uf_host = required_configs.get("uf_host")
        self.uf_port = required_configs.get("uf_port")
        self.uf_username = required_configs.get("uf_username")
        self.uf_password = required_configs.get("uf_password")
        # Container of universal forwarder is linked with splunk instance.
        # So using splunk_host as splunk and port 9997 directly.
        self.splunk_host = "splunk"
        self.splunk_s2s_port = "9997"
        self.uf_rest_uri = "https://{}:{}".format(self.uf_host, self.uf_port)
        self.outputs_endpoint = "{}/services/data/outputs/tcp/group".format(
            self.uf_rest_uri
        )
        self.inputs_endpoint = "{}/servicesNS/nobody/search/data/inputs/monitor".format(
            self.uf_rest_uri
        )

    def ingest(self, events, thread_count):
        """
        Ingests data into splunk via file monitor.
        Args:
            events (list): List of events (SampleEvent) to be ingested
        """
        self.create_output_conf()
        for each_event in events:
            self.create_event_file(each_event)
            sleep(10)
            self.create_inputs_stanza(each_event)

    def create_output_conf(self):
        """
        Create stanza in outputs.conf file of universal forwarder to send on splunk(indexer).
        """
        tcp_out_dict = {
            "name": "uf_monitor",
            "servers": "{}:{}".format(self.splunk_host, self.splunk_s2s_port),
        }
        LOGGER.info(
            "Making rest call to create stanza in outputs.conf file with following endpoint : {}".format(
                self.outputs_endpoint
            )
        )
        LOGGER.debug(
            "Creating following stanza in output.conf : {}".format(tcp_out_dict)
        )
        try:
            response = requests.post(  # nosemgrep: splunk.disabled-cert-validation
                self.outputs_endpoint,
                tcp_out_dict,
                auth=(self.uf_username, self.uf_password),
                verify=False,
            )
            if response.status_code not in (200, 201):
                LOGGER.warning(
                    "Unable to create stanza in outputs.conf\nStatus code: {} \nReason: {} \ntext:{}".format(
                        response.status_code, response.reason, response.text
                    )
                )
        except ConnectionError as e:
            LOGGER.error("Unable to connect to Universal forwarder, {}".format(e))

    def create_event_file(self, event):
        """
        Write each tokenized event in files with host name as name of file. The host of all events will be unique.

        Args:
            event (SampleEvent): Instance containing event info
        """
        try:
            with open(self.get_file_path(event), "w+") as fp:
                LOGGER.info(
                    "Writing events file for host={}".format(event.metadata.get("host"))
                )
                fp.write(event.event)
                LOGGER.debug(
                    "Wrote tokenized events file on path : {}".format(
                        self.get_file_path(event)
                    )
                )
        except Exception as e:
            LOGGER.warning(
                "Unable to create event file for host : {}, Reason : {}".format(
                    event.metadata.get("host"), e
                )
            )

    def create_inputs_stanza(self, event):
        """
        Create stanza in inputs.conf on universal forwarder for each tokenized event.

        Args:
            event (SampleEvent): Instance containing event info
        """
        file_path = self.get_file_path(event)
        host_segment = len(file_path.split(os.sep)) - 2
        # If file path is /home/uf_files/host_name/sample_name
        # Then split will return ['', home, 'uf_files', 'host_name', 'sample_name']
        sourcetype = event.metadata.get("sourcetype")
        if not sourcetype:
            sourcetype = event.metadata.get(
                "sourcetype_to_search", "pytest_splunk_addon"
            )
        stanza = {
            "name": file_path,
            "sourcetype": sourcetype,
            "index": event.metadata.get("index", "main"),
            "disabled": False,
            "crc-salt": "<SOURCE>",
        }
        if event.metadata.get("host_type") in ("plugin"):
            stanza["host_segment"] = host_segment
        LOGGER.info(
            "Making rest call to create stanza in inputs.conf file with following endpoint : {}".format(
                self.inputs_endpoint
            )
        )
        LOGGER.debug("Creating following stanza in inputs.conf : {}".format(stanza))
        try:
            response = requests.post(  # nosemgrep: splunk.disabled-cert-validation
                self.inputs_endpoint,
                stanza,
                auth=(self.uf_username, self.uf_password),
                verify=False,
            )
            if response.status_code not in (200, 201):
                LOGGER.warning(
                    "Unable to add stanza in inputs.conf for Path : {} \nStatus code: {} \nReason: {} \ntext:{}".format(
                        file_path, response.status_code, response.reason, response.text
                    )
                )
        except ConnectionError as e:
            LOGGER.error("Unable to connect to Universal forwarder, {}".format(e))

    def get_file_path(self, event):
        """
        Returns absolute path for tokenized events.

        Args:
            event (SampleEvent): Instance containing event info
        """
        host_dir = os.path.join(os.getcwd(), MONITOR_DIR, event.metadata.get("host"))
        if not os.path.exists(host_dir):
            os.mkdir(host_dir)
        return os.path.join(host_dir, event.metadata.get("source"))

create_event_file(event)

Write each tokenized event in files with host name as name of file. The host of all events will be unique.

Parameters:

Name Type Description Default
event SampleEvent

Instance containing event info

required
Source code in pytest_splunk_addon/event_ingestors/file_monitor_ingestor.py
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
def create_event_file(self, event):
    """
    Write each tokenized event in files with host name as name of file. The host of all events will be unique.

    Args:
        event (SampleEvent): Instance containing event info
    """
    try:
        with open(self.get_file_path(event), "w+") as fp:
            LOGGER.info(
                "Writing events file for host={}".format(event.metadata.get("host"))
            )
            fp.write(event.event)
            LOGGER.debug(
                "Wrote tokenized events file on path : {}".format(
                    self.get_file_path(event)
                )
            )
    except Exception as e:
        LOGGER.warning(
            "Unable to create event file for host : {}, Reason : {}".format(
                event.metadata.get("host"), e
            )
        )

create_inputs_stanza(event)

Create stanza in inputs.conf on universal forwarder for each tokenized event.

Parameters:

Name Type Description Default
event SampleEvent

Instance containing event info

required
Source code in pytest_splunk_addon/event_ingestors/file_monitor_ingestor.py
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
def create_inputs_stanza(self, event):
    """
    Create stanza in inputs.conf on universal forwarder for each tokenized event.

    Args:
        event (SampleEvent): Instance containing event info
    """
    file_path = self.get_file_path(event)
    host_segment = len(file_path.split(os.sep)) - 2
    # If file path is /home/uf_files/host_name/sample_name
    # Then split will return ['', home, 'uf_files', 'host_name', 'sample_name']
    sourcetype = event.metadata.get("sourcetype")
    if not sourcetype:
        sourcetype = event.metadata.get(
            "sourcetype_to_search", "pytest_splunk_addon"
        )
    stanza = {
        "name": file_path,
        "sourcetype": sourcetype,
        "index": event.metadata.get("index", "main"),
        "disabled": False,
        "crc-salt": "<SOURCE>",
    }
    if event.metadata.get("host_type") in ("plugin"):
        stanza["host_segment"] = host_segment
    LOGGER.info(
        "Making rest call to create stanza in inputs.conf file with following endpoint : {}".format(
            self.inputs_endpoint
        )
    )
    LOGGER.debug("Creating following stanza in inputs.conf : {}".format(stanza))
    try:
        response = requests.post(  # nosemgrep: splunk.disabled-cert-validation
            self.inputs_endpoint,
            stanza,
            auth=(self.uf_username, self.uf_password),
            verify=False,
        )
        if response.status_code not in (200, 201):
            LOGGER.warning(
                "Unable to add stanza in inputs.conf for Path : {} \nStatus code: {} \nReason: {} \ntext:{}".format(
                    file_path, response.status_code, response.reason, response.text
                )
            )
    except ConnectionError as e:
        LOGGER.error("Unable to connect to Universal forwarder, {}".format(e))

create_output_conf()

Create stanza in outputs.conf file of universal forwarder to send on splunk(indexer).

Source code in pytest_splunk_addon/event_ingestors/file_monitor_ingestor.py
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
def create_output_conf(self):
    """
    Create stanza in outputs.conf file of universal forwarder to send on splunk(indexer).
    """
    tcp_out_dict = {
        "name": "uf_monitor",
        "servers": "{}:{}".format(self.splunk_host, self.splunk_s2s_port),
    }
    LOGGER.info(
        "Making rest call to create stanza in outputs.conf file with following endpoint : {}".format(
            self.outputs_endpoint
        )
    )
    LOGGER.debug(
        "Creating following stanza in output.conf : {}".format(tcp_out_dict)
    )
    try:
        response = requests.post(  # nosemgrep: splunk.disabled-cert-validation
            self.outputs_endpoint,
            tcp_out_dict,
            auth=(self.uf_username, self.uf_password),
            verify=False,
        )
        if response.status_code not in (200, 201):
            LOGGER.warning(
                "Unable to create stanza in outputs.conf\nStatus code: {} \nReason: {} \ntext:{}".format(
                    response.status_code, response.reason, response.text
                )
            )
    except ConnectionError as e:
        LOGGER.error("Unable to connect to Universal forwarder, {}".format(e))

get_file_path(event)

Returns absolute path for tokenized events.

Parameters:

Name Type Description Default
event SampleEvent

Instance containing event info

required
Source code in pytest_splunk_addon/event_ingestors/file_monitor_ingestor.py
180
181
182
183
184
185
186
187
188
189
190
def get_file_path(self, event):
    """
    Returns absolute path for tokenized events.

    Args:
        event (SampleEvent): Instance containing event info
    """
    host_dir = os.path.join(os.getcwd(), MONITOR_DIR, event.metadata.get("host"))
    if not os.path.exists(host_dir):
        os.mkdir(host_dir)
    return os.path.join(host_dir, event.metadata.get("source"))

ingest(events, thread_count)

Ingests data into splunk via file monitor. Args: events (list): List of events (SampleEvent) to be ingested

Source code in pytest_splunk_addon/event_ingestors/file_monitor_ingestor.py
64
65
66
67
68
69
70
71
72
73
74
def ingest(self, events, thread_count):
    """
    Ingests data into splunk via file monitor.
    Args:
        events (list): List of events (SampleEvent) to be ingested
    """
    self.create_output_conf()
    for each_event in events:
        self.create_event_file(each_event)
        sleep(10)
        self.create_inputs_stanza(each_event)