Skip to content

event_writer.py

This module provides two kinds of event writers (ClassicEventWriter, HECEventWriter) to write Splunk modular input events.

__all__ = ['ClassicEventWriter', 'HECEventWriter'] module-attribute

ClassicEventWriter

Bases: EventWriter

Classic event writer.

Use sys.stdout as the output.

Examples:

>>> from solnlib.modular_input import event_writer
>>> ew = event_writer.ClassicEventWriter()
>>> ew.write_events([event1, event2])
Source code in solnlib/modular_input/event_writer.py
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
class ClassicEventWriter(EventWriter):
    """Event writer that emits events on standard output.

    Events are serialized with ``XMLEvent.format_events`` and the result is
    written to ``sys.stdout`` while holding a lock.

    Examples:
        >>> from solnlib.modular_input import event_writer
        >>> ew = event_writer.ClassicEventWriter()
        >>> ew.write_events([event1, event2])
    """

    description = "ClassicEventWriter"

    def __init__(self, lock: Union[threading.Lock, multiprocessing.Lock] = None):
        """Initializes ClassicEventWriter.

        Arguments:
            lock: (optional) Lock held while writing to stdout. When it is
                None (the default) a new ``threading.Lock()`` is created.
                Pass a ``multiprocessing.Lock()`` instead if the lock has to
                be safe across multiple processes.
        """
        # Fall back to a thread-level lock when the caller supplies none.
        self._lock = threading.Lock() if lock is None else lock

    def create_event(
        self,
        data: dict,
        time: float = None,
        index: str = None,
        host: str = None,
        source: str = None,
        sourcetype: str = None,
        fields: dict = None,
        stanza: str = None,
        unbroken: bool = False,
        done: bool = False,
    ):
        """Create a new XMLEvent object.

        Every argument except ``fields`` is handed to ``XMLEvent``; ``fields``
        is accepted for signature compatibility with the base class but is
        not forwarded.

        Returns:
            The ``XMLEvent`` built from the given arguments.
        """

        xml_options = {
            "time": time,
            "index": index,
            "host": host,
            "source": source,
            "sourcetype": sourcetype,
            "stanza": stanza,
            "unbroken": unbroken,
            "done": done,
        }
        return XMLEvent(data, **xml_options)

    def write_events(self, events):
        """Write events to stdout.

        Does nothing when ``events`` is empty or None. Otherwise all events
        are formatted into a single string, which is written and flushed
        while the lock is held.

        Arguments:
            events: Events to write.
        """
        if events:
            out = sys.stdout
            # Format before taking the lock so the lock only covers the I/O.
            payload = "".join(XMLEvent.format_events(events))
            with self._lock:
                out.write(payload)
                out.flush()

description = 'ClassicEventWriter' class-attribute instance-attribute

__init__(lock=None)

Initializes ClassicEventWriter.

Parameters:

Name Type Description Default
lock Union[threading.Lock, multiprocessing.Lock]

(optional) Lock used to gain exclusive access to stdout. By default it is None, in which case a thread-safe lock (threading.Lock) is created. To make the lock safe across multiple processes, pass in multiprocessing.Lock() instead.

None
Source code in solnlib/modular_input/event_writer.py
123
124
125
126
127
128
129
130
131
132
133
134
135
def __init__(self, lock: Union[threading.Lock, multiprocessing.Lock] = None):
    """Initializes ClassicEventWriter.

    Arguments:
        lock: (optional) Lock held while writing to stdout. When it is
            None (the default) a new ``threading.Lock()`` is created.
            Pass a ``multiprocessing.Lock()`` instead if the lock has to
            be safe across multiple processes.
    """
    # Fall back to a thread-level lock when the caller supplies none.
    self._lock = threading.Lock() if lock is None else lock

create_event(data, time=None, index=None, host=None, source=None, sourcetype=None, fields=None, stanza=None, unbroken=False, done=False)

Create a new XMLEvent object.

Source code in solnlib/modular_input/event_writer.py
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
def create_event(
    self,
    data: dict,
    time: float = None,
    index: str = None,
    host: str = None,
    source: str = None,
    sourcetype: str = None,
    fields: dict = None,
    stanza: str = None,
    unbroken: bool = False,
    done: bool = False,
):
    """Create a new XMLEvent object.

    Every argument except ``fields`` is handed to ``XMLEvent``; ``fields``
    is accepted but is not forwarded.

    Returns:
        The ``XMLEvent`` built from the given arguments.
    """

    xml_options = {
        "time": time,
        "index": index,
        "host": host,
        "source": source,
        "sourcetype": sourcetype,
        "stanza": stanza,
        "unbroken": unbroken,
        "done": done,
    }
    return XMLEvent(data, **xml_options)

write_events(events)

Source code in solnlib/modular_input/event_writer.py
164
165
166
167
168
169
170
171
172
173
def write_events(self, events):
    """Write events to stdout.

    Does nothing when ``events`` is empty or None. Otherwise all events are
    formatted into a single string, which is written and flushed while the
    lock is held.

    Arguments:
        events: Events to write.
    """
    if events:
        out = sys.stdout
        # Format before taking the lock so the lock only covers the I/O.
        payload = "".join(XMLEvent.format_events(events))
        with self._lock:
            out.write(payload)
            out.flush()

EventWriter

Base class of event writer.

Source code in solnlib/modular_input/event_writer.py
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
class EventWriter(metaclass=ABCMeta):
    """Abstract interface shared by all event writers.

    Concrete writers must implement both ``create_event`` and
    ``write_events``.
    """

    description = "EventWriter"

    @abstractmethod
    def create_event(
        self,
        data: dict,
        time: float = None,
        index: str = None,
        host: str = None,
        source: str = None,
        sourcetype: str = None,
        fields: dict = None,
        stanza: str = None,
        unbroken: bool = False,
        done: bool = False,
    ) -> Union[XMLEvent, HECEvent]:
        """Build a new event object from the given attributes.

        Arguments:
            data: Event data.
            time: (optional) Event timestamp, default is None.
            index: (optional) The index the event will be written to, default is None.
            host: (optional) Event host, default is None.
            source: (optional) Event source, default is None.
            sourcetype: (optional) Event sourcetype, default is None.
            fields: (optional) Event fields, default is None.
            stanza: (optional) Event stanza name, default is None.
            unbroken: (optional) Event unbroken flag, default is False.
                It is only meaningful for XMLEvent when using ClassicEventWriter.
            done: (optional) Marks the last unbroken event, default is False.
                It is only meaningful for XMLEvent when using ClassicEventWriter.

        Examples:
           >>> ew = event_writer.HECEventWriter(...)
           >>> event = ew.create_event(
           >>>     data='This is a test data.',
           >>>     time='%.3f' % 1372274622.493,
           >>>     index='main',
           >>>     host='localhost',
           >>>     source='Splunk',
           >>>     sourcetype='misc',
           >>>     fields={'accountid': '603514901691', 'Cloud': u'AWS'},
           >>>     stanza='test_scheme://test',
           >>>     unbroken=True,
           >>>     done=True)
        """

    @abstractmethod
    def write_events(self, events: List):
        """Write the given events to the writer's output.

        Arguments:
            events: List of events to write.

        Examples:
           >>> from solnlib.modular_input import event_writer
           >>> ew = event_writer.ClassicEventWriter()
           >>> ew.write_events([event1, event2])
        """

description = 'EventWriter' class-attribute instance-attribute

create_event(data, time=None, index=None, host=None, source=None, sourcetype=None, fields=None, stanza=None, unbroken=False, done=False) abstractmethod

Create a new event.

Parameters:

Name Type Description Default
data dict

Event data.

required
time float

(optional) Event timestamp, default is None.

None
index str

(optional) The index event will be written to, default is None.

None
host str

(optional) Event host, default is None.

None
source str

(optional) Event source, default is None.

None
sourcetype str

(optional) Event sourcetype, default is None.

None
fields dict

(optional) Event fields, default is None.

None
stanza str

(optional) Event stanza name, default is None.

None
unbroken bool

(optional) Event unbroken flag, default is False. It is only meaningful for XMLEvent when using ClassicEventWriter.

False
done bool

(optional) The last unbroken event, default is False. It is only meaningful for XMLEvent when using ClassicEventWriter.

False

Examples:

>>> ew = event_writer.HECEventWriter(...)
>>> event = ew.create_event(
>>>     data='This is a test data.',
>>>     time='%.3f' % 1372274622.493,
>>>     index='main',
>>>     host='localhost',
>>>     source='Splunk',
>>>     sourcetype='misc',
>>>     fields={'accountid': '603514901691', 'Cloud': u'AWS'},
>>>     stanza='test_scheme://test',
>>>     unbroken=True,
>>>     done=True)
Source code in solnlib/modular_input/event_writer.py
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
@abstractmethod
def create_event(
    self,
    data: dict,
    time: float = None,
    index: str = None,
    host: str = None,
    source: str = None,
    sourcetype: str = None,
    fields: dict = None,
    stanza: str = None,
    unbroken: bool = False,
    done: bool = False,
) -> Union[XMLEvent, HECEvent]:
    """Build a new event object from the given attributes.

    Arguments:
        data: Event data.
        time: (optional) Event timestamp, default is None.
        index: (optional) The index the event will be written to, default is None.
        host: (optional) Event host, default is None.
        source: (optional) Event source, default is None.
        sourcetype: (optional) Event sourcetype, default is None.
        fields: (optional) Event fields, default is None.
        stanza: (optional) Event stanza name, default is None.
        unbroken: (optional) Event unbroken flag, default is False.
            It is only meaningful for XMLEvent when using ClassicEventWriter.
        done: (optional) Marks the last unbroken event, default is False.
            It is only meaningful for XMLEvent when using ClassicEventWriter.

    Examples:
       >>> ew = event_writer.HECEventWriter(...)
       >>> event = ew.create_event(
       >>>     data='This is a test data.',
       >>>     time='%.3f' % 1372274622.493,
       >>>     index='main',
       >>>     host='localhost',
       >>>     source='Splunk',
       >>>     sourcetype='misc',
       >>>     fields={'accountid': '603514901691', 'Cloud': u'AWS'},
       >>>     stanza='test_scheme://test',
       >>>     unbroken=True,
       >>>     done=True)
    """

write_events(events) abstractmethod

Write events.

Parameters:

Name Type Description Default
events List

List of events to write.

required

Examples:

>>> from solnlib.modular_input import event_writer
>>> ew = event_writer.EventWriter(...)
>>> ew.write_events([event1, event2])
Source code in solnlib/modular_input/event_writer.py
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
@abstractmethod
def write_events(self, events: List):
    """Write the given events to the writer's output.

    Arguments:
        events: List of events to write.

    Examples:
       >>> from solnlib.modular_input import event_writer
       >>> ew = event_writer.ClassicEventWriter()
       >>> ew.write_events([event1, event2])
    """

HECEventWriter

Bases: EventWriter

HEC event writer.

Use Splunk HEC as the output.

Examples:

>>> from solnlib.modular_input import event_writer
>>> ew = event_writer.HECEventWriter(hec_input_name, session_key)
>>> ew.write_events([event1, event2])
Source code in solnlib/modular_input/event_writer.py
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
class HECEventWriter(EventWriter):
    """HEC event writer.

    Use Splunk HEC as the output.

    Examples:
        >>> from solnlib.modular_input import event_writer
        >>> ew = event_writer.HECEventWriter(hec_input_name, session_key)
        >>> ew.write_events([event1, event2])
    """

    # Default number of attempts made per formatted payload in write_events.
    WRITE_EVENT_RETRIES = 5
    HTTP_INPUT_CONFIG_ENDPOINT = "/servicesNS/nobody/splunk_httpinput/data/inputs/http"
    # Endpoint that write_events posts to.
    HTTP_EVENT_COLLECTOR_ENDPOINT = "/services/collector"
    TOO_MANY_REQUESTS = 429  # we exceeded rate limit
    SERVICE_UNAVAILABLE = 503  # remote service is temporary unavailable

    description = "HECEventWriter"

    # Headers sent with every HEC post.
    headers = [("Content-Type", "application/json")]

    def __init__(
        self,
        hec_input_name: str,
        session_key: str,
        scheme: str = None,
        host: str = None,
        port: int = None,
        hec_uri: str = None,
        hec_token: str = None,
        global_settings_schema: bool = True,
        logger: logging.Logger = None,
        **context: dict
    ):
        """Initializes HECEventWriter.

        Arguments:
            hec_input_name: Splunk HEC input name.
            session_key: Splunk access token.
            scheme: (optional) The access scheme, default is None.
            host: (optional) The host name, default is None.
            port: (optional) The port number, default is None.
            hec_uri: (optional) If hec_uri and hec_token are both provided,
                they take precedence over hec_input_name.
            hec_token: (optional) HEC token.
            global_settings_schema: (optional) if True, scheme will be set
                based on HEC global settings, default True.
            logger: (optional) Logger object. When omitted, the ``logging``
                module itself is used.
            context: Other configurations for Splunk rest client.
        """
        super().__init__()
        self._session_key = session_key
        if logger:
            self.logger = logger
        else:
            self.logger = logging

        if hec_uri and hec_token:
            # Explicit HEC endpoint: no need to query splunkd for the input.
            scheme, host, hec_port = utils.extract_http_scheme_host_port(hec_uri)
        else:
            if not all([scheme, host, port]):
                scheme, host, port = get_splunkd_access_info()
            hec_port, hec_token = self._get_hec_config(
                hec_input_name, session_key, scheme, host, port, **context
            )

        if global_settings_schema:
            scheme = get_scheme_from_hec_settings()

        # Default connection-pool sizing unless the caller supplied a truthy value.
        if not context.get("pool_connections"):
            context["pool_connections"] = 10

        if not context.get("pool_maxsize"):
            context["pool_maxsize"] = 10

        self._rest_client = rest_client.SplunkRestClient(
            hec_token, app="-", scheme=scheme, host=host, port=hec_port, **context
        )

    @staticmethod
    def create_from_token(
        hec_uri: str,
        hec_token: str,
        global_settings_schema: bool = False,
        **context: dict
    ) -> "HECEventWriter":
        """Given HEC URI and HEC token, create HECEventWriter object. This
        function simplifies the standalone mode HECEventWriter usage (not in a
        modinput).

        Arguments:
            hec_uri: HTTP Event Collector URI, like https://localhost:8088.
            hec_token: HTTP Event Collector token.
            global_settings_schema: (optional) if True, scheme will be set based on HEC global settings, default False.
            context: Other configurations.

        Returns:
            Created HECEventWriter.
        """

        # hec_input_name, session_key, scheme, host and port are not needed
        # when the HEC URI and token are given explicitly.
        return HECEventWriter(
            None,
            None,
            None,
            None,
            None,
            hec_uri=hec_uri,
            hec_token=hec_token,
            global_settings_schema=global_settings_schema,
            **context
        )

    @staticmethod
    def create_from_input(
        hec_input_name: str,
        splunkd_uri: str,
        session_key: str,
        global_settings_schema: bool = False,
        **context: dict
    ) -> "HECEventWriter":
        """Given HEC input stanza name, splunkd URI and splunkd session key,
        create HECEventWriter object. HEC URI and token etc will be discovered
        from HEC input stanza. When hitting HEC event limit, the underlying
        code will increase the HEC event limit automatically by calling
        corresponding REST API against splunkd_uri by using session_key.

        Arguments:
            hec_input_name: Splunk HEC input name.
            splunkd_uri: Splunkd URI, like https://localhost:8089
            session_key: Splunkd access token.
            global_settings_schema: (optional) if True, scheme will be set based on HEC global settings, default False.
            context: Other configurations.

        Returns:
            Created HECEventWriter.
        """

        scheme, host, port = utils.extract_http_scheme_host_port(splunkd_uri)
        return HECEventWriter(
            hec_input_name,
            session_key,
            scheme,
            host,
            port,
            global_settings_schema=global_settings_schema,
            **context
        )

    @staticmethod
    def create_from_token_with_session_key(
        splunkd_uri: str,
        session_key: str,
        hec_uri: str,
        hec_token: str,
        global_settings_schema: bool = False,
        **context: dict
    ) -> "HECEventWriter":
        """Given Splunkd URI, Splunkd session key, HEC URI and HEC token,
        create HECEventWriter object. When hitting HEC event limit, the event
        writer will increase the HEC event limit automatically by calling
        corresponding REST API against splunkd_uri by using session_key.

        Arguments:
            splunkd_uri: Splunkd URI, like https://localhost:8089.
            session_key: Splunkd access token.
            hec_uri: Http Event Collector URI, like https://localhost:8088.
            hec_token: Http Event Collector token.
            global_settings_schema: (optional) if True, scheme will be set based on HEC global settings, default False.
            context: Other configurations.

        Returns:
            Created HECEventWriter.
        """

        scheme, host, port = utils.extract_http_scheme_host_port(splunkd_uri)
        return HECEventWriter(
            None,
            session_key,
            scheme,
            host,
            port,
            hec_uri=hec_uri,
            hec_token=hec_token,
            global_settings_schema=global_settings_schema,
            **context
        )

    @retry(exceptions=[binding.HTTPError])
    def _get_hec_config(
        self, hec_input_name, session_key, scheme, host, port, **context
    ):
        """Look up (and if needed enable/create) the HEC input configuration.

        Enables HEC when its settings report it as disabled, creates the
        named HEC input when it does not exist, and updates
        ``HECEvent.max_hec_event_length`` from the ``max_content_length``
        limit (falling back to 1000000).

        Arguments:
            hec_input_name: Splunk HEC input name.
            session_key: Splunk access token.
            scheme: The access scheme.
            host: The host name.
            port: The port number.
            context: Other configurations; the keys ``hec_enablessl``,
                ``hec_port``, ``index``, ``sourcetype``, ``token``,
                ``source`` and ``host`` are read here.

        Returns:
            Tuple of (HEC port from the settings, token of the HEC input).
        """
        hc = HECConfig(session_key, scheme=scheme, host=host, port=port, **context)
        settings = hc.get_settings()
        if utils.is_true(settings.get("disabled")):
            # Enable HEC input
            self.logger.info("Enabling HEC")
            settings["disabled"] = "0"
            settings["enableSSL"] = context.get("hec_enablessl", "1")
            settings["port"] = context.get("hec_port", "8088")
            hc.update_settings(settings)

        hec_input = hc.get_input(hec_input_name)
        if not hec_input:
            # Create HEC input
            self.logger.info("Create HEC datainput, name=%s", hec_input_name)
            hinput = {
                "index": context.get("index", "main"),
            }

            # Optional input attributes are only sent when the caller set them.
            if context.get("sourcetype"):
                hinput["sourcetype"] = context["sourcetype"]

            if context.get("token"):
                hinput["token"] = context["token"]

            if context.get("source"):
                hinput["source"] = context["source"]

            if context.get("host"):
                hinput["host"] = context["host"]

            hec_input = hc.create_input(hec_input_name, hinput)

        limits = hc.get_limits()
        HECEvent.max_hec_event_length = int(limits.get("max_content_length", 1000000))

        return settings["port"], hec_input["token"]

    def create_event(
        self,
        data: dict,
        time: float = None,
        index: str = None,
        host: str = None,
        source: str = None,
        sourcetype: str = None,
        fields: dict = None,
        stanza: str = None,
        unbroken: bool = False,
        done: bool = False,
    ) -> HECEvent:
        """Create a new HECEvent object.

        ``stanza``, ``unbroken`` and ``done`` are accepted for signature
        compatibility with the base class but are not forwarded to
        ``HECEvent``.

        Arguments:
            data: Event data.
            time: (optional) Event timestamp, default is None.
            index: (optional) The index event will be written to, default is None.
            host: (optional) Event host, default is None.
            source: (optional) Event source, default is None.
            sourcetype: (optional) Event sourcetype, default is None.
            fields: (optional) Event fields, default is None.
            stanza: (optional) Event stanza name, default is None.
            unbroken: (optional) Event unbroken flag, default is False.
                It is only meaningful for XMLEvent when using ClassicEventWriter.
            done: (optional) The last unbroken event, default is False.
                It is only meaningful for XMLEvent when using ClassicEventWriter.

        Returns:
            Created HECEvent.
        """

        return HECEvent(
            data,
            time=time,
            index=index,
            host=host,
            source=source,
            sourcetype=sourcetype,
            fields=fields,
        )

    def write_events(
        self,
        events: List,
        retries: int = WRITE_EVENT_RETRIES,
        event_field: str = "event",
    ):
        """Write events to index in bulk.

        Each payload produced by ``HECEvent.format_events`` is posted to the
        HEC endpoint. A post that fails with status 429 or 503 is retried
        with a capped exponential backoff plus random jitter; any other HTTP
        error is re-raised immediately.

        Arguments:
            events: List of events.
            retries: Number of attempts made for each payload. Values below 1
                are treated as 1 so that every payload is tried at least once.
            event_field: Event field.

        Raises:
            binding.HTTPError: If a post fails with a non-retryable status, or
                still fails after all attempts have been used.
        """
        if not events:
            return

        # Guarantee one attempt; with zero attempts there would be no
        # exception to report or re-raise below.
        attempts = max(retries, 1)
        retryable_statuses = (self.TOO_MANY_REQUESTS, self.SERVICE_UNAVAILABLE)

        for event in HECEvent.format_events(events, event_field):
            last_ex = None
            for i in range(attempts):
                try:
                    self._rest_client.post(
                        self.HTTP_EVENT_COLLECTOR_ENDPOINT,
                        body=event.encode("utf-8"),
                        headers=self.headers,
                    )
                except binding.HTTPError as e:
                    self.logger.warning(
                        "Write events through HEC failed. Status=%s", e.status
                    )
                    if e.status not in retryable_statuses:
                        raise
                    last_ex = e
                    # No point sleeping after the final attempt.
                    if i < attempts - 1:
                        # wait time for n retries: 10, 20, 40, 80, 80, 80, 80, ....
                        sleep_time = min(((2 ** (i + 1)) * 5), 80)
                        random_millisecond = randint(0, 1000) / 1000.0
                        time.sleep(sleep_time + random_millisecond)
                else:
                    break
            else:
                # When failed after retry, we reraise the exception
                # to exit the function to let client handle this situation.
                # The traceback is built from the saved exception because no
                # exception is being handled at this point.
                self.logger.error(
                    "Write events through HEC failed: %s. status=%s",
                    "".join(
                        traceback.format_exception(
                            type(last_ex), last_ex, last_ex.__traceback__
                        )
                    ),
                    last_ex.status,
                )
                raise last_ex

HTTP_EVENT_COLLECTOR_ENDPOINT = '/services/collector' class-attribute instance-attribute

HTTP_INPUT_CONFIG_ENDPOINT = '/servicesNS/nobody/splunk_httpinput/data/inputs/http' class-attribute instance-attribute

SERVICE_UNAVAILABLE = 503 class-attribute instance-attribute

TOO_MANY_REQUESTS = 429 class-attribute instance-attribute

WRITE_EVENT_RETRIES = 5 class-attribute instance-attribute

description = 'HECEventWriter' class-attribute instance-attribute

headers = [('Content-Type', 'application/json')] class-attribute instance-attribute

logger = logger instance-attribute

__init__(hec_input_name, session_key, scheme=None, host=None, port=None, hec_uri=None, hec_token=None, global_settings_schema=True, logger=None, **context)

Initializes HECEventWriter.

Parameters:

Name Type Description Default
hec_input_name str

Splunk HEC input name.

required
session_key str

Splunk access token.

required
scheme str

(optional) The access scheme, default is None.

None
host str

(optional) The host name, default is None.

None
port int

(optional) The port number, default is None.

None
hec_uri str

(optional) If hec_uri and hec_token are both provided, they take precedence over hec_input_name.

None
hec_token str

(optional) HEC token.

None
global_settings_schema bool

(optional) if True, scheme will be set based on HEC global settings, default True.

True
logger logging.Logger

Logger object.

None
context dict

Other configurations for Splunk rest client.

{}
Source code in solnlib/modular_input/event_writer.py
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
def __init__(
    self,
    hec_input_name: str,
    session_key: str,
    scheme: str = None,
    host: str = None,
    port: int = None,
    hec_uri: str = None,
    hec_token: str = None,
    global_settings_schema: bool = True,
    logger: logging.Logger = None,
    **context: dict
):
    """Initializes HECEventWriter.

    Arguments:
        hec_input_name: Splunk HEC input name.
        session_key: Splunk access token.
        scheme: (optional) The access scheme, default is None.
        host: (optional) The host name, default is None.
        port: (optional) The port number, default is None.
        hec_uri: (optional) If hec_uri and hec_token are both provided,
            they take precedence over hec_input_name.
        hec_token: (optional) HEC token.
        global_settings_schema: (optional) if True, scheme will be set
            based on HEC global settings, default True.
        logger: (optional) Logger object. When omitted, the ``logging``
            module itself is used.
        context: Other configurations for Splunk rest client.
    """
    super().__init__()
    self._session_key = session_key
    self.logger = logger if logger else logging

    if not (hec_uri and hec_token):
        # No explicit HEC endpoint: resolve splunkd access info if it is
        # incomplete, then read port and token from the HEC input config.
        if not (scheme and host and port):
            scheme, host, port = get_splunkd_access_info()
        hec_port, hec_token = self._get_hec_config(
            hec_input_name, session_key, scheme, host, port, **context
        )
    else:
        scheme, host, hec_port = utils.extract_http_scheme_host_port(hec_uri)

    if global_settings_schema:
        scheme = get_scheme_from_hec_settings()

    # Default connection-pool sizing unless the caller supplied a truthy value.
    for pool_option in ("pool_connections", "pool_maxsize"):
        if not context.get(pool_option):
            context[pool_option] = 10

    self._rest_client = rest_client.SplunkRestClient(
        hec_token, app="-", scheme=scheme, host=host, port=hec_port, **context
    )

create_event(data, time=None, index=None, host=None, source=None, sourcetype=None, fields=None, stanza=None, unbroken=False, done=False)

Create a new HECEvent object.

Parameters:

Name Type Description Default
data dict

Event data.

required
time float

(optional) Event timestamp, default is None.

None
index str

(optional) The index event will be written to, default is None.

None
host str

(optional) Event host, default is None.

None
source str

(optional) Event source, default is None.

None
sourcetype str

(optional) Event sourcetype, default is None.

None
fields dict

(optional) Event fields, default is None.

None
stanza str

(optional) Event stanza name, default is None.

None
unbroken bool

(optional) Event unbroken flag, default is False. It is only meaningful for XMLEvent when using ClassicEventWriter.

False
done bool

(optional) The last unbroken event, default is False. It is only meaningful for XMLEvent when using ClassicEventWriter.

False

Returns:

Type Description
HECEvent

Created HECEvent.

Source code in solnlib/modular_input/event_writer.py
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
def create_event(
    self,
    data: dict,
    time: float = None,
    index: str = None,
    host: str = None,
    source: str = None,
    sourcetype: str = None,
    fields: dict = None,
    stanza: str = None,
    unbroken: bool = False,
    done: bool = False,
) -> HECEvent:
    """Create a new HECEvent object.

    ``stanza``, ``unbroken`` and ``done`` are accepted but are not forwarded
    to ``HECEvent``.

    Arguments:
        data: Event data.
        time: (optional) Event timestamp, default is None.
        index: (optional) The index the event will be written to, default is None.
        host: (optional) Event host, default is None.
        source: (optional) Event source, default is None.
        sourcetype: (optional) Event sourcetype, default is None.
        fields: (optional) Event fields, default is None.
        stanza: (optional) Event stanza name, default is None.
        unbroken: (optional) Event unbroken flag, default is False.
            It is only meaningful for XMLEvent when using ClassicEventWriter.
        done: (optional) Marks the last unbroken event, default is False.
            It is only meaningful for XMLEvent when using ClassicEventWriter.

    Returns:
        Created HECEvent.
    """

    hec_options = {
        "time": time,
        "index": index,
        "host": host,
        "source": source,
        "sourcetype": sourcetype,
        "fields": fields,
    }
    return HECEvent(data, **hec_options)

create_from_input(hec_input_name, splunkd_uri, session_key, global_settings_schema=False, **context) staticmethod

Given HEC input stanza name, splunkd URI and splunkd session key, create HECEventWriter object. HEC URI and token etc will be discovered from HEC input stanza. When hitting HEC event limit, the underlying code will increase the HEC event limit automatically by calling corresponding REST API against splunkd_uri by using session_key.

Parameters:

Name Type Description Default
hec_input_name str

Splunk HEC input name.

required
splunkd_uri str

Splunkd URI, like https://localhost:8089

required
session_key str

Splunkd access token.

required
global_settings_schema bool

(optional) if True, scheme will be set based on HEC global settings, default False.

False
context dict

Other configurations.

{}

Returns:

Type Description
HECEventWriter

Created HECEventWriter.

Source code in solnlib/modular_input/event_writer.py
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
@staticmethod
def create_from_input(
    hec_input_name: str,
    splunkd_uri: str,
    session_key: str,
    global_settings_schema: bool = False,
    **context: dict
) -> "HECEventWriter":
    """Create a HECEventWriter from a HEC input stanza name, a splunkd URI
    and a splunkd session key.

    The splunkd URI is split into scheme, host and port, which are passed to
    the HECEventWriter constructor together with the input name and session
    key; no HEC URI or token is supplied here.

    NOTE(review): earlier documentation states that the HEC event limit is
    raised automatically through splunkd when it is hit; that logic is not
    visible in this function - confirm where it lives.

    Arguments:
        hec_input_name: Splunk HEC input name.
        splunkd_uri: Splunkd URI, like https://localhost:8089
        session_key: Splunkd access token.
        global_settings_schema: (optional) if True, scheme will be set based on HEC global settings, default False.
        context: Other configurations.

    Returns:
        Created HECEventWriter.
    """

    splunkd_scheme, splunkd_host, splunkd_port = utils.extract_http_scheme_host_port(
        splunkd_uri
    )
    writer = HECEventWriter(
        hec_input_name,
        session_key,
        splunkd_scheme,
        splunkd_host,
        splunkd_port,
        global_settings_schema=global_settings_schema,
        **context
    )
    return writer

create_from_token(hec_uri, hec_token, global_settings_schema=False, **context) staticmethod

Given HEC URI and HEC token, create HECEventWriter object. This function simplifies the standalone mode HECEventWriter usage (not in a modinput).

Parameters:

Name Type Description Default
hec_uri str

HTTP Event Collector URI, like https://localhost:8088.

required
hec_token str

HTTP Event Collector token.

required
global_settings_schema bool

(optional) if True, scheme will be set based on HEC global settings, default False.

False
context dict

Other configurations.

{}

Returns:

Type Description
HECEventWriter

Created HECEventWriter.

Source code in solnlib/modular_input/event_writer.py
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
@staticmethod
def create_from_token(
    hec_uri: str,
    hec_token: str,
    global_settings_schema: bool = False,
    **context: dict
) -> "HECEventWriter":
    """Build a HECEventWriter directly from a HEC URI and HEC token.

    This is a convenience constructor for standalone usage of
    HECEventWriter (not in a modinput).

    Arguments:
        hec_uri: HTTP Event Collector URI, like https://localhost:8088.
        hec_token: HTTP Event Collector token.
        global_settings_schema: (optional) if True, scheme will be set based
            on HEC global settings, default False.
        context: Other configurations, forwarded to HECEventWriter as
            keyword arguments.

    Returns:
        Created HECEventWriter.
    """

    # The five leading positional arguments of HECEventWriter are all left
    # as None here; only the HEC URI and token are supplied.
    unset_positionals = (None,) * 5
    return HECEventWriter(
        *unset_positionals,
        hec_uri=hec_uri,
        hec_token=hec_token,
        global_settings_schema=global_settings_schema,
        **context
    )

create_from_token_with_session_key(splunkd_uri, session_key, hec_uri, hec_token, global_settings_schema=False, **context) staticmethod

Given a Splunkd URI, a Splunkd session key, a HEC URI and a HEC token, create a HECEventWriter object. When the HEC event limit is hit, the event writer automatically increases the limit by calling the corresponding REST API against splunkd_uri using session_key.

Parameters:

Name Type Description Default
splunkd_uri str

Splunkd URI, like https://localhost:8089.

required
session_key str

Splunkd access token.

required
hec_uri str

HTTP Event Collector URI, like https://localhost:8088.

required
hec_token str

HTTP Event Collector token.

required
global_settings_schema bool

(optional) if True, scheme will be set based on HEC global settings, default False.

False
context dict

Other configurations.

{}

Returns:

Type Description
HECEventWriter

Created HECEventWriter.

Source code in solnlib/modular_input/event_writer.py
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
@staticmethod
def create_from_token_with_session_key(
    splunkd_uri: str,
    session_key: str,
    hec_uri: str,
    hec_token: str,
    global_settings_schema: bool = False,
    **context: dict
) -> "HECEventWriter":
    """Build a HECEventWriter from explicit HEC and splunkd credentials.

    Takes a Splunkd URI, Splunkd session key, HEC URI and HEC token. When
    the HEC event limit is hit, the event writer increases the limit
    automatically by calling the corresponding REST API against splunkd_uri
    using session_key.

    Arguments:
        splunkd_uri: Splunkd URI, like https://localhost:8089.
        session_key: Splunkd access token.
        hec_uri: HTTP Event Collector URI, like https://localhost:8088.
        hec_token: HTTP Event Collector token.
        global_settings_schema: (optional) if True, scheme will be set based
            on HEC global settings, default False.
        context: Other configurations, forwarded to HECEventWriter as
            keyword arguments.

    Returns:
        Created HECEventWriter.
    """

    # Break the management URI apart; the writer takes the pieces separately.
    splunkd_scheme, splunkd_host, splunkd_port = (
        utils.extract_http_scheme_host_port(splunkd_uri)
    )
    # The first positional argument is left as None; the HEC URI and token
    # are passed explicitly instead.
    writer = HECEventWriter(
        None,
        session_key,
        splunkd_scheme,
        splunkd_host,
        splunkd_port,
        hec_uri=hec_uri,
        hec_token=hec_token,
        global_settings_schema=global_settings_schema,
        **context
    )
    return writer

write_events(events, retries=WRITE_EVENT_RETRIES, event_field='event')

Write events to index in bulk.

Parameters:

Name Type Description Default
events List

List of events.

required
retries int

Number of retries for writing events to index.

WRITE_EVENT_RETRIES
event_field str

Event field.

'event'
Source code in solnlib/modular_input/event_writer.py
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
def write_events(
    self,
    events: List,
    retries: int = WRITE_EVENT_RETRIES,
    event_field: str = "event",
):
    """Write events to index in bulk.

    Every payload produced by ``HECEvent.format_events`` is POSTed to the
    HTTP Event Collector endpoint. A payload is retried, with capped
    exponential backoff plus jitter, only when the HTTP status is
    ``self.TOO_MANY_REQUESTS`` or ``self.SERVICE_UNAVAILABLE``; any other
    ``binding.HTTPError`` is re-raised immediately.

    Arguments:
        events: List of events. Nothing is sent when it is empty.
        retries: Number of attempts made per payload before giving up.
            Values below 1 are treated as 1 so that every payload is
            attempted at least once.
        event_field: Event field.

    Raises:
        binding.HTTPError: On a non-retryable status, or the last error seen
            once all attempts for a payload have failed.
    """
    if not events:
        return

    # A non-positive ``retries`` used to skip the POST entirely and then
    # crash on ``None.status``; guarantee at least one real attempt instead.
    attempts = max(retries, 1)
    retryable_statuses = (self.TOO_MANY_REQUESTS, self.SERVICE_UNAVAILABLE)

    for payload in HECEvent.format_events(events, event_field):
        last_ex = None
        for attempt in range(attempts):
            try:
                self._rest_client.post(
                    self.HTTP_EVENT_COLLECTOR_ENDPOINT,
                    body=payload.encode("utf-8"),
                    headers=self.headers,
                )
            except binding.HTTPError as e:
                self.logger.warning(
                    "Write events through HEC failed. Status=%s", e.status
                )
                if e.status not in retryable_statuses:
                    # Not a throttling/availability problem: retrying will
                    # not help, so let the caller deal with it right away.
                    raise
                last_ex = e
                if attempt < attempts - 1:
                    # wait time for n retries: 10, 20, 40, 80, 80, 80, 80, ....
                    sleep_time = min(((2 ** (attempt + 1)) * 5), 80)
                    random_millisecond = randint(0, 1000) / 1000.0
                    time.sleep(sleep_time + random_millisecond)
            else:
                break
        else:
            # When failed after retry, we reraise the exception
            # to exit the function to let client handle this situation.
            # The traceback is rendered from the saved exception because no
            # exception is being handled here, so format_exc() would only
            # yield "NoneType: None".
            self.logger.error(
                "Write events through HEC failed: %s. status=%s",
                "".join(
                    traceback.format_exception(
                        type(last_ex), last_ex, last_ex.__traceback__
                    )
                ),
                last_ex.status,
            )
            raise last_ex