@@ -24,16 +24,25 @@ public struct _FileContentStream: AsyncSequence & Sendable {
2424 public typealias Element = ByteBuffer
2525 typealias Underlying = AsyncThrowingChannel < Element , Error >
2626
27- public func makeAsyncIterator( ) -> AsyncIterator {
28- return AsyncIterator ( underlying: self . asyncChannel. makeAsyncIterator ( ) )
29- }
30-
31- public struct AsyncIterator : AsyncIteratorProtocol {
27+ public final class AsyncIterator : AsyncIteratorProtocol {
3228 public typealias Element = ByteBuffer
3329
30+ deinit {
31+ // This is painful and so wrong but unfortunately, our iterators don't have a cancel signal, so the only
32+ // thing we can do is hope for `deinit` to be invoked :(.
33+ // AsyncIteratorProtocol also doesn't support `~Copyable` so we also have to make this a class.
34+ self . channel? . close ( promise: nil )
35+ }
36+
37+ init ( underlying: Underlying . AsyncIterator , channel: ( any Channel ) ? ) {
38+ self . underlying = underlying
39+ self . channel = channel
40+ }
41+
3442 var underlying : Underlying . AsyncIterator
43+ let channel : ( any Channel ) ?
3544
36- public mutating func next( ) async throws -> ByteBuffer ? {
45+ public func next( ) async throws -> ByteBuffer ? {
3746 return try await self . underlying. next ( )
3847 }
3948 }
@@ -47,22 +56,41 @@ public struct _FileContentStream: AsyncSequence & Sendable {
4756 }
4857
4958 private let asyncChannel : AsyncThrowingChannel < ByteBuffer , Error >
59+ private let channel : ( any Channel ) ?
60+
61+ internal func isSameAs( _ other: FileContentStream ) -> Bool {
62+ return ( self . asyncChannel === other. asyncChannel) && ( self . channel === other. channel)
63+ }
64+
65+ public func makeAsyncIterator( ) -> AsyncIterator {
66+ return AsyncIterator (
67+ underlying: self . asyncChannel. makeAsyncIterator ( ) ,
68+ channel: self . channel
69+ )
70+ }
71+
72+ public func close( ) async throws {
73+ self . asyncChannel. finish ( )
74+ do {
75+ try await self . channel? . close ( ) . get ( )
76+ } catch ChannelError . alreadyClosed {
77+ // That's okay
78+ }
79+ }
5080
5181 public static func makeReader(
5282 fileDescriptor: CInt ,
5383 eventLoop: EventLoop = MultiThreadedEventLoopGroup . singleton. any ( ) ,
5484 blockingPool: NIOThreadPool = . singleton
5585 ) async throws -> _FileContentStream {
56- return try await eventLoop. submit {
57- try FileContentStream ( fileDescriptor: fileDescriptor, eventLoop: eventLoop, blockingPool: blockingPool)
58- } . get ( )
86+ try await FileContentStream ( fileDescriptor: fileDescriptor, eventLoop: eventLoop, blockingPool: blockingPool)
5987 }
6088
6189 internal init (
6290 fileDescriptor: CInt ,
6391 eventLoop: EventLoop ,
6492 blockingPool: NIOThreadPool ? = nil
65- ) throws {
93+ ) async throws {
6694 var statInfo : stat = . init( )
6795 let statError = fstat ( fileDescriptor, & statInfo)
6896 if statError != 0 {
@@ -103,23 +131,36 @@ public struct _FileContentStream: AsyncSequence & Sendable {
103131 asyncChannel. finish ( )
104132 }
105133 }
134+ self . channel = nil
106135 case S_IFSOCK:
107- _ = ClientBootstrap ( group: eventLoop)
136+ self . channel = try await ClientBootstrap ( group: eventLoop)
108137 . channelInitializer { channel in
109- channel. pipeline. addHandler ( ReadIntoAsyncChannelHandler ( sink: asyncChannel) )
138+ do {
139+ try channel. pipeline. syncOperations. addHandler ( ReadIntoAsyncChannelHandler ( sink: asyncChannel) )
140+ return channel. eventLoop. makeSucceededFuture ( ( ) )
141+ } catch {
142+ return channel. eventLoop. makeFailedFuture ( error)
143+ }
110144 }
111145 . withConnectedSocket ( dupedFD)
146+ . get ( )
112147 case S_IFIFO:
113- NIOPipeBootstrap ( group: eventLoop)
148+ self . channel = try await NIOPipeBootstrap ( group: eventLoop)
114149 . channelInitializer { channel in
115- channel. pipeline. addHandler ( ReadIntoAsyncChannelHandler ( sink: asyncChannel) )
150+ do {
151+ try channel. pipeline. syncOperations. addHandler ( ReadIntoAsyncChannelHandler ( sink: asyncChannel) )
152+ return channel. eventLoop. makeSucceededFuture ( ( ) )
153+ } catch {
154+ return channel. eventLoop. makeFailedFuture ( error)
155+ }
116156 }
117157 . takingOwnershipOfDescriptor (
118158 input: dupedFD
119159 )
120- . whenSuccess { channel in
160+ . map { channel in
121161 channel. close ( mode: . output, promise: nil )
122- }
162+ return channel
163+ } . get ( )
123164 case S_IFDIR:
124165 throw IOError ( errnoValue: EISDIR)
125166 case S_IFBLK, S_IFCHR, S_IFLNK:
@@ -265,8 +306,8 @@ private final class ReadIntoAsyncChannelHandler: ChannelDuplexHandler {
265306}
266307
267308extension FileHandle {
268- func fileContentStream( eventLoop: EventLoop ) throws -> FileContentStream {
269- let asyncBytes = try FileContentStream ( fileDescriptor: self . fileDescriptor, eventLoop: eventLoop)
309+ func fileContentStream( eventLoop: EventLoop ) async throws -> FileContentStream {
310+ let asyncBytes = try await FileContentStream ( fileDescriptor: self . fileDescriptor, eventLoop: eventLoop)
270311 try self . close ( )
271312 return asyncBytes
272313 }
0 commit comments