diff --git a/Project.toml b/Project.toml index 9b37bc3c..6881085a 100644 --- a/Project.toml +++ b/Project.toml @@ -17,6 +17,7 @@ Dates = "1" Logging = "1" LoggingExtras = "1" Pkg = "1" +Profile = "1" Random = "1" Serialization = "1" Sockets = "1" @@ -30,9 +31,10 @@ DeepDiffs = "ab62b9b5-e342-54a8-a765-a90f495de1a6" IOCapture = "b5f81e59-6552-4d32-b1f0-c071b021bf89" Logging = "56ddb016-857b-54e1-b83d-db4d58db5568" Pkg = "44cfe95a-1eb2-52ea-b672-e2afdf69b78f" +Profile = "9abbd945-dff8-562f-b5e8-e1ebf5ef1b79" Random = "9a3f8284-a2c9-5f02-9a11-845980a1fd5c" Test = "8dfed614-e22c-5e08-85e1-65c5234f0b40" XMLDict = "228000da-037f-5747-90a9-8195ccbf91a5" [targets] -test = ["AutoHashEquals", "DeepDiffs", "IOCapture", "Logging", "Pkg", "Random", "Test", "XMLDict"] +test = ["AutoHashEquals", "DeepDiffs", "IOCapture", "Logging", "Pkg", "Profile", "Random", "Test", "XMLDict"] diff --git a/src/ReTestItems.jl b/src/ReTestItems.jl index 7acf5677..878e9934 100644 --- a/src/ReTestItems.jl +++ b/src/ReTestItems.jl @@ -192,6 +192,11 @@ will be run. `paths` passed to it cannot contain test files, either because the path doesn't exist or the path points to a file which is not a test file. Default is `false`. Can also be set using the `RETESTITEMS_VALIDATE_PATHS` environment variable. +- `timeout_profile_wait::Real=0`: When non-zero, a worker that times-out will trigger a CPU profile + for which we will wait `timeout_profile_wait` seconds before terminating the worker. + Zero means no profile will be taken. Can also be set using the `RETESTITEMS_TIMEOUT_PROFILE_WAIT` + environment variable. See the [Profile documentation](https://docs.julialang.org/en/v1/stdlib/Profile/#Triggered-During-Execution) + for more information on triggered profiles. Note you can use `worker_init_expr` to tweak the profile settings on workers. """ function runtests end @@ -237,6 +242,7 @@ function runtests( verbose_results::Bool=(logs !== :issues && isinteractive()), test_end_expr::Expr=Expr(:block), validate_paths::Bool=parse(Bool, get(ENV, "RETESTITEMS_VALIDATE_PATHS", "false")), + timeout_profile_wait::Real=parse(Int, get(ENV, "RETESTITEMS_TIMEOUT_PROFILE_WAIT", "0")), ) nworker_threads = _validated_nworker_threads(nworker_threads) paths′ = _validated_paths(paths, validate_paths) @@ -244,7 +250,8 @@ function runtests( logs in LOG_DISPLAY_MODES || throw(ArgumentError("`logs` must be one of $LOG_DISPLAY_MODES, got $(repr(logs))")) report && logs == :eager && throw(ArgumentError("`report=true` is not compatible with `logs=:eager`")) (0 ≤ memory_threshold ≤ 1) || throw(ArgumentError("`memory_threshold` must be between 0 and 1, got $(repr(memory_threshold))")) - testitem_timeout > 0 || throw(ArgumentError("`testitem_timeout` must be a postive number, got $(repr(testitem_timeout))")) + testitem_timeout > 0 || throw(ArgumentError("`testitem_timeout` must be a positive number, got $(repr(testitem_timeout))")) + timeout_profile_wait >= 0 || throw(ArgumentError("`timeout_profile_wait` must be a non-negative number, got $(repr(timeout_profile_wait))")) # If we were given paths but none were valid, then nothing to run. !isempty(paths) && isempty(paths′) && return nothing shouldrun_combined(ti) = shouldrun(ti) && _shouldrun(name, ti.name) && _shouldrun(tags, ti.tags) @@ -253,13 +260,15 @@ function runtests( nworkers = max(0, nworkers) retries = max(0, retries) timeout = ceil(Int, testitem_timeout) + timeout_profile_wait = ceil(Int, timeout_profile_wait) + (timeout_profile_wait > 0 && Sys.iswindows()) && @warn "CPU profiles on timeout is not supported on Windows, ignoring `timeout_profile_wait`" debuglvl = Int(debug) if debuglvl > 0 LoggingExtras.withlevel(LoggingExtras.Debug; verbosity=debuglvl) do - _runtests(shouldrun_combined, paths′, nworkers, nworker_threads, worker_init_expr, test_end_expr, timeout, retries, memory_threshold, verbose_results, debuglvl, report, logs) + _runtests(shouldrun_combined, paths′, nworkers, nworker_threads, worker_init_expr, test_end_expr, timeout, retries, memory_threshold, verbose_results, debuglvl, report, logs, timeout_profile_wait) end else - return _runtests(shouldrun_combined, paths′, nworkers, nworker_threads, worker_init_expr, test_end_expr, timeout, retries, memory_threshold, verbose_results, debuglvl, report, logs) + return _runtests(shouldrun_combined, paths′, nworkers, nworker_threads, worker_init_expr, test_end_expr, timeout, retries, memory_threshold, verbose_results, debuglvl, report, logs, timeout_profile_wait) end end @@ -273,7 +282,7 @@ end # By tracking and reusing test environments, we can avoid this issue. const TEST_ENVS = Dict{String, String}() -function _runtests(shouldrun, paths, nworkers::Int, nworker_threads::String, worker_init_expr::Expr, test_end_expr::Expr, testitem_timeout::Int, retries::Int, memory_threshold::Real, verbose_results::Bool, debug::Int, report::Bool, logs::Symbol) +function _runtests(shouldrun, paths, nworkers::Int, nworker_threads::String, worker_init_expr::Expr, test_end_expr::Expr, testitem_timeout::Int, retries::Int, memory_threshold::Real, verbose_results::Bool, debug::Int, report::Bool, logs::Symbol, timeout_profile_wait::Int) # Don't recursively call `runtests` e.g. if we `include` a file which calls it. # So we ignore the `runtests(...)` call in `test/runtests.jl` when `runtests(...)` # was called from the command line. @@ -293,7 +302,7 @@ function _runtests(shouldrun, paths, nworkers::Int, nworker_threads::String, wor if is_running_test_runtests_jl(proj_file) # Assume this is `Pkg.test`, so test env already active. @debugv 2 "Running in current environment `$(Base.active_project())`" - return _runtests_in_current_env(shouldrun, paths, proj_file, nworkers, nworker_threads, worker_init_expr, test_end_expr, testitem_timeout, retries, memory_threshold, verbose_results, debug, report, logs) + return _runtests_in_current_env(shouldrun, paths, proj_file, nworkers, nworker_threads, worker_init_expr, test_end_expr, testitem_timeout, retries, memory_threshold, verbose_results, debug, report, logs, timeout_profile_wait) else @debugv 1 "Activating test environment for `$proj_file`" orig_proj = Base.active_project() @@ -306,7 +315,7 @@ function _runtests(shouldrun, paths, nworkers::Int, nworker_threads::String, wor testenv = TestEnv.activate() TEST_ENVS[proj_file] = testenv end - _runtests_in_current_env(shouldrun, paths, proj_file, nworkers, nworker_threads, worker_init_expr, test_end_expr, testitem_timeout, retries, memory_threshold, verbose_results, debug, report, logs) + _runtests_in_current_env(shouldrun, paths, proj_file, nworkers, nworker_threads, worker_init_expr, test_end_expr, testitem_timeout, retries, memory_threshold, verbose_results, debug, report, logs, timeout_profile_wait) finally Base.set_active_project(orig_proj) end @@ -317,6 +326,7 @@ end function _runtests_in_current_env( shouldrun, paths, projectfile::String, nworkers::Int, nworker_threads, worker_init_expr::Expr, test_end_expr::Expr, testitem_timeout::Int, retries::Int, memory_threshold::Real, verbose_results::Bool, debug::Int, report::Bool, logs::Symbol, + timeout_profile_wait::Int, ) start_time = time() proj_name = something(Pkg.Types.read_project(projectfile).name, "") @@ -381,7 +391,7 @@ function _runtests_in_current_env( ti = starting[i] @spawn begin with_logger(original_logger) do - manage_worker($w, $proj_name, $testitems, $ti, $nworker_threads, $worker_init_expr, $test_end_expr, $testitem_timeout, $retries, $memory_threshold, $verbose_results, $debug, $report, $logs) + manage_worker($w, $proj_name, $testitems, $ti, $nworker_threads, $worker_init_expr, $test_end_expr, $testitem_timeout, $retries, $memory_threshold, $verbose_results, $debug, $report, $logs, $timeout_profile_wait) end end end @@ -492,7 +502,7 @@ end function manage_worker( worker::Worker, proj_name::AbstractString, testitems::TestItems, testitem::Union{TestItem,Nothing}, nworker_threads, worker_init_expr::Expr, test_end_expr::Expr, - default_timeout::Int, retries::Int, memory_threshold::Real, verbose_results::Bool, debug::Int, report::Bool, logs::Symbol + default_timeout::Int, retries::Int, memory_threshold::Real, verbose_results::Bool, debug::Int, report::Bool, logs::Symbol, timeout_profile_wait::Int ) ntestitems = length(testitems.testitems) run_number = 1 @@ -551,23 +561,35 @@ function manage_worker( end catch e @debugv 2 "Error" exception=e - println(DEFAULT_STDOUT[]) - _print_captured_logs(DEFAULT_STDOUT[], testitem, run_number) # Handle the exception if e isa TimeoutException - @debugv 1 "Test item $(repr(testitem.name)) timed out. Terminating worker $worker" - terminate!(worker) + if timeout_profile_wait > 0 + @warn "$worker timed out running test item $(repr(testitem.name)) after $timeout seconds. \ + A CPU profile will be triggered on the worker and then it will be terminated." + trigger_profile(worker, timeout_profile_wait, :timeout) + end + terminate!(worker, :timeout) wait(worker) + # TODO: We print the captured logs after the worker is terminated, + # which means that we include an annoying stackrace from the worker termination, + # but the profiles don't seem to get flushed properly if we don't do this. + # This is not an issue with eager logs, but when going through a file, this seems to help. + println(DEFAULT_STDOUT[]) + _print_captured_logs(DEFAULT_STDOUT[], testitem, run_number) @error "$worker timed out running test item $(repr(testitem.name)) after $timeout seconds. \ Recording test error." record_timeout!(testitem, run_number, timeout) elseif e isa WorkerTerminatedException + println(DEFAULT_STDOUT[]) + _print_captured_logs(DEFAULT_STDOUT[], testitem, run_number) @error "$worker died running test item $(repr(testitem.name)). \ Recording test error." record_worker_terminated!(testitem, worker, run_number) else # We don't expect any other kind of error, so rethrow, which will propagate # back up to the main coordinator task and throw to the user + println(DEFAULT_STDOUT[]) + _print_captured_logs(DEFAULT_STDOUT[], testitem, run_number) rethrow() end # Handle retries diff --git a/src/log_capture.jl b/src/log_capture.jl index 968cee70..6d36ac9d 100644 --- a/src/log_capture.jl +++ b/src/log_capture.jl @@ -107,7 +107,9 @@ If target is String it is assumed it is a file path. _redirect_logs(f, path::String) = open(io->_redirect_logs(f, io), path, "w") function _redirect_logs(f, target::IO) target === DEFAULT_STDOUT[] && return f() - colored_io = IOContext(target, :color => get(DEFAULT_STDOUT[], :color, false)) + # If we're not doing :eager logs, make sure the displaysize is large so we don't truncate + # CPU profiles. + colored_io = IOContext(target, :color => get(DEFAULT_STDOUT[], :color, false), :displaysize => (10000,10000)) # In case the default logger was changed by the user, we need to make sure the new logstate # is poinitng to the new stderr. # Adapted from https://github.com/JuliaIO/Suppressor.jl/blob/cbfc46f1450b03d6b69dad4c35de739290ff0aff/src/Suppressor.jl#L158-L161 diff --git a/src/workers.jl b/src/workers.jl index 94f01521..ebaac6f3 100644 --- a/src/workers.jl +++ b/src/workers.jl @@ -3,6 +3,7 @@ module Workers using Sockets, Serialization export Worker, remote_eval, remote_fetch, terminate!, WorkerTerminatedException +export trigger_profile function try_with_timeout(f, timeout) cond = Threads.Condition() @@ -105,6 +106,25 @@ function watch_and_terminate!(w::Worker, ev::Threads.Event) true end +# Send signal to the given `Worker` process to trigger a profile. +# Users can customise this profiling in the usual way, e.g. via +# `JULIA_PROFILE_PEEK_HEAP_SNAPSHOT`, but `Profile.set_peek_duration`, `Profile.peek_report[]` +# would have to be modified in the worker process. +# See https://docs.julialang.org/en/v1/stdlib/Profile/#Triggered-During-Execution +# Called when timeout_profile_wait is non-zero. +function trigger_profile(w::Worker, timeout_profile_wait, from::Symbol=:manual) + if !Sys.iswindows() + @debug "sending profile request to worker $(w.pid) from $from" + if Sys.islinux() + kill(w.process, 10) # SIGUSR1 + elseif Sys.isbsd() + kill(w.process, 29) # SIGINFO + end + sleep(timeout_profile_wait) # Leave time for it to print the profile. + end + return nothing +end + # gracefully terminate a worker by sending a shutdown message # and waiting for the other tasks to perform worker shutdown function Base.close(w::Worker) diff --git a/test/integrationtests.jl b/test/integrationtests.jl index 42374825..20761260 100644 --- a/test/integrationtests.jl +++ b/test/integrationtests.jl @@ -683,7 +683,7 @@ end @test err.value == string(ErrorException("Timed out after 4s running test item \"Test item takes 60 seconds\" (run=1)")) for t in (0, -1.1) - expected = ArgumentError("`testitem_timeout` must be a postive number, got $t") + expected = ArgumentError("`testitem_timeout` must be a positive number, got $t") @test_throws expected runtests(file; nworkers, testitem_timeout=t) end end @@ -819,6 +819,77 @@ end @test ts.time_end - ts.time_start ≈ timeout end +@testset "CPU profile timeout trigger" begin + using Profile + # We're only testing that the signal was registered and that the stacktrace was printed. + # We also tried testing that the CPU profile was displayed here, but that was too flaky in CI. + function capture_timeout_profile(f, timeout_profile_wait; kwargs...) + logs = mktemp() do path, io + redirect_stdio(stdout=io, stderr=io, stdin=devnull) do + encased_testset() do + if isnothing(timeout_profile_wait) + runtests(joinpath(TEST_FILES_DIR, "_timeout_tests.jl"); nworkers=1, testitem_timeout=3, kwargs...) + else + runtests(joinpath(TEST_FILES_DIR, "_timeout_tests.jl"); nworkers=1, testitem_timeout=3, timeout_profile_wait, kwargs...) + end + end + end + flush(io) + close(io) + read(path, String) + end + f(logs) + @assert occursin("timed out running test item \"Test item takes 60 seconds\" after 3 seconds", logs) + return logs + end + + @testset "timeout_profile_wait=0 means no CPU profile" begin + capture_timeout_profile(0) do logs + @test !occursin("Information request received", logs) + end + end + + + default_peektime = Profile.get_peek_duration() + @testset "non-zero timeout_profile_wait means we collect a CPU profile" begin + capture_timeout_profile(5) do logs + @test occursin("Information request received. A stacktrace will print followed by a $(default_peektime) second profile", logs) + @test count(r"pthread_cond_wait|__psych_cvwait", logs) > 0 # the stacktrace was printed (will fail on Windows) + @test occursin("Profile collected.", logs) + end + end + + + @testset "`set_peek_duration` is respected in `worker_init_expr`" begin + capture_timeout_profile(5, worker_init_expr=:(using Profile; Profile.set_peek_duration($default_peektime + 1.0))) do logs + @test occursin("Information request received. A stacktrace will print followed by a $(default_peektime + 1.0) second profile", logs) + @test count(r"pthread_cond_wait|__psych_cvwait", logs) > 0 # the stacktrace was printed (will fail on Windows) + @test occursin("Profile collected.", logs) + end + end + + + # The RETESTITEMS_TIMEOUT_PROFILE_WAIT environment variable can be used to set the timeout_profile_wait. + @testset "RETESTITEMS_TIMEOUT_PROFILE_WAIT environment variable" begin + withenv("RETESTITEMS_TIMEOUT_PROFILE_WAIT" => "5") do + capture_timeout_profile(nothing) do logs + @test occursin("Information request received", logs) + @test count(r"pthread_cond_wait|__psych_cvwait", logs) > 0 # the stacktrace was printed (will fail on Windows) + @test occursin("Profile collected.", logs) + end + end + end + + # The profile is collected for each worker thread. + @testset "CPU profile with $(repr(log_capture))" for log_capture in (:eager, :batched) + capture_timeout_profile(5, nworker_threads=VERSION >= v"1.9" ? "3,2" : "3", logs=log_capture) do logs + @test occursin("Information request received", logs) + @test count(r"pthread_cond_wait|__psych_cvwait", logs) > 0 # the stacktrace was printed (will fail on Windows) + @test occursin("Profile collected.", logs) + end + end +end + @testset "worker always crashes immediately" begin file = joinpath(TEST_FILES_DIR, "_happy_tests.jl") diff --git a/test/workers.jl b/test/workers.jl index 40b0e802..6eece79a 100644 --- a/test/workers.jl +++ b/test/workers.jl @@ -93,4 +93,28 @@ using Test close(w) end + @testset "CPU profile" begin + logs = mktemp() do path, io + w = Worker(threads=VERSION >= v"1.9" ? "3,2" : "3", worker_redirect_io=io) + fut = remote_eval(w, :(sleep(5), yield())) + sleep(0.5) + trigger_profile(w, 1, :test) + fetch(fut) + close(w) + flush(io) + close(io) + return read(path, String) + end + + @test occursin(r"Thread 1 Task 0x\w+ Total snapshots: \d+. Utilization: \d+%", logs) + @test occursin(r"Thread 2 Task 0x\w+ Total snapshots: \d+. Utilization: \d+%", logs) + @test occursin(r"Thread 3 Task 0x\w+ Total snapshots: \d+. Utilization: \d+%", logs) + if VERSION >= v"1.9" + @test occursin(r"Thread 4 Task 0x\w+ Total snapshots: \d+. Utilization: \d+%", logs) + @test occursin(r"Thread 5 Task 0x\w+ Total snapshots: \d+. Utilization: \d+%", logs) + @test !occursin(r"Thread 6 Task 0x\w+ Total snapshots: \d+. Utilization: \d+%", logs) + else + @test !occursin(r"Thread 4 Task 0x\w+ Total snapshots: \d+. Utilization: \d+%", logs) + end + end end # workers.jl testset