4848
4949namespace RabbitMQ . Client . Impl
5050{
51- internal abstract class ChannelBase : IChannel , IRecoverable
51+ internal class Channel : IChannel , IRecoverable
5252 {
5353 ///<summary>Only used to kick-start a connection open
5454 ///sequence. See <see cref="Connection.OpenAsync"/> </summary>
@@ -71,7 +71,7 @@ internal abstract class ChannelBase : IChannel, IRecoverable
7171
7272 internal readonly IConsumerDispatcher ConsumerDispatcher ;
7373
74- protected ChannelBase ( ConnectionConfig config , ISession session , ushort ? perChannelConsumerDispatchConcurrency = null )
74+ public Channel ( ConnectionConfig config , ISession session , ushort ? perChannelConsumerDispatchConcurrency = null )
7575 {
7676 ContinuationTimeout = config . ContinuationTimeout ;
7777 ConsumerDispatcher = new AsyncConsumerDispatcher ( this ,
@@ -92,6 +92,7 @@ protected ChannelBase(ConnectionConfig config, ISession session, ushort? perChan
9292 }
9393
9494 internal TimeSpan HandshakeContinuationTimeout { get ; set ; } = TimeSpan . FromSeconds ( 10 ) ;
95+
9596 public TimeSpan ContinuationTimeout { get ; set ; }
9697
9798 public event AsyncEventHandler < BasicAckEventArgs > BasicAcksAsync
@@ -192,7 +193,7 @@ public void MaybeSetConnectionStartException(Exception ex)
192193 }
193194 }
194195
195- protected void TakeOver ( ChannelBase other )
196+ protected void TakeOver ( Channel other )
196197 {
197198 _basicAcksAsyncWrapper . Takeover ( other . _basicAcksAsyncWrapper ) ;
198199 _basicNacksAsyncWrapper . Takeover ( other . _basicNacksAsyncWrapper ) ;
@@ -355,8 +356,6 @@ await ModelSendAsync(in method, k.CancellationToken)
355356 }
356357 }
357358
358- protected abstract Task < bool > DispatchCommandAsync ( IncomingCommand cmd , CancellationToken cancellationToken ) ;
359-
360359 protected bool Enqueue ( IRpcContinuation k )
361360 {
362361 if ( IsOpen )
@@ -873,14 +872,26 @@ public async ValueTask<ulong> GetNextPublishSequenceNumberAsync(CancellationToke
873872 }
874873 }
875874
876- public abstract ValueTask BasicAckAsync ( ulong deliveryTag , bool multiple ,
877- CancellationToken cancellationToken ) ;
875+ public virtual ValueTask BasicAckAsync ( ulong deliveryTag , bool multiple ,
876+ CancellationToken cancellationToken )
877+ {
878+ var method = new BasicAck ( deliveryTag , multiple ) ;
879+ return ModelSendAsync ( in method , cancellationToken ) ;
880+ }
878881
879- public abstract ValueTask BasicNackAsync ( ulong deliveryTag , bool multiple , bool requeue ,
880- CancellationToken cancellationToken ) ;
882+ public virtual ValueTask BasicNackAsync ( ulong deliveryTag , bool multiple , bool requeue ,
883+ CancellationToken cancellationToken )
884+ {
885+ var method = new BasicNack ( deliveryTag , multiple , requeue ) ;
886+ return ModelSendAsync ( in method , cancellationToken ) ;
887+ }
881888
882- public abstract ValueTask BasicRejectAsync ( ulong deliveryTag , bool requeue ,
883- CancellationToken cancellationToken ) ;
889+ public virtual ValueTask BasicRejectAsync ( ulong deliveryTag , bool requeue ,
890+ CancellationToken cancellationToken )
891+ {
892+ var method = new BasicReject ( deliveryTag , requeue ) ;
893+ return ModelSendAsync ( in method , cancellationToken ) ;
894+ }
884895
885896 public async Task BasicCancelAsync ( string consumerTag , bool noWait ,
886897 CancellationToken cancellationToken )
@@ -1881,5 +1892,93 @@ void MaybeAddPublishSequenceNumberToHeaders(IDictionary<string, object?> headers
18811892 }
18821893 }
18831894 }
1895+
1896+ /// <summary>
1897+ /// Returning <c>true</c> from this method means that the command was server-originated,
1898+ /// and handled already.
1899+ /// Returning <c>false</c> (the default) means that the incoming command is the response to
1900+ /// a client-initiated RPC call, and must be handled.
1901+ /// </summary>
1902+ /// <param name="cmd">The incoming command from the AMQP server</param>
1903+ /// <param name="cancellationToken">The cancellation token</param>
1904+ /// <returns></returns>
1905+ private Task < bool > DispatchCommandAsync ( IncomingCommand cmd , CancellationToken cancellationToken )
1906+ {
1907+ switch ( cmd . CommandId )
1908+ {
1909+ case ProtocolCommandId . BasicCancel :
1910+ {
1911+ // Note: always returns true
1912+ return HandleBasicCancelAsync ( cmd , cancellationToken ) ;
1913+ }
1914+ case ProtocolCommandId . BasicDeliver :
1915+ {
1916+ // Note: always returns true
1917+ return HandleBasicDeliverAsync ( cmd , cancellationToken ) ;
1918+ }
1919+ case ProtocolCommandId . BasicAck :
1920+ {
1921+ return HandleBasicAck ( cmd , cancellationToken ) ;
1922+ }
1923+ case ProtocolCommandId . BasicNack :
1924+ {
1925+ return HandleBasicNack ( cmd , cancellationToken ) ;
1926+ }
1927+ case ProtocolCommandId . BasicReturn :
1928+ {
1929+ // Note: always returns true
1930+ return HandleBasicReturn ( cmd , cancellationToken ) ;
1931+ }
1932+ case ProtocolCommandId . ChannelClose :
1933+ {
1934+ // Note: always returns true
1935+ return HandleChannelCloseAsync ( cmd , cancellationToken ) ;
1936+ }
1937+ case ProtocolCommandId . ChannelCloseOk :
1938+ {
1939+ // Note: always returns true
1940+ return HandleChannelCloseOkAsync ( cmd , cancellationToken ) ;
1941+ }
1942+ case ProtocolCommandId . ChannelFlow :
1943+ {
1944+ // Note: always returns true
1945+ return HandleChannelFlowAsync ( cmd , cancellationToken ) ;
1946+ }
1947+ case ProtocolCommandId . ConnectionBlocked :
1948+ {
1949+ // Note: always returns true
1950+ return HandleConnectionBlockedAsync ( cmd , cancellationToken ) ;
1951+ }
1952+ case ProtocolCommandId . ConnectionClose :
1953+ {
1954+ // Note: always returns true
1955+ return HandleConnectionCloseAsync ( cmd , cancellationToken ) ;
1956+ }
1957+ case ProtocolCommandId . ConnectionSecure :
1958+ {
1959+ // Note: always returns true
1960+ return HandleConnectionSecureAsync ( cmd , cancellationToken ) ;
1961+ }
1962+ case ProtocolCommandId . ConnectionStart :
1963+ {
1964+ // Note: always returns true
1965+ return HandleConnectionStartAsync ( cmd , cancellationToken ) ;
1966+ }
1967+ case ProtocolCommandId . ConnectionTune :
1968+ {
1969+ // Note: always returns true
1970+ return HandleConnectionTuneAsync ( cmd , cancellationToken ) ;
1971+ }
1972+ case ProtocolCommandId . ConnectionUnblocked :
1973+ {
1974+ // Note: always returns true
1975+ return HandleConnectionUnblockedAsync ( cancellationToken ) ;
1976+ }
1977+ default :
1978+ {
1979+ return Task . FromResult ( false ) ;
1980+ }
1981+ }
1982+ }
18841983 }
18851984}
0 commit comments