Skip to content

Commit 4b60dae

Browse files
authored
[yaml] Iceberg add files yaml transform and test (#37938)
* something * add iceberg transform * update pipelines with verification * fix comments * address dlq error * add an AssertEqual per comment
1 parent 661add4 commit 4b60dae

File tree

2 files changed

+139
-0
lines changed

2 files changed

+139
-0
lines changed

sdks/python/apache_beam/yaml/standard_io.yaml

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -403,3 +403,27 @@
403403
'ReadFromIcebergCDC': 'beam:schematransform:org.apache.beam:iceberg_cdc_read:v1'
404404
config:
405405
gradle_target: 'sdks:java:io:expansion-service:shadowJar'
406+
407+
#IcebergAddFiles
408+
- type: renaming
409+
transforms:
410+
'IcebergAddFiles': 'IcebergAddFiles'
411+
config:
412+
mappings:
413+
'IcebergAddFiles':
414+
table: 'table'
415+
catalog_properties: 'catalog_properties'
416+
config_properties: 'config_properties'
417+
triggering_frequency_seconds: 'triggering_frequency_seconds'
418+
append_batch_size: 'append_batch_size'
419+
location_prefix: 'location_prefix'
420+
partition_fields: 'partition_fields'
421+
table_properties: 'table_properties'
422+
error_handling: 'error_handling'
423+
underlying_provider:
424+
type: beamJar
425+
transforms:
426+
'IcebergAddFiles': 'beam:schematransform:iceberg_add_files:v1'
427+
config:
428+
gradle_target: 'sdks:java:io:expansion-service:shadowJar'
429+
Lines changed: 115 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,115 @@
1+
#
2+
# Licensed to the Apache Software Foundation (ASF) under one or more
3+
# contributor license agreements. See the NOTICE file distributed with
4+
# this work for additional information regarding copyright ownership.
5+
# The ASF licenses this file to You under the Apache License, Version 2.0
6+
# (the "License"); you may not use this file except in compliance with
7+
# the License. You may obtain a copy of the License at
8+
#
9+
# http://www.apache.org/licenses/LICENSE-2.0
10+
#
11+
# Unless required by applicable law or agreed to in writing, software
12+
# distributed under the License is distributed on an "AS IS" BASIS,
13+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
# See the License for the specific language governing permissions and
15+
# limitations under the License.
16+
#
17+
18+
fixtures:
19+
- name: TEMP_DIR
20+
type: "tempfile.TemporaryDirectory"
21+
22+
pipelines:
23+
24+
# Pipeline 1: Write a dummy Parquet file
25+
- pipeline:
26+
type: chain
27+
transforms:
28+
- type: Create
29+
config:
30+
elements:
31+
- {label: "11a", rank: 0, bool: true}
32+
- {label: "37a", rank: 1, bool: false}
33+
- {label: "389a", rank: 2, bool: false}
34+
- {label: "3821b", rank: 3, bool: true}
35+
- {label: "990c", rank: 4, bool: true}
36+
- {label: "1024d", rank: 5, bool: false}
37+
- type: WriteToParquet
38+
config:
39+
path: "{TEMP_DIR}/data/data"
40+
file_name_suffix: ".parquet"
41+
42+
# Pipeline 2: Add our generated file to the Iceberg table
43+
- pipeline:
44+
type: chain
45+
transforms:
46+
- type: Create
47+
config:
48+
elements:
49+
# By default, Beam writes a sharded file like <prefix>-00000-of-00001
50+
- {file: "{TEMP_DIR}/data/data-00000-of-00001.parquet"}
51+
- type: IcebergAddFiles
52+
config:
53+
table: "default.table"
54+
location_prefix: "{TEMP_DIR}/data/"
55+
catalog_properties:
56+
type: "hadoop"
57+
warehouse: "{TEMP_DIR}/dir"
58+
59+
# Pipeline 3: Read from Iceberg and verify the contents
60+
- pipeline:
61+
type: chain
62+
transforms:
63+
- type: ReadFromIceberg
64+
config:
65+
table: "default.table"
66+
catalog_properties:
67+
type: "hadoop"
68+
warehouse: "{TEMP_DIR}/dir"
69+
- type: AssertEqual
70+
config:
71+
elements:
72+
- {label: "11a", rank: 0, bool: true}
73+
- {label: "37a", rank: 1, bool: false}
74+
- {label: "389a", rank: 2, bool: false}
75+
- {label: "3821b", rank: 3, bool: true}
76+
- {label: "990c", rank: 4, bool: true}
77+
- {label: "1024d", rank: 5, bool: false}
78+
79+
# Pipeline 4: Add an invalid file to trigger the DLQ
80+
- pipeline:
81+
type: composite
82+
transforms:
83+
- type: Create
84+
config:
85+
elements:
86+
- {file: "gs://dummy-bucket/does-not-exist.txt"}
87+
- type: IcebergAddFiles
88+
name: AddInvalidFile
89+
input: Create
90+
config:
91+
table: "default.table"
92+
location_prefix: "gs://dummy-bucket/"
93+
catalog_properties:
94+
type: "hadoop"
95+
warehouse: "{TEMP_DIR}/dir"
96+
error_handling:
97+
output: error_output
98+
- type: WriteToJson
99+
name: WriteErrorsToJson
100+
input: AddInvalidFile.error_output
101+
config:
102+
path: "{TEMP_DIR}/error.json"
103+
104+
# Pipeline 5: Ensure errors were written
105+
- pipeline:
106+
type: chain
107+
transforms:
108+
- type: ReadFromJson
109+
config:
110+
path: "{TEMP_DIR}/error.json*"
111+
- type: AssertEqual
112+
config:
113+
elements:
114+
- {file: "gs://dummy-bucket/does-not-exist.txt", error: "Could not determine the file's format"}
115+

0 commit comments

Comments
 (0)