Skip to content
This repository was archived by the owner on Dec 21, 2021. It is now read-only.

Commit fbb62bf

Browse files
committed
fix: Improve group key store.
1 parent adf864e commit fbb62bf

File tree

4 files changed

+162
-99
lines changed

4 files changed

+162
-99
lines changed

src/stream/KeyExchange.ts

Lines changed: 97 additions & 89 deletions
Original file line numberDiff line numberDiff line change
@@ -70,24 +70,27 @@ type GroupKeyStoreOptions = {
7070
groupKeys: [GroupKeyId, GroupKey][]
7171
}
7272

73-
function GroupKeyStore({ clientId, streamId, groupKeys }: GroupKeyStoreOptions) {
74-
const store = new PersistentStore({ clientId, streamId })
75-
76-
let currentGroupKeyId: GroupKeyId | undefined // current key id if any
77-
const nextGroupKeys: GroupKey[] = [] // the keys to use next, disappears if not actually used. Max queue size 2
78-
79-
groupKeys.forEach(([groupKeyId, groupKey]) => {
80-
GroupKey.validate(groupKey)
81-
if (groupKeyId !== groupKey.id) {
82-
throw new Error(`Ids must match: groupKey.id: ${groupKey.id}, groupKeyId: ${groupKeyId}`)
83-
}
84-
// use last init key as current
85-
currentGroupKeyId = groupKey.id
86-
})
73+
export class GroupKeyStore {
74+
store
75+
currentGroupKeyId: GroupKeyId | undefined // current key id if any
76+
nextGroupKeys: GroupKey[] = [] // the keys to use next, disappears if not actually used. Max queue size 2
77+
78+
constructor({ clientId, streamId, groupKeys }: GroupKeyStoreOptions) {
79+
this.store = new PersistentStore({ clientId, streamId })
80+
81+
groupKeys.forEach(([groupKeyId, groupKey]) => {
82+
GroupKey.validate(groupKey)
83+
if (groupKeyId !== groupKey.id) {
84+
throw new Error(`Ids must match: groupKey.id: ${groupKey.id}, groupKeyId: ${groupKeyId}`)
85+
}
86+
// use last init key as current
87+
this.currentGroupKeyId = groupKey.id
88+
})
89+
}
8790

88-
async function storeKey(groupKey: GroupKey) {
91+
private async storeKey(groupKey: GroupKey) {
8992
GroupKey.validate(groupKey)
90-
const existingKey = await store.get(groupKey.id)
93+
const existingKey = await this.store.get(groupKey.id)
9194
if (existingKey) {
9295
if (!existingKey.equals(groupKey)) {
9396
throw new GroupKey.InvalidGroupKeyError(
@@ -96,90 +99,95 @@ function GroupKeyStore({ clientId, streamId, groupKeys }: GroupKeyStoreOptions)
9699
)
97100
}
98101

99-
await store.set(groupKey.id, existingKey)
102+
await this.store.set(groupKey.id, existingKey)
100103
return existingKey
101104
}
102105

103-
await store.set(groupKey.id, groupKey)
106+
await this.store.set(groupKey.id, groupKey)
104107
return groupKey
105108
}
106109

107-
return {
108-
async has(id: GroupKeyId) {
109-
if (currentGroupKeyId === id) { return true }
110+
async has(id: GroupKeyId) {
111+
if (this.currentGroupKeyId === id) { return true }
110112

111-
if (nextGroupKeys.some((nextKey) => nextKey.id === id)) { return true }
113+
if (this.nextGroupKeys.some((nextKey) => nextKey.id === id)) { return true }
112114

113-
return store.has(id)
114-
},
115-
async isEmpty() {
116-
return !nextGroupKeys.length && await store.size() === 0
117-
},
118-
async useGroupKey(): Promise<[GroupKey | undefined, GroupKey | undefined]> {
119-
const nextGroupKey = nextGroupKeys.pop()
120-
// First use of group key on this stream, no current key. Make next key current.
121-
if (!currentGroupKeyId && nextGroupKey) {
122-
await storeKey(nextGroupKey)
123-
currentGroupKeyId = nextGroupKey.id
124-
return [
125-
await this.get(currentGroupKeyId),
126-
undefined,
127-
]
128-
}
115+
return this.store.has(id)
116+
}
129117

130-
// Keep using current key (empty next)
131-
if (currentGroupKeyId != null && !nextGroupKey) {
132-
return [
133-
await this.get(currentGroupKeyId),
134-
undefined
135-
]
136-
}
118+
async isEmpty() {
119+
return !this.nextGroupKeys.length && await this.store.size() === 0
120+
}
137121

138-
// Key changed (non-empty next). return current + next. Make next key current.
139-
if (currentGroupKeyId != null && nextGroupKey != null) {
140-
await storeKey(nextGroupKey)
141-
const prevGroupKey = await this.get(currentGroupKeyId)
142-
currentGroupKeyId = nextGroupKey.id
143-
// use current key one more time
144-
return [
145-
prevGroupKey,
146-
nextGroupKey,
147-
]
148-
}
122+
async useGroupKey(): Promise<[GroupKey | undefined, GroupKey | undefined]> {
123+
const nextGroupKey = this.nextGroupKeys.pop()
124+
// First use of group key on this stream, no current key. Make next key current.
125+
if (!this.currentGroupKeyId && nextGroupKey) {
126+
this.currentGroupKeyId = nextGroupKey.id
127+
return [
128+
await this.get(this.currentGroupKeyId),
129+
undefined,
130+
]
131+
}
149132

150-
// Generate & use new key if none already set.
151-
await this.rotateGroupKey()
152-
return this.useGroupKey()
153-
},
154-
async get(id: GroupKeyId) {
155-
return store.get(id)
156-
},
157-
async clear() {
158-
currentGroupKeyId = undefined
159-
nextGroupKeys.length = 0
160-
return store.clear()
161-
},
162-
async rotateGroupKey() {
163-
return this.setNextGroupKey(GroupKey.generate())
164-
},
165-
async add(groupKey: GroupKey) {
166-
return storeKey(groupKey)
167-
},
168-
async setNextGroupKey(newKey: GroupKey) {
169-
GroupKey.validate(newKey)
170-
nextGroupKeys.unshift(newKey)
171-
nextGroupKeys.length = Math.min(nextGroupKeys.length, 2)
172-
},
173-
async rekey() {
174-
const newKey = GroupKey.generate()
175-
await storeKey(newKey)
176-
currentGroupKeyId = newKey.id
177-
nextGroupKeys.length = 0
133+
// Keep using current key (empty next)
134+
if (this.currentGroupKeyId != null && !nextGroupKey) {
135+
return [
136+
await this.get(this.currentGroupKeyId),
137+
undefined
138+
]
139+
}
140+
141+
// Key changed (non-empty next). return current + next. Make next key current.
142+
if (this.currentGroupKeyId != null && nextGroupKey != null) {
143+
const prevId = this.currentGroupKeyId
144+
this.currentGroupKeyId = nextGroupKey.id
145+
const prevGroupKey = await this.get(prevId)
146+
// use current key one more time
147+
return [
148+
prevGroupKey,
149+
nextGroupKey,
150+
]
178151
}
152+
153+
// Generate & use new key if none already set.
154+
await this.rotateGroupKey()
155+
return this.useGroupKey()
156+
}
157+
158+
async get(id: GroupKeyId) {
159+
return this.store.get(id)
160+
}
161+
162+
async clear() {
163+
this.currentGroupKeyId = undefined
164+
this.nextGroupKeys.length = 0
165+
return this.store.clear()
166+
}
167+
168+
async rotateGroupKey() {
169+
return this.setNextGroupKey(GroupKey.generate())
170+
}
171+
172+
async add(groupKey: GroupKey) {
173+
return this.storeKey(groupKey)
174+
}
175+
176+
async setNextGroupKey(newKey: GroupKey) {
177+
GroupKey.validate(newKey)
178+
this.nextGroupKeys.unshift(newKey)
179+
this.nextGroupKeys.length = Math.min(this.nextGroupKeys.length, 2)
180+
await this.storeKey(newKey)
181+
}
182+
183+
async rekey() {
184+
const newKey = GroupKey.generate()
185+
await this.storeKey(newKey)
186+
this.currentGroupKeyId = newKey.id
187+
this.nextGroupKeys.length = 0
179188
}
180189
}
181190

182-
type GroupKeyStorage = ReturnType<typeof GroupKeyStore>
183191
type GroupKeysSerialized = Record<GroupKeyId, GroupKeyish>
184192

185193
function parseGroupKeys(groupKeys: GroupKeysSerialized = {}): Map<GroupKeyId, GroupKey> {
@@ -243,7 +251,7 @@ async function catchKeyExchangeError(client: StreamrClient, streamMessage: Strea
243251
}
244252
}
245253

246-
async function PublisherKeyExhangeSubscription(client: StreamrClient, getGroupKeyStore: (streamId: string) => Promise<GroupKeyStorage>) {
254+
async function PublisherKeyExhangeSubscription(client: StreamrClient, getGroupKeyStore: (streamId: string) => Promise<GroupKeyStore>) {
247255
async function onKeyExchangeMessage(_parsedContent: any, streamMessage: StreamMessage) {
248256
return catchKeyExchangeError(client, streamMessage, async () => {
249257
if (streamMessage.messageType !== StreamMessage.MESSAGE_TYPES.GROUP_KEY_REQUEST) {
@@ -316,7 +324,7 @@ export function PublisherKeyExhange(client: StreamrClient, { groupKeys = {} }: K
316324
let enabled = true
317325
const getGroupKeyStore = pMemoize(async (streamId) => {
318326
const clientId = await client.getAddress()
319-
return GroupKeyStore({
327+
return new GroupKeyStore({
320328
clientId,
321329
streamId,
322330
groupKeys: [...parseGroupKeys(groupKeys[streamId]).entries()]
@@ -403,7 +411,7 @@ async function getGroupKeysFromStreamMessage(streamMessage: StreamMessage, encry
403411

404412
async function SubscriberKeyExhangeSubscription(
405413
client: StreamrClient,
406-
getGroupKeyStore: (streamId: string) => Promise<GroupKeyStorage>,
414+
getGroupKeyStore: (streamId: string) => Promise<GroupKeyStore>,
407415
encryptionUtil: EncryptionUtil
408416
) {
409417
let sub: Subscription
@@ -436,7 +444,7 @@ export function SubscriberKeyExchange(client: StreamrClient, { groupKeys = {} }:
436444

437445
const getGroupKeyStore = pMemoize(async (streamId) => {
438446
const clientId = await client.getAddress()
439-
return GroupKeyStore({
447+
return new GroupKeyStore({
440448
clientId,
441449
streamId,
442450
groupKeys: [...parseGroupKeys(groupKeys[streamId]).entries()]

src/stream/PersistentStore.ts

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -64,11 +64,13 @@ class ServerStorage {
6464

6565
async set(key: string, value: string) {
6666
await this.init()
67-
return this.store!.run('INSERT INTO GroupKeys VALUES ($id, $groupKey, $streamId) ON CONFLICT DO NOTHING', {
67+
const result = await this.store!.run('INSERT INTO GroupKeys VALUES ($id, $groupKey, $streamId) ON CONFLICT DO NOTHING', {
6868
$id: key,
6969
$groupKey: value,
7070
$streamId: this.streamId,
7171
})
72+
73+
return !!result?.changes
7274
}
7375

7476
async delete(key: string) {

test/integration/Encryption.test.js

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -140,7 +140,7 @@ describeRepeats('decryption', () => {
140140
}))
141141

142142
client.once('error', done.reject)
143-
client.setNextGroupKey(stream.id, groupKey)
143+
await client.setNextGroupKey(stream.id, groupKey)
144144
// Publish after subscribed
145145
await Promise.all([
146146
client.publish(stream.id, msg),

test/unit/GroupKeyStore.test.ts

Lines changed: 61 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1,19 +1,72 @@
11
import crypto from 'crypto'
22
import { GroupKey } from '../../src/stream/Encryption'
3-
import GroupKeyStore from '../../src/stream/PersistentStore'
4-
import { uid, addAfterFn } from '../utils'
3+
import PersistentStore from '../../src/stream/PersistentStore'
4+
import { GroupKeyStore } from '../../src/stream/KeyExchange'
5+
import { uid, addAfterFn, describeRepeats } from '../utils'
56

6-
describe('GroupKeyStore', () => {
7+
describeRepeats('GroupKeyStore', () => {
78
let clientId: string
89
let streamId: string
910
let store: GroupKeyStore
1011

12+
beforeEach(() => {
13+
clientId = `0x${crypto.randomBytes(20).toString('hex')}`
14+
streamId = uid('stream')
15+
store = new GroupKeyStore({
16+
clientId,
17+
streamId,
18+
groupKeys: [],
19+
})
20+
})
21+
22+
afterEach(async () => {
23+
if (!store) { return }
24+
await store.clear()
25+
})
26+
27+
it('can get set and delete', async () => {
28+
const groupKey = GroupKey.generate()
29+
expect(await store.get(groupKey.id)).toBeFalsy()
30+
expect(await store.add(groupKey)).toBeTruthy()
31+
expect(await store.get(groupKey.id)).toEqual(groupKey)
32+
expect(await store.clear()).toBeTruthy()
33+
expect(await store.clear()).toBeFalsy()
34+
expect(await store.get(groupKey.id)).toBeFalsy()
35+
})
36+
37+
it('can set next and use', async () => {
38+
const groupKey = GroupKey.generate()
39+
await store.setNextGroupKey(groupKey)
40+
expect(await store.useGroupKey()).toEqual([groupKey, undefined])
41+
expect(await store.useGroupKey()).toEqual([groupKey, undefined])
42+
const groupKey2 = GroupKey.generate()
43+
await store.setNextGroupKey(groupKey2)
44+
expect(await store.useGroupKey()).toEqual([groupKey, groupKey2])
45+
expect(await store.useGroupKey()).toEqual([groupKey2, undefined])
46+
})
47+
48+
it('can set next in parallel and use', async () => {
49+
const groupKey = GroupKey.generate()
50+
const groupKey2 = GroupKey.generate()
51+
await Promise.all([
52+
store.setNextGroupKey(groupKey),
53+
store.setNextGroupKey(groupKey2),
54+
])
55+
expect(await store.useGroupKey()).toEqual([groupKey, undefined])
56+
})
57+
})
58+
59+
describeRepeats('PersistentStore', () => {
60+
let clientId: string
61+
let streamId: string
62+
let store: PersistentStore
63+
1164
const addAfter = addAfterFn()
1265

1366
beforeEach(() => {
1467
clientId = `0x${crypto.randomBytes(20).toString('hex')}`
1568
streamId = uid('stream')
16-
store = new GroupKeyStore({
69+
store = new PersistentStore({
1770
clientId,
1871
streamId,
1972
})
@@ -51,7 +104,7 @@ describe('GroupKeyStore', () => {
51104
})
52105

53106
it('can get set and delete in parallel', async () => {
54-
const store2 = new GroupKeyStore({
107+
const store2 = new PersistentStore({
55108
clientId,
56109
streamId,
57110
})
@@ -87,7 +140,7 @@ describe('GroupKeyStore', () => {
87140

88141
it('does not conflict with other streamIds', async () => {
89142
const streamId2 = uid('stream')
90-
const store2 = new GroupKeyStore({
143+
const store2 = new PersistentStore({
91144
clientId,
92145
streamId: streamId2,
93146
})
@@ -107,7 +160,7 @@ describe('GroupKeyStore', () => {
107160

108161
it('does not conflict with other clientIds', async () => {
109162
const clientId2 = `0x${crypto.randomBytes(20).toString('hex')}`
110-
const store2 = new GroupKeyStore({
163+
const store2 = new PersistentStore({
111164
clientId: clientId2,
112165
streamId,
113166
})
@@ -127,7 +180,7 @@ describe('GroupKeyStore', () => {
127180

128181
it('does not conflict with other clientIds', async () => {
129182
const clientId2 = `0x${crypto.randomBytes(20).toString('hex')}`
130-
const store2 = new GroupKeyStore({
183+
const store2 = new PersistentStore({
131184
clientId: clientId2,
132185
streamId,
133186
})

0 commit comments

Comments
 (0)