@@ -1149,23 +1149,157 @@ struct CollectEverythingLogHandler: LogHandler {
11491149 }
11501150}
11511151
1152+ /// A ``HTTPClientResponseDelegate`` that buffers the incoming response parts for the consumer. The consumer can
1153+ /// consume the bytes by calling ``next()`` on the delegate.
1154+ ///
1155+ /// The sole purpose of this class is to enable straight-line stream tests.
1156+ class ResponseStreamDelegate : HTTPClientResponseDelegate {
1157+ typealias Response = Void
1158+
1159+ enum State {
1160+ /// The delegate is in the idle state. There are no http response parts to be buffered
1161+ /// and the consumer did not signal a demand. Transitions to all other states are allowed.
1162+ case idle
1163+ /// The consumer has signaled a demand for more bytes, but none where available. Can
1164+ /// transition to `.idle` (when new bytes arrive), `.finished` (when the stream finishes or fails)
1165+ case waitingForBytes( EventLoopPromise < ByteBuffer ? > )
1166+ /// The consumer has signaled no further demand but bytes keep arriving. Valid transitions
1167+ /// to `.idle` (when bytes are consumed), `.finished` (when bytes are consumed, and the
1168+ /// stream has ended), `.failed` (if an error is forwarded)
1169+ case buffering( ByteBuffer , done: Bool )
1170+ /// Stores an error for consumption. Valid transitions are: `.finished`, when the error was consumed.
1171+ case failed( Error )
1172+ /// The stream has finished and all bytes or errors where consumed.
1173+ case finished
1174+ }
1175+
1176+ let eventLoop : EventLoop
1177+ private var state : State = . idle
1178+
1179+ init ( eventLoop: EventLoop ) {
1180+ self . eventLoop = eventLoop
1181+ }
1182+
1183+ func next( ) -> EventLoopFuture < ByteBuffer ? > {
1184+ if self . eventLoop. inEventLoop {
1185+ return self . next0 ( )
1186+ } else {
1187+ return self . eventLoop. flatSubmit {
1188+ self . next0 ( )
1189+ }
1190+ }
1191+ }
1192+
1193+ private func next0( ) -> EventLoopFuture < ByteBuffer ? > {
1194+ switch self . state {
1195+ case . idle:
1196+ let promise = self . eventLoop. makePromise ( of: ByteBuffer ? . self)
1197+ self . state = . waitingForBytes( promise)
1198+ return promise. futureResult
1199+
1200+ case . buffering( let byteBuffer, done: false ) :
1201+ self . state = . idle
1202+ return self . eventLoop. makeSucceededFuture ( byteBuffer)
1203+
1204+ case . buffering( let byteBuffer, done: true ) :
1205+ self . state = . finished
1206+ return self . eventLoop. makeSucceededFuture ( byteBuffer)
1207+
1208+ case . waitingForBytes:
1209+ preconditionFailure ( " Don't call `.next` twice " )
1210+
1211+ case . failed( let error) :
1212+ self . state = . finished
1213+ return self . eventLoop. makeFailedFuture ( error)
1214+
1215+ case . finished:
1216+ return self . eventLoop. makeSucceededFuture ( nil )
1217+ }
1218+ }
1219+
1220+ // MARK: HTTPClientResponseDelegate
1221+
1222+ func didSendRequestHead( task: HTTPClient . Task < Response > , _ head: HTTPRequestHead ) {
1223+ self . eventLoop. preconditionInEventLoop ( )
1224+ }
1225+
1226+ func didSendRequestPart( task: HTTPClient . Task < Response > , _ part: IOData ) {
1227+ self . eventLoop. preconditionInEventLoop ( )
1228+ }
1229+
1230+ func didSendRequest( task: HTTPClient . Task < Response > ) {
1231+ self . eventLoop. preconditionInEventLoop ( )
1232+ }
1233+
1234+ func didReceiveHead( task: HTTPClient . Task < Response > , _ head: HTTPResponseHead ) -> EventLoopFuture < Void > {
1235+ self . eventLoop. preconditionInEventLoop ( )
1236+ return task. eventLoop. makeSucceededVoidFuture ( )
1237+ }
1238+
1239+ func didReceiveBodyPart( task: HTTPClient . Task < Response > , _ buffer: ByteBuffer ) -> EventLoopFuture < Void > {
1240+ self . eventLoop. preconditionInEventLoop ( )
1241+
1242+ switch self . state {
1243+ case . idle:
1244+ self . state = . buffering( buffer, done: false )
1245+ case . waitingForBytes( let promise) :
1246+ self . state = . idle
1247+ promise. succeed ( buffer)
1248+ case . buffering( var byteBuffer, done: false ) :
1249+ var buffer = buffer
1250+ byteBuffer. writeBuffer ( & buffer)
1251+ self . state = . buffering( byteBuffer, done: false )
1252+ case . buffering( _, done: true ) , . finished, . failed:
1253+ preconditionFailure ( " Invalid state: \( self . state) " )
1254+ }
1255+
1256+ return task. eventLoop. makeSucceededVoidFuture ( )
1257+ }
1258+
1259+ func didReceiveError( task: HTTPClient . Task < Response > , _ error: Error ) {
1260+ self . eventLoop. preconditionInEventLoop ( )
1261+
1262+ switch self . state {
1263+ case . idle:
1264+ self . state = . failed( error)
1265+ case . waitingForBytes( let promise) :
1266+ self . state = . finished
1267+ promise. fail ( error)
1268+ case . buffering( _, done: false ) :
1269+ self . state = . failed( error)
1270+ case . buffering( _, done: true ) , . finished, . failed:
1271+ preconditionFailure ( " Invalid state: \( self . state) " )
1272+ }
1273+ }
1274+
1275+ func didFinishRequest( task: HTTPClient . Task < Response > ) throws {
1276+ self . eventLoop. preconditionInEventLoop ( )
1277+
1278+ switch self . state {
1279+ case . idle:
1280+ self . state = . finished
1281+ case . waitingForBytes( let promise) :
1282+ self . state = . finished
1283+ promise. succeed ( nil )
1284+ case . buffering( let byteBuffer, done: false ) :
1285+ self . state = . buffering( byteBuffer, done: true )
1286+ case . buffering( _, done: true ) , . finished, . failed:
1287+ preconditionFailure ( " Invalid state: \( self . state) " )
1288+ }
1289+ }
1290+ }
1291+
11521292class HTTPEchoHandler : ChannelInboundHandler {
11531293 typealias InboundIn = HTTPServerRequestPart
11541294 typealias OutboundOut = HTTPServerResponsePart
11551295
1156- var promises : CircularBuffer < EventLoopPromise < Void > > = CircularBuffer ( )
1157-
11581296 func channelRead( context: ChannelHandlerContext , data: NIOAny ) {
11591297 let request = self . unwrapInboundIn ( data)
11601298 switch request {
11611299 case . head:
1162- context. writeAndFlush ( self . wrapOutboundOut ( . head( . init( version: . init ( major : 1 , minor : 1 ) , status: . ok) ) ) , promise: nil )
1300+ context. writeAndFlush ( self . wrapOutboundOut ( . head( . init( version: . http1_1 , status: . ok) ) ) , promise: nil )
11631301 case . body( let bytes) :
1164- context. writeAndFlush ( self . wrapOutboundOut ( . body( . byteBuffer( bytes) ) ) ) . whenSuccess {
1165- if let promise = self . promises. popFirst ( ) {
1166- promise. succeed ( ( ) )
1167- }
1168- }
1302+ context. writeAndFlush ( self . wrapOutboundOut ( . body( . byteBuffer( bytes) ) ) , promise: nil )
11691303 case . end:
11701304 context. writeAndFlush ( self . wrapOutboundOut ( . end( nil ) ) , promise: nil )
11711305 context. close ( promise: nil )
0 commit comments