SNOW-2193898 Recreate streaming ingest client on CLIENT_CLOSED error #1185
base: master
 * @param e the exception to check
 * @return true if the exception is a CLOSED_CLIENT error
 */
private boolean isClientInvalidError(Throwable e) {
I think we also need to handle this in streamingIngestClient.openChannel()
Good point. I added a client.isClosed() check to the openChannelForTable retry policy.
Furthermore, I'm thinking about possible race conditions when reopening a client that's shared by multiple channels, for example:
Channel A: insertRows() → recreateClient()
Channel B: openChannel() → recreateClient()
The following implicit locking should handle this safely. I also added a validity check to avoid double recreation:
// ConcurrentHashMap.compute() ensures that only one thread recreates a client for the specified key.
return this.registeredClients.asMap().compute(clientProperties, (key, client) -> {
    if (StreamingClientHandler.isClientValid(client)) {
        LOGGER.info("Client was already recreated by another thread: {}", client.getName());
        return client;
    }
    LOGGER.info("Recreating client: {}", clientProperties.toString());
    return this.streamingClientHandler.createClient(clientProperties);
});
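To make the locking argument above easier to check in isolation, here is a minimal, self-contained sketch of the same pattern. It is not the connector's code: `ClientRecreationSketch`, `FakeClient`, `getOrRecreate`, and the `created` counter are hypothetical names, and a plain `ConcurrentHashMap` stands in for the cache's `asMap()` view. It only relies on `compute()` running the remapping function atomically per key, so a second caller sees the already recreated client and returns it instead of creating another one.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicInteger;

class ClientRecreationSketch {
    /** Hypothetical stand-in for a streaming ingest client. */
    static final class FakeClient {
        final String name;
        volatile boolean closed;

        FakeClient(String name) {
            this.name = name;
        }
    }

    // Assumption: a plain ConcurrentHashMap replaces the real client cache.
    private final ConcurrentMap<String, FakeClient> clients = new ConcurrentHashMap<>();

    // Counts how many clients were actually created (for illustration only).
    final AtomicInteger created = new AtomicInteger();

    static boolean isClientValid(FakeClient client) {
        return client != null && !client.closed;
    }

    /** Returns the cached client, recreating it only if it is missing or closed. */
    FakeClient getOrRecreate(String key) {
        // compute() is atomic per key: concurrent callers run the function one at a time.
        return clients.compute(key, (k, client) -> {
            if (isClientValid(client)) {
                // Another caller already recreated the client; reuse it.
                return client;
            }
            return new FakeClient(k + "-" + created.incrementAndGet());
        });
    }
}
```

In this sketch, two channels that both observe a closed client and call `getOrRecreate` with the same key end up sharing one new client, because the second call finds a valid entry.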

Overview
SNOW-2193898 Support transparent failover with Client Redirect in streaming mode
Problem
When Streaming Ingest SDK returns a CLOSED_CLIENT error, Kafka Connector will attempt to reopen channels with the same closed client, leading to repeated failures.
Customers are currently required to manually restart their KC after changing deployment: https://docs.snowflake.com/en/user-guide/snowpipe-streaming/snowpipe-streaming-classic-kafka#failover-limitations
Solution
Implement automatic client recreation when a CLOSED_CLIENT error is detected. Streaming Ingest SDK will separately be updated to close the client when it detects a change to the primary deployment.
Changes
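In outline, the recreate-on-CLOSED_CLIENT flow can be pictured with the rough sketch below. Everything in it is an assumption made for illustration: `ClosedClientFallbackSketch`, `ClosedClientException`, the `Client` interface, and `insertWithFallback` are hypothetical and are not the SDK's or the connector's API. The sketch also simplifies the fallback to a single retry of the insert, whereas the connector's fallback reopens the channel.

```java
import java.util.function.Supplier;

class ClosedClientFallbackSketch {
    /** Hypothetical exception standing in for the SDK's CLOSED_CLIENT error. */
    static final class ClosedClientException extends RuntimeException {
        ClosedClientException() {
            super("CLOSED_CLIENT");
        }
    }

    /** Hypothetical minimal client interface. */
    interface Client {
        void insertRow(String row);
    }

    // Non-final so the reference can be swapped after recreation.
    private Client client;
    private final Supplier<Client> recreateClient;

    // Counts recreations (for illustration only).
    int recreations = 0;

    ClosedClientFallbackSketch(Client initial, Supplier<Client> recreateClient) {
        this.client = initial;
        this.recreateClient = recreateClient;
    }

    /** Inserts a row; on a closed-client error, swaps in a new client and retries once. */
    void insertWithFallback(String row) {
        try {
            client.insertRow(row);
        } catch (ClosedClientException e) {
            // Retrying with the same closed client would fail again, so replace it first.
            client = recreateClient.get();
            recreations++;
            client.insertRow(row);
        }
    }
}
```

The non-final `client` field mirrors the reason the PR makes `streamingIngestClient` non-final: the channel has to be able to point at the recreated client.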
SSv1
DirectTopicPartitionChannel
- streamingIngestClient field non-final to allow updates
- insertRowFallbackSupplier() to call StreamingClientProvider.recreateClient() when client is closed
SSv2
SnowpipeStreamingV2PartitionChannel
- CLIENT_CLOSED_ERROR_CODE constant with TODO (SSv2 SDK does not yet return structured error codes)
- insertRowFallbackSupplier() to call StreamingIngestClientV2Provider.recreateClient()
Client optimization
Be mindful to support both the optimized and unoptimized paths. To summarize:
- Optimized path: the client is held in the shared cache managed by StreamingClientProvider. In this case, the SinkService does not cache the client.
- Unoptimized path: the SinkService manages its own client and the shared cache managed by StreamingClientProvider is not used. In this case, the SinkService must maintain its own reference to the client object and handle closing (which in the optimized path is done by the provider cache eviction policy).
Testing
testClientRecreationOn_ClosedClientError() that validates:
Urgency
This review is high priority. Customer requesting KC streaming ingest support for Client Redirect.
Risks
Client recreation is an expensive operation. However:
Backward/forward compatible
[x] This change does not introduce any breaking API or data format changes.
Pre-review checklist
snowflake.ingestion.method.
Yes - Added end to end and Unit Tests.
No - This change should be enabled on all KC instances.