diff --git a/docs/connector/configuration-reference.adoc b/docs/connector/configuration-reference.adoc
index 9e8a8d9..c605830 100644
--- a/docs/connector/configuration-reference.adoc
+++ b/docs/connector/configuration-reference.adoc
@@ -47,11 +47,17 @@ For example if you use `quarkus.redis.*my-redis*.hosts` configuration then you c
| INCOMING
| `xread-noack`
-| Include the `NOACK` parameter in the `XREADGROUP` call
+| Include the `NOACK` parameter in the `XREADGROUP` call.
| `true`
| No
| INCOMING
+| `broadcast`
+| Allow the received entries to be consumed by multiple channels.
+| `false`
+| No
+| INCOMING
+
| `xadd-maxlen`
| The maximum number of entries to keep in the stream (trims old entries).
| -
diff --git a/docs/migration/migration110to120.adoc b/docs/migration/migration110to120.adoc
index ae8466b..0fbf5ad 100644
--- a/docs/migration/migration110to120.adoc
+++ b/docs/migration/migration110to120.adoc
@@ -3,7 +3,17 @@
== reactive-messaging-redisstream-connector
* RedisStreamsConnector#prudentRun handling fix, separate handling of the flag per config.
+* Consume messages on vert.x context to retain context data in all cases.
+* Added `broadcast` incoming configuration to allow multiple consumers on the same channel (ie. multiple methods with `@Incoming` for the same channel).
=== Migration
Changes are backwards compatible doesn't need any migration.
+
+== quarkus-reactive-messaging-redisstream-extension-sample
+
+* Added both imperative and reactive example for `@Incoming` usage.
+
+=== Migration
+
+Changes are backwards compatible doesn't need any migration.
\ No newline at end of file
diff --git a/quarkus-reactive-messaging-redisstream-extension-sample/pom.xml b/quarkus-reactive-messaging-redisstream-extension-sample/pom.xml
index f36a10a..4bb26ce 100644
--- a/quarkus-reactive-messaging-redisstream-extension-sample/pom.xml
+++ b/quarkus-reactive-messaging-redisstream-extension-sample/pom.xml
@@ -51,13 +51,17 @@
io.quarkus
quarkus-rest-jackson
+
+
+
+
io.quarkus
- quarkus-messaging-kafka
+ quarkus-arc
io.quarkus
- quarkus-arc
+ quarkus-config-yaml
io.quarkus
@@ -73,10 +77,6 @@
rest-assured
test
-
- io.quarkus
- quarkus-messaging-amqp
-
io.quarkus
quarkus-redis-client
@@ -89,6 +89,10 @@
io.quarkus
quarkus-smallrye-fault-tolerance
+
+ hu.icellmobilsoft.reactive.messaging.redisstream
+ quarkus-reactive-messaging-redisstream-additional-fields
+
diff --git a/quarkus-reactive-messaging-redisstream-extension-sample/src/main/java/hu/icellmobilsoft/quarkus/sample/MyMessagingApplication.java b/quarkus-reactive-messaging-redisstream-extension-sample/src/main/java/hu/icellmobilsoft/quarkus/sample/MyMessagingApplication.java
index 85aba5e..76c21d9 100644
--- a/quarkus-reactive-messaging-redisstream-extension-sample/src/main/java/hu/icellmobilsoft/quarkus/sample/MyMessagingApplication.java
+++ b/quarkus-reactive-messaging-redisstream-extension-sample/src/main/java/hu/icellmobilsoft/quarkus/sample/MyMessagingApplication.java
@@ -19,14 +19,16 @@
*/
package hu.icellmobilsoft.quarkus.sample;
-import hu.icellmobilsoft.reactive.messaging.redis.streams.metadata.IncomingRedisStreamMetadata;
-import io.quarkus.logging.Log;
-import io.quarkus.runtime.StartupEvent;
-import io.smallrye.mutiny.Uni;
-import io.smallrye.reactive.messaging.annotations.Blocking;
+import java.util.Optional;
+import java.util.UUID;
+import java.util.concurrent.CompletionStage;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Stream;
+
import jakarta.enterprise.context.ApplicationScoped;
import jakarta.enterprise.event.Observes;
import jakarta.inject.Inject;
+
import org.eclipse.microprofile.faulttolerance.Retry;
import org.eclipse.microprofile.reactive.messaging.Channel;
import org.eclipse.microprofile.reactive.messaging.Emitter;
@@ -35,10 +37,11 @@
import org.eclipse.microprofile.reactive.messaging.Outgoing;
import org.jboss.logging.MDC;
-import java.util.Optional;
-import java.util.UUID;
-import java.util.concurrent.TimeUnit;
-import java.util.stream.Stream;
+import hu.icellmobilsoft.reactive.messaging.redis.streams.metadata.IncomingRedisStreamMetadata;
+import io.quarkus.logging.Log;
+import io.quarkus.runtime.StartupEvent;
+import io.smallrye.mutiny.Uni;
+import io.smallrye.reactive.messaging.annotations.Blocking;
/**
* Class that handles messages
@@ -99,6 +102,32 @@ public void sendMessage(String message) {
@Blocking(ordered = false, value = "incoming-pool")
@Retry(maxRetries = 2)
public String toUpperCase(String payload) {
+ return doAction(payload);
+ }
+
+ /**
+ * Consume the message from the "words-in" channel using reactive signature.
+ *
+ * @param message
+ * reactive message containing the payload
+ *
+ * @return a void completion stage
+ */
+ @Incoming("words-in")
+ @Blocking(ordered = false, value = "incoming-pool")
+ @Retry(maxRetries = 2)
+ public CompletionStage processReactive(Message message) {
+ try {
+ String result = doAction(message.getPayload());
+ Log.infov("Processed message: {0}", result);
+ return message.ack();
+ } catch (Exception e) {
+ Log.error("Error during logging metadata", e);
+ return message.nack(e);
+ }
+ }
+
+ private String doAction(String payload) {
Log.infov("Message received: [{0}]", payload);
try {
TimeUnit.MILLISECONDS.sleep(2000);
diff --git a/quarkus-reactive-messaging-redisstream-extension-sample/src/main/resources/application-amqp.properties b/quarkus-reactive-messaging-redisstream-extension-sample/src/main/resources/application-amqp.properties
deleted file mode 100644
index 7e24883..0000000
--- a/quarkus-reactive-messaging-redisstream-extension-sample/src/main/resources/application-amqp.properties
+++ /dev/null
@@ -1,5 +0,0 @@
-mp.messaging.incoming.words-in.address=words
-mp.messaging.incoming.words-in.connector=smallrye-amqp
-mp.messaging.outgoing.words-out.address=words
-mp.messaging.outgoing.words-out.connector=smallrye-amqp
-quarkus.kafka.devservices.provider=strimzi
diff --git a/quarkus-reactive-messaging-redisstream-extension-sample/src/main/resources/application.properties b/quarkus-reactive-messaging-redisstream-extension-sample/src/main/resources/application.properties
deleted file mode 100644
index 9c8f2af..0000000
--- a/quarkus-reactive-messaging-redisstream-extension-sample/src/main/resources/application.properties
+++ /dev/null
@@ -1,15 +0,0 @@
-#quarkus.profile=kafka
-mp.messaging.incoming.words-in.connector=reactive-messaging-redis-streams
-mp.messaging.incoming.words-in.stream-key=sample-stream
-mp.messaging.incoming.words-in.group=sample-service
-mp.messaging.incoming.words-in.xread-block-ms=60000
-smallrye.messaging.worker.incoming-pool.max-concurrency=15
-mp.messaging.outgoing.words-out.connector=reactive-messaging-redis-streams
-mp.messaging.outgoing.words-out.stream-key=sample-stream
-mp.messaging.outgoing.words-out.xadd-maxlen=1000
-mp.messaging.outgoing.words-out.xadd-ttl-ms=1000
-#mp.messaging.incoming.words-in.connection-key=redis2
-mp.messaging.connector.reactive-messaging-redis-streams.graceful-timeout-ms=10000
-#quarkus.redis.redis2.hosts=redis://localhost:6379
-quarkus.log.console.format=%d{yyyy-MM-dd HH:mm:ss,SSS} %-5p [%c{3.}] (%t) [sid:%X{extSessionId}] %s%e%n
-quarkus.log.category."hu.icellmobilsoft".min-level=TRACE
diff --git a/quarkus-reactive-messaging-redisstream-extension-sample/src/main/resources/application.yml b/quarkus-reactive-messaging-redisstream-extension-sample/src/main/resources/application.yml
new file mode 100644
index 0000000..ce599fc
--- /dev/null
+++ b/quarkus-reactive-messaging-redisstream-extension-sample/src/main/resources/application.yml
@@ -0,0 +1,40 @@
+mp:
+ messaging:
+ connector:
+ reactive-messaging-redis-streams:
+ graceful-timeout-ms: 10000
+ incoming:
+ words-in:
+ connector: reactive-messaging-redis-streams
+ group: sample-service
+ stream-key: sample-stream
+ xread-block-ms: 60000
+ #connection-key: redis2
+ broadcast: true
+ outgoing:
+ words-out:
+ connector: reactive-messaging-redis-streams
+ stream-key: sample-stream
+ xadd-maxlen: 1000
+ xadd-ttl-ms: 100000
+quarkus:
+ redis:
+ max-pool-size: 60
+ max-pool-waiting: 100
+ # redis:
+ # redis2:
+ # hosts: redis://localhost:6379
+ log:
+ category:
+ "hu.icellmobilsoft":
+ # level: TRACE
+ min-level: TRACE
+ "hu.icellmobilsoft.quarkus.extension.redisstream.runtime.QuarkusRedisStreamsAdapter":
+ level: TRACE
+ console:
+ format: '%d{yyyy-MM-dd HH:mm:ss,SSS} %-5p [%c{3.}] (%t) [sid:%X{extSessionId}] %s%e%n'
+smallrye:
+ messaging:
+ worker:
+ incoming-pool:
+ max-concurrency: 100
diff --git a/reactive-messaging-redisstream-connector/src/main/java/hu/icellmobilsoft/reactive/messaging/redis/streams/RedisStreamsConnector.java b/reactive-messaging-redisstream-connector/src/main/java/hu/icellmobilsoft/reactive/messaging/redis/streams/RedisStreamsConnector.java
index ef95a9c..428418f 100644
--- a/reactive-messaging-redisstream-connector/src/main/java/hu/icellmobilsoft/reactive/messaging/redis/streams/RedisStreamsConnector.java
+++ b/reactive-messaging-redisstream-connector/src/main/java/hu/icellmobilsoft/reactive/messaging/redis/streams/RedisStreamsConnector.java
@@ -62,8 +62,10 @@
import io.smallrye.reactive.messaging.annotations.ConnectorAttribute;
import io.smallrye.reactive.messaging.connector.InboundConnector;
import io.smallrye.reactive.messaging.connector.OutboundConnector;
+import io.smallrye.reactive.messaging.providers.connectors.ExecutionHolder;
import io.smallrye.reactive.messaging.providers.helpers.MultiUtils;
import io.smallrye.reactive.messaging.providers.locals.ContextAwareMessage;
+import io.vertx.mutiny.core.Vertx;
/**
* Microprofile Reactive Streams connector for Redis Streams integration.
@@ -88,6 +90,8 @@
@ConnectorAttribute(name = "xread-noack", description = "Include the NOACK parameter in the XREADGROUP call", type = "boolean",
defaultValue = "true",
direction = ConnectorAttribute.Direction.INCOMING)
+@ConnectorAttribute(name = "broadcast", description = "Allow the received entries to be consumed by multiple channels", type = "boolean", defaultValue = "false",
+ direction = ConnectorAttribute.Direction.INCOMING)
@ConnectorAttribute(name = "xadd-maxlen", description = "The maximum number of entries to keep in the stream", type = "int",
direction = ConnectorAttribute.Direction.OUTGOING)
@ConnectorAttribute(name = "xadd-exact-maxlen", description = "Use exact trimming for MAXLEN parameter", type = "boolean", defaultValue = "false",
@@ -110,6 +114,7 @@ public class RedisStreamsConnector implements InboundConnector, OutboundConnecto
private final RedisStreamsProducer redisStreamsProducer;
private String consumer;
+ private Vertx vertx;
private volatile boolean consumerCancelled = false;
private final ConcurrentHashMap prudentRunMap = new ConcurrentHashMap<>();
private final List subscriptions = new CopyOnWriteArrayList<>();
@@ -117,6 +122,7 @@ public class RedisStreamsConnector implements InboundConnector, OutboundConnecto
private final Set underProcessing = ConcurrentHashMap.newKeySet();
private final ReducableSemaphore shutdownPermit = new ReducableSemaphore(1);
private final Integer gracefulShutdownTimeout;
+ private final ExecutionHolder executionHolder;
/**
* Constructs a RedisStreamsConnector with the specified CDI RedisStreamsProducer.
@@ -125,13 +131,17 @@ public class RedisStreamsConnector implements InboundConnector, OutboundConnecto
* the RedisStreamsProducer to be injected
* @param gracefulShutdownTimeout
* graceful timeout config in ms (default {@literal 60_000})
+ * @param executionHolder
+ * the reactive ExecutionHolder to be injected
*/
@Inject
public RedisStreamsConnector(RedisStreamsProducer redisStreamsProducer,
@ConfigProperty(name = ConnectorFactory.CONNECTOR_PREFIX + REACTIVE_MESSAGING_REDIS_STREAMS_CONNECTOR + ".graceful-timeout-ms",
- defaultValue = "60000") Integer gracefulShutdownTimeout) {
+ defaultValue = "60000") Integer gracefulShutdownTimeout,
+ ExecutionHolder executionHolder) {
this.redisStreamsProducer = redisStreamsProducer;
this.gracefulShutdownTimeout = gracefulShutdownTimeout;
+ this.executionHolder = executionHolder;
}
/**
@@ -140,6 +150,7 @@ public RedisStreamsConnector(RedisStreamsProducer redisStreamsProducer,
@PostConstruct
public void init() {
this.consumer = UUID.randomUUID().toString();
+ this.vertx = executionHolder.vertx();
}
/**
@@ -196,7 +207,11 @@ public Flow.Publisher extends Message>> getPublisher(Config config) {
RedisStreams redisAPI = redisStreamsProducer.produce(incomingConfig.getConnectionKey());
redisStreams.add(redisAPI);
- return xreadMulti(redisAPI, incomingConfig);
+ Multi> publisher = xreadMulti(redisAPI, incomingConfig);
+ if(Boolean.TRUE.equals(incomingConfig.getBroadcast())){
+ publisher = publisher.broadcast().toAllSubscribers();
+ }
+ return publisher;
}
/**
@@ -275,7 +290,9 @@ protected Multi> xreadMulti(RedisStreams redisAPI, RedisStreamsC
.invoke(() -> {
consumerCancelled = true;
log.tracev("Subscription for channel [{0}] has been cancelled", incomingConfig.getChannel());
- });
+ })
+ // Ensure execution on Vert.x context in order to have context propagation
+ .emitOn(cmd -> vertx.getOrCreateContext().runOnContext(cmd));
}
/**