Skip to content

Commit 5c766e4

Browse files
authored
feat: introduce global await for index creation (#790)
Right after the custom scripts have been invoked and right before data is written, we now call the Cypher procedure "db.awaitIndexes". This gives any indexes created by custom scripts time to finish building before we write data. The wait can be tuned with the option "index.await.timeout" (in seconds). The default is 300; set it to 0 to restore the old behaviour, i.e. no wait.
1 parent 814060e commit 5c766e4

File tree

2 files changed

+12
-0
lines changed

2 files changed

+12
-0
lines changed

common/src/main/scala/org/neo4j/spark/util/Neo4jOptions.scala

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -134,6 +134,8 @@ class Neo4jOptions(private val options: java.util.Map[String, String]) extends S
134134
)
135135
}
136136

137+
val indexAwait = getParameter(INDEX_AWAIT_TIMEOUT_SEC, DEFAULT_INDEX_AWAIT_TIMEOUT_SEC.toString).toInt
138+
137139
val query: Neo4jQueryOptions = (
138140
getParameter(QUERY.toString.toLowerCase),
139141
getParameter(LABELS.toString.toLowerCase),
@@ -564,6 +566,9 @@ object Neo4jOptions {
564566
// map aggregation
565567
val SCHEMA_MAP_GROUP_DUPLICATE_KEYS = "schema.map.group.duplicate.keys"
566568

569+
// index options
570+
val INDEX_AWAIT_TIMEOUT_SEC = "index.await.timeout"
571+
567572
// partitions
568573
val PARTITIONS = "partitions"
569574

@@ -651,6 +656,8 @@ object Neo4jOptions {
651656

652657
val DEFAULT_MAP_GROUP_DUPLICATE_KEYS = false
653658

659+
val DEFAULT_INDEX_AWAIT_TIMEOUT_SEC = 300
660+
654661
var DEFAULT_AUTH_PARAMETERS: Map[String, String] =
655662
Seq("username", "password", "ticket", "principal", "credentials", "realm", "scheme", "token")
656663
.map(name => name -> DEFAULT_EMPTY).toMap

spark-3/src/main/scala/org/neo4j/spark/writer/Neo4jBatchWriter.scala

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,11 @@ class Neo4jBatchWriter(
4343
val scriptResult = schemaService.execute(neo4jOptions.script)
4444
schemaService.close()
4545

46+
if (neo4jOptions.indexAwait > 0) {
47+
val session = driverCache.getOrCreate().session(neo4jOptions.session.toNeo4jSession())
48+
session.run(s"CALL db.awaitIndexes(${neo4jOptions.indexAwait})").consume()
49+
}
50+
4651
new Neo4jDataWriterFactory(
4752
neo4j,
4853
jobId,

0 commit comments

Comments
 (0)