Replace withColumn with withColumns in data quality templates #93

Open
dgokeeffe wants to merge 1 commit into main from
fix/use-withColumns-over-withColumn
@dgokeeffe
Collaborator

Summary

  • Replaces repeated withColumn calls with single withColumns dict calls in data_quality.py.j2 and data_quality_quarantine.py.j2
  • Batches independent column additions into single withColumns calls for better Spark performance
  • Fixes the Jinja loop pattern to emit a dict instead of sequential reassignments
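
For reviewers, here is a rough sketch of the change in emitted code shape. It is plain Python standing in for the Jinja loop, not the actual template from this PR. The helper names `render_with_column_chain` and `render_with_columns` are hypothetical, and the column-to-expression mapping is illustrative only. `DataFrame.withColumns` is available from PySpark 3.3 onward.

```python
def render_with_column_chain(columns: dict[str, str], df_name: str = "df") -> str:
    """Old shape: one reassignment per column (hypothetical helper)."""
    return "\n".join(
        f"{df_name} = {df_name}.withColumn({name!r}, {expr})"
        for name, expr in columns.items()
    )


def render_with_columns(columns: dict[str, str], df_name: str = "df",
                        indent: str = "    ") -> str:
    """New shape: a single withColumns({...}) call (hypothetical helper)."""
    if not columns:
        # Emit nothing rather than an empty withColumns({}) call.
        return ""
    entries = ",\n".join(
        f"{indent}{name!r}: {expr}" for name, expr in columns.items()
    )
    return f"{df_name} = {df_name}.withColumns({{\n{entries}\n}})"


# Illustrative mapping only; the real values come from the project config.
metadata = {
    "_batch_id": "F.monotonically_increasing_id()",
    "_processing_timestamp": "F.current_timestamp()",
}

old_code = render_with_column_chain(metadata)
new_code = render_with_columns(metadata)
print(old_code)
print(new_code)
```

The dict form only suits columns that do not depend on each other, since every expression in one `withColumns` call is resolved against the input DataFrame. That matches the "independent column additions" wording above.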

Fixes #92

Test plan

  • Verify generated Python from data_quality.py.j2 uses withColumns({...}) syntax
  • Verify generated Python from data_quality_quarantine.py.j2 uses withColumns({...}) in all applicable spots
  • Run existing unit/e2e tests to confirm no regressions

🤖 Generated with Claude Code

Fixes #92 — uses a single withColumns dict call instead of repeated
withColumn calls in loops and chains, improving both performance and
generated code readability.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
@codecov

codecov bot commented Mar 25, 2026

❌ 1 Tests Failed:

| Tests completed | Failed | Passed | Skipped |
|---|---|---|---|
| 2889 | 1 | 2888 | 1 |
View the top 1 failed test(s) by shortest run time
tests/test_transform_operational_metadata.py::TestTransformOperationalMetadata::test_data_quality_transform_with_operational_metadata
Stack Traces | 0.01s run time
self = <test_transform_operational_metadata.TestTransformOperationalMetadata object at 0x7f3cf2205bb0>
project_config_with_metadata = ProjectConfig(name='test_project', version='1.0', description=None, author=None, created_date=None, include=None, oper...imports=None, enabled=True)}, presets=None, defaults=None), event_log=None, monitoring=None, required_lhp_version=None)
flowgroup_with_metadata = FlowGroup(pipeline='test_pipeline', flowgroup='test_flowgroup', job_name=None, variables=None, presets=[], use_template=None, template_parameters=None, actions=[], operational_metadata=['_processing_timestamp', '_batch_id'])

    def test_data_quality_transform_with_operational_metadata(self, project_config_with_metadata, flowgroup_with_metadata):
        """Test data quality transform generator with operational metadata."""
        generator = DataQualityTransformGenerator()
    
        action = Action(
            name="test_dq_transform",
            type="transform",
            target="v_test_quality",
            source="v_customers",
            expectations_file="test_expectations.json",
            readMode="stream",
            description="Test DQ transform"
        )
    
        # Create a temporary expectations file
        with tempfile.NamedTemporaryFile(mode='w', suffix='.json', delete=False) as f:
            f.write('{"email_not_null": {"action": "fail"}}')
            expectations_file = f.name
    
        try:
            action.expectations_file = expectations_file
    
            context = {
                "project_config": project_config_with_metadata,
                "flowgroup": flowgroup_with_metadata,
                "preset_config": {},
                "spec_dir": Path(".")
            }
    
            code = generator.generate(action, context)
    
            # Verify basic structure
            assert "@dp.temporary_view()" in code
            assert "v_test_quality" in code
            assert "spark.readStream.table" in code  # stream mode
            assert "return df" in code
    
            # Verify operational metadata is added
            assert "# Add operational metadata columns" in code
>           assert "df = df.withColumn('_batch_id'" in code
E           assert "df = df.withColumn('_batch_id'" in '@dp.temporary_view()\n# These expectations will fail the pipeline if violated\n@dp.expect_all_or_fail({"email_not_nul...': F.monotonically_increasing_id(),        \'_processing_timestamp\': F.current_timestamp()    })\n    \n    return df'

tests/test_transform_operational_metadata.py:129: AssertionError
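
The failing assertion still expects the old `df = df.withColumn('_batch_id'` string, while the generated code now uses a `withColumns({...})` dict. One possible way to update the check is sketched below. The `generated` string is a hypothetical stand-in for the generator output (the real output is truncated in the trace above), and `assert_metadata_columns` is a made-up helper name, not something in the test suite.

```python
# Hypothetical stand-in for the generated code; the layout is assumed,
# not copied from the real generator output.
generated = (
    "    # Add operational metadata columns\n"
    "    df = df.withColumns({\n"
    "        '_batch_id': F.monotonically_increasing_id(),\n"
    "        '_processing_timestamp': F.current_timestamp()\n"
    "    })\n"
    "\n"
    "    return df"
)


def assert_metadata_columns(code: str, column_names: list[str]) -> None:
    """Check that each metadata column is added via a withColumns dict."""
    assert "df = df.withColumns({" in code
    for name in column_names:
        # Each column should appear as a quoted dict key.
        assert f"'{name}':" in code


assert_metadata_columns(generated, ["_batch_id", "_processing_timestamp"])
```

Checking for the dict key instead of the full call keeps the test independent of the order in which the columns are emitted.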


Development

Successfully merging this pull request may close these issues.

Use withColumns instead of withColumn in data_quality.py.j2 template
