Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
36 changes: 34 additions & 2 deletions src/amqproxy/client.cr
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,33 @@ module AMQProxy
upstream_channel = channel_pool.get(DownstreamChannel.new(self, frame.channel))
@channel_map[frame.channel] = upstream_channel
write AMQ::Protocol::Frame::Channel::OpenOk.new(frame.channel)
when AMQ::Protocol::Frame::Channel::Close
src_channel = frame.channel
begin
upstream_channel = @channel_map[frame.channel]?
if upstream_channel.nil?
# Channel is already closing (value is nil) or doesn't exist
if @channel_map.has_key?(frame.channel)
# Channel exists but is nil (already closing from upstream side)
# Send CloseOk to acknowledge the client's close request
write AMQ::Protocol::Frame::Channel::CloseOk.new(frame.channel)
@channel_map.delete(frame.channel)
else
# Channel doesn't exist at all - error condition
close_connection(504_u16, "CHANNEL_ERROR - Channel #{frame.channel} not open", frame)
end
else
# Channel is open, forward the close to upstream
begin
upstream_channel.write(frame)
# Mark channel as closing so close_all_upstream_channels won't try to close it again
@channel_map[frame.channel] = nil
rescue ex : Upstream::WriteError
# Upstream write failed, send error close to client
close_channel(src_channel, 500_u16, "UPSTREAM_ERROR")
Comment on lines +99 to +100
Copy link

Copilot AI Dec 19, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The order of operations here is problematic. The code sets @channel_map[frame.channel] = nil at line 97 BEFORE attempting to write to upstream. If the upstream write at line 95 fails with Upstream::WriteError, the code attempts to call close_channel at line 100, but since @channel_map was already set to nil, close_channel won't send a Close frame (it checks if the channel exists in the map).

However, since the client initiated this Close request, the proper response upon error would be to send a CloseOk acknowledging the client's close, not another Close frame. Consider sending CloseOk in the error handler instead of calling close_channel, or move the @channel_map[frame.channel] = nil assignment to after the successful write, only setting it to nil within the rescue block if needed.

Suggested change
# Upstream write failed, send error close to client
close_channel(src_channel, 500_u16, "UPSTREAM_ERROR")
# Upstream write to close failed; acknowledge client's close and clean up
write AMQ::Protocol::Frame::Channel::CloseOk.new(src_channel)
@channel_map.delete(src_channel)

Copilot uses AI. Check for mistakes.
end
end
end
Comment on lines +77 to +103
Copy link

Copilot AI Dec 19, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The entire Channel::Close handling block accesses and modifies @channel_map (lines 80, 83, 87, 97) without any synchronization. This creates race conditions with:

  1. The write() method which modifies @channel_map at lines 179-181 (also unsynchronized)
  2. The close_channel() method which reads @channel_map at line 203
  3. The close_all_upstream_channels() method which iterates over @channel_map

Since @channel_map can be accessed concurrently from multiple fibers (e.g., when processing frames from upstream via DownstreamChannel.write which calls Client.write), all accesses to @channel_map should be protected by the same lock (such as @lock) to ensure thread safety. Consider wrapping all @channel_map operations in @lock.synchronize blocks or refactoring to ensure consistent synchronization throughout the class.

Copilot uses AI. Check for mistakes.
when AMQ::Protocol::Frame::Channel::CloseOk
# Server closed channel, CloseOk reply to server is already sent
@channel_map.delete(frame.channel)
Copy link

Copilot AI Dec 19, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The deletion of @channel_map entry at line 106 is not synchronized, which can race with other concurrent accesses to @channel_map from the write() method, close_channel() method, and close_all_upstream_channels() method. Ensure this operation is protected by @lock to maintain thread safety.

Suggested change
@channel_map.delete(frame.channel)
@lock.synchronize do
@channel_map.delete(frame.channel)
end

Copilot uses AI. Check for mistakes.
Expand Down Expand Up @@ -149,7 +176,9 @@ module AMQProxy
end
case frame
when AMQ::Protocol::Frame::Channel::Close
@channel_map[frame.channel] = nil
if @channel_map[frame.channel]
@channel_map[frame.channel] = nil
end
Comment on lines 177 to +181
Copy link

Copilot AI Dec 19, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The modification of @channel_map at lines 179-181 happens outside the @lock.synchronize block that ends at line 176. This creates a race condition where multiple concurrent calls to write() with Channel::Close frames could interleave their checks and modifications of @channel_map.

Additionally, accessing @channel_map[frame.channel] without the ? operator can raise a KeyError if the channel was deleted from the map between the write operation and this check. This is especially problematic since the check happens after releasing the lock.

Move the @channel_map modification (lines 177-186) inside the @lock.synchronize block, or use @channel_map[frame.channel]? to safely handle missing keys. The former approach is preferred for consistency with how the rest of the write method handles synchronization.

Copilot uses AI. Check for mistakes.
when AMQ::Protocol::Frame::Channel::CloseOk
@channel_map.delete(frame.channel)
when AMQ::Protocol::Frame::Connection::CloseOk
Expand All @@ -170,7 +199,10 @@ module AMQProxy
end

def close_channel(id, code, reason)
write AMQ::Protocol::Frame::Channel::Close.new(id, code, reason, 0_u16, 0_u16)
# Only send Channel::Close if we haven't already sent one
if @channel_map[id]?
write AMQ::Protocol::Frame::Channel::Close.new(id, code, reason, 0_u16, 0_u16)
end
end
Comment on lines 201 to 206
Copy link

Copilot AI Dec 19, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This method accesses @channel_map without any synchronization, which could lead to race conditions. The @channel_map is accessed concurrently by the read_loop (which runs in a fiber) and the write method (which uses @lock for synchronization). Without proper locking, there's a race condition where:

  1. This method reads @channel_map[id] and finds it non-nil
  2. Another fiber sets @channel_map[id] = nil or deletes it
  3. This method sends a duplicate Close frame

Since the write method uses @lock for synchronization when modifying @channel_map (lines 179-181), this method should use the same lock to ensure thread-safe access. Alternatively, consider setting @channel_map[id] = nil before writing the Close frame to prevent duplicates, similar to the pattern used in the write method.

Copilot uses AI. Check for mistakes.

private def close_all_upstream_channels(code = 500_u16, reason = "CLIENT_DISCONNECTED")
Expand Down
7 changes: 6 additions & 1 deletion src/amqproxy/upstream.cr
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,12 @@ module AMQProxy
end

def close_channel(id, code, reason)
send AMQ::Protocol::Frame::Channel::Close.new(id, code, reason, 0_u16, 0_u16)
# Only send Channel::Close if the channel is still open (exists in our map)
@channels_lock.synchronize do
if @channels.has_key?(id)
send AMQ::Protocol::Frame::Channel::Close.new(id, code, reason, 0_u16, 0_u16)
end
end
end
Comment on lines 59 to 66
Copy link

Copilot AI Dec 19, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There's a potential race condition between this check and the channel deletion that happens in the read_loop. The read_loop deletes channels from @channels at lines 94 and 98 when receiving Channel::Close and Channel::CloseOk frames. Between the time this method checks has_key?(id) and sends the Close frame, the read_loop could receive and process a CloseOk, deleting the channel from @channels. This would still allow the duplicate Close frame to be sent.

To properly prevent duplicate Close frames, the deletion from @channels should also happen within this synchronized block, or the close_channel method should mark the channel as closing before sending the Close frame. Consider deleting the channel from @channels here before sending the Close frame, similar to how the read_loop handles it when receiving Close from upstream.

Copilot uses AI. Check for mistakes.

def channels
Expand Down