Skip to content

Commit 819196a

Browse files
committed
LOGC-7: Wire processor into main
Transform stub main into complete application - Load and validate configuration - Set up logging - Create and initialize processor - Signal handling - Shutdown timeout with cleanup
1 parent da30d43 commit 819196a

File tree

1 file changed

+103
-5
lines changed

1 file changed

+103
-5
lines changed

cmd/log-courier/main.go

Lines changed: 103 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,19 +1,54 @@
11
package main
22

33
import (
4+
"context"
5+
"errors"
46
"fmt"
7+
"log/slog"
58
"os"
9+
"os/signal"
10+
"time"
611

712
"github.com/spf13/pflag"
13+
"golang.org/x/sys/unix"
14+
815
"github.com/scality/log-courier/pkg/logcourier"
16+
"github.com/scality/log-courier/pkg/util"
917
)
1018

1119
func main() {
20+
os.Exit(run())
21+
}
22+
23+
// buildProcessorConfig creates processor config from ConfigSpec
24+
func buildProcessorConfig(logger *slog.Logger) logcourier.Config {
25+
return logcourier.Config{
26+
ClickHouseURL: logcourier.ConfigSpec.GetString("clickhouse.url"),
27+
ClickHouseUsername: logcourier.ConfigSpec.GetString("clickhouse.username"),
28+
ClickHousePassword: logcourier.ConfigSpec.GetString("clickhouse.password"),
29+
ClickHouseTimeout: time.Duration(logcourier.ConfigSpec.GetInt("clickhouse.timeout-seconds")) * time.Second,
30+
CountThreshold: logcourier.ConfigSpec.GetInt("consumer.count-threshold"),
31+
TimeThresholdSec: logcourier.ConfigSpec.GetInt("consumer.time-threshold-seconds"),
32+
DiscoveryInterval: time.Duration(logcourier.ConfigSpec.GetInt("consumer.discovery-interval-seconds")) * time.Second,
33+
NumWorkers: logcourier.ConfigSpec.GetInt("consumer.num-workers"),
34+
MaxRetries: logcourier.ConfigSpec.GetInt("retry.max-retries"),
35+
InitialBackoff: time.Duration(logcourier.ConfigSpec.GetInt("retry.initial-backoff-seconds")) * time.Second,
36+
MaxBackoff: time.Duration(logcourier.ConfigSpec.GetInt("retry.max-backoff-seconds")) * time.Second,
37+
S3Endpoint: logcourier.ConfigSpec.GetString("s3.endpoint"),
38+
S3AccessKeyID: logcourier.ConfigSpec.GetString("s3.access-key-id"),
39+
S3SecretAccessKey: logcourier.ConfigSpec.GetString("s3.secret-access-key"),
40+
Logger: logger,
41+
}
42+
}
43+
44+
func run() int {
45+
// Add command-line flags
1246
logcourier.ConfigSpec.AddFlag(pflag.CommandLine, "log-level", "log-level")
1347

1448
configFileFlag := pflag.String("config-file", "", "Path to configuration file")
1549
pflag.Parse()
1650

51+
// Load configuration
1752
configFile := *configFileFlag
1853
if configFile == "" {
1954
configFile = os.Getenv("LOG_COURIER_CONFIG_FILE")
@@ -23,16 +58,79 @@ func main() {
2358
if err != nil {
2459
fmt.Fprintf(os.Stderr, "Configuration error: %v\n", err)
2560
pflag.Usage()
26-
os.Exit(2)
61+
return 2
2762
}
2863

64+
// Validate configuration
2965
err = logcourier.ValidateConfig()
3066
if err != nil {
3167
fmt.Fprintf(os.Stderr, "Configuration validation error: %v\n", err)
32-
pflag.Usage()
33-
os.Exit(2)
68+
return 2
69+
}
70+
71+
// Set up logger
72+
logLevel := util.ParseLogLevel(logcourier.ConfigSpec.GetString("log-level"))
73+
logger := slog.New(slog.NewJSONHandler(os.Stdout, &slog.HandlerOptions{
74+
Level: logLevel,
75+
}))
76+
77+
// Create processor
78+
ctx := context.Background()
79+
processorCfg := buildProcessorConfig(logger)
80+
81+
processor, err := logcourier.NewProcessor(ctx, processorCfg)
82+
if err != nil {
83+
logger.Error("failed to create processor", "error", err)
84+
return 1
85+
}
86+
defer func() {
87+
if err := processor.Close(); err != nil {
88+
logger.Error("failed to close processor", "error", err)
89+
}
90+
}()
91+
92+
// Set up signal handling for graceful shutdown
93+
ctx, cancel := context.WithCancel(ctx)
94+
defer cancel()
95+
96+
signalsChan := make(chan os.Signal, 1)
97+
signal.Notify(signalsChan, unix.SIGINT, unix.SIGTERM)
98+
99+
// Start processor in goroutine
100+
errChan := make(chan error, 1)
101+
go func() {
102+
errChan <- processor.Run(ctx)
103+
}()
104+
105+
// Wait for signal or error
106+
select {
107+
case sig := <-signalsChan:
108+
logger.Info("signal received", "signal", sig)
109+
cancel()
110+
111+
// Wait for processor to stop gracefully (with timeout)
112+
shutdownTimeout := time.Duration(logcourier.ConfigSpec.GetInt("shutdown-timeout-seconds")) * time.Second
113+
shutdownTimer := time.NewTimer(shutdownTimeout)
114+
defer shutdownTimer.Stop()
115+
116+
select {
117+
case <-shutdownTimer.C:
118+
logger.Warn("shutdown timeout exceeded, forcing exit")
119+
return 1
120+
case err := <-errChan:
121+
if err != nil && !errors.Is(err, context.Canceled) {
122+
logger.Error("processor stopped with error", "error", err)
123+
return 1
124+
}
125+
}
126+
127+
case err := <-errChan:
128+
if err != nil && !errors.Is(err, context.Canceled) {
129+
logger.Error("processor error", "error", err)
130+
return 1
131+
}
34132
}
35133

36-
// TODO: Initialize and run consumer
37-
fmt.Println("log-courier started (stub)")
134+
logger.Info("log-courier stopped")
135+
return 0
38136
}

0 commit comments

Comments
 (0)