diff --git a/.infer/config.yaml b/.infer/config.yaml index 1cb1d561..4dc57742 100644 --- a/.infer/config.yaml +++ b/.infer/config.yaml @@ -61,6 +61,7 @@ tools: - .infer/keybindings.yaml - .infer/prompts.yaml - .infer/channels.yaml + - .infer/heartbeat.yaml - .infer/computer_use.yaml - .git/ - '*.env' diff --git a/.infer/heartbeat.yaml b/.infer/heartbeat.yaml new file mode 100644 index 00000000..1d10fbf8 --- /dev/null +++ b/.infer/heartbeat.yaml @@ -0,0 +1,6 @@ +--- +enabled: false +interval: 1h +initial_delay: 1m +model: "" +prompt: Heartbeat tick — check for any pending tasks, todos, or background work and act on them. diff --git a/.infer/prompts.yaml b/.infer/prompts.yaml index 7739d034..236ad033 100644 --- a/.infer/prompts.yaml +++ b/.infer/prompts.yaml @@ -97,6 +97,25 @@ agent: - Works over SSH without X11 forwarding - Precise output (structured data, not visual interpretation) - Lower resource usage (critical for remote systems) + system_prompt_heartbeat: |- + You are an autonomous agent that has just been woken up by a periodic heartbeat tick. + + PURPOSE: Self-driven progress checks. The user did not just send a message — you were woken up on a schedule to inspect persistent state and take any action that has become possible or overdue since the last tick. + + WHAT TO CHECK (in order): + 1. Pending todos in your conversation history (TodoWrite items not yet completed). + 2. Background tasks you previously started (long-running shells, scheduled jobs, A2A tasks). + 3. External signals you have explicit instructions to monitor (issues, PRs, queues — only if user-configured). + + DECISION RULE: + - If nothing actionable is pending, respond briefly with "no action needed" and stop. Do NOT invent work. + - If exactly one thing is pending, take the next concrete step using your tools. + - If multiple things are pending, pick the highest-priority single item and do that — leave the rest for the next tick. + + CONSTRAINTS: + - You run autonomously without human approval. Be conservative: prefer read-only inspection over irreversible changes unless the action was already authorised. + - Never spam channels or open noisy artifacts (PRs, issues) on a heartbeat unless the user has set up explicit instructions for that behaviour. + - Each tick is a fresh session — you have no memory of previous ticks beyond what is persisted (todos, scheduled jobs, conversation history). custom_instructions: "" system_reminders: enabled: true diff --git a/CLAUDE.md b/CLAUDE.md index 7cb1a887..062d00be 100644 --- a/CLAUDE.md +++ b/CLAUDE.md @@ -595,6 +595,68 @@ images that don't ship `/usr/share/zoneinfo`. ...)` calls in `cmd/root.go` — without those, viper unmarshals an empty config and the defaults function's values are ignored. +## Heartbeat (Periodic Wake-Up) + +The **Heartbeat** wakes the agent on a fixed interval to check for +pending work. It is a peer of the scheduler — both run inside the +`infer channels-manager` daemon, both spawn `infer agent` +subprocesses, but heartbeat is a single global tick (vs. many +user-defined cron jobs) and logs output (vs. routing to a channel). +Disabled by default. + +- Config struct: `config.HeartbeatConfig` in `config/heartbeat.go` +- Config file: `~/.infer/heartbeat.yaml` (separate file, mirrors + channels.yaml; `yaml:"-"` on `Config.Heartbeat`). +- System prompt: `cfg.Prompts.Agent.SystemPromptHeartbeat` in + `prompts.yaml` — separate from `system_prompt`/`system_prompt_plan`. +- Service: `internal/services/heartbeat/heartbeat.go` (`Service` + with `Start(ctx)` / `Stop(ctx)`, ticker-driven, no cron). +- Daemon wiring: `cmd/channels.go` `startHeartbeat()` next to + `startScheduler()`. +- Init wiring: `cmd/init.go` `createHeartbeatConfigFile()`. +- Env vars: `INFER_HEARTBEAT_*` applied via + `applyHeartbeatEnvOverrides` in `cmd/config.go`. + +### Heartbeat architecture + +```text +┌─ infer channels-manager (daemon) ─────────────────────────┐ +│ ChannelManagerService (channels — optional) │ +│ SchedulerService (cron jobs — optional) │ +│ HeartbeatService │ +│ ├─ time.Ticker(interval) │ +│ └─ on tick: spawn `infer agent --heartbeat │ +│ --session-id ` │ +│ log stdout │ +└────────────────────────────────────────────────────────────┘ +``` + +Key properties: + +- **Off by default.** `Heartbeat.Enabled = false` in + `DefaultHeartbeatConfig()`. +- **Daemon gate is relaxed.** `infer channels-manager` boots if + *any* of channels / scheduler / heartbeat is enabled. Heartbeat + alone is a valid run mode. +- **Fresh session per fire.** UUID-format session ID (not channel + prefixed); the Schedule tool's `resolveRouting` will refuse to + operate from a heartbeat run, which is intentional — heartbeat + should not directly create scheduled jobs without explicit + channel context. +- **Overlap guard.** `atomic.Int32` flag suppresses concurrent + ticks when the agent run takes longer than `interval`. Logs a + warning when skipped. +- **System prompt selection.** `infer agent --heartbeat` (cmd flag + added in `cmd/agent.go`) swaps `cfg.Prompts.Agent.SystemPrompt` + for `cfg.Prompts.Agent.SystemPromptHeartbeat` *before* the + service container is built. The agent service stays oblivious to + the new mode. +- **Output.** Agent stdout is logged via the standard logger. No + channel routing — if the user wants a channel notification, the + agent itself uses its tools to send one. + +See `docs/heartbeat.md` for the user-facing guide. + ## Plan Mode Plan mode (`AgentModePlan` in `internal/domain/state.go`) is a read-only diff --git a/README.md b/README.md index dc6c7d8f..0fbd51cb 100644 --- a/README.md +++ b/README.md @@ -32,6 +32,7 @@ An agentic command-line assistant that writes code, understands project context, - [Tool Approval System](#tool-approval-system) - [Shortcuts](#shortcuts) - [Channels (Remote Messaging)](#channels-remote-messaging) +- [Heartbeat (Periodic Wake-Up)](#heartbeat-periodic-wake-up) - [Global Flags](#global-flags) - [Examples](#examples) - [Development](#development) @@ -73,6 +74,8 @@ An agentic command-line assistant that writes code, understands project context, - **Remote Messaging Channels**: Control the agent from Telegram, WhatsApp, and other platforms via a pluggable channel system - [Learn more →](docs/channels.md) - **Scheduled Tasks**: Ask the agent (over Telegram, etc.) to run a prompt on a cron schedule and deliver the result back through the same channel - recurring ("send me a quote every morning") or one-off ("remind me at 6pm today") - [Learn more →](docs/scheduling.md) +- **Heartbeat (Periodic Wake-Up)**: Wake the agent on a fixed interval to check for pending todos and background work, + with a separate configurable system prompt - off by default - [Learn more →](docs/heartbeat.md) ## Installation @@ -1040,6 +1043,54 @@ database so this works on any base image. For the full guide, including the cron syntax primer and end-to-end Telegram walkthroughs, see [Scheduling Documentation](docs/scheduling.md). +## Heartbeat (Periodic Wake-Up) + +Heartbeat wakes the agent on a fixed interval — without any user input — +so it can check for pending todos, background tasks, or anything else +your system prompt tells it to monitor. It runs alongside the scheduler +inside the `infer channels-manager` daemon and is **disabled by default**. + +Unlike the [Schedule](docs/scheduling.md) tool (which the LLM uses to +create user-driven cron jobs that deliver to a channel), heartbeat is a +single global tick the operator configures once. Output goes to logs; +the agent itself decides whether to send a Telegram message, open a PR, +or just no-op. + +Enable in `.infer/heartbeat.yaml` (seeded by `infer init`): + +```yaml +--- +enabled: true +interval: 1h # Go duration: 30s, 5m, 1h, 24h +initial_delay: 1m # delay before first tick +model: "" # optional override; empty = agent.model +prompt: "Heartbeat tick — check for any pending tasks, todos, or background work and act on them." +``` + +The **system prompt** for heartbeat runs lives in `.infer/prompts.yaml` +under `agent.system_prompt_heartbeat` so you can tune the agent's +wake-up behaviour separately from chat-mode behaviour. + +Then start the daemon: + +```bash +infer channels-manager +``` + +Heartbeat alone is a valid run mode — you don't need any channel +enabled to use it. The daemon hosts whichever of channels / scheduler / +heartbeat are turned on. + +Or via env vars: + +```bash +export INFER_HEARTBEAT_ENABLED=true +export INFER_HEARTBEAT_INTERVAL=30m +``` + +For the full guide, including configuration reference and common +patterns (TODO sweeps, CI watchdogs), see [Heartbeat Documentation](docs/heartbeat.md). + ## Global Flags - `-v, --verbose`: Enable verbose output diff --git a/cmd/agent.go b/cmd/agent.go index f7bb32bf..33a62082 100644 --- a/cmd/agent.go +++ b/cmd/agent.go @@ -52,7 +52,8 @@ Examples: noSave, _ := cmd.Flags().GetBool("no-save") sessionID, _ := cmd.Flags().GetString("session-id") requireApproval, _ := cmd.Flags().GetBool("require-approval") - return RunAgentCommand(Cfg, model, args[0], files, noSave, sessionID, requireApproval) + heartbeat, _ := cmd.Flags().GetBool("heartbeat") + return RunAgentCommand(Cfg, model, args[0], files, noSave, sessionID, requireApproval, heartbeat) }, } @@ -91,7 +92,7 @@ type AgentSession struct { approvalCh chan domain.ApprovalResponse } -func RunAgentCommand(cfg *config.Config, modelFlag, taskDescription string, files []string, noSave bool, sessionID string, requireApproval bool) (err error) { +func RunAgentCommand(cfg *config.Config, modelFlag, taskDescription string, files []string, noSave bool, sessionID string, requireApproval, heartbeat bool) (err error) { defer func() { if r := recover(); r != nil { outputAgentError(fmt.Sprintf("agent panic: %v", r)) @@ -103,6 +104,10 @@ func RunAgentCommand(cfg *config.Config, modelFlag, taskDescription string, file } }() + if heartbeat && cfg.Prompts.Agent.SystemPromptHeartbeat != "" { + cfg.Prompts.Agent.SystemPrompt = cfg.Prompts.Agent.SystemPromptHeartbeat + } + svc := container.NewServiceContainer(cfg) defer func() { ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) @@ -1116,5 +1121,6 @@ func init() { agentCmd.Flags().Bool("no-save", false, "Disable saving conversation to database") agentCmd.Flags().String("session-id", "", "Resume an existing agent session by conversation ID") agentCmd.Flags().Bool("require-approval", false, "Enable IPC-based tool approval via stdin/stdout (used by channel manager)") + agentCmd.Flags().Bool("heartbeat", false, "Run with the heartbeat system prompt (used by the heartbeat service)") rootCmd.AddCommand(agentCmd) } diff --git a/cmd/channels.go b/cmd/channels.go index ca7c9815..c01ff4d9 100644 --- a/cmd/channels.go +++ b/cmd/channels.go @@ -13,6 +13,7 @@ import ( logger "github.com/inference-gateway/cli/internal/logger" services "github.com/inference-gateway/cli/internal/services" channels "github.com/inference-gateway/cli/internal/services/channels" + heartbeat "github.com/inference-gateway/cli/internal/services/heartbeat" scheduler "github.com/inference-gateway/cli/internal/services/scheduler" cobra "github.com/spf13/cobra" ) @@ -46,16 +47,21 @@ Examples: }, } -// RunChannelsCommand starts the channel listener daemon +// RunChannelsCommand starts the channel listener daemon. The daemon +// hosts up to three subsystems — channels, scheduler, and heartbeat — +// and starts whichever are enabled. At least one must be enabled or +// the daemon refuses to boot (otherwise it would just sleep forever). func RunChannelsCommand(cfg *config.Config) error { - if !cfg.Channels.Enabled { - return fmt.Errorf("channels are not enabled. Set enabled: true in .infer/channels.yaml or INFER_CHANNELS_ENABLED=true") + if !cfg.Channels.Enabled && !cfg.Tools.Schedule.Enabled && !cfg.Heartbeat.Enabled { + return fmt.Errorf("nothing to run: enable at least one of channels, scheduler, or heartbeat in .infer/") } cm := services.NewChannelManagerService(cfg.Channels) - if err := registerChannels(cm, cfg); err != nil { - return err + if cfg.Channels.Enabled { + if err := registerChannels(cm, cfg); err != nil { + return err + } } logger.Info("Starting channels-manager", @@ -63,7 +69,6 @@ func RunChannelsCommand(cfg *config.Config) error { "commit", commit, "build_date", date, ) - logger.Info("Starting channel listener...") ctx, cancel := context.WithCancel(context.Background()) defer cancel() @@ -71,8 +76,11 @@ func RunChannelsCommand(cfg *config.Config) error { sigChan := make(chan os.Signal, 1) signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM) - if err := cm.Start(ctx); err != nil { - return fmt.Errorf("failed to start channels: %w", err) + if cfg.Channels.Enabled { + logger.Info("Starting channel listener...") + if err := cm.Start(ctx); err != nil { + return fmt.Errorf("failed to start channels: %w", err) + } } sched, err := startScheduler(ctx, cm, cfg) @@ -81,12 +89,31 @@ func RunChannelsCommand(cfg *config.Config) error { return fmt.Errorf("failed to start scheduler: %w", err) } - logger.Info("Listening for messages. Press Ctrl+C to stop.") + hb, err := startHeartbeat(ctx, cfg) + if err != nil { + if sched != nil { + stopCtx, stopCancel := context.WithTimeout(context.Background(), 30*time.Second) + _ = sched.Stop(stopCtx) + stopCancel() + } + _ = cm.Stop() + return fmt.Errorf("failed to start heartbeat: %w", err) + } + + logger.Info("Daemon ready. Press Ctrl+C to stop.") <-sigChan - logger.Info("Shutting down channels...") + logger.Info("Shutting down...") cancel() + if hb != nil { + stopCtx, stopCancel := context.WithTimeout(context.Background(), 30*time.Second) + if err := hb.Stop(stopCtx); err != nil { + logger.Error("Failed to stop heartbeat", "error", err) + } + stopCancel() + } + if sched != nil { stopCtx, stopCancel := context.WithTimeout(context.Background(), 30*time.Second) if err := sched.Stop(stopCtx); err != nil { @@ -99,7 +126,7 @@ func RunChannelsCommand(cfg *config.Config) error { return fmt.Errorf("failed to stop channels: %w", err) } - logger.Info("Channels stopped.") + logger.Info("Daemon stopped.") return nil } @@ -134,6 +161,50 @@ func startScheduler(ctx context.Context, cm *services.ChannelManagerService, cfg return svc, nil } +// startHeartbeat initialises the heartbeat service when enabled. +// Returns nil service when disabled. Parses interval/initial_delay as +// time.Duration strings and surfaces parse errors so the daemon fails +// fast on bad config. +func startHeartbeat(ctx context.Context, cfg *config.Config) (*heartbeat.Service, error) { + if !cfg.Heartbeat.Enabled { + return nil, nil + } + + interval, err := time.ParseDuration(cfg.Heartbeat.Interval) + if err != nil { + return nil, fmt.Errorf("parse heartbeat.interval %q: %w", cfg.Heartbeat.Interval, err) + } + + var initialDelay time.Duration + if cfg.Heartbeat.InitialDelay != "" { + initialDelay, err = time.ParseDuration(cfg.Heartbeat.InitialDelay) + if err != nil { + return nil, fmt.Errorf("parse heartbeat.initial_delay %q: %w", cfg.Heartbeat.InitialDelay, err) + } + } + + prompt := cfg.Heartbeat.Prompt + if prompt == "" { + prompt = config.DefaultHeartbeatConfig().Prompt + } + + svc, err := heartbeat.NewService(heartbeat.Options{ + Config: heartbeat.Config{ + Interval: interval, + InitialDelay: initialDelay, + Model: cfg.Heartbeat.Model, + Prompt: prompt, + }, + }) + if err != nil { + return nil, err + } + if err := svc.Start(ctx); err != nil { + return nil, err + } + return svc, nil +} + // registerChannels registers enabled channel implementations with the manager func registerChannels(cm *services.ChannelManagerService, cfg *config.Config) error { registered := 0 diff --git a/cmd/config.go b/cmd/config.go index 4d501a0f..013ddecc 100644 --- a/cmd/config.go +++ b/cmd/config.go @@ -606,6 +606,27 @@ func getEffectiveChannelsConfigPath() string { return config.DefaultChannelsPath } +// getEffectiveHeartbeatConfigPath returns the path to the heartbeat config file +// Searches in this order: 1) project .infer/heartbeat.yaml, 2) user home ~/.infer/heartbeat.yaml +func getEffectiveHeartbeatConfigPath() string { + searchPaths := []string{ + config.DefaultHeartbeatPath, + } + + if homeDir, err := os.UserHomeDir(); err == nil { + homePath := filepath.Join(homeDir, config.ConfigDirName, config.HeartbeatFileName) + searchPaths = append(searchPaths, homePath) + } + + for _, path := range searchPaths { + if _, err := os.Stat(path); err == nil { + return path + } + } + + return config.DefaultHeartbeatPath +} + // getEffectiveComputerUseConfigPath returns the path to the computer_use config file // Searches in this order: 1) project .infer/computer_use.yaml, 2) user home ~/.infer/computer_use.yaml func getEffectiveComputerUseConfigPath() string { @@ -709,6 +730,15 @@ func loadConfigFromViper() (*config.Config, error) { cfg.Channels = *channelsCfg applyChannelsEnvOverrides(cfg) + heartbeatPath := getEffectiveHeartbeatConfigPath() + heartbeatCfg, err := config.LoadHeartbeat(heartbeatPath) + if err != nil { + logger.Warn("Failed to load heartbeat config, using defaults", "error", err, "path", heartbeatPath) + heartbeatCfg = config.DefaultHeartbeatConfig() + } + cfg.Heartbeat = *heartbeatCfg + applyHeartbeatEnvOverrides(cfg) + cuPath := getEffectiveComputerUseConfigPath() cuCfg, err := config.LoadComputerUse(cuPath) if err != nil { @@ -729,6 +759,7 @@ func applyPromptsEnvOverrides(cfg *config.Config) { "INFER_PROMPTS_AGENT_SYSTEM_PROMPT": &cfg.Prompts.Agent.SystemPrompt, "INFER_PROMPTS_AGENT_SYSTEM_PROMPT_PLAN": &cfg.Prompts.Agent.SystemPromptPlan, "INFER_PROMPTS_AGENT_SYSTEM_PROMPT_REMOTE": &cfg.Prompts.Agent.SystemPromptRemote, + "INFER_PROMPTS_AGENT_SYSTEM_PROMPT_HEARTBEAT": &cfg.Prompts.Agent.SystemPromptHeartbeat, "INFER_PROMPTS_AGENT_CUSTOM_INSTRUCTIONS": &cfg.Prompts.Agent.CustomInstructions, "INFER_PROMPTS_AGENT_SYSTEM_REMINDERS_REMINDER_TEXT": &cfg.Prompts.Agent.SystemReminders.ReminderText, "INFER_PROMPTS_GIT_COMMIT_MESSAGE_SYSTEM_PROMPT": &cfg.Prompts.Git.CommitMessage.SystemPrompt, @@ -900,6 +931,35 @@ func applyChannelsEnvOverrides(cfg *config.Config) { setStringSlice("INFER_CHANNELS_WHATSAPP_ALLOWED_USERS", &cfg.Channels.WhatsApp.AllowedUsers) } +// applyHeartbeatEnvOverrides applies INFER_HEARTBEAT_* env vars onto +// the in-memory heartbeat config. Run AFTER LoadHeartbeat so envs win +// over heartbeat.yaml. The heartbeat config lives in its own file +// (yaml:"-" mapstructure:"-" on Config.Heartbeat), so viper does not +// bind these env vars itself — this function is the single source of +// env-var support. Mirrors applyChannelsEnvOverrides. +func applyHeartbeatEnvOverrides(cfg *config.Config) { + setBool := func(env string, target *bool) { + val, ok := os.LookupEnv(env) + if !ok { + return + } + if b, err := strconv.ParseBool(strings.TrimSpace(val)); err == nil { + *target = b + } + } + setString := func(env string, target *string) { + if val, ok := os.LookupEnv(env); ok { + *target = val + } + } + + setBool("INFER_HEARTBEAT_ENABLED", &cfg.Heartbeat.Enabled) + setString("INFER_HEARTBEAT_INTERVAL", &cfg.Heartbeat.Interval) + setString("INFER_HEARTBEAT_INITIAL_DELAY", &cfg.Heartbeat.InitialDelay) + setString("INFER_HEARTBEAT_MODEL", &cfg.Heartbeat.Model) + setString("INFER_HEARTBEAT_PROMPT", &cfg.Heartbeat.Prompt) +} + // applyComputerUseEnvOverrides applies INFER_COMPUTER_USE_* env vars onto // the in-memory computer_use config. Run AFTER LoadComputerUse so envs win // over computer_use.yaml. The computer_use config now lives in its own diff --git a/cmd/init.go b/cmd/init.go index b7fdf06a..f74d36e7 100644 --- a/cmd/init.go +++ b/cmd/init.go @@ -39,7 +39,7 @@ func initializeProject(cmd *cobra.Command) error { //nolint:funlen userspace, _ := cmd.Flags().GetBool("userspace") skipMigrations, _ := cmd.Flags().GetBool("skip-migrations") - var configPath, gitignorePath, scmShortcutsPath, gitShortcutsPath, mcpShortcutsPath, shellsShortcutsPath, exportShortcutsPath, a2aShortcutsPath, mcpPath, keybindingsPath, promptsPath, channelsPath, computerUsePath, agentsPath string + var configPath, gitignorePath, scmShortcutsPath, gitShortcutsPath, mcpShortcutsPath, shellsShortcutsPath, exportShortcutsPath, a2aShortcutsPath, mcpPath, keybindingsPath, promptsPath, channelsPath, heartbeatPath, computerUsePath, agentsPath string if userspace { homeDir, err := os.UserHomeDir() @@ -58,6 +58,7 @@ func initializeProject(cmd *cobra.Command) error { //nolint:funlen keybindingsPath = filepath.Join(homeDir, config.ConfigDirName, config.KeybindingsFileName) promptsPath = filepath.Join(homeDir, config.ConfigDirName, config.PromptsFileName) channelsPath = filepath.Join(homeDir, config.ConfigDirName, config.ChannelsFileName) + heartbeatPath = filepath.Join(homeDir, config.ConfigDirName, config.HeartbeatFileName) computerUsePath = filepath.Join(homeDir, config.ConfigDirName, config.ComputerUseFileName) agentsPath = filepath.Join(homeDir, config.ConfigDirName, config.AgentsFileName) } else { @@ -73,12 +74,13 @@ func initializeProject(cmd *cobra.Command) error { //nolint:funlen keybindingsPath = config.DefaultKeybindingsPath promptsPath = config.DefaultPromptsPath channelsPath = config.DefaultChannelsPath + heartbeatPath = config.DefaultHeartbeatPath computerUsePath = config.DefaultComputerUsePath agentsPath = config.DefaultAgentsPath } if !overwrite { - if err := validateFilesNotExist(configPath, gitignorePath, scmShortcutsPath, gitShortcutsPath, mcpShortcutsPath, shellsShortcutsPath, exportShortcutsPath, a2aShortcutsPath, mcpPath, keybindingsPath, promptsPath, channelsPath, computerUsePath, agentsPath); err != nil { + if err := validateFilesNotExist(configPath, gitignorePath, scmShortcutsPath, gitShortcutsPath, mcpShortcutsPath, shellsShortcutsPath, exportShortcutsPath, a2aShortcutsPath, mcpPath, keybindingsPath, promptsPath, channelsPath, heartbeatPath, computerUsePath, agentsPath); err != nil { return err } } @@ -143,6 +145,10 @@ plans/ return fmt.Errorf("failed to create channels config file: %w", err) } + if err := createHeartbeatConfigFile(heartbeatPath); err != nil { + return fmt.Errorf("failed to create heartbeat config file: %w", err) + } + cuMigrated, err := createComputerUseConfigFile(computerUsePath) if err != nil { return fmt.Errorf("failed to create computer_use config file: %w", err) @@ -172,6 +178,7 @@ plans/ fmt.Printf(" Created: %s\n", keybindingsPath) fmt.Printf(" Created: %s\n", promptsPath) fmt.Printf(" Created: %s\n", channelsPath) + fmt.Printf(" Created: %s\n", heartbeatPath) fmt.Printf(" Created: %s\n", computerUsePath) fmt.Printf(" Created: %s\n", agentsPath) if migrated { @@ -462,6 +469,17 @@ func createChannelsConfigFile(path string) (bool, error) { return migrated, nil } +// createHeartbeatConfigFile writes a fresh heartbeat.yaml seeded from +// the in-code defaults (disabled, hourly interval). Heartbeat is a new +// feature with no legacy config block to migrate, so this is the +// simpler one-step "create from defaults" pattern. +func createHeartbeatConfigFile(path string) error { + if err := os.MkdirAll(filepath.Dir(path), 0755); err != nil { + return fmt.Errorf("failed to create config directory: %w", err) + } + return config.SaveHeartbeat(path, config.DefaultHeartbeatConfig()) +} + // createComputerUseConfigFile writes a fresh computer_use.yaml. Returns // true when the file was seeded from a legacy `computer_use:` block found // in viper (i.e. migrated from config.yaml) rather than from in-code diff --git a/cmd/init_test.go b/cmd/init_test.go index cfb99598..0cfec39d 100644 --- a/cmd/init_test.go +++ b/cmd/init_test.go @@ -27,7 +27,7 @@ func TestInitializeProject(t *testing.T) { "userspace": false, "skip-migrations": true, }, - wantFiles: []string{".infer/config.yaml", ".infer/.gitignore", ".infer/computer_use.yaml"}, + wantFiles: []string{".infer/config.yaml", ".infer/.gitignore", ".infer/computer_use.yaml", ".infer/heartbeat.yaml"}, wantNoFiles: []string{"AGENTS.md"}, wantErr: false, }, diff --git a/cmd/prompts_load_test.go b/cmd/prompts_load_test.go index fe3c15a1..6178edd5 100644 --- a/cmd/prompts_load_test.go +++ b/cmd/prompts_load_test.go @@ -23,6 +23,8 @@ func TestLoadConfigFromViper_PromptsDefaultsWhenFileAbsent(t *testing.T) { require.Equal(t, defaults.Agent.SystemPrompt, cfg.Prompts.Agent.SystemPrompt) require.Equal(t, defaults.Agent.SystemPromptPlan, cfg.Prompts.Agent.SystemPromptPlan) require.Equal(t, defaults.Agent.SystemPromptRemote, cfg.Prompts.Agent.SystemPromptRemote) + require.Equal(t, defaults.Agent.SystemPromptHeartbeat, cfg.Prompts.Agent.SystemPromptHeartbeat) + require.NotEmpty(t, cfg.Prompts.Agent.SystemPromptHeartbeat, "heartbeat prompt must have a non-empty default") require.Equal(t, defaults.Agent.SystemReminders.ReminderText, cfg.Prompts.Agent.SystemReminders.ReminderText) require.Equal(t, defaults.Git.CommitMessage.SystemPrompt, cfg.Prompts.Git.CommitMessage.SystemPrompt) require.Equal(t, defaults.Conversation.TitleGeneration.SystemPrompt, cfg.Prompts.Conversation.TitleGeneration.SystemPrompt) @@ -52,6 +54,7 @@ func TestLoadConfigFromViper_PromptsPartialFileFallsBackForUnsetFields(t *testin defaults := config.DefaultPromptsConfig() require.Equal(t, "USER OVERRIDE: only this is set", cfg.Prompts.Agent.SystemPrompt) require.Equal(t, defaults.Agent.SystemPromptPlan, cfg.Prompts.Agent.SystemPromptPlan, "unset plan prompt should fall back to default") + require.Equal(t, defaults.Agent.SystemPromptHeartbeat, cfg.Prompts.Agent.SystemPromptHeartbeat, "unset heartbeat prompt should fall back to default") require.Equal(t, defaults.Git.CommitMessage.SystemPrompt, cfg.Prompts.Git.CommitMessage.SystemPrompt, "unset git prompt should fall back to default") require.Equal(t, defaults.Init.Prompt, cfg.Prompts.Init.Prompt, "unset init prompt should fall back to default") } diff --git a/config/config.go b/config/config.go index cfd2cafa..0137057a 100644 --- a/config/config.go +++ b/config/config.go @@ -43,6 +43,7 @@ type Config struct { Web WebConfig `yaml:"web" mapstructure:"web"` ComputerUse ComputerUseConfig `yaml:"-" mapstructure:"-"` Channels ChannelsConfig `yaml:"-" mapstructure:"-"` + Heartbeat HeartbeatConfig `yaml:"-" mapstructure:"-"` Prompts PromptsConfig `yaml:"-" mapstructure:"-"` configDir string } @@ -599,6 +600,7 @@ func DefaultConfig() *Config { //nolint:funlen ConfigDirName + "/keybindings.yaml", ConfigDirName + "/prompts.yaml", ConfigDirName + "/channels.yaml", + ConfigDirName + "/heartbeat.yaml", ConfigDirName + "/computer_use.yaml", ".git/", "*.env", diff --git a/config/heartbeat.go b/config/heartbeat.go new file mode 100644 index 00000000..f5e3ca95 --- /dev/null +++ b/config/heartbeat.go @@ -0,0 +1,54 @@ +package config + +import ( + utils "github.com/inference-gateway/cli/config/utils" +) + +const ( + HeartbeatFileName = "heartbeat.yaml" + DefaultHeartbeatPath = ConfigDirName + "/" + HeartbeatFileName +) + +// HeartbeatConfig configures the periodic "wake-up" tick that spawns +// `infer agent` with a tailored system prompt so the agent can check +// for pending work without waiting for user input. Disabled by default. +// +// The companion system prompt lives at +// PromptsConfig.Agent.SystemPromptHeartbeat in prompts.yaml — keeping +// runtime knobs (interval, model) here separate from prompt text. +type HeartbeatConfig struct { + Enabled bool `yaml:"enabled" mapstructure:"enabled"` + Interval string `yaml:"interval" mapstructure:"interval"` + InitialDelay string `yaml:"initial_delay" mapstructure:"initial_delay"` + Model string `yaml:"model" mapstructure:"model"` + Prompt string `yaml:"prompt" mapstructure:"prompt"` +} + +// DefaultHeartbeatConfig returns the in-code default heartbeat +// configuration used when no heartbeat.yaml file exists. `infer init` +// seeds the file from this and the runtime falls back to it when the +// file is absent. +func DefaultHeartbeatConfig() *HeartbeatConfig { + return &HeartbeatConfig{ + Enabled: false, + Interval: "1h", + InitialDelay: "1m", + Model: "", + Prompt: "Heartbeat tick — check for any pending tasks, todos, or background work and act on them.", + } +} + +// LoadHeartbeat reads heartbeat.yaml from disk. When the file is +// missing it returns the in-code defaults so callers can treat absence +// as "use defaults" without special-casing. The file body is run +// through os.ExpandEnv so `${VAR}`-style references resolve from the +// environment. +func LoadHeartbeat(path string) (*HeartbeatConfig, error) { + return utils.LoadYAML(path, "heartbeat", DefaultHeartbeatConfig) +} + +// SaveHeartbeat writes the heartbeat configuration to disk, creating +// any missing parent directories. +func SaveHeartbeat(path string, cfg *HeartbeatConfig) error { + return utils.SaveYAML(path, "heartbeat", cfg) +} diff --git a/config/heartbeat_test.go b/config/heartbeat_test.go new file mode 100644 index 00000000..0e886696 --- /dev/null +++ b/config/heartbeat_test.go @@ -0,0 +1,171 @@ +package config_test + +import ( + "os" + "path/filepath" + "testing" + + config "github.com/inference-gateway/cli/config" +) + +func TestHeartbeatConstants(t *testing.T) { + if config.HeartbeatFileName != "heartbeat.yaml" { + t.Errorf("Expected HeartbeatFileName 'heartbeat.yaml', got %q", config.HeartbeatFileName) + } + expectedPath := config.ConfigDirName + "/" + config.HeartbeatFileName + if config.DefaultHeartbeatPath != expectedPath { + t.Errorf("Expected DefaultHeartbeatPath %q, got %q", expectedPath, config.DefaultHeartbeatPath) + } +} + +func TestDefaultHeartbeatConfig(t *testing.T) { + cfg := config.DefaultHeartbeatConfig() + if cfg == nil { + t.Fatal("DefaultHeartbeatConfig() returned nil") + } + if cfg.Enabled { + t.Error("Expected Enabled to be false by default — heartbeat is opt-in") + } + if cfg.Interval != "1h" { + t.Errorf("Expected Interval='1h', got %q", cfg.Interval) + } + if cfg.InitialDelay != "1m" { + t.Errorf("Expected InitialDelay='1m', got %q", cfg.InitialDelay) + } + if cfg.Model != "" { + t.Errorf("Expected empty Model (falls back to agent.model), got %q", cfg.Model) + } + if cfg.Prompt == "" { + t.Error("Expected non-empty default Prompt") + } +} + +func TestLoadHeartbeat_NonExistentFile(t *testing.T) { + tempDir := t.TempDir() + path := filepath.Join(tempDir, "non-existent.yaml") + + cfg, err := config.LoadHeartbeat(path) + if err != nil { + t.Fatalf("LoadHeartbeat() should not error for missing file, got: %v", err) + } + if cfg == nil { + t.Fatal("LoadHeartbeat() returned nil") + } + defaults := config.DefaultHeartbeatConfig() + if cfg.Enabled != defaults.Enabled || cfg.Interval != defaults.Interval { + t.Errorf("Expected defaults, got %+v", cfg) + } +} + +func TestLoadHeartbeat_ValidYAML(t *testing.T) { + tempDir := t.TempDir() + path := filepath.Join(tempDir, "heartbeat.yaml") + + yamlContent := `--- +enabled: true +interval: 30s +initial_delay: 5s +model: openai/gpt-4 +prompt: "Custom heartbeat prompt" +` + if err := os.WriteFile(path, []byte(yamlContent), 0644); err != nil { + t.Fatalf("Failed to write yaml: %v", err) + } + + cfg, err := config.LoadHeartbeat(path) + if err != nil { + t.Fatalf("LoadHeartbeat() failed: %v", err) + } + if !cfg.Enabled { + t.Error("Expected Enabled true") + } + if cfg.Interval != "30s" { + t.Errorf("Expected Interval='30s', got %q", cfg.Interval) + } + if cfg.InitialDelay != "5s" { + t.Errorf("Expected InitialDelay='5s', got %q", cfg.InitialDelay) + } + if cfg.Model != "openai/gpt-4" { + t.Errorf("Expected Model='openai/gpt-4', got %q", cfg.Model) + } + if cfg.Prompt != "Custom heartbeat prompt" { + t.Errorf("Expected custom prompt, got %q", cfg.Prompt) + } +} + +func TestLoadHeartbeat_EnvironmentVariableExpansion(t *testing.T) { + tempDir := t.TempDir() + path := filepath.Join(tempDir, "heartbeat.yaml") + + t.Setenv("TEST_HEARTBEAT_MODEL", "expanded/model") + + yamlContent := `--- +enabled: true +interval: "10s" +model: "${TEST_HEARTBEAT_MODEL}" +` + if err := os.WriteFile(path, []byte(yamlContent), 0644); err != nil { + t.Fatalf("Failed to write yaml: %v", err) + } + + cfg, err := config.LoadHeartbeat(path) + if err != nil { + t.Fatalf("LoadHeartbeat() failed: %v", err) + } + if cfg.Model != "expanded/model" { + t.Errorf("Expected expanded model 'expanded/model', got %q", cfg.Model) + } +} + +func TestLoadHeartbeat_InvalidYAML(t *testing.T) { + tempDir := t.TempDir() + path := filepath.Join(tempDir, "heartbeat.yaml") + if err := os.WriteFile(path, []byte("not: valid: yaml: ["), 0644); err != nil { + t.Fatalf("Failed to write yaml: %v", err) + } + + if _, err := config.LoadHeartbeat(path); err == nil { + t.Fatal("Expected error from invalid YAML, got nil") + } +} + +func TestSaveHeartbeat_RoundTrip(t *testing.T) { + tempDir := t.TempDir() + path := filepath.Join(tempDir, "subdir", "heartbeat.yaml") + + cfg := &config.HeartbeatConfig{ + Enabled: true, + Interval: "15m", + InitialDelay: "30s", + Model: "openai/gpt-4", + Prompt: "test prompt", + } + + if err := config.SaveHeartbeat(path, cfg); err != nil { + t.Fatalf("SaveHeartbeat() failed: %v", err) + } + if _, err := os.Stat(path); os.IsNotExist(err) { + t.Fatal("File was not created") + } + + loaded, err := config.LoadHeartbeat(path) + if err != nil { + t.Fatalf("LoadHeartbeat() failed: %v", err) + } + if loaded.Enabled != cfg.Enabled || loaded.Interval != cfg.Interval || + loaded.InitialDelay != cfg.InitialDelay || loaded.Model != cfg.Model || + loaded.Prompt != cfg.Prompt { + t.Errorf("Round-trip mismatch: got %+v, want %+v", loaded, cfg) + } +} + +func TestSaveHeartbeat_CreatesParentDirectory(t *testing.T) { + tempDir := t.TempDir() + path := filepath.Join(tempDir, "deeply", "nested", "heartbeat.yaml") + if err := config.SaveHeartbeat(path, config.DefaultHeartbeatConfig()); err != nil { + t.Fatalf("SaveHeartbeat() failed: %v", err) + } + if _, err := os.Stat(path); err != nil { + t.Fatalf("File not created at nested path: %v", err) + } +} diff --git a/config/prompts.go b/config/prompts.go index af9bfe0b..713f0a27 100644 --- a/config/prompts.go +++ b/config/prompts.go @@ -38,6 +38,9 @@ func mergePromptDefaults(loaded, defaults *PromptsConfig) { if loaded.Agent.SystemPromptRemote == "" { loaded.Agent.SystemPromptRemote = defaults.Agent.SystemPromptRemote } + if loaded.Agent.SystemPromptHeartbeat == "" { + loaded.Agent.SystemPromptHeartbeat = defaults.Agent.SystemPromptHeartbeat + } if loaded.Agent.SystemReminders.ReminderText == "" { loaded.Agent.SystemReminders.ReminderText = defaults.Agent.SystemReminders.ReminderText } @@ -110,11 +113,12 @@ type PromptsConfig struct { } type PromptsAgentConfig struct { - SystemPrompt string `yaml:"system_prompt" mapstructure:"system_prompt"` - SystemPromptPlan string `yaml:"system_prompt_plan" mapstructure:"system_prompt_plan"` - SystemPromptRemote string `yaml:"system_prompt_remote" mapstructure:"system_prompt_remote"` - CustomInstructions string `yaml:"custom_instructions" mapstructure:"custom_instructions"` - SystemReminders PromptsAgentRemindersConfig `yaml:"system_reminders" mapstructure:"system_reminders"` + SystemPrompt string `yaml:"system_prompt" mapstructure:"system_prompt"` + SystemPromptPlan string `yaml:"system_prompt_plan" mapstructure:"system_prompt_plan"` + SystemPromptRemote string `yaml:"system_prompt_remote" mapstructure:"system_prompt_remote"` + SystemPromptHeartbeat string `yaml:"system_prompt_heartbeat" mapstructure:"system_prompt_heartbeat"` + CustomInstructions string `yaml:"custom_instructions" mapstructure:"custom_instructions"` + SystemReminders PromptsAgentRemindersConfig `yaml:"system_reminders" mapstructure:"system_reminders"` } type PromptsAgentRemindersConfig struct { @@ -289,6 +293,24 @@ Why prefer direct tools: - Works over SSH without X11 forwarding - Precise output (structured data, not visual interpretation) - Lower resource usage (critical for remote systems)`, + SystemPromptHeartbeat: `You are an autonomous agent that has just been woken up by a periodic heartbeat tick. + +PURPOSE: Self-driven progress checks. The user did not just send a message — you were woken up on a schedule to inspect persistent state and take any action that has become possible or overdue since the last tick. + +WHAT TO CHECK (in order): +1. Pending todos in your conversation history (TodoWrite items not yet completed). +2. Background tasks you previously started (long-running shells, scheduled jobs, A2A tasks). +3. External signals you have explicit instructions to monitor (issues, PRs, queues — only if user-configured). + +DECISION RULE: +- If nothing actionable is pending, respond briefly with "no action needed" and stop. Do NOT invent work. +- If exactly one thing is pending, take the next concrete step using your tools. +- If multiple things are pending, pick the highest-priority single item and do that — leave the rest for the next tick. + +CONSTRAINTS: +- You run autonomously without human approval. Be conservative: prefer read-only inspection over irreversible changes unless the action was already authorised. +- Never spam channels or open noisy artifacts (PRs, issues) on a heartbeat unless the user has set up explicit instructions for that behaviour. +- Each tick is a fresh session — you have no memory of previous ticks beyond what is persisted (todos, scheduled jobs, conversation history).`, CustomInstructions: ``, SystemReminders: PromptsAgentRemindersConfig{ Enabled: true, diff --git a/config/prompts_test.go b/config/prompts_test.go index 104bdad7..4e3701de 100644 --- a/config/prompts_test.go +++ b/config/prompts_test.go @@ -19,6 +19,7 @@ func TestDefaultPromptsConfig_AllPromptsPopulated(t *testing.T) { "agent.system_prompt": cfg.Agent.SystemPrompt, "agent.system_prompt_plan": cfg.Agent.SystemPromptPlan, "agent.system_prompt_remote": cfg.Agent.SystemPromptRemote, + "agent.system_prompt_heartbeat": cfg.Agent.SystemPromptHeartbeat, "agent.system_reminders.reminder_text": cfg.Agent.SystemReminders.ReminderText, "git.commit_message.system_prompt": cfg.Git.CommitMessage.SystemPrompt, "conversation.title_generation.system_prompt": cfg.Conversation.TitleGeneration.SystemPrompt, diff --git a/docs/heartbeat.md b/docs/heartbeat.md new file mode 100644 index 00000000..d9554675 --- /dev/null +++ b/docs/heartbeat.md @@ -0,0 +1,201 @@ +# Heartbeat Guide + +[← Back to README](../README.md) + +The **Heartbeat** wakes the agent on a fixed interval to check for +pending work without waiting for user input. Unlike the +[Schedule](./scheduling.md) tool — which lets the LLM create +user-driven cron jobs that deliver output to a channel — the +heartbeat is a global self-driven tick the operator configures once. + +It exists for autonomy use cases: "every hour, look at my todos and +make progress on whatever is next." The agent decides what to do at +each tick using a tailored system prompt. + +## How it works + +```text +┌───────────────────────────────────────────────────────────────┐ +│ infer channels-manager (long-running daemon) │ +│ │ +│ ├─ ChannelManagerService (channels — optional) │ +│ ├─ SchedulerService (cron jobs — optional) │ +│ └─ HeartbeatService │ +│ ├─ time.Ticker(interval) │ +│ └─ on tick: spawn `infer agent --heartbeat │ +│ --session-id `│ +│ log stdout │ +└───────────────────────────────────────────────────────────────┘ + ▲ + │ reads on startup + ┌────────────┴────────────────┐ + │ .infer/heartbeat.yaml │ + │ .infer/prompts.yaml │ + │ (system_prompt_heartbeat) │ + └─────────────────────────────┘ +``` + +Key properties: + +- **Off by default.** Heartbeat is opt-in via the `enabled` flag in + `heartbeat.yaml`. +- **Single global instance.** One interval, one prompt — for + multi-job use cases use the [Schedule tool](./scheduling.md) + instead. +- **Daemon-bound.** Heartbeat only fires while `infer + channels-manager` is running. If the daemon is down, ticks are + skipped (no replay). +- **Fresh session per tick.** Each tick gets a new UUID session ID; + no context carries between fires. The agent inspects persistent + state (todos, scheduled jobs, conversation history) to decide what + to do. +- **Overlap-safe.** If a tick takes longer than the interval, the + next tick is skipped rather than running concurrently. +- **Output to logs.** The agent's stdout is logged. Whatever + externally-visible action you want the agent to take (post to + Telegram, open a PR, run a job) it does via its own tools — the + heartbeat is just the trigger. + +## Setup + +### 1. Generate the config files + +```bash +infer init +``` + +This creates `.infer/heartbeat.yaml` (disabled by default) and +includes a `system_prompt_heartbeat` entry in `.infer/prompts.yaml`. + +### 2. Enable and tune the heartbeat + +Edit `.infer/heartbeat.yaml`: + +```yaml +--- +enabled: true +interval: 1h # how often to wake (Go duration: 30s, 5m, 1h, 24h) +initial_delay: 1m # delay before first tick (avoids fire-on-start) +model: "" # optional override; empty = agent.model from config.yaml +prompt: "Heartbeat tick — check for any pending tasks, todos, or background work and act on them." +``` + +The `prompt` field is the **user message** sent to the agent each +tick. The **system prompt** (the steering instructions) lives in +`.infer/prompts.yaml`: + +```yaml +agent: + system_prompt_heartbeat: | + You are an autonomous agent that has just been woken up by a + periodic heartbeat tick. + ... +``` + +The default heartbeat system prompt is conservative — it tells the +agent to check pending todos and background tasks, take at most one +concrete step per tick, and exit. Override it to fit your workflow. + +### 3. Run the daemon + +```bash +infer channels-manager +``` + +The daemon hosts up to three subsystems — channels, scheduler, and +heartbeat — and starts whichever are enabled. You can run with +heartbeat alone (no channels, no scheduler) if that is all you need. + +```text +INFO Starting channels-manager +INFO Heartbeat service started interval=1h0m0s initial_delay=1m0s +INFO Daemon ready. Press Ctrl+C to stop. +``` + +When a tick fires: + +```text +INFO Heartbeat tick — spawning agent session_id=… model= +INFO Heartbeat agent output session_id=… line={"role":"assistant","content":"…"} +INFO Heartbeat tick complete session_id=… +``` + +## Configuration reference + +### `.infer/heartbeat.yaml` + +| Field | Type | Default | Description | +| --------------- | --------------- | ----------- | ------------------------------------------------------------------------------------------ | +| `enabled` | bool | `false` | Feature flag. Heartbeat is opt-in. | +| `interval` | duration string | `"1h"` | How often to fire. Parsed via `time.ParseDuration` (e.g. `30s`, `5m`, `1h`, `24h`). | +| `initial_delay` | duration string | `"1m"` | Delay before the first tick after the daemon starts. Set to `"0s"` to fire immediately. | +| `model` | string | `""` | Optional model override. Empty falls back to `agent.model` in `config.yaml`. | +| `prompt` | string | (built-in) | The user message sent to the agent each tick. | + +### Environment variables + +Mirroring the file fields. Env vars win over `heartbeat.yaml`. + +| Variable | Maps to | +| --------------------------------------------- | ------------------------------------ | +| `INFER_HEARTBEAT_ENABLED` | `enabled` | +| `INFER_HEARTBEAT_INTERVAL` | `interval` | +| `INFER_HEARTBEAT_INITIAL_DELAY` | `initial_delay` | +| `INFER_HEARTBEAT_MODEL` | `model` | +| `INFER_HEARTBEAT_PROMPT` | `prompt` | +| `INFER_PROMPTS_AGENT_SYSTEM_PROMPT_HEARTBEAT` | the system prompt in `prompts.yaml` | + +## Common patterns + +### Hourly TODO sweep + +```yaml +# heartbeat.yaml +enabled: true +interval: 1h +prompt: "Sweep open todos. If any are stale (>24h with no progress), pick the highest-priority one and take the next concrete step." +``` + +### Build / CI watchdog + +```yaml +# heartbeat.yaml +enabled: true +interval: 15m +prompt: "Check the status of the latest GitHub Actions run on this repo. If it failed, summarise the failure in the conversation log." +``` + +System prompt should give the agent permission to call the `Github` +tool but warn against opening issues automatically. + +## Troubleshooting + +**Heartbeat never fires** — confirm `enabled: true` and that +`infer channels-manager` is running. The daemon logs `Heartbeat +service started` on boot when it picks up the config. + +**Heartbeat fires too often / not enough** — check `interval` +parses correctly. Bad durations (`1H`, `30 minutes`) cause the +daemon to fail-fast on startup with a clear error. + +**Agent does nothing** — heartbeat works; the agent decided no +action was needed. Check the system prompt — the default explicitly +tells it to no-op when nothing is pending. + +**"Heartbeat tick skipped — previous run still in flight"** — your +agent is taking longer than `interval` to complete. Either increase +the interval, simplify the prompt, or set a tighter `agent.max_turns` +in `config.yaml`. + +## Comparison with the Schedule tool + +| | Heartbeat | Schedule tool | +| -------------- | ---------------------------- | ------------------------------------ | +| Configured by | Operator (yaml file) | LLM (via tool calls) | +| Multiplicity | Single global tick | Many user-defined jobs | +| Trigger | Fixed interval (Go duration) | Cron expression (per job) | +| Output | Logs | A messaging channel (Telegram, …) | +| Use case | Autonomous self-monitoring | "Remind me at 8am tomorrow" | + +The two are complementary and can run side by side in the same +daemon. diff --git a/internal/services/heartbeat/heartbeat.go b/internal/services/heartbeat/heartbeat.go new file mode 100644 index 00000000..3fb5a368 --- /dev/null +++ b/internal/services/heartbeat/heartbeat.go @@ -0,0 +1,234 @@ +// Package heartbeat implements a periodic "wake-up" service that +// spawns the agent on a fixed interval to check for pending work. It +// is hosted by the channels-manager daemon (peer to the scheduler) +// and is disabled by default. +// +// Unlike the scheduler, heartbeat does not route output to a channel. +// Each tick fires `infer agent --heartbeat`, the agent runs to +// completion using a tailored system prompt, and the agent's stdout +// is logged. Whatever externally-visible action the agent takes (e.g. +// posting to Telegram, opening a PR) it does via its own tools. +package heartbeat + +import ( + "bufio" + "bytes" + "context" + "errors" + "fmt" + "io" + "os" + "os/exec" + "strings" + "sync" + "sync/atomic" + "time" + + uuid "github.com/google/uuid" + logger "github.com/inference-gateway/cli/internal/logger" +) + +// ExecCommandFn matches exec.CommandContext. Exposed for tests. +type ExecCommandFn func(ctx context.Context, name string, args ...string) *exec.Cmd + +// Config bundles the runtime knobs the heartbeat service needs. It is +// derived from config.HeartbeatConfig at startup time so the service +// stays decoupled from the broader Config type. +type Config struct { + Interval time.Duration + InitialDelay time.Duration + Model string + Prompt string +} + +// Service drives the heartbeat ticker. Single global instance per +// daemon; constructed by NewService and lifecycle-managed via +// Start/Stop. +type Service struct { + cfg Config + execCmd ExecCommandFn + binaryPath string + + ctx context.Context + cancel context.CancelFunc + wg sync.WaitGroup + + // running is 1 while a heartbeat agent run is in flight. Used to + // suppress overlapping ticks if a single run takes longer than the + // configured interval. + running atomic.Int32 + + started bool + mu sync.Mutex +} + +// Options bundles dependencies for NewService. +type Options struct { + Config Config + // ExecCommand defaults to exec.CommandContext when nil. + ExecCommand ExecCommandFn + // BinaryPath defaults to os.Args[0] when empty. + BinaryPath string +} + +// NewService constructs a heartbeat Service. Returns an error if the +// configured interval is non-positive. +func NewService(opts Options) (*Service, error) { + if opts.Config.Interval <= 0 { + return nil, errors.New("heartbeat: interval must be > 0") + } + execFn := opts.ExecCommand + if execFn == nil { + execFn = exec.CommandContext + } + bin := opts.BinaryPath + if bin == "" { + bin = os.Args[0] + } + return &Service{ + cfg: opts.Config, + execCmd: execFn, + binaryPath: bin, + }, nil +} + +// Start launches the ticker goroutine. The supplied context's +// cancellation is propagated to in-flight agent subprocesses on +// shutdown. Calling Start more than once is a no-op. +func (s *Service) Start(ctx context.Context) error { + s.mu.Lock() + defer s.mu.Unlock() + if s.started { + return nil + } + s.ctx, s.cancel = context.WithCancel(ctx) + s.started = true + + logger.Info("Heartbeat service started", + "interval", s.cfg.Interval.String(), + "initial_delay", s.cfg.InitialDelay.String(), + ) + + s.wg.Add(1) + go s.run() + return nil +} + +// Stop cancels the ticker and waits for any in-flight heartbeat run +// to terminate. Honours the supplied context as a hard deadline. +func (s *Service) Stop(ctx context.Context) error { + s.mu.Lock() + if !s.started { + s.mu.Unlock() + return nil + } + s.cancel() + s.mu.Unlock() + + done := make(chan struct{}) + go func() { + s.wg.Wait() + close(done) + }() + select { + case <-done: + logger.Info("Heartbeat service stopped") + return nil + case <-ctx.Done(): + return fmt.Errorf("heartbeat stop: %w", ctx.Err()) + } +} + +func (s *Service) run() { + defer s.wg.Done() + + if s.cfg.InitialDelay > 0 { + select { + case <-time.After(s.cfg.InitialDelay): + case <-s.ctx.Done(): + return + } + } + + s.fireGuarded() + + ticker := time.NewTicker(s.cfg.Interval) + defer ticker.Stop() + + for { + select { + case <-ticker.C: + s.fireGuarded() + case <-s.ctx.Done(): + return + } + } +} + +// fireGuarded suppresses overlapping ticks. If a previous heartbeat +// run is still alive, the current tick is skipped — the next one will +// pick it up. +func (s *Service) fireGuarded() { + if !s.running.CompareAndSwap(0, 1) { + logger.Warn("Heartbeat tick skipped — previous run still in flight") + return + } + defer s.running.Store(0) + + if err := s.fire(s.ctx); err != nil { + if errors.Is(err, context.Canceled) { + return + } + logger.Error("Heartbeat run failed", "error", err) + } +} + +// fire spawns a single `infer agent --heartbeat` subprocess and +// streams its stdout to the logger. Each fire gets a fresh UUID +// session ID so no context carries between ticks. +func (s *Service) fire(ctx context.Context) error { + sessionID := uuid.New().String() + args := []string{"agent", "--heartbeat", "--session-id", sessionID} + if s.cfg.Model != "" { + args = append(args, "--model", s.cfg.Model) + } + args = append(args, s.cfg.Prompt) + + logger.Info("Heartbeat tick — spawning agent", + "session_id", sessionID, + "model", s.cfg.Model, + ) + + cmd := s.execCmd(ctx, s.binaryPath, args...) + cmd.Env = os.Environ() + + stdout, err := cmd.StdoutPipe() + if err != nil { + return fmt.Errorf("stdout pipe: %w", err) + } + var stderrBuf bytes.Buffer + cmd.Stderr = io.MultiWriter(os.Stderr, &stderrBuf) + + if err := cmd.Start(); err != nil { + return fmt.Errorf("start agent: %w", err) + } + + scanner := bufio.NewScanner(stdout) + scanner.Buffer(make([]byte, 0, 64*1024), 10*1024*1024) + for scanner.Scan() { + line := strings.TrimSpace(scanner.Text()) + if line == "" { + continue + } + logger.Info("Heartbeat agent output", "session_id", sessionID, "line", line) + } + + if err := cmd.Wait(); err != nil { + if stderrBuf.Len() > 0 { + return fmt.Errorf("%w: %s", err, strings.TrimSpace(stderrBuf.String())) + } + return err + } + logger.Info("Heartbeat tick complete", "session_id", sessionID) + return nil +} diff --git a/internal/services/heartbeat/heartbeat_test.go b/internal/services/heartbeat/heartbeat_test.go new file mode 100644 index 00000000..5ee0d374 --- /dev/null +++ b/internal/services/heartbeat/heartbeat_test.go @@ -0,0 +1,176 @@ +package heartbeat + +import ( + "context" + "os/exec" + "sync/atomic" + "testing" + "time" +) + +func TestNewService_ValidatesInterval(t *testing.T) { + if _, err := NewService(Options{}); err == nil { + t.Fatal("expected error when interval is zero") + } + if _, err := NewService(Options{Config: Config{Interval: -time.Second}}); err == nil { + t.Fatal("expected error when interval is negative") + } + if _, err := NewService(Options{Config: Config{Interval: time.Second}}); err != nil { + t.Fatalf("unexpected error for valid interval: %v", err) + } +} + +func TestService_FiresOnInterval(t *testing.T) { + fired := &atomic.Int32{} + svc, err := NewService(Options{ + Config: Config{ + Interval: 50 * time.Millisecond, + InitialDelay: 0, + Prompt: "test prompt", + }, + ExecCommand: func(ctx context.Context, _ string, _ ...string) *exec.Cmd { + fired.Add(1) + return exec.CommandContext(ctx, "echo", "hello") + }, + BinaryPath: "/usr/bin/true", + }) + if err != nil { + t.Fatalf("NewService: %v", err) + } + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + if err := svc.Start(ctx); err != nil { + t.Fatalf("Start: %v", err) + } + + deadline := time.Now().Add(500 * time.Millisecond) + for time.Now().Before(deadline) { + if fired.Load() >= 3 { + break + } + time.Sleep(20 * time.Millisecond) + } + + stopCtx, stopCancel := context.WithTimeout(context.Background(), 2*time.Second) + defer stopCancel() + if err := svc.Stop(stopCtx); err != nil { + t.Fatalf("Stop: %v", err) + } + + if got := fired.Load(); got < 3 { + t.Errorf("expected at least 3 fires, got %d", got) + } +} + +func TestService_RespectsInitialDelay(t *testing.T) { + fired := &atomic.Int32{} + svc, err := NewService(Options{ + Config: Config{ + Interval: 20 * time.Millisecond, + InitialDelay: 200 * time.Millisecond, + Prompt: "test", + }, + ExecCommand: func(ctx context.Context, _ string, _ ...string) *exec.Cmd { + fired.Add(1) + return exec.CommandContext(ctx, "echo", "hello") + }, + BinaryPath: "/usr/bin/true", + }) + if err != nil { + t.Fatalf("NewService: %v", err) + } + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + if err := svc.Start(ctx); err != nil { + t.Fatalf("Start: %v", err) + } + + // At 100ms, the initial delay (200ms) should not yet have elapsed. + time.Sleep(100 * time.Millisecond) + if got := fired.Load(); got != 0 { + t.Errorf("expected 0 fires before initial_delay elapses, got %d", got) + } + + stopCtx, stopCancel := context.WithTimeout(context.Background(), 2*time.Second) + defer stopCancel() + if err := svc.Stop(stopCtx); err != nil { + t.Fatalf("Stop: %v", err) + } +} + +func TestService_SkipsOverlappingTicks(t *testing.T) { + fired := &atomic.Int32{} + // Each fire blocks for 200ms (sleep), so with interval=50ms several + // ticks would overlap if we did not guard against it. Expect roughly + // one fire per 200ms window. + svc, err := NewService(Options{ + Config: Config{ + Interval: 50 * time.Millisecond, + InitialDelay: 0, + Prompt: "test", + }, + ExecCommand: func(ctx context.Context, _ string, _ ...string) *exec.Cmd { + fired.Add(1) + return exec.CommandContext(ctx, "sleep", "0.2") + }, + BinaryPath: "/usr/bin/true", + }) + if err != nil { + t.Fatalf("NewService: %v", err) + } + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + if err := svc.Start(ctx); err != nil { + t.Fatalf("Start: %v", err) + } + + // Run for 500ms — without guard we'd see ~10 fires; with the guard + // we expect at most ~3 (one every 200ms). + time.Sleep(500 * time.Millisecond) + + stopCtx, stopCancel := context.WithTimeout(context.Background(), 2*time.Second) + defer stopCancel() + if err := svc.Stop(stopCtx); err != nil { + t.Fatalf("Stop: %v", err) + } + + got := fired.Load() + if got > 5 { + t.Errorf("overlap guard failed: %d fires in 500ms with 200ms-per-fire work", got) + } + if got < 1 { + t.Errorf("expected at least one fire, got %d", got) + } +} + +func TestService_StopIsIdempotent(t *testing.T) { + svc, err := NewService(Options{ + Config: Config{Interval: time.Second, Prompt: "x"}, + ExecCommand: func(ctx context.Context, _ string, _ ...string) *exec.Cmd { + return exec.CommandContext(ctx, "echo") + }, + }) + if err != nil { + t.Fatalf("NewService: %v", err) + } + + ctx := context.Background() + if err := svc.Stop(ctx); err != nil { + t.Errorf("Stop before Start should be a no-op, got: %v", err) + } + + startCtx, startCancel := context.WithCancel(context.Background()) + defer startCancel() + if err := svc.Start(startCtx); err != nil { + t.Fatalf("Start: %v", err) + } + + stopCtx, stopCancel := context.WithTimeout(context.Background(), 2*time.Second) + defer stopCancel() + if err := svc.Stop(stopCtx); err != nil { + t.Errorf("Stop: %v", err) + } +}