77 changes: 52 additions & 25 deletions docs/source/feature_python_extensions.rst
@@ -11,53 +11,85 @@ Python Extensions

Overview
--------
Python Extensions allow data engineers to write custom Python modules that extend the framework's capabilities. Extensions are organized in a central ``extensions/`` directory and can be imported as standard Python modules throughout your dataflow specifications.
Python Extensions let you add custom Python code that integrates with the framework. There are two kinds of assets under ``src/extensions/``:

1. **Importable modules** (``extensions/libraries/``) — added to Python's import path so dataflow specs can reference them with ``module`` syntax (sources, transforms, sinks).
2. **Init hook scripts** (``extensions/pre_init/`` and ``extensions/post_init/``) — optional ``.py`` files executed at pipeline initialization time (see `Init hooks`_).

.. important::

Extensions provide a powerful mechanism for implementing custom logic—sources, transforms, and sinks—while maintaining clean separation between framework code and business logic.
Extensions provide a powerful mechanism for implementing custom logic—sources, transforms, and sinks—while maintaining clean separation between framework code and business logic. Shared helpers belong in ``libraries/``; hook folders are for scripts that run once per pipeline update, not for packages you import from specs.

.. admonition:: Deprecation Notice
:class: warning

As of v0.10.0, importable modules moved from a flat ``src/extensions/`` directory to ``src/extensions/libraries/``. The flat layout still works but emits a **deprecation warning** when top-level ``*.py`` files exist under ``extensions/`` and ``libraries/`` is absent.

This feature allows development teams to:

- **Centralize Custom Logic**: Organize all custom Python code in one location
- **Reuse Across Dataflows**: Reference the same functions from multiple dataflow specs
- **Maintain Clean Imports**: Use standard Python module imports (e.g., ``transforms.my_function``)
- **Manage Dependencies**: Install additional Python packages via ``requirements_additional.txt``
- **Test Independently**: Extensions can be unit tested outside of Spark Declarative Pipelines
- **Centralize custom logic**: Organize importable code under ``extensions/libraries/``
- **Reuse across dataflows**: Reference the same functions from multiple dataflow specs
- **Maintain clean imports**: Use standard Python module imports (e.g., ``transforms.my_function``)
- **Run setup and teardown**: Use ``pre_init`` / ``post_init`` hooks for registration, Spark configuration, or ``@dp.on_event_hook`` without mixing them into library modules
- **Manage dependencies**: Install additional Python packages via ``requirements_additional.txt``
- **Test independently**: Extension libraries can be unit tested outside of Spark Declarative Pipelines

.. note::

Extensions are loaded during pipeline initialization when the framework adds the ``extensions/`` directory to the Python path. Any additional dependencies specified in ``requirements_additional.txt`` are installed before the pipeline starts.
The framework adds ``extensions/libraries/`` (under both the framework and the bundle) to ``sys.path`` during pipeline initialization. Init hooks are **not** placed on ``sys.path``; each hook file is executed with ``runpy.run_path``, just as if you ran ``python your_script.py``. Any additional dependencies specified in ``requirements_additional.txt`` are installed before the pipeline starts.
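
For example, unit tests or an interactive session can mimic what the framework does by putting the libraries folder on the path themselves. A minimal sketch, assuming a bundle with ``extensions/libraries/transforms.py``:

.. code-block:: python

    import sys

    # What the framework does for you during pipeline initialization:
    sys.path.insert(0, "src/extensions/libraries")

    # Library modules are then importable by their bare names.
    import transforms

    print(transforms.customer_aggregation)  # the function a spec would reference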

How It Works
------------

The extension system consists of three main components:
The extension system has four parts:

1. **Extensions Directory**: A ``src/extensions/`` folder in your pipeline bundle containing Python modules
2. **Module References**: Dataflow specs reference extension functions using ``module`` syntax (e.g., ``transforms.my_function``)
3. **Dependency Management**: Optional ``requirements_additional.txt`` files for installing pip packages
1. **Libraries directory** — ``src/extensions/libraries/`` in your pipeline bundle (and optionally under the framework bundle) holds importable Python modules.
2. **Init hook directories** — ``src/extensions/pre_init/`` and ``src/extensions/post_init/`` hold optional scripts run before and after SDP declarations inside ``DLTPipelineBuilder.initialize_pipeline()``.
3. **Module references** — Dataflow specs reference extension functions using ``module`` syntax (e.g., ``transforms.my_function``; see the resolution sketch after this list).
4. **Dependency management** — Optional ``requirements_additional.txt`` files for installing pip packages.
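
For intuition, resolving a ``module`` reference takes little more than the standard library once ``libraries/`` is importable. A sketch of the idea (not the framework's actual loader):

.. code-block:: python

    import importlib

    def resolve_module_reference(ref: str):
        """Resolve a reference like 'transforms.my_function' to a callable."""
        module_name, _, function_name = ref.rpartition(".")
        module = importlib.import_module(module_name)  # found via sys.path
        return getattr(module, function_name)

    # Hypothetical usage, assuming extensions/libraries/transforms.py
    # defines my_function:
    my_function = resolve_module_reference("transforms.my_function")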

Directory Structure
^^^^^^^^^^^^^^^^^^^

Extensions live in the ``src/extensions/`` directory of your pipeline bundle:
Importable modules and hook scripts use this layout (the framework bundle mirrors the same layout):

::

my_pipeline_bundle/
├── src/
│ ├── extensions/
│ │ ├── __init__.py # Optional, for package imports
│ │ ├── sources.py # Custom source functions
│ │ ├── transforms.py # Custom transform functions
│ │ └── sinks.py # Custom sink functions
│ │ ├── libraries/ # On sys.path — importable modules
│ │ │ ├── __init__.py # Optional
│ │ │ ├── sources.py # Custom source functions
│ │ │ ├── transforms.py # Custom transform functions
│ │ │ └── sinks.py # Custom sink functions
│ │ ├── pre_init/ # Optional — run before SDP table/view declarations
│ │ └── post_init/ # Optional — run after declarations (e.g. event hooks)
│ ├── dataflows/
│ │ └── ...
│ └── pipeline_configs/
│ └── ...
└── requirements_additional.txt # Optional pip dependencies

.. admonition:: Deprecation Notice
:class: warning

As of v0.10.0, importable modules moved from a flat ``src/extensions/`` directory to ``src/extensions/libraries/``. The flat layout still works but emits a **deprecation warning** when top-level ``*.py`` files exist under ``extensions/`` and ``libraries/`` is absent.

Init hooks
^^^^^^^^^^

- **pre_init**: Runs after the builder has loaded configs and dataflow specs (substitutions, secrets, operational metadata, Spark config from framework config) and **before** any ``DataFlow.create_dataflow()`` / SDP declarations.
- **post_init**: Runs **after** all dataflows for the pipeline have been created (the SDP graph is assembled; the pipeline update has not started yet).

Within each folder, scripts run in **sorted filename order**. Files whose names start with ``_`` are skipped. A hook that raises an exception fails the pipeline. Framework ``extensions/`` hooks run before bundle hooks at each phase.

Use **numeric prefixes** (e.g. ``01_setup.py``, ``02_register.py``) to make the execution order explicit. Hooks may call ``pipeline_config.get_spark()``, ``get_logger()``, and other singletons directly.
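
Execution can be pictured with a few lines of standard library code. A minimal sketch of the behavior described above (illustrative, not the framework's actual implementation):

.. code-block:: python

    import os
    import runpy

    def run_hook_scripts(hook_dir: str) -> None:
        if not os.path.isdir(hook_dir):
            return  # pre_init/ and post_init/ are optional
        for name in sorted(os.listdir(hook_dir)):
            if name.startswith("_") or not name.endswith(".py"):
                continue  # files starting with "_" are skipped
            # Runs like `python <script>`; an exception here fails the pipeline.
            runpy.run_path(os.path.join(hook_dir, name), run_name="__main__")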

.. seealso::

The repository file ``docs/extensions-and-init-hooks.md`` contains the same layout summary in Markdown.

Dependency Management
---------------------

@@ -73,7 +105,7 @@ Custom functions that generate DataFrames for use as data sources.


.. code-block:: python
:caption: src/extensions/sources.py
:caption: src/extensions/libraries/sources.py

from pyspark.sql import DataFrame, SparkSession
from pyspark.sql import functions as F
@@ -173,11 +205,7 @@ Custom functions that transform DataFrames after they are read from a source.
**Example:**

.. code-block:: python
:caption: src/extensions/transforms.py

from pyspark.sql import DataFrame
from pyspark.sql import functions as F
from typing import Dict
:caption: src/extensions/libraries/transforms.py

from pyspark.sql import DataFrame
from pyspark.sql import functions as F
@@ -283,7 +311,7 @@ Custom functions for ``foreach_batch_sink`` targets that process micro-batches.
**Example:**

.. code-block:: python
:caption: src/extensions/sinks.py
:caption: src/extensions/libraries/sinks.py

from pyspark.sql import DataFrame
from typing import Dict
@@ -376,4 +404,3 @@ Additional Resources
- :doc:`feature_python_functions` - Python transform functions (file path approach)
- :doc:`dataflow_spec_ref_source_details` - Complete source configuration reference
- :doc:`dataflow_spec_ref_target_details` - Complete target configuration reference

@@ -2,7 +2,7 @@
Python source extensions for the bronze sample pipeline.

These functions are loaded via the pythonModule reference in dataflow specs
and are available because the extensions directory is added to sys.path
and are available because the extensions/libraries directory is added to sys.path
during pipeline initialization.
"""
from pyspark.sql import DataFrame, SparkSession
@@ -13,11 +13,11 @@
def get_customer_cdf(spark: SparkSession, tokens: Dict) -> DataFrame:
"""
Get customer data with Change Data Feed enabled.

Args:
spark: SparkSession instance
tokens: Dictionary of tokens from the dataflow spec

Returns:
DataFrame with customer data and a TEST_COLUMN added
"""
@@ -28,4 +28,3 @@ def get_customer_cdf(spark: SparkSession, tokens: Dict) -> DataFrame:

df = spark.readStream.options(**reader_options).table(source_table)
return df.withColumn("TEST_COLUMN", F.lit("testing from extension..."))

@@ -2,7 +2,7 @@
Python transform extensions for the bronze sample pipeline.

These functions are loaded via the pythonTransform.module reference in dataflow specs
and are available because the extensions directory is added to sys.path
and are available because the extensions/libraries directory is added to sys.path
during pipeline initialization.

Transform functions receive a DataFrame and optionally tokens, and return a DataFrame.
@@ -15,12 +15,12 @@
def customer_aggregation(df: DataFrame) -> DataFrame:
"""
Apply customer aggregation transformation.

Groups by CUSTOMER_ID and counts records within a 10-minute watermark window.

Args:
df: Input DataFrame with customer data

Returns:
DataFrame with CUSTOMER_ID and COUNT columns
"""
@@ -34,24 +34,23 @@ def customer_aggregation(df: DataFrame) -> DataFrame:
def customer_aggregation_with_tokens(df: DataFrame, tokens: Dict) -> DataFrame:
"""
Apply customer aggregation transformation with configurable parameters.

Args:
df: Input DataFrame with customer data
tokens: Configuration tokens with:
- watermarkColumn: Column to use for watermark (default: load_timestamp)
- watermarkDelay: Watermark delay duration (default: 10 minutes)
- groupByColumn: Column to group by (default: CUSTOMER_ID)

Returns:
DataFrame with grouped counts
"""
watermark_column = tokens.get("watermarkColumn", "load_timestamp")
watermark_delay = tokens.get("watermarkDelay", "10 minutes")
group_by_column = tokens.get("groupByColumn", "CUSTOMER_ID")

return (
df.withWatermark(watermark_column, watermark_delay)
.groupBy(group_by_column)
.agg(F.count("*").alias("COUNT"))
)
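
# Illustrative call (hypothetical token values; the key names match the
# tokens.get(...) lookups above):
#
#   customer_aggregation_with_tokens(
#       df,
#       {"watermarkColumn": "load_timestamp",
#        "watermarkDelay": "5 minutes",
#        "groupByColumn": "CUSTOMER_ID"},
#   )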

20 changes: 20 additions & 0 deletions samples/bronze_sample/src/extensions/libraries/utility.py
@@ -0,0 +1,20 @@
import logging
import sys

def set_logger(logger_name: str, log_level: str = "INFO") -> logging.Logger:
"""Set up and return a logger with a specified name and log level."""
logger = logging.getLogger(logger_name)
# Map the level name (e.g. "INFO") to its numeric value; default to INFO.
logger.setLevel(getattr(logging, log_level.upper(), logging.INFO))

# Clear existing handlers to avoid duplicate logging
if logger.hasHandlers():
logger.handlers.clear()

# Add a new handler
console_output_handler = logging.StreamHandler(sys.stdout)
formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
console_output_handler.setFormatter(formatter)
logger.addHandler(console_output_handler)

return logger
18 changes: 18 additions & 0 deletions samples/bronze_sample/src/extensions/post_init/post_init_test.py
@@ -0,0 +1,18 @@
"""
Quiet post_init sample: record context once at load time (no event hooks).
"""
from __future__ import annotations

import pipeline_config
from utility import set_logger

logger = set_logger("BronzeSample")
_details = pipeline_config.get_pipeline_details()

logger.info(
"post_init: bundle context catalog=%s schema=%s layer=%s env=%s",
_details.pipeline_catalog,
_details.pipeline_schema,
_details.pipeline_layer,
_details.workspace_env,
)
19 changes: 19 additions & 0 deletions samples/bronze_sample/src/extensions/pre_init/pre_init_test.py
@@ -0,0 +1,19 @@
"""
Quiet pre_init sample: record context once at load time (no event hooks).
"""

from __future__ import annotations

import pipeline_config
from utility import set_logger

logger = set_logger("BronzeSample")
_details = pipeline_config.get_pipeline_details()

logger.info(
"pre_init: bundle context catalog=%s schema=%s layer=%s env=%s",
_details.pipeline_catalog,
_details.pipeline_schema,
_details.pipeline_layer,
_details.workspace_env,
)
16 changes: 14 additions & 2 deletions src/constants.py
@@ -41,7 +41,10 @@ class FrameworkPaths:

Attributes:
CONFIG_PATH (str): Path to the config directory.
EXTENSIONS_PATH (str): The path for extensions.
EXTENSIONS_PATH (str): The path for extensions (legacy flat layout; prefer EXTENSIONS_LIBRARIES_PATH).
EXTENSIONS_LIBRARIES_PATH (str): Importable Python modules (on sys.path).
PRE_INIT_HOOKS_PATH (str): Hook scripts executed before SDP declarations in initialize_pipeline().
POST_INIT_HOOKS_PATH (str): Hook scripts executed after SDP declarations in initialize_pipeline().
GLOBAL_CONFIG (tuple): Paths to the global configuration files.
GLOBAL_SUBSTITUTIONS (tuple): Paths to the global substitutions files.
GLOBAL_SECRETS (tuple): Paths to the global secrets files.
@@ -55,6 +58,9 @@
"""
CONFIG_PATH: str = "./config"
EXTENSIONS_PATH: str = "./extensions"
EXTENSIONS_LIBRARIES_PATH: str = "./extensions/libraries"
PRE_INIT_HOOKS_PATH: str = "./extensions/pre_init"
POST_INIT_HOOKS_PATH: str = "./extensions/post_init"
GLOBAL_CONFIG: tuple = ("./config/global.json", "./config/global.yaml", "./config/global.yml")
GLOBAL_SUBSTITUTIONS: tuple = ("_substitutions.json", "_substitutions.yaml", "_substitutions.yml")
GLOBAL_SECRETS: tuple = ("_secrets.json", "_secrets.yaml", "_secrets.yml")
@@ -112,7 +118,10 @@ class PipelineBundlePaths:
DATAFLOW_SPEC_PATH (str): The path for dataflow specifications.
DML_PATH (str): The path for DML (Data Manipulation Language) files.
DQE_PATH (str): The path for data quality expectations.
EXTENSIONS_PATH (str): The path for extensions.
EXTENSIONS_PATH (str): The path for extensions (legacy flat layout; prefer EXTENSIONS_LIBRARIES_PATH).
EXTENSIONS_LIBRARIES_PATH (str): Importable Python modules (on sys.path).
PRE_INIT_HOOKS_PATH (str): Hook scripts executed before SDP declarations in initialize_pipeline().
POST_INIT_HOOKS_PATH (str): Hook scripts executed after SDP declarations in initialize_pipeline().
GLOBAL_CONFIG_FILE (tuple): The file names for global configuration files.
PIPELINE_CONFIGS_PATH (str): The path for pipeline configuration files.
PYTHON_FUNCTION_PATH (str): The path for python functions.
@@ -124,6 +133,9 @@ class PipelineBundlePaths:
DML_PATH: str = "./dml"
DQE_PATH: str = "./expectations"
EXTENSIONS_PATH: str = "./extensions"
EXTENSIONS_LIBRARIES_PATH: str = "./extensions/libraries"
PRE_INIT_HOOKS_PATH: str = "./extensions/pre_init"
POST_INIT_HOOKS_PATH: str = "./extensions/post_init"
GLOBAL_CONFIG_FILE: tuple = ("./global.json", "./global.yaml", "./global.yml")
PIPELINE_CONFIGS_PATH: str = "./pipeline_configs"
PYTHON_FUNCTION_PATH: str = "./python_functions"
2 changes: 1 addition & 1 deletion src/dataflow/sources/python.py
@@ -20,7 +20,7 @@ class SourcePython(OperationalMetadataMixin):

- functionPath: Path to a Python file containing a 'get_df' function
- pythonModule: Module.function reference (e.g., 'transforms.get_customer_data')
The module must be in the extensions directory (added to sys.path)
The module must be under extensions/libraries (added to sys.path)
- pythonFunction: Direct function reference (for internal framework use)

Attributes:
20 changes: 14 additions & 6 deletions src/dataflow_spec_builder/dataflow_spec_builder.py
@@ -612,24 +612,32 @@ def _resolve_python_function_path(self, filename: str, base_path: str, spec_data

Search order for regular specs:
1. base_path/python_functions/<filename>
2. bundle_path/extensions/python_functions/<filename>
3. framework_path/extensions/python_functions/<filename>
2. bundle_path/extensions/libraries/<filename>
3. framework_path/extensions/libraries/<filename>
4. bundle_path/extensions/<filename> (legacy)
5. framework_path/extensions/<filename> (legacy)

Search order for template-generated specs (adds one additional location):
1. base_path/python_functions/<filename>
2. bundle_path/templates/python_functions/<filename>
3. bundle_path/extensions/python_functions/<filename>
4. framework_path/extensions/python_functions/<filename>
3. bundle_path/extensions/libraries/<filename>
4. framework_path/extensions/libraries/<filename>
5. bundle_path/extensions/<filename> (legacy)
6. framework_path/extensions/<filename> (legacy)
"""
search_paths = {
"base dataflow directory":
os.path.join(base_path, PipelineBundlePaths.PYTHON_FUNCTION_PATH, filename),
"templates directory":
os.path.join(self.bundle_path, PipelineBundlePaths.TEMPLATE_PATH,
PipelineBundlePaths.PYTHON_FUNCTION_PATH, filename),
"bundle extensions directory":
"bundle extensions libraries directory":
os.path.join(self.bundle_path, PipelineBundlePaths.EXTENSIONS_LIBRARIES_PATH, filename),
"framework extensions libraries directory":
os.path.join(self.framework_path, FrameworkPaths.EXTENSIONS_LIBRARIES_PATH, filename),
"bundle extensions directory (legacy)":
os.path.join(self.bundle_path, PipelineBundlePaths.EXTENSIONS_PATH, filename),
"framework extensions directory":
"framework extensions directory (legacy)":
os.path.join(self.framework_path, FrameworkPaths.EXTENSIONS_PATH, filename),
}
