Skip to content

server_info.py

This module contains Splunk server info related functionalities.

__all__ = ['ServerInfo', 'ServerInfoException'] module-attribute

ServerInfo

This class is a wrapper of Splunk server info.

Source code in solnlib/server_info.py
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
class ServerInfo:
    """This class is a wrapper of Splunk server info."""

    SHC_MEMBER_ENDPOINT = "/services/shcluster/member/members"
    SHC_CAPTAIN_INFO_ENDPOINT = "/services/shcluster/captain/info"

    def __init__(
        self,
        session_key: str,
        scheme: Optional[str] = None,
        host: Optional[str] = None,
        port: Optional[int] = None,
        **context: Any
    ):
        """Initializes ServerInfo.

        Arguments:
            session_key: Splunk access token.
            scheme: The access scheme, default is None.
            host: The host name, default is None.
            port: The port number, default is None.
            context: Other configurations for Splunk rest client.
        """
        self._rest_client = rest_client.SplunkRestClient(
            session_key, "-", scheme=scheme, host=host, port=port, **context
        )

    @classmethod
    def from_server_uri(
        cls, server_uri: str, session_key: str, **context: Any
    ) -> "ServerInfo":
        """Creates ServerInfo class using server_uri and session_key.

        Note: splunktalib uses these parameters to create it's ServerInfo class,
        so this method should ease the transition from splunktalib to solnlib.

        Arguments:
            server_uri: splunkd URI.
            session_key: Splunk access token.
            context: Other configurations for Splunk rest client.

        Returns:
            An instance of `ServerInfo`.

        Raises:
            ValueError: server_uri is in the wrong format.
        """
        scheme, host, port = utils.extract_http_scheme_host_port(server_uri)
        return ServerInfo(
            session_key,
            scheme=scheme,
            host=host,
            port=port,
            **context,
        )

    def to_dict(self) -> Dict:
        """Returns server information in a form dictionary.

        Note: This method is implemented here to have compatibility with splunktalib's
        analogue.

        Returns:
            Server information in a dictionary format.
        """
        return self._server_info()

    @utils.retry(exceptions=[binding.HTTPError])
    def _server_info(self):
        return self._rest_client.info

    @property
    def server_name(self) -> str:
        """Get server name.

        Returns:
            Server name.
        """
        return self._server_info()["serverName"]

    @property
    def guid(self) -> str:
        """Get guid for the server.

        Returns:
            Server GUID.
        """
        return self._server_info()["guid"]

    @property
    def version(self) -> str:
        """Get Splunk server version.

        Returns:
            Splunk version.
        """
        return self._server_info()["version"]

    def is_captain(self) -> bool:
        """Check if this server is SHC captain.

        Note during a rolling start of SH members, the captain may be changed
        from machine to machine. To avoid the race condition, client may need
        do necessary sleep and then poll `is_captain_ready() == True` and then
        check `is_captain()`. See `is_captain_ready()` for more details.

        Returns:
            True if this server is SHC captain else False.
        """

        return "shc_captain" in self._server_info()["server_roles"]

    def is_cloud_instance(self) -> bool:
        """Check if this server is a cloud instance.

        Returns:
            True if this server is a cloud instance else False.
        """

        try:
            return self._server_info()["instance_type"] == "cloud"
        except KeyError:
            return False

    def is_search_head(self) -> bool:
        """Check if this server is a search head.

        Returns:
            True if this server is a search head else False.
        """

        server_info = self._server_info()
        for sh in ("search_head", "cluster_search_head"):
            if sh in server_info["server_roles"]:
                return True

        return False

    def is_shc_member(self) -> bool:
        """Check if this server is a SHC member.

        Returns:
            True if this server is a SHC member else False.
        """

        server_info = self._server_info()
        for sh in ("shc_member", "shc_captain"):
            if sh in server_info["server_roles"]:
                return True

        return False

    @utils.retry(exceptions=[binding.HTTPError])
    def get_shc_members(self) -> list:
        """Get SHC members.

        Raises:
            ServerInfoException: If this server has no SHC members.

        Returns:
            List of SHC members [(label, peer_scheme_host_port) ...].
        """
        try:
            content = self._rest_client.get(
                self.SHC_MEMBER_ENDPOINT, output_mode="json"
            ).body.read()
        except binding.HTTPError as e:
            if e.status != 404 and e.status != 503:
                raise

            raise ServerInfoException(
                "This server is not a SHC member and has no SHC members."
            )

        members = []
        for member in json.loads(content)["entry"]:
            content = member["content"]
            members.append((content["label"], content["peer_scheme_host_port"]))

        return members

    @utils.retry(exceptions=[binding.HTTPError])
    def is_captain_ready(self) -> bool:
        """Check if captain is ready.

        Client usually first polls this function until captain is ready
        and then call is_captain to detect current captain machine

        Returns:
            True if captain is ready else False.

        Examples:
            >>> from solnlib import server_info
            >>> serverinfo = server_info.ServerInfo(session_key)
            >>> while 1:
            >>>    if serverinfo.is_captain_ready():
            >>>        break
            >>>    time.sleep(2)
            >>>
            >>> # If do_stuff can only be executed in SH captain
            >>> if serverinfo.is_captain():
            >>>    do_stuff()
        """

        cap_info = self.captain_info()
        return utils.is_true(cap_info["service_ready_flag"]) and utils.is_false(
            cap_info["maintenance_mode"]
        )

    @utils.retry(exceptions=[binding.HTTPError])
    def captain_info(self) -> dict:
        """Get captain information.

        Raises:
            ServerInfoException: If there is SHC is not enabled.

        Returns:
            Captain information.
        """

        try:
            content = self._rest_client.get(
                self.SHC_CAPTAIN_INFO_ENDPOINT, output_mode="json"
            ).body.read()
        except binding.HTTPError as e:
            if e.status == 503 and "not available" in str(e):
                raise ServerInfoException(str(e))
            raise

        return json.loads(content)["entry"][0]["content"]

SHC_CAPTAIN_INFO_ENDPOINT = '/services/shcluster/captain/info' class-attribute instance-attribute

SHC_MEMBER_ENDPOINT = '/services/shcluster/member/members' class-attribute instance-attribute

guid: str property

Get guid for the server.

Returns:

Type Description
str

Server GUID.

server_name: str property

Get server name.

Returns:

Type Description
str

Server name.

version: str property

Get Splunk server version.

Returns:

Type Description
str

Splunk version.

__init__(session_key, scheme=None, host=None, port=None, **context)

Initializes ServerInfo.

Parameters:

Name Type Description Default
session_key str

Splunk access token.

required
scheme Optional[str]

The access scheme, default is None.

None
host Optional[str]

The host name, default is None.

None
port Optional[int]

The port number, default is None.

None
context Any

Other configurations for Splunk rest client.

{}
Source code in solnlib/server_info.py
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
def __init__(
    self,
    session_key: str,
    scheme: Optional[str] = None,
    host: Optional[str] = None,
    port: Optional[int] = None,
    **context: Any
):
    """Initializes ServerInfo.

    Arguments:
        session_key: Splunk access token.
        scheme: The access scheme, default is None.
        host: The host name, default is None.
        port: The port number, default is None.
        context: Other configurations for Splunk rest client.
    """
    self._rest_client = rest_client.SplunkRestClient(
        session_key, "-", scheme=scheme, host=host, port=port, **context
    )

captain_info()

Get captain information.

Raises:

Type Description
ServerInfoException

If there is SHC is not enabled.

Returns:

Type Description
dict

Captain information.

Source code in solnlib/server_info.py
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
@utils.retry(exceptions=[binding.HTTPError])
def captain_info(self) -> dict:
    """Get captain information.

    Raises:
        ServerInfoException: If there is SHC is not enabled.

    Returns:
        Captain information.
    """

    try:
        content = self._rest_client.get(
            self.SHC_CAPTAIN_INFO_ENDPOINT, output_mode="json"
        ).body.read()
    except binding.HTTPError as e:
        if e.status == 503 and "not available" in str(e):
            raise ServerInfoException(str(e))
        raise

    return json.loads(content)["entry"][0]["content"]

from_server_uri(server_uri, session_key, **context) classmethod

Creates ServerInfo class using server_uri and session_key.

Note: splunktalib uses these parameters to create it’s ServerInfo class, so this method should ease the transition from splunktalib to solnlib.

Parameters:

Name Type Description Default
server_uri str

splunkd URI.

required
session_key str

Splunk access token.

required
context Any

Other configurations for Splunk rest client.

{}

Returns:

Type Description
ServerInfo

An instance of ServerInfo.

Raises:

Type Description
ValueError

server_uri is in the wrong format.

Source code in solnlib/server_info.py
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
@classmethod
def from_server_uri(
    cls, server_uri: str, session_key: str, **context: Any
) -> "ServerInfo":
    """Creates ServerInfo class using server_uri and session_key.

    Note: splunktalib uses these parameters to create it's ServerInfo class,
    so this method should ease the transition from splunktalib to solnlib.

    Arguments:
        server_uri: splunkd URI.
        session_key: Splunk access token.
        context: Other configurations for Splunk rest client.

    Returns:
        An instance of `ServerInfo`.

    Raises:
        ValueError: server_uri is in the wrong format.
    """
    scheme, host, port = utils.extract_http_scheme_host_port(server_uri)
    return ServerInfo(
        session_key,
        scheme=scheme,
        host=host,
        port=port,
        **context,
    )

get_shc_members()

Get SHC members.

Raises:

Type Description
ServerInfoException

If this server has no SHC members.

Returns:

Type Description
list

List of SHC members [(label, peer_scheme_host_port) …].

Source code in solnlib/server_info.py
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
@utils.retry(exceptions=[binding.HTTPError])
def get_shc_members(self) -> list:
    """Get SHC members.

    Raises:
        ServerInfoException: If this server has no SHC members.

    Returns:
        List of SHC members [(label, peer_scheme_host_port) ...].
    """
    try:
        content = self._rest_client.get(
            self.SHC_MEMBER_ENDPOINT, output_mode="json"
        ).body.read()
    except binding.HTTPError as e:
        if e.status != 404 and e.status != 503:
            raise

        raise ServerInfoException(
            "This server is not a SHC member and has no SHC members."
        )

    members = []
    for member in json.loads(content)["entry"]:
        content = member["content"]
        members.append((content["label"], content["peer_scheme_host_port"]))

    return members

is_captain()

Check if this server is SHC captain.

Note during a rolling start of SH members, the captain may be changed from machine to machine. To avoid the race condition, client may need do necessary sleep and then poll is_captain_ready() == True and then check is_captain(). See is_captain_ready() for more details.

Returns:

Type Description
bool

True if this server is SHC captain else False.

Source code in solnlib/server_info.py
134
135
136
137
138
139
140
141
142
143
144
145
146
def is_captain(self) -> bool:
    """Check if this server is SHC captain.

    Note during a rolling start of SH members, the captain may be changed
    from machine to machine. To avoid the race condition, client may need
    do necessary sleep and then poll `is_captain_ready() == True` and then
    check `is_captain()`. See `is_captain_ready()` for more details.

    Returns:
        True if this server is SHC captain else False.
    """

    return "shc_captain" in self._server_info()["server_roles"]

is_captain_ready()

Check if captain is ready.

Client usually first polls this function until captain is ready and then call is_captain to detect current captain machine

Returns:

Type Description
bool

True if captain is ready else False.

Examples:

>>> from solnlib import server_info
>>> serverinfo = server_info.ServerInfo(session_key)
>>> while 1:
>>>    if serverinfo.is_captain_ready():
>>>        break
>>>    time.sleep(2)
>>>
>>> # If do_stuff can only be executed in SH captain
>>> if serverinfo.is_captain():
>>>    do_stuff()
Source code in solnlib/server_info.py
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
@utils.retry(exceptions=[binding.HTTPError])
def is_captain_ready(self) -> bool:
    """Check if captain is ready.

    Client usually first polls this function until captain is ready
    and then call is_captain to detect current captain machine

    Returns:
        True if captain is ready else False.

    Examples:
        >>> from solnlib import server_info
        >>> serverinfo = server_info.ServerInfo(session_key)
        >>> while 1:
        >>>    if serverinfo.is_captain_ready():
        >>>        break
        >>>    time.sleep(2)
        >>>
        >>> # If do_stuff can only be executed in SH captain
        >>> if serverinfo.is_captain():
        >>>    do_stuff()
    """

    cap_info = self.captain_info()
    return utils.is_true(cap_info["service_ready_flag"]) and utils.is_false(
        cap_info["maintenance_mode"]
    )

is_cloud_instance()

Check if this server is a cloud instance.

Returns:

Type Description
bool

True if this server is a cloud instance else False.

Source code in solnlib/server_info.py
148
149
150
151
152
153
154
155
156
157
158
def is_cloud_instance(self) -> bool:
    """Check if this server is a cloud instance.

    Returns:
        True if this server is a cloud instance else False.
    """

    try:
        return self._server_info()["instance_type"] == "cloud"
    except KeyError:
        return False

is_search_head()

Check if this server is a search head.

Returns:

Type Description
bool

True if this server is a search head else False.

Source code in solnlib/server_info.py
160
161
162
163
164
165
166
167
168
169
170
171
172
def is_search_head(self) -> bool:
    """Check if this server is a search head.

    Returns:
        True if this server is a search head else False.
    """

    server_info = self._server_info()
    for sh in ("search_head", "cluster_search_head"):
        if sh in server_info["server_roles"]:
            return True

    return False

is_shc_member()

Check if this server is a SHC member.

Returns:

Type Description
bool

True if this server is a SHC member else False.

Source code in solnlib/server_info.py
174
175
176
177
178
179
180
181
182
183
184
185
186
def is_shc_member(self) -> bool:
    """Check if this server is a SHC member.

    Returns:
        True if this server is a SHC member else False.
    """

    server_info = self._server_info()
    for sh in ("shc_member", "shc_captain"):
        if sh in server_info["server_roles"]:
            return True

    return False

to_dict()

Returns server information in a form dictionary.

Note: This method is implemented here to have compatibility with splunktalib’s analogue.

Returns:

Type Description
Dict

Server information in a dictionary format.

Source code in solnlib/server_info.py
 92
 93
 94
 95
 96
 97
 98
 99
100
101
def to_dict(self) -> Dict:
    """Returns server information in a form dictionary.

    Note: This method is implemented here to have compatibility with splunktalib's
    analogue.

    Returns:
        Server information in a dictionary format.
    """
    return self._server_info()

ServerInfoException

Bases: Exception

Exception raised by ServerInfo class.

Source code in solnlib/server_info.py
30
31
32
33
class ServerInfoException(Exception):
    """Exception raised by ServerInfo class."""

    pass