Skip to content
This repository was archived by the owner on Dec 21, 2021. It is now read-only.

Commit fb3eed1

Browse files
authored
Public type definitions (#218)
Define valid types for public API methods Also small bug fix for parsing stream definition options (e.g. when filtering subscriptions with getSubscription(opts))
1 parent 217887d commit fb3eed1

File tree

8 files changed

+154
-56
lines changed

8 files changed

+154
-56
lines changed

src/Connection.ts

Lines changed: 14 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,16 @@ const wait = (ms: number) => new Promise((resolve) => setTimeout(resolve, ms))
1313
// @ts-expect-error
1414
Debug.formatters.n = (v) => Debug.humanize(v)
1515

16+
export interface ConnectionOptions {
17+
url?: string,
18+
autoConnect?: boolean
19+
autoDisconnect?: boolean
20+
disconnectDelay?: number
21+
maxRetries?: number
22+
retryBackoffFactor?: number
23+
maxRetryWait?: number
24+
}
25+
1626
export class ConnectionError extends Error {
1727
reason?: Todo
1828

@@ -120,7 +130,7 @@ const STATE = {
120130
}
121131

122132
/* eslint-disable no-underscore-dangle, no-param-reassign */
123-
function SocketConnector(connection: Todo) {
133+
function SocketConnector(connection: Connection) {
124134
let next: Todo
125135
let socket: Todo
126136
let startedConnecting = false
@@ -189,7 +199,7 @@ function SocketConnector(connection: Todo) {
189199
// connect
190200
async () => {
191201
startedConnecting = true
192-
socket = await OpenWebSocket(connection.options.url, {
202+
socket = await OpenWebSocket(connection.options.url!, {
193203
perMessageDeflate: false,
194204
debug: connection._debug,
195205
})
@@ -287,7 +297,7 @@ const DEFAULT_MAX_RETRIES = 10
287297
export default class Connection extends EventEmitter {
288298

289299
_debug: Todo
290-
options: Todo
300+
options: ConnectionOptions
291301
retryCount: Todo
292302
wantsState: Todo
293303
connectionHandles: Todo
@@ -312,7 +322,7 @@ export default class Connection extends EventEmitter {
312322
}))
313323
}
314324

315-
constructor(options = {}, debug?: Debug.Debugger) {
325+
constructor(options: ConnectionOptions = {}, debug?: Debug.Debugger) {
316326
super()
317327
this._debug = debug !== undefined
318328
? debug.extend(counterId(this.constructor.name))

src/StreamrClient.ts

Lines changed: 28 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@ import { validateOptions } from './stream/utils'
77
import Config, { StreamrClientOptions, StrictStreamrClientOptions } from './Config'
88
import StreamrEthereum from './Ethereum'
99
import Session from './Session'
10-
import Connection, { ConnectionError } from './Connection'
10+
import Connection, { ConnectionError, ConnectionOptions } from './Connection'
1111
import Publisher from './publish'
1212
import { Subscriber, Subscription } from './subscribe'
1313
import { getUserId } from './user'
@@ -18,21 +18,31 @@ import { DataUnion, DataUnionDeployOptions } from './dataunion/DataUnion'
1818
import { BigNumber } from '@ethersproject/bignumber'
1919
import { getAddress } from '@ethersproject/address'
2020
import { Contract } from '@ethersproject/contracts'
21+
import { StreamPartDefinition } from './stream'
2122

2223
// TODO get metadata type from streamr-protocol-js project (it doesn't export the type definitions yet)
2324
export type OnMessageCallback = MaybeAsync<(message: any, metadata: any) => void>
2425

26+
export type ResendOptions = {
27+
from?: { timestamp: number, sequenceNumber?: number }
28+
to?: { timestamp: number, sequenceNumber?: number }
29+
last?: number
30+
}
31+
32+
export type SubscribeOptions = {
33+
resend?: ResendOptions
34+
} & ResendOptions
35+
2536
interface MessageEvent {
2637
data: any
2738
}
2839

2940
/**
3041
* Wrap connection message events with message parsing.
3142
*/
32-
3343
class StreamrConnection extends Connection {
3444
// TODO define args type when we convert Connection class to TypeScript
35-
constructor(options: Todo, debug?: Debug.Debugger) {
45+
constructor(options: ConnectionOptions, debug?: Debug.Debugger) {
3646
super(options, debug)
3747
this.on('message', this.onConnectionMessage)
3848
}
@@ -160,6 +170,7 @@ export class StreamrClient extends EventEmitter {
160170
/** @internal */
161171
ethereum: StreamrEthereum
162172

173+
// TODO annotate connection parameter as internal parameter if possible?
163174
constructor(options: StreamrClientOptions = {}, connection?: StreamrConnection) {
164175
super()
165176
this.id = counterId(`${this.constructor.name}:${uid}`)
@@ -280,13 +291,13 @@ export class StreamrClient extends EventEmitter {
280291
])
281292
}
282293

283-
getSubscriptions(...args: Todo) {
284-
return this.subscriber.getAll(...args)
294+
getSubscriptions(): Subscription[] {
295+
return this.subscriber.getAll()
285296
}
286297

287-
getSubscription(...args: Todo) {
298+
getSubscription(definition: StreamPartDefinition) {
288299
// @ts-expect-error
289-
return this.subscriber.get(...args)
300+
return this.subscriber.get(definition)
290301
}
291302

292303
async ensureConnected() {
@@ -301,8 +312,8 @@ export class StreamrClient extends EventEmitter {
301312
return this.session.logout()
302313
}
303314

304-
async publish(...args: Todo) {
305-
return this.publisher.publish(...args)
315+
async publish(streamObjectOrId: StreamPartDefinition, content: object, timestamp?: number|string|Date, partitionKey?: string) {
316+
return this.publisher.publish(streamObjectOrId, content, timestamp, partitionKey)
306317
}
307318

308319
async getUserId() {
@@ -319,7 +330,7 @@ export class StreamrClient extends EventEmitter {
319330
return this.publisher.rotateGroupKey(...args)
320331
}
321332

322-
async subscribe(opts: Todo, onMessage?: OnMessageCallback) {
333+
async subscribe(opts: SubscribeOptions & StreamPartDefinition, onMessage?: OnMessageCallback) {
323334
let subTask: Todo
324335
let sub: Todo
325336
const hasResend = !!(opts.resend || opts.from || opts.to || opts.last)
@@ -350,10 +361,11 @@ export class StreamrClient extends EventEmitter {
350361
return subTask
351362
}
352363

353-
async unsubscribe(opts: Todo) {
354-
await this.subscriber.unsubscribe(opts)
364+
async unsubscribe(subscription: Subscription) {
365+
await this.subscriber.unsubscribe(subscription)
355366
}
356367

368+
/** @internal */
357369
async resend(opts: Todo, onMessage?: OnMessageCallback): Promise<Subscription> {
358370
const task = this.subscriber.resend(opts)
359371
if (typeof onMessage !== 'function') {
@@ -373,12 +385,12 @@ export class StreamrClient extends EventEmitter {
373385
return task
374386
}
375387

376-
enableAutoConnect(...args: Todo) {
377-
return this.connection.enableAutoConnect(...args)
388+
enableAutoConnect(autoConnect?: boolean) {
389+
return this.connection.enableAutoConnect(autoConnect)
378390
}
379391

380-
enableAutoDisconnect(...args: Todo) {
381-
return this.connection.enableAutoDisconnect(...args)
392+
enableAutoDisconnect(autoDisconnect?: boolean) {
393+
return this.connection.enableAutoDisconnect(autoDisconnect)
382394
}
383395

384396
async getAddress(): Promise<EthereumAddress> {

src/rest/StreamEndpoints.ts

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@ import StreamPart from '../stream/StreamPart'
1111
import { isKeyExchangeStream } from '../stream/KeyExchange'
1212

1313
import authFetch, { ErrorCode, NotFoundError } from './authFetch'
14-
import { EthereumAddress, Todo } from '../types'
14+
import { EthereumAddress } from '../types'
1515
import { StreamrClient } from '../StreamrClient'
1616
// TODO change this import when streamr-client-protocol exports StreamMessage type or the enums types directly
1717
import { ContentType, EncryptionType, SignatureType, StreamMessageType } from 'streamr-client-protocol/dist/src/protocol/message_layer/StreamMessage'
@@ -226,6 +226,7 @@ export class StreamEndpoints {
226226
}
227227

228228
async getStreamLast(streamObjectOrId: Stream|string): Promise<StreamMessageAsObject> {
229+
// @ts-expect-error
229230
const { streamId, streamPartition = 0, count = 1 } = validateOptions(streamObjectOrId)
230231
this.client.debug('getStreamLast %o', {
231232
streamId,
@@ -234,6 +235,7 @@ export class StreamEndpoints {
234235
})
235236

236237
const url = (
238+
// @ts-expect-error
237239
getEndpointUrl(this.client.options.restUrl, 'streams', streamId, 'data', 'partitions', streamPartition, 'last')
238240
+ `?${qs.stringify({ count })}`
239241
)
@@ -252,7 +254,7 @@ export class StreamEndpoints {
252254
return result
253255
}
254256

255-
async publishHttp(streamObjectOrId: Stream|string, data: Todo, requestOptions: Todo = {}, keepAlive: boolean = true) {
257+
async publishHttp(streamObjectOrId: Stream|string, data: any, requestOptions: any = {}, keepAlive: boolean = true) {
256258
let streamId
257259
if (streamObjectOrId instanceof Stream) {
258260
streamId = streamObjectOrId.id

src/stream/index.ts

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,11 @@ import authFetch from '../rest/authFetch'
33

44
import StorageNode from './StorageNode'
55
import { StreamrClient } from '../StreamrClient'
6-
import { Todo } from '../types'
6+
7+
// TODO explicit types: e.g. we never provide both streamId and id, or both streamPartition and partition
8+
export type StreamPartDefinition = string | { streamId?: string, streamPartition?: number, id?: string, partition?: number, stream?: Stream }
9+
10+
export type ValidatedStreamPartDefinition = { streamId: string, streamPartition: number, key: string}
711

812
interface StreamPermisionBase {
913
id: number
@@ -247,7 +251,7 @@ export class Stream {
247251
return json.map((item: any) => new StorageNode(item.storageNodeAddress))
248252
}
249253

250-
async publish(...theArgs: Todo) {
251-
return this._client.publish(this.id, ...theArgs)
254+
async publish(content: object, timestamp?: number|string|Date, partitionKey?: string) {
255+
return this._client.publish(this.id, content, timestamp, partitionKey)
252256
}
253257
}

src/stream/utils.js renamed to src/stream/utils.ts

Lines changed: 19 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -7,8 +7,11 @@ import { inspect } from 'util'
77
import { ControlLayer } from 'streamr-client-protocol'
88

99
import { pTimeout } from '../utils'
10+
import { Todo } from '../types'
11+
import { StreamrClient } from '../StreamrClient'
12+
import { StreamPartDefinition, ValidatedStreamPartDefinition } from '.'
1013

11-
export function StreamKey({ streamId, streamPartition = 0 }) {
14+
export function StreamKey({ streamId, streamPartition = 0 }: Todo) {
1215
if (streamId == null) { throw new Error(`StreamKey: invalid streamId (${typeof streamId}): ${streamId}`) }
1316

1417
if (!Number.isInteger(streamPartition) || streamPartition < 0) {
@@ -17,13 +20,13 @@ export function StreamKey({ streamId, streamPartition = 0 }) {
1720
return `${streamId}::${streamPartition}`
1821
}
1922

20-
export function validateOptions(optionsOrStreamId) {
23+
export function validateOptions(optionsOrStreamId: StreamPartDefinition): ValidatedStreamPartDefinition {
2124
if (!optionsOrStreamId) {
2225
throw new Error('streamId is required!')
2326
}
2427

2528
// Backwards compatibility for giving a streamId as first argument
26-
let options = {}
29+
let options: Todo = {}
2730
if (typeof optionsOrStreamId === 'string') {
2831
options = {
2932
streamId: optionsOrStreamId,
@@ -42,7 +45,7 @@ export function validateOptions(optionsOrStreamId) {
4245
options.streamId = optionsOrStreamId.id
4346
}
4447

45-
if (optionsOrStreamId.partition == null && optionsOrStreamId.streamPartition == null) {
48+
if (optionsOrStreamId.partition != null && optionsOrStreamId.streamPartition == null) {
4649
options.streamPartition = optionsOrStreamId.partition
4750
}
4851

@@ -89,7 +92,7 @@ export async function waitForMatchingMessage({
8992
rejectOnTimeout = true,
9093
timeoutMessage,
9194
cancelTask,
92-
}) {
95+
}: Todo) {
9396
if (typeof matchFn !== 'function') {
9497
throw new Error(`matchFn required, got: (${typeof matchFn}) ${matchFn}`)
9598
}
@@ -98,7 +101,7 @@ export async function waitForMatchingMessage({
98101
let cleanup = () => {}
99102

100103
const matchTask = new Promise((resolve, reject) => {
101-
const tryMatch = (...args) => {
104+
const tryMatch = (...args: Todo[]) => {
102105
try {
103106
return matchFn(...args)
104107
} catch (err) {
@@ -107,19 +110,20 @@ export async function waitForMatchingMessage({
107110
return false
108111
}
109112
}
110-
let onDisconnected
111-
const onResponse = (res) => {
113+
let onDisconnected: Todo
114+
const onResponse = (res: Todo) => {
112115
if (!tryMatch(res)) { return }
113116
// clean up err handler
114117
cleanup()
115118
resolve(res)
116119
}
117120

118-
const onErrorResponse = (res) => {
121+
const onErrorResponse = (res: Todo) => {
119122
if (!tryMatch(res)) { return }
120123
// clean up success handler
121124
cleanup()
122125
const error = new Error(res.errorMessage)
126+
// @ts-expect-error
123127
error.code = res.errorCode
124128
reject(error)
125129
}
@@ -128,19 +132,20 @@ export async function waitForMatchingMessage({
128132
if (cancelTask) { cancelTask.catch(() => {}) } // ignore
129133
connection.off('disconnected', onDisconnected)
130134
connection.off(ControlMessage.TYPES.ErrorResponse, onErrorResponse)
131-
types.forEach((type) => {
135+
types.forEach((type: Todo) => {
132136
connection.off(type, onResponse)
133137
})
134138
}
135139

136-
types.forEach((type) => {
140+
types.forEach((type: Todo) => {
137141
connection.on(type, onResponse)
138142
})
139143

140144
connection.on(ControlMessage.TYPES.ErrorResponse, onErrorResponse)
141145

142146
onDisconnected = () => {
143147
cleanup()
148+
// @ts-expect-error
144149
resolve() // noop
145150
}
146151

@@ -171,7 +176,7 @@ export async function waitForMatchingMessage({
171176
* Wait for matching response types to requestId, or ErrorResponse.
172177
*/
173178

174-
export async function waitForResponse({ requestId, timeoutMessage = `Waiting for response to: ${requestId}.`, ...opts }) {
179+
export async function waitForResponse({ requestId, timeoutMessage = `Waiting for response to: ${requestId}.`, ...opts }: Todo) {
175180
if (requestId == null) {
176181
throw new Error(`requestId required, got: (${typeof requestId}) ${requestId}`)
177182
}
@@ -180,13 +185,13 @@ export async function waitForResponse({ requestId, timeoutMessage = `Waiting for
180185
...opts,
181186
requestId,
182187
timeoutMessage,
183-
matchFn(res) {
188+
matchFn(res: Todo) {
184189
return res.requestId === requestId
185190
}
186191
})
187192
}
188193

189-
export async function waitForRequestResponse(client, request, opts = {}) {
194+
export async function waitForRequestResponse(client: StreamrClient, request: Todo, opts: Todo = {}) {
190195
return waitForResponse({
191196
connection: client.connection,
192197
types: PAIRS.get(request.type),

0 commit comments

Comments
 (0)