Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 7 additions & 1 deletion docs/connector/configuration-reference.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -47,11 +47,17 @@ For example if you use `quarkus.redis.*my-redis*.hosts` configuration then you c
| INCOMING

| `xread-noack`
| Include the `NOACK` parameter in the `XREADGROUP` call
| Include the `NOACK` parameter in the `XREADGROUP` call.
| `true`
| No
| INCOMING

| `broadcast`
| Allow the received entries to be consumed by multiple channels.
| `false`
| No
| INCOMING

| `xadd-maxlen`
| The maximum number of entries to keep in the stream (trims old entries).
| -
Expand Down
10 changes: 10 additions & 0 deletions docs/migration/migration110to120.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,17 @@
== reactive-messaging-redisstream-connector

* RedisStreamsConnector#prudentRun handling fix, separate handling of the flag per config.
* Consume messages on vert.x context to retain context data in all cases.
* Added `broadcast` incoming configuration to allow multiple consumers on the same channel (ie. multiple methods with `@Incoming` for the same channel).

=== Migration

Changes are backwards compatible doesn't need any migration.

== quarkus-reactive-messaging-redisstream-extension-sample

* Added both imperative and reactive example for `@Incoming` usage.

=== Migration

Changes are backwards compatible doesn't need any migration.
16 changes: 10 additions & 6 deletions quarkus-reactive-messaging-redisstream-extension-sample/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -51,13 +51,17 @@
<groupId>io.quarkus</groupId>
<artifactId>quarkus-rest-jackson</artifactId>
</dependency>
<!-- <dependency>-->
<!-- <groupId>io.quarkus</groupId>-->
<!-- <artifactId>quarkus-messaging-kafka</artifactId>-->
<!-- </dependency>-->
<dependency>
<groupId>io.quarkus</groupId>
<artifactId>quarkus-messaging-kafka</artifactId>
<artifactId>quarkus-arc</artifactId>
</dependency>
<dependency>
<groupId>io.quarkus</groupId>
<artifactId>quarkus-arc</artifactId>
<artifactId>quarkus-config-yaml</artifactId>
</dependency>
<dependency>
<groupId>io.quarkus</groupId>
Expand All @@ -73,10 +77,6 @@
<artifactId>rest-assured</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>io.quarkus</groupId>
<artifactId>quarkus-messaging-amqp</artifactId>
</dependency>
<dependency>
<groupId>io.quarkus</groupId>
<artifactId>quarkus-redis-client</artifactId>
Expand All @@ -89,6 +89,10 @@
<groupId>io.quarkus</groupId>
<artifactId>quarkus-smallrye-fault-tolerance</artifactId>
</dependency>
<dependency>
<groupId>hu.icellmobilsoft.reactive.messaging.redisstream</groupId>
<artifactId>quarkus-reactive-messaging-redisstream-additional-fields</artifactId>
</dependency>
</dependencies>

<build>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,14 +19,16 @@
*/
package hu.icellmobilsoft.quarkus.sample;

import hu.icellmobilsoft.reactive.messaging.redis.streams.metadata.IncomingRedisStreamMetadata;
import io.quarkus.logging.Log;
import io.quarkus.runtime.StartupEvent;
import io.smallrye.mutiny.Uni;
import io.smallrye.reactive.messaging.annotations.Blocking;
import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.TimeUnit;
import java.util.stream.Stream;

import jakarta.enterprise.context.ApplicationScoped;
import jakarta.enterprise.event.Observes;
import jakarta.inject.Inject;

import org.eclipse.microprofile.faulttolerance.Retry;
import org.eclipse.microprofile.reactive.messaging.Channel;
import org.eclipse.microprofile.reactive.messaging.Emitter;
Expand All @@ -35,10 +37,11 @@
import org.eclipse.microprofile.reactive.messaging.Outgoing;
import org.jboss.logging.MDC;

import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
import java.util.stream.Stream;
import hu.icellmobilsoft.reactive.messaging.redis.streams.metadata.IncomingRedisStreamMetadata;
import io.quarkus.logging.Log;
import io.quarkus.runtime.StartupEvent;
import io.smallrye.mutiny.Uni;
import io.smallrye.reactive.messaging.annotations.Blocking;

/**
* Class that handles messages
Expand Down Expand Up @@ -99,6 +102,32 @@ public void sendMessage(String message) {
@Blocking(ordered = false, value = "incoming-pool")
@Retry(maxRetries = 2)
public String toUpperCase(String payload) {
return doAction(payload);
}

/**
* Consume the message from the "words-in" channel using reactive signature.
*
* @param message
* reactive message containing the payload
*
* @return a void completion stage
*/
@Incoming("words-in")
@Blocking(ordered = false, value = "incoming-pool")
@Retry(maxRetries = 2)
public CompletionStage<Void> processReactive(Message<String> message) {
try {
String result = doAction(message.getPayload());
Log.infov("Processed message: {0}", result);
return message.ack();
} catch (Exception e) {
Log.error("Error during logging metadata", e);
return message.nack(e);
}
}

private String doAction(String payload) {
Log.infov("Message received: [{0}]", payload);
try {
TimeUnit.MILLISECONDS.sleep(2000);
Expand Down

This file was deleted.

This file was deleted.

Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
mp:
messaging:
connector:
reactive-messaging-redis-streams:
graceful-timeout-ms: 10000
incoming:
words-in:
connector: reactive-messaging-redis-streams
group: sample-service
stream-key: sample-stream
xread-block-ms: 60000
#connection-key: redis2
broadcast: true
outgoing:
words-out:
connector: reactive-messaging-redis-streams
stream-key: sample-stream
xadd-maxlen: 1000
xadd-ttl-ms: 100000
quarkus:
redis:
max-pool-size: 60
max-pool-waiting: 100
# redis:
# redis2:
# hosts: redis://localhost:6379
log:
category:
"hu.icellmobilsoft":
# level: TRACE
min-level: TRACE
"hu.icellmobilsoft.quarkus.extension.redisstream.runtime.QuarkusRedisStreamsAdapter":
level: TRACE
console:
format: '%d{yyyy-MM-dd HH:mm:ss,SSS} %-5p [%c{3.}] (%t) [sid:%X{extSessionId}] %s%e%n'
smallrye:
messaging:
worker:
incoming-pool:
max-concurrency: 100
Original file line number Diff line number Diff line change
Expand Up @@ -62,8 +62,10 @@
import io.smallrye.reactive.messaging.annotations.ConnectorAttribute;
import io.smallrye.reactive.messaging.connector.InboundConnector;
import io.smallrye.reactive.messaging.connector.OutboundConnector;
import io.smallrye.reactive.messaging.providers.connectors.ExecutionHolder;
import io.smallrye.reactive.messaging.providers.helpers.MultiUtils;
import io.smallrye.reactive.messaging.providers.locals.ContextAwareMessage;
import io.vertx.mutiny.core.Vertx;

/**
* Microprofile Reactive Streams connector for Redis Streams integration.
Expand All @@ -88,6 +90,8 @@
@ConnectorAttribute(name = "xread-noack", description = "Include the NOACK parameter in the XREADGROUP call", type = "boolean",
defaultValue = "true",
direction = ConnectorAttribute.Direction.INCOMING)
@ConnectorAttribute(name = "broadcast", description = "Allow the received entries to be consumed by multiple channels", type = "boolean", defaultValue = "false",
direction = ConnectorAttribute.Direction.INCOMING)
@ConnectorAttribute(name = "xadd-maxlen", description = "The maximum number of entries to keep in the stream", type = "int",
direction = ConnectorAttribute.Direction.OUTGOING)
@ConnectorAttribute(name = "xadd-exact-maxlen", description = "Use exact trimming for MAXLEN parameter", type = "boolean", defaultValue = "false",
Expand All @@ -110,13 +114,15 @@ public class RedisStreamsConnector implements InboundConnector, OutboundConnecto

private final RedisStreamsProducer redisStreamsProducer;
private String consumer;
private Vertx vertx;
private volatile boolean consumerCancelled = false;
private final ConcurrentHashMap<String, AtomicBoolean> prudentRunMap = new ConcurrentHashMap<>();
private final List<Flow.Subscription> subscriptions = new CopyOnWriteArrayList<>();
private final List<RedisStreams> redisStreams = new CopyOnWriteArrayList<>();
private final Set<String> underProcessing = ConcurrentHashMap.newKeySet();
private final ReducableSemaphore shutdownPermit = new ReducableSemaphore(1);
private final Integer gracefulShutdownTimeout;
private final ExecutionHolder executionHolder;

/**
* Constructs a RedisStreamsConnector with the specified CDI RedisStreamsProducer.
Expand All @@ -125,13 +131,17 @@ public class RedisStreamsConnector implements InboundConnector, OutboundConnecto
* the RedisStreamsProducer to be injected
* @param gracefulShutdownTimeout
* graceful timeout config in ms (default {@literal 60_000})
* @param executionHolder
* the reactive ExecutionHolder to be injected
*/
@Inject
public RedisStreamsConnector(RedisStreamsProducer redisStreamsProducer,
@ConfigProperty(name = ConnectorFactory.CONNECTOR_PREFIX + REACTIVE_MESSAGING_REDIS_STREAMS_CONNECTOR + ".graceful-timeout-ms",
defaultValue = "60000") Integer gracefulShutdownTimeout) {
defaultValue = "60000") Integer gracefulShutdownTimeout,
ExecutionHolder executionHolder) {
this.redisStreamsProducer = redisStreamsProducer;
this.gracefulShutdownTimeout = gracefulShutdownTimeout;
this.executionHolder = executionHolder;
}

/**
Expand All @@ -140,6 +150,7 @@ public RedisStreamsConnector(RedisStreamsProducer redisStreamsProducer,
@PostConstruct
public void init() {
this.consumer = UUID.randomUUID().toString();
this.vertx = executionHolder.vertx();
}

/**
Expand Down Expand Up @@ -196,7 +207,11 @@ public Flow.Publisher<? extends Message<?>> getPublisher(Config config) {

RedisStreams redisAPI = redisStreamsProducer.produce(incomingConfig.getConnectionKey());
redisStreams.add(redisAPI);
return xreadMulti(redisAPI, incomingConfig);
Multi<Message<Object>> publisher = xreadMulti(redisAPI, incomingConfig);
if(Boolean.TRUE.equals(incomingConfig.getBroadcast())){
publisher = publisher.broadcast().toAllSubscribers();
}
return publisher;
}

/**
Expand Down Expand Up @@ -275,7 +290,9 @@ protected Multi<Message<Object>> xreadMulti(RedisStreams redisAPI, RedisStreamsC
.invoke(() -> {
consumerCancelled = true;
log.tracev("Subscription for channel [{0}] has been cancelled", incomingConfig.getChannel());
});
})
// Ensure execution on Vert.x context in order to have context propagation
.emitOn(cmd -> vertx.getOrCreateContext().runOnContext(cmd));
}

/**
Expand Down