diff --git a/debezium-bom/pom.xml b/debezium-bom/pom.xml
index 033b7d7836f..9a1083ad7f6 100644
--- a/debezium-bom/pom.xml
+++ b/debezium-bom/pom.xml
@@ -42,7 +42,7 @@
- 4.1.1
+ 6.0.0
2.17.241
diff --git a/debezium-storage/debezium-storage-redis/src/main/java/io/debezium/storage/redis/RedisCommonConfig.java b/debezium-storage/debezium-storage-redis/src/main/java/io/debezium/storage/redis/RedisCommonConfig.java
index 5a86eb44abb..716f8fa95f9 100644
--- a/debezium-storage/debezium-storage-redis/src/main/java/io/debezium/storage/redis/RedisCommonConfig.java
+++ b/debezium-storage/debezium-storage-redis/src/main/java/io/debezium/storage/redis/RedisCommonConfig.java
@@ -44,6 +44,36 @@ public class RedisCommonConfig {
.withDescription("Use SSL for Redis connection")
.withDefault(DEFAULT_SSL_ENABLED);
+ private static final String DEFAULT_TRUSTSTORE_PATH = "";
+ private static final Field PROP_SSL_TRUSTSTORE_PATH = Field.create(CONFIGURATION_FIELD_PREFIX_STRING + "ssl.truststore.path")
+ .withDescription("Path to the truststore file")
+ .withDefault(DEFAULT_TRUSTSTORE_PATH);
+
+ private static final String DEFAULT_TRUSTSTORE_PASSWORD = "";
+ private static final Field PROP_SSL_TRUSTSTORE_PASSWORD = Field.create(CONFIGURATION_FIELD_PREFIX_STRING + "ssl.truststore.password")
+ .withDescription("Password for the truststore")
+ .withDefault(DEFAULT_TRUSTSTORE_PASSWORD);
+
+ private static final String DEFAULT_TRUSTSTORE_TYPE = "JKS";
+ private static final Field PROP_SSL_TRUSTSTORE_TYPE = Field.create(CONFIGURATION_FIELD_PREFIX_STRING + "ssl.truststore.type")
+ .withDescription("Type of the truststore (e.g., JKS, PKCS12)")
+ .withDefault(DEFAULT_TRUSTSTORE_TYPE);
+
+ private static final String DEFAULT_KEYSTORE_PATH = "";
+ private static final Field PROP_SSL_KEYSTORE_PATH = Field.create(CONFIGURATION_FIELD_PREFIX_STRING + "ssl.keystore.path")
+ .withDescription("Path to the keystore file")
+ .withDefault(DEFAULT_KEYSTORE_PATH);
+
+ private static final String DEFAULT_KEYSTORE_PASSWORD = "";
+ private static final Field PROP_SSL_KEYSTORE_PASSWORD = Field.create(CONFIGURATION_FIELD_PREFIX_STRING + "ssl.keystore.password")
+ .withDescription("Password for the keystore")
+ .withDefault(DEFAULT_KEYSTORE_PASSWORD);
+
+ private static final String DEFAULT_KEYSTORE_TYPE = "JKS";
+ private static final Field PROP_SSL_KEYSTORE_TYPE = Field.create(CONFIGURATION_FIELD_PREFIX_STRING + "ssl.keystore.type")
+ .withDescription("Type of the keystore (e.g., JKS, PKCS12)")
+ .withDefault(DEFAULT_KEYSTORE_TYPE);
+
private static final boolean DEFAULT_HOSTNAME_VERIFICATION = false;
private static final Field PROP_SSL_HOSTNAME_VERIFICATION_ENABLED = Field.create(CONFIGURATION_FIELD_PREFIX_STRING + "ssl.hostname.verification.enabled")
.withDescription("Enable hostname verification")
@@ -99,8 +129,15 @@ public class RedisCommonConfig {
private int dbIndex;
private String user;
private String password;
+
private boolean sslEnabled;
private boolean hostnameVerificationEnabled;
+ private String truststorePath;
+ private String truststorePassword;
+ private String truststoreType;
+ private String keystorePath;
+ private String keystorePassword;
+ private String keystoreType;
private Integer initialRetryDelay;
private Integer maxRetryDelay;
@@ -126,9 +163,14 @@ public RedisCommonConfig(Configuration config, String prefix) {
}
protected List getAllConfigurationFields() {
- return Collect.arrayListOf(PROP_ADDRESS, PROP_DB_INDEX, PROP_USER, PROP_PASSWORD, PROP_SSL_ENABLED, PROP_SSL_HOSTNAME_VERIFICATION_ENABLED,
- PROP_CONNECTION_TIMEOUT, PROP_SOCKET_TIMEOUT, PROP_RETRY_INITIAL_DELAY, PROP_RETRY_MAX_DELAY, PROP_WAIT_ENABLED, PROP_WAIT_TIMEOUT,
- PROP_WAIT_RETRY_ENABLED, PROP_WAIT_RETRY_DELAY);
+ return Collect.arrayListOf(
+ PROP_ADDRESS, PROP_DB_INDEX, PROP_USER, PROP_PASSWORD,
+ PROP_SSL_ENABLED, PROP_SSL_HOSTNAME_VERIFICATION_ENABLED,
+ PROP_SSL_TRUSTSTORE_PATH, PROP_SSL_TRUSTSTORE_PASSWORD, PROP_SSL_TRUSTSTORE_TYPE,
+ PROP_SSL_KEYSTORE_PATH, PROP_SSL_KEYSTORE_PASSWORD, PROP_SSL_KEYSTORE_TYPE,
+ PROP_CONNECTION_TIMEOUT, PROP_SOCKET_TIMEOUT,
+ PROP_RETRY_INITIAL_DELAY, PROP_RETRY_MAX_DELAY,
+ PROP_WAIT_ENABLED, PROP_WAIT_TIMEOUT, PROP_WAIT_RETRY_ENABLED, PROP_WAIT_RETRY_DELAY);
}
protected void init(Configuration config) {
@@ -136,8 +178,15 @@ protected void init(Configuration config) {
dbIndex = config.getInteger(PROP_DB_INDEX);
user = config.getString(PROP_USER);
password = config.getString(PROP_PASSWORD);
+
sslEnabled = config.getBoolean(PROP_SSL_ENABLED);
hostnameVerificationEnabled = config.getBoolean(PROP_SSL_HOSTNAME_VERIFICATION_ENABLED);
+ truststorePath = config.getString(PROP_SSL_TRUSTSTORE_PATH);
+ truststorePassword = config.getString(PROP_SSL_TRUSTSTORE_PASSWORD);
+ truststoreType = config.getString(PROP_SSL_TRUSTSTORE_TYPE);
+ keystorePath = config.getString(PROP_SSL_KEYSTORE_PATH);
+ keystorePassword = config.getString(PROP_SSL_KEYSTORE_PASSWORD);
+ keystoreType = config.getString(PROP_SSL_KEYSTORE_TYPE);
initialRetryDelay = config.getInteger(PROP_RETRY_INITIAL_DELAY);
maxRetryDelay = config.getInteger(PROP_RETRY_MAX_DELAY);
@@ -216,4 +265,27 @@ public long getWaitRetryDelay() {
return waitRetryDelay;
}
+ public String getTruststorePath() {
+ return truststorePath;
+ }
+
+ public String getTruststorePassword() {
+ return truststorePassword;
+ }
+
+ public String getTruststoreType() {
+ return truststoreType;
+ }
+
+ public String getKeystorePath() {
+ return keystorePath;
+ }
+
+ public String getKeystorePassword() {
+ return keystorePassword;
+ }
+
+ public String getKeystoreType() {
+ return keystoreType;
+ }
}
diff --git a/debezium-storage/debezium-storage-redis/src/main/java/io/debezium/storage/redis/RedisConnection.java b/debezium-storage/debezium-storage-redis/src/main/java/io/debezium/storage/redis/RedisConnection.java
index 52512b8acf5..879a564b6fd 100644
--- a/debezium-storage/debezium-storage-redis/src/main/java/io/debezium/storage/redis/RedisConnection.java
+++ b/debezium-storage/debezium-storage-redis/src/main/java/io/debezium/storage/redis/RedisConnection.java
@@ -5,6 +5,7 @@
*/
package io.debezium.storage.redis;
+import java.io.File;
import java.util.regex.Pattern;
import javax.net.ssl.SSLParameters;
@@ -19,6 +20,8 @@
import redis.clients.jedis.DefaultJedisClientConfig.Builder;
import redis.clients.jedis.HostAndPort;
import redis.clients.jedis.Jedis;
+import redis.clients.jedis.SslOptions;
+import redis.clients.jedis.SslVerifyMode;
import redis.clients.jedis.exceptions.JedisConnectionException;
import redis.clients.jedis.exceptions.JedisDataException;
@@ -34,36 +37,38 @@ public class RedisConnection {
public static final String DEBEZIUM_SCHEMA_HISTORY = "debezium:schema_history";
private static final String HOST_PORT_ERROR = "Invalid host:port format in '<...>.redis.address' property.";
- private String address;
- private int dbIndex;
- private String user;
- private String password;
- private int connectionTimeout;
- private int socketTimeout;
- private boolean sslEnabled;
- private boolean hostnameVerificationEnabled;
-
- /**
- *
- * @param address
- * @param user
- * @param password
- * @param connectionTimeout
- * @param socketTimeout
- * @param sslEnabled
- */
- public RedisConnection(String address, int dbIndex, String user, String password, int connectionTimeout, int socketTimeout, boolean sslEnabled,
- boolean hostnameVerificationEnabled) {
- validateHostPort(address);
-
- this.address = address;
- this.dbIndex = dbIndex;
- this.user = user;
- this.password = password;
- this.connectionTimeout = connectionTimeout;
- this.socketTimeout = socketTimeout;
- this.sslEnabled = sslEnabled;
- this.hostnameVerificationEnabled = hostnameVerificationEnabled;
+ private final String address;
+ private final int dbIndex;
+ private final String user;
+ private final String password;
+ private final int connectionTimeout;
+ private final int socketTimeout;
+ private final boolean sslEnabled;
+ private final boolean hostnameVerificationEnabled;
+ private final String truststorePath;
+ private final String truststorePassword;
+ private final String truststoreType;
+ private final String keystorePath;
+ private final String keystorePassword;
+ private final String keystoreType;
+
+ public RedisConnection(RedisCommonConfig config) {
+ validateHostPort(config.getAddress());
+
+ this.address = config.getAddress();
+ this.dbIndex = config.getDbIndex();
+ this.user = config.getUser();
+ this.password = config.getPassword();
+ this.connectionTimeout = config.getConnectionTimeout();
+ this.socketTimeout = config.getSocketTimeout();
+ this.sslEnabled = config.isSslEnabled();
+ this.hostnameVerificationEnabled = config.isHostnameVerificationEnabled();
+ this.truststorePath = config.getTruststorePath();
+ this.truststorePassword = config.getTruststorePassword();
+ this.truststoreType = config.getTruststoreType();
+ this.keystorePath = config.getKeystorePath();
+ this.keystorePassword = config.getKeystorePassword();
+ this.keystoreType = config.getKeystoreType();
}
/**
@@ -91,6 +96,32 @@ public RedisClient getRedisClient(String clientName, boolean waitEnabled, long w
.socketTimeoutMillis(this.socketTimeout)
.ssl(this.sslEnabled);
+ boolean configureSslOptions = this.sslEnabled && (!Strings.isNullOrEmpty(this.truststorePath) ||
+ !Strings.isNullOrEmpty(this.keystorePath));
+
+ // The SslOptions in Jedis override the default SSL context if explicitly configured.
+ // - When a custom truststore or keystore is provided for the Jedis client, hostname verification
+ // must also be configured explicitly through the SslOptions.
+ // - If no custom truststore or keystore is provided, hostname verification will rely on the
+ // SSLParameters, which use the truststore or keystore specified via system properties.
+ if (configureSslOptions) {
+ var tsPasswordRaw = !Strings.isNullOrEmpty(truststorePassword) ? truststorePassword.toCharArray() : null;
+ var ksPasswordRaw = !Strings.isNullOrEmpty(keystorePassword) ? keystorePassword.toCharArray() : null;
+ var sslOptions = SslOptions.builder()
+ .truststore(new File(truststorePath), tsPasswordRaw)
+ .trustStoreType(truststoreType)
+ .keystore(new File(keystorePath), ksPasswordRaw)
+ .keyStoreType(keystoreType)
+ .sslVerifyMode(hostnameVerificationEnabled ? SslVerifyMode.FULL : SslVerifyMode.CA)
+ .build();
+ configBuilder.sslOptions(sslOptions);
+ } else if (hostnameVerificationEnabled) {
+ // Enforce strict hostname verification to prevent man-in-the-middle attacks.
+ var sslParameters = new SSLParameters();
+ sslParameters.setEndpointIdentificationAlgorithm("HTTPS");
+ configBuilder.sslParameters(sslParameters);
+ }
+
if (!Strings.isNullOrEmpty(this.user)) {
configBuilder = configBuilder.user(this.user);
}
@@ -99,13 +130,6 @@ public RedisClient getRedisClient(String clientName, boolean waitEnabled, long w
configBuilder = configBuilder.password(this.password);
}
- if (hostnameVerificationEnabled) {
- // Enforce strict hostname verification to prevent man-in-the-middle attacks.
- var sslParameters = new SSLParameters();
- sslParameters.setEndpointIdentificationAlgorithm("HTTPS");
- configBuilder.sslParameters(sslParameters);
- }
-
client = new Jedis(address, configBuilder.build());
// make sure that client is connected
diff --git a/debezium-storage/debezium-storage-redis/src/main/java/io/debezium/storage/redis/history/RedisSchemaHistory.java b/debezium-storage/debezium-storage-redis/src/main/java/io/debezium/storage/redis/history/RedisSchemaHistory.java
index 44ffb911492..ae2c55e94a8 100644
--- a/debezium-storage/debezium-storage-redis/src/main/java/io/debezium/storage/redis/history/RedisSchemaHistory.java
+++ b/debezium-storage/debezium-storage-redis/src/main/java/io/debezium/storage/redis/history/RedisSchemaHistory.java
@@ -55,8 +55,7 @@ public class RedisSchemaHistory extends AbstractSchemaHistory {
private RedisSchemaHistoryConfig config;
void connect() {
- RedisConnection redisConnection = new RedisConnection(config.getAddress(), config.getDbIndex(), config.getUser(), config.getPassword(),
- config.getConnectionTimeout(), config.getSocketTimeout(), config.isSslEnabled(), config.isHostnameVerificationEnabled());
+ RedisConnection redisConnection = new RedisConnection(config);
client = redisConnection.getRedisClient(RedisConnection.DEBEZIUM_SCHEMA_HISTORY, config.isWaitEnabled(), config.getWaitTimeout(),
config.isWaitRetryEnabled(), config.getWaitRetryDelay());
}
diff --git a/debezium-storage/debezium-storage-redis/src/main/java/io/debezium/storage/redis/offset/RedisOffsetBackingStore.java b/debezium-storage/debezium-storage-redis/src/main/java/io/debezium/storage/redis/offset/RedisOffsetBackingStore.java
index 384dffe5b6c..4a254979003 100644
--- a/debezium-storage/debezium-storage-redis/src/main/java/io/debezium/storage/redis/offset/RedisOffsetBackingStore.java
+++ b/debezium-storage/debezium-storage-redis/src/main/java/io/debezium/storage/redis/offset/RedisOffsetBackingStore.java
@@ -47,8 +47,7 @@ public void setRedisClient(RedisClient client) {
void connect() {
closeClient();
- RedisConnection redisConnection = new RedisConnection(config.getAddress(), config.getDbIndex(), config.getUser(), config.getPassword(),
- config.getConnectionTimeout(), config.getSocketTimeout(), config.isSslEnabled(), config.isHostnameVerificationEnabled());
+ RedisConnection redisConnection = new RedisConnection(config);
client = redisConnection.getRedisClient(RedisConnection.DEBEZIUM_OFFSETS_CLIENT_NAME, config.isWaitEnabled(), config.getWaitTimeout(),
config.isWaitRetryEnabled(), config.getWaitRetryDelay());
}