diff --git a/codex/erasure/backend.nim b/codex/erasure/backend.nim index 32009829b2..2bd3cc4b2a 100644 --- a/codex/erasure/backend.nim +++ b/codex/erasure/backend.nim @@ -29,18 +29,14 @@ method release*(self: ErasureBackend) {.base, gcsafe.} = raiseAssert("not implemented!") method encode*( - self: EncoderBackend, - buffers, parity: ptr UncheckedArray[ptr UncheckedArray[byte]], - dataLen, parityLen: int, + self: EncoderBackend, data, parity: var openArray[seq[byte]] ): Result[void, cstring] {.base, gcsafe.} = ## encode buffers using a backend ## raiseAssert("not implemented!") method decode*( - self: DecoderBackend, - buffers, parity, recovered: ptr UncheckedArray[ptr UncheckedArray[byte]], - dataLen, parityLen, recoveredLen: int, + self: DecoderBackend, data, parity, recovered: var openArray[seq[byte]] ): Result[void, cstring] {.base, gcsafe.} = ## decode buffers using a backend ## diff --git a/codex/erasure/backends/leopard.nim b/codex/erasure/backends/leopard.nim index a0016570dc..5407959ac7 100644 --- a/codex/erasure/backends/leopard.nim +++ b/codex/erasure/backends/leopard.nim @@ -22,13 +22,11 @@ type decoder*: Option[LeoDecoder] method encode*( - self: LeoEncoderBackend, - data, parity: ptr UncheckedArray[ptr UncheckedArray[byte]], - dataLen, parityLen: int, + self: LeoEncoderBackend, data, parity: var openArray[seq[byte]] ): Result[void, cstring] = ## Encode data using Leopard backend - if parityLen == 0: + if parity.len == 0: return ok() var encoder = @@ -38,12 +36,10 @@ method encode*( else: self.encoder.get() - encoder.encode(data, parity, dataLen, parityLen) + encoder.encode(data, parity) method decode*( - self: LeoDecoderBackend, - data, parity, recovered: ptr UncheckedArray[ptr UncheckedArray[byte]], - dataLen, parityLen, recoveredLen: int, + self: LeoDecoderBackend, data, parity, recovered: var openArray[seq[byte]] ): Result[void, cstring] = ## Decode data using given Leopard backend @@ -54,7 +50,7 @@ method decode*( else: self.decoder.get() - decoder.decode(data, parity, recovered, dataLen, parityLen, recoveredLen) + decoder.decode(data, parity, recovered) method release*(self: LeoEncoderBackend) = if self.encoder.isSome: diff --git a/codex/erasure/erasure.nim b/codex/erasure/erasure.nim index 95516500f9..2f5d342a45 100644 --- a/codex/erasure/erasure.nim +++ b/codex/erasure/erasure.nim @@ -32,6 +32,7 @@ import ../utils/asynciter import ../indexingstrategy import ../errors import ../utils/arrayutils +import ../utils/uniqueptr import pkg/stew/byteutils @@ -71,12 +72,14 @@ type DecoderProvider* = proc(size, blocks, parity: int): DecoderBackend {.raises: [Defect], noSideEffect.} - Erasure* = ref object + Erasure* = object taskPool: Taskpool encoderProvider*: EncoderProvider decoderProvider*: DecoderProvider store*: BlockStore + ErasureRef* = ref Erasure + EncodingParams = object ecK: Natural ecM: Natural @@ -95,19 +98,18 @@ type EncodeTask = object success: Atomic[bool] erasure: ptr Erasure - blocks: ptr UncheckedArray[ptr UncheckedArray[byte]] - parity: ptr UncheckedArray[ptr UncheckedArray[byte]] - blockSize, blocksLen, parityLen: int + blocks: seq[seq[byte]] + parity: UniquePtr[seq[seq[byte]]] + blockSize, parityLen: int signal: ThreadSignalPtr DecodeTask = object success: Atomic[bool] erasure: ptr Erasure - blocks: ptr UncheckedArray[ptr UncheckedArray[byte]] - parity: ptr UncheckedArray[ptr UncheckedArray[byte]] - recovered: ptr UncheckedArray[ptr UncheckedArray[byte]] - blockSize, blocksLen: int - parityLen, recoveredLen: int + blocks: seq[seq[byte]] + parity: seq[seq[byte]] + recovered: UniquePtr[seq[seq[byte]]] + blockSize, recoveredLen: int signal: ThreadSignalPtr func indexToPos(steps, idx, step: int): int {.inline.} = @@ -121,7 +123,7 @@ func indexToPos(steps, idx, step: int): int {.inline.} = (idx - step) div steps proc getPendingBlocks( - self: Erasure, manifest: Manifest, indices: seq[int] + self: ErasureRef, manifest: Manifest, indices: seq[int] ): AsyncIter[(?!bt.Block, int)] = ## Get pending blocks iterator ## @@ -157,7 +159,7 @@ proc getPendingBlocks( AsyncIter[(?!bt.Block, int)].new(genNext, isFinished) proc prepareEncodingData( - self: Erasure, + self: ErasureRef, manifest: Manifest, params: EncodingParams, step: Natural, @@ -201,7 +203,7 @@ proc prepareEncodingData( success(resolved.Natural) proc prepareDecodingData( - self: Erasure, + self: ErasureRef, encoded: Manifest, step: Natural, data: ref seq[seq[byte]], @@ -297,48 +299,43 @@ proc init*( proc leopardEncodeTask(tp: Taskpool, task: ptr EncodeTask) {.gcsafe.} = # Task suitable for running in taskpools - look, no GC! - let encoder = - task[].erasure.encoderProvider(task[].blockSize, task[].blocksLen, task[].parityLen) + let encoder = task[].erasure.encoderProvider( + task[].blockSize, task[].blocks.len, task[].parityLen + ) defer: encoder.release() discard task[].signal.fireSync() - if ( - let res = - encoder.encode(task[].blocks, task[].parity, task[].blocksLen, task[].parityLen) - res.isErr - ): + var parity = newSeqWith(task[].parityLen, newSeq[byte](task[].blockSize)) + if (let res = encoder.encode(task[].blocks, parity); res.isErr): warn "Error from leopard encoder backend!", error = $res.error task[].success.store(false) else: + var paritySeq = newSeq[seq[byte]](task[].parityLen) + for i in 0 ..< task[].parityLen: + var innerSeq = isolate(parity[i]) + paritySeq[i] = extract(innerSeq) + + task[].parity = newUniquePtr(paritySeq) task[].success.store(true) proc asyncEncode*( - self: Erasure, - blockSize, blocksLen, parityLen: int, - blocks: ref seq[seq[byte]], - parity: ptr UncheckedArray[ptr UncheckedArray[byte]], -): Future[?!void] {.async: (raises: [CancelledError]).} = - without threadPtr =? ThreadSignalPtr.new(): - return failure("Unable to create thread signal") - - defer: - threadPtr.close().expect("closing once works") - - var data = makeUncheckedArray(blocks) + self: ErasureRef, blockSize, parityLen: int, blocks: seq[seq[byte]] +): Future[?!seq[seq[byte]]] {.async: (raises: [CancelledError]).} = + var threadPtr = ?ThreadSignalPtr.new().mapFailure() defer: - dealloc(data) + if threadPtr != nil: + ?threadPtr.close().mapFailure() + threadPtr = nil ## Create an ecode task with block data var task = EncodeTask( - erasure: addr self, + erasure: cast[ptr Erasure](self), blockSize: blockSize, - blocksLen: blocksLen, parityLen: parityLen, - blocks: data, - parity: parity, + blocks: blocks, signal: threadPtr, ) @@ -347,21 +344,20 @@ proc asyncEncode*( self.taskPool.spawn leopardEncodeTask(self.taskPool, addr task) let threadFut = threadPtr.wait() - if joinErr =? catch(await threadFut.join()).errorOption: - if err =? catch(await noCancel threadFut).errorOption: - return failure(err) - if joinErr of CancelledError: - raise (ref CancelledError) joinErr - else: - return failure(joinErr) + if err =? catch(await threadFut.join()).errorOption: + ?catch(await noCancel threadFut) + if err of CancelledError: + raise (ref CancelledError) err + + return failure(err) if not task.success.load(): return failure("Leopard encoding task failed") - success() + success extractValue(task.parity) proc encodeData( - self: Erasure, manifest: Manifest, params: EncodingParams + self: ErasureRef, manifest: Manifest, params: EncodingParams ): Future[?!Manifest] {.async.} = ## Encode blocks pointed to by the protected manifest ## @@ -383,17 +379,9 @@ proc encodeData( try: for step in 0 ..< params.steps: # TODO: Don't allocate a new seq every time, allocate once and zero out - var - data = seq[seq[byte]].new() # number of blocks to encode - parity = createDoubleArray(params.ecM, manifest.blockSize.int) - defer: - freeDoubleArray(parity, params.ecM) + var data = seq[seq[byte]].new() # number of blocks to encode data[].setLen(params.ecK) - # TODO: this is a tight blocking loop so we sleep here to allow - # other events to be processed, this should be addressed - # by threading - await sleepAsync(10.millis) without resolved =? (await self.prepareEncodingData(manifest, params, step, data, cids, emptyBlock)), @@ -403,21 +391,15 @@ proc encodeData( trace "Erasure coding data", data = data[].len + var parity: seq[seq[byte]] try: - if err =? ( - await self.asyncEncode( - manifest.blockSize.int, params.ecK, params.ecM, data, parity - ) - ).errorOption: - return failure(err) + parity = ?(await self.asyncEncode(manifest.blockSize.int, params.ecM, data[])) except CancelledError as exc: raise exc var idx = params.rounded + step for j in 0 ..< params.ecM: - var innerPtr: ptr UncheckedArray[byte] = parity[][j] - without blk =? bt.Block.new(innerPtr.toOpenArray(0, manifest.blockSize.int - 1)), - error: + without blk =? bt.Block.new(parity[j]), error: trace "Unable to create parity block", err = error.msg return failure(error) @@ -456,7 +438,7 @@ proc encodeData( return failure(exc) proc encode*( - self: Erasure, + self: ErasureRef, manifest: Manifest, blocks: Natural, parity: Natural, @@ -479,58 +461,43 @@ proc encode*( proc leopardDecodeTask(tp: Taskpool, task: ptr DecodeTask) {.gcsafe.} = # Task suitable for running in taskpools - look, no GC! - let decoder = - task[].erasure.decoderProvider(task[].blockSize, task[].blocksLen, task[].parityLen) + let decoder = task[].erasure.decoderProvider( + task[].blockSize, task[].blocks.len, task[].parity.len + ) defer: decoder.release() discard task[].signal.fireSync() - if ( - let res = decoder.decode( - task[].blocks, - task[].parity, - task[].recovered, - task[].blocksLen, - task[].parityLen, - task[].recoveredLen, - ) - res.isErr - ): + var recovered = newSeqWith(task[].blocks.len, newSeq[byte](task[].blockSize)) + + if (let res = decoder.decode(task[].blocks, task[].parity, recovered); res.isErr): warn "Error from leopard decoder backend!", error = $res.error task[].success.store(false) else: + var recoveredSeq = newSeq[seq[byte]](task[].blocks.len) + for i in 0 ..< task[].blocks.len: + var innerSeq = isolate(recovered[i]) + recoveredSeq[i] = extract(innerSeq) + + task[].recovered = newUniquePtr(recoveredSeq) task[].success.store(true) proc asyncDecode*( - self: Erasure, - blockSize, blocksLen, parityLen: int, - blocks, parity: ref seq[seq[byte]], - recovered: ptr UncheckedArray[ptr UncheckedArray[byte]], -): Future[?!void] {.async: (raises: [CancelledError]).} = - without threadPtr =? ThreadSignalPtr.new(): - return failure("Unable to create thread signal") + self: ErasureRef, blockSize: int, blocks, parity: seq[seq[byte]] +): Future[?!seq[seq[byte]]] {.async: (raises: [CancelledError]).} = + var threadPtr = ?ThreadSignalPtr.new().mapFailure() defer: - threadPtr.close().expect("closing once works") - - var - blockData = makeUncheckedArray(blocks) - parityData = makeUncheckedArray(parity) - - defer: - dealloc(blockData) - dealloc(parityData) + if threadPtr != nil: + ?threadPtr.close().mapFailure() + threadPtr = nil ## Create an decode task with block data var task = DecodeTask( - erasure: addr self, + erasure: cast[ptr Erasure](self), blockSize: blockSize, - blocksLen: blocksLen, - parityLen: parityLen, - recoveredLen: blocksLen, - blocks: blockData, - parity: parityData, - recovered: recovered, + blocks: blocks, + parity: parity, signal: threadPtr, ) @@ -539,21 +506,20 @@ proc asyncDecode*( self.taskPool.spawn leopardDecodeTask(self.taskPool, addr task) let threadFut = threadPtr.wait() - if joinErr =? catch(await threadFut.join()).errorOption: - if err =? catch(await noCancel threadFut).errorOption: - return failure(err) - if joinErr of CancelledError: - raise (ref CancelledError) joinErr - else: - return failure(joinErr) + if err =? catch(await threadFut.join()).errorOption: + ?catch(await noCancel threadFut) + if err of CancelledError: + raise (ref CancelledError) err + + return failure(err) if not task.success.load(): return failure("Leopard decoding task failed") - success() + success extractValue(task.recovered) proc decodeInternal( - self: Erasure, encoded: Manifest + self: ErasureRef, encoded: Manifest ): Future[?!(ref seq[Cid], seq[Natural])] {.async.} = logScope: steps = encoded.steps @@ -569,17 +535,9 @@ proc decodeInternal( cids[].setLen(encoded.blocksCount) try: for step in 0 ..< encoded.steps: - # TODO: this is a tight blocking loop so we sleep here to allow - # other events to be processed, this should be addressed - # by threading - await sleepAsync(10.millis) - var data = seq[seq[byte]].new() parityData = seq[seq[byte]].new() - recovered = createDoubleArray(encoded.ecK, encoded.blockSize.int) - defer: - freeDoubleArray(recovered, encoded.ecK) data[].setLen(encoded.ecK) # set len to K parityData[].setLen(encoded.ecM) # set len to M @@ -595,26 +553,18 @@ proc decodeInternal( if dataPieces >= encoded.ecK: trace "Retrieved all the required data blocks" continue - + var recovered: seq[seq[byte]] trace "Erasure decoding data" try: - if err =? ( - await self.asyncDecode( - encoded.blockSize.int, encoded.ecK, encoded.ecM, data, parityData, recovered - ) - ).errorOption: - return failure(err) + recovered = + ?(await self.asyncDecode(encoded.blockSize.int, data[], parityData[])) except CancelledError as exc: raise exc for i in 0 ..< encoded.ecK: let idx = i * encoded.steps + step if data[i].len <= 0 and not cids[idx].isEmpty: - var innerPtr: ptr UncheckedArray[byte] = recovered[][i] - - without blk =? bt.Block.new( - innerPtr.toOpenArray(0, encoded.blockSize.int - 1) - ), error: + without blk =? bt.Block.new(recovered[i]), error: trace "Unable to create block!", exc = error.msg return failure(error) @@ -638,7 +588,7 @@ proc decodeInternal( return (cids, recoveredIndices).success -proc decode*(self: Erasure, encoded: Manifest): Future[?!Manifest] {.async.} = +proc decode*(self: ErasureRef, encoded: Manifest): Future[?!Manifest] {.async.} = ## Decode a protected manifest into it's original ## manifest ## @@ -670,7 +620,7 @@ proc decode*(self: Erasure, encoded: Manifest): Future[?!Manifest] {.async.} = return decoded.success -proc repair*(self: Erasure, encoded: Manifest): Future[?!void] {.async.} = +proc repair*(self: ErasureRef, encoded: Manifest): Future[?!void] {.async.} = ## Repair a protected manifest by reconstructing the full dataset ## ## `encoded` - the encoded (protected) manifest to @@ -715,15 +665,15 @@ proc stop*(self: Erasure) {.async.} = return proc new*( - T: type Erasure, + _: type ErasureRef, store: BlockStore, encoderProvider: EncoderProvider, decoderProvider: DecoderProvider, taskPool: Taskpool, -): Erasure = - ## Create a new Erasure instance for encoding and decoding manifests +): ErasureRef = + ## Create a new ErasureRef instance for encoding and decoding manifests ## - Erasure( + ErasureRef( store: store, encoderProvider: encoderProvider, decoderProvider: decoderProvider, diff --git a/codex/node.nim b/codex/node.nim index e010b08540..9d00275a2a 100644 --- a/codex/node.nim +++ b/codex/node.nim @@ -293,7 +293,7 @@ proc streamEntireDataset( proc erasureJob(): Future[void] {.async: (raises: []).} = try: # Spawn an erasure decoding job - let erasure = Erasure.new( + let erasure = ErasureRef.new( self.networkStore, leoEncoderProvider, leoDecoderProvider, self.taskpool ) without _ =? (await erasure.decode(manifest)), error: @@ -532,7 +532,7 @@ proc setupRequest( return failure error # Erasure code the dataset according to provided parameters - let erasure = Erasure.new( + let erasure = ErasureRef.new( self.networkStore.localStore, leoEncoderProvider, leoDecoderProvider, self.taskpool ) @@ -678,7 +678,7 @@ proc onStore( if isRepairing: trace "start repairing slot", slotIdx try: - let erasure = Erasure.new( + let erasure = ErasureRef.new( self.networkStore, leoEncoderProvider, leoDecoderProvider, self.taskpool ) if err =? (await erasure.repair(manifest)).errorOption: diff --git a/codex/utils/uniqueptr.nim b/codex/utils/uniqueptr.nim new file mode 100644 index 0000000000..43eeeea6ea --- /dev/null +++ b/codex/utils/uniqueptr.nim @@ -0,0 +1,61 @@ +import std/isolation +type UniquePtr*[T] = object + ## A unique pointer to a seq[seq[T]] in shared memory + ## Can only be moved, not copied + data: ptr T + +proc newUniquePtr*[T](data: sink Isolated[T]): UniquePtr[T] = + ## Creates a new unique sequence in shared memory + ## The memory is automatically freed when the object is destroyed + result.data = cast[ptr T](allocShared0(sizeof(T))) + + result.data[] = extract(data) + +template newUniquePtr*[T](data: T): UniquePtr[T] = + newUniquePtr(isolate(data)) + +proc `=destroy`*[T](p: var UniquePtr[T]) = + ## Destructor for UniquePtr + if p.data != nil: + deallocShared(p.data) + p.data = nil + +proc `=copy`*[T]( + dest: var UniquePtr[T], src: UniquePtr[T] +) {.error: "UniquePtr cannot be copied, only moved".} + +proc `=sink`*[T](dest: var UniquePtr[T], src: UniquePtr[T]) = + if dest.data != nil: + `=destroy`(dest) + dest.data = src.data + # We need to nil out the source data to prevent double-free + # This is handled by Nim's destructive move semantics + +proc `[]`*[T](p: UniquePtr[T]): lent T = + ## Access the data (read-only) + if p.data == nil: + raise newException(NilAccessDefect, "accessing nil UniquePtr") + p.data[] + +# proc `[]`*[T](p: var UniquePtr[T]): var T = +# ## Access the data (mutable) +# if p.data == nil: +# raise newException(NilAccessDefect, "accessing nil UniquePtr") +# p.data[] + +proc isNil*[T](p: UniquePtr[T]): bool = + ## Check if the UniquePtr is nil + p.data == nil + +proc extractValue*[T](p: var UniquePtr[T]): T = + ## Extract the value from the UniquePtr and release the memory + if p.data == nil: + raise newException(NilAccessDefect, "extracting from nil UniquePtr") + + # Move the value out + var isolated = isolate(p.data[]) + result = extract(isolated) + + # Free the shared memory + deallocShared(p.data) + p.data = nil diff --git a/tests/codex/node/testcontracts.nim b/tests/codex/node/testcontracts.nim index e8d9c743e4..384e9c0294 100644 --- a/tests/codex/node/testcontracts.nim +++ b/tests/codex/node/testcontracts.nim @@ -75,7 +75,8 @@ asyncchecksuite "Test Node - Host contracts": let manifestBlock = bt.Block.new(manifest.encode().tryGet(), codec = ManifestCodec).tryGet() - erasure = Erasure.new(store, leoEncoderProvider, leoDecoderProvider, Taskpool.new) + erasure = + ErasureRef.new(store, leoEncoderProvider, leoDecoderProvider, Taskpool.new) manifestCid = manifestBlock.cid diff --git a/tests/codex/node/testnode.nim b/tests/codex/node/testnode.nim index 78298ad758..d01e1ff692 100644 --- a/tests/codex/node/testnode.nim +++ b/tests/codex/node/testnode.nim @@ -175,7 +175,7 @@ asyncchecksuite "Test Node - Basic": test "Setup purchase request": let erasure = - Erasure.new(store, leoEncoderProvider, leoDecoderProvider, Taskpool.new()) + ErasureRef.new(store, leoEncoderProvider, leoDecoderProvider, Taskpool.new()) manifest = await storeDataGetManifest(localStore, chunker) manifestBlock = bt.Block.new(manifest.encode().tryGet(), codec = ManifestCodec).tryGet() diff --git a/tests/codex/node/testslotrepair.nim b/tests/codex/node/testslotrepair.nim index d96078d292..6891f27d0d 100644 --- a/tests/codex/node/testslotrepair.nim +++ b/tests/codex/node/testslotrepair.nim @@ -95,7 +95,7 @@ asyncchecksuite "Test Node - Slot Repair": manifestBlock = bt.Block.new(manifest.encode().tryGet(), codec = ManifestCodec).tryGet() erasure = - Erasure.new(store, leoEncoderProvider, leoDecoderProvider, cluster.taskpool) + ErasureRef.new(store, leoEncoderProvider, leoDecoderProvider, cluster.taskpool) (await localStore.putBlock(manifestBlock)).tryGet() @@ -174,7 +174,7 @@ asyncchecksuite "Test Node - Slot Repair": manifestBlock = bt.Block.new(manifest.encode().tryGet(), codec = ManifestCodec).tryGet() erasure = - Erasure.new(store, leoEncoderProvider, leoDecoderProvider, cluster.taskpool) + ErasureRef.new(store, leoEncoderProvider, leoDecoderProvider, cluster.taskpool) (await localStore.putBlock(manifestBlock)).tryGet() diff --git a/tests/codex/testerasure.nim b/tests/codex/testerasure.nim index 5046bac29f..964d2f01e9 100644 --- a/tests/codex/testerasure.nim +++ b/tests/codex/testerasure.nim @@ -27,7 +27,7 @@ suite "Erasure encode/decode": var chunker: Chunker var manifest: Manifest var store: BlockStore - var erasure: Erasure + var erasure: ErasureRef let repoTmp = TempLevelDb.new() let metaTmp = TempLevelDb.new() var taskpool: Taskpool @@ -40,7 +40,7 @@ suite "Erasure encode/decode": chunker = RandomChunker.new(rng, size = dataSetSize, chunkSize = BlockSize) store = RepoStore.new(repoDs, metaDs) taskpool = Taskpool.new() - erasure = Erasure.new(store, leoEncoderProvider, leoDecoderProvider, taskpool) + erasure = ErasureRef.new(store, leoEncoderProvider, leoDecoderProvider, taskpool) manifest = await storeDataGetManifest(store, chunker) teardown: @@ -253,7 +253,7 @@ suite "Erasure encode/decode": for i in 0 ..< encodeResults.len: decodeTasks.add(erasure.decode(encodeResults[i].read().tryGet())) # wait for all decoding tasks to finish - let decodeResults = await allFinished(decodeTasks) # TODO: use allFutures + let decodeResults = await allFinished(decodeTasks) # TODO: use allFutures for j in 0 ..< decodeTasks.len: let @@ -302,73 +302,3 @@ suite "Erasure encode/decode": decoded.treeCid == manifest.treeCid decoded.treeCid == encoded.originalTreeCid decoded.blocksCount == encoded.originalBlocksCount - - test "Should complete encode/decode task when cancelled": - let - blocksLen = 10000 - parityLen = 10 - data = seq[seq[byte]].new() - chunker = RandomChunker.new( - rng, size = (blocksLen * BlockSize.int), chunkSize = BlockSize - ) - - data[].setLen(blocksLen) - - for i in 0 ..< blocksLen: - let chunk = await chunker.getBytes() - shallowCopy(data[i], @(chunk)) - - let - parity = createDoubleArray(parityLen, BlockSize.int) - paritySeq = seq[seq[byte]].new() - recovered = createDoubleArray(blocksLen, BlockSize.int) - cancelledTaskParity = createDoubleArray(parityLen, BlockSize.int) - cancelledTaskRecovered = createDoubleArray(blocksLen, BlockSize.int) - - paritySeq[].setLen(parityLen) - defer: - freeDoubleArray(parity, parityLen) - freeDoubleArray(cancelledTaskParity, parityLen) - freeDoubleArray(recovered, blocksLen) - freeDoubleArray(cancelledTaskRecovered, blocksLen) - - for i in 0 ..< parityLen: - paritySeq[i] = cast[seq[byte]](parity[i]) - - # call asyncEncode to get the parity - let encFut = - await erasure.asyncEncode(BlockSize.int, blocksLen, parityLen, data, parity) - check encFut.isOk - - let decFut = await erasure.asyncDecode( - BlockSize.int, blocksLen, parityLen, data, paritySeq, recovered - ) - check decFut.isOk - - # call asyncEncode and cancel the task - let encodeFut = erasure.asyncEncode( - BlockSize.int, blocksLen, parityLen, data, cancelledTaskParity - ) - encodeFut.cancel() - - try: - discard await encodeFut - except CatchableError as exc: - check exc of CancelledError - finally: - for i in 0 ..< parityLen: - check equalMem(parity[i], cancelledTaskParity[i], BlockSize.int) - - # call asyncDecode and cancel the task - let decodeFut = erasure.asyncDecode( - BlockSize.int, blocksLen, parityLen, data, paritySeq, cancelledTaskRecovered - ) - decodeFut.cancel() - - try: - discard await decodeFut - except CatchableError as exc: - check exc of CancelledError - finally: - for i in 0 ..< blocksLen: - check equalMem(recovered[i], cancelledTaskRecovered[i], BlockSize.int)