Skip to content
This repository was archived by the owner on Mar 10, 2025. It is now read-only.

Commit 4309f32

Browse files
committed
update the code to correctly handle SAM types
1 parent 88ba87d commit 4309f32

File tree

2 files changed

+9
-5
lines changed

2 files changed

+9
-5
lines changed

src/main/scala/com/microsoft/azure/cosmosdb/spark/rdd/CosmosDBRDD.scala

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@ import org.apache.spark.rdd.RDD
3131
import org.apache.spark.sql.sources.Filter
3232
import org.apache.spark.sql.types.StructType
3333
import org.apache.spark.sql.{DataFrame, Dataset, SparkSession}
34+
import org.apache.spark.util.TaskCompletionListener
3435
import org.apache.spark.{Partition, TaskContext}
3536

3637
import scala.collection.mutable
@@ -114,9 +115,10 @@ class CosmosDBRDD(
114115
case cosmosDBPartition: CosmosDBPartition =>
115116
logInfo(s"CosmosDBRDD:compute: Start CosmosDBRDD compute task for partition key range id ${cosmosDBPartition.partitionKeyRangeId}")
116117

117-
context.addTaskCompletionListener((ctx: TaskContext) => {
118+
val taskCompletionListener:TaskCompletionListener = (ctx: TaskContext) => {
118119
logInfo(s"CosmosDBRDD:compute: CosmosDBRDD compute task completed for partition key range id ${cosmosDBPartition.partitionKeyRangeId}")
119-
})
120+
}
121+
context.addTaskCompletionListener(taskCompletionListener)
120122

121123
new CosmosDBRDDIterator(
122124
hadoopConfig,

src/main/scala/com/microsoft/azure/cosmosdb/spark/rdd/CosmosDBRDDIterator.scala

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,7 @@ import com.microsoft.azure.documentdb.internal.HttpConstants.SubStatusCodes
4141
import org.apache.commons.lang3.StringUtils
4242
import org.apache.spark._
4343
import org.apache.spark.sql.sources.Filter
44+
import org.apache.spark.util.TaskCompletionListener
4445

4546
import scala.collection.mutable
4647

@@ -424,9 +425,10 @@ class CosmosDBRDDIterator(hadoopConfig: mutable.Map[String, String],
424425
})
425426

426427
// Register an on-task-completion callback to close the input stream.
427-
taskContext.addTaskCompletionListener((_: TaskContext) => {
428-
closeIfNeeded()
429-
})
428+
val taskCompletionListener: TaskCompletionListener = new TaskCompletionListener() {
429+
override def onTaskCompletion(context: TaskContext): Unit = closeIfNeeded()
430+
}
431+
taskContext.addTaskCompletionListener(taskCompletionListener)
430432

431433
if (!readingChangeFeed) {
432434
queryDocuments

0 commit comments

Comments
 (0)