Skip to content

Commit d6ffc8c

Browse files
committed
#27 documentation and broadcast consuming
1 parent 5b34409 commit d6ffc8c

File tree

5 files changed

+63
-11
lines changed

5 files changed

+63
-11
lines changed

docs/connector/configuration-reference.adoc

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -47,11 +47,17 @@ For example if you use `quarkus.redis.*my-redis*.hosts` configuration then you c
4747
| INCOMING
4848

4949
| `xread-noack`
50-
| Include the `NOACK` parameter in the `XREADGROUP` call
50+
| Include the `NOACK` parameter in the `XREADGROUP` call.
5151
| `true`
5252
| No
5353
| INCOMING
5454

55+
| `broadcast`
56+
| Allow the received entries to be consumed by multiple channels.
57+
| `false`
58+
| No
59+
| INCOMING
60+
5561
| `xadd-maxlen`
5662
| The maximum number of entries to keep in the stream (trims old entries).
5763
| -

docs/migration/migration110to120.adoc

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,17 @@
33
== reactive-messaging-redisstream-connector
44

55
* RedisStreamsConnector#prudentRun handling fix, separate handling of the flag per config.
6+
* Consume messages on vert.x context to retain context data in all cases.
7+
* Added `broadcast` incoming configuration to allow multiple consumers on the same channel (ie. multiple methods with `@Incoming` for the same channel).
68

79
=== Migration
810

911
Changes are backwards compatible doesn't need any migration.
12+
13+
== quarkus-reactive-messaging-redisstream-extension-sample
14+
15+
* Added both imperative and reactive example for `@Incoming` usage.
16+
17+
=== Migration
18+
19+
Changes are backwards compatible doesn't need any migration.

quarkus-reactive-messaging-redisstream-extension-sample/src/main/java/hu/icellmobilsoft/quarkus/sample/MyMessagingApplication.java

Lines changed: 38 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -19,14 +19,16 @@
1919
*/
2020
package hu.icellmobilsoft.quarkus.sample;
2121

22-
import hu.icellmobilsoft.reactive.messaging.redis.streams.metadata.IncomingRedisStreamMetadata;
23-
import io.quarkus.logging.Log;
24-
import io.quarkus.runtime.StartupEvent;
25-
import io.smallrye.mutiny.Uni;
26-
import io.smallrye.reactive.messaging.annotations.Blocking;
22+
import java.util.Optional;
23+
import java.util.UUID;
24+
import java.util.concurrent.CompletionStage;
25+
import java.util.concurrent.TimeUnit;
26+
import java.util.stream.Stream;
27+
2728
import jakarta.enterprise.context.ApplicationScoped;
2829
import jakarta.enterprise.event.Observes;
2930
import jakarta.inject.Inject;
31+
3032
import org.eclipse.microprofile.faulttolerance.Retry;
3133
import org.eclipse.microprofile.reactive.messaging.Channel;
3234
import org.eclipse.microprofile.reactive.messaging.Emitter;
@@ -35,10 +37,11 @@
3537
import org.eclipse.microprofile.reactive.messaging.Outgoing;
3638
import org.jboss.logging.MDC;
3739

38-
import java.util.Optional;
39-
import java.util.UUID;
40-
import java.util.concurrent.TimeUnit;
41-
import java.util.stream.Stream;
40+
import hu.icellmobilsoft.reactive.messaging.redis.streams.metadata.IncomingRedisStreamMetadata;
41+
import io.quarkus.logging.Log;
42+
import io.quarkus.runtime.StartupEvent;
43+
import io.smallrye.mutiny.Uni;
44+
import io.smallrye.reactive.messaging.annotations.Blocking;
4245

4346
/**
4447
* Class that handles messages
@@ -99,6 +102,32 @@ public void sendMessage(String message) {
99102
@Blocking(ordered = false, value = "incoming-pool")
100103
@Retry(maxRetries = 2)
101104
public String toUpperCase(String payload) {
105+
return doAction(payload);
106+
}
107+
108+
/**
109+
* Consume the message from the "words-in" channel using reactive signature.
110+
*
111+
* @param message
112+
* reactive message containing the payload
113+
*
114+
* @return a void completion stage
115+
*/
116+
@Incoming("words-in")
117+
@Blocking(ordered = false, value = "incoming-pool")
118+
@Retry(maxRetries = 2)
119+
public CompletionStage<Void> processReactive(Message<String> message) {
120+
try {
121+
String result = doAction(message.getPayload());
122+
Log.infov("Processed message: {0}", result);
123+
return message.ack();
124+
} catch (Exception e) {
125+
Log.error("Error during logging metadata", e);
126+
return message.nack(e);
127+
}
128+
}
129+
130+
private String doAction(String payload) {
102131
Log.infov("Message received: [{0}]", payload);
103132
try {
104133
TimeUnit.MILLISECONDS.sleep(2000);

quarkus-reactive-messaging-redisstream-extension-sample/src/main/resources/application.yml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ mp:
1010
stream-key: sample-stream
1111
xread-block-ms: 60000
1212
#connection-key: redis2
13+
broadcast: true
1314
outgoing:
1415
words-out:
1516
connector: reactive-messaging-redis-streams

reactive-messaging-redisstream-connector/src/main/java/hu/icellmobilsoft/reactive/messaging/redis/streams/RedisStreamsConnector.java

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -90,6 +90,8 @@
9090
@ConnectorAttribute(name = "xread-noack", description = "Include the NOACK parameter in the XREADGROUP call", type = "boolean",
9191
defaultValue = "true",
9292
direction = ConnectorAttribute.Direction.INCOMING)
93+
@ConnectorAttribute(name = "broadcast", description = "Allow the received entries to be consumed by multiple channels", type = "boolean", defaultValue = "false",
94+
direction = ConnectorAttribute.Direction.INCOMING)
9395
@ConnectorAttribute(name = "xadd-maxlen", description = "The maximum number of entries to keep in the stream", type = "int",
9496
direction = ConnectorAttribute.Direction.OUTGOING)
9597
@ConnectorAttribute(name = "xadd-exact-maxlen", description = "Use exact trimming for MAXLEN parameter", type = "boolean", defaultValue = "false",
@@ -205,7 +207,11 @@ public Flow.Publisher<? extends Message<?>> getPublisher(Config config) {
205207

206208
RedisStreams redisAPI = redisStreamsProducer.produce(incomingConfig.getConnectionKey());
207209
redisStreams.add(redisAPI);
208-
return xreadMulti(redisAPI, incomingConfig);
210+
Multi<Message<Object>> publisher = xreadMulti(redisAPI, incomingConfig);
211+
if(Boolean.TRUE.equals(incomingConfig.getBroadcast())){
212+
publisher = publisher.broadcast().toAllSubscribers();
213+
}
214+
return publisher;
209215
}
210216

211217
/**

0 commit comments

Comments
 (0)