Skip to content

Commit 34cd941

Browse files
fix: Encapsulates subscription steam consumption
This fixes an issue observed in downstream code, where awaiting a GraphQL subscription with `makeAsyncIterator().next()` results in an error of `Fatal error: attempt to await next() on more than one task`. This encapsulates the consumption of the underlying resolver stream in a single cancellable task so that downstream code does not share the underlying stream iterator.
1 parent 1e71c9a commit 34cd941

File tree

2 files changed

+35
-22
lines changed

2 files changed

+35
-22
lines changed

README.md

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -58,8 +58,8 @@ The result of this query is a `GraphQLResult` that encodes to the following JSON
5858
### Subscription
5959

6060
This package supports GraphQL subscription. To create a subscription field in a GraphQL schema, use the `subscribe`
61-
resolver that returns any type that conforms to `AsyncSequence`. You must also provide a `resolver`, which defines how
62-
to process each event as it occurs and must return the field result type. Here is an example:
61+
resolver that returns any type that conforms to `AsyncSequence`, with a `Sendable` element type. You must also provide
62+
a `resolver`, which defines how to process each event as it occurs and must return the field result type. Here is an example:
6363

6464
```swift
6565
let schema = try GraphQLSchema(

Sources/GraphQL/Subscription/Subscribe.swift

Lines changed: 33 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -32,25 +32,37 @@ func subscribe(
3232
)
3333

3434
return sourceResult.map { sourceStream in
35-
AsyncThrowingStream<GraphQLResult, Error> {
36-
// The type-cast below is required on Swift <6. Once we drop Swift 5 support it may be
37-
// removed.
38-
var iterator = sourceStream.makeAsyncIterator() as (any AsyncIteratorProtocol)
39-
guard let eventPayload = try await iterator.next() else {
40-
return nil
35+
AsyncThrowingStream<GraphQLResult, Error> { continuation in
36+
let task = Task {
37+
do {
38+
for try await event in sourceStream {
39+
// Check for cancellation before processing each event
40+
try Task.checkCancellation()
41+
42+
let result = try await execute(
43+
schema: schema,
44+
documentAST: documentAST,
45+
// Despite the warning, we must force unwrap because on optional unwrap,
46+
// compiler throws:
47+
// `marker protocol 'Sendable' cannot be used in a conditional cast`
48+
rootValue: event as! (any Sendable),
49+
context: context,
50+
variableValues: variableValues,
51+
operationName: operationName
52+
)
53+
continuation.yield(result)
54+
}
55+
continuation.finish()
56+
} catch is CancellationError {
57+
continuation.finish()
58+
} catch {
59+
continuation.finish(throwing: error)
60+
}
61+
}
62+
63+
continuation.onTermination = { @Sendable _ in
64+
task.cancel()
4165
}
42-
// Despite the warning, we must force unwrap because on optional unwrap, compiler
43-
// throws:
44-
// `marker protocol 'Sendable' cannot be used in a conditional cast`
45-
let rootValue = eventPayload as! (any Sendable)
46-
return try await execute(
47-
schema: schema,
48-
documentAST: documentAST,
49-
rootValue: rootValue,
50-
context: context,
51-
variableValues: variableValues,
52-
operationName: operationName
53-
)
5466
}
5567
}
5668
}
@@ -206,8 +218,9 @@ func executeSubscription(
206218
} else if let error = resolved as? GraphQLError {
207219
return .failure(.init([error]))
208220
} else if let stream = resolved as? any AsyncSequence {
209-
// Despite the warning, we must force unwrap because on optional unwrap, compiler throws:
210-
// `marker protocol 'Sendable' cannot be used in a conditional cast`
221+
// Force cast is required because Swift doesn't allow conditional casting with marker
222+
// protocols like Sendable. This is safe because subscription resolvers should only
223+
// return Sendable AsyncSequences.
211224
return .success(stream as! (any AsyncSequence & Sendable))
212225
} else if resolved == nil {
213226
return .failure(.init([

0 commit comments

Comments
 (0)