Skip to content

Commit bc27b98

Browse files
committed
Move implementation of Thread and Process into Threaded::Child and Process::Child respectively.
1 parent d625155 commit bc27b98

File tree

8 files changed

+394
-402
lines changed

8 files changed

+394
-402
lines changed

fixtures/async/container/a_container.rb

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,8 @@
33
# Released under the MIT License.
44
# Copyright, 2019-2024, by Samuel Williams.
55

6+
require "async"
7+
68
module Async
79
module Container
810
AContainer = Sus::Shared("a container") do
@@ -67,10 +69,15 @@ module Container
6769
it "can stop the child process" do
6870
container.spawn do
6971
sleep(1)
72+
rescue Interrupt
73+
# Ignore.
7074
end
7175

7276
expect(container).to be(:running?)
7377

78+
# TODO Investigate why without this, the interrupt can occur before the process is sleeping...
79+
sleep 0.001
80+
7481
container.stop
7582

7683
expect(container).not.to be(:running?)

lib/async/container/forked.rb

Lines changed: 184 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3,8 +3,11 @@
33
# Released under the MIT License.
44
# Copyright, 2017-2024, by Samuel Williams.
55

6+
require_relative "error"
7+
68
require_relative "generic"
7-
require_relative "process"
9+
require_relative "channel"
10+
require_relative "notify/pipe"
811

912
module Async
1013
module Container
@@ -15,11 +18,190 @@ def self.multiprocess?
1518
true
1619
end
1720

21+
# Represents a running child process from the point of view of the parent container.
22+
class Child < Channel
23+
# Represents a running child process from the point of view of the child process.
24+
class Instance < Notify::Pipe
25+
# Wrap an instance around the {Process} instance from within the forked child.
26+
# @parameter process [Process] The process intance to wrap.
27+
def self.for(process)
28+
instance = self.new(process.out)
29+
30+
# The child process won't be reading from the channel:
31+
process.close_read
32+
33+
instance.name = process.name
34+
35+
return instance
36+
end
37+
38+
def initialize(io)
39+
super
40+
41+
@name = nil
42+
end
43+
44+
# Set the process title to the specified value.
45+
# @parameter value [String] The name of the process.
46+
def name= value
47+
if @name = value
48+
::Process.setproctitle(@name)
49+
end
50+
end
51+
52+
# The name of the process.
53+
# @returns [String]
54+
def name
55+
@name
56+
end
57+
58+
# Replace the current child process with a different one. Forwards arguments and options to {::Process.exec}.
59+
# This method replaces the child process with the new executable, thus this method never returns.
60+
def exec(*arguments, ready: true, **options)
61+
if ready
62+
self.ready!(status: "(exec)")
63+
else
64+
self.before_spawn(arguments, options)
65+
end
66+
67+
::Process.exec(*arguments, **options)
68+
end
69+
end
70+
71+
# Fork a child process appropriate for a container.
72+
# @returns [Process]
73+
def self.fork(**options)
74+
# $stderr.puts fork: caller
75+
self.new(**options) do |process|
76+
::Process.fork do
77+
# We use `Thread.current.raise(...)` so that exceptions are filtered through `Thread.handle_interrupt` correctly.
78+
Signal.trap(:INT) {::Thread.current.raise(Interrupt)}
79+
Signal.trap(:TERM) {::Thread.current.raise(Terminate)}
80+
Signal.trap(:HUP) {::Thread.current.raise(Restart)}
81+
82+
# This could be a configuration option:
83+
::Thread.handle_interrupt(SignalException => :immediate) do
84+
yield Instance.for(process)
85+
rescue Interrupt
86+
# Graceful exit.
87+
rescue Exception => error
88+
Console.error(self, error)
89+
90+
exit!(1)
91+
end
92+
end
93+
end
94+
end
95+
96+
def self.spawn(*arguments, name: nil, **options)
97+
self.new(name: name) do |process|
98+
Notify::Pipe.new(process.out).before_spawn(arguments, options)
99+
100+
::Process.spawn(*arguments, **options)
101+
end
102+
end
103+
104+
# Initialize the process.
105+
# @parameter name [String] The name to use for the child process.
106+
def initialize(name: nil)
107+
super()
108+
109+
@name = name
110+
@status = nil
111+
@pid = nil
112+
113+
@pid = yield(self)
114+
115+
# The parent process won't be writing to the channel:
116+
self.close_write
117+
end
118+
119+
# Set the name of the process.
120+
# Invokes {::Process.setproctitle} if invoked in the child process.
121+
def name= value
122+
@name = value
123+
124+
# If we are the child process:
125+
::Process.setproctitle(@name) if @pid.nil?
126+
end
127+
128+
# The name of the process.
129+
# @attribute [String]
130+
attr :name
131+
132+
# @attribute [Integer] The process identifier.
133+
attr :pid
134+
135+
# A human readable representation of the process.
136+
# @returns [String]
137+
def inspect
138+
"\#<#{self.class} name=#{@name.inspect} status=#{@status.inspect} pid=#{@pid.inspect}>"
139+
end
140+
141+
alias to_s inspect
142+
143+
# Invoke {#terminate!} and then {#wait} for the child process to exit.
144+
def close
145+
self.terminate!
146+
self.wait
147+
ensure
148+
super
149+
end
150+
151+
# Send `SIGINT` to the child process.
152+
def interrupt!
153+
unless @status
154+
::Process.kill(:INT, @pid)
155+
end
156+
end
157+
158+
# Send `SIGTERM` to the child process.
159+
def terminate!
160+
unless @status
161+
::Process.kill(:TERM, @pid)
162+
end
163+
end
164+
165+
# Send `SIGHUP` to the child process.
166+
def restart!
167+
unless @status
168+
::Process.kill(:HUP, @pid)
169+
end
170+
end
171+
172+
# Wait for the child process to exit.
173+
# @asynchronous This method may block.
174+
#
175+
# @returns [::Process::Status] The process exit status.
176+
def wait
177+
if @pid && @status.nil?
178+
Console.debug(self, "Waiting for process to exit...", pid: @pid)
179+
180+
_, @status = ::Process.wait2(@pid, ::Process::WNOHANG)
181+
182+
while @status.nil?
183+
sleep(0.1)
184+
185+
_, @status = ::Process.wait2(@pid, ::Process::WNOHANG)
186+
187+
if @status.nil?
188+
Console.warn(self) {"Process #{@pid} is blocking, has it exited?"}
189+
end
190+
end
191+
end
192+
193+
Console.debug(self, "Process exited.", pid: @pid, status: @status)
194+
195+
return @status
196+
end
197+
end
198+
199+
18200
# Start a named child process and execute the provided block in it.
19201
# @parameter name [String] The name (title) of the child process.
20202
# @parameter block [Proc] The block to execute in the child process.
21203
def start(name, &block)
22-
Process.fork(name: name, &block)
204+
Child.fork(name: name, &block)
23205
end
24206
end
25207
end

lib/async/container/generic.rb

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -3,8 +3,6 @@
33
# Released under the MIT License.
44
# Copyright, 2019-2024, by Samuel Williams.
55

6-
require "async"
7-
86
require "etc"
97

108
require_relative "group"
@@ -197,8 +195,12 @@ def run(count: Container.processor_count, **options, &block)
197195

198196
# @deprecated Please use {spawn} or {run} instead.
199197
def async(**options, &block)
198+
# warn "#{self.class}##{__method__} is deprecated, please use `spawn` or `run` instead.", uplevel: 1
199+
200+
require "async"
201+
200202
spawn(**options) do |instance|
201-
Async::Reactor.run(instance, &block)
203+
Async(instance, &block)
202204
end
203205
end
204206

0 commit comments

Comments
 (0)