diff --git a/integration_tests/tests/test_column_anomalies.py b/integration_tests/tests/test_column_anomalies.py index 75651c009..429e2c435 100644 --- a/integration_tests/tests/test_column_anomalies.py +++ b/integration_tests/tests/test_column_anomalies.py @@ -605,3 +605,109 @@ def test_col_anom_excl_detect_train(test_id: str, dbt_project: DbtProject): "Expected FAIL when exclude_detection_period_from_training=True " "(detection data excluded from training baseline, anomaly detected)" ) + + +def test_col_excl_detect_train_seven_day_bucket(test_id: str, dbt_project: DbtProject): + """ + Test exclude_detection_period_from_training with 7-day buckets for column anomalies. + + This tests the fix where the detection period is set to the bucket size + when the bucket period exceeds backfill_days. With 7-day buckets + and default backfill_days (2), without the fix the 2-day exclusion window + cannot contain any 7-day bucket_end, making exclusion ineffective. + + detection_period is intentionally NOT set so that backfill_days stays at + its default (2), which is smaller than the 7-day bucket. + Setting detection_period would override backfill_days and mask the bug. + + Scenario: + - 12 normal 7-day buckets with low null count + - 1 anomalous 7-day bucket with high null count + - time_bucket: 7 days (7 days >> default backfill_days of 2) + - Without exclusion: anomaly absorbed into training → test passes + - With exclusion + fix: anomaly excluded from training → test fails + """ + utc_now = datetime.utcnow().date() + anomaly_bucket_start = utc_now - timedelta(days=7) + normal_bucket_start = anomaly_bucket_start - timedelta(days=12 * 7) + + normal_data: List[Dict[str, Any]] = [] + day = normal_bucket_start + day_idx = 0 + while day < anomaly_bucket_start: + null_count = 1 + (day_idx % 3) + normal_data.extend( + [ + {TIMESTAMP_COLUMN: day.strftime(DATE_FORMAT), "superhero": superhero} + for superhero in ["Superman", "Batman", "Wonder Woman", "Flash"] + ] + ) + normal_data.extend( + [ + {TIMESTAMP_COLUMN: day.strftime(DATE_FORMAT), "superhero": None} + for _ in range(null_count) + ] + ) + day += timedelta(days=1) + day_idx += 1 + + anomalous_data: List[Dict[str, Any]] = [] + day = anomaly_bucket_start + while day < utc_now: + anomalous_data.extend( + [ + {TIMESTAMP_COLUMN: day.strftime(DATE_FORMAT), "superhero": superhero} + for superhero in ["Superman", "Batman", "Wonder Woman", "Flash"] + ] + ) + anomalous_data.extend( + [ + {TIMESTAMP_COLUMN: day.strftime(DATE_FORMAT), "superhero": None} + for _ in range(10) + ] + ) + day += timedelta(days=1) + + all_data = normal_data + anomalous_data + + test_args_without_exclusion = { + "timestamp_column": TIMESTAMP_COLUMN, + "column_anomalies": ["null_count"], + "time_bucket": {"period": "day", "count": 7}, + "training_period": {"period": "day", "count": 91}, + "min_training_set_size": 5, + "anomaly_sensitivity": 10, + "anomaly_direction": "spike", + "exclude_detection_period_from_training": False, + } + + test_result_without = dbt_project.test( + test_id + "_f", + DBT_TEST_NAME, + test_args_without_exclusion, + data=all_data, + test_column="superhero", + test_vars={"force_metrics_backfill": True}, + ) + assert test_result_without["status"] == "pass", ( + "Expected PASS when exclude_detection_period_from_training=False " + "(detection data included in training baseline)" + ) + + test_args_with_exclusion = { + **test_args_without_exclusion, + "exclude_detection_period_from_training": True, + } + + test_result_with = dbt_project.test( + test_id + "_t", + DBT_TEST_NAME, + test_args_with_exclusion, + data=all_data, + test_column="superhero", + test_vars={"force_metrics_backfill": True}, + ) + assert test_result_with["status"] == "fail", ( + "Expected FAIL when exclude_detection_period_from_training=True " + "(large bucket fix: detection period set to bucket size)" + ) diff --git a/integration_tests/tests/test_volume_anomalies.py b/integration_tests/tests/test_volume_anomalies.py index c73c10d57..ded0fd207 100644 --- a/integration_tests/tests/test_volume_anomalies.py +++ b/integration_tests/tests/test_volume_anomalies.py @@ -583,3 +583,84 @@ def test_exclude_detection_from_training(test_id: str, dbt_project: DbtProject): assert ( test_result_with_exclusion["status"] == "fail" ), "Test should fail when anomaly is excluded from training" + + +def test_excl_detect_train_seven_day_bucket(test_id: str, dbt_project: DbtProject): + """ + Test exclude_detection_period_from_training with 7-day buckets. + + This tests the fix where the detection period is set to the bucket size + when the bucket period exceeds backfill_days. With 7-day buckets + and default backfill_days (2), without the fix the 2-day exclusion window + cannot contain any 7-day bucket_end, making exclusion ineffective. + + detection_period is intentionally NOT set so that backfill_days stays at + its default (2), which is smaller than the 7-day bucket. + Setting detection_period would override backfill_days and mask the bug. + + Scenario: + - 12 normal 7-day buckets + - 1 anomalous 7-day bucket + - time_bucket: 7 days (7 days >> default backfill_days of 2) + - Without exclusion: anomaly absorbed into training → test passes + - With exclusion + fix: anomaly excluded from training → test fails + """ + utc_now = datetime.utcnow() + utc_today = utc_now.date() + anomaly_bucket_start = utc_today - timedelta(days=7) + normal_bucket_start = anomaly_bucket_start - timedelta(days=12 * 7) + + normal_data = [] + day = normal_bucket_start + day_idx = 0 + while day < anomaly_bucket_start: + rows_per_day = 8 + (day_idx % 3) + normal_data.extend( + [{TIMESTAMP_COLUMN: day.strftime(DATE_FORMAT)} for _ in range(rows_per_day)] + ) + day += timedelta(days=1) + day_idx += 1 + + anomalous_data = [] + day = anomaly_bucket_start + while day < utc_today: + anomalous_data.extend( + [{TIMESTAMP_COLUMN: day.strftime(DATE_FORMAT)} for _ in range(50)] + ) + day += timedelta(days=1) + + all_data = normal_data + anomalous_data + + test_args_without_exclusion = { + **DBT_TEST_ARGS, + "training_period": {"period": "day", "count": 91}, + "time_bucket": {"period": "day", "count": 7}, + "sensitivity": 10, + } + + test_result_without = dbt_project.test( + test_id + "_without", + DBT_TEST_NAME, + test_args_without_exclusion, + data=all_data, + test_vars={"force_metrics_backfill": True}, + ) + assert ( + test_result_without["status"] == "pass" + ), "Test should pass when anomaly is included in training" + + test_args_with_exclusion = { + **test_args_without_exclusion, + "exclude_detection_period_from_training": True, + } + + test_result_with = dbt_project.test( + test_id + "_with", + DBT_TEST_NAME, + test_args_with_exclusion, + data=all_data, + test_vars={"force_metrics_backfill": True}, + ) + assert ( + test_result_with["status"] == "fail" + ), "Test should fail when anomaly is excluded from training (large bucket fix)" diff --git a/macros/edr/tests/test_configuration/get_anomalies_test_configuration.sql b/macros/edr/tests/test_configuration/get_anomalies_test_configuration.sql index 180c53e8e..e94fb564e 100644 --- a/macros/edr/tests/test_configuration/get_anomalies_test_configuration.sql +++ b/macros/edr/tests/test_configuration/get_anomalies_test_configuration.sql @@ -60,6 +60,21 @@ {%- set backfill_days = elementary.detection_period_to_backfill_days( detection_period, backfill_days, model_graph_node ) -%} + {%- if metric_props.time_bucket %} + {%- set bucket_in_days = elementary.convert_period( + metric_props.time_bucket, "day" + ).count %} + {%- if bucket_in_days > backfill_days %} + {%- do elementary.edr_log( + "backfill_days increased from " + ~ backfill_days + ~ " to " + ~ bucket_in_days + ~ " to match time bucket size." + ) %} + {%- set backfill_days = bucket_in_days %} + {%- endif %} + {%- endif %} {%- set fail_on_zero = elementary.get_test_argument( "fail_on_zero", fail_on_zero, model_graph_node ) %}