Skip to content

Commit 9daaae6

Browse files
novincNovin Changizi
andauthored
Added peekable streams to avoid blocking read calls on pipes with no data (#390)
* added peekable streams * set extern method to private * don't upgrade newtonsoft Co-authored-by: Novin Changizi <nochangi@microsoft.com>
1 parent 23afcce commit 9daaae6

File tree

4 files changed

+90
-3
lines changed

4 files changed

+90
-3
lines changed

src/Docker.DotNet/DockerPipeStream.cs

Lines changed: 25 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@
88

99
namespace Docker.DotNet
1010
{
11-
internal class DockerPipeStream : WriteClosableStream
11+
internal class DockerPipeStream : WriteClosableStream, IPeekableStream
1212
{
1313
private readonly PipeStream _stream;
1414
private readonly EventWaitHandle _event = new EventWaitHandle(false, EventResetMode.AutoReset);
@@ -44,6 +44,9 @@ public override long Position
4444
[DllImport("api-ms-win-core-io-l1-1-0.dll", SetLastError = true)]
4545
private static extern int GetOverlappedResult(SafeHandle handle, ref NativeOverlapped overlapped, out int numBytesWritten, int wait);
4646

47+
[DllImport("kernel32.dll", SetLastError = true)]
48+
private static extern bool PeekNamedPipe(SafeHandle handle, byte[] buffer, uint nBufferSize, ref uint bytesRead, ref uint bytesAvail, ref uint BytesLeftThisMessage);
49+
4750
public override void CloseWrite()
4851
{
4952
// The Docker daemon expects a write of zero bytes to signal the end of writes. Use native
@@ -92,6 +95,27 @@ public override Task<int> ReadAsync(byte[] buffer, int offset, int count, Cancel
9295
return _stream.ReadAsync(buffer, offset, count, cancellationToken);
9396
}
9497

98+
public bool Peek(byte[] buffer, uint toPeek, out uint peeked, out uint available, out uint remaining)
99+
{
100+
peeked = 0;
101+
available = 0;
102+
remaining = 0;
103+
104+
bool aPeekedSuccess = PeekNamedPipe(
105+
_stream.SafePipeHandle,
106+
buffer, toPeek,
107+
ref peeked, ref available, ref remaining);
108+
109+
var error = Marshal.GetLastWin32Error();
110+
111+
if (error == 0 && aPeekedSuccess)
112+
{
113+
return true;
114+
}
115+
116+
return false;
117+
}
118+
95119
public override long Seek(long offset, SeekOrigin origin)
96120
{
97121
throw new NotImplementedException();
Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,18 @@
1+
namespace Docker.DotNet
2+
{
3+
public interface IPeekableStream
4+
{
5+
/// <summary>
6+
/// Peek the underlying stream, can be used in order to avoid a blocking read call when no data is available
7+
/// https://stackoverflow.com/questions/6846365/check-for-eof-in-namedpipeclientstream
8+
/// https://msdn.microsoft.com/en-us/library/windows/desktop/aa365779(v=vs.85).aspx
9+
/// </summary>
10+
/// <param name="buffer">buffer to put peeked data in</param>
11+
/// <param name="toPeek">max number of bytes to peek</param>
12+
/// <param name="peeked">number of bytes that were peeked</param>
13+
/// <param name="available">number of bytes that were available for peeking</param>
14+
/// <param name="remaining">number of available bytes minus number of peeked</param>
15+
/// <returns>whether peek operation succeeded</returns>
16+
bool Peek(byte[] buffer, uint toPeek, out uint peeked, out uint available, out uint remaining);
17+
}
18+
}

src/Docker.DotNet/Microsoft.Net.Http.Client/BufferedReadStream.cs

Lines changed: 36 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,14 +4,15 @@
44
using System.Text;
55
using System.Threading;
66
using System.Threading.Tasks;
7+
using Docker.DotNet;
78

89
#if !NET45
910
using System.Buffers;
1011
#endif
1112

1213
namespace Microsoft.Net.Http.Client
1314
{
14-
internal class BufferedReadStream : WriteClosableStream
15+
internal class BufferedReadStream : WriteClosableStream, IPeekableStream
1516
{
1617
private const char CR = '\r';
1718
private const char LF = '\n';
@@ -145,6 +146,22 @@ public override Task<int> ReadAsync(byte[] buffer, int offset, int count, Cancel
145146
return _inner.ReadAsync(buffer, offset, count, cancellationToken);
146147
}
147148

149+
public bool Peek(byte[] buffer, uint toPeek, out uint peeked, out uint available, out uint remaining)
150+
{
151+
int read = PeekBuffer(buffer, toPeek, out peeked, out available, out remaining);
152+
if (read > 0)
153+
{
154+
return true;
155+
}
156+
157+
if (_inner is IPeekableStream peekableStream)
158+
{
159+
return peekableStream.Peek(buffer, toPeek, out peeked, out available, out remaining);
160+
}
161+
162+
throw new NotSupportedException("_inner stream isn't a peekable stream");
163+
}
164+
148165
private int ReadBuffer(byte[] buffer, int offset, int count)
149166
{
150167
if (_bufferCount > 0)
@@ -159,6 +176,24 @@ private int ReadBuffer(byte[] buffer, int offset, int count)
159176
return 0;
160177
}
161178

179+
private int PeekBuffer(byte[] buffer, uint toPeek, out uint peeked, out uint available, out uint remaining)
180+
{
181+
if (_bufferCount > 0)
182+
{
183+
int toCopy = Math.Min(_bufferCount, (int)toPeek);
184+
Buffer.BlockCopy(_buffer, _bufferOffset, buffer, 0, toCopy);
185+
peeked = (uint) toCopy;
186+
available = (uint)_bufferCount;
187+
remaining = available - peeked;
188+
return toCopy;
189+
}
190+
191+
peeked = 0;
192+
available = 0;
193+
remaining = 0;
194+
return 0;
195+
}
196+
162197
private async Task EnsureBufferedAsync(CancellationToken cancel)
163198
{
164199
if (_bufferCount == 0)

src/Docker.DotNet/MultiplexedStream.cs

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@
1111

1212
namespace Docker.DotNet
1313
{
14-
public class MultiplexedStream : IDisposable
14+
public class MultiplexedStream : IDisposable, IPeekableStream
1515
{
1616
private readonly Stream _stream;
1717
private TargetStream _target;
@@ -54,6 +54,16 @@ public Task WriteAsync(byte[] buffer, int offset, int count, CancellationToken c
5454
return _stream.WriteAsync(buffer, offset, count, cancellationToken);
5555
}
5656

57+
public bool Peek(byte[] buffer, uint toPeek, out uint peeked, out uint available, out uint remaining)
58+
{
59+
if (_stream is IPeekableStream peekableStream)
60+
{
61+
return peekableStream.Peek(buffer, toPeek, out peeked, out available, out remaining);
62+
}
63+
64+
throw new NotSupportedException("_stream isn't a peekable stream");
65+
}
66+
5767
public async Task<ReadResult> ReadOutputAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken)
5868
{
5969
if (!_multiplexed)

0 commit comments

Comments
 (0)