Conversation
Add CouchbaseStore implementation using the acouchbase async SDK for async key-value operations with Couchbase Server. Features: - Full AsyncKeyValue protocol implementation - TTL support via Couchbase document expiration - Collection sanitization strategy for safe collection names - Connection string or pre-configured cluster support - Proper resource management with async context manager Install with: pip install py-key-value-aio[couchbase] Closes #292 Co-authored-by: Bill Easton <strawgate@users.noreply.github.com>
📝 WalkthroughWalkthroughThis PR introduces comprehensive Couchbase support to the async key-value store library. It adds a new async Couchbase-backed store implementation with document serialization, collection name sanitization, full lifecycle management, CRUD operations, and Docker-based integration tests. Changes
Sequence DiagramsequenceDiagram
actor Client
participant CouchbaseStore
participant AsyncCluster
participant Couchbase as Couchbase Server
Client->>CouchbaseStore: __init__(connection_string, credentials)
CouchbaseStore->>CouchbaseStore: Initialize configuration
Client->>CouchbaseStore: _setup()
CouchbaseStore->>AsyncCluster: Connect via connection_string
AsyncCluster->>Couchbase: Establish connection
Couchbase-->>AsyncCluster: Connection established
AsyncCluster-->>CouchbaseStore: Cluster ready
CouchbaseStore->>CouchbaseStore: Get bucket reference
Client->>CouchbaseStore: _setup_collection(collection="users")
CouchbaseStore->>CouchbaseStore: Sanitize collection name
CouchbaseStore->>CouchbaseStore: Store collection reference
Client->>CouchbaseStore: _put_managed_entry(key, entry, TTL)
CouchbaseStore->>CouchbaseStore: Serialize entry to JSON dict
CouchbaseStore->>AsyncCluster: Upsert document with TTL
AsyncCluster->>Couchbase: Store document
Couchbase-->>AsyncCluster: Document stored
AsyncCluster-->>CouchbaseStore: Upsert complete
Client->>CouchbaseStore: _get_managed_entry(key)
CouchbaseStore->>AsyncCluster: Fetch document
AsyncCluster->>Couchbase: Get document
Couchbase-->>AsyncCluster: Return document
AsyncCluster-->>CouchbaseStore: Document retrieved
CouchbaseStore->>CouchbaseStore: Deserialize JSON dict to entry
CouchbaseStore-->>Client: Return ManagedEntry
Client->>CouchbaseStore: _delete_managed_entry(key)
CouchbaseStore->>AsyncCluster: Delete document
AsyncCluster->>Couchbase: Remove document
Couchbase-->>AsyncCluster: Deletion confirmed
AsyncCluster-->>CouchbaseStore: Delete complete
Estimated Code Review Effort🎯 4 (Complex) | ⏱️ ~45 minutes Poem
🚥 Pre-merge checks | ✅ 4 | ❌ 1❌ Failed checks (1 warning)
✅ Passed checks (4 passed)
✏️ Tip: You can configure your own custom pre-merge checks in the settings. ✨ Finishing touches
🧪 Generate unit tests (beta)
Comment |
|
@coderabbitai review |
✅ Actions performedReview triggered.
|
There was a problem hiding this comment.
Actionable comments posted: 5
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (1)
pyproject.toml (1)
54-66:⚠️ Potential issue | 🟡 MinorRelax Couchbase SDK minimum version to 4.0.0.
The store uses only APIs available in Couchbase Python SDK 4.0.0+:
acouchbase.cluster.AsyncCluster,couchbase.options.GetOptions,couchbase.options.UpsertOptions, andcouchbase.auth.PasswordAuthenticatorwere all introduced in the 4.0 release line. The currentcouchbase>=4.3.0constraint is unnecessarily restrictive and should be relaxed tocouchbase>=4.0.0to allow more users to use compatible SDK versions.
🤖 Fix all issues with AI agents
In `@src/key_value/aio/stores/couchbase/store.py`:
- Around line 51-81: The prepare_load method currently uses data.pop("value")
which can raise KeyError; wrap the access in a guarded check or try/except and,
if missing, raise a DeserializationError with a clear message instead of letting
KeyError propagate so callers like _get_managed_entry receive a
DeserializationError; specifically, replace the direct pop in prepare_load with
a safe retrieval (e.g., check "value" in data or try: ... except KeyError) and
raise DeserializationError(message="Value field not found in Couchbase
document") when absent while preserving the existing logic that extracts
value["object"] when present.
- Around line 381-392: _delete_collection currently only removes the in-memory
reference (_collections_by_name.pop) and returns True without deleting
documents; either implement a proper deletion (issue a Couchbase N1QL DELETE or
KV delete-by-prefix using the cluster/collection client to remove docs whose
keys start with the collection prefix) inside _delete_collection and only return
True on success (handle and log errors and return False/raise on failure), or
explicitly mark the operation unsupported by raising NotImplementedError (or
returning False) instead of returning True; update references to
_collections_by_name accordingly so re-initializing a collection cannot leave
stale data.
- Around line 157-210: The __init__ currently allows both cluster and
connection_string to be passed and silently ignores connection_string; add an
explicit guard at the start of Couchbase store __init__ to detect and raise a
ValueError when both cluster and connection_string are provided (message like
"Provide either cluster or connection_string, not both"), leaving the existing
checks for neither provided and for username/password unchanged; ensure you
still set cluster_provided = True when cluster is passed so
client_provided_by_user remains correct.
- Around line 212-222: The _setup method currently registers self._cluster.close
via self._exit_stack.callback which won’t await AsyncCluster.close and can leak
connections; change the cleanup registration in _setup to use
self._exit_stack.push_async_callback(self._cluster.close) (or detect coroutine
with inspect.iscoroutinefunction and wrap accordingly) so that when not
self._client_provided_by_user the async close is awaited properly; keep the
existing bucket = self._cluster.bucket(...) and await bucket.on_connect()
behavior unchanged.
In `@tests/stores/couchbase/test_couchbase.py`:
- Around line 125-139: The test function signatures for
test_long_collection_name and test_special_characters_in_collection_name exceed
140 characters; reformat each async def signature to multiple lines by placing
parameters on separate lines (e.g., one parameter per line) while preserving the
type annotations (CouchbaseStore) and the inline comment "# pyright:
ignore[reportIncompatibleMethodOverride]" and keeping the decorator "@override"
attached to the second test; ensure indentation and trailing colon remain
correct so the function bodies are unchanged.
| @override | ||
| def prepare_load(self, data: dict[str, Any]) -> dict[str, Any]: | ||
| value = data.pop("value") | ||
|
|
||
| if "object" in value: | ||
| data["value"] = value["object"] | ||
| else: | ||
| msg = "Value field not found in Couchbase document" | ||
| raise DeserializationError(message=msg) | ||
|
|
||
| if date_created := data.get("created_at"): | ||
| if isinstance(date_created, str): | ||
| # Parse ISO format datetime string | ||
| data["created_at"] = datetime.fromisoformat(date_created.replace("Z", "+00:00")) | ||
| elif not isinstance(date_created, datetime): | ||
| msg = "Expected `created_at` field to be a datetime or ISO string" | ||
| raise DeserializationError(message=msg) | ||
| else: | ||
| data["created_at"] = date_created.replace(tzinfo=timezone.utc) | ||
|
|
||
| if date_expires := data.get("expires_at"): | ||
| if isinstance(date_expires, str): | ||
| # Parse ISO format datetime string | ||
| data["expires_at"] = datetime.fromisoformat(date_expires.replace("Z", "+00:00")) | ||
| elif not isinstance(date_expires, datetime): | ||
| msg = "Expected `expires_at` field to be a datetime or ISO string" | ||
| raise DeserializationError(message=msg) | ||
| else: | ||
| data["expires_at"] = date_expires.replace(tzinfo=timezone.utc) | ||
|
|
||
| return data |
There was a problem hiding this comment.
Handle missing value key to avoid KeyError leaks.
data.pop("value") can raise KeyError, which won’t be caught as a DeserializationError in _get_managed_entry. Prefer a guarded pop and raise a DeserializationError instead.
🛠️ Proposed fix
- value = data.pop("value")
+ value = data.pop("value", None)
+ if value is None:
+ msg = "Value field not found in Couchbase document"
+ raise DeserializationError(message=msg)🤖 Prompt for AI Agents
In `@src/key_value/aio/stores/couchbase/store.py` around lines 51 - 81, The
prepare_load method currently uses data.pop("value") which can raise KeyError;
wrap the access in a guarded check or try/except and, if missing, raise a
DeserializationError with a clear message instead of letting KeyError propagate
so callers like _get_managed_entry receive a DeserializationError; specifically,
replace the direct pop in prepare_load with a safe retrieval (e.g., check
"value" in data or try: ... except KeyError) and raise
DeserializationError(message="Value field not found in Couchbase document") when
absent while preserving the existing logic that extracts value["object"] when
present.
| def __init__( | ||
| self, | ||
| *, | ||
| cluster: AsyncCluster | None = None, | ||
| connection_string: str | None = None, | ||
| username: str | None = None, | ||
| password: str | None = None, | ||
| bucket_name: str | None = None, | ||
| scope_name: str | None = None, | ||
| default_collection: str | None = None, | ||
| collection_sanitization_strategy: SanitizationStrategy | None = None, | ||
| ) -> None: | ||
| """Initialize the Couchbase store. | ||
|
|
||
| Values are stored as native JSON documents for better query support and performance. | ||
|
|
||
| Args: | ||
| cluster: The async Couchbase cluster to use (mutually exclusive with connection_string). | ||
| If provided, the store will not manage the cluster's lifecycle. | ||
| The caller is responsible for managing the cluster's lifecycle. | ||
| connection_string: The Couchbase connection string (mutually exclusive with cluster). | ||
| username: The username for authentication (required if connection_string is provided). | ||
| password: The password for authentication (required if connection_string is provided). | ||
| bucket_name: The name of the Couchbase bucket. | ||
| scope_name: The name of the Couchbase scope. | ||
| default_collection: The default collection to use if no collection is provided. | ||
| collection_sanitization_strategy: The sanitization strategy to use for collections. | ||
| """ | ||
|
|
||
| cluster_provided = cluster is not None | ||
|
|
||
| if cluster: | ||
| self._cluster = cluster | ||
| else: | ||
| if not connection_string: | ||
| msg = "Either cluster or connection_string must be provided" | ||
| raise ValueError(msg) | ||
| if not username or not password: | ||
| msg = "username and password are required when using connection_string" | ||
| raise ValueError(msg) | ||
|
|
||
| auth = PasswordAuthenticator(username, password) | ||
| self._cluster = AsyncCluster(connection_string, ClusterOptions(auth)) | ||
|
|
||
| self._bucket_name = bucket_name or DEFAULT_BUCKET | ||
| self._scope_name = scope_name or DEFAULT_SCOPE | ||
| self._collections_by_name = {} | ||
| self._adapter = CouchbaseSerializationAdapter() | ||
|
|
||
| super().__init__( | ||
| default_collection=default_collection, | ||
| collection_sanitization_strategy=collection_sanitization_strategy, | ||
| client_provided_by_user=cluster_provided, | ||
| ) |
There was a problem hiding this comment.
Enforce mutual exclusivity of cluster vs connection_string.
Docs say they’re mutually exclusive, but both can currently be provided (and connection_string would be ignored). Add a guard to prevent ambiguous configuration.
🛠️ Proposed fix
cluster_provided = cluster is not None
+
+ if cluster and connection_string:
+ msg = "Provide either cluster or connection_string, not both"
+ raise ValueError(msg)🤖 Prompt for AI Agents
In `@src/key_value/aio/stores/couchbase/store.py` around lines 157 - 210, The
__init__ currently allows both cluster and connection_string to be passed and
silently ignores connection_string; add an explicit guard at the start of
Couchbase store __init__ to detect and raise a ValueError when both cluster and
connection_string are provided (message like "Provide either cluster or
connection_string, not both"), leaving the existing checks for neither provided
and for username/password unchanged; ensure you still set cluster_provided =
True when cluster is passed so client_provided_by_user remains correct.
| @override | ||
| async def _setup(self) -> None: | ||
| """Initialize the cluster connection and register cleanup.""" | ||
| # Wait for cluster connection | ||
| bucket = self._cluster.bucket(self._bucket_name) # pyright: ignore[reportUnknownMemberType] | ||
| await bucket.on_connect() # pyright: ignore[reportUnknownMemberType] | ||
|
|
||
| # Register cleanup if we own the cluster | ||
| if not self._client_provided_by_user: | ||
| self._exit_stack.callback(self._cluster.close) | ||
|
|
There was a problem hiding this comment.
🧩 Analysis chain
🏁 Script executed:
# First, let's look at the file structure and understand the context
cd src/key_value/aio/stores/couchbase && head -250 store.py | tail -100Repository: strawgate/py-key-value
Length of output: 4699
🏁 Script executed:
# Check imports and AsyncCluster usage
head -50 src/key_value/aio/stores/couchbase/store.pyRepository: strawgate/py-key-value
Length of output: 1785
🏁 Script executed:
# Search for _cluster and close usage patterns
rg "AsyncCluster|_cluster.close" src/key_value/aio/stores/couchbase/Repository: strawgate/py-key-value
Length of output: 632
🏁 Script executed:
# Check if there are any type hints or stubs for couchbase
fd -e pyi . | rg -i couchbaseRepository: strawgate/py-key-value
Length of output: 48
🌐 Web query:
couchbase-sdk python AsyncCluster.close() async method signature
💡 Result:
In the Couchbase Python SDK (acouchbase), the asyncio cluster close method is:
async def close(self) -> NoneIt’s documented as async close() → None on acouchbase.cluster.AsyncCluster. [1]
Sources: [1] (docs.couchbase.com)
Citations:
Use push_async_callback() to properly close the async cluster.
The Couchbase SDK's AsyncCluster.close() is an async method and must be awaited. Registering it via ExitStack.callback() won't await it, causing connection leaks. Use push_async_callback() instead or a runtime check with inspect.iscoroutinefunction() for compatibility:
Fix
+import inspect
...
if not self._client_provided_by_user:
- self._exit_stack.callback(self._cluster.close)
+ close = self._cluster.close
+ if inspect.iscoroutinefunction(close):
+ self._exit_stack.push_async_callback(close)
+ else:
+ self._exit_stack.callback(close)🤖 Prompt for AI Agents
In `@src/key_value/aio/stores/couchbase/store.py` around lines 212 - 222, The
_setup method currently registers self._cluster.close via
self._exit_stack.callback which won’t await AsyncCluster.close and can leak
connections; change the cleanup registration in _setup to use
self._exit_stack.push_async_callback(self._cluster.close) (or detect coroutine
with inspect.iscoroutinefunction and wrap accordingly) so that when not
self._client_provided_by_user the async close is awaited properly; keep the
existing bucket = self._cluster.bucket(...) and await bucket.on_connect()
behavior unchanged.
| @override | ||
| async def _delete_collection(self, *, collection: str) -> bool: | ||
| """Delete all documents in the collection. | ||
|
|
||
| Note: This doesn't delete the Couchbase collection itself, just removes all | ||
| documents with keys matching the collection prefix. | ||
| """ | ||
| # Since we prefix keys with collection name, we can't easily delete all | ||
| # documents without a query. For now, just clear the reference. | ||
| self._collections_by_name.pop(collection, None) | ||
|
|
||
| return True |
There was a problem hiding this comment.
_delete_collection reports success without deleting data.
This method only removes the in-memory reference and returns True, but leaves all documents intact. That violates destroy semantics and can cause stale data if the collection is re-initialized. Consider implementing a query-based delete or explicitly signaling “unsupported” rather than returning success.
🛠️ Safer interim behavior
- # Since we prefix keys with collection name, we can't easily delete all
- # documents without a query. For now, just clear the reference.
- self._collections_by_name.pop(collection, None)
-
- return True
+ # Since we prefix keys with collection name, we can't easily delete all
+ # documents without a query. Signal that deletion isn't implemented.
+ msg = "CouchbaseStore.delete_collection does not delete documents yet"
+ raise NotImplementedError(msg)📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
| @override | |
| async def _delete_collection(self, *, collection: str) -> bool: | |
| """Delete all documents in the collection. | |
| Note: This doesn't delete the Couchbase collection itself, just removes all | |
| documents with keys matching the collection prefix. | |
| """ | |
| # Since we prefix keys with collection name, we can't easily delete all | |
| # documents without a query. For now, just clear the reference. | |
| self._collections_by_name.pop(collection, None) | |
| return True | |
| `@override` | |
| async def _delete_collection(self, *, collection: str) -> bool: | |
| """Delete all documents in the collection. | |
| Note: This doesn't delete the Couchbase collection itself, just removes all | |
| documents with keys matching the collection prefix. | |
| """ | |
| # Since we prefix keys with collection name, we can't easily delete all | |
| # documents without a query. Signal that deletion isn't implemented. | |
| msg = "CouchbaseStore.delete_collection does not delete documents yet" | |
| raise NotImplementedError(msg) |
🤖 Prompt for AI Agents
In `@src/key_value/aio/stores/couchbase/store.py` around lines 381 - 392,
_delete_collection currently only removes the in-memory reference
(_collections_by_name.pop) and returns True without deleting documents; either
implement a proper deletion (issue a Couchbase N1QL DELETE or KV
delete-by-prefix using the cluster/collection client to remove docs whose keys
start with the collection prefix) inside _delete_collection and only return True
on success (handle and log errors and return False/raise on failure), or
explicitly mark the operation unsupported by raising NotImplementedError (or
returning False) instead of returning True; update references to
_collections_by_name accordingly so re-initializing a collection cannot leave
stale data.
| async def test_long_collection_name(self, store: CouchbaseStore, sanitizing_store: CouchbaseStore): # pyright: ignore[reportIncompatibleMethodOverride] | ||
| with pytest.raises(Exception): # noqa: B017, PT011 | ||
| await store.put(collection="test_collection" * 100, key="test_key", value={"test": "test"}) | ||
|
|
||
| await sanitizing_store.put(collection="test_collection" * 100, key="test_key", value={"test": "test"}) | ||
| assert await sanitizing_store.get(collection="test_collection" * 100, key="test_key") == {"test": "test"} | ||
|
|
||
| @override | ||
| async def test_special_characters_in_collection_name(self, store: CouchbaseStore, sanitizing_store: CouchbaseStore): # pyright: ignore[reportIncompatibleMethodOverride] | ||
| """Tests that special characters in the collection name will not raise an error.""" | ||
| with pytest.raises(Exception): # noqa: B017, PT011 | ||
| await store.put(collection="test_collection!@#$%^&*()", key="test_key", value={"test": "test"}) | ||
|
|
||
| await sanitizing_store.put(collection="test_collection!@#$%^&*()", key="test_key", value={"test": "test"}) | ||
| assert await sanitizing_store.get(collection="test_collection!@#$%^&*()", key="test_key") == {"test": "test"} |
There was a problem hiding this comment.
🧩 Analysis chain
🏁 Script executed:
# First, let's locate and read the test file
fd "test_couchbase.py" tests/Repository: strawgate/py-key-value
Length of output: 106
🏁 Script executed:
# Read the specific lines from the test file
sed -n '125,139p' tests/stores/couchbase/test_couchbase.py | cat -nRepository: strawgate/py-key-value
Length of output: 1400
🏁 Script executed:
# Let's also check the exact character length of the problematic lines
python3 << 'EOF'
import sys
# Read the file
with open('tests/stores/couchbase/test_couchbase.py', 'r') as f:
lines = f.readlines()
# Check lines 125 and 132 (converting from 1-indexed to 0-indexed)
for line_num in [124, 131]: # 125 and 132 in 1-indexed
if line_num < len(lines):
line = lines[line_num].rstrip('\n')
print(f"Line {line_num + 1}: Length={len(line)}")
print(f"Content: {line}")
print()
EOFRepository: strawgate/py-key-value
Length of output: 297
🏁 Script executed:
# Let's also examine the broader context and check if the test class structure uses ContextManagerStoreTestMixin
head -n 150 tests/stores/couchbase/test_couchbase.py | tail -n 50Repository: strawgate/py-key-value
Length of output: 2911
🏁 Script executed:
# Let's verify the exact length of line 133 (test_special_characters_in_collection_name)
python3 << 'EOF'
with open('tests/stores/couchbase/test_couchbase.py', 'r') as f:
lines = f.readlines()
# Check line 133 (0-indexed = 132)
if len(lines) > 132:
line = lines[132].rstrip('\n')
print(f"Line 133: Length={len(line)}")
print(f"Content: {line}")
EOFRepository: strawgate/py-key-value
Length of output: 269
Wrap method signatures to keep lines ≤ 140 characters.
Lines 125 and 133 exceed the 140-character limit (156 and 173 characters respectively). Split the parameters across multiple lines while keeping the type annotations and pyright ignore comment intact.
🛠️ Proposed refactor
- async def test_long_collection_name(self, store: CouchbaseStore, sanitizing_store: CouchbaseStore): # pyright: ignore[reportIncompatibleMethodOverride]
+ async def test_long_collection_name(
+ self,
+ store: CouchbaseStore,
+ sanitizing_store: CouchbaseStore,
+ ): # pyright: ignore[reportIncompatibleMethodOverride]
with pytest.raises(Exception): # noqa: B017, PT011
await store.put(collection="test_collection" * 100, key="test_key", value={"test": "test"})
await sanitizing_store.put(collection="test_collection" * 100, key="test_key", value={"test": "test"})
assert await sanitizing_store.get(collection="test_collection" * 100, key="test_key") == {"test": "test"}
`@override`
- async def test_special_characters_in_collection_name(self, store: CouchbaseStore, sanitizing_store: CouchbaseStore): # pyright: ignore[reportIncompatibleMethodOverride]
+ async def test_special_characters_in_collection_name(
+ self,
+ store: CouchbaseStore,
+ sanitizing_store: CouchbaseStore,
+ ): # pyright: ignore[reportIncompatibleMethodOverride]
"""Tests that special characters in the collection name will not raise an error."""
with pytest.raises(Exception): # noqa: B017, PT011
await store.put(collection="test_collection!@#$%^&*()", key="test_key", value={"test": "test"})🤖 Prompt for AI Agents
In `@tests/stores/couchbase/test_couchbase.py` around lines 125 - 139, The test
function signatures for test_long_collection_name and
test_special_characters_in_collection_name exceed 140 characters; reformat each
async def signature to multiple lines by placing parameters on separate lines
(e.g., one parameter per line) while preserving the type annotations
(CouchbaseStore) and the inline comment "# pyright:
ignore[reportIncompatibleMethodOverride]" and keeping the decorator "@override"
attached to the second test; ensure indentation and trailing colon remain
correct so the function bodies are unchanged.
Add CouchbaseStore implementation using the acouchbase async SDK for async key-value operations with Couchbase Server.
Features:
Install with:
pip install py-key-value-aio[couchbase]Closes #292
Generated with Claude Code
Summary by CodeRabbit
Release Notes
New Features
Tests