diff --git a/data-loader/cli/src/main/java/com/scalar/db/dataloader/cli/command/dataexport/ExportCommand.java b/data-loader/cli/src/main/java/com/scalar/db/dataloader/cli/command/dataexport/ExportCommand.java
index f79c7459e6..f6f2e2c104 100755
--- a/data-loader/cli/src/main/java/com/scalar/db/dataloader/cli/command/dataexport/ExportCommand.java
+++ b/data-loader/cli/src/main/java/com/scalar/db/dataloader/cli/command/dataexport/ExportCommand.java
@@ -5,7 +5,7 @@
import static java.nio.file.StandardOpenOption.APPEND;
import static java.nio.file.StandardOpenOption.CREATE;
-import com.scalar.db.api.DistributedStorage;
+import com.scalar.db.api.DistributedTransactionManager;
import com.scalar.db.api.TableMetadata;
import com.scalar.db.dataloader.cli.exception.DirectoryValidationException;
import com.scalar.db.dataloader.cli.util.DirectoryUtils;
@@ -29,6 +29,7 @@
import com.scalar.db.dataloader.core.util.KeyUtils;
import com.scalar.db.io.Key;
import com.scalar.db.service.StorageFactory;
+import com.scalar.db.service.TransactionFactory;
import java.io.BufferedWriter;
import java.nio.charset.Charset;
import java.nio.file.Files;
@@ -69,11 +70,13 @@ public Integer call() throws Exception {
}
StorageFactory storageFactory = StorageFactory.create(scalarDbPropertiesFilePath);
+ TransactionFactory transactionFactory = TransactionFactory.create(scalarDbPropertiesFilePath);
TableMetadataService metaDataService =
new TableMetadataService(storageFactory.getStorageAdmin());
ScalarDbDao scalarDbDao = new ScalarDbDao();
- ExportManager exportManager = createExportManager(storageFactory, scalarDbDao, outputFormat);
+ ExportManager exportManager =
+ createExportManager(transactionFactory, scalarDbDao, outputFormat);
TableMetadata tableMetadata = metaDataService.getTableMetadata(namespace, table);
@@ -156,17 +159,17 @@ private void validateOutputDirectory() throws DirectoryValidationException {
}
private ExportManager createExportManager(
- StorageFactory storageFactory, ScalarDbDao scalarDbDao, FileFormat fileFormat) {
+ TransactionFactory transactionFactory, ScalarDbDao scalarDbDao, FileFormat fileFormat) {
ProducerTaskFactory taskFactory =
new ProducerTaskFactory(delimiter, includeTransactionMetadata, prettyPrintJson);
- DistributedStorage storage = storageFactory.getStorage();
+ DistributedTransactionManager manager = transactionFactory.getTransactionManager();
switch (fileFormat) {
case JSON:
- return new JsonExportManager(storage, scalarDbDao, taskFactory);
+ return new JsonExportManager(manager, scalarDbDao, taskFactory);
case JSONL:
- return new JsonLineExportManager(storage, scalarDbDao, taskFactory);
+ return new JsonLineExportManager(manager, scalarDbDao, taskFactory);
case CSV:
- return new CsvExportManager(storage, scalarDbDao, taskFactory);
+ return new CsvExportManager(manager, scalarDbDao, taskFactory);
default:
throw new AssertionError("Invalid file format" + fileFormat);
}
diff --git a/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataexport/CsvExportManager.java b/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataexport/CsvExportManager.java
index bccf691adc..9589e405ac 100644
--- a/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataexport/CsvExportManager.java
+++ b/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataexport/CsvExportManager.java
@@ -1,6 +1,6 @@
package com.scalar.db.dataloader.core.dataexport;
-import com.scalar.db.api.DistributedStorage;
+import com.scalar.db.api.DistributedTransactionManager;
import com.scalar.db.api.TableMetadata;
import com.scalar.db.dataloader.core.dataexport.producer.ProducerTaskFactory;
import com.scalar.db.dataloader.core.dataimport.dao.ScalarDbDao;
@@ -15,16 +15,23 @@
public class CsvExportManager extends ExportManager {
/**
- * Constructs a {@code CsvExportManager} with the specified {@link DistributedStorage}, {@link
- * ScalarDbDao}, and {@link ProducerTaskFactory}.
+ * Constructs a {@code CsvExportManager} for exporting data using a {@link
+ * DistributedTransactionManager}.
*
- * @param storage the {@code DistributedStorage} instance used to read data from the database
- * @param dao the {@code ScalarDbDao} used to execute export-related database operations
- * @param producerTaskFactory the factory used to create producer tasks for exporting data
+ * This constructor is used when exporting data in transactional mode, allowing data to be read
+ * from ScalarDB within a distributed transaction context.
+ *
+ * @param manager the {@link DistributedTransactionManager} used to read data in transactional
+ * mode
+ * @param dao the {@link ScalarDbDao} used to interact with ScalarDB for export operations
+ * @param producerTaskFactory the {@link ProducerTaskFactory} used to create producer tasks for
+ * exporting data
*/
public CsvExportManager(
- DistributedStorage storage, ScalarDbDao dao, ProducerTaskFactory producerTaskFactory) {
- super(storage, dao, producerTaskFactory);
+ DistributedTransactionManager manager,
+ ScalarDbDao dao,
+ ProducerTaskFactory producerTaskFactory) {
+ super(manager, dao, producerTaskFactory);
}
/**
diff --git a/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataexport/ExportManager.java b/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataexport/ExportManager.java
index 302d828a3b..4a6a41596e 100644
--- a/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataexport/ExportManager.java
+++ b/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataexport/ExportManager.java
@@ -1,9 +1,9 @@
package com.scalar.db.dataloader.core.dataexport;
-import com.scalar.db.api.DistributedStorage;
+import com.scalar.db.api.DistributedTransactionManager;
import com.scalar.db.api.Result;
-import com.scalar.db.api.Scanner;
import com.scalar.db.api.TableMetadata;
+import com.scalar.db.api.TransactionManagerCrudOperable;
import com.scalar.db.dataloader.core.FileFormat;
import com.scalar.db.dataloader.core.dataexport.producer.ProducerTask;
import com.scalar.db.dataloader.core.dataexport.producer.ProducerTaskFactory;
@@ -12,6 +12,8 @@
import com.scalar.db.dataloader.core.dataimport.dao.ScalarDbDao;
import com.scalar.db.dataloader.core.dataimport.dao.ScalarDbDaoException;
import com.scalar.db.dataloader.core.util.TableMetadataUtil;
+import com.scalar.db.exception.transaction.CrudException;
+import com.scalar.db.exception.transaction.UnknownTransactionStatusException;
import com.scalar.db.io.DataType;
import java.io.BufferedWriter;
import java.io.IOException;
@@ -34,7 +36,7 @@
public abstract class ExportManager {
private static final Logger logger = LoggerFactory.getLogger(ExportManager.class);
- private final DistributedStorage storage;
+ private final DistributedTransactionManager manager;
private final ScalarDbDao dao;
private final ProducerTaskFactory producerTaskFactory;
private final Object lock = new Object();
@@ -83,7 +85,8 @@ public ExportReport startExport(
BufferedWriter bufferedWriter = new BufferedWriter(writer);
boolean isJson = exportOptions.getOutputFileFormat() == FileFormat.JSON;
- try (Scanner scanner = createScanner(exportOptions, dao, storage)) {
+ try (TransactionManagerCrudOperable.Scanner scanner =
+ createScanner(exportOptions, dao, manager)) {
Iterator iterator = scanner.iterator();
AtomicBoolean isFirstBatch = new AtomicBoolean(true);
@@ -110,8 +113,11 @@ public ExportReport startExport(
// TODO: handle this
}
processFooter(exportOptions, tableMetadata, bufferedWriter);
- } catch (InterruptedException | IOException e) {
- logger.error("Error during export: {}", e.getMessage());
+ } catch (InterruptedException
+ | IOException
+ | UnknownTransactionStatusException
+ | CrudException e) {
+ logger.error("Error during export: ", e);
} finally {
bufferedWriter.flush();
}
@@ -213,12 +219,12 @@ private void handleTransactionMetadata(ExportOptions exportOptions, TableMetadat
*
* @param exportOptions export options
* @param dao ScalarDB dao object
- * @param storage distributed storage object
+ * @param manager DistributedTransactionManager object
* @return created scanner
* @throws ScalarDbDaoException throws if any issue occurs in creating scanner object
*/
- private Scanner createScanner(
- ExportOptions exportOptions, ScalarDbDao dao, DistributedStorage storage)
+ private TransactionManagerCrudOperable.Scanner createScanner(
+ ExportOptions exportOptions, ScalarDbDao dao, DistributedTransactionManager manager)
throws ScalarDbDaoException {
boolean isScanAll = exportOptions.getScanPartitionKey() == null;
if (isScanAll) {
@@ -227,7 +233,7 @@ private Scanner createScanner(
exportOptions.getTableName(),
exportOptions.getProjectionColumns(),
exportOptions.getLimit(),
- storage);
+ manager);
} else {
return dao.createScanner(
exportOptions.getNamespace(),
@@ -237,7 +243,7 @@ private Scanner createScanner(
exportOptions.getSortOrders(),
exportOptions.getProjectionColumns(),
exportOptions.getLimit(),
- storage);
+ manager);
}
}
}
diff --git a/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataexport/JsonExportManager.java b/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataexport/JsonExportManager.java
index fadac644a2..4115db8904 100644
--- a/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataexport/JsonExportManager.java
+++ b/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataexport/JsonExportManager.java
@@ -1,6 +1,6 @@
package com.scalar.db.dataloader.core.dataexport;
-import com.scalar.db.api.DistributedStorage;
+import com.scalar.db.api.DistributedTransactionManager;
import com.scalar.db.api.TableMetadata;
import com.scalar.db.dataloader.core.dataexport.producer.ProducerTaskFactory;
import com.scalar.db.dataloader.core.dataimport.dao.ScalarDbDao;
@@ -11,16 +11,23 @@
public class JsonExportManager extends ExportManager {
/**
- * Constructs a {@code JsonExportManager} with the specified {@link DistributedStorage}, {@link
- * ScalarDbDao}, and {@link ProducerTaskFactory}.
+ * Constructs a {@code JsonExportManager} for exporting data using a {@link
+ * DistributedTransactionManager}.
*
- * @param storage the {@code DistributedStorage} instance used to read data from the database
- * @param dao the {@code ScalarDbDao} used to execute export-related database operations
- * @param producerTaskFactory the factory used to create producer tasks for exporting data
+ * This constructor is used when exporting data in transactional mode, allowing data to be read
+ * from ScalarDB within a distributed transaction context and exported in JSON format.
+ *
+ * @param manager the {@link DistributedTransactionManager} used to read data in transactional
+ * mode
+ * @param dao the {@link ScalarDbDao} used to interact with ScalarDB for export operations
+ * @param producerTaskFactory the {@link ProducerTaskFactory} used to create producer tasks for
+ * exporting data
*/
public JsonExportManager(
- DistributedStorage storage, ScalarDbDao dao, ProducerTaskFactory producerTaskFactory) {
- super(storage, dao, producerTaskFactory);
+ DistributedTransactionManager manager,
+ ScalarDbDao dao,
+ ProducerTaskFactory producerTaskFactory) {
+ super(manager, dao, producerTaskFactory);
}
/**
diff --git a/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataexport/JsonLineExportManager.java b/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataexport/JsonLineExportManager.java
index 2ce21deb7d..27f853ca61 100644
--- a/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataexport/JsonLineExportManager.java
+++ b/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataexport/JsonLineExportManager.java
@@ -1,6 +1,6 @@
package com.scalar.db.dataloader.core.dataexport;
-import com.scalar.db.api.DistributedStorage;
+import com.scalar.db.api.DistributedTransactionManager;
import com.scalar.db.api.TableMetadata;
import com.scalar.db.dataloader.core.dataexport.producer.ProducerTaskFactory;
import com.scalar.db.dataloader.core.dataimport.dao.ScalarDbDao;
@@ -13,16 +13,23 @@
public class JsonLineExportManager extends ExportManager {
/**
- * Constructs a {@code JsonLineExportManager} with the specified {@link DistributedStorage},
- * {@link ScalarDbDao}, and {@link ProducerTaskFactory}.
+ * Constructs a {@code JsonLineExportManager} for exporting data using a {@link
+ * DistributedTransactionManager}.
*
- * @param storage the {@code DistributedStorage} instance used to read data from the database
- * @param dao the {@code ScalarDbDao} used to execute export-related database operations
- * @param producerTaskFactory the factory used to create producer tasks for exporting data
+ * This constructor is used when exporting data in transactional mode, allowing data to be read
+ * from ScalarDB within a distributed transaction context and exported in JSON Lines format.
+ *
+ * @param manager the {@link DistributedTransactionManager} used to read data in transactional
+ * mode
+ * @param dao the {@link ScalarDbDao} used to interact with ScalarDB for export operations
+ * @param producerTaskFactory the {@link ProducerTaskFactory} used to create producer tasks for
+ * exporting data
*/
public JsonLineExportManager(
- DistributedStorage storage, ScalarDbDao dao, ProducerTaskFactory producerTaskFactory) {
- super(storage, dao, producerTaskFactory);
+ DistributedTransactionManager manager,
+ ScalarDbDao dao,
+ ProducerTaskFactory producerTaskFactory) {
+ super(manager, dao, producerTaskFactory);
}
/**
diff --git a/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataimport/dao/ScalarDbDao.java b/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataimport/dao/ScalarDbDao.java
index afd7b124af..6afceae89a 100644
--- a/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataimport/dao/ScalarDbDao.java
+++ b/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataimport/dao/ScalarDbDao.java
@@ -2,6 +2,7 @@
import com.scalar.db.api.DistributedStorage;
import com.scalar.db.api.DistributedTransaction;
+import com.scalar.db.api.DistributedTransactionManager;
import com.scalar.db.api.Get;
import com.scalar.db.api.GetBuilder;
import com.scalar.db.api.Put;
@@ -10,6 +11,7 @@
import com.scalar.db.api.Scan;
import com.scalar.db.api.ScanBuilder;
import com.scalar.db.api.Scanner;
+import com.scalar.db.api.TransactionManagerCrudOperable;
import com.scalar.db.dataloader.core.DataLoaderError;
import com.scalar.db.dataloader.core.ScanRange;
import com.scalar.db.exception.storage.ExecutionException;
@@ -219,47 +221,61 @@ public List scan(
}
/**
- * Create a ScalarDB scanner instance
+ * Creates a {@link TransactionManagerCrudOperable.Scanner} instance for reading data from
+ * ScalarDB.
*
- * @param namespace ScalarDB namespace
- * @param table ScalarDB table name
- * @param projectionColumns List of column projection to use during scan
- * @param limit Scan limit value
- * @param storage Distributed storage for ScalarDB connection that is running in storage mode
- * @return ScalarDB Scanner object
- * @throws ScalarDbDaoException if scan fails
+ * Returns a scanner that iterates over the retrieved records. It performs the scan transactionally
+ * using a {@link DistributedTransactionManager}.
+ *
+ * @param namespace the ScalarDB namespace to scan
+ * @param table the ScalarDB table name
+ * @param projectionColumns the list of column names to include in the scan results
+ * @param limit the maximum number of records to retrieve
+ * @param manager the {@link DistributedTransactionManager} used to obtain the ScalarDB scanner in
+ * transactional mode
+ * @return a {@link TransactionManagerCrudOperable.Scanner} instance for iterating over the scan
+ * results
+ * @throws ScalarDbDaoException if an error occurs while creating or executing the scan
*/
- public Scanner createScanner(
+ public TransactionManagerCrudOperable.Scanner createScanner(
String namespace,
String table,
List projectionColumns,
int limit,
- DistributedStorage storage)
+ DistributedTransactionManager manager)
throws ScalarDbDaoException {
Scan scan =
createScan(namespace, table, null, null, new ArrayList<>(), projectionColumns, limit);
try {
- return storage.scan(scan);
- } catch (ExecutionException e) {
+ return manager.getScanner(scan);
+ } catch (CrudException e) {
throw new ScalarDbDaoException(DataLoaderError.ERROR_SCAN.buildMessage(e.getMessage()), e);
}
}
/**
- * Create a ScalarDB scanner instance
+ * Creates a {@link TransactionManagerCrudOperable.Scanner} instance for reading data from
+ * ScalarDB.
*
- * @param namespace ScalarDB namespace
- * @param table ScalarDB table name
- * @param partitionKey Partition key used in ScalarDB scan
- * @param scanRange Optional range to set ScalarDB scan start and end values
- * @param sortOrders Optional scan clustering key sorting values
- * @param projectionColumns List of column projection to use during scan
- * @param limit Scan limit value
- * @param storage Distributed storage for ScalarDB connection that is running in storage mode
- * @return ScalarDB Scanner object
- * @throws ScalarDbDaoException if scan fails
+ * Returns a scanner that iterates over the matching records. The scan is performed
+ * transactionally through a {@link DistributedTransactionManager}.
+ *
+ * @param namespace the ScalarDB namespace to scan
+ * @param table the ScalarDB table name
+ * @param partitionKey the optional {@link Key} representing the partition key for the scan
+ * @param scanRange the optional {@link ScanRange} defining the start and end boundaries for the
+ * scan
+ * @param sortOrders the optional list of {@link Scan.Ordering} objects defining the clustering
+ * key sort order
+ * @param projectionColumns the optional list of column names to include in the scan results
+ * @param limit the maximum number of records to retrieve
+ * @param manager the {@link DistributedTransactionManager} used to obtain the ScalarDB scanner in
+ * transactional mode
+ * @return a {@link TransactionManagerCrudOperable.Scanner} instance for iterating over scan
+ * results
+ * @throws ScalarDbDaoException if an error occurs while creating or executing the scan
*/
- public Scanner createScanner(
+ public TransactionManagerCrudOperable.Scanner createScanner(
String namespace,
String table,
@Nullable Key partitionKey,
@@ -267,13 +283,13 @@ public Scanner createScanner(
@Nullable List sortOrders,
@Nullable List projectionColumns,
int limit,
- DistributedStorage storage)
+ DistributedTransactionManager manager)
throws ScalarDbDaoException {
Scan scan =
createScan(namespace, table, partitionKey, scanRange, sortOrders, projectionColumns, limit);
try {
- return storage.scan(scan);
- } catch (ExecutionException e) {
+ return manager.getScanner(scan);
+ } catch (CrudException e) {
throw new ScalarDbDaoException(DataLoaderError.ERROR_SCAN.buildMessage(e.getMessage()), e);
}
}
diff --git a/data-loader/core/src/test/java/com/scalar/db/dataloader/core/dataexport/CsvExportManagerTest.java b/data-loader/core/src/test/java/com/scalar/db/dataloader/core/dataexport/CsvExportManagerTest.java
index c61aa6f989..26e6a88a85 100644
--- a/data-loader/core/src/test/java/com/scalar/db/dataloader/core/dataexport/CsvExportManagerTest.java
+++ b/data-loader/core/src/test/java/com/scalar/db/dataloader/core/dataexport/CsvExportManagerTest.java
@@ -1,9 +1,9 @@
package com.scalar.db.dataloader.core.dataexport;
-import com.scalar.db.api.DistributedStorage;
+import com.scalar.db.api.DistributedTransactionManager;
import com.scalar.db.api.Result;
-import com.scalar.db.api.Scanner;
import com.scalar.db.api.TableMetadata;
+import com.scalar.db.api.TransactionManagerCrudOperable;
import com.scalar.db.common.ResultImpl;
import com.scalar.db.dataloader.core.FileFormat;
import com.scalar.db.dataloader.core.ScanRange;
@@ -33,14 +33,14 @@
public class CsvExportManagerTest {
TableMetadata mockData;
- DistributedStorage storage;
+ DistributedTransactionManager manager;
@Spy ScalarDbDao dao;
ProducerTaskFactory producerTaskFactory;
ExportManager exportManager;
@BeforeEach
void setup() {
- storage = Mockito.mock(DistributedStorage.class);
+ manager = Mockito.mock(DistributedTransactionManager.class);
mockData = UnitTestUtils.createTestTableMetadata();
dao = Mockito.mock(ScalarDbDao.class);
producerTaskFactory = new ProducerTaskFactory(null, false, true);
@@ -49,8 +49,9 @@ void setup() {
@Test
void startExport_givenValidDataWithoutPartitionKey_shouldGenerateOutputFile()
throws IOException, ScalarDbDaoException {
- exportManager = new JsonLineExportManager(storage, dao, producerTaskFactory);
- Scanner scanner = Mockito.mock(Scanner.class);
+ exportManager = new CsvExportManager(manager, dao, producerTaskFactory);
+ TransactionManagerCrudOperable.Scanner scanner =
+ Mockito.mock(TransactionManagerCrudOperable.Scanner.class);
String filePath = Paths.get("").toAbsolutePath() + "/output.csv";
Map> values = UnitTestUtils.createTestValues();
Result result = new ResultImpl(values, mockData);
@@ -67,7 +68,7 @@ void startExport_givenValidDataWithoutPartitionKey_shouldGenerateOutputFile()
exportOptions.getTableName(),
exportOptions.getProjectionColumns(),
exportOptions.getLimit(),
- storage))
+ manager))
.thenReturn(scanner);
Mockito.when(scanner.iterator()).thenReturn(results.iterator());
try (BufferedWriter writer =
@@ -88,8 +89,9 @@ void startExport_givenValidDataWithoutPartitionKey_shouldGenerateOutputFile()
void startExport_givenPartitionKey_shouldGenerateOutputFile()
throws IOException, ScalarDbDaoException {
producerTaskFactory = new ProducerTaskFactory(",", false, false);
- exportManager = new CsvExportManager(storage, dao, producerTaskFactory);
- Scanner scanner = Mockito.mock(Scanner.class);
+ exportManager = new CsvExportManager(manager, dao, producerTaskFactory);
+ TransactionManagerCrudOperable.Scanner scanner =
+ Mockito.mock(TransactionManagerCrudOperable.Scanner.class);
String filePath = Paths.get("").toAbsolutePath() + "/output.csv";
Map> values = UnitTestUtils.createTestValues();
Result result = new ResultImpl(values, mockData);
@@ -114,7 +116,7 @@ void startExport_givenPartitionKey_shouldGenerateOutputFile()
exportOptions.getSortOrders(),
exportOptions.getProjectionColumns(),
exportOptions.getLimit(),
- storage))
+ manager))
.thenReturn(scanner);
Mockito.when(scanner.iterator()).thenReturn(results.iterator());
try (BufferedWriter writer =
@@ -150,8 +152,9 @@ private void runExportAndAssertFirstLine(boolean excludeHeader, String expectedF
throws Exception {
// Arrange
producerTaskFactory = new ProducerTaskFactory(",", false, false);
- exportManager = new CsvExportManager(storage, dao, producerTaskFactory);
- Scanner scanner = Mockito.mock(Scanner.class);
+ exportManager = new CsvExportManager(manager, dao, producerTaskFactory);
+ TransactionManagerCrudOperable.Scanner scanner =
+ Mockito.mock(TransactionManagerCrudOperable.Scanner.class);
String filePath = Paths.get("").toAbsolutePath() + "/output.csv";
Map> values = UnitTestUtils.createTestValues();
Result result = new ResultImpl(values, mockData);
@@ -178,7 +181,7 @@ private void runExportAndAssertFirstLine(boolean excludeHeader, String expectedF
exportOptions.getSortOrders(),
exportOptions.getProjectionColumns(),
exportOptions.getLimit(),
- storage))
+ manager))
.thenReturn(scanner);
Mockito.when(scanner.iterator()).thenReturn(results.iterator());
try (BufferedWriter writer =
diff --git a/data-loader/core/src/test/java/com/scalar/db/dataloader/core/dataexport/JsonExportManagerTest.java b/data-loader/core/src/test/java/com/scalar/db/dataloader/core/dataexport/JsonExportManagerTest.java
index bd18aeabd1..b71d080323 100644
--- a/data-loader/core/src/test/java/com/scalar/db/dataloader/core/dataexport/JsonExportManagerTest.java
+++ b/data-loader/core/src/test/java/com/scalar/db/dataloader/core/dataexport/JsonExportManagerTest.java
@@ -1,9 +1,9 @@
package com.scalar.db.dataloader.core.dataexport;
-import com.scalar.db.api.DistributedStorage;
+import com.scalar.db.api.DistributedTransactionManager;
import com.scalar.db.api.Result;
-import com.scalar.db.api.Scanner;
import com.scalar.db.api.TableMetadata;
+import com.scalar.db.api.TransactionManagerCrudOperable;
import com.scalar.db.common.ResultImpl;
import com.scalar.db.dataloader.core.FileFormat;
import com.scalar.db.dataloader.core.ScanRange;
@@ -33,14 +33,14 @@
public class JsonExportManagerTest {
TableMetadata mockData;
- DistributedStorage storage;
+ DistributedTransactionManager manager;
@Spy ScalarDbDao dao;
ProducerTaskFactory producerTaskFactory;
ExportManager exportManager;
@BeforeEach
void setup() {
- storage = Mockito.mock(DistributedStorage.class);
+ manager = Mockito.mock(DistributedTransactionManager.class);
mockData = UnitTestUtils.createTestTableMetadata();
dao = Mockito.mock(ScalarDbDao.class);
producerTaskFactory = new ProducerTaskFactory(null, false, true);
@@ -49,8 +49,9 @@ void setup() {
@Test
void startExport_givenValidDataWithoutPartitionKey_shouldGenerateOutputFile()
throws IOException, ScalarDbDaoException {
- exportManager = new JsonExportManager(storage, dao, producerTaskFactory);
- Scanner scanner = Mockito.mock(Scanner.class);
+ exportManager = new JsonExportManager(manager, dao, producerTaskFactory);
+ TransactionManagerCrudOperable.Scanner scanner =
+ Mockito.mock(TransactionManagerCrudOperable.Scanner.class);
String filePath = Paths.get("").toAbsolutePath() + "/output.json";
Map> values = UnitTestUtils.createTestValues();
Result result = new ResultImpl(values, mockData);
@@ -68,7 +69,7 @@ void startExport_givenValidDataWithoutPartitionKey_shouldGenerateOutputFile()
exportOptions.getTableName(),
exportOptions.getProjectionColumns(),
exportOptions.getLimit(),
- storage))
+ manager))
.thenReturn(scanner);
Mockito.when(scanner.iterator()).thenReturn(results.iterator());
try (BufferedWriter writer =
@@ -88,8 +89,9 @@ void startExport_givenValidDataWithoutPartitionKey_shouldGenerateOutputFile()
@Test
void startExport_givenPartitionKey_shouldGenerateOutputFile()
throws IOException, ScalarDbDaoException {
- exportManager = new JsonExportManager(storage, dao, producerTaskFactory);
- Scanner scanner = Mockito.mock(Scanner.class);
+ exportManager = new JsonExportManager(manager, dao, producerTaskFactory);
+ TransactionManagerCrudOperable.Scanner scanner =
+ Mockito.mock(TransactionManagerCrudOperable.Scanner.class);
String filePath = Paths.get("").toAbsolutePath() + "/output.json";
Map> values = UnitTestUtils.createTestValues();
Result result = new ResultImpl(values, mockData);
@@ -114,7 +116,7 @@ void startExport_givenPartitionKey_shouldGenerateOutputFile()
exportOptions.getSortOrders(),
exportOptions.getProjectionColumns(),
exportOptions.getLimit(),
- storage))
+ manager))
.thenReturn(scanner);
Mockito.when(scanner.iterator()).thenReturn(results.iterator());
try (BufferedWriter writer =
diff --git a/data-loader/core/src/test/java/com/scalar/db/dataloader/core/dataexport/JsonLineExportManagerTest.java b/data-loader/core/src/test/java/com/scalar/db/dataloader/core/dataexport/JsonLineExportManagerTest.java
index 31e4326a33..f2a6d1b360 100644
--- a/data-loader/core/src/test/java/com/scalar/db/dataloader/core/dataexport/JsonLineExportManagerTest.java
+++ b/data-loader/core/src/test/java/com/scalar/db/dataloader/core/dataexport/JsonLineExportManagerTest.java
@@ -1,9 +1,9 @@
package com.scalar.db.dataloader.core.dataexport;
-import com.scalar.db.api.DistributedStorage;
+import com.scalar.db.api.DistributedTransactionManager;
import com.scalar.db.api.Result;
-import com.scalar.db.api.Scanner;
import com.scalar.db.api.TableMetadata;
+import com.scalar.db.api.TransactionManagerCrudOperable;
import com.scalar.db.common.ResultImpl;
import com.scalar.db.dataloader.core.FileFormat;
import com.scalar.db.dataloader.core.ScanRange;
@@ -32,14 +32,14 @@
public class JsonLineExportManagerTest {
TableMetadata mockData;
- DistributedStorage storage;
+ DistributedTransactionManager manager;
@Spy ScalarDbDao dao;
ProducerTaskFactory producerTaskFactory;
ExportManager exportManager;
@BeforeEach
void setup() {
- storage = Mockito.mock(DistributedStorage.class);
+ manager = Mockito.mock(DistributedTransactionManager.class);
mockData = UnitTestUtils.createTestTableMetadata();
dao = Mockito.mock(ScalarDbDao.class);
producerTaskFactory = new ProducerTaskFactory(null, false, true);
@@ -48,8 +48,9 @@ void setup() {
@Test
void startExport_givenValidDataWithoutPartitionKey_shouldGenerateOutputFile()
throws IOException, ScalarDbDaoException {
- exportManager = new JsonLineExportManager(storage, dao, producerTaskFactory);
- Scanner scanner = Mockito.mock(Scanner.class);
+ exportManager = new JsonLineExportManager(manager, dao, producerTaskFactory);
+ TransactionManagerCrudOperable.Scanner scanner =
+ Mockito.mock(TransactionManagerCrudOperable.Scanner.class);
String filePath = Paths.get("").toAbsolutePath() + "/output.jsonl";
Map> values = UnitTestUtils.createTestValues();
Result result = new ResultImpl(values, mockData);
@@ -67,7 +68,7 @@ void startExport_givenValidDataWithoutPartitionKey_shouldGenerateOutputFile()
exportOptions.getTableName(),
exportOptions.getProjectionColumns(),
exportOptions.getLimit(),
- storage))
+ manager))
.thenReturn(scanner);
Mockito.when(scanner.iterator()).thenReturn(results.iterator());
try (BufferedWriter writer =
@@ -87,8 +88,9 @@ void startExport_givenValidDataWithoutPartitionKey_shouldGenerateOutputFile()
@Test
void startExport_givenPartitionKey_shouldGenerateOutputFile()
throws IOException, ScalarDbDaoException {
- exportManager = new JsonLineExportManager(storage, dao, producerTaskFactory);
- Scanner scanner = Mockito.mock(Scanner.class);
+ exportManager = new JsonLineExportManager(manager, dao, producerTaskFactory);
+ TransactionManagerCrudOperable.Scanner scanner =
+ Mockito.mock(TransactionManagerCrudOperable.Scanner.class);
String filePath = Paths.get("").toAbsolutePath() + "/output.jsonl";
Map> values = UnitTestUtils.createTestValues();
Result result = new ResultImpl(values, mockData);
@@ -113,7 +115,7 @@ void startExport_givenPartitionKey_shouldGenerateOutputFile()
exportOptions.getSortOrders(),
exportOptions.getProjectionColumns(),
exportOptions.getLimit(),
- storage))
+ manager))
.thenReturn(scanner);
Mockito.when(scanner.iterator()).thenReturn(results.iterator());
try (BufferedWriter writer =