Client connection reuse and server-side query cancellation#152

Open
joe-clickhouse wants to merge 2 commits into main from joe/client-reuse-improvements

Conversation


@joe-clickhouse joe-clickhouse commented Mar 25, 2026

Summary

The goal of this PR is to harden the MCP server for heavy, sustained usage by reusing client connections, adding real server-side query cancellation, and preventing worker thread pool exhaustion from zombie queries.

  • Connection reuse: Cache clickhouse_connect clients by config key instead of creating a new client on every tool call. This eliminates hundreds of milliseconds of overhead per call that was a significant contributor to intermittent timeouts under sustained use.
  • Server-side query cancellation: Assign a query_id to every query and issue KILL QUERY on timeout instead of the no-op future.cancel(). Timed-out queries no longer continue running as zombies consuming worker threads and ClickHouse server resources.
  • Timeout alignment: Auto-cap send_receive_timeout to query_timeout + 5 (unless explicitly overridden) so that worker threads unblock shortly after the MCP timeout fires, preventing thread pool exhaustion.
  • Stale client eviction: Evict cached clients on connection errors and on failed liveness pings after idle. Read-only metadata calls like list_databases and list_tables retry once with a fresh client. run_query evicts but does not retry because writes could duplicate.
  • Config resolution fix: Resolve session config overrides on the request thread, where the FastMCP ContextVar is available, before dispatching to the worker thread. This fixes a latent bug where overrides from PR #115 (Client config override support via MCP Context session states) were silently missed inside the executor.
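The caching scheme in the first bullet can be sketched roughly as follows. This is a minimal sketch, not the PR's code: `get_client`, the `factory` callable, and the `ping` callback stand in for the real clickhouse_connect wiring, and the idle threshold is an assumed constant.

```python
import threading
import time

_client_cache = {}            # cache_key -> (client, last_used_ts)
_client_cache_lock = threading.Lock()
IDLE_PING_SECONDS = 60.0      # assumed idle threshold before a liveness ping


def get_client(cache_key, factory, ping=lambda c: True, now=time.time):
    """Return a cached client for cache_key, creating one via factory() if needed.

    Clients idle longer than IDLE_PING_SECONDS are pinged first; a failed
    ping evicts the stale entry and a fresh client is created.
    """
    with _client_cache_lock:
        entry = _client_cache.get(cache_key)
    if entry is not None:
        client, last_used = entry
        if now() - last_used < IDLE_PING_SECONDS or ping(client):
            # Recently used, or idle but still alive: refresh the timestamp
            with _client_cache_lock:
                _client_cache[cache_key] = (client, now())
            return client
        # Idle and failed the liveness ping: drop the stale client
        with _client_cache_lock:
            _client_cache.pop(cache_key, None)
    client = factory()
    with _client_cache_lock:
        _client_cache[cache_key] = (client, now())
    return client
```

The real implementation additionally has to handle the eviction races discussed in the review comments below.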


Copilot AI left a comment


Pull request overview

This PR hardens the mcp_clickhouse MCP server for sustained/high-concurrency usage by reusing ClickHouse clients across tool calls, implementing true server-side query cancellation, and aligning network timeouts to prevent worker exhaustion.

Changes:

  • Added a cached clickhouse_connect client layer keyed by resolved client config, with eviction on liveness/connection failures and limited retries for safe metadata tools.
  • Implemented query ID tracking and server-side cancellation via KILL QUERY on MCP timeouts.
  • Introduced CLICKHOUSE_MCP_MAX_WORKERS and aligned send_receive_timeout to query_timeout + 5 unless explicitly overridden.
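The timeout-alignment rule can be sketched as below. The +5 second margin comes from the PR; the override env var name `CLICKHOUSE_SEND_RECEIVE_TIMEOUT` is an assumption for illustration.

```python
import os

ALIGNMENT_MARGIN_SECONDS = 5  # per the PR: send_receive_timeout = query_timeout + 5


def resolve_send_receive_timeout(query_timeout: int) -> int:
    """Cap send_receive_timeout just above query_timeout unless explicitly set.

    An explicit CLICKHOUSE_SEND_RECEIVE_TIMEOUT (assumed env var name) wins;
    otherwise the socket timeout tracks the MCP query timeout, so worker
    threads unblock shortly after the MCP-level timeout fires.
    """
    explicit = os.environ.get("CLICKHOUSE_SEND_RECEIVE_TIMEOUT")
    if explicit is not None:
        return int(explicit)
    return query_timeout + ALIGNMENT_MARGIN_SECONDS
```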

Reviewed changes

Copilot reviewed 7 out of 7 changed files in this pull request and generated 6 comments.

Show a summary per file
File Description
mcp_clickhouse/mcp_server.py Implements client caching, active query tracking, cancellation via KILL QUERY, timeout alignment, and worker pool sizing/logging.
mcp_clickhouse/mcp_env.py Adds CLICKHOUSE_MCP_MAX_WORKERS configuration property.
mcp_clickhouse/__init__.py Exposes additional helpers at package level via imports/__all__.
README.md Documents the updated timeout behavior, cancellation semantics, and new max-workers config.
tests/test_client_cache.py Adds coverage for caching behavior, eviction, and timeout alignment.
tests/test_query_cancellation.py Adds coverage for query_id propagation, active query tracking, and timeout-triggered cancellation.
tests/test_context_config_override.py Ensures test isolation by clearing the client cache around config override tests.


Comment on lines +739 to +743
# Re-check: another thread may have evicted while we pinged
if cache_key in _client_cache:
    _client_cache[cache_key] = (candidate, time.time())
    logger.debug("Reusing cached client (ping OK after idle)")
    return candidate

Copilot AI Apr 2, 2026


Concurrency bug: after pinging an idle cached client, this block overwrites whatever is currently in _client_cache[cache_key] with candidate as long as the key exists. If another thread evicted/replaced the entry while we were pinging, this can resurrect a stale/closed client and discard the newer one. Re-check that the cached entry still refers to the same candidate before updating; otherwise return the current cached client (and close candidate if needed).

Suggested change

# Re-check: another thread may have evicted while we pinged
if cache_key in _client_cache:
    _client_cache[cache_key] = (candidate, time.time())
    logger.debug("Reusing cached client (ping OK after idle)")
    return candidate

# Re-check: another thread may have evicted or replaced while we pinged
entry = _client_cache.get(cache_key)
if entry is not None:
    current_client, _ = entry
    if current_client is candidate:
        # Same client still cached: refresh last-used timestamp
        _client_cache[cache_key] = (candidate, time.time())
        logger.debug("Reusing cached client (ping OK after idle)")
        return candidate
    # Different client now cached: prefer the newer cached client
    try:
        candidate.close()
    except Exception:
        pass
    _client_cache[cache_key] = (current_client, time.time())
    logger.debug(
        "Reusing cached client (another thread replaced idle client)"
    )
    return current_client

Comment on lines +746 to +752
logger.warning("Cached client failed ping, creating new client")
with _client_cache_lock:
    _client_cache.pop(cache_key, None)
try:
    candidate.close()
except Exception:
    pass

Copilot AI Apr 2, 2026


Related race: when candidate fails ping, this unconditionally pops cache_key from _client_cache. If another thread already created and cached a fresh client for the same key, this can evict the healthy new entry. Only remove the cache entry if it still points to candidate (e.g., compare object identity under the lock).

Comment on lines +574 to +577
client, _ = cached
logger.info("Cancelling query %s via KILL QUERY", query_id)
client.command(f"KILL QUERY WHERE query_id = '{query_id}'")
logger.info("Successfully cancelled query %s", query_id)

Copilot AI Apr 2, 2026


query_id is interpolated directly into a ClickHouse command string. If query_id ever contains a quote or is influenced by user input (directly or indirectly), this can break the command or enable injection. Ensure query_id is strictly validated (e.g., UUID format) and/or escaped before embedding into the KILL QUERY statement.
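A hedged sketch of the validation this comment suggests: if query_id is always generated via `uuid.uuid4()`, a strict format check before interpolation closes the injection window (helper names here are illustrative, not the PR's):

```python
import re
import uuid

# Canonical lowercase UUID shape, e.g. "550e8400-e29b-41d4-a716-446655440000"
_QUERY_ID_RE = re.compile(
    r"^[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}$"
)


def new_query_id() -> str:
    """Generate a query_id that is safe to embed in a KILL QUERY statement."""
    return str(uuid.uuid4())


def kill_query_statement(query_id: str) -> str:
    """Build the KILL QUERY command, rejecting anything that is not a UUID."""
    if not _QUERY_ID_RE.match(query_id):
        raise ValueError(f"refusing to interpolate suspicious query_id: {query_id!r}")
    return f"KILL QUERY WHERE query_id = '{query_id}'"
```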

Comment on lines +595 to +602
# Log pool utilization for observability using the pending work queue
pending = QUERY_EXECUTOR._work_queue.qsize()
with _active_queries_lock:
    in_flight = len(_active_queries)
if in_flight + pending >= _max_workers:
    logger.warning(
        "Thread pool saturated: %d in-flight + %d queued vs %d workers",
        in_flight, pending, _max_workers,

Copilot AI Apr 2, 2026


This relies on ThreadPoolExecutor internals (QUERY_EXECUTOR._work_queue) for observability. _work_queue is a private attribute and may change across Python versions/implementations, causing runtime errors. Consider tracking queued/in-flight tasks via your own counters/metrics (increment on submit, decrement in callback) instead of touching private fields.

Suggested change

# Log pool utilization for observability using the pending work queue
pending = QUERY_EXECUTOR._work_queue.qsize()
with _active_queries_lock:
    in_flight = len(_active_queries)
if in_flight + pending >= _max_workers:
    logger.warning(
        "Thread pool saturated: %d in-flight + %d queued vs %d workers",
        in_flight, pending, _max_workers,

# Log pool utilization for observability based on active queries
with _active_queries_lock:
    in_flight = len(_active_queries)
if in_flight >= _max_workers:
    logger.warning(
        "Thread pool saturated: %d in-flight vs %d workers",
        in_flight, _max_workers,
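The suggested change drops the queued count. If queue depth is still wanted, it can be tracked with the submit-time counters the comment describes instead of the private `_work_queue` (a sketch under assumed names; not the PR's code):

```python
import threading
from concurrent.futures import ThreadPoolExecutor

_pending = 0                  # tasks submitted but not yet finished
_pending_lock = threading.Lock()


def submit_tracked(executor: ThreadPoolExecutor, fn, *args, **kwargs):
    """Submit fn while maintaining a public pending-task counter."""
    global _pending
    with _pending_lock:
        _pending += 1
    future = executor.submit(fn, *args, **kwargs)

    def _done(_future):
        # Runs when the task finishes (or is cancelled); decrement the counter
        global _pending
        with _pending_lock:
            _pending -= 1

    future.add_done_callback(_done)
    return future


def pending_tasks() -> int:
    with _pending_lock:
        return _pending
```

Saturation logging can then compare `pending_tasks()` against the configured worker count without touching executor internals.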

Comment on lines +792 to +807
def _clear_client_cache():
    """Clear the client cache, closing all cached clients.

    Used during shutdown and for testing.
    """
    with _client_cache_lock:
        for _, (client, _) in list(_client_cache.items()):
            try:
                client.close()
            except Exception:
                pass
        _client_cache.clear()


atexit.register(_clear_client_cache)


Copilot AI Apr 2, 2026


_clear_client_cache is registered with atexit after the executor shutdown hook. Since atexit runs callbacks in reverse registration order, the cache will be cleared (and clients closed) before the thread pool is shut down, potentially closing connections while worker threads are still running. Consider registering a single cleanup function or adjusting registration order so the executor shuts down before closing cached clients.

Comment on lines 3 to 7
from .mcp_server import (
    create_clickhouse_client,
    _clear_client_cache,
    _resolve_client_config,
    list_databases,

Copilot AI Apr 2, 2026


__init__.py exports _clear_client_cache and _resolve_client_config via __all__, effectively making private, underscore-prefixed helpers part of the public package API. This makes future refactors harder and encourages external callers to rely on internals. Prefer keeping these unexported (tests can import from mcp_clickhouse.mcp_server) or renaming/dropping the underscore if they’re intended to be public API.
