Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions accesslogs/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,7 @@ type Options struct {
QueueLimit int `user:"true" help:"log file upload queue limit" default:"100"`
RetryLimit int `user:"true" help:"maximum number of retries for log file uploads" default:"3"`
ShutdownTimeout time.Duration `user:"true" help:"time limit waiting for queued logs to finish uploading when gateway is shutting down" default:"1m"`
UploadTimeout time.Duration `user:"true" help:"time limit for each individual log file upload" default:"30s"`
}
}

Expand All @@ -118,6 +119,9 @@ func NewProcessor(log *zap.Logger, opts Options) *Processor {
if opts.UploadingOptions.ShutdownTimeout <= 0 {
opts.UploadingOptions.ShutdownTimeout = time.Minute
}
if opts.UploadingOptions.UploadTimeout <= 0 {
opts.UploadingOptions.UploadTimeout = 30 * time.Second
}

return &Processor{
log: log,
Expand All @@ -126,6 +130,7 @@ func NewProcessor(log *zap.Logger, opts Options) *Processor {
queueLimit: opts.UploadingOptions.QueueLimit,
retryLimit: opts.UploadingOptions.RetryLimit,
shutdownTimeout: opts.UploadingOptions.ShutdownTimeout,
uploadTimeout: opts.UploadingOptions.UploadTimeout,
}),

defaultEntryLimit: opts.DefaultEntryLimit,
Expand Down
43 changes: 25 additions & 18 deletions accesslogs/uploader.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,11 +65,12 @@ type uploader interface {
var _ uploader = (*sequentialUploader)(nil)

// upload describes a single queued log-file upload: the destination
// (store, bucket, key), the payload, and per-item bookkeeping.
//
// NOTE: the scraped diff rendered both the pre- and post-change field
// lists here, producing duplicate field declarations; this is the
// post-change (added-lines) version of the struct.
type upload struct {
	store   Storage // backend that receives the object
	bucket  string  // destination bucket
	key     string  // object key within the bucket
	body    []byte  // file contents to write
	retries int     // attempts made so far; compared against retryLimit
	// uploadTimeout bounds each individual store.Put call made for this
	// upload via context.WithTimeout in the uploader's run loop.
	uploadTimeout time.Duration
}

type sequentialUploader struct {
Expand All @@ -79,6 +80,7 @@ type sequentialUploader struct {
queueLimit int
retryLimit int
shutdownTimeout time.Duration
uploadTimeout time.Duration

mu sync.Mutex
queue chan upload
Expand All @@ -94,6 +96,7 @@ type sequentialUploaderOptions struct {
queueLimit int
retryLimit int
shutdownTimeout time.Duration
uploadTimeout time.Duration
}

func newSequentialUploader(log *zap.Logger, opts sequentialUploaderOptions) *sequentialUploader {
Expand All @@ -103,6 +106,7 @@ func newSequentialUploader(log *zap.Logger, opts sequentialUploaderOptions) *seq
queueLimit: opts.queueLimit,
retryLimit: opts.retryLimit,
shutdownTimeout: opts.shutdownTimeout,
uploadTimeout: opts.uploadTimeout,
queue: make(chan upload, opts.queueLimit),
}
}
Expand All @@ -129,11 +133,12 @@ func (u *sequentialUploader) queueUpload(store Storage, bucket, key string, body
u.mu.Unlock()

u.queue <- upload{
store: store,
bucket: bucket,
key: key,
body: body,
retries: 0,
store: store,
bucket: bucket,
key: key,
body: body,
retries: 0,
uploadTimeout: u.uploadTimeout,
}

return nil
Expand All @@ -154,11 +159,12 @@ func (u *sequentialUploader) queueUploadWithoutQueueLimit(store Storage, bucket,
u.mu.Unlock()

u.queue <- upload{
store: store,
bucket: bucket,
key: key,
body: body,
retries: 0,
store: store,
bucket: bucket,
key: key,
body: body,
retries: 0,
uploadTimeout: u.uploadTimeout,
}

return nil
Expand Down Expand Up @@ -192,9 +198,10 @@ func (u *sequentialUploader) run() error {
for {
select {
case up := <-u.queue:
// TODO(artur): we need to figure out what context we want
// to pass here. WithTimeout(Background, …)?
if err := up.store.Put(context.TODO(), up.bucket, up.key, up.body); err != nil {
ctx, cancel := context.WithTimeout(context.Background(), up.uploadTimeout)
err := up.store.Put(ctx, up.bucket, up.key, up.body)
cancel()
if err != nil {
if up.retries == u.retryLimit {
mon.Event("upload_dropped")
u.log.Error("retry limit reached",
Expand Down
40 changes: 40 additions & 0 deletions accesslogs/uploader_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ func TestLimits(t *testing.T) {
queueLimit: 2,
retryLimit: 1,
shutdownTimeout: time.Second,
uploadTimeout: time.Minute,
})

for range 2 {
Expand All @@ -57,6 +58,7 @@ func TestQueueNoLimit(t *testing.T) {
queueLimit: 2,
retryLimit: 1,
shutdownTimeout: time.Second,
uploadTimeout: time.Minute,
})
defer ctx.Check(u.close)
ctx.Go(u.run)
Expand Down Expand Up @@ -88,6 +90,7 @@ func TestQueueNoLimitErroringStorage(t *testing.T) {
queueLimit: 10,
retryLimit: 1,
shutdownTimeout: time.Second,
uploadTimeout: time.Minute,
})
defer ctx.Check(u.close)
ctx.Go(u.run)
Expand All @@ -112,6 +115,7 @@ func TestQueueErroringStorage(t *testing.T) {
queueLimit: 10,
retryLimit: 1,
shutdownTimeout: time.Second,
uploadTimeout: time.Minute,
})
defer ctx.Check(u.close)
ctx.Go(u.run)
Expand All @@ -120,3 +124,39 @@ func TestQueueErroringStorage(t *testing.T) {
require.NoError(t, u.queueUpload(s, "test", "test", testrand.Bytes(memory.KiB)))
}
}

type slowStorage struct {
delay time.Duration
}

func (s slowStorage) Put(ctx context.Context, bucket, key string, data []byte) error {
select {
case <-time.After(s.delay):
return nil
case <-ctx.Done():
return ctx.Err()
}
}

// TestUploadTimeout exercises the per-upload deadline: the storage
// backend stalls for longer than uploadTimeout, so the queued upload's
// context must expire instead of wedging the uploader, and close must
// still complete within shutdownTimeout.
func TestUploadTimeout(t *testing.T) {
	t.Parallel()

	ctx := testcontext.New(t)
	defer ctx.Cleanup()

	log := zaptest.NewLogger(t)
	defer ctx.Check(log.Sync)

	// The backend stalls for 2s while each upload is limited to 500ms,
	// so the per-upload timeout fires well before Put would return.
	storage := slowStorage{delay: 2 * time.Second}
	uploader := newSequentialUploader(log, sequentialUploaderOptions{
		entryLimit:      5 * memory.KiB,
		queueLimit:      10,
		retryLimit:      1,
		shutdownTimeout: 5 * time.Second,
		uploadTimeout:   500 * time.Millisecond,
	})
	defer ctx.Check(uploader.close)
	ctx.Go(uploader.run)

	require.NoError(t, uploader.queueUpload(storage, "test", "test", testrand.Bytes(memory.KiB)))
}