88module Async
99 module Container
1010 module Supervisor
11+ # Represents a bidirectional communication channel between supervisor and worker.
12+ #
13+ # Handles message passing, call/response patterns, and connection lifecycle.
1114 class Connection
15+ # Represents a remote procedure call over a connection.
16+ #
17+ # Manages the call lifecycle, response queueing, and completion signaling.
1218 class Call
19+ # Initialize a new call.
20+ #
21+ # @parameter connection [Connection] The connection this call belongs to.
22+ # @parameter id [Integer] The unique call identifier.
23+ # @parameter message [Hash] The call message/parameters.
1324 def initialize ( connection , id , message )
1425 @connection = connection
1526 @id = id
@@ -18,10 +29,16 @@ def initialize(connection, id, message)
1829 @queue = ::Thread ::Queue . new
1930 end
2031
32+ # Convert the call to a JSON-compatible hash.
33+ #
34+ # @returns [Hash] The message hash.
2135 def as_json ( ...)
2236 @message
2337 end
2438
39+ # Convert the call to a JSON string.
40+ #
41+ # @returns [String] The JSON representation.
2542 def to_json ( ...)
2643 as_json . to_json ( ...)
2744 end
@@ -32,14 +49,24 @@ def to_json(...)
3249 # @attribute [Hash] The message that initiated the call.
3350 attr :message
3451
52+ # Access a parameter from the call message.
53+ #
54+ # @parameter key [Symbol] The parameter name.
55+ # @returns [Object] The parameter value.
3556 def [] key
3657 @message [ key ]
3758 end
3859
60+ # Push a response into the call's queue.
61+ #
62+ # @parameter response [Hash] The response data to push.
3963 def push ( **response )
4064 @queue . push ( response )
4165 end
4266
67+ # Pop a response from the call's queue.
68+ #
69+ # @returns [Hash, nil] The next response or nil if queue is closed.
4370 def pop ( ...)
4471 @queue . pop ( ...)
4572 end
@@ -49,12 +76,20 @@ def close
4976 @queue . close
5077 end
5178
79+ # Iterate over all responses from the call.
80+ #
81+ # @yields {|response| ...} Each response from the queue.
5282 def each ( &block )
5383 while response = self . pop
5484 yield response
5585 end
5686 end
5787
88+ # Finish the call with a final response.
89+ #
90+ # Closes the response queue after pushing the final response.
91+ #
92+ # @parameter response [Hash] The final response data.
5893 def finish ( **response )
5994 # If the remote end has already closed the connection, we don't need to send a finished message:
6095 unless @queue . closed?
@@ -63,18 +98,25 @@ def finish(**response)
6398 end
6499 end
65100
101+ # Finish the call with a failure response.
102+ #
103+ # @parameter response [Hash] The error response data.
66104 def fail ( **response )
67105 self . finish ( failed : true , **response )
68106 end
69107
108+ # Check if the call's queue is closed.
109+ #
110+ # @returns [Boolean] True if the queue is closed.
70111 def closed?
71112 @queue . closed?
72113 end
73114
74115 # Forward this call to another connection, proxying all responses back.
75116 #
76117 # This provides true streaming forwarding - intermediate responses flow through
77- # in real-time rather than being buffered.
118+ # in real-time rather than being buffered. The forwarding runs asynchronously
119+ # to avoid blocking the dispatcher.
78120 #
79121 # @parameter target_connection [Connection] The connection to forward the call to.
80122 # @parameter operation [Hash] The operation request to forward (must include :do key).
@@ -92,6 +134,15 @@ def forward(target_connection, operation)
92134 end
93135 end
94136
137+ # Dispatch a call to a target handler.
138+ #
139+ # Creates a call, dispatches it to the target, and streams responses back
140+ # through the connection.
141+ #
142+ # @parameter connection [Connection] The connection to dispatch on.
143+ # @parameter target [Dispatchable] The target handler.
144+ # @parameter id [Integer] The call identifier.
145+ # @parameter message [Hash] The call message.
95146 def self . dispatch ( connection , target , id , message )
96147 Async do
97148 call = self . new ( connection , id , message )
@@ -112,6 +163,15 @@ def self.dispatch(connection, target, id, message)
112163 end
113164 end
114165
166+ # Make a call on a connection and wait for responses.
167+ #
168+ # If a block is provided, yields each response. Otherwise, buffers intermediate
169+ # responses and returns the final response.
170+ #
171+ # @parameter connection [Connection] The connection to call on.
172+ # @parameter message [Hash] The call message/parameters.
173+ # @yields {|response| ...} Each intermediate response if block given.
174+ # @returns [Hash, Array] The final response or array of intermediate responses.
115175 def self . call ( connection , **message , &block )
116176 id = connection . next_id
117177 call = self . new ( connection , id , message )
@@ -149,6 +209,11 @@ def self.call(connection, **message, &block)
149209 end
150210 end
151211
212+ # Initialize a new connection.
213+ #
214+ # @parameter stream [IO] The underlying IO stream.
215+ # @parameter id [Integer] The starting call ID (default: 0).
216+ # @parameter state [Hash] Initial connection state.
152217 def initialize ( stream , id = 0 , **state )
153218 @stream = stream
154219 @id = id
@@ -164,15 +229,26 @@ def initialize(stream, id = 0, **state)
164229 # @attribute [Hash(Symbol, Object)] State associated with this connection, for example the process ID, etc.
165230 attr_accessor :state
166231
232+ # Generate the next unique call ID.
233+ #
234+ # @returns [Integer] The next call identifier.
167235 def next_id
168236 @id += 2
169237 end
170238
239+ # Write a message to the connection stream.
240+ #
241+ # @parameter message [Hash] The message to write.
171242 def write ( **message )
172243 @stream . write ( JSON . dump ( message ) << "\n " )
173244 @stream . flush
174245 end
175246
247+ # Make a synchronous call and wait for a single response.
248+ #
249+ # @parameter timeout [Numeric, nil] Optional timeout for the call.
250+ # @parameter message [Hash] The call message.
251+ # @returns [Hash] The response.
176252 def call ( timeout : nil , **message )
177253 id = next_id
178254 calls [ id ] = ::Thread ::Queue . new
@@ -184,22 +260,34 @@ def call(timeout: nil, **message)
184260 calls . delete ( id )
185261 end
186262
263+ # Read a message from the connection stream.
264+ #
265+ # @returns [Hash, nil] The parsed message or nil if stream is closed.
187266 def read
188267 if line = @stream &.gets
189268 JSON . parse ( line , symbolize_names : true )
190269 end
191270 end
192271
272+ # Iterate over all messages from the connection.
273+ #
274+ # @yields {|message| ...} Each message read from the stream.
193275 def each
194276 while message = self . read
195277 yield message
196278 end
197279 end
198280
281+ # Make a synchronous call and wait for a single response.
199282 def call ( ...)
200283 Call . call ( self , ...)
201284 end
202285
286+ # Run the connection, processing incoming messages.
287+ #
288+ # Dispatches incoming calls to the target and routes responses to waiting calls.
289+ #
290+ # @parameter target [Dispatchable] The target to dispatch calls to.
203291 def run ( target )
204292 self . each do |message |
205293 if id = message . delete ( :id )
@@ -219,12 +307,20 @@ def run(target)
219307 end
220308 end
221309
310+ # Run the connection in a background task.
311+ #
312+ # @parameter target [Dispatchable] The target to dispatch calls to.
313+ # @parameter parent [Async::Task] The parent task.
314+ # @returns [Async::Task] The background reader task.
222315 def run_in_background ( target , parent : Task . current )
223316 @reader ||= parent . async do
224317 self . run ( target )
225318 end
226319 end
227320
321+ # Close the connection and clean up resources.
322+ #
323+ # Stops the background reader, closes the stream, and closes all pending calls.
228324 def close
229325 if @reader
230326 @reader . stop
0 commit comments