Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkArgument;

import com.google.auto.value.AutoValue;
import java.io.Serializable;
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;
Expand Down Expand Up @@ -58,6 +59,41 @@ public abstract static class Builder {
}
}

/**
 * Configuration describing how the destination BigQuery table should be time-partitioned.
 *
 * <p>Only {@code type} is required; the remaining properties are optional and, when unset,
 * BigQuery's defaults apply. Values are passed through to the table's
 * {@code TimePartitioning} spec when the destination table is created.
 */
@AutoValue
public abstract static class TimePartitioningConfig implements Serializable {
  @SchemaFieldDescription("The time partitioning type.")
  public abstract String getType();

  @SchemaFieldDescription(
      "If not set, the table is partitioned by pseudo column '_PARTITIONTIME'; if set, the table is partitioned by this field.")
  public abstract @Nullable String getField();

  @SchemaFieldDescription(
      "The number of milliseconds for which to keep the storage for a partition.")
  public abstract @Nullable Long getExpirationMs();

  @SchemaFieldDescription("If set to true, queries over this table require a partition filter.")
  public abstract @Nullable Boolean getRequirePartitionFilter();

  public static Builder builder() {
    return new AutoValue_BigQueryWriteConfiguration_TimePartitioningConfig.Builder();
  }

  /** Builder for {@link TimePartitioningConfig}. */
  @AutoValue.Builder
  public abstract static class Builder implements Serializable {

    public abstract Builder setType(String type);

    // Setter parameters for @Nullable properties must themselves be @Nullable,
    // otherwise the AutoValue-generated setter throws NullPointerException when
    // an optional property is explicitly set to null.
    public abstract Builder setField(@Nullable String field);

    public abstract Builder setExpirationMs(@Nullable Long expirationMs);

    public abstract Builder setRequirePartitionFilter(@Nullable Boolean requirePartitionFilter);

    public abstract TimePartitioningConfig build();
  }
}

public void validate() {
String invalidConfigMessage = "Invalid BigQuery Storage Write configuration: ";

Expand Down Expand Up @@ -197,6 +233,9 @@ public static Builder builder() {
@SchemaFieldDescription("A list of columns to cluster the BigQuery table by.")
public abstract @Nullable List<String> getClusteringFields();

@SchemaFieldDescription("Configuration for BigQuery time partitioning.")
public abstract @Nullable TimePartitioningConfig getTimePartitioningConfig();

@SchemaFieldDescription(
"Configuration for creating BigLake tables. The following options are available:"
+ "\n - connectionId (REQUIRED): the name of your cloud resource connection,"
Expand Down Expand Up @@ -239,6 +278,8 @@ public abstract static class Builder {

public abstract Builder setClusteringFields(List<String> clusteringFields);

public abstract Builder setTimePartitioningConfig(TimePartitioningConfig config);

public abstract Builder setBigLakeConfiguration(
java.util.Map<String, String> bigLakeConfiguration);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,14 +25,19 @@
import com.google.api.services.bigquery.model.TableConstraints;
import com.google.api.services.bigquery.model.TableRow;
import com.google.api.services.bigquery.model.TableSchema;
import com.google.api.services.bigquery.model.TimePartitioning;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import org.apache.avro.generic.GenericRecord;
import org.apache.beam.sdk.annotations.Internal;
import org.apache.beam.sdk.extensions.avro.schemas.utils.AvroUtils;
import org.apache.beam.sdk.io.gcp.bigquery.AvroWriteRequest;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryUtils;
import org.apache.beam.sdk.io.gcp.bigquery.DynamicDestinations;
import org.apache.beam.sdk.io.gcp.bigquery.TableDestination;
import org.apache.beam.sdk.io.gcp.bigquery.providers.BigQueryWriteConfiguration.TimePartitioningConfig;
import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.util.RowFilter;
Expand All @@ -50,9 +55,27 @@ public class PortableBigQueryDestinations extends DynamicDestinations<Row, Strin
private final @Nullable List<String> primaryKey;
private final RowFilter rowFilter;
private final @Nullable List<String> clusteringFields;
private final @Nullable TimePartitioningConfig timePartitioningConfig;
private static final Set<String> allowedTypes =
new HashSet<>(Arrays.asList("DAY", "HOUR", "MONTH", "YEAR"));

public PortableBigQueryDestinations(Schema rowSchema, BigQueryWriteConfiguration configuration) {
this.clusteringFields = configuration.getClusteringFields();
this.timePartitioningConfig = configuration.getTimePartitioningConfig();

// Validate partition field exists if time partitioning field is set
if (this.timePartitioningConfig != null && this.timePartitioningConfig.getField() != null) {
String partitionField = this.timePartitioningConfig.getField();

// Check if the partition field exists in the schema
boolean fieldExists =
rowSchema.getFields().stream().anyMatch(field -> field.getName().equals(partitionField));
if (!fieldExists) {
throw new IllegalArgumentException(
String.format(
"The partition field '%s' does not exist in the input schema.", partitionField));
}
}
// DYNAMIC_DESTINATIONS magic string is the old way of doing it for cross-language.
// In that case, we do no interpolation
if (!configuration.getTable().equals(DYNAMIC_DESTINATIONS)) {
Expand Down Expand Up @@ -83,9 +106,42 @@ public String getDestination(@Nullable ValueInSingleWindow<Row> element) {
/**
 * Resolves a destination string into a concrete {@link TableDestination}, attaching the
 * configured time partitioning and clustering specs when present.
 *
 * @param destination the fully resolved table spec string produced by {@code getDestination}
 * @return the table destination, carrying a {@link TimePartitioning} and/or {@link Clustering}
 *     spec if configured
 * @throws IllegalArgumentException if the configured partitioning type is not one of the
 *     allowed values ({@code allowedTypes})
 */
@Override
public TableDestination getTable(String destination) {
  TimePartitioning timePartitioning = null;

  if (timePartitioningConfig != null) {
    // 'type' is required; fail fast on values BigQuery would reject.
    String type = timePartitioningConfig.getType();
    if (!allowedTypes.contains(type)) {
      throw new IllegalArgumentException(
          String.format(
              "Invalid TimePartitioning 'type': '%s'. Allowed values are: %s",
              type, allowedTypes));
    }

    timePartitioning = new TimePartitioning().setType(type);

    // The remaining properties are optional; only set them when provided so
    // BigQuery's defaults apply otherwise.
    String field = timePartitioningConfig.getField();
    if (field != null) {
      timePartitioning.setField(field);
    }

    Long expirationMs = timePartitioningConfig.getExpirationMs();
    if (expirationMs != null) {
      timePartitioning.setExpirationMs(expirationMs);
    }

    Boolean requirePartitionFilter = timePartitioningConfig.getRequirePartitionFilter();
    if (requirePartitionFilter != null) {
      timePartitioning.setRequirePartitionFilter(requirePartitionFilter);
    }
  }

  if (clusteringFields != null && !clusteringFields.isEmpty()) {
    Clustering clustering = new Clustering().setFields(clusteringFields);
    return new TableDestination(destination, null, timePartitioning, clustering);
  } else if (timePartitioning != null) {
    return new TableDestination(destination, null, timePartitioning);
  }
  return new TableDestination(destination, null);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@

import com.google.api.services.bigquery.model.Clustering;
import com.google.api.services.bigquery.model.Table;
import com.google.api.services.bigquery.model.TimePartitioning;
import java.io.IOException;
import java.util.Arrays;
import java.util.Collections;
Expand Down Expand Up @@ -83,10 +84,13 @@ public class BigQueryManagedIT {

private static final String PROJECT =
TestPipeline.testingPipelineOptions().as(GcpOptions.class).getProject();

private static final String BIG_QUERY_DATASET_ID = "bigquery_managed_" + System.nanoTime();

private static final Clustering CLUSTERING = new Clustering().setFields(Arrays.asList("str"));

private static final TimePartitioning TIME_PARTITIONING = new TimePartitioning().setType("DAY");

@BeforeClass
public static void setUpTestEnvironment() throws IOException, InterruptedException {
// Create one BQ dataset for all test cases.
Expand All @@ -102,8 +106,12 @@ public static void cleanup() {
public void testBatchFileLoadsWriteRead() throws IOException, InterruptedException {
String table =
String.format("%s.%s.%s", PROJECT, BIG_QUERY_DATASET_ID, testName.getMethodName());

Map<String, Object> writeConfig =
ImmutableMap.of("table", table, "clustering_fields", Collections.singletonList("str"));
ImmutableMap.of(
"table", table,
"clustering_fields", Collections.singletonList("str"),
"time_partitioning_config", ImmutableMap.of("type", "DAY"));

// file loads requires a GCS temp location
String tempLocation = writePipeline.getOptions().as(TestPipelineOptions.class).getTempRoot();
Expand All @@ -124,10 +132,11 @@ public void testBatchFileLoadsWriteRead() throws IOException, InterruptedExcepti
PAssert.that(outputRows).containsInAnyOrder(ROWS);
readPipeline.run().waitUntilFinish();

// Asserting clustering
// Asserting clustering and time partitioning
Table tableMetadata =
BQ_CLIENT.getTableResource(PROJECT, BIG_QUERY_DATASET_ID, testName.getMethodName());
Assert.assertEquals(CLUSTERING, tableMetadata.getClustering());
Assert.assertEquals(TIME_PARTITIONING, tableMetadata.getTimePartitioning());
}

@Test
Expand Down
78 changes: 72 additions & 6 deletions sdks/python/apache_beam/io/external/xlang_bigqueryio_it_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -66,15 +66,16 @@ class BigQueryXlangStorageWriteIT(unittest.TestCase):
BIGQUERY_DATASET = 'python_xlang_storage_write'

ELEMENTS = [
# (int, float, numeric, string, bool, bytes, timestamp)
# (int, float, numeric, string, bool, bytes, timestamp, timestamp)
{
"int": 1,
"float": 0.1,
"numeric": Decimal("1.11"),
"str": "a",
"bool": True,
"bytes": b'a',
"timestamp": Timestamp(1000, 100)
"timestamp": Timestamp(1000, 100),
"event_time": Timestamp(1722243600)
},
{
"int": 2,
Expand All @@ -83,7 +84,8 @@ class BigQueryXlangStorageWriteIT(unittest.TestCase):
"str": "b",
"bool": False,
"bytes": b'b',
"timestamp": Timestamp(2000, 200)
"timestamp": Timestamp(2000, 200),
"event_time": Timestamp(1722277200)
},
{
"int": 3,
Expand All @@ -92,7 +94,8 @@ class BigQueryXlangStorageWriteIT(unittest.TestCase):
"str": "c",
"bool": True,
"bytes": b'd',
"timestamp": Timestamp(3000, 300)
"timestamp": Timestamp(3000, 300),
"event_time": Timestamp(1722304200)
},
{
"int": 4,
Expand All @@ -101,12 +104,13 @@ class BigQueryXlangStorageWriteIT(unittest.TestCase):
"str": "d",
"bool": False,
"bytes": b'd',
"timestamp": Timestamp(4000, 400)
"timestamp": Timestamp(4000, 400),
"event_time": Timestamp(1722383999)
}
]
ALL_TYPES_SCHEMA = (
"int:INTEGER,float:FLOAT,numeric:NUMERIC,str:STRING,"
"bool:BOOLEAN,bytes:BYTES,timestamp:TIMESTAMP")
"bool:BOOLEAN,bytes:BYTES,timestamp:TIMESTAMP,event_time:TIMESTAMP")

def setUp(self):
self.test_pipeline = TestPipeline(is_integration_test=True)
Expand Down Expand Up @@ -336,6 +340,68 @@ def test_write_with_clustering(self):
self.assertEqual(clustering_fields, ['int'])
hamcrest_assert(p, bq_matcher)

def test_write_with_time_partitioning(self):
  """Writes ELEMENTS with a 'timePartitioning' spec via the Storage Write API,
  then verifies both the partition-filtered query results and the created
  table's partitioning metadata."""
  table = 'write_with_time_partitioning'
  table_id = '{}:{}.{}'.format(self.project, self.dataset_id, table)

  # Subset of self.ELEMENTS expected back when the query filters to the
  # 2024-07-29 partition of 'event_time'.
  # NOTE(review): assumes epoch seconds 1722243600 and 1722277200 both fall
  # on 2024-07-29 under the partition's timezone semantics — confirm against
  # BigQuery DAY-partition boundary behavior.
  EXPECTED_DATA = [
      # (int, float, numeric, string, bool, bytes, timestamp, timestamp)
      {
          "int": 1,
          "float": 0.1,
          "numeric": Decimal("1.11"),
          "str": "a",
          "bool": True,
          "bytes": b'a',
          "timestamp": Timestamp(1000, 100),
          "event_time": Timestamp(1722243600)
      },
      {
          "int": 2,
          "float": 0.2,
          "numeric": Decimal("2.22"),
          "str": "b",
          "bool": False,
          "bytes": b'b',
          "timestamp": Timestamp(2000, 200),
          "event_time": Timestamp(1722277200)
      }
  ]

  # The matcher issues a partition-filtered query; 'require_partition_filter'
  # below makes such a WHERE clause mandatory on the created table.
  bq_matcher = BigqueryFullResultMatcher(
      project=self.project,
      query="SELECT * FROM {}.{} WHERE DATE(event_time) = '2024-07-29'".
      format(self.dataset_id, table),
      data=self.parse_expected_data(EXPECTED_DATA))

  with beam.Pipeline(argv=self.args) as p:
    _ = (
        p
        | "Create test data" >> beam.Create(self.ELEMENTS)
        | beam.io.WriteToBigQuery(
            table=table_id,
            method=beam.io.WriteToBigQuery.Method.STORAGE_WRITE_API,
            schema=self.ALL_TYPES_SCHEMA,
            create_disposition='CREATE_IF_NEEDED',
            write_disposition='WRITE_TRUNCATE',
            additional_bq_parameters={
                'timePartitioning': {
                    'type': 'DAY',
                    'field': 'event_time',
                    'expiration_ms': 2592000000,
                    'require_partition_filter': True
                }
            }))

  # After pipeline finishes, verify time partitioning is applied.
  # Note: 'table' is rebound here from the table *name* to the Table
  # resource returned by the BigQuery client.
  table = self.bigquery_client.get_table(self.project, self.dataset_id, table)

  self.assertEqual(table.timePartitioning.type, 'DAY')
  self.assertEqual(table.timePartitioning.field, 'event_time')
  self.assertEqual(table.timePartitioning.requirePartitionFilter, True)
  self.assertEqual(table.timePartitioning.expirationMs, 2592000000)
  hamcrest_assert(p, bq_matcher)

def test_write_with_beam_rows_cdc(self):
table = 'write_with_beam_rows_cdc'
table_id = '{}:{}.{}'.format(self.project, self.dataset_id, table)
Expand Down
5 changes: 5 additions & 0 deletions sdks/python/apache_beam/io/gcp/bigquery.py
Original file line number Diff line number Diff line change
Expand Up @@ -2714,13 +2714,17 @@ def expand(self, input):
table = StorageWriteToBigQuery.DYNAMIC_DESTINATIONS

clustering_fields = []
time_partitioning_config = None

if self.additional_bq_parameters:
if callable(self.additional_bq_parameters):
raise NotImplementedError(
"Currently, dynamic clustering and timepartitioning is not "
"supported for STORAGE_WRITE_API write method.")
clustering_fields = (
self.additional_bq_parameters.get("clustering", {}).get("fields", []))
time_partitioning_config = (
self.additional_bq_parameters.get("timePartitioning", None))

output = (
input_beam_rows
Expand All @@ -2738,6 +2742,7 @@ def expand(self, input):
use_cdc_writes=self._use_cdc_writes,
primary_key=self._primary_key,
clustering_fields=clustering_fields,
time_partitioning_config=time_partitioning_config,
big_lake_configuration=self._big_lake_configuration,
error_handling={
'output': StorageWriteToBigQuery.FAILED_ROWS_WITH_ERRORS
Expand Down
Loading
Loading