@@ -65,7 +65,7 @@ function GroupKeyStore({ groupKeys = new Map() }) {
6565 } )
6666
6767 let currentGroupKeyId // current key id if any
68- let nextGroupKey // key to use next, disappears if not actually used.
68+ const nextGroupKeys = [ ] // the keys to use next, disappears if not actually used. Max queue size 2
6969
7070 store . forEach ( ( groupKey ) => {
7171 GroupKey . validate ( GroupKey . from ( groupKey ) )
@@ -79,7 +79,8 @@ function GroupKeyStore({ groupKeys = new Map() }) {
7979 const existingKey = GroupKey . from ( store . get ( groupKey . id ) )
8080 if ( ! existingKey . equals ( groupKey ) ) {
8181 throw new GroupKey . InvalidGroupKeyError (
82- `Trying to add groupKey ${ groupKey . id } but key exists & is not equivalent to new GroupKey: ${ groupKey } .`
82+ `Trying to add groupKey ${ groupKey . id } but key exists & is not equivalent to new GroupKey: ${ groupKey } .` ,
83+ groupKey
8384 )
8485 }
8586
@@ -96,29 +97,49 @@ function GroupKeyStore({ groupKeys = new Map() }) {
9697 has ( groupKeyId ) {
9798 if ( currentGroupKeyId === groupKeyId ) { return true }
9899
99- if ( nextGroupKey && nextGroupKey . id === groupKeyId ) { return true }
100+ if ( nextGroupKeys . some ( ( nextKey ) => nextKey . id === groupKeyId ) ) { return true }
100101
101102 return store . has ( groupKeyId )
102103 } ,
103104 isEmpty ( ) {
104- return ! nextGroupKey && store . size === 0
105+ return nextGroupKeys . length === 0 && store . size === 0
105106 } ,
106107 useGroupKey ( ) {
107- if ( nextGroupKey ) {
108- // next key becomes current key
109- storeKey ( nextGroupKey )
110-
111- currentGroupKeyId = nextGroupKey . id
112- nextGroupKey = undefined
113- }
114-
115- if ( ! currentGroupKeyId ) {
116- // generate & use key if none already set
117- this . rotateGroupKey ( )
118- return this . useGroupKey ( )
108+ const nextGroupKey = nextGroupKeys . pop ( )
109+ switch ( true ) {
110+ // First use of group key on this stream, no current key. Make next key current.
111+ case ! ! ( ! currentGroupKeyId && nextGroupKey ) : {
112+ storeKey ( nextGroupKey )
113+ currentGroupKeyId = nextGroupKey . id
114+ return [
115+ this . get ( currentGroupKeyId ) ,
116+ undefined ,
117+ ]
118+ }
119+ // Keep using current key (empty next)
120+ case ! ! ( currentGroupKeyId && ! nextGroupKey ) : {
121+ return [
122+ this . get ( currentGroupKeyId ) ,
123+ undefined
124+ ]
125+ }
126+ // Key changed (non-empty next). return current + next. Make next key current.
127+ case ! ! ( currentGroupKeyId && nextGroupKey ) : {
128+ storeKey ( nextGroupKey )
129+ const prevGroupKey = this . get ( currentGroupKeyId )
130+ currentGroupKeyId = nextGroupKey . id
131+ // use current key one more time
132+ return [
133+ prevGroupKey ,
134+ nextGroupKey ,
135+ ]
136+ }
137+ // Generate & use new key if none already set.
138+ default : {
139+ this . rotateGroupKey ( )
140+ return this . useGroupKey ( )
141+ }
119142 }
120-
121- return this . get ( currentGroupKeyId )
122143 } ,
123144 get ( groupKeyId ) {
124145 const groupKey = store . get ( groupKeyId )
@@ -127,7 +148,7 @@ function GroupKeyStore({ groupKeys = new Map() }) {
127148 } ,
128149 clear ( ) {
129150 currentGroupKeyId = undefined
130- nextGroupKey = undefined
151+ nextGroupKeys . length = 0
131152 return store . clear ( )
132153 } ,
133154 rotateGroupKey ( ) {
@@ -138,7 +159,14 @@ function GroupKeyStore({ groupKeys = new Map() }) {
138159 } ,
139160 setNextGroupKey ( newKey ) {
140161 GroupKey . validate ( newKey )
141- nextGroupKey = newKey
162+ nextGroupKeys . unshift ( newKey )
163+ nextGroupKeys . length = Math . min ( nextGroupKeys . length , 2 )
164+ } ,
165+ rekey ( ) {
166+ const newKey = GroupKey . generate ( )
167+ storeKey ( newKey )
168+ currentGroupKeyId = newKey . id
169+ nextGroupKeys . length = 0
142170 }
143171 }
144172}
@@ -215,7 +243,8 @@ async function PublisherKeyExhangeSubscription(client, getGroupKeyStore) {
215243 const subscriberId = streamMessage . getPublisherId ( )
216244
217245 const groupKeyStore = getGroupKeyStore ( streamId )
218- const encryptedGroupKeys = groupKeyIds . map ( ( id ) => {
246+ const isSubscriber = await client . isStreamSubscriber ( streamId , subscriberId )
247+ const encryptedGroupKeys = ! isSubscriber ? [ ] : groupKeyIds . map ( ( id ) => {
219248 const groupKey = groupKeyStore . get ( id )
220249 if ( ! groupKey ) {
221250 return null // will be filtered out
@@ -316,9 +345,17 @@ export function PublisherKeyExhange(client, { groupKeys = {} } = {}) {
316345 return ! groupKeyStore . isEmpty ( )
317346 }
318347
348+ async function rekey ( streamId ) {
349+ if ( ! enabled ) { return }
350+ const groupKeyStore = getGroupKeyStore ( streamId )
351+ groupKeyStore . rekey ( )
352+ await next ( )
353+ }
354+
319355 return {
320356 setNextGroupKey,
321357 useGroupKey,
358+ rekey,
322359 rotateGroupKey,
323360 hasAnyGroupKey,
324361 async start ( ) {
@@ -341,7 +378,7 @@ async function getGroupKeysFromStreamMessage(streamMessage, encryptionUtil) {
341378
342379async function SubscriberKeyExhangeSubscription ( client , getGroupKeyStore , encryptionUtil ) {
343380 let sub
344- async function onKeyExchangeMessage ( parsedContent , streamMessage ) {
381+ async function onKeyExchangeMessage ( _parsedContent , streamMessage ) {
345382 try {
346383 const { messageType } = streamMessage
347384 const { MESSAGE_TYPES } = StreamMessage
@@ -537,9 +574,9 @@ export function SubscriberKeyExchange(client, { groupKeys = {} } = {}) {
537574 } )
538575
539576 async function getGroupKey ( streamMessage ) {
540- if ( ! streamMessage . groupKeyId ) { return undefined }
577+ if ( ! streamMessage . groupKeyId ) { return [ ] }
541578 await next ( )
542- if ( ! enabled ) { return undefined }
579+ if ( ! enabled ) { return [ ] }
543580
544581 return getKey ( streamMessage )
545582 }
@@ -549,6 +586,12 @@ export function SubscriberKeyExchange(client, { groupKeys = {} } = {}) {
549586 enabled = true
550587 return next ( )
551588 } ,
589+ addNewKey ( streamMessage ) {
590+ if ( ! streamMessage . newGroupKey ) { return }
591+ const streamId = streamMessage . getStreamId ( )
592+ const groupKeyStore = getGroupKeyStore ( streamId )
593+ groupKeyStore . add ( streamMessage . newGroupKey )
594+ } ,
552595 async stop ( ) {
553596 enabled = false
554597 return next ( )
0 commit comments