Skip to content

Commit d3e5936

Browse files
committed
update: use SScan instead of SMembers Command
1 parent 79c2f88 commit d3e5936

File tree

1 file changed

+28
-13
lines changed

1 file changed

+28
-13
lines changed

pkg/backend/redis/task.go

Lines changed: 28 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -38,11 +38,12 @@ import (
3838
)
3939

4040
const (
41-
KB = 1 << 10
42-
PayloadMaxSizeInKB = 1
43-
MessageMaxSizeInKB = 1
44-
HistoryLengthMax = 10
45-
MaxNameLength = 1024
41+
KB = 1 << 10
42+
PayloadMaxSizeInKB = 1
43+
MessageMaxSizeInKB = 1
44+
HistoryLengthMax = 10
45+
MaxNameLength = 1024
46+
taskOperationChunkSize = 1024
4647
)
4748

4849
func (b *Backend) ensureQueueAndWorkerExists(queueUID, workerUID uuid.UUID) (*taskqueue.TaskQueue, *worker.Worker, error) {
@@ -162,10 +163,7 @@ func (b *Backend) getTasksByUIDs(queueUID string, taskUIDs []string, filter func
162163
}
163164

164165
func (b *Backend) getTasks(queueUID string, filter func(*task.Task) bool, lggr zerolog.Logger) ([]*task.Task, error) {
165-
taskUIDs, err := b.Client.SMembers(b.tasksKey(queueUID)).Result()
166-
if err == redis.Nil {
167-
return []*task.Task{}, nil
168-
}
166+
taskUIDs, err := b.allTaskUIDsByQueueUID(b.Client, queueUID)
169167
if err != nil {
170168
return nil, err
171169
}
@@ -938,10 +936,7 @@ func (b *Backend) allTasksKeysForDeleteQueue(rds redis.Cmdable, queueUID string)
938936
b.deadletterQueueKey(queueUID),
939937
b.pendingTaskQueueKey(queueUID),
940938
}
941-
taskUIDs, err := rds.SMembers(b.tasksKey(queueUID)).Result()
942-
if err == redis.Nil {
943-
return []string{}, nil
944-
}
939+
taskUIDs, err := b.allTaskUIDsByQueueUID(rds, queueUID)
945940
if err != nil {
946941
return []string{}, err
947942
}
@@ -950,3 +945,23 @@ func (b *Backend) allTasksKeysForDeleteQueue(rds redis.Cmdable, queueUID string)
950945
}
951946
return keysToDelete, nil
952947
}
948+
949+
func (b *Backend) allTaskUIDsByQueueUID(rds redis.Cmdable, queueUID string) ([]string, error) {
950+
var cursor uint64
951+
var taskUIDs []string
952+
for {
953+
keys, nextCursor, err := rds.SScan(b.tasksKey(queueUID), cursor, "", taskOperationChunkSize).Result()
954+
if err == redis.Nil {
955+
return []string{}, nil
956+
}
957+
if err != nil {
958+
return []string{}, err
959+
}
960+
taskUIDs = append(taskUIDs, keys...)
961+
cursor = nextCursor
962+
if cursor == 0 {
963+
break
964+
}
965+
}
966+
return taskUIDs, nil
967+
}

0 commit comments

Comments
 (0)