From 5d7889e8e68640929ee2f4a760280db368ea0346 Mon Sep 17 00:00:00 2001 From: Lorenzo Rubio <104133639+lorenzorubi-db@users.noreply.github.com> Date: Mon, 18 Dec 2023 12:05:58 +0100 Subject: [PATCH 01/13] delta housekeeping initial commit --- discoverx/delta_housekeeping.py | 179 ++++++++++++++++++++++++++++ discoverx/explorer.py | 48 +++++++- examples/exec_delta_housekeeping.py | 31 +++++ 3 files changed, 256 insertions(+), 2 deletions(-) create mode 100644 discoverx/delta_housekeeping.py create mode 100644 examples/exec_delta_housekeeping.py diff --git a/discoverx/delta_housekeeping.py b/discoverx/delta_housekeeping.py new file mode 100644 index 0000000..1ae3ce7 --- /dev/null +++ b/discoverx/delta_housekeeping.py @@ -0,0 +1,179 @@ +from typing import Iterable +from discoverx.table_info import TableInfo + +from pyspark.sql import DataFrame +from pyspark.sql.window import Window +import pyspark.sql.types as T +import pyspark.sql.functions as F + + + + +class DeltaHousekeeping: + empty_schema = T.StructType([ + T.StructField("catalog", T.StringType()), + T.StructField("database", T.StringType()), + T.StructField("tableName", T.StringType()), + ]) + + @staticmethod + def _process_describe_history(describe_detail_df, describe_history_df) -> DataFrame: + """ + processes the DESCRIBE HISTORY result of potentially several tables in different schemas/catalogs + Provides + - table stats (size and number of files) + - timestamp for last & second last OPTIMIZE + - stats of OPTIMIZE (including ZORDER) + - timestamp for last & second last VACUUM + + TODO reconsider if it is better outside of the class + """ + if not "operation" in describe_history_df.columns: + return describe_detail_df + + # window over operation + operation_order = ( + describe_history_df + .filter(F.col("operation").isin(["OPTIMIZE", "VACUUM END"])) + .withColumn("operation_order", F.row_number().over( + Window.partitionBy(["catalog", "database", "tableName", "operation"]).orderBy(F.col("timestamp").desc()) + )) + ) + # max & 2nd timestamp of OPTIMIZE into output + out = describe_detail_df.join( + operation_order + .filter((F.col("operation") == "OPTIMIZE") & (F.col("operation_order") == 1)) + .select("catalog", "database", "tableName", "timestamp") + .withColumnRenamed("timestamp", "max_optimize_timestamp"), + how="outer", on=["catalog", "database", "tableName"] + ) + out = out.join( + operation_order + .filter((F.col("operation") == "OPTIMIZE") & (F.col("operation_order") == 2)) + .select("catalog", "database", "tableName", "timestamp") + .withColumnRenamed("timestamp", "2nd_optimize_timestamp"), + how="outer", on=["catalog", "database", "tableName"] + ) + # max timestamp of VACUUM into output + out = out.join( + operation_order + .filter((F.col("operation") == "VACUUM END") & (F.col("operation_order") == 1)) + .select("catalog", "database", "tableName", "timestamp") + .withColumnRenamed("timestamp", "max_vacuum_timestamp"), + how="outer", on=["catalog", "database", "tableName"] + ) + out = out.join( + operation_order + .filter((F.col("operation") == "VACUUM END") & (F.col("operation_order") == 2)) + .select("catalog", "database", "tableName", "timestamp") + .withColumnRenamed("timestamp", "2nd_vacuum_timestamp"), + how="outer", on=["catalog", "database", "tableName"] + ) + # summary of table metrics + table_metrics_1 = ( + operation_order.filter((F.col("operation") == "OPTIMIZE") & (F.col("operation_order") == 1)) + .select([ + F.col("catalog"), + F.col("database"), + F.col("tableName"), + F.col("min_file_size"), + F.col("p50_file_size"), + F.col("max_file_size"), + F.col("z_order_by"), + ]) + ) + + # write to output + out = out.join( + table_metrics_1, + how="outer", on=["catalog", "database", "tableName"] + ) + + return out + + def scan( + self, + table_info_list: Iterable[TableInfo], + housekeeping_table_name: str = "lorenzorubi.default.housekeeping_summary_v2", # TODO remove + do_save_as_table: bool = True, + ): + dd_list = [] + statements = [] + errors = [] + + if not isinstance(table_info_list, Iterable): + table_info_list = [table_info_list] + + for table_info in table_info_list: + try: + dd = spark.sql(f""" + DESCRIBE DETAIL {table_info.catalog}.{table_info.schema}.{table_info.table}; + """) + + dd = ( + dd + .withColumn("split", F.split(F.col('name'), '\.')) + .withColumn("catalog", F.col("split").getItem(0)) + .withColumn("database", F.col("split").getItem(1)) + .withColumn("tableName", F.col("split").getItem(2)) + .select([ + F.col("catalog"), + F.col("database"), + F.col("tableName"), + F.col("numFiles").alias("number_of_files"), + F.col("sizeInBytes").alias("bytes"), + ]) + ) + dd_list.append(dd) + statements.append(f""" + SELECT + '{table_info.catalog}' AS catalog, + '{table_info.schema}' AS database, + '{table_info.table}' AS tableName, + operation, + timestamp, + operationMetrics.minFileSize AS min_file_size, + operationMetrics.p50FileSize AS p50_file_size, + operationMetrics.maxFileSize AS max_file_size, + operationParameters.zOrderBy AS z_order_by + FROM (DESCRIBE HISTORY {table_info.catalog}.{table_info.schema}.{table_info.table}) + WHERE operation in ('OPTIMIZE', 'VACUUM END') + """) + except Exception as e: + errors.append(spark.createDataFrame( + [(table_info.catalog, table_info.schema, table_info.table, str(e))], + ["catalog", "database", "tableName", "error"] + )) + + statement = " UNION ".join(statements) + + dh = spark.createDataFrame([], self.empty_schema) + if statements: + dh = self.process_describe_history( + reduce( + lambda left, right: left.union(right), + dd_list + ), + spark.sql(statement), + None + ) + + errors_df = spark.createDataFrame([], self.empty_schema) + if errors: + errors_df = reduce( + lambda left, right: left.union(right), + errors + ) + + out = dh.unionByName(errors_df, allowMissingColumns=True) + if do_save_as_table: + ( + out + .write + .format("delta") + .mode("append") + .option("mergeSchema", "true") + .saveAsTable(housekeeping_table_name) + ) + return out + diff --git a/discoverx/explorer.py b/discoverx/explorer.py index 46f5a14..1b57c5d 100644 --- a/discoverx/explorer.py +++ b/discoverx/explorer.py @@ -1,7 +1,8 @@ import concurrent.futures import copy import re -from typing import Optional, List +import more_itertools +from typing import Optional, List, Callable, Iterable from discoverx import logging from discoverx.common import helper from discoverx.discovery import Discovery @@ -11,6 +12,7 @@ from pyspark.sql import DataFrame, SparkSession from pyspark.sql.functions import lit from discoverx.table_info import InfoFetcher, TableInfo +from discoverx.delta_housekeeping import DeltaHousekeeping logger = logging.Logging() @@ -147,7 +149,7 @@ def scan( discover.scan(rules=rules, sample_size=sample_size, what_if=what_if) return discover - def map(self, f) -> list[any]: + def map(self, f: Callable) -> list[any]: """Runs a function for each table in the data explorer Args: @@ -178,6 +180,48 @@ def map(self, f) -> list[any]: return res + def map_chunked(self, f: Callable, tables_per_chunk: int, **kwargs) -> list[any]: + """Runs a function for each table in the data explorer + + Args: + f (function): The function to run. The function should accept either a list of TableInfo objects as input and return a list of any object as output. + + Returns: + list[any]: A list of the results of running the function for each table + """ + res = [] + table_list = self._info_fetcher.get_tables_info( + self._catalogs, + self._schemas, + self._tables, + self._having_columns, + self._with_tags, + ) + with concurrent.futures.ThreadPoolExecutor(max_workers=self._max_concurrency) as executor: + # Submit tasks to the thread pool + futures = [ + executor.submit(f, table_chunk, **kwargs) for table_chunk in more_itertools.chunked(table_list, tables_per_chunk) + ] + + # Process completed tasks + for future in concurrent.futures.as_completed(futures): + result = future.result() + if result is not None: + res.extend(result) + + logger.debug("Finished lakehouse map_chunked task") + + return res + + def delta_housekeeping(self) -> DataFrame: + """ + + """ + dh = DeltaHousekeeping() + self.map( + dh.scan + ) + class DataExplorerActions: def __init__( diff --git a/examples/exec_delta_housekeeping.py b/examples/exec_delta_housekeeping.py new file mode 100644 index 0000000..e8bfeee --- /dev/null +++ b/examples/exec_delta_housekeeping.py @@ -0,0 +1,31 @@ +# Databricks notebook source +# MAGIC %md +# MAGIC # Run arbitrary operations across multiple tables +# MAGIC + +# COMMAND ---------- + +# MAGIC %reload_ext autoreload +# MAGIC %autoreload 2 + +# COMMAND ---------- + + +from discoverx import DX + +dx = DX() + +# COMMAND ---------- + + +# COMMAND ---------- + +result = ( + dx.from_tables("lorenzorubi.*.*") + .with_concurrency(1) # You can increase the concurrency with this parameter + .delta_housekeeping() +) +print(len(result)) + +# COMMAND ---------- + From 90bab2790fc6f380b86a7b88e5aae5977a14ee97 Mon Sep 17 00:00:00 2001 From: lorenzorubi-db Date: Mon, 18 Dec 2023 11:34:57 +0000 Subject: [PATCH 02/13] debugging initial version --- discoverx/delta_housekeeping.py | 39 ++++++++++++++++++----------- discoverx/explorer.py | 4 +-- examples/exec_delta_housekeeping.py | 4 +-- 3 files changed, 27 insertions(+), 20 deletions(-) diff --git a/discoverx/delta_housekeeping.py b/discoverx/delta_housekeeping.py index 1ae3ce7..8393405 100644 --- a/discoverx/delta_housekeeping.py +++ b/discoverx/delta_housekeeping.py @@ -1,20 +1,22 @@ from typing import Iterable +from functools import reduce from discoverx.table_info import TableInfo -from pyspark.sql import DataFrame +from pyspark.sql import DataFrame, SparkSession from pyspark.sql.window import Window import pyspark.sql.types as T import pyspark.sql.functions as F - - class DeltaHousekeeping: - empty_schema = T.StructType([ - T.StructField("catalog", T.StringType()), - T.StructField("database", T.StringType()), - T.StructField("tableName", T.StringType()), - ]) + + def __init__(self, spark: SparkSession) -> None: + self._spark = spark + self.empty_schema = T.StructType([ + T.StructField("catalog", T.StringType()), + T.StructField("database", T.StringType()), + T.StructField("tableName", T.StringType()), + ]) @staticmethod def _process_describe_history(describe_detail_df, describe_history_df) -> DataFrame: @@ -97,6 +99,11 @@ def scan( housekeeping_table_name: str = "lorenzorubi.default.housekeeping_summary_v2", # TODO remove do_save_as_table: bool = True, ): + """ + Scans a table_info / table_info_list to fetch Delta stats + - DESCRIBE DETAIL + - DESCRIBE HISTORY + """ dd_list = [] statements = [] errors = [] @@ -106,10 +113,12 @@ def scan( for table_info in table_info_list: try: - dd = spark.sql(f""" + # runs a describe detail per table, figures out if exception + dd = self._spark.sql(f""" DESCRIBE DETAIL {table_info.catalog}.{table_info.schema}.{table_info.table}; """) + # prepares a DESCRIBE HISTORY statement per table (will be run outside of the loop) dd = ( dd .withColumn("split", F.split(F.col('name'), '\.')) @@ -140,25 +149,25 @@ def scan( WHERE operation in ('OPTIMIZE', 'VACUUM END') """) except Exception as e: - errors.append(spark.createDataFrame( + errors.append(self._spark.createDataFrame( [(table_info.catalog, table_info.schema, table_info.table, str(e))], ["catalog", "database", "tableName", "error"] )) + # statement to UNION all DESCRIBE HISTORY together statement = " UNION ".join(statements) - dh = spark.createDataFrame([], self.empty_schema) + dh = self._spark.createDataFrame([], self.empty_schema) if statements: - dh = self.process_describe_history( + dh = self._process_describe_history( reduce( lambda left, right: left.union(right), dd_list ), - spark.sql(statement), - None + self._spark.sql(statement), ) - errors_df = spark.createDataFrame([], self.empty_schema) + errors_df = self._spark.createDataFrame([], self.empty_schema) if errors: errors_df = reduce( lambda left, right: left.union(right), diff --git a/discoverx/explorer.py b/discoverx/explorer.py index 1b57c5d..d7e7b29 100644 --- a/discoverx/explorer.py +++ b/discoverx/explorer.py @@ -217,8 +217,8 @@ def delta_housekeeping(self) -> DataFrame: """ """ - dh = DeltaHousekeeping() - self.map( + dh = DeltaHousekeeping(self._spark) + return self.map( dh.scan ) diff --git a/examples/exec_delta_housekeeping.py b/examples/exec_delta_housekeeping.py index e8bfeee..3b90a47 100644 --- a/examples/exec_delta_housekeeping.py +++ b/examples/exec_delta_housekeeping.py @@ -15,9 +15,6 @@ dx = DX() -# COMMAND ---------- - - # COMMAND ---------- result = ( @@ -29,3 +26,4 @@ # COMMAND ---------- + From 94629e03213d1988bed2717cc2bb8ea05bf1664d Mon Sep 17 00:00:00 2001 From: Lorenzo Rubio <104133639+lorenzorubi-db@users.noreply.github.com> Date: Mon, 18 Dec 2023 13:18:19 +0100 Subject: [PATCH 03/13] convert output to pandas --- discoverx/delta_housekeeping.py | 23 ++++++++++++++--------- discoverx/explorer.py | 8 +++++--- examples/exec_delta_housekeeping.py | 2 +- 3 files changed, 20 insertions(+), 13 deletions(-) diff --git a/discoverx/delta_housekeeping.py b/discoverx/delta_housekeeping.py index 8393405..eca5d5d 100644 --- a/discoverx/delta_housekeeping.py +++ b/discoverx/delta_housekeeping.py @@ -2,6 +2,8 @@ from functools import reduce from discoverx.table_info import TableInfo +import pandas as pd + from pyspark.sql import DataFrame, SparkSession from pyspark.sql.window import Window import pyspark.sql.types as T @@ -19,7 +21,9 @@ def __init__(self, spark: SparkSession) -> None: ]) @staticmethod - def _process_describe_history(describe_detail_df, describe_history_df) -> DataFrame: + def _process_describe_history( + describe_detail_df: DataFrame, describe_history_df: DataFrame + ) -> DataFrame: """ processes the DESCRIBE HISTORY result of potentially several tables in different schemas/catalogs Provides @@ -94,11 +98,11 @@ def _process_describe_history(describe_detail_df, describe_history_df) -> DataFr return out def scan( - self, - table_info_list: Iterable[TableInfo], - housekeeping_table_name: str = "lorenzorubi.default.housekeeping_summary_v2", # TODO remove - do_save_as_table: bool = True, - ): + self, + table_info_list: Iterable[TableInfo], + housekeeping_table_name: str = "lorenzorubi.default.housekeeping_summary_v2", # TODO remove + do_save_as_table: bool = True, + ) -> pd.DataFrame: """ Scans a table_info / table_info_list to fetch Delta stats - DESCRIBE DETAIL @@ -117,8 +121,6 @@ def scan( dd = self._spark.sql(f""" DESCRIBE DETAIL {table_info.catalog}.{table_info.schema}.{table_info.table}; """) - - # prepares a DESCRIBE HISTORY statement per table (will be run outside of the loop) dd = ( dd .withColumn("split", F.split(F.col('name'), '\.')) @@ -134,6 +136,8 @@ def scan( ]) ) dd_list.append(dd) + + # prepares a DESCRIBE HISTORY statement per table (will be run outside of the loop) statements.append(f""" SELECT '{table_info.catalog}' AS catalog, @@ -175,6 +179,7 @@ def scan( ) out = dh.unionByName(errors_df, allowMissingColumns=True) + if do_save_as_table: ( out @@ -184,5 +189,5 @@ def scan( .option("mergeSchema", "true") .saveAsTable(housekeeping_table_name) ) - return out + return out.toPandas() diff --git a/discoverx/explorer.py b/discoverx/explorer.py index d7e7b29..6df804a 100644 --- a/discoverx/explorer.py +++ b/discoverx/explorer.py @@ -2,6 +2,7 @@ import copy import re import more_itertools +import pandas as pd from typing import Optional, List, Callable, Iterable from discoverx import logging from discoverx.common import helper @@ -213,14 +214,15 @@ def map_chunked(self, f: Callable, tables_per_chunk: int, **kwargs) -> list[any] return res - def delta_housekeeping(self) -> DataFrame: + def delta_housekeeping(self) -> pd.DataFrame: """ - + Gathers stats and recommendations on Delta Housekeeping """ dh = DeltaHousekeeping(self._spark) - return self.map( + dfs_pd: Iterable[pd.DataFrame] = self.map( dh.scan ) + return reduce(lambda x, y: x.union(y), dfs_pd) # TODO create DeltaHousekeepingActions and implement `apply` class DataExplorerActions: diff --git a/examples/exec_delta_housekeeping.py b/examples/exec_delta_housekeeping.py index 3b90a47..5a08a9a 100644 --- a/examples/exec_delta_housekeeping.py +++ b/examples/exec_delta_housekeeping.py @@ -22,8 +22,8 @@ .with_concurrency(1) # You can increase the concurrency with this parameter .delta_housekeeping() ) -print(len(result)) # COMMAND ---------- +display(result) From 543f852ae7682c1e4f2167635f8649198e8854d5 Mon Sep 17 00:00:00 2001 From: Lorenzo Rubio <104133639+lorenzorubi-db@users.noreply.github.com> Date: Mon, 18 Dec 2023 13:28:45 +0100 Subject: [PATCH 04/13] debugging -convert output to pandas --- discoverx/explorer.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/discoverx/explorer.py b/discoverx/explorer.py index 6df804a..83e740d 100644 --- a/discoverx/explorer.py +++ b/discoverx/explorer.py @@ -222,7 +222,7 @@ def delta_housekeeping(self) -> pd.DataFrame: dfs_pd: Iterable[pd.DataFrame] = self.map( dh.scan ) - return reduce(lambda x, y: x.union(y), dfs_pd) # TODO create DeltaHousekeepingActions and implement `apply` + return pd.concat(dfs_pd) # TODO create DeltaHousekeepingActions and implement `apply` class DataExplorerActions: From 567b303104a26c8a46ca7941b51d026361b1bca9 Mon Sep 17 00:00:00 2001 From: Lorenzo Rubio <104133639+lorenzorubi-db@users.noreply.github.com> Date: Tue, 19 Dec 2023 15:21:25 +0100 Subject: [PATCH 05/13] DeltaHousekeepingActions object and tests --- discoverx/delta_housekeeping.py | 37 +++++++++++++++++++ discoverx/explorer.py | 8 ++-- setup.py | 1 + .../delta_housekeeping/dhk_pandas_result.csv | 20 ++++++++++ .../expected_need_optimize.csv | 4 ++ tests/unit/delta_housekeeping_test.py | 25 +++++++++++++ 6 files changed, 90 insertions(+), 5 deletions(-) create mode 100644 tests/unit/data/delta_housekeeping/dhk_pandas_result.csv create mode 100644 tests/unit/data/delta_housekeeping/expected_need_optimize.csv create mode 100644 tests/unit/delta_housekeeping_test.py diff --git a/discoverx/delta_housekeeping.py b/discoverx/delta_housekeeping.py index eca5d5d..6a6463b 100644 --- a/discoverx/delta_housekeeping.py +++ b/discoverx/delta_housekeeping.py @@ -191,3 +191,40 @@ def scan( ) return out.toPandas() + + +class DeltaHousekeepingActions: + def __init__( + self, + # delta_housekeeping: DeltaHousekeeping, + mapped_pd_dfs: Iterable[pd.DataFrame], + # spark: SparkSession = None, + min_table_size_optimize: int = 128*1024*1024, + stats: pd.DataFrame = None, # for testability only + ) -> None: + # self._delta_housekeeping = delta_housekeeping + if stats is None: + self._mapped_pd_dfs = mapped_pd_dfs + stats = pd.concat(self._mapped_pd_dfs) + self._stats: pd.DataFrame = stats + # if spark is None: + # spark = SparkSession.builder.getOrCreate() + # self._spark = spark + self.min_table_size_optimize = min_table_size_optimize + self.tables_not_optimized_legend = "Tables that are never OPTIMIZED and would benefit from it" + + def stats(self) -> pd.DataFrame: + return self._stats + + def _need_optimize(self) -> pd.DataFrame: + stats = self._stats + return ( + stats.loc[stats.max_optimize_timestamp.isnull() & (stats.bytes > self.min_table_size_optimize)] + ) + + def apply(self): + return [ + {self.tables_not_optimized_legend: self._need_optimize()} + ] + + diff --git a/discoverx/explorer.py b/discoverx/explorer.py index 83e740d..94412f4 100644 --- a/discoverx/explorer.py +++ b/discoverx/explorer.py @@ -13,7 +13,7 @@ from pyspark.sql import DataFrame, SparkSession from pyspark.sql.functions import lit from discoverx.table_info import InfoFetcher, TableInfo -from discoverx.delta_housekeeping import DeltaHousekeeping +from discoverx.delta_housekeeping import DeltaHousekeeping, DeltaHousekeepingActions logger = logging.Logging() @@ -219,10 +219,8 @@ def delta_housekeeping(self) -> pd.DataFrame: Gathers stats and recommendations on Delta Housekeeping """ dh = DeltaHousekeeping(self._spark) - dfs_pd: Iterable[pd.DataFrame] = self.map( - dh.scan - ) - return pd.concat(dfs_pd) # TODO create DeltaHousekeepingActions and implement `apply` + dfs_pd: Iterable[pd.DataFrame] = self.map(dh.scan) + return DeltaHousekeepingActions(dfs_pd) class DataExplorerActions: diff --git a/setup.py b/setup.py index 9233b4d..0e4d73f 100644 --- a/setup.py +++ b/setup.py @@ -34,6 +34,7 @@ "delta-spark>=2.2.0", "pandas<2.0.0", # From 2.0.0 onwards, pandas does not support iteritems() anymore, spark.createDataFrame will fail "numpy<1.24", # From 1.24 onwards, module 'numpy' has no attribute 'bool'. + "more_itertools", ] TEST_REQUIREMENTS = [ diff --git a/tests/unit/data/delta_housekeeping/dhk_pandas_result.csv b/tests/unit/data/delta_housekeeping/dhk_pandas_result.csv new file mode 100644 index 0000000..6a60520 --- /dev/null +++ b/tests/unit/data/delta_housekeeping/dhk_pandas_result.csv @@ -0,0 +1,20 @@ +catalog,database,tableName,number_of_files,bytes,max_optimize_timestamp,2nd_optimize_timestamp,max_vacuum_timestamp,2nd_vacuum_timestamp,min_file_size,p50_file_size,max_file_size,z_order_by,error +lorenzorubi,default,housekeeping_summary_v3,1,3787,null,null,null,null,null,null,null,null,null +lorenzorubi,maxmind_geo,gold_ipv6,1,4907069,null,null,null,null,null,null,null,null,null +lorenzorubi,default,click_sales,6,326068799,null,null,2023-12-06T16:40:28Z,2023-12-05T01:19:47Z,null,null,null,null,null +lorenzorubi,default,housekeeping_summary,1,192917,2023-12-05T05:50:14Z,2023-12-05T05:21:22Z,null,null,192917,192917,192917,[],null +lorenzorubi,default,housekeeping_summary_v2,3,12326,2023-12-18T11:25:35Z,null,null,null,5273,5273,5273,[],null +lorenzorubi,maxmind_geo,raw_locations,1,6933,null,null,null,null,null,null,null,null,null +lorenzorubi,tpch,customer,1,61897021,null,null,null,null,null,null,null,null,null +lorenzorubi,tpch,nation,1,3007,null,null,null,null,null,null,null,null,null +lorenzorubi,maxmind_geo,raw_ipv6,1,1783720,null,null,null,null,null,null,null,null,null +lorenzorubi,maxmind_geo,gold_ipv4,1,7220024,null,null,null,null,null,null,null,null,null +lorenzorubi,dais_dlt_2023,enriched_orders,null,null,null,null,null,null,null,null,null,null,[UNSUPPORTED_VIEW_OPERATION.WITHOUT_SUGGESTION] The view `lorenzorubi`.`dais_dlt_2023`.`enriched_orders` does not support DESCRIBE DETAIL. ; line 2 pos 20 +lorenzorubi,default,click_sales_history,1,7710,null,null,null,null,null,null,null,null,null +lorenzorubi,tpch,orders,2406,317120666,null,null,null,null,null,null,null,null,null +lorenzorubi,default,complete_data,6,326060019,null,null,2023-12-06T16:40:36Z,2023-12-05T01:19:25Z,null,null,null,null,null +lorenzorubi,maxmind_geo,raw_ipv4,1,3115269,null,null,null,null,null,null,null,null,null +lorenzorubi,gcp_cost_analysis,sku_prices,1,835,null,null,null,null,null,null,null,null,null +lorenzorubi,dais_dlt_2023,daily_totalorders_by_nation,null,null,null,null,null,null,null,null,null,null,[UNSUPPORTED_VIEW_OPERATION.WITHOUT_SUGGESTION] The view `lorenzorubi`.`dais_dlt_2023`.`daily_totalorders_by_nation` does not support DESCRIBE DETAIL. ; line 2 pos 20 +lorenzorubi,gcp_cost_analysis,project_ids,2,1774,null,null,null,null,null,null,null,null,null +lorenzorubi,dais_dlt_2023,daily_2nd_high_orderprice,null,null,null,null,null,null,null,null,null,null,[UNSUPPORTED_VIEW_OPERATION.WITHOUT_SUGGESTION] The view `lorenzorubi`.`dais_dlt_2023`.`daily_2nd_high_orderprice` does not support DESCRIBE DETAIL. ; line 2 pos 20 diff --git a/tests/unit/data/delta_housekeeping/expected_need_optimize.csv b/tests/unit/data/delta_housekeeping/expected_need_optimize.csv new file mode 100644 index 0000000..237a44c --- /dev/null +++ b/tests/unit/data/delta_housekeeping/expected_need_optimize.csv @@ -0,0 +1,4 @@ +catalog,database,tableName,number_of_files,bytes,max_optimize_timestamp,2nd_optimize_timestamp,max_vacuum_timestamp,2nd_vacuum_timestamp,min_file_size,p50_file_size,max_file_size,z_order_by,error +lorenzorubi,default,click_sales,6.0,326068799.0,,,2023-12-06T16:40:28Z,2023-12-05T01:19:47Z,,,,, +lorenzorubi,tpch,orders,2406.0,317120666.0,,,,,,,,, +lorenzorubi,default,complete_data,6.0,326060019.0,,,2023-12-06T16:40:36Z,2023-12-05T01:19:25Z,,,,, diff --git a/tests/unit/delta_housekeeping_test.py b/tests/unit/delta_housekeeping_test.py new file mode 100644 index 0000000..1b760f1 --- /dev/null +++ b/tests/unit/delta_housekeeping_test.py @@ -0,0 +1,25 @@ +import pytest +import pandas as pd +from discoverx.delta_housekeeping import DeltaHousekeepingActions +from pathlib import Path + + +def test_need_optimize(request): + module_path = Path(request.module.__file__) + test_file_path = module_path.parent / "data/delta_housekeeping/dhk_pandas_result.csv" + stats = pd.read_csv(str(test_file_path.resolve())) + dha = DeltaHousekeepingActions( + None, + stats=stats, + ) + res = dha.apply() + assert len(res) == 1 + need_optimize = [item for item in res if list(res[0].keys())[0] == dha.tables_not_optimized_legend] + assert len(need_optimize) == 1 + need_optimize_df = list(need_optimize[0].values())[0] + need_optimize_df.to_csv(module_path.parent / "data/delta_housekeeping/expected_need_optimize.csv", index=False) + expected = pd.read_csv(module_path.parent / "data/delta_housekeeping/expected_need_optimize.csv") + pd.testing.assert_frame_equal( + need_optimize_df.reset_index().loc[:, ["catalog", "database", "tableName"]], + expected.loc[:, ["catalog", "database", "tableName"]], + ) From bded30562c3fd3716f261c0f12e811d6b6886fec Mon Sep 17 00:00:00 2001 From: Lorenzo Rubio <104133639+lorenzorubi-db@users.noreply.github.com> Date: Thu, 21 Dec 2023 17:58:44 +0100 Subject: [PATCH 06/13] added more insights to housekeeping and refactored tests --- discoverx/delta_housekeeping.py | 150 +++++++++++++++++- examples/exec_delta_housekeeping.py | 23 ++- tests/unit/delta_housekeeping_actions_test.py | 49 ++++++ tests/unit/delta_housekeeping_test.py | 25 --- 4 files changed, 213 insertions(+), 34 deletions(-) create mode 100644 tests/unit/delta_housekeeping_actions_test.py delete mode 100644 tests/unit/delta_housekeeping_test.py diff --git a/discoverx/delta_housekeeping.py b/discoverx/delta_housekeeping.py index 6a6463b..c3cac7e 100644 --- a/discoverx/delta_housekeeping.py +++ b/discoverx/delta_housekeeping.py @@ -1,9 +1,10 @@ from typing import Iterable from functools import reduce -from discoverx.table_info import TableInfo - +from datetime import datetime import pandas as pd +from discoverx.table_info import TableInfo + from pyspark.sql import DataFrame, SparkSession from pyspark.sql.window import Window import pyspark.sql.types as T @@ -199,7 +200,13 @@ def __init__( # delta_housekeeping: DeltaHousekeeping, mapped_pd_dfs: Iterable[pd.DataFrame], # spark: SparkSession = None, - min_table_size_optimize: int = 128*1024*1024, + min_table_size_optimize: int = 128*1024*1024, # i.e. 128 MB + min_days_not_optimized: int = 7, + min_days_not_vacuumed: int = 31, + max_optimize_freq: int = 2, + max_vacuum_freq: int = 2, + small_file_threshold: int = 32*1024*1024, # i.e. 32 MB + min_number_of_files_for_zorder: int = 8, stats: pd.DataFrame = None, # for testability only ) -> None: # self._delta_housekeeping = delta_housekeeping @@ -211,20 +218,149 @@ def __init__( # spark = SparkSession.builder.getOrCreate() # self._spark = spark self.min_table_size_optimize = min_table_size_optimize + self.min_days_not_optimized = min_days_not_optimized + self.min_days_not_vacuumed = min_days_not_vacuumed + self.max_optimize_freq = max_optimize_freq + self.max_vacuum_freq = max_vacuum_freq + self.small_file_threshold = small_file_threshold + self.min_number_of_files_for_zorder = min_number_of_files_for_zorder self.tables_not_optimized_legend = "Tables that are never OPTIMIZED and would benefit from it" + self.tables_not_vacuumed_legend = "Tables that are never VACUUM'ed" + self.tables_not_optimized_last_days = "Tables that are not OPTIMIZED often enough" + self.tables_not_vacuumed_last_days = "Tables that are not VACUUM'ed often enough" + self.tables_optimized_too_freq = "Tables that are OPTIMIZED too often" + self.tables_vacuumed_too_freq = "Tables that are VACUUM'ed too often" + self.tables_do_not_need_optimize = "Tables that are too small to be OPTIMIZED" + self.tables_to_analyze = "Tables that need more analysis (small_files)" + self.tables_zorder_not_effective = "Tables for which ZORDER is not being effective" def stats(self) -> pd.DataFrame: return self._stats def _need_optimize(self) -> pd.DataFrame: + stats = self._stats.copy() + stats = stats.loc[stats.max_optimize_timestamp.isnull() & stats.bytes.notnull()] + return ( + stats.loc[(stats.bytes.astype(int) > self.min_table_size_optimize)] + ) + + def _optimize_not_needed(self) -> pd.DataFrame: + stats = self._stats.copy() + stats = stats.loc[stats.max_optimize_timestamp.isnull() & stats.bytes.notnull()] + return ( + stats.loc[stats.max_optimize_timestamp.notnull() & (stats.bytes.astype(int) > self.min_table_size_optimize)] + ) + + def _not_optimized_last_days(self) -> pd.DataFrame: + stats = self._stats.copy() + stats['max_optimize_timestamp'] = pd.to_datetime(stats['max_optimize_timestamp']) + stats['optimize_lag'] = ( + datetime.utcnow().replace(tzinfo=stats.dtypes["max_optimize_timestamp"].tz) - stats['max_optimize_timestamp'] + ).dt.days + return ( + stats[stats['optimize_lag'] < self.min_days_not_optimized] + ) + + def _optimized_too_frequently(self) -> pd.DataFrame: + stats = self._stats.copy() + stats['max_optimize_timestamp'] = pd.to_datetime(stats['max_optimize_timestamp']) + stats['2nd_optimize_timestamp'] = pd.to_datetime(stats['2nd_optimize_timestamp']) + stats['optimize_lag'] = (stats['max_optimize_timestamp'] - stats['2nd_optimize_timestamp']).dt.days + return ( + stats[stats['optimize_lag'] < self.max_optimize_freq] + ) + + def _never_vacuumed(self) -> pd.DataFrame: stats = self._stats return ( - stats.loc[stats.max_optimize_timestamp.isnull() & (stats.bytes > self.min_table_size_optimize)] + stats.loc[stats.max_vacuum_timestamp.isnull()] + ) + + def _not_vacuumed_last_days(self) -> pd.DataFrame: + stats = self._stats.copy() + stats['max_vacuum_timestamp'] = pd.to_datetime(stats['max_vacuum_timestamp']) + stats['vacuum_lag'] = ( + datetime.utcnow().replace(tzinfo=stats.dtypes["max_vacuum_timestamp"].tz) - stats['max_vacuum_timestamp'] + ).dt.days + return ( + stats[stats['vacuum_lag'] < self.min_days_not_vacuumed] + ) + + def _vacuumed_too_frequently(self) -> pd.DataFrame: + stats = self._stats.copy() + stats['max_vacuum_timestamp'] = pd.to_datetime(stats['max_vacuum_timestamp']) + stats['2nd_vacuum_timestamp'] = pd.to_datetime(stats['2nd_vacuum_timestamp']) + stats['vacuum_lag'] = (stats['max_vacuum_timestamp'] - stats['2nd_vacuum_timestamp']).dt.days + return ( + stats[stats['vacuum_lag'] < self.max_vacuum_freq] + ) + + def _analyze_these_tables(self) -> pd.DataFrame: + stats = self._stats.copy() + stats = stats.loc[stats['max_optimize_timestamp'].notnull() & + stats['p50_file_size'].notnull() & + (stats['number_of_files'] > 1)] + stats = stats.loc[(stats['p50_file_size'] < self.small_file_threshold)] + return ( + stats.sort_values(by=['database', 'tableName', 'number_of_files'], ascending=[True, True, False]) + ) + + def _zorder_not_effective(self) -> pd.DataFrame: + stats = self._stats.copy() + stats = stats.loc[stats['max_optimize_timestamp'].notnull() & + stats['p50_file_size'].notnull()] + + # clean up z_order_by column and split into array + stats['z_order_by_clean'] = stats['z_order_by'].apply( + lambda x: None if x == "[]" else x.replace('[', '').replace(']', '').replace('"', '')) + stats['z_order_by_array'] = stats['z_order_by_clean'].str.split(',') + + # filter rows with zorder columns and number_of_files is less than threshold + stats = stats[stats['z_order_by_array'].str.len() > 0] + stats = stats[stats['number_of_files'].astype(int) < self.min_number_of_files_for_zorder] + return ( + stats ) def apply(self): - return [ - {self.tables_not_optimized_legend: self._need_optimize()} - ] + out = [] + for df, legend in zip([ + self._need_optimize(), + self._never_vacuumed(), + self._not_optimized_last_days(), + self._not_vacuumed_last_days(), + self._optimized_too_frequently(), + self._vacuumed_too_frequently(), + self._optimize_not_needed(), + self._analyze_these_tables(), + self._zorder_not_effective(), + ], [ + self.tables_not_optimized_legend, + self.tables_not_vacuumed_legend, + self.tables_not_optimized_last_days, + self.tables_not_vacuumed_last_days, + self.tables_optimized_too_freq, + self.tables_vacuumed_too_freq, + self.tables_do_not_need_optimize, + self.tables_to_analyze, + self.tables_zorder_not_effective, + ]): + if not df.empty: + out.append({legend: df}) + return out + + def to_html(self): + # TODO better formatting! + from bs4 import BeautifulSoup + res = self.apply() + soup = BeautifulSoup(features='xml') + body = soup.new_tag('body') + soup.insert(0, body) + for r in res: + for k,v in r.items(): + title_s = soup.new_tag('title') + title_s.string = k + body.insert(0, v.to_html()) + body.insert(0, title_s) diff --git a/examples/exec_delta_housekeeping.py b/examples/exec_delta_housekeeping.py index 5a08a9a..d221432 100644 --- a/examples/exec_delta_housekeeping.py +++ b/examples/exec_delta_housekeeping.py @@ -5,6 +5,7 @@ # COMMAND ---------- +# TODO remove # MAGIC %reload_ext autoreload # MAGIC %autoreload 2 @@ -19,11 +20,29 @@ result = ( dx.from_tables("lorenzorubi.*.*") - .with_concurrency(1) # You can increase the concurrency with this parameter .delta_housekeeping() + .stats() ) +display(result) + +# COMMAND ---------- + +result = ( + dx.from_tables("lorenzorubi.*.*") + .delta_housekeeping() + .apply() +) + +# COMMAND ---------- + +result = ( + dx.from_tables("lorenzorubi.*.*") + .delta_housekeeping() + .html() +) + +displayHTML(result) # COMMAND ---------- -display(result) diff --git a/tests/unit/delta_housekeeping_actions_test.py b/tests/unit/delta_housekeeping_actions_test.py new file mode 100644 index 0000000..7ccbd9a --- /dev/null +++ b/tests/unit/delta_housekeeping_actions_test.py @@ -0,0 +1,49 @@ +import pytest +import pandas as pd +from discoverx.delta_housekeeping import DeltaHousekeepingActions +from pathlib import Path + + +def _resolve_file_path(request, relative_path): + module_path = Path(request.module.__file__) + test_file_path = module_path.parent / relative_path + return pd.read_csv(str(test_file_path.resolve())) + + +@pytest.fixture() +def housekeeping_stats(request): + return _resolve_file_path(request, "data/delta_housekeeping/dhk_pandas_result.csv") + + +@pytest.fixture() +def expected_need_optimize(request): + return _resolve_file_path(request, "data/delta_housekeeping/expected_need_optimize.csv") + + +def test_apply_output(housekeeping_stats, expected_need_optimize): + dha = DeltaHousekeepingActions( + None, + stats=housekeeping_stats, + ) + res = dha.apply() + assert len(res) == 7 + need_optimize = [item for item in res if (list(item.keys())[0] == dha.tables_not_optimized_legend)] + assert len(need_optimize) == 1 + need_optimize_df = list(need_optimize[0].values())[0] + pd.testing.assert_frame_equal( + need_optimize_df.reset_index().loc[:, ["catalog", "database", "tableName"]], + expected_need_optimize.loc[:, ["catalog", "database", "tableName"]], + ) + # TODO complete all the tests + + +def test_empty_apply_output(housekeeping_stats): + dha = DeltaHousekeepingActions( + None, + stats=housekeeping_stats, + min_table_size_optimize=1024*1024*1024*1024 + ) + res = dha.apply() + assert len(res) == 6 + need_optimize = [item for item in res if list(item.keys())[0] == dha.tables_not_optimized_legend] + assert len(need_optimize) == 0 diff --git a/tests/unit/delta_housekeeping_test.py b/tests/unit/delta_housekeeping_test.py deleted file mode 100644 index 1b760f1..0000000 --- a/tests/unit/delta_housekeeping_test.py +++ /dev/null @@ -1,25 +0,0 @@ -import pytest -import pandas as pd -from discoverx.delta_housekeeping import DeltaHousekeepingActions -from pathlib import Path - - -def test_need_optimize(request): - module_path = Path(request.module.__file__) - test_file_path = module_path.parent / "data/delta_housekeeping/dhk_pandas_result.csv" - stats = pd.read_csv(str(test_file_path.resolve())) - dha = DeltaHousekeepingActions( - None, - stats=stats, - ) - res = dha.apply() - assert len(res) == 1 - need_optimize = [item for item in res if list(res[0].keys())[0] == dha.tables_not_optimized_legend] - assert len(need_optimize) == 1 - need_optimize_df = list(need_optimize[0].values())[0] - need_optimize_df.to_csv(module_path.parent / "data/delta_housekeeping/expected_need_optimize.csv", index=False) - expected = pd.read_csv(module_path.parent / "data/delta_housekeeping/expected_need_optimize.csv") - pd.testing.assert_frame_equal( - need_optimize_df.reset_index().loc[:, ["catalog", "database", "tableName"]], - expected.loc[:, ["catalog", "database", "tableName"]], - ) From cf4ef0784079aee2c09edb565b01569c1d8d0a9e Mon Sep 17 00:00:00 2001 From: lorenzorubi-db Date: Thu, 21 Dec 2023 18:42:18 +0000 Subject: [PATCH 07/13] regression and cleanup --- discoverx/delta_housekeeping.py | 8 +++--- examples/exec_delta_housekeeping.py | 41 +++++++++++++++-------------- 2 files changed, 26 insertions(+), 23 deletions(-) diff --git a/discoverx/delta_housekeeping.py b/discoverx/delta_housekeeping.py index c3cac7e..4928e80 100644 --- a/discoverx/delta_housekeeping.py +++ b/discoverx/delta_housekeeping.py @@ -255,7 +255,7 @@ def _not_optimized_last_days(self) -> pd.DataFrame: stats = self._stats.copy() stats['max_optimize_timestamp'] = pd.to_datetime(stats['max_optimize_timestamp']) stats['optimize_lag'] = ( - datetime.utcnow().replace(tzinfo=stats.dtypes["max_optimize_timestamp"].tz) - stats['max_optimize_timestamp'] + datetime.utcnow() - stats['max_optimize_timestamp'] # TODO careful ).dt.days return ( stats[stats['optimize_lag'] < self.min_days_not_optimized] @@ -280,7 +280,7 @@ def _not_vacuumed_last_days(self) -> pd.DataFrame: stats = self._stats.copy() stats['max_vacuum_timestamp'] = pd.to_datetime(stats['max_vacuum_timestamp']) stats['vacuum_lag'] = ( - datetime.utcnow().replace(tzinfo=stats.dtypes["max_vacuum_timestamp"].tz) - stats['max_vacuum_timestamp'] + datetime.utcnow() - stats['max_vacuum_timestamp'] # TODO careful ).dt.days return ( stats[stats['vacuum_lag'] < self.min_days_not_vacuumed] @@ -300,7 +300,7 @@ def _analyze_these_tables(self) -> pd.DataFrame: stats = stats.loc[stats['max_optimize_timestamp'].notnull() & stats['p50_file_size'].notnull() & (stats['number_of_files'] > 1)] - stats = stats.loc[(stats['p50_file_size'] < self.small_file_threshold)] + stats = stats.loc[(stats['p50_file_size'].astype(int) < self.small_file_threshold)] return ( stats.sort_values(by=['database', 'tableName', 'number_of_files'], ascending=[True, True, False]) ) @@ -364,3 +364,5 @@ def to_html(self): title_s.string = k body.insert(0, v.to_html()) body.insert(0, title_s) + + return soup diff --git a/examples/exec_delta_housekeeping.py b/examples/exec_delta_housekeeping.py index d221432..23b4318 100644 --- a/examples/exec_delta_housekeeping.py +++ b/examples/exec_delta_housekeeping.py @@ -1,47 +1,48 @@ # Databricks notebook source # MAGIC %md -# MAGIC # Run arbitrary operations across multiple tables +# MAGIC # Run Delta Housekeeping across multiple tables # MAGIC # COMMAND ---------- # TODO remove -# MAGIC %reload_ext autoreload -# MAGIC %autoreload 2 +%reload_ext autoreload +%autoreload 2 # COMMAND ---------- - from discoverx import DX dx = DX() # COMMAND ---------- -result = ( - dx.from_tables("lorenzorubi.*.*") - .delta_housekeeping() - .stats() +# DBTITLE 1,Run the discoverx DeltaHousekeeping operation -generates an output object you can apply operations to +output = ( + dx.from_tables("lorenzorubi.*.*") + .delta_housekeeping() ) -display(result) # COMMAND ---------- -result = ( - dx.from_tables("lorenzorubi.*.*") - .delta_housekeeping() - .apply() -) +# DBTITLE 1,Generate a pandas dataframe with stats per table +display(output.stats()) # COMMAND ---------- -result = ( - dx.from_tables("lorenzorubi.*.*") - .delta_housekeeping() - .html() -) +# DBTITLE 1,apply() operation generates a list of dictionaries (if you need to postprocess the output) +result = output.apply() + +# COMMAND ---------- + +for r in result: + print(list(r.keys())[0]) + display(list(r.values())[0]) + +# COMMAND ---------- -displayHTML(result) +# DBTITLE 1,to_html() outputs the DeltaHousekeeping recommendations +displayHTML(output.to_html()) # COMMAND ---------- From bc303cd47c54f1f034502c830e794e328bde794f Mon Sep 17 00:00:00 2001 From: Lorenzo Rubio Date: Wed, 3 Jan 2024 14:17:16 +0100 Subject: [PATCH 08/13] move implementation of map_chunked to a separated branch + improved unit tests --- discoverx/delta_housekeeping.py | 14 ++++---- discoverx/explorer.py | 34 ------------------- setup.py | 1 - tests/unit/delta_housekeeping_actions_test.py | 4 +-- tests/unit/explorer_test.py | 12 +++++++ 5 files changed, 21 insertions(+), 44 deletions(-) diff --git a/discoverx/delta_housekeeping.py b/discoverx/delta_housekeeping.py index 4928e80..889066f 100644 --- a/discoverx/delta_housekeeping.py +++ b/discoverx/delta_housekeeping.py @@ -1,6 +1,6 @@ from typing import Iterable from functools import reduce -from datetime import datetime +from datetime import datetime, timezone import pandas as pd from discoverx.table_info import TableInfo @@ -102,7 +102,7 @@ def scan( self, table_info_list: Iterable[TableInfo], housekeeping_table_name: str = "lorenzorubi.default.housekeeping_summary_v2", # TODO remove - do_save_as_table: bool = True, + do_save_as_table: bool = False, ) -> pd.DataFrame: """ Scans a table_info / table_info_list to fetch Delta stats @@ -155,7 +155,7 @@ def scan( """) except Exception as e: errors.append(self._spark.createDataFrame( - [(table_info.catalog, table_info.schema, table_info.table, str(e))], + [(table_info.catalog or "", table_info.schema, table_info.table, str(e))], ["catalog", "database", "tableName", "error"] )) @@ -253,9 +253,9 @@ def _optimize_not_needed(self) -> pd.DataFrame: def _not_optimized_last_days(self) -> pd.DataFrame: stats = self._stats.copy() - stats['max_optimize_timestamp'] = pd.to_datetime(stats['max_optimize_timestamp']) + stats['max_optimize_timestamp'] = pd.to_datetime(stats['max_optimize_timestamp'], utc=True) stats['optimize_lag'] = ( - datetime.utcnow() - stats['max_optimize_timestamp'] # TODO careful + datetime.now(timezone.utc) - stats['max_optimize_timestamp'] ).dt.days return ( stats[stats['optimize_lag'] < self.min_days_not_optimized] @@ -278,9 +278,9 @@ def _never_vacuumed(self) -> pd.DataFrame: def _not_vacuumed_last_days(self) -> pd.DataFrame: stats = self._stats.copy() - stats['max_vacuum_timestamp'] = pd.to_datetime(stats['max_vacuum_timestamp']) + stats['max_vacuum_timestamp'] = pd.to_datetime(stats['max_vacuum_timestamp'], utc=True) stats['vacuum_lag'] = ( - datetime.utcnow() - stats['max_vacuum_timestamp'] # TODO careful + datetime.now(timezone.utc) - stats['max_vacuum_timestamp'] ).dt.days return ( stats[stats['vacuum_lag'] < self.min_days_not_vacuumed] diff --git a/discoverx/explorer.py b/discoverx/explorer.py index 94412f4..378d86f 100644 --- a/discoverx/explorer.py +++ b/discoverx/explorer.py @@ -1,7 +1,6 @@ import concurrent.futures import copy import re -import more_itertools import pandas as pd from typing import Optional, List, Callable, Iterable from discoverx import logging @@ -181,39 +180,6 @@ def map(self, f: Callable) -> list[any]: return res - def map_chunked(self, f: Callable, tables_per_chunk: int, **kwargs) -> list[any]: - """Runs a function for each table in the data explorer - - Args: - f (function): The function to run. The function should accept either a list of TableInfo objects as input and return a list of any object as output. - - Returns: - list[any]: A list of the results of running the function for each table - """ - res = [] - table_list = self._info_fetcher.get_tables_info( - self._catalogs, - self._schemas, - self._tables, - self._having_columns, - self._with_tags, - ) - with concurrent.futures.ThreadPoolExecutor(max_workers=self._max_concurrency) as executor: - # Submit tasks to the thread pool - futures = [ - executor.submit(f, table_chunk, **kwargs) for table_chunk in more_itertools.chunked(table_list, tables_per_chunk) - ] - - # Process completed tasks - for future in concurrent.futures.as_completed(futures): - result = future.result() - if result is not None: - res.extend(result) - - logger.debug("Finished lakehouse map_chunked task") - - return res - def delta_housekeeping(self) -> pd.DataFrame: """ Gathers stats and recommendations on Delta Housekeeping diff --git a/setup.py b/setup.py index 0e4d73f..9233b4d 100644 --- a/setup.py +++ b/setup.py @@ -34,7 +34,6 @@ "delta-spark>=2.2.0", "pandas<2.0.0", # From 2.0.0 onwards, pandas does not support iteritems() anymore, spark.createDataFrame will fail "numpy<1.24", # From 1.24 onwards, module 'numpy' has no attribute 'bool'. - "more_itertools", ] TEST_REQUIREMENTS = [ diff --git a/tests/unit/delta_housekeeping_actions_test.py b/tests/unit/delta_housekeeping_actions_test.py index 7ccbd9a..125d305 100644 --- a/tests/unit/delta_housekeeping_actions_test.py +++ b/tests/unit/delta_housekeeping_actions_test.py @@ -26,7 +26,7 @@ def test_apply_output(housekeeping_stats, expected_need_optimize): stats=housekeeping_stats, ) res = dha.apply() - assert len(res) == 7 + assert len(res) == 6 need_optimize = [item for item in res if (list(item.keys())[0] == dha.tables_not_optimized_legend)] assert len(need_optimize) == 1 need_optimize_df = list(need_optimize[0].values())[0] @@ -44,6 +44,6 @@ def test_empty_apply_output(housekeeping_stats): min_table_size_optimize=1024*1024*1024*1024 ) res = dha.apply() - assert len(res) == 6 + assert len(res) == 5 need_optimize = [item for item in res if list(item.keys())[0] == dha.tables_not_optimized_legend] assert len(need_optimize) == 0 diff --git a/tests/unit/explorer_test.py b/tests/unit/explorer_test.py index 8475599..1df5f19 100644 --- a/tests/unit/explorer_test.py +++ b/tests/unit/explorer_test.py @@ -1,3 +1,4 @@ +import pandas import pytest from discoverx.explorer import DataExplorer, DataExplorerActions, InfoFetcher, TableInfo @@ -89,3 +90,14 @@ def test_no_tables_matching_filter(spark, info_fetcher): data_explorer = DataExplorer("some_catalog.default.non_existent_table", spark, info_fetcher) with pytest.raises(ValueError): data_explorer.map(lambda table_info: table_info) + + +def test_delta_housekeeeping_call(spark, info_fetcher): + data_explorer = DataExplorer("*.default.*", spark, info_fetcher) + result: pandas.DataFrame = data_explorer.delta_housekeeping().stats() + print(result['tableName'].count()) + assert result['tableName'].count() == 3 + for res in result['tableName'].tolist(): + assert res in ["tb_all_types", "tb_1", "tb_2"] + for col in result.columns: + assert col in ["catalog", "database", "tableName", "error"] From feeafaf06c70f673af921e9fb6c886a0f3f8ac29 Mon Sep 17 00:00:00 2001 From: Lorenzo Rubio Date: Wed, 3 Jan 2024 15:35:40 +0100 Subject: [PATCH 09/13] readability, cleanup, follow discoverx patterns --- README.md | 5 + discoverx/delta_housekeeping.py | 94 +++++++++++++------ examples/exec_delta_housekeeping.py | 24 +++-- tests/unit/delta_housekeeping_actions_test.py | 4 +- tests/unit/explorer_test.py | 2 +- 5 files changed, 87 insertions(+), 42 deletions(-) diff --git a/README.md b/README.md index c733664..9100573 100644 --- a/README.md +++ b/README.md @@ -15,6 +15,11 @@ Operations are applied concurrently across multiple tables * OPTIMIZE with z-order on tables having specified columns * Detect tables having too many small files ([example notebook](examples/detect_small_files.py)) * Visualise quantity of data written per table per period + * Delta housekeeping analysis ([example notebook](examples/exec_delta_housekeeping.py)) which provide: + * stats (size of tables and number of files, timestamps of latest OPTIMIZE & VACUUM operations, stats of OPTIMIZE) + * recommendations on tables that need to be OPTIMIZED/VACUUM'ed + * are tables OPTIMIZED/VACUUM'ed often enough + * tables that have small files / tables for which ZORDER is not being effective * **Governance** * PII detection with Presidio ([example notebook](examples/pii_detection_presidio.py)) * Text Analysis with MosaicML and Databricks MLflow ([example notebook](examples/text_analysis_mosaicml_mlflow.py)) diff --git a/discoverx/delta_housekeeping.py b/discoverx/delta_housekeeping.py index 889066f..ceda5aa 100644 --- a/discoverx/delta_housekeeping.py +++ b/discoverx/delta_housekeeping.py @@ -98,11 +98,30 @@ def _process_describe_history( return out + @staticmethod + def save_as_table( + result: DataFrame, + housekeeping_table_name: str, + ): + """ + Static method to store intermediate results of the scan operation into Delta + Would make sense only if using map_chunked from the `DataExplorer` object + (otherwise tables are writen one by one into Delta with overhead) + + TODO create function in `DataExplorer` that uses this for a chunked + """ + ( + result + .write + .format("delta") + .mode("append") + .option("mergeSchema", "true") + .saveAsTable(housekeeping_table_name) + ) + def scan( self, table_info_list: Iterable[TableInfo], - housekeeping_table_name: str = "lorenzorubi.default.housekeeping_summary_v2", # TODO remove - do_save_as_table: bool = False, ) -> pd.DataFrame: """ Scans a table_info / table_info_list to fetch Delta stats @@ -181,25 +200,20 @@ def scan( out = dh.unionByName(errors_df, allowMissingColumns=True) - if do_save_as_table: - ( - out - .write - .format("delta") - .mode("append") - .option("mergeSchema", "true") - .saveAsTable(housekeeping_table_name) - ) - return out.toPandas() class DeltaHousekeepingActions: + """ + Processes the output of the `DeltaHousekeeping` object to provide recommendations + - tables that need to be OPTIMIZED/VACUUM'ed + - are tables OPTIMIZED/VACUUM'ed often enough + - tables that have small files / tables for which ZORDER is not being effective + """ + def __init__( self, - # delta_housekeeping: DeltaHousekeeping, mapped_pd_dfs: Iterable[pd.DataFrame], - # spark: SparkSession = None, min_table_size_optimize: int = 128*1024*1024, # i.e. 128 MB min_days_not_optimized: int = 7, min_days_not_vacuumed: int = 31, @@ -209,14 +223,10 @@ def __init__( min_number_of_files_for_zorder: int = 8, stats: pd.DataFrame = None, # for testability only ) -> None: - # self._delta_housekeeping = delta_housekeeping if stats is None: self._mapped_pd_dfs = mapped_pd_dfs stats = pd.concat(self._mapped_pd_dfs) self._stats: pd.DataFrame = stats - # if spark is None: - # spark = SparkSession.builder.getOrCreate() - # self._spark = spark self.min_table_size_optimize = min_table_size_optimize self.min_days_not_optimized = min_days_not_optimized self.min_days_not_vacuumed = min_days_not_vacuumed @@ -234,9 +244,6 @@ def __init__( self.tables_to_analyze = "Tables that need more analysis (small_files)" self.tables_zorder_not_effective = "Tables for which ZORDER is not being effective" - def stats(self) -> pd.DataFrame: - return self._stats - def _need_optimize(self) -> pd.DataFrame: stats = self._stats.copy() stats = stats.loc[stats.max_optimize_timestamp.isnull() & stats.bytes.notnull()] @@ -322,7 +329,37 @@ def _zorder_not_effective(self) -> pd.DataFrame: stats ) - def apply(self): + def stats(self) -> DataFrame: + """Ouputs the stats per table""" + import pyspark.pandas as ps + + return ps.from_pandas(self._stats) + + def display(self) -> None: + """Executes the Delta housekeeping analysis and displays a sample of results""" + return self.apply().display() + + def apply(self) -> DataFrame: + """Displays recommendations in a DataFrame format""" + import pyspark.pandas as ps + + out = None + for recomm in self.generate_recommendations(): + for legend, df in recomm.items(): + out_df = ps.from_pandas(df).withColumn("recommendation", F.lit(legend)) + if out is None: + out = out_df + else: + out = out.unionByName(out_df, allowMissingColumns=True) + return out + + def generate_recommendations(self) -> Iterable[dict]: + """ + Generates Delta Housekeeping recommendations as a list of dictionaries (internal use + unit tests only) + A dict per recommendation where: + - The key is the legend of the recommendation + - The value is a pandas df with the affected tables + """ out = [] for df, legend in zip([ self._need_optimize(), @@ -349,20 +386,19 @@ def apply(self): out.append({legend: df}) return out - def to_html(self): + def explain(self) -> None: # TODO better formatting! from bs4 import BeautifulSoup - res = self.apply() soup = BeautifulSoup(features='xml') body = soup.new_tag('body') soup.insert(0, body) - for r in res: - for k,v in r.items(): + for recomm in self.generate_recommendations(): + for legend, df in recomm.items(): title_s = soup.new_tag('title') - title_s.string = k - body.insert(0, v.to_html()) + title_s.string = legend + body.insert(0, df.to_html()) body.insert(0, title_s) - return soup + displayHTML(soup) diff --git a/examples/exec_delta_housekeeping.py b/examples/exec_delta_housekeeping.py index 23b4318..a39ba06 100644 --- a/examples/exec_delta_housekeeping.py +++ b/examples/exec_delta_housekeeping.py @@ -1,13 +1,16 @@ # Databricks notebook source # MAGIC %md # MAGIC # Run Delta Housekeeping across multiple tables +# MAGIC Analysis that provides stats on Delta tables / recommendations for improvements, including: +# MAGIC - stats:size of tables and number of files, timestamps of latest OPTIMIZE & VACUUM operations, stats of OPTIMIZE) +# MAGIC - recommendations on tables that need to be OPTIMIZED/VACUUM'ed +# MAGIC - are tables OPTIMIZED/VACUUM'ed often enough +# MAGIC - tables that have small files / tables for which ZORDER is not being effective # MAGIC # COMMAND ---------- -# TODO remove -%reload_ext autoreload -%autoreload 2 +# MAGIC %pip install dbl-discoverx # COMMAND ---------- @@ -25,24 +28,25 @@ # COMMAND ---------- -# DBTITLE 1,Generate a pandas dataframe with stats per table -display(output.stats()) +# DBTITLE 1,Display the stats per table +stats = output.stats() +stats.display() # COMMAND ---------- # DBTITLE 1,apply() operation generates a list of dictionaries (if you need to postprocess the output) result = output.apply() +result.display() # COMMAND ---------- -for r in result: - print(list(r.keys())[0]) - display(list(r.values())[0]) +# DBTITLE 1,display() runs apply and displays the result +output.display() # COMMAND ---------- -# DBTITLE 1,to_html() outputs the DeltaHousekeeping recommendations -displayHTML(output.to_html()) +# DBTITLE 1,explain() outputs the DeltaHousekeeping recommendations in HTML format +output.explain() # COMMAND ---------- diff --git a/tests/unit/delta_housekeeping_actions_test.py b/tests/unit/delta_housekeeping_actions_test.py index 125d305..faed05e 100644 --- a/tests/unit/delta_housekeeping_actions_test.py +++ b/tests/unit/delta_housekeeping_actions_test.py @@ -25,7 +25,7 @@ def test_apply_output(housekeeping_stats, expected_need_optimize): None, stats=housekeeping_stats, ) - res = dha.apply() + res = dha.generate_recommendations() assert len(res) == 6 need_optimize = [item for item in res if (list(item.keys())[0] == dha.tables_not_optimized_legend)] assert len(need_optimize) == 1 @@ -43,7 +43,7 @@ def test_empty_apply_output(housekeeping_stats): stats=housekeeping_stats, min_table_size_optimize=1024*1024*1024*1024 ) - res = dha.apply() + res = dha.generate_recommendations() assert len(res) == 5 need_optimize = [item for item in res if list(item.keys())[0] == dha.tables_not_optimized_legend] assert len(need_optimize) == 0 diff --git a/tests/unit/explorer_test.py b/tests/unit/explorer_test.py index 1df5f19..429f3ae 100644 --- a/tests/unit/explorer_test.py +++ b/tests/unit/explorer_test.py @@ -94,7 +94,7 @@ def test_no_tables_matching_filter(spark, info_fetcher): def test_delta_housekeeeping_call(spark, info_fetcher): data_explorer = DataExplorer("*.default.*", spark, info_fetcher) - result: pandas.DataFrame = data_explorer.delta_housekeeping().stats() + result: pandas.DataFrame = data_explorer.delta_housekeeping()._stats print(result['tableName'].count()) assert result['tableName'].count() == 3 for res in result['tableName'].tolist(): From e8a1b66e77786345463e1463cdc25ed1c8ee11af Mon Sep 17 00:00:00 2001 From: lorenzorubi-db Date: Wed, 3 Jan 2024 17:01:55 +0000 Subject: [PATCH 10/13] debugging on cluster + adding spark session to `DeltaHousekeepingActions` --- discoverx/delta_housekeeping.py | 15 +++++++++------ discoverx/explorer.py | 2 +- examples/exec_delta_housekeeping.py | 12 +++--------- 3 files changed, 13 insertions(+), 16 deletions(-) diff --git a/discoverx/delta_housekeeping.py b/discoverx/delta_housekeeping.py index ceda5aa..f13cc29 100644 --- a/discoverx/delta_housekeeping.py +++ b/discoverx/delta_housekeeping.py @@ -214,6 +214,7 @@ class DeltaHousekeepingActions: def __init__( self, mapped_pd_dfs: Iterable[pd.DataFrame], + spark: SparkSession = None, min_table_size_optimize: int = 128*1024*1024, # i.e. 128 MB min_days_not_optimized: int = 7, min_days_not_vacuumed: int = 31, @@ -227,6 +228,11 @@ def __init__( self._mapped_pd_dfs = mapped_pd_dfs stats = pd.concat(self._mapped_pd_dfs) self._stats: pd.DataFrame = stats + + if spark is None: + spark = SparkSession.builder.getOrCreate() + self._spark = spark + self.min_table_size_optimize = min_table_size_optimize self.min_days_not_optimized = min_days_not_optimized self.min_days_not_vacuumed = min_days_not_vacuumed @@ -331,9 +337,7 @@ def _zorder_not_effective(self) -> pd.DataFrame: def stats(self) -> DataFrame: """Ouputs the stats per table""" - import pyspark.pandas as ps - - return ps.from_pandas(self._stats) + return self._spark.createDataFrame(self._stats) def display(self) -> None: """Executes the Delta housekeeping analysis and displays a sample of results""" @@ -341,12 +345,10 @@ def display(self) -> None: def apply(self) -> DataFrame: """Displays recommendations in a DataFrame format""" - import pyspark.pandas as ps - out = None for recomm in self.generate_recommendations(): for legend, df in recomm.items(): - out_df = ps.from_pandas(df).withColumn("recommendation", F.lit(legend)) + out_df = self._spark.createDataFrame(df).withColumn("recommendation", F.lit(legend)) if out is None: out = out_df else: @@ -389,6 +391,7 @@ def generate_recommendations(self) -> Iterable[dict]: def explain(self) -> None: # TODO better formatting! from bs4 import BeautifulSoup + from databricks.sdk.runtime import displayHTML soup = BeautifulSoup(features='xml') diff --git a/discoverx/explorer.py b/discoverx/explorer.py index 378d86f..ed9ffc0 100644 --- a/discoverx/explorer.py +++ b/discoverx/explorer.py @@ -186,7 +186,7 @@ def delta_housekeeping(self) -> pd.DataFrame: """ dh = DeltaHousekeeping(self._spark) dfs_pd: Iterable[pd.DataFrame] = self.map(dh.scan) - return DeltaHousekeepingActions(dfs_pd) + return DeltaHousekeepingActions(dfs_pd, spark=self._spark) class DataExplorerActions: diff --git a/examples/exec_delta_housekeeping.py b/examples/exec_delta_housekeeping.py index a39ba06..504f0cb 100644 --- a/examples/exec_delta_housekeeping.py +++ b/examples/exec_delta_housekeeping.py @@ -28,19 +28,13 @@ # COMMAND ---------- -# DBTITLE 1,Display the stats per table -stats = output.stats() -stats.display() - -# COMMAND ---------- - -# DBTITLE 1,apply() operation generates a list of dictionaries (if you need to postprocess the output) +# DBTITLE 1,apply() operation generates a spark dataframe with recommendations result = output.apply() -result.display() +result.select("catalog", "database", "tableName", "recommendation").display() # COMMAND ---------- -# DBTITLE 1,display() runs apply and displays the result +# DBTITLE 1,display() runs apply and displays the full result (including stats per table) output.display() # COMMAND ---------- From e177ef44e1e1930a0d6690b782848b2feea20ede Mon Sep 17 00:00:00 2001 From: Lorenzo Rubio Date: Wed, 3 Jan 2024 19:40:42 +0100 Subject: [PATCH 11/13] simplify scan implementation & remove dependency to BeautifulSoup --- discoverx/delta_housekeeping.py | 130 +++++++++++++------------------- 1 file changed, 52 insertions(+), 78 deletions(-) diff --git a/discoverx/delta_housekeeping.py b/discoverx/delta_housekeeping.py index f13cc29..02d5b23 100644 --- a/discoverx/delta_housekeeping.py +++ b/discoverx/delta_housekeeping.py @@ -119,46 +119,29 @@ def save_as_table( .saveAsTable(housekeeping_table_name) ) - def scan( - self, - table_info_list: Iterable[TableInfo], - ) -> pd.DataFrame: - """ - Scans a table_info / table_info_list to fetch Delta stats - - DESCRIBE DETAIL - - DESCRIBE HISTORY - """ - dd_list = [] - statements = [] - errors = [] - - if not isinstance(table_info_list, Iterable): - table_info_list = [table_info_list] - - for table_info in table_info_list: - try: - # runs a describe detail per table, figures out if exception - dd = self._spark.sql(f""" - DESCRIBE DETAIL {table_info.catalog}.{table_info.schema}.{table_info.table}; - """) - dd = ( - dd - .withColumn("split", F.split(F.col('name'), '\.')) - .withColumn("catalog", F.col("split").getItem(0)) - .withColumn("database", F.col("split").getItem(1)) - .withColumn("tableName", F.col("split").getItem(2)) - .select([ - F.col("catalog"), - F.col("database"), - F.col("tableName"), - F.col("numFiles").alias("number_of_files"), - F.col("sizeInBytes").alias("bytes"), - ]) - ) - dd_list.append(dd) - - # prepares a DESCRIBE HISTORY statement per table (will be run outside of the loop) - statements.append(f""" + def get_describe_detail(self, table_info: TableInfo): + dd = self._spark.sql(f""" + DESCRIBE DETAIL {table_info.catalog}.{table_info.schema}.{table_info.table}; + """) + dd = ( + dd + .withColumn("split", F.split(F.col('name'), '\.')) + .withColumn("catalog", F.col("split").getItem(0)) + .withColumn("database", F.col("split").getItem(1)) + .withColumn("tableName", F.col("split").getItem(2)) + .select([ + F.col("catalog"), + F.col("database"), + F.col("tableName"), + F.col("numFiles").alias("number_of_files"), + F.col("sizeInBytes").alias("bytes"), + ]) + ) + return dd + + @staticmethod + def get_describe_history_statement(table_info: TableInfo): + return f""" SELECT '{table_info.catalog}' AS catalog, '{table_info.schema}' AS database, @@ -171,36 +154,35 @@ def scan( operationParameters.zOrderBy AS z_order_by FROM (DESCRIBE HISTORY {table_info.catalog}.{table_info.schema}.{table_info.table}) WHERE operation in ('OPTIMIZE', 'VACUUM END') - """) - except Exception as e: - errors.append(self._spark.createDataFrame( - [(table_info.catalog or "", table_info.schema, table_info.table, str(e))], - ["catalog", "database", "tableName", "error"] - )) - - # statement to UNION all DESCRIBE HISTORY together - statement = " UNION ".join(statements) - - dh = self._spark.createDataFrame([], self.empty_schema) - if statements: - dh = self._process_describe_history( - reduce( - lambda left, right: left.union(right), - dd_list - ), - self._spark.sql(statement), - ) + """ - errors_df = self._spark.createDataFrame([], self.empty_schema) - if errors: - errors_df = reduce( - lambda left, right: left.union(right), - errors - ) + def scan( + self, + table_info: TableInfo, + ) -> pd.DataFrame: + """ + Scans a table_info to fetch Delta stats + - DESCRIBE DETAIL + - DESCRIBE HISTORY + """ + try: + # runs a describe detail per table, figures out if exception + dd = self.get_describe_detail(table_info) - out = dh.unionByName(errors_df, allowMissingColumns=True) + # prepares a DESCRIBE HISTORY statement per table (will be run outside the try-catch) + statement = self.get_describe_history_statement(table_info) - return out.toPandas() + return self._process_describe_history( + dd, + self._spark.sql(statement), + ).toPandas() + + except Exception as e: + errors_df = self._spark.createDataFrame( + [(table_info.catalog or "", table_info.schema, table_info.table, str(e))], + ["catalog", "database", "tableName", "error"] + ) + return errors_df.toPandas() class DeltaHousekeepingActions: @@ -390,18 +372,10 @@ def generate_recommendations(self) -> Iterable[dict]: def explain(self) -> None: # TODO better formatting! - from bs4 import BeautifulSoup - from databricks.sdk.runtime import displayHTML + from databricks.sdk.runtime import display - soup = BeautifulSoup(features='xml') - body = soup.new_tag('body') - soup.insert(0, body) for recomm in self.generate_recommendations(): for legend, df in recomm.items(): - title_s = soup.new_tag('title') - title_s.string = legend - body.insert(0, df.to_html()) - body.insert(0, title_s) - - displayHTML(soup) + display(legend) + display(df) From 023b02f87f3e5a5e15aa1cfacb55e84878614bdc Mon Sep 17 00:00:00 2001 From: Lorenzo Rubio Date: Fri, 5 Jan 2024 18:31:18 +0100 Subject: [PATCH 12/13] faster implementation + unit tests --- discoverx/delta_housekeeping.py | 65 ++++----- .../delta_housekeeping/dd_click_sales.csv | 2 + .../dd_housekeeping_summary.csv | 2 + .../delta_housekeeping/dh_click_sales.csv | 4 + .../dh_housekeeping_summary.csv | 25 ++++ .../expected_pdh_click_sales.csv | 2 + .../expected_pdh_housekeeping_summary.csv | 2 + tests/unit/delta_housekeeping_test.py | 138 ++++++++++++++++++ 8 files changed, 206 insertions(+), 34 deletions(-) create mode 100644 tests/unit/data/delta_housekeeping/dd_click_sales.csv create mode 100644 tests/unit/data/delta_housekeeping/dd_housekeeping_summary.csv create mode 100644 tests/unit/data/delta_housekeeping/dh_click_sales.csv create mode 100644 tests/unit/data/delta_housekeeping/dh_housekeeping_summary.csv create mode 100644 tests/unit/data/delta_housekeeping/expected_pdh_click_sales.csv create mode 100644 tests/unit/data/delta_housekeeping/expected_pdh_housekeeping_summary.csv create mode 100644 tests/unit/delta_housekeeping_test.py diff --git a/discoverx/delta_housekeeping.py b/discoverx/delta_housekeeping.py index 02d5b23..03de50c 100644 --- a/discoverx/delta_housekeeping.py +++ b/discoverx/delta_housekeeping.py @@ -24,7 +24,7 @@ def __init__(self, spark: SparkSession) -> None: @staticmethod def _process_describe_history( describe_detail_df: DataFrame, describe_history_df: DataFrame - ) -> DataFrame: + ) -> pd.DataFrame: """ processes the DESCRIBE HISTORY result of potentially several tables in different schemas/catalogs Provides @@ -33,10 +33,13 @@ def _process_describe_history( - stats of OPTIMIZE (including ZORDER) - timestamp for last & second last VACUUM + returns a pandas DataFrame, and converts Spark internal dfs to pandas as soon as they are manageable + the reason being that DESCRIBE HISTORY / DESCRIBE DETAIL cannot be cached + TODO reconsider if it is better outside of the class """ if not "operation" in describe_history_df.columns: - return describe_detail_df + return describe_detail_df.toPandas() # window over operation operation_order = ( @@ -46,52 +49,46 @@ def _process_describe_history( Window.partitionBy(["catalog", "database", "tableName", "operation"]).orderBy(F.col("timestamp").desc()) )) ) + + if operation_order.isEmpty(): + return describe_detail_df.toPandas() + + operation_order = operation_order.toPandas() + # max & 2nd timestamp of OPTIMIZE into output - out = describe_detail_df.join( - operation_order - .filter((F.col("operation") == "OPTIMIZE") & (F.col("operation_order") == 1)) - .select("catalog", "database", "tableName", "timestamp") - .withColumnRenamed("timestamp", "max_optimize_timestamp"), + out = describe_detail_df.toPandas().merge( + operation_order[(operation_order.operation == "OPTIMIZE") & (operation_order.operation_order == 1)] + .loc[:, ["catalog", "database", "tableName", "timestamp"]] + .rename(columns={'timestamp': 'max_optimize_timestamp'}), how="outer", on=["catalog", "database", "tableName"] ) - out = out.join( - operation_order - .filter((F.col("operation") == "OPTIMIZE") & (F.col("operation_order") == 2)) - .select("catalog", "database", "tableName", "timestamp") - .withColumnRenamed("timestamp", "2nd_optimize_timestamp"), + out = out.merge( + operation_order[(operation_order.operation == "OPTIMIZE") & (operation_order.operation_order == 2)] + .loc[:, ["catalog", "database", "tableName", "timestamp"]] + .rename(columns={'timestamp': '2nd_optimize_timestamp'}), how="outer", on=["catalog", "database", "tableName"] ) # max timestamp of VACUUM into output - out = out.join( - operation_order - .filter((F.col("operation") == "VACUUM END") & (F.col("operation_order") == 1)) - .select("catalog", "database", "tableName", "timestamp") - .withColumnRenamed("timestamp", "max_vacuum_timestamp"), + out = out.merge( + operation_order[(operation_order.operation == "VACUUM END") & (operation_order.operation_order == 1)] + .loc[:, ["catalog", "database", "tableName", "timestamp"]] + .rename(columns={'timestamp': 'max_vacuum_timestamp'}), how="outer", on=["catalog", "database", "tableName"] ) - out = out.join( - operation_order - .filter((F.col("operation") == "VACUUM END") & (F.col("operation_order") == 2)) - .select("catalog", "database", "tableName", "timestamp") - .withColumnRenamed("timestamp", "2nd_vacuum_timestamp"), + out = out.merge( + operation_order[(operation_order.operation == "VACUUM END") & (operation_order.operation_order == 2)] + .loc[:, ["catalog", "database", "tableName", "timestamp"]] + .rename(columns={'timestamp': '2nd_vacuum_timestamp'}), how="outer", on=["catalog", "database", "tableName"] ) # summary of table metrics table_metrics_1 = ( - operation_order.filter((F.col("operation") == "OPTIMIZE") & (F.col("operation_order") == 1)) - .select([ - F.col("catalog"), - F.col("database"), - F.col("tableName"), - F.col("min_file_size"), - F.col("p50_file_size"), - F.col("max_file_size"), - F.col("z_order_by"), - ]) + operation_order[(operation_order['operation'] == 'OPTIMIZE') & (operation_order['operation_order'] == 1)] + .loc[:, ['catalog', 'database', 'tableName', 'min_file_size', 'p50_file_size', 'max_file_size', 'z_order_by']] ) # write to output - out = out.join( + out = out.merge( table_metrics_1, how="outer", on=["catalog", "database", "tableName"] ) @@ -175,7 +172,7 @@ def scan( return self._process_describe_history( dd, self._spark.sql(statement), - ).toPandas() + ) except Exception as e: errors_df = self._spark.createDataFrame( diff --git a/tests/unit/data/delta_housekeeping/dd_click_sales.csv b/tests/unit/data/delta_housekeeping/dd_click_sales.csv new file mode 100644 index 0000000..4ffb5a3 --- /dev/null +++ b/tests/unit/data/delta_housekeeping/dd_click_sales.csv @@ -0,0 +1,2 @@ +catalog,database,tableName,number_of_files,bytes +lorenzorubi,default,click_sales,6,326068799 diff --git a/tests/unit/data/delta_housekeeping/dd_housekeeping_summary.csv b/tests/unit/data/delta_housekeeping/dd_housekeeping_summary.csv new file mode 100644 index 0000000..70fc5a1 --- /dev/null +++ b/tests/unit/data/delta_housekeeping/dd_housekeeping_summary.csv @@ -0,0 +1,2 @@ +catalog,database,tableName,number_of_files,bytes +lorenzorubi,default,housekeeping_summary,1,192917 diff --git a/tests/unit/data/delta_housekeeping/dh_click_sales.csv b/tests/unit/data/delta_housekeeping/dh_click_sales.csv new file mode 100644 index 0000000..f35d6b6 --- /dev/null +++ b/tests/unit/data/delta_housekeeping/dh_click_sales.csv @@ -0,0 +1,4 @@ +catalog,database,tableName,operation,timestamp,min_file_size,p50_file_size,max_file_size,z_order_by +lorenzorubi,default,click_sales,VACUUM END,2023-12-06T16:40:28Z,null,null,null,null +lorenzorubi,default,click_sales,VACUUM END,2023-12-05T01:19:47Z,null,null,null,null +lorenzorubi,default,click_sales,VACUUM END,2023-11-25T04:03:41Z,null,null,null,null diff --git a/tests/unit/data/delta_housekeeping/dh_housekeeping_summary.csv b/tests/unit/data/delta_housekeeping/dh_housekeeping_summary.csv new file mode 100644 index 0000000..d1ee36e --- /dev/null +++ b/tests/unit/data/delta_housekeeping/dh_housekeeping_summary.csv @@ -0,0 +1,25 @@ +catalog,database,tableName,operation,timestamp,min_file_size,p50_file_size,max_file_size,z_order_by +lorenzorubi,default,housekeeping_summary,OPTIMIZE,2023-12-05T05:50:14Z,192917,192917,192917,[] +lorenzorubi,default,housekeeping_summary,OPTIMIZE,2023-12-05T05:21:22Z,184203,184203,184203,[] +lorenzorubi,default,housekeeping_summary,OPTIMIZE,2023-12-05T04:37:19Z,176955,176955,176955,[] +lorenzorubi,default,housekeeping_summary,OPTIMIZE,2023-12-05T04:10:26Z,168560,168560,168560,[] +lorenzorubi,default,housekeeping_summary,OPTIMIZE,2023-12-05T03:11:02Z,161710,161710,161710,[] +lorenzorubi,default,housekeeping_summary,OPTIMIZE,2023-12-05T02:44:41Z,154166,154166,154166,[] +lorenzorubi,default,housekeeping_summary,OPTIMIZE,2023-12-05T02:18:54Z,145990,145990,145990,[] +lorenzorubi,default,housekeeping_summary,OPTIMIZE,2023-12-05T01:42:12Z,137677,137677,137677,[] +lorenzorubi,default,housekeeping_summary,OPTIMIZE,2023-12-05T01:09:19Z,130864,130864,130864,[] +lorenzorubi,default,housekeeping_summary,OPTIMIZE,2023-12-05T00:53:33Z,123702,123702,123702,[] +lorenzorubi,default,housekeeping_summary,OPTIMIZE,2023-12-05T00:43:44Z,118806,118806,118806,[] +lorenzorubi,default,housekeeping_summary,OPTIMIZE,2023-12-05T00:28:00Z,111983,111983,111983,[] +lorenzorubi,default,housekeeping_summary,OPTIMIZE,2023-12-05T00:14:21Z,104790,104790,104790,[] +lorenzorubi,default,housekeeping_summary,OPTIMIZE,2023-12-04T23:47:02Z,97314,97314,97314,[] +lorenzorubi,default,housekeeping_summary,OPTIMIZE,2023-12-04T23:18:17Z,91509,91509,91509,[] +lorenzorubi,default,housekeeping_summary,OPTIMIZE,2023-12-04T22:14:48Z,84152,84152,84152,[] +lorenzorubi,default,housekeeping_summary,OPTIMIZE,2023-12-04T21:57:53Z,76464,76464,76464,[] +lorenzorubi,default,housekeeping_summary,OPTIMIZE,2023-12-04T21:30:49Z,67498,67498,67498,[] +lorenzorubi,default,housekeeping_summary,OPTIMIZE,2023-12-04T21:18:59Z,59412,59412,59412,[] +lorenzorubi,default,housekeeping_summary,OPTIMIZE,2023-12-04T20:30:48Z,51173,51173,51173,[] +lorenzorubi,default,housekeeping_summary,OPTIMIZE,2023-12-04T20:12:59Z,42346,42346,42346,[] +lorenzorubi,default,housekeeping_summary,OPTIMIZE,2023-12-04T19:35:05Z,34463,34463,34463,[] +lorenzorubi,default,housekeeping_summary,OPTIMIZE,2023-12-04T19:30:46Z,28604,28604,28604,[] +lorenzorubi,default,housekeeping_summary,OPTIMIZE,2023-12-04T19:06:51Z,8412,17592,17592,[] diff --git a/tests/unit/data/delta_housekeeping/expected_pdh_click_sales.csv b/tests/unit/data/delta_housekeeping/expected_pdh_click_sales.csv new file mode 100644 index 0000000..569c1e8 --- /dev/null +++ b/tests/unit/data/delta_housekeeping/expected_pdh_click_sales.csv @@ -0,0 +1,2 @@ +catalog,database,tableName,number_of_files,bytes,max_optimize_timestamp,2nd_optimize_timestamp,max_vacuum_timestamp,2nd_vacuum_timestamp,min_file_size,p50_file_size,max_file_size,z_order_by +lorenzorubi,default,click_sales,6,326068799,,,2023-12-06T16:40:28Z,2023-12-05T01:19:47Z,,,, diff --git a/tests/unit/data/delta_housekeeping/expected_pdh_housekeeping_summary.csv b/tests/unit/data/delta_housekeeping/expected_pdh_housekeeping_summary.csv new file mode 100644 index 0000000..af564ba --- /dev/null +++ b/tests/unit/data/delta_housekeeping/expected_pdh_housekeeping_summary.csv @@ -0,0 +1,2 @@ +catalog,database,tableName,number_of_files,bytes,max_optimize_timestamp,2nd_optimize_timestamp,max_vacuum_timestamp,2nd_vacuum_timestamp,min_file_size,p50_file_size,max_file_size,z_order_by +lorenzorubi,default,housekeeping_summary,1,192917,2023-12-05T05:50:14Z,2023-12-05T05:21:22Z,,,192917,192917,192917,[] diff --git a/tests/unit/delta_housekeeping_test.py b/tests/unit/delta_housekeeping_test.py new file mode 100644 index 0000000..e53d0bf --- /dev/null +++ b/tests/unit/delta_housekeeping_test.py @@ -0,0 +1,138 @@ +import pytest +import pandas as pd +from discoverx.delta_housekeeping import DeltaHousekeeping +from pathlib import Path +import pyspark.sql.functions as F + + +def _resolve_file_path(request, relative_path): + module_path = Path(request.module.__file__) + test_file_path = module_path.parent / relative_path + return pd.read_csv( + str(test_file_path.resolve()), + dtype={ + "max_optimize_timestamp": "str", + "2nd_optimize_timestamp": "str", + "max_vacuum_timestamp": "str", + "2nd_vacuum_timestamp": "str", + } + ) + + +@pytest.fixture() +def dh_click_sales(request): + return _resolve_file_path(request, "data/delta_housekeeping/dh_click_sales.csv") + + +@pytest.fixture() +def dd_click_sales(request): + return _resolve_file_path(request, "data/delta_housekeeping/dd_click_sales.csv") + + +@pytest.fixture() +def expected_pdh_click_sales(request): + return _resolve_file_path(request, "data/delta_housekeeping/expected_pdh_click_sales.csv") + + +@pytest.fixture() +def dh_housekeeping_summary(request): + return _resolve_file_path(request, "data/delta_housekeeping/dh_housekeeping_summary.csv") + + +@pytest.fixture() +def dd_housekeeping_summary(request): + return _resolve_file_path(request, "data/delta_housekeeping/dd_housekeeping_summary.csv") + + +@pytest.fixture() +def expected_pdh_housekeeping_summary(request): + return _resolve_file_path(request, "data/delta_housekeeping/expected_pdh_housekeeping_summary.csv") + + +@pytest.mark.skip() +def test_process_describe_history_template(): + from pyspark.sql import SparkSession + spark = SparkSession.builder.getOrCreate() + + dh = DeltaHousekeeping(spark) + dd_click_sales = pd.read_csv( + "/Users/lorenzo.rubio/Documents/GitHub/discoverx_lorenzorubi-db/tests/unit/data/delta_housekeeping/dd_click_sales.csv") + dh_click_sales = pd.read_csv( + "/Users/lorenzo.rubio/Documents/GitHub/discoverx_lorenzorubi-db/tests/unit/data/delta_housekeeping/dh_click_sales.csv") + expected_pdh_click_sales = pd.read_csv( + "/Users/lorenzo.rubio/Documents/GitHub/discoverx_lorenzorubi-db/tests/unit/data/delta_housekeeping/expected_pdh_click_sales.csv" + ) + + describe_detail_df = spark.createDataFrame(dd_click_sales) + # describe_detail_df = ( + # describe_detail_df + # .withColumn("split", F.split(F.col('name'), '\.')) + # .withColumn("catalog", F.col("split").getItem(0)) + # .withColumn("database", F.col("split").getItem(1)) + # .withColumn("tableName", F.col("split").getItem(2)) + # .select([ + # F.col("catalog"), + # F.col("database"), + # F.col("tableName"), + # F.col("numFiles").alias("number_of_files"), + # F.col("sizeInBytes").alias("bytes"), + # ]) + # ) + # describe_detail_df.toPandas().to_csv("/Users/lorenzo.rubio/Documents/GitHub/discoverx_lorenzorubi-db/tests/unit/data/delta_housekeeping/dd_housekeeping_summary.csv", index=False) + + describe_history_df = spark.createDataFrame(dh_click_sales) + describe_history_df = describe_history_df.withColumn("operation", F.lit("NOOP")) + + out = dh._process_describe_history(describe_detail_df, describe_history_df) + + out.toPandas().to_csv("/Users/lorenzo.rubio/Documents/GitHub/discoverx_lorenzorubi-db/tests/unit/data/delta_housekeeping/expected_pdh_housekeeping_summary.csv", index=False) + assert out + + +def test_process_describe_history_no_optimize(spark, dh_click_sales, dd_click_sales, expected_pdh_click_sales): + dh = DeltaHousekeeping(spark) + describe_detail_df = spark.createDataFrame(dd_click_sales) + describe_history_df = spark.createDataFrame(dh_click_sales) + out = dh._process_describe_history(describe_detail_df, describe_history_df) + pd.testing.assert_frame_equal( + out.reset_index(), + expected_pdh_click_sales.reset_index(), + ) + + +def test_process_describe_history_no_vacuum( + spark, dh_housekeeping_summary, dd_housekeeping_summary, expected_pdh_housekeeping_summary +): + dh = DeltaHousekeeping(spark) + describe_detail_df = spark.createDataFrame(dd_housekeeping_summary) + describe_history_df = spark.createDataFrame(dh_housekeeping_summary) + out = dh._process_describe_history(describe_detail_df, describe_history_df) + pd.testing.assert_frame_equal( + out.reset_index(), + expected_pdh_housekeeping_summary.reset_index(), + ) + + +def test_process_describe_history_no_operation(spark, dd_click_sales): + dh = DeltaHousekeeping(spark) + describe_detail_df = spark.createDataFrame(dd_click_sales) + describe_history_df = spark.createDataFrame([], "string") + out = dh._process_describe_history(describe_detail_df, describe_history_df) + # output should be equal to DESCRIBE DETAIL + pd.testing.assert_frame_equal( + out.reset_index(), + dd_click_sales.reset_index(), + ) + + +def test_process_describe_history_empty_history(spark, dd_click_sales, dh_click_sales): + dh = DeltaHousekeeping(spark) + describe_detail_df = spark.createDataFrame(dd_click_sales) + describe_history_df = spark.createDataFrame(dh_click_sales) + describe_history_df = describe_history_df.withColumn("operation", F.lit("NOOP")) + out = dh._process_describe_history(describe_detail_df, describe_history_df) + # output should be equal to DESCRIBE DETAIL + pd.testing.assert_frame_equal( + out.reset_index(), + dd_click_sales.reset_index(), + ) \ No newline at end of file From c2b028f26de52f02aa44e420244785afd5b016dd Mon Sep 17 00:00:00 2001 From: Lorenzo Rubio Date: Fri, 5 Jan 2024 18:35:33 +0100 Subject: [PATCH 13/13] cleanup --- tests/unit/delta_housekeeping_test.py | 40 --------------------------- 1 file changed, 40 deletions(-) diff --git a/tests/unit/delta_housekeeping_test.py b/tests/unit/delta_housekeeping_test.py index e53d0bf..63aea52 100644 --- a/tests/unit/delta_housekeeping_test.py +++ b/tests/unit/delta_housekeeping_test.py @@ -49,46 +49,6 @@ def expected_pdh_housekeeping_summary(request): return _resolve_file_path(request, "data/delta_housekeeping/expected_pdh_housekeeping_summary.csv") -@pytest.mark.skip() -def test_process_describe_history_template(): - from pyspark.sql import SparkSession - spark = SparkSession.builder.getOrCreate() - - dh = DeltaHousekeeping(spark) - dd_click_sales = pd.read_csv( - "/Users/lorenzo.rubio/Documents/GitHub/discoverx_lorenzorubi-db/tests/unit/data/delta_housekeeping/dd_click_sales.csv") - dh_click_sales = pd.read_csv( - "/Users/lorenzo.rubio/Documents/GitHub/discoverx_lorenzorubi-db/tests/unit/data/delta_housekeeping/dh_click_sales.csv") - expected_pdh_click_sales = pd.read_csv( - "/Users/lorenzo.rubio/Documents/GitHub/discoverx_lorenzorubi-db/tests/unit/data/delta_housekeeping/expected_pdh_click_sales.csv" - ) - - describe_detail_df = spark.createDataFrame(dd_click_sales) - # describe_detail_df = ( - # describe_detail_df - # .withColumn("split", F.split(F.col('name'), '\.')) - # .withColumn("catalog", F.col("split").getItem(0)) - # .withColumn("database", F.col("split").getItem(1)) - # .withColumn("tableName", F.col("split").getItem(2)) - # .select([ - # F.col("catalog"), - # F.col("database"), - # F.col("tableName"), - # F.col("numFiles").alias("number_of_files"), - # F.col("sizeInBytes").alias("bytes"), - # ]) - # ) - # describe_detail_df.toPandas().to_csv("/Users/lorenzo.rubio/Documents/GitHub/discoverx_lorenzorubi-db/tests/unit/data/delta_housekeeping/dd_housekeeping_summary.csv", index=False) - - describe_history_df = spark.createDataFrame(dh_click_sales) - describe_history_df = describe_history_df.withColumn("operation", F.lit("NOOP")) - - out = dh._process_describe_history(describe_detail_df, describe_history_df) - - out.toPandas().to_csv("/Users/lorenzo.rubio/Documents/GitHub/discoverx_lorenzorubi-db/tests/unit/data/delta_housekeeping/expected_pdh_housekeeping_summary.csv", index=False) - assert out - - def test_process_describe_history_no_optimize(spark, dh_click_sales, dd_click_sales, expected_pdh_click_sales): dh = DeltaHousekeeping(spark) describe_detail_df = spark.createDataFrame(dd_click_sales)