@@ -234,13 +234,20 @@ async def close(self):
234234 for key in keys_schema_updates_subs :
235235 sub = self .schema_updates_subs .get (key )
236236 task = self .schema_tasks .get (key )
237- del self .schema_updates_data [key ]
238- del self .schema_updates_subs [key ]
239- del self .producers_per_station [key ]
240- del self .schema_tasks [key ]
241- task .cancel ()
242- await sub .unsubscribe ()
243- await self .update_configurations_sub .unsubscribe ()
237+ if key in self .schema_updates_data :
238+ del self .schema_updates_data [key ]
239+ if key in self .schema_updates_subs :
240+ del self .schema_updates_subs [key ]
241+ if key in self .producers_per_station :
242+ del self .producers_per_station [key ]
243+ if key in self .schema_tasks :
244+ del self .schema_tasks [key ]
245+ if task is not None :
246+ task .cancel ()
247+ if sub is not None :
248+ await sub .unsubscribe ()
249+ if self .update_configurations_sub is not None :
250+ await self .update_configurations_sub .unsubscribe ()
244251 except :
245252 return
246253
@@ -454,12 +461,18 @@ async def destroy(self):
454461 sub = self .connection .schema_updates_subs .get (
455462 station_name_internal )
456463 task = self .connection .schema_tasks .get (station_name_internal )
457- del self .connection .schema_updates_data [station_name_internal ]
458- del self .connection .schema_updates_subs [station_name_internal ]
459- del self .connection .producers_per_station [station_name_internal ]
460- del self .connection .schema_tasks [station_name_internal ]
461- task .cancel ()
462- await sub .unsubscribe ()
464+ if station_name_internal in self .connection .schema_updates_data :
465+ del self .connection .schema_updates_data [station_name_internal ]
466+ if station_name_internal in self .connection .schema_updates_subs :
467+ del self .connection .schema_updates_subs [station_name_internal ]
468+ if station_name_internal in self .connection .producers_per_station :
469+ del self .connection .producers_per_station [station_name_internal ]
470+ if station_name_internal in self .connection .schema_tasks :
471+ del self .connection .schema_tasks [station_name_internal ]
472+ if task is not None :
473+ task .cancel ()
474+ if sub is not None :
475+ await sub .unsubscribe ()
463476
464477 except Exception as e :
465478 raise MemphisError (str (e )) from e
@@ -673,11 +686,16 @@ async def destroy(self):
673686 sub = self .connection .schema_updates_subs .get (
674687 station_name_internal )
675688 task = self .connection .schema_tasks .get (station_name_internal )
676- del self .connection .schema_updates_data [station_name_internal ]
677- del self .connection .schema_updates_subs [station_name_internal ]
678- del self .connection .schema_tasks [station_name_internal ]
679- task .cancel ()
680- await sub .unsubscribe ()
689+ if station_name_internal in self .connection .schema_updates_data :
690+ del self .connection .schema_updates_data [station_name_internal ]
691+ if station_name_internal in self .connection .schema_updates_subs :
692+ del self .connection .schema_updates_subs [station_name_internal ]
693+ if station_name_internal in self .connection .schema_tasks :
694+ del self .connection .schema_tasks [station_name_internal ]
695+ if task is not None :
696+ task .cancel ()
697+ if sub is not None :
698+ await sub .unsubscribe ()
681699
682700 except Exception as e :
683701 raise Exception (e )
@@ -759,9 +777,12 @@ async def __ping_consumer(self, callback):
759777 async def destroy (self ):
760778 """Destroy the consumer.
761779 """
762- self .t_consume .cancel ()
763- self .t_dls .cancel ()
764- self .t_ping .cancel ()
780+ if self .t_consume is not None :
781+ self .t_consume .cancel ()
782+ if self .t_dls is not None :
783+ self .t_dls .cancel ()
784+ if self .t_ping is not None :
785+ self .t_ping .cancel ()
765786 self .pull_interval_ms = None
766787 try :
767788 destroyConsumerReq = {
0 commit comments