Skip to content

Commit 9d71fd4

Browse files
Introduce robust loop helper.
1 parent ae9f0aa commit 9d71fd4

File tree

6 files changed

+100
-57
lines changed

6 files changed

+100
-57
lines changed

fixtures/async/container/supervisor/a_monitor.rb

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,16 @@ module Supervisor
2929
type: be == :remove,
3030
)
3131
end
32+
33+
it "can respond to status calls" do
34+
worker = Worker.new(endpoint: endpoint)
35+
connection = worker.connect
36+
37+
response = connection.call(do: :status)
38+
39+
# Maybe we could be a bit more specific.
40+
expect(response).to be_a(Array)
41+
end
3242
end
3343
end
3444
end
lib/async/container/supervisor/loop.rb

Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,35 @@
1+
# frozen_string_literal: true
2+
3+
module Async
	module Container
		module Supervisor
			# A helper for running loops at aligned intervals.
			module Loop
				# A robust loop that executes a block at aligned intervals.
				#
				# The alignment is modulo the current clock in seconds, i.e. the block is executed each time the wall clock (`Time.now`) crosses a multiple of `interval`.
				#
				# If an error (`StandardError`) occurs during the execution of the block, it is logged and the loop continues with the next interval. The loop never terminates on its own; it only stops if the block breaks out of it or a non-`StandardError` exception propagates.
				#
				# @parameter interval [Integer] The interval in seconds between executions of the block.
				# @yields {} The block to execute at each interval boundary.
				def self.run(interval: 60, &block)
					while true
						# Compute the wait time to the next interval:
						wait = interval - (Time.now.to_f % interval)
						if wait.positive?
							# Sleep until the next interval boundary:
							sleep(wait)
						end
						
						begin
							yield
						rescue => error
							# Log and swallow the error so that a single failed iteration does not terminate the loop (and the task running it):
							Console.error(self, "Loop error:", error)
						end
					end
				end
			end
		end
	end
end

lib/async/container/supervisor/memory_monitor.rb

Lines changed: 20 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,8 @@
66
require "memory/leak/cluster"
77
require "set"
88

9+
require_relative "loop"
10+
911
module Async
1012
module Container
1113
module Supervisor
@@ -32,6 +34,9 @@ def initialize(interval: 10, total_size_limit: nil, memory_sample: MEMORY_SAMPLE
3234
@processes = Hash.new{|hash, key| hash[key] = Set.new.compare_by_identity}
3335
end
3436

37+
# @attribute [Memory::Leak::Cluster] The cluster of processes being monitored.
38+
attr_reader :cluster
39+
3540
# Add a process to the memory monitor. You may override this to control how processes are added to the cluster.
3641
#
3742
# @parameter process_id [Integer] The process ID to add.
@@ -98,8 +103,14 @@ def memory_leak_detected(process_id, monitor)
98103
end
99104

100105
# Kill the process gently:
101-
Console.info(self, "Killing process!", child: {process_id: process_id})
102-
Process.kill(:INT, process_id)
106+
begin
107+
Console.info(self, "Killing process!", child: {process_id: process_id})
108+
Process.kill(:INT, process_id)
109+
rescue Errno::ESRCH
110+
# No such process - he's dead Jim.
111+
rescue => error
112+
Console.warn(self, "Failed to kill process!", child: {process_id: process_id}, exception: error)
113+
end
103114

104115
true
105116
end
@@ -109,14 +120,17 @@ def memory_leak_detected(process_id, monitor)
109120
# @returns [Async::Task] The task that is running the memory monitor.
110121
def run
111122
Async do
112-
while true
123+
Loop.run(interval: @interval) do
113124
# This block must return true if the process was killed.
114125
@cluster.check! do |process_id, monitor|
115126
Console.error(self, "Memory leak detected!", child: {process_id: process_id}, monitor: monitor)
116-
memory_leak_detected(process_id, monitor)
127+
128+
begin
129+
memory_leak_detected(process_id, monitor)
130+
rescue => error
131+
Console.error(self, "Failed to handle memory leak!", child: {process_id: process_id}, exception: error)
132+
end
117133
end
118-
119-
sleep(@interval)
120134
end
121135
end
122136
end

lib/async/container/supervisor/process_monitor.rb

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,9 @@
44
# Copyright, 2025, by Samuel Williams.
55

66
require "process/metrics"
7+
require_relative "loop"
8+
9+
require_relative "loop"
710

811
module Async
912
module Container
@@ -70,15 +73,13 @@ def status(call)
7073
# @returns [Async::Task] The task that is running the process monitor.
7174
def run
7275
Async do
73-
while true
76+
Loop.run(interval: @interval) do
7477
metrics = self.metrics
7578

7679
# Log each process individually for better searchability in log platforms:
7780
metrics.each do |process_id, general|
7881
Console.info(self, "Process metrics captured.", general: general)
7982
end
80-
81-
sleep(@interval)
8283
end
8384
end
8485
end

test/async/container/memory_monitor.rb

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,17 @@
2121
task = monitor.run
2222
expect(task).to be(:running?)
2323
end
24+
25+
it "can handle failures" do
26+
expect(monitor.cluster).to receive(:check!).and_raise(Errno::ESRCH)
27+
28+
task = monitor.run
29+
expect(task).to be(:running?)
30+
31+
sleep 1
32+
33+
expect(task).to be(:running?)
34+
end
2435
end
2536
end
2637

test/async/container/process_monitor.rb

Lines changed: 20 additions & 48 deletions
Original file line numberDiff line numberDiff line change
@@ -11,60 +11,32 @@
1111
describe Async::Container::Supervisor::ProcessMonitor do
1212
include Sus::Fixtures::Console::CapturedLogger
1313

14-
let(:monitor) {subject.new(interval: 10)}
14+
let(:monitor) {subject.new(interval: 1)}
1515
it_behaves_like Async::Container::Supervisor::AMonitor
1616

17-
it "has a parent process id" do
18-
expect(monitor.ppid).to be == Process.ppid
19-
end
20-
21-
it "can capture process metrics" do
22-
metrics = monitor.metrics
23-
24-
# Should capture at least the current process
25-
expect(metrics).to be_a(Hash)
26-
expect(metrics).not.to be(:empty?)
27-
28-
# Check that we have a metric for the current process
29-
metric = metrics[Process.pid]
30-
expect(metric).not.to be_nil
31-
expect(metric.process_id).to be == Process.pid
32-
expect(metric.command).to be_a(String)
17+
with "#ppid" do
18+
it "defaults to the current parent process id" do
19+
expect(monitor.ppid).to be == Process.ppid
20+
end
3321
end
3422

35-
it "can respond to status calls" do
36-
# Create a mock connection and call
37-
stream = StringIO.new
38-
connection = Async::Container::Supervisor::Connection.new(stream)
23+
with "#run" do
24+
include Sus::Fixtures::Async::SchedulerContext
3925

40-
# Create a mock call
41-
call_messages = []
42-
call = Object.new
43-
def call.push(**message)
44-
@messages ||= []
45-
@messages << message
46-
end
47-
def call.messages
48-
@messages || []
26+
it "can run the monitor" do
27+
task = monitor.run
28+
expect(task).to be(:running?)
4929
end
5030

51-
monitor.status(call)
52-
53-
expect(call.messages).not.to be(:empty?)
54-
status = call.messages.first
55-
expect(status).to have_keys(:process_monitor)
56-
expect(status[:process_monitor]).to have_keys(:ppid, :metrics)
57-
end
58-
59-
it "can register and remove connections" do
60-
stream = StringIO.new
61-
connection = Async::Container::Supervisor::Connection.new(stream, 0, process_id: Process.pid)
62-
63-
# These should not raise errors
64-
expect do
65-
monitor.register(connection)
66-
monitor.remove(connection)
67-
end.not.to raise_exception
31+
it "can handle failures" do
32+
expect(monitor).to receive(:metrics).and_raise(Errno::ESRCH)
33+
34+
task = monitor.run
35+
expect(task).to be(:running?)
36+
37+
sleep 1
38+
39+
expect(task).to be(:running?)
40+
end
6841
end
6942
end
70-

0 commit comments

Comments
 (0)