Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
45 changes: 23 additions & 22 deletions docs/source/dataflow_spec_ref_cdc.rst
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ The ``cdcSettings`` and ``cdcSnapshotSettings`` enable and pass configuration in
- See :ref:`cdcSnapshotSettings` for more information.

cdcSettings
~~~~~~~~~~~~~~~~~
~~~~~~~~~~~~~~~~~

The ``cdcSettings`` object contains the following properties:

Expand All @@ -36,7 +36,7 @@ The ``cdcSettings`` object contains the following properties:
- The column name specifying the logical order of CDC events in the source data. Delta Live Tables uses this sequencing to handle change events that arrive out of order.
* - **scd_type**
- ``string``
- Whether to store records as SCD type 1 or SCD type 2. Set to ``1`` for SCD type 1 or ``2`` for SCD type 2.
- Whether to store records as SCD type 1 or SCD type 2. Set to ``1`` for SCD type 1 or ``2`` for SCD type 2.
* - **apply_as_deletes**
- ``string``
- (*optional*) Specifies when a CDC event should be treated as a DELETE rather than an upsert.
Expand Down Expand Up @@ -147,6 +147,27 @@ CDC Historical Snapshot Source Configuration
.. note::
If ``recursiveFileLookup`` is set to ``true``, ensure that the ``path`` parameter is compatible with recursive directory traversal. When using the ``{version}`` placeholder, place it in the directory portion of the path rather than the filename (e.g. ``/data/{version}/file.parquet``). When using regex named capture groups, the pattern spans the full relative path from the first dynamic segment, so ``recursiveFileLookup`` must be ``true`` if the version spans multiple directory levels.

The ``source`` object contains the following properties for ``table`` based sources:

.. list-table::
:header-rows: 1

* - Parameter
- Type
- Description
* - **table**
- ``string``
- The table name to load the source data from, as either a 2-part ``schema.table`` (resolving to *<pipeline_target>*.schema.table) or 3-part ``catalog.schema.table`` identifier.
* - **versionColumn**
- ``string``
- The column name to use for versioning.
* - **startingVersion**
- ``string`` or ``integer``
- (*optional*) The version to start processing from.
* - **selectExp**
- ``list``
- (*optional*) A list of select expressions to apply to the source data.

.. _file-path-patterns:

File Path Patterns
Expand Down Expand Up @@ -235,23 +256,3 @@ File Path Patterns

See ``samples/bronze_sample/src/dataflows/feature_samples/dataflowspec/historical_snapshot_files_datetime_recursive_and_partitioned_regex_main.json`` for a complete working example.

The ``source`` object contains the following properties for ``table`` based sources:

.. list-table::
:header-rows: 1

* - Parameter
- Type
- Description
* - **table**
- ``string``
- The table name to load the source data from.
* - **versionColumn**
- ``string``
- The column name to use for versioning.
* - **startingVersion**
- ``string`` or ``integer``
- (*optional*) The version to start processing from.
* - **selectExp**
- ``list``
- (*optional*) A list of select expressions to apply to the source data.
69 changes: 38 additions & 31 deletions src/dataflow/cdc_snapshot.py
Original file line number Diff line number Diff line change
Expand Up @@ -176,7 +176,7 @@ def is_file_source(self) -> bool:

class CDCSnapshotFlow:
"""A class to create a CDC Snapshot flow."""

def __init__(self, settings: CDCSnapshotSettings):
self.settings = settings
self.logger = pipeline_config.get_logger()
Expand Down Expand Up @@ -210,7 +210,7 @@ def sorted_versions(self) -> List[VersionInfo]:
if self._sorted_versions is None and self._available_versions:
self._sorted_versions = sorted(self._available_versions, key=lambda x: x.raw_value)
return self._sorted_versions or []

@property
def version_values(self) -> List[Union[int, datetime]]:
"""Get version values."""
Expand Down Expand Up @@ -239,7 +239,7 @@ def create(
flow_name: Optional[str] = None # TODO: Add flow name
) -> None:
"""Create CDC from snapshot flow.

Args:
dataflow_config: DataFlow configuration
target_table: Name of the target table
Expand Down Expand Up @@ -318,35 +318,35 @@ def _get_available_versions(self, latest_snapshot_version: Optional[Union[int, d
return self._get_available_table_versions(latest_snapshot_version)
else:
raise ValueError(f"Unsupported source type: {self.sourceType}")

def _list_files(self, path, recursive=True):
    """Enumerate directory entries under ``path`` via dbutils.

    Traverses depth-first in pre-order: each entry is collected before its
    children, so the relative ordering matches ``dbutils.fs().ls()`` output
    at every level.

    Args:
        path: Directory path whose entries are listed.
        recursive: When True, descend into subdirectories as well; when
            False, return only the entries of the immediate directory.

    Returns:
        List of file objects as produced by ``dbutils.fs().ls()``.
    """
    dbutils = pipeline_config.get_dbutils()
    collected = []

    for entry in dbutils.fs().ls(path):
        collected.append(entry)
        # Directories are expanded in place, keeping pre-order traversal.
        if recursive and entry.isDir():
            collected.extend(self._list_files(entry.path, recursive=True))

    return collected

def _path_to_regex_pattern(self, path: str) -> str:
"""Convert path to normalized regex pattern with named groups.

Curly-brace syntax is converted to named capture groups:
- {version} -> (?P<version_main>.+) (single capture group for version)
- {fragment} -> (?P<fragment>.*?) (single capture group for fragment)

If path already contains regex named groups (?P<version_ or (?P<fragment>,
it is returned as-is (already a regex pattern).
"""
Expand Down Expand Up @@ -403,9 +403,9 @@ def _get_available_file_versions(self, latest_snapshot_version: Optional[Union[i
self.logger.debug(f"CDC Snapshot: Using recursive file lookup: {recursive_file_lookup}")
files_list = self._list_files(parent_dir, recursive=recursive_file_lookup)
files_with_path_info = [FilePathInfo(full_path=f.path, filename_with_version_path='/'.join(f.path.split('/')[dynamic_idx:])) for f in files_list]

self.logger.debug(f"CDC Snapshot: Found {len(files_with_path_info)} files")

# Extract version from filename and filter by latest_snapshot_version if provided
available_versions = []
for file in files_with_path_info:
Expand All @@ -432,14 +432,14 @@ def _get_available_file_versions(self, latest_snapshot_version: Optional[Union[i
except ValueError as e:
self.logger.warning(f"CDC Snapshot: Skipping file '{file.filename_with_version_path}' - {e}")
continue

return available_versions

def _get_available_table_versions(self, latest_snapshot_version: Optional[Union[int, datetime]]) -> List[VersionInfo]:
"""Get list of available versions from table."""
spark = pipeline_config.get_spark()
table_name = self.source.table

self.logger.info(f"CDC Snapshot: Getting versions from table: {table_name}")
try:
df = spark.table(table_name)
Expand All @@ -449,7 +449,7 @@ def _get_available_table_versions(self, latest_snapshot_version: Optional[Union[

# Get the version column
version_column = self.source.versionColumn

# Check if the version column is a valid data type
valid_data_types = ["timestamp", "date", "integer","long"]
version_column_type = df.schema[version_column].dataType.typeName()
Expand All @@ -460,24 +460,24 @@ def _get_available_table_versions(self, latest_snapshot_version: Optional[Union[

# Get the version values and filter by latest_snapshot_version if provided
if latest_snapshot_version is not None:
latest_version_info = VersionInfo(
latest_version_info = VersionInfo(
raw_value=latest_snapshot_version,
version_type=self.source.versionType)
version_df = df.select(version_column).where(f"{version_column} > {latest_version_info.sql_formatted_value}").distinct()
else:
version_df = df.select(version_column).distinct()

available_versions = []
for row in version_df.collect():
version = row[version_column]

if version is None:
continue

if self.source.startingVersion is not None and version < self.source.startingVersion:
self.logger.debug(f"CDC Snapshot: Skipping version {version} because it is less than the starting version {self.source.startingVersion}")
continue

if latest_snapshot_version is not None and version <= latest_snapshot_version:
self.logger.debug(f"CDC Snapshot: Skipping version {version} because it is less than or equal to the latest snapshot version {latest_snapshot_version}")
continue
Expand All @@ -487,16 +487,16 @@ def _get_available_table_versions(self, latest_snapshot_version: Optional[Union[
version_type=self.source.versionType,
datetime_format=None
)

available_versions.append(version_info)

self.logger.debug(f"CDC Snapshot: Found {len(available_versions)} available versions")

return available_versions

def _extract_version_from_filename(self, filename: str, file_pattern: str) -> Optional[VersionInfo]:
"""Extract version from filename using pattern (regex or curly-brace; normalized to regex).

Version is taken from all named groups starting with version_, concatenated in name order.
Curly-brace {version} is converted to (?P<version_main>.+); {fragment} to (?P<fragment>.*?).
"""
Expand All @@ -521,7 +521,7 @@ def _extract_version_from_filename(self, filename: str, file_pattern: str) -> Op
raw_value = datetime.strptime(version_str, self.source.datetimeFormat)
else:
raw_value = int(version_str)

return VersionInfo(
raw_value=raw_value,
version_type=self.source.versionType,
Expand Down Expand Up @@ -617,23 +617,30 @@ def _read_snapshot_dataframe(self, version_info: VersionInfo, dataflow_config: D

elif self.sourceType == CDCSnapshotSourceTypes.TABLE:
table_parts = self.source.table.split(".")
if len(table_parts) < 2:
raise ValueError(f"Invalid table name format: {self.source.table}. Expected format: database.schema.table")

if not 1 < len(table_parts) < 4:
raise ValueError(f"Invalid table name format: {self.source.table}. Accepted formats are: schema.table & database.schema.table")

if len(table_parts) == 3:
database = f"{table_parts[0]}.{table_parts[1]}"
else:
# set to the specified target catalog by default
_pipeline_details = pipeline_config.get_pipeline_details()
database = f"{_pipeline_details.pipeline_catalog}.{table_parts[0]}"

table = table_parts[-1]
database = f"{table_parts[0]}.{table_parts[1]}"

select_exp = self.source.selectExp
where_clause = [
f"{self.source.versionColumn} = {version_info.sql_formatted_value}"]

self.logger.info(f"CDC Snapshot: Reading table: {database}.{table} with where clause: {where_clause}")
df = SourceDelta(
database=database,
table=table,
whereClause=where_clause,
selectExp=select_exp
).read_source(read_config)

if self.source.deduplicateMode == DeduplicateMode.KEYS_ONLY:
df = self._deduplicate_by_keys(df)
self.logger.debug("CDC Snapshot: Applied deduplication by keys")
Expand Down