Commit a159a4c

fix: Use binary(16) for UUID type to ensure Spark compatibility
PyArrow's pa.uuid() type creates Python Arrow metadata that differs from Java Arrow's UUID metadata, causing incompatibility with Spark. Python and Rust Arrow implementations don't recognize Java's UUID metadata.

- Change UUIDType Arrow schema conversion from pa.uuid() to pa.binary(16)
- Add integration test verifying UUID round-trip between PyIceberg and Spark
- Update existing tests to expect binary(16) instead of pa.uuid()
- Bump Iceberg version to 1.10.1, which includes the Java-side UUID fix
1 parent dea8078 commit a159a4c
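
For context, a minimal sketch (not part of this commit, assuming a current PyArrow install) of the representation the fix standardizes on: UUID values stored as plain 16-byte fixed-size binary, with no extension-type metadata attached to the field.

import uuid

import pyarrow as pa

# UUIDs as raw 16-byte values; nulls are allowed in an optional column.
values = [uuid.uuid4().bytes, None, uuid.UUID(int=0).bytes]

# pa.binary(16) is a plain fixed-size binary type, so the schema carries no
# implementation-specific extension metadata that Java/Rust readers might reject.
arr = pa.array(values, type=pa.binary(16))
print(arr.type)  # fixed_size_binary[16]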

File tree

4 files changed: +64 -4 lines changed

dev/spark/Dockerfile
pyiceberg/io/pyarrow.py
tests/integration/test_add_files.py
tests/integration/test_writes/test_writes.py

dev/spark/Dockerfile

Lines changed: 1 addition & 1 deletion

@@ -18,7 +18,7 @@ ARG BASE_IMAGE_SPARK_VERSION=4.0.1
 FROM apache/spark:${BASE_IMAGE_SPARK_VERSION}
 
 # Dependency versions - keep these compatible
-ARG ICEBERG_VERSION=1.10.0
+ARG ICEBERG_VERSION=1.10.1
 ARG ICEBERG_SPARK_RUNTIME_VERSION=4.0_2.13
 ARG SPARK_VERSION=4.0.1
 ARG HADOOP_VERSION=3.4.1

pyiceberg/io/pyarrow.py

Lines changed: 1 addition & 1 deletion

@@ -789,7 +789,7 @@ def visit_string(self, _: StringType) -> pa.DataType:
         return pa.large_string()
 
     def visit_uuid(self, _: UUIDType) -> pa.DataType:
-        return pa.uuid()
+        return pa.binary(16)
 
     def visit_unknown(self, _: UnknownType) -> pa.DataType:
         """Type `UnknownType` can be promoted to any primitive type in V3+ tables per the Iceberg spec."""

tests/integration/test_add_files.py

Lines changed: 1 addition & 1 deletion

@@ -828,7 +828,7 @@ def test_add_files_with_valid_upcast(
                 pa.field("list", pa.list_(pa.int64()), nullable=False),
                 pa.field("map", pa.map_(pa.string(), pa.int64()), nullable=False),
                 pa.field("double", pa.float64(), nullable=True),
-                pa.field("uuid", pa.uuid(), nullable=True),
+                pa.field("uuid", pa.binary(16), nullable=True),
             )
         )
     )

tests/integration/test_writes/test_writes.py

Lines changed: 61 additions & 1 deletion

@@ -1500,7 +1500,7 @@ def test_table_write_schema_with_valid_upcast(
                 pa.field("list", pa.list_(pa.int64()), nullable=False),
                 pa.field("map", pa.map_(pa.string(), pa.int64()), nullable=False),
                 pa.field("double", pa.float64(), nullable=True),  # can support upcasting float to double
-                pa.field("uuid", pa.uuid(), nullable=True),
+                pa.field("uuid", pa.binary(16), nullable=True),
             )
         )
     )
@@ -2530,3 +2530,63 @@ def test_v3_write_and_read_row_lineage(spark: SparkSession, session_catalog: Cat
     assert tbl.metadata.next_row_id == initial_next_row_id + len(test_data), (
         "Expected next_row_id to be incremented by the number of added rows"
     )
+
+
+@pytest.mark.integration
+def test_write_uuid_in_pyiceberg_and_scan(session_catalog: Catalog, spark: SparkSession) -> None:
+    """Test UUID compatibility between PyIceberg and Spark.
+
+    UUIDs must be written as binary(16) for Spark compatibility since Java Arrow
+    metadata differs from Python Arrow metadata for UUID types.
+    """
+    identifier = "default.test_write_uuid_in_pyiceberg_and_scan"
+
+    catalog = load_catalog("default", type="in-memory")
+    catalog.create_namespace("ns")
+
+    schema = Schema(NestedField(field_id=1, name="uuid_col", field_type=UUIDType(), required=False))
+
+    test_data_with_null = {
+        "uuid_col": [
+            uuid.UUID("00000000-0000-0000-0000-000000000000").bytes,
+            None,
+            uuid.UUID("11111111-1111-1111-1111-111111111111").bytes,
+        ]
+    }
+
+    try:
+        session_catalog.drop_table(identifier=identifier)
+    except NoSuchTableError:
+        pass
+
+    table = _create_table(session_catalog, identifier, {"format-version": "2"}, schema=schema)
+
+    arrow_table = pa.table(test_data_with_null, schema=schema.as_arrow())
+
+    # Write with pyarrow
+    table.append(arrow_table)
+
+    # Write with pyspark
+    spark.sql(
+        f"""
+        INSERT INTO {identifier} VALUES ("22222222-2222-2222-2222-222222222222")
+        """
+    )
+    df = spark.table(identifier)
+
+    table.refresh()
+
+    assert df.count() == 4
+    assert len(table.scan().to_arrow()) == 4
+
+    result = df.where("uuid_col = '00000000-0000-0000-0000-000000000000'")
+    assert result.count() == 1
+
+    result = df.where("uuid_col = '22222222-2222-2222-2222-222222222222'")
+    assert result.count() == 1
+
+    result = table.scan(row_filter=EqualTo("uuid_col", uuid.UUID("00000000-0000-0000-0000-000000000000").bytes)).to_arrow()
+    assert len(result) == 1
+
+    result = table.scan(row_filter=EqualTo("uuid_col", uuid.UUID("22222222-2222-2222-2222-222222222222").bytes)).to_arrow()
+    assert len(result) == 1
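
The test above filters by UUID with string literals on the Spark side and with raw bytes on the PyIceberg side. Since the column is stored as binary(16), values scanned back through PyArrow arrive as 16-byte strings; a consumer can rebuild uuid.UUID objects with the standard library, as in this illustrative snippet (not part of the commit):

import uuid

raw = uuid.UUID("22222222-2222-2222-2222-222222222222").bytes  # what the binary(16) column stores
assert uuid.UUID(bytes=raw) == uuid.UUID("22222222-2222-2222-2222-222222222222")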
