Skip to content

Commit 88a9869

Browse files
committed
JSONResultsReader changes
1 parent f1db833 commit 88a9869

File tree

7 files changed

+88
-24
lines changed

7 files changed

+88
-24
lines changed

examples/follow.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,7 @@ def follow(job, count, items):
4242
job.refresh()
4343
continue
4444
stream = items(offset+1)
45-
for event in results.ResultsReader(stream):
45+
for event in results.JSONResultsReader(stream):
4646
pprint(event)
4747
offset = total
4848

@@ -72,10 +72,10 @@ def main():
7272

7373
if job['reportSearch'] is not None: # Is it a transforming search?
7474
count = lambda: int(job['numPreviews'])
75-
items = lambda _: job.preview()
75+
items = lambda _: job.preview(output_mode='json')
7676
else:
7777
count = lambda: int(job['eventCount'])
78-
items = lambda offset: job.events(offset=offset)
78+
items = lambda offset: job.events(offset=offset, output_mode='json')
7979

8080
try:
8181
follow(job, count, items)

examples/oneshot.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,7 @@
3232
"(e.g., export PYTHONPATH=~/splunk-sdk-python.")
3333

3434
def pretty(response):
35-
reader = results.ResultsReader(response)
35+
reader = results.JSONResultsReader(response)
3636
for result in reader:
3737
if isinstance(result, dict):
3838
pprint(result)
@@ -46,7 +46,7 @@ def main():
4646
search = opts.args[0]
4747
service = connect(**opts.kwargs)
4848
socket.setdefaulttimeout(None)
49-
response = service.jobs.oneshot(search)
49+
response = service.jobs.oneshot(search, output_mode='json')
5050

5151
pretty(response)
5252

examples/search_modes.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@ def modes(argv):
2424
while not job.is_ready():
2525
time.sleep(0.5)
2626
pass
27-
reader = results.ResultsReader(job.events())
27+
reader = results.JSONResultsReader(job.events(output_mode='json'))
2828
# Events found: 0
2929
print('Events found with adhoc_search_level="smart": %s' % len([e for e in reader]))
3030

@@ -33,7 +33,7 @@ def modes(argv):
3333
while not job.is_ready():
3434
time.sleep(0.5)
3535
pass
36-
reader = results.ResultsReader(job.events())
36+
reader = results.ResultsReader(job.events(output_mode='json'))
3737
# Events found: 10
3838
print('Events found with adhoc_search_level="verbose": %s' % len([e for e in reader]))
3939

examples/stail.py

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@
2525
from pprint import pprint
2626

2727
from splunklib.client import connect
28-
from splunklib.results import ResultsReader
28+
from splunklib.results import ResultsReader, JSONResultsReader
2929

3030
try:
3131
import utils
@@ -49,9 +49,10 @@ def main():
4949
search=search,
5050
earliest_time="rt",
5151
latest_time="rt",
52-
search_mode="realtime")
52+
search_mode="realtime",
53+
output_mode="json")
5354

54-
for result in ResultsReader(result.body):
55+
for result in JSONResultsReader(result.body):
5556
if result is not None:
5657
print(pprint(result))
5758

splunklib/results.py

Lines changed: 64 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,7 @@
3434

3535
from __future__ import absolute_import
3636

37-
from io import BytesIO
37+
from io import BufferedReader, BytesIO
3838

3939
from splunklib import six
4040
try:
@@ -43,6 +43,7 @@
4343
import xml.etree.ElementTree as et
4444

4545
from collections import OrderedDict
46+
from json import loads as json_loads
4647

4748
try:
4849
from splunklib.six.moves import cStringIO as StringIO
@@ -287,6 +288,68 @@ def __itertext(self):
287288
else:
288289
raise
289290

291+
class JSONResultsReader(object):
292+
"""This class returns dictionaries and Splunk messages from a JSON results
293+
stream.
294+
``JSONResultsReader`` is iterable, and returns a ``dict`` for results, or a
295+
:class:`Message` object for Splunk messages. This class has one field,
296+
``is_preview``, which is ``True`` when the results are a preview from a
297+
running search, or ``False`` when the results are from a completed search.
298+
This function has no network activity other than what is implicit in the
299+
stream it operates on.
300+
:param `stream`: The stream to read from (any object that supports
301+
``.read()``).
302+
**Example**::
303+
import results
304+
response = ... # the body of an HTTP response
305+
reader = results.JSONResultsReader(response)
306+
for result in reader:
307+
if isinstance(result, dict):
308+
print "Result: %s" % result
309+
elif isinstance(result, results.Message):
310+
print "Message: %s" % result
311+
print "is_preview = %s " % reader.is_preview
312+
"""
313+
# Be sure to update the docstrings of client.Jobs.oneshot,
314+
# client.Job.results_preview and client.Job.results to match any
315+
# changes made to JSONResultsReader.
316+
#
317+
# This wouldn't be a class, just the _parse_results function below,
318+
# except that you cannot get the current generator inside the
319+
# function creating that generator. Thus it's all wrapped up for
320+
# the sake of one field.
321+
def __init__(self, stream):
322+
# The search/jobs/exports endpoint, when run with
323+
# earliest_time=rt and latest_time=rt, output_mode=json, streams a sequence of
324+
# JSON documents, each containing a result, as opposed to one
325+
# results element containing lots of results.
326+
stream = BufferedReader(stream)
327+
self.is_preview = None
328+
self._gen = self._parse_results(stream)
290329

330+
def __iter__(self):
331+
return self
291332

333+
def next(self):
334+
return next(self._gen)
292335

336+
__next__ = next
337+
338+
def _parse_results(self, stream):
339+
"""Parse results and messages out of *stream*."""
340+
for line in stream.readlines():
341+
strip_line = line.strip()
342+
if strip_line.__len__() == 0 : continue
343+
parsed_line = json_loads(strip_line)
344+
if "preview" in parsed_line:
345+
self.is_preview = parsed_line["preview"]
346+
if "messages" in parsed_line and parsed_line["messages"].__len__() > 0:
347+
for message in parsed_line["messages"]:
348+
msg_type = message.get("type", "Unknown Message Type")
349+
text = message.get("text")
350+
yield Message(msg_type, text)
351+
if "result" in parsed_line:
352+
yield parsed_line["result"]
353+
if "results" in parsed_line:
354+
for result in parsed_line["results"]:
355+
yield result

tests/test_job.py

Lines changed: 12 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -54,8 +54,8 @@ def test_oneshot_with_garbage_fails(self):
5454

5555
def test_oneshot(self):
5656
jobs = self.service.jobs
57-
stream = jobs.oneshot("search index=_internal earliest=-1m | head 3")
58-
result = results.ResultsReader(stream)
57+
stream = jobs.oneshot("search index=_internal earliest=-1m | head 3", output_mode='json')
58+
result = results.JSONResultsReader(stream)
5959
ds = list(result)
6060
self.assertEqual(result.is_preview, False)
6161
self.assertTrue(isinstance(ds[0], dict) or \
@@ -69,8 +69,8 @@ def test_export_with_garbage_fails(self):
6969

7070
def test_export(self):
7171
jobs = self.service.jobs
72-
stream = jobs.export("search index=_internal earliest=-1m | head 3")
73-
result = results.ResultsReader(stream)
72+
stream = jobs.export("search index=_internal earliest=-1m | head 3", output_mode='json')
73+
result = results.JSONResultsReader(stream)
7474
ds = list(result)
7575
self.assertEqual(result.is_preview, False)
7676
self.assertTrue(isinstance(ds[0], dict) or \
@@ -82,7 +82,7 @@ def test_export_docstring_sample(self):
8282
import splunklib.client as client
8383
import splunklib.results as results
8484
service = self.service # cheat
85-
rr = results.ResultsReader(service.jobs.export("search * | head 5"))
85+
rr = results.JSONResultsReader(service.jobs.export("search * | head 5", output_mode='json'))
8686
for result in rr:
8787
if isinstance(result, results.Message):
8888
# Diagnostic messages may be returned in the results
@@ -98,7 +98,7 @@ def test_results_docstring_sample(self):
9898
job = service.jobs.create("search * | head 5")
9999
while not job.is_done():
100100
sleep(0.2)
101-
rr = results.ResultsReader(job.results())
101+
rr = results.JSONResultsReader(job.results(output_mode='json'))
102102
for result in rr:
103103
if isinstance(result, results.Message):
104104
# Diagnostic messages may be returned in the results
@@ -113,7 +113,7 @@ def test_preview_docstring_sample(self):
113113
import splunklib.results as results
114114
service = self.service # cheat
115115
job = service.jobs.create("search * | head 5")
116-
rr = results.ResultsReader(job.preview())
116+
rr = results.JSONResultsReader(job.preview(output_mode='json'))
117117
for result in rr:
118118
if isinstance(result, results.Message):
119119
# Diagnostic messages may be returned in the results
@@ -130,7 +130,7 @@ def test_oneshot_docstring_sample(self):
130130
import splunklib.client as client
131131
import splunklib.results as results
132132
service = self.service # cheat
133-
rr = results.ResultsReader(service.jobs.oneshot("search * | head 5"))
133+
rr = results.JSONResultsReader(service.jobs.oneshot("search * | head 5", output_mode='json'))
134134
for result in rr:
135135
if isinstance(result, results.Message):
136136
# Diagnostic messages may be returned in the results
@@ -295,12 +295,12 @@ def test_get_preview_and_events(self):
295295
self.assertEventuallyTrue(self.job.is_done)
296296
self.assertLessEqual(int(self.job['eventCount']), 3)
297297

298-
preview_stream = self.job.preview()
299-
preview_r = results.ResultsReader(preview_stream)
298+
preview_stream = self.job.preview(output_mode='json')
299+
preview_r = results.JSONResultsReader(preview_stream)
300300
self.assertFalse(preview_r.is_preview)
301301

302-
events_stream = self.job.events()
303-
events_r = results.ResultsReader(events_stream)
302+
events_stream = self.job.events(output_mode='json')
303+
events_r = results.JSONResultsReader(events_stream)
304304

305305
n_events = len([x for x in events_r if isinstance(x, dict)])
306306
n_preview = len([x for x in preview_r if isinstance(x, dict)])

tests/test_results.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,7 @@ def test_read_from_empty_result_set(self):
3030
job = self.service.jobs.create("search index=_internal_does_not_exist | head 2")
3131
while not job.is_done():
3232
sleep(0.5)
33-
self.assertEqual(0, len(list(results.ResultsReader(io.BufferedReader(job.results())))))
33+
self.assertEqual(0, len(list(results.JSONResultsReader(io.BufferedReader(job.results(output_mode='json'))))))
3434

3535
def test_read_normal_results(self):
3636
xml_text = """

0 commit comments

Comments
 (0)