From b08eb19f55b02ffe6e06cf341efa1b1921ac551c Mon Sep 17 00:00:00 2001 From: Andreas Motl Date: Mon, 13 Apr 2026 12:33:45 +0200 Subject: [PATCH 1/3] CI: Activate OCI building on PRs --- .github/workflows/release-oci-full.yml | 2 +- .github/workflows/release-oci-ingest.yml | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/.github/workflows/release-oci-full.yml b/.github/workflows/release-oci-full.yml index abb1930f..c791b7fb 100644 --- a/.github/workflows/release-oci-full.yml +++ b/.github/workflows/release-oci-full.yml @@ -7,7 +7,7 @@ on: - '*.*.*' # Run on pull requests. - # pull_request: + pull_request: # Run each night. schedule: diff --git a/.github/workflows/release-oci-ingest.yml b/.github/workflows/release-oci-ingest.yml index 42fd7f81..9c2cc053 100644 --- a/.github/workflows/release-oci-ingest.yml +++ b/.github/workflows/release-oci-ingest.yml @@ -7,7 +7,7 @@ on: - '*.*.*' # Run on pull requests. - # pull_request: + pull_request: # Run each night. schedule: From beab5de355e4238b1bd7b1569b278e69080b3974 Mon Sep 17 00:00:00 2001 From: Andreas Motl Date: Mon, 13 Apr 2026 12:32:24 +0200 Subject: [PATCH 2/3] I/O: Add CSV file import, with transformations --- CHANGES.md | 1 + cratedb_toolkit/cluster/core.py | 8 +- cratedb_toolkit/io/file/__init__.py | 0 cratedb_toolkit/io/file/csv.py | 125 ++++++++++++++++++ cratedb_toolkit/io/router.py | 13 ++ cratedb_toolkit/io/util.py | 5 +- cratedb_toolkit/model.py | 8 ++ .../testing/testcontainers/cratedb.py | 1 + cratedb_toolkit/util/database.py | 3 +- pyproject.toml | 2 + tests/io/file/__init__.py | 0 tests/io/file/data/__init__.py | 0 tests/io/file/data/climate_ddl.sql | 14 ++ tests/io/file/data/climate_json_json.csv | 4 + tests/io/file/data/climate_json_python.csv | 4 + tests/io/file/data/climate_wkt_json.csv | 4 + tests/io/file/test_csv.py | 46 +++++++ 17 files changed, 231 insertions(+), 7 deletions(-) create mode 100644 cratedb_toolkit/io/file/__init__.py create mode 100644 cratedb_toolkit/io/file/csv.py create mode 100644 tests/io/file/__init__.py create mode 100644 tests/io/file/data/__init__.py create mode 100644 tests/io/file/data/climate_ddl.sql create mode 100644 tests/io/file/data/climate_json_json.csv create mode 100644 tests/io/file/data/climate_json_python.csv create mode 100644 tests/io/file/data/climate_wkt_json.csv create mode 100644 tests/io/file/test_csv.py diff --git a/CHANGES.md b/CHANGES.md index 3df4b9c6..6ebfb095 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -5,6 +5,7 @@ - Kinesis: Added `ctk kinesis` CLI group with `list-checkpoints` and `prune-checkpoints` commands for checkpoint table maintenance - Dependencies: Permitted installation of click 8.3 +- I/O: Added CSV file import, with transformations ## 2026/03/16 v0.0.46 - I/O: API improvements: `ctk {load,save} table` became `ctk {load,save}` diff --git a/cratedb_toolkit/cluster/core.py b/cratedb_toolkit/cluster/core.py index f89e77e3..8689af29 100644 --- a/cratedb_toolkit/cluster/core.py +++ b/cratedb_toolkit/cluster/core.py @@ -545,10 +545,10 @@ def load_table( ctk load kinesis+dms:///arn:aws:kinesis:eu-central-1:831394476016:stream/testdrive ctk load kinesis+dms:///path/to/dms-over-kinesis.jsonl """ - - self._load_table_result = self._router.load_table( - source=source, target=self.address, transformation=transformation - ) + address = self.address + if target: + address = address.with_table_address(target) + self._load_table_result = self._router.load_table(source=source, target=address, transformation=transformation) return self def save_table( diff --git a/cratedb_toolkit/io/file/__init__.py b/cratedb_toolkit/io/file/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/cratedb_toolkit/io/file/csv.py b/cratedb_toolkit/io/file/csv.py new file mode 100644 index 00000000..033f9deb --- /dev/null +++ b/cratedb_toolkit/io/file/csv.py @@ -0,0 +1,125 @@ +""" +CSV file integration for CrateDB Toolkit. + +This module provides functionality to transfer data between CSV files +and CrateDB database tables, supporting both import and export operations. +""" + +import dataclasses +import logging +from typing import Dict, List, Optional + +import polars as pl +from boltons.urlutils import URL + +from cratedb_toolkit.io.util import parse_uri, polars_to_cratedb + +logger = logging.getLogger(__name__) + + +DEFAULT_SEPARATOR = "," +DEFAULT_QUOTE_CHAR = '"' +DEFAULT_BATCH_SIZE = 75_000 + + +@dataclasses.dataclass +class CsvFileAddress: + """ + Represent a CSV file location and provide loader methods. + """ + + url: URL + location: str + pipeline: Optional[List[str]] = dataclasses.field(default_factory=list) + batch_size: Optional[int] = DEFAULT_BATCH_SIZE + # TODO: What about other parameters? See `polars.io.csv.functions`. + separator: Optional[str] = DEFAULT_SEPARATOR + quote_char: Optional[str] = DEFAULT_QUOTE_CHAR + + @classmethod + def from_url(cls, url: str) -> "CsvFileAddress": + """ + Parse a CSV file location and return a CsvFileAddress object. + + Examples: + + csv://./var/lib/example.csv + https://guided-path.s3.us-east-1.amazonaws.com/demo_climate_data_export.csv + """ + url_obj, location = parse_uri(url, "csv") + return cls( + url=url_obj, + location=location, + pipeline=url_obj.query_params.getlist("pipe"), + batch_size=int(url_obj.query_params.get("batch-size", DEFAULT_BATCH_SIZE)), + separator=url_obj.query_params.get("separator", DEFAULT_SEPARATOR), + quote_char=url_obj.query_params.get("quote-char", DEFAULT_QUOTE_CHAR), + ) + + @property + def storage_options(self) -> Dict[str, str]: + """ + Provide file storage options. + + TODO: Generalize. + """ + prefixes = ["aws_", "azure_", "google_", "delta_"] + return self.collect_properties(self.url.query_params, prefixes) + + @staticmethod + def collect_properties(query_params: Dict, prefixes: List) -> Dict[str, str]: + """ + Collect parameters from URL query string. + + TODO: Generalize. + """ + opts = {} + for name, value in query_params.items(): + for prefix in prefixes: + if name.lower().startswith(prefix) and value is not None: + opts[name.upper()] = value + break + return opts + + def load_table(self) -> pl.LazyFrame: + """ + Load the CSV file as a Polars LazyFrame. + """ + + # Read from data source. + lf = pl.scan_csv( + self.location, + separator=self.separator, + quote_char=self.quote_char, + storage_options=self.storage_options, + ) + + # Optionally apply transformations. + if self.pipeline: + from macropipe import MacroPipe + + mp = MacroPipe.from_recipes(*self.pipeline) + lf = mp.apply(lf) + + return lf + + +def from_csv(source_url, target_url, progress: bool = False) -> bool: + """ + Scan a CSV file from local filesystem or object store, and load into CrateDB. + Documentation: https://cratedb-toolkit.readthedocs.io/io/file/csv.html + + See also: https://docs.pola.rs/api/python/stable/reference/api/polars.scan_csv.html + + # Synopsis: Load from filesystem. + ctk load \ + "csv://./var/lib/example.csv" \ + "crate://crate@localhost:4200/demo/example" + """ + source = CsvFileAddress.from_url(source_url) + logger.info(f"File address: {source.location}") + return polars_to_cratedb( + frame=source.load_table(), + target_url=target_url, + chunk_size=source.batch_size, + ) diff --git a/cratedb_toolkit/io/router.py b/cratedb_toolkit/io/router.py index b989b285..fc9544de 100644 --- a/cratedb_toolkit/io/router.py +++ b/cratedb_toolkit/io/router.py @@ -107,6 +107,19 @@ def load_table( progress=True, ) + elif ( + source_url_obj.scheme.startswith("csv") + or source_url_obj.scheme.endswith("csv") + or source_url_obj.path.endswith(".csv") + ): + from cratedb_toolkit.io.file.csv import from_csv + + adjusted_url = str(source_url_obj) + if source_url_obj.scheme.startswith("csv"): + adjusted_url = str(source_url_obj.path) + + return from_csv(adjusted_url, target_url) + elif source_url_obj.scheme.startswith("deltalake") or source_url_obj.scheme.endswith("deltalake"): from cratedb_toolkit.io.deltalake import from_deltalake diff --git a/cratedb_toolkit/io/util.py b/cratedb_toolkit/io/util.py index dc38c57f..6be619bd 100644 --- a/cratedb_toolkit/io/util.py +++ b/cratedb_toolkit/io/util.py @@ -40,9 +40,10 @@ def polars_to_cratedb(frame: pl.LazyFrame, target_url, chunk_size: int) -> bool: """ Write a Polars LazyFrame to a CrateDB table, in batches/chunks. """ - cratedb_address = DatabaseAddress.from_string(target_url) + target_url_obj = URL(target_url) + if_exists = target_url_obj.query_params.pop("if-exists", "fail") + cratedb_address = DatabaseAddress.from_string(str(target_url_obj)) cratedb_url, cratedb_table = cratedb_address.decode() - if_exists = URL(target_url).query_params.get("if-exists") or "fail" if cratedb_table.table is None: raise ValueError("Table name is missing. Please adjust CrateDB database URL.") logger.info("Target address: %s", cratedb_address) diff --git a/cratedb_toolkit/model.py b/cratedb_toolkit/model.py index 6479e5b4..79026dfb 100644 --- a/cratedb_toolkit/model.py +++ b/cratedb_toolkit/model.py @@ -203,6 +203,13 @@ def schema(self) -> t.Union[str, None]: """ return self.uri.query_params.get("schema") or self.uri.path.lstrip("/") + def with_table_address(self, table_address: "TableAddress") -> "DatabaseAddress": + cp = deepcopy(self) + cp.uri.path = f"/{table_address.schema}/{table_address.table}" + if table_address.if_exists: + cp.uri.query_params["if-exists"] = table_address.if_exists + return cp + @dataclasses.dataclass class TableAddress: @@ -212,6 +219,7 @@ class TableAddress: schema: t.Optional[str] = None table: t.Optional[str] = None + if_exists: t.Optional[str] = None def __bool__(self): return bool(self.table) diff --git a/cratedb_toolkit/testing/testcontainers/cratedb.py b/cratedb_toolkit/testing/testcontainers/cratedb.py index a1e65c7e..dc2b21f3 100644 --- a/cratedb_toolkit/testing/testcontainers/cratedb.py +++ b/cratedb_toolkit/testing/testcontainers/cratedb.py @@ -142,6 +142,7 @@ def get_connection_url(self, dialect: str = "crate", host: Optional[str] = None) """ # TODO: When using `db_name=self.CRATEDB_DB`: # Connection.__init__() got an unexpected keyword argument 'database' + # Connection.__init__() got an unexpected keyword argument 'table' return super()._create_connection_url( dialect=dialect, username=self.CRATEDB_USER, diff --git a/cratedb_toolkit/util/database.py b/cratedb_toolkit/util/database.py index ae10a989..a1ae7f4a 100644 --- a/cratedb_toolkit/util/database.py +++ b/cratedb_toolkit/util/database.py @@ -460,8 +460,9 @@ def decode_database_table(url: str) -> t.Tuple[t.Union[str, None], t.Union[str, if "too many values to unpack" not in str(ex) and "not enough values to unpack" not in str(ex): raise + table = url_.query_params.get("table") + if not database: database = url_.query_params.get("database") - table = url_.query_params.get("table") if url_.scheme == "crate" and not database: database = url_.query_params.get("schema") if database is None and table is None: diff --git a/pyproject.toml b/pyproject.toml index 3f8bc1d9..e87cd770 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -202,6 +202,7 @@ optional-dependencies.io-opentable = [ "cratedb-toolkit[deltalake,iceberg]", ] optional-dependencies.io-recipe = [ + "macropipe[geo]==0.0.0", "tikray>=0.2,<0.4", ] optional-dependencies.kinesis = [ @@ -292,6 +293,7 @@ line-length = 120 line-length = 120 extend-exclude = [ "amqp-to-mqtt.py", + "examples", "workbench.py", ] lint.select = [ diff --git a/tests/io/file/__init__.py b/tests/io/file/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/tests/io/file/data/__init__.py b/tests/io/file/data/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/tests/io/file/data/climate_ddl.sql b/tests/io/file/data/climate_ddl.sql new file mode 100644 index 00000000..4f647ad3 --- /dev/null +++ b/tests/io/file/data/climate_ddl.sql @@ -0,0 +1,14 @@ +CREATE TABLE "{schema}".climate_data +( + "timestamp" TIMESTAMP WITHOUT TIME ZONE, + "geo_location" GEO_POINT, + "data" OBJECT(DYNAMIC) AS ( + "temperature" DOUBLE PRECISION, + "u10" DOUBLE PRECISION, + "v10" DOUBLE PRECISION, + "pressure" DOUBLE PRECISION, + "latitude" DOUBLE PRECISION, + "longitude" DOUBLE PRECISION, + "humidity" DOUBLE PRECISION + ) +); diff --git a/tests/io/file/data/climate_json_json.csv b/tests/io/file/data/climate_json_json.csv new file mode 100644 index 00000000..175b302d --- /dev/null +++ b/tests/io/file/data/climate_json_json.csv @@ -0,0 +1,4 @@ +timestamp,geo_location,data +1754784000000,'[14.988999953493476, 51.10299998894334]','{"temperature": 19.704827880859398, "pressure": 99310.625, "v10": -1.545882225036621, "u10": 1.7978938817977905, "latitude": 51.102999999999945, "longitude": 14.989}' +1754784000000,'[7.088122218847275, 51.0029999865219]','{"temperature": 19.347802734375023, "pressure": 101470.9609375, "v10": -1.256191611289978, "u10": 0.02778780460357666, "latitude": 51.00299999999994, "longitude": 7.0881222222222195}' +1754784000000,'[7.58817776106298, 51.0029999865219]','{"temperature": 17.713037109375023, "pressure": 98837.3984375, "v10": -1.5747417211532593, "u10": -0.19953763484954834, "latitude": 51.00299999999994, "longitude": 7.588177777777774}' diff --git a/tests/io/file/data/climate_json_python.csv b/tests/io/file/data/climate_json_python.csv new file mode 100644 index 00000000..fcf2c4cd --- /dev/null +++ b/tests/io/file/data/climate_json_python.csv @@ -0,0 +1,4 @@ +timestamp,geo_location,data +1754784000000,"[14.988999953493476, 51.10299998894334]","{'temperature': 19.704827880859398, 'pressure': 99310.625, 'v10': -1.545882225036621, 'u10': 1.7978938817977905, 'latitude': 51.102999999999945, 'longitude': 14.989}" +1754784000000,"[7.088122218847275, 51.0029999865219]","{'temperature': 19.347802734375023, 'pressure': 101470.9609375, 'v10': -1.256191611289978, 'u10': 0.02778780460357666, 'latitude': 51.00299999999994, 'longitude': 7.0881222222222195}" +1754784000000,"[7.58817776106298, 51.0029999865219]","{'temperature': 17.713037109375023, 'pressure': 98837.3984375, 'v10': -1.5747417211532593, 'u10': -0.19953763484954834, 'latitude': 51.00299999999994, 'longitude': 7.588177777777774}" diff --git a/tests/io/file/data/climate_wkt_json.csv b/tests/io/file/data/climate_wkt_json.csv new file mode 100644 index 00000000..0dbcc143 --- /dev/null +++ b/tests/io/file/data/climate_wkt_json.csv @@ -0,0 +1,4 @@ +timestamp,geo_location,data +1754784000000,'POINT ( 14.988999953493476 51.10299998894334 )','{"temperature": 19.704827880859398, "pressure": 99310.625, "v10": -1.545882225036621, "u10": 1.7978938817977905, "latitude": 51.102999999999945, "longitude": 14.989}' +1754784000000,'POINT ( 7.088122218847275 51.0029999865219 )','{"temperature": 19.347802734375023, "pressure": 101470.9609375, "v10": -1.256191611289978, "u10": 0.02778780460357666, "latitude": 51.00299999999994, "longitude": 7.0881222222222195}' +1754784000000,'POINT ( 7.58817776106298 51.0029999865219 )','{"temperature": 17.713037109375023, "pressure": 98837.3984375, "v10": -1.5747417211532593, "u10": -0.19953763484954834, "latitude": 51.00299999999994, "longitude": 7.588177777777774}' diff --git a/tests/io/file/test_csv.py b/tests/io/file/test_csv.py new file mode 100644 index 00000000..64706627 --- /dev/null +++ b/tests/io/file/test_csv.py @@ -0,0 +1,46 @@ +from importlib.resources import files + +import pytest + +import tests.io.file.data +from cratedb_toolkit import DatabaseCluster, InputOutputResource, TableAddress +from tests.conftest import TESTDRIVE_DATA_SCHEMA + +data_folder = files(tests.io.file.data) +ddl = (data_folder / "climate_ddl.sql").read_text().format(schema=TESTDRIVE_DATA_SCHEMA) +climate_json_json = ( + str(data_folder / "climate_json_json.csv") + "?quote-char='&pipe=json_array_to_wkt_point:geo_location" +) +climate_json_python = ( + str(data_folder / "climate_json_python.csv") + + '?quote-char="&pipe=json_array_to_wkt_point:geo_location&pipe=python_to_json:data' +) +climate_wkt_json = str(data_folder / "climate_wkt_json.csv") + "?quote-char='" + +table_address = TableAddress(schema=TESTDRIVE_DATA_SCHEMA, table="climate_data", if_exists="append") + + +@pytest.fixture(scope="function") +def provision_ddl(cratedb_synchronized) -> None: + cratedb_synchronized.database.run_sql(ddl) + + +def test_load_csv_json_json(cratedb_synchronized, provision_ddl): + cluster = DatabaseCluster.create(cluster_url=cratedb_synchronized.database.dburi) + cluster.load_table(InputOutputResource(climate_json_json), target=table_address) + cluster.adapter.refresh_table(table_address.fullname) + assert cluster.adapter.count_records(table_address.fullname) == 3, "Wrong number of records returned" + + +def test_load_csv_json_python(cratedb_synchronized, provision_ddl): + cluster = DatabaseCluster.create(cluster_url=cratedb_synchronized.database.dburi) + cluster.load_table(InputOutputResource(climate_json_python), target=table_address) + cluster.adapter.refresh_table(table_address.fullname) + assert cluster.adapter.count_records(table_address.fullname) == 3, "Wrong number of records returned" + + +def test_load_csv_wkt_json(cratedb_synchronized, provision_ddl): + cluster = DatabaseCluster.create(cluster_url=cratedb_synchronized.database.dburi) + cluster.load_table(InputOutputResource(climate_wkt_json), target=table_address) + cluster.adapter.refresh_table(table_address.fullname) + assert cluster.adapter.count_records(table_address.fullname) == 3, "Wrong number of records returned" From b8ae50970c88517455fc74e3fd4dad6f0310389c Mon Sep 17 00:00:00 2001 From: Andreas Motl Date: Mon, 13 Apr 2026 22:53:32 +0200 Subject: [PATCH 3/3] I/O: Improve CSV file import, with suggestions by CodeRabbit --- cratedb_toolkit/io/file/csv.py | 55 +++++++++++++++++++++++++--------- cratedb_toolkit/io/router.py | 2 +- cratedb_toolkit/model.py | 4 +++ tests/cluster/test_import.py | 2 +- tests/io/file/test_csv.py | 32 +++++++++++++++----- 5 files changed, 72 insertions(+), 23 deletions(-) diff --git a/cratedb_toolkit/io/file/csv.py b/cratedb_toolkit/io/file/csv.py index 033f9deb..cc2f3b47 100644 --- a/cratedb_toolkit/io/file/csv.py +++ b/cratedb_toolkit/io/file/csv.py @@ -31,7 +31,7 @@ class CsvFileAddress: url: URL location: str pipeline: Optional[List[str]] = dataclasses.field(default_factory=list) - batch_size: Optional[int] = DEFAULT_BATCH_SIZE + batch_size: int = DEFAULT_BATCH_SIZE # TODO: What about other parameters? See `polars.io.csv.functions`. separator: Optional[str] = DEFAULT_SEPARATOR quote_char: Optional[str] = DEFAULT_QUOTE_CHAR @@ -47,11 +47,15 @@ def from_url(cls, url: str) -> "CsvFileAddress": https://guided-path.s3.us-east-1.amazonaws.com/demo_climate_data_export.csv """ url_obj, location = parse_uri(url, "csv") + try: + batch_size = int(url_obj.query_params.get("batch-size", DEFAULT_BATCH_SIZE)) + except ValueError as ex: + raise ValueError("Invalid value for batch size") from ex return cls( url=url_obj, location=location, pipeline=url_obj.query_params.getlist("pipe"), - batch_size=int(url_obj.query_params.get("batch-size", DEFAULT_BATCH_SIZE)), + batch_size=batch_size, separator=url_obj.query_params.get("separator", DEFAULT_SEPARATOR), quote_char=url_obj.query_params.get("quote-char", DEFAULT_QUOTE_CHAR), ) @@ -81,18 +85,22 @@ def collect_properties(query_params: Dict, prefixes: List) -> Dict[str, str]: break return opts - def load_table(self) -> pl.LazyFrame: + def load_table(self, lazy: bool = True) -> pl.LazyFrame: """ Load the CSV file as a Polars LazyFrame. """ # Read from data source. - lf = pl.scan_csv( - self.location, - separator=self.separator, - quote_char=self.quote_char, - storage_options=self.storage_options, - ) + kwargs = { + "separator": self.separator, + "quote_char": self.quote_char, + "storage_options": self.storage_options, + } + # Note: Type checker ignores are only for Python 3.9. + if lazy: + lf = pl.scan_csv(self.location, **kwargs) # ty: ignore[invalid-argument-type] + else: + lf = pl.read_csv(self.location, **kwargs).lazy() # ty: ignore[invalid-argument-type] # Optionally apply transformations. if self.pipeline: @@ -118,8 +126,27 @@ def from_csv(source_url, target_url, progress: bool = False) -> bool: """ source = CsvFileAddress.from_url(source_url) logger.info(f"File address: {source.location}") - return polars_to_cratedb( - frame=source.load_table(), - target_url=target_url, - chunk_size=source.batch_size, - ) + + try: + return polars_to_cratedb( + frame=source.load_table(), + target_url=target_url, + chunk_size=source.batch_size or DEFAULT_BATCH_SIZE, + ) + + # OSError: object-store error: Generic S3 error: Error performing PUT http://169.254.169.254/latest/api/token + # in 218.979617ms, after 2 retries, max_retries: 2, retry_timeout: 10s - HTTP error: + # error sending request (path: s3://guided-path/demo_climate_data_export.csv) + except OSError as ex: + msg = str(ex) + if "Generic S3 error" in msg and "/api/token" in msg: + logger.warning( + "Storage backend authentication is required for streaming reads but failed. " + "Falling back to non-streaming mode: This may result in inefficient reads." + ) + return polars_to_cratedb( + frame=source.load_table(lazy=False), + target_url=target_url, + chunk_size=source.batch_size, + ) + raise OSError(f"Loading data from CSV failed: {source_url}: {msg}") from ex diff --git a/cratedb_toolkit/io/router.py b/cratedb_toolkit/io/router.py index fc9544de..09ad4829 100644 --- a/cratedb_toolkit/io/router.py +++ b/cratedb_toolkit/io/router.py @@ -116,7 +116,7 @@ def load_table( adjusted_url = str(source_url_obj) if source_url_obj.scheme.startswith("csv"): - adjusted_url = str(source_url_obj.path) + source_url_obj.scheme = None return from_csv(adjusted_url, target_url) diff --git a/cratedb_toolkit/model.py b/cratedb_toolkit/model.py index 79026dfb..2c57181e 100644 --- a/cratedb_toolkit/model.py +++ b/cratedb_toolkit/model.py @@ -206,8 +206,12 @@ def schema(self) -> t.Union[str, None]: def with_table_address(self, table_address: "TableAddress") -> "DatabaseAddress": cp = deepcopy(self) cp.uri.path = f"/{table_address.schema}/{table_address.table}" + # Use `if-exists` from table address. if table_address.if_exists: cp.uri.query_params["if-exists"] = table_address.if_exists + # When not supplied, don't let existing spots leak. + else: + cp.uri.query_params.pop("if-exists", None) return cp diff --git a/tests/cluster/test_import.py b/tests/cluster/test_import.py index 0c4ebaf7..83fbbc82 100644 --- a/tests/cluster/test_import.py +++ b/tests/cluster/test_import.py @@ -59,7 +59,7 @@ def test_parquet_import_remote(cloud_environment, caplog): assert result.exit_code == 0, f"ERROR: {result.output}" assert "Loading data." in caplog.text - assert "target=TableAddress(schema=None, table='basic')" in caplog.text + assert "target=TableAddress(schema=None, table='basic'" in caplog.text assert "Import succeeded (status: SUCCEEDED)" in caplog.text with ManagedCluster.from_env() as cluster: diff --git a/tests/io/file/test_csv.py b/tests/io/file/test_csv.py index 64706627..bc231eef 100644 --- a/tests/io/file/test_csv.py +++ b/tests/io/file/test_csv.py @@ -11,11 +11,12 @@ climate_json_json = ( str(data_folder / "climate_json_json.csv") + "?quote-char='&pipe=json_array_to_wkt_point:geo_location" ) -climate_json_python = ( +climate_json_python_local = ( str(data_folder / "climate_json_python.csv") + '?quote-char="&pipe=json_array_to_wkt_point:geo_location&pipe=python_to_json:data' ) climate_wkt_json = str(data_folder / "climate_wkt_json.csv") + "?quote-char='" +climate_json_python_s3 = "https://guided-path.s3.us-east-1.amazonaws.com/demo_climate_data_export.csv?pipe=json_array_to_wkt_point:geo_location&pipe=python_to_json:data" table_address = TableAddress(schema=TESTDRIVE_DATA_SCHEMA, table="climate_data", if_exists="append") @@ -25,22 +26,39 @@ def provision_ddl(cratedb_synchronized) -> None: cratedb_synchronized.database.run_sql(ddl) -def test_load_csv_json_json(cratedb_synchronized, provision_ddl): +def test_load_csv_wkt_json(cratedb_synchronized, provision_ddl): + """Load a CSV file that does not need any geo transformations.""" cluster = DatabaseCluster.create(cluster_url=cratedb_synchronized.database.dburi) - cluster.load_table(InputOutputResource(climate_json_json), target=table_address) + cluster.load_table(InputOutputResource(climate_wkt_json), target=table_address) cluster.adapter.refresh_table(table_address.fullname) assert cluster.adapter.count_records(table_address.fullname) == 3, "Wrong number of records returned" -def test_load_csv_json_python(cratedb_synchronized, provision_ddl): +def test_load_geo_csv_json_json(cratedb_synchronized, provision_ddl): + """Load a CSV file that needs geo transformations.""" + pytest.importorskip("polars_st", reason="CSV import needs geo transformations") cluster = DatabaseCluster.create(cluster_url=cratedb_synchronized.database.dburi) - cluster.load_table(InputOutputResource(climate_json_python), target=table_address) + cluster.load_table(InputOutputResource(climate_json_json), target=table_address) cluster.adapter.refresh_table(table_address.fullname) assert cluster.adapter.count_records(table_address.fullname) == 3, "Wrong number of records returned" -def test_load_csv_wkt_json(cratedb_synchronized, provision_ddl): +def test_load_geo_csv_json_python_local(cratedb_synchronized, provision_ddl): + """Load a CSV file that needs geo transformations.""" + pytest.importorskip("polars_st", reason="CSV import needs geo transformations") cluster = DatabaseCluster.create(cluster_url=cratedb_synchronized.database.dburi) - cluster.load_table(InputOutputResource(climate_wkt_json), target=table_address) + cluster.load_table(InputOutputResource(climate_json_python_local), target=table_address) cluster.adapter.refresh_table(table_address.fullname) assert cluster.adapter.count_records(table_address.fullname) == 3, "Wrong number of records returned" + + +@pytest.mark.skip( + "Test takes too long to complete. When aiming to test a remote data source, please use a smaller dataset." +) +def test_load_geo_csv_json_python_s3(cratedb_synchronized, provision_ddl): + """Load a CSV file that needs geo transformations.""" + pytest.importorskip("polars_st", reason="CSV import needs geo transformations") + cluster = DatabaseCluster.create(cluster_url=cratedb_synchronized.database.dburi) + cluster.load_table(InputOutputResource(climate_json_python_s3), target=table_address) + cluster.adapter.refresh_table(table_address.fullname) + assert cluster.adapter.count_records(table_address.fullname) == 22650, "Wrong number of records returned"