@@ -27,3 +27,71 @@ def stream(self, records):
2727 output = chunky .ChunkedDataStream (ofile )
2828 getinfo_response = output .read_chunk ()
2929 assert getinfo_response .meta ["type" ] == "streaming"
30+
31+
32+ def test_field_preservation_negative ():
33+ @Configuration ()
34+ class TestStreamingCommand (StreamingCommand ):
35+
36+ def stream (self , records ):
37+ for index , record in enumerate (records ):
38+ if index % 2 != 0 :
39+ record ["odd_field" ] = True
40+ else :
41+ record ["even_field" ] = True
42+ yield record
43+
44+ cmd = TestStreamingCommand ()
45+ ifile = io .BytesIO ()
46+ ifile .write (chunky .build_getinfo_chunk ())
47+ data = list ()
48+ for i in range (0 , 10 ):
49+ data .append ({"in_index" : str (i )})
50+ ifile .write (chunky .build_data_chunk (data , finished = True ))
51+ ifile .seek (0 )
52+ ofile = io .BytesIO ()
53+ cmd ._process_protocol_v2 ([], ifile , ofile )
54+ ofile .seek (0 )
55+ output_iter = chunky .ChunkedDataStream (ofile ).__iter__ ()
56+ output_iter .next ()
57+ output_records = [i for i in output_iter .next ().data ]
58+
59+ # Assert that count of records having "odd_field" is 0
60+ assert len (list (filter (lambda r : "odd_field" in r , output_records ))) == 0
61+
62+ # Assert that count of records having "even_field" is 10
63+ assert len (list (filter (lambda r : "even_field" in r , output_records ))) == 10
64+
65+
66+ def test_field_preservation_positive ():
67+ @Configuration ()
68+ class TestStreamingCommand (StreamingCommand ):
69+
70+ def stream (self , records ):
71+ for index , record in enumerate (records ):
72+ if index % 2 != 0 :
73+ self .add_field (record , "odd_field" , True )
74+ else :
75+ self .add_field (record , "even_field" , True )
76+ yield record
77+
78+ cmd = TestStreamingCommand ()
79+ ifile = io .BytesIO ()
80+ ifile .write (chunky .build_getinfo_chunk ())
81+ data = list ()
82+ for i in range (0 , 10 ):
83+ data .append ({"in_index" : str (i )})
84+ ifile .write (chunky .build_data_chunk (data , finished = True ))
85+ ifile .seek (0 )
86+ ofile = io .BytesIO ()
87+ cmd ._process_protocol_v2 ([], ifile , ofile )
88+ ofile .seek (0 )
89+ output_iter = chunky .ChunkedDataStream (ofile ).__iter__ ()
90+ output_iter .next ()
91+ output_records = [i for i in output_iter .next ().data ]
92+
93+ # Assert that count of records having "odd_field" is 10
94+ assert len (list (filter (lambda r : "odd_field" in r , output_records ))) == 10
95+
96+ # Assert that count of records having "even_field" is 10
97+ assert len (list (filter (lambda r : "even_field" in r , output_records ))) == 10
0 commit comments