Skip to content

Commit cd2f7a0

Browse files
committed
ts: record child metrics with low frequency poller
This change adds the ability to get changefeed child metrics to be recorded. The child metrics have their names augmented with their labels. Each agg metric has up to 1024 child labels collected. Epic: CRDB-55079 Release: None
1 parent 4b3fa1d commit cd2f7a0

File tree

7 files changed

+525
-10
lines changed

7 files changed

+525
-10
lines changed

pkg/server/status/recorder.go

Lines changed: 92 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -451,18 +451,40 @@ func (mr *MetricsRecorder) GetTimeSeriesData(childMetrics bool) []tspb.TimeSerie
451451
return nil
452452
}
453453

454+
now := mr.clock.Now()
455+
lastDataCount := atomic.LoadInt64(&mr.lastDataCount)
456+
data := make([]tspb.TimeSeriesData, 0, lastDataCount)
457+
454458
if childMetrics {
455459
if !ChildMetricsStorageEnabled.Get(&mr.settings.SV) {
456460
return nil
457461
}
458-
return nil // TODO(jasonlmfong): to be implemented
459-
}
460462

461-
lastDataCount := atomic.LoadInt64(&mr.lastDataCount)
462-
data := make([]tspb.TimeSeriesData, 0, lastDataCount)
463+
// Record child metrics from app registry for system tenant only.
464+
recorder := registryRecorder{
465+
registry: mr.mu.appRegistry,
466+
format: nodeTimeSeriesPrefix,
467+
source: mr.mu.desc.NodeID.String(),
468+
timestampNanos: now.UnixNano(),
469+
}
470+
recorder.recordChangefeedChildMetrics(&data)
471+
472+
// Record child metrics from app-level registries for secondary tenants
473+
for tenantID, r := range mr.mu.tenantRegistries {
474+
tenantRecorder := registryRecorder{
475+
registry: r,
476+
format: nodeTimeSeriesPrefix,
477+
source: tsutil.MakeTenantSource(mr.mu.desc.NodeID.String(), tenantID.String()),
478+
timestampNanos: now.UnixNano(),
479+
}
480+
tenantRecorder.recordChangefeedChildMetrics(&data)
481+
}
482+
483+
atomic.CompareAndSwapInt64(&mr.lastDataCount, lastDataCount, int64(len(data)))
484+
return data
485+
}
463486

464487
// Record time series from node-level registries.
465-
now := mr.clock.Now()
466488
recorder := registryRecorder{
467489
registry: mr.mu.nodeRegistry,
468490
format: nodeTimeSeriesPrefix,
@@ -907,6 +929,71 @@ func (rr registryRecorder) recordChild(
907929
})
908930
}
909931

932+
// recordChangefeedChildMetrics iterates through changefeed metrics in the registry and processes child metrics
933+
// for those that have TsdbRecordLabeled set to true in their metadata.
934+
// Records up to 1024 child metrics to prevent unbounded memory usage and performance issues.
935+
//
936+
// NB: Only available for Counter and Gauge metrics.
937+
func (rr registryRecorder) recordChangefeedChildMetrics(dest *[]tspb.TimeSeriesData) {
938+
maxChildMetricsPerMetric := 1024
939+
940+
labels := rr.registry.GetLabels()
941+
rr.registry.Each(func(name string, v interface{}) {
942+
// Check if the metric has child collection enabled in its metadata
943+
iterable, ok := v.(metric.Iterable)
944+
if !ok {
945+
// If we can't get metadata, skip child collection for safety
946+
return
947+
}
948+
metadata := iterable.GetMetadata()
949+
if !metadata.GetTsdbRecordLabeled() {
950+
return // Skip this metric if child collection is not enabled
951+
}
952+
if metadata.Category != metric.Metadata_CHANGEFEEDS {
953+
return
954+
}
955+
956+
prom, ok := v.(metric.PrometheusExportable)
957+
if !ok {
958+
return
959+
}
960+
promIter, ok := v.(metric.PrometheusIterable)
961+
if !ok {
962+
return
963+
}
964+
m := prom.ToPrometheusMetric()
965+
m.Label = append(labels, prom.GetLabels(false /* useStaticLabels */)...)
966+
967+
var childMetricsCount int
968+
processChildMetric := func(childMetric *prometheusgo.Metric) {
969+
if childMetricsCount >= maxChildMetricsPerMetric {
970+
return // Stop processing once we hit the limit
971+
}
972+
973+
var value float64
974+
if childMetric.Gauge != nil {
975+
value = *childMetric.Gauge.Value
976+
} else if childMetric.Counter != nil {
977+
value = *childMetric.Counter.Value
978+
} else {
979+
return
980+
}
981+
*dest = append(*dest, tspb.TimeSeriesData{
982+
Name: fmt.Sprintf(rr.format, prom.GetName(false /* useStaticLabels */)+metric.EncodeLabeledName(childMetric)),
983+
Source: rr.source,
984+
Datapoints: []tspb.TimeSeriesDatapoint{
985+
{
986+
TimestampNanos: rr.timestampNanos,
987+
Value: value,
988+
},
989+
},
990+
})
991+
childMetricsCount++
992+
}
993+
promIter.Each(m.Label, processChildMetric)
994+
})
995+
}
996+
910997
// GetTotalMemory returns either the total system memory (in bytes) or if
911998
// possible the cgroups available memory.
912999
func GetTotalMemory(ctx context.Context) (int64, error) {

0 commit comments

Comments
 (0)