@@ -241,29 +241,9 @@ private async Task CloseAsync(ShutdownEventArgs reason, bool abort)
241241 }
242242 }
243243
244- internal void ConnectionOpen ( string virtualHost )
244+ internal async ValueTask ConnectionOpenAsync ( string virtualHost )
245245 {
246- var k = new SimpleBlockingRpcContinuation ( ) ;
247- lock ( _rpcLock )
248- {
249- Enqueue ( k ) ;
250- try
251- {
252- _Private_ConnectionOpen ( virtualHost ) ;
253- }
254- catch ( AlreadyClosedException )
255- {
256- // let continuation throw OperationInterruptedException,
257- // which is a much more suitable exception before connection
258- // negotiation finishes
259- }
260- k . GetReply ( HandshakeContinuationTimeout ) ;
261- }
262- }
263-
264- internal ValueTask ConnectionOpenAsync ( string virtualHost )
265- {
266- return _Private_ConnectionOpenAsync ( virtualHost ) ;
246+ await _Private_ConnectionOpenAsync ( virtualHost ) . TimeoutAfter ( HandshakeContinuationTimeout ) ;
267247 }
268248
269249 internal async ValueTask < ConnectionSecureOrTune > ConnectionSecureOkAsync ( byte [ ] response )
@@ -401,7 +381,7 @@ protected void ChannelSend<T>(in T method) where T : struct, IOutgoingAmqpMethod
401381 {
402382 Session . Transmit ( in method ) ;
403383 }
404-
384+
405385 [ MethodImpl ( MethodImplOptions . AggressiveInlining ) ]
406386 protected ValueTask ModelSendAsync < T > ( in T method ) where T : struct , IOutgoingAmqpMethod
407387 {
@@ -419,7 +399,7 @@ protected void ChannelSend<TMethod, THeader>(in TMethod method, in THeader heade
419399 }
420400 Session . Transmit ( in method , in header , body ) ;
421401 }
422-
402+
423403 [ MethodImpl ( MethodImplOptions . AggressiveInlining ) ]
424404 protected ValueTask ModelSendAsync < TMethod , THeader > ( in TMethod method , in THeader header , ReadOnlyMemory < byte > body )
425405 where TMethod : struct , IOutgoingAmqpMethod
@@ -429,7 +409,7 @@ protected ValueTask ModelSendAsync<TMethod, THeader>(in TMethod method, in THead
429409 {
430410 _flowControlBlock . Wait ( ) ;
431411 }
432-
412+
433413 return Session . TransmitAsync ( in method , in header , body ) ;
434414 }
435415
@@ -950,7 +930,7 @@ public void BasicPublish<TProperties>(CachedString exchange, CachedString routin
950930 var cmd = new BasicPublishMemory ( exchange . Bytes , routingKey . Bytes , mandatory , default ) ;
951931 ChannelSend ( in cmd , in basicProperties , body ) ;
952932 }
953-
933+
954934 public ValueTask BasicPublishAsync < TProperties > ( string exchange , string routingKey , in TProperties basicProperties , ReadOnlyMemory < byte > body , bool mandatory )
955935 where TProperties : IReadOnlyBasicProperties , IAmqpHeader
956936 {
@@ -965,7 +945,7 @@ public ValueTask BasicPublishAsync<TProperties>(string exchange, string routingK
965945 var cmd = new BasicPublish ( exchange , routingKey , mandatory , default ) ;
966946 return ModelSendAsync ( in cmd , in basicProperties , body ) ;
967947 }
968-
948+
969949 public ValueTask BasicPublishAsync < TProperties > ( CachedString exchange , CachedString routingKey , in TProperties basicProperties , ReadOnlyMemory < byte > body , bool mandatory )
970950 where TProperties : IReadOnlyBasicProperties , IAmqpHeader
971951 {
0 commit comments