From 039c43868d94a769c84cadf1acc33de06e59b812 Mon Sep 17 00:00:00 2001 From: Paul Brackin Date: Mon, 10 Nov 2025 14:50:55 -0800 Subject: [PATCH] tweak next_record() to allow time for Q to be populated --- aerospike-core/src/query/recordset.rs | 44 ++++++++++++++++----------- 1 file changed, 27 insertions(+), 17 deletions(-) diff --git a/aerospike-core/src/query/recordset.rs b/aerospike-core/src/query/recordset.rs index baacd9d..af41e1d 100644 --- a/aerospike-core/src/query/recordset.rs +++ b/aerospike-core/src/query/recordset.rs @@ -17,6 +17,8 @@ extern crate rand; use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering}; use std::sync::Arc; +use std::thread; +use std::time::Duration; use aerospike_rt::Mutex; use futures::executor::block_on; @@ -129,10 +131,30 @@ impl Recordset { } /// Returns a result from the queue if it exists. Otherwise, returns None. + /// Includes retry logic to handle timing issues where records may not be + /// immediately available when called right after query/scan. pub fn next_record(&self) -> Option> { - match self.rx.try_recv() { - Ok(r) => Some(r), - Err(_) => None, + const MAX_ATTEMPTS: u32 = 1000; // Safety limit to prevent infinite loops + let mut attempts = 0; + + loop { + match self.rx.try_recv() { + Ok(r) => return Some(r), + Err(_) => { + if !self.is_active() { + return None; + } + + if attempts >= MAX_ATTEMPTS { + return None; + } + // Yield to async runtime and sleep briefly to allow background tasks + // to populate the queue + block_on(aerospike_rt::task::yield_now()); + thread::sleep(Duration::from_micros(100)); + attempts += 1; + } + } } } @@ -147,21 +169,9 @@ impl<'a> Iterator for &'a Recordset { type Item = Result; /// Implements a blocking iterator. + /// Uses next_record() which includes retry logic for timing issues. fn next(&mut self) -> Option> { - loop { - let result = self.next_record(); - if result.is_some() { - return result; - } - - if self.is_active() { - block_on(aerospike_rt::task::yield_now()); - continue; - } - - // ends the iterator - return None; - } + self.next_record() } }