diff --git a/README.md b/README.md
index c733664..9100573 100644
--- a/README.md
+++ b/README.md
@@ -15,6 +15,11 @@ Operations are applied concurrently across multiple tables
   * OPTIMIZE with z-order on tables having specified columns
   * Detect tables having too many small files ([example notebook](examples/detect_small_files.py))
   * Visualise quantity of data written per table per period
+  * Delta housekeeping analysis ([example notebook](examples/exec_delta_housekeeping.py)) which provides:
+    * stats (size of tables and number of files, timestamps of latest OPTIMIZE & VACUUM operations, stats of OPTIMIZE)
+    * recommendations on tables that need to be OPTIMIZED/VACUUM'ed
+    * whether tables are OPTIMIZED/VACUUM'ed often enough
+    * tables that have small files / tables for which ZORDER is not being effective
 * **Governance**
   * PII detection with Presidio ([example notebook](examples/pii_detection_presidio.py))
   * Text Analysis with MosaicML and Databricks MLflow ([example notebook](examples/text_analysis_mosaicml_mlflow.py))
diff --git a/discoverx/delta_housekeeping.py b/discoverx/delta_housekeeping.py
new file mode 100644
index 0000000..03de50c
--- /dev/null
+++ b/discoverx/delta_housekeeping.py
@@ -0,0 +1,378 @@
+from typing import Iterable
+from functools import reduce
+from datetime import datetime, timezone
+import pandas as pd
+
+from discoverx.table_info import TableInfo
+
+from pyspark.sql import DataFrame, SparkSession
+from pyspark.sql.window import Window
+import pyspark.sql.types as T
+import pyspark.sql.functions as F
+
+
+class DeltaHousekeeping:
+
+    def __init__(self, spark: SparkSession) -> None:
+        self._spark = spark
+        self.empty_schema = T.StructType([
+            T.StructField("catalog", T.StringType()),
+            T.StructField("database", T.StringType()),
+            T.StructField("tableName", T.StringType()),
+        ])
+
+    @staticmethod
+    def _process_describe_history(
+        describe_detail_df: DataFrame, describe_history_df: DataFrame
+    ) -> pd.DataFrame:
+        """
+        Processes the DESCRIBE HISTORY result of potentially several tables in different schemas/catalogs.
+        Provides:
+        - table stats (size and number of files)
+        - timestamp for last & second last OPTIMIZE
+        - stats of OPTIMIZE (including ZORDER)
+        - timestamp for last & second last VACUUM
+
+        Returns a pandas DataFrame, converting the Spark internal dfs to pandas as soon as they are manageable,
+        the reason being that DESCRIBE HISTORY / DESCRIBE DETAIL cannot be cached
+
+        TODO reconsider if it is better outside of the class
+        """
+        if "operation" not in describe_history_df.columns:
+            return describe_detail_df.toPandas()
+
+        # window over operation
+        operation_order = (
+            describe_history_df
+            .filter(F.col("operation").isin(["OPTIMIZE", "VACUUM END"]))
+            .withColumn("operation_order", F.row_number().over(
+                Window.partitionBy(["catalog", "database", "tableName", "operation"]).orderBy(F.col("timestamp").desc())
+            ))
+        )
+
+        if operation_order.isEmpty():
+            return describe_detail_df.toPandas()
+
+        operation_order = operation_order.toPandas()
+
+        # max & 2nd timestamp of OPTIMIZE into output
+        out = describe_detail_df.toPandas().merge(
+            operation_order[(operation_order.operation == "OPTIMIZE") & (operation_order.operation_order == 1)]
+            .loc[:, ["catalog", "database", "tableName", "timestamp"]]
+            .rename(columns={'timestamp': 'max_optimize_timestamp'}),
+            how="outer", on=["catalog", "database", "tableName"]
+        )
+        out = out.merge(
+            operation_order[(operation_order.operation == "OPTIMIZE") & (operation_order.operation_order == 2)]
+            .loc[:, ["catalog", "database", "tableName", "timestamp"]]
+            .rename(columns={'timestamp': '2nd_optimize_timestamp'}),
+            how="outer", on=["catalog", "database", "tableName"]
+        )
+        # max timestamp of VACUUM into output
+        out = out.merge(
+            operation_order[(operation_order.operation == "VACUUM END") & (operation_order.operation_order == 1)]
+            .loc[:, ["catalog", "database", "tableName", "timestamp"]]
+            .rename(columns={'timestamp': 'max_vacuum_timestamp'}),
+            how="outer", on=["catalog", "database", "tableName"]
+        )
+        out = out.merge(
+            operation_order[(operation_order.operation == "VACUUM END") & (operation_order.operation_order == 2)]
+            .loc[:, ["catalog", "database", "tableName", "timestamp"]]
+            .rename(columns={'timestamp': '2nd_vacuum_timestamp'}),
+            how="outer", on=["catalog", "database", "tableName"]
+        )
+        # summary of table metrics
+        table_metrics_1 = (
+            operation_order[(operation_order['operation'] == 'OPTIMIZE') & (operation_order['operation_order'] == 1)]
+            .loc[:, ['catalog', 'database', 'tableName', 'min_file_size', 'p50_file_size', 'max_file_size', 'z_order_by']]
+        )
+
+        # write to output
+        out = out.merge(
+            table_metrics_1,
+            how="outer", on=["catalog", "database", "tableName"]
+        )
+
+        return out
+
+    @staticmethod
+    def save_as_table(
+        result: DataFrame,
+        housekeeping_table_name: str,
+    ):
+        """
+        Static method to store intermediate results of the scan operation into Delta.
+        Would make sense only if using map_chunked from the `DataExplorer` object
+        (otherwise tables are written one by one into Delta with overhead)
+
+        TODO create function in `DataExplorer` that uses this for a chunked scan
+        """
+        (
+            result
+            .write
+            .format("delta")
+            .mode("append")
+            .option("mergeSchema", "true")
+            .saveAsTable(housekeeping_table_name)
+        )
+
+    def get_describe_detail(self, table_info: TableInfo):
+        dd = self._spark.sql(f"""
+            DESCRIBE DETAIL {table_info.catalog}.{table_info.schema}.{table_info.table};
+        """)
+        dd = (
+            dd
+            .withColumn("split", F.split(F.col('name'), r'\.'))
+            .withColumn("catalog", F.col("split").getItem(0))
+            .withColumn("database", F.col("split").getItem(1))
+            .withColumn("tableName", F.col("split").getItem(2))
+            .select([
+                F.col("catalog"),
+                F.col("database"),
+                F.col("tableName"),
+                F.col("numFiles").alias("number_of_files"),
+                F.col("sizeInBytes").alias("bytes"),
+            ])
+        )
+        return dd
+
+    @staticmethod
+    def get_describe_history_statement(table_info: TableInfo):
+        return f"""
+            SELECT
+                '{table_info.catalog}' AS catalog,
+                '{table_info.schema}' AS database,
+                '{table_info.table}' AS tableName,
+                operation,
+                timestamp,
+                operationMetrics.minFileSize AS min_file_size,
+                operationMetrics.p50FileSize AS p50_file_size,
+                operationMetrics.maxFileSize AS max_file_size,
+                operationParameters.zOrderBy AS z_order_by
+            FROM (DESCRIBE HISTORY {table_info.catalog}.{table_info.schema}.{table_info.table})
+            WHERE operation in ('OPTIMIZE', 'VACUUM END')
+        """
+
+    def scan(
+        self,
+        table_info: TableInfo,
+    ) -> pd.DataFrame:
+        """
+        Scans a table_info to fetch Delta stats
+        - DESCRIBE DETAIL
+        - DESCRIBE HISTORY
+        """
+        try:
+            # runs a DESCRIBE DETAIL per table, catching any exception
+            dd = self.get_describe_detail(table_info)
+
+            # prepares a DESCRIBE HISTORY statement per table (will be run outside the try-catch)
+            statement = self.get_describe_history_statement(table_info)
+
+            return self._process_describe_history(
+                dd,
+                self._spark.sql(statement),
+            )
+
+        except Exception as e:
+            errors_df = self._spark.createDataFrame(
+                [(table_info.catalog or "", table_info.schema, table_info.table, str(e))],
+                ["catalog", "database", "tableName", "error"]
+            )
+            return errors_df.toPandas()
+
+
+class DeltaHousekeepingActions:
+    """
+    Processes the output of the `DeltaHousekeeping` object to provide recommendations
+    - tables that need to be OPTIMIZED/VACUUM'ed
+    - whether tables are OPTIMIZED/VACUUM'ed often enough
+    - tables that have small files / tables for which ZORDER is not being effective
+    """
+
+    def __init__(
+        self,
+        mapped_pd_dfs: Iterable[pd.DataFrame],
+        spark: SparkSession = None,
+        min_table_size_optimize: int = 128*1024*1024,  # i.e. 128 MB
+        min_days_not_optimized: int = 7,
+        min_days_not_vacuumed: int = 31,
+        max_optimize_freq: int = 2,
+        max_vacuum_freq: int = 2,
+        small_file_threshold: int = 32*1024*1024,  # i.e. 32 MB
+        min_number_of_files_for_zorder: int = 8,
+        stats: pd.DataFrame = None,  # for testability only
+    ) -> None:
+        if stats is None:
+            self._mapped_pd_dfs = mapped_pd_dfs
+            stats = pd.concat(self._mapped_pd_dfs)
+        self._stats: pd.DataFrame = stats
+
+        if spark is None:
+            spark = SparkSession.builder.getOrCreate()
+        self._spark = spark
+
+        self.min_table_size_optimize = min_table_size_optimize
+        self.min_days_not_optimized = min_days_not_optimized
+        self.min_days_not_vacuumed = min_days_not_vacuumed
+        self.max_optimize_freq = max_optimize_freq
+        self.max_vacuum_freq = max_vacuum_freq
+        self.small_file_threshold = small_file_threshold
+        self.min_number_of_files_for_zorder = min_number_of_files_for_zorder
+        self.tables_not_optimized_legend = "Tables that are never OPTIMIZED and would benefit from it"
+        self.tables_not_vacuumed_legend = "Tables that are never VACUUM'ed"
+        self.tables_not_optimized_last_days = "Tables that are not OPTIMIZED often enough"
+        self.tables_not_vacuumed_last_days = "Tables that are not VACUUM'ed often enough"
+        self.tables_optimized_too_freq = "Tables that are OPTIMIZED too often"
+        self.tables_vacuumed_too_freq = "Tables that are VACUUM'ed too often"
+        self.tables_do_not_need_optimize = "Tables that are too small to be OPTIMIZED"
+        self.tables_to_analyze = "Tables that need more analysis (small_files)"
+        self.tables_zorder_not_effective = "Tables for which ZORDER is not being effective"
+
+    def _need_optimize(self) -> pd.DataFrame:
+        stats = self._stats.copy()
+        stats = stats.loc[stats.max_optimize_timestamp.isnull() & stats.bytes.notnull()]
+        return (
+            stats.loc[(stats.bytes.astype(int) > self.min_table_size_optimize)]
+        )
+
+    def _optimize_not_needed(self) -> pd.DataFrame:
+        stats = self._stats.copy()
+        stats = stats.loc[stats.max_optimize_timestamp.isnull() & stats.bytes.notnull()]
+        return (
+            stats.loc[stats.max_optimize_timestamp.notnull() & (stats.bytes.astype(int) > self.min_table_size_optimize)]
+        )
+
+    def _not_optimized_last_days(self) -> pd.DataFrame:
+        stats = self._stats.copy()
+        stats['max_optimize_timestamp'] = pd.to_datetime(stats['max_optimize_timestamp'], utc=True)
+        stats['optimize_lag'] = (
+            datetime.now(timezone.utc) - stats['max_optimize_timestamp']
+        ).dt.days
+        return (
+            stats[stats['optimize_lag'] < self.min_days_not_optimized]
+        )
+
+    def _optimized_too_frequently(self) -> pd.DataFrame:
+        stats = self._stats.copy()
+        stats['max_optimize_timestamp'] = pd.to_datetime(stats['max_optimize_timestamp'])
+        stats['2nd_optimize_timestamp'] = pd.to_datetime(stats['2nd_optimize_timestamp'])
+        stats['optimize_lag'] = (stats['max_optimize_timestamp'] - stats['2nd_optimize_timestamp']).dt.days
+        return (
+            stats[stats['optimize_lag'] < self.max_optimize_freq]
+        )
+
+    def _never_vacuumed(self) -> pd.DataFrame:
+        stats = self._stats
+        return (
+            stats.loc[stats.max_vacuum_timestamp.isnull()]
+        )
+
+    def _not_vacuumed_last_days(self) -> pd.DataFrame:
+        stats = self._stats.copy()
+        stats['max_vacuum_timestamp'] = pd.to_datetime(stats['max_vacuum_timestamp'], utc=True)
+        stats['vacuum_lag'] = (
+            datetime.now(timezone.utc) - stats['max_vacuum_timestamp']
+        ).dt.days
+        return (
+            stats[stats['vacuum_lag'] < self.min_days_not_vacuumed]
+        )
+
+    def _vacuumed_too_frequently(self) -> pd.DataFrame:
+        stats = self._stats.copy()
+        stats['max_vacuum_timestamp'] = pd.to_datetime(stats['max_vacuum_timestamp'])
+        stats['2nd_vacuum_timestamp'] = pd.to_datetime(stats['2nd_vacuum_timestamp'])
+        stats['vacuum_lag'] = (stats['max_vacuum_timestamp'] - stats['2nd_vacuum_timestamp']).dt.days
+        return (
+            stats[stats['vacuum_lag'] < self.max_vacuum_freq]
+        )
+
+    def _analyze_these_tables(self) -> pd.DataFrame:
+        stats = self._stats.copy()
+        stats = stats.loc[stats['max_optimize_timestamp'].notnull() &
+                          stats['p50_file_size'].notnull() &
+                          (stats['number_of_files'] > 1)]
+        stats = stats.loc[(stats['p50_file_size'].astype(int) < self.small_file_threshold)]
+        return (
+            stats.sort_values(by=['database', 'tableName', 'number_of_files'], ascending=[True, True, False])
+        )
+
+    def _zorder_not_effective(self) -> pd.DataFrame:
+        stats = self._stats.copy()
+        stats = stats.loc[stats['max_optimize_timestamp'].notnull() &
+                          stats['p50_file_size'].notnull()]
+
+        # clean up z_order_by column and split into array
+        stats['z_order_by_clean'] = stats['z_order_by'].apply(
+            lambda x: None if x == "[]" else x.replace('[', '').replace(']', '').replace('"', ''))
+        stats['z_order_by_array'] = stats['z_order_by_clean'].str.split(',')
+
+        # keep rows that have ZORDER columns and fewer files than the threshold
+        stats = stats[stats['z_order_by_array'].str.len() > 0]
+        stats = stats[stats['number_of_files'].astype(int) < self.min_number_of_files_for_zorder]
+        return (
+            stats
+        )
+
+    def stats(self) -> DataFrame:
+        """Outputs the stats per table"""
+        return self._spark.createDataFrame(self._stats)
+
+    def display(self) -> None:
+        """Executes the Delta housekeeping analysis and displays a sample of results"""
+        return self.apply().display()
+
+    def apply(self) -> DataFrame:
+        """Returns the recommendations as a Spark DataFrame"""
+        out = None
+        for recomm in self.generate_recommendations():
+            for legend, df in recomm.items():
+                out_df = self._spark.createDataFrame(df).withColumn("recommendation", F.lit(legend))
+                if out is None:
+                    out = out_df
+                else:
+                    out = out.unionByName(out_df, allowMissingColumns=True)
+        return out
+
+    def generate_recommendations(self) -> Iterable[dict]:
+        """
+        Generates Delta Housekeeping recommendations as a list of dictionaries (internal use + unit tests only)
+        A dict per recommendation where:
+        - The key is the legend of the recommendation
+        - The value is a pandas df with the affected tables
+        """
+        out = []
+        for df, legend in zip([
+            self._need_optimize(),
+            self._never_vacuumed(),
+            self._not_optimized_last_days(),
+            self._not_vacuumed_last_days(),
+            self._optimized_too_frequently(),
+            self._vacuumed_too_frequently(),
+            self._optimize_not_needed(),
+            self._analyze_these_tables(),
+            self._zorder_not_effective(),
+        ], [
+            self.tables_not_optimized_legend,
+            self.tables_not_vacuumed_legend,
+            self.tables_not_optimized_last_days,
+            self.tables_not_vacuumed_last_days,
+            self.tables_optimized_too_freq,
+            self.tables_vacuumed_too_freq,
+            self.tables_do_not_need_optimize,
+            self.tables_to_analyze,
+            self.tables_zorder_not_effective,
+        ]):
+            if not df.empty:
+                out.append({legend: df})
+        return out
+
+    def explain(self) -> None:
+        # TODO better formatting!
+        from databricks.sdk.runtime import display
+
+        for recomm in self.generate_recommendations():
+            for legend, df in recomm.items():
+                display(legend)
+                display(df)
diff --git a/discoverx/explorer.py b/discoverx/explorer.py
index 46f5a14..ed9ffc0 100644
--- a/discoverx/explorer.py
+++ b/discoverx/explorer.py
@@ -1,7 +1,8 @@
 import concurrent.futures
 import copy
 import re
-from typing import Optional, List
+import pandas as pd
+from typing import Optional, List, Callable, Iterable
 from discoverx import logging
 from discoverx.common import helper
 from discoverx.discovery import Discovery
@@ -11,6 +12,7 @@
 from pyspark.sql import DataFrame, SparkSession
 from pyspark.sql.functions import lit
 from discoverx.table_info import InfoFetcher, TableInfo
+from discoverx.delta_housekeeping import DeltaHousekeeping, DeltaHousekeepingActions
 
 logger = logging.Logging()
 
@@ -147,7 +149,7 @@ def scan(
         discover.scan(rules=rules, sample_size=sample_size, what_if=what_if)
         return discover
 
-    def map(self, f) -> list[any]:
+    def map(self, f: Callable) -> list[any]:
         """Runs a function for each table in the data explorer
 
         Args:
@@ -178,6 +180,14 @@ def map(self, f) -> list[any]:
 
         return res
 
+    def delta_housekeeping(self) -> DeltaHousekeepingActions:
+        """
+        Gathers stats and recommendations on Delta Housekeeping
+        """
+        dh = DeltaHousekeeping(self._spark)
+        dfs_pd: Iterable[pd.DataFrame] = self.map(dh.scan)
+        return DeltaHousekeepingActions(dfs_pd, spark=self._spark)
+
 
 class DataExplorerActions:
     def __init__(
diff --git a/examples/exec_delta_housekeeping.py b/examples/exec_delta_housekeeping.py
new file mode 100644
index 0000000..504f0cb
--- /dev/null
+++ b/examples/exec_delta_housekeeping.py
@@ -0,0 +1,47 @@
+# Databricks notebook source
+# MAGIC %md
+# MAGIC # Run Delta Housekeeping across multiple tables
+# MAGIC Analysis that provides stats on Delta tables / recommendations for improvements, including:
+# MAGIC - stats: size of tables and number of files, timestamps of latest OPTIMIZE & VACUUM operations, stats of OPTIMIZE
+# MAGIC - recommendations on tables that need to be OPTIMIZED/VACUUM'ed
+# MAGIC - whether tables are OPTIMIZED/VACUUM'ed often enough
+# MAGIC - tables that have small files / tables for which ZORDER is not being effective
+# MAGIC
+
+# COMMAND ----------
+
+# MAGIC %pip install dbl-discoverx
+
+# COMMAND ----------
+
+from discoverx import DX
+
+dx = DX()
+
+# COMMAND ----------
+
+# DBTITLE 1,Run the discoverx DeltaHousekeeping operation - generates an output object you can apply operations to
+output = (
+    dx.from_tables("lorenzorubi.*.*")
+    .delta_housekeeping()
+)
+
+# COMMAND ----------
+
+# DBTITLE 1,apply() operation generates a Spark DataFrame with recommendations
+result = output.apply()
+result.select("catalog", "database", "tableName", "recommendation").display()
+
+# COMMAND ----------
+
+# DBTITLE 1,display() runs apply and displays the full result (including stats per table)
+output.display()
+
+# COMMAND ----------
+
+# DBTITLE 1,explain() outputs the DeltaHousekeeping recommendations
+output.explain()
+
+# COMMAND ----------
+
+
diff --git a/tests/unit/data/delta_housekeeping/dd_click_sales.csv b/tests/unit/data/delta_housekeeping/dd_click_sales.csv
new file mode 100644
index 0000000..4ffb5a3
--- /dev/null
+++ b/tests/unit/data/delta_housekeeping/dd_click_sales.csv
@@ -0,0 +1,2 @@
+catalog,database,tableName,number_of_files,bytes
+lorenzorubi,default,click_sales,6,326068799
diff --git a/tests/unit/data/delta_housekeeping/dd_housekeeping_summary.csv b/tests/unit/data/delta_housekeeping/dd_housekeeping_summary.csv
new file mode 100644
index 0000000..70fc5a1
--- /dev/null
+++ b/tests/unit/data/delta_housekeeping/dd_housekeeping_summary.csv
@@ -0,0 +1,2 @@
+catalog,database,tableName,number_of_files,bytes
+lorenzorubi,default,housekeeping_summary,1,192917
diff --git a/tests/unit/data/delta_housekeeping/dh_click_sales.csv b/tests/unit/data/delta_housekeeping/dh_click_sales.csv
new file mode 100644
index 0000000..f35d6b6
--- /dev/null
+++ b/tests/unit/data/delta_housekeeping/dh_click_sales.csv
@@ -0,0 +1,4 @@
+catalog,database,tableName,operation,timestamp,min_file_size,p50_file_size,max_file_size,z_order_by
+lorenzorubi,default,click_sales,VACUUM END,2023-12-06T16:40:28Z,null,null,null,null
+lorenzorubi,default,click_sales,VACUUM END,2023-12-05T01:19:47Z,null,null,null,null
+lorenzorubi,default,click_sales,VACUUM END,2023-11-25T04:03:41Z,null,null,null,null
diff --git a/tests/unit/data/delta_housekeeping/dh_housekeeping_summary.csv b/tests/unit/data/delta_housekeeping/dh_housekeeping_summary.csv
new file mode 100644
index 0000000..d1ee36e
--- /dev/null
+++ b/tests/unit/data/delta_housekeeping/dh_housekeeping_summary.csv
@@ -0,0 +1,25 @@
+catalog,database,tableName,operation,timestamp,min_file_size,p50_file_size,max_file_size,z_order_by
+lorenzorubi,default,housekeeping_summary,OPTIMIZE,2023-12-05T05:50:14Z,192917,192917,192917,[]
+lorenzorubi,default,housekeeping_summary,OPTIMIZE,2023-12-05T05:21:22Z,184203,184203,184203,[]
+lorenzorubi,default,housekeeping_summary,OPTIMIZE,2023-12-05T04:37:19Z,176955,176955,176955,[]
+lorenzorubi,default,housekeeping_summary,OPTIMIZE,2023-12-05T04:10:26Z,168560,168560,168560,[]
+lorenzorubi,default,housekeeping_summary,OPTIMIZE,2023-12-05T03:11:02Z,161710,161710,161710,[]
+lorenzorubi,default,housekeeping_summary,OPTIMIZE,2023-12-05T02:44:41Z,154166,154166,154166,[]
+lorenzorubi,default,housekeeping_summary,OPTIMIZE,2023-12-05T02:18:54Z,145990,145990,145990,[]
+lorenzorubi,default,housekeeping_summary,OPTIMIZE,2023-12-05T01:42:12Z,137677,137677,137677,[]
+lorenzorubi,default,housekeeping_summary,OPTIMIZE,2023-12-05T01:09:19Z,130864,130864,130864,[]
+lorenzorubi,default,housekeeping_summary,OPTIMIZE,2023-12-05T00:53:33Z,123702,123702,123702,[]
+lorenzorubi,default,housekeeping_summary,OPTIMIZE,2023-12-05T00:43:44Z,118806,118806,118806,[]
+lorenzorubi,default,housekeeping_summary,OPTIMIZE,2023-12-05T00:28:00Z,111983,111983,111983,[]
+lorenzorubi,default,housekeeping_summary,OPTIMIZE,2023-12-05T00:14:21Z,104790,104790,104790,[]
+lorenzorubi,default,housekeeping_summary,OPTIMIZE,2023-12-04T23:47:02Z,97314,97314,97314,[]
+lorenzorubi,default,housekeeping_summary,OPTIMIZE,2023-12-04T23:18:17Z,91509,91509,91509,[]
+lorenzorubi,default,housekeeping_summary,OPTIMIZE,2023-12-04T22:14:48Z,84152,84152,84152,[]
+lorenzorubi,default,housekeeping_summary,OPTIMIZE,2023-12-04T21:57:53Z,76464,76464,76464,[]
+lorenzorubi,default,housekeeping_summary,OPTIMIZE,2023-12-04T21:30:49Z,67498,67498,67498,[]
+lorenzorubi,default,housekeeping_summary,OPTIMIZE,2023-12-04T21:18:59Z,59412,59412,59412,[]
+lorenzorubi,default,housekeeping_summary,OPTIMIZE,2023-12-04T20:30:48Z,51173,51173,51173,[]
+lorenzorubi,default,housekeeping_summary,OPTIMIZE,2023-12-04T20:12:59Z,42346,42346,42346,[]
+lorenzorubi,default,housekeeping_summary,OPTIMIZE,2023-12-04T19:35:05Z,34463,34463,34463,[]
+lorenzorubi,default,housekeeping_summary,OPTIMIZE,2023-12-04T19:30:46Z,28604,28604,28604,[]
+lorenzorubi,default,housekeeping_summary,OPTIMIZE,2023-12-04T19:06:51Z,8412,17592,17592,[]
diff --git a/tests/unit/data/delta_housekeeping/dhk_pandas_result.csv b/tests/unit/data/delta_housekeeping/dhk_pandas_result.csv
new file mode 100644
index 0000000..6a60520
--- /dev/null
+++ b/tests/unit/data/delta_housekeeping/dhk_pandas_result.csv
@@ -0,0 +1,20 @@
+catalog,database,tableName,number_of_files,bytes,max_optimize_timestamp,2nd_optimize_timestamp,max_vacuum_timestamp,2nd_vacuum_timestamp,min_file_size,p50_file_size,max_file_size,z_order_by,error
+lorenzorubi,default,housekeeping_summary_v3,1,3787,null,null,null,null,null,null,null,null,null
+lorenzorubi,maxmind_geo,gold_ipv6,1,4907069,null,null,null,null,null,null,null,null,null
+lorenzorubi,default,click_sales,6,326068799,null,null,2023-12-06T16:40:28Z,2023-12-05T01:19:47Z,null,null,null,null,null
+lorenzorubi,default,housekeeping_summary,1,192917,2023-12-05T05:50:14Z,2023-12-05T05:21:22Z,null,null,192917,192917,192917,[],null
+lorenzorubi,default,housekeeping_summary_v2,3,12326,2023-12-18T11:25:35Z,null,null,null,5273,5273,5273,[],null
+lorenzorubi,maxmind_geo,raw_locations,1,6933,null,null,null,null,null,null,null,null,null
+lorenzorubi,tpch,customer,1,61897021,null,null,null,null,null,null,null,null,null
+lorenzorubi,tpch,nation,1,3007,null,null,null,null,null,null,null,null,null
+lorenzorubi,maxmind_geo,raw_ipv6,1,1783720,null,null,null,null,null,null,null,null,null
+lorenzorubi,maxmind_geo,gold_ipv4,1,7220024,null,null,null,null,null,null,null,null,null
+lorenzorubi,dais_dlt_2023,enriched_orders,null,null,null,null,null,null,null,null,null,null,[UNSUPPORTED_VIEW_OPERATION.WITHOUT_SUGGESTION] The view `lorenzorubi`.`dais_dlt_2023`.`enriched_orders` does not support DESCRIBE DETAIL. ; line 2 pos 20
+lorenzorubi,default,click_sales_history,1,7710,null,null,null,null,null,null,null,null,null
+lorenzorubi,tpch,orders,2406,317120666,null,null,null,null,null,null,null,null,null
+lorenzorubi,default,complete_data,6,326060019,null,null,2023-12-06T16:40:36Z,2023-12-05T01:19:25Z,null,null,null,null,null
+lorenzorubi,maxmind_geo,raw_ipv4,1,3115269,null,null,null,null,null,null,null,null,null
+lorenzorubi,gcp_cost_analysis,sku_prices,1,835,null,null,null,null,null,null,null,null,null
+lorenzorubi,dais_dlt_2023,daily_totalorders_by_nation,null,null,null,null,null,null,null,null,null,null,[UNSUPPORTED_VIEW_OPERATION.WITHOUT_SUGGESTION] The view `lorenzorubi`.`dais_dlt_2023`.`daily_totalorders_by_nation` does not support DESCRIBE DETAIL. ; line 2 pos 20
+lorenzorubi,gcp_cost_analysis,project_ids,2,1774,null,null,null,null,null,null,null,null,null
+lorenzorubi,dais_dlt_2023,daily_2nd_high_orderprice,null,null,null,null,null,null,null,null,null,null,[UNSUPPORTED_VIEW_OPERATION.WITHOUT_SUGGESTION] The view `lorenzorubi`.`dais_dlt_2023`.`daily_2nd_high_orderprice` does not support DESCRIBE DETAIL. ; line 2 pos 20
diff --git a/tests/unit/data/delta_housekeeping/expected_need_optimize.csv b/tests/unit/data/delta_housekeeping/expected_need_optimize.csv
new file mode 100644
index 0000000..237a44c
--- /dev/null
+++ b/tests/unit/data/delta_housekeeping/expected_need_optimize.csv
@@ -0,0 +1,4 @@
+catalog,database,tableName,number_of_files,bytes,max_optimize_timestamp,2nd_optimize_timestamp,max_vacuum_timestamp,2nd_vacuum_timestamp,min_file_size,p50_file_size,max_file_size,z_order_by,error
+lorenzorubi,default,click_sales,6.0,326068799.0,,,2023-12-06T16:40:28Z,2023-12-05T01:19:47Z,,,,,
+lorenzorubi,tpch,orders,2406.0,317120666.0,,,,,,,,,
+lorenzorubi,default,complete_data,6.0,326060019.0,,,2023-12-06T16:40:36Z,2023-12-05T01:19:25Z,,,,,
diff --git a/tests/unit/data/delta_housekeeping/expected_pdh_click_sales.csv b/tests/unit/data/delta_housekeeping/expected_pdh_click_sales.csv
new file mode 100644
index 0000000..569c1e8
--- /dev/null
+++ b/tests/unit/data/delta_housekeeping/expected_pdh_click_sales.csv
@@ -0,0 +1,2 @@
+catalog,database,tableName,number_of_files,bytes,max_optimize_timestamp,2nd_optimize_timestamp,max_vacuum_timestamp,2nd_vacuum_timestamp,min_file_size,p50_file_size,max_file_size,z_order_by
+lorenzorubi,default,click_sales,6,326068799,,,2023-12-06T16:40:28Z,2023-12-05T01:19:47Z,,,,
diff --git a/tests/unit/data/delta_housekeeping/expected_pdh_housekeeping_summary.csv b/tests/unit/data/delta_housekeeping/expected_pdh_housekeeping_summary.csv
new file mode 100644
index 0000000..af564ba
--- /dev/null
+++ b/tests/unit/data/delta_housekeeping/expected_pdh_housekeeping_summary.csv
@@ -0,0 +1,2 @@
+catalog,database,tableName,number_of_files,bytes,max_optimize_timestamp,2nd_optimize_timestamp,max_vacuum_timestamp,2nd_vacuum_timestamp,min_file_size,p50_file_size,max_file_size,z_order_by
+lorenzorubi,default,housekeeping_summary,1,192917,2023-12-05T05:50:14Z,2023-12-05T05:21:22Z,,,192917,192917,192917,[]
diff --git a/tests/unit/delta_housekeeping_actions_test.py b/tests/unit/delta_housekeeping_actions_test.py
new file mode 100644
index 0000000..faed05e
--- /dev/null
+++ b/tests/unit/delta_housekeeping_actions_test.py
@@ -0,0 +1,49 @@
+import pytest
+import pandas as pd
+from discoverx.delta_housekeeping import DeltaHousekeepingActions
+from pathlib import Path
+
+
+def _resolve_file_path(request, relative_path):
+    module_path = Path(request.module.__file__)
+    test_file_path = module_path.parent / relative_path
+    return pd.read_csv(str(test_file_path.resolve()))
+
+
+@pytest.fixture()
+def housekeeping_stats(request):
+    return _resolve_file_path(request, "data/delta_housekeeping/dhk_pandas_result.csv")
+
+
+@pytest.fixture()
+def expected_need_optimize(request):
+    return _resolve_file_path(request, "data/delta_housekeeping/expected_need_optimize.csv")
+
+
+def test_apply_output(housekeeping_stats, expected_need_optimize):
+    dha = DeltaHousekeepingActions(
+        None,
+        stats=housekeeping_stats,
+    )
+    res = dha.generate_recommendations()
+    assert len(res) == 6
+    need_optimize = [item for item in res if (list(item.keys())[0] == dha.tables_not_optimized_legend)]
+    assert len(need_optimize) == 1
+    need_optimize_df = list(need_optimize[0].values())[0]
+    pd.testing.assert_frame_equal(
+        need_optimize_df.reset_index().loc[:, ["catalog", "database", "tableName"]],
+        expected_need_optimize.loc[:, ["catalog", "database", "tableName"]],
+    )
+    # TODO complete all the tests
+
+
+def test_empty_apply_output(housekeeping_stats):
+    dha = DeltaHousekeepingActions(
+        None,
+        stats=housekeeping_stats,
+        min_table_size_optimize=1024*1024*1024*1024
+    )
+    res = dha.generate_recommendations()
+    assert len(res) == 5
+    need_optimize = [item for item in res if list(item.keys())[0] == dha.tables_not_optimized_legend]
+    assert len(need_optimize) == 0
diff --git a/tests/unit/delta_housekeeping_test.py b/tests/unit/delta_housekeeping_test.py
new file mode 100644
index 0000000..63aea52
--- /dev/null
+++ b/tests/unit/delta_housekeeping_test.py
@@ -0,0 +1,98 @@
+import pytest
+import pandas as pd
+from discoverx.delta_housekeeping import DeltaHousekeeping
+from pathlib import Path
+import pyspark.sql.functions as F
+
+
+def _resolve_file_path(request, relative_path):
+    module_path = Path(request.module.__file__)
+    test_file_path = module_path.parent / relative_path
+    return pd.read_csv(
+        str(test_file_path.resolve()),
+        dtype={
+            "max_optimize_timestamp": "str",
+            "2nd_optimize_timestamp": "str",
+            "max_vacuum_timestamp": "str",
+            "2nd_vacuum_timestamp": "str",
+        }
+    )
+
+
+@pytest.fixture()
+def dh_click_sales(request):
+    return _resolve_file_path(request, "data/delta_housekeeping/dh_click_sales.csv")
+
+
+@pytest.fixture()
+def dd_click_sales(request):
+    return _resolve_file_path(request, "data/delta_housekeeping/dd_click_sales.csv")
+
+
+@pytest.fixture()
+def expected_pdh_click_sales(request):
+    return _resolve_file_path(request, "data/delta_housekeeping/expected_pdh_click_sales.csv")
+
+
+@pytest.fixture()
+def dh_housekeeping_summary(request):
+    return _resolve_file_path(request, "data/delta_housekeeping/dh_housekeeping_summary.csv")
+
+
+@pytest.fixture()
+def dd_housekeeping_summary(request):
+    return _resolve_file_path(request, "data/delta_housekeeping/dd_housekeeping_summary.csv")
+
+
+@pytest.fixture()
+def expected_pdh_housekeeping_summary(request):
+    return _resolve_file_path(request, "data/delta_housekeeping/expected_pdh_housekeeping_summary.csv")
+
+
+def test_process_describe_history_no_optimize(spark, dh_click_sales, dd_click_sales, expected_pdh_click_sales):
+    dh = DeltaHousekeeping(spark)
+    describe_detail_df = spark.createDataFrame(dd_click_sales)
+    describe_history_df = spark.createDataFrame(dh_click_sales)
+    out = dh._process_describe_history(describe_detail_df, describe_history_df)
+    pd.testing.assert_frame_equal(
+        out.reset_index(),
+        expected_pdh_click_sales.reset_index(),
+    )
+
+
+def test_process_describe_history_no_vacuum(
+    spark, dh_housekeeping_summary, dd_housekeeping_summary, expected_pdh_housekeeping_summary
+):
+    dh = DeltaHousekeeping(spark)
+    describe_detail_df = spark.createDataFrame(dd_housekeeping_summary)
+    describe_history_df = spark.createDataFrame(dh_housekeeping_summary)
+    out = dh._process_describe_history(describe_detail_df, describe_history_df)
+    pd.testing.assert_frame_equal(
+        out.reset_index(),
+        expected_pdh_housekeeping_summary.reset_index(),
+    )
+
+
+def test_process_describe_history_no_operation(spark, dd_click_sales):
+    dh = DeltaHousekeeping(spark)
+    describe_detail_df = spark.createDataFrame(dd_click_sales)
+    describe_history_df = spark.createDataFrame([], "string")
+    out = dh._process_describe_history(describe_detail_df, describe_history_df)
+    # output should be equal to DESCRIBE DETAIL
+    pd.testing.assert_frame_equal(
+        out.reset_index(),
+        dd_click_sales.reset_index(),
+    )
+
+
+def test_process_describe_history_empty_history(spark, dd_click_sales, dh_click_sales):
+    dh = DeltaHousekeeping(spark)
+    describe_detail_df = spark.createDataFrame(dd_click_sales)
+    describe_history_df = spark.createDataFrame(dh_click_sales)
+    describe_history_df = describe_history_df.withColumn("operation", F.lit("NOOP"))
+    out = dh._process_describe_history(describe_detail_df, describe_history_df)
+    # output should be equal to DESCRIBE DETAIL
+    pd.testing.assert_frame_equal(
+        out.reset_index(),
+        dd_click_sales.reset_index(),
+    )
\ No newline at end of file
diff --git a/tests/unit/explorer_test.py b/tests/unit/explorer_test.py
index 8475599..429f3ae 100644
--- a/tests/unit/explorer_test.py
+++ b/tests/unit/explorer_test.py
@@ -1,3 +1,4 @@
+import pandas
 import pytest
 from discoverx.explorer import DataExplorer, DataExplorerActions, InfoFetcher, TableInfo
 
@@ -89,3 +90,14 @@ def test_no_tables_matching_filter(spark, info_fetcher):
     data_explorer = DataExplorer("some_catalog.default.non_existent_table", spark, info_fetcher)
     with pytest.raises(ValueError):
         data_explorer.map(lambda table_info: table_info)
+
+
+def test_delta_housekeeping_call(spark, info_fetcher):
+    data_explorer = DataExplorer("*.default.*", spark, info_fetcher)
+    result: pandas.DataFrame = data_explorer.delta_housekeeping()._stats
+    print(result['tableName'].count())
+    assert result['tableName'].count() == 3
+    for res in result['tableName'].tolist():
+        assert res in ["tb_all_types", "tb_1", "tb_2"]
+    for col in result.columns:
+        assert col in ["catalog", "database", "tableName", "error"]
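For reference, a minimal sketch of how the pieces added in this diff can be wired together end to end. It assumes a Databricks runtime with `dbl-discoverx` installed; the catalog pattern `my_catalog.*.*` and the output table name passed to `save_as_table` are placeholders for illustration, not values taken from the diff.

from discoverx import DX
from discoverx.delta_housekeeping import DeltaHousekeeping

dx = DX()

# Gather Delta housekeeping stats and recommendations for all matching tables
# ("my_catalog" is a placeholder catalog name)
actions = dx.from_tables("my_catalog.*.*").delta_housekeeping()

# Recommendations as a Spark DataFrame, one row per affected table and recommendation
recommendations = actions.apply()
recommendations.select("catalog", "database", "tableName", "recommendation").show()

# The per-table stats can be persisted with the static helper introduced above
# (the target table name is illustrative)
DeltaHousekeeping.save_as_table(actions.stats(), "my_catalog.default.housekeeping_stats")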