File tree Expand file tree Collapse file tree 1 file changed +8
-1
lines changed
src/intersect_sdk/_internal/control_plane/brokers Expand file tree Collapse file tree 1 file changed +8
-1
lines changed Original file line number Diff line number Diff line change @@ -311,15 +311,22 @@ def _on_channel_closed(
311311 def _on_output_channel_open (self , channel : Channel ) -> None :
312312 channel_num = 0
313313 self ._channel_out = channel
314- self ._channel_flags .set_nth_flag (channel_num )
315314 cb = functools .partial (self ._on_channel_closed , channel_num = channel_num )
316315 self ._channel_out .add_on_close_callback (cb )
316+ # producer flag should first make sure the exchange exists before publishing
317+ channel .exchange_declare (
318+ exchange = _INTERSECT_MESSAGE_EXCHANGE ,
319+ exchange_type = 'topic' ,
320+ durable = True ,
321+ callback = lambda _frame : self ._channel_flags .set_nth_flag (channel_num ),
322+ )
317323 logger .info ('AMQP: output channel ready' )
318324
319325 # CONSUMER #
320326 def _on_input_channel_open (self , channel : Channel ) -> None :
321327 channel_num = 1
322328 self ._channel_in = channel
329+ # consumer channel flag can be set immediately
323330 self ._channel_flags .set_nth_flag (channel_num )
324331 cb_1 = functools .partial (self ._on_channel_closed , channel_num = channel_num )
325332 self ._channel_in .add_on_close_callback (cb_1 )
You can’t perform that action at this time.
0 commit comments