Cleanly shut down CBS and queue watching thread #102
When running integration tests with the callback server (CBS) via Callback, I found that the tests would hang forever. I suspect this is because the queue watching process would never terminate. This PR reworks shutdown for the callback server and watch thread when set up via JobRunner.callback(). For now it doesn't alter how run() works, since that has the watch loop and CBS termination built in and seems to work as is. The new stop() and wait_for_stop() methods will be exercised in the next PR. I manually tested that starting the CBS in a container and curling its root works.
Codecov Report
❌ Patch coverage is
Additional details and impacted files
@@ Coverage Diff @@
## main #102 +/- ##
==========================================
- Coverage 79.23% 78.35% -0.89%
==========================================
Files 13 13
Lines 1074 1095 +21
==========================================
+ Hits 851 858 +7
- Misses 223 237 +14
It looks like the stuff I changed didn't have tests. I'll add them in the next PR.
Pull Request Overview
This PR implements a clean shutdown mechanism for the callback server (CBS) and queue watching thread when using JobRunner.callback(). Previously, integration tests would hang indefinitely because the queue watching process and callback server would never terminate properly.
Key changes include:
- Added shutdown event mechanism using multiprocessing.Event to coordinate termination between processes
- Converted synchronous blocking operations to threaded operations with proper cleanup
- Introduced stop() and wait_for_stop() methods for graceful shutdown
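The coordination described above can be sketched roughly as follows. This is a minimal illustration, not the PR's actual code: `SketchRunner`, `_fake_server`, and `_watch` are hypothetical stand-ins, the use of `join()` inside `wait_for_stop()` is an assumption about how blocking is done, and the "fork" start method is chosen only so the sketch runs as a single script on Linux or macOS.

```python
import multiprocessing
import threading
import time

# Assumption: the "fork" start method keeps this sketch runnable as one script
# on Linux/macOS. The real project may use a different start method.
_CTX = multiprocessing.get_context("fork")


def _fake_server(shutdown_event):
    """Hypothetical stand-in for the callback server process."""
    # Idle until the parent sets the shared event.
    while not shutdown_event.wait(0.05):
        pass


class SketchRunner:
    """Hypothetical, stripped-down stand-in for the job runner's shutdown logic."""

    def __init__(self):
        self._shutdown_event = _CTX.Event()  # shared with the server process
        self._stop = False                   # read by the watch thread
        self._server = _CTX.Process(
            target=_fake_server, args=(self._shutdown_event,)
        )
        self._watcher = threading.Thread(target=self._watch)

    def _watch(self):
        # Stand-in for the queue watching loop.
        while not self._stop:
            time.sleep(0.05)

    def start(self):
        self._server.start()   # fork before any thread is created
        self._watcher.start()

    def stop(self):
        """Signal shutdown, then block until everything has exited."""
        self._shutdown_event.set()
        self._stop = True
        self.wait_for_stop()

    def wait_for_stop(self):
        """Block until the server process and watch thread have finished."""
        self._server.join()
        self._watcher.join()
```

Calling `start()` followed by `stop()` should return only once both the child process and the thread are gone, which is the behavior the integration tests need in order not to hang.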
Reviewed Changes
Copilot reviewed 3 out of 3 changed files in this pull request and generated 2 comments.
| File | Description |
|---|---|
| JobRunner/callback_server.py | Added shutdown event parameter and async listener to cleanly stop the Sanic server |
| JobRunner/JobRunner.py | Added threading support, shutdown coordination with Event, and new stop/wait methods |
| JobRunner/Callback.py | Refactored to use direct JobRunner instance instead of subprocess, enabling proper cleanup |
        After calling this method this instance of the job runner is no longer usable.
        """
        self._shutdown_event.set()
        self._stop = True
        self.wait_for_stop()

    def wait_for_stop(self):
[nitpick] Calling wait_for_stop() within stop() creates a blocking operation that may not be desired. Consider separating the shutdown initiation (stop) from the waiting (wait_for_stop) to give callers more control over when to block.
Suggested change:
-        After calling this method this instance of the job runner is no longer usable.
-        """
-        self._shutdown_event.set()
-        self._stop = True
-        self.wait_for_stop()
-    def wait_for_stop(self):
+        This method initiates the shutdown process but does not block.
+        Call `wait_for_stop()` to block until the shutdown is complete.
+        """
+        self._shutdown_event.set()
+        self._stop = True
+    def wait_for_stop(self):
+        """
+        Block until the callback server and watch thread have finished.
+        """
> Calling wait_for_stop() within stop() creates a blocking operation that may not be desired.
The blocking operation is how it's supposed to work: the calling thread should block until the CBS process and watch thread have exited and no zombie processes are left.
    @app.after_server_start
    async def shutdown_listener(app, _):
        while not shutdown_event.is_set():
            await asyncio.sleep(0.1)
[nitpick] The 0.1 second polling interval in the shutdown listener creates unnecessary CPU usage. Consider using a larger interval (e.g., 1.0 second) or implementing an event-driven approach if more responsive shutdown is needed.
Suggested change:
-            await asyncio.sleep(0.1)
+            await asyncio.sleep(1.0)
100 milliseconds is an eternity for a modern processor
Besides, one of your AI compatriots suggested it. Argue with him / her / it
I can't wait for the point where code reviews are just AIs arguing with each other
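For reference, the polling pattern debated in this thread can be sketched outside of Sanic. This is only an illustration under stated assumptions: `threading.Event` stands in for the `multiprocessing.Event` used in the PR (both expose `is_set()` and `set()`), `on_shutdown` is a hypothetical callback standing in for whatever stops the server, and `demo` is invented for the example.

```python
import asyncio
import threading
import time


async def shutdown_listener(shutdown_event, on_shutdown, interval=0.1):
    """Poll the event without blocking the event loop, then run the callback."""
    while not shutdown_event.is_set():
        # Yield to the event loop between checks so other tasks keep running.
        await asyncio.sleep(interval)
    on_shutdown()


async def demo():
    # Assumption: threading.Event stands in for the PR's multiprocessing.Event.
    shutdown_event = threading.Event()
    stopped = []
    # Set the event from another thread after 0.3 s, as a stop() call would.
    threading.Timer(0.3, shutdown_event.set).start()
    started = time.monotonic()
    await shutdown_listener(shutdown_event, lambda: stopped.append(True))
    return stopped, time.monotonic() - started
```

With this shape, the worst-case delay between the event being set and the callback running is roughly one polling interval, so the interval trades shutdown latency against the number of wakeups per second.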
JobRunner/Callback.py
Outdated
        return job_params

-    def run(self):
+    def start(self):
Can we give this a more descriptive name? It is not clear what is being started.
I have forgotten what much of the code does, and from looking at these files it is not obvious what a "CallBack" is versus a "CallBackServer".
self.cbs is no longer used, is that right? If so, it can be removed.
Callback.py is a small helper to start the callback server from the command line (or a docker command, really). callback_server.py is the Sanic server code.
That said, no argument from me about naming