Skip to content

Commit bd31263

Browse files
authored
Merge pull request #561 from streamich/revert-2
Revert 2
2 parents 8829f98 + a41034b commit bd31263

File tree

7 files changed

+245
-15
lines changed

7 files changed

+245
-15
lines changed

package.json

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -83,7 +83,7 @@
8383
"arg": "^5.0.2",
8484
"hyperdyperid": "^1.2.0",
8585
"multibase": "^4.0.6",
86-
"thingies": "^1.18.0"
86+
"thingies": "^1.20.0"
8787
},
8888
"devDependencies": {
8989
"@automerge/automerge": "2.1.7",
@@ -136,6 +136,7 @@
136136
"lodash": "^4.17.21",
137137
"loro-crdt": "^0.4.1",
138138
"markdown-it": "^13.0.1",
139+
"memfs": "^4.8.1",
139140
"messagepack": "^1.1.12",
140141
"msgpack-lite": "^0.1.26",
141142
"msgpack5": "^6.0.2",
Lines changed: 85 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,85 @@
1+
import {File, FileOptions} from '../file/File';
2+
import {CborEncoder} from '../../json-pack/cbor/CborEncoder';
3+
import type {CrudApi} from 'memfs/lib/crud/types';
4+
import type {Locks} from 'thingies/es2020/Locks';
5+
import type {Patch} from '../../json-crdt-patch';
6+
import type {PatchLog} from './PatchLog';
7+
import type {LocalHistory} from './types';
8+
9+
export const genId = (octets: number = 8): string => {
10+
const uint8 = crypto.getRandomValues(new Uint8Array(octets));
11+
let hex = '';
12+
for (let i = 0; i < octets; i++) hex += uint8[i].toString(16).padStart(2, '0');
13+
return hex;
14+
};
15+
16+
const STATE_FILE_NAME = 'state.seq.cbor';
17+
18+
export class LocalHistoryCrud implements LocalHistory {
19+
protected fileOpts: FileOptions = {
20+
cborEncoder: new CborEncoder(),
21+
};
22+
23+
constructor(
24+
protected readonly crud: CrudApi,
25+
protected readonly locks: Locks,
26+
) {}
27+
28+
public async create(collection: string[], log: PatchLog): Promise<{id: string}> {
29+
// TODO: Remove `log.end`, just `log` should be enough.
30+
const file = new File(log.end, log, this.fileOpts);
31+
const blob = file.toBinary({
32+
format: 'seq.cbor',
33+
model: 'binary',
34+
});
35+
const id = genId();
36+
await this.lock(collection, id, async () => {
37+
await this.crud.put([...collection, id], STATE_FILE_NAME, blob, {throwIf: 'exists'});
38+
});
39+
return {id};
40+
}
41+
42+
public async read(collection: string[], id: string): Promise<{log: PatchLog; cursor: string}> {
43+
const blob = await this.crud.get([...collection, id], STATE_FILE_NAME);
44+
const {log} = File.fromSeqCbor(blob);
45+
return {
46+
log,
47+
cursor: '',
48+
};
49+
}
50+
51+
public readHistory(collection: string[], id: string, cursor: string): Promise<{log: PatchLog; cursor: string}> {
52+
throw new Error('Method not implemented.');
53+
}
54+
55+
public async update(collection: string[], id: string, patches: Patch[]): Promise<void> {
56+
await this.lock(collection, id, async () => {
57+
const blob = await this.crud.get([...collection, id], STATE_FILE_NAME);
58+
const {log} = File.fromSeqCbor(blob);
59+
log.end.applyBatch(patches);
60+
const file = new File(log.end, log, this.fileOpts);
61+
const blob2 = file.toBinary({
62+
format: 'seq.cbor',
63+
model: 'binary',
64+
});
65+
await this.crud.put([...collection, id], STATE_FILE_NAME, blob2, {throwIf: 'missing'});
66+
});
67+
}
68+
69+
public async delete(collection: string[], id: string): Promise<void> {
70+
await this.lock(collection, id, async () => {
71+
await this.crud.drop(collection, true);
72+
});
73+
}
74+
75+
protected async lock(collection: string[], id: string, fn: () => Promise<void>): Promise<void> {
76+
const key = collection.join('/') + '/' + id;
77+
await this.locks.lock(
78+
key,
79+
250,
80+
500,
81+
)(async () => {
82+
await fn();
83+
});
84+
}
85+
}

src/json-crdt/history/PatchLog.ts

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -43,9 +43,10 @@ export class PatchLog implements Printable {
4343
public readonly end: Model;
4444

4545
/**
46-
* The patches in the log, stored in an AVL tree for efficient replaying. The
47-
* collection of patches which are applied to the `start()` model to reach
48-
* the `end` model.
46+
* The collection of patches which are applied to the `start()` model to reach
47+
* the `end` model. The patches in the log, stored in an AVL tree for
48+
* efficient replaying. The patches are sorted by their logical timestamps
49+
* and applied in causal order.
4950
*
5051
* @readonly
5152
*/
Lines changed: 72 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,72 @@
1+
import {createRace} from 'thingies/es2020/createRace';
2+
import {FanOutUnsubscribe} from 'thingies/es2020/fanout';
3+
import {InsValOp, Patch} from '../../json-crdt-patch';
4+
import {ValNode} from '../nodes';
5+
import {toSchema} from '../schema/toSchema';
6+
import {PatchLog} from './PatchLog';
7+
import {RedoItem, UndoItem, UndoRedoStack} from './UndoRedoStack';
8+
9+
class Undo implements UndoItem {
10+
constructor(public readonly undo: () => Redo) {}
11+
}
12+
13+
class Redo implements RedoItem {
14+
constructor(public readonly redo: () => Undo) {}
15+
}
16+
17+
export class SessionHistory {
18+
constructor(public readonly log: PatchLog) {}
19+
20+
private readonly __onPatchRace = createRace();
21+
22+
public attachUndoRedo(stack: UndoRedoStack): FanOutUnsubscribe {
23+
const onBeforePatch = (patch: Patch) => {
24+
this.__onPatchRace(() => {
25+
const undo = this.createUndo(patch);
26+
stack.push(undo);
27+
});
28+
};
29+
const unsubscribe = this.log.end.api.onBeforePatch.listen(onBeforePatch);
30+
return unsubscribe;
31+
}
32+
33+
public createUndo(patch: Patch): Undo {
34+
const undoTasks: Array<() => void> = [];
35+
const ops = patch.ops;
36+
const length = ops.length;
37+
for (let i = length - 1; i >= 0; i--) {
38+
const op = ops[i];
39+
switch (op.name()) {
40+
case 'ins_val': {
41+
const insOp = op as InsValOp;
42+
const valNode = this.log.end.index.get(insOp.obj);
43+
if (!(valNode instanceof ValNode)) throw new Error('INVALID_NODE');
44+
const copy = toSchema(valNode.node());
45+
const valNodeId = valNode.id;
46+
const task = () => {
47+
const end = this.log.end;
48+
const valNode = end.index.get(valNodeId);
49+
if (!valNode) return;
50+
end.api.wrap(valNode).asVal().set(copy);
51+
};
52+
undoTasks.push(task);
53+
}
54+
}
55+
}
56+
const undo = new Undo(() => {
57+
this.__onPatchRace(() => {
58+
for (const task of undoTasks) task();
59+
});
60+
return new Redo(() => {
61+
const undo = this.__onPatchRace(() => {
62+
// TODO: This line needs to be changed:
63+
const redoPatch = Patch.fromBinary(patch.toBinary());
64+
this.log.end.api.builder.patch = redoPatch;
65+
return this.createUndo(redoPatch);
66+
});
67+
return undo!;
68+
});
69+
});
70+
return undo;
71+
}
72+
}
Lines changed: 62 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,62 @@
1+
import {memfs} from 'memfs';
2+
import {NodeCrud} from 'memfs/lib/node-to-crud';
3+
import {Locks} from 'thingies/es2020/Locks';
4+
import {LocalHistoryCrud} from '../LocalHistoryCrud';
5+
import {PatchLog} from '../PatchLog';
6+
import {Model} from '../../model';
7+
8+
const setup = async () => {
9+
const {fs, vol} = memfs();
10+
const crud = new NodeCrud({fs: fs.promises, dir: '/'});
11+
const locks = new Locks();
12+
const local = new LocalHistoryCrud(crud, locks);
13+
return {
14+
fs,
15+
vol,
16+
crud,
17+
locks,
18+
local,
19+
};
20+
};
21+
22+
test('can create a new document', async () => {
23+
const {local} = await setup();
24+
const model = Model.withLogicalClock();
25+
model.api.root({
26+
foo: 'spam',
27+
});
28+
const log = PatchLog.fromNewModel(model);
29+
const {id} = await local.create(['test'], log);
30+
expect(typeof id).toBe('string');
31+
expect(id.length > 6).toBe(true);
32+
const {log: log2} = await local.read(['test'], id);
33+
expect(log2.end.view()).toStrictEqual({foo: 'spam'});
34+
});
35+
36+
test('throws on non-existing document', async () => {
37+
const {local} = await setup();
38+
try {
39+
await local.read(['test'], 'asdfasdf');
40+
throw new Error('FAIL');
41+
} catch (err) {
42+
expect((err as Error).message).toBe('Collection /test/asdfasdf does not exist');
43+
}
44+
});
45+
46+
test('can delete a document', async () => {
47+
const {local} = await setup();
48+
const model = Model.withLogicalClock();
49+
model.api.root({
50+
foo: 'spam',
51+
});
52+
const log = PatchLog.fromNewModel(model);
53+
const {id} = await local.create(['test'], log);
54+
await local.read(['test'], id);
55+
await local.delete(['test'], id);
56+
try {
57+
await local.read(['test'], id);
58+
throw new Error('FAIL');
59+
} catch (err) {
60+
expect((err as Error).message).toBe(`Collection /test/${id} does not exist`);
61+
}
62+
});

src/json-crdt/history/types.ts

Lines changed: 8 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
1-
import {Patch} from '../../json-crdt-patch';
2-
import {PatchLog} from '../history/PatchLog';
3-
import {Model} from '../model';
1+
import type {Patch} from '../../json-crdt-patch';
2+
import type {PatchLog} from '../history/PatchLog';
3+
import type {Model} from '../model';
44

55
/**
66
* A history of patches that have been applied to a model, stored on the
@@ -37,9 +37,11 @@ export interface RemoteHistory<Cursor> {
3737
}
3838

3939
export interface LocalHistory {
40-
load(id: string): Promise<EditingSessionHistory>;
41-
// loadHistory(id: string): Promise<PatchLog>;
42-
apply(id: string, patches: Patch[]): Promise<void>;
40+
create(collection: string[], log: PatchLog): Promise<{id: string}>;
41+
read(collection: string[], id: string): Promise<{log: PatchLog; cursor: string}>;
42+
readHistory(collection: string[], id: string, cursor: string): Promise<{log: PatchLog; cursor: string}>;
43+
update(collection: string[], id: string, patches: Patch[]): Promise<void>;
44+
delete(collection: string[], id: string): Promise<void>;
4345
}
4446

4547
export interface EditingSessionHistory {

yarn.lock

Lines changed: 12 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -3997,6 +3997,13 @@ memfs@^3.4.1, memfs@^3.4.3:
39973997
dependencies:
39983998
fs-monkey "^1.0.4"
39993999

4000+
memfs@^4.8.1:
4001+
version "4.8.1"
4002+
resolved "https://registry.yarnpkg.com/memfs/-/memfs-4.8.1.tgz#1e02c15c4397212a9a1b037fa4324c6f7dd45b47"
4003+
integrity sha512-7q/AdPzf2WpwPlPL4v1kE2KsJsHl7EF4+hAeVzlyanr2+YnR21NVn9mDqo+7DEaKDRsQy8nvxPlKH4WqMtiO0w==
4004+
dependencies:
4005+
tslib "^2.0.0"
4006+
40004007
merge-descriptors@1.0.1:
40014008
version "1.0.1"
40024009
resolved "https://registry.yarnpkg.com/merge-descriptors/-/merge-descriptors-1.0.1.tgz#b00aaa556dd8b44568150ec9d1b953f3f90cbb61"
@@ -5348,10 +5355,10 @@ test-exclude@^6.0.0:
53485355
glob "^7.1.4"
53495356
minimatch "^3.0.4"
53505357

5351-
thingies@^1.18.0:
5352-
version "1.18.0"
5353-
resolved "https://registry.yarnpkg.com/thingies/-/thingies-1.18.0.tgz#827141872d96f3c3c2c0b432ab0dfdb581b4b4ac"
5354-
integrity sha512-WiB26BQP0MF47Bbvbq0P19KpyfrvdTK07L8xnltobpZ/aJPmu52CBGhYjLsnFgjyawmusJ0gVkTplnnoz2hBkQ==
5358+
thingies@^1.20.0:
5359+
version "1.20.0"
5360+
resolved "https://registry.yarnpkg.com/thingies/-/thingies-1.20.0.tgz#27bf93397c39c3ff36601197e8cf78f43b7b2319"
5361+
integrity sha512-WvXY4CjHp/Uim2Ri0daqu6jkNTHJTk1H8NvuMQiOL0mgtdkqoSH5fkENy2M6XnvsLOp5iwyPcbmokoBjVb4lnQ==
53555362

53565363
thunky@^1.0.2:
53575364
version "1.1.0"
@@ -5453,7 +5460,7 @@ tslib@^1.13.0, tslib@^1.8.1:
54535460
resolved "https://registry.yarnpkg.com/tslib/-/tslib-1.14.1.tgz#cf2d38bdc34a134bcaf1091c41f6619e2f672d00"
54545461
integrity sha512-Xni35NKzjgMrwevysHTCArtLDpPvye8zV/0E4EyYn43P7/7qvQwPh9BGkHewbMulVntbigmcT7rdX3BNo9wRJg==
54555462

5456-
tslib@^2.0.1, tslib@^2.0.3, tslib@^2.1.0, tslib@^2.6.2:
5463+
tslib@^2.0.0, tslib@^2.0.1, tslib@^2.0.3, tslib@^2.1.0, tslib@^2.6.2:
54575464
version "2.6.2"
54585465
resolved "https://registry.yarnpkg.com/tslib/-/tslib-2.6.2.tgz#703ac29425e7b37cd6fd456e92404d46d1f3e4ae"
54595466
integrity sha512-AEYxH93jGFPn/a2iVAwW87VuUIkR1FVUKB77NwMF7nBTDkDrrT/Hpt/IrCJ0QXhW27jTBDcf5ZY7w6RiqTMw2Q==

0 commit comments

Comments
 (0)