[SPARK-54367][Connect] Propagate close() from SessionState to CatalogPlugins to prevent leak #53078
@@ -31,6 +31,9 @@ object SparkConnectTestUtils {
       sessionId = UUID.randomUUID().toString,
       session = session)
     SparkConnectService.sessionManager.putSessionForTesting(ret)
+    if (session != null) {
+      ret.initializeSession()
Contributor

Why is this needed now?

Author

Earlier, the tests created dummy sessions that were never fully initialized. Now that all sessions are properly closed, calling close() on uninitialized sessions was causing test failures as well.
+    }
     ret
   }
 }
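To illustrate the failure mode described in the exchange above, here is a generic sketch (not Spark code; the class and member names are made up) of why calling close() on a holder whose state was never fully initialized can fail: teardown forces lazily created state into existence, which blows up when the backing session is null.

```scala
// Generic illustration only, not Spark internals; all names are hypothetical.
class LazySessionState(underlying: AnyRef) extends AutoCloseable {
  // Expensive state is created lazily, in the spirit of SessionState's lazy catalog.
  lazy val catalog: AnyRef = {
    require(underlying != null, "cannot build a catalog without a backing session")
    new Object
  }

  override def close(): Unit = {
    // Teardown touches `catalog`, forcing the lazy val; for a dummy instance
    // with underlying == null this throws instead of closing cleanly.
    val forced = catalog
    assert(forced != null)
  }
}
```

Calling ret.initializeSession() only when session != null means every holder the tests later close has fully constructed state to release.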
@@ -17,7 +17,7 @@

 package org.apache.spark.sql.internal

-import java.io.File
+import java.io.{Closeable, File}
 import java.net.URI

 import org.apache.hadoop.conf.Configuration
@@ -91,7 +91,7 @@ private[sql] class SessionState(
     val columnarRules: Seq[ColumnarRule],
     val adaptiveRulesHolder: AdaptiveRulesHolder,
     val planNormalizationRules: Seq[Rule[LogicalPlan]],
-    val artifactManagerBuilder: () => ArtifactManager) {
+    val artifactManagerBuilder: () => ArtifactManager) extends Closeable {

   // The following fields are lazy to avoid creating the Hive client when creating SessionState.
   lazy val catalog: SessionCatalog = catalogBuilder()
@@ -110,6 +110,10 @@ private[sql] class SessionState(

   def catalogManager: CatalogManager = analyzer.catalogManager

+  override def close(): Unit = {
+    catalogManager.close()
Contributor

We need to make sure that the session manager supports cloning.

Author

Thanks for the review and feedback. I just want to make sure I fully understand the requirement here. When you mention this, are you thinking of the …? Do you mind pointing me in the right direction for the implementation you have in mind? Again, thanks so much for your review and time.
+  }
+
   def newHadoopConf(): Configuration = SessionState.newHadoopConf(
     sharedState.sparkContext.hadoopConfiguration,
     conf)
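The close() hunk above is what lets session teardown reach catalog implementations. As a sketch of the kind of leak the PR title describes, the illustrative (non-Spark) catalog below holds an external resource; the sketch assumes, based on the PR title rather than the diff shown here, that CatalogManager.close() forwards close() to registered CatalogPlugin instances that implement Closeable.

```scala
// Illustrative catalog plugin; the pooled resource is a stand-in, and the
// assumption that CatalogManager.close() reaches this close() comes from the
// PR title, not from code shown in this hunk.
import java.io.Closeable

import org.apache.spark.sql.connector.catalog.CatalogPlugin
import org.apache.spark.sql.util.CaseInsensitiveStringMap

class PooledCatalog extends CatalogPlugin with Closeable {
  private var catalogName: String = _
  // Stand-in for an expensive external resource, e.g. a JDBC connection pool.
  private var pool: AutoCloseable = _

  override def initialize(name: String, options: CaseInsensitiveStringMap): Unit = {
    catalogName = name
    pool = new AutoCloseable { override def close(): Unit = () } // open the real pool here
  }

  override def name(): String = catalogName

  // With SessionState extending Closeable and delegating to catalogManager.close(),
  // this runs when the owning session is released instead of holding the resource
  // until the JVM exits.
  override def close(): Unit = if (pool != null) pool.close()
}
```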
Contributor

How is this a problem now?

Author

This was added because of failing unit tests that use a mock object, where I was getting a ClassCastException (a sample error can be found in https://github.com/MonkeyCanCode/spark/actions/runs/19394233457/job/55492018508).

Here is the snippet:

Before this PR, the session cleanup process in the tests was less explicit. Now that resources are properly closed, the generic test StreamObserver causes a casting issue in the existing code.
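For anyone hitting the same failure, here is a minimal sketch of the test-side adjustment the comment hints at: mock the more specific gRPC observer subtype so a server-side downcast succeeds. The cast target used below (io.grpc.stub.ServerCallStreamObserver) is an assumption, not taken from this PR; use whatever type the ClassCastException in the linked CI run actually names.

```scala
// Sketch only; the concrete cast target is assumed, not confirmed by this PR.
import io.grpc.stub.{ServerCallStreamObserver, StreamObserver}
import org.mockito.Mockito.mock

import org.apache.spark.connect.proto.ExecutePlanResponse

object ObserverMockSketch {
  // A plain StreamObserver mock cannot survive a downcast in the server path:
  val plainObserver: StreamObserver[ExecutePlanResponse] =
    mock(classOf[StreamObserver[ExecutePlanResponse]])

  // Mocking the subtype instead satisfies an asInstanceOf[ServerCallStreamObserver[_]]:
  val callObserver: StreamObserver[ExecutePlanResponse] =
    mock(classOf[ServerCallStreamObserver[ExecutePlanResponse]])
}
```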