diff --git a/build.sbt b/build.sbt index fea5abf..1bf9ad1 100644 --- a/build.sbt +++ b/build.sbt @@ -5,7 +5,7 @@ lazy val `kafka-jdbc-connector` = (project in file(".")) .settings( name := "kafka-jdbc-connector", - version := "1.2.0", + version := "1.2.0-SNAPSHOT", organization := "com.agoda", scalaVersion := "2.11.7", crossScalaVersions := Seq("2.11.7", "2.12.2"), diff --git a/project/Dependencies.scala b/project/Dependencies.scala index 1da3fca..fb85794 100644 --- a/project/Dependencies.scala +++ b/project/Dependencies.scala @@ -2,7 +2,6 @@ import sbt._ object Dependencies { private val ScalaTestV = "3.0.1" - private val LogBack = "ch.qos.logback" % "logback-classic" % "1.2.3" private val ScalaLogging = "com.typesafe.scala-logging" %% "scala-logging" % "3.5.0" private val KafkaConnectApi = "org.apache.kafka" % "connect-api" % "0.9.0.0" diff --git a/src/main/scala/com/agoda/kafka/connector/jdbc/JdbcSourceConnector.scala b/src/main/scala/com/agoda/kafka/connector/jdbc/JdbcSourceConnector.scala index 0a19760..37683a3 100644 --- a/src/main/scala/com/agoda/kafka/connector/jdbc/JdbcSourceConnector.scala +++ b/src/main/scala/com/agoda/kafka/connector/jdbc/JdbcSourceConnector.scala @@ -12,7 +12,7 @@ import scala.util.{Failure, Success, Try} class JdbcSourceConnector extends SourceConnector { private val logger = LoggerFactory.getLogger(classOf[JdbcSourceConnector]) - + Class.forName("org.postgresql.Driver") private var config: JdbcSourceConnectorConfig = _ /** diff --git a/src/main/scala/com/agoda/kafka/connector/jdbc/models/DatabaseProduct.scala b/src/main/scala/com/agoda/kafka/connector/jdbc/models/DatabaseProduct.scala index 3d09de8..f9cd4fc 100644 --- a/src/main/scala/com/agoda/kafka/connector/jdbc/models/DatabaseProduct.scala +++ b/src/main/scala/com/agoda/kafka/connector/jdbc/models/DatabaseProduct.scala @@ -12,6 +12,7 @@ import scala.collection.immutable.IndexedSeq * * MySQL :: MySQL Server. * + * PostgreSQL :: PostgreSQL Server. */ sealed abstract class DatabaseProduct(override val entryName: String) extends EnumEntry @@ -21,4 +22,5 @@ object DatabaseProduct extends Enum[DatabaseProduct] { case object MsSQL extends DatabaseProduct("Microsoft SQL Server") case object MySQL extends DatabaseProduct("MySQL") + case object PostgreSQL extends DatabaseProduct("PostgreSQL") } \ No newline at end of file diff --git a/src/main/scala/com/agoda/kafka/connector/jdbc/services/IdBasedDataService.scala b/src/main/scala/com/agoda/kafka/connector/jdbc/services/IdBasedDataService.scala index 7601b2d..5de3fa1 100644 --- a/src/main/scala/com/agoda/kafka/connector/jdbc/services/IdBasedDataService.scala +++ b/src/main/scala/com/agoda/kafka/connector/jdbc/services/IdBasedDataService.scala @@ -5,7 +5,7 @@ import java.sql.{Connection, PreparedStatement, ResultSet} import com.agoda.kafka.connector.jdbc.JdbcSourceConnectorConstants import com.agoda.kafka.connector.jdbc.models.DatabaseProduct -import com.agoda.kafka.connector.jdbc.models.DatabaseProduct.{MsSQL, MySQL} +import com.agoda.kafka.connector.jdbc.models.DatabaseProduct.{MsSQL, MySQL, PostgreSQL} import com.agoda.kafka.connector.jdbc.models.Mode.IncrementingMode import com.agoda.kafka.connector.jdbc.utils.DataConverter import org.apache.kafka.connect.data.Schema @@ -44,8 +44,9 @@ case class IdBasedDataService(databaseProduct: DatabaseProduct, override def createPreparedStatement(connection: Connection): Try[PreparedStatement] = Try { val preparedStatement = databaseProduct match { - case MsSQL => connection.prepareStatement(s"EXECUTE $storedProcedureName @$incrementingVariableName = ?, @$batchSizeVariableName = ?") - case MySQL => connection.prepareStatement(s"CALL $storedProcedureName (@$incrementingVariableName := ?, @$batchSizeVariableName := ?)") + case MsSQL => connection.prepareStatement(s"EXECUTE $storedProcedureName @$incrementingVariableName = ?, @$batchSizeVariableName = ?") + case MySQL => connection.prepareStatement(s"CALL $storedProcedureName (@$incrementingVariableName := ?, @$batchSizeVariableName := ?)") + case PostgreSQL => connection.prepareStatement(s"SELECT * from $storedProcedureName (?, ?)") } preparedStatement.setObject(1, incrementingOffset) preparedStatement.setObject(2, batchSize) diff --git a/src/main/scala/com/agoda/kafka/connector/jdbc/services/TimeBasedDataService.scala b/src/main/scala/com/agoda/kafka/connector/jdbc/services/TimeBasedDataService.scala index 48976cc..133b6f0 100644 --- a/src/main/scala/com/agoda/kafka/connector/jdbc/services/TimeBasedDataService.scala +++ b/src/main/scala/com/agoda/kafka/connector/jdbc/services/TimeBasedDataService.scala @@ -5,7 +5,7 @@ import java.util.{Date, GregorianCalendar, TimeZone} import com.agoda.kafka.connector.jdbc.JdbcSourceConnectorConstants import com.agoda.kafka.connector.jdbc.models.DatabaseProduct -import com.agoda.kafka.connector.jdbc.models.DatabaseProduct.{MsSQL, MySQL} +import com.agoda.kafka.connector.jdbc.models.DatabaseProduct.{MsSQL, MySQL, PostgreSQL} import com.agoda.kafka.connector.jdbc.models.Mode.TimestampMode import com.agoda.kafka.connector.jdbc.utils.DataConverter import org.apache.kafka.connect.data.Schema @@ -43,8 +43,9 @@ case class TimeBasedDataService(databaseProduct: DatabaseProduct, override def createPreparedStatement(connection: Connection): Try[PreparedStatement] = Try { val preparedStatement = databaseProduct match { - case MsSQL => connection.prepareStatement(s"EXECUTE $storedProcedureName @$timestampVariableName = ?, @$batchSizeVariableName = ?") - case MySQL => connection.prepareStatement(s"CALL $storedProcedureName (@$timestampVariableName := ?, @$batchSizeVariableName := ?)") + case MsSQL => connection.prepareStatement(s"EXECUTE $storedProcedureName @$timestampVariableName = ?, @$batchSizeVariableName = ?") + case MySQL => connection.prepareStatement(s"CALL $storedProcedureName (@$timestampVariableName := ?, @$batchSizeVariableName := ?)") + case PostgreSQL => connection.prepareStatement(s"SELECT * from $storedProcedureName (?, ?)") } preparedStatement.setTimestamp(1, new Timestamp(timestampOffset), calendar) preparedStatement.setObject(2, batchSize) diff --git a/src/main/scala/com/agoda/kafka/connector/jdbc/services/TimeIdBasedDataService.scala b/src/main/scala/com/agoda/kafka/connector/jdbc/services/TimeIdBasedDataService.scala index 4e143dc..c9220e9 100644 --- a/src/main/scala/com/agoda/kafka/connector/jdbc/services/TimeIdBasedDataService.scala +++ b/src/main/scala/com/agoda/kafka/connector/jdbc/services/TimeIdBasedDataService.scala @@ -6,7 +6,7 @@ import java.util.{Date, GregorianCalendar, TimeZone} import com.agoda.kafka.connector.jdbc.JdbcSourceConnectorConstants import com.agoda.kafka.connector.jdbc.models.DatabaseProduct -import com.agoda.kafka.connector.jdbc.models.DatabaseProduct.{MsSQL, MySQL} +import com.agoda.kafka.connector.jdbc.models.DatabaseProduct.{MsSQL, MySQL, PostgreSQL} import com.agoda.kafka.connector.jdbc.models.Mode.{IncrementingMode, TimestampMode} import com.agoda.kafka.connector.jdbc.utils.DataConverter import org.apache.kafka.connect.data.Schema @@ -53,8 +53,9 @@ case class TimeIdBasedDataService(databaseProduct: DatabaseProduct, override def createPreparedStatement(connection: Connection): Try[PreparedStatement] = Try { val preparedStatement = databaseProduct match { - case MsSQL => connection.prepareStatement(s"EXECUTE $storedProcedureName @$timestampVariableName = ?, @$incrementingVariableName = ?, @$batchSizeVariableName = ?") - case MySQL => connection.prepareStatement(s"CALL $storedProcedureName (@$timestampVariableName := ?, @$incrementingVariableName := ?, @$batchSizeVariableName := ?)") + case MsSQL => connection.prepareStatement(s"EXECUTE $storedProcedureName @$timestampVariableName = ?, @$incrementingVariableName = ?, @$batchSizeVariableName = ?") + case MySQL => connection.prepareStatement(s"CALL $storedProcedureName (@$timestampVariableName := ?, @$incrementingVariableName := ?, @$batchSizeVariableName := ?)") + case PostgreSQL => connection.prepareStatement(s"SELECT * from $storedProcedureName (?, ?, ?)") } preparedStatement.setTimestamp(1, new Timestamp(timestampOffset), calendar) preparedStatement.setObject(2, incrementingOffset) diff --git a/src/test/scala/com/agoda/kafka/connector/jdbc/models/DatabaseProductTest.scala b/src/test/scala/com/agoda/kafka/connector/jdbc/models/DatabaseProductTest.scala index 4775f1a..9865b36 100644 --- a/src/test/scala/com/agoda/kafka/connector/jdbc/models/DatabaseProductTest.scala +++ b/src/test/scala/com/agoda/kafka/connector/jdbc/models/DatabaseProductTest.scala @@ -1,19 +1,25 @@ package com.agoda.kafka.connector.jdbc.models -import com.agoda.kafka.connector.jdbc.models.DatabaseProduct.{MsSQL, MySQL} +import com.agoda.kafka.connector.jdbc.models.DatabaseProduct.{ + MsSQL, + MySQL, + PostgreSQL +} import org.scalatest.{Matchers, WordSpec} -class DatabaseProductTest extends WordSpec with Matchers { +class DatabaseProductTest extends WordSpec with Matchers { "module" should { "convert DatabaseProduct to its string representation" in { DatabaseProduct.MySQL.entryName shouldEqual "MySQL" DatabaseProduct.MsSQL.entryName shouldEqual "Microsoft SQL Server" + DatabaseProduct.PostgreSQL.entryName shouldEqual "PostgreSQL" } "convert string to corresponding DatabaseProduct representation" in { DatabaseProduct.withName("MySQL") shouldBe MySQL DatabaseProduct.withName("Microsoft SQL Server") shouldBe MsSQL + DatabaseProduct.withName("PostgreSQL") shouldBe PostgreSQL } } } diff --git a/src/test/scala/com/agoda/kafka/connector/jdbc/services/IdBasedDataServiceTest.scala b/src/test/scala/com/agoda/kafka/connector/jdbc/services/IdBasedDataServiceTest.scala index 8a675f5..11a8895 100644 --- a/src/test/scala/com/agoda/kafka/connector/jdbc/services/IdBasedDataServiceTest.scala +++ b/src/test/scala/com/agoda/kafka/connector/jdbc/services/IdBasedDataServiceTest.scala @@ -3,7 +3,7 @@ package com.agoda.kafka.connector.jdbc.services import java.sql.{Connection, PreparedStatement, ResultSet} import com.agoda.kafka.connector.jdbc.JdbcSourceConnectorConstants -import com.agoda.kafka.connector.jdbc.models.DatabaseProduct.{MsSQL, MySQL} +import com.agoda.kafka.connector.jdbc.models.DatabaseProduct.{MsSQL, MySQL, PostgreSQL} import com.agoda.kafka.connector.jdbc.models.Mode.IncrementingMode import com.agoda.kafka.connector.jdbc.utils.DataConverter import org.apache.kafka.connect.data.{Field, Schema, Struct} @@ -50,6 +50,20 @@ class IdBasedDataServiceTest extends WordSpec with Matchers with MockitoSugar { dataConverter = dataConverter ) + val idBasedDataServicePostgreSQL = + IdBasedDataService( + databaseProduct = PostgreSQL, + storedProcedureName = "stored-procedure", + batchSize = 100, + batchSizeVariableName = "batch-size-variable", + incrementingVariableName = "incrementing-variable", + incrementingOffset = 0L, + incrementingFieldName = "id", + topic = "id-based-data-topic", + keyFieldOpt = None, + dataConverter = dataConverter + ) + "create correct prepared statement for Mssql" in { val connection = mock[Connection] val statement = mock[PreparedStatement] @@ -80,6 +94,21 @@ class IdBasedDataServiceTest extends WordSpec with Matchers with MockitoSugar { verify(statement).setObject(2, 100) } + "create correct prepared statement for PostgreSQL" in { + val connection = mock[Connection] + val statement = mock[PreparedStatement] + + when(connection.prepareStatement("SELECT * from stored-procedure (?, ?)")).thenReturn(statement) + doNothing().when(statement).setObject(1, 0L) + doNothing().when(statement).setObject(2, 100) + + idBasedDataServicePostgreSQL.createPreparedStatement(connection) + + verify(connection).prepareStatement("SELECT * from stored-procedure (?, ?)") + verify(statement).setObject(1, 0L) + verify(statement).setObject(2, 100) + } + "create correct string representation" in { idBasedDataServiceMssql.toString shouldBe s""" diff --git a/src/test/scala/com/agoda/kafka/connector/jdbc/services/TimeBasedDataServiceTest.scala b/src/test/scala/com/agoda/kafka/connector/jdbc/services/TimeBasedDataServiceTest.scala index b6df14c..af2ff1a 100644 --- a/src/test/scala/com/agoda/kafka/connector/jdbc/services/TimeBasedDataServiceTest.scala +++ b/src/test/scala/com/agoda/kafka/connector/jdbc/services/TimeBasedDataServiceTest.scala @@ -4,7 +4,7 @@ import java.sql._ import java.util.{GregorianCalendar, TimeZone} import com.agoda.kafka.connector.jdbc.JdbcSourceConnectorConstants -import com.agoda.kafka.connector.jdbc.models.DatabaseProduct.{MsSQL, MySQL} +import com.agoda.kafka.connector.jdbc.models.DatabaseProduct.{MsSQL, MySQL, PostgreSQL} import com.agoda.kafka.connector.jdbc.models.Mode.{IncrementingMode, TimestampMode} import com.agoda.kafka.connector.jdbc.utils.DataConverter import org.apache.kafka.connect.data.{Field, Schema, Struct} @@ -54,6 +54,21 @@ class TimeBasedDataServiceTest extends WordSpec with Matchers with MockitoSugar calendar = UTC_Calendar ) + val timeBasedDataServicePostgreSQL = + TimeBasedDataService( + databaseProduct = PostgreSQL, + storedProcedureName = "stored-procedure", + batchSize = 100, + batchSizeVariableName = "batch-size-variable", + timestampVariableName = "timestamp-variable", + timestampOffset = 0L, + timestampFieldName = "time", + topic = "time-based-data-topic", + keyFieldOpt = None, + dataConverter = dataConverter, + calendar = UTC_Calendar + ) + val timestamp = new Timestamp(0L) "create correct prepared statement for Mssql" in { @@ -86,6 +101,21 @@ class TimeBasedDataServiceTest extends WordSpec with Matchers with MockitoSugar verify(statement).setObject(2, 100) } + "create correct prepared statement for PostgreSQL" in { + val connection = mock[Connection] + val statement = mock[PreparedStatement] + + when(connection.prepareStatement("SELECT * from stored-procedure (?, ?)")).thenReturn(statement) + doNothing().when(statement).setTimestamp(1, timestamp, UTC_Calendar) + doNothing().when(statement).setObject(2, 100) + + timeBasedDataServicePostgreSQL.createPreparedStatement(connection) + + verify(connection).prepareStatement("SELECT * from stored-procedure (?, ?)") + verify(statement).setTimestamp(1, timestamp, UTC_Calendar) + verify(statement).setObject(2, 100) + } + "create correct string representation" in { timeBasedDataServiceMssql.toString shouldBe s""" diff --git a/src/test/scala/com/agoda/kafka/connector/jdbc/services/TimeIdBasedDataServiceTest.scala b/src/test/scala/com/agoda/kafka/connector/jdbc/services/TimeIdBasedDataServiceTest.scala index d424b08..9b3c447 100644 --- a/src/test/scala/com/agoda/kafka/connector/jdbc/services/TimeIdBasedDataServiceTest.scala +++ b/src/test/scala/com/agoda/kafka/connector/jdbc/services/TimeIdBasedDataServiceTest.scala @@ -4,7 +4,7 @@ import java.sql._ import java.util.{GregorianCalendar, TimeZone} import com.agoda.kafka.connector.jdbc.JdbcSourceConnectorConstants -import com.agoda.kafka.connector.jdbc.models.DatabaseProduct.{MsSQL, MySQL} +import com.agoda.kafka.connector.jdbc.models.DatabaseProduct.{MsSQL, MySQL, PostgreSQL} import com.agoda.kafka.connector.jdbc.models.Mode.{IncrementingMode, TimestampMode} import com.agoda.kafka.connector.jdbc.utils.DataConverter import org.apache.kafka.connect.data.{Field, Schema, Struct} @@ -61,6 +61,24 @@ class TimeIdBasedDataServiceTest extends WordSpec with Matchers with MockitoSuga calendar = UTC_CALENDAR ) + val timeIdBasedDataServicePostgresql = + TimeIdBasedDataService( + databaseProduct = PostgreSQL, + storedProcedureName = "stored-procedure", + batchSize = 100, + batchSizeVariableName = "batch-size-variable", + timestampVariableName = "timestamp-variable", + timestampOffset = 0L, + timestampFieldName = "time", + incrementingVariableName = "incrementing-variable", + incrementingOffset = 0L, + incrementingFieldName = "id", + topic = "time-id-based-data-topic", + keyFieldOpt = None, + dataConverter = dataConverter, + calendar = UTC_CALENDAR + ) + val timestamp = new Timestamp(0L) "create correct prepared statement for Mssql" in { @@ -97,6 +115,23 @@ class TimeIdBasedDataServiceTest extends WordSpec with Matchers with MockitoSuga verify(statement).setObject(3, 100) } + "create correct prepared statement for PostgreSQL" in { + val connection = mock[Connection] + val statement = mock[PreparedStatement] + + when(connection.prepareStatement("SELECT * from stored-procedure (?, ?, ?)")).thenReturn(statement) + doNothing().when(statement).setTimestamp(1, timestamp, UTC_CALENDAR) + doNothing().when(statement).setObject(2, 0L) + doNothing().when(statement).setObject(3, 100) + + timeIdBasedDataServicePostgresql.createPreparedStatement(connection) + + verify(connection).prepareStatement("SELECT * from stored-procedure (?, ?, ?)") + verify(statement).setTimestamp(1, timestamp, UTC_CALENDAR) + verify(statement).setObject(2, 0L) + verify(statement).setObject(3, 100) + } + "create correct string representation" in { timeIdBasedDataServiceMssql.toString shouldBe s"""