diff --git a/.semaphore/semaphore.yml b/.semaphore/semaphore.yml index 8ba08d05f..a5c68304b 100644 --- a/.semaphore/semaphore.yml +++ b/.semaphore/semaphore.yml @@ -7,7 +7,7 @@ version: v1.0 name: build-test-release agent: machine: - type: s1-prod-ubuntu20-04-amd64-1 + type: s1-prod-ubuntu24-04-amd64-1 fail_fast: cancel: @@ -39,7 +39,7 @@ blocks: commands: - pip install confluent-release-tools -q - . sem-pint - - sudo apt-get --assume-yes install libncurses5 + - sudo apt-get --assume-yes install libncurses6 - mvn -Dcloud -Pjenkins -U -Dmaven.wagon.http.retryHandler.count=10 -Ddependency.check.skip=true --batch-mode --no-transfer-progress clean verify install dependency:analyze validate - . cve-scan - . cache-maven store @@ -74,7 +74,7 @@ after_pipeline: task: agent: machine: - type: s1-prod-ubuntu20-04-arm64-0 + type: s1-prod-ubuntu24-04-arm64-0 jobs: - name: Metrics commands: diff --git a/src/main/java/io/confluent/connect/jdbc/JdbcSourceConnector.java b/src/main/java/io/confluent/connect/jdbc/JdbcSourceConnector.java index 13b721230..df3fabd2d 100644 --- a/src/main/java/io/confluent/connect/jdbc/JdbcSourceConnector.java +++ b/src/main/java/io/confluent/connect/jdbc/JdbcSourceConnector.java @@ -61,6 +61,7 @@ public class JdbcSourceConnector extends SourceConnector { private TableMonitorThread tableMonitorThread; private DatabaseDialect dialect; + @Override public String version() { return Version.getVersion(); @@ -131,7 +132,9 @@ public void start(Map properties) throws ConnectException { tablePollMs, whitelistSet, blacklistSet, - Time.SYSTEM + Time.SYSTEM, + config.connectorName(), + JdbcSourceTaskConfig.TASK_ID_CONFIG ); if (query.isEmpty()) { tableMonitorThread.start(); @@ -202,6 +205,7 @@ public List> taskConfigs(int maxTasks) { List> tablesGrouped = ConnectorUtils.groupPartitions(currentTables, numGroups); taskConfigs = new ArrayList<>(tablesGrouped.size()); + int count = 0; for (List taskTables : tablesGrouped) { Map taskProps = new HashMap<>(configProperties); ExpressionBuilder builder = dialect.expressionBuilder(); @@ -209,6 +213,7 @@ public List> taskConfigs(int maxTasks) { taskProps.put(JdbcSourceTaskConfig.TABLES_CONFIG, builder.toString()); taskProps.put(JdbcSourceTaskConfig.TABLES_FETCHED, "true"); log.trace("Assigned tables {} to task with tablesFetched=true", taskTables); + taskProps.put(JdbcSourceTaskConfig.TASK_ID_CONFIG, count++ + ""); taskConfigs.add(taskProps); } log.info("Current Tables size: {}", currentTables.size()); diff --git a/src/main/java/io/confluent/connect/jdbc/source/JdbcSourceConnectorConfig.java b/src/main/java/io/confluent/connect/jdbc/source/JdbcSourceConnectorConfig.java index e5b3d32a0..859a72fca 100644 --- a/src/main/java/io/confluent/connect/jdbc/source/JdbcSourceConnectorConfig.java +++ b/src/main/java/io/confluent/connect/jdbc/source/JdbcSourceConnectorConfig.java @@ -1062,11 +1062,14 @@ public static int get(TransactionIsolationMode mode) { } } - protected JdbcSourceConnectorConfig(ConfigDef subclassConfigDef, Map props) { super(subclassConfigDef, props); } + public String connectorName() { + return originalsStrings().get("name"); + } + public NumericMapping numericMapping() { return NumericMapping.get(this); } diff --git a/src/main/java/io/confluent/connect/jdbc/source/JdbcSourceTaskConfig.java b/src/main/java/io/confluent/connect/jdbc/source/JdbcSourceTaskConfig.java index 38d324855..5181d7414 100644 --- a/src/main/java/io/confluent/connect/jdbc/source/JdbcSourceTaskConfig.java +++ b/src/main/java/io/confluent/connect/jdbc/source/JdbcSourceTaskConfig.java @@ -31,9 +31,13 @@ public class JdbcSourceTaskConfig extends JdbcSourceConnectorConfig { private static final String TABLES_DOC = "List of tables for this task to watch for changes."; public static final String TABLES_FETCHED = "tables.fetched"; + public static final String TASK_ID_CONFIG = "task.id"; + private static final String TASK_ID_DOC = "Task's id"; + static ConfigDef config = baseConfigDef() .define(TABLES_CONFIG, Type.LIST, Importance.HIGH, TABLES_DOC) - .defineInternal(TABLES_FETCHED, Type.BOOLEAN, false, Importance.HIGH); + .defineInternal(TABLES_FETCHED, Type.BOOLEAN, false, Importance.HIGH) + .define(TASK_ID_CONFIG, Type.STRING, "0" ,Importance.HIGH, TASK_ID_DOC); public JdbcSourceTaskConfig(Map props) { super(config, props); diff --git a/src/main/java/io/confluent/connect/jdbc/source/TableMonitorThread.java b/src/main/java/io/confluent/connect/jdbc/source/TableMonitorThread.java index cb49e1ee6..13a5f60f4 100644 --- a/src/main/java/io/confluent/connect/jdbc/source/TableMonitorThread.java +++ b/src/main/java/io/confluent/connect/jdbc/source/TableMonitorThread.java @@ -63,7 +63,10 @@ public TableMonitorThread(DatabaseDialect dialect, long pollMs, Set whitelist, Set blacklist, - Time time + Time time, + String connectorName, + String taskId + ) { this.dialect = dialect; this.connectionProvider = connectionProvider; @@ -75,6 +78,7 @@ public TableMonitorThread(DatabaseDialect dialect, this.blacklist = blacklist; this.tables = new AtomicReference<>(); this.time = time; + this.setName(connectorName + "-" + taskId + "-TableMonitorThread"); } @Override diff --git a/src/test/java/io/confluent/connect/jdbc/source/TableMonitorThreadTest.java b/src/test/java/io/confluent/connect/jdbc/source/TableMonitorThreadTest.java index e58603f5e..26814927b 100644 --- a/src/test/java/io/confluent/connect/jdbc/source/TableMonitorThreadTest.java +++ b/src/test/java/io/confluent/connect/jdbc/source/TableMonitorThreadTest.java @@ -56,6 +56,9 @@ public class TableMonitorThreadTest { private static final long STARTUP_LIMIT = 50; private static final long POLL_INTERVAL = 100; + private static final String connectorName = "test-connector"; + private static final String connectorTaskId = "test-task-id"; + private final static TableId FOO = new TableId(null, null, "foo"); private final static TableId BAR = new TableId(null, null, "bar"); private final static TableId BAZ = new TableId(null, null, "baz"); @@ -92,7 +95,7 @@ public class TableMonitorThreadTest { public void testSingleLookup() throws Exception { EasyMock.expect(dialect.expressionBuilder()).andReturn(ExpressionBuilder.create()).anyTimes(); tableMonitorThread = new TableMonitorThread(dialect, connectionProvider, context, - STARTUP_LIMIT, POLL_INTERVAL, null, null, MockTime.SYSTEM); + STARTUP_LIMIT, POLL_INTERVAL, null, null, MockTime.SYSTEM, connectorName, connectorTaskId); expectTableNames(LIST_FOO, shutdownThread()); EasyMock.replay(connectionProvider, dialect); @@ -107,7 +110,7 @@ public void testSingleLookup() throws Exception { public void testTablesBlockingTimeoutOnUpdateThread() throws Exception { EasyMock.expect(dialect.expressionBuilder()).andReturn(ExpressionBuilder.create()).anyTimes(); tableMonitorThread = new TableMonitorThread(dialect, connectionProvider, context, - STARTUP_LIMIT, 0, null, null, time); + STARTUP_LIMIT, 0, null, null, time, connectorName, connectorTaskId); CountDownLatch connectionRequested = new CountDownLatch(1); CountDownLatch connectionCompleted = new CountDownLatch(1); @@ -158,7 +161,7 @@ public void testTablesBlockingTimeoutOnUpdateThread() throws Exception { public void testTablesBlockingWithDeadlineOnUpdateThread() throws Exception { EasyMock.expect(dialect.expressionBuilder()).andReturn(ExpressionBuilder.create()); tableMonitorThread = new TableMonitorThread(dialect, connectionProvider, context, - STARTUP_LIMIT, POLL_INTERVAL, null, null, time); + STARTUP_LIMIT, POLL_INTERVAL, null, null, time, connectorName, connectorTaskId); EasyMock.expect(dialect.tableIds(EasyMock.eq(connection))).andReturn(Collections.emptyList()); EasyMock.expect(connectionProvider.getConnection()).andReturn(connection); @@ -185,7 +188,7 @@ public void testWhitelist() throws Exception { Set whitelist = new HashSet<>(Arrays.asList("foo", "bar")); EasyMock.expect(dialect.expressionBuilder()).andReturn(ExpressionBuilder.create()).anyTimes(); tableMonitorThread = new TableMonitorThread(dialect, connectionProvider, context, - STARTUP_LIMIT, POLL_INTERVAL, whitelist, null, MockTime.SYSTEM); + STARTUP_LIMIT, POLL_INTERVAL, whitelist, null, MockTime.SYSTEM, connectorName, connectorTaskId); expectTableNames(LIST_FOO_BAR, shutdownThread()); EasyMock.replay(connectionProvider, dialect); @@ -201,7 +204,7 @@ public void testBlacklist() throws Exception { Set blacklist = new HashSet<>(Arrays.asList("bar", "baz")); EasyMock.expect(dialect.expressionBuilder()).andReturn(ExpressionBuilder.create()).anyTimes(); tableMonitorThread = new TableMonitorThread(dialect, connectionProvider, context, - STARTUP_LIMIT, POLL_INTERVAL, null, blacklist, MockTime.SYSTEM); + STARTUP_LIMIT, POLL_INTERVAL, null, blacklist, MockTime.SYSTEM, connectorName, connectorTaskId); expectTableNames(LIST_FOO_BAR_BAZ, shutdownThread()); EasyMock.replay(connectionProvider, dialect); @@ -216,7 +219,7 @@ public void testBlacklist() throws Exception { public void testReconfigOnUpdate() throws Exception { EasyMock.expect(dialect.expressionBuilder()).andReturn(ExpressionBuilder.create()).anyTimes(); tableMonitorThread = new TableMonitorThread(dialect, connectionProvider, context, - STARTUP_LIMIT, POLL_INTERVAL, null, null, MockTime.SYSTEM); + STARTUP_LIMIT, POLL_INTERVAL, null, null, MockTime.SYSTEM, connectorName, connectorTaskId); expectTableNames(LIST_FOO); expectTableNames(LIST_FOO, checkTableNames("foo")); context.requestTaskReconfiguration(); @@ -244,7 +247,7 @@ public void testReconfigOnUpdate() throws Exception { @Test public void testInvalidConnection() throws Exception { tableMonitorThread = new TableMonitorThread(dialect, connectionProvider, context, - STARTUP_LIMIT, POLL_INTERVAL, null, null, MockTime.SYSTEM); + STARTUP_LIMIT, POLL_INTERVAL, null, null, MockTime.SYSTEM, connectorName, connectorTaskId); EasyMock.expect(connectionProvider.getConnection()).andThrow(new ConnectException("Simulated error with the db.")); CountDownLatch errorLatch = new CountDownLatch(1); @@ -267,7 +270,7 @@ public void testInvalidConnection() throws Exception { public void testDuplicates() throws Exception { EasyMock.expect(dialect.expressionBuilder()).andReturn(ExpressionBuilder.create()).anyTimes(); tableMonitorThread = new TableMonitorThread(dialect, connectionProvider, context, - STARTUP_LIMIT, POLL_INTERVAL, null, null, MockTime.SYSTEM); + STARTUP_LIMIT, POLL_INTERVAL, null, null, MockTime.SYSTEM, connectorName, connectorTaskId); expectTableNames(LIST_DUP_WITH_ALL, shutdownThread()); context.requestTaskReconfiguration(); EasyMock.expectLastCall(); @@ -285,7 +288,7 @@ public void testDuplicateWithUnqualifiedWhitelist() throws Exception { Set whitelist = new HashSet<>(Arrays.asList("dup")); EasyMock.expect(dialect.expressionBuilder()).andReturn(ExpressionBuilder.create()).anyTimes(); tableMonitorThread = new TableMonitorThread(dialect, connectionProvider, context, - STARTUP_LIMIT, POLL_INTERVAL, whitelist, null, MockTime.SYSTEM); + STARTUP_LIMIT, POLL_INTERVAL, whitelist, null, MockTime.SYSTEM, connectorName, connectorTaskId); expectTableNames(LIST_DUP_ONLY, shutdownThread()); context.requestTaskReconfiguration(); EasyMock.expectLastCall(); @@ -304,7 +307,7 @@ public void testDuplicateWithUnqualifiedBlacklist() throws Exception { Set blacklist = new HashSet<>(Arrays.asList("foo")); EasyMock.expect(dialect.expressionBuilder()).andReturn(ExpressionBuilder.create()).anyTimes(); tableMonitorThread = new TableMonitorThread(dialect, connectionProvider, context, - STARTUP_LIMIT, POLL_INTERVAL, null, blacklist, MockTime.SYSTEM); + STARTUP_LIMIT, POLL_INTERVAL, null, blacklist, MockTime.SYSTEM, connectorName, connectorTaskId); expectTableNames(LIST_DUP_WITH_ALL, shutdownThread()); context.requestTaskReconfiguration(); EasyMock.expectLastCall(); @@ -323,7 +326,7 @@ public void testDuplicateWithQualifiedWhitelist() throws Exception { Set whitelist = new HashSet<>(Arrays.asList("dup1.dup", "foo")); EasyMock.expect(dialect.expressionBuilder()).andReturn(ExpressionBuilder.create()).anyTimes(); tableMonitorThread = new TableMonitorThread(dialect, connectionProvider, context, - STARTUP_LIMIT, POLL_INTERVAL, whitelist, null, MockTime.SYSTEM); + STARTUP_LIMIT, POLL_INTERVAL, whitelist, null, MockTime.SYSTEM, connectorName, connectorTaskId); expectTableNames(LIST_DUP_WITH_ALL, shutdownThread()); EasyMock.replay(connectionProvider, dialect); @@ -338,7 +341,7 @@ public void testDuplicateWithQualifiedBlacklist() throws Exception { Set blacklist = new HashSet<>(Arrays.asList("dup1.dup", "foo")); EasyMock.expect(dialect.expressionBuilder()).andReturn(ExpressionBuilder.create()).anyTimes(); tableMonitorThread = new TableMonitorThread(dialect, connectionProvider, context, - STARTUP_LIMIT, POLL_INTERVAL, null, blacklist, MockTime.SYSTEM); + STARTUP_LIMIT, POLL_INTERVAL, null, blacklist, MockTime.SYSTEM, connectorName, connectorTaskId); expectTableNames(LIST_DUP_WITH_ALL, shutdownThread()); EasyMock.replay(connectionProvider, dialect);