@@ -370,18 +370,26 @@ def _maybe_connect(self, node_id):
370370 conn = self ._conns .get (node_id )
371371
372372 if conn is None :
373- broker = self .cluster .broker_metadata (node_id )
374- assert broker , 'Broker id %s not in current metadata' % (node_id ,)
375-
376- log .debug ("Initiating connection to node %s at %s:%s" ,
377- node_id , broker .host , broker .port )
378- host , port , afi = get_ip_port_afi (broker .host )
379- cb = WeakMethod (self ._conn_state_change )
380- conn = BrokerConnection (host , broker .port , afi ,
381- state_change_callback = cb ,
382- node_id = node_id ,
383- ** self .config )
384- self ._conns [node_id ] = conn
373+ broker_metadata = self .cluster .broker_metadata (node_id )
374+
375+ # The broker may have been removed from the cluster after the
376+ # call to `maybe_connect`. At this point there is no way to
377+ # recover, so just ignore the connection
378+ if broker_metadata is None :
379+ log .debug ("Node %s is not available anymore, discarding connection" , node_id )
380+ if node_id in self ._connecting :
381+ self ._connecting .remove (node_id )
382+ return False
383+ else :
384+ log .debug ("Initiating connection to node %s at %s:%s" ,
385+ node_id , broker_metadata .host , broker_metadata .port )
386+ host , port , afi = get_ip_port_afi (broker_metadata .host )
387+ cb = WeakMethod (self ._conn_state_change )
388+ conn = BrokerConnection (host , broker_metadata .port , afi ,
389+ state_change_callback = cb ,
390+ node_id = node_id ,
391+ ** self .config )
392+ self ._conns [node_id ] = conn
385393
386394 # Check if existing connection should be recreated because host/port changed
387395 elif self ._should_recycle_connection (conn ):
0 commit comments