diff --git a/integration_tests/dbt_project/dbt_project.yml b/integration_tests/dbt_project/dbt_project.yml index bb2663282..1de92bbce 100644 --- a/integration_tests/dbt_project/dbt_project.yml +++ b/integration_tests/dbt_project/dbt_project.yml @@ -31,4 +31,4 @@ models: +file_format: "{{ 'delta' if target.type in ['spark', 'fabricspark'] else none }}" flags: - require_batched_execution_for_custom_microbatch_strategy: True \ No newline at end of file + require_batched_execution_for_custom_microbatch_strategy: True diff --git a/integration_tests/dbt_project/macros/get_anomaly_config.sql b/integration_tests/dbt_project/macros/get_anomaly_config.sql index c0faab2ff..12139702c 100644 --- a/integration_tests/dbt_project/macros/get_anomaly_config.sql +++ b/integration_tests/dbt_project/macros/get_anomaly_config.sql @@ -1,16 +1,28 @@ -{% macro get_anomaly_config(model_config, config) %} +{% macro get_anomaly_config( + model_config, config, source_meta_config=none, meta_config=none +) %} {{ return( - adapter.dispatch("get_anomaly_config", "elementary")(model_config, config) + adapter.dispatch("get_anomaly_config", "elementary")( + model_config, config, source_meta_config, meta_config + ) ) }} {% endmacro %} -{% macro default__get_anomaly_config(model_config, config) %} +{% macro default__get_anomaly_config( + model_config, config, source_meta_config=none, meta_config=none +) %} {% set mock_model = { "alias": "mock_model", "config": {"elementary": model_config}, } %} + {% if source_meta_config is not none %} + {% do mock_model.update({"source_meta": {"elementary": source_meta_config}}) %} + {% endif %} + {% if meta_config is not none %} + {% do mock_model.update({"meta": {"elementary": meta_config}}) %} + {% endif %} {# trick elementary into thinking this is the running model #} {% do context.update( { @@ -25,11 +37,19 @@ ) %} {% endmacro %} -{% macro spark__get_anomaly_config(model_config, config) %} +{% macro spark__get_anomaly_config( + model_config, config, source_meta_config=none, meta_config=none +) %} {% set mock_model = { "alias": "mock_model", "config": {"elementary": model_config}, } %} + {% if source_meta_config is not none %} + {% do mock_model.update({"source_meta": {"elementary": source_meta_config}}) %} + {% endif %} + {% if meta_config is not none %} + {% do mock_model.update({"meta": {"elementary": meta_config}}) %} + {% endif %} {# trick elementary into thinking this is the running model #} {% do context.update( { @@ -44,11 +64,19 @@ ) %} {% endmacro %} -{% macro clickhouse__get_anomaly_config(model_config, config) %} +{% macro clickhouse__get_anomaly_config( + model_config, config, source_meta_config=none, meta_config=none +) %} {% set mock_model = { "alias": "mock_model", "config": {"elementary": model_config}, } %} + {% if source_meta_config is not none %} + {% do mock_model.update({"source_meta": {"elementary": source_meta_config}}) %} + {% endif %} + {% if meta_config is not none %} + {% do mock_model.update({"meta": {"elementary": meta_config}}) %} + {% endif %} {# trick elementary into thinking this is the running model #} {% do context.update( { diff --git a/integration_tests/tests/test_anomaly_test_configuration.py b/integration_tests/tests/test_anomaly_test_configuration.py index 89fcd3291..d39ca37a7 100644 --- a/integration_tests/tests/test_anomaly_test_configuration.py +++ b/integration_tests/tests/test_anomaly_test_configuration.py @@ -131,3 +131,38 @@ def test_anomaly_test_configuration( ) adapted_config = json.loads(result[0]) assert adapted_config == expected_config + + +@pytest.mark.skip_for_dbt_fusion +def test_source_meta_config_is_picked_up(dbt_project: DbtProject): + """source-level meta.elementary is inherited when no model/test-level config overrides it.""" + result = dbt_project.dbt_runner.run_operation( + "elementary_tests.get_anomaly_config", + macro_args={ + "model_config": {}, + "config": {}, + "source_meta_config": { + "timestamp_column": "source_ts", + "where_expression": "is_deleted = false", + }, + }, + ) + config = json.loads(result[0]) + assert config["timestamp_column"] == "source_ts" + assert config["where_expression"] == "is_deleted = false" + + +@pytest.mark.skip_for_dbt_fusion +def test_model_meta_overrides_source_meta_config(dbt_project: DbtProject): + """Table-level meta.elementary takes precedence over source-level meta.elementary.""" + result = dbt_project.dbt_runner.run_operation( + "elementary_tests.get_anomaly_config", + macro_args={ + "model_config": {}, + "config": {}, + "source_meta_config": {"timestamp_column": "source_ts"}, + "meta_config": {"timestamp_column": "model_ts"}, + }, + ) + config = json.loads(result[0]) + assert config["timestamp_column"] == "model_ts" diff --git a/integration_tests/tests/test_dbt_artifacts/test_microbatch_compiled_code.py b/integration_tests/tests/test_dbt_artifacts/test_microbatch_compiled_code.py index dea35420a..60e49fb36 100644 --- a/integration_tests/tests/test_dbt_artifacts/test_microbatch_compiled_code.py +++ b/integration_tests/tests/test_dbt_artifacts/test_microbatch_compiled_code.py @@ -1,7 +1,6 @@ from contextlib import contextmanager import pytest - from dbt_project import DbtProject @@ -44,20 +43,30 @@ def _microbatch_model_sql(source_model_name: str) -> str: amount, order_date from {{ ref('__MICROBATCH_SOURCE_MODEL__') }} -""".replace("__MICROBATCH_SOURCE_MODEL__", source_model_name) +""".replace( + "__MICROBATCH_SOURCE_MODEL__", source_model_name + ) @contextmanager def _with_microbatch_test_models(dbt_project: DbtProject, model_suffix: str): source_model_name = f"mb_src_{model_suffix}" target_model_name = f"mb_tgt_{model_suffix}" - source_model_path = dbt_project.tmp_models_dir_path.joinpath(f"{source_model_name}.sql") - target_model_path = dbt_project.tmp_models_dir_path.joinpath(f"{target_model_name}.sql") + source_model_path = dbt_project.tmp_models_dir_path.joinpath( + f"{source_model_name}.sql" + ) + target_model_path = dbt_project.tmp_models_dir_path.joinpath( + f"{target_model_name}.sql" + ) source_model_path.write_text(_microbatch_source_model_sql()) target_model_path.write_text(_microbatch_model_sql(source_model_name)) - relative_source_model_path = source_model_path.relative_to(dbt_project.project_dir_path) - relative_target_model_path = target_model_path.relative_to(dbt_project.project_dir_path) + relative_source_model_path = source_model_path.relative_to( + dbt_project.project_dir_path + ) + relative_target_model_path = target_model_path.relative_to( + dbt_project.project_dir_path + ) try: yield relative_source_model_path, relative_target_model_path, target_model_name finally: @@ -75,9 +84,7 @@ def _run_microbatch_model_and_get_latest_success_result( model_path, target_model_name, ): - dbt_project.dbt_runner.run( - select=f"{source_model_path} {model_path}" - ) + dbt_project.dbt_runner.run(select=f"{source_model_path} {model_path}") unique_id = f"model.elementary_tests.{target_model_name}" run_results = dbt_project.read_table( @@ -91,14 +98,14 @@ def _run_microbatch_model_and_get_latest_success_result( @contextmanager def _with_microbatch_macro_file(dbt_project: DbtProject, macro_name: str): - macro_path = ( - dbt_project.project_dir_path / "macros" / "microbatch.sql" - ) + macro_path = dbt_project.project_dir_path / "macros" / "microbatch.sql" macro_sql = """ {% macro __MACRO_NAME__(arg_dict) %} {{ return(elementary.get_incremental_microbatch_sql(arg_dict)) }} {% endmacro %} -""".replace("__MACRO_NAME__", macro_name) +""".replace( + "__MACRO_NAME__", macro_name + ) if macro_path.exists(): raise FileExistsError(f"Expected no macro file at {macro_path}") @@ -110,7 +117,9 @@ def _with_microbatch_macro_file(dbt_project: DbtProject, macro_name: str): macro_path.unlink() -@pytest.mark.skip_targets(["spark", "vertica", "bigquery", "athena", "clickhouse", "dremio"]) +@pytest.mark.skip_targets( + ["spark", "vertica", "bigquery", "athena", "clickhouse", "dremio"] +) @pytest.mark.skip_for_dbt_fusion @pytest.mark.parametrize( "macro_name,expected_compiled_code,model_suffix", @@ -134,10 +143,10 @@ def test_microbatch_run_results_compiled_code_behavior( ) assert run_results, "Expected a successful run result row for microbatch model" if expected_compiled_code: - assert run_results[0]["compiled_code"], ( - "Expected compiled_code to be populated when override macro is present" - ) + assert run_results[0][ + "compiled_code" + ], "Expected compiled_code to be populated when override macro is present" else: - assert not run_results[0]["compiled_code"], ( - "Expected compiled_code to stay empty when override macro is absent" - ) + assert not run_results[0][ + "compiled_code" + ], "Expected compiled_code to stay empty when override macro is absent" diff --git a/macros/edr/dbt_artifacts/microbatch/capture_microbatch_compiled_code.sql b/macros/edr/dbt_artifacts/microbatch/capture_microbatch_compiled_code.sql index 856dd91cf..588c32cd5 100644 --- a/macros/edr/dbt_artifacts/microbatch/capture_microbatch_compiled_code.sql +++ b/macros/edr/dbt_artifacts/microbatch/capture_microbatch_compiled_code.sql @@ -31,14 +31,12 @@ model.get("unique_id") if model is mapping else model.unique_id ) | default(none, true) %} {% set model_compiled_code = ( - model.get("compiled_code") if model is mapping else model.compiled_code + model.get("compiled_code") + if model is mapping + else model.compiled_code ) | default(none, true) %} - {% if model_unique_id is none %} - {{ return(none) }} - {% endif %} - {% if not model_compiled_code %} - {{ return(none) }} - {% endif %} + {% if model_unique_id is none %} {{ return(none) }} {% endif %} + {% if not model_compiled_code %} {{ return(none) }} {% endif %} {% set compiled_code_by_unique_id = elementary.get_cache( "microbatch_compiled_code_by_unique_id" diff --git a/macros/utils/graph/get_compiled_code.sql b/macros/utils/graph/get_compiled_code.sql index 6ce92316e..80f78a578 100644 --- a/macros/utils/graph/get_compiled_code.sql +++ b/macros/utils/graph/get_compiled_code.sql @@ -5,7 +5,9 @@ "microbatch_compiled_code_by_unique_id", {} ).get(node.get("unique_id")) %} {% endif %} - {% set compiled_code = adapter.dispatch("format_compiled_code", "elementary")(compiled_code) %} + {% set compiled_code = adapter.dispatch("format_compiled_code", "elementary")( + compiled_code + ) %} {% set max_column_size = elementary.get_column_size() %} {% if as_column_value and max_column_size and compiled_code and compiled_code | length > max_column_size %} diff --git a/macros/utils/graph/get_elementary_config_from_node.sql b/macros/utils/graph/get_elementary_config_from_node.sql index 69f086797..8ac7e43db 100644 --- a/macros/utils/graph/get_elementary_config_from_node.sql +++ b/macros/utils/graph/get_elementary_config_from_node.sql @@ -1,25 +1,24 @@ -{% macro get_elementary_config_from_node(node) %} - {% set res = {} %} - {% set node_config = node.get("config") %} - {% if node_config %} - {% set elementary_config = node.config.get("elementary") %} +{% macro _merge_elementary_from_meta(res, meta_dict) %} + {% if meta_dict and meta_dict is mapping %} + {% set elementary_config = meta_dict.get("elementary") %} {% if elementary_config and elementary_config is mapping %} {% do res.update(elementary_config) %} {% endif %} - {% set config_meta = node.config.get("meta") %} - {% if config_meta and config_meta is mapping %} - {% set elementary_config = config_meta.get("elementary") %} - {% if elementary_config and elementary_config is mapping %} - {% do res.update(elementary_config) %} - {% endif %} - {% endif %} {% endif %} - {% set node_meta = node.get("meta") %} - {% if node_meta and node_meta is mapping %} - {% set elementary_config = node_meta.get("elementary") %} - {% if elementary_config and elementary_config is mapping %} - {% do res.update(elementary_config) %} +{% endmacro %} + +{% macro get_elementary_config_from_node(node) %} + {% set res = {} %} + {% set node_config = node.get("config") %} + {% if node_config %} + {# config.elementary is already the elementary config dict itself (not nested) #} + {% set config_elementary = node.config.get("elementary") %} + {% if config_elementary and config_elementary is mapping %} + {% do res.update(config_elementary) %} {% endif %} + {% do elementary._merge_elementary_from_meta(res, node.config.get("meta")) %} {% endif %} + {% do elementary._merge_elementary_from_meta(res, node.get("source_meta")) %} + {% do elementary._merge_elementary_from_meta(res, node.get("meta")) %} {{ return(res) }} {% endmacro %}