feat(mcap): stateful replay daemon with RPC control#150

Merged
freol35241 merged 5 commits into main from feat/mcap-replay-daemon on Jun 11, 2026

Conversation

@freol35241

Contributor

Summary

Replaces the one-shot mcap2keelson replayer (258 lines) with a long-lived stateful replay daemon (1328 lines) controllable over Zenoh RPC. PR 5 (final) of the 5.1.0V-trial split, and the largest.

What it does

  • McapReplayControl RPC service — 12 procedures: load_file, play, pause, stop, seek, set_speed, set_loop, step, set_segment, set_channel_filter, list_files, get_status. STOPPED / PLAYING / PAUSED / LOADING state machine; timing preserved on the walk; load_file runs on a worker thread and returns "accepted" immediately.
  • replay_status broadcast (keelson.ReplayStatus) at 5 Hz while playing / 1 Hz idle, carrying DaemonInfo (version/host/base-dir) so clients can discover replayers from the stream; async load progress + last error surface via the status.
  • New interfaces/McapReplayControl.proto, messages/payloads/ReplayStatus.proto, replay_status subject.
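A minimal sketch of how a state machine like this might gate procedures. The transition table is an assumption for illustration: only the four state names and the async load ending in PAUSED come from the description above, and the event names `load_done` / `load_failed` are hypothetical.

```python
from enum import Enum, auto


class State(Enum):
    STOPPED = auto()
    LOADING = auto()
    PAUSED = auto()
    PLAYING = auto()


# Hypothetical transition table: (current state, event) -> next state.
# Only LOADING -> PAUSED on a finished load is taken from the PR text;
# every other row is an assumption.
TRANSITIONS = {
    (State.STOPPED, "load_file"): State.LOADING,
    (State.PAUSED, "load_file"): State.LOADING,
    (State.LOADING, "load_done"): State.PAUSED,
    (State.LOADING, "load_failed"): State.STOPPED,
    (State.PAUSED, "play"): State.PLAYING,
    (State.PLAYING, "pause"): State.PAUSED,
    (State.PLAYING, "stop"): State.STOPPED,
    (State.PAUSED, "stop"): State.STOPPED,
}


def next_state(current: State, event: str) -> State:
    """Return the next state, or raise if the event is not allowed here."""
    try:
        return TRANSITIONS[(current, event)]
    except KeyError:
        raise ValueError(
            f"INVALID_STATE: {event!r} not allowed in {current.name}"
        ) from None
```

Keeping the allowed transitions in one table makes an invalid call (for example `pause` while STOPPED) fail in a single place instead of in per-procedure branches.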

Folded-in prerequisite

Includes the typed ErrorResponse.Code enum (was the closed #144) — the replay daemon is its only consumer, mapping every RPC failure to a code (NOT_FOUND, OUT_OF_RANGE, INVALID_STATE, PERMISSION_DENIED, …).
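As an illustration of mapping failures to typed codes, here is a small sketch. The four code names come from the text above; `RpcError`, `handle`, and the toy `seek` are hypothetical names, and the enum values are strings only to avoid guessing the real protobuf numbers.

```python
from enum import Enum


class Code(Enum):
    # Names taken from the PR text; the real enum has further members.
    NOT_FOUND = "NOT_FOUND"
    OUT_OF_RANGE = "OUT_OF_RANGE"
    INVALID_STATE = "INVALID_STATE"
    PERMISSION_DENIED = "PERMISSION_DENIED"


class RpcError(Exception):
    """Hypothetical exception carrying a typed code for the RPC reply."""

    def __init__(self, code: Code, message: str) -> None:
        super().__init__(message)
        self.code = code


def handle(procedure, *args):
    """Run a procedure; return ("ok", result) or ("error", code name, message)."""
    try:
        return ("ok", procedure(*args))
    except RpcError as err:
        return ("error", err.code.name, str(err))


def seek(position_ns: int, start_ns: int = 0, end_ns: int = 1_000) -> int:
    """Toy procedure: reject positions outside the recording's range."""
    if not start_ns <= position_ns <= end_ns:
        raise RpcError(
            Code.OUT_OF_RANGE, f"{position_ns} outside [{start_ns}, {end_ns}]"
        )
    return position_ns
```

With this shape, a client can branch on the code name rather than parse a free-form error string.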

Applied onto current main

Main hadn't touched mcap2keelson.py or the test files since the trial's base, so those are taken as-authored. The only divergence was mcap/README.md: I kept main's recording-example changes (the dual @target -k patterns) and swapped in the trial's rewritten "## MCAP Replay" section. No new dependencies (requirements.txt/lock unchanged).

Validation

  • uv run pytest -m "not e2e" connectors/mcap/ → 81 passed.
  • uv run pytest -m e2e connectors/mcap/ → 48 passed (incl. the 24-test 977-line replay RPC e2e: state transitions, seek/segment/speed/step/channel-filter, async load LOADING→PAUSED, path-traversal PERMISSION_DENIED, loopback guard).
  • mcap2keelson --help runs; ruff + black clean; Python + JS SDKs regenerate cleanly.
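The path-traversal check exercised by the e2e suite could look roughly like the sketch below. It is an assumption about the general technique, not the daemon's code: `resolve_inside` is a hypothetical helper, and it raises Python's built-in `PermissionError` where the daemon would reply with PERMISSION_DENIED.

```python
from pathlib import Path


def resolve_inside(base_dir: str, requested: str) -> Path:
    """Resolve `requested` under `base_dir`, refusing paths that escape it."""
    base = Path(base_dir).resolve()
    # Joining with an absolute `requested` discards `base`, so the check
    # below also catches absolute paths, not just "../" segments.
    candidate = (base / requested).resolve()
    # Path.is_relative_to requires Python 3.9 or newer.
    if not candidate.is_relative_to(base):
        raise PermissionError(
            f"PERMISSION_DENIED: {requested!r} escapes the base directory"
        )
    return candidate
```

Resolving both paths before comparing means symlinks and `..` segments are normalised first, so the comparison is made on the real locations.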

Review notes — recommended follow-ups (not fixed here; faithful extraction)

From the earlier review of this code, worth addressing before relying on it in production:

  1. No-summary-statistics MCAP files: start/end/count default to 0, so seek/segment/progress silently break (the old replayer fell back to a scan). Not covered by tests.
  2. Busy spin-wait in the timing loop (sleep(0.0)) can peg a core on sparse recordings now that it's a long-lived daemon.
  3. _load_file prior_state revert captures LOADING rather than the true pre-load state (self-corrects via the worker's outer handler, but the revert path is dead).
  4. Loop path (set_loop + EOF re-seek) is toggled but not exercised end-to-end.

Independent of PR 3 (#147, two-stage shutdown), but benefits from it — this is the first long-lived blocking daemon.

🤖 Generated with Claude Code

Replace the one-shot mcap2keelson replayer (258 lines) with a long-lived
stateful daemon (1328 lines) controllable over Zenoh RPC.

- New McapReplayControl RPC service (12 procedures: load_file, play,
  pause, stop, seek, set_speed, set_loop, step, set_segment,
  set_channel_filter, list_files, get_status) with a
  STOPPED/PLAYING/PAUSED/LOADING state machine.
- Broadcasts keelson.ReplayStatus on the `replay_status` subject (5 Hz
  while playing, 1 Hz idle) carrying DaemonInfo so clients can discover
  replayers from the stream alone; async load lifecycle/errors surface
  via load_progress_pct / last_load_error.
- New interfaces/McapReplayControl.proto, messages/payloads/
  ReplayStatus.proto, and the `replay_status` subject.

Folds in the typed ErrorResponse.Code enum (NOT_FOUND, OUT_OF_RANGE,
INVALID_STATE, PERMISSION_DENIED, ...) — the replay daemon is its sole
consumer, mapping every RPC error to a code.

Adds a 977-line e2e suite (24 tests) covering the RPC surface, state
transitions, seek/segment/speed/step/channel-filter, async load, the
path-traversal guard and the self-publish loopback guard; adapts the
battle/cli/e2e tests to the daemon shape.

The replay README section is rewritten; the keelson2mcap recording docs
(dual @target -k patterns) on main are preserved.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
This was referenced Jun 9, 2026

@freol35241 freol35241 left a comment

Contributor Author


Review — 🔴 Changes requested

@TedSjoblom — flagging you as the owner for these changes (you authored the replay daemon on the trial branch; I've assigned the PR to you). (Posted as a COMMENT-event review rather than a formal "Request changes" because GitHub doesn't allow requesting changes on a PR you opened — treat this as changes-requested.)

Strong engineering overall — the concurrency model (single STATE_LOCK, the PAUSE_EVENT / COMMAND_EVENT split, the iterator-break pattern) is disciplined, and the e2e coverage is broad. Two things gate a merge: one blocking defect, plus a design question that determines whether the full control surface is justified.

Design question (please answer before more investment)

play / pause / seek / set_speed / step / segment-loop is a media-scrubber control surface, which is only justified by an interactive consumer — a UI that scrubs through a recording. If the real use is "replay a file onto the bus for a test/demo," the previous one-shot replayer was the right scope and this is a lot of machinery for it. Is there a replay-control UI (or a concrete plan for one)? The answer decides whether we proceed as-is or trim the surface.

Blocking

  • Spin-wait pegs a CPU core while playing (inline at the timing loop). Must fix before merge.

Should-fix

  • No-summary-statistics files load half-broken, silently — seek/segment/progress disabled with no error (inline).
  • prior_state revert on load failure is dead/misleading (inline).

Nice-to-have

  • The loop path (set_loop + EOF re-seek) reads correctly but has no end-to-end test — please add one.

Details inline. Happy to pair on any of these.

Comment thread connectors/mcap/bin/mcap2keelson.py Outdated
Comment thread connectors/mcap/bin/mcap2keelson.py Outdated
Comment thread connectors/mcap/bin/mcap2keelson.py Outdated
…k, dead revert

Resolves the changes-requested review on the stateful replay daemon (PR #150):

- Blocking: replace the time.sleep(0.0) spin-wait in _walk_iterator with
  bounded min(remaining, 5ms) sleeps so the long-lived daemon idles between
  messages instead of pegging a CPU core (and starving the status/RPC threads
  of the GIL) through quiet stretches of a recording.
- Should-fix: recover start/end/count/channels by scanning files whose MCAP
  summary lacks statistics (new _scan_file, run off STATE_LOCK), so
  seek/set_segment/progress work — and the scrubber UI's timeline unlocks —
  instead of silently degrading to a zeroed [0,0] range. Extend the loopback
  guard to the scanned topics.
- Should-fix: drop the dead prior_state capture/revert in _load_file; the load
  worker's forced STOPPED is the single honest load-failure transition.
- Add e2e coverage: loop re-seek on EOF (climb-then-reset), and no-summary
  scan recovery (load + mid-file seek succeed).

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
@TedSjoblom

Contributor

Thanks for the thorough review — pushed 3102202 addressing the blocking item, both should-fixes, and the loop test.

Design question — yes, there's an interactive consumer

The crowsnest-dev operator UI ships a production React replay mini-app (FloatAppMcapReplay.jsx, merged 2026-05-24) that drives this surface over zenoh-ts: load_file, play, pause, stop, seek, set_speed, set_loop (+ get_status / list_files), rendering a scrubbable timeline and progress/counters from the replay_status fields (start_time / end_time / current_time / progress_pct / total|played_message_count). So the scrubber surface is justified — keeping it.

You're right that step, set_segment, and set_channel_filter aren't wired into the UI yet — they round out the same control surface for programmatic/CLI consumers and planned UI controls. Happy to split those three out if you'd prefer a leaner first cut.

This also reframes the no-summary item as a real UI bug rather than just degraded ergonomics: the scrubber locks its timeline when a file reports start == end == 0, which is what pushed me toward the scan-recovery option below (rejecting would also have regressed the old one-shot replayer's ability to play un-indexed files).
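To make the scan-recovery idea concrete, here is a library-free sketch of a single unordered pass that recovers the range, count, and topic set. `ScanResult` and `scan` are hypothetical names, and the input is assumed to be plain (topic, log_time_ns) pairs; in the daemon those would come from the MCAP reader rather than a list.

```python
from dataclasses import dataclass, field
from typing import Iterable, Optional, Set, Tuple


@dataclass
class ScanResult:
    start_ns: Optional[int] = None
    end_ns: Optional[int] = None
    count: int = 0
    topics: Set[str] = field(default_factory=set)


def scan(messages: Iterable[Tuple[str, int]]) -> ScanResult:
    """Single pass over (topic, log_time_ns) pairs, in any order."""
    result = ScanResult()
    for topic, log_time_ns in messages:
        # Track min and max explicitly because the input is not time-ordered.
        if result.start_ns is None or log_time_ns < result.start_ns:
            result.start_ns = log_time_ns
        if result.end_ns is None or log_time_ns > result.end_ns:
            result.end_ns = log_time_ns
        result.count += 1
        result.topics.add(topic)
    return result
```

Leaving `start_ns` and `end_ns` as `None` for an empty file keeps "no data" distinguishable from a genuine zero timestamp, which is the ambiguity the zeroed range created.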

Changes in 3102202

  • Spin-wait (blocking): _walk_iterator now sleeps in bounded min(remaining, 5 ms) slices instead of time.sleep(0.0) — the CPU idles through inter-message gaps while stop/seek/pause still react within ~5 ms. Your suggested bound, restructured so the sleep argument is always > 0 (no negative-sleep edge) and the existing shutdown/COMMAND_EVENT/pause early-exits are preserved.
  • No-summary scan (should-fix): new _scan_file() does a single iter_messages(log_time_order=False) pass — off STATE_LOCK, with load-progress reporting — to recover start/end/count/channels when summary.statistics is absent. Verified safe: the mcap SeekingReader re-seek(0)s on every iter_messages call, so the load-time scan doesn't disturb subsequent walks. Loopback guard extended to the scanned topics.
  • Dead prior_state revert (should-fix): removed; _load_file_worker's forced STOPPED (+ last_load_error) is now the single honest "load failed" transition.
  • Loop (nice-to-have): added test_loop_replays_from_start_on_eof (asserts the climb-then-reset signature of an EOF re-seek, never STOPPED), plus test_load_no_summary_statistics_recovers_by_scan for the scan path.
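The bounded-sleep pattern from the first bullet can be sketched as follows. This is an illustration under assumptions, not the code in _walk_iterator: `wait_until` is a hypothetical helper, and a single `threading.Event` stands in for the daemon's shutdown/command/pause checks.

```python
import threading
import time


def wait_until(
    deadline: float, interrupt: threading.Event, max_slice: float = 0.005
) -> bool:
    """Sleep until `deadline` (time.monotonic() seconds) in short slices.

    Returns True once the deadline has passed, or False if `interrupt`
    was seen set at one of the checks between slices.
    """
    while not interrupt.is_set():
        remaining = deadline - time.monotonic()
        if remaining <= 0:
            return True
        # remaining > 0 here, so the sleep argument is always positive.
        time.sleep(min(remaining, max_slice))
    return False
```

The slice length bounds the reaction time to a stop, seek, or pause at roughly `max_slice`, while the thread spends the gap between messages asleep instead of spinning.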

Validation: ruff + black clean; 131 mcap tests green (81 unit + 50 e2e). Replies inline on the three threads.

Follow-up on the crowsnest side (not this PR): surface load_progress_pct / last_load_error in the UI now that scan-based loads can take longer and can fail asynchronously.

_walk_iterator only reset its wall-clock timing anchor (first /
reference_wall_ns) at start, seek, and step — never on resume-from-pause or
a mid-playback speed change. The anchor keeps ticking through a pause, so on
resume the messages that came 'due' during the pause burst out at once (the
test shows 17 messages in ~50 us); a mid-file speed-up likewise bursts and a
slow-down stalls. Both paths are driven by the crowsnest replay UI's pause
button and live speed dropdown.

Re-anchor when resuming from a pause or when the speed changed since the last
message, mirroring the step/seek re-anchor already in the loop. Add
test_resume_after_pause_does_not_burst, which asserts post-resume messages
arrive spread over wall-clock time at the recording's ~50 ms cadence — it
fails without the fix (burst within milliseconds).
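A small sketch of the re-anchoring idea, under assumptions: `Pacer` and its method names are hypothetical, timestamps are plain integers in nanoseconds, and the caller is expected to call `reanchor()` on resume or on a speed change.

```python
class Pacer:
    """Maps recording timestamps to wall-clock due times at a given speed."""

    def __init__(self) -> None:
        self.ref_log_ns = None   # recording time at the anchor
        self.ref_wall_ns = None  # wall-clock time at the anchor

    def reanchor(self) -> None:
        """Forget the anchor; the next message becomes the new reference."""
        self.ref_log_ns = None
        self.ref_wall_ns = None

    def due_wall_ns(self, log_ns: int, now_wall_ns: int, speed: float) -> int:
        """Wall-clock time at which the message stamped `log_ns` is due."""
        if self.ref_log_ns is None:
            self.ref_log_ns, self.ref_wall_ns = log_ns, now_wall_ns
        return self.ref_wall_ns + int((log_ns - self.ref_log_ns) / speed)
```

Without the `reanchor()` call, a message due 100 ns after the original anchor is already overdue when playback resumes much later, so it and its successors are emitted at once; after re-anchoring, the first post-resume message is due "now" and later ones are spaced from there.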

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
@freol35241

Contributor Author

@TedSjoblom — pushed 5455884, which trims the control surface down to what's actually consumed. Context for the call and the changes:

Why trim

You confirmed (and the crowsnest FloatAppMcapReplay.jsx consumer backs it up) that replay-onto-the-bus is a real need, so the stateful daemon stays — this isn't a revert to the one-shot replayer. But the UI drives only load_file / play / pause / stop / seek / set_speed / set_loop (+ list_files). step, set_segment, and set_channel_filter had no consumer, and they were the source of most of the state-machine branching (the step_remaining timing special-case, segment remapping threaded through seek/loop/EOF, the per-emit filter lookup). Per your own offer to split them out, I've removed them rather than carry unexercised machinery. They can come back behind a concrete consumer.

set_channel_filter specifically I'd argue against re-adding as an RPC: filtering at the publisher drops the channel for every subscriber on the bus, not just the asker — consumers already filter by subscribing to the keys they want. If it's about bus load, that's a load-time/CLI concern, not a live global toggle.

What else fell out for free

Dropping get_status removed its return type, which means the duplicate RPC-local ReplayStatus + DaemonInfo (the inline copies that had to stay byte-compatible with the broadcast payload by hand) are gone, along with the duplicate _build_rpc_status builder. State is now read only from the replay_status broadcast — which already publishes continuously plus an immediate sample on every mutation, so there's no polling latency lost. Also removed the now-unused segment_*/filtered_channels fields from the keelson.ReplayStatus pub payload.

Preserved untouched

Your three review fixes — bounded-sleep timing, the no-summary scan fallback, and the resume/speed-change clock re-anchor — are all intact.

Tests

  • E2E suite now observes state via the broadcast (a _latest_status helper) instead of get_status; dropped the 7 step/segment/filter tests.
  • Added the data-plane coverage that was missing: the whole daemon exists to put recordings back on the bus intact, but only one prior test touched a replayed topic (and only checked arrival timing). New tests:
    • test_replay_delivers_payloads_intact_on_original_keys — multi-channel replay, asserts every payload arrives on its original key, in order, complete (also restores multi-channel publisher-declaration coverage).
    • test_replay_key_tag_publishes_on_suffixed_keys — asserts --replay-key-tag republishes on <topic>/replay and leaves the plain key silent.
  • Green: 22 replay e2e + 81 mcap unit + 99 SDK tests. SDKs + docs regenerated.
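Observing state from the broadcast instead of a request/response call might be structured like the sketch below. It is not the suite's _latest_status helper: `LatestStatus` is a hypothetical class, and the status is modelled as a plain dict rather than the decoded keelson.ReplayStatus payload.

```python
import threading


class LatestStatus:
    """Keeps the most recent broadcast sample and lets callers wait on it."""

    def __init__(self) -> None:
        self._cond = threading.Condition()
        self._latest = None

    def on_sample(self, status: dict) -> None:
        # Intended to be called from the subscriber callback.
        with self._cond:
            self._latest = status
            self._cond.notify_all()

    def wait_for(self, predicate, timeout: float = 2.0):
        """Return the latest sample once `predicate` holds, or None on timeout."""
        with self._cond:
            ok = self._cond.wait_for(
                lambda: self._latest is not None and predicate(self._latest),
                timeout,
            )
            return self._latest if ok else None
```

A test would then issue an RPC such as play and call `wait_for(lambda s: s["state"] == "PLAYING")`, relying on the immediate sample the daemon publishes on every mutation.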

Net

−447 lines of source. load_file left async as-is (didn't want to change semantics the UI relies on without a separate discussion). Filed #156 for the shared RPC-server dispatcher, since this is now the second connector (with mavlink) hand-rolling the same queryable dispatch + [RPC] logging + ErrorResponse scaffolding.

freol35241 and others added 2 commits June 10, 2026 15:13
Drop step, set_segment, set_channel_filter, and get_status from the
replay control interface, keeping the 8 procedures the crowsnest replay
UI actually drives (list_files, load_file, play, pause, stop, seek,
set_speed, set_loop). Removing get_status also removes its return type,
so the duplicate RPC-local ReplayStatus/DaemonInfo — previously
hand-synced with the broadcast payload — are gone; state is now read
solely from the replay_status broadcast.

Rename the interface McapReplayControl -> ReplayControl (and
McapReplaySuccessResponse -> ReplaySuccessResponse, file ->
interfaces/ReplayControl.proto). The control contract is
recording-format-agnostic — nothing in it is MCAP-specific — so it
follows the same naming rule as the other interfaces (VehicleControl,
Configurable): named for the domain concept, not the connector that
implements it. Format identity stays on the connector (mcap-replay).
The service name is not part of the Zenoh RPC key, so this is wire-
compatible; only generated type names change. Matches the already
format-neutral keelson.ReplayStatus broadcast payload.

Also remove the now-unused segment/filter fields from the
keelson.ReplayStatus pub payload, the corresponding daemon state and
walk-loop branches, and the dead request summarizers. The bounded-sleep,
no-summary scan fallback, and resume/speed clock re-anchor fixes are
preserved untouched.

Tests: observe state via the replay_status broadcast instead of
get_status; drop the step/segment/filter tests; add two data-plane e2e
tests that were missing — payload fidelity + per-channel completeness on
the original keys (multi-channel), and the --replay-key-tag /replay
suffix on the wire.

22 replay e2e + 81 mcap unit + 102 SDK tests green.
Follow-up filed as #156 (shared RPC-server dispatcher in scaffolding).

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
# Conflicts:
#	messages/subjects.yaml
@freol35241 freol35241 force-pushed the feat/mcap-replay-daemon branch from 5da8d52 to f52ddb4 Compare June 10, 2026 15:14
@freol35241

Contributor Author

Heads-up — renamed the interface and force-pushed the branch (re-fetch / reset your local before pulling).

McapReplayControl → ReplayControl (and McapReplaySuccessResponse → ReplaySuccessResponse, file → interfaces/ReplayControl.proto). Reasoning: the control contract has nothing MCAP-specific in it (play/pause/seek/set_speed/load_file/list_files are all format-agnostic), so it should follow the same naming rule as the other interfaces (VehicleControl, Configurable): named for the domain concept, not the connector that implements it. Format identity stays on the connector (mcap-replay), and a future klog replayer could serve the same contract. It also lines up with the already-neutral keelson.ReplayStatus broadcast payload.

Wire-compatible — the service name isn't part of the Zenoh RPC key (…/@rpc/{procedure}/{responder_id}), so crowsnest is unaffected; only the generated type names consumers import change. Folded into the existing trim commit per review. Green: 86 SDK + 81 mcap unit + 22 replay e2e; SDKs/docs regenerated; merged with latest main.

@freol35241 freol35241 merged commit a3292d5 into main Jun 11, 2026
8 checks passed
@freol35241 freol35241 deleted the feat/mcap-replay-daemon branch June 11, 2026 04:50
2 participants