Skip to content

Commit 4d7ebbd

Browse files
authored
feat(realtime): realtime explicit REST call (#1751)
1 parent 188fa17 commit 4d7ebbd

File tree

2 files changed

+279
-0
lines changed

2 files changed

+279
-0
lines changed

packages/core/realtime-js/src/RealtimeChannel.ts

Lines changed: 68 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -435,6 +435,68 @@ export default class RealtimeChannel {
435435
}
436436
return this._on(type, filter, callback)
437437
}
438+
/**
439+
* Sends a broadcast message explicitly via REST API.
440+
*
441+
* This method always uses the REST API endpoint regardless of WebSocket connection state.
442+
* Useful when you want to guarantee REST delivery or when gradually migrating from implicit REST fallback.
443+
*
444+
* @param event The name of the broadcast event
445+
* @param payload Payload to be sent (required)
446+
* @param opts Options including timeout
447+
* @returns Promise resolving to object with success status, and error details if failed
448+
*/
449+
async httpSend(
450+
event: string,
451+
payload: any,
452+
opts: { timeout?: number } = {}
453+
): Promise<{ success: true } | { success: false; status: number; error: string }> {
454+
const authorization = this.socket.accessTokenValue
455+
? `Bearer ${this.socket.accessTokenValue}`
456+
: ''
457+
458+
if (payload === undefined || payload === null) {
459+
return Promise.reject('Payload is required for httpSend()')
460+
}
461+
462+
const options = {
463+
method: 'POST',
464+
headers: {
465+
Authorization: authorization,
466+
apikey: this.socket.apiKey ? this.socket.apiKey : '',
467+
'Content-Type': 'application/json',
468+
},
469+
body: JSON.stringify({
470+
messages: [
471+
{
472+
topic: this.subTopic,
473+
event,
474+
payload: payload,
475+
private: this.private,
476+
},
477+
],
478+
}),
479+
}
480+
481+
const response = await this._fetchWithTimeout(
482+
this.broadcastEndpointURL,
483+
options,
484+
opts.timeout ?? this.timeout
485+
)
486+
487+
if (response.status === 202) {
488+
return { success: true }
489+
}
490+
491+
let errorMessage = response.statusText
492+
try {
493+
const errorBody = await response.json()
494+
errorMessage = errorBody.error || errorBody.message || errorMessage
495+
} catch {}
496+
497+
return Promise.reject(new Error(errorMessage))
498+
}
499+
438500
/**
439501
* Sends a message into the channel.
440502
*
@@ -454,6 +516,12 @@ export default class RealtimeChannel {
454516
opts: { [key: string]: any } = {}
455517
): Promise<RealtimeChannelSendResponse> {
456518
if (!this._canPush() && args.type === 'broadcast') {
519+
console.warn(
520+
'Realtime send() is automatically falling back to REST API. ' +
521+
'This behavior will be deprecated in the future. ' +
522+
'Please use httpSend() explicitly for REST delivery.'
523+
)
524+
457525
const { event, payload: endpoint_payload } = args
458526
const authorization = this.socket.accessTokenValue
459527
? `Bearer ${this.socket.accessTokenValue}`

packages/core/realtime-js/test/RealtimeChannel.messaging.test.ts

Lines changed: 211 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -473,3 +473,214 @@ describe('send', () => {
473473
})
474474
})
475475
})
476+
477+
describe('httpSend', () => {
478+
const createMockResponse = (status: number, statusText?: string, body?: any) => ({
479+
status,
480+
statusText: statusText || 'OK',
481+
headers: new Headers(),
482+
body: null,
483+
json: vi.fn().mockResolvedValue(body || {}),
484+
})
485+
486+
const createSocket = (hasToken = false, fetchMock?: any) => {
487+
const config: any = {
488+
fetch: fetchMock,
489+
params: { apikey: 'abc123' },
490+
}
491+
if (hasToken) {
492+
config.accessToken = () => Promise.resolve('token123')
493+
}
494+
return new RealtimeClient(testSetup.url, config)
495+
}
496+
497+
const testCases = [
498+
{
499+
name: 'without access token',
500+
hasToken: false,
501+
expectedAuth: '',
502+
},
503+
{
504+
name: 'with access token',
505+
hasToken: true,
506+
expectedAuth: 'Bearer token123',
507+
},
508+
]
509+
510+
testCases.forEach(({ name, hasToken, expectedAuth }) => {
511+
describe(name, () => {
512+
test('sends with correct Authorization header', async () => {
513+
const mockResponse = createMockResponse(202)
514+
const fetchStub = vi.fn().mockResolvedValue(mockResponse)
515+
const socket = createSocket(hasToken, fetchStub)
516+
if (hasToken) await socket.setAuth()
517+
const channel = socket.channel('topic')
518+
519+
const result = await channel.httpSend('test', { data: 'test' })
520+
521+
expect(result).toEqual({ success: true })
522+
expect(fetchStub).toHaveBeenCalledTimes(1)
523+
const [, options] = fetchStub.mock.calls[0]
524+
expect(options.headers.Authorization).toBe(expectedAuth)
525+
expect(options.headers.apikey).toBe('abc123')
526+
})
527+
528+
test('rejects when payload is not provided', async () => {
529+
const socket = createSocket(hasToken)
530+
if (hasToken) await socket.setAuth()
531+
const channel = socket.channel('topic')
532+
533+
await expect(channel.httpSend('test', undefined as any)).rejects.toBe(
534+
'Payload is required for httpSend()'
535+
)
536+
})
537+
538+
test('rejects when payload is null', async () => {
539+
const socket = createSocket(hasToken)
540+
if (hasToken) await socket.setAuth()
541+
const channel = socket.channel('topic')
542+
543+
await expect(channel.httpSend('test', null as any)).rejects.toBe(
544+
'Payload is required for httpSend()'
545+
)
546+
})
547+
548+
test('handles timeout error', async () => {
549+
const timeoutError = new Error('Request timeout')
550+
timeoutError.name = 'AbortError'
551+
const fetchStub = vi.fn().mockRejectedValue(timeoutError)
552+
const socket = createSocket(hasToken, fetchStub)
553+
if (hasToken) await socket.setAuth()
554+
const channel = socket.channel('topic')
555+
556+
await expect(channel.httpSend('test', { data: 'test' })).rejects.toThrow('Request timeout')
557+
})
558+
559+
test('handles non-202 status', async () => {
560+
const mockResponse = createMockResponse(500, 'Internal Server Error', {
561+
error: 'Server error',
562+
})
563+
const fetchStub = vi.fn().mockResolvedValue(mockResponse)
564+
const socket = createSocket(hasToken, fetchStub)
565+
if (hasToken) await socket.setAuth()
566+
const channel = socket.channel('topic')
567+
568+
await expect(channel.httpSend('test', { data: 'test' })).rejects.toThrow('Server error')
569+
})
570+
571+
test('respects custom timeout option', async () => {
572+
const mockResponse = createMockResponse(202)
573+
const fetchStub = vi.fn().mockResolvedValue(mockResponse)
574+
const socket = createSocket(hasToken, fetchStub)
575+
if (hasToken) await socket.setAuth()
576+
const channel = socket.channel('topic')
577+
578+
const result = await channel.httpSend('test', { data: 'test' }, { timeout: 3000 })
579+
580+
expect(result).toEqual({ success: true })
581+
expect(fetchStub).toHaveBeenCalledTimes(1)
582+
const [, options] = fetchStub.mock.calls[0]
583+
expect(options.headers.Authorization).toBe(expectedAuth)
584+
})
585+
586+
test('sends correct payload', async () => {
587+
const mockResponse = createMockResponse(202)
588+
const fetchStub = vi.fn().mockResolvedValue(mockResponse)
589+
const socket = createSocket(hasToken, fetchStub)
590+
if (hasToken) await socket.setAuth()
591+
const channel = socket.channel('topic')
592+
593+
const result = await channel.httpSend('test-payload', { data: 'value' })
594+
595+
expect(result).toEqual({ success: true })
596+
expect(fetchStub).toHaveBeenCalledTimes(1)
597+
const [, options] = fetchStub.mock.calls[0]
598+
expect(options.headers.Authorization).toBe(expectedAuth)
599+
expect(options.body).toBe(
600+
'{"messages":[{"topic":"topic","event":"test-payload","payload":{"data":"value"},"private":false}]}'
601+
)
602+
})
603+
})
604+
})
605+
606+
describe('with access token - additional scenarios', () => {
607+
test('returns success true on 202 status with private channel', async () => {
608+
const mockResponse = createMockResponse(202, 'Accepted')
609+
const fetchStub = vi.fn().mockResolvedValue(mockResponse)
610+
const socket = createSocket(true, fetchStub)
611+
await socket.setAuth()
612+
const channel = socket.channel('topic', { config: { private: true } })
613+
614+
const result = await channel.httpSend('test-explicit', { data: 'explicit' })
615+
616+
expect(result).toEqual({ success: true })
617+
const expectedUrl = testSetup.url
618+
.replace('/socket', '')
619+
.replace('wss', 'https')
620+
.concat('/api/broadcast')
621+
expect(fetchStub).toHaveBeenCalledTimes(1)
622+
const [url, options] = fetchStub.mock.calls[0]
623+
expect(url).toBe(expectedUrl)
624+
expect(options.method).toBe('POST')
625+
expect(options.headers.Authorization).toBe('Bearer token123')
626+
expect(options.headers.apikey).toBe('abc123')
627+
expect(options.body).toBe(
628+
'{"messages":[{"topic":"topic","event":"test-explicit","payload":{"data":"explicit"},"private":true}]}'
629+
)
630+
})
631+
632+
test('uses default timeout when not specified', async () => {
633+
const mockResponse = createMockResponse(202)
634+
const fetchStub = vi.fn().mockResolvedValue(mockResponse)
635+
const socket = new RealtimeClient(testSetup.url, {
636+
fetch: fetchStub,
637+
timeout: 5000,
638+
params: { apikey: 'abc123' },
639+
accessToken: () => Promise.resolve('token123'),
640+
})
641+
await socket.setAuth()
642+
const channel = socket.channel('topic')
643+
644+
const result = await channel.httpSend('test', { data: 'test' })
645+
646+
expect(result).toEqual({ success: true })
647+
expect(fetchStub).toHaveBeenCalledTimes(1)
648+
const [, options] = fetchStub.mock.calls[0]
649+
expect(options.signal).toBeDefined()
650+
})
651+
652+
test('uses statusText when error body has no error field', async () => {
653+
const mockResponse = {
654+
status: 400,
655+
statusText: 'Bad Request',
656+
headers: new Headers(),
657+
body: null,
658+
json: vi.fn().mockResolvedValue({ message: 'Invalid request' }),
659+
}
660+
const fetchStub = vi.fn().mockResolvedValue(mockResponse)
661+
const socket = createSocket(true, fetchStub)
662+
await socket.setAuth()
663+
const channel = socket.channel('topic')
664+
665+
await expect(channel.httpSend('test', { data: 'test' })).rejects.toThrow('Invalid request')
666+
})
667+
668+
test('falls back to statusText when json parsing fails', async () => {
669+
const mockResponse = {
670+
status: 503,
671+
statusText: 'Service Unavailable',
672+
headers: new Headers(),
673+
body: null,
674+
json: vi.fn().mockRejectedValue(new Error('Invalid JSON')),
675+
}
676+
const fetchStub = vi.fn().mockResolvedValue(mockResponse)
677+
const socket = createSocket(true, fetchStub)
678+
await socket.setAuth()
679+
const channel = socket.channel('topic')
680+
681+
await expect(channel.httpSend('test', { data: 'test' })).rejects.toThrow(
682+
'Service Unavailable'
683+
)
684+
})
685+
})
686+
})

0 commit comments

Comments
 (0)