Skip to content

Commit 823c37e

Browse files
committed
feat: 🎸 add "blocks.listen" method
1 parent 8f8aa13 commit 823c37e

File tree

5 files changed

+139
-6
lines changed

5 files changed

+139
-6
lines changed

src/server/__tests__/blocks.spec.ts

Lines changed: 73 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,9 @@
1+
import {of} from 'rxjs';
12
import {Model} from '../../json-crdt';
23
import {Value} from '../../reactive-rpc/common/messages/Value';
34
import {RpcError, RpcErrorCodes} from '../../reactive-rpc/common/rpc/caller';
45
import {setup} from './setup';
6+
import {tick, until} from '../../__tests__/util';
57

68
describe('blocks.*', () => {
79
describe('blocks.create', () => {
@@ -11,7 +13,7 @@ describe('blocks.*', () => {
1113
const {block} = (await caller.call('blocks.get', {id: 'my-block'}, {})).data;
1214
expect(block).toMatchObject({
1315
id: 'my-block',
14-
seq: 0,
16+
seq: -1,
1517
blob: expect.any(Uint8Array),
1618
created: expect.any(Number),
1719
updated: expect.any(Number),
@@ -282,4 +284,74 @@ describe('blocks.*', () => {
282284
expect(patches[2].blob).toStrictEqual(patch3.toBinary());
283285
});
284286
});
287+
288+
describe('blocks.listen', () => {
289+
test('can listen for block changes', async () => {
290+
const {call, caller} = setup();
291+
await call('blocks.create', {id: 'my-block', patches: []});
292+
await tick(11);
293+
const emits: any[] = [];
294+
caller.call$('blocks.listen', of({id: 'my-block'}), {}).subscribe((data) => emits.push(data));
295+
const model = Model.withLogicalClock();
296+
model.api.root({
297+
text: 'Hell',
298+
});
299+
const patch1 = model.api.flush();
300+
await tick(12);
301+
expect(emits.length).toBe(0);
302+
await call('blocks.edit', {id: 'my-block', patches: [{seq: 0, created: Date.now(), blob: patch1.toBinary()}]});
303+
await until(() => emits.length === 1);
304+
expect(emits.length).toBe(1);
305+
expect(emits[0].data.patches.length).toBe(1);
306+
expect(emits[0].data.patches[0].seq).toBe(0);
307+
model.api.root({
308+
text: 'Hello',
309+
});
310+
const patch2 = model.api.flush();
311+
await tick(12);
312+
expect(emits.length).toBe(1);
313+
await call('blocks.edit', {id: 'my-block', patches: [{seq: 1, created: Date.now(), blob: patch2.toBinary()}]});
314+
await until(() => emits.length === 2);
315+
expect(emits.length).toBe(2);
316+
expect(emits[1].data.patches.length).toBe(1);
317+
expect(emits[1].data.patches[0].seq).toBe(1);
318+
});
319+
320+
test('can subscribe before block is created', async () => {
321+
const {call, caller} = setup();
322+
const emits: any[] = [];
323+
caller.call$('blocks.listen', of({id: 'my-block'}), {}).subscribe((data) => emits.push(data));
324+
const model = Model.withLogicalClock();
325+
model.api.root({
326+
text: 'Hell',
327+
});
328+
const patch1 = model.api.flush();
329+
await tick(12);
330+
expect(emits.length).toBe(0);
331+
await call('blocks.create', {id: 'my-block', patches: [
332+
{
333+
seq: 0,
334+
created: Date.now(),
335+
blob: patch1.toBinary(),
336+
},
337+
]});
338+
await until(() => emits.length === 1);
339+
expect(emits.length).toBe(1);
340+
expect(emits[0].data.patches.length).toBe(1);
341+
expect(emits[0].data.patches[0].seq).toBe(0);
342+
expect(emits[0].data.patches[0].blob).toStrictEqual(patch1.toBinary());
343+
});
344+
345+
test('can receive deletion events', async () => {
346+
const {call, caller} = setup();
347+
const emits: any[] = [];
348+
caller.call$('blocks.listen', of({id: 'my-block'}), {}).subscribe((data) => emits.push(data));
349+
await call('blocks.create', {id: 'my-block', patches: []});
350+
await until(() => emits.length === 1);
351+
expect(emits[0].data.block.seq).toBe(-1);
352+
await call('blocks.remove', {id: 'my-block'});
353+
await until(() => emits.length === 2);
354+
expect(emits[1].data.deleted).toBe(true);
355+
});
356+
});
285357
});

src/server/routes/blocks/index.ts

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ import {create} from './methods/create';
22
import {get} from './methods/get';
33
import {remove} from './methods/remove';
44
import {edit} from './methods/edit';
5+
import {listen} from './methods/listen';
56
import {Block, BlockId, BlockPatch, BlockSeq} from './schema';
67
import type {RoutesBase, TypeRouter} from '../../../json-type/system/TypeRouter';
78
import type {RouteDeps} from '../types';
@@ -20,5 +21,6 @@ export const blocks =
2021
( get(d)
2122
( remove(d)
2223
( edit(d)
23-
( r ))))));
24+
( listen(d)
25+
( r )))))));
2426
};
Lines changed: 47 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,47 @@
1+
import {switchMap} from 'rxjs';
2+
import type {RoutesBase, TypeRouter} from '../../../../json-type/system/TypeRouter';
3+
import type {RouteDeps} from '../../types';
4+
import type {BlockId, BlockPatch, Block} from '../schema';
5+
6+
export const listen =
7+
({services}: RouteDeps) =>
8+
<R extends RoutesBase>(router: TypeRouter<R>) => {
9+
const t = router.t;
10+
const PatchType = t.Ref<typeof BlockPatch>('BlockPatch');
11+
12+
const Request = t.Object(
13+
t.prop('id', t.Ref<typeof BlockId>('BlockId')).options({
14+
title: 'Block ID',
15+
description: 'The ID of the block to subscribe to.',
16+
}),
17+
);
18+
19+
const Response = t.Object(
20+
t.propOpt('deleted', t.Boolean()).options({
21+
title: 'Deleted',
22+
description: 'Emitted only when the block is deleted.',
23+
}),
24+
t.propOpt('block', t.Ref<typeof Block>('Block')).options({
25+
title: 'Block',
26+
description: 'The whole block object, emitted only when the block is created.',
27+
}),
28+
t.propOpt('patches', t.Array(PatchType)).options({
29+
title: 'Latest patches',
30+
description: 'Patches that have been applied to the block.',
31+
}),
32+
);
33+
34+
const Func = t
35+
.Function$(Request, Response)
36+
.options({
37+
title: 'Listen for block changes',
38+
description: 'Subscribe to a block to receive updates when it changes.',
39+
})
40+
.implement((req$) => {
41+
return req$.pipe(
42+
switchMap(({id}) => services.pubsub.listen$(`__block:${id}`)),
43+
) as any;
44+
});
45+
46+
return router.fn$('blocks.listen', Func);
47+
};

src/server/services/blocks/BlocksServices.ts

Lines changed: 14 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
import {MemoryStore} from './MemoryStore';
22
import {StorePatch} from './types';
3-
import type {Services} from '../Services';
43
import {RpcError, RpcErrorCodes} from '../../../reactive-rpc/common/rpc/caller';
4+
import type {Services} from '../Services';
55

66
export class BlocksServices {
77
protected readonly store = new MemoryStore();
@@ -11,6 +11,13 @@ export class BlocksServices {
1111
public async create(id: string, patches: StorePatch[]) {
1212
const {store} = this;
1313
const {block} = await store.create(id, patches);
14+
const data = {
15+
block,
16+
patches,
17+
};
18+
this.services.pubsub.publish(`__block:${id}`, data).catch(error => {
19+
console.error('Error publishing block patches', error);
20+
});
1421
return {block};
1522
}
1623

@@ -24,6 +31,9 @@ export class BlocksServices {
2431

2532
public async remove(id: string) {
2633
await this.store.remove(id);
34+
this.services.pubsub.publish(`__block:${id}`, {deleted: true}).catch(error => {
35+
console.error('Error publishing block deletion', error);
36+
});
2737
}
2838

2939
public async edit(id: string, patches: any[]) {
@@ -32,6 +42,9 @@ export class BlocksServices {
3242
const seq = patches[0].seq;
3343
const {store} = this;
3444
const {block} = await store.edit(id, patches);
45+
this.services.pubsub.publish(`__block:${id}`, {patches}).catch(error => {
46+
console.error('Error publishing block patches', error);
47+
});
3548
const expectedBlockSeq = seq + patches.length - 1;
3649
const hadConcurrentEdits = block.seq !== expectedBlockSeq;
3750
let patchesBack: StorePatch[] = [];

src/server/services/blocks/MemoryStore.ts

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -18,16 +18,15 @@ export class MemoryStore implements types.Store {
1818
if (!Array.isArray(patches)) throw new Error('NO_PATCHES');
1919
if (this.blocks.has(id)) throw new Error('BLOCK_EXISTS');
2020
const model = Model.withLogicalClock();
21-
let seq = 0;
21+
let seq = -1;
2222
const now = Date.now();
2323
if (patches.length) {
2424
for (const patch of patches) {
25+
seq++;
2526
if (seq !== patch.seq) throw new Error('PATCHES_OUT_OF_ORDER');
2627
model.applyPatch(Patch.fromBinary(patch.blob));
2728
if (patch.created > now) patch.created = now;
28-
seq++;
2929
}
30-
seq--;
3130
}
3231
const block: types.StoreBlock = {
3332
id,

0 commit comments

Comments
 (0)