You signed in with another tab or window. Reload to refresh your session.You signed out in another tab or window. Reload to refresh your session.You switched accounts on another tab or window. Reload to refresh your session.Dismiss alert
Overall, this is a solid implementation of consumer pinning (ADR-42) with good test coverage. The core flow — acquiring a Nats-Pin-Id from message headers and resending it in subsequent pull requests — is correctly implemented. A few issues are worth addressing before merging.
Issues
Bug: PullConsumer.get_info() doesn't refresh from server
pull.py:544-546:
asyncdefget_info(self) ->ConsumerInfo:
# Refresh info from serverreturnself._info
The comment says "Refresh info from server" but it returns the cached _info. This breaks the Consumer protocol contract and would silently return stale data (e.g. stale priority_groups state). It should call self._stream.get_consumer_info(self.name).
ErrorResponse is {"error": Error} (a wrapper), so this would model a doubly-nested {"error": {"error": {...}}}. Looking at the schema in definitions.json#error_response and how other responses use error fields (e.g. StreamSourceInfo.error: NotRequired[Error]), this should be NotRequired[Error].
Two formatting issues that will likely fail ruff format --check:
Missing blank line before class AllDeliverPolicy (types.py:277): PriorityPolicy = Literal[...] is followed immediately by class AllDeliverPolicy with only one blank line instead of two.
Missing blank line before class ConsumerDeleteResponse (types.py:792): ConsumerInfoRequest ends at line 790 and ConsumerDeleteResponse starts at line 792 with only one blank line.
ConsumerInfoRequest is an empty, unused TypedDict (types.py:787-790)
classConsumerInfoRequest(TypedDict):
"""A request for the JetStream $JS.API.CONSUMER.INFO API"""pass
consumer_info() in api/client.py sends no request body. This class is never imported or referenced. It adds confusion about whether a request body is needed.
JetStream convenience layer is missing unpin_consumer
The JetStream class has top-level convenience wrappers for delete_consumer, update_consumer, get_consumer, etc., but not for unpin_consumer. For consistency, callers shouldn't need to do (await js.get_stream(...)).unpin_consumer(...).
priority_policy field loosely typed in ConsumerConfig dataclass (consumer/__init__.py:120)
priority_policy: str|None=None
PriorityPolicy = Literal['none', 'overflow', 'pinned_client', 'prioritized'] exists in api/types.py. The dataclass could import and use it for better static type checking.
Polling loop in _request_loop (pull.py:449-468)
The 100ms polling sleep in _request_loop predates this PR, but becomes more visible now that priority group / pin ID state makes the loop more critical. A condition variable or asyncio event would avoid unnecessary wakeups, though this is a minor concern.
What's good
The Nats-Pin-Id extraction from headers and inclusion in subsequent requests is correctly implemented in PullMessageStream._send_request() and PullMessageBatch.__anext__().
Status 423 (PinIDMismatchError) is handled correctly: the PullMessageBatch surfaces it as an error, and PullMessageStream clears _pin_id and retries.
The ConsumerConfig / ConsumerInfo dataclasses properly handle the new priority_policy, priority_groups, priority_timeout fields and the ns→timedelta conversion.
stream.unpin_consumer() correctly delegates to api.consumer_unpin() and maps ConsumerNotFoundError.
PinIDMismatchError is exported from the package __init__.py.
Tests cover the main scenarios: pinned fetch, TTL expiry/repin, messages() stream pinning, unpin API, not-found error, config round-trip, and priority group info state.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Add this suggestion to a batch that can be applied as a single commit.This suggestion is invalid because no changes were made to the code.Suggestions cannot be applied while the pull request is closed.Suggestions cannot be applied while viewing a subset of changes.Only one suggestion per line can be applied in a batch.Add this suggestion to a batch that can be applied as a single commit.Applying suggestions on deleted lines is not supported.You must change the existing code in this line in order to create a valid suggestion.Outdated suggestions cannot be applied.This suggestion has been applied or marked resolved.Suggestions cannot be applied from pending reviews.Suggestions cannot be applied on multi-line comments.Suggestions cannot be applied while the pull request is queued to merge.Suggestion cannot be applied right now. Please check back later.
No description provided.