@@ -3,6 +3,9 @@ const logger = require('log4js').getLogger("modzy.job-client");
 const fs = require('fs');
 const FormData = require('form-data');
 const ApiError = require('./api-error.js');
+const parseUrl = require('url').parse;
+const humanReadToBytes = require("./size");
+const {byteArrayToChunks, fileToChunks} = require("./utils");
 
 /**
  * Utility class that mask the interaction with the job api
@@ -150,6 +153,7 @@ class JobClient{
      */
     submitJobFiles(modelId, versionId, fileSources) {
         let job = {};
+        let chunkSize = 1024 * 1024;
         return this.submitJob(
             {
                 "model": {
@@ -160,12 +164,25 @@ class JobClient{
         ).then(
             (openJob) => {
                 job = openJob;
-                let inputPomise = Promise.resolve(openJob);
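+                // ask the API for its features first so the upload chunk size honours the server-side limit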
+                return this.getFeatures();
+            }
+        ).then(
+            (features) => {
+                try {
+                    return humanReadToBytes(features["inputChunkMaximumSize"]);
+                } catch (error) {
+                    logger.warn(`unexpected error extracting inputChunkMaximumSize from ${features}, error: ${error}`);
+                    return 1024 * 1024; //default 1Mi
+                }
+            }
+        ).then(
+            (maxChunkSize) => {
+                let inputPomise = Promise.resolve(job);
                 Object.keys(fileSources).forEach(
                     inputItemKey => {
                         Object.keys(fileSources[inputItemKey]).forEach(
                             dataItemKey => {
-                                inputPomise = inputPomise.then(() => this.appendInput(openJob, inputItemKey, dataItemKey, fileSources[inputItemKey][dataItemKey]));
+                                inputPomise = inputPomise.then(() => this.appendInput(job, inputItemKey, dataItemKey, fileSources[inputItemKey][dataItemKey], maxChunkSize));
                             }
                         );
                     }
@@ -177,7 +194,7 @@ class JobClient{
                 return this.closeJob(job);
             }
         ).catch(
-            (apiError) => {
+            (apiError) => {
                 //Try to cancel the job
                 return this.cancelJob(job.jobIdentifier)
                     .then((_) => { throw(apiError); })
@@ -361,37 +378,46 @@ class JobClient{
         );
     }
 
-    appendInput(job, inputItemKey, dataItemKey, value) {
+    appendInput(job, inputItemKey, dataItemKey, inputValue, chunkSize) {
         const requestURL = `${this.baseURL}/${job.jobIdentifier}/${inputItemKey}/${dataItemKey}`;
-        logger.debug(`appendInput(${job.jobIdentifier}, ${inputItemKey}, ${dataItemKey}) POST ${requestURL}`);
-        const data = new FormData();
-        if (value.byteLength !== undefined) {
-            data.append("input", value, dataItemKey);
-        } else {
-            //If is a file we need to trick axios
-            data.append("input", fs.createReadStream(value), {knownLength: fs.statSync(value).size});
+
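+        // Buffers and typed arrays expose byteLength; any other value is treated as a file path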
+        let iterator;
+        if (inputValue.byteLength !== undefined) {
+            iterator = byteArrayToChunks(inputValue, chunkSize);
+        } else {
+            iterator = fileToChunks(inputValue, chunkSize);
         }
+        return this.appendInputChunk(requestURL, iterator, chunkSize, dataItemKey, 0);
+    }
 
-        return axios.post(
-            requestURL,
-            data,
-            {
-                headers: {
-                    ...data.getHeaders(),
-                    "Content-Length": data.getLengthSync(),
-                    'Authorization': `ApiKey ${this.apiKey}`
-                }
-            }
-        )
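+    // Recursively POSTs the chunks produced by the async generator, one request per chunk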
+    appendInputChunk(requestURL, asyncGenerator, chunkSize, dataItemKey, chunkCount) {
+        return asyncGenerator
+            .next()
             .then(
-                (response) => {
-                    logger.info(`appendInput(${job.jobIdentifier}, ${inputItemKey}, ${dataItemKey}) :: ${response.status} ${response.statusText}`);
-                    return job;
-                }
-            )
-            .catch(
-                (error) => {
-                    throw(new ApiError(error));
+                (entry) => {
+                    if (entry && entry.value) {
+                        return new Promise(
+                            (resolve, reject) => {
+                                logger.debug(`appendInputChunk(${requestURL}) [${chunkCount}] POST ${entry.value.length} bytes`);
+                                const requestObj = parseUrl(requestURL);
+                                requestObj.headers = {'Authorization': `ApiKey ${this.apiKey}`};
+                                const data = new FormData({maxDataSize: chunkSize});
+                                data.append("input", entry.value, dataItemKey);
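+                                // form-data's submit() sends the multipart POST for this chunk directly (no axios buffering)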
+                                data.submit(requestObj, function (error, response) {
+                                    if (error || response.statusCode >= 400) {
+                                        reject(new ApiError(error, requestURL, response && response.statusCode, response && response.statusMessage));
+                                        return;
+                                    }
+                                    logger.info(`appendInputChunk(${requestURL}) [${chunkCount}] :: ${response.statusCode} ${response.statusMessage}`);
+                                    resolve(response.resume());
+                                });
+                            }
+                        ).then(
+                            (_) => {
+                                return this.appendInputChunk(requestURL, asyncGenerator, chunkSize, dataItemKey, chunkCount + 1);
+                            }
+                        );
+                    }
+                    return null;
                 }
             );
     }
@@ -444,6 +470,31 @@ class JobClient{
         );
     }
 
+    /**
+     * Call the Modzy API Service that returns the job features
+     * @return {Object} the features object
+     * @throws {ApiError} if there is something wrong with the service or the call
+     */
+    getFeatures() {
+        const requestURL = `${this.baseURL}/features`;
+        logger.debug(`getFeatures() GET ${requestURL}`);
+        return axios.get(
+            requestURL,
+            {headers: {'Authorization': `ApiKey ${this.apiKey}`}}
+        )
+            .then(
+                (response) => {
+                    logger.info(`getFeatures() :: ${response.status} ${response.statusText}`);
+                    return response.data;
+                }
+            )
+            .catch(
+                (error) => {
+                    throw(new ApiError(error));
+                }
+            );
+    }
+
     /**
      * Call the Modzy API Service that cancel the Job by it's identifier
      * @param {string} jobId - Identifier of the job