Skip to content
This repository was archived by the owner on Dec 21, 2021. It is now read-only.

Commit b50eb2a

Browse files
authored
Merge pull request #196 from streamr-dev/NET-182-detect-fields
[NET-182] Add detectFields implementation to Stream
2 parents 448485f + ba22250 commit b50eb2a

File tree

4 files changed

+173
-32
lines changed

4 files changed

+173
-32
lines changed

src/StreamrClient.ts

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -337,7 +337,7 @@ export default class StreamrClient extends EventEmitter {
337337
return this.publisher.rotateGroupKey(...args)
338338
}
339339

340-
async subscribe(opts: Todo, onMessage: OnMessageCallback) {
340+
async subscribe(opts: Todo, onMessage?: OnMessageCallback) {
341341
let subTask: Todo
342342
let sub: Todo
343343
const hasResend = !!(opts.resend || opts.from || opts.to || opts.last)
@@ -372,7 +372,7 @@ export default class StreamrClient extends EventEmitter {
372372
await this.subscriber.unsubscribe(opts)
373373
}
374374

375-
async resend(opts: Todo, onMessage: OnMessageCallback) {
375+
async resend(opts: Todo, onMessage?: OnMessageCallback) {
376376
const task = this.subscriber.resend(opts)
377377
if (typeof onMessage !== 'function') {
378378
return task

src/stream/index.ts

Lines changed: 54 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -16,11 +16,39 @@ export enum StreamOperation {
1616

1717
export type StreamProperties = Todo
1818

19-
export default class Stream {
19+
const VALID_FIELD_TYPES = ['number', 'string', 'boolean', 'list', 'map'] as const
20+
21+
type Field = {
22+
name: string;
23+
type: typeof VALID_FIELD_TYPES[number];
24+
}
2025

26+
function getFieldType(value: any): (Field['type'] | undefined) {
27+
const type = typeof value
28+
switch (true) {
29+
case Array.isArray(value): {
30+
return 'list'
31+
}
32+
case type === 'object': {
33+
return 'map'
34+
}
35+
case (VALID_FIELD_TYPES as ReadonlyArray<string>).includes(type): {
36+
// see https://github.com/microsoft/TypeScript/issues/36275
37+
return type as Field['type']
38+
}
39+
default: {
40+
return undefined
41+
}
42+
}
43+
}
44+
45+
export default class Stream {
2146
// TODO add field definitions for all fields
2247
// @ts-expect-error
2348
id: string
49+
config: {
50+
fields: Field[];
51+
} = { fields: [] }
2452
_client: StreamrClient
2553

2654
constructor(client: StreamrClient, props: StreamProperties) {
@@ -125,10 +153,31 @@ export default class Stream {
125153
}
126154

127155
async detectFields() {
128-
return authFetch(
129-
getEndpointUrl(this._client.options.restUrl, 'streams', this.id, 'detectFields'),
130-
this._client.session,
131-
)
156+
// Get last message of the stream to be used for field detecting
157+
const sub = await this._client.resend({
158+
stream: this.id,
159+
resend: {
160+
last: 1,
161+
},
162+
})
163+
164+
const receivedMsgs = await sub.collect()
165+
166+
if (!receivedMsgs.length) { return }
167+
168+
const [lastMessage] = receivedMsgs
169+
170+
const fields = Object.entries(lastMessage).map(([name, value]) => {
171+
const type = getFieldType(value)
172+
return !!type && {
173+
name,
174+
type,
175+
}
176+
}).filter(Boolean) as Field[] // see https://github.com/microsoft/TypeScript/issues/30621
177+
178+
// Save field config back to the stream
179+
this.config.fields = fields
180+
await this.update()
132181
}
133182

134183
async addToStorageNode(address: string) {

test/integration/Stream.test.js

Lines changed: 117 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,117 @@
1+
import StreamrClient from '../../src/StreamrClient'
2+
import { uid, fakePrivateKey, getPublishTestMessages } from '../utils'
3+
4+
import config from './config'
5+
6+
const createClient = (opts = {}) => new StreamrClient({
7+
...config.clientOptions,
8+
auth: {
9+
privateKey: fakePrivateKey(),
10+
},
11+
autoConnect: false,
12+
autoDisconnect: false,
13+
...opts,
14+
})
15+
16+
describe('Stream', () => {
17+
let client
18+
let stream
19+
20+
beforeEach(async () => {
21+
client = createClient()
22+
await client.connect()
23+
24+
stream = await client.createStream({
25+
name: uid('stream-integration-test')
26+
})
27+
})
28+
29+
afterEach(async () => {
30+
await client.disconnect()
31+
})
32+
33+
describe('detectFields()', () => {
34+
it('does detect primitive types', async () => {
35+
const msg = {
36+
number: 123,
37+
boolean: true,
38+
object: {
39+
k: 1,
40+
v: 2,
41+
},
42+
array: [1, 2, 3],
43+
string: 'test',
44+
}
45+
const publishTestMessages = getPublishTestMessages(client, {
46+
streamId: stream.id,
47+
waitForLast: true,
48+
createMessage: () => msg,
49+
})
50+
await publishTestMessages(1)
51+
52+
expect(stream.config.fields).toEqual([])
53+
await stream.detectFields()
54+
const expectedFields = [
55+
{
56+
name: 'number',
57+
type: 'number',
58+
},
59+
{
60+
name: 'boolean',
61+
type: 'boolean',
62+
},
63+
{
64+
name: 'object',
65+
type: 'map',
66+
},
67+
{
68+
name: 'array',
69+
type: 'list',
70+
},
71+
{
72+
name: 'string',
73+
type: 'string',
74+
},
75+
]
76+
77+
expect(stream.config.fields).toEqual(expectedFields)
78+
const loadedStream = await client.getStream(stream.id)
79+
expect(loadedStream.config.fields).toEqual(expectedFields)
80+
})
81+
82+
it('skips unsupported types', async () => {
83+
const msg = {
84+
null: null,
85+
empty: {},
86+
func: () => null,
87+
nonexistent: undefined,
88+
symbol: Symbol('test'),
89+
// TODO: bigint: 10n,
90+
}
91+
const publishTestMessages = getPublishTestMessages(client, {
92+
streamId: stream.id,
93+
waitForLast: true,
94+
createMessage: () => msg,
95+
})
96+
await publishTestMessages(1)
97+
98+
expect(stream.config.fields).toEqual([])
99+
await stream.detectFields()
100+
const expectedFields = [
101+
{
102+
name: 'null',
103+
type: 'map',
104+
},
105+
{
106+
name: 'empty',
107+
type: 'map',
108+
},
109+
]
110+
111+
expect(stream.config.fields).toEqual(expectedFields)
112+
113+
const loadedStream = await client.getStream(stream.id)
114+
expect(loadedStream.config.fields).toEqual(expectedFields)
115+
})
116+
})
117+
})

test/integration/StreamEndpoints.test.js

Lines changed: 0 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,4 @@
11
import { ethers } from 'ethers'
2-
import { wait } from 'streamr-test-utils'
32

43
import StreamrClient from '../../src/StreamrClient'
54
import { uid } from '../utils'
@@ -176,30 +175,6 @@ function TestStreamEndpoints(getName) {
176175
})
177176
})
178177

179-
describe.skip('Stream configuration', () => {
180-
it('Stream.detectFields', async () => {
181-
await client.connect()
182-
await client.publish(createdStream.id, {
183-
foo: 'bar',
184-
count: 0,
185-
})
186-
// Need time to propagate to storage
187-
await wait(10000)
188-
const stream = await createdStream.detectFields()
189-
expect(stream.config.fields).toEqual([
190-
{
191-
name: 'foo',
192-
type: 'string',
193-
},
194-
{
195-
name: 'count',
196-
type: 'number',
197-
},
198-
])
199-
await client.disconnect()
200-
}, 15000)
201-
})
202-
203178
describe('Stream permissions', () => {
204179
it('Stream.getPermissions', async () => {
205180
const permissions = await createdStream.getPermissions()

0 commit comments

Comments
 (0)