From c641f9fb405298ad06bcb71f3d7b0e9cc82b0600 Mon Sep 17 00:00:00 2001
From: Ajai Tirumali
Date: Tue, 30 Dec 2025 19:53:11 +0530
Subject: [PATCH 1/9] Support 0 eval results

---
 tools/statvar_importer/property_value_mapper.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/tools/statvar_importer/property_value_mapper.py b/tools/statvar_importer/property_value_mapper.py
index 76d5209216..8c60f481a3 100644
--- a/tools/statvar_importer/property_value_mapper.py
+++ b/tools/statvar_importer/property_value_mapper.py
@@ -347,7 +347,7 @@ def _process_eval(self, pvs: dict, data_key: str) -> bool:
                                                   self._log_every_n)
         if not eval_prop:
             eval_prop = data_key
-        if eval_data and eval_data != eval_str:
+        if eval_data is not None and eval_data != eval_str:
             pvs[eval_prop] = eval_data
             self._counters.add_counter('processed-eval', 1, eval_str)
         pvs.pop(eval_key)

From f30e89550743d537d0aed8f59e3031cd2452cf19 Mon Sep 17 00:00:00 2001
From: Ajai Tirumali
Date: Wed, 25 Mar 2026 16:32:16 +0530
Subject: [PATCH 2/9] Use environment variable for DC API root

---
 util/dc_api_wrapper.py | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/util/dc_api_wrapper.py b/util/dc_api_wrapper.py
index f1a1fee247..98f7209d0e 100644
--- a/util/dc_api_wrapper.py
+++ b/util/dc_api_wrapper.py
@@ -15,7 +15,7 @@
 It uses the DataCommonsClient library module for DC APIs and adds support for
 batched requests, retries and HTTP caching.
 
-DC API requires an environment variable set for DC_API_KEY.
+DC API requires the environment variable DC_API_KEY and, optionally, DC_API_ROOT.
 Please refer to https://docs.datacommons.org/api/python/v2 for more details.
 """
 
@@ -265,7 +265,7 @@ def get_datacommons_client(config: dict = None) -> DataCommonsClient:
     """Returns a DataCommonsClient object initialized using config."""
     config = _validate_v2_config(config)
     api_key = get_dc_api_key(config)
-    dc_instance = config.get('dc_api_root')
+    dc_instance = config.get('dc_api_root', os.environ.get('DC_API_ROOT'))
     url = None
     # Check if API root is a host or url endpoint.
     if dc_instance:

From 2a0e93d8f73aeef6ff2b9b48bf9b5b819ed99182 Mon Sep 17 00:00:00 2001
From: Ajai Tirumali
Date: Wed, 25 Mar 2026 22:17:08 +0530
Subject: [PATCH 3/9] cleanup dc_api_root configs

---
 scripts/earthengine/utils.py                                     | 2 --
 scripts/earthengine/utils_test.py                                | 1 -
 .../common/datacommons_api_wrappers/datacommons_wrappers.py      | 3 ---
 .../common/datacommons_api_wrappers/datacommons_wrappers_test.py | 2 --
 .../india_rbi_state_statistics/environment_sdg_metadata.csv      | 1 -
 .../india_rbi_state_statistics/infrastructure_metadata.csv       | 1 -
 .../india_rbi_state_statistics/rbi_metadata.csv                  | 5 -----
 .../denmark_demographics/denmark_demographics_metadata.csv       | 1 -
 .../fema/flood_insurance_claims/us_flood_nfip_config.py          | 3 ---
 .../state_domestic_product_metadata.csv                          | 1 -
 .../statistics_poland/StatisticsPoland_metadata.csv              | 3 ---
 .../ap_ib_gt_enrollment/config/common_metadata.csv               | 1 -
 .../state/config/SATorACT_Participation_metadata.csv             | 1 -
 tools/statvar_importer/config_flags.py                           | 2 +-
 util/dc_api_wrapper.py                                           | 3 ++-
 15 files changed, 3 insertions(+), 27 deletions(-)

diff --git a/scripts/earthengine/utils.py b/scripts/earthengine/utils.py
index 44dbd71535..7e59202eaa 100644
--- a/scripts/earthengine/utils.py
+++ b/scripts/earthengine/utils.py
@@ -46,7 +46,6 @@
 # Constants
 _MAX_LATITUDE = 90.0
 _MAX_LONGITUDE = 180.0
-_DC_API_ROOT = 'https://api.datacommons.org'
 
 # Utilities for dicts.
@@ -372,7 +371,6 @@ def place_id_to_lat_lng(placeid: str, { 'dc_api_version': 'V2', 'dc_api_use_cache': True, - 'dc_api_root': _DC_API_ROOT, }, ) node_props = resp.get(placeid) if resp else None diff --git a/scripts/earthengine/utils_test.py b/scripts/earthengine/utils_test.py index bfdd347661..f0f8e567cd 100644 --- a/scripts/earthengine/utils_test.py +++ b/scripts/earthengine/utils_test.py @@ -394,5 +394,4 @@ def test_place_id_to_lat_lng_dc_api(self): [placeid], ['latitude', 'longitude'], { 'dc_api_version': 'V2', 'dc_api_use_cache': True, - 'dc_api_root': utils._DC_API_ROOT, }) diff --git a/scripts/us_census/acs5yr/subject_tables/common/datacommons_api_wrappers/datacommons_wrappers.py b/scripts/us_census/acs5yr/subject_tables/common/datacommons_api_wrappers/datacommons_wrappers.py index 39c93bc599..70e0936ee5 100644 --- a/scripts/us_census/acs5yr/subject_tables/common/datacommons_api_wrappers/datacommons_wrappers.py +++ b/scripts/us_census/acs5yr/subject_tables/common/datacommons_api_wrappers/datacommons_wrappers.py @@ -64,9 +64,6 @@ def dc_check_existence(dcid_list: list, wrapper_config = { 'dc_api_batch_size': max_items, - 'dc_api_root': - 'https://autopush.api.datacommons.org' - if use_autopush else 'https://api.datacommons.org' } return dc_api_is_defined_dcid(dcid_list, wrapper_config) diff --git a/scripts/us_census/acs5yr/subject_tables/common/datacommons_api_wrappers/datacommons_wrappers_test.py b/scripts/us_census/acs5yr/subject_tables/common/datacommons_api_wrappers/datacommons_wrappers_test.py index 5eb9d2a497..e0374c7010 100644 --- a/scripts/us_census/acs5yr/subject_tables/common/datacommons_api_wrappers/datacommons_wrappers_test.py +++ b/scripts/us_census/acs5yr/subject_tables/common/datacommons_api_wrappers/datacommons_wrappers_test.py @@ -37,14 +37,12 @@ def test_dc_check_existence_mock(self, mock_is_defined): mock_is_defined.assert_called_with( ['node1'], { 'dc_api_batch_size': 450, - 'dc_api_root': 'https://autopush.api.datacommons.org' }) # Test 2: use_autopush=False dc_check_existence(['node2'], use_autopush=False, max_items=10) mock_is_defined.assert_called_with(['node2'], { 'dc_api_batch_size': 10, - 'dc_api_root': 'https://api.datacommons.org' }) @mock.patch('datacommons_wrappers.request_post_json') diff --git a/statvar_imports/database_on_indian_economy/india_rbi_state_statistics/environment_sdg_metadata.csv b/statvar_imports/database_on_indian_economy/india_rbi_state_statistics/environment_sdg_metadata.csv index 782aa4c330..f11ac0b3c4 100644 --- a/statvar_imports/database_on_indian_economy/india_rbi_state_statistics/environment_sdg_metadata.csv +++ b/statvar_imports/database_on_indian_economy/india_rbi_state_statistics/environment_sdg_metadata.csv @@ -2,4 +2,3 @@ parameter,value header_rows,3 output_columns,"observationAbout,observationDate,variableMeasured,value,unit,observationPeriod" mapped_rows,3 -dc_api_root,https://api.datacommons.org diff --git a/statvar_imports/database_on_indian_economy/india_rbi_state_statistics/infrastructure_metadata.csv b/statvar_imports/database_on_indian_economy/india_rbi_state_statistics/infrastructure_metadata.csv index 475c900919..f5c45d8c4e 100644 --- a/statvar_imports/database_on_indian_economy/india_rbi_state_statistics/infrastructure_metadata.csv +++ b/statvar_imports/database_on_indian_economy/india_rbi_state_statistics/infrastructure_metadata.csv @@ -2,4 +2,3 @@ parameter,value header_rows,5 output_columns,"observationAbout,observationDate,variableMeasured,value,unit,observationPeriod" mapped_rows,5 
-dc_api_root,https://api.datacommons.org diff --git a/statvar_imports/database_on_indian_economy/india_rbi_state_statistics/rbi_metadata.csv b/statvar_imports/database_on_indian_economy/india_rbi_state_statistics/rbi_metadata.csv index ad0d50f768..c2042f4fd4 100644 --- a/statvar_imports/database_on_indian_economy/india_rbi_state_statistics/rbi_metadata.csv +++ b/statvar_imports/database_on_indian_economy/india_rbi_state_statistics/rbi_metadata.csv @@ -2,8 +2,3 @@ parameter,value output_columns,"observationAbout,observationDate,variableMeasured,value,unit,observationPeriod" header_rows,4 mapped_rows,4 -dc_api_root,https://api.datacommons.org - - - - diff --git a/statvar_imports/denmark_demographics/denmark_demographics_metadata.csv b/statvar_imports/denmark_demographics/denmark_demographics_metadata.csv index 41f8f31e37..95d252a541 100644 --- a/statvar_imports/denmark_demographics/denmark_demographics_metadata.csv +++ b/statvar_imports/denmark_demographics/denmark_demographics_metadata.csv @@ -1,3 +1,2 @@ parameter,value output_columns,"observationDate,value,observationAbout,variableMeasured" -dc_api_root,https://api.datacommons.org diff --git a/statvar_imports/fema/flood_insurance_claims/us_flood_nfip_config.py b/statvar_imports/fema/flood_insurance_claims/us_flood_nfip_config.py index 90e53db883..082ce8b7e3 100644 --- a/statvar_imports/fema/flood_insurance_claims/us_flood_nfip_config.py +++ b/statvar_imports/fema/flood_insurance_claims/us_flood_nfip_config.py @@ -68,7 +68,4 @@ 5, 'dc_api_use_cache': True, - #'dc_api_root': 'http://autopush.api.datacommons.org', - 'dc_api_root': - 'http://api.datacommons.org', } diff --git a/statvar_imports/india_rbistatedomesticproduct/state_domestic_product_metadata.csv b/statvar_imports/india_rbistatedomesticproduct/state_domestic_product_metadata.csv index ee630bdff4..0c90bd2702 100644 --- a/statvar_imports/india_rbistatedomesticproduct/state_domestic_product_metadata.csv +++ b/statvar_imports/india_rbistatedomesticproduct/state_domestic_product_metadata.csv @@ -11,4 +11,3 @@ comments, output_columns,"observationAbout,observationDate,variableMeasured,value,unit,measurementMethod,observationPeriod" #header_rows,6 #mapped_rows,5 -dc_api_root,https://api.datacommons.org diff --git a/statvar_imports/statistics_poland/StatisticsPoland_metadata.csv b/statvar_imports/statistics_poland/StatisticsPoland_metadata.csv index b909a13a08..a3a30ec1c2 100644 --- a/statvar_imports/statistics_poland/StatisticsPoland_metadata.csv +++ b/statvar_imports/statistics_poland/StatisticsPoland_metadata.csv @@ -9,6 +9,3 @@ places_within,country/POL #skip_rows,1 header_rows,5 mapped_columns,2 -dc_api_root,https://api.datacommons.org - - diff --git a/statvar_imports/us_urban_school/ap_ib_gt_enrollment/config/common_metadata.csv b/statvar_imports/us_urban_school/ap_ib_gt_enrollment/config/common_metadata.csv index 41a321a836..2c8f80a15c 100644 --- a/statvar_imports/us_urban_school/ap_ib_gt_enrollment/config/common_metadata.csv +++ b/statvar_imports/us_urban_school/ap_ib_gt_enrollment/config/common_metadata.csv @@ -3,4 +3,3 @@ mapped_rows,1 output_columns,"observationDate,observationAbout,variableMeasured,value" #input_rows,10 mapped_columns,2 -dc_api_root,https://api.datacommons.org diff --git a/statvar_imports/us_urban_school/sat_act_participation/state/config/SATorACT_Participation_metadata.csv b/statvar_imports/us_urban_school/sat_act_participation/state/config/SATorACT_Participation_metadata.csv index 4909fa4a53..75997951a4 100644 --- 
a/statvar_imports/us_urban_school/sat_act_participation/state/config/SATorACT_Participation_metadata.csv +++ b/statvar_imports/us_urban_school/sat_act_participation/state/config/SATorACT_Participation_metadata.csv @@ -1,3 +1,2 @@ parameter,value output_columns,"observationAbout,observationDate,value,variableMeasured,unit,scalingFactor" -dc_api_root,https://api.datacommons.org diff --git a/tools/statvar_importer/config_flags.py b/tools/statvar_importer/config_flags.py index d5214a3510..94a162f33c 100644 --- a/tools/statvar_importer/config_flags.py +++ b/tools/statvar_importer/config_flags.py @@ -370,7 +370,7 @@ def get_default_config() -> dict: True, # Settings for DC API. 'dc_api_root': - 'http://api.datacommons.org', + os.environ.get('DC_API_ROOT', 'http://api.datacommons.org'), 'dc_api_use_cache': False, 'dc_api_batch_size': diff --git a/util/dc_api_wrapper.py b/util/dc_api_wrapper.py index 98f7209d0e..682d4aeeaf 100644 --- a/util/dc_api_wrapper.py +++ b/util/dc_api_wrapper.py @@ -520,7 +520,8 @@ def dc_api_resolve_latlng(lat_lngs: list, dictionary containing the resolved place information. """ config = _validate_v2_config(config) - api_root = config.get('dc_api_root', _DEFAULT_API_ROOT) + api_root = config.get('dc_api_root', + os.environ.get('DC_API_ROOT', _DEFAULT_API_ROOT)) v1_data = {} v1_data['coordinates'] = lat_lngs num_ids = len(lat_lngs) From dfad7506200cbf30ee076a9e258e975460c965a2 Mon Sep 17 00:00:00 2001 From: Ajai Tirumali Date: Wed, 25 Mar 2026 23:12:48 +0530 Subject: [PATCH 4/9] lint fix --- scripts/earthengine/utils.py | 12 +-- scripts/earthengine/utils_test.py | 10 +- .../datacommons_wrappers.py | 3 +- .../datacommons_wrappers_test.py | 7 +- tools/statvar_importer/config_flags.py | 2 +- util/dc_api_wrapper.py | 100 +++++++++++------- 6 files changed, 77 insertions(+), 57 deletions(-) diff --git a/scripts/earthengine/utils.py b/scripts/earthengine/utils.py index 7e59202eaa..aeb94045ac 100644 --- a/scripts/earthengine/utils.py +++ b/scripts/earthengine/utils.py @@ -19,11 +19,11 @@ from datetime import datetime import glob import os +from pathlib import Path import pickle import re import sys import tempfile -from pathlib import Path from typing import Union from absl import logging @@ -305,8 +305,8 @@ def grid_get_neighbor_ids(grid_id: str) -> list: if lat_offset != 0 or lng_offset != 0: neighbour_lat = lat + lat_offset * deg neighbour_lng = lng + lng_offset * deg - if abs(neighbour_lat) < _MAX_LATITUDE and abs( - neighbour_lng) < _MAX_LONGITUDE: + if (abs(neighbour_lat) < _MAX_LATITUDE and + abs(neighbour_lng) < _MAX_LONGITUDE): neighbours.append( grid_id_from_lat_lng( deg, @@ -433,7 +433,7 @@ def add_namespace(dcid: str, prefix: str = 'dcid:') -> str: def str_get_numeric_value( - value: Union[str, list, int, float]) -> Union[int, float, None]: + value: Union[str, list, int, float],) -> Union[int, float, None]: """Returns the numeric value from input string or None.""" if isinstance(value, list): value = value[0] @@ -528,7 +528,7 @@ def date_advance_by_period(date_str: str, if not date_str: return '' dt = datetime.strptime(date_str, date_format) - (delta, unit) = date_parse_time_period(time_period) + delta, unit = date_parse_time_period(time_period) if not delta or not unit: logging.error( f'Unable to parse time period: {time_period} for date: {date_str}') @@ -545,7 +545,7 @@ def date_format_by_time_period(date_str: str, time_period: str) -> str: """ if not time_period: return date_str - (delta, unit) = date_parse_time_period(time_period) + delta, unit = 
date_parse_time_period(time_period) date_parts = date_str.split('-') if unit == 'years': return date_parts[0] diff --git a/scripts/earthengine/utils_test.py b/scripts/earthengine/utils_test.py index f0f8e567cd..e93be53fea 100644 --- a/scripts/earthengine/utils_test.py +++ b/scripts/earthengine/utils_test.py @@ -390,8 +390,8 @@ def test_place_id_to_lat_lng_dc_api(self): lat, lng = utils.place_id_to_lat_lng(placeid, dc_api_lookup=True) self.assertAlmostEqual(37.221614, lat) self.assertAlmostEqual(-121.68954, lng) - mock_get.assert_called_once_with( - [placeid], ['latitude', 'longitude'], { - 'dc_api_version': 'V2', - 'dc_api_use_cache': True, - }) + mock_get.assert_called_once_with([placeid], + ['latitude', 'longitude'], { + 'dc_api_version': 'V2', + 'dc_api_use_cache': True, + }) diff --git a/scripts/us_census/acs5yr/subject_tables/common/datacommons_api_wrappers/datacommons_wrappers.py b/scripts/us_census/acs5yr/subject_tables/common/datacommons_api_wrappers/datacommons_wrappers.py index 70e0936ee5..eb0e487f2a 100644 --- a/scripts/us_census/acs5yr/subject_tables/common/datacommons_api_wrappers/datacommons_wrappers.py +++ b/scripts/us_census/acs5yr/subject_tables/common/datacommons_api_wrappers/datacommons_wrappers.py @@ -62,8 +62,7 @@ def dc_check_existence(dcid_list: list, Dict object with dcids as key values and boolean values signifying existence as values. """ wrapper_config = { - 'dc_api_batch_size': - max_items, + 'dc_api_batch_size': max_items, } return dc_api_is_defined_dcid(dcid_list, wrapper_config) diff --git a/scripts/us_census/acs5yr/subject_tables/common/datacommons_api_wrappers/datacommons_wrappers_test.py b/scripts/us_census/acs5yr/subject_tables/common/datacommons_api_wrappers/datacommons_wrappers_test.py index e0374c7010..e605afb5a9 100644 --- a/scripts/us_census/acs5yr/subject_tables/common/datacommons_api_wrappers/datacommons_wrappers_test.py +++ b/scripts/us_census/acs5yr/subject_tables/common/datacommons_api_wrappers/datacommons_wrappers_test.py @@ -34,10 +34,9 @@ def test_dc_check_existence_mock(self, mock_is_defined): # Test 1: Default (use_autopush=True by default in function signature) mock_is_defined.return_value = {'node1': True} dc_check_existence(['node1']) - mock_is_defined.assert_called_with( - ['node1'], { - 'dc_api_batch_size': 450, - }) + mock_is_defined.assert_called_with(['node1'], { + 'dc_api_batch_size': 450, + }) # Test 2: use_autopush=False dc_check_existence(['node2'], use_autopush=False, max_items=10) diff --git a/tools/statvar_importer/config_flags.py b/tools/statvar_importer/config_flags.py index 94a162f33c..a7449ca032 100644 --- a/tools/statvar_importer/config_flags.py +++ b/tools/statvar_importer/config_flags.py @@ -370,7 +370,7 @@ def get_default_config() -> dict: True, # Settings for DC API. 
'dc_api_root': - os.environ.get('DC_API_ROOT', 'http://api.datacommons.org'), + os.environ.get('DC_API_ROOT', 'https://api.datacommons.org'), 'dc_api_use_cache': False, 'dc_api_batch_size': diff --git a/util/dc_api_wrapper.py b/util/dc_api_wrapper.py index 682d4aeeaf..1265452f8c 100644 --- a/util/dc_api_wrapper.py +++ b/util/dc_api_wrapper.py @@ -21,16 +21,21 @@ import os import sys -import urllib -import requests from typing import Union +import urllib from absl import logging from datacommons_client.client import DataCommonsClient from datacommons_client.utils.error_handling import APIError, DCConnectionError, DCStatusError +import requests import requests_cache -from tenacity import (RetryCallState, Retrying, retry_if_exception, - stop_after_attempt, wait_fixed) +from tenacity import ( + RetryCallState, + Retrying, + retry_if_exception, + stop_after_attempt, + wait_fixed, +) _SCRIPT_DIR = os.path.dirname(os.path.abspath(__file__)) sys.path.append(_SCRIPT_DIR) @@ -64,8 +69,14 @@ def _get_exception_status_code(exception): def _should_retry_exception(exception: Exception) -> bool: - if isinstance(exception, (DCConnectionError, requests.exceptions.Timeout, - requests.exceptions.ChunkedEncodingError)): + if isinstance( + exception, + ( + DCConnectionError, + requests.exceptions.Timeout, + requests.exceptions.ChunkedEncodingError, + ), + ): return True if isinstance(exception, (urllib.error.HTTPError, DCStatusError, APIError)): status_code = _get_exception_status_code(exception) @@ -105,6 +116,7 @@ def dc_api_wrapper( retries: Maximum number of attempts (including the first attempt). retry_sec: Interval in seconds between retries for which caller is blocked. use_cache: If True, uses request cache for faster response. + Returns: The response from the DataCommons API call. """ @@ -147,8 +159,9 @@ def dc_api_wrapper( logging.error(f'Got exception for api: {function}, {e}') return None except Exception as e: - e.add_note(f'DC API call failed for {function} with max attempts ' - f'{max_attempts}.') + e.add_note( + f'DC API call failed for {function} with max attempts {max_attempts}.' + ) raise @@ -252,11 +265,13 @@ def get_dc_api_key(config: dict = None) -> str: api_key = config.get('dc_api_key', os.environ.get('DC_API_KEY')) if not api_key: logging.log_first_n( - logging.WARNING, f'Using default DC API key with limited quota. ' - 'Please set an API key in the environment variable: DC_API_KEY.' - 'Refer https://docs.datacommons.org/api/python/v2/#authentication ' - 'for more details.', - n=1) + logging.WARNING, + f'Using default DC API key with limited quota. ' + f'Please set an API key in the environment variable: DC_API_KEY.' + f'Refer https://docs.datacommons.org/api/python/v2/#authentication ' + f'for more details.', + n=1, + ) api_key = _DEFAULT_DC_API_KEY return api_key @@ -286,6 +301,7 @@ def get_datacommons_client(config: dict = None) -> DataCommonsClient: def dc_api_is_defined_dcid(dcids: list, config: dict = {}) -> dict: """Returns a dictionary with dcids mapped to True/False based on whether + the dcid is defined in the API and has a 'typeOf' property. Uses the property_value() DC API to lookup 'typeOf' for each dcid. dcids not defined in KG get a value of False. @@ -301,11 +317,13 @@ def dc_api_is_defined_dcid(dcids: list, config: dict = {}) -> dict: # Set parameters for node API. 
client = get_datacommons_client(config) api_function = client.node.fetch_property_values - api_result = dc_api_batched_wrapper(function=api_function, - dcids=dcids, - args={'properties': 'typeOf'}, - dcid_arg_kw='node_dcids', - config=config) + api_result = dc_api_batched_wrapper( + function=api_function, + dcids=dcids, + args={'properties': 'typeOf'}, + dcid_arg_kw='node_dcids', + config=config, + ) response = {} for dcid in dcids: dcid_stripped = _strip_namespace(dcid) @@ -348,11 +366,13 @@ def _dc_api_get_node_property_v2(dcids: list, api_function = client.node.fetch_property_values args = {'properties': prop} dcid_arg_kw = 'node_dcids' - api_result = dc_api_batched_wrapper(function=api_function, - dcids=dcids, - args=args, - dcid_arg_kw=dcid_arg_kw, - config=config) + api_result = dc_api_batched_wrapper( + function=api_function, + dcids=dcids, + args=args, + dcid_arg_kw=dcid_arg_kw, + config=config, + ) response = {} for dcid in dcids: dcid_stripped = _strip_namespace(dcid) @@ -398,11 +418,13 @@ def dc_api_get_node_property_values(dcids: list, config: dict = {}) -> dict: api_function = client.node.fetch args = {'expression': '->*'} dcid_arg_kw = 'node_dcids' - api_result = dc_api_batched_wrapper(function=api_function, - dcids=dcids, - args=args, - dcid_arg_kw=dcid_arg_kw, - config=config) + api_result = dc_api_batched_wrapper( + function=api_function, + dcids=dcids, + args=args, + dcid_arg_kw=dcid_arg_kw, + config=config, + ) response = {} for dcid, arcs in api_result.items(): pvs = {} @@ -446,11 +468,13 @@ def dc_api_resolve_placeid(dcids: list, api_function = client.resolve.fetch args = {'expression': f'<-{in_prop}->dcid'} dcid_arg_kw = 'node_ids' - api_result = dc_api_batched_wrapper(function=api_function, - dcids=dcids, - args=args, - dcid_arg_kw=dcid_arg_kw, - config=config) + api_result = dc_api_batched_wrapper( + function=api_function, + dcids=dcids, + args=args, + dcid_arg_kw=dcid_arg_kw, + config=config, + ) results = {} if api_result: for node in api_result.get('entities', []): @@ -478,7 +502,7 @@ def dc_api_resolve_latlng(lat_lngs: list, } if return_v1_response is True, a v1 response of this form is returned: - + { "placeCoordinates": [ { @@ -552,8 +576,7 @@ def dc_api_resolve_latlng(lat_lngs: list, def _convert_v2_to_v1_coordinate_response(v2_response: dict) -> dict: - """Converts a v2 coordinate resolution response to a v1 response. - """ + """Converts a v2 coordinate resolution response to a v1 response.""" v1_response = {'placeCoordinates': []} for entity in v2_response.get('entities', []): node = entity.get('node', '') @@ -573,15 +596,14 @@ def _convert_v2_to_v1_coordinate_response(v2_response: dict) -> dict: candidate.get('dcid') for candidate in entity.get('candidates', []) ], - 'places': entity.get('candidates', []) + 'places': entity.get('candidates', []), } v1_response['placeCoordinates'].append(place_coordinate) return v1_response def _convert_v1_to_v2_coordinate_request(v1_request: dict) -> dict: - """Converts a v1 coordinate resolution request to a v2 request. 
- """ + """Converts a v1 coordinate resolution request to a v2 request.""" v2_request = {'nodes': [], 'property': '<-geoCoordinate->dcid'} for coordinate in v1_request.get('coordinates', []): lat = coordinate.get('latitude') From bd91137f1ec3ebf4f895474d124d18b559d894d3 Mon Sep 17 00:00:00 2001 From: Ajai Tirumali Date: Thu, 16 Apr 2026 19:48:04 +0530 Subject: [PATCH 5/9] Script to upload observations to spanner graph --- .../spanner_upload_statvar_observations.py | 255 ++++++++++++++++++ 1 file changed, 255 insertions(+) create mode 100644 tools/spanner_graph/spanner_upload_statvar_observations.py diff --git a/tools/spanner_graph/spanner_upload_statvar_observations.py b/tools/spanner_graph/spanner_upload_statvar_observations.py new file mode 100644 index 0000000000..624df6aee4 --- /dev/null +++ b/tools/spanner_graph/spanner_upload_statvar_observations.py @@ -0,0 +1,255 @@ +import csv +import json +import os +import sys + +from google.cloud import spanner + +from absl import app +from absl import flags +from absl import logging + +_FLAGS = flags.FLAGS + +_SCRIPT_DIR = os.path.dirname(os.path.abspath(__file__)) +sys.path.append(_SCRIPT_DIR) +_DATA_DIR = os.path.dirname(os.path.dirname(_SCRIPT_DIR)) +sys.path.append(_DATA_DIR) +sys.path.append(os.path.join(_DATA_DIR, 'util')) +sys.path.append(os.path.join(_DATA_DIR, 'tools', 'statvar_importer')) + +from counters import Counters + +import file_util +import mcf_file_util + +flags.DEFINE_string("spanner_instance_id", "dc-kg-test", "Spanner Instance ID.") +flags.DEFINE_string("spanner_database_id", "dc_graph_2026_01_27", + "Spanner Database ID.") +flags.DEFINE_string("spanner_input_file", "", + "Input file with observations to be uploaded.") +flags.DEFINE_string("spanner_table_config_file", "", + "Table config file and input column mappings") +flags.DEFINE_list("spanner_default_values", [], + "Default list of property=values") +flags.DEFINE_boolean("dry_run", False, "Dry run mode.") +flags.DEFINE_boolean("spanner_debug", False, "Spanner debug mode.") + +# Map from input csv to spanner graph table columns +_DEFAULT_TABLE_CONFIG = { + # TimeSeries Table with the schema: + # id STRING(1024) NOT NULL, + # variable_measured STRING(1024) NOT NULL, + # provenance STRING(1024) NOT NULL, + "TimeSeries": { + "id": + "{variableMeasured};{provenance};{unit};{observationPeriod};{measurementMethod}", + "variable_measured": + "{variableMeasured}", + "provenance": + "{provenance}", + }, + # TimeSeriesAttribute Table with facet properties + # id STRING(1024) NOT NULL, + # property STRING(1024) NOT NULL, + # value STRING(1024) NOT NULL, + "TimeSeriesAttribute": [ + { + "id": + "{TimeSeries_id}", + "property": + "facetId", + "value": + "{provenance};{donor};{recipient};{unit};{observationPeriod};{measurementMethod}", + }, + { + "id": "{TimeSeries_id}", + "property": "importName", + "value": "{provenance}", + }, + { + "id": "{TimeSeries_id}", + "property": "recipient", + "value": "{recipient}", + }, + { + "id": "{TimeSeries_id}", + "property": "donor", + "value": "{donor}", + }, + { + "id": "{TimeSeries_id}", + "property": "measurementMethod", + "value": "{measurementMethod}", + }, + { + "id": "{TimeSeries_id}", + "property": "unit", + "value": "{unit}", + }, + ], + # StatVarObservation + # id STRING(1024) NOT NULL, + # date STRING(32) NOT NULL, + # value STRING(1024) NOT NULL, + "StatVarObservation": { + "id": "{TimeSeries_id}", + "date": "{observationDate}", + "value": "{value}", + }, + + # ObservationAttribute ( + # id STRING(1024) NOT NULL, + # date STRING(32) 
NOT NULL, + # property STRING(1024) NOT NULL, + # value STRING(1024) NOT NULL, + "ObservationAttribute": [{ + "id": "{StatVarObservation_id}", + "date": "{observationDate}", + "property": "footnote", + "value": "{footnote}", + },], +} + + +class SpannerStatVarObservationsUploader: + + def __init__(self, + instance_id, + database_id, + table_config_file: str = "", + default_values: list = [], + dry_run: bool = False, + counters=None): + self._instance_id = instance_id + self._database_id = database_id + self._table_config_file = table_config_file + self._default_values = {} + if isinstance(default_values, str): + default_values = default_values.split(',') + if default_values: + # Parse default values as prop=value + for default_value in default_values: + key, value = default_value.split("=") + self._default_values[key.strip()] = value.strip() + self._spanner_client = spanner.Client() + self._instance = self._spanner_client.instance(instance_id) + self._database = self._instance.database(database_id) + self._counters = counters or Counters() + self._dry_run = dry_run + self._table_config = None + self.load_spanner_config(self._table_config_file) + + def load_spanner_config(self, table_config_file): + if table_config_file: + with open(table_config_file, "r") as f: + self._table_config = json.load(f) + else: + self._table_config = _DEFAULT_TABLE_CONFIG + + def _build_value_from_template(self, template: str, row: dict) -> str: + """Build a value from a template string and a row dictionary.""" + try: + return template.format(**row) + except KeyError as e: + logging.error(f"KeyError: {e} in template: {template}, row: {row}") + raise e + + def process_input_row(self, row: dict): + """Process a single row from the input CSV and generate Spanner mutations.""" + mutations = [] + pvs = dict(self._default_values) + pvs.update(row) + # Generate all table columns values from the input row + self._counters.add_counter('spanner-input-rows', 1) + for table, table_cfg in self._table_config.items(): + # Generate mutation per table from the input row + table_row = {} + if isinstance(table_cfg, dict): + for col, template in table_cfg.items(): + if isinstance(template, str): + value = self._build_value_from_template(template, pvs) + if value: + table_row[col] = value + pvs[f'{table}_{col}'] = value + elif isinstance(table_cfg, list): + table_row = [] + for index, tpl in enumerate(table_cfg): + sub_table_row = {} + for col, template in tpl.items(): + value = self._build_value_from_template(template, pvs) + if value: + sub_table_row[col] = value + pvs_key = f'{table}{col}' + if pvs_key in pvs: + pvs_key = f'{table}_{col}_{index}' + pvs[pvs_key] = value + table_row.append(sub_table_row) + mutations.append((table, table_row)) + logging.debug(f'Generated table from {row}: {mutations}') + self._counters.add_counter('spanner-mutations-generated', 1) + + num_mutations = 0 + if not self._dry_run: + with self.database.batch() as batch: + for table, table_row in mutations: + if isinstance(table_row, list): + for row in table_row: + batch.insert(table=table, + columns=row.keys(), + values=[row.values()]) + num_mutations += len(row.keys()) + self._counters.add_counter( + f'spanner-rows-inserted-{table}', 1) + else: + batch.insert(table=table, + columns=table_row.keys(), + values=[table_row.values()]) + num_mutations += len(table_row.keys()) + self._counters.add_counter(f'spanner-rows-inserted-{table}', + 1) + return num_mutations + + def process_input_file(self, input_file: str): + input_files = 
file_util.file_get_matching(input_file) + num_mutations = 0 + for infile in input_files: + self._counters.add_counter('spanner-input-files', 1) + logging.info(f'Processing input file: {infile}') + with file_util.FileIO(infile, 'r') as f: + self._counters.add_counter( + 'total', file_util.file_estimate_num_rows(infile)) + reader = csv.DictReader(f) + for row in reader: + for prop in row.keys(): + row[prop] = mcf_file_util.strip_namespace(row[prop]) + num_mutations += self.process_input_row(row) + self._counters.add_counter('processed', 1) + logging.info( + f'Added {num_mutations} rows from {len(input_files)} files to spanner database {self._instance_id}.{self._database_id}' + ) + + +def spanner_upload_statvar_observations(instance_id, database_id, + table_config_file, default_values, + input_file, dry_run): + uploader = SpannerStatVarObservationsUploader(instance_id, database_id, + table_config_file, + default_values, dry_run) + uploader.process_input_file(input_file) + return uploader._counters + + +def main(_): + if _FLAGS.spanner_debug: + logging.set_verbosity(logging.DEBUG) + spanner_upload_statvar_observations(_FLAGS.spanner_instance_id, + _FLAGS.spanner_database_id, + _FLAGS.spanner_table_config_file, + _FLAGS.spanner_default_values, + _FLAGS.spanner_input_file, + _FLAGS.dry_run) + + +if __name__ == "__main__": + app.run(main) From 3c5ba0fbc013bb63de07d4343453ea53e66e19dd Mon Sep 17 00:00:00 2001 From: Ajai Tirumali Date: Fri, 17 Apr 2026 01:23:31 +0530 Subject: [PATCH 6/9] add facetid and delete --- .../spanner_upload_statvar_observations.py | 93 +++++++++++++++---- tools/statvar_importer/eval_functions.py | 17 ++++ 2 files changed, 93 insertions(+), 17 deletions(-) diff --git a/tools/spanner_graph/spanner_upload_statvar_observations.py b/tools/spanner_graph/spanner_upload_statvar_observations.py index 624df6aee4..725e7e92e3 100644 --- a/tools/spanner_graph/spanner_upload_statvar_observations.py +++ b/tools/spanner_graph/spanner_upload_statvar_observations.py @@ -19,6 +19,7 @@ sys.path.append(os.path.join(_DATA_DIR, 'tools', 'statvar_importer')) from counters import Counters +from eval_functions import evaluate_statement import file_util import mcf_file_util @@ -34,16 +35,31 @@ "Default list of property=values") flags.DEFINE_boolean("dry_run", False, "Dry run mode.") flags.DEFINE_boolean("spanner_debug", False, "Spanner debug mode.") +flags.DEFINE_boolean("spanner_delete", False, "Delete data from Spanner.") # Map from input csv to spanner graph table columns _DEFAULT_TABLE_CONFIG = { + # Default variables for common StatVarObservation properties + "DefaultVariables": { + "measurementMethod": "", + "provenance": "", + "unit": "", + "observationPeriod": "", + "scalingFactor": "", + }, + # Local variables reused in multiple tables + "LocalVariables": { + "FacetId": "=crc32('{provenance}-{measurementMethod}-{observationPeriod}-{unit}-{scalingFactor}')", + }, + + "SpannerTables": { # TimeSeries Table with the schema: # id STRING(1024) NOT NULL, # variable_measured STRING(1024) NOT NULL, # provenance STRING(1024) NOT NULL, "TimeSeries": { "id": - "{variableMeasured};{provenance};{unit};{observationPeriod};{measurementMethod}", + "{variableMeasured};{donor};{recipient};{FacetId}", "variable_measured": "{variableMeasured}", "provenance": @@ -109,6 +125,7 @@ "property": "footnote", "value": "{footnote}", },], + } } @@ -147,28 +164,37 @@ def load_spanner_config(self, table_config_file): else: self._table_config = _DEFAULT_TABLE_CONFIG - def _build_value_from_template(self, template: str, row: 
dict) -> str:
-        """Build a value from a template string and a row dictionary."""
-        try:
-            return template.format(**row)
-        except KeyError as e:
-            logging.error(f"KeyError: {e} in template: {template}, row: {row}")
-            raise e
+
+    def _get_default_variables(self, row: dict ):
+        # Copy the config dict so per-row updates don't mutate the config.
+        def_vars = dict(self._table_config.get("DefaultVariables", {}))
+        def_vars.update(self._default_values)
+        def_vars.update(row)
+        return def_vars
+
+    def _get_local_variables(self, pvs: dict):
+        local_vars = self._table_config.get("LocalVariables", {})
+        for var in local_vars.keys():
+            value = _build_value_from_template(local_vars[var], pvs)
+            pvs[var] = value
+        return pvs
 
     def process_input_row(self, row: dict):
         """Process a single row from the input CSV and generate Spanner mutations."""
         mutations = []
-        pvs = dict(self._default_values)
-        pvs.update(row)
+        pvs = self._get_default_variables(row)
+        pvs = self._get_local_variables(pvs)
+
         # Generate all table columns values from the input row
         self._counters.add_counter('spanner-input-rows', 1)
-        for table, table_cfg in self._table_config.items():
+        spanner_tables = self._table_config.get("SpannerTables", {})
+        for table, table_cfg in spanner_tables.items():
             # Generate mutation per table from the input row
             table_row = {}
             if isinstance(table_cfg, dict):
-                for col, template in table_cfg.items():
+                for col in list(table_cfg.keys()):
+                    template = table_cfg[col]
                     if isinstance(template, str):
-                        value = self._build_value_from_template(template, pvs)
+                        value = _build_value_from_template(template, pvs)
                         if value:
                             table_row[col] = value
                             pvs[f'{table}_{col}'] = value
@@ -176,8 +202,9 @@ def process_input_row(self, row: dict):
                 table_row = []
                 for index, tpl in enumerate(table_cfg):
                     sub_table_row = {}
-                    for col, template in tpl.items():
-                        value = self._build_value_from_template(template, pvs)
+                    for col in tpl.keys():
+                        template = tpl[col]
+                        value = _build_value_from_template(template, pvs)
                         if value:
                             sub_table_row[col] = value
                             pvs_key = f'{table}{col}'
@@ -229,14 +256,45 @@ def process_input_file(self, input_file: str):
         f'Added {num_mutations} rows from {len(input_files)} files to spanner database {self._instance_id}.{self._database_id}'
     )
 
+    def delete_statvar_observations(self):
+        """Delete data from all tables for a given provenance."""
+        pvs = self._get_default_variables()
+        provenance = pvs.get('provenance')
+        if not self._dry_run:
+            for table, template in self._table_config.get("SpannerTables", {}).items():
+                if 'provenance' in template:
+                    row_ct = self.database.execute_partitioned_dml(
+                        f"DELETE FROM {table} WHERE provenance = '{provenance}'"
+                    )
+                    logging.info(f"Bulk deleted {row_ct} records from {table}.")
+                    self._counters.add_counter(f'spanner-rows-deleted-{table}', row_ct)
+        else:
+            logging.info(f"Dry run: would have deleted data from {len(self._table_config.get('SpannerTables', {}))} tables.")
+
+
+def _build_value_from_template(template: str, row: dict) -> str:
+    """Build a value from a template string and a row dictionary."""
+    try:
+        if template.startswith('='):
+            variable, value = evaluate_statement(template[1:], row)
+            row[variable] = value
+            return value
+        return template.format(**row)
+    except KeyError as e:
+        logging.error(f"KeyError: {e} in template: {template}, row: {row}")
+        raise e
+
 
 def spanner_upload_statvar_observations(instance_id, database_id,
                                         table_config_file, default_values,
-                                        input_file, dry_run):
+                                        input_file, delete, dry_run):
     uploader = SpannerStatVarObservationsUploader(instance_id, database_id,
                                                   table_config_file,
                                                   default_values, dry_run)
-    uploader.process_input_file(input_file)
+    if delete:
+        uploader.delete_statvar_observations()
+    if input_file:
+        uploader.process_input_file(input_file)
     return uploader._counters
 
 
@@ -301,6 +306,7 @@ def main(_):
                                         _FLAGS.spanner_table_config_file,
                                         _FLAGS.spanner_default_values,
                                         _FLAGS.spanner_input_file,
+                                        _FLAGS.spanner_delete,
                                         _FLAGS.dry_run)
 
 
diff --git a/tools/statvar_importer/eval_functions.py b/tools/statvar_importer/eval_functions.py
index 44cec2f665..3b0e425c6c 100644
--- a/tools/statvar_importer/eval_functions.py
+++ b/tools/statvar_importer/eval_functions.py
@@ -37,6 +37,7 @@
 from datetime import datetime
 import re
+import zlib
 
 from absl import logging
 import dateutil
@@ -109,6 +110,18 @@ def str_to_camel_case(input_string: str,
     return ''.join(
         [w[0].upper() + w[1:] for w in clean_str.split(' ') if len(w) > 0])
 
+def crc32(input_string: str) -> str:
+    """Computes the CRC32 hash of a string.
+
+    Args:
+      input_string: The string to be hashed.
+
+    Returns:
+      The CRC32 hash of the string.
+    """
+    if not isinstance(input_string, str):
+        input_string = str(input_string)
+    return str(zlib.crc32(input_string.encode('utf-8')))
 
 # A dictionary of functions and modules that are safe to use in `eval()`.
 # This dictionary acts as a safelist, defining the execution environment for
@@ -136,6 +149,10 @@ def str_to_camel_case(input_string: str,
     # - `re.sub`: The 'sub' function for string substitution.
     're': re,
     're_sub': re.sub,
+
+    # Hash functions:
+    # - `crc32`: The 'crc32' function for stable hash id generation.
+    'crc32': crc32,
 }

From d6de40a928749f6b2b2ba9a986cdf336a6d6030b Mon Sep 17 00:00:00 2001
From: Ajai Tirumali
Date: Tue, 21 Apr 2026 20:55:11 +0530
Subject: [PATCH 7/9] batch mutations across rows

---
 ...panner_table_config_ingestion_history.json |  12 +
 .../spanner_upload_statvar_observations.py    | 407 +++++++++++++-----
 tools/statvar_importer/eval_functions.py      |   2 +
 3 files changed, 307 insertions(+), 114 deletions(-)
 create mode 100644 tools/spanner_graph/spanner_table_config_ingestion_history.json

diff --git a/tools/spanner_graph/spanner_table_config_ingestion_history.json b/tools/spanner_graph/spanner_table_config_ingestion_history.json
new file mode 100644
index 0000000000..4f0ceb9630
--- /dev/null
+++ b/tools/spanner_graph/spanner_table_config_ingestion_history.json
@@ -0,0 +1,12 @@
+{
+  "comment": "Ingestion history table config to update stale read timestamp after an import",
+  "DefaultVariables": {
+    "importName": "",
+    "user": ""
+  },
+  "SpannerTables": {
+    "IngestionHistory": {
+      "WorkflowExecutionID": "import-{importName}-{user}"
+    }
+  }
+}
\ No newline at end of file
diff --git a/tools/spanner_graph/spanner_upload_statvar_observations.py b/tools/spanner_graph/spanner_upload_statvar_observations.py
index 725e7e92e3..56757decd3 100644
--- a/tools/spanner_graph/spanner_upload_statvar_observations.py
+++ b/tools/spanner_graph/spanner_upload_statvar_observations.py
@@ -24,6 +24,7 @@
 import file_util
 import mcf_file_util
 
+flags.DEFINE_string("spanner_project", "datcom-store", "Spanner Project ID.")
 flags.DEFINE_string("spanner_instance_id", "dc-kg-test", "Spanner Instance ID.")
 flags.DEFINE_string("spanner_database_id", "dc_graph_2026_01_27",
                     "Spanner Database ID.")
@@ -36,6 +37,7 @@
 flags.DEFINE_boolean("dry_run", False, "Dry run mode.")
 flags.DEFINE_boolean("spanner_debug", False, "Spanner debug mode.")
 flags.DEFINE_boolean("spanner_delete", False, "Delete data from Spanner.")
+flags.DEFINE_integer("spanner_batch_size", 1000, "Number of input rows to process as a single spanner batch
update.") # Map from input csv to spanner graph table columns _DEFAULT_TABLE_CONFIG = { @@ -46,100 +48,193 @@ "unit": "", "observationPeriod": "", "scalingFactor": "", + "observationAbout": "", + "footnote": "", }, # Local variables reused in multiple tables "LocalVariables": { - "FacetId": "=crc32('{provenance}-{measurementMethod}-{observationPeriod}-{unit}-{scalingFactor}')", + # "FacetId": "=crc32('{provenance}-{measurementMethod}-{observationPeriod}-{unit}-{scalingFactor}')", + "FacetId": "{provenance}-{measurementMethod}-{observationPeriod}-{unit}-{scalingFactor}", }, "SpannerTables": { - # TimeSeries Table with the schema: - # id STRING(1024) NOT NULL, - # variable_measured STRING(1024) NOT NULL, - # provenance STRING(1024) NOT NULL, - "TimeSeries": { - "id": - "{variableMeasured};{donor};{recipient};{FacetId}", - "variable_measured": - "{variableMeasured}", - "provenance": - "{provenance}", - }, - # TimeSeriesAttribute Table with facet properties - # id STRING(1024) NOT NULL, - # property STRING(1024) NOT NULL, - # value STRING(1024) NOT NULL, - "TimeSeriesAttribute": [ - { - "id": - "{TimeSeries_id}", - "property": - "facetId", - "value": - "{provenance};{donor};{recipient};{unit};{observationPeriod};{measurementMethod}", - }, - { - "id": "{TimeSeries_id}", - "property": "importName", - "value": "{provenance}", - }, - { - "id": "{TimeSeries_id}", - "property": "recipient", - "value": "{recipient}", - }, - { - "id": "{TimeSeries_id}", - "property": "donor", - "value": "{donor}", - }, - { - "id": "{TimeSeries_id}", - "property": "measurementMethod", - "value": "{measurementMethod}", - }, - { - "id": "{TimeSeries_id}", - "property": "unit", - "value": "{unit}", - }, - ], - # StatVarObservation - # id STRING(1024) NOT NULL, - # date STRING(32) NOT NULL, - # value STRING(1024) NOT NULL, - "StatVarObservation": { - "id": "{TimeSeries_id}", - "date": "{observationDate}", - "value": "{value}", - }, + # TimeSeries Table with the schema: + # id STRING(1024) NOT NULL, + # variable_measured STRING(1024) NOT NULL, + # provenance STRING(1024) NOT NULL, + "TimeSeries": { + "id": + "{variableMeasured};{donor};{recipient};{FacetId}", + "variable_measured": + "{variableMeasured}", + "provenance": + "{provenance}", + }, + # TimeSeriesAttribute Table with facet properties + # id STRING(1024) NOT NULL, + # property STRING(1024) NOT NULL, + # value STRING(1024) NOT NULL, + "TimeSeriesAttribute": [ + { + "id": + "{TimeSeries_id}", + "property": + "facetId", + "value": + "{provenance};{donor};{recipient};{unit};{observationPeriod};{measurementMethod}", + }, + { + "id": "{TimeSeries_id}", + "property": "importName", + "value": "{provenance}", + }, + { + "id": "{TimeSeries_id}", + "property": "recipient", + "value": "{recipient}", + }, + { + "id": "{TimeSeries_id}", + "property": "donor", + "value": "{donor}", + }, + { + "id": "{TimeSeries_id}", + "property": "measurementMethod", + "value": "{measurementMethod}", + }, + { + "id": "{TimeSeries_id}", + "property": "unit", + "value": "{unit}", + }, + ], + # StatVarObservation + # id STRING(1024) NOT NULL, + # date STRING(32) NOT NULL, + # value STRING(1024) NOT NULL, + "StatVarObservation": { + "id": "{TimeSeries_id}", + "date": "{observationDate}", + "value": "{value}", + }, - # ObservationAttribute ( - # id STRING(1024) NOT NULL, - # date STRING(32) NOT NULL, - # property STRING(1024) NOT NULL, - # value STRING(1024) NOT NULL, - "ObservationAttribute": [{ - "id": "{StatVarObservation_id}", - "date": "{observationDate}", - "property": "footnote", - "value": 
"{footnote}", - },], + # ObservationAttribute ( + # id STRING(1024) NOT NULL, + # date STRING(32) NOT NULL, + # property STRING(1024) NOT NULL, + # value STRING(1024) NOT NULL, + #"ObservationAttribute": [{ + # "id": "{StatVarObservation_id}", + # "date": "{observationDate}", + # "property": "footnote", + # "value": "{footnote}", + #}], } } class SpannerStatVarObservationsUploader: + """Upload statvar observations to Spanner. + + Uses a table config file to determine how to map input CSV columns to Spanner tables. + The table config file is a JSON file with the following structure: + { + "DefaultVariables": { + # List of default property:values for each input row + "variableMeasured": "", + "provenance": "", + "unit": "", + "observationPeriod": "", + "scalingFactor": "", + }, + "LocalVariables": { + # Additional local varaiables computed form teh input row used in tables + "FacetId": "=crc32('{provenance}-{measurementMethod}-{observationPeriod}-{unit}-{scalingFactor}')", + }, + "SpannerTables": { + # Mappings per spanner table, in order of insertion. + # Each mapping can be a single dict or a list of dicts for tables with multiple rows per input row. + # A variable is also created for each properoty in each table in the form {TableName}_{PropertyName} + # This can be referenced in other tables using {_} + "TimeSeries": { + "id": "{variableMeasured};{donor};{recipient};{FacetId}", + "variable_measured": "{variableMeasured}", + "provenance": "{provenance}", + }, + "TimeSeriesAttribute": [ + { + "id": "{TimeSeries_id}", + "property": "facetId", + "value": "{provenance};{donor};{recipient};{unit};{observationPeriod};{measurementMethod}", + }, + { + "id": "{TimeSeries_id}", + "property": "importName", + "value": "{provenance}", + }, + { + "id": "{TimeSeries_id}", + "property": "observationAbout", + "value": "{observationAbout}", + }, + { + "id": "{TimeSeries_id}", + "property": "measurementMethod", + "value": "{measurementMethod}", + }, + { + "id": "{TimeSeries_id}", + "property": "unit", + "value": "{unit}", + }, + { + "id": "{TimeSeries_id}", + "property": "observationPeriod", + "value": "{observationPeriod}", + }, + { + "id": "{TimeSeries_id}", + "property": "scalingFactor", + "value": "{scalingFactor}", + } + ], + "StatVarObservation": { + "id": "{TimeSeries_id}", + "date": "{observationDate}", + "value": "{value}", + }, + "ObservationAttribute": [ + { + "id": "{StatVarObservation_id}", + "date": "{observationDate}", + "property": "errorMargin", + "value": "{errorMargin}", + }, + { + "id": "{StatVarObservation_id}", + "date": "{observationDate}", + "property": "footnote", + "value": "{footnote}", + }, + ], + } + } + """ def __init__(self, + project_id, instance_id, database_id, table_config_file: str = "", default_values: list = [], + batch_size: int = 1000, dry_run: bool = False, counters=None): + self._project_id = project_id self._instance_id = instance_id self._database_id = database_id + self._spanner_batch_size = batch_size self._table_config_file = table_config_file self._default_values = {} if isinstance(default_values, str): @@ -147,9 +242,9 @@ def __init__(self, if default_values: # Parse default values as prop=value for default_value in default_values: - key, value = default_value.split("=") + key, value = default_value.split("=", 1) self._default_values[key.strip()] = value.strip() - self._spanner_client = spanner.Client() + self._spanner_client = spanner.Client(project=self._project_id) self._instance = self._spanner_client.instance(instance_id) self._database = 
self._instance.database(database_id) self._counters = counters or Counters() @@ -157,18 +252,25 @@ def __init__(self, self._table_config = None self.load_spanner_config(self._table_config_file) - def load_spanner_config(self, table_config_file): + # Cache of keys already inserted into spanner + self._inserted_keys = set() + logging.info(f'Created spanner uploader for {instance_id}:{database_id} with table config: {self._table_config}') + + def load_spanner_config(self, table_config_file:str) -> dict: if table_config_file: - with open(table_config_file, "r") as f: + with file_util.FileIO(table_config_file, "r") as f: self._table_config = json.load(f) else: self._table_config = _DEFAULT_TABLE_CONFIG + self._table_config_name = table_config_file + return self._table_config - def _get_default_variables(self, row: dict ): + def _get_default_variables(self, row: dict = None) -> dict: def_vars = self._table_config.get("DefaultVariables", {}) def_vars.update(self._default_values) - def_vars.update(row) + if row: + def_vars.update(row) return def_vars def _get_local_variables(self, pvs: dict): @@ -178,8 +280,15 @@ def _get_local_variables(self, pvs: dict): pvs[var] = value return pvs - def process_input_row(self, row: dict): - """Process a single row from the input CSV and generate Spanner mutations.""" + def process_input_row(self, row: dict, batch): + """Process a single row from the input CSV and generate Spanner mutations. + + Args: + row: A dictionary representing a single row from the input CSV. + + Returns: + A list of Spanner mutations. + """ mutations = [] pvs = self._get_default_variables(row) pvs = self._get_local_variables(pvs) @@ -199,6 +308,7 @@ def process_input_row(self, row: dict): table_row[col] = value pvs[f'{table}_{col}'] = value elif isinstance(table_cfg, list): + # Handle list of templates for the same table table_row = [] for index, tpl in enumerate(table_cfg): sub_table_row = {} @@ -207,55 +317,117 @@ def process_input_row(self, row: dict): value = _build_value_from_template(template, pvs) if value: sub_table_row[col] = value - pvs_key = f'{table}{col}' + pvs_key = f'{table}_{col}' if pvs_key in pvs: pvs_key = f'{table}_{col}_{index}' - pvs[pvs_key] = value + pvs[pvs_key] = value table_row.append(sub_table_row) mutations.append((table, table_row)) logging.debug(f'Generated table from {row}: {mutations}') self._counters.add_counter('spanner-mutations-generated', 1) + return self._insert_spanner_mutations(batch, mutations, row) + + def _insert_spanner_mutations(self, batch, mutations, input_row: dict) -> int: + """Insert mutations into Spanner. + Args: + mutations: List of mutations to insert. + input_row: Input row from the CSV. + + Returns: + Number of mutations inserted. 
+ """ num_mutations = 0 - if not self._dry_run: - with self.database.batch() as batch: - for table, table_row in mutations: - if isinstance(table_row, list): - for row in table_row: - batch.insert(table=table, - columns=row.keys(), - values=[row.values()]) - num_mutations += len(row.keys()) - self._counters.add_counter( - f'spanner-rows-inserted-{table}', 1) - else: - batch.insert(table=table, - columns=table_row.keys(), - values=[table_row.values()]) + if self._dry_run: + return num_mutations + for table, table_row in mutations: + if isinstance(table_row, list): + for row in table_row: + if self._insert_spanner_batch_row(batch, table, row): + num_mutations += len(row.keys()) + else: + if self._insert_spanner_batch_row(batch, table, table_row): num_mutations += len(table_row.keys()) - self._counters.add_counter(f'spanner-rows-inserted-{table}', - 1) return num_mutations - def process_input_file(self, input_file: str): + def _insert_spanner_batch_row(self, batch, table:str, table_row:dict) -> bool: + """Insert a single row into the batch. + + Args: + batch: Batch to insert into. + table: Table name. + table_row: Row to insert. + + Returns: + True if the row was inserted, False otherwise. + """ + key = _build_key(table, table_row) + if key in self._inserted_keys: + self._counters.add_counter( + f'spanner-inserts-skipped-{table}', 1) + return False + self._inserted_keys.add(key) + batch.insert(table=table, + columns=table_row.keys(), + values=[table_row.values()]) + self._counters.add_counter(f'spanner-rows-inserted-{table}', 1) + return True + + + def process_input_file(self, input_file: str) -> int: + """Process an input file and insert mutations into Spanner. + + Args: + input_file: Input file to process. + + Returns: + Number of mutations inserted into Spanner tables. + """ input_files = file_util.file_get_matching(input_file) num_mutations = 0 for infile in input_files: self._counters.add_counter('spanner-input-files', 1) - logging.info(f'Processing input file: {infile}') with file_util.FileIO(infile, 'r') as f: - self._counters.add_counter( - 'total', file_util.file_estimate_num_rows(infile)) + estimated_rows = file_util.file_estimate_num_rows(infile) + self._counters.add_counter('total', estimated_rows) + logging.info(f'Processing input file: {infile} with estimated rows: {estimated_rows}') reader = csv.DictReader(f) - for row in reader: - for prop in row.keys(): - row[prop] = mcf_file_util.strip_namespace(row[prop]) - num_mutations += self.process_input_row(row) - self._counters.add_counter('processed', 1) + batch_count = 0 + while self.process_input_batch(reader): + batch_count += 1 + self._counters.add_counter('spanner-batch-count', 1) + logging.info(f'Processed batch:{batch_count} of {self._spanner_batch_size} rows for {infile}') + logging.info(f'Processed input file: {infile} in {batch_count} batches') + if not input_files: + # Process an empty row for any default variables + num_mutations += self.process_input_row({}, None) + self._counters.add_counter('processed', 1) logging.info( f'Added {num_mutations} rows from {len(input_files)} files to spanner database {self._instance_id}.{self._database_id}' ) + + def process_input_batch(self, csv_reader) -> bool: + """Process a batch of inputs from a file. + + Returns: + True if the input is not completely processed and can be called again. 
+ """ + num_mutations = 0 + with self._database.batch() as batch: + num_input_rows = 0 + while num_input_rows < self._spanner_batch_size: + row = next(csv_reader, None) + if row is None: + # End of the input. + return False + for prop in row.keys(): + row[prop] = mcf_file_util.strip_namespace(row[prop]) + num_mutations += self.process_input_row(row, batch) + num_input_rows += 1 + self._counters.add_counter('processed', 1) + return True + def delete_statvar_observations(self): """Delete data from all tables for a given provenance.""" pvs = self._get_default_variables() @@ -263,7 +435,7 @@ def delete_statvar_observations(self): if not self._dry_run: for table, template in self._table_config.get("SpannerTables", {}).items(): if 'provenance' in template: - row_ct = self.database.execute_partitioned_dml( + row_ct = self._database.execute_partitioned_dml( f"DELETE FROM {table} WHERE provenance = '{provenance}'" ) logging.info(f"Bulk deleted {row_ct} records from {table}.") @@ -284,13 +456,16 @@ def _build_value_from_template(template: str, row: dict) -> str: logging.error(f"KeyError: {e} in template: {template}, row: {row}") raise e +def _build_key(table:str, table_row:dict) -> int: + """Build a key for a table row.""" + return hash(tuple(table_row.values())) -def spanner_upload_statvar_observations(instance_id, database_id, +def spanner_upload_statvar_observations(project, instance_id, database_id, table_config_file, default_values, - input_file, delete, dry_run): - uploader = SpannerStatVarObservationsUploader(instance_id, database_id, + input_file, delete, batch_size, dry_run): + uploader = SpannerStatVarObservationsUploader(project,instance_id, database_id, table_config_file, - default_values, dry_run) + default_values, batch_size, dry_run) if delete: uploader.delete_statvar_observations() if input_file: @@ -301,13 +476,17 @@ def spanner_upload_statvar_observations(instance_id, database_id, def main(_): if _FLAGS.spanner_debug: logging.set_verbosity(logging.DEBUG) - spanner_upload_statvar_observations(_FLAGS.spanner_instance_id, + counters = spanner_upload_statvar_observations( + _FLAGS.spanner_project, + _FLAGS.spanner_instance_id, _FLAGS.spanner_database_id, _FLAGS.spanner_table_config_file, _FLAGS.spanner_default_values, _FLAGS.spanner_input_file, _FLAGS.spanner_delete, + _FLAGS.spanner_batch_size, _FLAGS.dry_run) + counters.print_counters() if __name__ == "__main__": diff --git a/tools/statvar_importer/eval_functions.py b/tools/statvar_importer/eval_functions.py index 3b0e425c6c..cdb3e8b296 100644 --- a/tools/statvar_importer/eval_functions.py +++ b/tools/statvar_importer/eval_functions.py @@ -110,6 +110,7 @@ def str_to_camel_case(input_string: str, return ''.join( [w[0].upper() + w[1:] for w in clean_str.split(' ') if len(w) > 0]) + def crc32(input_string: str) -> str: """Computes the CRC32 hash of a string @@ -123,6 +124,7 @@ def crc32(input_string: str) -> str: input_string = str(input_string) return str(zlib.crc32(input_string.encode('utf-8'))) + # A dictionary of functions and modules that are safe to use in `eval()`. # This dictionary acts as a safelist, defining the execution environment for # the `evaluate_statement` function. 
By controlling the available globals, From ed8036f8d008a6eaeb1c96487eeafabd81b0d9f1 Mon Sep 17 00:00:00 2001 From: Ajai Tirumali Date: Wed, 22 Apr 2026 14:29:13 +0530 Subject: [PATCH 8/9] batch limit by cell mutations --- .../spanner_upload_statvar_observations.py | 20 +++++++++++-------- 1 file changed, 12 insertions(+), 8 deletions(-) diff --git a/tools/spanner_graph/spanner_upload_statvar_observations.py b/tools/spanner_graph/spanner_upload_statvar_observations.py index 56757decd3..563cae922a 100644 --- a/tools/spanner_graph/spanner_upload_statvar_observations.py +++ b/tools/spanner_graph/spanner_upload_statvar_observations.py @@ -53,8 +53,9 @@ }, # Local variables reused in multiple tables "LocalVariables": { - # "FacetId": "=crc32('{provenance}-{measurementMethod}-{observationPeriod}-{unit}-{scalingFactor}')", - "FacetId": "{provenance}-{measurementMethod}-{observationPeriod}-{unit}-{scalingFactor}", + #"FacetId": "=md5(f'{provenance}-{measurementMethod}-{observationPeriod}-{unit}-{scalingFactor}')", + "FacetId": "=crc32(f'{provenance}-{measurementMethod}-{observationPeriod}-{unit}-{scalingFactor}')", + #"FacetId": "{provenance}-{measurementMethod}-{observationPeriod}-{unit}-{scalingFactor}", }, "SpannerTables": { @@ -287,7 +288,7 @@ def process_input_row(self, row: dict, batch): row: A dictionary representing a single row from the input CSV. Returns: - A list of Spanner mutations. + The number of Spanner mutations for cells. """ mutations = [] pvs = self._get_default_variables(row) @@ -396,11 +397,12 @@ def process_input_file(self, input_file: str) -> int: while self.process_input_batch(reader): batch_count += 1 self._counters.add_counter('spanner-batch-count', 1) - logging.info(f'Processed batch:{batch_count} of {self._spanner_batch_size} rows for {infile}') + logging.info(f'Processed batch:{batch_count} of {self._spanner_batch_size} mutations for {infile}') logging.info(f'Processed input file: {infile} in {batch_count} batches') if not input_files: # Process an empty row for any default variables num_mutations += self.process_input_row({}, None) + self._counters.add_counter('spanner-mutations-cells', num_mutations) self._counters.add_counter('processed', 1) logging.info( f'Added {num_mutations} rows from {len(input_files)} files to spanner database {self._instance_id}.{self._database_id}' @@ -413,10 +415,10 @@ def process_input_batch(self, csv_reader) -> bool: Returns: True if the input is not completely processed and can be called again. """ - num_mutations = 0 with self._database.batch() as batch: + num_mutations = 0 num_input_rows = 0 - while num_input_rows < self._spanner_batch_size: + while num_mutations < self._spanner_batch_size: row = next(csv_reader, None) if row is None: # End of the input. 
@@ -425,6 +427,7 @@ def process_input_batch(self, csv_reader) -> bool:
                     row[prop] = mcf_file_util.strip_namespace(row[prop])
                 num_mutations += self.process_input_row(row, batch)
                 num_input_rows += 1
+            self._counters.add_counter('spanner-mutations', num_mutations)
             self._counters.add_counter('processed', 1)
         return True
 
@@ -448,8 +451,9 @@ def _build_value_from_template(template: str, row: dict) -> str:
     """Build a value from a template string and a row dictionary."""
     try:
         if template.startswith('='):
-            variable, value = evaluate_statement(template[1:], row)
-            row[variable] = value
+            variable, value = evaluate_statement(template, row)
+            if variable:
+                row[variable] = value
             return value
         return template.format(**row)
     except KeyError as e:

From 8302590f8731f81a7f710e6dadb000596d8f65b5 Mon Sep 17 00:00:00 2001
From: Ajai Tirumali
Date: Wed, 22 Apr 2026 14:29:56 +0530
Subject: [PATCH 9/9] md5 hash eval

---
 tools/statvar_importer/eval_functions.py | 15 +++++++++++++++
 1 file changed, 15 insertions(+)

diff --git a/tools/statvar_importer/eval_functions.py b/tools/statvar_importer/eval_functions.py
index cdb3e8b296..3363204ee2 100644
--- a/tools/statvar_importer/eval_functions.py
+++ b/tools/statvar_importer/eval_functions.py
@@ -37,6 +37,7 @@
 from datetime import datetime
+import hashlib
 import re
 import zlib
 
 from absl import logging
 import dateutil
@@ -124,6 +125,19 @@ def crc32(input_string: str) -> str:
         input_string = str(input_string)
     return str(zlib.crc32(input_string.encode('utf-8')))
 
+def md5(input_string: str) -> str:
+    """Computes the MD5 hash of a string.
+
+    Args:
+      input_string: The string to be hashed.
+
+    Returns:
+      The MD5 hash of the string.
+    """
+    if not isinstance(input_string, str):
+        input_string = str(input_string)
+    return hashlib.md5(input_string.encode()).hexdigest()
+
 
 # A dictionary of functions and modules that are safe to use in `eval()`.
 # This dictionary acts as a safelist, defining the execution environment for
@@ -155,6 +169,7 @@ def crc32(input_string: str) -> str:
 
     # Hash functions:
     # - `crc32`: The 'crc32' function for stable hash id generation.
     'crc32': crc32,
+    'md5': md5,
 }
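
Note on the API-root resolution introduced in patches 2 and 3: an explicit
'dc_api_root' config entry wins, then the DC_API_ROOT environment variable,
then the built-in default. A minimal sketch of the lookup order, assuming the
default root value (the real constant is _DEFAULT_API_ROOT in
util/dc_api_wrapper.py):

    import os

    _DEFAULT_API_ROOT = 'https://api.datacommons.org'  # assumed default

    def resolve_api_root(config: dict) -> str:
        # dict.get() only falls back when the key is absent, so an explicit
        # config entry always takes precedence over the environment.
        return config.get('dc_api_root',
                          os.environ.get('DC_API_ROOT', _DEFAULT_API_ROOT))

    os.environ['DC_API_ROOT'] = 'https://autopush.api.datacommons.org'
    print(resolve_api_root({}))  # env var applies
    print(resolve_api_root({'dc_api_root': _DEFAULT_API_ROOT}))  # config wins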
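
The config-driven expansion in process_input_row (patches 5-7) is easier to
follow outside the diff. Below is a self-contained sketch of the per-row flow
under an assumed two-table config; the row values (Count_Person, country/IND,
and so on) are illustrative, not part of this PR:

    import zlib

    def crc32(s) -> str:
        # Mirrors eval_functions.crc32: a stable decimal-string digest.
        return str(zlib.crc32(str(s).encode('utf-8')))

    row = {
        'variableMeasured': 'Count_Person',
        'observationAbout': 'country/IND',
        'observationDate': '2020',
        'value': '1380004385',
        'provenance': 'example_import',
        'measurementMethod': '', 'observationPeriod': '',
        'unit': '', 'scalingFactor': '',
    }

    # LocalVariables: computed once per row, reusable in every table template.
    row['FacetId'] = crc32(
        '{provenance}-{measurementMethod}-{observationPeriod}-{unit}-'
        '{scalingFactor}'.format(**row))

    # SpannerTables: each column value is a str.format template over the row.
    time_series = {
        'id': '{variableMeasured};{observationAbout};{FacetId}'.format(**row),
        'variable_measured': row['variableMeasured'],
        'provenance': row['provenance'],
    }
    # Each generated column also becomes a {Table_column} variable, which is
    # how downstream tables such as StatVarObservation reuse the id.
    row['TimeSeries_id'] = time_series['id']

    observation = {
        'id': '{TimeSeries_id}'.format(**row),
        'date': row['observationDate'],
        'value': row['value'],
    }
    print(time_series)
    print(observation)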
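
The uploader can also be driven programmatically with the signature from
patch 8. A hedged example: the project/instance/database values are just the
script's flag defaults, observations.csv is a hypothetical input, and
constructing the Spanner client still requires valid credentials even though
dry_run=True commits no mutations:

    from spanner_upload_statvar_observations import (
        spanner_upload_statvar_observations)

    counters = spanner_upload_statvar_observations(
        project='datcom-store',
        instance_id='dc-kg-test',
        database_id='dc_graph_2026_01_27',
        table_config_file='',           # '' falls back to _DEFAULT_TABLE_CONFIG
        default_values=['provenance=example_import'],
        input_file='observations.csv',  # hypothetical input file
        delete=False,
        batch_size=1000,
        dry_run=True)
    counters.print_counters()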
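
On the two hash helpers now exposed to eval() templates (patches 6 and 9):
both coerce non-string input and return string digests, so either can be
embedded in id templates such as the FacetId local variable. A quick check of
their behavior, copied from the definitions above:

    import hashlib
    import zlib

    def crc32(input_string) -> str:
        # Decimal string of the 32-bit CRC; short, stable across runs.
        if not isinstance(input_string, str):
            input_string = str(input_string)
        return str(zlib.crc32(input_string.encode('utf-8')))

    def md5(input_string) -> str:
        # 32-char hex digest; longer, with far lower collision odds.
        if not isinstance(input_string, str):
            input_string = str(input_string)
        return hashlib.md5(input_string.encode()).hexdigest()

    facet = 'example_import----'  # provenance-mm-period-unit-sf, all empty
    print(crc32(facet))           # deterministic decimal string
    print(md5(facet))             # deterministic hex string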