From df96f391635f7a258322b7140b7de2d2e620fe36 Mon Sep 17 00:00:00 2001 From: parthivsurya Date: Sun, 4 Jan 2026 22:31:37 +0530 Subject: [PATCH 1/2] feat: use Redis Lua script for atomic concurrency counting (fixes #9) --- internals/Load-Tester/Raft/tester.go | 58 +++++++++++++------------ internals/Service/redisDB.go | 63 +++++++++++++++------------- 2 files changed, 65 insertions(+), 56 deletions(-) diff --git a/internals/Load-Tester/Raft/tester.go b/internals/Load-Tester/Raft/tester.go index 36412ad..66a9fc5 100644 --- a/internals/Load-Tester/Raft/tester.go +++ b/internals/Load-Tester/Raft/tester.go @@ -4,31 +4,37 @@ import ( "io" // "fmt" "time" - + "Load-Pulse/Config" "Load-Pulse/Service" "Load-Pulse/Statistics" ) func RunTest(workerID int, l *Service.LoadTester) *Statistics.Stats { - var body []byte; - start := time.Now(); - - cfg := Config.GetConfig(); - requestSleepTime := cfg.RequestSleepTime; - - currConcurrencyCount := Service.GetRequestCount(); - for currConcurrencyCount > int64(l.ConcurrencyLimit) { - // workerMsg := fmt.Sprintf("[WORKER-ALERT-%d]: Concurrency Count: %d => Limit Reached !! Waiting\n", workerID, currConcurrencyCount); - // Service.LogError(workerMsg); - time.Sleep(time.Millisecond * time.Duration(requestSleepTime)); - currConcurrencyCount = Service.GetRequestCount(); + var body []byte + start := time.Now() + + cfg := Config.GetConfig() + requestSleepTime := cfg.RequestSleepTime + + // Atomic Concurrency Check + for { + allowed, err := Service.TryIncrementRequestCount(l.ConcurrencyLimit) + if err != nil { + // If Redis fails, log and retry + Service.LogError("[ERR]: Redis Error in Concurrency Check: " + err.Error() + "\n") + time.Sleep(time.Millisecond * time.Duration(requestSleepTime)) + continue + } + if allowed { + break + } + // Limit reached, wait and retry + time.Sleep(time.Millisecond * time.Duration(requestSleepTime)) } - Service.IncrementRequestCount(); - - resp, err := l.Client.Do(l.Request); - rd := time.Since(start); + resp, err := l.Client.Do(l.Request) + rd := time.Since(start) stats := &Statistics.Stats{ Endpoint: l.Endpoint, @@ -38,15 +44,15 @@ func RunTest(workerID int, l *Service.LoadTester) *Statistics.Stats { } if err != nil { - stats.FailedRequests = 1; - Service.DecrementRequestCount(); - return stats; + stats.FailedRequests = 1 + Service.DecrementRequestCount() + return stats } - defer resp.Body.Close(); + defer resp.Body.Close() - body, _ = io.ReadAll(resp.Body); - stats.ResponseSize = float64(len(body)); - Service.DecrementRequestCount(); - return stats; -} \ No newline at end of file + body, _ = io.ReadAll(resp.Body) + stats.ResponseSize = float64(len(body)) + Service.DecrementRequestCount() + return stats +} diff --git a/internals/Service/redisDB.go b/internals/Service/redisDB.go index 0a6cb47..b94ab29 100644 --- a/internals/Service/redisDB.go +++ b/internals/Service/redisDB.go @@ -3,69 +3,72 @@ package Service import ( "context" "log" - "sync" config "Load-Pulse/Config" redis "github.com/redis/go-redis/v9" ) -var client *redis.Client; +var client *redis.Client -var ctx = context.Background(); +var ctx = context.Background() -var mu sync.Mutex; +// Lua script to atomically check and increment +var incrementScript = redis.NewScript(` + local key = KEYS[1] + local limit = tonumber(ARGV[1]) + local current = tonumber(redis.call("GET", key) or "0") + if current < limit then + return redis.call("INCR", key) + else + return -1 + end +`) -func IncrementRequestCount() { - mu.Lock() - defer mu.Unlock() +func TryIncrementRequestCount(limit int) (bool, error) { cfg := config.GetConfig() - err := client.Incr(ctx, cfg.RedisKey).Err() + result, err := incrementScript.Run(ctx, client, []string{cfg.RedisKey}, limit).Int64() if err != nil { - log.Fatal("[ERR]: Error in Incrementing Concurrent Request Count from Redis !!", err); + return false, err } + // If result is -1, limit reached. Otherwise (new count), success. + return result != -1, nil } func DecrementRequestCount() { - mu.Lock() - defer mu.Unlock() - cfg := config.GetConfig(); - err := client.Decr(ctx, cfg.RedisKey).Err(); + cfg := config.GetConfig() + err := client.Decr(ctx, cfg.RedisKey).Err() if err != nil { - log.Fatal("[ERR]: Error in Decrementing Concurrent Request Count from Redis !!", err); + log.Fatal("[ERR]: Error in Decrementing Concurrent Request Count from Redis !!", err) } } func GetRequestCount() int64 { - mu.Lock() - defer mu.Unlock() - cfg := config.GetConfig(); - currentCount, err := client.Get(ctx, cfg.RedisKey).Int64(); + cfg := config.GetConfig() + currentCount, err := client.Get(ctx, cfg.RedisKey).Int64() if err != nil { - log.Fatal("[ERR]: Error in Fetching Concurrent Requests Count from Redis !!", err); - return 0; + log.Fatal("[ERR]: Error in Fetching Concurrent Requests Count from Redis !!", err) + return 0 } - return currentCount; + return currentCount } func ResetRequestCount() { - mu.Lock() - defer mu.Unlock() - cfg := config.GetConfig(); - err := client.Set(ctx, cfg.RedisKey, 0, 0).Err(); + cfg := config.GetConfig() + err := client.Set(ctx, cfg.RedisKey, 0, 0).Err() if err != nil { - log.Fatal("[ERR]: Error in Resetting Concurrent Requests Count in Redis !!", err); + log.Fatal("[ERR]: Error in Resetting Concurrent Requests Count in Redis !!", err) } - LogServer("[LOG]: Concurrency Count Reset Done\n"); + LogServer("[LOG]: Concurrency Count Reset Done\n") } func InitRedisClient() { - cfg := config.GetConfig(); + cfg := config.GetConfig() client = redis.NewClient(&redis.Options{ Addr: cfg.RedisURL, Password: cfg.RedisPassword, DB: 0, Protocol: 2, }) - LogServer("[LOG]: Redis Client Initialized\n"); -} \ No newline at end of file + LogServer("[LOG]: Redis Client Initialized\n") +} From 748d2ea733219039f51d45bdbe9096d94c42cf74 Mon Sep 17 00:00:00 2001 From: parthivsurya Date: Fri, 9 Jan 2026 17:41:27 +0530 Subject: [PATCH 2/2] refactor: extract redis lua script to separate file --- internals/Service/redisDB.go | 15 +++++---------- internals/Service/scripts/increment.lua | 8 ++++++++ 2 files changed, 13 insertions(+), 10 deletions(-) create mode 100644 internals/Service/scripts/increment.lua diff --git a/internals/Service/redisDB.go b/internals/Service/redisDB.go index b94ab29..d1e95b2 100644 --- a/internals/Service/redisDB.go +++ b/internals/Service/redisDB.go @@ -2,6 +2,7 @@ package Service import ( "context" + _ "embed" "log" config "Load-Pulse/Config" @@ -13,17 +14,11 @@ var client *redis.Client var ctx = context.Background() +//go:embed scripts/increment.lua +var kaimScript string + // Lua script to atomically check and increment -var incrementScript = redis.NewScript(` - local key = KEYS[1] - local limit = tonumber(ARGV[1]) - local current = tonumber(redis.call("GET", key) or "0") - if current < limit then - return redis.call("INCR", key) - else - return -1 - end -`) +var incrementScript = redis.NewScript(kaimScript) func TryIncrementRequestCount(limit int) (bool, error) { cfg := config.GetConfig() diff --git a/internals/Service/scripts/increment.lua b/internals/Service/scripts/increment.lua new file mode 100644 index 0000000..9160496 --- /dev/null +++ b/internals/Service/scripts/increment.lua @@ -0,0 +1,8 @@ +local key = KEYS[1] +local limit = tonumber(ARGV[1]) +local current = tonumber(redis.call("GET", key) or "0") +if current < limit then + return redis.call("INCR", key) +else + return -1 +end