@@ -61,13 +61,133 @@ def test_watch_with_decode(self):
6161 if count == 4 :
6262 w .stop ()
6363
64+ # make sure that all three records were consumed by the stream
65+ self .assertEqual (4 , count )
66+
6467 fake_api .get_namespaces .assert_called_once_with (
6568 _preload_content = False , watch = True )
6669 fake_resp .stream .assert_called_once_with (
6770 amt = None , decode_content = False )
6871 fake_resp .close .assert_called_once ()
6972 fake_resp .release_conn .assert_called_once ()
7073
74+ def test_watch_with_interspersed_newlines (self ):
75+ fake_resp = Mock ()
76+ fake_resp .close = Mock ()
77+ fake_resp .release_conn = Mock ()
78+ fake_resp .stream = Mock (
79+ return_value = [
80+ '\n ' ,
81+ '{"type": "ADDED", "object": {"metadata":' ,
82+ '{"name": "test1","resourceVersion": "1"}}}\n {"type": "ADDED", ' ,
83+ '"object": {"metadata": {"name": "test2", "resourceVersion": "2"}}}\n ' ,
84+ '\n ' ,
85+ '' ,
86+ '{"type": "ADDED", "object": {"metadata": {"name": "test3", "resourceVersion": "3"}}}\n ' ,
87+ '\n \n \n ' ,
88+ '\n ' ,
89+ ])
90+
91+ fake_api = Mock ()
92+ fake_api .get_namespaces = Mock (return_value = fake_resp )
93+ fake_api .get_namespaces .__doc__ = ':return: V1NamespaceList'
94+
95+ w = Watch ()
96+ count = 0
97+
98+ # Consume all test events from the mock service, stopping when no more data is available.
99+ # Note that "timeout_seconds" below is not a timeout; rather, it disables retries and is
100+ # the only way to do so. Without that, the stream will re-read the test data forever.
101+ for e in w .stream (fake_api .get_namespaces , timeout_seconds = 1 ):
102+ count += 1
103+ self .assertEqual ("test%d" % count , e ['object' ].metadata .name )
104+ self .assertEqual (3 , count )
105+
106+ def test_watch_with_multibyte_utf8 (self ):
107+ fake_resp = Mock ()
108+ fake_resp .close = Mock ()
109+ fake_resp .release_conn = Mock ()
110+ fake_resp .stream = Mock (
111+ return_value = [
112+ # two-byte utf-8 character
113+ '{"type":"MODIFIED","object":{"data":{"utf-8":"© 1"},"metadata":{"name":"test1","resourceVersion":"1"}}}\n ' ,
114+ # same copyright character expressed as bytes
115+ b'{"type":"MODIFIED","object":{"data":{"utf-8":"\xC2 \xA9 2"},"metadata":{"name":"test2","resourceVersion":"2"}}}\n '
116+ # same copyright character with bytes split across two stream chunks
117+ b'{"type":"MODIFIED","object":{"data":{"utf-8":"\xC2 ' ,
118+ b'\xA9 3"},"metadata":{"n' ,
119+ # more chunks of the same event, sent as a mix of bytes and strings
120+ 'ame":"test3","resourceVersion":"3"' ,
121+ '}}}' ,
122+ b'\n '
123+ ])
124+
125+ fake_api = Mock ()
126+ fake_api .get_configmaps = Mock (return_value = fake_resp )
127+ fake_api .get_configmaps .__doc__ = ':return: V1ConfigMapList'
128+
129+ w = Watch ()
130+ count = 0
131+
132+ # Consume all test events from the mock service, stopping when no more data is available.
133+ # Note that "timeout_seconds" below is not a timeout; rather, it disables retries and is
134+ # the only way to do so. Without that, the stream will re-read the test data forever.
135+ for event in w .stream (fake_api .get_configmaps , timeout_seconds = 1 ):
136+ count += 1
137+ self .assertEqual ("MODIFIED" , event ['type' ])
138+ self .assertEqual ("test%d" % count , event ['object' ].metadata .name )
139+ self .assertEqual ("© %d" % count , event ['object' ].data ["utf-8" ])
140+ self .assertEqual (
141+ "%d" % count , event ['object' ].metadata .resource_version )
142+ self .assertEqual ("%d" % count , w .resource_version )
143+ self .assertEqual (3 , count )
144+
145+ def test_watch_with_invalid_utf8 (self ):
146+ fake_resp = Mock ()
147+ fake_resp .close = Mock ()
148+ fake_resp .release_conn = Mock ()
149+ fake_resp .stream = Mock (
150+ # test 1 uses 1 invalid utf-8 byte
151+ # test 2 uses a sequence of 2 invalid utf-8 bytes
152+ # test 3 uses a sequence of 3 invalid utf-8 bytes
153+ return_value = [
154+ # utf-8 sequence for 😄 is \xF0\x9F\x98\x84
155+ # all other sequences below are invalid
156+ # ref: https://www.w3.org/2001/06/utf-8-wrong/UTF-8-test.html
157+ b'{"type":"MODIFIED","object":{"data":{"utf-8":"\xF0 \x9F \x98 \x84 1","invalid":"\x80 1"},"metadata":{"name":"test1"}}}\n ' ,
158+ b'{"type":"MODIFIED","object":{"data":{"utf-8":"\xF0 \x9F \x98 \x84 2","invalid":"\xC0 \xAF 2"},"metadata":{"name":"test2"}}}\n ' ,
159+ # mix bytes/strings and split byte sequences across chunks
160+ b'{"type":"MODIFIED","object":{"data":{"utf-8":"\xF0 \x9F \x98 ' ,
161+ b'\x84 ' ,
162+ b'' ,
163+ b'3","invalid":"\xE0 \x80 ' ,
164+ b'\xAF ' ,
165+ '3"},"metadata":{"n' ,
166+ 'ame":"test3"' ,
167+ '}}}' ,
168+ b'\n '
169+ ])
170+
171+ fake_api = Mock ()
172+ fake_api .get_configmaps = Mock (return_value = fake_resp )
173+ fake_api .get_configmaps .__doc__ = ':return: V1ConfigMapList'
174+
175+ w = Watch ()
176+ count = 0
177+
178+ # Consume all test events from the mock service, stopping when no more data is available.
179+ # Note that "timeout_seconds" below is not a timeout; rather, it disables retries and is
180+ # the only way to do so. Without that, the stream will re-read the test data forever.
181+ for event in w .stream (fake_api .get_configmaps , timeout_seconds = 1 ):
182+ count += 1
183+ self .assertEqual ("MODIFIED" , event ['type' ])
184+ self .assertEqual ("test%d" % count , event ['object' ].metadata .name )
185+ self .assertEqual ("😄 %d" % count , event ['object' ].data ["utf-8" ])
186+ # expect N replacement characters in test N
187+ self .assertEqual ("� %d" .replace ('�' , '�' * count ) %
188+ count , event ['object' ].data ["invalid" ])
189+ self .assertEqual (3 , count )
190+
71191 def test_watch_for_follow (self ):
72192 fake_resp = Mock ()
73193 fake_resp .close = Mock ()
0 commit comments