Skip to content

Commit c5d4e2f

Browse files
committed
docs(watermill): translate README to English
1 parent a66ccb2 commit c5d4e2f

File tree

1 file changed

+106
-0
lines changed

1 file changed

+106
-0
lines changed

watermill/README.md

Lines changed: 106 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,106 @@
1+
# Watermill SDK wrapper
2+
3+
A lightweight wrapper around [ThreeDotsLabs/watermill](https://watermill.io) that integrates observability and is ready for production use in Shortlink services.
4+
5+
## Features
6+
7+
- **Ready-to-use `Client`**: internally sets up `message.Router`, configures logger, global middleware (panic/retry/correlation), metrics, and OTEL tracing.
8+
- **Metrics + exemplars**: publish/consume counters and histograms with automatic `topic`, `trace_id`, `span_id` attributes.
9+
- **Tracing**: middleware extracts context from Watermill metadata, creates `watermill.consume` span and propagates context. On publish, creates `watermill.publish` span and writes TraceID/SpanID to message metadata.
10+
- **DLQ**: optional configuration sends messages after N errors to `<topic>.DLQ` with `DLQMessage` body (payload + metadata + error text + retry counter).
11+
- **Kafka backend**: `backends/kafka` contains a slight-fork wrapper of Watermill Kafka (publisher/subscriber + OTEL tracer). RabbitMQ is not yet implemented (stub).
12+
13+
## Installation
14+
15+
Add the module as a dependency:
16+
17+
```bash
18+
go get github.com/shortlink-org/go-sdk/watermill
19+
```
20+
21+
## Quick Start
22+
23+
```go
24+
ctx := context.Background()
25+
cfg := config.New() // github.com/shortlink-org/go-sdk/config
26+
log := logger.New(ctx) // github.com/shortlink-org/go-sdk/logger
27+
meter := monitoring.Metrics // see observability/metrics
28+
tracer := tracing.TracerProvider // see observability/tracing
29+
30+
backend, _ := kafka.NewSubscriber(...)
31+
client, err := watermill.New(ctx, log, cfg, backend, meter, tracer)
32+
if err != nil {
33+
log.Fatal(err)
34+
}
35+
36+
handler := client.Router.AddHandler(
37+
"example",
38+
"input.topic",
39+
client.Subscriber,
40+
"output.topic",
41+
client.Publisher,
42+
func(msg *message.Message) ([]*message.Message, error) {
43+
// business logic
44+
return nil, nil
45+
},
46+
)
47+
48+
go client.Router.Run(ctx)
49+
defer client.Close()
50+
```
51+
52+
## Configuration
53+
54+
Values are read from `github.com/shortlink-org/go-sdk/config.Config` (Viper). Important keys:
55+
56+
| Key | Default | Description |
57+
|-----|---------|-------------|
58+
| `WATERMILL_RETRY_MAX_RETRIES` | `3` | number of retries in retry middleware |
59+
| `WATERMILL_RETRY_INITIAL_INTERVAL` | `150ms` | initial retry interval |
60+
| `WATERMILL_RETRY_MAX_INTERVAL` | `2s` | maximum interval between retries |
61+
| `WATERMILL_RETRY_MULTIPLIER` | `2.0` | exponential backoff multiplier |
62+
| `WATERMILL_DLQ_ENABLED` | `false` | enable DLQ middleware |
63+
| `WATERMILL_DLQ_MAX_RETRIES` | `5` | number of errors allowed before sending to `<topic>.DLQ` |
64+
65+
## DLQ Message
66+
67+
After exceeding `maxRetries`, a JSON message is sent to `<topic>.DLQ`:
68+
69+
```json
70+
{
71+
"topic": "my-topic",
72+
"payload": "... base64 ...",
73+
"metadata": {"received_topic": "my-topic", "watermill_dlq_retry_count": "5"},
74+
"error": "handler error string",
75+
"retry_count": 5,
76+
"original_uuid": "<source message uuid>"
77+
}
78+
```
79+
80+
TraceID/SpanID are preserved in metadata, so downstream consumers can continue the trace.
81+
82+
## Observability
83+
84+
- **Metrics** — published via the provided `metric.MeterProvider`. Names:
85+
- `watermill_messages_published_total`
86+
- `watermill_messages_consumed_total`
87+
- `watermill_messages_failed_total`
88+
- `watermill_publish_latency_seconds`
89+
- `watermill_consume_latency_seconds`
90+
All metrics have `topic`, `trace_id`, `span_id` attributes. Errors are additionally tagged with `stage=publish|consume` and `error` (truncated to 128 characters).
91+
92+
- **Tracing** — requires `trace.TracerProvider`. Middleware automatically extracts/injects context in Watermill metadata (`otel_trace_id`, `otel_span_id`).
93+
94+
## Kafka Backend
95+
96+
The `backends/kafka` directory contains a driver copied from Watermill with several improvements:
97+
98+
- OTEL support via `otelsarama`
99+
- helper functions for contexts (partition/offset/timestamp)
100+
101+
Usage is similar to upstream Watermill. See tests in `backends/kafka/pubsub_test.go` and configuration via `SubscriberConfig`/`PublisherConfig`.
102+
103+
## Limitations
104+
105+
- RabbitMQ backend is not yet implemented (file `backends/rabbit/rabbit.go`).
106+
- Limited automated tests in the package (use Watermill upstream tests for validation). Before release, run `go test ./watermill/...`.

0 commit comments

Comments
 (0)