Skip to content
Open
Show file tree
Hide file tree
Changes from 11 commits
Commits
Show all changes
20 commits
Select commit Hold shift + click to select a range
5198b37
Extend query config with query.masked config with Type.PASSWORD for s…
KGaneshDatta Nov 12, 2025
bb93366
Fix checkstyle errors and added tests
KGaneshDatta Nov 13, 2025
330e9fe
Making query masked config internal and fixed validation
KGaneshDatta Nov 14, 2025
b8d712a
Fix UTs with Validation errors
KGaneshDatta Nov 14, 2025
3a22709
Update the documentation of new config
KGaneshDatta Nov 14, 2025
9330777
Update getQuery() method to return optional
KGaneshDatta Nov 21, 2025
48350ca
Minor checkstyle fixes
KGaneshDatta Nov 21, 2025
a202be6
Fixed Validation with query mode
KGaneshDatta Nov 25, 2025
0081562
Added functionality to fail tasks when non-retriable exceptions
KGaneshDatta Nov 26, 2025
7eb279a
Redact the sensitive values from logs and exceptions and add validati…
KGaneshDatta Nov 28, 2025
34a0807
Remove RetryUtils and add condition to redact values only when query.…
KGaneshDatta Nov 28, 2025
d1dfc62
Minor changes in comments and code
KGaneshDatta Dec 1, 2025
226373d
Removed config object in Tablequerier and masked only query value in …
KGaneshDatta Dec 1, 2025
ad7df13
Refactored code and added few tests for coverage
KGaneshDatta Dec 2, 2025
aed3f90
Fix checkstyle errors
KGaneshDatta Dec 2, 2025
c0922d3
Extended redactSensitiveData logging util and Minor code changes
KGaneshDatta Dec 2, 2025
a74fd74
Minor nits
KGaneshDatta Dec 2, 2025
6bc4195
Fix tests
KGaneshDatta Dec 2, 2025
9c654e9
Remove trim.sensitive.log config in source connector config
KGaneshDatta Dec 3, 2025
552d19d
Renamed sensitiveLog methodName
KGaneshDatta Dec 3, 2025
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,11 @@
<groupId>com.google.re2j</groupId>
<artifactId>re2j</artifactId>
<version>1.7</version>
</dependency>
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-lang3</artifactId>
<version>3.12.0</version>
</dependency>
<!-- JDBC drivers, only included in runtime so they get packaged -->
<dependency>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -117,8 +117,7 @@ public void start(Map<String, String> properties) throws ConnectException {
List<String> excludeList = config.tableExcludeListRegexes();
Set<String> excludeListSet = excludeList.isEmpty() ? null : new HashSet<>(excludeList);

String query = config.getString(JdbcSourceConnectorConfig.QUERY_CONFIG);
if (!query.isEmpty()) {
if (config.getQuery().isPresent()) {
if (whitelistSet != null || blacklistSet != null
|| includeListSet != null || excludeListSet != null) {
log.error(
Expand All @@ -145,7 +144,7 @@ public void start(Map<String, String> properties) throws ConnectException {
excludeListSet, // New
Time.SYSTEM
);
if (query.isEmpty()) {
if (!config.getQuery().isPresent()) {
tableMonitorThread.start();
log.info("Starting Table Monitor Thread");
}
Expand All @@ -170,9 +169,8 @@ public Config validate(Map<String, String> connectorConfigs) {
@Override
public List<Map<String, String>> taskConfigs(int maxTasks) {
log.info("Starting with the task Configuration method.");
String query = config.getString(JdbcSourceConnectorConfig.QUERY_CONFIG);
List<Map<String, String>> taskConfigs;
if (!query.isEmpty()) {
if (config.getQuery().isPresent()) {
log.info("Custom query provided, generating task configuration for the query");
Map<String, String> taskProps = new HashMap<>(configProperties);
taskProps.put(JdbcSourceTaskConfig.TABLES_CONFIG, "");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,13 +40,14 @@ public class BulkTableQuerier extends TableQuerier {
private static final Logger log = LoggerFactory.getLogger(BulkTableQuerier.class);

public BulkTableQuerier(
JdbcSourceTaskConfig config,
DatabaseDialect dialect,
QueryMode mode,
String name,
String topicPrefix,
String suffix
) {
super(dialect, mode, name, topicPrefix, suffix);
super(config, dialect, mode, name, topicPrefix, suffix);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,13 +20,14 @@
import java.time.Duration;
import java.time.ZoneId;
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.HashMap;
import java.util.Map;
import java.util.LinkedList;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Set;
import java.util.Optional;
import java.util.Locale;
import java.util.concurrent.atomic.AtomicReference;

import com.microsoft.sqlserver.jdbc.SQLServerConnection;
Expand Down Expand Up @@ -54,6 +55,7 @@
import org.apache.kafka.common.config.ConfigDef.Validator;
import org.apache.kafka.common.config.ConfigDef.Width;
import org.apache.kafka.common.config.ConfigException;
import org.apache.kafka.common.config.types.Password;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.errors.ConnectException;
Expand Down Expand Up @@ -336,6 +338,12 @@ public class JdbcSourceConnectorConfig extends AbstractConfig {
public static final String QUERY_DEFAULT = "";
private static final String QUERY_DISPLAY = "Query";

public static final String QUERY_MASKED_CONFIG = "query.masked";
private static final String QUERY_MASKED_DOC =
"Same as 'query' configuration but the query string is masked"
+ "Use this config to prevent sensitive information from being logged.";
private static final String QUERY_MASKED_DISPLAY = "Query (Masked)";

public static final String TOPIC_PREFIX_CONFIG = "topic.prefix";
private static final String TOPIC_PREFIX_DOC =
"Prefix to prepend to table names to generate the name of the Kafka topic to publish data "
Expand Down Expand Up @@ -400,6 +408,9 @@ public class JdbcSourceConnectorConfig extends AbstractConfig {
"Number of times to retry SQL exceptions encountered when executing queries.";
public static final String QUERY_RETRIES_DISPLAY = "Query Retry Attempts";

public static final String TRIM_SENSITIVE_LOG_ENABLED = "trim.sensitive.log";
private static final String TRIM_SENSITIVE_LOG_ENABLED_DEFAULT = "false";

/**
* The properties that begin with this prefix will be used to configure a class, specified by
* {@code jdbc.credentials.provider.class} if it implements {@link Configurable}.
Expand All @@ -420,6 +431,20 @@ public class JdbcSourceConnectorConfig extends AbstractConfig {
private static final EnumRecommender QUOTE_METHOD_RECOMMENDER =
EnumRecommender.in(QuoteMethod.values());

private static final ConfigDef.Recommender HIDDEN_RECOMMENDER =
new ConfigDef.Recommender() {
@Override
public java.util.List<Object> validValues(
String name, Map<String, Object> config) {
return java.util.Collections.emptyList();
}

@Override
public boolean visible(String name, Map<String, Object> config) {
return false;
}
};

public static final String DATABASE_GROUP = "Database";
public static final String MODE_GROUP = "Mode";
public static final String CONNECTOR_GROUP = "Connector";
Expand Down Expand Up @@ -825,6 +850,13 @@ private static void addSchemaAndDialectOptions(ConfigDef config, int orderInGrou

private static final void addModeOptions(ConfigDef config) {
int orderInGroup = 0;
orderInGroup = defineModeConfig(config, orderInGroup);
orderInGroup = defineIncrementTimestampConfigs(config, orderInGroup);
orderInGroup = defineQueryAndQuoteConfigs(config, orderInGroup);
defineTransactionAndRetryConfigs(config, orderInGroup);
}

private static int defineModeConfig(ConfigDef config, int orderInGroup) {
config.define(
MODE_CONFIG,
Type.STRING,
Expand All @@ -849,7 +881,12 @@ private static final void addModeOptions(ConfigDef config) {
TIMESTAMP_COLUMN_MAPPING_CONFIG,
VALIDATE_NON_NULL_CONFIG
)
).define(
);
return orderInGroup;
}

private static int defineIncrementTimestampConfigs(ConfigDef config, int orderInGroup) {
config.define(
INCREMENTING_COLUMN_NAME_CONFIG,
Type.STRING,
INCREMENTING_COLUMN_NAME_DEFAULT,
Expand Down Expand Up @@ -917,7 +954,12 @@ private static final void addModeOptions(ConfigDef config) {
Width.SHORT,
VALIDATE_NON_NULL_DISPLAY,
MODE_DEPENDENTS_RECOMMENDER
).define(
);
return orderInGroup;
}

private static int defineQueryAndQuoteConfigs(ConfigDef config, int orderInGroup) {
config.define(
QUERY_CONFIG,
Type.STRING,
QUERY_DEFAULT,
Expand All @@ -927,6 +969,17 @@ private static final void addModeOptions(ConfigDef config) {
++orderInGroup,
Width.SHORT,
QUERY_DISPLAY
).define(
QUERY_MASKED_CONFIG,
Type.PASSWORD,
QUERY_DEFAULT,
Importance.MEDIUM,
QUERY_MASKED_DOC,
MODE_GROUP,
++orderInGroup,
Width.SHORT,
QUERY_MASKED_DISPLAY,
HIDDEN_RECOMMENDER
).define(
QUOTE_SQL_IDENTIFIERS_CONFIG,
Type.STRING,
Expand All @@ -948,7 +1001,12 @@ private static final void addModeOptions(ConfigDef config) {
++orderInGroup,
Width.MEDIUM,
QUERY_SUFFIX_DISPLAY
).define(
);
return orderInGroup;
}

private static void defineTransactionAndRetryConfigs(ConfigDef config, int orderInGroup) {
config.define(
TRANSACTION_ISOLATION_MODE_CONFIG,
Type.STRING,
TRANSACTION_ISOLATION_MODE_DEFAULT,
Expand Down Expand Up @@ -1029,6 +1087,11 @@ private static final void addConnectorOptions(ConfigDef config) {
Type.LONG,
TABLE_MONITORING_STARTUP_POLLING_LIMIT_MS_DEFAULT,
Importance.LOW
).defineInternal(
TRIM_SENSITIVE_LOG_ENABLED,
ConfigDef.Type.BOOLEAN,
TRIM_SENSITIVE_LOG_ENABLED_DEFAULT,
ConfigDef.Importance.LOW
).define(
TABLE_POLL_INTERVAL_MS_CONFIG,
Type.LONG,
Expand Down Expand Up @@ -1432,6 +1495,33 @@ public List<String> incrementingColMappingRegexes() {
.collect(java.util.stream.Collectors.toList());
}

/**
* Get the query string from either query or query.masked config.
* Prioritizes query.masked over query if both are set (though validation should prevent this).
*
* @return Optional containing the query string if present, empty Optional otherwise.
*/
public Optional<String> getQuery() {
Password maskedQuery = getPassword(QUERY_MASKED_CONFIG);
if (maskedQuery != null && maskedQuery.value() != null && !maskedQuery.value().isEmpty()) {
return Optional.of(maskedQuery.value());
}

String query = getString(QUERY_CONFIG);
if (query != null && !query.isEmpty()) {
return Optional.of(query);
}

return Optional.empty();
}

public boolean isQueryMasked() {
Password maskedQuery = getPassword(QUERY_MASKED_CONFIG);
return maskedQuery != null
&& maskedQuery.value() != null
&& !maskedQuery.value().isEmpty();
}

public boolean modeUsesTimestampColumn() {
String mode = getString(MODE_CONFIG);
return Arrays.asList(MODE_TIMESTAMP, MODE_TIMESTAMP_INCREMENTING).contains(mode);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,14 @@

package io.confluent.connect.jdbc.source;

import io.confluent.connect.jdbc.util.LogUtil;
import io.confluent.connect.jdbc.util.TableId;
import io.confluent.connect.jdbc.util.TableCollectionUtils;
import io.confluent.connect.jdbc.util.ColumnDefinition;
import io.confluent.connect.jdbc.util.ColumnId;
import io.confluent.connect.jdbc.util.CachedConnectionProvider;
import io.confluent.connect.jdbc.util.RecordQueue;
import io.confluent.connect.jdbc.util.Version;
import org.apache.kafka.common.config.ConfigException;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.connect.errors.ConnectException;
Expand Down Expand Up @@ -43,13 +51,6 @@

import io.confluent.connect.jdbc.dialect.DatabaseDialect;
import io.confluent.connect.jdbc.dialect.DatabaseDialects;
import io.confluent.connect.jdbc.util.CachedConnectionProvider;
import io.confluent.connect.jdbc.util.ColumnDefinition;
import io.confluent.connect.jdbc.util.ColumnId;
import io.confluent.connect.jdbc.util.RecordQueue;
import io.confluent.connect.jdbc.util.TableCollectionUtils;
import io.confluent.connect.jdbc.util.TableId;
import io.confluent.connect.jdbc.util.Version;
import io.confluent.connect.jdbc.source.JdbcSourceConnectorConfig.TransactionIsolationMode;

/**
Expand Down Expand Up @@ -95,10 +96,9 @@ public void start(Map<String, String> properties) {

List<String> tables = config.getList(JdbcSourceTaskConfig.TABLES_CONFIG);
Boolean tablesFetched = config.getBoolean(JdbcSourceTaskConfig.TABLES_FETCHED);
String query = config.getString(JdbcSourceTaskConfig.QUERY_CONFIG);
List<String> tableType = config.getList(JdbcSourceConnectorConfig.TABLE_TYPE_CONFIG);

if ((tables.isEmpty() && query.isEmpty())) {
if ((tables.isEmpty() && !config.getQuery().isPresent())) {
// We are still waiting for the tables call to complete.
// Start task but do nothing.
if (!tablesFetched) {
Expand All @@ -115,7 +115,7 @@ public void start(Map<String, String> properties) {
+ " table name.");
}

if ((!tables.isEmpty() && !query.isEmpty())) {
if ((!tables.isEmpty() && config.getQuery().isPresent())) {
throw new ConfigException("Invalid configuration: a JdbcSourceTask"
+ " cannot have both a table and a query assigned to it");
}
Expand Down Expand Up @@ -147,10 +147,10 @@ public void start(Map<String, String> properties) {
)
)
);
TableQuerier.QueryMode queryMode = !query.isEmpty() ? TableQuerier.QueryMode.QUERY :
TableQuerier.QueryMode.TABLE;
TableQuerier.QueryMode queryMode =
config.getQuery().isPresent() ? TableQuerier.QueryMode.QUERY : TableQuerier.QueryMode.TABLE;
final List<String> tablesOrQuery = queryMode == TableQuerier.QueryMode.QUERY
? Collections.singletonList(query) : tables;
? Collections.singletonList(config.getQuery().get()) : tables;

String mode = config.getString(JdbcSourceTaskConfig.MODE_CONFIG);
//used only in table mode
Expand Down Expand Up @@ -288,6 +288,7 @@ mode, getIncrementingColumn(tables.get(0)),
if (mode.equals(JdbcSourceTaskConfig.MODE_BULK)) {
tableQueue.add(
new BulkTableQuerier(
config,
dialect,
queryMode,
tableOrQuery,
Expand All @@ -298,6 +299,7 @@ mode, getIncrementingColumn(tables.get(0)),
} else if (mode.equals(JdbcSourceTaskConfig.MODE_INCREMENTING)) {
tableQueue.add(
new TimestampIncrementingTableQuerier(
config,
dialect,
queryMode,
tableOrQuery,
Expand All @@ -314,6 +316,7 @@ mode, getIncrementingColumn(tables.get(0)),
} else if (mode.equals(JdbcSourceTaskConfig.MODE_TIMESTAMP)) {
tableQueue.add(
new TimestampTableQuerier(
config,
dialect,
queryMode,
tableOrQuery,
Expand All @@ -329,6 +332,7 @@ mode, getIncrementingColumn(tables.get(0)),
} else if (mode.endsWith(JdbcSourceTaskConfig.MODE_TIMESTAMP_INCREMENTING)) {
tableQueue.add(
new TimestampIncrementingTableQuerier(
config,
dialect,
queryMode,
tableOrQuery,
Expand Down Expand Up @@ -459,6 +463,9 @@ protected Map<String, Object> computeInitialOffset(
String tableOrQuery,
Map<String, Object> partitionOffset,
ZoneId zoneId) {
if (config.isQueryMasked()) {
tableOrQuery = LogUtil.sensitiveLog(true, tableOrQuery);
}
if (!(partitionOffset == null)) {
log.info("Partition offset for '{}' is not null. Using existing offset.", tableOrQuery);
return partitionOffset;
Expand Down
Loading