diff --git a/package.json b/package.json index 0a34759f..b3d2ca52 100644 --- a/package.json +++ b/package.json @@ -52,12 +52,12 @@ "dependencies": { "@graphql-tools/utils": "^10.0.0", "@whatwg-node/fetch": "^0.10.0", - "fets": "^0.8.0", "ansi-colors": "^4.1.3", + "fets": "^0.8.0", "openapi-types": "^12.1.0", "param-case": "^3.0.4", - "title-case": "^3.0.3", "qs": "^6.11.2", + "title-case": "^3.0.3", "tslib": "^2.5.0" }, "scripts": { @@ -69,18 +69,19 @@ "release": "yarn build && changeset publish" }, "devDependencies": { - "@changesets/changelog-github": "0.5.1", - "@changesets/cli": "2.29.7", "@babel/core": "7.28.4", "@babel/plugin-proposal-class-properties": "7.18.6", "@babel/preset-env": "7.28.3", "@babel/preset-typescript": "7.27.1", + "@changesets/changelog-github": "0.5.1", + "@changesets/cli": "2.29.7", "@types/express": "5.0.3", "@types/jest": "30.0.0", "@types/node": "24.9.0", + "@types/qs": "6.14.0", + "@types/readable-stream": "4.0.22", "@types/swagger-ui-dist": "3.30.6", "@types/yamljs": "0.2.34", - "@types/qs": "6.14.0", "babel-jest": "30.2.0", "bob-the-bundler": "7.0.1", "chalk": "^5.4.1", @@ -111,4 +112,4 @@ "access": "public" }, "packageManager": "yarn@1.22.22+sha512.a6b2f7906b721bba3d67d4aff083df04dad64c399707841b7acf00f6b133b7ac24255f2652fa22ae3534329dc6180534e98d17432037ff6fd140556e2bb3137e" -} \ No newline at end of file +} diff --git a/src/router.ts b/src/router.ts index c459ba26..6d12370e 100644 --- a/src/router.ts +++ b/src/router.ts @@ -23,7 +23,7 @@ import { convertName } from './common.js'; import { parseVariable } from './parse.js'; import { type StartSubscriptionEvent, - SubscriptionManager, + createSubscriptionManager, } from './subscriptions.js'; import { logger } from './logger.js'; import { @@ -138,7 +138,6 @@ export function createSofaRouter(sofa: Sofa) { const queryType = sofa.schema.getQueryType(); const mutationType = sofa.schema.getMutationType(); - const subscriptionManager = new SubscriptionManager(sofa); if (queryType) { Object.keys(queryType.getFields()).forEach((fieldName) => { @@ -152,82 +151,85 @@ export function createSofaRouter(sofa: Sofa) { }); } - router.route({ - path: '/webhook', - method: 'POST', - async handler(request, serverContext) { - const { subscription, variables, url }: StartSubscriptionEvent = - await request.json(); - try { - const sofaContext: DefaultSofaServerContext = Object.assign( - serverContext, - { - request, - } - ); - const result = await subscriptionManager.start( - { - subscription, - variables, - url, - }, - sofaContext - ); - return Response.json(result); - } catch (error) { - return Response.json(error, { - status: 500, - statusText: 'Subscription failed' as any, - }); - } - }, - }); + if (sofa.schema.getSubscriptionType()) { + const subscriptionManager = createSubscriptionManager(sofa); + router.route({ + path: '/webhook', + method: 'POST', + async handler(request, serverContext) { + const { subscription, variables, url }: StartSubscriptionEvent = + await request.json(); + try { + const sofaContext: DefaultSofaServerContext = Object.assign( + serverContext, + { + request, + } + ); + const result = await subscriptionManager.start( + { + subscription, + variables, + url, + }, + sofaContext + ); + return Response.json(result); + } catch (error) { + return Response.json(error, { + status: 500, + statusText: 'Subscription failed' as any, + }); + } + }, + }); - router.route({ - path: '/webhook/:id', - method: 'POST', - async handler(request, serverContext) { - const id = request.params?.id!; - const body = await request.json(); - const variables: any = body.variables; - try { - const sofaContext = Object.assign(serverContext, { - request, - }); - const contextValue = await sofa.contextFactory(sofaContext); - const result = await subscriptionManager.update( - { - id, - variables, - }, - contextValue - ); - return Response.json(result); - } catch (error) { - return Response.json(error, { - status: 500, - statusText: 'Subscription failed to update' as any, - }); - } - }, - }); + router.route({ + path: '/webhook/:id', + method: 'POST', + async handler(request, serverContext) { + const id = request.params?.id!; + const body = await request.json(); + const variables: any = body.variables; + try { + const sofaContext = Object.assign(serverContext, { + request, + }); + const contextValue = await sofa.contextFactory(sofaContext); + const result = await subscriptionManager.update( + { + id, + variables, + }, + contextValue + ); + return Response.json(result); + } catch (error) { + return Response.json(error, { + status: 500, + statusText: 'Subscription failed to update' as any, + }); + } + }, + }); - router.route({ - path: '/webhook/:id', - method: 'DELETE', - async handler(request) { - const id = request.params?.id!; - try { - const result = await subscriptionManager.stop(id); - return Response.json(result); - } catch (error) { - return Response.json(error, { - status: 500, - statusText: 'Subscription failed to stop' as any, - }); - } - }, - }); + router.route({ + path: '/webhook/:id', + method: 'DELETE', + async handler(request) { + const id = request.params?.id!; + try { + const result = await subscriptionManager.stop(id); + return Response.json(result); + } catch (error) { + return Response.json(error, { + status: 500, + statusText: 'Subscription failed to stop' as any, + }); + } + }, + }); + } return router; } diff --git a/src/sofa.ts b/src/sofa.ts index a34eed33..ade984db 100644 --- a/src/sofa.ts +++ b/src/sofa.ts @@ -61,6 +61,45 @@ export interface SofaConfig { openAPI?: RouterOpenAPIOptions; swaggerUI?: RouterSwaggerUIOptions; + + /** + * Webhook configuration for subscriptions. + */ + webhooks?: { + /** + * Maximum lifetime of a subscription webhook in seconds. + * After this time, the subscription will be automatically terminated. + * Default is never. + */ + maxSubscriptionWebhookLifetimeSeconds?: number; + /** + * Message sent to the webhook URL upon subscription termination. + * Can be a boolean, string, or a function that returns an object with a reason. + * If set to `false`, no message will be sent. + * If set to `true`, a default message will be sent. + * If a string is provided, it will be used as the reason for termination. + * If a function is provided, it will be called with the reason and should return an object. + * The function can also return more fields than just reason, which will then get attached to the message. Useful for e.g. timestamps. + * Default is to send no message. + * + * The termination reason will be sent in the extensions field of the payload as follows: + * ```json + * { + * "extensions": { + * "webhook": { + * "termination": { + * "reason": "Max subscription lifetime reached (60s)" + * } + * } + * } + * } + * ``` + */ + terminationMessage?: + | boolean + | string + | ((reason: string) => { reason: string }); + }; } export interface Sofa { @@ -79,6 +118,8 @@ export interface Sofa { openAPI?: RouterOpenAPIOptions; swaggerUI?: RouterSwaggerUIOptions; + + webhooks?: SofaConfig['webhooks']; } export function createSofa(config: SofaConfig): Sofa { diff --git a/src/subscriptions.ts b/src/subscriptions.ts index c2354404..160bfa1f 100644 --- a/src/subscriptions.ts +++ b/src/subscriptions.ts @@ -12,29 +12,10 @@ import type { Sofa } from './sofa.js'; import { getOperationInfo } from './ast.js'; import { parseVariable } from './parse.js'; import { logger } from './logger.js'; +import { ObjMap } from 'graphql/jsutils/ObjMap.js'; -function isAsyncIterable(obj: any): obj is AsyncIterable { - return typeof obj[Symbol.asyncIterator] === 'function'; -} - -// To start subscription: -// - an url that Sofa should trigger -// - name of a subscription -// - variables if needed -// - some sort of an auth token -// - Sofa should return a unique id of that subscription -// - respond with OK 200 - -// To stop subscription -// - an id is required -// - respond with OK 200 - -// To update subscription -// - an id is required -// - new set of variables - -export type ID = string; -export type SubscriptionFieldName = string; +type SubscriptionFieldName = string; +type ID = string; export interface StartSubscriptionEvent { subscription: SubscriptionFieldName; @@ -47,10 +28,6 @@ export interface UpdateSubscriptionEvent { variables: any; } -export interface StopSubscriptionResponse { - id: ID; -} - interface BuiltOperation { operationName: string; document: DocumentNode; @@ -58,115 +35,68 @@ interface BuiltOperation { } interface StoredClient { - name: SubscriptionFieldName; url: string; - iterator: AsyncIterator; + subscriptionName: SubscriptionFieldName; + rx: AsyncIterator; + timeoutHandle?: NodeJS.Timeout; } -export class SubscriptionManager { - private operations = new Map(); - private clients = new Map(); - - constructor(private sofa: Sofa) { - this.buildOperations(); - } - - public async start( - event: StartSubscriptionEvent, - contextValue: ContextValue - ) { - const id = crypto.randomUUID(); - const name = event.subscription; - - if (!this.operations.has(name)) { - throw new Error(`Subscription '${name}' is not available`); - } - - logger.info(`[Subscription] Start ${id}`, event); +function isAsyncIterable(obj: any): obj is AsyncIterable { + return typeof obj[Symbol.asyncIterator] === 'function'; +} - const result = await this.execute({ - id, - name, - url: event.url, - variables: event.variables, - contextValue, - }); +export function createSubscriptionManager(sofa: Sofa) { + const subscription = sofa.schema.getSubscriptionType(); - if (typeof result !== 'undefined') { - return result; - } - - return { id }; + if (!subscription) { + throw new Error('Schema does not have subscription type'); } - public async stop(id: ID): Promise { - logger.info(`[Subscription] Stop ${id}`); - - if (!this.clients.has(id)) { - throw new Error(`Subscription with ID '${id}' does not exist`); - } - - const execution = this.clients.get(id)!; - - if (execution.iterator.return) { - execution.iterator.return(); - } + const fieldMap = subscription.getFields(); + const operations = new Map(); + const clients = new Map(); + + for (const field in fieldMap) { + const operationNode = buildOperationNodeForField({ + kind: 'subscription' as OperationTypeNode, + field, + schema: sofa.schema, + models: sofa.models, + ignore: sofa.ignore, + circularReferenceDepth: sofa.depthLimit, + }); + const document: DocumentNode = { + kind: Kind.DOCUMENT, + definitions: [operationNode], + }; - this.clients.delete(id); + const { variables, name: operationName } = getOperationInfo(document)!; - return { id }; + operations.set(field, { + operationName, + document, + variables, + }); } - public async update( - event: UpdateSubscriptionEvent, + const readableStreamFromOperationCall = async ( + id: ID, + subscriptionName: SubscriptionFieldName, + event: StartSubscriptionEvent | UpdateSubscriptionEvent, contextValue: ContextValue - ) { - const { variables, id } = event; - - logger.info(`[Subscription] Update ${id}`, event); - - if (!this.clients.has(id)) { - throw new Error(`Subscription with ID '${id}' does not exist`); + ) => { + const operation = operations.get(subscriptionName); + if (!operation) { + throw new Error(`Subscription '${subscriptionName}' is not available`); } - const { name: subscription, url } = this.clients.get(id)!; - - this.stop(id); - - return this.start( - { - url, - subscription, - variables, - }, - contextValue - ); - } - - private async execute({ - id, - name, - url, - variables, - contextValue, - }: { - id: ID; - name: SubscriptionFieldName; - url: string; - variables: Record; - contextValue: ContextValue; - }) { - const { - document, - operationName, - variables: variableNodes, - } = this.operations.get(name)!; + logger.info(`[Subscription] Start ${id}`, event); - const variableValues = variableNodes.reduce((values, variable) => { + const variableValues = operation.variables.reduce((values, variable) => { const value = parseVariable({ - value: variables[variable.variable.name.value], + value: event.variables[variable.variable.name.value], variable, - schema: this.sofa.schema, + schema: sofa.schema, }); if (typeof value === 'undefined') { @@ -179,97 +109,173 @@ export class SubscriptionManager { }; }, {}); - const execution = await this.sofa.subscribe({ - schema: this.sofa.schema, - document, - operationName, + const subscriptionIterable = await sofa.subscribe({ + schema: sofa.schema, + document: operation.document, + operationName: operation.operationName, variableValues, contextValue, }); - if (isAsyncIterable(execution)) { - // successful - - // add execution to clients - this.clients.set(id, { - name, - url, - iterator: execution as any, - }); - - // success - (async () => { - for await (const result of execution) { - await this.sendData({ - id, - result, - }); - } - })().then( - () => { - // completes - this.clients.delete(id); - }, - (e) => { - logger.info(`Subscription #${id} closed`); - logger.error(e); - this.clients.delete(id); - } - ); - } else { - return execution as ExecutionResult; - } - } - - private async sendData({ id, result }: { id: ID; result: any }) { - if (!this.clients.has(id)) { - throw new Error(`Subscription with ID '${id}' does not exist`); + if (!isAsyncIterable(subscriptionIterable)) { + throw subscriptionIterable as ExecutionResult; } - const { url } = this.clients.get(id)!; - - logger.info(`[Subscription] Trigger ${id}`); + return subscriptionIterable; + }; + const sendMessage = async (message: any, url: string) => { const response = await fetch(url, { method: 'POST', - body: JSON.stringify(result), + body: JSON.stringify(message), headers: { 'Content-Type': 'application/json', }, }); - await response.text(); - } - - private buildOperations() { - const subscription = this.sofa.schema.getSubscriptionType(); - if (!subscription) { - return; + if (!response.ok) { + throw new Error( + `Failed to send data to ${url}: ${response.status} ${response.statusText}` + ); } - const fieldMap = subscription.getFields(); + response.body?.cancel(); // We don't care about the response body but want to free up resources + }; + + const startMessaging = (id: string, url: string, rx: AsyncIterable) => { + (async () => { + for await (const message of rx) { + try { + await sendMessage(message, url); + logger.debug(`[Subscription] Sent message to ${url}`, message); + } catch (error) { + logger.error( + `[Subscription] Error sending message to ${url}:`, + error + ); + stop(id, `Subscription stopped due to delivery error`); + break; + } + } + stop(id, 'Subscription completed gracefully'); + })(); + }; - for (const field in fieldMap) { - const operationNode = buildOperationNodeForField({ - kind: 'subscription' as OperationTypeNode, - field, - schema: this.sofa.schema, - models: this.sofa.models, - ignore: this.sofa.ignore, - circularReferenceDepth: this.sofa.depthLimit, - }); - const document: DocumentNode = { - kind: Kind.DOCUMENT, - definitions: [operationNode], + const start = async ( + event: StartSubscriptionEvent, + contextValue: ContextValue + ) => { + const id = crypto.randomUUID(); + const subscriptionName = event.subscription; + + const rx = await readableStreamFromOperationCall( + id, + subscriptionName, + event, + contextValue + ); + + startMessaging(id, event.url, rx); + + clients.set(id, { + url: event.url, + subscriptionName, + rx, + timeoutHandle: sofa.webhooks?.maxSubscriptionWebhookLifetimeSeconds + ? setTimeout(() => { + stop(id, 'Max subscription lifetime reached'); + }, sofa.webhooks?.maxSubscriptionWebhookLifetimeSeconds * 1000) + : undefined, + }); + + return { id }; + }; + + const stop = async ( + /** + * Subscription ID + */ + id: ID, + /** + * Reason for termination. Set to null to skip sending termination message. + */ + terminationReason?: string | null + ) => { + logger.info(`[Subscription] Stop ${id}`); + + const client = clients.get(id); + + if (!client) { + logger.warn( + `Subscription with ID '${id}' does not exist (${terminationReason}), might have been already stopped. Skipping stop.` + ); + return { id }; + } + + if (sofa.webhooks?.terminationMessage && terminationReason !== null) { + const termination = + typeof sofa.webhooks.terminationMessage === 'function' + ? sofa.webhooks.terminationMessage( + terminationReason || 'Subscription terminated' + ) + : { + reason: + typeof sofa.webhooks.terminationMessage === 'boolean' + ? terminationReason || 'Subscription terminated' + : sofa.webhooks.terminationMessage, + }; + + const terminationMessage: ExecutionResult< + ObjMap, + ObjMap + > = { + extensions: { + webhook: { + termination, + }, + }, }; + await sendMessage(terminationMessage, client.url); + } - const { variables, name: operationName } = getOperationInfo(document)!; + if (client.timeoutHandle) { + clearTimeout(client.timeoutHandle); + } - this.operations.set(field, { - operationName, - document, - variables, - }); + // this terminates the rx stream + if (client.rx.return) { + await client.rx.return(); } - } + // remove the client from the map + clients.delete(id); + + return { id }; + }; + + const update = async ( + event: UpdateSubscriptionEvent, + contextValue: ContextValue + ) => { + logger.info(`[Subscription] Update ${event.id}`, event); + const client = clients.get(event.id); + if (!client) { + throw new Error(`Subscription with ID '${event.id}' does not exist`); + } + + if (client.rx.return) { + await client.rx.return(); + } + + const rx = await readableStreamFromOperationCall( + event.id, + client.subscriptionName, + event, + contextValue + ); + + startMessaging(event.id, client.url, rx); + client.rx = rx; + return { id: event.id }; + }; + return { start, stop, update }; } diff --git a/tests/subscriptions.spec.ts b/tests/subscriptions.spec.ts index fd1e4377..50342e92 100644 --- a/tests/subscriptions.spec.ts +++ b/tests/subscriptions.spec.ts @@ -3,7 +3,14 @@ jest.mock('@whatwg-node/fetch', () => { return { ...original, fetch: jest.fn().mockResolvedValue({ - text: () => ({}), + ok: true, + status: 200, + statusText: 'OK', + json: async () => ({}), + text: async () => '', + body: { + cancel: () => undefined, + }, }), }; }); @@ -205,3 +212,97 @@ test('should start subscriptions with parameters', async () => { await delay(1000); expect(fetch).toHaveBeenCalledTimes(1); }); + +test('should send termination message ', async () => { + (fetch as jest.Mock).mockClear(); + + const pubsub = createPubSub(); + const sofa = useSofa({ + basePath: '/api', + schema: createSchema({ + typeDefs, + resolvers: { + Subscription: { + onBook: { + subscribe: () => pubsub.subscribe(BOOK_ADDED), + }, + }, + }, + }), + webhooks: { + terminationMessage: true, + maxSubscriptionWebhookLifetimeSeconds: 60, + }, + }); + + const res = await sofa.fetch('http://localhost:4000/api/webhook', { + method: 'POST', + body: JSON.stringify({ + subscription: 'onBook', + url: '/book', + }), + }); + expect(res.status).toBe(200); + const resBody = await res.json(); + pubsub.publish(BOOK_ADDED, { onBook: testBook1 }); + await delay(1000); + expect(fetch).toHaveBeenCalledTimes(1); + const deleteRes = await sofa.fetch( + `http://localhost:4000/api/webhook/${resBody.id}`, + { + method: 'DELETE', + } + ); + expect(deleteRes.status).toBe(200); + pubsub.publish(BOOK_ADDED, { onBook: testBook2 }); + await delay(1000); + expect(fetch).toHaveBeenCalledTimes(2); + expect(fetch).toHaveBeenLastCalledWith('/book', { + method: 'POST', + body: '{"extensions":{"webhook":{"termination":{"reason":"Subscription terminated"}}}}', + headers: { 'Content-Type': 'application/json' }, + }); +}); + +test('should send termination message ', async () => { + (fetch as jest.Mock).mockClear(); + + const pubsub = createPubSub(); + const sofa = useSofa({ + basePath: '/api', + schema: createSchema({ + typeDefs, + resolvers: { + Subscription: { + onBook: { + subscribe: () => pubsub.subscribe(BOOK_ADDED), + }, + }, + }, + }), + webhooks: { + terminationMessage: true, + maxSubscriptionWebhookLifetimeSeconds: 2, + }, + }); + + const res = await sofa.fetch('http://localhost:4000/api/webhook', { + method: 'POST', + body: JSON.stringify({ + subscription: 'onBook', + url: '/book', + }), + }); + expect(res.status).toBe(200); + const resBody = await res.json(); + pubsub.publish(BOOK_ADDED, { onBook: testBook1 }); + await delay(1000); + expect(fetch).toHaveBeenCalledTimes(1); + await delay(2000); + expect(fetch).toHaveBeenCalledTimes(2); + expect(fetch).toHaveBeenLastCalledWith('/book', { + method: 'POST', + body: '{"extensions":{"webhook":{"termination":{"reason":"Max subscription lifetime reached"}}}}', + headers: { 'Content-Type': 'application/json' }, + }); +}); \ No newline at end of file diff --git a/yarn.lock b/yarn.lock index ab3f16a1..ef58fcd4 100644 --- a/yarn.lock +++ b/yarn.lock @@ -2026,6 +2026,13 @@ resolved "https://registry.yarnpkg.com/@types/range-parser/-/range-parser-1.2.7.tgz#50ae4353eaaddc04044279812f52c8c65857dbcb" integrity sha512-hKormJbkJqzQGhziax5PItDUTMAM9uE2XXQmM37dyd4hVM+5aVl7oVxMVUiVQn2oCQFN/LKCZdvSM0pFRqbSmQ== +"@types/readable-stream@4.0.22": + version "4.0.22" + resolved "https://registry.yarnpkg.com/@types/readable-stream/-/readable-stream-4.0.22.tgz#882fda4f17b6580acb257df3f22ca69c05470b29" + integrity sha512-/FFhJpfCLAPwAcN3mFycNUa77ddnr8jTgF5VmSNetaemWB2cIlfCA9t0YTM3JAT0wOcv8D4tjPo7pkDhK3EJIg== + dependencies: + "@types/node" "*" + "@types/send@*": version "0.17.5" resolved "https://registry.yarnpkg.com/@types/send/-/send-0.17.5.tgz#d991d4f2b16f2b1ef497131f00a9114290791e74"