Skip to content

Conversation

@sfc-gh-aminyaylov
Copy link

@sfc-gh-aminyaylov sfc-gh-aminyaylov commented Oct 15, 2025

Overview

SNOW-2193898 Support transparent failover with Client Redirect in streaming mode

Problem

When Streaming Ingest SDK returns a CLOSED_CLIENT error, Kafka Connector will attempt to reopen channels with the same closed client, leading to repeated failures.

Customers are currently required to manually restart their KC after changing deployment: https://docs.snowflake.com/en/user-guide/snowpipe-streaming/snowpipe-streaming-classic-kafka#failover-limitations

Solution

Implement automatic client recreation when a CLOSED_CLIENT error is detected. Streaming Ingest SDK will separately be updated to close the client when it detects a change to the primary deployment.

Changes

SSv1

  • DirectTopicPartitionChannel
    • Make streamingIngestClient field non-final to allow updates
    • Update insertRowFallbackSupplier() to call StreamingClientProvider.recreateClient() when client is closed

SSv2

  • SnowpipeStreamingV2PartitionChannel
    • Add CLIENT_CLOSED_ERROR_CODE constant with TODO (SSv2 SDK does not yet return structured error codes)
    • Update insertRowFallbackSupplier() to call StreamingIngestClientV2Provider.recreateClient()

Client optimization

Be mindful to support both optimized and unoptimized path. To summarize:

  • If optimization is enabled, all client with the same client properties will share a single client, managed by the StreamingClientProvider​. In this case, the SinkServicedoes not cache the client.
  • If optimization is disabled, every SinkService​ manages its own client and the shared cache managed by StreamingClientProvider is not used. In this case, SinkServicemust maintain its own reference to the client object and handle closing (which in the optimized path is done by the provider cache eviction policy).

Testing

  • Added IT testClientRecreationOn_ClosedClientError() that validates:
    1. Client is closed (simulating CLOSED_CLIENT error)
    2. New client is created on next operation
    3. Data continues to be inserted successfully
    4. Works for both SSv1 and SSv2

Urgency

This review is high priority. Customer requesting KC streaming ingest support for Client Redirect.

Risks

Client recreate is an expensive operation. However:

  1. The current response of reopening channel without fixing the underlying issue is wasteful.
  2. A well-behaved customer following documentation will already be performing this action automatically when changing primary deployments.

Backward/forward compatible

[x] This change does not introduce any breaking API or data format changes.

Pre-review checklist

  • This change should be part of a Behavior Change Release. See go/behavior-change.
  • This change has passed Merge gate tests
  • Snowpipe Changes
  • Snowpipe Streaming Changes
  • This change is TEST-ONLY
  • This change is README/Javadocs only
  • This change is protected by a config parameter <PARAMETER_NAME> eg snowflake.ingestion.method.
    • Yes - Added end to end and Unit Tests.
    • No - This change should be enabled on all KC instances.
  • Is this change protected by parameter <PARAMETER_NAME> on the server side?
    • The parameter/feature is not yet active in production (partial rollout or PrPr, see Changes for Unreleased Features and Fixes).
    • If there is an issue, it can be safely mitigated by turning the parameter off. This is also verified by a test (See go/ppp).

* @param e the exception to check
* @return true if the exception is a CLOSED_CLIENT error
*/
private boolean isClientInvalidError(Throwable e) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we also need to handle this in streamingIngestClient.openChannel()

Copy link
Author

@sfc-gh-aminyaylov sfc-gh-aminyaylov Oct 20, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good point. I added a client.isClosed() check to the openChannelForTable retry policy.

Furthermore, I'm thinking about possible race conditions when reopening a client that's shared by multiple channels, for example:

Channel A: insertRows() → recreateClient()
Channel B: openChannel() → recreateClient()

The following implicit locking should handle this safely. I also added a validity check to avoid double recreation:

// ConcurrentHashMap.compute() ensures that only one thread recreates a client for the specified key.
return this.registeredClients.asMap().compute(clientProperties, (key, client) -> {
  if (StreamingClientHandler.isClientValid(client)) {
    LOGGER.info("Client was already recreated by another thread: {}", client.getName());
    return client;
  }
  LOGGER.info("Recreating client: {}", clientProperties.toString());
  return this.streamingClientHandler.createClient(clientProperties);
});

Copy link
Author

This stack of pull requests is managed by Graphite. Learn more about stacking.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants