Skip to content
This repository was archived by the owner on Dec 21, 2021. It is now read-only.

Commit c1d72c3

Browse files
authored
NET-236: Create stream by path (#256)
The id parameter of client.createStream can be a path (or full stream id, as previously). It will be prefixed with a lowercased Ethereum address derived from user's private key. E.g. client.createStream({ id: '/foo/bar' }) // -> creates 0xabcdeabcde123456789012345678901234567890/foo/bar
1 parent 6c56fe2 commit c1d72c3

File tree

6 files changed

+166
-49
lines changed

6 files changed

+166
-49
lines changed

README.md

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -94,8 +94,8 @@ See "Subscription options" for resend options
9494
### Programmatically creating a stream
9595

9696
```js
97-
const stream = await client.getOrCreateStream({
98-
name: 'My awesome stream created via the API',
97+
const stream = await client.createStream({
98+
id: '/foo/bar', // or 0x1234567890123456789012345678901234567890/foo/bar or mydomain.eth/foo/bar
9999
})
100100
console.log(`Stream ${stream.id} has been created!`)
101101

@@ -328,7 +328,7 @@ All the below functions return a Promise which gets resolved with the result.
328328
| getStream(streamId) | Fetches a stream object from the API. |
329329
| listStreams(query) | Fetches an array of stream objects from the API. For the query params, consult the [API docs](https://api-explorer.streamr.com). |
330330
| getStreamByName(name) | Fetches a stream which exactly matches the given name. |
331-
| createStream(\[properties]) | Creates a stream with the given properties. For more information on the stream properties, consult the [API docs](https://api-explorer.streamr.com). |
331+
| createStream(\[properties]) | Creates a stream with the given properties. For more information on the stream properties, consult the [API docs](https://api-explorer.streamr.com). If you specify `id`, it can be a full streamId or a path (e.g. `/foo/bar` will create a stream with id `<your-etherereum-address>/foo/bar` if you have authenticated with a private key)|
332332
| getOrCreateStream(properties) | Gets a stream with the id or name given in `properties`, or creates it if one is not found. |
333333
| publish(streamId, message, timestamp, partitionKey) | Publishes a new message to the given stream. |
334334

src/rest/StreamEndpoints.ts

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@ import qs from 'qs'
55
import debugFactory from 'debug'
66

77
import { getEndpointUrl } from '../utils'
8-
import { validateOptions } from '../stream/utils'
8+
import { createStreamId, validateOptions } from '../stream/utils'
99
import { Stream, StreamOperation, StreamProperties } from '../stream'
1010
import { StreamPart } from '../stream/StreamPart'
1111
import { isKeyExchangeStream } from '../stream/encryption/KeyExchangeUtils'
@@ -133,18 +133,22 @@ export class StreamEndpoints {
133133

134134
/**
135135
* @category Important
136+
* @param props - if id is specified, it can be full streamId or path
136137
*/
137138
async createStream(props?: Partial<StreamProperties>) {
138139
this.client.debug('createStream %o', {
139140
props,
140141
})
141-
142+
const body = (props?.id !== undefined) ? {
143+
...props,
144+
id: await createStreamId(props.id, () => this.client.getAddress())
145+
} : props
142146
const json = await authFetch<StreamProperties>(
143147
getEndpointUrl(this.client.options.restUrl, 'streams'),
144148
this.client.session,
145149
{
146150
method: 'POST',
147-
body: JSON.stringify(props),
151+
body: JSON.stringify(body),
148152
},
149153
)
150154
return new Stream(this.client, json)

src/stream/utils.ts

Lines changed: 20 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@ import { inspect } from 'util'
77
import { ControlLayer } from 'streamr-client-protocol'
88

99
import { pTimeout } from '../utils'
10-
import { Todo } from '../types'
10+
import { EthereumAddress, Todo } from '../types'
1111
import { StreamrClient } from '../StreamrClient'
1212
import { StreamPartDefinition, ValidatedStreamPartDefinition } from '.'
1313

@@ -199,3 +199,22 @@ export async function waitForRequestResponse(client: StreamrClient, request: Tod
199199
...opts, // e.g. timeout, rejectOnTimeout
200200
})
201201
}
202+
203+
export const createStreamId = async (streamIdOrPath: string, ownerProvider?: () => Promise<EthereumAddress|undefined>) => {
204+
if (streamIdOrPath === undefined) {
205+
throw new Error('Missing stream id')
206+
}
207+
208+
if (!streamIdOrPath.startsWith('/')) {
209+
return streamIdOrPath
210+
}
211+
212+
if (ownerProvider === undefined) {
213+
throw new Error(`Owner provider missing for stream id: ${streamIdOrPath}`)
214+
}
215+
const owner = await ownerProvider()
216+
if (owner === undefined) {
217+
throw new Error(`Owner missing for stream id: ${streamIdOrPath}`)
218+
}
219+
return owner.toLowerCase() + streamIdOrPath
220+
}

test/integration/StreamEndpoints.test.ts

Lines changed: 47 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@ import { Stream, StreamOperation } from '../../src/stream'
44
import { StorageNode } from '../../src/stream/StorageNode'
55

66
import { StreamrClient } from '../../src/StreamrClient'
7-
import { uid } from '../utils'
7+
import { uid, fakeAddress } from '../utils'
88

99
import config from './config'
1010

@@ -15,6 +15,7 @@ import config from './config'
1515
function TestStreamEndpoints(getName: () => string) {
1616
let client: StreamrClient
1717
let wallet: Wallet
18+
let createdStreamPath: string
1819
let createdStream: Stream
1920

2021
const createClient = (opts = {}) => new StreamrClient({
@@ -34,7 +35,9 @@ function TestStreamEndpoints(getName: () => string) {
3435
})
3536

3637
beforeAll(async () => {
38+
createdStreamPath = `/StreamEndpoints-${Date.now()}`
3739
createdStream = await client.createStream({
40+
id: `${wallet.address}${createdStreamPath}`,
3841
name: getName(),
3942
requireSignedData: true,
4043
requireEncryptedData: false,
@@ -55,6 +58,22 @@ function TestStreamEndpoints(getName: () => string) {
5558
expect(stream.requireEncryptedData).toBe(true)
5659
})
5760

61+
it('valid id', async () => {
62+
const newId = `${wallet.address}/StreamEndpoints-createStream-newId-${Date.now()}`
63+
const newStream = await client.createStream({
64+
id: newId,
65+
})
66+
expect(newStream.id).toEqual(newId)
67+
})
68+
69+
it('valid path', async () => {
70+
const newPath = `/StreamEndpoints-createStream-newPath-${Date.now()}`
71+
const newStream = await client.createStream({
72+
id: newPath,
73+
})
74+
expect(newStream.id).toEqual(`${wallet.address.toLowerCase()}${newPath}`)
75+
})
76+
5877
it('invalid id', () => {
5978
return expect(() => client.createStream({ id: 'invalid.eth/foobar' })).rejects.toThrow(ValidationError)
6079
})
@@ -68,7 +87,7 @@ function TestStreamEndpoints(getName: () => string) {
6887
})
6988

7089
it('get a non-existing Stream', async () => {
71-
const id = `${wallet.address}/StreamEndpoints-integration-nonexisting-${Date.now()}`
90+
const id = `${wallet.address}/StreamEndpoints-nonexisting-${Date.now()}`
7291
return expect(() => client.getStream(id)).rejects.toThrow(NotFoundError)
7392
})
7493
})
@@ -81,43 +100,63 @@ function TestStreamEndpoints(getName: () => string) {
81100
})
82101

83102
it('get a non-existing Stream', async () => {
84-
const name = `${wallet.address}/StreamEndpoints-integration-nonexisting-${Date.now()}`
103+
const name = `${wallet.address}/StreamEndpoints-nonexisting-${Date.now()}`
85104
return expect(() => client.getStreamByName(name)).rejects.toThrow(NotFoundError)
86105
})
87106
})
88107

89108
describe('getOrCreate', () => {
90-
it('getOrCreate an existing Stream by name', async () => {
109+
it('existing Stream by name', async () => {
91110
const existingStream = await client.getOrCreateStream({
92111
name: createdStream.name,
93112
})
94113
expect(existingStream.id).toBe(createdStream.id)
95114
expect(existingStream.name).toBe(createdStream.name)
96115
})
97116

98-
it('getOrCreate an existing Stream by id', async () => {
117+
it('existing Stream by id', async () => {
99118
const existingStream = await client.getOrCreateStream({
100119
id: createdStream.id,
101120
})
102121
expect(existingStream.id).toBe(createdStream.id)
103122
expect(existingStream.name).toBe(createdStream.name)
104123
})
105124

106-
it('getOrCreate a new Stream by name', async () => {
125+
it('new Stream by name', async () => {
107126
const newName = uid('stream')
108127
const newStream = await client.getOrCreateStream({
109128
name: newName,
110129
})
111130
expect(newStream.name).toEqual(newName)
112131
})
113132

114-
it('getOrCreate a new Stream by id', async () => {
115-
const newId = `${wallet.address}/StreamEndpoints-integration-${Date.now()}`
133+
it('new Stream by id', async () => {
134+
const newId = `${wallet.address}/StreamEndpoints-getOrCreate-newId-${Date.now()}`
116135
const newStream = await client.getOrCreateStream({
117136
id: newId,
118137
})
119138
expect(newStream.id).toEqual(newId)
120139
})
140+
141+
it('new Stream by path', async () => {
142+
const newPath = `/StreamEndpoints-getOrCreate-newPath-${Date.now()}`
143+
const newStream = await client.getOrCreateStream({
144+
id: newPath,
145+
})
146+
expect(newStream.id).toEqual(`${wallet.address.toLowerCase()}${newPath}`)
147+
})
148+
149+
it('fails if stream prefixed with other users address', async () => {
150+
// can't create streams for other users
151+
const otherAddress = `0x${fakeAddress()}`
152+
const newPath = `/StreamEndpoints-getOrCreate-newPath-${Date.now()}`
153+
// backend should error
154+
await expect(async () => {
155+
await client.getOrCreateStream({
156+
id: `${otherAddress}${newPath}`,
157+
})
158+
}).rejects.toThrow('Validation')
159+
})
121160
})
122161

123162
describe('listStreams', () => {

test/unit/StreamUtils.test.ts

Lines changed: 85 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -1,50 +1,101 @@
11
import { Stream } from '../../src/stream'
2-
import { validateOptions } from '../../src/stream/utils'
2+
import { createStreamId, validateOptions } from '../../src/stream/utils'
33

44
describe('Stream utils', () => {
55

6-
it('no definition', () => {
7-
expect(() => validateOptions(undefined as any)).toThrow()
8-
expect(() => validateOptions(null as any)).toThrow()
9-
expect(() => validateOptions({})).toThrow()
10-
})
6+
describe('validateOptions', () => {
117

12-
it('string', () => {
13-
expect(validateOptions('foo')).toMatchObject({
14-
streamId: 'foo',
15-
streamPartition: 0,
16-
key: 'foo::0'
8+
it('no definition', () => {
9+
expect(() => validateOptions(undefined as any)).toThrow()
10+
expect(() => validateOptions(null as any)).toThrow()
11+
expect(() => validateOptions({})).toThrow()
1712
})
18-
})
1913

20-
it('object', () => {
21-
expect(validateOptions({ streamId: 'foo' })).toMatchObject({
22-
streamId: 'foo',
23-
streamPartition: 0,
24-
key: 'foo::0'
14+
it('string', () => {
15+
expect(validateOptions('foo')).toMatchObject({
16+
streamId: 'foo',
17+
streamPartition: 0,
18+
key: 'foo::0'
19+
})
2520
})
26-
expect(validateOptions({ streamId: 'foo', streamPartition: 123 })).toMatchObject({
27-
streamId: 'foo',
28-
streamPartition: 123,
29-
key: 'foo::123'
21+
22+
it('object', () => {
23+
expect(validateOptions({ streamId: 'foo' })).toMatchObject({
24+
streamId: 'foo',
25+
streamPartition: 0,
26+
key: 'foo::0'
27+
})
28+
expect(validateOptions({ streamId: 'foo', streamPartition: 123 })).toMatchObject({
29+
streamId: 'foo',
30+
streamPartition: 123,
31+
key: 'foo::123'
32+
})
33+
expect(validateOptions({ id: 'foo', partition: 123 })).toMatchObject({
34+
streamId: 'foo',
35+
streamPartition: 123,
36+
key: 'foo::123'
37+
})
3038
})
31-
expect(validateOptions({ id: 'foo', partition: 123 })).toMatchObject({
32-
streamId: 'foo',
33-
streamPartition: 123,
34-
key: 'foo::123'
39+
40+
it('stream', () => {
41+
const stream = new Stream(undefined as any, {
42+
id: 'foo',
43+
name: 'bar'
44+
})
45+
expect(validateOptions({ stream })).toMatchObject({
46+
streamId: 'foo',
47+
streamPartition: 0,
48+
key: 'foo::0'
49+
})
3550
})
51+
3652
})
3753

38-
it('stream', () => {
39-
const stream = new Stream(undefined as any, {
40-
id: 'foo',
41-
name: 'bar'
54+
describe('createStreamId', () => {
55+
const ownerProvider = () => Promise.resolve('0xaAAAaaaaAA123456789012345678901234567890')
56+
57+
it('path', async () => {
58+
const path = '/foo/BAR'
59+
const actual = await createStreamId(path, ownerProvider)
60+
expect(actual).toBe('0xaaaaaaaaaa123456789012345678901234567890/foo/BAR')
4261
})
43-
expect(validateOptions({ stream })).toMatchObject({
44-
streamId: 'foo',
45-
streamPartition: 0,
46-
key: 'foo::0'
62+
63+
it('path: no owner', () => {
64+
const path = '/foo/BAR'
65+
return expect(createStreamId(path, async () => undefined)).rejects.toThrowError('Owner missing for stream id: /foo/BAR')
66+
})
67+
68+
it('path: no owner provider', () => {
69+
const path = '/foo/BAR'
70+
return expect(createStreamId(path, undefined)).rejects.toThrowError('Owner provider missing for stream id: /foo/BAR')
4771
})
48-
})
4972

73+
it('full: ethereum address', async () => {
74+
const id = '0xbbbbbBbBbB123456789012345678901234567890/foo/BAR'
75+
const actual = await createStreamId(id)
76+
expect(actual).toBe(id)
77+
})
78+
79+
it('full: ENS domain', async () => {
80+
const id = 'example.eth/foo/BAR'
81+
const actual = await createStreamId(id)
82+
expect(actual).toBe(id)
83+
})
84+
85+
it('legacy', async () => {
86+
const id = 'abcdeFGHJI1234567890ab'
87+
const actual = await createStreamId(id)
88+
expect(actual).toBe(id)
89+
})
90+
91+
it('system', async () => {
92+
const id = 'SYSTEM/keyexchange/0xcccccccccc123456789012345678901234567890'
93+
const actual = await createStreamId(id)
94+
expect(actual).toBe(id)
95+
})
96+
97+
it('undefined', () => {
98+
return expect(createStreamId(undefined as any)).rejects.toThrowError('Missing stream id')
99+
})
100+
})
50101
})

test/utils.ts

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,10 @@ export function fakePrivateKey() {
1717
return crypto.randomBytes(32).toString('hex')
1818
}
1919

20+
export function fakeAddress() {
21+
return crypto.randomBytes(32).toString('hex').slice(0, 40)
22+
}
23+
2024
const TEST_REPEATS = (process.env.TEST_REPEATS) ? parseInt(process.env.TEST_REPEATS, 10) : 1
2125

2226
export function describeRepeats(msg: any, fn: any, describeFn = describe) {

0 commit comments

Comments
 (0)