Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions dataretrieval/waterdata/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
get_stats_date_range,
get_stats_por,
get_time_series_metadata,
get_waterdata,
)
from .filters import FILTER_LANG
from .nearest import get_nearest_continuous
Expand Down Expand Up @@ -64,4 +65,5 @@
"get_stats_date_range",
"get_stats_por",
"get_time_series_metadata",
"get_waterdata",
]
145 changes: 145 additions & 0 deletions dataretrieval/waterdata/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,12 +24,20 @@
METADATA_COLLECTIONS,
PROFILES,
SERVICES,
WATERDATA_SERVICES,
)
from dataretrieval.waterdata.utils import (
_OUTPUT_ID_BY_SERVICE,
GEOPANDAS,
SAMPLES_URL,
_check_profiles,
_construct_cql_request,
_default_headers,
_finalize_ogc_frame,
_get_args,
_normalize_str_iterable,
_switch_properties_id,
_walk_pages,
get_ogc_data,
get_stats_data,
)
Expand Down Expand Up @@ -2832,3 +2840,140 @@ def get_channel(
args = _get_args(locals())

return get_ogc_data(args, output_id, service)


def get_waterdata(
service: WATERDATA_SERVICES,
cql: str | dict,
*,
properties: str | Iterable[str] | None = None,
bbox: list[float] | None = None,
limit: int | None = None,
skip_geometry: bool | None = None,
convert_type: bool = True,
) -> tuple[pd.DataFrame, BaseMetadata]:
"""Generalized OGC API CQL2 query.

Python analogue of R ``dataRetrieval::read_waterdata``. Use this
when you need a predicate the typed wrappers (``get_daily``,
``get_continuous``, …) can't express — top-level ``or``, ``like``
with ``%`` wildcards, comparison operators, nested boolean trees,
geometry-based predicates beyond a bbox, and so on. The typed
wrappers are nicer when they cover the case; reach for this when
they don't.

The CQL2 grammar is documented at
https://api.waterdata.usgs.gov/docs/ogcapi/complex-queries/.

Parameters
----------
service : str
OGC collection name. Must be one of
:data:`dataretrieval.waterdata.types.WATERDATA_SERVICES`
(e.g. ``"daily"``, ``"monitoring-locations"``).
cql : str or dict
CQL2 query. A ``dict`` is JSON-serialized for transport; a
``str`` is sent through unchanged. The query goes into the
HTTP POST body with ``Content-Type:
application/query-cql-json``.
properties : str or iterable of str, optional
Server-side property whitelist (passed as ``properties=`` on
the URL). Reduces payload size and bypasses the response-shape
post-processing for any column not listed. ``"id"`` resolves
to the service's ``output_id`` (e.g. ``daily_id``) the same way
it does in the typed wrappers.
bbox : list of float, optional
Bounding box ``[xmin, ymin, xmax, ymax]`` in CRS 4326. Combines
with the CQL filter as an additional spatial predicate.
limit : int, optional
Page size, clamped server-side to 50,000.
skip_geometry : bool, optional
If True, the server omits geometry from each feature
(``skipGeometry=true``).
convert_type : bool, default True
Coerce date/datetime/numeric columns to typed dtypes after the
DataFrame is built.

Returns
-------
df : pandas.DataFrame or geopandas.GeoDataFrame
Result of the query. GeoDataFrame when ``geopandas`` is
installed and geometry is present.
md : :class:`dataretrieval.utils.BaseMetadata`
Request metadata (URL, query time, response headers).

Examples
--------
.. code::

>>> # Daily values for two parameter codes at two sites
>>> # (compound AND-of-INs).
>>> from dataretrieval import waterdata
>>> cql = {
... "op": "and",
... "args": [
... {
... "op": "in",
... "args": [
... {"property": "parameter_code"},
... ["00060", "00065"],
... ],
... },
... {
... "op": "in",
... "args": [
... {"property": "monitoring_location_id"},
... ["USGS-07367300", "USGS-03277200"],
... ],
... },
... ],
... }
>>> df, md = waterdata.get_waterdata(service="daily", cql=cql)

>>> # Monitoring locations whose HUC starts with "02070010"
>>> # (LIKE with the CQL2 ``%`` wildcard).
>>> df, md = waterdata.get_waterdata(
... service="monitoring-locations",
... cql='{"op": "like", "args": ['
... '{"property": "hydrologic_unit_code"},'
... ' "02070010%"]}',
... )
"""
if service not in _OUTPUT_ID_BY_SERVICE:
raise ValueError(
f"Unknown service {service!r}. Valid services: "
f"{sorted(_OUTPUT_ID_BY_SERVICE)}."
)
output_id = _OUTPUT_ID_BY_SERVICE[service]

# ``dict`` is the pythonic input — serialize on the way out. ``str``
# is sent verbatim so callers who already have a CQL2 doc (e.g.
# imported from a config file) don't need to re-parse it.
body = json.dumps(cql, separators=(",", ":")) if isinstance(cql, dict) else cql

if properties is None:
properties_list = None
elif isinstance(properties, str):
properties_list = [properties]
else:
properties_list = _normalize_str_iterable(properties, "properties")

# Translate user-facing names (``daily_id``) to the wire-format
# ``id`` the OGC API expects, matching the typed wrappers.
wire_properties = _switch_properties_id(
properties_list, id_name=output_id, service=service
)

req = _construct_cql_request(
service=service,
cql_body=body,
properties=wire_properties or None,
bbox=bbox,
limit=limit,
skip_geometry=skip_geometry,
)

df, response = _walk_pages(geopd=GEOPANDAS, req=req)
return _finalize_ogc_frame(
df, response, properties_list, service, output_id, convert_type
)
20 changes: 20 additions & 0 deletions dataretrieval/waterdata/types.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,26 @@
"results",
]

# OGC API collection names that the typed waterdata getters cover.
# Used as the ``service`` arg type of :func:`get_waterdata`. Keep in
# sync with ``_OUTPUT_ID_BY_SERVICE`` in
# :mod:`dataretrieval.waterdata.utils` — the dict is the source of
# truth at runtime; this Literal exists so editors can offer
# completion and type-checkers can catch typos at call sites.
WATERDATA_SERVICES = Literal[
"channel-measurements",
"combined-metadata",
"continuous",
"daily",
"field-measurements",
"field-measurements-metadata",
"latest-continuous",
"latest-daily",
"monitoring-locations",
"peaks",
"time-series-metadata",
]

PROFILES = Literal[
"actgroup",
"actmetric",
Expand Down
111 changes: 96 additions & 15 deletions dataretrieval/waterdata/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -157,6 +157,26 @@ def _switch_properties_id(properties: list[str] | None, id_name: str, service: s
# parameters and require POST with CQL2 JSON instead.
_CQL2_REQUIRED_SERVICES = frozenset({"monitoring-locations"})

# Service name → the column the rest of the package exposes the
# per-record OGC ``id`` under. Used by the generalized
# :func:`get_waterdata` entry point, which doesn't have a per-service
# wrapper to hard-code this in. Values here match what each typed
# ``get_*`` function in :mod:`dataretrieval.waterdata.api` uses for
# its own ``output_id``.
_OUTPUT_ID_BY_SERVICE: dict[str, str] = {
"daily": "daily_id",
"continuous": "continuous_id",
"latest-continuous": "latest_continuous_id",
"latest-daily": "latest_daily_id",
"field-measurements": "field_measurement_id",
"field-measurements-metadata": "field_series_id",
"monitoring-locations": "monitoring_location_id",
"time-series-metadata": "time_series_id",
"combined-metadata": "combined_meta_id",
"peaks": "peak_id",
"channel-measurements": "channel_measurements_id",
}


def _parse_datetime(value: str) -> datetime | None:
"""Parse a single datetime string against the supported formats.
Expand Down Expand Up @@ -492,14 +512,7 @@ def _construct_api_requests(
for k, v in kwargs.items()
}

params["skipGeometry"] = skip_geometry
params["limit"] = 50000 if limit is None or limit > 50000 else limit

# `len()` instead of truthiness: a numpy ndarray would raise on `if bbox:`.
if bbox is not None and len(bbox) > 0:
params["bbox"] = ",".join(map(str, bbox))
if properties:
params["properties"] = ",".join(properties)
params.update(_ogc_query_params(properties, bbox, limit, skip_geometry))

# Translate CQL filter Python names to the hyphenated URL parameter that
# the OGC API expects. The Python kwarg is `filter_lang` because hyphens
Expand Down Expand Up @@ -528,6 +541,55 @@ def _construct_api_requests(
return request.prepare()


def _ogc_query_params(
properties: list[str] | None = None,
bbox: list[float] | None = None,
limit: int | None = None,
skip_geometry: bool | None = None,
) -> dict[str, Any]:
"""The ``skipGeometry``/``limit``/``bbox``/``properties`` block of
an OGC URL, used by both :func:`_construct_api_requests` and
:func:`_construct_cql_request`."""
params: dict[str, Any] = {
"skipGeometry": skip_geometry,
"limit": 50000 if limit is None or limit > 50000 else limit,
}
if bbox is not None and len(bbox) > 0:
params["bbox"] = ",".join(map(str, bbox))
if properties:
params["properties"] = ",".join(properties)
return params


def _construct_cql_request(
service: str,
cql_body: str,
properties: list[str] | None = None,
bbox: list[float] | None = None,
limit: int | None = None,
skip_geometry: bool | None = None,
) -> requests.PreparedRequest:
"""Build a POST/CQL2 request for the generalized ``get_waterdata`` path.

Distinct from :func:`_construct_api_requests`: that function derives
the CQL2 body from typed kwargs and also handles the GET-with-comma-
separated-values path. Here the body is passed through verbatim so
a caller can express predicates the typed wrappers can't (top-level
``or``, ``like`` with ``%`` wildcards, comparison operators, …).
"""
service_url = f"{OGC_API_URL}/collections/{service}/items"
headers = _default_headers()
headers["Content-Type"] = "application/query-cql-json"
request = requests.Request(
method="POST",
url=service_url,
headers=headers,
data=cql_body,
params=_ogc_query_params(properties, bbox, limit, skip_geometry),
)
return request.prepare()


def _next_req_url(resp: requests.Response) -> str | None:
"""
Extracts the URL for the next page of results from an HTTP response from a
Expand Down Expand Up @@ -868,6 +930,29 @@ def _sort_rows(df: pd.DataFrame) -> pd.DataFrame:
return df


def _finalize_ogc_frame(
df: pd.DataFrame,
response: requests.Response,
properties: list[str] | None,
service: str,
output_id: str,
convert_type: bool,
) -> tuple[pd.DataFrame, BaseMetadata]:
"""Apply the standard OGC post-processing tail to a raw frame and
wrap the response in :class:`BaseMetadata`.

Shared by :func:`get_ogc_data` (typed-kwargs path) and
:func:`dataretrieval.waterdata.api.get_waterdata` (raw-CQL2 path)
so both surfaces produce identically-shaped DataFrames.
"""
df = _deal_with_empty(df, properties, service)
if convert_type:
df = _type_cols(df)
df = _arrange_cols(df, properties, output_id)
df = _sort_rows(df)
return df, BaseMetadata(response)


def get_ogc_data(
args: dict[str, Any], output_id: str, service: str
) -> tuple[pd.DataFrame, BaseMetadata]:
Expand Down Expand Up @@ -914,13 +999,9 @@ def get_ogc_data(
args = {k: v for k, v in args.items() if v is not None}

return_list, response = _fetch_once(args)
return_list = _deal_with_empty(return_list, properties, service)
if convert_type:
return_list = _type_cols(return_list)
return_list = _arrange_cols(return_list, properties, output_id)
return_list = _sort_rows(return_list)

return return_list, BaseMetadata(response)
return _finalize_ogc_frame(
return_list, response, properties, service, output_id, convert_type
)


@filters.chunked(build_request=_construct_api_requests)
Expand Down
Loading