Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,12 @@ public Integer call() throws Exception {
FileUtils.validateFilePath(scalarDbPropertiesFilePath);
validatePositiveValue(
spec.commandLine(), dataChunkSize, DataLoaderError.INVALID_DATA_CHUNK_SIZE);
validatePositiveValue(spec.commandLine(), maxThreads, DataLoaderError.INVALID_MAX_THREADS);
// Only validate the argument when provided by the user, if not set a default
if (maxThreads != null) {
validatePositiveValue(spec.commandLine(), maxThreads, DataLoaderError.INVALID_MAX_THREADS);
} else {
maxThreads = Runtime.getRuntime().availableProcessors();
}

StorageFactory storageFactory = StorageFactory.create(scalarDbPropertiesFilePath);
TableMetadataService metaDataService =
Expand Down Expand Up @@ -126,6 +131,11 @@ private void validateDeprecatedOptions() {
DEPRECATED_END_EXCLUSIVE_OPTION,
END_INCLUSIVE_OPTION,
END_INCLUSIVE_OPTION_SHORT);
validateDeprecatedOptionPair(
spec.commandLine(),
DEPRECATED_THREADS_OPTION,
MAX_THREADS_OPTION,
MAX_THREADS_OPTION_SHORT);
}

private String getScalarDbPropertiesFilePath() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,9 @@ public class ExportCommandOptions {
public static final String END_INCLUSIVE_OPTION = "--end-inclusive";
public static final String END_INCLUSIVE_OPTION_SHORT = "-ei";
public static final String DEPRECATED_END_EXCLUSIVE_OPTION = "--end-exclusive";
public static final String MAX_THREADS_OPTION = "--max-threads";
public static final String MAX_THREADS_OPTION_SHORT = "-mt";
public static final String DEPRECATED_THREADS_OPTION = "--threads";

@CommandLine.Option(
names = {"--config", "-c"},
Expand Down Expand Up @@ -77,7 +80,18 @@ public class ExportCommandOptions {
paramLabel = "<MAX_THREADS>",
description =
"Maximum number of threads to use for parallel processing (default: number of available processors)")
protected int maxThreads;
protected Integer maxThreads;

/**
* @deprecated As of release 3.6.2. Will be removed in release 4.0.0. Use --max-threads instead
*/
@Deprecated
@CommandLine.Option(
names = {DEPRECATED_THREADS_OPTION},
paramLabel = "<THREADS>",
description = "Deprecated: Use --max-threads instead",
hidden = true)
protected Integer threadsDeprecated;

@CommandLine.Option(
names = {"--start-key", "-sk"},
Expand Down Expand Up @@ -184,5 +198,10 @@ public void applyDeprecatedOptions() {
if (endExclusiveDeprecated != null) {
scanEndInclusive = !endExclusiveDeprecated;
}

// If the deprecated option is set, use its value
if (threadsDeprecated != null) {
maxThreads = threadsDeprecated;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,12 @@ public Integer call() throws Exception {
spec.commandLine(), dataChunkSize, DataLoaderError.INVALID_DATA_CHUNK_SIZE);
validatePositiveValue(
spec.commandLine(), transactionSize, DataLoaderError.INVALID_TRANSACTION_SIZE);
validatePositiveValue(spec.commandLine(), maxThreads, DataLoaderError.INVALID_MAX_THREADS);
// Only validate the argument when provided by the user, if not set a default
if (maxThreads != null) {
validatePositiveValue(spec.commandLine(), maxThreads, DataLoaderError.INVALID_MAX_THREADS);
} else {
maxThreads = Runtime.getRuntime().availableProcessors();
}
validatePositiveValue(
spec.commandLine(), dataChunkQueueSize, DataLoaderError.INVALID_DATA_CHUNK_QUEUE_SIZE);
ControlFile controlFile = parseControlFileFromPath(controlFilePath).orElse(null);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,17 +42,18 @@ public class ImportCommandOptions {
names = {"--max-threads", "-mt"},
paramLabel = "<MAX_THREADS>",
description =
"Maximum number of threads to use for parallel processing (default: number of available processors)",
defaultValue = "16")
protected int maxThreads;
"Maximum number of threads to use for parallel processing (default: number of available processors)")
protected Integer maxThreads;

// Deprecated option - kept for backward compatibility
/**
* @deprecated As of release 3.6.2. Will be removed in release 4.0.0. Use --max-threads instead
*/
@Deprecated
@CommandLine.Option(
names = {DEPRECATED_THREADS_OPTION},
paramLabel = "<THREADS>",
description = "Deprecated: Use --max-threads instead",
hidden = true)
@Deprecated
protected Integer threadsDeprecated;

@CommandLine.Option(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -190,4 +190,87 @@ void call_withOnlyDeprecatedEndExclusive_shouldApplyInvertedValue() {
// end-exclusive=false should become end-inclusive=true
assertEquals(true, command.scanEndInclusive);
}

@Test
void call_withOnlyDeprecatedThreads_shouldApplyValue() {
// Simulate command line parsing with only deprecated option
String[] args = {
"--config",
"scalardb.properties",
"--namespace",
"scalar",
"--table",
"asset",
"--format",
"JSON",
"--threads",
"12"
};
ExportCommand command = new ExportCommand();
CommandLine cmd = new CommandLine(command);
cmd.parseArgs(args);

// Verify the deprecated value was parsed
assertEquals(12, command.threadsDeprecated);

// Apply deprecated options (this is what the command does after validation)
command.applyDeprecatedOptions();

// Verify the value was applied to maxThreads
assertEquals(12, command.maxThreads);
}

@Test
void call_withMaxThreadsSpecified_shouldUseSpecifiedValue() {
// Simulate command line parsing with --max-threads
String[] args = {
"--config",
"scalardb.properties",
"--namespace",
"scalar",
"--table",
"asset",
"--format",
"JSON",
"--max-threads",
"8"
};
ExportCommand command = new ExportCommand();
CommandLine cmd = new CommandLine(command);
cmd.parseArgs(args);

// Verify the value was parsed
assertEquals(8, command.maxThreads);
}

@Test
void call_withoutMaxThreads_shouldDefaultToAvailableProcessors() {
// Simulate command line parsing without --max-threads
String[] args = {
"--config",
"scalardb.properties",
"--namespace",
"scalar",
"--table",
"asset",
"--format",
"JSON"
};
ExportCommand command = new ExportCommand();
CommandLine cmd = new CommandLine(command);
cmd.parseArgs(args);

// Verify maxThreads is null before validation
assertEquals(null, command.maxThreads);

// Simulate what happens in call() after validation
command.spec = cmd.getCommandSpec();
command.applyDeprecatedOptions();
if (command.maxThreads == null) {
command.maxThreads = Runtime.getRuntime().availableProcessors();
}

// Verify it was set to available processors
assertEquals(Runtime.getRuntime().availableProcessors(), command.maxThreads);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -229,4 +229,54 @@ void call_withEnableLogSuccessShortForm_shouldSetToTrueWithoutValue() throws Exc
// Verify the short form flag was parsed correctly without requiring a value
assertTrue(command.enableLogSuccessRecords);
}

@Test
void call_withMaxThreadsSpecified_shouldUseSpecifiedValue() {
// Simulate command line parsing with --max-threads
String[] args = {
"--config",
"scalardb.properties",
"--file",
"import.json",
"--namespace",
"scalar",
"--table",
"asset",
"--max-threads",
"8"
};
ImportCommand command = new ImportCommand();
CommandLine cmd = new CommandLine(command);
cmd.parseArgs(args);

// Verify the value was parsed
assertEquals(8, command.maxThreads);
}

@Test
void call_withoutMaxThreads_shouldDefaultToAvailableProcessors() {
// Simulate command line parsing without --max-threads
String[] args = {
"--config", "scalardb.properties",
"--file", "import.json",
"--namespace", "scalar",
"--table", "asset"
};
ImportCommand command = new ImportCommand();
CommandLine cmd = new CommandLine(command);
cmd.parseArgs(args);

// Verify maxThreads is null before validation
assertEquals(null, command.maxThreads);

// Simulate what happens in call() after validation
command.spec = cmd.getCommandSpec();
command.applyDeprecatedOptions();
if (command.maxThreads == null) {
command.maxThreads = Runtime.getRuntime().availableProcessors();
}

// Verify it was set to available processors
assertEquals(Runtime.getRuntime().availableProcessors(), command.maxThreads);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -77,11 +77,8 @@ public ExportReport startExport(
handleTransactionMetadata(exportOptions, tableMetadata);
processHeader(exportOptions, tableMetadata, writer);

int maxThreadCount =
exportOptions.getMaxThreadCount() == 0
? Runtime.getRuntime().availableProcessors()
: exportOptions.getMaxThreadCount();
ExecutorService executorService = Executors.newFixedThreadPool(maxThreadCount);
ExecutorService executorService =
Executors.newFixedThreadPool(exportOptions.getMaxThreadCount());

BufferedWriter bufferedWriter = new BufferedWriter(writer);
boolean isJson = exportOptions.getOutputFileFormat() == FileFormat.JSON;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,10 @@ public class ExportOptions {
private final FileFormat outputFileFormat;
private final ScanRange scanRange;
private final int limit;
private final int maxThreadCount;
private final boolean prettyPrintJson;

@Builder.Default private final int dataChunkSize = 200;
@Builder.Default private final int maxThreadCount = Runtime.getRuntime().availableProcessors();
@Builder.Default private final String delimiter = ";";
@Builder.Default private final boolean excludeHeaderRow = false;
@Builder.Default private final boolean includeTransactionMetadata = false;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ public class ImportOptions {
private final ControlFile controlFile;
private final String namespace;
private final String tableName;
private final int maxThreads;
@Builder.Default private final int maxThreads = Runtime.getRuntime().availableProcessors();
private final String customHeaderRow;
private final int dataChunkQueueSize;
}
Original file line number Diff line number Diff line change
Expand Up @@ -130,4 +130,32 @@ void startExport_givenPartitionKey_shouldGenerateOutputFile()
Assertions.assertTrue(file.exists());
Assertions.assertTrue(file.delete());
}

@Test
void exportOptions_withoutMaxThreadCount_shouldUseDefaultAvailableProcessors() {
// Create ExportOptions without explicitly setting maxThreadCount
ExportOptions exportOptions =
ExportOptions.builder("namespace", "table", null, FileFormat.JSON)
.sortOrders(Collections.emptyList())
.scanRange(new ScanRange(null, null, false, false))
.build();

// Verify the default was applied
Assertions.assertEquals(
Runtime.getRuntime().availableProcessors(), exportOptions.getMaxThreadCount());
}

@Test
void exportOptions_withExplicitMaxThreadCount_shouldUseProvidedValue() {
// Create ExportOptions with explicit maxThreadCount
ExportOptions exportOptions =
ExportOptions.builder("namespace", "table", null, FileFormat.JSON)
.sortOrders(Collections.emptyList())
.scanRange(new ScanRange(null, null, false, false))
.maxThreadCount(8)
.build();

// Verify the explicit value was used
Assertions.assertEquals(8, exportOptions.getMaxThreadCount());
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
package com.scalar.db.dataloader.core.dataimport;

import static org.junit.jupiter.api.Assertions.assertEquals;

import org.junit.jupiter.api.Test;

/** Unit tests for ImportOptions builder and default values. */
public class ImportOptionsTest {

@Test
void importOptions_withoutMaxThreads_shouldUseDefaultAvailableProcessors() {
// Create ImportOptions without explicitly setting maxThreads
ImportOptions importOptions =
ImportOptions.builder()
.namespace("test_namespace")
.tableName("test_table")
.dataChunkSize(100)
.transactionBatchSize(10)
.dataChunkQueueSize(64)
.build();

// Verify the default was applied
assertEquals(Runtime.getRuntime().availableProcessors(), importOptions.getMaxThreads());
}

@Test
void importOptions_withExplicitMaxThreads_shouldUseProvidedValue() {
// Create ImportOptions with explicit maxThreads
ImportOptions importOptions =
ImportOptions.builder()
.namespace("test_namespace")
.tableName("test_table")
.dataChunkSize(100)
.transactionBatchSize(10)
.maxThreads(8)
.dataChunkQueueSize(64)
.build();

// Verify the explicit value was used
assertEquals(8, importOptions.getMaxThreads());
}
}