@@ -82,7 +82,7 @@ def iter_resp_lines(resp):
8282 last_was_empty = False # Reset empty-line flag
8383 else :
8484 if not last_was_empty :
85- yield '\n ' # Only print one empty line
85+ yield '' # Only print one empty line
8686 last_was_empty = True # Mark that we handled an empty line
8787 next_newline = buffer .find (b'\n ' )
8888
@@ -113,24 +113,29 @@ def get_watch_argument_name(self, func):
113113 return 'watch'
114114
115115 def unmarshal_event (self , data , return_type ):
116- js = json .loads (data )
117- js ['raw_object' ] = js ['object' ]
118- # BOOKMARK event is treated the same as ERROR for a quick fix of
119- # decoding exception
120- # TODO: make use of the resource_version in BOOKMARK event for more
121- # efficient WATCH
122- if return_type and js ['type' ] != 'ERROR' and js ['type' ] != 'BOOKMARK' :
123- obj = SimpleNamespace (data = json .dumps (js ['raw_object' ]))
124- js ['object' ] = self ._api_client .deserialize (obj , return_type )
125- if hasattr (js ['object' ], 'metadata' ):
126- self .resource_version = js ['object' ].metadata .resource_version
127- # For custom objects that we don't have model defined, json
128- # deserialization results in dictionary
129- elif (isinstance (js ['object' ], dict ) and 'metadata' in js ['object' ]
130- and 'resourceVersion' in js ['object' ]['metadata' ]):
131- self .resource_version = js ['object' ]['metadata' ][
132- 'resourceVersion' ]
133- return js
116+ if not data or data .isspace ():
117+ return None
118+ try :
119+ js = json .loads (data )
120+ js ['raw_object' ] = js ['object' ]
121+ # BOOKMARK event is treated the same as ERROR for a quick fix of
122+ # decoding exception
123+ # TODO: make use of the resource_version in BOOKMARK event for more
124+ # efficient WATCH
125+ if return_type and js ['type' ] != 'ERROR' and js ['type' ] != 'BOOKMARK' :
126+ obj = SimpleNamespace (data = json .dumps (js ['raw_object' ]))
127+ js ['object' ] = self ._api_client .deserialize (obj , return_type )
128+ if hasattr (js ['object' ], 'metadata' ):
129+ self .resource_version = js ['object' ].metadata .resource_version
130+ # For custom objects that we don't have model defined, json
131+ # deserialization results in dictionary
132+ elif (isinstance (js ['object' ], dict ) and 'metadata' in js ['object' ]
133+ and 'resourceVersion' in js ['object' ]['metadata' ]):
134+ self .resource_version = js ['object' ]['metadata' ][
135+ 'resourceVersion' ]
136+ return js
137+ except json .JSONDecodeError :
138+ return None
134139
135140 def stream (self , func , * args , ** kwargs ):
136141 """Watch an API resource and stream the result back via a generator.
@@ -209,7 +214,7 @@ def stream(self, func, *args, **kwargs):
209214 yield line # Normal non-empty line
210215 last_was_empty = False
211216 elif not last_was_empty :
212- yield '/n ' # Only yield one empty line
217+ yield '' # Only yield one empty line
213218 last_was_empty = True
214219 if self ._stop :
215220 break
0 commit comments