Skip to content

Commit 6ebb2ea

Browse files
authored
Merge pull request #572 from streamich/session-history-next
Session history next
2 parents 2389ca3 + 7d6a033 commit 6ebb2ea

File tree

35 files changed

+831
-454
lines changed

35 files changed

+831
-454
lines changed

src/json-crdt/history/LocalHistoryCrud.ts renamed to src/json-crdt-repo/LocalHistoryCrud.ts

Lines changed: 13 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,11 @@
11
import {CborEncoder} from '@jsonjoy.com/json-pack/lib/cbor/CborEncoder';
22
import {CborDecoder} from '@jsonjoy.com/json-pack/lib/cbor/CborDecoder';
3-
import {LogEncoder} from '../log/codec/LogEncoder';
4-
import {LogDecoder} from '../log/codec/LogDecoder';
3+
import {LogEncoder} from '../json-crdt/log/codec/LogEncoder';
4+
import {LogDecoder} from '../json-crdt/log/codec/LogDecoder';
55
import type {CrudApi} from 'memfs/lib/crud/types';
66
import type {Locks} from 'thingies/es2020/Locks';
7-
import type {Patch} from '../../json-crdt-patch';
8-
import type {Log} from '../log/Log';
7+
import type {Patch} from '../json-crdt-patch';
8+
import type {Log} from '../json-crdt/log/Log';
99
import type {LocalHistory} from './types';
1010

1111
export const genId = (octets: number = 8): string => {
@@ -30,9 +30,8 @@ export class LocalHistoryCrud implements LocalHistory {
3030
protected readonly locks: Locks,
3131
) {}
3232

33-
public async create(collection: string[], log: Log): Promise<{id: string}> {
33+
public async create(collection: string[], log: Log, id: string = genId()): Promise<{id: string}> {
3434
const blob = this.encode(log);
35-
const id = genId();
3635
await this.lock(collection, id, async () => {
3736
await this.crud.put([...collection, id], STATE_FILE_NAME, blob, {throwIf: 'exists'});
3837
});
@@ -54,12 +53,17 @@ export class LocalHistoryCrud implements LocalHistory {
5453
const {frontier} = this.decoder.decode(blob, {format: 'seq.cbor', frontier: true});
5554
return {
5655
log: frontier!,
57-
cursor: '',
56+
cursor: '1',
5857
};
5958
}
6059

61-
public readHistory(collection: string[], id: string, cursor: string): Promise<{log: Log; cursor: string}> {
62-
throw new Error('Method not implemented.');
60+
public async readHistory(collection: string[], id: string, cursor: string): Promise<{log: Log; cursor: string}> {
61+
const blob = await this.crud.get([...collection, id], STATE_FILE_NAME);
62+
const {history} = this.decoder.decode(blob, {format: 'seq.cbor', history: true});
63+
return {
64+
log: history!,
65+
cursor: '',
66+
};
6367
}
6468

6569
public async update(collection: string[], id: string, patches: Patch[]): Promise<void> {
Lines changed: 78 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,78 @@
1+
import {createRace} from 'thingies/es2020/createRace';
2+
import {FanOutUnsubscribe} from 'thingies/es2020/fanout';
3+
import {InsValOp, Patch} from '../json-crdt-patch';
4+
import {ValNode} from '../json-crdt/nodes';
5+
import {toSchema} from '../json-crdt/schema/toSchema';
6+
import {Log} from '../json-crdt/log/Log';
7+
import {RedoItem, UndoItem, UndoRedoStack} from './UndoRedoStack';
8+
import type {LocalHistory} from './types';
9+
10+
class Undo implements UndoItem {
11+
constructor(public readonly undo: () => Redo) {}
12+
}
13+
14+
class Redo implements RedoItem {
15+
constructor(public readonly redo: () => Undo) {}
16+
}
17+
18+
export class SessionHistory {
19+
constructor(
20+
public readonly collection: string[],
21+
public readonly id: string,
22+
protected readonly local: LocalHistory,
23+
) {}
24+
25+
private readonly __onPatchRace = createRace();
26+
27+
public attachUndoRedo(stack: UndoRedoStack): FanOutUnsubscribe {
28+
// const onBeforePatch = (patch: Patch) => {
29+
// this.__onPatchRace(() => {
30+
// const undo = this.createUndo(patch);
31+
// stack.push(undo);
32+
// });
33+
// };
34+
// const unsubscribe = this.log.end.api.onBeforePatch.listen(onBeforePatch);
35+
// return unsubscribe;
36+
throw new Error('Method not implemented.');
37+
}
38+
39+
public createUndo(patch: Patch): Undo {
40+
const undoTasks: Array<() => void> = [];
41+
const ops = patch.ops;
42+
const length = ops.length;
43+
for (let i = length - 1; i >= 0; i--) {
44+
const op = ops[i];
45+
switch (op.name()) {
46+
case 'ins_val': {
47+
// const insOp = op as InsValOp;
48+
// const valNode = this.log.end.index.get(insOp.obj);
49+
// if (!(valNode instanceof ValNode)) throw new Error('INVALID_NODE');
50+
// const copy = toSchema(valNode.node());
51+
// const valNodeId = valNode.id;
52+
// const task = () => {
53+
// const end = this.log.end;
54+
// const valNode = end.index.get(valNodeId);
55+
// if (!valNode) return;
56+
// end.api.wrap(valNode).asVal().set(copy);
57+
// };
58+
// undoTasks.push(task);
59+
}
60+
}
61+
}
62+
const undo = new Undo(() => {
63+
this.__onPatchRace(() => {
64+
for (const task of undoTasks) task();
65+
});
66+
return new Redo(() => {
67+
const undo = this.__onPatchRace(() => {
68+
// // TODO: This line needs to be changed:
69+
// const redoPatch = Patch.fromBinary(patch.toBinary());
70+
// this.log.end.api.builder.patch = redoPatch;
71+
// return this.createUndo(redoPatch);
72+
});
73+
return undo!;
74+
});
75+
});
76+
return undo;
77+
}
78+
}
File renamed without changes.

src/json-crdt/history/__tests__/LocalHistoryCrud.spec.ts renamed to src/json-crdt-repo/__tests__/LocalHistoryCrud.spec.ts

Lines changed: 36 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,8 +2,8 @@ import {memfs} from 'memfs';
22
import {NodeCrud} from 'memfs/lib/node-to-crud';
33
import {Locks} from 'thingies/es2020/Locks';
44
import {LocalHistoryCrud} from '../LocalHistoryCrud';
5-
import {Log} from '../../log/Log';
6-
import {Model} from '../../model';
5+
import {Log} from '../../json-crdt/log/Log';
6+
import {Model} from '../../json-crdt/model';
77

88
const setup = async () => {
99
const {fs, vol} = memfs();
@@ -60,3 +60,37 @@ test('can delete a document', async () => {
6060
expect((err as Error).message).toBe(`Collection /test/${id} does not exist`);
6161
}
6262
});
63+
64+
test('can update document', async () => {
65+
const {local} = await setup();
66+
const model = Model.withLogicalClock();
67+
model.api.root({
68+
foo: 'spam',
69+
});
70+
const log = Log.fromNewModel(model);
71+
const {id} = await local.create(['test'], log);
72+
const {log: log2} = await local.read(['test'], id);
73+
log2.end.api.obj([]).set({
74+
bar: 'eggs',
75+
});
76+
const patch = log2.end.api.flush();
77+
await local.update(['test'], id, [patch]);
78+
const {log: log3} = await local.read(['test'], id);
79+
expect(log3.end.view()).toStrictEqual({
80+
foo: 'spam',
81+
bar: 'eggs',
82+
});
83+
});
84+
85+
test('can delete document', async () => {
86+
const {local} = await setup();
87+
const model = Model.withLogicalClock();
88+
model.api.root({
89+
foo: 'spam',
90+
});
91+
const log = Log.fromNewModel(model);
92+
const {id} = await local.create(['test'], log);
93+
await local.read(['test'], id);
94+
await local.delete(['test'], id);
95+
expect(() => local.read(['test'], id)).rejects.toThrow(`Collection /test/${id} does not exist`);
96+
});
Lines changed: 100 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,100 @@
1+
import type {CallerToMethods, TypedRpcClient} from '../../reactive-rpc/common';
2+
import type {JsonJoyDemoRpcCaller} from '../../server';
3+
import type {RemoteHistory, RemoteModel, RemotePatch} from './types';
4+
5+
type Methods = CallerToMethods<JsonJoyDemoRpcCaller>;
6+
7+
export type Cursor = number;
8+
9+
export interface RemoteServerModel extends RemoteModel {
10+
seq: number;
11+
created: number;
12+
updated: number;
13+
}
14+
15+
export interface RemoteServerPatch extends RemotePatch {
16+
seq: number;
17+
}
18+
19+
export class RemoteHistoryDemoServer implements RemoteHistory<Cursor, RemoteServerModel, RemoteServerPatch> {
20+
constructor(protected readonly client: TypedRpcClient<Methods>) {}
21+
22+
public async create(id: string, patches: RemotePatch[]): Promise<void> {
23+
await this.client.call('block.new', {
24+
id,
25+
patches: patches.map((patch) => ({
26+
blob: patch.blob,
27+
})),
28+
});
29+
}
30+
31+
/**
32+
* Load latest state of the model, and any unmerged "tip" of patches
33+
* it might have.
34+
*/
35+
public async read(id: string): Promise<{cursor: Cursor; model: RemoteServerModel; patches: RemoteServerPatch[]}> {
36+
const {model, patches} = await this.client.call('block.get', {id});
37+
return {
38+
cursor: model.seq,
39+
model,
40+
patches: [],
41+
};
42+
}
43+
44+
public async scanFwd(id: string, cursor: Cursor): Promise<{cursor: Cursor; patches: RemoteServerPatch[]}> {
45+
const limit = 100;
46+
const res = await this.client.call('block.scan', {
47+
id,
48+
seq: cursor,
49+
limit: cursor + limit,
50+
});
51+
if (res.patches.length === 0) {
52+
return {
53+
cursor,
54+
patches: [],
55+
};
56+
}
57+
return {
58+
cursor: res.patches[res.patches.length - 1].seq,
59+
patches: res.patches,
60+
};
61+
}
62+
63+
public async scanBwd(
64+
id: string,
65+
cursor: Cursor,
66+
): Promise<{cursor: Cursor; model: RemoteServerModel; patches: RemoteServerPatch[]}> {
67+
throw new Error('The "blocks.history" should be able to return starting model.');
68+
}
69+
70+
public async update(
71+
id: string,
72+
cursor: Cursor,
73+
patches: RemotePatch[],
74+
): Promise<{cursor: Cursor; patches: RemoteServerPatch[]}> {
75+
const res = await this.client.call('block.upd', {
76+
id,
77+
patches: patches.map((patch, seq) => ({
78+
seq,
79+
created: Date.now(),
80+
blob: patch.blob,
81+
})),
82+
});
83+
return {
84+
cursor: res.patches.length ? res.patches[res.patches.length - 1].seq : cursor,
85+
patches: res.patches,
86+
};
87+
}
88+
89+
public async delete(id: string): Promise<void> {
90+
await this.client.call('block.del', {id});
91+
}
92+
93+
/**
94+
* Subscribe to the latest changes to the model.
95+
* @param callback
96+
*/
97+
public listen(id: string, cursor: Cursor, callback: (changes: RemoteServerPatch[]) => void): void {
98+
throw new Error('Method not implemented.');
99+
}
100+
}
Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,36 @@
1+
import {Model} from '../../../json-crdt/model';
2+
import {buildE2eClient} from '../../../reactive-rpc/common/testing/buildE2eClient';
3+
import {createCaller} from '../../../server/routes/index';
4+
import {RemoteHistoryDemoServer} from '../RemoteHistoryDemoServer';
5+
6+
const setup = () => {
7+
const {caller, router} = createCaller();
8+
const {client} = buildE2eClient(caller);
9+
const remote = new RemoteHistoryDemoServer(client);
10+
11+
return {
12+
router,
13+
caller,
14+
client,
15+
remote,
16+
};
17+
};
18+
19+
let cnt = 0;
20+
const genId = () => Math.random().toString(36).slice(2) + '-' + Date.now().toString(36) + '-' + cnt++;
21+
22+
describe('.create()', () => {
23+
test('can create a block with a simple patch', async () => {
24+
const {remote, caller} = await setup();
25+
const model = Model.withLogicalClock();
26+
model.api.root({foo: 'bar'});
27+
const patch = model.api.flush();
28+
const blob = patch.toBinary();
29+
const id = genId();
30+
await remote.create(id, [{blob}]);
31+
const {data} = await caller.call('block.get', {id}, {});
32+
// console.log(data.patches);
33+
const model2 = Model.fromBinary(data.model.blob);
34+
expect(model2.view()).toEqual({foo: 'bar'});
35+
});
36+
});

src/json-crdt-repo/remote/types.ts

Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,38 @@
1+
/**
2+
* A history of patches that have been applied to a model, stored on the
3+
* "remote": (1) server; (2) content addressable storage; or (3) somewhere in a
4+
* peer-to-peer network.
5+
*/
6+
export interface RemoteHistory<Cursor, M extends RemoteModel = RemoteModel, P extends RemotePatch = RemotePatch> {
7+
create(id: string, patches: RemotePatch[]): Promise<void>;
8+
9+
/**
10+
* Load latest state of the model, and any unmerged "tip" of patches
11+
* it might have.
12+
*
13+
* @todo Maybe `state` and `tip` should be serialized to JSON?
14+
*/
15+
read(id: string): Promise<{cursor: Cursor; model: M; patches: P[]}>;
16+
17+
scanFwd(id: string, cursor: Cursor): Promise<{cursor: Cursor; patches: P[]}>;
18+
19+
scanBwd(id: string, cursor: Cursor): Promise<{cursor: Cursor; model: M; patches: P[]}>;
20+
21+
update(id: string, cursor: Cursor, patches: RemotePatch[]): Promise<{cursor: Cursor; patches: P[]}>;
22+
23+
delete?(id: string): Promise<void>;
24+
25+
/**
26+
* Subscribe to the latest changes to the model.
27+
* @param callback
28+
*/
29+
listen(id: string, cursor: Cursor, callback: (patches: P[]) => void): void;
30+
}
31+
32+
export interface RemoteModel {
33+
blob: Uint8Array;
34+
}
35+
36+
export interface RemotePatch {
37+
blob: Uint8Array;
38+
}

src/json-crdt-repo/types.ts

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,18 @@
1+
import type {Patch} from '../json-crdt-patch';
2+
import type {Log} from '../json-crdt/log/Log';
3+
import type {Model} from '../json-crdt/model';
4+
5+
export interface LocalHistory {
6+
create(collection: string[], log: Log): Promise<{id: string}>;
7+
read(collection: string[], id: string): Promise<{log: Log; cursor: string}>;
8+
readHistory(collection: string[], id: string, cursor: string): Promise<{log: Log; cursor: string}>;
9+
update(collection: string[], id: string, patches: Patch[]): Promise<void>;
10+
delete(collection: string[], id: string): Promise<void>;
11+
}
12+
13+
export interface EditingSessionHistory {
14+
load(id: string): Promise<Model>;
15+
loadHistory(id: string): Promise<Log>;
16+
undo(id: string): Promise<void>;
17+
redo(id: string): Promise<void>;
18+
}

0 commit comments

Comments
 (0)