diff --git a/CHANGELOG.md b/CHANGELOG.md index bbb22f146f..e8592f3025 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -10,10 +10,15 @@ - Added support for PrPr feature `Session.client_telemetry`. - Added support for `Session.udf_profiler`. - Added support for `functions.ai_translate`. +- Added support for the following `iceberg_config` options in `DataFrameWriter.save_as_table` and `DataFrame.copy_into_table`: + - `target_file_size` + - `partition_by` - Added support for the following functions in `functions.py`: - String and Binary functions: - `base64_decode_binary` + - `bucket` - `compress` + - `day` - `decompress_binary` - `decompress_string` - `md5_binary` @@ -23,6 +28,7 @@ - `sha2_binary` - `soundex_p123` - `strtok` + - `truncate` - `try_base64_decode_binary` - `try_base64_decode_string` - `try_hex_decode_binary` @@ -60,6 +66,9 @@ - Added support for `Dataframe.groupby.rolling()`. - Added support for mapping `np.percentile` with DataFrame and Series inputs to `Series.quantile`. - Added support for setting the `random_state` parameter to an integer when calling `DataFrame.sample` or `Series.sample`. +- Added support for the following `iceberg_config` options in `to_iceberg`: + - `target_file_size` + - `partition_by` #### Improvements diff --git a/docs/source/snowpark/functions.rst b/docs/source/snowpark/functions.rst index 192575ee43..e75ece66c6 100644 --- a/docs/source/snowpark/functions.rst +++ b/docs/source/snowpark/functions.rst @@ -120,6 +120,7 @@ Functions boolor boolxor boolxor_agg + bucket build_stage_file_url builtin bround @@ -189,6 +190,7 @@ Functions datediff date_add date_sub + day daydiff dayname dayofmonth @@ -555,6 +557,7 @@ Functions translate trim trunc + truncate try_cast try_parse_json try_to_binary diff --git a/src/snowflake/snowpark/_internal/analyzer/analyzer.py b/src/snowflake/snowpark/_internal/analyzer/analyzer.py index 3aa5d409ec..6446fc2da0 100644 --- a/src/snowflake/snowpark/_internal/analyzer/analyzer.py +++ b/src/snowflake/snowpark/_internal/analyzer/analyzer.py @@ -4,7 +4,7 @@ # import uuid from collections import Counter, defaultdict -from typing import TYPE_CHECKING, DefaultDict, Dict, List, Union +from typing import TYPE_CHECKING, DefaultDict, Dict, List, Optional, Union from logging import getLogger from snowflake.connector import IntegrityError @@ -177,6 +177,7 @@ ExprAliasUpdateDict, ) from snowflake.snowpark.types import BooleanType, _NumericType +from snowflake.snowpark.column import Column ARRAY_BIND_THRESHOLD = 512 @@ -904,6 +905,43 @@ def to_sql_try_avoid_cast( parse_local_name, ) + def _process_partition_by_in_iceberg_config( + self, + iceberg_config: Optional[dict], + df_aliased_col_name_to_real_col_name: Union[ + DefaultDict[str, Dict[str, str]], DefaultDict[str, ExprAliasUpdateDict] + ], + ) -> Optional[dict]: + """ + Process partition_by expressions from iceberg_config, converting Column objects to SQL strings. + Returns a new iceberg_config dict with partition_by as a list of SQL strings, or the original config if no processing needed. 
+        """
+        if iceberg_config is None or iceberg_config.get("partition_by") is None:
+            return iceberg_config
+
+        iceberg_config = {k.lower(): v for k, v in iceberg_config.items()}
+        pb = iceberg_config["partition_by"]
+
+        # Convert to list and filter out empty expressions
+        partition_exprs = pb if isinstance(pb, (list, tuple)) else [pb]
+        partition_sqls = []
+        for expr in partition_exprs:
+            if isinstance(expr, Column):
+                partition_sqls.append(
+                    self.analyze(expr._expression, df_aliased_col_name_to_real_col_name)
+                )
+            elif isinstance(expr, str):
+                if expr:  # Ignore empty strings
+                    partition_sqls.append(expr)
+            else:
+                raise TypeError(
+                    f"partition_by in iceberg_config expected Column or str, got: {type(expr)}"
+                )
+
+        # Always use the filtered list so empty entries cannot reach the PARTITION BY clause
+        iceberg_config["partition_by"] = partition_sqls
+        return iceberg_config
+
     def resolve(self, logical_plan: LogicalPlan) -> SnowflakePlan:
         self.subquery_plans = []
         self.generated_alias_maps = (
@@ -1164,6 +1202,10 @@ def do_resolve_with_resolved_children(
         if isinstance(logical_plan, SnowflakeCreateTable):
             resolved_child = resolved_children[logical_plan.children[0]]
 
+            iceberg_config = self._process_partition_by_in_iceberg_config(
+                logical_plan.iceberg_config, df_aliased_col_name_to_real_col_name
+            )
+
             return self.plan_builder.save_as_table(
                 table_name=logical_plan.table_name,
                 column_names=logical_plan.column_names,
@@ -1184,7 +1226,7 @@
                 use_scoped_temp_objects=self.session._use_scoped_temp_objects,
                 creation_source=logical_plan.creation_source,
                 child_attributes=resolved_child.attributes,
-                iceberg_config=logical_plan.iceberg_config,
+                iceberg_config=iceberg_config,
                 table_exists=logical_plan.table_exists,
             )
 
@@ -1416,6 +1458,10 @@ def do_resolve_with_resolved_children(
             if format_name is not None:
                 format_type_options["FORMAT_NAME"] = format_name
             assert logical_plan.file_format is not None
+            iceberg_config = self._process_partition_by_in_iceberg_config(
+                logical_plan.iceberg_config, df_aliased_col_name_to_real_col_name
+            )
+
             return self.plan_builder.copy_into_table(
                 path=logical_plan.file_path,
                 table_name=logical_plan.table_name,
@@ -1435,7 +1481,7 @@
                 else None,
                 user_schema=logical_plan.user_schema,
                 create_table_from_infer_schema=logical_plan.create_table_from_infer_schema,
-                iceberg_config=logical_plan.iceberg_config,
+                iceberg_config=iceberg_config,
             )
 
         if isinstance(logical_plan, CopyIntoLocationNode):
diff --git a/src/snowflake/snowpark/_internal/analyzer/analyzer_utils.py b/src/snowflake/snowpark/_internal/analyzer/analyzer_utils.py
index 883ccf26fe..797dd5c912 100644
--- a/src/snowflake/snowpark/_internal/analyzer/analyzer_utils.py
+++ b/src/snowflake/snowpark/_internal/analyzer/analyzer_utils.py
@@ -151,6 +151,7 @@
 EXTERNAL_VOLUME = " EXTERNAL_VOLUME "
 CATALOG = " CATALOG "
 BASE_LOCATION = " BASE_LOCATION "
+TARGET_FILE_SIZE = " TARGET_FILE_SIZE "
 CATALOG_SYNC = " CATALOG_SYNC "
 STORAGE_SERIALIZATION_POLICY = " STORAGE_SERIALIZATION_POLICY "
 REG_EXP = " REGEXP "
@@ -231,16 +232,25 @@ def format_uuid(uuid: Optional[str], with_new_line: bool = True) -> str:
     return f"{UUID_COMMENT.format(uuid)}"
 
 
-def validate_iceberg_config(iceberg_config: Optional[dict]) -> Dict[str, str]:
+def validate_iceberg_config(
+    iceberg_config: Optional[dict],
+) -> tuple[Dict[str, str], list]:
+    """
+    Validate and process iceberg config, returning (options_dict, partition_exprs_list).
+ """ if iceberg_config is None: - return dict() + return dict(), [] iceberg_config = {k.lower(): v for k, v in iceberg_config.items()} - return { + # Extract partition_by (already processed as SQL strings by analyzer) + partition_exprs = iceberg_config.get("partition_by", []) + + options = { EXTERNAL_VOLUME: iceberg_config.get("external_volume", None), CATALOG: iceberg_config.get("catalog", None), BASE_LOCATION: iceberg_config.get("base_location", None), + TARGET_FILE_SIZE: iceberg_config.get("target_file_size", None), CATALOG_SYNC: iceberg_config.get("catalog_sync", None), STORAGE_SERIALIZATION_POLICY: iceberg_config.get( "storage_serialization_policy", None @@ -248,6 +258,8 @@ def validate_iceberg_config(iceberg_config: Optional[dict]) -> Dict[str, str]: ICEBERG_VERSION: iceberg_config.get("iceberg_version", None), } + return options, partition_exprs + def result_scan_statement(uuid_place_holder: str) -> str: return ( @@ -311,6 +323,20 @@ def partition_spec(col_exprs: List[str]) -> str: return f"PARTITION BY {COMMA.join(col_exprs)}" if col_exprs else EMPTY_STRING +def iceberg_partition_clause(partition_exprs: List[str]) -> str: + return ( + ( + SPACE + + PARTITION_BY + + LEFT_PARENTHESIS + + COMMA.join(partition_exprs) + + RIGHT_PARENTHESIS + ) + if partition_exprs + else EMPTY_STRING + ) + + def order_by_spec(col_exprs: List[str]) -> str: if not col_exprs: return EMPTY_STRING @@ -1103,15 +1129,17 @@ def create_table_statement( CHANGE_TRACKING: change_tracking, } - iceberg_config = validate_iceberg_config(iceberg_config) - options.update(iceberg_config) + iceberg_options, partition_exprs = validate_iceberg_config(iceberg_config) + options.update(iceberg_options) options_statement = get_options_statement(options) + partition_by_clause = iceberg_partition_clause(partition_exprs) + return ( f"{CREATE}{(OR + REPLACE) if replace else EMPTY_STRING}" f" {(get_temp_type_for_object(use_scoped_temp_objects, is_generated) if table_type.lower() in TEMPORARY_STRING_SET else table_type).upper()} " - f"{ICEBERG if iceberg_config else EMPTY_STRING}{TABLE}{table_name}{(IF + NOT + EXISTS) if not replace and not error else EMPTY_STRING}" - f"{LEFT_PARENTHESIS}{schema}{RIGHT_PARENTHESIS}{cluster_by_clause}" + f"{ICEBERG if iceberg_options else EMPTY_STRING}{TABLE}{table_name}{(IF + NOT + EXISTS) if not replace and not error else EMPTY_STRING}" + f"{LEFT_PARENTHESIS}{schema}{RIGHT_PARENTHESIS}{partition_by_clause}{cluster_by_clause}" f"{options_statement}{COPY_GRANTS if copy_grants else EMPTY_STRING}{comment_sql}" ) @@ -1192,15 +1220,18 @@ def create_table_as_select_statement( MAX_DATA_EXTENSION_TIME_IN_DAYS: max_data_extension_time, CHANGE_TRACKING: change_tracking, } - iceberg_config = validate_iceberg_config(iceberg_config) - options.update(iceberg_config) + iceberg_options, partition_exprs = validate_iceberg_config(iceberg_config) + options.update(iceberg_options) options_statement = get_options_statement(options) + + partition_by_clause = iceberg_partition_clause(partition_exprs) + return ( f"{CREATE}{OR + REPLACE if replace else EMPTY_STRING}" f" {(get_temp_type_for_object(use_scoped_temp_objects, is_generated) if table_type.lower() in TEMPORARY_STRING_SET else table_type).upper()} " - f"{ICEBERG if iceberg_config else EMPTY_STRING}{TABLE}" + f"{ICEBERG if iceberg_options else EMPTY_STRING}{TABLE}" f"{IF + NOT + EXISTS if not replace and not error else EMPTY_STRING} " - f"{table_name}{column_definition_sql}{cluster_by_clause}{options_statement}" + 
f"{table_name}{column_definition_sql}{partition_by_clause}{cluster_by_clause}{options_statement}" f"{COPY_GRANTS if copy_grants else EMPTY_STRING}{comment_sql} {AS}{project_statement([], child)}" ) @@ -1506,9 +1537,8 @@ def create_or_replace_dynamic_table_statement( } ) - iceberg_options = get_options_statement( - validate_iceberg_config(iceberg_config) - ).strip() + iceberg_options, _ = validate_iceberg_config(iceberg_config) + iceberg_options = get_options_statement(iceberg_options).strip() return ( f"{CREATE}{OR + REPLACE if replace else EMPTY_STRING}{TRANSIENT if is_transient else EMPTY_STRING}" diff --git a/src/snowflake/snowpark/_internal/analyzer/snowflake_plan.py b/src/snowflake/snowpark/_internal/analyzer/snowflake_plan.py index 1ae7914c54..a3d40995a7 100644 --- a/src/snowflake/snowpark/_internal/analyzer/snowflake_plan.py +++ b/src/snowflake/snowpark/_internal/analyzer/snowflake_plan.py @@ -1298,6 +1298,8 @@ def save_as_table( the Iceberg table stores its metadata files and data in Parquet format catalog: specifies either Snowflake or a catalog integration to use for this table base_location: the base directory that snowflake can write iceberg metadata and files to + target_file_size: specifies a target Parquet file size for the table. + Valid values: 'AUTO' (default), '16MB', '32MB', '64MB', '128MB' catalog_sync: optionally sets the catalog integration configured for Polaris Catalog storage_serialization_policy: specifies the storage serialization policy for the table iceberg_version: Overrides the version of iceberg to use. Defaults to 2 when unset. diff --git a/src/snowflake/snowpark/dataframe.py b/src/snowflake/snowpark/dataframe.py index 5b04bc306e..dfad2f91d0 100644 --- a/src/snowflake/snowpark/dataframe.py +++ b/src/snowflake/snowpark/dataframe.py @@ -4725,6 +4725,11 @@ def copy_into_table( statement_params: Dictionary of statement level parameters to be set while executing this action. iceberg_config: A dictionary that can contain the following iceberg configuration values: + * partition_by: specifies one or more partition expressions for the Iceberg table. + Can be a single Column, column name, SQL expression string, or a list of these. + Supports identity partitioning (column names) as well as partition transform functions + like bucket(), truncate(), year(), month(), day(), hour(). + * external_volume: specifies the identifier for the external volume where the Iceberg table stores its metadata files and data in Parquet format @@ -4732,6 +4737,9 @@ def copy_into_table( * base_location: the base directory that snowflake can write iceberg metadata and files to + * target_file_size: specifies a target Parquet file size for the table. + Valid values: 'AUTO' (default), '16MB', '32MB', '64MB', '128MB' + * catalog_sync: optionally sets the catalog integration configured for Polaris Catalog * storage_serialization_policy: specifies the storage serialization policy for the table @@ -5516,10 +5524,16 @@ def create_or_replace_dynamic_table( statement_params: Dictionary of statement level parameters to be set while executing this action. iceberg_config: A dictionary that can contain the following iceberg configuration values: + - partition_by: specifies one or more partition expressions for the Iceberg table. + Can be a single Column, column name, SQL expression string, or a list of these. + Supports identity partitioning (column names) as well as partition transform functions + like bucket(), truncate(), year(), month(), day(), hour(). 
- external_volume: specifies the identifier for the external volume where the Iceberg table stores its metadata files and data in Parquet format. - catalog: specifies either Snowflake or a catalog integration to use for this table. - base_location: the base directory that snowflake can write iceberg metadata and files to. + - target_file_size: specifies a target Parquet file size for the table. + Valid values: 'AUTO' (default), '16MB', '32MB', '64MB', '128MB' - catalog_sync: optionally sets the catalog integration configured for Polaris Catalog. - storage_serialization_policy: specifies the storage serialization policy for the table. copy_grants: A boolean value that specifies whether to retain the access permissions from the original view diff --git a/src/snowflake/snowpark/dataframe_writer.py b/src/snowflake/snowpark/dataframe_writer.py index a34cc72509..4f272da557 100644 --- a/src/snowflake/snowpark/dataframe_writer.py +++ b/src/snowflake/snowpark/dataframe_writer.py @@ -252,7 +252,9 @@ def save_as_table( max_data_extension_time: Optional[int] = None, change_tracking: Optional[bool] = None, copy_grants: bool = False, - iceberg_config: Optional[Dict[str, str]] = None, + iceberg_config: Optional[ + Dict[str, Union[str, Iterable[ColumnOrSqlExpr]]] + ] = None, table_exists: Optional[bool] = None, _emit_ast: bool = True, **kwargs: Optional[Dict[str, Any]], @@ -306,6 +308,11 @@ def save_as_table( asynchronously and returns an :class:`AsyncJob`. iceberg_config: A dictionary that can contain the following iceberg configuration values: + * partition_by: specifies one or more partition expressions for the Iceberg table. + Can be a single Column, column name, SQL expression string, or a list of these. + Supports identity partitioning (column names) as well as partition transform functions + like bucket(), truncate(), year(), month(), day(), hour(). + * external_volume: specifies the identifier for the external volume where the Iceberg table stores its metadata files and data in Parquet format @@ -313,6 +320,9 @@ def save_as_table( * base_location: the base directory that snowflake can write iceberg metadata and files to + * target_file_size: specifies a target Parquet file size for the table. + Valid values: 'AUTO' (default), '16MB', '32MB', '64MB', '128MB' + * catalog_sync: optionally sets the catalog integration configured for Polaris Catalog * storage_serialization_policy: specifies the storage serialization policy for the table @@ -344,11 +354,14 @@ def save_as_table( See `Create your first Iceberg table `_ for more information on creating iceberg resources. >>> df = session.create_dataframe([[1,2],[3,4]], schema=["a", "b"]) + >>> from snowflake.snowpark.functions import col, bucket >>> iceberg_config = { ... "external_volume": "example_volume", ... "catalog": "example_catalog", ... "base_location": "/iceberg_root", ... "storage_serialization_policy": "OPTIMIZED", + ... "target_file_size": "128MB", + ... "partition_by": ["a", bucket(3, col("b"))], ... 
} >>> df.write.mode("overwrite").save_as_table("my_table", iceberg_config=iceberg_config) # doctest: +SKIP """ diff --git a/src/snowflake/snowpark/functions.py b/src/snowflake/snowpark/functions.py index 1cdfacd2e1..5f53649975 100644 --- a/src/snowflake/snowpark/functions.py +++ b/src/snowflake/snowpark/functions.py @@ -4478,6 +4478,25 @@ def hour(e: ColumnOrName, _emit_ast: bool = True) -> Column: return _call_function("hour", c, _emit_ast=_emit_ast) +@publicapi +def day(e: ColumnOrName, _emit_ast: bool = True) -> Column: + """ + Extracts the day from a date or timestamp. + + Example:: + + >>> import datetime + >>> df = session.create_dataframe([ + ... datetime.datetime.strptime("2020-05-01 13:11:20.000", "%Y-%m-%d %H:%M:%S.%f"), + ... datetime.datetime.strptime("2020-08-21 01:30:05.000", "%Y-%m-%d %H:%M:%S.%f") + ... ], schema=["a"]) + >>> df.select(day("a")).collect() + [Row(DAY("A")=1), Row(DAY("A")=21)] + """ + c = _to_col_if_str(e, "day") + return _call_function("day", c, _emit_ast=_emit_ast) + + @publicapi def last_day( expr: ColumnOrName, part: Optional[ColumnOrName] = None, _emit_ast: bool = True @@ -4690,6 +4709,62 @@ def year(e: ColumnOrName, _emit_ast: bool = True) -> Column: return _call_function("year", c, _emit_ast=_emit_ast) +@publicapi +def bucket( + num_buckets: Union[int, ColumnOrName], col: ColumnOrName, _emit_ast: bool = True +) -> Column: + """ + Performs an Iceberg partition bucket transform. + This function should only be used in the iceberg_config['partition_by'] parameter when creating Iceberg tables. + + Example:: + + >>> iceberg_config = { + ... "external_volume": "example_volume", + ... "partition_by": [bucket(10, "a")] + ... } + >>> df.write.save_as_table("my_table", iceberg_config=iceberg_config) # doctest: +SKIP + """ + ast = build_function_expr("bucket", [num_buckets, col]) if _emit_ast else None + + num_buckets = ( + lit(num_buckets, _emit_ast=False) + if isinstance(num_buckets, int) + else _to_col_if_str(num_buckets, "bucket") + ) + col = _to_col_if_str(col, "bucket") + + return _call_function("bucket", num_buckets, col, _ast=ast, _emit_ast=_emit_ast) + + +@publicapi +def truncate( + width: Union[int, ColumnOrName], col: ColumnOrName, _emit_ast: bool = True +) -> Column: + """ + Performs an Iceberg partition truncate transform. + This function should only be used in the iceberg_config['partition_by'] parameter when creating Iceberg tables. + + Example:: + + >>> iceberg_config = { + ... "external_volume": "example_volume", + ... "partition_by": [truncate(3, "a")] + ... } + >>> df.write.save_as_table("my_table", iceberg_config=iceberg_config) # doctest: +SKIP + """ + ast = build_function_expr("truncate", [width, col]) if _emit_ast else None + + width = ( + lit(width, _emit_ast=False) + if isinstance(width, int) + else _to_col_if_str(width, "truncate") + ) + col = _to_col_if_str(col, "truncate") + + return _call_function("truncate", width, col, _ast=ast, _emit_ast=_emit_ast) + + @publicapi def sysdate(_emit_ast: bool = True) -> Column: """ diff --git a/src/snowflake/snowpark/modin/plugin/extensions/dataframe_extensions.py b/src/snowflake/snowpark/modin/plugin/extensions/dataframe_extensions.py index f40bfb6e19..ff6ca460f1 100644 --- a/src/snowflake/snowpark/modin/plugin/extensions/dataframe_extensions.py +++ b/src/snowflake/snowpark/modin/plugin/extensions/dataframe_extensions.py @@ -550,6 +550,11 @@ def to_iceberg( it represents the fully-qualified object identifier (database name, schema name, and table name). 
iceberg_config: A dictionary that can contain the following iceberg configuration values: + * partition_by: specifies one or more partition expressions for the Iceberg table. + Can be a single Column, column name, SQL expression string, or a list of these. + Supports identity partitioning (column names) as well as partition transform functions + like bucket(), truncate(), year(), month(), day(), hour(). + * external_volume: specifies the identifier for the external volume where the Iceberg table stores its metadata files and data in Parquet format @@ -557,6 +562,9 @@ def to_iceberg( * base_location: the base directory that snowflake can write iceberg metadata and files to + * target_file_size: specifies a target Parquet file size for the table. + Valid values: 'AUTO' (default), '16MB', '32MB', '64MB', '128MB' + * catalog_sync: optionally sets the catalog integration configured for Polaris Catalog * storage_serialization_policy: specifies the storage serialization policy for the table diff --git a/src/snowflake/snowpark/modin/plugin/extensions/pd_extensions.py b/src/snowflake/snowpark/modin/plugin/extensions/pd_extensions.py index ece17f3aa3..486403db7a 100644 --- a/src/snowflake/snowpark/modin/plugin/extensions/pd_extensions.py +++ b/src/snowflake/snowpark/modin/plugin/extensions/pd_extensions.py @@ -881,6 +881,11 @@ def to_iceberg( it represents the fully-qualified object identifier (database name, schema name, and table name). iceberg_config: A dictionary that can contain the following iceberg configuration values: + * partition_by: specifies one or more partition expressions for the Iceberg table. + Can be a single Column, column name, SQL expression string, or a list of these. + Supports identity partitioning (column names) as well as partition transform functions + like bucket(), truncate(), year(), month(), day(), hour(). + * external_volume: specifies the identifier for the external volume where the Iceberg table stores its metadata files and data in Parquet format @@ -888,6 +893,9 @@ def to_iceberg( * base_location: the base directory that snowflake can write iceberg metadata and files to + * target_file_size: specifies a target Parquet file size for the table. + Valid values: 'AUTO' (default), '16MB', '32MB', '64MB', '128MB' + * catalog_sync: optionally sets the catalog integration configured for Polaris Catalog * storage_serialization_policy: specifies the storage serialization policy for the table diff --git a/src/snowflake/snowpark/modin/plugin/extensions/series_extensions.py b/src/snowflake/snowpark/modin/plugin/extensions/series_extensions.py index f79b74b483..56a7e08001 100644 --- a/src/snowflake/snowpark/modin/plugin/extensions/series_extensions.py +++ b/src/snowflake/snowpark/modin/plugin/extensions/series_extensions.py @@ -544,6 +544,11 @@ def to_iceberg( it represents the fully-qualified object identifier (database name, schema name, and table name). iceberg_config: A dictionary that can contain the following iceberg configuration values: + * partition_by: specifies one or more partition expressions for the Iceberg table. + Can be a single Column, column name, SQL expression string, or a list of these. + Supports identity partitioning (column names) as well as partition transform functions + like bucket(), truncate(), year(), month(), day(), hour(). 
+ * external_volume: specifies the identifier for the external volume where the Iceberg table stores its metadata files and data in Parquet format @@ -551,6 +556,9 @@ def to_iceberg( * base_location: the base directory that snowflake can write iceberg metadata and files to + * target_file_size: specifies a target Parquet file size for the table. + Valid values: 'AUTO' (default), '16MB', '32MB', '64MB', '128MB' + * catalog_sync: optionally sets the catalog integration configured for Polaris Catalog * storage_serialization_policy: specifies the storage serialization policy for the table diff --git a/tests/integ/scala/test_dataframe_copy_into.py b/tests/integ/scala/test_dataframe_copy_into.py index a3119f5e91..29b11ad2a2 100644 --- a/tests/integ/scala/test_dataframe_copy_into.py +++ b/tests/integ/scala/test_dataframe_copy_into.py @@ -16,7 +16,15 @@ SnowparkDataframeReaderException, SnowparkSQLException, ) -from snowflake.snowpark.functions import builtin, col, get, lit, sql_expr, xmlget +from snowflake.snowpark.functions import ( + bucket, + builtin, + col, + get, + lit, + sql_expr, + xmlget, +) from snowflake.snowpark.types import ( DoubleType, IntegerType, @@ -265,6 +273,7 @@ def test_copy_into_csv_iceberg( "external_volume": "PYTHON_CONNECTOR_ICEBERG_EXVOL", "CATALOG": "SNOWFLAKE", "BASE_LOCATION": "snowpark_python_tests", + "partition_by": ["b", bucket(10, "a"), "", "truncate(3, b)"], }, ) try: @@ -272,7 +281,7 @@ def test_copy_into_csv_iceberg( ddl = session._run_query(f"select get_ddl('table', '{test_table_name}')") assert ( ddl[0][0] - == f"create or replace ICEBERG TABLE {test_table_name} (\n\tA LONG,\n\tB STRING,\n\tC DOUBLE\n)\n EXTERNAL_VOLUME = 'PYTHON_CONNECTOR_ICEBERG_EXVOL'\n CATALOG = 'SNOWFLAKE'\n BASE_LOCATION = 'snowpark_python_tests/';" + == f"create or replace ICEBERG TABLE {test_table_name} (\n\tA LONG,\n\tB STRING,\n\tC DOUBLE\n)\n PARTITION BY (B, BUCKET(10, A), TRUNCATE(3, B))\n EXTERNAL_VOLUME = 'PYTHON_CONNECTOR_ICEBERG_EXVOL'\n CATALOG = 'SNOWFLAKE'\n BASE_LOCATION = 'snowpark_python_tests/';" ) # Check that a copy_into works on the newly created table. 
df.copy_into_table(test_table_name) diff --git a/tests/integ/scala/test_dataframe_writer_suite.py b/tests/integ/scala/test_dataframe_writer_suite.py index 5e25f3dc7f..d678128e8a 100644 --- a/tests/integ/scala/test_dataframe_writer_suite.py +++ b/tests/integ/scala/test_dataframe_writer_suite.py @@ -13,7 +13,14 @@ from snowflake.snowpark import Row from snowflake.snowpark._internal.utils import TempObjectType, parse_table_name from snowflake.snowpark.exceptions import SnowparkSQLException -from snowflake.snowpark.functions import col, lit, object_construct, parse_json +from snowflake.snowpark.functions import ( + col, + lit, + object_construct, + parse_json, + truncate, + day, +) from snowflake.snowpark.mock.exceptions import SnowparkLocalTestingException from snowflake.snowpark.types import ( DoubleType, @@ -22,6 +29,7 @@ StringType, StructField, StructType, + TimestampType, ) from unittest.mock import patch from tests.utils import ( @@ -229,6 +237,7 @@ def test_iceberg(session, local_testing_mode): [ StructField("a", StringType()), StructField("b", IntegerType()), + StructField("ts", TimestampType()), ] ), ) @@ -238,19 +247,153 @@ def test_iceberg(session, local_testing_mode): "external_volume": "PYTHON_CONNECTOR_ICEBERG_EXVOL", "catalog": "SNOWFLAKE", "base_location": "snowpark_python_tests", + "target_file_size": "64MB", + "partition_by": ["a", "bucket(5, b)", "", truncate(3, "a"), day("ts")], "iceberg_version": 3, }, ) try: ddl = session._run_query(f"select get_ddl('table', '{table_name}')") assert ( - ddl[0][0] - == f"create or replace ICEBERG TABLE {table_name} (\n\tA STRING,\n\tB LONG\n)\n EXTERNAL_VOLUME = 'PYTHON_CONNECTOR_ICEBERG_EXVOL'\n CATALOG = 'SNOWFLAKE'\n BASE_LOCATION = 'snowpark_python_tests/';" + ddl[0][0] == f"create or replace ICEBERG TABLE {table_name} (\n\t" + f"A STRING,\n\tB LONG,\n\tTS TIMESTAMP_NTZ(6)\n)\n " + f"PARTITION BY (A, BUCKET(5, B), TRUNCATE(3, A), DAY(TS))\n " + f"EXTERNAL_VOLUME = 'PYTHON_CONNECTOR_ICEBERG_EXVOL'\n CATALOG = 'SNOWFLAKE'\n " + f"BASE_LOCATION = 'snowpark_python_tests/';" ) + + params = session.sql(f"show parameters for table {table_name}").collect() + target_file_size_params = [ + row for row in params if row["key"] == "TARGET_FILE_SIZE" + ] + assert ( + len(target_file_size_params) > 0 + and target_file_size_params[0]["value"] == "64MB" + ), f"Expected TARGET_FILE_SIZE='64MB', got '{target_file_size_params[0]['value']}'" finally: session.table(table_name).drop_table() +def test_iceberg_partition_by(session, local_testing_mode): + if not iceberg_supported(session, local_testing_mode) or is_in_stored_procedure(): + pytest.skip("Test requires iceberg support.") + + session.sql( + "alter session set FEATURE_INCREASED_MAX_LOB_SIZE_PERSISTED=DISABLED" + ).collect() + session.sql( + "alter session set FEATURE_INCREASED_MAX_LOB_SIZE_IN_MEMORY=DISABLED" + ).collect() + + df = session.create_dataframe( + [], + schema=StructType( + [ + StructField("a", StringType()), + StructField("b", IntegerType()), + ] + ), + ) + + # Test 1: Single string value + table_name_1 = Utils.random_table_name() + df.write.save_as_table( + table_name_1, + iceberg_config={ + "external_volume": "PYTHON_CONNECTOR_ICEBERG_EXVOL", + "catalog": "SNOWFLAKE", + "Partition_by": "b", + }, + ) + try: + ddl = session._run_query(f"select get_ddl('table', '{table_name_1}')") + assert ( + ddl[0][0] == f"create or replace ICEBERG TABLE {table_name_1} (\n\t" + f"A STRING,\n\tB LONG\n)\n " + f"PARTITION BY (B)\n " + f"EXTERNAL_VOLUME = 'PYTHON_CONNECTOR_ICEBERG_EXVOL'\n CATALOG = 
'SNOWFLAKE';" + ) + + finally: + session.table(table_name_1).drop_table() + + # Test 2: Empty list + table_name_2 = Utils.random_table_name() + df.write.save_as_table( + table_name_2, + iceberg_config={ + "external_volume": "PYTHON_CONNECTOR_ICEBERG_EXVOL", + "catalog": "SNOWFLAKE", + "partition_by": [], + }, + ) + try: + ddl = session._run_query(f"select get_ddl('table', '{table_name_2}')") + assert "PARTITION BY" not in ddl[0][0] + finally: + session.table(table_name_2).drop_table() + + # Test 3: Single Column object + table_name_3 = Utils.random_table_name() + df.write.save_as_table( + table_name_3, + iceberg_config={ + "external_volume": "PYTHON_CONNECTOR_ICEBERG_EXVOL", + "catalog": "SNOWFLAKE", + "partition_by": col("b"), + }, + ) + try: + ddl = session._run_query(f"select get_ddl('table', '{table_name_3}')") + assert "PARTITION BY (B)" in ddl[0][0] + finally: + session.table(table_name_3).drop_table() + + # Test 4: Mix of strings and Column objects with empty strings + table_name_4 = Utils.random_table_name() + df.write.save_as_table( + table_name_4, + iceberg_config={ + "external_volume": "PYTHON_CONNECTOR_ICEBERG_EXVOL", + "catalog": "SNOWFLAKE", + "partition_by": ["a", "", col("b")], + }, + ) + try: + ddl = session._run_query(f"select get_ddl('table', '{table_name_4}')") + assert "PARTITION BY (A, B)" in ddl[0][0] + finally: + session.table(table_name_4).drop_table() + + # Test 5: No partition_by + table_name_5 = Utils.random_table_name() + df.write.save_as_table( + table_name_5, + iceberg_config={ + "external_volume": "PYTHON_CONNECTOR_ICEBERG_EXVOL", + "catalog": "SNOWFLAKE", + }, + ) + try: + ddl = session._run_query(f"select get_ddl('table', '{table_name_5}')") + assert "PARTITION BY" not in ddl[0][0] + finally: + session.table(table_name_5).drop_table() + + # Test 6: Invalid type should raise TypeError + with pytest.raises( + TypeError, match="partition_by in iceberg_config expected Column or str" + ): + df.write.save_as_table( + Utils.random_table_name(), + iceberg_config={ + "external_volume": "PYTHON_CONNECTOR_ICEBERG_EXVOL", + "catalog": "SNOWFLAKE", + "partition_by": 123, + }, + ) + + @pytest.mark.skipif( "config.getoption('local_testing_mode', default=False)", reason="BUG: SNOW-1235716 should raise not implemented error not AttributeError: 'MockExecutionPlan' object has no attribute 'schema_query'", diff --git a/tests/unit/test_analyzer_util_suite.py b/tests/unit/test_analyzer_util_suite.py index 9a20e760ac..3cd5c3e60b 100644 --- a/tests/unit/test_analyzer_util_suite.py +++ b/tests/unit/test_analyzer_util_suite.py @@ -446,13 +446,16 @@ def test_create_iceberg_table_statement(): "external_volume": "example_volume", "catalog": "example_catalog", "base_location": "/root", + "target_file_size": "128MB", "catalog_sync": "integration_name", "storage_serialization_policy": "OPTIMIZED", + "partition_by": ["country", "bucket(10, user_id)"], }, ) == ( - " CREATE ICEBERG TABLE test_table(test_col varchar) EXTERNAL_VOLUME = 'example_volume' " - " CATALOG = 'example_catalog' BASE_LOCATION = '/root' CATALOG_SYNC = 'integration_name'" - " STORAGE_SERIALIZATION_POLICY = 'OPTIMIZED' " + " CREATE ICEBERG TABLE test_table(test_col varchar) " + " PARTITION BY (country, bucket(10, user_id)) EXTERNAL_VOLUME = 'example_volume' " + " CATALOG = 'example_catalog' BASE_LOCATION = '/root' TARGET_FILE_SIZE = '128MB' " + " CATALOG_SYNC = 'integration_name' STORAGE_SERIALIZATION_POLICY = 'OPTIMIZED' " ) @@ -467,9 +470,12 @@ def test_create_iceberg_table_as_select_statement(): "base_location": 
"/root", "catalog_sync": "integration_name", "storage_serialization_policy": "OPTIMIZED", + "partition_by": ["HOUR(timestamp)", "TRUNCATE(3, category)"], }, ) == ( - " CREATE ICEBERG TABLE test_table EXTERNAL_VOLUME = 'example_volume' CATALOG = " + " CREATE ICEBERG TABLE test_table " + " PARTITION BY (HOUR(timestamp), TRUNCATE(3, category)) " + " EXTERNAL_VOLUME = 'example_volume' CATALOG = " "'example_catalog' BASE_LOCATION = '/root' CATALOG_SYNC = 'integration_name' " "STORAGE_SERIALIZATION_POLICY = 'OPTIMIZED' AS SELECT * \n" " FROM (\nselect * from foo\n)" @@ -498,13 +504,14 @@ def test_create_dynamic_iceberg_table(): "external_volume": "example_volume", "catalog": "example_catalog", "base_location": "/root", + "target_file_size": "32MB", "catalog_sync": "integration_name", "storage_serialization_policy": "OPTIMIZED", }, ) == ( " CREATE OR REPLACE DYNAMIC ICEBERG TABLE my_dt LAG = '1 minute' WAREHOUSE = " "my_warehouse EXTERNAL_VOLUME = 'example_volume' CATALOG = 'example_catalog' " - "BASE_LOCATION = '/root' CATALOG_SYNC = 'integration_name' STORAGE_SERIALIZATION_POLICY " + "BASE_LOCATION = '/root' TARGET_FILE_SIZE = '32MB' CATALOG_SYNC = 'integration_name' STORAGE_SERIALIZATION_POLICY " " = 'OPTIMIZED' AS SELECT * \n" " FROM (\nselect * from foo\n)" )