Skip to content
52 changes: 33 additions & 19 deletions datacontract/engines/data_contract_checks.py
Original file line number Diff line number Diff line change
Expand Up @@ -597,6 +597,9 @@ def check_property_regex(

def check_row_count(model_name: str, threshold: str, quoting_config: QuotingConfig = QuotingConfig()):
check_type = "row_count"
if "%" in threshold:
logger.warning("Row count threshold cannot be specified as a percentage.")
Comment on lines +600 to +601
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

After the warning, we should not create the check (Soda would ignore the percent sign and only use the integer, which is not what the user is intending)

return None
check_key = f"{model_name}__{check_type}"
sodacl_check_dict = {
checks_for(model_name, quoting_config, check_type): [
Expand Down Expand Up @@ -625,10 +628,11 @@ def check_model_duplicate_values(
check_type = "model_duplicate_values"
check_key = f"{model_name}__{check_type}"
col_joined = ", ".join(_quote_field_name(col, quoting_config) for col in cols)
metric = "duplicate_count" if not threshold.endswith("%") else "duplicate_percent"
sodacl_check_dict = {
checks_for(model_name, quoting_config, check_type): [
{
f"duplicate_count({col_joined}) {threshold}": {"name": check_key},
f"{metric}({col_joined}) {threshold}": {"name": check_key},
}
],
}
Expand All @@ -637,7 +641,7 @@ def check_model_duplicate_values(
key=check_key,
category="quality",
type=check_type,
name=f"Check that model {model_name} has duplicate_count {threshold} for columns {col_joined}",
name=f"Check that model {model_name} has {metric} {threshold} for columns {col_joined}",
model=model_name,
field=None,
engine="soda",
Expand All @@ -653,10 +657,11 @@ def check_property_duplicate_values(

check_type = "field_duplicate_values"
check_key = f"{model_name}__{field_name}__{check_type}"
metric = "duplicate_count" if not threshold.endswith("%") else "duplicate_percent"
sodacl_check_dict = {
checks_for(model_name, quoting_config, check_type): [
{
f"duplicate_count({field_name_for_soda}) {threshold}": {
f"{metric}({field_name_for_soda}) {threshold}": {
"name": check_key,
},
}
Expand All @@ -667,7 +672,7 @@ def check_property_duplicate_values(
key=check_key,
category="quality",
type=check_type,
name=f"Check that field {field_name} has duplicate_count {threshold}",
name=f"Check that field {field_name} has {metric} {threshold}",
model=model_name,
field=field_name,
engine="soda",
Expand All @@ -683,10 +688,11 @@ def check_property_null_values(

check_type = "field_null_values"
check_key = f"{model_name}__{field_name}__{check_type}"
metric = "missing_count" if not threshold.endswith("%") else "missing_percent"
sodacl_check_dict = {
checks_for(model_name, quoting_config, check_type): [
{
f"missing_count({field_name_for_soda}) {threshold}": {
f"{metric}({field_name_for_soda}) {threshold}": {
"name": check_key,
},
}
Expand All @@ -697,7 +703,7 @@ def check_property_null_values(
key=check_key,
category="quality",
type=check_type,
name=f"Check that field {field_name} has missing_count {threshold}",
name=f"Check that field {field_name} has {metric} {threshold}",
model=model_name,
field=field_name,
engine="soda",
Expand All @@ -724,11 +730,11 @@ def check_property_invalid_values(

if valid_values is not None:
sodacl_check_config["valid values"] = _escape_sql_string_values(valid_values)

metric = "invalid_count" if not threshold.endswith("%") else "invalid_percent"
sodacl_check_dict = {
checks_for(model_name, quoting_config, check_type): [
{
f"invalid_count({field_name_for_soda}) {threshold}": sodacl_check_config,
f"{metric}({field_name_for_soda}) {threshold}": sodacl_check_config,
}
],
}
Expand All @@ -737,7 +743,7 @@ def check_property_invalid_values(
key=check_key,
category="quality",
type=check_type,
name=f"Check that field {field_name} has invalid_count {threshold}",
name=f"Check that field {field_name} has {metric} {threshold}",
model=model_name,
field=field_name,
engine="soda",
Expand Down Expand Up @@ -767,10 +773,11 @@ def check_property_missing_values(
if filtered_missing_values:
sodacl_check_config["missing values"] = _escape_sql_string_values(filtered_missing_values)

metric = "missing_count" if not threshold.endswith("%") else "missing_percent"
sodacl_check_dict = {
checks_for(model_name, quoting_config, check_type): [
{
f"missing_count({field_name_for_soda}) {threshold}": sodacl_check_config,
f"{metric}({field_name_for_soda}) {threshold}": sodacl_check_config,
}
],
}
Expand All @@ -779,7 +786,7 @@ def check_property_missing_values(
key=check_key,
category="quality",
type=check_type,
name=f"Check that field {field_name} has missing_count {threshold}",
name=f"Check that field {field_name} has {metric} {threshold}",
model=model_name,
field=field_name,
engine="soda",
Expand Down Expand Up @@ -961,32 +968,39 @@ def prepare_query(


def to_sodacl_threshold(quality: DataQuality) -> str | None:
if quality.unit is not None and quality.unit.lower() not in ("rows", "percent"):
logger.warning(
f"Unsupported quality.unit ={quality.unit} in quality check, must be 'rows' (default) or 'percent'"
)
return None
threshold_suffix = "%" if (quality.unit or "").lower() == "percent" else ""

if quality.mustBe is not None:
Copy link
Copy Markdown
Author

@HenrikSpiegel HenrikSpiegel May 20, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

discussion:

I initially implemented this going for minimal changes and ensuring the changes matched the style of the repository.

I would however personally prefer to re-write the function using a operator mapping:

arg_to_op = {
  'mustBe' : '=',
  'mustNotBe': '!='
}
for arg, op in arg_to_op.items():
  if val := getattr(arg, quality) is not none:
    return f"{op} {val}{threshold_suffix}"

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I get your point, but this won't work out for the 'between' cases, right?
The reason for this repetitive style is that it is easier to understand the code with all relevant cases, whereas a mapping as you proposed has more mental overhead to read. Of course this is a trade-off, since copying things like the {threshold_suffix} you added can be error-prone as well. But in this case, I'd vote for keeping it as is.

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Completely fair - I'll let it be.

You are right wrt. to the between operator that would require handling as a special case.
Since the function pattern is using early return - one would just have to add an if-statement for between after the loop.

return f"= {quality.mustBe}"
return f"= {quality.mustBe}{threshold_suffix}"
if quality.mustNotBe is not None:
return f"!= {quality.mustNotBe}"
return f"!= {quality.mustNotBe}{threshold_suffix}"
if quality.mustBeGreaterThan is not None:
return f"> {quality.mustBeGreaterThan}"
return f"> {quality.mustBeGreaterThan}{threshold_suffix}"
if quality.mustBeGreaterOrEqualTo is not None:
return f">= {quality.mustBeGreaterOrEqualTo}"
return f">= {quality.mustBeGreaterOrEqualTo}{threshold_suffix}"
if quality.mustBeLessThan is not None:
return f"< {quality.mustBeLessThan}"
return f"< {quality.mustBeLessThan}{threshold_suffix}"
if quality.mustBeLessOrEqualTo is not None:
return f"<= {quality.mustBeLessOrEqualTo}"
return f"<= {quality.mustBeLessOrEqualTo}{threshold_suffix}"
if quality.mustBeBetween is not None:
if len(quality.mustBeBetween) != 2:
logger.warning(
f"Quality check has invalid mustBeBetween, must have exactly 2 integers in an array: {quality.mustBeBetween}"
)
return None
return f"between {quality.mustBeBetween[0]} and {quality.mustBeBetween[1]}"
return f"between {quality.mustBeBetween[0]}{threshold_suffix} and {quality.mustBeBetween[1]}{threshold_suffix}"
if quality.mustNotBeBetween is not None:
if len(quality.mustNotBeBetween) != 2:
logger.warning(
f"Quality check has invalid mustNotBeBetween, must have exactly 2 integers in an array: {quality.mustNotBeBetween}"
)
return None
return f"not between {quality.mustNotBeBetween[0]} and {quality.mustNotBeBetween[1]}"
return f"not between {quality.mustNotBeBetween[0]}{threshold_suffix} and {quality.mustNotBeBetween[1]}{threshold_suffix}"
return None


Expand Down
148 changes: 148 additions & 0 deletions tests/test_data_contract_checks.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,6 @@
import logging

import pytest
import yaml
from open_data_contract_standard.model import (
DataQuality,
Expand All @@ -17,13 +20,16 @@
check_property_is_present,
check_property_missing_values,
check_property_not_equal,
check_property_null_values,
check_property_required,
check_property_type,
check_property_unique,
check_row_count,
create_checks,
prepare_query,
to_schema_checks,
to_sla_freshness_check,
to_sodacl_threshold,
)


Expand Down Expand Up @@ -616,3 +622,145 @@ def test_to_schema_checks_databricks_struct_with_nested_varchar_skips_type_check
types = [c.type for c in checks]
assert "field_is_present" in types
assert "field_type" not in types


# --- Tests for to_sodacl_threshold function ---


@pytest.mark.parametrize(
"unit,quality_kwargs,expected",
[
# Test percent unit appends %
("percent", {"mustBe": 5}, "= 5%"),
("percent", {"mustBeGreaterThan": 10}, "> 10%"),
("percent", {"mustBeGreaterOrEqualTo": 10}, ">= 10%"),
("percent", {"mustBeLessThan": 20}, "< 20%"),
("percent", {"mustBeLessOrEqualTo": 30}, "<= 30%"),
("percent", {"mustNotBe": 0}, "!= 0%"),
("percent", {"mustBeBetween": [5, 15]}, "between 5% and 15%"),
("percent", {"mustNotBeBetween": [80, 100]}, "not between 80% and 100%"),
# Test case insensitivity
("PERCENT", {"mustBeLessThan": 10}, "< 10%"),
("Percent", {"mustBe": 15}, "= 15%"),
# Test rows unit does not append %
("rows", {"mustBe": 100}, "= 100"),
("rows", {"mustBeGreaterThan": 50}, "> 50"),
# Test default (None) does not append %
(None, {"mustBe": 50}, "= 50"),
(None, {"mustBeLessThan": 75}, "< 75"),
],
)
def test_to_sodacl_threshold_with_units(unit, quality_kwargs, expected):
"""Test that to_sodacl_threshold correctly handles different units and operators."""
quality = DataQuality(unit=unit, **quality_kwargs)
result = to_sodacl_threshold(quality)
assert result == expected


def test_to_sodacl_threshold_invalid_unit(caplog):
"""Test that invalid unit returns None and raises a warning."""
quality = DataQuality(unit="invalid", mustBe=5)
with caplog.at_level(logging.WARNING):
result = to_sodacl_threshold(quality)

assert result is None
assert "Unsupported quality.unit" in caplog.text


# --- Tests for unit handling in check_* functions ---


@pytest.mark.parametrize(
"threshold,expected_metric,expected_check",
[
("< 5%", "missing_percent", "missing_percent(status) < 5%"),
("< 10", "missing_count", "missing_count(status) < 10"),
],
)
def test_check_property_null_values_metric_selection(threshold, expected_metric, expected_check):
"""Test that check_property_null_values selects correct metric based on threshold."""
check = check_property_null_values("orders", "status", threshold)

assert check.model == "orders"
assert check.field == "status"
assert check.type == "field_null_values"
assert expected_metric in check.name

impl = yaml.safe_load(check.implementation)
checks = impl["checks for orders"]
check_keys = list(checks[0].keys())
assert check_keys == [expected_check]


@pytest.mark.parametrize(
"threshold,expected_metric,expected_check",
[
("= 0%", "invalid_percent", "invalid_percent(status) = 0%"),
("= 0", "invalid_count", "invalid_count(status) = 0"),
],
)
def test_check_property_invalid_values_metric_selection(threshold, expected_metric, expected_check):
"""Test that check_property_invalid_values selects correct metric based on threshold."""
check = check_property_invalid_values("orders", "status", threshold, valid_values=["active", "inactive"])

assert check.model == "orders"
assert check.field == "status"
assert check.type == "field_invalid_values"
assert expected_metric in check.name

impl = yaml.safe_load(check.implementation)
checks = impl["checks for orders"]
check_keys = list(checks[0].keys())
assert check_keys == [expected_check]


@pytest.mark.parametrize(
"threshold,expected_metric,expected_check",
[
("<= 2%", "missing_percent", "missing_percent(status) <= 2%"),
("<= 5", "missing_count", "missing_count(status) <= 5"),
],
)
def test_check_property_missing_values_metric_selection(threshold, expected_metric, expected_check):
"""Test that check_property_missing_values selects correct metric based on threshold."""
check = check_property_missing_values("orders", "status", threshold, missing_values=["n/a", "null"])

assert check.model == "orders"
assert check.field == "status"
assert check.type == "field_missing_values"
assert expected_metric in check.name

impl = yaml.safe_load(check.implementation)
checks = impl["checks for orders"]
check_keys = list(checks[0].keys())
assert check_keys == [expected_check]


@pytest.mark.parametrize(
"threshold",
[
"> 100%",
"between 10% and 50%",
],
)
def test_check_row_count_with_percent_returns_none(threshold, caplog):
"""Test that check_row_count returns None when threshold contains '%'."""
with caplog.at_level(logging.WARNING):
result = check_row_count("orders", threshold, QuotingConfig())
assert result is None
assert "Row count threshold cannot be specified as a percentage." in caplog.text


def test_check_row_count_without_percent_works():
"""Test that check_row_count works normally without '%'."""
check = check_row_count("orders", "> 1000", QuotingConfig())

assert check is not None
assert check.model == "orders"
assert check.type == "row_count"

# Parse the implementation to verify the SodaCL check
impl = yaml.safe_load(check.implementation)
checks = impl["checks for orders"]
check_keys = list(checks[0].keys())
assert check_keys == ["row_count > 1000"]