Skip to content
52 changes: 33 additions & 19 deletions datacontract/engines/data_contract_checks.py
Original file line number Diff line number Diff line change
Expand Up @@ -597,6 +597,9 @@ def check_property_regex(

def check_row_count(model_name: str, threshold: str, quoting_config: QuotingConfig = QuotingConfig()):
check_type = "row_count"
if "%" in threshold:
logger.warning("Row count threshold cannot be specified as a percentage.")
Comment thread
HenrikSpiegel marked this conversation as resolved.
return None
check_key = f"{model_name}__{check_type}"
sodacl_check_dict = {
checks_for(model_name, quoting_config, check_type): [
Expand Down Expand Up @@ -625,10 +628,11 @@ def check_model_duplicate_values(
check_type = "model_duplicate_values"
check_key = f"{model_name}__{check_type}"
col_joined = ", ".join(_quote_field_name(col, quoting_config) for col in cols)
metric = "duplicate_count" if not threshold.endswith("%") else "duplicate_percent"
sodacl_check_dict = {
checks_for(model_name, quoting_config, check_type): [
{
f"duplicate_count({col_joined}) {threshold}": {"name": check_key},
f"{metric}({col_joined}) {threshold}": {"name": check_key},
}
],
}
Expand All @@ -637,7 +641,7 @@ def check_model_duplicate_values(
key=check_key,
category="quality",
type=check_type,
name=f"Check that model {model_name} has duplicate_count {threshold} for columns {col_joined}",
name=f"Check that model {model_name} has {metric} {threshold} for columns {col_joined}",
model=model_name,
field=None,
engine="soda",
Expand All @@ -653,10 +657,11 @@ def check_property_duplicate_values(

check_type = "field_duplicate_values"
check_key = f"{model_name}__{field_name}__{check_type}"
metric = "duplicate_count" if not threshold.endswith("%") else "duplicate_percent"
sodacl_check_dict = {
checks_for(model_name, quoting_config, check_type): [
{
f"duplicate_count({field_name_for_soda}) {threshold}": {
f"{metric}({field_name_for_soda}) {threshold}": {
"name": check_key,
},
}
Expand All @@ -667,7 +672,7 @@ def check_property_duplicate_values(
key=check_key,
category="quality",
type=check_type,
name=f"Check that field {field_name} has duplicate_count {threshold}",
name=f"Check that field {field_name} has {metric} {threshold}",
model=model_name,
field=field_name,
engine="soda",
Expand All @@ -683,10 +688,11 @@ def check_property_null_values(

check_type = "field_null_values"
check_key = f"{model_name}__{field_name}__{check_type}"
metric = "missing_count" if not threshold.endswith("%") else "missing_percent"
sodacl_check_dict = {
checks_for(model_name, quoting_config, check_type): [
{
f"missing_count({field_name_for_soda}) {threshold}": {
f"{metric}({field_name_for_soda}) {threshold}": {
"name": check_key,
},
}
Expand All @@ -697,7 +703,7 @@ def check_property_null_values(
key=check_key,
category="quality",
type=check_type,
name=f"Check that field {field_name} has missing_count {threshold}",
name=f"Check that field {field_name} has {metric} {threshold}",
model=model_name,
field=field_name,
engine="soda",
Expand All @@ -724,11 +730,11 @@ def check_property_invalid_values(

if valid_values is not None:
sodacl_check_config["valid values"] = _escape_sql_string_values(valid_values)

metric = "invalid_count" if not threshold.endswith("%") else "invalid_percent"
sodacl_check_dict = {
checks_for(model_name, quoting_config, check_type): [
{
f"invalid_count({field_name_for_soda}) {threshold}": sodacl_check_config,
f"{metric}({field_name_for_soda}) {threshold}": sodacl_check_config,
}
],
}
Expand All @@ -737,7 +743,7 @@ def check_property_invalid_values(
key=check_key,
category="quality",
type=check_type,
name=f"Check that field {field_name} has invalid_count {threshold}",
name=f"Check that field {field_name} has {metric} {threshold}",
model=model_name,
field=field_name,
engine="soda",
Expand Down Expand Up @@ -767,10 +773,11 @@ def check_property_missing_values(
if filtered_missing_values:
sodacl_check_config["missing values"] = _escape_sql_string_values(filtered_missing_values)

metric = "missing_count" if not threshold.endswith("%") else "missing_percent"
sodacl_check_dict = {
checks_for(model_name, quoting_config, check_type): [
{
f"missing_count({field_name_for_soda}) {threshold}": sodacl_check_config,
f"{metric}({field_name_for_soda}) {threshold}": sodacl_check_config,
}
],
}
Expand All @@ -779,7 +786,7 @@ def check_property_missing_values(
key=check_key,
category="quality",
type=check_type,
name=f"Check that field {field_name} has missing_count {threshold}",
name=f"Check that field {field_name} has {metric} {threshold}",
model=model_name,
field=field_name,
engine="soda",
Expand Down Expand Up @@ -961,32 +968,39 @@ def prepare_query(


def to_sodacl_threshold(quality: DataQuality) -> str | None:
if quality.unit is not None and quality.unit.lower() not in ("rows", "percent"):
logger.warning(
f"Unsupported quality.unit ={quality.unit} in quality check, must be 'rows' (default) or 'percent'"
)
return None
threshold_suffix = "%" if (quality.unit or "").lower() == "percent" else ""

if quality.mustBe is not None:
Comment thread
HenrikSpiegel marked this conversation as resolved.
return f"= {quality.mustBe}"
return f"= {quality.mustBe}{threshold_suffix}"
if quality.mustNotBe is not None:
return f"!= {quality.mustNotBe}"
return f"!= {quality.mustNotBe}{threshold_suffix}"
if quality.mustBeGreaterThan is not None:
return f"> {quality.mustBeGreaterThan}"
return f"> {quality.mustBeGreaterThan}{threshold_suffix}"
if quality.mustBeGreaterOrEqualTo is not None:
return f">= {quality.mustBeGreaterOrEqualTo}"
return f">= {quality.mustBeGreaterOrEqualTo}{threshold_suffix}"
if quality.mustBeLessThan is not None:
return f"< {quality.mustBeLessThan}"
return f"< {quality.mustBeLessThan}{threshold_suffix}"
if quality.mustBeLessOrEqualTo is not None:
return f"<= {quality.mustBeLessOrEqualTo}"
return f"<= {quality.mustBeLessOrEqualTo}{threshold_suffix}"
if quality.mustBeBetween is not None:
if len(quality.mustBeBetween) != 2:
logger.warning(
f"Quality check has invalid mustBeBetween, must have exactly 2 integers in an array: {quality.mustBeBetween}"
)
return None
return f"between {quality.mustBeBetween[0]} and {quality.mustBeBetween[1]}"
return f"between {quality.mustBeBetween[0]}{threshold_suffix} and {quality.mustBeBetween[1]}{threshold_suffix}"
if quality.mustNotBeBetween is not None:
if len(quality.mustNotBeBetween) != 2:
logger.warning(
f"Quality check has invalid mustNotBeBetween, must have exactly 2 integers in an array: {quality.mustNotBeBetween}"
)
return None
return f"not between {quality.mustNotBeBetween[0]} and {quality.mustNotBeBetween[1]}"
return f"not between {quality.mustNotBeBetween[0]}{threshold_suffix} and {quality.mustNotBeBetween[1]}{threshold_suffix}"
return None


Expand Down
148 changes: 148 additions & 0 deletions tests/test_data_contract_checks.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,6 @@
import logging

import pytest
import yaml
from open_data_contract_standard.model import (
DataQuality,
Expand All @@ -17,13 +20,16 @@
check_property_is_present,
check_property_missing_values,
check_property_not_equal,
check_property_null_values,
check_property_required,
check_property_type,
check_property_unique,
check_row_count,
create_checks,
prepare_query,
to_schema_checks,
to_sla_freshness_check,
to_sodacl_threshold,
)


Expand Down Expand Up @@ -616,3 +622,145 @@ def test_to_schema_checks_databricks_struct_with_nested_varchar_skips_type_check
types = [c.type for c in checks]
assert "field_is_present" in types
assert "field_type" not in types


# --- Tests for to_sodacl_threshold function ---


@pytest.mark.parametrize(
"unit,quality_kwargs,expected",
[
# Test percent unit appends %
("percent", {"mustBe": 5}, "= 5%"),
("percent", {"mustBeGreaterThan": 10}, "> 10%"),
("percent", {"mustBeGreaterOrEqualTo": 10}, ">= 10%"),
("percent", {"mustBeLessThan": 20}, "< 20%"),
("percent", {"mustBeLessOrEqualTo": 30}, "<= 30%"),
("percent", {"mustNotBe": 0}, "!= 0%"),
("percent", {"mustBeBetween": [5, 15]}, "between 5% and 15%"),
("percent", {"mustNotBeBetween": [80, 100]}, "not between 80% and 100%"),
# Test case insensitivity
("PERCENT", {"mustBeLessThan": 10}, "< 10%"),
("Percent", {"mustBe": 15}, "= 15%"),
# Test rows unit does not append %
("rows", {"mustBe": 100}, "= 100"),
("rows", {"mustBeGreaterThan": 50}, "> 50"),
# Test default (None) does not append %
(None, {"mustBe": 50}, "= 50"),
(None, {"mustBeLessThan": 75}, "< 75"),
],
)
def test_to_sodacl_threshold_with_units(unit, quality_kwargs, expected):
"""Test that to_sodacl_threshold correctly handles different units and operators."""
quality = DataQuality(unit=unit, **quality_kwargs)
result = to_sodacl_threshold(quality)
assert result == expected


def test_to_sodacl_threshold_invalid_unit(caplog):
"""Test that invalid unit returns None and raises a warning."""
quality = DataQuality(unit="invalid", mustBe=5)
with caplog.at_level(logging.WARNING):
result = to_sodacl_threshold(quality)

assert result is None
assert "Unsupported quality.unit" in caplog.text


# --- Tests for unit handling in check_* functions ---


@pytest.mark.parametrize(
"threshold,expected_metric,expected_check",
[
("< 5%", "missing_percent", "missing_percent(status) < 5%"),
("< 10", "missing_count", "missing_count(status) < 10"),
],
)
def test_check_property_null_values_metric_selection(threshold, expected_metric, expected_check):
"""Test that check_property_null_values selects correct metric based on threshold."""
check = check_property_null_values("orders", "status", threshold)

assert check.model == "orders"
assert check.field == "status"
assert check.type == "field_null_values"
assert expected_metric in check.name

impl = yaml.safe_load(check.implementation)
checks = impl["checks for orders"]
check_keys = list(checks[0].keys())
assert check_keys == [expected_check]


@pytest.mark.parametrize(
"threshold,expected_metric,expected_check",
[
("= 0%", "invalid_percent", "invalid_percent(status) = 0%"),
("= 0", "invalid_count", "invalid_count(status) = 0"),
],
)
def test_check_property_invalid_values_metric_selection(threshold, expected_metric, expected_check):
"""Test that check_property_invalid_values selects correct metric based on threshold."""
check = check_property_invalid_values("orders", "status", threshold, valid_values=["active", "inactive"])

assert check.model == "orders"
assert check.field == "status"
assert check.type == "field_invalid_values"
assert expected_metric in check.name

impl = yaml.safe_load(check.implementation)
checks = impl["checks for orders"]
check_keys = list(checks[0].keys())
assert check_keys == [expected_check]


@pytest.mark.parametrize(
"threshold,expected_metric,expected_check",
[
("<= 2%", "missing_percent", "missing_percent(status) <= 2%"),
("<= 5", "missing_count", "missing_count(status) <= 5"),
],
)
def test_check_property_missing_values_metric_selection(threshold, expected_metric, expected_check):
"""Test that check_property_missing_values selects correct metric based on threshold."""
check = check_property_missing_values("orders", "status", threshold, missing_values=["n/a", "null"])

assert check.model == "orders"
assert check.field == "status"
assert check.type == "field_missing_values"
assert expected_metric in check.name

impl = yaml.safe_load(check.implementation)
checks = impl["checks for orders"]
check_keys = list(checks[0].keys())
assert check_keys == [expected_check]


@pytest.mark.parametrize(
"threshold",
[
"> 100%",
"between 10% and 50%",
],
)
def test_check_row_count_with_percent_returns_none(threshold, caplog):
"""Test that check_row_count returns None when threshold contains '%'."""
with caplog.at_level(logging.WARNING):
result = check_row_count("orders", threshold, QuotingConfig())
assert result is None
assert "Row count threshold cannot be specified as a percentage." in caplog.text


def test_check_row_count_without_percent_works():
"""Test that check_row_count works normally without '%'."""
check = check_row_count("orders", "> 1000", QuotingConfig())

assert check is not None
assert check.model == "orders"
assert check.type == "row_count"

# Parse the implementation to verify the SodaCL check
impl = yaml.safe_load(check.implementation)
checks = impl["checks for orders"]
check_keys = list(checks[0].keys())
assert check_keys == ["row_count > 1000"]