@@ -419,9 +419,10 @@ describeRepeats('PubSub with multiple clients', () => {
419419 const mainSub = await mainClient . subscribe ( {
420420 stream : stream . id ,
421421 } , ( msg , streamMessage ) => {
422- const msgs = receivedMessagesMain [ streamMessage . getPublisherId ( ) . toLowerCase ( ) ] || [ ]
422+ const key = streamMessage . getPublisherId ( ) . toLowerCase ( )
423+ const msgs = receivedMessagesMain [ key ] || [ ]
423424 msgs . push ( msg )
424- receivedMessagesMain [ streamMessage . getPublisherId ( ) . toLowerCase ( ) ] = msgs
425+ receivedMessagesMain [ key ] = msgs
425426 if ( Object . values ( receivedMessagesMain ) . every ( ( m ) => m . length === MAX_MESSAGES ) ) {
426427 mainSub . unsubscribe ( )
427428 }
@@ -469,9 +470,10 @@ describeRepeats('PubSub with multiple clients', () => {
469470 last : 1000 ,
470471 }
471472 } , ( msg , streamMessage ) => {
472- const msgs = receivedMessagesOther [ streamMessage . getPublisherId ( ) . toLowerCase ( ) ] || [ ]
473+ const key = streamMessage . getPublisherId ( ) . toLowerCase ( )
474+ const msgs = receivedMessagesOther [ key ] || [ ]
473475 msgs . push ( msg )
474- receivedMessagesOther [ streamMessage . getPublisherId ( ) . toLowerCase ( ) ] = msgs
476+ receivedMessagesOther [ key ] = msgs
475477 } )
476478
477479 addAfter ( async ( ) => {
@@ -499,215 +501,193 @@ describeRepeats('PubSub with multiple clients', () => {
499501 } catch ( err ) {
500502 return false
501503 }
502- } , 15000 ) . catch ( ( err ) => {
504+ } , 15000 , 300 ) . catch ( ( err ) => {
503505 // convert timeout to actual error
504506 checkMessages ( published , receivedMessagesMain )
505507 checkMessages ( published , receivedMessagesOther )
506508 throw err
507509 } )
508-
509- checkMessages ( published , receivedMessagesMain )
510- checkMessages ( published , receivedMessagesOther )
511510 } , 60000 )
512511 } )
513512
514513 test ( 'works with multiple publishers on one stream' , async ( ) => {
515- const onEnd = [ ]
516- async function createPublisher ( ) {
517- const pubClient = createClient ( {
518- auth : {
519- privateKey : fakePrivateKey ( ) ,
520- }
521- } )
522- onEnd . push ( ( ) => pubClient . disconnect ( ) )
523- pubClient . on ( 'error' , getOnError ( errors ) )
524- const pubUser = await pubClient . getUserInfo ( )
525- await stream . grantPermission ( 'stream_get' , pubUser . username )
526- await stream . grantPermission ( 'stream_publish' , pubUser . username )
527- // needed to check last
528- await stream . grantPermission ( 'stream_subscribe' , pubUser . username )
529- await pubClient . session . getSessionToken ( )
530- await pubClient . connect ( )
531- return pubClient
532- }
533-
534- try {
535- await mainClient . session . getSessionToken ( )
536- await mainClient . connect ( )
514+ await mainClient . session . getSessionToken ( )
515+ await mainClient . connect ( )
537516
538- otherClient = createClient ( {
539- auth : {
540- privateKey
541- }
542- } )
543- otherClient . on ( 'error' , getOnError ( errors ) )
544- await otherClient . session . getSessionToken ( )
545- const otherUser = await otherClient . getUserInfo ( )
546- await stream . grantPermission ( 'stream_get' , otherUser . username )
547- await stream . grantPermission ( 'stream_subscribe' , otherUser . username )
548- await otherClient . connect ( )
517+ otherClient = createClient ( {
518+ auth : {
519+ privateKey
520+ }
521+ } )
522+ otherClient . on ( 'error' , getOnError ( errors ) )
523+ await otherClient . session . getSessionToken ( )
524+ const otherUser = await otherClient . getUserInfo ( )
525+ await stream . grantPermission ( 'stream_get' , otherUser . username )
526+ await stream . grantPermission ( 'stream_subscribe' , otherUser . username )
527+ await otherClient . connect ( )
549528
550- const receivedMessagesOther = { }
551- const receivedMessagesMain = { }
552- // subscribe to stream from other client instance
553- await otherClient . subscribe ( {
554- stream : stream . id ,
555- } , ( msg , streamMessage ) => {
556- const msgs = receivedMessagesOther [ streamMessage . getPublisherId ( ) ] || [ ]
557- msgs . push ( msg )
558- receivedMessagesOther [ streamMessage . getPublisherId ( ) ] = msgs
559- } )
529+ const receivedMessagesOther = { }
530+ const receivedMessagesMain = { }
531+ // subscribe to stream from other client instance
532+ await otherClient . subscribe ( {
533+ stream : stream . id ,
534+ } , ( msg , streamMessage ) => {
535+ const key = streamMessage . getPublisherId ( ) . toLowerCase ( )
536+ const msgs = receivedMessagesOther [ key ] || [ ]
537+ msgs . push ( msg )
538+ receivedMessagesOther [ key ] = msgs
539+ } )
560540
561- // subscribe to stream from main client instance
562- await mainClient . subscribe ( {
563- stream : stream . id ,
564- } , ( msg , streamMessage ) => {
565- const msgs = receivedMessagesMain [ streamMessage . getPublisherId ( ) ] || [ ]
566- msgs . push ( msg )
567- receivedMessagesMain [ streamMessage . getPublisherId ( ) ] = msgs
568- } )
541+ // subscribe to stream from main client instance
542+ await mainClient . subscribe ( {
543+ stream : stream . id ,
544+ } , ( msg , streamMessage ) => {
545+ const key = streamMessage . getPublisherId ( ) . toLowerCase ( )
546+ const msgs = receivedMessagesMain [ key ] || [ ]
547+ msgs . push ( msg )
548+ receivedMessagesMain [ key ] = msgs
549+ } )
569550
570- /* eslint-disable no-await-in-loop */
571- const publishers = [ ]
572- for ( let i = 0 ; i < 3 ; i ++ ) {
573- publishers . push ( await createPublisher ( ) )
574- }
575- /* eslint-enable no-await-in-loop */
576- const published = { }
577- await Promise . all ( publishers . map ( async ( pubClient ) => {
578- const publisherId = ( await pubClient . getPublisherId ( ) ) . toLowerCase ( )
579- const publishTestMessages = getPublishTestMessages ( pubClient , {
580- stream,
581- waitForLast : true ,
582- } )
583- await publishTestMessages ( 10 , {
584- delay : 500 + Math . random ( ) * 1500 ,
585- afterEach ( msg ) {
586- published [ publisherId ] = published [ publisherId ] || [ ]
587- published [ publisherId ] . push ( msg )
588- }
589- } )
590- } ) )
551+ /* eslint-disable no-await-in-loop */
552+ const publishers = [ ]
553+ for ( let i = 0 ; i < 3 ; i ++ ) {
554+ publishers . push ( await createPublisher ( ) )
555+ }
591556
592- // eslint-disable-next-line no-inner-declarations
593- function checkMessages ( received ) {
594- for ( const [ key , msgs ] of Object . entries ( published ) ) {
595- expect ( received [ key ] ) . toEqual ( msgs )
557+ /* eslint-enable no-await-in-loop */
558+ const published = { }
559+ await Promise . all ( publishers . map ( async ( pubClient ) => {
560+ const publisherId = ( await pubClient . getPublisherId ( ) ) . toLowerCase ( )
561+ const publishTestMessages = getPublishTestMessages ( pubClient , {
562+ stream,
563+ waitForLast : true ,
564+ } )
565+ await publishTestMessages ( 10 , {
566+ delay : 500 + Math . random ( ) * 1500 ,
567+ afterEach ( msg ) {
568+ published [ publisherId ] = published [ publisherId ] || [ ]
569+ published [ publisherId ] . push ( msg )
596570 }
571+ } )
572+ } ) )
573+
574+ await waitForCondition ( ( ) => {
575+ try {
576+ checkMessages ( published , receivedMessagesMain )
577+ checkMessages ( published , receivedMessagesOther )
578+ return true
579+ } catch ( err ) {
580+ return false
597581 }
582+ } , 5000 ) . catch ( ( ) => {
583+ checkMessages ( published , receivedMessagesMain )
584+ checkMessages ( published , receivedMessagesOther )
585+ } )
598586
599- checkMessages ( receivedMessagesMain )
600- checkMessages ( receivedMessagesOther )
601- } finally {
602- await Promise . all ( onEnd . map ( ( fn ) => fn ( ) ) )
603- }
604587 } , 40000 )
605588
606589 test ( 'works with multiple publishers on one stream with late subscriber' , async ( ) => {
607- const onEnd = [ ]
608- async function createPublisher ( ) {
609- const pubClient = createClient ( {
610- auth : {
611- privateKey : fakePrivateKey ( ) ,
612- }
613- } )
614- onEnd . push ( ( ) => pubClient . disconnect ( ) )
615- pubClient . on ( 'error' , getOnError ( errors ) )
616- const pubUser = await pubClient . getUserInfo ( )
617- await stream . grantPermission ( 'stream_get' , pubUser . username )
618- await stream . grantPermission ( 'stream_publish' , pubUser . username )
619- // needed to check last
620- await stream . grantPermission ( 'stream_subscribe' , pubUser . username )
621- await pubClient . session . getSessionToken ( )
622- await pubClient . connect ( )
623- return pubClient
624- }
625-
626590 const published = { }
627- function checkMessages ( received ) {
628- for ( const [ key , msgs ] of Object . entries ( published ) ) {
629- expect ( received [ key ] ) . toEqual ( msgs )
630- }
631- }
591+ await mainClient . session . getSessionToken ( )
592+ await mainClient . connect ( )
632593
633- try {
634- await mainClient . session . getSessionToken ( )
635- await mainClient . connect ( )
594+ otherClient = createClient ( {
595+ auth : {
596+ privateKey
597+ }
598+ } )
599+ otherClient . on ( 'error' , getOnError ( errors ) )
600+ await otherClient . session . getSessionToken ( )
601+ const otherUser = await otherClient . getUserInfo ( )
602+ await stream . grantPermission ( 'stream_get' , otherUser . username )
603+ await stream . grantPermission ( 'stream_subscribe' , otherUser . username )
604+ await otherClient . connect ( )
636605
637- otherClient = createClient ( {
638- auth : {
639- privateKey
640- }
641- } )
642- otherClient . on ( 'error' , getOnError ( errors ) )
643- await otherClient . session . getSessionToken ( )
644- const otherUser = await otherClient . getUserInfo ( )
645- await stream . grantPermission ( 'stream_get' , otherUser . username )
646- await stream . grantPermission ( 'stream_subscribe' , otherUser . username )
647- await otherClient . connect ( )
606+ const receivedMessagesOther = { }
607+ const receivedMessagesMain = { }
608+
609+ // subscribe to stream from main client instance
610+ const mainSub = await mainClient . subscribe ( {
611+ stream : stream . id ,
612+ } , ( msg , streamMessage ) => {
613+ const key = streamMessage . getPublisherId ( ) . toLowerCase ( )
614+ const msgs = receivedMessagesMain [ key ] || [ ]
615+ msgs . push ( msg )
616+ receivedMessagesMain [ key ] = msgs
617+ if ( Object . values ( receivedMessagesMain ) . every ( ( m ) => m . length === MAX_MESSAGES ) ) {
618+ mainSub . cancel ( )
619+ }
620+ } )
648621
649- const receivedMessagesOther = { }
650- const receivedMessagesMain = { }
622+ /* eslint-disable no-await-in-loop */
623+ const publishers = [ ]
624+ for ( let i = 0 ; i < 3 ; i ++ ) {
625+ publishers . push ( await createPublisher ( ) )
626+ }
651627
652- // subscribe to stream from main client instance
653- const mainSub = await mainClient . subscribe ( {
654- stream : stream . id ,
655- } , ( msg , streamMessage ) => {
656- const msgs = receivedMessagesMain [ streamMessage . getPublisherId ( ) ] || [ ]
657- msgs . push ( msg )
658- receivedMessagesMain [ streamMessage . getPublisherId ( ) ] = msgs
659- if ( Object . values ( receivedMessagesMain ) . every ( ( m ) => m . length === MAX_MESSAGES ) ) {
660- mainSub . cancel ( )
661- }
628+ let counter = 0
629+ /* eslint-enable no-await-in-loop */
630+ await Promise . all ( publishers . map ( async ( pubClient ) => {
631+ const waitForStorage = getWaitForStorage ( pubClient , {
632+ stream,
633+ timeout : 10000 ,
634+ count : MAX_MESSAGES * publishers . length ,
662635 } )
663636
664- /* eslint-disable no-await-in-loop */
665- const publishers = [ ]
666- for ( let i = 0 ; i < 3 ; i ++ ) {
667- publishers . push ( await createPublisher ( ) )
668- }
637+ const publisherId = ( await pubClient . getPublisherId ( ) ) . toString ( ) . toLowerCase ( )
638+ const publishTestMessages = getPublishTestMessages ( pubClient , {
639+ stream,
640+ waitForLast : true ,
641+ waitForLastTimeout : 10000 ,
642+ waitForLastCount : MAX_MESSAGES * publishers . length ,
643+ delay : 500 + Math . random ( ) * 1500 ,
644+ } )
669645
670- let counter = 0
671- /* eslint-enable no-await-in-loop */
672- await Promise . all ( publishers . map ( async ( pubClient ) => {
673- const publisherId = ( await pubClient . getPublisherId ( ) ) . toString ( )
674- const publishTestMessages = getPublishTestMessages ( pubClient , {
675- stream,
676- waitForLast : true ,
677- } )
678- await publishTestMessages ( MAX_MESSAGES , {
679- delay : 500 + Math . random ( ) * 1500 ,
680- async afterEach ( pubMsg ) {
681- published [ publisherId ] = published [ publisherId ] || [ ]
682- published [ publisherId ] . push ( pubMsg )
683- counter += 1
684- if ( counter === 3 ) {
685- // late subscribe to stream from other client instance
686- const otherSub = await otherClient . subscribe ( {
687- stream : stream . id ,
688- resend : {
689- last : 1000 ,
690- }
691- } , async ( msg , streamMessage ) => {
692- const msgs = receivedMessagesOther [ streamMessage . getPublisherId ( ) ] || [ ]
693- msgs . push ( msg )
694- receivedMessagesOther [ streamMessage . getPublisherId ( ) ] = msgs
695- if ( msgs . length === MAX_MESSAGES ) {
696- await otherSub . cancel ( )
697- }
698- } )
699- }
646+ async function addLateSubscriber ( ) {
647+ // late subscribe to stream from other client instance
648+ const lateSub = await otherClient . subscribe ( {
649+ stream : stream . id ,
650+ resend : {
651+ last : 1000 ,
700652 }
653+ } , ( msg , streamMessage ) => {
654+ const key = streamMessage . getPublisherId ( ) . toLowerCase ( )
655+ const msgs = receivedMessagesOther [ key ] || [ ]
656+ msgs . push ( msg )
657+ receivedMessagesOther [ key ] = msgs
701658 } )
702- } ) )
703659
704- await wait ( 15000 )
660+ addAfter ( async ( ) => {
661+ await lateSub . unsubscribe ( )
662+ } )
663+ }
705664
706- checkMessages ( receivedMessagesMain )
707- checkMessages ( receivedMessagesOther )
708- } finally {
709- await Promise . all ( onEnd . map ( ( fn ) => fn ( ) ) )
710- }
665+ await publishTestMessages ( MAX_MESSAGES , {
666+ async afterEach ( pubMsg , req ) {
667+ published [ publisherId ] = published [ publisherId ] || [ ]
668+ published [ publisherId ] . push ( pubMsg )
669+ counter += 1
670+ if ( counter === 3 ) {
671+ await waitForStorage ( req ) // make sure lastest message has hit storage
672+ // late subscribe to stream from other client instance
673+ await addLateSubscriber ( )
674+ }
675+ }
676+ } )
677+ } ) )
678+
679+ await waitForCondition ( ( ) => {
680+ try {
681+ checkMessages ( published , receivedMessagesMain )
682+ checkMessages ( published , receivedMessagesOther )
683+ return true
684+ } catch ( err ) {
685+ return false
686+ }
687+ } , 15000 , 300 ) . catch ( ( ) => {
688+ checkMessages ( published , receivedMessagesMain )
689+ checkMessages ( published , receivedMessagesOther )
690+ } )
711691 } , 60000 )
712692
713693 test ( 'disconnecting one client does not disconnect the other' , async ( ) => {
0 commit comments