@@ -314,15 +314,23 @@ impl Handler<PackagedSplitBatch> for Uploader {
314314 if batch. publish_lock . is_dead ( ) {
315315 // TODO: Remove the junk right away?
316316 info ! ( "splits' publish lock is dead" ) ;
317- split_update_sender. discard ( ) ?;
318- return Ok ( ( ) ) ;
317+ if let Err ( e) = split_update_sender. discard ( ) {
318+ warn ! ( cause=?e, "could not discard split" ) ;
319+ }
320+ return ;
319321 }
320322
321- let split_streamer = SplitPayloadBuilder :: get_split_payload (
323+ let split_streamer = match SplitPayloadBuilder :: get_split_payload (
322324 & packaged_split. split_files ,
323325 & packaged_split. serialized_split_fields ,
324326 & packaged_split. hotcache_bytes ,
325- ) ?;
327+ ) {
328+ Ok ( split_streamer) => split_streamer,
329+ Err ( e) => {
330+ warn ! ( cause=?e, split_id=packaged_split. split_id( ) , "could not create split streamer" ) ;
331+ return ;
332+ }
333+ } ;
326334 let split_metadata = create_split_metadata (
327335 & merge_policy,
328336 retention_policy. as_ref ( ) ,
@@ -340,11 +348,22 @@ impl Handler<PackagedSplitBatch> for Uploader {
340348
341349 }
342350
343- let stage_splits_request = StageSplitsRequest :: try_from_splits_metadata ( index_uid. clone ( ) , split_metadata_list. clone ( ) ) ?;
344- metastore
351+ let stage_splits_request = match StageSplitsRequest :: try_from_splits_metadata ( index_uid. clone ( ) , split_metadata_list. clone ( ) ) {
352+ Ok ( stage_splits_request) => stage_splits_request,
353+ Err ( e) => {
354+ warn ! ( cause=?e, "could not create stage splits request" ) ;
355+ return ;
356+ }
357+ } ;
358+ if let Err ( e) = metastore
345359 . clone ( )
346360 . stage_splits ( stage_splits_request)
347- . await ?;
361+ . await
362+ {
363+ warn ! ( cause=?e, "failed to stage splits" ) ;
364+ return ;
365+ } ;
366+
348367 counters. num_staged_splits . fetch_add ( split_metadata_list. len ( ) as u64 , Ordering :: SeqCst ) ;
349368
350369 let mut packaged_splits_and_metadata = Vec :: with_capacity ( batch. splits . len ( ) ) ;
@@ -363,7 +382,7 @@ impl Handler<PackagedSplitBatch> for Uploader {
363382 if let Err ( cause) = upload_result {
364383 warn ! ( cause=?cause, split_id=packaged_split. split_id( ) , "Failed to upload split. Killing!" ) ;
365384 kill_switch. kill ( ) ;
366- bail ! ( "failed to upload split `{}`. killing the actor context" , packaged_split . split_id ( ) ) ;
385+ return ;
367386 }
368387
369388 packaged_splits_and_metadata. push ( ( packaged_split, metadata) ) ;
@@ -379,11 +398,17 @@ impl Handler<PackagedSplitBatch> for Uploader {
379398 batch. batch_parent_span ,
380399 ) ;
381400
382- split_update_sender. send ( splits_update, & ctx_clone) . await ?;
401+ let target = match & split_update_sender {
402+ SplitsUpdateSender :: Sequencer ( _) => "sequencer" ,
403+ SplitsUpdateSender :: Publisher ( _) => "publisher" ,
404+ } ;
405+ if let Err ( e) = split_update_sender. send ( splits_update, & ctx_clone) . await {
406+ warn ! ( cause=?e, target, "failed to send uploaded split" ) ;
407+ return ;
408+ }
383409 // We explicitly drop it in order to force move the permit guard into the async
384410 // task.
385411 mem:: drop ( permit_guard) ;
386- Result :: < ( ) , anyhow:: Error > :: Ok ( ( ) )
387412 }
388413 . instrument ( Span :: current ( ) ) ,
389414 "upload_single_task"
0 commit comments