diff --git a/docs/observability.md b/docs/observability.md index 39ea9155..3ceaae9c 100644 --- a/docs/observability.md +++ b/docs/observability.md @@ -99,7 +99,8 @@ All metrics are prefixed with `thv_reg_srv_` to distinguish them from other metr | `thv_reg_srv_http_request_duration_seconds` | Histogram | `method`, `route`, `status_code` | Duration of HTTP requests | | `thv_reg_srv_http_requests_total` | Counter | `method`, `route`, `status_code` | Total number of HTTP requests | | `thv_reg_srv_http_active_requests` | UpDownCounter | - | Number of in-flight requests | -| `thv_reg_srv_servers_total` | Gauge | `registry` | Number of servers in each registry | +| `thv_reg_srv_servers_total` | Gauge | `source` | Number of distinct servers in each source | +| `thv_reg_srv_skills_total` | Gauge | `source` | Number of distinct skills in each source | | `thv_reg_srv_sync_duration_seconds` | Histogram | `registry`, `success` | Duration of sync operations | ### Histogram Buckets diff --git a/examples/otel/grafana/dashboards/registry-server.json b/examples/otel/grafana/dashboards/registry-server.json index fc23de68..36edd83b 100644 --- a/examples/otel/grafana/dashboards/registry-server.json +++ b/examples/otel/grafana/dashboards/registry-server.json @@ -263,11 +263,11 @@ { "datasource": { "type": "prometheus", "uid": "${datasource}" }, "expr": "thv_reg_srv_servers_total", - "legendFormat": "{{ registry }}", + "legendFormat": "{{ source }}", "refId": "A" } ], - "title": "Servers per Registry", + "title": "Servers per Source", "type": "stat" }, { @@ -316,11 +316,11 @@ { "datasource": { "type": "prometheus", "uid": "${datasource}" }, "expr": "thv_reg_srv_servers_total", - "legendFormat": "{{ registry }}", + "legendFormat": "{{ source }}", "refId": "A" } ], - "title": "Servers per Registry (over time)", + "title": "Servers per Source (over time)", "type": "timeseries" }, { diff --git a/internal/app/builder.go b/internal/app/builder.go index ca68f5cf..d75a9bac 100644 --- a/internal/app/builder.go +++ b/internal/app/builder.go @@ -74,6 +74,10 @@ type registryAppConfig struct { coordinatorOpts []coordinator.Option } +type registryMetricsReaderFactory interface { + CreateRegistryMetricsReader(ctx context.Context) (telemetry.RegistryMetricReader, error) +} + func baseConfig(opts ...RegistryAppOptions) (*registryAppConfig, error) { cfg := ®istryAppConfig{ address: defaultHTTPAddress, @@ -359,13 +363,22 @@ func buildSyncComponents( slog.Info("Sync metrics enabled") } - registryMetrics, err := telemetry.NewRegistryMetrics(b.meterProvider) - if err != nil { - return nil, fmt.Errorf("failed to create registry metrics: %w", err) - } - if registryMetrics != nil { - coordOpts = append(coordOpts, coordinator.WithRegistryMetrics(registryMetrics)) - slog.Info("Registry metrics enabled") + if readerFactory, ok := b.storageFactory.(registryMetricsReaderFactory); ok { + registryMetricsReader, err := readerFactory.CreateRegistryMetricsReader(ctx) + if err != nil { + return nil, fmt.Errorf("failed to create registry metrics reader: %w", err) + } + + registryMetrics, err := telemetry.NewRegistryMetrics(b.meterProvider, registryMetricsReader) + if err != nil { + return nil, fmt.Errorf("failed to create registry metrics: %w", err) + } + if registryMetrics != nil { + coordOpts = append(coordOpts, coordinator.WithRegistryMetrics(registryMetrics)) + slog.Info("Registry metrics enabled") + } + } else { + slog.Debug("Registry metrics disabled: storage factory does not provide a registry metrics reader") } } diff --git a/internal/app/builder_test.go b/internal/app/builder_test.go index 6ebb1b1c..27318398 100644 --- a/internal/app/builder_test.go +++ b/internal/app/builder_test.go @@ -686,7 +686,7 @@ func TestBuildSyncComponents_WithMeterProvider(t *testing.T) { description string }{ { - name: "with meter provider creates sync and registry metrics", + name: "with meter provider creates available metrics", meterProvider: noop.NewMeterProvider(), description: "coordinator should be created with metrics when meter provider is set", }, diff --git a/internal/app/storage/database_factory_test.go b/internal/app/storage/database_factory_test.go index 53feaec8..a47100c6 100644 --- a/internal/app/storage/database_factory_test.go +++ b/internal/app/storage/database_factory_test.go @@ -4,6 +4,7 @@ import ( "context" "testing" + "github.com/google/uuid" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" @@ -98,6 +99,108 @@ func TestNewDatabaseFactory(t *testing.T) { } } +func TestDatabaseFactory_CreateRegistryMetricsReader(t *testing.T) { + t.Parallel() + + ctx := context.Background() + db, cleanupFunc := database.SetupTestDBContainer(t, ctx) + t.Cleanup(cleanupFunc) + + err := database.MigrateUp(ctx, db) + require.NoError(t, err) + + cfg := &config.Config{ + Database: &config.DatabaseConfig{ + Host: db.Config().Host, + Port: int(db.Config().Port), + User: db.Config().User, + Password: database.DBPass, + Database: db.Config().Database, + SSLMode: "disable", + }, + } + factory, err := NewDatabaseFactory(ctx, cfg) + require.NoError(t, err) + t.Cleanup(factory.Cleanup) + + alphaSourceID := insertMetricTestSource(ctx, t, factory, "alpha") + insertMetricTestSource(ctx, t, factory, "empty") + + serverEntryID := insertMetricTestEntry(ctx, t, factory, alphaSourceID, "MCP", "server-a") + insertMetricTestVersion(ctx, t, factory, serverEntryID, "server-a", "1.0.0") + insertMetricTestVersion(ctx, t, factory, serverEntryID, "server-a", "2.0.0") + skillEntryID := insertMetricTestEntry(ctx, t, factory, alphaSourceID, "SKILL", "skill-a") + insertMetricTestVersion(ctx, t, factory, skillEntryID, "skill-a", "1.0.0") + + reader, err := factory.CreateRegistryMetricsReader(ctx) + require.NoError(t, err) + + counts, err := reader.RegistryMetricCounts(ctx) + require.NoError(t, err) + + bySource := make(map[string][2]int64, len(counts)) + for _, count := range counts { + bySource[count.SourceName] = [2]int64{count.ServerCount, count.SkillCount} + } + + assert.Equal(t, [2]int64{1, 1}, bySource["alpha"]) + assert.Equal(t, [2]int64{0, 0}, bySource["empty"]) +} + +func insertMetricTestSource( + ctx context.Context, + t *testing.T, + factory *DatabaseFactory, + name string, +) uuid.UUID { + t.Helper() + + var sourceID uuid.UUID + err := factory.pool.QueryRow(ctx, ` +INSERT INTO source (name, source_type, syncable, creation_type) +VALUES ($1, 'git', true, 'CONFIG') +RETURNING id`, name).Scan(&sourceID) + require.NoError(t, err) + + return sourceID +} + +func insertMetricTestEntry( + ctx context.Context, + t *testing.T, + factory *DatabaseFactory, + sourceID uuid.UUID, + entryType string, + name string, +) uuid.UUID { + t.Helper() + + var entryID uuid.UUID + err := factory.pool.QueryRow(ctx, ` +INSERT INTO registry_entry (source_id, entry_type, name) +VALUES ($1, $2, $3) +RETURNING id`, sourceID, entryType, name).Scan(&entryID) + require.NoError(t, err) + + return entryID +} + +func insertMetricTestVersion( + ctx context.Context, + t *testing.T, + factory *DatabaseFactory, + entryID uuid.UUID, + name string, + version string, +) { + t.Helper() + + _, err := factory.pool.Exec(ctx, ` +INSERT INTO entry_version (entry_id, name, version) +VALUES ($1, $2, $3)`, entryID, name, version) + require.NoError(t, err) +} + func TestDatabaseFactory_CreateStateService(t *testing.T) { t.Parallel() diff --git a/internal/app/storage/registry_metrics.go b/internal/app/storage/registry_metrics.go new file mode 100644 index 00000000..e04b0d0b --- /dev/null +++ b/internal/app/storage/registry_metrics.go @@ -0,0 +1,58 @@ +package storage + +import ( + "context" + "fmt" + + "github.com/jackc/pgx/v5/pgxpool" + + "github.com/stacklok/toolhive-registry-server/internal/telemetry" +) + +const registryMetricCountsQuery = ` +SELECT src.name, + COUNT(re.id) FILTER (WHERE re.entry_type = 'MCP')::bigint AS server_count, + COUNT(re.id) FILTER (WHERE re.entry_type = 'SKILL')::bigint AS skill_count + FROM source src + LEFT JOIN registry_entry re ON re.source_id = src.id + GROUP BY src.name + ORDER BY src.name` + +type registryMetricsReader struct { + pool *pgxpool.Pool +} + +var _ telemetry.RegistryMetricReader = (*registryMetricsReader)(nil) + +// CreateRegistryMetricsReader creates a reader for source-level registry metrics. +func (d *DatabaseFactory) CreateRegistryMetricsReader(_ context.Context) (telemetry.RegistryMetricReader, error) { + if d.pool == nil { + return nil, fmt.Errorf("pgx pool is required") + } + + return ®istryMetricsReader{pool: d.pool}, nil +} + +func (r *registryMetricsReader) RegistryMetricCounts( + ctx context.Context, +) ([]telemetry.RegistryMetricCount, error) { + rows, err := r.pool.Query(ctx, registryMetricCountsQuery) + if err != nil { + return nil, fmt.Errorf("query registry metric counts: %w", err) + } + defer rows.Close() + + var counts []telemetry.RegistryMetricCount + for rows.Next() { + var count telemetry.RegistryMetricCount + if err := rows.Scan(&count.SourceName, &count.ServerCount, &count.SkillCount); err != nil { + return nil, fmt.Errorf("scan registry metric count: %w", err) + } + counts = append(counts, count) + } + if err := rows.Err(); err != nil { + return nil, fmt.Errorf("iterate registry metric counts: %w", err) + } + + return counts, nil +} diff --git a/internal/sync/coordinator/coordinator.go b/internal/sync/coordinator/coordinator.go index b934d3b5..44bde233 100644 --- a/internal/sync/coordinator/coordinator.go +++ b/internal/sync/coordinator/coordinator.go @@ -188,6 +188,9 @@ func (c *defaultCoordinator) Stop() error { // Wait for coordinator to finish <-c.done } + if err := c.registryMetrics.Unregister(); err != nil { + return fmt.Errorf("failed to unregister registry metrics: %w", err) + } return nil } @@ -319,10 +322,5 @@ func (c *defaultCoordinator) performRegistrySync( c.syncMetrics.RecordSyncDuration(ctx, registryName, syncDuration, true) } - // Record registry metrics - if c.registryMetrics != nil { - c.registryMetrics.RecordServersTotal(ctx, registryName, int64(result.ServerCount)) - c.registryMetrics.RecordSkillsTotal(ctx, registryName, int64(result.SkillCount)) - } } } diff --git a/internal/telemetry/metrics.go b/internal/telemetry/metrics.go index 78da721d..342cd6ca 100644 --- a/internal/telemetry/metrics.go +++ b/internal/telemetry/metrics.go @@ -19,67 +19,89 @@ const ( // RegistryMetrics holds the OpenTelemetry instruments for registry metrics type RegistryMetrics struct { - serversTotal metric.Int64Gauge - skillsTotal metric.Int64Gauge + serversTotal metric.Int64ObservableGauge + skillsTotal metric.Int64ObservableGauge + registration metric.Registration +} + +// RegistryMetricCount is the point-in-time count of registry entries for a source. +type RegistryMetricCount struct { + SourceName string + ServerCount int64 + SkillCount int64 +} + +// RegistryMetricReader reads registry metric values at collection time. +type RegistryMetricReader interface { + RegistryMetricCounts(ctx context.Context) ([]RegistryMetricCount, error) } // NewRegistryMetrics creates a new RegistryMetrics instance with the given meter provider. // If provider is nil, it returns nil (no-op metrics). -func NewRegistryMetrics(provider metric.MeterProvider) (*RegistryMetrics, error) { +func NewRegistryMetrics(provider metric.MeterProvider, reader RegistryMetricReader) (*RegistryMetrics, error) { if provider == nil { return nil, nil } meter := provider.Meter(RegistryMetricsMeterName) - serversTotal, err := meter.Int64Gauge( + serversTotal, err := meter.Int64ObservableGauge( "thv_reg_srv_servers_total", - metric.WithDescription("Number of servers in each registry"), + metric.WithDescription("Number of distinct servers in each source"), metric.WithUnit("{server}"), ) if err != nil { return nil, err } - skillsTotal, err := meter.Int64Gauge( + skillsTotal, err := meter.Int64ObservableGauge( "thv_reg_srv_skills_total", - metric.WithDescription("Number of skills in each registry"), + metric.WithDescription("Number of distinct skills in each source"), metric.WithUnit("{skill}"), ) if err != nil { return nil, err } + var registration metric.Registration + if reader != nil { + registration, err = meter.RegisterCallback( + func(ctx context.Context, observer metric.Observer) error { + counts, err := reader.RegistryMetricCounts(ctx) + if err != nil { + return err + } + + for _, count := range counts { + attrs := metric.WithAttributes(attribute.String("source", count.SourceName)) + observer.ObserveInt64(serversTotal, count.ServerCount, attrs) + observer.ObserveInt64(skillsTotal, count.SkillCount, attrs) + } + + return nil + }, + serversTotal, + skillsTotal, + ) + if err != nil { + return nil, err + } + } + return &RegistryMetrics{ serversTotal: serversTotal, skillsTotal: skillsTotal, + registration: registration, }, nil } -// RecordServersTotal records the current number of servers in a registry -func (m *RegistryMetrics) RecordServersTotal(ctx context.Context, registryName string, count int64) { - if m == nil || m.serversTotal == nil { - return - } - - attrs := []attribute.KeyValue{ - attribute.String("registry", registryName), - } - - m.serversTotal.Record(ctx, count, metric.WithAttributes(attrs...)) -} - -// RecordSkillsTotal records the current number of skills in a registry -func (m *RegistryMetrics) RecordSkillsTotal(ctx context.Context, registryName string, count int64) { - if m == nil || m.skillsTotal == nil { - return - } - - attrs := []attribute.KeyValue{ - attribute.String("registry", registryName), +// Unregister removes the observable callback registered for registry metrics. +func (m *RegistryMetrics) Unregister() error { + if m == nil || m.registration == nil { + return nil } - m.skillsTotal.Record(ctx, count, metric.WithAttributes(attrs...)) + return m.registration.Unregister() } // SyncMetrics holds the OpenTelemetry instruments for sync operation metrics diff --git a/internal/telemetry/metrics_test.go b/internal/telemetry/metrics_test.go index 514baf92..e2190ec4 100644 --- a/internal/telemetry/metrics_test.go +++ b/internal/telemetry/metrics_test.go @@ -2,22 +2,36 @@ package telemetry import ( "context" + "errors" "testing" "time" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + "go.opentelemetry.io/otel/attribute" sdkmetric "go.opentelemetry.io/otel/sdk/metric" "go.opentelemetry.io/otel/sdk/metric/metricdata" ) +type staticRegistryMetricReader struct { + counts []RegistryMetricCount + err error +} + +func (r *staticRegistryMetricReader) RegistryMetricCounts(_ context.Context) ([]RegistryMetricCount, error) { + if r.err != nil { + return nil, r.err + } + return r.counts, nil +} + func TestNewRegistryMetrics(t *testing.T) { t.Parallel() t.Run("returns nil when provider is nil", func(t *testing.T) { t.Parallel() - metrics, err := NewRegistryMetrics(nil) + metrics, err := NewRegistryMetrics(nil, nil) require.NoError(t, err) assert.Nil(t, metrics) }) @@ -28,57 +42,98 @@ func TestNewRegistryMetrics(t *testing.T) { mp := sdkmetric.NewMeterProvider() defer func() { _ = mp.Shutdown(context.Background()) }() - metrics, err := NewRegistryMetrics(mp) + metrics, err := NewRegistryMetrics(mp, &staticRegistryMetricReader{}) require.NoError(t, err) assert.NotNil(t, metrics) assert.NotNil(t, metrics.serversTotal) }) } -func TestRegistryMetrics_RecordServersTotal(t *testing.T) { +func TestRegistryMetrics_ObservableTotals(t *testing.T) { t.Parallel() - t.Run("no-op when metrics is nil", func(t *testing.T) { + t.Run("observes server and skill counts with source attribute", func(t *testing.T) { t.Parallel() - var metrics *RegistryMetrics - // Should not panic - metrics.RecordServersTotal(context.Background(), "test-registry", 10) + reader := sdkmetric.NewManualReader() + mp := sdkmetric.NewMeterProvider(sdkmetric.WithReader(reader)) + defer func() { _ = mp.Shutdown(context.Background()) }() + + metricsReader := &staticRegistryMetricReader{ + counts: []RegistryMetricCount{ + {SourceName: "prod-source", ServerCount: 42, SkillCount: 3}, + {SourceName: "dev-source", ServerCount: 10, SkillCount: 1}, + }, + } + metrics, err := NewRegistryMetrics(mp, metricsReader) + require.NoError(t, err) + require.NotNil(t, metrics) + + var rm metricdata.ResourceMetrics + err = reader.Collect(context.Background(), &rm) + require.NoError(t, err) + + servers := findInt64Gauge(t, rm, "thv_reg_srv_servers_total") + require.Len(t, servers.DataPoints, 2) + assertInt64GaugePoint(t, servers, "prod-source", 42) + assertInt64GaugePoint(t, servers, "dev-source", 10) + + skills := findInt64Gauge(t, rm, "thv_reg_srv_skills_total") + require.Len(t, skills.DataPoints, 2) + assertInt64GaugePoint(t, skills, "prod-source", 3) + assertInt64GaugePoint(t, skills, "dev-source", 1) }) - t.Run("records server count with registry attribute", func(t *testing.T) { + t.Run("returns reader errors from collection", func(t *testing.T) { t.Parallel() reader := sdkmetric.NewManualReader() mp := sdkmetric.NewMeterProvider(sdkmetric.WithReader(reader)) defer func() { _ = mp.Shutdown(context.Background()) }() - metrics, err := NewRegistryMetrics(mp) + expectedErr := errors.New("read failed") + metrics, err := NewRegistryMetrics(mp, &staticRegistryMetricReader{err: expectedErr}) require.NoError(t, err) require.NotNil(t, metrics) - // Record some metrics - metrics.RecordServersTotal(context.Background(), "prod-registry", 42) - metrics.RecordServersTotal(context.Background(), "dev-registry", 10) - - // Collect metrics var rm metricdata.ResourceMetrics err = reader.Collect(context.Background(), &rm) - require.NoError(t, err) + require.ErrorIs(t, err, expectedErr) + }) +} - // Verify metrics were recorded - require.NotEmpty(t, rm.ScopeMetrics) +func findInt64Gauge(t *testing.T, rm metricdata.ResourceMetrics, name string) metricdata.Gauge[int64] { + t.Helper() - // Find our registry metrics scope - var foundScope bool - for _, scope := range rm.ScopeMetrics { - if scope.Scope.Name == RegistryMetricsMeterName { - foundScope = true - assert.NotEmpty(t, scope.Metrics) + for _, scope := range rm.ScopeMetrics { + if scope.Scope.Name != RegistryMetricsMeterName { + continue + } + for _, m := range scope.Metrics { + if m.Name == name { + gauge, ok := m.Data.(metricdata.Gauge[int64]) + require.True(t, ok, "expected int64 gauge data type for %s", name) + return gauge } } - assert.True(t, foundScope, "expected to find registry metrics scope") - }) + } + + require.FailNow(t, "metric not found", name) + return metricdata.Gauge[int64]{} +} + +func assertInt64GaugePoint(t *testing.T, gauge metricdata.Gauge[int64], sourceName string, value int64) { + t.Helper() + + expectedAttrs := attribute.NewSet(attribute.String("source", sourceName)) + for _, point := range gauge.DataPoints { + if point.Attributes.Equals(&expectedAttrs) { + assert.Equal(t, value, point.Value) + return + } + } + + require.FailNowf(t, "gauge point not found", "source=%s", sourceName) } func TestNewSyncMetrics(t *testing.T) {