From dea341d60a2fd6c242ddcb8c93f835fda9a24d93 Mon Sep 17 00:00:00 2001 From: Vincenzo Verrone Date: Thu, 16 Apr 2020 02:48:54 +0200 Subject: [PATCH 1/3] Resolves #363 --- .../spark/AsyncCosmosDBConnection.scala | 25 +++++++++++--- .../cosmosdb/spark/CosmosDBConnection.scala | 33 ++++++++++++++----- .../cosmosdb/spark/SparkTokenResolver.scala | 7 ++++ .../spark/config/CosmosDBConfig.scala | 1 + .../cosmosdb/spark/util/CosmosUtils.scala | 12 +++++++ 5 files changed, 65 insertions(+), 13 deletions(-) create mode 100644 src/main/scala/com/microsoft/azure/cosmosdb/spark/SparkTokenResolver.scala create mode 100644 src/main/scala/com/microsoft/azure/cosmosdb/spark/util/CosmosUtils.scala diff --git a/src/main/scala/com/microsoft/azure/cosmosdb/spark/AsyncCosmosDBConnection.scala b/src/main/scala/com/microsoft/azure/cosmosdb/spark/AsyncCosmosDBConnection.scala index 2955f830..bd463809 100644 --- a/src/main/scala/com/microsoft/azure/cosmosdb/spark/AsyncCosmosDBConnection.scala +++ b/src/main/scala/com/microsoft/azure/cosmosdb/spark/AsyncCosmosDBConnection.scala @@ -31,19 +31,20 @@ import com.microsoft.azure.cosmosdb._ import com.microsoft.azure.cosmosdb.internal._ import com.microsoft.azure.cosmosdb.rx.AsyncDocumentClient import com.microsoft.azure.cosmosdb.spark.schema.CosmosDBRowConverter -import com.microsoft.azure.cosmosdb.spark.streaming.CosmosDBWriteStreamRetryPolicy import org.apache.spark.sql.Row import scala.collection.JavaConversions._ import scala.language.implicitConversions import scala.reflect.ClassTag - import java.util.concurrent.ConcurrentHashMap +import com.microsoft.azure.cosmosdb.spark.util.CosmosUtils + case class AsyncClientConfiguration(host: String, key: String, connectionPolicy: ConnectionPolicy, - consistencyLevel: ConsistencyLevel) + consistencyLevel: ConsistencyLevel, + tokenResolver: TokenResolver) object AsyncCosmosDBConnection { private lazy val clients: ConcurrentHashMap[Config, AsyncDocumentClient] = { @@ -104,12 +105,25 @@ object AsyncCosmosDBConnection { .getOrElse(CosmosDBConfig.DefaultConsistencyLevel)) val resourceToken = config.getOrElse[String](CosmosDBConfig.ResourceToken, "") + val resourceKey = config.getOrElse[String](CosmosDBConfig.Masterkey, resourceToken) + + // Check Resource Token and Token Resolver + var tokenResolver: TokenResolver = null + val tokenResolverClassName = config.getOrElse[String](CosmosDBConfig.TokenResolver, "") + + if (!tokenResolverClassName.isEmpty) { + tokenResolver = CosmosUtils.getTokenResolverFromClassName(tokenResolverClassName) + if (classOf[SparkTokenResolver].isAssignableFrom(tokenResolver.getClass)) { + tokenResolver.asInstanceOf[SparkTokenResolver].initialize(config) + } + } AsyncClientConfiguration( config.get[String](CosmosDBConfig.Endpoint).get, - config.getOrElse[String](CosmosDBConfig.Masterkey, resourceToken), + resourceKey, connectionPolicy, - consistencyLevel + consistencyLevel, + tokenResolver ) } @@ -126,6 +140,7 @@ object AsyncCosmosDBConnection { .withMasterKeyOrResourceToken(clientConfig.key) .withConnectionPolicy(clientConfig.connectionPolicy) .withConsistencyLevel(clientConfig.consistencyLevel) + .withTokenResolver(clientConfig.tokenResolver) .build() } } diff --git a/src/main/scala/com/microsoft/azure/cosmosdb/spark/CosmosDBConnection.scala b/src/main/scala/com/microsoft/azure/cosmosdb/spark/CosmosDBConnection.scala index 3663c24a..e37b1570 100644 --- a/src/main/scala/com/microsoft/azure/cosmosdb/spark/CosmosDBConnection.scala +++ b/src/main/scala/com/microsoft/azure/cosmosdb/spark/CosmosDBConnection.scala @@ -24,8 +24,9 @@ package com.microsoft.azure.cosmosdb.spark import java.lang.management.ManagementFactory +import com.microsoft.azure.cosmosdb.{CosmosResourceType, TokenResolver} import com.microsoft.azure.cosmosdb.spark.config._ -import com.microsoft.azure.documentdb +import com.microsoft.azure.cosmosdb.spark.util.CosmosUtils import com.microsoft.azure.documentdb._ import com.microsoft.azure.documentdb.bulkexecutor.DocumentBulkExecutor import com.microsoft.azure.documentdb.internal._ @@ -42,7 +43,8 @@ case class ClientConfiguration(host: String, key: String, connectionPolicy: ConnectionPolicy, consistencyLevel: ConsistencyLevel, - resourceLink: String) + resourceLink: String, + tokenResolver: TokenResolver) object CosmosDBConnection extends CosmosDBLoggingTrait { // For verification purpose @@ -482,11 +484,24 @@ private[spark] case class CosmosDBConnection(config: Config) extends CosmosDBLog val consistencyLevel = ConsistencyLevel.valueOf(config.get[String](CosmosDBConfig.ConsistencyLevel) .getOrElse(CosmosDBConfig.DefaultConsistencyLevel)) - //Check if resource token exists - val resourceToken = config.getOrElse[String](CosmosDBConfig.ResourceToken, "") - var resourceLink: String = "" - if(!resourceToken.isEmpty) { - resourceLink = s"dbs/${config.get[String](CosmosDBConfig.Database).get}/colls/${config.get[String](CosmosDBConfig.Collection).get}" + // check Token Resolver before checking resource token + var resourceLink = s"dbs/${config.get[String](CosmosDBConfig.Database).get}/colls/${config.get[String](CosmosDBConfig.Collection).get}" + var resourceToken = config.getOrElse(CosmosDBConfig.ResourceToken, "") + + var tokenResolver: TokenResolver = null + val tokenResolverClassName = config.getOrElse[String](CosmosDBConfig.TokenResolver, "") + + if (!tokenResolverClassName.isEmpty) { + tokenResolver = CosmosUtils.getTokenResolverFromClassName(tokenResolverClassName) + if (classOf[SparkTokenResolver].isAssignableFrom(tokenResolver.getClass)) { + tokenResolver.asInstanceOf[SparkTokenResolver].initialize(config) + } + + resourceToken = tokenResolver.getAuthorizationToken("GET", resourceLink, CosmosResourceType.DocumentCollection, config.asOptions) + } + + if(resourceToken.isEmpty) { + resourceLink = "" } ClientConfiguration( @@ -494,7 +509,9 @@ private[spark] case class CosmosDBConnection(config: Config) extends CosmosDBLog config.getOrElse[String](CosmosDBConfig.Masterkey, resourceToken), connectionPolicy, consistencyLevel, - resourceLink) + resourceLink, + tokenResolver + ) } } diff --git a/src/main/scala/com/microsoft/azure/cosmosdb/spark/SparkTokenResolver.scala b/src/main/scala/com/microsoft/azure/cosmosdb/spark/SparkTokenResolver.scala new file mode 100644 index 00000000..a517e5ab --- /dev/null +++ b/src/main/scala/com/microsoft/azure/cosmosdb/spark/SparkTokenResolver.scala @@ -0,0 +1,7 @@ +package com.microsoft.azure.cosmosdb.spark + +import com.microsoft.azure.cosmosdb.spark.config.Config + +trait SparkTokenResolver { + def initialize(config: Config): Unit +} diff --git a/src/main/scala/com/microsoft/azure/cosmosdb/spark/config/CosmosDBConfig.scala b/src/main/scala/com/microsoft/azure/cosmosdb/spark/config/CosmosDBConfig.scala index 10bf6fcb..b818e05e 100755 --- a/src/main/scala/com/microsoft/azure/cosmosdb/spark/config/CosmosDBConfig.scala +++ b/src/main/scala/com/microsoft/azure/cosmosdb/spark/config/CosmosDBConfig.scala @@ -39,6 +39,7 @@ object CosmosDBConfig { val Collection = "collection" val Masterkey = "masterkey" val ResourceToken = "resourcetoken" + val TokenResolver = "tokenresolver" val PreferredRegionsList = "preferredregions" val ConsistencyLevel = "consistencylevel" diff --git a/src/main/scala/com/microsoft/azure/cosmosdb/spark/util/CosmosUtils.scala b/src/main/scala/com/microsoft/azure/cosmosdb/spark/util/CosmosUtils.scala new file mode 100644 index 00000000..9a3340b0 --- /dev/null +++ b/src/main/scala/com/microsoft/azure/cosmosdb/spark/util/CosmosUtils.scala @@ -0,0 +1,12 @@ +package com.microsoft.azure.cosmosdb.spark.util + +import com.microsoft.azure.cosmosdb.{TokenResolver} + +object CosmosUtils extends Serializable { + + def getTokenResolverFromClassName(className: String, constructorArgs: AnyRef*): TokenResolver = { + val argsClassSeq = constructorArgs.map(e => e.getClass) + Class.forName(className).getDeclaredConstructor(argsClassSeq:_*).newInstance(constructorArgs:_*).asInstanceOf[TokenResolver] + } + +} From 63b36db957332d97998bf26d7edc30a5cd936971 Mon Sep 17 00:00:00 2001 From: Vincenzo Verrone Date: Mon, 20 Apr 2020 18:02:56 +0200 Subject: [PATCH 2/3] Extended TokenResolver to handle shaded dependencies --- .../azure/cosmosdb/spark/AsyncCosmosDBConnection.scala | 8 +++----- .../azure/cosmosdb/spark/CosmosDBConnection.scala | 9 +++------ ...rkTokenResolver.scala => CosmosDBTokenResolver.scala} | 3 ++- .../azure/cosmosdb/spark/util/CosmosUtils.scala | 6 +++--- 4 files changed, 11 insertions(+), 15 deletions(-) rename src/main/scala/com/microsoft/azure/cosmosdb/spark/{SparkTokenResolver.scala => CosmosDBTokenResolver.scala} (58%) diff --git a/src/main/scala/com/microsoft/azure/cosmosdb/spark/AsyncCosmosDBConnection.scala b/src/main/scala/com/microsoft/azure/cosmosdb/spark/AsyncCosmosDBConnection.scala index bd463809..7f0dded1 100644 --- a/src/main/scala/com/microsoft/azure/cosmosdb/spark/AsyncCosmosDBConnection.scala +++ b/src/main/scala/com/microsoft/azure/cosmosdb/spark/AsyncCosmosDBConnection.scala @@ -44,7 +44,7 @@ case class AsyncClientConfiguration(host: String, key: String, connectionPolicy: ConnectionPolicy, consistencyLevel: ConsistencyLevel, - tokenResolver: TokenResolver) + tokenResolver: CosmosDBTokenResolver) object AsyncCosmosDBConnection { private lazy val clients: ConcurrentHashMap[Config, AsyncDocumentClient] = { @@ -108,14 +108,12 @@ object AsyncCosmosDBConnection { val resourceKey = config.getOrElse[String](CosmosDBConfig.Masterkey, resourceToken) // Check Resource Token and Token Resolver - var tokenResolver: TokenResolver = null + var tokenResolver: CosmosDBTokenResolver = null val tokenResolverClassName = config.getOrElse[String](CosmosDBConfig.TokenResolver, "") if (!tokenResolverClassName.isEmpty) { tokenResolver = CosmosUtils.getTokenResolverFromClassName(tokenResolverClassName) - if (classOf[SparkTokenResolver].isAssignableFrom(tokenResolver.getClass)) { - tokenResolver.asInstanceOf[SparkTokenResolver].initialize(config) - } + tokenResolver.initialize(config) } AsyncClientConfiguration( diff --git a/src/main/scala/com/microsoft/azure/cosmosdb/spark/CosmosDBConnection.scala b/src/main/scala/com/microsoft/azure/cosmosdb/spark/CosmosDBConnection.scala index 8461bb26..90ef69f2 100644 --- a/src/main/scala/com/microsoft/azure/cosmosdb/spark/CosmosDBConnection.scala +++ b/src/main/scala/com/microsoft/azure/cosmosdb/spark/CosmosDBConnection.scala @@ -44,7 +44,7 @@ case class ClientConfiguration(host: String, connectionPolicy: ConnectionPolicy, consistencyLevel: ConsistencyLevel, resourceLink: String, - tokenResolver: TokenResolver) + tokenResolver: CosmosDBTokenResolver) object CosmosDBConnection extends CosmosDBLoggingTrait { // For verification purpose @@ -497,15 +497,12 @@ private[spark] case class CosmosDBConnection(config: Config) extends CosmosDBLog var resourceLink = s"dbs/${config.get[String](CosmosDBConfig.Database).get}/colls/${config.get[String](CosmosDBConfig.Collection).get}" var resourceToken = config.getOrElse(CosmosDBConfig.ResourceToken, "") - var tokenResolver: TokenResolver = null + var tokenResolver: CosmosDBTokenResolver = null val tokenResolverClassName = config.getOrElse[String](CosmosDBConfig.TokenResolver, "") if (!tokenResolverClassName.isEmpty) { tokenResolver = CosmosUtils.getTokenResolverFromClassName(tokenResolverClassName) - if (classOf[SparkTokenResolver].isAssignableFrom(tokenResolver.getClass)) { - tokenResolver.asInstanceOf[SparkTokenResolver].initialize(config) - } - + tokenResolver.initialize(config) resourceToken = tokenResolver.getAuthorizationToken("GET", resourceLink, CosmosResourceType.DocumentCollection, config.asOptions) } diff --git a/src/main/scala/com/microsoft/azure/cosmosdb/spark/SparkTokenResolver.scala b/src/main/scala/com/microsoft/azure/cosmosdb/spark/CosmosDBTokenResolver.scala similarity index 58% rename from src/main/scala/com/microsoft/azure/cosmosdb/spark/SparkTokenResolver.scala rename to src/main/scala/com/microsoft/azure/cosmosdb/spark/CosmosDBTokenResolver.scala index a517e5ab..e81f1e25 100644 --- a/src/main/scala/com/microsoft/azure/cosmosdb/spark/SparkTokenResolver.scala +++ b/src/main/scala/com/microsoft/azure/cosmosdb/spark/CosmosDBTokenResolver.scala @@ -1,7 +1,8 @@ package com.microsoft.azure.cosmosdb.spark import com.microsoft.azure.cosmosdb.spark.config.Config +import com.microsoft.azure.cosmosdb.TokenResolver -trait SparkTokenResolver { +trait CosmosDBTokenResolver extends TokenResolver { def initialize(config: Config): Unit } diff --git a/src/main/scala/com/microsoft/azure/cosmosdb/spark/util/CosmosUtils.scala b/src/main/scala/com/microsoft/azure/cosmosdb/spark/util/CosmosUtils.scala index 9a3340b0..b2983775 100644 --- a/src/main/scala/com/microsoft/azure/cosmosdb/spark/util/CosmosUtils.scala +++ b/src/main/scala/com/microsoft/azure/cosmosdb/spark/util/CosmosUtils.scala @@ -1,12 +1,12 @@ package com.microsoft.azure.cosmosdb.spark.util -import com.microsoft.azure.cosmosdb.{TokenResolver} +import com.microsoft.azure.cosmosdb.spark.CosmosDBTokenResolver object CosmosUtils extends Serializable { - def getTokenResolverFromClassName(className: String, constructorArgs: AnyRef*): TokenResolver = { + def getTokenResolverFromClassName(className: String, constructorArgs: AnyRef*): CosmosDBTokenResolver = { val argsClassSeq = constructorArgs.map(e => e.getClass) - Class.forName(className).getDeclaredConstructor(argsClassSeq:_*).newInstance(constructorArgs:_*).asInstanceOf[TokenResolver] + Class.forName(className).getDeclaredConstructor(argsClassSeq:_*).newInstance(constructorArgs:_*).asInstanceOf[CosmosDBTokenResolver] } } From 3a12707c0a0a7d4ce104cb61e9b3df45d4a76f2d Mon Sep 17 00:00:00 2001 From: Vincenzo Verrone Date: Mon, 20 Apr 2020 23:21:25 +0200 Subject: [PATCH 3/3] Fix token resolution when using TokenResover + fix shaded dependencies --- pom.xml | 2 ++ .../azure/cosmosdb/spark/AsyncCosmosDBConnection.scala | 6 ++++-- .../microsoft/azure/cosmosdb/spark/CosmosDBConnection.scala | 2 +- 3 files changed, 7 insertions(+), 3 deletions(-) diff --git a/pom.xml b/pom.xml index 051c511f..e1391b51 100644 --- a/pom.xml +++ b/pom.xml @@ -317,6 +317,8 @@ limitations under the License. com.microsoft.azure.cosmosdb.internal.query.metrics.* com.microsoft.azure.cosmosdb.internal.query.orderbyquery.* com.microsoft.azure.cosmosdb.internal.routing.* + com.microsoft.azure.cosmosdb.TokenResolver + com.microsoft.azure.cosmosdb.CosmosResourceType diff --git a/src/main/scala/com/microsoft/azure/cosmosdb/spark/AsyncCosmosDBConnection.scala b/src/main/scala/com/microsoft/azure/cosmosdb/spark/AsyncCosmosDBConnection.scala index 7f0dded1..e70585f6 100644 --- a/src/main/scala/com/microsoft/azure/cosmosdb/spark/AsyncCosmosDBConnection.scala +++ b/src/main/scala/com/microsoft/azure/cosmosdb/spark/AsyncCosmosDBConnection.scala @@ -104,8 +104,7 @@ object AsyncCosmosDBConnection { val consistencyLevel = ConsistencyLevel.valueOf(config.get[String](CosmosDBConfig.ConsistencyLevel) .getOrElse(CosmosDBConfig.DefaultConsistencyLevel)) - val resourceToken = config.getOrElse[String](CosmosDBConfig.ResourceToken, "") - val resourceKey = config.getOrElse[String](CosmosDBConfig.Masterkey, resourceToken) + var resourceKey: String = null // Check Resource Token and Token Resolver var tokenResolver: CosmosDBTokenResolver = null @@ -114,6 +113,9 @@ object AsyncCosmosDBConnection { if (!tokenResolverClassName.isEmpty) { tokenResolver = CosmosUtils.getTokenResolverFromClassName(tokenResolverClassName) tokenResolver.initialize(config) + } else { + val resourceToken = config.getOrElse[String](CosmosDBConfig.ResourceToken, "") + resourceKey = config.getOrElse[String](CosmosDBConfig.Masterkey, resourceToken) } AsyncClientConfiguration( diff --git a/src/main/scala/com/microsoft/azure/cosmosdb/spark/CosmosDBConnection.scala b/src/main/scala/com/microsoft/azure/cosmosdb/spark/CosmosDBConnection.scala index 90ef69f2..11301de0 100644 --- a/src/main/scala/com/microsoft/azure/cosmosdb/spark/CosmosDBConnection.scala +++ b/src/main/scala/com/microsoft/azure/cosmosdb/spark/CosmosDBConnection.scala @@ -24,7 +24,7 @@ package com.microsoft.azure.cosmosdb.spark import java.lang.management.ManagementFactory -import com.microsoft.azure.cosmosdb.{CosmosResourceType, TokenResolver} +import com.microsoft.azure.cosmosdb.{CosmosResourceType} import com.microsoft.azure.cosmosdb.spark.config._ import com.microsoft.azure.cosmosdb.spark.util.CosmosUtils import com.microsoft.azure.documentdb._