Skip to content

Commit c70e0a0

Browse files
WheezyxMateusz Wedel
andauthored
Allow configuring UDP packets buffer in receiver. (#760)
* Allow configuring UDP packets buffer in receiver. * Fix mixed descriptions * Update CHANGELOG.md --------- Co-authored-by: Mateusz Wedel <[email protected]>
1 parent 42d3b33 commit c70e0a0

File tree

11 files changed

+37
-18
lines changed

11 files changed

+37
-18
lines changed

CHANGELOG.md

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,7 @@
1+
40.1.0
2+
------
3+
- Add support for configuration of receiver's buffer size - `receive-buffer-size`
4+
15
40.0.5
26
------
37
- Clean up a bunch of old stuff in the repo

README.md

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -98,6 +98,8 @@ This configuration mode allows the following configuration options:
9898
Defaults to `false`.
9999
- `receive-batch-size`: the number of datagrams to attempt to read. It is more CPU efficient to read multiple, however
100100
it takes extra memory. See [Memory allocation for read buffers] section below for details. Defaults to 50.
101+
- `receive-buffer-size`: the size of single buffer that's used to store datagram during read. Number of buffers is defined
102+
by `receive-batch-size`. Default to 65535 what is theoretical maximum UDP packet size.
101103
- `conn-per-reader`: attempts to create a connection for every UDP receiver. Not supported by all OS versions. It will
102104
be ignored when unix sockets are used for the connection.
103105
Defaults to `false`.
@@ -398,7 +400,7 @@ Memory allocation for read buffers
398400
By default `gostatsd` will batch read multiple packets to optimise read performance. The amount of memory allocated
399401
for these read buffers is determined by the config options:
400402

401-
max-readers * receive-batch-size * 64KB (max packet size)
403+
max-readers * receive-batch-size * receive-buffer-size
402404

403405
The metric `avg_packets_in_batch` can be used to track the average number of datagrams received per batch, and the
404406
`--receive-batch-size` flag used to tune it. There may be some benefit to tuning the `--max-readers` flag as well.

cmd/gostatsd/main.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -166,6 +166,7 @@ func constructServer(v *viper.Viper) (*statsd.Server, error) {
166166
PercentThreshold: pt,
167167
HeartbeatEnabled: v.GetBool(gostatsd.ParamHeartbeatEnabled),
168168
ReceiveBatchSize: v.GetInt(gostatsd.ParamReceiveBatchSize),
169+
ReceiveBufferSize: v.GetInt(gostatsd.ParamReceiveBufferSize),
169170
ConnPerReader: v.GetBool(gostatsd.ParamConnPerReader),
170171
ServerMode: v.GetString(gostatsd.ParamServerMode),
171172
LogRawMetric: v.GetBool(gostatsd.ParamLogRawMetric),

cmd/lambda-extension/main.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -87,6 +87,7 @@ func NewServer(v *viper.Viper, logger logrus.FieldLogger) *statsd.Server {
8787
StatserType: v.GetString(gostatsd.ParamStatserType),
8888
HeartbeatEnabled: v.GetBool(gostatsd.ParamHeartbeatEnabled),
8989
ReceiveBatchSize: v.GetInt(gostatsd.ParamReceiveBatchSize),
90+
ReceiveBufferSize: v.GetInt(gostatsd.ParamReceiveBufferSize),
9091
ConnPerReader: v.GetBool(gostatsd.ParamConnPerReader),
9192
ServerMode: GostatsdForwarderMode,
9293
LogRawMetric: v.GetBool(gostatsd.ParamLogRawMetric),

cmd/tester/main.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,7 @@ func main() {
4949
MaxQueueSize: gostatsd.DefaultMaxQueueSize,
5050
PercentThreshold: gostatsd.DefaultPercentThreshold,
5151
ReceiveBatchSize: gostatsd.DefaultReceiveBatchSize,
52+
ReceiveBufferSize: gostatsd.DefaultReceiveBufferSize,
5253
Viper: viper.New(),
5354
}
5455
ctx, cancelFunc := context.WithTimeout(context.Background(), time.Duration(s.Benchmark)*time.Second)

defaults_and_params.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -79,6 +79,8 @@ const (
7979
DefaultHeartbeatEnabled = false
8080
// DefaultReceiveBatchSize is the number of datagrams to read in each receive batch
8181
DefaultReceiveBatchSize = 50
82+
// DefaultReceiveBufferSize is the number of size of a buffer for each datagram during reads
83+
DefaultReceiveBufferSize = 0xffff
8284
// DefaultEstimatedTags is the estimated number of expected tags on an individual metric submitted externally
8385
DefaultEstimatedTags = 4
8486
// DefaultConnPerReader is the default for whether to create a connection per reader
@@ -162,6 +164,8 @@ const (
162164
ParamHeartbeatEnabled = "heartbeat-enabled"
163165
// ParamReceiveBatchSize is the name of the parameter with the number of datagrams to read in each receive batch
164166
ParamReceiveBatchSize = "receive-batch-size"
167+
// ParamReceiveBufferSize is the name of the parameter with the number that defines size of buffer of each datagram during reads
168+
ParamReceiveBufferSize = "receive-buffer-size"
165169
// ParamConnPerReader is the name of the parameter indicating whether to create a connection per reader
166170
ParamConnPerReader = "conn-per-reader"
167171
// ParamBadLineRateLimitPerMinute is the name of the parameter indicating how many bad lines can be logged per minute
@@ -216,6 +220,7 @@ func AddFlags(fs *pflag.FlagSet) {
216220
fs.String(ParamPercentThreshold, strings.Join(toStringSlice(DefaultPercentThreshold), " "), "Space separated list of percentiles")
217221
fs.Bool(ParamHeartbeatEnabled, DefaultHeartbeatEnabled, "Enables heartbeat")
218222
fs.Int(ParamReceiveBatchSize, DefaultReceiveBatchSize, "The number of datagrams to read in each receive batch")
223+
fs.Int(ParamReceiveBufferSize, DefaultReceiveBufferSize, "The number that defines size of buffer of each datagram during reads")
219224
fs.Bool(ParamConnPerReader, DefaultConnPerReader, "Create a separate connection per reader (requires system support for reusing addresses)")
220225
fs.String(ParamServerMode, DefaultServerMode, "The server mode to run in")
221226
fs.String(ParamHostname, getHost(), "overrides the hostname of the server")

internal/awslambda/extension/manager_integration_test.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -128,6 +128,7 @@ func createStatsDServer(apiAddr string, fc flush.Coordinator, log logrus.FieldLo
128128
MaxParsers: 1,
129129
MaxWorkers: 1,
130130
ReceiveBatchSize: 50,
131+
ReceiveBufferSize: 0xffff,
131132
MetricsAddr: l.LocalAddr().String(),
132133
ServerMode: "forwarder",
133134
Viper: v,

pkg/statsd/receiver.go

Lines changed: 14 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -16,10 +16,6 @@ import (
1616
"github.com/atlassian/gostatsd/pkg/stats"
1717
)
1818

19-
// ip packet size is stored in two bytes and that is how big in theory the packet can be.
20-
// In practice it is highly unlikely but still possible to get packets bigger than usual MTU of 1500.
21-
const packetSizeUDP = 0xffff
22-
2319
// DatagramReceiver receives datagrams on its PacketConn and passes them off to be parsed
2420
type DatagramReceiver struct {
2521
// Counter fields below must be read/written only using atomic instructions.
@@ -31,20 +27,26 @@ type DatagramReceiver struct {
3127
bufPool *pool.DatagramBufferPool
3228

3329
receiveBatchSize int // The number of datagrams to read in each batch
34-
numReaders int
35-
socketFactory SocketFactory
30+
31+
// ip packet size is stored in two bytes and that is how big in theory the packet can be.
32+
// In practice it is highly unlikely but still possible to get packets bigger than usual MTU of 1500.
33+
// Defaults to 64kB
34+
receiveBufferSize int // size of each buffer used to read datagrams
35+
numReaders int
36+
socketFactory SocketFactory
3637

3738
out chan<- []*Datagram // Output chan of read datagram batches
3839
}
3940

4041
// NewDatagramReceiver initialises a new DatagramReceiver.
41-
func NewDatagramReceiver(out chan<- []*Datagram, sf SocketFactory, numReaders, receiveBatchSize int) *DatagramReceiver {
42+
func NewDatagramReceiver(out chan<- []*Datagram, sf SocketFactory, numReaders, receiveBatchSize int, receiveBufferSize int) *DatagramReceiver {
4243
return &DatagramReceiver{
43-
out: out,
44-
receiveBatchSize: receiveBatchSize,
45-
numReaders: numReaders,
46-
socketFactory: sf,
47-
bufPool: pool.NewDatagramBufferPool(packetSizeUDP),
44+
out: out,
45+
receiveBatchSize: receiveBatchSize,
46+
receiveBufferSize: receiveBufferSize,
47+
numReaders: numReaders,
48+
socketFactory: sf,
49+
bufPool: pool.NewDatagramBufferPool(receiveBufferSize),
4850
}
4951
}
5052

pkg/statsd/receiver_test.go

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,7 @@ func BenchmarkReceive(b *testing.B) {
2828
//
2929
// ... so this is pretty arbitrary.
3030
ch := make(chan []*Datagram, 5000)
31-
mr := NewDatagramReceiver(ch, nil, 0, gostatsd.DefaultReceiveBatchSize)
31+
mr := NewDatagramReceiver(ch, nil, 0, gostatsd.DefaultReceiveBatchSize, gostatsd.DefaultReceiveBufferSize)
3232
c, done := fakesocket.NewCountedFakePacketConn(uint64(b.N))
3333

3434
var wg sync.WaitGroup
@@ -74,7 +74,7 @@ func BenchmarkReceive(b *testing.B) {
7474

7575
func TestDatagramReceiver_Receive(t *testing.T) {
7676
ch := make(chan []*Datagram, 1)
77-
mr := NewDatagramReceiver(ch, nil, 0, 2)
77+
mr := NewDatagramReceiver(ch, nil, 0, 2, gostatsd.DefaultReceiveBufferSize)
7878
c := fakesocket.NewFakePacketConn()
7979

8080
ctx, cancel := context.WithCancel(context.Background())
@@ -103,7 +103,7 @@ func TestDatagramReceiver_UnixSocketConnection(t *testing.T) {
103103

104104
// Datagram receiver listening in Unix Domain Socket
105105
socketPath := os.TempDir() + "/gostatsd_receiver_test_receive_uds.sock"
106-
mr := NewDatagramReceiver(ch, socketFactory(socketPath, false), 1, 2)
106+
mr := NewDatagramReceiver(ch, socketFactory(socketPath, false), 1, 2, gostatsd.DefaultReceiveBatchSize)
107107
ctx, cancel := context.WithCancel(context.Background())
108108

109109
var wg sync.WaitGroup
@@ -137,7 +137,7 @@ func TestDatagramReceiver_UnixSocketIsRemovedOnContextCancellation(t *testing.T)
137137
ch := make(chan []*Datagram, 1)
138138

139139
socketPath := os.TempDir() + "/gostatsd_receiver_test_receive_uds.sock"
140-
mr := NewDatagramReceiver(ch, socketFactory(socketPath, false), 1, 2)
140+
mr := NewDatagramReceiver(ch, socketFactory(socketPath, false), 1, 2, gostatsd.DefaultReceiveBatchSize)
141141
ctx, cancel := context.WithCancel(context.Background())
142142

143143
var wg sync.WaitGroup

pkg/statsd/statsd.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,7 @@ type Server struct {
5454
HeartbeatEnabled bool
5555
HeartbeatTags gostatsd.Tags
5656
ReceiveBatchSize int
57+
ReceiveBufferSize int
5758
DisabledSubTypes gostatsd.TimerSubtypes
5859
HistogramLimit uint32
5960
BadLineRateLimitPerSecond rate.Limit
@@ -196,7 +197,7 @@ func (s *Server) RunWithCustomSocket(ctx context.Context, sf SocketFactory) erro
196197
}
197198

198199
// Create the Receiver
199-
receiver := NewDatagramReceiver(datagrams, sf, s.MaxReaders, s.ReceiveBatchSize)
200+
receiver := NewDatagramReceiver(datagrams, sf, s.MaxReaders, s.ReceiveBatchSize, s.ReceiveBufferSize)
200201
runnables = gostatsd.MaybeAppendRunnable(runnables, receiver)
201202

202203
// Create the Statser

0 commit comments

Comments
 (0)