Skip to content

Commit 79237e1

Browse files
artemrysArtem Rys
authored andcommitted
refactor: move get_collection_data to a separate module
1 parent 2d971be commit 79237e1

File tree

3 files changed

+146
-110
lines changed

3 files changed

+146
-110
lines changed

solnlib/_utils.py

Lines changed: 75 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,75 @@
1+
#
2+
# Copyright 2021 Splunk Inc.
3+
#
4+
# Licensed under the Apache License, Version 2.0 (the "License");
5+
# you may not use this file except in compliance with the License.
6+
# You may obtain a copy of the License at
7+
#
8+
# http://www.apache.org/licenses/LICENSE-2.0
9+
#
10+
# Unless required by applicable law or agreed to in writing, software
11+
# distributed under the License is distributed on an "AS IS" BASIS,
12+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
# See the License for the specific language governing permissions and
14+
# limitations under the License.
15+
#
16+
"""This module provide utils that are private to solnlib."""
17+
18+
import re
19+
from typing import Any, Dict, Optional, Union
20+
21+
from splunklib import binding
22+
23+
from solnlib import splunk_rest_client
24+
from solnlib.utils import retry
25+
26+
27+
@retry(exceptions=[binding.HTTPError])
28+
def get_collection_data(
29+
collection_name: str,
30+
session_key: str,
31+
app: str,
32+
owner: Optional[str] = None,
33+
scheme: Optional[str] = None,
34+
host: Optional[str] = None,
35+
port: Optional[Union[str, int]] = None,
36+
fields: Optional[Dict] = None,
37+
**context: Any,
38+
):
39+
"""Get collection data, if there is no such collection - creates one.
40+
41+
Arguments:
42+
collection_name: Collection name of KV Store checkpointer.
43+
session_key: Splunk access token.
44+
app: App name of namespace.
45+
owner: Owner of namespace, default is `nobody`.
46+
scheme: The access scheme, default is None.
47+
host: The host name, default is None.
48+
port: The port number, default is None.
49+
context: Other configurations for Splunk rest client.
50+
51+
Raises:
52+
binding.HTTPError: HTTP error different from 404, for example 503 when
53+
KV Store is initializing and not ready to serve requests.
54+
KeyError: KV Store did not get collection_name.
55+
"""
56+
kvstore = splunk_rest_client.SplunkRestClient(
57+
session_key, app, owner=owner, scheme=scheme, host=host, port=port, **context
58+
).kvstore
59+
60+
collection_name = re.sub(r"[^\w]+", "_", collection_name)
61+
try:
62+
kvstore.get(name=collection_name)
63+
except binding.HTTPError as e:
64+
if e.status != 404:
65+
raise
66+
67+
fields = fields if fields is not None else {}
68+
kvstore.create(collection_name, fields=fields)
69+
70+
collections = kvstore.list(search=collection_name)
71+
for collection in collections:
72+
if collection.name == collection_name:
73+
return collection.data
74+
else:
75+
raise KeyError(f"Get collection data: {collection_name} failed.")

solnlib/modular_input/checkpointer.py

Lines changed: 26 additions & 54 deletions
Original file line numberDiff line numberDiff line change
@@ -22,15 +22,13 @@
2222
import logging
2323
import os
2424
import os.path as op
25-
import re
2625
import traceback
2726
from abc import ABCMeta, abstractmethod
28-
from typing import List
27+
from typing import Any, List, Optional
2928

3029
from splunklib import binding
3130

32-
from .. import splunk_rest_client as rest_client
33-
from ..utils import retry
31+
from solnlib import _utils, utils
3432

3533
__all__ = ["CheckpointerException", "KVStoreCheckpointer", "FileCheckpointer"]
3634

@@ -143,11 +141,11 @@ def __init__(
143141
collection_name: str,
144142
session_key: str,
145143
app: str,
146-
owner: str = "nobody",
147-
scheme: str = None,
148-
host: str = None,
149-
port: int = None,
150-
**context: dict
144+
owner: Optional[str] = "nobody",
145+
scheme: Optional[str] = None,
146+
host: Optional[str] = None,
147+
port: Optional[int] = None,
148+
**context: Any
151149
):
152150
"""Initializes KVStoreCheckpointer.
153151
@@ -162,65 +160,39 @@ def __init__(
162160
context: Other configurations for Splunk rest client.
163161
164162
Raises:
165-
CheckpointerException: If init kvstore checkpointer failed.
163+
CheckpointerException: If init KV Store checkpointer failed.
166164
"""
167165
try:
168-
self._collection_data = self._get_collection_data(
169-
collection_name, session_key, app, owner, scheme, host, port, **context
166+
if not context.get("pool_connections"):
167+
context["pool_connections"] = 5
168+
if not context.get("pool_maxsize"):
169+
context["pool_maxsize"] = 5
170+
self._collection_data = _utils.get_collection_data(
171+
collection_name,
172+
session_key,
173+
app,
174+
owner,
175+
scheme,
176+
host,
177+
port,
178+
{"state": "string"},
179+
**context,
170180
)
171181
except KeyError:
172182
raise CheckpointerException("Get kvstore checkpointer failed.")
173183

174-
@retry(exceptions=[binding.HTTPError])
175-
def _get_collection_data(
176-
self, collection_name, session_key, app, owner, scheme, host, port, **context
177-
):
178-
179-
if not context.get("pool_connections"):
180-
context["pool_connections"] = 5
181-
182-
if not context.get("pool_maxsize"):
183-
context["pool_maxsize"] = 5
184-
185-
kvstore = rest_client.SplunkRestClient(
186-
session_key,
187-
app,
188-
owner=owner,
189-
scheme=scheme,
190-
host=host,
191-
port=port,
192-
**context
193-
).kvstore
194-
195-
collection_name = re.sub(r"[^\w]+", "_", collection_name)
196-
try:
197-
kvstore.get(name=collection_name)
198-
except binding.HTTPError as e:
199-
if e.status != 404:
200-
raise
201-
202-
fields = {"state": "string"}
203-
kvstore.create(collection_name, fields=fields)
204-
205-
collections = kvstore.list(search=collection_name)
206-
for collection in collections:
207-
if collection.name == collection_name:
208-
return collection.data
209-
else:
210-
raise KeyError("Get collection data: %s failed." % collection_name)
211-
212-
@retry(exceptions=[binding.HTTPError])
184+
@utils.retry(exceptions=[binding.HTTPError])
213185
def update(self, key, state):
214186
record = {"_key": key, "state": json.dumps(state)}
215187
self._collection_data.batch_save(record)
216188

217-
@retry(exceptions=[binding.HTTPError])
189+
@utils.retry(exceptions=[binding.HTTPError])
218190
def batch_update(self, states):
219191
for state in states:
220192
state["state"] = json.dumps(state["state"])
221193
self._collection_data.batch_save(*states)
222194

223-
@retry(exceptions=[binding.HTTPError])
195+
@utils.retry(exceptions=[binding.HTTPError])
224196
def get(self, key):
225197
try:
226198
record = self._collection_data.query_by_id(key)
@@ -233,7 +205,7 @@ def get(self, key):
233205

234206
return json.loads(record["state"])
235207

236-
@retry(exceptions=[binding.HTTPError])
208+
@utils.retry(exceptions=[binding.HTTPError])
237209
def delete(self, key):
238210
try:
239211
self._collection_data.delete_by_id(key)

0 commit comments

Comments
 (0)