Skip to content

Commit d9f08dd

Browse files
committed
Merge branch 'master' into release/1.6.15
2 parents ed75051 + 465522d commit d9f08dd

File tree

8 files changed

+276
-1
lines changed

8 files changed

+276
-1
lines changed
Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,37 @@
1+
---
2+
name: Bug report
3+
about: Create a report to help us improve
4+
title: ''
5+
labels: ''
6+
assignees: ''
7+
8+
---
9+
10+
**Describe the bug**
11+
A clear and concise description of what the bug is.
12+
13+
**To Reproduce**
14+
Steps to reproduce the behavior:
15+
1. Go to '...'
16+
2. Click on '....'
17+
3. Scroll down to '....'
18+
4. See error
19+
20+
**Expected behavior**
21+
A clear and concise description of what you expected to happen.
22+
23+
**Logs or Screenshots**
24+
If applicable, add logs or screenshots to help explain your problem.
25+
26+
**Splunk (please complete the following information):**
27+
- Version: [e.g. 8.0.5]
28+
- OS: [e.g. Ubuntu 20.04.1]
29+
- Deployment: [e.g. single-instance]
30+
31+
**SDK (please complete the following information):**
32+
- Version: [e.g. 1.6.14]
33+
- Language Runtime Version: [e.g. Python 3.7]
34+
- OS: [e.g. MacOS 10.15.7]
35+
36+
**Additional context**
37+
Add any other context about the problem here.

.github/ISSUE_TEMPLATE/custom.md

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,10 @@
1+
---
2+
name: Custom issue template
3+
about: Describe this issue template's purpose here.
4+
title: ''
5+
labels: ''
6+
assignees: ''
7+
8+
---
9+
10+
Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,20 @@
1+
---
2+
name: Feature request
3+
about: Suggest an idea for this project
4+
title: ''
5+
labels: ''
6+
assignees: ''
7+
8+
---
9+
10+
**Is your feature request related to a problem? Please describe.**
11+
A clear and concise description of what the problem is. Ex. I'm always frustrated when [...]
12+
13+
**Describe the solution you'd like**
14+
A clear and concise description of what you want to happen.
15+
16+
**Describe alternatives you've considered**
17+
A clear and concise description of any alternative solutions or features you've considered.
18+
19+
**Additional context**
20+
Add any other context or screenshots about the feature request here.

splunklib/searchcommands/search_command.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -856,7 +856,8 @@ def _execute(self, ifile, process):
856856

857857
@staticmethod
858858
def _as_binary_stream(ifile):
859-
if six.PY2:
859+
naught = ifile.read(0)
860+
if isinstance(naught, bytes):
860861
return ifile
861862

862863
try:
Lines changed: 100 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,100 @@
1+
import collections
2+
import csv
3+
import io
4+
import json
5+
6+
import splunklib.searchcommands.internals
7+
from splunklib import six
8+
9+
10+
class Chunk(object):
    """One parsed chunk of the chunked search-command protocol.

    Holds the protocol version string, the decoded JSON metadata, and the
    CSV data payload exposed as an iterator of dict rows.
    """

    def __init__(self, version, meta, data):
        # Normalize the version header to text regardless of bytes/str input.
        self.version = six.ensure_str(version)
        # The metadata payload is a JSON object.
        self.meta = json.loads(meta)
        # The data payload is CSV in Splunk's dialect; parse it into dict rows.
        self.data = csv.DictReader(
            io.StringIO(data.decode("utf-8")),
            dialect=splunklib.searchcommands.internals.CsvDialect)
17+
18+
19+
class ChunkedDataStreamIter(object):
    """Iterator over the chunks of a ChunkedDataStream.

    Translates the stream's EOFError into normal iterator exhaustion.

    Note: the base class used to be ``collections.Iterator``, which was
    deprecated since Python 3.3 and removed in 3.10; implementing the
    iterator protocol directly is equivalent and works everywhere.
    """

    def __init__(self, chunk_stream):
        self.chunk_stream = chunk_stream

    def __iter__(self):
        return self

    def __next__(self):
        return self.next()

    def next(self):
        # Python 2 style entry point, kept for compatibility with callers.
        try:
            return self.chunk_stream.read_chunk()
        except EOFError:
            raise StopIteration
31+
32+
33+
class ChunkedDataStream(object):
    """Iterable reader for Splunk's chunked search-command protocol.

    Wraps a binary stream and parses successive chunks out of it.

    Note: the base class used to be ``collections.Iterable``, which was
    deprecated since Python 3.3 and removed in 3.10; defining ``__iter__``
    directly provides the same iteration behavior on every version.
    """

    def __iter__(self):
        return ChunkedDataStreamIter(self)

    def __init__(self, stream):
        # The protocol is bytes-oriented; fail fast on a text-mode stream.
        empty = stream.read(0)
        assert isinstance(empty, bytes)
        self.stream = stream

    def read_chunk(self):
        """Read and parse the next chunk; raise EOFError at end of stream."""
        header = self.stream.readline()

        while len(header) > 0 and header.strip() == b'':
            header = self.stream.readline()  # Skip empty lines
        if len(header) == 0:
            raise EOFError

        # Header format: b"<version>,<meta-length>,<data-length>\n"
        version, meta, data = header.rstrip().split(b',')
        metabytes = self.stream.read(int(meta))
        databytes = self.stream.read(int(data))
        return Chunk(version, metabytes, databytes)
54+
55+
56+
def build_chunk(keyval, data=None):
    """Serialize one protocol chunk: header line, JSON metadata, CSV payload."""
    metadata = six.ensure_binary(json.dumps(keyval), 'utf-8')
    body = _build_data_csv(data)
    header = b"chunked 1.0,%d,%d\n" % (len(metadata), len(body))
    return header + metadata + body
60+
61+
62+
def build_empty_searchinfo():
    """Return a minimal searchinfo metadata dict for getinfo exchanges."""
    info = dict(
        earliest_time=0,
        latest_time=0,
        search="",
        dispatch_dir="",
        sid="",
        args=[],
        splunk_version="42.3.4",
    )
    return info
72+
73+
74+
def build_getinfo_chunk():
    """Build the initial 'getinfo' chunk a search command reads first."""
    metadata = {
        'action': 'getinfo',
        'preview': False,
        'searchinfo': build_empty_searchinfo(),
    }
    return build_chunk(metadata)
79+
80+
81+
def build_data_chunk(data, finished=True):
    """Build an 'execute' chunk carrying *data* rows."""
    metadata = {'action': 'execute', 'finished': finished}
    return build_chunk(metadata, data)
83+
84+
85+
def _build_data_csv(data):
86+
if data is None:
87+
return b''
88+
if isinstance(data, bytes):
89+
return data
90+
csvout = splunklib.six.StringIO()
91+
92+
headers = set()
93+
for datum in data:
94+
headers.update(datum.keys())
95+
writer = csv.DictWriter(csvout, headers,
96+
dialect=splunklib.searchcommands.internals.CsvDialect)
97+
writer.writeheader()
98+
for datum in data:
99+
writer.writerow(datum)
100+
return six.ensure_binary(csvout.getvalue())
Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,44 @@
1+
import io
2+
import time
3+
4+
from . import chunked_data_stream as chunky
5+
6+
from splunklib.searchcommands import Configuration, GeneratingCommand
7+
8+
9+
def test_simple_generator():
    """End-to-end check of a GeneratingCommand over protocol v2."""
    @Configuration()
    class GeneratorTest(GeneratingCommand):
        def generate(self):
            for num in range(1, 10):
                yield {'_time': time.time(), 'event_index': num}

    command = GeneratorTest()
    input_stream = io.BytesIO()
    input_stream.write(chunky.build_getinfo_chunk())
    input_stream.write(chunky.build_chunk({'action': 'execute'}))
    input_stream.seek(0)
    output_stream = io.BytesIO()
    command._process_protocol_v2([], input_stream, output_stream)
    output_stream.seek(0)

    expected = {str(i) for i in range(1, 10)}
    seen = set()
    finished_seen = False
    for index, chunk in enumerate(chunky.ChunkedDataStream(output_stream)):
        if index == 0:
            # The getinfo response must declare a stateful generator.
            assert chunk.meta["generating"] is True
            assert chunk.meta["type"] == "stateful"
        finished_seen = chunk.meta.get("finished", False)
        for row in chunk.data:
            seen.add(row["event_index"])
    print(output_stream.getvalue())
    print(expected)
    print(seen)
    assert expected.issubset(seen)
    assert finished_seen
42+
43+
44+
Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,34 @@
1+
import io
2+
3+
import splunklib.searchcommands as searchcommands
4+
from . import chunked_data_stream as chunky
5+
6+
7+
def test_simple_reporting_command():
    """A ReportingCommand should reduce all input rows to one summed row."""
    @searchcommands.Configuration()
    class TestReportingCommand(searchcommands.ReportingCommand):
        def reduce(self, records):
            total = 0
            for record in records:
                total += int(record["value"])
            yield {'sum': total}

    command = TestReportingCommand()
    input_stream = io.BytesIO()
    rows = [{"value": str(i)} for i in range(0, 10)]
    input_stream.write(chunky.build_getinfo_chunk())
    input_stream.write(chunky.build_data_chunk(rows))
    input_stream.seek(0)
    output_stream = io.BytesIO()
    command._process_protocol_v2([], input_stream, output_stream)
    output_stream.seek(0)

    chunk_stream = chunky.ChunkedDataStream(output_stream)
    getinfo_response = chunk_stream.read_chunk()
    assert getinfo_response.meta['type'] == 'reporting'
    data_chunk = chunk_stream.read_chunk()
    assert data_chunk.meta['finished'] is True  # Should only be one row
    results = list(data_chunk.data)
    assert len(results) == 1
    assert int(results[0]['sum']) == sum(range(0, 10))
Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,29 @@
1+
import io
2+
3+
from . import chunked_data_stream as chunky
4+
from splunklib.searchcommands import StreamingCommand, Configuration
5+
6+
7+
def test_simple_streaming_command():
    """A StreamingCommand should process records over protocol v2."""
    @Configuration()
    class TestStreamingCommand(StreamingCommand):

        def stream(self, records):
            for record in records:
                record["out_index"] = record["in_index"]
                yield record

    command = TestStreamingCommand()
    input_stream = io.BytesIO()
    input_stream.write(chunky.build_getinfo_chunk())
    rows = [{"in_index": str(i)} for i in range(0, 10)]
    input_stream.write(chunky.build_data_chunk(rows, finished=True))
    input_stream.seek(0)
    output_stream = io.BytesIO()
    command._process_protocol_v2([], input_stream, output_stream)
    output_stream.seek(0)
    output = chunky.ChunkedDataStream(output_stream)
    getinfo_response = output.read_chunk()
    assert getinfo_response.meta["type"] == "streaming"

0 commit comments

Comments
 (0)