profile.json.example (3 changes: 2 additions & 1 deletion)
@@ -1,7 +1,8 @@
{
"user": "user name",
"role": "user role",
"private_key": "private key",
"host": "acountname.snowflakecomputing.com:443",
"host": "account_identifier.snowflakecomputing.com:443",
"schema": "schema name",
"database": "database name",
"warehouse": "warehouse name"
DirectTopicPartitionChannel.java
@@ -40,6 +40,7 @@
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Collectors;
import net.snowflake.ingest.streaming.*;
+ import net.snowflake.ingest.utils.ErrorCode;
import net.snowflake.ingest.utils.SFException;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.connect.errors.ConnectException;
@@ -76,7 +77,7 @@ public class DirectTopicPartitionChannel implements TopicPartitionChannel {
// should be skipped
private boolean needToSkipCurrentBatch = false;

- private final SnowflakeStreamingIngestClient streamingIngestClient;
+ private SnowflakeStreamingIngestClient streamingIngestClient;

// Topic partition Object from connect consisting of topic and partition
private final TopicPartition topicPartition;
@@ -412,6 +413,11 @@ private InsertValidationResponse insertRowWithFallback(
*/
private void insertRowFallbackSupplier(Throwable ex)
throws TopicPartitionChannelInsertionException {
+ if (isClientInvalidError(ex)) {
+   this.streamingIngestClient =
+       StreamingClientProvider.getStreamingClientProviderInstance()
+           .recreateClient(sfConnectorConfig);
+ }
final long offsetRecoveredFromSnowflake =
streamingApiFallbackSupplier(StreamingApiFallbackInvoker.INSERT_ROWS_FALLBACK);
throw new TopicPartitionChannelInsertionException(
@@ -478,6 +484,19 @@ public long fetchOffsetTokenWithRetry() {
return offsetTokenExecutor.get(this::fetchLatestCommittedOffsetFromSnowflake);
}

+ /**
+  * Checks if the exception indicates a client invalidation error.
+  *
+  * @param e the exception to check
+  * @return true if the exception is a CLOSED_CLIENT error
+  */
+ private boolean isClientInvalidError(Throwable e) {
Contributor:
I think we also need to handle this in streamingIngestClient.openChannel()

Author (@sfc-gh-aminyaylov, Oct 20, 2025):
Good point. I added a client.isClosed() check to the openChannelForTable retry policy.
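
Roughly, the reopen path now looks like this. This is a sketch, not the exact diff: it assumes the connector's existing dev.failsafe retry setup, and channelRequest stands in for the OpenChannelRequest that openChannelForTable already builds.

RetryPolicy<SnowflakeStreamingIngestChannel> reopenPolicy =
    RetryPolicy.<SnowflakeStreamingIngestChannel>builder()
        // Retry when the shared client was invalidated or closed by another channel.
        .handleIf(ex -> isClientInvalidError(ex) || streamingIngestClient.isClosed())
        // Recreate the client before the next openChannel attempt.
        .onRetry(
            event ->
                streamingIngestClient =
                    StreamingClientProvider.getStreamingClientProviderInstance()
                        .recreateClient(sfConnectorConfig))
        .build();
return Failsafe.with(reopenPolicy).get(() -> streamingIngestClient.openChannel(channelRequest));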

Furthermore, I'm thinking about possible race conditions when reopening a client that's shared by multiple channels, for example:

Channel A: insertRows() → recreateClient()
Channel B: openChannel() → recreateClient()

The following implicit locking should handle this safely. I also added a validity check to avoid double recreation:

// ConcurrentHashMap.compute() ensures that only one thread recreates a client for the specified key.
return this.registeredClients.asMap().compute(clientProperties, (key, client) -> {
  if (StreamingClientHandler.isClientValid(client)) {
    LOGGER.info("Client was already recreated by another thread: {}", client.getName());
    return client;
  }
  LOGGER.info("Recreating client: {}", clientProperties.toString());
  return this.streamingClientHandler.createClient(clientProperties);
});

+   if (!(e instanceof SFException)) {
+     return false;
+   }
+   return ErrorCode.CLOSED_CLIENT.getMessageCode().equals(((SFException) e).getVendorCode());
+ }

/**
* Fallback function to be executed when either of insertRows API or getOffsetToken sends
* SFException.
SnowflakeSinkServiceV2.java
@@ -96,10 +96,6 @@ public class SnowflakeSinkServiceV2 implements SnowflakeSinkService {
/* SinkTaskContext has access to all methods/APIs available to talk to Kafka Connect runtime*/
private final SinkTaskContext sinkTaskContext;

- // ------ Streaming Ingest ------ //
- // needs url, username. p8 key, role name
- private final SnowflakeStreamingIngestClient streamingIngestClient;

// Config set in JSON
private final Map<String, String> connectorConfig;

@@ -150,9 +146,6 @@ public SnowflakeSinkServiceV2(
Optional.ofNullable(connectorConfig.get(SNOWPIPE_STREAMING_CLOSE_CHANNELS_IN_PARALLEL))
.map(Boolean::parseBoolean)
.orElse(SNOWPIPE_STREAMING_CLOSE_CHANNELS_IN_PARALLEL_DEFAULT);
- this.streamingIngestClient =
-     StreamingClientProvider.getStreamingClientProviderInstance()
-         .getClient(this.connectorConfig);

this.behaviorOnNullValues = behaviorOnNullValues;
this.partitionsToChannel = new HashMap<>();
@@ -276,8 +269,12 @@ private TopicPartitionChannel createTopicPartitionChannel(
rowSchemaManager,
streamingErrorHandler);
} else {
+ // Get current client from provider (may have been recreated by another channel)
+ SnowflakeStreamingIngestClient client =
+     StreamingClientProvider.getStreamingClientProviderInstance()
+         .getClient(this.connectorConfig);
return new DirectTopicPartitionChannel(
- this.streamingIngestClient,
+ client,
topicPartition,
partitionChannelKey, // Streaming channel name
tableName,
@@ -405,8 +402,10 @@ public void closeAll() {

partitionsToChannel.clear();

+ SnowflakeStreamingIngestClient client =
+     StreamingClientProvider.getStreamingClientProviderInstance().getClient(this.connectorConfig);
StreamingClientProvider.getStreamingClientProviderInstance()
- .closeClient(this.connectorConfig, this.streamingIngestClient);
+ .closeClient(this.connectorConfig, client);
}

private void closeAllSequentially() {
@@ -528,13 +527,13 @@ private void stopForSSv1() {
// stopping the client may cause unexpected behaviour
if (!isOptimizationEnabled) {
try {
+ SnowflakeStreamingIngestClient client =
+     StreamingClientProvider.getStreamingClientProviderInstance()
+         .getClient(this.connectorConfig);
StreamingClientProvider.getStreamingClientProviderInstance()
- .closeClient(connectorConfig, this.streamingIngestClient);
+ .closeClient(connectorConfig, client);
} catch (Exception e) {
- LOGGER.warn(
-     "Could not close streaming ingest client {}. Reason: {}",
-     streamingIngestClient.getName(),
-     e.getMessage());
+ LOGGER.warn("Could not close streaming ingest client. Reason: {}", e.getMessage());
}
}
}
@@ -577,7 +576,8 @@ public static String partitionChannelKey(String topic, int partition) {
/* Used for testing */
@VisibleForTesting
public SnowflakeStreamingIngestClient getStreamingIngestClient() {
- return this.streamingIngestClient;
+ return StreamingClientProvider.getStreamingClientProviderInstance()
+     .getClient(this.connectorConfig);
}

/**
StreamingClientProvider.java
@@ -179,6 +179,25 @@ public SnowflakeStreamingIngestClient getClient(Map<String, String> connectorCon
return resultClient;
}

+ /**
+  * Recreates the client for the given connector configuration.
+  *
+  * @param connectorConfig The connector configuration
+  * @return The newly created client
+  */
+ public SnowflakeStreamingIngestClient recreateClient(Map<String, String> connectorConfig) {
+   StreamingClientProperties clientProperties = new StreamingClientProperties(connectorConfig);
+
+   // Atomically update the cache entry and return the new client
+   return this.registeredClients.asMap().compute(clientProperties, (key, oldClient) -> {
+     if (oldClient != null) {
+       LOGGER.warn("Client is invalid, recreating streaming client: {}", oldClient.getName());
+       this.streamingClientHandler.closeClient(oldClient);
+     }
+     return this.streamingClientHandler.createClient(clientProperties);
+   });
+ }

/**
* Closes the given client and deregisters it from the cache if necessary. It will also call close
* on the registered client if exists, which should be the same as the given client so the call
SnowpipeStreamingV2PartitionChannel.java
@@ -41,6 +41,10 @@
public class SnowpipeStreamingV2PartitionChannel implements TopicPartitionChannel {
private static final KCLogger LOGGER =
new KCLogger(SnowpipeStreamingV2PartitionChannel.class.getName());

+ // TODO: Replace with structured error code once SSv2 SDK provides error code constants
+ private static final String CLIENT_CLOSED_ERROR_CODE = "CLIENT_CLOSED";

private final StreamingClientProperties streamingClientProperties;
private final StreamingIngestClientV2Provider streamingIngestClientV2Provider;

@@ -338,6 +342,10 @@ private void insertRowWithFallback(Map<String, Object> transformedRecord, long o
*/
private void insertRowFallbackSupplier(Throwable ex)
throws TopicPartitionChannelInsertionException {
+ if (isClientInvalidError(ex)) {
+   streamingIngestClientV2Provider.recreateClient(
+       connectorConfig, pipeName, streamingClientProperties);
+ }
final long offsetRecoveredFromSnowflake =
streamingApiFallbackSupplier(StreamingApiFallbackInvoker.APPEND_ROW_FALLBACK);
throw new TopicPartitionChannelInsertionException(
@@ -355,6 +363,19 @@ public long fetchOffsetTokenWithRetry() {
return offsetTokenExecutor.get(this::fetchLatestCommittedOffsetFromSnowflake);
}

+ /**
+  * Checks if the exception indicates a client invalidation error.
+  *
+  * @param e the exception to check
+  * @return true if the exception is a CLOSED_CLIENT error
+  */
+ private boolean isClientInvalidError(Throwable e) {
+   if (!(e instanceof SFException)) {
+     return false;
+   }
+   return CLIENT_CLOSED_ERROR_CODE.equals(((SFException) e).getErrorCodeName());
+ }

/**
* Fallback function to be executed when either of insertRows API or getOffsetToken sends
* SFException.
StreamingIngestClientV2Provider.java
@@ -3,6 +3,7 @@
import com.snowflake.ingest.streaming.SnowflakeStreamingIngestClient;
import com.snowflake.ingest.streaming.SnowflakeStreamingIngestClientFactory;
import com.snowflake.kafka.connector.Utils;
+ import com.snowflake.kafka.connector.internal.KCLogger;
import com.snowflake.kafka.connector.internal.SnowflakeURL;
import com.snowflake.kafka.connector.internal.streaming.StreamingClientProperties;
import java.util.HashMap;
@@ -19,6 +20,8 @@
* within single method.
*/
public class StreamingIngestClientV2Provider {
+ private static final KCLogger LOGGER =
+     new KCLogger(StreamingIngestClientV2Provider.class.getName());

private static final String STREAMING_CLIENT_V2_PREFIX_NAME = "KC_CLIENT_V2_";
private static final String DEFAULT_CLIENT_NAME = "DEFAULT_CLIENT";
@@ -38,6 +41,27 @@ public StreamingIngestClientV2Wrapper getClient(
}
}

+ /**
+  * Recreates the client for the given pipe name.
+  *
+  * @param connectorConfig the connector configuration
+  * @param pipeName the pipe name
+  * @param streamingClientProperties the streaming client properties
+  */
+ public void recreateClient(
+     Map<String, String> connectorConfig,
+     String pipeName,
+     StreamingClientProperties streamingClientProperties) {
+   synchronized (pipeToClientMap) {
+     SnowflakeStreamingIngestClient oldClient = pipeToClientMap.remove(pipeName);
+     if (oldClient != null) {
+       LOGGER.warn("Client is invalid, recreating client for pipe: {}", pipeName);
+       oldClient.close();
+     }
+     pipeToClientMap.put(
+         pipeName, createClient(connectorConfig, pipeName, streamingClientProperties));
+   }
+ }

public void close(String pipeName) {
synchronized (pipeToClientMap) {
Optional.ofNullable(pipeToClientMap.remove(pipeName))
TopicPartitionChannelIT.java
@@ -26,6 +26,8 @@
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
+ import org.junit.jupiter.params.ParameterizedTest;
+ import org.junit.jupiter.params.provider.ValueSource;

public class TopicPartitionChannelIT {

@@ -61,6 +63,65 @@ public static void afterAll() {
conn.close();
}

+ /**
+  * Tests that when a client is closed, the connector:
+  * 1. Detects the client closed error
+  * 2. Recreates the client
+  * 3. Reopens the channel with the new client
+  * 4. Successfully inserts data
+  *
+  * @param ssv2Enabled true to run against SSv2, false to run against SSv1
+  */
+ @ParameterizedTest
+ @ValueSource(booleans = {true, false})
+ public void testClientRecreationOn_ClosedClientError(boolean ssv2Enabled) throws Exception {
+   Map<String, String> config = getConfForStreaming();
+   SnowflakeSinkConnectorConfig.setDefaultValues(config);
+   config.put(
+       SnowflakeSinkConnectorConfig.SNOWPIPE_STREAMING_V2_ENABLED, String.valueOf(ssv2Enabled));
+
+   InMemorySinkTaskContext inMemorySinkTaskContext =
+       new InMemorySinkTaskContext(Collections.singleton(topicPartition));
+
+   SnowflakeSinkServiceV2 service =
+       StreamingSinkServiceBuilder.builder(conn, config)
+           .withSinkTaskContext(inMemorySinkTaskContext)
+           .build();
+   service.startPartition(testTableName, topicPartition);
+
+   // Insert initial records to verify setup
+   List<SinkRecord> initialRecords =
+       TestUtils.createJsonStringSinkRecords(0, 5, topic, PARTITION);
+   service.insert(initialRecords);
+
+   TestUtils.assertWithRetry(() -> service.getOffset(topicPartition) == 5);
+
+   // Get the current client name for comparison
+   SnowflakeStreamingIngestClient originalClient = service.getStreamingIngestClient();
+   String originalClientName = originalClient.getName();
+
+   // Close the client to simulate CLOSED_CLIENT error condition
+   originalClient.close();
+
+   // Insert more records - this should trigger CLOSED_CLIENT error and recreate the client
+   List<SinkRecord> newRecords = TestUtils.createJsonStringSinkRecords(5, 5, topic, PARTITION);
+   service.insert(newRecords);
+
+   // Verify data was inserted successfully despite client being closed
+   TestUtils.assertWithRetry(() -> service.getOffset(topicPartition) == 10);
+
+   // Verify a new client was created
+   SnowflakeStreamingIngestClient newClient = service.getStreamingIngestClient();
+   String newClientName = newClient.getName();
+   Assertions.assertNotEquals(
+       originalClientName, newClientName, "Client should have been recreated with a new name");
+
+   // Verify all data made it to Snowflake
+   Assertions.assertEquals(10, TestUtils.tableSize(testTableName));
+
+   service.closeAll();
+ }

@Test
public void testAutoChannelReopenOn_OffsetTokenSFException() throws Exception {
Map<String, String> config = getConfForStreaming();