Commit 4f4e609

initial end-to-end tests for streaming, reporting, generating

1 parent 7645d29

File tree

5 files changed: +199, −1 lines

splunklib/searchcommands/search_command.py

Lines changed: 2 additions & 1 deletion
@@ -851,7 +851,8 @@ def _execute(self, ifile, process):

     @staticmethod
     def _as_binary_stream(ifile):
-        if six.PY2:
+        naught = ifile.read(0)
+        if isinstance(naught, bytes):
             return ifile

         try:
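The new check duck-types the stream instead of sniffing the interpreter version: read(0) yields b'' from a binary stream and '' from a text stream, so the isinstance test behaves the same on Python 2 and 3. A minimal sketch of the idea with in-memory streams:

import io

assert isinstance(io.BytesIO(b"x").read(0), bytes)       # binary stream: b''
assert not isinstance(io.StringIO(u"x").read(0), bytes)  # text stream: u''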
chunked_data_stream.py

Lines changed: 93 additions & 0 deletions
import collections
import csv
import io
import json

import splunklib.searchcommands.internals
from splunklib import six


class Chunk(object):
    def __init__(self, version, meta, data):
        self.version = six.ensure_str(version)
        self.meta = json.loads(meta)
        # The data payload is CSV in the library's own dialect; expose it as dict rows.
        self.data = csv.DictReader(io.StringIO(data.decode("utf-8")),
                                   dialect=splunklib.searchcommands.internals.CsvDialect)


class ChunkedDataStreamIter(collections.Iterator):
    # collections.Iterator (rather than collections.abc) keeps Python 2 compatibility.
    def __init__(self, chunk_stream):
        self.chunk_stream = chunk_stream

    def __next__(self):
        return self.next()

    def next(self):
        try:
            return self.chunk_stream.read_chunk()
        except EOFError:
            raise StopIteration


class ChunkedDataStream(collections.Iterable):
    def __iter__(self):
        return ChunkedDataStreamIter(self)

    def __init__(self, stream):
        # read(0) returns b'' on a binary stream; reject text streams up front.
        empty = stream.read(0)
        assert isinstance(empty, bytes)
        self.stream = stream

    def read_chunk(self):
        # Each chunk starts with a header of the form
        # b"chunked 1.0,<metadata length>,<data length>\n".
        header = self.stream.readline()
        if len(header) == 0:
            raise EOFError

        while len(header) > 0 and header.strip() == b'':
            header = self.stream.readline()  # Skip empty lines
        version, meta, data = header.rstrip().split(b',')
        metabytes = self.stream.read(int(meta))
        databytes = self.stream.read(int(data))
        return Chunk(version, metabytes, databytes)


def build_chunk(kv, data=None):
    metadata = six.ensure_binary(json.dumps(kv), 'utf-8')
    data_output = _build_data_csv(data)
    return b"chunked 1.0,%d,%d\n%s%s" % (len(metadata), len(data_output), metadata, data_output)


def build_empty_searchinfo():
    return {
        'earliest_time': 0,
        'latest_time': 0,
        'search': "",
        'dispatch_dir': "",
        'sid': "",
        'args': [],
        'splunk_version': "42.3.4",
    }


def build_getinfo_chunk():
    return build_chunk({'action': 'getinfo', 'preview': False, 'searchinfo': build_empty_searchinfo()})


def build_data_chunk(data, finished=True):
    return build_chunk({'action': 'execute', 'finished': finished}, data)


def _build_data_csv(data):
    if data is None:
        return b''
    if isinstance(data, bytes):
        return data
    csvout = io.StringIO()
    # Use the union of all record keys as the CSV header.
    headers = set()
    for datum in data:
        headers.update(datum.keys())
    writer = csv.DictWriter(csvout, headers, dialect=splunklib.searchcommands.internals.CsvDialect)
    writer.writeheader()
    for datum in data:
        writer.writerow(datum)
    return csvout.getvalue().encode("utf-8")
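As a quick round trip through these helpers (a minimal sketch, not part of the commit; it relies only on the functions above and the default json.dumps spacing):

import io

payload = build_chunk({'action': 'getinfo'})
# Header, then 21 bytes of JSON metadata, then an empty CSV body.
assert payload == b'chunked 1.0,21,0\n{"action": "getinfo"}'

# Reading it back through ChunkedDataStream recovers the parsed metadata.
chunk = ChunkedDataStream(io.BytesIO(payload)).read_chunk()
assert chunk.version == 'chunked 1.0'
assert chunk.meta == {'action': 'getinfo'}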
Lines changed: 41 additions & 0 deletions
import io
import time

from . import chunked_data_stream as chunky

from splunklib.searchcommands import Configuration, GeneratingCommand


def test_simple_generator():
    @Configuration()
    class GeneratorTest(GeneratingCommand):
        def generate(self):
            for num in range(1, 10):
                yield {'_time': time.time(), 'event_index': num}

    generator = GeneratorTest()
    in_stream = io.BytesIO()
    in_stream.write(chunky.build_getinfo_chunk())
    in_stream.write(chunky.build_chunk({'action': 'execute'}))
    in_stream.seek(0)
    out_stream = io.BytesIO()
    generator._process_protocol_v2([], in_stream, out_stream)
    out_stream.seek(0)

    ds = chunky.ChunkedDataStream(out_stream)
    is_first_chunk = True
    finished_seen = False
    expected = set(map(lambda i: str(i), range(1, 10)))
    seen = set()
    for chunk in ds:
        if is_first_chunk:
            assert chunk.meta["generating"] is True
            assert chunk.meta["type"] == "stateful"
            is_first_chunk = False
        finished_seen = chunk.meta.get("finished", False)
        for row in chunk.data:
            seen.add(row["event_index"])
    assert expected.issubset(seen)
    assert finished_seen
Lines changed: 34 additions & 0 deletions
import io

import splunklib.searchcommands as searchcommands
from . import chunked_data_stream as chunky


def test_simple_reporting_command():
    @searchcommands.Configuration()
    class TestReportingCommand(searchcommands.ReportingCommand):
        def reduce(self, records):
            value = 0
            for record in records:
                value += int(record["value"])
            yield {'sum': value}

    cmd = TestReportingCommand()
    ifile = io.BytesIO()
    data = list()
    for i in range(0, 10):
        data.append({"value": str(i)})
    ifile.write(chunky.build_getinfo_chunk())
    ifile.write(chunky.build_data_chunk(data))
    ifile.seek(0)
    ofile = io.BytesIO()
    cmd._process_protocol_v2([], ifile, ofile)
    ofile.seek(0)
    chunk_stream = chunky.ChunkedDataStream(ofile)
    getinfo_response = chunk_stream.read_chunk()
    assert getinfo_response.meta['type'] == 'reporting'
    data_chunk = chunk_stream.read_chunk()
    assert data_chunk.meta['finished'] is True  # Should only be one row
    data = list(data_chunk.data)
    assert len(data) == 1
    assert int(data[0]['sum']) == sum(range(0, 10))
Lines changed: 29 additions & 0 deletions
import io

from . import chunked_data_stream as chunky
from splunklib.searchcommands import StreamingCommand, Configuration


def test_simple_streaming_command():
    @Configuration()
    class TestStreamingCommand(StreamingCommand):

        def stream(self, records):
            for record in records:
                record["out_index"] = record["in_index"]
                yield record

    cmd = TestStreamingCommand()
    ifile = io.BytesIO()
    ifile.write(chunky.build_getinfo_chunk())
    data = list()
    for i in range(0, 10):
        data.append({"in_index": str(i)})
    ifile.write(chunky.build_data_chunk(data, finished=True))
    ifile.seek(0)
    ofile = io.BytesIO()
    cmd._process_protocol_v2([], ifile, ofile)
    ofile.seek(0)
    output = chunky.ChunkedDataStream(ofile)
    getinfo_response = output.read_chunk()
    assert getinfo_response.meta["type"] == "streaming"
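The test stops at the getinfo response; a possible continuation (a sketch, not part of this commit) would read the data chunk back the way the reporting test does:

# Hypothetical continuation: the next chunk carries the streamed records.
data_chunk = output.read_chunk()
rows = list(data_chunk.data)
assert len(rows) == 10
assert all(row["out_index"] == row["in_index"] for row in rows)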
