Skip to content

Commit f70efe5

Browse files
authored
Replace DistributedStorage with DistributedTransactionManager in Data Loader export (#3140)
1 parent ba20b01 commit f70efe5

File tree

9 files changed

+155
-102
lines changed

9 files changed

+155
-102
lines changed

data-loader/cli/src/main/java/com/scalar/db/dataloader/cli/command/dataexport/ExportCommand.java

Lines changed: 10 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@
55
import static java.nio.file.StandardOpenOption.APPEND;
66
import static java.nio.file.StandardOpenOption.CREATE;
77

8-
import com.scalar.db.api.DistributedStorage;
8+
import com.scalar.db.api.DistributedTransactionManager;
99
import com.scalar.db.api.TableMetadata;
1010
import com.scalar.db.dataloader.cli.exception.DirectoryValidationException;
1111
import com.scalar.db.dataloader.cli.util.DirectoryUtils;
@@ -29,6 +29,7 @@
2929
import com.scalar.db.dataloader.core.util.KeyUtils;
3030
import com.scalar.db.io.Key;
3131
import com.scalar.db.service.StorageFactory;
32+
import com.scalar.db.service.TransactionFactory;
3233
import java.io.BufferedWriter;
3334
import java.nio.charset.Charset;
3435
import java.nio.file.Files;
@@ -69,11 +70,13 @@ public Integer call() throws Exception {
6970
}
7071

7172
StorageFactory storageFactory = StorageFactory.create(scalarDbPropertiesFilePath);
73+
TransactionFactory transactionFactory = TransactionFactory.create(scalarDbPropertiesFilePath);
7274
TableMetadataService metaDataService =
7375
new TableMetadataService(storageFactory.getStorageAdmin());
7476
ScalarDbDao scalarDbDao = new ScalarDbDao();
7577

76-
ExportManager exportManager = createExportManager(storageFactory, scalarDbDao, outputFormat);
78+
ExportManager exportManager =
79+
createExportManager(transactionFactory, scalarDbDao, outputFormat);
7780

7881
TableMetadata tableMetadata = metaDataService.getTableMetadata(namespace, table);
7982

@@ -156,17 +159,17 @@ private void validateOutputDirectory() throws DirectoryValidationException {
156159
}
157160

158161
private ExportManager createExportManager(
159-
StorageFactory storageFactory, ScalarDbDao scalarDbDao, FileFormat fileFormat) {
162+
TransactionFactory transactionFactory, ScalarDbDao scalarDbDao, FileFormat fileFormat) {
160163
ProducerTaskFactory taskFactory =
161164
new ProducerTaskFactory(delimiter, includeTransactionMetadata, prettyPrintJson);
162-
DistributedStorage storage = storageFactory.getStorage();
165+
DistributedTransactionManager manager = transactionFactory.getTransactionManager();
163166
switch (fileFormat) {
164167
case JSON:
165-
return new JsonExportManager(storage, scalarDbDao, taskFactory);
168+
return new JsonExportManager(manager, scalarDbDao, taskFactory);
166169
case JSONL:
167-
return new JsonLineExportManager(storage, scalarDbDao, taskFactory);
170+
return new JsonLineExportManager(manager, scalarDbDao, taskFactory);
168171
case CSV:
169-
return new CsvExportManager(storage, scalarDbDao, taskFactory);
172+
return new CsvExportManager(manager, scalarDbDao, taskFactory);
170173
default:
171174
throw new AssertionError("Invalid file format" + fileFormat);
172175
}

data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataexport/CsvExportManager.java

Lines changed: 15 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
package com.scalar.db.dataloader.core.dataexport;
22

3-
import com.scalar.db.api.DistributedStorage;
3+
import com.scalar.db.api.DistributedTransactionManager;
44
import com.scalar.db.api.TableMetadata;
55
import com.scalar.db.dataloader.core.dataexport.producer.ProducerTaskFactory;
66
import com.scalar.db.dataloader.core.dataimport.dao.ScalarDbDao;
@@ -15,16 +15,23 @@
1515
public class CsvExportManager extends ExportManager {
1616

1717
/**
18-
* Constructs a {@code CsvExportManager} with the specified {@link DistributedStorage}, {@link
19-
* ScalarDbDao}, and {@link ProducerTaskFactory}.
18+
* Constructs a {@code CsvExportManager} for exporting data using a {@link
19+
* DistributedTransactionManager}.
2020
*
21-
* @param storage the {@code DistributedStorage} instance used to read data from the database
22-
* @param dao the {@code ScalarDbDao} used to execute export-related database operations
23-
* @param producerTaskFactory the factory used to create producer tasks for exporting data
21+
* <p>This constructor is used when exporting data in transactional mode, allowing data to be read
22+
* from ScalarDB within a distributed transaction context.
23+
*
24+
* @param manager the {@link DistributedTransactionManager} used to read data in transactional
25+
* mode
26+
* @param dao the {@link ScalarDbDao} used to interact with ScalarDB for export operations
27+
* @param producerTaskFactory the {@link ProducerTaskFactory} used to create producer tasks for
28+
* exporting data
2429
*/
2530
public CsvExportManager(
26-
DistributedStorage storage, ScalarDbDao dao, ProducerTaskFactory producerTaskFactory) {
27-
super(storage, dao, producerTaskFactory);
31+
DistributedTransactionManager manager,
32+
ScalarDbDao dao,
33+
ProducerTaskFactory producerTaskFactory) {
34+
super(manager, dao, producerTaskFactory);
2835
}
2936

3037
/**

data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataexport/ExportManager.java

Lines changed: 17 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,9 @@
11
package com.scalar.db.dataloader.core.dataexport;
22

3-
import com.scalar.db.api.DistributedStorage;
3+
import com.scalar.db.api.DistributedTransactionManager;
44
import com.scalar.db.api.Result;
5-
import com.scalar.db.api.Scanner;
65
import com.scalar.db.api.TableMetadata;
6+
import com.scalar.db.api.TransactionManagerCrudOperable;
77
import com.scalar.db.dataloader.core.FileFormat;
88
import com.scalar.db.dataloader.core.dataexport.producer.ProducerTask;
99
import com.scalar.db.dataloader.core.dataexport.producer.ProducerTaskFactory;
@@ -12,6 +12,8 @@
1212
import com.scalar.db.dataloader.core.dataimport.dao.ScalarDbDao;
1313
import com.scalar.db.dataloader.core.dataimport.dao.ScalarDbDaoException;
1414
import com.scalar.db.dataloader.core.util.TableMetadataUtil;
15+
import com.scalar.db.exception.transaction.CrudException;
16+
import com.scalar.db.exception.transaction.UnknownTransactionStatusException;
1517
import com.scalar.db.io.DataType;
1618
import java.io.BufferedWriter;
1719
import java.io.IOException;
@@ -34,7 +36,7 @@
3436
public abstract class ExportManager {
3537
private static final Logger logger = LoggerFactory.getLogger(ExportManager.class);
3638

37-
private final DistributedStorage storage;
39+
private final DistributedTransactionManager manager;
3840
private final ScalarDbDao dao;
3941
private final ProducerTaskFactory producerTaskFactory;
4042
private final Object lock = new Object();
@@ -83,7 +85,8 @@ public ExportReport startExport(
8385
BufferedWriter bufferedWriter = new BufferedWriter(writer);
8486
boolean isJson = exportOptions.getOutputFileFormat() == FileFormat.JSON;
8587

86-
try (Scanner scanner = createScanner(exportOptions, dao, storage)) {
88+
try (TransactionManagerCrudOperable.Scanner scanner =
89+
createScanner(exportOptions, dao, manager)) {
8790

8891
Iterator<Result> iterator = scanner.iterator();
8992
AtomicBoolean isFirstBatch = new AtomicBoolean(true);
@@ -110,8 +113,11 @@ public ExportReport startExport(
110113
// TODO: handle this
111114
}
112115
processFooter(exportOptions, tableMetadata, bufferedWriter);
113-
} catch (InterruptedException | IOException e) {
114-
logger.error("Error during export: {}", e.getMessage());
116+
} catch (InterruptedException
117+
| IOException
118+
| UnknownTransactionStatusException
119+
| CrudException e) {
120+
logger.error("Error during export: ", e);
115121
} finally {
116122
bufferedWriter.flush();
117123
}
@@ -213,12 +219,12 @@ private void handleTransactionMetadata(ExportOptions exportOptions, TableMetadat
213219
*
214220
* @param exportOptions export options
215221
* @param dao ScalarDB dao object
216-
* @param storage distributed storage object
222+
* @param manager DistributedTransactionManager object
217223
* @return created scanner
218224
* @throws ScalarDbDaoException throws if any issue occurs in creating scanner object
219225
*/
220-
private Scanner createScanner(
221-
ExportOptions exportOptions, ScalarDbDao dao, DistributedStorage storage)
226+
private TransactionManagerCrudOperable.Scanner createScanner(
227+
ExportOptions exportOptions, ScalarDbDao dao, DistributedTransactionManager manager)
222228
throws ScalarDbDaoException {
223229
boolean isScanAll = exportOptions.getScanPartitionKey() == null;
224230
if (isScanAll) {
@@ -227,7 +233,7 @@ private Scanner createScanner(
227233
exportOptions.getTableName(),
228234
exportOptions.getProjectionColumns(),
229235
exportOptions.getLimit(),
230-
storage);
236+
manager);
231237
} else {
232238
return dao.createScanner(
233239
exportOptions.getNamespace(),
@@ -237,7 +243,7 @@ private Scanner createScanner(
237243
exportOptions.getSortOrders(),
238244
exportOptions.getProjectionColumns(),
239245
exportOptions.getLimit(),
240-
storage);
246+
manager);
241247
}
242248
}
243249
}

data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataexport/JsonExportManager.java

Lines changed: 15 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
package com.scalar.db.dataloader.core.dataexport;
22

3-
import com.scalar.db.api.DistributedStorage;
3+
import com.scalar.db.api.DistributedTransactionManager;
44
import com.scalar.db.api.TableMetadata;
55
import com.scalar.db.dataloader.core.dataexport.producer.ProducerTaskFactory;
66
import com.scalar.db.dataloader.core.dataimport.dao.ScalarDbDao;
@@ -11,16 +11,23 @@
1111
public class JsonExportManager extends ExportManager {
1212

1313
/**
14-
* Constructs a {@code JsonExportManager} with the specified {@link DistributedStorage}, {@link
15-
* ScalarDbDao}, and {@link ProducerTaskFactory}.
14+
* Constructs a {@code JsonExportManager} for exporting data using a {@link
15+
* DistributedTransactionManager}.
1616
*
17-
* @param storage the {@code DistributedStorage} instance used to read data from the database
18-
* @param dao the {@code ScalarDbDao} used to execute export-related database operations
19-
* @param producerTaskFactory the factory used to create producer tasks for exporting data
17+
* <p>This constructor is used when exporting data in transactional mode, allowing data to be read
18+
* from ScalarDB within a distributed transaction context and exported in JSON format.
19+
*
20+
* @param manager the {@link DistributedTransactionManager} used to read data in transactional
21+
* mode
22+
* @param dao the {@link ScalarDbDao} used to interact with ScalarDB for export operations
23+
* @param producerTaskFactory the {@link ProducerTaskFactory} used to create producer tasks for
24+
* exporting data
2025
*/
2126
public JsonExportManager(
22-
DistributedStorage storage, ScalarDbDao dao, ProducerTaskFactory producerTaskFactory) {
23-
super(storage, dao, producerTaskFactory);
27+
DistributedTransactionManager manager,
28+
ScalarDbDao dao,
29+
ProducerTaskFactory producerTaskFactory) {
30+
super(manager, dao, producerTaskFactory);
2431
}
2532

2633
/**

data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataexport/JsonLineExportManager.java

Lines changed: 15 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
package com.scalar.db.dataloader.core.dataexport;
22

3-
import com.scalar.db.api.DistributedStorage;
3+
import com.scalar.db.api.DistributedTransactionManager;
44
import com.scalar.db.api.TableMetadata;
55
import com.scalar.db.dataloader.core.dataexport.producer.ProducerTaskFactory;
66
import com.scalar.db.dataloader.core.dataimport.dao.ScalarDbDao;
@@ -13,16 +13,23 @@
1313
public class JsonLineExportManager extends ExportManager {
1414

1515
/**
16-
* Constructs a {@code JsonLineExportManager} with the specified {@link DistributedStorage},
17-
* {@link ScalarDbDao}, and {@link ProducerTaskFactory}.
16+
* Constructs a {@code JsonLineExportManager} for exporting data using a {@link
17+
* DistributedTransactionManager}.
1818
*
19-
* @param storage the {@code DistributedStorage} instance used to read data from the database
20-
* @param dao the {@code ScalarDbDao} used to execute export-related database operations
21-
* @param producerTaskFactory the factory used to create producer tasks for exporting data
19+
* <p>This constructor is used when exporting data in transactional mode, allowing data to be read
20+
* from ScalarDB within a distributed transaction context and exported in JSON Lines format.
21+
*
22+
* @param manager the {@link DistributedTransactionManager} used to read data in transactional
23+
* mode
24+
* @param dao the {@link ScalarDbDao} used to interact with ScalarDB for export operations
25+
* @param producerTaskFactory the {@link ProducerTaskFactory} used to create producer tasks for
26+
* exporting data
2227
*/
2328
public JsonLineExportManager(
24-
DistributedStorage storage, ScalarDbDao dao, ProducerTaskFactory producerTaskFactory) {
25-
super(storage, dao, producerTaskFactory);
29+
DistributedTransactionManager manager,
30+
ScalarDbDao dao,
31+
ProducerTaskFactory producerTaskFactory) {
32+
super(manager, dao, producerTaskFactory);
2633
}
2734

2835
/**

data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataimport/dao/ScalarDbDao.java

Lines changed: 43 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22

33
import com.scalar.db.api.DistributedStorage;
44
import com.scalar.db.api.DistributedTransaction;
5+
import com.scalar.db.api.DistributedTransactionManager;
56
import com.scalar.db.api.Get;
67
import com.scalar.db.api.GetBuilder;
78
import com.scalar.db.api.Put;
@@ -10,6 +11,7 @@
1011
import com.scalar.db.api.Scan;
1112
import com.scalar.db.api.ScanBuilder;
1213
import com.scalar.db.api.Scanner;
14+
import com.scalar.db.api.TransactionManagerCrudOperable;
1315
import com.scalar.db.dataloader.core.DataLoaderError;
1416
import com.scalar.db.dataloader.core.ScanRange;
1517
import com.scalar.db.exception.storage.ExecutionException;
@@ -219,61 +221,75 @@ public List<Result> scan(
219221
}
220222

221223
/**
222-
* Create a ScalarDB scanner instance
224+
* Creates a {@link TransactionManagerCrudOperable.Scanner} instance for reading data from
225+
* ScalarDB.
223226
*
224-
* @param namespace ScalarDB namespace
225-
* @param table ScalarDB table name
226-
* @param projectionColumns List of column projection to use during scan
227-
* @param limit Scan limit value
228-
* @param storage Distributed storage for ScalarDB connection that is running in storage mode
229-
* @return ScalarDB Scanner object
230-
* @throws ScalarDbDaoException if scan fails
227+
* <p>Returns a scanner that iterates over the retrieved records. The scan is performed
228+
* transactionally using a {@link DistributedTransactionManager}.
229+
*
230+
* @param namespace the ScalarDB namespace to scan
231+
* @param table the ScalarDB table name
232+
* @param projectionColumns the list of column names to include in the scan results
233+
* @param limit the maximum number of records to retrieve
234+
* @param manager the {@link DistributedTransactionManager} used to obtain the ScalarDB scanner in
235+
* transactional mode
236+
* @return a {@link TransactionManagerCrudOperable.Scanner} instance for iterating over the scan
237+
* results
238+
* @throws ScalarDbDaoException if an error occurs while creating or executing the scan
231239
*/
232-
public Scanner createScanner(
240+
public TransactionManagerCrudOperable.Scanner createScanner(
233241
String namespace,
234242
String table,
235243
List<String> projectionColumns,
236244
int limit,
237-
DistributedStorage storage)
245+
DistributedTransactionManager manager)
238246
throws ScalarDbDaoException {
239247
Scan scan =
240248
createScan(namespace, table, null, null, new ArrayList<>(), projectionColumns, limit);
241249
try {
242-
return storage.scan(scan);
243-
} catch (ExecutionException e) {
250+
return manager.getScanner(scan);
251+
} catch (CrudException e) {
244252
throw new ScalarDbDaoException(DataLoaderError.ERROR_SCAN.buildMessage(e.getMessage()), e);
245253
}
246254
}
247255

248256
/**
249-
* Create a ScalarDB scanner instance
257+
* Creates a {@link TransactionManagerCrudOperable.Scanner} instance for reading data from
258+
* ScalarDB.
250259
*
251-
* @param namespace ScalarDB namespace
252-
* @param table ScalarDB table name
253-
* @param partitionKey Partition key used in ScalarDB scan
254-
* @param scanRange Optional range to set ScalarDB scan start and end values
255-
* @param sortOrders Optional scan clustering key sorting values
256-
* @param projectionColumns List of column projection to use during scan
257-
* @param limit Scan limit value
258-
* @param storage Distributed storage for ScalarDB connection that is running in storage mode
259-
* @return ScalarDB Scanner object
260-
* @throws ScalarDbDaoException if scan fails
260+
* <p>Returns a scanner that iterates over the matching records. The scan is performed
261+
* transactionally through a {@link DistributedTransactionManager}.
262+
*
263+
* @param namespace the ScalarDB namespace to scan
264+
* @param table the ScalarDB table name
265+
* @param partitionKey the optional {@link Key} representing the partition key for the scan
266+
* @param scanRange the optional {@link ScanRange} defining the start and end boundaries for the
267+
* scan
268+
* @param sortOrders the optional list of {@link Scan.Ordering} objects defining the clustering
269+
* key sort order
270+
* @param projectionColumns the optional list of column names to include in the scan results
271+
* @param limit the maximum number of records to retrieve
272+
* @param manager the {@link DistributedTransactionManager} used to obtain the ScalarDB scanner in
273+
* transactional mode
274+
* @return a {@link TransactionManagerCrudOperable.Scanner} instance for iterating over scan
275+
* results
276+
* @throws ScalarDbDaoException if an error occurs while creating or executing the scan
261277
*/
262-
public Scanner createScanner(
278+
public TransactionManagerCrudOperable.Scanner createScanner(
263279
String namespace,
264280
String table,
265281
@Nullable Key partitionKey,
266282
@Nullable ScanRange scanRange,
267283
@Nullable List<Scan.Ordering> sortOrders,
268284
@Nullable List<String> projectionColumns,
269285
int limit,
270-
DistributedStorage storage)
286+
DistributedTransactionManager manager)
271287
throws ScalarDbDaoException {
272288
Scan scan =
273289
createScan(namespace, table, partitionKey, scanRange, sortOrders, projectionColumns, limit);
274290
try {
275-
return storage.scan(scan);
276-
} catch (ExecutionException e) {
291+
return manager.getScanner(scan);
292+
} catch (CrudException e) {
277293
throw new ScalarDbDaoException(DataLoaderError.ERROR_SCAN.buildMessage(e.getMessage()), e);
278294
}
279295
}

0 commit comments

Comments
 (0)