Skip to content

Commit 60f6935

Browse files
authored
Merge pull request #249 from koic/fix_econnreset_in_sse_notification
Handle `Errno::ECONNRESET` in SSE stream operations
2 parents 2fc4f5e + 9c1c249 commit 60f6935

File tree

2 files changed

+202
-4
lines changed

2 files changed

+202
-4
lines changed

lib/mcp/server/transports/streamable_http_transport.rb

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ def initialize(server, stateless: false)
1919

2020
REQUIRED_POST_ACCEPT_TYPES = ["application/json", "text/event-stream"].freeze
2121
REQUIRED_GET_ACCEPT_TYPES = ["text/event-stream"].freeze
22+
STREAM_WRITE_ERRORS = [IOError, Errno::EPIPE, Errno::ECONNRESET].freeze
2223

2324
def handle_request(request)
2425
case request.env["REQUEST_METHOD"]
@@ -58,7 +59,7 @@ def send_notification(method, params = nil, session_id: nil)
5859
begin
5960
send_to_stream(session[:stream], notification)
6061
true
61-
rescue IOError, Errno::EPIPE => e
62+
rescue *STREAM_WRITE_ERRORS => e
6263
MCP.configuration.exception_reporter.call(
6364
e,
6465
{ session_id: session_id, error: "Failed to send notification" },
@@ -77,7 +78,7 @@ def send_notification(method, params = nil, session_id: nil)
7778
begin
7879
send_to_stream(session[:stream], notification)
7980
sent_count += 1
80-
rescue IOError, Errno::EPIPE => e
81+
rescue *STREAM_WRITE_ERRORS => e
8182
MCP.configuration.exception_reporter.call(
8283
e,
8384
{ session_id: sid, error: "Failed to send notification" },
@@ -289,7 +290,7 @@ def send_response_to_stream(stream, response, session_id)
289290
message = JSON.parse(response)
290291
send_to_stream(stream, message)
291292
handle_accepted
292-
rescue IOError, Errno::EPIPE => e
293+
rescue *STREAM_WRITE_ERRORS => e
293294
MCP.configuration.exception_reporter.call(
294295
e,
295296
{ session_id: session_id, error: "Stream closed during response" },
@@ -366,7 +367,7 @@ def send_keepalive_ping(session_id)
366367
send_ping_to_stream(@sessions[session_id][:stream])
367368
end
368369
end
369-
rescue IOError, Errno::EPIPE => e
370+
rescue *STREAM_WRITE_ERRORS => e
370371
MCP.configuration.exception_reporter.call(
371372
e,
372373
{ session_id: session_id, error: "Stream closed" },

test/mcp/server/transports/streamable_http_transport_test.rb

Lines changed: 197 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -210,6 +210,53 @@ class StreamableHTTPTransportTest < ActiveSupport::TestCase
210210
end
211211
end
212212

213+
test "handles POST request when Errno::ECONNRESET raised" do
214+
# Create and initialize a session.
215+
init_request = create_rack_request(
216+
"POST",
217+
"/",
218+
{ "CONTENT_TYPE" => "application/json" },
219+
{ jsonrpc: "2.0", method: "initialize", id: "123" }.to_json,
220+
)
221+
init_response = @transport.handle_request(init_request)
222+
session_id = init_response[1]["Mcp-Session-Id"]
223+
224+
# Use a mock stream that raises Errno::ECONNRESET on write.
225+
mock_stream = Object.new
226+
mock_stream.define_singleton_method(:write) { |_data| raise Errno::ECONNRESET }
227+
mock_stream.define_singleton_method(:close) {}
228+
229+
# Connect with SSE using the mock stream.
230+
get_request = create_rack_request(
231+
"GET",
232+
"/",
233+
{ "HTTP_MCP_SESSION_ID" => session_id },
234+
)
235+
response = @transport.handle_request(get_request)
236+
response[2].call(mock_stream) if response[2].is_a?(Proc)
237+
238+
# Give the stream time to set up.
239+
sleep(0.1)
240+
241+
request = create_rack_request(
242+
"POST",
243+
"/",
244+
{
245+
"CONTENT_TYPE" => "application/json",
246+
"HTTP_MCP_SESSION_ID" => session_id,
247+
},
248+
{ jsonrpc: "2.0", method: "ping", id: "789" }.to_json,
249+
)
250+
251+
# This should handle Errno::ECONNRESET and return the original response.
252+
response = @transport.handle_request(request)
253+
assert_equal 200, response[0]
254+
assert_equal({ "Content-Type" => "application/json" }, response[1])
255+
256+
# Verify session was cleaned up.
257+
assert_not @transport.instance_variable_get(:@sessions).key?(session_id)
258+
end
259+
213260
test "handles GET request with missing session ID" do
214261
request = create_rack_request(
215262
"GET",
@@ -558,6 +605,156 @@ class StreamableHTTPTransportTest < ActiveSupport::TestCase
558605
assert_not @transport.instance_variable_get(:@sessions).key?(session_id)
559606
end
560607

608+
test "send_notification handles Errno::ECONNRESET gracefully" do
609+
# Create and initialize a session.
610+
init_request = create_rack_request(
611+
"POST",
612+
"/",
613+
{ "CONTENT_TYPE" => "application/json" },
614+
{ jsonrpc: "2.0", method: "initialize", id: "123" }.to_json,
615+
)
616+
init_response = @transport.handle_request(init_request)
617+
session_id = init_response[1]["Mcp-Session-Id"]
618+
619+
# Use a mock stream that raises Errno::ECONNRESET on write.
620+
mock_stream = Object.new
621+
mock_stream.define_singleton_method(:write) { |_data| raise Errno::ECONNRESET }
622+
mock_stream.define_singleton_method(:close) {}
623+
624+
# Connect with SSE using the mock stream.
625+
get_request = create_rack_request(
626+
"GET",
627+
"/",
628+
{ "HTTP_MCP_SESSION_ID" => session_id },
629+
)
630+
response = @transport.handle_request(get_request)
631+
response[2].call(mock_stream) if response[2].is_a?(Proc)
632+
633+
# Give the stream time to set up.
634+
sleep(0.1)
635+
636+
# Try to send notification - should handle ECONNRESET without raising.
637+
result = @transport.send_notification("test", { message: "test" }, session_id: session_id)
638+
639+
# Should return false and clean up the session.
640+
refute result
641+
642+
# Verify session was cleaned up.
643+
assert_not @transport.instance_variable_get(:@sessions).key?(session_id)
644+
end
645+
646+
test "send_notification broadcast continues when one session raises Errno::ECONNRESET" do
647+
# Create two sessions.
648+
init_request1 = create_rack_request(
649+
"POST",
650+
"/",
651+
{ "CONTENT_TYPE" => "application/json" },
652+
{ jsonrpc: "2.0", method: "initialize", id: "1" }.to_json,
653+
)
654+
init_response1 = @transport.handle_request(init_request1)
655+
session_id1 = init_response1[1]["Mcp-Session-Id"]
656+
657+
init_request2 = create_rack_request(
658+
"POST",
659+
"/",
660+
{ "CONTENT_TYPE" => "application/json" },
661+
{ jsonrpc: "2.0", method: "initialize", id: "2" }.to_json,
662+
)
663+
init_response2 = @transport.handle_request(init_request2)
664+
session_id2 = init_response2[1]["Mcp-Session-Id"]
665+
666+
# Session 1: mock stream that raises ECONNRESET.
667+
broken_stream = Object.new
668+
broken_stream.define_singleton_method(:write) { |_data| raise Errno::ECONNRESET }
669+
broken_stream.define_singleton_method(:close) {}
670+
671+
get_request1 = create_rack_request(
672+
"GET",
673+
"/",
674+
{ "HTTP_MCP_SESSION_ID" => session_id1 },
675+
)
676+
response1 = @transport.handle_request(get_request1)
677+
response1[2].call(broken_stream) if response1[2].is_a?(Proc)
678+
679+
# Session 2: healthy stream.
680+
healthy_stream = StringIO.new
681+
get_request2 = create_rack_request(
682+
"GET",
683+
"/",
684+
{ "HTTP_MCP_SESSION_ID" => session_id2 },
685+
)
686+
response2 = @transport.handle_request(get_request2)
687+
response2[2].call(healthy_stream) if response2[2].is_a?(Proc)
688+
689+
# Give the streams time to set up.
690+
sleep(0.1)
691+
692+
# Broadcast notification - should not abort despite ECONNRESET from session 1.
693+
sent_count = @transport.send_notification("test", { message: "hello" }, **{})
694+
695+
# Session 2 should have received the notification.
696+
assert_equal 1, sent_count
697+
698+
healthy_stream.rewind
699+
output = healthy_stream.read
700+
assert_includes output, '"method":"test"'
701+
702+
# Session 1 should have been cleaned up.
703+
assert_not @transport.instance_variable_get(:@sessions).key?(session_id1)
704+
705+
# Session 2 should still exist.
706+
assert @transport.instance_variable_get(:@sessions).key?(session_id2)
707+
end
708+
709+
test "send_keepalive_ping handles Errno::ECONNRESET gracefully" do
710+
# Create and initialize a session.
711+
init_request = create_rack_request(
712+
"POST",
713+
"/",
714+
{ "CONTENT_TYPE" => "application/json" },
715+
{ jsonrpc: "2.0", method: "initialize", id: "123" }.to_json,
716+
)
717+
init_response = @transport.handle_request(init_request)
718+
session_id = init_response[1]["Mcp-Session-Id"]
719+
720+
# Use a mock stream that raises Errno::ECONNRESET on write.
721+
mock_stream = Object.new
722+
mock_stream.define_singleton_method(:write) { |_data| raise Errno::ECONNRESET }
723+
mock_stream.define_singleton_method(:close) {}
724+
725+
# Connect with SSE using the mock stream.
726+
get_request = create_rack_request(
727+
"GET",
728+
"/",
729+
{ "HTTP_MCP_SESSION_ID" => session_id },
730+
)
731+
response = @transport.handle_request(get_request)
732+
response[2].call(mock_stream) if response[2].is_a?(Proc)
733+
734+
# Give the stream time to set up.
735+
sleep(0.1)
736+
737+
# send_keepalive_ping is private; re-raises to exit the keepalive loop.
738+
# Errno::ECONNRESET should be caught by the rescue clause (which reports
739+
# the exception) before being re-raised. Verify that exception_reporter
740+
# is called — this fails if ECONNRESET is not in the rescue list.
741+
reported_errors = []
742+
original_reporter = MCP.configuration.exception_reporter
743+
MCP.configuration.exception_reporter = ->(error, context) { reported_errors << [error, context] }
744+
745+
begin
746+
assert_raises(Errno::ECONNRESET) do
747+
@transport.send(:send_keepalive_ping, session_id)
748+
end
749+
750+
assert_equal(1, reported_errors.size)
751+
assert_instance_of(Errno::ECONNRESET, reported_errors.first[0])
752+
assert_equal("Stream closed", reported_errors.first[1][:error])
753+
ensure
754+
MCP.configuration.exception_reporter = original_reporter
755+
end
756+
end
757+
561758
test "responds with 405 for unsupported methods" do
562759
request = create_rack_request(
563760
"PUT",

0 commit comments

Comments
 (0)