@@ -22,9 +22,14 @@ final class ResumeUploader
2222 private $ params ;
2323 private $ mime ;
2424 private $ contexts ;
25+ private $ finishedEtags ;
2526 private $ host ;
27+ private $ bucket ;
2628 private $ currentUrl ;
2729 private $ config ;
30+ private $ resumeRecordFile ;
31+ private $ version ;
32+ private $ partSize ;
2833
2934 /**
3035 * 上传二进制流到七牛
@@ -36,6 +41,9 @@ final class ResumeUploader
3641 * @param string $params 自定义变量
3742 * @param string $mime 上传数据的mimeType
3843 * @param string $config
44+ * @param string $resumeRecordFile 断点续传的已上传的部分信息记录文件
45+ * @param string $version 分片上传版本 目前支持v1/v2版本 默认v1
46+ * @param string $partSize 分片上传v2必传字段 默认大小为4MB 分片大小范围为1 MB - 1 GB
3947 *
4048 * @link http://developer.qiniu.com/docs/v6/api/overview/up/response/vars.html#xvar
4149 */
@@ -46,7 +54,10 @@ public function __construct(
4654 $ size ,
4755 $ params ,
4856 $ mime ,
49- $ config
57+ $ config ,
58+ $ resumeRecordFile = null ,
59+ $ version = 'v1 ' ,
60+ $ partSize = config::BLOCK_SIZE
5061 ) {
5162
5263 $ this ->upToken = $ upToken ;
@@ -56,9 +67,14 @@ public function __construct(
5667 $ this ->params = $ params ;
5768 $ this ->mime = $ mime ;
5869 $ this ->contexts = array ();
70+ $ this ->finishedEtags = array ("etags " =>array (), "uploadId " =>"" , "expiredAt " =>0 , "uploaded " =>0 );
5971 $ this ->config = $ config ;
72+ $ this ->resumeRecordFile = $ resumeRecordFile ? $ resumeRecordFile : null ;
73+ $ this ->version = $ version ? $ version : 'v1 ' ;
74+ $ this ->partSize = $ partSize ? $ partSize : config::BLOCK_SIZE ;
6075
6176 list ($ accessKey , $ bucket , $ err ) = \Qiniu \explodeUpToken ($ upToken );
77+ $ this ->bucket = $ bucket ;
6278 if ($ err != null ) {
6379 return array (null , $ err );
6480 }
@@ -76,14 +92,88 @@ public function __construct(
7692 public function upload ($ fname )
7793 {
7894 $ uploaded = 0 ;
95+ if ($ this ->version == 'v2 ' ) {
96+ $ partNumber = 1 ;
97+ $ encodedObjectName = $ this ->key ? \Qiniu \base64_urlSafeEncode ($ this ->key ) : '~ ' ;
98+ };
99+ // get upload record from resumeRecordFile
100+ if ($ this ->resumeRecordFile != null ) {
101+ $ blkputRets = null ;
102+ if (file_exists ($ this ->resumeRecordFile )) {
103+ $ stream = fopen ($ this ->resumeRecordFile , 'r ' );
104+ if ($ stream ) {
105+ $ streamLen = filesize ($ this ->resumeRecordFile );
106+ if ($ streamLen > 0 ) {
107+ $ contents = fread ($ stream , $ streamLen );
108+ fclose ($ stream );
109+ if ($ contents ) {
110+ $ blkputRets = json_decode ($ contents , true );
111+ if ($ blkputRets === null ) {
112+ error_log ("resumeFile contents decode error " );
113+ }
114+ } else {
115+ error_log ("read resumeFile failed " );
116+ }
117+ } else {
118+ error_log ("resumeFile is empty " );
119+ }
120+ } else {
121+ error_log ("resumeFile open failed " );
122+ }
123+ } else {
124+ error_log ("resumeFile not exists " );
125+ }
126+
127+ if ($ blkputRets ) {
128+ if ($ this ->version == 'v1 ' ) {
129+ if (isset ($ blkputRets ['contexts ' ]) && isset ($ blkputRets ['uploaded ' ]) &&
130+ is_array ($ blkputRets ['contexts ' ]) && is_int ($ blkputRets ['uploaded ' ])) {
131+ $ this ->contexts = $ blkputRets ['contexts ' ];
132+ $ uploaded = $ blkputRets ['uploaded ' ];
133+ }
134+ } elseif ($ this ->version == 'v2 ' ) {
135+ if (isset ($ blkputRets ["etags " ]) && isset ($ blkputRets ["uploadId " ]) &&
136+ isset ($ blkputRets ["expiredAt " ]) && $ blkputRets ["expiredAt " ] > time ()
137+ && $ blkputRets ["uploaded " ] > 0 && is_array ($ blkputRets ["etags " ]) &&
138+ is_string ($ blkputRets ["uploadId " ]) && is_int ($ blkputRets ["expiredAt " ])) {
139+ $ this ->finishedEtags ['etags ' ] = $ blkputRets ["etags " ];
140+ $ this ->finishedEtags ["uploadId " ] = $ blkputRets ["uploadId " ];
141+ $ this ->finishedEtags ["expiredAt " ] = $ blkputRets ["expiredAt " ];
142+ $ this ->finishedEtags ["uploaded " ] = $ blkputRets ["uploaded " ];
143+ $ uploaded = $ blkputRets ["uploaded " ];
144+ $ partNumber = count ($ this ->finishedEtags ["etags " ]) + 1 ;
145+ } else {
146+ $ this ->makeInitReq ($ encodedObjectName );
147+ }
148+ } else {
149+ throw new \Exception ("only support v1/v2 now! " );
150+ }
151+ } else {
152+ if ($ this ->version == 'v2 ' ) {
153+ $ this ->makeInitReq ($ encodedObjectName );
154+ }
155+ }
156+ } else {
157+ // init a Multipart Upload task if choose v2
158+ if ($ this ->version == 'v2 ' ) {
159+ $ this ->makeInitReq ($ encodedObjectName );
160+ }
161+ }
162+
79163 while ($ uploaded < $ this ->size ) {
80164 $ blockSize = $ this ->blockSize ($ uploaded );
81165 $ data = fread ($ this ->inputStream , $ blockSize );
82166 if ($ data === false ) {
83167 throw new \Exception ("file read failed " , 1 );
84168 }
85- $ crc = \Qiniu \crc32_data ($ data );
86- $ response = $ this ->makeBlock ($ data , $ blockSize );
169+ if ($ this ->version == 'v1 ' ) {
170+ $ crc = \Qiniu \crc32_data ($ data );
171+ $ response = $ this ->makeBlock ($ data , $ blockSize );
172+ } else {
173+ $ md5 = md5 ($ data );
174+ $ response = $ this ->uploadPart ($ data , $ partNumber , $ this ->finishedEtags ["uploadId " ], $ encodedObjectName );
175+ }
176+
87177 $ ret = null ;
88178 if ($ response ->ok () && $ response ->json () != null ) {
89179 $ ret = $ response ->json ();
@@ -93,22 +183,69 @@ public function upload($fname)
93183 if ($ err != null ) {
94184 return array (null , $ err );
95185 }
96-
97186 $ upHostBackup = $ this ->config ->getUpBackupHost ($ accessKey , $ bucket );
98187 $ this ->host = $ upHostBackup ;
99188 }
100- if ($ response ->needRetry () || !isset ($ ret ['crc32 ' ]) || $ crc != $ ret ['crc32 ' ]) {
101- $ response = $ this ->makeBlock ($ data , $ blockSize );
102- $ ret = $ response ->json ();
103- }
104189
105- if (!$ response ->ok () || !isset ($ ret ['crc32 ' ]) || $ crc != $ ret ['crc32 ' ]) {
106- return array (null , new Error ($ this ->currentUrl , $ response ));
190+ if ($ this ->version == 'v1 ' ) {
191+ if ($ response ->needRetry () || !isset ($ ret ['crc32 ' ]) || $ crc != $ ret ['crc32 ' ]) {
192+ $ response = $ this ->makeBlock ($ data , $ blockSize );
193+ $ ret = $ response ->json ();
194+ }
195+
196+ if (!$ response ->ok () || !isset ($ ret ['crc32 ' ]) || $ crc != $ ret ['crc32 ' ]) {
197+ return array (null , new Error ($ this ->currentUrl , $ response ));
198+ }
199+ array_push ($ this ->contexts , $ ret ['ctx ' ]);
200+ } else {
201+ if ($ response ->needRetry () || !isset ($ ret ['md5 ' ]) || $ md5 != $ ret ['md5 ' ]) {
202+ $ response = $ this ->uploadPart (
203+ $ data ,
204+ $ partNumber ,
205+ $ this ->finishedEtags ["uploadId " ],
206+ $ encodedObjectName
207+ );
208+ $ ret = $ response ->json ();
209+ }
210+
211+ if (!$ response ->ok () || !isset ($ ret ['md5 ' ]) || $ md5 != $ ret ['md5 ' ]) {
212+ return array (null , new Error ($ this ->currentUrl , $ response ));
213+ }
214+ $ blockStatus = array ('etag ' => $ ret ['etag ' ], 'partNumber ' => $ partNumber );
215+ array_push ($ this ->finishedEtags ['etags ' ], $ blockStatus );
216+ $ partNumber += 1 ;
107217 }
108- array_push ( $ this -> contexts , $ ret [ ' ctx ' ]);
218+
109219 $ uploaded += $ blockSize ;
220+ if ($ this ->version == 'v2 ' ) {
221+ $ this ->finishedEtags ['uploaded ' ] = $ uploaded ;
222+ }
223+
224+ if ($ this ->resumeRecordFile !== null ) {
225+ if ($ this ->version == 'v1 ' ) {
226+ $ recordData = array (
227+ 'contexts ' => $ this ->contexts ,
228+ 'uploaded ' => $ uploaded
229+ );
230+ $ recordData = json_encode ($ recordData );
231+ } else {
232+ $ recordData = json_encode ($ this ->finishedEtags );
233+ }
234+ if ($ recordData ) {
235+ $ isWritten = file_put_contents ($ this ->resumeRecordFile , $ recordData );
236+ if ($ isWritten === false ) {
237+ error_log ("write resumeRecordFile failed " );
238+ }
239+ } else {
240+ error_log ('resumeRecordData encode failed ' );
241+ }
242+ }
243+ }
244+ if ($ this ->version == 'v1 ' ) {
245+ return $ this ->makeFile ($ fname );
246+ } else {
247+ return $ this ->completeParts ($ fname , $ this ->finishedEtags ['uploadId ' ], $ encodedObjectName );
110248 }
111- return $ this ->makeFile ($ fname );
112249 }
113250
114251 /**
@@ -163,9 +300,84 @@ private function post($url, $data)
163300
164301 private function blockSize ($ uploaded )
165302 {
166- if ($ this ->size < $ uploaded + Config:: BLOCK_SIZE ) {
303+ if ($ this ->size < $ uploaded + $ this -> partSize ) {
167304 return $ this ->size - $ uploaded ;
168305 }
169- return Config::BLOCK_SIZE ;
306+ return $ this ->partSize ;
307+ }
308+
309+ private function makeInitReq ($ encodedObjectName )
310+ {
311+ $ res = $ this ->initReq ($ encodedObjectName );
312+ $ this ->finishedEtags ["uploadId " ] = $ res ['uploadId ' ];
313+ $ this ->finishedEtags ["expiredAt " ] = $ res ['expireAt ' ];
314+ }
315+
316+ /**
317+ * 初始化上传任务
318+ */
319+ private function initReq ($ encodedObjectName )
320+ {
321+ $ url = $ this ->host .'/buckets/ ' .$ this ->bucket .'/objects/ ' .$ encodedObjectName .'/uploads ' ;
322+ $ headers = array (
323+ 'Authorization ' => 'UpToken ' . $ this ->upToken ,
324+ 'Content-Type ' => 'application/json '
325+ );
326+ $ response = $ this ->postWithHeaders ($ url , null , $ headers );
327+ return $ response ->json ();
328+ }
329+
330+ /**
331+ * 分块上传v2
332+ */
333+ private function uploadPart ($ block , $ partNumber , $ uploadId , $ encodedObjectName )
334+ {
335+ $ headers = array (
336+ 'Authorization ' => 'UpToken ' . $ this ->upToken ,
337+ 'Content-Type ' => 'application/octet-stream ' ,
338+ 'Content-MD5 ' => $ block
339+ );
340+ $ url = $ this ->host .'/buckets/ ' .$ this ->bucket .'/objects/ ' .$ encodedObjectName .
341+ '/uploads/ ' .$ uploadId .'/ ' .$ partNumber ;
342+ $ response = $ this ->put ($ url , $ block , $ headers );
343+ return $ response ;
344+ }
345+
346+ private function completeParts ($ fname , $ uploadId , $ encodedObjectName )
347+ {
348+ $ headers = array (
349+ 'Authorization ' => 'UpToken ' .$ this ->upToken ,
350+ 'Content-Type ' => 'application/json '
351+ );
352+ $ etags = $ this ->finishedEtags ['etags ' ];
353+ $ sortedEtags = \Qiniu \arraySort ($ etags , 'partNumber ' );
354+ $ body = array (
355+ 'fname ' => $ fname ,
356+ '$mimeType ' => $ this ->mime ,
357+ 'customVars ' => $ this ->params ,
358+ 'parts ' => $ sortedEtags
359+ );
360+ $ jsonBody = json_encode ($ body );
361+ $ url = $ this ->host .'/buckets/ ' .$ this ->bucket .'/objects/ ' .$ encodedObjectName .'/uploads/ ' .$ uploadId ;
362+ $ response = $ this ->postWithHeaders ($ url , $ jsonBody , $ headers );
363+ if ($ response ->needRetry ()) {
364+ $ response = $ this ->postWithHeaders ($ url , $ jsonBody , $ headers );
365+ }
366+ if (!$ response ->ok ()) {
367+ return array (null , new Error ($ this ->currentUrl , $ response ));
368+ }
369+ return array ($ response ->json (), null );
370+ }
371+
372+ private function put ($ url , $ data , $ headers )
373+ {
374+ $ this ->currentUrl = $ url ;
375+ return Client::put ($ url , $ data , $ headers );
376+ }
377+
378+ private function postWithHeaders ($ url , $ data , $ headers )
379+ {
380+ $ this ->currentUrl = $ url ;
381+ return Client::post ($ url , $ data , $ headers );
170382 }
171383}
0 commit comments