@@ -943,4 +943,149 @@ class HTTPConnectionPool_HTTP2StateMachineTests: XCTestCase {
943943 XCTAssertEqual ( action3. request, . none)
944944 XCTAssertEqual ( action3. connection, . none)
945945 }
946+
947+ func testMaxConcurrentStreamsIsRespected( ) {
948+ let elg = EmbeddedEventLoopGroup ( loops: 4 )
949+ defer { XCTAssertNoThrow ( try elg. syncShutdownGracefully ( ) ) }
950+
951+ guard var ( connections, state) = try ? MockConnectionPool . http2 ( elg: elg, maxConcurrentStreams: 100 ) else {
952+ return XCTFail ( " Test setup failed " )
953+ }
954+
955+ let generalPurposeConnection = connections. randomParkedConnection ( ) !
956+ var queuer = MockRequestQueuer ( )
957+
958+ // schedule 1000 requests on the pool. The first 100 will be executed right away. All others
959+ // shall be queued.
960+ for i in 0 ..< 1000 {
961+ let requestEL = elg. next ( )
962+ let mockRequest = MockHTTPRequest ( eventLoop: requestEL)
963+ let request = HTTPConnectionPool . Request ( mockRequest)
964+
965+ let executeAction = state. executeRequest ( request)
966+ switch i {
967+ case 0 :
968+ XCTAssertEqual ( executeAction. connection, . cancelTimeoutTimer( generalPurposeConnection. id) )
969+ XCTAssertNoThrow ( try connections. activateConnection ( generalPurposeConnection. id) )
970+ XCTAssertEqual ( executeAction. request, . executeRequest( request, generalPurposeConnection, cancelTimeout: false ) )
971+ XCTAssertNoThrow ( try connections. execute ( mockRequest, on: generalPurposeConnection) )
972+ case 1 ..< 100 :
973+ XCTAssertEqual ( executeAction. request, . executeRequest( request, generalPurposeConnection, cancelTimeout: false ) )
974+ XCTAssertEqual ( executeAction. connection, . none)
975+ XCTAssertNoThrow ( try connections. execute ( mockRequest, on: generalPurposeConnection) )
976+ case 100 ..< 1000 :
977+ XCTAssertEqual ( executeAction. request, . scheduleRequestTimeout( for: request, on: requestEL) )
978+ XCTAssertEqual ( executeAction. connection, . none)
979+ XCTAssertNoThrow ( try queuer. queue ( mockRequest, id: request. id) )
980+ default :
981+ XCTFail ( " Unexpected " )
982+ }
983+ }
984+
985+ // let's end processing 500 requests. For every finished request, we will execute another one
986+ // right away
987+ while queuer. count > 500 {
988+ XCTAssertNoThrow ( try connections. finishExecution ( generalPurposeConnection. id) )
989+ let finishAction = state. http2ConnectionStreamClosed ( generalPurposeConnection. id)
990+ XCTAssertEqual ( finishAction. connection, . none)
991+ guard case . executeRequestsAndCancelTimeouts( let requests, generalPurposeConnection) = finishAction. request else {
992+ return XCTFail ( " Unexpected request action: \( finishAction. request) " )
993+ }
994+ guard requests. count == 1 , let request = requests. first else {
995+ return XCTFail ( " Expected to get exactly one request! " )
996+ }
997+ let mockRequest = request. __testOnly_wrapped_request ( )
998+ XCTAssertNoThrow ( try queuer. get ( request. id, request: mockRequest) )
999+ XCTAssertNoThrow ( try connections. execute ( mockRequest, on: generalPurposeConnection) )
1000+ }
1001+
1002+ XCTAssertEqual ( queuer. count, 500 )
1003+
1004+ // Next the server allows for more concurrent streams
1005+ let newMaxStreams = 200
1006+ XCTAssertNoThrow ( try connections. newHTTP2ConnectionSettingsReceived ( generalPurposeConnection. id, maxConcurrentStreams: newMaxStreams) )
1007+ let newMaxStreamsAction = state. newHTTP2MaxConcurrentStreamsReceived ( generalPurposeConnection. id, newMaxStreams: newMaxStreams)
1008+ XCTAssertEqual ( newMaxStreamsAction. connection, . none)
1009+ guard case . executeRequestsAndCancelTimeouts( let requests, generalPurposeConnection) = newMaxStreamsAction. request else {
1010+ return XCTFail ( " Unexpected request action after new max concurrent stream setting: \( newMaxStreamsAction. request) " )
1011+ }
1012+ XCTAssertEqual ( requests. count, 100 , " Expected to execute 100 more requests " )
1013+ for request in requests {
1014+ let mockRequest = request. __testOnly_wrapped_request ( )
1015+ XCTAssertNoThrow ( try connections. execute ( mockRequest, on: generalPurposeConnection) )
1016+ XCTAssertNoThrow ( try queuer. get ( request. id, request: mockRequest) )
1017+ }
1018+
1019+ XCTAssertEqual ( queuer. count, 400 )
1020+
1021+ // let's end processing 100 requests. For every finished request, we will execute another one
1022+ // right away
1023+ while queuer. count > 300 {
1024+ XCTAssertNoThrow ( try connections. finishExecution ( generalPurposeConnection. id) )
1025+ let finishAction = state. http2ConnectionStreamClosed ( generalPurposeConnection. id)
1026+ XCTAssertEqual ( finishAction. connection, . none)
1027+ guard case . executeRequestsAndCancelTimeouts( let requests, generalPurposeConnection) = finishAction. request else {
1028+ return XCTFail ( " Unexpected request action: \( finishAction. request) " )
1029+ }
1030+ guard requests. count == 1 , let request = requests. first else {
1031+ return XCTFail ( " Expected to get exactly one request! " )
1032+ }
1033+ let mockRequest = request. __testOnly_wrapped_request ( )
1034+ XCTAssertNoThrow ( try queuer. get ( request. id, request: mockRequest) )
1035+ XCTAssertNoThrow ( try connections. execute ( mockRequest, on: generalPurposeConnection) )
1036+ }
1037+
1038+ // Next the server allows for fewer concurrent streams
1039+ let fewerMaxStreams = 50
1040+ XCTAssertNoThrow ( try connections. newHTTP2ConnectionSettingsReceived ( generalPurposeConnection. id, maxConcurrentStreams: fewerMaxStreams) )
1041+ let fewerMaxStreamsAction = state. newHTTP2MaxConcurrentStreamsReceived ( generalPurposeConnection. id, newMaxStreams: fewerMaxStreams)
1042+ XCTAssertEqual ( fewerMaxStreamsAction. connection, . none)
1043+ XCTAssertEqual ( fewerMaxStreamsAction. request, . none)
1044+
1045+ // for the next 150 requests that are finished, no new request must be executed.
1046+ for _ in 0 ..< 150 {
1047+ XCTAssertNoThrow ( try connections. finishExecution ( generalPurposeConnection. id) )
1048+ XCTAssertEqual ( state. http2ConnectionStreamClosed ( generalPurposeConnection. id) , . none)
1049+ }
1050+
1051+ XCTAssertEqual ( queuer. count, 300 )
1052+
1053+ // let's end all remaining requests. For every finished request, we will execute another one
1054+ // right away
1055+ while queuer. count > 0 {
1056+ XCTAssertNoThrow ( try connections. finishExecution ( generalPurposeConnection. id) )
1057+ let finishAction = state. http2ConnectionStreamClosed ( generalPurposeConnection. id)
1058+ XCTAssertEqual ( finishAction. connection, . none)
1059+ guard case . executeRequestsAndCancelTimeouts( let requests, generalPurposeConnection) = finishAction. request else {
1060+ return XCTFail ( " Unexpected request action: \( finishAction. request) " )
1061+ }
1062+ guard requests. count == 1 , let request = requests. first else {
1063+ return XCTFail ( " Expected to get exactly one request! " )
1064+ }
1065+ let mockRequest = request. __testOnly_wrapped_request ( )
1066+ XCTAssertNoThrow ( try queuer. get ( request. id, request: mockRequest) )
1067+ XCTAssertNoThrow ( try connections. execute ( mockRequest, on: generalPurposeConnection) )
1068+ }
1069+
1070+ // Now we only need to drain the remaining 50 requests on the connection
1071+ var timeoutTimerScheduled = false
1072+ for remaining in stride ( from: 50 , through: 1 , by: - 1 ) {
1073+ XCTAssertNoThrow ( try connections. finishExecution ( generalPurposeConnection. id) )
1074+ let finishAction = state. http2ConnectionStreamClosed ( generalPurposeConnection. id)
1075+ XCTAssertEqual ( finishAction. request, . none)
1076+ switch remaining {
1077+ case 1 :
1078+ timeoutTimerScheduled = true
1079+ XCTAssertEqual ( finishAction. connection, . scheduleTimeoutTimer( generalPurposeConnection. id, on: generalPurposeConnection. eventLoop) )
1080+ XCTAssertNoThrow ( try connections. parkConnection ( generalPurposeConnection. id) )
1081+ case 2 ... 50 :
1082+ XCTAssertEqual ( finishAction. connection, . none)
1083+ default :
1084+ XCTFail ( " Unexpected value: \( remaining) " )
1085+ }
1086+ }
1087+ XCTAssertTrue ( timeoutTimerScheduled)
1088+ XCTAssertNotNil ( connections. randomParkedConnection ( ) )
1089+ XCTAssertEqual ( connections. count, 1 )
1090+ }
9461091}
0 commit comments