@@ -32,3 +32,42 @@ def reduce(self, records):
3232 data = list (data_chunk .data )
3333 assert len (data ) == 1
3434 assert int (data [0 ]['sum' ]) == sum (range (0 , 10 ))
35+
36+
37+ def test_simple_reporting_command_with_map ():
38+ @searchcommands .Configuration ()
39+ class MapAndReduceReportingCommand (searchcommands .ReportingCommand ):
40+ def map (self , records ):
41+ for record in records :
42+ record ["value" ] = str (int (record ["value" ]) * 2 )
43+ yield record
44+
45+ def reduce (self , records ):
46+ total = 0
47+ for record in records :
48+ total += int (record ["value" ])
49+ yield {"sum" : total }
50+
51+ cmd = MapAndReduceReportingCommand ()
52+ ifile = io .BytesIO ()
53+
54+ input_data = [{"value" : str (i )} for i in range (5 )]
55+
56+ mapped_data = list (cmd .map (input_data ))
57+
58+ ifile .write (chunky .build_getinfo_chunk ())
59+ ifile .write (chunky .build_data_chunk (mapped_data ))
60+ ifile .seek (0 )
61+
62+ ofile = io .BytesIO ()
63+ cmd ._process_protocol_v2 ([], ifile , ofile )
64+
65+ ofile .seek (0 )
66+ chunk_stream = chunky .ChunkedDataStream (ofile )
67+ chunk_stream .read_chunk ()
68+ data_chunk = chunk_stream .read_chunk ()
69+ assert data_chunk .meta ['finished' ] is True
70+
71+ result = list (data_chunk .data )
72+ expected_sum = sum (i * 2 for i in range (5 ))
73+ assert int (result [0 ]["sum" ]) == expected_sum
0 commit comments