From 627fe69b82840d5d7fc1bf2089724f51fdb1d655 Mon Sep 17 00:00:00 2001 From: Jishnu J Date: Mon, 10 Nov 2025 15:21:08 +0530 Subject: [PATCH 1/5] initial commit --- .../dataloader/cli/command/dataexport/ExportCommand.java | 8 ++++++-- .../db/dataloader/core/dataexport/CsvExportManager.java | 7 ++++--- .../db/dataloader/core/dataexport/ExportManager.java | 3 ++- .../db/dataloader/core/dataexport/JsonExportManager.java | 9 +++++---- .../core/dataexport/JsonLineExportManager.java | 9 +++++---- 5 files changed, 22 insertions(+), 14 deletions(-) diff --git a/data-loader/cli/src/main/java/com/scalar/db/dataloader/cli/command/dataexport/ExportCommand.java b/data-loader/cli/src/main/java/com/scalar/db/dataloader/cli/command/dataexport/ExportCommand.java index 712a57c12c..adf54105ae 100755 --- a/data-loader/cli/src/main/java/com/scalar/db/dataloader/cli/command/dataexport/ExportCommand.java +++ b/data-loader/cli/src/main/java/com/scalar/db/dataloader/cli/command/dataexport/ExportCommand.java @@ -6,6 +6,7 @@ import static java.nio.file.StandardOpenOption.CREATE; import com.scalar.db.api.DistributedStorage; +import com.scalar.db.api.DistributedTransactionManager; import com.scalar.db.api.TableMetadata; import com.scalar.db.dataloader.cli.exception.DirectoryValidationException; import com.scalar.db.dataloader.cli.util.DirectoryUtils; @@ -36,6 +37,8 @@ import java.util.List; import java.util.Objects; import java.util.concurrent.Callable; + +import com.scalar.db.service.TransactionFactory; import org.apache.commons.lang3.StringUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -64,6 +67,7 @@ public Integer call() throws Exception { validatePositiveValue(spec.commandLine(), maxThreads, DataLoaderError.INVALID_MAX_THREADS); StorageFactory storageFactory = StorageFactory.create(scalarDbPropertiesFilePath); + TransactionFactory transactionFactory = TransactionFactory.create(scalarDbPropertiesFilePath) TableMetadataService metaDataService = new TableMetadataService(storageFactory.getStorageAdmin()); ScalarDbDao scalarDbDao = new ScalarDbDao(); @@ -146,10 +150,10 @@ private void validateOutputDirectory() throws DirectoryValidationException { } private ExportManager createExportManager( - StorageFactory storageFactory, ScalarDbDao scalarDbDao, FileFormat fileFormat) { + TransactionFactory transactionFactory, ScalarDbDao scalarDbDao, FileFormat fileFormat) { ProducerTaskFactory taskFactory = new ProducerTaskFactory(delimiter, includeTransactionMetadata, prettyPrintJson); - DistributedStorage storage = storageFactory.getStorage(); + DistributedTransactionManager manager = transactionFactory.getTransactionManager(); switch (fileFormat) { case JSON: return new JsonExportManager(storage, scalarDbDao, taskFactory); diff --git a/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataexport/CsvExportManager.java b/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataexport/CsvExportManager.java index bccf691adc..3da65da552 100644 --- a/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataexport/CsvExportManager.java +++ b/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataexport/CsvExportManager.java @@ -1,6 +1,7 @@ package com.scalar.db.dataloader.core.dataexport; import com.scalar.db.api.DistributedStorage; +import com.scalar.db.api.DistributedTransactionManager; import com.scalar.db.api.TableMetadata; import com.scalar.db.dataloader.core.dataexport.producer.ProducerTaskFactory; import com.scalar.db.dataloader.core.dataimport.dao.ScalarDbDao; @@ -18,13 +19,13 @@ public class CsvExportManager extends ExportManager { * Constructs a {@code CsvExportManager} with the specified {@link DistributedStorage}, {@link * ScalarDbDao}, and {@link ProducerTaskFactory}. * - * @param storage the {@code DistributedStorage} instance used to read data from the database + * @param manager the {@code DistributedTransactionManager} instance used to read data from the database * @param dao the {@code ScalarDbDao} used to execute export-related database operations * @param producerTaskFactory the factory used to create producer tasks for exporting data */ public CsvExportManager( - DistributedStorage storage, ScalarDbDao dao, ProducerTaskFactory producerTaskFactory) { - super(storage, dao, producerTaskFactory); + DistributedTransactionManager manager, ScalarDbDao dao, ProducerTaskFactory producerTaskFactory) { + super(manager, dao, producerTaskFactory); } /** diff --git a/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataexport/ExportManager.java b/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataexport/ExportManager.java index fdc27d664c..8278c7b877 100644 --- a/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataexport/ExportManager.java +++ b/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataexport/ExportManager.java @@ -1,6 +1,7 @@ package com.scalar.db.dataloader.core.dataexport; import com.scalar.db.api.DistributedStorage; +import com.scalar.db.api.DistributedTransactionManager; import com.scalar.db.api.Result; import com.scalar.db.api.Scanner; import com.scalar.db.api.TableMetadata; @@ -34,7 +35,7 @@ public abstract class ExportManager { private static final Logger logger = LoggerFactory.getLogger(ExportManager.class); - private final DistributedStorage storage; + private final DistributedTransactionManager distributedTransactionManager; private final ScalarDbDao dao; private final ProducerTaskFactory producerTaskFactory; private final Object lock = new Object(); diff --git a/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataexport/JsonExportManager.java b/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataexport/JsonExportManager.java index fadac644a2..26737e4ed6 100644 --- a/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataexport/JsonExportManager.java +++ b/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataexport/JsonExportManager.java @@ -1,6 +1,7 @@ package com.scalar.db.dataloader.core.dataexport; import com.scalar.db.api.DistributedStorage; +import com.scalar.db.api.DistributedTransactionManager; import com.scalar.db.api.TableMetadata; import com.scalar.db.dataloader.core.dataexport.producer.ProducerTaskFactory; import com.scalar.db.dataloader.core.dataimport.dao.ScalarDbDao; @@ -11,16 +12,16 @@ public class JsonExportManager extends ExportManager { /** - * Constructs a {@code JsonExportManager} with the specified {@link DistributedStorage}, {@link + * Constructs a {@code JsonExportManager} with the specified {@link DistributedTransactionManager}, {@link * ScalarDbDao}, and {@link ProducerTaskFactory}. * - * @param storage the {@code DistributedStorage} instance used to read data from the database + * @param manager the {@code DistributedTransactionManager} instance used to read data from the database * @param dao the {@code ScalarDbDao} used to execute export-related database operations * @param producerTaskFactory the factory used to create producer tasks for exporting data */ public JsonExportManager( - DistributedStorage storage, ScalarDbDao dao, ProducerTaskFactory producerTaskFactory) { - super(storage, dao, producerTaskFactory); + DistributedTransactionManager manager, ScalarDbDao dao, ProducerTaskFactory producerTaskFactory) { + super(manager, dao, producerTaskFactory); } /** diff --git a/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataexport/JsonLineExportManager.java b/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataexport/JsonLineExportManager.java index 2ce21deb7d..1c084df0c2 100644 --- a/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataexport/JsonLineExportManager.java +++ b/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataexport/JsonLineExportManager.java @@ -1,6 +1,7 @@ package com.scalar.db.dataloader.core.dataexport; import com.scalar.db.api.DistributedStorage; +import com.scalar.db.api.DistributedTransactionManager; import com.scalar.db.api.TableMetadata; import com.scalar.db.dataloader.core.dataexport.producer.ProducerTaskFactory; import com.scalar.db.dataloader.core.dataimport.dao.ScalarDbDao; @@ -13,16 +14,16 @@ public class JsonLineExportManager extends ExportManager { /** - * Constructs a {@code JsonLineExportManager} with the specified {@link DistributedStorage}, + * Constructs a {@code JsonLineExportManager} with the specified {@link DistributedTransactionManager}, * {@link ScalarDbDao}, and {@link ProducerTaskFactory}. * - * @param storage the {@code DistributedStorage} instance used to read data from the database + * @param manager the {@code DistributedTransactionManager} instance used to read data from the database * @param dao the {@code ScalarDbDao} used to execute export-related database operations * @param producerTaskFactory the factory used to create producer tasks for exporting data */ public JsonLineExportManager( - DistributedStorage storage, ScalarDbDao dao, ProducerTaskFactory producerTaskFactory) { - super(storage, dao, producerTaskFactory); + DistributedTransactionManager manager, ScalarDbDao dao, ProducerTaskFactory producerTaskFactory) { + super(manager, dao, producerTaskFactory); } /** From 32b4b701d5af20841c67ff15ef837e54066bc9e9 Mon Sep 17 00:00:00 2001 From: Jishnu J Date: Mon, 10 Nov 2025 17:18:02 +0530 Subject: [PATCH 2/5] Initial changes --- .../cli/command/dataexport/ExportCommand.java | 17 +++-- .../core/dataexport/CsvExportManager.java | 20 ++++-- .../core/dataexport/ExportManager.java | 14 ++-- .../core/dataexport/JsonExportManager.java | 20 ++++-- .../dataexport/JsonLineExportManager.java | 20 ++++-- .../core/dataimport/dao/ScalarDbDao.java | 72 ++++++++++++------- .../core/dataexport/CsvExportManagerTest.java | 29 ++++---- .../dataexport/JsonExportManagerTest.java | 22 +++--- .../dataexport/JsonLineExportManagerTest.java | 22 +++--- 9 files changed, 139 insertions(+), 97 deletions(-) diff --git a/data-loader/cli/src/main/java/com/scalar/db/dataloader/cli/command/dataexport/ExportCommand.java b/data-loader/cli/src/main/java/com/scalar/db/dataloader/cli/command/dataexport/ExportCommand.java index adf54105ae..e6605a504b 100755 --- a/data-loader/cli/src/main/java/com/scalar/db/dataloader/cli/command/dataexport/ExportCommand.java +++ b/data-loader/cli/src/main/java/com/scalar/db/dataloader/cli/command/dataexport/ExportCommand.java @@ -5,7 +5,6 @@ import static java.nio.file.StandardOpenOption.APPEND; import static java.nio.file.StandardOpenOption.CREATE; -import com.scalar.db.api.DistributedStorage; import com.scalar.db.api.DistributedTransactionManager; import com.scalar.db.api.TableMetadata; import com.scalar.db.dataloader.cli.exception.DirectoryValidationException; @@ -30,6 +29,7 @@ import com.scalar.db.dataloader.core.util.KeyUtils; import com.scalar.db.io.Key; import com.scalar.db.service.StorageFactory; +import com.scalar.db.service.TransactionFactory; import java.io.BufferedWriter; import java.nio.charset.Charset; import java.nio.file.Files; @@ -37,8 +37,6 @@ import java.util.List; import java.util.Objects; import java.util.concurrent.Callable; - -import com.scalar.db.service.TransactionFactory; import org.apache.commons.lang3.StringUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -67,12 +65,13 @@ public Integer call() throws Exception { validatePositiveValue(spec.commandLine(), maxThreads, DataLoaderError.INVALID_MAX_THREADS); StorageFactory storageFactory = StorageFactory.create(scalarDbPropertiesFilePath); - TransactionFactory transactionFactory = TransactionFactory.create(scalarDbPropertiesFilePath) + TransactionFactory transactionFactory = TransactionFactory.create(scalarDbPropertiesFilePath); TableMetadataService metaDataService = new TableMetadataService(storageFactory.getStorageAdmin()); ScalarDbDao scalarDbDao = new ScalarDbDao(); - ExportManager exportManager = createExportManager(storageFactory, scalarDbDao, outputFormat); + ExportManager exportManager = + createExportManager(transactionFactory, scalarDbDao, outputFormat); TableMetadata tableMetadata = metaDataService.getTableMetadata(namespace, table); @@ -153,14 +152,14 @@ private ExportManager createExportManager( TransactionFactory transactionFactory, ScalarDbDao scalarDbDao, FileFormat fileFormat) { ProducerTaskFactory taskFactory = new ProducerTaskFactory(delimiter, includeTransactionMetadata, prettyPrintJson); - DistributedTransactionManager manager = transactionFactory.getTransactionManager(); + DistributedTransactionManager manager = transactionFactory.getTransactionManager(); switch (fileFormat) { case JSON: - return new JsonExportManager(storage, scalarDbDao, taskFactory); + return new JsonExportManager(manager, scalarDbDao, taskFactory); case JSONL: - return new JsonLineExportManager(storage, scalarDbDao, taskFactory); + return new JsonLineExportManager(manager, scalarDbDao, taskFactory); case CSV: - return new CsvExportManager(storage, scalarDbDao, taskFactory); + return new CsvExportManager(manager, scalarDbDao, taskFactory); default: throw new AssertionError("Invalid file format" + fileFormat); } diff --git a/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataexport/CsvExportManager.java b/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataexport/CsvExportManager.java index 3da65da552..9589e405ac 100644 --- a/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataexport/CsvExportManager.java +++ b/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataexport/CsvExportManager.java @@ -1,6 +1,5 @@ package com.scalar.db.dataloader.core.dataexport; -import com.scalar.db.api.DistributedStorage; import com.scalar.db.api.DistributedTransactionManager; import com.scalar.db.api.TableMetadata; import com.scalar.db.dataloader.core.dataexport.producer.ProducerTaskFactory; @@ -16,15 +15,22 @@ public class CsvExportManager extends ExportManager { /** - * Constructs a {@code CsvExportManager} with the specified {@link DistributedStorage}, {@link - * ScalarDbDao}, and {@link ProducerTaskFactory}. + * Constructs a {@code CsvExportManager} for exporting data using a {@link + * DistributedTransactionManager}. * - * @param manager the {@code DistributedTransactionManager} instance used to read data from the database - * @param dao the {@code ScalarDbDao} used to execute export-related database operations - * @param producerTaskFactory the factory used to create producer tasks for exporting data + *

This constructor is used when exporting data in transactional mode, allowing data to be read + * from ScalarDB within a distributed transaction context. + * + * @param manager the {@link DistributedTransactionManager} used to read data in transactional + * mode + * @param dao the {@link ScalarDbDao} used to interact with ScalarDB for export operations + * @param producerTaskFactory the {@link ProducerTaskFactory} used to create producer tasks for + * exporting data */ public CsvExportManager( - DistributedTransactionManager manager, ScalarDbDao dao, ProducerTaskFactory producerTaskFactory) { + DistributedTransactionManager manager, + ScalarDbDao dao, + ProducerTaskFactory producerTaskFactory) { super(manager, dao, producerTaskFactory); } diff --git a/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataexport/ExportManager.java b/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataexport/ExportManager.java index 8278c7b877..67653762df 100644 --- a/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataexport/ExportManager.java +++ b/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataexport/ExportManager.java @@ -1,10 +1,10 @@ package com.scalar.db.dataloader.core.dataexport; -import com.scalar.db.api.DistributedStorage; import com.scalar.db.api.DistributedTransactionManager; import com.scalar.db.api.Result; import com.scalar.db.api.Scanner; import com.scalar.db.api.TableMetadata; +import com.scalar.db.api.TransactionManagerCrudOperable; import com.scalar.db.dataloader.core.FileFormat; import com.scalar.db.dataloader.core.dataexport.producer.ProducerTask; import com.scalar.db.dataloader.core.dataexport.producer.ProducerTaskFactory; @@ -87,7 +87,7 @@ public ExportReport startExport( BufferedWriter bufferedWriter = new BufferedWriter(writer); boolean isJson = exportOptions.getOutputFileFormat() == FileFormat.JSON; - try (Scanner scanner = createScanner(exportOptions, dao, storage)) { + try (Scanner scanner = createScanner(exportOptions, dao, distributedTransactionManager)) { Iterator iterator = scanner.iterator(); AtomicBoolean isFirstBatch = new AtomicBoolean(true); @@ -217,12 +217,12 @@ private void handleTransactionMetadata(ExportOptions exportOptions, TableMetadat * * @param exportOptions export options * @param dao ScalarDB dao object - * @param storage distributed storage object + * @param manager DistributedTransactionManager object * @return created scanner * @throws ScalarDbDaoException throws if any issue occurs in creating scanner object */ - private Scanner createScanner( - ExportOptions exportOptions, ScalarDbDao dao, DistributedStorage storage) + private TransactionManagerCrudOperable.Scanner createScanner( + ExportOptions exportOptions, ScalarDbDao dao, DistributedTransactionManager manager) throws ScalarDbDaoException { boolean isScanAll = exportOptions.getScanPartitionKey() == null; if (isScanAll) { @@ -231,7 +231,7 @@ private Scanner createScanner( exportOptions.getTableName(), exportOptions.getProjectionColumns(), exportOptions.getLimit(), - storage); + manager); } else { return dao.createScanner( exportOptions.getNamespace(), @@ -241,7 +241,7 @@ private Scanner createScanner( exportOptions.getSortOrders(), exportOptions.getProjectionColumns(), exportOptions.getLimit(), - storage); + manager); } } } diff --git a/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataexport/JsonExportManager.java b/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataexport/JsonExportManager.java index 26737e4ed6..4115db8904 100644 --- a/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataexport/JsonExportManager.java +++ b/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataexport/JsonExportManager.java @@ -1,6 +1,5 @@ package com.scalar.db.dataloader.core.dataexport; -import com.scalar.db.api.DistributedStorage; import com.scalar.db.api.DistributedTransactionManager; import com.scalar.db.api.TableMetadata; import com.scalar.db.dataloader.core.dataexport.producer.ProducerTaskFactory; @@ -12,15 +11,22 @@ public class JsonExportManager extends ExportManager { /** - * Constructs a {@code JsonExportManager} with the specified {@link DistributedTransactionManager}, {@link - * ScalarDbDao}, and {@link ProducerTaskFactory}. + * Constructs a {@code JsonExportManager} for exporting data using a {@link + * DistributedTransactionManager}. * - * @param manager the {@code DistributedTransactionManager} instance used to read data from the database - * @param dao the {@code ScalarDbDao} used to execute export-related database operations - * @param producerTaskFactory the factory used to create producer tasks for exporting data + *

This constructor is used when exporting data in transactional mode, allowing data to be read + * from ScalarDB within a distributed transaction context and exported in JSON format. + * + * @param manager the {@link DistributedTransactionManager} used to read data in transactional + * mode + * @param dao the {@link ScalarDbDao} used to interact with ScalarDB for export operations + * @param producerTaskFactory the {@link ProducerTaskFactory} used to create producer tasks for + * exporting data */ public JsonExportManager( - DistributedTransactionManager manager, ScalarDbDao dao, ProducerTaskFactory producerTaskFactory) { + DistributedTransactionManager manager, + ScalarDbDao dao, + ProducerTaskFactory producerTaskFactory) { super(manager, dao, producerTaskFactory); } diff --git a/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataexport/JsonLineExportManager.java b/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataexport/JsonLineExportManager.java index 1c084df0c2..27f853ca61 100644 --- a/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataexport/JsonLineExportManager.java +++ b/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataexport/JsonLineExportManager.java @@ -1,6 +1,5 @@ package com.scalar.db.dataloader.core.dataexport; -import com.scalar.db.api.DistributedStorage; import com.scalar.db.api.DistributedTransactionManager; import com.scalar.db.api.TableMetadata; import com.scalar.db.dataloader.core.dataexport.producer.ProducerTaskFactory; @@ -14,15 +13,22 @@ public class JsonLineExportManager extends ExportManager { /** - * Constructs a {@code JsonLineExportManager} with the specified {@link DistributedTransactionManager}, - * {@link ScalarDbDao}, and {@link ProducerTaskFactory}. + * Constructs a {@code JsonLineExportManager} for exporting data using a {@link + * DistributedTransactionManager}. * - * @param manager the {@code DistributedTransactionManager} instance used to read data from the database - * @param dao the {@code ScalarDbDao} used to execute export-related database operations - * @param producerTaskFactory the factory used to create producer tasks for exporting data + *

This constructor is used when exporting data in transactional mode, allowing data to be read + * from ScalarDB within a distributed transaction context and exported in JSON Lines format. + * + * @param manager the {@link DistributedTransactionManager} used to read data in transactional + * mode + * @param dao the {@link ScalarDbDao} used to interact with ScalarDB for export operations + * @param producerTaskFactory the {@link ProducerTaskFactory} used to create producer tasks for + * exporting data */ public JsonLineExportManager( - DistributedTransactionManager manager, ScalarDbDao dao, ProducerTaskFactory producerTaskFactory) { + DistributedTransactionManager manager, + ScalarDbDao dao, + ProducerTaskFactory producerTaskFactory) { super(manager, dao, producerTaskFactory); } diff --git a/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataimport/dao/ScalarDbDao.java b/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataimport/dao/ScalarDbDao.java index afd7b124af..5242acdeb3 100644 --- a/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataimport/dao/ScalarDbDao.java +++ b/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataimport/dao/ScalarDbDao.java @@ -2,6 +2,7 @@ import com.scalar.db.api.DistributedStorage; import com.scalar.db.api.DistributedTransaction; +import com.scalar.db.api.DistributedTransactionManager; import com.scalar.db.api.Get; import com.scalar.db.api.GetBuilder; import com.scalar.db.api.Put; @@ -10,6 +11,7 @@ import com.scalar.db.api.Scan; import com.scalar.db.api.ScanBuilder; import com.scalar.db.api.Scanner; +import com.scalar.db.api.TransactionManagerCrudOperable; import com.scalar.db.dataloader.core.DataLoaderError; import com.scalar.db.dataloader.core.ScanRange; import com.scalar.db.exception.storage.ExecutionException; @@ -219,47 +221,63 @@ public List scan( } /** - * Create a ScalarDB scanner instance + * Creates a {@link TransactionManagerCrudOperable.Scanner} instance for reading data from + * ScalarDB. * - * @param namespace ScalarDB namespace - * @param table ScalarDB table name - * @param projectionColumns List of column projection to use during scan - * @param limit Scan limit value - * @param storage Distributed storage for ScalarDB connection that is running in storage mode - * @return ScalarDB Scanner object - * @throws ScalarDbDaoException if scan fails + *

This method builds and executes a {@link Scan} operation for the specified table and returns + * a scanner that iterates over the retrieved records. It operates in storage mode using a {@link + * DistributedTransactionManager}. + * + * @param namespace the ScalarDB namespace to scan + * @param table the ScalarDB table name + * @param projectionColumns the list of column names to include in the scan results + * @param limit the maximum number of records to retrieve + * @param manager the {@link DistributedTransactionManager} used to obtain the ScalarDB scanner in + * storage mode + * @return a {@link TransactionManagerCrudOperable.Scanner} instance for iterating over the scan + * results + * @throws ScalarDbDaoException if an error occurs while creating or executing the scan */ - public Scanner createScanner( + public TransactionManagerCrudOperable.Scanner createScanner( String namespace, String table, List projectionColumns, int limit, - DistributedStorage storage) + DistributedTransactionManager manager) throws ScalarDbDaoException { Scan scan = createScan(namespace, table, null, null, new ArrayList<>(), projectionColumns, limit); try { - return storage.scan(scan); - } catch (ExecutionException e) { + return manager.getScanner(scan); + } catch (CrudException e) { throw new ScalarDbDaoException(DataLoaderError.ERROR_SCAN.buildMessage(e.getMessage()), e); } } /** - * Create a ScalarDB scanner instance + * Creates a {@link TransactionManagerCrudOperable.Scanner} instance for reading data from + * ScalarDB. * - * @param namespace ScalarDB namespace - * @param table ScalarDB table name - * @param partitionKey Partition key used in ScalarDB scan - * @param scanRange Optional range to set ScalarDB scan start and end values - * @param sortOrders Optional scan clustering key sorting values - * @param projectionColumns List of column projection to use during scan - * @param limit Scan limit value - * @param storage Distributed storage for ScalarDB connection that is running in storage mode - * @return ScalarDB Scanner object - * @throws ScalarDbDaoException if scan fails + *

This method builds and executes a {@link Scan} operation using the provided parameters and + * returns a scanner that iterates over the matching records. It is used in storage mode through a + * {@link DistributedTransactionManager}. + * + * @param namespace the ScalarDB namespace to scan + * @param table the ScalarDB table name + * @param partitionKey the optional {@link Key} representing the partition key for the scan + * @param scanRange the optional {@link ScanRange} defining the start and end boundaries for the + * scan + * @param sortOrders the optional list of {@link Scan.Ordering} objects defining the clustering + * key sort order + * @param projectionColumns the optional list of column names to include in the scan results + * @param limit the maximum number of records to retrieve + * @param manager the {@link DistributedTransactionManager} used to obtain the ScalarDB scanner in + * storage mode + * @return a {@link TransactionManagerCrudOperable.Scanner} instance for iterating over scan + * results + * @throws ScalarDbDaoException if an error occurs while creating or executing the scan */ - public Scanner createScanner( + public TransactionManagerCrudOperable.Scanner createScanner( String namespace, String table, @Nullable Key partitionKey, @@ -267,13 +285,13 @@ public Scanner createScanner( @Nullable List sortOrders, @Nullable List projectionColumns, int limit, - DistributedStorage storage) + DistributedTransactionManager manager) throws ScalarDbDaoException { Scan scan = createScan(namespace, table, partitionKey, scanRange, sortOrders, projectionColumns, limit); try { - return storage.scan(scan); - } catch (ExecutionException e) { + return manager.getScanner(scan); + } catch (CrudException e) { throw new ScalarDbDaoException(DataLoaderError.ERROR_SCAN.buildMessage(e.getMessage()), e); } } diff --git a/data-loader/core/src/test/java/com/scalar/db/dataloader/core/dataexport/CsvExportManagerTest.java b/data-loader/core/src/test/java/com/scalar/db/dataloader/core/dataexport/CsvExportManagerTest.java index c61aa6f989..8edd940bc4 100644 --- a/data-loader/core/src/test/java/com/scalar/db/dataloader/core/dataexport/CsvExportManagerTest.java +++ b/data-loader/core/src/test/java/com/scalar/db/dataloader/core/dataexport/CsvExportManagerTest.java @@ -1,9 +1,9 @@ package com.scalar.db.dataloader.core.dataexport; -import com.scalar.db.api.DistributedStorage; +import com.scalar.db.api.DistributedTransactionManager; import com.scalar.db.api.Result; -import com.scalar.db.api.Scanner; import com.scalar.db.api.TableMetadata; +import com.scalar.db.api.TransactionManagerCrudOperable; import com.scalar.db.common.ResultImpl; import com.scalar.db.dataloader.core.FileFormat; import com.scalar.db.dataloader.core.ScanRange; @@ -33,14 +33,14 @@ public class CsvExportManagerTest { TableMetadata mockData; - DistributedStorage storage; + DistributedTransactionManager manager; @Spy ScalarDbDao dao; ProducerTaskFactory producerTaskFactory; ExportManager exportManager; @BeforeEach void setup() { - storage = Mockito.mock(DistributedStorage.class); + manager = Mockito.mock(DistributedTransactionManager.class); mockData = UnitTestUtils.createTestTableMetadata(); dao = Mockito.mock(ScalarDbDao.class); producerTaskFactory = new ProducerTaskFactory(null, false, true); @@ -49,8 +49,9 @@ void setup() { @Test void startExport_givenValidDataWithoutPartitionKey_shouldGenerateOutputFile() throws IOException, ScalarDbDaoException { - exportManager = new JsonLineExportManager(storage, dao, producerTaskFactory); - Scanner scanner = Mockito.mock(Scanner.class); + exportManager = new JsonLineExportManager(manager, dao, producerTaskFactory); + TransactionManagerCrudOperable.Scanner scanner = + Mockito.mock(TransactionManagerCrudOperable.Scanner.class); String filePath = Paths.get("").toAbsolutePath() + "/output.csv"; Map> values = UnitTestUtils.createTestValues(); Result result = new ResultImpl(values, mockData); @@ -67,7 +68,7 @@ void startExport_givenValidDataWithoutPartitionKey_shouldGenerateOutputFile() exportOptions.getTableName(), exportOptions.getProjectionColumns(), exportOptions.getLimit(), - storage)) + manager)) .thenReturn(scanner); Mockito.when(scanner.iterator()).thenReturn(results.iterator()); try (BufferedWriter writer = @@ -88,8 +89,9 @@ void startExport_givenValidDataWithoutPartitionKey_shouldGenerateOutputFile() void startExport_givenPartitionKey_shouldGenerateOutputFile() throws IOException, ScalarDbDaoException { producerTaskFactory = new ProducerTaskFactory(",", false, false); - exportManager = new CsvExportManager(storage, dao, producerTaskFactory); - Scanner scanner = Mockito.mock(Scanner.class); + exportManager = new CsvExportManager(manager, dao, producerTaskFactory); + TransactionManagerCrudOperable.Scanner scanner = + Mockito.mock(TransactionManagerCrudOperable.Scanner.class); String filePath = Paths.get("").toAbsolutePath() + "/output.csv"; Map> values = UnitTestUtils.createTestValues(); Result result = new ResultImpl(values, mockData); @@ -114,7 +116,7 @@ void startExport_givenPartitionKey_shouldGenerateOutputFile() exportOptions.getSortOrders(), exportOptions.getProjectionColumns(), exportOptions.getLimit(), - storage)) + manager)) .thenReturn(scanner); Mockito.when(scanner.iterator()).thenReturn(results.iterator()); try (BufferedWriter writer = @@ -150,8 +152,9 @@ private void runExportAndAssertFirstLine(boolean excludeHeader, String expectedF throws Exception { // Arrange producerTaskFactory = new ProducerTaskFactory(",", false, false); - exportManager = new CsvExportManager(storage, dao, producerTaskFactory); - Scanner scanner = Mockito.mock(Scanner.class); + exportManager = new CsvExportManager(manager, dao, producerTaskFactory); + TransactionManagerCrudOperable.Scanner scanner = + Mockito.mock(TransactionManagerCrudOperable.Scanner.class); String filePath = Paths.get("").toAbsolutePath() + "/output.csv"; Map> values = UnitTestUtils.createTestValues(); Result result = new ResultImpl(values, mockData); @@ -178,7 +181,7 @@ private void runExportAndAssertFirstLine(boolean excludeHeader, String expectedF exportOptions.getSortOrders(), exportOptions.getProjectionColumns(), exportOptions.getLimit(), - storage)) + manager)) .thenReturn(scanner); Mockito.when(scanner.iterator()).thenReturn(results.iterator()); try (BufferedWriter writer = diff --git a/data-loader/core/src/test/java/com/scalar/db/dataloader/core/dataexport/JsonExportManagerTest.java b/data-loader/core/src/test/java/com/scalar/db/dataloader/core/dataexport/JsonExportManagerTest.java index c1ef7ead1a..c37715180e 100644 --- a/data-loader/core/src/test/java/com/scalar/db/dataloader/core/dataexport/JsonExportManagerTest.java +++ b/data-loader/core/src/test/java/com/scalar/db/dataloader/core/dataexport/JsonExportManagerTest.java @@ -1,9 +1,9 @@ package com.scalar.db.dataloader.core.dataexport; -import com.scalar.db.api.DistributedStorage; +import com.scalar.db.api.DistributedTransactionManager; import com.scalar.db.api.Result; -import com.scalar.db.api.Scanner; import com.scalar.db.api.TableMetadata; +import com.scalar.db.api.TransactionManagerCrudOperable; import com.scalar.db.common.ResultImpl; import com.scalar.db.dataloader.core.FileFormat; import com.scalar.db.dataloader.core.ScanRange; @@ -33,14 +33,14 @@ public class JsonExportManagerTest { TableMetadata mockData; - DistributedStorage storage; + DistributedTransactionManager manager; @Spy ScalarDbDao dao; ProducerTaskFactory producerTaskFactory; ExportManager exportManager; @BeforeEach void setup() { - storage = Mockito.mock(DistributedStorage.class); + manager = Mockito.mock(DistributedTransactionManager.class); mockData = UnitTestUtils.createTestTableMetadata(); dao = Mockito.mock(ScalarDbDao.class); producerTaskFactory = new ProducerTaskFactory(null, false, true); @@ -49,8 +49,9 @@ void setup() { @Test void startExport_givenValidDataWithoutPartitionKey_shouldGenerateOutputFile() throws IOException, ScalarDbDaoException { - exportManager = new JsonExportManager(storage, dao, producerTaskFactory); - Scanner scanner = Mockito.mock(Scanner.class); + exportManager = new JsonExportManager(manager, dao, producerTaskFactory); + TransactionManagerCrudOperable.Scanner scanner = + Mockito.mock(TransactionManagerCrudOperable.Scanner.class); String filePath = Paths.get("").toAbsolutePath() + "/output.json"; Map> values = UnitTestUtils.createTestValues(); Result result = new ResultImpl(values, mockData); @@ -68,7 +69,7 @@ void startExport_givenValidDataWithoutPartitionKey_shouldGenerateOutputFile() exportOptions.getTableName(), exportOptions.getProjectionColumns(), exportOptions.getLimit(), - storage)) + manager)) .thenReturn(scanner); Mockito.when(scanner.iterator()).thenReturn(results.iterator()); try (BufferedWriter writer = @@ -88,8 +89,9 @@ void startExport_givenValidDataWithoutPartitionKey_shouldGenerateOutputFile() @Test void startExport_givenPartitionKey_shouldGenerateOutputFile() throws IOException, ScalarDbDaoException { - exportManager = new JsonExportManager(storage, dao, producerTaskFactory); - Scanner scanner = Mockito.mock(Scanner.class); + exportManager = new JsonExportManager(manager, dao, producerTaskFactory); + TransactionManagerCrudOperable.Scanner scanner = + Mockito.mock(TransactionManagerCrudOperable.Scanner.class); String filePath = Paths.get("").toAbsolutePath() + "/output.json"; Map> values = UnitTestUtils.createTestValues(); Result result = new ResultImpl(values, mockData); @@ -114,7 +116,7 @@ void startExport_givenPartitionKey_shouldGenerateOutputFile() exportOptions.getSortOrders(), exportOptions.getProjectionColumns(), exportOptions.getLimit(), - storage)) + manager)) .thenReturn(scanner); Mockito.when(scanner.iterator()).thenReturn(results.iterator()); try (BufferedWriter writer = diff --git a/data-loader/core/src/test/java/com/scalar/db/dataloader/core/dataexport/JsonLineExportManagerTest.java b/data-loader/core/src/test/java/com/scalar/db/dataloader/core/dataexport/JsonLineExportManagerTest.java index 31e4326a33..f2a6d1b360 100644 --- a/data-loader/core/src/test/java/com/scalar/db/dataloader/core/dataexport/JsonLineExportManagerTest.java +++ b/data-loader/core/src/test/java/com/scalar/db/dataloader/core/dataexport/JsonLineExportManagerTest.java @@ -1,9 +1,9 @@ package com.scalar.db.dataloader.core.dataexport; -import com.scalar.db.api.DistributedStorage; +import com.scalar.db.api.DistributedTransactionManager; import com.scalar.db.api.Result; -import com.scalar.db.api.Scanner; import com.scalar.db.api.TableMetadata; +import com.scalar.db.api.TransactionManagerCrudOperable; import com.scalar.db.common.ResultImpl; import com.scalar.db.dataloader.core.FileFormat; import com.scalar.db.dataloader.core.ScanRange; @@ -32,14 +32,14 @@ public class JsonLineExportManagerTest { TableMetadata mockData; - DistributedStorage storage; + DistributedTransactionManager manager; @Spy ScalarDbDao dao; ProducerTaskFactory producerTaskFactory; ExportManager exportManager; @BeforeEach void setup() { - storage = Mockito.mock(DistributedStorage.class); + manager = Mockito.mock(DistributedTransactionManager.class); mockData = UnitTestUtils.createTestTableMetadata(); dao = Mockito.mock(ScalarDbDao.class); producerTaskFactory = new ProducerTaskFactory(null, false, true); @@ -48,8 +48,9 @@ void setup() { @Test void startExport_givenValidDataWithoutPartitionKey_shouldGenerateOutputFile() throws IOException, ScalarDbDaoException { - exportManager = new JsonLineExportManager(storage, dao, producerTaskFactory); - Scanner scanner = Mockito.mock(Scanner.class); + exportManager = new JsonLineExportManager(manager, dao, producerTaskFactory); + TransactionManagerCrudOperable.Scanner scanner = + Mockito.mock(TransactionManagerCrudOperable.Scanner.class); String filePath = Paths.get("").toAbsolutePath() + "/output.jsonl"; Map> values = UnitTestUtils.createTestValues(); Result result = new ResultImpl(values, mockData); @@ -67,7 +68,7 @@ void startExport_givenValidDataWithoutPartitionKey_shouldGenerateOutputFile() exportOptions.getTableName(), exportOptions.getProjectionColumns(), exportOptions.getLimit(), - storage)) + manager)) .thenReturn(scanner); Mockito.when(scanner.iterator()).thenReturn(results.iterator()); try (BufferedWriter writer = @@ -87,8 +88,9 @@ void startExport_givenValidDataWithoutPartitionKey_shouldGenerateOutputFile() @Test void startExport_givenPartitionKey_shouldGenerateOutputFile() throws IOException, ScalarDbDaoException { - exportManager = new JsonLineExportManager(storage, dao, producerTaskFactory); - Scanner scanner = Mockito.mock(Scanner.class); + exportManager = new JsonLineExportManager(manager, dao, producerTaskFactory); + TransactionManagerCrudOperable.Scanner scanner = + Mockito.mock(TransactionManagerCrudOperable.Scanner.class); String filePath = Paths.get("").toAbsolutePath() + "/output.jsonl"; Map> values = UnitTestUtils.createTestValues(); Result result = new ResultImpl(values, mockData); @@ -113,7 +115,7 @@ void startExport_givenPartitionKey_shouldGenerateOutputFile() exportOptions.getSortOrders(), exportOptions.getProjectionColumns(), exportOptions.getLimit(), - storage)) + manager)) .thenReturn(scanner); Mockito.when(scanner.iterator()).thenReturn(results.iterator()); try (BufferedWriter writer = From 1e08ecffe2691b470b37d19d7edae3cd35a95fc5 Mon Sep 17 00:00:00 2001 From: Jishnu J Date: Tue, 11 Nov 2025 11:07:28 +0530 Subject: [PATCH 3/5] scanner class updated --- .../db/dataloader/core/dataexport/ExportManager.java | 11 ++++++++--- 1 file changed, 8 insertions(+), 3 deletions(-) diff --git a/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataexport/ExportManager.java b/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataexport/ExportManager.java index 67653762df..550b41a331 100644 --- a/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataexport/ExportManager.java +++ b/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataexport/ExportManager.java @@ -2,7 +2,6 @@ import com.scalar.db.api.DistributedTransactionManager; import com.scalar.db.api.Result; -import com.scalar.db.api.Scanner; import com.scalar.db.api.TableMetadata; import com.scalar.db.api.TransactionManagerCrudOperable; import com.scalar.db.dataloader.core.FileFormat; @@ -13,6 +12,8 @@ import com.scalar.db.dataloader.core.dataimport.dao.ScalarDbDao; import com.scalar.db.dataloader.core.dataimport.dao.ScalarDbDaoException; import com.scalar.db.dataloader.core.util.TableMetadataUtil; +import com.scalar.db.exception.transaction.CrudException; +import com.scalar.db.exception.transaction.UnknownTransactionStatusException; import com.scalar.db.io.DataType; import java.io.BufferedWriter; import java.io.IOException; @@ -87,7 +88,8 @@ public ExportReport startExport( BufferedWriter bufferedWriter = new BufferedWriter(writer); boolean isJson = exportOptions.getOutputFileFormat() == FileFormat.JSON; - try (Scanner scanner = createScanner(exportOptions, dao, distributedTransactionManager)) { + try (TransactionManagerCrudOperable.Scanner scanner = + createScanner(exportOptions, dao, distributedTransactionManager)) { Iterator iterator = scanner.iterator(); AtomicBoolean isFirstBatch = new AtomicBoolean(true); @@ -114,7 +116,10 @@ public ExportReport startExport( // TODO: handle this } processFooter(exportOptions, tableMetadata, bufferedWriter); - } catch (InterruptedException | IOException e) { + } catch (InterruptedException + | IOException + | UnknownTransactionStatusException + | CrudException e) { logger.error("Error during export: {}", e.getMessage()); } finally { bufferedWriter.flush(); From e7c68a72db2ffa0df175612a072243a28acb6612 Mon Sep 17 00:00:00 2001 From: Jishnu J Date: Tue, 11 Nov 2025 11:51:32 +0530 Subject: [PATCH 4/5] Feedback changes --- .../com/scalar/db/dataloader/core/dataexport/ExportManager.java | 2 +- .../db/dataloader/core/dataexport/CsvExportManagerTest.java | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataexport/ExportManager.java b/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataexport/ExportManager.java index 550b41a331..8f079b2f99 100644 --- a/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataexport/ExportManager.java +++ b/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataexport/ExportManager.java @@ -120,7 +120,7 @@ public ExportReport startExport( | IOException | UnknownTransactionStatusException | CrudException e) { - logger.error("Error during export: {}", e.getMessage()); + logger.error("Error during export: ", e); } finally { bufferedWriter.flush(); } diff --git a/data-loader/core/src/test/java/com/scalar/db/dataloader/core/dataexport/CsvExportManagerTest.java b/data-loader/core/src/test/java/com/scalar/db/dataloader/core/dataexport/CsvExportManagerTest.java index 8edd940bc4..26e6a88a85 100644 --- a/data-loader/core/src/test/java/com/scalar/db/dataloader/core/dataexport/CsvExportManagerTest.java +++ b/data-loader/core/src/test/java/com/scalar/db/dataloader/core/dataexport/CsvExportManagerTest.java @@ -49,7 +49,7 @@ void setup() { @Test void startExport_givenValidDataWithoutPartitionKey_shouldGenerateOutputFile() throws IOException, ScalarDbDaoException { - exportManager = new JsonLineExportManager(manager, dao, producerTaskFactory); + exportManager = new CsvExportManager(manager, dao, producerTaskFactory); TransactionManagerCrudOperable.Scanner scanner = Mockito.mock(TransactionManagerCrudOperable.Scanner.class); String filePath = Paths.get("").toAbsolutePath() + "/output.csv"; From b973df88d81b3ab42c7e515837fd53baa5ef2fe5 Mon Sep 17 00:00:00 2001 From: Jishnu J Date: Tue, 11 Nov 2025 12:03:14 +0530 Subject: [PATCH 5/5] Feedback changes javadoc and param rename --- .../db/dataloader/core/dataexport/ExportManager.java | 4 ++-- .../db/dataloader/core/dataimport/dao/ScalarDbDao.java | 10 ++++------ 2 files changed, 6 insertions(+), 8 deletions(-) diff --git a/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataexport/ExportManager.java b/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataexport/ExportManager.java index 8f079b2f99..0743a79106 100644 --- a/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataexport/ExportManager.java +++ b/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataexport/ExportManager.java @@ -36,7 +36,7 @@ public abstract class ExportManager { private static final Logger logger = LoggerFactory.getLogger(ExportManager.class); - private final DistributedTransactionManager distributedTransactionManager; + private final DistributedTransactionManager manager; private final ScalarDbDao dao; private final ProducerTaskFactory producerTaskFactory; private final Object lock = new Object(); @@ -89,7 +89,7 @@ public ExportReport startExport( boolean isJson = exportOptions.getOutputFileFormat() == FileFormat.JSON; try (TransactionManagerCrudOperable.Scanner scanner = - createScanner(exportOptions, dao, distributedTransactionManager)) { + createScanner(exportOptions, dao, manager)) { Iterator iterator = scanner.iterator(); AtomicBoolean isFirstBatch = new AtomicBoolean(true); diff --git a/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataimport/dao/ScalarDbDao.java b/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataimport/dao/ScalarDbDao.java index 5242acdeb3..6afceae89a 100644 --- a/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataimport/dao/ScalarDbDao.java +++ b/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataimport/dao/ScalarDbDao.java @@ -224,9 +224,8 @@ public List scan( * Creates a {@link TransactionManagerCrudOperable.Scanner} instance for reading data from * ScalarDB. * - *

This method builds and executes a {@link Scan} operation for the specified table and returns - * a scanner that iterates over the retrieved records. It operates in storage mode using a {@link - * DistributedTransactionManager}. + *

a scanner that iterates over the retrieved records. It performs the scan transactionally + * using a {@link DistributedTransactionManager}. * * @param namespace the ScalarDB namespace to scan * @param table the ScalarDB table name @@ -258,9 +257,8 @@ public TransactionManagerCrudOperable.Scanner createScanner( * Creates a {@link TransactionManagerCrudOperable.Scanner} instance for reading data from * ScalarDB. * - *

This method builds and executes a {@link Scan} operation using the provided parameters and - * returns a scanner that iterates over the matching records. It is used in storage mode through a - * {@link DistributedTransactionManager}. + *

returns a scanner that iterates over the matching records. The scan is performed + * transactionally through a {@link DistributedTransactionManager}. * * @param namespace the ScalarDB namespace to scan * @param table the ScalarDB table name