Skip to content
This repository was archived by the owner on Dec 21, 2021. It is now read-only.

Commit 9f849e8

Browse files
committed
test: Passing integration/Encryption.test.js with revoke tests.
Cleaned up, but still WIP encryption/subscription error handling. Tracking down unhandled error. Fix unhandled rejection in keyexchange. Improve subscription error handling. Update Encryption tests. Passing integration/Encryption.test.js with revoke tests. Fix publish/Encrypt typing.
1 parent c4cfb2d commit 9f849e8

File tree

5 files changed

+201
-18
lines changed

5 files changed

+201
-18
lines changed

src/publish/index.js

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -189,6 +189,9 @@ function getCreateStreamMessage(client) {
189189
rotateGroupKey(maybeStreamId) {
190190
return encrypt.rotateGroupKey(maybeStreamId)
191191
},
192+
startKeyExchange() {
193+
return encrypt.start()
194+
},
192195
rekey(maybeStreamId) {
193196
return encrypt.rekey(maybeStreamId)
194197
},

src/subscribe/index.ts

Lines changed: 36 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -50,7 +50,9 @@ export class Subscription extends Emitter {
5050
this.client = client
5151
this.options = validateOptions(opts)
5252
this.key = this.options.key
53-
this.id = counterId(`Subscription.${this.options.id || ''}${this.key}`)
53+
this.id = counterId(`Subscription:${this.options.id || ''}${this.key}`)
54+
this.debug = client.debug.extend(this.id)
55+
this.debug('create')
5456
this.streamId = this.options.streamId
5557
this.streamPartition = this.options.streamPartition
5658

@@ -75,14 +77,20 @@ export class Subscription extends Emitter {
7577
if (event !== 'error') {
7678
return super.emit(event, ...args)
7779
}
80+
const [error] = args
7881

82+
if (!this.listenerCount('error')) {
83+
this.debug('emitting error but no error listeners, cancelling subscription', error)
84+
this.cancel(error)
85+
return false
86+
}
7987
try {
80-
if (this.listenerCount('error')) {
81-
// debugger
82-
return super.emit('error', ...args)
83-
}
84-
throw args[0]
88+
this.debug('emit error', error)
89+
return super.emit('error', ...args)
8590
} catch (err) {
91+
if (err !== error) {
92+
this.debug('error emitting error!', err)
93+
}
8694
this.cancel(err)
8795
return false
8896
}
@@ -94,6 +102,7 @@ export class Subscription extends Emitter {
94102
*/
95103

96104
async onPipelineEnd(err?: Error) {
105+
this.debug('onPipelineEnd', err)
97106
let error = err
98107
try {
99108
await this._onFinally(error)
@@ -205,6 +214,9 @@ class SubscriptionSession extends Emitter {
205214

206215
this.subscriptions = new Set() // active subs
207216
this.deletedSubscriptions = new Set() // hold so we can clean up
217+
this.id = counterId(`SubscriptionSession:${this.options.id || ''}${this.options.key}`)
218+
this.debug = this.client.debug.extend(this.id)
219+
this.debug('create')
208220
this._init()
209221
}
210222

@@ -322,6 +334,12 @@ class SubscriptionSession extends Emitter {
322334

323335
emit(...args: Todo[]) {
324336
const subs = this._getSubs()
337+
if (args[0] === 'error') {
338+
this.debug(args[0], args[1])
339+
} else {
340+
this.debug(args[0])
341+
}
342+
325343
try {
326344
multiEmit(subs, ...args)
327345
} catch (error) {
@@ -345,6 +363,7 @@ class SubscriptionSession extends Emitter {
345363

346364
async add(sub: Todo) {
347365
this.subscriptions.add(sub)
366+
this.debug('add', sub && sub.id)
348367
const { connection } = this.client
349368
await connection.addHandle(`adding${sub.id}`)
350369
try {
@@ -366,11 +385,18 @@ class SubscriptionSession extends Emitter {
366385
return
367386
}
368387

388+
if (this.subscriptions.has(sub)) {
389+
this.debug('remove', sub && sub.id)
390+
}
391+
369392
const cancelTask = sub.cancel()
370-
this.subscriptions.delete(sub)
371-
this.deletedSubscriptions.add(sub)
372-
await this.step()
373-
await cancelTask
393+
try {
394+
this.subscriptions.delete(sub)
395+
this.deletedSubscriptions.add(sub)
396+
await this.step()
397+
} finally {
398+
await cancelTask
399+
}
374400
}
375401

376402
/**

src/subscribe/pipeline.js

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,7 @@ export default function MessagePipeline(client, opts = {}, onFinally = async (er
3939
const seenErrors = new WeakSet()
4040
const onErrorFn = options.onError ? options.onError : (error) => { throw error }
4141
const onError = async (err) => {
42+
// don't handle same error multiple times
4243
if (seenErrors.has(err)) {
4344
return
4445
}

test/integration/Encryption.test.js

Lines changed: 158 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -619,17 +619,172 @@ describe('decryption', () => {
619619

620620
describe('revoking permissions', () => {
621621
let client2
622+
622623
beforeEach(async () => {
623-
client2 = createClient()
624+
client2 = createClient({ id: 'subscriber' })
624625
await client2.connect()
625626
await client2.session.getSessionToken()
627+
client2.debug('new SUBSCRIBER')
626628
})
627629

628630
afterEach(async () => {
629631
await client2.disconnect()
630632
})
631633

632-
it('fails gracefully if permission revoked', async () => {
634+
it('fails gracefully if permission revoked with low cache maxAge', async () => {
635+
await client.disconnect()
636+
await setupClient({
637+
id: 'publisher',
638+
cache: {
639+
maxAge: 1,
640+
}
641+
})
642+
643+
const MAX_MESSAGES = 6
644+
await client.rotateGroupKey(stream.id)
645+
646+
const p1 = await stream.grantPermission('stream_get', client2.getPublisherId())
647+
const p2 = await stream.grantPermission('stream_subscribe', client2.getPublisherId())
648+
649+
const sub = await client2.subscribe({
650+
stream: stream.id,
651+
})
652+
653+
const errs = []
654+
const onSubError = jest.fn((err) => {
655+
errs.push(err)
656+
throw err
657+
})
658+
659+
sub.on('error', onSubError)
660+
661+
const received = []
662+
// Publish after subscribed
663+
let count = 0
664+
const REVOKE_AFTER = 3
665+
const gotMessages = Defer()
666+
// do publish in background otherwise permission is revoked before subscriber starts processing
667+
const publishTask = publishTestMessages(MAX_MESSAGES, {
668+
timestamp: 1111111,
669+
async afterEach() {
670+
count += 1
671+
if (count === REVOKE_AFTER) {
672+
await gotMessages
673+
await stream.revokePermission(p1.id)
674+
await stream.revokePermission(p2.id)
675+
await client.rekey(stream.id)
676+
}
677+
}
678+
})
679+
let t
680+
await expect(async () => {
681+
for await (const m of sub) {
682+
received.push(m.getParsedContent())
683+
if (received.length === REVOKE_AFTER) {
684+
gotMessages.resolve()
685+
clearTimeout(t)
686+
t = setTimeout(() => {
687+
sub.cancel()
688+
}, 10000)
689+
}
690+
691+
if (received.length === MAX_MESSAGES) {
692+
clearTimeout(t)
693+
break
694+
}
695+
}
696+
}).rejects.toThrow('not a subscriber')
697+
clearTimeout(t)
698+
const published = await publishTask
699+
700+
expect(received).toEqual([
701+
...published.slice(0, REVOKE_AFTER),
702+
])
703+
704+
expect(onSubError).toHaveBeenCalledTimes(1)
705+
})
706+
707+
it('fails gracefully if permission revoked with low cache maxAge fail first message', async () => {
708+
await client.disconnect()
709+
await setupClient({
710+
id: 'publisher',
711+
cache: {
712+
maxAge: 1,
713+
}
714+
})
715+
const MAX_MESSAGES = 3
716+
await client.rotateGroupKey(stream.id)
717+
718+
const p1 = await stream.grantPermission('stream_get', client2.getPublisherId())
719+
const p2 = await stream.grantPermission('stream_subscribe', client2.getPublisherId())
720+
721+
const sub = await client2.subscribe({
722+
stream: stream.id,
723+
})
724+
sub.debug('sub', sub.id)
725+
726+
const errs = []
727+
const onSubError = jest.fn((err) => {
728+
errs.push(err)
729+
throw err
730+
})
731+
732+
sub.on('error', onSubError)
733+
734+
const received = []
735+
// Publish after subscribed
736+
let count = 0
737+
const REVOKE_AFTER = 1
738+
const gotMessages = Defer()
739+
// do publish in background otherwise permission is revoked before subscriber starts processing
740+
const publishTask = publishTestMessages(MAX_MESSAGES, {
741+
timestamp: 1111111,
742+
async afterEach() {
743+
count += 1
744+
if (count === REVOKE_AFTER) {
745+
await gotMessages
746+
await stream.revokePermission(p1.id)
747+
await stream.revokePermission(p2.id)
748+
await client.rekey(stream.id)
749+
}
750+
}
751+
})
752+
let t
753+
await expect(async () => {
754+
for await (const m of sub) {
755+
received.push(m.getParsedContent())
756+
if (received.length === REVOKE_AFTER) {
757+
gotMessages.resolve()
758+
clearTimeout(t)
759+
t = setTimeout(() => {
760+
sub.cancel()
761+
}, 10000)
762+
}
763+
764+
if (received.length === MAX_MESSAGES) {
765+
clearTimeout(t)
766+
break
767+
}
768+
}
769+
}).rejects.toThrow('not a subscriber')
770+
clearTimeout(t)
771+
const published = await publishTask
772+
773+
expect(received).toEqual([
774+
...published.slice(0, REVOKE_AFTER),
775+
])
776+
777+
expect(onSubError).toHaveBeenCalledTimes(1)
778+
})
779+
780+
it('fails gracefully if permission revoked with high cache maxAge', async () => {
781+
await client.disconnect()
782+
await setupClient({
783+
cache: {
784+
maxAge: 99999999,
785+
}
786+
})
787+
633788
const MAX_MESSAGES = 10
634789
await client.rotateGroupKey(stream.id)
635790

@@ -692,7 +847,7 @@ describe('decryption', () => {
692847
...published.slice(0, REVOKE_AFTER),
693848
])
694849

695-
expect(onSubError).toHaveBeenCalledTimes(MAX_MESSAGES - REVOKE_AFTER + 1) // + 1 for final err
850+
expect(onSubError).toHaveBeenCalledTimes(1)
696851
})
697852
})
698853
})

test/integration/MultipleClients.test.js

Lines changed: 3 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -639,8 +639,6 @@ describeRepeats('PubSub with multiple clients', () => {
639639
}
640640
}
641641

642-
const MAX_MESSAGES = 10
643-
644642
try {
645643
await mainClient.session.getSessionToken()
646644
await mainClient.connect()
@@ -681,7 +679,7 @@ describeRepeats('PubSub with multiple clients', () => {
681679
let counter = 0
682680
/* eslint-enable no-await-in-loop */
683681
await Promise.all(publishers.map(async (pubClient) => {
684-
const publisherId = await pubClient.getPublisherId()
682+
const publisherId = pubClient.getPublisherId()
685683
const publishTestMessages = getPublishTestMessages(pubClient, {
686684
stream,
687685
waitForLast: true,
@@ -699,12 +697,12 @@ describeRepeats('PubSub with multiple clients', () => {
699697
resend: {
700698
last: 1000,
701699
}
702-
}, (msg, streamMessage) => {
700+
}, async (msg, streamMessage) => {
703701
const msgs = receivedMessagesOther[streamMessage.getPublisherId()] || []
704702
msgs.push(msg)
705703
receivedMessagesOther[streamMessage.getPublisherId()] = msgs
706704
if (msgs.length === MAX_MESSAGES) {
707-
return otherSub.end()
705+
await otherSub.end()
708706
}
709707
})
710708
}

0 commit comments

Comments
 (0)