Skip to content
This repository was archived by the owner on Dec 21, 2021. It is now read-only.

Commit c4cfb2d

Browse files
committed
Add failing multiple publishers + late subscriber test.
1 parent da847fe commit c4cfb2d

File tree

1 file changed

+215
-0
lines changed

1 file changed

+215
-0
lines changed

test/integration/MultipleClients.test.js

Lines changed: 215 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -512,6 +512,221 @@ describeRepeats('PubSub with multiple clients', () => {
512512
}, 60000)
513513
})
514514

515+
test('works with multiple publishers on one stream', async () => {
516+
const onEnd = []
517+
async function createPublisher() {
518+
const pubClient = createClient({
519+
auth: {
520+
privateKey: fakePrivateKey(),
521+
}
522+
})
523+
onEnd.push(() => pubClient.disconnect())
524+
pubClient.on('error', getOnError(errors))
525+
const pubUser = await pubClient.getUserInfo()
526+
await stream.grantPermission('stream_get', pubUser.username)
527+
await stream.grantPermission('stream_publish', pubUser.username)
528+
// needed to check last
529+
await stream.grantPermission('stream_subscribe', pubUser.username)
530+
await pubClient.session.getSessionToken()
531+
await pubClient.connect()
532+
return pubClient
533+
}
534+
535+
try {
536+
await mainClient.session.getSessionToken()
537+
await mainClient.connect()
538+
539+
otherClient = createClient({
540+
auth: {
541+
privateKey
542+
}
543+
})
544+
otherClient.on('error', getOnError(errors))
545+
await otherClient.session.getSessionToken()
546+
const otherUser = await otherClient.getUserInfo()
547+
await stream.grantPermission('stream_get', otherUser.username)
548+
await stream.grantPermission('stream_subscribe', otherUser.username)
549+
await otherClient.connect()
550+
551+
const receivedMessagesOther = {}
552+
const receivedMessagesMain = {}
553+
// subscribe to stream from other client instance
554+
await otherClient.subscribe({
555+
stream: stream.id,
556+
}, (msg, streamMessage) => {
557+
const msgs = receivedMessagesOther[streamMessage.getPublisherId()] || []
558+
msgs.push(msg)
559+
receivedMessagesOther[streamMessage.getPublisherId()] = msgs
560+
})
561+
562+
// subscribe to stream from main client instance
563+
await mainClient.subscribe({
564+
stream: stream.id,
565+
}, (msg, streamMessage) => {
566+
const msgs = receivedMessagesMain[streamMessage.getPublisherId()] || []
567+
msgs.push(msg)
568+
receivedMessagesMain[streamMessage.getPublisherId()] = msgs
569+
})
570+
571+
/* eslint-disable no-await-in-loop */
572+
const publishers = []
573+
for (let i = 0; i < 3; i++) {
574+
publishers.push(await createPublisher())
575+
}
576+
/* eslint-enable no-await-in-loop */
577+
const published = {}
578+
await Promise.all(publishers.map(async (pubClient) => {
579+
const publisherId = await pubClient.getPublisherId()
580+
const publishTestMessages = getPublishTestMessages(pubClient, {
581+
stream,
582+
waitForLast: true,
583+
})
584+
await publishTestMessages(10, {
585+
delay: 500 + Math.random() * 1500,
586+
afterEach(msg) {
587+
published[publisherId] = published[publisherId] || []
588+
published[publisherId].push(msg)
589+
}
590+
})
591+
}))
592+
593+
await wait(5000)
594+
595+
mainClient.debug('%j', {
596+
published,
597+
receivedMessagesMain,
598+
receivedMessagesOther,
599+
})
600+
601+
// eslint-disable-next-line no-inner-declarations
602+
function checkMessages(received) {
603+
for (const [key, msgs] of Object.entries(published)) {
604+
expect(received[key]).toEqual(msgs)
605+
}
606+
}
607+
608+
checkMessages(receivedMessagesMain)
609+
checkMessages(receivedMessagesOther)
610+
} finally {
611+
await Promise.all(onEnd.map((fn) => fn()))
612+
}
613+
}, 40000)
614+
615+
test('works with multiple publishers on one stream with late subscriber', async () => {
616+
const onEnd = []
617+
async function createPublisher() {
618+
const pubClient = createClient({
619+
auth: {
620+
privateKey: fakePrivateKey(),
621+
}
622+
})
623+
onEnd.push(() => pubClient.disconnect())
624+
pubClient.on('error', getOnError(errors))
625+
const pubUser = await pubClient.getUserInfo()
626+
await stream.grantPermission('stream_get', pubUser.username)
627+
await stream.grantPermission('stream_publish', pubUser.username)
628+
// needed to check last
629+
await stream.grantPermission('stream_subscribe', pubUser.username)
630+
await pubClient.session.getSessionToken()
631+
await pubClient.connect()
632+
return pubClient
633+
}
634+
635+
const published = {}
636+
function checkMessages(received) {
637+
for (const [key, msgs] of Object.entries(published)) {
638+
expect(received[key]).toEqual(msgs)
639+
}
640+
}
641+
642+
const MAX_MESSAGES = 10
643+
644+
try {
645+
await mainClient.session.getSessionToken()
646+
await mainClient.connect()
647+
648+
otherClient = createClient({
649+
auth: {
650+
privateKey
651+
}
652+
})
653+
otherClient.on('error', getOnError(errors))
654+
await otherClient.session.getSessionToken()
655+
const otherUser = await otherClient.getUserInfo()
656+
await stream.grantPermission('stream_get', otherUser.username)
657+
await stream.grantPermission('stream_subscribe', otherUser.username)
658+
await otherClient.connect()
659+
660+
const receivedMessagesOther = {}
661+
const receivedMessagesMain = {}
662+
663+
// subscribe to stream from main client instance
664+
const mainSub = await mainClient.subscribe({
665+
stream: stream.id,
666+
}, (msg, streamMessage) => {
667+
const msgs = receivedMessagesMain[streamMessage.getPublisherId()] || []
668+
msgs.push(msg)
669+
receivedMessagesMain[streamMessage.getPublisherId()] = msgs
670+
if (Object.values(receivedMessagesMain).every((m) => m.length === MAX_MESSAGES)) {
671+
mainSub.end()
672+
}
673+
})
674+
675+
/* eslint-disable no-await-in-loop */
676+
const publishers = []
677+
for (let i = 0; i < 3; i++) {
678+
publishers.push(await createPublisher())
679+
}
680+
681+
let counter = 0
682+
/* eslint-enable no-await-in-loop */
683+
await Promise.all(publishers.map(async (pubClient) => {
684+
const publisherId = await pubClient.getPublisherId()
685+
const publishTestMessages = getPublishTestMessages(pubClient, {
686+
stream,
687+
waitForLast: true,
688+
})
689+
await publishTestMessages(MAX_MESSAGES, {
690+
delay: 500 + Math.random() * 1500,
691+
async afterEach(pubMsg) {
692+
published[publisherId] = published[publisherId] || []
693+
published[publisherId].push(pubMsg)
694+
counter += 1
695+
if (counter === 3) {
696+
// late subscribe to stream from other client instance
697+
const otherSub = await otherClient.subscribe({
698+
stream: stream.id,
699+
resend: {
700+
last: 1000,
701+
}
702+
}, (msg, streamMessage) => {
703+
const msgs = receivedMessagesOther[streamMessage.getPublisherId()] || []
704+
msgs.push(msg)
705+
receivedMessagesOther[streamMessage.getPublisherId()] = msgs
706+
if (msgs.length === MAX_MESSAGES) {
707+
return otherSub.end()
708+
}
709+
})
710+
}
711+
}
712+
})
713+
}))
714+
715+
await wait(15000)
716+
717+
mainClient.debug('%j', {
718+
published,
719+
receivedMessagesMain,
720+
receivedMessagesOther,
721+
})
722+
723+
checkMessages(receivedMessagesMain)
724+
checkMessages(receivedMessagesOther)
725+
} finally {
726+
await Promise.all(onEnd.map((fn) => fn()))
727+
}
728+
}, 60000)
729+
515730
test('disconnecting one client does not disconnect the other', async () => {
516731
otherClient = createClient({
517732
id: 'other',

0 commit comments

Comments
 (0)