Skip to content

Commit 9b08035

Browse files
authored
Fix wait_for race condition. (#45)
1 parent a557bee commit 9b08035

File tree

3 files changed

+288
-4
lines changed

3 files changed

+288
-4
lines changed

lib/async/container/group.rb

Lines changed: 9 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -100,9 +100,14 @@ def wait
100100
end
101101
end
102102

103+
private def each_running(&block)
104+
# We create a copy of the values here, in case the block modifies the running set:
105+
@running.values.each(&block)
106+
end
107+
103108
# Perform a health check on all running processes.
104109
def health_check!
105-
@running.each_value do |fiber|
110+
each_running do |fiber|
106111
fiber.resume(:health_check!)
107112
end
108113
end
@@ -111,7 +116,7 @@ def health_check!
111116
# This resumes the controlling fiber with an instance of {Interrupt}.
112117
def interrupt
113118
Console.info(self, "Sending interrupt to #{@running.size} running processes...")
114-
@running.each_value do |fiber|
119+
each_running do |fiber|
115120
fiber.resume(Interrupt)
116121
end
117122
end
@@ -120,7 +125,7 @@ def interrupt
120125
# This resumes the controlling fiber with an instance of {Terminate}.
121126
def terminate
122127
Console.info(self, "Sending terminate to #{@running.size} running processes...")
123-
@running.each_value do |fiber|
128+
each_running do |fiber|
124129
fiber.resume(Terminate)
125130
end
126131
end
@@ -129,7 +134,7 @@ def terminate
129134
# This resumes the controlling fiber with an instance of {Kill}.
130135
def kill
131136
Console.info(self, "Sending kill to #{@running.size} running processes...")
132-
@running.each_value do |fiber|
137+
each_running do |fiber|
133138
fiber.resume(Kill)
134139
end
135140
end

releases.md

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,9 @@
11
# Releases
22

3+
## Unreleased
4+
5+
- Fix race condition where `wait_for` could modify `@running` while it was being iterated over (`each_value`) during health checks.
6+
37
## v0.27.3
48

59
- Add log for starting child, including container statistics.

test/async/container/group.rb

Lines changed: 275 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,275 @@
1+
# frozen_string_literal: true
2+
3+
# Released under the MIT License.
4+
# Copyright, 2025, by Samuel Williams.
5+
6+
require "async/container/group"
7+
require "async/container/channel"
8+
9+
describe Async::Container::Group do
10+
let(:group) {Async::Container::Group.new}
11+
12+
with "#size" do
13+
it "returns zero for empty group" do
14+
expect(group.size).to be == 0
15+
end
16+
17+
it "returns the number of running processes" do
18+
channel1 = Async::Container::Channel.new
19+
channel2 = Async::Container::Channel.new
20+
21+
fiber1 = Fiber.new {Fiber.yield}
22+
fiber2 = Fiber.new {Fiber.yield}
23+
24+
fiber1.resume
25+
fiber2.resume
26+
27+
group.running[channel1.in] = fiber1
28+
group.running[channel2.in] = fiber2
29+
30+
expect(group.size).to be == 2
31+
end
32+
end
33+
34+
with "#running?" do
35+
it "returns false for empty group" do
36+
expect(group).not.to be(:running?)
37+
end
38+
39+
it "returns true when processes are running" do
40+
channel = Async::Container::Channel.new
41+
fiber = Fiber.new {Fiber.yield}
42+
fiber.resume
43+
44+
group.running[channel.in] = fiber
45+
46+
expect(group).to be(:running?)
47+
end
48+
end
49+
50+
with "#any?" do
51+
it "returns false for empty group" do
52+
expect(group).not.to be(:any?)
53+
end
54+
55+
it "returns true when processes are running" do
56+
channel = Async::Container::Channel.new
57+
fiber = Fiber.new {Fiber.yield}
58+
fiber.resume
59+
60+
group.running[channel.in] = fiber
61+
62+
expect(group).to be(:any?)
63+
end
64+
end
65+
66+
with "#empty?" do
67+
it "returns true for empty group" do
68+
expect(group).to be(:empty?)
69+
end
70+
71+
it "returns false when processes are running" do
72+
channel = Async::Container::Channel.new
73+
fiber = Fiber.new {Fiber.yield}
74+
fiber.resume
75+
76+
group.running[channel.in] = fiber
77+
78+
expect(group).not.to be(:empty?)
79+
end
80+
end
81+
82+
with "#inspect" do
83+
it "provides human-readable representation" do
84+
expect(group.inspect).to be =~ /Async::Container::Group/
85+
expect(group.inspect).to be =~ /running=0/
86+
end
87+
88+
it "shows the number of running processes" do
89+
channel = Async::Container::Channel.new
90+
fiber = Fiber.new {Fiber.yield}
91+
fiber.resume
92+
93+
group.running[channel.in] = fiber
94+
95+
expect(group.inspect).to be =~ /running=1/
96+
end
97+
end
98+
99+
with "#health_check!" do
100+
it "resumes all fibers with :health_check! message" do
101+
messages = []
102+
103+
2.times do
104+
channel = Async::Container::Channel.new
105+
fiber = Fiber.new do
106+
result = Fiber.yield
107+
messages << result
108+
end
109+
110+
fiber.resume
111+
group.running[channel.in] = fiber
112+
end
113+
114+
group.health_check!
115+
116+
expect(messages).to be == [:health_check!, :health_check!]
117+
end
118+
119+
it "does nothing for empty group" do
120+
expect do
121+
group.health_check!
122+
end.not.to raise_exception
123+
end
124+
end
125+
126+
with "#interrupt" do
127+
it "resumes all fibers with Interrupt" do
128+
messages = []
129+
130+
2.times do
131+
channel = Async::Container::Channel.new
132+
fiber = Fiber.new do
133+
result = Fiber.yield
134+
messages << result
135+
end
136+
137+
fiber.resume
138+
group.running[channel.in] = fiber
139+
end
140+
141+
group.interrupt
142+
143+
expect(messages).to be == [Async::Container::Interrupt, Async::Container::Interrupt]
144+
end
145+
end
146+
147+
with "#terminate" do
148+
it "resumes all fibers with Terminate" do
149+
messages = []
150+
151+
2.times do
152+
channel = Async::Container::Channel.new
153+
fiber = Fiber.new do
154+
result = Fiber.yield
155+
messages << result
156+
end
157+
158+
fiber.resume
159+
group.running[channel.in] = fiber
160+
end
161+
162+
group.terminate
163+
164+
expect(messages).to be == [Async::Container::Terminate, Async::Container::Terminate]
165+
end
166+
end
167+
168+
with "#kill" do
169+
it "resumes all fibers with Kill" do
170+
messages = []
171+
172+
2.times do
173+
channel = Async::Container::Channel.new
174+
fiber = Fiber.new do
175+
result = Fiber.yield
176+
messages << result
177+
end
178+
179+
fiber.resume
180+
group.running[channel.in] = fiber
181+
end
182+
183+
group.kill
184+
185+
expect(messages).to be == [Async::Container::Kill, Async::Container::Kill]
186+
end
187+
end
188+
189+
# Regression test for a bug where restarting a child during health check caused
190+
# "RuntimeError: can't add a new key into hash during iteration"
191+
#
192+
# The scenario:
193+
# - A container spawns children with `restart: true` and `health_check_timeout: N`
194+
# - health_check! calls @running.each_value { |fiber| fiber.resume(:health_check!) }
195+
# - A resumed fiber detects health check failure and kills the child
196+
# - The spawn fiber's while loop continues (restart: true) and calls wait_for with a new child
197+
# - wait_for tries to add to @running while health_check! is still iterating
198+
# - This used to cause: RuntimeError: can't add a new key into hash during iteration
199+
it "can restart child during health_check! iteration without error" do
200+
channel1 = Async::Container::Channel.new
201+
channel2 = Async::Container::Channel.new
202+
203+
# Simulate the spawn fiber that restarts on health check failure
204+
restart = true
205+
fiber = Fiber.new do
206+
while restart
207+
result = Fiber.yield # Wait to be resumed
208+
209+
if result == :health_check!
210+
# Health check failed! Simulate the restart logic:
211+
# The wait_for will return (simulated by breaking from this iteration)
212+
# and the while loop continues, creating a new child
213+
214+
# Simulate: child.kill! happens, wait_for returns
215+
# Now the while loop continues and calls wait_for with new child
216+
Fiber.new do
217+
group.wait_for(channel2) do |msg|
218+
# New child waiting
219+
end
220+
end.resume
221+
222+
restart = false # Only do this once for the test
223+
end
224+
end
225+
end
226+
227+
# Start the fiber and add it to @running (simulating first wait_for call)
228+
fiber.resume
229+
group.running[channel1.in] = fiber
230+
231+
# The fix ensures this doesn't raise RuntimeError during iteration
232+
expect do
233+
group.health_check!
234+
end.not.to raise_exception
235+
end
236+
237+
# Regression test with multiple children where one restarts during health check
238+
it "can handle one of multiple children restarting during health check" do
239+
# Create two children, both with restart capability
240+
2.times do |i|
241+
channel = Async::Container::Channel.new
242+
243+
fiber = Fiber.new do
244+
iteration = 0
245+
loop do
246+
iteration += 1
247+
result = Fiber.yield
248+
249+
# First child fails health check on first iteration
250+
if i == 0 && iteration == 1 && result == :health_check!
251+
# Simulate health check failure and restart:
252+
# Kill the old child, create new one
253+
new_channel = Async::Container::Channel.new
254+
255+
# This mimics what happens in spawn's while @running loop
256+
# after wait_for returns due to child being killed
257+
group.wait_for(new_channel) do |msg|
258+
# New child process
259+
end
260+
261+
break # Exit this child's loop
262+
end
263+
end
264+
end
265+
266+
fiber.resume
267+
group.running[channel.in] = fiber
268+
end
269+
270+
# The fix ensures this doesn't raise RuntimeError when the first fiber restarts
271+
expect do
272+
group.health_check!
273+
end.not.to raise_exception
274+
end
275+
end

0 commit comments

Comments
 (0)