diff --git a/docs/migration.adoc b/docs/migration.adoc index a759009..8c9b5ab 100644 --- a/docs/migration.adoc +++ b/docs/migration.adoc @@ -2,4 +2,5 @@ Next section describes the changes between releases. -include::migration/migration100to110.adoc[leveloffset=+1] \ No newline at end of file +include::migration/migration100to110.adoc[leveloffset=+1] +include::migration/migration110to120.adoc[leveloffset=+1] diff --git a/docs/migration/migration110to120.adoc b/docs/migration/migration110to120.adoc new file mode 100644 index 0000000..ae8466b --- /dev/null +++ b/docs/migration/migration110to120.adoc @@ -0,0 +1,9 @@ += 1.1.0 -> 1.2.0 + +== reactive-messaging-redisstream-connector + +* RedisStreamsConnector#prudentRun handling fix, separate handling of the flag per config. + +=== Migration + +Changes are backwards compatible doesn't need any migration. diff --git a/reactive-messaging-redisstream-connector/src/main/java/hu/icellmobilsoft/reactive/messaging/redis/streams/RedisStreamsConnector.java b/reactive-messaging-redisstream-connector/src/main/java/hu/icellmobilsoft/reactive/messaging/redis/streams/RedisStreamsConnector.java index 3a17ebf..ef95a9c 100644 --- a/reactive-messaging-redisstream-connector/src/main/java/hu/icellmobilsoft/reactive/messaging/redis/streams/RedisStreamsConnector.java +++ b/reactive-messaging-redisstream-connector/src/main/java/hu/icellmobilsoft/reactive/messaging/redis/streams/RedisStreamsConnector.java @@ -34,6 +34,7 @@ import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.Flow; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; import jakarta.annotation.PostConstruct; import jakarta.annotation.Priority; @@ -110,7 +111,7 @@ public class RedisStreamsConnector implements InboundConnector, OutboundConnecto private final RedisStreamsProducer redisStreamsProducer; private String consumer; private volatile boolean consumerCancelled = false; - private volatile boolean prudentRun = true; + private final ConcurrentHashMap prudentRunMap = new ConcurrentHashMap<>(); private final List subscriptions = new CopyOnWriteArrayList<>(); private final List redisStreams = new CopyOnWriteArrayList<>(); private final Set underProcessing = ConcurrentHashMap.newKeySet(); @@ -209,6 +210,7 @@ public Flow.Publisher> getPublisher(Config config) { * @return the Multi for reading messages from the Redis stream */ protected Multi> xreadMulti(RedisStreams redisAPI, RedisStreamsConnectorIncomingConfiguration incomingConfig) { + final AtomicBoolean prudentRunFlag = getOrCreatePrudentRunFlag(incomingConfig); return Multi.createBy() // Multi creation .repeating() @@ -218,7 +220,7 @@ protected Multi> xreadMulti(RedisStreams redisAPI, RedisStreamsC // Log first or recovered subscription to multi .onSubscription() .invoke(() -> { - if (prudentRun) { + if (prudentRunFlag.get()) { log.infov( "Subscribing channel [{0}] to redis stream [{1}] consuming group [{2}] as consumer [{3}] using redis connection key [{4}]", incomingConfig.getChannel(), @@ -260,7 +262,7 @@ protected Multi> xreadMulti(RedisStreams redisAPI, RedisStreamsC "Uncaught exception while processing messages from channel [{0}], trying to recover..", incomingConfig.getChannel()); // ensure that the subscription is logged again to have information on recovery - prudentRun = true; + prudentRunFlag.set(true); } // try to recover only if the consumer is not cancelled return !consumerCancelled; @@ -407,9 +409,10 @@ protected Uni callXack(StreamEntry streamEntry, RedisStreams redisAPI, Red * @return a Uni containing a list of stream entries read from the Redis stream */ private Uni> xReadMessage(RedisStreams redisAPI, RedisStreamsConnectorIncomingConfiguration incomingConfig) { + final AtomicBoolean prudentRunFlag = getOrCreatePrudentRunFlag(incomingConfig); String streamKey = incomingConfig.getStreamKey(); String group = incomingConfig.getGroup(); - return Uni.createFrom().item(prudentRun).flatMap(prudent -> { + return Uni.createFrom().item(prudentRunFlag.get()).flatMap(prudent -> { if (!prudent) { return Uni.createFrom().item(true); } @@ -422,7 +425,7 @@ private Uni> xReadMessage(RedisStreams redisAPI, RedisStreamsC .withBackOff(Duration.of(1, ChronoUnit.SECONDS), Duration.of(30, ChronoUnit.SECONDS)) .atMost(3) // we created the group so prudent run is not needed anymore - .invoke(() -> prudentRun = false) + .invoke(() -> prudentRunFlag.set(false)) .replaceWith(xReadGroup(redisAPI, incomingConfig, streamKey, group)); } @@ -581,4 +584,9 @@ private boolean isGroupAlreadyExists(Throwable throwable) { return Optional.ofNullable(throwable.getMessage()).filter(m -> m.startsWith("BUSYGROUP")).isPresent(); } + private AtomicBoolean getOrCreatePrudentRunFlag(RedisStreamsConnectorIncomingConfiguration incomingConfig) { + String key = incomingConfig.getStreamKey() + "::" + incomingConfig.getGroup(); + return prudentRunMap.computeIfAbsent(key, k -> new AtomicBoolean(true)); + } + }