Skip to content

Commit a66ccb2

Browse files
committed
feat(watermill): add watermill integration with observability support
- Add watermill client with Kafka and RabbitMQ backends - Implement OTEL tracing and metrics middleware - Add configurable retry middleware via config - Fix logger adapter With() method to properly merge fields - Add graceful shutdown with error collection - Fix resource leak in subscriber error path - Use explicit meter/tracer providers (DI pattern) - Add metric descriptions and proper units (seconds) - Fix context extraction in PublisherWrapper
1 parent ce2c639 commit a66ccb2

20 files changed

+2510
-0
lines changed

go.work

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,4 +29,5 @@ use (
2929
./transactional_outbox
3030
./types
3131
./uow
32+
./watermill
3233
)

go.work.sum

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@ github.com/Djarvur/go-err113 v0.1.1/go.mod h1:IaWJdYFLg76t2ihfflPZnM1LIQszWOsFDh
3434
github.com/GoogleCloudPlatform/opentelemetry-operations-go/detectors/gcp v1.30.0/go.mod h1:P4WPRUkOhJC13W//jWpyfJNDAIpvRbAUIYLX/4jtlE0=
3535
github.com/MirrexOne/unqueryvet v1.2.1/go.mod h1:IWwCwMQlSWjAIteW0t+28Q5vouyktfujzYznSIWiuOg=
3636
github.com/OpenPeeDeeP/depguard/v2 v2.2.1/go.mod h1:q4DKzC4UcVaAvcfd41CZh0PWpGgzrVxUYBlgKNGquUo=
37+
github.com/Shopify/toxiproxy/v2 v2.5.0/go.mod h1:yhM2epWtAmel9CB8r2+L+PCmhH6yH2pITaPAo7jxJl0=
3738
github.com/alecthomas/chroma/v2 v2.20.0/go.mod h1:e7tViK0xh/Nf4BYHl00ycY6rV7b8iXBksI9E359yNmA=
3839
github.com/alecthomas/go-check-sumtype v0.3.1/go.mod h1:A8TSiN3UPRw3laIgWEUOHHLPa6/r9MtoigdlP5h3K/E=
3940
github.com/alecthomas/kingpin/v2 v2.4.0/go.mod h1:0gyi0zQnjuFk8xrkNKamJoyUo382HRL7ATRpFZCw6tE=
@@ -254,6 +255,7 @@ github.com/shopspring/decimal v1.2.0/go.mod h1:DKyhrW/HYNuLGql+MJL6WCR6knT2jwCFR
254255
github.com/sivchari/containedctx v1.0.3/go.mod h1:c1RDvCbnJLtH4lLcYD/GqwiBSSf4F5Qk0xld2rBqzJ4=
255256
github.com/snowflakedb/gosnowflake v1.6.19/go.mod h1:FM1+PWUdwB9udFDsXdfD58NONC0m+MlOSmQRvimobSM=
256257
github.com/sonatard/noctx v0.4.0/go.mod h1:64XdbzFb18XL4LporKXp8poqZtPKbCrqQ402CV+kJas=
258+
github.com/sony/gobreaker v1.0.0/go.mod h1:ZKptC7FHNvhBz7dN2LGjPVBz2sZJmc0/PkyDJOjmxWY=
257259
github.com/sourcegraph/go-diff v0.7.0/go.mod h1:iBszgVvyxdc8SFZ7gm69go2KDdt3ag071iBaWPF6cjs=
258260
github.com/spf13/jwalterweatherman v1.1.0/go.mod h1:aNWZUN0dPAAO/Ljvb5BEdw96iTZ0EXowPYD95IqWIGo=
259261
github.com/spiffe/go-spiffe/v2 v2.6.0/go.mod h1:gm2SeUoMZEtpnzPNs2Csc0D/gX33k1xIx7lEzqblHEs=

mq/go.sum

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,7 @@ github.com/ClickHouse/clickhouse-go v1.3.12/go.mod h1:EaI/sW7Azgz9UATzd5ZdZHRUhH
4646
github.com/IBM/sarama v1.46.3 h1:njRsX6jNlnR+ClJ8XmkO+CM4unbrNr/2vB5KK6UA+IE=
4747
github.com/IBM/sarama v1.46.3/go.mod h1:GTUYiF9DMOZVe3FwyGT+dtSPceGFIgA+sPc5u6CBwko=
4848
github.com/Masterminds/semver/v3 v3.4.0 h1:Zog+i5UMtVoCU8oKka5P7i9q9HgrJeGzI9SA1Xbatp0=
49+
github.com/Masterminds/semver/v3 v3.4.0/go.mod h1:4V+yj/TJE1HU9XfppCwVMZq3I84lprf4nC11bSS5beM=
4950
github.com/Microsoft/go-winio v0.4.15-0.20190919025122-fc70bd9a86b5/go.mod h1:tTuCMEN+UleMWgg9dVx4Hu52b1bJo+59jBh3ajtinzw=
5051
github.com/Microsoft/go-winio v0.6.2 h1:F2VQgta7ecxGYO8k3ZZz3RS8fVIXVxONVUPlNERoyfY=
5152
github.com/Microsoft/go-winio v0.6.2/go.mod h1:yd8OoFMLzJbo9gZq8j5qaps8bJ9aShtEA8Ipt1oGCvU=
Lines changed: 56 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,56 @@
1+
package kafka
2+
3+
import (
4+
"context"
5+
"time"
6+
)
7+
8+
type contextKey int
9+
10+
const (
11+
_ contextKey = iota
12+
partitionContextKey
13+
partitionOffsetContextKey
14+
timestampContextKey
15+
keyContextKey
16+
)
17+
18+
func setPartitionToCtx(ctx context.Context, partition int32) context.Context {
19+
return context.WithValue(ctx, partitionContextKey, partition)
20+
}
21+
22+
// MessagePartitionFromCtx returns Kafka partition of the consumed message
23+
func MessagePartitionFromCtx(ctx context.Context) (int32, bool) {
24+
partition, ok := ctx.Value(partitionContextKey).(int32)
25+
return partition, ok
26+
}
27+
28+
func setPartitionOffsetToCtx(ctx context.Context, offset int64) context.Context {
29+
return context.WithValue(ctx, partitionOffsetContextKey, offset)
30+
}
31+
32+
// MessagePartitionOffsetFromCtx returns Kafka partition offset of the consumed message
33+
func MessagePartitionOffsetFromCtx(ctx context.Context) (int64, bool) {
34+
offset, ok := ctx.Value(partitionOffsetContextKey).(int64)
35+
return offset, ok
36+
}
37+
38+
func setMessageTimestampToCtx(ctx context.Context, timestamp time.Time) context.Context {
39+
return context.WithValue(ctx, timestampContextKey, timestamp)
40+
}
41+
42+
// MessageTimestampFromCtx returns Kafka internal timestamp of the consumed message
43+
func MessageTimestampFromCtx(ctx context.Context) (time.Time, bool) {
44+
timestamp, ok := ctx.Value(timestampContextKey).(time.Time)
45+
return timestamp, ok
46+
}
47+
48+
func setMessageKeyToCtx(ctx context.Context, key []byte) context.Context {
49+
return context.WithValue(ctx, keyContextKey, key)
50+
}
51+
52+
// MessageKeyFromCtx returns Kafka internal key of the consumed message
53+
func MessageKeyFromCtx(ctx context.Context) ([]byte, bool) {
54+
key, ok := ctx.Value(keyContextKey).([]byte)
55+
return key, ok
56+
}
Lines changed: 94 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,94 @@
1+
package kafka
2+
3+
import (
4+
"github.com/IBM/sarama"
5+
"github.com/ThreeDotsLabs/watermill/message"
6+
"github.com/pkg/errors"
7+
)
8+
9+
const UUIDHeaderKey = "_watermill_message_uuid"
10+
11+
// Marshaler marshals Watermill's message to Kafka message.
12+
type Marshaler interface {
13+
Marshal(topic string, msg *message.Message) (*sarama.ProducerMessage, error)
14+
}
15+
16+
// Unmarshaler unmarshals Kafka's message to Watermill's message.
17+
type Unmarshaler interface {
18+
Unmarshal(*sarama.ConsumerMessage) (*message.Message, error)
19+
}
20+
21+
type MarshalerUnmarshaler interface {
22+
Marshaler
23+
Unmarshaler
24+
}
25+
26+
type DefaultMarshaler struct{}
27+
28+
func (DefaultMarshaler) Marshal(topic string, msg *message.Message) (*sarama.ProducerMessage, error) {
29+
if value := msg.Metadata.Get(UUIDHeaderKey); value != "" {
30+
return nil, errors.Errorf("metadata %s is reserved by watermill for message UUID", UUIDHeaderKey)
31+
}
32+
33+
headers := []sarama.RecordHeader{{
34+
Key: []byte(UUIDHeaderKey),
35+
Value: []byte(msg.UUID),
36+
}}
37+
for key, value := range msg.Metadata {
38+
headers = append(headers, sarama.RecordHeader{
39+
Key: []byte(key),
40+
Value: []byte(value),
41+
})
42+
}
43+
44+
return &sarama.ProducerMessage{
45+
Topic: topic,
46+
Value: sarama.ByteEncoder(msg.Payload),
47+
Headers: headers,
48+
}, nil
49+
}
50+
51+
func (DefaultMarshaler) Unmarshal(kafkaMsg *sarama.ConsumerMessage) (*message.Message, error) {
52+
var messageID string
53+
metadata := make(message.Metadata, len(kafkaMsg.Headers))
54+
55+
for _, header := range kafkaMsg.Headers {
56+
if string(header.Key) == UUIDHeaderKey {
57+
messageID = string(header.Value)
58+
} else {
59+
metadata.Set(string(header.Key), string(header.Value))
60+
}
61+
}
62+
63+
msg := message.NewMessage(messageID, kafkaMsg.Value)
64+
msg.Metadata = metadata
65+
66+
return msg, nil
67+
}
68+
69+
type GeneratePartitionKey func(topic string, msg *message.Message) (string, error)
70+
71+
type kafkaJsonWithPartitioning struct {
72+
DefaultMarshaler
73+
74+
generatePartitionKey GeneratePartitionKey
75+
}
76+
77+
func NewWithPartitioningMarshaler(generatePartitionKey GeneratePartitionKey) MarshalerUnmarshaler {
78+
return kafkaJsonWithPartitioning{generatePartitionKey: generatePartitionKey}
79+
}
80+
81+
func (j kafkaJsonWithPartitioning) Marshal(topic string, msg *message.Message) (*sarama.ProducerMessage, error) {
82+
kafkaMsg, err := j.DefaultMarshaler.Marshal(topic, msg)
83+
if err != nil {
84+
return nil, err
85+
}
86+
87+
key, err := j.generatePartitionKey(topic, msg)
88+
if err != nil {
89+
return nil, errors.Wrap(err, "cannot generate partition key")
90+
}
91+
kafkaMsg.Key = sarama.ByteEncoder(key)
92+
93+
return kafkaMsg, nil
94+
}
Lines changed: 118 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,118 @@
1+
package kafka_test
2+
3+
import (
4+
"testing"
5+
6+
"github.com/IBM/sarama"
7+
"github.com/stretchr/testify/assert"
8+
"github.com/stretchr/testify/require"
9+
10+
"github.com/ThreeDotsLabs/watermill"
11+
"github.com/ThreeDotsLabs/watermill-kafka/v3/pkg/kafka"
12+
"github.com/ThreeDotsLabs/watermill/message"
13+
)
14+
15+
func TestDefaultMarshaler_MarshalUnmarshal(t *testing.T) {
16+
m := kafka.DefaultMarshaler{}
17+
18+
msg := message.NewMessage(watermill.NewUUID(), []byte("payload"))
19+
msg.Metadata.Set("foo", "bar")
20+
21+
marshaled, err := m.Marshal("topic", msg)
22+
require.NoError(t, err)
23+
24+
unmarshaledMsg, err := m.Unmarshal(producerToConsumerMessage(marshaled))
25+
require.NoError(t, err)
26+
27+
assert.True(t, msg.Equals(unmarshaledMsg))
28+
}
29+
30+
func BenchmarkDefaultMarshaler_Marshal(b *testing.B) {
31+
m := kafka.DefaultMarshaler{}
32+
33+
msg := message.NewMessage(watermill.NewUUID(), []byte("payload"))
34+
msg.Metadata.Set("foo", "bar")
35+
36+
for i := 0; i < b.N; i++ {
37+
_, _ = m.Marshal("foo", msg)
38+
}
39+
}
40+
41+
func BenchmarkDefaultMarshaler_Unmarshal(b *testing.B) {
42+
m := kafka.DefaultMarshaler{}
43+
44+
msg := message.NewMessage(watermill.NewUUID(), []byte("payload"))
45+
msg.Metadata.Set("foo", "bar")
46+
47+
marshaled, err := m.Marshal("foo", msg)
48+
if err != nil {
49+
b.Fatal(err)
50+
}
51+
52+
consumedMsg := producerToConsumerMessage(marshaled)
53+
54+
for i := 0; i < b.N; i++ {
55+
_, _ = m.Unmarshal(consumedMsg)
56+
}
57+
}
58+
59+
func TestWithPartitioningMarshaler_MarshalUnmarshal(t *testing.T) {
60+
m := kafka.NewWithPartitioningMarshaler(func(topic string, msg *message.Message) (string, error) {
61+
return msg.Metadata.Get("partition"), nil
62+
})
63+
64+
partitionKey := "1"
65+
msg := message.NewMessage(watermill.NewUUID(), []byte("payload"))
66+
msg.Metadata.Set("partition", partitionKey)
67+
68+
producerMsg, err := m.Marshal("topic", msg)
69+
require.NoError(t, err)
70+
71+
unmarshaledMsg, err := m.Unmarshal(producerToConsumerMessage(producerMsg))
72+
require.NoError(t, err)
73+
74+
assert.True(t, msg.Equals(unmarshaledMsg))
75+
76+
assert.NoError(t, err)
77+
78+
producerKey, err := producerMsg.Key.Encode()
79+
require.NoError(t, err)
80+
81+
assert.Equal(t, string(producerKey), partitionKey)
82+
}
83+
84+
func producerToConsumerMessage(producerMessage *sarama.ProducerMessage) *sarama.ConsumerMessage {
85+
var key []byte
86+
87+
if producerMessage.Key != nil {
88+
var err error
89+
key, err = producerMessage.Key.Encode()
90+
if err != nil {
91+
panic(err)
92+
}
93+
}
94+
95+
var value []byte
96+
if producerMessage.Value != nil {
97+
var err error
98+
value, err = producerMessage.Value.Encode()
99+
if err != nil {
100+
panic(err)
101+
}
102+
}
103+
104+
var headers []*sarama.RecordHeader
105+
for i := range producerMessage.Headers {
106+
headers = append(headers, &producerMessage.Headers[i])
107+
}
108+
109+
return &sarama.ConsumerMessage{
110+
Key: key,
111+
Value: value,
112+
Topic: producerMessage.Topic,
113+
Partition: producerMessage.Partition,
114+
Offset: producerMessage.Offset,
115+
Timestamp: producerMessage.Timestamp,
116+
Headers: headers,
117+
}
118+
}

0 commit comments

Comments
 (0)