Commit 5c47be2

Revert "[SPARK-54349][PYTHON] Refactor code a bit to simplify faulthandler integration extension"
This reverts commit 6ee5a16.
1 parent 6ee5a16 · commit 5c47be2

10 files changed: +173 -138 lines
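
This revert replaces the shared helpers (`with_faulthandler`, `start_faulthandler_periodic_traceback`) with the same faulthandler lifecycle inlined into each worker entry point. Below is a minimal standalone sketch of that restored pattern; the environment variable names and faulthandler calls are taken from the diffs that follow, while the function name and placeholder body are illustrative only:

import faulthandler
import os


def run_worker_with_faulthandler() -> None:
    # Hypothetical wrapper name; mirrors the inline pattern restored below.
    # On a crash, dump the traceback to a per-PID file under
    # PYTHON_FAULTHANDLER_DIR; optionally also dump it periodically.
    faulthandler_log_path = os.environ.get("PYTHON_FAULTHANDLER_DIR", None)
    interval = os.environ.get("PYTHON_TRACEBACK_DUMP_INTERVAL_SECONDS", None)
    faulthandler_log_file = None
    try:
        if faulthandler_log_path:
            faulthandler_log_path = os.path.join(faulthandler_log_path, str(os.getpid()))
            faulthandler_log_file = open(faulthandler_log_path, "w")
            faulthandler.enable(file=faulthandler_log_file)

        if interval is not None and int(interval) > 0:
            faulthandler.dump_traceback_later(int(interval), repeat=True)

        pass  # placeholder: the actual worker body goes here
    finally:
        if faulthandler_log_file is not None:
            faulthandler.disable()
            faulthandler_log_file.close()
            os.remove(faulthandler_log_path)
        faulthandler.cancel_dump_traceback_later()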

python/pyspark/sql/worker/analyze_udtf.py

Lines changed: 19 additions & 8 deletions

@@ -15,6 +15,7 @@
 # limitations under the License.
 #
 
+import faulthandler
 import inspect
 import os
 import sys
@@ -34,12 +35,7 @@
 from pyspark.sql.functions import OrderingColumn, PartitioningColumn, SelectedColumn
 from pyspark.sql.types import _parse_datatype_json_string, StructType
 from pyspark.sql.udtf import AnalyzeArgument, AnalyzeResult
-from pyspark.util import (
-    handle_worker_exception,
-    local_connect_and_auth,
-    with_faulthandler,
-    start_faulthandler_periodic_traceback,
-)
+from pyspark.util import handle_worker_exception, local_connect_and_auth
 from pyspark.worker_util import (
     check_python_version,
     read_command,
@@ -104,7 +100,6 @@ def read_arguments(infile: IO) -> Tuple[List[AnalyzeArgument], Dict[str, Analyze
     return args, kwargs
 
 
-@with_faulthandler
 def main(infile: IO, outfile: IO) -> None:
     """
     Runs the Python UDTF's `analyze` static method.
@@ -113,10 +108,18 @@ def main(infile: IO, outfile: IO) -> None:
     in JVM and receive the Python UDTF and its arguments for the `analyze` static method,
     and call the `analyze` static method, and send back a AnalyzeResult as a result of the method.
     """
+    faulthandler_log_path = os.environ.get("PYTHON_FAULTHANDLER_DIR", None)
+    tracebackDumpIntervalSeconds = os.environ.get("PYTHON_TRACEBACK_DUMP_INTERVAL_SECONDS", None)
     try:
+        if faulthandler_log_path:
+            faulthandler_log_path = os.path.join(faulthandler_log_path, str(os.getpid()))
+            faulthandler_log_file = open(faulthandler_log_path, "w")
+            faulthandler.enable(file=faulthandler_log_file)
+
         check_python_version(infile)
 
-        start_faulthandler_periodic_traceback()
+        if tracebackDumpIntervalSeconds is not None and int(tracebackDumpIntervalSeconds) > 0:
+            faulthandler.dump_traceback_later(int(tracebackDumpIntervalSeconds), repeat=True)
 
         memory_limit_mb = int(os.environ.get("PYSPARK_PLANNER_MEMORY_MB", "-1"))
         setup_memory_limits(memory_limit_mb)
@@ -263,6 +266,11 @@ def invalid_analyze_result_field(field_name: str, expected_field: str) -> PySpar
     except BaseException as e:
         handle_worker_exception(e, outfile)
         sys.exit(-1)
+    finally:
+        if faulthandler_log_path:
+            faulthandler.disable()
+            faulthandler_log_file.close()
+            os.remove(faulthandler_log_path)
 
     send_accumulator_updates(outfile)
 
@@ -274,6 +282,9 @@ def invalid_analyze_result_field(field_name: str, expected_field: str) -> PySpar
         write_int(SpecialLengths.END_OF_DATA_SECTION, outfile)
         sys.exit(-1)
 
+    # Force to cancel dump_traceback_later
+    faulthandler.cancel_dump_traceback_later()
+
 
 if __name__ == "__main__":
     # Read information about how to connect back to the JVM from the environment.
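
For reference, the `with_faulthandler` decorator removed above presumably wrapped `main` in the same setup/teardown that this revert inlines. A hypothetical reconstruction of the two removed helpers (not the actual `pyspark.util` implementation from SPARK-54349), built only from the inline code they replaced:

import faulthandler
import functools
import os
from typing import Any, Callable


def with_faulthandler(f: Callable[..., Any]) -> Callable[..., Any]:
    # Hypothetical reconstruction: enable faulthandler on a per-PID log file
    # under PYTHON_FAULTHANDLER_DIR for the duration of the wrapped call.
    @functools.wraps(f)
    def wrapper(*args: Any, **kwargs: Any) -> Any:
        log_dir = os.environ.get("PYTHON_FAULTHANDLER_DIR", None)
        log_path = os.path.join(log_dir, str(os.getpid())) if log_dir else None
        log_file = open(log_path, "w") if log_path else None
        if log_file:
            faulthandler.enable(file=log_file)
        try:
            return f(*args, **kwargs)
        finally:
            if log_file:
                faulthandler.disable()
                log_file.close()
                os.remove(log_path)
    return wrapper


def start_faulthandler_periodic_traceback() -> None:
    # Hypothetical reconstruction: mirrors the inline interval check that
    # each worker regains in this revert.
    interval = os.environ.get("PYTHON_TRACEBACK_DUMP_INTERVAL_SECONDS", None)
    if interval is not None and int(interval) > 0:
        faulthandler.dump_traceback_later(int(interval), repeat=True)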

python/pyspark/sql/worker/commit_data_source_write.py

Lines changed: 19 additions & 8 deletions

@@ -14,6 +14,7 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 #
+import faulthandler
 import os
 import sys
 from typing import IO
@@ -28,12 +29,7 @@
     SpecialLengths,
 )
 from pyspark.sql.datasource import DataSourceWriter, WriterCommitMessage
-from pyspark.util import (
-    handle_worker_exception,
-    local_connect_and_auth,
-    with_faulthandler,
-    start_faulthandler_periodic_traceback,
-)
+from pyspark.util import handle_worker_exception, local_connect_and_auth
 from pyspark.worker_util import (
     check_python_version,
     pickleSer,
@@ -44,7 +40,6 @@
 )
 
 
-@with_faulthandler
 def main(infile: IO, outfile: IO) -> None:
     """
     Main method for committing or aborting a data source write operation.
@@ -54,10 +49,18 @@ def main(infile: IO, outfile: IO) -> None:
     responsible for invoking either the `commit` or the `abort` method on a data source
     writer instance, given a list of commit messages.
     """
+    faulthandler_log_path = os.environ.get("PYTHON_FAULTHANDLER_DIR", None)
+    tracebackDumpIntervalSeconds = os.environ.get("PYTHON_TRACEBACK_DUMP_INTERVAL_SECONDS", None)
     try:
+        if faulthandler_log_path:
+            faulthandler_log_path = os.path.join(faulthandler_log_path, str(os.getpid()))
+            faulthandler_log_file = open(faulthandler_log_path, "w")
+            faulthandler.enable(file=faulthandler_log_file)
+
         check_python_version(infile)
 
-        start_faulthandler_periodic_traceback()
+        if tracebackDumpIntervalSeconds is not None and int(tracebackDumpIntervalSeconds) > 0:
+            faulthandler.dump_traceback_later(int(tracebackDumpIntervalSeconds), repeat=True)
 
         memory_limit_mb = int(os.environ.get("PYSPARK_PLANNER_MEMORY_MB", "-1"))
         setup_memory_limits(memory_limit_mb)
@@ -103,6 +106,11 @@ def main(infile: IO, outfile: IO) -> None:
     except BaseException as e:
         handle_worker_exception(e, outfile)
         sys.exit(-1)
+    finally:
+        if faulthandler_log_path:
+            faulthandler.disable()
+            faulthandler_log_file.close()
+            os.remove(faulthandler_log_path)
 
     send_accumulator_updates(outfile)
 
@@ -114,6 +122,9 @@ def main(infile: IO, outfile: IO) -> None:
         write_int(SpecialLengths.END_OF_DATA_SECTION, outfile)
         sys.exit(-1)
 
+    # Force to cancel dump_traceback_later
+    faulthandler.cancel_dump_traceback_later()
+
 
 if __name__ == "__main__":
     # Read information about how to connect back to the JVM from the environment.

python/pyspark/sql/worker/create_data_source.py

Lines changed: 19 additions & 8 deletions

@@ -14,6 +14,7 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 #
+import faulthandler
 import inspect
 import os
 import sys
@@ -31,12 +32,7 @@
 )
 from pyspark.sql.datasource import DataSource, CaseInsensitiveDict
 from pyspark.sql.types import _parse_datatype_json_string, StructType
-from pyspark.util import (
-    handle_worker_exception,
-    local_connect_and_auth,
-    with_faulthandler,
-    start_faulthandler_periodic_traceback,
-)
+from pyspark.util import handle_worker_exception, local_connect_and_auth
 from pyspark.worker_util import (
     check_python_version,
     read_command,
@@ -49,7 +45,6 @@
 )
 
 
-@with_faulthandler
 def main(infile: IO, outfile: IO) -> None:
     """
     Main method for creating a Python data source instance.
@@ -67,10 +62,18 @@ def main(infile: IO, outfile: IO) -> None:
     This process then creates a `DataSource` instance using the above information and
     sends the pickled instance as well as the schema back to the JVM.
     """
+    faulthandler_log_path = os.environ.get("PYTHON_FAULTHANDLER_DIR", None)
+    tracebackDumpIntervalSeconds = os.environ.get("PYTHON_TRACEBACK_DUMP_INTERVAL_SECONDS", None)
     try:
+        if faulthandler_log_path:
+            faulthandler_log_path = os.path.join(faulthandler_log_path, str(os.getpid()))
+            faulthandler_log_file = open(faulthandler_log_path, "w")
+            faulthandler.enable(file=faulthandler_log_file)
+
         check_python_version(infile)
 
-        start_faulthandler_periodic_traceback()
+        if tracebackDumpIntervalSeconds is not None and int(tracebackDumpIntervalSeconds) > 0:
+            faulthandler.dump_traceback_later(int(tracebackDumpIntervalSeconds), repeat=True)
 
         memory_limit_mb = int(os.environ.get("PYSPARK_PLANNER_MEMORY_MB", "-1"))
         setup_memory_limits(memory_limit_mb)
@@ -169,6 +172,11 @@ def main(infile: IO, outfile: IO) -> None:
     except BaseException as e:
         handle_worker_exception(e, outfile)
         sys.exit(-1)
+    finally:
+        if faulthandler_log_path:
+            faulthandler.disable()
+            faulthandler_log_file.close()
+            os.remove(faulthandler_log_path)
 
     send_accumulator_updates(outfile)
 
@@ -180,6 +188,9 @@ def main(infile: IO, outfile: IO) -> None:
         write_int(SpecialLengths.END_OF_DATA_SECTION, outfile)
         sys.exit(-1)
 
+    # Force to cancel dump_traceback_later
+    faulthandler.cancel_dump_traceback_later()
+
 
 if __name__ == "__main__":
     # Read information about how to connect back to the JVM from the environment.

python/pyspark/sql/worker/data_source_pushdown_filters.py

Lines changed: 19 additions & 8 deletions

@@ -16,6 +16,7 @@
 #
 
 import base64
+import faulthandler
 import json
 import os
 import sys
@@ -48,12 +49,7 @@
 )
 from pyspark.sql.types import StructType, VariantVal, _parse_datatype_json_string
 from pyspark.sql.worker.plan_data_source_read import write_read_func_and_partitions
-from pyspark.util import (
-    handle_worker_exception,
-    local_connect_and_auth,
-    with_faulthandler,
-    start_faulthandler_periodic_traceback,
-)
+from pyspark.util import handle_worker_exception, local_connect_and_auth
 from pyspark.worker_util import (
     check_python_version,
     pickleSer,
@@ -123,7 +119,6 @@ def deserializeFilter(jsonDict: dict) -> Filter:
     return filter
 
 
-@with_faulthandler
 def main(infile: IO, outfile: IO) -> None:
     """
     Main method for planning a data source read with filter pushdown.
@@ -145,10 +140,18 @@ def main(infile: IO, outfile: IO) -> None:
     on the reader and determines which filters are supported. The indices of the supported
     filters are sent back to the JVM, along with the list of partitions and the read function.
     """
+    faulthandler_log_path = os.environ.get("PYTHON_FAULTHANDLER_DIR", None)
+    tracebackDumpIntervalSeconds = os.environ.get("PYTHON_TRACEBACK_DUMP_INTERVAL_SECONDS", None)
     try:
+        if faulthandler_log_path:
+            faulthandler_log_path = os.path.join(faulthandler_log_path, str(os.getpid()))
+            faulthandler_log_file = open(faulthandler_log_path, "w")
+            faulthandler.enable(file=faulthandler_log_file)
+
         check_python_version(infile)
 
-        start_faulthandler_periodic_traceback()
+        if tracebackDumpIntervalSeconds is not None and int(tracebackDumpIntervalSeconds) > 0:
+            faulthandler.dump_traceback_later(int(tracebackDumpIntervalSeconds), repeat=True)
 
         memory_limit_mb = int(os.environ.get("PYSPARK_PLANNER_MEMORY_MB", "-1"))
         setup_memory_limits(memory_limit_mb)
@@ -255,6 +258,11 @@ def main(infile: IO, outfile: IO) -> None:
     except BaseException as e:
         handle_worker_exception(e, outfile)
         sys.exit(-1)
+    finally:
+        if faulthandler_log_path:
+            faulthandler.disable()
+            faulthandler_log_file.close()
+            os.remove(faulthandler_log_path)
 
     send_accumulator_updates(outfile)
 
@@ -266,6 +274,9 @@ def main(infile: IO, outfile: IO) -> None:
         write_int(SpecialLengths.END_OF_DATA_SECTION, outfile)
         sys.exit(-1)
 
+    # Force to cancel dump_traceback_later
+    faulthandler.cancel_dump_traceback_later()
+
 
 if __name__ == "__main__":
     # Read information about how to connect back to the JVM from the environment.

python/pyspark/sql/worker/lookup_data_sources.py

Lines changed: 19 additions & 8 deletions

@@ -14,6 +14,7 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 #
+import faulthandler
 from importlib import import_module
 from pkgutil import iter_modules
 import os
@@ -28,12 +29,7 @@
     SpecialLengths,
 )
 from pyspark.sql.datasource import DataSource
-from pyspark.util import (
-    handle_worker_exception,
-    local_connect_and_auth,
-    with_faulthandler,
-    start_faulthandler_periodic_traceback,
-)
+from pyspark.util import handle_worker_exception, local_connect_and_auth
 from pyspark.worker_util import (
     check_python_version,
     pickleSer,
@@ -44,7 +40,6 @@
 )
 
 
-@with_faulthandler
 def main(infile: IO, outfile: IO) -> None:
     """
     Main method for looking up the available Python Data Sources in Python path.
@@ -56,10 +51,18 @@ def main(infile: IO, outfile: IO) -> None:
     This is responsible for searching the available Python Data Sources so they can be
     statically registered automatically.
     """
+    faulthandler_log_path = os.environ.get("PYTHON_FAULTHANDLER_DIR", None)
+    tracebackDumpIntervalSeconds = os.environ.get("PYTHON_TRACEBACK_DUMP_INTERVAL_SECONDS", None)
     try:
+        if faulthandler_log_path:
+            faulthandler_log_path = os.path.join(faulthandler_log_path, str(os.getpid()))
+            faulthandler_log_file = open(faulthandler_log_path, "w")
+            faulthandler.enable(file=faulthandler_log_file)
+
         check_python_version(infile)
 
-        start_faulthandler_periodic_traceback()
+        if tracebackDumpIntervalSeconds is not None and int(tracebackDumpIntervalSeconds) > 0:
+            faulthandler.dump_traceback_later(int(tracebackDumpIntervalSeconds), repeat=True)
 
         memory_limit_mb = int(os.environ.get("PYSPARK_PLANNER_MEMORY_MB", "-1"))
         setup_memory_limits(memory_limit_mb)
@@ -86,6 +89,11 @@ def main(infile: IO, outfile: IO) -> None:
     except BaseException as e:
         handle_worker_exception(e, outfile)
         sys.exit(-1)
+    finally:
+        if faulthandler_log_path:
+            faulthandler.disable()
+            faulthandler_log_file.close()
+            os.remove(faulthandler_log_path)
 
     send_accumulator_updates(outfile)
 
@@ -97,6 +105,9 @@ def main(infile: IO, outfile: IO) -> None:
         write_int(SpecialLengths.END_OF_DATA_SECTION, outfile)
         sys.exit(-1)
 
+    # Force to cancel dump_traceback_later
+    faulthandler.cancel_dump_traceback_later()
+
 
 if __name__ == "__main__":
     # Read information about how to connect back to the JVM from the environment.
