Skip to content
This repository was archived by the owner on Dec 21, 2021. It is now read-only.

Commit d270100

Browse files
authored
NET-178: Storage node methods #197
Methods to query+modify storage nodes. Two new entity classes: StreamPart (streamId+partition) and StorageNode (currently just an address, but can be extended later).
1 parent 5eaf97d commit d270100

File tree

5 files changed

+100
-0
lines changed

5 files changed

+100
-0
lines changed

src/rest/StreamEndpoints.js

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ import debugFactory from 'debug'
77
import { getEndpointUrl } from '../utils'
88
import { validateOptions } from '../stream/utils'
99
import Stream from '../stream'
10+
import StreamPart from '../stream/StreamPart'
1011
import { isKeyExchangeStream } from '../stream/KeyExchange'
1112

1213
import authFetch from './authFetch'
@@ -202,6 +203,15 @@ export async function getStreamLast(streamObjectOrId) {
202203
return json
203204
}
204205

206+
export async function getStreamPartsByStorageNode(address) {
207+
const json = await authFetch(getEndpointUrl(this.options.restUrl, 'storageNodes', address, 'streams'), this.session)
208+
let result = []
209+
json.forEach((stream) => {
210+
result = result.concat(StreamPart.fromStream(stream))
211+
})
212+
return result
213+
}
214+
205215
export async function publishHttp(streamObjectOrId, data, requestOptions = {}, keepAlive = true) {
206216
let streamId
207217
if (streamObjectOrId instanceof Stream) {

src/stream/StorageNode.js

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,9 @@
1+
export default class StorageNode {
2+
constructor(address) {
3+
this._address = address
4+
}
5+
6+
getAddress() {
7+
return this._address
8+
}
9+
}

src/stream/StreamPart.js

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,22 @@
1+
export default class StreamPart {
2+
constructor(streamId, streamPartition) {
3+
this._streamId = streamId
4+
this._streamPartition = streamPartition
5+
}
6+
7+
static fromStream({ id, partitions }) {
8+
const result = []
9+
for (let i = 0; i < partitions; i++) {
10+
result.push(new StreamPart(id, i))
11+
}
12+
return result
13+
}
14+
15+
getStreamId() {
16+
return this._streamId
17+
}
18+
19+
getStreamPartition() {
20+
return this._streamPartition
21+
}
22+
}

src/stream/index.js

Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,8 @@
11
import { getEndpointUrl } from '../utils'
22
import authFetch from '../rest/authFetch'
33

4+
import StorageNode from './StorageNode'
5+
46
export default class Stream {
57
constructor(client, props) {
68
this._client = client
@@ -109,6 +111,37 @@ export default class Stream {
109111
)
110112
}
111113

114+
async addToStorageNode(address) {
115+
return authFetch(
116+
getEndpointUrl(this._client.options.restUrl, 'streams', this.id, 'storageNodes'),
117+
this._client.session,
118+
{
119+
method: 'POST',
120+
body: JSON.stringify({
121+
address
122+
})
123+
},
124+
)
125+
}
126+
127+
async removeFromStorageNode(address) {
128+
return authFetch(
129+
getEndpointUrl(this._client.options.restUrl, 'streams', this.id, 'storageNodes', address),
130+
this._client.session,
131+
{
132+
method: 'DELETE'
133+
},
134+
)
135+
}
136+
137+
async getStorageNodes() {
138+
const json = await authFetch(
139+
getEndpointUrl(this._client.options.restUrl, 'streams', this.id, 'storageNodes'),
140+
this._client.session,
141+
)
142+
return json.map((item) => new StorageNode(item.storageNodeAddress))
143+
}
144+
112145
async publish(...theArgs) {
113146
return this._client.publish(this.id, ...theArgs)
114147
}

test/integration/StreamEndpoints.test.js

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -229,6 +229,32 @@ function TestStreamEndpoints(getName) {
229229
expect(await client.getStream(createdStream.id)).toBe(undefined)
230230
})
231231
})
232+
233+
describe.only('Storage node assignment', () => {
234+
it('add', async () => {
235+
const storageNodeAddress = ethers.Wallet.createRandom().address
236+
const stream = await client.createStream()
237+
await stream.addToStorageNode(storageNodeAddress)
238+
const storageNodes = await stream.getStorageNodes()
239+
expect(storageNodes.length).toBe(1)
240+
expect(storageNodes[0].getAddress()).toBe(storageNodeAddress)
241+
const storedStreamParts = await client.getStreamPartsByStorageNode(storageNodeAddress)
242+
expect(storedStreamParts.length).toBe(1)
243+
expect(storedStreamParts[0].getStreamId()).toBe(stream.id)
244+
expect(storedStreamParts[0].getStreamPartition()).toBe(0)
245+
})
246+
247+
it('remove', async () => {
248+
const storageNodeAddress = ethers.Wallet.createRandom().address
249+
const stream = await client.createStream()
250+
await stream.addToStorageNode(storageNodeAddress)
251+
await stream.removeFromStorageNode(storageNodeAddress)
252+
const storageNodes = await stream.getStorageNodes()
253+
expect(storageNodes).toHaveLength(0)
254+
const storedStreamParts = await client.getStreamPartsByStorageNode(storageNodeAddress)
255+
expect(storedStreamParts).toHaveLength(0)
256+
})
257+
})
232258
}
233259

234260
describe('StreamEndpoints', () => {

0 commit comments

Comments
 (0)