Skip to content

Commit 0d676f2

Browse files
ioquatix authored and samuel-williams-shopify committed
Better handling of failures.
1 parent e3599ba commit 0d676f2

File tree

4 files changed

+241
-12
lines changed

4 files changed

+241
-12
lines changed
lib/async/container/supervisor/loop.rb

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,31 @@
1+
module Async
  module Container
    module Supervisor
      # A robust loop that executes a block at aligned intervals.
      #
      # The alignment is modulo the current clock in seconds, so with an interval of 60 the block
      # runs at (approximately) every whole multiple of 60 seconds of wall-clock time.
      #
      # If an error occurs during the execution of the block, it is logged and the loop continues.
      module Loop
        # Run the given block forever, once per interval boundary. This method does not return
        # normally; it only exits via a non-local exit or an exception that is not rescued here.
        #
        # The interval may be given positionally (`Loop.run(10)`) or as a keyword
        # (`Loop.run(interval: 10)`). The keyword takes precedence when both are supplied.
        #
        # Only `StandardError` (and subclasses) raised by the block is rescued and logged;
        # anything else propagates and terminates the loop.
        #
        # @parameter default_interval [Numeric] The interval in seconds, used when the keyword is omitted.
        # @parameter interval [Numeric] The interval in seconds between executions of the block.
        # @yields {...} Invoked once at every interval boundary.
        # @raises [ArgumentError] If the interval is not a positive number.
        def self.run(default_interval = 60, interval: default_interval, &block)
          # A zero or negative interval yields a NaN/non-positive wait and would spin without sleeping:
          unless interval.is_a?(Numeric) && interval.positive?
            raise ArgumentError, "interval must be a positive number of seconds, got #{interval.inspect}"
          end

          # `while true` rather than `Kernel#loop`, which would silently swallow StopIteration:
          while true
            # Compute the wait time to the next interval boundary:
            wait = interval - (Time.now.to_f % interval)

            # Sleep until the next interval boundary:
            sleep(wait) if wait.positive?

            begin
              yield
            rescue => error
              # Log and carry on so a single failed iteration cannot stop the loop:
              Console.error(self, "Loop error:", error)
            end
          end
        end
      end
    end
  end
end

lib/async/container/supervisor/memory_monitor.rb

Lines changed: 23 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,8 @@
66
require "memory/leak/cluster"
77
require "set"
88

9+
require_relative "loop"
10+
911
module Async
1012
module Container
1113
module Supervisor
@@ -91,15 +93,24 @@ def memory_leak_detected(process_id, monitor)
9193

9294
# Try to capture a memory sample:
9395
connections.each do |connection|
94-
result = connection.call(do: :memory_sample, **@memory_sample)
95-
96-
Console.info(self, "Memory sample completed:", child: {process_id: process_id}, result: result)
96+
begin
97+
result = connection.call(do: :memory_sample, **@memory_sample)
98+
99+
Console.info(self, "Memory sample completed:", child: {process_id: process_id}, result: result)
100+
rescue Errno::EPIPE, IOError => error
101+
# Connection already closed - process may have crashed
102+
Console.warn(self, "Failed to capture memory sample (connection closed):", child: {process_id: process_id}, error: error)
103+
end
97104
end
98105
end
99106

100107
# Kill the process gently:
101-
Console.info(self, "Killing process!", child: {process_id: process_id})
102-
Process.kill(:INT, process_id)
108+
begin
109+
Console.info(self, "Killing process!", child: {process_id: process_id})
110+
Process.kill(:INT, process_id)
111+
rescue => error
112+
Console.warn(self, "Failed to kill process!", child: {process_id: process_id}, exception: error)
113+
end
103114

104115
true
105116
end
@@ -109,14 +120,17 @@ def memory_leak_detected(process_id, monitor)
109120
# @returns [Async::Task] The task that is running the memory monitor.
110121
def run
111122
Async do
112-
while true
123+
Loop.run(interval: @interval) do
113124
# This block must return true if the process was killed.
114125
@cluster.check! do |process_id, monitor|
115126
Console.error(self, "Memory leak detected!", child: {process_id: process_id}, monitor: monitor)
116-
memory_leak_detected(process_id, monitor)
127+
128+
begin
129+
memory_leak_detected(process_id, monitor)
130+
rescue => error
131+
Console.error(self, "Failed to handle memory leak!", child: {process_id: process_id}, exception: error)
132+
end
117133
end
118-
119-
sleep(@interval)
120134
end
121135
end
122136
end

lib/async/container/supervisor/process_monitor.rb

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
# Copyright, 2025, by Samuel Williams.
55

66
require "process/metrics"
7+
require_relative "loop"
78

89
module Async
910
module Container
@@ -70,15 +71,13 @@ def status(call)
7071
# @returns [Async::Task] The task that is running the process monitor.
7172
def run
7273
Async do
73-
while true
74+
Loop.run(interval: @interval) do
7475
metrics = self.metrics
7576

7677
# Log each process individually for better searchability in log platforms:
7778
metrics.each do |process_id, general|
7879
Console.info(self, "Process metrics captured.", general: general)
7980
end
80-
81-
sleep(@interval)
8281
end
8382
end
8483
end
Lines changed: 185 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,185 @@
1+
# frozen_string_literal: true
2+
3+
# Released under the MIT License.
4+
# Copyright, 2025, by Samuel Williams.
5+
6+
require "async/container/supervisor/memory_monitor"
7+
require "async/container/supervisor/connection"
8+
require "sus/fixtures/console/captured_logger"
9+
10+
describe Async::Container::Supervisor::MemoryMonitor do
11+
include Sus::Fixtures::Console::CapturedLogger
12+
13+
let(:monitor) {subject.new(interval: 10, memory_sample: {duration: 1, timeout: 5})}
14+
15+
it "can register and remove connections" do
16+
stream = StringIO.new
17+
connection = Async::Container::Supervisor::Connection.new(stream, 0, process_id: Process.pid)
18+
19+
# These should not raise errors
20+
expect do
21+
monitor.register(connection)
22+
monitor.remove(connection)
23+
end.not.to raise_exception
24+
end
25+
26+
it "can respond to status calls" do
  # Minimal stand-in for a call object: it records every message pushed to it
  # in an instance variable so the example can inspect them afterwards.
  call = Object.new
  def call.push(**message)
    @messages ||= []
    @messages << message
  end
  def call.messages
    @messages || []
  end

  monitor.status(call)

  # The example expects at least one pushed message, whose first entry carries a :memory_monitor key:
  expect(call.messages).not.to be(:empty?)
  status = call.messages.first
  expect(status).to have_keys(:memory_monitor)
end
44+
45+
with "closed connection handling" do
46+
it "handles Errno::EPIPE when connection is broken" do
47+
# Use a fake PID that won't kill our test process
48+
fake_pid = 999999
49+
50+
# Create a mock connection that will raise EPIPE on write
51+
stream = StringIO.new
52+
connection = Async::Container::Supervisor::Connection.new(stream, 0, process_id: fake_pid)
53+
54+
# Mock the call method to raise EPIPE
55+
def connection.call(**message)
56+
raise Errno::EPIPE, "Broken pipe"
57+
end
58+
59+
monitor.register(connection)
60+
61+
# Mock a simple monitor object
62+
mock_monitor = Object.new
63+
def mock_monitor.to_s
64+
"MockMonitor"
65+
end
66+
67+
# Mock Process.kill to avoid killing anything
68+
expect(Process).to receive(:kill).and_return(nil)
69+
70+
# This should handle the exception gracefully and not hang
71+
result = monitor.memory_leak_detected(fake_pid, mock_monitor)
72+
73+
# Verify the process would have been killed
74+
expect(result).to be == true
75+
end
76+
77+
it "handles IOError when connection stream is closed" do
78+
# Use a fake PID that won't kill our test process
79+
fake_pid = 999998
80+
81+
# Create a connection with a closed stream
82+
stream = StringIO.new
83+
stream.close
84+
connection = Async::Container::Supervisor::Connection.new(stream, 0, process_id: fake_pid)
85+
86+
# Mock the call method to raise IOError
87+
def connection.call(**message)
88+
raise IOError, "closed stream"
89+
end
90+
91+
monitor.register(connection)
92+
93+
# Mock a simple monitor object
94+
mock_monitor = Object.new
95+
def mock_monitor.to_s
96+
"MockMonitor"
97+
end
98+
99+
# Mock Process.kill to avoid killing anything
100+
expect(Process).to receive(:kill).and_return(nil)
101+
102+
# This should handle the exception gracefully and not hang
103+
result = monitor.memory_leak_detected(fake_pid, mock_monitor)
104+
105+
# Verify the process would have been killed
106+
expect(result).to be == true
107+
end
108+
109+
it "handles write errors when stream is already closed" do
110+
# Use a fake PID that won't kill our test process
111+
fake_pid = 999997
112+
113+
read_io, write_io = IO.pipe
114+
115+
# Close the read end to simulate a broken pipe
116+
read_io.close
117+
118+
connection = Async::Container::Supervisor::Connection.new(write_io, 0, process_id: fake_pid)
119+
monitor.register(connection)
120+
121+
# Mock a simple monitor object
122+
mock_monitor = Object.new
123+
def mock_monitor.to_s
124+
"MockMonitor"
125+
end
126+
127+
# Mock Process.kill to avoid killing anything
128+
expect(Process).to receive(:kill).and_return(nil)
129+
130+
# This should handle the exception gracefully
131+
result = monitor.memory_leak_detected(fake_pid, mock_monitor)
132+
133+
# Verify the process would have been killed
134+
expect(result).to be == true
135+
ensure
136+
write_io&.close
137+
end
138+
139+
it "continues monitoring even if one connection fails" do
140+
# Use a fake PID that won't kill our test process
141+
fake_pid = 999996
142+
143+
# Create two connections, one will fail
144+
good_stream = StringIO.new
145+
good_connection = Async::Container::Supervisor::Connection.new(good_stream, 0, process_id: fake_pid)
146+
147+
bad_stream = StringIO.new
148+
bad_connection = Async::Container::Supervisor::Connection.new(bad_stream, 2, process_id: fake_pid)
149+
150+
# Make the bad connection raise on call
151+
def bad_connection.call(**message)
152+
raise Errno::EPIPE, "Broken pipe"
153+
end
154+
155+
# Make the good connection succeed
156+
def good_connection.call(**message)
157+
@call_count ||= 0
158+
@call_count += 1
159+
{success: true, count: @call_count}
160+
end
161+
def good_connection.call_count
162+
@call_count || 0
163+
end
164+
165+
monitor.register(bad_connection)
166+
monitor.register(good_connection)
167+
168+
mock_monitor = Object.new
169+
def mock_monitor.to_s
170+
"MockMonitor"
171+
end
172+
173+
# Mock Process.kill to avoid killing anything
174+
expect(Process).to receive(:kill).and_return(nil)
175+
176+
# With the fix, this should NOT raise because bad_connection's exception is caught
177+
result = monitor.memory_leak_detected(fake_pid, mock_monitor)
178+
179+
# Both connections should have been attempted - bad one fails, good one succeeds
180+
expect(good_connection.call_count).to be == 1
181+
expect(result).to be == true
182+
end
183+
end
184+
end
185+

0 commit comments

Comments
 (0)