Skip to content

Commit 1d81db7

Browse files
committed
Initial commit
1 parent 186d3c3 commit 1d81db7

File tree

8 files changed

+69
-88
lines changed

8 files changed

+69
-88
lines changed

data-loader/cli/src/main/java/com/scalar/db/dataloader/cli/command/dataimport/ImportCommand.java

Lines changed: 8 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -13,8 +13,6 @@
1313
import com.scalar.db.dataloader.core.dataimport.ImportOptions;
1414
import com.scalar.db.dataloader.core.dataimport.controlfile.ControlFile;
1515
import com.scalar.db.dataloader.core.dataimport.controlfile.ControlFileTable;
16-
import com.scalar.db.dataloader.core.dataimport.dao.ScalarDbStorageManager;
17-
import com.scalar.db.dataloader.core.dataimport.dao.ScalarDbTransactionManager;
1816
import com.scalar.db.dataloader.core.dataimport.log.ImportLoggerConfig;
1917
import com.scalar.db.dataloader.core.dataimport.log.LogMode;
2018
import com.scalar.db.dataloader.core.dataimport.log.SingleFileImportLogger;
@@ -26,7 +24,6 @@
2624
import com.scalar.db.dataloader.core.tablemetadata.TableMetadataException;
2725
import com.scalar.db.dataloader.core.tablemetadata.TableMetadataService;
2826
import com.scalar.db.dataloader.core.util.TableMetadataUtil;
29-
import com.scalar.db.service.StorageFactory;
3027
import com.scalar.db.service.TransactionFactory;
3128
import java.io.BufferedReader;
3229
import java.io.File;
@@ -145,32 +142,14 @@ private ImportManager createImportManager(
145142
throws IOException {
146143
File configFile = new File(configFilePath);
147144
ImportProcessorFactory importProcessorFactory = new DefaultImportProcessorFactory();
148-
ImportManager importManager;
149-
if (scalarDbMode == ScalarDbMode.TRANSACTION) {
150-
ScalarDbTransactionManager scalarDbTransactionManager =
151-
new ScalarDbTransactionManager(TransactionFactory.create(configFile));
152-
importManager =
153-
new ImportManager(
154-
tableMetadataMap,
155-
reader,
156-
importOptions,
157-
importProcessorFactory,
158-
ScalarDbMode.TRANSACTION,
159-
null,
160-
scalarDbTransactionManager.getDistributedTransactionManager());
161-
} else {
162-
ScalarDbStorageManager scalarDbStorageManager =
163-
new ScalarDbStorageManager(StorageFactory.create(configFile));
164-
importManager =
165-
new ImportManager(
166-
tableMetadataMap,
167-
reader,
168-
importOptions,
169-
importProcessorFactory,
170-
ScalarDbMode.STORAGE,
171-
scalarDbStorageManager.getDistributedStorage(),
172-
null);
173-
}
145+
ImportManager importManager =
146+
new ImportManager(
147+
tableMetadataMap,
148+
reader,
149+
importOptions,
150+
importProcessorFactory,
151+
ScalarDbMode.TRANSACTION,
152+
TransactionFactory.create(configFile).getTransactionManager());
174153
if (importOptions.getLogMode().equals(LogMode.SPLIT_BY_DATA_CHUNK)) {
175154
importManager.addListener(new SplitByDataChunkImportLogger(config, logWriterFactory));
176155
} else {

data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataimport/ImportManager.java

Lines changed: 1 addition & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,5 @@
11
package com.scalar.db.dataloader.core.dataimport;
22

3-
import com.scalar.db.api.DistributedStorage;
43
import com.scalar.db.api.DistributedTransactionManager;
54
import com.scalar.db.api.TableMetadata;
65
import com.scalar.db.dataloader.core.ScalarDbMode;
@@ -44,7 +43,6 @@ public class ImportManager implements ImportEventListener {
4443
private final ImportProcessorFactory importProcessorFactory;
4544
private final List<ImportEventListener> listeners = new ArrayList<>();
4645
private final ScalarDbMode scalarDbMode;
47-
private final DistributedStorage distributedStorage;
4846
private final DistributedTransactionManager distributedTransactionManager;
4947

5048
/**
@@ -62,7 +60,6 @@ public void startImport() {
6260
.tableMetadataByTableName(tableMetadata)
6361
.dao(new ScalarDbDao())
6462
.distributedTransactionManager(distributedTransactionManager)
65-
.distributedStorage(distributedStorage)
6663
.tableColumnDataTypes(getTableColumnDataTypes())
6764
.build();
6865
ImportProcessor processor = importProcessorFactory.createImportProcessor(params);
@@ -169,9 +166,7 @@ public void onAllDataChunksCompleted() {
169166
/** Close resources properly once the process is completed */
170167
public void closeResources() {
171168
try {
172-
if (distributedStorage != null) {
173-
distributedStorage.close();
174-
} else if (distributedTransactionManager != null) {
169+
if (distributedTransactionManager != null) {
175170
distributedTransactionManager.close();
176171
}
177172
} catch (Throwable e) {

data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataimport/dao/ScalarDbDao.java

Lines changed: 40 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
import com.scalar.db.dataloader.core.ScanRange;
1717
import com.scalar.db.exception.storage.ExecutionException;
1818
import com.scalar.db.exception.transaction.CrudException;
19+
import com.scalar.db.exception.transaction.UnknownTransactionStatusException;
1920
import com.scalar.db.io.Column;
2021
import com.scalar.db.io.Key;
2122
import java.io.IOException;
@@ -29,31 +30,39 @@
2930
public class ScalarDbDao {
3031

3132
/**
32-
* Retrieve record from ScalarDB instance in storage mode
33+
* Retrieves a record from a ScalarDB table using the specified partition and optional clustering
34+
* keys while operating in storage mode through a {@link DistributedTransactionManager}.
3335
*
34-
* @param namespace Namespace name
35-
* @param table Table name
36-
* @param partitionKey Partition key
37-
* @param clusteringKey Optional clustering key for get
38-
* @param storage Distributed storage for ScalarDB connection that is running in storage mode.
39-
* @return Optional get result
40-
* @throws ScalarDbDaoException if something goes wrong while reading the data
36+
* <p>This method creates a {@link Get} operation for the given namespace and table, executes it
37+
* using the provided transaction manager, and returns the result if the record exists.
38+
*
39+
* @param namespace the name of the ScalarDB namespace containing the target table
40+
* @param table the name of the table to retrieve the record from
41+
* @param partitionKey the partition key identifying the record's partition
42+
* @param clusteringKey the optional clustering key identifying a specific record within the
43+
* partition; may be {@code null} if the table does not use clustering keys
44+
* @param manager the {@link DistributedTransactionManager} instance used to perform the get
45+
* operation
46+
* @return an {@link Optional} containing the {@link Result} if the record exists, or an empty
47+
* {@link Optional} if not found
48+
* @throws ScalarDbDaoException if an error occurs while performing the get operation or
49+
* interacting with ScalarDB
4150
*/
4251
public Optional<Result> get(
4352
String namespace,
4453
String table,
4554
Key partitionKey,
4655
Key clusteringKey,
47-
DistributedStorage storage)
56+
DistributedTransactionManager manager)
4857
throws ScalarDbDaoException {
4958

5059
// Retrieving the key data for logging
5160
String loggingKey = keysToString(partitionKey, clusteringKey);
5261

5362
try {
5463
Get get = createGetWith(namespace, table, partitionKey, clusteringKey);
55-
return storage.get(get);
56-
} catch (ExecutionException e) {
64+
return manager.get(get);
65+
} catch (CrudException | UnknownTransactionStatusException e) {
5766
throw new ScalarDbDaoException("error GET " + loggingKey, e);
5867
}
5968
}
@@ -117,28 +126,37 @@ public void put(
117126
}
118127

119128
/**
120-
* Save record in ScalarDB instance
129+
* Saves a record into a ScalarDB table using the specified partition and optional clustering keys
130+
* through a {@link DistributedTransactionManager}.
121131
*
122-
* @param namespace Namespace name
123-
* @param table Table name
124-
* @param partitionKey Partition key
125-
* @param clusteringKey Optional clustering key
126-
* @param columns List of column values to be inserted or updated
127-
* @param storage Distributed storage for ScalarDB connection that is running in storage mode
128-
* @throws ScalarDbDaoException if something goes wrong while executing the transaction
132+
* <p>This method constructs a {@link Put} operation with the provided key and column information,
133+
* then executes it using the given transaction manager. The operation inserts a new record or
134+
* updates an existing one if a record with the same primary key already exists.
135+
*
136+
* @param namespace the name of the ScalarDB namespace containing the target table
137+
* @param table the name of the table where the record will be inserted or updated
138+
* @param partitionKey the partition key identifying the record's partition
139+
* @param clusteringKey the optional clustering key identifying a specific record within the
140+
* partition; may be {@code null} if the table does not use clustering keys
141+
* @param columns the list of {@link Column} objects representing the column values to insert or
142+
* update
143+
* @param manager the {@link DistributedTransactionManager} instance used to perform the put
144+
* operation
145+
* @throws ScalarDbDaoException if an error occurs while executing the put operation or
146+
* interacting with ScalarDB
129147
*/
130148
public void put(
131149
String namespace,
132150
String table,
133151
Key partitionKey,
134152
Key clusteringKey,
135153
List<Column<?>> columns,
136-
DistributedStorage storage)
154+
DistributedTransactionManager manager)
137155
throws ScalarDbDaoException {
138156
Put put = createPutWith(namespace, table, partitionKey, clusteringKey, columns);
139157
try {
140-
storage.put(put);
141-
} catch (ExecutionException e) {
158+
manager.put(put);
159+
} catch (CrudException | UnknownTransactionStatusException e) {
142160
throw new ScalarDbDaoException(
143161
DataLoaderError.ERROR_CRUD_EXCEPTION.buildMessage(e.getMessage()), e);
144162
}

data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataimport/processor/ImportProcessor.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -360,7 +360,7 @@ private ImportTaskResult processStorageRecord(int dataChunkId, ImportRow importR
360360
.dao(params.getDao())
361361
.build();
362362
ImportTaskResult importRecordResult =
363-
new ImportStorageTask(taskParams, params.getDistributedStorage()).execute();
363+
new ImportStorageTask(taskParams, params.getDistributedTransactionManager()).execute();
364364

365365
ImportTaskResult modifiedTaskResult =
366366
ImportTaskResult.builder()

data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataimport/task/ImportStorageTask.java

Lines changed: 16 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package com.scalar.db.dataloader.core.dataimport.task;
22

33
import com.scalar.db.api.DistributedStorage;
4+
import com.scalar.db.api.DistributedTransactionManager;
45
import com.scalar.db.api.Result;
56
import com.scalar.db.dataloader.core.dataimport.dao.ScalarDbDaoException;
67
import com.scalar.db.io.Column;
@@ -9,12 +10,12 @@
910
import java.util.Optional;
1011

1112
/**
12-
* An import task that interacts with a {@link DistributedStorage} for data retrieval and storage
13+
* An import task that interacts with a {@link DistributedTransactionManager} for data retrieval and storage
1314
* operations.
1415
*
1516
* <p>This class extends {@link ImportTask} and provides concrete implementations for fetching and
1617
* storing records using a {@link DistributedStorage} instance. It acts as a bridge between the
17-
* import process and the underlying distributed storage system.
18+
* import process and the underlying distributed transaction manager.
1819
*
1920
* <p>The task handles both read and write operations:
2021
*
@@ -23,31 +24,31 @@
2324
* <li>Storing new or updated records with their associated columns
2425
* </ul>
2526
*
26-
* <p>All storage operations are performed through the provided {@link DistributedStorage} instance,
27+
* <p>All operations are performed through the provided {@link DistributedTransactionManager} instance,
2728
* which must be properly initialized before creating this task.
2829
*/
2930
public class ImportStorageTask extends ImportTask {
3031

31-
private final DistributedStorage storage;
32+
private final DistributedTransactionManager manager;
3233

3334
/**
34-
* Constructs an {@code ImportStorageTask} with the specified parameters and storage.
35+
* Constructs an {@code ImportStorageTask} with the specified parameters and manager.
3536
*
3637
* @param params the import task parameters containing configuration and DAO objects
37-
* @param storage the distributed storage instance to be used for data operations
38-
* @throws NullPointerException if either params or storage is null
38+
* @param manager the distributed transaction manager instance to be used for data operations
39+
* @throws NullPointerException if either params or manager is null
3940
*/
40-
public ImportStorageTask(ImportTaskParams params, DistributedStorage storage) {
41+
public ImportStorageTask(ImportTaskParams params, DistributedTransactionManager manager) {
4142
super(params);
42-
this.storage = storage;
43+
this.manager = manager;
4344
}
4445

4546
/**
46-
* Retrieves a data record from the distributed storage using the specified keys.
47+
* Retrieves a data record via the distributed transaction manager using the specified keys.
4748
*
4849
* <p>This method attempts to fetch a single record from the specified table using both partition
4950
* and clustering keys. The operation is performed through the configured DAO using the associated
50-
* storage instance.
51+
* transaction manager instance.
5152
*
5253
* @param namespace the namespace of the table to query
5354
* @param tableName the name of the table to query
@@ -62,15 +63,15 @@ public ImportStorageTask(ImportTaskParams params, DistributedStorage storage) {
6263
protected Optional<Result> getDataRecord(
6364
String namespace, String tableName, Key partitionKey, Key clusteringKey)
6465
throws ScalarDbDaoException {
65-
return params.getDao().get(namespace, tableName, partitionKey, clusteringKey, this.storage);
66+
return params.getDao().get(namespace, tableName, partitionKey, clusteringKey, this.manager);
6667
}
6768

6869
/**
69-
* Saves a record into the distributed storage with the specified keys and columns.
70+
* Saves a record into the distributed manager with the specified keys and columns.
7071
*
7172
* <p>This method writes or updates a record in the specified table using the provided keys and
7273
* column values. The operation is performed through the configured DAO using the associated
73-
* storage instance.
74+
* manager instance.
7475
*
7576
* @param namespace the namespace of the target table
7677
* @param tableName the name of the target table
@@ -88,6 +89,6 @@ protected void saveRecord(
8889
Key clusteringKey,
8990
List<Column<?>> columns)
9091
throws ScalarDbDaoException {
91-
params.getDao().put(namespace, tableName, partitionKey, clusteringKey, columns, this.storage);
92+
params.getDao().put(namespace, tableName, partitionKey, clusteringKey, columns, this.manager);
9293
}
9394
}

data-loader/core/src/test/java/com/scalar/db/dataloader/core/dataimport/processor/CsvImportProcessorTest.java

Lines changed: 1 addition & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,6 @@
33
import static org.mockito.Mockito.mock;
44
import static org.mockito.Mockito.when;
55

6-
import com.scalar.db.api.DistributedStorage;
76
import com.scalar.db.api.DistributedTransaction;
87
import com.scalar.db.api.DistributedTransactionManager;
98
import com.scalar.db.api.TableMetadata;
@@ -32,7 +31,6 @@ class CsvImportProcessorTest {
3231
@Mock TableColumnDataTypes tableColumnDataTypes;
3332

3433
ScalarDbDao dao;
35-
@Mock DistributedStorage distributedStorage;
3634
DistributedTransactionManager distributedTransactionManager;
3735
CsvImportProcessor csvImportProcessor;
3836

@@ -65,7 +63,7 @@ void setup() throws ScalarDbDaoException, TransactionException {
6563
"table",
6664
UnitTestUtils.getPartitionKey(1),
6765
UnitTestUtils.getClusteringKey(),
68-
distributedStorage))
66+
distributedTransactionManager))
6967
.thenReturn(UnitTestUtils.getResult(1));
7068
Mockito.when(
7169
dao.get(
@@ -84,7 +82,6 @@ void test_importProcessWithStorage() {
8482
.scalarDbMode(ScalarDbMode.STORAGE)
8583
.importOptions(importOptions)
8684
.dao(dao)
87-
.distributedStorage(distributedStorage)
8885
.distributedTransactionManager(distributedTransactionManager)
8986
.tableColumnDataTypes(tableColumnDataTypes)
9087
.tableMetadataByTableName(tableMetadataByTableName)
@@ -103,7 +100,6 @@ void test_importProcessWithTransaction() {
103100
.scalarDbMode(ScalarDbMode.TRANSACTION)
104101
.importOptions(importOptions)
105102
.dao(dao)
106-
.distributedStorage(distributedStorage)
107103
.distributedTransactionManager(distributedTransactionManager)
108104
.tableColumnDataTypes(tableColumnDataTypes)
109105
.tableMetadataByTableName(tableMetadataByTableName)

data-loader/core/src/test/java/com/scalar/db/dataloader/core/dataimport/processor/JsonImportProcessorTest.java

Lines changed: 1 addition & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,6 @@
33
import static org.mockito.Mockito.mock;
44
import static org.mockito.Mockito.when;
55

6-
import com.scalar.db.api.DistributedStorage;
76
import com.scalar.db.api.DistributedTransaction;
87
import com.scalar.db.api.DistributedTransactionManager;
98
import com.scalar.db.api.TableMetadata;
@@ -32,7 +31,6 @@ class JsonImportProcessorTest {
3231
@Mock TableColumnDataTypes tableColumnDataTypes;
3332

3433
ScalarDbDao dao;
35-
@Mock DistributedStorage distributedStorage;
3634
DistributedTransactionManager distributedTransactionManager;
3735
JsonImportProcessor jsonImportProcessor;
3836

@@ -65,7 +63,7 @@ void setup() throws ScalarDbDaoException, TransactionException {
6563
"table",
6664
UnitTestUtils.getPartitionKey(1),
6765
UnitTestUtils.getClusteringKey(),
68-
distributedStorage))
66+
distributedTransactionManager))
6967
.thenReturn(UnitTestUtils.getResult(1));
7068
Mockito.when(
7169
dao.get(
@@ -84,7 +82,6 @@ void test_importProcessWithStorage() {
8482
.scalarDbMode(ScalarDbMode.STORAGE)
8583
.importOptions(importOptions)
8684
.dao(dao)
87-
.distributedStorage(distributedStorage)
8885
.distributedTransactionManager(distributedTransactionManager)
8986
.tableColumnDataTypes(tableColumnDataTypes)
9087
.tableMetadataByTableName(tableMetadataByTableName)
@@ -103,7 +100,6 @@ void test_importProcessWithTransaction() {
103100
.scalarDbMode(ScalarDbMode.TRANSACTION)
104101
.importOptions(importOptions)
105102
.dao(dao)
106-
.distributedStorage(distributedStorage)
107103
.distributedTransactionManager(distributedTransactionManager)
108104
.tableColumnDataTypes(tableColumnDataTypes)
109105
.tableMetadataByTableName(tableMetadataByTableName)

0 commit comments

Comments
 (0)