Skip to content

Commit 5b8ff87

Browse files
Merge branch '10.8.x' into master
2 parents 360724a + 923968c commit 5b8ff87

17 files changed

+431
-23
lines changed

src/main/java/io/confluent/connect/jdbc/dialect/DatabaseDialect.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -695,4 +695,14 @@ interface ColumnConverter {
695695
*/
696696
Object convert(ResultSet resultSet) throws SQLException, IOException;
697697
}
698+
699+
/**
700+
* Resolve a synonym to its base table name.
701+
*
702+
* @param connection the database connection; may not be null
703+
* @param synonymName the name of the synonym to resolve; may not be null
704+
* @return the base table name if the synonym exists, null otherwise
705+
* @throws SQLException if there is an error accessing the metadata
706+
*/
707+
String resolveSynonym(Connection connection, String synonymName) throws SQLException;
698708
}

src/main/java/io/confluent/connect/jdbc/dialect/Db2DatabaseDialect.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,8 @@
2222
import org.apache.kafka.connect.data.Time;
2323
import org.apache.kafka.connect.data.Timestamp;
2424

25+
import java.sql.Connection;
26+
import java.sql.SQLException;
2527
import java.util.Collection;
2628

2729
import io.confluent.connect.jdbc.dialect.DatabaseDialectProvider.SubprotocolBasedProvider;
@@ -190,4 +192,9 @@ protected String sanitizedUrl(String url) {
190192
return super.sanitizedUrl(url)
191193
.replaceAll("(?i)([:;]password=)[^;]*", "$1****");
192194
}
195+
196+
@Override
197+
public String resolveSynonym(Connection connection, String synonymName) throws SQLException {
198+
throw new SQLException("DB2 does not support synonyms. Please use views instead.");
199+
}
193200
}

src/main/java/io/confluent/connect/jdbc/dialect/DerbyDatabaseDialect.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,8 @@
2323
import org.apache.kafka.connect.data.Timestamp;
2424

2525
import java.util.Collection;
26+
import java.sql.Connection;
27+
import java.sql.SQLException;
2628

2729
import io.confluent.connect.jdbc.dialect.DatabaseDialectProvider.SubprotocolBasedProvider;
2830
import io.confluent.connect.jdbc.sink.metadata.SinkRecordField;
@@ -176,4 +178,9 @@ protected String sanitizedUrl(String url) {
176178
return super.sanitizedUrl(url)
177179
.replaceAll("(?i)(;password=)[^;]*", "$1****");
178180
}
181+
182+
@Override
183+
public String resolveSynonym(Connection connection, String tableId) throws SQLException {
184+
throw new SQLException("Derby does not support synonyms. Please use views instead.");
185+
}
179186
}

src/main/java/io/confluent/connect/jdbc/dialect/GenericDatabaseDialect.java

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2092,4 +2092,18 @@ public String identifier() {
20922092
public String toString() {
20932093
return name();
20942094
}
2095+
2096+
@Override
2097+
public String resolveSynonym(Connection connection, String synonymName) throws SQLException {
2098+
DatabaseMetaData metadata = connection.getMetaData();
2099+
String tableName = parseTableIdentifier(synonymName).tableName();
2100+
ResultSet tableRs = metadata.getTables(null, null, tableName, new String[]{"SYNONYM"});
2101+
if (tableRs.next()) {
2102+
ResultSet synonymRs = metadata.getColumns(null, null, tableName, null);
2103+
if (synonymRs.next()) {
2104+
return synonymRs.getString("TABLE_NAME");
2105+
}
2106+
}
2107+
return null;
2108+
}
20952109
}

src/main/java/io/confluent/connect/jdbc/dialect/MySqlDatabaseDialect.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -178,4 +178,9 @@ protected String sanitizedUrl(String url) {
178178
.replaceAll("(?i)([(,]password=)[^,)]*", "$1****")
179179
.replaceAll("(://[^:]*:)([^@]*)@", "$1****@");
180180
}
181+
182+
@Override
183+
public String resolveSynonym(Connection connection, String synonymName) throws SQLException {
184+
throw new SQLException("MySQL does not support synonyms. Please use views instead.");
185+
}
181186
}

src/main/java/io/confluent/connect/jdbc/dialect/OracleDatabaseDialect.java

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,9 @@
2727
import java.io.ByteArrayInputStream;
2828
import java.io.StringReader;
2929
import java.nio.ByteBuffer;
30+
import java.sql.Connection;
3031
import java.sql.PreparedStatement;
32+
import java.sql.ResultSet;
3133
import java.sql.SQLException;
3234
import java.sql.Types;
3335
import org.apache.kafka.common.config.AbstractConfig;
@@ -405,4 +407,20 @@ public TableId parseTableIdentifier(String fqn) {
405407
}
406408
return tableId;
407409
}
410+
411+
@Override
412+
public String resolveSynonym(Connection connection, String synonymName) throws SQLException {
413+
try (PreparedStatement stmt = connection.prepareStatement(
414+
"SELECT TABLE_OWNER, TABLE_NAME FROM ALL_SYNONYMS WHERE OWNER = ? AND "
415+
+ "SYNONYM_NAME = ?")) {
416+
String tableName = parseTableIdentifier(synonymName).tableName();
417+
stmt.setString(1, connection.getMetaData().getUserName().toUpperCase());
418+
stmt.setString(2, tableName.toUpperCase());
419+
ResultSet rs = stmt.executeQuery();
420+
if (rs.next()) {
421+
return rs.getString("TABLE_NAME");
422+
}
423+
}
424+
return null;
425+
}
408426
}

src/main/java/io/confluent/connect/jdbc/dialect/PostgreSqlDatabaseDialect.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -100,6 +100,12 @@ public PostgreSqlDatabaseDialect(AbstractConfig config) {
100100
super(config, new IdentifierRules(".", "\"", "\""));
101101
}
102102

103+
104+
@Override
105+
public String resolveSynonym(Connection connection, String synonymName) throws SQLException {
106+
throw new SQLException("PostgreSQL does not support synonyms. Please use views instead.");
107+
}
108+
103109
@Override
104110
public Connection getConnection() throws SQLException {
105111
Connection result = super.getConnection();

src/main/java/io/confluent/connect/jdbc/dialect/SapHanaDatabaseDialect.java

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,8 @@
2222
import org.apache.kafka.connect.data.Time;
2323
import org.apache.kafka.connect.data.Timestamp;
2424

25+
import java.sql.Connection;
26+
import java.sql.SQLException;
2527
import java.util.Collection;
2628
import java.util.Collections;
2729
import java.util.List;
@@ -166,4 +168,11 @@ public String buildUpsertQueryStatement(
166168
builder.append(" WITH PRIMARY KEY");
167169
return builder.toString();
168170
}
171+
172+
@Override
173+
public String resolveSynonym(Connection connection, String synonymName) throws SQLException {
174+
throw new SQLException(
175+
"Kafka JDBC Connector doesn't support Synonym Types on SAP Hana. "
176+
+ "Please use other table types instead.");
177+
}
169178
}

src/main/java/io/confluent/connect/jdbc/dialect/SqlServerDatabaseDialect.java

Lines changed: 68 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,13 +27,17 @@
2727
import org.slf4j.Logger;
2828
import org.slf4j.LoggerFactory;
2929

30+
import java.sql.Connection;
31+
import java.sql.DatabaseMetaData;
3032
import java.sql.PreparedStatement;
3133
import java.sql.ResultSet;
3234
import java.sql.ResultSetMetaData;
3335
import java.sql.SQLException;
36+
import java.sql.Statement;
3437
import java.sql.Types;
3538
import java.time.ZonedDateTime;
3639
import java.time.format.DateTimeFormatter;
40+
import java.util.ArrayList;
3741
import java.util.Calendar;
3842
import java.util.Collection;
3943
import java.util.Collections;
@@ -504,6 +508,54 @@ public void validateSpecificColumnTypes(
504508
verifiedSqlServerTimestamp = true;
505509
}
506510

511+
@Override
512+
public List<TableId> tableIds(Connection conn) throws SQLException {
513+
DatabaseMetaData metadata = conn.getMetaData();
514+
String[] tableTypes = tableTypes(metadata, this.tableTypes);
515+
String tableTypeDisplay = displayableTableTypes(tableTypes, ", ");
516+
log.debug("Using {} dialect to get {}", this, tableTypeDisplay);
517+
List<TableId> tableIds = new ArrayList<>();
518+
519+
// Get regular tables using metadata connection
520+
try (ResultSet rs = metadata.getTables(catalogPattern(), schemaPattern(), "%", tableTypes)) {
521+
while (rs.next()) {
522+
String catalogName = rs.getString(1);
523+
String schemaName = rs.getString(2);
524+
String tableName = rs.getString(3);
525+
TableId tableId = new TableId(catalogName, schemaName, tableName);
526+
if (includeTable(tableId)) {
527+
tableIds.add(tableId);
528+
}
529+
}
530+
}
531+
532+
// Get synonyms from sys.synonyms on a separate statement
533+
if (this.tableTypes.stream().anyMatch("SYNONYM"::equalsIgnoreCase)) {
534+
String synonymQuery =
535+
"SELECT "
536+
+ "DB_NAME() as catalog_name, "
537+
+ "SCHEMA_NAME(schema_id) as schema_name, "
538+
+ "name as synonym_name "
539+
+ "FROM sys.synonyms";
540+
541+
try (Statement stmt = conn.createStatement();
542+
ResultSet synonymRs = stmt.executeQuery(synonymQuery)) {
543+
while (synonymRs.next()) {
544+
String catalogName = synonymRs.getString(1);
545+
String schemaName = synonymRs.getString(2);
546+
String synonymName = synonymRs.getString(3);
547+
548+
TableId tableId = new TableId(catalogName, schemaName, synonymName);
549+
if (includeTable(tableId)) {
550+
tableIds.add(tableId);
551+
}
552+
}
553+
}
554+
}
555+
log.debug("Used {} dialect to find {} {}", this, tableIds.size(), tableTypeDisplay);
556+
return tableIds;
557+
}
558+
507559
@Override
508560
protected ColumnDefinition columnDefinition(
509561
ResultSet resultSet,
@@ -577,4 +629,20 @@ protected String sanitizedUrl(String url) {
577629
.replaceAll("(?i)(;keyStoreSecret=)[^;]*", "$1****")
578630
.replaceAll("(?i)(;gsscredential=)[^;]*", "$1****");
579631
}
632+
633+
@Override
634+
public String resolveSynonym(Connection connection, String synonymName) throws SQLException {
635+
try (PreparedStatement stmt =
636+
connection.prepareStatement(
637+
"SELECT PARSENAME(s.base_object_name, 1) AS TABLE_NAME "
638+
+ "FROM sys.synonyms s WHERE s.name = ?")) {
639+
String tableName = parseTableIdentifier(synonymName).tableName();
640+
stmt.setString(1, tableName.toUpperCase());
641+
ResultSet rs = stmt.executeQuery();
642+
if (rs.next()) {
643+
return rs.getString("TABLE_NAME");
644+
}
645+
}
646+
return null;
647+
}
580648
}

src/main/java/io/confluent/connect/jdbc/dialect/SqliteDatabaseDialect.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,8 @@
2222
import org.apache.kafka.connect.data.Time;
2323
import org.apache.kafka.connect.data.Timestamp;
2424

25+
import java.sql.Connection;
26+
import java.sql.SQLException;
2527
import java.util.ArrayList;
2628
import java.util.Collection;
2729
import java.util.Collections;
@@ -144,4 +146,9 @@ public String buildUpsertQueryStatement(
144146
protected String currentTimestampDatabaseQuery() {
145147
return "SELECT strftime('%Y-%m-%d %H:%M:%S.%f','now')";
146148
}
149+
150+
@Override
151+
public String resolveSynonym(Connection connection, String synonymName) throws SQLException {
152+
throw new SQLException("SQLite does not support synonyms. Please use views instead.");
153+
}
147154
}

0 commit comments

Comments
 (0)