@@ -5,10 +5,6 @@ use std::{
55 HashSet ,
66 } ,
77 future:: Future ,
8- hash:: {
9- Hash ,
10- Hasher ,
11- } ,
128 sync:: Arc ,
139} ;
1410
@@ -42,13 +38,7 @@ use common::{
4238use futures:: FutureExt ;
4339use hashlink:: LinkedHashSet ;
4440use keybroker:: Identity ;
45- use tokio:: {
46- select,
47- sync:: {
48- OwnedSemaphorePermit ,
49- Semaphore ,
50- } ,
51- } ;
41+ use tokio:: select;
5242use tokio_util:: task:: JoinMap ;
5343use value:: {
5444 DeveloperDocumentId ,
@@ -77,15 +67,16 @@ use crate::{
7767pub mod index_writer;
7868
7969pub struct IndexWorker < RT : Runtime > {
80- // Index IDs that are currently being backfilled.
70+ /// Index IDs that are currently being backfilled.
8171 in_progress_index_ids : HashSet < IndexId , ahash:: RandomState > ,
82- /// Order-preserving HashMap that represents the order that pending
83- /// index backfills will be loaded in.
84- pending : LinkedHashSet < IndexBackfill , ahash:: RandomState > ,
85- // Index IDs that are pending backfill.
86- pending_index_ids : HashSet < IndexId , ahash:: RandomState > ,
87- concurrency_limiter : Arc < Semaphore > ,
88- work_pool : JoinMap < IndexBackfill , anyhow:: Result < ( ) > > ,
72+ /// The index backfill tasks
73+ in_progress : JoinMap < Vec < IndexId > , anyhow:: Result < ( ) > > ,
74+ /// Order-preserving HashSet that represents the order that pending index
75+ /// backfills will be processed. This does not include indexes that are
76+ /// `in_progress`.
77+ pending : LinkedHashSet < ( IndexId , TabletId ) , ahash:: RandomState > ,
78+ /// Limit on the size of `in_progress`
79+ max_concurrency : usize ,
8980 database : Database < RT > ,
9081 index_writer : IndexWriter < RT > ,
9182 backoff : Backoff ,
@@ -94,19 +85,6 @@ pub struct IndexWorker<RT: Runtime> {
9485 pub should_terminate : bool ,
9586}
9687
97- #[ derive( Clone , Debug , Eq , PartialEq ) ]
98- pub struct IndexBackfill {
99- pub tablet_id : TabletId ,
100- pub index_ids : Vec < IndexId > ,
101- }
102-
103- impl Hash for IndexBackfill {
104- fn hash < H : Hasher > ( & self , state : & mut H ) {
105- self . tablet_id . hash ( state) ;
106- self . index_ids . hash ( state) ;
107- }
108- }
109-
11088impl < RT : Runtime > IndexWorker < RT > {
11189 #[ allow( clippy:: new_ret_no_self) ]
11290 pub fn new (
@@ -124,10 +102,9 @@ impl<RT: Runtime> IndexWorker<RT> {
124102 ) ;
125103 let mut worker = IndexWorker {
126104 in_progress_index_ids : Default :: default ( ) ,
105+ in_progress : JoinMap :: new ( ) ,
127106 pending : Default :: default ( ) ,
128- pending_index_ids : Default :: default ( ) ,
129- concurrency_limiter : Arc :: new ( Semaphore :: new ( * INDEX_BACKFILL_CONCURRENCY ) ) ,
130- work_pool : JoinMap :: new ( ) ,
107+ max_concurrency : * INDEX_BACKFILL_CONCURRENCY ,
131108 database,
132109 index_writer,
133110 backoff : Backoff :: new ( * INDEX_WORKERS_INITIAL_BACKOFF , * INDEX_WORKERS_MAX_BACKOFF ) ,
@@ -171,10 +148,9 @@ impl<RT: Runtime> IndexWorker<RT> {
171148 ) ;
172149 let mut worker = IndexWorker {
173150 in_progress_index_ids : Default :: default ( ) ,
151+ in_progress : JoinMap :: new ( ) ,
174152 pending : Default :: default ( ) ,
175- pending_index_ids : Default :: default ( ) ,
176- concurrency_limiter : Arc :: new ( Semaphore :: new ( 10 ) ) ,
177- work_pool : JoinMap :: new ( ) ,
153+ max_concurrency : 10 ,
178154 database,
179155 index_writer,
180156 backoff : Backoff :: new ( * INDEX_WORKERS_INITIAL_BACKOFF , * INDEX_WORKERS_MAX_BACKOFF ) ,
@@ -194,31 +170,32 @@ impl<RT: Runtime> IndexWorker<RT> {
194170 tracing:: error!( "IndexWorker loop failed: {e:?}" ) ;
195171 continue ;
196172 }
197- if worker. work_pool . is_empty ( ) && worker. pending . is_empty ( ) {
173+ if worker. in_progress . is_empty ( ) && worker. pending . is_empty ( ) {
198174 return r;
199175 }
200176 }
201177 }
202178 }
203179
204180 async fn run ( & mut self ) -> anyhow:: Result < ( ) > {
205- let mut backfill_queue = Vec :: new ( ) ;
206181 // Get all the documents from the `_index` table.
207182 let mut tx = self . database . begin ( Identity :: system ( ) ) . await ?;
208183 // _index doesn't have `by_creation_time` index, and thus must use `by_id`.
209184 let index_documents = tx
210185 . query_system ( TableNamespace :: Global , & SystemIndex :: < IndexTable > :: by_id ( ) ) ?
211186 . all ( )
212187 . await ?;
213- let mut to_backfill_by_tablet = BTreeMap :: new ( ) ;
214188 let mut num_to_backfill = 0 ;
215189 for index_metadata in & index_documents {
216190 if let IndexConfig :: Database { on_disk_state, .. } = & index_metadata. config {
217191 if matches ! ( on_disk_state, DatabaseIndexState :: Backfilling ( _) ) {
218- to_backfill_by_tablet
219- . entry ( * index_metadata. name . table ( ) )
220- . or_insert_with ( Vec :: new)
221- . push ( index_metadata. id ( ) . internal_id ( ) ) ;
192+ let index_id = index_metadata. id ( ) . internal_id ( ) ;
193+ let tablet_id = * index_metadata. name . table ( ) ;
194+ if !self . in_progress_index_ids . contains ( & index_id)
195+ && !self . pending . contains ( & ( index_id, tablet_id) )
196+ {
197+ self . pending . insert ( ( index_id, tablet_id) ) ;
198+ }
222199 num_to_backfill += 1 ;
223200 }
224201 }
@@ -229,118 +206,73 @@ impl<RT: Runtime> IndexWorker<RT> {
229206 tx. begin_timestamp( )
230207 ) ;
231208
232- for ( tablet_id, index_ids) in to_backfill_by_tablet {
233- let mut to_backfill = Vec :: new ( ) ;
234- for index_id in index_ids {
235- if !self . in_progress_index_ids . contains ( & index_id)
236- && !self . pending_index_ids . contains ( & index_id)
237- {
238- to_backfill. push ( index_id) ;
239- }
240- }
241- backfill_queue. push ( IndexBackfill {
242- tablet_id,
243- index_ids : to_backfill,
244- } ) ;
245- }
246-
247209 let token = tx. into_token ( ) ?;
248210 let subscription = self . database . subscribe ( token) . await ?;
249- for index_backfill in backfill_queue. clone ( ) {
250- self . pending . insert ( index_backfill. clone ( ) ) ;
251- self . pending_index_ids . extend ( index_backfill. index_ids ) ;
252- }
253211
254212 #[ cfg( any( test, feature = "testing" ) ) ]
255- if self . should_terminate
256- && backfill_queue. is_empty ( )
257- && self . work_pool . is_empty ( )
258- && self . pending . is_empty ( )
259- {
213+ if self . should_terminate && self . in_progress . is_empty ( ) && self . pending . is_empty ( ) {
260214 return Ok ( ( ) ) ;
261215 }
262216
217+ // Start new work if allowed by the concurrency limit
218+ while self . in_progress . len ( ) < self . max_concurrency
219+ && let Some ( ( index_id, tablet_id) ) = self . pending . pop_front ( )
220+ {
221+ self . queue_index_backfill ( index_id, tablet_id) ;
222+ }
263223 select ! {
264224 biased;
265225 // Start by finding indexes that have finished backfilling
266- ( index_backfill, res) = self . work_pool. join_next( ) . map( |o| o. expect( "Can't be none if work pool is nonempty" ) ) , if !self . work_pool. is_empty( ) => {
267- if let Err ( e) = res {
268- tracing:: error!( "IndexWorker loop failed: {e:?}" ) ;
269- report_error( & mut anyhow:: anyhow!( format!( "{e:?}: IndexWorker loop failed on backfill {index_backfill:?}" ) ) ) . await ;
270- }
271-
272- for index_id in & index_backfill. index_ids {
273- self . in_progress_index_ids. remove( index_id) ;
226+ res = self . in_progress. join_next( ) , if !self . in_progress. is_empty( ) => {
227+ let ( index_ids, res) = res. expect( "join_next cannot return None if nonempty" ) ;
228+ // First, make sure `in_progress_index_ids` is always consistent with `in_progress`
229+ for & index_id in & index_ids {
230+ self . in_progress_index_ids. remove( & index_id) ;
274231 }
232+ // If backfill tasks are failing, return an error here so that we back off
233+ let ( ) = res??;
234+ tracing:: info!( "Finished backfilling {index_ids:?}" ) ;
235+ // Return so that we possibly queue up more work
275236 }
276- // Next, queue up new work
277- permit = self . concurrency_limiter. clone( ) . acquire_owned( ) ,
278- if !self . pending. is_empty( ) =>
279- {
280- let permit = permit. expect( "Semaphore can't go away" ) ;
281- self . queue_index_backfill( permit) ;
282- }
283- // Finally, wait for invalidation
237+ // Alternatively, wait for invalidation
284238 _ = subscription. wait_for_invalidation( ) . fuse( ) => {
285- tracing:: info!( "IndexIndexWorker received invalidation, going to sleep" ) ;
286239 self . backoff. reset( ) ;
287240 }
288241 }
289242
290243 Ok ( ( ) )
291244 }
292245
293- /// Spawns a task to load the process the next index backfill. The `permit`
294- /// is held to ensure we limit our concurrency.
295- fn queue_index_backfill ( & mut self , permit : OwnedSemaphorePermit ) {
296- // Try to queue an index backfill
297- let Some ( index_backfill) = self . pending . pop_front ( ) else {
298- return ;
299- } ;
300- for index_id in & index_backfill. index_ids {
301- self . pending_index_ids . remove ( index_id) ;
302- }
246+ /// Spawns a task to process the next index backfill.
247+ fn queue_index_backfill ( & mut self , index_id : IndexId , tablet_id : TabletId ) {
248+ let mut index_ids = vec ! [ index_id] ;
249+ // Since we're mainly limited by the speed of reading the table, let's
250+ // grab all the other pending indexes for this table at once
251+ self . pending . retain ( |& ( other_index_id, other_tablet_id) | {
252+ if other_tablet_id == tablet_id {
253+ index_ids. push ( other_index_id) ;
254+ false
255+ } else {
256+ true
257+ }
258+ } ) ;
259+
260+ // TODO: this allows more than one `backfill_tablet` task to run
261+ // simultaneously on the same table; it could be better to cancel old
262+ // tasks and restart them.
303263
304- if self . work_pool . contains_key ( & index_backfill) {
305- let IndexBackfill { tablet_id, .. } = index_backfill;
306- tracing:: warn!( "Skipping duplicate index backfill {tablet_id} (already in progress)" ) ;
307- return ;
264+ for & index_id in & index_ids {
265+ self . in_progress_index_ids . insert ( index_id) ;
308266 }
309- if index_backfill
310- . index_ids
311- . iter ( )
312- . any ( |index_id| self . in_progress_index_ids . contains ( index_id) )
313- {
314- let IndexBackfill {
267+ self . in_progress . spawn (
268+ index_ids. clone ( ) ,
269+ Self :: backfill_tablet (
315270 tablet_id,
316271 index_ids,
317- ..
318- } = index_backfill;
319- tracing:: warn!(
320- "Skipping backfill of indexes {index_ids:?} on tablet {tablet_id} until current \
321- backfill completes"
322- ) ;
323- return ;
324- }
325-
326- let IndexBackfill {
327- tablet_id,
328- index_ids,
329- ..
330- } = index_backfill. clone ( ) ;
331-
332- let database = self . database . clone ( ) ;
333- let index_writer = self . index_writer . clone ( ) ;
334-
335- for index_id in & index_ids {
336- self . in_progress_index_ids . insert ( * index_id) ;
337- }
338-
339- self . work_pool . spawn ( index_backfill, async move {
340- let _permit = permit;
341-
342- Self :: backfill_tablet ( tablet_id, index_ids, database, index_writer) . await
343- } ) ;
272+ self . database . clone ( ) ,
273+ self . index_writer . clone ( ) ,
274+ ) ,
275+ ) ;
344276 }
345277
346278 async fn backfill_tablet (
0 commit comments