General-purpose RAG system with a hexagonal architecture (Ports & Adapters), FastAPI, three retrieval modes (BM25, dense vector, hybrid), and swappable LLM connectors (OpenAI, OpenRouter, Ollama). Designed as a solid base for iteration in experimental environments. The default runtime mode is `sparse` (SQLite-only). In dense/hybrid modes, vector state is persisted to disk (`faiss` or `numpy` backend).
- Clean architecture
  - Hexagonal (Ports & Adapters): domain decoupled from infrastructure.
  - Explicit typing and domain models.
- Retrieval
  - Sparse: BM25 (offline).
  - Dense: vector index (`faiss`/`numpy`) + embeddings backend (OpenAI or SentenceTransformers).
  - Hybrid: dense + BM25 combination with configurable weight.
- LLMs
  - OpenAI Chat (via API key).
  - OpenRouter support (as OpenAI-compatible provider and dedicated proxy endpoint).
  - Local Ollama (over HTTP). Current clients are synchronous.
- Persistence
  - SQLite via SQLAlchemy: documents and Q&A history.
  - Vector index on disk for dense/hybrid mode (`faiss` or `numpy` backend).
- API
  - FastAPI with validation and OpenAPI at `/docs`.
  - Health: `/api/health`, Readiness: `/api/ready`, Ollama health: `/api/health/ollama`.
  - Config: `/api/config`, Templates: `/api/templates`.
  - OpenRouter proxy (OpenAI-compatible): `POST /api/openrouter/generate`.
- Tests
  - Unit, integration, and E2E with `pytest`.
```bash
git clone https://github.com/Intrinsical-AI/rag-prototype.git
cd rag-prototype

# Recommended: uv-managed local venv + lockfile installs
# If your environment has a non-writable home directory, keep uv cache local:
# export UV_CACHE_DIR=.uv_cache
uv venv .venv
source .venv/bin/activate
# Windows: .venv\Scripts\activate

# Install runtime deps (uses uv.lock); --extra server adds FastAPI/uvicorn
uv sync --frozen --extra server

# (Optional) Dense/Hybrid deps (FAISS)
# uv sync --frozen --extra server --extra dense

# (Optional) SentenceTransformers embeddings (heavy: torch/transformers)
# uv sync --frozen --extra dense-st

# (Optional) Dev/Test/Lint groups
# uv sync --frozen --group dev --group test --group lint --no-default-groups
```

Initialize sample data and start:

```bash
# Load sample CSV into SQLite and, if applicable, build vector index
rag-bootstrap

# FastAPI server
rag-server
# UI: http://localhost:8000/
# Health: http://localhost:8000/api/health
# Ollama health: http://localhost:8000/api/health/ollama
# Docs: http://localhost:8000/docs
```

Alternative startup (without the `rag-server` wrapper): `uvicorn local_rag_backend.http.main:app --reload`.
Defaults live in `src/local_rag_backend/settings.py` (Pydantic Settings). Override them with environment variables or a `.env` file (case-insensitive).
Security note: when exposing this service behind a reverse proxy, keep `API_KEY` enabled and ensure the proxy sanitizes forwarding headers. Runtime auth guards evaluate client origin using `X-Forwarded-For` and RFC 7239 `Forwarded`; untrusted/unsanitized header chains can weaken source attribution. When `API_KEY` is unset and `PUBLIC_BIND_REQUIRES_API_KEY=true`, ambiguous forwarding chains (e.g. empty/unknown-only proxy headers) are rejected fail-closed.
Key variables (non-exhaustive):
| Variable | Default | Scope | Description |
|---|---|---|---|
| `APP_HOST` | `127.0.0.1` | server | Service host |
| `APP_PORT` | `8000` | server | Service port |
| `DEBUG` | `false` | server | Reload/detailed logging |
| `LOG_LEVEL` | `INFO` | server | Logging level |
| `API_KEY` | — | security | If set, require `X-API-Key: <API_KEY>` for `/api/*` and `/metrics` |
| `PUBLIC_BIND_REQUIRES_API_KEY` | `true` | security | Refuse unsafe public startup and reject non-local `/api/*` + `/metrics` requests when `API_KEY` is unset |
| `CORS_ALLOW_ORIGINS` | `[]` | security | Allowed CORS origins when `DEBUG=false` (JSON list or comma-separated) |
| `RETRIEVAL_MODE` | `sparse` | retrieval | `sparse` \| `dense` \| `hybrid` |
| `DATA_DIR` | `data` | storage | Base data directory (SQLite parent, vector index paths) |
| `SQLITE_URL` | `sqlite:///./data/app.db` | storage | SQLite URL |
| `FAQ_CSV` | `data/faq.csv` | ingestion | FAQ CSV |
| `CSV_HAS_HEADER` | `true` | ingestion | CSV has header |
| `INGEST_CHUNK_STRATEGY` | `chars_v1` | ingestion | Chunking strategy identifier (deterministic) |
| `INGEST_CHUNKER_VERSION` | `chars_v1` | ingestion | Version token included in chunk dedup hashes |
| `INGEST_CHUNK_CHARS` | `1200` | ingestion | Chunk size in characters (200..8000) |
| `INGEST_CHUNK_OVERLAP` | `200` | ingestion | Chunk overlap in characters (0..4000, < CHUNK_CHARS) |
| `INGEST_BATCH_SIZE` | `64` | ingestion | File-plans per ingestion batch (1..512) |
| `INGEST_CLEAN_LOWERCASE` | `true` | ingestion | Lowercase during ingestion preprocessing |
| `INGEST_CLEAN_REMOVE_HTML` | `true` | ingestion | Remove HTML tags during ingestion preprocessing |
| `INGEST_CLEAN_COLLAPSE_WHITESPACE` | `true` | ingestion | Collapse consecutive whitespace |
| `INGEST_CLEAN_STRIP` | `true` | ingestion | Strip leading/trailing whitespace |
| `ST_EMBEDDING_MODEL` | `all-MiniLM-L6-v2` | dense/hybrid | SentenceTransformers model |
| `OPENAI_EMBEDDING_MODEL` | `text-embedding-3-small` | OpenAI | Embeddings model |
| `VECTOR_BACKEND` | `auto` | dense/hybrid | Vector backend selector: `auto` \| `faiss` \| `numpy` |
| `STORAGE_PROFILE` | (auto) | consistency | Optional explicit storage profile (`sql_only_local`, `sql_faiss_local`, `sql_numpy_local`) |
| `WRITE_LOCK_TIMEOUT_S` | `30.0` | consistency | Timeout (seconds) for multi-store write lock |
| `WRITE_LOCK_POLL_S` | `0.05` | consistency | Poll interval (seconds) while waiting for lock |
| `MUTATION_BATCH_MAX_SIZE` | `32` | consistency | Max queued mutation requests coalesced per batch cycle (1..512) |
| `MUTATION_BATCH_MAX_WAIT_MS` | `50` | consistency | Coalescing wait time before draining a mutation batch (0..5000) |
| `MUTATION_RECOVERY_ENABLED` | `true` | consistency | Enable startup/background replay of incomplete mutations |
| `MUTATION_RECOVERY_INTERVAL_S` | `30.0` | consistency | Background recovery interval (seconds) |
| `INDEX_PATH` | `data/index.faiss` | dense/hybrid | FAISS file |
| `ID_MAP_PATH` | `data/id_map.json` | dense/hybrid | FAISS ID map (JSON) |
| (derived) `index_manifest.json` | `data/index_manifest.json` | dense/hybrid | Index manifest (model/dim/chunker) for drift detection |
| `ENABLE_RERANKER` | `false` | retrieval | Wrap selected retriever with reranking layer |
| `RERANKER_CANDIDATE_K` | `20` | retrieval | Candidate set size fetched before reranking (3..200) |
| `RERANKER_STRATEGY` | `overlap_v1` | retrieval | Reranker strategy identifier |
| `ENABLE_MONITORING` | `false` | monitoring | Enable metrics middleware and `/metrics` endpoint |
| `OPENAI_TOP_P` | `1.0` | OpenAI | top-p parameter |
| `OPENROUTER_ENABLED` | `false` | OpenRouter | Enable OpenRouter proxy |
| `OPENROUTER_API_KEY` | — | OpenRouter | API key |
| `OPENROUTER_BASE_URL` | `https://openrouter.ai/api/v1` | OpenRouter | Base URL |
| `OPENROUTER_MODEL` | `openai/gpt-4o-mini` | OpenRouter | Default model |
| `OPENROUTER_SITE_URL` | — | OpenRouter | Optional Referer header |
| `OPENROUTER_APP_TITLE` | — | OpenRouter | Optional X-Title header |
| `HYBRID_RETRIEVAL_ALPHA` | `0.5` | hybrid | Weight of the sparse component (0=dense, 1=sparse) |
| `OPENAI_API_KEY` | — | OpenAI | API key |
| `OPENAI_MODEL` | `gpt-4o-mini` | OpenAI | Chat model |
| `OPENAI_REQUEST_TIMEOUT` | `60` | OpenAI | Timeout (s) for OpenAI-compatible HTTP requests |
| `OPENAI_TEMPERATURE` | `0.2` | OpenAI | Temperature |
| `OPENAI_MAX_TOKENS` | `256` | OpenAI | Max tokens |
| `OPENAI_PROMPT_TEMPLATE` | (builtin template) | prompting | Prompt template for OpenAI/OpenRouter generators |
| `OLLAMA_ENABLED` | `false` | Ollama | Enable Ollama |
| `OLLAMA_MODEL` | `lfm2.5-thinking` | Ollama | Model served by Ollama |
| `OLLAMA_BASE_URL` | `http://localhost:11434` | Ollama | Server URL |
| `OLLAMA_REQUEST_TIMEOUT` | `180` | Ollama | Timeout (s) |
| `OLLAMA_PROMPT_TEMPLATE` | (builtin template) | prompting | Prompt template for Ollama generator |
When `RETRIEVAL_MODE=dense|hybrid`, the system writes an `index_manifest.json` next to `INDEX_PATH`.
It records stable identifiers for the index build (embedding backend/model, dimension, chunker strategy/version).
If you change any of these settings, `/api/ready` and `rag-status` will report drift and instruct you to rebuild:
`rag-rebuild-index` (or `POST /api/index/rebuild`).
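Drift detection amounts to a field-by-field comparison between the stored manifest and current settings. A sketch (the key names below are assumptions for illustration; the real manifest schema is defined by the project):

```python
def detect_drift(manifest: dict, current: dict) -> list[str]:
    """Return the manifest fields that no longer match current settings.

    Sketch only: key names are assumed; the real manifest lives next to
    INDEX_PATH and is checked by /api/ready and rag-status.
    """
    tracked = (
        "embedding_backend",  # e.g. OpenAI vs SentenceTransformers
        "embedding_model",
        "dimension",
        "chunk_strategy",
        "chunker_version",
    )
    return [k for k in tracked if manifest.get(k) != current.get(k)]


# A model swap changes the embedding space, so a rebuild is required.
drifted = detect_drift(
    {"embedding_model": "all-MiniLM-L6-v2", "dimension": 384},
    {"embedding_model": "text-embedding-3-small", "dimension": 1536},
)
```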
Note: fresh-install-only storage contract.

- Canonical document IDs are opaque strings (`doc:<uuid7>`).
- SQL documents use `doc_id` as the primary key.
- The vector `id_map.json` stores `list[str]`.
- No runtime migration/fallback for legacy schemas or legacy id maps.
- `RETRIEVAL_MODE=sparse`: `RetrieverPort := SparseBM25Retriever` (BM25 corpus + SQL doc repo)
- `RETRIEVAL_MODE=dense`: `RetrieverPort := DenseVectorRetriever` (embedder + vector index + SQL doc repo)
- `RETRIEVAL_MODE=hybrid`: `RetrieverPort := HybridRetriever(DenseVectorRetriever, SparseBM25Retriever, alpha)`
- If `ENABLE_RERANKER=true`, the selected retriever is wrapped as `RetrieverPort := RerankingRetriever(base=<selected>)`.

This boundary is enforced in `composition/adapters.py` and consumed by `AppContainer`.
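The hybrid blend weighted by `HYBRID_RETRIEVAL_ALPHA` (sparse weight) can be sketched as below; the real `HybridRetriever` may normalize scores per backend before blending:

```python
def hybrid_scores(
    sparse: dict[str, float],
    dense: dict[str, float],
    alpha: float = 0.5,
) -> dict[str, float]:
    """Blend per-document scores: alpha weights the sparse (BM25) side,
    (1 - alpha) the dense side. Missing documents score 0 on that side."""
    doc_ids = set(sparse) | set(dense)
    return {
        d: alpha * sparse.get(d, 0.0) + (1 - alpha) * dense.get(d, 0.0)
        for d in doc_ids
    }


merged = hybrid_scores({"doc:a": 1.0}, {"doc:a": 0.5, "doc:b": 0.8}, alpha=0.5)
# doc:a -> 0.5*1.0 + 0.5*0.5 = 0.75 ; doc:b -> 0.5*0.0 + 0.5*0.8 = 0.4
```

With `alpha=0` the result reduces to pure dense retrieval; with `alpha=1`, to pure BM25, matching the table entry above.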
The ingestion process is orchestrated by `IngestionPipeline`:

- Load items from a `LoaderPort` (e.g., `CSVLoader`) returning `LoadedItem` (text, lineage, metadata).
- Preprocess (`preprocess_text`) and chunk (`default_chunker`) with overlap.
- Format chunks (metadata header) and batch-ingest via `ETLService.ingest()`.
- Sparse: stores directly in SQLite (no embeddings required).
- Dense / Hybrid:
  - Save chunks in SQLite.
  - Generate embeddings with OpenAI (if `OPENAI_API_KEY`) or SentenceTransformers (`ST_EMBEDDING_MODEL`).
  - Upsert into the vector index (`INDEX_PATH`, `ID_MAP_PATH`).
Chunking parameters (in settings):

- `INGEST_CHUNK_CHARS` (default 1200)
- `INGEST_CHUNK_OVERLAP` (default 200)
- `INGEST_CHUNKER_VERSION` (default `chars_v1`): changes the dedup key used by `/api/docs` to force re-chunk/re-embed.
- `INGEST_BATCH_SIZE` (default `64`, valid range `1..512`): file-plans processed per ingestion batch.
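Under these settings, a chars-based chunker amounts to a sliding character window. An illustrative sketch (not the shipped `default_chunker`, which may handle boundaries differently):

```python
def chunk_chars(text: str, size: int = 1200, overlap: int = 200) -> list[str]:
    """Sliding-window chunking: consecutive chunks share `overlap` chars.

    Sketch of a chars_v1-style strategy; defaults mirror
    INGEST_CHUNK_CHARS / INGEST_CHUNK_OVERLAP.
    """
    if not 0 <= overlap < size:
        raise ValueError("require 0 <= overlap < size")
    step = size - overlap  # window advances by size - overlap each time
    return [text[i:i + size] for i in range(0, max(len(text), 1), step)]


chunks = chunk_chars("x" * 3000, size=1200, overlap=200)
# windows start at 0, 1000, 2000 -> 3 chunks; neighbours share 200 chars
```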
Available scripts:

```bash
# Ingest from CSV and build vector index if applicable
rag-bootstrap

# Ingest .txt/.md/.csv from file(s) or directory(ies)
rag-ingest ./my_notes ./docs/handbook.md ./data/faq.csv

# Keep symlink targets out of scope (also skips symlink paths passed as root inputs)
rag-ingest --no-follow-symlinks ./docs

# Rebuild vector index from current SQLite documents (idempotent; dense/hybrid only)
rag-rebuild-index

# Unified docs mutation (canonical write path)
cat > /tmp/mutate_upsert.json <<'JSON'
{"op_id":"op-upsert-1","upserts":[{"external_id":"doc-1","content":"hello"}]}
JSON
rag-mutate-docs --json /tmp/mutate_upsert.json

# Delete by SQL doc IDs
cat > /tmp/mutate_delete_ids.json <<'JSON'
{"op_id":"op-del-ids-1","delete_ids":["doc:...","doc:..."]}
JSON
rag-mutate-docs --json /tmp/mutate_delete_ids.json

# Delete by external IDs (creates tombstones)
cat > /tmp/mutate_delete_external_ids.json <<'JSON'
{"op_id":"op-del-ext-1","delete_external_ids":["chunk:abcd...","file:/path:part=file:chunk=0"]}
JSON
rag-mutate-docs --json /tmp/mutate_delete_external_ids.json

# Summarized system and files status
rag-status

# Offline retrieval evaluation (reproducible gate; default dataset: datasets/rag_eval_v1.jsonl)
rag-eval --retrieval-mode sparse
```

Retrieval mode is selected via `RETRIEVAL_MODE` (there is no `--mode` flag).
Optional: better file type detection (best-effort) using python-magic:

```bash
uv sync --frozen --extra magic
# or: pip install rag-prototype[magic]
```

`rag-ingest` detection is Unicode-aware (UTF-8 text with non-ASCII characters is accepted) and handles unreadable files as best-effort skips instead of aborting the full ingestion run.
Optional: Prometheus metrics (`/metrics`) and structured-ish domain metrics:

```bash
uv sync --frozen --extra monitoring
# then:
export ENABLE_MONITORING=true
rag-server
```

Optional: reranker (retrieval quality knob, measurable via `rag-eval`):

```bash
export ENABLE_RERANKER=true
export RERANKER_CANDIDATE_K=20
```

```bash
# Build and start backend + Ollama
docker compose up -d --build

# (Optional) Pull a model into Ollama once the service is up
docker exec -it ollama ollama pull lfm2.5-thinking

# Verify services
curl http://localhost:8000/api/health
curl http://localhost:8000/api/health/ollama
```

Notes:
- Backend listens on `8000`, Ollama on `11434`.
- Configure providers via `.env` or environment variables (see `.env.example`).
- In `docker-compose.yml`, `OLLAMA_ENABLED=true` and `OLLAMA_BASE_URL=http://ollama:11434` are set.
- `docker-compose.yml` defaults to `RETRIEVAL_MODE=sparse` for a lightweight image.
- For dense/hybrid in compose, build the backend with extras, for example:

```bash
docker compose build --build-arg RAG_EXTRAS=dense rag-backend
# add dense-st too if you need SentenceTransformers:
# docker compose build --build-arg RAG_EXTRAS=dense,dense-st rag-backend
docker compose up -d
```

Docker build expectations (CI parity). Recommended local verification:

```bash
docker build --target production .
```
```
.
├── data/                      # CSV, SQLite DB, vector index files
├── src/local_rag_backend/
│   ├── core/                  # domain, ports, services, use cases
│   │   ├── domain/            # entities, types, storage profiles
│   │   ├── ports/             # abstract contracts (Protocol-based)
│   │   ├── services/          # domain services (ETL, RAG runtime, reranking)
│   │   └── use_cases/         # application use cases (ingest, query, mutation, …)
│   ├── infrastructure/        # adapters: llms, retrievers, storage, loaders, observability
│   ├── composition/           # DI container, factory, wiring (transport-neutral)
│   ├── http/                  # FastAPI transport adapter (routers, schemas, middleware)
│   ├── cli_commands/          # CLI transport adapters (ingest, mutate, eval, …)
│   ├── scripts/               # internal scripts (sample data ingestion)
│   └── frontend/              # packaged index.html to serve at /
└── tests/                     # unit + integration + e2e
```

Extension points:

- LLM: implement `GeneratorPort` (see `infrastructure/llms/*`) and wire it in `composition/factory.py`.
- Retriever: implement `RetrieverPort` and wire it through `composition/adapters.py` (`build_retriever_from_settings` / `build_retriever_with_default_embedder_from_settings`).
- Vector store: implement `VectorRepoPort` (e.g., an alternative to FAISS).
- Document store: implement `DocumentRepoPort` to use a DB other than SQLite.
- Loader: implement `LoaderPort` for new sources (PDFs, web, etc.).
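Because ports are `Protocol`-based, a new adapter only needs to match the method shape. A hedged sketch of plugging in a custom generator (the `generate` signature here is an assumption for illustration; check `core/ports` for the real contract):

```python
from typing import Protocol


class GeneratorPort(Protocol):
    """Assumed shape for illustration; the real port lives in core/ports."""

    def generate(self, question: str, contexts: list[str]) -> str: ...


class EchoGenerator:
    """Toy adapter: satisfies the port structurally (no inheritance needed),
    so it could be wired in composition/factory.py like any LLM client."""

    def generate(self, question: str, contexts: list[str]) -> str:
        return f"Q: {question} | ctx: {len(contexts)} passages"


gen: GeneratorPort = EchoGenerator()  # type-checks via structural typing
```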
- `GET /` → serves the packaged `index.html` or the source-tree `src/local_rag_backend/frontend/index.html`.
- `GET /api/health` and `GET /api/ready`
- `GET /api/health/ollama`
- `GET /api/config` and `GET /api/templates`
- `POST /api/ask`
  - Body: `{ "question": "str", "k": int (1..10, default 3) }`
  - Response: `{ "answer": "str", "sources": [ { "document": {"id": "doc:...", "content": "str"}, "score": float(0..1) }, ... ] }`
- `POST /api/ask_eval` (ephemeral per-request RAG config for retrieval/generator evaluation)
- `GET /api/history?limit=1..100&offset>=0`
  - Response: list of `{ id, question, answer, created_at, source_ids[] }` where `source_ids` are string document IDs
- FastAPI docs: `GET /docs` and `GET /openapi.json`
- `POST /api/docs` (ingest texts) and `GET /api/docs` (list docs)
- `POST /api/docs/import` (ingest conversations from ChatGPT/Gemini export JSON)
- `POST /api/docs/mutate` (canonical unified docs mutation: upserts, delete_ids, delete_external_ids)
- `POST /api/index/rebuild` (idempotent rebuild of vector index from SQLite; dense/hybrid only)
- `POST /api/openrouter/generate` (enabled if OpenRouter configured)
Notes:

- Retrieval “scores” are normalized to [0, 1] in the adapters.
- The service persists each Q/A with the IDs of the retrieved sources (best-effort; the retrieval/answer response is not blocked if history persistence fails).
- For `/api/ask`, default provider selection is `ollama` -> `openai` -> `openrouter`, depending on active configuration.
- In dense/hybrid mode, the vector index is derived operational state; write via `/api/docs/mutate` (or `rag-mutate-docs`) rather than mutating stores independently.
- Write-path consistency uses `MutationCoordinator` with `DURABLE_SAGA`: SQL commit + vector delta (`apply_delta_atomic`) + journaled compensation/recovery.
- Full rebuild is an explicit repair operation only (`/api/index/rebuild` or `rag-rebuild-index`), not a normal write fallback.
- v1.0 removed legacy write endpoints: `/api/docs/upsert`, `/api/docs/delete`, `/api/docs/delete_by_external_id`.
- In dense/hybrid mode, `/api/ready` is intentionally strict and returns `503` when it detects missing/corrupt index files or drift between SQLite documents and the vector index (hinting how to rebuild).
- For public/proxy deployments, use `API_KEY` and sanitize `X-Forwarded-For`/`Forwarded` at the edge proxy.
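One plausible way to map heterogeneous raw scores (BM25 weights, vector distances) into the [0, 1] range reported by the API is min-max normalization; the actual adapters may use per-backend formulas:

```python
def normalize_scores(scores: list[float]) -> list[float]:
    """Min-max normalize raw retrieval scores into [0, 1].

    Sketch of one normalization scheme; when all scores are equal the
    result is all 1.0 (an arbitrary but common degenerate-case choice).
    """
    if not scores:
        return []
    lo, hi = min(scores), max(scores)
    if hi == lo:
        return [1.0 for _ in scores]
    return [(s - lo) / (hi - lo) for s in scores]
```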
Example:

```bash
curl -X POST "http://localhost:8000/api/ask" \
  -H "Content-Type: application/json" \
  -d '{"question": "What is RAG?", "k": 3}'
```

The following diagram maps the real runtime path of a request from `src/local_rag_backend/http/routers/rag_router.py` to `core/ports` and into `infrastructure/retrieval`.
```mermaid
flowchart TD
    C[Client HTTP] --> M[FastAPI app\nhttp/main.py]
    M --> AR[API Router\nhttp/api_router.py]
    AR --> RR[RAG Router\nhttp/routers/rag_router.py::ask]
    RR --> D1[Dependency\nhttp/dependencies.py::get_rag_service]
    D1 --> F1[Factory\ncomposition/factory.py::get_rag_service]
    F1 --> AC[AppContainer\ncomposition/container.py::get_rag_service]
    AC --> BRS[build_rag_service\ncomposition/container.py]
    BRS --> RS[core/services/rag_runtime.py::RagService]
    BRS --> COMP[build_retriever_with_default_embedder_from_settings\ncomposition/adapters.py]
    COMP --> RP[core/ports::RetrieverPort]
    RP --> SBR[infrastructure/retrieval/sparse_bm25.py::SparseBM25Retriever]
    RP --> DFR[infrastructure/retrieval/dense_vector.py::DenseVectorRetriever]
    RP --> HR[infrastructure/retrieval/hybrid.py::HybridRetriever]
    COMP --> RER[core/services/reranking.py::RerankingRetriever]
    RER --> RP
    BRS --> GP[core/ports::GeneratorPort]
    GP --> OAI[infrastructure/llms/openai_chat.py::OpenAIGenerator]
    GP --> OLL[infrastructure/llms/ollama_chat.py::OllamaGenerator]
    BRS --> HP[core/ports::QAHistoryPort]
    HP --> HSQL[infrastructure/persistence/sql/history_storage.py::HistorySqlStorage]
    RR --> RB[infrastructure/concurrency/blocking.py::run_blocking]
    RB --> RS
    RS --> RP
    RS --> GP
    RS --> HP
    RS --> RR
    RR --> RESP[HTTP response\nAskResponse]
```
```mermaid
sequenceDiagram
    autonumber
    participant Client
    participant Router as http/routers/rag_router.py::ask
    participant Dep as http/dependencies.py::get_rag_service
    participant Factory as composition/factory.py::get_rag_service
    participant Container as composition/container.py::AppContainer
    participant RagService as core/services/rag_runtime.py::RagService
    participant Retriever as core/ports::RetrieverPort
    participant InfraRet as infrastructure/retrieval/*
    participant Gen as core/ports::GeneratorPort
    participant Hist as core/ports::QAHistoryPort
    Client->>Router: POST /api/ask {question, k}
    Router->>Dep: resolve RagService dependency
    Dep->>Factory: get_rag_service()
    Factory->>Container: get_rag_service() (cached by version)
    Container-->>Factory: RagService instance
    Factory-->>Dep: RagService
    Dep-->>Router: RagService
    Router->>RagService: run_blocking(service.ask, question, k)
    RagService->>Retriever: retrieve(question, k)
    Retriever->>InfraRet: SparseBM25Retriever OR DenseVectorRetriever OR HybridRetriever
    InfraRet-->>Retriever: (docs, scores)
    Retriever-->>RagService: (docs, scores)
    RagService->>Gen: generate(question, contexts)
    Gen-->>RagService: answer
    RagService->>Hist: save(question, answer, source_ids)
    RagService-->>Router: {answer, docs, scores}
    Router-->>Client: AskResponse
```
- `http/schemas/*`: HTTP request/response contracts (Pydantic transport layer).
- `core/use_cases/results.py`: use-case outputs shared by API/CLI.
- `core/services/types.py`: transport-agnostic core DTOs (chunking/eval/detection).
- `core/domain/entities.py`: domain entities and business invariants.
- `infrastructure/persistence/*/models.py`: ORM persistence models.
- Synchronous LLM clients (httpx/OpenAI SDK); migration to async is straightforward but not included.
- Minimal UI without front-end tests.
- Minimal API-key auth is available (`API_KEY`), but there is no user/role authZ or rate limiting.
- When using the FAISS backend, the index type is `IndexFlatL2` (simple). For large volumes, consider IVF/HNSW or other backends.
- Singleton per process: `RagService` is initialized as a singleton in `composition/factory`. With `uvicorn --workers N`, each process loads its own instance (and its retrieval/index adapters). Align deployment and warm-up as needed.
- Cross-process coordination files: the multi-store write lock and RAG reload token are stored in a shared coordination directory (`Settings.get_coordination_dir()`), preferring an explicit `DATA_DIR`; when `DATA_DIR` is the default and `SQLITE_URL` is absolute, it uses the DB parent directory to keep workers/CLI aligned.
- Metrics: if `ENABLE_MONITORING=true` and `prometheus-client` is installed, `/metrics` provides Prometheus format.
- Dense/Hybrid: you must use the same embedding model for indexing and querying (`ST_EMBEDDING_MODEL`).
```bash
UV_CACHE_DIR=.uv_cache uv sync --frozen --group test --group lint --extra server --no-default-groups
UV_CACHE_DIR=.uv_cache uv run --active --no-sync pytest -q
UV_CACHE_DIR=.uv_cache uv run --active --no-sync ruff check src tests
uv run pre-commit run --all-files
```

The test suite includes unit, integration, and E2E (FastAPI TestClient). The vector layer defaults to `VECTOR_BACKEND=auto` (FAISS when available, NumPy fallback otherwise), and many tests use stubs/mocks for external providers. The suite enforces `--cov-fail-under=85` via `pyproject.toml`.
Current CI gates include:

- `pre-commit run --all-files`
- `ruff check src tests` and `ruff format --check src tests`
- `mypy src`
- architecture guardrails: `pytest -q -o addopts='' tests/unit/http/test_architecture_*.py`
- tests on Python `3.11` and `3.12` (Ubuntu) plus Windows smoke tests
- security scan job (`bandit` + `safety` report generation)
- Docker build for `--target production` on `main`/`master`

Workflow trigger note:

- PRs/commits that only change docs (`**/*.md`, `docs/**`) do not trigger CI due to `paths-ignore` in `.github/workflows/ci.yml`.
- Run local validation manually for doc-only changes when they alter architecture/API/operations guidance.
For local parity, use:

```bash
make lint
make type
make test
make sec       # strict
make sec-soft  # non-blocking local audit
```

You can ingest data from any LangChain document loader via the `LangChainLoader` adapter, which implements the project's `LoaderPort`.
Installation:

```bash
uv sync --frozen --extra loaders
# or when installing from PyPI:
# pip install rag-prototype[loaders]
```

Quick usage example:
```python
from langchain_community.document_loaders import WebBaseLoader

from local_rag_backend.core.services.etl import ETLService
from local_rag_backend.core.services.ingestion import IngestionPipeline
from local_rag_backend.infrastructure.ingestion.loaders import LangChainLoader

# 1) Create/obtain your ETLService as usual (doc store, vector store, embedder)
etl = ETLService(doc_repo, vector_repo, embedder)

# 2) Wrap any LangChain loader
lc_loader = WebBaseLoader(["https://example.com"])  # or DirectoryLoader, SitemapLoader, etc.
loader = LangChainLoader(lc_loader, drop_empty=True, metadata_filter={"lang": "en"})

# 3) Run the pipeline
pipeline = IngestionPipeline(loader=loader, etl_service=etl)
count = pipeline.run()
print(f"Ingested {count} chunks")
```

Notes:

- `drop_empty=True` skips whitespace-only documents.
- `metadata_filter={...}` yields only items whose metadata includes the given key/value pairs.
- The adapter expects each LangChain `Document` to have `page_content` and `metadata` fields. It gracefully falls back to dict-like objects or stringification when needed.
MIT. See LICENSE file for details.
Built with ❤️ by Intrinsical AI & Co.