Skip to content

Commit bb8c114

Browse files
committed
fix(json-crdt): 🐛 collect patches from onFlush emits
1 parent fbb9a81 commit bb8c114

File tree

3 files changed

+94
-21
lines changed

3 files changed

+94
-21
lines changed

src/json-crdt/history/PatchLog.ts

Lines changed: 49 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -28,39 +28,53 @@ export class PatchLog implements Printable {
2828
* Model factory function that creates a new JSON CRDT model instance, which
2929
* is used as the starting point of the log. It is called every time a new
3030
* model is needed to replay the log.
31+
*
32+
* @readonly Internally this function may be updated, but externally it is
33+
* read-only.
3134
*/
32-
public readonly start: () => Model;
35+
public start: () => Model;
3336

3437
/**
3538
* The end of the log, the current state of the document. It is the model
3639
* instance that is used to apply new patches to the log.
40+
*
41+
* @readonly
3742
*/
3843
public readonly end: Model;
3944

4045
/**
4146
* The patches in the log, stored in an AVL tree for efficient replaying. The
4247
* collection of patches which are applied to the `start()` model to reach
4348
* the `end` model.
49+
*
50+
* @readonly
4451
*/
4552
public readonly patches = new AvlMap<ITimestampStruct, Patch>(compare);
46-
private _patchesUnsub: FanOutUnsubscribe;
53+
54+
private __onPatch: FanOutUnsubscribe;
55+
private __onFlush: FanOutUnsubscribe;
4756

4857
constructor(start: () => Model) {
4958
this.start = start;
50-
this.end = start();
51-
this._patchesUnsub = this.end.api.onPatch.listen((patch) => {
59+
const end = this.end = start();
60+
const onPatch = (patch: Patch) => {
5261
const id = patch.getId();
5362
if (!id) return;
5463
this.patches.set(id, patch);
55-
});
64+
};
65+
const api = end.api;
66+
this.__onPatch = api.onPatch.listen(onPatch);
67+
this.__onFlush = api.onFlush.listen(onPatch);
5668
}
5769

5870
/**
59-
* Call this method to destroy the `PatchLog` instance. It unsubscribes from
60-
* the model's `onPatch` event listener.
71+
* Call this method to destroy the `PatchLog` instance. It unsubscribes patch
72+
* and flush listeners from the `end` model and clears the patch log.
6173
*/
6274
public destroy() {
63-
this._patchesUnsub();
75+
this.__onPatch();
76+
this.__onFlush();
77+
this.patches.clear();
6478
}
6579

6680
/**
@@ -90,24 +104,45 @@ export class PatchLog implements Printable {
90104
return clone;
91105
}
92106

107+
/**
108+
* Advance the start of the log to a specified timestamp, excluding the patch
109+
* at the given timestamp. This method removes all patches from the log that
110+
* are older than the given timestamp and updates the `start()` factory
111+
* function to replay the log from the new start.
112+
*
113+
* @param ts Timestamp ID of the patch to advance to.
114+
*/
115+
public advanceTo(ts: ITimestampStruct): void {
116+
const newStartPatches: Patch[] = [];
117+
let node = first(this.patches.root)
118+
for (; node && compare(ts, node.k) >= 0; node = next(node)) newStartPatches.push(node.v);
119+
for (const patch of newStartPatches) this.patches.del(patch.getId()!);
120+
const oldStart = this.start;
121+
this.start = (): Model => {
122+
const model = oldStart();
123+
for (const patch of newStartPatches) model.applyPatch(patch);
124+
return model;
125+
};
126+
}
127+
93128
// ---------------------------------------------------------------- Printable
94129

95130
public toString(tab?: string) {
96-
const log: Patch[] = [];
97-
this.patches.forEach(({v}) => log.push(v));
131+
const patches: Patch[] = [];
132+
this.patches.forEach(({v}) => patches.push(v));
98133
return (
99134
`log` +
100135
printTree(tab, [
101-
(tab) => `start` + printTree(tab, [tab => this.start().toString(tab)]),
136+
(tab) => `start` + printTree(tab, [(tab) => this.start().toString(tab)]),
102137
() => '',
103138
(tab) =>
104-
'history' +
139+
'history' +
105140
printTree(
106141
tab,
107-
log.map((patch, i) => (tab) => `${i}: ${patch.toString(tab)}`),
142+
patches.map((patch, i) => (tab) => `${i}: ${patch.toString(tab)}`),
108143
),
109144
() => '',
110-
(tab) => `end` + printTree(tab, [tab => this.end.toString(tab)]),
145+
(tab) => `end` + printTree(tab, [(tab) => this.end.toString(tab)]),
111146
])
112147
);
113148
}

src/json-crdt/history/__tests__/PatchLog.spec.ts

Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,3 +27,41 @@ test('can replay to specific patch', () => {
2727
expect(model3.view()).toEqual({foo: 'bar', x: 1});
2828
expect(model4.view()).toEqual({foo: 'bar', x: 1, y: 2});
2929
});
30+
31+
test('can advance the log from start', () => {
32+
const {log} = setup({foo: 'bar'});
33+
log.end.api.obj([]).set({x: 1});
34+
const patch1 = log.end.api.flush();
35+
log.end.api.obj([]).set({y: 2});
36+
const patch2 = log.end.api.flush();
37+
log.end.api.obj([]).set({foo: 'baz'});
38+
const patch3 = log.end.api.flush();
39+
expect(log.end.view()).toEqual({foo: 'baz', x: 1, y: 2});
40+
expect(log.start().view()).toEqual(undefined);
41+
log.advanceTo(patch1.getId()!);
42+
expect(log.end.view()).toEqual({foo: 'baz', x: 1, y: 2});
43+
expect(log.start().view()).toEqual({foo: 'bar', x: 1});
44+
log.advanceTo(patch2.getId()!);
45+
expect(log.end.view()).toEqual({foo: 'baz', x: 1, y: 2});
46+
expect(log.start().view()).toEqual({foo: 'bar', x: 1, y: 2});
47+
expect(log.patches.size()).toBe(1);
48+
log.advanceTo(patch3.getId()!);
49+
expect(log.end.view()).toEqual({foo: 'baz', x: 1, y: 2});
50+
expect(log.start().view()).toEqual({foo: 'baz', x: 1, y: 2});
51+
expect(log.patches.size()).toBe(0);
52+
});
53+
54+
test('can advance multiple patches at once', () => {
55+
const {log} = setup({foo: 'bar'});
56+
log.end.api.obj([]).set({x: 1});
57+
log.end.api.flush();
58+
log.end.api.obj([]).set({y: 2});
59+
const patch2 = log.end.api.flush();
60+
log.end.api.obj([]).set({foo: 'baz'});
61+
log.end.api.flush();
62+
expect(log.end.view()).toEqual({foo: 'baz', x: 1, y: 2});
63+
expect(log.start().view()).toEqual(undefined);
64+
log.advanceTo(patch2.getId()!);
65+
expect(log.end.view()).toEqual({foo: 'baz', x: 1, y: 2});
66+
expect(log.start().view()).toEqual({foo: 'bar', x: 1, y: 2});
67+
});

src/json-crdt/history/types.ts

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,14 +1,14 @@
1-
import {Patch} from "../../json-crdt-patch";
2-
import {PatchLog} from "../history/PatchLog";
3-
import {Model} from "../model";
1+
import {Patch} from '../../json-crdt-patch';
2+
import {PatchLog} from '../history/PatchLog';
3+
import {Model} from '../model';
44

55
/**
66
* A history of patches that have been applied to a model, stored on the
77
* "remote": (1) server; (2) content addressable storage; or (3) peer-to-peer
88
* network.
9-
*
9+
*
1010
* Cases:
11-
*
11+
*
1212
* - `RemoteHistoryServer`
1313
* - `RemoteHistoryServerIdempotent`
1414
* - `RemoteHistoryCAS`
@@ -18,7 +18,7 @@ export interface RemoteHistory<Cursor> {
1818
/**
1919
* Load latest state of the model, and any unmerged "tip" of patches
2020
* it might have.
21-
*
21+
*
2222
* @todo Maybe `state` and `tip` should be serialized to JSON?
2323
*/
2424
loadLatest(id: string): Promise<[cursor: Cursor, state: Model]>;
@@ -31,7 +31,7 @@ export interface RemoteHistory<Cursor> {
3131

3232
/**
3333
* Subscribe to the latest changes to the model.
34-
* @param callback
34+
* @param callback
3535
*/
3636
subscribe(id: string, cursor: Cursor, callback: (changes: Patch[]) => void): void;
3737
}

0 commit comments

Comments
 (0)