 
 from __future__ import absolute_import
 
-from io import BytesIO
+from io import BufferedReader, BytesIO
 
 from splunklib import six
+
+from splunklib.six import deprecated
+
 try:
     import xml.etree.cElementTree as et
 except:
     import xml.etree.ElementTree as et
 
 from collections import OrderedDict
+from json import loads as json_loads
 
 try:
     from splunklib.six.moves import cStringIO as StringIO
@@ -54,6 +58,7 @@
     "Message"
 ]
 
+
 class Message(object):
     """This class represents informational messages that Splunk interleaves in the results stream.
 
@@ -64,6 +69,7 @@ class Message(object):
 
         m = Message("DEBUG", "There's something in that variable...")
     """
+
     def __init__(self, type_, message):
         self.type = type_
         self.message = message
@@ -77,6 +83,7 @@ def __eq__(self, other):
     def __hash__(self):
         return hash((self.type, self.message))
 
+
 class _ConcatenatedStream(object):
     """Lazily concatenate zero or more streams into a stream.
 
@@ -89,6 +96,7 @@ class _ConcatenatedStream(object):
         s = _ConcatenatedStream(StringIO("abc"), StringIO("def"))
         assert s.read() == "abcdef"
     """
+
     def __init__(self, *streams):
         self.streams = list(streams)
 
@@ -107,6 +115,7 @@ def read(self, n=None):
                 del self.streams[0]
         return response
 
+
 class _XMLDTDFilter(object):
     """Lazily remove all XML DTDs from a stream.
 
@@ -120,6 +129,7 @@ class _XMLDTDFilter(object):
         s = _XMLDTDFilter("<?xml abcd><element><?xml ...></element>")
         assert s.read() == "<element></element>"
     """
+
     def __init__(self, stream):
         self.stream = stream
 
@@ -150,6 +160,8 @@ def read(self, n=None):
                     n -= 1
         return response
 
+
+@deprecated("Use the JSONResultsReader function instead in conjunction with the 'output_mode' query param set to 'json'")
 class ResultsReader(object):
     """This class returns dictionaries and Splunk messages from an XML results
     stream.
@@ -177,6 +189,7 @@ class ResultsReader(object):
                 print "Message: %s" % result
         print "is_preview = %s " % reader.is_preview
     """
+
     # Be sure to update the docstrings of client.Jobs.oneshot,
     # client.Job.results_preview and client.Job.results to match any
     # changes made to ResultsReader.
@@ -257,16 +270,16 @@ def _parse_results(self, stream):
                     # So we'll define it here
 
                     def __itertext(self):
-                      tag = self.tag
-                      if not isinstance(tag, six.string_types) and tag is not None:
-                          return
-                      if self.text:
-                          yield self.text
-                      for e in self:
-                          for s in __itertext(e):
-                              yield s
-                          if e.tail:
-                              yield e.tail
+                        tag = self.tag
+                        if not isinstance(tag, six.string_types) and tag is not None:
+                            return
+                        if self.text:
+                            yield self.text
+                        for e in self:
+                            for s in __itertext(e):
+                                yield s
+                            if e.tail:
+                                yield e.tail
 
                     text = "".join(__itertext(elem))
                     values.append(text)
@@ -288,5 +301,69 @@ def __itertext(self):
                 raise
 
 
+class JSONResultsReader(object):
+    """This class returns dictionaries and Splunk messages from a JSON results
+    stream.
+    ``JSONResultsReader`` is iterable, and returns a ``dict`` for results, or a
+    :class:`Message` object for Splunk messages. This class has one field,
+    ``is_preview``, which is ``True`` when the results are a preview from a
+    running search, or ``False`` when the results are from a completed search.
+    This function has no network activity other than what is implicit in the
+    stream it operates on.
+    :param `stream`: The stream to read from (any object that supports
+        ``.read()``).
+    **Example**::
+        import results
+        response = ... # the body of an HTTP response
+        reader = results.JSONResultsReader(response)
+        for result in reader:
+            if isinstance(result, dict):
+                print "Result: %s" % result
+            elif isinstance(result, results.Message):
+                print "Message: %s" % result
+        print "is_preview = %s " % reader.is_preview
+    """
+
+    # Be sure to update the docstrings of client.Jobs.oneshot,
+    # client.Job.results_preview and client.Job.results to match any
+    # changes made to JSONResultsReader.
+    #
+    # This wouldn't be a class, just the _parse_results function below,
+    # except that you cannot get the current generator inside the
+    # function creating that generator. Thus it's all wrapped up for
+    # the sake of one field.
+    def __init__(self, stream):
+        # The search/jobs/exports endpoint, when run with
+        # earliest_time=rt and latest_time=rt, output_mode=json, streams a sequence of
+        # JSON documents, each containing a result, as opposed to one
+        # results element containing lots of results.
+        stream = BufferedReader(stream)
+        self.is_preview = None
+        self._gen = self._parse_results(stream)
+
+    def __iter__(self):
+        return self
 
+    def next(self):
+        return next(self._gen)
+
+    __next__ = next
 
+    def _parse_results(self, stream):
+        """Parse results and messages out of *stream*."""
+        for line in stream.readlines():
+            strip_line = line.strip()
+            if strip_line.__len__() == 0: continue
+            parsed_line = json_loads(strip_line)
+            if "preview" in parsed_line:
+                self.is_preview = parsed_line["preview"]
+            if "messages" in parsed_line and parsed_line["messages"].__len__() > 0:
+                for message in parsed_line["messages"]:
+                    msg_type = message.get("type", "Unknown Message Type")
+                    text = message.get("text")
+                    yield Message(msg_type, text)
+            if "result" in parsed_line:
+                yield parsed_line["result"]
+            if "results" in parsed_line:
+                for result in parsed_line["results"]:
+                    yield result
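
Migration note: the deprecation message above points at running searches with `output_mode=json` and wrapping the response stream in `JSONResultsReader`. Below is a minimal usage sketch, assuming the standard `splunklib.client` API and a reachable Splunk instance; the host, credentials, and query are placeholders.

```python
from splunklib import client, results

# Placeholder connection details -- adjust for a real deployment.
service = client.connect(host="localhost", port=8089,
                         username="admin", password="changeme")

# Ask splunkd for JSON output and feed the raw response stream to the reader.
stream = service.jobs.export("search index=_internal | head 5",
                             output_mode="json")
reader = results.JSONResultsReader(stream)

for item in reader:
    if isinstance(item, results.Message):
        # Diagnostic messages Splunk interleaves in the stream.
        print("%s: %s" % (item.type, item.message))
    elif isinstance(item, dict):
        # A single search result.
        print(item)
print("is_preview = %s" % reader.is_preview)
```

The same pattern should carry over to `jobs.oneshot` and `job.results`, since those endpoints also accept `output_mode` as a query parameter.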
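For reference, `_parse_results` consumes newline-delimited JSON documents of the shape the export endpoint emits: a `preview` flag, optional `messages`, and either `result` or `results`. A quick offline sketch, assuming this module is importable as `splunklib.results`; the payload below is fabricated purely for illustration and needs no Splunk connection.

```python
import io
from splunklib import results

# Two JSON documents of the shape _parse_results handles: one carrying a
# preview flag plus a message, one carrying a batch of final results.
payload = (
    b'{"preview": true, "messages": [{"type": "INFO", "text": "sample message"}]}\n'
    b'{"preview": false, "results": [{"count": "2"}, {"count": "3"}]}\n'
)

reader = results.JSONResultsReader(io.BytesIO(payload))
for item in reader:
    if isinstance(item, results.Message):
        print("message:", item.type, item.message)
    else:
        print("result:", item)
print("is_preview =", reader.is_preview)  # False after the final document
```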