Skip to content

event.py

This module provides Splunk modular input event encapsulation.

__all__ = ['EventException', 'XMLEvent', 'HECEvent'] module-attribute

Event

Base class of modular input event.

Source code in solnlib/modular_input/event.py
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
class Event:
    """Base class of modular input event."""

    def __init__(
        self,
        data: dict,
        time: float = None,
        index: str = None,
        host: str = None,
        source: str = None,
        sourcetype: str = None,
        fields: dict = None,
        stanza: str = None,
        unbroken: bool = False,
        done: bool = False,
    ):
        """Modular input event.

        Arguments:
            data: Event data.
            time: (optional) Event timestamp, default is None.
            index: (optional) The index event will be written to, default is None.
            host: (optional) Event host, default is None.
            source: (optional) Event source, default is None.
            sourcetype: (optional) Event sourcetype, default is None.
            fields: (optional) Event fields, default is None.
            stanza: (optional) Event stanza name, default is None.
            unbroken: (optional) Event unbroken flag, default is False.
            done: (optional) The last unbroken event, default is False.

        Examples:
           >>> event = Event(
           >>>     data='This is a test data.',
           >>>     time=1372274622.493,
           >>>     index='main',
           >>>     host='localhost',
           >>>     source='Splunk',
           >>>     sourcetype='misc',
           >>>     fields= {'Cloud':'AWS','region': 'us-west-1'},
           >>>     stanza='test_scheme://test',
           >>>     unbroken=True,
           >>>     done=True)
        """

        self._data = data
        self._time = "%.3f" % time if time else None
        self._index = index
        self._host = host
        self._source = source
        self._sourcetype = sourcetype
        if fields:
            self._fields = fields
        self._stanza = stanza
        if not unbroken and done:
            raise EventException('Invalid combination of "unbroken" and "done".')
        self._unbroken = unbroken
        self._done = done

    def __str__(self):
        event = {
            "data": self._data,
            "time": float(self._time) if self._time else self._time,
            "index": self._index,
            "host": self._host,
            "source": self._source,
            "sourcetype": self._sourcetype,
            "stanza": self._stanza,
            "unbroken": self._unbroken,
            "done": self._done,
        }

        if hasattr(self, "_fields"):
            event["fields"] = self._fields

        return json.dumps(event)

    @classmethod
    def format_events(cls, events: List) -> List:
        """Format events to list of string.

        Arguments:
            events: List of events to format.

        Returns:
            List of formatted events string.
        """

        raise EventException('Unimplemented "format_events".')

__init__(data, time=None, index=None, host=None, source=None, sourcetype=None, fields=None, stanza=None, unbroken=False, done=False)

Modular input event.

Parameters:

Name Type Description Default
data dict

Event data.

required
time float

(optional) Event timestamp, default is None.

None
index str

(optional) The index event will be written to, default is None.

None
host str

(optional) Event host, default is None.

None
source str

(optional) Event source, default is None.

None
sourcetype str

(optional) Event sourcetype, default is None.

None
fields dict

(optional) Event fields, default is None.

None
stanza str

(optional) Event stanza name, default is None.

None
unbroken bool

(optional) Event unbroken flag, default is False.

False
done bool

(optional) The last unbroken event, default is False.

False

Examples:

>>> event = Event(
>>>     data='This is a test data.',
>>>     time=1372274622.493,
>>>     index='main',
>>>     host='localhost',
>>>     source='Splunk',
>>>     sourcetype='misc',
>>>     fields= {'Cloud':'AWS','region': 'us-west-1'},
>>>     stanza='test_scheme://test',
>>>     unbroken=True,
>>>     done=True)
Source code in solnlib/modular_input/event.py
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
def __init__(
    self,
    data: dict,
    time: float = None,
    index: str = None,
    host: str = None,
    source: str = None,
    sourcetype: str = None,
    fields: dict = None,
    stanza: str = None,
    unbroken: bool = False,
    done: bool = False,
):
    """Modular input event.

    Arguments:
        data: Event data.
        time: (optional) Event timestamp, default is None.
        index: (optional) The index event will be written to, default is None.
        host: (optional) Event host, default is None.
        source: (optional) Event source, default is None.
        sourcetype: (optional) Event sourcetype, default is None.
        fields: (optional) Event fields, default is None.
        stanza: (optional) Event stanza name, default is None.
        unbroken: (optional) Event unbroken flag, default is False.
        done: (optional) The last unbroken event, default is False.

    Examples:
       >>> event = Event(
       >>>     data='This is a test data.',
       >>>     time=1372274622.493,
       >>>     index='main',
       >>>     host='localhost',
       >>>     source='Splunk',
       >>>     sourcetype='misc',
       >>>     fields= {'Cloud':'AWS','region': 'us-west-1'},
       >>>     stanza='test_scheme://test',
       >>>     unbroken=True,
       >>>     done=True)
    """

    self._data = data
    self._time = "%.3f" % time if time else None
    self._index = index
    self._host = host
    self._source = source
    self._sourcetype = sourcetype
    if fields:
        self._fields = fields
    self._stanza = stanza
    if not unbroken and done:
        raise EventException('Invalid combination of "unbroken" and "done".')
    self._unbroken = unbroken
    self._done = done

__str__()

Source code in solnlib/modular_input/event.py
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
def __str__(self):
    event = {
        "data": self._data,
        "time": float(self._time) if self._time else self._time,
        "index": self._index,
        "host": self._host,
        "source": self._source,
        "sourcetype": self._sourcetype,
        "stanza": self._stanza,
        "unbroken": self._unbroken,
        "done": self._done,
    }

    if hasattr(self, "_fields"):
        event["fields"] = self._fields

    return json.dumps(event)

format_events(events) classmethod

Format events to list of string.

Parameters:

Name Type Description Default
events List

List of events to format.

required

Returns:

Type Description
List

List of formatted events string.

Source code in solnlib/modular_input/event.py
108
109
110
111
112
113
114
115
116
117
118
119
@classmethod
def format_events(cls, events: List) -> List:
    """Format events to list of string.

    Arguments:
        events: List of events to format.

    Returns:
        List of formatted events string.
    """

    raise EventException('Unimplemented "format_events".')

EventException

Bases: Exception

Source code in solnlib/modular_input/event.py
28
29
class EventException(Exception):
    pass

HECEvent

Bases: Event

HEC event.

Source code in solnlib/modular_input/event.py
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
class HECEvent(Event):
    """HEC event."""

    max_hec_event_length = 1000000

    def _to_hec(self, event_field):
        event = {}
        event[event_field] = self._data
        if self._time:
            event["time"] = float(self._time)
        if self._index:
            event["index"] = self._index
        if self._host:
            event["host"] = self._host
        if self._source:
            event["source"] = self._source
        if self._sourcetype:
            event["sourcetype"] = self._sourcetype
        if hasattr(self, "_fields"):
            event["fields"] = self._fields

        return json.dumps(event, ensure_ascii=False)

    @classmethod
    def format_events(cls, events: List, event_field: str = "event") -> List:
        """Format events to list of string.

        Arguments:
            events: List of events to format.
            event_field: Event field.

        Returns:
            List of formatted events string, example::

                [
                    '{"index": "main", ... "event": {"kk": [1, 2, 3]}}\\n
                    {"index": "main", ... "event": {"kk": [3, 2, 3]}}',
                '...'
                ]
        """

        size = 0
        new_events, batched_events = [], []
        events = [event._to_hec(event_field) for event in events]
        for event in events:
            new_length = size + len(event) + len(batched_events) - 1
            if new_length >= cls.max_hec_event_length:
                if batched_events:
                    new_events.append("\n".join(batched_events))
                del batched_events[:]
                size = 0

            batched_events.append(event)
            size = size + len(event)
        if batched_events:
            new_events.append("\n".join(batched_events))

        return new_events

max_hec_event_length = 1000000 class-attribute instance-attribute

format_events(events, event_field='event') classmethod

Format events to list of string.

Parameters:

Name Type Description Default
events List

List of events to format.

required
event_field str

Event field.

'event'

Returns:

Type Description
List

List of formatted events string, example::

[ ‘{“index”: “main”, … “event”: {“kk”: [1, 2, 3]}}\n {“index”: “main”, … “event”: {“kk”: [3, 2, 3]}}’, ‘…’ ]

Source code in solnlib/modular_input/event.py
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
@classmethod
def format_events(cls, events: List, event_field: str = "event") -> List:
    """Format events to list of string.

    Arguments:
        events: List of events to format.
        event_field: Event field.

    Returns:
        List of formatted events string, example::

            [
                '{"index": "main", ... "event": {"kk": [1, 2, 3]}}\\n
                {"index": "main", ... "event": {"kk": [3, 2, 3]}}',
            '...'
            ]
    """

    size = 0
    new_events, batched_events = [], []
    events = [event._to_hec(event_field) for event in events]
    for event in events:
        new_length = size + len(event) + len(batched_events) - 1
        if new_length >= cls.max_hec_event_length:
            if batched_events:
                new_events.append("\n".join(batched_events))
            del batched_events[:]
            size = 0

        batched_events.append(event)
        size = size + len(event)
    if batched_events:
        new_events.append("\n".join(batched_events))

    return new_events

XMLEvent

Bases: Event

XML event.

Source code in solnlib/modular_input/event.py
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
class XMLEvent(Event):
    """XML event."""

    def _to_xml(self):
        _event = ET.Element("event")
        if self._stanza:
            _event.set("stanza", self._stanza)
        if self._unbroken:
            _event.set("unbroken", str(int(self._unbroken)))

        if self._time:
            ET.SubElement(_event, "time").text = self._time

        sub_elements = [
            ("index", self._index),
            ("host", self._host),
            ("source", self._source),
            ("sourcetype", self._sourcetype),
        ]
        for node, value in sub_elements:
            if value:
                ET.SubElement(_event, node).text = value

        if isinstance(self._data, str):
            ET.SubElement(_event, "data").text = self._data
        else:
            ET.SubElement(_event, "data").text = json.dumps(self._data)

        if self._done:
            ET.SubElement(_event, "done")

        return _event

    @classmethod
    def format_events(cls, events: List) -> List:
        """Format events to list of string.

        Arguments:
            events: List of events to format.

        Returns:
            List of formatted events string, example::

                [
                    '<stream>
                    <event stanza="test_scheme://test" unbroken="1">
                    <time>1459919070.994</time>
                    <index>main</index>
                    <host>localhost</host>
                    <source>test</source>
                    <sourcetype>test</sourcetype>
                    <data>{"kk": [1, 2, 3]}</data>
                    <done />
                    </event>
                    <event stanza="test_scheme://test" unbroken="1">
                    <time>1459919082.961</time>
                    <index>main</index>
                    <host>localhost</host>
                    <source>test</source>
                    <sourcetype>test</sourcetype>
                    <data>{"kk": [3, 2, 3]}</data>
                    <done />
                    </event>
                    </stream>'
                ]
        """

        stream = ET.Element("stream")
        for event in events:
            stream.append(event._to_xml())

        return [
            defused_et.tostring(stream, encoding="utf-8", method="xml").decode("utf-8")
        ]

format_events(events) classmethod

Format events to list of string.

Parameters:

Name Type Description Default
events List

List of events to format.

required

Returns:

Type Description
List

List of formatted events string, example::

[ ‘ main localhost test test {“kk”: [1, 2, 3]} main localhost test test {“kk”: [3, 2, 3]} ’ ]

Source code in solnlib/modular_input/event.py
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
@classmethod
def format_events(cls, events: List) -> List:
    """Format events to list of string.

    Arguments:
        events: List of events to format.

    Returns:
        List of formatted events string, example::

            [
                '<stream>
                <event stanza="test_scheme://test" unbroken="1">
                <time>1459919070.994</time>
                <index>main</index>
                <host>localhost</host>
                <source>test</source>
                <sourcetype>test</sourcetype>
                <data>{"kk": [1, 2, 3]}</data>
                <done />
                </event>
                <event stanza="test_scheme://test" unbroken="1">
                <time>1459919082.961</time>
                <index>main</index>
                <host>localhost</host>
                <source>test</source>
                <sourcetype>test</sourcetype>
                <data>{"kk": [3, 2, 3]}</data>
                <done />
                </event>
                </stream>'
            ]
    """

    stream = ET.Element("stream")
    for event in events:
        stream.append(event._to_xml())

    return [
        defused_et.tostring(stream, encoding="utf-8", method="xml").decode("utf-8")
    ]