Skip to content
This repository was archived by the owner on Mar 10, 2025. It is now read-only.

Commit 88ba87d

Browse files
Merge pull request #392 from revinjchalil/2.4
Fix streaming checkpoint issues
2 parents 91c15e5 + 91de948 commit 88ba87d

File tree

7 files changed

+26
-18
lines changed

7 files changed

+26
-18
lines changed

pom.xml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@ limitations under the License.
2121
<groupId>com.microsoft.azure</groupId>
2222
<artifactId>azure-cosmosdb-spark_2.4.0_2.11</artifactId>
2323
<packaging>jar</packaging>
24-
<version>3.0.5</version>
24+
<version>3.0.6</version>
2525
<name>${project.groupId}:${project.artifactId}</name>
2626
<description>Spark Connector for Microsoft Azure CosmosDB</description>
2727
<url>http://azure.microsoft.com/en-us/services/documentdb/</url>

src/main/scala/com/microsoft/azure/cosmosdb/spark/Constants.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,6 @@
2323
package com.microsoft.azure.cosmosdb.spark
2424

2525
object Constants {
26-
val currentVersion = "2.4.0_2.11-3.0.5"
26+
val currentVersion = "2.4.0_2.11-3.0.6"
2727
val userAgentSuffix = s" SparkConnector/$currentVersion"
2828
}

src/main/scala/com/microsoft/azure/cosmosdb/spark/CosmosDBConnection.scala

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -230,6 +230,11 @@ private[spark] case class CosmosDBConnection(config: Config) extends CosmosDBLog
230230
}
231231
}
232232

233+
// set nextContinuation if the given partition does not have an existing checkpoint file and "startFromBeginning" is false
234+
if (nextContinuation == null || nextContinuation.isEmpty){
235+
nextContinuation = feedResponse.getResponseContinuation
236+
}
237+
233238
logDebug(s"<-- readChangeFeed, Count: ${cfDocuments.length.toString}, NextContinuation: $nextContinuation")
234239

235240
updateTokenFunc(originalContinuation, nextContinuation, partitionId)

src/main/scala/com/microsoft/azure/cosmosdb/spark/rdd/CosmosDBRDDIterator.scala

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -50,11 +50,11 @@ object CosmosDBRDDIterator {
5050
var lastFeedOptions: FeedOptions = _
5151
var hdfsUtils: HdfsUtils = _
5252

53-
def initializeHdfsUtils(hadoopConfig: Map[String, String]): Any = {
53+
def initializeHdfsUtils(hadoopConfig: Map[String, String], changeFeedCheckpointLocation: String): Any = {
5454
if (hdfsUtils == null) {
5555
this.synchronized {
5656
if (hdfsUtils == null) {
57-
hdfsUtils = HdfsUtils(hadoopConfig)
57+
hdfsUtils = HdfsUtils(hadoopConfig, changeFeedCheckpointLocation)
5858
}
5959
}
6060
}
@@ -133,7 +133,11 @@ class CosmosDBRDDIterator(hadoopConfig: mutable.Map[String, String],
133133
extends Iterator[Document]
134134
with CosmosDBLoggingTrait {
135135

136-
CosmosDBRDDIterator.initializeHdfsUtils(hadoopConfig.toMap)
136+
val changeFeedCheckpointLocation: String = config
137+
.get[String](CosmosDBConfig.ChangeFeedCheckpointLocation)
138+
.getOrElse(StringUtils.EMPTY)
139+
140+
CosmosDBRDDIterator.initializeHdfsUtils(hadoopConfig.toMap, changeFeedCheckpointLocation)
137141

138142
// The continuation token for the target CosmosDB partition
139143
private var cfCurrentToken: String = _
@@ -223,9 +227,6 @@ class CosmosDBRDDIterator(hadoopConfig: mutable.Map[String, String],
223227

224228
val objectMapper: ObjectMapper = new ObjectMapper()
225229

226-
val changeFeedCheckpointLocation: String = config
227-
.get[String](CosmosDBConfig.ChangeFeedCheckpointLocation)
228-
.getOrElse(StringUtils.EMPTY)
229230
val queryName: String = config
230231
.get[String](CosmosDBConfig.ChangeFeedQueryName)
231232
.get

src/main/scala/com/microsoft/azure/cosmosdb/spark/streaming/CosmosDBSource.scala

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -63,13 +63,13 @@ private[spark] class CosmosDBSource(sqlContext: SQLContext,
6363
}
6464

6565
if (currentSchema == null) {
66+
val changeFeedCheckpointLocation: String = streamConfigMap
67+
.getOrElse(CosmosDBConfig.ChangeFeedCheckpointLocation, StringUtils.EMPTY)
6668
CosmosDBRDDIterator.initializeHdfsUtils(HdfsUtils.getConfigurationMap(
67-
sqlContext.sparkSession.sparkContext.hadoopConfiguration).toMap)
69+
sqlContext.sparkSession.sparkContext.hadoopConfiguration).toMap, changeFeedCheckpointLocation)
6870

6971
// Delete current tokens and next tokens checkpoint directories to ensure change feed starts from beginning if set
7072
if (streamConfigMap.getOrElse(CosmosDBConfig.ChangeFeedStartFromTheBeginning, String.valueOf(false)).toBoolean) {
71-
val changeFeedCheckpointLocation: String = streamConfigMap
72-
.getOrElse(CosmosDBConfig.ChangeFeedCheckpointLocation, StringUtils.EMPTY)
7373
val queryName = Config(streamConfigMap)
7474
.get[String](CosmosDBConfig.ChangeFeedQueryName).get
7575
val currentTokensCheckpointPath = changeFeedCheckpointLocation + "/" + HdfsUtils.filterFilename(queryName)

src/main/scala/com/microsoft/azure/cosmosdb/spark/streaming/DefaultCosmosDBWriteStreamPoisonMessageNotificationHandler.scala

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -22,16 +22,17 @@
2222
*/
2323
package com.microsoft.azure.cosmosdb.spark.streaming
2424

25-
import com.microsoft.azure.cosmosdb.{Document, ResourceResponse, RequestOptions}
25+
import com.microsoft.azure.cosmosdb.{Document, RequestOptions, ResourceResponse}
2626
import com.microsoft.azure.cosmosdb.spark.config.{Config, CosmosDBConfig}
2727
import com.microsoft.azure.cosmosdb.spark.CosmosDBLoggingTrait
2828
import com.microsoft.azure.cosmosdb.spark.util.HdfsUtils
29-
3029
import java.io.{FileNotFoundException, PrintWriter, StringWriter}
3130
import java.time.Instant
3231
import java.time.format.DateTimeFormatter
3332
import java.util.concurrent.atomic.AtomicLong
3433

34+
import org.apache.commons.lang3.StringUtils
35+
3536
class DefaultCosmosDBWriteStreamPoisonMessageNotificationHandler(configMap: Map[String, String])
3637
extends CosmosDBWriteStreamPoisonMessageNotificationHandler
3738
with CosmosDBLoggingTrait
@@ -45,9 +46,9 @@ class DefaultCosmosDBWriteStreamPoisonMessageNotificationHandler(configMap: Map[
4546
.getOrElse(
4647
CosmosDBConfig.PoisonMessageLocation,
4748
String.valueOf(CosmosDBConfig.DefaultPoisonMessageLocation))
48-
49+
4950
private lazy val hdfsUtils: HdfsUtils = {
50-
HdfsUtils(configMap)
51+
HdfsUtils(configMap, poisonMessageLocation)
5152
}
5253

5354
def onPoisonMessage(lastError: Throwable, document: Document): Unit =
@@ -61,7 +62,7 @@ class DefaultCosmosDBWriteStreamPoisonMessageNotificationHandler(configMap: Map[
6162

6263
val payload = document.toJson()
6364

64-
logWarning(s"POSION MESSAGE Id: $id, Error: $error, Document payload: $payload")
65+
logWarning(s"POISON MESSAGE Id: $id, Error: $error, Document payload: $payload")
6566

6667
if (this.poisonMessageLocation != "")
6768
{

src/main/scala/com/microsoft/azure/cosmosdb/spark/util/HdfsUtils.scala

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -31,16 +31,17 @@ import org.apache.hadoop.conf.Configuration
3131
import org.apache.hadoop.fs.{FileSystem, LocatedFileStatus, Path, RemoteIterator}
3232

3333
import scala.collection.mutable
34+
import java.net.URI
3435

35-
case class HdfsUtils(configMap: Map[String, String]) extends CosmosDBLoggingTrait {
36+
case class HdfsUtils(configMap: Map[String, String], changeFeedCheckpointLocation: String) extends CosmosDBLoggingTrait {
3637
private val fsConfig: Configuration = {
3738
val config = new Configuration()
3839
configMap.foreach(e => config.set(e._1, e._2))
3940
config
4041
}
4142

4243
private val maxRetryCount = 10
43-
private val fs = FileSystem.get(fsConfig)
44+
private val fs = FileSystem.get(new URI(changeFeedCheckpointLocation), fsConfig)
4445

4546
def write(base: String, filePath: String, content: String): Unit = {
4647
val path = new Path(base + "/" + filePath)

0 commit comments

Comments
 (0)