diff --git a/docs/connector/configuration-reference.adoc b/docs/connector/configuration-reference.adoc index 9e8a8d9..c605830 100644 --- a/docs/connector/configuration-reference.adoc +++ b/docs/connector/configuration-reference.adoc @@ -47,11 +47,17 @@ For example if you use `quarkus.redis.*my-redis*.hosts` configuration then you c | INCOMING | `xread-noack` -| Include the `NOACK` parameter in the `XREADGROUP` call +| Include the `NOACK` parameter in the `XREADGROUP` call. | `true` | No | INCOMING +| `broadcast` +| Allow the received entries to be consumed by multiple channels. +| `false` +| No +| INCOMING + | `xadd-maxlen` | The maximum number of entries to keep in the stream (trims old entries). | - diff --git a/docs/migration/migration110to120.adoc b/docs/migration/migration110to120.adoc index ae8466b..0fbf5ad 100644 --- a/docs/migration/migration110to120.adoc +++ b/docs/migration/migration110to120.adoc @@ -3,7 +3,17 @@ == reactive-messaging-redisstream-connector * RedisStreamsConnector#prudentRun handling fix, separate handling of the flag per config. +* Consume messages on vert.x context to retain context data in all cases. +* Added `broadcast` incoming configuration to allow multiple consumers on the same channel (ie. multiple methods with `@Incoming` for the same channel). === Migration Changes are backwards compatible doesn't need any migration. + +== quarkus-reactive-messaging-redisstream-extension-sample + +* Added both imperative and reactive example for `@Incoming` usage. + +=== Migration + +Changes are backwards compatible doesn't need any migration. \ No newline at end of file diff --git a/quarkus-reactive-messaging-redisstream-extension-sample/pom.xml b/quarkus-reactive-messaging-redisstream-extension-sample/pom.xml index f36a10a..4bb26ce 100644 --- a/quarkus-reactive-messaging-redisstream-extension-sample/pom.xml +++ b/quarkus-reactive-messaging-redisstream-extension-sample/pom.xml @@ -51,13 +51,17 @@ io.quarkus quarkus-rest-jackson + + + + io.quarkus - quarkus-messaging-kafka + quarkus-arc io.quarkus - quarkus-arc + quarkus-config-yaml io.quarkus @@ -73,10 +77,6 @@ rest-assured test - - io.quarkus - quarkus-messaging-amqp - io.quarkus quarkus-redis-client @@ -89,6 +89,10 @@ io.quarkus quarkus-smallrye-fault-tolerance + + hu.icellmobilsoft.reactive.messaging.redisstream + quarkus-reactive-messaging-redisstream-additional-fields + diff --git a/quarkus-reactive-messaging-redisstream-extension-sample/src/main/java/hu/icellmobilsoft/quarkus/sample/MyMessagingApplication.java b/quarkus-reactive-messaging-redisstream-extension-sample/src/main/java/hu/icellmobilsoft/quarkus/sample/MyMessagingApplication.java index 85aba5e..76c21d9 100644 --- a/quarkus-reactive-messaging-redisstream-extension-sample/src/main/java/hu/icellmobilsoft/quarkus/sample/MyMessagingApplication.java +++ b/quarkus-reactive-messaging-redisstream-extension-sample/src/main/java/hu/icellmobilsoft/quarkus/sample/MyMessagingApplication.java @@ -19,14 +19,16 @@ */ package hu.icellmobilsoft.quarkus.sample; -import hu.icellmobilsoft.reactive.messaging.redis.streams.metadata.IncomingRedisStreamMetadata; -import io.quarkus.logging.Log; -import io.quarkus.runtime.StartupEvent; -import io.smallrye.mutiny.Uni; -import io.smallrye.reactive.messaging.annotations.Blocking; +import java.util.Optional; +import java.util.UUID; +import java.util.concurrent.CompletionStage; +import java.util.concurrent.TimeUnit; +import java.util.stream.Stream; + import jakarta.enterprise.context.ApplicationScoped; import jakarta.enterprise.event.Observes; import jakarta.inject.Inject; + import org.eclipse.microprofile.faulttolerance.Retry; import org.eclipse.microprofile.reactive.messaging.Channel; import org.eclipse.microprofile.reactive.messaging.Emitter; @@ -35,10 +37,11 @@ import org.eclipse.microprofile.reactive.messaging.Outgoing; import org.jboss.logging.MDC; -import java.util.Optional; -import java.util.UUID; -import java.util.concurrent.TimeUnit; -import java.util.stream.Stream; +import hu.icellmobilsoft.reactive.messaging.redis.streams.metadata.IncomingRedisStreamMetadata; +import io.quarkus.logging.Log; +import io.quarkus.runtime.StartupEvent; +import io.smallrye.mutiny.Uni; +import io.smallrye.reactive.messaging.annotations.Blocking; /** * Class that handles messages @@ -99,6 +102,32 @@ public void sendMessage(String message) { @Blocking(ordered = false, value = "incoming-pool") @Retry(maxRetries = 2) public String toUpperCase(String payload) { + return doAction(payload); + } + + /** + * Consume the message from the "words-in" channel using reactive signature. + * + * @param message + * reactive message containing the payload + * + * @return a void completion stage + */ + @Incoming("words-in") + @Blocking(ordered = false, value = "incoming-pool") + @Retry(maxRetries = 2) + public CompletionStage processReactive(Message message) { + try { + String result = doAction(message.getPayload()); + Log.infov("Processed message: {0}", result); + return message.ack(); + } catch (Exception e) { + Log.error("Error during logging metadata", e); + return message.nack(e); + } + } + + private String doAction(String payload) { Log.infov("Message received: [{0}]", payload); try { TimeUnit.MILLISECONDS.sleep(2000); diff --git a/quarkus-reactive-messaging-redisstream-extension-sample/src/main/resources/application-amqp.properties b/quarkus-reactive-messaging-redisstream-extension-sample/src/main/resources/application-amqp.properties deleted file mode 100644 index 7e24883..0000000 --- a/quarkus-reactive-messaging-redisstream-extension-sample/src/main/resources/application-amqp.properties +++ /dev/null @@ -1,5 +0,0 @@ -mp.messaging.incoming.words-in.address=words -mp.messaging.incoming.words-in.connector=smallrye-amqp -mp.messaging.outgoing.words-out.address=words -mp.messaging.outgoing.words-out.connector=smallrye-amqp -quarkus.kafka.devservices.provider=strimzi diff --git a/quarkus-reactive-messaging-redisstream-extension-sample/src/main/resources/application.properties b/quarkus-reactive-messaging-redisstream-extension-sample/src/main/resources/application.properties deleted file mode 100644 index 9c8f2af..0000000 --- a/quarkus-reactive-messaging-redisstream-extension-sample/src/main/resources/application.properties +++ /dev/null @@ -1,15 +0,0 @@ -#quarkus.profile=kafka -mp.messaging.incoming.words-in.connector=reactive-messaging-redis-streams -mp.messaging.incoming.words-in.stream-key=sample-stream -mp.messaging.incoming.words-in.group=sample-service -mp.messaging.incoming.words-in.xread-block-ms=60000 -smallrye.messaging.worker.incoming-pool.max-concurrency=15 -mp.messaging.outgoing.words-out.connector=reactive-messaging-redis-streams -mp.messaging.outgoing.words-out.stream-key=sample-stream -mp.messaging.outgoing.words-out.xadd-maxlen=1000 -mp.messaging.outgoing.words-out.xadd-ttl-ms=1000 -#mp.messaging.incoming.words-in.connection-key=redis2 -mp.messaging.connector.reactive-messaging-redis-streams.graceful-timeout-ms=10000 -#quarkus.redis.redis2.hosts=redis://localhost:6379 -quarkus.log.console.format=%d{yyyy-MM-dd HH:mm:ss,SSS} %-5p [%c{3.}] (%t) [sid:%X{extSessionId}] %s%e%n -quarkus.log.category."hu.icellmobilsoft".min-level=TRACE diff --git a/quarkus-reactive-messaging-redisstream-extension-sample/src/main/resources/application.yml b/quarkus-reactive-messaging-redisstream-extension-sample/src/main/resources/application.yml new file mode 100644 index 0000000..ce599fc --- /dev/null +++ b/quarkus-reactive-messaging-redisstream-extension-sample/src/main/resources/application.yml @@ -0,0 +1,40 @@ +mp: + messaging: + connector: + reactive-messaging-redis-streams: + graceful-timeout-ms: 10000 + incoming: + words-in: + connector: reactive-messaging-redis-streams + group: sample-service + stream-key: sample-stream + xread-block-ms: 60000 + #connection-key: redis2 + broadcast: true + outgoing: + words-out: + connector: reactive-messaging-redis-streams + stream-key: sample-stream + xadd-maxlen: 1000 + xadd-ttl-ms: 100000 +quarkus: + redis: + max-pool-size: 60 + max-pool-waiting: 100 + # redis: + # redis2: + # hosts: redis://localhost:6379 + log: + category: + "hu.icellmobilsoft": + # level: TRACE + min-level: TRACE + "hu.icellmobilsoft.quarkus.extension.redisstream.runtime.QuarkusRedisStreamsAdapter": + level: TRACE + console: + format: '%d{yyyy-MM-dd HH:mm:ss,SSS} %-5p [%c{3.}] (%t) [sid:%X{extSessionId}] %s%e%n' +smallrye: + messaging: + worker: + incoming-pool: + max-concurrency: 100 diff --git a/reactive-messaging-redisstream-connector/src/main/java/hu/icellmobilsoft/reactive/messaging/redis/streams/RedisStreamsConnector.java b/reactive-messaging-redisstream-connector/src/main/java/hu/icellmobilsoft/reactive/messaging/redis/streams/RedisStreamsConnector.java index ef95a9c..428418f 100644 --- a/reactive-messaging-redisstream-connector/src/main/java/hu/icellmobilsoft/reactive/messaging/redis/streams/RedisStreamsConnector.java +++ b/reactive-messaging-redisstream-connector/src/main/java/hu/icellmobilsoft/reactive/messaging/redis/streams/RedisStreamsConnector.java @@ -62,8 +62,10 @@ import io.smallrye.reactive.messaging.annotations.ConnectorAttribute; import io.smallrye.reactive.messaging.connector.InboundConnector; import io.smallrye.reactive.messaging.connector.OutboundConnector; +import io.smallrye.reactive.messaging.providers.connectors.ExecutionHolder; import io.smallrye.reactive.messaging.providers.helpers.MultiUtils; import io.smallrye.reactive.messaging.providers.locals.ContextAwareMessage; +import io.vertx.mutiny.core.Vertx; /** * Microprofile Reactive Streams connector for Redis Streams integration. @@ -88,6 +90,8 @@ @ConnectorAttribute(name = "xread-noack", description = "Include the NOACK parameter in the XREADGROUP call", type = "boolean", defaultValue = "true", direction = ConnectorAttribute.Direction.INCOMING) +@ConnectorAttribute(name = "broadcast", description = "Allow the received entries to be consumed by multiple channels", type = "boolean", defaultValue = "false", + direction = ConnectorAttribute.Direction.INCOMING) @ConnectorAttribute(name = "xadd-maxlen", description = "The maximum number of entries to keep in the stream", type = "int", direction = ConnectorAttribute.Direction.OUTGOING) @ConnectorAttribute(name = "xadd-exact-maxlen", description = "Use exact trimming for MAXLEN parameter", type = "boolean", defaultValue = "false", @@ -110,6 +114,7 @@ public class RedisStreamsConnector implements InboundConnector, OutboundConnecto private final RedisStreamsProducer redisStreamsProducer; private String consumer; + private Vertx vertx; private volatile boolean consumerCancelled = false; private final ConcurrentHashMap prudentRunMap = new ConcurrentHashMap<>(); private final List subscriptions = new CopyOnWriteArrayList<>(); @@ -117,6 +122,7 @@ public class RedisStreamsConnector implements InboundConnector, OutboundConnecto private final Set underProcessing = ConcurrentHashMap.newKeySet(); private final ReducableSemaphore shutdownPermit = new ReducableSemaphore(1); private final Integer gracefulShutdownTimeout; + private final ExecutionHolder executionHolder; /** * Constructs a RedisStreamsConnector with the specified CDI RedisStreamsProducer. @@ -125,13 +131,17 @@ public class RedisStreamsConnector implements InboundConnector, OutboundConnecto * the RedisStreamsProducer to be injected * @param gracefulShutdownTimeout * graceful timeout config in ms (default {@literal 60_000}) + * @param executionHolder + * the reactive ExecutionHolder to be injected */ @Inject public RedisStreamsConnector(RedisStreamsProducer redisStreamsProducer, @ConfigProperty(name = ConnectorFactory.CONNECTOR_PREFIX + REACTIVE_MESSAGING_REDIS_STREAMS_CONNECTOR + ".graceful-timeout-ms", - defaultValue = "60000") Integer gracefulShutdownTimeout) { + defaultValue = "60000") Integer gracefulShutdownTimeout, + ExecutionHolder executionHolder) { this.redisStreamsProducer = redisStreamsProducer; this.gracefulShutdownTimeout = gracefulShutdownTimeout; + this.executionHolder = executionHolder; } /** @@ -140,6 +150,7 @@ public RedisStreamsConnector(RedisStreamsProducer redisStreamsProducer, @PostConstruct public void init() { this.consumer = UUID.randomUUID().toString(); + this.vertx = executionHolder.vertx(); } /** @@ -196,7 +207,11 @@ public Flow.Publisher> getPublisher(Config config) { RedisStreams redisAPI = redisStreamsProducer.produce(incomingConfig.getConnectionKey()); redisStreams.add(redisAPI); - return xreadMulti(redisAPI, incomingConfig); + Multi> publisher = xreadMulti(redisAPI, incomingConfig); + if(Boolean.TRUE.equals(incomingConfig.getBroadcast())){ + publisher = publisher.broadcast().toAllSubscribers(); + } + return publisher; } /** @@ -275,7 +290,9 @@ protected Multi> xreadMulti(RedisStreams redisAPI, RedisStreamsC .invoke(() -> { consumerCancelled = true; log.tracev("Subscription for channel [{0}] has been cancelled", incomingConfig.getChannel()); - }); + }) + // Ensure execution on Vert.x context in order to have context propagation + .emitOn(cmd -> vertx.getOrCreateContext().runOnContext(cmd)); } /**