
Commit 3a68b76

SNOW-2719674-iceberg-options (#4003)
1 parent 48e715d commit 3a68b76

File tree: 14 files changed (+403, -28 lines)


CHANGELOG.md

Lines changed: 9 additions & 0 deletions

@@ -10,10 +10,15 @@
 - Added support for PrPr feature `Session.client_telemetry`.
 - Added support for `Session.udf_profiler`.
 - Added support for `functions.ai_translate`.
+- Added support for the following `iceberg_config` options in `DataFrameWriter.save_as_table` and `DataFrame.copy_into_table`:
+  - `target_file_size`
+  - `partition_by`
 - Added support for the following functions in `functions.py`:
   - String and Binary functions:
     - `base64_decode_binary`
+    - `bucket`
     - `compress`
+    - `day`
     - `decompress_binary`
     - `decompress_string`
     - `md5_binary`
@@ -23,6 +28,7 @@
     - `sha2_binary`
     - `soundex_p123`
     - `strtok`
+    - `truncate`
     - `try_base64_decode_binary`
     - `try_base64_decode_string`
     - `try_hex_decode_binary`
@@ -64,6 +70,9 @@
 - Added support for `Dataframe.groupby.rolling()`.
 - Added support for mapping `np.percentile` with DataFrame and Series inputs to `Series.quantile`.
 - Added support for setting the `random_state` parameter to an integer when calling `DataFrame.sample` or `Series.sample`.
+- Added support for the following `iceberg_config` options in `to_iceberg`:
+  - `target_file_size`
+  - `partition_by`

 #### Improvements
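For context, a minimal usage sketch of the two new `iceberg_config` options, modeled on the doctest added to dataframe_writer.py below (the table and volume names are placeholders):

from snowflake.snowpark.functions import bucket, col

df = session.create_dataframe([[1, 2], [3, 4]], schema=["a", "b"])
df.write.mode("overwrite").save_as_table(
    "my_iceberg_table",
    iceberg_config={
        "external_volume": "example_volume",  # placeholder volume name
        "base_location": "/iceberg_root",
        "target_file_size": "64MB",  # one of 'AUTO', '16MB', '32MB', '64MB', '128MB'
        "partition_by": ["a", bucket(3, col("b"))],
    },
)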
docs/source/snowpark/functions.rst

Lines changed: 3 additions & 0 deletions

@@ -120,6 +120,7 @@ Functions
    boolor
    boolxor
    boolxor_agg
+   bucket
    build_stage_file_url
    builtin
    bround
@@ -189,6 +190,7 @@ Functions
    datediff
    date_add
    date_sub
+   day
    daydiff
    dayname
    dayofmonth
@@ -555,6 +557,7 @@ Functions
    translate
    trim
    trunc
+   truncate
    try_cast
    try_parse_json
    try_to_binary

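A quick sketch of two of the newly documented helpers; bucket()'s signature follows the save_as_table doctest in this commit, while the single-column form of day() is an assumption:

from snowflake.snowpark.functions import bucket, day, col, to_date

df = session.create_dataframe([[1, "2024-01-15"]], schema=["a", "ts"])
df.select(
    bucket(3, col("a")),       # hash-bucket "a" into 3 buckets (Iceberg partition transform)
    day(to_date(col("ts"))),   # assumed to extract the day of month
).show()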
src/snowflake/snowpark/_internal/analyzer/analyzer.py

Lines changed: 49 additions & 3 deletions

@@ -4,7 +4,7 @@
 #
 import uuid
 from collections import Counter, defaultdict
-from typing import TYPE_CHECKING, DefaultDict, Dict, List, Union
+from typing import TYPE_CHECKING, DefaultDict, Dict, List, Optional, Union
 from logging import getLogger

 from snowflake.connector import IntegrityError
@@ -177,6 +177,7 @@
     ExprAliasUpdateDict,
 )
 from snowflake.snowpark.types import BooleanType, _NumericType
+from snowflake.snowpark.column import Column

 ARRAY_BIND_THRESHOLD = 512

@@ -904,6 +905,43 @@ def to_sql_try_avoid_cast(
             parse_local_name,
         )

+    def _process_partition_by_in_iceberg_config(
+        self,
+        iceberg_config: Optional[dict],
+        df_aliased_col_name_to_real_col_name: Union[
+            DefaultDict[str, Dict[str, str]], DefaultDict[str, ExprAliasUpdateDict]
+        ],
+    ) -> Optional[dict]:
+        """
+        Process partition_by expressions from iceberg_config, converting Column objects to SQL strings.
+        Returns a new iceberg_config dict with partition_by as a list of SQL strings, or the original config if no processing needed.
+        """
+        if iceberg_config is None or iceberg_config.get("partition_by") is None:
+            return iceberg_config
+
+        iceberg_config = {k.lower(): v for k, v in iceberg_config.items()}
+        pb = iceberg_config["partition_by"]
+
+        # Convert to list and filter out empty expressions
+        partition_exprs = pb if isinstance(pb, (list, tuple)) else [pb]
+        partition_sqls = []
+        for expr in partition_exprs:
+            if isinstance(expr, Column):
+                partition_sqls.append(
+                    self.analyze(expr._expression, df_aliased_col_name_to_real_col_name)
+                )
+            elif isinstance(expr, str):
+                if expr:  # Ignore empty strings
+                    partition_sqls.append(str(expr))
+            else:
+                raise TypeError(
+                    f"partition_by in iceberg_config expected Column or str, got: {type(expr)}"
+                )
+
+        if partition_sqls:
+            return {**iceberg_config, "partition_by": partition_sqls}
+        return iceberg_config
+
     def resolve(self, logical_plan: LogicalPlan) -> SnowflakePlan:
         self.subquery_plans = []
         self.generated_alias_maps = (
@@ -1164,6 +1202,10 @@ def do_resolve_with_resolved_children(

         if isinstance(logical_plan, SnowflakeCreateTable):
             resolved_child = resolved_children[logical_plan.children[0]]
+            iceberg_config = self._process_partition_by_in_iceberg_config(
+                logical_plan.iceberg_config, df_aliased_col_name_to_real_col_name
+            )
+
             return self.plan_builder.save_as_table(
                 table_name=logical_plan.table_name,
                 column_names=logical_plan.column_names,
@@ -1184,7 +1226,7 @@ def do_resolve_with_resolved_children(
                 use_scoped_temp_objects=self.session._use_scoped_temp_objects,
                 creation_source=logical_plan.creation_source,
                 child_attributes=resolved_child.attributes,
-                iceberg_config=logical_plan.iceberg_config,
+                iceberg_config=iceberg_config,
                 table_exists=logical_plan.table_exists,
             )

@@ -1416,6 +1458,10 @@ def do_resolve_with_resolved_children(
             if format_name is not None:
                 format_type_options["FORMAT_NAME"] = format_name
             assert logical_plan.file_format is not None
+            iceberg_config = self._process_partition_by_in_iceberg_config(
+                logical_plan.iceberg_config, df_aliased_col_name_to_real_col_name
+            )
+
             return self.plan_builder.copy_into_table(
                 path=logical_plan.file_path,
                 table_name=logical_plan.table_name,
@@ -1435,7 +1481,7 @@ def do_resolve_with_resolved_children(
                 else None,
                 user_schema=logical_plan.user_schema,
                 create_table_from_infer_schema=logical_plan.create_table_from_infer_schema,
-                iceberg_config=logical_plan.iceberg_config,
+                iceberg_config=iceberg_config,
             )

         if isinstance(logical_plan, CopyIntoLocationNode):

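A standalone sketch of the normalization that _process_partition_by_in_iceberg_config performs, with self.analyze(...) stubbed out so it runs without a session (FakeColumn and its sql attribute are illustrative stand-ins for Column):

from typing import Optional


class FakeColumn:
    def __init__(self, sql: str) -> None:
        self.sql = sql


def process_partition_by(iceberg_config: Optional[dict]) -> Optional[dict]:
    if iceberg_config is None or iceberg_config.get("partition_by") is None:
        return iceberg_config
    iceberg_config = {k.lower(): v for k, v in iceberg_config.items()}
    pb = iceberg_config["partition_by"]
    exprs = pb if isinstance(pb, (list, tuple)) else [pb]
    sqls = []
    for expr in exprs:
        if isinstance(expr, FakeColumn):
            sqls.append(expr.sql)  # the real code calls self.analyze(expr._expression, ...)
        elif isinstance(expr, str):
            if expr:  # empty strings are dropped
                sqls.append(expr)
        else:
            raise TypeError(f"partition_by expected Column or str, got: {type(expr)}")
    return {**iceberg_config, "partition_by": sqls} if sqls else iceberg_config


print(process_partition_by({"partition_by": [FakeColumn('BUCKET(3, "B")'), "a"]}))
# {'partition_by': ['BUCKET(3, "B")', 'a']}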
src/snowflake/snowpark/_internal/analyzer/analyzer_utils.py

Lines changed: 44 additions & 14 deletions

@@ -151,6 +151,7 @@
 EXTERNAL_VOLUME = " EXTERNAL_VOLUME "
 CATALOG = " CATALOG "
 BASE_LOCATION = " BASE_LOCATION "
+TARGET_FILE_SIZE = " TARGET_FILE_SIZE "
 CATALOG_SYNC = " CATALOG_SYNC "
 STORAGE_SERIALIZATION_POLICY = " STORAGE_SERIALIZATION_POLICY "
 REG_EXP = " REGEXP "
@@ -231,23 +232,34 @@ def format_uuid(uuid: Optional[str], with_new_line: bool = True) -> str:
     return f"{UUID_COMMENT.format(uuid)}"


-def validate_iceberg_config(iceberg_config: Optional[dict]) -> Dict[str, str]:
+def validate_iceberg_config(
+    iceberg_config: Optional[dict],
+) -> tuple[Dict[str, str], list]:
+    """
+    Validate and process iceberg config, returning (options_dict, partition_exprs_list).
+    """
     if iceberg_config is None:
-        return dict()
+        return dict(), []

     iceberg_config = {k.lower(): v for k, v in iceberg_config.items()}

-    return {
+    # Extract partition_by (already processed as SQL strings by analyzer)
+    partition_exprs = iceberg_config.get("partition_by", [])
+
+    options = {
         EXTERNAL_VOLUME: iceberg_config.get("external_volume", None),
         CATALOG: iceberg_config.get("catalog", None),
         BASE_LOCATION: iceberg_config.get("base_location", None),
+        TARGET_FILE_SIZE: iceberg_config.get("target_file_size", None),
         CATALOG_SYNC: iceberg_config.get("catalog_sync", None),
         STORAGE_SERIALIZATION_POLICY: iceberg_config.get(
             "storage_serialization_policy", None
         ),
         ICEBERG_VERSION: iceberg_config.get("iceberg_version", None),
     }

+    return options, partition_exprs
+

 def result_scan_statement(uuid_place_holder: str) -> str:
     return (
@@ -311,6 +323,20 @@ def partition_spec(col_exprs: List[str]) -> str:
     return f"PARTITION BY {COMMA.join(col_exprs)}" if col_exprs else EMPTY_STRING


+def iceberg_partition_clause(partition_exprs: List[str]) -> str:
+    return (
+        (
+            SPACE
+            + PARTITION_BY
+            + LEFT_PARENTHESIS
+            + COMMA.join(partition_exprs)
+            + RIGHT_PARENTHESIS
+        )
+        if partition_exprs
+        else EMPTY_STRING
+    )
+
+
 def order_by_spec(col_exprs: List[str]) -> str:
     if not col_exprs:
         return EMPTY_STRING
@@ -1103,15 +1129,17 @@ def create_table_statement(
         CHANGE_TRACKING: change_tracking,
     }

-    iceberg_config = validate_iceberg_config(iceberg_config)
-    options.update(iceberg_config)
+    iceberg_options, partition_exprs = validate_iceberg_config(iceberg_config)
+    options.update(iceberg_options)
     options_statement = get_options_statement(options)

+    partition_by_clause = iceberg_partition_clause(partition_exprs)
+
     return (
         f"{CREATE}{(OR + REPLACE) if replace else EMPTY_STRING}"
         f" {(get_temp_type_for_object(use_scoped_temp_objects, is_generated) if table_type.lower() in TEMPORARY_STRING_SET else table_type).upper()} "
-        f"{ICEBERG if iceberg_config else EMPTY_STRING}{TABLE}{table_name}{(IF + NOT + EXISTS) if not replace and not error else EMPTY_STRING}"
-        f"{LEFT_PARENTHESIS}{schema}{RIGHT_PARENTHESIS}{cluster_by_clause}"
+        f"{ICEBERG if iceberg_options else EMPTY_STRING}{TABLE}{table_name}{(IF + NOT + EXISTS) if not replace and not error else EMPTY_STRING}"
+        f"{LEFT_PARENTHESIS}{schema}{RIGHT_PARENTHESIS}{partition_by_clause}{cluster_by_clause}"
         f"{options_statement}{COPY_GRANTS if copy_grants else EMPTY_STRING}{comment_sql}"
     )

@@ -1192,15 +1220,18 @@ def create_table_as_select_statement(
         MAX_DATA_EXTENSION_TIME_IN_DAYS: max_data_extension_time,
         CHANGE_TRACKING: change_tracking,
     }
-    iceberg_config = validate_iceberg_config(iceberg_config)
-    options.update(iceberg_config)
+    iceberg_options, partition_exprs = validate_iceberg_config(iceberg_config)
+    options.update(iceberg_options)
     options_statement = get_options_statement(options)
+
+    partition_by_clause = iceberg_partition_clause(partition_exprs)
+
     return (
         f"{CREATE}{OR + REPLACE if replace else EMPTY_STRING}"
         f" {(get_temp_type_for_object(use_scoped_temp_objects, is_generated) if table_type.lower() in TEMPORARY_STRING_SET else table_type).upper()} "
-        f"{ICEBERG if iceberg_config else EMPTY_STRING}{TABLE}"
+        f"{ICEBERG if iceberg_options else EMPTY_STRING}{TABLE}"
         f"{IF + NOT + EXISTS if not replace and not error else EMPTY_STRING} "
-        f"{table_name}{column_definition_sql}{cluster_by_clause}{options_statement}"
+        f"{table_name}{column_definition_sql}{partition_by_clause}{cluster_by_clause}{options_statement}"
         f"{COPY_GRANTS if copy_grants else EMPTY_STRING}{comment_sql} {AS}{project_statement([], child)}"
     )

@@ -1506,9 +1537,8 @@ def create_or_replace_dynamic_table_statement(
         }
     )

-    iceberg_options = get_options_statement(
-        validate_iceberg_config(iceberg_config)
-    ).strip()
+    iceberg_options, _ = validate_iceberg_config(iceberg_config)
+    iceberg_options = get_options_statement(iceberg_options).strip()

     return (
         f"{CREATE}{OR + REPLACE if replace else EMPTY_STRING}{TRANSIENT if is_transient else EMPTY_STRING}"

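To see the clause iceberg_partition_clause produces, here is a runnable sketch with the module's string constants inlined (assuming COMMA joins with ", " and PARTITION_BY renders as "PARTITION BY" with surrounding spacing, like the other constants in this file):

def iceberg_partition_clause(partition_exprs):
    # Inlined approximation of SPACE + PARTITION_BY + LEFT_PARENTHESIS + ... + RIGHT_PARENTHESIS
    return f" PARTITION BY ({', '.join(partition_exprs)})" if partition_exprs else ""


print(repr(iceberg_partition_clause(['"A"', 'BUCKET(3, "B")'])))
# ' PARTITION BY ("A", BUCKET(3, "B"))'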
src/snowflake/snowpark/_internal/analyzer/snowflake_plan.py

Lines changed: 2 additions & 0 deletions

@@ -1298,6 +1298,8 @@ def save_as_table(
                 the Iceberg table stores its metadata files and data in Parquet format
             catalog: specifies either Snowflake or a catalog integration to use for this table
             base_location: the base directory that snowflake can write iceberg metadata and files to
+            target_file_size: specifies a target Parquet file size for the table.
+                Valid values: 'AUTO' (default), '16MB', '32MB', '64MB', '128MB'
             catalog_sync: optionally sets the catalog integration configured for Polaris Catalog
             storage_serialization_policy: specifies the storage serialization policy for the table
             iceberg_version: Overrides the version of iceberg to use. Defaults to 2 when unset.

src/snowflake/snowpark/dataframe.py

Lines changed: 14 additions & 0 deletions

@@ -4725,13 +4725,21 @@ def copy_into_table(
             statement_params: Dictionary of statement level parameters to be set while executing this action.
             iceberg_config: A dictionary that can contain the following iceberg configuration values:

+                * partition_by: specifies one or more partition expressions for the Iceberg table.
+                  Can be a single Column, column name, SQL expression string, or a list of these.
+                  Supports identity partitioning (column names) as well as partition transform functions
+                  like bucket(), truncate(), year(), month(), day(), hour().
+
                 * external_volume: specifies the identifier for the external volume where
                   the Iceberg table stores its metadata files and data in Parquet format

                 * catalog: specifies either Snowflake or a catalog integration to use for this table

                 * base_location: the base directory that snowflake can write iceberg metadata and files to

+                * target_file_size: specifies a target Parquet file size for the table.
+                  Valid values: 'AUTO' (default), '16MB', '32MB', '64MB', '128MB'
+
                 * catalog_sync: optionally sets the catalog integration configured for Polaris Catalog

                 * storage_serialization_policy: specifies the storage serialization policy for the table
@@ -5516,10 +5524,16 @@ def create_or_replace_dynamic_table(
             statement_params: Dictionary of statement level parameters to be set while executing this action.
             iceberg_config: A dictionary that can contain the following iceberg configuration values:

+                - partition_by: specifies one or more partition expressions for the Iceberg table.
+                  Can be a single Column, column name, SQL expression string, or a list of these.
+                  Supports identity partitioning (column names) as well as partition transform functions
+                  like bucket(), truncate(), year(), month(), day(), hour().
                 - external_volume: specifies the identifier for the external volume where
                   the Iceberg table stores its metadata files and data in Parquet format.
                 - catalog: specifies either Snowflake or a catalog integration to use for this table.
                 - base_location: the base directory that snowflake can write iceberg metadata and files to.
+                - target_file_size: specifies a target Parquet file size for the table.
+                  Valid values: 'AUTO' (default), '16MB', '32MB', '64MB', '128MB'
                 - catalog_sync: optionally sets the catalog integration configured for Polaris Catalog.
                 - storage_serialization_policy: specifies the storage serialization policy for the table.
             copy_grants: A boolean value that specifies whether to retain the access permissions from the original view

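A hedged sketch of copy_into_table with the new options; the stage path, schema, table, and volume names are hypothetical, and the single-column day() signature is an assumption:

from snowflake.snowpark.functions import day, col
from snowflake.snowpark.types import StructType, StructField, IntegerType, TimestampType

user_schema = StructType([
    StructField("id", IntegerType()),
    StructField("event_ts", TimestampType()),
])
df = session.read.schema(user_schema).csv("@my_stage/events/")  # hypothetical stage
df.copy_into_table(
    "events_iceberg",
    iceberg_config={
        "external_volume": "example_volume",
        "base_location": "/iceberg_root",
        "target_file_size": "AUTO",
        "partition_by": [day(col("event_ts"))],  # partition by day of the event timestamp
    },
)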
src/snowflake/snowpark/dataframe_writer.py

Lines changed: 14 additions & 1 deletion

@@ -252,7 +252,9 @@
         max_data_extension_time: Optional[int] = None,
         change_tracking: Optional[bool] = None,
         copy_grants: bool = False,
-        iceberg_config: Optional[Dict[str, str]] = None,
+        iceberg_config: Optional[
+            Dict[str, Union[str, Iterable[ColumnOrSqlExpr]]]
+        ] = None,
         table_exists: Optional[bool] = None,
         _emit_ast: bool = True,
         **kwargs: Optional[Dict[str, Any]],
@@ -306,13 +308,21 @@ def save_as_table(
                 asynchronously and returns an :class:`AsyncJob`.
             iceberg_config: A dictionary that can contain the following iceberg configuration values:

+                * partition_by: specifies one or more partition expressions for the Iceberg table.
+                  Can be a single Column, column name, SQL expression string, or a list of these.
+                  Supports identity partitioning (column names) as well as partition transform functions
+                  like bucket(), truncate(), year(), month(), day(), hour().
+
                 * external_volume: specifies the identifier for the external volume where
                   the Iceberg table stores its metadata files and data in Parquet format

                 * catalog: specifies either Snowflake or a catalog integration to use for this table

                 * base_location: the base directory that snowflake can write iceberg metadata and files to

+                * target_file_size: specifies a target Parquet file size for the table.
+                  Valid values: 'AUTO' (default), '16MB', '32MB', '64MB', '128MB'
+
                 * catalog_sync: optionally sets the catalog integration configured for Polaris Catalog

                 * storage_serialization_policy: specifies the storage serialization policy for the table
@@ -344,11 +354,14 @@
         See `Create your first Iceberg table <https://docs.snowflake.com/en/user-guide/tutorials/create-your-first-iceberg-table>`_ for more information on creating iceberg resources.

         >>> df = session.create_dataframe([[1,2],[3,4]], schema=["a", "b"])
+        >>> from snowflake.snowpark.functions import col, bucket
         >>> iceberg_config = {
         ...     "external_volume": "example_volume",
         ...     "catalog": "example_catalog",
         ...     "base_location": "/iceberg_root",
         ...     "storage_serialization_policy": "OPTIMIZED",
+        ...     "target_file_size": "128MB",
+        ...     "partition_by": ["a", bucket(3, col("b"))],
         ... }
         >>> df.write.mode("overwrite").save_as_table("my_table", iceberg_config=iceberg_config) # doctest: +SKIP
         """

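Since strings in partition_by are passed through verbatim and Column objects are compiled to SQL (see _process_partition_by_in_iceberg_config above), all of these forms should be accepted; a sketch with placeholder names, where the raw-SQL transform string is an assumption about what Snowflake's PARTITION BY clause accepts:

from snowflake.snowpark.functions import bucket, col

base_cfg = {
    "external_volume": "example_volume",
    "catalog": "example_catalog",
    "base_location": "/iceberg_root",
}

# single column name as a string (identity partitioning)
df.write.save_as_table("t1", iceberg_config={**base_cfg, "partition_by": "a"})

# a raw SQL transform expression, passed through verbatim
df.write.save_as_table("t2", iceberg_config={**base_cfg, "partition_by": "bucket(3, b)"})

# a mixed list of column names and Column expressions
df.write.save_as_table("t3", iceberg_config={**base_cfg, "partition_by": ["a", bucket(3, col("b"))]})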