@@ -524,8 +524,14 @@ void client_session::transfer_samples_thread(std::shared_ptr<client_session> /*k
524524 try {
525525 // make a new consumer queue
526526 auto queue = serv_->send_buffer_ ->new_consumer (max_buffered_);
527- // the sequence # is merely used to determine chunk boundaries (no need for int64)
528- uint32_t seqn = 0 ;
527+ // the sequence #s are merely used to determine chunk boundaries (no need for int64)
528+ int samples_in_current_chunk = 0 ;
529+ int max_samples_per_chunk = std::numeric_limits<decltype (samples_in_current_chunk)>::max ();
530+ if (chunk_granularity_)
531+ max_samples_per_chunk = chunk_granularity_;
532+ else if (serv_->chunk_size_ )
533+ max_samples_per_chunk = serv_->chunk_size_ ;
534+
529535 while (!serv_->shutdown_ ) {
530536 try {
531537 // get next sample from the sample queue (blocking)
@@ -534,20 +540,14 @@ void client_session::transfer_samples_thread(std::shared_ptr<client_session> /*k
534540 // ignore blank samples (they are basically wakeup notifiers from someone's
535541 // end_serving())
536542 if (!samp) continue ;
537- // optionally override the pushthrough flag by the chunk size of the receiver (if
538- // set) or of the sender (if set)
539- if (chunk_granularity_)
540- samp->pushthrough = (((++seqn) % (uint32_t )chunk_granularity_) == 0 );
541- else if (serv_->chunk_size_ )
542- samp->pushthrough = (((++seqn) % (uint32_t )serv_->chunk_size_ ) == 0 );
543543 // serialize the sample into the stream
544544 if (data_protocol_version_ >= 110 )
545545 samp->save_streambuf (
546546 feedbuf_, data_protocol_version_, use_byte_order_, scratch_);
547547 else
548548 *outarch_ << *samp;
549- // if the sample shall be pushed though...
550- if (samp->pushthrough ) {
549+ // if the sample is marked as force-push or the configured chunk size is reached
550+ if (samp->pushthrough || ++samples_in_current_chunk >= max_samples_per_chunk ) {
551551 // send off the chunk that we aggregated so far
552552 std::unique_lock<std::mutex> lock (completion_mut_);
553553 transfer_completed_ = false ;
@@ -562,6 +562,7 @@ void client_session::transfer_samples_thread(std::shared_ptr<client_session> /*k
562562 feedbuf_.consume (transfer_amount_);
563563 } else
564564 break ;
565+ samples_in_current_chunk = 0 ;
565566 }
566567 } catch (std::exception &e) {
567568 LOG_F (WARNING, " Unexpected glitch in transfer_samples_thread: %s" , e.what ());
0 commit comments