diff --git a/bom/build.gradle.kts b/bom/build.gradle.kts index ae4ff43781..83faf336b7 100644 --- a/bom/build.gradle.kts +++ b/bom/build.gradle.kts @@ -67,6 +67,7 @@ dependencies { api(project(":polaris-persistence-nosql-mongodb")) api(project(":polaris-persistence-nosql-maintenance-api")) + api(project(":polaris-persistence-nosql-maintenance-impl")) api(project(":polaris-persistence-nosql-maintenance-cel")) api(project(":polaris-persistence-nosql-maintenance-spi")) diff --git a/gradle/projects.main.properties b/gradle/projects.main.properties index 244e647cd8..d6ecdb8309 100644 --- a/gradle/projects.main.properties +++ b/gradle/projects.main.properties @@ -80,6 +80,7 @@ polaris-persistence-nosql-testextension=persistence/nosql/persistence/testextens polaris-persistence-nosql-varint=persistence/nosql/persistence/varint # persistence / maintenance polaris-persistence-nosql-maintenance-api=persistence/nosql/persistence/maintenance/api +polaris-persistence-nosql-maintenance-impl=persistence/nosql/persistence/maintenance/impl polaris-persistence-nosql-maintenance-cel=persistence/nosql/persistence/maintenance/retain-cel polaris-persistence-nosql-maintenance-spi=persistence/nosql/persistence/maintenance/spi # persistence / database specific implementations diff --git a/persistence/nosql/persistence/maintenance/impl/build.gradle.kts b/persistence/nosql/persistence/maintenance/impl/build.gradle.kts new file mode 100644 index 0000000000..058a0483d2 --- /dev/null +++ b/persistence/nosql/persistence/maintenance/impl/build.gradle.kts @@ -0,0 +1,83 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
// Build script of the maintenance service implementation module.
plugins {
  id("org.kordamp.gradle.jandex")
  id("polaris-server")
}

description = "Polaris NoSQL persistence maintenance - service implementation"

dependencies {
  // Polaris modules the implementation compiles against.
  implementation(project(":polaris-persistence-nosql-api"))
  implementation(project(":polaris-persistence-nosql-maintenance-api"))
  implementation(project(":polaris-persistence-nosql-maintenance-spi"))
  implementation(project(":polaris-persistence-nosql-realms-api"))
  implementation(project(":polaris-idgen-api"))
  // Realm management implementations are only put on the runtime classpath.
  runtimeOnly(project(":polaris-persistence-nosql-realms-impl"))
  runtimeOnly(project(":polaris-persistence-nosql-realms-store-nosql"))

  implementation(libs.guava)
  implementation(libs.slf4j.api)

  // Jackson is needed for the annotations on the persisted object types, compile-time only.
  compileOnly(platform(libs.jackson.bom))
  compileOnly("com.fasterxml.jackson.core:jackson-annotations")
  compileOnly("com.fasterxml.jackson.core:jackson-databind")

  // Jakarta APIs (annotations, validation, inject, CDI), compile-time only.
  compileOnly(libs.jakarta.annotation.api)
  compileOnly(libs.jakarta.validation.api)
  compileOnly(libs.jakarta.inject.api)
  compileOnly(libs.jakarta.enterprise.cdi.api)

  // Immutables annotations and the matching annotation processor.
  compileOnly(project(":polaris-immutables"))
  annotationProcessor(project(":polaris-immutables", configuration = "processor"))

  // Test fixtures expose the persistence and maintenance APIs to their consumers.
  testFixturesApi(project(":polaris-persistence-nosql-api"))
  testFixturesApi(project(":polaris-persistence-nosql-maintenance-api"))
  testFixturesApi(project(":polaris-persistence-nosql-maintenance-spi"))
  testFixturesApi(project(":polaris-persistence-nosql-testextension"))

  testFixturesCompileOnly(project(":polaris-immutables"))
  testFixturesAnnotationProcessor(project(":polaris-immutables", configuration = "processor"))

  testFixturesCompileOnly(platform(libs.jackson.bom))
  testFixturesCompileOnly("com.fasterxml.jackson.core:jackson-annotations")
  testFixturesCompileOnly("com.fasterxml.jackson.core:jackson-databind")

  testFixturesImplementation(libs.jakarta.annotation.api)
  testFixturesImplementation(libs.jakarta.validation.api)
  testFixturesCompileOnly(libs.jakarta.enterprise.cdi.api)

  testCompileOnly(platform(libs.jackson.bom))
  testCompileOnly("com.fasterxml.jackson.core:jackson-annotations")
  testCompileOnly("com.fasterxml.jackson.core:jackson-databind")

  testRuntimeOnly(libs.logback.classic)

  // CDI (Weld) based test setup.
  testImplementation(project(":polaris-idgen-mocks"))
  testRuntimeOnly(testFixtures(project(":polaris-persistence-nosql-cdi-weld")))
  testImplementation(libs.weld.se.core)
  testImplementation(libs.weld.junit5)
  testRuntimeOnly(libs.smallrye.jandex)

  // NOTE(review): the two realms modules below are already declared as `runtimeOnly` above;
  // confirm whether repeating them for `testRuntimeOnly` is intentional.
  testRuntimeOnly(project(":polaris-persistence-nosql-realms-impl"))
  testRuntimeOnly(project(":polaris-persistence-nosql-realms-store-nosql"))
  // Tests run against the in-memory backend.
  testRuntimeOnly(project(":polaris-persistence-nosql-inmemory"))
  testImplementation(testFixtures(project(":polaris-persistence-nosql-inmemory")))
}
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.polaris.persistence.nosql.maintenance.impl; + +import jakarta.annotation.Nonnull; +import java.util.HashMap; +import java.util.Map; +import java.util.stream.Collectors; +import org.apache.polaris.persistence.nosql.api.obj.ObjRef; +import org.apache.polaris.persistence.nosql.maintenance.api.MaintenanceRunInformation.MaintenanceStats; + +abstract class AbstractScanItemStatsCollector implements ScanItemCallback { + final StatsHolder stats = new StatsHolder(); + + /** Collect maintenance-run stats for objects per realm. */ + static final class ScanRefStatsCollector extends AbstractScanItemStatsCollector { + final Map perRealm = new HashMap<>(); + + /** Handles the maintenance-run outcome for a reference in a realm. */ + @Override + public void itemOutcome( + @Nonnull String realm, @Nonnull String ref, @Nonnull ScanItemOutcome outcome) { + stats.add(outcome); + perRealm.computeIfAbsent(realm, realmId -> new StatsHolder()).add(outcome); + } + + /** Retrieve maintenance-run reference stats per realm. 
*/ + Map toRealmObjTypeStatsMap() { + return perRealm.entrySet().stream() + .collect(Collectors.toMap(Map.Entry::getKey, r -> r.getValue().toMaintenanceStats())); + } + } + + /** + * Collect maintenance-run stats for objects per realm and {@linkplain + * org.apache.polaris.persistence.nosql.api.obj.ObjType object type}. + */ + static final class ScanObjStatsCollector extends AbstractScanItemStatsCollector { + final Map> perRealmAndObjType = new HashMap<>(); + + /** Handles the maintenance-run outcome for an object in a realm. */ + @Override + public void itemOutcome( + @Nonnull String realm, @Nonnull ObjRef id, @Nonnull ScanItemOutcome outcome) { + stats.add(outcome); + perRealmAndObjType + .computeIfAbsent(realm, realmId -> new HashMap<>()) + .computeIfAbsent(id.type(), objType -> new StatsHolder()) + .add(outcome); + } + + /** + * Retrieve maintenance-run reference stats per realm and {@linkplain + * org.apache.polaris.persistence.nosql.api.obj.ObjType object type}. + */ + Map> toRealmObjTypeStatsMap() { + return perRealmAndObjType.entrySet().stream() + .collect( + Collectors.toMap( + Map.Entry::getKey, + r -> + r.getValue().entrySet().stream() + .collect( + Collectors.toMap( + Map.Entry::getKey, ot -> ot.getValue().toMaintenanceStats())))); + } + } + + /** Maintenance stats holder. */ + static final class StatsHolder { + long scanned; + long newer; + long retained; + long purged; + + /** Consume the outcome for a reference or object-type. */ + void add(ScanItemOutcome outcome) { + scanned++; + switch (outcome) { + case REALM_PURGE, PURGED -> purged++; + case TOO_NEW_RETAINED -> newer++; + case RETAINED, UNHANDLED_RETAINED -> retained++; + default -> throw new IllegalStateException("Unknown outcome " + outcome); + } + } + + /** Produce the serializable-stats container. 
*/ + MaintenanceStats toMaintenanceStats() { + return MaintenanceStats.builder() + .scanned(scanned) + .newer(newer) + .retained(retained) + .purged(purged) + .build(); + } + } +} diff --git a/persistence/nosql/persistence/maintenance/impl/src/main/java/org/apache/polaris/persistence/nosql/maintenance/impl/AllRetained.java b/persistence/nosql/persistence/maintenance/impl/src/main/java/org/apache/polaris/persistence/nosql/maintenance/impl/AllRetained.java new file mode 100644 index 0000000000..f49cf06cb9 --- /dev/null +++ b/persistence/nosql/persistence/maintenance/impl/src/main/java/org/apache/polaris/persistence/nosql/maintenance/impl/AllRetained.java @@ -0,0 +1,105 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.polaris.persistence.nosql.maintenance.impl; + +import static java.util.Map.entry; + +import com.google.common.hash.BloomFilter; +import com.google.common.hash.PrimitiveSink; +import jakarta.annotation.Nonnull; +import java.util.Map; +import java.util.function.Predicate; +import org.apache.polaris.persistence.nosql.api.obj.ObjRef; +import org.apache.polaris.persistence.nosql.maintenance.impl.ScanHandler.RetainCheck; +import org.jspecify.annotations.NonNull; + +/** + * Collects reference names and objects to retain. + * + *

The implementation uses bloom-filters to limit the heap usage when a huge number of + * references/objects is being used. + */ +@SuppressWarnings("UnstableApiUsage") +final class AllRetained { + + /** + * Some "salt" to make the bloom filters non-deterministic, in case there are false-positives, to + * reduce the number of false positives over time. + */ + private final int salt; + + // @NonNull is the jspecify variant, which allows type-usage. + // Jakarta's @Nonnull does not allow type-usage. + private final BloomFilter> refsFilter; + private final BloomFilter> objsFilter; + private long refAdds; + private long objAdds; + + AllRetained(long expectedReferenceCount, long expectedObjCount, double fpp, int salt) { + this.salt = salt; + this.refsFilter = BloomFilter.create(this::refFunnel, expectedReferenceCount, fpp); + this.objsFilter = BloomFilter.create(this::objFunnel, expectedObjCount, fpp); + } + + private void refFunnel(Map.Entry realmRef, @Nonnull PrimitiveSink primitiveSink) { + primitiveSink.putInt(salt); + primitiveSink.putUnencodedChars(realmRef.getKey()); + primitiveSink.putUnencodedChars(realmRef.getValue()); + } + + private void objFunnel(Map.Entry realmObj, @Nonnull PrimitiveSink primitiveSink) { + primitiveSink.putInt(salt); + primitiveSink.putUnencodedChars(realmObj.getKey()); + var id = realmObj.getValue(); + primitiveSink.putLong(id); + } + + void addRetainedRef(String realm, String ref) { + refsFilter.put(entry(realm, ref)); + refAdds++; + } + + void addRetainedObj(String realm, long id) { + objsFilter.put(entry(realm, id)); + objAdds++; + } + + /** The number of {@link #addRetainedRef(String, String)} invocations. */ + long refAdds() { + return refAdds; + } + + /** The number of {@link #addRetainedObj(String, long)} invocations. 
*/ + long objAdds() { + return objAdds; + } + + boolean withinExpectedFpp(double expectedFpp) { + return refsFilter.expectedFpp() < expectedFpp && objsFilter.expectedFpp() < expectedFpp; + } + + RetainCheck referenceRetainCheck() { + return (realm, ref) -> refsFilter.mightContain(entry(realm, ref)); + } + + RetainCheck objRetainCheck(Predicate objTypeIdPredicate) { + return (realm, id) -> + objTypeIdPredicate.test(id.type()) || objsFilter.mightContain(entry(realm, id.id())); + } +} diff --git a/persistence/nosql/persistence/maintenance/impl/src/main/java/org/apache/polaris/persistence/nosql/maintenance/impl/MaintenanceRunObj.java b/persistence/nosql/persistence/maintenance/impl/src/main/java/org/apache/polaris/persistence/nosql/maintenance/impl/MaintenanceRunObj.java new file mode 100644 index 0000000000..6ce50e348b --- /dev/null +++ b/persistence/nosql/persistence/maintenance/impl/src/main/java/org/apache/polaris/persistence/nosql/maintenance/impl/MaintenanceRunObj.java @@ -0,0 +1,67 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.polaris.persistence.nosql.maintenance.impl; + +import com.fasterxml.jackson.databind.annotation.JsonDeserialize; +import com.fasterxml.jackson.databind.annotation.JsonSerialize; +import java.util.function.LongSupplier; +import org.apache.polaris.immutables.PolarisImmutable; +import org.apache.polaris.persistence.nosql.api.obj.AbstractObjType; +import org.apache.polaris.persistence.nosql.api.obj.Obj; +import org.apache.polaris.persistence.nosql.api.obj.ObjType; +import org.apache.polaris.persistence.nosql.maintenance.api.MaintenanceRunInformation; + +/** + * Holds information about one maintenance run. + * + *

This object is eventually overwritten with the final result of the maintenance run. + */ +@PolarisImmutable +@JsonSerialize(as = ImmutableMaintenanceRunObj.class) +@JsonDeserialize(as = ImmutableMaintenanceRunObj.class) +public interface MaintenanceRunObj extends Obj { + + ObjType TYPE = new MaintenanceRunObjType(); + + @Override + default ObjType type() { + return TYPE; + } + + MaintenanceRunInformation runInformation(); + + static ImmutableMaintenanceRunObj.Builder builder() { + return ImmutableMaintenanceRunObj.builder(); + } + + final class MaintenanceRunObjType extends AbstractObjType { + public MaintenanceRunObjType() { + super("mtr", "Maintenance Run", MaintenanceRunObj.class); + } + + @Override + public long cachedObjectExpiresAtMicros(Obj obj, LongSupplier clockMicros) { + var mo = (MaintenanceRunObj) obj; + if (mo.runInformation().finished().isPresent()) { + return CACHE_UNLIMITED; + } + return NOT_CACHED; + } + } +} diff --git a/persistence/nosql/persistence/maintenance/impl/src/main/java/org/apache/polaris/persistence/nosql/maintenance/impl/MaintenanceRunsObj.java b/persistence/nosql/persistence/maintenance/impl/src/main/java/org/apache/polaris/persistence/nosql/maintenance/impl/MaintenanceRunsObj.java new file mode 100644 index 0000000000..ca476351d7 --- /dev/null +++ b/persistence/nosql/persistence/maintenance/impl/src/main/java/org/apache/polaris/persistence/nosql/maintenance/impl/MaintenanceRunsObj.java @@ -0,0 +1,65 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.polaris.persistence.nosql.maintenance.impl; + +import com.fasterxml.jackson.databind.annotation.JsonDeserialize; +import com.fasterxml.jackson.databind.annotation.JsonSerialize; +import org.apache.polaris.immutables.PolarisImmutable; +import org.apache.polaris.persistence.nosql.api.obj.AbstractObjType; +import org.apache.polaris.persistence.nosql.api.obj.BaseCommitObj; +import org.apache.polaris.persistence.nosql.api.obj.ObjRef; +import org.apache.polaris.persistence.nosql.api.obj.ObjType; + +@PolarisImmutable +@JsonSerialize(as = ImmutableMaintenanceRunsObj.class) +@JsonDeserialize(as = ImmutableMaintenanceRunsObj.class) +public interface MaintenanceRunsObj extends BaseCommitObj { + + String MAINTENANCE_RUNS_REF_NAME = "maintenance-runs"; + + ObjType TYPE = new MaintenanceRunsObjType(); + + static ImmutableMaintenanceRunsObj.Builder builder() { + return ImmutableMaintenanceRunsObj.builder(); + } + + /** + * The ID of the object holding the maintenance run information. + * + *

The {@linkplain MaintenanceRunObj#runInformation() maintenance run information} is + * not included in this object, because {@link MaintenanceRunObj} is initially written as + * "currently running" and then updated with the final state of the maintenance run. Updating the + * {@link MaintenanceRunObj} is not great but okay, but updating a {@link BaseCommitObj} is an + * absolute no-go. + */ + ObjRef maintenanceRunId(); + + @Override + default ObjType type() { + return TYPE; + } + + final class MaintenanceRunsObjType extends AbstractObjType { + public MaintenanceRunsObjType() { + super("mtrs", "Maintenance Runs", MaintenanceRunsObj.class); + } + } + + interface Builder extends BaseCommitObj.Builder {} +} diff --git a/persistence/nosql/persistence/maintenance/impl/src/main/java/org/apache/polaris/persistence/nosql/maintenance/impl/MaintenanceServiceImpl.java b/persistence/nosql/persistence/maintenance/impl/src/main/java/org/apache/polaris/persistence/nosql/maintenance/impl/MaintenanceServiceImpl.java new file mode 100644 index 0000000000..897156b184 --- /dev/null +++ b/persistence/nosql/persistence/maintenance/impl/src/main/java/org/apache/polaris/persistence/nosql/maintenance/impl/MaintenanceServiceImpl.java @@ -0,0 +1,617 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. 
See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.polaris.persistence.nosql.maintenance.impl; + +import static com.google.common.base.Preconditions.checkArgument; +import static java.lang.String.format; +import static java.util.concurrent.TimeUnit.SECONDS; +import static org.apache.polaris.persistence.nosql.api.Realms.SYSTEM_REALM_ID; +import static org.apache.polaris.persistence.nosql.api.Realms.SYSTEM_REALM_PREFIX; +import static org.apache.polaris.persistence.nosql.api.backend.PersistId.persistId; +import static org.apache.polaris.persistence.nosql.api.obj.ObjRef.objRef; +import static org.apache.polaris.persistence.nosql.api.obj.ObjTypes.objTypeById; +import static org.apache.polaris.persistence.nosql.maintenance.api.MaintenanceConfig.DEFAULT_COUNT_FROM_LAST_RUN_MULTIPLIER; +import static org.apache.polaris.persistence.nosql.maintenance.api.MaintenanceConfig.DEFAULT_DELETE_BATCH_SIZE; +import static org.apache.polaris.persistence.nosql.maintenance.api.MaintenanceConfig.DEFAULT_INITIALIZED_FPP; +import static org.apache.polaris.persistence.nosql.maintenance.api.MaintenanceConfig.DEFAULT_MAX_ACCEPTABLE_FPP; +import static org.apache.polaris.persistence.nosql.maintenance.impl.MaintenanceRunsObj.MAINTENANCE_RUNS_REF_NAME; +import static org.apache.polaris.persistence.nosql.realms.api.RealmDefinition.RealmStatus.ACTIVE; +import static org.apache.polaris.persistence.nosql.realms.api.RealmDefinition.RealmStatus.INACTIVE; +import static org.apache.polaris.persistence.nosql.realms.api.RealmDefinition.RealmStatus.PURGED; +import static org.apache.polaris.persistence.nosql.realms.api.RealmDefinition.RealmStatus.PURGING; + +import com.google.common.collect.Streams; +import com.google.common.math.LongMath; +import jakarta.annotation.Nonnull; +import jakarta.annotation.PostConstruct; +import jakarta.enterprise.context.ApplicationScoped; +import jakarta.enterprise.inject.Instance; +import 
jakarta.inject.Inject; +import java.io.PrintWriter; +import java.io.StringWriter; +import java.math.RoundingMode; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.Optional; +import java.util.OptionalLong; +import java.util.Set; +import java.util.concurrent.ThreadLocalRandom; +import java.util.concurrent.TimeUnit; +import java.util.function.Predicate; +import java.util.stream.Collectors; +import java.util.stream.Stream; +import org.apache.polaris.ids.api.MonotonicClock; +import org.apache.polaris.persistence.nosql.api.Persistence; +import org.apache.polaris.persistence.nosql.api.RealmPersistenceFactory; +import org.apache.polaris.persistence.nosql.api.SystemPersistence; +import org.apache.polaris.persistence.nosql.api.backend.Backend; +import org.apache.polaris.persistence.nosql.api.commit.Committer; +import org.apache.polaris.persistence.nosql.api.obj.ObjRef; +import org.apache.polaris.persistence.nosql.api.obj.ObjTypes; +import org.apache.polaris.persistence.nosql.maintenance.api.MaintenanceConfig; +import org.apache.polaris.persistence.nosql.maintenance.api.MaintenanceRunInformation; +import org.apache.polaris.persistence.nosql.maintenance.api.MaintenanceRunSpec; +import org.apache.polaris.persistence.nosql.maintenance.api.MaintenanceService; +import org.apache.polaris.persistence.nosql.maintenance.spi.ObjTypeRetainedIdentifier; +import org.apache.polaris.persistence.nosql.maintenance.spi.PerRealmRetainedIdentifier; +import org.apache.polaris.persistence.nosql.realms.api.RealmDefinition; +import org.apache.polaris.persistence.nosql.realms.api.RealmExpectedStateMismatchException; +import org.apache.polaris.persistence.nosql.realms.api.RealmManagement; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Retrieve an instance of this class by adding the {@code + * polaris-persistence-nosql-maintenance-impl} artifact to the runtime class path and then use CDI + * via {@code @Inject 
MaintenanceService} to access it. + */ +@ApplicationScoped +class MaintenanceServiceImpl implements MaintenanceService { + private static final Logger LOGGER = LoggerFactory.getLogger(MaintenanceServiceImpl.class); + + static final int MIN_GRACE_TIME_MINUTES = 5; + + private static final long MIN_GRACE_TIME_MICROS = + TimeUnit.MINUTES.toMicros(MIN_GRACE_TIME_MINUTES); + + private final Backend backend; + private final Persistence systemPersistence; + private final Committer committer; + private final RealmPersistenceFactory realmPersistenceFactory; + private final RealmManagement realmManagement; + private final List perRealmRetainedIdentifiers; + private final Map> objTypeRetainedIdentifiers; + private final MaintenanceConfig maintenanceConfig; + private final MonotonicClock monotonicClock; + + @SuppressWarnings("CdiInjectionPointsInspection") + @Inject + MaintenanceServiceImpl( + Backend backend, + @SystemPersistence Persistence systemPersistence, + RealmPersistenceFactory realmPersistenceFactory, + RealmManagement realmManagement, + Instance realmRetainedIdentifiers, + Instance objTypeRetainedIdentifiers, + MaintenanceConfig maintenanceConfig, + MonotonicClock monotonicClock) { + checkArgument( + SYSTEM_REALM_ID.equals(systemPersistence.realmId()), + "Realms management must happen in the %s realm", + SYSTEM_REALM_ID); + + this.backend = backend; + this.systemPersistence = systemPersistence; + this.realmManagement = realmManagement; + this.realmPersistenceFactory = realmPersistenceFactory; + this.maintenanceConfig = maintenanceConfig; + this.monotonicClock = monotonicClock; + + this.perRealmRetainedIdentifiers = realmRetainedIdentifiers.stream().toList(); + this.objTypeRetainedIdentifiers = + objTypeRetainedIdentifiers.stream() + .collect(Collectors.groupingBy(oti -> oti.handledObjType().id())); + this.committer = + systemPersistence.createCommitter( + MAINTENANCE_RUNS_REF_NAME, MaintenanceRunsObj.class, MaintenanceRunObj.class); + + maintenanceServiceReport(); + 
} + + @PostConstruct + void init() { + // Do this in a @PostConstruct method as it involves I/O, which isn't a good thing to do in a + // constructor, especially in CDI + systemPersistence.createReferenceSilent(MAINTENANCE_RUNS_REF_NAME); + } + + @Override + @Nonnull + public List maintenanceRunLog() { + var runIds = + Streams.stream( + systemPersistence + .commits() + .commitLog( + MAINTENANCE_RUNS_REF_NAME, OptionalLong.empty(), MaintenanceRunsObj.class)) + .filter(Objects::nonNull) + .limit(maintenanceConfig.retainedRuns().orElse(MaintenanceConfig.DEFAULT_RETAINED_RUNS)) + .map(MaintenanceRunsObj::maintenanceRunId) + .toArray(ObjRef[]::new); + return Stream.of(systemPersistence.fetchMany(MaintenanceRunObj.class, runIds)) + .filter(Objects::nonNull) + .map(MaintenanceRunObj::runInformation) + .toList(); + } + + @Nonnull + @Override + public MaintenanceRunSpec buildMaintenanceRunSpec() { + try (var realms = realmManagement.list()) { + var specBuilder = MaintenanceRunSpec.builder(); + realms.forEach( + realm -> { + switch (realm.status()) { + case CREATED, INITIALIZING, LOADING, PURGED -> { + // Don't handle these states, those are either final or known to contain + // inconsistent data + } + case PURGING -> specBuilder.addRealmsToPurge(realm.id()); + case ACTIVE, INACTIVE -> specBuilder.addRealmsToProces(realm.id()); + default -> + throw new IllegalStateException( + "Unexpected realm status " + realm.status() + " for realm " + realm.id()); + } + }); + return specBuilder.build(); + } + } + + @Override + @Nonnull + public MaintenanceRunInformation performMaintenance( + @Nonnull MaintenanceRunSpec maintenanceRunSpec) { + LOGGER.info( + "Triggering maintenance run with {} realms to purge and {} realms to process", + maintenanceRunSpec.realmsToPurge().size(), + maintenanceRunSpec.realmsToProcess().size()); + + checkArgument( + maintenanceRunSpec.realmsToPurge().stream() + .noneMatch(maintenanceRunSpec.realmsToProcess()::contains) + && 
maintenanceRunSpec.realmsToProcess().stream() + .noneMatch(maintenanceRunSpec.realmsToPurge()::contains), + "No realm ID must be included in both the set of realms to process and the set of realms to purge"); + checkArgument( + Stream.concat( + maintenanceRunSpec.realmsToPurge().stream(), + maintenanceRunSpec.realmsToProcess().stream()) + .noneMatch(id -> id.startsWith(SYSTEM_REALM_PREFIX)), + "System realm IDs must not be present in the maintenance run specification"); + + var config = maintenanceConfig; + checkConfig(config); + + // TODO follow-up: some safeguard that checks the run-log for an unfinished run, outside of this + // function! + + var allRetained = constructAllRetained(config); + + var runObj = initMaintenanceRunObj(); + var runInfo = MaintenanceRunInformation.builder().from(runObj.runInformation()); + + var maxCreatedAtMicros = calcMaxCreatedAtMicros(config); + + var description = new StringWriter(); + var descriptionWriter = new PrintWriter(description); + + var info = (MaintenanceRunInformation) null; + try { + try { + var realmsToProcess = processRealms(maintenanceRunSpec, allRetained, descriptionWriter); + var realmsToPurge = purgeRealms(maintenanceRunSpec, descriptionWriter); + + if (maintenanceRunSpec.includeSystemRealm()) { + realmsToProcess.add(SYSTEM_REALM_ID); + identifyAgainstRealm(SYSTEM_REALM_ID, allRetained); + } + + if (!maintenanceRunSpec.realmsToPurge().isEmpty() && backend.supportsRealmDeletion()) { + LOGGER.info( + "Purging realms {} directly against the backend database...", + String.join(", ", maintenanceRunSpec.realmsToPurge())); + backend.deleteRealms(maintenanceRunSpec.realmsToPurge()); + runInfo.purgedRealms(maintenanceRunSpec.realmsToPurge().size()); + } else { + runInfo.purgedRealms(0); + } + + var seenRealmsToPurge = new HashSet(); + var expectFpp = config.maxAcceptableFilterFpp().orElse(DEFAULT_MAX_ACCEPTABLE_FPP); + + var refStats = new AbstractScanItemStatsCollector.ScanRefStatsCollector(); + var objStats = new 
AbstractScanItemStatsCollector.ScanObjStatsCollector(); + + // Ensures that objects with unknown obj-types do not get purged. + // The assumption here is that if the obj-type is not known, there's also no + // ObjTypeRetainedIdentifier/RealmRetainedIdentifier, which could handle these object types. + // As a follow-up, it might be appropriate to add an advanced configuration option to define + // the object types that shall be purged. + // Even if the obj-type is unknown, to clean up after an extension/plugin that used those + // object-types is no longer being used. + var nonGenericObjTypeIds = ObjTypes.nonGenericObjTypes().keySet(); + var objTypeIdPredicate = + (Predicate) objTypeId -> !nonGenericObjTypeIds.contains(objTypeId); + + var canDelete = allRetained.withinExpectedFpp(expectFpp); + try (var refHandler = + new ScanHandler<>( + "reference", + config.referenceScanRateLimitPerSecond(), + maxCreatedAtMicros, + realmsToProcess, + realmsToPurge, + seenRealmsToPurge::add, + allRetained.referenceRetainCheck(), + config.deleteBatchSize().orElse(DEFAULT_DELETE_BATCH_SIZE), + realmRefs -> { + if (canDelete) { + backend.batchDeleteRefs(realmRefs); + } + }, + refStats); + var objHandler = + new ScanHandler<>( + "object", + config.objectScanRateLimitPerSecond(), + maxCreatedAtMicros, + realmsToProcess, + realmsToPurge, + seenRealmsToPurge::add, + allRetained.objRetainCheck(objTypeIdPredicate), + config.deleteBatchSize().orElse(DEFAULT_DELETE_BATCH_SIZE), + realmObjs -> { + if (canDelete) { + backend.batchDeleteObjs( + realmObjs.entrySet().stream() + .collect( + Collectors.toMap( + Map.Entry::getKey, + e -> + e.getValue().stream() + .map(oid -> persistId(oid.id(), oid.numParts())) + .collect(Collectors.toSet())))); + } + }, + objStats)) { + + LOGGER.info("Start scanning backend database, too-new: {} ...", maxCreatedAtMicros); + backend.scanBackend( + refHandler.asReferenceScanCallback(monotonicClock::currentTimeMillis), + 
objHandler.asObjScanCallback(monotonicClock::currentTimeMillis)); + + LOGGER.info("Finished scanning backend database"); + + runInfo + .referenceStats(refStats.stats.toMaintenanceStats()) + .objStats(objStats.stats.toMaintenanceStats()) + .perRealmPerObjTypeStats(objStats.toRealmObjTypeStatsMap()) + .perRealmReferenceStats(refStats.toRealmObjTypeStatsMap()) + .identifiedObjs(allRetained.objAdds()) + .identifiedReferences(allRetained.refAdds()); + + if (!canDelete) { + var warn = + "Maintenance run finished but did not purge all unreferenced objects, " + + "because the probabilistic filter was not properly sized. " + + "The next maintenance run will be sized according to current run."; + if (!seenRealmsToPurge.isEmpty()) { + warn += + format("\nRealms %s NOT marked as purged.", String.join(", ", seenRealmsToPurge)); + } + descriptionWriter.println(warn); + LOGGER.warn(warn); + } + } + + if (canDelete) { + updateRealmsAsPurged(maintenanceRunSpec, seenRealmsToPurge); + } + + runInfo.success(true); + + LOGGER.info("Maintenance run completed successfully"); + } catch (Exception e) { + LOGGER.info("Maintenance run failed", e); + + runInfo.success(false).statusMessage("Maintenance run did not finish successfully."); + + // Add stack trace as detailed information + descriptionWriter.printf("FAILURE:%n"); + e.printStackTrace(descriptionWriter); + } finally { + descriptionWriter.flush(); + runInfo + .finished(monotonicClock.currentInstant()) + .detailedInformation(description.toString()); + info = runInfo.build(); + } + } finally { + LOGGER.info("Persisting maintenance result {}", info); + systemPersistence.write( + MaintenanceRunObj.builder().from(runObj).runInformation(info).build(), + MaintenanceRunObj.class); + } + + return info; + } + + private void updateRealmsAsPurged( + MaintenanceRunSpec maintenanceRunSpec, HashSet seenRealmsToPurge) { + // Update the realm status of the realms that were specified to be purged as `PURGED` if no + // data for those realms has been 
seen. + maintenanceRunSpec.realmsToPurge().stream() + .filter(r -> !seenRealmsToPurge.contains(r)) + .map(realmManagement::get) + .filter(Optional::isPresent) + .map(Optional::get) + .forEach( + purgedRealm -> { + try { + realmManagement.update( + purgedRealm, + RealmDefinition.builder().from(purgedRealm).status(PURGED).build()); + } catch (RealmExpectedStateMismatchException e) { + // ignore, do the state transition during the next maintenance run + } + }); + } + + private void maintenanceServiceReport() { + LOGGER.info("Using {} realm retained identifiers", perRealmRetainedIdentifiers.size()); + perRealmRetainedIdentifiers.forEach( + realmRetainedIdentifier -> + LOGGER.info("Realm retained identifier: {}", realmRetainedIdentifier.name())); + LOGGER.info( + "Using {} object type identifiers:", + objTypeRetainedIdentifiers.values().stream().mapToInt(List::size).sum()); + objTypeRetainedIdentifiers.forEach( + (type, idents) -> { + LOGGER.info( + "Using {} identifiers for object type '{}' {}", + idents.size(), + type, + objTypeById(type).name()); + idents.forEach( + objTypeRetainedIdentifier -> + LOGGER.info( + "Object type '{}' identifier: {}", type, objTypeRetainedIdentifier.name())); + }); + } + + private Set processRealms( + MaintenanceRunSpec maintenanceRunSpec, + AllRetained allRetained, + PrintWriter descriptionWriter) { + var realmsToProcess = new HashSet(); + for (var realmId : maintenanceRunSpec.realmsToProcess()) { + var currentRealmStatus = + realmManagement.get(realmId).map(RealmDefinition::status).orElse(ACTIVE); + if ((currentRealmStatus == ACTIVE || currentRealmStatus == INACTIVE) + && identifyAgainstRealm(realmId, allRetained)) { + realmsToProcess.add(realmId); + } + } + maintenanceRunSpec.realmsToProcess().stream() + .filter(r -> !realmsToProcess.contains(r)) + .forEach( + r -> { + var msg = + format( + "No realm retained identifier was able to handle the realm '%s' or the realm is not in status ACTIVE or INACTIVE, no references or objects will be 
purged from this realm.", + r); + descriptionWriter.println(msg); + LOGGER.warn(msg); + }); + return realmsToProcess; + } + + private Set purgeRealms( + MaintenanceRunSpec maintenanceRunSpec, PrintWriter descriptionWriter) { + var realmsToPurge = new HashSet(); + for (var realmId : maintenanceRunSpec.realmsToPurge()) { + var currentRealmStatus = + realmManagement.get(realmId).map(RealmDefinition::status).orElse(PURGED); + if (currentRealmStatus == PURGING || currentRealmStatus == PURGED) { + realmsToPurge.add(realmId); + } + } + maintenanceRunSpec.realmsToPurge().stream() + .filter(r -> !realmsToPurge.contains(r)) + .forEach( + r -> { + var msg = + format( + "The realm '%s' is not in state PURGING, will therefore not be purged.", r); + descriptionWriter.println(msg); + LOGGER.warn(msg); + }); + return realmsToPurge; + } + + private long calcMaxCreatedAtMicros(MaintenanceConfig effectiveConfig) { + var now = monotonicClock.currentTimeMicros(); + var grace = + effectiveConfig + .createdAtGraceTime() + .map( + d -> { + var micros = SECONDS.toMicros(d.toSeconds()); + micros += TimeUnit.NANOSECONDS.toMicros(d.toNanosPart()); + return Math.max(micros, MIN_GRACE_TIME_MICROS); + }) + .orElse(MIN_GRACE_TIME_MICROS); + return now - grace; + } + + private AllRetained constructAllRetained(MaintenanceConfig effectiveConfig) { + var expectedReferenceCount = + effectiveConfig + .expectedReferenceCount() + .orElse(MaintenanceConfig.DEFAULT_EXPECTED_REFERENCE_COUNT); + var expectedObjCount = + effectiveConfig.expectedObjCount().orElse(MaintenanceConfig.DEFAULT_EXPECTED_OBJ_COUNT); + + for (var lastRunIter = + Streams.stream( + systemPersistence + .commits() + .commitLog( + MAINTENANCE_RUNS_REF_NAME, + OptionalLong.empty(), + MaintenanceRunsObj.class)) + .filter(Objects::nonNull) + .map(r -> systemPersistence.fetch(r.maintenanceRunId(), MaintenanceRunObj.class)) + .filter(Objects::nonNull) + .map(MaintenanceRunObj::runInformation) + .iterator(); + lastRunIter.hasNext(); ) { + var ri 
= lastRunIter.next(); + var refs = + Math.max( + ri.referenceStats().map(st -> st.scanned().orElse(0L)).orElse(0L), + ri.identifiedReferences().orElse(0L)); + var objs = + Math.max( + ri.objStats().map(st -> st.scanned().orElse(0L)).orElse(0L), + ri.identifiedObjs().orElse(0L)); + if (refs == 0L || objs == 0L) { + continue; + } + if (refs > expectedReferenceCount) { + // Add 10% to account for newly created references + expectedReferenceCount = + (long) + (refs + * effectiveConfig + .countFromLastRunMultiplier() + .orElse(DEFAULT_COUNT_FROM_LAST_RUN_MULTIPLIER)); + } + if (objs > expectedObjCount) { + // Add 10% to account for newly created objects + expectedObjCount = + (long) + (objs + * effectiveConfig + .countFromLastRunMultiplier() + .orElse(DEFAULT_COUNT_FROM_LAST_RUN_MULTIPLIER)); + } + } + + expectedReferenceCount = Math.max(expectedReferenceCount, 1_000); + expectedObjCount = Math.max(expectedObjCount, 100_000); + var configFpp = effectiveConfig.filterInitializedFpp().orElse(DEFAULT_INITIALIZED_FPP); + LOGGER.info( + "Sized retained collector for {} references and {} objects with an fpp of {}, approximate bloom filter heap sizes: {} and {} bytes", + expectedReferenceCount, + expectedObjCount, + configFpp, + bloomFilterBytes(expectedReferenceCount, configFpp), + bloomFilterBytes(expectedObjCount, configFpp)); + + return new AllRetained( + expectedReferenceCount, expectedObjCount, configFpp, ThreadLocalRandom.current().nextInt()); + } + + private static final double LOG2_SQUARED = Math.log(2) * Math.log(2); + + static long bloomFilterBytes(long elements, double fpp) { + var bits = (long) (-elements * Math.log(fpp) / LOG2_SQUARED); + return LongMath.divide(bits, 64, RoundingMode.CEILING); + } + + private boolean identifyAgainstRealm(String realmId, AllRetained allRetained) { + LOGGER.info("Identifying referenced data in realm '{}'", realmId); + + var pers = realmPersistenceFactory.newBuilder().realmId(realmId).build(); + var collector = new 
RetainedCollectorImpl(pers, allRetained, objTypeRetainedIdentifiers); + + boolean any = false; + for (var realmRetainedIdentifier : perRealmRetainedIdentifiers) { + LOGGER.info( + "Running maintenance for realm '{}' via '{}'", realmId, realmRetainedIdentifier.name()); + var handled = realmRetainedIdentifier.identifyRetained(collector); + LOGGER.info( + "Realm identifier '{}' {} {}", + realmRetainedIdentifier.name(), + handled ? "handled" : "did not handle", + realmId); + any |= handled; + } + return any; + } + + private MaintenanceRunObj initMaintenanceRunObj() { + return committer + .commitRuntimeException( + (state, refObjSupplier) -> { + var refObj = refObjSupplier.get(); + var res = MaintenanceRunsObj.builder(); + + var ro = + MaintenanceRunObj.builder() + .id(systemPersistence.generateId()) + .runInformation( + MaintenanceRunInformation.builder() + .started(monotonicClock.currentInstant()) + .build()) + .build(); + res.maintenanceRunId(objRef(ro)); + + state.writeIfNew("ro", ro); + + return state.commitResult(ro, res, refObj); + }) + .orElseThrow(); + } + + private static void checkConfig(MaintenanceConfig config) { + config + .retainedRuns() + .ifPresent(v -> checkArgument(v > 1, "Number of maintenance runs must be at least 2")); + config + .expectedReferenceCount() + .ifPresent( + v -> + checkArgument( + v >= MaintenanceConfig.DEFAULT_EXPECTED_REFERENCE_COUNT, + "Expected reference count runs must be greater than or equal to %s", + MaintenanceConfig.DEFAULT_EXPECTED_REFERENCE_COUNT)); + config + .expectedObjCount() + .ifPresent( + v -> + checkArgument( + v >= MaintenanceConfig.DEFAULT_EXPECTED_OBJ_COUNT, + "Expected object count runs must be greater than or equal to %s", + MaintenanceConfig.DEFAULT_EXPECTED_OBJ_COUNT)); + } +} diff --git a/persistence/nosql/persistence/maintenance/impl/src/main/java/org/apache/polaris/persistence/nosql/maintenance/impl/MaintenanceServiceRealmRetainedIdentifier.java 
b/persistence/nosql/persistence/maintenance/impl/src/main/java/org/apache/polaris/persistence/nosql/maintenance/impl/MaintenanceServiceRealmRetainedIdentifier.java new file mode 100644 index 0000000000..568e0ed86a --- /dev/null +++ b/persistence/nosql/persistence/maintenance/impl/src/main/java/org/apache/polaris/persistence/nosql/maintenance/impl/MaintenanceServiceRealmRetainedIdentifier.java @@ -0,0 +1,57 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.polaris.persistence.nosql.maintenance.impl; + +import static org.apache.polaris.persistence.nosql.maintenance.api.MaintenanceConfig.DEFAULT_RETAINED_RUNS; +import static org.apache.polaris.persistence.nosql.maintenance.impl.MaintenanceRunsObj.MAINTENANCE_RUNS_REF_NAME; + +import jakarta.annotation.Nonnull; +import jakarta.enterprise.context.ApplicationScoped; +import jakarta.inject.Inject; +import org.apache.polaris.persistence.nosql.maintenance.api.MaintenanceConfig; +import org.apache.polaris.persistence.nosql.maintenance.spi.CountDownPredicate; +import org.apache.polaris.persistence.nosql.maintenance.spi.PerRealmRetainedIdentifier; +import org.apache.polaris.persistence.nosql.maintenance.spi.RetainedCollector; + +/** Retained-identifier for the maintenance service's own reference and objects. */ +@SuppressWarnings("CdiInjectionPointsInspection") +@ApplicationScoped +class MaintenanceServiceRealmRetainedIdentifier implements PerRealmRetainedIdentifier { + @Inject MaintenanceConfig maintenanceConfig; + + @Override + public String name() { + return "Maintenance service"; + } + + @Override + public boolean identifyRetained(@Nonnull RetainedCollector collector) { + if (!collector.isSystemRealm()) { + return false; + } + + collector.refRetain( + MAINTENANCE_RUNS_REF_NAME, + MaintenanceRunsObj.class, + new CountDownPredicate<>(maintenanceConfig.retainedRuns().orElse(DEFAULT_RETAINED_RUNS)), + maintenance -> collector.retainObject(maintenance.maintenanceRunId())); + + return true; + } +} diff --git a/persistence/nosql/persistence/maintenance/impl/src/main/java/org/apache/polaris/persistence/nosql/maintenance/impl/RateLimit.java b/persistence/nosql/persistence/maintenance/impl/src/main/java/org/apache/polaris/persistence/nosql/maintenance/impl/RateLimit.java new file mode 100644 index 0000000000..bd525f6b73 --- /dev/null +++ 
b/persistence/nosql/persistence/maintenance/impl/src/main/java/org/apache/polaris/persistence/nosql/maintenance/impl/RateLimit.java @@ -0,0 +1,53 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.polaris.persistence.nosql.maintenance.impl; + +import com.google.common.util.concurrent.RateLimiter; + +interface RateLimit { + void acquire(); + + @SuppressWarnings("UnstableApiUsage") + static RateLimit create(int ratePerSecond) { + if (ratePerSecond <= 0 || ratePerSecond == Integer.MAX_VALUE) { + return new RateLimit() { + @Override + public void acquire() {} + + @Override + public String toString() { + return "unlimited"; + } + }; + } + return new RateLimit() { + final RateLimiter limiter = RateLimiter.create(ratePerSecond); + + @Override + public void acquire() { + limiter.acquire(); + } + + @Override + public String toString() { + return "up to " + ratePerSecond; + } + }; + } +} diff --git a/persistence/nosql/persistence/maintenance/impl/src/main/java/org/apache/polaris/persistence/nosql/maintenance/impl/RetainedCollectorImpl.java b/persistence/nosql/persistence/maintenance/impl/src/main/java/org/apache/polaris/persistence/nosql/maintenance/impl/RetainedCollectorImpl.java new file mode 
100644 index 0000000000..6590de77fd --- /dev/null +++ b/persistence/nosql/persistence/maintenance/impl/src/main/java/org/apache/polaris/persistence/nosql/maintenance/impl/RetainedCollectorImpl.java @@ -0,0 +1,363 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.polaris.persistence.nosql.maintenance.impl; + +import static com.google.common.base.Preconditions.checkArgument; +import static java.util.Collections.emptyIterator; +import static org.apache.polaris.persistence.nosql.api.obj.ObjRef.objRef; + +import com.google.common.collect.AbstractIterator; +import jakarta.annotation.Nonnull; +import jakarta.annotation.Nullable; +import java.util.HashSet; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.OptionalLong; +import java.util.Set; +import java.util.function.Supplier; +import org.apache.polaris.ids.api.IdGenerator; +import org.apache.polaris.ids.api.MonotonicClock; +import org.apache.polaris.persistence.nosql.api.Persistence; +import org.apache.polaris.persistence.nosql.api.PersistenceParams; +import org.apache.polaris.persistence.nosql.api.commit.Commits; +import org.apache.polaris.persistence.nosql.api.commit.Committer; +import org.apache.polaris.persistence.nosql.api.exceptions.ReferenceAlreadyExistsException; +import org.apache.polaris.persistence.nosql.api.exceptions.ReferenceNotFoundException; +import org.apache.polaris.persistence.nosql.api.index.Index; +import org.apache.polaris.persistence.nosql.api.index.IndexContainer; +import org.apache.polaris.persistence.nosql.api.index.IndexValueSerializer; +import org.apache.polaris.persistence.nosql.api.index.UpdatableIndex; +import org.apache.polaris.persistence.nosql.api.obj.BaseCommitObj; +import org.apache.polaris.persistence.nosql.api.obj.Obj; +import org.apache.polaris.persistence.nosql.api.obj.ObjRef; +import org.apache.polaris.persistence.nosql.api.obj.ObjType; +import org.apache.polaris.persistence.nosql.api.ref.Reference; +import org.apache.polaris.persistence.nosql.maintenance.spi.ObjTypeRetainedIdentifier; +import org.apache.polaris.persistence.nosql.maintenance.spi.RetainedCollector; + +/** {@link RetainedCollector} implementation, per realm. 
*/ +final class RetainedCollectorImpl implements Persistence, RetainedCollector { + private final Persistence persistence; + private final AllRetained allRetained; + private final String realmId; + private final Map> objTypeRetainedIdentifiers; + + private final Set currentNesting = new HashSet<>(); + + RetainedCollectorImpl( + Persistence persistence, + AllRetained allRetained, + Map> objTypeRetainedIdentifiers) { + this.persistence = persistence; + this.allRetained = allRetained; + this.realmId = persistence.realmId(); + this.objTypeRetainedIdentifiers = objTypeRetainedIdentifiers; + } + + @Nonnull + @Override + public String realm() { + return realmId; + } + + @Nonnull + @Override + public Persistence realmPersistence() { + return this; + } + + @Override + public void retainObject(@Nonnull ObjRef objRef) { + if (!currentNesting.add(objRef.id())) { + return; + } + try { + allRetained.addRetainedObj(realmId, objRef.id()); + + var otIdents = objTypeRetainedIdentifiers.get(objRef.type()); + if (otIdents != null) { + for (var otIdent : otIdents) { + otIdent.identifyRelatedObj(this, objRef); + } + } + } finally { + currentNesting.remove(objRef.id()); + } + } + + @Override + public void retainReference(@Nonnull String name) { + allRetained.addRetainedRef(realmId, name); + } + + // Persistence delegate + + @Nonnull + @Override + public Reference createReference(@Nonnull String name, @Nonnull Optional pointer) + throws ReferenceAlreadyExistsException { + retainReference(name); + pointer.ifPresent(this::retainObject); + return persistence.createReference(name, pointer); + } + + @Override + public void createReferenceSilent(@Nonnull String name) { + retainReference(name); + persistence.createReferenceSilent(name); + } + + @Override + public void createReferencesSilent(Set referenceNames) { + referenceNames.forEach(this::retainReference); + persistence.createReferencesSilent(referenceNames); + } + + @Nonnull + @Override + public Reference fetchOrCreateReference( + @Nonnull 
String name, @Nonnull Supplier> pointerForCreate) { + try { + return fetchReference(name); + } catch (ReferenceNotFoundException e) { + try { + var objRef = pointerForCreate.get(); + objRef.ifPresent(this::retainObject); + return createReference(name, objRef); + } catch (ReferenceAlreadyExistsException x) { + // Unlikely that we ever get here (ref does not exist (but then concurrently created) + return fetchReference(name); + } + } + } + + @Nonnull + @Override + public Optional updateReferencePointer( + @Nonnull Reference reference, @Nonnull ObjRef newPointer) throws ReferenceNotFoundException { + retainReference(reference.name()); + retainObject(newPointer); + return persistence.updateReferencePointer(reference, newPointer); + } + + @Nonnull + @Override + public Reference fetchReference(@Nonnull String name) throws ReferenceNotFoundException { + retainReference(name); + var ref = persistence.fetchReference(name); + ref.pointer().ifPresent(this::retainObject); + return ref; + } + + @Nonnull + @Override + public Reference fetchReferenceForUpdate(@Nonnull String name) throws ReferenceNotFoundException { + retainReference(name); + var ref = persistence.fetchReferenceForUpdate(name); + ref.pointer().ifPresent(this::retainObject); + return ref; + } + + @Override + public Optional fetchReferenceHead( + @Nonnull String name, @Nonnull Class clazz) throws ReferenceNotFoundException { + retainReference(name); + var ref = persistence.fetchReferenceHead(name, clazz); + ref.ifPresent(head -> retainObject(objRef(head))); + return ref; + } + + @Nullable + @Override + public T fetch(@Nonnull ObjRef id, @Nonnull Class clazz) { + retainObject(id); + return persistence.fetch(id, clazz); + } + + @Nonnull + @Override + public T[] fetchMany(@Nonnull Class clazz, @Nonnull ObjRef... 
ids) { + for (var id : ids) { + if (id != null) { + retainObject(id); + } + } + return persistence.fetchMany(clazz, ids); + } + + @Nonnull + @Override + public T write(@Nonnull T obj, @Nonnull Class clazz) { + retainObject(objRef(obj)); + return persistence.write(obj, clazz); + } + + @SafeVarargs + @Nonnull + @Override + public final T[] writeMany(@Nonnull Class clazz, @Nonnull T... objs) { + for (var obj : objs) { + if (obj != null) { + retainObject(objRef(obj)); + } + } + return persistence.writeMany(clazz, objs); + } + + @Override + public void delete(@Nonnull ObjRef id) { + persistence.delete(id); + } + + @Override + public void deleteMany(@Nonnull ObjRef... ids) { + persistence.deleteMany(ids); + } + + @Nullable + @Override + public T conditionalInsert(@Nonnull T obj, @Nonnull Class clazz) { + retainObject(objRef(obj)); + return persistence.conditionalInsert(obj, clazz); + } + + @Nullable + @Override + public T conditionalUpdate( + @Nonnull T expected, @Nonnull T update, @Nonnull Class clazz) { + retainObject(objRef(update)); + return persistence.conditionalUpdate(expected, update, clazz); + } + + @Override + public boolean conditionalDelete(@Nonnull T expected, Class clazz) { + retainObject(objRef(expected)); + return persistence.conditionalDelete(expected, clazz); + } + + @Override + public PersistenceParams params() { + return persistence.params(); + } + + @Override + public int maxSerializedValueSize() { + return persistence.maxSerializedValueSize(); + } + + @Override + public long generateId() { + return persistence.generateId(); + } + + @Override + public ObjRef generateObjId(ObjType type) { + return persistence.generateObjId(type); + } + + @Nullable + @Override + public T getImmediate(@Nonnull ObjRef id, @Nonnull Class clazz) { + retainObject(id); + return persistence.getImmediate(id, clazz); + } + + @Override + public String realmId() { + return persistence.realmId(); + } + + @Override + public MonotonicClock monotonicClock() { + return 
persistence.monotonicClock(); + } + + @Override + public IdGenerator idGenerator() { + return persistence.idGenerator(); + } + + @Override + public UpdatableIndex buildWriteIndex( + @Nullable IndexContainer indexContainer, + @Nonnull IndexValueSerializer indexValueSerializer) { + return persistence.buildWriteIndex(indexContainer, indexValueSerializer); + } + + @Override + public Index buildReadIndex( + @Nullable IndexContainer indexContainer, + @Nonnull IndexValueSerializer indexValueSerializer) { + return persistence.buildReadIndex(indexContainer, indexValueSerializer); + } + + @Override + public Committer createCommitter( + @Nonnull String refName, + @Nonnull Class referencedObjType, + @Nonnull Class resultType) { + throw new UnsupportedOperationException( + "Committing operations not supported during retained-objects identification"); + } + + @Override + public Commits commits() { + return new Commits() { + @Override + public Iterator commitLog( + String refName, OptionalLong offset, Class clazz) { + checkArgument( + offset.isEmpty(), "Commit offset must be empty during retained-objects identification"); + + var ref = fetchReference(refName); + + return ref.pointer() + .map( + head -> + (Iterator) + new AbstractIterator() { + private ObjRef next = head; + + @Override + protected C computeNext() { + if (next == null) { + return endOfData(); + } + var r = fetch(next, clazz); + if (r == null) { + return endOfData(); + } + next = r.directParent().orElse(null); + return r; + } + }) + .orElse(emptyIterator()); + } + + @Override + public Iterator commitLogReversed( + String refName, long offset, Class clazz) { + throw new UnsupportedOperationException( + "Reversed commit scanning not supported during retained-objects identification"); + } + }; + } +} diff --git a/persistence/nosql/persistence/maintenance/impl/src/main/java/org/apache/polaris/persistence/nosql/maintenance/impl/ScanHandler.java 
b/persistence/nosql/persistence/maintenance/impl/src/main/java/org/apache/polaris/persistence/nosql/maintenance/impl/ScanHandler.java new file mode 100644 index 0000000000..502ee34924 --- /dev/null +++ b/persistence/nosql/persistence/maintenance/impl/src/main/java/org/apache/polaris/persistence/nosql/maintenance/impl/ScanHandler.java @@ -0,0 +1,201 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.polaris.persistence.nosql.maintenance.impl; + +import static org.apache.polaris.persistence.nosql.api.Realms.SYSTEM_REALM_PREFIX; +import static org.apache.polaris.persistence.nosql.api.obj.ObjRef.objRef; + +import jakarta.annotation.Nonnull; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map; +import java.util.OptionalInt; +import java.util.Set; +import java.util.function.Consumer; +import java.util.function.LongSupplier; +import org.apache.polaris.persistence.nosql.api.backend.Backend; +import org.apache.polaris.persistence.nosql.api.backend.PersistId; +import org.apache.polaris.persistence.nosql.api.obj.ObjRef; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +final class ScanHandler implements AutoCloseable { + private static final Logger LOGGER = LoggerFactory.getLogger(ScanHandler.class); + + final String name; + final RateLimit rateLimit; + final long maxCreatedAtMicros; + final Set realmsToRetain; + final Set realmsToPurge; + final Consumer seenRealmsToPurge; + final RetainCheck retainCheck; + final int deleteBatchSize; + final Consumer>> batchDelete; + final ScanItemCallback itemCallback; + + final Map> deletions = new HashMap<>(); + int numDeletes; + + ScanHandler( + String name, + OptionalInt rateLimit, + long maxCreatedAtMicros, + Set realmsToRetain, + Set realmsToPurge, + Consumer seenRealmsToPurge, + RetainCheck retainCheck, + int deleteBatchSize, + Consumer>> batchDelete, + ScanItemCallback itemCallback) { + this.name = name; + this.rateLimit = RateLimit.create(rateLimit.orElse(-1)); + this.maxCreatedAtMicros = maxCreatedAtMicros; + this.realmsToRetain = realmsToRetain; + this.realmsToPurge = realmsToPurge; + this.seenRealmsToPurge = seenRealmsToPurge; + this.retainCheck = retainCheck; + this.deleteBatchSize = deleteBatchSize; + this.batchDelete = batchDelete; + this.itemCallback = itemCallback; + } + + void scanned(String realmId, I id, long createdAtMicros) { + if 
(realmId.startsWith(SYSTEM_REALM_PREFIX) && !realmsToRetain.contains(realmId)) { + // some system realm, ignore + return; + } + + rateLimit.acquire(); + ScanItemOutcome outcome; + if (realmsToPurge.contains(realmId)) { + outcome = ScanItemOutcome.REALM_PURGE; + purge(realmId, id); + seenRealmsToPurge.accept(realmId); + } else if (createdAtMicros > maxCreatedAtMicros) { + outcome = ScanItemOutcome.TOO_NEW_RETAINED; + } else if (realmsToRetain.contains(realmId)) { + if (retainCheck.check(realmId, id)) { + outcome = ScanItemOutcome.RETAINED; + } else { + outcome = ScanItemOutcome.PURGED; + purge(realmId, id); + } + } else { + outcome = ScanItemOutcome.UNHANDLED_RETAINED; + } + itemCallback.itemOutcome(realmId, id, outcome); + LOGGER.debug( + "Got '{}' {} {} -> {}, createdAtMicros = {}", + realmId, + name, + id, + outcome.message, + createdAtMicros); + } + + private void purge(String realmId, I id) { + LOGGER.debug("Enqueuing delete for '{}' {}", realmId, id); + deletions.computeIfAbsent(realmId, k -> new HashSet<>()).add(id); + numDeletes++; + if (numDeletes == deleteBatchSize) { + flushDeletes(); + } + } + + private void flushDeletes() { + LOGGER.debug("Flushing {} {} deletions", numDeletes, name); + batchDelete.accept(deletions); + deletions.clear(); + numDeletes = 0; + } + + @Override + public void close() { + if (numDeletes > 0) { + flushDeletes(); + } + } + + public Backend.ObjScanCallback asObjScanCallback(LongSupplier clock) { + return new ProgressObjScanCallback(clock); + } + + public Backend.ReferenceScanCallback asReferenceScanCallback(LongSupplier clock) { + return new ProgressReferenceScanCallback(clock); + } + + @FunctionalInterface + interface RetainCheck { + boolean check(String realm, I id); + } + + private abstract static class ProgressCallback { + private final LongSupplier clock; + private long nextLog; + private long scanned; + + ProgressCallback(LongSupplier clock) { + this.clock = clock; + nextLog = clock.getAsLong() + 2_000L; + } + + protected 
void called(String what) { + var s = scanned++; + var now = clock.getAsLong(); + if (now >= nextLog) { + LOGGER.info("... scanned {} {} so far", s, what); + nextLog = now + 2_000L; + } + } + } + + private class ProgressReferenceScanCallback extends ProgressCallback + implements Backend.ReferenceScanCallback { + + ProgressReferenceScanCallback(LongSupplier clock) { + super(clock); + } + + @SuppressWarnings("unchecked") + @Override + public void call(@Nonnull String realmId, @Nonnull String refName, long createdAtMicros) { + called("references"); + ((ScanHandler) ScanHandler.this).scanned(realmId, refName, createdAtMicros); + } + } + + private class ProgressObjScanCallback extends ProgressCallback + implements Backend.ObjScanCallback { + ProgressObjScanCallback(LongSupplier clock) { + super(clock); + } + + @SuppressWarnings("unchecked") + @Override + public void call( + @Nonnull String realmId, + @Nonnull String type, + @Nonnull PersistId id, + long createdAtMicros) { + called("objects"); + ((ScanHandler) ScanHandler.this) + .scanned(realmId, objRef(type, id.id(), id.part()), createdAtMicros); + } + } +} diff --git a/persistence/nosql/persistence/maintenance/impl/src/main/java/org/apache/polaris/persistence/nosql/maintenance/impl/ScanItemCallback.java b/persistence/nosql/persistence/maintenance/impl/src/main/java/org/apache/polaris/persistence/nosql/maintenance/impl/ScanItemCallback.java new file mode 100644 index 0000000000..8d197cbb55 --- /dev/null +++ b/persistence/nosql/persistence/maintenance/impl/src/main/java/org/apache/polaris/persistence/nosql/maintenance/impl/ScanItemCallback.java @@ -0,0 +1,26 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.polaris.persistence.nosql.maintenance.impl; + +import jakarta.annotation.Nonnull; + +@FunctionalInterface +interface ScanItemCallback { + void itemOutcome(@Nonnull String realm, @Nonnull I id, @Nonnull ScanItemOutcome outcome); +} diff --git a/persistence/nosql/persistence/maintenance/impl/src/main/java/org/apache/polaris/persistence/nosql/maintenance/impl/ScanItemOutcome.java b/persistence/nosql/persistence/maintenance/impl/src/main/java/org/apache/polaris/persistence/nosql/maintenance/impl/ScanItemOutcome.java new file mode 100644 index 0000000000..f1a1565a25 --- /dev/null +++ b/persistence/nosql/persistence/maintenance/impl/src/main/java/org/apache/polaris/persistence/nosql/maintenance/impl/ScanItemOutcome.java @@ -0,0 +1,34 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
/** Decision taken for a single scanned item, together with a short text used in log output. */
enum ScanItemOutcome {
  /** The item was enqueued for deletion because its realm is being purged. */
  REALM_PURGE("realm purge"),

  /** The item was kept because it is newer than the maximum "created at" timestamp. */
  TOO_NEW_RETAINED("too new"),

  /** The item was kept because the retain check accepted it. */
  RETAINED("retained"),

  /** The item was enqueued for deletion because the retain check rejected it. */
  PURGED("purged"),

  /** The item was kept because its realm is neither purged nor processed. */
  UNHANDLED_RETAINED("unhandled/retained");

  /** Short, human-readable description of the outcome. */
  final String message;

  ScanItemOutcome(String text) {
    message = text;
  }
}
See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +/** + * Maintenance service implementation: do not directly use the types in this package. + * + *

Uses bloom filters to "collect" the references and objects to retain. The sizing of both + * filters uses the values of scanned references/objects of the last successful maintenance + * run, plus 10%. If no successful maintenance service run is present, the values of the maintenance + * configuration will be used. + */ +package org.apache.polaris.persistence.nosql.maintenance.impl; diff --git a/persistence/nosql/persistence/maintenance/impl/src/main/resources/META-INF/beans.xml b/persistence/nosql/persistence/maintenance/impl/src/main/resources/META-INF/beans.xml new file mode 100644 index 0000000000..a297f1aa53 --- /dev/null +++ b/persistence/nosql/persistence/maintenance/impl/src/main/resources/META-INF/beans.xml @@ -0,0 +1,24 @@ + + + + + \ No newline at end of file diff --git a/persistence/nosql/persistence/maintenance/impl/src/main/resources/META-INF/services/org.apache.polaris.persistence.nosql.api.obj.ObjType b/persistence/nosql/persistence/maintenance/impl/src/main/resources/META-INF/services/org.apache.polaris.persistence.nosql.api.obj.ObjType new file mode 100644 index 0000000000..e1050243b8 --- /dev/null +++ b/persistence/nosql/persistence/maintenance/impl/src/main/resources/META-INF/services/org.apache.polaris.persistence.nosql.api.obj.ObjType @@ -0,0 +1,21 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. 
See the License for the +# specific language governing permissions and limitations +# under the License. +# + +org.apache.polaris.persistence.nosql.maintenance.impl.MaintenanceRunsObj$MaintenanceRunsObjType +org.apache.polaris.persistence.nosql.maintenance.impl.MaintenanceRunObj$MaintenanceRunObjType diff --git a/persistence/nosql/persistence/maintenance/impl/src/test/java/org/apache/polaris/persistence/nosql/maintenance/impl/TestMaintenance.java b/persistence/nosql/persistence/maintenance/impl/src/test/java/org/apache/polaris/persistence/nosql/maintenance/impl/TestMaintenance.java new file mode 100644 index 0000000000..131065817f --- /dev/null +++ b/persistence/nosql/persistence/maintenance/impl/src/test/java/org/apache/polaris/persistence/nosql/maintenance/impl/TestMaintenance.java @@ -0,0 +1,393 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.polaris.persistence.nosql.maintenance.impl; + +import static org.apache.polaris.persistence.nosql.api.obj.ObjRef.objRef; +import static org.apache.polaris.persistence.nosql.maintenance.impl.MutableMaintenanceConfig.GRACE_TIME; + +import jakarta.inject.Inject; +import java.util.Optional; +import java.util.Set; +import java.util.UUID; +import java.util.concurrent.atomic.AtomicInteger; +import org.apache.polaris.ids.api.SnowflakeIdGenerator; +import org.apache.polaris.ids.mocks.MutableMonotonicClock; +import org.apache.polaris.persistence.nosql.api.Persistence; +import org.apache.polaris.persistence.nosql.api.RealmPersistenceFactory; +import org.apache.polaris.persistence.nosql.api.exceptions.ReferenceNotFoundException; +import org.apache.polaris.persistence.nosql.maintenance.api.MaintenanceConfig; +import org.apache.polaris.persistence.nosql.maintenance.api.MaintenanceRunInformation.MaintenanceStats; +import org.apache.polaris.persistence.nosql.maintenance.api.MaintenanceRunSpec; +import org.apache.polaris.persistence.nosql.maintenance.api.MaintenanceService; +import org.assertj.core.api.SoftAssertions; +import org.assertj.core.api.junit.jupiter.InjectSoftAssertions; +import org.assertj.core.api.junit.jupiter.SoftAssertionsExtension; +import org.jboss.weld.junit5.EnableWeld; +import org.jboss.weld.junit5.WeldInitiator; +import org.jboss.weld.junit5.WeldSetup; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; + +@SuppressWarnings("CdiInjectionPointsInspection") +@ExtendWith(SoftAssertionsExtension.class) +@EnableWeld +public class TestMaintenance { + @InjectSoftAssertions protected SoftAssertions soft; + + @WeldSetup WeldInitiator weld = WeldInitiator.performDefaultDiscovery(); + + String realmOne; + String realmTwo; + Persistence persOne; + Persistence persTwo; + + @Inject MaintenanceService maintenance; + @Inject 
RealmPersistenceFactory realmPersistenceFactory; + @Inject MutableMonotonicClock mutableMonotonicClock; + + @BeforeEach + protected void setup() { + RealmIdentOne.testCallback = c -> true; + RealmIdentTwo.testCallback = c -> true; + ObjTypeIdentOne.testCallback = (c, id) -> {}; + ObjTypeIdentTwo.testCallback = (c, id) -> {}; + + // Set the "grace time" to 0 so tests can write refs+objs and get those purged + MutableMaintenanceConfig.setCurrent( + MaintenanceConfig.builder().createdAtGraceTime(GRACE_TIME).build()); + + realmOne = UUID.randomUUID().toString(); + realmTwo = UUID.randomUUID().toString(); + + // 'skipDecorators' is used to bypass the cache, which cannot be consistent after maintenance + // purged some references/objects + persOne = realmPersistenceFactory.newBuilder().realmId(realmOne).skipDecorators().build(); + persTwo = realmPersistenceFactory.newBuilder().realmId(realmTwo).skipDecorators().build(); + } + + @AfterEach + protected void cleanup() { + mutableMonotonicClock.advanceBoth(GRACE_TIME); + + // Use maintenance to clean the backend for the next test + maintenance.performMaintenance( + MaintenanceRunSpec.builder() + .includeSystemRealm(false) + .realmsToPurge(Set.of(realmOne, realmTwo)) + .build()); + } + + @Test + public void noRealmsSpecified() { + persOne.write(ObjOne.builder().text("foo").id(persOne.generateId()).build(), ObjOne.class); + persTwo.write(ObjTwo.builder().text("bar").id(persTwo.generateId()).build(), ObjTwo.class); + + persOne.createReference("ref1", Optional.empty()); + persTwo.createReference("ref1", Optional.empty()); + + mutableMonotonicClock.advanceBoth(GRACE_TIME); + + // Run maintenance, no realm given to retain or purge, must not purge anything + var runInfo = + maintenance.performMaintenance( + MaintenanceRunSpec.builder().includeSystemRealm(false).build()); + + soft.assertThat(runInfo.referenceStats()) + .contains(MaintenanceStats.builder().scanned(2L).purged(0L).retained(2L).newer(0L).build()); + 
soft.assertThat(runInfo.objStats()) + .contains(MaintenanceStats.builder().scanned(2L).purged(0L).retained(2L).newer(0L).build()); + soft.assertThat(runInfo.identifiedReferences().orElse(-1)).isEqualTo(0L); + soft.assertThat(runInfo.identifiedObjs().orElse(-1)).isEqualTo(0L); + } + + @Test + public void simple() { + var rOneObj1 = + persOne.write(ObjOne.builder().text("foo").id(persOne.generateId()).build(), ObjOne.class); + var rTwoObj2 = + persTwo.write(ObjTwo.builder().text("bar").id(persTwo.generateId()).build(), ObjTwo.class); + + var systemRealmCalled = new AtomicInteger(); + RealmIdentOne.testCallback = + collector -> { + if (collector.isSystemRealm()) { + systemRealmCalled.incrementAndGet(); + } + return true; + }; + + persOne.createReference("ref1", Optional.empty()); + persOne.createReference("ref2", Optional.empty()); + persTwo.createReference("ref1", Optional.empty()); + persTwo.createReference("ref2", Optional.empty()); + + mutableMonotonicClock.advanceBoth(GRACE_TIME); + + // Run maintenance, provide realms, must purge unidentified + var runInfo = + maintenance.performMaintenance( + MaintenanceRunSpec.builder() + .includeSystemRealm(false) + .realmsToProcess(Set.of(realmOne, realmTwo)) + .build()); + + soft.assertThat(runInfo.referenceStats()) + .contains(MaintenanceStats.builder().scanned(4L).purged(4L).retained(0L).newer(0L).build()); + soft.assertThat(runInfo.objStats()) + .contains(MaintenanceStats.builder().scanned(2L).purged(2L).retained(0L).newer(0L).build()); + soft.assertThat(runInfo.identifiedReferences().orElse(-1)).isEqualTo(0L); + soft.assertThat(runInfo.identifiedObjs().orElse(-1)).isEqualTo(0L); + + soft.assertThatExceptionOfType(ReferenceNotFoundException.class) + .isThrownBy(() -> persOne.fetchReference("ref1")); + soft.assertThatExceptionOfType(ReferenceNotFoundException.class) + .isThrownBy(() -> persOne.fetchReference("ref2")); + soft.assertThatExceptionOfType(ReferenceNotFoundException.class) + .isThrownBy(() -> 
persTwo.fetchReference("ref1")); + soft.assertThatExceptionOfType(ReferenceNotFoundException.class) + .isThrownBy(() -> persTwo.fetchReference("ref2")); + + soft.assertThat(persOne.fetch(objRef(rOneObj1), ObjOne.class)).isNull(); + soft.assertThat(persTwo.fetch(objRef(rTwoObj2), ObjTwo.class)).isNull(); + } + + @Test + public void systemRealm() { + var systemRealmCalled = new AtomicInteger(); + RealmIdentOne.testCallback = + collector -> { + if (collector.isSystemRealm()) { + systemRealmCalled.incrementAndGet(); + } + return true; + }; + + mutableMonotonicClock.advanceBoth(GRACE_TIME); + + // Run maintenance, provide realms, must purge unidentified + var runInfo = + maintenance.performMaintenance( + MaintenanceRunSpec.builder() + .includeSystemRealm(true) // default + .build()); + + soft.assertThat(systemRealmCalled).hasValue(1); + + soft.assertThat(runInfo.referenceStats()) + .contains(MaintenanceStats.builder().scanned(1L).purged(0L).retained(0L).newer(1L).build()); + soft.assertThat(runInfo.objStats()) + .contains(MaintenanceStats.builder().scanned(4L).purged(0L).retained(0L).newer(4L).build()); + soft.assertThat(runInfo.identifiedReferences().orElse(-1)).isEqualTo(2L); + soft.assertThat(runInfo.identifiedObjs().orElse(-1)) + .isEqualTo((1 << SnowflakeIdGenerator.DEFAULT_NODE_ID_BITS) + 4L); + } + + @Test + public void simpleRetainViaRealmIdentifier() { + var rOneObj1 = + persOne.write(ObjOne.builder().text("foo").id(persOne.generateId()).build(), ObjOne.class); + var rTwoObj2 = + persTwo.write(ObjTwo.builder().text("bar").id(persTwo.generateId()).build(), ObjTwo.class); + + persOne.createReference("ref1", Optional.empty()); + persOne.createReference("ref2", Optional.empty()); + persTwo.createReference("ref1", Optional.empty()); + persTwo.createReference("ref2", Optional.empty()); + + // identify rOneObj1 as "live" + RealmIdentOne.testCallback = + c -> { + if (c.realm().equals(realmOne)) { + c.retainObject(objRef(rOneObj1)); + c.retainReference("ref1"); + } + 
return true; + }; + + mutableMonotonicClock.advanceBoth(GRACE_TIME); + + // Run maintenance, provide realms, must purge unidentified + var runInfo = + maintenance.performMaintenance( + MaintenanceRunSpec.builder() + .includeSystemRealm(false) + .realmsToProcess(Set.of(realmOne, realmTwo)) + .build()); + + soft.assertThat(runInfo.referenceStats()) + .contains(MaintenanceStats.builder().scanned(4L).purged(3L).retained(1L).newer(0L).build()); + soft.assertThat(runInfo.objStats()) + .contains(MaintenanceStats.builder().scanned(2L).purged(1L).retained(1L).newer(0L).build()); + soft.assertThat(runInfo.identifiedReferences().orElse(-1)).isEqualTo(1L); + soft.assertThat(runInfo.identifiedObjs().orElse(-1)).isEqualTo(1L); + + soft.assertThatCode(() -> persOne.fetchReference("ref1")).doesNotThrowAnyException(); + soft.assertThatExceptionOfType(ReferenceNotFoundException.class) + .isThrownBy(() -> persOne.fetchReference("ref2")); + soft.assertThatExceptionOfType(ReferenceNotFoundException.class) + .isThrownBy(() -> persTwo.fetchReference("ref1")); + soft.assertThatExceptionOfType(ReferenceNotFoundException.class) + .isThrownBy(() -> persTwo.fetchReference("ref2")); + + soft.assertThat(persOne.fetch(objRef(rOneObj1), ObjOne.class)).isEqualTo(rOneObj1); + soft.assertThat(persTwo.fetch(objRef(rTwoObj2), ObjTwo.class)).isNull(); + } + + @Test + public void simpleRetainViaRealmIdentifierPersistence() { + var rOneObj1 = + persOne.write(ObjOne.builder().text("foo").id(persOne.generateId()).build(), ObjOne.class); + var rTwoObj2 = + persTwo.write(ObjTwo.builder().text("bar").id(persTwo.generateId()).build(), ObjTwo.class); + + persOne.createReference("ref1", Optional.empty()); + persOne.createReference("ref2", Optional.empty()); + persTwo.createReference("ref1", Optional.empty()); + persTwo.createReference("ref2", Optional.empty()); + + // identify rOneObj1 as "live" + RealmIdentOne.testCallback = + c -> { + if (c.realm().equals(realmOne)) { + 
c.realmPersistence().fetch(objRef(rOneObj1), ObjOne.class); + c.realmPersistence().fetchReference("ref1"); + } + return true; + }; + + mutableMonotonicClock.advanceBoth(GRACE_TIME); + + // Run maintenance, provide realms, must purge unidentified + var runInfo = + maintenance.performMaintenance( + MaintenanceRunSpec.builder() + .includeSystemRealm(false) + .realmsToProcess(Set.of(realmOne, realmTwo)) + .build()); + + soft.assertThat(runInfo.referenceStats()) + .contains(MaintenanceStats.builder().scanned(4L).purged(3L).retained(1L).newer(0L).build()); + soft.assertThat(runInfo.objStats()) + .contains(MaintenanceStats.builder().scanned(2L).purged(1L).retained(1L).newer(0L).build()); + soft.assertThat(runInfo.identifiedReferences().orElse(-1)).isEqualTo(1L); + soft.assertThat(runInfo.identifiedObjs().orElse(-1)).isEqualTo(1L); + + soft.assertThatCode(() -> persOne.fetchReference("ref1")).doesNotThrowAnyException(); + soft.assertThatExceptionOfType(ReferenceNotFoundException.class) + .isThrownBy(() -> persOne.fetchReference("ref2")); + soft.assertThatExceptionOfType(ReferenceNotFoundException.class) + .isThrownBy(() -> persTwo.fetchReference("ref1")); + soft.assertThatExceptionOfType(ReferenceNotFoundException.class) + .isThrownBy(() -> persTwo.fetchReference("ref2")); + + soft.assertThat(persOne.fetch(objRef(rOneObj1), ObjOne.class)).isEqualTo(rOneObj1); + soft.assertThat(persTwo.fetch(objRef(rTwoObj2), ObjTwo.class)).isNull(); + } + + @Test + public void simpleRetainViaObjTypeIdentifier() { + var rOneObj1 = + persOne.write(ObjOne.builder().text("foo1").id(persOne.generateId()).build(), ObjOne.class); + var rOneObj2 = + persOne.write(ObjTwo.builder().text("foo2").id(persOne.generateId()).build(), ObjTwo.class); + var rTwoObj1 = + persTwo.write(ObjOne.builder().text("bar2").id(persTwo.generateId()).build(), ObjOne.class); + var rTwoObj2 = + persTwo.write(ObjTwo.builder().text("bar2").id(persTwo.generateId()).build(), ObjTwo.class); + + // identify rOneObj1 as "live" + 
RealmIdentOne.testCallback = + c -> { + c.retainObject(objRef(rOneObj1)); + return true; + }; + // identify rObjObj2 via obj-type callback + ObjTypeIdentOne.testCallback = + (c, id) -> { + if (id.equals(objRef(rOneObj1))) { + c.retainObject(objRef(rOneObj2)); + } + }; + // identify rTwoObj1 + RealmIdentTwo.testCallback = + c -> { + c.retainObject(objRef(rTwoObj1)); + return true; + }; + + mutableMonotonicClock.advanceBoth(GRACE_TIME); + + // Run maintenance, provide realms, must purge unidentified + var runInfo = + maintenance.performMaintenance( + MaintenanceRunSpec.builder() + .includeSystemRealm(false) + .realmsToProcess(Set.of(realmOne, realmTwo)) + .build()); + + soft.assertThat(runInfo.referenceStats()) + .contains(MaintenanceStats.builder().scanned(0L).purged(0L).retained(0L).newer(0L).build()); + soft.assertThat(runInfo.objStats()) + .contains(MaintenanceStats.builder().scanned(4L).purged(1L).retained(3L).newer(0L).build()); + soft.assertThat(runInfo.identifiedReferences().orElse(-1)).isEqualTo(0L); + soft.assertThat(runInfo.identifiedObjs().orElse(-1)).isEqualTo(6L); + + soft.assertThat(persOne.fetch(objRef(rOneObj1), ObjOne.class)).isEqualTo(rOneObj1); + soft.assertThat(persOne.fetch(objRef(rOneObj2), ObjTwo.class)).isEqualTo(rOneObj2); + soft.assertThat(persTwo.fetch(objRef(rTwoObj1), ObjOne.class)).isEqualTo(rTwoObj1); + soft.assertThat(persTwo.fetch(objRef(rTwoObj2), ObjTwo.class)).isNull(); + } + + @Test + public void noRealmIdentifierHandlesRealms() { + var rOneObj1 = + persOne.write(ObjOne.builder().text("foo").id(persOne.generateId()).build(), ObjOne.class); + var rTwoObj2 = + persTwo.write(ObjTwo.builder().text("bar").id(persTwo.generateId()).build(), ObjTwo.class); + persOne.createReference("ref1", Optional.empty()); + persOne.createReference("ref2", Optional.empty()); + persTwo.createReference("ref1", Optional.empty()); + persTwo.createReference("ref2", Optional.empty()); + + RealmIdentOne.testCallback = c -> false; + RealmIdentTwo.testCallback 
= c -> false; + + mutableMonotonicClock.advanceBoth(GRACE_TIME); + + // Run maintenance, provide realms, no realm-identifier handles realm, must NOT purge + var runInfo = + maintenance.performMaintenance( + MaintenanceRunSpec.builder() + .includeSystemRealm(false) + .realmsToProcess(Set.of(realmOne, realmTwo)) + .build()); + + soft.assertThat(runInfo.referenceStats()) + .contains(MaintenanceStats.builder().scanned(4L).purged(0L).retained(4L).newer(0L).build()); + soft.assertThat(runInfo.objStats()) + .contains(MaintenanceStats.builder().scanned(2L).purged(0L).retained(2L).newer(0L).build()); + soft.assertThat(runInfo.identifiedReferences().orElse(-1)).isEqualTo(0L); + soft.assertThat(runInfo.identifiedObjs().orElse(-1)).isEqualTo(0L); + + soft.assertThat(persOne.fetch(objRef(rOneObj1), ObjOne.class)).isEqualTo(rOneObj1); + soft.assertThat(persTwo.fetch(objRef(rTwoObj2), ObjTwo.class)).isEqualTo(rTwoObj2); + } +} diff --git a/persistence/nosql/persistence/maintenance/impl/src/test/resources/weld.properties b/persistence/nosql/persistence/maintenance/impl/src/test/resources/weld.properties new file mode 100644 index 0000000000..c26169e0e1 --- /dev/null +++ b/persistence/nosql/persistence/maintenance/impl/src/test/resources/weld.properties @@ -0,0 +1,21 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. 
See the License for the +# specific language governing permissions and limitations +# under the License. +# + +# See https://bugs.openjdk.org/browse/JDK-8349545 +org.jboss.weld.bootstrap.concurrentDeployment=false diff --git a/persistence/nosql/persistence/maintenance/impl/src/testFixtures/java/org/apache/polaris/persistence/nosql/maintenance/impl/MaintenanceConfigurationProducer.java b/persistence/nosql/persistence/maintenance/impl/src/testFixtures/java/org/apache/polaris/persistence/nosql/maintenance/impl/MaintenanceConfigurationProducer.java new file mode 100644 index 0000000000..3e283e9557 --- /dev/null +++ b/persistence/nosql/persistence/maintenance/impl/src/testFixtures/java/org/apache/polaris/persistence/nosql/maintenance/impl/MaintenanceConfigurationProducer.java @@ -0,0 +1,33 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.polaris.persistence.nosql.maintenance.impl; + +import jakarta.enterprise.context.ApplicationScoped; +import jakarta.enterprise.inject.Produces; +import org.apache.polaris.persistence.nosql.maintenance.api.MaintenanceConfig; + +@ApplicationScoped +public class MaintenanceConfigurationProducer { + public static MutableMaintenanceConfig config = new MutableMaintenanceConfig(); + + @Produces + MaintenanceConfig produceMaintenanceConfig() { + return config; + } +} diff --git a/persistence/nosql/persistence/maintenance/impl/src/testFixtures/java/org/apache/polaris/persistence/nosql/maintenance/impl/MutableMaintenanceConfig.java b/persistence/nosql/persistence/maintenance/impl/src/testFixtures/java/org/apache/polaris/persistence/nosql/maintenance/impl/MutableMaintenanceConfig.java new file mode 100644 index 0000000000..d2bbc31ad8 --- /dev/null +++ b/persistence/nosql/persistence/maintenance/impl/src/testFixtures/java/org/apache/polaris/persistence/nosql/maintenance/impl/MutableMaintenanceConfig.java @@ -0,0 +1,102 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.polaris.persistence.nosql.maintenance.impl; + +import static org.apache.polaris.persistence.nosql.maintenance.impl.MaintenanceServiceImpl.MIN_GRACE_TIME_MINUTES; + +import com.fasterxml.jackson.annotation.JsonFormat; +import com.fasterxml.jackson.annotation.JsonInclude; +import java.time.Duration; +import java.util.Optional; +import java.util.OptionalDouble; +import java.util.OptionalInt; +import java.util.OptionalLong; +import org.apache.polaris.persistence.nosql.maintenance.api.MaintenanceConfig; + +public class MutableMaintenanceConfig implements MaintenanceConfig { + /** Minimum allowed by MaintenanceServiceImpl. */ + public static final Duration GRACE_TIME = Duration.ofMinutes(MIN_GRACE_TIME_MINUTES); + + private static MaintenanceConfig current = MaintenanceConfig.builder().build(); + + public static void setCurrent(MaintenanceConfig config) { + current = config; + } + + @JsonInclude(JsonInclude.Include.NON_ABSENT) + @Override + public OptionalLong expectedReferenceCount() { + return current.expectedReferenceCount(); + } + + @JsonInclude(JsonInclude.Include.NON_ABSENT) + @Override + public OptionalLong expectedObjCount() { + return current.expectedObjCount(); + } + + @JsonInclude(JsonInclude.Include.NON_ABSENT) + @Override + public OptionalDouble countFromLastRunMultiplier() { + return current.countFromLastRunMultiplier(); + } + + @JsonInclude(JsonInclude.Include.NON_ABSENT) + @Override + public OptionalDouble filterInitializedFpp() { + return current.filterInitializedFpp(); + } + + @JsonInclude(JsonInclude.Include.NON_ABSENT) + @Override + public OptionalDouble maxAcceptableFilterFpp() { + return current.maxAcceptableFilterFpp(); + } + + @JsonInclude(JsonInclude.Include.NON_ABSENT) + @Override + public OptionalInt retainedRuns() { + return current.retainedRuns(); + } + + @JsonInclude(JsonInclude.Include.NON_ABSENT) + @JsonFormat(shape = JsonFormat.Shape.STRING) + @Override + public Optional createdAtGraceTime() { + return 
current.createdAtGraceTime(); + } + + @JsonInclude(JsonInclude.Include.NON_ABSENT) + @Override + public OptionalInt objectScanRateLimitPerSecond() { + return current.objectScanRateLimitPerSecond(); + } + + @JsonInclude(JsonInclude.Include.NON_ABSENT) + @Override + public OptionalInt referenceScanRateLimitPerSecond() { + return current.referenceScanRateLimitPerSecond(); + } + + @JsonInclude(JsonInclude.Include.NON_ABSENT) + @Override + public OptionalInt deleteBatchSize() { + return current.deleteBatchSize(); + } +} diff --git a/persistence/nosql/persistence/maintenance/impl/src/testFixtures/java/org/apache/polaris/persistence/nosql/maintenance/impl/ObjOne.java b/persistence/nosql/persistence/maintenance/impl/src/testFixtures/java/org/apache/polaris/persistence/nosql/maintenance/impl/ObjOne.java new file mode 100644 index 0000000000..72afcc0767 --- /dev/null +++ b/persistence/nosql/persistence/maintenance/impl/src/testFixtures/java/org/apache/polaris/persistence/nosql/maintenance/impl/ObjOne.java @@ -0,0 +1,53 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.polaris.persistence.nosql.maintenance.impl; + +import com.fasterxml.jackson.databind.annotation.JsonDeserialize; +import com.fasterxml.jackson.databind.annotation.JsonSerialize; +import jakarta.annotation.Nullable; +import org.apache.polaris.immutables.PolarisImmutable; +import org.apache.polaris.persistence.nosql.api.obj.AbstractObjType; +import org.apache.polaris.persistence.nosql.api.obj.Obj; +import org.apache.polaris.persistence.nosql.api.obj.ObjType; + +@PolarisImmutable +@JsonSerialize(as = ImmutableObjOne.class) +@JsonDeserialize(as = ImmutableObjOne.class) +public interface ObjOne extends Obj { + + ObjType TYPE = new ObjOneType(); + + @Override + default ObjType type() { + return TYPE; + } + + @Nullable + String text(); + + static ImmutableObjOne.Builder builder() { + return ImmutableObjOne.builder(); + } + + final class ObjOneType extends AbstractObjType { + public ObjOneType() { + super("maint-test-one", "maint-one", ObjOne.class); + } + } +} diff --git a/persistence/nosql/persistence/maintenance/impl/src/testFixtures/java/org/apache/polaris/persistence/nosql/maintenance/impl/ObjTwo.java b/persistence/nosql/persistence/maintenance/impl/src/testFixtures/java/org/apache/polaris/persistence/nosql/maintenance/impl/ObjTwo.java new file mode 100644 index 0000000000..571e5d20ca --- /dev/null +++ b/persistence/nosql/persistence/maintenance/impl/src/testFixtures/java/org/apache/polaris/persistence/nosql/maintenance/impl/ObjTwo.java @@ -0,0 +1,53 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.polaris.persistence.nosql.maintenance.impl; + +import com.fasterxml.jackson.databind.annotation.JsonDeserialize; +import com.fasterxml.jackson.databind.annotation.JsonSerialize; +import jakarta.annotation.Nullable; +import org.apache.polaris.immutables.PolarisImmutable; +import org.apache.polaris.persistence.nosql.api.obj.AbstractObjType; +import org.apache.polaris.persistence.nosql.api.obj.Obj; +import org.apache.polaris.persistence.nosql.api.obj.ObjType; + +@PolarisImmutable +@JsonSerialize(as = ImmutableObjTwo.class) +@JsonDeserialize(as = ImmutableObjTwo.class) +public interface ObjTwo extends Obj { + + ObjType TYPE = new ObjTwoType(); + + @Override + default ObjType type() { + return TYPE; + } + + @Nullable + String text(); + + static ImmutableObjTwo.Builder builder() { + return ImmutableObjTwo.builder(); + } + + final class ObjTwoType extends AbstractObjType { + public ObjTwoType() { + super("maint-test-two", "maint-two", ObjTwo.class); + } + } +} diff --git a/persistence/nosql/persistence/maintenance/impl/src/testFixtures/java/org/apache/polaris/persistence/nosql/maintenance/impl/ObjTypeIdentOne.java b/persistence/nosql/persistence/maintenance/impl/src/testFixtures/java/org/apache/polaris/persistence/nosql/maintenance/impl/ObjTypeIdentOne.java new file mode 100644 index 0000000000..be9ef1cd33 --- /dev/null +++ b/persistence/nosql/persistence/maintenance/impl/src/testFixtures/java/org/apache/polaris/persistence/nosql/maintenance/impl/ObjTypeIdentOne.java @@ -0,0 +1,51 @@ +/* + * Licensed to the Apache Software 
Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.polaris.persistence.nosql.maintenance.impl; + +import jakarta.annotation.Nonnull; +import jakarta.enterprise.context.ApplicationScoped; +import java.util.function.BiConsumer; +import org.apache.polaris.persistence.nosql.api.obj.ObjRef; +import org.apache.polaris.persistence.nosql.api.obj.ObjType; +import org.apache.polaris.persistence.nosql.maintenance.spi.ObjTypeRetainedIdentifier; +import org.apache.polaris.persistence.nosql.maintenance.spi.RetainedCollector; + +@ApplicationScoped +public class ObjTypeIdentOne implements ObjTypeRetainedIdentifier { + + static BiConsumer testCallback; + + @Override + public String name() { + return "TEST ObjTypeRetainedIdentifier ONE"; + } + + @Nonnull + @Override + public ObjType handledObjType() { + return ObjOne.TYPE; + } + + @Override + public void identifyRelatedObj(@Nonnull RetainedCollector collector, @Nonnull ObjRef objRef) { + if (testCallback != null) { + testCallback.accept(collector, objRef); + } + } +} diff --git a/persistence/nosql/persistence/maintenance/impl/src/testFixtures/java/org/apache/polaris/persistence/nosql/maintenance/impl/ObjTypeIdentTwo.java 
b/persistence/nosql/persistence/maintenance/impl/src/testFixtures/java/org/apache/polaris/persistence/nosql/maintenance/impl/ObjTypeIdentTwo.java new file mode 100644 index 0000000000..9052a38825 --- /dev/null +++ b/persistence/nosql/persistence/maintenance/impl/src/testFixtures/java/org/apache/polaris/persistence/nosql/maintenance/impl/ObjTypeIdentTwo.java @@ -0,0 +1,50 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.polaris.persistence.nosql.maintenance.impl; + +import jakarta.annotation.Nonnull; +import jakarta.enterprise.context.ApplicationScoped; +import java.util.function.BiConsumer; +import org.apache.polaris.persistence.nosql.api.obj.ObjRef; +import org.apache.polaris.persistence.nosql.api.obj.ObjType; +import org.apache.polaris.persistence.nosql.maintenance.spi.ObjTypeRetainedIdentifier; +import org.apache.polaris.persistence.nosql.maintenance.spi.RetainedCollector; + +@ApplicationScoped +public class ObjTypeIdentTwo implements ObjTypeRetainedIdentifier { + static BiConsumer testCallback; + + @Override + public String name() { + return "TEST ObjTypeRetainedIdentifier TWO"; + } + + @Nonnull + @Override + public ObjType handledObjType() { + return ObjTwo.TYPE; + } + + @Override + public void identifyRelatedObj(@Nonnull RetainedCollector collector, @Nonnull ObjRef objRef) { + if (testCallback != null) { + testCallback.accept(collector, objRef); + } + } +} diff --git a/persistence/nosql/persistence/maintenance/impl/src/testFixtures/java/org/apache/polaris/persistence/nosql/maintenance/impl/RealmIdentOne.java b/persistence/nosql/persistence/maintenance/impl/src/testFixtures/java/org/apache/polaris/persistence/nosql/maintenance/impl/RealmIdentOne.java new file mode 100644 index 0000000000..229aa2ac43 --- /dev/null +++ b/persistence/nosql/persistence/maintenance/impl/src/testFixtures/java/org/apache/polaris/persistence/nosql/maintenance/impl/RealmIdentOne.java @@ -0,0 +1,40 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.polaris.persistence.nosql.maintenance.impl; + +import jakarta.annotation.Nonnull; +import jakarta.enterprise.context.ApplicationScoped; +import java.util.function.Function; +import org.apache.polaris.persistence.nosql.maintenance.spi.PerRealmRetainedIdentifier; +import org.apache.polaris.persistence.nosql.maintenance.spi.RetainedCollector; + +@ApplicationScoped +public class RealmIdentOne implements PerRealmRetainedIdentifier { + static Function testCallback; + + @Override + public String name() { + return "TEST RealmRetainedIdentifier ONE"; + } + + @Override + public boolean identifyRetained(@Nonnull RetainedCollector collector) { + return testCallback != null ? testCallback.apply(collector) : false; + } +} diff --git a/persistence/nosql/persistence/maintenance/impl/src/testFixtures/java/org/apache/polaris/persistence/nosql/maintenance/impl/RealmIdentTwo.java b/persistence/nosql/persistence/maintenance/impl/src/testFixtures/java/org/apache/polaris/persistence/nosql/maintenance/impl/RealmIdentTwo.java new file mode 100644 index 0000000000..22f2a61db8 --- /dev/null +++ b/persistence/nosql/persistence/maintenance/impl/src/testFixtures/java/org/apache/polaris/persistence/nosql/maintenance/impl/RealmIdentTwo.java @@ -0,0 +1,40 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.polaris.persistence.nosql.maintenance.impl; + +import jakarta.annotation.Nonnull; +import jakarta.enterprise.context.ApplicationScoped; +import java.util.function.Function; +import org.apache.polaris.persistence.nosql.maintenance.spi.PerRealmRetainedIdentifier; +import org.apache.polaris.persistence.nosql.maintenance.spi.RetainedCollector; + +@ApplicationScoped +public class RealmIdentTwo implements PerRealmRetainedIdentifier { + static Function testCallback; + + @Override + public String name() { + return "TEST RealmRetainedIdentifier TWO"; + } + + @Override + public boolean identifyRetained(@Nonnull RetainedCollector collector) { + return testCallback != null ? testCallback.apply(collector) : false; + } +} diff --git a/persistence/nosql/persistence/maintenance/impl/src/testFixtures/java/org/apache/polaris/persistence/nosql/maintenance/impl/package-info.java b/persistence/nosql/persistence/maintenance/impl/src/testFixtures/java/org/apache/polaris/persistence/nosql/maintenance/impl/package-info.java new file mode 100644 index 0000000000..b486d52fae --- /dev/null +++ b/persistence/nosql/persistence/maintenance/impl/src/testFixtures/java/org/apache/polaris/persistence/nosql/maintenance/impl/package-info.java @@ -0,0 +1,20 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.polaris.persistence.nosql.maintenance.impl; diff --git a/persistence/nosql/persistence/maintenance/impl/src/testFixtures/resources/META-INF/beans.xml b/persistence/nosql/persistence/maintenance/impl/src/testFixtures/resources/META-INF/beans.xml new file mode 100644 index 0000000000..a297f1aa53 --- /dev/null +++ b/persistence/nosql/persistence/maintenance/impl/src/testFixtures/resources/META-INF/beans.xml @@ -0,0 +1,24 @@ + + + + + \ No newline at end of file diff --git a/persistence/nosql/persistence/maintenance/impl/src/testFixtures/resources/META-INF/services/org.apache.polaris.persistence.nosql.api.obj.ObjType b/persistence/nosql/persistence/maintenance/impl/src/testFixtures/resources/META-INF/services/org.apache.polaris.persistence.nosql.api.obj.ObjType new file mode 100644 index 0000000000..c2c5d5f950 --- /dev/null +++ b/persistence/nosql/persistence/maintenance/impl/src/testFixtures/resources/META-INF/services/org.apache.polaris.persistence.nosql.api.obj.ObjType @@ -0,0 +1,21 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. 
The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +org.apache.polaris.persistence.nosql.maintenance.impl.ObjOne$ObjOneType +org.apache.polaris.persistence.nosql.maintenance.impl.ObjTwo$ObjTwoType