From 705b02b767a6f19521d0a3df2adb098b72b6656c Mon Sep 17 00:00:00 2001 From: Christian Chwala Date: Wed, 20 May 2026 23:05:05 +0200 Subject: [PATCH 1/7] docs: add data-fetchers architecture plan --- docs/data-fetchers.md | 392 ++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 392 insertions(+) create mode 100644 docs/data-fetchers.md diff --git a/docs/data-fetchers.md b/docs/data-fetchers.md new file mode 100644 index 0000000..1240a87 --- /dev/null +++ b/docs/data-fetchers.md @@ -0,0 +1,392 @@ +# Data Fetchers + +Two new services to pull data from external sources and feed it into the existing +ingestion pipeline via the shared `data/incoming/` volume. + +--- + +## All ingress paths in one view + +The system has four ways data can enter the pipeline. It is worth naming them +together to see where the new fetchers fit: + +``` + PUSH (passive — external party initiates) + ───────────────────────────────────────────────────────────────────── + MNO via SFTP → sftp_receiver (port 2222) → incoming/ + User via browser → webserver /api/upload → incoming/ + + PULL (active — we initiate, scheduled) + ───────────────────────────────────────────────────────────────────── + External SFTP → sftp_fetcher (new) ─┐ + REST API → api_fetcher (new) ─┤→ incoming/ + + ↓ + parser → TimescaleDB +``` + +### The incoming SFTP server is infrastructure, not a fetcher + +`sftp_receiver` is a passive server (atmoz/sftp image) that MNOs are configured +to push files to. It does not contain polling logic or scheduling. It will stay +as-is; there is no reason to refactor it into a fetcher. + +### The web upload is part of the webserver, not a fetcher + +`/api/upload` (the drag-and-drop endpoint) is already implemented and functional — +it writes files directly to `incoming/` behind `@login_required`. It makes sense +to keep it in the webserver because it is tightly coupled to the user session and +the browser UI. 
It is not currently promoted as a primary workflow (MNOs use SFTP +instead), but it remains useful for ad-hoc uploads by administrators. + +If it eventually needs to be a first-class workflow, the work is in the UI +(`data_uploads.html`) and access control, not in refactoring it into a separate +service. + +**The new `sftp_fetcher` and `api_fetcher` are fetchers** because they contain +active scheduling, state management, and retry logic — none of which belong in a +webserver or a passive SSH daemon. + +### Why fetchers write directly to the shared volume, not through the SFTP server + +The SFTP server is an ingress point for external parties _pushing_ data in. +For data we actively pull, writing directly to the shared volume avoids an +unnecessary SSH round-trip and extra per-user account management. + +--- + +## Directory layout + +``` +fetchers/ + shared/ # Python package shared by both fetchers + __init__.py + config.py # load_config() with env-var substitution for secrets + incoming_writer.py # atomic_write(filename, content) → incoming/ + state.py # Persistent state (seen files / last cursor) via JSON file + polling.py # run_poll_loop(interval_s, callback) with exponential backoff + sftp_fetcher/ + fetcher.py # SFTPFetcher: list remote dir, download new files + main.py + config.yml + Dockerfile + requirements.txt + api_fetcher/ + fetcher.py # APIFetcher: GET endpoint, handle pagination/cursors + main.py + config.yml + Dockerfile + requirements.txt +``` + +Both Dockerfiles copy the `fetchers/shared/` package and add it to `PYTHONPATH`. + +--- + +## Shared components (`fetchers/shared/`) + +### `config.py` +Loads a YAML config file (same pattern as `mno_data_source_simulator`). +Secrets (passwords, API keys) are never stored in YAML; config values may reference +`$ENV_VAR_NAME` and the loader substitutes them from the environment. + +### `incoming_writer.py` +Writes a file atomically to the `incoming/` directory: +1. 
Write to a `.tmp` file in the same directory. +2. `os.replace()` to final filename. + +This prevents the parser's `file_watcher` from picking up a partial write +(the watcher already has a stability check, but atomic writes are a safer +belt-and-suspenders approach). + +### `state.py` +Persists fetcher state to a JSON file on the same volume so that a container +restart does not re-fetch everything: +- **SFTP**: set of seen `(filename, mtime)` pairs per remote path. +- **API**: per-endpoint cursor — either a UTC timestamp (`last_fetched_until`) or + an opaque pagination token, depending on what the API supports. In continuous + mode this cursor is what determines the `since` parameter of the next request; + in backfill mode it tracks progress through the requested window. + +### `polling.py` +A simple `while True` loop that calls a user-supplied `poll()` callback, +catches exceptions, logs them, and backs off exponentially (max ~5 min) before +retrying. On success it sleeps for the configured `poll_interval_seconds`. + +--- + +## `sftp_fetcher` + +Connects to one or more external SFTP servers, lists a configured remote path, +downloads files not yet seen (checked via `state.py`), and writes them to +`incoming/` via `incoming_writer.py`. + +Key config fields per source: +```yaml +sources: + - name: operator_x + host: sftp.operator-x.example + port: 22 + username: gmdi_pull + private_key_env: OPERATOR_X_SSH_KEY # path to key file, from env + remote_path: /outgoing/cml + poll_interval_seconds: 60 + file_glob: "*.csv" + after_download: leave # leave | delete | move (see below) +``` + +Library: `paramiko` (already used by `mno_data_source_simulator`). + +### Source cleanup (`after_download`) + +What to do with a file on the source SFTP after a successful download is +not yet decided. 
Three options: + +| Value | Behaviour | Requirement | +|---|---|---| +| `leave` | Nothing — file stays on source indefinitely | None (default, safest) | +| `delete` | Remove file from source immediately after download | Write permission on source | +| `move` | Rename to a `done/` subfolder on the source | Write permission on source | + +`leave` is the safest default: the MNO manages their own SFTP retention and we +never need write access. The `state.py` seen-file set is what prevents +re-downloading already-processed files. + +### Why not rsync? + +Rsync is attractive because it handles "what's new" naturally by comparing +source and destination trees. The problem is that the *destination* here is +`incoming/`, which is **transient** — the parser moves files to `archived/` +after processing. Rsync would see those files are gone locally and re-download +them on the next run. + +A workaround is to rsync into a **persistent local mirror** directory +(`fetchers/sftp_fetcher/mirror/<source_name>/`) and then copy only new files +from the mirror to `incoming/`. This works, but: +- doubles local storage, +- requires two-step logic (rsync + copy), +- adds an `rsync` binary dependency to the image. + +The `state.py` approach achieves the same result with less complexity: the +state file *is* the mirror index, persisted independently of the filesystem. +Rsync to a persistent mirror remains a valid alternative if the state file +feels fragile or if diff-based resume of partial downloads is needed. + +--- + +## `api_fetcher` + +Polls one or more REST endpoints and writes the raw JSON response to `incoming/` +as a `.json` file. No conversion is done in the fetcher — the raw payload is +preserved so no information is lost. The parser is responsible for interpreting +the JSON (see [Parser changes](#parser-changes-for-json-support) below). + +### Operating modes + +**Continuous** (default): fetches recent data on a repeating schedule.
On each +poll, the `since` parameter is set to the `last_fetched_until` cursor stored by +`state.py`. After a successful response the cursor is advanced to +`now - overlap_seconds` (a small lookback guards against late-arriving data). +The fetcher runs indefinitely until stopped. + +**Backfill**: fetches a fixed historical window once, then exits (or optionally +switches to continuous mode afterwards). Useful for initial ingestion of +historical data. Progress through the window is checkpointed in `state.py` so +a crash can resume from where it left off rather than restarting the whole +period. The window is divided into chunks of `backfill_chunk_hours` to avoid +requests that are too large. + +Both modes use the same cursor mechanism in `state.py`; the difference is +whether the end of the window is `now` (continuous) or a fixed timestamp +(backfill). + +### What is and isn't standardized + +**Auth** (`/login/`, `/refresh/`) is standardized — implemented once in +`fetchers/shared/` and reused for every JWT-based source. + +**The query interface is not standardized** — parameter names, date formats, +pagination style, and source-specific filters all vary per API. These are +entirely driven by config, so no code changes are needed when adding a new +source. The config handles: + +| Concern | Config key | Example values | +|---|---|---| +| Window param names | `params:` keys | `date_from`/`date_to` vs `since`/`until` | +| Date/time format | `window_format` | `YYYY-MM-DD` vs `YYYY-MM-DDTHH:MM:SSZ` | +| Pagination style | `pagination.type` | `page` (number+size) vs `cursor` vs `offset` | +| Extra fixed params | `params:` literal values | `performance_event: RSL` | +| Multiple fetches per window | `param_variants` | separate RSL and TSL requests | + +### `param_variants` — multiple requests per window + +Some APIs require separate requests for each measurement type (e.g. +`performance_event: RSL` and `performance_event: TSL`). 
The `param_variants` +list causes the fetcher to issue one request per variant per window chunk and +write each result to a separate file: + +```yaml +sources: + - name: operator_y + url: https://api.operator-y.example/cml/ + auth: + type: jwt + login_url: https://api.operator-y.example/login/ + refresh_url: https://api.operator-y.example/refresh/ + username_env: OPERATOR_Y_USERNAME + password_env: OPERATOR_Y_PASSWORD + mode: continuous # continuous | backfill + poll_interval_seconds: 300 + overlap_seconds: 86400 # date-only resolution → 1-day lookback + backfill_start: "2025-01-01" + backfill_chunk_hours: 24 + window_format: "YYYY-MM-DD" # date-only, no time component + params: + date_from: "{window_start}" + date_to: "{window_end}" + page_size: 1000 + pagination: + type: page # page | cursor | offset | none + page_param: page + size_param: page_size + param_variants: # one file written per variant per window + - suffix: rsl + params: + performance_event: RSL + - suffix: tsl + params: + performance_event: TSL + output_filename_pattern: "{source_name}_{window_start}_{window_end}_{variant_suffix}_data.json" +``` + +The fetcher loops over variants × pages. The pages of each variant are +collected into a single JSON file per variant and window chunk before being +written to `incoming/`. + +### JWT auth flow + +The `jwt` auth type implements the standard refresh-token pattern: + +1. `POST /login/` `{username, password}` → `{access, refresh}` +2. Add `Authorization: Bearer <access>` to every data request +3. On 401, `POST /refresh/` `{refresh}` → `{access}`, retry once +4. If refresh also fails, re-login from credentials + +This pattern (used by DRF Simple JWT, FastAPI, and many others) is +implemented generically in `fetchers/shared/` — the only deployment-specific +config is `login_url`, `refresh_url`, and the credential env vars. No code +changes are needed when switching from the mock to a real API that follows +this flow.
+ +The **development mock** (a small Flask or FastAPI app committed to this repo) +implements the same three endpoints with dummy tokens and synthetic JSON data. +This means the full fetcher → parser → DB pipeline can be exercised locally +without any real credentials. + +Library: `httpx` (supports sync and async, has timeout/retry primitives). + +--- + +## Parser changes for JSON support + +The current parser dispatch in `service_logic.py` is hardcoded: +`"meta" in filename → parse_metadata_csv`, everything else → +`parse_rawdata_csv`. Adding JSON support is the natural first step of the +planned modular parser refactor. + +**Proposed dispatch approach:** route by file extension rather than filename +substring, with a registry of parser modules: + +``` +parser/parsers/ + demo_csv_data/ # existing + parse_raw.py + parse_metadata.py + api_json/ # new + parse_raw.py # JSON → DataFrame with the same schema + parse_metadata.py # optional: if the API also returns link metadata + field_map.yml # per-source field mapping (source_name → column names) +``` + +`service_logic.py` would select the parser module based on extension +(`.csv` → `demo_csv_data`, `.json` → `api_json`) and then apply the same +meta/data filename convention within each module. + +JSON files are archived exactly like CSV files (gzip-compressed to +`archived/YYYY-MM-DD/`) so the raw API payloads are retained alongside the +processed data. + +--- + +## Docker Compose integration + +Add both services to `docker-compose.override.yml`: + +```yaml +services: + sftp_fetcher: + build: ./fetchers/sftp_fetcher + volumes: + - ./data:/app/data + - ./fetchers/shared:/app/shared # or baked into image + env_file: .env + restart: unless-stopped + + api_fetcher: + build: ./fetchers/api_fetcher + volumes: + - ./data:/app/data + - ./fetchers/shared:/app/shared + env_file: .env + restart: unless-stopped +``` + +No dependency on `parser` is needed — they just share the volume. 
+ +--- + +## Open questions + +- **JSON metadata**: For the initial implementation, link metadata is supplied + as a one-off CSV file (same as today). Two upgrade paths for later: + (a) metadata embedded in the raw data JSON — the `api_json` parser extracts + and upserts it alongside measurements; (b) metadata available at a separate + API endpoint — `api_fetcher` fetches it once at startup (or on a slow + schedule) and writes it to `incoming/` as a metadata JSON file. +- **File naming for JSON**: Filename is free to define. Suggested convention: + `{mno_username}_{window_start}_{window_end}_data.json` + e.g. `operator_y_20260101T000000Z_20260102T000000Z_data.json`. + This embeds source identity and query window, is sortable, and leaves room + for a future `_meta.json` sibling. The meta/data distinction by filename + substring (currently `"meta"` vs `"data"/"raw"`) will be revisited as part + of the modular parser work — extension-based dispatch makes it redundant for + CSV too. +- **Credentials management**: For now, env vars via `.env` / Docker secrets. + If the number of sources grows, a secrets manager (Vault, AWS SSM) may be + worth considering. +- **Shared package versioning**: `fetchers/shared/` is copied into both + images at build time. A change to shared code requires rebuilding both + images — acceptable for now, worth noting. + +--- + +## Implementation order + +API pull is priority 1. The full modular parser refactor is deferred until a +second format makes it worthwhile. + +**PR 1 — `feat/api-fetcher`** *(self-contained end-to-end, no existing code broken)* +1. `fetchers/shared/` package (config, incoming_writer, state, polling) +2. `fetchers/api_fetcher/` service (auth, fetcher, main, Dockerfile) +3. `fetchers/api_fetcher/mock_server/` — Flask app for local dev and integration tests +4. `parser/parsers/api_json/` — JSON parser + `example_field_map.yml` +5. Minimal JSON branch in `parser/service_logic.py` (single `elif` on `.json` extension) +6. 
Integration test: mock server fixture, assert records land in DB + +**PR 2 — `feat/sftp-fetcher`** *(no parser changes; reuses `fetchers/shared/` from PR 1)* +1. `fetchers/sftp_fetcher/` service +2. Integration test: mock SFTP server, assert files land in `incoming/` + +**PR 3 — `feat/parser-modular`** *(cleanup; defer until a second format arrives)* +1. Extension-based registry replacing `"meta"/"data"` filename substring checks +2. Revisit meta/data naming convention across all formats From c050a162576926da85aa78af72b1a162af1e5676 Mon Sep 17 00:00:00 2001 From: Christian Chwala Date: Wed, 20 May 2026 23:05:32 +0200 Subject: [PATCH 2/7] feat: add fetchers/shared package and api_fetcher service - fetchers/shared: config loader, atomic incoming writer, state persistence (cursor + seen-files), poll loop with exponential backoff - fetchers/api_fetcher: JWT auth with auto-refresh, windowed paginated fetcher (continuous + backfill modes, param_variants for RSL/TSL), entry point, example config.yml, Dockerfile (build ctx ./fetchers) - fetchers/api_fetcher/mock_server: minimal Flask mock with JWT auth and synthetic hourly CML data (POST /login/, POST /refresh/, GET /cml/) --- fetchers/api_fetcher/Dockerfile | 26 ++ fetchers/api_fetcher/auth.py | 95 ++++++++ fetchers/api_fetcher/config.yml | 63 +++++ fetchers/api_fetcher/fetcher.py | 228 ++++++++++++++++++ fetchers/api_fetcher/main.py | 114 +++++++++ fetchers/api_fetcher/mock_server/Dockerfile | 6 + fetchers/api_fetcher/mock_server/app.py | 147 +++++++++++ .../api_fetcher/mock_server/requirements.txt | 1 + fetchers/api_fetcher/requirements.txt | 2 + fetchers/shared/__init__.py | 0 fetchers/shared/config.py | 32 +++ fetchers/shared/incoming_writer.py | 31 +++ fetchers/shared/polling.py | 34 +++ fetchers/shared/state.py | 84 +++++++ 14 files changed, 863 insertions(+) create mode 100644 fetchers/api_fetcher/Dockerfile create mode 100644 fetchers/api_fetcher/auth.py create mode 100644 fetchers/api_fetcher/config.yml create 
logger = logging.getLogger(__name__)


class JWTAuth:
    """Token holder for an API that uses the login / refresh JWT flow.

    The object keeps the current access and refresh tokens and recovers from
    HTTP 401 responses on its own: first by refreshing the access token, then
    by logging in again from the stored credentials.

    Usage::

        auth = JWTAuth(login_url="http://api/login/", refresh_url="http://api/refresh/",
                       username="user", password="pass")
        response = auth.get(client, "http://api/data/", params={"foo": "bar"})
    """

    def __init__(
        self,
        login_url: str,
        refresh_url: str,
        username: str,
        password: str,
    ):
        # No tokens yet: the first call to get() performs the initial login.
        self._access_token: Optional[str] = None
        self._refresh_token: Optional[str] = None
        self._username = username
        self._password = password
        self._login_url = login_url
        self._refresh_url = refresh_url

    # ------------------------------------------------------------------
    # Internal helpers
    # ------------------------------------------------------------------

    def _login(self, client) -> None:
        """POST the credentials and store the returned token pair."""
        logger.info("Logging in to %s", self._login_url)
        credentials = {"username": self._username, "password": self._password}
        login_resp = client.post(self._login_url, json=credentials)
        login_resp.raise_for_status()
        tokens = login_resp.json()
        self._access_token = tokens["access"]
        self._refresh_token = tokens["refresh"]

    def _refresh(self, client) -> None:
        """Exchange the refresh token for a new access token.

        A 401 from the refresh endpoint is answered with a full login instead
        of an error; any other error status is raised by the response object.
        """
        logger.debug("Refreshing access token at %s", self._refresh_url)
        payload = {"refresh": self._refresh_token}
        refresh_resp = client.post(self._refresh_url, json=payload)
        if refresh_resp.status_code != 401:
            refresh_resp.raise_for_status()
            self._access_token = refresh_resp.json()["access"]
            return
        # Refresh token itself has expired — fall back to full login
        logger.info("Refresh token expired, re-logging in")
        self._login(client)

    def _auth_headers(self) -> dict:
        """Return the Authorization header for the current access token."""
        return {"Authorization": f"Bearer {self._access_token}"}

    # ------------------------------------------------------------------
    # Public API
    # ------------------------------------------------------------------

    def get(self, client, url: str, params: Optional[dict] = None) -> object:
        """Perform an authenticated GET, recovering from 401 responses.

        At most three requests are sent:

        1. With the cached access token (after an initial login if none is held).
        2. After a 401: with a refreshed access token.
        3. After a second 401: with the tokens from a full re-login.

        :param client: HTTP client exposing ``get`` and ``post``.
        :param url: Endpoint to request.
        :param params: Optional query parameters.
        :returns: The first response that is not a 401, after its
            ``raise_for_status()`` has been called.
        :raises: Whatever ``raise_for_status()`` raises for an error status,
            including a third consecutive 401.
        """
        if self._access_token is None:
            self._login(client)

        # One recovery action per rejected attempt; the last attempt has none.
        for recover in (self._refresh, self._login, None):
            resp = client.get(url, params=params, headers=self._auth_headers())
            if resp.status_code != 401:
                resp.raise_for_status()
                return resp
            if recover is None:
                resp.raise_for_status()
            else:
                recover(client)

        # Should be unreachable, but keeps type checkers happy
        resp.raise_for_status()  # type: ignore[union-attr]
        return resp
+poll_interval_seconds: 60 + +sources: + - name: mock_operator + mode: continuous # or "backfill" + # backfill_start: "2026-01-01" + # backfill_end: "2026-02-01" + + # ------------------------------------------------------------------ + # Time-window settings + # ------------------------------------------------------------------ + window_format: date # "date" → YYYY-MM-DD, "datetime" → ISO 8601 UTC + chunk_hours: 24 # size of each fetch window + overlap_seconds: 86400 # look back 1 day to avoid missing records at midnight + + # ------------------------------------------------------------------ + # Authentication + # ------------------------------------------------------------------ + auth: + type: jwt + login_url: http://mock_server:5001/login/ + refresh_url: http://mock_server:5001/refresh/ + username_env: API_USERNAME # read from environment at runtime + password_env: API_PASSWORD + + # ------------------------------------------------------------------ + # Query endpoint and parameters + # ------------------------------------------------------------------ + endpoint: http://mock_server:5001/cml/ + date_from_key: date_from + date_to_key: date_to + page_key: page + page_size_key: page_size + page_size: 100 + results_key: results # key inside the paginated response object + + # Base params sent with every request (no variant-specific params here). + base_params: {} + + # Each variant produces a separate JSON file. The "suffix" key is used + # only for the filename; all other keys are passed as query params. + param_variants: + - suffix: rsl + performance_event: RSL + - suffix: tsl + performance_event: TSL diff --git a/fetchers/api_fetcher/fetcher.py b/fetchers/api_fetcher/fetcher.py new file mode 100644 index 0000000..79f16da --- /dev/null +++ b/fetchers/api_fetcher/fetcher.py @@ -0,0 +1,228 @@ +"""APIFetcher: polls a paginated REST API and writes raw JSON files. + +One APIFetcher is created per *source* entry in ``config.yml``. Each fetch +cycle: + +1. 
logger = logging.getLogger(__name__)

_DT_FMT = "%Y-%m-%dT%H:%M:%SZ"  # datetime window_format
_DATE_FMT = "%Y-%m-%d"  # date window_format


class APIFetcher:
    """Fetch one configured API source.

    Each call to :meth:`fetch` handles a single time window: it requests every
    page of every entry in ``param_variants`` and writes one JSON file per
    variant. The state cursor is advanced only after all variants succeeded.

    :param name: Source name; used as the file-name prefix and state key.
    :param cfg: The ``source`` dict from ``config.yml``.
    :param incoming_dir: ``Path`` to the shared ``data/incoming/`` directory.
    :param state: Object exposing ``get_cursor(name)`` and
        ``set_cursor(name, value)`` (the shared ``FetcherState``).
    :param auth: Auth object exposing ``get(client, url, params=...)``.
    :param client: HTTP client handed through to ``auth.get``.
    """

    def __init__(
        self,
        name: str,
        cfg: dict,
        incoming_dir: Path,
        state: "FetcherState",
        auth,
        client,
    ):
        self.name = name
        self.cfg = cfg
        self.incoming_dir = incoming_dir
        self.state = state
        self.auth = auth
        self.client = client

        self._window_format: str = cfg.get("window_format", "datetime")
        self._chunk_hours: int = cfg.get("chunk_hours", 24)
        self._overlap_seconds: int = cfg.get("overlap_seconds", 60)
        self._mode: str = cfg.get("mode", "continuous")
        self._endpoint: str = cfg["endpoint"]
        self._base_params: dict = cfg.get("base_params") or {}
        # An empty or missing list still means "one request without extra
        # params"; otherwise fetch() would advance the cursor without fetching.
        self._param_variants: list[dict] = cfg.get("param_variants") or [{}]
        self._page_key: str = cfg.get("page_key", "page")
        self._page_size_key: str = cfg.get("page_size_key", "page_size")
        self._page_size: Optional[int] = cfg.get("page_size", 100)
        self._results_key: Optional[str] = cfg.get("results_key")
        self._date_from_key: str = cfg.get("date_from_key", "date_from")
        self._date_to_key: str = cfg.get("date_to_key", "date_to")

        # Backfill mode: fixed range
        self._backfill_start: Optional[str] = cfg.get("backfill_start")
        self._backfill_end: Optional[str] = cfg.get("backfill_end")

    # ------------------------------------------------------------------
    # Window computation
    # ------------------------------------------------------------------

    def _fmt(self, dt: datetime) -> str:
        """Format *dt* as a date or a UTC datetime string per ``window_format``."""
        if self._window_format == "date":
            return dt.strftime(_DATE_FMT)
        return dt.strftime(_DT_FMT)

    def _parse(self, s: str) -> datetime:
        """Parse a string produced by :meth:`_fmt` into an aware UTC datetime.

        :raises ValueError: if *s* matches neither supported format.
        """
        for fmt in (_DT_FMT, _DATE_FMT):
            try:
                return datetime.strptime(s, fmt).replace(tzinfo=timezone.utc)
            except ValueError:
                continue
        raise ValueError(f"Cannot parse datetime string: {s!r}")

    def _next_window(self) -> Optional[tuple[str, str]]:
        """Return the next ``(window_start, window_end)`` pair, or ``None``.

        ``None`` means there is nothing to fetch: the backfill range is
        exhausted, or (continuous mode) the window would be empty.

        :raises ValueError: in backfill mode when ``backfill_start`` or
            ``backfill_end`` is not configured.
        """
        now = datetime.now(tz=timezone.utc)
        chunk = timedelta(hours=self._chunk_hours)
        cursor = self.state.get_cursor(self.name)

        if self._mode == "backfill":
            if not self._backfill_start or not self._backfill_end:
                raise ValueError(
                    f"Source {self.name!r}: backfill mode requires "
                    "'backfill_start' and 'backfill_end'"
                )
            start = self._parse(cursor) if cursor else self._parse(self._backfill_start)
            end_limit = self._parse(self._backfill_end)

            if start >= end_limit:
                logger.info(
                    "Source %s: backfill complete (cursor >= backfill_end)", self.name
                )
                return None

            end = min(start + chunk, end_limit)
            return self._fmt(start), self._fmt(end)

        # continuous mode
        if cursor:
            # The overlap only moves the *start* backwards. The end is measured
            # from the cursor itself; measuring it from the overlapped start
            # would pin the cursor in place whenever overlap >= chunk size.
            progress_from = self._parse(cursor)
            start = progress_from - timedelta(seconds=self._overlap_seconds)
        else:
            progress_from = now - chunk
            start = progress_from

        end = min(progress_from + chunk, now)
        if end <= start:
            return None

        # NOTE: with window_format "date" the end is truncated to a day by
        # _fmt, so chunk_hours should be a whole number of days.
        return self._fmt(start), self._fmt(end)

    # ------------------------------------------------------------------
    # Fetching
    # ------------------------------------------------------------------

    def _fetch_variant(self, date_from: str, date_to: str, variant: dict) -> list:
        """Fetch all pages for one variant and return the combined records.

        Paging stops when a page is empty, when the response's ``count`` has
        been reached, or when a page is shorter than the configured page size.

        :param date_from: Formatted window start, sent as ``date_from_key``.
        :param date_to: Formatted window end, sent as ``date_to_key``.
        :param variant: Extra query parameters for this variant.
        :returns: Records of all pages, in request order.
        """
        params = {
            **self._base_params,
            self._date_from_key: date_from,
            self._date_to_key: date_to,
            **variant,
        }
        if self._page_size:
            params[self._page_size_key] = self._page_size

        records: list = []
        page = 1
        while True:
            params[self._page_key] = page
            resp = self.auth.get(self.client, self._endpoint, params=params)
            data = resp.json()

            if isinstance(data, list):
                # Bare-list response: there is no container to look inside.
                page_records = data
            elif self._results_key:
                page_records = data.get(self._results_key) or []
            else:
                # Assume a mapping with a natural results container
                page_records = data.get("results", data.get("data", [])) or []

            records.extend(page_records)

            # An empty page always ends paging; without this a missing or zero
            # page_size combined with no "count" would never terminate.
            if not page_records:
                break
            total_count = data.get("count") if isinstance(data, dict) else None
            if total_count is not None and len(records) >= total_count:
                break
            if self._page_size and len(page_records) < self._page_size:
                break
            page += 1

        logger.debug(
            "Source %s variant %s: fetched %d records for %s→%s",
            self.name,
            variant,
            len(records),
            date_from,
            date_to,
        )
        return records

    def _make_filename(self, date_from: str, date_to: str, variant: dict) -> str:
        """Build ``<name>_<from>_<to>[_<suffix>]_data.json`` for one variant."""
        # Use the "suffix" key of the variant as a file tag, or empty.
        suffix = variant.get("suffix", "")
        safe_from = date_from.replace(":", "").replace("-", "")
        safe_to = date_to.replace(":", "").replace("-", "")
        parts = [self.name, safe_from, safe_to]
        if suffix:
            parts.append(suffix)
        parts.append("data")
        return "_".join(parts) + ".json"

    # ------------------------------------------------------------------
    # Main entry point
    # ------------------------------------------------------------------

    def fetch(self) -> bool:
        """Run one fetch cycle.

        The cursor is advanced to the window end when every variant was
        fetched without error, including when no records came back (so empty
        windows are not re-fetched). If any variant failed, the cursor is left
        untouched and the same window is requested again on the next cycle.

        :returns: ``True`` if at least one file was written.
        """
        window = self._next_window()
        if window is None:
            return False

        date_from, date_to = window
        wrote_any = False
        all_succeeded = True

        for variant in self._param_variants:
            # Remove the internal 'suffix' key before sending to the API
            api_variant = {k: v for k, v in variant.items() if k != "suffix"}
            try:
                records = self._fetch_variant(date_from, date_to, api_variant)
            except Exception:
                # Keep going so one broken variant does not block the others.
                logger.exception(
                    "Source %s: failed fetching variant %s", self.name, variant
                )
                all_succeeded = False
                continue

            if not records:
                logger.info(
                    "Source %s: no records for %s→%s variant %s",
                    self.name,
                    date_from,
                    date_to,
                    variant,
                )
                continue

            filename = self._make_filename(date_from, date_to, variant)
            content = json.dumps(records, ensure_ascii=False).encode()
            atomic_write(self.incoming_dir, filename, content)
            wrote_any = True

        if all_succeeded:
            self.state.set_cursor(self.name, date_to)
        else:
            # Advancing here would permanently skip the failed variant's data.
            logger.warning(
                "Source %s: window %s→%s had failed variants; cursor not advanced",
                self.name,
                date_from,
                date_to,
            )
        return wrote_any
os.environ.get("CONFIG_PATH", "config.yml") + cfg = load_config(config_path) + + log_level = cfg.get("log_level", os.environ.get("LOG_LEVEL", "INFO")) + setup_logging(log_level) + log = logging.getLogger("api_fetcher") + + incoming_dir = Path(os.environ.get("INCOMING_DIR", cfg.get("incoming_dir", "/app/data/incoming"))) + state_path = Path(os.environ.get("STATE_PATH", cfg.get("state_path", "/app/data/state/api_fetcher_state.json"))) + poll_interval = float(cfg.get("poll_interval_seconds", 60)) + + state = FetcherState(state_path) + + sources: list[dict] = cfg.get("sources", []) + if not sources: + log.error("No sources defined in config; exiting") + sys.exit(1) + + # Shared HTTP client (connection pooling across all sources) + with httpx.Client(timeout=30) as client: + fetchers: list[APIFetcher] = [] + for src in sources: + auth = build_auth(src["auth"], client) + fetcher = APIFetcher( + name=src["name"], + cfg=src, + incoming_dir=incoming_dir, + state=state, + auth=auth, + client=client, + ) + fetchers.append(fetcher) + log.info("Registered source %r (mode=%s)", src["name"], src.get("mode", "continuous")) + + def poll(): + for f in fetchers: + f.fetch() + + log.info( + "Starting poll loop: %d source(s), interval=%.0fs", + len(fetchers), + poll_interval, + ) + run_poll_loop(poll, poll_interval, log) + + +if __name__ == "__main__": + main() diff --git a/fetchers/api_fetcher/mock_server/Dockerfile b/fetchers/api_fetcher/mock_server/Dockerfile new file mode 100644 index 0000000..bd171e5 --- /dev/null +++ b/fetchers/api_fetcher/mock_server/Dockerfile @@ -0,0 +1,6 @@ +FROM python:3.11-slim +WORKDIR /app +COPY requirements.txt . +RUN pip install --no-cache-dir -r requirements.txt +COPY app.py . 
+CMD ["python", "app.py"] diff --git a/fetchers/api_fetcher/mock_server/app.py b/fetchers/api_fetcher/mock_server/app.py new file mode 100644 index 0000000..ec43436 --- /dev/null +++ b/fetchers/api_fetcher/mock_server/app.py @@ -0,0 +1,147 @@ +"""Mock REST API server for api_fetcher development and testing. + +Implements the DRF Simple JWT + CML data endpoints that the api_fetcher +expects. Never use in production. + +Endpoints: + POST /login/ → {access, refresh} + POST /refresh/ → {access} or 401 + GET /cml/ → {count, next, previous, results: [...]} + +Query params for GET /cml/: + date_from YYYY-MM-DD + date_to YYYY-MM-DD + performance_event RSL or TSL + page 1-based page number + page_size records per page (default 100) +""" + +import secrets +from datetime import datetime, timedelta + +from flask import Flask, jsonify, request + +app = Flask(__name__) + +# ── In-memory token store ───────────────────────────────────────────────────── +# Maps token_hex → {"type": "access"|"refresh", "username": str} +_TOKENS: dict[str, dict] = {} + +_VALID_USERS = {"testuser": "testpass"} + + +def _new_token(username: str, token_type: str) -> str: + tok = secrets.token_hex(24) + _TOKENS[tok] = {"type": token_type, "username": username} + return tok + + +def _require_bearer() -> tuple[bool, str]: + """Return (ok, username_or_error_msg).""" + header = request.headers.get("Authorization", "") + if not header.startswith("Bearer "): + return False, "Missing or malformed Authorization header" + token = header.split(" ", 1)[1] + entry = _TOKENS.get(token) + if not entry or entry["type"] != "access": + return False, "Invalid or expired access token" + return True, entry["username"] + + +# ── Auth endpoints ──────────────────────────────────────────────────────────── + + +@app.post("/login/") +def login(): + body = request.get_json(silent=True) or {} + username = body.get("username", "") + password = body.get("password", "") + if _VALID_USERS.get(username) != password: + return 
jsonify({"detail": "No active account found with the given credentials"}), 401 + access = _new_token(username, "access") + refresh = _new_token(username, "refresh") + return jsonify({"access": access, "refresh": refresh}) + + +@app.post("/refresh/") +def refresh(): + body = request.get_json(silent=True) or {} + tok = body.get("refresh", "") + entry = _TOKENS.get(tok) + if not entry or entry["type"] != "refresh": + return jsonify({"detail": "Token is invalid or expired"}), 401 + new_access = _new_token(entry["username"], "access") + return jsonify({"access": new_access}) + + +# ── Data endpoint ───────────────────────────────────────────────────────────── + + +def _generate_records( + date_from: str, + date_to: str, + performance_event: str, + link_ids: list[str], +) -> list[dict]: + """Synthesise hourly CML records for the requested window.""" + try: + start = datetime.strptime(date_from, "%Y-%m-%d") + end = datetime.strptime(date_to, "%Y-%m-%d") + except ValueError: + return [] + + records = [] + current = start + while current < end: + for link_id in link_ids: + records.append( + { + "timestamp": current.strftime("%Y-%m-%dT%H:%M:%SZ"), + "link_id": link_id, + "sublink_id": "1", + "performance_event": performance_event, + # Synthetic measurement value + "value": round(-40.0 - (hash((current, link_id)) % 20), 1), + } + ) + current += timedelta(hours=1) + return records + + +@app.get("/cml/") +def cml_data(): + ok, username_or_err = _require_bearer() + if not ok: + return jsonify({"detail": username_or_err}), 401 + + date_from = request.args.get("date_from", "") + date_to = request.args.get("date_to", "") + performance_event = request.args.get("performance_event", "RSL").upper() + page = int(request.args.get("page", 1)) + page_size = int(request.args.get("page_size", 100)) + + if not date_from or not date_to: + return jsonify({"detail": "date_from and date_to are required"}), 400 + if performance_event not in ("RSL", "TSL"): + return jsonify({"detail": 
"performance_event must be RSL or TSL"}), 400 + + link_ids = ["10001", "10002", "10003"] + all_records = _generate_records(date_from, date_to, performance_event, link_ids) + + total = len(all_records) + start_idx = (page - 1) * page_size + end_idx = start_idx + page_size + page_records = all_records[start_idx:end_idx] + + return jsonify( + { + "count": total, + "next": None if end_idx >= total else f"?page={page + 1}", + "previous": None if page == 1 else f"?page={page - 1}", + "results": page_records, + } + ) + + +if __name__ == "__main__": + app.run(host="0.0.0.0", port=5001, debug=False) diff --git a/fetchers/api_fetcher/mock_server/requirements.txt b/fetchers/api_fetcher/mock_server/requirements.txt new file mode 100644 index 0000000..001e7c4 --- /dev/null +++ b/fetchers/api_fetcher/mock_server/requirements.txt @@ -0,0 +1 @@ +flask>=3.0 diff --git a/fetchers/api_fetcher/requirements.txt b/fetchers/api_fetcher/requirements.txt new file mode 100644 index 0000000..21b8f47 --- /dev/null +++ b/fetchers/api_fetcher/requirements.txt @@ -0,0 +1,2 @@ +httpx>=0.27 +pyyaml>=6.0 diff --git a/fetchers/shared/__init__.py b/fetchers/shared/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/fetchers/shared/config.py b/fetchers/shared/config.py new file mode 100644 index 0000000..ed65d89 --- /dev/null +++ b/fetchers/shared/config.py @@ -0,0 +1,32 @@ +import logging +import os + +import yaml + +logger = logging.getLogger(__name__) + + +def load_config(config_path: str = "config.yml") -> dict: + """Load a YAML config file. + + Secrets are never stored inline in YAML. Fields ending in ``_env`` + (e.g. ``username_env``) hold the *name* of an environment variable; + call :func:`resolve_env` to read the actual value at runtime. + """ + logger.info("Loading config from %s", config_path) + with open(config_path) as f: + return yaml.safe_load(f) + + +def resolve_env(var_name: str) -> str: + """Read a required environment variable. 
+ + Raises ``ValueError`` if the variable is not set, so misconfiguration + fails loudly at startup rather than silently later. + """ + value = os.environ.get(var_name) + if value is None: + raise ValueError( + f"Required environment variable {var_name!r} is not set" + ) + return value diff --git a/fetchers/shared/incoming_writer.py b/fetchers/shared/incoming_writer.py new file mode 100644 index 0000000..feddce2 --- /dev/null +++ b/fetchers/shared/incoming_writer.py @@ -0,0 +1,31 @@ +import logging +import os +from pathlib import Path + +logger = logging.getLogger(__name__) + + +def atomic_write(incoming_dir: Path, filename: str, content: bytes) -> Path: + """Write *content* to ``incoming_dir/filename`` atomically. + + Writes to a ``.tmp`` sibling first, then uses ``os.replace()`` so the + parser's file-watcher never observes a partial file. The watcher already + has its own stability check, but atomic writes are a belt-and-suspenders + guarantee. + """ + incoming_dir = Path(incoming_dir) + incoming_dir.mkdir(parents=True, exist_ok=True) + tmp_path = incoming_dir / (filename + ".tmp") + final_path = incoming_dir / filename + try: + tmp_path.write_bytes(content) + os.replace(tmp_path, final_path) + logger.info("Wrote %s (%d bytes)", final_path.name, len(content)) + return final_path + except Exception: + if tmp_path.exists(): + try: + tmp_path.unlink() + except OSError: + pass + raise diff --git a/fetchers/shared/polling.py b/fetchers/shared/polling.py new file mode 100644 index 0000000..5ebb379 --- /dev/null +++ b/fetchers/shared/polling.py @@ -0,0 +1,34 @@ +import logging +import time +from typing import Callable + +logger = logging.getLogger(__name__) + +_MAX_BACKOFF_S = 300 # 5 minutes + + +def run_poll_loop( + poll_fn: Callable, + interval_s: float, + log: logging.Logger = None, +) -> None: + """Call *poll_fn* repeatedly, sleeping *interval_s* between successful calls. 
+ + On exception: logs the error, backs off exponentially (doubles each + failure up to ``_MAX_BACKOFF_S``), then retries. The backoff resets to + *interval_s* after a successful call. + + Runs until the process is killed (``SIGTERM`` / ``SIGINT``). + """ + if log is None: + log = logger + backoff = interval_s + while True: + try: + poll_fn() + backoff = interval_s # reset on success + time.sleep(interval_s) + except Exception: + log.exception("Poll failed, retrying in %.0fs", backoff) + time.sleep(backoff) + backoff = min(backoff * 2, _MAX_BACKOFF_S) diff --git a/fetchers/shared/state.py b/fetchers/shared/state.py new file mode 100644 index 0000000..f027822 --- /dev/null +++ b/fetchers/shared/state.py @@ -0,0 +1,84 @@ +import json +import logging +import os +from pathlib import Path +from typing import Optional + +logger = logging.getLogger(__name__) + + +class FetcherState: + """Persist fetcher progress to a JSON file so container restarts resume + from where they left off rather than re-fetching everything. + + API fetchers use the cursor methods (``get_cursor`` / ``set_cursor``). + SFTP fetchers use the seen-files methods (``is_seen`` / ``mark_seen``). 
+ """ + + def __init__(self, state_path: Path): + self.path = Path(state_path) + self._data: dict = self._load() + + # ------------------------------------------------------------------ + # Internal helpers + # ------------------------------------------------------------------ + + def _load(self) -> dict: + if self.path.exists(): + try: + with open(self.path) as f: + return json.load(f) + except (json.JSONDecodeError, OSError) as exc: + logger.warning( + "Could not load state from %s (%s); starting fresh", + self.path, + exc, + ) + return {"sources": {}} + + def _save(self) -> None: + self.path.parent.mkdir(parents=True, exist_ok=True) + tmp = self.path.with_suffix(".tmp") + try: + with open(tmp, "w") as f: + json.dump(self._data, f, indent=2) + os.replace(tmp, self.path) + except Exception: + if tmp.exists(): + try: + tmp.unlink() + except OSError: + pass + raise + + def _source(self, name: str) -> dict: + return self._data["sources"].setdefault(name, {}) + + # ------------------------------------------------------------------ + # API cursor + # ------------------------------------------------------------------ + + def get_cursor(self, source_name: str) -> Optional[str]: + """Return the ``last_fetched_until`` ISO timestamp for an API source, + or ``None`` if this source has never been fetched.""" + return self._source(source_name).get("last_fetched_until") + + def set_cursor(self, source_name: str, value: str) -> None: + """Advance the cursor for an API source and persist to disk.""" + self._source(source_name)["last_fetched_until"] = value + self._save() + + # ------------------------------------------------------------------ + # SFTP seen-files + # ------------------------------------------------------------------ + + def is_seen(self, source_name: str, filename: str, mtime: str) -> bool: + """Return ``True`` if this ``(filename, mtime)`` pair was already + downloaded from the named SFTP source.""" + seen = self._source(source_name).get("seen_files", {}) + 
return seen.get(filename) == mtime + + def mark_seen(self, source_name: str, filename: str, mtime: str) -> None: + """Record that a file has been downloaded and persist.""" + self._source(source_name).setdefault("seen_files", {})[filename] = mtime + self._save() From afee6ab2db00666ec0f77465b0697c2a15ac7aeb Mon Sep 17 00:00:00 2001 From: Christian Chwala Date: Wed, 20 May 2026 23:06:56 +0200 Subject: [PATCH 3/7] =?UTF-8?q?feat:=20add=20parser/parsers/api=5Fjson=20?= =?UTF-8?q?=E2=80=94=20JSON=20raw-data=20parser?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Parses JSON files produced by api_fetcher. A field map (loaded from FIELD_MAP_PATH env var) does longest-prefix matching on the filename stem to route source fields to [time, cml_id, sublink_id, rsl, tsl]. Includes example_field_map.yml for the mock server (RSL + TSL split). --- parser/parsers/api_json/__init__.py | 0 parser/parsers/api_json/example_field_map.yml | 28 +++++ parser/parsers/api_json/parse_raw.py | 111 ++++++++++++++++++ 3 files changed, 139 insertions(+) create mode 100644 parser/parsers/api_json/__init__.py create mode 100644 parser/parsers/api_json/example_field_map.yml create mode 100644 parser/parsers/api_json/parse_raw.py diff --git a/parser/parsers/api_json/__init__.py b/parser/parsers/api_json/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/parser/parsers/api_json/example_field_map.yml b/parser/parsers/api_json/example_field_map.yml new file mode 100644 index 0000000..054ded2 --- /dev/null +++ b/parser/parsers/api_json/example_field_map.yml @@ -0,0 +1,28 @@ +# example_field_map.yml +# +# Maps file-name prefixes to column routing rules for the JSON parser. +# +# Keys are longest-prefix matched against the filename stem of each incoming +# JSON file. Values map *internal column names* (time, cml_id, sublink_id, +# rsl, tsl) to the *source field names* found in the JSON records. 
+# +# Columns omitted from a mapping are filled with NaN, which is correct when +# RSL and TSL arrive in separate files. +# +# Mount this file (or a deployment-specific replacement) at +# /app/config/field_map.yml inside the parser container and set +# FIELD_MAP_PATH=/app/config/field_map.yml. + +# Matches files like: mock_operator_rsl_20260101_20260102_data.json +mock_operator_rsl: + time: timestamp + cml_id: link_id + sublink_id: sublink_id + rsl: value + +# Matches files like: mock_operator_tsl_20260101_20260102_data.json +mock_operator_tsl: + time: timestamp + cml_id: link_id + sublink_id: sublink_id + tsl: value diff --git a/parser/parsers/api_json/parse_raw.py b/parser/parsers/api_json/parse_raw.py new file mode 100644 index 0000000..c155e28 --- /dev/null +++ b/parser/parsers/api_json/parse_raw.py @@ -0,0 +1,111 @@ +"""Parse a raw JSON file produced by the API fetcher. + +The JSON file is a list of records, e.g.:: + + [ + {"timestamp": "2026-01-01T00:00:00Z", "link_id": "10001", + "sublink_id": "1", "value": -45.2}, + ... + ] + +A *field map* (loaded from the path in the ``FIELD_MAP_PATH`` environment +variable) maps file-name prefixes to column renaming / routing rules:: + + # field_map.yml + mock_operator_rsl: + time: timestamp + cml_id: link_id + sublink_id: sublink_id + rsl: value # "value" field → rsl column; tsl will be NaN + mock_operator_tsl: + time: timestamp + cml_id: link_id + sublink_id: sublink_id + tsl: value # "value" field → tsl column; rsl will be NaN + +The key lookup is a *longest-prefix* match against the file-name stem +(without extension), so a single file-name prefix can cover many date-stamped +files. 
+""" + +import json +import logging +import os +from pathlib import Path + +import pandas as pd +import yaml + +logger = logging.getLogger(__name__) + +_FIELD_MAP_PATH_ENV = "FIELD_MAP_PATH" +_DEFAULT_FIELD_MAP_PATH = "/app/config/field_map.yml" +_REQUIRED_OUTPUT_COLS = ["time", "cml_id", "sublink_id", "rsl", "tsl"] + + +def _load_field_map(field_map_path: str) -> dict: + with open(field_map_path) as f: + return yaml.safe_load(f) + + +def _longest_prefix_match(stem: str, field_map: dict) -> str: + """Return the field-map key that is the longest prefix of *stem*.""" + candidates = [k for k in field_map if stem.startswith(k)] + if not candidates: + raise ValueError( + f"No field-map entry matches filename stem {stem!r}. " + f"Available prefixes: {sorted(field_map)}" + ) + return max(candidates, key=len) + + +def parse_api_json_raw(filepath: Path) -> pd.DataFrame: + """Load a raw API JSON file and return a normalised DataFrame. + + The returned DataFrame always has columns ``[time, cml_id, sublink_id, rsl, tsl]``. + Columns not provided by the source file are filled with ``NaN``. + + :param filepath: Path to the ``.json`` file in ``data/incoming/``. + :raises FileNotFoundError: if the file does not exist. + :raises ValueError: if no field-map entry matches the filename. + :raises KeyError: if a required source field is missing from the records. + """ + field_map_path = os.environ.get(_FIELD_MAP_PATH_ENV, _DEFAULT_FIELD_MAP_PATH) + field_map = _load_field_map(field_map_path) + + stem = filepath.stem # e.g. 
"mock_operator_20260101_20260102_rsl_data" + matched_key = _longest_prefix_match(stem, field_map) + mapping: dict = field_map[matched_key] # internal_col → source_field + + with open(filepath) as f: + records: list = json.load(f) + + if not records: + logger.warning("Empty JSON file: %s", filepath.name) + return pd.DataFrame(columns=_REQUIRED_OUTPUT_COLS) + + df_raw = pd.DataFrame(records) + + # Build output DataFrame column by column from the mapping + df = pd.DataFrame() + for internal_col, source_field in mapping.items(): + if source_field not in df_raw.columns: + raise KeyError( + f"Field {source_field!r} not found in {filepath.name}. " + f"Available fields: {list(df_raw.columns)}" + ) + df[internal_col] = df_raw[source_field] + + # Fill any missing output columns with NaN + for col in _REQUIRED_OUTPUT_COLS: + if col not in df.columns: + df[col] = float("nan") + + # Normalise the time column to UTC-aware datetime + df["time"] = pd.to_datetime(df["time"], utc=True) + + # Enforce expected column order + df = df[_REQUIRED_OUTPUT_COLS] + + logger.info("Parsed %d records from %s", len(df), filepath.name) + return df From 8deb1808f9442f324091a14a517b3c46c787fe1f Mon Sep 17 00:00:00 2001 From: Christian Chwala Date: Wed, 20 May 2026 23:07:10 +0200 Subject: [PATCH 4/7] feat: wire JSON support into parser and add docker-compose.dev.yml parser/service_logic.py: import parse_api_json_raw; add elif .json branch routing JSON files to write_rawdata (before else: quarantine) parser/main.py: add .json to FileWatcher extensions; add *.json glob in process_existing_files for startup replay docker-compose.dev.yml: adds mock_server (port 5001) and api_fetcher services for local development; mounts shared data volume and example field_map.yml --- docker-compose.dev.yml | 45 +++++++++++++++++++++++++++++++++++++++++ parser/main.py | 12 ++++++++++- parser/service_logic.py | 8 ++++++++ 3 files changed, 64 insertions(+), 1 deletion(-) create mode 100644 docker-compose.dev.yml diff 
--git a/docker-compose.dev.yml b/docker-compose.dev.yml new file mode 100644 index 0000000..ed19d1b --- /dev/null +++ b/docker-compose.dev.yml @@ -0,0 +1,45 @@ +# docker-compose.dev.yml +# Development overlay — adds the mock server and api_fetcher service. +# +# Usage: +# docker compose -f docker-compose.yml \ +# -f docker-compose.override.yml \ +# -f docker-compose.dev.yml \ +# up +# +# The api_fetcher is wired to use the same shared data volume as the parsers +# so files written to data/incoming/ are picked up automatically. + +services: + + mock_server: + build: + context: ./fetchers/api_fetcher/mock_server + ports: + - "5001:5001" + healthcheck: + test: ["CMD", "python", "-c", "import urllib.request; urllib.request.urlopen('http://localhost:5001/login/')"] + interval: 10s + timeout: 5s + retries: 5 + + api_fetcher: + build: + context: ./fetchers + dockerfile: api_fetcher/Dockerfile + depends_on: + mock_server: + condition: service_healthy + environment: + - CONFIG_PATH=/app/api_fetcher/config.yml + - API_USERNAME=testuser + - API_PASSWORD=testpass + - INCOMING_DIR=/app/data/incoming + - STATE_PATH=/app/data/state/api_fetcher_state.json + - LOG_LEVEL=INFO + - FIELD_MAP_PATH=/app/config/field_map.yml + volumes: + # Shared data volume (same as parsers) + - ./data:/app/data + # Field map: use the example map for local dev + - ./parser/parsers/api_json/example_field_map.yml:/app/config/field_map.yml:ro diff --git a/parser/main.py b/parser/main.py index 81a4498..b0524d2 100644 --- a/parser/main.py +++ b/parser/main.py @@ -61,6 +61,16 @@ def process_existing_files(db_writer, file_manager, logger): logger.info("Found %d data file(s) to process", len(data_files)) process_rawdata_files_batch(data_files, db_writer, file_manager, logger) + # JSON files (from api_fetcher): process individually + json_files = sorted( + f for f in Config.INCOMING_DIR.glob("*.json") if f.is_file() + ) + for f in json_files: + try: + process_cml_file(f, db_writer, file_manager, logger) + except 
Exception: + pass + def main(): setup_logging() @@ -98,7 +108,7 @@ def on_new_file(filepath): watcher = FileWatcher( str(Config.INCOMING_DIR), on_new_file, - {".csv"}, + {".csv", ".json"}, ) watcher.start() diff --git a/parser/service_logic.py b/parser/service_logic.py index c970378..43b1995 100644 --- a/parser/service_logic.py +++ b/parser/service_logic.py @@ -9,6 +9,7 @@ import pandas as pd from .parsers.demo_csv_data.parse_raw import parse_rawdata_csv from .parsers.demo_csv_data.parse_metadata import parse_metadata_csv +from .parsers.api_json.parse_raw import parse_api_json_raw def process_rawdata_files_batch( @@ -149,6 +150,13 @@ def process_cml_file(filepath: Path, db_writer, file_manager, logger=None): file_manager.archive_file(filepath) db_writer.log_file_event(filepath.name, "archived", rows_written=rows) return "rawdata" + elif filepath.suffix.lower() == ".json": + df = parse_api_json_raw(filepath) + rows = db_writer.write_rawdata(df) + logger.info(f"Wrote {rows} data rows from {filepath.name}") + file_manager.archive_file(filepath) + db_writer.log_file_event(filepath.name, "archived", rows_written=rows) + return "rawdata" else: error_msg = f"Unsupported file type: {filepath.name}" file_manager.quarantine_file(filepath, error_msg) From edd7af218daebb4fb219b640422251fa52fed1bd Mon Sep 17 00:00:00 2001 From: Christian Chwala Date: Wed, 20 May 2026 23:12:13 +0200 Subject: [PATCH 5/7] fix: three bugs found during review MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit fetcher.py: in continuous mode, end was computed as start+chunk_hours; with overlap_seconds >= chunk_hours*3600 (as in the example config) the window end equalled the cursor, so the cursor never advanced (infinite loop). Fix: end = cursor_dt + chunk_hours; overlap only shifts start back. parser/main.py: process_existing_files returned early when there were no CSV files, skipping the JSON glob entirely. 
Fix: guard only the CSV section, always run the JSON section. mock_server/app.py + docker-compose.dev.yml: healthcheck hit GET /login/ which is POST-only; Flask returned 405, urlopen raised HTTPError, check always failed → api_fetcher never started. Fix: add GET /health endpoint, point healthcheck there. --- docker-compose.dev.yml | 2 +- fetchers/api_fetcher/fetcher.py | 8 ++++++-- fetchers/api_fetcher/mock_server/app.py | 5 +++++ parser/main.py | 27 ++++++++++++------------- 4 files changed, 25 insertions(+), 17 deletions(-) diff --git a/docker-compose.dev.yml b/docker-compose.dev.yml index ed19d1b..398a3da 100644 --- a/docker-compose.dev.yml +++ b/docker-compose.dev.yml @@ -18,7 +18,7 @@ services: ports: - "5001:5001" healthcheck: - test: ["CMD", "python", "-c", "import urllib.request; urllib.request.urlopen('http://localhost:5001/login/')"] + test: ["CMD", "python", "-c", "import urllib.request; urllib.request.urlopen('http://localhost:5001/health')"] interval: 10s timeout: 5s retries: 5 diff --git a/fetchers/api_fetcher/fetcher.py b/fetchers/api_fetcher/fetcher.py index 79f16da..5ca744a 100644 --- a/fetchers/api_fetcher/fetcher.py +++ b/fetchers/api_fetcher/fetcher.py @@ -114,11 +114,15 @@ def _next_window(self) -> Optional[tuple[str, str]]: # continuous mode cursor = self.state.get_cursor(self.name) if cursor: - start = self._parse(cursor) - timedelta(seconds=self._overlap_seconds) + cursor_dt = self._parse(cursor) + # Shift the left edge back by overlap to catch late-arriving records, + # but always advance the right edge beyond the cursor. 
+ start = cursor_dt - timedelta(seconds=self._overlap_seconds) + end = min(cursor_dt + timedelta(hours=self._chunk_hours), now) else: start = now - timedelta(hours=self._chunk_hours) + end = now - end = min(start + timedelta(hours=self._chunk_hours), now) if end <= start: return None diff --git a/fetchers/api_fetcher/mock_server/app.py b/fetchers/api_fetcher/mock_server/app.py index ec43436..68041e0 100644 --- a/fetchers/api_fetcher/mock_server/app.py +++ b/fetchers/api_fetcher/mock_server/app.py @@ -51,6 +51,11 @@ def _require_bearer() -> tuple[bool, str]: # ── Auth endpoints ──────────────────────────────────────────────────────────── +@app.get("/health") +def health(): + return jsonify({"status": "ok"}) + + @app.post("/login/") def login(): body = request.get_json(silent=True) or {} diff --git a/parser/main.py b/parser/main.py index b0524d2..ee864a9 100644 --- a/parser/main.py +++ b/parser/main.py @@ -43,23 +43,22 @@ def process_existing_files(db_writer, file_manager, logger): incoming = sorted( f for f in Config.INCOMING_DIR.glob("*.csv") if f.is_file() ) - if not incoming: - return - metadata_files = [f for f in incoming if "meta" in f.name.lower()] - data_files = [f for f in incoming if f not in set(metadata_files)] + if incoming: + metadata_files = [f for f in incoming if "meta" in f.name.lower()] + data_files = [f for f in incoming if f not in set(metadata_files)] - # Metadata files: process individually (typically just one) - for f in metadata_files: - try: - process_cml_file(f, db_writer, file_manager, logger) - except Exception: - pass + # Metadata files: process individually (typically just one) + for f in metadata_files: + try: + process_cml_file(f, db_writer, file_manager, logger) + except Exception: + pass - # Data files: batch-process for efficiency - if data_files: - logger.info("Found %d data file(s) to process", len(data_files)) - process_rawdata_files_batch(data_files, db_writer, file_manager, logger) + # Data files: batch-process for efficiency 
+ if data_files: + logger.info("Found %d data file(s) to process", len(data_files)) + process_rawdata_files_batch(data_files, db_writer, file_manager, logger) # JSON files (from api_fetcher): process individually json_files = sorted( From d1abbe26b436d5627d666d5d61525ab2275a68da Mon Sep 17 00:00:00 2001 From: Christian Chwala Date: Wed, 20 May 2026 23:19:09 +0200 Subject: [PATCH 6/7] fix: add pyyaml to parser requirements (needed by api_json parser) --- parser/requirements.txt | 1 + 1 file changed, 1 insertion(+) diff --git a/parser/requirements.txt b/parser/requirements.txt index 5e2b737..4325c14 100644 --- a/parser/requirements.txt +++ b/parser/requirements.txt @@ -6,5 +6,6 @@ netcdf4 xarray watchdog>=3.0.0 python-dateutil>=2.8.0 +pyyaml>=6.0 pytest pytest-cov \ No newline at end of file From 8f87cc5a976153fe1dfbd5e7f03fe69f23c3226f Mon Sep 17 00:00:00 2001 From: Christian Chwala Date: Wed, 20 May 2026 23:33:13 +0200 Subject: [PATCH 7/7] fix+test: JSON dispatch bug and add missing test coverage service_logic.py: the JSON elif was placed AFTER 'raw'/'data' in name, but all api_fetcher filenames end in _data.json and matched the CSV branch first, making the JSON branch unreachable. Fix: check .json suffix first, before the substring checks. test_service_logic.py: add test_process_cml_file_json_archived covering the repaired JSON branch (patches parse_api_json_raw, asserts write and archive calls). test_api_json_parser.py: new test module for parse_api_json_raw covering prefix matching, RSL/TSL column routing, UTC datetime normalisation, empty-file handling, and error paths (missing field, no prefix match). 
--- parser/service_logic.py | 16 +-- parser/tests/test_api_json_parser.py | 144 +++++++++++++++++++++++++++ parser/tests/test_service_logic.py | 21 ++++ 3 files changed, 173 insertions(+), 8 deletions(-) create mode 100644 parser/tests/test_api_json_parser.py diff --git a/parser/service_logic.py b/parser/service_logic.py index 43b1995..122da5f 100644 --- a/parser/service_logic.py +++ b/parser/service_logic.py @@ -125,7 +125,14 @@ def process_cml_file(filepath: Path, db_writer, file_manager, logger=None): raise try: - if "meta" in name: + if filepath.suffix.lower() == ".json": + df = parse_api_json_raw(filepath) + rows = db_writer.write_rawdata(df) + logger.info(f"Wrote {rows} data rows from {filepath.name}") + file_manager.archive_file(filepath) + db_writer.log_file_event(filepath.name, "archived", rows_written=rows) + return "rawdata" + elif "meta" in name: df = parse_metadata_csv(filepath) rows = db_writer.write_metadata(df) logger.info(f"Wrote {rows} metadata rows from {filepath.name}") @@ -150,13 +157,6 @@ def process_cml_file(filepath: Path, db_writer, file_manager, logger=None): file_manager.archive_file(filepath) db_writer.log_file_event(filepath.name, "archived", rows_written=rows) return "rawdata" - elif filepath.suffix.lower() == ".json": - df = parse_api_json_raw(filepath) - rows = db_writer.write_rawdata(df) - logger.info(f"Wrote {rows} data rows from {filepath.name}") - file_manager.archive_file(filepath) - db_writer.log_file_event(filepath.name, "archived", rows_written=rows) - return "rawdata" else: error_msg = f"Unsupported file type: {filepath.name}" file_manager.quarantine_file(filepath, error_msg) diff --git a/parser/tests/test_api_json_parser.py b/parser/tests/test_api_json_parser.py new file mode 100644 index 0000000..63ddc00 --- /dev/null +++ b/parser/tests/test_api_json_parser.py @@ -0,0 +1,144 @@ +"""Tests for parser/parsers/api_json/parse_raw.py.""" + +import json +import os + +import pandas as pd +import pytest + +from 
..parsers.api_json.parse_raw import ( + _longest_prefix_match, + parse_api_json_raw, +) + +# --------------------------------------------------------------------------- +# Helpers +# --------------------------------------------------------------------------- + +_FIELD_MAP = { + "mock_operator_rsl": { + "time": "timestamp", + "cml_id": "link_id", + "sublink_id": "sublink_id", + "rsl": "value", + }, + "mock_operator_tsl": { + "time": "timestamp", + "cml_id": "link_id", + "sublink_id": "sublink_id", + "tsl": "value", + }, +} + +_RECORDS = [ + {"timestamp": "2026-01-01T00:00:00Z", "link_id": "10001", "sublink_id": "1", "value": -45.2}, + {"timestamp": "2026-01-01T01:00:00Z", "link_id": "10001", "sublink_id": "1", "value": -44.8}, +] + + +def _write_field_map(tmp_path, content=None) -> str: + import yaml + path = tmp_path / "field_map.yml" + path.write_text(yaml.dump(content or _FIELD_MAP)) + return str(path) + + +def _write_json(tmp_path, name, records=None) -> object: + from pathlib import Path + p = tmp_path / name + p.write_text(json.dumps(records if records is not None else _RECORDS)) + return p + + +# --------------------------------------------------------------------------- +# _longest_prefix_match +# --------------------------------------------------------------------------- + +def test_prefix_match_exact(): + assert _longest_prefix_match("mock_operator_rsl_20260101", _FIELD_MAP) == "mock_operator_rsl" + + +def test_prefix_match_longest_wins(): + fm = {"mock": {}, "mock_operator_rsl": {}} + assert _longest_prefix_match("mock_operator_rsl_20260101", fm) == "mock_operator_rsl" + + +def test_prefix_match_no_match_raises(): + with pytest.raises(ValueError, match="No field-map entry"): + _longest_prefix_match("unknown_operator_20260101", _FIELD_MAP) + + +# --------------------------------------------------------------------------- +# parse_api_json_raw — happy paths +# --------------------------------------------------------------------------- + +def 
test_parse_rsl_file(tmp_path, monkeypatch): + fm_path = _write_field_map(tmp_path) + monkeypatch.setenv("FIELD_MAP_PATH", fm_path) + f = _write_json(tmp_path, "mock_operator_rsl_20260101_20260102_data.json") + + df = parse_api_json_raw(f) + + assert list(df.columns) == ["time", "cml_id", "sublink_id", "rsl", "tsl"] + assert len(df) == 2 + assert df["rsl"].notna().all() + assert df["tsl"].isna().all() + assert df["cml_id"].iloc[0] == "10001" + + +def test_parse_tsl_file(tmp_path, monkeypatch): + fm_path = _write_field_map(tmp_path) + monkeypatch.setenv("FIELD_MAP_PATH", fm_path) + f = _write_json(tmp_path, "mock_operator_tsl_20260101_20260102_data.json") + + df = parse_api_json_raw(f) + + assert df["tsl"].notna().all() + assert df["rsl"].isna().all() + + +def test_time_column_is_utc_datetime(tmp_path, monkeypatch): + fm_path = _write_field_map(tmp_path) + monkeypatch.setenv("FIELD_MAP_PATH", fm_path) + f = _write_json(tmp_path, "mock_operator_rsl_20260101_data.json") + + df = parse_api_json_raw(f) + + assert pd.api.types.is_datetime64_any_dtype(df["time"]) + assert str(df["time"].dt.tz) == "UTC" + + +def test_empty_json_file_returns_empty_df(tmp_path, monkeypatch): + fm_path = _write_field_map(tmp_path) + monkeypatch.setenv("FIELD_MAP_PATH", fm_path) + f = _write_json(tmp_path, "mock_operator_rsl_20260101_data.json", records=[]) + + df = parse_api_json_raw(f) + + assert isinstance(df, pd.DataFrame) + assert len(df) == 0 + assert list(df.columns) == ["time", "cml_id", "sublink_id", "rsl", "tsl"] + + +# --------------------------------------------------------------------------- +# parse_api_json_raw — error paths +# --------------------------------------------------------------------------- + +def test_missing_source_field_raises(tmp_path, monkeypatch): + fm_path = _write_field_map(tmp_path) + monkeypatch.setenv("FIELD_MAP_PATH", fm_path) + records = [{"timestamp": "2026-01-01T00:00:00Z", "link_id": "10001", "sublink_id": "1"}] + # 'value' key is absent → KeyError + f 
= _write_json(tmp_path, "mock_operator_rsl_20260101_data.json", records=records) + + with pytest.raises(KeyError, match="value"): + parse_api_json_raw(f) + + +def test_no_matching_prefix_raises(tmp_path, monkeypatch): + fm_path = _write_field_map(tmp_path) + monkeypatch.setenv("FIELD_MAP_PATH", fm_path) + f = _write_json(tmp_path, "unknown_operator_20260101_data.json") + + with pytest.raises(ValueError, match="No field-map entry"): + parse_api_json_raw(f) diff --git a/parser/tests/test_service_logic.py b/parser/tests/test_service_logic.py index b5d2a7b..aec63a5 100644 --- a/parser/tests/test_service_logic.py +++ b/parser/tests/test_service_logic.py @@ -246,6 +246,27 @@ def test_process_cml_file_rawdata_archived(tmp_path, mock_db_writer, mock_file_m ) +def test_process_cml_file_json_archived(tmp_path, mock_db_writer, mock_file_manager): + """JSON files are parsed via parse_api_json_raw, written, and archived.""" + f = tmp_path / "mock_operator_rsl_20260101_data.json" + f.write_text('[{"timestamp":"2026-01-01T00:00:00Z","link_id":"10001","sublink_id":"1","value":-45.2}]') + mock_db_writer.write_rawdata.return_value = 1 + + fake_df = pd.DataFrame( + {"time": ["2026-01-01T00:00:00Z"], "cml_id": ["10001"], + "sublink_id": ["1"], "rsl": [-45.2], "tsl": [float("nan")]} + ) + with patch("parser.service_logic.parse_api_json_raw", return_value=fake_df): + result = process_cml_file(f, mock_db_writer, mock_file_manager) + + assert result == "rawdata" + mock_db_writer.write_rawdata.assert_called_once_with(fake_df) + mock_file_manager.archive_file.assert_called_once_with(f) + mock_db_writer.log_file_event.assert_called_once_with( + f.name, "archived", rows_written=1 + ) + + def test_process_cml_file_unsupported_quarantined( tmp_path, mock_db_writer, mock_file_manager ):