Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
import static java.nio.file.StandardOpenOption.APPEND;
import static java.nio.file.StandardOpenOption.CREATE;

import com.scalar.db.api.DistributedStorage;
import com.scalar.db.api.DistributedTransactionManager;
import com.scalar.db.api.TableMetadata;
import com.scalar.db.dataloader.cli.exception.DirectoryValidationException;
import com.scalar.db.dataloader.cli.util.DirectoryUtils;
Expand All @@ -29,6 +29,7 @@
import com.scalar.db.dataloader.core.util.KeyUtils;
import com.scalar.db.io.Key;
import com.scalar.db.service.StorageFactory;
import com.scalar.db.service.TransactionFactory;
import java.io.BufferedWriter;
import java.nio.charset.Charset;
import java.nio.file.Files;
Expand Down Expand Up @@ -69,11 +70,13 @@ public Integer call() throws Exception {
}

StorageFactory storageFactory = StorageFactory.create(scalarDbPropertiesFilePath);
TransactionFactory transactionFactory = TransactionFactory.create(scalarDbPropertiesFilePath);
TableMetadataService metaDataService =
new TableMetadataService(storageFactory.getStorageAdmin());
ScalarDbDao scalarDbDao = new ScalarDbDao();

ExportManager exportManager = createExportManager(storageFactory, scalarDbDao, outputFormat);
ExportManager exportManager =
createExportManager(transactionFactory, scalarDbDao, outputFormat);

TableMetadata tableMetadata = metaDataService.getTableMetadata(namespace, table);

Expand Down Expand Up @@ -156,17 +159,17 @@ private void validateOutputDirectory() throws DirectoryValidationException {
}

private ExportManager createExportManager(
StorageFactory storageFactory, ScalarDbDao scalarDbDao, FileFormat fileFormat) {
TransactionFactory transactionFactory, ScalarDbDao scalarDbDao, FileFormat fileFormat) {
ProducerTaskFactory taskFactory =
new ProducerTaskFactory(delimiter, includeTransactionMetadata, prettyPrintJson);
DistributedStorage storage = storageFactory.getStorage();
DistributedTransactionManager manager = transactionFactory.getTransactionManager();
switch (fileFormat) {
case JSON:
return new JsonExportManager(storage, scalarDbDao, taskFactory);
return new JsonExportManager(manager, scalarDbDao, taskFactory);
case JSONL:
return new JsonLineExportManager(storage, scalarDbDao, taskFactory);
return new JsonLineExportManager(manager, scalarDbDao, taskFactory);
case CSV:
return new CsvExportManager(storage, scalarDbDao, taskFactory);
return new CsvExportManager(manager, scalarDbDao, taskFactory);
default:
throw new AssertionError("Invalid file format" + fileFormat);
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
package com.scalar.db.dataloader.core.dataexport;

import com.scalar.db.api.DistributedStorage;
import com.scalar.db.api.DistributedTransactionManager;
import com.scalar.db.api.TableMetadata;
import com.scalar.db.dataloader.core.dataexport.producer.ProducerTaskFactory;
import com.scalar.db.dataloader.core.dataimport.dao.ScalarDbDao;
Expand All @@ -15,16 +15,23 @@
public class CsvExportManager extends ExportManager {

/**
* Constructs a {@code CsvExportManager} with the specified {@link DistributedStorage}, {@link
* ScalarDbDao}, and {@link ProducerTaskFactory}.
* Constructs a {@code CsvExportManager} for exporting data using a {@link
* DistributedTransactionManager}.
*
* @param storage the {@code DistributedStorage} instance used to read data from the database
* @param dao the {@code ScalarDbDao} used to execute export-related database operations
* @param producerTaskFactory the factory used to create producer tasks for exporting data
* <p>This constructor is used when exporting data in transactional mode, allowing data to be read
* from ScalarDB within a distributed transaction context.
*
* @param manager the {@link DistributedTransactionManager} used to read data in transactional
* mode
* @param dao the {@link ScalarDbDao} used to interact with ScalarDB for export operations
* @param producerTaskFactory the {@link ProducerTaskFactory} used to create producer tasks for
* exporting data
*/
public CsvExportManager(
DistributedStorage storage, ScalarDbDao dao, ProducerTaskFactory producerTaskFactory) {
super(storage, dao, producerTaskFactory);
DistributedTransactionManager manager,
ScalarDbDao dao,
ProducerTaskFactory producerTaskFactory) {
super(manager, dao, producerTaskFactory);
}

/**
Expand Down
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
package com.scalar.db.dataloader.core.dataexport;

import com.scalar.db.api.DistributedStorage;
import com.scalar.db.api.DistributedTransactionManager;
import com.scalar.db.api.Result;
import com.scalar.db.api.Scanner;
import com.scalar.db.api.TableMetadata;
import com.scalar.db.api.TransactionManagerCrudOperable;
import com.scalar.db.dataloader.core.FileFormat;
import com.scalar.db.dataloader.core.dataexport.producer.ProducerTask;
import com.scalar.db.dataloader.core.dataexport.producer.ProducerTaskFactory;
Expand All @@ -12,6 +12,8 @@
import com.scalar.db.dataloader.core.dataimport.dao.ScalarDbDao;
import com.scalar.db.dataloader.core.dataimport.dao.ScalarDbDaoException;
import com.scalar.db.dataloader.core.util.TableMetadataUtil;
import com.scalar.db.exception.transaction.CrudException;
import com.scalar.db.exception.transaction.UnknownTransactionStatusException;
import com.scalar.db.io.DataType;
import java.io.BufferedWriter;
import java.io.IOException;
Expand All @@ -34,7 +36,7 @@
public abstract class ExportManager {
private static final Logger logger = LoggerFactory.getLogger(ExportManager.class);

private final DistributedStorage storage;
private final DistributedTransactionManager manager;
private final ScalarDbDao dao;
private final ProducerTaskFactory producerTaskFactory;
private final Object lock = new Object();
Expand Down Expand Up @@ -83,7 +85,8 @@ public ExportReport startExport(
BufferedWriter bufferedWriter = new BufferedWriter(writer);
boolean isJson = exportOptions.getOutputFileFormat() == FileFormat.JSON;

try (Scanner scanner = createScanner(exportOptions, dao, storage)) {
try (TransactionManagerCrudOperable.Scanner scanner =
createScanner(exportOptions, dao, manager)) {

Iterator<Result> iterator = scanner.iterator();
AtomicBoolean isFirstBatch = new AtomicBoolean(true);
Expand All @@ -110,8 +113,11 @@ public ExportReport startExport(
// TODO: handle this
}
processFooter(exportOptions, tableMetadata, bufferedWriter);
} catch (InterruptedException | IOException e) {
logger.error("Error during export: {}", e.getMessage());
} catch (InterruptedException
| IOException
| UnknownTransactionStatusException
| CrudException e) {
logger.error("Error during export: ", e);
} finally {
bufferedWriter.flush();
}
Expand Down Expand Up @@ -213,12 +219,12 @@ private void handleTransactionMetadata(ExportOptions exportOptions, TableMetadat
*
* @param exportOptions export options
* @param dao ScalarDB dao object
* @param storage distributed storage object
* @param manager DistributedTransactionManager object
* @return created scanner
* @throws ScalarDbDaoException throws if any issue occurs in creating scanner object
*/
private Scanner createScanner(
ExportOptions exportOptions, ScalarDbDao dao, DistributedStorage storage)
private TransactionManagerCrudOperable.Scanner createScanner(
ExportOptions exportOptions, ScalarDbDao dao, DistributedTransactionManager manager)
throws ScalarDbDaoException {
boolean isScanAll = exportOptions.getScanPartitionKey() == null;
if (isScanAll) {
Expand All @@ -227,7 +233,7 @@ private Scanner createScanner(
exportOptions.getTableName(),
exportOptions.getProjectionColumns(),
exportOptions.getLimit(),
storage);
manager);
} else {
return dao.createScanner(
exportOptions.getNamespace(),
Expand All @@ -237,7 +243,7 @@ private Scanner createScanner(
exportOptions.getSortOrders(),
exportOptions.getProjectionColumns(),
exportOptions.getLimit(),
storage);
manager);
}
}
}
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
package com.scalar.db.dataloader.core.dataexport;

import com.scalar.db.api.DistributedStorage;
import com.scalar.db.api.DistributedTransactionManager;
import com.scalar.db.api.TableMetadata;
import com.scalar.db.dataloader.core.dataexport.producer.ProducerTaskFactory;
import com.scalar.db.dataloader.core.dataimport.dao.ScalarDbDao;
Expand All @@ -11,16 +11,23 @@
public class JsonExportManager extends ExportManager {

/**
* Constructs a {@code JsonExportManager} with the specified {@link DistributedStorage}, {@link
* ScalarDbDao}, and {@link ProducerTaskFactory}.
* Constructs a {@code JsonExportManager} for exporting data using a {@link
* DistributedTransactionManager}.
*
* @param storage the {@code DistributedStorage} instance used to read data from the database
* @param dao the {@code ScalarDbDao} used to execute export-related database operations
* @param producerTaskFactory the factory used to create producer tasks for exporting data
* <p>This constructor is used when exporting data in transactional mode, allowing data to be read
* from ScalarDB within a distributed transaction context and exported in JSON format.
*
* @param manager the {@link DistributedTransactionManager} used to read data in transactional
* mode
* @param dao the {@link ScalarDbDao} used to interact with ScalarDB for export operations
* @param producerTaskFactory the {@link ProducerTaskFactory} used to create producer tasks for
* exporting data
*/
public JsonExportManager(
DistributedStorage storage, ScalarDbDao dao, ProducerTaskFactory producerTaskFactory) {
super(storage, dao, producerTaskFactory);
DistributedTransactionManager manager,
ScalarDbDao dao,
ProducerTaskFactory producerTaskFactory) {
super(manager, dao, producerTaskFactory);
}

/**
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
package com.scalar.db.dataloader.core.dataexport;

import com.scalar.db.api.DistributedStorage;
import com.scalar.db.api.DistributedTransactionManager;
import com.scalar.db.api.TableMetadata;
import com.scalar.db.dataloader.core.dataexport.producer.ProducerTaskFactory;
import com.scalar.db.dataloader.core.dataimport.dao.ScalarDbDao;
Expand All @@ -13,16 +13,23 @@
public class JsonLineExportManager extends ExportManager {

/**
* Constructs a {@code JsonLineExportManager} with the specified {@link DistributedStorage},
* {@link ScalarDbDao}, and {@link ProducerTaskFactory}.
* Constructs a {@code JsonLineExportManager} for exporting data using a {@link
* DistributedTransactionManager}.
*
* @param storage the {@code DistributedStorage} instance used to read data from the database
* @param dao the {@code ScalarDbDao} used to execute export-related database operations
* @param producerTaskFactory the factory used to create producer tasks for exporting data
* <p>This constructor is used when exporting data in transactional mode, allowing data to be read
* from ScalarDB within a distributed transaction context and exported in JSON Lines format.
*
* @param manager the {@link DistributedTransactionManager} used to read data in transactional
* mode
* @param dao the {@link ScalarDbDao} used to interact with ScalarDB for export operations
* @param producerTaskFactory the {@link ProducerTaskFactory} used to create producer tasks for
* exporting data
*/
public JsonLineExportManager(
DistributedStorage storage, ScalarDbDao dao, ProducerTaskFactory producerTaskFactory) {
super(storage, dao, producerTaskFactory);
DistributedTransactionManager manager,
ScalarDbDao dao,
ProducerTaskFactory producerTaskFactory) {
super(manager, dao, producerTaskFactory);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import com.scalar.db.api.DistributedStorage;
import com.scalar.db.api.DistributedTransaction;
import com.scalar.db.api.DistributedTransactionManager;
import com.scalar.db.api.Get;
import com.scalar.db.api.GetBuilder;
import com.scalar.db.api.Put;
Expand All @@ -10,6 +11,7 @@
import com.scalar.db.api.Scan;
import com.scalar.db.api.ScanBuilder;
import com.scalar.db.api.Scanner;
import com.scalar.db.api.TransactionManagerCrudOperable;
import com.scalar.db.dataloader.core.DataLoaderError;
import com.scalar.db.dataloader.core.ScanRange;
import com.scalar.db.exception.storage.ExecutionException;
Expand Down Expand Up @@ -219,61 +221,75 @@ public List<Result> scan(
}

/**
* Create a ScalarDB scanner instance
* Creates a {@link TransactionManagerCrudOperable.Scanner} instance for reading data from
* ScalarDB.
*
* @param namespace ScalarDB namespace
* @param table ScalarDB table name
* @param projectionColumns List of column projection to use during scan
* @param limit Scan limit value
* @param storage Distributed storage for ScalarDB connection that is running in storage mode
* @return ScalarDB Scanner object
* @throws ScalarDbDaoException if scan fails
* <p>a scanner that iterates over the retrieved records. It performs the scan transactionally
* using a {@link DistributedTransactionManager}.
*
* @param namespace the ScalarDB namespace to scan
* @param table the ScalarDB table name
* @param projectionColumns the list of column names to include in the scan results
* @param limit the maximum number of records to retrieve
* @param manager the {@link DistributedTransactionManager} used to obtain the ScalarDB scanner in
* storage mode
* @return a {@link TransactionManagerCrudOperable.Scanner} instance for iterating over the scan
* results
* @throws ScalarDbDaoException if an error occurs while creating or executing the scan
*/
public Scanner createScanner(
public TransactionManagerCrudOperable.Scanner createScanner(
String namespace,
String table,
List<String> projectionColumns,
int limit,
DistributedStorage storage)
DistributedTransactionManager manager)
throws ScalarDbDaoException {
Scan scan =
createScan(namespace, table, null, null, new ArrayList<>(), projectionColumns, limit);
try {
return storage.scan(scan);
} catch (ExecutionException e) {
return manager.getScanner(scan);
} catch (CrudException e) {
throw new ScalarDbDaoException(DataLoaderError.ERROR_SCAN.buildMessage(e.getMessage()), e);
}
}

/**
* Create a ScalarDB scanner instance
* Creates a {@link TransactionManagerCrudOperable.Scanner} instance for reading data from
* ScalarDB.
*
* @param namespace ScalarDB namespace
* @param table ScalarDB table name
* @param partitionKey Partition key used in ScalarDB scan
* @param scanRange Optional range to set ScalarDB scan start and end values
* @param sortOrders Optional scan clustering key sorting values
* @param projectionColumns List of column projection to use during scan
* @param limit Scan limit value
* @param storage Distributed storage for ScalarDB connection that is running in storage mode
* @return ScalarDB Scanner object
* @throws ScalarDbDaoException if scan fails
* <p>returns a scanner that iterates over the matching records. The scan is performed
* transactionally through a {@link DistributedTransactionManager}.
*
* @param namespace the ScalarDB namespace to scan
* @param table the ScalarDB table name
* @param partitionKey the optional {@link Key} representing the partition key for the scan
* @param scanRange the optional {@link ScanRange} defining the start and end boundaries for the
* scan
* @param sortOrders the optional list of {@link Scan.Ordering} objects defining the clustering
* key sort order
* @param projectionColumns the optional list of column names to include in the scan results
* @param limit the maximum number of records to retrieve
* @param manager the {@link DistributedTransactionManager} used to obtain the ScalarDB scanner in
* storage mode
* @return a {@link TransactionManagerCrudOperable.Scanner} instance for iterating over scan
* results
* @throws ScalarDbDaoException if an error occurs while creating or executing the scan
*/
public Scanner createScanner(
public TransactionManagerCrudOperable.Scanner createScanner(
String namespace,
String table,
@Nullable Key partitionKey,
@Nullable ScanRange scanRange,
@Nullable List<Scan.Ordering> sortOrders,
@Nullable List<String> projectionColumns,
int limit,
DistributedStorage storage)
DistributedTransactionManager manager)
throws ScalarDbDaoException {
Scan scan =
createScan(namespace, table, partitionKey, scanRange, sortOrders, projectionColumns, limit);
try {
return storage.scan(scan);
} catch (ExecutionException e) {
return manager.getScanner(scan);
} catch (CrudException e) {
throw new ScalarDbDaoException(DataLoaderError.ERROR_SCAN.buildMessage(e.getMessage()), e);
}
}
Expand Down
Loading