Skip to content

conf_manager.py

This module contains simple interfaces for Splunk config file management, you can update/get/delete stanzas and encrypt/decrypt some fields of stanza automatically.

__all__ = ['ConfStanzaNotExistException', 'ConfFile', 'ConfManagerException', 'ConfManager'] module-attribute

ConfFile

Configuration file.

Source code in solnlib/conf_manager.py
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
class ConfFile:
    """Configuration file."""

    ENCRYPTED_TOKEN = "******"

    reserved_keys = ("userName", "appName")

    def __init__(
        self,
        name: str,
        conf: client.ConfigurationFile,
        session_key: str,
        app: str,
        owner: str = "nobody",
        scheme: str = None,
        host: str = None,
        port: int = None,
        realm: str = None,
        **context: dict,
    ):
        """Initializes ConfFile.

        Arguments:
            name: Configuration file name.
            conf: Configuration file object.
            session_key: Splunk access token.
            app: App name of namespace.
            owner: (optional) Owner of namespace, default is `nobody`.
            scheme: (optional) The access scheme, default is None.
            host: (optional) The host name, default is None.
            port: (optional) The port number, default is None.
            realm: (optional) Realm of credential, default is None.
            context: Other configurations for Splunk rest client.
        """
        self._name = name
        self._conf = conf
        self._session_key = session_key
        self._app = app
        self._owner = owner
        self._scheme = scheme
        self._host = host
        self._port = port
        self._context = context
        self._cred_manager = None
        # 'realm' is set to provided 'realm' argument otherwise as default
        # behaviour it is set to 'APP_NAME'.
        if realm is None:
            self._realm = self._app
        else:
            self._realm = realm

    @property
    @retry(exceptions=[binding.HTTPError])
    def _cred_mgr(self):
        if self._cred_manager is None:
            self._cred_manager = CredentialManager(
                self._session_key,
                self._app,
                owner=self._owner,
                realm=self._realm,
                scheme=self._scheme,
                host=self._host,
                port=self._port,
                **self._context,
            )

        return self._cred_manager

    def _filter_stanza(self, stanza):
        for k in self.reserved_keys:
            if k in stanza:
                del stanza[k]

        return stanza

    def _encrypt_stanza(self, stanza_name, stanza, encrypt_keys):
        if not encrypt_keys:
            return stanza

        encrypt_stanza_keys = [k for k in encrypt_keys if k in stanza]
        encrypt_fields = {key: stanza[key] for key in encrypt_stanza_keys}
        if not encrypt_fields:
            return stanza
        self._cred_mgr.set_password(stanza_name, json.dumps(encrypt_fields))

        for key in encrypt_stanza_keys:
            stanza[key] = self.ENCRYPTED_TOKEN

        return stanza

    def _decrypt_stanza(self, stanza_name, encrypted_stanza):
        encrypted_keys = [
            key
            for key in encrypted_stanza
            if encrypted_stanza[key] == self.ENCRYPTED_TOKEN
        ]
        if encrypted_keys:
            encrypted_fields = json.loads(self._cred_mgr.get_password(stanza_name))
            for key in encrypted_keys:
                encrypted_stanza[key] = encrypted_fields[key]

        return encrypted_stanza

    def _delete_stanza_creds(self, stanza_name):
        self._cred_mgr.delete_password(stanza_name)

    @retry(exceptions=[binding.HTTPError])
    def stanza_exist(self, stanza_name: str) -> bool:
        """Check whether stanza exists.

        Arguments:
            stanza_name: Stanza name.

        Returns:
            True if stanza exists else False.

        Examples:
           >>> from solnlib import conf_manager
           >>> cfm = conf_manager.ConfManager(session_key,
                                              'Splunk_TA_test')
           >>> conf = cfm.get_conf('test')
           >>> conf.stanza_exist('test_stanza')
        """

        try:
            self._conf.list(name=stanza_name)[0]
        except binding.HTTPError as e:
            if e.status != 404:
                raise

            return False

        return True

    @retry(exceptions=[binding.HTTPError])
    def get(self, stanza_name: str, only_current_app: bool = False) -> dict:
        """Get stanza from configuration file.

        Result is like:

            {
                'disabled': '0',
                'eai:appName': 'solnlib_demo',
                'eai:userName': 'nobody',
                'k1': '1',
                'k2': '2'
            }

        Arguments:
            stanza_name: Stanza name.
            only_current_app: Only include current app.

        Returns:
            Stanza.

        Raises:
            ConfStanzaNotExistException: If stanza does not exist.

        Examples:
           >>> from solnlib import conf_manager
           >>> cfm = conf_manager.ConfManager(session_key,
                                              'Splunk_TA_test')
           >>> conf = cfm.get_conf('test')
           >>> conf.get('test_stanza')
        """

        try:
            if only_current_app:
                stanza_mgrs = self._conf.list(
                    search="eai:acl.app={} name={}".format(
                        self._app, stanza_name.replace("=", r"\=")
                    )
                )
            else:
                stanza_mgrs = self._conf.list(name=stanza_name)
        except binding.HTTPError as e:
            if e.status != 404:
                raise

            raise ConfStanzaNotExistException(
                f"Stanza: {stanza_name} does not exist in {self._name}.conf"
            )

        if len(stanza_mgrs) == 0:
            raise ConfStanzaNotExistException(
                f"Stanza: {stanza_name} does not exist in {self._name}.conf"
            )

        stanza = self._decrypt_stanza(stanza_mgrs[0].name, stanza_mgrs[0].content)
        stanza["eai:access"] = stanza_mgrs[0].access
        stanza["eai:appName"] = stanza_mgrs[0].access.app
        return stanza

    @retry(exceptions=[binding.HTTPError])
    def get_all(self, only_current_app: bool = False) -> dict:
        """Get all stanzas from configuration file.

        Result is like:

            {
                'test':
                    {
                        'disabled': '0',
                        'eai:appName': 'solnlib_demo',
                        'eai:userName': 'nobody',
                        'k1': '1',
                        'k2': '2'
                    }
            }

        Arguments:
            only_current_app: Only include current app.

        Returns:
            Dict of stanzas.

        Examples:
           >>> from solnlib import conf_manager
           >>> cfm = conf_manager.ConfManager(session_key,
                                              'Splunk_TA_test')
           >>> conf = cfm.get_conf('test')
           >>> conf.get_all()
        """

        if only_current_app:
            stanza_mgrs = self._conf.list(search=f"eai:acl.app={self._app}")
        else:
            stanza_mgrs = self._conf.list()
        res = {}
        for stanza_mgr in stanza_mgrs:
            name = stanza_mgr.name
            key_values = self._decrypt_stanza(name, stanza_mgr.content)
            key_values["eai:access"] = stanza_mgr.access
            key_values["eai:appName"] = stanza_mgr.access.app
            res[name] = key_values
        return res

    @retry(exceptions=[binding.HTTPError])
    def update(self, stanza_name: str, stanza: dict, encrypt_keys: List[str] = None):
        """Update stanza.

        It will try to encrypt the credential automatically fist if
        encrypt_keys are not None else keep stanza untouched.

        Arguments:
            stanza_name: Stanza name.
            stanza: Stanza to update.
            encrypt_keys: Field names to encrypt.

        Examples:
           >>> from solnlib import conf_manager
           >>> cfm = conf_manager.ConfManager(session_key,
                                              'Splunk_TA_test')
           >>> conf = cfm.get_conf('test')
           >>> conf.update('test_stanza', {'k1': 1, 'k2': 2}, ['k1'])
        """

        stanza = self._filter_stanza(stanza)
        encrypted_stanza = self._encrypt_stanza(stanza_name, stanza, encrypt_keys)

        try:
            stanza_mgr = self._conf.list(name=stanza_name)[0]
        except binding.HTTPError as e:
            if e.status != 404:
                raise

            stanza_mgr = self._conf.create(stanza_name)

        stanza_mgr.submit(encrypted_stanza)

    @retry(exceptions=[binding.HTTPError])
    def delete(self, stanza_name: str):
        """Delete stanza.

        Arguments:
            stanza_name: Stanza name to delete.

        Raises:
            ConfStanzaNotExistException: If stanza does not exist.

        Examples:
           >>> from solnlib import conf_manager
           >>> cfm = conf_manager.ConfManager(session_key,
                                              'Splunk_TA_test')
           >>> conf = cfm.get_conf('test')
           >>> conf.delete('test_stanza')
        """

        try:
            self._cred_mgr.delete_password(stanza_name)
        except CredentialNotExistException:
            pass

        try:
            self._conf.delete(stanza_name)
        except KeyError:
            logging.error(
                "Delete stanza: %s error: %s.", stanza_name, traceback.format_exc()
            )
            raise ConfStanzaNotExistException(
                f"Stanza: {stanza_name} does not exist in {self._name}.conf"
            )

    @retry(exceptions=[binding.HTTPError])
    def reload(self):
        """Reload configuration file.

        Examples:
           >>> from solnlib import conf_manager
           >>> cfm = conf_manager.ConfManager(session_key,
                                              'Splunk_TA_test')
           >>> conf = cfm.get_conf('test')
           >>> conf.reload()
        """

        self._conf.get("_reload")

ENCRYPTED_TOKEN = '******' class-attribute instance-attribute

reserved_keys = ('userName', 'appName') class-attribute instance-attribute

__init__(name, conf, session_key, app, owner='nobody', scheme=None, host=None, port=None, realm=None, **context)

Initializes ConfFile.

Parameters:

Name Type Description Default
name str

Configuration file name.

required
conf client.ConfigurationFile

Configuration file object.

required
session_key str

Splunk access token.

required
app str

App name of namespace.

required
owner str

(optional) Owner of namespace, default is nobody.

'nobody'
scheme str

(optional) The access scheme, default is None.

None
host str

(optional) The host name, default is None.

None
port int

(optional) The port number, default is None.

None
realm str

(optional) Realm of credential, default is None.

None
context dict

Other configurations for Splunk rest client.

{}
Source code in solnlib/conf_manager.py
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
def __init__(
    self,
    name: str,
    conf: client.ConfigurationFile,
    session_key: str,
    app: str,
    owner: str = "nobody",
    scheme: str = None,
    host: str = None,
    port: int = None,
    realm: str = None,
    **context: dict,
):
    """Initializes ConfFile.

    Arguments:
        name: Configuration file name.
        conf: Configuration file object.
        session_key: Splunk access token.
        app: App name of namespace.
        owner: (optional) Owner of namespace, default is `nobody`.
        scheme: (optional) The access scheme, default is None.
        host: (optional) The host name, default is None.
        port: (optional) The port number, default is None.
        realm: (optional) Realm of credential, default is None.
        context: Other configurations for Splunk rest client.
    """
    self._name = name
    self._conf = conf
    self._session_key = session_key
    self._app = app
    self._owner = owner
    self._scheme = scheme
    self._host = host
    self._port = port
    self._context = context
    self._cred_manager = None
    # 'realm' is set to provided 'realm' argument otherwise as default
    # behaviour it is set to 'APP_NAME'.
    if realm is None:
        self._realm = self._app
    else:
        self._realm = realm

delete(stanza_name)

Delete stanza.

Parameters:

Name Type Description Default
stanza_name str

Stanza name to delete.

required

Raises:

Type Description
ConfStanzaNotExistException

If stanza does not exist.

Examples:

>>> from solnlib import conf_manager
>>> cfm = conf_manager.ConfManager(session_key,
                                   'Splunk_TA_test')
>>> conf = cfm.get_conf('test')
>>> conf.delete('test_stanza')
Source code in solnlib/conf_manager.py
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
@retry(exceptions=[binding.HTTPError])
def delete(self, stanza_name: str):
    """Delete stanza.

    Arguments:
        stanza_name: Stanza name to delete.

    Raises:
        ConfStanzaNotExistException: If stanza does not exist.

    Examples:
       >>> from solnlib import conf_manager
       >>> cfm = conf_manager.ConfManager(session_key,
                                          'Splunk_TA_test')
       >>> conf = cfm.get_conf('test')
       >>> conf.delete('test_stanza')
    """

    try:
        self._cred_mgr.delete_password(stanza_name)
    except CredentialNotExistException:
        pass

    try:
        self._conf.delete(stanza_name)
    except KeyError:
        logging.error(
            "Delete stanza: %s error: %s.", stanza_name, traceback.format_exc()
        )
        raise ConfStanzaNotExistException(
            f"Stanza: {stanza_name} does not exist in {self._name}.conf"
        )

get(stanza_name, only_current_app=False)

Get stanza from configuration file.

Result is like

{ ‘disabled’: ‘0’, ‘eai:appName’: ‘solnlib_demo’, ‘eai:userName’: ‘nobody’, ‘k1’: ‘1’, ‘k2’: ‘2’ }

Parameters:

Name Type Description Default
stanza_name str

Stanza name.

required
only_current_app bool

Only include current app.

False

Returns:

Type Description
dict

Stanza.

Raises:

Type Description
ConfStanzaNotExistException

If stanza does not exist.

Examples:

>>> from solnlib import conf_manager
>>> cfm = conf_manager.ConfManager(session_key,
                                   'Splunk_TA_test')
>>> conf = cfm.get_conf('test')
>>> conf.get('test_stanza')
Source code in solnlib/conf_manager.py
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
@retry(exceptions=[binding.HTTPError])
def get(self, stanza_name: str, only_current_app: bool = False) -> dict:
    """Get stanza from configuration file.

    Result is like:

        {
            'disabled': '0',
            'eai:appName': 'solnlib_demo',
            'eai:userName': 'nobody',
            'k1': '1',
            'k2': '2'
        }

    Arguments:
        stanza_name: Stanza name.
        only_current_app: Only include current app.

    Returns:
        Stanza.

    Raises:
        ConfStanzaNotExistException: If stanza does not exist.

    Examples:
       >>> from solnlib import conf_manager
       >>> cfm = conf_manager.ConfManager(session_key,
                                          'Splunk_TA_test')
       >>> conf = cfm.get_conf('test')
       >>> conf.get('test_stanza')
    """

    try:
        if only_current_app:
            stanza_mgrs = self._conf.list(
                search="eai:acl.app={} name={}".format(
                    self._app, stanza_name.replace("=", r"\=")
                )
            )
        else:
            stanza_mgrs = self._conf.list(name=stanza_name)
    except binding.HTTPError as e:
        if e.status != 404:
            raise

        raise ConfStanzaNotExistException(
            f"Stanza: {stanza_name} does not exist in {self._name}.conf"
        )

    if len(stanza_mgrs) == 0:
        raise ConfStanzaNotExistException(
            f"Stanza: {stanza_name} does not exist in {self._name}.conf"
        )

    stanza = self._decrypt_stanza(stanza_mgrs[0].name, stanza_mgrs[0].content)
    stanza["eai:access"] = stanza_mgrs[0].access
    stanza["eai:appName"] = stanza_mgrs[0].access.app
    return stanza

get_all(only_current_app=False)

Get all stanzas from configuration file.

Result is like

{ ‘test’: { ‘disabled’: ‘0’, ‘eai:appName’: ‘solnlib_demo’, ‘eai:userName’: ‘nobody’, ‘k1’: ‘1’, ‘k2’: ‘2’ } }

Parameters:

Name Type Description Default
only_current_app bool

Only include current app.

False

Returns:

Type Description
dict

Dict of stanzas.

Examples:

>>> from solnlib import conf_manager
>>> cfm = conf_manager.ConfManager(session_key,
                                   'Splunk_TA_test')
>>> conf = cfm.get_conf('test')
>>> conf.get_all()
Source code in solnlib/conf_manager.py
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
@retry(exceptions=[binding.HTTPError])
def get_all(self, only_current_app: bool = False) -> dict:
    """Get all stanzas from configuration file.

    Result is like:

        {
            'test':
                {
                    'disabled': '0',
                    'eai:appName': 'solnlib_demo',
                    'eai:userName': 'nobody',
                    'k1': '1',
                    'k2': '2'
                }
        }

    Arguments:
        only_current_app: Only include current app.

    Returns:
        Dict of stanzas.

    Examples:
       >>> from solnlib import conf_manager
       >>> cfm = conf_manager.ConfManager(session_key,
                                          'Splunk_TA_test')
       >>> conf = cfm.get_conf('test')
       >>> conf.get_all()
    """

    if only_current_app:
        stanza_mgrs = self._conf.list(search=f"eai:acl.app={self._app}")
    else:
        stanza_mgrs = self._conf.list()
    res = {}
    for stanza_mgr in stanza_mgrs:
        name = stanza_mgr.name
        key_values = self._decrypt_stanza(name, stanza_mgr.content)
        key_values["eai:access"] = stanza_mgr.access
        key_values["eai:appName"] = stanza_mgr.access.app
        res[name] = key_values
    return res

reload()

Reload configuration file.

Examples:

>>> from solnlib import conf_manager
>>> cfm = conf_manager.ConfManager(session_key,
                                   'Splunk_TA_test')
>>> conf = cfm.get_conf('test')
>>> conf.reload()
Source code in solnlib/conf_manager.py
349
350
351
352
353
354
355
356
357
358
359
360
361
@retry(exceptions=[binding.HTTPError])
def reload(self):
    """Reload configuration file.

    Examples:
       >>> from solnlib import conf_manager
       >>> cfm = conf_manager.ConfManager(session_key,
                                          'Splunk_TA_test')
       >>> conf = cfm.get_conf('test')
       >>> conf.reload()
    """

    self._conf.get("_reload")

stanza_exist(stanza_name)

Check whether stanza exists.

Parameters:

Name Type Description Default
stanza_name str

Stanza name.

required

Returns:

Type Description
bool

True if stanza exists else False.

Examples:

>>> from solnlib import conf_manager
>>> cfm = conf_manager.ConfManager(session_key,
                                   'Splunk_TA_test')
>>> conf = cfm.get_conf('test')
>>> conf.stanza_exist('test_stanza')
Source code in solnlib/conf_manager.py
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
@retry(exceptions=[binding.HTTPError])
def stanza_exist(self, stanza_name: str) -> bool:
    """Check whether stanza exists.

    Arguments:
        stanza_name: Stanza name.

    Returns:
        True if stanza exists else False.

    Examples:
       >>> from solnlib import conf_manager
       >>> cfm = conf_manager.ConfManager(session_key,
                                          'Splunk_TA_test')
       >>> conf = cfm.get_conf('test')
       >>> conf.stanza_exist('test_stanza')
    """

    try:
        self._conf.list(name=stanza_name)[0]
    except binding.HTTPError as e:
        if e.status != 404:
            raise

        return False

    return True

update(stanza_name, stanza, encrypt_keys=None)

Update stanza.

It will try to encrypt the credential automatically fist if encrypt_keys are not None else keep stanza untouched.

Parameters:

Name Type Description Default
stanza_name str

Stanza name.

required
stanza dict

Stanza to update.

required
encrypt_keys List[str]

Field names to encrypt.

None

Examples:

>>> from solnlib import conf_manager
>>> cfm = conf_manager.ConfManager(session_key,
                                   'Splunk_TA_test')
>>> conf = cfm.get_conf('test')
>>> conf.update('test_stanza', {'k1': 1, 'k2': 2}, ['k1'])
Source code in solnlib/conf_manager.py
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
@retry(exceptions=[binding.HTTPError])
def update(self, stanza_name: str, stanza: dict, encrypt_keys: List[str] = None):
    """Update stanza.

    It will try to encrypt the credential automatically fist if
    encrypt_keys are not None else keep stanza untouched.

    Arguments:
        stanza_name: Stanza name.
        stanza: Stanza to update.
        encrypt_keys: Field names to encrypt.

    Examples:
       >>> from solnlib import conf_manager
       >>> cfm = conf_manager.ConfManager(session_key,
                                          'Splunk_TA_test')
       >>> conf = cfm.get_conf('test')
       >>> conf.update('test_stanza', {'k1': 1, 'k2': 2}, ['k1'])
    """

    stanza = self._filter_stanza(stanza)
    encrypted_stanza = self._encrypt_stanza(stanza_name, stanza, encrypt_keys)

    try:
        stanza_mgr = self._conf.list(name=stanza_name)[0]
    except binding.HTTPError as e:
        if e.status != 404:
            raise

        stanza_mgr = self._conf.create(stanza_name)

    stanza_mgr.submit(encrypted_stanza)

ConfManager

Configuration file manager.

Examples:

>>> from solnlib import conf_manager
>>> cfm = conf_manager.ConfManager(session_key,
                                  'Splunk_TA_test')

Examples:

If stanza in passwords.conf is formatted as below:

credential:__REST_CREDENTIAL__#Splunk_TA_test#configs/conf-CONF_FILENAME:STANZA_NAME``splunk_cred_sep``1:

>>> from solnlib import conf_manager
>>> cfm = conf_manager.ConfManager(
        session_key,
        'Splunk_TA_test',
        realm='__REST_CREDENTIAL__#Splunk_TA_test#configs/conf-CONF_FILENAME'
    )
Source code in solnlib/conf_manager.py
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
class ConfManager:
    """Configuration file manager.

    Examples:

        >>> from solnlib import conf_manager
        >>> cfm = conf_manager.ConfManager(session_key,
                                          'Splunk_TA_test')

    Examples:
        If stanza in passwords.conf is formatted as below:

        `credential:__REST_CREDENTIAL__#Splunk_TA_test#configs/conf-CONF_FILENAME:STANZA_NAME``splunk_cred_sep``1:`

        >>> from solnlib import conf_manager
        >>> cfm = conf_manager.ConfManager(
                session_key,
                'Splunk_TA_test',
                realm='__REST_CREDENTIAL__#Splunk_TA_test#configs/conf-CONF_FILENAME'
            )
    """

    def __init__(
        self,
        session_key: str,
        app: str,
        owner: str = "nobody",
        scheme: str = None,
        host: str = None,
        port: int = None,
        realm: str = None,
        **context: dict,
    ):
        """Initializes ConfManager.

        Arguments:
            session_key: Splunk access token.
            app: App name of namespace.
            owner: (optional) Owner of namespace, default is `nobody`.
            scheme: (optional) The access scheme, default is None.
            host: (optional) The host name, default is None.
            port: (optional) The port number, default is None.
            realm: (optional) Realm of credential, default is None.
            context: Other configurations for Splunk rest client.
        """
        self._session_key = session_key
        self._app = app
        self._owner = owner
        self._scheme = scheme
        self._host = host
        self._port = port
        self._context = context
        self._rest_client = rest_client.SplunkRestClient(
            self._session_key,
            self._app,
            owner=self._owner,
            scheme=self._scheme,
            host=self._host,
            port=self._port,
            **self._context,
        )
        self._confs = None
        self._realm = realm

    @retry(exceptions=[binding.HTTPError])
    def get_conf(self, name: str, refresh: bool = False) -> ConfFile:
        """Get conf file.

        Arguments:
            name: Conf file name.
            refresh: (optional) Flag to refresh conf file list, default is False.

        Returns:
            Conf file object.

        Raises:
            ConfManagerException: If `conf_file` does not exist.
        """

        if self._confs is None or refresh:
            # Fix bug that can't pass `-` as app name.
            curr_app = self._rest_client.namespace.app
            self._rest_client.namespace.app = "dummy"
            self._confs = self._rest_client.confs
            self._rest_client.namespace.app = curr_app

        try:
            conf = self._confs[name]
        except KeyError:
            raise ConfManagerException(f"Config file: {name} does not exist.")

        return ConfFile(
            name,
            conf,
            self._session_key,
            self._app,
            self._owner,
            self._scheme,
            self._host,
            self._port,
            self._realm,
            **self._context,
        )

    @retry(exceptions=[binding.HTTPError])
    def create_conf(self, name: str) -> ConfFile:
        """Create conf file.

        Arguments:
            name: Conf file name.

        Returns:
            Conf file object.
        """

        if self._confs is None:
            self._confs = self._rest_client.confs

        conf = self._confs.create(name)
        return ConfFile(
            name,
            conf,
            self._session_key,
            self._app,
            self._owner,
            self._scheme,
            self._host,
            self._port,
            self._realm,
            **self._context,
        )

__init__(session_key, app, owner='nobody', scheme=None, host=None, port=None, realm=None, **context)

Initializes ConfManager.

Parameters:

Name Type Description Default
session_key str

Splunk access token.

required
app str

App name of namespace.

required
owner str

(optional) Owner of namespace, default is nobody.

'nobody'
scheme str

(optional) The access scheme, default is None.

None
host str

(optional) The host name, default is None.

None
port int

(optional) The port number, default is None.

None
realm str

(optional) Realm of credential, default is None.

None
context dict

Other configurations for Splunk rest client.

{}
Source code in solnlib/conf_manager.py
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
def __init__(
    self,
    session_key: str,
    app: str,
    owner: str = "nobody",
    scheme: str = None,
    host: str = None,
    port: int = None,
    realm: str = None,
    **context: dict,
):
    """Initializes ConfManager.

    Arguments:
        session_key: Splunk access token.
        app: App name of namespace.
        owner: (optional) Owner of namespace, default is `nobody`.
        scheme: (optional) The access scheme, default is None.
        host: (optional) The host name, default is None.
        port: (optional) The port number, default is None.
        realm: (optional) Realm of credential, default is None.
        context: Other configurations for Splunk rest client.
    """
    self._session_key = session_key
    self._app = app
    self._owner = owner
    self._scheme = scheme
    self._host = host
    self._port = port
    self._context = context
    self._rest_client = rest_client.SplunkRestClient(
        self._session_key,
        self._app,
        owner=self._owner,
        scheme=self._scheme,
        host=self._host,
        port=self._port,
        **self._context,
    )
    self._confs = None
    self._realm = realm

create_conf(name)

Create conf file.

Parameters:

Name Type Description Default
name str

Conf file name.

required

Returns:

Type Description
ConfFile

Conf file object.

Source code in solnlib/conf_manager.py
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
@retry(exceptions=[binding.HTTPError])
def create_conf(self, name: str) -> ConfFile:
    """Create conf file.

    Arguments:
        name: Conf file name.

    Returns:
        Conf file object.
    """

    if self._confs is None:
        self._confs = self._rest_client.confs

    conf = self._confs.create(name)
    return ConfFile(
        name,
        conf,
        self._session_key,
        self._app,
        self._owner,
        self._scheme,
        self._host,
        self._port,
        self._realm,
        **self._context,
    )

get_conf(name, refresh=False)

Get conf file.

Parameters:

Name Type Description Default
name str

Conf file name.

required
refresh bool

(optional) Flag to refresh conf file list, default is False.

False

Returns:

Type Description
ConfFile

Conf file object.

Raises:

Type Description
ConfManagerException

If conf_file does not exist.

Source code in solnlib/conf_manager.py
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
@retry(exceptions=[binding.HTTPError])
def get_conf(self, name: str, refresh: bool = False) -> ConfFile:
    """Get conf file.

    Arguments:
        name: Conf file name.
        refresh: (optional) Flag to refresh conf file list, default is False.

    Returns:
        Conf file object.

    Raises:
        ConfManagerException: If `conf_file` does not exist.
    """

    if self._confs is None or refresh:
        # Fix bug that can't pass `-` as app name.
        curr_app = self._rest_client.namespace.app
        self._rest_client.namespace.app = "dummy"
        self._confs = self._rest_client.confs
        self._rest_client.namespace.app = curr_app

    try:
        conf = self._confs[name]
    except KeyError:
        raise ConfManagerException(f"Config file: {name} does not exist.")

    return ConfFile(
        name,
        conf,
        self._session_key,
        self._app,
        self._owner,
        self._scheme,
        self._host,
        self._port,
        self._realm,
        **self._context,
    )

ConfManagerException

Bases: Exception

Exception raised by ConfManager class.

Source code in solnlib/conf_manager.py
364
365
366
367
class ConfManagerException(Exception):
    """Exception raised by ConfManager class."""

    pass

ConfStanzaNotExistException

Bases: Exception

Exception raised by ConfFile class.

Source code in solnlib/conf_manager.py
40
41
42
43
class ConfStanzaNotExistException(Exception):
    """Exception raised by ConfFile class."""

    pass

get_log_level(*, logger, session_key, app_name, conf_name, log_stanza='logging', log_level_field='loglevel', default_log_level='INFO')

This function returns the log level for the addon from configuration file.

Parameters:

Name Type Description Default
logger logging.Logger

Logger.

required
session_key str

Splunk access token.

required
app_name str

Add-on name.

required
conf_name str

Configuration file name where logging stanza is.

required
log_stanza str

Logging stanza to define log_level_field and its value.

'logging'
log_level_field str

Logging level field name under logging stanza.

'loglevel'
default_log_level str

Default log level to return in case of errors.

'INFO'

Returns:

Type Description
str

Log level defined under logging.log_level_field field in conf_name

str

file. In case of any error, default_log_level will be returned.

Examples:

>>> from solnlib import conf_manager
>>> log_level = conf_manager.get_log_level(
>>>     logger,
>>>     "session_key",
>>>     "ADDON_NAME",
>>>     "splunk_ta_addon_settings",
>>> )
Source code in solnlib/conf_manager.py
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
def get_log_level(
    *,
    logger: logging.Logger,
    session_key: str,
    app_name: str,
    conf_name: str,
    log_stanza: str = "logging",
    log_level_field: str = "loglevel",
    default_log_level: str = "INFO",
) -> str:
    """This function returns the log level for the addon from configuration
    file.

    Arguments:
        logger: Logger.
        session_key: Splunk access token.
        app_name: Add-on name.
        conf_name: Configuration file name where logging stanza is.
        log_stanza: Logging stanza to define `log_level_field` and its value.
        log_level_field: Logging level field name under logging stanza.
        default_log_level: Default log level to return in case of errors.

    Returns:
        Log level defined under `logging.log_level_field` field in `conf_name`
        file. In case of any error, `default_log_level` will be returned.

    Examples:
        >>> from solnlib import conf_manager
        >>> log_level = conf_manager.get_log_level(
        >>>     logger,
        >>>     "session_key",
        >>>     "ADDON_NAME",
        >>>     "splunk_ta_addon_settings",
        >>> )
    """
    try:
        cfm = ConfManager(
            session_key,
            app_name,
            realm=f"__REST_CREDENTIAL__#{app_name}#configs/conf-{conf_name}",
        )
        conf = cfm.get_conf(conf_name)
    except ConfManagerException:
        logger.error(
            f"Failed to fetch configuration file {conf_name}, "
            f"taking {default_log_level} as log level."
        )
        return default_log_level
    try:
        logging_details = conf.get(log_stanza)
        return logging_details.get(log_level_field, default_log_level)
    except ConfStanzaNotExistException:
        logger.error(
            f'"logging" stanza does not exist under {conf_name}, '
            f"taking {default_log_level} as log level."
        )
        return default_log_level