Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,6 @@
import org.apache.polaris.service.admin.PolarisAdminService;
import org.apache.polaris.service.catalog.PolarisPassthroughResolutionView;
import org.apache.polaris.service.catalog.iceberg.IcebergCatalog;
import org.apache.polaris.service.catalog.io.DefaultFileIOFactory;
import org.apache.polaris.service.catalog.io.FileIOFactory;
import org.apache.polaris.service.catalog.io.StorageAccessConfigProvider;
import org.apache.polaris.service.config.ReservedProperties;
Expand Down Expand Up @@ -106,14 +105,14 @@ public abstract class AbstractPolarisGenericTableCatalogTest {
@Inject CallContext callContext;
@Inject RealmConfig realmConfig;
@Inject StorageAccessConfigProvider storageAccessConfigProvider;
@Inject FileIOFactory fileIOFactory;

private PolarisGenericTableCatalog genericTableCatalog;
private IcebergCatalog icebergCatalog;
private AwsStorageConfigInfo storageConfigModel;
private String realmName;
private PolarisCallContext polarisContext;
private PolarisAdminService adminService;
private FileIOFactory fileIOFactory;
private PolarisPrincipal authenticatedRoot;
private PolarisEntity catalogEntity;

Expand Down Expand Up @@ -195,7 +194,6 @@ public void before(TestInfo testInfo) {
new PolarisPassthroughResolutionView(
resolutionManifestFactory, authenticatedRoot, CATALOG_NAME);
TaskExecutor taskExecutor = Mockito.mock();
this.fileIOFactory = new DefaultFileIOFactory();

StsClient stsClient = Mockito.mock(StsClient.class);
when(stsClient.assumeRole(isA(AssumeRoleRequest.class)))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -134,7 +134,6 @@
import org.apache.polaris.service.admin.PolarisAdminService;
import org.apache.polaris.service.catalog.PolarisPassthroughResolutionView;
import org.apache.polaris.service.catalog.Profiles;
import org.apache.polaris.service.catalog.io.DefaultFileIOFactory;
import org.apache.polaris.service.catalog.io.ExceptionMappingFileIO;
import org.apache.polaris.service.catalog.io.FileIOFactory;
import org.apache.polaris.service.catalog.io.MeasuredFileIOFactory;
Expand Down Expand Up @@ -238,13 +237,14 @@ public Map<String, String> getConfigOverrides() {
@Inject RealmConfig realmConfig;
@Inject ResolutionManifestFactory resolutionManifestFactory;
@Inject StorageAccessConfigProvider storageAccessConfigProvider;
@Inject FileIOFactory fileIOFactory;
@Inject TaskFileIOSupplier taskFileIOSupplier;

private IcebergCatalog catalog;
private String realmName;
private PolarisCallContext polarisContext;
private PolarisAdminService adminService;
private ResolverFactory resolverFactory;
private FileIOFactory fileIOFactory;
private InMemoryFileIO fileIO;
private PolarisEntity catalogEntity;
private PolarisPrincipal authenticatedRoot;
Expand Down Expand Up @@ -338,8 +338,6 @@ public void before(TestInfo testInfo) {
.build()
.asCatalog(serviceIdentityProvider)));

this.fileIOFactory = new DefaultFileIOFactory();

StsClient stsClient = Mockito.mock(StsClient.class);
when(stsClient.assumeRole(isA(AssumeRoleRequest.class)))
.thenReturn(
Expand Down Expand Up @@ -983,7 +981,7 @@ public void testValidateNotificationFailToCreateFileIO() {
// filename.
final String tableLocation = "s3://externally-owned-bucket/validate_table/";
final String tableMetadataLocation = tableLocation + "metadata/";
FileIOFactory fileIOFactory = spy(new DefaultFileIOFactory());
FileIOFactory fileIOFactory = spy(this.fileIOFactory);
IcebergCatalog catalog = newIcebergCatalog(catalog().name(), metaStoreManager, fileIOFactory);
catalog.initialize(
CATALOG_NAME,
Expand Down Expand Up @@ -1899,9 +1897,7 @@ public void testDropTableWithPurge() {
.containsEntry(StorageAccessProperty.AWS_KEY_ID.getPropertyName(), TEST_ACCESS_KEY)
.containsEntry(StorageAccessProperty.AWS_SECRET_KEY.getPropertyName(), SECRET_ACCESS_KEY)
.containsEntry(StorageAccessProperty.AWS_TOKEN.getPropertyName(), SESSION_TOKEN);
FileIO fileIO =
new TaskFileIOSupplier(new DefaultFileIOFactory(), storageAccessConfigProvider)
.apply(taskEntity, TABLE);
FileIO fileIO = taskFileIOSupplier.apply(taskEntity, TABLE);
Assertions.assertThat(fileIO).isNotNull().isInstanceOf(ExceptionMappingFileIO.class);
Assertions.assertThat(((ExceptionMappingFileIO) fileIO).getInnerIo())
.isInstanceOf(InMemoryFileIO.class);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,6 @@
import org.apache.polaris.service.admin.PolarisAdminService;
import org.apache.polaris.service.catalog.PolarisPassthroughResolutionView;
import org.apache.polaris.service.catalog.Profiles;
import org.apache.polaris.service.catalog.io.DefaultFileIOFactory;
import org.apache.polaris.service.catalog.io.FileIOFactory;
import org.apache.polaris.service.catalog.io.StorageAccessConfigProvider;
import org.apache.polaris.service.config.ReservedProperties;
Expand Down Expand Up @@ -113,6 +112,7 @@ public Map<String, String> getConfigOverrides() {
@Inject CallContext callContext;
@Inject RealmConfig realmConfig;
@Inject StorageAccessConfigProvider storageAccessConfigProvider;
@Inject FileIOFactory fileIOFactory;

private IcebergCatalog catalog;

Expand Down Expand Up @@ -189,7 +189,6 @@ public void before(TestInfo testInfo) {
PolarisPassthroughResolutionView passthroughView =
new PolarisPassthroughResolutionView(
resolutionManifestFactory, authenticatedRoot, CATALOG_NAME);
FileIOFactory fileIOFactory = new DefaultFileIOFactory();

testPolarisEventListener = (TestPolarisEventListener) polarisEventListener;
testPolarisEventListener.clear();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,6 @@
import org.apache.polaris.core.persistence.dao.entity.CreatePrincipalResult;
import org.apache.polaris.core.persistence.resolver.PolarisResolutionManifest;
import org.apache.polaris.service.admin.PolarisAuthzTestBase;
import org.apache.polaris.service.catalog.io.DefaultFileIOFactory;
import org.apache.polaris.service.context.catalog.CallContextCatalogFactory;
import org.apache.polaris.service.context.catalog.PolarisCallContextCatalogFactory;
import org.apache.polaris.service.http.IfNoneMatch;
Expand Down Expand Up @@ -1891,7 +1890,7 @@ public void testSendNotificationSufficientPrivileges() {
resolverFactory,
Mockito.mock(),
storageAccessConfigProvider,
new DefaultFileIOFactory(),
fileIOFactory,
polarisEventListener,
metaStoreManager,
callContext,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,6 @@
import org.apache.polaris.service.admin.PolarisAdminService;
import org.apache.polaris.service.catalog.PolarisPassthroughResolutionView;
import org.apache.polaris.service.catalog.iceberg.IcebergCatalog;
import org.apache.polaris.service.catalog.io.DefaultFileIOFactory;
import org.apache.polaris.service.catalog.io.FileIOFactory;
import org.apache.polaris.service.catalog.io.StorageAccessConfigProvider;
import org.apache.polaris.service.config.ReservedProperties;
Expand Down Expand Up @@ -132,14 +131,14 @@ public abstract class AbstractPolicyCatalogTest {
@Inject CallContext callContext;
@Inject RealmConfig realmConfig;
@Inject StorageAccessConfigProvider storageAccessConfigProvider;
@Inject FileIOFactory fileIOFactory;

private PolicyCatalog policyCatalog;
private IcebergCatalog icebergCatalog;
private AwsStorageConfigInfo storageConfigModel;
private String realmName;
private PolarisCallContext polarisContext;
private PolarisAdminService adminService;
private FileIOFactory fileIOFactory;
private PolarisPrincipal authenticatedRoot;
private PolarisEntity catalogEntity;

Expand Down Expand Up @@ -214,7 +213,6 @@ public void before(TestInfo testInfo) {
new PolarisPassthroughResolutionView(
resolutionManifestFactory, authenticatedRoot, CATALOG_NAME);
TaskExecutor taskExecutor = Mockito.mock();
this.fileIOFactory = new DefaultFileIOFactory();

StsClient stsClient = Mockito.mock(StsClient.class);
when(stsClient.assumeRole(isA(AssumeRoleRequest.class)))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatPredicate;

import io.quarkus.test.InjectMock;
import io.quarkus.test.junit.QuarkusMock;
import io.quarkus.test.junit.QuarkusTest;
import jakarta.inject.Inject;
import java.io.IOException;
Expand All @@ -30,6 +32,7 @@
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Stream;
Expand All @@ -43,34 +46,43 @@
import org.apache.iceberg.inmemory.InMemoryFileIO;
import org.apache.iceberg.io.FileIO;
import org.apache.polaris.core.PolarisCallContext;
import org.apache.polaris.core.context.CallContext;
import org.apache.polaris.core.context.RealmContext;
import org.apache.polaris.core.entity.AsyncTaskType;
import org.apache.polaris.core.entity.TaskEntity;
import org.apache.polaris.core.persistence.BasePersistence;
import org.apache.polaris.core.persistence.MetaStoreManagerFactory;
import org.apache.polaris.service.TestFileIOFactory;
import org.apache.polaris.service.catalog.io.StorageAccessConfigProvider;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.mockito.Mockito;

@QuarkusTest
public class BatchFileCleanupTaskHandlerTest {
@Inject MetaStoreManagerFactory metaStoreManagerFactory;
@Inject CallContext callContext;
@InjectMock TaskFileIOSupplier taskFileIOSupplier;

private final RealmContext realmContext = () -> "realmName";
private PolarisCallContext polarisCallContext;
private ExecutorService executor;

@BeforeEach
public void beforeEach() {
QuarkusMock.installMockForType(realmContext, RealmContext.class);
polarisCallContext = callContext.getPolarisCallContext();
executor = Executors.newSingleThreadExecutor();
}

private TaskFileIOSupplier buildTaskFileIOSupplier(FileIO fileIO) {
return new TaskFileIOSupplier(
new TestFileIOFactory(fileIO), Mockito.mock(StorageAccessConfigProvider.class));
@AfterEach
public void afterEach() {
executor.close();
}

private PolarisCallContext newCallContext() {
BasePersistence metaStore = metaStoreManagerFactory.getOrCreateSession(realmContext);
return new PolarisCallContext(realmContext, metaStore);
private BatchFileCleanupTaskHandler newBatchFileCleanupTaskHandler(FileIO fileIO) {
Mockito.when(taskFileIOSupplier.apply(Mockito.any(), Mockito.any())).thenReturn(fileIO);
return new BatchFileCleanupTaskHandler(taskFileIOSupplier, executor);
}

@Test
public void testMetadataFileCleanup() throws IOException {
PolarisCallContext polarisCallContext = newCallContext();
FileIO fileIO =
new InMemoryFileIO() {
@Override
Expand All @@ -79,9 +91,7 @@ public void close() {
}
};
TableIdentifier tableIdentifier = TableIdentifier.of(Namespace.of("db1", "schema1"), "table1");
BatchFileCleanupTaskHandler handler =
new BatchFileCleanupTaskHandler(
buildTaskFileIOSupplier(fileIO), Executors.newSingleThreadExecutor());
BatchFileCleanupTaskHandler handler = newBatchFileCleanupTaskHandler(fileIO);

long snapshotId1 = 100L;
ManifestFile manifestFile1 =
Expand Down Expand Up @@ -179,12 +189,9 @@ public void close() {

@Test
public void testMetadataFileCleanupIfFileNotExist() throws IOException {
PolarisCallContext polarisCallContext = newCallContext();
FileIO fileIO = new InMemoryFileIO();
TableIdentifier tableIdentifier = TableIdentifier.of(Namespace.of("db1", "schema1"), "table1");
BatchFileCleanupTaskHandler handler =
new BatchFileCleanupTaskHandler(
buildTaskFileIOSupplier(fileIO), Executors.newSingleThreadExecutor());
BatchFileCleanupTaskHandler handler = newBatchFileCleanupTaskHandler(fileIO);
long snapshotId = 100L;
ManifestFile manifestFile =
TaskTestUtils.manifestFile(
Expand Down Expand Up @@ -219,7 +226,6 @@ public void testMetadataFileCleanupIfFileNotExist() throws IOException {

@Test
public void testCleanupWithRetries() throws IOException {
PolarisCallContext polarisCallContext = newCallContext();
Map<String, AtomicInteger> retryCounter = new HashMap<>();
FileIO fileIO =
new InMemoryFileIO() {
Expand All @@ -240,9 +246,7 @@ public void deleteFile(String location) {
}
};
TableIdentifier tableIdentifier = TableIdentifier.of(Namespace.of("db1", "schema1"), "table1");
BatchFileCleanupTaskHandler handler =
new BatchFileCleanupTaskHandler(
buildTaskFileIOSupplier(fileIO), Executors.newSingleThreadExecutor());
BatchFileCleanupTaskHandler handler = newBatchFileCleanupTaskHandler(fileIO);
long snapshotId = 100L;
ManifestFile manifestFile =
TaskTestUtils.manifestFile(
Expand Down
Loading