Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 24 additions & 4 deletions server/querier/engine/clickhouse/clickhouse.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (
"time"

//"github.com/k0kubun/pp"
"github.com/bitly/go-simplejson"
logging "github.com/op/go-logging"
"github.com/xwb1989/sqlparser"

Expand Down Expand Up @@ -112,6 +113,7 @@ type CHEngine struct {
ORGID string
Language string
NativeField map[string]*metrics.Metrics
CustomMetrics map[string]*simplejson.Json
}

func init() {
Expand Down Expand Up @@ -1329,6 +1331,15 @@ func (e *CHEngine) TransFrom(froms sqlparser.TableExprs) error {
e.Table = table
// native field
if config.ControllerCfg.DFWebService.Enabled && (slices.Contains([]string{chCommon.DB_NAME_DEEPFLOW_ADMIN, chCommon.DB_NAME_DEEPFLOW_TENANT, chCommon.DB_NAME_APPLICATION_LOG, chCommon.DB_NAME_EXT_METRICS}, e.DB) || slices.Contains([]string{chCommon.TABLE_NAME_L7_FLOW_LOG, chCommon.TABLE_NAME_EVENT, chCommon.TABLE_NAME_FILE_EVENT}, e.Table)) {
// get custom-metrics
var err error
if e.CustomMetrics == nil {
e.CustomMetrics, err = chCommon.GetCustomMetrics(e.ORGID)
if err != nil {
log.Error(err.Error())
}
}
customMetrics := e.CustomMetrics
e.NativeField = map[string]*metrics.Metrics{}
getNativeUrl := fmt.Sprintf("http://localhost:%d/v1/native-fields/?db=%s&table_name=%s", config.ControllerCfg.ListenPort, e.DB, e.Table)
resp, err := ctlcommon.CURLPerform("GET", getNativeUrl, nil, ctlcommon.WithHeader(ctlcommon.HEADER_KEY_X_ORG_ID, e.ORGID))
Expand All @@ -1346,9 +1357,18 @@ func (e *CHEngine) TransFrom(froms sqlparser.TableExprs) error {
continue
}
if fieldType == chCommon.NATIVE_FIELD_TYPE_METRIC {
var mUnit string
mType := metrics.METRICS_TYPE_COUNTER
customMetric, ok := customMetrics[fmt.Sprintf("%s.%s.%s", e.DB, e.Table, nativeMetric)]
if ok {
mType = customMetric.Get("TYPE").MustInt()
mUnit = customMetric.Get("UNIT").MustString()
displayName = customMetric.Get("DISPLAY_NAME").MustString()
description = customMetric.Get("DESCRIPTION").MustString()
}
metric := metrics.NewMetrics(
0, nativeMetric,
displayName, displayName, displayName, "", "", "", metrics.METRICS_TYPE_COUNTER,
displayName, displayName, displayName, mUnit, mUnit, mUnit, mType,
chCommon.NATIVE_FIELD_CATEGORY_METRICS, []bool{true, true, true}, "", table, description, description, description, "", "",
)
e.NativeField[nativeMetric] = metric
Expand Down Expand Up @@ -1872,7 +1892,7 @@ func (e *CHEngine) parseSelectBinaryExpr(node sqlparser.Expr) (binary Function,
if fieldFunc != nil {
return fieldFunc, nil
}
metricStruct, ok := metrics.GetAggMetrics(field, e.DB, e.Table, e.ORGID, e.NativeField)
metricStruct, ok := metrics.GetAggMetrics(field, e.DB, e.Table, e.ORGID, e.NativeField, e.CustomMetrics)
if ok {
return &Field{Value: metricStruct.DBField}, nil
}
Expand Down Expand Up @@ -1985,7 +2005,7 @@ func (e *CHEngine) parseWhere(node sqlparser.Expr, w *Where, isCheck bool) (view
switch comparExpr.(type) {
case *sqlparser.ColName, *sqlparser.SQLVal:
whereTag := chCommon.ParseAlias(node.Left)
metricStruct, ok := metrics.GetMetrics(whereTag, e.DB, e.Table, e.ORGID, e.NativeField)
metricStruct, ok := metrics.GetMetrics(whereTag, e.DB, e.Table, e.ORGID, e.NativeField, e.CustomMetrics)
if ok && metricStruct.Type != metrics.METRICS_TYPE_TAG {
whereTag = metricStruct.DBField
}
Expand Down Expand Up @@ -2013,7 +2033,7 @@ func (e *CHEngine) parseWhere(node sqlparser.Expr, w *Where, isCheck bool) (view
return nil, errors.New(fmt.Sprintf("parse where error: %s(%T)", sqlparser.String(node), node))
}
whereTag := chCommon.ParseAlias(node.Expr)
metricStruct, ok := metrics.GetMetrics(whereTag, e.DB, e.Table, e.ORGID, e.NativeField)
metricStruct, ok := metrics.GetMetrics(whereTag, e.DB, e.Table, e.ORGID, e.NativeField, e.CustomMetrics)
if ok && metricStruct.Type != metrics.METRICS_TYPE_TAG {
whereTag = metricStruct.DBField
}
Expand Down
4 changes: 4 additions & 0 deletions server/querier/engine/clickhouse/common/const.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,10 @@ const (
NATIVE_FIELD_STATE_NORMAL = 1
)

const (
API_CUSTOM_METRICS_FORMAT = "http://localhost:%d/v1/custom-metrics"
)

var DB_TABLE_MAP = map[string][]string{
DB_NAME_FLOW_LOG: []string{"l4_flow_log", "l7_flow_log", "l4_packet", "l7_packet"},
DB_NAME_FLOW_METRICS: []string{"network", "network_map", "application", "application_map", "traffic_policy"},
Expand Down
27 changes: 26 additions & 1 deletion server/querier/engine/clickhouse/common/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,10 +28,13 @@ import (
"strconv"
"strings"

"github.com/bitly/go-simplejson"
"github.com/xwb1989/sqlparser"

ctlcommon "github.com/deepflowio/deepflow/server/controller/common"
"github.com/deepflowio/deepflow/server/querier/config"
"github.com/deepflowio/deepflow/server/querier/engine/clickhouse/client"
logging "github.com/op/go-logging"
"github.com/xwb1989/sqlparser"
)

var log = logging.MustGetLogger("common")
Expand Down Expand Up @@ -252,3 +255,25 @@ func GetExtTables(db, where, queryCacheTTL, orgID string, useQueryCache bool, ct
}
return values
}

func GetCustomMetrics(orgID string) (map[string]*simplejson.Json, error) {
result := make(map[string]*simplejson.Json)
url := fmt.Sprintf(API_CUSTOM_METRICS_FORMAT, config.ControllerCfg.ListenPort)
resp, err := ctlcommon.CURLPerform("GET", url, nil,
ctlcommon.WithHeader(ctlcommon.HEADER_KEY_X_ORG_ID, orgID),
ctlcommon.WithHeader(ctlcommon.HEADER_KEY_X_USER_ID, strconv.Itoa(ctlcommon.USER_ID_SUPER_ADMIN)),
ctlcommon.WithHeader(ctlcommon.HEADER_KEY_X_USER_TYPE, strconv.Itoa(ctlcommon.USER_TYPE_SUPER_ADMIN)),
)
if err != nil {
return result, fmt.Errorf("request controller url (%s) failed: %s", url, err.Error())
}
resultArray := resp.Get("DATA").MustArray()
for i := range resultArray {
item := resp.Get("DATA").GetIndex(i)
db := item.Get("DB").MustString()
table := item.Get("TABLE").MustString()
name := item.Get("NAME").MustString()
result[fmt.Sprintf("%s.%s.%s", db, table, name)] = item
}
return result, nil
}
6 changes: 3 additions & 3 deletions server/querier/engine/clickhouse/function.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@ func GetAggFunc(name string, args []string, alias string, derivativeArgs []strin
if !ok {
return nil, 0, "", nil
}
metricStruct, ok := metrics.GetAggMetrics(field, e.DB, e.Table, e.ORGID, e.NativeField)
metricStruct, ok := metrics.GetAggMetrics(field, e.DB, e.Table, e.ORGID, e.NativeField, e.CustomMetrics)
if !ok {
return nil, 0, "", nil
}
Expand Down Expand Up @@ -275,7 +275,7 @@ func GetTopKTrans(name string, args []string, alias string, e *CHEngine) (Statem
field = field[5 : len(field)-1]
isEnum = true
}
metricStruct, ok = metrics.GetAggMetrics(field, e.DB, e.Table, e.ORGID, e.NativeField)
metricStruct, ok = metrics.GetAggMetrics(field, e.DB, e.Table, e.ORGID, e.NativeField, e.CustomMetrics)
if !ok || metricStruct.Type == metrics.METRICS_TYPE_ARRAY {
return nil, 0, "", nil
}
Expand Down Expand Up @@ -351,7 +351,7 @@ func GetUniqTrans(name string, args []string, alias string, e *CHEngine) (Statem
var metricStruct *metrics.Metrics
for _, field := range fields {
field = strings.Trim(field, "`")
metricStruct, ok = metrics.GetAggMetrics(field, e.DB, e.Table, e.ORGID, e.NativeField)
metricStruct, ok = metrics.GetAggMetrics(field, e.DB, e.Table, e.ORGID, e.NativeField, e.CustomMetrics)
if !ok || metricStruct.Type == metrics.METRICS_TYPE_ARRAY {
return nil, 0, "", nil
}
Expand Down
49 changes: 37 additions & 12 deletions server/querier/engine/clickhouse/metrics/ext_common.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"slices"
"strings"

simplejson "github.com/bitly/go-simplejson"
ctlcommon "github.com/deepflowio/deepflow/server/controller/common"
"github.com/deepflowio/deepflow/server/querier/config"
"github.com/deepflowio/deepflow/server/querier/engine/clickhouse/client"
Expand All @@ -32,7 +33,7 @@ var EXT_METRICS = map[string]*Metrics{}

func GetExtMetrics(db, table, where, queryCacheTTL, orgID string, useQueryCache bool, ctx context.Context) (map[string]*Metrics, error) {
loadMetrics := make(map[string]*Metrics)
if slices.Contains([]string{common.DB_NAME_DEEPFLOW_ADMIN, common.DB_NAME_DEEPFLOW_TENANT, common.DB_NAME_APPLICATION_LOG, common.DB_NAME_EXT_METRICS}, db) || slices.Contains([]string{common.TABLE_NAME_L7_FLOW_LOG, common.TABLE_NAME_EVENT, common.TABLE_NAME_FILE_EVENT}, table) {
if slices.Contains([]string{common.DB_NAME_DEEPFLOW_ADMIN, common.DB_NAME_DEEPFLOW_TENANT, common.DB_NAME_APPLICATION_LOG, common.DB_NAME_EXT_METRICS}, db) || slices.Contains([]string{common.TABLE_NAME_L7_FLOW_LOG}, table) {
externalChClient := client.Client{
Host: config.Cfg.Clickhouse.Host,
Port: config.Cfg.Clickhouse.Port,
Expand All @@ -59,27 +60,42 @@ func GetExtMetrics(db, table, where, queryCacheTTL, orgID string, useQueryCache
log.Error(err)
return nil, err
}
var customMetrics map[string]*simplejson.Json
if config.ControllerCfg.DFWebService.Enabled {
customMetrics, err = common.GetCustomMetrics(orgID)
if err != nil {
log.Error(err.Error())
}
}
for i, value := range externalMetricFloatRst.Values {
tagName := value.([]interface{})[0]
tableName := value.([]interface{})[1].(string)
externalTag := tagName.(string)
metrics_names_field, metrics_values_field := METRICS_ARRAY_NAME_MAP[db][0], METRICS_ARRAY_NAME_MAP[db][1]
dbField := fmt.Sprintf("if(indexOf(%s, '%s')=0, null, %s[indexOf(%s, '%s')])", metrics_names_field, externalTag, metrics_values_field, metrics_names_field, externalTag)
var mUnit, description string
mType := METRICS_TYPE_COUNTER
metricName := fmt.Sprintf("metrics.%s", externalTag)
displayName := metricName
customMetric, ok := customMetrics[fmt.Sprintf("%s.%s.%s", db, tableName, metricName)]
if ok {
mType = customMetric.Get("TYPE").MustInt()
mUnit = customMetric.Get("UNIT").MustString()
displayName = customMetric.Get("DISPLAY_NAME").MustString()
description = customMetric.Get("DESCRIPTION").MustString()
}
lm := NewMetrics(
i, dbField, metricName, metricName, metricName, "", "", "", METRICS_TYPE_COUNTER,
common.NATIVE_FIELD_CATEGORY_METRICS, []bool{true, true, true}, "", tableName, "", "", "", "", "",
i, dbField, displayName, displayName, displayName, mUnit, mUnit, mUnit, mType,
common.NATIVE_FIELD_CATEGORY_METRICS, []bool{true, true, true}, "", tableName, description, description, description, "", "",
)
loadMetrics[fmt.Sprintf("%s-%s", metricName, tableName)] = lm
}
if !slices.Contains([]string{common.TABLE_NAME_EVENT, common.TABLE_NAME_FILE_EVENT}, table) {
lm := NewMetrics(
len(loadMetrics), "metrics",
"metrics", "metrics", "metrics", "", "", "", METRICS_TYPE_ARRAY,
common.NATIVE_FIELD_CATEGORY_METRICS, []bool{true, true, true}, "", table, "", "", "", "", "",
)
loadMetrics[fmt.Sprintf("%s-%s", "metrics", table)] = lm
}
lm := NewMetrics(
len(loadMetrics), "metrics",
"metrics", "metrics", "metrics", "", "", "", METRICS_TYPE_ARRAY,
common.NATIVE_FIELD_CATEGORY_METRICS, []bool{true, true, true}, "", table, "", "", "", "", "",
)
loadMetrics[fmt.Sprintf("%s-%s", "metrics", table)] = lm

// native metrics
if config.ControllerCfg.DFWebService.Enabled {
Expand All @@ -101,8 +117,17 @@ func GetExtMetrics(db, table, where, queryCacheTTL, orgID string, useQueryCache
if fieldType != common.NATIVE_FIELD_TYPE_METRIC {
continue
}
var mUnit string
mType := METRICS_TYPE_COUNTER
customMetric, ok := customMetrics[fmt.Sprintf("%s.%s.%s", db, table, nativeMetric)]
if ok {
mType = customMetric.Get("TYPE").MustInt()
mUnit = customMetric.Get("UNIT").MustString()
displayName = customMetric.Get("DISPLAY_NAME").MustString()
description = customMetric.Get("DESCRIPTION").MustString()
}
lm := NewMetrics(
len(loadMetrics), nativeMetric, displayName, displayName, displayName, "", "", "", METRICS_TYPE_COUNTER,
len(loadMetrics), nativeMetric, displayName, displayName, displayName, mUnit, mUnit, mUnit, mType,
common.NATIVE_FIELD_CATEGORY_METRICS, []bool{true, true, true}, "", table, description, description, description, "", "",
)
loadMetrics[fmt.Sprintf("%s-%s", nativeMetric, table)] = lm
Expand Down
34 changes: 28 additions & 6 deletions server/querier/engine/clickhouse/metrics/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"slices"
"strings"

simplejson "github.com/bitly/go-simplejson"
"github.com/deepflowio/deepflow/server/querier/common"
"github.com/deepflowio/deepflow/server/querier/config"
ckcommon "github.com/deepflowio/deepflow/server/querier/engine/clickhouse/common"
Expand Down Expand Up @@ -116,7 +117,7 @@ func NewReplaceMetrics(dbField string, condition string) *Metrics {
}
}

func GetAggMetrics(field, db, table, orgID string, nativeField map[string]*Metrics) (*Metrics, bool) {
func GetAggMetrics(field, db, table, orgID string, nativeField map[string]*Metrics, customMetrics map[string]*simplejson.Json) (*Metrics, bool) {
field = strings.Trim(field, "`")
if field == COUNT_METRICS_NAME {
return &Metrics{
Expand All @@ -129,7 +130,7 @@ func GetAggMetrics(field, db, table, orgID string, nativeField map[string]*Metri
Table: table,
}, true
}
return GetMetrics(field, db, table, orgID, nativeField)
return GetMetrics(field, db, table, orgID, nativeField, customMetrics)
}

func GetTagTypeMetrics(tagDescriptions *common.Result, newAllMetrics map[string]*Metrics, db, table, orgID string) error {
Expand Down Expand Up @@ -225,7 +226,7 @@ func GetTagTypeMetrics(tagDescriptions *common.Result, newAllMetrics map[string]
return nil
}

func GetMetrics(field, db, table, orgID string, nativeField map[string]*Metrics) (*Metrics, bool) {
func GetMetrics(field, db, table, orgID string, nativeField map[string]*Metrics, customMetrics map[string]*simplejson.Json) (*Metrics, bool) {
newAllMetrics := map[string]*Metrics{}
field = strings.Trim(field, "`")
// flow_tag database has no metrics
Expand All @@ -241,17 +242,38 @@ func GetMetrics(field, db, table, orgID string, nativeField map[string]*Metrics)
)
return metric, true
}

// dynamic metrics
if slices.Contains([]string{ckcommon.DB_NAME_DEEPFLOW_ADMIN, ckcommon.DB_NAME_DEEPFLOW_TENANT, ckcommon.DB_NAME_APPLICATION_LOG, ckcommon.DB_NAME_EXT_METRICS}, db) || slices.Contains([]string{ckcommon.TABLE_NAME_L7_FLOW_LOG, ckcommon.TABLE_NAME_EVENT, ckcommon.TABLE_NAME_FILE_EVENT}, table) {
if slices.Contains([]string{ckcommon.DB_NAME_DEEPFLOW_ADMIN, ckcommon.DB_NAME_DEEPFLOW_TENANT, ckcommon.DB_NAME_APPLICATION_LOG, ckcommon.DB_NAME_EXT_METRICS}, db) || slices.Contains([]string{ckcommon.TABLE_NAME_L7_FLOW_LOG}, table) {
fieldSplit := strings.Split(field, ".")
if len(fieldSplit) > 1 {
if fieldSplit[0] == "metrics" {
fieldName := strings.Replace(field, "metrics.", "", 1)
var mUnit, description string
displayName := field
mType := METRICS_TYPE_COUNTER
if customMetrics == nil && config.ControllerCfg.DFWebService.Enabled {
var fetchErr error
customMetrics, fetchErr = ckcommon.GetCustomMetrics(orgID)
if fetchErr != nil {
log.Error(fetchErr.Error())
customMetrics = map[string]*simplejson.Json{}
}
}
if customMetrics != nil {
customMetric, ok := customMetrics[fmt.Sprintf("%s.%s.%s", db, table, field)]
if ok {
mType = customMetric.Get("TYPE").MustInt()
mUnit = customMetric.Get("UNIT").MustString()
displayName = customMetric.Get("DISPLAY_NAME").MustString()
description = customMetric.Get("DESCRIPTION").MustString()
}
}
metrics_names_field, metrics_values_field := METRICS_ARRAY_NAME_MAP[db][0], METRICS_ARRAY_NAME_MAP[db][1]
metric := NewMetrics(
0, fmt.Sprintf("if(indexOf(%s, '%s')=0,null,%s[indexOf(%s, '%s')])", metrics_names_field, fieldName, metrics_values_field, metrics_names_field, fieldName),
field, field, field, "", "", "", METRICS_TYPE_COUNTER,
ckcommon.NATIVE_FIELD_CATEGORY_METRICS, []bool{true, true, true}, "", table, "", "", "", "", "",
displayName, displayName, displayName, mUnit, mUnit, mUnit, mType,
ckcommon.NATIVE_FIELD_CATEGORY_METRICS, []bool{true, true, true}, "", table, description, description, description, "", "",
)
return metric, true
} else if fieldSplit[0] == "tag" {
Expand Down
2 changes: 1 addition & 1 deletion server/querier/engine/clickhouse/tag.go
Original file line number Diff line number Diff line change
Expand Up @@ -303,7 +303,7 @@ func GetPrometheusAllTagTranslator(e *CHEngine) (string, string, error) {
}

func GetMetricsTag(name string, alias string, e *CHEngine) (Statement, error) {
metricStruct, ok := metrics.GetMetrics(strings.Trim(name, "`"), e.DB, e.Table, e.ORGID, e.NativeField)
metricStruct, ok := metrics.GetMetrics(strings.Trim(name, "`"), e.DB, e.Table, e.ORGID, e.NativeField, e.CustomMetrics)
if !ok {
return nil, nil
}
Expand Down
Loading