From 707d2ce5c05e3b6ed0b31a89abe3831f664be978 Mon Sep 17 00:00:00 2001
From: Khoa Dang
Date: Mon, 28 Aug 2017 17:16:54 -0700
Subject: [PATCH 1/8] Enable travis test

---
 pom.xml | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/pom.xml b/pom.xml
index 0f57d3f6..72481c0f 100644
--- a/pom.xml
+++ b/pom.xml
@@ -244,7 +244,7 @@ limitations under the License.
 test - + test

From e0374d9167abf94c2d1806d228d1781c6e78f418 Mon Sep 17 00:00:00 2001
From: Khoa Dang
Date: Mon, 28 Aug 2017 17:46:29 -0700
Subject: [PATCH 2/8] Remove osx build and test

---
 .travis.yml | 2 --
 1 file changed, 2 deletions(-)

diff --git a/.travis.yml b/.travis.yml
index d806a39a..6fc37524 100644
--- a/.travis.yml
+++ b/.travis.yml
@@ -9,7 +9,5 @@ matrix:
   include:
     - os: linux
       jdk: oraclejdk8
-    - os: osx
-      osx_image: xcode8
 script:
   - mvn clean package -DargLine="-DCosmosDBEndpoint=$CosmosDBEndpoint -DCosmosDBKey=$CosmosDBKey"

From 8b46ee5999da341555ff1cfe15d7dcc3ced03a8f Mon Sep 17 00:00:00 2001
From: Khoa Dang
Date: Mon, 28 Aug 2017 18:09:58 -0700
Subject: [PATCH 3/8] Simplify the build

---
 pom.xml | 36 +++---------------------------------
 1 file changed, 3 insertions(+), 33 deletions(-)

diff --git a/pom.xml b/pom.xml
index 72481c0f..787212ea 100644
--- a/pom.xml
+++ b/pom.xml
@@ -125,42 +125,19 @@ limitations under the License.
 ./src/test/scala + maven-assembly-plugin jar-with-dependencies - - - make-assembly package - single - - - + org.apache.maven.plugins maven-dependency-plugin 2.8 - - - copy-dependencies package - copy-dependencies - - - ${project.build.directory}/alternateLocation false false true ::* - - - org.apache.maven.plugins
@@ -219,17 +196,10 @@ limitations under the License.
 + org.apache.maven.plugins maven-source-plugin 2.2.1 - - - attach-sources - - jar-no-fork - - - org.scalatest

From 0a6a1c9cb061358bc785e60c1775bbef58c13404 Mon Sep 17 00:00:00 2001
From: Khoa Dang
Date: Mon, 28 Aug 2017 18:24:22 -0700
Subject: [PATCH 4/8] Debug only: add change feed logging

---
 .../microsoft/azure/cosmosdb/spark/CosmosDBConnection.scala | 3 +++
 1 file changed, 3 insertions(+)

diff --git a/src/main/scala/com/microsoft/azure/cosmosdb/spark/CosmosDBConnection.scala b/src/main/scala/com/microsoft/azure/cosmosdb/spark/CosmosDBConnection.scala
index af769ab2..b381f1fd 100644
--- a/src/main/scala/com/microsoft/azure/cosmosdb/spark/CosmosDBConnection.scala
+++ b/src/main/scala/com/microsoft/azure/cosmosdb/spark/CosmosDBConnection.scala
@@ -27,6 +27,7 @@ import com.microsoft.azure.cosmosdb.spark.config._
 import com.microsoft.azure.documentdb._
 import com.microsoft.azure.documentdb.internal._
 import com.microsoft.azure.documentdb.rx._
+import org.codehaus.jackson.map.ObjectMapper

 import scala.collection.JavaConversions._
 import scala.collection.mutable.ListBuffer
@@ -162,6 +163,8 @@ private[spark] case class CosmosDBConnection(config: Config) extends LoggingTrai
     while (feedResponse.getQueryIterator.hasNext) {
       cfDocuments.addAll(feedResponse.getQueryIterable.fetchNextBlock())
     }
+    val objectMapper = new ObjectMapper()
+    logInfo(s"change feed (partition=${changeFeedOptions.getPartitionKeyRangeId}, token=${changeFeedOptions.getRequestContinuation}): documents with id: ${objectMapper.writeValueAsString(cfDocuments.map(x => x.getId).toArray)}")
     Tuple2.apply(cfDocuments.iterator(), feedResponse.getResponseContinuation)
   }

From e94256dd940d6edb9eb643d9fd7382d14fe39ab6 Mon Sep 17 00:00:00 2001
From: Khoa Dang
Date: Tue, 29 Aug 2017 11:40:24 -0700
Subject: [PATCH 5/8] Adjust the unit streaming tests

---
 .../azure/cosmosdb/spark/schema/CosmosDBDataFrameSpec.scala | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/src/test/scala/com/microsoft/azure/cosmosdb/spark/schema/CosmosDBDataFrameSpec.scala b/src/test/scala/com/microsoft/azure/cosmosdb/spark/schema/CosmosDBDataFrameSpec.scala
index f73f869d..1f36299c 100644
--- a/src/test/scala/com/microsoft/azure/cosmosdb/spark/schema/CosmosDBDataFrameSpec.scala
+++ b/src/test/scala/com/microsoft/azure/cosmosdb/spark/schema/CosmosDBDataFrameSpec.scala
@@ -580,7 +580,7 @@ class CosmosDBDataFrameSpec extends RequiresCosmosDB {
     val streamingGapMs = TimeUnit.SECONDS.toMillis(10)
     val insertIntervalMs = TimeUnit.SECONDS.toMillis(1) / 2
     // There is a delay from starting the writing to the stream to the first data being written
-    val streamingSinkDelayMs = TimeUnit.SECONDS.toMillis(7)
+    val streamingSinkDelayMs = TimeUnit.SECONDS.toMillis(8)
     val insertIterations: Int = ((streamingGapMs * 2 + streamingTimeMs) / insertIntervalMs).toInt
     val cfCheckpointPath = "./changefeedcheckpoint"

From 7570dcbc44533b4a17d08785ed0ca9540d1da638 Mon Sep 17 00:00:00 2001
From: Khoa Dang
Date: Tue, 29 Aug 2017 12:19:01 -0700
Subject: [PATCH 6/8] Refactor tests

---
 .../cosmosdb/spark/CosmosDBConnection.scala    |  2 +-
 .../azure/cosmosdb/spark/util/HdfsUtils.scala  |  2 +-
 src/test/resources/log4j.properties            |  1 +
 .../cosmosdb/spark/CosmosDBDefaults.scala      | 24 +++++++++++++++----
 .../cosmosdb/spark/RequiresCosmosDB.scala      |  4 ++++
 .../spark/schema/CosmosDBDataFrameSpec.scala   |  3 ++-
 6 files changed, 28 insertions(+), 8 deletions(-)

diff --git a/src/main/scala/com/microsoft/azure/cosmosdb/spark/CosmosDBConnection.scala b/src/main/scala/com/microsoft/azure/cosmosdb/spark/CosmosDBConnection.scala
index b381f1fd..d9ecd9a9 100644
--- a/src/main/scala/com/microsoft/azure/cosmosdb/spark/CosmosDBConnection.scala
+++ b/src/main/scala/com/microsoft/azure/cosmosdb/spark/CosmosDBConnection.scala
@@ -164,7 +164,7 @@ private[spark] case class CosmosDBConnection(config: Config) extends LoggingTrai
       cfDocuments.addAll(feedResponse.getQueryIterable.fetchNextBlock())
     }
     val objectMapper = new ObjectMapper()
-    logInfo(s"change feed (partition=${changeFeedOptions.getPartitionKeyRangeId}, token=${changeFeedOptions.getRequestContinuation}): documents with id: ${objectMapper.writeValueAsString(cfDocuments.map(x => x.getId).toArray)}")
+    logDebug(s"change feed (partition=${changeFeedOptions.getPartitionKeyRangeId}, token=${changeFeedOptions.getRequestContinuation}): documents with id: ${objectMapper.writeValueAsString(cfDocuments.map(x => x.getId).toArray)}")
     Tuple2.apply(cfDocuments.iterator(), feedResponse.getResponseContinuation)
   }
diff --git a/src/main/scala/com/microsoft/azure/cosmosdb/spark/util/HdfsUtils.scala b/src/main/scala/com/microsoft/azure/cosmosdb/spark/util/HdfsUtils.scala
index 2501689a..3d0d8fe8 100644
--- a/src/main/scala/com/microsoft/azure/cosmosdb/spark/util/HdfsUtils.scala
+++ b/src/main/scala/com/microsoft/azure/cosmosdb/spark/util/HdfsUtils.scala
@@ -45,7 +45,7 @@ case class HdfsUtils(configMap: Map[String, String]) extends LoggingTrait {
     val os = fs.create(path)
     os.writeUTF(content)
     os.close()
-    logInfo(s"Write $content for $path")
+    logDebug(s"Write $content for $path")
   }

   def read(base: String, filePath: String): String = {
diff --git a/src/test/resources/log4j.properties b/src/test/resources/log4j.properties
index 577f6f91..9d0b6200 100644
--- a/src/test/resources/log4j.properties
+++ b/src/test/resources/log4j.properties
@@ -9,6 +9,7 @@
 log4j.category.org.apache.http.wire=INFO
 log4j.category.org.apache.http.headers=INFO
 log4j.category.com.microsoft.azure.documentdb=INFO
+log4j.category.com.microsoft.azure.cosmosdb.spark=DEBUG

 # A1 is set to be a ConsoleAppender.
 log4j.appender.A1=org.apache.log4j.ConsoleAppender
diff --git a/src/test/scala/com/microsoft/azure/cosmosdb/spark/CosmosDBDefaults.scala b/src/test/scala/com/microsoft/azure/cosmosdb/spark/CosmosDBDefaults.scala
index 70ba84b2..8cfe1643 100644
--- a/src/test/scala/com/microsoft/azure/cosmosdb/spark/CosmosDBDefaults.scala
+++ b/src/test/scala/com/microsoft/azure/cosmosdb/spark/CosmosDBDefaults.scala
@@ -35,8 +35,22 @@ object CosmosDBDefaults {

 class CosmosDBDefaults extends LoggingTrait {

-  val CosmosDBEndpoint: String = System.getProperty("CosmosDBEndpoint")
-  val CosmosDBKey: String = System.getProperty("CosmosDBKey")
+  val CosmosDBEndpoint: String = {
+    val cosmosDBEndpoint = "CosmosDBEndpoint"
+    val endpoint = System.getProperty(cosmosDBEndpoint)
+    if (endpoint != null)
+      endpoint
+    else
+      System.getenv(cosmosDBEndpoint)
+  }
+  val CosmosDBKey: String = {
+    val cosmosDBKey = "CosmosDBKey"
+    val key = System.getProperty(cosmosDBKey)
+    if (key != null)
+      key
+    else
+      System.getenv(cosmosDBKey)
+  }

   val DatabaseName = "cosmosdb-spark-connector-test"
   val PartitionKeyName = "pkey"
@@ -78,7 +92,7 @@ class CosmosDBDefaults extends LoggingTrait {
       documentDBClient.createDatabase(database, null)
       logInfo(s"Created collection with Id ${database.getId}")
     } catch {
-      case NonFatal(e) => logError(s"Failed to create database '$databaseName'", e)
+      case NonFatal(e) => logWarning(s"Failed to create database '$databaseName'", e)
     }
   }
@@ -88,7 +102,7 @@ class CosmosDBDefaults extends LoggingTrait {
       documentDBClient.deleteDatabase(databaseLink, null)
       logInfo(s"Deleted collection with link '$databaseLink'")
     } catch {
-      case NonFatal(e) => logError(s"Failed to delete database '$databaseLink'", e)
+      case NonFatal(e) => logWarning(s"Failed to delete database '$databaseLink'", e)
     }
   }
@@ -125,7 +139,7 @@ class CosmosDBDefaults extends LoggingTrait {
       documentDBClient.deleteCollection(collectionLink, null)
       logInfo(s"Deleted collection with link '$collectionLink'")
     } catch {
-      case NonFatal(e) => logError(s"Failed to delete collection '$collectionLink'", e)
+      case NonFatal(e) => logWarning(s"Failed to delete collection '$collectionLink'", e)
     }
   }
diff --git a/src/test/scala/com/microsoft/azure/cosmosdb/spark/RequiresCosmosDB.scala b/src/test/scala/com/microsoft/azure/cosmosdb/spark/RequiresCosmosDB.scala
index 85b985ca..c6c87ed5 100644
--- a/src/test/scala/com/microsoft/azure/cosmosdb/spark/RequiresCosmosDB.scala
+++ b/src/test/scala/com/microsoft/azure/cosmosdb/spark/RequiresCosmosDB.scala
@@ -72,6 +72,10 @@ trait RequiresCosmosDB extends FlatSpecLike with Matchers with BeforeAndAfterAll
     cosmosDBDefaults.createDatabase(cosmosDBDefaults.DatabaseName)
   }

+  override def afterAll(): Unit = {
+    cosmosDBDefaults.deleteDatabase(cosmosDBDefaults.DatabaseName)
+  }
+
   override def beforeEach(): Unit = {
     // this is run first
     // most of the initializations needed are moved to withFixture to use the test name
diff --git a/src/test/scala/com/microsoft/azure/cosmosdb/spark/schema/CosmosDBDataFrameSpec.scala b/src/test/scala/com/microsoft/azure/cosmosdb/spark/schema/CosmosDBDataFrameSpec.scala
index 1f36299c..28b295a3 100644
--- a/src/test/scala/com/microsoft/azure/cosmosdb/spark/schema/CosmosDBDataFrameSpec.scala
+++ b/src/test/scala/com/microsoft/azure/cosmosdb/spark/schema/CosmosDBDataFrameSpec.scala
@@ -623,7 +623,6 @@ class CosmosDBDataFrameSpec extends RequiresCosmosDB {
         newDoc.set(cosmosDBDefaults.PartitionKeyName, i)
         newDoc.set("content", s"sample content for document with ID $i")
         documentClient.createDocument(sourceCollectionLink, newDoc, null, true)
-        logInfo(s"Created document with ID $i")
         TimeUnit.MILLISECONDS.sleep(insertIntervalMs)
       })
       docIdIndex = docIdIndex + insertIterations
@@ -759,6 +758,8 @@ class CosmosDBDataFrameSpec extends RequiresCosmosDB {
     streamingQuery.stop()

     CosmosDBRDDIterator.resetCollectionContinuationTokens()
+
+    cosmosDBDefaults.deleteCollection(databaseName, sinkCollection)
   }

   it should "work with a slow source" in withSparkSession() { spark =>

From 3ba00eef57b7e5a380c369a6e9253e9ca4f3cda7 Mon Sep 17 00:00:00 2001
From: Khoa Dang
Date: Tue, 29 Aug 2017 12:43:15 -0700
Subject: [PATCH 7/8] Delete db before creating it

---
 .../microsoft/azure/cosmosdb/spark/RequiresCosmosDB.scala | 5 +----
 1 file changed, 1 insertion(+), 4 deletions(-)

diff --git a/src/test/scala/com/microsoft/azure/cosmosdb/spark/RequiresCosmosDB.scala b/src/test/scala/com/microsoft/azure/cosmosdb/spark/RequiresCosmosDB.scala
index c6c87ed5..b27d5093 100644
--- a/src/test/scala/com/microsoft/azure/cosmosdb/spark/RequiresCosmosDB.scala
+++ b/src/test/scala/com/microsoft/azure/cosmosdb/spark/RequiresCosmosDB.scala
@@ -69,11 +69,8 @@ trait RequiresCosmosDB extends FlatSpecLike with Matchers with BeforeAndAfterAll
     // if running against localhost emulator
     HttpClientFactory.DISABLE_HOST_NAME_VERIFICATION = true

-    cosmosDBDefaults.createDatabase(cosmosDBDefaults.DatabaseName)
-  }
-
-  override def afterAll(): Unit = {
     cosmosDBDefaults.deleteDatabase(cosmosDBDefaults.DatabaseName)
+    cosmosDBDefaults.createDatabase(cosmosDBDefaults.DatabaseName)
   }

   override def beforeEach(): Unit = {

From d4e09045d9a262d8c0add576fb7698b46d24808c Mon Sep 17 00:00:00 2001
From: Khoa Dang
Date: Tue, 29 Aug 2017 13:04:59 -0700
Subject: [PATCH 8/8] Not deleting the database

---
 .../com/microsoft/azure/cosmosdb/spark/RequiresCosmosDB.scala | 3 +--
 1 file changed, 1 insertion(+), 2 deletions(-)

diff --git a/src/test/scala/com/microsoft/azure/cosmosdb/spark/RequiresCosmosDB.scala b/src/test/scala/com/microsoft/azure/cosmosdb/spark/RequiresCosmosDB.scala
index b27d5093..28481b22 100644
--- a/src/test/scala/com/microsoft/azure/cosmosdb/spark/RequiresCosmosDB.scala
+++ b/src/test/scala/com/microsoft/azure/cosmosdb/spark/RequiresCosmosDB.scala
@@ -68,8 +68,7 @@ trait RequiresCosmosDB extends FlatSpecLike with Matchers with BeforeAndAfterAll
   override def beforeAll(): Unit = {
     // if running against localhost emulator
     HttpClientFactory.DISABLE_HOST_NAME_VERIFICATION = true
-
-    cosmosDBDefaults.deleteDatabase(cosmosDBDefaults.DatabaseName)
+
     cosmosDBDefaults.createDatabase(cosmosDBDefaults.DatabaseName)
   }