Skip to content

Commit 315c185

Browse files
authored
Merge pull request #934 from streamich/log-improvements
Log improvements
2 parents c6a0ab6 + 20e5630 commit 315c185

File tree

2 files changed

+282
-3
lines changed

2 files changed

+282
-3
lines changed

src/json-crdt/log/Log.ts

Lines changed: 103 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,8 @@
11
import {AvlMap} from 'sonic-forest/lib/avl/AvlMap';
2-
import {first, next} from 'sonic-forest/lib/util';
2+
import {first, next, prev} from 'sonic-forest/lib/util';
33
import {printTree} from 'tree-dump/lib/printTree';
44
import {listToUint8} from '@jsonjoy.com/util/lib/buffers/concat';
5+
import {cloneBinary} from '@jsonjoy.com/util/lib/json-clone/cloneBinary';
56
import {Model} from '../model';
67
import {toSchema} from '../schema/toSchema';
78
import {
@@ -77,7 +78,7 @@ export class Log<N extends JsonNode = JsonNode<any>, Metadata extends Record<str
7778
*
7879
* @readonly
7980
*/
80-
public readonly patches = new AvlMap<ITimestampStruct, Patch>(compare);
81+
public patches = new AvlMap<ITimestampStruct, Patch>(compare);
8182

8283
private __onPatch: FanOutUnsubscribe;
8384
private __onFlush: FanOutUnsubscribe;
@@ -124,7 +125,6 @@ export class Log<N extends JsonNode = JsonNode<any>, Metadata extends Record<str
124125
public destroy() {
125126
this.__onPatch();
126127
this.__onFlush();
127-
this.patches.clear();
128128
}
129129

130130
/**
@@ -182,6 +182,106 @@ export class Log<N extends JsonNode = JsonNode<any>, Metadata extends Record<str
182182
};
183183
}
184184

185+
/**
186+
* Finds the latest patch for a given session ID.
187+
*
188+
* @param sid Session ID to find the latest patch for.
189+
* @returns The latest patch for the given session ID, or `undefined` if no
190+
* such patch exists.
191+
*/
192+
public findMax(sid: number): Patch | undefined {
193+
let curr = this.patches.max;
194+
while (curr) {
195+
if (curr.k.sid === sid) return curr.v;
196+
curr = prev(curr);
197+
}
198+
return;
199+
}
200+
201+
/**
202+
* @returns A deep clone of the log, including the start function, metadata,
203+
* patches, and the end model.
204+
*/
205+
public clone(): Log<N, Metadata> {
206+
const start = this.start;
207+
const metadata = cloneBinary(this.metadata) as Metadata;
208+
const end = this.end.clone();
209+
const log = new Log(start, end, metadata);
210+
for (const {v} of this.patches.entries()) {
211+
const patch = v.clone();
212+
const id = patch.getId();
213+
if (!id) continue;
214+
log.patches.set(id, patch);
215+
}
216+
return log;
217+
}
218+
219+
// /**
220+
// * Adds a batch of patches to the log, without applying them to the `end`
221+
// * model. It is assumed that the patches are already applied to the `end`
222+
// * model, this method only adds them to the internal patch collection.
223+
// *
224+
// * If you need to apply patches to the `end` model, use `end.applyBatch(batch)`,
225+
// * it will apply them to the model and add them to the log automatically.
226+
// *
227+
// * @param batch Array of patches to add to the log.
228+
// */
229+
// public add(batch: Patch[]): void {
230+
// const patches = this.patches;
231+
// for (const patch of batch) {
232+
// const id = patch.getId();
233+
// if (id) patches.set(id, patch);
234+
// }
235+
// }
236+
237+
/**
238+
* Rebase a batch of patches on top of the current end of the log, or on top
239+
* of the latest patch for a given session ID.
240+
*
241+
* @param batch A batch of patches to rebase.
242+
* @param sid Session ID to find the latest patch for rebasing. If not provided,
243+
* the latest patch in the log is used.
244+
* @returns The rebased patches.
245+
*/
246+
public rebaseBatch(batch: Patch[], sid?: number): Patch[] {
247+
const rebasePatch = sid ? this.findMax(sid) : this.patches.max?.v;
248+
if (!rebasePatch) return batch;
249+
const rebaseId = rebasePatch.getId();
250+
if (!rebaseId) return batch;
251+
let nextTime = rebaseId.time + rebasePatch.span();
252+
const rebased: Patch[] = [];
253+
const length = batch.length;
254+
for (let i = 0; i < length; i++) {
255+
const patch = batch[i].rebase(nextTime);
256+
nextTime += patch.span();
257+
rebased.push(patch);
258+
}
259+
return rebased;
260+
}
261+
262+
/**
263+
* Resets the log to the state of another log. Consumes all state from the `to`
264+
* log. The `to` log will be destroyed and should not be used after calling
265+
* this method.
266+
*
267+
* If you want to preserve the `to` log, use `.clone()` method first.
268+
*
269+
* ```ts
270+
* const log1 = new Log();
271+
* const log2 = new Log();
272+
* log1.reset(log2.clone());
273+
* ```
274+
*
275+
* @param to The log to consume the state from.
276+
*/
277+
public reset(to: Log<N, Metadata>): void {
278+
this.start = to.start;
279+
this.metadata = to.metadata;
280+
this.patches = to.patches;
281+
this.end.reset(to.end);
282+
to.destroy();
283+
}
284+
185285
/**
186286
* Creates a patch which reverts the given patch. The RGA insertion operations
187287
* are reversed just by deleting the inserted values. All other operations

src/json-crdt/log/__tests__/Log.spec.ts

Lines changed: 179 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
import {deepEqual} from '@jsonjoy.com/util/lib/json-equal/deepEqual';
12
import {type DelOp, type InsStrOp, s} from '../../../json-crdt-patch';
23
import {Model} from '../../model';
34
import {Log} from '../Log';
@@ -119,6 +120,184 @@ describe('.advanceTo()', () => {
119120
});
120121
});
121122

123+
describe('.findMax()', () => {
124+
test('finds the latest patch for a given session ID', () => {
125+
const model = Model.create();
126+
const sid0 = model.clock.sid;
127+
const sid1 = Model.sid();
128+
model.api.set({foo: 'bar'});
129+
const log = Log.fromNewModel(model);
130+
log.end.api.obj([]).set({x: 1});
131+
const patch1 = log.end.api.flush();
132+
log.end.setSid(sid1);
133+
log.end.api.obj([]).set({y: 2});
134+
const patch2 = log.end.api.flush();
135+
log.end.setSid(sid0);
136+
log.end.api.obj([]).set({foo: 'baz'});
137+
const patch3 = log.end.api.flush();
138+
const found0 = log.findMax(sid0);
139+
const found1 = log.findMax(sid1);
140+
const found2 = log.findMax(12345);
141+
expect(found0).toBe(patch3);
142+
expect(found1).toBe(patch2);
143+
expect(found2).toBe(void 0);
144+
});
145+
});
146+
147+
const setupTwoLogs = () => {
148+
const model = Model.create({foo: 'bar'});
149+
const log1 = Log.fromNewModel(model);
150+
log1.metadata = {time: 123};
151+
log1.end.api.obj([]).set({x: 1});
152+
log1.end.api.flush();
153+
log1.end.api.obj([]).set({y: 2});
154+
log1.end.api.flush();
155+
log1.end.api.obj([]).set({foo: 'baz'});
156+
log1.end.api.flush();
157+
const log2 = log1.clone();
158+
return {log1, log2};
159+
};
160+
161+
const assertLogsEqual = (log1: Log<any, any>, log2: Log<any, any>) => {
162+
expect(log1.start()).not.toBe(log2.start());
163+
expect(deepEqual(log1.start().view(), log2.start().view())).toBe(true);
164+
expect(log1.start().clock.sid).toEqual(log2.start().clock.sid);
165+
expect(log1.start().clock.time).toEqual(log2.start().clock.time);
166+
expect(log1.end).not.toBe(log2.end);
167+
expect(deepEqual(log1.end.view(), log2.end.view())).toBe(true);
168+
expect(log1.end.clock.sid).toEqual(log2.end.clock.sid);
169+
expect(log1.end.clock.time).toEqual(log2.end.clock.time);
170+
expect(log1.metadata).not.toBe(log2.metadata);
171+
expect(deepEqual(log1.metadata, log2.metadata)).toBe(true);
172+
expect(log1.patches.size()).toBe(log2.patches.size());
173+
expect(log1.patches.min!.v.toBinary()).toEqual(log2.patches.min!.v.toBinary());
174+
expect(log1.patches.max!.v.toBinary()).toEqual(log2.patches.max!.v.toBinary());
175+
expect(log1.patches.min!.v).not.toBe(log2.patches.min!.v);
176+
expect(log1.patches.max!.v).not.toBe(log2.patches.max!.v);
177+
};
178+
179+
describe('.clone()', () => {
180+
test('start model has the same view and clock', () => {
181+
const {log1, log2} = setupTwoLogs();
182+
expect(log1.start()).not.toBe(log2.start());
183+
expect(deepEqual(log1.start().view(), log2.start().view())).toBe(true);
184+
expect(log1.start().clock.sid).toEqual(log2.start().clock.sid);
185+
expect(log1.start().clock.time).toEqual(log2.start().clock.time);
186+
});
187+
188+
test('end model has the same view and clock', () => {
189+
const {log1, log2} = setupTwoLogs();
190+
expect(log1.end).not.toBe(log2.end);
191+
expect(deepEqual(log1.end.view(), log2.end.view())).toBe(true);
192+
expect(log1.end.clock.sid).toEqual(log2.end.clock.sid);
193+
expect(log1.end.clock.time).toEqual(log2.end.clock.time);
194+
});
195+
196+
test('metadata is the same but has different identity', () => {
197+
const {log1, log2} = setupTwoLogs();
198+
expect(log1.metadata).not.toBe(log2.metadata);
199+
expect(deepEqual(log1.metadata, log2.metadata)).toBe(true);
200+
});
201+
202+
test('patch log is the same', () => {
203+
const {log1, log2} = setupTwoLogs();
204+
expect(log1.patches.size()).toBe(log2.patches.size());
205+
expect(log1.patches.min!.v.toBinary()).toEqual(log2.patches.min!.v.toBinary());
206+
expect(log1.patches.max!.v.toBinary()).toEqual(log2.patches.max!.v.toBinary());
207+
expect(log1.patches.min!.v).not.toBe(log2.patches.min!.v);
208+
expect(log1.patches.max!.v).not.toBe(log2.patches.max!.v);
209+
});
210+
211+
test('can evolve logs independently', () => {
212+
const {log1, log2} = setupTwoLogs();
213+
assertLogsEqual(log1, log2);
214+
log1.end.api.obj([]).set({a: 1});
215+
log1.end.api.flush();
216+
expect(log1.end.view()).toEqual({foo: 'baz', x: 1, y: 2, a: 1});
217+
expect(log2.end.view()).toEqual({foo: 'baz', x: 1, y: 2});
218+
log2.end.api.obj([]).set({b: 2});
219+
log2.end.api.flush();
220+
expect(log1.end.view()).toEqual({foo: 'baz', x: 1, y: 2, a: 1});
221+
expect(log2.end.view()).toEqual({foo: 'baz', x: 1, y: 2, b: 2});
222+
});
223+
});
224+
225+
describe('.rebaseBatch()', () => {
226+
test('can rebase a concurrent batch onto another log', () => {
227+
const {log1, log2} = setupTwoLogs();
228+
log1.end.api.obj([]).set({a: 1});
229+
log2.end.api.obj([]).set({b: 2});
230+
const patch1 = log1.end.api.flush();
231+
const patch2 = log2.end.api.flush();
232+
expect(patch1.toBinary()).not.toEqual(patch2.toBinary());
233+
expect(patch1.getId()?.sid).toBe(patch2.getId()?.sid);
234+
expect(patch1.getId()?.time).toBe(patch2.getId()?.time);
235+
expect(patch1.span()).toEqual(patch2.span());
236+
const [patch3] = log1.rebaseBatch([patch2]);
237+
expect(patch1.toBinary()).not.toEqual(patch3.toBinary());
238+
expect(patch1.getId()?.sid).toBe(patch3.getId()?.sid);
239+
expect(patch1.getId()!.time + patch1.span()).toBe(patch3.getId()?.time);
240+
log1.end.applyPatch(patch3);
241+
expect(log1.end.view()).toEqual({foo: 'baz', x: 1, y: 2, a: 1, b: 2});
242+
expect(log2.end.view()).toEqual({foo: 'baz', x: 1, y: 2, b: 2});
243+
expect(() => assertLogsEqual(log1, log2)).toThrow();
244+
log2.reset(log1.clone());
245+
assertLogsEqual(log1, log2);
246+
});
247+
248+
test('can rebase a concurrent batch onto another log (multiple patches)', () => {
249+
const {log1, log2} = setupTwoLogs();
250+
log1.end.api.obj([]).set({a: 1});
251+
log2.end.api.obj([]).set({b: 2});
252+
log1.end.api.flush();
253+
const patch2 = log2.end.api.flush();
254+
log1.end.api.obj([]).set({a: 2});
255+
log2.end.api.obj([]).set({b: 3});
256+
log1.end.api.flush();
257+
const patch4 = log2.end.api.flush();
258+
log2.end.api.obj([]).set({b: 3});
259+
const patch5 = log2.end.api.flush();
260+
const batch2 = [patch2, patch4, patch5];
261+
expect(log1.end.view()).toEqual({foo: 'baz', x: 1, y: 2, a: 2});
262+
expect(log2.end.view()).toEqual({foo: 'baz', x: 1, y: 2, b: 3});
263+
const batch3 = log1.rebaseBatch(batch2);
264+
expect(batch3[0].getId()!.time).toBe(log1.end.clock.time);
265+
log1.end.applyBatch(batch3);
266+
expect(log1.end.view()).toEqual({foo: 'baz', x: 1, y: 2, a: 2, b: 3});
267+
expect(log2.end.view()).toEqual({foo: 'baz', x: 1, y: 2, b: 3});
268+
expect(() => assertLogsEqual(log1, log2)).toThrow();
269+
log2.reset(log1.clone());
270+
assertLogsEqual(log1, log2);
271+
});
272+
273+
test('can specify rebase sid', () => {
274+
const {log1, log2} = setupTwoLogs();
275+
expect(log1.end.clock.sid).toBe(log2.end.clock.sid);
276+
log1.end.api.obj([]).set({a: 1});
277+
log2.end.api.obj([]).set({b: 2});
278+
log1.end.api.flush();
279+
const patch2 = log2.end.api.flush();
280+
log1.end.setSid(12345);
281+
log1.end.api.obj([]).set({a: 2});
282+
log2.end.api.obj([]).set({b: 3});
283+
log1.end.api.flush();
284+
const patch4 = log2.end.api.flush();
285+
log2.end.api.obj([]).set({b: 3});
286+
const patch5 = log2.end.api.flush();
287+
const batch2 = [patch2, patch4, patch5];
288+
expect(log1.end.view()).toEqual({foo: 'baz', x: 1, y: 2, a: 2});
289+
expect(log2.end.view()).toEqual({foo: 'baz', x: 1, y: 2, b: 3});
290+
const batch3 = log1.rebaseBatch(batch2, log2.end.clock.sid);
291+
expect(batch3[0].getId()!.time).not.toBe(log1.end.clock.time);
292+
log1.end.applyBatch(batch3);
293+
expect(log1.end.view()).toEqual({foo: 'baz', x: 1, y: 2, a: 2, b: 3});
294+
expect(log2.end.view()).toEqual({foo: 'baz', x: 1, y: 2, b: 3});
295+
expect(() => assertLogsEqual(log1, log2)).toThrow();
296+
log2.reset(log1.clone());
297+
assertLogsEqual(log1, log2);
298+
});
299+
});
300+
122301
describe('.undo()', () => {
123302
describe('RGA', () => {
124303
describe('str', () => {

0 commit comments

Comments
 (0)