Skip to content

splunkenv.py

Splunk platform related utilities.

APP_HEC = 'splunk_httpinput' module-attribute

APP_SYSTEM = 'system' module-attribute

ETC_LEAF = 'etc' module-attribute

__all__ = ['make_splunkhome_path', 'get_splunk_host_info', 'get_splunk_bin', 'get_splunkd_access_info', 'get_scheme_from_hec_settings', 'get_splunkd_uri', 'get_conf_key_value', 'get_conf_stanza', 'get_conf_stanzas'] module-attribute

SessionKeyNotFound

Bases: Exception

Source code in solnlib/splunkenv.py
71
72
class SessionKeyNotFound(Exception):
    pass

get_conf_key_value(conf_name, stanza, key, app_name, session_key=None, user='nobody', raw_output=False)

Get value of key of stanza in conf_name.

Parameters:

Name Type Description Default
conf_name str

Config file.

required
stanza str

Stanza name.

required
key str

Key name in the stanza.

required
app_name str

Application name. To make a call to global context use ‘-’ as app_name and set raw_output=True. In that case manual parsing is needed as response may be the list with multiple entries.

required
session_key Optional[str]

Needed to make a call to config endpoint. If ‘None’, solnlib will try to get it from splunk.getSessionKey() and/or main module and if it won’t get it, SessionKeyNotFound will be raised.

None
user str

used for set user context in API call. Optional.

'nobody'
raw_output Optional[bool]

if ‘true’ full, decoded response in json format will be returned. It should be set to True when app_name is a global context ‘/-/’. In that case splunk API may return multiple entries.

False

Returns:

Type Description
Union[str, list, dict]

Config value.

Raises:

Type Description
KeyError

If stanza or key doesn’t exist.

Source code in solnlib/splunkenv.py
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
def get_conf_key_value(
    conf_name: str,
    stanza: str,
    key: str,
    app_name: str,
    session_key: Optional[str] = None,
    user: str = "nobody",
    raw_output: Optional[bool] = False,
) -> Union[str, list, dict]:
    """Get value of `key` of `stanza` in `conf_name`.

    Arguments:
        conf_name: Config file.
        stanza: Stanza name.
        key: Key name in the stanza.
        app_name: Application name. To make a call to global context use '-' as app_name and set raw_output=True.
            In that case manual parsing is needed as response may be the list with multiple entries.
        session_key: Needed to make a call to config endpoint. If 'None', solnlib will try to get it from
            splunk.getSessionKey() and/or __main__ module and if it won't get it, SessionKeyNotFound will be raised.
        user: used for set user context in API call. Optional.
        raw_output: if 'true' full, decoded response in json format will be returned. It should be set to True when
            app_name is a global context '/-/'. In that case splunk API may return multiple entries.

    Returns:
        Config value.

    Raises:
        KeyError: If `stanza` or `key` doesn't exist.
    """

    if use_btool:
        app = None if app_name == "-" else app_name
        stanzas = get_conf_stanzas(conf_name, app)
        return stanzas[stanza][key]

    stanzas = _get_conf_stanzas_from_splunk_api(
        conf_name, app_name, session_key=session_key, user=user, stanza=stanza
    )

    if raw_output:
        return stanzas

    stanza = stanzas.get("entry")[0].get("content")
    requested_key = stanza[key]
    return requested_key

get_conf_stanza(conf_name, stanza, app_name, session_key=None, user='nobody', raw_output=False)

Get stanza in conf_name.

Parameters:

Name Type Description Default
conf_name str

Config file.

required
stanza str

Stanza name.

required
app_name str

Application name. To make a call to global context use ‘-’ as app_name and set raw_output=True. In that case manual parsing is needed as response may be the list with multiple entries.

required
session_key Optional[str]

Needed to make a call to config endpoint. If ‘None’, solnlib will try to get it from splunk.getSessionKey() and/or main module and if it won’t get it, SessionKeyNotFound will be raised.

None
user str

used for set user context in API call. Optional.

'nobody'
raw_output Optional[bool]

if ‘true’ full, decoded response in json format will be returned. It should be set to True when app_name is a global context ‘/-/’. In that case splunk API may return multiple entries.

False

Returns:

Type Description
dict

Config stanza.

Raises:

Type Description
KeyError

If stanza doesn’t exist.

Source code in solnlib/splunkenv.py
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
def get_conf_stanza(
    conf_name: str,
    stanza: str,
    app_name: str,
    session_key: Optional[str] = None,
    user: str = "nobody",
    raw_output: Optional[bool] = False,
) -> dict:
    """Get `stanza` in `conf_name`.

    Arguments:
        conf_name: Config file.
        stanza: Stanza name.
        app_name: Application name. To make a call to global context use '-' as app_name and set raw_output=True.
            In that case manual parsing is needed as response may be the list with multiple entries.
        session_key: Needed to make a call to config endpoint. If 'None', solnlib will try to get it from
            splunk.getSessionKey() and/or __main__ module and if it won't get it, SessionKeyNotFound will be raised.
        user: used for set user context in API call. Optional.
        raw_output: if 'true' full, decoded response in json format will be returned. It should be set to True when
            app_name is a global context '/-/'. In that case splunk API may return multiple entries.

    Returns:
        Config stanza.

    Raises:
         KeyError: If stanza doesn't exist.
    """

    if use_btool:
        app = None if app_name == "-" else app_name
        stanzas = get_conf_stanzas(conf_name, app)
        return stanzas[stanza]

    stanzas = _get_conf_stanzas_from_splunk_api(
        conf_name, app_name, session_key=session_key, user=user, stanza=stanza
    )

    if raw_output:
        return stanzas

    stanza = stanzas.get("entry")[0].get("content")
    return stanza

get_conf_stanzas(conf_name, app_name=None)

Get stanzas of conf_name

Parameters:

Name Type Description Default
conf_name str

Config file.

required
app_name Optional[str]

Application name. Optional.

None

Returns:

Type Description
dict

Config stanzas.

Examples:

>>> stanzas = get_conf_stanzas('server')
>>> return: {'serverName': 'testServer', 'sessionTimeout': '1h', ...}
Source code in solnlib/splunkenv.py
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
def get_conf_stanzas(conf_name: str, app_name: Optional[str] = None) -> dict:
    """Get stanzas of `conf_name`

    Arguments:
        conf_name: Config file.
        app_name: Application name. Optional.

    Returns:
        Config stanzas.

    Examples:
       >>> stanzas = get_conf_stanzas('server')
       >>> return: {'serverName': 'testServer', 'sessionTimeout': '1h', ...}
    """

    if conf_name.endswith(".conf"):
        conf_name = conf_name[:-5]

    # TODO: dynamically calculate SPLUNK_HOME
    btool_cli = [
        op.join(os.environ["SPLUNK_HOME"], "bin", "splunk"),
        "cmd",
        "btool",
        conf_name,
        "list",
    ]

    if app_name:
        btool_cli.append(f"--app={app_name}")

    p = subprocess.Popen(  # nosemgrep: python.lang.security.audit.dangerous-subprocess-use.dangerous-subprocess-use
        btool_cli, stdout=subprocess.PIPE, stderr=subprocess.PIPE
    )
    out, _ = p.communicate()

    if isinstance(out, bytes):
        out = out.decode()

    parser = ConfigParser(**{"strict": False})
    parser.optionxform = str
    parser.read_file(StringIO(out))

    out = {}
    for section in parser.sections():
        out[section] = {item[0]: item[1] for item in parser.items(section, raw=True)}
    return out

get_scheme_from_hec_settings(session_key=None)

Get scheme from HEC global settings.

Parameters:

Name Type Description Default
session_key Optional[str]

Needed to make a call to config endpoint. If ‘None’, solnlib will try to get it from splunk.getSessionKey() and/or main module and if it won’t get it, SessionKeyNotFound will be raised.

None

Returns: scheme (str)

Source code in solnlib/splunkenv.py
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
def get_scheme_from_hec_settings(session_key: Optional[str] = None) -> str:
    """Get scheme from HEC global settings.

    Arguments:
        session_key: Needed to make a call to config endpoint. If 'None', solnlib will try to get it from
            splunk.getSessionKey() and/or __main__ module and if it won't get it, SessionKeyNotFound will be raised.
    Returns:
        scheme (str)
    """
    try:
        ssl_enabled = get_conf_key_value(
            "inputs",
            "http",
            "enableSSL",
            APP_HEC,
            session_key=session_key,
        )
    except KeyError:
        raise KeyError(
            "Cannot get enableSSL setting form conf: 'inputs' and stanza: '[http]'. "
            "Verify that your Splunk instance has the inputs.conf file with the correct [http] stanza. "
            "For more information see: "
            "https://docs.splunk.com/Documentation/Splunk/9.2.0/Data/UseHECusingconffiles"
        )

    if is_true(ssl_enabled):
        scheme = "https"
    else:
        scheme = "http"

    return scheme

get_splunk_bin()

Get absolute path of splunk CLI.

Returns:

Type Description
str

Absolute path of splunk CLI.

Source code in solnlib/splunkenv.py
117
118
119
120
121
122
123
124
125
126
127
128
def get_splunk_bin() -> str:
    """Get absolute path of splunk CLI.

    Returns:
        Absolute path of splunk CLI.
    """

    if os.name == "nt":
        splunk_bin = "splunk.exe"
    else:
        splunk_bin = "splunk"
    return make_splunkhome_path(("bin", splunk_bin))

get_splunk_host_info(session_key=None)

Get splunk host info.

Parameters:

Name Type Description Default
session_key Optional[str]

Needed to make a call to config endpoint. If ‘None’, solnlib will try to get it from splunk.getSessionKey() and/or main module and if it won’t get it, SessionKeyNotFound will be raised.

None

Returns: Tuple of (server_name, host_name).

Source code in solnlib/splunkenv.py
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
def get_splunk_host_info(session_key: Optional[str] = None) -> tuple:
    """Get splunk host info.

    Arguments:
        session_key: Needed to make a call to config endpoint. If 'None', solnlib will try to get it from
            splunk.getSessionKey() and/or __main__ module and if it won't get it, SessionKeyNotFound will be raised.
    Returns:
        Tuple of (server_name, host_name).
    """

    server_name = get_conf_key_value(
        "server",
        "general",
        "serverName",
        APP_SYSTEM,
        session_key=session_key,
    )
    host_name = socket.gethostname()

    return server_name, host_name

get_splunkd_access_info(session_key=None)

Get splunkd server access info.

Parameters:

Name Type Description Default
session_key Optional[str]

Needed to make a call to config endpoint. If ‘None’, solnlib will try to get it from splunk.getSessionKey() and/or main module and if it won’t get it, SessionKeyNotFound will be raised.

None

Returns: Tuple of (scheme, host, port).

Source code in solnlib/splunkenv.py
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
def get_splunkd_access_info(session_key: Optional[str] = None) -> tuple[str, str, int]:
    """Get splunkd server access info.

    Arguments:
        session_key: Needed to make a call to config endpoint. If 'None', solnlib will try to get it from
            splunk.getSessionKey() and/or __main__ module and if it won't get it, SessionKeyNotFound will be raised.
    Returns:
        Tuple of (scheme, host, port).
    """
    enable_splunkd_ssl = get_conf_key_value(
        "server",
        "sslConfig",
        "enableSplunkdSSL",
        APP_SYSTEM,
        session_key=session_key,
    )

    if is_true(enable_splunkd_ssl):
        scheme = "https"
    else:
        scheme = "http"

    host_port = get_conf_key_value(
        "web",
        "settings",
        "mgmtHostPort",
        APP_SYSTEM,
        session_key=session_key,
    )
    host_port = host_port.strip()
    host_port_split_parts = host_port.split(":")
    host = ":".join(host_port_split_parts[:-1])
    port = int(host_port_split_parts[-1])

    if "SPLUNK_BINDIP" in os.environ:
        bindip = os.environ["SPLUNK_BINDIP"]
        port_idx = bindip.rfind(":")
        host = bindip[:port_idx] if port_idx > 0 else bindip

    return scheme, host, port

get_splunkd_uri(session_key=None)

Get splunkd uri.

Parameters:

Name Type Description Default
session_key Optional[str]

Needed to make a call to config endpoint. If ‘None’, solnlib will try to get it from splunk.getSessionKey() and/or main module and if it won’t get it, SessionKeyNotFound will be raised.

None

Returns: Splunkd uri.

Source code in solnlib/splunkenv.py
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
def get_splunkd_uri(session_key: Optional[str] = None) -> str:
    """Get splunkd uri.

    Arguments:
        session_key: Needed to make a call to config endpoint. If 'None', solnlib will try to get it from
            splunk.getSessionKey() and/or __main__ module and if it won't get it, SessionKeyNotFound will be raised.
    Returns:
        Splunkd uri.
    """

    if os.environ.get("SPLUNKD_URI"):
        return os.environ["SPLUNKD_URI"]

    scheme, host, port = get_splunkd_access_info(session_key)
    return f"{scheme}://{host}:{port}"

make_splunkhome_path(parts)

Construct absolute path by $SPLUNK_HOME and parts.

Concatenate $SPLUNK_HOME and parts to an absolute path. For example, parts is [‘etc’, ‘apps’, ‘Splunk_TA_test’], the return path will be $SPLUNK_HOME/etc/apps/Splunk_TA_test. Note: this function assumed SPLUNK_HOME is in environment varialbes.

Parameters:

Name Type Description Default
parts Union[list, tuple]

Path parts.

required

Returns:

Type Description
str

Absolute path.

Raises:

Type Description
ValueError

Escape from intended parent directories.

Source code in solnlib/splunkenv.py
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
def make_splunkhome_path(parts: Union[list, tuple]) -> str:
    """Construct absolute path by $SPLUNK_HOME and `parts`.

    Concatenate $SPLUNK_HOME and `parts` to an absolute path.
    For example, `parts` is ['etc', 'apps', 'Splunk_TA_test'],
    the return path will be $SPLUNK_HOME/etc/apps/Splunk_TA_test.
    Note: this function assumed SPLUNK_HOME is in environment varialbes.

    Arguments:
        parts: Path parts.

    Returns:
        Absolute path.

    Raises:
        ValueError: Escape from intended parent directories.
    """
    return msp(parts)