@@ -151,7 +151,7 @@ function set_worker_state(w, state)
151151end
152152
153153function check_worker_state (w:: Worker )
154- if w. state === W_CREATED
154+ if ( @atomic w. state) === W_CREATED
155155 if ! isclusterlazy ()
156156 if PGRP. topology === :all_to_all
157157 # Since higher pids connect with lower pids, the remote worker
@@ -163,10 +163,10 @@ function check_worker_state(w::Worker)
163163 else
164164 w. ct_time = time ()
165165 if myid () > w. id
166- t = Threads . @spawn Threads . threadpool () exec_conn_func (w)
166+ t = @async exec_conn_func (w)
167167 else
168168 # route request via node 1
169- t = Threads . @spawn Threads . threadpool () remotecall_fetch ((p,to_id) -> remotecall_fetch (exec_conn_func, p, to_id), 1 , w. id, myid ())
169+ t = @async remotecall_fetch ((p,to_id) -> remotecall_fetch (exec_conn_func, p, to_id), 1 , w. id, myid ())
170170 end
171171 errormonitor (t)
172172 wait_for_conn (w)
@@ -190,20 +190,14 @@ function exec_conn_func(w::Worker)
190190end
191191
192192function wait_for_conn (w)
193- if w. state === W_CREATED
193+ if ( @atomic w. state) === W_CREATED
194194 timeout = worker_timeout () - (time () - w. ct_time)
195195 timeout <= 0 && error (" peer $(w. id) has not connected to $(myid ()) " )
196196
197- T = Threads. @spawn Threads. threadpool () begin
198- sleep ($ timeout)
199- lock (w. c_state) do
200- notify (w. c_state; all= true )
201- end
202- end
203- errormonitor (T)
204- lock (w. c_state) do
205- wait (w. c_state)
206- w. state === W_CREATED && error (" peer $(w. id) didn't connect to $(myid ()) within $timeout seconds" )
197+ if timedwait (() -> (@atomic w. state) === W_CONNECTED, timeout) === :timed_out
198+ # Notify any waiters on the state and throw
199+ @lock w. c_state notify (w. c_state)
200+ error (" peer $(w. id) didn't connect to $(myid ()) within $timeout seconds" )
207201 end
208202 end
209203 nothing
@@ -258,7 +252,7 @@ function start_worker(out::IO, cookie::AbstractString=readline(stdin); close_std
258252 else
259253 sock = listen (interface, LPROC. bind_port)
260254 end
261- errormonitor (Threads . @spawn while isopen (sock)
255+ errormonitor (@async while isopen (sock)
262256 client = accept (sock)
263257 process_messages (client, client, true )
264258 end )
290284
291285
292286function redirect_worker_output (ident, stream)
293- t = Threads . @spawn while ! eof (stream)
287+ t = @async while ! eof (stream)
294288 line = readline (stream)
295289 if startswith (line, " From worker " )
296290 # stdout's of "additional" workers started from an initial worker on a host are not available
@@ -329,7 +323,7 @@ function read_worker_host_port(io::IO)
329323 leader = String[]
330324 try
331325 while ntries > 0
332- readtask = Threads . @spawn Threads . threadpool () readline (io)
326+ readtask = @async readline (io)
333327 yield ()
334328 while ! istaskdone (readtask) && ((time_ns () - t0) < timeout)
335329 sleep (0.05 )
@@ -430,7 +424,7 @@ if launching workers programmatically, execute `addprocs` in its own task.
430424
431425```julia
432426# On busy clusters, call `addprocs` asynchronously
433- t = Threads.@spawn addprocs(...)
427+ t = @async addprocs(...)
434428```
435429
436430```julia
@@ -496,13 +490,14 @@ function addprocs_locked(manager::ClusterManager; kwargs...)
496490 # call manager's `launch` is a separate task. This allows the master
497491 # process initiate the connection setup process as and when workers come
498492 # online
499- t_launch = Threads. @spawn Threads. threadpool () launch (manager, params, launched, launch_ntfy)
493+ # NOTE: Must be `@async`. See FIXME above
494+ t_launch = @async launch (manager, params, launched, launch_ntfy)
500495
501496 @sync begin
502497 while true
503498 if isempty (launched)
504499 istaskdone (t_launch) && break
505- Threads . @spawn Threads . threadpool () begin
500+ @async begin # NOTE: Must be `@async`. See FIXME above
506501 sleep (1 )
507502 notify (launch_ntfy)
508503 end
@@ -512,7 +507,8 @@ function addprocs_locked(manager::ClusterManager; kwargs...)
512507 if ! isempty (launched)
513508 wconfig = popfirst! (launched)
514509 let wconfig= wconfig
515- Threads. @spawn Threads. threadpool () setup_launched_worker (manager, wconfig, launched_q)
510+ # NOTE: Must be `@async`. See FIXME above
511+ @async setup_launched_worker (manager, wconfig, launched_q)
516512 end
517513 end
518514 end
@@ -592,7 +588,7 @@ function launch_n_additional_processes(manager, frompid, fromconfig, cnt, launch
592588 wconfig. port = port
593589
594590 let wconfig= wconfig
595- Threads . @spawn Threads . threadpool () begin
591+ @async begin
596592 pid = create_worker (manager, wconfig)
597593 remote_do (redirect_output_from_additional_worker, frompid, pid, port)
598594 push! (launched_q, pid)
@@ -660,7 +656,7 @@ function create_worker(manager, wconfig)
660656 for jw in PGRP. workers
661657 if (jw. id != 1 ) && (jw. id < w. id)
662658 # wait for wl to join
663- if jw. state === W_CREATED
659+ if ( @atomic jw. state) === W_CREATED
664660 lock (jw. c_state) do
665661 wait (jw. c_state)
666662 end
@@ -688,7 +684,7 @@ function create_worker(manager, wconfig)
688684
689685 for wl in wlist
690686 lock (wl. c_state) do
691- if wl. state === W_CREATED
687+ if ( @atomic wl. state) === W_CREATED
692688 # wait for wl to join
693689 wait (wl. c_state)
694690 end
@@ -758,7 +754,7 @@ function check_master_connect()
758754 end
759755
760756 errormonitor (
761- Threads . @spawn begin
757+ @async begin
762758 timeout = worker_timeout ()
763759 if timedwait (() -> ! haskey (map_pid_wrkr, 1 ), timeout) === :timed_out
764760 print (stderr , " Master process (id 1) could not connect within $(timeout) seconds.\n exiting.\n " )
@@ -890,7 +886,7 @@ function nprocs()
890886 n = length (PGRP. workers)
891887 # filter out workers in the process of being setup/shutdown.
892888 for jw in PGRP. workers
893- if ! isa (jw, LocalProcess) && (jw. state != = W_CONNECTED)
889+ if ! isa (jw, LocalProcess) && (( @atomic jw. state) != = W_CONNECTED)
894890 n = n - 1
895891 end
896892 end
@@ -941,7 +937,7 @@ julia> procs()
941937function procs ()
942938 if myid () == 1 || (PGRP. topology === :all_to_all && ! isclusterlazy ())
943939 # filter out workers in the process of being setup/shutdown.
944- return Int[x. id for x in PGRP. workers if isa (x, LocalProcess) || (x. state === W_CONNECTED)]
940+ return Int[x. id for x in PGRP. workers if isa (x, LocalProcess) || (( @atomic x. state) === W_CONNECTED)]
945941 else
946942 return Int[x. id for x in PGRP. workers]
947943 end
950946function id_in_procs (id) # faster version of `id in procs()`
951947 if myid () == 1 || (PGRP. topology === :all_to_all && ! isclusterlazy ())
952948 for x in PGRP. workers
953- if (x. id:: Int ) == id && (isa (x, LocalProcess) || (x:: Worker ). state === W_CONNECTED)
949+ if (x. id:: Int ) == id && (isa (x, LocalProcess) || (@atomic ( x:: Worker ). state) === W_CONNECTED)
954950 return true
955951 end
956952 end
@@ -972,7 +968,7 @@ Specifically all workers bound to the same ip-address as `pid` are returned.
972968"""
973969function procs (pid:: Integer )
974970 if myid () == 1
975- all_workers = [x for x in PGRP. workers if isa (x, LocalProcess) || (x. state === W_CONNECTED)]
971+ all_workers = [x for x in PGRP. workers if isa (x, LocalProcess) || (( @atomic x. state) === W_CONNECTED)]
976972 if (pid == 1 ) || (isa (map_pid_wrkr[pid]. manager, LocalManager))
977973 Int[x. id for x in filter (w -> (w. id== 1 ) || (isa (w. manager, LocalManager)), all_workers)]
978974 else
@@ -1050,13 +1046,13 @@ function rmprocs(pids...; waitfor=typemax(Int))
10501046
10511047 pids = vcat (pids... )
10521048 if waitfor == 0
1053- t = Threads . @spawn Threads . threadpool () _rmprocs (pids, typemax (Int))
1049+ t = @async _rmprocs (pids, typemax (Int))
10541050 yield ()
10551051 return t
10561052 else
10571053 _rmprocs (pids, waitfor)
10581054 # return a dummy task object that user code can wait on.
1059- return Threads . @spawn Threads . threadpool () nothing
1055+ return @async nothing
10601056 end
10611057end
10621058
@@ -1079,11 +1075,11 @@ function _rmprocs(pids, waitfor)
10791075
10801076 start = time_ns ()
10811077 while (time_ns () - start) < waitfor* 1e9
1082- all (w -> w. state === W_TERMINATED, rmprocset) && break
1078+ all (w -> ( @atomic w. state) === W_TERMINATED, rmprocset) && break
10831079 sleep (min (0.1 , waitfor - (time_ns () - start)/ 1e9 ))
10841080 end
10851081
1086- unremoved = [wrkr. id for wrkr in filter (w -> w. state != = W_TERMINATED, rmprocset)]
1082+ unremoved = [wrkr. id for wrkr in filter (w -> ( @atomic w. state) != = W_TERMINATED, rmprocset)]
10871083 if length (unremoved) > 0
10881084 estr = string (" rmprocs: pids " , unremoved, " not terminated after " , waitfor, " seconds." )
10891085 throw (ErrorException (estr))
@@ -1239,7 +1235,7 @@ function interrupt(pids::AbstractVector=workers())
12391235 @assert myid () == 1
12401236 @sync begin
12411237 for pid in pids
1242- Threads . @spawn Threads . threadpool () interrupt (pid)
1238+ @async interrupt (pid)
12431239 end
12441240 end
12451241end
0 commit comments