Skip to content

Commit ccdd720

Browse files
sryzadongjoon-hyun
authored andcommitted
[SPARK-54358][SDP][FOLLOWUP] Add tableIdentifierToPathString helper method for SDP checkpoint path construction
### What changes were proposed in this pull request? Followups from #53070 to improve code clarity. ### Why are the changes needed? Make sure the code for constructing SDP checkpoint directory paths is clear. ### Does this PR introduce _any_ user-facing change? No ### How was this patch tested? ### Was this patch authored or co-authored using generative AI tooling? Closes #53089 from sryza/collide-followups. Authored-by: Sandy Ryza <sandy.ryza@databricks.com> Signed-off-by: Dongjoon Hyun <dongjoon@apache.org> (cherry picked from commit 10d1b4c) Signed-off-by: Dongjoon Hyun <dongjoon@apache.org>
1 parent 1016663 commit ccdd720

File tree

1 file changed

+11
-1
lines changed

1 file changed

+11
-1
lines changed

sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/graph/SystemMetadata.scala

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ import scala.util.Try
2222
import org.apache.hadoop.fs.Path
2323

2424
import org.apache.spark.internal.{Logging, LogKeys}
25+
import org.apache.spark.sql.catalyst.TableIdentifier
2526
import org.apache.spark.sql.classic.SparkSession
2627

2728
sealed trait SystemMetadata {}
@@ -44,7 +45,9 @@ case class FlowSystemMetadata(
4445
Option(if (graph.table.contains(flow.destinationIdentifier) ||
4546
graph.sink.contains(flow.destinationIdentifier)) {
4647
val checkpointRoot = new Path(context.storageRoot, "_checkpoints")
47-
val flowTableId = flow.destinationIdentifier.nameParts.mkString(Path.SEPARATOR)
48+
// Different tables in the pipeline can have flows with the same name, so we include
49+
// the table's fully qualified identifier in the path to avoid collisions.
50+
val flowTableId = tableIdentifierToPathString(flow.destinationIdentifier)
4851
val flowName = flow.identifier.table
4952
val checkpointDir = new Path(
5053
new Path(checkpointRoot, flowTableId),
@@ -62,6 +65,13 @@ case class FlowSystemMetadata(
6265
})
6366
}
6467

68+
/**
69+
* Converts a TableIdentifier to a path string by joining its name parts with the path separator.
70+
*/
71+
private def tableIdentifierToPathString(tableIdentifier: TableIdentifier): String = {
72+
tableIdentifier.nameParts.mkString(Path.SEPARATOR)
73+
}
74+
6575
/** Returns the location for the most recent checkpoint of a given flow. */
6676
def latestCheckpointLocation: String = {
6777
val checkpointsDir = flowCheckpointsDirOpt().get

0 commit comments

Comments
 (0)