@@ -641,7 +641,7 @@ internal class TaskHandler<Delegate: HTTPClientResponseDelegate>: RemovableChann
641641 case head
642642 case redirected( HTTPResponseHead , URL )
643643 case body
644- case end
644+ case endOrError
645645 }
646646
647647 let task : HTTPClient . Task < Delegate . Response >
@@ -651,6 +651,8 @@ internal class TaskHandler<Delegate: HTTPClientResponseDelegate>: RemovableChann
651651 let logger : Logger // We are okay to store the logger here because a TaskHandler is just for one request.
652652
653653 var state : State = . idle
654+ var expectedBodyLength : Int ?
655+ var actualBodyLength : Int = 0
654656 var pendingRead = false
655657 var mayRead = true
656658 var closing = false {
@@ -785,7 +787,7 @@ extension TaskHandler: ChannelDuplexHandler {
785787 } catch {
786788 promise? . fail ( error)
787789 self . failTaskAndNotifyDelegate ( error: error, self . delegate. didReceiveError)
788- self . state = . end
790+ self . state = . endOrError
789791 return
790792 }
791793
@@ -799,12 +801,23 @@ extension TaskHandler: ChannelDuplexHandler {
799801 assert ( head. version == HTTPVersion ( major: 1 , minor: 1 ) ,
800802 " Sending a request in HTTP version \( head. version) which is unsupported by the above `if` " )
801803
804+ let contentLengths = head. headers [ canonicalForm: " content-length " ]
805+ assert ( contentLengths. count <= 1 )
806+
807+ self . expectedBodyLength = contentLengths. first. flatMap { Int ( $0) }
808+
802809 context. write ( wrapOutboundOut ( . head( head) ) ) . map {
803810 self . callOutToDelegateFireAndForget ( value: head, self . delegate. didSendRequestHead)
804811 } . flatMap {
805812 self . writeBody ( request: request, context: context)
806813 } . flatMap {
807814 context. eventLoop. assertInEventLoop ( )
815+ if let expectedBodyLength = self . expectedBodyLength, expectedBodyLength != self . actualBodyLength {
816+ self . state = . endOrError
817+ let error = HTTPClientError . bodyLengthMismatch
818+ self . failTaskAndNotifyDelegate ( error: error, self . delegate. didReceiveError)
819+ return context. eventLoop. makeFailedFuture ( error)
820+ }
808821 return context. writeAndFlush ( self . wrapOutboundOut ( . end( nil ) ) )
809822 } . map {
810823 context. eventLoop. assertInEventLoop ( )
@@ -813,10 +826,10 @@ extension TaskHandler: ChannelDuplexHandler {
813826 } . flatMapErrorThrowing { error in
814827 context. eventLoop. assertInEventLoop ( )
815828 switch self . state {
816- case . end :
829+ case . endOrError :
817830 break
818831 default :
819- self . state = . end
832+ self . state = . endOrError
820833 self . failTaskAndNotifyDelegate ( error: error, self . delegate. didReceiveError)
821834 }
822835 throw error
@@ -833,9 +846,11 @@ extension TaskHandler: ChannelDuplexHandler {
833846 let promise = self . task. eventLoop. makePromise ( of: Void . self)
834847 // All writes have to be switched to the channel EL if channel and task ELs differ
835848 if context. eventLoop. inEventLoop {
849+ self . actualBodyLength += part. readableBytes
836850 context. writeAndFlush ( self . wrapOutboundOut ( . body( part) ) , promise: promise)
837851 } else {
838852 context. eventLoop. execute {
853+ self . actualBodyLength += part. readableBytes
839854 context. writeAndFlush ( self . wrapOutboundOut ( . body( part) ) , promise: promise)
840855 }
841856 }
@@ -898,12 +913,12 @@ extension TaskHandler: ChannelDuplexHandler {
898913 case . end:
899914 switch self . state {
900915 case . redirected( let head, let redirectURL) :
901- self . state = . end
916+ self . state = . endOrError
902917 self . task. releaseAssociatedConnection ( delegateType: Delegate . self, closing: self . closing) . whenSuccess {
903918 self . redirectHandler? . redirect ( status: head. status, to: redirectURL, promise: self . task. promise)
904919 }
905920 default :
906- self . state = . end
921+ self . state = . endOrError
907922 self . callOutToDelegate ( promise: self . task. promise, self . delegate. didFinishRequest)
908923 }
909924 }
@@ -918,14 +933,14 @@ extension TaskHandler: ChannelDuplexHandler {
918933 context. read ( )
919934 }
920935 case . failure( let error) :
921- self . state = . end
936+ self . state = . endOrError
922937 self . failTaskAndNotifyDelegate ( error: error, self . delegate. didReceiveError)
923938 }
924939 }
925940
926941 func userInboundEventTriggered( context: ChannelHandlerContext , event: Any ) {
927942 if ( event as? IdleStateHandler . IdleStateEvent) == . read {
928- self . state = . end
943+ self . state = . endOrError
929944 let error = HTTPClientError . readTimeout
930945 self . failTaskAndNotifyDelegate ( error: error, self . delegate. didReceiveError)
931946 } else {
@@ -935,7 +950,7 @@ extension TaskHandler: ChannelDuplexHandler {
935950
936951 func triggerUserOutboundEvent( context: ChannelHandlerContext , event: Any , promise: EventLoopPromise < Void > ? ) {
937952 if ( event as? TaskCancelEvent ) != nil {
938- self . state = . end
953+ self . state = . endOrError
939954 let error = HTTPClientError . cancelled
940955 self . failTaskAndNotifyDelegate ( error: error, self . delegate. didReceiveError)
941956 promise? . succeed ( ( ) )
@@ -946,10 +961,10 @@ extension TaskHandler: ChannelDuplexHandler {
946961
947962 func channelInactive( context: ChannelHandlerContext ) {
948963 switch self . state {
949- case . end :
964+ case . endOrError :
950965 break
951966 case . body, . head, . idle, . redirected, . sent:
952- self . state = . end
967+ self . state = . endOrError
953968 let error = HTTPClientError . remoteConnectionClosed
954969 self . failTaskAndNotifyDelegate ( error: error, self . delegate. didReceiveError)
955970 }
@@ -960,7 +975,7 @@ extension TaskHandler: ChannelDuplexHandler {
960975 switch error {
961976 case NIOSSLError . uncleanShutdown:
962977 switch self . state {
963- case . end :
978+ case . endOrError :
964979 /// Some HTTP Servers can 'forget' to respond with CloseNotify when client is closing connection,
965980 /// this could lead to incomplete SSL shutdown. But since request is already processed, we can ignore this error.
966981 break
@@ -969,11 +984,11 @@ extension TaskHandler: ChannelDuplexHandler {
969984 /// We can also ignore this error like `.end`.
970985 break
971986 default :
972- self . state = . end
987+ self . state = . endOrError
973988 self . failTaskAndNotifyDelegate ( error: error, self . delegate. didReceiveError)
974989 }
975990 default :
976- self . state = . end
991+ self . state = . endOrError
977992 self . failTaskAndNotifyDelegate ( error: error, self . delegate. didReceiveError)
978993 }
979994 }
0 commit comments