
dawidstruzik/sdp-extractor


Spark Declarative Pipelines - Schema Extractor

Extract table schemas from Spark Declarative Pipeline definitions without running the pipeline, connecting to Databricks, or reading any data.

Problem

Spark Declarative Pipelines (SDP) define tables using Python decorators like @materialized_view and @table. These pipelines are designed to run on Databricks, but teams often need to validate that the table schemas match their data contracts (e.g. YAML contract files) before deploying. There is no built-in way to extract schemas from pipeline code without executing the pipeline.

How it works

SDP decorators (@materialized_view, @table, @temporary_view) internally register table definitions with a GraphElementRegistry. On Databricks, this registry sends the definitions to the Spark Connect server. Locally, the extractor substitutes a LocalGraphElementRegistry, which simply collects the definitions in an in-memory list.

The extractor imports the pipeline module inside a graph_element_registration_context that activates the local registry. When Python executes the module, each decorator fires and pushes its metadata (name, schema, partition columns, etc.) into the list. The function bodies (e.g. spark.read.format("delta").load(...)) are captured as references but never called. Once the import completes, the collected definitions are serialized to JSON.

Recommendation

For reliable local schema validation, declare schema= explicitly on all pipeline tables, using either StructType(...) or a DDL string like "id STRING NOT NULL, value DOUBLE".

This enables:

  • Deterministic schema extraction without a JVM or SparkSession (StructType), or with only a local SparkSession (DDL strings)
  • A clear, reviewable contract between pipeline code and downstream consumers
  • The ability to compare extracted schemas against YAML contracts in CI

Tables without an explicit schema= rely on runtime inference from the DataFrame returned by the function body. The extractor reports these as "schema_source": "none" since extracting them would require actually executing the pipeline.
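A CI comparison against a contract can be as small as a per-column diff. The sketch below assumes the contract has already been loaded into a dict (for example with yaml.safe_load) and has a "columns" list with name, type and nullable keys; that contract shape and the helper diff_against_contract are hypothetical, so adapt them to your own contract format.

```python
# Hypothetical contract, shaped as if loaded from a YAML contract file.
CONTRACT = {
    "table": "bronze_raw_materials",
    "columns": [
        {"name": "material_id", "type": "string", "nullable": False},
        {"name": "material_name", "type": "string", "nullable": True},
        {"name": "unit_price", "type": "double", "nullable": True},
    ],
}


def diff_against_contract(table: dict, contract: dict) -> list[str]:
    """Return readable differences between one extracted table and its contract."""
    name = table["name"]
    if table.get("schema_source") == "none":
        # No declared schema: nothing to compare without running the pipeline.
        return [f"{name}: no explicit schema=, cannot validate statically"]
    actual = {c["name"]: c for c in table.get("columns", [])}
    expected = {c["name"]: c for c in contract["columns"]}
    problems: list[str] = []
    for col, exp in expected.items():
        act = actual.get(col)
        if act is None:
            problems.append(f"{name}.{col}: missing")
            continue
        for key in ("type", "nullable"):
            if act[key] != exp[key]:
                problems.append(f"{name}.{col}: {key} is {act[key]!r}, contract says {exp[key]!r}")
    for col in sorted(actual.keys() - expected.keys()):
        problems.append(f"{name}.{col}: not in contract")
    return problems


# One table from the extractor output (same columns as the example output).
extracted = {
    "name": "bronze_raw_materials",
    "schema_source": "struct_type",
    "columns": [
        {"name": "material_id", "nullable": False, "type": "string"},
        {"name": "material_name", "nullable": True, "type": "string"},
        {"name": "unit_price", "nullable": True, "type": "double"},
    ],
}
print(diff_against_contract(extracted, CONTRACT))  # → []
```

In a CI job, a non-empty result for any table would typically fail the build.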

Usage

Prerequisites

# Python 3.13+, uv
uv sync

For DDL string parsing, Java is also required (any JDK 17+).

Extract schemas

# StructType schemas only (no Spark/Java needed)
uv run python schema_extractor.py example_pipeline.py

# Also parse DDL string schemas (starts a local SparkSession)
uv run python schema_extractor.py --parse-ddl example_pipeline.py

# Write to file instead of stdout
uv run python schema_extractor.py --parse-ddl example_pipeline.py -o output.schema.json

Example output

See example_output.json for the full output. Abbreviated:

{
  "pipeline_tables": [
    {
      "name": "bronze_raw_materials",
      "kind": "materialized_view",
      "comment": "Raw materials master data from ERP",
      "schema_source": "struct_type",
      "columns": [
        { "name": "material_id", "nullable": false, "type": "string" },
        { "name": "material_name", "nullable": true, "type": "string" },
        { "name": "unit_price", "nullable": true, "type": "double" }
      ]
    },
    {
      "name": "silver_quality_checks",
      "kind": "streaming_table",
      "schema_source": "struct_type",
      "columns": ["..."],
      "partition_cols": ["check_type"],
      "cluster_by": ["material_id"]
    },
    {
      "name": "gold_material_summary",
      "kind": "materialized_view",
      "schema_source": "none"
    }
  ],
  "pipeline_flows": [
    { "name": "bronze_raw_materials", "target": "bronze_raw_materials" },
    { "name": "silver_quality_checks", "target": "silver_quality_checks" }
  ]
}
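Since schema_source distinguishes declared schemas from runtime inference, the output can also drive a simple coverage report. This is a sketch with a hypothetical helper, schema_coverage, run against the abbreviated output above trimmed to the keys it reads.

```python
import json

# Abbreviated extractor output from above, trimmed to the keys this check reads.
OUTPUT = json.loads("""
{
  "pipeline_tables": [
    {"name": "bronze_raw_materials", "kind": "materialized_view", "schema_source": "struct_type"},
    {"name": "silver_quality_checks", "kind": "streaming_table", "schema_source": "struct_type"},
    {"name": "gold_material_summary", "kind": "materialized_view", "schema_source": "none"}
  ]
}
""")


def schema_coverage(output: dict) -> tuple[list[str], list[str]]:
    """Split table names into (declared schema, inferred at runtime)."""
    declared: list[str] = []
    inferred: list[str] = []
    for table in output.get("pipeline_tables", []):
        if table.get("schema_source") == "none":
            inferred.append(table["name"])
        else:
            declared.append(table["name"])
    return declared, inferred


declared, inferred = schema_coverage(OUTPUT)
print(f"declared: {declared}")
print(f"inferred at runtime: {inferred}")
```

Against a real run, the dict would come from json.load on the file written with -o, and a CI step could fail when the second list is non-empty.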

Nested types (arrays of structs, maps) are fully expanded in the output:

{
  "name": "ingredients",
  "nullable": true,
  "type": "array<struct<material_id:string,percentage:double,material_name:string>>",
  "element": {
    "type": "struct<material_id:string,percentage:double,material_name:string>",
    "fields": [
      { "name": "material_id", "nullable": false, "type": "string" },
      { "name": "percentage", "nullable": false, "type": "double" },
      { "name": "material_name", "nullable": true, "type": "string" }
    ]
  }
}
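Contracts often list nested fields by path, so flattening the nested JSON can make comparisons easier. A minimal sketch, assuming array element types sit under "element" (as in the sample above) and struct fields under "fields"; the "[]" path notation and the helper names are hypothetical, and map columns are not handled because their JSON layout is not shown here.

```python
# Assumed shapes: struct fields under "fields", array element types under "element".
# Map columns are not handled in this sketch.


def _nested_rows(node: dict, path: str) -> list[tuple[str, str, bool]]:
    rows: list[tuple[str, str, bool]] = []
    if "fields" in node:  # struct: recurse into its named fields
        rows.extend(flatten_columns(node["fields"], prefix=f"{path}."))
    if "element" in node:  # array: mark the element with [] and keep descending
        rows.extend(_nested_rows(node["element"], f"{path}[]"))
    return rows


def flatten_columns(columns: list[dict], prefix: str = "") -> list[tuple[str, str, bool]]:
    """Flatten extractor column JSON into (path, type, nullable) rows."""
    rows: list[tuple[str, str, bool]] = []
    for col in columns:
        path = f"{prefix}{col['name']}"
        rows.append((path, col["type"], col.get("nullable", True)))
        rows.extend(_nested_rows(col, path))
    return rows


ingredients = {
    "name": "ingredients",
    "nullable": True,
    "type": "array<struct<material_id:string,percentage:double,material_name:string>>",
    "element": {
        "type": "struct<material_id:string,percentage:double,material_name:string>",
        "fields": [
            {"name": "material_id", "nullable": False, "type": "string"},
            {"name": "percentage", "nullable": False, "type": "double"},
            {"name": "material_name", "nullable": True, "type": "string"},
        ],
    },
}

for row in flatten_columns([ingredients]):
    print(row)
```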

What gets extracted

| Field | Source |
|---|---|
| Table name and kind (materialized_view, streaming_table, temporary_view, sink) | Decorator type |
| Comment | comment= parameter |
| Column schema (name, type, nullable, nested structure) | schema= parameter |
| Partition columns | partition_cols= parameter |
| Cluster columns | cluster_by= parameter |
| Table properties | table_properties= parameter |
| Format | format= parameter |
| Flow graph (which flow writes to which table) | Decorator registration |

Schema declaration styles

SDP supports two ways to declare schemas on a decorator:

StructType (Python object) -- serialized directly, no Spark needed:

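from pyspark.pipelines import materialized_view  # SDP decorator
from pyspark.sql.types import DoubleType, StringType, StructField, StructType

@materialized_view(
    name="my_table",
    schema=StructType([
        StructField("id", StringType(), nullable=False),
        StructField("value", DoubleType(), nullable=True),
    ]),
)
def my_table():
    ...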

DDL string -- requires a local SparkSession to parse via StructType.fromDDL():

from pyspark.pipelines import materialized_view  # SDP decorator

@materialized_view(
    name="my_table",
    schema="id STRING NOT NULL, value DOUBLE",
)
def my_table():
    ...
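Going the other way, from extracted columns back to a DDL string, can help when reviewing a StructType-declared table in the compact DDL form. This is a sketch with a hypothetical helper, to_ddl, and it has limits: simple type names are upper-cased, nested types are passed through in the type string form the extractor emits (which Spark's DDL parser accepts), nullability of nested fields is not expressed, and column comments or defaults are ignored.

```python
import re

_SIMPLE_IDENT = re.compile(r"[A-Za-z_][A-Za-z0-9_]*")


def to_ddl(columns: list[dict]) -> str:
    """Render extractor column JSON as a Spark-style DDL column list."""
    parts: list[str] = []
    for col in columns:
        name = col["name"]
        if not _SIMPLE_IDENT.fullmatch(name):
            # Quote names with spaces or special characters, escaping backticks.
            name = "`" + name.replace("`", "``") + "`"
        col_type = col["type"]
        if "<" not in col_type:
            # Only upper-case simple types; nested type strings contain field names.
            col_type = col_type.upper()
        part = f"{name} {col_type}"
        if not col.get("nullable", True):
            part += " NOT NULL"
        parts.append(part)
    return ", ".join(parts)


columns = [
    {"name": "id", "nullable": False, "type": "string"},
    {"name": "value", "nullable": True, "type": "double"},
]
print(to_ddl(columns))  # → id STRING NOT NULL, value DOUBLE
```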

Development

uv sync                        # install dependencies
uv run ruff check .            # lint
uv run ruff format --check .   # format check
uv run ty check                # type check

Files

| File | Purpose |
|---|---|
| schema_extractor.py | Schema extraction tool (CLI + library) |
| example_pipeline.py | Sample pipeline demonstrating all schema styles |
| example_output.json | Reference output from running the extractor |
