From ba183473b89eec533d4f700e2b3f7ee9551b2549 Mon Sep 17 00:00:00 2001 From: Benjamin Arntzen Date: Tue, 6 May 2025 09:56:13 +0100 Subject: [PATCH 1/5] Experimental range support --- codex/rest/api.nim | 153 +++++++++++++++++++++++++++++++++++++++------ 1 file changed, 133 insertions(+), 20 deletions(-) diff --git a/codex/rest/api.nim b/codex/rest/api.nim index 0d9e5d8020..33bd73986a 100644 --- a/codex/rest/api.nim +++ b/codex/rest/api.nim @@ -15,6 +15,7 @@ push: import std/sequtils import std/mimetypes import std/os +import std/strformat import pkg/questionable import pkg/questionable/results @@ -39,6 +40,7 @@ import ../manifest import ../streams/asyncstreamwrapper import ../stores import ../utils/options +import ../streams/seekablestream import ./coders import ./json @@ -46,6 +48,45 @@ import ./json logScope: topics = "codex restapi" +proc parseRangeHeader(rangeHeader: string): Result[(int, Option[int]), string] = + ## Parses a "Range: bytes=start-end" or "Range: bytes=start-" header. + ## Returns Ok((start, end)) or Err(message). + ## 'end' is inclusive. If '-' is used for end, returns none. + ## Very basic implementation, only supports single ranges starting with "bytes=". 
+ if not rangeHeader.startsWith("bytes="): + return err("Invalid Range header format: Does not start with 'bytes='") + + let parts = rangeHeader[6..^1].split('-') + if parts.len != 2: + return err("Invalid Range header format: Expected 'start-end' or 'start-'") + + let startStr = parts[0].strip() + let endStr = parts[1].strip() + + var startPos: int + try: + startPos = parseInt(startStr) + if startPos < 0: + return err("Invalid Range header format: Start position cannot be negative") + except ValueError: + return err("Invalid Range header format: Invalid start position number") + + if endStr == "": + # Format "bytes=start-" + return ok((startPos, none(int))) + else: + # Format "bytes=start-end" + var endPos: int + try: + endPos = parseInt(endStr) + except ValueError: + return err("Invalid Range header format: Invalid end position number") + + if endPos < startPos: + return err("Invalid Range header format: End position cannot be less than start position") + + return ok((startPos, some(endPos))) + declareCounter(codex_api_uploads, "codex API uploads") declareCounter(codex_api_downloads, "codex API downloads") @@ -71,17 +112,24 @@ proc isPending(resp: HttpResponseRef): bool = ## sendBody(resp: HttpResponseRef, ...) twice, which is illegal. return resp.getResponseState() == HttpResponseState.Empty +proc getDefaultRangeError(): Result[(int, Option[int]), string] = + ## Returns a default error Result for range parsing. + return err("No range requested") + proc retrieveCid( - node: CodexNodeRef, cid: Cid, local: bool = true, resp: HttpResponseRef + node: CodexNodeRef, cid: Cid, local: bool = true, resp: HttpResponseRef, range: Result[(int, Option[int]), string] = getDefaultRangeError() ): Future[void] {.async: (raises: [CancelledError, HttpWriteError]).} = - ## Download a file from the node in a streaming - ## manner + ## Download a file from the node in a streaming manner. + ## Supports HTTP Range requests (e.g., "Range: bytes=100-200" or "Range: bytes=100-"). 
+ ## Invalid range headers are ignored, resulting in a full download. ## var stream: LPStream - - var bytes = 0 + var sentBytes = 0 try: + # Always indicate acceptance of range requests + resp.setHeader("Accept-Ranges", "bytes") + without stream =? (await node.retrieve(cid, local)), error: if error of BlockNotFoundError: resp.status = Http404 @@ -117,24 +165,75 @@ proc retrieveCid( # For erasure-coded datasets, we need to return the _original_ length; i.e., # the length of the non-erasure-coded dataset, as that's what we will be # returning to the client. - let contentLength = - if manifest.protected: manifest.originalDatasetSize else: manifest.datasetSize - resp.setHeader("Content-Length", $(contentLength.int)) + let totalSize = + (if manifest.protected: manifest.originalDatasetSize else: manifest.datasetSize).int + + var rangeStart = 0 + var rangeEnd = totalSize - 1 # Inclusive + var isRangeRequest = false + + if range.isOk: + let (startReq, endReqOpt) = range.get() + # Validate the requested range + if startReq < totalSize: + isRangeRequest = true + rangeStart = startReq + if endReq =? 
endReqOpt: + # bytes=start-end (inclusive end) + rangeEnd = min(endReq, totalSize - 1) + # else: bytes=start- (rangeEnd remains totalSize - 1) + + # Ensure end >= start after validation/clamping + if rangeEnd < rangeStart: + # Requested range is impossible (e.g., start=100, end=50, totalSize=1000) + # or fully outside the content (e.g., start=1000, totalSize=500) + # Respond with 416 Range Not Satisfiable + resp.status = Http416 + resp.setHeader("Content-Range", $"bytes */{totalSize}") + await resp.sendBody("Requested range not satisfiable") + return + + let contentLength = rangeEnd - rangeStart + 1 + + if isRangeRequest: + resp.status = Http206 + resp.setHeader("Content-Range", $"bytes {rangeStart}-{rangeEnd}/{totalSize}") + resp.setHeader("Content-Length", $contentLength) + # Set the starting position in the seekable stream + if stream of SeekableStream: + SeekableStream(stream).setPos(rangeStart) + else: + # This should not happen if node.retrieve returns StoreStream or similar + error "Stream returned by node.retrieve is not seekable, cannot fulfill range request", streamType = $type(stream) + resp.status = Http500 + await resp.sendBody("Internal error: Cannot seek in content stream") + return + else: + # Full request + resp.setHeader("Content-Length", $contentLength) # Here contentLength == totalSize await resp.prepare(HttpResponseStreamType.Plain) - while not stream.atEof: + var bytesToSend = contentLength + while bytesToSend > 0 and not stream.atEof: var buff = newSeqUninitialized[byte](DefaultBlockSize.int) - len = await stream.readOnce(addr buff[0], buff.len) + readLen = await stream.readOnce(addr buff[0], min(buff.len, bytesToSend)) - buff.setLen(len) + buff.setLen(readLen) if buff.len <= 0: break - bytes += buff.len + sentBytes += buff.len + bytesToSend -= buff.len await resp.send(addr buff[0], buff.len) + + # Check if we sent less than expected (e.g., stream ended early) + if bytesToSend > 0 and not stream.atEof: + warn "Stream ended prematurely while 
sending range request", cid = cid, expected = contentLength, sent = sentBytes + # The connection will likely be closed by the client detecting the size mismatch + await resp.finish() codex_api_downloads.inc() except CancelledError as exc: @@ -145,7 +244,7 @@ proc retrieveCid( if resp.isPending(): await resp.sendBody(exc.msg) finally: - info "Sent bytes", cid = cid, bytes + info "Sent bytes", cid = cid, bytes = sentBytes, local = local, rangeRequested = range.isOk if not stream.isNil: await stream.close() @@ -282,7 +381,16 @@ proc initDataApi(node: CodexNodeRef, repoStore: RepoStore, router: var RestRoute resp.setCorsHeaders("GET", corsOrigin) resp.setHeader("Access-Control-Headers", "X-Requested-With") - await node.retrieveCid(cid.get(), local = true, resp = resp) + # Parse Range header if present + var requestedRange: Result[(int, Option[int]), string] = getDefaultRangeError() + let rangeHeader = request.headers.getString("Range", "") + if rangeHeader != "": + requestedRange = parseRangeHeader(rangeHeader) + if requestedRange.isErr: + warn "Invalid Range header received", header = rangeHeader, error = requestedRange.error + requestedRange = getDefaultRangeError() # Reset to indicate no valid range + + await retrieveCid(node, cid.get(), local = true, resp = resp, range = requestedRange) router.api(MethodDelete, "/api/codex/v1/data/{cid}") do( cid: Cid, resp: HttpResponseRef @@ -329,12 +437,9 @@ proc initDataApi(node: CodexNodeRef, repoStore: RepoStore, router: var RestRoute router.api(MethodGet, "/api/codex/v1/data/{cid}/network/stream") do( cid: Cid, resp: HttpResponseRef ) -> RestApiResponse: + var headers = buildCorsHeaders("GET", allowedOrigin) ## Download a file from the network in a streaming ## manner - ## - - var headers = buildCorsHeaders("GET", allowedOrigin) - if cid.isErr: return RestApiResponse.error(Http400, $cid.error(), headers = headers) @@ -342,8 +447,16 @@ proc initDataApi(node: CodexNodeRef, repoStore: RepoStore, router: var RestRoute 
resp.setCorsHeaders("GET", corsOrigin) resp.setHeader("Access-Control-Headers", "X-Requested-With") - resp.setHeader("Access-Control-Expose-Headers", "Content-Disposition") - await node.retrieveCid(cid.get(), local = false, resp = resp) + # Parse Range header if present + var requestedRange: Result[(int, Option[int]), string] = getDefaultRangeError() + let rangeHeader = request.headers.getString("Range", "") + if rangeHeader != "": + requestedRange = parseRangeHeader(rangeHeader) + if requestedRange.isErr: + warn "Invalid Range header received", header = rangeHeader, error = requestedRange.error + requestedRange = getDefaultRangeError() # Reset to indicate no valid range + + await retrieveCid(node, cid.get(), local = false, resp = resp, range = requestedRange) router.api(MethodGet, "/api/codex/v1/data/{cid}/network/manifest") do( cid: Cid, resp: HttpResponseRef From d01a47bd75024648364d02527e8ffaacf593b99d Mon Sep 17 00:00:00 2001 From: Benjamin Arntzen Date: Tue, 6 May 2025 10:59:20 +0100 Subject: [PATCH 2/5] Reliability fixes for content ranges --- codex/rest/api.nim | 97 ++++++++++++++++++++++++++++++---------------- 1 file changed, 64 insertions(+), 33 deletions(-) diff --git a/codex/rest/api.nim b/codex/rest/api.nim index 33bd73986a..e43aa0f1a8 100644 --- a/codex/rest/api.nim +++ b/codex/rest/api.nim @@ -126,6 +126,10 @@ proc retrieveCid( var stream: LPStream var sentBytes = 0 + var isRangeRequest = false + var rangeStart = 0 + var rangeEnd = 0 + try: # Always indicate acceptance of range requests resp.setHeader("Accept-Ranges", "bytes") @@ -168,9 +172,9 @@ proc retrieveCid( let totalSize = (if manifest.protected: manifest.originalDatasetSize else: manifest.datasetSize).int - var rangeStart = 0 - var rangeEnd = totalSize - 1 # Inclusive - var isRangeRequest = false + rangeStart = 0 + rangeEnd = totalSize - 1 # Inclusive + isRangeRequest = false if range.isOk: let (startReq, endReqOpt) = range.get() @@ -181,15 +185,20 @@ proc retrieveCid( if endReq =? 
endReqOpt: # bytes=start-end (inclusive end) rangeEnd = min(endReq, totalSize - 1) - # else: bytes=start- (rangeEnd remains totalSize - 1) + else: + # bytes=start- (rangeEnd remains totalSize - 1) + rangeEnd = totalSize - 1 + + debug "Range request", start=rangeStart, endPos=rangeEnd, totalSize=totalSize # Ensure end >= start after validation/clamping if rangeEnd < rangeStart: # Requested range is impossible (e.g., start=100, end=50, totalSize=1000) # or fully outside the content (e.g., start=1000, totalSize=500) # Respond with 416 Range Not Satisfiable + warn "Invalid range request", start=rangeStart, endPos=rangeEnd, totalSize=totalSize resp.status = Http416 - resp.setHeader("Content-Range", $"bytes */{totalSize}") + resp.setHeader("Content-Range", "bytes */" & $totalSize) await resp.sendBody("Requested range not satisfiable") return @@ -197,14 +206,22 @@ proc retrieveCid( if isRangeRequest: resp.status = Http206 - resp.setHeader("Content-Range", $"bytes {rangeStart}-{rangeEnd}/{totalSize}") + resp.setHeader("Content-Range", "bytes " & $rangeStart & "-" & $rangeEnd & "/" & $totalSize) resp.setHeader("Content-Length", $contentLength) + # Set the starting position in the seekable stream if stream of SeekableStream: - SeekableStream(stream).setPos(rangeStart) + try: + SeekableStream(stream).setPos(rangeStart) + debug "Seekable stream position set", position=rangeStart, cid=cid + except CatchableError as seekErr: + error "Failed to seek in stream", cid=cid, position=rangeStart, error=seekErr.msg + resp.status = Http500 + await resp.sendBody("Internal error: Failed to seek to requested position") + return else: # This should not happen if node.retrieve returns StoreStream or similar - error "Stream returned by node.retrieve is not seekable, cannot fulfill range request", streamType = $type(stream) + error "Stream returned by node.retrieve is not seekable, cannot fulfill range request", streamType = $type(stream), cid=cid resp.status = Http500 await 
resp.sendBody("Internal error: Cannot seek in content stream") return @@ -215,36 +232,50 @@ proc retrieveCid( await resp.prepare(HttpResponseStreamType.Plain) var bytesToSend = contentLength - while bytesToSend > 0 and not stream.atEof: - var - buff = newSeqUninitialized[byte](DefaultBlockSize.int) - readLen = await stream.readOnce(addr buff[0], min(buff.len, bytesToSend)) - - buff.setLen(readLen) - if buff.len <= 0: - break - - sentBytes += buff.len - bytesToSend -= buff.len - - await resp.send(addr buff[0], buff.len) - - # Check if we sent less than expected (e.g., stream ended early) - if bytesToSend > 0 and not stream.atEof: - warn "Stream ended prematurely while sending range request", cid = cid, expected = contentLength, sent = sentBytes - # The connection will likely be closed by the client detecting the size mismatch - - await resp.finish() - codex_api_downloads.inc() + try: + while bytesToSend > 0 and not stream.atEof: + var + buff = newSeqUninitialized[byte](DefaultBlockSize.int) + readLen = await stream.readOnce(addr buff[0], min(buff.len, bytesToSend)) + + buff.setLen(readLen) + if buff.len <= 0: + debug "End of stream reached during streaming", cid=cid, remaining=bytesToSend + break + + sentBytes += buff.len + bytesToSend -= buff.len + + await resp.send(addr buff[0], buff.len) + + # Check if we sent less than expected (e.g., stream ended early) + if bytesToSend > 0 and not stream.atEof: + warn "Stream ended prematurely while sending content", cid=cid, expected=contentLength, sent=sentBytes, missing=bytesToSend + # The connection will likely be closed by the client detecting the size mismatch + + await resp.finish() + codex_api_downloads.inc() + except HttpWriteError as writeErr: + # This is likely a client disconnection during data transfer + # Log and move on, don't try to send more response data + warn "Client disconnected during download", cid=cid, sent=sentBytes, expected=contentLength, error=writeErr.msg + except CatchableError as streamErr: + # 
Other streaming errors + warn "Error during streaming", cid=cid, sent=sentBytes, error=streamErr.msg except CancelledError as exc: raise exc except CatchableError as exc: - warn "Error streaming blocks", exc = exc.msg - resp.status = Http500 + # Handle other exceptions outside the streaming loop + warn "Error preparing stream", exc=exc.msg if resp.isPending(): - await resp.sendBody(exc.msg) + try: + resp.status = Http500 + await resp.sendBody(exc.msg) + except HttpWriteError: + # Connection might already be closed + warn "Unable to send error response, client likely disconnected", cid=cid finally: - info "Sent bytes", cid = cid, bytes = sentBytes, local = local, rangeRequested = range.isOk + info "Sent bytes", cid=cid, bytes=sentBytes, local=local, rangeRequested=isRangeRequest, rangeStart=(if isRangeRequest: rangeStart else: 0), rangeEnd=(if isRangeRequest: rangeEnd else: 0) if not stream.isNil: await stream.close() From f55d521c9cf4e769e44dda17a31302f11ead5d5f Mon Sep 17 00:00:00 2001 From: Benjamin Arntzen Date: Tue, 6 May 2025 11:46:19 +0100 Subject: [PATCH 3/5] Experiment with better seeking for long files --- codex/rest/api.nim | 75 ++++++++++---- codex/streams/rangestream.nim | 190 ++++++++++++++++++++++++++++++++++ 2 files changed, 242 insertions(+), 23 deletions(-) create mode 100644 codex/streams/rangestream.nim diff --git a/codex/rest/api.nim b/codex/rest/api.nim index e43aa0f1a8..eab7cafc45 100644 --- a/codex/rest/api.nim +++ b/codex/rest/api.nim @@ -41,6 +41,7 @@ import ../streams/asyncstreamwrapper import ../stores import ../utils/options import ../streams/seekablestream +import ../streams/rangestream import ./coders import ./json @@ -116,6 +117,37 @@ proc getDefaultRangeError(): Result[(int, Option[int]), string] = ## Returns a default error Result for range parsing. 
return err("No range requested") +proc retrieveRange*( + node: CodexNodeRef, cid: Cid, rangeStart: int, rangeEnd: int, local: bool = true +): Future[?!LPStream] {.async.} = + ## Retrieve a specific byte range from a file by CID + ## This is more efficient than retrieving the whole file and seeking + ## + + without manifest =? (await node.fetchManifest(cid)), err: + return failure(err) + + # Create a proper stream that only retrieves the necessary blocks + let + totalSize = (if manifest.protected: manifest.originalDatasetSize else: manifest.datasetSize).int + contentLength = rangeEnd - rangeStart + 1 + stream = RangeStream.new( + node.blockStore(), + manifest, + rangeStart, + contentLength, + pad = false + ) + + trace "Range stream created", + cid = cid, + rangeStart = rangeStart, + rangeEnd = rangeEnd, + totalSize = totalSize, + contentLength = contentLength + + return success(LPStream(stream)) + proc retrieveCid( node: CodexNodeRef, cid: Cid, local: bool = true, resp: HttpResponseRef, range: Result[(int, Option[int]), string] = getDefaultRangeError() ): Future[void] {.async: (raises: [CancelledError, HttpWriteError]).} = @@ -134,7 +166,7 @@ proc retrieveCid( # Always indicate acceptance of range requests resp.setHeader("Accept-Ranges", "bytes") - without stream =? (await node.retrieve(cid, local)), error: + without manifest =? (await node.fetchManifest(cid)), error: if error of BlockNotFoundError: resp.status = Http404 await resp.sendBody( @@ -146,13 +178,7 @@ proc retrieveCid( await resp.sendBody(error.msg) return - # It is ok to fetch again the manifest because it will hit the cache - without manifest =? 
(await node.fetchManifest(cid)), err: - error "Failed to fetch manifest", err = err.msg - resp.status = Http404 - await resp.sendBody(err.msg) - return - + # Set content type and disposition headers if manifest.mimetype.isSome: resp.setHeader("Content-Type", manifest.mimetype.get()) else: @@ -204,31 +230,34 @@ proc retrieveCid( let contentLength = rangeEnd - rangeStart + 1 + # Set appropriate headers for the response if isRangeRequest: resp.status = Http206 resp.setHeader("Content-Range", "bytes " & $rangeStart & "-" & $rangeEnd & "/" & $totalSize) resp.setHeader("Content-Length", $contentLength) - # Set the starting position in the seekable stream - if stream of SeekableStream: - try: - SeekableStream(stream).setPos(rangeStart) - debug "Seekable stream position set", position=rangeStart, cid=cid - except CatchableError as seekErr: - error "Failed to seek in stream", cid=cid, position=rangeStart, error=seekErr.msg - resp.status = Http500 - await resp.sendBody("Internal error: Failed to seek to requested position") - return - else: - # This should not happen if node.retrieve returns StoreStream or similar - error "Stream returned by node.retrieve is not seekable, cannot fulfill range request", streamType = $type(stream), cid=cid + # For range requests, get a stream that only retrieves the needed blocks + # This is more efficient than retrieving the entire file + without rangeStream =? (await retrieveRange(node, cid, rangeStart, rangeEnd, local)), error: + error "Failed to create range stream", cid=cid, error=error.msg resp.status = Http500 - await resp.sendBody("Internal error: Cannot seek in content stream") + await resp.sendBody("Internal error: Failed to create range stream") return + + stream = rangeStream + debug "Range stream created", cid=cid, rangeStart=rangeStart, rangeEnd=rangeEnd else: - # Full request + # Full request - get the entire file + without fullStream =? 
(await node.retrieve(cid, local)), error: + resp.status = Http500 + await resp.sendBody(error.msg) + return + + stream = fullStream + # Full request headers resp.setHeader("Content-Length", $contentLength) # Here contentLength == totalSize + # Prepare the response for streaming await resp.prepare(HttpResponseStreamType.Plain) var bytesToSend = contentLength diff --git a/codex/streams/rangestream.nim b/codex/streams/rangestream.nim new file mode 100644 index 0000000000..7075184f6e --- /dev/null +++ b/codex/streams/rangestream.nim @@ -0,0 +1,190 @@ +## Nim-Codex +## Copyright (c) 2021-2025 Status Research & Development GmbH +## Licensed under either of +## * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE)) +## * MIT license ([LICENSE-MIT](LICENSE-MIT)) +## at your option. +## This file may not be copied, modified, or distributed except according to +## those terms. + +import std/sequtils +import pkg/chronos +import pkg/libbacktrace +import pkg/questionable +import pkg/questionable/results +import pkg/stew/results as stew_results + +import ../blocktype as bt +# import ./asyncstreamwrapper # No longer needed +import ./storestream +import ./seekablestream +import ../logutils +import ../stores +import ../manifest + +logScope: + topics = "codex rangestream" + +type + RangeStream* = ref object of SeekableStream + store: BlockStore + manifest: Manifest + streamStartPos: int # Start position in original dataset + streamLength: int # Length of the streamed range + currentPos: int # Current position relative to streamStartPos (0 to streamLength-1) + leftToProcess: int # Bytes left to process from the stream + storeStream: StoreStream # Underlying stream for reading data + pad: bool + +proc getBlocksForRange( + self: RangeStream, + offset: int, + length: int +): Future[?!seq[int]] {.async.} = + ## Get the block indices needed to satisfy a range request + ## Note: This function seems unused within RangeStream itself. 
+ ## Keeping it for now in case it's used externally. + let + blockSize = self.manifest.blockSize.int + firstBlock = offset div blockSize + lastBlock = min((offset + length - 1) div blockSize, self.manifest.blocksCount - 1) + + var blockIndices: seq[int] = @[] + for i in firstBlock..lastBlock: + blockIndices.add(i) + + return success(blockIndices) + +proc new*( + T: type RangeStream, + store: BlockStore, + manifest: Manifest, + startPos: int, + length: int, + pad: bool = false +): RangeStream = + ## Create a range stream that efficiently retrieves only the necessary blocks + ## for the requested byte range + + let stream = RangeStream( + store: store, + manifest: manifest, + streamStartPos: startPos, + streamLength: length, + currentPos: 0, + leftToProcess: length, + storeStream: nil, # Initialize lazily + pad: pad + ) + + # Base stream initialization might be needed here if SeekableStream requires it + # stream.initStream() # Assuming SeekableStream has initStream + + return stream + +method atEof*(self: RangeStream): bool = + self.leftToProcess <= 0 + +# Helper proc to initialize the underlying storeStream if needed +proc ensureStoreStream(self: RangeStream) = + if self.storeStream.isNil: + self.storeStream = StoreStream.new(self.store, self.manifest, self.pad) + # Set initial position + self.storeStream.setPos(self.streamStartPos + self.currentPos) # Start at current absolute position + debug "RangeStream initialized underlying StoreStream", startPos=self.streamStartPos, currentPos=self.currentPos, storeStreamPos=self.storeStream.offset + +method read*( + self: RangeStream, pbytes: pointer, nbytes: int +): Future[int] {.async.} = + ## Read bytes from the specified range within the underlying data. 
+ + if self.atEof: return 0 # Already at end of range + + # Ensure underlying stream is ready + self.ensureStoreStream() + + # Read only as many bytes as needed for the range + let bytesToRead = min(nbytes, self.leftToProcess) + if bytesToRead == 0: return 0 + + var readBytes = 0 + try: + # Use StoreStream's async readOnce + readBytes = await self.storeStream.readOnce(pbytes, bytesToRead) + trace "RangeStream read bytes", got=readBytes, requested=bytesToRead, left=self.leftToProcess-readBytes + except LPStreamEOFError: + # StoreStream reached EOF unexpectedly before range end? Log and treat as 0 bytes read for the range. + warn "StoreStream EOF encountered while reading range", requested=bytesToRead, got=0, rangeLeft=self.leftToProcess + readBytes = 0 + except LPStreamError as exc: + warn "StoreStream error while reading range", msg=exc.msg, requested=bytesToRead, rangeLeft=self.leftToProcess + raise exc # Re-raise other LPStream errors + + if readBytes > 0: + self.currentPos += readBytes + self.leftToProcess -= readBytes + elif self.leftToProcess > 0: + # If readOnce returned 0 but we still expected bytes, it's effectively EOF for the range + debug "RangeStream read 0 bytes, marking as EOF", left=self.leftToProcess + self.leftToProcess = 0 # Mark as EOF + + return readBytes + +method close*(self: RangeStream) {.async.} = + ## Close the RangeStream and its underlying StoreStream if initialized. 
+ trace "Closing RangeStream", currentPos=self.currentPos, left=self.leftToProcess + if not self.storeStream.isNil: + await self.storeStream.close() # Use the async close + self.storeStream = nil # Clear the reference + # Call base close implementation if necessary (LPStream might handle this) + # await procCall LPStream(self).closeImpl() + +method readOnce*( + self: RangeStream, pbytes: pointer, nbytes: int +): Future[int] {.async.} = + # Delegate to the main read method + return await self.read(pbytes, nbytes) + +method write*( + self: RangeStream, pbytes: pointer, nbytes: int +): Future[int] {.async.} = + # Range streams are read-only + raise newException(LPStreamError, "RangeStream is read-only") + # return 0 # Previous behavior + +method getPosition*(self: RangeStream): int = + ## Get the current position within the defined range (0 to streamLength-1) + return self.currentPos + +method setPos*(self: RangeStream, pos: int): bool = # No longer async + ## Set the position within the defined range (0 to streamLength-1) + + # Validate position is within the allowed range [0, streamLength) + if pos < 0 or pos >= self.streamLength: + warn "Attempted to seek outside RangeStream bounds", requested=pos, length=self.streamLength + return false + + # Ensure underlying stream is ready + self.ensureStoreStream() + + # Calculate absolute position in the original dataset + let absolutePos = self.streamStartPos + pos + + # Set position in underlying StoreStream (synchronous) + self.storeStream.setPos(absolutePos) + + # Update RangeStream state + self.currentPos = pos + self.leftToProcess = self.streamLength - pos + + debug "RangeStream position set", rangePos=pos, absPos=absolutePos, left=self.leftToProcess + return true + +method truncate*(self: RangeStream, size: int): Future[bool] {.async.} = + # Range streams are read-only + raise newException(LPStreamError, "RangeStream is read-only") + # return false # Previous behavior + +method getLengthSync*(self: RangeStream): int = 
+ ## Get the total length of the defined range. + return self.streamLength \ No newline at end of file From d7c1797b2b9c901847e96ccecc357de843ab5796 Mon Sep 17 00:00:00 2001 From: Benjamin Arntzen Date: Tue, 6 May 2025 17:06:16 +0100 Subject: [PATCH 4/5] Progress on fixing a segfault --- codex/node.nim | 62 ++++++++++++ codex/rest/api.nim | 169 ++++++++++++++++++++----------- codex/streams/seekablestream.nim | 4 +- 3 files changed, 175 insertions(+), 60 deletions(-) diff --git a/codex/node.nim b/codex/node.nim index fb653c0d76..a5835f9dee 100644 --- a/codex/node.nim +++ b/codex/node.nim @@ -14,6 +14,7 @@ import std/sequtils import std/strformat import std/sugar import times +import std/streams import pkg/taskpools import pkg/questionable @@ -37,6 +38,7 @@ import ./merkletree import ./stores import ./blockexchange import ./streams +import ./streams/rangestream import ./erasure import ./discovery import ./contracts @@ -325,10 +327,70 @@ proc retrieve*( if err of AsyncTimeoutError: return failure(err) + # Not a manifest, must be a single block return await self.streamSingleBlock(cid) await self.streamEntireDataset(manifest, cid) +proc retrieveLocalRange*( + self: CodexNodeRef, cid: Cid, rangeStart: int, rangeEnd: int +): Future[?!LPStream] {.async.} = + ## Retrieve a specific byte range efficiently using only the local block store. + without manifest =? (await self.fetchManifest(cid)), err: + # Manifest fetch might still go to network if not local, but block reads won't. 
+ return failure(err) + + let + totalSize = (if manifest.protected: manifest.originalDatasetSize else: manifest.datasetSize).int + clampedEnd = min(rangeEnd, totalSize - 1) # Ensure end is within bounds + contentLength = if clampedEnd >= rangeStart: clampedEnd - rangeStart + 1 else: 0 + + if contentLength <= 0: + # Requested range is impossible or zero length + warn "Invalid or zero-length local range requested", cid, rangeStart, rangeEnd, totalSize + let emptyStream = BufferStream.new() # Use BufferStream + await emptyStream.pushEof() # Mark as immediately finished + return success(LPStream(emptyStream)) # Return empty stream + + let stream = RangeStream.new( + self.blockStore(), # Use local store + manifest, + rangeStart, + contentLength, + pad = false # Assuming padding is not needed for local retrieval + ) + trace "Local range stream created", cid, rangeStart, rangeEnd, contentLength + return success(LPStream(stream)) + +proc retrieveNetworkRange*( + self: CodexNodeRef, cid: Cid, rangeStart: int, rangeEnd: int +): Future[?!LPStream] {.async.} = + ## Retrieve a specific byte range efficiently using the network store. + without manifest =? 
(await self.fetchManifest(cid)), err: + return failure(err) + + let + totalSize = (if manifest.protected: manifest.originalDatasetSize else: manifest.datasetSize).int + clampedEnd = min(rangeEnd, totalSize - 1) # Ensure end is within bounds + contentLength = if clampedEnd >= rangeStart: clampedEnd - rangeStart + 1 else: 0 + + if contentLength <= 0: + # Requested range is impossible or zero length + warn "Invalid or zero-length network range requested", cid, rangeStart, rangeEnd, totalSize + let emptyStream = BufferStream.new() # Use BufferStream + await emptyStream.pushEof() # Mark as immediately finished + return success(LPStream(emptyStream)) # Return empty stream + + let stream = RangeStream.new( + self.networkStore, # Use network store + manifest, + rangeStart, + contentLength, + pad = false # Assuming padding is not needed here either + ) + trace "Network range stream created", cid, rangeStart, rangeEnd, contentLength + return success(LPStream(stream)) + proc deleteSingleBlock(self: CodexNodeRef, cid: Cid): Future[?!void] {.async.} = if err =? (await self.networkStore.delBlock(cid)).errorOption: error "Error deleting block", cid, err = err.msg diff --git a/codex/rest/api.nim b/codex/rest/api.nim index eab7cafc45..c7ace673a7 100644 --- a/codex/rest/api.nim +++ b/codex/rest/api.nim @@ -117,37 +117,6 @@ proc getDefaultRangeError(): Result[(int, Option[int]), string] = ## Returns a default error Result for range parsing. return err("No range requested") -proc retrieveRange*( - node: CodexNodeRef, cid: Cid, rangeStart: int, rangeEnd: int, local: bool = true -): Future[?!LPStream] {.async.} = - ## Retrieve a specific byte range from a file by CID - ## This is more efficient than retrieving the whole file and seeking - ## - - without manifest =? 
(await node.fetchManifest(cid)), err: - return failure(err) - - # Create a proper stream that only retrieves the necessary blocks - let - totalSize = (if manifest.protected: manifest.originalDatasetSize else: manifest.datasetSize).int - contentLength = rangeEnd - rangeStart + 1 - stream = RangeStream.new( - node.blockStore(), - manifest, - rangeStart, - contentLength, - pad = false - ) - - trace "Range stream created", - cid = cid, - rangeStart = rangeStart, - rangeEnd = rangeEnd, - totalSize = totalSize, - contentLength = contentLength - - return success(LPStream(stream)) - proc retrieveCid( node: CodexNodeRef, cid: Cid, local: bool = true, resp: HttpResponseRef, range: Result[(int, Option[int]), string] = getDefaultRangeError() ): Future[void] {.async: (raises: [CancelledError, HttpWriteError]).} = @@ -161,6 +130,7 @@ proc retrieveCid( var isRangeRequest = false var rangeStart = 0 var rangeEnd = 0 + var responseFinishedOrFailed = false # Flag to track response state try: # Always indicate acceptance of range requests @@ -236,16 +206,43 @@ proc retrieveCid( resp.setHeader("Content-Range", "bytes " & $rangeStart & "-" & $rangeEnd & "/" & $totalSize) resp.setHeader("Content-Length", $contentLength) - # For range requests, get a stream that only retrieves the needed blocks - # This is more efficient than retrieving the entire file - without rangeStream =? 
(await retrieveRange(node, cid, rangeStart, rangeEnd, local)), error: - error "Failed to create range stream", cid=cid, error=error.msg - resp.status = Http500 - await resp.sendBody("Internal error: Failed to create range stream") - return + # Get the appropriate range stream from the node + var rangeStreamResult: Future[?!LPStream] + if local: + trace "Requesting local range stream", cid=cid, start=rangeStart, endPos=rangeEnd + rangeStreamResult = node.retrieveLocalRange(cid, rangeStart, rangeEnd) + else: + trace "Requesting network range stream", cid=cid, start=rangeStart, endPos=rangeEnd + rangeStreamResult = node.retrieveNetworkRange(cid, rangeStart, rangeEnd) + + # ADDED LOG BEFORE AWAIT + debug "About to await rangeStreamResult", cid=cid, local=local + # Explicitly type rangestream + let awaitedResult = await rangeStreamResult + if awaitedResult.isErr: + let error = awaitedResult.error # Extract error for logging/use + error "Failed to create range stream", cid=cid, error=error.msg, local=local + resp.status = Http500 + await resp.sendBody("Internal error: Failed to create range stream") + responseFinishedOrFailed = true + return - stream = rangeStream - debug "Range stream created", cid=cid, rangeStart=rangeStart, rangeEnd=rangeEnd + # ADDED LOG AFTER SUCCESSFUL AWAIT (Moved here) + debug "Successfully awaited rangeStreamResult", cid=cid, local=local + + let rangestream = awaitedResult.get() # Get the successful result + stream = rangestream # Assign the LPStream + # ADDED IMMEDIATE LOG (Re-applying) + let immediateStreamTypeStr = $typeof(stream) + debug "Assigned rangestream to stream variable, before resp.prepare", cid=cid, streamType=immediateStreamTypeStr + let streamTypeStr = $typeof(stream) + debug "Range stream acquired", cid=cid, rangeStart=rangeStart, rangeEnd=rangeEnd, local=local, streamType=streamTypeStr + if stream.isNil: + warn "Range stream is nil immediately after acquisition", cid=cid, rangeStart=rangeStart, rangeEnd=rangeEnd + elif 
stream.atEof: + warn "Range stream from retrieveNetworkRange is at EOF immediately", cid=cid, rangeStart=rangeStart, rangeEnd=rangeEnd + else: + debug "Range stream from retrieveNetworkRange is NOT at EOF immediately", cid=cid, rangeStart=rangeStart, rangeEnd=rangeEnd else: # Full request - get the entire file without fullStream =? (await node.retrieve(cid, local)), error: @@ -254,22 +251,39 @@ proc retrieveCid( return stream = fullStream + # ADDED IMMEDIATE LOG (Re-applying) + let immediateStreamTypeStr = $typeof(stream) + debug "Assigned fullStream to stream variable, before resp.prepare", cid=cid, streamType=immediateStreamTypeStr # Full request headers resp.setHeader("Content-Length", $contentLength) # Here contentLength == totalSize + if stream.isNil: + warn "Full stream is nil immediately after acquisition", cid=cid, local=local + elif stream.atEof: + warn "Full stream from retrieve is at EOF immediately", cid=cid, local=local + else: + debug "Full stream from retrieve is NOT at EOF immediately", cid=cid, local=local - # Prepare the response for streaming await resp.prepare(HttpResponseStreamType.Plain) + # *** CRASH HAPPENS SOMEWHERE BETWEEN stream assignment AND HERE (or during resp.prepare) *** + var bytesToSend = contentLength - try: + debug "Preparing to send data", cid=cid, bytesToSend=bytesToSend, streamIsNil=stream.isNil, streamAtEofInitial= (not stream.isNil and stream.atEof), isRangeRequest=isRangeRequest, rangeStartReport=rangeStart, rangeEndReport=rangeEnd, totalSizeReport=totalSize + try: # <-- Start of inner streaming try block while bytesToSend > 0 and not stream.atEof: var buff = newSeqUninitialized[byte](DefaultBlockSize.int) - readLen = await stream.readOnce(addr buff[0], min(buff.len, bytesToSend)) + maxRead = min(buff.len, bytesToSend) + + debug "Attempting stream.readOnce", cid=cid, maxRead=maxRead, currentBytesToSend=bytesToSend, streamAtEofBeforeRead=stream.atEof + + var readLen = await stream.readOnce(addr buff[0], maxRead) + + debug 
"Stream readOnce returned", cid=cid, readLen=readLen, requestedRead=maxRead, streamAtEofAfterRead=stream.atEof, currentSentBytes=sentBytes buff.setLen(readLen) if buff.len <= 0: - debug "End of stream reached during streaming", cid=cid, remaining=bytesToSend + debug "Stream read returned 0 or negative, or buff became empty. Breaking loop.", cid=cid, readLen=readLen, buffLen=buff.len, remainingBytesToSend=bytesToSend break sentBytes += buff.len @@ -277,36 +291,73 @@ proc retrieveCid( await resp.send(addr buff[0], buff.len) - # Check if we sent less than expected (e.g., stream ended early) if bytesToSend > 0 and not stream.atEof: warn "Stream ended prematurely while sending content", cid=cid, expected=contentLength, sent=sentBytes, missing=bytesToSend - # The connection will likely be closed by the client detecting the size mismatch + # Consider setting responseFinishedOrFailed = true here? Or let finish() handle it? - await resp.finish() + responseFinishedOrFailed = true + await resp.finish() codex_api_downloads.inc() except HttpWriteError as writeErr: - # This is likely a client disconnection during data transfer - # Log and move on, don't try to send more response data - warn "Client disconnected during download", cid=cid, sent=sentBytes, expected=contentLength, error=writeErr.msg + responseFinishedOrFailed = true + warn "Client disconnected during download (inner try)", cid=cid, sent=sentBytes, expected=contentLength, error=writeErr.msg + except CancelledError as streamCancelledErr: + responseFinishedOrFailed = true + warn "Streaming cancelled (inner try)", cid=cid, sent=sentBytes, error=streamCancelledErr.msg + raise streamCancelledErr except CatchableError as streamErr: - # Other streaming errors - warn "Error during streaming", cid=cid, sent=sentBytes, error=streamErr.msg + responseFinishedOrFailed = true + warn "Error during streaming (inner try)", cid=cid, sent=sentBytes, error=streamErr.msg + # Attempt to send a 500 if the response is still pending + if 
resp.isPending(): + resp.status = Http500 + await resp.sendBody(streamErr.msg) + + except AssertionDefect as assertExc: + # ADDED: Catch AssertionDefect specifically + responseFinishedOrFailed = true + let excTypeStr = $typeof(assertExc) + warn "AssertionDefect in retrieveCid (outer try)", cid=cid, errorContext="AssertionDefect", excMsg=assertExc.msg, excType=excTypeStr + if resp.isPending(): + try: + resp.status = Http500 + await resp.sendBody("Assertion Failed: " & assertExc.msg) + except HttpWriteError: + warn "Unable to send error response (outer AssertionDefect), client likely disconnected", cid=cid except CancelledError as exc: + responseFinishedOrFailed = true + warn "retrieveCid cancelled (outer try)", cid=cid, error=exc.msg raise exc - except CatchableError as exc: - # Handle other exceptions outside the streaming loop - warn "Error preparing stream", exc=exc.msg + except Exception as exc: # Broaden catch from CatchableError to Exception (This is the intended change) + responseFinishedOrFailed = true # Set it unconditionally here + let excTypeStr = $typeof(exc) + warn "Error in retrieveCid (outer try)", cid=cid, errorContext="Outer Exception", excMsg=exc.msg, excType=excTypeStr if resp.isPending(): try: resp.status = Http500 await resp.sendBody(exc.msg) except HttpWriteError: - # Connection might already be closed - warn "Unable to send error response, client likely disconnected", cid=cid + warn "Unable to send error response (outer try), client likely disconnected", cid=cid finally: + # Determine stream type for logging + let streamType = if stream.isNil: "nil" else: $typeof(stream) + info "Finally block reached", cid=cid, streamType=streamType, isStreamNil=(stream.isNil), responseFinishedOrFailed=responseFinishedOrFailed + + # Original info log info "Sent bytes", cid=cid, bytes=sentBytes, local=local, rangeRequested=isRangeRequest, rangeStart=(if isRangeRequest: rangeStart else: 0), rangeEnd=(if isRangeRequest: rangeEnd else: 0) - if not stream.isNil: - 
await stream.close() + + # Safely close the stream only if it wasn't already handled by finish/failure + if not stream.isNil and not responseFinishedOrFailed: + info "Attempting to close potentially orphaned stream", cid=cid, streamType=streamType + try: + await stream.close() + info "Orphaned stream closed successfully", cid=cid, streamType=streamType + except CatchableError as closeExc: # Reverted to CatchableError + discard + elif not stream.isNil and responseFinishedOrFailed: + trace "Skipping stream.close() because response finished or failed.", cid=cid, streamType=streamType + elif stream.isNil: + trace "Finally stream check", msg="Stream was nil in finally block, nothing to close." proc buildCorsHeaders( httpMethod: string, allowedOrigin: Option[string] diff --git a/codex/streams/seekablestream.nim b/codex/streams/seekablestream.nim index c48ec28f0f..08549a6572 100644 --- a/codex/streams/seekablestream.nim +++ b/codex/streams/seekablestream.nim @@ -21,7 +21,9 @@ type SeekableStream* = ref object of LPStream offset*: int method `size`*(self: SeekableStream): int {.base.} = - raiseAssert("method unimplemented") + # Base implementation returns -1 to indicate unknown size + # Subclasses should override this method to provide the actual size + return -1 proc setPos*(self: SeekableStream, pos: int) = self.offset = pos From 2c44a3826358d2ca48b0d8632db666bb92e56a5a Mon Sep 17 00:00:00 2001 From: Benjamin Arntzen Date: Tue, 6 May 2025 19:15:06 +0100 Subject: [PATCH 5/5] Fix segfault --- codex/node.nim | 12 ++++++++-- codex/rest/api.nim | 36 +++++------------------------- codex/streams/rangestream.nim | 41 +++++++++++++++++++---------------- 3 files changed, 37 insertions(+), 52 deletions(-) diff --git a/codex/node.nim b/codex/node.nim index a5835f9dee..4cb7c5a49f 100644 --- a/codex/node.nim +++ b/codex/node.nim @@ -350,7 +350,8 @@ proc retrieveLocalRange*( warn "Invalid or zero-length local range requested", cid, rangeStart, rangeEnd, totalSize let emptyStream = 
BufferStream.new() # Use BufferStream await emptyStream.pushEof() # Mark as immediately finished - return success(LPStream(emptyStream)) # Return empty stream + # Return with correct type signature but preserve instance identity + return success(LPStream(emptyStream)) let stream = RangeStream.new( self.blockStore(), # Use local store @@ -359,7 +360,10 @@ proc retrieveLocalRange*( contentLength, pad = false # Assuming padding is not needed for local retrieval ) + # Initialize the stream to ensure its objName is set to enable tracing + stream.initStream() trace "Local range stream created", cid, rangeStart, rangeEnd, contentLength + # Return with correct type signature but preserve instance identity return success(LPStream(stream)) proc retrieveNetworkRange*( @@ -379,7 +383,8 @@ proc retrieveNetworkRange*( warn "Invalid or zero-length network range requested", cid, rangeStart, rangeEnd, totalSize let emptyStream = BufferStream.new() # Use BufferStream await emptyStream.pushEof() # Mark as immediately finished - return success(LPStream(emptyStream)) # Return empty stream + # Return with correct type signature but preserve instance identity + return success(LPStream(emptyStream)) let stream = RangeStream.new( self.networkStore, # Use network store @@ -388,7 +393,10 @@ proc retrieveNetworkRange*( contentLength, pad = false # Assuming padding is not needed here either ) + # Initialize the stream to ensure its objName is set to enable tracing + stream.initStream() trace "Network range stream created", cid, rangeStart, rangeEnd, contentLength + # Return with correct type signature but preserve instance identity return success(LPStream(stream)) proc deleteSingleBlock(self: CodexNodeRef, cid: Cid): Future[?!void] {.async.} = diff --git a/codex/rest/api.nim b/codex/rest/api.nim index c7ace673a7..e84ad0f7a9 100644 --- a/codex/rest/api.nim +++ b/codex/rest/api.nim @@ -215,34 +215,18 @@ proc retrieveCid( trace "Requesting network range stream", cid=cid, start=rangeStart, 
endPos=rangeEnd rangeStreamResult = node.retrieveNetworkRange(cid, rangeStart, rangeEnd) - # ADDED LOG BEFORE AWAIT - debug "About to await rangeStreamResult", cid=cid, local=local - # Explicitly type rangestream let awaitedResult = await rangeStreamResult if awaitedResult.isErr: - let error = awaitedResult.error # Extract error for logging/use + let error = awaitedResult.error error "Failed to create range stream", cid=cid, error=error.msg, local=local resp.status = Http500 await resp.sendBody("Internal error: Failed to create range stream") responseFinishedOrFailed = true return - # ADDED LOG AFTER SUCCESSFUL AWAIT (Moved here) - debug "Successfully awaited rangeStreamResult", cid=cid, local=local - - let rangestream = awaitedResult.get() # Get the successful result - stream = rangestream # Assign the LPStream - # ADDED IMMEDIATE LOG (Re-applying) - let immediateStreamTypeStr = $typeof(stream) - debug "Assigned rangestream to stream variable, before resp.prepare", cid=cid, streamType=immediateStreamTypeStr - let streamTypeStr = $typeof(stream) - debug "Range stream acquired", cid=cid, rangeStart=rangeStart, rangeEnd=rangeEnd, local=local, streamType=streamTypeStr - if stream.isNil: - warn "Range stream is nil immediately after acquisition", cid=cid, rangeStart=rangeStart, rangeEnd=rangeEnd - elif stream.atEof: - warn "Range stream from retrieveNetworkRange is at EOF immediately", cid=cid, rangeStart=rangeStart, rangeEnd=rangeEnd - else: - debug "Range stream from retrieveNetworkRange is NOT at EOF immediately", cid=cid, rangeStart=rangeStart, rangeEnd=rangeEnd + let rangestream = awaitedResult.get() + stream = rangestream + debug "Assigned rangestream to stream in retrieveCid", streamType = $typeof(stream), objectId = (if stream.isNil: "nil-oid" else: $stream.oid), isNil = stream.isNil else: # Full request - get the entire file without fullStream =? 
(await node.retrieve(cid, local)), error: @@ -251,17 +235,7 @@ proc retrieveCid( return stream = fullStream - # ADDED IMMEDIATE LOG (Re-applying) - let immediateStreamTypeStr = $typeof(stream) - debug "Assigned fullStream to stream variable, before resp.prepare", cid=cid, streamType=immediateStreamTypeStr - # Full request headers - resp.setHeader("Content-Length", $contentLength) # Here contentLength == totalSize - if stream.isNil: - warn "Full stream is nil immediately after acquisition", cid=cid, local=local - elif stream.atEof: - warn "Full stream from retrieve is at EOF immediately", cid=cid, local=local - else: - debug "Full stream from retrieve is NOT at EOF immediately", cid=cid, local=local + debug "Assigned fullStream to stream in retrieveCid", streamType = $typeof(stream), objectId = (if stream.isNil: "nil-oid" else: $stream.oid), isNil = stream.isNil await resp.prepare(HttpResponseStreamType.Plain) diff --git a/codex/streams/rangestream.nim b/codex/streams/rangestream.nim index 7075184f6e..7abedfc8ce 100644 --- a/codex/streams/rangestream.nim +++ b/codex/streams/rangestream.nim @@ -12,7 +12,7 @@ import pkg/chronos import pkg/libbacktrace import pkg/questionable import pkg/questionable/results -import pkg/stew/results as stew_results +# import pkg/stew/results as stew_results # Removed deprecated import import ../blocktype as bt # import ./asyncstreamwrapper # No longer needed @@ -77,12 +77,12 @@ proc new*( pad: pad ) - # Base stream initialization might be needed here if SeekableStream requires it - # stream.initStream() # Assuming SeekableStream has initStream + # Initialize the base LPStream object + stream.initStream() return stream -method atEof*(self: RangeStream): bool = +method atEof*(self: RangeStream): bool {.raises: [].} = self.leftToProcess <= 0 # Helper proc to initialize the underlying storeStream if needed @@ -95,17 +95,19 @@ proc ensureStoreStream(self: RangeStream) = method read*( self: RangeStream, pbytes: pointer, nbytes: int -): 
Future[int] {.async.} = +): Future[int] {.async: (raises: [CancelledError, LPStreamError]), base.} = ## Read bytes from the specified range within the underlying data. - if self.atEof: return 0 # Already at end of range + if self.atEof: + return 0 # Ensure underlying stream is ready self.ensureStoreStream() # Read only as many bytes as needed for the range let bytesToRead = min(nbytes, self.leftToProcess) - if bytesToRead == 0: return 0 + if bytesToRead == 0: + return 0 var readBytes = 0 try: @@ -118,7 +120,7 @@ method read*( readBytes = 0 except LPStreamError as exc: warn "StoreStream error while reading range", msg=exc.msg, requested=bytesToRead, rangeLeft=self.leftToProcess - raise exc # Re-raise other LPStream errors + raise exc if readBytes > 0: self.currentPos += readBytes @@ -130,33 +132,34 @@ method read*( return readBytes -method close*(self: RangeStream) {.async.} = +method close*( + self: RangeStream +): Future[void] {.async: (raises: [])} = ## Close the RangeStream and its underlying StoreStream if initialized. 
trace "Closing RangeStream", currentPos=self.currentPos, left=self.leftToProcess if not self.storeStream.isNil: await self.storeStream.close() # Use the async close self.storeStream = nil # Clear the reference - # Call base close implementation if necessary (LPStream might handle this) - # await procCall LPStream(self).closeImpl() + # Call base close implementation + await procCall LPStream(self).closeImpl() method readOnce*( self: RangeStream, pbytes: pointer, nbytes: int -): Future[int] {.async.} = +): Future[int] {.async: (raises: [CancelledError, LPStreamError])} = # Delegate to the main read method return await self.read(pbytes, nbytes) method write*( - self: RangeStream, pbytes: pointer, nbytes: int -): Future[int] {.async.} = + self: RangeStream, msg: seq[byte] +): Future[void] {.async: (raises: [CancelledError, LPStreamError])} = # Range streams are read-only raise newException(LPStreamError, "RangeStream is read-only") - # return 0 # Previous behavior -method getPosition*(self: RangeStream): int = +method getPosition*(self: RangeStream): int {.base.} = ## Get the current position within the defined range (0 to streamLength-1) return self.currentPos -method setPos*(self: RangeStream, pos: int): bool = # No longer async +method setPos*(self: RangeStream, pos: int): bool {.base.} = # No longer async ## Set the position within the defined range (0 to streamLength-1) # Validate position is within the allowed range [0, streamLength) @@ -180,11 +183,11 @@ method setPos*(self: RangeStream, pos: int): bool = # No longer async debug "RangeStream position set", rangePos=pos, absPos=absolutePos, left=self.leftToProcess return true -method truncate*(self: RangeStream, size: int): Future[bool] {.async.} = +method truncate*(self: RangeStream, size: int): Future[bool] {.async: (raises: [CancelledError, LPStreamError]), base.} = # Range streams are read-only raise newException(LPStreamError, "RangeStream is read-only") # return false # Previous behavior -method 
getLengthSync*(self: RangeStream): int = +method getLengthSync*(self: RangeStream): int {.base.} = ## Get the total length of the defined range. return self.streamLength \ No newline at end of file