diff --git a/codex/node.nim b/codex/node.nim
index 9932deb6b1..99eefb43af 100644
--- a/codex/node.nim
+++ b/codex/node.nim
@@ -53,6 +53,7 @@ logScope:
   topics = "codex node"
 
 const DefaultFetchBatch = 10
+const DefaultLeafBatch = 100
 
 type
   Contracts* =
@@ -345,19 +346,9 @@ proc deleteEntireDataset(self: CodexNodeRef, cid: Cid): Future[?!void] {.async.}
   without manifest =? Manifest.decode(manifestBlock), err:
     return failure(err)
 
-  let runtimeQuota = initDuration(milliseconds = 100)
-  var lastIdle = getTime()
-  for i in 0 ..< manifest.blocksCount:
-    if (getTime() - lastIdle) >= runtimeQuota:
-      await idleAsync()
-      lastIdle = getTime()
-
-    if err =? (await store.delBlock(manifest.treeCid, i)).errorOption:
-      # The contract for delBlock is fuzzy, but we assume that if the block is
-      # simply missing we won't get an error. This is a best effort operation and
-      # can simply be retried.
-      error "Failed to delete block within dataset", index = i, err = err.msg
-      return failure(err)
+  if err =? (await store.delBlocks(manifest.treeCid, manifest.blocksCount)).errorOption:
+    error "Error deleting blocks", err = err.msg
+    return failure(err)
 
   if err =? (await store.delBlock(cid)).errorOption:
     error "Error deleting manifest block", err = err.msg
@@ -399,6 +390,8 @@ proc store*(
     dataCodec = BlockCodec
     chunker = LPStreamChunker.new(stream, chunkSize = blockSize)
 
+  var proofs = newSeq[CodexProof]()
+
   var cids: seq[Cid]
 
   try:
@@ -430,13 +423,23 @@
     without treeCid =? tree.rootCid(CIDv1, dataCodec), err:
       return failure(err)
 
+    var
+      batch = newSeq[(Cid, CodexProof)]()
+      batchStartIndex = 0
+
     for index, cid in cids:
       without proof =? tree.getProof(index), err:
         return failure(err)
-      if err =?
-          (await self.networkStore.putCidAndProof(treeCid, index, cid, proof)).errorOption:
-        # TODO add log here
-        return failure(err)
+      batch.add((cid, proof))
+
+      if batch.len >= DefaultLeafBatch or index == cids.len - 1:
+        if err =? (
+          await self.networkStore.putCidAndProofBatch(treeCid, batchStartIndex, batch)
+        ).errorOption:
+          # TODO add log here
+          return failure(err)
+        batch.setLen(0)
+        batchStartIndex = index + 1
 
     let manifest = Manifest.new(
       treeCid = treeCid,
diff --git a/codex/rest/api.nim b/codex/rest/api.nim
index 243d4ed696..cbf36bc3f2 100644
--- a/codex/rest/api.nim
+++ b/codex/rest/api.nim
@@ -366,7 +366,7 @@ proc initDataApi(node: CodexNodeRef, repoStore: RepoStore, router: var RestRoute
 
   router.api(MethodGet, "/api/codex/v1/space") do() -> RestApiResponse:
     let json = %RestRepoStore(
-      totalBlocks: repoStore.totalBlocks,
+      totalBlocks: repoStore.storageStats.totalBlocks,
       quotaMaxBytes: repoStore.quotaMaxBytes,
       quotaUsedBytes: repoStore.quotaUsedBytes,
       quotaReservedBytes: repoStore.quotaReservedBytes,
diff --git a/codex/slots/builder/builder.nim b/codex/slots/builder/builder.nim
index 1ea57a0f23..e5832fa709 100644
--- a/codex/slots/builder/builder.nim
+++ b/codex/slots/builder/builder.nim
@@ -224,6 +224,9 @@ proc buildSlot*[T, H](
     slotIndex = slotIndex
 
   trace "Building slot tree"
+  var
+    batch = newSeq[(Cid, CodexProof)]()
+    batchStartIndex = 0
 
   without tree =? (await self.buildSlotTree(slotIndex)) and treeCid =? tree.root .? toSlotCid,
     err:
@@ -240,10 +243,14 @@
       error "Failed to get proof for slot tree", err = err.msg
       return failure(err)
 
-    if err =?
-        (await self.store.putCidAndProof(treeCid, i, cellCid, encodableProof)).errorOption:
-      error "Failed to store slot tree", err = err.msg
-      return failure(err)
+    batch.add((cellCid, encodableProof))
+
+    if batch.len >= 50 or i == tree.leaves.len - 1:
+      if err =? (await self.store.putCidAndProofBatch(treeCid, batchStartIndex, batch)).errorOption:
+        error "Failed to store slot tree", err = err.msg
+        return failure(err)
+      batch.setLen(0)
+      batchStartIndex = i + 1
 
   tree.root()
diff --git a/codex/stores/blockstore.nim b/codex/stores/blockstore.nim
index 78fab0da7c..5cbd705674 100644
--- a/codex/stores/blockstore.nim
+++ b/codex/stores/blockstore.nim
@@ -87,6 +87,14 @@ method putCidAndProof*(
 
   raiseAssert("putCidAndProof not implemented!")
 
+method putCidAndProofBatch*(
+    self: BlockStore, treeCid: Cid, startIndex: int, entries: seq[(Cid, CodexProof)]
+): Future[?!void] {.base, gcsafe.} =
+  ## Put a batch of block proofs to the blockstore
+  ##
+
+  raiseAssert("putCidAndProofBatch not implemented!")
+
 method getCidAndProof*(
     self: BlockStore, treeCid: Cid, index: Natural
 ): Future[?!(Cid, CodexProof)] {.base, gcsafe.} =
@@ -127,6 +135,14 @@ method delBlock*(
 
   raiseAssert("delBlock not implemented!")
 
+method delBlocks*(
+    self: BlockStore, treeCid: Cid, count: int
+): Future[?!void] {.base, gcsafe.} =
+  ## Delete a batch of blocks from the blockstore
+  ##
+
+  raiseAssert("delBlocks not implemented!")
+
 method hasBlock*(self: BlockStore, cid: Cid): Future[?!bool] {.base, gcsafe.} =
   ## Check if the block exists in the blockstore
   ##
diff --git a/codex/stores/networkstore.nim b/codex/stores/networkstore.nim
index f94bca3300..e36fb585aa 100644
--- a/codex/stores/networkstore.nim
+++ b/codex/stores/networkstore.nim
@@ -69,6 +69,11 @@ method putBlock*(
   await self.engine.resolveBlocks(@[blk])
   return success()
 
+method putCidAndProofBatch*(
+    self: NetworkStore, treeCid: Cid, startIndex: int, entries: seq[(Cid, CodexProof)]
+): Future[?!void] =
+  self.localStore.putCidAndProofBatch(treeCid, startIndex, entries)
+
 method putCidAndProof*(
     self: NetworkStore, treeCid: Cid, index: Natural, blockCid: Cid, proof: CodexProof
 ): Future[?!void] =
@@ -128,6 +133,13 @@ method delBlock*(self: NetworkStore, cid: Cid): Future[?!void] =
   trace "Deleting block from network store", cid
   return self.localStore.delBlock(cid)
 
+method delBlocks*(self: NetworkStore, treeCid: Cid, count: int): Future[?!void] =
+  ## Delete blocks from the blockstore
+  ##
+
+  trace "Deleting blocks from network store", treeCid
+  return self.localStore.delBlocks(treeCid, count)
+
 {.pop.}
 
 method hasBlock*(self: NetworkStore, cid: Cid): Future[?!bool] {.async.} =
diff --git a/codex/stores/repostore/coders.nim b/codex/stores/repostore/coders.nim
index 47df721936..e656886d3e 100644
--- a/codex/stores/repostore/coders.nim
+++ b/codex/stores/repostore/coders.nim
@@ -19,10 +19,10 @@ import ../../errors
 import ../../merkletree
 import ../../utils/json
 
-proc encode*(t: QuotaUsage): seq[byte] =
+proc encode*(t: StorageStats): seq[byte] =
   t.toJson().toBytes()
 
-proc decode*(T: type QuotaUsage, bytes: seq[byte]): ?!T =
+proc decode*(T: type StorageStats, bytes: seq[byte]): ?!T =
   T.fromJson(bytes)
 
 proc encode*(t: BlockMetadata): seq[byte] =
diff --git a/codex/stores/repostore/operations.nim b/codex/stores/repostore/operations.nim
index 125741e193..4926fb19fa 100644
--- a/codex/stores/repostore/operations.nim
+++ b/codex/stores/repostore/operations.nim
@@ -82,57 +82,97 @@ proc getLeafMetadata*(
 
   success(leafMd)
 
-proc updateTotalBlocksCount*(
-    self: RepoStore, plusCount: Natural = 0, minusCount: Natural = 0
-): Future[?!void] {.async.} =
-  await self.metaDs.modify(
-    CodexTotalBlocksKey,
-    proc(maybeCurrCount: ?Natural): Future[?Natural] {.async.} =
-      let count: Natural =
-        if currCount =? maybeCurrCount:
-          currCount + plusCount - minusCount
-        else:
-          plusCount - minusCount
-
-      self.totalBlocks = count
-      codex_repostore_blocks.set(count.int64)
-      count.some,
-  )
-
-proc updateQuotaUsage*(
+proc updateQuotaAndBlockCount*(
     self: RepoStore,
+    plusCount: Natural = 0,
+    minusCount: Natural = 0,
     plusUsed: NBytes = 0.NBytes,
     minusUsed: NBytes = 0.NBytes,
     plusReserved: NBytes = 0.NBytes,
     minusReserved: NBytes = 0.NBytes,
 ): Future[?!void] {.async.} =
   await self.metaDs.modify(
-    QuotaUsedKey,
-    proc(maybeCurrUsage: ?QuotaUsage): Future[?QuotaUsage] {.async.} =
-      var usage: QuotaUsage
-
-      if currUsage =? maybeCurrUsage:
-        usage = QuotaUsage(
-          used: currUsage.used + plusUsed - minusUsed,
-          reserved: currUsage.reserved + plusReserved - minusReserved,
+    CodexTotalBlocksKey,
+    proc(maybeCurrStats: ?StorageStats): Future[?StorageStats] {.async.} =
+      var stats: StorageStats
+      if currStats =? maybeCurrStats:
+        stats = StorageStats(
+          quotaUsed: currStats.quotaUsed + plusUsed - minusUsed,
+          quotaReserved: currStats.quotaReserved + plusReserved - minusReserved,
+          totalBlocks: currStats.totalBlocks + plusCount - minusCount,
         )
       else:
-        usage =
-          QuotaUsage(used: plusUsed - minusUsed, reserved: plusReserved - minusReserved)
+        stats = StorageStats(
+          quotaUsed: plusUsed - minusUsed,
+          quotaReserved: plusReserved - minusReserved,
+          totalBlocks: plusCount - minusCount,
+        )
 
-      if usage.used + usage.reserved > self.quotaMaxBytes:
+      if stats.quotaUsed + stats.quotaReserved > self.quotaMaxBytes:
         raise newException(
           QuotaNotEnoughError,
-          "Quota usage would exceed the limit. Used: " & $usage.used & ", reserved: " &
-            $usage.reserved & ", limit: " & $self.quotaMaxBytes,
+          "Quota usage would exceed the limit. Used: " & $stats.quotaUsed &
+            ", reserved: " & $stats.quotaReserved & ", limit: " & $self.quotaMaxBytes,
         )
       else:
-        self.quotaUsage = usage
-        codex_repostore_bytes_used.set(usage.used.int64)
-        codex_repostore_bytes_reserved.set(usage.reserved.int64)
-        return usage.some,
+        self.storageStats = stats
+        codex_repostore_bytes_used.set(stats.quotaUsed.int64)
+        codex_repostore_bytes_reserved.set(stats.quotaReserved.int64)
+        codex_repostore_blocks.set(stats.totalBlocks.int64)
+        return stats.some,
   )
 
+# proc updateTotalBlocksCount*(
+#     self: RepoStore, plusCount: Natural = 0, minusCount: Natural = 0
+# ): Future[?!void] {.async.} =
+#   await self.metaDs.modify(
+#     CodexTotalBlocksKey,
+#     proc(maybeCurrCount: ?Natural): Future[?Natural] {.async.} =
+#       let count: Natural =
+#         if currCount =? maybeCurrCount:
+#           currCount + plusCount - minusCount
+#         else:
+#           plusCount - minusCount
+
+#       self.totalBlocks = count
+#       codex_repostore_blocks.set(count.int64)
+#       count.some,
+#   )
+
+# proc updateQuotaUsage*(
+#     self: RepoStore,
+#     plusUsed: NBytes = 0.NBytes,
+#     minusUsed: NBytes = 0.NBytes,
+#     plusReserved: NBytes = 0.NBytes,
+#     minusReserved: NBytes = 0.NBytes,
+# ): Future[?!void] {.async.} =
+#   await self.metaDs.modify(
+#     QuotaUsedKey,
+#     proc(maybeCurrUsage: ?QuotaUsage): Future[?QuotaUsage] {.async.} =
+#       var usage: QuotaUsage
+
+#       if currUsage =? maybeCurrUsage:
+#         usage = QuotaUsage(
+#           used: currUsage.used + plusUsed - minusUsed,
+#           reserved: currUsage.reserved + plusReserved - minusReserved,
+#         )
+#       else:
+#         usage =
+#           QuotaUsage(used: plusUsed - minusUsed, reserved: plusReserved - minusReserved)
+
+#       if usage.used + usage.reserved > self.quotaMaxBytes:
+#         raise newException(
+#           QuotaNotEnoughError,
+#           "Quota usage would exceed the limit. Used: " & $usage.used & ", reserved: " &
+#             $usage.reserved & ", limit: " & $self.quotaMaxBytes,
+#         )
+#       else:
+#         self.quotaUsage = usage
+#         codex_repostore_bytes_used.set(usage.used.int64)
+#         codex_repostore_bytes_reserved.set(usage.reserved.int64)
+#         return usage.some,
+#   )
+
 proc updateBlockMetadata*(
     self: RepoStore,
     cid: Cid,
diff --git a/codex/stores/repostore/store.nim b/codex/stores/repostore/store.nim
index d7305107fa..4b943ec235 100644
--- a/codex/stores/repostore/store.nim
+++ b/codex/stores/repostore/store.nim
@@ -15,6 +15,8 @@ import pkg/libp2p/[cid, multicodec]
 import pkg/questionable
 import pkg/questionable/results
 
+import times
+
 import ./coders
 import ./types
 import ./operations
@@ -114,11 +116,59 @@ method ensureExpiry*(
 
   await self.ensureExpiry(leafMd.blkCid, expiry)
 
+method putCidAndProofBatch*(
+    self: RepoStore, treeCid: Cid, startIndex: int, entries: seq[(Cid, CodexProof)]
+): Future[?!void] {.async.} =
+  var
+    batch = newSeq[BatchEntry]()
+    results = newSeq[StoreResult](entries.len)
+    lock = self.locks.mgetOrPut(treeCid, newAsyncLock())
+    treeIndex = startIndex
+
+  try:
+    await lock.acquire()
+    for i, entry in entries:
+      without key =? createBlockCidAndProofMetadataKey(treeCid, treeIndex), err:
+        return failure(err)
+
+      # Check existence before adding to batch
+      without exists =? await self.metaDs.has(key), err:
+        return failure(err)
+      if exists:
+        results[i] = StoreResult(kind: AlreadyInStore)
+      else:
+        results[i] = StoreResult(kind: Stored)
+        var metadata = LeafMetadata(blkCid: entry[0], proof: entry[1])
+        batch.add((key: key, data: metadata.encode))
+      treeIndex += 1
+
+    try:
+      if err =? (await self.metaDs.ds.put(batch)).errorOption:
+        return failure(err)
+    except CatchableError as e:
+      return failure(e.msg)
+  finally:
+    lock.release()
+    if not lock.locked:
+      self.locks.del(treeCid)
+
+  # Update reference counts for blocks that were stored
+  for i, entry in entries:
+    if results[i].kind == Stored and entry[0].mcodec == BlockCodec:
+      if err =? (await self.updateBlockMetadata(entry[0], plusRefCount = 1)).errorOption:
+        return failure(err)
+      trace "Leaf metadata stored, block refCount incremented"
+    else:
+      trace "Leaf metadata already exists"
+
+  return success()
+
 method putCidAndProof*(
     self: RepoStore, treeCid: Cid, index: Natural, blkCid: Cid, proof: CodexProof
 ): Future[?!void] {.async.} =
   ## Put a block to the blockstore
   ##
+  # TODO: Add locking for treeCid
 
   logScope:
     treeCid = treeCid
@@ -126,17 +176,33 @@ method putCidAndProof*(
     blkCid = blkCid
 
   trace "Storing LeafMetadata"
+  var lock = self.locks.mgetOrPut(treeCid, newAsyncLock())
+  try:
+    await lock.acquire()
+    without key =? createBlockCidAndProofMetadataKey(treeCid, index), err:
+      return failure(err)
 
-  without res =? await self.putLeafMetadata(treeCid, index, blkCid, proof), err:
-    return failure(err)
+    without exists =? await self.metaDs.has(key), err:
+      return failure(err)
 
-  if blkCid.mcodec == BlockCodec:
-    if res == Stored:
-      if err =? (await self.updateBlockMetadata(blkCid, plusRefCount = 1)).errorOption:
+    if exists:
+      trace "Leaf metadata already exists"
+      return success()
+    else:
+      var metadata = LeafMetadata(blkCid: blkCid, proof: proof)
+      if err =? (await self.metaDs.put(key, metadata)).errorOption:
         return failure(err)
+
+      if blkCid.mcodec == BlockCodec:
+        if err =? (await self.updateBlockMetadata(blkCid, plusRefCount = 1)).errorOption:
+          echo "updateBlockMetadata failed"
+          return failure(err)
+
       trace "Leaf metadata stored, block refCount incremented"
-    else:
-      trace "Leaf metadata already exists"
+  finally:
+    lock.release()
+    if not lock.locked:
+      self.locks.del(treeCid)
 
   return success()
@@ -155,11 +221,10 @@ method getCid*(self: RepoStore, treeCid: Cid, index: Natural): Future[?!Cid] {.a
 
   success(leafMd.blkCid)
 
 method putBlock*(
-    self: RepoStore, blk: Block, ttl = Duration.none
+    self: RepoStore, blk: Block, ttl = timer.Duration.none
 ): Future[?!void] {.async.} =
   ## Put a block to the blockstore
   ##
-
   logScope:
     cid = blk.cid
@@ -170,13 +235,10 @@ method putBlock*(
 
   if res.kind == Stored:
     trace "Block Stored"
-    if err =? (await self.updateQuotaUsage(plusUsed = res.used)).errorOption:
-      # rollback changes
+    if err =? (await self.updateQuotaAndBlockCount(plusCount = 1, plusUsed = res.used)).errorOption:
       without delRes =? await self.tryDeleteBlock(blk.cid), err:
         return failure(err)
-      return failure(err)
-
-    if err =? (await self.updateTotalBlocksCount(plusCount = 1)).errorOption:
       return failure(err)
 
     if onBlock =? self.onBlockStored:
@@ -200,10 +262,15 @@ proc delBlockInternal(self: RepoStore, cid: Cid): Future[?!DeleteResultKind] {.a
 
   if res.kind == Deleted:
     trace "Block deleted"
-    if err =? (await self.updateTotalBlocksCount(minusCount = 1)).errorOption:
-      return failure(err)
+    # if err =? (await self.updateTotalBlocksCount(minusCount = 1)).errorOption:
+    #   return failure(err)
+
+    # if err =? (await self.updateQuotaUsage(minusUsed = res.released)).errorOption:
+    #   return failure(err)
 
-    if err =? (await self.updateQuotaUsage(minusUsed = res.released)).errorOption:
+    if err =? (
+      await self.updateQuotaAndBlockCount(minusCount = 1, minusUsed = res.released)
+    ).errorOption:
       return failure(err)
 
   success(res.kind)
@@ -228,6 +295,54 @@ method delBlock*(self: RepoStore, cid: Cid): Future[?!void] {.async.} =
     trace "Block already deleted"
 
   success()
 
+method delBlocks*(
+    self: RepoStore, treeCid: Cid, blocksCount: int
+): Future[?!void] {.async.} =
+  let runtimeQuota = initDuration(milliseconds = 100)
+  var
+    lastIdle = getTime()
+    batch = newSeq[Key]()
+    blckCids = newSeq[Cid]()
+    batchSize = 1000
+
+  for i in 0 ..< blocksCount:
+    if (getTime() - lastIdle) >= runtimeQuota:
+      await idleAsync()
+      lastIdle = getTime()
+
+    without key =? createBlockCidAndProofMetadataKey(treeCid, i), err:
+      return failure(err)
+
+    without leafMd =? await get[LeafMetadata](self.metaDs, key), err:
+      if err of DatastoreKeyNotFound:
+        continue
+      else:
+        return failure(err)
+
+    blckCids.add(leafMd.blkCid)
+    batch.add(key)
+
+    if batch.len >= batchSize or i == blocksCount - 1:
+      try:
+        if err =? (await self.metaDs.ds.delete(batch)).errorOption:
+          return failure(err)
+      except CatchableError as e:
+        return failure(e.msg)
+      batch = newSeq[Key]()
+
+  for i, cid in blckCids:
+    if (getTime() - lastIdle) >= runtimeQuota:
+      await idleAsync()
+      lastIdle = getTime()
+
+    if err =? (await self.updateBlockMetadata(cid, minusRefCount = 1)).errorOption:
+      return failure(err)
+
+    if err =? (await self.delBlockInternal(cid)).errorOption:
+      return failure(err)
+
+  success()
+
 method delBlock*(
     self: RepoStore, treeCid: Cid, index: Natural
 ): Future[?!void] {.async.} =
@@ -386,7 +501,7 @@ proc reserve*(self: RepoStore, bytes: NBytes): Future[?!void] {.async.} =
 
   trace "Reserving bytes", bytes
 
-  await self.updateQuotaUsage(plusReserved = bytes)
+  await self.updateQuotaAndBlockCount(plusReserved = bytes)
 
 proc release*(self: RepoStore, bytes: NBytes): Future[?!void] {.async.} =
   ## Release bytes
@@ -394,7 +509,7 @@ proc release*(self: RepoStore, bytes: NBytes): Future[?!void] {.async.} =
 
   trace "Releasing bytes", bytes
 
-  await self.updateQuotaUsage(minusReserved = bytes)
+  await self.updateQuotaAndBlockCount(minusReserved = bytes)
 
 proc start*(self: RepoStore): Future[void] {.async.} =
   ## Start repo
@@ -405,10 +520,7 @@ proc start*(self: RepoStore): Future[void] {.async.} =
     return
 
   trace "Starting rep"
-  if err =? (await self.updateTotalBlocksCount()).errorOption:
-    raise newException(CodexError, err.msg)
-
-  if err =? (await self.updateQuotaUsage()).errorOption:
+  if err =? (await self.updateQuotaAndBlockCount()).errorOption:
     raise newException(CodexError, err.msg)
 
   self.started = true
diff --git a/codex/stores/repostore/types.nim b/codex/stores/repostore/types.nim
index 42f528e947..a98c8f9381 100644
--- a/codex/stores/repostore/types.nim
+++ b/codex/stores/repostore/types.nim
@@ -33,14 +33,15 @@ type
     metaDs*: TypedDatastore
     clock*: Clock
    quotaMaxBytes*: NBytes
-    quotaUsage*: QuotaUsage
-    totalBlocks*: Natural
+    storageStats*: StorageStats
     blockTtl*: Duration
     started*: bool
+    locks*: TableRef[Cid, AsyncLock]
 
-  QuotaUsage* {.serialize.} = object
-    used*: NBytes
-    reserved*: NBytes
+  StorageStats* {.serialize.} = object
+    quotaUsed*: NBytes
+    quotaReserved*: NBytes
+    totalBlocks*: Natural
 
   BlockMetadata* {.serialize.} = object
     expiry*: SecondsSince1970
@@ -73,10 +74,10 @@ type
     used*: NBytes
 
 func quotaUsedBytes*(self: RepoStore): NBytes =
-  self.quotaUsage.used
+  self.storageStats.quotaUsed
 
 func quotaReservedBytes*(self: RepoStore): NBytes =
-  self.quotaUsage.reserved
+  self.storageStats.quotaReserved
 
 func totalUsed*(self: RepoStore): NBytes =
   (self.quotaUsedBytes + self.quotaReservedBytes)
@@ -106,4 +107,5 @@ func new*(
     quotaMaxBytes: quotaMaxBytes,
     blockTtl: blockTtl,
     onBlockStored: CidCallback.none,
+    locks: newTable[Cid, AsyncLock](),
   )
diff --git a/tests/codex/stores/repostore/testcoders.nim b/tests/codex/stores/repostore/testcoders.nim
index 9d341af076..0c363dabea 100644
--- a/tests/codex/stores/repostore/testcoders.nim
+++ b/tests/codex/stores/repostore/testcoders.nim
@@ -19,8 +19,10 @@ suite "Test coders":
     let ordinals = enumRangeInt64(E)
     E(ordinals[rand(ordinals.len - 1)])
 
-  proc rand(T: type QuotaUsage): T =
-    QuotaUsage(used: rand(NBytes), reserved: rand(NBytes))
+  proc rand(T: type StorageStats): T =
+    StorageStats(
+      quotaUsed: rand(NBytes), quotaReserved: rand(NBytes), totalBlocks: rand(Natural)
+    )
 
   proc rand(T: type BlockMetadata): T =
     BlockMetadata(
@@ -38,10 +40,10 @@ suite "Test coders":
       check:
         success(val) == Natural.decode(encode(val))
 
-  test "QuotaUsage encode/decode":
-    for val in newSeqWith[QuotaUsage](100, rand(QuotaUsage)):
+  test "StorageStats encode/decode":
+    for val in newSeqWith[StorageStats](100, rand(StorageStats)):
       check:
-        success(val) == QuotaUsage.decode(encode(val))
+        success(val) == StorageStats.decode(encode(val))
 
   test "BlockMetadata encode/decode":
     for val in newSeqWith[BlockMetadata](100, rand(BlockMetadata)):
diff --git a/vendor/nim-datastore b/vendor/nim-datastore
index d67860add6..0beff39fbe 160000
--- a/vendor/nim-datastore
+++ b/vendor/nim-datastore
@@ -1 +1 @@
-Subproject commit d67860add63fd23cdacde1d3da8f4739c2660c2d
+Subproject commit 0beff39fbe702e58640244600f176bb84f1429a9