Skip to content

Commit 84a8968

Browse files
Merge pull request #35 from i-Cell-Mobilsoft-Open-Source/feature/27-emit-consumer-multi-on-vertx-context
Feature/27 emit consumer multi on vertx context
2 parents b356304 + f0ede0f commit 84a8968

File tree

8 files changed

+125
-39
lines changed

8 files changed

+125
-39
lines changed

docs/connector/configuration-reference.adoc

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -47,11 +47,17 @@ For example if you use `quarkus.redis.*my-redis*.hosts` configuration then you c
4747
| INCOMING
4848

4949
| `xread-noack`
50-
| Include the `NOACK` parameter in the `XREADGROUP` call
50+
| Include the `NOACK` parameter in the `XREADGROUP` call.
5151
| `true`
5252
| No
5353
| INCOMING
5454

55+
| `broadcast`
56+
| Allow the received entries to be consumed by multiple channels.
57+
| `false`
58+
| No
59+
| INCOMING
60+
5561
| `xadd-maxlen`
5662
| The maximum number of entries to keep in the stream (trims old entries).
5763
| -

docs/migration/migration110to120.adoc

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,17 @@
33
== reactive-messaging-redisstream-connector
44

55
* RedisStreamsConnector#prudentRun handling fix, separate handling of the flag per config.
6+
* Consume messages on vert.x context to retain context data in all cases.
7+
* Added `broadcast` incoming configuration to allow multiple consumers on the same channel (ie. multiple methods with `@Incoming` for the same channel).
68

79
=== Migration
810

911
Changes are backwards compatible doesn't need any migration.
12+
13+
== quarkus-reactive-messaging-redisstream-extension-sample
14+
15+
* Added both imperative and reactive example for `@Incoming` usage.
16+
17+
=== Migration
18+
19+
Changes are backwards compatible doesn't need any migration.

quarkus-reactive-messaging-redisstream-extension-sample/pom.xml

Lines changed: 10 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -51,13 +51,17 @@
5151
<groupId>io.quarkus</groupId>
5252
<artifactId>quarkus-rest-jackson</artifactId>
5353
</dependency>
54+
<!-- <dependency>-->
55+
<!-- <groupId>io.quarkus</groupId>-->
56+
<!-- <artifactId>quarkus-messaging-kafka</artifactId>-->
57+
<!-- </dependency>-->
5458
<dependency>
5559
<groupId>io.quarkus</groupId>
56-
<artifactId>quarkus-messaging-kafka</artifactId>
60+
<artifactId>quarkus-arc</artifactId>
5761
</dependency>
5862
<dependency>
5963
<groupId>io.quarkus</groupId>
60-
<artifactId>quarkus-arc</artifactId>
64+
<artifactId>quarkus-config-yaml</artifactId>
6165
</dependency>
6266
<dependency>
6367
<groupId>io.quarkus</groupId>
@@ -73,10 +77,6 @@
7377
<artifactId>rest-assured</artifactId>
7478
<scope>test</scope>
7579
</dependency>
76-
<dependency>
77-
<groupId>io.quarkus</groupId>
78-
<artifactId>quarkus-messaging-amqp</artifactId>
79-
</dependency>
8080
<dependency>
8181
<groupId>io.quarkus</groupId>
8282
<artifactId>quarkus-redis-client</artifactId>
@@ -89,6 +89,10 @@
8989
<groupId>io.quarkus</groupId>
9090
<artifactId>quarkus-smallrye-fault-tolerance</artifactId>
9191
</dependency>
92+
<dependency>
93+
<groupId>hu.icellmobilsoft.reactive.messaging.redisstream</groupId>
94+
<artifactId>quarkus-reactive-messaging-redisstream-additional-fields</artifactId>
95+
</dependency>
9296
</dependencies>
9397

9498
<build>

quarkus-reactive-messaging-redisstream-extension-sample/src/main/java/hu/icellmobilsoft/quarkus/sample/MyMessagingApplication.java

Lines changed: 38 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -19,14 +19,16 @@
1919
*/
2020
package hu.icellmobilsoft.quarkus.sample;
2121

22-
import hu.icellmobilsoft.reactive.messaging.redis.streams.metadata.IncomingRedisStreamMetadata;
23-
import io.quarkus.logging.Log;
24-
import io.quarkus.runtime.StartupEvent;
25-
import io.smallrye.mutiny.Uni;
26-
import io.smallrye.reactive.messaging.annotations.Blocking;
22+
import java.util.Optional;
23+
import java.util.UUID;
24+
import java.util.concurrent.CompletionStage;
25+
import java.util.concurrent.TimeUnit;
26+
import java.util.stream.Stream;
27+
2728
import jakarta.enterprise.context.ApplicationScoped;
2829
import jakarta.enterprise.event.Observes;
2930
import jakarta.inject.Inject;
31+
3032
import org.eclipse.microprofile.faulttolerance.Retry;
3133
import org.eclipse.microprofile.reactive.messaging.Channel;
3234
import org.eclipse.microprofile.reactive.messaging.Emitter;
@@ -35,10 +37,11 @@
3537
import org.eclipse.microprofile.reactive.messaging.Outgoing;
3638
import org.jboss.logging.MDC;
3739

38-
import java.util.Optional;
39-
import java.util.UUID;
40-
import java.util.concurrent.TimeUnit;
41-
import java.util.stream.Stream;
40+
import hu.icellmobilsoft.reactive.messaging.redis.streams.metadata.IncomingRedisStreamMetadata;
41+
import io.quarkus.logging.Log;
42+
import io.quarkus.runtime.StartupEvent;
43+
import io.smallrye.mutiny.Uni;
44+
import io.smallrye.reactive.messaging.annotations.Blocking;
4245

4346
/**
4447
* Class that handles messages
@@ -99,6 +102,32 @@ public void sendMessage(String message) {
99102
@Blocking(ordered = false, value = "incoming-pool")
100103
@Retry(maxRetries = 2)
101104
public String toUpperCase(String payload) {
105+
return doAction(payload);
106+
}
107+
108+
/**
109+
* Consume the message from the "words-in" channel using reactive signature.
110+
*
111+
* @param message
112+
* reactive message containing the payload
113+
*
114+
* @return a void completion stage
115+
*/
116+
@Incoming("words-in")
117+
@Blocking(ordered = false, value = "incoming-pool")
118+
@Retry(maxRetries = 2)
119+
public CompletionStage<Void> processReactive(Message<String> message) {
120+
try {
121+
String result = doAction(message.getPayload());
122+
Log.infov("Processed message: {0}", result);
123+
return message.ack();
124+
} catch (Exception e) {
125+
Log.error("Error during logging metadata", e);
126+
return message.nack(e);
127+
}
128+
}
129+
130+
private String doAction(String payload) {
102131
Log.infov("Message received: [{0}]", payload);
103132
try {
104133
TimeUnit.MILLISECONDS.sleep(2000);

quarkus-reactive-messaging-redisstream-extension-sample/src/main/resources/application-amqp.properties

Lines changed: 0 additions & 5 deletions
This file was deleted.

quarkus-reactive-messaging-redisstream-extension-sample/src/main/resources/application.properties

Lines changed: 0 additions & 15 deletions
This file was deleted.
Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,40 @@
1+
mp:
2+
messaging:
3+
connector:
4+
reactive-messaging-redis-streams:
5+
graceful-timeout-ms: 10000
6+
incoming:
7+
words-in:
8+
connector: reactive-messaging-redis-streams
9+
group: sample-service
10+
stream-key: sample-stream
11+
xread-block-ms: 60000
12+
#connection-key: redis2
13+
broadcast: true
14+
outgoing:
15+
words-out:
16+
connector: reactive-messaging-redis-streams
17+
stream-key: sample-stream
18+
xadd-maxlen: 1000
19+
xadd-ttl-ms: 100000
20+
quarkus:
21+
redis:
22+
max-pool-size: 60
23+
max-pool-waiting: 100
24+
# redis:
25+
# redis2:
26+
# hosts: redis://localhost:6379
27+
log:
28+
category:
29+
"hu.icellmobilsoft":
30+
# level: TRACE
31+
min-level: TRACE
32+
"hu.icellmobilsoft.quarkus.extension.redisstream.runtime.QuarkusRedisStreamsAdapter":
33+
level: TRACE
34+
console:
35+
format: '%d{yyyy-MM-dd HH:mm:ss,SSS} %-5p [%c{3.}] (%t) [sid:%X{extSessionId}] %s%e%n'
36+
smallrye:
37+
messaging:
38+
worker:
39+
incoming-pool:
40+
max-concurrency: 100

reactive-messaging-redisstream-connector/src/main/java/hu/icellmobilsoft/reactive/messaging/redis/streams/RedisStreamsConnector.java

Lines changed: 20 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -62,8 +62,10 @@
6262
import io.smallrye.reactive.messaging.annotations.ConnectorAttribute;
6363
import io.smallrye.reactive.messaging.connector.InboundConnector;
6464
import io.smallrye.reactive.messaging.connector.OutboundConnector;
65+
import io.smallrye.reactive.messaging.providers.connectors.ExecutionHolder;
6566
import io.smallrye.reactive.messaging.providers.helpers.MultiUtils;
6667
import io.smallrye.reactive.messaging.providers.locals.ContextAwareMessage;
68+
import io.vertx.mutiny.core.Vertx;
6769

6870
/**
6971
* Microprofile Reactive Streams connector for Redis Streams integration.
@@ -88,6 +90,8 @@
8890
@ConnectorAttribute(name = "xread-noack", description = "Include the NOACK parameter in the XREADGROUP call", type = "boolean",
8991
defaultValue = "true",
9092
direction = ConnectorAttribute.Direction.INCOMING)
93+
@ConnectorAttribute(name = "broadcast", description = "Allow the received entries to be consumed by multiple channels", type = "boolean", defaultValue = "false",
94+
direction = ConnectorAttribute.Direction.INCOMING)
9195
@ConnectorAttribute(name = "xadd-maxlen", description = "The maximum number of entries to keep in the stream", type = "int",
9296
direction = ConnectorAttribute.Direction.OUTGOING)
9397
@ConnectorAttribute(name = "xadd-exact-maxlen", description = "Use exact trimming for MAXLEN parameter", type = "boolean", defaultValue = "false",
@@ -110,13 +114,15 @@ public class RedisStreamsConnector implements InboundConnector, OutboundConnecto
110114

111115
private final RedisStreamsProducer redisStreamsProducer;
112116
private String consumer;
117+
private Vertx vertx;
113118
private volatile boolean consumerCancelled = false;
114119
private final ConcurrentHashMap<String, AtomicBoolean> prudentRunMap = new ConcurrentHashMap<>();
115120
private final List<Flow.Subscription> subscriptions = new CopyOnWriteArrayList<>();
116121
private final List<RedisStreams> redisStreams = new CopyOnWriteArrayList<>();
117122
private final Set<String> underProcessing = ConcurrentHashMap.newKeySet();
118123
private final ReducableSemaphore shutdownPermit = new ReducableSemaphore(1);
119124
private final Integer gracefulShutdownTimeout;
125+
private final ExecutionHolder executionHolder;
120126

121127
/**
122128
* Constructs a RedisStreamsConnector with the specified CDI RedisStreamsProducer.
@@ -125,13 +131,17 @@ public class RedisStreamsConnector implements InboundConnector, OutboundConnecto
125131
* the RedisStreamsProducer to be injected
126132
* @param gracefulShutdownTimeout
127133
* graceful timeout config in ms (default {@literal 60_000})
134+
* @param executionHolder
135+
* the reactive ExecutionHolder to be injected
128136
*/
129137
@Inject
130138
public RedisStreamsConnector(RedisStreamsProducer redisStreamsProducer,
131139
@ConfigProperty(name = ConnectorFactory.CONNECTOR_PREFIX + REACTIVE_MESSAGING_REDIS_STREAMS_CONNECTOR + ".graceful-timeout-ms",
132-
defaultValue = "60000") Integer gracefulShutdownTimeout) {
140+
defaultValue = "60000") Integer gracefulShutdownTimeout,
141+
ExecutionHolder executionHolder) {
133142
this.redisStreamsProducer = redisStreamsProducer;
134143
this.gracefulShutdownTimeout = gracefulShutdownTimeout;
144+
this.executionHolder = executionHolder;
135145
}
136146

137147
/**
@@ -140,6 +150,7 @@ public RedisStreamsConnector(RedisStreamsProducer redisStreamsProducer,
140150
@PostConstruct
141151
public void init() {
142152
this.consumer = UUID.randomUUID().toString();
153+
this.vertx = executionHolder.vertx();
143154
}
144155

145156
/**
@@ -196,7 +207,11 @@ public Flow.Publisher<? extends Message<?>> getPublisher(Config config) {
196207

197208
RedisStreams redisAPI = redisStreamsProducer.produce(incomingConfig.getConnectionKey());
198209
redisStreams.add(redisAPI);
199-
return xreadMulti(redisAPI, incomingConfig);
210+
Multi<Message<Object>> publisher = xreadMulti(redisAPI, incomingConfig);
211+
if(Boolean.TRUE.equals(incomingConfig.getBroadcast())){
212+
publisher = publisher.broadcast().toAllSubscribers();
213+
}
214+
return publisher;
200215
}
201216

202217
/**
@@ -275,7 +290,9 @@ protected Multi<Message<Object>> xreadMulti(RedisStreams redisAPI, RedisStreamsC
275290
.invoke(() -> {
276291
consumerCancelled = true;
277292
log.tracev("Subscription for channel [{0}] has been cancelled", incomingConfig.getChannel());
278-
});
293+
})
294+
// Ensure execution on Vert.x context in order to have context propagation
295+
.emitOn(cmd -> vertx.getOrCreateContext().runOnContext(cmd));
279296
}
280297

281298
/**

0 commit comments

Comments
 (0)