@@ -3,7 +3,30 @@ import { Readable } from "stream";
33import { GraphClientError } from "../../../GraphClientError" ;
44import { FileObject , SliceType } from "../../LargeFileUploadTask" ;
55import { Range } from "../Range" ;
6+
7+ /**
8+ * @interface
9+ * Interface to store slice of a stream and range of the slice.
10+ * @property {Buffer } fileSlice - The slice of the stream
11+ * @property {Range } range - The range of the slice
12+ */
13+ interface SliceRecord {
14+ fileSlice : Buffer ;
15+ range : Range ;
16+ }
17+
18+ /**
19+ * @class
20+ * FileObject class for Readable Stream upload
21+ */
622export class StreamUpload implements FileObject < Readable > {
23+ /**
24+ * @private
25+ * Represents a cache of the last attempted upload slice.
26+ * This can be used when resuming a previously failed slice upload.
27+ */
28+ private previousSlice : SliceRecord ;
29+
730 public constructor ( public content : Readable , public name : string , public size : number ) {
831 if ( ! content || ! name || ! size ) {
932 throw new GraphClientError ( "Please provide the Readable Stream content, name of the file and size of the file" ) ;
@@ -17,19 +40,68 @@ export class StreamUpload implements FileObject<Readable> {
1740 * @returns The sliced file part
1841 */
1942 public async sliceFile ( range : Range ) : Promise < SliceType > {
20- const rangeSize = range . maxValue - range . minValue + 1 ;
43+ let rangeSize = range . maxValue - range . minValue + 1 ;
2144 /* readable.readable Is true if it is safe to call readable.read(),
2245 * which means the stream has not been destroyed or emitted 'error' or 'end'
2346 */
47+ const bufs = [ ] ;
48+
49+ /**
50+ * The sliceFile reads the first `rangeSize` number of bytes from the stream.
51+ * The previousSlice property is used to seek the range of bytes in the previous slice.
52+ * Suppose, the sliceFile reads bytes from `10 - 20` from the stream but the upload of this slice fails.
53+ * When the user resumes, the stream will have bytes from position 21.
54+ * The previousSlice.Range is used to compare if the requested range is cached in the previousSlice property or present in the Readable Stream.
55+ */
56+ if ( this . previousSlice ) {
57+ if ( range . minValue < this . previousSlice . range . minValue ) {
58+ throw new GraphClientError ( "An error occurred while uploading the stream. Please restart the stream upload from the first byte of the file." ) ;
59+ }
60+
61+ if ( range . minValue < this . previousSlice . range . maxValue ) {
62+ const previousRangeMin = this . previousSlice . range . minValue ;
63+ const previousRangeMax = this . previousSlice . range . maxValue ;
64+
65+ // Check if the requested range is same as previously sliced range
66+ if ( range . minValue === previousRangeMin && range . maxValue === previousRangeMax ) {
67+ return this . previousSlice . fileSlice ;
68+ }
69+
70+ /**
71+ * The following check considers a possibility
72+ * of an upload failing after some of the bytes of the previous slice
73+ * were successfully uploaded.
74+ * Example - Previous slice range - `10 - 20`. Current requested range is `15 - 20`.
75+ */
76+ if ( range . maxValue === previousRangeMax ) {
77+ return this . previousSlice . fileSlice . slice ( range . minValue , range . maxValue + 1 ) ;
78+ }
79+
80+ /**
81+ * If an upload fails after some of the bytes of the previous slice
82+ * were successfully uploaded and the new Range.Maximum is greater than the previous Range.Maximum
83+ * Example - Previous slice range - `10 - 20`. Current requested range is `15 - 25`,
84+ * then read the bytes from position 15 to 20 from previousSlice.fileSlice and read bytes from position 21 to 25 from the Readable Stream
85+ */
86+ bufs . push ( this . previousSlice . fileSlice . slice ( range . minValue , previousRangeMax + 1 ) ) ;
87+
88+ rangeSize = range . maxValue - previousRangeMax ;
89+ }
90+ }
91+
2492 if ( this . content && this . content . readable ) {
2593 if ( this . content . readableLength >= rangeSize ) {
26- return this . content . read ( rangeSize ) ;
94+ bufs . push ( this . content . read ( rangeSize ) ) ;
2795 } else {
28- return await this . readNBytesFromStream ( rangeSize ) ;
96+ bufs . push ( await this . readNBytesFromStream ( rangeSize ) ) ;
2997 }
3098 } else {
3199 throw new GraphClientError ( "Stream is not readable." ) ;
32100 }
101+ const slicedChunk = Buffer . concat ( bufs ) ;
102+ this . previousSlice = { fileSlice : slicedChunk , range } ;
103+
104+ return slicedChunk ;
33105 }
34106
35107 /**
0 commit comments