Skip to content

Commit 629b5f7

Browse files
Better handling of write failures. Fixes #6.
1 parent 3068f7c commit 629b5f7

File tree

4 files changed

+42
-5
lines changed

4 files changed

+42
-5
lines changed

lib/async/container/supervisor/connection.rb

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
# Copyright, 2025, by Samuel Williams.
55

66
require "json"
7+
require "async"
78

89
module Async
910
module Container
@@ -154,12 +155,14 @@ def self.dispatch(connection, target, id, message)
154155
connection.write(id: id, **response)
155156
end
156157
ensure
157-
# If the queue is closed, we don't need to send a finished message.
158+
# Ensure the call is removed from the connection's calls hash, otherwise it will leak:
159+
connection.calls.delete(id)
160+
161+
# If the queue is closed, we don't need to send a finished message:
158162
unless call.closed?
159-
connection.write(id: id, finished: true)
163+
# If the above write failed, this is likely to fail too, and we can safely ignore it.
164+
connection.write(id: id, finished: true) rescue nil
160165
end
161-
162-
connection.calls.delete(id)
163166
end
164167
end
165168

releases.md

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,9 @@
11
# Releases
22

3+
## Unreleased
4+
5+
- Better handling of write failures in `Connection::Call.dispatch`, ensuring we don't leak calls.
6+
37
## v0.8.0
48

59
- Add `Async::Container::Supervisor::ProcessMonitor` for logging CPU and memory metrics periodically.

test/async/container/connection.rb

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,10 +6,41 @@
66
require "async/container/supervisor/connection"
77
require "stringio"
88

9+
class TestTarget
10+
def initialize(&block)
11+
@block = block
12+
end
13+
14+
def dispatch(call)
15+
@block.call(call)
16+
end
17+
end
18+
919
describe Async::Container::Supervisor::Connection do
1020
let(:stream) {StringIO.new}
1121
let(:connection) {Async::Container::Supervisor::Connection.new(stream)}
1222

23+
with "dispatch" do
24+
it "handles failed writes" do
25+
stream.write(JSON.dump({id: 1, do: :test}) << "\n")
26+
stream.rewind
27+
28+
expect(stream).to receive(:write).and_raise(IOError, "Test error")
29+
30+
target = TestTarget.new do |call|
31+
Async do
32+
call.push(status: "working")
33+
sleep(0) # Yield back to the dispatch to allow the write to fail.
34+
call.finish(status: "done")
35+
end
36+
end
37+
38+
connection.run(target)
39+
40+
expect(connection.calls).to be(:empty?)
41+
end
42+
end
43+
1344
with subject::Call do
1445
let(:test_call) {Async::Container::Supervisor::Connection::Call.new(connection, 1, {do: :test, data: "value"})}
1546

test/async/container/supervisor.rb

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -123,6 +123,5 @@ def reader_target.dispatch(call); end
123123
ensure
124124
worker_task&.stop
125125
end
126-
127126
end
128127
end

0 commit comments

Comments
 (0)