Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion docs/observability.md
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,8 @@ All metrics are prefixed with `thv_reg_srv_` to distinguish them from other metr
| `thv_reg_srv_http_request_duration_seconds` | Histogram | `method`, `route`, `status_code` | Duration of HTTP requests |
| `thv_reg_srv_http_requests_total` | Counter | `method`, `route`, `status_code` | Total number of HTTP requests |
| `thv_reg_srv_http_active_requests` | UpDownCounter | - | Number of in-flight requests |
| `thv_reg_srv_servers_total` | Gauge | `registry` | Number of servers in each registry |
| `thv_reg_srv_servers_total` | Gauge | `source` | Number of distinct servers in each source |
| `thv_reg_srv_skills_total` | Gauge | `source` | Number of distinct skills in each source |
| `thv_reg_srv_sync_duration_seconds` | Histogram | `registry`, `success` | Duration of sync operations |

### Histogram Buckets
Expand Down
8 changes: 4 additions & 4 deletions examples/otel/grafana/dashboards/registry-server.json
Original file line number Diff line number Diff line change
Expand Up @@ -263,11 +263,11 @@
{
"datasource": { "type": "prometheus", "uid": "${datasource}" },
"expr": "thv_reg_srv_servers_total",
"legendFormat": "{{ registry }}",
"legendFormat": "{{ source }}",
"refId": "A"
}
],
"title": "Servers per Registry",
"title": "Servers per Source",
"type": "stat"
},
{
Expand Down Expand Up @@ -316,11 +316,11 @@
{
"datasource": { "type": "prometheus", "uid": "${datasource}" },
"expr": "thv_reg_srv_servers_total",
"legendFormat": "{{ registry }}",
"legendFormat": "{{ source }}",
"refId": "A"
}
],
"title": "Servers per Registry (over time)",
"title": "Servers per Source (over time)",
"type": "timeseries"
},
{
Expand Down
27 changes: 20 additions & 7 deletions internal/app/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,10 @@ type registryAppConfig struct {
coordinatorOpts []coordinator.Option
}

type registryMetricsReaderFactory interface {
CreateRegistryMetricsReader(ctx context.Context) (telemetry.RegistryMetricReader, error)
}

func baseConfig(opts ...RegistryAppOptions) (*registryAppConfig, error) {
cfg := &registryAppConfig{
address: defaultHTTPAddress,
Expand Down Expand Up @@ -359,13 +363,22 @@ func buildSyncComponents(
slog.Info("Sync metrics enabled")
}

registryMetrics, err := telemetry.NewRegistryMetrics(b.meterProvider)
if err != nil {
return nil, fmt.Errorf("failed to create registry metrics: %w", err)
}
if registryMetrics != nil {
coordOpts = append(coordOpts, coordinator.WithRegistryMetrics(registryMetrics))
slog.Info("Registry metrics enabled")
if readerFactory, ok := b.storageFactory.(registryMetricsReaderFactory); ok {
registryMetricsReader, err := readerFactory.CreateRegistryMetricsReader(ctx)
if err != nil {
return nil, fmt.Errorf("failed to create registry metrics reader: %w", err)
}

registryMetrics, err := telemetry.NewRegistryMetrics(b.meterProvider, registryMetricsReader)
if err != nil {
return nil, fmt.Errorf("failed to create registry metrics: %w", err)
}
if registryMetrics != nil {
coordOpts = append(coordOpts, coordinator.WithRegistryMetrics(registryMetrics))
slog.Info("Registry metrics enabled")
}
} else {
slog.Debug("Registry metrics disabled: storage factory does not provide a registry metrics reader")
}
}

Expand Down
2 changes: 1 addition & 1 deletion internal/app/builder_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -686,7 +686,7 @@ func TestBuildSyncComponents_WithMeterProvider(t *testing.T) {
description string
}{
{
name: "with meter provider creates sync and registry metrics",
name: "with meter provider creates available metrics",
meterProvider: noop.NewMeterProvider(),
description: "coordinator should be created with metrics when meter provider is set",
},
Expand Down
103 changes: 103 additions & 0 deletions internal/app/storage/database_factory_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"testing"

"github.com/google/uuid"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

Expand Down Expand Up @@ -98,6 +99,108 @@ func TestNewDatabaseFactory(t *testing.T) {
}
}

func TestDatabaseFactory_CreateRegistryMetricsReader(t *testing.T) {
t.Parallel()

ctx := context.Background()
db, cleanupFunc := database.SetupTestDBContainer(t, ctx)
t.Cleanup(cleanupFunc)

err := database.MigrateUp(ctx, db)
require.NoError(t, err)

cfg := &config.Config{
Database: &config.DatabaseConfig{
Host: db.Config().Host,
Port: int(db.Config().Port),
User: db.Config().User,
Password: database.DBPass,
Database: db.Config().Database,
SSLMode: "disable",
},
}
factory, err := NewDatabaseFactory(ctx, cfg)
require.NoError(t, err)
t.Cleanup(factory.Cleanup)

alphaSourceID := insertMetricTestSource(ctx, t, factory, "alpha")
insertMetricTestSource(ctx, t, factory, "empty")

serverEntryID := insertMetricTestEntry(ctx, t, factory, alphaSourceID, "MCP", "server-a")
insertMetricTestVersion(ctx, t, factory, serverEntryID, "server-a", "1.0.0")
insertMetricTestVersion(ctx, t, factory, serverEntryID, "server-a", "2.0.0")
skillEntryID := insertMetricTestEntry(ctx, t, factory, alphaSourceID, "SKILL", "skill-a")
insertMetricTestVersion(ctx, t, factory, skillEntryID, "skill-a", "1.0.0")

reader, err := factory.CreateRegistryMetricsReader(ctx)
require.NoError(t, err)

counts, err := reader.RegistryMetricCounts(ctx)
require.NoError(t, err)

bySource := make(map[string][2]int64, len(counts))
for _, count := range counts {
bySource[count.SourceName] = [2]int64{count.ServerCount, count.SkillCount}
}

assert.Equal(t, [2]int64{1, 1}, bySource["alpha"])
assert.Equal(t, [2]int64{0, 0}, bySource["empty"])
}

func insertMetricTestSource(
ctx context.Context,
t *testing.T,
factory *DatabaseFactory,
name string,
) uuid.UUID {
t.Helper()

var sourceID uuid.UUID
err := factory.pool.QueryRow(ctx, `
INSERT INTO source (name, source_type, syncable, creation_type)
VALUES ($1, 'git', true, 'CONFIG')
RETURNING id`, name).Scan(&sourceID)
require.NoError(t, err)

return sourceID
}

func insertMetricTestEntry(
ctx context.Context,
t *testing.T,
factory *DatabaseFactory,
sourceID uuid.UUID,
entryType string,
name string,
) uuid.UUID {
t.Helper()

var entryID uuid.UUID
err := factory.pool.QueryRow(ctx, `
INSERT INTO registry_entry (source_id, entry_type, name)
VALUES ($1, $2, $3)
RETURNING id`, sourceID, entryType, name).Scan(&entryID)
require.NoError(t, err)

return entryID
}

func insertMetricTestVersion(
ctx context.Context,
t *testing.T,
factory *DatabaseFactory,
entryID uuid.UUID,
name string,
version string,
) {
t.Helper()

_, err := factory.pool.Exec(ctx, `
INSERT INTO entry_version (entry_id, name, version)
VALUES ($1, $2, $3)`, entryID, name, version)
require.NoError(t, err)
}

func TestDatabaseFactory_CreateStateService(t *testing.T) {
t.Parallel()

Expand Down
58 changes: 58 additions & 0 deletions internal/app/storage/registry_metrics.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
package storage

import (
"context"
"fmt"

"github.com/jackc/pgx/v5/pgxpool"

"github.com/stacklok/toolhive-registry-server/internal/telemetry"
)

const registryMetricCountsQuery = `
SELECT src.name,
COUNT(re.id) FILTER (WHERE re.entry_type = 'MCP')::bigint AS server_count,
COUNT(re.id) FILTER (WHERE re.entry_type = 'SKILL')::bigint AS skill_count
FROM source src
LEFT JOIN registry_entry re ON re.source_id = src.id
GROUP BY src.name
ORDER BY src.name`

type registryMetricsReader struct {
pool *pgxpool.Pool
}

var _ telemetry.RegistryMetricReader = (*registryMetricsReader)(nil)

// CreateRegistryMetricsReader creates a reader for source-level registry metrics.
func (d *DatabaseFactory) CreateRegistryMetricsReader(_ context.Context) (telemetry.RegistryMetricReader, error) {
if d.pool == nil {
return nil, fmt.Errorf("pgx pool is required")
}

return &registryMetricsReader{pool: d.pool}, nil
}

func (r *registryMetricsReader) RegistryMetricCounts(
ctx context.Context,
) ([]telemetry.RegistryMetricCount, error) {
rows, err := r.pool.Query(ctx, registryMetricCountsQuery)
if err != nil {
return nil, fmt.Errorf("query registry metric counts: %w", err)
}
defer rows.Close()

var counts []telemetry.RegistryMetricCount
for rows.Next() {
var count telemetry.RegistryMetricCount
if err := rows.Scan(&count.SourceName, &count.ServerCount, &count.SkillCount); err != nil {
return nil, fmt.Errorf("scan registry metric count: %w", err)
}
counts = append(counts, count)
}
if err := rows.Err(); err != nil {
return nil, fmt.Errorf("iterate registry metric counts: %w", err)
}

return counts, nil
}
8 changes: 3 additions & 5 deletions internal/sync/coordinator/coordinator.go
Original file line number Diff line number Diff line change
Expand Up @@ -188,6 +188,9 @@ func (c *defaultCoordinator) Stop() error {
// Wait for coordinator to finish
<-c.done
}
if err := c.registryMetrics.Unregister(); err != nil {
return fmt.Errorf("failed to unregister registry metrics: %w", err)
}
return nil
}

Expand Down Expand Up @@ -319,10 +322,5 @@ func (c *defaultCoordinator) performRegistrySync(
c.syncMetrics.RecordSyncDuration(ctx, registryName, syncDuration, true)
}

// Record registry metrics
if c.registryMetrics != nil {
c.registryMetrics.RecordServersTotal(ctx, registryName, int64(result.ServerCount))
c.registryMetrics.RecordSkillsTotal(ctx, registryName, int64(result.SkillCount))
}
}
}
Loading
Loading