diff --git a/docs/source/feature_python_extensions.rst b/docs/source/feature_python_extensions.rst index e3359c9..9c396ac 100644 --- a/docs/source/feature_python_extensions.rst +++ b/docs/source/feature_python_extensions.rst @@ -11,53 +11,85 @@ Python Extensions Overview -------- -Python Extensions allow data engineers to write custom Python modules that extend the framework's capabilities. Extensions are organized in a central ``extensions/`` directory and can be imported as standard Python modules throughout your dataflow specifications. +Python Extensions let you add custom Python code that integrates with the framework. There are two kinds of assets under ``src/extensions/``: + +1. **Importable modules** (``extensions/libraries/``) — added to Python's path so Data Flow Specs can reference them with ``module`` syntax (sources, transforms, sinks). +2. **Init hook scripts** (``extensions/pre_init/`` and ``extensions/post_init/``) — optional ``.py`` files executed at pipeline initialization time (see `Init hooks`_). .. important:: - Extensions provide a powerful mechanism for implementing custom logic—sources, transforms, and sinks—while maintaining clean separation between framework code and business logic. + Extensions provide a powerful mechanism for implementing custom logic (sources, transforms, and sinks) while maintaining clean separation between framework code and business logic. Shared helpers belong in ``libraries/``; hook folders are for scripts that run once per pipeline update, not for packages you import from specs. + +.. admonition:: Deprecation Notice + :class: warning + + As of v0.10.0, importable modules moved from a flat ``src/extensions/`` directory to ``src/extensions/libraries/``. The flat layout still works but emits a **deprecation warning** when top-level ``*.py`` files exist under ``extensions/`` and ``libraries/`` is absent. This feature allows development teams to: -- **Centralize Custom Logic**: Organize all custom Python code in one location -- **Reuse Across Dataflows**: Reference the same functions from multiple dataflow specs -- **Maintain Clean Imports**: Use standard Python module imports (e.g., ``transforms.my_function``) -- **Manage Dependencies**: Install additional Python packages via ``requirements_additional.txt`` -- **Test Independently**: Extensions can be unit tested outside of Spark Declarative Pipelines +- **Centralize custom logic**: Organize importable code under ``extensions/libraries/`` +- **Reuse across dataflows**: Reference the same functions from multiple dataflow specs +- **Maintain clean imports**: Use standard Python module imports (e.g., ``transforms.my_function``) +- **Run setup and teardown**: Use ``pre_init`` / ``post_init`` hooks for registration, Spark configuration, or ``@dp.on_event_hook`` without mixing them into library modules +- **Manage dependencies**: Install additional Python packages via ``requirements_additional.txt`` +- **Test independently**: Extension libraries can be unit tested outside of Spark Declarative Pipelines .. note:: - Extensions are loaded during pipeline initialization when the framework adds the ``extensions/`` directory to the Python path. Any additional dependencies specified in ``requirements_additional.txt`` are installed before the pipeline starts. + The framework adds ``extensions/libraries/`` (under both the framework and the bundle) to ``sys.path`` during pipeline initialization. 
Init hooks are **not** on ``sys.path``; they are executed with ``runpy.run_path`` like ``python your_script.py``. Any additional dependencies specified in ``requirements_additional.txt`` are installed before the pipeline starts. How It Works ------------ -The extension system consists of three main components: +The extension system has these parts: -1. **Extensions Directory**: A ``src/extensions/`` folder in your pipeline bundle containing Python modules -2. **Module References**: Dataflow specs reference extension functions using ``module`` syntax (e.g., ``transforms.my_function``) -3. **Dependency Management**: Optional ``requirements_additional.txt`` files for installing pip packages +1. **Libraries directory** — ``src/extensions/libraries/`` in your pipeline bundle (and optionally under the framework bundle) holds importable Python modules. +2. **Init hook directories** — ``src/extensions/pre_init/`` and ``src/extensions/post_init/`` hold optional scripts run before and after SDP declarations inside ``DLTPipelineBuilder.initialize_pipeline()``. +3. **Module references** — Dataflow specs reference extension functions using ``module`` syntax (e.g., ``transforms.my_function``). +4. **Dependency management** — Optional ``requirements_additional.txt`` files for installing pip packages. Directory Structure ^^^^^^^^^^^^^^^^^^^ -Extensions live in the ``src/extensions/`` directory of your pipeline bundle: +Importable modules and hook scripts use this layout (framework bundle mirrors the same shape): :: my_pipeline_bundle/ ├── src/ │ ├── extensions/ - │ │ ├── __init__.py # Optional, for package imports - │ │ ├── sources.py # Custom source functions - │ │ ├── transforms.py # Custom transform functions - │ │ └── sinks.py # Custom sink functions + │ │ ├── libraries/ # On sys.path — importable modules + │ │ │ ├── __init__.py # Optional + │ │ │ ├── sources.py # Custom source functions + │ │ │ ├── transforms.py # Custom transform functions + │ │ │ └── sinks.py # Custom sink functions + │ │ ├── pre_init/ # Optional — run before SDP table/view declarations + │ │ └── post_init/ # Optional — run after declarations (e.g. event hooks) │ ├── dataflows/ │ │ └── ... │ └── pipeline_configs/ │ └── ... └── requirements_additional.txt # Optional pip dependencies +.. admonition:: Deprecation Notice + :class: warning + + As of v0.10.0, importable modules moved from a flat ``src/extensions/`` directory to ``src/extensions/libraries/``. The flat layout still works but emits a **deprecation warning** when top-level ``*.py`` files exist under ``extensions/`` and ``libraries/`` is absent. + +Init hooks +^^^^^^^^^^ + +- **pre_init**: Runs after the builder has loaded configs and dataflow specs (substitutions, secrets, operational metadata, Spark config from framework config) and **before** any ``DataFlow.create_dataflow()`` / SDP declarations. +- **post_init**: Runs **after** all dataflows for the pipeline have been created (the SDP graph is assembled; the pipeline update has not started yet). + +Within each folder, scripts run in **sorted filename order**. Files whose names start with ``_`` are skipped. A hook that raises an exception fails the pipeline. Framework ``extensions/`` hooks run before bundle hooks at each phase. + +Use **numeric prefixes** (e.g. ``01_setup.py``, ``02_register.py``) to fix order. Hooks may call ``pipeline_config.get_spark()``, ``get_logger()``, and other singletons directly. + +.. 
seealso:: + + The repository file ``docs/extensions-and-init-hooks.md`` contains the same layout summary in Markdown. + Dependency Management --------------------- @@ -73,7 +105,7 @@ Custom functions that generate DataFrames for use as data sources. .. code-block:: python - :caption: src/extensions/sources.py + :caption: src/extensions/libraries/sources.py from pyspark.sql import DataFrame, SparkSession from pyspark.sql import functions as F @@ -173,11 +205,7 @@ Custom functions that transform DataFrames after they are read from a source. **Example:** .. code-block:: python - :caption: src/extensions/transforms.py - - from pyspark.sql import DataFrame - from pyspark.sql import functions as F - from typing import Dict + :caption: src/extensions/libraries/transforms.py from pyspark.sql import DataFrame from pyspark.sql import functions as F @@ -283,7 +311,7 @@ Custom functions for ``foreach_batch_sink`` targets that process micro-batches. **Example:** .. code-block:: python - :caption: src/extensions/sinks.py + :caption: src/extensions/libraries/sinks.py from pyspark.sql import DataFrame from typing import Dict @@ -376,4 +404,3 @@ Additional Resources - :doc:`feature_python_functions` - Python transform functions (file path approach) - :doc:`dataflow_spec_ref_source_details` - Complete source configuration reference - :doc:`dataflow_spec_ref_target_details` - Complete target configuration reference - diff --git a/samples/bronze_sample/src/extensions/sources.py b/samples/bronze_sample/src/extensions/libraries/sources.py similarity index 90% rename from samples/bronze_sample/src/extensions/sources.py rename to samples/bronze_sample/src/extensions/libraries/sources.py index 72538eb..c85261e 100644 --- a/samples/bronze_sample/src/extensions/sources.py +++ b/samples/bronze_sample/src/extensions/libraries/sources.py @@ -2,7 +2,7 @@ Python source extensions for the bronze sample pipeline. These functions are loaded via the pythonModule reference in dataflow specs -and are available because the extensions directory is added to sys.path +and are available because the extensions/libraries directory is added to sys.path during pipeline initialization. """ from pyspark.sql import DataFrame, SparkSession @@ -13,11 +13,11 @@ def get_customer_cdf(spark: SparkSession, tokens: Dict) -> DataFrame: """ Get customer data with Change Data Feed enabled. - + Args: spark: SparkSession instance tokens: Dictionary of tokens from the dataflow spec - + Returns: DataFrame with customer data and a TEST_COLUMN added """ @@ -28,4 +28,3 @@ def get_customer_cdf(spark: SparkSession, tokens: Dict) -> DataFrame: df = spark.readStream.options(**reader_options).table(source_table) return df.withColumn("TEST_COLUMN", F.lit("testing from extension...")) - diff --git a/samples/bronze_sample/src/extensions/transforms.py b/samples/bronze_sample/src/extensions/libraries/transforms.py similarity index 94% rename from samples/bronze_sample/src/extensions/transforms.py rename to samples/bronze_sample/src/extensions/libraries/transforms.py index 0fff8df..1d5fbb3 100644 --- a/samples/bronze_sample/src/extensions/transforms.py +++ b/samples/bronze_sample/src/extensions/libraries/transforms.py @@ -2,7 +2,7 @@ Python transform extensions for the bronze sample pipeline. These functions are loaded via the pythonTransform.module reference in dataflow specs -and are available because the extensions directory is added to sys.path +and are available because the extensions/libraries directory is added to sys.path during pipeline initialization. 
Transform functions receive a DataFrame and optionally tokens, and return a DataFrame. @@ -15,12 +15,12 @@ def customer_aggregation(df: DataFrame) -> DataFrame: """ Apply customer aggregation transformation. - + Groups by CUSTOMER_ID and counts records within a 10-minute watermark window. - + Args: df: Input DataFrame with customer data - + Returns: DataFrame with CUSTOMER_ID and COUNT columns """ @@ -34,24 +34,23 @@ def customer_aggregation(df: DataFrame) -> DataFrame: def customer_aggregation_with_tokens(df: DataFrame, tokens: Dict) -> DataFrame: """ Apply customer aggregation transformation with configurable parameters. - + Args: df: Input DataFrame with customer data tokens: Configuration tokens with: - watermark_column: Column to use for watermark (default: load_timestamp) - watermark_delay: Watermark delay duration (default: 10 minutes) - group_by_column: Column to group by (default: CUSTOMER_ID) - + Returns: DataFrame with grouped counts """ watermark_column = tokens.get("watermarkColumn", "load_timestamp") watermark_delay = tokens.get("watermarkDelay", "10 minutes") group_by_column = tokens.get("groupByColumn", "CUSTOMER_ID") - + return ( df.withWatermark(watermark_column, watermark_delay) .groupBy(group_by_column) .agg(F.count("*").alias("COUNT")) ) - diff --git a/samples/bronze_sample/src/extensions/libraries/utility.py b/samples/bronze_sample/src/extensions/libraries/utility.py new file mode 100644 index 0000000..713f9be --- /dev/null +++ b/samples/bronze_sample/src/extensions/libraries/utility.py @@ -0,0 +1,20 @@ +import logging +import sys + +def set_logger(logger_name: str, log_level: str = "INFO") -> logging.Logger: + """Set up and return a logger with a specified name and log level.""" + logger = logging.getLogger(logger_name) + log_level = getattr(logging, log_level, logging.INFO) + logger.setLevel(log_level) + + # Clear existing handlers to avoid duplicate logging + if logger.hasHandlers(): + logger.handlers.clear() + + # Add a new handler + console_output_handler = logging.StreamHandler(sys.stdout) + formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s') + console_output_handler.setFormatter(formatter) + logger.addHandler(console_output_handler) + + return logger \ No newline at end of file diff --git a/samples/bronze_sample/src/extensions/post_init/post_init_test.py b/samples/bronze_sample/src/extensions/post_init/post_init_test.py new file mode 100644 index 0000000..60d8c27 --- /dev/null +++ b/samples/bronze_sample/src/extensions/post_init/post_init_test.py @@ -0,0 +1,18 @@ +""" +Quiet post_init sample: record context once at load time (no event hooks). +""" +from __future__ import annotations + +import pipeline_config +from utility import set_logger + +logger = set_logger("BronzeSample") +_details = pipeline_config.get_pipeline_details() + +logger.info( + "post_init: bundle context catalog=%s schema=%s layer=%s env=%s", + _details.pipeline_catalog, + _details.pipeline_schema, + _details.pipeline_layer, + _details.workspace_env, +) diff --git a/samples/bronze_sample/src/extensions/pre_init/pre_init_test.py b/samples/bronze_sample/src/extensions/pre_init/pre_init_test.py new file mode 100644 index 0000000..843c3ba --- /dev/null +++ b/samples/bronze_sample/src/extensions/pre_init/pre_init_test.py @@ -0,0 +1,19 @@ +""" +Quiet pre_init sample: record context once at load time (no event hooks). 
+""" + +from __future__ import annotations + +import pipeline_config +from utility import set_logger + +logger = set_logger("BronzeSample") +_details = pipeline_config.get_pipeline_details() + +logger.info( + "pre_init: bundle context catalog=%s schema=%s layer=%s env=%s", + _details.pipeline_catalog, + _details.pipeline_schema, + _details.pipeline_layer, + _details.workspace_env, +) \ No newline at end of file diff --git a/src/constants.py b/src/constants.py index 2122a22..f0f39a3 100644 --- a/src/constants.py +++ b/src/constants.py @@ -41,7 +41,10 @@ class FrameworkPaths: Attributes: CONFIG_PATH (str): Path to the config directory. - EXTENSIONS_PATH (str): The path for extensions. + EXTENSIONS_PATH (str): The path for extensions (legacy flat layout; prefer EXTENSIONS_LIBRARIES_PATH). + EXTENSIONS_LIBRARIES_PATH (str): Importable Python modules (on sys.path). + PRE_INIT_HOOKS_PATH (str): Executable .py hooks before initialize_pipeline SDP declarations. + POST_INIT_HOOKS_PATH (str): Executable .py hooks after SDP declarations. GLOBAL_CONFIG (tuple): Paths to the global configuration files. GLOBAL_SUBSTITUTIONS (tuple): Paths to the global substitutions files. GLOBAL_SECRETS (tuple): Paths to the global secrets files. @@ -55,6 +58,9 @@ class FrameworkPaths: """ CONFIG_PATH: str = "./config" EXTENSIONS_PATH: str = "./extensions" + EXTENSIONS_LIBRARIES_PATH: str = "./extensions/libraries" + PRE_INIT_HOOKS_PATH: str = "./extensions/pre_init" + POST_INIT_HOOKS_PATH: str = "./extensions/post_init" GLOBAL_CONFIG: tuple = ("./config/global.json", "./config/global.yaml", "./config/global.yml") GLOBAL_SUBSTITUTIONS: tuple = ("_substitutions.json", "_substitutions.yaml", "_substitutions.yml") GLOBAL_SECRETS: tuple = ("_secrets.json", "_secrets.yaml", "_secrets.yml") @@ -112,7 +118,10 @@ class PipelineBundlePaths: DATAFLOW_SPEC_PATH (str): The path for dataflow specifications. DML_PATH (str): The path for DML (Data Manipulation Language) files. DQE_PATH (str): The path for data quality expectations. - EXTENSIONS_PATH (str): The path for extensions. + EXTENSIONS_PATH (str): The path for extensions (legacy flat layout; prefer EXTENSIONS_LIBRARIES_PATH). + EXTENSIONS_LIBRARIES_PATH (str): Importable Python modules (on sys.path). + PRE_INIT_HOOKS_PATH (str): Executable .py hooks before initialize_pipeline SDP declarations. + POST_INIT_HOOKS_PATH (str): Executable .py hooks after SDP declarations. GLOBAL_CONFIG_FILE (tuple): The file names for global configuration files. PIPELINE_CONFIGS_PATH (str): The path for pipeline configuration files. PYTHON_FUNCTION_PATH (str): The path for python functions. 
@@ -124,6 +133,9 @@ class PipelineBundlePaths: DML_PATH: str = "./dml" DQE_PATH: str = "./expectations" EXTENSIONS_PATH: str = "./extensions" + EXTENSIONS_LIBRARIES_PATH: str = "./extensions/libraries" + PRE_INIT_HOOKS_PATH: str = "./extensions/pre_init" + POST_INIT_HOOKS_PATH: str = "./extensions/post_init" GLOBAL_CONFIG_FILE: tuple = ("./global.json", "./global.yaml", "./global.yml") PIPELINE_CONFIGS_PATH: str = "./pipeline_configs" PYTHON_FUNCTION_PATH: str = "./python_functions" diff --git a/src/dataflow/sources/python.py b/src/dataflow/sources/python.py index d4aeecf..b7c9451 100644 --- a/src/dataflow/sources/python.py +++ b/src/dataflow/sources/python.py @@ -20,7 +20,7 @@ class SourcePython(OperationalMetadataMixin): - functionPath: Path to a Python file containing a 'get_df' function - pythonModule: Module.function reference (e.g., 'transforms.get_customer_data') - The module must be in the extensions directory (added to sys.path) + The module must be under extensions/libraries (added to sys.path) - pythonFunction: Direct function reference (for internal framework use) Attributes: diff --git a/src/dataflow_spec_builder/dataflow_spec_builder.py b/src/dataflow_spec_builder/dataflow_spec_builder.py index bd07ff2..b89a42a 100644 --- a/src/dataflow_spec_builder/dataflow_spec_builder.py +++ b/src/dataflow_spec_builder/dataflow_spec_builder.py @@ -612,14 +612,18 @@ def _resolve_python_function_path(self, filename: str, base_path: str, spec_data Search order for regular specs: 1. base_path/python_functions/ - 2. bundle_path/extensions/python_functions/ - 3. framework_path/extensions/python_functions/ + 2. bundle_path/extensions/libraries/ + 3. framework_path/extensions/libraries/ + 4. bundle_path/extensions/ (legacy) + 5. framework_path/extensions/ (legacy) Search order for template-generated specs (adds one additional location): 1. base_path/python_functions/ 2. bundle_path/templates/python_functions/ - 3. bundle_path/extensions/python_functions/ - 4. framework_path/extensions/python_functions/ + 3. bundle_path/extensions/libraries/ + 4. framework_path/extensions/libraries/ + 5. bundle_path/extensions/ (legacy) + 6. 
framework_path/extensions/ (legacy) """ search_paths = { "base dataflow directory": @@ -627,9 +631,13 @@ def _resolve_python_function_path(self, filename: str, base_path: str, spec_data "templates directory": os.path.join(self.bundle_path, PipelineBundlePaths.TEMPLATE_PATH, PipelineBundlePaths.PYTHON_FUNCTION_PATH, filename), - "bundle extensions directory": + "bundle extensions libraries directory": + os.path.join(self.bundle_path, PipelineBundlePaths.EXTENSIONS_LIBRARIES_PATH, filename), + "framework extensions libraries directory": + os.path.join(self.framework_path, FrameworkPaths.EXTENSIONS_LIBRARIES_PATH, filename), + "bundle extensions directory (legacy)": os.path.join(self.bundle_path, PipelineBundlePaths.EXTENSIONS_PATH, filename), - "framework extensions directory": + "framework extensions directory (legacy)": os.path.join(self.framework_path, FrameworkPaths.EXTENSIONS_PATH, filename), } diff --git a/src/dlt_pipeline_builder.py b/src/dlt_pipeline_builder.py index 2f1af72..9cc3cdd 100644 --- a/src/dlt_pipeline_builder.py +++ b/src/dlt_pipeline_builder.py @@ -2,7 +2,6 @@ from datetime import datetime, timezone import json import os -import sys from pyspark import pipelines as dp from pyspark.dbutils import DBUtils @@ -15,6 +14,7 @@ ) from dataflow import DataFlow from dataflow_spec_builder import DataflowSpecBuilder +from extension_loader import add_extensions_libraries_to_sys_path, run_init_hooks from pipeline_details import PipelineDetails from secrets_manager import SecretsManager from substitution_manager import SubstitutionManager @@ -172,7 +172,7 @@ def _init_pipeline_components(self) -> None: # Initialize secrets manager self._init_secrets_manager() - # Preload shared Python modules + # Preload shared Python modules (extensions/libraries on sys.path) self._preload_extensions() # Initialize dataflow specifications @@ -390,31 +390,27 @@ def _apply_spark_config(self) -> None: self.spark.conf.set(prop, value) def _preload_extensions(self) -> None: - """Add shared extension directories to sys.path.""" - # Framework extensions - framework_extensions = os.path.join(self.framework_path, FrameworkPaths.EXTENSIONS_PATH) - if os.path.exists(framework_extensions): - sys.path.insert(0, framework_extensions) - self.logger.info("Added framework extensions to sys.path: %s", framework_extensions) - - # Bundle extensions - bundle_extensions = os.path.join(self.bundle_path, PipelineBundlePaths.EXTENSIONS_PATH) - if os.path.exists(bundle_extensions): - sys.path.insert(0, bundle_extensions) - self.logger.info("Added bundle extensions to sys.path: %s", bundle_extensions) + """Add extensions/libraries directories to sys.path (legacy flat extensions/ supported).""" + add_extensions_libraries_to_sys_path( + self.framework_path, + self.bundle_path, + self.logger, + ) def initialize_pipeline(self) -> None: """Initialize the Spark Declarative Pipeline.""" def create_dataflow(spec): """Create a dataflow from a specification.""" return DataFlow(dataflow_spec=spec).create_dataflow() - + + run_init_hooks(self.framework_path, self.bundle_path, "pre_init", self.logger) + self.logger.info("Initializing Pipeline...") pipeline_builder_threading_disabled = self.pipeline_config.get( FrameworkSettings.PIPELINE_BUILDER_DISABLE_THREADING_KEY, True ) - + self.logger.info("Processing Dataflow Specs...") if pipeline_builder_threading_disabled: self.logger.info("Pipeline Builder Threading Disabled, creating dataflows sequentially...") @@ -434,3 +430,5 @@ def create_dataflow(spec): ] for future in futures: 
future.result() + + run_init_hooks(self.framework_path, self.bundle_path, "post_init", self.logger) diff --git a/src/extension_loader.py b/src/extension_loader.py new file mode 100644 index 0000000..c1ed7a3 --- /dev/null +++ b/src/extension_loader.py @@ -0,0 +1,101 @@ +""" +Extension library sys.path setup and pre/post init hook execution. + +Importable modules live under extensions/libraries/. Executable hooks live under +extensions/pre_init/ and extensions/post_init/ and are not added to sys.path. +""" + +from __future__ import annotations + +import os +import runpy +import sys +import warnings +from typing import Literal + +from constants import FrameworkPaths + +Phase = Literal["pre_init", "post_init"] + +_PHASE_SUBDIR = { + "pre_init": "pre_init", + "post_init": "post_init", +} + +_PHASE_LABEL = { + "pre_init": "Pre-Init", + "post_init": "Post-Init", +} + + +def _legacy_extensions_has_top_level_py(extensions_dir: str) -> bool: + """True if extensions_dir exists and has at least one top-level .py file.""" + if not os.path.isdir(extensions_dir): + return False + for name in os.listdir(extensions_dir): + full = os.path.join(extensions_dir, name) + if os.path.isfile(full) and name.endswith(".py"): + return True + return False + + +def add_extensions_libraries_to_sys_path( + framework_path: str, + bundle_path: str, + logger, +) -> None: + """ + Add extensions/libraries to sys.path for each of framework and bundle. + + If extensions/libraries/ exists, it is used. Otherwise, if the legacy flat + extensions/ directory contains top-level .py files, that directory is added + and a deprecation warning is emitted. + """ + for base, level in ((framework_path, "framework"), (bundle_path, "bundle")): + libraries = os.path.normpath(os.path.join(base, FrameworkPaths.EXTENSIONS_LIBRARIES_PATH)) + legacy_root = os.path.normpath(os.path.join(base, FrameworkPaths.EXTENSIONS_PATH)) + + if os.path.isdir(libraries): + sys.path.insert(0, libraries) + logger.info("Added %s extensions libraries to sys.path: %s", level, libraries) + elif _legacy_extensions_has_top_level_py(legacy_root): + sys.path.insert(0, legacy_root) + msg = ( + f"Top-level .py files under {legacy_root} are deprecated. " + f"Move importable modules to {os.path.join(legacy_root, 'libraries')}. " + "Support for flat extensions/ without libraries/ will be removed in a future release." + ) + logger.warning(msg) + warnings.warn(msg, DeprecationWarning, stacklevel=1) + + +def run_init_hooks( + framework_path: str, + bundle_path: str, + phase: Phase, + logger, +) -> None: + """ + Execute all .py files under extensions/<phase>/ for framework then bundle. + + Files are sorted by filename; files starting with '_' are skipped. + Each file is run with runpy.run_path(..., run_name='__main__'). + """ + if phase not in _PHASE_SUBDIR: + raise ValueError(f"Invalid phase: {phase!r}. 
Expected one of {list(_PHASE_SUBDIR)}.") + + sub = _PHASE_SUBDIR[phase] + label = _PHASE_LABEL[phase] + + for base, level in ((framework_path, "framework"), (bundle_path, "bundle")): + folder = os.path.normpath(os.path.join(base, FrameworkPaths.EXTENSIONS_PATH, sub)) + if not os.path.isdir(folder): + continue + for filename in sorted(os.listdir(folder)): + if not filename.endswith(".py") or filename.startswith("_"): + continue + path = os.path.join(folder, filename) + if not os.path.isfile(path): + continue + logger.info("Running %s %s hook: %s", level, label, path) + runpy.run_path(path, run_name="__main__") diff --git a/src/utility.py b/src/utility.py index 190f02a..8bc3ec5 100644 --- a/src/utility.py +++ b/src/utility.py @@ -404,7 +404,7 @@ def load_python_function_from_module( """ Load and validate a Python function from an extension module. - The module must be importable via sys.path (typically from the extensions directory + The module must be importable via sys.path (typically from extensions/libraries, which is added to sys.path during pipeline initialization). Args: