Skip to content

Commit 326f7a7

Browse files
committed
Added changes based on feedback
1 parent 5b6f86a commit 326f7a7

File tree

3 files changed

+4
-14
lines changed

3 files changed

+4
-14
lines changed

data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataimport/processor/ImportProcessorParams.java

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,5 @@
11
package com.scalar.db.dataloader.core.dataimport.processor;
22

3-
import com.scalar.db.api.DistributedStorage;
43
import com.scalar.db.api.DistributedTransactionManager;
54
import com.scalar.db.api.TableMetadata;
65
import com.scalar.db.dataloader.core.ScalarDbMode;
@@ -36,9 +35,6 @@ public class ImportProcessorParams {
3635
/** Data Access Object for ScalarDB operations. */
3736
ScalarDbDao dao;
3837

39-
/** Storage interface for non-transactional operations. */
40-
DistributedStorage distributedStorage;
41-
4238
/** Transaction manager for handling transactional operations. */
4339
DistributedTransactionManager distributedTransactionManager;
4440
}

data-loader/core/src/test/java/com/scalar/db/dataloader/core/dataimport/ImportManagerTest.java

Lines changed: 4 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -45,8 +45,7 @@ void setUp() {
4545
options,
4646
processorFactory,
4747
ScalarDbMode.STORAGE,
48-
distributedStorage,
49-
null); // Only one resource present
48+
distributedTransactionManager);
5049
importManager.addListener(listener1);
5150
importManager.addListener(listener2);
5251
}
@@ -57,7 +56,7 @@ void onAllDataChunksCompleted_shouldNotifyListenersAndCloseStorage() {
5756

5857
verify(listener1).onAllDataChunksCompleted();
5958
verify(listener2).onAllDataChunksCompleted();
60-
verify(distributedStorage).close();
59+
verify(distributedTransactionManager).close();
6160
}
6261

6362
@Test
@@ -69,7 +68,7 @@ void onAllDataChunksCompleted_shouldAggregateListenerExceptionAndStillCloseResou
6968

7069
assertTrue(thrown.getMessage().contains("Error during completion"));
7170
assertEquals("Listener1 failed", thrown.getCause().getMessage());
72-
verify(distributedStorage).close();
71+
verify(distributedTransactionManager).close();
7372
}
7473

7574
@Test
@@ -81,7 +80,6 @@ void closeResources_shouldCloseTransactionManagerIfStorageIsNull() {
8180
mock(ImportOptions.class),
8281
mock(ImportProcessorFactory.class),
8382
ScalarDbMode.TRANSACTION,
84-
null,
8583
distributedTransactionManager);
8684

8785
managerWithTx.closeResources();
@@ -90,7 +88,7 @@ void closeResources_shouldCloseTransactionManagerIfStorageIsNull() {
9088

9189
@Test
9290
void closeResources_shouldThrowIfResourceCloseFails() {
93-
doThrow(new RuntimeException("Close failed")).when(distributedStorage).close();
91+
doThrow(new RuntimeException("Close failed")).when(distributedTransactionManager).close();
9492

9593
RuntimeException ex =
9694
assertThrows(RuntimeException.class, () -> importManager.closeResources());

data-loader/core/src/test/java/com/scalar/db/dataloader/core/dataimport/processor/ImportProcessorTest.java

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -87,7 +87,6 @@ void process_withStorageMode_shouldProcessAllDataChunks() {
8787
BufferedReader reader = new BufferedReader(new StringReader("test data"));
8888
when(params.getScalarDbMode()).thenReturn(ScalarDbMode.STORAGE);
8989
when(params.getDao()).thenReturn(dao);
90-
when(params.getDistributedStorage()).thenReturn(distributedStorage);
9190
when(params.getTableColumnDataTypes()).thenReturn(tableColumnDataTypes);
9291

9392
TestImportProcessor processor = new TestImportProcessor(params);
@@ -150,7 +149,6 @@ void process_withMultipleDataChunks_shouldUseThreadPool() {
150149
final int maxThreads = 4;
151150
when(importOptions.getMaxThreads()).thenReturn(maxThreads);
152151
when(params.getDao()).thenReturn(dao);
153-
when(params.getDistributedStorage()).thenReturn(distributedStorage);
154152
when(params.getTableColumnDataTypes()).thenReturn(tableColumnDataTypes);
155153
when(params.getTableMetadataByTableName()).thenReturn(tableMetadataByTableName);
156154

@@ -205,7 +203,6 @@ void process_withLargeNumberOfTasks_shouldWaitForAllTasksToComplete() {
205203
final int maxThreads = 2;
206204
when(importOptions.getMaxThreads()).thenReturn(maxThreads);
207205
when(params.getDao()).thenReturn(dao);
208-
when(params.getDistributedStorage()).thenReturn(distributedStorage);
209206
when(params.getTableColumnDataTypes()).thenReturn(tableColumnDataTypes);
210207
when(params.getTableMetadataByTableName()).thenReturn(tableMetadataByTableName);
211208

@@ -235,7 +232,6 @@ void process_withShutdown_shouldShutdownExecutorsGracefully() {
235232
// Arrange
236233
when(params.getScalarDbMode()).thenReturn(ScalarDbMode.STORAGE);
237234
when(params.getDao()).thenReturn(dao);
238-
when(params.getDistributedStorage()).thenReturn(distributedStorage);
239235
when(params.getTableColumnDataTypes()).thenReturn(tableColumnDataTypes);
240236
when(params.getTableMetadataByTableName()).thenReturn(tableMetadataByTableName);
241237

0 commit comments

Comments
 (0)