Skip to content

Commit cc0dc96

Browse files
committed
http_client: refactoring
1 parent 94e94de commit cc0dc96

File tree

11 files changed

+1165
-0
lines changed

11 files changed

+1165
-0
lines changed
Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,8 @@
1+
package types
2+
3+
import "errors"
4+
5+
var (
6+
ErrInvalidLimiterConfig = errors.New("http_client: invalid limiter config")
7+
ErrDeadlineTooClose = errors.New("http_client: deadline too close")
8+
)
Lines changed: 92 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,92 @@
1+
package types
2+
3+
import (
4+
"context"
5+
"fmt"
6+
"net/http"
7+
"time"
8+
9+
"github.com/prometheus/client_golang/prometheus"
10+
)
11+
12+
// Middleware wraps a RoundTripper to add functionality.
13+
// It receives the next RoundTripper in the chain and returns a new RoundTripper
14+
// that may intercept, modify, or observe requests and responses.
15+
type Middleware func(next http.RoundTripper) http.RoundTripper
16+
17+
// RoundTripperFunc allows using a function as a RoundTripper.
18+
type RoundTripperFunc func(*http.Request) (*http.Response, error)
19+
20+
func (f RoundTripperFunc) RoundTrip(r *http.Request) (*http.Response, error) {
21+
return f(r)
22+
}
23+
24+
// Limiter defines the interface for rate limiting.
25+
// Wait blocks until a request can be made according to the rate limit,
26+
// or until the context is cancelled. It returns the total time spent waiting
27+
// and any error that occurred (typically context.Canceled or context.DeadlineExceeded).
28+
type Limiter interface {
29+
Wait(ctx context.Context) (time.Duration, error)
30+
}
31+
32+
const (
33+
LabelClient = "client"
34+
LabelHost = "host"
35+
LabelMethod = "method"
36+
LabelSource = "source"
37+
)
38+
39+
type Metrics struct {
40+
RateLimitWaitSeconds *prometheus.HistogramVec
41+
RateLimit429Total *prometheus.CounterVec
42+
DeadlineCancelledTotal *prometheus.CounterVec
43+
}
44+
45+
func NewMetrics(namespace, subsystem string) *Metrics {
46+
return &Metrics{
47+
RateLimitWaitSeconds: prometheus.NewHistogramVec(
48+
prometheus.HistogramOpts{ //nolint:exhaustruct // Prometheus options have many optional fields
49+
Namespace: namespace,
50+
Subsystem: subsystem,
51+
Name: "rate_limit_wait_seconds",
52+
Help: "Time spent waiting due to rate limiting (server headers or local bucket).",
53+
Buckets: prometheus.DefBuckets,
54+
},
55+
[]string{LabelClient, LabelHost, LabelMethod, LabelSource},
56+
),
57+
RateLimit429Total: prometheus.NewCounterVec(
58+
prometheus.CounterOpts{ //nolint:exhaustruct // Prometheus options have many optional fields
59+
Namespace: namespace,
60+
Subsystem: subsystem,
61+
Name: "rate_limit_429_total",
62+
Help: "Total number of HTTP 429 responses.",
63+
},
64+
[]string{LabelClient, LabelHost, LabelMethod},
65+
),
66+
DeadlineCancelledTotal: prometheus.NewCounterVec(
67+
prometheus.CounterOpts{ //nolint:exhaustruct // Prometheus options have many optional fields
68+
Namespace: namespace,
69+
Subsystem: subsystem,
70+
Name: "deadline_canceled_total",
71+
Help: "Requests canceled early due to short deadline.",
72+
},
73+
[]string{LabelClient, LabelHost, LabelMethod},
74+
),
75+
}
76+
}
77+
78+
func (m *Metrics) Register(reg prometheus.Registerer) error {
79+
if err := reg.Register(m.RateLimitWaitSeconds); err != nil {
80+
return fmt.Errorf("register wait_seconds: %w", err)
81+
}
82+
83+
if err := reg.Register(m.RateLimit429Total); err != nil {
84+
return fmt.Errorf("register 429: %w", err)
85+
}
86+
87+
if err := reg.Register(m.DeadlineCancelledTotal); err != nil {
88+
return fmt.Errorf("register deadline_canceled: %w", err)
89+
}
90+
91+
return nil
92+
}
Lines changed: 179 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,179 @@
1+
package http_client
2+
3+
import (
4+
"context"
5+
"testing"
6+
"time"
7+
8+
"github.com/shortlink-org/go-sdk/http/client/internal/types"
9+
"github.com/stretchr/testify/require"
10+
)
11+
12+
func TestNewTokenBucketLimiter_InvalidConfig(t *testing.T) {
13+
_, err := NewTokenBucketLimiter(0, 10, 0)
14+
require.Error(t, err)
15+
require.Equal(t, types.ErrInvalidLimiterConfig, err)
16+
17+
_, err = NewTokenBucketLimiter(10, 0, 0)
18+
require.Error(t, err)
19+
require.Equal(t, types.ErrInvalidLimiterConfig, err)
20+
21+
_, err = NewTokenBucketLimiter(-1, 10, 0)
22+
require.Error(t, err)
23+
require.Equal(t, types.ErrInvalidLimiterConfig, err)
24+
25+
_, err = NewTokenBucketLimiter(10, -1, 0)
26+
require.Error(t, err)
27+
require.Equal(t, types.ErrInvalidLimiterConfig, err)
28+
}
29+
30+
func TestNewTokenBucketLimiter_JitterNormalization(t *testing.T) {
31+
limiter, err := NewTokenBucketLimiter(10, 10, -0.5)
32+
require.NoError(t, err)
33+
require.NotNil(t, limiter)
34+
35+
limiter2, err := NewTokenBucketLimiter(10, 10, 1.5)
36+
require.NoError(t, err)
37+
require.NotNil(t, limiter2)
38+
}
39+
40+
func TestTokenBucketLimiter_Wait_Immediate(t *testing.T) {
41+
limiter, err := NewTokenBucketLimiter(10, 10, 0)
42+
require.NoError(t, err)
43+
44+
ctx := context.Background()
45+
wait, err := limiter.Wait(ctx)
46+
require.NoError(t, err)
47+
require.Equal(t, time.Duration(0), wait)
48+
}
49+
50+
func TestTokenBucketLimiter_Wait_ContextCancellation(t *testing.T) {
51+
limiter, err := NewTokenBucketLimiter(0.1, 1, 0) // very slow rate
52+
require.NoError(t, err)
53+
54+
// Consume the token
55+
_, err = limiter.Wait(context.Background())
56+
require.NoError(t, err)
57+
58+
// Now we'll have to wait, but cancel the context
59+
ctx, cancel := context.WithCancel(context.Background())
60+
cancel() // cancel immediately
61+
62+
wait, err := limiter.Wait(ctx)
63+
require.Error(t, err)
64+
require.Equal(t, context.Canceled, err)
65+
require.GreaterOrEqual(t, wait, time.Duration(0))
66+
}
67+
68+
func TestTokenBucketLimiter_Wait_ContextTimeout(t *testing.T) {
69+
limiter, err := NewTokenBucketLimiter(0.1, 1, 0) // very slow rate
70+
require.NoError(t, err)
71+
72+
// Consume the token
73+
_, err = limiter.Wait(context.Background())
74+
require.NoError(t, err)
75+
76+
// Now we'll have to wait, but timeout the context
77+
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Millisecond)
78+
defer cancel()
79+
80+
wait, err := limiter.Wait(ctx)
81+
require.Error(t, err)
82+
require.Equal(t, context.DeadlineExceeded, err)
83+
require.GreaterOrEqual(t, wait, time.Duration(0))
84+
}
85+
86+
func TestTokenBucketLimiter_Refill(t *testing.T) {
87+
limiter, err := NewTokenBucketLimiter(10, 5, 0) // 10 tokens/sec, burst 5
88+
require.NoError(t, err)
89+
90+
// Consume all tokens
91+
for i := 0; i < 5; i++ {
92+
_, err = limiter.Wait(context.Background())
93+
require.NoError(t, err)
94+
}
95+
96+
// Wait for refill
97+
time.Sleep(150 * time.Millisecond) // should refill ~1.5 tokens
98+
99+
// Should be able to get at least one token
100+
ctx, cancel := context.WithTimeout(context.Background(), 100*time.Millisecond)
101+
defer cancel()
102+
103+
wait, err := limiter.Wait(ctx)
104+
require.NoError(t, err)
105+
require.Less(t, wait, 100*time.Millisecond)
106+
}
107+
108+
func TestTokenBucketLimiter_Jitter(t *testing.T) {
109+
limiter, err := NewTokenBucketLimiter(1, 1, 0.1) // 10% jitter
110+
require.NoError(t, err)
111+
112+
// Consume the token
113+
_, err = limiter.Wait(context.Background())
114+
require.NoError(t, err)
115+
116+
// Wait should have some jitter applied
117+
// Base wait should be ~1 second, jitter can add up to 10% = 100ms
118+
ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
119+
defer cancel()
120+
121+
wait1, err1 := limiter.Wait(ctx)
122+
require.NoError(t, err1)
123+
124+
// Wait time should be between base (1s) and base + jitter (1.1s)
125+
// but could be slightly more due to timing
126+
baseWait := 1 * time.Second
127+
maxWait := 1200 * time.Millisecond // 1.2 seconds
128+
require.GreaterOrEqual(t, wait1, baseWait*9/10) // at least 90% of base (allowing negative jitter)
129+
require.LessOrEqual(t, wait1, maxWait) // at most 20% more than base
130+
}
131+
132+
func TestTokenBucketLimiter_BurstLimit(t *testing.T) {
133+
limiter, err := NewTokenBucketLimiter(10, 3, 0) // 10 tokens/sec, burst 3
134+
require.NoError(t, err)
135+
136+
// Consume burst
137+
for i := 0; i < 3; i++ {
138+
_, err = limiter.Wait(context.Background())
139+
require.NoError(t, err)
140+
}
141+
142+
// Next wait should take time even though rate is high
143+
// With 10 tokens/sec, we need 0.1s (100ms) to refill 1 token
144+
ctx, cancel := context.WithTimeout(context.Background(), 80*time.Millisecond)
145+
defer cancel()
146+
147+
wait, err := limiter.Wait(ctx)
148+
// Should timeout because we need to wait for refill (~100ms) but timeout is 80ms
149+
require.Error(t, err)
150+
require.Equal(t, context.DeadlineExceeded, err)
151+
// Wait time accumulates during the actual wait and may be close to timeout
152+
// It should be less than the required refill time (~100ms)
153+
require.Less(t, wait, 110*time.Millisecond) // should be less than refill time
154+
require.Greater(t, wait, 50*time.Millisecond) // but more than half timeout
155+
}
156+
157+
func TestTokenBucketLimiter_ConcurrentAccess(t *testing.T) {
158+
limiter, err := NewTokenBucketLimiter(100, 10, 0)
159+
require.NoError(t, err)
160+
161+
ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second)
162+
defer cancel()
163+
164+
done := make(chan error, 10)
165+
166+
// Launch 10 goroutines
167+
for i := 0; i < 10; i++ {
168+
go func() {
169+
_, err := limiter.Wait(ctx)
170+
done <- err
171+
}()
172+
}
173+
174+
// All should succeed
175+
for i := 0; i < 10; i++ {
176+
err := <-done
177+
require.NoError(t, err)
178+
}
179+
}
Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,41 @@
1+
package deadline
2+
3+
import (
4+
"net/http"
5+
"time"
6+
7+
"github.com/shortlink-org/go-sdk/http/client/internal/types"
8+
)
9+
10+
type Config struct {
11+
Threshold time.Duration
12+
Metrics *types.Metrics
13+
Client string
14+
}
15+
16+
func Middleware(cfg Config) types.Middleware {
17+
if cfg.Threshold <= 0 {
18+
return func(next http.RoundTripper) http.RoundTripper { return next }
19+
}
20+
21+
return func(next http.RoundTripper) http.RoundTripper {
22+
return types.RoundTripperFunc(func(req *http.Request) (*http.Response, error) {
23+
ctx := req.Context()
24+
25+
dl, ok := ctx.Deadline()
26+
// Check if deadline is too close, accounting for potential clock skew.
27+
// Using time.Until ensures we check relative to current time.
28+
if ok && time.Until(dl) < cfg.Threshold {
29+
if cfg.Metrics != nil {
30+
cfg.Metrics.DeadlineCancelledTotal.
31+
WithLabelValues(cfg.Client, req.URL.Host, req.Method).
32+
Inc()
33+
}
34+
35+
return nil, types.ErrDeadlineTooClose
36+
}
37+
38+
return next.RoundTrip(req)
39+
})
40+
}
41+
}

0 commit comments

Comments
 (0)