[SPARK-54367][Connect] Propagate close() from SessionState to CatalogPlugins to prevent leak #53078
base: master
Conversation
Hello @dongjoon-hyun, I am new to the Spark community and have been using Spark Connect at work for a while. I found a memory leak in Spark Connect while working with Iceberg and fixed it locally (the fix requires changes in both Iceberg and Spark). Would you mind taking a look, when you get a chance, to see if this is something we should add to OSS? Thanks.
def catalogManager: CatalogManager = analyzer.catalogManager

override def close(): Unit = {
  catalogManager.close()
We need to make sure that the session manager supports cloning.
Thanks for the review and feedback. I just want to make sure I fully understand the requirement here. When you mention this, are you thinking of the spark.newSession() case? My main concern is to avoid issues where cloning a session could affect the cloned session, now that the catalogs may soon be closable as well (https://github.com/apache/iceberg/pull/14590/files#diff-e263afc81b5b90944281c544005466e8671e7756f52b5b59b67a77e7c63aaf13).
Would you mind pointing me in the right direction for the implementation you have in mind?
Again, thanks so much for your review and time.
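A hypothetical illustration of the concern (the classes below are made up for this sketch, not Spark's actual ones): if a cloned session shares the same catalog instances as its parent, closing either one would also close catalogs the other still uses.

```scala
// Hypothetical sketch of the cloning concern; these classes are illustrative,
// not the actual Spark Connect ones.
class SharedCatalog extends AutoCloseable {
  @volatile var closed = false
  override def close(): Unit = closed = true
}

final class FakeSessionState(val catalogs: Seq[SharedCatalog]) extends AutoCloseable {
  // A shallow clone reuses the same catalog instances as the parent.
  def cloneState(): FakeSessionState = new FakeSessionState(catalogs)
  override def close(): Unit = catalogs.foreach(_.close())
}

object CloneConcernDemo extends App {
  val parent = new FakeSessionState(Seq(new SharedCatalog))
  val cloned = parent.cloneState()
  cloned.close()                        // closing the clone...
  println(parent.catalogs.head.closed)  // ...prints true: the parent's catalog is closed too
}
```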
.newBuilder()
  .setResultComplete(ExecutePlanResponse.ResultComplete.newBuilder().build())
  .build())
responseObserver match {
How is this a problem now?
This was added because of failed unit tests with a mock object, where I was getting a ClassCastException earlier (a sample error can be found in https://github.com/MonkeyCanCode/spark/actions/runs/19394233457/job/55492018508).
Here is the snippet:
[info] org.apache.spark.sql.connect.pipelines.EndToEndAPISuite *** ABORTED *** (81 milliseconds)
[info] java.lang.ClassCastException: class io.grpc.stub.StreamObserver$MockitoMock$lemjBfIA cannot be cast to class org.apache.spark.sql.connect.execution.ExecuteResponseObserver (io.grpc.stub.StreamObserver$MockitoMock$lemjBfIA and org.apache.spark.sql.connect.execution.ExecuteResponseObserver are in unnamed module of loader 'app')
[info] at org.apache.spark.sql.connect.service.SparkConnectListenerBusListener.sendResultComplete(SparkConnectListenerBusListener.scala:140)
[info] at org.apache.spark.sql.connect.service.ServerSideListenerHolder.cleanUp(SparkConnectListenerBusListener.scala:84)
[info] at org.apache.spark.sql.connect.service.SessionHolder.close(SessionHolder.scala:370)
[info] at org.apache.spark.sql.connect.service.SparkConnectSessionManager.shutdownSessionHolder(SparkConnectSessionManager.scala:232)
[info] at org.apache.spark.sql.connect.service.SparkConnectSessionManager.$anonfun$invalidateAllSessions$1(SparkConnectSessionManager.scala:369)
[info] at java.base/java.util.concurrent.ConcurrentHashMap.forEach(ConcurrentHashMap.java:1603)
[info] at org.apache.spark.sql.connect.service.SparkConnectSessionManager.invalidateAllSessions(SparkConnectSessionManager.scala:367)
[info] at org.apache.spark.sql.connect.SparkConnectServerTest.clearAllExecutions(SparkConnectServerTest.scala:83)
[info] at org.apache.spark.sql.connect.SparkConnectServerTest.clearAllExecutions$(SparkConnectServerTest.scala:80)
[info] at org.apache.spark.sql.connect.pipelines.EndToEndAPISuite.clearAllExecutions(EndToEndAPISuite.scala:37)
[info] at org.apache.spark.sql.connect.SparkConnectServerTest.beforeEach(SparkConnectServerTest.scala:72)
[info] at org.apache.spark.sql.connect.SparkConnectServerTest.beforeEach$(SparkConnectServerTest.scala:70)
[info] at org.apache.spark.sql.connect.pipelines.EndToEndAPISuite.beforeEach(EndToEndAPISuite.scala:37)
[info] at org.scalatest.BeforeAndAfterEach.runTest(BeforeAndAfterEach.scala:233)
[info] at org.scalatest.BeforeAndAfterEach.runTest$(BeforeAndAfterEach.scala:227)
[info] at org.apache.spark.SparkFunSuite.runTest(SparkFunSuite.scala:68)
[info] at org.scalatest.funsuite.AnyFunSuiteLike.$anonfun$runTests$1(AnyFunSuiteLike.scala:269)
[info] at org.scalatest.SuperEngine.$anonfun$runTestsInBranch$1(Engine.scala:413)
[info] at scala.collection.immutable.List.foreach(List.scala:323)
[info] at org.scalatest.SuperEngine.traverseSubNodes$1(Engine.scala:401)
[info] at org.scalatest.SuperEngine.runTestsInBranch(Engine.scala:396)
[info] at org.scalatest.SuperEngine.runTestsImpl(Engine.scala:475)
[info] at org.scalatest.funsuite.AnyFunSuiteLike.runTests(AnyFunSuiteLike.scala:269)
[info] at org.scalatest.funsuite.AnyFunSuiteLike.runTests$(AnyFunSuiteLike.scala:268)
[info] at org.scalatest.funsuite.AnyFunSuite.runTests(AnyFunSuite.scala:1564)
[info] at org.scalatest.Suite.run(Suite.scala:1114)
[info] at org.scalatest.Suite.run$(Suite.scala:1096)
[info] at org.scalatest.funsuite.AnyFunSuite.org$scalatest$funsuite$AnyFunSuiteLike$$super$run(AnyFunSuite.scala:1564)
[info] at org.scalatest.funsuite.AnyFunSuiteLike.$anonfun$run$1(AnyFunSuiteLike.scala:273)
[info] at org.scalatest.SuperEngine.runImpl(Engine.scala:535)
[info] at org.scalatest.funsuite.AnyFunSuiteLike.run(AnyFunSuiteLike.scala:273)
[info] at org.scalatest.funsuite.AnyFunSuiteLike.run$(AnyFunSuiteLike.scala:272)
[info] at org.apache.spark.SparkFunSuite.org$scalatest$BeforeAndAfterAll$$super$run(SparkFunSuite.scala:68)
[info] at org.scalatest.BeforeAndAfterAll.liftedTree1$1(BeforeAndAfterAll.scala:213)
[info] at org.scalatest.BeforeAndAfterAll.run(BeforeAndAfterAll.scala:210)
[info] at org.scalatest.BeforeAndAfterAll.run$(BeforeAndAfterAll.scala:208)
[info] at org.apache.spark.SparkFunSuite.run(SparkFunSuite.scala:68)
[info] at org.scalatest.tools.Framework.org$scalatest$tools$Framework$$runSuite(Framework.scala:321)
[info] at org.scalatest.tools.Framework$ScalaTestTask.execute(Framework.scala:517)
[info] at sbt.ForkMain$Run.lambda$runTest$1(ForkMain.java:414)
[info] at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
[info] at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
[info] at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
[info] at java.base/java.lang.Thread.run(Thread.java:840)
Before this PR, the session cleanup process in the tests was less explicit. Now that resources are properly closed, the generic test StreamObserver causes a cast failure in the existing code.
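A minimal, self-contained sketch of the defensive pattern match shown in the diff above (the types here are simplified stand-ins for the gRPC and Spark Connect classes, and `onNextComplete` is illustrative): matching on the runtime type instead of casting lets a Mockito mock observer fall through without a ClassCastException.

```scala
// Simplified stand-ins for the gRPC and Spark Connect types; onNextComplete is
// illustrative of a method that only the server-side observer provides.
trait StreamObserver[T] {
  def onNext(value: T): Unit
}

final case class ExecutePlanResponse(resultComplete: Boolean)

class ExecuteResponseObserver extends StreamObserver[ExecutePlanResponse] {
  override def onNext(value: ExecutePlanResponse): Unit = ()
  def onNextComplete(value: ExecutePlanResponse): Unit = onNext(value)
}

object ListenerCleanupSketch {
  def sendResultComplete(responseObserver: StreamObserver[ExecutePlanResponse]): Unit = {
    val response = ExecutePlanResponse(resultComplete = true)
    responseObserver match {
      case observer: ExecuteResponseObserver =>
        observer.onNextComplete(response)  // the real server-side observer
      case other =>
        other.onNext(response)             // e.g. a Mockito mock used in tests
    }
  }
}
```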
  session = session)
SparkConnectService.sessionManager.putSessionForTesting(ret)
if (session != null) {
  ret.initializeSession()
Why is this needed now?
Earlier, tests created dummy sessions that were not fully initialized. Now that all sessions get properly closed, calling close() on uninitialized sessions was causing test failures as well.
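A rough sketch of the guard under discussion, with hypothetical stand-ins for the test helper and session holder (the names below are not the real test utilities): a holder is only initialized when a real session is supplied, so close() is never invoked on a half-built one.

```scala
// Hypothetical, simplified test helper; not the actual Spark Connect test code.
final class FakeSession(val name: String)

final class TestSessionHolder(session: FakeSession) {
  private var initialized = false
  def initializeSession(): Unit = initialized = true
  def close(): Unit =
    // Closing a holder that was never initialized used to fail in tests.
    if (initialized) println(s"closing resources of ${session.name}")
}

object TestSessionFactory {
  def create(session: FakeSession): TestSessionHolder = {
    val ret = new TestSessionHolder(session)
    if (session != null) {  // mirrors the guard in the diff above
      ret.initializeSession()
    }
    ret
  }
}
```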
What changes were proposed in this pull request?
To fix this, I added the following changes:
- The `CatalogPlugin` interface extends `java.io.Closeable`
- A `close()` method in `CatalogManager` that iterates through all registered catalogs and calls their `close()` method
- `SessionState` implements `Closeable` and calls `catalogManager.close()` from its `close()` method
- `session.sessionState.close()` is called from `SessionHolder.close()` when a Spark Connect session is stopped

The above changes create a clean lifecycle for catalogs: when a session ends, a `close()` call is propagated down the chain, which allows each catalog to release its resources (see the sketch below).
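A minimal sketch of the proposed close() propagation chain, using simplified stand-ins for `CatalogPlugin`, `CatalogManager`, `SessionState`, and `SessionHolder` (this illustrates the design described above, not the exact Spark code):

```scala
import java.io.Closeable
import scala.collection.mutable

// Simplified stand-ins for the real classes; the default no-op close() keeps
// existing catalog implementations unaffected.
trait CatalogPlugin extends Closeable {
  def name: String
  override def close(): Unit = ()
}

class CatalogManager extends Closeable {
  private val catalogs = mutable.Map.empty[String, CatalogPlugin]
  def register(catalog: CatalogPlugin): Unit = catalogs(catalog.name) = catalog
  // Close every registered catalog when the manager itself is closed.
  override def close(): Unit = catalogs.values.foreach(_.close())
}

class SessionState(val catalogManager: CatalogManager) extends Closeable {
  override def close(): Unit = catalogManager.close()
}

class SessionHolder(sessionState: SessionState) {
  def close(): Unit = {
    // ... existing cleanup (artifacts, streaming queries, listeners) ...
    sessionState.close()  // new: release catalog resources too
  }
}
```

In this sketch, `SessionHolder.close()` is the single entry point, so stopping a Spark Connect session releases every catalog registered for that session.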
Why are the changes needed?
The Spark Connect server leaks `SparkSession` objects each time a client connects and disconnects when dealing with Apache Iceberg (see the Apache Iceberg PR apache/iceberg#14590).

The `SessionHolder.close()` method in Spark Connect is responsible for cleaning up a session. It does perform some cleanup, such as artifacts and streaming queries, but it does not clean up the main `SessionState`. This is where the `CatalogManager` lives, which holds references to `RESTCatalog` and `S3FileIO`. Since the `SessionState` is never closed, these `Closeable` catalogs are never closed and their threads leak.
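A hypothetical example of the kind of resource that leaks (the class below is illustrative only; `RESTCatalog` and `S3FileIO` internally hold comparable resources such as HTTP clients and thread pools):

```scala
import java.util.concurrent.Executors

// Illustrative only: a catalog-like object whose worker threads are released
// solely in close(). If nothing ever calls close(), the non-daemon worker
// threads keep running and the pool is never reclaimed.
class LeakyCatalog extends java.io.Closeable {
  private val pool = Executors.newFixedThreadPool(2)
  pool.submit(new Runnable { def run(): Unit = () })  // a worker thread now exists
  override def close(): Unit = pool.shutdown()
}
```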
Does this PR introduce any user-facing change?
N/A
How was this patch tested?
I have a local setup which can easily reproduce this issue. Here are the setup details:
REST catalog: Apache Polaris (created the basic Polaris entities via the getting-started example)
Spark Connect server:
Spark Connect client: installed the publicly released Apache Spark package via pip
Testing config: changed `spark.connect.session.manager.defaultSessionTimeout` from the default `60m` to `1m`
Testing:
- Inspect the heap for `org.apache.spark.sql.classic.SparkSession` and `org.apache.spark.sql.internal.SessionState`
- After the session times out and the server calls `close()` on the Spark session implicitly, inspect `org.apache.spark.sql.classic.SparkSession` and `org.apache.spark.sql.internal.SessionState` again and notice resources are not getting cleaned up
- Check `org.apache.spark.sql.classic.SparkSession` and `org.apache.spark.sql.internal.SessionState` once more. With the current code, instances of these classes (along with many others) are not cleaned up. Also, heap usage stays high and cannot be garbage collected.

Was this patch authored or co-authored using generative AI tooling?
No