Skip to content

Commit b2bf468

Browse files
committed
CSHARP-5777: Avoid ThreadPool-dependent IO methods in sync API
Consolidate Stream extension methods.
1 parent 4236cf6 commit b2bf468

File tree

6 files changed

+375
-250
lines changed

6 files changed

+375
-250
lines changed

src/MongoDB.Driver/Core/Compression/SnappyCompressor.cs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,7 @@ public void Compress(Stream input, Stream output)
3434
{
3535
var uncompressedSize = (int)(input.Length - input.Position);
3636
var uncompressedBytes = new byte[uncompressedSize]; // does not include uncompressed message headers
37-
input.ReadBytes(OperationContext.NoTimeout, uncompressedBytes, offset: 0, count: uncompressedSize, socketTimeout: Timeout.InfiniteTimeSpan);
37+
input.ReadBytes(uncompressedBytes, offset: 0, count: uncompressedSize);
3838
var maxCompressedSize = Snappy.GetMaxCompressedLength(uncompressedSize);
3939
var compressedBytes = new byte[maxCompressedSize];
4040
var compressedSize = Snappy.Compress(uncompressedBytes, compressedBytes);
@@ -50,7 +50,7 @@ public void Decompress(Stream input, Stream output)
5050
{
5151
var compressedSize = (int)(input.Length - input.Position);
5252
var compressedBytes = new byte[compressedSize];
53-
input.ReadBytes(OperationContext.NoTimeout, compressedBytes, offset: 0, count: compressedSize, socketTimeout: Timeout.InfiniteTimeSpan);
53+
input.ReadBytes(compressedBytes, offset: 0, count: compressedSize);
5454
var uncompressedSize = Snappy.GetUncompressedLength(compressedBytes);
5555
var decompressedBytes = new byte[uncompressedSize];
5656
var decompressedSize = Snappy.Decompress(compressedBytes, decompressedBytes);

src/MongoDB.Driver/Core/Connections/BinaryConnection.cs

Lines changed: 8 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -345,14 +345,14 @@ private IByteBuffer ReceiveBuffer(OperationContext operationContext)
345345
try
346346
{
347347
var messageSizeBytes = new byte[4];
348-
_stream.ReadBytes(operationContext, messageSizeBytes, 0, 4, _socketReadTimeout);
348+
_stream.ReadBytes(messageSizeBytes, 0, 4, (int)operationContext.RemainingTimeoutOrDefault(_socketReadTimeout).TotalMilliseconds, operationContext.CancellationToken);
349349
var messageSize = BinaryPrimitives.ReadInt32LittleEndian(messageSizeBytes);
350350
EnsureMessageSizeIsValid(messageSize);
351351
var inputBufferChunkSource = new InputBufferChunkSource(BsonChunkPool.Default);
352352
var buffer = ByteBufferFactory.Create(inputBufferChunkSource, messageSize);
353353
buffer.Length = messageSize;
354354
buffer.SetBytes(0, messageSizeBytes, 0, 4);
355-
_stream.ReadBytes(operationContext, buffer, 4, messageSize - 4, _socketReadTimeout);
355+
_stream.ReadBytes(buffer, 4, messageSize - 4, (int)operationContext.RemainingTimeoutOrDefault(_socketReadTimeout).TotalMilliseconds, operationContext.CancellationToken);
356356
_lastUsedAtUtc = DateTime.UtcNow;
357357
buffer.MakeReadOnly();
358358
return buffer;
@@ -370,14 +370,14 @@ private async Task<IByteBuffer> ReceiveBufferAsync(OperationContext operationCon
370370
try
371371
{
372372
var messageSizeBytes = new byte[4];
373-
await _stream.ReadBytesAsync(operationContext, messageSizeBytes, 0, 4, _socketReadTimeout).ConfigureAwait(false);
373+
await _stream.ReadBytesAsync(messageSizeBytes, 0, 4, (int)operationContext.RemainingTimeoutOrDefault(_socketReadTimeout).TotalMilliseconds, operationContext.CancellationToken).ConfigureAwait(false);
374374
var messageSize = BinaryPrimitives.ReadInt32LittleEndian(messageSizeBytes);
375375
EnsureMessageSizeIsValid(messageSize);
376376
var inputBufferChunkSource = new InputBufferChunkSource(BsonChunkPool.Default);
377377
var buffer = ByteBufferFactory.Create(inputBufferChunkSource, messageSize);
378378
buffer.Length = messageSize;
379379
buffer.SetBytes(0, messageSizeBytes, 0, 4);
380-
await _stream.ReadBytesAsync(operationContext, buffer, 4, messageSize - 4, _socketReadTimeout).ConfigureAwait(false);
380+
await _stream.ReadBytesAsync(buffer, 4, messageSize - 4, (int)operationContext.RemainingTimeoutOrDefault(_socketReadTimeout).TotalMilliseconds, operationContext.CancellationToken).ConfigureAwait(false);
381381
_lastUsedAtUtc = DateTime.UtcNow;
382382
buffer.MakeReadOnly();
383383
return buffer;
@@ -475,7 +475,8 @@ private void SendBuffer(OperationContext operationContext, IByteBuffer buffer)
475475

476476
try
477477
{
478-
_stream.WriteBytes(operationContext, buffer, 0, buffer.Length, _socketWriteTimeout);
478+
var timeout = operationContext.RemainingTimeoutOrDefault(_socketWriteTimeout);
479+
_stream.WriteBytes(buffer, 0, buffer.Length, (int)timeout.TotalMilliseconds, operationContext.CancellationToken);
479480
_lastUsedAtUtc = DateTime.UtcNow;
480481
}
481482
catch (Exception ex)
@@ -495,7 +496,8 @@ private async Task SendBufferAsync(OperationContext operationContext, IByteBuffe
495496

496497
try
497498
{
498-
await _stream.WriteBytesAsync(operationContext, buffer, 0, buffer.Length, _socketWriteTimeout).ConfigureAwait(false);
499+
var timeout = operationContext.RemainingTimeoutOrDefault(_socketWriteTimeout);
500+
await _stream.WriteBytesAsync(buffer, 0, buffer.Length, (int)timeout.TotalMilliseconds, operationContext.CancellationToken).ConfigureAwait(false);
499501
_lastUsedAtUtc = DateTime.UtcNow;
500502
}
501503
catch (Exception ex)

src/MongoDB.Driver/Core/Connections/Socks5Helper.cs

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -104,7 +104,7 @@ public static void PerformSocks5Handshake(Stream stream, EndPoint endPoint, Sock
104104
var greetingRequestLength = CreateGreetingRequest(buffer, useAuth);
105105
stream.Write(buffer, 0, greetingRequestLength);
106106

107-
stream.ReadBytes(buffer, 0, 2, cancellationToken);
107+
stream.ReadBytes(buffer, 0, 2, cancellationToken: cancellationToken);
108108
var requiresAuthenticationStep = ProcessGreetingResponse(buffer, useAuth);
109109

110110
// If we have username and password, but the proxy doesn't need them, we skip the authentication step.
@@ -113,16 +113,16 @@ public static void PerformSocks5Handshake(Stream stream, EndPoint endPoint, Sock
113113
var authenticationRequestLength = CreateAuthenticationRequest(buffer, authenticationSettings);
114114
stream.Write(buffer, 0, authenticationRequestLength);
115115

116-
stream.ReadBytes(buffer, 0, 2, cancellationToken);
116+
stream.ReadBytes(buffer, 0, 2, cancellationToken: cancellationToken);
117117
ProcessAuthenticationResponse(buffer);
118118
}
119119

120120
var connectRequestLength = CreateConnectRequest(buffer, targetHost, targetPort);
121121
stream.Write(buffer, 0, connectRequestLength);
122122

123-
stream.ReadBytes(buffer, 0, 5, cancellationToken);
123+
stream.ReadBytes(buffer, 0, 5, cancellationToken: cancellationToken);
124124
var skip = ProcessConnectResponse(buffer);
125-
stream.ReadBytes(buffer, 0, skip, cancellationToken);
125+
stream.ReadBytes(buffer, 0, skip, cancellationToken: cancellationToken);
126126
}
127127
finally
128128
{
@@ -141,7 +141,7 @@ public static async Task PerformSocks5HandshakeAsync(Stream stream, EndPoint end
141141
var greetingRequestLength = CreateGreetingRequest(buffer, useAuth);
142142
await stream.WriteAsync(buffer, 0, greetingRequestLength, cancellationToken).ConfigureAwait(false);
143143

144-
await stream.ReadBytesAsync(buffer, 0, 2, cancellationToken).ConfigureAwait(false);
144+
await stream.ReadBytesAsync(buffer, 0, 2, cancellationToken: cancellationToken).ConfigureAwait(false);
145145
var requiresAuthenticationStep = ProcessGreetingResponse(buffer, useAuth);
146146

147147
// If we have username and password, but the proxy doesn't need them, we skip the authentication step.
@@ -150,16 +150,16 @@ public static async Task PerformSocks5HandshakeAsync(Stream stream, EndPoint end
150150
var authenticationRequestLength = CreateAuthenticationRequest(buffer, authenticationSettings);
151151
await stream.WriteAsync(buffer, 0, authenticationRequestLength, cancellationToken).ConfigureAwait(false);
152152

153-
await stream.ReadBytesAsync(buffer, 0, 2, cancellationToken).ConfigureAwait(false);
153+
await stream.ReadBytesAsync(buffer, 0, 2, cancellationToken: cancellationToken).ConfigureAwait(false);
154154
ProcessAuthenticationResponse(buffer);
155155
}
156156

157157
var connectRequestLength = CreateConnectRequest(buffer, targetHost, targetPort);
158158
await stream.WriteAsync(buffer, 0, connectRequestLength, cancellationToken).ConfigureAwait(false);
159159

160-
await stream.ReadBytesAsync(buffer, 0, 5, cancellationToken).ConfigureAwait(false);
160+
await stream.ReadBytesAsync(buffer, 0, 5, cancellationToken: cancellationToken).ConfigureAwait(false);
161161
var skip = ProcessConnectResponse(buffer);
162-
await stream.ReadBytesAsync(buffer, 0, skip, cancellationToken).ConfigureAwait(true);
162+
await stream.ReadBytesAsync(buffer, 0, skip, cancellationToken: cancellationToken).ConfigureAwait(true);
163163
}
164164
finally
165165
{
@@ -340,4 +340,4 @@ private static void EnsureSocksSuccess(byte code, string operation)
340340

341341
throw new IOException($"SOCKS5 {operation} failed. {message}");
342342
}
343-
}
343+
}

0 commit comments

Comments
 (0)