Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 14 additions & 0 deletions fixtures/async/container/supervisor/a_monitor.rb
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,20 @@ module Supervisor
expect(event).to have_attributes(
type: be == :remove,
)
ensure
connection&.close
end

it "can respond to status calls" do
worker = Worker.new(endpoint: endpoint)
connection = worker.connect

response = connection.call(do: :status)

# Maybe we could be a bit more specific.
expect(response).to be_a(Array)
ensure
connection&.close
end
end
end
Expand Down
34 changes: 34 additions & 0 deletions lib/async/container/supervisor/loop.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
# frozen_string_literal: true

module Async
module Container
module Supervisor
# A helper for running loops at aligned intervals.
module Loop
# A robust loop that executes a block at aligned intervals.
#
# The alignment is modulo the current clock in seconds.
#
# If an error occurs during the execution of the block, it is logged and the loop continues.
#
# @parameter interval [Integer] The interval in seconds between executions of the block.
def self.run(interval: 60, &block)
while true
# Compute the wait time to the next interval:
wait = interval - (Time.now.to_f % interval)
if wait.positive?
# Sleep until the next interval boundary:
sleep(wait)
end

begin
yield
rescue => error
Console.error(self, "Loop error:", error)
end
end
end
end
end
end
end
30 changes: 21 additions & 9 deletions lib/async/container/supervisor/memory_monitor.rb
Original file line number Diff line number Diff line change
Expand Up @@ -6,21 +6,21 @@
require "memory/leak/cluster"
require "set"

require_relative "loop"

module Async
module Container
module Supervisor
# Monitors worker memory usage and restarts workers that exceed limits.
#
# Uses the `memory` gem to track process memory and detect leaks.
class MemoryMonitor
MEMORY_SAMPLE = {duration: 30, timeout: 30*4}

# Create a new memory monitor.
#
# @parameter interval [Integer] The interval at which to check for memory leaks.
# @parameter total_size_limit [Integer] The total size limit of all processes, or nil for no limit.
# @parameter options [Hash] Options to pass to the cluster when adding processes.
def initialize(interval: 10, total_size_limit: nil, memory_sample: MEMORY_SAMPLE, **options)
def initialize(interval: 10, total_size_limit: nil, memory_sample: false, **options)
@interval = interval
@cluster = Memory::Leak::Cluster.new(total_size_limit: total_size_limit)

Expand All @@ -32,6 +32,9 @@ def initialize(interval: 10, total_size_limit: nil, memory_sample: MEMORY_SAMPLE
@processes = Hash.new{|hash, key| hash[key] = Set.new.compare_by_identity}
end

# @attribute [Memory::Leak::Cluster] The cluster of processes being monitored.
attr_reader :cluster

# Add a process to the memory monitor. You may override this to control how processes are added to the cluster.
#
# @parameter process_id [Integer] The process ID to add.
Expand Down Expand Up @@ -98,8 +101,14 @@ def memory_leak_detected(process_id, monitor)
end

# Kill the process gently:
Console.info(self, "Killing process!", child: {process_id: process_id})
Process.kill(:INT, process_id)
begin
Console.info(self, "Killing process!", child: {process_id: process_id})
Process.kill(:INT, process_id)
rescue Errno::ESRCH
# No such process - he's dead Jim.
rescue => error
Console.warn(self, "Failed to kill process!", child: {process_id: process_id}, exception: error)
end

true
end
Expand All @@ -109,14 +118,17 @@ def memory_leak_detected(process_id, monitor)
# @returns [Async::Task] The task that is running the memory monitor.
def run
Async do
while true
Loop.run(interval: @interval) do
# This block must return true if the process was killed.
@cluster.check! do |process_id, monitor|
Console.error(self, "Memory leak detected!", child: {process_id: process_id}, monitor: monitor)
memory_leak_detected(process_id, monitor)

begin
memory_leak_detected(process_id, monitor)
rescue => error
Console.error(self, "Failed to handle memory leak!", child: {process_id: process_id}, exception: error)
end
end

sleep(@interval)
end
end
end
Expand Down
7 changes: 4 additions & 3 deletions lib/async/container/supervisor/process_monitor.rb
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,9 @@
# Copyright, 2025, by Samuel Williams.

require "process/metrics"
require_relative "loop"

require_relative "loop"

module Async
module Container
Expand Down Expand Up @@ -70,15 +73,13 @@ def status(call)
# @returns [Async::Task] The task that is running the process monitor.
def run
Async do
while true
Loop.run(interval: @interval) do
metrics = self.metrics

# Log each process individually for better searchability in log platforms:
metrics.each do |process_id, general|
Console.info(self, "Process metrics captured.", general: general)
end

sleep(@interval)
end
end
end
Expand Down
25 changes: 14 additions & 11 deletions lib/async/container/supervisor/worker.rb
Original file line number Diff line number Diff line change
Expand Up @@ -32,18 +32,25 @@ def initialize(state = nil, endpoint: Supervisor.endpoint)

include Dispatchable

private def dump(call)
private def dump(call, buffer: true)
if path = call[:path]
File.open(path, "w") do |file|
yield file
end

call.finish(path: path)
else
elsif buffer
buffer = StringIO.new
yield buffer

call.finish(data: buffer.string)
if message = call[:log]
Console.info(self, message, data: buffer.string)
call.finish
else
call.finish(data: buffer.string)
end
else
call.fail(error: {message: "Buffered output not supported!"})
end
end

Expand All @@ -69,7 +76,7 @@ def do_scheduler_dump(call)
def do_memory_dump(call)
require "objspace"

dump(call) do |file|
dump(call, buffer: false) do |file|
ObjectSpace.dump_all(output: file)
end
end
Expand Down Expand Up @@ -109,13 +116,9 @@ def do_memory_sample(call)

report = sampler.report

# This is a temporary log to help with debugging:
buffer = StringIO.new
report.print(buffer)
Console.info(self, "Memory sample completed.", report: buffer.string)

# Generate a report focused on retained objects (likely leaks):
call.finish(report: report)
dump(call) do |file|
file.puts(report.to_s)
end
ensure
GC.start
end
Expand Down
3 changes: 3 additions & 0 deletions releases.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,9 @@
## Unreleased

- Better handling of write failures in `Connection::Call.dispatch`, ensuring we don't leak calls.
- Robust monitor loop handling - restart on failure, and align loop iterations.
- Disable memory sampler by default and use text output format.
- Introduce support for redirecting dump output to logs.

## v0.8.0

Expand Down
11 changes: 11 additions & 0 deletions test/async/container/memory_monitor.rb
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,17 @@
task = monitor.run
expect(task).to be(:running?)
end

it "can handle failures" do
expect(monitor.cluster).to receive(:check!).and_raise(Errno::ESRCH)

task = monitor.run
expect(task).to be(:running?)

sleep 1

expect(task).to be(:running?)
end
end
end

68 changes: 20 additions & 48 deletions test/async/container/process_monitor.rb
Original file line number Diff line number Diff line change
Expand Up @@ -11,60 +11,32 @@
describe Async::Container::Supervisor::ProcessMonitor do
include Sus::Fixtures::Console::CapturedLogger

let(:monitor) {subject.new(interval: 10)}
let(:monitor) {subject.new(interval: 1)}
it_behaves_like Async::Container::Supervisor::AMonitor

it "has a parent process id" do
expect(monitor.ppid).to be == Process.ppid
end

it "can capture process metrics" do
metrics = monitor.metrics

# Should capture at least the current process
expect(metrics).to be_a(Hash)
expect(metrics).not.to be(:empty?)

# Check that we have a metric for the current process
metric = metrics[Process.pid]
expect(metric).not.to be_nil
expect(metric.process_id).to be == Process.pid
expect(metric.command).to be_a(String)
with "#ppid" do
it "defaults to the current parent process id" do
expect(monitor.ppid).to be == Process.ppid
end
end

it "can respond to status calls" do
# Create a mock connection and call
stream = StringIO.new
connection = Async::Container::Supervisor::Connection.new(stream)
with "#run" do
include Sus::Fixtures::Async::SchedulerContext

# Create a mock call
call_messages = []
call = Object.new
def call.push(**message)
@messages ||= []
@messages << message
end
def call.messages
@messages || []
it "can run the monitor" do
task = monitor.run
expect(task).to be(:running?)
end

monitor.status(call)

expect(call.messages).not.to be(:empty?)
status = call.messages.first
expect(status).to have_keys(:process_monitor)
expect(status[:process_monitor]).to have_keys(:ppid, :metrics)
end

it "can register and remove connections" do
stream = StringIO.new
connection = Async::Container::Supervisor::Connection.new(stream, 0, process_id: Process.pid)

# These should not raise errors
expect do
monitor.register(connection)
monitor.remove(connection)
end.not.to raise_exception
it "can handle failures" do
expect(monitor).to receive(:metrics).and_raise(Errno::ESRCH)

task = monitor.run
expect(task).to be(:running?)

sleep 1

expect(task).to be(:running?)
end
end
end

6 changes: 2 additions & 4 deletions test/async/container/supervisor.rb
Original file line number Diff line number Diff line change
Expand Up @@ -76,8 +76,7 @@ def reader_target.dispatch(call)
)

# Verify we got the forwarded response
expect(result).to have_keys(:report)
expect(result[:report]).to have_keys(:total_allocated, :total_retained)
expect(result).to have_keys(:data)
ensure
client_conn&.close
worker_task&.stop
Expand Down Expand Up @@ -117,8 +116,7 @@ def reader_target.dispatch(call); end
result = connection.call(do: :memory_sample, duration: 1)

# The result should contain a report
expect(result).to have_keys(:report)
expect(result[:report]).to have_keys(:total_allocated, :total_retained)
expect(result).to have_keys(:data)
ensure
worker_task&.stop
end
Expand Down
Loading