@@ -3,40 +3,42 @@ import validateOffsetAndLength from '../../../utils/validate-offset-and-length.j
33import { UnixFS } from 'ipfs-unixfs'
44import errCode from 'err-code'
55import * as dagPb from '@ipld/dag-pb'
6- import * as dagCbor from '@ipld/dag-cbor'
76import * as raw from 'multiformats/codecs/raw'
7+ import { pushable } from 'it-pushable'
8+ import parallel from 'it-parallel'
9+ import { pipe } from 'it-pipe'
10+ import map from 'it-map'
811
/**
 * @typedef {import('../../../types').ExporterOptions} ExporterOptions
 * @typedef {import('interface-blockstore').Blockstore} Blockstore
 * @typedef {import('@ipld/dag-pb').PBNode} PBNode
 * @typedef {import('@ipld/dag-pb').PBLink} PBLink
 */

/**
 * Walk the DAG rooted at `node`, pushing every byte that falls inside the
 * requested range onto `queue` in stream order. Child blocks are fetched
 * from the blockstore in parallel but their bytes are emitted in order
 * (`it-parallel` with `ordered: true`).
 *
 * @param {Blockstore} blockstore
 * @param {PBNode | Uint8Array} node - a dag-pb node or the raw bytes of a `raw` block
 * @param {import('it-pushable').Pushable<Uint8Array | undefined>} queue - sink for the extracted bytes
 * @param {number} streamPosition - absolute offset of the first byte of `node` within the file
 * @param {number} start - requested range start, inclusive
 * @param {number} end - requested range end, exclusive
 * @param {ExporterOptions} options
 * @returns {Promise<void>}
 */
async function walkDAG (blockstore, node, queue, streamPosition, start, end, options) {
  // a `raw` node - the block bytes are file bytes, no unixfs envelope
  if (node instanceof Uint8Array) {
    const buf = extractDataFromBlock(node, streamPosition, start, end)

    // only push buffers that actually contain requested bytes - pushing
    // empty Uint8Arrays just makes the consumer churn for no reason
    if (buf.byteLength > 0) {
      queue.push(buf)
    }

    return
  }

  if (node.Data == null) {
    throw errCode(new Error('no data in PBNode'), 'ERR_NOT_UNIXFS')
  }

  /** @type {UnixFS} */
  let file

  try {
    file = UnixFS.unmarshal(node.Data)
  } catch (err) {
    throw errCode(err, 'ERR_NOT_UNIXFS')
  }

  // might be a unixfs `raw` node or have data on intermediate nodes
  if (file.data != null) {
    const data = file.data
    const buf = extractDataFromBlock(data, streamPosition, start, end)

    if (buf.byteLength > 0) {
      queue.push(buf)
    }

    // NOTE(review): advances by the extracted length, not data.length -
    // assumes nodes carrying both data and links are never partially
    // clipped by `start`, otherwise child offsets would drift. Confirm.
    streamPosition += buf.byteLength
  }

  // work out which child nodes overlap the requested byte range and
  // remember where each one starts in the stream
  /** @type {Array<{ link: PBLink, blockStart: number }>} */
  const childOps = []

  for (let i = 0; i < node.Links.length; i++) {
    const childLink = node.Links[i]
    const childStart = streamPosition // inclusive
    const childEnd = childStart + file.blockSizes[i] // exclusive

    if ((start >= childStart && start < childEnd) || // child has offset byte
        (end >= childStart && end <= childEnd) || // child has end byte
        (start < childStart && end > childEnd)) { // child is between offset and end bytes
      childOps.push({
        link: childLink,
        blockStart: streamPosition
      })
    }

    streamPosition = childEnd

    // every remaining child starts after the requested range
    if (streamPosition > end) {
      break
    }
  }

  // fetch the relevant child blocks concurrently, then decode and recurse
  // into them one at a time so bytes reach the queue in file order
  await pipe(
    childOps,
    (source) => map(source, (op) => {
      return async () => {
        const block = await blockstore.get(op.link.Hash, {
          signal: options.signal
        })

        return {
          ...op,
          block
        }
      }
    }),
    (source) => parallel(source, {
      ordered: true
    }),
    async (source) => {
      for await (const { link, block, blockStart } of source) {
        let child
        switch (link.Hash.code) {
          case dagPb.code:
            child = await dagPb.decode(block)
            break
          case raw.code:
            child = block
            break
          default:
            throw errCode(new Error(`Unsupported codec: ${link.Hash.code}`), 'ERR_NOT_UNIXFS')
        }

        await walkDAG(blockstore, child, queue, blockStart, start, end, options)
      }
    }
  )
}
98120
99121/**
@@ -103,7 +125,7 @@ const fileContent = (cid, node, unixfs, path, resolve, depth, blockstore) => {
103125 /**
104126 * @param {ExporterOptions } options
105127 */
106- function yieldFileContent ( options = { } ) {
128+ async function * yieldFileContent ( options = { } ) {
107129 const fileSize = unixfs . fileSize ( )
108130
109131 if ( fileSize === undefined ) {
@@ -115,10 +137,28 @@ const fileContent = (cid, node, unixfs, path, resolve, depth, blockstore) => {
115137 length
116138 } = validateOffsetAndLength ( fileSize , options . offset , options . length )
117139
118- const start = offset
119- const end = offset + length
140+ const queue = pushable ( {
141+ objectMode : true
142+ } )
143+
144+ walkDAG ( blockstore , node , queue , 0 , offset , offset + length , options )
145+ . catch ( err => {
146+ queue . end ( err )
147+ } )
148+
149+ let read = 0
150+
151+ for await ( const buf of queue ) {
152+ if ( buf != null ) {
153+ yield buf
120154
121- return emitBytes ( blockstore , node , start , end , 0 , options )
155+ read += buf . byteLength
156+
157+ if ( read === length ) {
158+ queue . end ( )
159+ }
160+ }
161+ }
122162 }
123163
124164 return yieldFileContent
0 commit comments