Skip to content

Add consumer pinning#833

Draft
caspervonb wants to merge 2 commits intomainfrom
add-consumer-pinning
Draft

Add consumer pinning#833
caspervonb wants to merge 2 commits intomainfrom
add-consumer-pinning

Conversation

@caspervonb
Copy link
Copy Markdown
Collaborator

No description provided.

@caspervonb
Copy link
Copy Markdown
Collaborator Author

Please review @claude

@synadia-claude-reviewer
Copy link
Copy Markdown

synadia-claude-reviewer bot commented Mar 25, 2026

Claude finished @caspervonb's task in 3m 35s —— View job


PR Review: Add consumer pinning

  • Gather context and read changed files
  • Review API types and schema changes
  • Review consumer pull implementation
  • Review stream operations
  • Review error handling
  • Review test coverage
  • Post final review

Overall, this is a solid implementation of consumer pinning (ADR-42) with good test coverage. The core flow — acquiring a Nats-Pin-Id from message headers and resending it in subsequent pull requests — is correctly implemented. A few issues are worth addressing before merging.


Issues

Bug: PullConsumer.get_info() doesn't refresh from server

pull.py:544-546:

async def get_info(self) -> ConsumerInfo:
    # Refresh info from server
    return self._info

The comment says "Refresh info from server" but it returns the cached _info. This breaks the Consumer protocol contract and would silently return stale data (e.g. stale priority_groups state). It should call self._stream.get_consumer_info(self.name).

Fix this →

Bug: ConsumerUnpinResponse.error has wrong type

types.py:1585:

class ConsumerUnpinResponse(TypedDict):
    error: NotRequired[ErrorResponse]  # wrong

ErrorResponse is {"error": Error} (a wrapper), so this would model a doubly-nested {"error": {"error": {...}}}. Looking at the schema in definitions.json#error_response and how other responses use error fields (e.g. StreamSourceInfo.error: NotRequired[Error]), this should be NotRequired[Error].

Fix this →

Lint/format issues in types.py

Two formatting issues that will likely fail ruff format --check:

  1. Missing blank line before class AllDeliverPolicy (types.py:277): PriorityPolicy = Literal[...] is followed immediately by class AllDeliverPolicy with only one blank line instead of two.

  2. Missing blank line before class ConsumerDeleteResponse (types.py:792): ConsumerInfoRequest ends at line 790 and ConsumerDeleteResponse starts at line 792 with only one blank line.

Fix this →


Minor issues

ConsumerInfoRequest is an empty, unused TypedDict (types.py:787-790)

class ConsumerInfoRequest(TypedDict):
    """A request for the JetStream $JS.API.CONSUMER.INFO API"""
    pass

consumer_info() in api/client.py sends no request body. This class is never imported or referenced. It adds confusion about whether a request body is needed.

JetStream convenience layer is missing unpin_consumer

The JetStream class has top-level convenience wrappers for delete_consumer, update_consumer, get_consumer, etc., but not for unpin_consumer. For consistency, callers shouldn't need to do (await js.get_stream(...)).unpin_consumer(...).

priority_policy field loosely typed in ConsumerConfig dataclass (consumer/__init__.py:120)

priority_policy: str | None = None

PriorityPolicy = Literal['none', 'overflow', 'pinned_client', 'prioritized'] exists in api/types.py. The dataclass could import and use it for better static type checking.

Polling loop in _request_loop (pull.py:449-468)

The 100ms polling sleep in _request_loop predates this PR, but becomes more visible now that priority group / pin ID state makes the loop more critical. A condition variable or asyncio event would avoid unnecessary wakeups, though this is a minor concern.


What's good

  • The Nats-Pin-Id extraction from headers and inclusion in subsequent requests is correctly implemented in PullMessageStream._send_request() and PullMessageBatch.__anext__().
  • Status 423 (PinIDMismatchError) is handled correctly: the PullMessageBatch surfaces it as an error, and PullMessageStream clears _pin_id and retries.
  • The ConsumerConfig / ConsumerInfo dataclasses properly handle the new priority_policy, priority_groups, priority_timeout fields and the ns→timedelta conversion.
  • stream.unpin_consumer() correctly delegates to api.consumer_unpin() and maps ConsumerNotFoundError.
  • PinIDMismatchError is exported from the package __init__.py.
  • Tests cover the main scenarios: pinned fetch, TTL expiry/repin, messages() stream pinning, unpin API, not-found error, config round-trip, and priority group info state.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant