|
| 1 | +import { wait } from 'streamr-test-utils' |
| 2 | + |
| 3 | +import { describeRepeats, fakePrivateKey, uid, getPublishTestMessages, addAfterFn } from '../utils' |
| 4 | +// import { Defer } from '../../src/utils' |
| 5 | +import { StreamrClient } from '../../src/StreamrClient' |
| 6 | +import { GroupKey } from '../../src/stream/Encryption' |
| 7 | +import Connection from '../../src/Connection' |
| 8 | + |
| 9 | +import config from './config' |
| 10 | + |
| 11 | +const TIMEOUT = 30 * 1000 |
| 12 | + |
| 13 | +describeRepeats('decryption', () => { |
| 14 | + let publishTestMessages |
| 15 | + let expectErrors = 0 // check no errors by default |
| 16 | + let errors = [] |
| 17 | + let subscriber |
| 18 | + const addAfter = addAfterFn() |
| 19 | + |
| 20 | + const getOnError = (errs) => jest.fn((err) => { |
| 21 | + errs.push(err) |
| 22 | + }) |
| 23 | + |
| 24 | + let onError = jest.fn() |
| 25 | + let publisher |
| 26 | + let stream |
| 27 | + |
| 28 | + const createClient = (opts = {}) => { |
| 29 | + const c = new StreamrClient({ |
| 30 | + ...config.clientOptions, |
| 31 | + auth: { |
| 32 | + privateKey: fakePrivateKey(), |
| 33 | + }, |
| 34 | + autoConnect: false, |
| 35 | + autoDisconnect: false, |
| 36 | + // @ts-expect-error |
| 37 | + disconnectDelay: 1, |
| 38 | + publishAutoDisconnectDelay: 50, |
| 39 | + maxRetries: 2, |
| 40 | + ...opts, |
| 41 | + }) |
| 42 | + c.onError = jest.fn() |
| 43 | + c.on('error', onError) |
| 44 | + |
| 45 | + return c |
| 46 | + } |
| 47 | + |
| 48 | + beforeEach(() => { |
| 49 | + errors = [] |
| 50 | + expectErrors = 0 |
| 51 | + onError = getOnError(errors) |
| 52 | + }) |
| 53 | + |
| 54 | + afterEach(async () => { |
| 55 | + await wait(0) |
| 56 | + // ensure no unexpected errors |
| 57 | + expect(errors).toHaveLength(expectErrors) |
| 58 | + if (publisher) { |
| 59 | + expect(publisher.onError).toHaveBeenCalledTimes(expectErrors) |
| 60 | + } |
| 61 | + }) |
| 62 | + |
| 63 | + afterEach(async () => { |
| 64 | + await wait(0) |
| 65 | + if (publisher) { |
| 66 | + publisher.debug('disconnecting after test') |
| 67 | + await publisher.disconnect() |
| 68 | + } |
| 69 | + |
| 70 | + if (subscriber) { |
| 71 | + subscriber.debug('disconnecting after test') |
| 72 | + await subscriber.disconnect() |
| 73 | + } |
| 74 | + |
| 75 | + const openSockets = Connection.getOpen() |
| 76 | + if (openSockets !== 0) { |
| 77 | + await Connection.closeOpen() |
| 78 | + throw new Error(`sockets not closed: ${openSockets}`) |
| 79 | + } |
| 80 | + }) |
| 81 | + |
| 82 | + let publisherPrivateKey: string |
| 83 | + let subscriberPrivateKey: string |
| 84 | + |
| 85 | + async function setupPublisher(opts?: any) { |
| 86 | + const client = createClient(opts) |
| 87 | + await Promise.all([ |
| 88 | + client.session.getSessionToken(), |
| 89 | + client.connect(), |
| 90 | + ]) |
| 91 | + |
| 92 | + const name = uid('stream') |
| 93 | + stream = await client.createStream({ |
| 94 | + name, |
| 95 | + requireEncryptedData: true, |
| 96 | + }) |
| 97 | + |
| 98 | + await stream.addToStorageNode(config.clientOptions.storageNode.address) |
| 99 | + |
| 100 | + publishTestMessages = getPublishTestMessages(client, { |
| 101 | + stream, |
| 102 | + waitForLast: true, |
| 103 | + }) |
| 104 | + return client |
| 105 | + } |
| 106 | + |
| 107 | + beforeEach(async () => { |
| 108 | + publisherPrivateKey = fakePrivateKey() |
| 109 | + publisher = await setupPublisher({ |
| 110 | + id: 'publisher', |
| 111 | + auth: { |
| 112 | + privateKey: publisherPrivateKey, |
| 113 | + } |
| 114 | + }) |
| 115 | + subscriberPrivateKey = fakePrivateKey() |
| 116 | + subscriber = createClient({ |
| 117 | + id: 'subscriber', |
| 118 | + autoConnect: true, |
| 119 | + autoDisconnect: true, |
| 120 | + auth: { |
| 121 | + privateKey: subscriberPrivateKey, |
| 122 | + } |
| 123 | + }) |
| 124 | + const otherUser = await subscriber.getUserInfo() |
| 125 | + await stream.grantPermission('stream_get', otherUser.username) |
| 126 | + await stream.grantPermission('stream_subscribe', otherUser.username) |
| 127 | + const groupKey = GroupKey.generate() |
| 128 | + await publisher.setNextGroupKey(stream.id, groupKey) |
| 129 | + }) |
| 130 | + |
| 131 | + it('publisher persists group key', async () => { |
| 132 | + // ensure publisher can read a persisted group key |
| 133 | + // 1. publish some messages with publisher |
| 134 | + // 2. then disconnect publisher |
| 135 | + // 3. create new publisher with same key |
| 136 | + // 4. subscribe with subscriber |
| 137 | + // because original publisher is disconnected |
| 138 | + // subscriber will need to ask new publisher |
| 139 | + // for group keys, which the new publisher will have to read from |
| 140 | + // persistence |
| 141 | + const published = await publishTestMessages(5) |
| 142 | + await publisher.disconnect() |
| 143 | + const publisher2 = createClient({ |
| 144 | + id: 'publisher2', |
| 145 | + auth: { |
| 146 | + privateKey: publisherPrivateKey, |
| 147 | + } |
| 148 | + }) |
| 149 | + |
| 150 | + addAfter(async () => { |
| 151 | + await publisher2.disconnect() |
| 152 | + }) |
| 153 | + |
| 154 | + await publisher2.connect() |
| 155 | + |
| 156 | + // TODO: this should probably happen automatically if there are keys |
| 157 | + // also probably needs to create a connection handle |
| 158 | + await publisher2.publisher.startKeyExchange() |
| 159 | + |
| 160 | + const sub = await subscriber.subscribe({ |
| 161 | + stream: stream.id, |
| 162 | + resend: { |
| 163 | + last: 5, |
| 164 | + } |
| 165 | + }) |
| 166 | + const received = [] |
| 167 | + for await (const m of sub) { |
| 168 | + received.push(m.getParsedContent()) |
| 169 | + if (received.length === published.length) { |
| 170 | + break |
| 171 | + } |
| 172 | + } |
| 173 | + |
| 174 | + expect(received).toEqual(published) |
| 175 | + }, 2 * TIMEOUT) |
| 176 | + |
| 177 | + it('subscriber persists group key', async () => { |
| 178 | + // we want to check that subscriber can read a group key |
| 179 | + // persisted by another subscriber: |
| 180 | + // 1. create publisher and subscriber |
| 181 | + // 2. after subscriber gets first message |
| 182 | + // 3. disconnect both subscriber and publisher |
| 183 | + // 4. then create a new subscriber with same key as original subscriber |
| 184 | + // 5. and subscribe to same stream. |
| 185 | + // this should pick up group key persisted by first subscriber |
| 186 | + // publisher is disconnected, so can't ask for new group keys |
| 187 | + const sub = await subscriber.subscribe({ |
| 188 | + stream: stream.id, |
| 189 | + }) |
| 190 | + const published = await publishTestMessages(5) |
| 191 | + |
| 192 | + const received = [] |
| 193 | + for await (const m of sub) { |
| 194 | + received.push(m.getParsedContent()) |
| 195 | + if (received.length === 1) { |
| 196 | + break |
| 197 | + } |
| 198 | + } |
| 199 | + await subscriber.disconnect() |
| 200 | + await publisher.disconnect() |
| 201 | + |
| 202 | + const subscriber2 = createClient({ |
| 203 | + id: 'subscriber2', |
| 204 | + auth: { |
| 205 | + privateKey: subscriberPrivateKey |
| 206 | + } |
| 207 | + }) |
| 208 | + |
| 209 | + addAfter(async () => { |
| 210 | + await subscriber2.disconnect() |
| 211 | + }) |
| 212 | + |
| 213 | + await subscriber2.connect() |
| 214 | + const sub2 = await subscriber2.subscribe({ |
| 215 | + stream: stream.id, |
| 216 | + resend: { |
| 217 | + last: 5 |
| 218 | + } |
| 219 | + }) |
| 220 | + |
| 221 | + const received2 = [] |
| 222 | + for await (const m of sub2) { |
| 223 | + received2.push(m.getParsedContent()) |
| 224 | + if (received2.length === published.length) { |
| 225 | + break |
| 226 | + } |
| 227 | + } |
| 228 | + expect(received2).toEqual(published) |
| 229 | + expect(received).toEqual(published.slice(0, 1)) |
| 230 | + }, 2 * TIMEOUT) |
| 231 | + |
| 232 | + it('can run multiple publishers in parallel', async () => { |
| 233 | + const sub = await subscriber.subscribe({ |
| 234 | + stream: stream.id, |
| 235 | + }) |
| 236 | + |
| 237 | + // ensure publishers don't clobber each others data |
| 238 | + const publisher2 = createClient({ |
| 239 | + id: 'publisher2', |
| 240 | + auth: { |
| 241 | + privateKey: publisherPrivateKey, |
| 242 | + } |
| 243 | + }) |
| 244 | + |
| 245 | + addAfter(async () => { |
| 246 | + await publisher2.disconnect() |
| 247 | + }) |
| 248 | + |
| 249 | + await publisher2.connect() |
| 250 | + const publishTestMessages2 = getPublishTestMessages(publisher2, { |
| 251 | + stream, |
| 252 | + waitForLast: true, |
| 253 | + }) |
| 254 | + |
| 255 | + const [published1, published2] = await Promise.all([ |
| 256 | + publishTestMessages(5), |
| 257 | + publishTestMessages2(6), // use different lengths so we can differentiate who published what |
| 258 | + ]) |
| 259 | + |
| 260 | + const received1 = [] |
| 261 | + const received2 = [] |
| 262 | + for await (const m of sub) { |
| 263 | + const content = m.getParsedContent() |
| 264 | + // 'n of 6' messages belong to publisher2 |
| 265 | + if (content.value.endsWith('of 6')) { |
| 266 | + received2.push(content) |
| 267 | + } else { |
| 268 | + received1.push(content) |
| 269 | + } |
| 270 | + |
| 271 | + if (received1.length === published1.length && received2.length === published2.length) { |
| 272 | + break |
| 273 | + } |
| 274 | + } |
| 275 | + |
| 276 | + expect(received1).toEqual(published1) |
| 277 | + expect(received2).toEqual(published2) |
| 278 | + }, 2 * TIMEOUT) |
| 279 | +}) |
| 280 | + |
0 commit comments