Skip to content
This repository was archived by the owner on Mar 10, 2025. It is now read-only.

Commit e071c3b

Browse files
Merge pull request #385 from rifuller/fix_iterator_size
Reduce the size of data sent to driver when reading
2 parents 23f415e + 2a2a8b2 commit e071c3b

File tree

1 file changed

+23
-7
lines changed

1 file changed

+23
-7
lines changed

src/main/scala/com/microsoft/azure/cosmosdb/spark/CosmosDBConnection.scala

Lines changed: 23 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@ import com.microsoft.azure.documentdb.bulkexecutor.DocumentBulkExecutor
2929
import scala.collection.JavaConversions._
3030
import scala.collection.mutable.ListBuffer
3131
import scala.language.implicitConversions
32+
import scala.reflect.ClassTag
3233
import scala.util.control.Breaks._
3334

3435
private[spark] case class CosmosDBConnection(config: Config) extends CosmosDBLoggingTrait with Serializable {
@@ -46,10 +47,10 @@ private[spark] case class CosmosDBConnection(config: Config) extends CosmosDBLog
4647
CosmosDBConnectionCache.reinitializeClient(clientConfig)
4748
}
4849

49-
def getAllPartitions: Array[PartitionKeyRange] = {
50+
def getAllPartitions: List[PartitionKeyRange] = {
5051
val documentClient = CosmosDBConnectionCache.getOrCreateClient(clientConfig)
5152
val ranges = documentClient.readPartitionKeyRanges(collectionLink, null.asInstanceOf[FeedOptions])
52-
ranges.getQueryIterator.toArray
53+
getListFromFeedResponse(ranges)
5354
}
5455

5556
def getDocumentBulkImporter: DocumentBulkExecutor = {
@@ -64,20 +65,35 @@ private[spark] case class CosmosDBConnection(config: Config) extends CosmosDBLog
6465
feedOpts: FeedOptions): Iterator[Document] = {
6566

6667
val documentClient = CosmosDBConnectionCache.getOrCreateClient(clientConfig)
67-
val feedResponse = documentClient.queryDocuments(collectionLink, new SqlQuerySpec(queryString), feedOpts)
68-
feedResponse.getQueryIterable.iterator()
68+
val feedResponse: FeedResponse[Document] = documentClient.queryDocuments(collectionLink, new SqlQuerySpec(queryString), feedOpts)
69+
getListFromFeedResponse(feedResponse).iterator
6970
}
7071

7172
def queryDocuments(collectionLink: String, queryString: String,
7273
feedOpts: FeedOptions): Iterator[Document] = {
7374
val documentClient = CosmosDBConnectionCache.getOrCreateClient(clientConfig)
74-
val feedResponse = documentClient.queryDocuments(collectionLink, new SqlQuerySpec(queryString), feedOpts)
75-
feedResponse.getQueryIterable.iterator()
75+
val feedResponse: FeedResponse[Document] = documentClient.queryDocuments(collectionLink, new SqlQuerySpec(queryString), feedOpts)
76+
getListFromFeedResponse(feedResponse).iterator
7677
}
7778

7879
def readDocuments(feedOptions: FeedOptions): Iterator[Document] = {
7980
val documentClient = CosmosDBConnectionCache.getOrCreateClient(clientConfig)
80-
documentClient.readDocuments(collectionLink, feedOptions).getQueryIterable.iterator()
81+
val resp: FeedResponse[Document] = documentClient.readDocuments(collectionLink, feedOptions)
82+
getListFromFeedResponse(resp).iterator
83+
}
84+
85+
/**
86+
* Takes the results from a FeedResponse and puts them in a standard List. The FeedResponse
87+
* otherwise hides a lot of extra fields behind the Iterator[T] interface that would still
88+
* need to be serialized when being collected on the driver.
89+
* @param response
90+
* @return
91+
*/
92+
private def getListFromFeedResponse[T <: com.microsoft.azure.documentdb.Resource : ClassTag](response: FeedResponse[T]): List[T] = {
93+
response
94+
.getQueryIterable
95+
.iterator
96+
.toList
8197
}
8298

8399
def readChangeFeed(changeFeedOptions: ChangeFeedOptions,

0 commit comments

Comments
 (0)