From cd5c9c041f789b59a594d5b119be1a71a44a92e6 Mon Sep 17 00:00:00 2001 From: Samuel Williams Date: Sun, 9 Nov 2025 17:32:30 +1300 Subject: [PATCH 1/3] Introduce robust loop helper. --- .../async/container/supervisor/a_monitor.rb | 10 +++ lib/async/container/supervisor/loop.rb | 34 ++++++++++ .../container/supervisor/memory_monitor.rb | 26 +++++-- .../container/supervisor/process_monitor.rb | 7 +- releases.md | 1 + test/async/container/memory_monitor.rb | 11 +++ test/async/container/process_monitor.rb | 68 ++++++------------- 7 files changed, 100 insertions(+), 57 deletions(-) create mode 100644 lib/async/container/supervisor/loop.rb diff --git a/fixtures/async/container/supervisor/a_monitor.rb b/fixtures/async/container/supervisor/a_monitor.rb index b4379e9..88310c5 100644 --- a/fixtures/async/container/supervisor/a_monitor.rb +++ b/fixtures/async/container/supervisor/a_monitor.rb @@ -29,6 +29,16 @@ module Supervisor type: be == :remove, ) end + + it "can respond to status calls" do + worker = Worker.new(endpoint: endpoint) + connection = worker.connect + + response = connection.call(do: :status) + + # Maybe we could be a bit more specific. + expect(response).to be_a(Array) + end end end end diff --git a/lib/async/container/supervisor/loop.rb b/lib/async/container/supervisor/loop.rb new file mode 100644 index 0000000..6892eb7 --- /dev/null +++ b/lib/async/container/supervisor/loop.rb @@ -0,0 +1,34 @@ +# frozen_string_literal: true + +module Async + module Container + module Supervisor + # A helper for running loops at aligned intervals. + module Loop + # A robust loop that executes a block at aligned intervals. + # + # The alignment is modulo the current clock in seconds. + # + # If an error occurs during the execution of the block, it is logged and the loop continues. + # + # @parameter interval [Integer] The interval in seconds between executions of the block. + def self.run(interval: 60, &block) + while true + # Compute the wait time to the next interval: + wait = interval - (Time.now.to_f % interval) + if wait.positive? + # Sleep until the next interval boundary: + sleep(wait) + end + + begin + yield + rescue => error + Console.error(self, "Loop error:", error) + end + end + end + end + end + end +end diff --git a/lib/async/container/supervisor/memory_monitor.rb b/lib/async/container/supervisor/memory_monitor.rb index bfc498e..f28b15e 100644 --- a/lib/async/container/supervisor/memory_monitor.rb +++ b/lib/async/container/supervisor/memory_monitor.rb @@ -6,6 +6,8 @@ require "memory/leak/cluster" require "set" +require_relative "loop" + module Async module Container module Supervisor @@ -32,6 +34,9 @@ def initialize(interval: 10, total_size_limit: nil, memory_sample: MEMORY_SAMPLE @processes = Hash.new{|hash, key| hash[key] = Set.new.compare_by_identity} end + # @attribute [Memory::Leak::Cluster] The cluster of processes being monitored. + attr_reader :cluster + # Add a process to the memory monitor. You may override this to control how processes are added to the cluster. # # @parameter process_id [Integer] The process ID to add. @@ -98,8 +103,14 @@ def memory_leak_detected(process_id, monitor) end # Kill the process gently: - Console.info(self, "Killing process!", child: {process_id: process_id}) - Process.kill(:INT, process_id) + begin + Console.info(self, "Killing process!", child: {process_id: process_id}) + Process.kill(:INT, process_id) + rescue Errno::ESRCH + # No such process - he's dead Jim. + rescue => error + Console.warn(self, "Failed to kill process!", child: {process_id: process_id}, exception: error) + end true end @@ -109,14 +120,17 @@ def memory_leak_detected(process_id, monitor) # @returns [Async::Task] The task that is running the memory monitor. def run Async do - while true + Loop.run(interval: @interval) do # This block must return true if the process was killed. @cluster.check! do |process_id, monitor| Console.error(self, "Memory leak detected!", child: {process_id: process_id}, monitor: monitor) - memory_leak_detected(process_id, monitor) + + begin + memory_leak_detected(process_id, monitor) + rescue => error + Console.error(self, "Failed to handle memory leak!", child: {process_id: process_id}, exception: error) + end end - - sleep(@interval) end end end diff --git a/lib/async/container/supervisor/process_monitor.rb b/lib/async/container/supervisor/process_monitor.rb index 688248e..88a9c18 100644 --- a/lib/async/container/supervisor/process_monitor.rb +++ b/lib/async/container/supervisor/process_monitor.rb @@ -4,6 +4,9 @@ # Copyright, 2025, by Samuel Williams. require "process/metrics" +require_relative "loop" + +require_relative "loop" module Async module Container @@ -70,15 +73,13 @@ def status(call) # @returns [Async::Task] The task that is running the process monitor. def run Async do - while true + Loop.run(interval: @interval) do metrics = self.metrics # Log each process individually for better searchability in log platforms: metrics.each do |process_id, general| Console.info(self, "Process metrics captured.", general: general) end - - sleep(@interval) end end end diff --git a/releases.md b/releases.md index 9c04d80..ca33615 100644 --- a/releases.md +++ b/releases.md @@ -3,6 +3,7 @@ ## Unreleased - Better handling of write failures in `Connection::Call.dispatch`, ensuring we don't leak calls. + - Robust monitor loop handling - restart on failure, and align loop iterations. ## v0.8.0 diff --git a/test/async/container/memory_monitor.rb b/test/async/container/memory_monitor.rb index e894792..02e06ba 100644 --- a/test/async/container/memory_monitor.rb +++ b/test/async/container/memory_monitor.rb @@ -21,6 +21,17 @@ task = monitor.run expect(task).to be(:running?) end + + it "can handle failures" do + expect(monitor.cluster).to receive(:check!).and_raise(Errno::ESRCH) + + task = monitor.run + expect(task).to be(:running?) + + sleep 1 + + expect(task).to be(:running?) + end end end diff --git a/test/async/container/process_monitor.rb b/test/async/container/process_monitor.rb index a50b4e0..6407cc7 100644 --- a/test/async/container/process_monitor.rb +++ b/test/async/container/process_monitor.rb @@ -11,60 +11,32 @@ describe Async::Container::Supervisor::ProcessMonitor do include Sus::Fixtures::Console::CapturedLogger - let(:monitor) {subject.new(interval: 10)} + let(:monitor) {subject.new(interval: 1)} it_behaves_like Async::Container::Supervisor::AMonitor - it "has a parent process id" do - expect(monitor.ppid).to be == Process.ppid - end - - it "can capture process metrics" do - metrics = monitor.metrics - - # Should capture at least the current process - expect(metrics).to be_a(Hash) - expect(metrics).not.to be(:empty?) - - # Check that we have a metric for the current process - metric = metrics[Process.pid] - expect(metric).not.to be_nil - expect(metric.process_id).to be == Process.pid - expect(metric.command).to be_a(String) + with "#ppid" do + it "defaults to the current parent process id" do + expect(monitor.ppid).to be == Process.ppid + end end - it "can respond to status calls" do - # Create a mock connection and call - stream = StringIO.new - connection = Async::Container::Supervisor::Connection.new(stream) + with "#run" do + include Sus::Fixtures::Async::SchedulerContext - # Create a mock call - call_messages = [] - call = Object.new - def call.push(**message) - @messages ||= [] - @messages << message - end - def call.messages - @messages || [] + it "can run the monitor" do + task = monitor.run + expect(task).to be(:running?) end - monitor.status(call) - - expect(call.messages).not.to be(:empty?) - status = call.messages.first - expect(status).to have_keys(:process_monitor) - expect(status[:process_monitor]).to have_keys(:ppid, :metrics) - end - - it "can register and remove connections" do - stream = StringIO.new - connection = Async::Container::Supervisor::Connection.new(stream, 0, process_id: Process.pid) - - # These should not raise errors - expect do - monitor.register(connection) - monitor.remove(connection) - end.not.to raise_exception + it "can handle failures" do + expect(monitor).to receive(:metrics).and_raise(Errno::ESRCH) + + task = monitor.run + expect(task).to be(:running?) + + sleep 1 + + expect(task).to be(:running?) + end end end - From 6302e8d5488dd39f2745ae79887eee1cb8b67db0 Mon Sep 17 00:00:00 2001 From: Samuel Williams Date: Sun, 9 Nov 2025 20:07:27 +1300 Subject: [PATCH 2/3] Disable memory sampler by default. --- .../container/supervisor/memory_monitor.rb | 4 +-- lib/async/container/supervisor/worker.rb | 25 +++++++++++-------- releases.md | 2 ++ test/async/container/supervisor.rb | 6 ++--- 4 files changed, 19 insertions(+), 18 deletions(-) diff --git a/lib/async/container/supervisor/memory_monitor.rb b/lib/async/container/supervisor/memory_monitor.rb index f28b15e..2839cfa 100644 --- a/lib/async/container/supervisor/memory_monitor.rb +++ b/lib/async/container/supervisor/memory_monitor.rb @@ -15,14 +15,12 @@ module Supervisor # # Uses the `memory` gem to track process memory and detect leaks. class MemoryMonitor - MEMORY_SAMPLE = {duration: 30, timeout: 30*4} - # Create a new memory monitor. # # @parameter interval [Integer] The interval at which to check for memory leaks. # @parameter total_size_limit [Integer] The total size limit of all processes, or nil for no limit. # @parameter options [Hash] Options to pass to the cluster when adding processes. - def initialize(interval: 10, total_size_limit: nil, memory_sample: MEMORY_SAMPLE, **options) + def initialize(interval: 10, total_size_limit: nil, memory_sample: false, **options) @interval = interval @cluster = Memory::Leak::Cluster.new(total_size_limit: total_size_limit) diff --git a/lib/async/container/supervisor/worker.rb b/lib/async/container/supervisor/worker.rb index 56bf5fb..3c3ebde 100644 --- a/lib/async/container/supervisor/worker.rb +++ b/lib/async/container/supervisor/worker.rb @@ -32,18 +32,25 @@ def initialize(state = nil, endpoint: Supervisor.endpoint) include Dispatchable - private def dump(call) + private def dump(call, buffer: true) if path = call[:path] File.open(path, "w") do |file| yield file end call.finish(path: path) - else + elsif buffer buffer = StringIO.new yield buffer - call.finish(data: buffer.string) + if message = call[:log] + Console.info(self, message, data: buffer.string) + call.finish + else + call.finish(data: buffer.string) + end + else + call.fail(error: {message: "Buffered output not supported!"}) end end @@ -69,7 +76,7 @@ def do_scheduler_dump(call) def do_memory_dump(call) require "objspace" - dump(call) do |file| + dump(call, buffer: false) do |file| ObjectSpace.dump_all(output: file) end end @@ -109,13 +116,9 @@ def do_memory_sample(call) report = sampler.report - # This is a temporary log to help with debugging: - buffer = StringIO.new - report.print(buffer) - Console.info(self, "Memory sample completed.", report: buffer.string) - - # Generate a report focused on retained objects (likely leaks): - call.finish(report: report) + dump(call) do |file| + file.puts(report.to_s) + end ensure GC.start end diff --git a/releases.md b/releases.md index ca33615..1c4af3c 100644 --- a/releases.md +++ b/releases.md @@ -4,6 +4,8 @@ - Better handling of write failures in `Connection::Call.dispatch`, ensuring we don't leak calls. - Robust monitor loop handling - restart on failure, and align loop iterations. + - Disable memory sampler by default and use text output format. + - Introduce support for redirecting dump output to logs. ## v0.8.0 diff --git a/test/async/container/supervisor.rb b/test/async/container/supervisor.rb index 72d0792..a331434 100644 --- a/test/async/container/supervisor.rb +++ b/test/async/container/supervisor.rb @@ -76,8 +76,7 @@ def reader_target.dispatch(call) ) # Verify we got the forwarded response - expect(result).to have_keys(:report) - expect(result[:report]).to have_keys(:total_allocated, :total_retained) + expect(result).to have_keys(:data) ensure client_conn&.close worker_task&.stop @@ -117,8 +116,7 @@ def reader_target.dispatch(call); end result = connection.call(do: :memory_sample, duration: 1) # The result should contain a report - expect(result).to have_keys(:report) - expect(result[:report]).to have_keys(:total_allocated, :total_retained) + expect(result).to have_keys(:data) ensure worker_task&.stop end From dacc7d8a130d9d4a005e086e7617966b197170cd Mon Sep 17 00:00:00 2001 From: Samuel Williams Date: Sun, 9 Nov 2025 20:17:09 +1300 Subject: [PATCH 3/3] Fix test failure on ruby head. --- fixtures/async/container/supervisor/a_monitor.rb | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/fixtures/async/container/supervisor/a_monitor.rb b/fixtures/async/container/supervisor/a_monitor.rb index 88310c5..bb51cc2 100644 --- a/fixtures/async/container/supervisor/a_monitor.rb +++ b/fixtures/async/container/supervisor/a_monitor.rb @@ -28,6 +28,8 @@ module Supervisor expect(event).to have_attributes( type: be == :remove, ) + ensure + connection&.close end it "can respond to status calls" do @@ -38,6 +40,8 @@ module Supervisor # Maybe we could be a bit more specific. expect(response).to be_a(Array) + ensure + connection&.close end end end