Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
102 changes: 102 additions & 0 deletions .github/workflows/object-storage-adapter-check.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,102 @@
name: Test Object Storage Adapter

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since AWS does not provide an S3 emulator, and other S3-compatible storage services do not support conditional deletes, we will need to run integration tests when releasing a new ScalarDB version.

on:
workflow_dispatch:
inputs:
INT_TEST_JAVA_RUNTIME_VERSION:
description: JDK version used to run the integration test
type: choice
required: false
default: '8'
options:
- '8'
- '11'
- '17'
- '21'
INT_TEST_JAVA_RUNTIME_VENDOR:
description: Vendor of the JDK used to run the integration test
type: choice
required: false
default: 'temurin'
options:
- 'corretto'
- 'microsoft'
- 'oracle'
- 'temurin'

env:
TERM: dumb
JAVA_VERSION: '8'
JAVA_VENDOR: 'temurin'
INT_TEST_JAVA_RUNTIME_VERSION: "${{ github.event_name != 'workflow_dispatch' && '8' || inputs.INT_TEST_JAVA_RUNTIME_VERSION }}"
INT_TEST_JAVA_RUNTIME_VENDOR: "${{ github.event_name != 'workflow_dispatch' && 'temurin' || inputs.INT_TEST_JAVA_RUNTIME_VENDOR }}"
# Gradle will parse 'ORG_GRADLE_PROJECT_<project_property_name>' environment variables as project properties.
# The following variables configure the 'com.scalar.db.jdk-configuration' Gradle plugin.
ORG_GRADLE_PROJECT_javaVersion: '8'
ORG_GRADLE_PROJECT_javaVendor: 'temurin'
ORG_GRADLE_PROJECT_integrationTestJavaRuntimeVersion: "${{ github.event_name != 'workflow_dispatch' && '8' || inputs.INT_TEST_JAVA_RUNTIME_VERSION }}"
ORG_GRADLE_PROJECT_integrationTestJavaRuntimeVendor: "${{ github.event_name != 'workflow_dispatch' && 'temurin' || inputs.INT_TEST_JAVA_RUNTIME_VENDOR }}"
# This variable evaluates to: if {!(Temurin JDK 8) && !(Oracle JDK)} then {true} else {false}
# Oracle JDK that are linux compatible and publicly available through direct download exist for all LTS versions
SET_UP_INT_TEST_RUNTIME_NON_ORACLE_JDK: "${{ (github.event_name == 'workflow_dispatch' && !(inputs.INT_TEST_JAVA_RUNTIME_VERSION == '8' && inputs.INT_TEST_JAVA_RUNTIME_VENDOR == 'temurin') && !(inputs.INT_TEST_JAVA_RUNTIME_VENDOR == 'oracle')) && 'true' || 'false' }}"
INT_TEST_GRADLE_OPTIONS_FOR_GROUP_COMMIT: '"-Dscalardb.consensus_commit.coordinator.group_commit.enabled=true" "-Dscalardb.consensus_commit.coordinator.group_commit.old_group_abort_timeout_millis=15000" --tests "**.ConsensusCommit**"'
AWS_ACCESS_KEY_ID: ${{ secrets.S3_ACCESS_KEY }}
AWS_SECRET_ACCESS_KEY: ${{ secrets.S3_SECRET_ACCESS_KEY }}
S3_REGION: ap-northeast-1
S3_BUCKET_NAME: scalardb-test-bucket

jobs:
integration-test-s3:
name: S3 integration test (${{ matrix.mode.label }})
runs-on: ubuntu-latest

strategy:
fail-fast: false
matrix:
mode:
- label: default
group_commit_enabled: false
- label: with_group_commit
group_commit_enabled: true

steps:
- uses: actions/checkout@v5

- name: Set up JDK ${{ env.JAVA_VERSION }} (${{ env.JAVA_VENDOR }})
uses: actions/setup-java@v5
with:
java-version: ${{ env.JAVA_VERSION }}
distribution: ${{ env.JAVA_VENDOR }}

- name: Set up JDK ${{ env.INT_TEST_JAVA_RUNTIME_VERSION }} (${{ env.INT_TEST_JAVA_RUNTIME_VENDOR }}) to run integration test
uses: actions/setup-java@v5
if: ${{ env.SET_UP_INT_TEST_RUNTIME_NON_ORACLE_JDK == 'true'}}
with:
java-version: ${{ env.INT_TEST_JAVA_RUNTIME_VERSION }}
distribution: ${{ env.INT_TEST_JAVA_RUNTIME_VENDOR }}

- name: Login to Oracle container registry
uses: docker/login-action@v3
if: ${{ env.INT_TEST_JAVA_RUNTIME_VENDOR == 'oracle' }}
with:
registry: container-registry.oracle.com
username: ${{ secrets.OCR_USERNAME }}
password: ${{ secrets.OCR_TOKEN }}

- name: Set up JDK ${{ env.INT_TEST_JAVA_RUNTIME_VERSION }} (oracle) to run the integration test
if: ${{ env.INT_TEST_JAVA_RUNTIME_VENDOR == 'oracle' }}
run: |
container_id=$(docker create "container-registry.oracle.com/java/jdk:${{ env.INT_TEST_JAVA_RUNTIME_VERSION }}")
docker cp -L "$container_id:/usr/java/default" /usr/lib/jvm/oracle-jdk && docker rm "$container_id"
- name: Setup Gradle
uses: gradle/actions/setup-gradle@v5

- name: Execute Gradle 'integrationTestObjectStorage' task
run: ./gradlew integrationTestObjectStorage -Dscalardb.object_storage.storage=s3 -Dscalardb.object_storage.endpoint=${{ env.S3_REGION }}/${{ env.S3_BUCKET_NAME }} ${{ matrix.mode.group_commit_enabled && env.INT_TEST_GRADLE_OPTIONS_FOR_GROUP_COMMIT || '' }}

- name: Upload Gradle test reports
if: always()
uses: actions/upload-artifact@v5
with:
name: cassandra_3.0_integration_test_reports_${{ matrix.mode.label }}
path: core/build/reports/tests/integrationTestObjectStorage
2 changes: 2 additions & 0 deletions core/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -174,6 +174,8 @@ dependencies {
implementation platform("software.amazon.awssdk:bom:${awssdkVersion}")
implementation 'software.amazon.awssdk:applicationautoscaling'
implementation 'software.amazon.awssdk:dynamodb'
implementation 'software.amazon.awssdk:s3'
implementation 'software.amazon.awssdk:aws-crt-client'
testImplementation 'software.amazon.awssdk:iam'
testImplementation 'software.amazon.awssdk:iam-policy-builder'
implementation "org.apache.commons:commons-dbcp2:${commonsDbcp2Version}"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,12 @@
import java.util.Properties;

public class ObjectStorageEnv {
private static final String PROP_OBJECT_STORAGE_STORAGE = "scalardb.object_storage.storage";
private static final String PROP_OBJECT_STORAGE_ENDPOINT = "scalardb.object_storage.endpoint";
private static final String PROP_OBJECT_STORAGE_USERNAME = "scalardb.object_storage.username";
private static final String PROP_OBJECT_STORAGE_PASSWORD = "scalardb.object_storage.password";

private static final String DEFAULT_OBJECT_STORAGE_STORAGE = BlobStorageConfig.STORAGE_NAME;
private static final String DEFAULT_OBJECT_STORAGE_ENDPOINT =
"http://localhost:10000/test/test-container";
private static final String DEFAULT_OBJECT_STORAGE_USERNAME = "test";
Expand All @@ -19,6 +21,8 @@ public class ObjectStorageEnv {
private ObjectStorageEnv() {}

public static Properties getProperties(String testName) {
String storage =
System.getProperty(PROP_OBJECT_STORAGE_STORAGE, DEFAULT_OBJECT_STORAGE_STORAGE);
String accountName =
System.getProperty(PROP_OBJECT_STORAGE_USERNAME, DEFAULT_OBJECT_STORAGE_USERNAME);
String accountKey =
Expand All @@ -30,7 +34,7 @@ public static Properties getProperties(String testName) {
properties.setProperty(DatabaseConfig.CONTACT_POINTS, endpoint);
properties.setProperty(DatabaseConfig.USERNAME, accountName);
properties.setProperty(DatabaseConfig.PASSWORD, accountKey);
properties.setProperty(DatabaseConfig.STORAGE, BlobStorageConfig.STORAGE_NAME);
properties.setProperty(DatabaseConfig.STORAGE, storage);
properties.setProperty(DatabaseConfig.CROSS_PARTITION_SCAN, "true");
properties.setProperty(DatabaseConfig.CROSS_PARTITION_SCAN_FILTERING, "true");
properties.setProperty(DatabaseConfig.CROSS_PARTITION_SCAN_ORDERING, "false");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -336,8 +336,12 @@ public void deleteByPrefix_WithNonExistingPrefix_ShouldDoNothing() throws Except
@Test
public void close_ShouldNotThrowException() {
// Arrange
Properties properties = getProperties(TEST_NAME);
ObjectStorageConfig objectStorageConfig =
ObjectStorageUtils.getObjectStorageConfig(new DatabaseConfig(properties));
ObjectStorageWrapper wrapper = ObjectStorageWrapperFactory.create(objectStorageConfig);

// Act Assert
assertThatCode(() -> wrapper.close()).doesNotThrowAnyException();
assertThatCode(wrapper::close).doesNotThrowAnyException();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
package com.scalar.db.storage.objectstorage;

public class ConflictOccurredException extends ObjectStorageWrapperException {

public ConflictOccurredException(String message, Throwable cause) {
super(message, cause);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,9 @@
import com.scalar.db.exception.storage.RetriableExecutionException;
import java.util.Collections;
import java.util.List;
import javax.annotation.concurrent.ThreadSafe;

@ThreadSafe
public class MutateStatementHandler extends StatementHandler {
public MutateStatementHandler(
ObjectStorageWrapper wrapper, TableMetadataManager metadataManager) {
Expand Down Expand Up @@ -78,7 +80,7 @@ private void writePartition(ObjectStoragePartitionSnapshot snapshot) throws Exec
wrapper.insert(snapshot.getObjectKey(), snapshot.getPartition().serialize());
}
}
} catch (PreconditionFailedException e) {
} catch (PreconditionFailedException | ConflictOccurredException e) {
throw new RetriableExecutionException(
CoreError.OBJECT_STORAGE_CONFLICT_OCCURRED_IN_MUTATION.buildMessage(e.getMessage()), e);
} catch (ObjectStorageWrapperException e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,9 @@
import java.util.Set;
import java.util.stream.Collectors;
import javax.annotation.Nullable;
import javax.annotation.concurrent.ThreadSafe;

@ThreadSafe
public class ObjectStorageAdmin implements DistributedStorageAdmin {
public static final String NAMESPACE_METADATA_TABLE = "namespaces";
public static final String TABLE_METADATA_TABLE = "metadata";
Expand Down Expand Up @@ -48,6 +50,29 @@ public ObjectStorageAdmin(DatabaseConfig databaseConfig) {
metadataNamespace = objectStorageConfig.getMetadataNamespace();
}

private static String getTableMetadataKey(String namespace, String table) {
return String.join(
String.valueOf(ObjectStorageUtils.CONCATENATED_KEY_DELIMITER), namespace, table);
}

private static String getNamespaceNameFromTableMetadataKey(String tableMetadataKey) {
List<String> parts =
Splitter.on(ObjectStorageUtils.CONCATENATED_KEY_DELIMITER).splitToList(tableMetadataKey);
if (parts.size() != 2 || parts.get(0).isEmpty()) {
throw new IllegalArgumentException("Invalid table metadata key: " + tableMetadataKey);
}
return parts.get(0);
}

private static String getTableNameFromTableMetadataKey(String tableMetadataKey) {
List<String> parts =
Splitter.on(ObjectStorageUtils.CONCATENATED_KEY_DELIMITER).splitToList(tableMetadataKey);
if (parts.size() != 2 || parts.get(1).isEmpty()) {
throw new IllegalArgumentException("Invalid table metadata key: " + tableMetadataKey);
}
return parts.get(1);
}

@Override
public StorageInfo getStorageInfo(String namespace) throws ExecutionException {
return STORAGE_INFO;
Expand Down Expand Up @@ -450,29 +475,6 @@ private void deleteTableData(String namespace, String table) throws ExecutionExc
}
}

private static String getTableMetadataKey(String namespace, String table) {
return String.join(
String.valueOf(ObjectStorageUtils.CONCATENATED_KEY_DELIMITER), namespace, table);
}

private static String getNamespaceNameFromTableMetadataKey(String tableMetadataKey) {
List<String> parts =
Splitter.on(ObjectStorageUtils.CONCATENATED_KEY_DELIMITER).splitToList(tableMetadataKey);
if (parts.size() != 2 || parts.get(0).isEmpty()) {
throw new IllegalArgumentException("Invalid table metadata key: " + tableMetadataKey);
}
return parts.get(0);
}

private static String getTableNameFromTableMetadataKey(String tableMetadataKey) {
List<String> parts =
Splitter.on(ObjectStorageUtils.CONCATENATED_KEY_DELIMITER).splitToList(tableMetadataKey);
if (parts.size() != 2 || parts.get(1).isEmpty()) {
throw new IllegalArgumentException("Invalid table metadata key: " + tableMetadataKey);
}
return parts.get(1);
}

private void checkTableMetadata(TableMetadata metadata) {
Set<String> secondaryIndexNames = metadata.getSecondaryIndexNames();
if (!secondaryIndexNames.isEmpty()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,13 +9,6 @@ public interface ObjectStorageConfig {
*/
String getStorageName();

/**
* Returns the endpoint for the object storage service.
*
* @return the endpoint
*/
String getEndpoint();

/**
* Returns the username for authentication.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import com.scalar.db.config.DatabaseConfig;
import com.scalar.db.storage.objectstorage.blobstorage.BlobStorageConfig;
import com.scalar.db.storage.objectstorage.s3.S3Config;
import java.util.Objects;

public class ObjectStorageUtils {
Expand All @@ -23,6 +24,8 @@ public static String[] parseObjectKey(String objectKey) {
public static ObjectStorageConfig getObjectStorageConfig(DatabaseConfig databaseConfig) {
if (Objects.equals(databaseConfig.getStorage(), BlobStorageConfig.STORAGE_NAME)) {
return new BlobStorageConfig(databaseConfig);
} else if (Objects.equals(databaseConfig.getStorage(), S3Config.STORAGE_NAME)) {
return new S3Config(databaseConfig);
} else {
throw new IllegalArgumentException(
"Unsupported Object Storage: " + databaseConfig.getStorage());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,9 @@

import java.util.Optional;
import java.util.Set;
import javax.annotation.concurrent.ThreadSafe;

@ThreadSafe
public interface ObjectStorageWrapper {

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,6 @@

public class ObjectStorageWrapperException extends Exception {

public ObjectStorageWrapperException(String message) {
super(message);
}

public ObjectStorageWrapperException(String message, Throwable cause) {
super(message, cause);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@

import com.scalar.db.storage.objectstorage.blobstorage.BlobStorageConfig;
import com.scalar.db.storage.objectstorage.blobstorage.BlobStorageWrapper;
import com.scalar.db.storage.objectstorage.s3.S3Config;
import com.scalar.db.storage.objectstorage.s3.S3Wrapper;
import java.util.Objects;

public class ObjectStorageWrapperFactory {
Expand All @@ -10,6 +12,9 @@ public static ObjectStorageWrapper create(ObjectStorageConfig objectStorageConfi
if (Objects.equals(objectStorageConfig.getStorageName(), BlobStorageConfig.STORAGE_NAME)) {
assert objectStorageConfig instanceof BlobStorageConfig;
return new BlobStorageWrapper((BlobStorageConfig) objectStorageConfig);
} else if (Objects.equals(objectStorageConfig.getStorageName(), S3Config.STORAGE_NAME)) {
assert objectStorageConfig instanceof S3Config;
return new S3Wrapper((S3Config) objectStorageConfig);
} else {
throw new IllegalArgumentException(
"Unsupported Object Storage: " + objectStorageConfig.getStorageName());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,6 @@

public class PreconditionFailedException extends ObjectStorageWrapperException {

public PreconditionFailedException(String message) {
super(message);
}

public PreconditionFailedException(String message, Throwable cause) {
super(message, cause);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ public BlobStorageConfig(DatabaseConfig databaseConfig) {
bucket = fullEndpoint.substring(lastSlashIndex + 1);
} else {
throw new IllegalArgumentException(
"Invalid contact points format. Expected: BLOB_URI/BUCKET_NAME");
"Invalid contact points format. Expected: BLOB_URI/CONTAINER_NAME");
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In Blob Storage, a bucket is called a container.

}
username = databaseConfig.getUsername().orElse(null);
password = databaseConfig.getPassword().orElse(null);
Expand Down Expand Up @@ -94,11 +94,6 @@ public String getStorageName() {
return STORAGE_NAME;
}

@Override
public String getEndpoint() {
return endpoint;
}

@Override
public String getUsername() {
return username;
Expand All @@ -119,6 +114,10 @@ public String getMetadataNamespace() {
return metadataNamespace;
}

public String getEndpoint() {
return endpoint;
}

public long getParallelUploadBlockSizeInBytes() {
return parallelUploadBlockSizeInBytes;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,9 @@
import java.util.Optional;
import java.util.Set;
import java.util.stream.Collectors;
import javax.annotation.concurrent.ThreadSafe;

@ThreadSafe
public class BlobStorageWrapper implements ObjectStorageWrapper {
private final BlobContainerClient client;
private final Duration requestTimeoutInSeconds;
Expand Down
Loading