
Feature/cog 4469 graph aware embeddings integrate joint postgres graph db and #2502

Open
DanielNScott wants to merge 28 commits into topoteretes:dev from DanielNScott:feature/cog-4469-graph-aware-embeddings-integrate-joint-postgres-graph-db-and

Conversation

@DanielNScott

@DanielNScott DanielNScott commented Mar 26, 2026

Description

An attempt at a minimal implementation of a hybrid Postgres backend, merging graph and vector database interactions where possible.

Acceptance Criteria

  • hybrid adapter passes all existing graph and pgvector unit, end-to-end, and CI tests
  • get_unified_engine updated to include new hybrid adapter/retriever
  • GraphCompletionRetriever tested with new hybrid adapter/retriever
  • schema allows JOINs between graph nodes and vector entries
  • update error messages for unsupported operations (raw Cypher, NL search, Graphiti) as with postgres graphdb
  • GraphCompletionRetriever produces equivalent results via combined graph and vector query when using hybrid or non-hybrid postgres backends.
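One criterion above is that the schema allows JOINs between graph nodes and vector entries. A minimal sketch of the idea, using an in-memory SQLite stand-in (table and column names here are assumptions; the real backend is Postgres with JSONB properties and pgvector columns):

```python
# Sketch of the "graph nodes JOIN vector entries" criterion. SQLite stands in
# for Postgres; the point is only that both tables share an id, so one query
# can retrieve graph and vector data together.
import sqlite3

conn = sqlite3.connect(":memory:")
conn.executescript("""
    CREATE TABLE graph_nodes (id TEXT PRIMARY KEY, name TEXT);
    CREATE TABLE vector_entries (id TEXT PRIMARY KEY, payload TEXT);
    INSERT INTO graph_nodes VALUES ('n1', 'Entity A');
    INSERT INTO vector_entries VALUES ('n1', '{"text": "Entity A"}');
""")

# Because both tables are keyed by the same id, no second round-trip is needed:
rows = conn.execute("""
    SELECT n.name, v.payload
    FROM graph_nodes n
    JOIN vector_entries v ON v.id = n.id
""").fetchall()
```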

Type of Change

  • Bug fix (non-breaking change that fixes an issue)
  • New feature (non-breaking change that adds functionality)
  • Code refactoring
  • Other (please specify):

Screenshots

Pre-submission Checklist

  • I have tested my changes thoroughly before submitting this PR (See CONTRIBUTING.md)
  • This PR contains minimal changes necessary to address the issue/feature
  • My code follows the project's coding standards and style guidelines
  • I have added tests that prove my fix is effective or that my feature works
  • I have added necessary documentation (if applicable)
  • All new and existing tests pass
  • I have searched existing PRs to ensure this change hasn't been submitted already
  • I have linked any relevant issues in the description
  • My commits have clear and descriptive messages

DCO Affirmation

I affirm that all code in every commit of this pull request conforms to the terms of the Topoteretes Developer Certificate of Origin.

Summary by CodeRabbit

  • New Features

    • PostgreSQL added as a supported graph backend and a pghybrid hybrid graph+vector backend; hybrid write APIs and batch triplet pagination added.
  • Bug Fixes

    • Retrieval/search paths now explicitly report unsupported search types for Postgres backends.
  • Documentation

    • Docs updated with PostgreSQL/hybrid provider configuration and prerequisites.
  • Tests

    • New end-to-end and adapter tests for Postgres and hybrid scenarios; CI expanded to run Postgres test jobs.

Problems:
- Need to migrate basic graph functionality to relational DB

Solutions:
- New adapter implementing GraphDBInterface
- Selection branch included in get_graph_engine.py

Decisions:
- re-use SQLAlchemy setup and configuration
- Remove .query() (raw Cypher) functionality
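The selection branch mentioned above might look roughly like this; the class body is a stand-in for the real adapter, not its actual API:

```python
# Hypothetical shape of the provider selection branch in get_graph_engine.py.
class PostgresAdapter:
    """Stand-in for the new adapter implementing GraphDBInterface."""

def get_graph_engine(provider: str):
    # Dispatch on the configured graph database provider name.
    if provider.lower() == "postgres":
        return PostgresAdapter()
    raise EnvironmentError(f"Unsupported graph database provider: {provider}")
```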
Problems:
- Cypher search retriever exposes a raw Cypher interface via .query()
- Natural language retriever writes and runs Cypher via .query()
- Temporal awareness (Graphiti) subsystem directly uses Cypher and Neo4j

Solutions:
- Raise incompatibility errors proactively across these consumers

Decisions:
- None of these are core functionality, so the cost should be low
- Graphiti already hard-codes multiple configs without guards
- Complex Cypher search cannot be replaced in Postgres at present
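The proactive incompatibility errors above can be sketched as follows; the class and exception names mirror ones mentioned in this PR, but the bodies are stand-ins:

```python
# Sketch of a proactive guard in a Cypher-consuming code path.
class PostgresAdapter:
    """Stand-in for the Postgres graph adapter."""

class SearchTypeNotSupported(Exception):
    """Stand-in for the retrieval exception raised for unsupported backends."""

def run_cypher_search(graph_engine, query: str):
    # Fail fast with a descriptive error before any Cypher reaches the backend.
    if isinstance(graph_engine, PostgresAdapter):
        raise SearchTypeNotSupported(
            "Raw Cypher search is not supported on the Postgres graph backend."
        )
    return graph_engine.query(query)
```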
Problems:
- asyncpg misparses ::jsonb cast as named parameter
- prune_system drops graph tables via delete_database()
- cached adapter instance has no tables after prune
- no CI coverage for postgres graph backend
- no unit or e2e tests for postgres adapter

Solutions:
- replace ::jsonb with CAST(:param AS jsonb) throughout
- add _ensure_initialized() calling idempotent CREATE IF NOT EXISTS
- guard is_empty and delete_graph with _ensure_initialized
- add run-postgres-tests job to graph_db_tests.yml CI workflow
- add 26 unit tests against real Postgres
- add shared e2e test and postgres wrapper

Decisions:
- _ensure_initialized over defensive checks on every method
- real Postgres in tests, not SQLite (dialect differences block it)
- shared test function for future backend reuse (test_graphdb_shared)
- CI uses Postgres 16 service container, matching Kuzu/Neo4j pattern
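The ::jsonb issue above can be illustrated without a database. A naive named-parameter scan (a stand-in for the driver behavior described, not asyncpg's actual parser) picks up a phantom parameter from the `::jsonb` cast, while the `CAST(:param AS jsonb)` form is unambiguous:

```python
# Illustration of the cast/parameter collision. The regex is an illustrative
# stand-in for a parameter scanner, not real driver code.
import re

def params(sql: str) -> list:
    """Naively scan for :name parameters, as a misbehaving scanner might."""
    return re.findall(r":(\w+)", sql)

bad = "INSERT INTO nodes (id, properties) VALUES (:id, :props::jsonb)"
good = "INSERT INTO nodes (id, properties) VALUES (:id, CAST(:props AS jsonb))"
```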
Problems:
- some white-space and wrapping non-compliance

Solutions:
- fix via ruff
Problems:
- want joint graph and vector interaction in SQL to the extent possible
- currently, graph and vector writes are separate transactions
- search requires two round-trips, vector search then graph search

Solutions:
- add PostgresHybridAdapter composing PostgresAdapter and PGVectorAdapter
- implement combined write methods with single-transaction atomicity
   - implement batching to avoid loops on session.execute calls later
- implement combined search by joining graph and vector tables
- exceptions in get_graph_engine and create_vector_engine if given pghybrid
- add pghybrid construction in get_unified_engine

Decisions:
- composition rather than inheritance on subclasses of SQLAlchemyAdapter
   - otherwise subtle configuration bugs can occur (diamond problem)
- hybrid only allowed via get_unified_engine, not individual factories
- per-row inserts for now, batch optimization deferred
- table name validation via regex to prevent SQL injection
Problems:
- pghybrid delegates query() to PostgresAdapter which raises generic error
- isinstance checks only catch PostgresAdapter, not PostgresHybridAdapter
- users would see NotImplementedError instead of descriptive message

Solutions:
- add PostgresHybridAdapter to isinstance checks in three files
- cypher search, natural language, and graphiti now guard both adapters
Problems:
- no test coverage for hybrid adapter

Solutions:
- add unit tests covering graph, vector, combined, and content paths
- content integrity test verifies IDs and payloads across both tables
- tests use local SQLAlchemyAdapter to avoid stale event loop issues
- fixture teardown cleans up vector collection tables between tests

Decisions:
- use uuid5 for test IDs because pgvector tables require UUID columns
- bypass PGVectorAdapter constructor to share connection pool with graph
- e2e and CI deferred because cognify pipeline calls get_graph_engine directly
@pull-checklist

Please make sure all the checkboxes are checked:

  • I have tested these changes locally.
  • I have reviewed the code changes.
  • I have added end-to-end and unit tests (if applicable).
  • I have updated the documentation and README.md file (if necessary).
  • I have removed unnecessary code and debug statements.
  • PR title is clear and follows the convention.
  • I have tagged reviewers or team members for feedback.

@github-actions

Hello @DanielNScott, thank you for submitting a PR! We will respond as soon as possible.

@coderabbitai
Contributor

coderabbitai bot commented Mar 26, 2026

Note

Reviews paused

It looks like this branch is under active development. To avoid overwhelming you with review comments due to an influx of new commits, CodeRabbit has automatically paused this review. You can configure this behavior by changing the reviews.auto_review.auto_pause_after_reviewed_commits setting.

Use the following commands to manage reviews:

  • @coderabbitai resume to resume automatic reviews.
  • @coderabbitai review to trigger a single review.


Walkthrough

Adds Postgres-backed graph and hybrid Postgres+pgvector adapters, integrates them into engine factories, updates retrieval guardrails for unsupported operations, introduces CI workflow changes for Postgres tests, and adds extensive Postgres e2e/unit test suites and hybrid write paths.

Changes

Cohort / File(s) Summary
Workflow & Docs
/.github/workflows/graph_db_tests.yml, CLAUDE.md
Added python-version input and a run-postgres-tests job using pgvector/pgvector:pg17; documented postgres as a supported Graph provider and listed prerequisites/limitations.
Graph Engine Factories
cognee/infrastructure/databases/graph/get_graph_engine.py, cognee/infrastructure/databases/vector/create_vector_engine.py, cognee/infrastructure/databases/unified/get_unified_engine.py
Recognize postgres and pghybrid (via USE_UNIFIED_PROVIDER) paths; create PostgresAdapter/PGVectorAdapter/PostgresHybridAdapter from relational config; _create_hybrid_adapter made async.
Graph Interface
cognee/infrastructure/databases/graph/graph_db_interface.py
Added get_triplets_batch(offset, limit) with default NotImplementedError to the GraphDBInterface.
Postgres Graph Adapter
cognee/infrastructure/databases/graph/postgres/adapter.py
New async PostgresAdapter implementing GraphDBInterface: JSONB-backed node/edge schemas, indexes, batch upserts, reads (neighbors/connections/filtered graphs), metrics (recursive CTE), deletion, and get_triplets_batch.
Postgres Hybrid Adapter
cognee/infrastructure/databases/hybrid/postgres/adapter.py
New PostgresHybridAdapter composing PostgresAdapter + PGVectorAdapter: hybrid write/delete APIs (add_nodes_with_vectors, add_edges_with_vectors, delete_*_with_vectors), coordinated transactions, collection name validation, hybrid search search_graph_with_distances.
Retrieval & Guardrails
cognee/modules/retrieval/cypher_search_retriever.py, cognee/modules/retrieval/natural_language_retriever.py, cognee/tasks/temporal_awareness/index_graphiti_objects.py
Runtime type checks added to raise SearchTypeNotSupported or RuntimeError for Postgres or non-Neo4j backends before running Cypher/NL/Graphiti flows.
Storage Task Changes
cognee/tasks/storage/add_data_points.py
Branch to use hybrid write capabilities when engine exposes EngineCapability.HYBRID_WRITE, replacing separate graph/vector indexing with unified hybrid calls.
End-to-End & Adapter Tests
cognee/tests/e2e/postgres/* (test_graphdb_postgres.py, test_graphdb_shared.py, test_pghybrid.py, test_postgres_adapter.py, test_postgres_hybrid_adapter.py)
Added comprehensive E2E and adapter tests for Postgres and pghybrid: adapter unit tests, hybrid integration tests, shared E2E harness, and CI-targeted test runners.
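The get_triplets_batch contract noted in the walkthrough might be sketched as follows; the interface default raises NotImplementedError so each backend opts in explicitly, and the in-memory subclass is purely illustrative:

```python
# Sketch of the batched-triplet-read contract added to the graph interface.
import asyncio
from typing import Any, Dict, List

class GraphDBInterface:
    """Stand-in for the real interface; only the new method is sketched."""

    async def get_triplets_batch(self, offset: int, limit: int) -> List[Dict[str, Any]]:
        raise NotImplementedError("This backend does not support batched triplet reads.")

class InMemoryGraph(GraphDBInterface):
    """Illustrative backend that opts in to batched triplet reads."""

    def __init__(self, triplets: List[Dict[str, Any]]):
        self._triplets = triplets

    async def get_triplets_batch(self, offset: int, limit: int) -> List[Dict[str, Any]]:
        # Simple offset/limit pagination over an in-memory list.
        return self._triplets[offset : offset + limit]
```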

Estimated code review effort

🎯 4 (Complex) | ⏱️ ~55 minutes

Possibly related PRs

Suggested labels

core-team

Suggested reviewers

  • lxobr
🚥 Pre-merge checks | ✅ 1 | ❌ 2

❌ Failed checks (1 warning, 1 inconclusive)

  • Docstring Coverage (⚠️ Warning): docstring coverage is 33.52%, below the required 80.00% threshold. Resolution: write docstrings for the functions missing them.
  • Title check (❓ Inconclusive): the title mentions the core feature (Postgres hybrid backend integration) but is incomplete and truncated. Resolution: complete the title to be concise and specific, e.g. 'Add PostgreSQL hybrid graph-vector adapter for joint operations' or 'Implement pghybrid backend merging graph and vector writes'.

✅ Passed checks (1 passed)

  • Description check (✅ Passed): the description adequately covers objectives, acceptance criteria, change type, and checklists; all required sections are completed with substantive content.

✏️ Tip: You can configure your own custom pre-merge checks in the settings.


Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out.


Comment @coderabbitai help to get the list of available commands and usage tips.

@gitguardian

gitguardian bot commented Mar 26, 2026

⚠️ GitGuardian has uncovered 1 secret following the scan of your pull request.

Please consider investigating the findings and remediating the incidents. Failure to do so may lead to compromising the associated services or software components.

Since your pull request originates from a forked repository, GitGuardian is not able to associate the secrets uncovered with secret incidents on your GitGuardian dashboard.
Skipping this check run and merging your pull request will create secret incidents on your GitGuardian dashboard.

🔎 Detected hardcoded secret in your pull request
  • GitGuardian id: 9573981, status: Triggered, secret: Generic Password, commit: 4347a95, filename: .github/workflows/graph_db_tests.yml
🛠 Guidelines to remediate hardcoded secrets
  1. Understand the implications of revoking this secret by investigating where it is used in your code.
  2. Replace and store your secret safely, following best practices.
  3. Revoke and rotate this secret.
  4. If possible, rewrite git history. Rewriting git history is not a trivial act. You might completely break other contributing developers' workflow and you risk accidentally deleting legitimate data.

🦉 GitGuardian detects secrets in your source code to help developers and security teams secure the modern development process. You are seeing this because you or someone else with access to this repository has authorized GitGuardian to scan your pull request.

@DanielNScott DanielNScott marked this pull request as draft March 26, 2026 15:51
Contributor

@coderabbitai coderabbitai bot left a comment


Actionable comments posted: 3

🧹 Nitpick comments (6)
cognee/modules/retrieval/natural_language_retriever.py (1)

108-120: Minor: Consider moving SearchTypeNotSupported import outside the conditional.

The import of SearchTypeNotSupported inside the if block works but is slightly unusual. For consistency with how PostgresAdapter and PostgresHybridAdapter are imported, consider moving it outside the conditional:

♻️ Suggested refactor
         from cognee.infrastructure.databases.graph.postgres.adapter import PostgresAdapter
         from cognee.infrastructure.databases.hybrid.postgres.adapter import PostgresHybridAdapter
+        from cognee.modules.retrieval.exceptions import SearchTypeNotSupported

         if isinstance(graph_engine, (PostgresAdapter, PostgresHybridAdapter)):
-            from cognee.modules.retrieval.exceptions import SearchTypeNotSupported
-
             raise SearchTypeNotSupported(
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@cognee/modules/retrieval/natural_language_retriever.py` around lines 108 -
120, Move the SearchTypeNotSupported import out of the conditional so imports
are consistent: at the top of the block where PostgresAdapter and
PostgresHybridAdapter are imported, also import SearchTypeNotSupported (from
cognee.modules.retrieval.exceptions) and then keep only the
isinstance(graph_engine, (PostgresAdapter, PostgresHybridAdapter)) check to
raise SearchTypeNotSupported; this keeps import semantics consistent for
natural_language_retriever.py and makes the raise statement reference the
already-imported SearchTypeNotSupported.
cognee/infrastructure/databases/graph/postgres/adapter.py (1)

4-4: Unused import: asyncio.

The asyncio module is imported but not used in this file.

🧹 Remove unused import
-import asyncio
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@cognee/infrastructure/databases/graph/postgres/adapter.py` at line 4, Remove
the unused top-level import of asyncio in this module: delete the line importing
asyncio so the file no longer imports an unused module (look for the import
asyncio statement at the top of the file in
cognee/infrastructure/databases/graph/postgres/adapter.py).
cognee/tests/test_graphdb_shared.py (1)

100-103: History count assertion may be fragile.

The assertion expects exactly 6 history entries after 3 searches. Per the get_history implementation (which UNIONs Query and Result entries), this assumes each search logs exactly 1 Query + 1 Result. If the search behavior changes (e.g., logging multiple results per search), this test will break.

Consider either:

  • Adding a comment explaining the expected breakdown (e.g., "3 queries + 3 results = 6")
  • Using >= 3 to assert at least the queries were logged
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@cognee/tests/test_graphdb_shared.py` around lines 100 - 103, The test's
strict assertion that len(history) == 6 is fragile because get_history() UNIONs
Query and Result entries and search behavior may produce more than one Result
per query; update the assertion in the test (around the history variable after
calling get_history(user.id)) to either assert len(history) >= 3 (to ensure at
least the three Query entries were logged) or add a clear comment documenting
the expected breakdown (e.g., "expecting 3 queries + 3 results = 6") if you
truly intend to lock to 6; reference the get_default_user(), get_history(),
history and provider variables when making the change.
cognee/infrastructure/databases/hybrid/postgres/adapter.py (2)

469-471: Broad exception swallowing may mask real errors.

Catching all exceptions when the Triplet table doesn't exist could hide legitimate database errors (connection issues, permission errors, etc.). Consider catching a more specific exception or at least logging the error.

🛡️ Proposed fix to narrow exception handling
             if triplet_ids:
                 try:
                     table = _validate_table_name("Triplet_text")
                     await session.execute(
                         text(f"DELETE FROM {table} WHERE CAST(id AS text) = ANY(:ids)"),
                         {"ids": triplet_ids},
                     )
-                except Exception:
-                    # Triplet collection may not exist
-                    pass
+                except Exception as e:
+                    # Triplet collection may not exist; log but don't fail
+                    logger.debug("Triplet_text table delete skipped: %s", e)
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@cognee/infrastructure/databases/hybrid/postgres/adapter.py` around lines 469
- 471, The bare "except Exception:" block around the Triplet collection/table
access swallows all errors; narrow it to catch the specific "table not found"
exception from your DB driver (e.g., psycopg2.errors.UndefinedTable or
SQLAlchemy's NoSuchTableError) and log other exceptions instead of ignoring
them; update the handler in the block that deals with the Triplet
table/collection to catch the specific exception, call the module logger (or
processLogger) to record the error detail when it occurs, and re-raise or let
other exceptions propagate so real DB errors (connection/permission) are not
hidden.

261-264: Collection name parsing assumes underscore-separated format.

The rsplit("_", 1) assumes collection names follow {TypeName}_{field_name} format. If a type name contains underscores (e.g., My_Entity), this will incorrectly split as My and Entity_name.

Consider passing the type name and field name separately rather than reconstructing them from the collection string.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@cognee/infrastructure/databases/hybrid/postgres/adapter.py` around lines 261
- 264, The loop that calls self._vector.create_vector_index uses
collection.rsplit("_", 1) which breaks when type names contain underscores;
change the API so vector_groups yields explicit (type_name, field_name) pairs
instead of encoded strings, update this loop to: for type_name, field_name in
vector_groups: await self._vector.create_vector_index(type_name, field_name),
and update any callers that build vector_groups (or the producer function) to
return tuples rather than underscored strings; ensure any code that previously
relied on collection strings is adjusted to construct or accept (type_name,
field_name) so create_vector_index receives correct values.
cognee/tests/unit/infrastructure/databases/hybrid/test_postgres_hybrid_adapter.py (1)

46-55: Unusual __new__ pattern to bypass constructor.

This workaround avoids PGVectorAdapter.__init__ calling get_relational_engine() which returns a cached instance tied to a previous event loop. While the comment on lines 34-35 explains the intent, consider adding a brief inline comment near __new__ itself explaining why this pattern is necessary.

This is a known pytest-asyncio limitation with lru_cache-decorated factories.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In
`@cognee/tests/unit/infrastructure/databases/hybrid/test_postgres_hybrid_adapter.py`
around lines 46 - 55, The test uses PGVectorAdapter.__new__(PGVectorAdapter) to
bypass PGVectorAdapter.__init__ and avoid calling get_relational_engine() (which
returns an lru_cache-backed engine tied to a previous event loop); add a concise
inline comment directly above the __new__ call explaining this pytest-asyncio +
lru_cache event-loop issue and why the constructor is being skipped (mention
get_relational_engine and lru_cache) so future readers understand the
workaround.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:
In @.github/workflows/graph_db_tests.yml:
- Around line 93-96: The workflow is missing a workflow_call input for
python-version which is referenced by the Cognee Setup step (uses:
./.github/actions/cognee_setup) in multiple jobs; add a python-version entry
under workflow_call.inputs (e.g., name: python-version, required: false or true,
with a sensible default like "3.11") so the input is declared and can be passed
from calling workflows (or defaulted) and update any job invocation if you want
to make it required; ensure the input key is exactly python-version to match the
existing uses.

In `@cognee/infrastructure/databases/graph/postgres/adapter.py`:
- Around line 418-425: The code building WHERE clauses directly interpolates
attribute names (the n.{attr} fragment in the loop over attribute_filters) which
can lead to SQL injection; update the logic in the loop that constructs
where_parts/params to validate each attr against a server-side allowlist of
permitted column names (or map input keys to safe column identifiers) and raise
an error if an attribute is not recognized, then continue using parameterized
values (params / :filt_...) for the values; specifically change the code that
generates "n.{attr} = ANY(:{param})" to only insert attr after verifying it
exists in the allowlist (or mapped name) so dynamic names are never used raw in
the SQL.
- Around line 614-660: Add get_triplets_batch to the GraphDBInterface so callers
can rely on a formal contract instead of using hasattr; declare the async
signature (async def get_triplets_batch(self, offset: int, limit: int) ->
List[Dict[str, Any]]) in GraphDBInterface (or as an abstractmethod raising
NotImplementedError) and update adapters accordingly (PostgresAdapter,
Neo4jAdapter, KuzuAdapter already implement it; either implement a
no-op/NotImplemented in NeptuneGraphDB or document that NeptuneGraphDB does not
support this method), and update any callers to call the interface method rather
than using hasattr checks.


ℹ️ Review info
⚙️ Run configuration

Configuration used: Path: .coderabbit.yaml

Review profile: CHILL

Plan: Pro

Run ID: 826f5eaf-18fc-40a7-bcd6-aa440d8b7fe9

📥 Commits

Reviewing files that changed from the base of the PR and between 7cfb55f and 47794eb.

📒 Files selected for processing (17)
  • .github/workflows/graph_db_tests.yml
  • CLAUDE.md
  • cognee/infrastructure/databases/graph/get_graph_engine.py
  • cognee/infrastructure/databases/graph/postgres/__init__.py
  • cognee/infrastructure/databases/graph/postgres/adapter.py
  • cognee/infrastructure/databases/hybrid/postgres/__init__.py
  • cognee/infrastructure/databases/hybrid/postgres/adapter.py
  • cognee/infrastructure/databases/unified/get_unified_engine.py
  • cognee/infrastructure/databases/vector/create_vector_engine.py
  • cognee/modules/retrieval/cypher_search_retriever.py
  • cognee/modules/retrieval/natural_language_retriever.py
  • cognee/tasks/temporal_awareness/index_graphiti_objects.py
  • cognee/tests/test_graphdb_postgres.py
  • cognee/tests/test_graphdb_shared.py
  • cognee/tests/unit/infrastructure/databases/graph/test_postgres_adapter.py
  • cognee/tests/unit/infrastructure/databases/hybrid/__init__.py
  • cognee/tests/unit/infrastructure/databases/hybrid/test_postgres_hybrid_adapter.py

@DanielNScott DanielNScott marked this pull request as ready for review March 26, 2026 17:04
@DanielNScott
Author

Hey @dexters1, @lxobr asked me to tag you. If you meant assign, Lazar, it looks like I can't do that without having my access modified, although I'm not 100% certain at this moment.

@dexters1 dexters1 self-requested a review March 26, 2026 17:50
embedding_engine=embedding_engine,
)

elif vector_db_provider.lower() == "pghybrid":
Collaborator


We already have Postgres vector database code and an implementation. Does this one add optimizations? Do we want to keep both available as options?

# Single session, single commit for the whole batch
async with self._session() as session:
for row in rows:
await session.execute(
Collaborator

@dexters1 dexters1 Mar 26, 2026


Can we batch-insert the rows instead of adding them row by row? That should have better performance.

@dexters1
Collaborator

dexters1 commented Mar 26, 2026

Hey @dexters1, @lxobr asked me to tag you. If you meant assign, Lazar, it looks like I can't do that without having my access modified, although I'm not 100% certain at this moment.

Thanks Dan, added myself as a reviewer.

@dexters1
Collaborator

dexters1 commented Mar 26, 2026

@DanielNScott @lxobr Do we have any speed metrics for the integration, so we can compare performance against KuzuDB and Apache AGE?

I'd like the metrics if possible before deep diving into the implementation

@dexters1
Collaborator

@DanielNScott @lxobr Multi-user / multi-dataset mode is not supported with this PR; just something for us all to keep in mind.

Contributor

@coderabbitai coderabbitai bot left a comment


🧹 Nitpick comments (1)
cognee/infrastructure/databases/graph/get_graph_engine.py (1)

216-219: Line exceeds 100-character limit.

This f-string line is quite long. Consider extracting the provider list to a variable for readability and to comply with the line length guideline.

♻️ Suggested refactor
+    all_providers = list(supported_databases.keys()) + [
+        "neo4j", "kuzu", "kuzu-remote", "postgres", "pghybrid", "neptune", "neptune_analytics"
+    ]
     raise EnvironmentError(
         f"Unsupported graph database provider: {graph_database_provider}. "
-        f"Supported providers are: {', '.join(list(supported_databases.keys()) + ['neo4j', 'kuzu', 'kuzu-remote', 'postgres', 'pghybrid', 'neptune', 'neptune_analytics'])}"
+        f"Supported providers are: {', '.join(all_providers)}"
     )

As per coding guidelines: "Maintain line length of 100 characters maximum".

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@cognee/infrastructure/databases/graph/get_graph_engine.py` around lines 216 -
219, The long f-string in the EnvironmentError should be shortened by extracting
the supported provider list into a separate variable and using that variable in
the error message; compute a provider_list (e.g., provider_list = ",
".join(list(supported_databases.keys()) + ['neo4j', 'kuzu', 'kuzu-remote',
'postgres', 'pghybrid', 'neptune', 'neptune_analytics'])) and then raise
EnvironmentError(f"Unsupported graph database provider:
{graph_database_provider}. Supported providers are: {provider_list}"), updating
the code around the raise in the same scope where supported_databases and
graph_database_provider are used.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Nitpick comments:
In `@cognee/infrastructure/databases/graph/get_graph_engine.py`:
- Around line 216-219: The long f-string in the EnvironmentError should be
shortened by extracting the supported provider list into a separate variable and
using that variable in the error message; compute a provider_list (e.g.,
provider_list = ", ".join(list(supported_databases.keys()) + ['neo4j', 'kuzu',
'kuzu-remote', 'postgres', 'pghybrid', 'neptune', 'neptune_analytics'])) and
then raise EnvironmentError(f"Unsupported graph database provider:
{graph_database_provider}. Supported providers are: {provider_list}"), updating
the code around the raise in the same scope where supported_databases and
graph_database_provider are used.

ℹ️ Review info
⚙️ Run configuration

Configuration used: Path: .coderabbit.yaml

Review profile: CHILL

Plan: Pro

Run ID: e58c2e01-09a5-414b-9bfb-4b35e081e060

📥 Commits

Reviewing files that changed from the base of the PR and between 47794eb and 4d48851.

📒 Files selected for processing (1)
  • cognee/infrastructure/databases/graph/get_graph_engine.py

Problems:
- 36+ graph and 60+ vector call sites block pghybrid error guard
- hybrid adapter missing embedding_engine attribute for indexing
- dotenv override=True in cognee init overwrites test env vars
- cached configs survive env var changes across test boundaries

Solutions:
- add USE_UNIFIED_PROVIDER env var read at top of both factories
- get_graph_engine returns PostgresAdapter when flag is pghybrid
- create_vector_engine returns PGVectorAdapter when flag is pghybrid
- get_unified_engine reads flag for hybrid detection and construction
- expose embedding_engine on hybrid adapter from vector adapter
- e2e test sets env vars after import then clears all five caches

Decisions:
- explicit env var flag over auto-detection from two provider configs
- each factory returns independent component, not shared hybrid instance
- env vars set after cognee import to survive dotenv override=True
- hybrid combined queries only available through get_unified_engine
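A minimal sketch of the wrapping pattern described above (simplified and hypothetical; the real adapter has many more methods):

```python
class PostgresHybridAdapter:
    """Thin wrapper pairing a graph adapter with a vector adapter.

    Exposes the vector adapter's embedding_engine so indexing code that
    looks for that attribute keeps working against the hybrid adapter.
    """

    def __init__(self, graph_adapter, vector_adapter) -> None:
        self._graph = graph_adapter
        self._vector = vector_adapter
        # Delegate: indexing paths expect .embedding_engine on the engine.
        self.embedding_engine = vector_adapter.embedding_engine
```

Each factory can then hand back the underlying component independently, while only `get_unified_engine` constructs the wrapper for combined queries.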
@DanielNScott
Author

Speed comparison, run over a set of files from the repository (contributors.md, claude.md, and several others) used as a test corpus:

Backend      add      cognify   chunks   summaries   graph_completion   total
-----------  -------  --------  -------  ----------  -----------------  --------
default      3.25s    88.31s    1.34s    0.85s       14.76s             108.52s
postgres     3.21s    80.24s    0.96s    0.98s       14.38s             99.77s
pghybrid     3.64s    89.94s    1.30s    1.10s       19.95s             115.92s
neo4j        3.15s    84.82s    1.18s    1.37s       18.55s             109.07s

To your questions / points:

  • yes we can (and should) batch instead of looping in the SQL
  • good point re SQLAlchemy
  • vector adapter isn't actually being replaced, it's just being wrapped for the unified case
    • the point of this is to be able to write joint graph + vector SQL moving forward
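To make the joint-SQL point concrete, a combined query could look roughly like the following (hypothetical table and column names; `<=>` is pgvector's cosine-distance operator):

```python
# Illustrative only: rank graph neighbors by embedding distance in one query.
JOINT_QUERY = """
SELECT n.id, n.name, e.relationship_name,
       v.embedding <=> :query_vec AS distance
FROM graph_nodes AS n
JOIN graph_edges AS e ON e.source_id = n.id
JOIN vector_entries AS v ON v.node_id = e.target_id
ORDER BY distance
LIMIT :top_k
"""
```

Because nodes and vectors live in the same Postgres database, the join needs no cross-store glue code.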

@DanielNScott
Author

Summary of issues and decision-making related to this PR

pghybrid-integration.md

@dexters1
Collaborator

@DanielNScott Check the failing CI/CD tests. We should move tests that require a Postgres connection out of the unit tests and handle Postgres-related integration tests in their own GitHub Action.

@@ -0,0 +1,378 @@
"""Unit tests for the Postgres hybrid adapter.
Collaborator


Tests that require a Postgres DB should not be part of the unit tests. Can you move them out into a separate Postgres e2e test folder, since they are really e2e tests?

@dexters1
Collaborator

Speed comparison, run over a set of files from the repository (contributors.md, claude.md, and several others) used as a test corpus:

Backend      add      cognify   chunks   summaries   graph_completion   total
-----------  -------  --------  -------  ----------  -----------------  --------
default      3.25s    88.31s    1.34s    0.85s       14.76s             108.52s
postgres     3.21s    80.24s    0.96s    0.98s       14.38s             99.77s
pghybrid     3.64s    89.94s    1.30s    1.10s       19.95s             115.92s
neo4j        3.15s    84.82s    1.18s    1.37s       18.55s             109.07s

To your questions / points:

  • yes we can (and should) batch instead of looping in the SQL

  • good point re SQLAlchemy

  • vector adapter isn't actually being replaced, it's just being wrapped for the unified case

    • the point of this is to be able to write joint graph + vector SQL moving forward


What LLM model are you using? Can you try the benchmarks with a faster model? graph_completion usually takes around 3.5s to 4s.

@DanielNScott
Author

Updated timings, using the LLM env settings provided by Igor:

Backend      add      cognify   chunks   summaries   graph_completion   total
-----------  -------  --------  -------  ----------  -----------------  --------
default      5.35s    39.95s    1.43s    1.08s       8.82s              56.63s
postgres     3.61s    52.92s    1.41s    1.80s       7.32s              67.06s
pghybrid     2.77s    49.36s    1.60s    1.80s       12.65s             68.19s
neo4j        4.25s    53.16s    1.54s    1.71s       22.80s             83.47s

Problems:
- Inadequate batching
   - add_nodes loops N awaits, one INSERT round-trip each
   - add_edges same per-row loop pattern
   - has_edges loops N SELECT EXISTS queries
- raw text() SQL construction, not SQLAlchemy

Solutions:
- multi-row insert instead of looping
   - add_nodes uses pg_insert with multi-row VALUES batch
   - add_edges uses pg_insert with multi-row VALUES batch
   - has_edges uses SQLAlchemy values() with EXISTS join

- added SQLAlchemy Core table definitions for DML
   - will remove next commit to maintain single source of truth

Decisions:
- maximum batching optimization
   - multi-row INSERT over executemany for single-statement execution
   - executemany pipelines but still N server-side executions
   - looped awaits worst, executemany middle, VALUES best batching

- chose SQLAlchemy Core over raw text() for proactive safety
   - text() with bind params was safe but not project convention
   - SQLAlchemy eliminates all raw SQL string construction

- keep problematic reproduction of DDL logic for commit specificity
   - will fix in follow up
Problems:
- raw DDL strings and Core table objects duplicated schema

Solutions:
- replaced _SCHEMA_DDL list with Core Table and Index definitions
- initialize() now uses _meta.create_all() instead of raw DDL
- removed asyncio import (unused)
- replaced postgresql TIMESTAMP with generic DateTime(timezone=True)
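A sketch of that single-source-of-truth pattern with an illustrative table (not the adapter's real schema): the Core metadata is defined once, and initialize() simply calls create_all.

```python
from sqlalchemy import (
    Column,
    DateTime,
    Index,
    MetaData,
    String,
    Table,
    create_engine,
    func,
    inspect,
)

_meta = MetaData()

# Illustrative node table; generic DateTime(timezone=True) keeps it portable.
nodes = Table(
    "graph_nodes",
    _meta,
    Column("id", String, primary_key=True),
    Column("label", String),
    Column("created_at", DateTime(timezone=True), server_default=func.now()),
)
Index("ix_graph_nodes_label", nodes.c.label)


def initialize(engine) -> None:
    """Create every table and index from the one Core definition."""
    _meta.create_all(engine)
```

Since create_all emits dialect-appropriate statements, `initialize(create_engine("sqlite://"))` is enough to exercise the DDL path in a quick local test.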
@lxobr
Collaborator

lxobr commented Mar 31, 2026

Also, @DanielNScott, could you please check the failing test, and address CodeRabbit's comments if they make sense or dismiss them?

…adapter.

Note:
- All problems below were cosmetic or stylistic, not substantive.

Problems:
- SQL injection via unvalidated attribute names in get_filtered_graph_data
- empty attribute_filters can produce invalid SQL syntax
- local "values" var shadows sqlalchemy values import in add_edges
- JSON serialize then deserialize indirection in add_nodes
- get_nodeset_subgraph issues four sequential DB queries
- get_filtered_graph_data issued two sequential DB queries
- public methods missing docstrings, unlike Neo4j and Kuzu adapters
- section banners inconsistent with other adapters
- narrating inline comments restate what code already says
- python-version input missing from graph_db_tests workflow
- SearchTypeNotSupported imported inline instead of at module level

Solutions:
- add attribute whitelist, raise ValueError for invalid filter keys
- fall back to get_graph_data when attribute_filters is empty
- rename local values to rows in add_edges
- inline property dict construction, remove _split_core_and_extra
- combine get_nodeset_subgraph into single CTE query
- combine get_filtered_graph_data into single CTE query
- add Parameters/Returns docstrings to all public methods
- remove section banners throughout adapter
- trim narrating comments, keep complex clarifying ones
- add python-version input with default 3.11 to workflow_call
- move SearchTypeNotSupported to module-level import

Reasoning and Related Notes:
- prune_system metadata=True drops relational DB including graph tables
   - _ensure_initialized kept defensive as a result
- JSON round-trip in add_nodes retained for UUID and datetime serialization
   - specifically, enforces JSONB compatibility (postgres wants python dict)
   - alternative is in-place modification
   - adding rules for doing this would replicate JSONEncoder
   - JSONEncoder outputs strings, so we need to convert back if used
Problems:
- tuple merge order lets properties dict override authoritative node ID
- __init__ and _session missing type annotations
- magic number 6 in test_graphdb_shared unexplained
- raised but incorrect: add weighted edges test

Solutions:
- swap merge order so tuple ID always wins
- add type annotations to __init__ and _session
- add comment explaining 3 searches x 2 entries = 6
- no duplication of weighted edges test necessary or appropriate

Reasoning and Related Notes:
- weighted edges test is backend-agnostic, exercises get_graph_from_model only
   - running it under Postgres job would not test adapter round-trip
   - no backend actually has a round-trip weighted edges test
   - therefore kuzu and neo4j cases are incorrect and duplicates

- recursive CTE performance in get_graph_metrics is not part of pipeline
   - Kuzu returns -1 for components, Neo4j uses GDS plugin
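The merge-order fix comes down to plain dict-merge precedence; a minimal illustration with a hypothetical helper:

```python
def build_node_row(node_id: str, properties: dict) -> dict:
    """Merge so the tuple's id always wins over any 'id' in properties."""
    # Later keys override earlier ones, so put the authoritative id last.
    return {**properties, "id": node_id}
```

With the old order reversed, a stale "id" key inside properties could silently replace the authoritative node ID from the tuple.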
…al-graphdb-in-postgres' into feature/cog-4469-graph-aware-embeddings-integrate-joint-postgres-graph-db-and
@DanielNScott DanielNScott force-pushed the feature/cog-4469-graph-aware-embeddings-integrate-joint-postgres-graph-db-and branch from 4d48851 to b3203b3 Compare April 1, 2026 21:46
Contributor

@coderabbitai coderabbitai bot left a comment


Actionable comments posted: 9

♻️ Duplicate comments (1)
cognee/tests/unit/infrastructure/databases/hybrid/test_postgres_hybrid_adapter.py (1)

1-6: ⚠️ Potential issue | 🟠 Major

This module is exercising integration behavior from cognee/tests/unit/.

The fixture requires a real Postgres/pgvector backend and a live embedding engine, so these are not hermetic unit tests. Keeping them under unit/ will make local unit runs depend on external services and repeats the earlier concern about database-backed tests living in the unit suite.

As per coding guidelines "Organize tests in cognee/tests/ directory with subdirectories: unit/, integration/, cli_tests/, tasks/" and "Avoid external state in Python tests; rely on test fixtures and CI-provided environment variables".

Also applies to: 21-23, 48-49

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In
`@cognee/tests/unit/infrastructure/databases/hybrid/test_postgres_hybrid_adapter.py`
around lines 1 - 6, The tests in test_postgres_hybrid_adapter.py are integration
tests (they require a real Postgres/pgvector backend and embedding engine) and
should not live in the unit suite; move the file into the integration tests
directory (or add pytest.mark.integration) and update the tests (e.g., the
top-level test module and any fixtures used in that module) to guard against
missing external services by reading CI-provided env vars
(DB_HOST/DB_PORT/DB_USERNAME/DB_PASSWORD/DB_NAME) and skipping when those are
not set (use pytest.skip or pytest.importorskip), and update CI/test discovery
accordingly so unit runs remain hermetic.
🧹 Nitpick comments (3)
cognee/tests/test_graphdb_shared.py (1)

1-29: Move this shared harness under the integration-test area.

This file is a reusable full-flow harness, not a unit-style test module. Keeping it as cognee/tests/test_graphdb_shared.py makes the test tree harder to navigate than putting shared integration helpers alongside the rest of the integration suite.

As per coding guidelines "Place Python integration tests under cognee/tests/integration/" and "Organize tests in cognee/tests/ directory with subdirectories: unit/, integration/, cli_tests/, tasks/".

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@cognee/tests/test_graphdb_shared.py` around lines 1 - 29, This shared
full-flow harness (module containing TEST_DATA_DIR and run_graph_db_test) should
be relocated from the unit-test tree into the integration-test package: move the
file into the project's integration tests namespace (e.g., tests.integration) so
test discovery and organization follow the guidelines; update any module imports
or relative references that break after moving (references to TEST_DATA_DIR,
run_graph_db_test, and any top-level imports such as
cognee.infrastructure.files.storage) and adjust test-suite/test-runner
references so callers import the harness from its new integration package
instead of cognee.tests.test_graphdb_shared.
cognee/infrastructure/databases/hybrid/postgres/adapter.py (2)

561-574: Consider using named column access for maintainability.

Accessing row elements by index (row[0], row[1], etc.) is fragile if the SQL column order changes. Using SQLAlchemy's row mapping provides safer access.

♻️ Proposed refactor for named access
         async with self._graph._session() as session:
             result = await session.execute(sql, params)
-            rows = result.fetchall()
+            rows = result.mappings().fetchall()

         return [
             {
-                "node_id": row[0],
-                "node_name": row[1],
-                "node_type": row[2],
-                "relationship_name": row[3],
-                "distance": row[-1],
+                "node_id": row["node_id"],
+                "node_name": row["node_name"],
+                "node_type": row["node_type"],
+                "relationship_name": row["relationship_name"],
+                "distance": row["distance"],
             }
             for row in rows
         ]
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@cognee/infrastructure/databases/hybrid/postgres/adapter.py` around lines 561
- 574, The current code builds dicts from positional row indexes (in the block
using self._graph._session(), variable result and rows), which is fragile;
change to use SQLAlchemy named access by calling result.mappings().all() (or
using row._mapping) so you can reference columns by name (e.g., "node_id",
"node_name", "node_type", "relationship_name", "distance") instead of row[0],
row[1], etc.; update the list comprehension to pull values by those names for
maintainability and resilience to column-order changes.

52-57: Add type hints and docstring to constructor.

The constructor parameters lack type annotations, which reduces clarity and IDE support.

🔧 Suggested improvement
-    def __init__(self, graph_adapter, vector_adapter):
+    def __init__(
+        self,
+        graph_adapter: "PostgresAdapter",
+        vector_adapter: "PGVectorAdapter",
+    ) -> None:
+        """Initialize hybrid adapter with graph and vector backends.
+
+        Args:
+            graph_adapter: PostgresAdapter instance for graph operations.
+            vector_adapter: PGVectorAdapter instance for vector operations.
+        """
         self._graph = graph_adapter
         self._vector = vector_adapter

Note: Import the types via TYPE_CHECKING to avoid circular imports:

from typing import TYPE_CHECKING

if TYPE_CHECKING:
    from cognee.infrastructure.databases.graph.postgres.adapter import PostgresAdapter
    from cognee.infrastructure.databases.vector.pgvector.adapter import PGVectorAdapter
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@cognee/infrastructure/databases/hybrid/postgres/adapter.py` around lines 52 -
57, Update the __init__ constructor to include type hints for graph_adapter and
vector_adapter and add a concise docstring describing parameters and purpose;
import the PostgresAdapter and PGVectorAdapter types under TYPE_CHECKING as
suggested to avoid circular imports, annotate the parameters (e.g.,
graph_adapter: "PostgresAdapter", vector_adapter: "PGVectorAdapter") and the
instance attribute embedding_engine, and ensure the docstring mentions that
embedding_engine is exposed for callers (used by index_data_points) and the
roles of self._graph and self._vector.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:
In @.github/workflows/graph_db_tests.yml:
- Around line 74-133: The CI job run-postgres-tests currently runs only
test_postgres_adapter.py and test_graphdb_postgres.py; add steps to execute the
new unified-provider tests cognee/tests/test_pghybrid.py and
cognee/tests/unit/infrastructure/databases/hybrid/test_postgres_hybrid_adapter.py
so the pghybrid path is covered. In the run-postgres-tests job (look for the job
name run-postgres-tests and the existing steps "Run Postgres Adapter Unit Tests"
and "Run Postgres Graph E2E Test") add two new steps that set the same DB env
vars (DB_PROVIDER, DB_HOST, DB_PORT, DB_USERNAME, DB_PASSWORD, DB_NAME) and run
the appropriate test commands (pytest for the unit hybrid adapter test and a
python invocation for test_pghybrid.py) so both the unit and E2E hybrid paths
are executed in CI.

In `@cognee/infrastructure/databases/graph/postgres/adapter.py`:
- Around line 772-773: The ORDER BY in get_triplets_batch is unstable because it
only sorts by e.source_id and e.target_id, allowing rows with the same
source/target but different relationship_name to change order across pages;
update the SQL used in get_triplets_batch to include e.relationship_name (i.e.,
ORDER BY e.source_id, e.target_id, e.relationship_name) so pagination with
OFFSET/LIMIT produces a deterministic, stable sequence.

In `@cognee/infrastructure/databases/hybrid/postgres/adapter.py`:
- Around line 254-261: The loop that builds embeddings_by_collection currently
calls DataPoint.get_embeddable_data(dp) and passes the raw list (which may
contain None) to self._vector.embed_data, causing failures; change the logic in
the block that iterates vector_groups so you first build a filtered list of (dp,
text) pairs where DataPoint.get_embeddable_data(dp) is not None, call
self._vector.embed_data only with the non-None texts, then zip the returned
vectors back to the corresponding DataPoint objects and populate
embeddings_by_collection[collection]; ensure you skip (or optionally log)
datapoints whose embeddable data is None so embed_data never receives None
values.
- Around line 264-267: The loop over vector_groups can raise IndexError when a
collection name lacks an underscore; update the loop in the method containing
the for collection in vector_groups to defensively split the collection string
(use rpartition/rsplit with check) before calling
self._vector.create_vector_index so it safely handles names without an
underscore (e.g., fall back to a default field or skip/log the malformed
collection). Ensure you reference collection and call
self._vector.create_vector_index(collection_type, field_name) only after
verifying both parts exist and log or skip otherwise.
- Around line 473-482: The current bare "except Exception: pass" in the deletion
block around session.execute hides real errors; replace it by first checking
collection/table existence with await
self._vector.has_collection("Triplet_text") before calling _validate_table_name
and session.execute, or if you prefer catching errors, catch only the specific
DB exception that means "table doesn't exist" (e.g., the DB-specific
UndefinedTable/ProgrammingError) and log/raise other exceptions; reference the
_validate_table_name("Triplet_text"), session.execute(...), and
self._vector.has_collection("Triplet_text") symbols when making the change so
only the intended missing-table case is ignored while legitimate errors
propagate or are logged.

In `@cognee/infrastructure/databases/unified/get_unified_engine.py`:
- Around line 69-81: The code builds the asyncpg DSN from
get_relational_config() values that may be None; add a fast-fail validation
before constructing connection_string: in get_unified_engine (the block that
calls get_relational_config()), check that relational_config.db_username,
db_password, db_host, db_port (and db_name if desired) are not None/empty and
raise a clear ValueError (or custom exception) explaining the missing pghybrid
relational config fields; only construct connection_string and instantiate the
vector adapter (PostgresAdapter/any vector client) after validation passes so
you fail early with an actionable message.

In `@cognee/tasks/temporal_awareness/index_graphiti_objects.py`:
- Around line 23-32: The current check only rejects Postgres adapters
(PostgresAdapter, PostgresHybridAdapter) but should reject any non-Neo4j backend
so Neo4j-only Cypher never runs; change the guard around graph_engine to
validate that the backend is Neo4j and raise otherwise (for example, replace the
isinstance(...) check with either an explicit Neo4j adapter type check
(isinstance(graph_engine, Neo4jAdapter)) or a provider attribute check like if
getattr(graph_engine, "provider", "").lower() != "neo4j": raise
RuntimeError(...) so any non-Neo4j engine (Kuzu, Postgres, etc.) is rejected
early).

In `@cognee/tests/test_pghybrid.py`:
- Around line 20-42: The test currently mutates global process state at import
by setting os.environ[...] and calling cache_clear() for get_relational_config,
get_graph_config, create_relational_engine, _create_graph_engine and
_create_vector_engine; move all of those os.environ assignments and
cache_clear() calls out of module scope into a dedicated setup function (e.g.,
main() or a helper called from main) so nothing is executed at import time, and
convert this file into an integration test under cognee/tests/integration/ (or
update its name to avoid test_*.py top-level import by test collectors) so the
environment setup runs only when the integration test is invoked by CI or the
test runner.

In
`@cognee/tests/unit/infrastructure/databases/hybrid/test_postgres_hybrid_adapter.py`:
- Around line 295-297: The current test loop over connections uses an or so a
missing field can still pass; update the assertion in the loop that iterates
over connections (variables connections and edge, checking
edge["relationship_name"] == "RELATED_TO") to require both fields and values
exist by asserting the edge contains both "weight" and "edge_text" and that
edge["weight"] == 0.8 and edge["edge_text"] == "quantum ML" (or use edge.get
checks combined with a logical AND) so the test fails if either field is absent
or incorrect.

---

Duplicate comments:
In
`@cognee/tests/unit/infrastructure/databases/hybrid/test_postgres_hybrid_adapter.py`:
- Around line 1-6: The tests in test_postgres_hybrid_adapter.py are integration
tests (they require a real Postgres/pgvector backend and embedding engine) and
should not live in the unit suite; move the file into the integration tests
directory (or add pytest.mark.integration) and update the tests (e.g., the
top-level test module and any fixtures used in that module) to guard against
missing external services by reading CI-provided env vars
(DB_HOST/DB_PORT/DB_USERNAME/DB_PASSWORD/DB_NAME) and skipping when those are
not set (use pytest.skip or pytest.importorskip), and update CI/test discovery
accordingly so unit runs remain hermetic.

---

Nitpick comments:
In `@cognee/infrastructure/databases/hybrid/postgres/adapter.py`:
- Around line 561-574: The current code builds dicts from positional row indexes
(in the block using self._graph._session(), variable result and rows), which is
fragile; change to use SQLAlchemy named access by calling
result.mappings().all() (or using row._mapping) so you can reference columns by
name (e.g., "node_id", "node_name", "node_type", "relationship_name",
"distance") instead of row[0], row[1], etc.; update the list comprehension to
pull values by those names for maintainability and resilience to column-order
changes.
- Around line 52-57: Update the __init__ constructor to include type hints for
graph_adapter and vector_adapter and add a concise docstring describing
parameters and purpose; import the PostgresAdapter and PGVectorAdapter types
under TYPE_CHECKING as suggested to avoid circular imports, annotate the
parameters (e.g., graph_adapter: "PostgresAdapter", vector_adapter:
"PGVectorAdapter") and the instance attribute embedding_engine, and ensure the
docstring mentions that embedding_engine is exposed for callers (used by
index_data_points) and the roles of self._graph and self._vector.

In `@cognee/tests/test_graphdb_shared.py`:
- Around line 1-29: This shared full-flow harness (module containing
TEST_DATA_DIR and run_graph_db_test) should be relocated from the unit-test tree
into the integration-test package: move the file into the project's integration
tests namespace (e.g., tests.integration) so test discovery and organization
follow the guidelines; update any module imports or relative references that
break after moving (references to TEST_DATA_DIR, run_graph_db_test, and any
top-level imports such as cognee.infrastructure.files.storage) and adjust
test-suite/test-runner references so callers import the harness from its new
integration package instead of cognee.tests.test_graphdb_shared.

ℹ️ Review info
⚙️ Run configuration

Configuration used: Path: .coderabbit.yaml

Review profile: CHILL

Plan: Pro

Run ID: 4e311dbd-1f9f-42bb-aaeb-c7ed2285bd82

📥 Commits

Reviewing files that changed from the base of the PR and between 4d48851 and b3203b3.

📒 Files selected for processing (18)
  • .github/workflows/graph_db_tests.yml
  • CLAUDE.md
  • cognee/infrastructure/databases/graph/get_graph_engine.py
  • cognee/infrastructure/databases/graph/postgres/__init__.py
  • cognee/infrastructure/databases/graph/postgres/adapter.py
  • cognee/infrastructure/databases/hybrid/postgres/__init__.py
  • cognee/infrastructure/databases/hybrid/postgres/adapter.py
  • cognee/infrastructure/databases/unified/get_unified_engine.py
  • cognee/infrastructure/databases/vector/create_vector_engine.py
  • cognee/modules/retrieval/cypher_search_retriever.py
  • cognee/modules/retrieval/natural_language_retriever.py
  • cognee/tasks/temporal_awareness/index_graphiti_objects.py
  • cognee/tests/test_graphdb_postgres.py
  • cognee/tests/test_graphdb_shared.py
  • cognee/tests/test_pghybrid.py
  • cognee/tests/unit/infrastructure/databases/graph/test_postgres_adapter.py
  • cognee/tests/unit/infrastructure/databases/hybrid/__init__.py
  • cognee/tests/unit/infrastructure/databases/hybrid/test_postgres_hybrid_adapter.py
✅ Files skipped from review due to trivial changes (2)
  • CLAUDE.md
  • cognee/tests/test_graphdb_postgres.py
🚧 Files skipped from review as they are similar to previous changes (4)
  • cognee/infrastructure/databases/vector/create_vector_engine.py
  • cognee/modules/retrieval/cypher_search_retriever.py
  • cognee/infrastructure/databases/graph/get_graph_engine.py
  • cognee/tests/unit/infrastructure/databases/graph/test_postgres_adapter.py

Comment on lines +264 to +267
for collection in vector_groups:
await self._vector.create_vector_index(
collection.rsplit("_", 1)[0], collection.rsplit("_", 1)[1]
)
Contributor


⚠️ Potential issue | 🟡 Minor

Potential IndexError if collection name has no underscore.

collection.rsplit("_", 1)[1] will raise IndexError if the collection name doesn't contain an underscore. While the current naming convention ({TypeName}_{field_name}) should always include one, defensive handling would be safer.

🛡️ Proposed fix with defensive split
         # Ensure vector collection tables exist
         for collection in vector_groups:
-            await self._vector.create_vector_index(
-                collection.rsplit("_", 1)[0], collection.rsplit("_", 1)[1]
-            )
+            parts = collection.rsplit("_", 1)
+            if len(parts) == 2:
+                await self._vector.create_vector_index(parts[0], parts[1])
+            else:
+                logger.warning(f"Unexpected collection name format: {collection}")
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@cognee/infrastructure/databases/hybrid/postgres/adapter.py` around lines 264
- 267, The loop over vector_groups can raise IndexError when a collection name
lacks an underscore; update the loop in the method containing the for collection
in vector_groups to defensively split the collection string (use
rpartition/rsplit with check) before calling self._vector.create_vector_index so
it safely handles names without an underscore (e.g., fall back to a default
field or skip/log the malformed collection). Ensure you reference collection and
call self._vector.create_vector_index(collection_type, field_name) only after
verifying both parts exist and log or skip otherwise.
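A minimal sketch of the defensive split this prompt describes, using rpartition; the helper name is hypothetical, and the {TypeName}_{field_name} convention is taken from the comment above rather than from the adapter itself:

```python
def split_collection_name(collection):
    """Split a "{TypeName}_{field_name}" collection name on its last underscore.

    Returns a (type_name, field_name) tuple, or None when the name has no
    underscore, so callers can log and skip malformed names instead of
    hitting an IndexError.
    """
    type_name, sep, field_name = collection.rpartition("_")
    if not sep or not type_name:
        return None
    return type_name, field_name
```

The caller would then invoke create_vector_index only when the helper returns a tuple, and log a warning otherwise.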

Comment on lines +20 to +42
os.environ["USE_UNIFIED_PROVIDER"] = "pghybrid"
os.environ["ENABLE_BACKEND_ACCESS_CONTROL"] = "false"
os.environ["DB_PROVIDER"] = "postgres"
os.environ["DB_HOST"] = os.environ.get("DB_HOST", "localhost")
os.environ["DB_PORT"] = os.environ.get("DB_PORT", "5432")
os.environ["DB_USERNAME"] = os.environ.get("DB_USERNAME", "cognee")
os.environ["DB_PASSWORD"] = os.environ.get("DB_PASSWORD", "cognee")
os.environ["DB_NAME"] = os.environ.get("DB_NAME", "cognee_db")

# Clear all cached configs and engine factories so they re-read env vars
from cognee.infrastructure.databases.relational.config import get_relational_config
from cognee.infrastructure.databases.relational.create_relational_engine import (
    create_relational_engine,
)
from cognee.infrastructure.databases.graph.config import get_graph_config
from cognee.infrastructure.databases.graph.get_graph_engine import _create_graph_engine
from cognee.infrastructure.databases.vector.create_vector_engine import _create_vector_engine

get_relational_config.cache_clear()
get_graph_config.cache_clear()
create_relational_engine.cache_clear()
_create_graph_engine.cache_clear()
_create_vector_engine.cache_clear()

⚠️ Potential issue | 🟠 Major

Avoid module-scope backend mutation in this test entrypoint.

These os.environ[...] = ... assignments and cache_clear() calls run as soon as the module is imported, so anything that imports this file inherits pghybrid settings before main() executes. Please move that setup behind main() (or a helper it calls) and keep this script in the integration-test area rather than as an import-sensitive top-level test_*.py.

As per coding guidelines "Place Python integration tests under cognee/tests/integration/" and "Avoid external state in Python tests; rely on test fixtures and CI-provided environment variables".

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@cognee/tests/test_pghybrid.py` around lines 20 - 42, The test currently
mutates global process state at import by setting os.environ[...] and calling
cache_clear() for get_relational_config, get_graph_config,
create_relational_engine, _create_graph_engine and _create_vector_engine; move
all of those os.environ assignments and cache_clear() calls out of module scope
into a dedicated setup function (e.g., main() or a helper called from main) so
nothing is executed at import time, and convert this file into an integration
test under cognee/tests/integration/ (or update its name to avoid test_*.py
top-level import by test collectors) so the environment setup runs only when the
integration test is invoked by CI or the test runner.

Comment on lines +295 to +297
for _, edge, _ in connections:
    if edge["relationship_name"] == "RELATED_TO":
        assert edge.get("weight") == 0.8 or edge.get("edge_text") == "quantum ML"

⚠️ Potential issue | 🟡 Minor

Tighten the edge-payload assertion.

Line 297 uses or, so this still passes when either weight or edge_text is missing. For an integrity test, both fields should be asserted.

🔧 Proposed fix
         for _, edge, _ in connections:
             if edge["relationship_name"] == "RELATED_TO":
-                assert edge.get("weight") == 0.8 or edge.get("edge_text") == "quantum ML"
+                assert edge.get("weight") == 0.8
+                assert edge.get("edge_text") == "quantum ML"
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In
`@cognee/tests/unit/infrastructure/databases/hybrid/test_postgres_hybrid_adapter.py`
around lines 295 - 297, The current test loop over connections uses an or so a
missing field can still pass; update the assertion in the loop that iterates
over connections (variables connections and edge, checking
edge["relationship_name"] == "RELATED_TO") to require both fields and values
exist by asserting the edge contains both "weight" and "edge_text" and that
edge["weight"] == 0.8 and edge["edge_text"] == "quantum ML" (or use edge.get
checks combined with a logical AND) so the test fails if either field is absent
or incorrect.

Problems:
- postgres tests in unit/ fail without live database in CI
- integration/ is auto-swept by CI which lacks postgres service
- Igor requested separate postgres test directory and github action

Solutions:
- created cognee/tests/e2e/postgres/ for service-dependent tests
- moved 5 test files from unit/ and tests/ root to e2e/postgres
- updated imports to reflect new module paths
- updated graph_db_tests.yml to reference new file locations

Architectural Trade-Off Decisions:
- chose e2e/ over integration/ to avoid automatic CI sweep
- graph_db_tests.yml invokes explicitly so tests still run in CI

Reasoning and Related Notes:
- dexters1 (Igor) requested own github action for postgres tests
- all 39 tests pass locally against live postgres

Problems:
- get_triplets_batch ordering unstable with duplicate source/target pairs
- None embeddable data passed to embedding engine can cause failures
- bare except in Triplet_text deletion can hide real database errors
- pghybrid DSN built from None config fields gives unhelpful errors
- graphiti guard only blocks postgres, not kuzu or other non-neo4j backends

Solutions:
- added relationship_name to ORDER BY for deterministic pagination
- filtered None values from get_embeddable_data before calling embed_data
- narrowed except to log and distinguish missing-table from real errors
- added config validation before DSN construction in get_unified_engine
- replaced postgres blocklist with positive isinstance Neo4jAdapter check

Problems:
- edge property test assertion used or, passing on partial data
- hybrid search results built from fragile positional row indexes
- hybrid adapter __init__ lacked type hints on parameters
- get_graph_engine error message exceeded 100-char line limit

Solutions:
- split or assertion into two separate asserts for weight and edge_text
- switched to result.mappings() for named column access in hybrid search
- added TYPE_CHECKING imports and type annotations to __init__
- extracted provider list to variable before f-string interpolation
…face.

Problems:
- get_triplets_batch not declared on interface, callers use hasattr checks
- Neptune adapter does not implement it and cannot be required to

Solutions:
- added get_triplets_batch to GraphDBInterface as non-abstract optional method
- default raises NotImplementedError following existing feedback_weights pattern
- docstring documents which adapters implement it
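The optional-method pattern this commit describes can be sketched as follows; the signature and error message are illustrative, not the interface's actual definition:

```python
import asyncio


class GraphDBInterface:
    """Sketch of a graph interface with a non-abstract optional method; the
    real interface defines many more methods."""

    async def get_triplets_batch(self, offset: int = 0, limit: int = 100):
        """Optional batch triplet reader.

        Adapters that support batched triplet reads override this; others
        inherit the NotImplementedError default, mirroring the existing
        feedback_weights pattern mentioned above.
        """
        raise NotImplementedError(
            f"{type(self).__name__} does not implement get_triplets_batch"
        )
```

Callers can then rely on the method existing on every adapter and catch NotImplementedError, instead of probing with hasattr.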

Problems:
- CI had no coverage for pghybrid hybrid adapter or e2e path
- postgres:16 image lacks pgvector extension needed by hybrid tests
- TEST_DATA_DIR resolved to wrong path after file move to e2e/postgres
- hybrid adapter search wrapper missing node_name_filter_operator param
- combined_write stored belongs_to_set as null instead of empty list
- nodeset filtering test failed because entities lack belongs_to_set

Solutions:
- added hybrid adapter and pghybrid e2e steps to graph_db_tests.yml
- upgraded CI service image to pgvector/pgvector:pg17
- fixed TEST_DATA_DIR to resolve relative to tests/ root via parents[2]
- added node_name_filter_operator passthrough in hybrid search wrapper
- normalized belongs_to_set to empty list in combined_write payload
- disabled nodeset test with TODO noting cross-backend pipeline gap

Reasoning and Related Notes:
- nodeset gap affects all backends, not a postgres-specific issue
- entities from extract_graph_from_data never inherit belongs_to_set
@coderabbitai coderabbitai bot left a comment


Actionable comments posted: 8

♻️ Duplicate comments (3)
cognee/infrastructure/databases/hybrid/postgres/adapter.py (1)

491-503: ⚠️ Potential issue | 🟠 Major

Propagate unexpected Triplet_text delete failures.

Ignoring a missing Triplet_text table is fine, but the current else branch only logs and continues. A real SQL or permission error will still commit the rest of the transaction and report success.

🔧 Suggested fix
             if triplet_ids:
-                try:
-                    table = _validate_table_name("Triplet_text")
-                    await session.execute(
-                        text(f"DELETE FROM {table} WHERE CAST(id AS text) = ANY(:ids)"),
-                        {"ids": triplet_ids},
-                    )
-                except Exception as e:
-                    error_msg = str(e).lower()
-                    if "does not exist" in error_msg or "relation" in error_msg:
-                        logger.debug("Triplet_text table not found, skipping: %s", e)
-                    else:
-                        logger.warning("Unexpected error deleting from Triplet_text: %s", e)
+                if await self._vector.has_collection("Triplet_text"):
+                    table = _validate_table_name("Triplet_text")
+                    await session.execute(
+                        text(f"DELETE FROM {table} WHERE CAST(id AS text) = ANY(:ids)"),
+                        {"ids": triplet_ids},
+                    )

As per coding guidelines "Prefer explicit, structured error handling in Python code".

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@cognee/infrastructure/databases/hybrid/postgres/adapter.py` around lines 491
- 503, The delete-from-Triplet_text block swallows unexpected SQL errors (in the
except else branch) which allows the transaction to commit; modify the except
handling around the try/except that calls _validate_table_name("Triplet_text")
and session.execute(...) so that non-“table not found” errors are not only
logged via logger.warning but are re-raised (or wrapped and raised) to propagate
the failure and abort the transaction; keep the existing debug path for "does
not exist"/"relation" but ensure any other Exception is raised after logging.
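A sketch of the narrowed handling this prompt describes: tolerate a missing Triplet_text table, but re-raise anything else so the surrounding transaction aborts instead of committing a partial delete. The string-matching heuristic mirrors the review comment and is an assumption about the driver's error text:

```python
import logging

logger = logging.getLogger(__name__)


def handle_triplet_text_delete_error(exc):
    """Swallow only "missing table" errors; re-raise everything else so the
    caller's transaction rolls back."""
    message = str(exc).lower()
    if "does not exist" in message or "relation" in message:
        logger.debug("Triplet_text table not found, skipping: %s", exc)
    else:
        logger.warning("Unexpected error deleting from Triplet_text: %s", exc)
        raise exc
```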
cognee/tests/e2e/postgres/test_pghybrid.py (1)

19-42: ⚠️ Potential issue | 🟠 Major

Move the backend overrides out of module scope.

Because this is still a test_*.py module, importing it for collection flips USE_UNIFIED_PROVIDER, mutates DB env, and clears global caches before main() runs. That can leak pghybrid state into unrelated tests even when this script is never executed directly.

🔧 Suggested fix
 from cognee.tests.e2e.postgres.test_graphdb_shared import run_graph_db_test
@@
-# Now override env vars (after dotenv has run)
-os.environ["USE_UNIFIED_PROVIDER"] = "pghybrid"
-os.environ["ENABLE_BACKEND_ACCESS_CONTROL"] = "false"
-os.environ["DB_PROVIDER"] = "postgres"
-os.environ["DB_HOST"] = os.environ.get("DB_HOST", "localhost")
-os.environ["DB_PORT"] = os.environ.get("DB_PORT", "5432")
-os.environ["DB_USERNAME"] = os.environ.get("DB_USERNAME", "cognee")
-os.environ["DB_PASSWORD"] = os.environ.get("DB_PASSWORD", "cognee")
-os.environ["DB_NAME"] = os.environ.get("DB_NAME", "cognee_db")
@@
-get_relational_config.cache_clear()
-get_graph_config.cache_clear()
-create_relational_engine.cache_clear()
-_create_graph_engine.cache_clear()
-_create_vector_engine.cache_clear()
+def _configure_pghybrid_env() -> None:
+    os.environ["USE_UNIFIED_PROVIDER"] = "pghybrid"
+    os.environ["ENABLE_BACKEND_ACCESS_CONTROL"] = "false"
+    os.environ["DB_PROVIDER"] = "postgres"
+    os.environ.setdefault("DB_HOST", "localhost")
+    os.environ.setdefault("DB_PORT", "5432")
+    os.environ.setdefault("DB_USERNAME", "cognee")
+    os.environ.setdefault("DB_PASSWORD", "cognee")
+    os.environ.setdefault("DB_NAME", "cognee_db")
+
+    get_relational_config.cache_clear()
+    get_graph_config.cache_clear()
+    create_relational_engine.cache_clear()
+    _create_graph_engine.cache_clear()
+    _create_vector_engine.cache_clear()
@@
-async def main():
+async def main() -> None:
+    _configure_pghybrid_env()
     await run_graph_db_test("postgres")

As per coding guidelines "Avoid external state in Python tests; rely on test fixtures and CI-provided environment variables".

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@cognee/tests/e2e/postgres/test_pghybrid.py` around lines 19 - 42, Move the
module-level env overrides and cache clearing into a test-scoped fixture instead
of executing at import time: create a pytest fixture (e.g.,
pghybrid_env_override) in this file that sets the env vars
(USE_UNIFIED_PROVIDER, ENABLE_BACKEND_ACCESS_CONTROL, DB_PROVIDER, DB_HOST,
DB_PORT, DB_USERNAME, DB_PASSWORD, DB_NAME) via monkeypatch.setenv (or
temporarily assign os.environ) and then calls
get_relational_config.cache_clear(), get_graph_config.cache_clear(),
create_relational_engine.cache_clear(), _create_graph_engine.cache_clear(), and
_create_vector_engine.cache_clear(); yield control to run the test and on
teardown restore original env and clear caches again—mark the fixture
autouse=True or explicitly use it in the tests so the overrides only run when
the test executes, not on module import.
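The save-and-restore behavior the fixture needs can be sketched as a plain context manager; the real fix would use pytest's monkeypatch fixture and also call cache_clear() on the cached config/engine factories on both enter and exit:

```python
import os
from contextlib import contextmanager


@contextmanager
def pghybrid_env(overrides):
    """Temporarily apply env overrides, restoring the originals on exit.

    Illustrative sketch only; monkeypatch.setenv gives the same guarantee
    inside a pytest fixture.
    """
    saved = {key: os.environ.get(key) for key in overrides}
    os.environ.update(overrides)
    try:
        yield
    finally:
        for key, value in saved.items():
            if value is None:
                os.environ.pop(key, None)
            else:
                os.environ[key] = value
```

Because nothing runs at import time, collecting the module no longer leaks pghybrid settings into unrelated tests.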
cognee/infrastructure/databases/unified/get_unified_engine.py (1)

76-92: ⚠️ Potential issue | 🟠 Major

Validate DB_PROVIDER and DB_NAME before building the pghybrid DSN.

This guard still accepts USE_UNIFIED_PROVIDER=pghybrid with a non-Postgres relational engine and can build .../None when DB_NAME is unset. Failing fast here keeps the error actionable instead of deferring it into adapter initialization.

🔧 Suggested fix
         # Vector adapter: build connection string from relational config
         relational_config = get_relational_config()
+        if relational_config.db_provider != "postgres":
+            raise EnvironmentError(
+                "USE_UNIFIED_PROVIDER=pghybrid requires DB_PROVIDER=postgres"
+            )
         required = {
+            "DB_NAME": relational_config.db_name,
             "DB_HOST": relational_config.db_host,
             "DB_PORT": relational_config.db_port,
             "DB_USERNAME": relational_config.db_username,
             "DB_PASSWORD": relational_config.db_password,
         }
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@cognee/infrastructure/databases/unified/get_unified_engine.py` around lines
76 - 92, The code builds the pghybrid DSN without verifying the DB provider and
database name; add a guard before forming connection_string that checks the
unified provider is 'pghybrid' (inspect the DB_PROVIDER / USE_UNIFIED_PROVIDER
env value used by your flow) and that relational_config.db_name is set and
truthy, and if not raise an EnvironmentError with a clear message; update
get_relational_config()/relational_config validation block to verify DB_PROVIDER
== "pghybrid" (or equivalent flag) and relational_config.db_name before
composing connection_string so you don't produce a DSN ending in "/None" or
accept a non-Postgres engine.
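The fail-fast validation described above can be sketched as follows; the attribute names mirror the review's relational_config fields and the DSN format is an assumption, not the module's actual code:

```python
def build_pghybrid_dsn(config):
    """Validate config before composing the DSN, so a missing DB_NAME fails
    with an actionable error instead of producing ".../None"."""
    if getattr(config, "db_provider", None) != "postgres":
        raise EnvironmentError(
            "USE_UNIFIED_PROVIDER=pghybrid requires DB_PROVIDER=postgres"
        )
    required = {
        "DB_NAME": config.db_name,
        "DB_HOST": config.db_host,
        "DB_PORT": config.db_port,
        "DB_USERNAME": config.db_username,
        "DB_PASSWORD": config.db_password,
    }
    missing = [name for name, value in required.items() if not value]
    if missing:
        raise EnvironmentError(f"pghybrid requires {', '.join(missing)} to be set")
    return (
        f"postgresql+asyncpg://{config.db_username}:{config.db_password}"
        f"@{config.db_host}:{config.db_port}/{config.db_name}"
    )
```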
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:
In @.github/workflows/graph_db_tests.yml:
- Around line 135-143: The CI job running test_postgres_hybrid_adapter.py is
missing the embedding configuration required by the fixture that builds the
embedding engine and calls embed_data(); update the "Run Postgres Hybrid Adapter
Tests" step to export the same embedding-related environment variables used
elsewhere (e.g., EMBEDDING_PROVIDER, EMBEDDING_API_KEY, EMBEDDING_MODEL or
whichever vars your embedding setup uses) so the embedding engine fixture can
initialize and write vectors; ensure the step that runs pytest includes those
EMBEDDING_* env entries matching the other CI steps that exercise embeddings and
vector writes.

In `@cognee/infrastructure/databases/hybrid/postgres/adapter.py`:
- Around line 433-467: The method delete_nodes_with_vectors currently only
deletes graph_node rows when collections is None; update it so when collections
is None it fetches all known vector collection table names (e.g., via the
adapter's registry or by querying the DB for the vector tables) and then runs
the same DELETE logic for each collection in the same transaction. Concretely,
inside delete_nodes_with_vectors (after await self._graph._ensure_initialized()
and before await session.commit()), determine the list of target collections
when collections is None, validate each name with _validate_table_name, and
execute the DELETE statement used for the explicit-collections path (DELETE FROM
{table} WHERE CAST(id AS text) = ANY(:ids)) for every collection so embeddings
are removed in the same session/transaction as the graph deletion.
- Around line 318-320: The code currently replaces the JSON-safe serialized
payload with the raw belongs_to_set (payload = serialize_data(dp.model_dump());
payload["belongs_to_set"] = dp.belongs_to_set), which can reintroduce
non-serializable nested DataPoint objects and break json.dumps(payload).
Instead, keep the result of serialize_data(dp.model_dump()) and set
payload["belongs_to_set"] to a JSON-safe representation (e.g., serialize each
entry in dp.belongs_to_set or call serialize_data on dp.belongs_to_set) so
payload remains JSON-serializable before calling json.dumps; update the
assignment around serialize_data, payload["belongs_to_set"], and any downstream
json.dumps usage accordingly.
- Around line 321-325: The INSERT into {table} that currently uses ON CONFLICT
(id) DO NOTHING leaves stale vectors when re-indexing; modify the upsert in the
SQL built around the INSERT INTO {table} (id, payload, vector) statement so that
ON CONFLICT (id) DO UPDATE sets at minimum the vector and payload columns to the
incoming values (use EXCLUDED.vector and EXCLUDED.payload, casting payload to
json as needed), ensuring reprocessed nodes replace existing embeddings/payloads
rather than being ignored; update the statement in adapter.py where that INSERT
is constructed.
- Around line 146-149: The PostgresHybridAdapter.override of
get_nodeset_subgraph is missing the node_name_filter_operator parameter required
by GraphDBInterface; update the async method signature of get_nodeset_subgraph
in PostgresHybridAdapter to accept node_name_filter_operator (with the same type
and default as the interface) and forward it to self._graph.get_nodeset_subgraph
so the adapter remains polymorphically compatible with GraphDBInterface.
- Around line 254-278: The embedding step is using
DataPoint.get_embeddable_data(dp) which ignores the grouped field_name and thus
writes the same embedding to every collection; change the logic in the loop over
vector_groups to use the field-specific value you stored (the second element of
each tuple) when producing embeddable text for each dp — e.g., call a
field-aware helper (pass field_name into DataPoint.get_embeddable_data or
extract the field via getattr(dp, field_name) and convert it to embeddable text)
and then pass those per-field texts to self._vector.embed_data so each
collection gets embeddings derived from its specific indexed field (refer to
vector_groups, valid_items, embeddings_by_collection, and
DataPoint.get_embeddable_data).

In `@cognee/tests/e2e/postgres/test_postgres_adapter.py`:
- Around line 389-424: The tests currently expect nested start_node/end_node
structures and ValueError validation, but PostgresAdapter.get_triplets_batch
(and other adapters) returns flat rows with keys like "source", "target", and
"relationship"; update the assertions in test_get_triplets_batch and
test_get_triplets_batch_offset to check for the flat shape (e.g., assert keys
"source"/"target"/"relationship" exist and their values match the added node
ids/relationship names) and remove or change the
test_get_triplets_batch_validation expectations that use
pytest.raises(ValueError) so they reflect the adapter's actual behavior (either
remove the raises checks or assert the observed behavior instead).
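The belongs_to_set fix in the list above can be sketched as a small helper that reduces entries to JSON-safe dicts before json.dumps; real entries would be DataPoint objects, and the id attribute is an assumption about their shape:

```python
import json


def json_safe_belongs_to_set(belongs_to_set):
    """Return a JSON-serializable representation of belongs_to_set, using an
    empty list (not None) when the field is unset."""
    if not belongs_to_set:
        return []
    return [{"id": str(getattr(item, "id", item))} for item in belongs_to_set]
```

Assigning this result into the serialized payload keeps json.dumps(payload) working even when nested objects appear in belongs_to_set.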

---

Duplicate comments:
In `@cognee/infrastructure/databases/hybrid/postgres/adapter.py`:
- Around line 491-503: The delete-from-Triplet_text block swallows unexpected
SQL errors (in the except else branch) which allows the transaction to commit;
modify the except handling around the try/except that calls
_validate_table_name("Triplet_text") and session.execute(...) so that non-“table
not found” errors are not only logged via logger.warning but are re-raised (or
wrapped and raised) to propagate the failure and abort the transaction; keep the
existing debug path for "does not exist"/"relation" but ensure any other
Exception is raised after logging.

In `@cognee/infrastructure/databases/unified/get_unified_engine.py`:
- Around line 76-92: The code builds the pghybrid DSN without verifying the DB
provider and database name; add a guard before forming connection_string that
checks the unified provider is 'pghybrid' (inspect the DB_PROVIDER /
USE_UNIFIED_PROVIDER env value used by your flow) and that
relational_config.db_name is set and truthy, and if not raise an
EnvironmentError with a clear message; update
get_relational_config()/relational_config validation block to verify DB_PROVIDER
== "pghybrid" (or equivalent flag) and relational_config.db_name before
composing connection_string so you don't produce a DSN ending in "/None" or
accept a non-Postgres engine.

In `@cognee/tests/e2e/postgres/test_pghybrid.py`:
- Around line 19-42: Move the module-level env overrides and cache clearing into
a test-scoped fixture instead of executing at import time: create a pytest
fixture (e.g., pghybrid_env_override) in this file that sets the env vars
(USE_UNIFIED_PROVIDER, ENABLE_BACKEND_ACCESS_CONTROL, DB_PROVIDER, DB_HOST,
DB_PORT, DB_USERNAME, DB_PASSWORD, DB_NAME) via monkeypatch.setenv (or
temporarily assign os.environ) and then calls
get_relational_config.cache_clear(), get_graph_config.cache_clear(),
create_relational_engine.cache_clear(), _create_graph_engine.cache_clear(), and
_create_vector_engine.cache_clear(); yield control to run the test and on
teardown restore original env and clear caches again—mark the fixture
autouse=True or explicitly use it in the tests so the overrides only run when
the test executes, not on module import.
ℹ️ Review info
⚙️ Run configuration

Configuration used: Path: .coderabbit.yaml

Review profile: CHILL

Plan: Pro

Run ID: 1657e176-d358-4a5d-96df-0974747c4c98

📥 Commits

Reviewing files that changed from the base of the PR and between b3203b3 and 03625fa.

📒 Files selected for processing (14)
  • .github/workflows/graph_db_tests.yml
  • cognee/infrastructure/databases/graph/get_graph_engine.py
  • cognee/infrastructure/databases/graph/graph_db_interface.py
  • cognee/infrastructure/databases/graph/postgres/adapter.py
  • cognee/infrastructure/databases/hybrid/postgres/adapter.py
  • cognee/infrastructure/databases/unified/get_unified_engine.py
  • cognee/tasks/temporal_awareness/index_graphiti_objects.py
  • cognee/tests/e2e/__init__.py
  • cognee/tests/e2e/postgres/__init__.py
  • cognee/tests/e2e/postgres/test_graphdb_postgres.py
  • cognee/tests/e2e/postgres/test_graphdb_shared.py
  • cognee/tests/e2e/postgres/test_pghybrid.py
  • cognee/tests/e2e/postgres/test_postgres_adapter.py
  • cognee/tests/e2e/postgres/test_postgres_hybrid_adapter.py
✅ Files skipped from review due to trivial changes (1)
  • cognee/tests/e2e/postgres/test_graphdb_postgres.py
🚧 Files skipped from review as they are similar to previous changes (3)
  • cognee/tasks/temporal_awareness/index_graphiti_objects.py
  • cognee/infrastructure/databases/graph/get_graph_engine.py
  • cognee/infrastructure/databases/graph/postgres/adapter.py

Comment on lines +433 to +467
    collections: Optional[Dict[str, List[str]]] = None,
) -> None:
    """Delete nodes from graph and their embeddings from vector tables
    in a single database transaction.

    Args:
        node_ids: IDs of nodes to delete.
        collections: Mapping of collection_name -> list of IDs to delete
            from that collection. If None, deletes node_ids from all
            known collections.
    """
    if not node_ids:
        return

    await self._graph._ensure_initialized()

    async with self._graph._session() as session:
        # Delete from graph (CASCADE removes edges)
        await session.execute(
            text("DELETE FROM graph_node WHERE id = ANY(:ids)"),
            {"ids": node_ids},
        )

        # Delete from vector collections
        if collections:
            for collection_name, ids in collections.items():
                if not ids:
                    continue
                table = _validate_table_name(collection_name)
                await session.execute(
                    text(f"DELETE FROM {table} WHERE CAST(id AS text) = ANY(:ids)"),
                    {"ids": ids},
                )

        await session.commit()

⚠️ Potential issue | 🟠 Major

delete_nodes_with_vectors() skips the vector side for the default call.

The docstring says collections=None should remove these IDs from all known vector collections, but the implementation only deletes from graph_node in that path. That leaves orphaned embeddings behind and desynchronizes graph and vector search.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@cognee/infrastructure/databases/hybrid/postgres/adapter.py` around lines 433
- 467, The method delete_nodes_with_vectors currently only deletes graph_node
rows when collections is None; update it so when collections is None it fetches
all known vector collection table names (e.g., via the adapter's registry or by
querying the DB for the vector tables) and then runs the same DELETE logic for
each collection in the same transaction. Concretely, inside
delete_nodes_with_vectors (after await self._graph._ensure_initialized() and
before await session.commit()), determine the list of target collections when
collections is None, validate each name with _validate_table_name, and execute
the DELETE statement used for the explicit-collections path (DELETE FROM {table}
WHERE CAST(id AS text) = ANY(:ids)) for every collection so embeddings are
removed in the same session/transaction as the graph deletion.
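The collections-is-None fallback this prompt calls for can be sketched as a pure helper that decides which collections to purge; the names and shapes are illustrative, and discovering known_collections (e.g. from a registry or information_schema) is left to the adapter:

```python
def resolve_delete_targets(node_ids, collections, known_collections):
    """Map each target vector collection to the IDs to delete from it.

    When collections is None, the node ids are removed from every known
    vector collection so embeddings stay in sync with the graph.
    """
    if collections is not None:
        return {name: ids for name, ids in collections.items() if ids}
    return {name: list(node_ids) for name in known_collections}
```

delete_nodes_with_vectors would then iterate this mapping and run the same validated DELETE for each table inside the one transaction.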

Comment on lines +389 to +424
    triplets = await adapter.get_triplets_batch(offset=0, limit=10)
    assert len(triplets) == 1
    assert triplets[0]["start_node"]["name"] == "Start"
    assert triplets[0]["end_node"]["name"] == "End"
    assert triplets[0]["relationship_properties"]["relationship_name"] == "CONNECTS"


@pytest.mark.asyncio
async def test_get_triplets_batch_offset(adapter):
    await adapter.add_nodes(
        [
            _FakeDataPoint(id="to1", name="A", type="T"),
            _FakeDataPoint(id="to2", name="B", type="T"),
            _FakeDataPoint(id="to3", name="C", type="T"),
        ]
    )
    await adapter.add_edges(
        [
            ("to1", "to2", "R1", {}),
            ("to1", "to3", "R2", {}),
        ]
    )

    all_triplets = await adapter.get_triplets_batch(offset=0, limit=10)
    assert len(all_triplets) == 2

    one_triplet = await adapter.get_triplets_batch(offset=1, limit=1)
    assert len(one_triplet) == 1


@pytest.mark.asyncio
async def test_get_triplets_batch_validation(adapter):
    with pytest.raises(ValueError):
        await adapter.get_triplets_batch(offset=-1, limit=10)
    with pytest.raises(ValueError):
        await adapter.get_triplets_batch(offset=0, limit=-1)

⚠️ Potential issue | 🟠 Major

These triplet assertions do not match the adapter contract.

PostgresAdapter.get_triplets_batch() in cognee/infrastructure/databases/graph/postgres/adapter.py returns flat {"source", "target", "relationship"} rows, and the Neo4j/Kuzu implementations in this PR follow the same shape. The nested start_node / end_node assertions and ValueError expectations here will fail even when the adapter behaves consistently.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@cognee/tests/e2e/postgres/test_postgres_adapter.py` around lines 389 - 424,
The tests currently expect nested start_node/end_node structures and ValueError
validation, but PostgresAdapter.get_triplets_batch (and other adapters) returns
flat rows with keys like "source", "target", and "relationship"; update the
assertions in test_get_triplets_batch and test_get_triplets_batch_offset to
check for the flat shape (e.g., assert keys "source"/"target"/"relationship"
exist and their values match the added node ids/relationship names) and remove
or change the test_get_triplets_batch_validation expectations that use
pytest.raises(ValueError) so they reflect the adapter's actual behavior (either
remove the raises checks or assert the observed behavior instead).
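A small sketch of the flat-shape check this prompt calls for; the key names come from the described adapter contract, and the sample row is made up:

```python
def assert_flat_triplet(row):
    """Assert a triplet row uses the flat source/target/relationship shape."""
    for key in ("source", "target", "relationship"):
        assert key in row, f"triplet row missing key: {key}"


assert_flat_triplet({"source": "to1", "target": "to2", "relationship": "R1"})
```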

Comment on lines +342 to +348
async def test_add_edges_with_vectors(adapter):
    """Edges should be inserted into graph and edge types into vector table.

    Note: this test passes in isolation but may fail when run in suite
    due to PGVectorAdapter connection pool state from prior tests.
    This is a pre-existing PGVectorAdapter issue, not a hybrid adapter bug.
    """

⚠️ Potential issue | 🟠 Major

Do not land a test that is already documented as suite-flaky.

Lines 345-347 say this case may fail when the module runs as a suite, and the workflow now executes the whole file with pytest. That makes the new CI path nondeterministic by design.

Problems:
- add_data_points split unified engine into separate graph and vector calls
- hybrid adapter's combined_write methods weren't called from the pipeline
- vector payload format did not match PGVectorAdapter's IndexSchema format

Solutions:
- added HYBRID_WRITE capability check to route nodes and edges atomically
- add_nodes_with_vectors and add_edges_with_vectors now called when hybrid
- non-hybrid backends fall back to existing separate graph and vector calls
- payload now uses IndexSchema and serialize_data identical to PGVectorAdapter
- updated test assertions to match IndexSchema payload format

Reasoning and Related Notes:
- Import and re-use IndexSchema to keep linkage with PGVectorAdapter

Problems:
- hybrid joint-add edge type payloads differed from PGVectorAdapter
- no test verified that hybrid and separate paths produce identical payloads

Solutions:
- hybrid edge type payloads now use IndexSchema and serialize_data
- added test inserting a 3-node 3-edge graph via both paths and comparing
- test asserts identical vector payload keys and values for nodes and edges

Reasoning and Related Notes:
- hybrid and separate paths do not share serialization code yet
- test will catch drift if either path changes independently
- didn't want to change PGVectorAdapter in this PR

@coderabbitai coderabbitai bot left a comment


Actionable comments posted: 2

♻️ Duplicate comments (5)
cognee/infrastructure/databases/hybrid/postgres/adapter.py (5)

326-330: ⚠️ Potential issue | 🟠 Major

Update existing node vectors instead of ignoring re-indexes.

ON CONFLICT (id) DO NOTHING leaves stale payload/vector rows behind when an existing datapoint is reprocessed after its indexed text changes.

Suggested fix
                         text(f"""
                             INSERT INTO {table} (id, payload, vector)
                             VALUES (:id, CAST(:payload AS json), :vector)
-                            ON CONFLICT (id) DO NOTHING
+                            ON CONFLICT (id) DO UPDATE SET
+                                payload = EXCLUDED.payload,
+                                vector = EXCLUDED.vector
                         """),
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@cognee/infrastructure/databases/hybrid/postgres/adapter.py` around lines 326
- 330, The INSERT currently uses "ON CONFLICT (id) DO NOTHING" which leaves
stale payloads/vectors when re-indexing; update the conflict clause in the SQL
executed by session.execute (the INSERT INTO {table} in this adapter) to perform
"ON CONFLICT (id) DO UPDATE" and set the payload and vector columns from the
incoming values (use EXCLUDED for the conflicting row values, e.g. set payload =
CAST(EXCLUDED.payload AS json), vector = EXCLUDED.vector) so existing rows are
overwritten on re-index rather than ignored.
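The behavioral difference the fix targets can be shown in miniature. This is a hedged sketch using SQLite, whose upsert syntax and `excluded` pseudo-table mirror the Postgres clauses in the diff; the table and column names are illustrative, not the adapter's actual schema:

```python
# Demonstrates DO NOTHING (stale row survives) vs DO UPDATE (row is refreshed).
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE graph_node_text (id TEXT PRIMARY KEY, payload TEXT)")
conn.execute("INSERT INTO graph_node_text VALUES ('n1', 'old payload')")

# DO NOTHING silently keeps the stale payload on re-index:
conn.execute(
    "INSERT INTO graph_node_text (id, payload) VALUES ('n1', 'new payload') "
    "ON CONFLICT(id) DO NOTHING"
)
stale = conn.execute("SELECT payload FROM graph_node_text WHERE id='n1'").fetchone()[0]

# DO UPDATE overwrites it with the incoming values via the excluded pseudo-table:
conn.execute(
    "INSERT INTO graph_node_text (id, payload) VALUES ('n1', 'new payload') "
    "ON CONFLICT(id) DO UPDATE SET payload = excluded.payload"
)
fresh = conn.execute("SELECT payload FROM graph_node_text WHERE id='n1'").fetchone()[0]
print(stale, fresh)  # old payload new payload
```

In the adapter itself the same `EXCLUDED` mechanism would also carry the `vector` column, as the suggested diff shows.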

438-475: ⚠️ Potential issue | 🟠 Major

Delete vectors when collections is omitted.

The docstring says collections=None removes these IDs from all known vector collections, but this implementation only deletes from graph_node in that path. That leaves orphaned embeddings behind and breaks graph/vector parity.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@cognee/infrastructure/databases/hybrid/postgres/adapter.py` around lines 438
- 475, The delete_nodes_with_vectors method currently only removes graph_node
rows when collections is None; update delete_nodes_with_vectors to, after
awaiting self._graph._ensure_initialized(), fetch the list of known vector
collections (use the existing mechanism in this adapter that lists collection
names), then iterate those collection names and run the same DELETE statement
(using _validate_table_name(collection_name) and the CAST/id ANY(:ids) pattern)
for each collection when collections is None; keep the existing behavior when
collections is provided, perform deletes inside the same async with
self._graph._session() transaction, and commit once at the end (preserve
session.execute and await session.commit usage).
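The core of that fix is just how the delete targets are resolved. A minimal sketch, assuming a `known_collections` listing exists on the adapter (name hypothetical):

```python
from typing import Iterable, List, Optional

def resolve_delete_targets(
    collections: Optional[Iterable[str]], known_collections: Iterable[str]
) -> List[str]:
    """Vector tables a delete should touch: all known ones when None is passed."""
    if collections is None:
        return list(known_collections)
    return list(collections)

# With collections=None, every known collection is swept, not just graph_node:
targets = resolve_delete_targets(None, ["graph_node_text", "Triplet_text"])
```

The real adapter would then run the same validated DELETE statement per target inside one transaction.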

499-513: ⚠️ Potential issue | 🟠 Major

Re-raise unexpected Triplet delete failures.

Lines 506-511 still swallow any non-missing-table error, then Line 513 commits the transaction as if the vector delete succeeded. That can silently desynchronize graph and vector state.

Suggested fix
                 except Exception as e:
                     error_msg = str(e).lower()
                     if "does not exist" in error_msg or "relation" in error_msg:
                         logger.debug("Triplet_text table not found, skipping: %s", e)
                     else:
                         logger.warning("Unexpected error deleting from Triplet_text: %s", e)
+                        raise
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@cognee/infrastructure/databases/hybrid/postgres/adapter.py` around lines 499
- 513, The current delete block for Triplet_text swallows non-missing-table
exceptions and still calls session.commit(), risking state desync; modify the
try/except around _validate_table_name and session.execute so that if error_msg
contains "does not exist" or "relation" you log.debug and continue, but for any
other Exception you log.warning and then re-raise the exception (or raise a new
one) so the caller can handle it and session.commit() is not reached;
alternatively move session.commit() inside the try block after a successful
session.execute to ensure commit only happens on successful delete.
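The error triage that prompt describes can be isolated into a small helper. A sketch, mirroring the adapter's existing substring heuristic for missing tables (assumed, not the adapter's exact code):

```python
def is_missing_table_error(exc: Exception) -> bool:
    """Heuristic: Postgres reports missing tables as 'relation ... does not exist'."""
    msg = str(exc).lower()
    return "does not exist" in msg or "relation" in msg

def handle_delete_error(exc: Exception) -> None:
    if is_missing_table_error(exc):
        return  # logger.debug and continue in the real adapter
    raise exc  # logger.warning, then propagate so session.commit() is never reached

skipped = is_missing_table_error(Exception('relation "Triplet_text" does not exist'))
```

Because unexpected errors propagate, the commit after the delete block only runs when the vector delete actually succeeded.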

256-280: ⚠️ Potential issue | 🟠 Major

Embed the indexed field, not the whole datapoint.

This block groups rows by field_name, but Line 270 still calls DataPoint.get_embeddable_data(dp). A datapoint with multiple index_fields will therefore write the same embedding into every collection instead of one embedding per indexed field.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@cognee/infrastructure/databases/hybrid/postgres/adapter.py` around lines 256
- 280, The code groups datapoints by collection using field_name but still calls
DataPoint.get_embeddable_data(dp) so the same embedding is used for every index
field; update the logic in the loop that builds valid_items (the block using
vector_groups, DataPoint.get_embeddable_data, texts, and _vector.embed_data) to
extract the specific field value for each (dp, field_name) pair (e.g., use
getattr(dp, field_name) or call a get_embeddable_data overload that accepts
field_name) and build texts from those per-field values so
embeddings_by_collection stores one embedding per indexed field/collection
rather than one per datapoint. Ensure references to collection, field_name,
valid_items, and embeddings_by_collection are updated accordingly.

148-151: ⚠️ Potential issue | 🟠 Major

Keep get_nodeset_subgraph() compatible with GraphDBInterface.

Line 149 drops node_name_filter_operator, so any polymorphic caller using the interface contract can raise TypeError when the hybrid adapter is substituted.

Suggested fix
     async def get_nodeset_subgraph(
-        self, node_type: Type[Any], node_name: List[str]
+        self,
+        node_type: Type[Any],
+        node_name: List[str],
+        node_name_filter_operator: str = "OR",
     ) -> Tuple[List[Tuple[str, dict]], List[Tuple[str, str, str, dict]]]:
-        return await self._graph.get_nodeset_subgraph(node_type, node_name)
+        return await self._graph.get_nodeset_subgraph(
+            node_type, node_name, node_name_filter_operator
+        )

Based on learnings "Implement database adapters by extending GraphDBInterface or VectorDBInterface".

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@cognee/infrastructure/databases/hybrid/postgres/adapter.py` around lines 148
- 151, The adapter method get_nodeset_subgraph currently omits the
node_name_filter_operator parameter required by GraphDBInterface; update the
signature of
cognee.infrastructure.databases.hybrid.postgres.adapter.PostgresHybridAdapter.get_nodeset_subgraph
to accept the node_name_filter_operator parameter (use the same default/typing
as the interface) and forward it to self._graph.get_nodeset_subgraph so callers
using the GraphDBInterface contract remain compatible; ensure the parameter is
passed through in the await call and adjust any type hints/imports to match
GraphDBInterface.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:
In `@cognee/infrastructure/databases/hybrid/postgres/adapter.py`:
- Around line 570-583: The query multiplies nodes by their incident edges
because LEFT JOIN graph_edge is applied before LIMIT, causing duplicates and
incorrect ordering; fix by first selecting the top-N unique vector rows (compute
vc.vector <=> CAST(:query_vector AS vector) AS distance from the {table} vc,
ORDER BY distance LIMIT :limit) in a subquery (alias e.g. top_vectors) and then
join that subquery to graph_node (gn) and LEFT JOIN graph_edge (ge) — or
alternatively use DISTINCT ON (gn.id) with ORDER BY gn.id, distance to
deduplicate by gn.id — ensure the final ORDER BY and LIMIT operate over unique
gn.id (use MIN(distance) per node if aggregating) so the result returns unique
nodes with correct distance ranking.

In `@cognee/tasks/storage/add_data_points.py`:
- Around line 151-154: In hybrid mode we must only write embeddings to the
vector store and still perform the graph bookkeeping via upsert_nodes, instead
of calling graph_engine.add_nodes_with_vectors which inserts into graph_node;
change the use_hybrid branch so it (1) calls index_data_points(triplets,
vector_engine=vector_engine) to write vectors and (2) invokes
graph_engine.upsert_nodes(...) (or the existing node-upsert helper used
elsewhere) to do the graph bookkeeping using the Triplet/node metadata — do not
call add_nodes_with_vectors in this path.

---

Duplicate comments:
In `@cognee/infrastructure/databases/hybrid/postgres/adapter.py`:
- Around line 326-330: The INSERT currently uses "ON CONFLICT (id) DO NOTHING"
which leaves stale payloads/vectors when re-indexing; update the conflict clause
in the SQL executed by session.execute (the INSERT INTO {table} in this adapter)
to perform "ON CONFLICT (id) DO UPDATE" and set the payload and vector columns
from the incoming values (use EXCLUDED for the conflicting row values, e.g. set
payload = CAST(EXCLUDED.payload AS json), vector = EXCLUDED.vector) so existing
rows are overwritten on re-index rather than ignored.
- Around line 438-475: The delete_nodes_with_vectors method currently only
removes graph_node rows when collections is None; update
delete_nodes_with_vectors to, after awaiting self._graph._ensure_initialized(),
fetch the list of known vector collections (use the existing mechanism in this
adapter that lists collection names), then iterate those collection names and
run the same DELETE statement (using _validate_table_name(collection_name) and
the CAST/id ANY(:ids) pattern) for each collection when collections is None;
keep the existing behavior when collections is provided, perform deletes inside
the same async with self._graph._session() transaction, and commit once at the
end (preserve session.execute and await session.commit usage).
- Around line 499-513: The current delete block for Triplet_text swallows
non-missing-table exceptions and still calls session.commit(), risking state
desync; modify the try/except around _validate_table_name and session.execute so
that if error_msg contains "does not exist" or "relation" you log.debug and
continue, but for any other Exception you log.warning and then re-raise the
exception (or raise a new one) so the caller can handle it and session.commit()
is not reached; alternatively move session.commit() inside the try block after a
successful session.execute to ensure commit only happens on successful delete.
- Around line 256-280: The code groups datapoints by collection using field_name
but still calls DataPoint.get_embeddable_data(dp) so the same embedding is used
for every index field; update the logic in the loop that builds valid_items (the
block using vector_groups, DataPoint.get_embeddable_data, texts, and
_vector.embed_data) to extract the specific field value for each (dp,
field_name) pair (e.g., use getattr(dp, field_name) or call a
get_embeddable_data overload that accepts field_name) and build texts from those
per-field values so embeddings_by_collection stores one embedding per indexed
field/collection rather than one per datapoint. Ensure references to collection,
field_name, valid_items, and embeddings_by_collection are updated accordingly.
- Around line 148-151: The adapter method get_nodeset_subgraph currently omits
the node_name_filter_operator parameter required by GraphDBInterface; update the
signature of
cognee.infrastructure.databases.hybrid.postgres.adapter.PostgresHybridAdapter.get_nodeset_subgraph
to accept the node_name_filter_operator parameter (use the same default/typing
as the interface) and forward it to self._graph.get_nodeset_subgraph so callers
using the GraphDBInterface contract remain compatible; ensure the parameter is
passed through in the await call and adjust any type hints/imports to match
GraphDBInterface.

ℹ️ Review info
⚙️ Run configuration

Configuration used: Path: .coderabbit.yaml

Review profile: CHILL

Plan: Pro

Run ID: 032e51ff-f38f-4ed3-b337-7c1683ed47a5

📥 Commits

Reviewing files that changed from the base of the PR and between 03625fa and 3317006.

📒 Files selected for processing (3)
  • cognee/infrastructure/databases/hybrid/postgres/adapter.py
  • cognee/tasks/storage/add_data_points.py
  • cognee/tests/e2e/postgres/test_postgres_hybrid_adapter.py
✅ Files skipped from review due to trivial changes (1)
  • cognee/tests/e2e/postgres/test_postgres_hybrid_adapter.py

Comment on lines +570 to +583
sql = text(f"""
SELECT
gn.id AS node_id,
gn.name AS node_name,
gn.type AS node_type,
ge.relationship_name,
ge.source_id,
ge.target_id,
vc.vector <=> CAST(:query_vector AS vector) AS distance
FROM graph_node gn
JOIN {table} vc ON CAST(vc.id AS text) = gn.id
LEFT JOIN graph_edge ge ON ge.source_id = gn.id OR ge.target_id = gn.id
ORDER BY distance
LIMIT :limit

⚠️ Potential issue | 🟠 Major

Return unique nodes from the all-node hybrid search.

The LEFT JOIN graph_edge multiplies each node by its incident-edge count. ORDER BY distance LIMIT :limit is therefore applied to joined rows, not unique nodes, so this path can return duplicates and miss equally relevant nodes compared with the existing vector-search flow.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@cognee/infrastructure/databases/hybrid/postgres/adapter.py` around lines 570
- 583, The query multiplies nodes by their incident edges because LEFT JOIN
graph_edge is applied before LIMIT, causing duplicates and incorrect ordering;
fix by first selecting the top-N unique vector rows (compute vc.vector <=>
CAST(:query_vector AS vector) AS distance from the {table} vc, ORDER BY distance
LIMIT :limit) in a subquery (alias e.g. top_vectors) and then join that subquery
to graph_node (gn) and LEFT JOIN graph_edge (ge) — or alternatively use DISTINCT
ON (gn.id) with ORDER BY gn.id, distance to deduplicate by gn.id — ensure the
final ORDER BY and LIMIT operate over unique gn.id (use MIN(distance) per node
if aggregating) so the result returns unique nodes with correct distance
ranking.
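The fan-out and its fix are easy to model without a database: rank and LIMIT over unique nodes (minimum distance per node) before re-attaching edges. A pure-Python sketch with illustrative row shapes, not the adapter's actual result schema:

```python
from typing import Dict, List, Tuple

def top_unique_nodes(rows: List[Tuple[str, float]], limit: int) -> List[Tuple[str, float]]:
    """rows are (node_id, distance) pairs, possibly duplicated by the edge join."""
    best: Dict[str, float] = {}
    for node_id, distance in rows:
        if node_id not in best or distance < best[node_id]:
            best[node_id] = distance
    ranked = sorted(best.items(), key=lambda kv: kv[1])
    return ranked[:limit]

# Node "a" has three incident edges, so the naive join emits it three times
# and a joined-row LIMIT 2 would return only "a" twice, hiding "b":
joined = [("a", 0.1), ("a", 0.1), ("a", 0.1), ("b", 0.2), ("c", 0.3)]
top2 = top_unique_nodes(joined, 2)  # [("a", 0.1), ("b", 0.2)]
```

In SQL this corresponds to the suggested top-N vector subquery (or `DISTINCT ON (gn.id)`), with the edge LEFT JOIN applied only after the unique-node ranking.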

Comment on lines +151 to +154
if use_hybrid:
await graph_engine.add_nodes_with_vectors(triplets)
else:
await index_data_points(triplets, vector_engine=vector_engine)

⚠️ Potential issue | 🟠 Major

Keep triplet embedding writes vector-only in hybrid mode.

The non-hybrid branch only indexes Triplet objects in the vector store, but Line 152 sends them through add_nodes_with_vectors(), which also inserts them into graph_node. That changes graph contents and skips the earlier upsert_nodes(...) bookkeeping, so hybrid mode is no longer equivalent to the existing backends.

Suggested fix
         if triplets:
-            if use_hybrid:
-                await graph_engine.add_nodes_with_vectors(triplets)
-            else:
-                await index_data_points(triplets, vector_engine=vector_engine)
+            await index_data_points(triplets, vector_engine=vector_engine)
             logger.info(f"Created and indexed {len(triplets)} triplets from graph structure")
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@cognee/tasks/storage/add_data_points.py` around lines 151 - 154, In hybrid
mode we must only write embeddings to the vector store and still perform the
graph bookkeeping via upsert_nodes, instead of calling
graph_engine.add_nodes_with_vectors which inserts into graph_node; change the
use_hybrid branch so it (1) calls index_data_points(triplets,
vector_engine=vector_engine) to write vectors and (2) invokes
graph_engine.upsert_nodes(...) (or the existing node-upsert helper used
elsewhere) to do the graph bookkeeping using the Triplet/node metadata — do not
call add_nodes_with_vectors in this path.

…t routing.

Problems:
- hybrid adapter CI step missing embedding API secrets
- get_nodeset_subgraph override drops node_name_filter_operator parameter
- vector ON CONFLICT DO NOTHING leaves stale embeddings on reprocess
- delete_nodes_with_vectors docstring promised all-collection deletion
- hybrid path inserted triplets as graph nodes instead of vector-only

Solutions:
- added EMBEDDING_* env vars to hybrid adapter CI step
- added node_name_filter_operator to get_nodeset_subgraph matching interface
- changed both node and edge vector inserts to ON CONFLICT DO UPDATE
- corrected docstring to reflect graph-only behavior when collections is None
- triplets now always use vector-only index_data_points regardless of backend
@DanielNScott
Author

Ok, I've gone through the review items below and tweaked things, fixed them, etc. There were a number of minor points; most of 4-20 were cosmetic. A few items in 21-25 were important. 1-3 are from @dexters1. I also fixed the routing issue we discussed earlier @lxobr. The "joint" (as opposed to separate) graph and vector routing was accidentally never turned on (!), and I added a test verifying that the joint path and the separated paths through the hybrid adapter produce completely equivalent results.

Review items:

  1. Clarify relationship between pghybrid and existing pgvector code paths
  2. Batch insert rows instead of row-by-row for performance
  3. Move postgres-dependent tests out of unit tests into separate folder
  4. Wire pghybrid tests into CI via graph_db_tests.yml
  5. Stabilize ORDER BY in get_triplets_batch pagination
  6. Filter None values from get_embeddable_data before embedding
  7. Add defensive split for collection names lacking underscore
  8. Narrow bare except Exception in Triplet_text deletion
  9. Fail fast on incomplete pghybrid relational config
  10. Reject all non-Neo4j backends in graphiti, not just postgres
  11. Add get_triplets_batch to GraphDBInterface
  12. Move module-scope env mutation in test_pghybrid.py into main()
  13. Fix edge property assertion using or instead of and
  14. Use named column access instead of positional row indexes
  15. Add type hints to hybrid adapter constructor
  16. Move test_graphdb_shared.py to integration area
  17. Move SearchTypeNotSupported import outside conditional
  18. Remove unused import asyncio
  19. Relax fragile history count assertion from == 6 to >= 3
  20. Shorten long f-string in get_graph_engine error message
  21. Add embedding env vars to hybrid adapter CI step
  22. Add missing node_name_filter_operator to get_nodeset_subgraph override
  23. Change vector inserts from ON CONFLICT DO NOTHING to DO UPDATE
  24. Fix delete_nodes_with_vectors docstring promising all-collection deletion
  25. Fix hybrid path inserting triplets as graph nodes instead of vector-only

Resolutions:

  1. Left as-is; prevents misconfiguration when code calls get_vector_engine directly under pghybrid
  2. Done. Already addressed in prior commit
  3. Done. Created cognee/tests/e2e/postgres/ and moved postgres test files there
  4. Done. Added CI steps and upgraded service image to pgvector/pgvector:pg17
  5. Done. Added e.relationship_name to ORDER BY clause
  6. Done. Added None filtering before calling embed_data
  7. Skipped; the underscore is guaranteed by the f-string 16 lines above
  8. Done. Except now logs the error and distinguishes missing-table from unexpected failures
  9. Done. Relational config fields are validated before building the connection string
  10. Done. Replaced exclusions list with isinstance(Neo4jAdapter) check
  11. Done. Added non-abstract method on GraphDBInterface with NotImplementedError default
  12. Left as-is; the file is now in e2e/postgres/, off the pytest sweep path. Import-order constraints would make moving the env mutation fragile.
  13. Done. Split into two assertions so the check enforces AND logic
  14. Done. Used result.mappings() to access columns by name as suggested
  15. Done. Added TYPE_CHECKING imports and parameter annotations
  16. Done. Already addressed by item 3; file now lives alongside its consumers
  17. Done. Import is at module scope
  18. Done. Import removed
  19. Declined; the assertion is intentional and documented with a comment
  20. Done. Extracted provider list to a variable as suggested
  21. Done. Added EMBEDDING_* secrets and ENV to hybrid adapter CI step
  22. Done. Parameter and passthrough added matching GraphDBInterface contract
  23. Done. Both node and edge vector inserts now upsert on conflict
  24. Done. Corrected docstring; callers must supply collection info explicitly
  25. Done. Triplets now use index_data_points regardless of hybrid capability
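Item 5 (the ORDER BY tie-breaker) deserves a word on why it matters for pagination. Without a secondary sort key, rows that tie on the primary key have no guaranteed order, so page boundaries can shift between queries and paginated reads may repeat or skip rows. A small model of this, with illustrative field names:

```python
# Adding relationship_name as a secondary key makes the ordering total,
# so page boundaries are deterministic across repeated queries.
rows = [
    {"source_id": "a", "relationship_name": "knows"},
    {"source_id": "a", "relationship_name": "likes"},
    {"source_id": "b", "relationship_name": "knows"},
]

def paginate(rows, page_size):
    ordered = sorted(rows, key=lambda r: (r["source_id"], r["relationship_name"]))
    return [ordered[i : i + page_size] for i in range(0, len(ordered), page_size)]

pages = paginate(rows, 2)
flat = [r for page in pages for r in page]
```

Sorting on `source_id` alone would leave the two "a" rows free to swap between runs; the composite key pins them.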
