
Commit 14f7a6e

gaogaotiantian authored and huangxiaopingRD committed
[SPARK-54456][PYTHON] Import worker module after fork to avoid deadlock
### What changes were proposed in this pull request?

We lazy-import the worker module after fork to avoid a potential deadlock caused by importing modules that spawn multiple threads.

### Why are the changes needed?

https://discuss.python.org/t/switching-default-multiprocessing-context-to-spawn-on-posix-as-well/21868

It is impossible to do a thread-safe fork in CPython. CPython started issuing warnings in 3.12 and moved away from "fork" as the default `multiprocessing` start method in 3.14. Giving up `fork` entirely would be a huge effort for us, but we can try our best not to import arbitrary modules before fork by lazy-importing the worker module after fork. We already have some workers that import thread-spawning libraries such as `pyarrow`; `plan_data_source_read` is one example.

### Does this PR introduce _any_ user-facing change?

No

### How was this patch tested?

CI should pass.

### Was this patch authored or co-authored using generative AI tooling?

No

Closes apache#53166 from gaogaotiantian/move-worker-module-import.

Authored-by: Tian Gao <gaogaotiantian@hotmail.com>
Signed-off-by: Hyukjin Kwon <gurwls223@apache.org>
1 parent 3fbca58 commit 14f7a6e
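
For readers unfamiliar with the failure mode, the following is a minimal, POSIX-only Python sketch (not Spark code; the lock and helper thread are hypothetical stand-ins for whatever a library may set up as a side effect of being imported) of why forking a process that already has extra threads can deadlock the child.

```python
import os
import threading
import time

# Hypothetical stand-in for an internal lock that a library's helper thread
# (started when the library is imported) holds most of the time.
lock = threading.Lock()

def background():
    while True:
        with lock:
            time.sleep(0.01)

threading.Thread(target=background, daemon=True).start()
time.sleep(0.1)  # let the helper thread start before forking

# On CPython 3.12+ this fork() also triggers the "multi-threaded process"
# DeprecationWarning referenced in the PR description.
pid = os.fork()
if pid == 0:
    # Child: the helper thread does not exist here, but the lock was most likely
    # inherited in the locked state, so nothing will ever release it. A real
    # library would block forever; the timeout just lets this sketch terminate.
    acquired = lock.acquire(timeout=2)
    print(f"child could acquire the inherited lock: {acquired}")
    os._exit(0)

os.waitpid(pid, 0)
```

Deferring heavyweight imports until after `fork()`, as this commit does for the worker module, keeps the parent free of such helper threads, so the child never inherits a lock that nothing can release.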

File tree

1 file changed (+13, -8 lines)

python/pyspark/daemon.py

Lines changed: 13 additions & 8 deletions
```diff
@@ -31,14 +31,6 @@
 
 from pyspark.serializers import read_int, write_int, write_with_length, UTF8Deserializer
 
-if len(sys.argv) > 1 and sys.argv[1].startswith("pyspark"):
-    import importlib
-
-    worker_module = importlib.import_module(sys.argv[1])
-    worker_main = worker_module.main
-else:
-    from pyspark.worker import main as worker_main
-
 
 def compute_real_exit_code(exit_code):
     # SystemExit's code can be integer or string, but os._exit only accepts integers
@@ -78,6 +70,19 @@ def worker(sock, authenticated):
             return 1
 
     exit_code = 0
+
+    # We don't know what could happen when we import the worker module. We have to
+    # guarantee that no thread is spawned before we fork, so we have to import the
+    # worker module after fork. For example, both pandas and pyarrow starts some
+    # threads when they are imported.
+    if len(sys.argv) > 1 and sys.argv[1].startswith("pyspark"):
+        import importlib
+
+        worker_module = importlib.import_module(sys.argv[1])
+        worker_main = worker_module.main
+    else:
+        from pyspark.worker import main as worker_main
+
     try:
         worker_main(infile, outfile)
     except SystemExit as exc:
```
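
For context, here is a runnable, simplified sketch of the same lazy-import-after-fork pattern outside of Spark. The module-name handling mirrors the patched logic above, but `json.tool` (which exposes a `main()` function) is used as a stand-in worker module so the snippet runs without PySpark installed; it is illustrative only, not the daemon's actual flow.

```python
import importlib
import os
import sys

def resolve_worker_main():
    # Mirrors the patched daemon.py logic: a module named on the command line
    # wins; otherwise fall back to a default. "json.tool" stands in for
    # pyspark.worker in this sketch.
    module_name = sys.argv[1] if len(sys.argv) > 1 else "json.tool"
    return importlib.import_module(module_name).main

pid = os.fork()  # POSIX only
if pid == 0:
    # Child: the potentially thread-spawning import happens only here,
    # after fork(), never in the long-lived parent.
    worker_main = resolve_worker_main()
    print(f"child resolved worker entry point: {worker_main!r}")
    os._exit(0)

os.waitpid(pid, 0)
```

The design point is simply that the parent never pays for the import (or for any threads it might start); each forked worker resolves and imports its own entry point.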
