diff --git a/README.md b/README.md index 284de788..2c33cc48 100644 --- a/README.md +++ b/README.md @@ -58,8 +58,8 @@ The result of this query is a `GraphQLResult` that encodes to the following JSON ### Subscription This package supports GraphQL subscription. To create a subscription field in a GraphQL schema, use the `subscribe` -resolver that returns any type that conforms to `AsyncSequence`. You must also provide a `resolver`, which defines how -to process each event as it occurs and must return the field result type. Here is an example: +resolver that returns any type that conforms to `AsyncSequence`, with a `Sendable` element type. You must also provide +a `resolver`, which defines how to process each event as it occurs and must return the field result type. Here is an example: ```swift let schema = try GraphQLSchema( diff --git a/Sources/GraphQL/Subscription/Subscribe.swift b/Sources/GraphQL/Subscription/Subscribe.swift index 56fed7da..277c067f 100644 --- a/Sources/GraphQL/Subscription/Subscribe.swift +++ b/Sources/GraphQL/Subscription/Subscribe.swift @@ -32,25 +32,37 @@ func subscribe( ) return sourceResult.map { sourceStream in - AsyncThrowingStream { - // The type-cast below is required on Swift <6. Once we drop Swift 5 support it may be - // removed. - var iterator = sourceStream.makeAsyncIterator() as (any AsyncIteratorProtocol) - guard let eventPayload = try await iterator.next() else { - return nil + AsyncThrowingStream { continuation in + let task = Task { + do { + for try await event in sourceStream { + // Check for cancellation before processing each event + try Task.checkCancellation() + + let result = try await execute( + schema: schema, + documentAST: documentAST, + // Despite the warning, we must force unwrap because on optional unwrap, + // compiler throws: + // `marker protocol 'Sendable' cannot be used in a conditional cast` + rootValue: event as! (any Sendable), + context: context, + variableValues: variableValues, + operationName: operationName + ) + continuation.yield(result) + } + continuation.finish() + } catch is CancellationError { + continuation.finish() + } catch { + continuation.finish(throwing: error) + } + } + + continuation.onTermination = { @Sendable _ in + task.cancel() } - // Despite the warning, we must force unwrap because on optional unwrap, compiler - // throws: - // `marker protocol 'Sendable' cannot be used in a conditional cast` - let rootValue = eventPayload as! (any Sendable) - return try await execute( - schema: schema, - documentAST: documentAST, - rootValue: rootValue, - context: context, - variableValues: variableValues, - operationName: operationName - ) } } } @@ -206,8 +218,9 @@ func executeSubscription( } else if let error = resolved as? GraphQLError { return .failure(.init([error])) } else if let stream = resolved as? any AsyncSequence { - // Despite the warning, we must force unwrap because on optional unwrap, compiler throws: - // `marker protocol 'Sendable' cannot be used in a conditional cast` + // Force cast is required because Swift doesn't allow conditional casting with marker + // protocols like Sendable. This is safe because subscription resolvers should only + // return Sendable AsyncSequences. return .success(stream as! (any AsyncSequence & Sendable)) } else if resolved == nil { return .failure(.init([