Skip to content

Cleanly shutdown CBS and queue watching thread#102

Merged
MrCreosote merged 2 commits intomainfrom
dev_kb_sdk_plus
Jul 29, 2025
Merged

Cleanly shutdown CBS and queue watching thread#102
MrCreosote merged 2 commits intomainfrom
dev_kb_sdk_plus

Conversation

@MrCreosote
Copy link
Member

When running integration tests with the callback server via Callback, I found that the tests would hang forever, I suspect because the queue watching process would never terminate. This PR reworks shutdown for the callback server and watch thread when setup via JobRunner.callback(). For now it doesn't alter how run() works, since that has watch loop and CBS termination built in and seems to work as is.

The new stop() and wait_for_stop() methods will be exercised in the next PR.

I manually tested that starting and curling the root of the CBS in a container works.

When running integration tests with the callback server via Callback, I
found that the tests would hang forever, I suspect because the queue
watching process would never terminate. This PR reworks shutdown for the
callback server and watch thread when setup via JobRunner.callback().
For now it doesn't alter how run() works, since that has watch loop and
CBS termination built in and seems to work as is.

The new stop() and wait_for_stop() methods will be exercised in the next
PR.

I manually tested that starting and curling the root of the CBS in a
container works.
@MrCreosote MrCreosote requested a review from bio-boris July 28, 2025 16:59
@codecov
Copy link

codecov bot commented Jul 28, 2025

Codecov Report

❌ Patch coverage is 38.63636% with 27 lines in your changes missing coverage. Please review.
✅ Project coverage is 78.35%. Comparing base (d744990) to head (36e0ce6).
⚠️ Report is 8 commits behind head on main.

Files with missing lines Patch % Lines
JobRunner/JobRunner.py 42.10% 11 Missing ⚠️
JobRunner/Callback.py 37.50% 10 Missing ⚠️
JobRunner/callback_server.py 33.33% 6 Missing ⚠️
Additional details and impacted files
@@            Coverage Diff             @@
##             main     #102      +/-   ##
==========================================
- Coverage   79.23%   78.35%   -0.89%     
==========================================
  Files          13       13              
  Lines        1074     1095      +21     
==========================================
+ Hits          851      858       +7     
- Misses        223      237      +14     

☔ View full report in Codecov by Sentry.
📢 Have feedback on the report? Share it here.

🚀 New features to boost your workflow:
  • ❄️ Test Analytics: Detect flaky tests, report on failures, and find test suite problems.

@MrCreosote
Copy link
Member Author

It looks like the stuff I changed didn't have tests. I'll add them in the next PR

@bio-boris bio-boris requested a review from Copilot July 29, 2025 00:55
Copy link
Contributor

Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull Request Overview

This PR implements a clean shutdown mechanism for the callback server (CBS) and queue watching thread when using JobRunner.callback(). Previously, integration tests would hang indefinitely because the queue watching process and callback server would never terminate properly.

Key changes include:

  • Added shutdown event mechanism using multiprocessing.Event to coordinate termination between processes
  • Converted synchronous blocking operations to threaded operations with proper cleanup
  • Introduced stop() and wait_for_stop() methods for graceful shutdown

Reviewed Changes

Copilot reviewed 3 out of 3 changed files in this pull request and generated 2 comments.

File Description
JobRunner/callback_server.py Added shutdown event parameter and async listener to cleanly stop the Sanic server
JobRunner/JobRunner.py Added threading support, shutdown coordination with Event, and new stop/wait methods
JobRunner/Callback.py Refactored to use direct JobRunner instance instead of subprocess, enabling proper cleanup

Comment on lines +472 to +478
After calling this method this instance of the job runner is no longer usable.
"""
self._shutdown_event.set()
self._stop = True
self.wait_for_stop()

def wait_for_stop(self):
Copy link

Copilot AI Jul 29, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[nitpick] Calling wait_for_stop() within stop() creates a blocking operation that may not be desired. Consider separating the shutdown initiation (stop) from the waiting (wait_for_stop) to give callers more control over when to block.

Suggested change
After calling this method this instance of the job runner is no longer usable.
"""
self._shutdown_event.set()
self._stop = True
self.wait_for_stop()
def wait_for_stop(self):
This method initiates the shutdown process but does not block.
Call `wait_for_stop()` to block until the shutdown is complete.
"""
self._shutdown_event.set()
self._stop = True
def wait_for_stop(self):
"""
Block until the callback server and watch thread have finished.
"""

Copilot uses AI. Check for mistakes.
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Calling wait_for_stop() within stop() creates a blocking operation that may not be desired.

The blocking operation is how it's supposed to work - the calling thread should block until the CBS process and thread have exited and no zombie processes are left

@app.after_server_start
async def shutdown_listener(app, _):
while not shutdown_event.is_set():
await asyncio.sleep(0.1)
Copy link

Copilot AI Jul 29, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[nitpick] The 0.1 second polling interval in the shutdown listener creates unnecessary CPU usage. Consider using a larger interval (e.g., 1.0 second) or implementing an event-driven approach if more responsive shutdown is needed.

Suggested change
await asyncio.sleep(0.1)
await asyncio.sleep(1.0)

Copilot uses AI. Check for mistakes.
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

100 milliseconds is an eternity for a modern processor

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Besides, one of your AI compatriots suggested it. Argue with him / her / it

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I can't wait for the point where code reviews are just AIs arguing with each other

return job_params

def run(self):
def start(self):
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we make this a more descriptive name ? It is not clear what is being started.

I have forgotten what much of the code does, and from looking at these files it is not obvious what even is a "CallBack" vs a "CallBackServer"

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

self.cbs is no longer used, is that right? if so, it can be removed.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed both issues

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Callback.py is a small helper to start the callback server from the command line (or a docker command, really). callback_server.py is the Sanic server code.

That said, no argument from me about naming

@MrCreosote MrCreosote merged commit f037f7f into main Jul 29, 2025
10 of 12 checks passed
@bio-boris bio-boris mentioned this pull request Aug 15, 2025
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants