Skip to content

splunkenv.py

Splunk platform related utilities.

ETC_LEAF = 'etc' module-attribute

__all__ = ['make_splunkhome_path', 'get_splunk_host_info', 'get_splunk_bin', 'get_splunkd_access_info', 'get_scheme_from_hec_settings', 'get_splunkd_uri', 'get_conf_key_value', 'get_conf_stanza', 'get_conf_stanzas'] module-attribute

on_shared_storage = [os.path.join(ETC_LEAF, 'apps'), os.path.join(ETC_LEAF, 'users'), os.path.join('var', 'run', 'splunk', 'dispatch'), os.path.join('var', 'run', 'splunk', 'srtemp'), os.path.join('var', 'run', 'splunk', 'rss'), os.path.join('var', 'run', 'splunk', 'scheduler'), os.path.join('var', 'run', 'splunk', 'lookup_tmp')] module-attribute

get_conf_key_value(conf_name, stanza, key)

Get value of key of stanza in conf_name.

Parameters:

Name Type Description Default
conf_name str

Config file.

required
stanza str

Stanza name.

required
key str

Key name.

required

Returns:

Type Description
Union[str, List, dict]

Config value.

Raises:

Type Description
KeyError

If stanza or key doesn’t exist.

Source code in solnlib/splunkenv.py
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
def get_conf_key_value(conf_name: str, stanza: str, key: str) -> Union[str, List, dict]:
    """Get value of `key` of `stanza` in `conf_name`.

    Arguments:
        conf_name: Config file.
        stanza: Stanza name.
        key: Key name.

    Returns:
        Config value.

    Raises:
        KeyError: If `stanza` or `key` doesn't exist.
    """

    stanzas = get_conf_stanzas(conf_name)
    return stanzas[stanza][key]

get_conf_stanza(conf_name, stanza)

Get stanza in conf_name.

Parameters:

Name Type Description Default
conf_name str

Config file.

required
stanza str

Stanza name.

required

Returns:

Type Description
dict

Config stanza.

Raises:

Type Description
KeyError

If stanza doesn’t exist.

Source code in solnlib/splunkenv.py
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
def get_conf_stanza(conf_name: str, stanza: str) -> dict:
    """Get `stanza` in `conf_name`.

    Arguments:
        conf_name: Config file.
        stanza: Stanza name.

    Returns:
        Config stanza.

    Raises:
         KeyError: If stanza doesn't exist.
    """

    stanzas = get_conf_stanzas(conf_name)
    return stanzas[stanza]

get_conf_stanzas(conf_name)

Get stanzas of conf_name

Parameters:

Name Type Description Default
conf_name str

Config file.

required

Returns:

Type Description
dict

Config stanzas.

Examples:

>>> stanzas = get_conf_stanzas('server')
>>> return: {'serverName': 'testServer', 'sessionTimeout': '1h', ...}
Source code in solnlib/splunkenv.py
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
def get_conf_stanzas(conf_name: str) -> dict:
    """Get stanzas of `conf_name`

    Arguments:
        conf_name: Config file.

    Returns:
        Config stanzas.

    Examples:
       >>> stanzas = get_conf_stanzas('server')
       >>> return: {'serverName': 'testServer', 'sessionTimeout': '1h', ...}
    """

    if conf_name.endswith(".conf"):
        conf_name = conf_name[:-5]

    # TODO: dynamically calculate SPLUNK_HOME
    btool_cli = [
        op.join(os.environ["SPLUNK_HOME"], "bin", "splunk"),
        "cmd",
        "btool",
        conf_name,
        "list",
    ]
    p = subprocess.Popen(  # nosemgrep: python.lang.security.audit.dangerous-subprocess-use.dangerous-subprocess-use
        btool_cli, stdout=subprocess.PIPE, stderr=subprocess.PIPE
    )
    out, _ = p.communicate()

    if isinstance(out, bytes):
        out = out.decode()

    parser = ConfigParser(**{"strict": False})
    parser.optionxform = str
    parser.read_file(StringIO(out))

    out = {}
    for section in parser.sections():
        out[section] = {item[0]: item[1] for item in parser.items(section, raw=True)}
    return out

get_scheme_from_hec_settings()

Get scheme from HEC global settings.

Returns:

Type Description
str

scheme (str)

Source code in solnlib/splunkenv.py
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
def get_scheme_from_hec_settings() -> str:
    """Get scheme from HEC global settings.

    Returns:
        scheme (str)
    """
    try:
        ssl_enabled = get_conf_key_value("inputs", "http", "enableSSL")
    except KeyError:
        raise KeyError(
            "Cannot get enableSSL setting form conf: 'inputs' and stanza: '[http]'. "
            "Verify that your Splunk instance has the inputs.conf file with the correct [http] stanza. "
            "For more information see: "
            "https://docs.splunk.com/Documentation/Splunk/9.2.0/Data/UseHECusingconffiles"
        )

    if is_true(ssl_enabled):
        scheme = "https"
    else:
        scheme = "http"

    return scheme

get_splunk_bin()

Get absolute path of splunk CLI.

Returns:

Type Description
str

Absolute path of splunk CLI.

Source code in solnlib/splunkenv.py
162
163
164
165
166
167
168
169
170
171
172
173
def get_splunk_bin() -> str:
    """Get absolute path of splunk CLI.

    Returns:
        Absolute path of splunk CLI.
    """

    if os.name == "nt":
        splunk_bin = "splunk.exe"
    else:
        splunk_bin = "splunk"
    return make_splunkhome_path(("bin", splunk_bin))

get_splunk_host_info()

Get splunk host info.

Returns:

Type Description
Tuple

Tuple of (server_name, host_name).

Source code in solnlib/splunkenv.py
150
151
152
153
154
155
156
157
158
159
def get_splunk_host_info() -> Tuple:
    """Get splunk host info.

    Returns:
        Tuple of (server_name, host_name).
    """

    server_name = get_conf_key_value("server", "general", "serverName")
    host_name = socket.gethostname()
    return server_name, host_name

get_splunkd_access_info()

Get splunkd server access info.

Returns:

Type Description
Tuple[str, str, int]

Tuple of (scheme, host, port).

Source code in solnlib/splunkenv.py
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
def get_splunkd_access_info() -> Tuple[str, str, int]:
    """Get splunkd server access info.

    Returns:
        Tuple of (scheme, host, port).
    """

    if is_true(get_conf_key_value("server", "sslConfig", "enableSplunkdSSL")):
        scheme = "https"
    else:
        scheme = "http"

    host_port = get_conf_key_value("web", "settings", "mgmtHostPort")
    host_port = host_port.strip()
    host_port_split_parts = host_port.split(":")
    host = ":".join(host_port_split_parts[:-1])
    port = int(host_port_split_parts[-1])

    if "SPLUNK_BINDIP" in os.environ:
        bindip = os.environ["SPLUNK_BINDIP"]
        port_idx = bindip.rfind(":")
        host = bindip[:port_idx] if port_idx > 0 else bindip

    return scheme, host, port

get_splunkd_uri()

Get splunkd uri.

Returns:

Type Description
str

Splunkd uri.

Source code in solnlib/splunkenv.py
226
227
228
229
230
231
232
233
234
235
236
237
def get_splunkd_uri() -> str:
    """Get splunkd uri.

    Returns:
        Splunkd uri.
    """

    if os.environ.get("SPLUNKD_URI"):
        return os.environ["SPLUNKD_URI"]

    scheme, host, port = get_splunkd_access_info()
    return f"{scheme}://{host}:{port}"

make_splunkhome_path(parts)

Construct absolute path by $SPLUNK_HOME and parts.

Concatenate $SPLUNK_HOME and parts to an absolute path. For example, parts is [‘etc’, ‘apps’, ‘Splunk_TA_test’], the return path will be $SPLUNK_HOME/etc/apps/Splunk_TA_test. Note: this function assumed SPLUNK_HOME is in environment varialbes.

Parameters:

Name Type Description Default
parts Union[List, Tuple]

Path parts.

required

Returns:

Type Description
str

Absolute path.

Raises:

Type Description
ValueError

Escape from intended parent directories.

Source code in solnlib/splunkenv.py
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
def make_splunkhome_path(parts: Union[List, Tuple]) -> str:
    """Construct absolute path by $SPLUNK_HOME and `parts`.

    Concatenate $SPLUNK_HOME and `parts` to an absolute path.
    For example, `parts` is ['etc', 'apps', 'Splunk_TA_test'],
    the return path will be $SPLUNK_HOME/etc/apps/Splunk_TA_test.
    Note: this function assumed SPLUNK_HOME is in environment varialbes.

    Arguments:
        parts: Path parts.

    Returns:
        Absolute path.

    Raises:
        ValueError: Escape from intended parent directories.
    """

    relpath = os.path.normpath(os.path.join(*parts))

    basepath = None
    shared_storage = _get_shared_storage()
    if shared_storage:
        for candidate in on_shared_storage:
            # SPL-100508 On windows if the path is missing the drive letter,
            # construct fullpath manually and call relpath
            if os.name == "nt" and not _verify_path_prefix(relpath, candidate):
                break

            if os.path.relpath(relpath, candidate)[0:2] != "..":
                basepath = shared_storage
                break

    if basepath is None:
        etc_with_trailing_sep = os.path.join(ETC_LEAF, "")
        if relpath == ETC_LEAF or relpath.startswith(etc_with_trailing_sep):
            # Redirect $SPLUNK_HOME/etc to $SPLUNK_ETC.
            basepath = _splunk_etc()
            # Remove leading etc (and path separator, if present). Note: when
            # emitting $SPLUNK_ETC exactly, with no additional path parts, we
            # set <relpath> to the empty string.
            relpath = relpath[4:]
        else:
            basepath = _splunk_home()

    fullpath = os.path.normpath(os.path.join(basepath, relpath))

    # Check that we haven't escaped from intended parent directories.
    if os.path.relpath(fullpath, basepath)[0:2] == "..":
        raise ValueError(
            f'Illegal escape from parent directory "{basepath}": {fullpath}'
        )
    return fullpath