From 627fe69b82840d5d7fc1bf2089724f51fdb1d655 Mon Sep 17 00:00:00 2001 From: Jishnu J Date: Mon, 10 Nov 2025 15:21:08 +0530 Subject: [PATCH 01/17] initial commit --- .../dataloader/cli/command/dataexport/ExportCommand.java | 8 ++++++-- .../db/dataloader/core/dataexport/CsvExportManager.java | 7 ++++--- .../db/dataloader/core/dataexport/ExportManager.java | 3 ++- .../db/dataloader/core/dataexport/JsonExportManager.java | 9 +++++---- .../core/dataexport/JsonLineExportManager.java | 9 +++++---- 5 files changed, 22 insertions(+), 14 deletions(-) diff --git a/data-loader/cli/src/main/java/com/scalar/db/dataloader/cli/command/dataexport/ExportCommand.java b/data-loader/cli/src/main/java/com/scalar/db/dataloader/cli/command/dataexport/ExportCommand.java index 712a57c12c..adf54105ae 100755 --- a/data-loader/cli/src/main/java/com/scalar/db/dataloader/cli/command/dataexport/ExportCommand.java +++ b/data-loader/cli/src/main/java/com/scalar/db/dataloader/cli/command/dataexport/ExportCommand.java @@ -6,6 +6,7 @@ import static java.nio.file.StandardOpenOption.CREATE; import com.scalar.db.api.DistributedStorage; +import com.scalar.db.api.DistributedTransactionManager; import com.scalar.db.api.TableMetadata; import com.scalar.db.dataloader.cli.exception.DirectoryValidationException; import com.scalar.db.dataloader.cli.util.DirectoryUtils; @@ -36,6 +37,8 @@ import java.util.List; import java.util.Objects; import java.util.concurrent.Callable; + +import com.scalar.db.service.TransactionFactory; import org.apache.commons.lang3.StringUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -64,6 +67,7 @@ public Integer call() throws Exception { validatePositiveValue(spec.commandLine(), maxThreads, DataLoaderError.INVALID_MAX_THREADS); StorageFactory storageFactory = StorageFactory.create(scalarDbPropertiesFilePath); + TransactionFactory transactionFactory = TransactionFactory.create(scalarDbPropertiesFilePath) TableMetadataService metaDataService = new 
TableMetadataService(storageFactory.getStorageAdmin()); ScalarDbDao scalarDbDao = new ScalarDbDao(); @@ -146,10 +150,10 @@ private void validateOutputDirectory() throws DirectoryValidationException { } private ExportManager createExportManager( - StorageFactory storageFactory, ScalarDbDao scalarDbDao, FileFormat fileFormat) { + TransactionFactory transactionFactory, ScalarDbDao scalarDbDao, FileFormat fileFormat) { ProducerTaskFactory taskFactory = new ProducerTaskFactory(delimiter, includeTransactionMetadata, prettyPrintJson); - DistributedStorage storage = storageFactory.getStorage(); + DistributedTransactionManager manager = transactionFactory.getTransactionManager(); switch (fileFormat) { case JSON: return new JsonExportManager(storage, scalarDbDao, taskFactory); diff --git a/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataexport/CsvExportManager.java b/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataexport/CsvExportManager.java index bccf691adc..3da65da552 100644 --- a/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataexport/CsvExportManager.java +++ b/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataexport/CsvExportManager.java @@ -1,6 +1,7 @@ package com.scalar.db.dataloader.core.dataexport; import com.scalar.db.api.DistributedStorage; +import com.scalar.db.api.DistributedTransactionManager; import com.scalar.db.api.TableMetadata; import com.scalar.db.dataloader.core.dataexport.producer.ProducerTaskFactory; import com.scalar.db.dataloader.core.dataimport.dao.ScalarDbDao; @@ -18,13 +19,13 @@ public class CsvExportManager extends ExportManager { * Constructs a {@code CsvExportManager} with the specified {@link DistributedStorage}, {@link * ScalarDbDao}, and {@link ProducerTaskFactory}. 
* - * @param storage the {@code DistributedStorage} instance used to read data from the database + * @param manager the {@code DistributedTransactionManager} instance used to read data from the database * @param dao the {@code ScalarDbDao} used to execute export-related database operations * @param producerTaskFactory the factory used to create producer tasks for exporting data */ public CsvExportManager( - DistributedStorage storage, ScalarDbDao dao, ProducerTaskFactory producerTaskFactory) { - super(storage, dao, producerTaskFactory); + DistributedTransactionManager manager, ScalarDbDao dao, ProducerTaskFactory producerTaskFactory) { + super(manager, dao, producerTaskFactory); } /** diff --git a/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataexport/ExportManager.java b/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataexport/ExportManager.java index fdc27d664c..8278c7b877 100644 --- a/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataexport/ExportManager.java +++ b/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataexport/ExportManager.java @@ -1,6 +1,7 @@ package com.scalar.db.dataloader.core.dataexport; import com.scalar.db.api.DistributedStorage; +import com.scalar.db.api.DistributedTransactionManager; import com.scalar.db.api.Result; import com.scalar.db.api.Scanner; import com.scalar.db.api.TableMetadata; @@ -34,7 +35,7 @@ public abstract class ExportManager { private static final Logger logger = LoggerFactory.getLogger(ExportManager.class); - private final DistributedStorage storage; + private final DistributedTransactionManager distributedTransactionManager; private final ScalarDbDao dao; private final ProducerTaskFactory producerTaskFactory; private final Object lock = new Object(); diff --git a/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataexport/JsonExportManager.java b/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataexport/JsonExportManager.java index 
fadac644a2..26737e4ed6 100644 --- a/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataexport/JsonExportManager.java +++ b/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataexport/JsonExportManager.java @@ -1,6 +1,7 @@ package com.scalar.db.dataloader.core.dataexport; import com.scalar.db.api.DistributedStorage; +import com.scalar.db.api.DistributedTransactionManager; import com.scalar.db.api.TableMetadata; import com.scalar.db.dataloader.core.dataexport.producer.ProducerTaskFactory; import com.scalar.db.dataloader.core.dataimport.dao.ScalarDbDao; @@ -11,16 +12,16 @@ public class JsonExportManager extends ExportManager { /** - * Constructs a {@code JsonExportManager} with the specified {@link DistributedStorage}, {@link + * Constructs a {@code JsonExportManager} with the specified {@link DistributedTransactionManager}, {@link * ScalarDbDao}, and {@link ProducerTaskFactory}. * - * @param storage the {@code DistributedStorage} instance used to read data from the database + * @param manager the {@code DistributedTransactionManager} instance used to read data from the database * @param dao the {@code ScalarDbDao} used to execute export-related database operations * @param producerTaskFactory the factory used to create producer tasks for exporting data */ public JsonExportManager( - DistributedStorage storage, ScalarDbDao dao, ProducerTaskFactory producerTaskFactory) { - super(storage, dao, producerTaskFactory); + DistributedTransactionManager manager, ScalarDbDao dao, ProducerTaskFactory producerTaskFactory) { + super(manager, dao, producerTaskFactory); } /** diff --git a/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataexport/JsonLineExportManager.java b/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataexport/JsonLineExportManager.java index 2ce21deb7d..1c084df0c2 100644 --- a/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataexport/JsonLineExportManager.java +++ 
b/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataexport/JsonLineExportManager.java @@ -1,6 +1,7 @@ package com.scalar.db.dataloader.core.dataexport; import com.scalar.db.api.DistributedStorage; +import com.scalar.db.api.DistributedTransactionManager; import com.scalar.db.api.TableMetadata; import com.scalar.db.dataloader.core.dataexport.producer.ProducerTaskFactory; import com.scalar.db.dataloader.core.dataimport.dao.ScalarDbDao; @@ -13,16 +14,16 @@ public class JsonLineExportManager extends ExportManager { /** - * Constructs a {@code JsonLineExportManager} with the specified {@link DistributedStorage}, + * Constructs a {@code JsonLineExportManager} with the specified {@link DistributedTransactionManager}, * {@link ScalarDbDao}, and {@link ProducerTaskFactory}. * - * @param storage the {@code DistributedStorage} instance used to read data from the database + * @param manager the {@code DistributedTransactionManager} instance used to read data from the database * @param dao the {@code ScalarDbDao} used to execute export-related database operations * @param producerTaskFactory the factory used to create producer tasks for exporting data */ public JsonLineExportManager( - DistributedStorage storage, ScalarDbDao dao, ProducerTaskFactory producerTaskFactory) { - super(storage, dao, producerTaskFactory); + DistributedTransactionManager manager, ScalarDbDao dao, ProducerTaskFactory producerTaskFactory) { + super(manager, dao, producerTaskFactory); } /** From 32b4b701d5af20841c67ff15ef837e54066bc9e9 Mon Sep 17 00:00:00 2001 From: Jishnu J Date: Mon, 10 Nov 2025 17:18:02 +0530 Subject: [PATCH 02/17] Initial changes --- .../cli/command/dataexport/ExportCommand.java | 17 +++-- .../core/dataexport/CsvExportManager.java | 20 ++++-- .../core/dataexport/ExportManager.java | 14 ++-- .../core/dataexport/JsonExportManager.java | 20 ++++-- .../dataexport/JsonLineExportManager.java | 20 ++++-- .../core/dataimport/dao/ScalarDbDao.java | 72 ++++++++++++------- 
.../core/dataexport/CsvExportManagerTest.java | 29 ++++---- .../dataexport/JsonExportManagerTest.java | 22 +++--- .../dataexport/JsonLineExportManagerTest.java | 22 +++--- 9 files changed, 139 insertions(+), 97 deletions(-) diff --git a/data-loader/cli/src/main/java/com/scalar/db/dataloader/cli/command/dataexport/ExportCommand.java b/data-loader/cli/src/main/java/com/scalar/db/dataloader/cli/command/dataexport/ExportCommand.java index adf54105ae..e6605a504b 100755 --- a/data-loader/cli/src/main/java/com/scalar/db/dataloader/cli/command/dataexport/ExportCommand.java +++ b/data-loader/cli/src/main/java/com/scalar/db/dataloader/cli/command/dataexport/ExportCommand.java @@ -5,7 +5,6 @@ import static java.nio.file.StandardOpenOption.APPEND; import static java.nio.file.StandardOpenOption.CREATE; -import com.scalar.db.api.DistributedStorage; import com.scalar.db.api.DistributedTransactionManager; import com.scalar.db.api.TableMetadata; import com.scalar.db.dataloader.cli.exception.DirectoryValidationException; @@ -30,6 +29,7 @@ import com.scalar.db.dataloader.core.util.KeyUtils; import com.scalar.db.io.Key; import com.scalar.db.service.StorageFactory; +import com.scalar.db.service.TransactionFactory; import java.io.BufferedWriter; import java.nio.charset.Charset; import java.nio.file.Files; @@ -37,8 +37,6 @@ import java.util.List; import java.util.Objects; import java.util.concurrent.Callable; - -import com.scalar.db.service.TransactionFactory; import org.apache.commons.lang3.StringUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -67,12 +65,13 @@ public Integer call() throws Exception { validatePositiveValue(spec.commandLine(), maxThreads, DataLoaderError.INVALID_MAX_THREADS); StorageFactory storageFactory = StorageFactory.create(scalarDbPropertiesFilePath); - TransactionFactory transactionFactory = TransactionFactory.create(scalarDbPropertiesFilePath) + TransactionFactory transactionFactory = TransactionFactory.create(scalarDbPropertiesFilePath); 
TableMetadataService metaDataService = new TableMetadataService(storageFactory.getStorageAdmin()); ScalarDbDao scalarDbDao = new ScalarDbDao(); - ExportManager exportManager = createExportManager(storageFactory, scalarDbDao, outputFormat); + ExportManager exportManager = + createExportManager(transactionFactory, scalarDbDao, outputFormat); TableMetadata tableMetadata = metaDataService.getTableMetadata(namespace, table); @@ -153,14 +152,14 @@ private ExportManager createExportManager( TransactionFactory transactionFactory, ScalarDbDao scalarDbDao, FileFormat fileFormat) { ProducerTaskFactory taskFactory = new ProducerTaskFactory(delimiter, includeTransactionMetadata, prettyPrintJson); - DistributedTransactionManager manager = transactionFactory.getTransactionManager(); + DistributedTransactionManager manager = transactionFactory.getTransactionManager(); switch (fileFormat) { case JSON: - return new JsonExportManager(storage, scalarDbDao, taskFactory); + return new JsonExportManager(manager, scalarDbDao, taskFactory); case JSONL: - return new JsonLineExportManager(storage, scalarDbDao, taskFactory); + return new JsonLineExportManager(manager, scalarDbDao, taskFactory); case CSV: - return new CsvExportManager(storage, scalarDbDao, taskFactory); + return new CsvExportManager(manager, scalarDbDao, taskFactory); default: throw new AssertionError("Invalid file format" + fileFormat); } diff --git a/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataexport/CsvExportManager.java b/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataexport/CsvExportManager.java index 3da65da552..9589e405ac 100644 --- a/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataexport/CsvExportManager.java +++ b/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataexport/CsvExportManager.java @@ -1,6 +1,5 @@ package com.scalar.db.dataloader.core.dataexport; -import com.scalar.db.api.DistributedStorage; import 
com.scalar.db.api.DistributedTransactionManager; import com.scalar.db.api.TableMetadata; import com.scalar.db.dataloader.core.dataexport.producer.ProducerTaskFactory; @@ -16,15 +15,22 @@ public class CsvExportManager extends ExportManager { /** - * Constructs a {@code CsvExportManager} with the specified {@link DistributedStorage}, {@link - * ScalarDbDao}, and {@link ProducerTaskFactory}. + * Constructs a {@code CsvExportManager} for exporting data using a {@link + * DistributedTransactionManager}. * - * @param manager the {@code DistributedTransactionManager} instance used to read data from the database - * @param dao the {@code ScalarDbDao} used to execute export-related database operations - * @param producerTaskFactory the factory used to create producer tasks for exporting data + *

This constructor is used when exporting data in transactional mode, allowing data to be read + * from ScalarDB within a distributed transaction context. + * + * @param manager the {@link DistributedTransactionManager} used to read data in transactional + * mode + * @param dao the {@link ScalarDbDao} used to interact with ScalarDB for export operations + * @param producerTaskFactory the {@link ProducerTaskFactory} used to create producer tasks for + * exporting data */ public CsvExportManager( - DistributedTransactionManager manager, ScalarDbDao dao, ProducerTaskFactory producerTaskFactory) { + DistributedTransactionManager manager, + ScalarDbDao dao, + ProducerTaskFactory producerTaskFactory) { super(manager, dao, producerTaskFactory); } diff --git a/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataexport/ExportManager.java b/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataexport/ExportManager.java index 8278c7b877..67653762df 100644 --- a/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataexport/ExportManager.java +++ b/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataexport/ExportManager.java @@ -1,10 +1,10 @@ package com.scalar.db.dataloader.core.dataexport; -import com.scalar.db.api.DistributedStorage; import com.scalar.db.api.DistributedTransactionManager; import com.scalar.db.api.Result; import com.scalar.db.api.Scanner; import com.scalar.db.api.TableMetadata; +import com.scalar.db.api.TransactionManagerCrudOperable; import com.scalar.db.dataloader.core.FileFormat; import com.scalar.db.dataloader.core.dataexport.producer.ProducerTask; import com.scalar.db.dataloader.core.dataexport.producer.ProducerTaskFactory; @@ -87,7 +87,7 @@ public ExportReport startExport( BufferedWriter bufferedWriter = new BufferedWriter(writer); boolean isJson = exportOptions.getOutputFileFormat() == FileFormat.JSON; - try (Scanner scanner = createScanner(exportOptions, dao, storage)) { + try (Scanner scanner = 
createScanner(exportOptions, dao, distributedTransactionManager)) { Iterator iterator = scanner.iterator(); AtomicBoolean isFirstBatch = new AtomicBoolean(true); @@ -217,12 +217,12 @@ private void handleTransactionMetadata(ExportOptions exportOptions, TableMetadat * * @param exportOptions export options * @param dao ScalarDB dao object - * @param storage distributed storage object + * @param manager DistributedTransactionManager object * @return created scanner * @throws ScalarDbDaoException throws if any issue occurs in creating scanner object */ - private Scanner createScanner( - ExportOptions exportOptions, ScalarDbDao dao, DistributedStorage storage) + private TransactionManagerCrudOperable.Scanner createScanner( + ExportOptions exportOptions, ScalarDbDao dao, DistributedTransactionManager manager) throws ScalarDbDaoException { boolean isScanAll = exportOptions.getScanPartitionKey() == null; if (isScanAll) { @@ -231,7 +231,7 @@ private Scanner createScanner( exportOptions.getTableName(), exportOptions.getProjectionColumns(), exportOptions.getLimit(), - storage); + manager); } else { return dao.createScanner( exportOptions.getNamespace(), @@ -241,7 +241,7 @@ private Scanner createScanner( exportOptions.getSortOrders(), exportOptions.getProjectionColumns(), exportOptions.getLimit(), - storage); + manager); } } } diff --git a/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataexport/JsonExportManager.java b/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataexport/JsonExportManager.java index 26737e4ed6..4115db8904 100644 --- a/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataexport/JsonExportManager.java +++ b/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataexport/JsonExportManager.java @@ -1,6 +1,5 @@ package com.scalar.db.dataloader.core.dataexport; -import com.scalar.db.api.DistributedStorage; import com.scalar.db.api.DistributedTransactionManager; import com.scalar.db.api.TableMetadata; import 
com.scalar.db.dataloader.core.dataexport.producer.ProducerTaskFactory; @@ -12,15 +11,22 @@ public class JsonExportManager extends ExportManager { /** - * Constructs a {@code JsonExportManager} with the specified {@link DistributedTransactionManager}, {@link - * ScalarDbDao}, and {@link ProducerTaskFactory}. + * Constructs a {@code JsonExportManager} for exporting data using a {@link + * DistributedTransactionManager}. * - * @param manager the {@code DistributedTransactionManager} instance used to read data from the database - * @param dao the {@code ScalarDbDao} used to execute export-related database operations - * @param producerTaskFactory the factory used to create producer tasks for exporting data + *

This constructor is used when exporting data in transactional mode, allowing data to be read + * from ScalarDB within a distributed transaction context and exported in JSON format. + * + * @param manager the {@link DistributedTransactionManager} used to read data in transactional + * mode + * @param dao the {@link ScalarDbDao} used to interact with ScalarDB for export operations + * @param producerTaskFactory the {@link ProducerTaskFactory} used to create producer tasks for + * exporting data */ public JsonExportManager( - DistributedTransactionManager manager, ScalarDbDao dao, ProducerTaskFactory producerTaskFactory) { + DistributedTransactionManager manager, + ScalarDbDao dao, + ProducerTaskFactory producerTaskFactory) { super(manager, dao, producerTaskFactory); } diff --git a/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataexport/JsonLineExportManager.java b/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataexport/JsonLineExportManager.java index 1c084df0c2..27f853ca61 100644 --- a/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataexport/JsonLineExportManager.java +++ b/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataexport/JsonLineExportManager.java @@ -1,6 +1,5 @@ package com.scalar.db.dataloader.core.dataexport; -import com.scalar.db.api.DistributedStorage; import com.scalar.db.api.DistributedTransactionManager; import com.scalar.db.api.TableMetadata; import com.scalar.db.dataloader.core.dataexport.producer.ProducerTaskFactory; @@ -14,15 +13,22 @@ public class JsonLineExportManager extends ExportManager { /** - * Constructs a {@code JsonLineExportManager} with the specified {@link DistributedTransactionManager}, - * {@link ScalarDbDao}, and {@link ProducerTaskFactory}. + * Constructs a {@code JsonLineExportManager} for exporting data using a {@link + * DistributedTransactionManager}. 
* - * @param manager the {@code DistributedTransactionManager} instance used to read data from the database - * @param dao the {@code ScalarDbDao} used to execute export-related database operations - * @param producerTaskFactory the factory used to create producer tasks for exporting data + *

This constructor is used when exporting data in transactional mode, allowing data to be read + * from ScalarDB within a distributed transaction context and exported in JSON Lines format. + * + * @param manager the {@link DistributedTransactionManager} used to read data in transactional + * mode + * @param dao the {@link ScalarDbDao} used to interact with ScalarDB for export operations + * @param producerTaskFactory the {@link ProducerTaskFactory} used to create producer tasks for + * exporting data */ public JsonLineExportManager( - DistributedTransactionManager manager, ScalarDbDao dao, ProducerTaskFactory producerTaskFactory) { + DistributedTransactionManager manager, + ScalarDbDao dao, + ProducerTaskFactory producerTaskFactory) { super(manager, dao, producerTaskFactory); } diff --git a/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataimport/dao/ScalarDbDao.java b/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataimport/dao/ScalarDbDao.java index afd7b124af..5242acdeb3 100644 --- a/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataimport/dao/ScalarDbDao.java +++ b/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataimport/dao/ScalarDbDao.java @@ -2,6 +2,7 @@ import com.scalar.db.api.DistributedStorage; import com.scalar.db.api.DistributedTransaction; +import com.scalar.db.api.DistributedTransactionManager; import com.scalar.db.api.Get; import com.scalar.db.api.GetBuilder; import com.scalar.db.api.Put; @@ -10,6 +11,7 @@ import com.scalar.db.api.Scan; import com.scalar.db.api.ScanBuilder; import com.scalar.db.api.Scanner; +import com.scalar.db.api.TransactionManagerCrudOperable; import com.scalar.db.dataloader.core.DataLoaderError; import com.scalar.db.dataloader.core.ScanRange; import com.scalar.db.exception.storage.ExecutionException; @@ -219,47 +221,63 @@ public List scan( } /** - * Create a ScalarDB scanner instance + * Creates a {@link TransactionManagerCrudOperable.Scanner} instance for reading data 
from + * ScalarDB. * - * @param namespace ScalarDB namespace - * @param table ScalarDB table name - * @param projectionColumns List of column projection to use during scan - * @param limit Scan limit value - * @param storage Distributed storage for ScalarDB connection that is running in storage mode - * @return ScalarDB Scanner object - * @throws ScalarDbDaoException if scan fails + *

This method builds and executes a {@link Scan} operation for the specified table and returns + * a scanner that iterates over the retrieved records. It operates in storage mode using a {@link + * DistributedTransactionManager}. + * + * @param namespace the ScalarDB namespace to scan + * @param table the ScalarDB table name + * @param projectionColumns the list of column names to include in the scan results + * @param limit the maximum number of records to retrieve + * @param manager the {@link DistributedTransactionManager} used to obtain the ScalarDB scanner in + * storage mode + * @return a {@link TransactionManagerCrudOperable.Scanner} instance for iterating over the scan + * results + * @throws ScalarDbDaoException if an error occurs while creating or executing the scan */ - public Scanner createScanner( + public TransactionManagerCrudOperable.Scanner createScanner( String namespace, String table, List projectionColumns, int limit, - DistributedStorage storage) + DistributedTransactionManager manager) throws ScalarDbDaoException { Scan scan = createScan(namespace, table, null, null, new ArrayList<>(), projectionColumns, limit); try { - return storage.scan(scan); - } catch (ExecutionException e) { + return manager.getScanner(scan); + } catch (CrudException e) { throw new ScalarDbDaoException(DataLoaderError.ERROR_SCAN.buildMessage(e.getMessage()), e); } } /** - * Create a ScalarDB scanner instance + * Creates a {@link TransactionManagerCrudOperable.Scanner} instance for reading data from + * ScalarDB. 
* - * @param namespace ScalarDB namespace - * @param table ScalarDB table name - * @param partitionKey Partition key used in ScalarDB scan - * @param scanRange Optional range to set ScalarDB scan start and end values - * @param sortOrders Optional scan clustering key sorting values - * @param projectionColumns List of column projection to use during scan - * @param limit Scan limit value - * @param storage Distributed storage for ScalarDB connection that is running in storage mode - * @return ScalarDB Scanner object - * @throws ScalarDbDaoException if scan fails + *

This method builds and executes a {@link Scan} operation using the provided parameters and + * returns a scanner that iterates over the matching records. It is used in storage mode through a + * {@link DistributedTransactionManager}. + * + * @param namespace the ScalarDB namespace to scan + * @param table the ScalarDB table name + * @param partitionKey the optional {@link Key} representing the partition key for the scan + * @param scanRange the optional {@link ScanRange} defining the start and end boundaries for the + * scan + * @param sortOrders the optional list of {@link Scan.Ordering} objects defining the clustering + * key sort order + * @param projectionColumns the optional list of column names to include in the scan results + * @param limit the maximum number of records to retrieve + * @param manager the {@link DistributedTransactionManager} used to obtain the ScalarDB scanner in + * storage mode + * @return a {@link TransactionManagerCrudOperable.Scanner} instance for iterating over scan + * results + * @throws ScalarDbDaoException if an error occurs while creating or executing the scan */ - public Scanner createScanner( + public TransactionManagerCrudOperable.Scanner createScanner( String namespace, String table, @Nullable Key partitionKey, @@ -267,13 +285,13 @@ public Scanner createScanner( @Nullable List sortOrders, @Nullable List projectionColumns, int limit, - DistributedStorage storage) + DistributedTransactionManager manager) throws ScalarDbDaoException { Scan scan = createScan(namespace, table, partitionKey, scanRange, sortOrders, projectionColumns, limit); try { - return storage.scan(scan); - } catch (ExecutionException e) { + return manager.getScanner(scan); + } catch (CrudException e) { throw new ScalarDbDaoException(DataLoaderError.ERROR_SCAN.buildMessage(e.getMessage()), e); } } diff --git a/data-loader/core/src/test/java/com/scalar/db/dataloader/core/dataexport/CsvExportManagerTest.java 
b/data-loader/core/src/test/java/com/scalar/db/dataloader/core/dataexport/CsvExportManagerTest.java index c61aa6f989..8edd940bc4 100644 --- a/data-loader/core/src/test/java/com/scalar/db/dataloader/core/dataexport/CsvExportManagerTest.java +++ b/data-loader/core/src/test/java/com/scalar/db/dataloader/core/dataexport/CsvExportManagerTest.java @@ -1,9 +1,9 @@ package com.scalar.db.dataloader.core.dataexport; -import com.scalar.db.api.DistributedStorage; +import com.scalar.db.api.DistributedTransactionManager; import com.scalar.db.api.Result; -import com.scalar.db.api.Scanner; import com.scalar.db.api.TableMetadata; +import com.scalar.db.api.TransactionManagerCrudOperable; import com.scalar.db.common.ResultImpl; import com.scalar.db.dataloader.core.FileFormat; import com.scalar.db.dataloader.core.ScanRange; @@ -33,14 +33,14 @@ public class CsvExportManagerTest { TableMetadata mockData; - DistributedStorage storage; + DistributedTransactionManager manager; @Spy ScalarDbDao dao; ProducerTaskFactory producerTaskFactory; ExportManager exportManager; @BeforeEach void setup() { - storage = Mockito.mock(DistributedStorage.class); + manager = Mockito.mock(DistributedTransactionManager.class); mockData = UnitTestUtils.createTestTableMetadata(); dao = Mockito.mock(ScalarDbDao.class); producerTaskFactory = new ProducerTaskFactory(null, false, true); @@ -49,8 +49,9 @@ void setup() { @Test void startExport_givenValidDataWithoutPartitionKey_shouldGenerateOutputFile() throws IOException, ScalarDbDaoException { - exportManager = new JsonLineExportManager(storage, dao, producerTaskFactory); - Scanner scanner = Mockito.mock(Scanner.class); + exportManager = new JsonLineExportManager(manager, dao, producerTaskFactory); + TransactionManagerCrudOperable.Scanner scanner = + Mockito.mock(TransactionManagerCrudOperable.Scanner.class); String filePath = Paths.get("").toAbsolutePath() + "/output.csv"; Map> values = UnitTestUtils.createTestValues(); Result result = new ResultImpl(values, 
mockData); @@ -67,7 +68,7 @@ void startExport_givenValidDataWithoutPartitionKey_shouldGenerateOutputFile() exportOptions.getTableName(), exportOptions.getProjectionColumns(), exportOptions.getLimit(), - storage)) + manager)) .thenReturn(scanner); Mockito.when(scanner.iterator()).thenReturn(results.iterator()); try (BufferedWriter writer = @@ -88,8 +89,9 @@ void startExport_givenValidDataWithoutPartitionKey_shouldGenerateOutputFile() void startExport_givenPartitionKey_shouldGenerateOutputFile() throws IOException, ScalarDbDaoException { producerTaskFactory = new ProducerTaskFactory(",", false, false); - exportManager = new CsvExportManager(storage, dao, producerTaskFactory); - Scanner scanner = Mockito.mock(Scanner.class); + exportManager = new CsvExportManager(manager, dao, producerTaskFactory); + TransactionManagerCrudOperable.Scanner scanner = + Mockito.mock(TransactionManagerCrudOperable.Scanner.class); String filePath = Paths.get("").toAbsolutePath() + "/output.csv"; Map> values = UnitTestUtils.createTestValues(); Result result = new ResultImpl(values, mockData); @@ -114,7 +116,7 @@ void startExport_givenPartitionKey_shouldGenerateOutputFile() exportOptions.getSortOrders(), exportOptions.getProjectionColumns(), exportOptions.getLimit(), - storage)) + manager)) .thenReturn(scanner); Mockito.when(scanner.iterator()).thenReturn(results.iterator()); try (BufferedWriter writer = @@ -150,8 +152,9 @@ private void runExportAndAssertFirstLine(boolean excludeHeader, String expectedF throws Exception { // Arrange producerTaskFactory = new ProducerTaskFactory(",", false, false); - exportManager = new CsvExportManager(storage, dao, producerTaskFactory); - Scanner scanner = Mockito.mock(Scanner.class); + exportManager = new CsvExportManager(manager, dao, producerTaskFactory); + TransactionManagerCrudOperable.Scanner scanner = + Mockito.mock(TransactionManagerCrudOperable.Scanner.class); String filePath = Paths.get("").toAbsolutePath() + "/output.csv"; Map> values = 
UnitTestUtils.createTestValues(); Result result = new ResultImpl(values, mockData); @@ -178,7 +181,7 @@ private void runExportAndAssertFirstLine(boolean excludeHeader, String expectedF exportOptions.getSortOrders(), exportOptions.getProjectionColumns(), exportOptions.getLimit(), - storage)) + manager)) .thenReturn(scanner); Mockito.when(scanner.iterator()).thenReturn(results.iterator()); try (BufferedWriter writer = diff --git a/data-loader/core/src/test/java/com/scalar/db/dataloader/core/dataexport/JsonExportManagerTest.java b/data-loader/core/src/test/java/com/scalar/db/dataloader/core/dataexport/JsonExportManagerTest.java index c1ef7ead1a..c37715180e 100644 --- a/data-loader/core/src/test/java/com/scalar/db/dataloader/core/dataexport/JsonExportManagerTest.java +++ b/data-loader/core/src/test/java/com/scalar/db/dataloader/core/dataexport/JsonExportManagerTest.java @@ -1,9 +1,9 @@ package com.scalar.db.dataloader.core.dataexport; -import com.scalar.db.api.DistributedStorage; +import com.scalar.db.api.DistributedTransactionManager; import com.scalar.db.api.Result; -import com.scalar.db.api.Scanner; import com.scalar.db.api.TableMetadata; +import com.scalar.db.api.TransactionManagerCrudOperable; import com.scalar.db.common.ResultImpl; import com.scalar.db.dataloader.core.FileFormat; import com.scalar.db.dataloader.core.ScanRange; @@ -33,14 +33,14 @@ public class JsonExportManagerTest { TableMetadata mockData; - DistributedStorage storage; + DistributedTransactionManager manager; @Spy ScalarDbDao dao; ProducerTaskFactory producerTaskFactory; ExportManager exportManager; @BeforeEach void setup() { - storage = Mockito.mock(DistributedStorage.class); + manager = Mockito.mock(DistributedTransactionManager.class); mockData = UnitTestUtils.createTestTableMetadata(); dao = Mockito.mock(ScalarDbDao.class); producerTaskFactory = new ProducerTaskFactory(null, false, true); @@ -49,8 +49,9 @@ void setup() { @Test void 
startExport_givenValidDataWithoutPartitionKey_shouldGenerateOutputFile() throws IOException, ScalarDbDaoException { - exportManager = new JsonExportManager(storage, dao, producerTaskFactory); - Scanner scanner = Mockito.mock(Scanner.class); + exportManager = new JsonExportManager(manager, dao, producerTaskFactory); + TransactionManagerCrudOperable.Scanner scanner = + Mockito.mock(TransactionManagerCrudOperable.Scanner.class); String filePath = Paths.get("").toAbsolutePath() + "/output.json"; Map> values = UnitTestUtils.createTestValues(); Result result = new ResultImpl(values, mockData); @@ -68,7 +69,7 @@ void startExport_givenValidDataWithoutPartitionKey_shouldGenerateOutputFile() exportOptions.getTableName(), exportOptions.getProjectionColumns(), exportOptions.getLimit(), - storage)) + manager)) .thenReturn(scanner); Mockito.when(scanner.iterator()).thenReturn(results.iterator()); try (BufferedWriter writer = @@ -88,8 +89,9 @@ void startExport_givenValidDataWithoutPartitionKey_shouldGenerateOutputFile() @Test void startExport_givenPartitionKey_shouldGenerateOutputFile() throws IOException, ScalarDbDaoException { - exportManager = new JsonExportManager(storage, dao, producerTaskFactory); - Scanner scanner = Mockito.mock(Scanner.class); + exportManager = new JsonExportManager(manager, dao, producerTaskFactory); + TransactionManagerCrudOperable.Scanner scanner = + Mockito.mock(TransactionManagerCrudOperable.Scanner.class); String filePath = Paths.get("").toAbsolutePath() + "/output.json"; Map> values = UnitTestUtils.createTestValues(); Result result = new ResultImpl(values, mockData); @@ -114,7 +116,7 @@ void startExport_givenPartitionKey_shouldGenerateOutputFile() exportOptions.getSortOrders(), exportOptions.getProjectionColumns(), exportOptions.getLimit(), - storage)) + manager)) .thenReturn(scanner); Mockito.when(scanner.iterator()).thenReturn(results.iterator()); try (BufferedWriter writer = diff --git 
a/data-loader/core/src/test/java/com/scalar/db/dataloader/core/dataexport/JsonLineExportManagerTest.java b/data-loader/core/src/test/java/com/scalar/db/dataloader/core/dataexport/JsonLineExportManagerTest.java index 31e4326a33..f2a6d1b360 100644 --- a/data-loader/core/src/test/java/com/scalar/db/dataloader/core/dataexport/JsonLineExportManagerTest.java +++ b/data-loader/core/src/test/java/com/scalar/db/dataloader/core/dataexport/JsonLineExportManagerTest.java @@ -1,9 +1,9 @@ package com.scalar.db.dataloader.core.dataexport; -import com.scalar.db.api.DistributedStorage; +import com.scalar.db.api.DistributedTransactionManager; import com.scalar.db.api.Result; -import com.scalar.db.api.Scanner; import com.scalar.db.api.TableMetadata; +import com.scalar.db.api.TransactionManagerCrudOperable; import com.scalar.db.common.ResultImpl; import com.scalar.db.dataloader.core.FileFormat; import com.scalar.db.dataloader.core.ScanRange; @@ -32,14 +32,14 @@ public class JsonLineExportManagerTest { TableMetadata mockData; - DistributedStorage storage; + DistributedTransactionManager manager; @Spy ScalarDbDao dao; ProducerTaskFactory producerTaskFactory; ExportManager exportManager; @BeforeEach void setup() { - storage = Mockito.mock(DistributedStorage.class); + manager = Mockito.mock(DistributedTransactionManager.class); mockData = UnitTestUtils.createTestTableMetadata(); dao = Mockito.mock(ScalarDbDao.class); producerTaskFactory = new ProducerTaskFactory(null, false, true); @@ -48,8 +48,9 @@ void setup() { @Test void startExport_givenValidDataWithoutPartitionKey_shouldGenerateOutputFile() throws IOException, ScalarDbDaoException { - exportManager = new JsonLineExportManager(storage, dao, producerTaskFactory); - Scanner scanner = Mockito.mock(Scanner.class); + exportManager = new JsonLineExportManager(manager, dao, producerTaskFactory); + TransactionManagerCrudOperable.Scanner scanner = + Mockito.mock(TransactionManagerCrudOperable.Scanner.class); String filePath = 
Paths.get("").toAbsolutePath() + "/output.jsonl"; Map> values = UnitTestUtils.createTestValues(); Result result = new ResultImpl(values, mockData); @@ -67,7 +68,7 @@ void startExport_givenValidDataWithoutPartitionKey_shouldGenerateOutputFile() exportOptions.getTableName(), exportOptions.getProjectionColumns(), exportOptions.getLimit(), - storage)) + manager)) .thenReturn(scanner); Mockito.when(scanner.iterator()).thenReturn(results.iterator()); try (BufferedWriter writer = @@ -87,8 +88,9 @@ void startExport_givenValidDataWithoutPartitionKey_shouldGenerateOutputFile() @Test void startExport_givenPartitionKey_shouldGenerateOutputFile() throws IOException, ScalarDbDaoException { - exportManager = new JsonLineExportManager(storage, dao, producerTaskFactory); - Scanner scanner = Mockito.mock(Scanner.class); + exportManager = new JsonLineExportManager(manager, dao, producerTaskFactory); + TransactionManagerCrudOperable.Scanner scanner = + Mockito.mock(TransactionManagerCrudOperable.Scanner.class); String filePath = Paths.get("").toAbsolutePath() + "/output.jsonl"; Map> values = UnitTestUtils.createTestValues(); Result result = new ResultImpl(values, mockData); @@ -113,7 +115,7 @@ void startExport_givenPartitionKey_shouldGenerateOutputFile() exportOptions.getSortOrders(), exportOptions.getProjectionColumns(), exportOptions.getLimit(), - storage)) + manager)) .thenReturn(scanner); Mockito.when(scanner.iterator()).thenReturn(results.iterator()); try (BufferedWriter writer = From 1e08ecffe2691b470b37d19d7edae3cd35a95fc5 Mon Sep 17 00:00:00 2001 From: Jishnu J Date: Tue, 11 Nov 2025 11:07:28 +0530 Subject: [PATCH 03/17] scanner class updated --- .../db/dataloader/core/dataexport/ExportManager.java | 11 ++++++++--- 1 file changed, 8 insertions(+), 3 deletions(-) diff --git a/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataexport/ExportManager.java b/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataexport/ExportManager.java index 
67653762df..550b41a331 100644 --- a/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataexport/ExportManager.java +++ b/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataexport/ExportManager.java @@ -2,7 +2,6 @@ import com.scalar.db.api.DistributedTransactionManager; import com.scalar.db.api.Result; -import com.scalar.db.api.Scanner; import com.scalar.db.api.TableMetadata; import com.scalar.db.api.TransactionManagerCrudOperable; import com.scalar.db.dataloader.core.FileFormat; @@ -13,6 +12,8 @@ import com.scalar.db.dataloader.core.dataimport.dao.ScalarDbDao; import com.scalar.db.dataloader.core.dataimport.dao.ScalarDbDaoException; import com.scalar.db.dataloader.core.util.TableMetadataUtil; +import com.scalar.db.exception.transaction.CrudException; +import com.scalar.db.exception.transaction.UnknownTransactionStatusException; import com.scalar.db.io.DataType; import java.io.BufferedWriter; import java.io.IOException; @@ -87,7 +88,8 @@ public ExportReport startExport( BufferedWriter bufferedWriter = new BufferedWriter(writer); boolean isJson = exportOptions.getOutputFileFormat() == FileFormat.JSON; - try (Scanner scanner = createScanner(exportOptions, dao, distributedTransactionManager)) { + try (TransactionManagerCrudOperable.Scanner scanner = + createScanner(exportOptions, dao, distributedTransactionManager)) { Iterator iterator = scanner.iterator(); AtomicBoolean isFirstBatch = new AtomicBoolean(true); @@ -114,7 +116,10 @@ public ExportReport startExport( // TODO: handle this } processFooter(exportOptions, tableMetadata, bufferedWriter); - } catch (InterruptedException | IOException e) { + } catch (InterruptedException + | IOException + | UnknownTransactionStatusException + | CrudException e) { logger.error("Error during export: {}", e.getMessage()); } finally { bufferedWriter.flush(); From e7c68a72db2ffa0df175612a072243a28acb6612 Mon Sep 17 00:00:00 2001 From: Jishnu J Date: Tue, 11 Nov 2025 11:51:32 +0530 Subject: [PATCH 04/17] 
Feedback changes --- .../com/scalar/db/dataloader/core/dataexport/ExportManager.java | 2 +- .../db/dataloader/core/dataexport/CsvExportManagerTest.java | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataexport/ExportManager.java b/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataexport/ExportManager.java index 550b41a331..8f079b2f99 100644 --- a/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataexport/ExportManager.java +++ b/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataexport/ExportManager.java @@ -120,7 +120,7 @@ public ExportReport startExport( | IOException | UnknownTransactionStatusException | CrudException e) { - logger.error("Error during export: {}", e.getMessage()); + logger.error("Error during export: ", e); } finally { bufferedWriter.flush(); } diff --git a/data-loader/core/src/test/java/com/scalar/db/dataloader/core/dataexport/CsvExportManagerTest.java b/data-loader/core/src/test/java/com/scalar/db/dataloader/core/dataexport/CsvExportManagerTest.java index 8edd940bc4..26e6a88a85 100644 --- a/data-loader/core/src/test/java/com/scalar/db/dataloader/core/dataexport/CsvExportManagerTest.java +++ b/data-loader/core/src/test/java/com/scalar/db/dataloader/core/dataexport/CsvExportManagerTest.java @@ -49,7 +49,7 @@ void setup() { @Test void startExport_givenValidDataWithoutPartitionKey_shouldGenerateOutputFile() throws IOException, ScalarDbDaoException { - exportManager = new JsonLineExportManager(manager, dao, producerTaskFactory); + exportManager = new CsvExportManager(manager, dao, producerTaskFactory); TransactionManagerCrudOperable.Scanner scanner = Mockito.mock(TransactionManagerCrudOperable.Scanner.class); String filePath = Paths.get("").toAbsolutePath() + "/output.csv"; From b973df88d81b3ab42c7e515837fd53baa5ef2fe5 Mon Sep 17 00:00:00 2001 From: Jishnu J Date: Tue, 11 Nov 2025 12:03:14 +0530 Subject: [PATCH 05/17] 
Feedback changes javadoc and param rename --- .../db/dataloader/core/dataexport/ExportManager.java | 4 ++-- .../db/dataloader/core/dataimport/dao/ScalarDbDao.java | 10 ++++------ 2 files changed, 6 insertions(+), 8 deletions(-) diff --git a/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataexport/ExportManager.java b/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataexport/ExportManager.java index 8f079b2f99..0743a79106 100644 --- a/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataexport/ExportManager.java +++ b/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataexport/ExportManager.java @@ -36,7 +36,7 @@ public abstract class ExportManager { private static final Logger logger = LoggerFactory.getLogger(ExportManager.class); - private final DistributedTransactionManager distributedTransactionManager; + private final DistributedTransactionManager manager; private final ScalarDbDao dao; private final ProducerTaskFactory producerTaskFactory; private final Object lock = new Object(); @@ -89,7 +89,7 @@ public ExportReport startExport( boolean isJson = exportOptions.getOutputFileFormat() == FileFormat.JSON; try (TransactionManagerCrudOperable.Scanner scanner = - createScanner(exportOptions, dao, distributedTransactionManager)) { + createScanner(exportOptions, dao, manager)) { Iterator iterator = scanner.iterator(); AtomicBoolean isFirstBatch = new AtomicBoolean(true); diff --git a/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataimport/dao/ScalarDbDao.java b/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataimport/dao/ScalarDbDao.java index 5242acdeb3..6afceae89a 100644 --- a/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataimport/dao/ScalarDbDao.java +++ b/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataimport/dao/ScalarDbDao.java @@ -224,9 +224,8 @@ public List scan( * Creates a {@link TransactionManagerCrudOperable.Scanner} instance for reading 
data from * ScalarDB. * - *

This method builds and executes a {@link Scan} operation for the specified table and returns - * a scanner that iterates over the retrieved records. It operates in storage mode using a {@link - * DistributedTransactionManager}. + *

a scanner that iterates over the retrieved records. It performs the scan transactionally + * using a {@link DistributedTransactionManager}. * * @param namespace the ScalarDB namespace to scan * @param table the ScalarDB table name @@ -258,9 +257,8 @@ public TransactionManagerCrudOperable.Scanner createScanner( * Creates a {@link TransactionManagerCrudOperable.Scanner} instance for reading data from * ScalarDB. * - *

This method builds and executes a {@link Scan} operation using the provided parameters and - * returns a scanner that iterates over the matching records. It is used in storage mode through a - * {@link DistributedTransactionManager}. + *

returns a scanner that iterates over the matching records. The scan is performed + * transactionally through a {@link DistributedTransactionManager}. * * @param namespace the ScalarDB namespace to scan * @param table the ScalarDB table name From 186d3c384535d50d423fd085567187f643e8f722 Mon Sep 17 00:00:00 2001 From: Jishnu J Date: Tue, 11 Nov 2025 18:08:11 +0530 Subject: [PATCH 06/17] Replace storage with transaction in table metadata service --- .../cli/command/dataexport/ExportCommand.java | 4 +--- .../cli/command/dataimport/ImportCommand.java | 8 ++++---- .../core/tablemetadata/TableMetadataService.java | 6 +++--- .../core/tablemetadata/TableMetadataServiceTest.java | 10 +++++----- 4 files changed, 13 insertions(+), 15 deletions(-) diff --git a/data-loader/cli/src/main/java/com/scalar/db/dataloader/cli/command/dataexport/ExportCommand.java b/data-loader/cli/src/main/java/com/scalar/db/dataloader/cli/command/dataexport/ExportCommand.java index e6605a504b..e6c28ae2f0 100755 --- a/data-loader/cli/src/main/java/com/scalar/db/dataloader/cli/command/dataexport/ExportCommand.java +++ b/data-loader/cli/src/main/java/com/scalar/db/dataloader/cli/command/dataexport/ExportCommand.java @@ -28,7 +28,6 @@ import com.scalar.db.dataloader.core.tablemetadata.TableMetadataService; import com.scalar.db.dataloader.core.util.KeyUtils; import com.scalar.db.io.Key; -import com.scalar.db.service.StorageFactory; import com.scalar.db.service.TransactionFactory; import java.io.BufferedWriter; import java.nio.charset.Charset; @@ -64,10 +63,9 @@ public Integer call() throws Exception { spec.commandLine(), dataChunkSize, DataLoaderError.INVALID_DATA_CHUNK_SIZE); validatePositiveValue(spec.commandLine(), maxThreads, DataLoaderError.INVALID_MAX_THREADS); - StorageFactory storageFactory = StorageFactory.create(scalarDbPropertiesFilePath); TransactionFactory transactionFactory = TransactionFactory.create(scalarDbPropertiesFilePath); TableMetadataService metaDataService = - new 
TableMetadataService(storageFactory.getStorageAdmin()); + new TableMetadataService(transactionFactory.getTransactionAdmin()); ScalarDbDao scalarDbDao = new ScalarDbDao(); ExportManager exportManager = diff --git a/data-loader/cli/src/main/java/com/scalar/db/dataloader/cli/command/dataimport/ImportCommand.java b/data-loader/cli/src/main/java/com/scalar/db/dataloader/cli/command/dataimport/ImportCommand.java index 73f40b0577..0964c96c23 100755 --- a/data-loader/cli/src/main/java/com/scalar/db/dataloader/cli/command/dataimport/ImportCommand.java +++ b/data-loader/cli/src/main/java/com/scalar/db/dataloader/cli/command/dataimport/ImportCommand.java @@ -4,7 +4,7 @@ import static com.scalar.db.dataloader.cli.util.CommandLineInputUtils.validatePositiveValue; import com.fasterxml.jackson.databind.ObjectMapper; -import com.scalar.db.api.DistributedStorageAdmin; +import com.scalar.db.api.DistributedTransactionAdmin; import com.scalar.db.api.TableMetadata; import com.scalar.db.dataloader.core.DataLoaderError; import com.scalar.db.dataloader.core.FileFormat; @@ -107,9 +107,9 @@ private Map createTableMetadataMap( ControlFile controlFile, String namespace, String tableName) throws IOException, TableMetadataException { File configFile = new File(configFilePath); - StorageFactory storageFactory = StorageFactory.create(configFile); - try (DistributedStorageAdmin storageAdmin = storageFactory.getStorageAdmin()) { - TableMetadataService tableMetadataService = new TableMetadataService(storageAdmin); + TransactionFactory transactionFactory = TransactionFactory.create(configFile); + try (DistributedTransactionAdmin transactionAdmin = transactionFactory.getTransactionAdmin()) { + TableMetadataService tableMetadataService = new TableMetadataService(transactionAdmin); Map tableMetadataMap = new HashMap<>(); if (controlFile != null) { for (ControlFileTable table : controlFile.getTables()) { diff --git 
a/data-loader/core/src/main/java/com/scalar/db/dataloader/core/tablemetadata/TableMetadataService.java b/data-loader/core/src/main/java/com/scalar/db/dataloader/core/tablemetadata/TableMetadataService.java index 70d49a51ee..a688e5bbfa 100644 --- a/data-loader/core/src/main/java/com/scalar/db/dataloader/core/tablemetadata/TableMetadataService.java +++ b/data-loader/core/src/main/java/com/scalar/db/dataloader/core/tablemetadata/TableMetadataService.java @@ -1,6 +1,6 @@ package com.scalar.db.dataloader.core.tablemetadata; -import com.scalar.db.api.DistributedStorageAdmin; +import com.scalar.db.api.DistributedTransactionAdmin; import com.scalar.db.api.TableMetadata; import com.scalar.db.dataloader.core.DataLoaderError; import com.scalar.db.dataloader.core.util.TableMetadataUtil; @@ -18,7 +18,7 @@ @RequiredArgsConstructor public class TableMetadataService { - private final DistributedStorageAdmin storageAdmin; + private final DistributedTransactionAdmin transactionAdmin; /** * Retrieves the {@link TableMetadata} for a specific namespace and table name. 
@@ -32,7 +32,7 @@ public class TableMetadataService { public TableMetadata getTableMetadata(String namespace, String tableName) throws TableMetadataException { try { - TableMetadata tableMetadata = storageAdmin.getTableMetadata(namespace, tableName); + TableMetadata tableMetadata = transactionAdmin.getTableMetadata(namespace, tableName); if (tableMetadata == null) { throw new TableMetadataException( DataLoaderError.MISSING_NAMESPACE_OR_TABLE.buildMessage(namespace, tableName)); diff --git a/data-loader/core/src/test/java/com/scalar/db/dataloader/core/tablemetadata/TableMetadataServiceTest.java b/data-loader/core/src/test/java/com/scalar/db/dataloader/core/tablemetadata/TableMetadataServiceTest.java index ff9d5d8b0c..f782c565e2 100644 --- a/data-loader/core/src/test/java/com/scalar/db/dataloader/core/tablemetadata/TableMetadataServiceTest.java +++ b/data-loader/core/src/test/java/com/scalar/db/dataloader/core/tablemetadata/TableMetadataServiceTest.java @@ -2,7 +2,7 @@ import static org.assertj.core.api.Assertions.assertThatThrownBy; -import com.scalar.db.api.DistributedStorageAdmin; +import com.scalar.db.api.DistributedTransactionAdmin; import com.scalar.db.api.TableMetadata; import com.scalar.db.dataloader.core.DataLoaderError; import com.scalar.db.dataloader.core.UnitTestUtils; @@ -17,15 +17,15 @@ class TableMetadataServiceTest { - DistributedStorageAdmin storageAdmin; + DistributedTransactionAdmin transactionAdmin; TableMetadataService tableMetadataService; @BeforeEach void setup() throws ExecutionException { - storageAdmin = Mockito.mock(DistributedStorageAdmin.class); - Mockito.when(storageAdmin.getTableMetadata("namespace", "table")) + transactionAdmin = Mockito.mock(DistributedTransactionAdmin.class); + Mockito.when(transactionAdmin.getTableMetadata("namespace", "table")) .thenReturn(UnitTestUtils.createTestTableMetadata()); - tableMetadataService = new TableMetadataService(storageAdmin); + tableMetadataService = new TableMetadataService(transactionAdmin); } 
@Test From 1d81db79e09a213d8eb8d52d497787b1afc93f97 Mon Sep 17 00:00:00 2001 From: Jishnu J Date: Tue, 11 Nov 2025 18:40:15 +0530 Subject: [PATCH 07/17] Initial commit --- .../cli/command/dataimport/ImportCommand.java | 37 +++-------- .../core/dataimport/ImportManager.java | 7 +-- .../core/dataimport/dao/ScalarDbDao.java | 62 ++++++++++++------- .../dataimport/processor/ImportProcessor.java | 2 +- .../dataimport/task/ImportStorageTask.java | 31 +++++----- .../processor/CsvImportProcessorTest.java | 6 +- .../processor/JsonImportProcessorTest.java | 6 +- .../JsonLinesImportProcessorTest.java | 6 +- 8 files changed, 69 insertions(+), 88 deletions(-) diff --git a/data-loader/cli/src/main/java/com/scalar/db/dataloader/cli/command/dataimport/ImportCommand.java b/data-loader/cli/src/main/java/com/scalar/db/dataloader/cli/command/dataimport/ImportCommand.java index 0964c96c23..2e7300e03d 100755 --- a/data-loader/cli/src/main/java/com/scalar/db/dataloader/cli/command/dataimport/ImportCommand.java +++ b/data-loader/cli/src/main/java/com/scalar/db/dataloader/cli/command/dataimport/ImportCommand.java @@ -13,8 +13,6 @@ import com.scalar.db.dataloader.core.dataimport.ImportOptions; import com.scalar.db.dataloader.core.dataimport.controlfile.ControlFile; import com.scalar.db.dataloader.core.dataimport.controlfile.ControlFileTable; -import com.scalar.db.dataloader.core.dataimport.dao.ScalarDbStorageManager; -import com.scalar.db.dataloader.core.dataimport.dao.ScalarDbTransactionManager; import com.scalar.db.dataloader.core.dataimport.log.ImportLoggerConfig; import com.scalar.db.dataloader.core.dataimport.log.LogMode; import com.scalar.db.dataloader.core.dataimport.log.SingleFileImportLogger; @@ -26,7 +24,6 @@ import com.scalar.db.dataloader.core.tablemetadata.TableMetadataException; import com.scalar.db.dataloader.core.tablemetadata.TableMetadataService; import com.scalar.db.dataloader.core.util.TableMetadataUtil; -import com.scalar.db.service.StorageFactory; import 
com.scalar.db.service.TransactionFactory; import java.io.BufferedReader; import java.io.File; @@ -145,32 +142,14 @@ private ImportManager createImportManager( throws IOException { File configFile = new File(configFilePath); ImportProcessorFactory importProcessorFactory = new DefaultImportProcessorFactory(); - ImportManager importManager; - if (scalarDbMode == ScalarDbMode.TRANSACTION) { - ScalarDbTransactionManager scalarDbTransactionManager = - new ScalarDbTransactionManager(TransactionFactory.create(configFile)); - importManager = - new ImportManager( - tableMetadataMap, - reader, - importOptions, - importProcessorFactory, - ScalarDbMode.TRANSACTION, - null, - scalarDbTransactionManager.getDistributedTransactionManager()); - } else { - ScalarDbStorageManager scalarDbStorageManager = - new ScalarDbStorageManager(StorageFactory.create(configFile)); - importManager = - new ImportManager( - tableMetadataMap, - reader, - importOptions, - importProcessorFactory, - ScalarDbMode.STORAGE, - scalarDbStorageManager.getDistributedStorage(), - null); - } + ImportManager importManager = + new ImportManager( + tableMetadataMap, + reader, + importOptions, + importProcessorFactory, + ScalarDbMode.TRANSACTION, + TransactionFactory.create(configFile).getTransactionManager()); if (importOptions.getLogMode().equals(LogMode.SPLIT_BY_DATA_CHUNK)) { importManager.addListener(new SplitByDataChunkImportLogger(config, logWriterFactory)); } else { diff --git a/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataimport/ImportManager.java b/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataimport/ImportManager.java index 07ef2dd756..89b836c750 100644 --- a/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataimport/ImportManager.java +++ b/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataimport/ImportManager.java @@ -1,6 +1,5 @@ package com.scalar.db.dataloader.core.dataimport; -import com.scalar.db.api.DistributedStorage; import 
com.scalar.db.api.DistributedTransactionManager; import com.scalar.db.api.TableMetadata; import com.scalar.db.dataloader.core.ScalarDbMode; @@ -44,7 +43,6 @@ public class ImportManager implements ImportEventListener { private final ImportProcessorFactory importProcessorFactory; private final List listeners = new ArrayList<>(); private final ScalarDbMode scalarDbMode; - private final DistributedStorage distributedStorage; private final DistributedTransactionManager distributedTransactionManager; /** @@ -62,7 +60,6 @@ public void startImport() { .tableMetadataByTableName(tableMetadata) .dao(new ScalarDbDao()) .distributedTransactionManager(distributedTransactionManager) - .distributedStorage(distributedStorage) .tableColumnDataTypes(getTableColumnDataTypes()) .build(); ImportProcessor processor = importProcessorFactory.createImportProcessor(params); @@ -169,9 +166,7 @@ public void onAllDataChunksCompleted() { /** Close resources properly once the process is completed */ public void closeResources() { try { - if (distributedStorage != null) { - distributedStorage.close(); - } else if (distributedTransactionManager != null) { + if (distributedTransactionManager != null) { distributedTransactionManager.close(); } } catch (Throwable e) { diff --git a/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataimport/dao/ScalarDbDao.java b/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataimport/dao/ScalarDbDao.java index 6afceae89a..119d532d70 100644 --- a/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataimport/dao/ScalarDbDao.java +++ b/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataimport/dao/ScalarDbDao.java @@ -16,6 +16,7 @@ import com.scalar.db.dataloader.core.ScanRange; import com.scalar.db.exception.storage.ExecutionException; import com.scalar.db.exception.transaction.CrudException; +import com.scalar.db.exception.transaction.UnknownTransactionStatusException; import com.scalar.db.io.Column; import 
com.scalar.db.io.Key; import java.io.IOException; @@ -29,22 +30,30 @@ public class ScalarDbDao { /** - * Retrieve record from ScalarDB instance in storage mode + * Retrieves a record from a ScalarDB table using the specified partition and optional clustering + * keys while operating transactionally through a {@link DistributedTransactionManager}. * - * @param namespace Namespace name - * @param table Table name - * @param partitionKey Partition key - * @param clusteringKey Optional clustering key for get - * @param storage Distributed storage for ScalarDB connection that is running in storage mode. - * @return Optional get result - * @throws ScalarDbDaoException if something goes wrong while reading the data + *

This method creates a {@link Get} operation for the given namespace and table, executes it + * using the provided transaction manager, and returns the result if the record exists. + * + * @param namespace the name of the ScalarDB namespace containing the target table + * @param table the name of the table to retrieve the record from + * @param partitionKey the partition key identifying the record's partition + * @param clusteringKey the optional clustering key identifying a specific record within the + * partition; may be {@code null} if the table does not use clustering keys + * @param manager the {@link DistributedTransactionManager} instance used to perform the get + * operation + * @return an {@link Optional} containing the {@link Result} if the record exists, or an empty + * {@link Optional} if not found + * @throws ScalarDbDaoException if an error occurs while performing the get operation or + * interacting with ScalarDB */ public Optional get( String namespace, String table, Key partitionKey, Key clusteringKey, - DistributedStorage storage) + DistributedTransactionManager manager) throws ScalarDbDaoException { // Retrieving the key data for logging @@ -52,8 +61,8 @@ public Optional get( try { Get get = createGetWith(namespace, table, partitionKey, clusteringKey); - return storage.get(get); - } catch (ExecutionException e) { + return manager.get(get); + } catch (CrudException | UnknownTransactionStatusException e) { throw new ScalarDbDaoException("error GET " + loggingKey, e); } } @@ -117,15 +126,24 @@ public void put( } /** - * Save record in ScalarDB instance + * Saves a record into a ScalarDB table using the specified partition and optional clustering keys + * through a {@link DistributedTransactionManager}. 
* - * @param namespace Namespace name - * @param table Table name - * @param partitionKey Partition key - * @param clusteringKey Optional clustering key - * @param columns List of column values to be inserted or updated - * @param storage Distributed storage for ScalarDB connection that is running in storage mode - * @throws ScalarDbDaoException if something goes wrong while executing the transaction + *

This method constructs a {@link Put} operation with the provided key and column information, + * then executes it using the given transaction manager. The operation inserts a new record or + * updates an existing one if a record with the same primary key already exists. + * + * @param namespace the name of the ScalarDB namespace containing the target table + * @param table the name of the table where the record will be inserted or updated + * @param partitionKey the partition key identifying the record's partition + * @param clusteringKey the optional clustering key identifying a specific record within the + * partition; may be {@code null} if the table does not use clustering keys + * @param columns the list of {@link Column} objects representing the column values to insert or + * update + * @param manager the {@link DistributedTransactionManager} instance used to perform the put + * operation + * @throws ScalarDbDaoException if an error occurs while executing the put operation or + * interacting with ScalarDB */ public void put( String namespace, @@ -133,12 +151,12 @@ public void put( Key partitionKey, Key clusteringKey, List> columns, - DistributedStorage storage) + DistributedTransactionManager manager) throws ScalarDbDaoException { Put put = createPutWith(namespace, table, partitionKey, clusteringKey, columns); try { - storage.put(put); - } catch (ExecutionException e) { + manager.put(put); + } catch (CrudException | UnknownTransactionStatusException e) { throw new ScalarDbDaoException( DataLoaderError.ERROR_CRUD_EXCEPTION.buildMessage(e.getMessage()), e); } diff --git a/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataimport/processor/ImportProcessor.java b/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataimport/processor/ImportProcessor.java index 81daf9646e..a8c3def492 100644 --- a/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataimport/processor/ImportProcessor.java +++ 
b/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataimport/processor/ImportProcessor.java @@ -360,7 +360,7 @@ private ImportTaskResult processStorageRecord(int dataChunkId, ImportRow importR .dao(params.getDao()) .build(); ImportTaskResult importRecordResult = - new ImportStorageTask(taskParams, params.getDistributedStorage()).execute(); + new ImportStorageTask(taskParams, params.getDistributedTransactionManager()).execute(); ImportTaskResult modifiedTaskResult = ImportTaskResult.builder() diff --git a/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataimport/task/ImportStorageTask.java b/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataimport/task/ImportStorageTask.java index e847cc3a34..824c475371 100644 --- a/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataimport/task/ImportStorageTask.java +++ b/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataimport/task/ImportStorageTask.java @@ -1,6 +1,7 @@ package com.scalar.db.dataloader.core.dataimport.task; import com.scalar.db.api.DistributedStorage; +import com.scalar.db.api.DistributedTransactionManager; import com.scalar.db.api.Result; import com.scalar.db.dataloader.core.dataimport.dao.ScalarDbDaoException; import com.scalar.db.io.Column; @@ -9,12 +10,12 @@ import java.util.Optional; /** - * An import task that interacts with a {@link DistributedStorage} for data retrieval and storage + * An import task that interacts with a {@link DistributedStorage} for data retrieval and manager * operations. * *

This class extends {@link ImportTask} and provides concrete implementations for fetching and * storing records using a {@link DistributedStorage} instance. It acts as a bridge between the - * import process and the underlying distributed storage system. + * import process and the underlying distributed manager system. * *

The task handles both read and write operations: * @@ -23,31 +24,31 @@ *

  • Storing new or updated records with their associated columns * * - *

    All storage operations are performed through the provided {@link DistributedStorage} instance, + *

    All manager operations are performed through the provided {@link DistributedStorage} instance, * which must be properly initialized before creating this task. */ public class ImportStorageTask extends ImportTask { - private final DistributedStorage storage; + private final DistributedTransactionManager manager; /** - * Constructs an {@code ImportStorageTask} with the specified parameters and storage. + * Constructs an {@code ImportStorageTask} with the specified parameters and manager. * * @param params the import task parameters containing configuration and DAO objects - * @param storage the distributed storage instance to be used for data operations - * @throws NullPointerException if either params or storage is null + * @param manager the distributed manager instance to be used for data operations + * @throws NullPointerException if either params or manager is null */ - public ImportStorageTask(ImportTaskParams params, DistributedStorage storage) { + public ImportStorageTask(ImportTaskParams params, DistributedTransactionManager manager) { super(params); - this.storage = storage; + this.manager = manager; } /** - * Retrieves a data record from the distributed storage using the specified keys. + * Retrieves a data record from the distributed manager using the specified keys. * *

    This method attempts to fetch a single record from the specified table using both partition * and clustering keys. The operation is performed through the configured DAO using the associated - * storage instance. + * manager instance. * * @param namespace the namespace of the table to query * @param tableName the name of the table to query @@ -62,15 +63,15 @@ public ImportStorageTask(ImportTaskParams params, DistributedStorage storage) { protected Optional getDataRecord( String namespace, String tableName, Key partitionKey, Key clusteringKey) throws ScalarDbDaoException { - return params.getDao().get(namespace, tableName, partitionKey, clusteringKey, this.storage); + return params.getDao().get(namespace, tableName, partitionKey, clusteringKey, this.manager); } /** - * Saves a record into the distributed storage with the specified keys and columns. + * Saves a record into the distributed manager with the specified keys and columns. * *

    This method writes or updates a record in the specified table using the provided keys and * column values. The operation is performed through the configured DAO using the associated - * storage instance. + * manager instance. * * @param namespace the namespace of the target table * @param tableName the name of the target table @@ -88,6 +89,6 @@ protected void saveRecord( Key clusteringKey, List> columns) throws ScalarDbDaoException { - params.getDao().put(namespace, tableName, partitionKey, clusteringKey, columns, this.storage); + params.getDao().put(namespace, tableName, partitionKey, clusteringKey, columns, this.manager); } } diff --git a/data-loader/core/src/test/java/com/scalar/db/dataloader/core/dataimport/processor/CsvImportProcessorTest.java b/data-loader/core/src/test/java/com/scalar/db/dataloader/core/dataimport/processor/CsvImportProcessorTest.java index 5457bc51d1..58eeaa8325 100644 --- a/data-loader/core/src/test/java/com/scalar/db/dataloader/core/dataimport/processor/CsvImportProcessorTest.java +++ b/data-loader/core/src/test/java/com/scalar/db/dataloader/core/dataimport/processor/CsvImportProcessorTest.java @@ -3,7 +3,6 @@ import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; -import com.scalar.db.api.DistributedStorage; import com.scalar.db.api.DistributedTransaction; import com.scalar.db.api.DistributedTransactionManager; import com.scalar.db.api.TableMetadata; @@ -32,7 +31,6 @@ class CsvImportProcessorTest { @Mock TableColumnDataTypes tableColumnDataTypes; ScalarDbDao dao; - @Mock DistributedStorage distributedStorage; DistributedTransactionManager distributedTransactionManager; CsvImportProcessor csvImportProcessor; @@ -65,7 +63,7 @@ void setup() throws ScalarDbDaoException, TransactionException { "table", UnitTestUtils.getPartitionKey(1), UnitTestUtils.getClusteringKey(), - distributedStorage)) + distributedTransactionManager)) .thenReturn(UnitTestUtils.getResult(1)); Mockito.when( dao.get( @@ -84,7 +82,6 @@ void 
test_importProcessWithStorage() { .scalarDbMode(ScalarDbMode.STORAGE) .importOptions(importOptions) .dao(dao) - .distributedStorage(distributedStorage) .distributedTransactionManager(distributedTransactionManager) .tableColumnDataTypes(tableColumnDataTypes) .tableMetadataByTableName(tableMetadataByTableName) @@ -103,7 +100,6 @@ void test_importProcessWithTransaction() { .scalarDbMode(ScalarDbMode.TRANSACTION) .importOptions(importOptions) .dao(dao) - .distributedStorage(distributedStorage) .distributedTransactionManager(distributedTransactionManager) .tableColumnDataTypes(tableColumnDataTypes) .tableMetadataByTableName(tableMetadataByTableName) diff --git a/data-loader/core/src/test/java/com/scalar/db/dataloader/core/dataimport/processor/JsonImportProcessorTest.java b/data-loader/core/src/test/java/com/scalar/db/dataloader/core/dataimport/processor/JsonImportProcessorTest.java index a5705d3684..2b1bc74ad5 100644 --- a/data-loader/core/src/test/java/com/scalar/db/dataloader/core/dataimport/processor/JsonImportProcessorTest.java +++ b/data-loader/core/src/test/java/com/scalar/db/dataloader/core/dataimport/processor/JsonImportProcessorTest.java @@ -3,7 +3,6 @@ import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; -import com.scalar.db.api.DistributedStorage; import com.scalar.db.api.DistributedTransaction; import com.scalar.db.api.DistributedTransactionManager; import com.scalar.db.api.TableMetadata; @@ -32,7 +31,6 @@ class JsonImportProcessorTest { @Mock TableColumnDataTypes tableColumnDataTypes; ScalarDbDao dao; - @Mock DistributedStorage distributedStorage; DistributedTransactionManager distributedTransactionManager; JsonImportProcessor jsonImportProcessor; @@ -65,7 +63,7 @@ void setup() throws ScalarDbDaoException, TransactionException { "table", UnitTestUtils.getPartitionKey(1), UnitTestUtils.getClusteringKey(), - distributedStorage)) + distributedTransactionManager)) .thenReturn(UnitTestUtils.getResult(1)); Mockito.when( dao.get( @@ 
-84,7 +82,6 @@ void test_importProcessWithStorage() { .scalarDbMode(ScalarDbMode.STORAGE) .importOptions(importOptions) .dao(dao) - .distributedStorage(distributedStorage) .distributedTransactionManager(distributedTransactionManager) .tableColumnDataTypes(tableColumnDataTypes) .tableMetadataByTableName(tableMetadataByTableName) @@ -103,7 +100,6 @@ void test_importProcessWithTransaction() { .scalarDbMode(ScalarDbMode.TRANSACTION) .importOptions(importOptions) .dao(dao) - .distributedStorage(distributedStorage) .distributedTransactionManager(distributedTransactionManager) .tableColumnDataTypes(tableColumnDataTypes) .tableMetadataByTableName(tableMetadataByTableName) diff --git a/data-loader/core/src/test/java/com/scalar/db/dataloader/core/dataimport/processor/JsonLinesImportProcessorTest.java b/data-loader/core/src/test/java/com/scalar/db/dataloader/core/dataimport/processor/JsonLinesImportProcessorTest.java index 30992f1d35..edb3b8ea2b 100644 --- a/data-loader/core/src/test/java/com/scalar/db/dataloader/core/dataimport/processor/JsonLinesImportProcessorTest.java +++ b/data-loader/core/src/test/java/com/scalar/db/dataloader/core/dataimport/processor/JsonLinesImportProcessorTest.java @@ -3,7 +3,6 @@ import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; -import com.scalar.db.api.DistributedStorage; import com.scalar.db.api.DistributedTransaction; import com.scalar.db.api.DistributedTransactionManager; import com.scalar.db.api.TableMetadata; @@ -32,7 +31,6 @@ class JsonLinesImportProcessorTest { @Mock TableColumnDataTypes tableColumnDataTypes; ScalarDbDao dao; - @Mock DistributedStorage distributedStorage; DistributedTransactionManager distributedTransactionManager; JsonLinesImportProcessor jsonLinesImportProcessor; @@ -65,7 +63,7 @@ void setup() throws ScalarDbDaoException, TransactionException { "table", UnitTestUtils.getPartitionKey(1), UnitTestUtils.getClusteringKey(), - distributedStorage)) + distributedTransactionManager)) 
.thenReturn(UnitTestUtils.getResult(1)); Mockito.when( dao.get( @@ -84,7 +82,6 @@ void test_importProcessWithStorage() { .scalarDbMode(ScalarDbMode.STORAGE) .importOptions(importOptions) .dao(dao) - .distributedStorage(distributedStorage) .distributedTransactionManager(distributedTransactionManager) .tableColumnDataTypes(tableColumnDataTypes) .tableMetadataByTableName(tableMetadataByTableName) @@ -103,7 +100,6 @@ void test_importProcessWithTransaction() { .scalarDbMode(ScalarDbMode.TRANSACTION) .importOptions(importOptions) .dao(dao) - .distributedStorage(distributedStorage) .distributedTransactionManager(distributedTransactionManager) .tableColumnDataTypes(tableColumnDataTypes) .tableMetadataByTableName(tableMetadataByTableName) From 36ff32f10f5c67c8b97975d3ceb6022e0a2509ff Mon Sep 17 00:00:00 2001 From: Jishnu J Date: Wed, 12 Nov 2025 15:31:52 +0530 Subject: [PATCH 08/17] transaction mode use fixed --- .../db/dataloader/cli/command/dataimport/ImportCommand.java | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/data-loader/cli/src/main/java/com/scalar/db/dataloader/cli/command/dataimport/ImportCommand.java b/data-loader/cli/src/main/java/com/scalar/db/dataloader/cli/command/dataimport/ImportCommand.java index 2e7300e03d..c5ae0ded2c 100755 --- a/data-loader/cli/src/main/java/com/scalar/db/dataloader/cli/command/dataimport/ImportCommand.java +++ b/data-loader/cli/src/main/java/com/scalar/db/dataloader/cli/command/dataimport/ImportCommand.java @@ -8,7 +8,6 @@ import com.scalar.db.api.TableMetadata; import com.scalar.db.dataloader.core.DataLoaderError; import com.scalar.db.dataloader.core.FileFormat; -import com.scalar.db.dataloader.core.ScalarDbMode; import com.scalar.db.dataloader.core.dataimport.ImportManager; import com.scalar.db.dataloader.core.dataimport.ImportOptions; import com.scalar.db.dataloader.core.dataimport.controlfile.ControlFile; @@ -148,7 +147,7 @@ private ImportManager createImportManager( reader, importOptions, 
importProcessorFactory, - ScalarDbMode.TRANSACTION, + scalarDbMode, TransactionFactory.create(configFile).getTransactionManager()); if (importOptions.getLogMode().equals(LogMode.SPLIT_BY_DATA_CHUNK)) { importManager.addListener(new SplitByDataChunkImportLogger(config, logWriterFactory)); From 5b6f86a7717c1c7af786c9ee358342b77d167e97 Mon Sep 17 00:00:00 2001 From: Jishnu J Date: Wed, 12 Nov 2025 16:51:30 +0530 Subject: [PATCH 09/17] Removed unused code and updated java doc comments --- .../core/dataimport/dao/ScalarDbDao.java | 41 ------------ .../dao/ScalarDbStorageManager.java | 49 -------------- .../dao/ScalarDbTransactionManager.java | 35 ---------- .../dataimport/task/ImportStorageTask.java | 64 ++++++++++--------- 4 files changed, 33 insertions(+), 156 deletions(-) delete mode 100644 data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataimport/dao/ScalarDbStorageManager.java delete mode 100644 data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataimport/dao/ScalarDbTransactionManager.java diff --git a/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataimport/dao/ScalarDbDao.java b/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataimport/dao/ScalarDbDao.java index 119d532d70..a56ceca1e0 100644 --- a/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataimport/dao/ScalarDbDao.java +++ b/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataimport/dao/ScalarDbDao.java @@ -1,6 +1,5 @@ package com.scalar.db.dataloader.core.dataimport.dao; -import com.scalar.db.api.DistributedStorage; import com.scalar.db.api.DistributedTransaction; import com.scalar.db.api.DistributedTransactionManager; import com.scalar.db.api.Get; @@ -10,16 +9,13 @@ import com.scalar.db.api.Result; import com.scalar.db.api.Scan; import com.scalar.db.api.ScanBuilder; -import com.scalar.db.api.Scanner; import com.scalar.db.api.TransactionManagerCrudOperable; import com.scalar.db.dataloader.core.DataLoaderError; import 
com.scalar.db.dataloader.core.ScanRange; -import com.scalar.db.exception.storage.ExecutionException; import com.scalar.db.exception.transaction.CrudException; import com.scalar.db.exception.transaction.UnknownTransactionStatusException; import com.scalar.db.io.Column; import com.scalar.db.io.Key; -import java.io.IOException; import java.util.ArrayList; import java.util.List; import java.util.NoSuchElementException; @@ -162,43 +158,6 @@ public void put( } } - /** - * Scan a ScalarDB table - * - * @param namespace ScalarDB namespace - * @param table ScalarDB table name - * @param partitionKey Partition key used in ScalarDB scan - * @param range Optional range to set ScalarDB scan start and end values - * @param sorts Optional scan clustering key sorting values - * @param projections List of column projection to use during scan - * @param limit Scan limit value - * @param storage Distributed storage for ScalarDB connection that is running in storage mode - * @return List of ScalarDB scan results - * @throws ScalarDbDaoException if scan fails - */ - public List scan( - String namespace, - String table, - Key partitionKey, - ScanRange range, - List sorts, - List projections, - int limit, - DistributedStorage storage) - throws ScalarDbDaoException { - // Create scan - Scan scan = createScan(namespace, table, partitionKey, range, sorts, projections, limit); - - // scan data - try { - try (Scanner scanner = storage.scan(scan)) { - return scanner.all(); - } - } catch (ExecutionException | IOException e) { - throw new ScalarDbDaoException(DataLoaderError.ERROR_SCAN.buildMessage(e.getMessage()), e); - } - } - /** * Scan a ScalarDB table * diff --git a/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataimport/dao/ScalarDbStorageManager.java b/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataimport/dao/ScalarDbStorageManager.java deleted file mode 100644 index 54185b9b3a..0000000000 --- 
a/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataimport/dao/ScalarDbStorageManager.java +++ /dev/null @@ -1,49 +0,0 @@ -package com.scalar.db.dataloader.core.dataimport.dao; - -import com.scalar.db.api.DistributedStorage; -import com.scalar.db.api.DistributedStorageAdmin; -import com.scalar.db.service.StorageFactory; -import javax.annotation.Nullable; - -/** - * A manager class for handling ScalarDB operations in storage mode. - * - *

    Provides access to {@link DistributedStorage} for data operations and {@link - * DistributedStorageAdmin} for administrative operations such as schema management. - * - *

    This class is typically used when interacting with ScalarDB in a non-transactional, - * storage-only configuration. - */ -public class ScalarDbStorageManager { - - @Nullable private final DistributedStorage storage; - private final DistributedStorageAdmin storageAdmin; - - /** - * Constructs a {@code ScalarDbStorageManager} using the provided {@link StorageFactory}. - * - * @param storageFactory the factory used to create the ScalarDB storage and admin instances - */ - public ScalarDbStorageManager(StorageFactory storageFactory) { - storage = storageFactory.getStorage(); - storageAdmin = storageFactory.getStorageAdmin(); - } - - /** - * Returns distributed storage for ScalarDB connection that is running in storage mode - * - * @return distributed storage object - */ - public DistributedStorage getDistributedStorage() { - return storage; - } - - /** - * Returns distributed storage admin for ScalarDB admin operations - * - * @return distributed storage admin object - */ - public DistributedStorageAdmin getDistributedStorageAdmin() { - return storageAdmin; - } -} diff --git a/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataimport/dao/ScalarDbTransactionManager.java b/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataimport/dao/ScalarDbTransactionManager.java deleted file mode 100644 index e3b6f1bb85..0000000000 --- a/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataimport/dao/ScalarDbTransactionManager.java +++ /dev/null @@ -1,35 +0,0 @@ -package com.scalar.db.dataloader.core.dataimport.dao; - -import com.scalar.db.api.DistributedTransactionManager; -import com.scalar.db.service.TransactionFactory; - -/** - * A manager class for handling ScalarDB operations in transaction mode. - * - *

    Provides access to {@link DistributedTransactionManager} for data operations - * - *

    This class is typically used when interacting with ScalarDB in a transactional configuration. - */ -public class ScalarDbTransactionManager { - - private final DistributedTransactionManager transactionManager; - - /** - * Constructs a {@code ScalarDbTransactionManager} using the provided {@link TransactionFactory}. - * - * @param transactionFactory the factory used to create the ScalarDB storage and admin instances - */ - public ScalarDbTransactionManager(TransactionFactory transactionFactory) { - transactionManager = transactionFactory.getTransactionManager(); - } - - /** - * Returns distributed Transaction manager for ScalarDB connection that is running in transaction - * mode - * - * @return distributed transaction manager object - */ - public DistributedTransactionManager getDistributedTransactionManager() { - return transactionManager; - } -} diff --git a/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataimport/task/ImportStorageTask.java b/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataimport/task/ImportStorageTask.java index 824c475371..378cc0f3f4 100644 --- a/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataimport/task/ImportStorageTask.java +++ b/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataimport/task/ImportStorageTask.java @@ -1,6 +1,5 @@ package com.scalar.db.dataloader.core.dataimport.task; -import com.scalar.db.api.DistributedStorage; import com.scalar.db.api.DistributedTransactionManager; import com.scalar.db.api.Result; import com.scalar.db.dataloader.core.dataimport.dao.ScalarDbDaoException; @@ -10,33 +9,34 @@ import java.util.Optional; /** - * An import task that interacts with a {@link DistributedStorage} for data retrieval and manager - * operations. + * An import task that performs data operations using a {@link DistributedTransactionManager}. * - *

    This class extends {@link ImportTask} and provides concrete implementations for fetching and - * storing records using a {@link DistributedStorage} instance. It acts as a bridge between the - * import process and the underlying distributed manager system. + *

    This class extends {@link ImportTask} and provides concrete implementations for retrieving and + * storing records using a {@link DistributedTransactionManager} instance. It serves as a bridge + * between the import process and the underlying transactional data management layer in ScalarDB. * - *

    The task handles both read and write operations: + *

    The task supports both read and write operations: * *

      - *
    • Reading existing records using partition and clustering keys - *
    • Storing new or updated records with their associated columns + *
    • Retrieving existing records using partition and optional clustering keys. + *
    • Inserting or updating records with their associated column values. *
    * - *

    All manager operations are performed through the provided {@link DistributedStorage} instance, - * which must be properly initialized before creating this task. + *

    All data operations are delegated to the DAO configured within {@link ImportTaskParams}, + * executed through the provided {@link DistributedTransactionManager}. The manager must be properly + * initialized and connected before creating an instance of this class. */ public class ImportStorageTask extends ImportTask { private final DistributedTransactionManager manager; /** - * Constructs an {@code ImportStorageTask} with the specified parameters and manager. + * Constructs an {@code ImportStorageTask} with the specified parameters and transaction manager. * * @param params the import task parameters containing configuration and DAO objects - * @param manager the distributed manager instance to be used for data operations - * @throws NullPointerException if either params or manager is null + * @param manager the {@link DistributedTransactionManager} instance used for transactional data + * operations + * @throws NullPointerException if either {@code params} or {@code manager} is {@code null} */ public ImportStorageTask(ImportTaskParams params, DistributedTransactionManager manager) { super(params); @@ -44,20 +44,21 @@ public ImportStorageTask(ImportTaskParams params, DistributedTransactionManager } /** - * Retrieves a data record from the distributed manager using the specified keys. + * Retrieves a data record from the database using the specified keys. * - *

    This method attempts to fetch a single record from the specified table using both partition - * and clustering keys. The operation is performed through the configured DAO using the associated - * manager instance. + *

    This method attempts to fetch a single record from the given table using the provided + * partition and optional clustering keys. The retrieval is executed through the DAO configured in + * {@link ImportTaskParams}, utilizing the associated {@link DistributedTransactionManager}. * - * @param namespace the namespace of the table to query - * @param tableName the name of the table to query + * @param namespace the ScalarDB namespace of the target table + * @param tableName the name of the target table * @param partitionKey the partition key identifying the record's partition - * @param clusteringKey the clustering key for further record identification within the partition - * @return an {@link Optional} containing the {@link Result} if the record exists, otherwise an - * empty {@link Optional} - * @throws ScalarDbDaoException if an error occurs during the retrieval operation, such as - * connection issues or invalid table/namespace + * @param clusteringKey the clustering key for uniquely identifying the record within the + * partition + * @return an {@link Optional} containing the {@link Result} if a record exists, or an empty + * {@link Optional} if no matching record is found + * @throws ScalarDbDaoException if an error occurs during the retrieval operation, such as a + * connection failure or invalid table/namespace */ @Override protected Optional getDataRecord( @@ -67,17 +68,18 @@ protected Optional getDataRecord( } /** - * Saves a record into the distributed manager with the specified keys and columns. + * Saves or updates a record in the database using the specified keys and column values. * *

    This method writes or updates a record in the specified table using the provided keys and - * column values. The operation is performed through the configured DAO using the associated - * manager instance. + * columns. The operation is performed through the DAO configured in {@link ImportTaskParams}, + * utilizing the associated {@link DistributedTransactionManager}. * - * @param namespace the namespace of the target table + * @param namespace the ScalarDB namespace of the target table * @param tableName the name of the target table * @param partitionKey the partition key determining where the record will be stored - * @param clusteringKey the clustering key for organizing records within the partition - * @param columns the list of columns containing the record's data to be saved + * @param clusteringKey the clustering key for uniquely identifying the record within the + * partition + * @param columns the list of columns containing the data to be stored * @throws ScalarDbDaoException if an error occurs during the save operation, such as connection * issues, invalid data types, or constraint violations */ From 1f0644e467b984b84d5d132bf8d77e91fc952a2e Mon Sep 17 00:00:00 2001 From: Jishnu J Date: Wed, 12 Nov 2025 18:09:16 +0530 Subject: [PATCH 10/17] Add changes based on feedback --- .../cli/command/dataexport/ExportCommand.java | 11 +++++++---- 1 file changed, 7 insertions(+), 4 deletions(-) diff --git a/data-loader/cli/src/main/java/com/scalar/db/dataloader/cli/command/dataexport/ExportCommand.java b/data-loader/cli/src/main/java/com/scalar/db/dataloader/cli/command/dataexport/ExportCommand.java index e6c28ae2f0..6967514b40 100755 --- a/data-loader/cli/src/main/java/com/scalar/db/dataloader/cli/command/dataexport/ExportCommand.java +++ b/data-loader/cli/src/main/java/com/scalar/db/dataloader/cli/command/dataexport/ExportCommand.java @@ -64,15 +64,18 @@ public Integer call() throws Exception { validatePositiveValue(spec.commandLine(), maxThreads, 
DataLoaderError.INVALID_MAX_THREADS); TransactionFactory transactionFactory = TransactionFactory.create(scalarDbPropertiesFilePath); - TableMetadataService metaDataService = - new TableMetadataService(transactionFactory.getTransactionAdmin()); + TableMetadata tableMetadata; + try (com.scalar.db.api.DistributedTransactionAdmin admin = + transactionFactory.getTransactionAdmin()) { + TableMetadataService metaDataService = new TableMetadataService(admin); + tableMetadata = metaDataService.getTableMetadata(namespace, table); + } + ScalarDbDao scalarDbDao = new ScalarDbDao(); ExportManager exportManager = createExportManager(transactionFactory, scalarDbDao, outputFormat); - TableMetadata tableMetadata = metaDataService.getTableMetadata(namespace, table); - Key partitionKey = partitionKeyValue != null ? getKeysFromList(partitionKeyValue, tableMetadata) : null; Key scanStartKey = From 326f7a7afd223180e40f8a228bf3cdab8ba5b238 Mon Sep 17 00:00:00 2001 From: Jishnu J Date: Wed, 12 Nov 2025 18:15:35 +0530 Subject: [PATCH 11/17] Added changes based on feedback --- .../dataimport/processor/ImportProcessorParams.java | 4 ---- .../dataloader/core/dataimport/ImportManagerTest.java | 10 ++++------ .../core/dataimport/processor/ImportProcessorTest.java | 4 ---- 3 files changed, 4 insertions(+), 14 deletions(-) diff --git a/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataimport/processor/ImportProcessorParams.java b/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataimport/processor/ImportProcessorParams.java index 2d85325044..49448b6c4b 100644 --- a/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataimport/processor/ImportProcessorParams.java +++ b/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataimport/processor/ImportProcessorParams.java @@ -1,6 +1,5 @@ package com.scalar.db.dataloader.core.dataimport.processor; -import com.scalar.db.api.DistributedStorage; import com.scalar.db.api.DistributedTransactionManager; import 
com.scalar.db.api.TableMetadata; import com.scalar.db.dataloader.core.ScalarDbMode; @@ -36,9 +35,6 @@ public class ImportProcessorParams { /** Data Access Object for ScalarDB operations. */ ScalarDbDao dao; - /** Storage interface for non-transactional operations. */ - DistributedStorage distributedStorage; - /** Transaction manager for handling transactional operations. */ DistributedTransactionManager distributedTransactionManager; } diff --git a/data-loader/core/src/test/java/com/scalar/db/dataloader/core/dataimport/ImportManagerTest.java b/data-loader/core/src/test/java/com/scalar/db/dataloader/core/dataimport/ImportManagerTest.java index bf348c5321..86294eff1d 100644 --- a/data-loader/core/src/test/java/com/scalar/db/dataloader/core/dataimport/ImportManagerTest.java +++ b/data-loader/core/src/test/java/com/scalar/db/dataloader/core/dataimport/ImportManagerTest.java @@ -45,8 +45,7 @@ void setUp() { options, processorFactory, ScalarDbMode.STORAGE, - distributedStorage, - null); // Only one resource present + distributedTransactionManager); importManager.addListener(listener1); importManager.addListener(listener2); } @@ -57,7 +56,7 @@ void onAllDataChunksCompleted_shouldNotifyListenersAndCloseStorage() { verify(listener1).onAllDataChunksCompleted(); verify(listener2).onAllDataChunksCompleted(); - verify(distributedStorage).close(); + verify(distributedTransactionManager).close(); } @Test @@ -69,7 +68,7 @@ void onAllDataChunksCompleted_shouldAggregateListenerExceptionAndStillCloseResou assertTrue(thrown.getMessage().contains("Error during completion")); assertEquals("Listener1 failed", thrown.getCause().getMessage()); - verify(distributedStorage).close(); + verify(distributedTransactionManager).close(); } @Test @@ -81,7 +80,6 @@ void closeResources_shouldCloseTransactionManagerIfStorageIsNull() { mock(ImportOptions.class), mock(ImportProcessorFactory.class), ScalarDbMode.TRANSACTION, - null, distributedTransactionManager); managerWithTx.closeResources(); @@ -90,7 
+88,7 @@ void closeResources_shouldCloseTransactionManagerIfStorageIsNull() { @Test void closeResources_shouldThrowIfResourceCloseFails() { - doThrow(new RuntimeException("Close failed")).when(distributedStorage).close(); + doThrow(new RuntimeException("Close failed")).when(distributedTransactionManager).close(); RuntimeException ex = assertThrows(RuntimeException.class, () -> importManager.closeResources()); diff --git a/data-loader/core/src/test/java/com/scalar/db/dataloader/core/dataimport/processor/ImportProcessorTest.java b/data-loader/core/src/test/java/com/scalar/db/dataloader/core/dataimport/processor/ImportProcessorTest.java index d60ebecb00..63370131ab 100644 --- a/data-loader/core/src/test/java/com/scalar/db/dataloader/core/dataimport/processor/ImportProcessorTest.java +++ b/data-loader/core/src/test/java/com/scalar/db/dataloader/core/dataimport/processor/ImportProcessorTest.java @@ -87,7 +87,6 @@ void process_withStorageMode_shouldProcessAllDataChunks() { BufferedReader reader = new BufferedReader(new StringReader("test data")); when(params.getScalarDbMode()).thenReturn(ScalarDbMode.STORAGE); when(params.getDao()).thenReturn(dao); - when(params.getDistributedStorage()).thenReturn(distributedStorage); when(params.getTableColumnDataTypes()).thenReturn(tableColumnDataTypes); TestImportProcessor processor = new TestImportProcessor(params); @@ -150,7 +149,6 @@ void process_withMultipleDataChunks_shouldUseThreadPool() { final int maxThreads = 4; when(importOptions.getMaxThreads()).thenReturn(maxThreads); when(params.getDao()).thenReturn(dao); - when(params.getDistributedStorage()).thenReturn(distributedStorage); when(params.getTableColumnDataTypes()).thenReturn(tableColumnDataTypes); when(params.getTableMetadataByTableName()).thenReturn(tableMetadataByTableName); @@ -205,7 +203,6 @@ void process_withLargeNumberOfTasks_shouldWaitForAllTasksToComplete() { final int maxThreads = 2; when(importOptions.getMaxThreads()).thenReturn(maxThreads); 
when(params.getDao()).thenReturn(dao); - when(params.getDistributedStorage()).thenReturn(distributedStorage); when(params.getTableColumnDataTypes()).thenReturn(tableColumnDataTypes); when(params.getTableMetadataByTableName()).thenReturn(tableMetadataByTableName); @@ -235,7 +232,6 @@ void process_withShutdown_shouldShutdownExecutorsGracefully() { // Arrange when(params.getScalarDbMode()).thenReturn(ScalarDbMode.STORAGE); when(params.getDao()).thenReturn(dao); - when(params.getDistributedStorage()).thenReturn(distributedStorage); when(params.getTableColumnDataTypes()).thenReturn(tableColumnDataTypes); when(params.getTableMetadataByTableName()).thenReturn(tableMetadataByTableName); From e8f9feb3c1991b6168ce1633df0cc864e8286d98 Mon Sep 17 00:00:00 2001 From: Jishnu J Date: Thu, 13 Nov 2025 09:25:36 +0530 Subject: [PATCH 12/17] Removed unused params --- .../db/dataloader/core/dataimport/ImportManagerTest.java | 3 --- .../core/dataimport/processor/ImportProcessorTest.java | 2 -- 2 files changed, 5 deletions(-) diff --git a/data-loader/core/src/test/java/com/scalar/db/dataloader/core/dataimport/ImportManagerTest.java b/data-loader/core/src/test/java/com/scalar/db/dataloader/core/dataimport/ImportManagerTest.java index 86294eff1d..56e3517ce0 100644 --- a/data-loader/core/src/test/java/com/scalar/db/dataloader/core/dataimport/ImportManagerTest.java +++ b/data-loader/core/src/test/java/com/scalar/db/dataloader/core/dataimport/ImportManagerTest.java @@ -7,7 +7,6 @@ import static org.mockito.Mockito.mock; import static org.mockito.Mockito.verify; -import com.scalar.db.api.DistributedStorage; import com.scalar.db.api.DistributedTransactionManager; import com.scalar.db.api.TableMetadata; import com.scalar.db.dataloader.core.ScalarDbMode; @@ -23,7 +22,6 @@ public class ImportManagerTest { private ImportManager importManager; private ImportEventListener listener1; private ImportEventListener listener2; - private DistributedStorage distributedStorage; private 
DistributedTransactionManager distributedTransactionManager; @BeforeEach @@ -35,7 +33,6 @@ void setUp() { listener1 = mock(ImportEventListener.class); listener2 = mock(ImportEventListener.class); - distributedStorage = mock(DistributedStorage.class); distributedTransactionManager = mock(DistributedTransactionManager.class); importManager = diff --git a/data-loader/core/src/test/java/com/scalar/db/dataloader/core/dataimport/processor/ImportProcessorTest.java b/data-loader/core/src/test/java/com/scalar/db/dataloader/core/dataimport/processor/ImportProcessorTest.java index 63370131ab..24c272a45d 100644 --- a/data-loader/core/src/test/java/com/scalar/db/dataloader/core/dataimport/processor/ImportProcessorTest.java +++ b/data-loader/core/src/test/java/com/scalar/db/dataloader/core/dataimport/processor/ImportProcessorTest.java @@ -10,7 +10,6 @@ import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.ObjectMapper; -import com.scalar.db.api.DistributedStorage; import com.scalar.db.api.DistributedTransaction; import com.scalar.db.api.DistributedTransactionManager; import com.scalar.db.api.TableMetadata; @@ -62,7 +61,6 @@ class ImportProcessorTest { @Mock private ImportProcessorParams params; @Mock private ImportOptions importOptions; @Mock private ScalarDbDao dao; - @Mock private DistributedStorage distributedStorage; @Mock private DistributedTransactionManager distributedTransactionManager; @Mock private DistributedTransaction distributedTransaction; @Mock private TableColumnDataTypes tableColumnDataTypes; From 98e84bbde0ee0e5b67f5e31cb3951787705de1c6 Mon Sep 17 00:00:00 2001 From: Jishnu J Date: Thu, 13 Nov 2025 09:33:38 +0530 Subject: [PATCH 13/17] correction --- .../db/dataloader/cli/command/dataexport/ExportCommand.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/data-loader/cli/src/main/java/com/scalar/db/dataloader/cli/command/dataexport/ExportCommand.java 
b/data-loader/cli/src/main/java/com/scalar/db/dataloader/cli/command/dataexport/ExportCommand.java index 6967514b40..ae2b80c77b 100755 --- a/data-loader/cli/src/main/java/com/scalar/db/dataloader/cli/command/dataexport/ExportCommand.java +++ b/data-loader/cli/src/main/java/com/scalar/db/dataloader/cli/command/dataexport/ExportCommand.java @@ -5,6 +5,7 @@ import static java.nio.file.StandardOpenOption.APPEND; import static java.nio.file.StandardOpenOption.CREATE; +import com.scalar.db.api.DistributedTransactionAdmin; import com.scalar.db.api.DistributedTransactionManager; import com.scalar.db.api.TableMetadata; import com.scalar.db.dataloader.cli.exception.DirectoryValidationException; @@ -65,8 +66,7 @@ public Integer call() throws Exception { TransactionFactory transactionFactory = TransactionFactory.create(scalarDbPropertiesFilePath); TableMetadata tableMetadata; - try (com.scalar.db.api.DistributedTransactionAdmin admin = - transactionFactory.getTransactionAdmin()) { + try (DistributedTransactionAdmin admin = transactionFactory.getTransactionAdmin()) { TableMetadataService metaDataService = new TableMetadataService(admin); tableMetadata = metaDataService.getTableMetadata(namespace, table); } From 671f95a611ccbf0e20132aeee09e1daff3b796e0 Mon Sep 17 00:00:00 2001 From: Jishnu J Date: Fri, 14 Nov 2025 08:58:55 +0530 Subject: [PATCH 14/17] Removed unused commit --- .../db/dataloader/cli/command/dataexport/ExportCommand.java | 4 ---- 1 file changed, 4 deletions(-) diff --git a/data-loader/cli/src/main/java/com/scalar/db/dataloader/cli/command/dataexport/ExportCommand.java b/data-loader/cli/src/main/java/com/scalar/db/dataloader/cli/command/dataexport/ExportCommand.java index 1d8ff440b8..61bf353ada 100755 --- a/data-loader/cli/src/main/java/com/scalar/db/dataloader/cli/command/dataexport/ExportCommand.java +++ b/data-loader/cli/src/main/java/com/scalar/db/dataloader/cli/command/dataexport/ExportCommand.java @@ -32,10 +32,6 @@ import 
com.scalar.db.dataloader.core.tablemetadata.TableMetadataService; import com.scalar.db.dataloader.core.util.KeyUtils; import com.scalar.db.io.Key; -<<<<<<< HEAD -======= -import com.scalar.db.service.StorageFactory; ->>>>>>> master import com.scalar.db.service.TransactionFactory; import java.io.BufferedWriter; import java.nio.charset.Charset; From 6418f5ccd49bbb9bfb7c024cdcf4e75e1b8254ba Mon Sep 17 00:00:00 2001 From: Jishnu J Date: Fri, 14 Nov 2025 09:00:02 +0530 Subject: [PATCH 15/17] Fixed merge import issues --- .../db/dataloader/cli/command/dataexport/ExportCommand.java | 3 --- 1 file changed, 3 deletions(-) diff --git a/data-loader/cli/src/main/java/com/scalar/db/dataloader/cli/command/dataexport/ExportCommand.java b/data-loader/cli/src/main/java/com/scalar/db/dataloader/cli/command/dataexport/ExportCommand.java index 61bf353ada..1a72319fc3 100755 --- a/data-loader/cli/src/main/java/com/scalar/db/dataloader/cli/command/dataexport/ExportCommand.java +++ b/data-loader/cli/src/main/java/com/scalar/db/dataloader/cli/command/dataexport/ExportCommand.java @@ -5,10 +5,7 @@ import static java.nio.file.StandardOpenOption.APPEND; import static java.nio.file.StandardOpenOption.CREATE; -<<<<<<< HEAD import com.scalar.db.api.DistributedTransactionAdmin; -======= ->>>>>>> master import com.scalar.db.api.DistributedTransactionManager; import com.scalar.db.api.TableMetadata; import com.scalar.db.dataloader.cli.exception.DirectoryValidationException; From 96ba8f01f5b50fb4e318be6b5097fd26024430e6 Mon Sep 17 00:00:00 2001 From: Peckstadt Yves Date: Fri, 14 Nov 2025 14:32:43 +0900 Subject: [PATCH 16/17] Deprecate `include-metadata` option for data loader export (#3159) --- .../cli/command/dataexport/ExportCommand.java | 27 +++++- .../dataexport/ExportCommandOptions.java | 15 +++- .../command/dataexport/ExportCommandTest.java | 87 ++++++++++++++++++- .../core/dataexport/CsvExportManager.java | 24 +---- .../core/dataexport/ExportManager.java | 18 ---- 
.../core/dataexport/ExportOptions.java | 1 - .../dataexport/producer/CsvProducerTask.java | 11 +-- .../producer/JsonLineProducerTask.java | 11 +-- .../dataexport/producer/JsonProducerTask.java | 11 +-- .../dataexport/producer/ProducerTask.java | 9 -- .../producer/ProducerTaskFactory.java | 13 +-- .../core/util/TableMetadataUtil.java | 38 -------- .../core/dataexport/CsvExportManagerTest.java | 11 +-- .../dataexport/JsonExportManagerTest.java | 2 +- .../dataexport/JsonLineExportManagerTest.java | 2 +- .../producer/CsvProducerTaskTest.java | 18 ++-- .../producer/JsonLineProducerTaskTest.java | 20 ++--- .../producer/JsonProducerTaskTest.java | 19 ++-- .../producer/ProducerTaskFactoryTest.java | 6 +- 19 files changed, 168 insertions(+), 175 deletions(-) diff --git a/data-loader/cli/src/main/java/com/scalar/db/dataloader/cli/command/dataexport/ExportCommand.java b/data-loader/cli/src/main/java/com/scalar/db/dataloader/cli/command/dataexport/ExportCommand.java index 1a72319fc3..8331c45f87 100755 --- a/data-loader/cli/src/main/java/com/scalar/db/dataloader/cli/command/dataexport/ExportCommand.java +++ b/data-loader/cli/src/main/java/com/scalar/db/dataloader/cli/command/dataexport/ExportCommand.java @@ -55,6 +55,7 @@ public class ExportCommand extends ExportCommandOptions implements Callable