Skip to content

Commit 2b99cf7

Browse files
committed
update: use UNLINK and chunking keys to delete task keys
1 parent 21b33e1 commit 2b99cf7

File tree

6 files changed

+36
-23
lines changed

6 files changed

+36
-23
lines changed

cmd/root.go

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -297,10 +297,11 @@ func mustInitializeQueueBackend() {
297297
queueBackend, err = backendfactory.NewBackend(logger, backendconfig.Config{
298298
BackendType: cmdOpts.Backend,
299299
Redis: &backendconfig.RedisConfig{
300-
KeyPrefix: cmdOpts.Redis.KeyPrefix,
301-
Client: cmdOpts.Redis.NewClient(),
302-
Backoff: cmdOpts.Redis.Backoff,
303-
ChunkSizeInGet: cmdOpts.Redis.ChunkSizeInGet,
300+
KeyPrefix: cmdOpts.Redis.KeyPrefix,
301+
Client: cmdOpts.Redis.NewClient(),
302+
Backoff: cmdOpts.Redis.Backoff,
303+
ChunkSizeInGet: cmdOpts.Redis.ChunkSizeInGet,
304+
ChunkSizeInDelete: cmdOpts.Redis.ChunkSizeInDelete,
304305
},
305306
})
306307

pkg/backend/config/config.go

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -30,10 +30,11 @@ type Config struct {
3030
}
3131

3232
type RedisConfig struct {
33-
KeyPrefix string
34-
Client *redis.Client
35-
Backoff BackoffConfig
36-
ChunkSizeInGet int
33+
KeyPrefix string
34+
Client *redis.Client
35+
Backoff BackoffConfig
36+
ChunkSizeInGet int
37+
ChunkSizeInDelete int
3738
}
3839

3940
// TODO: support UniversalOptions
@@ -52,7 +53,8 @@ type RedisClientConfig struct {
5253
IdleTimeout time.Duration `json:"idleTimeout" yaml:"idleTimeout" default:"5m"`
5354
IdleCheckFrequency time.Duration `json:"idleCheckFrequency" yaml:"idleCheckFrequency" default:"1m"`
5455

55-
ChunkSizeInGet int `json:"chunkSizeInGet" yaml:"chunkSizeInGet" default:"10000"`
56+
ChunkSizeInGet int `json:"chunkSizeInGet" yaml:"chunkSizeInGet" default:"10000"`
57+
ChunkSizeInDelete int `json:"chunkSizeInDelete" yaml:"chunkSizeInGet" default:"1000"`
5658
}
5759

5860
func (c RedisClientConfig) NewClient() *redis.Client {

pkg/backend/redis/queue.go

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -218,7 +218,7 @@ func (b *Backend) DeleteQueue(ctx context.Context, queueName string) error {
218218
// .. task_keys = collect task keys
219219
// WATCh task_keys
220220
// MULTI
221-
// DEL {queue_key} worker_keys task_keys
221+
// UNLINK {queue_key} worker_keys task_keys
222222
// HDEL {all_queues_key} {queueName}
223223
// EXEC
224224
txf := func(tx *redis.Tx) error {
@@ -240,8 +240,16 @@ func (b *Backend) DeleteQueue(ctx context.Context, queueName string) error {
240240
tx.Watch(taskKeysToDelete...)
241241
keysToDelete = append(keysToDelete, taskKeysToDelete...)
242242

243+
chunkSize := b.ChunkSizeInGet
244+
numOfKeysToDelete := len(keysToDelete)
243245
_, err = tx.TxPipelined(func(pipe redis.Pipeliner) error {
244-
pipe.Del(keysToDelete...)
246+
for begin := 0; begin < numOfKeysToDelete; begin += chunkSize {
247+
end := begin + chunkSize
248+
if end > numOfKeysToDelete {
249+
end = numOfKeysToDelete
250+
}
251+
pipe.Unlink(keysToDelete[begin:end]...)
252+
}
245253
pipe.HDel(b.allQueuesKey(), queue.Spec.Name)
246254
return nil
247255
})

pkg/backend/redis/redis_test.go

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -107,10 +107,11 @@ var _ = Describe("Backend", func() {
107107
ibackend, err := NewBackend(logger, backendconfig.Config{
108108
BackendType: "redis",
109109
Redis: &backendconfig.RedisConfig{
110-
KeyPrefix: "test",
111-
Client: client,
112-
Backoff: backoffConfig,
113-
ChunkSizeInGet: 1000,
110+
KeyPrefix: "test",
111+
Client: client,
112+
Backoff: backoffConfig,
113+
ChunkSizeInGet: 1000,
114+
ChunkSizeInDelete: 1000,
114115
},
115116
})
116117
Expect(err).NotTo(HaveOccurred())

pkg/backend/redis/task.go

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -38,11 +38,11 @@ import (
3838
)
3939

4040
const (
41-
KB = 1 << 10
42-
PayloadMaxSizeInKB = 1
43-
MessageMaxSizeInKB = 1
44-
HistoryLengthMax = 10
45-
MaxNameLength = 1024
41+
KB = 1 << 10
42+
PayloadMaxSizeInKB = 1
43+
MessageMaxSizeInKB = 1
44+
HistoryLengthMax = 10
45+
MaxNameLength = 1024
4646
)
4747

4848
func (b *Backend) ensureQueueAndWorkerExists(queueUID, workerUID uuid.UUID) (*taskqueue.TaskQueue, *worker.Worker, error) {

pkg/worker/worker_test.go

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -74,9 +74,10 @@ var _ = Describe("Worker", func() {
7474
bcknd, err = backendfactory.NewBackend(logger, backendconfig.Config{
7575
BackendType: "redis",
7676
Redis: &backendconfig.RedisConfig{
77-
Client: client,
78-
Backoff: backendConfig,
79-
ChunkSizeInGet: 1000,
77+
Client: client,
78+
Backoff: backendConfig,
79+
ChunkSizeInGet: 1000,
80+
ChunkSizeInDelete: 1000,
8081
},
8182
})
8283
Expect(err).NotTo(HaveOccurred())

0 commit comments

Comments
 (0)