Skip to content

Commit 94e3fc3

Browse files
nickrobinson251NHDalyDrvi
authored
Trigger a profile before terminating the worker upon timeout (#120)
* Trigger a profile before terminating the worker upon timeout * Introduce `timeout_profile_wait` to control timeout-triggered profiles + tests * Print captured logs after we finish writing the profile --------- Co-authored-by: Nathan Daly <NHDaly@gmail.com> Co-authored-by: Tomáš Drvoštěp <tomas.drvostep@gmail.com>
1 parent 5b8aa58 commit 94e3fc3

File tree

6 files changed

+156
-15
lines changed

6 files changed

+156
-15
lines changed

Project.toml

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ Dates = "1"
1717
Logging = "1"
1818
LoggingExtras = "1"
1919
Pkg = "1"
20+
Profile = "1"
2021
Random = "1"
2122
Serialization = "1"
2223
Sockets = "1"
@@ -30,9 +31,10 @@ DeepDiffs = "ab62b9b5-e342-54a8-a765-a90f495de1a6"
3031
IOCapture = "b5f81e59-6552-4d32-b1f0-c071b021bf89"
3132
Logging = "56ddb016-857b-54e1-b83d-db4d58db5568"
3233
Pkg = "44cfe95a-1eb2-52ea-b672-e2afdf69b78f"
34+
Profile = "9abbd945-dff8-562f-b5e8-e1ebf5ef1b79"
3335
Random = "9a3f8284-a2c9-5f02-9a11-845980a1fd5c"
3436
Test = "8dfed614-e22c-5e08-85e1-65c5234f0b40"
3537
XMLDict = "228000da-037f-5747-90a9-8195ccbf91a5"
3638

3739
[targets]
38-
test = ["AutoHashEquals", "DeepDiffs", "IOCapture", "Logging", "Pkg", "Random", "Test", "XMLDict"]
40+
test = ["AutoHashEquals", "DeepDiffs", "IOCapture", "Logging", "Pkg", "Profile", "Random", "Test", "XMLDict"]

src/ReTestItems.jl

Lines changed: 34 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -192,6 +192,11 @@ will be run.
192192
`paths` passed to it cannot contain test files, either because the path doesn't exist or
193193
the path points to a file which is not a test file. Default is `false`.
194194
Can also be set using the `RETESTITEMS_VALIDATE_PATHS` environment variable.
195+
- `timeout_profile_wait::Real=0`: When non-zero, a worker that times-out will trigger a CPU profile
196+
for which we will wait `timeout_profile_wait` seconds before terminating the worker.
197+
Zero means no profile will be taken. Can also be set using the `RETESTITEMS_TIMEOUT_PROFILE_WAIT`
198+
environment variable. See the [Profile documentation](https://docs.julialang.org/en/v1/stdlib/Profile/#Triggered-During-Execution)
199+
for more information on triggered profiles. Note you can use `worker_init_expr` to tweak the profile settings on workers.
195200
"""
196201
function runtests end
197202

@@ -237,14 +242,16 @@ function runtests(
237242
verbose_results::Bool=(logs !== :issues && isinteractive()),
238243
test_end_expr::Expr=Expr(:block),
239244
validate_paths::Bool=parse(Bool, get(ENV, "RETESTITEMS_VALIDATE_PATHS", "false")),
245+
timeout_profile_wait::Real=parse(Int, get(ENV, "RETESTITEMS_TIMEOUT_PROFILE_WAIT", "0")),
240246
)
241247
nworker_threads = _validated_nworker_threads(nworker_threads)
242248
paths′ = _validated_paths(paths, validate_paths)
243249

244250
logs in LOG_DISPLAY_MODES || throw(ArgumentError("`logs` must be one of $LOG_DISPLAY_MODES, got $(repr(logs))"))
245251
report && logs == :eager && throw(ArgumentError("`report=true` is not compatible with `logs=:eager`"))
246252
(0 memory_threshold 1) || throw(ArgumentError("`memory_threshold` must be between 0 and 1, got $(repr(memory_threshold))"))
247-
testitem_timeout > 0 || throw(ArgumentError("`testitem_timeout` must be a postive number, got $(repr(testitem_timeout))"))
253+
testitem_timeout > 0 || throw(ArgumentError("`testitem_timeout` must be a positive number, got $(repr(testitem_timeout))"))
254+
timeout_profile_wait >= 0 || throw(ArgumentError("`timeout_profile_wait` must be a non-negative number, got $(repr(timeout_profile_wait))"))
248255
# If we were given paths but none were valid, then nothing to run.
249256
!isempty(paths) && isempty(paths′) && return nothing
250257
shouldrun_combined(ti) = shouldrun(ti) && _shouldrun(name, ti.name) && _shouldrun(tags, ti.tags)
@@ -253,13 +260,15 @@ function runtests(
253260
nworkers = max(0, nworkers)
254261
retries = max(0, retries)
255262
timeout = ceil(Int, testitem_timeout)
263+
timeout_profile_wait = ceil(Int, timeout_profile_wait)
264+
(timeout_profile_wait > 0 && Sys.iswindows()) && @warn "CPU profiles on timeout is not supported on Windows, ignoring `timeout_profile_wait`"
256265
debuglvl = Int(debug)
257266
if debuglvl > 0
258267
LoggingExtras.withlevel(LoggingExtras.Debug; verbosity=debuglvl) do
259-
_runtests(shouldrun_combined, paths′, nworkers, nworker_threads, worker_init_expr, test_end_expr, timeout, retries, memory_threshold, verbose_results, debuglvl, report, logs)
268+
_runtests(shouldrun_combined, paths′, nworkers, nworker_threads, worker_init_expr, test_end_expr, timeout, retries, memory_threshold, verbose_results, debuglvl, report, logs, timeout_profile_wait)
260269
end
261270
else
262-
return _runtests(shouldrun_combined, paths′, nworkers, nworker_threads, worker_init_expr, test_end_expr, timeout, retries, memory_threshold, verbose_results, debuglvl, report, logs)
271+
return _runtests(shouldrun_combined, paths′, nworkers, nworker_threads, worker_init_expr, test_end_expr, timeout, retries, memory_threshold, verbose_results, debuglvl, report, logs, timeout_profile_wait)
263272
end
264273
end
265274

@@ -273,7 +282,7 @@ end
273282
# By tracking and reusing test environments, we can avoid this issue.
274283
const TEST_ENVS = Dict{String, String}()
275284

276-
function _runtests(shouldrun, paths, nworkers::Int, nworker_threads::String, worker_init_expr::Expr, test_end_expr::Expr, testitem_timeout::Int, retries::Int, memory_threshold::Real, verbose_results::Bool, debug::Int, report::Bool, logs::Symbol)
285+
function _runtests(shouldrun, paths, nworkers::Int, nworker_threads::String, worker_init_expr::Expr, test_end_expr::Expr, testitem_timeout::Int, retries::Int, memory_threshold::Real, verbose_results::Bool, debug::Int, report::Bool, logs::Symbol, timeout_profile_wait::Int)
277286
# Don't recursively call `runtests` e.g. if we `include` a file which calls it.
278287
# So we ignore the `runtests(...)` call in `test/runtests.jl` when `runtests(...)`
279288
# was called from the command line.
@@ -293,7 +302,7 @@ function _runtests(shouldrun, paths, nworkers::Int, nworker_threads::String, wor
293302
if is_running_test_runtests_jl(proj_file)
294303
# Assume this is `Pkg.test`, so test env already active.
295304
@debugv 2 "Running in current environment `$(Base.active_project())`"
296-
return _runtests_in_current_env(shouldrun, paths, proj_file, nworkers, nworker_threads, worker_init_expr, test_end_expr, testitem_timeout, retries, memory_threshold, verbose_results, debug, report, logs)
305+
return _runtests_in_current_env(shouldrun, paths, proj_file, nworkers, nworker_threads, worker_init_expr, test_end_expr, testitem_timeout, retries, memory_threshold, verbose_results, debug, report, logs, timeout_profile_wait)
297306
else
298307
@debugv 1 "Activating test environment for `$proj_file`"
299308
orig_proj = Base.active_project()
@@ -306,7 +315,7 @@ function _runtests(shouldrun, paths, nworkers::Int, nworker_threads::String, wor
306315
testenv = TestEnv.activate()
307316
TEST_ENVS[proj_file] = testenv
308317
end
309-
_runtests_in_current_env(shouldrun, paths, proj_file, nworkers, nworker_threads, worker_init_expr, test_end_expr, testitem_timeout, retries, memory_threshold, verbose_results, debug, report, logs)
318+
_runtests_in_current_env(shouldrun, paths, proj_file, nworkers, nworker_threads, worker_init_expr, test_end_expr, testitem_timeout, retries, memory_threshold, verbose_results, debug, report, logs, timeout_profile_wait)
310319
finally
311320
Base.set_active_project(orig_proj)
312321
end
@@ -317,6 +326,7 @@ end
317326
function _runtests_in_current_env(
318327
shouldrun, paths, projectfile::String, nworkers::Int, nworker_threads, worker_init_expr::Expr, test_end_expr::Expr,
319328
testitem_timeout::Int, retries::Int, memory_threshold::Real, verbose_results::Bool, debug::Int, report::Bool, logs::Symbol,
329+
timeout_profile_wait::Int,
320330
)
321331
start_time = time()
322332
proj_name = something(Pkg.Types.read_project(projectfile).name, "")
@@ -381,7 +391,7 @@ function _runtests_in_current_env(
381391
ti = starting[i]
382392
@spawn begin
383393
with_logger(original_logger) do
384-
manage_worker($w, $proj_name, $testitems, $ti, $nworker_threads, $worker_init_expr, $test_end_expr, $testitem_timeout, $retries, $memory_threshold, $verbose_results, $debug, $report, $logs)
394+
manage_worker($w, $proj_name, $testitems, $ti, $nworker_threads, $worker_init_expr, $test_end_expr, $testitem_timeout, $retries, $memory_threshold, $verbose_results, $debug, $report, $logs, $timeout_profile_wait)
385395
end
386396
end
387397
end
@@ -492,7 +502,7 @@ end
492502

493503
function manage_worker(
494504
worker::Worker, proj_name::AbstractString, testitems::TestItems, testitem::Union{TestItem,Nothing}, nworker_threads, worker_init_expr::Expr, test_end_expr::Expr,
495-
default_timeout::Int, retries::Int, memory_threshold::Real, verbose_results::Bool, debug::Int, report::Bool, logs::Symbol
505+
default_timeout::Int, retries::Int, memory_threshold::Real, verbose_results::Bool, debug::Int, report::Bool, logs::Symbol, timeout_profile_wait::Int
496506
)
497507
ntestitems = length(testitems.testitems)
498508
run_number = 1
@@ -551,23 +561,35 @@ function manage_worker(
551561
end
552562
catch e
553563
@debugv 2 "Error" exception=e
554-
println(DEFAULT_STDOUT[])
555-
_print_captured_logs(DEFAULT_STDOUT[], testitem, run_number)
556564
# Handle the exception
557565
if e isa TimeoutException
558-
@debugv 1 "Test item $(repr(testitem.name)) timed out. Terminating worker $worker"
559-
terminate!(worker)
566+
if timeout_profile_wait > 0
567+
@warn "$worker timed out running test item $(repr(testitem.name)) after $timeout seconds. \
568+
A CPU profile will be triggered on the worker and then it will be terminated."
569+
trigger_profile(worker, timeout_profile_wait, :timeout)
570+
end
571+
terminate!(worker, :timeout)
560572
wait(worker)
573+
# TODO: We print the captured logs after the worker is terminated,
574+
# which means that we include an annoying stackrace from the worker termination,
575+
# but the profiles don't seem to get flushed properly if we don't do this.
576+
# This is not an issue with eager logs, but when going through a file, this seems to help.
577+
println(DEFAULT_STDOUT[])
578+
_print_captured_logs(DEFAULT_STDOUT[], testitem, run_number)
561579
@error "$worker timed out running test item $(repr(testitem.name)) after $timeout seconds. \
562580
Recording test error."
563581
record_timeout!(testitem, run_number, timeout)
564582
elseif e isa WorkerTerminatedException
583+
println(DEFAULT_STDOUT[])
584+
_print_captured_logs(DEFAULT_STDOUT[], testitem, run_number)
565585
@error "$worker died running test item $(repr(testitem.name)). \
566586
Recording test error."
567587
record_worker_terminated!(testitem, worker, run_number)
568588
else
569589
# We don't expect any other kind of error, so rethrow, which will propagate
570590
# back up to the main coordinator task and throw to the user
591+
println(DEFAULT_STDOUT[])
592+
_print_captured_logs(DEFAULT_STDOUT[], testitem, run_number)
571593
rethrow()
572594
end
573595
# Handle retries

src/log_capture.jl

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -107,7 +107,9 @@ If target is String it is assumed it is a file path.
107107
_redirect_logs(f, path::String) = open(io->_redirect_logs(f, io), path, "w")
108108
function _redirect_logs(f, target::IO)
109109
target === DEFAULT_STDOUT[] && return f()
110-
colored_io = IOContext(target, :color => get(DEFAULT_STDOUT[], :color, false))
110+
# If we're not doing :eager logs, make sure the displaysize is large so we don't truncate
111+
# CPU profiles.
112+
colored_io = IOContext(target, :color => get(DEFAULT_STDOUT[], :color, false), :displaysize => (10000,10000))
111113
# In case the default logger was changed by the user, we need to make sure the new logstate
112114
# is poinitng to the new stderr.
113115
# Adapted from https://github.com/JuliaIO/Suppressor.jl/blob/cbfc46f1450b03d6b69dad4c35de739290ff0aff/src/Suppressor.jl#L158-L161

src/workers.jl

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ module Workers
33
using Sockets, Serialization
44

55
export Worker, remote_eval, remote_fetch, terminate!, WorkerTerminatedException
6+
export trigger_profile
67

78
function try_with_timeout(f, timeout)
89
cond = Threads.Condition()
@@ -105,6 +106,25 @@ function watch_and_terminate!(w::Worker, ev::Threads.Event)
105106
true
106107
end
107108

109+
# Send signal to the given `Worker` process to trigger a profile.
110+
# Users can customise this profiling in the usual way, e.g. via
111+
# `JULIA_PROFILE_PEEK_HEAP_SNAPSHOT`, but `Profile.set_peek_duration`, `Profile.peek_report[]`
112+
# would have to be modified in the worker process.
113+
# See https://docs.julialang.org/en/v1/stdlib/Profile/#Triggered-During-Execution
114+
# Called when timeout_profile_wait is non-zero.
115+
function trigger_profile(w::Worker, timeout_profile_wait, from::Symbol=:manual)
116+
if !Sys.iswindows()
117+
@debug "sending profile request to worker $(w.pid) from $from"
118+
if Sys.islinux()
119+
kill(w.process, 10) # SIGUSR1
120+
elseif Sys.isbsd()
121+
kill(w.process, 29) # SIGINFO
122+
end
123+
sleep(timeout_profile_wait) # Leave time for it to print the profile.
124+
end
125+
return nothing
126+
end
127+
108128
# gracefully terminate a worker by sending a shutdown message
109129
# and waiting for the other tasks to perform worker shutdown
110130
function Base.close(w::Worker)

test/integrationtests.jl

Lines changed: 72 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -683,7 +683,7 @@ end
683683
@test err.value == string(ErrorException("Timed out after 4s running test item \"Test item takes 60 seconds\" (run=1)"))
684684

685685
for t in (0, -1.1)
686-
expected = ArgumentError("`testitem_timeout` must be a postive number, got $t")
686+
expected = ArgumentError("`testitem_timeout` must be a positive number, got $t")
687687
@test_throws expected runtests(file; nworkers, testitem_timeout=t)
688688
end
689689
end
@@ -819,6 +819,77 @@ end
819819
@test ts.time_end - ts.time_start timeout
820820
end
821821

822+
@testset "CPU profile timeout trigger" begin
823+
using Profile
824+
# We're only testing that the signal was registered and that the stacktrace was printed.
825+
# We also tried testing that the CPU profile was displayed here, but that was too flaky in CI.
826+
function capture_timeout_profile(f, timeout_profile_wait; kwargs...)
827+
logs = mktemp() do path, io
828+
redirect_stdio(stdout=io, stderr=io, stdin=devnull) do
829+
encased_testset() do
830+
if isnothing(timeout_profile_wait)
831+
runtests(joinpath(TEST_FILES_DIR, "_timeout_tests.jl"); nworkers=1, testitem_timeout=3, kwargs...)
832+
else
833+
runtests(joinpath(TEST_FILES_DIR, "_timeout_tests.jl"); nworkers=1, testitem_timeout=3, timeout_profile_wait, kwargs...)
834+
end
835+
end
836+
end
837+
flush(io)
838+
close(io)
839+
read(path, String)
840+
end
841+
f(logs)
842+
@assert occursin("timed out running test item \"Test item takes 60 seconds\" after 3 seconds", logs)
843+
return logs
844+
end
845+
846+
@testset "timeout_profile_wait=0 means no CPU profile" begin
847+
capture_timeout_profile(0) do logs
848+
@test !occursin("Information request received", logs)
849+
end
850+
end
851+
852+
853+
default_peektime = Profile.get_peek_duration()
854+
@testset "non-zero timeout_profile_wait means we collect a CPU profile" begin
855+
capture_timeout_profile(5) do logs
856+
@test occursin("Information request received. A stacktrace will print followed by a $(default_peektime) second profile", logs)
857+
@test count(r"pthread_cond_wait|__psych_cvwait", logs) > 0 # the stacktrace was printed (will fail on Windows)
858+
@test occursin("Profile collected.", logs)
859+
end
860+
end
861+
862+
863+
@testset "`set_peek_duration` is respected in `worker_init_expr`" begin
864+
capture_timeout_profile(5, worker_init_expr=:(using Profile; Profile.set_peek_duration($default_peektime + 1.0))) do logs
865+
@test occursin("Information request received. A stacktrace will print followed by a $(default_peektime + 1.0) second profile", logs)
866+
@test count(r"pthread_cond_wait|__psych_cvwait", logs) > 0 # the stacktrace was printed (will fail on Windows)
867+
@test occursin("Profile collected.", logs)
868+
end
869+
end
870+
871+
872+
# The RETESTITEMS_TIMEOUT_PROFILE_WAIT environment variable can be used to set the timeout_profile_wait.
873+
@testset "RETESTITEMS_TIMEOUT_PROFILE_WAIT environment variable" begin
874+
withenv("RETESTITEMS_TIMEOUT_PROFILE_WAIT" => "5") do
875+
capture_timeout_profile(nothing) do logs
876+
@test occursin("Information request received", logs)
877+
@test count(r"pthread_cond_wait|__psych_cvwait", logs) > 0 # the stacktrace was printed (will fail on Windows)
878+
@test occursin("Profile collected.", logs)
879+
end
880+
end
881+
end
882+
883+
# The profile is collected for each worker thread.
884+
@testset "CPU profile with $(repr(log_capture))" for log_capture in (:eager, :batched)
885+
capture_timeout_profile(5, nworker_threads=VERSION >= v"1.9" ? "3,2" : "3", logs=log_capture) do logs
886+
@test occursin("Information request received", logs)
887+
@test count(r"pthread_cond_wait|__psych_cvwait", logs) > 0 # the stacktrace was printed (will fail on Windows)
888+
@test occursin("Profile collected.", logs)
889+
end
890+
end
891+
end
892+
822893
@testset "worker always crashes immediately" begin
823894
file = joinpath(TEST_FILES_DIR, "_happy_tests.jl")
824895

test/workers.jl

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -93,4 +93,28 @@ using Test
9393
close(w)
9494
end
9595

96+
@testset "CPU profile" begin
97+
logs = mktemp() do path, io
98+
w = Worker(threads=VERSION >= v"1.9" ? "3,2" : "3", worker_redirect_io=io)
99+
fut = remote_eval(w, :(sleep(5), yield()))
100+
sleep(0.5)
101+
trigger_profile(w, 1, :test)
102+
fetch(fut)
103+
close(w)
104+
flush(io)
105+
close(io)
106+
return read(path, String)
107+
end
108+
109+
@test occursin(r"Thread 1 Task 0x\w+ Total snapshots: \d+. Utilization: \d+%", logs)
110+
@test occursin(r"Thread 2 Task 0x\w+ Total snapshots: \d+. Utilization: \d+%", logs)
111+
@test occursin(r"Thread 3 Task 0x\w+ Total snapshots: \d+. Utilization: \d+%", logs)
112+
if VERSION >= v"1.9"
113+
@test occursin(r"Thread 4 Task 0x\w+ Total snapshots: \d+. Utilization: \d+%", logs)
114+
@test occursin(r"Thread 5 Task 0x\w+ Total snapshots: \d+. Utilization: \d+%", logs)
115+
@test !occursin(r"Thread 6 Task 0x\w+ Total snapshots: \d+. Utilization: \d+%", logs)
116+
else
117+
@test !occursin(r"Thread 4 Task 0x\w+ Total snapshots: \d+. Utilization: \d+%", logs)
118+
end
119+
end
96120
end # workers.jl testset

0 commit comments

Comments
 (0)