@@ -268,10 +268,10 @@ where
268268 }
269269 } ) ;
270270 tokio:: spawn ( async move {
271- // TODO: Is it important to catch this error? It should also result in an error on the
272- // response stream. If we deem it important, we could one-shot send it into the
273- // BlobAddProgress and return from there. Not sure.
274271 if let Err ( err) = sink. send_all ( & mut input) . await {
272+ // if we get an error in send_all due to the connection being closed, this will just fail again.
273+ // if we get an error due to something else (serialization or size limit), tell the remote to abort.
274+ sink. send ( AddStreamUpdate :: Abort ) . await . ok ( ) ;
275275 warn ! ( "Failed to send input stream to remote: {err:?}" ) ;
276276 }
277277 } ) ;
@@ -281,7 +281,7 @@ where
281281
282282 /// Write a blob by passing bytes.
283283 pub async fn add_bytes ( & self , bytes : impl Into < Bytes > ) -> anyhow:: Result < AddOutcome > {
284- let input = futures_lite :: stream :: once ( Ok ( bytes. into ( ) ) ) ;
284+ let input = chunked_bytes_stream ( bytes. into ( ) , 1024 * 64 ) . map ( Ok ) ;
285285 self . add_stream ( input, SetTagOption :: Auto ) . await ?. await
286286 }
287287
@@ -291,7 +291,7 @@ where
291291 bytes : impl Into < Bytes > ,
292292 name : impl Into < Tag > ,
293293 ) -> anyhow:: Result < AddOutcome > {
294- let input = futures_lite :: stream :: once ( Ok ( bytes. into ( ) ) ) ;
294+ let input = chunked_bytes_stream ( bytes. into ( ) , 1024 * 64 ) . map ( Ok ) ;
295295 self . add_stream ( input, SetTagOption :: Named ( name. into ( ) ) )
296296 . await ?
297297 . await
@@ -987,6 +987,12 @@ pub struct DownloadOptions {
987987 pub mode : DownloadMode ,
988988}
989989
990+ fn chunked_bytes_stream ( mut b : Bytes , c : usize ) -> impl Stream < Item = Bytes > {
991+ futures_lite:: stream:: iter ( std:: iter:: from_fn ( move || {
992+ Some ( b. split_to ( b. len ( ) . min ( c) ) ) . filter ( |x| !x. is_empty ( ) )
993+ } ) )
994+ }
995+
990996#[ cfg( test) ]
991997mod tests {
992998 use std:: { path:: Path , time:: Duration } ;
0 commit comments