@@ -364,7 +364,6 @@ impl<RT: Runtime> LeaderRetentionManager<RT> {
364364 bounds_reader. clone ( ) ,
365365 rt. clone ( ) ,
366366 persistence. clone ( ) ,
367- follower_retention_manager,
368367 receive_min_document_snapshot,
369368 document_checkpoint_writer,
370369 snapshot_reader. clone ( ) ,
@@ -804,23 +803,29 @@ impl<RT: Runtime> LeaderRetentionManager<RT> {
804803 #[ try_stream( ok = ( Timestamp , Option <( Timestamp , InternalDocumentId ) >) , error = anyhow:: Error ) ]
805804 async fn expired_documents (
806805 rt : & RT ,
807- reader : RepeatablePersistence ,
806+ persistence : Arc < dyn PersistenceReader > ,
808807 cursor : RepeatableTimestamp ,
809808 min_document_snapshot_ts : RepeatableTimestamp ,
810809 ) {
811810 tracing:: trace!(
812811 "expired_documents: reading expired documents from {cursor:?} to {:?}" ,
813812 min_document_snapshot_ts,
814813 ) ;
815- let reader_ = & reader;
814+ let reader = RepeatablePersistence :: new (
815+ persistence,
816+ min_document_snapshot_ts,
817+ // We are reading document log entries from outside of the retention
818+ // window (which we have possibly just shrunk ourselves); there is
819+ // no need to check retention.
820+ Arc :: new ( NoopRetentionValidator ) ,
821+ ) ;
816822 let mut document_chunks = reader
817- . load_documents_with_retention_validator (
823+ . load_documents (
818824 TimestampRange :: new ( * cursor..* min_document_snapshot_ts) ,
819825 Order :: Asc ,
820- Arc :: new ( NoopRetentionValidator ) ,
821826 )
822827 . try_chunks2 ( * RETENTION_READ_CHUNK )
823- . map ( move |chunk| async move {
828+ . map ( |chunk| async {
824829 let chunk = chunk?. to_vec ( ) ;
825830 let mut entries_to_delete: Vec < (
826831 Timestamp ,
@@ -832,11 +837,8 @@ impl<RT: Runtime> LeaderRetentionManager<RT> {
832837 // the document was deleted.
833838 // A NoopRetentionValidator is used here because we are fetching revisions
834839 // outside of the document retention window.
835- let prev_revs = reader_
836- . previous_revisions_with_validator (
837- chunk. iter ( ) . map ( |entry| ( entry. id , entry. ts ) ) . collect ( ) ,
838- Arc :: new ( NoopRetentionValidator ) ,
839- )
840+ let prev_revs = reader
841+ . previous_revisions ( chunk. iter ( ) . map ( |entry| ( entry. id , entry. ts ) ) . collect ( ) )
840842 . await ?;
841843 for DocumentLogEntry {
842844 ts,
@@ -920,7 +922,6 @@ impl<RT: Runtime> LeaderRetentionManager<RT> {
920922 persistence : Arc < dyn Persistence > ,
921923 rt : & RT ,
922924 cursor : RepeatableTimestamp ,
923- retention_validator : Arc < dyn RetentionValidator > ,
924925 retention_rate_limiter : Arc < RateLimiter < RT > > ,
925926 ) -> anyhow:: Result < ( RepeatableTimestamp , usize ) > {
926927 if !* RETENTION_DOCUMENT_DELETES_ENABLED || * min_snapshot_ts == Timestamp :: MIN {
@@ -936,7 +937,6 @@ impl<RT: Runtime> LeaderRetentionManager<RT> {
936937
937938 let reader = persistence. reader ( ) ;
938939 let snapshot_ts = min_snapshot_ts;
939- let reader = RepeatablePersistence :: new ( reader, snapshot_ts, retention_validator. clone ( ) ) ;
940940
941941 tracing:: trace!( "delete_documents: about to grab chunks" ) ;
942942 let expired_chunks = Self :: expired_documents ( rt, reader, cursor, min_snapshot_ts)
@@ -1274,7 +1274,6 @@ impl<RT: Runtime> LeaderRetentionManager<RT> {
12741274 bounds_reader : Reader < SnapshotBounds > ,
12751275 rt : RT ,
12761276 persistence : Arc < dyn Persistence > ,
1277- retention_validator : Arc < dyn RetentionValidator > ,
12781277 mut min_document_snapshot_rx : Receiver < RepeatableTimestamp > ,
12791278 mut checkpoint_writer : Writer < Checkpoint > ,
12801279 snapshot_reader : Reader < SnapshotManager > ,
@@ -1329,7 +1328,6 @@ impl<RT: Runtime> LeaderRetentionManager<RT> {
13291328 persistence. clone ( ) ,
13301329 & rt,
13311330 cursor,
1332- retention_validator. clone ( ) ,
13331331 retention_rate_limiter. clone ( ) ,
13341332 )
13351333 . await ?;
@@ -1738,7 +1736,6 @@ mod tests {
17381736 } ,
17391737 index:: IndexKey ,
17401738 interval:: Interval ,
1741- knobs:: DOCUMENT_RETENTION_DELAY ,
17421739 persistence:: {
17431740 ConflictStrategy ,
17441741 NoopRetentionValidator ,
@@ -1978,14 +1975,8 @@ mod tests {
19781975 . await ?;
19791976
19801977 let min_snapshot_ts = unchecked_repeatable_ts ( Timestamp :: must ( 4 ) ) ;
1981- // The max repeatable ts needs to be ahead of Timestamp::MIN by at least the
1982- // retention delay, so the anyhow::ensure before we delete doesn't fail
1983- let repeatable_ts =
1984- unchecked_repeatable_ts ( min_snapshot_ts. add ( * DOCUMENT_RETENTION_DELAY ) ?) ;
19851978
19861979 let reader = p. reader ( ) ;
1987- let retention_validator = Arc :: new ( NoopRetentionValidator ) ;
1988- let reader = RepeatablePersistence :: new ( reader, repeatable_ts, retention_validator. clone ( ) ) ;
19891980
19901981 let scanned_stream = LeaderRetentionManager :: < TestRuntime > :: expired_documents (
19911982 & rt,
@@ -2060,19 +2051,12 @@ mod tests {
20602051 . await ?;
20612052
20622053 let min_snapshot_ts = unchecked_repeatable_ts ( Timestamp :: must ( 11 ) ) ;
2063- // The max repeatable ts needs to be ahead of Timestamp::MIN by at least the
2064- // retention delay, so the anyhow::ensure before we delete doesn't fail
2065- let repeatable_ts =
2066- unchecked_repeatable_ts ( min_snapshot_ts. add ( * DOCUMENT_RETENTION_DELAY ) ?) ;
20672054
2068- let retention_validator = Arc :: new ( NoopRetentionValidator ) ;
20692055 let reader = p. reader ( ) ;
2070- let reader =
2071- RepeatablePersistence :: new ( reader. clone ( ) , repeatable_ts, retention_validator. clone ( ) ) ;
20722056
20732057 let scanned_stream = LeaderRetentionManager :: < TestRuntime > :: expired_documents (
20742058 & rt,
2075- reader. clone ( ) ,
2059+ reader,
20762060 RepeatableTimestamp :: MIN ,
20772061 min_snapshot_ts,
20782062 ) ;
0 commit comments