Skip to content
Merged

Dev #84

Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion plugins/communication_protocols/http/pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ build-backend = "setuptools.build_meta"

[project]
name = "utcp-http"
version = "1.1.1"
version = "1.1.3"
authors = [
{ name = "UTCP Contributors" },
]
Expand Down
112 changes: 112 additions & 0 deletions plugins/communication_protocols/http/src/utcp_http/_security.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,112 @@
"""URL validation shared by every HTTP-based communication protocol.

Centralised so all three HTTP protocols (http, streamable_http, sse) enforce
the same trust boundary at every network edge — manual discovery AND tool
invocation. Issue #83 (CVE-class SSRF) was caused by the runtime invocation
path forgetting the discovery-time check, so this module also provides an
explicit ``ensure_secure_url`` to call before every aiohttp request.
"""

from __future__ import annotations

from ipaddress import ip_address
from typing import Optional
from urllib.parse import urlparse

# Hostnames considered safe to talk to over plain HTTP.
_LOOPBACK_HOSTNAMES = frozenset({"localhost", "127.0.0.1", "::1", "[::1]"})


def is_secure_url(url: str) -> bool:
"""Return True if ``url`` is safe to fetch from a UTCP HTTP protocol.

Allowed:
- Any ``https://`` URL.
- ``http://`` URLs whose host is exactly ``localhost``, ``127.0.0.1``,
or ``::1``.

Disallowed:
- Plain ``http://`` to any other host (MITM exposure).
- URLs whose hostname *starts* with ``localhost`` / ``127.0.0.1`` but
isn't actually loopback (e.g. ``http://localhost.evil.com``,
``http://127.0.0.1.attacker.example``). The earlier ``startswith``
check let these through.
- Anything without a scheme/host (file://, gopher://, javascript:, ...).
"""
if not isinstance(url, str) or not url:
return False

try:
parsed = urlparse(url)
except ValueError:
return False

scheme = (parsed.scheme or "").lower()
if scheme not in {"http", "https"}:
return False

host = (parsed.hostname or "").lower()
if not host:
return False

if scheme == "https":
return True

# http:// is only allowed for loopback.
if host in _LOOPBACK_HOSTNAMES:
return True

# Catch any other literal loopback IP that urlparse normalised
# (e.g. ``http://127.000.000.001``).
try:
return ip_address(host).is_loopback
except ValueError:
return False


def is_loopback_url(url: str) -> bool:
"""Return True if ``url``'s host is a literal loopback address.

Used by the OpenAPI converter to detect the SSRF case where a remote spec
declares ``servers: [{ url: "http://127.0.0.1:..." }]`` to redirect tool
invocation at the host running the agent. Hostname-based — not a string
prefix — so ``http://localhost.evil.com`` returns False.
"""
if not isinstance(url, str) or not url:
return False

try:
parsed = urlparse(url)
except ValueError:
return False

host = (parsed.hostname or "").lower()
if not host:
return False

if host in _LOOPBACK_HOSTNAMES:
return True

try:
return ip_address(host).is_loopback
except ValueError:
return False


def ensure_secure_url(url: str, *, context: Optional[str] = None) -> None:
"""Raise ``ValueError`` if ``url`` is not safe to fetch.

``context`` is a short label (``"manual discovery"``, ``"tool invocation"``,
etc.) included in the error so log readers can tell which trust boundary
was breached.
"""
if is_secure_url(url):
return

where = f" during {context}" if context else ""
raise ValueError(
f"Security error{where}: URL must use HTTPS or be a literal loopback "
f"address (localhost / 127.0.0.1 / ::1). Got: {url!r}. "
"Plain HTTP to any other host is rejected to prevent MITM attacks "
"and SSRF into internal services."
)
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
from utcp_http.http_call_template import HttpCallTemplate
from aiohttp import ClientSession, BasicAuth as AiohttpBasicAuth
from utcp_http.openapi_converter import OpenApiConverter
from utcp_http._security import ensure_secure_url
import logging

logging.basicConfig(
Expand Down Expand Up @@ -123,14 +124,10 @@ async def register_manual(self, caller, manual_call_template: CallTemplate) -> R

try:
url = manual_call_template.url

# Security check: Enforce HTTPS or localhost to prevent MITM attacks
if not (url.startswith("https://") or url.startswith("http://localhost") or url.startswith("http://127.0.0.1")):
raise ValueError(
f"Security error: URL must use HTTPS or start with 'http://localhost' or 'http://127.0.0.1'. Got: {url}. "
"Non-secure URLs are vulnerable to man-in-the-middle attacks."
)


# Security check: only HTTPS or loopback HTTP allowed for manual discovery.
ensure_secure_url(url, context="manual discovery")

logger.info(f"Discovering tools from '{manual_call_template.name}' (HTTP) at {url}")

# Use the call template's configuration (headers, auth, HTTP method, etc.)
Expand Down Expand Up @@ -274,7 +271,15 @@ async def call_tool(self, caller, tool_name: str, tool_args: Dict[str, Any], too

# Build the URL with path parameters substituted
url = self._build_url_with_path_params(tool_call_template.url, remaining_args)


# Security check: re-validate the resolved URL before each invocation.
# An attacker-controlled OpenAPI spec discovered over a legitimate HTTPS
# URL can declare ``servers[0].url`` pointing at internal services
# (e.g. http://169.254.169.254 for cloud metadata, http://127.0.0.1:9200
# for an unauthenticated Elasticsearch). Without this re-check, tool
# invocation is a blind SSRF primitive — see GHSA / issue #83.
ensure_secure_url(url, context="tool invocation")
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P1: The SSRF check allows loopback addresses (http://127.0.0.1:*), which contradicts the threat model documented in its own comment. An attacker-controlled OpenAPI spec fetched over a legitimate HTTPS endpoint can set servers[0].url to http://127.0.0.1:9200 (or any other local service), and ensure_secure_url will pass it through because 127.0.0.1 is in the loopback allowlist.

Consider using a stricter variant for the tool-invocation context that only allows HTTPS (no loopback HTTP), or at least remove the misleading comment about blocking http://127.0.0.1:9200.

Prompt for AI agents
Check if this issue is valid — if so, understand the root cause and fix it. At plugins/communication_protocols/http/src/utcp_http/http_communication_protocol.py, line 281:

<comment>The SSRF check allows loopback addresses (`http://127.0.0.1:*`), which contradicts the threat model documented in its own comment. An attacker-controlled OpenAPI spec fetched over a legitimate HTTPS endpoint can set `servers[0].url` to `http://127.0.0.1:9200` (or any other local service), and `ensure_secure_url` will pass it through because `127.0.0.1` is in the loopback allowlist.

Consider using a stricter variant for the tool-invocation context that only allows HTTPS (no loopback HTTP), or at least remove the misleading comment about blocking `http://127.0.0.1:9200`.</comment>

<file context>
@@ -274,7 +271,15 @@ async def call_tool(self, caller, tool_name: str, tool_args: Dict[str, Any], too
+        # (e.g. http://169.254.169.254 for cloud metadata, http://127.0.0.1:9200
+        # for an unauthenticated Elasticsearch). Without this re-check, tool
+        # invocation is a blind SSRF primitive — see GHSA / issue #83.
+        ensure_secure_url(url, context="tool invocation")
+
         # The rest of the arguments are query parameters
</file context>


# The rest of the arguments are query parameters
query_params = remaining_args

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
from utcp.data.utcp_manual import UtcpManual
from utcp.data.tool import Tool, JsonSchema
from utcp_http.http_call_template import HttpCallTemplate
from utcp_http._security import is_loopback_url

class OpenApiConverter:
"""REQUIRED
Expand Down Expand Up @@ -148,9 +149,32 @@ def convert(self) -> UtcpManual:

# Determine base URL: override > servers > spec_url > fallback
if self._base_url_override:
# Explicit override from UTCP config — caller has accepted the
# trust decision, no further validation here.
base_url = self._base_url_override
elif self.spec.get("servers"):
base_url = self.spec["servers"][0].get("url", "/")

# Rule: a spec fetched from a non-loopback source cannot declare
# a loopback server URL. A user pointing the converter at their
# own localhost OpenAPI spec is allowed to declare loopback
# servers, and an explicit ``base_url`` override always wins
# (handled above).
if (
self.spec_url
and not is_loopback_url(self.spec_url)
and is_loopback_url(base_url)
):
raise ValueError(
"Security error: OpenAPI spec fetched from "
f"{self.spec_url!r} declares a loopback server URL "
f"({base_url!r}). A remote spec is not allowed to "
"redirect tool calls at the agent's own loopback "
"interface — this is the SSRF pattern from "
"GHSA-39j6-4867-gg4w. If you trust this spec, set "
"the call template's ``base_url`` override "
"explicitly to bypass this check."
)
elif self.spec_url:
parsed_url = urlparse(self.spec_url)
base_url = f"{parsed_url.scheme}://{parsed_url.netloc}"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
from utcp.data.auth_implementations.oauth2_auth import OAuth2Auth
from utcp_http.sse_call_template import SseCallTemplate
from aiohttp import ClientSession, BasicAuth as AiohttpBasicAuth
from utcp_http._security import ensure_secure_url
import traceback
import logging

Expand Down Expand Up @@ -77,14 +78,10 @@ async def register_manual(self, caller, manual_call_template: CallTemplate) -> R

try:
url = manual_call_template.url

# Security check: Enforce HTTPS or localhost to prevent MITM attacks
if not (url.startswith("https://") or url.startswith("http://localhost") or url.startswith("http://127.0.0.1")):
raise ValueError(
f"Security error: URL must use HTTPS or start with 'http://localhost' or 'http://127.0.0.1'. Got: {url}. "
"Non-secure URLs are vulnerable to man-in-the-middle attacks."
)


# Security check: only HTTPS or loopback HTTP allowed for manual discovery.
ensure_secure_url(url, context="manual discovery")

logger.info(f"Discovering tools from '{manual_call_template.name}' (SSE) at {url}")

# Use the provider's configuration (headers, auth, etc.)
Expand Down Expand Up @@ -188,7 +185,12 @@ async def call_tool_streaming(self, caller, tool_name: str, tool_args: Dict[str,

# Build the URL with path parameters substituted
url = self._build_url_with_path_params(tool_call_template.url, remaining_args)


# Security check: re-validate the resolved URL before each invocation.
# Defends against SSRF via attacker-controlled OpenAPI specs that point
# ``servers[0].url`` at internal services. See issue #83.
ensure_secure_url(url, context="tool invocation")

# The rest of the arguments are query parameters
query_params = remaining_args

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
from utcp.data.auth_implementations import OAuth2Auth
from utcp_http.streamable_http_call_template import StreamableHttpCallTemplate
from aiohttp import ClientSession, BasicAuth as AiohttpBasicAuth, ClientResponse
from utcp_http._security import ensure_secure_url
import logging

logging.basicConfig(
Expand Down Expand Up @@ -78,14 +79,11 @@ async def register_manual(self, caller, manual_call_template: CallTemplate) -> R
raise ValueError("StreamableHttpCommunicationProtocol can only be used with StreamableHttpCallTemplate")

url = manual_call_template.url

# Security check: Enforce HTTPS or localhost to prevent MITM attacks
if not (url.startswith("https://") or url.startswith("http://localhost") or url.startswith("http://127.0.0.1")):
raise ValueError(
f"Security error: URL must use HTTPS or start with 'http://localhost' or 'http://127.0.0.1'. Got: {url}. "
"Non-secure URLs are vulnerable to man-in-the-middle attacks."
)


# Security check: only HTTPS or loopback HTTP allowed for manual discovery.
ensure_secure_url(url, context="manual discovery")


logger.info(f"Discovering tools from '{manual_call_template.name}' (HTTP Stream) at {url}")

try:
Expand Down Expand Up @@ -216,7 +214,12 @@ async def call_tool_streaming(self, caller, tool_name: str, tool_args: Dict[str,

# Build the URL with path parameters substituted
url = self._build_url_with_path_params(tool_call_template.url, remaining_args)


# Security check: re-validate the resolved URL before each invocation.
# Defends against SSRF via attacker-controlled OpenAPI specs that point
# ``servers[0].url`` at internal services. See issue #83.
ensure_secure_url(url, context="tool invocation")

# The rest of the arguments are query parameters
query_params = remaining_args

Expand Down
Loading
Loading