Client connection reuse and server-side query cancellation #152

joe-clickhouse wants to merge 2 commits into main
Conversation
Pull request overview
This PR hardens the mcp_clickhouse MCP server for sustained/high-concurrency usage by reusing ClickHouse clients across tool calls, implementing true server-side query cancellation, and aligning network timeouts to prevent worker exhaustion.
Changes:

- Added a cached `clickhouse_connect` client layer keyed by resolved client config, with eviction on liveness/connection failures and limited retries for safe metadata tools (see the sketch after this list).
- Implemented query ID tracking and server-side cancellation via `KILL QUERY` on MCP timeouts.
- Introduced `CLICKHOUSE_MCP_MAX_WORKERS` and aligned `send_receive_timeout` to `query_timeout + 5` unless explicitly overridden.
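For orientation, a minimal sketch of what a config-keyed client cache can look like (illustrative only; `get_cached_client` and `_cache_key` are hypothetical names, and the PR's eviction, liveness, and retry logic is not reproduced here):

```python
import threading
import time

import clickhouse_connect

_client_cache = {}            # cache_key -> (client, last_used_timestamp)
_client_cache_lock = threading.Lock()


def _cache_key(config: dict) -> tuple:
    # Key on the resolved connection parameters so each distinct
    # configuration gets its own long-lived client.
    return tuple(sorted(config.items()))


def get_cached_client(config: dict):
    key = _cache_key(config)
    with _client_cache_lock:
        entry = _client_cache.get(key)
        if entry is not None:
            client, _ = entry
            _client_cache[key] = (client, time.time())
            return client
    # Connect outside the lock so a slow handshake does not block other callers.
    client = clickhouse_connect.get_client(**config)
    with _client_cache_lock:
        existing = _client_cache.get(key)
        if existing is not None:
            # Another thread created a client first; keep the cached one.
            try:
                client.close()
            except Exception:
                pass
            client = existing[0]
        _client_cache[key] = (client, time.time())
    return client
```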
Reviewed changes
Copilot reviewed 7 out of 7 changed files in this pull request and generated 6 comments.
| File | Description |
|---|---|
| `mcp_clickhouse/mcp_server.py` | Implements client caching, active query tracking, cancellation via `KILL QUERY`, timeout alignment, and worker pool sizing/logging. |
| `mcp_clickhouse/mcp_env.py` | Adds the `CLICKHOUSE_MCP_MAX_WORKERS` configuration property. |
| `mcp_clickhouse/__init__.py` | Exposes additional helpers at package level via imports/`__all__`. |
| `README.md` | Documents the updated timeout behavior, cancellation semantics, and new max-workers config. |
| `tests/test_client_cache.py` | Adds coverage for caching behavior, eviction, and timeout alignment. |
| `tests/test_query_cancellation.py` | Adds coverage for `query_id` propagation, active query tracking, and timeout-triggered cancellation. |
| `tests/test_context_config_override.py` | Ensures test isolation by clearing the client cache around config override tests. |
```python
# Re-check: another thread may have evicted while we pinged
if cache_key in _client_cache:
    _client_cache[cache_key] = (candidate, time.time())
    logger.debug("Reusing cached client (ping OK after idle)")
    return candidate
```
Concurrency bug: after pinging an idle cached client, this block overwrites whatever is currently in _client_cache[cache_key] with candidate as long as the key exists. If another thread evicted/replaced the entry while we were pinging, this can resurrect a stale/closed client and discard the newer one. Re-check that the cached entry still refers to the same candidate before updating; otherwise return the current cached client (and close candidate if needed).
Suggested change (replacing the excerpt above):

```python
# Re-check: another thread may have evicted or replaced while we pinged
entry = _client_cache.get(cache_key)
if entry is not None:
    current_client, _ = entry
    if current_client is candidate:
        # Same client still cached: refresh last-used timestamp
        _client_cache[cache_key] = (candidate, time.time())
        logger.debug("Reusing cached client (ping OK after idle)")
        return candidate
    # Different client now cached: prefer the newer cached client
    try:
        candidate.close()
    except Exception:
        pass
    _client_cache[cache_key] = (current_client, time.time())
    logger.debug(
        "Reusing cached client (another thread replaced idle client)"
    )
    return current_client
```
```python
logger.warning("Cached client failed ping, creating new client")
with _client_cache_lock:
    _client_cache.pop(cache_key, None)
try:
    candidate.close()
except Exception:
    pass
```
Related race: when candidate fails ping, this unconditionally pops cache_key from _client_cache. If another thread already created and cached a fresh client for the same key, this can evict the healthy new entry. Only remove the cache entry if it still points to candidate (e.g., compare object identity under the lock).
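A possible identity-checked eviction, as a sketch (variable names follow the excerpt above; the surrounding cache plumbing is assumed):

```python
with _client_cache_lock:
    entry = _client_cache.get(cache_key)
    if entry is not None and entry[0] is candidate:
        # Evict only if the cache still holds the exact client that failed the ping;
        # a fresh client cached by another thread is left untouched.
        _client_cache.pop(cache_key, None)
try:
    candidate.close()
except Exception:
    pass
```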
```python
client, _ = cached
logger.info("Cancelling query %s via KILL QUERY", query_id)
client.command(f"KILL QUERY WHERE query_id = '{query_id}'")
logger.info("Successfully cancelled query %s", query_id)
```
query_id is interpolated directly into a ClickHouse command string. If query_id ever contains a quote or is influenced by user input (directly or indirectly), this can break the command or enable injection. Ensure query_id is strictly validated (e.g., UUID format) and/or escaped before embedding into the KILL QUERY statement.
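One way to enforce that, assuming query IDs are generated as UUID strings (a sketch; `cancel_query` is an illustrative name, not the PR's function):

```python
import logging
import uuid

logger = logging.getLogger(__name__)


def cancel_query(client, query_id: str) -> None:
    # Reject anything that is not a well-formed UUID before building the command.
    try:
        uuid.UUID(query_id)
    except (TypeError, ValueError):
        logger.warning("Refusing to cancel query with non-UUID query_id: %r", query_id)
        return
    # A validated UUID contains only hex digits and dashes, so it is safe to embed.
    client.command(f"KILL QUERY WHERE query_id = '{query_id}'")
    logger.info("Cancelled query %s", query_id)
```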
```python
# Log pool utilization for observability using the pending work queue
pending = QUERY_EXECUTOR._work_queue.qsize()
with _active_queries_lock:
    in_flight = len(_active_queries)
if in_flight + pending >= _max_workers:
    logger.warning(
        "Thread pool saturated: %d in-flight + %d queued vs %d workers",
        in_flight, pending, _max_workers,
```
This relies on ThreadPoolExecutor internals (QUERY_EXECUTOR._work_queue) for observability. _work_queue is a private attribute and may change across Python versions/implementations, causing runtime errors. Consider tracking queued/in-flight tasks via your own counters/metrics (increment on submit, decrement in callback) instead of touching private fields.
Suggested change (replacing the excerpt above):

```python
# Log pool utilization for observability based on active queries
with _active_queries_lock:
    in_flight = len(_active_queries)
if in_flight >= _max_workers:
    logger.warning(
        "Thread pool saturated: %d in-flight vs %d workers",
        in_flight, _max_workers,
```
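If queue-depth visibility is still wanted without reading executor internals, one option is to count submissions with a done-callback (a sketch; `submit_tracked` and `_pending_count` are illustrative names, not part of the PR):

```python
import threading
from concurrent.futures import Future, ThreadPoolExecutor

_pending_lock = threading.Lock()
_pending_count = 0  # tasks submitted but not yet finished (queued or running)


def submit_tracked(executor: ThreadPoolExecutor, fn, *args, **kwargs) -> Future:
    global _pending_count
    with _pending_lock:
        _pending_count += 1
    future = executor.submit(fn, *args, **kwargs)

    def _decrement(_finished: Future) -> None:
        global _pending_count
        with _pending_lock:
            _pending_count -= 1

    future.add_done_callback(_decrement)
    return future
```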
```python
def _clear_client_cache():
    """Clear the client cache, closing all cached clients.

    Used during shutdown and for testing.
    """
    with _client_cache_lock:
        for _, (client, _) in list(_client_cache.items()):
            try:
                client.close()
            except Exception:
                pass
        _client_cache.clear()


atexit.register(_clear_client_cache)
```
_clear_client_cache is registered with atexit after the executor shutdown hook. Since atexit runs callbacks in reverse registration order, the cache will be cleared (and clients closed) before the thread pool is shut down, potentially closing connections while worker threads are still running. Consider registering a single cleanup function or adjusting registration order so the executor shuts down before closing cached clients.
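One way to make the ordering explicit is a single combined hook that drains the executor before closing clients (a sketch; `QUERY_EXECUTOR` and `_clear_client_cache` are the module's existing names, `_shutdown` is illustrative):

```python
import atexit


def _shutdown() -> None:
    # Wait for in-flight work first so no worker is still using a client we close.
    QUERY_EXECUTOR.shutdown(wait=True)
    _clear_client_cache()


atexit.register(_shutdown)
```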
```python
from .mcp_server import (
    create_clickhouse_client,
    _clear_client_cache,
    _resolve_client_config,
    list_databases,
```
__init__.py exports _clear_client_cache and _resolve_client_config via __all__, effectively making private, underscore-prefixed helpers part of the public package API. This makes future refactors harder and encourages external callers to rely on internals. Prefer keeping these unexported (tests can import from mcp_clickhouse.mcp_server) or renaming/dropping the underscore if they’re intended to be public API.
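For example, the package could export only the intended-public names while tests reach the helpers through the submodule (a sketch; the package's full export list is not reproduced here):

```python
# mcp_clickhouse/__init__.py — export only the public surface
from .mcp_server import create_clickhouse_client, list_databases

__all__ = ["create_clickhouse_client", "list_databases"]

# tests/test_client_cache.py — import internals from the submodule instead
from mcp_clickhouse.mcp_server import _clear_client_cache, _resolve_client_config
```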
Summary
The goal of this PR is to harden the MCP server for heavy, sustained usage by reusing client connections, adding real server-side query cancellation, and preventing worker thread pool exhaustion from zombie queries.
- Reuse `clickhouse_connect` clients by config key instead of creating a new client on every tool call. This eliminates hundreds of milliseconds of overhead per call that was a significant contributor to intermittent timeouts under sustained use.
- Attach a `query_id` to every query and issue `KILL QUERY` on timeout instead of the no-op `future.cancel()` (see the sketch after this list). Timed-out queries no longer continue running as zombies consuming worker threads and ClickHouse server resources.
- Align `send_receive_timeout` to `query_timeout + 5` unless explicitly overridden, so worker threads unblock shortly after the MCP timeout fires, preventing thread pool exhaustion.
- On connection failure, `list_databases` and `list_tables` retry once with a fresh client; `run_query` evicts the cached client but does not retry because writes could duplicate.
- Ensure the client-config override `ContextVar` is available before dispatching to the worker thread, fixing a latent bug where overrides from PR #115 (Client config override support via MCP Context Session states) were silently missed inside the executor.
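A minimal sketch of the cancel-on-timeout flow described above (illustrative; `execute_query` and `kill_query` are placeholder helpers, not the PR's code):

```python
import uuid
from concurrent.futures import ThreadPoolExecutor
from concurrent.futures import TimeoutError as FutureTimeoutError


def run_with_cancellation(execute_query, kill_query, sql: str,
                          executor: ThreadPoolExecutor, query_timeout: float):
    # execute_query(sql, query_id) runs the query tagged with query_id;
    # kill_query(query_id) issues KILL QUERY for that id on the server.
    query_id = str(uuid.uuid4())
    future = executor.submit(execute_query, sql, query_id)
    try:
        return future.result(timeout=query_timeout)
    except FutureTimeoutError:
        # future.cancel() cannot interrupt a thread that is already running,
        # so ask ClickHouse to stop the query server-side instead.
        kill_query(query_id)
        raise
```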