6 changes: 3 additions & 3 deletions pom.xml
@@ -60,13 +60,13 @@
<!--Compatible protobuf version https://github.com/confluentinc/common/blob/v7.7.0/pom.xml#L91 -->
<protobuf.version>3.25.5</protobuf.version>
<guava.version>33.4.0-jre</guava.version>
<iceberg.version>1.6.1</iceberg.version>
<iceberg.version>1.10.0</iceberg.version>
<jackson.version>2.18.2</jackson.version>
<commons-compress.version>1.27.1</commons-compress.version>
<maven-surefire-plugin.version>3.5.2</maven-surefire-plugin.version>
<snowflake-jdbc.version>3.26.1</snowflake-jdbc.version>
<slf4j-api.version>2.0.17</slf4j-api.version>
<parquet.version>1.14.4</parquet.version>
<parquet.version>1.16.0</parquet.version>
<commons-lang3.version>3.18.0</commons-lang3.version>
</properties>

@@ -630,7 +630,7 @@
<dependency>
<groupId>com.snowflake</groupId>
<artifactId>snowpipe-streaming</artifactId>
<version>1.0.2</version>
<version>1.1.0</version>
</dependency>

</dependencies>
@@ -106,6 +106,10 @@ public class SnowflakeSinkConnectorConfig {
public static final String ICEBERG_ENABLED = "snowflake.streaming.iceberg.enabled";
public static final boolean ICEBERG_ENABLED_DEFAULT_VALUE = false;

// Use VARIANT instead of OBJECT for Iceberg table columns (record_content, record_metadata)
public static final String ICEBERG_USE_VARIANT_TYPE = "snowflake.streaming.iceberg.use.variant";
public static final boolean ICEBERG_USE_VARIANT_TYPE_DEFAULT = false;

// with this flag set to true the user is responsible
// for creating the destination objects (pipe and the table)
// when this feature is turned on the normal topic to table mapping applies but the connector
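For orientation, a minimal sketch of a connector configuration map that opts into the new behavior; the property keys come from the constants above, while the enclosing class and the map itself are illustrative only:

import java.util.HashMap;
import java.util.Map;

public class VariantFlagConfigSketch {
  public static void main(String[] args) {
    Map<String, String> config = new HashMap<>();
    // Iceberg ingestion must be enabled for the VARIANT flag to take effect.
    config.put("snowflake.streaming.iceberg.enabled", "true");
    // New flag introduced by this change; defaults to false (structured OBJECT columns).
    config.put("snowflake.streaming.iceberg.use.variant", "true");
    System.out.println(config);
  }
}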
7 changes: 7 additions & 0 deletions src/main/java/com/snowflake/kafka/connector/Utils.java
@@ -387,6 +387,13 @@ public static boolean isIcebergEnabled(Map<String, String> config) {
return Boolean.parseBoolean(config.get(ICEBERG_ENABLED));
}

public static boolean isIcebergUseVariantType(Map<String, String> config) {
return Boolean.parseBoolean(
config.getOrDefault(
SnowflakeSinkConnectorConfig.ICEBERG_USE_VARIANT_TYPE,
String.valueOf(SnowflakeSinkConnectorConfig.ICEBERG_USE_VARIANT_TYPE_DEFAULT)));
}

public static boolean isSchematizationEnabled(Map<String, String> config) {
return Boolean.parseBoolean(
config.get(SnowflakeSinkConnectorConfig.ENABLE_SCHEMATIZATION_CONFIG));
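A quick sketch of the helper's fallback semantics, assuming Utils and SnowflakeSinkConnectorConfig are on the classpath: an absent key falls back to ICEBERG_USE_VARIANT_TYPE_DEFAULT (false), so existing OBJECT-based deployments are unaffected:

import java.util.HashMap;
import java.util.Map;

public class VariantFlagDefaultSketch {
  public static void main(String[] args) {
    Map<String, String> config = new HashMap<>();
    // Flag absent: getOrDefault supplies "false", so parseBoolean returns false.
    System.out.println(Utils.isIcebergUseVariantType(config)); // false
    config.put(SnowflakeSinkConnectorConfig.ICEBERG_USE_VARIANT_TYPE, "true");
    System.out.println(Utils.isIcebergUseVariantType(config)); // true
  }
}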
@@ -159,19 +159,21 @@ public interface SnowflakeConnectionService {
void createTableWithOnlyMetadataColumn(String tableName);

/**
* Alter the RECORD_METADATA column to be of the required structured OBJECT type for iceberg
* Alter the RECORD_METADATA column to be of the required type (OBJECT or VARIANT) for Iceberg
* tables.
*
* @param tableName iceberg table name
* @param config connector configuration to determine if VARIANT type should be used
*/
void initializeMetadataColumnTypeForIceberg(String tableName);
void initializeMetadataColumnTypeForIceberg(String tableName, Map<String, String> config);

/**
* Add the RECORD_METADATA column to the iceberg table if it does not exist.
*
* @param tableName iceberg table name
* @param config connector configuration to determine if VARIANT type should be used
*/
void addMetadataColumnForIcebergIfNotExists(String tableName);
void addMetadataColumnForIcebergIfNotExists(String tableName, Map<String, String> config);

/**
* Calls describe table statement and returns all columns and corresponding types.
@@ -23,6 +23,8 @@
import java.util.Properties;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import com.snowflake.kafka.connector.streaming.iceberg.IcebergDDLTypes;
import net.snowflake.client.jdbc.SnowflakeDriver;
import net.snowflake.client.jdbc.cloud.storage.StageInfo;

@@ -145,12 +147,12 @@ public void createTableWithOnlyMetadataColumn(final String tableName) {
}

@Override
public void addMetadataColumnForIcebergIfNotExists(String tableName) {
public void addMetadataColumnForIcebergIfNotExists(String tableName, Map<String, String> config) {
checkConnection();
InternalUtils.assertNotEmpty("tableName", tableName);
String query =
"ALTER ICEBERG TABLE identifier(?) ADD COLUMN IF NOT EXISTS RECORD_METADATA "
+ ICEBERG_METADATA_OBJECT_SCHEMA;
+ IcebergDDLTypes.getMetadataType(config);
try {
PreparedStatement stmt = conn.prepareStatement(query);
stmt.setString(1, tableName);
@@ -167,12 +169,12 @@ public void addMetadataColumnForIcebergIfNotExists(String tableName) {
}

@Override
public void initializeMetadataColumnTypeForIceberg(String tableName) {
public void initializeMetadataColumnTypeForIceberg(String tableName, Map<String, String> config) {
checkConnection();
InternalUtils.assertNotEmpty("tableName", tableName);
String query =
"ALTER ICEBERG TABLE identifier(?) ALTER COLUMN RECORD_METADATA SET DATA TYPE "
+ ICEBERG_METADATA_OBJECT_SCHEMA;
+ IcebergDDLTypes.getMetadataType(config);
try {
PreparedStatement stmt = conn.prepareStatement(query);
stmt.setString(1, tableName);
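For illustration, a sketch of the DDL text these methods now compose under each flag value, derived from the statements above rather than captured from a live run; it assumes IcebergDDLTypes is imported:

public class IcebergMetadataDdlSketch {
  public static void main(String[] args) {
    String base = "ALTER ICEBERG TABLE identifier(?) ALTER COLUMN RECORD_METADATA SET DATA TYPE ";
    // use.variant = false (default): the structured OBJECT schema.
    System.out.println(base + IcebergDDLTypes.ICEBERG_METADATA_OBJECT_SCHEMA);
    // use.variant = true: a plain VARIANT column.
    System.out.println(base + IcebergDDLTypes.ICEBERG_METADATA_VARIANT_TYPE);
  }
}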
@@ -4,12 +4,10 @@
import static com.snowflake.kafka.connector.SnowflakeSinkConnectorConfig.CUSTOM_SNOWFLAKE_CONVERTERS;
import static com.snowflake.kafka.connector.SnowflakeSinkConnectorConfig.ERRORS_LOG_ENABLE_CONFIG;
import static com.snowflake.kafka.connector.SnowflakeSinkConnectorConfig.ERRORS_TOLERANCE_CONFIG;
import static com.snowflake.kafka.connector.SnowflakeSinkConnectorConfig.ICEBERG_ENABLED;
import static com.snowflake.kafka.connector.SnowflakeSinkConnectorConfig.KEY_CONVERTER_CONFIG_FIELD;
import static com.snowflake.kafka.connector.SnowflakeSinkConnectorConfig.SNOWPIPE_STREAMING_MAX_CLIENT_LAG;
import static com.snowflake.kafka.connector.SnowflakeSinkConnectorConfig.SNOWPIPE_STREAMING_MAX_MEMORY_LIMIT_IN_BYTES;
import static com.snowflake.kafka.connector.SnowflakeSinkConnectorConfig.VALUE_CONVERTER_CONFIG_FIELD;
import static com.snowflake.kafka.connector.Utils.isIcebergEnabled;

import com.google.common.base.Strings;
import com.google.common.collect.ImmutableMap;
@@ -37,11 +35,6 @@ public class DefaultStreamingConfigValidator implements StreamingConfigValidator
public ImmutableMap<String, String> validate(Map<String, String> inputConfig) {
Map<String, String> invalidParams = new HashMap<>();

// Validate Iceberg config
if (isIcebergEnabled(inputConfig)) {
invalidParams.put(ICEBERG_ENABLED, "Ingestion to Iceberg table is currently unsupported.");
}

invalidParams.putAll(validateConfigConverters(KEY_CONVERTER_CONFIG_FIELD, inputConfig));
invalidParams.putAll(validateConfigConverters(VALUE_CONVERTER_CONFIG_FIELD, inputConfig));

@@ -213,7 +213,7 @@ private void tableActionsOnStartPartition(String tableName) {
} else if (isIcebergEnabled(connectorConfig)) {
icebergTableSchemaValidator.validateTable(
tableName, getRole(connectorConfig), isSchematizationEnabled(connectorConfig));
icebergInitService.initializeIcebergTableProperties(tableName);
icebergInitService.initializeIcebergTableProperties(tableName, connectorConfig);
} else {
createTableIfNotExists(tableName);
}
@@ -1,7 +1,13 @@
package com.snowflake.kafka.connector.streaming.iceberg;

import com.snowflake.kafka.connector.Utils;
import java.util.Map;

public class IcebergDDLTypes {

public static final String ICEBERG_METADATA_VARIANT_TYPE = "VARIANT";
public static final String ICEBERG_CONTENT_VARIANT_TYPE = "VARIANT";

public static String ICEBERG_METADATA_OBJECT_SCHEMA =
"OBJECT("
+ "offset LONG,"
@@ -15,4 +21,16 @@ public class IcebergDDLTypes {
+ "SnowflakeConnectorPushTime BIGINT,"
+ "headers MAP(VARCHAR, VARCHAR)"
+ ")";

public static String getMetadataType(Map<String, String> config) {
return Utils.isIcebergUseVariantType(config)
? ICEBERG_METADATA_VARIANT_TYPE
: ICEBERG_METADATA_OBJECT_SCHEMA;
}

public static String getContentType(Map<String, String> config, String structuredSchema) {
return Utils.isIcebergUseVariantType(config)
? ICEBERG_CONTENT_VARIANT_TYPE
: structuredSchema;
}
}
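A short usage sketch of the two helpers, assuming the class above is on the classpath; the structured content schema string is a hypothetical stand-in:

import java.util.HashMap;
import java.util.Map;

public class IcebergDDLTypesSketch {
  public static void main(String[] args) {
    Map<String, String> config = new HashMap<>();
    String contentSchema = "OBJECT(id LONG, name VARCHAR)"; // hypothetical schema

    // Default: structured types are preserved.
    System.out.println(IcebergDDLTypes.getMetadataType(config));               // OBJECT(offset LONG, ...)
    System.out.println(IcebergDDLTypes.getContentType(config, contentSchema)); // OBJECT(id LONG, name VARCHAR)

    // Variant mode: both columns collapse to VARIANT.
    config.put("snowflake.streaming.iceberg.use.variant", "true");
    System.out.println(IcebergDDLTypes.getMetadataType(config));               // VARIANT
    System.out.println(IcebergDDLTypes.getContentType(config, contentSchema)); // VARIANT
  }
}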
@@ -2,6 +2,7 @@

import com.snowflake.kafka.connector.internal.KCLogger;
import com.snowflake.kafka.connector.internal.SnowflakeConnectionService;
import java.util.Map;

public class IcebergInitService {

@@ -13,9 +14,9 @@ public IcebergInitService(SnowflakeConnectionService snowflakeConnectionService)
this.snowflakeConnectionService = snowflakeConnectionService;
}

public void initializeIcebergTableProperties(String tableName) {
public void initializeIcebergTableProperties(String tableName, Map<String, String> config) {
LOGGER.info("Initializing properties for Iceberg table: {}", tableName);
snowflakeConnectionService.addMetadataColumnForIcebergIfNotExists(tableName);
snowflakeConnectionService.initializeMetadataColumnTypeForIceberg(tableName);
snowflakeConnectionService.addMetadataColumnForIcebergIfNotExists(tableName, config);
snowflakeConnectionService.initializeMetadataColumnTypeForIceberg(tableName, config);
}
}
@@ -14,6 +14,7 @@
public class IcebergTableSchemaValidator {

private static final String SF_STRUCTURED_OBJECT = "OBJECT";
private static final String SF_VARIANT = "VARIANT";

private final SnowflakeConnectionService snowflakeConnectionService;

@@ -41,9 +42,7 @@ public void validateTable(String tableName, String role, boolean schemaEvolution

metadata.ifPresent(
m -> {
// if metadata column exists it must be of type OBJECT(), if not exists we create on our
// own this column
if (!isOfStructuredObjectType(m)) {
if (!isOfAcceptableType(m)) {
throw SnowflakeErrors.ERROR_0032.getException("invalid_metadata_type");
}
});
@@ -69,12 +68,16 @@ private static void validateNoSchemaEvolutionScenario(List<DescribeTableRow> col
.orElseThrow(
() -> SnowflakeErrors.ERROR_0032.getException("record_content_column_not_found"));

if (!isOfStructuredObjectType(recordContent)) {
if (!isOfAcceptableType(recordContent)) {
throw SnowflakeErrors.ERROR_0032.getException("invalid_record_content_type");
}
}

private static boolean isOfStructuredObjectType(DescribeTableRow metadata) {
return metadata.getType().startsWith(SF_STRUCTURED_OBJECT);
/**
* Check if column type is acceptable for Iceberg tables. Accepts both VARIANT and OBJECT.
*/
private static boolean isOfAcceptableType(DescribeTableRow column) {
String type = column.getType();
return type.equals(SF_VARIANT) || type.startsWith(SF_STRUCTURED_OBJECT);
}
}
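To make the relaxed rule concrete, a standalone sketch mirroring isOfAcceptableType, with type strings DESCRIBE TABLE is assumed to report:

public class AcceptableTypeSketch {
  static boolean isOfAcceptableType(String type) {
    // Mirrors the validator: VARIANT must match exactly; OBJECT carries its schema suffix.
    return type.equals("VARIANT") || type.startsWith("OBJECT");
  }

  public static void main(String[] args) {
    System.out.println(isOfAcceptableType("VARIANT"));             // true
    System.out.println(isOfAcceptableType("OBJECT(offset LONG)")); // true
    System.out.println(isOfAcceptableType("VARCHAR(16777216)"));   // false -> ERROR_0032
  }
}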
@@ -645,17 +645,6 @@ public void shouldThrowExceptionWhenRoleNotDefinedForSSv2() {
.hasMessageContaining(SNOWFLAKE_ROLE);
}

@Test
public void shouldThrowExceptionWhenBothSSv2AndIcebergEnabled() {
Map<String, String> config =
SnowflakeSinkConnectorConfigBuilder.streamingConfig().withIcebergEnabled().build();

assertThatThrownBy(() -> connectorConfigValidator.validateConfig(config))
.isInstanceOf(SnowflakeKafkaConnectorException.class)
.hasMessageContaining("Ingestion to Iceberg table is currently unsupported")
.hasMessageContaining(ICEBERG_ENABLED);
}

@Test
public void shouldValidateSSv2WithoutIceberg() {
Map<String, String> config = SnowflakeSinkConnectorConfigBuilder.streamingConfig().build();
@@ -28,22 +28,41 @@ public static void teardown() {
}

protected static void createIcebergTable(String tableName) {
createIcebergTableWithColumnClause(tableName, "record_metadata object()");
createIcebergTable(tableName, false);
}

protected static void createIcebergTable(String tableName, boolean useVariant) {
String metadataType = useVariant ? "variant" : "object()";
createIcebergTableWithColumnClause(tableName, "record_metadata " + metadataType);
}

protected static void createIcebergTableNoSchemaEvolution(String tableName) {
createIcebergTableNoSchemaEvolution(tableName, false);
}

protected static void createIcebergTableNoSchemaEvolution(String tableName, boolean useVariant) {
String columnType = useVariant ? "variant" : "object()";
createIcebergTableWithColumnClause(
tableName, "record_metadata object(), record_content object()");
tableName, "record_metadata " + metadataType + ", record_content " + contentType, useVariant);
}

protected static void createIcebergTableWithColumnClause(String tableName, String columnClause) {
createIcebergTableWithColumnClause(tableName, columnClause, false);
}

protected static void createIcebergTableWithColumnClause(
String tableName, String columnClause, boolean useVariant) {
String query =
"create or replace iceberg table identifier(?) ("
+ columnClause
+ ")"
+ "external_volume = 'test_exvol'"
+ "catalog = 'SNOWFLAKE'"
+ "base_location = 'it'";
+ ") "
+ "external_volume = 'test_exvol' "
+ "catalog = 'SNOWFLAKE' "
+ "base_location = 'it' ";

if (useVariant) {
query += "iceberg_version = 3";
}

doExecuteQueryWithParameter(query, tableName);
String allowStreamingIngestionQuery =
"alter iceberg table identifier(?) set ALLOW_STREAMING_INGESTION_FOR_MANAGED_ICEBERG ="
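Spelled out, a sketch of the statement the helper composes for createIcebergTable("t1", true); iceberg_version = 3 is appended because this test setup assumes VARIANT columns in managed Iceberg tables require the Iceberg v3 format:

public class IcebergCreateTableSketch {
  public static void main(String[] args) {
    // Mirrors the concatenation above for useVariant = true.
    String query =
        "create or replace iceberg table identifier(?) ("
            + "record_metadata variant"
            + ") "
            + "external_volume = 'test_exvol' "
            + "catalog = 'SNOWFLAKE' "
            + "base_location = 'it' "
            + "iceberg_version = 3";
    System.out.println(query);
  }
}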
@@ -37,21 +37,23 @@ public abstract class IcebergIngestionIT extends BaseIcebergIT {
protected TopicPartition topicPartition;
protected SnowflakeSinkService service;
protected InMemoryKafkaRecordErrorReporter kafkaRecordErrorReporter;
protected Map<String, String> config;
protected static final String simpleRecordJson = "{\"simple\": \"extra field\"}";

@BeforeEach
public void setUp() {
tableName = TestUtils.randomTableName();
topic = tableName;
topicPartition = new TopicPartition(topic, PARTITION);
Map<String, String> config = getConfForStreaming();
config = getConfForStreaming();
SnowflakeSinkConnectorConfig.setDefaultValues(config);
config.put(ICEBERG_ENABLED, "TRUE");
config.put(ENABLE_SCHEMATIZATION_CONFIG, isSchemaEvolutionEnabled().toString());
// "snowflake.streaming.max.client.lag" = 1 second, for faster tests
config.put(SNOWPIPE_STREAMING_MAX_CLIENT_LAG, "1");
config.put(ERRORS_TOLERANCE_CONFIG, SnowflakeSinkConnectorConfig.ErrorTolerance.ALL.toString());
config.put(ERRORS_DEAD_LETTER_QUEUE_TOPIC_NAME_CONFIG, "test_DLQ");
config.put(ICEBERG_USE_VARIANT_TYPE, "true");

createIcebergTable();
enableSchemaEvolution(tableName);
@@ -72,11 +72,13 @@ protected void createIcebergTable() {
tableName,
Utils.TABLE_COLUMN_METADATA
+ " "
+ IcebergDDLTypes.ICEBERG_METADATA_OBJECT_SCHEMA
+ IcebergDDLTypes.getMetadataType(config)
+ ", "
+ Utils.TABLE_COLUMN_CONTENT
+ " "
+ COMPLEX_JSON_RECORD_CONTENT_OBJECT_SCHEMA);
+ IcebergDDLTypes.getContentType(config, COMPLEX_JSON_RECORD_CONTENT_OBJECT_SCHEMA),
Utils.isIcebergUseVariantType(config));
}

private static Stream<Arguments> prepareData() {