1414 * limitations under the License.
1515 */
1616
17- internal import Atomics
17+ private import Synchronization
1818
1919/// A gRPC client.
2020///
@@ -110,7 +110,7 @@ internal import Atomics
110110/// additional resources that need their lifecycles managed you should consider using [Swift Service
111111/// Lifecycle](https://github.com/swift-server/swift-service-lifecycle).
112112@available ( macOS 15 . 0 , iOS 18 . 0 , watchOS 11 . 0 , tvOS 18 . 0 , visionOS 2 . 0 , * )
113- public struct GRPCClient : Sendable {
113+ public final class GRPCClient : Sendable {
114114 /// The transport which provides a bidirectional communication channel with the server.
115115 private let transport : any ClientTransport
116116
@@ -123,10 +123,10 @@ public struct GRPCClient: Sendable {
123123 private let interceptors : [ any ClientInterceptor ]
124124
125125 /// The current state of the client.
126- private let state : ManagedAtomic < State >
126+ private let state : Mutex < State >
127127
128128 /// The state of the client.
129- private enum State : UInt8 , AtomicValue {
129+ private enum State : Sendable {
130130 /// The client hasn't been started yet. Can transition to `running` or `stopped`.
131131 case notStarted
132132 /// The client is running and can send RPCs. Can transition to `stopping`.
@@ -137,6 +137,56 @@ public struct GRPCClient: Sendable {
137137 /// The client has stopped, no RPCs are in flight and no more will be accepted. This state
138138 /// is terminal.
139139 case stopped
140+
141+ mutating func run( ) throws {
142+ switch self {
143+ case . notStarted:
144+ self = . running
145+
146+ case . running:
147+ throw RuntimeError (
148+ code: . clientIsAlreadyRunning,
149+ message: " The client is already running and can only be started once. "
150+ )
151+
152+ case . stopping, . stopped:
153+ throw RuntimeError (
154+ code: . clientIsStopped,
155+ message: " The client has stopped and can only be started once. "
156+ )
157+ }
158+ }
159+
160+ mutating func stopped( ) {
161+ self = . stopped
162+ }
163+
164+ mutating func beginGracefulShutdown( ) -> Bool {
165+ switch self {
166+ case . notStarted:
167+ self = . stopped
168+ return false
169+ case . running:
170+ self = . stopping
171+ return true
172+ case . stopping, . stopped:
173+ return false
174+ }
175+ }
176+
177+ func checkExecutable( ) throws {
178+ switch self {
179+ case . notStarted, . running:
180+ // Allow .notStarted as making a request can race with 'run()'. Transports should tolerate
181+ // queuing the request if not yet started.
182+ ( )
183+ case . stopping, . stopped:
184+ throw RuntimeError (
185+ code: . clientIsStopped,
186+ message: " Client has been stopped. Can't make any more RPCs. "
187+ )
188+ }
189+ }
140190 }
141191
142192 /// Creates a new client with the given transport, interceptors and configuration.
@@ -154,7 +204,7 @@ public struct GRPCClient: Sendable {
154204 ) {
155205 self . transport = transport
156206 self . interceptors = interceptors
157- self . state = ManagedAtomic ( . notStarted)
207+ self . state = Mutex ( . notStarted)
158208 }
159209
160210 /// Start the client.
@@ -165,33 +215,11 @@ public struct GRPCClient: Sendable {
165215 /// The client, and by extension this function, can only be run once. If the client is already
166216 /// running or has already been closed then a ``RuntimeError`` is thrown.
167217 public func run( ) async throws {
168- let ( wasNotStarted, original) = self . state. compareExchange (
169- expected: . notStarted,
170- desired: . running,
171- ordering: . sequentiallyConsistent
172- )
173-
174- guard wasNotStarted else {
175- switch original {
176- case . notStarted:
177- // The value wasn't exchanged so the original value can't be 'notStarted'.
178- fatalError ( )
179- case . running:
180- throw RuntimeError (
181- code: . clientIsAlreadyRunning,
182- message: " The client is already running and can only be started once. "
183- )
184- case . stopping, . stopped:
185- throw RuntimeError (
186- code: . clientIsStopped,
187- message: " The client has stopped and can only be started once. "
188- )
189- }
190- }
218+ try self . state. withLock { try $0. run ( ) }
191219
192- // When we exit this function we must have stopped.
220+ // When this function exits the client must have stopped.
193221 defer {
194- self . state. store ( . stopped , ordering : . sequentiallyConsistent )
222+ self . state. withLock { $0 . stopped ( ) }
195223 }
196224
197225 do {
@@ -211,50 +239,9 @@ public struct GRPCClient: Sendable {
211239 /// in-flight RPCs to finish executing, but no new RPCs will be accepted. You can cancel the task
212240 /// executing ``run()`` if you want to abruptly stop in-flight RPCs.
213241 public func beginGracefulShutdown( ) {
214- while true {
215- let ( wasRunning, actualState) = self . state. compareExchange (
216- expected: . running,
217- desired: . stopping,
218- ordering: . sequentiallyConsistent
219- )
220-
221- // Transition from running to stopping: close the transport.
222- if wasRunning {
223- self . transport. beginGracefulShutdown ( )
224- return
225- }
226-
227- // The expected state wasn't 'running'. There are two options:
228- // 1. The client isn't running yet.
229- // 2. The client is already stopping or stopped.
230- switch actualState {
231- case . notStarted:
232- // Not started: try going straight to stopped.
233- let ( wasNotStarted, _) = self . state. compareExchange (
234- expected: . notStarted,
235- desired: . stopped,
236- ordering: . sequentiallyConsistent
237- )
238-
239- // If the exchange happened then just return: the client wasn't started so there's no
240- // transport to start.
241- //
242- // If the exchange didn't happen then continue looping: the client must've been started by
243- // another thread.
244- if wasNotStarted {
245- return
246- } else {
247- continue
248- }
249-
250- case . running:
251- // Unreachable: the value was exchanged and this was the expected value.
252- fatalError ( )
253-
254- case . stopping, . stopped:
255- // No exchange happened but the client is already stopping.
256- return
257- }
242+ let wasRunning = self . state. withLock { $0. beginGracefulShutdown ( ) }
243+ if wasRunning {
244+ self . transport. beginGracefulShutdown ( )
258245 }
259246 }
260247
@@ -371,18 +358,7 @@ public struct GRPCClient: Sendable {
371358 options: CallOptions ,
372359 handler: @Sendable @escaping ( ClientResponse . Stream < Response > ) async throws -> ReturnValue
373360 ) async throws -> ReturnValue {
374- switch self . state. load ( ordering: . sequentiallyConsistent) {
375- case . notStarted, . running:
376- // Allow .notStarted as making a request can race with 'run()'. Transports should tolerate
377- // queuing the request if not yet started.
378- ( )
379- case . stopping, . stopped:
380- throw RuntimeError (
381- code: . clientIsStopped,
382- message: " Client has been stopped. Can't make any more RPCs. "
383- )
384- }
385-
361+ try self . state. withLock { try $0. checkExecutable ( ) }
386362 let methodConfig = self . transport. configuration ( forMethod: descriptor)
387363 var options = options
388364 options. formUnion ( with: methodConfig)
0 commit comments