Skip to content
This repository was archived by the owner on Dec 21, 2021. It is now read-only.

Commit cc23d7d

Browse files
committed
refactor: Simplify key exchange subscriber by removing buffering & pending promise de-duplication.
1 parent c43250f commit cc23d7d

File tree

1 file changed

+43
-167
lines changed

1 file changed

+43
-167
lines changed

src/stream/encryption/KeyExchangeSubscriber.ts

Lines changed: 43 additions & 167 deletions
Original file line numberDiff line numberDiff line change
@@ -3,8 +3,6 @@ import {
33
} from 'streamr-client-protocol'
44
import pMemoize from 'p-memoize'
55
import { uuid, Defer } from '../../utils'
6-
import Scaffold from '../../utils/Scaffold'
7-
import mem from 'mem'
86

97
import { validateOptions } from '../utils'
108
import EncryptionUtil, { GroupKey } from './Encryption'
@@ -22,7 +20,7 @@ import {
2220
type MessageMatch = (content: any, streamMessage: StreamMessage) => boolean
2321

2422
function waitForSubMessage(sub: Subscription, matchFn: MessageMatch) {
25-
const task = Defer()
23+
const task = Defer<StreamMessage | undefined>()
2624
const onMessage = (content: any, streamMessage: StreamMessage) => {
2725
try {
2826
if (matchFn(content, streamMessage)) {
@@ -85,12 +83,7 @@ export class SubscriberKeyExchange {
8583
client
8684
initialGroupKeys
8785
encryptionUtil
88-
pending = new Map<GroupKeyId, ReturnType<typeof Defer>>()
89-
getBuffer = mem<(groupKeyId: GroupKeyId) => GroupKeyId[], [string]>(() => [])
90-
timeouts: Record<string, ReturnType<typeof setTimeout>> = Object.create(null)
91-
next
9286
enabled = true
93-
sub?: Subscription
9487

9588
constructor(client: StreamrClient, { groupKeys = {} }: KeyExhangeOptions = {}) {
9689
this.client = client
@@ -102,80 +95,51 @@ export class SubscriberKeyExchange {
10295
}
10396
})
10497
this.encryptionUtil = new EncryptionUtil(client.options.keyExchange)
105-
this.next = this.initNext()
98+
}
99+
100+
async getSubscription() {
101+
return SubscriberKeyExhangeSubscription(this.client, this.getGroupKeyStore, this.encryptionUtil)
106102
}
107103

108104
async requestKeys({ streamId, publisherId, groupKeyIds }: {
109105
streamId: string,
110106
publisherId: string,
111107
groupKeyIds: GroupKeyId[]
112108
}) {
113-
let done = false
114109
const requestId = uuid('GroupKeyRequest')
115110
const rsaPublicKey = this.encryptionUtil.getPublicKey()
116111
const keyExchangeStreamId = getKeyExchangeStreamId(publisherId)
117-
let responseTask: ReturnType<typeof Defer>
118-
let cancelTask: ReturnType<typeof Defer>
119-
let receivedGroupKeys: GroupKey[] = []
120-
let response: any
121-
122-
this.requestKeysStep = Scaffold([
123-
async () => {
124-
if (!this.sub) { throw new Error('no subscription') }
125-
cancelTask = Defer()
126-
responseTask = waitForSubMessage(this.sub, (content, streamMessage) => {
127-
const { messageType } = streamMessage
128-
const matchesMessageType = (
129-
messageType === StreamMessage.MESSAGE_TYPES.GROUP_KEY_RESPONSE
130-
|| messageType === StreamMessage.MESSAGE_TYPES.GROUP_KEY_ERROR_RESPONSE
131-
)
132-
133-
if (!matchesMessageType) {
134-
return false
135-
}
136-
137-
const groupKeyResponse = GroupKeyResponse.fromArray(content)
138-
return groupKeyResponse.requestId === requestId
139-
})
140-
141-
cancelTask.then(responseTask.resolve).catch(responseTask.reject)
142-
return () => {
143-
cancelTask.resolve(undefined)
144-
}
145-
}, async () => {
146-
const msg = new GroupKeyRequest({
147-
streamId,
148-
requestId,
149-
rsaPublicKey,
150-
groupKeyIds,
151-
})
152-
await this.client.publish(keyExchangeStreamId, msg)
153-
}, async () => {
154-
response = await responseTask
155-
return () => {
156-
response = undefined
112+
let sub!: Subscription
113+
try {
114+
sub = await this.getSubscription()
115+
const responseTask = waitForSubMessage(sub, (content, streamMessage) => {
116+
const { messageType } = streamMessage
117+
const matchesMessageType = (
118+
messageType === StreamMessage.MESSAGE_TYPES.GROUP_KEY_RESPONSE
119+
|| messageType === StreamMessage.MESSAGE_TYPES.GROUP_KEY_ERROR_RESPONSE
120+
)
121+
122+
if (!matchesMessageType) {
123+
return false
157124
}
158-
}, async () => {
159-
receivedGroupKeys = response ? await getGroupKeysFromStreamMessage(response, this.encryptionUtil) : []
160125

161-
return () => {
162-
receivedGroupKeys = []
163-
}
164-
},
165-
], async () => this.enabled && !done, {
166-
id: `requestKeys.${requestId}`,
167-
onChange(isGoingUp) {
168-
if (!isGoingUp && cancelTask) {
169-
cancelTask.resolve(undefined)
170-
}
126+
const groupKeyResponse = GroupKeyResponse.fromArray(content)
127+
return groupKeyResponse.requestId === requestId
128+
})
129+
const msg = new GroupKeyRequest({
130+
streamId,
131+
requestId,
132+
rsaPublicKey,
133+
groupKeyIds,
134+
})
135+
await this.client.publish(keyExchangeStreamId, msg)
136+
const response = await responseTask
137+
return response ? getGroupKeysFromStreamMessage(response, this.encryptionUtil) : []
138+
} finally {
139+
if (sub) {
140+
await sub.unsubscribe()
171141
}
172-
})
173-
174-
await this.requestKeysStep()
175-
const keys = receivedGroupKeys.slice()
176-
done = true
177-
await this.requestKeysStep()
178-
return keys
142+
}
179143
}
180144

181145
async getGroupKeyStore(streamId: string) {
@@ -187,46 +151,6 @@ export class SubscriberKeyExchange {
187151
})
188152
}
189153

190-
private async processBuffer({ streamId, publisherId }: { streamId: string, publisherId: string }) {
191-
if (!this.enabled) { return }
192-
const key = `${streamId}.${publisherId}`
193-
const currentBuffer = this.getBuffer(key)
194-
const groupKeyIds = currentBuffer.slice()
195-
const groupKeyStore = await this.getGroupKeyStore(streamId)
196-
currentBuffer.length = 0
197-
try {
198-
const receivedGroupKeys = await this.requestKeys({
199-
streamId,
200-
publisherId,
201-
groupKeyIds,
202-
})
203-
if (!this.enabled) { return }
204-
await Promise.all(receivedGroupKeys.map(async (groupKey) => (
205-
groupKeyStore.add(groupKey)
206-
)))
207-
if (!this.enabled) { return }
208-
await Promise.all(groupKeyIds.map(async (id) => {
209-
if (!this.pending.has(id)) { return }
210-
const groupKeyTask = groupKeyStore.get(id)
211-
const task = this.pending.get(id)
212-
this.pending.delete(id)
213-
const groupKey = await groupKeyTask
214-
if (task) {
215-
task.resolve(groupKey)
216-
}
217-
}))
218-
} catch (err) {
219-
groupKeyIds.forEach((id) => {
220-
if (!this.pending.has(id)) { return }
221-
const task = this.pending.get(id)
222-
this.pending.delete(id)
223-
if (task) {
224-
task.reject(err)
225-
}
226-
})
227-
}
228-
}
229-
230154
async getKey(streamMessage: StreamMessage) {
231155
const streamId = streamMessage.getStreamId()
232156
const publisherId = streamMessage.getPublisherId()
@@ -245,79 +169,32 @@ export class SubscriberKeyExchange {
245169
return existingGroupKey
246170
}
247171

248-
if (this.pending.has(groupKeyId)) {
249-
return this.pending.get(groupKeyId)
250-
}
172+
const receivedGroupKeys = await this.requestKeys({
173+
streamId,
174+
publisherId,
175+
groupKeyIds: [groupKeyId],
176+
})
251177

252-
const key = `${streamId}.${publisherId}`
253-
const buffer = this.getBuffer(key)
254-
buffer.push(groupKeyId)
255-
this.pending.set(groupKeyId, Defer())
178+
await Promise.all(receivedGroupKeys.map(async (groupKey: GroupKey) => (
179+
groupKeyStore.add(groupKey)
180+
)))
256181

257-
if (!this.timeouts[key]) {
258-
this.timeouts[key] = setTimeout(() => {
259-
delete this.timeouts[key]
260-
this.processBuffer({ streamId, publisherId })
261-
}, 1000)
262-
}
263-
264-
return this.pending.get(groupKeyId)
182+
return groupKeyStore.get(groupKeyId)
265183
}
266184

267185
cleanupPending() {
268-
Array.from(Object.entries(this.timeouts)).forEach(([key, value]) => {
269-
clearTimeout(value)
270-
delete this.timeouts[key]
271-
})
272-
const pendingValues = Array.from(this.pending.values())
273-
this.pending.clear()
274-
pendingValues.forEach((value) => {
275-
value.resolve(undefined)
276-
})
277186
pMemoize.clear(this.getGroupKeyStore)
278187
}
279188

280-
initNext() {
281-
return Scaffold([
282-
async () => {
283-
await this.encryptionUtil.onReady()
284-
},
285-
async () => {
286-
this.sub = await SubscriberKeyExhangeSubscription(this.client, this.getGroupKeyStore, this.encryptionUtil)
287-
return async () => {
288-
if (!this.sub) { return }
289-
const cancelTask = this.sub.cancel()
290-
this.sub = undefined
291-
await cancelTask
292-
}
293-
}
294-
], async () => this.enabled, {
295-
id: `SubscriberKeyExhangeSubscription.${this.client.id}`,
296-
onChange: (shouldUp) => {
297-
if (!shouldUp) {
298-
this.cleanupPending()
299-
}
300-
},
301-
onDone: async () => {
302-
// clean up requestKey
303-
if (this.requestKeysStep) {
304-
await this.requestKeysStep()
305-
}
306-
}
307-
})
308-
}
309-
310189
async getGroupKey(streamMessage: StreamMessage) {
311190
if (!streamMessage.groupKeyId) { return [] }
312-
await this.next()
313-
if (!this.enabled) { return [] }
191+
await this.encryptionUtil.onReady()
314192

315193
return this.getKey(streamMessage)
316194
}
317195

318196
async start() {
319197
this.enabled = true
320-
return this.next()
321198
}
322199

323200
async addNewKey(streamMessage: StreamMessage) {
@@ -331,6 +208,5 @@ export class SubscriberKeyExchange {
331208

332209
async stop() {
333210
this.enabled = false
334-
return this.next()
335211
}
336212
}

0 commit comments

Comments
 (0)