Skip to content

acl.py

This module contains interfaces that support CRUD operations on ACL.

__all__ = ['ACLException', 'ACLManager'] module-attribute

ACLException

Bases: Exception

Exception raised by ACLManager.

Source code in solnlib/acl.py
30
31
32
33
class ACLException(Exception):
    """Exception raised by ACLManager."""

    pass

ACLManager

ACL manager.

Examples:

>>> import solnlib.acl as sacl
>>> saclm = sacl.ACLManager(session_key, 'Splunk_TA_test')
>>> saclm.get('data/transforms/extractions')
>>> saclm.update('data/transforms/extractions/_acl',
                 perms_read=['*'], perms_write=['*'])
Source code in solnlib/acl.py
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
class ACLManager:
    """ACL manager.

    Examples:
       >>> import solnlib.acl as sacl
       >>> saclm = sacl.ACLManager(session_key, 'Splunk_TA_test')
       >>> saclm.get('data/transforms/extractions')
       >>> saclm.update('data/transforms/extractions/_acl',
                        perms_read=['*'], perms_write=['*'])
    """

    def __init__(
        self,
        session_key: str,
        app: str,
        owner: str = "nobody",
        scheme: str = None,
        host: str = None,
        port: int = None,
        **context: dict
    ):
        """Initializes ACLManager.

        Arguments:
            session_key: Splunk access token.
            app: App name of namespace.
            owner: (optional) Owner of namespace, default is `nobody`.
            scheme: (optional) The access scheme, default is None.
            host: (optional) The host name, default is None.
            port: (optional) The port number, default is None.
            context: Other configurations for Splunk rest client.
        """
        self._rest_client = rest_client.SplunkRestClient(
            session_key,
            app,
            owner=owner,
            scheme=scheme,
            host=host,
            port=port,
            **context
        )

    @retry(exceptions=[binding.HTTPError])
    def get(self, path: str) -> dict:
        """Get ACL of  /servicesNS/{`owner`}/{`app`}/{`path`}.

        Arguments:
            path: Path of ACL relative to /servicesNS/{`owner`}/{`app`}

        Returns:
            A dict contains ACL.

        Raises:
            ACLException: If `path` is invalid.

        Examples:
           >>> aclm = acl.ACLManager(session_key, 'Splunk_TA_test')
           >>> perms = aclm.get('data/transforms/extractions/_acl')
        """

        try:
            content = self._rest_client.get(path, output_mode="json").body.read()
        except binding.HTTPError as e:
            if e.status != 404:
                raise

            raise ACLException("Invalid endpoint: %s.", path)

        return json.loads(content)["entry"][0]["acl"]

    @retry(exceptions=[binding.HTTPError])
    def update(
        self,
        path: str,
        owner: str = None,
        perms_read: List = None,
        perms_write: List = None,
    ) -> dict:
        """Update ACL of /servicesNS/{`owner`}/{`app`}/{`path`}.

        If the ACL is per-entity (ends in /acl), owner can be reassigned. If
        the acl is endpoint-level (ends in _acl), owner will be ignored. The
        'sharing' setting is always retrieved from the current.

        Arguments:
            path: Path of ACL relative to /servicesNS/{owner}/{app}. MUST
                end with /acl or /_acl indicating whether the permission is applied
                at the per-entity level or endpoint level respectively.
            owner: (optional) New owner of ACL, default is `nobody`.
            perms_read: (optional) List of roles (['*'] for all roles). If
                unspecified we will POST with current (if available) perms.read,
                default is None.
            perms_write: (optional) List of roles (['*'] for all roles). If
                unspecified we will POST with current (if available) perms.write,
                default is None.

        Returns:
            A dict contains ACL after update.

        Raises:
            ACLException: If `path` is invalid.

        Examples:
           >>> aclm = acl.ACLManager(session_key, 'Splunk_TA_test')
           >>> perms = aclm.update('data/transforms/extractions/_acl',
                                   perms_read=['admin'], perms_write=['admin'])
        """

        if not path.endswith("/acl") and not path.endswith("/_acl"):
            raise ACLException(
                "Invalid endpoint: %s, must end with /acl or /_acl." % path
            )

        curr_acl = self.get(path)

        postargs = {}
        if perms_read:
            postargs["perms.read"] = ",".join(perms_read)
        else:
            curr_read = curr_acl["perms"].get("read", [])
            if curr_read:
                postargs["perms.read"] = ",".join(curr_read)

        if perms_write:
            postargs["perms.write"] = ",".join(perms_write)
        else:
            curr_write = curr_acl["perms"].get("write", [])
            if curr_write:
                postargs["perms.write"] = ",".join(curr_write)

        if path.endswith("/acl"):
            # Allow ownership to be reset only at entity level.
            postargs["owner"] = owner or curr_acl["owner"]

        postargs["sharing"] = curr_acl["sharing"]

        try:
            content = self._rest_client.post(
                path, body=binding._encode(**postargs), output_mode="json"
            ).body.read()
        except binding.HTTPError as e:
            if e.status != 404:
                raise

            raise ACLException("Invalid endpoint: %s.", path)

        return json.loads(content)["entry"][0]["acl"]

__init__(session_key, app, owner='nobody', scheme=None, host=None, port=None, **context)

Initializes ACLManager.

Parameters:

Name Type Description Default
session_key str

Splunk access token.

required
app str

App name of namespace.

required
owner str

(optional) Owner of namespace, default is nobody.

'nobody'
scheme str

(optional) The access scheme, default is None.

None
host str

(optional) The host name, default is None.

None
port int

(optional) The port number, default is None.

None
context dict

Other configurations for Splunk rest client.

{}
Source code in solnlib/acl.py
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
def __init__(
    self,
    session_key: str,
    app: str,
    owner: str = "nobody",
    scheme: str = None,
    host: str = None,
    port: int = None,
    **context: dict
):
    """Initializes ACLManager.

    Arguments:
        session_key: Splunk access token.
        app: App name of namespace.
        owner: (optional) Owner of namespace, default is `nobody`.
        scheme: (optional) The access scheme, default is None.
        host: (optional) The host name, default is None.
        port: (optional) The port number, default is None.
        context: Other configurations for Splunk rest client.
    """
    self._rest_client = rest_client.SplunkRestClient(
        session_key,
        app,
        owner=owner,
        scheme=scheme,
        host=host,
        port=port,
        **context
    )

get(path)

Get ACL of /servicesNS/{owner}/{app}/{path}.

Parameters:

Name Type Description Default
path str

Path of ACL relative to /servicesNS/{owner}/{app}

required

Returns:

Type Description
dict

A dict contains ACL.

Raises:

Type Description
ACLException

If path is invalid.

Examples:

>>> aclm = acl.ACLManager(session_key, 'Splunk_TA_test')
>>> perms = aclm.get('data/transforms/extractions/_acl')
Source code in solnlib/acl.py
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
@retry(exceptions=[binding.HTTPError])
def get(self, path: str) -> dict:
    """Get ACL of  /servicesNS/{`owner`}/{`app`}/{`path`}.

    Arguments:
        path: Path of ACL relative to /servicesNS/{`owner`}/{`app`}

    Returns:
        A dict contains ACL.

    Raises:
        ACLException: If `path` is invalid.

    Examples:
       >>> aclm = acl.ACLManager(session_key, 'Splunk_TA_test')
       >>> perms = aclm.get('data/transforms/extractions/_acl')
    """

    try:
        content = self._rest_client.get(path, output_mode="json").body.read()
    except binding.HTTPError as e:
        if e.status != 404:
            raise

        raise ACLException("Invalid endpoint: %s.", path)

    return json.loads(content)["entry"][0]["acl"]

update(path, owner=None, perms_read=None, perms_write=None)

Update ACL of /servicesNS/{owner}/{app}/{path}.

If the ACL is per-entity (ends in /acl), owner can be reassigned. If the acl is endpoint-level (ends in _acl), owner will be ignored. The ‘sharing’ setting is always retrieved from the current.

Parameters:

Name Type Description Default
path str

Path of ACL relative to /servicesNS/{owner}/{app}. MUST end with /acl or /_acl indicating whether the permission is applied at the per-entity level or endpoint level respectively.

required
owner str

(optional) New owner of ACL, default is nobody.

None
perms_read List

(optional) List of roles ([‘*’] for all roles). If unspecified we will POST with current (if available) perms.read, default is None.

None
perms_write List

(optional) List of roles ([‘*’] for all roles). If unspecified we will POST with current (if available) perms.write, default is None.

None

Returns:

Type Description
dict

A dict contains ACL after update.

Raises:

Type Description
ACLException

If path is invalid.

Examples:

>>> aclm = acl.ACLManager(session_key, 'Splunk_TA_test')
>>> perms = aclm.update('data/transforms/extractions/_acl',
                        perms_read=['admin'], perms_write=['admin'])
Source code in solnlib/acl.py
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
@retry(exceptions=[binding.HTTPError])
def update(
    self,
    path: str,
    owner: str = None,
    perms_read: List = None,
    perms_write: List = None,
) -> dict:
    """Update ACL of /servicesNS/{`owner`}/{`app`}/{`path`}.

    If the ACL is per-entity (ends in /acl), owner can be reassigned. If
    the acl is endpoint-level (ends in _acl), owner will be ignored. The
    'sharing' setting is always retrieved from the current.

    Arguments:
        path: Path of ACL relative to /servicesNS/{owner}/{app}. MUST
            end with /acl or /_acl indicating whether the permission is applied
            at the per-entity level or endpoint level respectively.
        owner: (optional) New owner of ACL, default is `nobody`.
        perms_read: (optional) List of roles (['*'] for all roles). If
            unspecified we will POST with current (if available) perms.read,
            default is None.
        perms_write: (optional) List of roles (['*'] for all roles). If
            unspecified we will POST with current (if available) perms.write,
            default is None.

    Returns:
        A dict contains ACL after update.

    Raises:
        ACLException: If `path` is invalid.

    Examples:
       >>> aclm = acl.ACLManager(session_key, 'Splunk_TA_test')
       >>> perms = aclm.update('data/transforms/extractions/_acl',
                               perms_read=['admin'], perms_write=['admin'])
    """

    if not path.endswith("/acl") and not path.endswith("/_acl"):
        raise ACLException(
            "Invalid endpoint: %s, must end with /acl or /_acl." % path
        )

    curr_acl = self.get(path)

    postargs = {}
    if perms_read:
        postargs["perms.read"] = ",".join(perms_read)
    else:
        curr_read = curr_acl["perms"].get("read", [])
        if curr_read:
            postargs["perms.read"] = ",".join(curr_read)

    if perms_write:
        postargs["perms.write"] = ",".join(perms_write)
    else:
        curr_write = curr_acl["perms"].get("write", [])
        if curr_write:
            postargs["perms.write"] = ",".join(curr_write)

    if path.endswith("/acl"):
        # Allow ownership to be reset only at entity level.
        postargs["owner"] = owner or curr_acl["owner"]

    postargs["sharing"] = curr_acl["sharing"]

    try:
        content = self._rest_client.post(
            path, body=binding._encode(**postargs), output_mode="json"
        ).body.read()
    except binding.HTTPError as e:
        if e.status != 404:
            raise

        raise ACLException("Invalid endpoint: %s.", path)

    return json.loads(content)["entry"][0]["acl"]