From befa1c64bb94610dc86aef266283b70446bb005d Mon Sep 17 00:00:00 2001 From: taskbot Date: Fri, 13 Mar 2026 16:41:29 +0100 Subject: [PATCH] fix(vmcp): /status uses live health monitor state The /status endpoint was reading backend health from the static registry snapshot (set at discovery time and never updated), while /api/backends/health correctly read from the live health monitor. This caused the two endpoints to report inconsistent state for the same backend (issue #4103). Fix buildStatusResponse() to call GetAllBackendStates() and prefer the monitor's runtime health over the registry's initial value. When health monitoring is disabled the registry value is used as before, preserving backwards compatibility. Add unit tests that assert /status reflects live monitor state for both healthy and unhealthy transitions, and an e2e It block in the circuit breaker lifecycle suite that compares both endpoints side-by-side once the unstable backend's circuit breaker opens. Closes: #4103 --- pkg/vmcp/server/status.go | 24 +- pkg/vmcp/server/status_test.go | 285 ++++++++++++++---- test/e2e/thv-operator/virtualmcp/helpers.go | 67 ++++ .../virtualmcp_circuit_breaker_test.go | 85 ++++++ 4 files changed, 401 insertions(+), 60 deletions(-) diff --git a/pkg/vmcp/server/status.go b/pkg/vmcp/server/status.go index 484ee5bd88..0eafe5f284 100644 --- a/pkg/vmcp/server/status.go +++ b/pkg/vmcp/server/status.go @@ -12,6 +12,7 @@ import ( "github.com/stacklok/toolhive/pkg/versions" "github.com/stacklok/toolhive/pkg/vmcp" authtypes "github.com/stacklok/toolhive/pkg/vmcp/auth/types" + "github.com/stacklok/toolhive/pkg/vmcp/health" ) // StatusResponse represents the vMCP server's operational status. 
@@ -63,17 +64,36 @@ func (s *Server) buildStatusResponse(ctx context.Context) StatusResponse { backends := s.backendRegistry.List(ctx) backendStatuses := make([]BackendStatus, 0, len(backends)) + // Get live health states from the health monitor (if enabled) so that + // /status reflects the same runtime health as /api/backends/health. + // Skip the call — and the map allocation — entirely when monitoring is + // disabled. Reading from a nil map is safe in Go and returns zero values. + s.healthMonitorMu.RLock() + healthMon := s.healthMonitor + s.healthMonitorMu.RUnlock() + + var liveHealthStates map[string]*health.State + if healthMon != nil { + liveHealthStates = healthMon.GetAllBackendStates() + } + hasHealthyBackend := false for _, backend := range backends { + // Prefer the live health monitor state over the static registry value. + healthStatus := backend.HealthStatus + if liveState, ok := liveHealthStates[backend.ID]; ok { + healthStatus = liveState.Status + } + status := BackendStatus{ Name: backend.Name, - Health: string(backend.HealthStatus), + Health: string(healthStatus), Transport: backend.TransportType, AuthType: getAuthType(backend.AuthConfig), } backendStatuses = append(backendStatuses, status) - if backend.HealthStatus == vmcp.BackendHealthy { + if healthStatus == vmcp.BackendHealthy { hasHealthyBackend = true } } diff --git a/pkg/vmcp/server/status_test.go b/pkg/vmcp/server/status_test.go index 417ea2eeb8..b5a1645bad 100644 --- a/pkg/vmcp/server/status_test.go +++ b/pkg/vmcp/server/status_test.go @@ -6,6 +6,7 @@ package server_test import ( "context" "encoding/json" + "errors" "net/http" "testing" "time" @@ -19,6 +20,7 @@ import ( "github.com/stacklok/toolhive/pkg/vmcp/aggregator" authtypes "github.com/stacklok/toolhive/pkg/vmcp/auth/types" discoveryMocks "github.com/stacklok/toolhive/pkg/vmcp/discovery/mocks" + "github.com/stacklok/toolhive/pkg/vmcp/health" "github.com/stacklok/toolhive/pkg/vmcp/mocks" 
"github.com/stacklok/toolhive/pkg/vmcp/router" "github.com/stacklok/toolhive/pkg/vmcp/server" @@ -40,66 +42,15 @@ type BackendStatus struct { AuthType string `json:"auth_type,omitempty"` } -// createTestServerWithBackends creates a test server instance with custom backends. +// createTestServerWithBackends creates a test server instance with custom backends +// and no health monitoring. It is a convenience wrapper around createTestServerWithHealthMonitor. func createTestServerWithBackends(t *testing.T, backends []vmcp.Backend, groupRef string) *server.Server { t.Helper() - - ctrl := gomock.NewController(t) - t.Cleanup(ctrl.Finish) - - mockBackendClient := mocks.NewMockBackendClient(ctrl) - mockDiscoveryMgr := discoveryMocks.NewMockManager(ctrl) - rt := router.NewDefaultRouter() - - port := networking.FindAvailable() - require.NotZero(t, port, "Failed to find available port") - - mockDiscoveryMgr.EXPECT(). - Discover(gomock.Any(), gomock.Any()). - Return(&aggregator.AggregatedCapabilities{ - Tools: []vmcp.Tool{}, - Resources: []vmcp.Resource{}, - Prompts: []vmcp.Prompt{}, - RoutingTable: &vmcp.RoutingTable{ - Tools: make(map[string]*vmcp.BackendTarget), - Resources: make(map[string]*vmcp.BackendTarget), - Prompts: make(map[string]*vmcp.BackendTarget), - }, - Metadata: &aggregator.AggregationMetadata{}, - }, nil). 
- AnyTimes() - mockDiscoveryMgr.EXPECT().Stop().AnyTimes() - - ctx, cancel := context.WithCancel(t.Context()) - - srv, err := server.New(ctx, &server.Config{ - Name: "test-vmcp", - Version: "1.0.0", - Host: "127.0.0.1", - Port: port, - GroupRef: groupRef, - SessionFactory: newNoopMockFactory(t), - }, rt, mockBackendClient, mockDiscoveryMgr, vmcp.NewImmutableRegistry(backends), nil) - require.NoError(t, err) - - t.Cleanup(cancel) - errCh := make(chan error, 1) - go func() { - if err := srv.Start(ctx); err != nil { - errCh <- err - } - }() - - select { - case <-srv.Ready(): - case err := <-errCh: - t.Fatalf("Server failed to start: %v", err) - case <-time.After(5 * time.Second): - t.Fatalf("Server did not become ready within 5s (address: %s)", srv.Address()) - } - - time.Sleep(10 * time.Millisecond) - return srv + return createTestServerWithHealthMonitor(t, backends, + health.MonitorConfig{}, // zero value → no health monitor started + nil, // no mock expectations needed + groupRef, + ) } func TestStatusEndpoint_HTTPBehavior(t *testing.T) { @@ -242,3 +193,221 @@ func TestStatusEndpoint_BackendFieldMapping(t *testing.T) { assert.Equal(t, "streamable-http", b.Transport) assert.Equal(t, authtypes.StrategyTypeTokenExchange, b.AuthType) } + +// createTestServerWithHealthMonitor creates a test server with health monitoring enabled. +// setupMock configures mock expectations on the backend client (e.g. ListCapabilities responses for health checks). +// groupRef is set in the server config (empty string is fine for tests that don't need it). +func createTestServerWithHealthMonitor( + t *testing.T, + backends []vmcp.Backend, + monitorCfg health.MonitorConfig, + setupMock func(mockClient *mocks.MockBackendClient), + groupRef string, +) *server.Server { + t.Helper() + + // ctrl.Finish must run last so that all mock calls have already stopped. + // t.Cleanup is LIFO, so register it first — it will execute third. 
+ ctrl := gomock.NewController(t) + t.Cleanup(ctrl.Finish) + + mockBackendClient := mocks.NewMockBackendClient(ctrl) + mockDiscoveryMgr := discoveryMocks.NewMockManager(ctrl) + rt := router.NewDefaultRouter() + + if setupMock != nil { + setupMock(mockBackendClient) + } + + port := networking.FindAvailable() + require.NotZero(t, port, "Failed to find available port") + + mockDiscoveryMgr.EXPECT(). + Discover(gomock.Any(), gomock.Any()). + Return(&aggregator.AggregatedCapabilities{ + Tools: []vmcp.Tool{}, + Resources: []vmcp.Resource{}, + Prompts: []vmcp.Prompt{}, + RoutingTable: &vmcp.RoutingTable{ + Tools: make(map[string]*vmcp.BackendTarget), + Resources: make(map[string]*vmcp.BackendTarget), + Prompts: make(map[string]*vmcp.BackendTarget), + }, + Metadata: &aggregator.AggregationMetadata{}, + }, nil). + AnyTimes() + mockDiscoveryMgr.EXPECT().Stop().AnyTimes() + + ctx, cancel := context.WithCancel(t.Context()) + + var healthMonCfg *health.MonitorConfig + if (monitorCfg != health.MonitorConfig{}) { + healthMonCfg = &monitorCfg + } + srv, err := server.New(ctx, &server.Config{ + Name: "test-vmcp", + Version: "1.0.0", + Host: "127.0.0.1", + Port: port, + GroupRef: groupRef, + HealthMonitorConfig: healthMonCfg, + SessionFactory: newNoopMockFactory(t), + }, rt, mockBackendClient, mockDiscoveryMgr, vmcp.NewImmutableRegistry(backends), nil) + require.NoError(t, err) + + type startResult struct { + err error + } + done := make(chan startResult, 1) + go func() { + done <- startResult{err: srv.Start(ctx)} + }() + + // Cleanup order (LIFO): + // 1. cancel() — stops the server and health monitor goroutines + // 2. <-done — waits for srv.Start (and all goroutines) to return + // 3. 
ctrl.Finish — validates mock expectations after all calls have stopped + t.Cleanup(func() { + result := <-done + if result.err != nil && !errors.Is(result.err, context.Canceled) { + t.Errorf("server exited with unexpected error: %v", result.err) + } + }) + t.Cleanup(cancel) + + select { + case <-srv.Ready(): + case result := <-done: + t.Fatalf("server exited before becoming ready: %v", result.err) + case <-time.After(5 * time.Second): + t.Fatalf("Server did not become ready within 5s (address: %s)", srv.Address()) + } + + return srv +} + +// queryStatus fetches and decodes /status from the given server. +func queryStatus(t *testing.T, srv *server.Server) StatusResponse { + t.Helper() + resp, err := http.Get("http://" + srv.Address() + "/status") + require.NoError(t, err) + defer resp.Body.Close() + require.Equal(t, http.StatusOK, resp.StatusCode, "unexpected HTTP status from /status") + var status StatusResponse + require.NoError(t, json.NewDecoder(resp.Body).Decode(&status)) + return status +} + +// TestStatusEndpoint_ReflectsLiveHealthMonitor_Unhealthy verifies the fix for +// https://github.com/stacklok/toolhive/issues/4103: /status must report the +// same health state as the live health monitor, not the stale registry value. +// +// Without the fix, a backend registered as "healthy" would always appear healthy +// in /status even after the health monitor had marked it unhealthy. +func TestStatusEndpoint_ReflectsLiveHealthMonitor_Unhealthy(t *testing.T) { + t.Parallel() + + // Backend starts as "healthy" in the registry – this is the stale value + // that the old code would always return from /status. 
+ backends := []vmcp.Backend{{ + ID: "b1", + Name: "test-backend", + TransportType: "streamable-http", + HealthStatus: vmcp.BackendHealthy, + }} + + monitorCfg := health.MonitorConfig{ + CheckInterval: 5 * time.Millisecond, + UnhealthyThreshold: 1, // one failure → unhealthy immediately + Timeout: time.Second, + } + + srv := createTestServerWithHealthMonitor(t, backends, monitorCfg, func(mockClient *mocks.MockBackendClient) { + // All health checks fail – the monitor should mark the backend unhealthy. + mockClient.EXPECT(). + ListCapabilities(gomock.Any(), gomock.Any()). + Return(nil, errors.New("connection refused")). + AnyTimes() + }, "") + + // Poll /status until the live monitor state propagates. If the fix is + // absent the backend stays "healthy" forever and the assertion times out. + require.Eventually(t, func() bool { + status := queryStatus(t, srv) + if len(status.Backends) == 0 { + return false + } + return status.Backends[0].Health == string(vmcp.BackendUnhealthy) + }, 5*time.Second, 20*time.Millisecond, "expected /status to report backend as unhealthy") + + // The overall server health flag must also be false when no backend is healthy. + status := queryStatus(t, srv) + assert.False(t, status.Healthy) + require.Len(t, status.Backends, 1) + assert.Equal(t, string(vmcp.BackendUnhealthy), status.Backends[0].Health) +} + +// TestStatusEndpoint_ReflectsLiveHealthMonitor_Healthy confirms that /status +// correctly reports a backend as healthy when the health monitor records success. 
+func TestStatusEndpoint_ReflectsLiveHealthMonitor_Healthy(t *testing.T) { + t.Parallel() + + backends := []vmcp.Backend{{ + ID: "b1", + Name: "test-backend", + TransportType: "streamable-http", + HealthStatus: vmcp.BackendUnknown, // registry starts with unknown + }} + + monitorCfg := health.MonitorConfig{ + CheckInterval: 5 * time.Millisecond, + UnhealthyThreshold: 3, + Timeout: time.Second, + } + + srv := createTestServerWithHealthMonitor(t, backends, monitorCfg, func(mockClient *mocks.MockBackendClient) { + // Health checks succeed – the monitor should mark the backend healthy. + mockClient.EXPECT(). + ListCapabilities(gomock.Any(), gomock.Any()). + Return(&vmcp.CapabilityList{}, nil). + AnyTimes() + }, "") + + // Poll until the healthy state from the monitor appears in /status. + require.Eventually(t, func() bool { + status := queryStatus(t, srv) + if len(status.Backends) == 0 { + return false + } + return status.Backends[0].Health == string(vmcp.BackendHealthy) + }, 5*time.Second, 20*time.Millisecond, "expected /status to report backend as healthy") + + status := queryStatus(t, srv) + assert.True(t, status.Healthy) + require.Len(t, status.Backends, 1) + assert.Equal(t, string(vmcp.BackendHealthy), status.Backends[0].Health) +} + +// TestStatusEndpoint_FallsBackToRegistry_WhenMonitorDisabled confirms the +// no-monitor path is unchanged: health status comes from the registry. +func TestStatusEndpoint_FallsBackToRegistry_WhenMonitorDisabled(t *testing.T) { + t.Parallel() + + backends := []vmcp.Backend{ + {ID: "b1", Name: "healthy-backend", HealthStatus: vmcp.BackendHealthy}, + {ID: "b2", Name: "unhealthy-backend", HealthStatus: vmcp.BackendUnhealthy}, + } + + // createTestServerWithBackends does NOT configure a health monitor. 
+ srv := createTestServerWithBackends(t, backends, "") + + status := queryStatus(t, srv) + + require.Len(t, status.Backends, 2) + healthByName := make(map[string]string) + for _, b := range status.Backends { + healthByName[b.Name] = b.Health + } + assert.Equal(t, string(vmcp.BackendHealthy), healthByName["healthy-backend"]) + assert.Equal(t, string(vmcp.BackendUnhealthy), healthByName["unhealthy-backend"]) +} diff --git a/test/e2e/thv-operator/virtualmcp/helpers.go b/test/e2e/thv-operator/virtualmcp/helpers.go index 618b1d9df0..337e232f7e 100644 --- a/test/e2e/thv-operator/virtualmcp/helpers.go +++ b/test/e2e/thv-operator/virtualmcp/helpers.go @@ -1802,3 +1802,70 @@ func DeployMockOAuth2Server( } return inClusterTokenURL, cleanup } + +// ---- /status and /api/backends/health HTTP helpers ---- + +// VMCPStatusResponse mirrors server.StatusResponse +// (pkg/vmcp/server/status.go) for test deserialization. +type VMCPStatusResponse struct { + Backends []VMCPBackendStatus `json:"backends"` + Healthy bool `json:"healthy"` + Version string `json:"version"` + GroupRef string `json:"group_ref"` +} + +// VMCPBackendStatus mirrors server.BackendStatus +// (pkg/vmcp/server/status.go) for test deserialization. +type VMCPBackendStatus struct { + Name string `json:"name"` + Health string `json:"health"` // "healthy", "degraded", "unhealthy", "unknown" + Transport string `json:"transport"` + AuthType string `json:"auth_type,omitempty"` +} + +// VMCPBackendsHealthResponse mirrors BackendHealthResponse +// (pkg/vmcp/server/server.go) for test deserialization. +type VMCPBackendsHealthResponse struct { + MonitoringEnabled bool `json:"monitoring_enabled"` + Backends map[string]*VMCPBackendHealthState `json:"backends,omitempty"` +} + +// VMCPBackendHealthState mirrors health.State for test deserialization. +// Field names are capitalized (no json tags on the server struct). 
+type VMCPBackendHealthState struct { + Status string `json:"Status"` + ConsecutiveFailures int `json:"ConsecutiveFailures"` + LastErrorCategory string `json:"LastErrorCategory"` +} + +// getAndDecodeJSON issues a GET to url, checks for HTTP 200, and decodes the +// JSON body into a value of type T. Returns a pointer to the decoded value. +func getAndDecodeJSON[T any](url, label string) (*T, error) { + resp, err := http.Get(url) //nolint:gosec // test helper, URL is constructed from controlled input + if err != nil { + return nil, fmt.Errorf("GET %s: %w", label, err) + } + defer resp.Body.Close() //nolint:errcheck + if resp.StatusCode != http.StatusOK { + return nil, fmt.Errorf("%s returned HTTP %d", label, resp.StatusCode) + } + var result T + if err := json.NewDecoder(resp.Body).Decode(&result); err != nil { + return nil, fmt.Errorf("decode %s: %w", label, err) + } + return &result, nil +} + +// GetVMCPStatus queries the /status endpoint on the given NodePort and returns +// the parsed response. +func GetVMCPStatus(nodePort int32) (*VMCPStatusResponse, error) { + return getAndDecodeJSON[VMCPStatusResponse]( + fmt.Sprintf("http://localhost:%d/status", nodePort), "/status") +} + +// GetVMCPBackendsHealth queries the /api/backends/health endpoint on the given +// NodePort and returns the parsed response. 
+func GetVMCPBackendsHealth(nodePort int32) (*VMCPBackendsHealthResponse, error) { + return getAndDecodeJSON[VMCPBackendsHealthResponse]( + fmt.Sprintf("http://localhost:%d/api/backends/health", nodePort), "/api/backends/health") +} diff --git a/test/e2e/thv-operator/virtualmcp/virtualmcp_circuit_breaker_test.go b/test/e2e/thv-operator/virtualmcp/virtualmcp_circuit_breaker_test.go index 764065e39e..ce34ae8882 100644 --- a/test/e2e/thv-operator/virtualmcp/virtualmcp_circuit_breaker_test.go +++ b/test/e2e/thv-operator/virtualmcp/virtualmcp_circuit_breaker_test.go @@ -363,6 +363,91 @@ var _ = Describe("VirtualMCPServer Circuit Breaker Lifecycle", Ordered, func() { GinkgoWriter.Printf(" Circuit breaker state verified above; capability filtering covered by unit tests\n") }) + It("should report consistent health state in /status and /api/backends/health", func() { + By("Obtaining the vMCP NodePort") + nodePort := GetVMCPNodePort(ctx, k8sClient, vmcpServerName, testNamespace, timeout, pollingInterval) + + // Fetch both endpoints in the same polling iteration so that a backend + // transitioning between states mid-comparison does not cause a spurious + // mismatch. Both snapshots must agree before the assertion passes. + // + // Backend IDs in /api/backends/health equal MCPServer names (set in + // k8s.go GetWorkloadAsVMCPBackend, ID = mcpServer.Name). 
+ By("Polling until both endpoints agree on health state and unstable backend is unhealthy") + var ( + lastStatusResp *VMCPStatusResponse + lastBackendsHealth *VMCPBackendsHealthResponse + ) + Eventually(func() error { + bh, err := GetVMCPBackendsHealth(nodePort) + if err != nil { + return fmt.Errorf("GET /api/backends/health: %w", err) + } + if !bh.MonitoringEnabled { + return fmt.Errorf("/api/backends/health: monitoring not enabled") + } + if len(bh.Backends) == 0 { + return fmt.Errorf("/api/backends/health: no backends listed") + } + + sr, err := GetVMCPStatus(nodePort) + if err != nil { + return fmt.Errorf("GET /status: %w", err) + } + if len(sr.Backends) == 0 { + return fmt.Errorf("/status: no backends listed") + } + + // Build name→health map from /status snapshot. + statusHealthByName := make(map[string]string, len(sr.Backends)) + for _, b := range sr.Backends { + statusHealthByName[b.Name] = b.Health + } + + // Both snapshots must agree on every backend in /api/backends/health. + for backendID, healthState := range bh.Backends { + statusHealth, found := statusHealthByName[backendID] + if !found { + return fmt.Errorf("backend %q in /api/backends/health but missing from /status", backendID) + } + if statusHealth != healthState.Status { + return fmt.Errorf("backend %q: /status=%q /api/backends/health=%q (inconsistent)", + backendID, statusHealth, healthState.Status) + } + } + + // The unstable backend must be unhealthy in this consistent snapshot. 
+ unstableHealthState, inHealth := bh.Backends[backend2Name] + if !inHealth { + return fmt.Errorf("unstable backend %q not found in /api/backends/health", backend2Name) + } + if unstableHealthState.Status == "healthy" { + return fmt.Errorf("unstable backend %q still healthy in /api/backends/health", backend2Name) + } + unstableStatusHealth, inStatus := statusHealthByName[backend2Name] + if !inStatus { + return fmt.Errorf("unstable backend %q not found in /status", backend2Name) + } + if unstableStatusHealth == "healthy" { + return fmt.Errorf("unstable backend %q still healthy in /status (issue #4103 regression)", backend2Name) + } + + lastBackendsHealth = bh + lastStatusResp = sr + return nil + }, timeout, pollingInterval).Should(Succeed(), + "endpoints should converge on a consistent unhealthy state for the unstable backend") + + // Log the final consistent snapshot for debugging. + for _, b := range lastStatusResp.Backends { + healthEntry := lastBackendsHealth.Backends[b.Name] + if healthEntry != nil { + GinkgoWriter.Printf("✓ backend=%s /status=%s /api/backends/health=%s\n", + b.Name, b.Health, healthEntry.Status) + } + } + }) + It("should close circuit breaker when backend recovers", func() { By("Restoring unstable backend by fixing the image") backend := &mcpv1alpha1.MCPServer{}