Skip to content

splunkenv.py

Splunk platform related utilities.

APP_HEC = 'splunk_httpinput' module-attribute

APP_SYSTEM = 'system' module-attribute

ETC_LEAF = 'etc' module-attribute

__all__ = ['make_splunkhome_path', 'get_splunk_host_info', 'get_splunk_bin', 'get_splunkd_access_info', 'get_scheme_from_hec_settings', 'get_splunkd_uri', 'get_conf_key_value', 'get_conf_stanza', 'get_conf_stanzas'] module-attribute

SessionKeyNotFound

Bases: Exception

Source code in solnlib/splunkenv.py
73
74
class SessionKeyNotFound(Exception):
    """Raised when no Splunk session key could be obtained."""

getSessionKey(*args, **kwargs)

Source code in solnlib/splunkenv.py
44
45
def getSessionKey(*args, **kwargs):
    """Unconditionally raise ImportError; all arguments are accepted and ignored.

    NOTE(review): presumably a stand-in for Splunk's own ``getSessionKey``
    when the Splunk Python package cannot be imported -- confirm against the
    module's import section.

    Raises:
        ImportError: Always.
    """
    raise ImportError("This module requires Splunk to be installed.")

get_conf_key_value(conf_name, stanza, key, app_name, session_key=None, user='nobody', raw_output=False)

Get value of key of stanza in conf_name.

Parameters:

Name Type Description Default
conf_name str

Config file.

required
stanza str

Stanza name.

required
key str

Key name in the stanza.

required
app_name str

Application name. To make a call to the global context, use ‘-’ as app_name and set raw_output=True. In that case manual parsing is needed, as the response may be a list with multiple entries.

required
session_key Optional[str]

Needed to make a call to the config endpoint. If None, solnlib will try to get it from splunk.getSessionKey() and/or the `__main__` module; if it cannot be obtained, SessionKeyNotFound is raised.

None
user str

Used to set the user context in the API call. Optional.

'nobody'
raw_output Optional[bool]

If True, the full decoded response in JSON format is returned. It should be set to True when app_name is the global context ‘/-/’. In that case the Splunk API may return multiple entries.

False

Returns:

Type Description
Union[str, List, dict]

Config value.

Raises:

Type Description
KeyError

If stanza or key doesn’t exist.

Source code in solnlib/splunkenv.py
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
def get_conf_key_value(
    conf_name: str,
    stanza: str,
    key: str,
    app_name: str,
    session_key: Optional[str] = None,
    user: str = "nobody",
    raw_output: Optional[bool] = False,
) -> Union[str, List, dict]:
    """Look up the value of `key` inside `stanza` of `conf_name`.

    When the module-level `use_btool` flag is truthy the value is read from
    `get_conf_stanzas` (btool); `session_key`, `user` and `raw_output` are
    not used on that path. Otherwise the stanza is fetched through
    `_get_conf_stanzas_from_splunk_api`.

    Arguments:
        conf_name: Config file.
        stanza: Stanza name.
        key: Key name in the stanza.
        app_name: Application name. To make a call to the global context use
            '-' as app_name and set raw_output=True. In that case manual
            parsing is needed, as the response may be a list with multiple
            entries. On the btool path '-' is translated to "no app filter".
        session_key: Needed to make a call to the config endpoint. If None,
            solnlib will try to get it from splunk.getSessionKey() and/or the
            __main__ module; if it cannot be obtained, SessionKeyNotFound is
            raised.
        user: User context for the API call. Optional.
        raw_output: If truthy, the API response is returned as-is instead of
            the single key value. It should be set to True when app_name is
            the global context '/-/', since the Splunk API may then return
            multiple entries.

    Returns:
        The config value, or the whole API response when `raw_output` is set.

    Raises:
        KeyError: If `stanza` or `key` doesn't exist.
    """

    if use_btool:
        # btool has no "-" wildcard; None means "do not filter by app".
        btool_app = app_name if app_name != "-" else None
        return get_conf_stanzas(conf_name, btool_app)[stanza][key]

    response = _get_conf_stanzas_from_splunk_api(
        conf_name, app_name, session_key=session_key, user=user, stanza=stanza
    )
    if raw_output:
        return response

    # Only the first returned entry is consulted.
    content = response.get("entry")[0].get("content")
    return content[key]

get_conf_stanza(conf_name, stanza, app_name, session_key=None, user='nobody', raw_output=False)

Get stanza in conf_name.

Parameters:

Name Type Description Default
conf_name str

Config file.

required
stanza str

Stanza name.

required
app_name str

Application name. To make a call to the global context, use ‘-’ as app_name and set raw_output=True. In that case manual parsing is needed, as the response may be a list with multiple entries.

required
session_key Optional[str]

Needed to make a call to the config endpoint. If None, solnlib will try to get it from splunk.getSessionKey() and/or the `__main__` module; if it cannot be obtained, SessionKeyNotFound is raised.

None
user str

Used to set the user context in the API call. Optional.

'nobody'
raw_output Optional[bool]

If True, the full decoded response in JSON format is returned. It should be set to True when app_name is the global context ‘/-/’. In that case the Splunk API may return multiple entries.

False

Returns:

Type Description
dict

Config stanza.

Raises:

Type Description
KeyError

If stanza doesn’t exist.

Source code in solnlib/splunkenv.py
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
def get_conf_stanza(
    conf_name: str,
    stanza: str,
    app_name: str,
    session_key: Optional[str] = None,
    user: str = "nobody",
    raw_output: Optional[bool] = False,
) -> dict:
    """Fetch a single `stanza` from `conf_name`.

    When the module-level `use_btool` flag is truthy the stanza is read from
    `get_conf_stanzas` (btool); `session_key`, `user` and `raw_output` are
    not used on that path. Otherwise the stanza is fetched through
    `_get_conf_stanzas_from_splunk_api`.

    Arguments:
        conf_name: Config file.
        stanza: Stanza name.
        app_name: Application name. To make a call to the global context use
            '-' as app_name and set raw_output=True. In that case manual
            parsing is needed, as the response may be a list with multiple
            entries. On the btool path '-' is translated to "no app filter".
        session_key: Needed to make a call to the config endpoint. If None,
            solnlib will try to get it from splunk.getSessionKey() and/or the
            __main__ module; if it cannot be obtained, SessionKeyNotFound is
            raised.
        user: User context for the API call. Optional.
        raw_output: If truthy, the API response is returned as-is instead of
            the stanza content. It should be set to True when app_name is the
            global context '/-/', since the Splunk API may then return
            multiple entries.

    Returns:
        The stanza content, or the whole API response when `raw_output` is set.

    Raises:
        KeyError: If stanza doesn't exist.
    """

    if use_btool:
        # btool has no "-" wildcard; None means "do not filter by app".
        btool_app = app_name if app_name != "-" else None
        return get_conf_stanzas(conf_name, btool_app)[stanza]

    response = _get_conf_stanzas_from_splunk_api(
        conf_name, app_name, session_key=session_key, user=user, stanza=stanza
    )
    if raw_output:
        return response

    # Only the first returned entry is consulted.
    return response.get("entry")[0].get("content")

get_conf_stanzas(conf_name, app_name=None)

Get stanzas of conf_name

Parameters:

Name Type Description Default
conf_name str

Config file.

required
app_name Optional[str]

Application name. Optional.

None

Returns:

Type Description
dict

Config stanzas.

Examples:

>>> stanzas = get_conf_stanzas('server')
>>> stanzas
{'general': {'serverName': 'testServer', 'sessionTimeout': '1h', ...}, ...}
Source code in solnlib/splunkenv.py
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
def get_conf_stanzas(conf_name: str, app_name: Optional[str] = None) -> dict:
    """Get all stanzas of `conf_name` by running ``splunk cmd btool``.

    Runs ``$SPLUNK_HOME/bin/splunk cmd btool <conf> list [--app=<app>]`` and
    parses its standard output as an INI document. The exit status and the
    standard error of the command are not inspected.

    Arguments:
        conf_name: Config file; a trailing ``.conf`` is stripped.
        app_name: Application name. Optional; when given, it is passed to
            btool as ``--app=<app_name>``.

    Returns:
        Config stanzas, as a dict mapping each stanza name to a dict of its
        key/value pairs (key case is preserved, values are not interpolated).

    Raises:
        KeyError: If ``SPLUNK_HOME`` is not set in the environment.

    Examples:
       >>> stanzas = get_conf_stanzas('server')
       >>> stanzas
       {'general': {'serverName': 'testServer', 'sessionTimeout': '1h', ...}, ...}
    """

    base_name = conf_name[:-5] if conf_name.endswith(".conf") else conf_name

    # TODO: dynamically calculate SPLUNK_HOME
    splunk_cli = op.join(os.environ["SPLUNK_HOME"], "bin", "splunk")
    command = [splunk_cli, "cmd", "btool", base_name, "list"]
    if app_name:
        command.append(f"--app={app_name}")

    proc = subprocess.Popen(  # nosemgrep: python.lang.security.audit.dangerous-subprocess-use.dangerous-subprocess-use
        command, stdout=subprocess.PIPE, stderr=subprocess.PIPE
    )
    raw_listing, _ = proc.communicate()
    listing = raw_listing.decode() if isinstance(raw_listing, bytes) else raw_listing

    # strict=False tolerates duplicate sections/options in the btool listing;
    # optionxform=str keeps option names case-sensitive.
    parser = ConfigParser(strict=False)
    parser.optionxform = str
    parser.read_file(StringIO(listing))

    return {
        section: dict(parser.items(section, raw=True))
        for section in parser.sections()
    }

get_scheme_from_hec_settings(session_key=None)

Get scheme from HEC global settings.

Parameters:

Name Type Description Default
session_key Optional[str]

Needed to make a call to the config endpoint. If None, solnlib will try to get it from splunk.getSessionKey() and/or the `__main__` module; if it cannot be obtained, SessionKeyNotFound is raised.

None

Returns:

Type Description
str

scheme (str)

Source code in solnlib/splunkenv.py
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
def get_scheme_from_hec_settings(session_key: Optional[str] = None) -> str:
    """Get scheme from HEC global settings.

    Reads `enableSSL` from the `[http]` stanza of `inputs` in the `APP_HEC`
    app and maps it to a URL scheme.

    Arguments:
        session_key: Needed to make a call to the config endpoint. If None,
            solnlib will try to get it from splunk.getSessionKey() and/or the
            __main__ module; if it cannot be obtained, SessionKeyNotFound is
            raised.

    Returns:
        "https" when `enableSSL` is true according to `is_true`, else "http".

    Raises:
        KeyError: If the `enableSSL` setting cannot be found. The original
            lookup error is attached as the cause.
    """
    try:
        ssl_enabled = get_conf_key_value(
            "inputs",
            "http",
            "enableSSL",
            APP_HEC,
            session_key=session_key,
        )
    except KeyError as exc:
        # Re-raise with an actionable message, chaining the original error
        # explicitly so the traceback shows it as the direct cause.
        raise KeyError(
            "Cannot get enableSSL setting from conf: 'inputs' and stanza: '[http]'. "
            "Verify that your Splunk instance has the inputs.conf file with the correct [http] stanza. "
            "For more information see: "
            "https://docs.splunk.com/Documentation/Splunk/9.2.0/Data/UseHECusingconffiles"
        ) from exc

    return "https" if is_true(ssl_enabled) else "http"

get_splunk_bin()

Get absolute path of splunk CLI.

Returns:

Type Description
str

Absolute path of splunk CLI.

Source code in solnlib/splunkenv.py
119
120
121
122
123
124
125
126
127
128
129
130
def get_splunk_bin() -> str:
    """Return the absolute path of the splunk CLI executable.

    The executable is named ``splunk.exe`` when ``os.name`` is ``"nt"`` and
    ``splunk`` otherwise; the path is built with `make_splunkhome_path`.

    Returns:
        Absolute path of splunk CLI.
    """

    executable = "splunk.exe" if os.name == "nt" else "splunk"
    return make_splunkhome_path(("bin", executable))

get_splunk_host_info(session_key=None)

Get splunk host info.

Parameters:

Name Type Description Default
session_key Optional[str]

Needed to make a call to the config endpoint. If None, solnlib will try to get it from splunk.getSessionKey() and/or the `__main__` module; if it cannot be obtained, SessionKeyNotFound is raised.

None

Returns:

Type Description
Tuple

Tuple of (server_name, host_name).

Source code in solnlib/splunkenv.py
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
def get_splunk_host_info(session_key: Optional[str] = None) -> Tuple:
    """Get splunk host info.

    The server name is the `serverName` key of the `[general]` stanza of
    `server` in the `APP_SYSTEM` app; the host name comes from
    `socket.gethostname()`.

    Arguments:
        session_key: Needed to make a call to the config endpoint. If None,
            solnlib will try to get it from splunk.getSessionKey() and/or the
            __main__ module; if it cannot be obtained, SessionKeyNotFound is
            raised.

    Returns:
        Tuple of (server_name, host_name).
    """

    configured_server_name = get_conf_key_value(
        "server", "general", "serverName", APP_SYSTEM, session_key=session_key
    )
    return configured_server_name, socket.gethostname()

get_splunkd_access_info(session_key=None)

Get splunkd server access info.

Parameters:

Name Type Description Default
session_key Optional[str]

Needed to make a call to the config endpoint. If None, solnlib will try to get it from splunk.getSessionKey() and/or the `__main__` module; if it cannot be obtained, SessionKeyNotFound is raised.

None

Returns:

Type Description
Tuple[str, str, int]

Tuple of (scheme, host, port).

Source code in solnlib/splunkenv.py
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
def get_splunkd_access_info(session_key: Optional[str] = None) -> Tuple[str, str, int]:
    """Get splunkd server access info.

    The scheme follows `enableSplunkdSSL` in the `[sslConfig]` stanza of
    `server`; host and port come from `mgmtHostPort` in the `[settings]`
    stanza of `web`, split at the last colon. If the `SPLUNK_BINDIP`
    environment variable is set, it replaces the host (the port is kept).

    Arguments:
        session_key: Needed to make a call to the config endpoint. If None,
            solnlib will try to get it from splunk.getSessionKey() and/or the
            __main__ module; if it cannot be obtained, SessionKeyNotFound is
            raised.

    Returns:
        Tuple of (scheme, host, port).
    """
    ssl_flag = get_conf_key_value(
        "server",
        "sslConfig",
        "enableSplunkdSSL",
        APP_SYSTEM,
        session_key=session_key,
    )
    scheme = "https" if is_true(ssl_flag) else "http"

    mgmt_host_port = get_conf_key_value(
        "web",
        "settings",
        "mgmtHostPort",
        APP_SYSTEM,
        session_key=session_key,
    ).strip()
    # Split on the LAST colon so any colons inside the host part are kept.
    host, _, port_text = mgmt_host_port.rpartition(":")
    port = int(port_text)

    bindip = os.environ.get("SPLUNK_BINDIP")
    if bindip is not None:
        # Drop whatever follows the last colon (when one exists past index 0).
        colon_at = bindip.rfind(":")
        host = bindip[:colon_at] if colon_at > 0 else bindip

    return scheme, host, port

get_splunkd_uri(session_key=None)

Get splunkd uri.

Parameters:

Name Type Description Default
session_key Optional[str]

Needed to make a call to the config endpoint. If None, solnlib will try to get it from splunk.getSessionKey() and/or the `__main__` module; if it cannot be obtained, SessionKeyNotFound is raised.

None

Returns:

Type Description
str

Splunkd uri.

Source code in solnlib/splunkenv.py
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
def get_splunkd_uri(session_key: Optional[str] = None) -> str:
    """Get splunkd uri.

    A non-empty `SPLUNKD_URI` environment variable takes precedence and is
    returned unchanged; otherwise the URI is assembled from
    `get_splunkd_access_info`.

    Arguments:
        session_key: Needed to make a call to the config endpoint. If None,
            solnlib will try to get it from splunk.getSessionKey() and/or the
            __main__ module; if it cannot be obtained, SessionKeyNotFound is
            raised.

    Returns:
        Splunkd uri.
    """

    env_uri = os.environ.get("SPLUNKD_URI")
    if env_uri:
        return env_uri

    uri_scheme, uri_host, uri_port = get_splunkd_access_info(session_key)
    return f"{uri_scheme}://{uri_host}:{uri_port}"

make_splunkhome_path(parts)

Construct absolute path by $SPLUNK_HOME and parts.

Concatenate $SPLUNK_HOME and parts to an absolute path. For example, if parts is [‘etc’, ‘apps’, ‘Splunk_TA_test’], the returned path will be $SPLUNK_HOME/etc/apps/Splunk_TA_test. Note: this function assumes SPLUNK_HOME is set in the environment variables.

Parameters:

Name Type Description Default
parts Union[List, Tuple]

Path parts.

required

Returns:

Type Description
str

Absolute path.

Raises:

Type Description
ValueError

Escape from intended parent directories.

Source code in solnlib/splunkenv.py
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
def make_splunkhome_path(parts: Union[List, Tuple]) -> str:
    """Build an absolute path from $SPLUNK_HOME and `parts`.

    The work is delegated to `msp`. For example, if `parts` is
    ['etc', 'apps', 'Splunk_TA_test'], the returned path will be
    $SPLUNK_HOME/etc/apps/Splunk_TA_test.
    Note: this function assumes SPLUNK_HOME is set in the environment
    variables.

    Arguments:
        parts: Path parts.

    Returns:
        Absolute path.

    Raises:
        ValueError: Escape from intended parent directories.
    """
    splunk_path = msp(parts)
    return splunk_path

msp(*args, **kwargs)

Source code in solnlib/splunkenv.py
52
53
def msp(*args, **kwargs):
    """Unconditionally raise ImportError; all arguments are accepted and ignored.

    NOTE(review): presumably a stand-in for Splunk's own path helper when the
    Splunk Python package cannot be imported -- confirm against the module's
    import section.

    Raises:
        ImportError: Always.
    """
    raise ImportError("This module requires Splunk to be installed.")

simpleRequest(*args, **kwargs)

Source code in solnlib/splunkenv.py
36
37
def simpleRequest(*args, **kwargs):
    """Unconditionally raise ImportError; all arguments are accepted and ignored.

    NOTE(review): presumably a stand-in for Splunk's own REST helper when the
    Splunk Python package cannot be imported -- confirm against the module's
    import section.

    Raises:
        ImportError: Always.
    """
    raise ImportError("This module requires Splunk to be installed.")