Skip to content
This repository was archived by the owner on Dec 21, 2021. It is now read-only.

Commit aeb68ef

Browse files
committed
refactor: Split key exchange publisher/subscriber into separate files.
1 parent 4f62e4b commit aeb68ef

File tree

6 files changed

+284
-248
lines changed

6 files changed

+284
-248
lines changed

src/publish/Encrypt.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@ import { MessageLayer } from 'streamr-client-protocol'
33
import EncryptionUtil from '../stream/encryption/Encryption'
44
import { Stream } from '../stream'
55
import { StreamrClient } from '../StreamrClient'
6-
import { PublisherKeyExhange } from '../stream/encryption/KeyExchange'
6+
import { PublisherKeyExhange } from '../stream/encryption/KeyExchangePublisher'
77

88
const { StreamMessage } = MessageLayer
99

src/rest/StreamEndpoints.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@ import { getEndpointUrl } from '../utils'
88
import { validateOptions } from '../stream/utils'
99
import { Stream, StreamOperation, StreamProperties } from '../stream'
1010
import { StreamPart } from '../stream/StreamPart'
11-
import { isKeyExchangeStream } from '../stream/encryption/KeyExchange'
11+
import { isKeyExchangeStream } from '../stream/encryption/KeyExchangeUtils'
1212

1313
import authFetch, { ErrorCode, NotFoundError } from './authFetch'
1414
import { EthereumAddress } from '../types'
Lines changed: 196 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,196 @@
1+
import {
2+
StreamMessage, GroupKeyRequest, GroupKeyResponse, EncryptedGroupKey, GroupKeyErrorResponse, Errors
3+
} from 'streamr-client-protocol'
4+
import pMemoize from 'p-memoize'
5+
6+
import Scaffold from '../../utils/Scaffold'
7+
8+
import { validateOptions } from '../utils'
9+
import EncryptionUtil, { GroupKey, StreamMessageProcessingError } from './Encryption'
10+
import type { Subscription } from '../../subscribe'
11+
import { StreamrClient } from '../../StreamrClient'
12+
import GroupKeyStore from './GroupKeyStore'
13+
import {
14+
subscribeToKeyExchangeStream,
15+
parseGroupKeys,
16+
getKeyExchangeStreamId,
17+
GroupKeysSerialized
18+
} from './KeyExchangeUtils'
19+
20+
const { ValidationError } = Errors
21+
22+
class InvalidGroupKeyRequestError extends ValidationError {
23+
code: string
24+
constructor(...args: ConstructorParameters<typeof ValidationError>) {
25+
super(...args)
26+
this.code = 'INVALID_GROUP_KEY_REQUEST'
27+
if (Error.captureStackTrace) {
28+
Error.captureStackTrace(this, this.constructor)
29+
}
30+
}
31+
}
32+
33+
async function catchKeyExchangeError(client: StreamrClient, streamMessage: StreamMessage, fn: (...args: any[]) => Promise<void>) {
34+
try {
35+
return await fn()
36+
} catch (error) {
37+
const subscriberId = streamMessage.getPublisherId()
38+
const msg = streamMessage.getParsedContent()
39+
const { streamId, requestId, groupKeyIds } = GroupKeyRequest.fromArray(msg)
40+
return client.publish(getKeyExchangeStreamId(subscriberId), new GroupKeyErrorResponse({
41+
requestId,
42+
streamId,
43+
errorCode: error.code || 'UNEXPECTED_ERROR',
44+
errorMessage: error.message,
45+
groupKeyIds
46+
}))
47+
}
48+
}
49+
50+
async function PublisherKeyExhangeSubscription(client: StreamrClient, getGroupKeyStore: (streamId: string) => Promise<GroupKeyStore>) {
51+
async function onKeyExchangeMessage(_parsedContent: any, streamMessage: StreamMessage) {
52+
return catchKeyExchangeError(client, streamMessage, async () => {
53+
if (streamMessage.messageType !== StreamMessage.MESSAGE_TYPES.GROUP_KEY_REQUEST) {
54+
return Promise.resolve()
55+
}
56+
57+
// No need to check if parsedContent contains the necessary fields because it was already checked during deserialization
58+
const { requestId, streamId, rsaPublicKey, groupKeyIds } = GroupKeyRequest.fromArray(streamMessage.getParsedContent())
59+
60+
const subscriberId = streamMessage.getPublisherId()
61+
62+
const groupKeyStore = await getGroupKeyStore(streamId)
63+
const isSubscriber = await client.isStreamSubscriber(streamId, subscriberId)
64+
const encryptedGroupKeys = (!isSubscriber ? [] : await Promise.all(groupKeyIds.map(async (id) => {
65+
const groupKey = await groupKeyStore.get(id)
66+
if (!groupKey) {
67+
return null // will be filtered out
68+
}
69+
const key = EncryptionUtil.encryptWithPublicKey(groupKey.data, rsaPublicKey, true)
70+
return new EncryptedGroupKey(id, key)
71+
}))).filter(Boolean) as EncryptedGroupKey[]
72+
73+
client.debug('Publisher: Subscriber requested groupKeys: %d. Got: %d. %o', groupKeyIds.length, encryptedGroupKeys.length, {
74+
subscriberId,
75+
groupKeyIds,
76+
responseKeys: encryptedGroupKeys.map(({ groupKeyId }) => groupKeyId),
77+
})
78+
79+
const response = new GroupKeyResponse({
80+
streamId,
81+
requestId,
82+
encryptedGroupKeys,
83+
})
84+
85+
// hack overriding toStreamMessage method to set correct encryption type
86+
const toStreamMessage = response.toStreamMessage.bind(response)
87+
response.toStreamMessage = (...args) => {
88+
const msg = toStreamMessage(...args)
89+
msg.encryptionType = StreamMessage.ENCRYPTION_TYPES.RSA
90+
return msg
91+
}
92+
93+
return client.publish(getKeyExchangeStreamId(subscriberId), response)
94+
})
95+
}
96+
97+
const sub = await subscribeToKeyExchangeStream(client, onKeyExchangeMessage)
98+
sub.on('error', (err: Error | StreamMessageProcessingError) => {
99+
if (!('streamMessage' in err)) {
100+
return // do nothing
101+
}
102+
103+
// wrap error and translate into ErrorResponse.
104+
catchKeyExchangeError(client, err.streamMessage, () => { // eslint-disable-line promise/no-promise-in-callback
105+
// rethrow so catchKeyExchangeError handles it
106+
throw new InvalidGroupKeyRequestError(err.message)
107+
}).catch((unexpectedError) => {
108+
sub.emit('error', unexpectedError)
109+
})
110+
})
111+
112+
return sub
113+
}
114+
115+
type KeyExhangeOptions = {
116+
groupKeys?: Record<string, GroupKeysSerialized>
117+
}
118+
119+
export function PublisherKeyExhange(client: StreamrClient, { groupKeys = {} }: KeyExhangeOptions = {}) {
120+
let enabled = true
121+
const getGroupKeyStore = pMemoize(async (streamId) => {
122+
const clientId = await client.getAddress()
123+
return new GroupKeyStore({
124+
clientId,
125+
streamId,
126+
groupKeys: [...parseGroupKeys(groupKeys[streamId]).entries()]
127+
})
128+
}, {
129+
cacheKey([maybeStreamId]) {
130+
const { streamId } = validateOptions(maybeStreamId)
131+
return streamId
132+
}
133+
})
134+
135+
let sub: Subscription | undefined
136+
const next = Scaffold([
137+
async () => {
138+
sub = await PublisherKeyExhangeSubscription(client, getGroupKeyStore)
139+
return async () => {
140+
if (!sub) { return }
141+
const cancelTask = sub.cancel()
142+
sub = undefined
143+
await cancelTask
144+
}
145+
}
146+
], async () => enabled)
147+
148+
async function rotateGroupKey(streamId: string) {
149+
if (!enabled) { return }
150+
const groupKeyStore = await getGroupKeyStore(streamId)
151+
await groupKeyStore.rotateGroupKey()
152+
}
153+
154+
async function setNextGroupKey(streamId: string, groupKey: GroupKey) {
155+
if (!enabled) { return }
156+
const groupKeyStore = await getGroupKeyStore(streamId)
157+
158+
await groupKeyStore.setNextGroupKey(groupKey)
159+
}
160+
161+
async function useGroupKey(streamId: string) {
162+
await next()
163+
if (!enabled) { return [] }
164+
const groupKeyStore = await getGroupKeyStore(streamId)
165+
return groupKeyStore.useGroupKey()
166+
}
167+
168+
async function hasAnyGroupKey(streamId: string) {
169+
const groupKeyStore = await getGroupKeyStore(streamId)
170+
return !groupKeyStore.isEmpty()
171+
}
172+
173+
async function rekey(streamId: string) {
174+
if (!enabled) { return }
175+
const groupKeyStore = await getGroupKeyStore(streamId)
176+
await groupKeyStore.rekey()
177+
await next()
178+
}
179+
180+
return {
181+
setNextGroupKey,
182+
useGroupKey,
183+
rekey,
184+
rotateGroupKey,
185+
hasAnyGroupKey,
186+
async start() {
187+
enabled = true
188+
return next()
189+
},
190+
async stop() {
191+
pMemoize.clear(getGroupKeyStore)
192+
enabled = false
193+
return next()
194+
}
195+
}
196+
}

0 commit comments

Comments
 (0)