|
| 1 | +import {FeedFrame} from './FeedFrame'; |
| 2 | +import {AvlSet} from '../../../util/trees/avl/AvlSet'; |
| 3 | +import {AvlMap} from '../../../util/trees/avl/AvlMap'; |
| 4 | +import {Cid} from '../../multiformats'; |
| 5 | +import {mutex} from 'thingies/es2020/mutex'; |
| 6 | +import {FanOut} from 'thingies/es2020/fanout'; |
| 7 | +import {FeedConstraints, FeedOpType} from './constants'; |
| 8 | +import * as hlc from '../../hlc'; |
| 9 | +import type {CidCasStruct} from '../../store/cas/CidCasStruct'; |
| 10 | +import type * as types from './types'; |
| 11 | +import type {SyncStore} from '../../../util/events/sync-store'; |
| 12 | + |
| 13 | +export interface FeedDependencies { |
| 14 | + cas: CidCasStruct; |
| 15 | + hlcs: hlc.HlcFactory; |
| 16 | + |
| 17 | + /** |
| 18 | + * Number of operations after which a new frame is created, otherwise the |
| 19 | + * operations are appended to the current frame. Defaults to 25. |
| 20 | + */ |
| 21 | + opsPerFrame?: number; |
| 22 | +} |
| 23 | + |
| 24 | +export class Feed implements types.FeedApi, SyncStore<types.FeedOpInsert[]> { |
| 25 | + public static async merge( |
| 26 | + cas: CidCasStruct, |
| 27 | + baseCid: Cid, |
| 28 | + forkCid: Cid, |
| 29 | + opsPerFrame: number = FeedConstraints.DefaultOpsPerFrameThreshold, |
| 30 | + ): Promise<FeedFrame[]> { |
| 31 | + const [commonParent, baseFrames, forkFrames] = await Feed.findForkTriangle(cas, baseCid, forkCid); |
| 32 | + const ops = Feed.zipOps(baseFrames, forkFrames); |
| 33 | + let lastFrame: FeedFrame | null = commonParent; |
| 34 | + const frames: FeedFrame[] = []; |
| 35 | + while (ops.length) { |
| 36 | + const frameOps = ops.splice(0, opsPerFrame); |
| 37 | + const prev = lastFrame ? lastFrame.cid.toBinaryV1() : null; |
| 38 | + const seq = lastFrame ? lastFrame.seq() + 1 : FeedConstraints.FirstFrameSeq; |
| 39 | + const dto: types.FeedFrameDto = [prev, seq, frameOps]; |
| 40 | + const frame = await FeedFrame.create(dto, cas); |
| 41 | + frame.prev = lastFrame; |
| 42 | + lastFrame = frame; |
| 43 | + frames.push(frame); |
| 44 | + } |
| 45 | + return frames; |
| 46 | + } |
| 47 | + |
| 48 | + protected static zipOps(baseFrames: FeedFrame[], forkFrames: FeedFrame[]): types.FeedOp[] { |
| 49 | + const baseOps: types.FeedOp[] = []; |
| 50 | + const forkOps: types.FeedOp[] = []; |
| 51 | + for (const frame of baseFrames) baseOps.push(...frame.ops()); |
| 52 | + for (const frame of forkFrames) forkOps.push(...frame.ops()); |
| 53 | + const ops: types.FeedOp[] = []; |
| 54 | + while (baseOps.length || forkOps.length) { |
| 55 | + if (!baseOps.length) { |
| 56 | + ops.push(...forkOps); |
| 57 | + break; |
| 58 | + } |
| 59 | + if (!forkOps.length) { |
| 60 | + ops.push(...baseOps); |
| 61 | + break; |
| 62 | + } |
| 63 | + const baseOp = baseOps[0]; |
| 64 | + if (baseOp[0] === FeedOpType.Delete) { |
| 65 | + ops.push(baseOp); |
| 66 | + baseOps.shift(); |
| 67 | + continue; |
| 68 | + } |
| 69 | + const forkOp = forkOps[0]; |
| 70 | + if (forkOp[0] === FeedOpType.Delete) { |
| 71 | + ops.push(forkOp); |
| 72 | + forkOps.shift(); |
| 73 | + continue; |
| 74 | + } |
| 75 | + const baseId = baseOp[1]; |
| 76 | + const forkId = forkOp[1]; |
| 77 | + const cmp = hlc.cmpDto(baseId, forkId); |
| 78 | + if (cmp === 0) { |
| 79 | + ops.push(baseOp); |
| 80 | + baseOps.shift(); |
| 81 | + forkOps.shift(); |
| 82 | + continue; |
| 83 | + } else if (cmp < 0) { |
| 84 | + ops.push(baseOp); |
| 85 | + baseOps.shift(); |
| 86 | + continue; |
| 87 | + } else { |
| 88 | + ops.push(forkOp); |
| 89 | + forkOps.shift(); |
| 90 | + continue; |
| 91 | + } |
| 92 | + } |
| 93 | + return ops; |
| 94 | + } |
| 95 | + |
| 96 | + protected static async findForkTriangle( |
| 97 | + cas: CidCasStruct, |
| 98 | + leftCid: Cid, |
| 99 | + rightCid: Cid, |
| 100 | + ): Promise<[commonParent: FeedFrame | null, leftFrames: FeedFrame[], rightFrames: FeedFrame[]]> { |
| 101 | + const leftHeadFrame = await FeedFrame.read(leftCid, cas); |
| 102 | + const rightHeadFrame = await FeedFrame.read(rightCid, cas); |
| 103 | + const leftFrames: FeedFrame[] = [leftHeadFrame]; |
| 104 | + const rightFrames: FeedFrame[] = [rightHeadFrame]; |
| 105 | + if (leftHeadFrame.seq() > rightHeadFrame.seq()) { |
| 106 | + while (true) { |
| 107 | + const prevCid = leftFrames[leftFrames.length - 1].prevCid(); |
| 108 | + if (!prevCid) throw new Error('INVALID_STATE'); |
| 109 | + const cid = Cid.fromBinaryV1(prevCid); |
| 110 | + const frame = await FeedFrame.read(cid, cas); |
| 111 | + leftFrames.push(frame); |
| 112 | + if (frame.seq() <= rightHeadFrame.seq()) break; |
| 113 | + } |
| 114 | + } |
| 115 | + if (leftHeadFrame.seq() < rightHeadFrame.seq()) { |
| 116 | + while (true) { |
| 117 | + const prevCid = rightFrames[rightFrames.length - 1].prevCid(); |
| 118 | + if (!prevCid) throw new Error('INVALID_STATE'); |
| 119 | + const cid = Cid.fromBinaryV1(prevCid); |
| 120 | + const frame = await FeedFrame.read(cid, cas); |
| 121 | + rightFrames.push(frame); |
| 122 | + if (frame.seq() <= leftHeadFrame.seq()) break; |
| 123 | + } |
| 124 | + } |
| 125 | + while (true) { |
| 126 | + const leftFrame = leftFrames[leftFrames.length - 1]; |
| 127 | + const rightFrame = rightFrames[rightFrames.length - 1]; |
| 128 | + if (leftFrame.seq() !== rightFrame.seq()) throw new Error('INVALID_STATE'); |
| 129 | + if (leftFrame.seq() === 0) return [null, leftFrames, rightFrames]; |
| 130 | + if (leftFrame.cid.is(rightFrame.cid)) { |
| 131 | + leftFrames.pop(); |
| 132 | + rightFrames.pop(); |
| 133 | + return [leftFrame, leftFrames, rightFrames]; |
| 134 | + } |
| 135 | + const prevLeft = leftFrame.prevCid(); |
| 136 | + const prevRight = rightFrame.prevCid(); |
| 137 | + if (!prevLeft || !prevRight) throw new Error('INVALID_STATE'); |
| 138 | + leftFrames.push(await FeedFrame.read(Cid.fromBinaryV1(prevLeft), cas)); |
| 139 | + rightFrames.push(await FeedFrame.read(Cid.fromBinaryV1(prevRight), cas)); |
| 140 | + } |
| 141 | + } |
| 142 | + |
| 143 | + /** |
| 144 | + * Number of operations after which a new frame is created, otherwise the |
| 145 | + * operations are appended to the current frame. |
| 146 | + */ |
| 147 | + public opsPerFrame: number; |
| 148 | + |
| 149 | + /** |
| 150 | + * Emitted when the feed view changes (new entries are added or deleted). |
| 151 | + */ |
| 152 | + public onChange: FanOut<void> = new FanOut(); |
| 153 | + |
| 154 | + protected head: FeedFrame | null = null; |
| 155 | + protected tail: FeedFrame | null = null; |
| 156 | + protected unsaved: types.FeedOp[] = []; |
| 157 | + protected readonly deletes = new AvlSet<hlc.HlcDto>(hlc.cmpDto); |
| 158 | + protected readonly inserts = new AvlMap<hlc.HlcDto, types.FeedOpInsert>(hlc.cmpDto); |
| 159 | + |
| 160 | + constructor(protected readonly deps: FeedDependencies) { |
| 161 | + this.opsPerFrame = deps.opsPerFrame ?? FeedConstraints.DefaultOpsPerFrameThreshold; |
| 162 | + } |
| 163 | + |
| 164 | + public cid(): Cid | undefined { |
| 165 | + return this.head?.cid; |
| 166 | + } |
| 167 | + |
| 168 | + public async loadAll(): Promise<void> { |
| 169 | + while (this.hasMore()) await this.loadMore(); |
| 170 | + } |
| 171 | + |
| 172 | + public clear(): void { |
| 173 | + this.head = null; |
| 174 | + this.tail = null; |
| 175 | + this.unsaved = []; |
| 176 | + this.deletes.clear(); |
| 177 | + if (!this.inserts.isEmpty()) { |
| 178 | + this.inserts.clear(); |
| 179 | + this.onChange.emit(); |
| 180 | + } |
| 181 | + } |
| 182 | + |
| 183 | + public async merge(forkCid: Cid): Promise<void> { |
| 184 | + if (this.unsaved.length) await this.save(); |
| 185 | + if (!this.head) throw new Error('INVALID_STATE'); |
| 186 | + const frames = await Feed.merge(this.deps.cas, this.head.cid, forkCid, this.opsPerFrame); |
| 187 | + for (const frame of frames) this.ingestFrameData(frame, true); |
| 188 | + const head = frames[frames.length - 1]; |
| 189 | + let curr = head; |
| 190 | + for (let i = frames.length - 2; i >= 0; i--) { |
| 191 | + curr.prev = frames[i]; |
| 192 | + curr = frames[i]; |
| 193 | + } |
| 194 | + let existingCurr: FeedFrame | null = this.head; |
| 195 | + while (existingCurr && existingCurr.seq() > curr.seq()) existingCurr = existingCurr.prev; |
| 196 | + if (existingCurr) curr.prev = existingCurr.prev; |
| 197 | + else this.tail = curr; |
| 198 | + this.head = head; |
| 199 | + this.onChange.emit(); |
| 200 | + } |
| 201 | + |
| 202 | + // ------------------------------------------------------------------ FeedApi |
| 203 | + |
| 204 | + public add(data: unknown): hlc.HlcDto { |
| 205 | + const id = this.deps.hlcs.inc(); |
| 206 | + const idDto = hlc.toDto(id); |
| 207 | + const op: types.FeedOpInsert = [FeedOpType.Insert, idDto, data]; |
| 208 | + this.unsaved.push(op); |
| 209 | + this.inserts.set(op[1], op); |
| 210 | + this.onChange.emit(); |
| 211 | + return idDto; |
| 212 | + } |
| 213 | + |
| 214 | + public del(opId: hlc.HlcDto): void { |
| 215 | + const op: types.FeedOpDelete = [FeedOpType.Delete, opId]; |
| 216 | + this.unsaved.push(op); |
| 217 | + this.deletes.add(opId); |
| 218 | + const unsavedOpIndex = this.unsaved.findIndex( |
| 219 | + ([type, id]) => type === FeedOpType.Insert && hlc.cmpDto(opId, id) === 0, |
| 220 | + ); |
| 221 | + if (unsavedOpIndex !== -1) this.unsaved.splice(unsavedOpIndex, 1); |
| 222 | + const deleted = this.inserts.del(opId); |
| 223 | + if (deleted) this.onChange.emit(); |
| 224 | + } |
| 225 | + |
| 226 | + @mutex |
| 227 | + public async loadHead(cid: Cid): Promise<void> { |
| 228 | + this.clear(); |
| 229 | + const frame = await FeedFrame.read(cid, this.deps.cas); |
| 230 | + this.head = this.tail = frame; |
| 231 | + this.ingestFrameData(frame); |
| 232 | + } |
| 233 | + |
| 234 | + @mutex |
| 235 | + public async loadMore(): Promise<void> { |
| 236 | + const tail = this.tail; |
| 237 | + if (!tail) return; |
| 238 | + const prevCidDto = tail.data[0]; |
| 239 | + if (!prevCidDto) return; |
| 240 | + const cid = Cid.fromBinaryV1(prevCidDto); |
| 241 | + const frame = this.tail?.prev ?? (await FeedFrame.read(cid, this.deps.cas)); |
| 242 | + tail.prev = frame; |
| 243 | + this.tail = frame; |
| 244 | + this.ingestFrameData(frame); |
| 245 | + } |
| 246 | + |
| 247 | + public hasMore(): boolean { |
| 248 | + return !!this.tail?.data[0]; |
| 249 | + } |
| 250 | + |
| 251 | + protected ingestFrameData(frame: FeedFrame, silent?: boolean): void { |
| 252 | + const [, , ops] = frame.data; |
| 253 | + for (const op of ops) { |
| 254 | + switch (op[0]) { |
| 255 | + case FeedOpType.Insert: { |
| 256 | + const id = op[1]; |
| 257 | + if (this.deletes.has(id)) continue; |
| 258 | + this.inserts.set(id, op); |
| 259 | + break; |
| 260 | + } |
| 261 | + case FeedOpType.Delete: { |
| 262 | + const id = op[1]; |
| 263 | + this.deletes.add(id); |
| 264 | + this.inserts.del(id); |
| 265 | + break; |
| 266 | + } |
| 267 | + } |
| 268 | + } |
| 269 | + if (!silent) this.onChange.emit(); |
| 270 | + } |
| 271 | + |
| 272 | + @mutex |
| 273 | + public async save(): Promise<Cid> { |
| 274 | + const hasUnsavedChanges = !!this.unsaved.length; |
| 275 | + const head = this.head; |
| 276 | + if (!hasUnsavedChanges) { |
| 277 | + if (head) return head.cid; |
| 278 | + const dto: types.FeedFrameDto = [null, 0, []]; |
| 279 | + const frame = await FeedFrame.create(dto, this.deps.cas); |
| 280 | + this.head = this.tail = frame; |
| 281 | + this.unsaved = []; |
| 282 | + return frame.cid; |
| 283 | + } |
| 284 | + if (!head) { |
| 285 | + const dto: types.FeedFrameDto = [null, 0, this.unsaved]; |
| 286 | + const frame = await FeedFrame.create(dto, this.deps.cas); |
| 287 | + this.head = this.tail = frame; |
| 288 | + this.unsaved = []; |
| 289 | + return frame.cid; |
| 290 | + } |
| 291 | + const headOps = head.ops(); |
| 292 | + const addToHead = headOps.length < this.opsPerFrame; |
| 293 | + if (addToHead) { |
| 294 | + const dto: types.FeedFrameDto = [head.prevCid(), head.seq(), [...headOps, ...this.unsaved]]; |
| 295 | + const frame = await FeedFrame.create(dto, this.deps.cas); |
| 296 | + frame.prev = head.prev; |
| 297 | + this.head = frame; |
| 298 | + this.unsaved = []; |
| 299 | + return frame.cid; |
| 300 | + } |
| 301 | + const dto: types.FeedFrameDto = [head.cid.toBinaryV1(), head.seq() + 1, this.unsaved]; |
| 302 | + const frame = await FeedFrame.create(dto, this.deps.cas); |
| 303 | + frame.prev = head; |
| 304 | + this.head = frame; |
| 305 | + this.unsaved = []; |
| 306 | + return frame.cid; |
| 307 | + } |
| 308 | + |
| 309 | + // ---------------------------------------------------------------- SyncStore |
| 310 | + |
| 311 | + public readonly subscribe = (callback: () => void) => { |
| 312 | + const unsubscribe = this.onChange.listen(() => callback()); |
| 313 | + return () => unsubscribe(); |
| 314 | + }; |
| 315 | + |
| 316 | + public readonly getSnapshot = (): types.FeedOpInsert[] => { |
| 317 | + const ops: types.FeedOpInsert[] = []; |
| 318 | + this.inserts.forEach((node) => ops.push(node.v)); |
| 319 | + return ops; |
| 320 | + }; |
| 321 | +} |
0 commit comments