Skip to content
This repository was archived by the owner on Dec 21, 2021. It is now read-only.

Commit 165f10c

Browse files
authored
Merge pull request #216 from streamr-dev/cherry-picked-improvements
Error handling improvements.
2 parents 1e49df0 + 9bd0bae commit 165f10c

26 files changed

+1760
-599
lines changed

.github/workflows/test-build.yml

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -184,7 +184,11 @@ jobs:
184184
npm install
185185
npm link streamr-client
186186
./gradlew fatjar
187-
- name: run-client-testing
188-
timeout-minutes: 10
187+
- uses: nick-invision/retry@v2
188+
name: run-client-testing
189189
working-directory: streamr-client-testing
190-
run: java -jar build/libs/client_testing-1.0-SNAPSHOT.jar -s $TEST_NAME -c config/$CONFIG_NAME.conf -n $NUM_MESSAGES
190+
with:
191+
max_attempts: 2
192+
timeout_minutes: 3
193+
retry_on: error
194+
command: java -jar build/libs/client_testing-1.0-SNAPSHOT.jar -s $TEST_NAME -c config/$CONFIG_NAME.conf -n $NUM_MESSAGES

src/Config.ts

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,8 @@ export type EthereumConfig = ExternalProvider|JsonRpcFetchFunc
2020
* @category Important
2121
*/
2222
export type StrictStreamrClientOptions = {
23+
/** Custom human-readable debug id for client. Used in logging. Unique id will be generated regardless. */
24+
id?: string,
2325
/**
2426
* Authentication: identity used by this StreamrClient instance.
2527
* Can contain member privateKey or (window.)ethereum
@@ -169,8 +171,8 @@ export default function ClientConfig(opts: StreamrClientOptions = {}) {
169171
...opts.dataUnion
170172
},
171173
cache: {
172-
...opts.cache,
173174
...STREAM_CLIENT_DEFAULTS.cache,
175+
...opts.cache,
174176
}
175177
// NOTE: sidechain is not merged with the defaults
176178
}

src/Ethereum.js

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -20,8 +20,8 @@ export default class StreamrEthereum {
2020
const key = auth.privateKey
2121
const address = getAddress(computeAddress(key))
2222
this._getAddress = async () => address
23-
this.getSigner = () => new Wallet(key, this.getMainnetProvider())
24-
this.getSidechainSigner = async () => new Wallet(key, this.getSidechainProvider())
23+
this._getSigner = () => new Wallet(key, this.getMainnetProvider())
24+
this._getSidechainSigner = async () => new Wallet(key, this.getSidechainProvider())
2525
} else if (auth.ethereum) {
2626
this._getAddress = async () => {
2727
try {
@@ -61,6 +61,10 @@ export default class StreamrEthereum {
6161
}
6262
}
6363

64+
canEncrypt() {
65+
return !!(this._getAddress && this._getSigner)
66+
}
67+
6468
async getAddress() {
6569
if (!this._getAddress) {
6670
// _getAddress is assigned in constructor

src/StreamrClient.ts

Lines changed: 14 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -188,7 +188,7 @@ export class StreamrClient extends EventEmitter { // eslint-disable-line no-rede
188188
// TODO annotate connection parameter as internal parameter if possible?
189189
constructor(options: StreamrClientOptions = {}, connection?: StreamrConnection) {
190190
super()
191-
this.id = counterId(`${this.constructor.name}:${uid}`)
191+
this.id = counterId(`${this.constructor.name}:${uid}${options.id || ''}`)
192192
this.debug = Debug(this.id)
193193

194194
this.options = Config(options)
@@ -217,9 +217,9 @@ export class StreamrClient extends EventEmitter { // eslint-disable-line no-rede
217217
.on('disconnected', this.onConnectionDisconnected)
218218
.on('error', this.onConnectionError)
219219

220+
this.ethereum = new StreamrEthereum(this)
220221
this.publisher = Publisher(this)
221222
this.subscriber = new Subscriber(this)
222-
this.ethereum = new StreamrEthereum(this)
223223

224224
Plugin(this, new StreamEndpoints(this))
225225
Plugin(this, new LoginEndpoints(this))
@@ -357,10 +357,14 @@ export class StreamrClient extends EventEmitter { // eslint-disable-line no-rede
357357
let subTask: Todo
358358
let sub: Todo
359359
const hasResend = !!(opts.resend || opts.from || opts.to || opts.last)
360-
const onEnd = () => {
360+
const onEnd = (err?: Error) => {
361361
if (sub && typeof onMessage === 'function') {
362362
sub.off('message', onMessage)
363363
}
364+
365+
if (err) {
366+
throw err
367+
}
364368
}
365369

366370
if (hasResend) {
@@ -429,6 +433,13 @@ export class StreamrClient extends EventEmitter { // eslint-disable-line no-rede
429433
return this.getAddress()
430434
}
431435

436+
/**
437+
* True if authenticated with private key/ethereum provider
438+
*/
439+
canEncrypt() {
440+
return this.ethereum.canEncrypt()
441+
}
442+
432443
/**
433444
* Get token balance in "wei" (10^-18 parts) for given address
434445
*/

src/publish/Encrypt.ts

Lines changed: 26 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -10,15 +10,27 @@ const { StreamMessage } = MessageLayer
1010
type PublisherKeyExhangeAPI = ReturnType<typeof PublisherKeyExhange>
1111

1212
export default function Encrypt(client: StreamrClient) {
13-
const publisherKeyExchange = PublisherKeyExhange(client, {
14-
groupKeys: {
15-
...client.options.groupKeys,
13+
let publisherKeyExchange: ReturnType<typeof PublisherKeyExhange>
14+
15+
function getPublisherKeyExchange() {
16+
if (!publisherKeyExchange) {
17+
publisherKeyExchange = PublisherKeyExhange(client, {
18+
groupKeys: {
19+
...client.options.groupKeys,
20+
}
21+
})
1622
}
17-
})
23+
return publisherKeyExchange
24+
}
25+
1826
async function encrypt(streamMessage: MessageLayer.StreamMessage, stream: Stream) {
27+
if (!client.canEncrypt()) {
28+
return
29+
}
30+
1931
if (
20-
!publisherKeyExchange.hasAnyGroupKey(stream.id)
21-
&& !stream.requireEncryptedData
32+
!stream.requireEncryptedData
33+
&& !getPublisherKeyExchange().hasAnyGroupKey(stream.id)
2234
) {
2335
// not needed
2436
return
@@ -27,19 +39,23 @@ export default function Encrypt(client: StreamrClient) {
2739
if (streamMessage.messageType !== StreamMessage.MESSAGE_TYPES.MESSAGE) {
2840
return
2941
}
30-
const groupKey = await publisherKeyExchange.useGroupKey(stream.id)
42+
const groupKey = await getPublisherKeyExchange().useGroupKey(stream.id)
3143
await EncryptionUtil.encryptStreamMessage(streamMessage, groupKey)
3244
}
3345

3446
return Object.assign(encrypt, {
3547
setNextGroupKey(...args: Parameters<PublisherKeyExhangeAPI['setNextGroupKey']>) {
36-
return publisherKeyExchange.setNextGroupKey(...args)
48+
return getPublisherKeyExchange().setNextGroupKey(...args)
3749
},
3850
rotateGroupKey(...args: Parameters<PublisherKeyExhangeAPI['rotateGroupKey']>) {
39-
return publisherKeyExchange.rotateGroupKey(...args)
51+
return getPublisherKeyExchange().rotateGroupKey(...args)
52+
},
53+
start() {
54+
return getPublisherKeyExchange().start()
4055
},
4156
stop() {
42-
return publisherKeyExchange.stop()
57+
if (!publisherKeyExchange) { return Promise.resolve() }
58+
return getPublisherKeyExchange().stop()
4359
}
4460
})
4561
}

src/publish/index.js

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -122,6 +122,7 @@ function getCreateStreamMessage(client) {
122122
[streamId, streamPartition, publisherId, msgChainId].join('|')
123123
),
124124
...cacheOptions,
125+
maxAge: undefined
125126
}), {
126127
clear() {
127128
mem.clear(getMsgChainer)
@@ -188,6 +189,9 @@ function getCreateStreamMessage(client) {
188189
rotateGroupKey(maybeStreamId) {
189190
return encrypt.rotateGroupKey(maybeStreamId)
190191
},
192+
startKeyExchange() {
193+
return encrypt.start()
194+
},
191195
clear() {
192196
computeStreamPartition.clear()
193197
getMsgChainer.clear()
@@ -300,6 +304,9 @@ export default function Publisher(client) {
300304
throw error
301305
}
302306
},
307+
async startKeyExchange() {
308+
return createStreamMessage.startKeyExchange()
309+
},
303310
async stop() {
304311
sendQueue.clear()
305312
createStreamMessage.clear()

0 commit comments

Comments
 (0)