@@ -78,12 +78,14 @@ class SocketFrameHandler : IFrameHandler
7878 private readonly byte [ ] _frameHeaderBuffer ;
7979 private bool _closed ;
8080 private ArrayPool < byte > _pool = ArrayPool < byte > . Shared ;
81+ private readonly bool _enableSynchronousWriteLoop ;
8182
8283 public SocketFrameHandler ( AmqpTcpEndpoint endpoint ,
8384 Func < AddressFamily , ITcpClient > socketFactory ,
84- TimeSpan connectionTimeout , TimeSpan readTimeout , TimeSpan writeTimeout )
85+ TimeSpan connectionTimeout , TimeSpan readTimeout , TimeSpan writeTimeout , bool enableSynchronousWriteLoop )
8586 {
8687 _endpoint = endpoint ;
88+ _enableSynchronousWriteLoop = enableSynchronousWriteLoop ;
8789 _frameHeaderBuffer = new byte [ 6 ] ;
8890 var channel = Channel . CreateUnbounded < ReadOnlyMemory < byte > > (
8991 new UnboundedChannelOptions
@@ -134,7 +136,15 @@ public SocketFrameHandler(AmqpTcpEndpoint endpoint,
134136 _writer = new BufferedStream ( netstream , _socket . Client . SendBufferSize ) ;
135137
136138 WriteTimeout = writeTimeout ;
137- _writerTask = Task . Run ( WriteLoop , CancellationToken . None ) ;
139+ if ( _enableSynchronousWriteLoop )
140+ {
141+ TaskCreationOptions tco = TaskCreationOptions . LongRunning | TaskCreationOptions . DenyChildAttach ;
142+ _writerTask = Task . Factory . StartNew ( SynchronousWriteLoop , CancellationToken . None , tco , TaskScheduler . Default ) ;
143+ }
144+ else
145+ {
146+ _writerTask = Task . Run ( WriteLoop , CancellationToken . None ) ;
147+ }
138148 }
139149
140150 public AmqpTcpEndpoint Endpoint
@@ -270,17 +280,41 @@ private async Task WriteLoop()
270280 while ( await _channelReader . WaitToReadAsync ( ) . ConfigureAwait ( false ) )
271281 {
272282 _socket . Client . Poll ( _writeableStateTimeoutMicroSeconds , SelectMode . SelectWrite ) ;
273- while ( _channelReader . TryRead ( out var memory ) )
283+ while ( _channelReader . TryRead ( out ReadOnlyMemory < byte > memory ) )
274284 {
275- MemoryMarshal . TryGetArray ( memory , out ArraySegment < byte > segment ) ;
276- await _writer . WriteAsync ( segment . Array , segment . Offset , segment . Count ) . ConfigureAwait ( false ) ;
277- MemoryPool . Return ( segment . Array ) ;
285+ if ( MemoryMarshal . TryGetArray ( memory , out ArraySegment < byte > segment ) )
286+ {
287+ if ( segment . Array != null )
288+ {
289+ await _writer . WriteAsync ( segment . Array , segment . Offset , segment . Count ) . ConfigureAwait ( false ) ;
290+ MemoryPool . Return ( segment . Array ) ;
291+ }
292+ }
278293 }
279-
280294 await _writer . FlushAsync ( ) . ConfigureAwait ( false ) ;
281295 }
282296 }
283297
298+ private void SynchronousWriteLoop ( )
299+ {
300+ while ( _channelReader . WaitToReadAsync ( ) . AsTask ( ) . Result )
301+ {
302+ _socket . Client . Poll ( _writeableStateTimeoutMicroSeconds , SelectMode . SelectWrite ) ;
303+ while ( _channelReader . TryRead ( out ReadOnlyMemory < byte > memory ) )
304+ {
305+ if ( MemoryMarshal . TryGetArray ( memory , out ArraySegment < byte > segment ) )
306+ {
307+ if ( segment . Array != null )
308+ {
309+ _writer . Write ( segment . Array , segment . Offset , segment . Count ) ;
310+ MemoryPool . Return ( segment . Array ) ;
311+ }
312+ }
313+ }
314+ _writer . Flush ( ) ;
315+ }
316+ }
317+
284318 private static bool ShouldTryIPv6 ( AmqpTcpEndpoint endpoint )
285319 {
286320 return Socket . OSSupportsIPv6 && endpoint . AddressFamily != AddressFamily . InterNetwork ;
0 commit comments