@@ -69,6 +69,15 @@ extension HTTPConnectionPool {
6969 }
7070 }
7171
72+ var canOrWillBeAbleToExecuteRequests : Bool {
73+ switch self . state {
74+ case . leased, . backingOff, . idle, . starting:
75+ return true
76+ case . closed:
77+ return false
78+ }
79+ }
80+
7281 var isLeased : Bool {
7382 switch self . state {
7483 case . leased:
@@ -281,6 +290,10 @@ extension HTTPConnectionPool {
281290 return connecting
282291 }
283292
293+ private var maximumAdditionalGeneralPurposeConnections : Int {
294+ self . maximumConcurrentConnections - ( self . overflowIndex - 1 )
295+ }
296+
284297 /// Is there at least one connection that is able to run requests
285298 var hasActiveConnections : Bool {
286299 self . connections. contains ( where: { $0. isIdle || $0. isLeased } )
@@ -530,8 +543,8 @@ extension HTTPConnectionPool {
530543 return migrationContext
531544 }
532545
533- /// we only handle starting and backing off connection here.
534- /// All running connections must be handled by the enclosing state machine
546+ /// We only handle starting and backing off connection here.
547+ /// All already running connections must be handled by the enclosing state machine.
535548 /// - Parameters:
536549 /// - starting: starting HTTP connections from previous state machine
537550 /// - backingOff: backing off HTTP connections from previous state machine
@@ -541,17 +554,96 @@ extension HTTPConnectionPool {
541554 ) {
542555 for (connectionID, eventLoop) in starting {
543556 let newConnection = HTTP1ConnectionState ( connectionID: connectionID, eventLoop: eventLoop)
544- self . connections. append ( newConnection)
557+ self . connections. insert ( newConnection, at: self . overflowIndex)
558+ /// If we can grow, we mark the connection as a general purpose connection.
559+ /// Otherwise, it will be an overflow connection which is only used once for requests with a required event loop
560+ if self . canGrow {
561+ self . overflowIndex = self . connections. index ( after: self . overflowIndex)
562+ }
545563 }
546564
547565 for (connectionID, eventLoop) in backingOff {
548566 var backingOffConnection = HTTP1ConnectionState ( connectionID: connectionID, eventLoop: eventLoop)
549567 // TODO: Maybe we want to add a static init for backing off connections to HTTP1ConnectionState
550568 backingOffConnection. failedToConnect ( )
551- self . connections. append ( backingOffConnection)
569+ self . connections. insert ( backingOffConnection, at: self . overflowIndex)
570+ /// If we can grow, we mark the connection as a general purpose connection.
571+ /// Otherwise, it will be an overflow connection which is only used once for requests with a required event loop
572+ if self . canGrow {
573+ self . overflowIndex = self . connections. index ( after: self . overflowIndex)
574+ }
552575 }
553576 }
554577
578+ /// We will create new connections for each `requiredEventLoopOfPendingRequests`
579+ /// In addition, we also create more general purpose connections if we do not have enough to execute
580+ /// all requests on the given `preferredEventLoopsOfPendingGeneralPurposeRequests`
581+ /// until we reach `maximumConcurrentConnections`
582+ /// - Parameters:
583+ /// - requiredEventLoopsForPendingRequests:
584+ /// event loops for which we have requests with a required event loop.
585+ /// Duplicates are not allowed.
586+ /// - generalPurposeRequestCountPerPreferredEventLoop:
587+ /// request count with no required event loop,
588+ /// grouped by preferred event loop and ordered descending by number of requests
589+ /// - Returns: new connections that must be created
590+ mutating func createConnectionsAfterMigrationIfNeeded(
591+ requiredEventLoopOfPendingRequests: [ ( EventLoop , Int ) ] ,
592+ generalPurposeRequestCountGroupedByPreferredEventLoop: [ ( EventLoop , Int ) ]
593+ ) -> [ ( Connection . ID , EventLoop ) ] {
594+ // create new connections for requests with a required event loop
595+
596+ // we may already start connections for those requests and do not want to start to many
597+ let startingRequiredEventLoopConnectionCount = Dictionary (
598+ self . connections [ self . overflowIndex..< self . connections. endIndex] . lazy. map {
599+ ( $0. eventLoop. id, 1 )
600+ } ,
601+ uniquingKeysWith: +
602+ )
603+ var connectionToCreate = requiredEventLoopOfPendingRequests
604+ . flatMap { ( eventLoop, requestCount) -> [ ( Connection . ID , EventLoop ) ] in
605+ // We need a connection for each queued request with a required event loop.
606+ // Therefore, we look how many request we have queued for a given `eventLoop` and
607+ // how many connections we are already starting on the given `eventLoop`.
608+ // If we have not enough, we will create additional connections to have at least
609+ // on connection per request.
610+ let connectionsToStart = requestCount - startingRequiredEventLoopConnectionCount[ eventLoop. id, default: 0 ]
611+ return stride ( from: 0 , to: connectionsToStart, by: 1 ) . lazy. map { _ in
612+ ( self . createNewOverflowConnection ( on: eventLoop) , eventLoop)
613+ }
614+ }
615+
616+ // create new connections for requests without a required event loop
617+
618+ // TODO: improve algorithm to create connections uniformly across all preferred event loops
619+ // while paying attention to the number of queued request per event loop
620+ // Currently we start by creating new connections on the event loop with the most queued
621+ // requests. If we have create a enough connections to cover all requests for the given
622+ // event loop we will continue with the event loop with the second most queued requests
623+ // and so on and so forth. We do not need to sort the array because
624+ let newGeneralPurposeConnections : [ ( Connection . ID , EventLoop ) ] = generalPurposeRequestCountGroupedByPreferredEventLoop
625+ // we do not want to allocated intermediate arrays.
626+ . lazy
627+ // we flatten the grouped list of event loops by lazily repeating the event loop
628+ // for each request.
629+ // As a result we get one event loop per request (`[EventLoop]`).
630+ . flatMap { eventLoop, requestCount in
631+ repeatElement ( eventLoop, count: requestCount)
632+ }
633+ // we may already start connections and do not want to start too many
634+ . dropLast ( self . startingGeneralPurposeConnections)
635+ // we need to respect the used defined `maximumConcurrentConnections`
636+ . prefix ( self . maximumAdditionalGeneralPurposeConnections)
637+ // we now create a connection for each remaining event loop
638+ . map { eventLoop in
639+ ( self . createNewConnection ( on: eventLoop) , eventLoop)
640+ }
641+
642+ connectionToCreate. append ( contentsOf: newGeneralPurposeConnections)
643+
644+ return connectionToCreate
645+ }
646+
555647 // MARK: Shutdown
556648
557649 mutating func shutdown( ) -> CleanupContext {
0 commit comments