@@ -33,13 +33,17 @@ struct Request
3333 # ignoring other fields
3434 shutdown:: Bool
3535end
36+ is_shutdown (r:: Request ) = r. shutdown
3637
3738# worker executes Request and returns a serialized Response object *if* Request has an id
3839struct Response
3940 result
4041 error:: Union{Nothing, Exception}
4142 id:: UInt64 # matches a corresponding Request.id
43+ # if true, worker is shutting down, so we can stop listening to it.
44+ shutdown:: Bool
4245end
46+ is_shutdown (r:: Response ) = r. shutdown
4347
4448# simple Future that coordinator can wait on until a Response comes back for a Request
4549struct Future
@@ -232,6 +236,7 @@ function process_responses(w::Worker, ev::Threads.Event)
232236 # get the next Response from the worker
233237 r = deserialize (w. socket)
234238 @assert r isa Response " Received invalid response from worker $(w. pid) : $(r) "
239+ is_shutdown (r) && break
235240 # println("Received response $(r) from worker $(w.pid)")
236241 @lock lock begin
237242 @assert haskey (reqs, r. id) " Received response for unknown request $(r. id) from worker $(w. pid) "
@@ -318,7 +323,14 @@ function serve_requests(io)
318323 while true
319324 req = deserialize (io)
320325 @assert req isa Request
321- req. shutdown && break
326+ if is_shutdown (req)
327+ resp = Response (nothing , nothing , rand (UInt64), true )
328+ @lock iolock begin
329+ # println("sending response: $(resp)")
330+ serialize (io, resp)
331+ flush (io)
332+ end
333+ end
322334 # println("received request: $(req)")
323335 Threads. @spawn begin
324336 r = $ req
0 commit comments