Skip to content
This repository was archived by the owner on Dec 21, 2021. It is now read-only.

Commit a7fa7a3

Browse files
committed
fix(keyexchange): Improve subscriber async cleanup.
1 parent 259d6d1 commit a7fa7a3

File tree

3 files changed

+31
-5
lines changed

3 files changed

+31
-5
lines changed

src/stream/KeyExchange.ts

Lines changed: 29 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -536,8 +536,13 @@ export function SubscriberKeyExchange(client: StreamrClient, { groupKeys = {} }:
536536
return Promise.resolve()
537537
}
538538
const groupKeyStore = await getGroupKeyStore(streamId)
539-
if (await groupKeyStore.has(groupKeyId)) {
540-
return groupKeyStore.get(groupKeyId)
539+
540+
if (!enabled) { return Promise.resolve() }
541+
const existingGroupKey = await groupKeyStore.get(groupKeyId)
542+
if (!enabled) { return Promise.resolve() }
543+
544+
if (existingGroupKey) {
545+
return existingGroupKey
541546
}
542547

543548
if (pending.has(groupKeyId)) {
@@ -550,6 +555,7 @@ export function SubscriberKeyExchange(client: StreamrClient, { groupKeys = {} }:
550555
pending.set(groupKeyId, Defer())
551556

552557
async function processBuffer() {
558+
if (!enabled) { return }
553559
const currentBuffer = getBuffer(key)
554560
const groupKeyIds = currentBuffer.slice()
555561
currentBuffer.length = 0
@@ -559,9 +565,11 @@ export function SubscriberKeyExchange(client: StreamrClient, { groupKeys = {} }:
559565
publisherId,
560566
groupKeyIds,
561567
})
568+
if (!enabled) { return }
562569
await Promise.all(receivedGroupKeys.map(async (groupKey) => (
563570
groupKeyStore.add(groupKey)
564571
)))
572+
if (!enabled) { return }
565573
await Promise.all(groupKeyIds.map(async (id) => {
566574
if (!pending.has(id)) { return }
567575
const groupKeyTask = groupKeyStore.get(id)
@@ -590,14 +598,26 @@ export function SubscriberKeyExchange(client: StreamrClient, { groupKeys = {} }:
590598
return pending.get(groupKeyId)
591599
}
592600

601+
function cleanupPending() {
602+
Array.from(Object.entries(timeouts)).forEach(([key, value]) => {
603+
clearTimeout(value)
604+
delete timeouts[key]
605+
})
606+
const pendingValues = Array.from(pending.values())
607+
pending.clear()
608+
pendingValues.forEach((value) => {
609+
value.resolve(undefined)
610+
})
611+
pMemoize.clear(getGroupKeyStore)
612+
}
613+
593614
const next = Scaffold([
594615
async () => {
595-
return encryptionUtil.onReady()
616+
await encryptionUtil.onReady()
596617
},
597618
async () => {
598619
sub = await SubscriberKeyExhangeSubscription(client, getGroupKeyStore, encryptionUtil)
599620
return async () => {
600-
pMemoize.clear(getGroupKeyStore)
601621
if (!sub) { return }
602622
const cancelTask = sub.cancel()
603623
sub = undefined
@@ -606,6 +626,11 @@ export function SubscriberKeyExchange(client: StreamrClient, { groupKeys = {} }:
606626
}
607627
], async () => enabled, {
608628
id: `SubscriberKeyExhangeSubscription.${client.id}`,
629+
onChange(shouldUp) {
630+
if (!shouldUp) {
631+
cleanupPending()
632+
}
633+
},
609634
async onDone() {
610635
// clean up requestKey
611636
if (requestKeysStep) {

src/subscribe/Decrypt.js

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -56,7 +56,7 @@ export default function Decrypt(client, options = {}) {
5656
}
5757

5858
return Object.assign(decrypt, {
59-
stop() {
59+
async stop() {
6060
return requestKey.stop()
6161
}
6262
})

src/subscribe/pipeline.js

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -129,6 +129,7 @@ export default function MessagePipeline(client, opts = {}, onFinally = async (er
129129
// custom pipeline steps
130130
...afterSteps
131131
], async (err, ...args) => {
132+
decrypt.stop()
132133
await msgStream.cancel(err)
133134
try {
134135
if (err) {

0 commit comments

Comments
 (0)