1414
1515import Logging
1616import UnixSignals
17+ import AsyncAlgorithms
1718
1819/// A ``ServiceGroup`` is responsible for running a number of services, setting up signal handling and signalling graceful shutdown to the services.
1920public actor ServiceGroup: Sendable , Service {
@@ -23,7 +24,8 @@ public actor ServiceGroup: Sendable, Service {
2324 case initial( services: [ ServiceGroupConfiguration . ServiceConfiguration ] )
2425 /// The state once ``ServiceGroup/run()`` has been called.
2526 case running(
26- gracefulShutdownStreamContinuation: AsyncStream < Void > . Continuation
27+ gracefulShutdownStreamContinuation: AsyncStream < Void > . Continuation ,
28+ addedServiceChannel: AsyncChannel < ServiceGroupConfiguration . ServiceConfiguration >
2729 )
2830 /// The state once ``ServiceGroup/run()`` has finished.
2931 case finished
@@ -106,6 +108,38 @@ public actor ServiceGroup: Sendable, Service {
106108 self . maximumCancellationDuration = configuration. _maximumCancellationDuration
107109 }
108110
111+ /// Adds a new service to the group.
112+ ///
113+ /// If the group is currently running, the added service will be started immediately.
114+ /// If the group is gracefully shutting down, cancelling, or already finished, the added service will not be started.
115+ /// - Parameters:
116+ /// - serviceConfiguration: The service configuration to add.
117+ public func addServiceUnlessShutdown( _ serviceConfiguration: ServiceGroupConfiguration . ServiceConfiguration ) async {
118+ switch self . state {
119+ case var . initial( services: services) :
120+ self . state = . initial( services: [ ] )
121+ services. append ( serviceConfiguration)
122+ self . state = . initial( services: services)
123+
124+ case . running( _, let addedServiceChannel) :
125+ await addedServiceChannel. send ( serviceConfiguration)
126+
127+ case . finished:
128+ // Since this is a best effort operation we don't have to do anything here
129+ return
130+ }
131+ }
132+
133+ /// Adds a new service to the group.
134+ ///
135+ /// If the group is currently running, the added service will be started immediately.
136+ /// If the group is gracefully shutting down, cancelling, or already finished, the added service will not be started.
137+ /// - Parameters:
138+ /// - service: The service to add.
139+ public func addServiceUnlessShutdown( _ service: any Service ) async {
140+ await self . addServiceUnlessShutdown ( ServiceGroupConfiguration . ServiceConfiguration ( service: service) )
141+ }
142+
109143 /// Runs all the services by spinning up a child task per service.
110144 /// Furthermore, this method sets up the correct signal handlers
111145 /// for graceful shutdown.
@@ -128,16 +162,19 @@ public actor ServiceGroup: Sendable, Service {
128162 }
129163
130164 let ( gracefulShutdownStream, gracefulShutdownContinuation) = AsyncStream . makeStream ( of: Void . self)
165+ let addedServiceChannel = AsyncChannel < ServiceGroupConfiguration . ServiceConfiguration > ( )
131166
132167 self . state = . running(
133- gracefulShutdownStreamContinuation: gracefulShutdownContinuation
168+ gracefulShutdownStreamContinuation: gracefulShutdownContinuation,
169+ addedServiceChannel: addedServiceChannel
134170 )
135171
136172 var potentialError : Error ?
137173 do {
138174 try await self . _run (
139175 services: & services,
140- gracefulShutdownStream: gracefulShutdownStream
176+ gracefulShutdownStream: gracefulShutdownStream,
177+ addedServiceChannel: addedServiceChannel
141178 )
142179 } catch {
143180 potentialError = error
@@ -173,7 +210,7 @@ public actor ServiceGroup: Sendable, Service {
173210 self . state = . finished
174211 return
175212
176- case . running( let gracefulShutdownStreamContinuation) :
213+ case . running( let gracefulShutdownStreamContinuation, _ ) :
177214 // We cannot transition to shuttingDown here since we are signalling over to the task
178215 // that runs `run`. This task is responsible for transitioning to shuttingDown since
179216 // there might be multiple signals racing to trigger it
@@ -198,11 +235,13 @@ public actor ServiceGroup: Sendable, Service {
198235 case gracefulShutdownFinished
199236 case gracefulShutdownTimedOut
200237 case cancellationCaught
238+ case newServiceAdded( ServiceGroupConfiguration . ServiceConfiguration )
201239 }
202240
203241 private func _run(
204242 services: inout [ ServiceGroupConfiguration . ServiceConfiguration ] ,
205- gracefulShutdownStream: AsyncStream < Void >
243+ gracefulShutdownStream: AsyncStream < Void > ,
244+ addedServiceChannel: AsyncChannel < ServiceGroupConfiguration . ServiceConfiguration >
206245 ) async throws {
207246 self . logger. debug (
208247 " Starting service lifecycle " ,
@@ -280,25 +319,12 @@ public actor ServiceGroup: Sendable, Service {
280319 let gracefulShutdownManager = GracefulShutdownManager ( )
281320 gracefulShutdownManagers. append ( gracefulShutdownManager)
282321
283- // This must be addTask and not addTaskUnlessCancelled
284- // because we must run all the services for the below logic to work.
285- group. addTask {
286- return await TaskLocals . $gracefulShutdownManager. withValue ( gracefulShutdownManager) {
287- do {
288- try await serviceConfiguration. service. run ( )
289- return . serviceFinished( service: serviceConfiguration, index: index)
290- } catch {
291- return . serviceThrew( service: serviceConfiguration, index: index, error: error)
292- }
293- }
294- }
295- }
296-
297- group. addTask {
298- // This child task is waiting forever until the group gets cancelled.
299- let ( stream, _) = AsyncStream . makeStream ( of: Void . self)
300- await stream. first { _ in true }
301- return . cancellationCaught
322+ self . addServiceTask (
323+ group: & group,
324+ service: serviceConfiguration,
325+ gracefulShutdownManager: gracefulShutdownManager,
326+ index: index
327+ )
302328 }
303329
304330 // We are storing the services in an optional array now. When a slot in the array is
@@ -310,12 +336,52 @@ public actor ServiceGroup: Sendable, Service {
310336 " We did not create a graceful shutdown manager per service "
311337 )
312338
339+ group. addTask {
340+ // This child task is waiting forever until the group gets cancelled.
341+ let ( stream, _) = AsyncStream . makeStream ( of: Void . self)
342+ await stream. first { _ in true }
343+ return . cancellationCaught
344+ }
345+
346+ // Adds a task that listens to added services and funnels them into the task group
347+ self . addAddedServiceListenerTask ( group: & group, channel: addedServiceChannel)
348+
313349 // We are going to wait for any of the services to finish or
314350 // the signal sequence to throw an error.
315351 while !group. isEmpty {
316352 let result : ChildTaskResult ? = try await group. next ( )
317353
318354 switch result {
355+ case . newServiceAdded( let serviceConfiguration) :
356+ self . logger. debug (
357+ " Starting added service " ,
358+ metadata: [
359+ self . loggingConfiguration. keys. serviceKey: " \( serviceConfiguration. service) "
360+ ]
361+ )
362+
363+ let gracefulShutdownManager = GracefulShutdownManager ( )
364+ gracefulShutdownManagers. append ( gracefulShutdownManager)
365+ services. append ( serviceConfiguration)
366+
367+ precondition (
368+ services. count == gracefulShutdownManagers. count,
369+ " Mismatch between services and graceful shutdown managers "
370+ )
371+
372+ self . addServiceTask (
373+ group: & group,
374+ service: serviceConfiguration,
375+ gracefulShutdownManager: gracefulShutdownManager,
376+ index: services. count - 1
377+ )
378+
379+ // Each listener task can only handle a single added service, so we must add a new listener
380+ self . addAddedServiceListenerTask (
381+ group: & group,
382+ channel: addedServiceChannel
383+ )
384+
319385 case . serviceFinished( let service, let index) :
320386 if group. isCancelled {
321387 // The group is cancelled and we expect all services to finish
@@ -530,10 +596,13 @@ public actor ServiceGroup: Sendable, Service {
530596 group: inout ThrowingTaskGroup < ChildTaskResult , Error > ,
531597 gracefulShutdownManagers: [ GracefulShutdownManager ]
532598 ) async throws {
533- guard case . running = self . state else {
599+ guard case let . running( _ , addedServiceChannel ) = self . state else {
534600 fatalError ( " Unexpected state " )
535601 }
536602
603+ // Signal to stop adding new services (it is important that no new services are added after this point)
604+ addedServiceChannel. finish ( )
605+
537606 if #available( macOS 13 . 0 , iOS 16 . 0 , watchOS 9 . 0 , tvOS 16 . 0 , * ) ,
538607 let maximumGracefulShutdownDuration = self . maximumGracefulShutdownDuration
539608 {
@@ -717,6 +786,10 @@ public actor ServiceGroup: Sendable, Service {
717786 // We are going to continue the result loop since we have to wait for our service
718787 // to finish.
719788 break
789+
790+ case . newServiceAdded:
791+ // Since adding services is best effort, we simply ignore this
792+ break
720793 }
721794 }
722795 }
@@ -777,6 +850,46 @@ public actor ServiceGroup: Sendable, Service {
777850 cancellationTimeoutTask = nil
778851 }
779852 }
853+
854+ private func addServiceTask(
855+ group: inout ThrowingTaskGroup< ChildTaskResult , Error > ,
856+ service serviceConfiguration: ServiceGroupConfiguration. ServiceConfiguration,
857+ gracefulShutdownManager: GracefulShutdownManager,
858+ index: Int
859+ ) {
860+ // This must be addTask and not addTaskUnlessCancelled
861+ // because we must run all the services for the shutdown logic to work.
862+ group. addTask {
863+ return await TaskLocals . $gracefulShutdownManager. withValue ( gracefulShutdownManager) {
864+ do {
865+ try await serviceConfiguration. service. run ( )
866+ return . serviceFinished( service: serviceConfiguration, index: index)
867+ } catch {
868+ return . serviceThrew( service: serviceConfiguration, index: index, error: error)
869+ }
870+ }
871+ }
872+ }
873+
874+ private func addAddedServiceListenerTask(
875+ group: inout ThrowingTaskGroup< ChildTaskResult , Error > ,
876+ channel: AsyncChannel < ServiceGroupConfiguration . ServiceConfiguration >
877+ ) {
878+ group. addTask {
879+ return await withTaskCancellationHandler {
880+ var iterator = channel. makeAsyncIterator ( )
881+ if let addedService = await iterator. next ( ) {
882+ return . newServiceAdded( addedService)
883+ }
884+
885+ return . gracefulShutdownFinished
886+ } onCancel: {
887+ // Once the group is cancelled we will no longer read from the channel.
888+ // This will resume any suspended producer in `addServiceUnlessShutdown`.
889+ channel. finish ( )
890+ }
891+ }
892+ }
780893}
781894
782895// This should be removed once we support Swift 5.9+
0 commit comments