Skip to content

user_access.py

Splunk user access control related utilities.

__all__ = ['ObjectACLException', 'ObjectACL', 'ObjectACLManagerException', 'ObjectACLManager', 'AppCapabilityManagerException', 'AppCapabilityManager', 'UserAccessException', 'check_user_access', 'InvalidSessionKeyException', 'get_current_username', 'UserNotExistException', 'get_user_capabilities', 'user_is_capable', 'get_user_roles'] module-attribute

AppCapabilityManager

App capability manager.

Examples:

>>> from solnlib import user_access
>>> acm = user_access.AppCapabilityManager('test_collection',
                                           session_key,
                                           'Splunk_TA_test')
>>> acm.register_capabilities(...)
>>> acm.unregister_capabilities(...)
Source code in solnlib/user_access.py
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
class AppCapabilityManager:
    """App capability manager.

    Keeps a single record, keyed by the app name, that holds the app's
    capabilities. The record lives in the collection named
    `<app>_<collection_name>`.

    Examples:
       >>> from solnlib import user_access
       >>> acm = user_access.AppCapabilityManager('test_collection',
                                                  session_key,
                                                  'Splunk_TA_test')
       >>> acm.register_capabilities(...)
       >>> acm.unregister_capabilities(...)
    """

    def __init__(
        self,
        collection_name: str,
        session_key: str,
        app: str,
        owner: str = "nobody",
        scheme: Optional[str] = None,
        host: Optional[str] = None,
        port: Optional[int] = None,
        **context: dict,
    ):
        """Initializes AppCapabilityManager.

        Arguments:
            collection_name: Collection name to store capabilities. It is
                prefixed with `<app>_` before the collection is looked up.
            session_key: Splunk access token.
            app: App name of namespace.
            owner: (optional) Owner of namespace, default is `nobody`.
            scheme: (optional) The access scheme, default is None.
            host: (optional) The host name, default is None.
            port: (optional) The port number, default is None.
            context: Other configurations for Splunk rest client.

        Raises:
            AppCapabilityManagerException: If init AppCapabilityManager failed.
        """
        self._app = app

        collection_name = f"{app}_{collection_name}"
        try:
            # NOTE(review): the eighth positional argument is passed as None;
            # confirm its meaning against _utils.get_collection_data.
            self._collection_data = _utils.get_collection_data(
                collection_name,
                session_key,
                app,
                owner,
                scheme,
                host,
                port,
                None,
                **context,
            )
        except KeyError as e:
            # Chain explicitly so the original lookup failure stays visible
            # as the cause of the manager-level exception.
            raise AppCapabilityManagerException(
                f"Get app capabilities collection: {collection_name} failed."
            ) from e

    @utils.retry(exceptions=[binding.HTTPError])
    def register_capabilities(self, capabilities: dict):
        """Register app capabilities.

        The capabilities are saved in a record whose `_key` is the app name.

        Arguments:
            capabilities: App capabilities, example:

                {
                    'object_type1': {
                        'read': 'read_app_object_type1',
                        'write': 'write_app_object_type1',
                        'delete': 'delete_app_object_type1'
                    },
                    'object_type2': {
                        'read': 'read_app_object_type2',
                        'write': 'write_app_object_type2',
                        'delete': 'delete_app_object_type2'
                    },
                    ...
                }
        """

        record = {"_key": self._app, "capabilities": capabilities}
        self._collection_data.batch_save(record)

    @utils.retry(exceptions=[binding.HTTPError])
    def unregister_capabilities(self):
        """Unregister app capabilities.

        Raises:
            AppCapabilityNotExistException: If app capabilities are not registered.
        """

        try:
            self._collection_data.delete_by_id(self._app)
        except binding.HTTPError as e:
            # Only a 404 means "no record for this app"; anything else is
            # propagated unchanged.
            if e.status != 404:
                raise

            raise AppCapabilityNotExistException(
                f"App capabilities for {self._app} have not been registered."
            ) from e

    @utils.retry(exceptions=[binding.HTTPError])
    def capabilities_are_registered(self) -> bool:
        """Check if app capabilities are registered.

        Returns:
            True if app capabilities are registered else False.
        """

        try:
            self._collection_data.query_by_id(self._app)
        except binding.HTTPError as e:
            # A 404 is the expected "not registered" answer, not an error.
            if e.status != 404:
                raise

            return False

        return True

    @utils.retry(exceptions=[binding.HTTPError])
    def get_capabilities(self) -> dict:
        """Get app capabilities.

        Returns:
            App capabilities, i.e. the `capabilities` field of the app record.

        Raises:
             AppCapabilityNotExistException: If app capabilities are not registered.
        """

        try:
            record = self._collection_data.query_by_id(self._app)
        except binding.HTTPError as e:
            # Only a 404 means "no record for this app"; anything else is
            # propagated unchanged.
            if e.status != 404:
                raise

            raise AppCapabilityNotExistException(
                f"App capabilities for {self._app} have not been registered."
            ) from e

        return record["capabilities"]

__init__(collection_name, session_key, app, owner='nobody', scheme=None, host=None, port=None, **context)

Initializes AppCapabilityManager.

Parameters:

Name Type Description Default
collection_name str

Collection name to store capabilities.

required
session_key str

Splunk access token.

required
app str

App name of namespace.

required
owner str

(optional) Owner of namespace, default is nobody.

'nobody'
scheme str

(optional) The access scheme, default is None.

None
host str

(optional) The host name, default is None.

None
port int

(optional) The port number, default is None.

None
context dict

Other configurations for Splunk rest client.

{}

Raises:

Type Description
AppCapabilityManagerException

If init AppCapabilityManager failed.

Source code in solnlib/user_access.py
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
def __init__(
    self,
    collection_name: str,
    session_key: str,
    app: str,
    owner: str = "nobody",
    scheme: Optional[str] = None,
    host: Optional[str] = None,
    port: Optional[int] = None,
    **context: dict,
):
    """Initializes AppCapabilityManager.

    Arguments:
        collection_name: Collection name to store capabilities. It is
            prefixed with `<app>_` before the collection is looked up.
        session_key: Splunk access token.
        app: App name of namespace.
        owner: (optional) Owner of namespace, default is `nobody`.
        scheme: (optional) The access scheme, default is None.
        host: (optional) The host name, default is None.
        port: (optional) The port number, default is None.
        context: Other configurations for Splunk rest client.

    Raises:
        AppCapabilityManagerException: If init AppCapabilityManager failed.
    """
    self._app = app

    collection_name = f"{app}_{collection_name}"
    try:
        # NOTE(review): the eighth positional argument is passed as None;
        # confirm its meaning against _utils.get_collection_data.
        self._collection_data = _utils.get_collection_data(
            collection_name,
            session_key,
            app,
            owner,
            scheme,
            host,
            port,
            None,
            **context,
        )
    except KeyError as e:
        # Chain explicitly so the original lookup failure stays visible as
        # the cause of the manager-level exception.
        raise AppCapabilityManagerException(
            f"Get app capabilities collection: {collection_name} failed."
        ) from e

capabilities_are_registered()

Check if app capabilities are registered.

Returns:

Type Description
bool

True if app capabilities are registered else False.

Source code in solnlib/user_access.py
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
@utils.retry(exceptions=[binding.HTTPError])
def capabilities_are_registered(self) -> bool:
    """Tell whether a capabilities record exists for this app.

    Returns:
        True when the record for the app can be queried, False when the
        query answers with HTTP 404. Any other HTTP error is re-raised.
    """

    registered = True
    try:
        self._collection_data.query_by_id(self._app)
    except binding.HTTPError as err:
        if err.status == 404:
            # 404 is the expected "nothing registered" answer.
            registered = False
        else:
            raise

    return registered

get_capabilities()

Get app capabilities.

Returns:

Type Description
dict

App capabilities.

Raises:

Type Description
AppCapabilityNotExistException

If app capabilities are not registered.

Source code in solnlib/user_access.py
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
@utils.retry(exceptions=[binding.HTTPError])
def get_capabilities(self) -> dict:
    """Get app capabilities.

    Returns:
        App capabilities, i.e. the `capabilities` field of the app record.

    Raises:
         AppCapabilityNotExistException: If app capabilities are not registered.
    """

    try:
        record = self._collection_data.query_by_id(self._app)
    except binding.HTTPError as e:
        # Only a 404 means "no record for this app"; anything else is
        # propagated unchanged.
        if e.status != 404:
            raise

        # Chain explicitly so the underlying HTTP error is kept as the cause.
        raise AppCapabilityNotExistException(
            f"App capabilities for {self._app} have not been registered."
        ) from e

    return record["capabilities"]

register_capabilities(capabilities)

Register app capabilities.

Parameters:

Name Type Description Default
capabilities dict

App capabilities, example:

{
    'object_type1': {
        'read': 'read_app_object_type1',
        'write': 'write_app_object_type1',
        'delete': 'delete_app_object_type1'
    },
    'object_type2': {
        'read': 'read_app_object_type2',
        'write': 'write_app_object_type2',
        'delete': 'delete_app_object_type2'
    },
    ...
}

required
Source code in solnlib/user_access.py
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
@utils.retry(exceptions=[binding.HTTPError])
def register_capabilities(self, capabilities: dict):
    """Save the capabilities of this app.

    The capabilities are stored in a record whose `_key` is the app name.

    Arguments:
        capabilities: App capabilities, example:

            {
                'object_type1': {
                    'read': 'read_app_object_type1',
                    'write': 'write_app_object_type1',
                    'delete': 'delete_app_object_type1'
                },
                'object_type2': {
                    'read': 'read_app_object_type2',
                    'write': 'write_app_object_type2',
                    'delete': 'delete_app_object_type2'
                },
                ...
            }
    """

    self._collection_data.batch_save(
        {"_key": self._app, "capabilities": capabilities}
    )

unregister_capabilities()

Unregister app capabilities.

Raises:

Type Description
AppCapabilityNotExistException

If app capabilities are not registered.

Source code in solnlib/user_access.py
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
@utils.retry(exceptions=[binding.HTTPError])
def unregister_capabilities(self):
    """Unregister app capabilities.

    Deletes the record keyed by the app name.

    Raises:
        AppCapabilityNotExistException: If app capabilities are not registered.
    """

    try:
        self._collection_data.delete_by_id(self._app)
    except binding.HTTPError as e:
        # Only a 404 means "no record for this app"; anything else is
        # propagated unchanged.
        if e.status != 404:
            raise

        # Chain explicitly so the underlying HTTP error is kept as the cause.
        raise AppCapabilityNotExistException(
            f"App capabilities for {self._app} have not been registered."
        ) from e

AppCapabilityManagerException

Bases: Exception

Exception for AppCapabilityManager.

Source code in solnlib/user_access.py
542
543
544
545
class AppCapabilityManagerException(Exception):
    """Error type used by AppCapabilityManager."""

AppCapabilityNotExistException

Bases: Exception

Exception for the situation when AppCapability does not exist for a specific app.

Source code in solnlib/user_access.py
548
549
550
551
552
class AppCapabilityNotExistException(Exception):
    """Error type for the case where no AppCapability exists for a specific
    app."""

InvalidSessionKeyException

Bases: Exception

Exception when Splunk session key is invalid.

Source code in solnlib/user_access.py
773
774
775
776
class InvalidSessionKeyException(Exception):
    """Error type signalling an invalid Splunk session key."""

ObjectACL

Object ACL record.

Examples:

>>> from solnlib import user_access
>>> obj_acl = user_access.ObjectACL(
>>>    'test_collection',
>>>    '9defa6f510d711e6be16a45e60e34295',
>>>    'test_object',
>>>    'Splunk_TA_test',
>>>    'admin',
>>>    {'read': ['*'], 'write': ['admin'], 'delete': ['admin']},
>>>    False)
Source code in solnlib/user_access.py
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
class ObjectACL:
    """Object ACL record.

    Holds the collection, id, type, app, owner, perms and the
    shared-by-inclusion flag of one object, and converts to and from the
    dict form returned by `record`.

    Examples:
       >>> from solnlib import user_access
       >>> obj_acl = user_access.ObjectACL(
       ...    'test_collection',
       ...    '9defa6f510d711e6be16a45e60e34295',
       ...    'test_object',
       ...    'Splunk_TA_test',
       ...    'admin',
       ...    {'read': ['*'], 'write': ['admin'], 'delete': ['admin']},
       ...    False)
    """

    # Field names used in the dict form of an ACL record.
    OBJ_COLLECTION_KEY = "obj_collection"
    OBJ_ID_KEY = "obj_id"
    OBJ_TYPE_KEY = "obj_type"
    OBJ_APP_KEY = "obj_app"
    OBJ_OWNER_KEY = "obj_owner"
    OBJ_PERMS_KEY = "obj_perms"
    # Keys that every perms dict must contain (see `_check_perms`).
    OBJ_PERMS_READ_KEY = "read"
    OBJ_PERMS_WRITE_KEY = "write"
    OBJ_PERMS_DELETE_KEY = "delete"
    # Wildcard entry; `merge` collapses a perm list to just this value.
    OBJ_PERMS_ALLOW_ALL = "*"
    OBJ_SHARED_BY_INCLUSION_KEY = "obj_shared_by_inclusion"

    def __init__(
        self,
        obj_collection: str,
        obj_id: str,
        obj_type: str,
        obj_app: str,
        obj_owner: str,
        obj_perms: dict,
        obj_shared_by_inclusion: bool,
    ):
        """Initializes ObjectACL.

        Arguments:
            obj_collection: Collection where object currently stored.
            obj_id: ID of this object.
            obj_type: Type of this object.
            obj_app: App of this object.
            obj_owner: Owner of this object.
            obj_perms: Object perms, like: {'read': ['*'], 'write': ['admin'], 'delete': ['admin']}.
            obj_shared_by_inclusion: Flag of object is shared by inclusion.

        Raises:
            ObjectACLException: If `obj_perms` is not a dict or lacks the
                read, write or delete key.
        """
        self.obj_collection = obj_collection
        self.obj_id = obj_id
        self.obj_type = obj_type
        self.obj_app = obj_app
        self.obj_owner = obj_owner
        self._check_perms(obj_perms)
        self._obj_perms = obj_perms
        self.obj_shared_by_inclusion = obj_shared_by_inclusion

    @classmethod
    def _check_perms(cls, obj_perms):
        """Validate a perms value.

        Raises:
            ObjectACLException: If `obj_perms` is not a dict or lacks the
                read, write or delete key.
        """
        if not isinstance(obj_perms, dict):
            raise ObjectACLException(
                f"Invalid object acl perms type: {type(obj_perms)}, should be a dict."
            )

        required_keys = (
            cls.OBJ_PERMS_READ_KEY,
            cls.OBJ_PERMS_WRITE_KEY,
            cls.OBJ_PERMS_DELETE_KEY,
        )
        if not all(key in obj_perms for key in required_keys):
            raise ObjectACLException(
                f"Invalid object acl perms: {obj_perms}, "
                "should include read, write and delete perms."
            )

    @property
    def obj_perms(self):
        """Object perms dict; assignments are validated by `_check_perms`."""
        return self._obj_perms

    @obj_perms.setter
    def obj_perms(self, obj_perms):
        self._check_perms(obj_perms)
        self._obj_perms = obj_perms

    @property
    def record(self) -> dict:
        """Get object acl record.

        Returns: Object acl record, like:

            {
                '_key': 'test_collection_1234',
                'obj_collection': 'test_collection',
                'obj_id': '1234',
                'obj_type': 'test_object',
                'obj_app': 'Splunk_TA_test',
                'obj_owner': 'admin',
                'obj_perms': {'read': ['*'], 'write': ['admin'], 'delete': ['admin']},
                'obj_shared_by_inclusion': True
            }
        """

        return {
            "_key": self.generate_key(self.obj_collection, self.obj_id),
            self.OBJ_COLLECTION_KEY: self.obj_collection,
            self.OBJ_ID_KEY: self.obj_id,
            self.OBJ_TYPE_KEY: self.obj_type,
            self.OBJ_APP_KEY: self.obj_app,
            self.OBJ_OWNER_KEY: self.obj_owner,
            self.OBJ_PERMS_KEY: self._obj_perms,
            self.OBJ_SHARED_BY_INCLUSION_KEY: self.obj_shared_by_inclusion,
        }

    @staticmethod
    def generate_key(obj_collection: str, obj_id: str) -> str:
        """Generate object acl record key.

        Arguments:
            obj_collection: Collection where object currently stored.
            obj_id: ID of this object.

        Returns:
            Object acl record key, `<obj_collection>_<obj_id>`.
        """

        return f"{obj_collection}_{obj_id}"

    @staticmethod
    def parse(obj_acl_record: dict) -> "ObjectACL":
        """Parse object acl record and construct a new `ObjectACL` object from
        it.

        Arguments:
            obj_acl_record: Object acl record.

        Returns:
            New `ObjectACL` object.

        Raises:
            KeyError: If a field is missing from `obj_acl_record`.
        """

        return ObjectACL(
            obj_acl_record[ObjectACL.OBJ_COLLECTION_KEY],
            obj_acl_record[ObjectACL.OBJ_ID_KEY],
            obj_acl_record[ObjectACL.OBJ_TYPE_KEY],
            obj_acl_record[ObjectACL.OBJ_APP_KEY],
            obj_acl_record[ObjectACL.OBJ_OWNER_KEY],
            obj_acl_record[ObjectACL.OBJ_PERMS_KEY],
            obj_acl_record[ObjectACL.OBJ_SHARED_BY_INCLUSION_KEY],
        )

    def merge(self, obj_acl: "ObjectACL"):
        """Merge current object perms with perms of `obj_acl`.

        For every perm key of this object the two entry lists are unioned,
        keeping first-seen order (own entries first) so the result is
        deterministic. A perm key that `obj_acl` does not carry is treated as
        an empty list. If the union contains the allow-all entry, the list
        collapses to just that entry.

        Arguments:
            obj_acl: Object acl to merge.
        """

        # Snapshot the keys: values are reassigned while looping.
        for perm_key in list(self._obj_perms):
            own_entries = self._obj_perms[perm_key]
            other_entries = obj_acl._obj_perms.get(perm_key, [])
            # dict.fromkeys de-duplicates while preserving insertion order.
            merged = list(dict.fromkeys([*own_entries, *other_entries]))
            if self.OBJ_PERMS_ALLOW_ALL in merged:
                merged = [self.OBJ_PERMS_ALLOW_ALL]
            self._obj_perms[perm_key] = merged

    def __str__(self):
        """Return the record serialized as a JSON string."""
        return json.dumps(self.record)

OBJ_APP_KEY = 'obj_app' class-attribute instance-attribute

OBJ_COLLECTION_KEY = 'obj_collection' class-attribute instance-attribute

OBJ_ID_KEY = 'obj_id' class-attribute instance-attribute

OBJ_OWNER_KEY = 'obj_owner' class-attribute instance-attribute

OBJ_PERMS_ALLOW_ALL = '*' class-attribute instance-attribute

OBJ_PERMS_DELETE_KEY = 'delete' class-attribute instance-attribute

OBJ_PERMS_KEY = 'obj_perms' class-attribute instance-attribute

OBJ_PERMS_READ_KEY = 'read' class-attribute instance-attribute

OBJ_PERMS_WRITE_KEY = 'write' class-attribute instance-attribute

OBJ_SHARED_BY_INCLUSION_KEY = 'obj_shared_by_inclusion' class-attribute instance-attribute

OBJ_TYPE_KEY = 'obj_type' class-attribute instance-attribute

obj_app = obj_app instance-attribute

obj_collection = obj_collection instance-attribute

obj_id = obj_id instance-attribute

obj_owner = obj_owner instance-attribute

obj_perms writable property

obj_shared_by_inclusion = obj_shared_by_inclusion instance-attribute

obj_type = obj_type instance-attribute

record: dict property

Get object acl record.

Returns:

Type Description
dict

Object acl record, like: { '_key': 'test_collection_1234', 'obj_collection': 'test_collection', 'obj_id': '1234', 'obj_type': 'test_object', 'obj_app': 'Splunk_TA_test', 'obj_owner': 'admin', 'obj_perms': {'read': ['*'], 'write': ['admin'], 'delete': ['admin']}, 'obj_shared_by_inclusion': True }

__init__(obj_collection, obj_id, obj_type, obj_app, obj_owner, obj_perms, obj_shared_by_inclusion)

Initializes ObjectACL.

Parameters:

Name Type Description Default
obj_collection str

Collection where object currently stored.

required
obj_id str

ID of this object.

required
obj_type str

Type of this object.

required
obj_app str

App of this object.

required
obj_owner str

Owner of this object.

required
obj_perms dict

Object perms, like: {‘read’: [‘*’], ‘write’: [‘admin’], ‘delete’: [‘admin’]}.

required
obj_shared_by_inclusion bool

Flag of object is shared by inclusion.

required
Source code in solnlib/user_access.py
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
def __init__(
    self,
    obj_collection: str,
    obj_id: str,
    obj_type: str,
    obj_app: str,
    obj_owner: str,
    obj_perms: dict,
    obj_shared_by_inclusion: bool,
):
    """Build an ObjectACL from its individual fields.

    Arguments:
        obj_collection: Collection where object currently stored.
        obj_id: ID of this object.
        obj_type: Type of this object.
        obj_app: App of this object.
        obj_owner: Owner of this object.
        obj_perms: Object perms, like: {'read': ['*'], 'write': ['admin'], 'delete': ['admin']}.
        obj_shared_by_inclusion: Flag of object is shared by inclusion.
    """
    # Identity fields are stored as given.
    self.obj_collection, self.obj_id, self.obj_type = (
        obj_collection,
        obj_id,
        obj_type,
    )
    self.obj_app, self.obj_owner = obj_app, obj_owner

    # Perms are passed through _check_perms before being stored.
    self._check_perms(obj_perms)
    self._obj_perms = obj_perms

    self.obj_shared_by_inclusion = obj_shared_by_inclusion

__str__()

Source code in solnlib/user_access.py
216
217
def __str__(self):
    """Return the object acl record serialized as a JSON string."""
    serialized = json.dumps(self.record)
    return serialized

generate_key(obj_collection, obj_id) staticmethod

Generate object acl record key.

Parameters:

Name Type Description Default
obj_collection str

Collection where object currently stored.

required
obj_id str

ID of this object.

required

Returns:

Type Description
str

Object acl record key.

Source code in solnlib/user_access.py
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
@staticmethod
def generate_key(obj_collection: str, obj_id: str) -> str:
    """Build the key of an object acl record.

    Arguments:
        obj_collection: Collection where object currently stored.
        obj_id: ID of this object.

    Returns:
        The record key, `<obj_collection>_<obj_id>`.
    """

    return f"{obj_collection}_{obj_id}"

merge(obj_acl)

Merge current object perms with perms of obj_acl.

Parameters:

Name Type Description Default
obj_acl ObjectACL

Object acl to merge.

required
Source code in solnlib/user_access.py
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
def merge(self, obj_acl: "ObjectACL"):
    """Merge current object perms with perms of `obj_acl`.

    For every perm key of this object the two entry lists are unioned,
    keeping first-seen order (own entries first) so the result is
    deterministic. A perm key that `obj_acl` does not carry is treated as an
    empty list. If the union contains the allow-all entry, the list collapses
    to just that entry.

    Arguments:
        obj_acl: Object acl to merge.
    """

    # Snapshot the keys: values are reassigned while looping.
    for perm_key in list(self._obj_perms):
        own_entries = self._obj_perms[perm_key]
        other_entries = obj_acl._obj_perms.get(perm_key, [])
        # dict.fromkeys de-duplicates while preserving insertion order.
        merged = list(dict.fromkeys([*own_entries, *other_entries]))
        if self.OBJ_PERMS_ALLOW_ALL in merged:
            merged = [self.OBJ_PERMS_ALLOW_ALL]
        self._obj_perms[perm_key] = merged

parse(obj_acl_record) staticmethod

Parse object acl record and construct a new ObjectACL object from it.

Parameters:

Name Type Description Default
obj_acl_record dict

Object acl record.

required

Returns:

Type Description
ObjectACL

New ObjectACL object.

Source code in solnlib/user_access.py
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
@staticmethod
def parse(obj_acl_record: dict) -> "ObjectACL":
    """Construct a new `ObjectACL` object from an object acl record.

    Arguments:
        obj_acl_record: Object acl record.

    Returns:
        New `ObjectACL` object.
    """

    # Record fields in the positional order the ObjectACL constructor takes.
    field_keys = (
        ObjectACL.OBJ_COLLECTION_KEY,
        ObjectACL.OBJ_ID_KEY,
        ObjectACL.OBJ_TYPE_KEY,
        ObjectACL.OBJ_APP_KEY,
        ObjectACL.OBJ_OWNER_KEY,
        ObjectACL.OBJ_PERMS_KEY,
        ObjectACL.OBJ_SHARED_BY_INCLUSION_KEY,
    )
    field_values = [obj_acl_record[key] for key in field_keys]
    return ObjectACL(*field_values)

ObjectACLException

Bases: Exception

Source code in solnlib/user_access.py
46
47
class ObjectACLException(Exception):
    """Exception for ObjectACL, e.g. raised when object acl perms are invalid."""

ObjectACLManager

Object ACL manager.

Examples:

>>> from solnlib import user_access
>>> oaclm = user_access.ObjectACLManager('test_collection',
                                         session_key,
                                         'Splunk_TA_test')
Source code in solnlib/user_access.py
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
class ObjectACLManager:
    """Object ACL manager.

    Saves, queries and deletes `ObjectACL` records through the collection
    data handle returned by `_utils.get_collection_data`.

    Examples:
       >>> from solnlib import user_access
       >>> oaclm = user_access.ObjectACLManager('test_collection',
                                                session_key,
                                                'Splunk_TA_test')
    """

    def __init__(
        self,
        collection_name: str,
        session_key: str,
        app: str,
        owner: Optional[str] = "nobody",
        scheme: Optional[str] = None,
        host: Optional[str] = None,
        port: Optional[int] = None,
        **context: dict,
    ):
        """Initializes ObjectACLManager.

        The collection actually used is named `{app}_{collection_name}`.

        Arguments:
            collection_name: Collection name to store object ACL info.
            session_key: Splunk access token.
            app: App name of namespace.
            owner: (optional) Owner of namespace, default is `nobody`.
            scheme: (optional) The access scheme, default is None.
            host: (optional) The host name, default is None.
            port: (optional) The port number, default is None.
            context: Other configurations for Splunk rest client.

        Raises:
            ObjectACLManagerException: If init ObjectACLManager failed.
        """
        collection_name = f"{app}_{collection_name}"
        try:
            self._collection_data = _utils.get_collection_data(
                collection_name,
                session_key,
                app,
                owner,
                scheme,
                host,
                port,
                # NOTE(review): presumably the collection fields definition;
                # confirm against _utils.get_collection_data.
                None,
                **context,
            )
        except KeyError as e:
            # Keep the original KeyError attached as the explicit cause.
            raise ObjectACLManagerException(
                f"Get object acl collection: {collection_name} fail."
            ) from e

    def _build_acl(
        self,
        obj_collection: str,
        obj_id: str,
        obj_type: str,
        obj_app: str,
        obj_owner: str,
        obj_perms: dict,
        obj_shared_by_inclusion: bool,
        replace_existing: bool,
    ) -> "ObjectACL":
        """Build the `ObjectACL` to save for one object.

        When `replace_existing` is False and an ACL is already stored for the
        object, the stored ACL is merged into the new one.
        """
        obj_acl = ObjectACL(
            obj_collection,
            obj_id,
            obj_type,
            obj_app,
            obj_owner,
            obj_perms,
            obj_shared_by_inclusion,
        )

        if not replace_existing:
            try:
                old_obj_acl = self.get_acl(obj_collection, obj_id)
            except ObjectACLNotExistException:
                # Nothing stored yet, so there is nothing to merge.
                old_obj_acl = None

            if old_obj_acl:
                obj_acl.merge(old_obj_acl)

        return obj_acl

    @staticmethod
    def _keys_query(obj_collection: str, obj_ids: List[str]) -> Optional[str]:
        """Build the JSON `$or` query matching the ACL keys of `obj_ids`.

        Returns None when `obj_ids` is empty so that callers can skip the
        request instead of sending an empty `$or` clause.
        """
        key_clauses = [
            {"_key": ObjectACL.generate_key(obj_collection, obj_id)}
            for obj_id in obj_ids
        ]
        if not key_clauses:
            return None

        return json.dumps({"$or": key_clauses})

    @utils.retry(exceptions=[binding.HTTPError])
    def update_acl(
        self,
        obj_collection: str,
        obj_id: str,
        obj_type: str,
        obj_app: str,
        obj_owner: str,
        obj_perms: dict,
        obj_shared_by_inclusion: bool = True,
        replace_existing: bool = True,
    ):
        """Update acl info of object.

        Construct a new object acl info first, if `replace_existing` is True
        then replace existing acl info else merge new object acl info with the
        old one and replace the old acl info with merged acl info.

        Arguments:
            obj_collection: Collection where object currently stored.
            obj_id: ID of this object.
            obj_type: Type of this object.
            obj_app: App of this object.
            obj_owner: Owner of this object.
            obj_perms: Object perms, like:

                {
                    'read': ['*'],
                    'write': ['admin'],
                    'delete': ['admin']
                }.
            obj_shared_by_inclusion: (optional) Flag of object is shared by
                inclusion, default is True.
            replace_existing: (optional) Replace existing acl info flag, True
                indicates replace old acl info with new one else merge with old
                acl info, default is True.
        """

        obj_acl = self._build_acl(
            obj_collection,
            obj_id,
            obj_type,
            obj_app,
            obj_owner,
            obj_perms,
            obj_shared_by_inclusion,
            replace_existing,
        )
        self._collection_data.batch_save(obj_acl.record)

    @utils.retry(exceptions=[binding.HTTPError])
    def update_acls(
        self,
        obj_collection: str,
        obj_ids: List[str],
        obj_type: str,
        obj_app: str,
        obj_owner: str,
        obj_perms: dict,
        obj_shared_by_inclusion: bool = True,
        replace_existing: bool = True,
    ):
        """Batch update object acl info to all provided `obj_ids`.

        If `obj_ids` is empty nothing is saved.

        Arguments:
            obj_collection: Collection where objects currently stored.
            obj_ids: IDs list of objects.
            obj_type: Type of this object.
            obj_app: App of this object.
            obj_owner: Owner of this object.
            obj_perms: Object perms, like:

                {
                    'read': ['*'],
                    'write': ['admin'],
                    'delete': ['admin']
                }.
            obj_shared_by_inclusion: (optional) Flag of object is shared by
                inclusion, default is True.
            replace_existing: (optional) Replace existing acl info flag, True
                indicates replace old acl info with new one else merge with old acl
                info, default is True.
        """

        obj_acl_records = [
            self._build_acl(
                obj_collection,
                obj_id,
                obj_type,
                obj_app,
                obj_owner,
                obj_perms,
                obj_shared_by_inclusion,
                replace_existing,
            ).record
            for obj_id in obj_ids
        ]
        if not obj_acl_records:
            # Avoid calling batch_save() without any document.
            return

        self._collection_data.batch_save(*obj_acl_records)

    @utils.retry(exceptions=[binding.HTTPError])
    def get_acl(self, obj_collection: str, obj_id: str) -> "ObjectACL":
        """Get acl info.

        Query object acl info by the key generated from the combination of
        `obj_collection` and `obj_id` and return it.

        Arguments:
            obj_collection: Collection where object currently stored.
            obj_id: ID of this object.

        Returns:
            Object acl info.

        Raises:
            ObjectACLNotExistException: If object ACL info does not exist.
        """

        key = ObjectACL.generate_key(obj_collection, obj_id)
        try:
            obj_acl = self._collection_data.query_by_id(key)
        except binding.HTTPError as e:
            # Only a 404 means "no such ACL"; other HTTP errors propagate.
            if e.status != 404:
                raise

            raise ObjectACLNotExistException(
                f"Object ACL info of {obj_collection}_{obj_id} does not exist."
            )

        return ObjectACL.parse(obj_acl)

    @utils.retry(exceptions=[binding.HTTPError])
    def get_acls(self, obj_collection: str, obj_ids: List[str]) -> List[ObjectACL]:
        """Batch get acl info.

        Query objects acl info with parameter of the combination of
        `obj_collection` and `obj_ids` from KVStore and return them.

        Arguments:
            obj_collection: Collection where object currently stored.
            obj_ids: IDs of objects.

        Returns:
            List of `ObjectACL` instances, empty if `obj_ids` is empty.
        """

        query = self._keys_query(obj_collection, obj_ids)
        if query is None:
            # No IDs requested: skip the request rather than send an empty $or.
            return []

        obj_acls = self._collection_data.query(query=query)
        return [ObjectACL.parse(obj_acl) for obj_acl in obj_acls]

    @utils.retry(exceptions=[binding.HTTPError])
    def delete_acl(self, obj_collection: str, obj_id: str):
        """Delete acl info.

        Delete the object acl info stored under the key generated from the
        combination of `obj_collection` and `obj_id`.

        Arguments:
            obj_collection: Collection where object currently stored.
            obj_id: ID of this object.

        Raises:
            ObjectACLNotExistException: If object ACL info does not exist.
        """

        key = ObjectACL.generate_key(obj_collection, obj_id)
        try:
            self._collection_data.delete_by_id(key)
        except binding.HTTPError as e:
            # Only a 404 means "no such ACL"; other HTTP errors propagate.
            if e.status != 404:
                raise

            raise ObjectACLNotExistException(
                f"Object ACL info of {obj_collection}_{obj_id} does not exist."
            )

    @utils.retry(exceptions=[binding.HTTPError])
    def delete_acls(self, obj_collection: str, obj_ids: List[str]):
        """Batch delete acl info.

        Query objects acl info with parameter of the combination of
        `obj_collection` and `obj_ids` from KVStore and delete them.
        If `obj_ids` is empty nothing is deleted.

        Arguments:
            obj_collection: Collection where object currently stored.
            obj_ids: IDs of objects.
        """

        query = self._keys_query(obj_collection, obj_ids)
        if query is None:
            # No IDs requested: never issue a delete with an empty $or clause.
            return

        self._collection_data.delete(query=query)

    @utils.retry(exceptions=[binding.HTTPError])
    def get_accessible_object_ids(
        self, user: str, operation: str, obj_collection: str, obj_ids: List[str]
    ) -> List[str]:
        """Get accessible IDs of objects among `obj_ids`.

        An object is accessible when the perms list of `operation` in its
        stored ACL contains either `ObjectACL.OBJ_PERMS_ALLOW_ALL` or `user`.

        Arguments:
            user: User name of current `operation`.
            operation: User operation, possible option: (read/write/delete).
            obj_collection: Collection where object currently stored.
            obj_ids: IDs of objects.

        Returns:
            List of IDs of accessible objects.
        """

        accessible_obj_ids = []
        for obj_acl in self.get_acls(obj_collection, obj_ids):
            perms = obj_acl.obj_perms[operation]
            if ObjectACL.OBJ_PERMS_ALLOW_ALL in perms or user in perms:
                accessible_obj_ids.append(obj_acl.obj_id)

        return accessible_obj_ids

__init__(collection_name, session_key, app, owner='nobody', scheme=None, host=None, port=None, **context)

Initializes ObjectACLManager.

Parameters:

Name Type Description Default
collection_name str

Collection name to store object ACL info.

required
session_key str

Splunk access token.

required
app str

App name of namespace.

required
owner Optional[str]

(optional) Owner of namespace, default is nobody.

'nobody'
scheme Optional[str]

(optional) The access scheme, default is None.

None
host Optional[str]

(optional) The host name, default is None.

None
port Optional[int]

(optional) The port number, default is None.

None
context dict

Other configurations for Splunk rest client.

{}

Raises:

Type Description
ObjectACLManagerException

If init ObjectACLManager failed.

Source code in solnlib/user_access.py
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
def __init__(
    self,
    collection_name: str,
    session_key: str,
    app: str,
    owner: Optional[str] = "nobody",
    scheme: Optional[str] = None,
    host: Optional[str] = None,
    port: Optional[int] = None,
    **context: dict,
):
    """Initializes ObjectACLManager.

    The collection data handle is looked up under the name
    `{app}_{collection_name}` and kept on the instance.

    Arguments:
        collection_name: Collection name to store object ACL info.
        session_key: Splunk access token.
        app: App name of namespace.
        owner: (optional) Owner of namespace, default is `nobody`.
        scheme: (optional) The access scheme, default is None.
        host: (optional) The host name, default is None.
        port: (optional) The port number, default is None.
        context: Other configurations for Splunk rest client.

    Raises:
        ObjectACLManagerException: If init ObjectACLManager failed.
    """
    # Prefix the collection with the app name.
    collection_name = f"{app}_{collection_name}"
    lookup_args = (
        collection_name,
        session_key,
        app,
        owner,
        scheme,
        host,
        port,
        None,
    )
    try:
        self._collection_data = _utils.get_collection_data(*lookup_args, **context)
    except KeyError:
        raise ObjectACLManagerException(
            f"Get object acl collection: {collection_name} fail."
        )

delete_acl(obj_collection, obj_id)

Delete acl info.

Query object acl info with parameter of the combination of obj_collection and obj_id from KVStore and delete it.

Parameters:

Name Type Description Default
obj_collection str

Collection where object currently stored.

required
obj_id str

ID of this object.

required

Raises:

Type Description
ObjectACLNotExistException

If object ACL info does not exist.

Source code in solnlib/user_access.py
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
@utils.retry(exceptions=[binding.HTTPError])
def delete_acl(self, obj_collection: str, obj_id: str):
    """Delete acl info.

    Delete the object acl info stored under the key generated from the
    combination of `obj_collection` and `obj_id`.

    Arguments:
        obj_collection: Collection where object currently stored.
        obj_id: ID of this object.

    Raises:
        ObjectACLNotExistException: If object ACL info does not exist.
    """

    acl_key = ObjectACL.generate_key(obj_collection, obj_id)
    try:
        self._collection_data.delete_by_id(acl_key)
    except binding.HTTPError as e:
        # A 404 means the ACL record is missing; anything else propagates.
        if e.status == 404:
            raise ObjectACLNotExistException(
                f"Object ACL info of {obj_collection}_{obj_id} does not exist."
            )
        raise

delete_acls(obj_collection, obj_ids)

Batch delete acl info.

Query objects acl info with parameter of the combination of obj_collection and obj_ids from KVStore and delete them.

Parameters:

Name Type Description Default
obj_collection str

Collection where object currently stored.

required
obj_ids List[str]

IDs of objects.

required
Source code in solnlib/user_access.py
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
@utils.retry(exceptions=[binding.HTTPError])
def delete_acls(self, obj_collection: str, obj_ids: List[str]):
    """Batch delete acl info.

    Query objects acl info with parameter of the combination of
    `obj_collection` and `obj_ids` from KVStore and delete them.
    If `obj_ids` is empty nothing is deleted.

    Arguments:
        obj_collection: Collection where object currently stored.
        obj_ids: IDs of objects.
    """

    key_clauses = [
        {"_key": ObjectACL.generate_key(obj_collection, obj_id)}
        for obj_id in obj_ids
    ]
    if not key_clauses:
        # No IDs requested: never issue a delete with an empty $or clause.
        return

    self._collection_data.delete(query=json.dumps({"$or": key_clauses}))

get_accessible_object_ids(user, operation, obj_collection, obj_ids)

Get accessible IDs of objects from obj_acls.

Parameters:

Name Type Description Default
user str

User name of current operation.

required
operation str

User operation, possible option: (read/write/delete).

required
obj_collection str

Collection where object currently stored.

required
obj_ids List[str]

IDs of objects.

required

Returns:

Type Description
List[str]

List of IDs of accessible objects.

Source code in solnlib/user_access.py
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
@utils.retry(exceptions=[binding.HTTPError])
def get_accessible_object_ids(
    self, user: str, operation: str, obj_collection: str, obj_ids: List[str]
) -> List[str]:
    """Get accessible IDs of objects among `obj_ids`.

    An object is accessible when the perms list of `operation` in its stored
    ACL contains either `ObjectACL.OBJ_PERMS_ALLOW_ALL` or `user`.

    Arguments:
        user: User name of current `operation`.
        operation: User operation, possible option: (read/write/delete).
        obj_collection: Collection where object currently stored.
        obj_ids: IDs of objects.

    Returns:
        List of IDs of accessible objects.
    """

    def _is_allowed(acl) -> bool:
        # Check the perms list of the requested operation only.
        granted = acl.obj_perms[operation]
        return ObjectACL.OBJ_PERMS_ALLOW_ALL in granted or user in granted

    return [
        acl.obj_id
        for acl in self.get_acls(obj_collection, obj_ids)
        if _is_allowed(acl)
    ]

get_acl(obj_collection, obj_id)

Get acl info.

Query object acl info with parameter of the combination of obj_collection and obj_id from self.collection_name and return it.

Parameters:

Name Type Description Default
obj_collection str

Collection where object currently stored.

required
obj_id str

ID of this object.

required

Returns:

Type Description
ObjectACL

Object acl info; if it does not exist, ObjectACLNotExistException is raised instead.

Raises:

Type Description
ObjectACLNotExistException

If object ACL info does not exist.

Source code in solnlib/user_access.py
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
@utils.retry(exceptions=[binding.HTTPError])
def get_acl(self, obj_collection: str, obj_id: str) -> "ObjectACL":
    """Get acl info.

    Query object acl info by the key generated from the combination of
    `obj_collection` and `obj_id` and return it.

    Arguments:
        obj_collection: Collection where object currently stored.
        obj_id: ID of this object.

    Returns:
        Object acl info.

    Raises:
        ObjectACLNotExistException: If object ACL info does not exist.
    """

    acl_key = ObjectACL.generate_key(obj_collection, obj_id)
    try:
        record = self._collection_data.query_by_id(acl_key)
    except binding.HTTPError as e:
        # A 404 means the ACL record is missing; anything else propagates.
        if e.status == 404:
            raise ObjectACLNotExistException(
                f"Object ACL info of {obj_collection}_{obj_id} does not exist."
            )
        raise

    return ObjectACL.parse(record)

get_acls(obj_collection, obj_ids)

Batch get acl info.

Query objects acl info with parameter of the combination of obj_collection and obj_ids from KVStore and return them.

Parameters:

Name Type Description Default
obj_collection str

Collection where object currently stored.

required
obj_ids List[str]

IDs of objects.

required

Returns:

Type Description
List[ObjectACL]

List of ObjectACL instances.

Source code in solnlib/user_access.py
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
@utils.retry(exceptions=[binding.HTTPError])
def get_acls(self, obj_collection: str, obj_ids: List[str]) -> List[ObjectACL]:
    """Batch get acl info.

    Query objects acl info with parameter of the combination of
    `obj_collection` and `obj_ids` from KVStore and return them.

    Arguments:
        obj_collection: Collection where object currently stored.
        obj_ids: IDs of objects.

    Returns:
        List of `ObjectACL` instances, empty if `obj_ids` is empty.
    """

    key_clauses = [
        {"_key": ObjectACL.generate_key(obj_collection, obj_id)}
        for obj_id in obj_ids
    ]
    if not key_clauses:
        # No IDs requested: skip the request rather than send an empty $or.
        return []

    obj_acls = self._collection_data.query(query=json.dumps({"$or": key_clauses}))

    return [ObjectACL.parse(obj_acl) for obj_acl in obj_acls]

update_acl(obj_collection, obj_id, obj_type, obj_app, obj_owner, obj_perms, obj_shared_by_inclusion=True, replace_existing=True)

Update acl info of object.

Construct a new object acl info first, if replace_existing is True then replace existing acl info else merge new object acl info with the old one and replace the old acl info with merged acl info.

Parameters:

Name Type Description Default
obj_collection str

Collection where object currently stored.

required
obj_id str

ID of this object.

required
obj_type str

Type of this object.

required
obj_app str

App of this object.

required
obj_owner str

Owner of this object.

required
obj_perms dict

Object perms, like:

{ 'read': ['*'], 'write': ['admin'], 'delete': ['admin'] }.

required
obj_shared_by_inclusion bool

(optional) Flag of object is shared by inclusion, default is True.

True
replace_existing bool

(optional) Replace existing acl info flag, True indicates replace old acl info with new one else merge with old acl info, default is True.

True
Source code in solnlib/user_access.py
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
@utils.retry(exceptions=[binding.HTTPError])
def update_acl(
    self,
    obj_collection: str,
    obj_id: str,
    obj_type: str,
    obj_app: str,
    obj_owner: str,
    obj_perms: dict,
    obj_shared_by_inclusion: bool = True,
    replace_existing: bool = True,
):
    """Update acl info of object.

    A new object acl info is always built from the arguments. With
    `replace_existing` set to True it is saved as is; otherwise the acl info
    already stored for the object (if any) is merged into it before saving.

    Arguments:
        obj_collection: Collection where object currently stored.
        obj_id: ID of this object.
        obj_type: Type of this object.
        obj_app: App of this object.
        obj_owner: Owner of this object.
        obj_perms: Object perms, like:

            {
                'read': ['*'],
                'write': ['admin'],
                'delete': ['admin']
            }.
        obj_shared_by_inclusion: (optional) Flag of object is shared by
            inclusion, default is True.
        replace_existing: (optional) Replace existing acl info flag, True
            indicates replace old acl info with new one else merge with old
            acl info, default is True.
    """

    new_acl = ObjectACL(
        obj_collection,
        obj_id,
        obj_type,
        obj_app,
        obj_owner,
        obj_perms,
        obj_shared_by_inclusion,
    )

    if replace_existing:
        stored_acl = None
    else:
        try:
            stored_acl = self.get_acl(obj_collection, obj_id)
        except ObjectACLNotExistException:
            # Nothing stored yet, so there is nothing to merge.
            stored_acl = None

    if stored_acl:
        new_acl.merge(stored_acl)

    self._collection_data.batch_save(new_acl.record)

update_acls(obj_collection, obj_ids, obj_type, obj_app, obj_owner, obj_perms, obj_shared_by_inclusion=True, replace_existing=True)

Batch update object acl info to all provided obj_ids.

Parameters:

Name Type Description Default
obj_collection str

Collection where objects currently stored.

required
obj_ids List[str]

IDs list of objects.

required
obj_type str

Type of this object.

required
obj_app str

App of this object.

required
obj_owner str

Owner of this object.

required
obj_perms dict

Object perms, like:

{ 'read': ['*'], 'write': ['admin'], 'delete': ['admin'] }.

required
obj_shared_by_inclusion bool

(optional) Flag of object is shared by inclusion, default is True.

True
replace_existing bool

(optional) Replace existing acl info flag, True indicates replace old acl info with new one else merge with old acl info, default is True.

True
Source code in solnlib/user_access.py
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
@utils.retry(exceptions=[binding.HTTPError])
def update_acls(
    self,
    obj_collection: str,
    obj_ids: List[str],
    obj_type: str,
    obj_app: str,
    obj_owner: str,
    obj_perms: dict,
    obj_shared_by_inclusion: bool = True,
    replace_existing: bool = True,
):
    """Batch update object acl info to all provided `obj_ids`.

    If `obj_ids` is empty nothing is saved.

    Arguments:
        obj_collection: Collection where objects currently stored.
        obj_ids: IDs list of objects.
        obj_type: Type of this object.
        obj_app: App of this object.
        obj_owner: Owner of this object.
        obj_perms: Object perms, like:

            {
                'read': ['*'],
                'write': ['admin'],
                'delete': ['admin']
            }.
        obj_shared_by_inclusion: (optional) Flag of object is shared by
            inclusion, default is True.
        replace_existing: (optional) Replace existing acl info flag, True
            indicates replace old acl info with new one else merge with old acl
            info, default is True.
    """

    obj_acl_records = []
    for obj_id in obj_ids:
        obj_acl = ObjectACL(
            obj_collection,
            obj_id,
            obj_type,
            obj_app,
            obj_owner,
            obj_perms,
            obj_shared_by_inclusion,
        )

        if not replace_existing:
            try:
                old_obj_acl = self.get_acl(obj_collection, obj_id)
            except ObjectACLNotExistException:
                # Nothing stored yet, so there is nothing to merge.
                old_obj_acl = None

            if old_obj_acl:
                obj_acl.merge(old_obj_acl)

        obj_acl_records.append(obj_acl.record)

    if not obj_acl_records:
        # Avoid calling batch_save() without any document.
        return

    self._collection_data.batch_save(*obj_acl_records)

ObjectACLManagerException

Bases: Exception

Exception for ObjectACLManager.

Source code in solnlib/user_access.py
220
221
222
223
class ObjectACLManagerException(Exception):
    """Error raised by ObjectACLManager."""

ObjectACLNotExistException

Bases: Exception

Exception for the situation when ACL does not exist.

Source code in solnlib/user_access.py
226
227
228
229
class ObjectACLNotExistException(Exception):
    """Error raised when the requested object ACL does not exist."""

UserAccessException

Bases: Exception

Exception for the situation when there is user access exception.

Source code in solnlib/user_access.py
697
698
699
700
class UserAccessException(Exception):
    """Error raised when a user access check fails."""

UserNotExistException

Bases: Exception

Exception when user does not exist.

Source code in solnlib/user_access.py
823
824
825
826
class UserNotExistException(Exception):
    """Error raised when the user does not exist."""

check_user_access(session_key, capabilities, obj_type, operation, scheme=None, host=None, port=None, **context)

User access checker.

It will fetch user capabilities from given session_key and check if the capability extracted from capabilities, obj_type and operation is contained, if user capabilities include the extracted capability user access is ok else fail.

Parameters:

Name Type Description Default
session_key str

Splunk access token.

required
capabilities dict

App capabilities, example:

{ 'object_type1': { 'read': 'read_app_object_type1', 'write': 'write_app_object_type1', 'delete': 'delete_app_object_type1' }, 'object_type2': { 'read': 'read_app_object_type2', 'write': 'write_app_object_type2', 'delete': 'delete_app_object_type2' }, ... }

required
obj_type str

Object type.

required
operation str

User operation, possible option: (read/write/delete).

required
scheme str

(optional) The access scheme, default is None.

None
host str

(optional) The host name, default is None.

None
port int

(optional) The port number, default is None.

None
context dict

Other configurations for Splunk rest client.

{}

Raises:

Type Description
UserAccessException

If user access permission is denied.

Examples:

>>> from solnlib.user_access import check_user_access
>>> def fun():
>>>     check_user_access(
>>>         session_key, capabilities, 'test_object', 'read')
>>>     ...
Source code in solnlib/user_access.py
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
def check_user_access(
    session_key: str,
    capabilities: dict,
    obj_type: str,
    operation: str,
    scheme: str = None,
    host: str = None,
    port: int = None,
    **context: dict,
):
    """User access checker.

    Resolves the current user name from `session_key`, looks up the
    capability required for `operation` on `obj_type` in `capabilities`,
    and raises if the user does not have that capability.

    Arguments:
        session_key: Splunk access token.
        capabilities: App capabilities, example:

            {
                'object_type1': {
                    'read': 'read_app_object_type1',
                    'write': 'write_app_object_type1',
                    'delete': 'delete_app_object_type1'
                },
                'object_type2': {
                    'read': 'read_app_object_type2',
                    'write': 'write_app_object_type2',
                    'delete': 'delete_app_object_type2'
                },
                ...
            }
        obj_type: Object type.
        operation: User operation, possible option: (read/write/delete).
        scheme: (optional) The access scheme, default is None.
        host: (optional) The host name, default is None.
        port: (optional) The port number, default is None.
        context: Other configurations for Splunk rest client.

    Raises:
        UserAccessException: If user access permission is denied.

    Examples:
       >>> from solnlib.user_access import check_user_access
       >>> def fun():
       >>>     check_user_access(
       >>>         session_key, capabilities, 'test_object', 'read')
       >>>     ...
    """

    # The same connection settings are forwarded to both helper calls.
    rest_kwargs = dict(scheme=scheme, host=host, port=port, **context)

    username = get_current_username(session_key, **rest_kwargs)
    capability = capabilities[obj_type][operation]

    if user_is_capable(session_key, username, capability, **rest_kwargs):
        return

    raise UserAccessException(
        "Permission denied, %s does not have the capability: %s."
        % (username, capability)
    )

get_current_username(session_key, scheme=None, host=None, port=None, **context)

Get current user name from session_key.

Parameters:

Name Type Description Default
session_key str

Splunk access token.

required
scheme str

(optional) The access scheme, default is None.

None
host str

(optional) The host name, default is None.

None
port int

(optional) The port number, default is None.

None
context dict

Other configurations for Splunk rest client.

{}

Returns:

Type Description
str

Current user name.

Raises:

Type Description
InvalidSessionKeyException

If session_key is invalid.

Examples:

>>> from solnlib import user_access
>>> user_name = user_access.get_current_username(session_key)
Source code in solnlib/user_access.py
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
@utils.retry(exceptions=[binding.HTTPError])
def get_current_username(
    session_key: str,
    scheme: str = None,
    host: str = None,
    port: int = None,
    **context: dict,
) -> str:
    """Get current user name from `session_key`.

    Reads `/services/authentication/current-context` as JSON and returns the
    `username` of its first entry.

    Arguments:
        session_key: Splunk access token.
        scheme: (optional) The access scheme, default is None.
        host: (optional) The host name, default is None.
        port: (optional) The port number, default is None.
        context: Other configurations for Splunk rest client.

    Returns:
        Current user name.

    Raises:
        InvalidSessionKeyException: If `session_key` is invalid.

    Examples:
       >>> from solnlib import user_access
       >>> user_name = user_access.get_current_username(session_key)
    """

    client = rest_client.SplunkRestClient(
        session_key, "-", scheme=scheme, host=host, port=port, **context
    )
    try:
        reply = client.get(
            "/services/authentication/current-context", output_mode="json"
        )
        payload = reply.body.read()
    except binding.HTTPError as e:
        # A 401 is reported as an invalid session key; anything else propagates.
        if e.status == 401:
            raise InvalidSessionKeyException("Invalid session key.")
        raise

    current_context = json.loads(payload)
    return current_context["entry"][0]["content"]["username"]

get_user_capabilities(session_key, username, scheme=None, host=None, port=None, **context)

Get user capabilities.

Parameters:

Name Type Description Default
session_key str

Splunk access token.

required
username str

Name of the user whose capabilities to get.

required
scheme str

(optional) The access scheme, default is None.

None
host str

(optional) The host name, default is None.

None
port int

(optional) The port number, default is None.

None
context dict

Other configurations for Splunk rest client.

{}

Returns:

Type Description
List[dict]

User capabilities.

Raises:

Type Description
UserNotExistException

If username does not exist.

Examples:

>>> from solnlib import user_access
>>> user_capabilities = user_access.get_user_capabilities(
>>>     session_key, 'test_user')
Source code in solnlib/user_access.py
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
@utils.retry(exceptions=[binding.HTTPError])
def get_user_capabilities(
    session_key: str,
    username: str,
    scheme: str = None,
    host: str = None,
    port: int = None,
    **context: dict,
) -> List[dict]:
    """Fetch the capabilities of the user `username`.

    The capabilities are read from the
    `/services/authentication/users/{username}` REST endpoint.

    Arguments:
        session_key: Splunk access token.
        username: Name of the user whose capabilities to get.
        scheme: (optional) The access scheme, default is None.
        host: (optional) The host name, default is None.
        port: (optional) The port number, default is None.
        context: Other configurations for Splunk rest client.

    Returns:
        User capabilities.

    Raises:
        UserNotExistException: If the endpoint answers with HTTP 404,
            i.e. `username` does not exist.

    Examples:
       >>> from solnlib import user_access
       >>> user_capabilities = user_access.get_user_capabilities(
       >>>     session_key, 'test_user')
    """

    client = rest_client.SplunkRestClient(
        session_key, "-", scheme=scheme, host=host, port=port, **context
    )
    try:
        reply = client.get(
            f"/services/authentication/users/{username}", output_mode="json"
        )
        payload = reply.body.read()
    except binding.HTTPError as err:
        # A 404 means there is no such user; every other HTTP error is
        # re-raised unchanged.
        if err.status == 404:
            raise UserNotExistException("User: %s does not exist." % username)
        raise

    user_content = json.loads(payload)["entry"][0]["content"]
    return user_content["capabilities"]

get_user_roles(session_key, username, scheme=None, host=None, port=None, **context)

Get user roles.

Parameters:

Name Type Description Default
session_key str

Splunk access token.

required
username str

Name of the user whose roles to get.

required
scheme

(optional) The access scheme, default is None.

None
host

(optional) The host name, default is None.

None
port

(optional) The port number, default is None.

None
context

Other configurations for Splunk rest client.

{}

Returns:

Type Description
List

User roles.

Raises:

Type Description
UserNotExistException

If username does not exist.

Examples:

>>> from solnlib import user_access
>>> user_roles = user_access.get_user_roles(session_key, 'test_user')
Source code in solnlib/user_access.py
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
@utils.retry(exceptions=[binding.HTTPError])
def get_user_roles(
    session_key: str, username: str, scheme=None, host=None, port=None, **context
) -> List:
    """Fetch the roles of the user `username`.

    The roles are read from the
    `/services/authentication/users/{username}` REST endpoint.

    Arguments:
        session_key: Splunk access token.
        username: Name of the user whose roles to get.
        scheme: (optional) The access scheme, default is None.
        host: (optional) The host name, default is None.
        port: (optional) The port number, default is None.
        context: Other configurations for Splunk rest client.

    Returns:
        User roles.

    Raises:
        UserNotExistException: If the endpoint answers with HTTP 404,
            i.e. `username` does not exist.

    Examples:
       >>> from solnlib import user_access
       >>> user_roles = user_access.get_user_roles(session_key, 'test_user')
    """

    client = rest_client.SplunkRestClient(
        session_key, "-", scheme=scheme, host=host, port=port, **context
    )
    try:
        reply = client.get(
            f"/services/authentication/users/{username}", output_mode="json"
        )
        payload = reply.body.read()
    except binding.HTTPError as err:
        # A 404 means there is no such user; every other HTTP error is
        # re-raised unchanged.
        if err.status == 404:
            raise UserNotExistException("User: %s does not exist." % username)
        raise

    user_content = json.loads(payload)["entry"][0]["content"]
    return user_content["roles"]

user_is_capable(session_key, username, capability, scheme=None, host=None, port=None, **context)

Check if user is capable for given capability.

Parameters:

Name Type Description Default
session_key str

Splunk access token.

required
username str

Name of the user whose capabilities are checked.

required
capability str

The capability we wish to check for.

required
scheme str

(optional) The access scheme, default is None.

None
host str

(optional) The host name, default is None.

None
port int

(optional) The port number, default is None.

None
context dict

Other configurations for Splunk rest client.

{}

Returns:

Type Description
bool

True if user is capable else False.

Raises:

Type Description
UserNotExistException

If username does not exist.

Examples:

>>> from solnlib import user_access
>>> is_capable = user_access.user_is_capable(
>>>     session_key, 'test_user', 'object_read_capability')
Source code in solnlib/user_access.py
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
def user_is_capable(
    session_key: str,
    username: str,
    capability: str,
    scheme: str = None,
    host: str = None,
    port: int = None,
    **context: dict,
) -> bool:
    """Tell whether the user `username` holds the given `capability`.

    The user's capabilities are obtained via `get_user_capabilities` and
    `capability` is looked up among them.

    Arguments:
        session_key: Splunk access token.
        username: Name of the user whose capabilities are checked.
        capability: The capability we wish to check for.
        scheme: (optional) The access scheme, default is None.
        host: (optional) The host name, default is None.
        port: (optional) The port number, default is None.
        context: Other configurations for Splunk rest client.

    Returns:
        True if user is capable else False.

    Raises:
        UserNotExistException: If `username` does not exist.

    Examples:
       >>> from solnlib import user_access
       >>> is_capable = user_access.user_is_capable(
       >>>     session_key, 'test_user', 'object_read_capability')
    """

    return capability in get_user_capabilities(
        session_key,
        username,
        scheme=scheme,
        host=host,
        port=port,
        **context,
    )