diff --git a/.codex/skills/qa-cluster-bringup/SKILL.md b/.codex/skills/qa-cluster-bringup/SKILL.md index 834a5816..2f11312d 100644 --- a/.codex/skills/qa-cluster-bringup/SKILL.md +++ b/.codex/skills/qa-cluster-bringup/SKILL.md @@ -113,6 +113,27 @@ If `cluster doctor` reports admin/UI/ingest key mismatches, roll the API, UI, ingest, and gateway deployments after patching `mcp-sentinel-secrets` (see `CLAUDE.md` → API keys). Do not paper over a `Degraded` reading. +### Kind image architecture gotcha + +Before manually refreshing API/UI/operator/gateway images in Kind, match the +image platform to the Kind node: + +```bash +NODE_ARCH="$(docker version --format '{{.Server.Arch}}')" +IMAGE="registry.registry.svc.cluster.local:5000/:qa-$(date +%s)" +docker build --platform="linux/${NODE_ARCH}" -t "$IMAGE" -f "$DOCKERFILE" . +docker image inspect "$IMAGE" --format '{{.Os}}/{{.Architecture}}' +kind load docker-image "$IMAGE" --name mcp-runtime +kubectl set image -n "$NAMESPACE" deploy/"$DEPLOYMENT" "$CONTAINER=$IMAGE" +kubectl rollout status -n "$NAMESPACE" deploy/"$DEPLOYMENT" --timeout=180s +``` + +Do not trust a successful `kind load docker-image` by itself. Containerd can +load a `linux/amd64` image into a `linux/arm64` Kind node, but the pod will not +run correctly. If a service image was built for the wrong architecture, rebuild +API and UI with the node architecture, load those exact image references, set +the deployment image to the same ref, and wait for rollout. 
+ ## Step 6 — Expose the gateway ```bash diff --git a/.codex/skills/qa-e2e-ui/SKILL.md b/.codex/skills/qa-e2e-ui/SKILL.md index 18177bc4..622f8a6a 100644 --- a/.codex/skills/qa-e2e-ui/SKILL.md +++ b/.codex/skills/qa-e2e-ui/SKILL.md @@ -1,6 +1,6 @@ --- name: qa-e2e-ui -description: Browser-first real-cluster Sentinel UI/dashboard QA - role-based navigation, auth flows, every tab, forms, filters, destructive actions, rendered data, network/API evidence, console evidence, responsive/accessibility checks, cleanup, static assets, and public-host defenses. Use when Codex is asked to QA UI changes, dashboard regressions, login/admin/tenant flows, browser-visible API behavior, or copyable MCP connect config. Complements qa-e2e-security with feature-correctness and browser interaction checks. Assumes qa-cluster-bringup has run. +description: Browser-first real-cluster Sentinel UI/dashboard QA - role-based navigation, auth flows, every tab, forms, filters, destructive actions, rendered data, network/API evidence, console evidence, responsive/accessibility checks, cleanup, static assets, and public-host defenses. Use when Codex is asked to QA UI changes, dashboard regressions, login/admin/tenant flows, browser-visible API behavior, copyable MCP connect config, or backend analytics/observability changes that must be validated through UI controls. Complements qa-e2e-security with feature-correctness and browser interaction checks. Assumes qa-cluster-bringup has run. --- # QA - E2E UI (live cluster) @@ -57,6 +57,23 @@ trap 'rm -rf "$QA_TMP"' EXIT If the live cluster has unrelated user changes, work with them. Do not retire or mutate non-temporary objects unless the user explicitly approves. 
+If you rebuild the API or UI image before a browser pass, build for the Kind +node architecture, not your memory of the last machine: + +```bash +NODE_ARCH="$(docker version --format '{{.Server.Arch}}')" +IMAGE="registry.registry.svc.cluster.local:5000/:ui-qa-$(date +%s)" +docker build --platform="linux/${NODE_ARCH}" -t "$IMAGE" -f "$DOCKERFILE" . +docker image inspect "$IMAGE" --format '{{.Os}}/{{.Architecture}}' +kind load docker-image "$IMAGE" --name mcp-runtime +kubectl set image -n mcp-sentinel deploy/"$DEPLOYMENT" "$CONTAINER=$IMAGE" +kubectl rollout status -n mcp-sentinel deploy/"$DEPLOYMENT" --timeout=180s +``` + +`kind load docker-image` can load the wrong architecture into containerd. A UI +QA pass is invalid if the pod is still running an old image or an image whose +architecture does not match the Kind node. + ## Step 2 - Choose audit mode - **full-ui**: every visible capability and every role. Default for broad UI @@ -73,10 +90,17 @@ Diff guidance: | `services/ui/main.go` | Dashboard Load, Auth, UI->API Proxy, affected tabs | | `services/ui/static/**` | Browser Matrix, Static Assets, Responsive/A11y | | `services/api/internal/runtimeapi/**` | Catalog, Governance, Operations, API Contract | +| `services/api/internal/runtimeapi/*observability*`, `services/mcp-gateway/**`, `k8s/*prometheus*` | My Activity scoped observability, Prometheus and Grafana actions, scoped query evidence, tenant negative checks | | `services/api/internal/platformstore/**` or auth code | Auth, API Keys, Role Gating | | `config/ingress/**`, `k8s/**` UI ingress | Public-host Defense | | `docs/**`, `website/**` only | Dashboard Load smoke, then report docs-only scope | +Before selecting checks, map every touched UI-visible change to a browser +workflow. If the diff affects data rendered by a tab but not `services/ui/**` +directly, still validate the owning tab and controls through the browser. 
Do +not report a backend-only pass for a change whose success or failure is visible +in the dashboard. + ## Step 3 - Browser instrumentation is required Prefer MCP browser tools in Codex sessions: @@ -142,6 +166,23 @@ Create or mutate only temporary `qa-audit-*` objects for destructive actions. Record every skipped page/control with the reason and the fixture or cluster mode needed to cover it later. +For changed UI-visible behavior, include at least one positive and one negative +browser/API assertion for the exact changed control. Examples: + +- Metrics/observability: tenant-owned `qa-audit-*` server shows per-server + `Prometheus` and `Grafana` actions by default. The clicked URL or fetched + link is scoped to its namespace/server, and a shared or foreign namespace + returns 403/404 for the same tenant session. +- Tenant observability buttons: tenant users should not see the raw header + `Prometheus` or `Grafana` links; those are admin-only cluster-wide surfaces. + Tenant users should see per-server `Prometheus` and `Grafana` actions in My + Activity when scoped observability is available. +- Analytics: changing the server selector triggers a network request with the + selected namespace/server filters and excludes shared catalog servers from + personal totals. +- Role-gated actions: the allowed role sees and can run the action; signed-out + or disallowed roles cannot see it or receive an intentional auth error. + ## Step 6 - Curl smoke and API contract evidence Use curl to support browser findings, not replace them. diff --git a/.codex/skills/qa-e2e-ui/references/ui-coverage.md b/.codex/skills/qa-e2e-ui/references/ui-coverage.md index 982bf41f..6d6920e9 100644 --- a/.codex/skills/qa-e2e-ui/references/ui-coverage.md +++ b/.codex/skills/qa-e2e-ui/references/ui-coverage.md @@ -42,6 +42,7 @@ Admin (`admin@mcpruntime.org` / `admin@123`): - Hidden tab fallback works after logout. - Auto-refresh starts only for admin dashboard and stops on logout. 
- Grafana and Prometheus header links appear only for admin in dev path mode. +- Normal tenant users never see the raw header Grafana/Prometheus links. API-key login: @@ -114,13 +115,28 @@ Tenant user only: - Metrics: My Servers, Ready, Requests, Denied, Deny Rate. - Deployed Servers table: identity, namespace, status, inventory, endpoint, - Analytics action, Copy URL, tenant Retire. + Analytics action, Prometheus action, Grafana action, Copy URL, tenant Retire. - Usage controls: All servers, per-server selector, 24h/7d/30d/90d, Refresh. - Usage tables: per-server usage, top tools, recent activity. - Empty state: no personal servers. - Shared catalog servers are excluded from personal count. - Analytics empty response and API error render clear states. - If a selected server disappears, selector and tables recover cleanly. +- Scoped observability: clicking Prometheus for a temporary tenant server opens + an allowlisted `/api/runtime/observability/prometheus/query` URL with + `namespace` and `server` filters for that exact server. +- Scoped observability links: `/api/runtime/observability/links` returns only + queries for the authorized tenant server and does not expose arbitrary PromQL. +- Grafana tenant action: clicking Grafana opens the default scoped platform + Grafana dashboard endpoint for that namespace/server. It must not open the raw + cluster-wide `/grafana` UI for tenant users. +- Tenant negative checks: the same user receives an intentional 403/404 when + requesting observability links or Prometheus queries for a shared or foreign + namespace/server. +- Backend metrics regressions: when the diff touches gateway metrics, + Prometheus scrape config, or runtime observability APIs, the browser matrix + must include the Metrics button plus network evidence from the scoped + Prometheus query endpoint, even when no UI file changed. 
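The scoped observability checks above can be sketched as a small Go helper that inspects a Prometheus link captured from the browser or from `/api/runtime/observability/links`. This is only an illustration under stated assumptions: `checkScopedPrometheusLink` is a hypothetical name, the allowlist copies the `query_id` values documented for the API (`up`, `request_rate`, `deny_rate`, `latency_p95`), and the check for a raw `query` parameter assumes that is the name arbitrary PromQL would travel under.

```go
package main

import (
	"errors"
	"net/url"
)

// scopedQueryPath is the API endpoint named in the coverage list.
const scopedQueryPath = "/api/runtime/observability/prometheus/query"

// allowedQueryIDs mirrors the documented query_id allowlist (assumed complete).
var allowedQueryIDs = map[string]bool{
	"up":           true,
	"request_rate": true,
	"deny_rate":    true,
	"latency_p95":  true,
}

// checkScopedPrometheusLink is a hypothetical QA helper. It returns nil only
// when rawURL targets the scoped query endpoint, carries the expected
// namespace and server filters, uses an allowlisted query_id, and does not
// carry a raw "query" parameter.
func checkScopedPrometheusLink(rawURL, namespace, server string) error {
	u, err := url.Parse(rawURL)
	if err != nil {
		return err
	}
	if u.Path != scopedQueryPath {
		return errors.New("link does not target the scoped Prometheus query endpoint")
	}
	q := u.Query()
	if q.Get("namespace") != namespace || q.Get("server") != server {
		return errors.New("link is not scoped to the expected namespace/server")
	}
	if !allowedQueryIDs[q.Get("query_id")] {
		return errors.New("query_id is not in the allowlist")
	}
	if q.Has("query") {
		return errors.New("link carries a raw query parameter")
	}
	return nil
}
```

A helper like this covers only the shape of the link. The tenant negative checks still need a live request for a shared or foreign namespace/server that returns 403/404 in the same session.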
## API Keys diff --git a/.gitignore b/.gitignore index a0ed1b37..5d103d3d 100644 --- a/.gitignore +++ b/.gitignore @@ -27,6 +27,7 @@ r.go tags # Local/generated artifacts +/.playwright-mcp/ /mcp-sentinel/ /services/mcp-server/mcp-sentinel-mcp-server /services/ui/mcp-sentinel-ui diff --git a/docs/cluster-readiness.md b/docs/cluster-readiness.md index ba1b4cae..bfc3be54 100644 --- a/docs/cluster-readiness.md +++ b/docs/cluster-readiness.md @@ -245,7 +245,9 @@ When `MCP_PLATFORM_DOMAIN=example.com` is set, setup derives these public names: - `registry.example.com` for registry ingress. - `mcp.example.com` for MCP server traffic. -- `platform.example.com` for the dashboard, API, and Grafana paths. Prometheus remains an internal metrics backend. +- `platform.example.com` for the dashboard, API, and admin-gated Grafana paths. + Prometheus remains an internal metrics backend; tenant users reach it only + through server-scoped API queries. All configured public names must resolve to the cluster ingress address before certificate issuance. For Let's Encrypt HTTP-01, port 80 must reach the ingress diff --git a/docs/getting-started.md b/docs/getting-started.md index ea509100..367f76e5 100644 --- a/docs/getting-started.md +++ b/docs/getting-started.md @@ -182,6 +182,12 @@ When building setup images from a machine with a different CPU architecture than the cluster, set `MCP_IMAGE_PLATFORM` to the target node platform, for example `MCP_IMAGE_PLATFORM=linux/amd64` for standard VPS/k3s nodes. +Tenant users open server-scoped Prometheus and Grafana views from Activity +server rows. The platform API verifies access to the exact `MCPServer` and +expands only allowlisted Prometheus queries; raw Prometheus remains internal. +The bundled Prometheus discovers operator-managed MCPServer Services and +scrapes metrics from their gateway sidecars. + You can also skip the saved provision step and pass `--external-registry-url registry.example.com` directly to `setup`. 
diff --git a/docs/internals/go-package-reference.md b/docs/internals/go-package-reference.md index 05f8f634..2cdfa653 100644 --- a/docs/internals/go-package-reference.md +++ b/docs/internals/go-package-reference.md @@ -2451,6 +2451,8 @@ const ( DefaultPort = 8088 // DefaultGatewayPort is the default container port for the MCP proxy sidecar. DefaultGatewayPort = 8091 + // DefaultGatewayMetricsPort is the default Prometheus scrape port for the MCP gateway sidecar. + DefaultGatewayMetricsPort = 9103 // DefaultServicePort is the default service port. DefaultServicePort = 80 ) diff --git a/docs/security/authz-matrix.md b/docs/security/authz-matrix.md index 39974842..3391548a 100644 --- a/docs/security/authz-matrix.md +++ b/docs/security/authz-matrix.md @@ -63,6 +63,9 @@ Expected codes: | `/api/user/api-keys` | GET, POST | 401 | 200 | 200 | 200 | 401/403 | Lifecycle for user-owned keys. | | `/api/user/api-keys/{id}` | GET, DEL | 401 | 200 | 200 | 200 | 401/403 | | | `/api/runtime/servers` | GET, POST | 401 | 200 | 200 | 200 | 401/403 | List/create MCP servers. | +| `/api/runtime/observability/links` | GET | 401 | 200/403 | 200/403 | 200 | 401/403 | Normal users are limited to team namespaces or caller-owned catalog servers. | +| `/api/runtime/observability/grafana/dashboard` | GET | 401 | 200/403 | 200/403 | 200 | 401/403 | Renders a server-scoped dashboard through the API. | +| `/api/runtime/observability/prometheus/query` | GET | 401 | 200/403 | 200/403 | 200 | 401/403 | PromQL is allowlisted and server-scoped by the API. | | `/api/runtime/teams` | GET | 401 | 200 | 200 | 200 | 401/403 | | | `/api/runtime/teams` | POST | 401 | 403 | 403 | 200 | 401/403 | Admin-only team + namespace provisioning. | | `/api/runtime/teams/{id}` | GET | 401 | 200 | 200 | 200 | 401/403 | Team members can read only their teams; admins can read all teams. 
| diff --git a/docs/sentinel.md b/docs/sentinel.md index 9053191b..46745059 100644 --- a/docs/sentinel.md +++ b/docs/sentinel.md @@ -9,7 +9,7 @@ | **mcp-gateway** | Transparent sidecar. Extracts identity, evaluates tool-level policy, emits allow/deny audit events, forwards traffic upstream. | | **ingest** | Receives `POST /events`, validates ingest-scoped API keys or optional JWTs, writes to Kafka. | | **processor** | Consumes Kafka, batches, writes into ClickHouse with indexed audit fields. | -| **api** | Analytics endpoints, dashboard summaries, user/team-scoped analytics, runtime governance APIs (grants/sessions), platform audit, MCP server catalog, component operations. | +| **api** | Analytics endpoints, dashboard summaries, user/team-scoped analytics, scoped observability links and Prometheus queries, runtime governance APIs (grants/sessions), platform audit, MCP server catalog, component operations. | | **ui** | Control-plane dashboard: user MCP server dashboard, MCP server catalog and connect config, user API keys, analytics dashboard, governance, MCP operations, and platform management. | | **gateway** | Kubernetes deployment fronting the sentinel API, ingest, and UI surfaces. | | **workspace assistant sample** | Sample MCP server in `examples/workspace-assistant-mcp` for end-to-end smoke tests. | @@ -106,6 +106,32 @@ For local `setup --test-mode` clusters, setup seeds two email/password logins: | **Prometheus** | Not exposed | `prometheus:9090` | Internal metrics backend and Grafana datasource. Use a temporary `kubectl port-forward` only for backend debugging. | | **MCP gateway sidecar** | per-server route, for example `/workspace-assistant-mcp/mcp` | pod-local sidecar port | Enforces policy and forwards to the MCP server container. | +### Scoped user observability + +The Activity server list exposes Prometheus and Grafana actions only for an +`MCPServer` the authenticated principal can observe. 
The API checks the live +server before returning links or querying Prometheus, and normal users are +limited to their team namespaces or explicitly caller-owned catalog servers. + +Prometheus requests use +`/api/runtime/observability/prometheus/query?namespace=&server=&query_id=`. +The `query_id` is allowlisted (`up`, `request_rate`, `deny_rate`, +`latency_p95`); arbitrary PromQL is never accepted. `PROMETHEUS_API_URL` +defaults to `http://prometheus:9090/prometheus`. + +Without an external Grafana dashboard template, the API renders a scoped +dashboard from the same allowlisted queries. Set `GRAFANA_SERVER_DASHBOARD_URL` +to a template containing `{namespace}` and `{server}` only when that Grafana +deployment enforces tenant-aware access. Normal-user external links also +require `GRAFANA_SCOPED_USER_ACCESS=true`. + +The bundled Prometheus uses read-only Kubernetes discovery for annotated +MCPServer Services and scrapes the gateway sidecar `/metrics` endpoint. +Gateway metrics include request totals, policy decisions, latency, request and +response bytes, in-flight requests, and policy reload state. HTTP and MCP method +labels are normalized to bounded sets to prevent attacker-controlled label +cardinality. + ### Auth model | Service | Auth behavior | @@ -114,7 +140,7 @@ For local `setup --test-mode` clusters, setup seeds two email/password logins: | **ui** | `/auth/login` creates an HttpOnly UI session from `api_key`, `id_token`, or `email`/`password`. The UI then proxies `/api/*` with an upstream API key or bearer token. `/auth/admin-check` accepts admin UI sessions or keys from `ADMIN_API_KEYS`; it falls back to `API_KEYS` only when the explicit legacy dev/test fallback is enabled. | | **ingest** | `/live`, `/ready`, and `/health` are open. `/events` accepts `x-api-key` from `INGEST_API_KEYS`, legacy `API_KEYS`, or a configured OIDC bearer token. If no API keys and no JWKS are configured, intake auth is bypassed. | | **processor** | No data API. 
It exposes metrics and a simple health check on the metrics port. | -| **mcp-gateway** | No admin API. It authenticates MCP requests according to the rendered server policy: header identity or OAuth bearer tokens, depending on `spec.auth`. | +| **mcp-gateway** | No admin API. It authenticates MCP requests according to the rendered server policy and exposes Prometheus metrics at `/metrics`. | ### API service @@ -140,6 +166,9 @@ metrics on `METRICS_PORT` (default `9090`). | `GET`, `POST` | `/api/runtime/servers` | List or apply `MCPServer` resources through runtime authz scope. `tenant` mode defaults signed-in users to team namespaces they belong to; `org` mode includes the org catalog plus team namespaces; `public` mode allows anonymous catalog reads and signed-in publishes in the public catalog plus team namespaces. Responses include `publish_policy` for active-server quota/cooldown visibility. | | `DELETE` | `/api/runtime/servers/{namespace}/{name}` | Retire an owned MCPServer. Retiring deletes the MCPServer from Kubernetes and frees one active-server quota slot. | | `GET` | `/api/runtime/server-events?namespace=&server=` | Recent analytics events for one administered MCPServer; full identity/session/payload details are not exposed to regular namespace readers. | +| `GET` | `/api/runtime/observability/links?namespace=&server=` | Return scoped observability actions for one authorized MCPServer. | +| `GET` | `/api/runtime/observability/grafana/dashboard?namespace=&server=` | Render an API-scoped dashboard backed by allowlisted Prometheus queries. | +| `GET` | `/api/runtime/observability/prometheus/query?namespace=&server=&query_id=` | Run one allowlisted, server-scoped Prometheus query. | | `GET`, `POST` | `/api/runtime/grants` | List or apply `MCPAccessGrant` resources. Lists are scoped to administered servers; apply requires admin, server owner, or team owner. 
| | `GET`, `DELETE` | `/api/runtime/grants/{namespace}/{name}` | Read or delete one grant for an administered server. | | `PATCH` | `/api/runtime/grants/{namespace}/{name}` | Set `spec.disabled` with `{"disabled":true|false}`; requires admin, server owner, or team owner. | diff --git a/internal/cli/setup/platform/helpers_test.go b/internal/cli/setup/platform/helpers_test.go index 2e941849..ae1eb221 100644 --- a/internal/cli/setup/platform/helpers_test.go +++ b/internal/cli/setup/platform/helpers_test.go @@ -2224,6 +2224,29 @@ func TestPrometheusScrapesClickHouseMetricsPort(t *testing.T) { } } +func TestPrometheusDiscoversGatewaySidecarMetrics(t *testing.T) { + content, err := os.ReadFile("../../../../k8s/11-prometheus.yaml") + if err != nil { + t.Fatalf("failed to read prometheus manifest: %v", err) + } + text := string(content) + for _, want := range []string{ + "kind: ServiceAccount", + "name: prometheus", + "name: mcp-sentinel-prometheus-discovery", + "resources: [\"endpoints\", \"pods\", \"services\"]", + "job_name: mcp-gateway-sidecars", + "role: endpoints", + "__meta_kubernetes_service_annotation_prometheus_io_scrape", + "__meta_kubernetes_service_label_app_kubernetes_io_managed_by", + "__meta_kubernetes_service_annotation_prometheus_io_port", + } { + if !strings.Contains(text, want) { + t.Fatalf("expected Prometheus manifest to contain %q, got:\n%s", want, text) + } + } +} + func TestClickHouseExposesPrometheusMetrics(t *testing.T) { for _, path := range []string{"../../../../k8s/03-clickhouse.yaml", "../../../../k8s/03-clickhouse-hostpath.yaml"} { content, err := os.ReadFile(path) diff --git a/internal/operator/constants.go b/internal/operator/constants.go index 74b58974..1738e576 100644 --- a/internal/operator/constants.go +++ b/internal/operator/constants.go @@ -21,6 +21,8 @@ const ( DefaultPort = 8088 // DefaultGatewayPort is the default container port for the MCP proxy sidecar. 
DefaultGatewayPort = 8091 + // DefaultGatewayMetricsPort is the default Prometheus scrape port for the MCP gateway sidecar. + DefaultGatewayMetricsPort = 9103 // DefaultServicePort is the default service port. DefaultServicePort = 80 ) diff --git a/internal/operator/controller_test.go b/internal/operator/controller_test.go index 48633f24..7558df80 100644 --- a/internal/operator/controller_test.go +++ b/internal/operator/controller_test.go @@ -528,6 +528,7 @@ func TestReconcileDeploymentAddsGatewaySidecar(t *testing.T) { envByName[envVar.Name] = envVar } assertEqual(t, "gatewayPortEnv", envByName["PORT"].Value, "8091") + assertEqual(t, "gatewayMetricsPortEnv", envByName["METRICS_PORT"].Value, "9103") assertEqual(t, "gatewayUpstreamEnv", envByName["UPSTREAM_URL"].Value, "http://127.0.0.1:8088") assertEqual(t, "gatewayOTELServiceName", envByName["OTEL_SERVICE_NAME"].Value, "gateway-server-gateway") assertEqual(t, "gatewayOTELEndpoint", envByName["OTEL_EXPORTER_OTLP_ENDPOINT"].Value, "http://otel-collector.mcp-sentinel.svc.cluster.local:4318") @@ -586,10 +587,76 @@ func TestReconcileServiceUsesGatewayPortWhenEnabled(t *testing.T) { t.Fatalf("failed to fetch service: %v", err) } - if len(service.Spec.Ports) != 1 { - t.Fatalf("expected 1 service port, got %d", len(service.Spec.Ports)) + if len(service.Spec.Ports) != 2 { + t.Fatalf("expected 2 service ports, got %d", len(service.Spec.Ports)) } assertEqual(t, "serviceTargetPort", service.Spec.Ports[0].TargetPort.IntVal, int32(8091)) + assertEqual(t, "serviceMetricsPort", service.Spec.Ports[1].Port, int32(DefaultGatewayMetricsPort)) + assertEqual(t, "serviceManagedByLabel", service.Labels["app.kubernetes.io/managed-by"], "mcp-runtime") + assertEqual(t, "servicePrometheusScrape", service.Annotations["prometheus.io/scrape"], "true") + assertEqual(t, "servicePrometheusPath", service.Annotations["prometheus.io/path"], "/metrics") + assertEqual(t, "servicePrometheusPort", service.Annotations["prometheus.io/port"], "9103") +} + 
+func TestReconcileServicePreservesExistingAnnotations(t *testing.T) { + replicas := int32(1) + mcpServer := mcpv1alpha1.MCPServer{ + ObjectMeta: metav1.ObjectMeta{ + Name: "annotated-gateway", + Namespace: "default", + }, + Spec: mcpv1alpha1.MCPServerSpec{ + Image: "example.com/gateway-service", + ImageTag: "latest", + Port: 8088, + ServicePort: 80, + Replicas: &replicas, + Gateway: &mcpv1alpha1.GatewayConfig{ + Enabled: true, + Image: "example.com/mcp-gateway:latest", + Port: 8091, + }, + }, + } + + scheme := runtime.NewScheme() + if err := mcpv1alpha1.AddToScheme(scheme); err != nil { + t.Fatalf("failed to add mcp scheme: %v", err) + } + if err := corev1.AddToScheme(scheme); err != nil { + t.Fatalf("failed to add core scheme: %v", err) + } + + existingService := &corev1.Service{ + ObjectMeta: metav1.ObjectMeta{ + Name: mcpServer.Name, + Namespace: mcpServer.Namespace, + Annotations: map[string]string{ + "example.com/custom": "keep-me", + }, + }, + Spec: corev1.ServiceSpec{ + Ports: []corev1.ServicePort{{Port: 80}}, + }, + } + + client := fake.NewClientBuilder().WithScheme(scheme).WithObjects(&mcpServer, existingService).Build() + reconciler := MCPServerReconciler{ + Client: client, + Scheme: scheme, + } + + if err := reconciler.reconcileService(context.Background(), &mcpServer); err != nil { + t.Fatalf("reconcileService() error = %v", err) + } + + var service corev1.Service + if err := client.Get(context.Background(), types.NamespacedName{Name: mcpServer.Name, Namespace: mcpServer.Namespace}, &service); err != nil { + t.Fatalf("failed to fetch service: %v", err) + } + + assertEqual(t, "customAnnotation", service.Annotations["example.com/custom"], "keep-me") + assertEqual(t, "servicePrometheusScrape", service.Annotations["prometheus.io/scrape"], "true") } func TestResolveGatewayImage(t *testing.T) { diff --git a/internal/operator/deployment.go b/internal/operator/deployment.go index f66801b2..a1313aeb 100644 --- a/internal/operator/deployment.go +++ 
b/internal/operator/deployment.go @@ -427,8 +427,10 @@ func (r *MCPServerReconciler) buildGatewayContainer(mcpServer *mcpv1alpha1.MCPSe } port := mcpServer.Spec.Gateway.Port + metricsPort := int32(DefaultGatewayMetricsPort) envVars := []corev1.EnvVar{ {Name: "PORT", Value: strconv.Itoa(int(port))}, + {Name: "METRICS_PORT", Value: strconv.Itoa(int(metricsPort))}, {Name: "UPSTREAM_URL", Value: mcpServer.Spec.Gateway.UpstreamURL}, {Name: "POLICY_FILE", Value: gatewayPolicyFilePath}, {Name: "MCP_SERVER_NAME", Value: mcpServer.Name}, @@ -512,6 +514,11 @@ func (r *MCPServerReconciler) buildGatewayContainer(mcpServer *mcpv1alpha1.MCPSe ContainerPort: port, Protocol: corev1.ProtocolTCP, }, + { + Name: "metrics", + ContainerPort: metricsPort, + Protocol: corev1.ProtocolTCP, + }, }, Env: envVars, SecurityContext: kubeworkload.RestrictedReadOnlyContainerSecurityContext(), diff --git a/internal/operator/service.go b/internal/operator/service.go index 2a61e15f..6dc0efda 100644 --- a/internal/operator/service.go +++ b/internal/operator/service.go @@ -2,6 +2,7 @@ package operator import ( "context" + "strconv" corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" @@ -29,20 +30,40 @@ func (r *MCPServerReconciler) reconcileService(ctx context.Context, mcpServer *m op, err := ctrl.CreateOrUpdate(ctx, r.Client, service, func() error { labels := map[string]string{ - "app": mcpServer.Name, + LabelApp: mcpServer.Name, + LabelManagedBy: LabelManagedByValue, + } + service.Labels = labels + if service.Annotations == nil { + service.Annotations = map[string]string{} + } + if gatewayEnabled(mcpServer) { + service.Annotations["prometheus.io/path"] = "/metrics" + service.Annotations["prometheus.io/port"] = strconv.Itoa(DefaultGatewayMetricsPort) + service.Annotations["prometheus.io/scrape"] = "true" + } + + ports := []corev1.ServicePort{ + { + Name: "http", + Port: mcpServer.Spec.ServicePort, + TargetPort: intstr.FromInt32(targetPort), + Protocol: corev1.ProtocolTCP, + }, + 
} + if gatewayEnabled(mcpServer) { + ports = append(ports, corev1.ServicePort{ + Name: "metrics", + Port: DefaultGatewayMetricsPort, + TargetPort: intstr.FromInt32(DefaultGatewayMetricsPort), + Protocol: corev1.ProtocolTCP, + }) } service.Spec = corev1.ServiceSpec{ Type: corev1.ServiceTypeClusterIP, Selector: labels, - Ports: []corev1.ServicePort{ - { - Name: "http", - Port: mcpServer.Spec.ServicePort, - TargetPort: intstr.FromInt32(targetPort), - Protocol: corev1.ProtocolTCP, - }, - }, + Ports: ports, } if err := ctrl.SetControllerReference(mcpServer, service, r.Scheme); err != nil { diff --git a/k8s/01-config.yaml b/k8s/01-config.yaml index 07bfd5c7..735acf7f 100644 --- a/k8s/01-config.yaml +++ b/k8s/01-config.yaml @@ -30,4 +30,5 @@ data: MCP_CLUSTER_NAME: "local" # Dashboard observability links GRAFANA_URL: "/grafana" + PROMETHEUS_API_URL: "http://prometheus:9090/prometheus" DASHBOARD_REFRESH_INTERVAL: "30" diff --git a/k8s/11-prometheus.yaml b/k8s/11-prometheus.yaml index a6095ee3..cc34196d 100644 --- a/k8s/11-prometheus.yaml +++ b/k8s/11-prometheus.yaml @@ -1,4 +1,33 @@ apiVersion: v1 +kind: ServiceAccount +metadata: + name: prometheus + namespace: mcp-sentinel +automountServiceAccountToken: true +--- +apiVersion: rbac.authorization.k8s.io/v1 +kind: ClusterRole +metadata: + name: mcp-sentinel-prometheus-discovery +rules: + - apiGroups: [""] + resources: ["endpoints", "pods", "services"] + verbs: ["get", "list", "watch"] +--- +apiVersion: rbac.authorization.k8s.io/v1 +kind: ClusterRoleBinding +metadata: + name: mcp-sentinel-prometheus-discovery +subjects: + - kind: ServiceAccount + name: prometheus + namespace: mcp-sentinel +roleRef: + kind: ClusterRole + name: mcp-sentinel-prometheus-discovery + apiGroup: rbac.authorization.k8s.io +--- +apiVersion: v1 kind: ConfigMap metadata: name: prometheus-config @@ -20,6 +49,37 @@ data: - job_name: clickhouse static_configs: - targets: ["clickhouse:9363"] + - job_name: mcp-gateway-sidecars + kubernetes_sd_configs: + - 
role: endpoints + relabel_configs: + - action: keep + source_labels: [__meta_kubernetes_service_annotation_prometheus_io_scrape] + regex: "true" + - action: keep + source_labels: [__meta_kubernetes_service_label_app_kubernetes_io_managed_by] + regex: mcp-runtime + - action: keep + source_labels: [__meta_kubernetes_endpoint_port_name] + regex: http + - action: replace + source_labels: [__meta_kubernetes_service_annotation_prometheus_io_path] + target_label: __metrics_path__ + regex: "(.+)" + - action: replace + source_labels: [__address__, __meta_kubernetes_service_annotation_prometheus_io_port] + target_label: __address__ + regex: ([^:]+)(?::[0-9]+)?;([0-9]+) + replacement: "$1:$2" + - action: replace + source_labels: [__meta_kubernetes_namespace] + target_label: namespace + - action: replace + source_labels: [__meta_kubernetes_service_name] + target_label: server + - action: replace + source_labels: [__meta_kubernetes_pod_name] + target_label: pod --- apiVersion: v1 kind: Service @@ -49,7 +109,8 @@ spec: labels: app: prometheus spec: - automountServiceAccountToken: false + serviceAccountName: prometheus + automountServiceAccountToken: true securityContext: runAsNonRoot: true runAsUser: 65534 diff --git a/services/api/internal/runtimeapi/observability.go b/services/api/internal/runtimeapi/observability.go new file mode 100644 index 00000000..5d709a76 --- /dev/null +++ b/services/api/internal/runtimeapi/observability.go @@ -0,0 +1,563 @@ +package runtimeapi + +import ( + "context" + "encoding/json" + "errors" + "fmt" + "html" + "io" + "log" + "net/http" + "net/url" + "strings" + "time" + + apierrors "k8s.io/apimachinery/pkg/api/errors" + mcpv1alpha1 "mcp-runtime/api/v1alpha1" + "mcp-runtime/pkg/controlplane" + "mcp-runtime/pkg/k8sclient" +) + +const ( + defaultPrometheusAPIURL = "http://prometheus:9090/prometheus" + prometheusErrorBodyLimit = 1024 + + envPrometheusAPIURL = "PROMETHEUS_API_URL" + envMCPPrometheusAPIURL = "MCP_PROMETHEUS_API_URL" + 
envGrafanaServerDashboardURL = "GRAFANA_SERVER_DASHBOARD_URL" + envGrafanaScopedUserAccess = "GRAFANA_SCOPED_USER_ACCESS" +) + +type observabilityLinksResponse struct { + Namespace string `json:"namespace"` + Server string `json:"server"` + TeamID string `json:"team_id,omitempty"` + Prometheus observabilityPrometheusSet `json:"prometheus"` + Grafana observabilityGrafanaLink `json:"grafana"` +} + +type observabilityPrometheusSet struct { + Queries []observabilityPrometheusQueryLink `json:"queries"` + DirectAdminOnly bool `json:"direct_admin_only"` +} + +type observabilityPrometheusQueryLink struct { + ID string `json:"id"` + Name string `json:"name"` + Description string `json:"description"` + URL string `json:"url"` + Query string `json:"query,omitempty"` +} + +type observabilityGrafanaLink struct { + Available bool `json:"available"` + URL string `json:"url,omitempty"` + DirectAdminOnly bool `json:"direct_admin_only"` + Reason string `json:"reason,omitempty"` +} + +type scopedPrometheusQuery struct { + ID string + Name string + Description string + Query string +} + +type observabilityRequestError struct { + status int + message string +} + +func (e observabilityRequestError) Error() string { + return e.message +} + +func (s *RuntimeServer) HandleRuntimeObservabilityLinks(w http.ResponseWriter, r *http.Request) { + if r.Method != http.MethodGet { + w.Header().Set("allow", http.MethodGet) + writeJSON(w, http.StatusMethodNotAllowed, map[string]string{"error": "method_not_allowed"}) + return + } + p, target, err := s.authorizedObservabilityTarget(r) + if err != nil { + writeObservabilityError(w, err) + return + } + writeJSON(w, http.StatusOK, observabilityLinksForMCPServer(*target, p, r)) +} + +func (s *RuntimeServer) HandleRuntimeObservabilityPrometheusQuery(w http.ResponseWriter, r *http.Request) { + if r.Method != http.MethodGet { + w.Header().Set("allow", http.MethodGet) + writeJSON(w, http.StatusMethodNotAllowed, map[string]string{"error": "method_not_allowed"}) 
+ return + } + _, target, err := s.authorizedObservabilityTarget(r) + if err != nil { + writeObservabilityError(w, err) + return + } + + queryID := strings.TrimSpace(r.URL.Query().Get("query_id")) + query, ok := scopedPrometheusQueryByID(queryID, target.Namespace, target.Name) + if !ok { + writeJSON(w, http.StatusBadRequest, map[string]string{"error": "unknown query_id"}) + return + } + + promURL, err := prometheusQueryURL(prometheusAPIBaseURL(), query.Query) + if err != nil { + log.Printf("observability prometheus proxy configuration error: %v", err) + writeJSON(w, http.StatusServiceUnavailable, map[string]string{"error": "prometheus not configured"}) + return + } + + ctx, cancel := context.WithTimeout(r.Context(), 10*time.Second) + defer cancel() + req, err := http.NewRequestWithContext(ctx, http.MethodGet, promURL, nil) + if err != nil { + writeJSON(w, http.StatusInternalServerError, map[string]string{"error": "query_build_failed"}) + return + } + resp, err := http.DefaultClient.Do(req) + if err != nil { + log.Printf("observability prometheus query failed namespace=%q server=%q query_id=%q err=%v", target.Namespace, target.Name, query.ID, err) + writeJSON(w, http.StatusBadGateway, map[string]string{"error": "prometheus_unavailable"}) + return + } + defer resp.Body.Close() + + if resp.StatusCode != http.StatusOK { + body := readPrometheusErrorBody(resp.Body) + log.Printf( + "observability prometheus query failed namespace=%q server=%q query_id=%q status=%d body=%q", + target.Namespace, + target.Name, + query.ID, + resp.StatusCode, + body, + ) + writeJSON(w, http.StatusBadGateway, map[string]string{"error": "prometheus_query_failed"}) + return + } + var payload any + if err := json.NewDecoder(io.LimitReader(resp.Body, 4<<20)).Decode(&payload); err != nil { + writeJSON(w, http.StatusBadGateway, map[string]string{"error": "prometheus_response_invalid"}) + return + } + writeJSON(w, http.StatusOK, map[string]any{ + "namespace": target.Namespace, + "server": 
target.Name, + "query_id": query.ID, + "query": query.Query, + "prometheus": payload, + }) +} + +func (s *RuntimeServer) HandleRuntimeObservabilityGrafanaDashboard(w http.ResponseWriter, r *http.Request) { + if r.Method != http.MethodGet { + w.Header().Set("allow", http.MethodGet) + writeJSON(w, http.StatusMethodNotAllowed, map[string]string{"error": "method_not_allowed"}) + return + } + _, target, err := s.authorizedObservabilityTarget(r) + if err != nil { + writeObservabilityError(w, err) + return + } + + ctx, cancel := context.WithTimeout(r.Context(), 10*time.Second) + defer cancel() + queryResults := make([]grafanaDashboardQueryResult, 0, len(scopedPrometheusQueries(target.Namespace, target.Name))) + for _, query := range scopedPrometheusQueries(target.Namespace, target.Name) { + result := grafanaDashboardQueryResult{ + ID: query.ID, + Name: query.Name, + Description: query.Description, + Query: query.Query, + } + payload, err := queryPrometheus(ctx, query.Query) + if err != nil { + result.Error = err.Error() + } else if body, err := json.MarshalIndent(payload, "", " "); err != nil { + result.Error = "prometheus response could not be rendered" + } else { + result.Body = string(body) + } + queryResults = append(queryResults, result) + } + + w.Header().Set("content-type", "text/html; charset=utf-8") + w.Header().Set("cache-control", "no-store") + w.Header().Set("content-security-policy", "default-src 'none'; style-src 'unsafe-inline'") + w.Header().Set("referrer-policy", "no-referrer") + w.Header().Set("x-content-type-options", "nosniff") + w.WriteHeader(http.StatusOK) + _, _ = w.Write([]byte(renderScopedGrafanaDashboard(target.Namespace, target.Name, queryResults))) +} + +type grafanaDashboardQueryResult struct { + ID string + Name string + Description string + Query string + Body string + Error string +} + +func (s *RuntimeServer) authorizedObservabilityTarget(r *http.Request) (principal, *mcpv1alpha1.MCPServer, error) { + if s == nil { + return principal{}, 
nil, observabilityRequestError{status: http.StatusServiceUnavailable, message: "runtime not available"} + } + control := s.controlPlane() + if control == nil { + return principal{}, nil, observabilityRequestError{status: http.StatusServiceUnavailable, message: "kubernetes not available"} + } + p, ok := principalFromContext(r.Context()) + if !ok { + return principal{}, nil, observabilityRequestError{status: http.StatusUnauthorized, message: "unauthorized"} + } + namespace := strings.TrimSpace(r.URL.Query().Get("namespace")) + serverName := strings.TrimSpace(r.URL.Query().Get("server")) + if namespace == "" || serverName == "" { + return principal{}, nil, observabilityRequestError{status: http.StatusBadRequest, message: "namespace and server are required"} + } + if p.Role != roleAdmin && !principalCanReadNamespace(p, namespace) { + return principal{}, nil, observabilityRequestError{status: http.StatusNotFound, message: "server not found"} + } + + ctx, cancel := context.WithTimeout(r.Context(), 10*time.Second) + defer cancel() + target, err := control.GetServer(ctx, namespace, serverName) + if apierrors.IsNotFound(err) { + return principal{}, nil, observabilityRequestError{status: http.StatusNotFound, message: "server not found"} + } + if err != nil { + code, msg := observabilityTargetReadStatus(err) + return principal{}, nil, observabilityRequestError{status: code, message: msg} + } + if !mcpServerObservableByPrincipal(*target, p) { + return principal{}, nil, observabilityRequestError{status: http.StatusNotFound, message: "server not found"} + } + return p, target, nil +} + +func observabilityTargetReadStatus(err error) (int, string) { + if err == nil { + return http.StatusNotFound, "server not found" + } + if apierrors.IsNotFound(err) { + return http.StatusNotFound, "server not found" + } + code, msg := k8sclient.HTTPStatusFromK8sError(err) + if code == http.StatusForbidden { + return http.StatusNotFound, "server not found" + } + return code, msg +} + +func 
writeObservabilityError(w http.ResponseWriter, err error) { + var requestErr observabilityRequestError + if errors.As(err, &requestErr) { + writeJSON(w, requestErr.status, map[string]string{"error": requestErr.message}) + return + } + writeJSON(w, http.StatusInternalServerError, map[string]string{"error": "observability_failed"}) +} + +func observabilityLinksForMCPServer(server mcpv1alpha1.MCPServer, p principal, r *http.Request) observabilityLinksResponse { + info := controlplane.ServerInfoFromMCPServer(server, controlplane.ServerDeploymentStatus{}) + return observabilityLinksForServerInfo(info, p, r) +} + +func observabilityLinksForServerInfo(info controlplane.ServerInfo, p principal, r *http.Request) observabilityLinksResponse { + queries := scopedPrometheusQueries(info.Namespace, info.Name) + queryLinks := make([]observabilityPrometheusQueryLink, 0, len(queries)) + for _, query := range queries { + apiPath := observabilityPrometheusAPIPath(info.Namespace, info.Name, query.ID) + queryLinks = append(queryLinks, observabilityPrometheusQueryLink{ + ID: query.ID, + Name: query.Name, + Description: query.Description, + URL: publicAPIURL(r, apiPath), + Query: query.Query, + }) + } + return observabilityLinksResponse{ + Namespace: info.Namespace, + Server: info.Name, + TeamID: strings.TrimSpace(info.TeamID), + Prometheus: observabilityPrometheusSet{ + Queries: queryLinks, + DirectAdminOnly: true, + }, + Grafana: grafanaLinkForServer(info, p, r), + } +} + +func observabilityPrometheusAPIPath(namespace, serverName, queryID string) string { + values := url.Values{} + values.Set("namespace", namespace) + values.Set("server", serverName) + values.Set("query_id", queryID) + return "/api/runtime/observability/prometheus/query?" 
+ values.Encode() +} + +func observabilityGrafanaDashboardAPIPath(namespace, serverName string) string { + values := url.Values{} + values.Set("namespace", namespace) + values.Set("server", serverName) + return "/api/runtime/observability/grafana/dashboard?" + values.Encode() +} + +func publicAPIURL(r *http.Request, apiPath string) string { + if !strings.HasPrefix(apiPath, "/") { + apiPath = "/" + apiPath + } + host := forwardedHost(r) + if host == "" { + return apiPath + } + return forwardedScheme(r) + "://" + strings.TrimRight(host, "/") + apiPath +} + +func grafanaLinkForServer(info controlplane.ServerInfo, p principal, r *http.Request) observabilityGrafanaLink { + template := strings.TrimSpace(envOr(envGrafanaServerDashboardURL, "")) + if template == "" { + return observabilityGrafanaLink{ + Available: true, + URL: publicAPIURL(r, observabilityGrafanaDashboardAPIPath(info.Namespace, info.Name)), + DirectAdminOnly: false, + } + } + link := expandObservabilityURLTemplate(template, info.Namespace, info.Name) + if p.Role == roleAdmin || grafanaScopedUserAccessEnabled() { + return observabilityGrafanaLink{ + Available: true, + URL: link, + DirectAdminOnly: p.Role != roleAdmin, + } + } + return observabilityGrafanaLink{ + Available: false, + DirectAdminOnly: true, + Reason: "grafana requires tenant-aware access before user links are exposed", + } +} + +func queryPrometheus(ctx context.Context, query string) (any, error) { + promURL, err := prometheusQueryURL(prometheusAPIBaseURL(), query) + if err != nil { + return nil, fmt.Errorf("prometheus not configured") + } + req, err := http.NewRequestWithContext(ctx, http.MethodGet, promURL, nil) + if err != nil { + return nil, fmt.Errorf("query build failed") + } + resp, err := http.DefaultClient.Do(req) + if err != nil { + return nil, fmt.Errorf("prometheus unavailable") + } + defer resp.Body.Close() + if resp.StatusCode != http.StatusOK { + log.Printf( + "observability prometheus dashboard query failed status=%d body=%q", 
+ resp.StatusCode, + readPrometheusErrorBody(resp.Body), + ) + return nil, fmt.Errorf("prometheus query failed") + } + var payload any + if err := json.NewDecoder(io.LimitReader(resp.Body, 4<<20)).Decode(&payload); err != nil { + return nil, fmt.Errorf("prometheus response invalid") + } + return payload, nil +} + +func readPrometheusErrorBody(body io.Reader) string { + if body == nil { + return "" + } + payload, err := io.ReadAll(io.LimitReader(body, prometheusErrorBodyLimit)) + if err != nil { + return "" + } + return strings.TrimSpace(string(payload)) +} + +func renderScopedGrafanaDashboard(namespace, serverName string, results []grafanaDashboardQueryResult) string { + var b strings.Builder + b.WriteString(``) + b.WriteString(`Scoped Grafana - ` + html.EscapeString(namespace) + `/` + html.EscapeString(serverName) + ``) + b.WriteString(`
`) + b.WriteString(`Scoped Grafana`) + b.WriteString(`Namespace ` + html.EscapeString(namespace) + ` / server ` + html.EscapeString(serverName) + `. Queries are generated by the platform API and pinned to this scope.`) + b.WriteString(``) + for _, result := range results { + b.WriteString(``) + b.WriteString(`` + html.EscapeString(result.Name) + ``) + b.WriteString(`` + html.EscapeString(result.Description) + ``) + b.WriteString(`` + html.EscapeString(result.Query) + ``) + if result.Error != "" { + b.WriteString(`` + html.EscapeString(result.Error) + ``) + } else { + b.WriteString(`` + html.EscapeString(result.Body) + ``) + } + b.WriteString(``) + } + b.WriteString(`
`) + return b.String() +} + +func expandObservabilityURLTemplate(template, namespace, serverName string) string { + replacer := strings.NewReplacer( + "{namespace}", url.QueryEscape(namespace), + "{server}", url.QueryEscape(serverName), + ) + return replacer.Replace(template) +} + +func grafanaScopedUserAccessEnabled() bool { + switch strings.ToLower(strings.TrimSpace(envOr(envGrafanaScopedUserAccess, ""))) { + case "1", "true", "yes", "on", "enabled": + return true + default: + return false + } +} + +func mcpServerObservableByPrincipal(server mcpv1alpha1.MCPServer, p principal) bool { + if p.Role == roleAdmin { + return true + } + if principalOwnsObservabilityNamespace(p, server.Namespace) { + return true + } + return serverHasUserOwnerLabel(server.Labels, p) +} + +func serverInfoObservableByPrincipal(info controlplane.ServerInfo, p principal) bool { + if p.Role == roleAdmin { + return true + } + if principalOwnsObservabilityNamespace(p, info.Namespace) { + return true + } + return serverHasUserOwnerLabel(info.Labels, p) +} + +func principalOwnsObservabilityNamespace(p principal, namespace string) bool { + namespace = strings.TrimSpace(namespace) + if namespace == "" || namespace == sharedCatalogNamespace || isModeCatalogNamespace(namespace) { + return false + } + if strings.TrimSpace(p.Namespace) == namespace { + return true + } + for _, team := range p.Teams { + if strings.TrimSpace(team.Namespace) == namespace { + return true + } + } + for _, allowed := range p.AllowedNamespaces { + if strings.TrimSpace(allowed) == namespace { + return true + } + } + return false +} + +func serverHasUserOwnerLabel(labels map[string]string, p principal) bool { + userID := strings.TrimSpace(p.UserID()) + if userID == "" { + return false + } + return strings.TrimSpace(labels[platformUserIDLabel]) == userID +} + +func scopedPrometheusQueryByID(queryID, namespace, serverName string) (scopedPrometheusQuery, bool) { + queryID = strings.TrimSpace(queryID) + if queryID == "" { + queryID 
= "up" + } + for _, query := range scopedPrometheusQueries(namespace, serverName) { + if query.ID == queryID { + return query, true + } + } + return scopedPrometheusQuery{}, false +} + +func scopedPrometheusQueries(namespace, serverName string) []scopedPrometheusQuery { + selector := fmt.Sprintf(`namespace=%s,server=%s`, promQLString(namespace), promQLString(serverName)) + return []scopedPrometheusQuery{ + { + ID: "up", + Name: "Target health", + Description: "Prometheus scrape health for this MCP server scope.", + Query: "up{" + selector + "}", + }, + { + ID: "request_rate", + Name: "Request rate", + Description: "Five-minute MCP gateway request rate for this server.", + Query: "sum(rate(mcp_gateway_requests_total{" + selector + "}[5m]))", + }, + { + ID: "deny_rate", + Name: "Deny rate", + Description: "Five-minute policy denial rate for this server.", + Query: `sum(rate(mcp_gateway_policy_decisions_total{` + selector + `,decision="deny"}[5m]))`, + }, + { + ID: "latency_p95", + Name: "p95 latency", + Description: "Five-minute p95 gateway latency for this server.", + Query: "histogram_quantile(0.95, sum(rate(mcp_gateway_request_duration_seconds_bucket{" + selector + "}[5m])) by (le))", + }, + } +} + +func promQLString(value string) string { + value = strings.ReplaceAll(value, `\`, `\\`) + value = strings.ReplaceAll(value, "\n", `\n`) + value = strings.ReplaceAll(value, `"`, `\"`) + return `"` + value + `"` +} + +func prometheusAPIBaseURL() string { + if value := strings.TrimSpace(envOr(envMCPPrometheusAPIURL, "")); value != "" { + return value + } + if value := strings.TrimSpace(envOr(envPrometheusAPIURL, "")); value != "" { + return value + } + return defaultPrometheusAPIURL +} + +func prometheusQueryURL(base, query string) (string, error) { + base = strings.TrimSpace(base) + if base == "" { + return "", errors.New("prometheus API URL is empty") + } + u, err := url.Parse(base) + if err != nil { + return "", err + } + if u.Scheme != "http" && u.Scheme != "https" { 
+ return "", errors.New("prometheus API URL must be absolute http(s)") + } + joined, err := url.JoinPath(u.String(), "api", "v1", "query") + if err != nil { + return "", err + } + queryURL, err := url.Parse(joined) + if err != nil { + return "", err + } + values := queryURL.Query() + values.Set("query", query) + queryURL.RawQuery = values.Encode() + return queryURL.String(), nil +} diff --git a/services/api/internal/runtimeapi/runtime_test.go b/services/api/internal/runtimeapi/runtime_test.go index 03b74726..0da2adc6 100644 --- a/services/api/internal/runtimeapi/runtime_test.go +++ b/services/api/internal/runtimeapi/runtime_test.go @@ -5,6 +5,7 @@ import ( "context" "encoding/json" "errors" + "log" "net/http" "net/http/httptest" "strings" @@ -636,6 +637,358 @@ func TestRuntimeServersNonAdminRejectsOtherNamespace(t *testing.T) { } } +func TestRuntimeServersObservabilityLinksOnlyForObservableServers(t *testing.T) { + scheme := runtime.NewScheme() + if err := mcpv1alpha1.AddToScheme(scheme); err != nil { + t.Fatalf("AddToScheme: %v", err) + } + private := ownedTestMCPServer("private-demo", "user-1", "user-1") + shared := &mcpv1alpha1.MCPServer{ + ObjectMeta: metav1.ObjectMeta{ + Name: "shared-demo", + Namespace: sharedCatalogNamespace, + }, + Spec: mcpv1alpha1.MCPServerSpec{Image: "registry.example.com/shared-demo"}, + } + server := &RuntimeServer{ + k8sClients: &k8sclient.Clients{ + Dynamic: dynamicfake.NewSimpleDynamicClient(scheme, private, shared), + Clientset: kubernetesfake.NewSimpleClientset(), + }, + } + request := httptest.NewRequest(http.MethodGet, "/api/runtime/servers", nil) + request = request.WithContext(withPrincipal(request.Context(), principal{ + Role: roleUser, + Subject: "user-1", + Namespace: "user-1", + AllowedNamespaces: []string{ + "user-1", + sharedCatalogNamespace, + }, + })) + recorder := httptest.NewRecorder() + + server.HandleRuntimeServers(recorder, request) + + if recorder.Code != http.StatusOK { + t.Fatalf("status = %d, body = %s", 
recorder.Code, recorder.Body.String()) + } + var payload struct { + Servers []serverInfo `json:"servers"` + } + if err := json.NewDecoder(recorder.Body).Decode(&payload); err != nil { + t.Fatalf("decode response: %v", err) + } + if len(payload.Servers) != 2 { + t.Fatalf("servers = %#v, want private and shared", payload.Servers) + } + for _, got := range payload.Servers { + switch got.Name { + case "private-demo": + if got.Observability == nil || len(got.Observability.Prometheus.Queries) == 0 { + t.Fatalf("private server observability missing: %#v", got.Observability) + } + case "shared-demo": + if got.Observability != nil { + t.Fatalf("shared server should not expose user observability links: %#v", got.Observability) + } + } + } +} + +func TestRuntimeObservabilityLinksAllowOwnedNamespace(t *testing.T) { + server := newRuntimeServerWithMCPServers(t, ownedTestMCPServer("demo", "user-1", "user-1")) + request := httptest.NewRequest(http.MethodGet, "/api/runtime/observability/links?namespace=user-1&server=demo", nil) + request.Header.Set("X-Forwarded-Host", "platform.example.test") + request.Header.Set("X-Forwarded-Proto", "https") + request = request.WithContext(withPrincipal(request.Context(), principal{ + Role: roleUser, + Subject: "user-1", + Namespace: "user-1", + AllowedNamespaces: []string{ + "user-1", + sharedCatalogNamespace, + }, + })) + recorder := httptest.NewRecorder() + + server.HandleRuntimeObservabilityLinks(recorder, request) + + if recorder.Code != http.StatusOK { + t.Fatalf("status = %d, body = %s", recorder.Code, recorder.Body.String()) + } + var payload observabilityLinksResponse + if err := json.NewDecoder(recorder.Body).Decode(&payload); err != nil { + t.Fatalf("decode response: %v", err) + } + if payload.Namespace != "user-1" || payload.Server != "demo" { + t.Fatalf("target = %s/%s, want user-1/demo", payload.Namespace, payload.Server) + } + if len(payload.Prometheus.Queries) == 0 { + t.Fatalf("prometheus queries missing: %#v", 
payload.Prometheus) + } + if got := payload.Prometheus.Queries[0].URL; !strings.HasPrefix(got, "https://platform.example.test/api/runtime/observability/prometheus/query?") { + t.Fatalf("prometheus URL = %q", got) + } + if !payload.Grafana.Available { + t.Fatalf("grafana should be available through the default scoped dashboard: %#v", payload.Grafana) + } + if payload.Grafana.DirectAdminOnly { + t.Fatalf("default scoped grafana dashboard should not be admin-only: %#v", payload.Grafana) + } + if got := payload.Grafana.URL; !strings.HasPrefix(got, "https://platform.example.test/api/runtime/observability/grafana/dashboard?") { + t.Fatalf("grafana URL = %q", got) + } +} + +func TestRuntimeObservabilityLinksGenerateGrafanaURLAfterAuthorization(t *testing.T) { + t.Setenv(envGrafanaServerDashboardURL, "/grafana/d/server/mcp-server?var-namespace={namespace}&var-server={server}") + t.Setenv(envGrafanaScopedUserAccess, "true") + server := newRuntimeServerWithMCPServers(t, &mcpv1alpha1.MCPServer{ + ObjectMeta: metav1.ObjectMeta{ + Name: "team-demo", + Namespace: "mcp-team-acme", + }, + Spec: mcpv1alpha1.MCPServerSpec{ + Image: "registry.example.com/acme/team-demo", + TeamID: "team-acme-id", + }, + }) + request := httptest.NewRequest(http.MethodGet, "/api/runtime/observability/links?namespace=mcp-team-acme&server=team-demo", nil) + request = request.WithContext(withPrincipal(request.Context(), principal{ + Role: roleUser, + Subject: "user-1", + Namespace: "user-1", + AllowedNamespaces: []string{ + "user-1", + "mcp-team-acme", + }, + Teams: []principalTeam{{ + ID: "team-acme-id", + Slug: "acme", + Namespace: "mcp-team-acme", + Role: teamRoleMember, + }}, + })) + recorder := httptest.NewRecorder() + + server.HandleRuntimeObservabilityLinks(recorder, request) + + if recorder.Code != http.StatusOK { + t.Fatalf("status = %d, body = %s", recorder.Code, recorder.Body.String()) + } + var payload observabilityLinksResponse + if err := json.NewDecoder(recorder.Body).Decode(&payload); err 
!= nil { + t.Fatalf("decode response: %v", err) + } + if !payload.Grafana.Available { + t.Fatalf("grafana should be available with scoped user access enabled: %#v", payload.Grafana) + } + wantURL := "/grafana/d/server/mcp-server?var-namespace=mcp-team-acme&var-server=team-demo" + if payload.Grafana.URL != wantURL { + t.Fatalf("grafana URL = %q, want %q", payload.Grafana.URL, wantURL) + } +} + +func TestRuntimeObservabilityRejectsCrossTenantServer(t *testing.T) { + server := newRuntimeServerWithMCPServers(t, ownedTestMCPServer("demo", "tenant-b", "tenant-b-user")) + request := httptest.NewRequest(http.MethodGet, "/api/runtime/observability/links?namespace=tenant-b&server=demo", nil) + request = request.WithContext(withPrincipal(request.Context(), principal{ + Role: roleUser, + Subject: "tenant-a-user", + Namespace: "tenant-a", + AllowedNamespaces: []string{ + "tenant-a", + sharedCatalogNamespace, + }, + })) + recorder := httptest.NewRecorder() + + server.HandleRuntimeObservabilityLinks(recorder, request) + + if recorder.Code != http.StatusNotFound { + t.Fatalf("status = %d, want %d; body = %s", recorder.Code, http.StatusNotFound, recorder.Body.String()) + } +} + +func TestRuntimeObservabilityRejectsUnownedSharedCatalogServer(t *testing.T) { + server := newRuntimeServerWithMCPServers(t, &mcpv1alpha1.MCPServer{ + ObjectMeta: metav1.ObjectMeta{ + Name: "shared-demo", + Namespace: sharedCatalogNamespace, + }, + Spec: mcpv1alpha1.MCPServerSpec{Image: "registry.example.com/shared-demo"}, + }) + request := httptest.NewRequest(http.MethodGet, "/api/runtime/observability/links?namespace=mcp-servers&server=shared-demo", nil) + request = request.WithContext(withPrincipal(request.Context(), principal{ + Role: roleUser, + Subject: "user-1", + Namespace: "user-1", + AllowedNamespaces: []string{ + "user-1", + sharedCatalogNamespace, + }, + })) + recorder := httptest.NewRecorder() + + server.HandleRuntimeObservabilityLinks(recorder, request) + + if recorder.Code != 
http.StatusNotFound { + t.Fatalf("status = %d, want %d; body = %s", recorder.Code, http.StatusNotFound, recorder.Body.String()) + } +} + +func TestRuntimeObservabilityPrometheusProxyScopesQuery(t *testing.T) { + var gotPath, gotQuery string + prometheus := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + gotPath = r.URL.Path + gotQuery = r.URL.Query().Get("query") + w.Header().Set("content-type", "application/json") + _, _ = w.Write([]byte(`{"status":"success","data":{"resultType":"vector","result":[]}}`)) + })) + defer prometheus.Close() + t.Setenv(envMCPPrometheusAPIURL, prometheus.URL+"/prometheus") + + server := newRuntimeServerWithMCPServers(t, ownedTestMCPServer("demo", "user-1", "user-1")) + request := httptest.NewRequest(http.MethodGet, "/api/runtime/observability/prometheus/query?namespace=user-1&server=demo&query_id=request_rate", nil) + request = request.WithContext(withPrincipal(request.Context(), principal{ + Role: roleUser, + Subject: "user-1", + Namespace: "user-1", + AllowedNamespaces: []string{ + "user-1", + sharedCatalogNamespace, + }, + })) + recorder := httptest.NewRecorder() + + server.HandleRuntimeObservabilityPrometheusQuery(recorder, request) + + if recorder.Code != http.StatusOK { + t.Fatalf("status = %d, body = %s", recorder.Code, recorder.Body.String()) + } + if gotPath != "/prometheus/api/v1/query" { + t.Fatalf("prometheus path = %q, want /prometheus/api/v1/query", gotPath) + } + wantQuery := `sum(rate(mcp_gateway_requests_total{namespace="user-1",server="demo"}[5m]))` + if gotQuery != wantQuery { + t.Fatalf("prometheus query = %q, want %q", gotQuery, wantQuery) + } + if strings.Contains(recorder.Body.String(), "tenant-b") { + t.Fatalf("response leaked foreign tenant marker: %s", recorder.Body.String()) + } +} + +func TestRuntimeObservabilityPrometheusProxyLogsBoundedFailureBody(t *testing.T) { + failureBody := strings.Repeat("x", prometheusErrorBodyLimit+200) + "\nsecret-after-limit" + prometheus := 
httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, _ *http.Request) { + http.Error(w, failureBody, http.StatusUnprocessableEntity) + })) + defer prometheus.Close() + t.Setenv(envMCPPrometheusAPIURL, prometheus.URL) + + var logs bytes.Buffer + originalWriter := log.Writer() + originalFlags := log.Flags() + log.SetOutput(&logs) + log.SetFlags(0) + t.Cleanup(func() { + log.SetOutput(originalWriter) + log.SetFlags(originalFlags) + }) + + server := newRuntimeServerWithMCPServers(t, ownedTestMCPServer("demo", "user-1", "user-1")) + request := httptest.NewRequest(http.MethodGet, "/api/runtime/observability/prometheus/query?namespace=user-1&server=demo&query_id=up", nil) + request = request.WithContext(withPrincipal(request.Context(), principal{ + Role: roleUser, + Subject: "user-1", + Namespace: "user-1", + AllowedNamespaces: []string{"user-1"}, + })) + recorder := httptest.NewRecorder() + + server.HandleRuntimeObservabilityPrometheusQuery(recorder, request) + + if recorder.Code != http.StatusBadGateway { + t.Fatalf("status = %d, want %d", recorder.Code, http.StatusBadGateway) + } + got := logs.String() + if !strings.Contains(got, `status=422 body="`) { + t.Fatalf("log missing upstream status/body: %q", got) + } + if strings.Contains(got, "secret-after-limit") { + t.Fatalf("log included content beyond %d-byte limit: %q", prometheusErrorBodyLimit, got) + } +} + +func TestRuntimeObservabilityGrafanaDashboardScopesQueries(t *testing.T) { + var gotQueries []string + prometheus := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + gotQueries = append(gotQueries, r.URL.Query().Get("query")) + w.Header().Set("content-type", "application/json") + _, _ = w.Write([]byte(`{"status":"success","data":{"resultType":"vector","result":[]}}`)) + })) + defer prometheus.Close() + t.Setenv(envMCPPrometheusAPIURL, prometheus.URL+"/prometheus") + + server := newRuntimeServerWithMCPServers(t, ownedTestMCPServer("demo", "user-1", "user-1")) + 
request := httptest.NewRequest(http.MethodGet, "/api/runtime/observability/grafana/dashboard?namespace=user-1&server=demo", nil) + request = request.WithContext(withPrincipal(request.Context(), principal{ + Role: roleUser, + Subject: "user-1", + Namespace: "user-1", + AllowedNamespaces: []string{ + "user-1", + sharedCatalogNamespace, + }, + })) + recorder := httptest.NewRecorder() + + server.HandleRuntimeObservabilityGrafanaDashboard(recorder, request) + + if recorder.Code != http.StatusOK { + t.Fatalf("status = %d, body = %s", recorder.Code, recorder.Body.String()) + } + if got := recorder.Header().Get("content-type"); !strings.Contains(got, "text/html") { + t.Fatalf("content-type = %q, want text/html", got) + } + body := recorder.Body.String() + for _, want := range []string{"Scoped Grafana", "user-1", "demo", "mcp_gateway_requests_total"} { + if !strings.Contains(body, want) { + t.Fatalf("dashboard body missing %q: %s", want, body) + } + } + if strings.Contains(body, "tenant-b") { + t.Fatalf("dashboard leaked foreign tenant marker: %s", body) + } + if len(gotQueries) == 0 { + t.Fatal("expected scoped prometheus queries") + } + for _, query := range gotQueries { + if !strings.Contains(query, `namespace="user-1",server="demo"`) { + t.Fatalf("query = %q, want user/server scoped selector", query) + } + } +} + +func newRuntimeServerWithMCPServers(t *testing.T, servers ...*mcpv1alpha1.MCPServer) *RuntimeServer { + t.Helper() + scheme := runtime.NewScheme() + if err := mcpv1alpha1.AddToScheme(scheme); err != nil { + t.Fatalf("AddToScheme: %v", err) + } + objects := make([]runtime.Object, 0, len(servers)) + for _, server := range servers { + objects = append(objects, server) + } + return &RuntimeServer{ + k8sClients: &k8sclient.Clients{ + Dynamic: dynamicfake.NewSimpleDynamicClient(scheme, objects...), + Clientset: kubernetesfake.NewSimpleClientset(), + }, + } +} + func TestRuntimeServerApplyNonAdminRejectsSharedCatalogNamespace(t *testing.T) { server := &RuntimeServer{ 
 		k8sClients: &k8sclient.Clients{
diff --git a/services/api/internal/runtimeapi/servers.go b/services/api/internal/runtimeapi/servers.go
index 96ca3b48..70c8807f 100644
--- a/services/api/internal/runtimeapi/servers.go
+++ b/services/api/internal/runtimeapi/servers.go
@@ -715,9 +715,10 @@ func (s *RuntimeServer) handleRuntimeServerDelete(w http.ResponseWriter, r *http
 
 type serverInfo struct {
 	controlplane.ServerInfo
-	LiveInventory      *liveInventory `json:"liveInventory"`
-	LiveInventoryError string         `json:"liveInventoryError,omitempty"`
-	AccessJSON         map[string]any `json:"access_json,omitempty"`
+	LiveInventory      *liveInventory              `json:"liveInventory"`
+	LiveInventoryError string                      `json:"liveInventoryError,omitempty"`
+	AccessJSON         map[string]any              `json:"access_json,omitempty"`
+	Observability      *observabilityLinksResponse `json:"observability,omitempty"`
 }
 
 type serverDeploymentStatus = controlplane.ServerDeploymentStatus
@@ -748,6 +749,10 @@ func serverInfoWithAccessJSON(info controlplane.ServerInfo, r *http.Request) ser
 			},
 		}
 	}
+	if p, ok := principalFromContext(r.Context()); ok && serverInfoObservableByPrincipal(info, p) {
+		links := observabilityLinksForServerInfo(info, p, r)
+		out.Observability = &links
+	}
 	return out
 }
 
diff --git a/services/api/routes.go b/services/api/routes.go
index d941b788..86e4f3a3 100644
--- a/services/api/routes.go
+++ b/services/api/routes.go
@@ -78,6 +78,9 @@ func (s *apiServer) registerRuntimeRoutes(mux *http.ServeMux) {
 	mux.Handle("/api/runtime/server-events", s.auth(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
 		runtimehandlers.HandleRuntimeServerEvents(runtimeServer, w, r)
 	})))
+	mux.Handle("/api/runtime/observability/links", s.auth(http.HandlerFunc(runtimeServer.HandleRuntimeObservabilityLinks)))
+	mux.Handle("/api/runtime/observability/grafana/dashboard", s.auth(http.HandlerFunc(runtimeServer.HandleRuntimeObservabilityGrafanaDashboard)))
+	mux.Handle("/api/runtime/observability/prometheus/query", s.auth(http.HandlerFunc(runtimeServer.HandleRuntimeObservabilityPrometheusQuery)))
 	mux.Handle("/api/runtime/teams", s.auth(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
 		runtimehandlers.HandleRuntimeTeams(runtimeServer, w, r)
 	})))
diff --git a/services/mcp-gateway/go.mod b/services/mcp-gateway/go.mod
index b5df0f46..7ac4ea9d 100644
--- a/services/mcp-gateway/go.mod
+++ b/services/mcp-gateway/go.mod
@@ -5,6 +5,7 @@ go 1.26.0
 require (
 	github.com/MicahParks/keyfunc v1.9.0
 	github.com/golang-jwt/jwt/v4 v4.5.2
+	github.com/prometheus/client_golang v1.23.2
 	go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.69.0
 	go.opentelemetry.io/otel v1.44.0
 	go.opentelemetry.io/otel/trace v1.44.0
@@ -24,9 +25,9 @@ require (
 	github.com/google/uuid v1.6.0 // indirect
 	github.com/grpc-ecosystem/grpc-gateway/v2 v2.29.0 // indirect
 	github.com/klauspost/compress v1.18.3 // indirect
+	github.com/kylelemons/godebug v1.1.0 // indirect
 	github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 // indirect
 	github.com/pierrec/lz4/v4 v4.1.25 // indirect
-	github.com/prometheus/client_golang v1.23.2 // indirect
 	github.com/prometheus/client_model v0.6.2 // indirect
 	github.com/prometheus/common v0.67.5 // indirect
 	github.com/prometheus/procfs v0.19.2 // indirect
diff --git a/services/mcp-gateway/main.go b/services/mcp-gateway/main.go
index 7b27685c..063987eb 100644
--- a/services/mcp-gateway/main.go
+++ b/services/mcp-gateway/main.go
@@ -12,6 +12,7 @@ import (
 	"syscall"
 	"time"
 
+	"github.com/prometheus/client_golang/prometheus"
 	"go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp"
 	_ "go.uber.org/automaxprocs" // align GOMAXPROCS with container CPU quota
 
@@ -20,6 +21,7 @@ import (
 
 func main() {
 	port := serviceutil.EnvOr("PORT", "8091")
+	metricsPort := serviceutil.EnvOr("METRICS_PORT", "9103")
 	upstream := serviceutil.EnvOr("UPSTREAM_URL", "http://127.0.0.1:8090")
 	analyticsURL := strings.TrimSpace(os.Getenv("ANALYTICS_INGEST_URL"))
 	apiKey := strings.TrimSpace(os.Getenv("ANALYTICS_API_KEY"))
@@ -56,6 +58,7 @@ func main() {
 
 	srv := &gatewayServer{
 		proxy:        proxy,
+		metrics:      newGatewayMetrics(prometheus.DefaultRegisterer),
 		analyticsURL: analyticsURL,
 		apiKey:       apiKey,
 		source:       source,
@@ -87,6 +90,8 @@ func main() {
 	})
 	mux.HandleFunc("/", srv.handleGateway)
 
+	metricsShutdown, metricsErrs := serviceutil.StartMetricsServer(metricsPort)
+
 	shutdown, err := initTracer("mcp-gateway")
 	if err != nil {
 		log.Printf("otel init failed: %v", err)
@@ -98,7 +103,7 @@ func main() {
 		}()
 	}
 
-	log.Printf("mcp-gateway listening on :%s -> %s", port, upstream)
+	log.Printf("mcp-gateway listening on :%s -> %s (metrics on :%s)", port, upstream, metricsPort)
 	handler := otelhttp.NewHandler(mux, "http.server")
 	httpServer := &http.Server{
 		Addr:              ":" + port,
@@ -109,20 +114,25 @@ func main() {
 		IdleTimeout:       60 * time.Second,
 	}
 
-	serverErr := make(chan error, 1)
+	serverErrs := make(chan error, 2)
+	go func() {
+		if err, ok := <-metricsErrs; ok {
+			serverErrs <- err
+		}
+	}()
 	go func() {
-		serverErr <- httpServer.ListenAndServe()
+		if err := httpServer.ListenAndServe(); err != nil && !errors.Is(err, http.ErrServerClosed) {
+			serverErrs <- err
+		}
 	}()
 
 	ctx, stop := signal.NotifyContext(context.Background(), os.Interrupt, syscall.SIGTERM)
 	defer stop()
 
 	select {
-	case err := <-serverErr:
+	case err := <-serverErrs:
 		srv.stopAnalyticsDispatcher()
-		if err != nil && !errors.Is(err, http.ErrServerClosed) {
-			log.Fatalf("server failed: %v", err)
-		}
+		log.Fatalf("server failed: %v", err)
 	case <-ctx.Done():
 		shutdownCtx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
 		defer cancel()
@@ -130,6 +140,10 @@ func main() {
 			srv.stopAnalyticsDispatcher()
 			log.Fatalf("server shutdown failed: %v", err)
 		}
+		if err := metricsShutdown(shutdownCtx); err != nil && !errors.Is(err, http.ErrServerClosed) {
+			srv.stopAnalyticsDispatcher()
+			log.Fatalf("metrics shutdown failed: %v", err)
+		}
 		srv.stopAnalyticsDispatcher()
 	}
 }
diff --git a/services/mcp-gateway/main_test.go b/services/mcp-gateway/main_test.go
index 183b6ef6..cf764df8 100644
--- a/services/mcp-gateway/main_test.go
+++ b/services/mcp-gateway/main_test.go
@@ -8,9 +8,11 @@ import (
 	"encoding/json"
 	"io"
 	"math/big"
+	"net"
 	"net/http"
 	"net/http/httptest"
 	"net/url"
+	"os"
 	"path/filepath"
 	"strings"
 	"sync/atomic"
@@ -18,6 +20,9 @@ import (
 	"time"
 
 	"github.com/golang-jwt/jwt/v4"
+	"github.com/prometheus/client_golang/prometheus"
+	"github.com/prometheus/client_golang/prometheus/promhttp"
+	"github.com/prometheus/client_golang/prometheus/testutil"
 	"go.opentelemetry.io/otel"
 	"go.opentelemetry.io/otel/propagation"
 	"go.opentelemetry.io/otel/trace"
@@ -522,6 +527,230 @@ func TestAuditPayloadIncludesLatencyMetadata(t *testing.T) {
 	}
 }
 
+func TestGatewayMetricsNotExposedOnMainListener(t *testing.T) {
+	t.Parallel()
+
+	registry := prometheus.NewRegistry()
+	upstream := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, _ *http.Request) {
+		w.WriteHeader(http.StatusOK)
+	}))
+	t.Cleanup(upstream.Close)
+
+	target, err := url.Parse(upstream.URL)
+	if err != nil {
+		t.Fatalf("url.Parse() error = %v", err)
+	}
+
+	proxy := &gatewayServer{
+		proxy:                 newUpstreamReverseProxy(target),
+		metrics:               newGatewayMetrics(registry),
+		httpClient:            &http.Client{Timeout: 2 * time.Second},
+		defaultHumanHeader:    defaultHumanHeader,
+		defaultAgentHeader:    defaultAgentHeader,
+		defaultTeamHeader:     defaultTeamHeader,
+		defaultSessionHeader:  defaultSessionHeader,
+		defaultPolicyMode:     defaultPolicyMode,
+		defaultPolicyDecision: defaultPolicyDecision,
+		defaultPolicyVersion:  "test-policy",
+		oauthProviders:        map[string]*oauthProvider{},
+	}
+	proxy.snapshotPolicy(policySnapshot{Policy: headerPolicy()})
+
+	mux := http.NewServeMux()
+	mux.HandleFunc("/", proxy.handleGateway)
+
+	recorder := httptest.NewRecorder()
+	request := httptest.NewRequest(http.MethodGet, "/metrics", nil)
+	mux.ServeHTTP(recorder, request)
+
+	if strings.Contains(recorder.Body.String(), "mcp_gateway_requests_total") {
+		t.Fatalf("main listener exposed prometheus metrics: status=%d body=%s", recorder.Code, recorder.Body.String())
+	}
+}
+
+func TestGatewayMetricsAvailableOnMetricsPort(t *testing.T) {
+	t.Parallel()
+
+	listener, err := net.Listen("tcp", "127.0.0.1:0")
+	if err != nil {
+		t.Fatalf("Listen() error = %v", err)
+	}
+	t.Cleanup(func() {
+		_ = listener.Close()
+	})
+
+	registry := prometheus.NewRegistry()
+	metrics := newGatewayMetrics(registry)
+	metrics.recordRequest(
+		gatewayMetricScope{Namespace: "mcp-servers", Server: "demo", Cluster: "kind", TeamID: "team-acme"},
+		httptest.NewRequest(http.MethodPost, "http://proxy.example.com/mcp", nil),
+		"tools/call",
+		policypkg.Decision{Allowed: true, Status: http.StatusOK, Reason: "allowed", PolicyVersion: "test-policy"},
+		http.StatusOK,
+		time.Millisecond,
+		0,
+		0,
+	)
+
+	metricsServer := &http.Server{
+		Handler: serviceutilMetricsHandler(registry),
+	}
+	go func() {
+		_ = metricsServer.Serve(listener)
+	}()
+	t.Cleanup(func() {
+		ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
+		defer cancel()
+		_ = metricsServer.Shutdown(ctx)
+	})
+
+	resp, err := http.Get("http://" + listener.Addr().String() + "/metrics")
+	if err != nil {
+		t.Fatalf("Get() error = %v", err)
+	}
+	defer resp.Body.Close()
+	body, err := io.ReadAll(resp.Body)
+	if err != nil {
+		t.Fatalf("ReadAll() error = %v", err)
+	}
+	if resp.StatusCode != http.StatusOK {
+		t.Fatalf("metrics status = %d, want %d", resp.StatusCode, http.StatusOK)
+	}
+	if !strings.Contains(string(body), "mcp_gateway_requests_total") {
+		t.Fatalf("metrics body missing gateway counter: %s", body)
+	}
+}
+
+func serviceutilMetricsHandler(registry *prometheus.Registry) http.Handler {
+	mux := http.NewServeMux()
+	mux.Handle("/metrics", promhttp.HandlerFor(registry, promhttp.HandlerOpts{}))
+	return mux
+}
+
+func TestGatewayMetricsRecordRequestsPolicyDecisionsAndBytes(t *testing.T) {
+	t.Parallel()
+
+	registry := prometheus.NewRegistry()
+	policy := &policypkg.Document{
+		Server: policypkg.Server{
+			Name:      "demo",
+			Namespace: "mcp-servers",
+			TeamID:    "team-acme",
+			Cluster:   "kind",
+		},
+		Auth: &policypkg.Auth{
+			Mode:            "header",
+			HumanIDHeader:   defaultHumanHeader,
+			AgentIDHeader:   defaultAgentHeader,
+			TeamIDHeader:    defaultTeamHeader,
+			SessionIDHeader: defaultSessionHeader,
+		},
+		Policy: &policypkg.Config{
+			Mode:            "allow-list",
+			DefaultDecision: "deny",
+			PolicyVersion:   "test-policy",
+		},
+	}
+	proxy := newTestGatewayServer(t, policy, func(w http.ResponseWriter, _ *http.Request) {
+		w.WriteHeader(http.StatusNoContent)
+	})
+	proxy.metrics = newGatewayMetrics(registry)
+
+	body := `{"jsonrpc":"2.0","id":1,"method":"tools/call","params":{"name":"echo"}}`
+	req := httptest.NewRequest(http.MethodPost, "http://proxy.example.com/mcp", strings.NewReader(body))
+	req.Header.Set("Content-Type", "application/json")
+	req.Header.Set(defaultHumanHeader, "human-1")
+	recorder := httptest.NewRecorder()
+
+	proxy.handleGateway(recorder, req)
+
+	if recorder.Code != http.StatusForbidden {
+		t.Fatalf("status = %d, want %d", recorder.Code, http.StatusForbidden)
+	}
+
+	requestLabels := []string{"mcp-servers", "demo", "kind", "team-acme", http.MethodPost, "tools/call", "deny", "403"}
+	if got := testutil.ToFloat64(proxy.metrics.requestsTotal.WithLabelValues(requestLabels...)); got != 1 {
+		t.Fatalf("requests total = %v, want 1", got)
+	}
+	if got := testutil.ToFloat64(proxy.metrics.policyDecisionsTotal.WithLabelValues("mcp-servers", "demo", "kind", "team-acme", "deny", "no_matching_grant", "tools/call")); got != 1 {
+		t.Fatalf("policy decisions total = %v, want 1", got)
+	}
+	if got := testutil.ToFloat64(proxy.metrics.requestBytesTotal.WithLabelValues(requestLabels...)); got != float64(len(body)) {
+		t.Fatalf("request bytes total = %v, want %d", got, len(body))
+	}
+	if got := testutil.ToFloat64(proxy.metrics.responseBytesTotal.WithLabelValues(requestLabels...)); got == 0 {
+		t.Fatal("response bytes total = 0, want denied response bytes")
+	}
+	if got := testutil.ToFloat64(proxy.metrics.inflightRequests.WithLabelValues("mcp-servers", "demo", "kind", "team-acme")); got != 0 {
+		t.Fatalf("inflight requests = %v, want 0 after request completion", got)
+	}
+	if metrics := gatherMetricsText(t, registry); !strings.Contains(metrics, "mcp_gateway_request_duration_seconds_bucket") {
+		t.Fatalf("duration histogram was not exposed:\n%s", metrics)
+	}
+}
+
+func TestGatewayMetricsRecordPolicyReloadResults(t *testing.T) {
+	t.Parallel()
+
+	registry := prometheus.NewRegistry()
+	policyFile := filepath.Join(t.TempDir(), "policy.json")
+	if err := os.WriteFile(policyFile, []byte(`{
+		"server":{"name":"demo","namespace":"mcp-servers","team_id":"team-acme","cluster":"kind"},
+		"auth":{"mode":"header"},
+		"policy":{"mode":"observe","default_decision":"deny","policy_version":"test-policy"}
+	}`), 0o600); err != nil {
+		t.Fatalf("WriteFile() error = %v", err)
+	}
+	proxy := &gatewayServer{
+		metrics:               newGatewayMetrics(registry),
+		policyFile:            policyFile,
+		serverName:            "demo",
+		serverNamespace:       "mcp-servers",
+		clusterName:           "kind",
+		defaultHumanHeader:    defaultHumanHeader,
+		defaultAgentHeader:    defaultAgentHeader,
+		defaultTeamHeader:     defaultTeamHeader,
+		defaultSessionHeader:  defaultSessionHeader,
+		defaultPolicyMode:     defaultPolicyMode,
+		defaultPolicyDecision: defaultPolicyDecision,
+		defaultPolicyVersion:  "test-policy",
+	}
+
+	if err := proxy.reloadPolicy(); err != nil {
+		t.Fatalf("reloadPolicy() error = %v", err)
+	}
+	if got := testutil.ToFloat64(proxy.metrics.policyReloadsTotal.WithLabelValues("mcp-servers", "demo", "kind", "team-acme", "success")); got != 1 {
+		t.Fatalf("successful policy reloads = %v, want 1", got)
+	}
+	if got := testutil.ToFloat64(proxy.metrics.policyLastReload.WithLabelValues("mcp-servers", "demo", "kind", "team-acme")); got == 0 {
+		t.Fatal("policy last reload timestamp = 0, want non-zero")
+	}
+
+	if err := os.WriteFile(policyFile, []byte(`{`), 0o600); err != nil {
+		t.Fatalf("WriteFile() error = %v", err)
+	}
+	if err := proxy.reloadPolicy(); err == nil {
+		t.Fatal("reloadPolicy() error = nil, want malformed policy error")
+	}
+	if got := testutil.ToFloat64(proxy.metrics.policyReloadsTotal.WithLabelValues("mcp-servers", "demo", "kind", "team-acme", "error")); got != 1 {
+		t.Fatalf("failed policy reloads = %v, want 1", got)
+	}
+}
+
+func TestGatewayMetricMethodsUseBoundedLabels(t *testing.T) {
+	t.Parallel()
+
+	if got := metricHTTPMethod("CUSTOM-" + strings.Repeat("x", 80)); got != "OTHER" {
+		t.Fatalf("metricHTTPMethod() = %q, want OTHER", got)
+	}
+	if got := metricRPCMethod("attacker/" + strings.Repeat("x", 80)); got != "other" {
+		t.Fatalf("metricRPCMethod() = %q, want other", got)
+	}
+	if got := metricRPCMethod("tools/call"); got != "tools/call" {
+		t.Fatalf("metricRPCMethod(tools/call) = %q", got)
+	}
+}
+
 func TestAuditPayloadIncludesToolRiskLevel(t *testing.T) {
 	t.Parallel()
 
@@ -999,6 +1228,15 @@ func newTestGatewayServer(t *testing.T, policy *policypkg.Document, upstream htt
 	return server
 }
 
+func gatherMetricsText(t *testing.T, registry *prometheus.Registry) string {
+	t.Helper()
+
+	recorder := httptest.NewRecorder()
+	request := httptest.NewRequest(http.MethodGet, "/metrics", nil)
+	promhttp.HandlerFor(registry, promhttp.HandlerOpts{}).ServeHTTP(recorder, request)
+	return recorder.Body.String()
+}
+
 func oauthPolicy(issuerURL string) *policypkg.Document {
 	return &policypkg.Document{
 		Auth: &policypkg.Auth{
diff --git a/services/mcp-gateway/metrics.go b/services/mcp-gateway/metrics.go
new file mode 100644
index 00000000..c2485bb4
--- /dev/null
+++ b/services/mcp-gateway/metrics.go
@@ -0,0 +1,209 @@
+package main
+
+import (
+	"net/http"
+	"strconv"
+	"strings"
+	"time"
+
+	"github.com/prometheus/client_golang/prometheus"
+
+	policypkg "mcp-runtime/pkg/policy"
+)
+
+type gatewayMetrics struct {
+	requestsTotal          *prometheus.CounterVec
+	policyDecisionsTotal   *prometheus.CounterVec
+	requestDurationSeconds *prometheus.HistogramVec
+	inflightRequests       *prometheus.GaugeVec
+	requestBytesTotal      *prometheus.CounterVec
+	responseBytesTotal     *prometheus.CounterVec
+	policyReloadsTotal     *prometheus.CounterVec
+	policyLastReload       *prometheus.GaugeVec
+}
+
+type gatewayMetricScope struct {
+	Namespace string
+	Server    string
+	Cluster   string
+	TeamID    string
+}
+
+func newGatewayMetrics(registerer prometheus.Registerer) *gatewayMetrics {
+	m := &gatewayMetrics{
+		requestsTotal: prometheus.NewCounterVec(prometheus.CounterOpts{
+			Name: "mcp_gateway_requests_total",
+			Help: "Total HTTP requests handled by MCP gateway sidecars.",
+		}, []string{"namespace", "server", "cluster", "team_id", "method", "rpc_method", "decision", "status"}),
+		policyDecisionsTotal: prometheus.NewCounterVec(prometheus.CounterOpts{
+			Name: "mcp_gateway_policy_decisions_total",
+			Help: "Total policy decisions made by MCP gateway sidecars.",
+		}, []string{"namespace", "server", "cluster", "team_id", "decision", "reason", "rpc_method"}),
+		requestDurationSeconds: prometheus.NewHistogramVec(prometheus.HistogramOpts{
+			Name:    "mcp_gateway_request_duration_seconds",
+			Help:    "End-to-end request duration observed by MCP gateway sidecars.",
+			Buckets: prometheus.DefBuckets,
+		}, []string{"namespace", "server", "cluster", "team_id", "method", "rpc_method", "decision", "status"}),
+		inflightRequests: prometheus.NewGaugeVec(prometheus.GaugeOpts{
+			Name: "mcp_gateway_inflight_requests",
+			Help: "Current in-flight HTTP requests handled by MCP gateway sidecars.",
+		}, []string{"namespace", "server", "cluster", "team_id"}),
+		requestBytesTotal: prometheus.NewCounterVec(prometheus.CounterOpts{
+			Name: "mcp_gateway_request_bytes_total",
+			Help: "Total request body bytes observed by MCP gateway sidecars.",
+		}, []string{"namespace", "server", "cluster", "team_id", "method", "rpc_method", "decision", "status"}),
+		responseBytesTotal: prometheus.NewCounterVec(prometheus.CounterOpts{
+			Name: "mcp_gateway_response_bytes_total",
+			Help: "Total response body bytes written by MCP gateway sidecars.",
+		}, []string{"namespace", "server", "cluster", "team_id", "method", "rpc_method", "decision", "status"}),
+		policyReloadsTotal: prometheus.NewCounterVec(prometheus.CounterOpts{
+			Name: "mcp_gateway_policy_reloads_total",
+			Help: "Total gateway policy reload attempts.",
+		}, []string{"namespace", "server", "cluster", "team_id", "result"}),
+		policyLastReload: prometheus.NewGaugeVec(prometheus.GaugeOpts{
+			Name: "mcp_gateway_policy_last_reload_success_timestamp_seconds",
+			Help: "Unix timestamp of the last successful gateway policy reload.",
+		}, []string{"namespace", "server", "cluster", "team_id"}),
+	}
+	if registerer != nil {
+		registerer.MustRegister(
+			m.requestsTotal,
+			m.policyDecisionsTotal,
+			m.requestDurationSeconds,
+			m.inflightRequests,
+			m.requestBytesTotal,
+			m.responseBytesTotal,
+			m.policyReloadsTotal,
+			m.policyLastReload,
+		)
+	}
+	return m
+}
+
+func (s *gatewayServer) metricScope(policy *policypkg.Document) gatewayMetricScope {
+	return gatewayMetricScope{
+		Namespace: policypkg.FirstNonEmpty(policypkg.PolicyServerNamespace(policy), s.serverNamespace),
+		Server:    policypkg.FirstNonEmpty(policypkg.PolicyServerName(policy), s.serverName),
+		Cluster:   policypkg.FirstNonEmpty(policypkg.PolicyServerCluster(policy), s.clusterName),
+		TeamID:    policypkg.PolicyServerTeamID(policy),
+	}
+}
+
+func (m *gatewayMetrics) trackInflight(scope gatewayMetricScope) func() {
+	if m == nil {
+		return func() {}
+	}
+	labels := []string{scope.Namespace, scope.Server, scope.Cluster, scope.TeamID}
+	m.inflightRequests.WithLabelValues(labels...).Inc()
+	return func() {
+		m.inflightRequests.WithLabelValues(labels...).Dec()
+	}
+}
+
+func (m *gatewayMetrics) recordRequest(
+	scope gatewayMetricScope,
+	r *http.Request,
+	rpcMethod string,
+	decision policypkg.Decision,
+	status int,
+	duration time.Duration,
+	requestBytes int64,
+	responseBytes int,
+) {
+	if m == nil || r == nil {
+		return
+	}
+	labels := []string{
+		scope.Namespace,
+		scope.Server,
+		scope.Cluster,
+		scope.TeamID,
+		metricHTTPMethod(r.Method),
+		metricRPCMethod(rpcMethod),
+		metricDecision(decision),
+		strconv.Itoa(status),
+	}
+	m.requestsTotal.WithLabelValues(labels...).Inc()
+	m.requestDurationSeconds.WithLabelValues(labels...).Observe(duration.Seconds())
+	m.requestBytesTotal.WithLabelValues(labels...).Add(float64(maxInt64(requestBytes, 0)))
+	m.responseBytesTotal.WithLabelValues(labels...).Add(float64(responseBytes))
+}
+
+func (m *gatewayMetrics) recordPolicyDecision(scope gatewayMetricScope, rpcMethod string, decision policypkg.Decision) {
+	if m == nil {
+		return
+	}
+	m.policyDecisionsTotal.WithLabelValues(
+		scope.Namespace,
+		scope.Server,
+		scope.Cluster,
+		scope.TeamID,
+		metricDecision(decision),
+		metricReason(decision),
+		metricRPCMethod(rpcMethod),
+	).Inc()
+}
+
+func (m *gatewayMetrics) recordPolicyReload(scope gatewayMetricScope, err error) {
+	if m == nil {
+		return
+	}
+	result := "success"
+	if err != nil {
+		result = "error"
+	}
+	m.policyReloadsTotal.WithLabelValues(scope.Namespace, scope.Server, scope.Cluster, scope.TeamID, result).Inc()
+	if err == nil {
+		m.policyLastReload.WithLabelValues(scope.Namespace, scope.Server, scope.Cluster, scope.TeamID).Set(float64(time.Now().Unix()))
+	}
+}
+
+func metricDecision(decision policypkg.Decision) string {
+	if decision.Allowed {
+		return "allow"
+	}
+	return "deny"
+}
+
+func metricReason(decision policypkg.Decision) string {
+	if value := strings.TrimSpace(decision.Reason); value != "" {
+		return value
+	}
+	if decision.Allowed {
+		return "allowed"
+	}
+	return "denied"
+}
+
+func metricHTTPMethod(method string) string {
+	switch strings.ToUpper(strings.TrimSpace(method)) {
+	case http.MethodGet, http.MethodPost, http.MethodPut, http.MethodPatch,
+		http.MethodDelete, http.MethodHead, http.MethodOptions:
+		return strings.ToUpper(strings.TrimSpace(method))
+	default:
+		return "OTHER"
+	}
+}
+
+func metricRPCMethod(method string) string {
+	switch strings.TrimSpace(method) {
+	case "":
+		return "none"
+	case "initialize",
+		"notifications/initialized",
+		"ping",
+		"tools/list",
+		"tools/call",
+		"resources/list",
+		"resources/read",
+		"resources/subscribe",
+		"resources/unsubscribe",
+		"prompts/list",
+		"prompts/get",
+		"completion/complete",
+		"logging/setLevel":
+		return strings.TrimSpace(method)
+	default:
+		return "other"
+	}
+}
diff --git a/services/mcp-gateway/policy_cache.go b/services/mcp-gateway/policy_cache.go
index 1d757ae5..e2650f1a 100644
--- a/services/mcp-gateway/policy_cache.go
+++ b/services/mcp-gateway/policy_cache.go
@@ -42,9 +42,11 @@ func (s *gatewayServer) reloadPolicy() error {
 			fallback = s.defaultPolicyDocument()
 		}
 		s.snapshotPolicy(policySnapshot{Policy: fallback, Err: err})
+		s.metrics.recordPolicyReload(s.metricScope(fallback), err)
 		return err
 	}
 	s.snapshotPolicy(policySnapshot{Policy: doc})
+	s.metrics.recordPolicyReload(s.metricScope(doc), nil)
 	return nil
 }
 
diff --git a/services/mcp-gateway/proxy.go b/services/mcp-gateway/proxy.go
index 8ef8dd4c..b831fb1b 100644
--- a/services/mcp-gateway/proxy.go
+++ b/services/mcp-gateway/proxy.go
@@ -48,6 +48,16 @@ func (s *gatewayServer) handleGateway(w http.ResponseWriter, r *http.Request) {
 		Reason:        "allowed",
 		PolicyVersion: s.defaultPolicyVersion,
 	}
+	scope := s.metricScope(policy)
+	stopInflight := s.metrics.trackInflight(scope)
+	defer stopInflight()
+	policyDecisionObserved := false
+	defer func() {
+		s.metrics.recordRequest(scope, r, rpcMethod, decision, recorder.status, time.Since(start), r.ContentLength, recorder.bytes)
+		if policyDecisionObserved {
+			s.metrics.recordPolicyDecision(scope, rpcMethod, decision)
+		}
+	}()
 	oauthResult := oauthAuthResult{
 		Allowed: true,
 		Status:  http.StatusOK,
@@ -63,6 +73,7 @@ func (s *gatewayServer) handleGateway(w http.ResponseWriter, r *http.Request) {
 				oauthResult.Reason,
 				policypkg.ChoosePolicyVersion(policypkg.PolicyVersion(policy), s.defaultPolicyVersion),
 			)
+			policyDecisionObserved = true
 			s.writeDeniedResponse(recorder, r, originalPath, rpcMethod, toolName, authCtx, policy, decision, start, inspection.IsRPCAttempt)
 			return
 		}
@@ -89,6 +100,7 @@ func (s *gatewayServer) handleGateway(w http.ResponseWriter, r *http.Request) {
 				ToolName: policypkg.ToolName(toolName),
 			}, time.Now())
 		}
+		policyDecisionObserved = true
 	}
 
 	if !decision.Allowed {
diff --git a/services/mcp-gateway/types.go b/services/mcp-gateway/types.go
index d3942110..7654a151 100644
--- a/services/mcp-gateway/types.go
+++ b/services/mcp-gateway/types.go
@@ -60,6 +60,7 @@ type analyticsEvent struct {
 
 type gatewayServer struct {
 	proxy                 *httputil.ReverseProxy
+	metrics               *gatewayMetrics
 	analyticsURL          string
 	apiKey                string
 	source                string
diff --git a/services/ui/main_test.go b/services/ui/main_test.go
index d51dcc06..49963473 100644
--- a/services/ui/main_test.go
+++ b/services/ui/main_test.go
@@ -410,6 +410,10 @@ func TestStaticAppMovesTenantRetireActionToMyActivity(t *testing.T) {
 		`function isTenantUser()`,
 		`if (isTenantUser() && server.namespace && server.name)`,
 		`retireButton.textContent = "Retire"`,
+		`metricsButton.textContent = "Prometheus"`,
+		`grafanaButton.textContent = "Grafana"`,
+		`async function openScopedObservability(server, target)`,
+		`server.observability?.prometheus?.queries?.length`,
 		`authenticated && !isTenantUser()`,
 		`await loadUserDashboard()`,
 	} {
diff --git a/services/ui/static/app.js b/services/ui/static/app.js
index 3d1480d0..127a6a37 100644
--- a/services/ui/static/app.js
+++ b/services/ui/static/app.js
@@ -1289,6 +1289,24 @@ function createUserServerActionsCell(server) {
   });
   actions.appendChild(analyticsButton);
 
+  if (server.observability?.prometheus?.queries?.length) {
+    const metricsButton = document.createElement("button");
+    metricsButton.type = "button";
+    metricsButton.className = "ghost action-btn";
+    metricsButton.textContent = "Prometheus";
+    metricsButton.addEventListener("click", () => openScopedObservability(server, "prometheus"));
+    actions.appendChild(metricsButton);
+  }
+
+  if (server.observability?.grafana?.available && server.observability?.grafana?.url) {
+    const grafanaButton = document.createElement("button");
+    grafanaButton.type = "button";
+    grafanaButton.className = "ghost action-btn";
+    grafanaButton.textContent = "Grafana";
+    grafanaButton.addEventListener("click", () => openScopedObservability(server, "grafana"));
+    actions.appendChild(grafanaButton);
+  }
+
   if (server.endpoint) {
     const copyButton = document.createElement("button");
     copyButton.type = "button";
@@ -1311,6 +1329,35 @@ function createUserServerActionsCell(server) {
   return cell;
 }
 
+async function openScopedObservability(server, target) {
+  if (!server?.namespace || !server?.name) return;
+  try {
+    const links = server.observability || await fetchJSON(
+      `/runtime/observability/links?namespace=${encodeURIComponent(server.namespace)}&server=${encodeURIComponent(server.name)}`
+    );
+    if (target === "grafana") {
+      if (links?.grafana?.available && links.grafana.url) {
+        window.open(links.grafana.url, "_blank", "noopener");
+        return;
+      }
+      showToast(links?.grafana?.reason || "Grafana is not available for this server", "error");
+      return;
+    }
+    const queries = links?.prometheus?.queries || [];
+    const requestRate = queries.find((query) => query.id === "request_rate");
+    const query = requestRate || queries[0];
+    if (!query?.url) {
+      showToast("Prometheus metrics are not available for this server", "error");
+      return;
+    }
+    window.open(query.url, "_blank", "noopener");
+  } catch (err) {
+    if (isUnauthorizedError(err)) return;
+    console.error("Failed to open observability link:", err);
+    showToast(readErrorMessage(err, "Observability is unavailable"), "error");
+  }
+}
+
 function renderUserDashboardAnalytics(data) {
   renderUserAnalyticsBreakdown(data?.servers || []);
   renderUserAnalyticsTools(data?.tools || []);