[DCP Ingestion] #503
Not up to standards ⛔

🔴 Issues
| Category | Results |
|---|---|
| UnusedCode | 6 medium |
| ErrorProne | 6 high |
| Security | 3 medium |
| CodeStyle | 17 minor |
| Complexity | 4 medium |
🟢 Metrics

| Metric | Results |
|---|---|
| Complexity | 42 |
| Duplication | 0 |
TIP: This summary will be updated as you push new changes.
Code Review
This pull request adds a new 'DCP Bridge' mode to the ingestion tool, bridging custom data resolution and the production platform by exporting data to sharded JSON-LD files. The changes include a new exporter module, documentation, and logic to automatically trigger Google Cloud Workflows for ingestion. The review highlights three improvements: replacing hardcoded GCP resource fallbacks with environment-driven configuration, using parameterized SQL queries to mitigate injection risks in the exporter, and tightening exception handling when triggering external workflows so failures are not silently ignored.
```python
spanner_instance = os.getenv("GCP_SPANNER_INSTANCE_ID", "gabe-test-dcp-instance")
spanner_database = os.getenv("GCP_SPANNER_DATABASE_NAME", "gabe-test-dcp-db-v2")
workflow_name = os.getenv("WORKFLOW_NAME", "gabe-test-ingestion-orchestrator")
project_id = os.getenv("PROJECT_ID", "datcom-website-dev")
location = os.getenv("WORKFLOW_LOCATION", "us-central1")
temp_location = os.getenv("TEMP_LOCATION", "gs://gabe-test-ingestion-bucket-datcom-website-dev/temp")
region = os.getenv("REGION", "us-central1")
```

These fallbacks hardcode developer-specific GCP resources (the `gabe-test-*` names and a personal bucket). Consider making these environment-driven with no personal defaults, so a misconfigured deployment fails loudly rather than pointing at test infrastructure.
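A minimal sketch of the environment-driven approach, assuming required values should fail fast when unset (the `_require_env` helper is hypothetical, not part of this PR):

```python
import os


def _require_env(name: str) -> str:
    """Return a required environment variable, failing fast if it is unset."""
    value = os.getenv(name)
    if not value:
        raise ValueError(f"Required environment variable {name} is not set")
    return value


# No developer-specific fallbacks: each deployment must supply its own resources.
spanner_instance = _require_env("GCP_SPANNER_INSTANCE_ID")
spanner_database = _require_env("GCP_SPANNER_DATABASE_NAME")
workflow_name = _require_env("WORKFLOW_NAME")
project_id = _require_env("PROJECT_ID")
temp_location = _require_env("TEMP_LOCATION")
# Generic regional defaults are less risky than personal resource names.
location = os.getenv("WORKFLOW_LOCATION", "us-central1")
region = os.getenv("REGION", "us-central1")
```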
```python
offset = 0
while True:
    triples_tuples = db.engine.fetch_all(
        f"SELECT subject_id, predicate, object_id, object_value FROM triples LIMIT {chunk_size} OFFSET {offset}"
```
Using f-strings to construct SQL queries is generally discouraged, as it can lead to SQL injection vulnerabilities if the variables are ever sourced from user input. Although `chunk_size` and `offset` are currently integers, it is better practice to use parameterized queries.
| f"SELECT subject_id, predicate, object_id, object_value FROM triples LIMIT {chunk_size} OFFSET {offset}" | |
| "SELECT subject_id, predicate, object_id, object_value FROM triples LIMIT ? OFFSET ?", (chunk_size, offset) |
```python
obs_counter = 0
while True:
    obs_tuples = db.engine.fetch_all(
        f"SELECT entity, variable, date, value, provenance, unit, scaling_factor, measurement_method, observation_period, properties FROM observations LIMIT {chunk_size} OFFSET {offset}"
```
Using f-strings to construct SQL queries is generally discouraged. Please use parameterized queries to ensure safety and follow best practices.
| f"SELECT entity, variable, date, value, provenance, unit, scaling_factor, measurement_method, observation_period, properties FROM observations LIMIT {chunk_size} OFFSET {offset}" | |
| "SELECT entity, variable, date, value, provenance, unit, scaling_factor, measurement_method, observation_period, properties FROM observations LIMIT ? OFFSET ?", (chunk_size, offset) |
```python
    except Exception as e:
        logging.error(f"Error triggering workflow via API: {e}")
```
Catching a broad Exception and only logging it can hide unexpected errors and make debugging difficult. Consider catching more specific exceptions related to the API call (e.g., requests.exceptions.RequestException) or re-raising the exception after logging to ensure the failure is not silently ignored by the caller.
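A sketch of the narrower handling, assuming the workflow is triggered over HTTP with `requests` (if the code uses the `google-cloud-workflows` client instead, `google.api_core.exceptions.GoogleAPIError` would be the analogous catch); the `trigger_workflow` wrapper is hypothetical:

```python
import logging

import requests


def trigger_workflow(url: str, payload: dict) -> None:
    """Trigger the ingestion workflow, surfacing failures to the caller."""
    try:
        response = requests.post(url, json=payload, timeout=30)
        response.raise_for_status()
    except requests.exceptions.RequestException as e:
        # Log for operators, then re-raise so the caller can abort or retry.
        logging.error(f"Error triggering workflow via API: {e}")
        raise
```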