Skip to content
This repository was archived by the owner on Dec 21, 2021. It is now read-only.

Commit c205059

Browse files
Tuomas Koponentimoxley
authored andcommitted
Add detectFields implementation to Stream
1 parent 448485f commit c205059

File tree

4 files changed

+161
-6
lines changed

4 files changed

+161
-6
lines changed

src/stream/index.ts

Lines changed: 19 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
import { getEndpointUrl } from '../utils'
1+
import { getEndpointUrl, FieldDetector } from '../utils'
22
import authFetch from '../rest/authFetch'
33

44
import StorageNode from './StorageNode'
@@ -125,10 +125,24 @@ export default class Stream {
125125
}
126126

127127
async detectFields() {
128-
return authFetch(
129-
getEndpointUrl(this._client.options.restUrl, 'streams', this.id, 'detectFields'),
130-
this._client.session,
131-
)
128+
// Get last message of the stream to be used for field detecting
129+
const sub = await this._client.resend({
130+
stream: this.id,
131+
resend: {
132+
last: 1,
133+
},
134+
})
135+
const receivedMsgs = await sub.collect()
136+
137+
if (receivedMsgs.length > 0) {
138+
const lastMessage = receivedMsgs[0]
139+
const fd = new FieldDetector(lastMessage)
140+
const fields = fd.detect()
141+
142+
// Save field config back to the stream
143+
this.config.fields = fields
144+
await this.update()
145+
}
132146
}
133147

134148
async addToStorageNode(address: string) {

src/utils/FieldDetector.js

Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,33 @@
1+
export default class FieldDetector {
2+
message = null
3+
4+
constructor(message) {
5+
this.message = message
6+
}
7+
8+
detect() {
9+
if (this.message == null) {
10+
throw new Error('Invalid message provided to FieldDetector constructor')
11+
}
12+
13+
const content = this.message
14+
const fields = []
15+
16+
Object.keys(content).forEach((key) => {
17+
let type
18+
if (Array.isArray(content[key])) {
19+
type = 'list'
20+
} else if ((typeof content[key]) === 'object') {
21+
type = 'map'
22+
} else {
23+
type = typeof content[key]
24+
}
25+
fields.push({
26+
name: key,
27+
type,
28+
})
29+
})
30+
31+
return fields
32+
}
33+
}

src/utils/index.js

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,8 +11,9 @@ import pkg from '../../package.json'
1111

1212
import AggregatedError from './AggregatedError'
1313
import Scaffold from './Scaffold'
14+
import FieldDetector from './FieldDetector'
1415

15-
export { AggregatedError, Scaffold }
16+
export { AggregatedError, Scaffold, FieldDetector }
1617

1718
const UUID = uuidv4()
1819

test/integration/Stream.test.js

Lines changed: 107 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,107 @@
1+
import StreamrClient from '../../src'
2+
import { uid, fakePrivateKey, getPublishTestMessages } from '../utils'
3+
4+
import config from './config'
5+
6+
const createClient = (opts = {}) => new StreamrClient({
7+
...config.clientOptions,
8+
auth: {
9+
privateKey: fakePrivateKey(),
10+
},
11+
autoConnect: false,
12+
autoDisconnect: false,
13+
...opts,
14+
})
15+
16+
describe('Stream', () => {
17+
let client
18+
let stream
19+
20+
beforeEach(async () => {
21+
client = createClient()
22+
await client.connect()
23+
24+
stream = await client.createStream({
25+
name: uid('stream-integration-test')
26+
})
27+
})
28+
29+
afterEach(async () => {
30+
await client.disconnect()
31+
})
32+
33+
describe('detectFields()', () => {
34+
it('does detect primitive types', async () => {
35+
const msg = {
36+
number: 123,
37+
boolean: true,
38+
object: {
39+
k: 1,
40+
v: 2,
41+
},
42+
array: [1, 2, 3],
43+
string: 'test',
44+
}
45+
const publishTestMessages = getPublishTestMessages(client, {
46+
streamId: stream.id,
47+
waitForLast: true,
48+
createMessage: () => msg,
49+
})
50+
await publishTestMessages(1)
51+
52+
expect(stream.config.fields).toEqual([])
53+
await stream.detectFields()
54+
expect(stream.config.fields).toEqual([
55+
{
56+
name: 'number',
57+
type: 'number',
58+
},
59+
{
60+
name: 'boolean',
61+
type: 'boolean',
62+
},
63+
{
64+
name: 'object',
65+
type: 'map',
66+
},
67+
{
68+
name: 'array',
69+
type: 'list',
70+
},
71+
{
72+
name: 'string',
73+
type: 'string',
74+
},
75+
])
76+
})
77+
78+
it('skips unsupported types', async () => {
79+
const msg = {
80+
null: null,
81+
empty: {},
82+
func: () => null,
83+
nonexistent: undefined,
84+
symbol: Symbol('test'),
85+
}
86+
const publishTestMessages = getPublishTestMessages(client, {
87+
streamId: stream.id,
88+
waitForLast: true,
89+
createMessage: () => msg,
90+
})
91+
await publishTestMessages(1)
92+
93+
expect(stream.config.fields).toEqual([])
94+
await stream.detectFields()
95+
expect(stream.config.fields).toEqual([
96+
{
97+
name: 'null',
98+
type: 'map',
99+
},
100+
{
101+
name: 'empty',
102+
type: 'map',
103+
},
104+
])
105+
})
106+
})
107+
})

0 commit comments

Comments
 (0)