diff --git a/src/adapters/node-http/core.ts b/src/adapters/node-http/core.ts index 060c8d2a..618c9b25 100644 --- a/src/adapters/node-http/core.ts +++ b/src/adapters/node-http/core.ts @@ -64,13 +64,46 @@ export const createOpenApiNodeHttpHandler = < body: OpenApiResponse | undefined, ) => { res.statusCode = statusCode; - res.setHeader('Content-Type', 'application/json'); - for (const [key, value] of Object.entries(headers)) { - if (typeof value !== 'undefined') { - res.setHeader(key, value); + + // Support sending SSE streams + if (body && typeof body.getReader === 'function') { + const reader = body.getReader(); + + res.setHeader('Connection', 'keep-alive'); + res.setHeader('Content-Type', 'text/event-stream'); + res.setHeader('Cache-Control', 'no-cache'); + res.flushHeaders(); + + const processStream = async (reader: ReadableStreamDefaultReader, res: TResponse) => { + try { + let done, value; + do { + // eslint-disable-next-line @typescript-eslint/no-unsafe-assignment + ({ done, value } = await reader.read()); + if (!done) { + res.write(`data: ${Buffer.from(value.buffer).toString()}\n\n`); + // @ts-expect-error flush is not in the types + res.flush(); + } + } while (!done); + } catch (error) { + console.error('Error while reading from stream', error); + } finally { + reader.releaseLock(); + res.end(); + } + }; + + void processStream(reader, res); + } else { + res.setHeader('Content-Type', 'application/json'); + for (const [key, value] of Object.entries(headers)) { + if (typeof value !== 'undefined') { + res.setHeader(key, value); + } } + res.end(JSON.stringify(body)); } - res.end(JSON.stringify(body)); }; const method = req.method! as OpenApiMethod & 'HEAD';