From 47c04354cf9a93665df75f8c23f4fb33b69849b1 Mon Sep 17 00:00:00 2001 From: kangxiang Date: Thu, 19 Mar 2026 18:49:20 +0800 Subject: [PATCH] feat: get custom metrics --- .../querier/engine/clickhouse/clickhouse.go | 28 +++++++++-- .../querier/engine/clickhouse/common/const.go | 4 ++ .../querier/engine/clickhouse/common/utils.go | 27 +++++++++- server/querier/engine/clickhouse/function.go | 6 +-- .../engine/clickhouse/metrics/ext_common.go | 49 ++++++++++++++----- .../engine/clickhouse/metrics/metrics.go | 34 ++++++++++--- server/querier/engine/clickhouse/tag.go | 2 +- 7 files changed, 123 insertions(+), 27 deletions(-) diff --git a/server/querier/engine/clickhouse/clickhouse.go b/server/querier/engine/clickhouse/clickhouse.go index 6374db6ccf9..1f45c239cb8 100644 --- a/server/querier/engine/clickhouse/clickhouse.go +++ b/server/querier/engine/clickhouse/clickhouse.go @@ -28,6 +28,7 @@ import ( "time" //"github.com/k0kubun/pp" + "github.com/bitly/go-simplejson" logging "github.com/op/go-logging" "github.com/xwb1989/sqlparser" @@ -112,6 +113,7 @@ type CHEngine struct { ORGID string Language string NativeField map[string]*metrics.Metrics + CustomMetrics map[string]*simplejson.Json } func init() { @@ -1329,6 +1331,15 @@ func (e *CHEngine) TransFrom(froms sqlparser.TableExprs) error { e.Table = table // native field if config.ControllerCfg.DFWebService.Enabled && (slices.Contains([]string{chCommon.DB_NAME_DEEPFLOW_ADMIN, chCommon.DB_NAME_DEEPFLOW_TENANT, chCommon.DB_NAME_APPLICATION_LOG, chCommon.DB_NAME_EXT_METRICS}, e.DB) || slices.Contains([]string{chCommon.TABLE_NAME_L7_FLOW_LOG, chCommon.TABLE_NAME_EVENT, chCommon.TABLE_NAME_FILE_EVENT}, e.Table)) { + // get custom-metrics + var err error + if e.CustomMetrics == nil { + e.CustomMetrics, err = chCommon.GetCustomMetrics(e.ORGID) + if err != nil { + log.Error(err.Error()) + } + } + customMetrics := e.CustomMetrics e.NativeField = map[string]*metrics.Metrics{} getNativeUrl := fmt.Sprintf("http://localhost:%d/v1/native-fields/?db=%s&table_name=%s", config.ControllerCfg.ListenPort, e.DB, e.Table) resp, err := ctlcommon.CURLPerform("GET", getNativeUrl, nil, ctlcommon.WithHeader(ctlcommon.HEADER_KEY_X_ORG_ID, e.ORGID)) @@ -1346,9 +1357,18 @@ func (e *CHEngine) TransFrom(froms sqlparser.TableExprs) error { continue } if fieldType == chCommon.NATIVE_FIELD_TYPE_METRIC { + var mUnit string + mType := metrics.METRICS_TYPE_COUNTER + customMetric, ok := customMetrics[fmt.Sprintf("%s.%s.%s", e.DB, e.Table, nativeMetric)] + if ok { + mType = customMetric.Get("TYPE").MustInt() + mUnit = customMetric.Get("UNIT").MustString() + displayName = customMetric.Get("DISPLAY_NAME").MustString() + description = customMetric.Get("DESCRIPTION").MustString() + } metric := metrics.NewMetrics( 0, nativeMetric, - displayName, displayName, displayName, "", "", "", metrics.METRICS_TYPE_COUNTER, + displayName, displayName, displayName, mUnit, mUnit, mUnit, mType, chCommon.NATIVE_FIELD_CATEGORY_METRICS, []bool{true, true, true}, "", table, description, description, description, "", "", ) e.NativeField[nativeMetric] = metric @@ -1872,7 +1892,7 @@ func (e *CHEngine) parseSelectBinaryExpr(node sqlparser.Expr) (binary Function, if fieldFunc != nil { return fieldFunc, nil } - metricStruct, ok := metrics.GetAggMetrics(field, e.DB, e.Table, e.ORGID, e.NativeField) + metricStruct, ok := metrics.GetAggMetrics(field, e.DB, e.Table, e.ORGID, e.NativeField, e.CustomMetrics) if ok { return &Field{Value: metricStruct.DBField}, nil } @@ -1985,7 +2005,7 @@ func (e *CHEngine) parseWhere(node sqlparser.Expr, w *Where, isCheck bool) (view switch comparExpr.(type) { case *sqlparser.ColName, *sqlparser.SQLVal: whereTag := chCommon.ParseAlias(node.Left) - metricStruct, ok := metrics.GetMetrics(whereTag, e.DB, e.Table, e.ORGID, e.NativeField) + metricStruct, ok := metrics.GetMetrics(whereTag, e.DB, e.Table, e.ORGID, e.NativeField, e.CustomMetrics) if ok && metricStruct.Type != metrics.METRICS_TYPE_TAG { whereTag = metricStruct.DBField } @@ -2013,7 +2033,7 @@ func (e *CHEngine) parseWhere(node sqlparser.Expr, w *Where, isCheck bool) (view return nil, errors.New(fmt.Sprintf("parse where error: %s(%T)", sqlparser.String(node), node)) } whereTag := chCommon.ParseAlias(node.Expr) - metricStruct, ok := metrics.GetMetrics(whereTag, e.DB, e.Table, e.ORGID, e.NativeField) + metricStruct, ok := metrics.GetMetrics(whereTag, e.DB, e.Table, e.ORGID, e.NativeField, e.CustomMetrics) if ok && metricStruct.Type != metrics.METRICS_TYPE_TAG { whereTag = metricStruct.DBField } diff --git a/server/querier/engine/clickhouse/common/const.go b/server/querier/engine/clickhouse/common/const.go index 955e069205b..b25c76e6704 100644 --- a/server/querier/engine/clickhouse/common/const.go +++ b/server/querier/engine/clickhouse/common/const.go @@ -59,6 +59,10 @@ const ( NATIVE_FIELD_STATE_NORMAL = 1 ) +const ( + API_CUSTOM_METRICS_FORMAT = "http://localhost:%d/v1/custom-metrics" +) + var DB_TABLE_MAP = map[string][]string{ DB_NAME_FLOW_LOG: []string{"l4_flow_log", "l7_flow_log", "l4_packet", "l7_packet"}, DB_NAME_FLOW_METRICS: []string{"network", "network_map", "application", "application_map", "traffic_policy"}, diff --git a/server/querier/engine/clickhouse/common/utils.go b/server/querier/engine/clickhouse/common/utils.go index 696f8265398..61a67b8ce0d 100644 --- a/server/querier/engine/clickhouse/common/utils.go +++ b/server/querier/engine/clickhouse/common/utils.go @@ -28,10 +28,13 @@ import ( "strconv" "strings" + "github.com/bitly/go-simplejson" + "github.com/xwb1989/sqlparser" + + ctlcommon "github.com/deepflowio/deepflow/server/controller/common" "github.com/deepflowio/deepflow/server/querier/config" "github.com/deepflowio/deepflow/server/querier/engine/clickhouse/client" logging "github.com/op/go-logging" - "github.com/xwb1989/sqlparser" ) var log = logging.MustGetLogger("common") @@ -252,3 +255,25 @@ func GetExtTables(db, where, queryCacheTTL, orgID string, useQueryCache bool, ct } return values } + +func GetCustomMetrics(orgID string) (map[string]*simplejson.Json, error) { + result := make(map[string]*simplejson.Json) + url := fmt.Sprintf(API_CUSTOM_METRICS_FORMAT, config.ControllerCfg.ListenPort) + resp, err := ctlcommon.CURLPerform("GET", url, nil, + ctlcommon.WithHeader(ctlcommon.HEADER_KEY_X_ORG_ID, orgID), + ctlcommon.WithHeader(ctlcommon.HEADER_KEY_X_USER_ID, strconv.Itoa(ctlcommon.USER_ID_SUPER_ADMIN)), + ctlcommon.WithHeader(ctlcommon.HEADER_KEY_X_USER_TYPE, strconv.Itoa(ctlcommon.USER_TYPE_SUPER_ADMIN)), + ) + if err != nil { + return result, fmt.Errorf("request controller url (%s) failed: %s", url, err.Error()) + } + resultArray := resp.Get("DATA").MustArray() + for i := range resultArray { + item := resp.Get("DATA").GetIndex(i) + db := item.Get("DB").MustString() + table := item.Get("TABLE").MustString() + name := item.Get("NAME").MustString() + result[fmt.Sprintf("%s.%s.%s", db, table, name)] = item + } + return result, nil +} diff --git a/server/querier/engine/clickhouse/function.go b/server/querier/engine/clickhouse/function.go index 515e7551b80..b09f0f202e4 100644 --- a/server/querier/engine/clickhouse/function.go +++ b/server/querier/engine/clickhouse/function.go @@ -107,7 +107,7 @@ func GetAggFunc(name string, args []string, alias string, derivativeArgs []strin if !ok { return nil, 0, "", nil } - metricStruct, ok := metrics.GetAggMetrics(field, e.DB, e.Table, e.ORGID, e.NativeField) + metricStruct, ok := metrics.GetAggMetrics(field, e.DB, e.Table, e.ORGID, e.NativeField, e.CustomMetrics) if !ok { return nil, 0, "", nil } @@ -275,7 +275,7 @@ func GetTopKTrans(name string, args []string, alias string, e *CHEngine) (Statem field = field[5 : len(field)-1] isEnum = true } - metricStruct, ok = metrics.GetAggMetrics(field, e.DB, e.Table, e.ORGID, e.NativeField) + metricStruct, ok = metrics.GetAggMetrics(field, e.DB, e.Table, e.ORGID, e.NativeField, e.CustomMetrics) if !ok || metricStruct.Type == metrics.METRICS_TYPE_ARRAY { return nil, 0, "", nil } @@ -351,7 +351,7 @@ func GetUniqTrans(name string, args []string, alias string, e *CHEngine) (Statem var metricStruct *metrics.Metrics for _, field := range fields { field = strings.Trim(field, "`") - metricStruct, ok = metrics.GetAggMetrics(field, e.DB, e.Table, e.ORGID, e.NativeField) + metricStruct, ok = metrics.GetAggMetrics(field, e.DB, e.Table, e.ORGID, e.NativeField, e.CustomMetrics) if !ok || metricStruct.Type == metrics.METRICS_TYPE_ARRAY { return nil, 0, "", nil } diff --git a/server/querier/engine/clickhouse/metrics/ext_common.go b/server/querier/engine/clickhouse/metrics/ext_common.go index c3d9eabc577..fa29cf42423 100644 --- a/server/querier/engine/clickhouse/metrics/ext_common.go +++ b/server/querier/engine/clickhouse/metrics/ext_common.go @@ -22,6 +22,7 @@ import ( "slices" "strings" + simplejson "github.com/bitly/go-simplejson" ctlcommon "github.com/deepflowio/deepflow/server/controller/common" "github.com/deepflowio/deepflow/server/querier/config" "github.com/deepflowio/deepflow/server/querier/engine/clickhouse/client" @@ -32,7 +33,7 @@ var EXT_METRICS = map[string]*Metrics{} func GetExtMetrics(db, table, where, queryCacheTTL, orgID string, useQueryCache bool, ctx context.Context) (map[string]*Metrics, error) { loadMetrics := make(map[string]*Metrics) - if slices.Contains([]string{common.DB_NAME_DEEPFLOW_ADMIN, common.DB_NAME_DEEPFLOW_TENANT, common.DB_NAME_APPLICATION_LOG, common.DB_NAME_EXT_METRICS}, db) || slices.Contains([]string{common.TABLE_NAME_L7_FLOW_LOG, common.TABLE_NAME_EVENT, common.TABLE_NAME_FILE_EVENT}, table) { + if slices.Contains([]string{common.DB_NAME_DEEPFLOW_ADMIN, common.DB_NAME_DEEPFLOW_TENANT, common.DB_NAME_APPLICATION_LOG, common.DB_NAME_EXT_METRICS}, db) || slices.Contains([]string{common.TABLE_NAME_L7_FLOW_LOG}, table) { externalChClient := client.Client{ Host: config.Cfg.Clickhouse.Host, Port: config.Cfg.Clickhouse.Port, @@ -59,27 +60,42 @@ func GetExtMetrics(db, table, where, queryCacheTTL, orgID string, useQueryCache log.Error(err) return nil, err } + var customMetrics map[string]*simplejson.Json + if config.ControllerCfg.DFWebService.Enabled { + customMetrics, err = common.GetCustomMetrics(orgID) + if err != nil { + log.Error(err.Error()) + } + } for i, value := range externalMetricFloatRst.Values { tagName := value.([]interface{})[0] tableName := value.([]interface{})[1].(string) externalTag := tagName.(string) metrics_names_field, metrics_values_field := METRICS_ARRAY_NAME_MAP[db][0], METRICS_ARRAY_NAME_MAP[db][1] dbField := fmt.Sprintf("if(indexOf(%s, '%s')=0, null, %s[indexOf(%s, '%s')])", metrics_names_field, externalTag, metrics_values_field, metrics_names_field, externalTag) + var mUnit, description string + mType := METRICS_TYPE_COUNTER metricName := fmt.Sprintf("metrics.%s", externalTag) + displayName := metricName + customMetric, ok := customMetrics[fmt.Sprintf("%s.%s.%s", db, tableName, metricName)] + if ok { + mType = customMetric.Get("TYPE").MustInt() + mUnit = customMetric.Get("UNIT").MustString() + displayName = customMetric.Get("DISPLAY_NAME").MustString() + description = customMetric.Get("DESCRIPTION").MustString() + } lm := NewMetrics( - i, dbField, metricName, metricName, metricName, "", "", "", METRICS_TYPE_COUNTER, - common.NATIVE_FIELD_CATEGORY_METRICS, []bool{true, true, true}, "", tableName, "", "", "", "", "", + i, dbField, displayName, displayName, displayName, mUnit, mUnit, mUnit, mType, + common.NATIVE_FIELD_CATEGORY_METRICS, []bool{true, true, true}, "", tableName, description, description, description, "", "", ) loadMetrics[fmt.Sprintf("%s-%s", metricName, tableName)] = lm } - if !slices.Contains([]string{common.TABLE_NAME_EVENT, common.TABLE_NAME_FILE_EVENT}, table) { - lm := NewMetrics( - len(loadMetrics), "metrics", - "metrics", "metrics", "metrics", "", "", "", METRICS_TYPE_ARRAY, - common.NATIVE_FIELD_CATEGORY_METRICS, []bool{true, true, true}, "", table, "", "", "", "", "", - ) - loadMetrics[fmt.Sprintf("%s-%s", "metrics", table)] = lm - } + lm := NewMetrics( + len(loadMetrics), "metrics", + "metrics", "metrics", "metrics", "", "", "", METRICS_TYPE_ARRAY, + common.NATIVE_FIELD_CATEGORY_METRICS, []bool{true, true, true}, "", table, "", "", "", "", "", + ) + loadMetrics[fmt.Sprintf("%s-%s", "metrics", table)] = lm // native metrics if config.ControllerCfg.DFWebService.Enabled { @@ -101,8 +117,17 @@ func GetExtMetrics(db, table, where, queryCacheTTL, orgID string, useQueryCache if fieldType != common.NATIVE_FIELD_TYPE_METRIC { continue } + var mUnit string + mType := METRICS_TYPE_COUNTER + customMetric, ok := customMetrics[fmt.Sprintf("%s.%s.%s", db, table, nativeMetric)] + if ok { + mType = customMetric.Get("TYPE").MustInt() + mUnit = customMetric.Get("UNIT").MustString() + displayName = customMetric.Get("DISPLAY_NAME").MustString() + description = customMetric.Get("DESCRIPTION").MustString() + } lm := NewMetrics( - len(loadMetrics), nativeMetric, displayName, displayName, displayName, "", "", "", METRICS_TYPE_COUNTER, + len(loadMetrics), nativeMetric, displayName, displayName, displayName, mUnit, mUnit, mUnit, mType, common.NATIVE_FIELD_CATEGORY_METRICS, []bool{true, true, true}, "", table, description, description, description, "", "", ) loadMetrics[fmt.Sprintf("%s-%s", nativeMetric, table)] = lm diff --git a/server/querier/engine/clickhouse/metrics/metrics.go b/server/querier/engine/clickhouse/metrics/metrics.go index cb129cd1be3..fcce621fcc6 100644 --- a/server/querier/engine/clickhouse/metrics/metrics.go +++ b/server/querier/engine/clickhouse/metrics/metrics.go @@ -24,6 +24,7 @@ import ( "slices" "strings" + simplejson "github.com/bitly/go-simplejson" "github.com/deepflowio/deepflow/server/querier/common" "github.com/deepflowio/deepflow/server/querier/config" ckcommon "github.com/deepflowio/deepflow/server/querier/engine/clickhouse/common" @@ -116,7 +117,7 @@ func NewReplaceMetrics(dbField string, condition string) *Metrics { } } -func GetAggMetrics(field, db, table, orgID string, nativeField map[string]*Metrics) (*Metrics, bool) { +func GetAggMetrics(field, db, table, orgID string, nativeField map[string]*Metrics, customMetrics map[string]*simplejson.Json) (*Metrics, bool) { field = strings.Trim(field, "`") if field == COUNT_METRICS_NAME { return &Metrics{ @@ -129,7 +130,7 @@ func GetAggMetrics(field, db, table, orgID string, nativeField map[string]*Metri Table: table, }, true } - return GetMetrics(field, db, table, orgID, nativeField) + return GetMetrics(field, db, table, orgID, nativeField, customMetrics) } func GetTagTypeMetrics(tagDescriptions *common.Result, newAllMetrics map[string]*Metrics, db, table, orgID string) error { @@ -225,7 +226,7 @@ func GetTagTypeMetrics(tagDescriptions *common.Result, newAllMetrics map[string] return nil } -func GetMetrics(field, db, table, orgID string, nativeField map[string]*Metrics) (*Metrics, bool) { +func GetMetrics(field, db, table, orgID string, nativeField map[string]*Metrics, customMetrics map[string]*simplejson.Json) (*Metrics, bool) { newAllMetrics := map[string]*Metrics{} field = strings.Trim(field, "`") // flow_tag database has no metrics @@ -241,17 +242,38 @@ func GetMetrics(field, db, table, orgID string, nativeField map[string]*Metrics) ) return metric, true } + // dynamic metrics - if slices.Contains([]string{ckcommon.DB_NAME_DEEPFLOW_ADMIN, ckcommon.DB_NAME_DEEPFLOW_TENANT, ckcommon.DB_NAME_APPLICATION_LOG, ckcommon.DB_NAME_EXT_METRICS}, db) || slices.Contains([]string{ckcommon.TABLE_NAME_L7_FLOW_LOG, ckcommon.TABLE_NAME_EVENT, ckcommon.TABLE_NAME_FILE_EVENT}, table) { + if slices.Contains([]string{ckcommon.DB_NAME_DEEPFLOW_ADMIN, ckcommon.DB_NAME_DEEPFLOW_TENANT, ckcommon.DB_NAME_APPLICATION_LOG, ckcommon.DB_NAME_EXT_METRICS}, db) || slices.Contains([]string{ckcommon.TABLE_NAME_L7_FLOW_LOG}, table) { fieldSplit := strings.Split(field, ".") if len(fieldSplit) > 1 { if fieldSplit[0] == "metrics" { fieldName := strings.Replace(field, "metrics.", "", 1) + var mUnit, description string + displayName := field + mType := METRICS_TYPE_COUNTER + if customMetrics == nil && config.ControllerCfg.DFWebService.Enabled { + var fetchErr error + customMetrics, fetchErr = ckcommon.GetCustomMetrics(orgID) + if fetchErr != nil { + log.Error(fetchErr.Error()) + customMetrics = map[string]*simplejson.Json{} + } + } + if customMetrics != nil { + customMetric, ok := customMetrics[fmt.Sprintf("%s.%s.%s", db, table, field)] + if ok { + mType = customMetric.Get("TYPE").MustInt() + mUnit = customMetric.Get("UNIT").MustString() + displayName = customMetric.Get("DISPLAY_NAME").MustString() + description = customMetric.Get("DESCRIPTION").MustString() + } + } metrics_names_field, metrics_values_field := METRICS_ARRAY_NAME_MAP[db][0], METRICS_ARRAY_NAME_MAP[db][1] metric := NewMetrics( 0, fmt.Sprintf("if(indexOf(%s, '%s')=0,null,%s[indexOf(%s, '%s')])", metrics_names_field, fieldName, metrics_values_field, metrics_names_field, fieldName), - field, field, field, "", "", "", METRICS_TYPE_COUNTER, - ckcommon.NATIVE_FIELD_CATEGORY_METRICS, []bool{true, true, true}, "", table, "", "", "", "", "", + displayName, displayName, displayName, mUnit, mUnit, mUnit, mType, + ckcommon.NATIVE_FIELD_CATEGORY_METRICS, []bool{true, true, true}, "", table, description, description, description, "", "", ) return metric, true } else if fieldSplit[0] == "tag" { diff --git a/server/querier/engine/clickhouse/tag.go b/server/querier/engine/clickhouse/tag.go index 8e78b5ca9e7..bcf0d49bac6 100644 --- a/server/querier/engine/clickhouse/tag.go +++ b/server/querier/engine/clickhouse/tag.go @@ -303,7 +303,7 @@ func GetPrometheusAllTagTranslator(e *CHEngine) (string, string, error) { } func GetMetricsTag(name string, alias string, e *CHEngine) (Statement, error) { - metricStruct, ok := metrics.GetMetrics(strings.Trim(name, "`"), e.DB, e.Table, e.ORGID, e.NativeField) + metricStruct, ok := metrics.GetMetrics(strings.Trim(name, "`"), e.DB, e.Table, e.ORGID, e.NativeField, e.CustomMetrics) if !ok { return nil, nil }