diff --git a/pkg/vmcp/server/status.go b/pkg/vmcp/server/status.go index 484ee5bd88..0eafe5f284 100644 --- a/pkg/vmcp/server/status.go +++ b/pkg/vmcp/server/status.go @@ -12,6 +12,7 @@ import ( "github.com/stacklok/toolhive/pkg/versions" "github.com/stacklok/toolhive/pkg/vmcp" authtypes "github.com/stacklok/toolhive/pkg/vmcp/auth/types" + "github.com/stacklok/toolhive/pkg/vmcp/health" ) // StatusResponse represents the vMCP server's operational status. @@ -63,17 +64,36 @@ func (s *Server) buildStatusResponse(ctx context.Context) StatusResponse { backends := s.backendRegistry.List(ctx) backendStatuses := make([]BackendStatus, 0, len(backends)) + // Get live health states from the health monitor (if enabled) so that + // /status reflects the same runtime health as /api/backends/health. + // Skip the call — and the map allocation — entirely when monitoring is + // disabled. Reading from a nil map is safe in Go and returns zero values. + s.healthMonitorMu.RLock() + healthMon := s.healthMonitor + s.healthMonitorMu.RUnlock() + + var liveHealthStates map[string]*health.State + if healthMon != nil { + liveHealthStates = healthMon.GetAllBackendStates() + } + hasHealthyBackend := false for _, backend := range backends { + // Prefer the live health monitor state over the static registry value. + healthStatus := backend.HealthStatus + if liveState, ok := liveHealthStates[backend.ID]; ok { + healthStatus = liveState.Status + } + status := BackendStatus{ Name: backend.Name, - Health: string(backend.HealthStatus), + Health: string(healthStatus), Transport: backend.TransportType, AuthType: getAuthType(backend.AuthConfig), } backendStatuses = append(backendStatuses, status) - if backend.HealthStatus == vmcp.BackendHealthy { + if healthStatus == vmcp.BackendHealthy { hasHealthyBackend = true } } diff --git a/pkg/vmcp/server/status_test.go b/pkg/vmcp/server/status_test.go index 417ea2eeb8..b5a1645bad 100644 --- a/pkg/vmcp/server/status_test.go +++ b/pkg/vmcp/server/status_test.go @@ -6,6 +6,7 @@ package server_test import ( "context" "encoding/json" + "errors" "net/http" "testing" "time" @@ -19,6 +20,7 @@ import ( "github.com/stacklok/toolhive/pkg/vmcp/aggregator" authtypes "github.com/stacklok/toolhive/pkg/vmcp/auth/types" discoveryMocks "github.com/stacklok/toolhive/pkg/vmcp/discovery/mocks" + "github.com/stacklok/toolhive/pkg/vmcp/health" "github.com/stacklok/toolhive/pkg/vmcp/mocks" "github.com/stacklok/toolhive/pkg/vmcp/router" "github.com/stacklok/toolhive/pkg/vmcp/server" @@ -40,66 +42,15 @@ type BackendStatus struct { AuthType string `json:"auth_type,omitempty"` } -// createTestServerWithBackends creates a test server instance with custom backends. +// createTestServerWithBackends creates a test server instance with custom backends +// and no health monitoring. It is a convenience wrapper around createTestServerWithHealthMonitor. func createTestServerWithBackends(t *testing.T, backends []vmcp.Backend, groupRef string) *server.Server { t.Helper() - - ctrl := gomock.NewController(t) - t.Cleanup(ctrl.Finish) - - mockBackendClient := mocks.NewMockBackendClient(ctrl) - mockDiscoveryMgr := discoveryMocks.NewMockManager(ctrl) - rt := router.NewDefaultRouter() - - port := networking.FindAvailable() - require.NotZero(t, port, "Failed to find available port") - - mockDiscoveryMgr.EXPECT(). - Discover(gomock.Any(), gomock.Any()). - Return(&aggregator.AggregatedCapabilities{ - Tools: []vmcp.Tool{}, - Resources: []vmcp.Resource{}, - Prompts: []vmcp.Prompt{}, - RoutingTable: &vmcp.RoutingTable{ - Tools: make(map[string]*vmcp.BackendTarget), - Resources: make(map[string]*vmcp.BackendTarget), - Prompts: make(map[string]*vmcp.BackendTarget), - }, - Metadata: &aggregator.AggregationMetadata{}, - }, nil). - AnyTimes() - mockDiscoveryMgr.EXPECT().Stop().AnyTimes() - - ctx, cancel := context.WithCancel(t.Context()) - - srv, err := server.New(ctx, &server.Config{ - Name: "test-vmcp", - Version: "1.0.0", - Host: "127.0.0.1", - Port: port, - GroupRef: groupRef, - SessionFactory: newNoopMockFactory(t), - }, rt, mockBackendClient, mockDiscoveryMgr, vmcp.NewImmutableRegistry(backends), nil) - require.NoError(t, err) - - t.Cleanup(cancel) - errCh := make(chan error, 1) - go func() { - if err := srv.Start(ctx); err != nil { - errCh <- err - } - }() - - select { - case <-srv.Ready(): - case err := <-errCh: - t.Fatalf("Server failed to start: %v", err) - case <-time.After(5 * time.Second): - t.Fatalf("Server did not become ready within 5s (address: %s)", srv.Address()) - } - - time.Sleep(10 * time.Millisecond) - return srv + return createTestServerWithHealthMonitor(t, backends, + health.MonitorConfig{}, // zero value → no health monitor started + nil, // no mock expectations needed + groupRef, + ) } func TestStatusEndpoint_HTTPBehavior(t *testing.T) { @@ -242,3 +193,221 @@ func TestStatusEndpoint_BackendFieldMapping(t *testing.T) { assert.Equal(t, "streamable-http", b.Transport) assert.Equal(t, authtypes.StrategyTypeTokenExchange, b.AuthType) } + +// createTestServerWithHealthMonitor creates a test server with health monitoring enabled. +// setupMock configures mock expectations on the backend client (e.g. ListCapabilities responses for health checks). +// groupRef is set in the server config (empty string is fine for tests that don't need it). +func createTestServerWithHealthMonitor( + t *testing.T, + backends []vmcp.Backend, + monitorCfg health.MonitorConfig, + setupMock func(mockClient *mocks.MockBackendClient), + groupRef string, +) *server.Server { + t.Helper() + + // ctrl.Finish must run last so that all mock calls have already stopped. + // t.Cleanup is LIFO, so register it first — it will execute third. + ctrl := gomock.NewController(t) + t.Cleanup(ctrl.Finish) + + mockBackendClient := mocks.NewMockBackendClient(ctrl) + mockDiscoveryMgr := discoveryMocks.NewMockManager(ctrl) + rt := router.NewDefaultRouter() + + if setupMock != nil { + setupMock(mockBackendClient) + } + + port := networking.FindAvailable() + require.NotZero(t, port, "Failed to find available port") + + mockDiscoveryMgr.EXPECT(). + Discover(gomock.Any(), gomock.Any()). + Return(&aggregator.AggregatedCapabilities{ + Tools: []vmcp.Tool{}, + Resources: []vmcp.Resource{}, + Prompts: []vmcp.Prompt{}, + RoutingTable: &vmcp.RoutingTable{ + Tools: make(map[string]*vmcp.BackendTarget), + Resources: make(map[string]*vmcp.BackendTarget), + Prompts: make(map[string]*vmcp.BackendTarget), + }, + Metadata: &aggregator.AggregationMetadata{}, + }, nil). + AnyTimes() + mockDiscoveryMgr.EXPECT().Stop().AnyTimes() + + ctx, cancel := context.WithCancel(t.Context()) + + var healthMonCfg *health.MonitorConfig + if (monitorCfg != health.MonitorConfig{}) { + healthMonCfg = &monitorCfg + } + srv, err := server.New(ctx, &server.Config{ + Name: "test-vmcp", + Version: "1.0.0", + Host: "127.0.0.1", + Port: port, + GroupRef: groupRef, + HealthMonitorConfig: healthMonCfg, + SessionFactory: newNoopMockFactory(t), + }, rt, mockBackendClient, mockDiscoveryMgr, vmcp.NewImmutableRegistry(backends), nil) + require.NoError(t, err) + + type startResult struct { + err error + } + done := make(chan startResult, 1) + go func() { + done <- startResult{err: srv.Start(ctx)} + }() + + // Cleanup order (LIFO): + // 1. cancel() — stops the server and health monitor goroutines + // 2. <-done — waits for srv.Start (and all goroutines) to return + // 3. ctrl.Finish — validates mock expectations after all calls have stopped + t.Cleanup(func() { + result := <-done + if result.err != nil && !errors.Is(result.err, context.Canceled) { + t.Errorf("server exited with unexpected error: %v", result.err) + } + }) + t.Cleanup(cancel) + + select { + case <-srv.Ready(): + case result := <-done: + t.Fatalf("server exited before becoming ready: %v", result.err) + case <-time.After(5 * time.Second): + t.Fatalf("Server did not become ready within 5s (address: %s)", srv.Address()) + } + + return srv +} + +// queryStatus fetches and decodes /status from the given server. +func queryStatus(t *testing.T, srv *server.Server) StatusResponse { + t.Helper() + resp, err := http.Get("http://" + srv.Address() + "/status") + require.NoError(t, err) + defer resp.Body.Close() + require.Equal(t, http.StatusOK, resp.StatusCode, "unexpected HTTP status from /status") + var status StatusResponse + require.NoError(t, json.NewDecoder(resp.Body).Decode(&status)) + return status +} + +// TestStatusEndpoint_ReflectsLiveHealthMonitor_Unhealthy verifies the fix for +// https://github.com/stacklok/toolhive/issues/4103: /status must report the +// same health state as the live health monitor, not the stale registry value. +// +// Without the fix, a backend registered as "healthy" would always appear healthy +// in /status even after the health monitor had marked it unhealthy. +func TestStatusEndpoint_ReflectsLiveHealthMonitor_Unhealthy(t *testing.T) { + t.Parallel() + + // Backend starts as "healthy" in the registry – this is the stale value + // that the old code would always return from /status. + backends := []vmcp.Backend{{ + ID: "b1", + Name: "test-backend", + TransportType: "streamable-http", + HealthStatus: vmcp.BackendHealthy, + }} + + monitorCfg := health.MonitorConfig{ + CheckInterval: 5 * time.Millisecond, + UnhealthyThreshold: 1, // one failure → unhealthy immediately + Timeout: time.Second, + } + + srv := createTestServerWithHealthMonitor(t, backends, monitorCfg, func(mockClient *mocks.MockBackendClient) { + // All health checks fail – the monitor should mark the backend unhealthy. + mockClient.EXPECT(). + ListCapabilities(gomock.Any(), gomock.Any()). + Return(nil, errors.New("connection refused")). + AnyTimes() + }, "") + + // Poll /status until the live monitor state propagates. If the fix is + // absent the backend stays "healthy" forever and the assertion times out. + require.Eventually(t, func() bool { + status := queryStatus(t, srv) + if len(status.Backends) == 0 { + return false + } + return status.Backends[0].Health == string(vmcp.BackendUnhealthy) + }, 5*time.Second, 20*time.Millisecond, "expected /status to report backend as unhealthy") + + // The overall server health flag must also be false when no backend is healthy. + status := queryStatus(t, srv) + assert.False(t, status.Healthy) + require.Len(t, status.Backends, 1) + assert.Equal(t, string(vmcp.BackendUnhealthy), status.Backends[0].Health) +} + +// TestStatusEndpoint_ReflectsLiveHealthMonitor_Healthy confirms that /status +// correctly reports a backend as healthy when the health monitor records success. +func TestStatusEndpoint_ReflectsLiveHealthMonitor_Healthy(t *testing.T) { + t.Parallel() + + backends := []vmcp.Backend{{ + ID: "b1", + Name: "test-backend", + TransportType: "streamable-http", + HealthStatus: vmcp.BackendUnknown, // registry starts with unknown + }} + + monitorCfg := health.MonitorConfig{ + CheckInterval: 5 * time.Millisecond, + UnhealthyThreshold: 3, + Timeout: time.Second, + } + + srv := createTestServerWithHealthMonitor(t, backends, monitorCfg, func(mockClient *mocks.MockBackendClient) { + // Health checks succeed – the monitor should mark the backend healthy. + mockClient.EXPECT(). + ListCapabilities(gomock.Any(), gomock.Any()). + Return(&vmcp.CapabilityList{}, nil). + AnyTimes() + }, "") + + // Poll until the healthy state from the monitor appears in /status. + require.Eventually(t, func() bool { + status := queryStatus(t, srv) + if len(status.Backends) == 0 { + return false + } + return status.Backends[0].Health == string(vmcp.BackendHealthy) + }, 5*time.Second, 20*time.Millisecond, "expected /status to report backend as healthy") + + status := queryStatus(t, srv) + assert.True(t, status.Healthy) + require.Len(t, status.Backends, 1) + assert.Equal(t, string(vmcp.BackendHealthy), status.Backends[0].Health) +} + +// TestStatusEndpoint_FallsBackToRegistry_WhenMonitorDisabled confirms the +// no-monitor path is unchanged: health status comes from the registry. +func TestStatusEndpoint_FallsBackToRegistry_WhenMonitorDisabled(t *testing.T) { + t.Parallel() + + backends := []vmcp.Backend{ + {ID: "b1", Name: "healthy-backend", HealthStatus: vmcp.BackendHealthy}, + {ID: "b2", Name: "unhealthy-backend", HealthStatus: vmcp.BackendUnhealthy}, + } + + // createTestServerWithBackends does NOT configure a health monitor. + srv := createTestServerWithBackends(t, backends, "") + + status := queryStatus(t, srv) + + require.Len(t, status.Backends, 2) + healthByName := make(map[string]string) + for _, b := range status.Backends { + healthByName[b.Name] = b.Health + } + assert.Equal(t, string(vmcp.BackendHealthy), healthByName["healthy-backend"]) + assert.Equal(t, string(vmcp.BackendUnhealthy), healthByName["unhealthy-backend"]) +} diff --git a/test/e2e/thv-operator/virtualmcp/helpers.go b/test/e2e/thv-operator/virtualmcp/helpers.go index 618b1d9df0..337e232f7e 100644 --- a/test/e2e/thv-operator/virtualmcp/helpers.go +++ b/test/e2e/thv-operator/virtualmcp/helpers.go @@ -1802,3 +1802,70 @@ func DeployMockOAuth2Server( } return inClusterTokenURL, cleanup } + +// ---- /status and /api/backends/health HTTP helpers ---- + +// VMCPStatusResponse mirrors server.StatusResponse +// (pkg/vmcp/server/status.go) for test deserialization. +type VMCPStatusResponse struct { + Backends []VMCPBackendStatus `json:"backends"` + Healthy bool `json:"healthy"` + Version string `json:"version"` + GroupRef string `json:"group_ref"` +} + +// VMCPBackendStatus mirrors server.BackendStatus +// (pkg/vmcp/server/status.go) for test deserialization. +type VMCPBackendStatus struct { + Name string `json:"name"` + Health string `json:"health"` // "healthy", "degraded", "unhealthy", "unknown" + Transport string `json:"transport"` + AuthType string `json:"auth_type,omitempty"` +} + +// VMCPBackendsHealthResponse mirrors BackendHealthResponse +// (pkg/vmcp/server/server.go) for test deserialization. +type VMCPBackendsHealthResponse struct { + MonitoringEnabled bool `json:"monitoring_enabled"` + Backends map[string]*VMCPBackendHealthState `json:"backends,omitempty"` +} + +// VMCPBackendHealthState mirrors health.State for test deserialization. +// Field names are capitalized (no json tags on the server struct). +type VMCPBackendHealthState struct { + Status string `json:"Status"` + ConsecutiveFailures int `json:"ConsecutiveFailures"` + LastErrorCategory string `json:"LastErrorCategory"` +} + +// getAndDecodeJSON issues a GET to url, checks for HTTP 200, and decodes the +// JSON body into a value of type T. Returns a pointer to the decoded value. +func getAndDecodeJSON[T any](url, label string) (*T, error) { + resp, err := http.Get(url) //nolint:gosec // test helper, URL is constructed from controlled input + if err != nil { + return nil, fmt.Errorf("GET %s: %w", label, err) + } + defer resp.Body.Close() //nolint:errcheck + if resp.StatusCode != http.StatusOK { + return nil, fmt.Errorf("%s returned HTTP %d", label, resp.StatusCode) + } + var result T + if err := json.NewDecoder(resp.Body).Decode(&result); err != nil { + return nil, fmt.Errorf("decode %s: %w", label, err) + } + return &result, nil +} + +// GetVMCPStatus queries the /status endpoint on the given NodePort and returns +// the parsed response. +func GetVMCPStatus(nodePort int32) (*VMCPStatusResponse, error) { + return getAndDecodeJSON[VMCPStatusResponse]( + fmt.Sprintf("http://localhost:%d/status", nodePort), "/status") +} + +// GetVMCPBackendsHealth queries the /api/backends/health endpoint on the given +// NodePort and returns the parsed response. +func GetVMCPBackendsHealth(nodePort int32) (*VMCPBackendsHealthResponse, error) { + return getAndDecodeJSON[VMCPBackendsHealthResponse]( + fmt.Sprintf("http://localhost:%d/api/backends/health", nodePort), "/api/backends/health") +} diff --git a/test/e2e/thv-operator/virtualmcp/virtualmcp_circuit_breaker_test.go b/test/e2e/thv-operator/virtualmcp/virtualmcp_circuit_breaker_test.go index 764065e39e..ce34ae8882 100644 --- a/test/e2e/thv-operator/virtualmcp/virtualmcp_circuit_breaker_test.go +++ b/test/e2e/thv-operator/virtualmcp/virtualmcp_circuit_breaker_test.go @@ -363,6 +363,91 @@ var _ = Describe("VirtualMCPServer Circuit Breaker Lifecycle", Ordered, func() { GinkgoWriter.Printf(" Circuit breaker state verified above; capability filtering covered by unit tests\n") }) + It("should report consistent health state in /status and /api/backends/health", func() { + By("Obtaining the vMCP NodePort") + nodePort := GetVMCPNodePort(ctx, k8sClient, vmcpServerName, testNamespace, timeout, pollingInterval) + + // Fetch both endpoints in the same polling iteration so that a backend + // transitioning between states mid-comparison does not cause a spurious + // mismatch. Both snapshots must agree before the assertion passes. + // + // Backend IDs in /api/backends/health equal MCPServer names (set in + // k8s.go GetWorkloadAsVMCPBackend, ID = mcpServer.Name). + By("Polling until both endpoints agree on health state and unstable backend is unhealthy") + var ( + lastStatusResp *VMCPStatusResponse + lastBackendsHealth *VMCPBackendsHealthResponse + ) + Eventually(func() error { + bh, err := GetVMCPBackendsHealth(nodePort) + if err != nil { + return fmt.Errorf("GET /api/backends/health: %w", err) + } + if !bh.MonitoringEnabled { + return fmt.Errorf("/api/backends/health: monitoring not enabled") + } + if len(bh.Backends) == 0 { + return fmt.Errorf("/api/backends/health: no backends listed") + } + + sr, err := GetVMCPStatus(nodePort) + if err != nil { + return fmt.Errorf("GET /status: %w", err) + } + if len(sr.Backends) == 0 { + return fmt.Errorf("/status: no backends listed") + } + + // Build name→health map from /status snapshot. + statusHealthByName := make(map[string]string, len(sr.Backends)) + for _, b := range sr.Backends { + statusHealthByName[b.Name] = b.Health + } + + // Both snapshots must agree on every backend in /api/backends/health. + for backendID, healthState := range bh.Backends { + statusHealth, found := statusHealthByName[backendID] + if !found { + return fmt.Errorf("backend %q in /api/backends/health but missing from /status", backendID) + } + if statusHealth != healthState.Status { + return fmt.Errorf("backend %q: /status=%q /api/backends/health=%q (inconsistent)", + backendID, statusHealth, healthState.Status) + } + } + + // The unstable backend must be unhealthy in this consistent snapshot. + unstableHealthState, inHealth := bh.Backends[backend2Name] + if !inHealth { + return fmt.Errorf("unstable backend %q not found in /api/backends/health", backend2Name) + } + if unstableHealthState.Status == "healthy" { + return fmt.Errorf("unstable backend %q still healthy in /api/backends/health", backend2Name) + } + unstableStatusHealth, inStatus := statusHealthByName[backend2Name] + if !inStatus { + return fmt.Errorf("unstable backend %q not found in /status", backend2Name) + } + if unstableStatusHealth == "healthy" { + return fmt.Errorf("unstable backend %q still healthy in /status (issue #4103 regression)", backend2Name) + } + + lastBackendsHealth = bh + lastStatusResp = sr + return nil + }, timeout, pollingInterval).Should(Succeed(), + "endpoints should converge on a consistent unhealthy state for the unstable backend") + + // Log the final consistent snapshot for debugging. + for _, b := range lastStatusResp.Backends { + healthEntry := lastBackendsHealth.Backends[b.Name] + if healthEntry != nil { + GinkgoWriter.Printf("✓ backend=%s /status=%s /api/backends/health=%s\n", + b.Name, b.Health, healthEntry.Status) + } + } + }) + It("should close circuit breaker when backend recovers", func() { By("Restoring unstable backend by fixing the image") backend := &mcpv1alpha1.MCPServer{}