From f2d0cc2feb606e8d96316f0f1bfad60c4e7fff80 Mon Sep 17 00:00:00 2001 From: Ming Yang Date: Tue, 4 Nov 2025 19:08:39 -0800 Subject: [PATCH 1/4] catch and send errors on stream --- src/mcp/client/streamable_http.py | 52 ++++++++++++++++++------------- 1 file changed, 31 insertions(+), 21 deletions(-) diff --git a/src/mcp/client/streamable_http.py b/src/mcp/client/streamable_http.py index 57df64705..7c4f450a7 100644 --- a/src/mcp/client/streamable_http.py +++ b/src/mcp/client/streamable_http.py @@ -251,6 +251,13 @@ async def _handle_resumption_request(self, ctx: RequestContext) -> None: await event_source.response.aclose() break + async def _send_error_response(self, ctx: RequestContext, error: Exception) -> None: + """Send an error response to the client.""" + error_data = ErrorData(code=32000, message=str(error)) + jsonrpc_error = JSONRPCError(jsonrpc="2.0", id=ctx.session_message.message.root.id, error=error_data) + session_message = SessionMessage(message=JSONRPCMessage(jsonrpc_error)) + await ctx.read_stream_writer.send(session_message) + async def _handle_post_request(self, ctx: RequestContext) -> None: """Handle a POST request with response processing.""" headers = self._prepare_request_headers(ctx.headers) @@ -321,23 +328,23 @@ async def _handle_sse_response( is_initialization: bool = False, ) -> None: """Handle SSE response from the server.""" - try: - event_source = EventSource(response) - async for sse in event_source.aiter_sse(): - is_complete = await self._handle_sse_event( - sse, - ctx.read_stream_writer, - resumption_callback=(ctx.metadata.on_resumption_token_update if ctx.metadata else None), - is_initialization=is_initialization, - ) - # If the SSE event indicates completion, like returning respose/error - # break the loop - if is_complete: - await response.aclose() - break - except Exception as e: - logger.exception("Error reading SSE stream:") - await ctx.read_stream_writer.send(e) + event_source = EventSource(response) + finished = False + async for sse in event_source.aiter_sse(): + is_complete = await self._handle_sse_event( + sse, + ctx.read_stream_writer, + resumption_callback=(ctx.metadata.on_resumption_token_update if ctx.metadata else None), + is_initialization=is_initialization, + ) + # If the SSE event indicates completion, like returning respose/error + # break the loop + if is_complete: + finished = True + await response.aclose() + break + if not finished: + raise Exception("SSE stream ended without completing") async def _handle_unexpected_content_type( self, @@ -403,10 +410,13 @@ async def post_writer( ) async def handle_request_async(): - if is_resumption: - await self._handle_resumption_request(ctx) - else: - await self._handle_post_request(ctx) + try: + if is_resumption: + await self._handle_resumption_request(ctx) + else: + await self._handle_post_request(ctx) + except Exception as e: + await self._send_error_response(ctx, e) # If this is a request, start a new task to handle it if isinstance(message.root, JSONRPCRequest): From 333b32cc30450dfe36a1e1fc5dfd44843570e560 Mon Sep 17 00:00:00 2001 From: Ming Yang Date: Thu, 6 Nov 2025 14:54:38 -0800 Subject: [PATCH 2/4] only catch errors in _handle_sse_response --- src/mcp/client/streamable_http.py | 49 ++++++++++++++++--------------- 1 file changed, 25 insertions(+), 24 deletions(-) diff --git a/src/mcp/client/streamable_http.py b/src/mcp/client/streamable_http.py index 7c4f450a7..853d2fec1 100644 --- a/src/mcp/client/streamable_http.py +++ b/src/mcp/client/streamable_http.py @@ -328,23 +328,27 @@ async def _handle_sse_response( is_initialization: bool = False, ) -> None: """Handle SSE response from the server.""" - event_source = EventSource(response) - finished = False - async for sse in event_source.aiter_sse(): - is_complete = await self._handle_sse_event( - sse, - ctx.read_stream_writer, - resumption_callback=(ctx.metadata.on_resumption_token_update if ctx.metadata else None), - is_initialization=is_initialization, - ) - # If the SSE event indicates completion, like returning respose/error - # break the loop - if is_complete: - finished = True - await response.aclose() - break - if not finished: - raise Exception("SSE stream ended without completing") + try: + event_source = EventSource(response) + finished = False + async for sse in event_source.aiter_sse(): + is_complete = await self._handle_sse_event( + sse, + ctx.read_stream_writer, + resumption_callback=(ctx.metadata.on_resumption_token_update if ctx.metadata else None), + is_initialization=is_initialization, + ) + # If the SSE event indicates completion, like returning respose/error + # break the loop + if is_complete: + finished = True + await response.aclose() + break + if not finished: + raise Exception("SSE stream ended without completing") + except Exception as exc: + logger.exception("Error handling SSE response") + await self._send_error_response(ctx, exc) async def _handle_unexpected_content_type( self, @@ -410,13 +414,10 @@ async def post_writer( ) async def handle_request_async(): - try: - if is_resumption: - await self._handle_resumption_request(ctx) - else: - await self._handle_post_request(ctx) - except Exception as e: - await self._send_error_response(ctx, e) + if is_resumption: + await self._handle_resumption_request(ctx) + else: + await self._handle_post_request(ctx) # If this is a request, start a new task to handle it if isinstance(message.root, JSONRPCRequest): From aca621e4cc0066e2a2fc9da52e80658d644f247e Mon Sep 17 00:00:00 2001 From: Ming Yang Date: Thu, 6 Nov 2025 15:19:40 -0800 Subject: [PATCH 3/4] add typecheck --- src/mcp/client/streamable_http.py | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/src/mcp/client/streamable_http.py b/src/mcp/client/streamable_http.py index 853d2fec1..a307eda81 100644 --- a/src/mcp/client/streamable_http.py +++ b/src/mcp/client/streamable_http.py @@ -254,9 +254,10 @@ async def _handle_resumption_request(self, ctx: RequestContext) -> None: async def _send_error_response(self, ctx: RequestContext, error: Exception) -> None: """Send an error response to the client.""" error_data = ErrorData(code=32000, message=str(error)) - jsonrpc_error = JSONRPCError(jsonrpc="2.0", id=ctx.session_message.message.root.id, error=error_data) - session_message = SessionMessage(message=JSONRPCMessage(jsonrpc_error)) - await ctx.read_stream_writer.send(session_message) + if isinstance(ctx.session_message.message.root, JSONRPCRequest): + jsonrpc_error = JSONRPCError(jsonrpc="2.0", id=ctx.session_message.message.root.id, error=error_data) + session_message = SessionMessage(message=JSONRPCMessage(jsonrpc_error)) + await ctx.read_stream_writer.send(session_message) async def _handle_post_request(self, ctx: RequestContext) -> None: """Handle a POST request with response processing.""" From 5e2aea49549cf3369ccbf47feaec632b44934734 Mon Sep 17 00:00:00 2001 From: Ming Yang Date: Fri, 7 Nov 2025 12:31:56 -0800 Subject: [PATCH 4/4] catch errors in _handle_post_request --- src/mcp/client/streamable_http.py | 70 +++++++++++++++++-------------- 1 file changed, 38 insertions(+), 32 deletions(-) diff --git a/src/mcp/client/streamable_http.py b/src/mcp/client/streamable_http.py index a307eda81..d610559f2 100644 --- a/src/mcp/client/streamable_http.py +++ b/src/mcp/client/streamable_http.py @@ -265,41 +265,47 @@ async def _handle_post_request(self, ctx: RequestContext) -> None: message = ctx.session_message.message is_initialization = self._is_initialization_request(message) - async with ctx.client.stream( - "POST", - self.url, - json=message.model_dump(by_alias=True, mode="json", exclude_none=True), - headers=headers, - ) as response: - if response.status_code == 202: - logger.debug("Received 202 Accepted") - return + try: + async with ctx.client.stream( + "POST", + self.url, + json=message.model_dump(by_alias=True, mode="json", exclude_none=True), + headers=headers, + ) as response: + if response.status_code == 202: + logger.debug("Received 202 Accepted") + return - if response.status_code == 404: - if isinstance(message.root, JSONRPCRequest): - await self._send_session_terminated_error( - ctx.read_stream_writer, - message.root.id, - ) - return + if response.status_code == 404: + if isinstance(message.root, JSONRPCRequest): + await self._send_session_terminated_error( + ctx.read_stream_writer, + message.root.id, + ) + return + + response.raise_for_status() + if is_initialization: + self._maybe_extract_session_id_from_response(response) - response.raise_for_status() + # Per https://modelcontextprotocol.io/specification/2025-06-18/basic#notifications: + # The server MUST NOT send a response to notifications. + if isinstance(message.root, JSONRPCRequest): + content_type = response.headers.get(CONTENT_TYPE, "").lower() + if content_type.startswith(JSON): + await self._handle_json_response(response, ctx.read_stream_writer, is_initialization) + elif content_type.startswith(SSE): + await self._handle_sse_response(response, ctx, is_initialization) + else: + await self._handle_unexpected_content_type( + content_type, + ctx.read_stream_writer, + ) + except Exception as exc: if is_initialization: - self._maybe_extract_session_id_from_response(response) - - # Per https://modelcontextprotocol.io/specification/2025-06-18/basic#notifications: - # The server MUST NOT send a response to notifications. - if isinstance(message.root, JSONRPCRequest): - content_type = response.headers.get(CONTENT_TYPE, "").lower() - if content_type.startswith(JSON): - await self._handle_json_response(response, ctx.read_stream_writer, is_initialization) - elif content_type.startswith(SSE): - await self._handle_sse_response(response, ctx, is_initialization) - else: - await self._handle_unexpected_content_type( - content_type, - ctx.read_stream_writer, - ) + raise exc + else: + await self._send_error_response(ctx, exc) async def _handle_json_response( self,