Skip to content

Commit 69efed4

Browse files
committed
chore: attach type usage to job labels
1 parent 6fef9be commit 69efed4

File tree

5 files changed

+69
-4
lines changed

5 files changed

+69
-4
lines changed

bigframes/core/compile/configs.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,3 +34,4 @@ class CompileResult:
3434
sql: str
3535
sql_schema: typing.Sequence[google.cloud.bigquery.SchemaField]
3636
row_order: typing.Optional[ordering.RowOrdering]
37+
encoded_type_refs: str

bigframes/core/compile/ibis_compiler/ibis_compiler.py

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@
2929
import bigframes.core.compile.concat as concat_impl
3030
import bigframes.core.compile.configs as configs
3131
import bigframes.core.compile.explode
32+
from bigframes.core.logging import data_types as data_type_logger
3233
import bigframes.core.nodes as nodes
3334
import bigframes.core.ordering as bf_ordering
3435
import bigframes.core.rewrite as rewrites
@@ -56,23 +57,30 @@ def compile_sql(request: configs.CompileRequest) -> configs.CompileResult:
5657
)
5758
if request.sort_rows:
5859
result_node = cast(nodes.ResultNode, rewrites.column_pruning(result_node))
60+
encoded_type_refs = data_type_logger.encode_type_refs(result_node)
5961
sql = compile_result_node(result_node)
6062
return configs.CompileResult(
61-
sql, result_node.schema.to_bigquery(), result_node.order_by
63+
sql,
64+
result_node.schema.to_bigquery(),
65+
result_node.order_by,
66+
encoded_type_refs,
6267
)
6368

6469
ordering: Optional[bf_ordering.RowOrdering] = result_node.order_by
6570
result_node = dataclasses.replace(result_node, order_by=None)
6671
result_node = cast(nodes.ResultNode, rewrites.column_pruning(result_node))
6772
result_node = cast(nodes.ResultNode, rewrites.defer_selection(result_node))
73+
encoded_type_refs = data_type_logger.encode_type_refs(result_node)
6874
sql = compile_result_node(result_node)
6975
# Return the ordering iff no extra columns are needed to define the row order
7076
if ordering is not None:
7177
output_order = (
7278
ordering if ordering.referenced_columns.issubset(result_node.ids) else None
7379
)
7480
assert (not request.materialize_all_order_keys) or (output_order is not None)
75-
return configs.CompileResult(sql, result_node.schema.to_bigquery(), output_order)
81+
return configs.CompileResult(
82+
sql, result_node.schema.to_bigquery(), output_order, encoded_type_refs
83+
)
7684

7785

7886
def _replace_unsupported_ops(node: nodes.BigFrameNode):

bigframes/core/compile/sqlglot/compiler.py

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@
3434
from bigframes.core.compile.sqlglot.expressions import typed_expr
3535
import bigframes.core.compile.sqlglot.scalar_compiler as scalar_compiler
3636
import bigframes.core.compile.sqlglot.sqlglot_ir as ir
37+
from bigframes.core.logging import data_types as data_type_logger
3738
import bigframes.core.ordering as bf_ordering
3839
from bigframes.core.rewrite import schema_binding
3940

@@ -65,9 +66,13 @@ def compile_sql(request: configs.CompileRequest) -> configs.CompileResult:
6566
result_node = typing.cast(
6667
nodes.ResultNode, rewrite.defer_selection(result_node)
6768
)
69+
encoded_type_refs = data_type_logger.encode_type_refs(result_node)
6870
sql = _compile_result_node(result_node, uid_gen)
6971
return configs.CompileResult(
70-
sql, result_node.schema.to_bigquery(), result_node.order_by
72+
sql,
73+
result_node.schema.to_bigquery(),
74+
result_node.order_by,
75+
encoded_type_refs,
7176
)
7277

7378
ordering: typing.Optional[bf_ordering.RowOrdering] = result_node.order_by
@@ -76,14 +81,17 @@ def compile_sql(request: configs.CompileRequest) -> configs.CompileResult:
7681

7782
result_node = _remap_variables(result_node, uid_gen)
7883
result_node = typing.cast(nodes.ResultNode, rewrite.defer_selection(result_node))
84+
encoded_type_refs = data_type_logger.encode_type_refs(result_node)
7985
sql = _compile_result_node(result_node, uid_gen)
8086
# Return the ordering iff no extra columns are needed to define the row order
8187
if ordering is not None:
8288
output_order = (
8389
ordering if ordering.referenced_columns.issubset(result_node.ids) else None
8490
)
8591
assert (not request.materialize_all_order_keys) or (output_order is not None)
86-
return configs.CompileResult(sql, result_node.schema.to_bigquery(), output_order)
92+
return configs.CompileResult(
93+
sql, result_node.schema.to_bigquery(), output_order, encoded_type_refs
94+
)
8795

8896

8997
def _remap_variables(

bigframes/session/bq_caching_executor.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -318,6 +318,8 @@ def _export_gbq(
318318
clustering_fields=spec.cluster_cols if spec.cluster_cols else None,
319319
)
320320

321+
# Attach data type usage to the job labels
322+
job_config.labels["bigframes-dtypes"] = compiled.encoded_type_refs
321323
# TODO(swast): plumb through the api_name of the user-facing api that
322324
# caused this query.
323325
iterator, job = self._run_execute_query(
@@ -661,6 +663,8 @@ def _execute_plan_gbq(
661663
)
662664
job_config.destination = destination_table
663665

666+
# Attach data type usage to the job labels
667+
job_config.labels["bigframes-dtypes"] = compiled.encoded_type_refs
664668
iterator, query_job = self._run_execute_query(
665669
sql=compiled.sql,
666670
job_config=job_config,
Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,44 @@
1+
# Copyright 2026 Google LLC
2+
#
3+
# Licensed under the Apache License, Version 2.0 (the "License");
4+
# you may not use this file except in compliance with the License.
5+
# You may obtain a copy of the License at
6+
#
7+
# http://www.apache.org/licenses/LICENSE-2.0
8+
#
9+
# Unless required by applicable law or agreed to in writing, software
10+
# distributed under the License is distributed on an "AS IS" BASIS,
11+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
# See the License for the specific language governing permissions and
13+
# limitations under the License.
14+
15+
from unittest import mock
16+
17+
from bigframes.core.logging import data_types
18+
import bigframes.session._io.bigquery as bq_io
19+
20+
21+
def test_data_type_logging(scalars_df_index):
22+
s = scalars_df_index["int64_col"] + 1.5
23+
24+
# We want to check the job_config passed to start_query_with_client
25+
with mock.patch(
26+
"bigframes.session._io.bigquery.start_query_with_client",
27+
wraps=bq_io.start_query_with_client,
28+
) as mock_query:
29+
s.to_pandas()
30+
31+
# Verify call args
32+
assert mock_query.called
33+
call_args = mock_query.call_args
34+
job_config = call_args.kwargs.get("job_config")
35+
36+
# Verify we actually got a job_config
37+
assert job_config is not None
38+
39+
# Use the captured job_config for assertions
40+
job_labels = job_config.labels
41+
assert "bigframes-dtypes" in job_labels
42+
assert job_labels["bigframes-dtypes"] == data_types.encode_type_refs(
43+
s._block._expr.node
44+
)

0 commit comments

Comments
 (0)