Skip to content

Commit 1720bf7

Browse files
Merge branch '10.8.x' into master
2 parents 5b8ff87 + 316ffac commit 1720bf7

File tree

2 files changed

+23
-5
lines changed

2 files changed

+23
-5
lines changed

src/main/java/io/confluent/connect/jdbc/JdbcSourceConnector.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -173,6 +173,8 @@ public List<Map<String, String>> taskConfigs(int maxTasks) {
173173
} else {
174174
log.info("No custom query provided, generating task configurations for tables");
175175
List<TableId> currentTables = tableMonitorThread.tables();
176+
log.trace("Current tables from tableMonitorThread: {}", currentTables);
177+
176178
if (currentTables == null || currentTables.isEmpty()) {
177179
taskConfigs = new ArrayList<>(1);
178180
Map<String, String> taskProps = new HashMap<>(configProperties);
@@ -189,11 +191,13 @@ public List<Map<String, String>> taskConfigs(int maxTasks) {
189191
log.warn("The connector has not been able to read the "
190192
+ "list of tables from the database yet.");
191193
} else {
194+
log.trace("currentTables is empty - no tables found after fetch");
192195
taskProps.put(JdbcSourceTaskConfig.TABLES_FETCHED, "true");
193196
log.warn("No tables were found so there's no work to be done.");
194197
}
195198
taskConfigs.add(taskProps);
196199
} else {
200+
log.trace("Found {} tables to process", currentTables.size());
197201
int numGroups = Math.min(currentTables.size(), maxTasks);
198202
List<List<TableId>> tablesGrouped =
199203
ConnectorUtils.groupPartitions(currentTables, numGroups);
@@ -204,6 +208,7 @@ public List<Map<String, String>> taskConfigs(int maxTasks) {
204208
builder.appendList().delimitedBy(",").of(taskTables);
205209
taskProps.put(JdbcSourceTaskConfig.TABLES_CONFIG, builder.toString());
206210
taskProps.put(JdbcSourceTaskConfig.TABLES_FETCHED, "true");
211+
log.trace("Assigned tables {} to task with tablesFetched=true", taskTables);
207212
taskConfigs.add(taskProps);
208213
}
209214
log.info("Current Tables size: {}", currentTables.size());

src/main/java/io/confluent/connect/jdbc/source/TableMonitorThread.java

Lines changed: 18 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -82,22 +82,26 @@ public void run() {
8282
log.info("Starting thread to monitor tables.");
8383
while (shutdownLatch.getCount() > 0) {
8484
try {
85+
log.trace("Starting table update check cycle");
8586
if (updateTables()) {
86-
log.info("Task Reconfiguration has been invoked.");
87+
log.trace("Table changes detected, requesting task reconfiguration");
8788
context.requestTaskReconfiguration();
89+
} else {
90+
log.trace("No table changes detected in this cycle");
8891
}
8992
} catch (Exception e) {
9093
throw fail(e);
9194
}
9295

9396
try {
94-
log.debug("Waiting {} ms to check for changed.", pollMs);
97+
log.debug("Waiting {} ms to check for changed tables", pollMs);
9598
boolean shuttingDown = shutdownLatch.await(pollMs, TimeUnit.MILLISECONDS);
9699
if (shuttingDown) {
100+
log.info("Shutdown signal received, stopping table monitor thread");
97101
return;
98102
}
99103
} catch (InterruptedException e) {
100-
log.error("Unexpected InterruptedException, ignoring: ", e);
104+
log.error("Unexpected InterruptedException in table monitor thread", e);
101105
}
102106
}
103107
}
@@ -108,6 +112,7 @@ public void run() {
108112
* successfully yet
109113
*/
110114
public List<TableId> tables() {
115+
log.trace("Requesting current tables list");
111116
awaitTablesReady(startupMs);
112117
List<TableId> tablesSnapshot = tables.get();
113118
if (tablesSnapshot == null) {
@@ -123,7 +128,7 @@ public List<TableId> tables() {
123128

124129
if (tablesSnapshot.isEmpty()) {
125130
log.info(
126-
"Based on the supplied filtering rules, there are no matching tables to read from"
131+
"Based on the supplied filtering rules, there are no matching tables to read data."
127132
);
128133
} else {
129134
log.debug(
@@ -136,6 +141,7 @@ public List<TableId> tables() {
136141
}
137142

138143
if (!duplicates.isEmpty()) {
144+
log.warn("Duplicate table names detected: {}", duplicates);
139145
String configText;
140146
if (whitelist != null) {
141147
configText = "'" + JdbcSourceConnectorConfig.TABLE_WHITELIST_CONFIG + "'";
@@ -174,8 +180,9 @@ public void shutdown() {
174180
private boolean updateTables() {
175181
final List<TableId> allTables;
176182
try {
183+
log.trace("Fetching all tables from database");
177184
allTables = dialect.tableIds(connectionProvider.getConnection());
178-
log.debug("Got the following tables: {}", allTables);
185+
log.debug("Retrieved {} tables from database: {}", allTables.size(), allTables);
179186
} catch (SQLException e) {
180187
log.error(
181188
"Error while trying to get updated table list, ignoring and waiting for next table poll"
@@ -188,24 +195,29 @@ private boolean updateTables() {
188195

189196
final List<TableId> filteredTables = new ArrayList<>(allTables.size());
190197
if (whitelist != null) {
198+
log.trace("Applying whitelist filter to tables");
191199
for (TableId table : allTables) {
192200
String fqn1 = dialect.expressionBuilder().append(table, QuoteMethod.NEVER).toString();
193201
String fqn2 = dialect.expressionBuilder().append(table, QuoteMethod.ALWAYS).toString();
194202
if (whitelist.contains(fqn1) || whitelist.contains(fqn2)
195203
|| whitelist.contains(table.tableName())) {
196204
filteredTables.add(table);
205+
log.trace("Table {} passed whitelist filter", table);
197206
}
198207
}
199208
} else if (blacklist != null) {
209+
log.trace("Applying blacklist filter to tables");
200210
for (TableId table : allTables) {
201211
String fqn1 = dialect.expressionBuilder().append(table, QuoteMethod.NEVER).toString();
202212
String fqn2 = dialect.expressionBuilder().append(table, QuoteMethod.ALWAYS).toString();
203213
if (!(blacklist.contains(fqn1) || blacklist.contains(fqn2)
204214
|| blacklist.contains(table.tableName()))) {
205215
filteredTables.add(table);
216+
log.trace("Table {} passed blacklist filter", table);
206217
}
207218
}
208219
} else {
220+
log.trace("No filters applied, using all tables");
209221
filteredTables.addAll(allTables);
210222
}
211223

@@ -215,6 +227,7 @@ private boolean updateTables() {
215227
}
216228
synchronized (tables) {
217229
tables.notifyAll();
230+
log.trace("Notified all waiting threads about table updates");
218231
}
219232
return !Objects.equals(priorTablesSnapshot, filteredTables);
220233
}

0 commit comments

Comments
 (0)