diff --git a/src/main/java/io/confluent/connect/jdbc/JdbcSourceConnector.java b/src/main/java/io/confluent/connect/jdbc/JdbcSourceConnector.java index 2d85039c2..9cfa4a792 100644 --- a/src/main/java/io/confluent/connect/jdbc/JdbcSourceConnector.java +++ b/src/main/java/io/confluent/connect/jdbc/JdbcSourceConnector.java @@ -117,8 +117,7 @@ public void start(Map properties) throws ConnectException { List excludeList = config.tableExcludeListRegexes(); Set excludeListSet = excludeList.isEmpty() ? null : new HashSet<>(excludeList); - String query = config.getString(JdbcSourceConnectorConfig.QUERY_CONFIG); - if (!query.isEmpty()) { + if (config.getQuery().isPresent()) { if (whitelistSet != null || blacklistSet != null || includeListSet != null || excludeListSet != null) { log.error( @@ -145,7 +144,7 @@ public void start(Map properties) throws ConnectException { excludeListSet, // New Time.SYSTEM ); - if (query.isEmpty()) { + if (!config.getQuery().isPresent()) { tableMonitorThread.start(); log.info("Starting Table Monitor Thread"); } @@ -170,9 +169,8 @@ public Config validate(Map connectorConfigs) { @Override public List> taskConfigs(int maxTasks) { log.info("Starting with the task Configuration method."); - String query = config.getString(JdbcSourceConnectorConfig.QUERY_CONFIG); List> taskConfigs; - if (!query.isEmpty()) { + if (config.getQuery().isPresent()) { log.info("Custom query provided, generating task configuration for the query"); Map taskProps = new HashMap<>(configProperties); taskProps.put(JdbcSourceTaskConfig.TABLES_CONFIG, ""); diff --git a/src/main/java/io/confluent/connect/jdbc/source/BulkTableQuerier.java b/src/main/java/io/confluent/connect/jdbc/source/BulkTableQuerier.java index 2f2d0613d..f46dd55ae 100644 --- a/src/main/java/io/confluent/connect/jdbc/source/BulkTableQuerier.java +++ b/src/main/java/io/confluent/connect/jdbc/source/BulkTableQuerier.java @@ -15,6 +15,7 @@ package io.confluent.connect.jdbc.source; +import io.confluent.connect.jdbc.util.LogUtil; import org.apache.kafka.connect.data.Struct; import org.apache.kafka.connect.errors.ConnectException; import org.apache.kafka.connect.errors.DataException; @@ -44,9 +45,10 @@ public BulkTableQuerier( QueryMode mode, String name, String topicPrefix, - String suffix + String suffix, + Boolean isQueryMasked ) { - super(dialect, mode, name, topicPrefix, suffix); + super(dialect, mode, name, topicPrefix, suffix, isQueryMasked); } @Override @@ -70,7 +72,8 @@ protected void createPreparedStatement(Connection db) throws SQLException { String queryStr = builder.toString(); recordQuery(queryStr); - log.trace("{} prepared SQL query: {}", this, queryStr); + log.trace( + "{} prepared SQL query: {}", this, LogUtil.maybeRedact(shouldRedactSensitiveLogs, query)); stmt = dialect.createPreparedStatement(db, queryStr); } @@ -116,8 +119,17 @@ public SourceRecord extractRecord() throws SQLException { @Override public String toString() { - return "BulkTableQuerier{" + "table='" + tableId + '\'' + ", query='" + query + '\'' - + ", topicPrefix='" + topicPrefix + '\'' + '}'; + return "BulkTableQuerier{" + + "table='" + + tableId + + '\'' + + ", query='" + + LogUtil.maybeRedact(shouldRedactSensitiveLogs, query) + + '\'' + + ", topicPrefix='" + + topicPrefix + + '\'' + + '}'; } } diff --git a/src/main/java/io/confluent/connect/jdbc/source/JdbcSourceConnectorConfig.java b/src/main/java/io/confluent/connect/jdbc/source/JdbcSourceConnectorConfig.java index 2fa613205..651aef7d3 100644 --- a/src/main/java/io/confluent/connect/jdbc/source/JdbcSourceConnectorConfig.java +++ b/src/main/java/io/confluent/connect/jdbc/source/JdbcSourceConnectorConfig.java @@ -27,6 +27,7 @@ import java.util.Locale; import java.util.Map; import java.util.Set; +import java.util.Optional; import java.util.concurrent.atomic.AtomicReference; import com.microsoft.sqlserver.jdbc.SQLServerConnection; @@ -54,6 +55,7 @@ import org.apache.kafka.common.config.ConfigDef.Validator; import org.apache.kafka.common.config.ConfigDef.Width; import org.apache.kafka.common.config.ConfigException; +import org.apache.kafka.common.config.types.Password; import org.apache.kafka.common.utils.Time; import org.apache.kafka.connect.data.Schema; import org.apache.kafka.connect.errors.ConnectException; @@ -336,6 +338,12 @@ public class JdbcSourceConnectorConfig extends AbstractConfig { public static final String QUERY_DEFAULT = ""; private static final String QUERY_DISPLAY = "Query"; + public static final String QUERY_MASKED_CONFIG = "query.masked"; + private static final String QUERY_MASKED_DOC = + "Same as 'query' configuration but the query string is masked" + + "Use this config to prevent sensitive information from being logged."; + private static final String QUERY_MASKED_DISPLAY = "Query (Masked)"; + public static final String TOPIC_PREFIX_CONFIG = "topic.prefix"; private static final String TOPIC_PREFIX_DOC = "Prefix to prepend to table names to generate the name of the Kafka topic to publish data " @@ -420,6 +428,25 @@ public class JdbcSourceConnectorConfig extends AbstractConfig { private static final EnumRecommender QUOTE_METHOD_RECOMMENDER = EnumRecommender.in(QuoteMethod.values()); + /** + * A recommender that hides configuration parameters from being displayed in config list + * This is used for sensitive or internal configurations that should not be exposed + * to users through standard configuration interfaces. + */ + private static final ConfigDef.Recommender HIDDEN_RECOMMENDER = + new ConfigDef.Recommender() { + @Override + public java.util.List validValues( + String name, Map config) { + return java.util.Collections.emptyList(); + } + + @Override + public boolean visible(String name, Map config) { + return false; + } + }; + public static final String DATABASE_GROUP = "Database"; public static final String MODE_GROUP = "Mode"; public static final String CONNECTOR_GROUP = "Connector"; @@ -825,6 +852,13 @@ private static void addSchemaAndDialectOptions(ConfigDef config, int orderInGrou private static final void addModeOptions(ConfigDef config) { int orderInGroup = 0; + orderInGroup = defineModeConfig(config, orderInGroup); + orderInGroup = defineIncrementTimestampConfigs(config, orderInGroup); + orderInGroup = defineQueryAndQuoteConfigs(config, orderInGroup); + defineTransactionAndRetryConfigs(config, orderInGroup); + } + + private static int defineModeConfig(ConfigDef config, int orderInGroup) { config.define( MODE_CONFIG, Type.STRING, @@ -849,7 +883,12 @@ private static final void addModeOptions(ConfigDef config) { TIMESTAMP_COLUMN_MAPPING_CONFIG, VALIDATE_NON_NULL_CONFIG ) - ).define( + ); + return orderInGroup; + } + + private static int defineIncrementTimestampConfigs(ConfigDef config, int orderInGroup) { + config.define( INCREMENTING_COLUMN_NAME_CONFIG, Type.STRING, INCREMENTING_COLUMN_NAME_DEFAULT, @@ -917,7 +956,12 @@ private static final void addModeOptions(ConfigDef config) { Width.SHORT, VALIDATE_NON_NULL_DISPLAY, MODE_DEPENDENTS_RECOMMENDER - ).define( + ); + return orderInGroup; + } + + private static int defineQueryAndQuoteConfigs(ConfigDef config, int orderInGroup) { + config.define( QUERY_CONFIG, Type.STRING, QUERY_DEFAULT, @@ -927,6 +971,17 @@ private static final void addModeOptions(ConfigDef config) { ++orderInGroup, Width.SHORT, QUERY_DISPLAY + ).define( + QUERY_MASKED_CONFIG, + Type.PASSWORD, + QUERY_DEFAULT, + Importance.MEDIUM, + QUERY_MASKED_DOC, + MODE_GROUP, + ++orderInGroup, + Width.SHORT, + QUERY_MASKED_DISPLAY, + HIDDEN_RECOMMENDER ).define( QUOTE_SQL_IDENTIFIERS_CONFIG, Type.STRING, @@ -948,7 +1003,12 @@ private static final void addModeOptions(ConfigDef config) { ++orderInGroup, Width.MEDIUM, QUERY_SUFFIX_DISPLAY - ).define( + ); + return orderInGroup; + } + + private static void defineTransactionAndRetryConfigs(ConfigDef config, int orderInGroup) { + config.define( TRANSACTION_ISOLATION_MODE_CONFIG, Type.STRING, TRANSACTION_ISOLATION_MODE_DEFAULT, @@ -1432,6 +1492,32 @@ public List incrementingColMappingRegexes() { .collect(java.util.stream.Collectors.toList()); } + /** + * Get the query string from either query or query.masked config. + * + * @return Optional containing the query string if present, empty Optional otherwise. + */ + public Optional getQuery() { + Password maskedQuery = getPassword(QUERY_MASKED_CONFIG); + if (maskedQuery != null && maskedQuery.value() != null && !maskedQuery.value().isEmpty()) { + return Optional.of(maskedQuery.value()); + } + + String query = getString(QUERY_CONFIG); + if (query != null && !query.isEmpty()) { + return Optional.of(query); + } + + return Optional.empty(); + } + + public boolean isQueryMasked() { + Password maskedQuery = getPassword(QUERY_MASKED_CONFIG); + return maskedQuery != null + && maskedQuery.value() != null + && !maskedQuery.value().isEmpty(); + } + public boolean modeUsesTimestampColumn() { String mode = getString(MODE_CONFIG); return Arrays.asList(MODE_TIMESTAMP, MODE_TIMESTAMP_INCREMENTING).contains(mode); diff --git a/src/main/java/io/confluent/connect/jdbc/source/JdbcSourceTask.java b/src/main/java/io/confluent/connect/jdbc/source/JdbcSourceTask.java index 7508e05ce..64d49b895 100644 --- a/src/main/java/io/confluent/connect/jdbc/source/JdbcSourceTask.java +++ b/src/main/java/io/confluent/connect/jdbc/source/JdbcSourceTask.java @@ -50,6 +50,7 @@ import io.confluent.connect.jdbc.util.TableCollectionUtils; import io.confluent.connect.jdbc.util.TableId; import io.confluent.connect.jdbc.util.Version; +import io.confluent.connect.jdbc.util.LogUtil; import io.confluent.connect.jdbc.source.JdbcSourceConnectorConfig.TransactionIsolationMode; /** @@ -95,10 +96,9 @@ public void start(Map properties) { List tables = config.getList(JdbcSourceTaskConfig.TABLES_CONFIG); Boolean tablesFetched = config.getBoolean(JdbcSourceTaskConfig.TABLES_FETCHED); - String query = config.getString(JdbcSourceTaskConfig.QUERY_CONFIG); List tableType = config.getList(JdbcSourceConnectorConfig.TABLE_TYPE_CONFIG); - if ((tables.isEmpty() && query.isEmpty())) { + if ((tables.isEmpty() && !config.getQuery().isPresent())) { // We are still waiting for the tables call to complete. // Start task but do nothing. if (!tablesFetched) { @@ -115,7 +115,7 @@ public void start(Map properties) { + " table name."); } - if ((!tables.isEmpty() && !query.isEmpty())) { + if ((!tables.isEmpty() && config.getQuery().isPresent())) { throw new ConfigException("Invalid configuration: a JdbcSourceTask" + " cannot have both a table and a query assigned to it"); } @@ -147,10 +147,10 @@ public void start(Map properties) { ) ) ); - TableQuerier.QueryMode queryMode = !query.isEmpty() ? TableQuerier.QueryMode.QUERY : - TableQuerier.QueryMode.TABLE; + TableQuerier.QueryMode queryMode = + config.getQuery().isPresent() ? TableQuerier.QueryMode.QUERY : TableQuerier.QueryMode.TABLE; final List tablesOrQuery = queryMode == TableQuerier.QueryMode.QUERY - ? Collections.singletonList(query) : tables; + ? Collections.singletonList(config.getQuery().get()) : tables; String mode = config.getString(JdbcSourceTaskConfig.MODE_CONFIG); //used only in table mode @@ -225,6 +225,7 @@ public void start(Map properties) { = config.getBoolean(JdbcSourceTaskConfig.VALIDATE_NON_NULL_CONFIG); ZoneId zoneId = config.zoneId(); String suffix = config.getString(JdbcSourceTaskConfig.QUERY_SUFFIX_CONFIG).trim(); + Boolean queryMasked = config.isQueryMasked(); if (queryMode.equals(TableQuerier.QueryMode.TABLE)) { validateColumnsExist( @@ -292,7 +293,8 @@ mode, getIncrementingColumn(tables.get(0)), queryMode, tableOrQuery, topicPrefix, - suffix + suffix, + queryMasked ) ); } else if (mode.equals(JdbcSourceTaskConfig.MODE_INCREMENTING)) { @@ -308,7 +310,8 @@ mode, getIncrementingColumn(tables.get(0)), timestampDelayInterval, zoneId, suffix, - timestampGranularity + timestampGranularity, + queryMasked ) ); } else if (mode.equals(JdbcSourceTaskConfig.MODE_TIMESTAMP)) { @@ -323,7 +326,8 @@ mode, getIncrementingColumn(tables.get(0)), timestampDelayInterval, zoneId, suffix, - timestampGranularity + timestampGranularity, + queryMasked ) ); } else if (mode.endsWith(JdbcSourceTaskConfig.MODE_TIMESTAMP_INCREMENTING)) { @@ -339,7 +343,8 @@ mode, getIncrementingColumn(tables.get(0)), timestampDelayInterval, zoneId, suffix, - timestampGranularity + timestampGranularity, + queryMasked ) ); } @@ -459,6 +464,9 @@ protected Map computeInitialOffset( String tableOrQuery, Map partitionOffset, ZoneId zoneId) { + if (config.isQueryMasked()) { + tableOrQuery = LogUtil.maybeRedact(true, tableOrQuery); + } if (!(partitionOffset == null)) { log.info("Partition offset for '{}' is not null. Using existing offset.", tableOrQuery); return partitionOffset; diff --git a/src/main/java/io/confluent/connect/jdbc/source/TableQuerier.java b/src/main/java/io/confluent/connect/jdbc/source/TableQuerier.java index 9f070c22b..b679b6500 100644 --- a/src/main/java/io/confluent/connect/jdbc/source/TableQuerier.java +++ b/src/main/java/io/confluent/connect/jdbc/source/TableQuerier.java @@ -15,6 +15,7 @@ package io.confluent.connect.jdbc.source; +import io.confluent.connect.jdbc.util.LogUtil; import org.apache.kafka.connect.source.SourceRecord; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -56,6 +57,7 @@ public enum QueryMode { protected ResultSet resultSet; protected SchemaMapping schemaMapping; private String loggedQueryString; + protected final Boolean shouldRedactSensitiveLogs; private int attemptedRetries; @@ -64,7 +66,8 @@ public TableQuerier( QueryMode mode, String nameOrQuery, String topicPrefix, - String suffix + String suffix, + Boolean isQueryMasked ) { this.dialect = dialect; this.mode = mode; @@ -74,6 +77,7 @@ public TableQuerier( this.lastUpdate = 0; this.suffix = suffix; this.attemptedRetries = 0; + this.shouldRedactSensitiveLogs = isQueryMasked; } public long getLastUpdate() { @@ -179,7 +183,7 @@ protected void addSuffixIfPresent(ExpressionBuilder builder) { protected void recordQuery(String query) { if (query != null && !query.equals(loggedQueryString)) { // For usability, log the statement at INFO level only when it changes - log.info("Begin using SQL query: {}", query); + log.info("Begin using SQL query: {}", LogUtil.maybeRedact(shouldRedactSensitiveLogs, query)); loggedQueryString = query; } } diff --git a/src/main/java/io/confluent/connect/jdbc/source/TableQuerierProcessor.java b/src/main/java/io/confluent/connect/jdbc/source/TableQuerierProcessor.java index f25b5c482..2d9462113 100644 --- a/src/main/java/io/confluent/connect/jdbc/source/TableQuerierProcessor.java +++ b/src/main/java/io/confluent/connect/jdbc/source/TableQuerierProcessor.java @@ -6,6 +6,7 @@ import io.confluent.connect.jdbc.dialect.DatabaseDialect; import io.confluent.connect.jdbc.util.CachedConnectionProvider; +import io.confluent.connect.jdbc.util.LogUtil; import io.confluent.connect.jdbc.util.RecordDestination; import org.apache.kafka.common.utils.Time; import org.apache.kafka.connect.errors.ConnectException; @@ -25,6 +26,7 @@ public class TableQuerierProcessor { private final JdbcSourceTaskConfig config; private final Time time; private final PriorityQueue tableQueue; + private final Boolean shouldRedactSensitiveLogs; private CachedConnectionProvider cachedConnectionProvider; private final int maxRetriesPerQuerier; private final Duration timeout = Duration.ofSeconds(90); @@ -41,6 +43,7 @@ public TableQuerierProcessor( this.tableQueue = tableQueue; this.cachedConnectionProvider = cachedConnectionProvider; this.maxRetriesPerQuerier = config.getInt(JdbcSourceConnectorConfig.QUERY_RETRIES_CONFIG); + this.shouldRedactSensitiveLogs = config.isQueryMasked(); } public long process(RecordDestination destination) { @@ -95,8 +98,7 @@ private boolean isReadyToProcess() { // If the call to get tables has not completed we will not do anything. // This is only valid in table mode. Boolean tablesFetched = config.getBoolean(JdbcSourceTaskConfig.TABLES_FETCHED); - String query = config.getString(JdbcSourceTaskConfig.QUERY_CONFIG); - return !query.isEmpty() || tablesFetched; + return config.getQuery().isPresent() || tablesFetched; } private void processQuerier(RecordDestination destination, TableQuerier querier) @@ -131,28 +133,30 @@ private void processQuerier(RecordDestination destination, TableQu private void handleNonTransientException(RecordDestination destination, TableQuerier querier, SQLNonTransientException sqle) { + SQLException redactedException = shouldRedactSensitiveLogs + ? LogUtil.redactSensitiveData(sqle) : sqle; log.error("Non-transient SQL exception while running query for table: {}", - querier, sqle); + querier, redactedException); resetAndRequeueHead(querier, true); // This task has failed, report failure to destination - destination.failWith(new ConnectException(sqle)); + destination.failWith(new ConnectException(redactedException)); } - private void handleSqlException(RecordDestination destination, + private void handleSqlException(RecordDestination destination, TableQuerier querier, SQLException sqle) { + SQLException redactedException = shouldRedactSensitiveLogs + ? LogUtil.redactSensitiveData(sqle) : sqle; log.error( - "SQL exception while running query for table: {}." - + " Attempting retry {} of {} attempts.", + "SQL exception while running query for table: {}." + " Attempting retry {} of {} attempts.", querier, querier.getAttemptedRetryCount() + 1, maxRetriesPerQuerier, - sqle - ); + redactedException); resetAndRequeueHead(querier, false); - if (maxRetriesPerQuerier > 0 - && querier.getAttemptedRetryCount() >= maxRetriesPerQuerier) { - destination.failWith(new ConnectException("Failed to Query table after retries", sqle)); + if (maxRetriesPerQuerier > 0 && querier.getAttemptedRetryCount() >= maxRetriesPerQuerier) { + destination.failWith( + new ConnectException("Failed to query table after retries", redactedException)); return; } querier.incrementRetryCount(); diff --git a/src/main/java/io/confluent/connect/jdbc/source/TimestampIncrementingTableQuerier.java b/src/main/java/io/confluent/connect/jdbc/source/TimestampIncrementingTableQuerier.java index f90dbf44b..c727d88c6 100644 --- a/src/main/java/io/confluent/connect/jdbc/source/TimestampIncrementingTableQuerier.java +++ b/src/main/java/io/confluent/connect/jdbc/source/TimestampIncrementingTableQuerier.java @@ -42,6 +42,7 @@ import io.confluent.connect.jdbc.util.ColumnId; import io.confluent.connect.jdbc.util.DateTimeUtils; import io.confluent.connect.jdbc.util.ExpressionBuilder; +import io.confluent.connect.jdbc.util.LogUtil; /** *

@@ -77,14 +78,20 @@ public class TimestampIncrementingTableQuerier extends TableQuerier implements C private final long timestampDelay; private final ZoneId zoneId; - public TimestampIncrementingTableQuerier(DatabaseDialect dialect, QueryMode mode, String name, - String topicPrefix, - List timestampColumnNames, - String incrementingColumnName, - Map offsetMap, Long timestampDelay, - ZoneId zoneId, String suffix, - TimestampGranularity timestampGranularity) { - super(dialect, mode, name, topicPrefix, suffix); + public TimestampIncrementingTableQuerier( + DatabaseDialect dialect, + QueryMode mode, + String name, + String topicPrefix, + List timestampColumnNames, + String incrementingColumnName, + Map offsetMap, + Long timestampDelay, + ZoneId zoneId, + String suffix, + TimestampGranularity timestampGranularity, + Boolean isQueryMasked) { + super(dialect, mode, name, topicPrefix, suffix, isQueryMasked); this.incrementingColumnName = incrementingColumnName; this.timestampColumnNames = timestampColumnNames != null ? timestampColumnNames : Collections.emptyList(); @@ -270,13 +277,19 @@ public Long lastIncrementedValue() { @Override public String toString() { return "TimestampIncrementingTableQuerier{" - + "table=" + tableId - + ", query='" + query + '\'' - + ", topicPrefix='" + topicPrefix + '\'' - + ", incrementingColumn='" + (incrementingColumnName != null - ? incrementingColumnName - : "") + '\'' - + ", timestampColumns=" + timestampColumnNames - + '}'; + + "table=" + + tableId + + ", query='" + + LogUtil.maybeRedact(shouldRedactSensitiveLogs, query) + + '\'' + + ", topicPrefix='" + + topicPrefix + + '\'' + + ", incrementingColumn='" + + (incrementingColumnName != null ? incrementingColumnName : "") + + '\'' + + ", timestampColumns=" + + timestampColumnNames + + '}'; } } \ No newline at end of file diff --git a/src/main/java/io/confluent/connect/jdbc/source/TimestampTableQuerier.java b/src/main/java/io/confluent/connect/jdbc/source/TimestampTableQuerier.java index 3d7baef61..eb1d143b6 100644 --- a/src/main/java/io/confluent/connect/jdbc/source/TimestampTableQuerier.java +++ b/src/main/java/io/confluent/connect/jdbc/source/TimestampTableQuerier.java @@ -17,6 +17,7 @@ import java.time.ZoneId; +import io.confluent.connect.jdbc.util.LogUtil; import org.apache.kafka.connect.data.Schema; import org.apache.kafka.connect.data.Struct; import org.apache.kafka.connect.errors.ConnectException; @@ -54,6 +55,7 @@ public class TimestampTableQuerier extends TimestampIncrementingTableQuerier { private PendingRecord nextRecord; private Timestamp latestCommittableTimestamp; + @SuppressWarnings("checkstyle:ParameterNumber") public TimestampTableQuerier( DatabaseDialect dialect, QueryMode mode, @@ -64,7 +66,8 @@ public TimestampTableQuerier( Long timestampDelay, ZoneId zoneId, String suffix, - TimestampGranularity timestampGranularity + TimestampGranularity timestampGranularity, + Boolean isQueryMasked ) { super( dialect, @@ -77,7 +80,8 @@ public TimestampTableQuerier( timestampDelay, zoneId, suffix, - timestampGranularity + timestampGranularity, + isQueryMasked ); this.latestCommittableTimestamp = this.offset.getTimestampOffset(); @@ -177,10 +181,16 @@ public void reset(long now, boolean resetOffset) { @Override public String toString() { return "TimestampTableQuerier{" - + "table=" + tableId - + ", query='" + query + '\'' - + ", topicPrefix='" + topicPrefix + '\'' - + ", timestampColumns=" + timestampColumnNames + + "table=" + + tableId + + ", query='" + + LogUtil.maybeRedact(shouldRedactSensitiveLogs, query) + + '\'' + + ", topicPrefix='" + + topicPrefix + + '\'' + + ", timestampColumns=" + + timestampColumnNames + '}'; } diff --git a/src/main/java/io/confluent/connect/jdbc/util/LogUtil.java b/src/main/java/io/confluent/connect/jdbc/util/LogUtil.java index 2bbeeb4a6..acb75f2e6 100644 --- a/src/main/java/io/confluent/connect/jdbc/util/LogUtil.java +++ b/src/main/java/io/confluent/connect/jdbc/util/LogUtil.java @@ -23,6 +23,8 @@ * error information to investigate incidents while at the same time avoid logging sensitive data. */ public class LogUtil { + private static final String REDACTED_VALUE = ""; + public static SQLException trimSensitiveData(SQLException e) { return (SQLException) trimSensitiveData((Throwable)e); } @@ -48,6 +50,40 @@ public static Throwable trimSensitiveData(Throwable t) { e.getUpdateCounts()); } + public static SQLException redactSensitiveData(SQLException e) { + return (SQLException) redactSensitiveData((Throwable) e); + } + + public static Throwable redactSensitiveData(Throwable t) { + if (!(t instanceof SQLException)) { + return t; + } + + if (!(t instanceof BatchUpdateException)) { + // t is a SQLException, but not BatchUpdateException. + SQLException oldSqlException = (SQLException) t; + SQLException newSqlException = + new SQLException( + REDACTED_VALUE, oldSqlException.getSQLState(), oldSqlException.getErrorCode()); + newSqlException.setNextException(redactSensitiveData(oldSqlException.getNextException())); + newSqlException.setStackTrace(oldSqlException.getStackTrace()); + return newSqlException; + } + + // At this point t is BatchUpdateException; redact its message too. + BatchUpdateException oldBatchUpdateException = (BatchUpdateException) t; + BatchUpdateException newBatchUpdateException = + new BatchUpdateException( + REDACTED_VALUE, + oldBatchUpdateException.getSQLState(), + oldBatchUpdateException.getErrorCode(), + oldBatchUpdateException.getUpdateCounts()); + newBatchUpdateException.setNextException( + redactSensitiveData(oldBatchUpdateException.getNextException())); + newBatchUpdateException.setStackTrace(oldBatchUpdateException.getStackTrace()); + return newBatchUpdateException; + } + // This implementation assumes it to be Postgres, see toString() of ServerErrorMessage.java // as well as the constructor of PSQLException.java with "boolean detail" flag in // https://github.com/pgjdbc/pgjdbc/blob/master/pgjdbc/src/main/java/org/postgresql/util/ @@ -79,4 +115,11 @@ private static String getNonSensitiveErrorMessage(String errMsg) { String msg2 = errMsg.substring(errorStartIdx, errorEndIdx); return msg1 + msg2; } + + public static String maybeRedact(boolean shouldRedactSensitiveLogs, String msg) { + if (shouldRedactSensitiveLogs) { + return REDACTED_VALUE; + } + return String.valueOf(msg); + } } diff --git a/src/main/java/io/confluent/connect/jdbc/validation/JdbcSourceConnectorValidation.java b/src/main/java/io/confluent/connect/jdbc/validation/JdbcSourceConnectorValidation.java index b5445287e..f32401430 100644 --- a/src/main/java/io/confluent/connect/jdbc/validation/JdbcSourceConnectorValidation.java +++ b/src/main/java/io/confluent/connect/jdbc/validation/JdbcSourceConnectorValidation.java @@ -29,6 +29,7 @@ import java.util.List; import java.util.Map; import java.util.Set; +import java.util.regex.Pattern; /** * Validation class for JDBC Source Connector configurations. @@ -38,6 +39,8 @@ public class JdbcSourceConnectorValidation { private static final Logger log = LoggerFactory.getLogger(JdbcSourceConnectorValidation.class); + private static final Pattern SELECT_STATEMENT_PATTERN = + Pattern.compile("(?is)^SELECT\\b"); protected JdbcSourceConnectorConfig config; protected Config validationResult; private final Map connectorConfigs; @@ -80,7 +83,8 @@ public Config validate() { } boolean validationResult = validateMultiConfigs() - && validateLegacyNewConfigCompatibility(); + && validateLegacyNewConfigCompatibility() + && validateQueryConfigs(); if (validationResult && isUsingNewConfigs()) { validationResult = validateTableInclusionConfigs() @@ -88,17 +92,14 @@ public Config validate() { } validationResult = validationResult && validatePluginSpecificNeeds(); - if (!validationResult) { log.info("Validation failed"); } else { log.info("Validation succeeded"); } - } catch (Exception e) { log.error("Error during validation", e); } - return this.validationResult; } @@ -159,6 +160,10 @@ private boolean validateLegacyNewConfigCompatibility() { boolean usingLegacyConfigs = isUsingLegacyConfigs(); boolean usingNewConfigs = isUsingNewConfigs(); + if (config.getQuery().isPresent()) { + return true; + } + if (usingLegacyConfigs && usingNewConfigs) { return addConfigErrorsForLegacyAndNewConfigConflict(); } @@ -179,18 +184,16 @@ private boolean isUsingLegacyConfigs() { Set blacklistSet = config.getTableBlacklistSet(); String incrementingColumnName = config.getIncrementingColumnName(); List timestampColumnName = config.getTimestampColumnName(); - boolean hasWhitelist = !whitelistSet.isEmpty(); boolean hasBlacklist = !blacklistSet.isEmpty(); - boolean hasLegacyIncrementing = incrementingColumnName != null + boolean hasLegacyIncrementing = incrementingColumnName != null && !incrementingColumnName.trim().isEmpty(); - boolean hasLegacyTimestamp = timestampColumnName != null - && !timestampColumnName.isEmpty() + boolean hasLegacyTimestamp = timestampColumnName != null + && !timestampColumnName.isEmpty() && !timestampColumnName.get(0).trim().isEmpty(); - return hasWhitelist || hasBlacklist || hasLegacyIncrementing || hasLegacyTimestamp; } - + /** * Check if any new config keys are being used. * New keys: table.include.list, table.exclude.list, incrementing.column.mapping, @@ -201,17 +204,15 @@ private boolean isUsingNewConfigs() { Set excludeListSet = config.getTableExcludeListSet(); List incrementingColumnMapping = config.getIncrementingColumnMapping(); List timestampColumnsMapping = config.getTimestampColumnMapping(); - boolean hasIncludeList = !includeListSet.isEmpty(); boolean hasExcludeList = !excludeListSet.isEmpty(); - boolean hasNewIncrementing = incrementingColumnMapping != null + boolean hasNewIncrementing = incrementingColumnMapping != null && !incrementingColumnMapping.isEmpty(); - boolean hasNewTimestamp = timestampColumnsMapping != null + boolean hasNewTimestamp = timestampColumnsMapping != null && !timestampColumnsMapping.isEmpty(); - return hasIncludeList || hasExcludeList || hasNewIncrementing || hasNewTimestamp; } - + /** * Validate conflict between legacy and new configs. * Only add errors to configs that are actually present and conflicting. @@ -223,52 +224,43 @@ private boolean addConfigErrorsForLegacyAndNewConfigConflict() { + "(table.include.list, table.exclude.list, timestamp.columns.mapping, " + "incrementing.column.mapping). Please choose one approach: either use all legacy " + "configurations or all new configurations."; - // Only add errors to configs that are actually present and non-empty Set whitelistSet = config.getTableWhitelistSet(); if (!whitelistSet.isEmpty()) { addConfigError(JdbcSourceConnectorConfig.TABLE_WHITELIST_CONFIG, msg); } - Set blacklistSet = config.getTableBlacklistSet(); if (!blacklistSet.isEmpty()) { addConfigError(JdbcSourceConnectorConfig.TABLE_BLACKLIST_CONFIG, msg); } - Set includeListSet = config.getTableIncludeListSet(); if (!includeListSet.isEmpty()) { addConfigError(JdbcSourceConnectorConfig.TABLE_INCLUDE_LIST_CONFIG, msg); } - Set excludeListSet = config.getTableExcludeListSet(); if (!excludeListSet.isEmpty()) { addConfigError(JdbcSourceConnectorConfig.TABLE_EXCLUDE_LIST_CONFIG, msg); } - List timestampColumnName = config.getTimestampColumnName(); - if (timestampColumnName != null && !timestampColumnName.isEmpty() + if (timestampColumnName != null && !timestampColumnName.isEmpty() && !timestampColumnName.get(0).trim().isEmpty()) { addConfigError(JdbcSourceConnectorConfig.TIMESTAMP_COLUMN_NAME_CONFIG, msg); } - List timestampColumnsMapping = config.getTimestampColumnMapping(); if (timestampColumnsMapping != null && !timestampColumnsMapping.isEmpty()) { addConfigError(JdbcSourceConnectorConfig.TIMESTAMP_COLUMN_MAPPING_CONFIG, msg); } - String incrementingColumnName = config.getIncrementingColumnName(); if (incrementingColumnName != null && !incrementingColumnName.trim().isEmpty()) { addConfigError(JdbcSourceConnectorConfig.INCREMENTING_COLUMN_NAME_CONFIG, msg); } - List incrementingColumnMapping = config.getIncrementingColumnMapping(); if (incrementingColumnMapping != null && !incrementingColumnMapping.isEmpty()) { addConfigError(JdbcSourceConnectorConfig.INCREMENTING_COLUMN_MAPPING_CONFIG, msg); } - return false; } - + /** * Validate that at least one configuration is provided. */ @@ -278,30 +270,90 @@ private boolean addConfigErrorsForNoConfigProvided() { + JdbcSourceConnectorConfig.TABLE_BLACKLIST_CONFIG + ", " + JdbcSourceConnectorConfig.TABLE_INCLUDE_LIST_CONFIG + ", or " + JdbcSourceConnectorConfig.TABLE_EXCLUDE_LIST_CONFIG + "."; - addConfigError(JdbcSourceConnectorConfig.TABLE_WHITELIST_CONFIG, msg); addConfigError(JdbcSourceConnectorConfig.TABLE_BLACKLIST_CONFIG, msg); addConfigError(JdbcSourceConnectorConfig.TABLE_INCLUDE_LIST_CONFIG, msg); addConfigError(JdbcSourceConnectorConfig.TABLE_EXCLUDE_LIST_CONFIG, msg); return false; } - + /** * Validate new config requirements (when using new configs only). */ private boolean validateTableInclusionConfigs() { Set includeListSet = config.getTableIncludeListSet(); Set excludeListSet = config.getTableExcludeListSet(); - // Validate that exclude list requires include list if (!excludeListSet.isEmpty() && includeListSet.isEmpty()) { - String msg = JdbcSourceConnectorConfig.TABLE_EXCLUDE_LIST_CONFIG + String msg = JdbcSourceConnectorConfig.TABLE_EXCLUDE_LIST_CONFIG + " cannot be used without " + JdbcSourceConnectorConfig.TABLE_INCLUDE_LIST_CONFIG + ". Exclude list only applies to tables that match the include list."; addConfigError(JdbcSourceConnectorConfig.TABLE_EXCLUDE_LIST_CONFIG, msg); return false; } - + return true; + } + + /** + * Validate that only one of query or query.masked configs is set at a time. + * Both configs should not be set simultaneously to avoid ambiguity. + */ + private boolean validateQueryConfigs() { + String query = config.getString(JdbcSourceConnectorConfig.QUERY_CONFIG); + String queryMaskedValue = null; + org.apache.kafka.common.config.types.Password queryMasked = + config.getPassword(JdbcSourceConnectorConfig.QUERY_MASKED_CONFIG); + if (queryMasked != null && queryMasked.value() != null) { + queryMaskedValue = queryMasked.value(); + } + + boolean hasQuery = query != null && !query.isEmpty(); + boolean hasQueryMasked = queryMaskedValue != null && !queryMaskedValue.isEmpty(); + + if (hasQuery && hasQueryMasked) { + String msg = "Both 'query' and 'query.masked' configs cannot be set at the same time. " + + "Please use only one of them."; + + addConfigError(JdbcSourceConnectorConfig.QUERY_CONFIG, msg); + addConfigError(JdbcSourceConnectorConfig.QUERY_MASKED_CONFIG, msg); + + log.error("Validation failed: Both query and query.masked configs are set"); + return false; + } + + if (config.getQuery().isPresent() && isUsingTableFilteringConfigs()) { + String msg = + "Do not specify table filtering configs with 'query'. " + + "Remove table.whitelist / table.blacklist / table.include.list / " + + "table.exclude.list when using query mode" + + " or 'query' when using table filtering mode."; + addConfigError(JdbcSourceConnectorConfig.QUERY_CONFIG, msg); + addConfigError(JdbcSourceConnectorConfig.QUERY_MASKED_CONFIG, msg); + if (!config.getTableWhitelistSet().isEmpty()) { + addConfigError(JdbcSourceConnectorConfig.TABLE_WHITELIST_CONFIG, msg); + } + if (!config.getTableBlacklistSet().isEmpty()) { + addConfigError(JdbcSourceConnectorConfig.TABLE_BLACKLIST_CONFIG, msg); + } + if (!config.getTableIncludeListSet().isEmpty()) { + addConfigError(JdbcSourceConnectorConfig.TABLE_INCLUDE_LIST_CONFIG, msg); + } + if (!config.getTableExcludeListSet().isEmpty()) { + addConfigError(JdbcSourceConnectorConfig.TABLE_EXCLUDE_LIST_CONFIG, msg); + } + return false; + } + + if (hasQuery + && !validateSelectStatement(query, JdbcSourceConnectorConfig.QUERY_CONFIG)) { + return false; + } + if (hasQueryMasked + && !validateSelectStatement( + queryMaskedValue, JdbcSourceConnectorConfig.QUERY_MASKED_CONFIG)) { + return false; + } + return true; } @@ -335,7 +387,6 @@ private boolean validateTsColProvidedWhenRequired() { List timestampColumnsMapping = config.getTimestampColumnMapping(); boolean hasNewTimestampConfig = timestampColumnsMapping != null && !timestampColumnsMapping.isEmpty(); - if (!hasNewTimestampConfig) { String msg = String.format( "Timestamp column configuration must be provided when using mode '%s' or '%s'. " @@ -358,10 +409,8 @@ private boolean validateTsColProvidedWhenRequired() { private boolean validateTsColNotProvidedWhenNotRequired() { if (!config.modeUsesTimestampColumn()) { List timestampColumnsMapping = config.getTimestampColumnMapping(); - boolean hasNewTimestampConfig = timestampColumnsMapping != null && !timestampColumnsMapping.isEmpty(); - if (hasNewTimestampConfig) { String msg = String.format( "Timestamp column configurations should not be provided if mode is not '%s' or '%s'. " @@ -386,10 +435,8 @@ private boolean validateTsColNotProvidedWhenNotRequired() { private boolean validateIncrColProvidedWhenRequired() { if (config.modeUsesIncrementingColumn()) { List incrementingColumnMapping = config.getIncrementingColumnMapping(); - boolean hasNewIncrementingConfig = incrementingColumnMapping != null && !incrementingColumnMapping.isEmpty(); - if (!hasNewIncrementingConfig) { String msg = String.format( "Incrementing column configuration must be provided when using mode '%s' or '%s'. " @@ -412,10 +459,8 @@ private boolean validateIncrColProvidedWhenRequired() { private boolean validateIncrColumnNotProvidedWhenNotRequired() { if (!config.modeUsesIncrementingColumn()) { List incrementingColumnMapping = config.getIncrementingColumnMapping(); - boolean hasNewIncrementingConfig = incrementingColumnMapping != null && !incrementingColumnMapping.isEmpty(); - if (hasNewIncrementingConfig) { String msg = String.format( "Incrementing column configurations " @@ -442,4 +487,30 @@ protected void addConfigError(String configName, String errorMessage) { .ifPresent(cv -> cv.addErrorMessage(errorMessage)); } -} + /** Validate that provided query strings start with a SELECT statement. */ + private boolean validateSelectStatement(String statement, String configKey) { + String trimmedStatement = statement.trim(); + if (!SELECT_STATEMENT_PATTERN.matcher(trimmedStatement).find()) { + String msg = + String.format( + "Only SELECT statements are supported for '%s'. Please provide " + + "a statement that starts with SELECT.", + configKey); + addConfigError(configKey, msg); + log.error(msg); + return false; + } + return true; + } + + /** + * Determine whether any table filtering configurations are in use. + */ + private boolean isUsingTableFilteringConfigs() { + return !config.getTableWhitelistSet().isEmpty() + || !config.getTableBlacklistSet().isEmpty() + || !config.getTableIncludeListSet().isEmpty() + || !config.getTableExcludeListSet().isEmpty(); + } + +} \ No newline at end of file diff --git a/src/test/java/io/confluent/connect/jdbc/source/JdbcSourceConnectorConfigTest.java b/src/test/java/io/confluent/connect/jdbc/source/JdbcSourceConnectorConfigTest.java index eea717b34..8201efac6 100644 --- a/src/test/java/io/confluent/connect/jdbc/source/JdbcSourceConnectorConfigTest.java +++ b/src/test/java/io/confluent/connect/jdbc/source/JdbcSourceConnectorConfigTest.java @@ -424,4 +424,154 @@ public void testWhitespaceInRegexPatternsIsHandledCorrectly() { assertTrue(includeList.contains("schema1\\.users")); assertTrue(includeList.contains("schema2\\.orders")); } + + @Test + public void testQueryMaskedValueIsMaskedInConfigValue() { + // Setup config with query.masked + Map props = new HashMap<>(); + props.put(JdbcSourceConnectorConfig.CONNECTION_URL_CONFIG, "jdbc:postgresql://localhost:5432/testdb"); + props.put(JdbcSourceConnectorConfig.CONNECTION_USER_CONFIG, "testUser"); + props.put(JdbcSourceConnectorConfig.TABLE_WHITELIST_CONFIG, "table1,table2"); + String sensitiveQuery = "SELECT * FROM sensitive_table WHERE secret_column = 'confidential'"; + props.put(JdbcSourceConnectorConfig.QUERY_MASKED_CONFIG, sensitiveQuery); + + // Validate and get config values + Map validatedConfig = JdbcSourceConnectorConfig.CONFIG_DEF.validateAll(props); + ConfigValue queryMaskedValue = validatedConfig.get(JdbcSourceConnectorConfig.QUERY_MASKED_CONFIG); + + assertNotNull(queryMaskedValue); + + // The value should be masked (shown as [hidden] or Password object) + Object maskedValue = queryMaskedValue.value(); + assertNotNull(maskedValue); + + // When converted to string, Password objects show as "[hidden]" + String maskedString = maskedValue.toString(); + assertEquals("[hidden]", maskedString); + + // The actual query should NOT be visible in the string representation + assertFalse(maskedString.contains(sensitiveQuery)); + } + + @Test + public void testGetQueryStringReturnsQueryWhenOnlyQuerySet() { + // Setup config with only query (not query.masked) + Map props = new HashMap<>(); + props.put(JdbcSourceConnectorConfig.CONNECTION_URL_CONFIG, "jdbc:postgresql://localhost:5432/testdb"); + props.put(JdbcSourceConnectorConfig.CONNECTION_USER_CONFIG, "testUser"); + props.put(JdbcSourceConnectorConfig.TABLE_WHITELIST_CONFIG, "table1,table2"); + String expectedQuery = "SELECT * FROM public_table"; + props.put(JdbcSourceConnectorConfig.QUERY_CONFIG, expectedQuery); + + JdbcSourceConnectorConfig config = new JdbcSourceConnectorConfig(props); + + assertTrue(config.getQuery().isPresent()); + assertEquals(expectedQuery, config.getQuery().get()); + } + + @Test + public void testGetQueryStringReturnsEmptyWhenNeitherSet() { + // Setup config without query or query.masked + Map props = new HashMap<>(); + props.put(JdbcSourceConnectorConfig.CONNECTION_URL_CONFIG, "jdbc:postgresql://localhost:5432/testdb"); + props.put(JdbcSourceConnectorConfig.CONNECTION_USER_CONFIG, "testUser"); + props.put(JdbcSourceConnectorConfig.TABLE_WHITELIST_CONFIG, "table1,table2"); + + JdbcSourceConnectorConfig config = new JdbcSourceConnectorConfig(props); + + // getQuery() should return empty Optional + assertFalse(config.getQuery().isPresent()); + } + + @Test + public void testIsQueryMaskedTrueWhenMaskedQueryPresent() { + Map props = new HashMap<>(); + props.put(JdbcSourceConnectorConfig.CONNECTION_URL_CONFIG, "jdbc:postgresql://localhost:5432/testdb"); + props.put(JdbcSourceConnectorConfig.CONNECTION_USER_CONFIG, "testUser"); + props.put(JdbcSourceConnectorConfig.TABLE_WHITELIST_CONFIG, "table1"); + props.put(JdbcSourceConnectorConfig.QUERY_MASKED_CONFIG, "SELECT * FROM sensitive_table"); + + JdbcSourceConnectorConfig config = new JdbcSourceConnectorConfig(props); + + assertTrue(config.isQueryMasked()); + } + + @Test + public void testIsQueryMaskedFalseWhenMaskedQueryMissing() { + Map props = new HashMap<>(); + props.put(JdbcSourceConnectorConfig.CONNECTION_URL_CONFIG, "jdbc:postgresql://localhost:5432/testdb"); + props.put(JdbcSourceConnectorConfig.CONNECTION_USER_CONFIG, "testUser"); + props.put(JdbcSourceConnectorConfig.TABLE_WHITELIST_CONFIG, "table1"); + + JdbcSourceConnectorConfig config = new JdbcSourceConnectorConfig(props); + + assertFalse(config.isQueryMasked()); + } + + @Test + public void testQueryMaskedSupportsComplexQueryWithMultipleJoins() { + // Test that complex queries work fine with query.masked + Map props = new HashMap<>(); + props.put(JdbcSourceConnectorConfig.CONNECTION_URL_CONFIG, "jdbc:postgresql://localhost:5432/testdb"); + props.put(JdbcSourceConnectorConfig.CONNECTION_USER_CONFIG, "testUser"); + props.put(JdbcSourceConnectorConfig.TABLE_WHITELIST_CONFIG, "table1,table2"); + + String complexQuery = "SELECT u.id, u.username, u.email, " + + "p.profile_pic, a.street, a.city, a.country, " + + "o.order_id, o.order_date, o.total_amount " + + "FROM users u " + + "INNER JOIN profiles p ON u.id = p.user_id " + + "LEFT JOIN addresses a ON u.id = a.user_id " + + "LEFT JOIN orders o ON u.id = o.user_id " + + "WHERE u.status = 'active' " + + "AND u.created_at >= '2024-01-01' " + + "AND o.total_amount > 100.00 " + + "ORDER BY o.order_date DESC " + + "LIMIT 1000"; + + props.put(JdbcSourceConnectorConfig.QUERY_MASKED_CONFIG, complexQuery); + + JdbcSourceConnectorConfig config = new JdbcSourceConnectorConfig(props); + + assertTrue(config.getQuery().isPresent()); + assertEquals(complexQuery, config.getQuery().get()); + } + + @Test + public void testQueryMaskedSupportsSpecialCharacters() { + // Test that queries with special characters work fine + Map props = new HashMap<>(); + props.put(JdbcSourceConnectorConfig.CONNECTION_URL_CONFIG, "jdbc:postgresql://localhost:5432/testdb"); + props.put(JdbcSourceConnectorConfig.CONNECTION_USER_CONFIG, "testUser"); + props.put(JdbcSourceConnectorConfig.TABLE_WHITELIST_CONFIG, "table1,table2"); + + String queryWithSpecialChars = "SELECT * FROM users " + + "WHERE name LIKE '%O''Brien%' " + + "AND email = 'test@example.com' " + + "AND description LIKE '%Line1\nLine2%' " + + "AND data LIKE '%Tab\tSeparated%'"; + + props.put(JdbcSourceConnectorConfig.QUERY_MASKED_CONFIG, queryWithSpecialChars); + + JdbcSourceConnectorConfig config = new JdbcSourceConnectorConfig(props); + + // Verify special characters are preserved + assertTrue(config.getQuery().isPresent()); + assertEquals(queryWithSpecialChars, config.getQuery().get()); + } + + @Test + public void testQueryMaskedWithEmptyStringBehavior() { + // Test behavior with empty string + Map props = new HashMap<>(); + props.put(JdbcSourceConnectorConfig.CONNECTION_URL_CONFIG, "jdbc:postgresql://localhost:5432/testdb"); + props.put(JdbcSourceConnectorConfig.CONNECTION_USER_CONFIG, "testUser"); + props.put(JdbcSourceConnectorConfig.TABLE_WHITELIST_CONFIG, "table1,table2"); + props.put(JdbcSourceConnectorConfig.QUERY_MASKED_CONFIG, ""); + + JdbcSourceConnectorConfig config = new JdbcSourceConnectorConfig(props); + + // Empty string should return empty Optional + assertFalse(config.getQuery().isPresent()); + } } diff --git a/src/test/java/io/confluent/connect/jdbc/source/TableQuerierTest.java b/src/test/java/io/confluent/connect/jdbc/source/TableQuerierTest.java index dba07c082..94af95cef 100644 --- a/src/test/java/io/confluent/connect/jdbc/source/TableQuerierTest.java +++ b/src/test/java/io/confluent/connect/jdbc/source/TableQuerierTest.java @@ -41,6 +41,7 @@ public class TableQuerierTest { private static final String QUERY = "SELECT * FROM name"; DatabaseDialect databaseDialectMock; + Connection connectionMock; @@ -61,7 +62,7 @@ public void init() @Test public void testTimestampIncrementingTableQuerierInTableModeWithSuffix() throws SQLException { TimestampIncrementingTableQuerier querier = new TimestampIncrementingTableQuerier( - databaseDialectMock, + databaseDialectMock, QueryMode.TABLE, TABLE_NAME, null, @@ -71,7 +72,8 @@ public void testTimestampIncrementingTableQuerierInTableModeWithSuffix() throws TIMESTAMP_DELAY, null, SUFFIX, - JdbcSourceConnectorConfig.TimestampGranularity.CONNECT_LOGICAL + JdbcSourceConnectorConfig.TimestampGranularity.CONNECT_LOGICAL, + false ); querier.createPreparedStatement(connectionMock); @@ -82,7 +84,7 @@ public void testTimestampIncrementingTableQuerierInTableModeWithSuffix() throws @Test public void testTimestampIncrementingTableQuerierInQueryModeWithSuffix() throws SQLException { TimestampIncrementingTableQuerier querier = new TimestampIncrementingTableQuerier( - databaseDialectMock, + databaseDialectMock, QueryMode.QUERY, QUERY, null, @@ -92,7 +94,8 @@ public void testTimestampIncrementingTableQuerierInQueryModeWithSuffix() throws TIMESTAMP_DELAY, null, SUFFIX, - JdbcSourceConnectorConfig.TimestampGranularity.CONNECT_LOGICAL + JdbcSourceConnectorConfig.TimestampGranularity.CONNECT_LOGICAL, + false ); querier.createPreparedStatement(connectionMock); @@ -107,7 +110,8 @@ public void testBulkTableQuerierInTableModeWithSuffix() throws SQLException { QueryMode.TABLE, TABLE_NAME, null, - SUFFIX + SUFFIX, + false ); querier.createPreparedStatement(connectionMock); @@ -122,7 +126,8 @@ public void testBulkTableQuerierInQueryModeWithSuffix() throws SQLException { QueryMode.QUERY, QUERY, null, - SUFFIX + SUFFIX, + false ); querier.createPreparedStatement(connectionMock); @@ -137,7 +142,8 @@ public void testBulkTableQuerierInQueryModeWithoutSuffix() throws SQLException { QueryMode.QUERY, QUERY, null, - "" /* default value */ + "", /* default value */ + false ); querier.createPreparedStatement(connectionMock); diff --git a/src/test/java/io/confluent/connect/jdbc/source/TimestampIncrementingTableQuerierTest.java b/src/test/java/io/confluent/connect/jdbc/source/TimestampIncrementingTableQuerierTest.java index a3f69d70e..efe842cd3 100644 --- a/src/test/java/io/confluent/connect/jdbc/source/TimestampIncrementingTableQuerierTest.java +++ b/src/test/java/io/confluent/connect/jdbc/source/TimestampIncrementingTableQuerierTest.java @@ -100,7 +100,8 @@ private TimestampIncrementingTableQuerier querier( 10211197100L, // Timestamp delay ZoneId.of("UTC"), "", - JdbcSourceConnectorConfig.TimestampGranularity.CONNECT_LOGICAL + JdbcSourceConnectorConfig.TimestampGranularity.CONNECT_LOGICAL, + false ); } diff --git a/src/test/java/io/confluent/connect/jdbc/source/TimestampTableQuerierTest.java b/src/test/java/io/confluent/connect/jdbc/source/TimestampTableQuerierTest.java index 852e13778..a26f0374c 100644 --- a/src/test/java/io/confluent/connect/jdbc/source/TimestampTableQuerierTest.java +++ b/src/test/java/io/confluent/connect/jdbc/source/TimestampTableQuerierTest.java @@ -97,7 +97,8 @@ private TimestampIncrementingTableQuerier querier(Timestamp initialTimestampOffs 10211197100L, // Timestamp delay ZoneId.of("UTC"), "", - JdbcSourceConnectorConfig.TimestampGranularity.CONNECT_LOGICAL + JdbcSourceConnectorConfig.TimestampGranularity.CONNECT_LOGICAL, + false ); } diff --git a/src/test/java/io/confluent/connect/jdbc/util/LogUtilTest.java b/src/test/java/io/confluent/connect/jdbc/util/LogUtilTest.java index 9a18c2cb7..465699656 100644 --- a/src/test/java/io/confluent/connect/jdbc/util/LogUtilTest.java +++ b/src/test/java/io/confluent/connect/jdbc/util/LogUtilTest.java @@ -26,6 +26,7 @@ import static org.junit.Assert.assertEquals; public class LogUtilTest { + private static final String REDACTED = ""; @Test public void testNonSqlThrowable() { @@ -130,6 +131,62 @@ public void testSecondLevelNestedBatchUpdateSensitiveNoError() { assertEqualsSQLException(expectedTrimmed, actualTrimmed); } + @Test + public void testSensitiveLogWithTrimEnabled() { + String sensitiveMessage = "SELECT * FROM users WHERE password='secret123'"; + String result = LogUtil.maybeRedact(true, sensitiveMessage); + assertEquals(REDACTED, result); + } + + @Test + public void testSensitiveLogWithTrimDisabled() { + String message = "SELECT * FROM users WHERE id=1"; + String result = LogUtil.maybeRedact(false, message); + assertEquals(message, result); + } + + @Test + public void testRedactSensitiveDataWithNonSqlThrowable() { + Throwable t = new RuntimeException("secret"); + Assert.assertSame(t, LogUtil.redactSensitiveData(t)); + } + + @Test + public void testRedactSensitiveDataWithSqlExceptionChain() { + SQLException e1 = new SQLException("sensitive-message-e1", "42000", 10); + SQLException e2 = new SQLException("sensitive-message-e2", "42001", 20); + e1.setNextException(e2); + + SQLException expected = new SQLException(REDACTED, "42000", 10); + SQLException expectedChild = new SQLException(REDACTED, "42001", 20); + expected.setNextException(expectedChild); + + SQLException redacted = LogUtil.redactSensitiveData(e1); + + assertEqualsSQLException(expected, redacted); + } + + @Test + public void testRedactSensitiveDataWithBatchUpdateException() { + BatchUpdateException e1 = + new BatchUpdateException("sensitive message-e1", "42002", 30, new int[0]); + + SQLException e2 = new SQLException("sensitive message-e2", "42003", 40); + e1.setNextException(e2); + + BatchUpdateException expected = + new BatchUpdateException(REDACTED, "42002", 30, new int[0]); + SQLException expectedChild = new SQLException(REDACTED, "42003", 40); + expected.setNextException(expectedChild); + + SQLException actual = LogUtil.redactSensitiveData(e1); + Assert.assertTrue(actual instanceof BatchUpdateException); + Assert.assertArrayEquals( + expected.getUpdateCounts(), ((BatchUpdateException) actual).getUpdateCounts()); + + assertEqualsSQLException(expected, actual); + } + @Test public void testSecondLevelNestedBatchUpdateSensitiveNoDetails() { SQLException e1 = new SQLException("e1"); diff --git a/src/test/java/io/confluent/connect/jdbc/validation/JdbcSourceConnectorValidationTest.java b/src/test/java/io/confluent/connect/jdbc/validation/JdbcSourceConnectorValidationTest.java index d1ab169a4..5922d7865 100644 --- a/src/test/java/io/confluent/connect/jdbc/validation/JdbcSourceConnectorValidationTest.java +++ b/src/test/java/io/confluent/connect/jdbc/validation/JdbcSourceConnectorValidationTest.java @@ -777,5 +777,127 @@ public void validate_withModeBulkWithNewIncrementingMapping_setsError() { assertErrors(MODE_CONFIG, 1); assertErrorMatches(MODE_CONFIG, ".*Incrementing column configurations should not be provided.*"); } + + @Test + public void validate_withBothQueryAndQueryMasked_setsError() { + props.put(MODE_CONFIG, MODE_BULK); + props.put(TABLE_WHITELIST_CONFIG, "table1,table2"); + props.put(QUERY_CONFIG, "SELECT * FROM users"); + props.put(QUERY_MASKED_CONFIG, "SELECT * FROM sensitive_data"); + + validate(); + + assertErrors(2); + assertErrors(QUERY_CONFIG, 1); + assertErrors(QUERY_MASKED_CONFIG, 1); + assertErrorMatches(QUERY_CONFIG, ".*Both 'query' and 'query.masked' configs cannot be set.*"); + assertErrorMatches(QUERY_MASKED_CONFIG, ".*Both 'query' and 'query.masked' configs cannot be set.*"); + } + + @Test + public void validate_withOnlyQuery_noErrors() { + props.put(MODE_CONFIG, MODE_BULK); + props.put(QUERY_CONFIG, "SELECT * FROM users WHERE active = true"); + + validate(); + + assertNoErrors(); + } + + @Test + public void validate_withOnlyQueryMasked_noErrors() { + props.put(MODE_CONFIG, MODE_BULK); + props.put(QUERY_MASKED_CONFIG, "SELECT * FROM users WHERE active = true"); + + validate(); + + assertNoErrors(); + } + + @Test + public void validate_withBothQueryAndQueryMaskedEmpty_noErrors() { + props.put(MODE_CONFIG, MODE_BULK); + props.put(TABLE_WHITELIST_CONFIG, "table1,table2"); + // Both empty should be fine as it's equivalent to neither being set + props.put(QUERY_CONFIG, ""); + props.put(QUERY_MASKED_CONFIG, ""); + + validate(); + + assertNoErrors(); + } + + @Test + public void validate_withQueryStartingWithUpdate_setsError() { + props.put(MODE_CONFIG, MODE_BULK); + props.put(QUERY_CONFIG, "UPDATE users SET active = false"); + + validate(); + + assertErrors(1); + assertErrors(QUERY_CONFIG, 1); + assertErrorMatches(QUERY_CONFIG, ".*Only SELECT statements are supported for 'query'.*"); + } + + @Test + public void validate_withQueryMaskedStartingWithUpdate_setsError() { + props.put(MODE_CONFIG, MODE_BULK); + props.put(QUERY_MASKED_CONFIG, "UPDATE users SET active = false"); + + validate(); + + assertErrors(1); + assertErrors(QUERY_MASKED_CONFIG, 1); + assertErrorMatches( + QUERY_MASKED_CONFIG, + "Only SELECT statements are supported for 'query.masked'" + ); + } + + @Test + public void validate_withQueryMaskedAndIncrementingColumn_noErrors() { + props.put(MODE_CONFIG, MODE_INCREMENTING); + props.put(QUERY_MASKED_CONFIG, "SELECT * FROM users"); + props.put(INCREMENTING_COLUMN_NAME_CONFIG, "id"); + + validate(); + + assertNoErrors(); + } + + @Test + public void validate_withQueryMaskedContainingComplexQuery_noErrors() { + props.put(MODE_CONFIG, MODE_BULK); + // Test with a complex query containing multiple joins + String complexQuery = "SELECT a.id, a.name, b.email, c.address, d.phone " + + "FROM users a " + + "INNER JOIN emails b ON a.id = b.user_id " + + "LEFT JOIN addresses c ON a.id = c.user_id " + + "LEFT JOIN phones d ON a.id = d.user_id " + + "WHERE a.created_at > '2024-01-01' AND a.status = 'active'"; + props.put(QUERY_MASKED_CONFIG, complexQuery); + + validate(); + + assertNoErrors(); + } + + @Test + public void validate_withQueryAndTableFilteringConfigs_setsError() { + props.put(MODE_CONFIG, MODE_BULK); + props.put(TABLE_INCLUDE_LIST_CONFIG, "database.schema.table.*"); + props.put(QUERY_CONFIG, "SELECT * FROM users"); + + validate(); + + assertErrors(3); + assertErrors(QUERY_CONFIG, 1); + assertErrors(QUERY_MASKED_CONFIG, 1); + assertErrors(TABLE_INCLUDE_LIST_CONFIG, 1); + assertErrorMatches( + QUERY_CONFIG, + "Do not specify table filtering configs with 'query'" + ); + } }