22
33import cn .typesafe .km .entity .Cluster ;
44import cn .typesafe .km .service .dto .ConsumerMessage ;
5+ import cn .typesafe .km .service .dto .LiveMessage ;
56import cn .typesafe .km .service .dto .TopicData ;
67import lombok .SneakyThrows ;
78import org .apache .kafka .clients .consumer .ConsumerRecord ;
1011import org .apache .kafka .clients .producer .ProducerRecord ;
1112import org .apache .kafka .clients .producer .RecordMetadata ;
1213import org .apache .kafka .common .TopicPartition ;
14+ import org .springframework .http .codec .ServerSentEvent ;
1315import org .springframework .stereotype .Service ;
1416import org .springframework .util .CollectionUtils ;
1517import org .springframework .util .StringUtils ;
18+ import reactor .core .publisher .Flux ;
1619
1720import javax .annotation .Resource ;
1821import java .time .Duration ;
@@ -81,7 +84,81 @@ public List<ConsumerMessage> data(String clusterId, String topicName, Integer tP
8184 }
8285
8386 return records
84- .subList (0 , Math .min (count , records .size ()))
87+ .subList (0 , Math .min (count , records .size ()))
88+ .stream ()
89+ .map (record -> {
90+ int partition = record .partition ();
91+ long timestamp = record .timestamp ();
92+ String key = record .key ();
93+ String value = record .value ();
94+ long offset = record .offset ();
95+
96+ ConsumerMessage consumerMessage = new ConsumerMessage ();
97+ consumerMessage .setTopic (topicName );
98+ consumerMessage .setOffset (offset );
99+ consumerMessage .setPartition (partition );
100+ consumerMessage .setTimestamp (timestamp );
101+ consumerMessage .setKey (key );
102+ consumerMessage .setValue (value );
103+
104+ return consumerMessage ;
105+ }).collect (Collectors .toList ());
106+ }
107+ }
108+
109+ @ SneakyThrows
110+ public long sendData (String clusterId , String topic , TopicData topicData ) {
111+ Cluster cluster = clusterService .findById (clusterId );
112+ KafkaProducer <String , String > kafkaProducer = clusterService .createProducer (cluster .getServers (), cluster .getSecurityProtocol (), cluster .getSaslMechanism (), cluster .getAuthUsername (), cluster .getAuthPassword ());
113+ ProducerRecord <String , String > producerRecord = new ProducerRecord <>(topic , topicData .getPartition (), topicData .getKey (), topicData .getValue ());
114+ RecordMetadata recordMetadata = kafkaProducer .send (producerRecord ).get ();
115+ return recordMetadata .offset ();
116+ }
117+
118+ public Flux <ServerSentEvent <LiveMessage >> liveData (String clusterId , String topicName , Integer tPartition , String keyFilter , String valueFilter ) {
119+
120+ KafkaConsumer <String , String > kafkaConsumer = clusterService .createConsumer (clusterId );
121+ TopicPartition topicPartition = new TopicPartition (topicName , tPartition );
122+ List <TopicPartition > topicPartitions = Collections .singletonList (topicPartition );
123+ kafkaConsumer .assign (topicPartitions );
124+
125+ Long endOffset = kafkaConsumer .endOffsets (topicPartitions ).get (topicPartition );
126+ kafkaConsumer .seek (topicPartition , endOffset );
127+
128+ return Flux
129+ .interval (Duration .ofSeconds (1 ))
130+ .doFinally (x -> {
131+ kafkaConsumer .close ();
132+ })
133+ .map (sequence -> {
134+
135+ List <ConsumerRecord <String , String >> records = new ArrayList <>();
136+
137+ List <ConsumerRecord <String , String >> polled = kafkaConsumer .poll (Duration .ofMillis (200 )).records (topicPartition );
138+
139+ if (!CollectionUtils .isEmpty (polled )) {
140+
141+ for (ConsumerRecord <String , String > consumerRecord : polled ) {
142+ if (StringUtils .hasText (keyFilter )) {
143+ String key = consumerRecord .key ();
144+ if (StringUtils .hasText (key ) && key .toLowerCase ().contains (keyFilter .toLowerCase ())) {
145+ records .add (consumerRecord );
146+ }
147+ continue ;
148+ }
149+
150+ if (StringUtils .hasText (valueFilter )) {
151+ String value = consumerRecord .value ();
152+ if (StringUtils .hasText (value ) && value .toLowerCase ().contains (valueFilter .toLowerCase ())) {
153+ records .add (consumerRecord );
154+ }
155+ continue ;
156+ }
157+ records .add (consumerRecord );
158+ }
159+ }
160+
161+ List <ConsumerMessage > data = records
85162 .stream ()
86163 .map (record -> {
87164 int partition = record .partition ();
@@ -100,15 +177,21 @@ public List<ConsumerMessage> data(String clusterId, String topicName, Integer tP
100177
101178 return consumerMessage ;
102179 }).collect (Collectors .toList ());
103- }
104- }
105180
106- @ SneakyThrows
107- public long sendData (String clusterId , String topic , TopicData topicData ) {
108- Cluster cluster = clusterService .findById (clusterId );
109- KafkaProducer <String , String > kafkaProducer = clusterService .createProducer (cluster .getServers (), cluster .getSecurityProtocol (), cluster .getSaslMechanism (), cluster .getAuthUsername (), cluster .getAuthPassword ());
110- ProducerRecord <String , String > producerRecord = new ProducerRecord <>(topic ,topicData .getPartition (), topicData .getKey (), topicData .getValue ());
111- RecordMetadata recordMetadata = kafkaProducer .send (producerRecord ).get ();
112- return recordMetadata .offset ();
181+ Long currBeginningOffset = kafkaConsumer .beginningOffsets (topicPartitions ).get (topicPartition );
182+ Long currEndOffset = kafkaConsumer .endOffsets (topicPartitions ).get (topicPartition );
183+
184+ LiveMessage liveMessage = new LiveMessage ();
185+ liveMessage .setBeginningOffset (currBeginningOffset );
186+ liveMessage .setEndOffset (currEndOffset );
187+ liveMessage .setPartition (tPartition );
188+ liveMessage .setMessages (data );
189+
190+ return ServerSentEvent .<LiveMessage >builder ()
191+ .id (String .valueOf (sequence ))
192+ .event ("topic-message-event" )
193+ .data (liveMessage )
194+ .build ();
195+ });
113196 }
114197}
0 commit comments