RDF, cleanup relations and remove unnecessary bindings, add distributed mode for RDF reindex #26902
❌ Lint Check Failed: ESLint + Prettier + Organise Imports (src)
The following files have style issues that need to be fixed:
Fix locally (fast, only for changed files in the branch): make ui-checkstyle-src-changed
Or to fix all files: make ui-checkstyle-src
✅ TypeScript Types Auto-Updated
The generated TypeScript types have been automatically updated based on JSON schema changes in this PR.
openmetadata-service/src/main/java/org/openmetadata/service/rdf/RdfRepository.java
.../java/org/openmetadata/service/apps/bundles/rdf/distributed/DistributedRdfIndexExecutor.java
}

try {
💡 Edge Case: RDF store clear removed but recreateIndex config still exists
The code that cleared the RDF store before re-indexing (rdfRepository.clearAll()) was removed from reIndexFromStartToEnd() (line 294-296 deleted). However, the recreateIndex configuration option still exists in both the JSON schema (rdfIndexingAppConfig.json line 103-108) and the UI schema (RdfIndexApp.json line 90-95). If a user enables recreateIndex, the setting is silently ignored, leading to stale data remaining in the RDF store during a full reindex.
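One way to keep the option meaningful is to gate the clear on the flag instead of removing it outright. The following is a minimal sketch, not the PR's code: `RdfStore` and `RdfReindexSketch` are hypothetical stand-ins, and only the `clearAll()` call and the `recreateIndex` flag come from the comment above.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical stand-in for the RDF repository; only clearAll() is modelled. */
interface RdfStore {
  void clearAll();
}

final class RdfReindexSketch {
  private RdfReindexSketch() {}

  /**
   * Clears the store only when recreateIndex is set, then runs the reindex.
   * Returns the steps taken so the behaviour is easy to check.
   */
  static List<String> reIndexFromStartToEnd(boolean recreateIndex, RdfStore store) {
    List<String> steps = new ArrayList<>();
    if (recreateIndex) {
      store.clearAll(); // honour the config flag instead of silently ignoring it
      steps.add("cleared");
    }
    steps.add("indexed"); // placeholder for the actual indexing work
    return steps;
  }
}
```

If clearing the store is no longer wanted at all, the alternative is to drop `recreateIndex` from both schemas so the UI does not offer a setting that has no effect.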
Pull request overview
This PR extends the RDF knowledge graph/reindexing feature set by adding UI/schema support for RDF indexing configuration (including distributed mode), improving runtime behavior around app pages, and wiring RDF job status updates through WebSockets.
Changes:
- Added a new RDF indexing application schema and made application-run configuration UI resilient to missing schemas.
- Extended RDF graph explore API types/params to support entity/relationship filtering and filter options in responses.
- Added an RDF WebSocket broadcast channel and subscribed the App Runs History UI to RDF job status updates; optimized Airflow status fetching based on route.
Reviewed changes
Copilot reviewed 42 out of 43 changed files in this pull request and generated 6 comments.
| File | Description |
|---|---|
| openmetadata-ui/src/main/resources/ui/src/utils/ApplicationSchemas/RdfIndexApp.json | New UI schema for RDF indexing configuration (incl. distributed options). |
| openmetadata-ui/src/main/resources/ui/src/rest/rdfAPI.ts | Adds typed graph filter params/options and updates entity-graph API signature. |
| openmetadata-ui/src/main/resources/ui/src/components/KnowledgeGraph/KnowledgeGraph.interface.ts | Adds filter option types to KnowledgeGraph data model. |
| openmetadata-ui/src/main/resources/ui/src/context/AirflowStatusProvider/AirflowStatusProvider.tsx | Skips Airflow status fetch on routes where it’s not needed. |
| openmetadata-ui/src/main/resources/ui/src/context/AirflowStatusProvider/AirflowStatusProvider.test.tsx | Adds route-aware tests via MemoryRouter and validates skipping behavior. |
| openmetadata-ui/src/main/resources/ui/src/constants/constants.ts | Adds RDF_INDEX_JOB_BROADCAST_CHANNEL socket event constant. |
| openmetadata-ui/src/main/resources/ui/src/components/Settings/Applications/AppSchedule/AppScheduleProps.interface.ts | Makes jsonSchema optional to support apps without a schema. |
| openmetadata-ui/src/main/resources/ui/src/components/Settings/Applications/AppRunsHistory/AppRunsHistory.interface.ts | Makes jsonSchema optional to support schema-less runs/history UI. |
| openmetadata-ui/src/main/resources/ui/src/components/Settings/Applications/AppRunsHistory/AppRunsHistory.component.tsx | Disables config UI when schema missing and subscribes to RDF job WebSocket updates. |
| openmetadata-ui/src/main/resources/ui/src/components/Settings/Applications/AppRunsHistory/AppRunsHistory.test.tsx | Adds WebSocket mocking, schema-unavailable test, and RDF channel subscription test. |
| openmetadata-ui/src/main/resources/ui/src/components/Settings/Applications/AppDetails/AppDetails.component.tsx | Handles missing schema imports by clearing schema and showing a toast. |
| openmetadata-ui/src/main/resources/ui/src/components/Settings/Applications/AppDetails/ApplicationsClassBase.test.ts | Verifies RDF schema import works and includes new distributed fields. |
| openmetadata-service/src/main/java/org/openmetadata/service/apps/bundles/rdf/distributed/DistributedRdfIndexCoordinator.java | Implements distributed RDF indexing coordinator flow (partition claim/aggregate refresh). |
| openmetadata-service/src/main/java/org/openmetadata/service/OpenMetadataApplication.java | Registers the distributed RDF job participant at application startup. |
public RdfIndexPartition claimNextPartition(UUID jobId) {
  long claimAt = (System.currentTimeMillis() * 1000) + claimCounter.incrementAndGet();
  int updated =
      collectionDAO
          .rdfIndexPartitionDAO()
          .claimNextPartitionAtomic(jobId.toString(), serverId, claimAt);
  if (updated <= 0) {
    return null;
  }

  RdfIndexPartitionRecord record =
      collectionDAO
          .rdfIndexPartitionDAO()
          .findLatestClaimedPartition(jobId.toString(), serverId, claimAt);
  return record != null ? toPartition(record) : null;
claimAt is being computed as System.currentTimeMillis() * 1000 + counter and passed as the :now parameter to claimNextPartitionAtomic, which sets claimedAt/startedAt/lastUpdateAt to that value. This inflates timestamps by 1000× and will break stale-heartbeat detection and any time-based reporting. Use a real epoch-millis now for timestamp columns, and if you need a unique claim token for findLatestClaimedPartition, generate it separately (or change the lookup query to not require exact claimedAt equality).
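A minimal sketch of the separation suggested above, assuming the partition row could carry a dedicated claim token (a hypothetical column, not something this PR defines). `PartitionClaimSketch` and `Claim` are illustrative names; the point is only that the timestamp stays a real epoch-millis value while uniqueness comes from a different field.

```java
import java.util.UUID;

final class PartitionClaimSketch {
  private PartitionClaimSketch() {}

  /** Pairs an unmodified epoch-millis timestamp with an independent lookup token. */
  static final class Claim {
    final long claimedAtMillis; // safe to store in claimedAt/startedAt/lastUpdateAt
    final String claimToken; // used only to find the row this server just claimed

    Claim(long claimedAtMillis, String claimToken) {
      this.claimedAtMillis = claimedAtMillis;
      this.claimToken = claimToken;
    }
  }

  /** Builds a claim without folding the uniqueness counter into the timestamp. */
  static Claim newClaim(long nowMillis) {
    return new Claim(nowMillis, UUID.randomUUID().toString());
  }
}
```

With this shape the claim query would set the timestamp columns from `claimedAtMillis` and the follow-up lookup would match on `claimToken`, so stale-heartbeat checks keep comparing like with like.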
if (status == IndexJobStatus.COMPLETED
    || status == IndexJobStatus.COMPLETED_WITH_ERRORS
    || status == IndexJobStatus.FAILED
    || status == IndexJobStatus.STOPPED) {
refreshAggregatedJob() sets completedAt(now) every time a terminal status is observed. This can rewrite the original completion timestamp on subsequent refreshes (e.g., other partitions finishing later, retries, or periodic refresh). Consider only setting completedAt if it is currently null, or persisting the first time the job becomes terminal so completion time remains stable.
-if (status == IndexJobStatus.COMPLETED
-    || status == IndexJobStatus.COMPLETED_WITH_ERRORS
-    || status == IndexJobStatus.FAILED
-    || status == IndexJobStatus.STOPPED) {
+if (completedAt == null
+    && (status == IndexJobStatus.COMPLETED
+        || status == IndexJobStatus.COMPLETED_WITH_ERRORS
+        || status == IndexJobStatus.FAILED
+        || status == IndexJobStatus.STOPPED)) {
CollectionDAO collectionDAO = jdbi.onDemand(CollectionDAO.class);
RdfDistributedJobParticipant participant = new RdfDistributedJobParticipant(collectionDAO);
environment.lifecycle().manage(participant);
The catch block logs only ex.getMessage() and drops the stack trace, which makes diagnosing startup failures in distributed RDF job participant registration difficult. Log the exception object as well (e.g., LOG.warn(..., ex)) or rethrow if registration is required for correctness.
openmetadata-ui/src/main/resources/ui/src/components/KnowledgeGraph/KnowledgeGraph.interface.ts
try {
  const schema = await applicationsClassBase.importSchema(fqn);
  setJsonSchema(schema);
} catch (_) {
  setJsonSchema(undefined);
  showErrorToast(
    t('message.no-application-schema-found', { appName: fqn })
  );
}
The schema import error is swallowed (catch (_)) and always shows the generic "no schema" toast. If the dynamic import fails for other reasons (e.g., chunk load error), this message can be misleading and makes debugging harder. Consider capturing the error and either passing it to showErrorToast (with the fallback message) or at least logging it for diagnosis.
    ]
  },
  "default": [
    "all"
  ],
  "uiFieldType": "treeSelect",
  "uniqueItems": true
entities.items.enum does not include the value "all", but entities.default is set to ["all"]. This makes the default invalid against the schema and can break form defaults/validation. Either add "all" to the enum or change the default to valid enum values.
🔴 Playwright Results: 12 failure(s), 42 flaky
✅ 3410 passed · ❌ 12 failed · 🟡 42 flaky · ⏭️ 226 skipped
Genuine Failures (failed on all attempts) ❌
  const schema = await applicationsClassBase.importSchema(fqn);
  setJsonSchema(schema);
} catch (error) {
  console.error(`Failed to load application schema for ${fqn}`, error);
Check failure
Code scanning / CodeQL
Use of externally-controlled format string High
Copilot Autofix
AI 39 minutes ago
General fix: ensure that untrusted input is never used as the format string for logging functions. Either (a) do not use it in the first argument when there are extra arguments, or (b) separate static text (format string) from dynamic values using %s placeholders or multiple arguments, so any % in user input is not treated as a format specifier.
Best fix here without changing existing functionality: adjust the console.error call at line 116 in AppDetails.component.tsx so that the first argument is a static, trusted format string, and fqn plus the error object are passed as additional arguments. The visible log message remains effectively the same, but any % characters in fqn are safely treated as data. For example:
console.error('Failed to load application schema for %s', fqn, error);
This change is localized to the fetchAppDetails function in AppDetails.component.tsx. No new imports are required, and no behavior outside logging format is altered. All other files (useRequiredParams.ts, useFqn.ts) do not need modification for this specific issue.
@@ -113,7 +113,7 @@
       const schema = await applicationsClassBase.importSchema(fqn);
       setJsonSchema(schema);
     } catch (error) {
-      console.error(`Failed to load application schema for ${fqn}`, error);
+      console.error('Failed to load application schema for %s', fqn, error);
       setJsonSchema(undefined);
       showErrorToast(
         t('message.no-application-schema-found', { appName: fqn })
try {
  boolean containsAll = jobData.getEntities().contains(ALL);
  if (containsAll) {
  jobData.setEntities(getAll());
  jobData.setEntities(resolveEntityTypes(jobData.getEntities()));
  if (jobData.getEntities().isEmpty()) {
    throw new IllegalStateException(
        "No repository-backed entity types configured for RDF indexing");
  }
resolveEntityTypes() returns an empty set when jobData.getEntities() is null/empty, and execute() then throws "No repository-backed entity types...". This contradicts the UI/spec schema and default app configs in this PR which set entities: [] to mean "index all supported entities"; as-is, the default/scheduled RDF index job will fail immediately. Consider treating null/empty entities as ALL (e.g., populate with Entity.getEntityList() or call getAll()), and only error when the resolved set is empty after filtering unsupported entities.
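The behaviour described above could look roughly like the sketch below. It is an illustration under assumptions, not the PR's implementation: `EntityTypeResolverSketch` is a hypothetical class, and the `supported` parameter stands in for whatever the service uses to list repository-backed entity types (for example `Entity.getEntityList()` or `getAll()`, as the comment suggests).

```java
import java.util.Collection;
import java.util.LinkedHashSet;
import java.util.Set;

final class EntityTypeResolverSketch {
  static final String ALL = "all";

  private EntityTypeResolverSketch() {}

  /**
   * Treats null, empty, or a list containing "all" as "index every supported type",
   * filters out unsupported types, and fails only if nothing is left afterwards.
   */
  static Set<String> resolveEntityTypes(Collection<String> requested, Set<String> supported) {
    boolean indexAll = requested == null || requested.isEmpty() || requested.contains(ALL);
    Collection<String> source = indexAll ? supported : requested;
    Set<String> resolved = new LinkedHashSet<>(source);
    resolved.retainAll(supported); // drop types that have no repository behind them
    if (resolved.isEmpty()) {
      throw new IllegalStateException(
          "No repository-backed entity types configured for RDF indexing");
    }
    return resolved;
  }
}
```

With this ordering, a default config of `entities: []` resolves to the full supported set, and the exception is reserved for requests that name only unsupported types.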
private String buildErrorResponse(String message) {
  return String.format("{\"error\": \"%s\"}", message);
}
buildErrorResponse() interpolates message into a JSON string without escaping. If the message contains quotes/newlines (or user-controlled data), the response becomes invalid JSON and could enable response injection. Prefer building the JSON with an object mapper (or returning a Map/POJO) and set the response MediaType.APPLICATION_JSON.
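To show why escaping matters, here is a stdlib-only sketch that escapes the message before embedding it. It is a stand-in for the object-mapper approach recommended above, which would be the more natural choice in the service itself; `ErrorResponseSketch` and `escapeJson` are hypothetical names introduced for this illustration.

```java
final class ErrorResponseSketch {
  private ErrorResponseSketch() {}

  /** Escapes quotes, backslashes, and control characters for use inside a JSON string. */
  static String escapeJson(String value) {
    StringBuilder sb = new StringBuilder(value.length() + 16);
    for (int i = 0; i < value.length(); i++) {
      char c = value.charAt(i);
      switch (c) {
        case '"':
          sb.append("\\\"");
          break;
        case '\\':
          sb.append("\\\\");
          break;
        case '\n':
          sb.append("\\n");
          break;
        case '\r':
          sb.append("\\r");
          break;
        case '\t':
          sb.append("\\t");
          break;
        default:
          if (c < 0x20) {
            // Remaining control characters use the four-digit hex escape form
            sb.append(String.format("\\u%04x", (int) c));
          } else {
            sb.append(c);
          }
      }
    }
    return sb.toString();
  }

  /** Builds the error body with the message escaped; null is treated as empty. */
  static String buildErrorResponse(String message) {
    return "{\"error\": \"" + escapeJson(message == null ? "" : message) + "\"}";
  }
}
```

A message such as `bad "x"` followed by a newline then stays inside the `error` string instead of terminating it early.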
"auditLog",
"webAnalyticEvent",
"entityUsage",
"eventSubscription",
EXCLUDED_RELATIONSHIP_ENTITY_TYPES includes "eventSubscription", but the canonical entity type string is "eventsubscription" (see Entity.EVENT_SUBSCRIPTION). As written, event subscription relationships won't be excluded and will add noisy edges to the RDF graph. Update the constant to use the canonical entity type string.
-"eventSubscription",
+"eventsubscription",
package org.openmetadata.service.apps.bundles.rdf;

import java.util.ArrayList;
This new Java file is missing the standard Apache 2.0 license header block that other OpenMetadata service classes include (e.g., .../searchIndex/distributed/DistributedJobParticipant.java). Add the header to avoid license/format checks failing in CI.
package org.openmetadata.service.apps.bundles.rdf.distributed;

import java.util.List;
This new Java file is missing the standard Apache 2.0 license header block that other OpenMetadata service classes include (e.g., .../searchIndex/distributed/DistributedJobParticipant.java). Add the header to avoid license/format checks failing in CI.
package org.openmetadata.service.apps.bundles.rdf.distributed;

import java.util.Map;
This new Java file is missing the standard Apache 2.0 license header block that other OpenMetadata service classes include (e.g., .../searchIndex/distributed/DistributedJobParticipant.java). Add the header to avoid license/format checks failing in CI.
package org.openmetadata.service.apps.bundles.rdf.distributed;

import org.openmetadata.schema.system.EntityStats;
This new Java file is missing the standard Apache 2.0 license header block that other OpenMetadata service classes include (e.g., .../searchIndex/distributed/DistributedJobParticipant.java). Add the header to avoid license/format checks failing in CI.
package org.openmetadata.service.apps.bundles.rdf.distributed;

import io.dropwizard.lifecycle.Managed;
This new Java file is missing the standard Apache 2.0 license header block that other OpenMetadata service classes include (e.g., .../searchIndex/distributed/DistributedJobParticipant.java). Add the header to avoid license/format checks failing in CI.
var mockSocket = {
  on: jest.fn(),
  off: jest.fn(),
};
var is used for mockSocket here; elsewhere in the UI test suite the pattern is const/let. Using var is function-scoped and can lead to accidental reassignments across tests. Switch this to const (or let if you truly need reassignment).
try {
  const schema = await applicationsClassBase.importSchema(fqn);
  setJsonSchema(schema);
} catch (error) {
  console.error(`Failed to load application schema for ${fqn}`, error);
  setJsonSchema(undefined);
This console.error will violate the repo's no-console ESLint rule (errors on any console usage) unless explicitly disabled. Either add the standard // eslint-disable-next-line no-console comment (as done elsewhere in the codebase) or route this through the project's logging/toast utilities.
Describe your changes:
Fixes
I worked on ... because ...
Type of change:
Checklist:
Fixes <issue-number>: <short explanation>
Summary by Gitar
- RdfBatchProcessor utility with shared logic for entity and relationship indexing
- useDistributedIndexing and partitionSize configuration options
This will update automatically on new commits.