9 changes: 9 additions & 0 deletions CHANGELOG.md
@@ -10,10 +10,15 @@
- Added support for PrPr feature `Session.client_telemetry`.
- Added support for `Session.udf_profiler`.
- Added support for `functions.ai_translate`.
- Added support for the following `iceberg_config` options in `DataFrameWriter.save_as_table` and `DataFrame.copy_into_table`:
- `target_file_size`
- `partition_by`
- Added support for the following functions in `functions.py`:
- String and Binary functions:
- `base64_decode_binary`
- `bucket`
- `compress`
- `day`
- `decompress_binary`
- `decompress_string`
- `md5_binary`
@@ -23,6 +28,7 @@
- `sha2_binary`
- `soundex_p123`
- `strtok`
- `truncate`
- `try_base64_decode_binary`
- `try_base64_decode_string`
- `try_hex_decode_binary`
@@ -60,6 +66,9 @@
- Added support for `Dataframe.groupby.rolling()`.
- Added support for mapping `np.percentile` with DataFrame and Series inputs to `Series.quantile`.
- Added support for setting the `random_state` parameter to an integer when calling `DataFrame.sample` or `Series.sample`.
- Added support for the following `iceberg_config` options in `to_iceberg`:
- `target_file_size`
- `partition_by`

#### Improvements

3 changes: 3 additions & 0 deletions docs/source/snowpark/functions.rst
@@ -120,6 +120,7 @@ Functions
boolor
boolxor
boolxor_agg
bucket
build_stage_file_url
builtin
bround
@@ -189,6 +190,7 @@ Functions
datediff
date_add
date_sub
day
daydiff
dayname
dayofmonth
@@ -555,6 +557,7 @@ Functions
translate
trim
trunc
truncate
try_cast
try_parse_json
try_to_binary
52 changes: 49 additions & 3 deletions src/snowflake/snowpark/_internal/analyzer/analyzer.py
@@ -4,7 +4,7 @@
#
import uuid
from collections import Counter, defaultdict
from typing import TYPE_CHECKING, DefaultDict, Dict, List, Union
from typing import TYPE_CHECKING, DefaultDict, Dict, List, Optional, Union
from logging import getLogger

from snowflake.connector import IntegrityError
@@ -177,6 +177,7 @@
ExprAliasUpdateDict,
)
from snowflake.snowpark.types import BooleanType, _NumericType
from snowflake.snowpark.column import Column

ARRAY_BIND_THRESHOLD = 512

@@ -904,6 +905,43 @@ def to_sql_try_avoid_cast(
parse_local_name,
)

def _process_partition_by_in_iceberg_config(
self,
iceberg_config: Optional[dict],
df_aliased_col_name_to_real_col_name: Union[
DefaultDict[str, Dict[str, str]], DefaultDict[str, ExprAliasUpdateDict]
],
) -> Optional[dict]:
"""
Process partition_by expressions from iceberg_config, converting Column objects to SQL strings.
Returns a new iceberg_config dict with partition_by as a list of SQL strings, or the original config if no processing needed.
"""
if iceberg_config is None:
return iceberg_config

# Lower-case keys up front so the lookup is case-insensitive, matching validate_iceberg_config
iceberg_config = {k.lower(): v for k, v in iceberg_config.items()}
pb = iceberg_config.get("partition_by")
if pb is None:
return iceberg_config

# Convert to list and filter out empty expressions
partition_exprs = pb if isinstance(pb, (list, tuple)) else [pb]
partition_sqls = []
for expr in partition_exprs:
if isinstance(expr, Column):
partition_sqls.append(
self.analyze(expr._expression, df_aliased_col_name_to_real_col_name)
)
elif isinstance(expr, str):
if expr: # Ignore empty strings
partition_sqls.append(str(expr))
else:
raise TypeError(
f"partition_by in iceberg_config expected Column or str, got: {type(expr)}"
)

if partition_sqls:
return {**iceberg_config, "partition_by": partition_sqls}
return iceberg_config

def resolve(self, logical_plan: LogicalPlan) -> SnowflakePlan:
self.subquery_plans = []
self.generated_alias_maps = (
@@ -1164,6 +1202,10 @@ def do_resolve_with_resolved_children(

if isinstance(logical_plan, SnowflakeCreateTable):
resolved_child = resolved_children[logical_plan.children[0]]
iceberg_config = self._process_partition_by_in_iceberg_config(
logical_plan.iceberg_config, df_aliased_col_name_to_real_col_name
)

return self.plan_builder.save_as_table(
table_name=logical_plan.table_name,
column_names=logical_plan.column_names,
@@ -1184,7 +1226,7 @@ def do_resolve_with_resolved_children(
use_scoped_temp_objects=self.session._use_scoped_temp_objects,
creation_source=logical_plan.creation_source,
child_attributes=resolved_child.attributes,
iceberg_config=logical_plan.iceberg_config,
iceberg_config=iceberg_config,
table_exists=logical_plan.table_exists,
)

@@ -1416,6 +1458,10 @@ def do_resolve_with_resolved_children(
if format_name is not None:
format_type_options["FORMAT_NAME"] = format_name
assert logical_plan.file_format is not None
iceberg_config = self._process_partition_by_in_iceberg_config(
logical_plan.iceberg_config, df_aliased_col_name_to_real_col_name
)

return self.plan_builder.copy_into_table(
path=logical_plan.file_path,
table_name=logical_plan.table_name,
@@ -1435,7 +1481,7 @@ def do_resolve_with_resolved_children(
else None,
user_schema=logical_plan.user_schema,
create_table_from_infer_schema=logical_plan.create_table_from_infer_schema,
iceberg_config=logical_plan.iceberg_config,
iceberg_config=iceberg_config,
)

if isinstance(logical_plan, CopyIntoLocationNode):
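A minimal stand-alone sketch of the normalization performed by the new `_process_partition_by_in_iceberg_config` helper above, assuming a hypothetical `analyze` callback in place of the analyzer's expression-to-SQL step (illustrative only, not part of the PR):

```python
from typing import Callable, List, Sequence, Union

def normalize_partition_by(
    partition_by: Union[str, object, Sequence[Union[str, object]]],
    analyze: Callable[[object], str],
) -> List[str]:
    """Strings pass through unchanged; anything else (e.g. a Column) is
    rendered to SQL via `analyze`, mirroring the helper in the diff."""
    exprs = partition_by if isinstance(partition_by, (list, tuple)) else [partition_by]
    sqls: List[str] = []
    for expr in exprs:
        if isinstance(expr, str):
            if expr:  # empty strings are dropped
                sqls.append(expr)
        else:
            sqls.append(analyze(expr))  # the real helper raises TypeError for unsupported types
    return sqls

# normalize_partition_by(["a", "bucket(8, b)"], analyze=str) -> ["a", "bucket(8, b)"]
```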
58 changes: 44 additions & 14 deletions src/snowflake/snowpark/_internal/analyzer/analyzer_utils.py
@@ -151,6 +151,7 @@
EXTERNAL_VOLUME = " EXTERNAL_VOLUME "
CATALOG = " CATALOG "
BASE_LOCATION = " BASE_LOCATION "
TARGET_FILE_SIZE = " TARGET_FILE_SIZE "
CATALOG_SYNC = " CATALOG_SYNC "
STORAGE_SERIALIZATION_POLICY = " STORAGE_SERIALIZATION_POLICY "
REG_EXP = " REGEXP "
@@ -231,23 +232,34 @@ def format_uuid(uuid: Optional[str], with_new_line: bool = True) -> str:
return f"{UUID_COMMENT.format(uuid)}"


def validate_iceberg_config(iceberg_config: Optional[dict]) -> Dict[str, str]:
def validate_iceberg_config(
iceberg_config: Optional[dict],
) -> tuple[Dict[str, str], list]:
"""
Validate and process iceberg config, returning (options_dict, partition_exprs_list).
"""
if iceberg_config is None:
return dict()
return dict(), []

iceberg_config = {k.lower(): v for k, v in iceberg_config.items()}

return {
# Extract partition_by (already processed as SQL strings by analyzer)
partition_exprs = iceberg_config.get("partition_by", [])

options = {
EXTERNAL_VOLUME: iceberg_config.get("external_volume", None),
CATALOG: iceberg_config.get("catalog", None),
BASE_LOCATION: iceberg_config.get("base_location", None),
TARGET_FILE_SIZE: iceberg_config.get("target_file_size", None),
CATALOG_SYNC: iceberg_config.get("catalog_sync", None),
STORAGE_SERIALIZATION_POLICY: iceberg_config.get(
"storage_serialization_policy", None
),
ICEBERG_VERSION: iceberg_config.get("iceberg_version", None),
}

return options, partition_exprs


def result_scan_statement(uuid_place_holder: str) -> str:
return (
@@ -311,6 +323,20 @@ def partition_spec(col_exprs: List[str]) -> str:
return f"PARTITION BY {COMMA.join(col_exprs)}" if col_exprs else EMPTY_STRING


def iceberg_partition_clause(partition_exprs: List[str]) -> str:
return (
(
SPACE
+ PARTITION_BY
+ LEFT_PARENTHESIS
+ COMMA.join(partition_exprs)
+ RIGHT_PARENTHESIS
)
if partition_exprs
else EMPTY_STRING
)


def order_by_spec(col_exprs: List[str]) -> str:
if not col_exprs:
return EMPTY_STRING
@@ -1103,15 +1129,17 @@ def create_table_statement(
CHANGE_TRACKING: change_tracking,
}

iceberg_config = validate_iceberg_config(iceberg_config)
options.update(iceberg_config)
iceberg_options, partition_exprs = validate_iceberg_config(iceberg_config)
options.update(iceberg_options)
options_statement = get_options_statement(options)

partition_by_clause = iceberg_partition_clause(partition_exprs)

return (
f"{CREATE}{(OR + REPLACE) if replace else EMPTY_STRING}"
f" {(get_temp_type_for_object(use_scoped_temp_objects, is_generated) if table_type.lower() in TEMPORARY_STRING_SET else table_type).upper()} "
f"{ICEBERG if iceberg_config else EMPTY_STRING}{TABLE}{table_name}{(IF + NOT + EXISTS) if not replace and not error else EMPTY_STRING}"
f"{LEFT_PARENTHESIS}{schema}{RIGHT_PARENTHESIS}{cluster_by_clause}"
f"{ICEBERG if iceberg_options else EMPTY_STRING}{TABLE}{table_name}{(IF + NOT + EXISTS) if not replace and not error else EMPTY_STRING}"
f"{LEFT_PARENTHESIS}{schema}{RIGHT_PARENTHESIS}{partition_by_clause}{cluster_by_clause}"
f"{options_statement}{COPY_GRANTS if copy_grants else EMPTY_STRING}{comment_sql}"
)

@@ -1192,15 +1220,18 @@ def create_table_as_select_statement(
MAX_DATA_EXTENSION_TIME_IN_DAYS: max_data_extension_time,
CHANGE_TRACKING: change_tracking,
}
iceberg_config = validate_iceberg_config(iceberg_config)
options.update(iceberg_config)
iceberg_options, partition_exprs = validate_iceberg_config(iceberg_config)
options.update(iceberg_options)
options_statement = get_options_statement(options)

partition_by_clause = iceberg_partition_clause(partition_exprs)

return (
f"{CREATE}{OR + REPLACE if replace else EMPTY_STRING}"
f" {(get_temp_type_for_object(use_scoped_temp_objects, is_generated) if table_type.lower() in TEMPORARY_STRING_SET else table_type).upper()} "
f"{ICEBERG if iceberg_config else EMPTY_STRING}{TABLE}"
f"{ICEBERG if iceberg_options else EMPTY_STRING}{TABLE}"
f"{IF + NOT + EXISTS if not replace and not error else EMPTY_STRING} "
f"{table_name}{column_definition_sql}{cluster_by_clause}{options_statement}"
f"{table_name}{column_definition_sql}{partition_by_clause}{cluster_by_clause}{options_statement}"
f"{COPY_GRANTS if copy_grants else EMPTY_STRING}{comment_sql} {AS}{project_statement([], child)}"
)

@@ -1506,9 +1537,8 @@ def create_or_replace_dynamic_table_statement(
}
)

iceberg_options = get_options_statement(
validate_iceberg_config(iceberg_config)
).strip()
iceberg_options, _ = validate_iceberg_config(iceberg_config)
iceberg_options = get_options_statement(iceberg_options).strip()

return (
f"{CREATE}{OR + REPLACE if replace else EMPTY_STRING}{TRANSIENT if is_transient else EMPTY_STRING}"
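To make the clause assembly above concrete, here is a hedged illustration of how `validate_iceberg_config` and `iceberg_partition_clause` combine for a sample config; the option values are invented and the exact spacing of the emitted SQL depends on the module's constants:

```python
# Assumes the functions defined in analyzer_utils.py above; values are invented.
options, partition_exprs = validate_iceberg_config(
    {
        "external_volume": "example_volume",
        "base_location": "/iceberg_root",
        "target_file_size": "128MB",
        # partition_by has already been rendered to SQL strings by the analyzer
        "partition_by": ['"A"', 'bucket(3, "B")'],
    }
)
# `options` is merged into the regular CREATE TABLE options and rendered by
# get_options_statement(); `partition_exprs` becomes the partition clause.
clause = iceberg_partition_clause(partition_exprs)
# Expected to look roughly like: ' PARTITION BY ("A", bucket(3, "B"))'
```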
2 changes: 2 additions & 0 deletions src/snowflake/snowpark/_internal/analyzer/snowflake_plan.py
@@ -1298,6 +1298,8 @@ def save_as_table(
the Iceberg table stores its metadata files and data in Parquet format
catalog: specifies either Snowflake or a catalog integration to use for this table
base_location: the base directory that snowflake can write iceberg metadata and files to
target_file_size: specifies a target Parquet file size for the table.
Valid values: 'AUTO' (default), '16MB', '32MB', '64MB', '128MB'
catalog_sync: optionally sets the catalog integration configured for Polaris Catalog
storage_serialization_policy: specifies the storage serialization policy for the table
iceberg_version: Overrides the version of iceberg to use. Defaults to 2 when unset.
14 changes: 14 additions & 0 deletions src/snowflake/snowpark/dataframe.py
@@ -4725,13 +4725,21 @@ def copy_into_table(
statement_params: Dictionary of statement level parameters to be set while executing this action.
iceberg_config: A dictionary that can contain the following iceberg configuration values:

* partition_by: specifies one or more partition expressions for the Iceberg table.
Can be a single Column, column name, SQL expression string, or a list of these.
Supports identity partitioning (column names) as well as partition transform functions
like bucket(), truncate(), year(), month(), day(), hour().

* external_volume: specifies the identifier for the external volume where
the Iceberg table stores its metadata files and data in Parquet format

* catalog: specifies either Snowflake or a catalog integration to use for this table

* base_location: the base directory that snowflake can write iceberg metadata and files to

* target_file_size: specifies a target Parquet file size for the table.
Valid values: 'AUTO' (default), '16MB', '32MB', '64MB', '128MB'

* catalog_sync: optionally sets the catalog integration configured for Polaris Catalog

* storage_serialization_policy: specifies the storage serialization policy for the table
@@ -5516,10 +5524,16 @@ def create_or_replace_dynamic_table(
statement_params: Dictionary of statement level parameters to be set while executing this action.
iceberg_config: A dictionary that can contain the following iceberg configuration values:

- partition_by: specifies one or more partition expressions for the Iceberg table.
Can be a single Column, column name, SQL expression string, or a list of these.
Supports identity partitioning (column names) as well as partition transform functions
like bucket(), truncate(), year(), month(), day(), hour().
- external_volume: specifies the identifier for the external volume where
the Iceberg table stores its metadata files and data in Parquet format.
- catalog: specifies either Snowflake or a catalog integration to use for this table.
- base_location: the base directory that snowflake can write iceberg metadata and files to.
- target_file_size: specifies a target Parquet file size for the table.
Valid values: 'AUTO' (default), '16MB', '32MB', '64MB', '128MB'
- catalog_sync: optionally sets the catalog integration configured for Polaris Catalog.
- storage_serialization_policy: specifies the storage serialization policy for the table.
copy_grants: A boolean value that specifies whether to retain the access permissions from the original view
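Both `DataFrame.copy_into_table` and `DataFrame.create_or_replace_dynamic_table` above document the new `partition_by` and `target_file_size` keys. A hedged usage sketch of the `copy_into_table` path, assuming an existing Snowpark `session`; the stage path, schema, and object names are invented:

```python
from snowflake.snowpark.functions import bucket, col
from snowflake.snowpark.types import IntegerType, StringType, StructField, StructType

# Invented schema and stage path; `session` is an existing Snowpark session.
user_schema = StructType([StructField("a", IntegerType()), StructField("b", StringType())])
df = session.read.schema(user_schema).csv("@mystage/data.csv")
df.copy_into_table(
    "my_iceberg_table",
    iceberg_config={
        "external_volume": "example_volume",
        "catalog": "SNOWFLAKE",
        "base_location": "/iceberg_root",
        "target_file_size": "64MB",
        # identity partition on "a" plus a bucket transform on "b"
        "partition_by": ["a", bucket(8, col("b"))],
    },
)
```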
15 changes: 14 additions & 1 deletion src/snowflake/snowpark/dataframe_writer.py
@@ -252,7 +252,9 @@ def save_as_table(
max_data_extension_time: Optional[int] = None,
change_tracking: Optional[bool] = None,
copy_grants: bool = False,
iceberg_config: Optional[Dict[str, str]] = None,
iceberg_config: Optional[
Dict[str, Union[str, Iterable[ColumnOrSqlExpr]]]
] = None,
table_exists: Optional[bool] = None,
_emit_ast: bool = True,
**kwargs: Optional[Dict[str, Any]],
@@ -306,13 +308,21 @@ def save_as_table(
asynchronously and returns an :class:`AsyncJob`.
iceberg_config: A dictionary that can contain the following iceberg configuration values:

* partition_by: specifies one or more partition expressions for the Iceberg table.
Can be a single Column, column name, SQL expression string, or a list of these.
Supports identity partitioning (column names) as well as partition transform functions
like bucket(), truncate(), year(), month(), day(), hour().

* external_volume: specifies the identifier for the external volume where
the Iceberg table stores its metadata files and data in Parquet format

* catalog: specifies either Snowflake or a catalog integration to use for this table

* base_location: the base directory that snowflake can write iceberg metadata and files to

* target_file_size: specifies a target Parquet file size for the table.
Valid values: 'AUTO' (default), '16MB', '32MB', '64MB', '128MB'

* catalog_sync: optionally sets the catalog integration configured for Polaris Catalog

* storage_serialization_policy: specifies the storage serialization policy for the table
@@ -344,11 +354,14 @@ def save_as_table(
See `Create your first Iceberg table <https://docs.snowflake.com/en/user-guide/tutorials/create-your-first-iceberg-table>`_ for more information on creating iceberg resources.

>>> df = session.create_dataframe([[1,2],[3,4]], schema=["a", "b"])
>>> from snowflake.snowpark.functions import col, bucket
>>> iceberg_config = {
... "external_volume": "example_volume",
... "catalog": "example_catalog",
... "base_location": "/iceberg_root",
... "storage_serialization_policy": "OPTIMIZED",
... "target_file_size": "128MB",
... "partition_by": ["a", bucket(3, col("b"))],
... }
>>> df.write.mode("overwrite").save_as_table("my_table", iceberg_config=iceberg_config) # doctest: +SKIP
"""