From dffb16808cb5eb8e428d6854ede4d462dae1fe48 Mon Sep 17 00:00:00 2001 From: thodson-usgs Date: Mon, 18 May 2026 19:23:07 -0500 Subject: [PATCH 1/2] feat(waterdata): add get_waterdata for generalized CQL2 queries MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Python analogue of R ``dataRetrieval::read_waterdata``. The typed ``get_*`` wrappers (``get_daily``, ``get_continuous``, …) only support exact-equality predicates on whitelisted parameters. Some users need more — top-level ``or``, ``like`` with ``%`` wildcards, comparison operators, nested boolean trees — and today have no surface for it. ``get_waterdata(service, cql, ...)`` accepts a raw CQL2 query (``dict`` or pre-serialized JSON string) and POSTs it against any recognized collection, then walks pages and post-processes the result with the same pipeline the typed wrappers use. Reuses existing infrastructure: ``_walk_pages``, ``_deal_with_empty``, ``_arrange_cols``, ``_type_cols``, ``_sort_rows``, and ``_switch_properties_id``. The new pieces are: - ``_OUTPUT_ID_BY_SERVICE`` (utils.py) — a single mapping from service name to the renamed-``id`` column the rest of the package exposes, hoisted from the typed wrappers so the generalized entry point can pick the right one. - ``_construct_cql_request`` (utils.py) — focused POST/CQL2 request builder; distinct from ``_construct_api_requests`` because the body comes in verbatim rather than being derived from typed kwargs. - ``get_waterdata`` (api.py) — public entry point. CQL2 grammar reference: https://api.waterdata.usgs.gov/docs/ogcapi/complex-queries/ Co-Authored-By: Claude Opus 4.7 (1M context) --- dataretrieval/waterdata/__init__.py | 2 + dataretrieval/waterdata/api.py | 160 ++++++++++++++++++++++++++++ dataretrieval/waterdata/utils.py | 73 +++++++++++++ 3 files changed, 235 insertions(+) diff --git a/dataretrieval/waterdata/__init__.py b/dataretrieval/waterdata/__init__.py index f81966c4..847b3a9d 100644 --- a/dataretrieval/waterdata/__init__.py +++ b/dataretrieval/waterdata/__init__.py @@ -28,6 +28,7 @@ get_stats_date_range, get_stats_por, get_time_series_metadata, + get_waterdata, ) from .filters import FILTER_LANG from .nearest import get_nearest_continuous @@ -64,4 +65,5 @@ "get_stats_date_range", "get_stats_por", "get_time_series_metadata", + "get_waterdata", ] diff --git a/dataretrieval/waterdata/api.py b/dataretrieval/waterdata/api.py index ad268194..bf45c8ef 100644 --- a/dataretrieval/waterdata/api.py +++ b/dataretrieval/waterdata/api.py @@ -26,10 +26,20 @@ SERVICES, ) from dataretrieval.waterdata.utils import ( + _OUTPUT_ID_BY_SERVICE, + GEOPANDAS, SAMPLES_URL, + _arrange_cols, _check_profiles, + _construct_cql_request, + _deal_with_empty, _default_headers, _get_args, + _normalize_str_iterable, + _sort_rows, + _switch_properties_id, + _type_cols, + _walk_pages, get_ogc_data, get_stats_data, ) @@ -2832,3 +2842,153 @@ def get_channel( args = _get_args(locals()) return get_ogc_data(args, output_id, service) + + +def get_waterdata( + service: str, + cql: str | dict, + *, + properties: str | Iterable[str] | None = None, + bbox: list[float] | None = None, + limit: int | None = None, + skip_geometry: bool | None = None, + convert_type: bool = True, + client: requests.Session | None = None, +) -> tuple[pd.DataFrame, BaseMetadata]: + """Generalized OGC API CQL2 query. + + Python analogue of R ``dataRetrieval::read_waterdata``. Use this + when you need a predicate the typed wrappers (``get_daily``, + ``get_continuous``, …) can't express — top-level ``or``, ``like`` + with ``%`` wildcards, comparison operators, nested boolean trees, + geometry-based predicates beyond a bbox, and so on. The typed + wrappers are nicer when they cover the case; reach for this when + they don't. + + The CQL2 grammar is documented at + https://api.waterdata.usgs.gov/docs/ogcapi/complex-queries/. + + Parameters + ---------- + service : str + OGC collection name (e.g. ``"daily"``, ``"monitoring-locations"``). + See :data:`dataretrieval.waterdata.utils._OUTPUT_ID_BY_SERVICE` + for the recognized list. + cql : str or dict + CQL2 query. A ``dict`` is JSON-serialized for transport; a + ``str`` is sent through unchanged. The query goes into the + HTTP POST body with ``Content-Type: + application/query-cql-json``. + properties : str or iterable of str, optional + Server-side property whitelist (passed as ``properties=`` on + the URL). Reduces payload size and bypasses the response-shape + post-processing for any column not listed. ``"id"`` resolves + to the service's ``output_id`` (e.g. ``daily_id``) the same way + it does in the typed wrappers. + bbox : list of float, optional + Bounding box ``[xmin, ymin, xmax, ymax]`` in CRS 4326. + limit : int, optional + Page size, clamped server-side to 50,000. + skip_geometry : bool, optional + If True, the server omits geometry from each feature + (``skipGeometry=true``). + convert_type : bool, default True + Coerce date/datetime/numeric columns to typed dtypes after the + DataFrame is built. + client : requests.Session, optional + Reuse an existing HTTP session (handy for batching or + injecting custom retry/auth adapters). A short-lived session + is created internally if not provided. + + Returns + ------- + df : pandas.DataFrame or geopandas.GeoDataFrame + Result of the query. GeoDataFrame when ``geopandas`` is + installed and geometry is present. + md : :class:`dataretrieval.utils.BaseMetadata` + Request metadata (URL, query time, response headers). + + Examples + -------- + .. code:: + + >>> # Daily values for two parameter codes at two sites + >>> # (compound AND-of-INs). + >>> from dataretrieval import waterdata + >>> cql = { + ... "op": "and", + ... "args": [ + ... { + ... "op": "in", + ... "args": [ + ... {"property": "parameter_code"}, + ... ["00060", "00065"], + ... ], + ... }, + ... { + ... "op": "in", + ... "args": [ + ... {"property": "monitoring_location_id"}, + ... ["USGS-07367300", "USGS-03277200"], + ... ], + ... }, + ... ], + ... } + >>> df, md = waterdata.get_waterdata( + ... service="daily", + ... cql=cql, + ... time=("2023-01-01", "2024-01-01"), + ... ) + + >>> # Monitoring locations whose HUC starts with "02070010" + >>> # (LIKE with the CQL2 ``%`` wildcard). + >>> df, md = waterdata.get_waterdata( + ... service="monitoring-locations", + ... cql='{"op": "like", "args": [' + ... '{"property": "hydrologic_unit_code"},' + ... ' "02070010%"]}', + ... ) + """ + if service not in _OUTPUT_ID_BY_SERVICE: + raise ValueError( + f"Unknown service {service!r}. Valid services: " + f"{sorted(_OUTPUT_ID_BY_SERVICE)}." + ) + output_id = _OUTPUT_ID_BY_SERVICE[service] + + # ``dict`` is the pythonic input — serialize on the way out. ``str`` + # is sent verbatim so callers who already have a CQL2 doc (e.g. + # imported from a config file) don't need to re-parse it. + body = json.dumps(cql, separators=(",", ":")) if isinstance(cql, dict) else cql + + if isinstance(properties, str): + properties_list = [properties] + elif properties is None: + properties_list = None + else: + properties_list = _normalize_str_iterable(properties, "properties") + + # Translate user-facing names (``daily_id``) to the wire-format + # ``id`` the OGC API expects, matching the typed wrappers. + wire_properties = _switch_properties_id( + properties_list, id_name=output_id, service=service + ) + + req = _construct_cql_request( + service=service, + cql_body=body, + properties=wire_properties or None, + bbox=bbox, + limit=limit, + skip_geometry=skip_geometry, + ) + + df, response = _walk_pages(geopd=GEOPANDAS, req=req, client=client) + + df = _deal_with_empty(df, properties_list, service) + if convert_type: + df = _type_cols(df) + df = _arrange_cols(df, properties_list, output_id) + df = _sort_rows(df) + + return df, BaseMetadata(response) diff --git a/dataretrieval/waterdata/utils.py b/dataretrieval/waterdata/utils.py index 91228357..cc4f784a 100644 --- a/dataretrieval/waterdata/utils.py +++ b/dataretrieval/waterdata/utils.py @@ -157,6 +157,26 @@ def _switch_properties_id(properties: list[str] | None, id_name: str, service: s # parameters and require POST with CQL2 JSON instead. _CQL2_REQUIRED_SERVICES = frozenset({"monitoring-locations"}) +# Service name → the column the rest of the package exposes the +# per-record OGC ``id`` under. Used by the generalized +# :func:`get_waterdata` entry point, which doesn't have a per-service +# wrapper to hard-code this in. Values here match what each typed +# ``get_*`` function in :mod:`dataretrieval.waterdata.api` uses for +# its own ``output_id``. +_OUTPUT_ID_BY_SERVICE: dict[str, str] = { + "daily": "daily_id", + "continuous": "continuous_id", + "latest-continuous": "latest_continuous_id", + "latest-daily": "latest_daily_id", + "field-measurements": "field_measurement_id", + "field-measurements-metadata": "field_series_id", + "monitoring-locations": "monitoring_location_id", + "time-series-metadata": "time_series_id", + "combined-metadata": "combined_meta_id", + "peaks": "peak_id", + "channel-measurements": "channel_measurements_id", +} + def _parse_datetime(value: str) -> datetime | None: """Parse a single datetime string against the supported formats. @@ -528,6 +548,59 @@ def _construct_api_requests( return request.prepare() +def _construct_cql_request( + service: str, + cql_body: str, + properties: list[str] | None = None, + bbox: list[float] | None = None, + limit: int | None = None, + skip_geometry: bool | None = None, +) -> requests.PreparedRequest: + """Build a POST/CQL2 request for the generalized ``get_waterdata`` path. + + Distinct from :func:`_construct_api_requests` because that function + derives the CQL2 body from typed kwargs; here the body is passed + through verbatim so a caller can express predicates the typed + wrappers can't (top-level ``or``, ``like`` with ``%`` wildcards, + comparison operators, …). + + Parameters + ---------- + service : str + Collection name, e.g. ``"daily"``. + cql_body : str + Pre-serialized CQL2-JSON. Sent as the request body unchanged. + properties : list of str, optional + Server-side property whitelist. Joined with commas. + bbox : list of float, optional + Bounding box ``[xmin, ymin, xmax, ymax]``. Joined with commas. + limit : int, optional + Page size; clamped to the server max of 50,000. + skip_geometry : bool, optional + If True, sets ``skipGeometry=true`` so the server omits + geometry from each feature. + """ + service_url = f"{OGC_API_URL}/collections/{service}/items" + params: dict[str, Any] = { + "skipGeometry": bool(skip_geometry), + "limit": 50000 if limit is None or limit > 50000 else limit, + } + if bbox is not None and len(bbox) > 0: + params["bbox"] = ",".join(map(str, bbox)) + if properties: + params["properties"] = ",".join(properties) + headers = _default_headers() + headers["Content-Type"] = "application/query-cql-json" + request = requests.Request( + method="POST", + url=service_url, + headers=headers, + data=cql_body, + params=params, + ) + return request.prepare() + + def _next_req_url(resp: requests.Response) -> str | None: """ Extracts the URL for the next page of results from an HTTP response from a From 1383ce9f4ac0008c5bb8306ceacc90a7addfe360 Mon Sep 17 00:00:00 2001 From: thodson-usgs Date: Mon, 18 May 2026 19:43:08 -0500 Subject: [PATCH 2/2] refactor(waterdata): simplify get_waterdata internals MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Code-review pass on PR #284. - Lift ``WATERDATA_SERVICES`` Literal into ``types.py``. Use it as the ``service`` arg type of ``get_waterdata`` so editors offer completion and type-checkers catch typos. The runtime source of truth (``_OUTPUT_ID_BY_SERVICE`` in utils.py) is unchanged; the Literal is kept in sync by hand and a comment notes that. - Extract ``_ogc_query_params(properties, bbox, limit, skip_geometry)`` in utils.py. The same ``skipGeometry``/``limit``/``bbox``/``properties`` block previously appeared twice — once in ``_construct_api_requests`` and once in the new ``_construct_cql_request`` — and is now built in one place. - Extract ``_finalize_ogc_frame(df, response, properties, service, output_id, convert_type)`` for the post-processing tail (``_deal_with_empty`` -> ``_type_cols`` -> ``_arrange_cols`` -> ``_sort_rows`` -> ``BaseMetadata``). Both ``get_ogc_data`` and ``get_waterdata`` route through it now, so the typed-kwargs and raw-CQL2 paths produce identically-shaped DataFrames by construction rather than by parallel maintenance. - Drop the ``client`` kwarg from ``get_waterdata``. None of the other public ``get_*`` getters expose it, and the rationale (HTTP session reuse) applies to all of them or none. If we want to expose session reuse, that's a separate PR that touches the whole family. - Collapse the ``properties`` normalization block to None-first ordering so the common case (no properties) reads first. - Drop the docstring breadcrumb to ``utils._OUTPUT_ID_BY_SERVICE``; point readers at ``types.WATERDATA_SERVICES`` (the user-facing Literal) instead. All 148 unit tests pass; ``_construct_api_requests`` and ``_construct_cql_request`` produce byte-identical requests to before. --- dataretrieval/waterdata/api.py | 47 ++++++--------- dataretrieval/waterdata/types.py | 20 +++++++ dataretrieval/waterdata/utils.py | 98 +++++++++++++++++--------------- 3 files changed, 89 insertions(+), 76 deletions(-) diff --git a/dataretrieval/waterdata/api.py b/dataretrieval/waterdata/api.py index bf45c8ef..e4002c68 100644 --- a/dataretrieval/waterdata/api.py +++ b/dataretrieval/waterdata/api.py @@ -24,21 +24,19 @@ METADATA_COLLECTIONS, PROFILES, SERVICES, + WATERDATA_SERVICES, ) from dataretrieval.waterdata.utils import ( _OUTPUT_ID_BY_SERVICE, GEOPANDAS, SAMPLES_URL, - _arrange_cols, _check_profiles, _construct_cql_request, - _deal_with_empty, _default_headers, + _finalize_ogc_frame, _get_args, _normalize_str_iterable, - _sort_rows, _switch_properties_id, - _type_cols, _walk_pages, get_ogc_data, get_stats_data, @@ -2845,7 +2843,7 @@ def get_channel( def get_waterdata( - service: str, + service: WATERDATA_SERVICES, cql: str | dict, *, properties: str | Iterable[str] | None = None, @@ -2853,7 +2851,6 @@ def get_waterdata( limit: int | None = None, skip_geometry: bool | None = None, convert_type: bool = True, - client: requests.Session | None = None, ) -> tuple[pd.DataFrame, BaseMetadata]: """Generalized OGC API CQL2 query. @@ -2871,9 +2868,9 @@ def get_waterdata( Parameters ---------- service : str - OGC collection name (e.g. ``"daily"``, ``"monitoring-locations"``). - See :data:`dataretrieval.waterdata.utils._OUTPUT_ID_BY_SERVICE` - for the recognized list. + OGC collection name. Must be one of + :data:`dataretrieval.waterdata.types.WATERDATA_SERVICES` + (e.g. ``"daily"``, ``"monitoring-locations"``). cql : str or dict CQL2 query. A ``dict`` is JSON-serialized for transport; a ``str`` is sent through unchanged. The query goes into the @@ -2886,7 +2883,8 @@ def get_waterdata( to the service's ``output_id`` (e.g. ``daily_id``) the same way it does in the typed wrappers. bbox : list of float, optional - Bounding box ``[xmin, ymin, xmax, ymax]`` in CRS 4326. + Bounding box ``[xmin, ymin, xmax, ymax]`` in CRS 4326. Combines + with the CQL filter as an additional spatial predicate. limit : int, optional Page size, clamped server-side to 50,000. skip_geometry : bool, optional @@ -2895,10 +2893,6 @@ def get_waterdata( convert_type : bool, default True Coerce date/datetime/numeric columns to typed dtypes after the DataFrame is built. - client : requests.Session, optional - Reuse an existing HTTP session (handy for batching or - injecting custom retry/auth adapters). A short-lived session - is created internally if not provided. Returns ------- @@ -2934,11 +2928,7 @@ def get_waterdata( ... }, ... ], ... } - >>> df, md = waterdata.get_waterdata( - ... service="daily", - ... cql=cql, - ... time=("2023-01-01", "2024-01-01"), - ... ) + >>> df, md = waterdata.get_waterdata(service="daily", cql=cql) >>> # Monitoring locations whose HUC starts with "02070010" >>> # (LIKE with the CQL2 ``%`` wildcard). @@ -2961,10 +2951,10 @@ def get_waterdata( # imported from a config file) don't need to re-parse it. body = json.dumps(cql, separators=(",", ":")) if isinstance(cql, dict) else cql - if isinstance(properties, str): - properties_list = [properties] - elif properties is None: + if properties is None: properties_list = None + elif isinstance(properties, str): + properties_list = [properties] else: properties_list = _normalize_str_iterable(properties, "properties") @@ -2983,12 +2973,7 @@ def get_waterdata( skip_geometry=skip_geometry, ) - df, response = _walk_pages(geopd=GEOPANDAS, req=req, client=client) - - df = _deal_with_empty(df, properties_list, service) - if convert_type: - df = _type_cols(df) - df = _arrange_cols(df, properties_list, output_id) - df = _sort_rows(df) - - return df, BaseMetadata(response) + df, response = _walk_pages(geopd=GEOPANDAS, req=req) + return _finalize_ogc_frame( + df, response, properties_list, service, output_id, convert_type + ) diff --git a/dataretrieval/waterdata/types.py b/dataretrieval/waterdata/types.py index f5e1496b..c22930e4 100644 --- a/dataretrieval/waterdata/types.py +++ b/dataretrieval/waterdata/types.py @@ -40,6 +40,26 @@ "results", ] +# OGC API collection names that the typed waterdata getters cover. +# Used as the ``service`` arg type of :func:`get_waterdata`. Keep in +# sync with ``_OUTPUT_ID_BY_SERVICE`` in +# :mod:`dataretrieval.waterdata.utils` — the dict is the source of +# truth at runtime; this Literal exists so editors can offer +# completion and type-checkers can catch typos at call sites. +WATERDATA_SERVICES = Literal[ + "channel-measurements", + "combined-metadata", + "continuous", + "daily", + "field-measurements", + "field-measurements-metadata", + "latest-continuous", + "latest-daily", + "monitoring-locations", + "peaks", + "time-series-metadata", +] + PROFILES = Literal[ "actgroup", "actmetric", diff --git a/dataretrieval/waterdata/utils.py b/dataretrieval/waterdata/utils.py index cc4f784a..b4f90fc5 100644 --- a/dataretrieval/waterdata/utils.py +++ b/dataretrieval/waterdata/utils.py @@ -512,14 +512,7 @@ def _construct_api_requests( for k, v in kwargs.items() } - params["skipGeometry"] = skip_geometry - params["limit"] = 50000 if limit is None or limit > 50000 else limit - - # `len()` instead of truthiness: a numpy ndarray would raise on `if bbox:`. - if bbox is not None and len(bbox) > 0: - params["bbox"] = ",".join(map(str, bbox)) - if properties: - params["properties"] = ",".join(properties) + params.update(_ogc_query_params(properties, bbox, limit, skip_geometry)) # Translate CQL filter Python names to the hyphenated URL parameter that # the OGC API expects. The Python kwarg is `filter_lang` because hyphens @@ -548,6 +541,26 @@ def _construct_api_requests( return request.prepare() +def _ogc_query_params( + properties: list[str] | None = None, + bbox: list[float] | None = None, + limit: int | None = None, + skip_geometry: bool | None = None, +) -> dict[str, Any]: + """The ``skipGeometry``/``limit``/``bbox``/``properties`` block of + an OGC URL, used by both :func:`_construct_api_requests` and + :func:`_construct_cql_request`.""" + params: dict[str, Any] = { + "skipGeometry": skip_geometry, + "limit": 50000 if limit is None or limit > 50000 else limit, + } + if bbox is not None and len(bbox) > 0: + params["bbox"] = ",".join(map(str, bbox)) + if properties: + params["properties"] = ",".join(properties) + return params + + def _construct_cql_request( service: str, cql_body: str, @@ -558,37 +571,13 @@ def _construct_cql_request( ) -> requests.PreparedRequest: """Build a POST/CQL2 request for the generalized ``get_waterdata`` path. - Distinct from :func:`_construct_api_requests` because that function - derives the CQL2 body from typed kwargs; here the body is passed - through verbatim so a caller can express predicates the typed - wrappers can't (top-level ``or``, ``like`` with ``%`` wildcards, - comparison operators, …). - - Parameters - ---------- - service : str - Collection name, e.g. ``"daily"``. - cql_body : str - Pre-serialized CQL2-JSON. Sent as the request body unchanged. - properties : list of str, optional - Server-side property whitelist. Joined with commas. - bbox : list of float, optional - Bounding box ``[xmin, ymin, xmax, ymax]``. Joined with commas. - limit : int, optional - Page size; clamped to the server max of 50,000. - skip_geometry : bool, optional - If True, sets ``skipGeometry=true`` so the server omits - geometry from each feature. + Distinct from :func:`_construct_api_requests`: that function derives + the CQL2 body from typed kwargs and also handles the GET-with-comma- + separated-values path. Here the body is passed through verbatim so + a caller can express predicates the typed wrappers can't (top-level + ``or``, ``like`` with ``%`` wildcards, comparison operators, …). """ service_url = f"{OGC_API_URL}/collections/{service}/items" - params: dict[str, Any] = { - "skipGeometry": bool(skip_geometry), - "limit": 50000 if limit is None or limit > 50000 else limit, - } - if bbox is not None and len(bbox) > 0: - params["bbox"] = ",".join(map(str, bbox)) - if properties: - params["properties"] = ",".join(properties) headers = _default_headers() headers["Content-Type"] = "application/query-cql-json" request = requests.Request( @@ -596,7 +585,7 @@ def _construct_cql_request( url=service_url, headers=headers, data=cql_body, - params=params, + params=_ogc_query_params(properties, bbox, limit, skip_geometry), ) return request.prepare() @@ -941,6 +930,29 @@ def _sort_rows(df: pd.DataFrame) -> pd.DataFrame: return df +def _finalize_ogc_frame( + df: pd.DataFrame, + response: requests.Response, + properties: list[str] | None, + service: str, + output_id: str, + convert_type: bool, +) -> tuple[pd.DataFrame, BaseMetadata]: + """Apply the standard OGC post-processing tail to a raw frame and + wrap the response in :class:`BaseMetadata`. + + Shared by :func:`get_ogc_data` (typed-kwargs path) and + :func:`dataretrieval.waterdata.api.get_waterdata` (raw-CQL2 path) + so both surfaces produce identically-shaped DataFrames. + """ + df = _deal_with_empty(df, properties, service) + if convert_type: + df = _type_cols(df) + df = _arrange_cols(df, properties, output_id) + df = _sort_rows(df) + return df, BaseMetadata(response) + + def get_ogc_data( args: dict[str, Any], output_id: str, service: str ) -> tuple[pd.DataFrame, BaseMetadata]: @@ -987,13 +999,9 @@ def get_ogc_data( args = {k: v for k, v in args.items() if v is not None} return_list, response = _fetch_once(args) - return_list = _deal_with_empty(return_list, properties, service) - if convert_type: - return_list = _type_cols(return_list) - return_list = _arrange_cols(return_list, properties, output_id) - return_list = _sort_rows(return_list) - - return return_list, BaseMetadata(response) + return _finalize_ogc_frame( + return_list, response, properties, service, output_id, convert_type + ) @filters.chunked(build_request=_construct_api_requests)