diff --git a/docs/source/dataflow_spec_ref_cdc.rst b/docs/source/dataflow_spec_ref_cdc.rst index 2b2dad1..91c8b93 100644 --- a/docs/source/dataflow_spec_ref_cdc.rst +++ b/docs/source/dataflow_spec_ref_cdc.rst @@ -18,7 +18,7 @@ The ``cdcSettings`` and ``cdcSnapshotSettings`` enable and pass configuration in - See :ref:`cdcSnapshotSettings` for more information. cdcSettings -~~~~~~~~~~~~~~~~~ +~~~~~~~~~~~~~~~~~ The ``cdcSettings`` object contains the following properties: @@ -36,7 +36,7 @@ The ``cdcSettings`` object contains the following properties: - The column name specifying the logical order of CDC events in the source data. Delta Live Tables uses this sequencing to handle change events that arrive out of order. * - **scd_type** - ``string`` - - Whether to store records as SCD type 1 or SCD type 2. Set to ``1`` for SCD type 1 or 2 for SCD type ``2``. + - Whether to store records as SCD type 1 or SCD type 2. Set to ``1`` for SCD type 1 or ``2`` for SCD type 2. * - **apply_as_deletes** - ``string`` - (*optional*) Specifies when a CDC event should be treated as a DELETE rather than an upsert. @@ -147,6 +147,27 @@ CDC Historical Snapshot Source Configuration .. note:: If ``recursiveFileLookup`` is set to ``true``, ensure that the ``path`` parameter is compatible with recursive directory traversal. When using the ``{version}`` placeholder, place it in the directory portion of the path rather than the filename (e.g. ``/data/{version}/file.parquet``). When using regex named capture groups, the pattern spans the full relative path from the first dynamic segment, so ``recursiveFileLookup`` must be ``true`` if the version spans multiple directory levels. + The ``source`` object contains the following properties for ``table`` based sources: + + .. 
list-table:: + :header-rows: 1 + + * - Parameter + - Type + - Description + * - **table** + - ``string`` + - The table name to load the source data from, as either a 2-part ``schema.table`` (resolving to ``<pipeline_catalog>.schema.table``, where ``<pipeline_catalog>`` is the pipeline's target catalog) or 3-part ``catalog.schema.table`` identifier. + * - **versionColumn** + - ``string`` + - The column name to use for versioning. + * - **startingVersion** + - ``string`` or ``integer`` + - (*optional*) The version to start processing from. + * - **selectExp** + - ``list`` + - (*optional*) A list of select expressions to apply to the source data. + .. _file-path-patterns: File Path Patterns @@ -235,23 +256,3 @@ File Path Patterns See ``samples/bronze_sample/src/dataflows/feature_samples/dataflowspec/historical_snapshot_files_datetime_recursive_and_partitioned_regex_main.json`` for a complete working example. - The ``source`` object contains the following properties for ``table`` based sources: - - .. list-table:: - :header-rows: 1 - - * - Parameter - - Type - - Description - * - **table** - - ``string`` - - The table name to load the source data from. - * - **versionColumn** - - ``string`` - - The column name to use for versioning. - * - **startingVersion** - - ``string`` or ``integer`` - - (*optional*) The version to start processing from. - * - **selectExp** - - ``list`` - - (*optional*) A list of select expressions to apply to the source data. 
diff --git a/src/dataflow/cdc_snapshot.py b/src/dataflow/cdc_snapshot.py index 1e9df6a..6a59234 100644 --- a/src/dataflow/cdc_snapshot.py +++ b/src/dataflow/cdc_snapshot.py @@ -176,7 +176,7 @@ def is_file_source(self) -> bool: class CDCSnapshotFlow: """A class to create a CDC Snapshot flow.""" - + def __init__(self, settings: CDCSnapshotSettings): self.settings = settings self.logger = pipeline_config.get_logger() @@ -210,7 +210,7 @@ def sorted_versions(self) -> List[VersionInfo]: if self._sorted_versions is None and self._available_versions: self._sorted_versions = sorted(self._available_versions, key=lambda x: x.raw_value) return self._sorted_versions or [] - + @property def version_values(self) -> List[Union[int, datetime]]: """Get version values.""" @@ -239,7 +239,7 @@ def create( flow_name: Optional[str] = None # TODO: Add flow name ) -> None: """Create CDC from snapshot flow. - + Args: dataflow_config: DataFlow configuration target_table: Name of the target table @@ -318,35 +318,35 @@ def _get_available_versions(self, latest_snapshot_version: Optional[Union[int, d return self._get_available_table_versions(latest_snapshot_version) else: raise ValueError(f"Unsupported source type: {self.sourceType}") - + def _list_files(self, path, recursive=True): """List files in a directory, with optional recursive file lookup. - + Args: path: Directory path to list files from recursive: If True, list files recursively. If False, list only files in the immediate directory. - + Returns: List of file objects from dbutils.fs.ls() """ dbutils = pipeline_config.get_dbutils() all_files = [] - + for f in dbutils.fs().ls(path): all_files.append(f) if recursive and f.isDir(): all_files.extend(self._list_files(f.path, recursive=True)) - + return all_files def _path_to_regex_pattern(self, path: str) -> str: """Convert path to normalized regex pattern with named groups. 
- + Curly-brace syntax is converted to named capture groups: - {version} -> (?P.+) (single capture group for version) - {fragment} -> (?P.*?) (single capture group for fragment) - + If path already contains regex named groups (?P, it is returned as-is (already a regex pattern). """ @@ -403,9 +403,9 @@ def _get_available_file_versions(self, latest_snapshot_version: Optional[Union[i self.logger.debug(f"CDC Snapshot: Using recursive file lookup: {recursive_file_lookup}") files_list = self._list_files(parent_dir, recursive=recursive_file_lookup) files_with_path_info = [FilePathInfo(full_path=f.path, filename_with_version_path='/'.join(f.path.split('/')[dynamic_idx:])) for f in files_list] - + self.logger.debug(f"CDC Snapshot: Found {len(files_with_path_info)} files") - + # Extract version from filename and filter by latest_snapshot_version if provided available_versions = [] for file in files_with_path_info: @@ -432,14 +432,14 @@ def _get_available_file_versions(self, latest_snapshot_version: Optional[Union[i except ValueError as e: self.logger.warning(f"CDC Snapshot: Skipping file '{file.filename_with_version_path}' - {e}") continue - + return available_versions def _get_available_table_versions(self, latest_snapshot_version: Optional[Union[int, datetime]]) -> List[VersionInfo]: """Get list of available versions from table.""" spark = pipeline_config.get_spark() table_name = self.source.table - + self.logger.info(f"CDC Snapshot: Getting versions from table: {table_name}") try: df = spark.table(table_name) @@ -449,7 +449,7 @@ def _get_available_table_versions(self, latest_snapshot_version: Optional[Union[ # Get the version column version_column = self.source.versionColumn - + # Check if the version column is a valid data type valid_data_types = ["timestamp", "date", "integer","long"] version_column_type = df.schema[version_column].dataType.typeName() @@ -460,24 +460,24 @@ def _get_available_table_versions(self, latest_snapshot_version: Optional[Union[ # Get the version 
values and filter by latest_snapshot_version if provided if latest_snapshot_version is not None: - latest_version_info = VersionInfo( + latest_version_info = VersionInfo( raw_value=latest_snapshot_version, version_type=self.source.versionType) version_df = df.select(version_column).where(f"{version_column} > {latest_version_info.sql_formatted_value}").distinct() else: version_df = df.select(version_column).distinct() - + available_versions = [] for row in version_df.collect(): version = row[version_column] - + if version is None: continue - + if self.source.startingVersion is not None and version < self.source.startingVersion: self.logger.debug(f"CDC Snapshot: Skipping version {version} because it is less than the starting version {self.source.startingVersion}") continue - + if latest_snapshot_version is not None and version <= latest_snapshot_version: self.logger.debug(f"CDC Snapshot: Skipping version {version} because it is less than or equal to the latest snapshot version {latest_snapshot_version}") continue @@ -487,16 +487,16 @@ def _get_available_table_versions(self, latest_snapshot_version: Optional[Union[ version_type=self.source.versionType, datetime_format=None ) - + available_versions.append(version_info) - + self.logger.debug(f"CDC Snapshot: Found {len(available_versions)} available versions") - + return available_versions def _extract_version_from_filename(self, filename: str, file_pattern: str) -> Optional[VersionInfo]: """Extract version from filename using pattern (regex or curly-brace; normalized to regex). - + Version is taken from all named groups starting with version_, concatenated in name order. Curly-brace {version} is converted to (?P.+); {fragment} to (?P.*?). 
""" @@ -521,7 +521,7 @@ def _extract_version_from_filename(self, filename: str, file_pattern: str) -> Op raw_value = datetime.strptime(version_str, self.source.datetimeFormat) else: raw_value = int(version_str) - + return VersionInfo( raw_value=raw_value, version_type=self.source.versionType, @@ -617,15 +617,22 @@ def _read_snapshot_dataframe(self, version_info: VersionInfo, dataflow_config: D elif self.sourceType == CDCSnapshotSourceTypes.TABLE: table_parts = self.source.table.split(".") - if len(table_parts) < 2: - raise ValueError(f"Invalid table name format: {self.source.table}. Expected format: database.schema.table") - + if not 1 < len(table_parts) < 4: + raise ValueError(f"Invalid table name format: {self.source.table}. Accepted formats are: schema.table & database.schema.table") + + if len(table_parts) == 3: + database = f"{table_parts[0]}.{table_parts[1]}" + else: + # set to the specified target catalog by default + _pipeline_details = pipeline_config.get_pipeline_details() + database = f"{_pipeline_details.pipeline_catalog}.{table_parts[0]}" + table = table_parts[-1] - database = f"{table_parts[0]}.{table_parts[1]}" + select_exp = self.source.selectExp where_clause = [ f"{self.source.versionColumn} = {version_info.sql_formatted_value}"] - + self.logger.info(f"CDC Snapshot: Reading table: {database}.{table} with where clause: {where_clause}") df = SourceDelta( database=database, @@ -633,7 +640,7 @@ def _read_snapshot_dataframe(self, version_info: VersionInfo, dataflow_config: D whereClause=where_clause, selectExp=select_exp ).read_source(read_config) - + if self.source.deduplicateMode == DeduplicateMode.KEYS_ONLY: df = self._deduplicate_by_keys(df) self.logger.debug("CDC Snapshot: Applied deduplication by keys")