From a4000a0e1a0044611f64cc8f32526e42902448ae Mon Sep 17 00:00:00 2001 From: Alex Luong Date: Sat, 4 Apr 2026 00:23:26 +0700 Subject: [PATCH 1/6] feat: add destination_type column to attempts table (schema + logstore) Add destination_type to the Attempt model and persist it across all three logstore drivers (PostgreSQL, ClickHouse, in-memory). Includes schema migrations with indexes and filtering support in ListAttemptRequest. Co-Authored-By: Claude Opus 4.6 (1M context) --- internal/destregistry/registry.go | 7 +- internal/logstore/chlogstore/chlogstore.go | 56 ++++++++++------ internal/logstore/driver/driver.go | 21 +++--- internal/logstore/memlogstore/memlogstore.go | 23 ++++--- internal/logstore/pglogstore/pglogstore.go | 65 ++++++++++++------- .../000003_attempt_destination_type.down.sql | 2 + .../000003_attempt_destination_type.up.sql | 2 + .../000008_attempt_destination_type.down.sql | 2 + .../000008_attempt_destination_type.up.sql | 2 + internal/models/entities.go | 21 +++--- 10 files changed, 124 insertions(+), 77 deletions(-) create mode 100644 internal/migrator/migrations/clickhouse/000003_attempt_destination_type.down.sql create mode 100644 internal/migrator/migrations/clickhouse/000003_attempt_destination_type.up.sql create mode 100644 internal/migrator/migrations/postgres/000008_attempt_destination_type.down.sql create mode 100644 internal/migrator/migrations/postgres/000008_attempt_destination_type.up.sql diff --git a/internal/destregistry/registry.go b/internal/destregistry/registry.go index b11763f49..dc107ab06 100644 --- a/internal/destregistry/registry.go +++ b/internal/destregistry/registry.go @@ -142,9 +142,10 @@ func (r *registry) PublishEvent(ctx context.Context, destination *models.Destina } attempt := &models.Attempt{ - ID: idgen.Attempt(), - DestinationID: destination.ID, - EventID: event.ID, + ID: idgen.Attempt(), + DestinationID: destination.ID, + DestinationType: destination.Type, + EventID: event.ID, } // Create a new context with timeout diff --git a/internal/logstore/chlogstore/chlogstore.go b/internal/logstore/chlogstore/chlogstore.go index 60872769f..74aa92845 100644 --- a/internal/logstore/chlogstore/chlogstore.go +++ b/internal/logstore/chlogstore/chlogstore.go @@ -337,6 +337,11 @@ func buildAttemptQuery(table string, req driver.ListAttemptRequest, q pagination args = append(args, req.DestinationIDs) } + if len(req.DestinationTypes) > 0 { + conditions = append(conditions, "destination_type IN ?") + args = append(args, req.DestinationTypes) + } + if req.Status != "" { conditions = append(conditions, "status = ?") args = append(args, req.Status) @@ -383,6 +388,7 @@ func buildAttemptQuery(table string, req driver.ListAttemptRequest, q pagination event_id, tenant_id, destination_id, + destination_type, topic, eligible_for_retry, event_time, @@ -411,6 +417,7 @@ func scanAttemptRecords(rows clickhouse.Rows) ([]attemptRecordWithPosition, erro eventID string tenantID string destinationID string + destinationType string topic string eligibleForRetry bool eventTime time.Time @@ -429,6 +436,7 @@ func scanAttemptRecords(rows clickhouse.Rows) ([]attemptRecordWithPosition, erro &eventID, &tenantID, &destinationID, + &destinationType, &topic, &eligibleForRetry, &eventTime, @@ -463,16 +471,17 @@ func scanAttemptRecords(rows clickhouse.Rows) ([]attemptRecordWithPosition, erro results = append(results, attemptRecordWithPosition{ AttemptRecord: &driver.AttemptRecord{ Attempt: &models.Attempt{ - ID: attemptID, - TenantID: tenantID, - EventID: eventID, - DestinationID: destinationID, - AttemptNumber: int(attemptNumber), - Manual: manual, - Status: status, - Time: attemptTime, - Code: code, - ResponseData: responseData, + ID: attemptID, + TenantID: tenantID, + EventID: eventID, + DestinationID: destinationID, + DestinationType: destinationType, + AttemptNumber: int(attemptNumber), + Manual: manual, + Status: status, + Time: attemptTime, + Code: code, + ResponseData: responseData, }, Event: &models.Event{ ID: eventID, @@ -582,6 +591,7 @@ func (s *logStoreImpl) RetrieveAttempt(ctx context.Context, req driver.RetrieveA event_id, tenant_id, destination_id, + destination_type, topic, eligible_for_retry, event_time, @@ -604,6 +614,7 @@ func (s *logStoreImpl) RetrieveAttempt(ctx context.Context, req driver.RetrieveA eventID string tenantID string destinationID string + destinationType string topic string eligibleForRetry bool eventTime time.Time @@ -622,6 +633,7 @@ func (s *logStoreImpl) RetrieveAttempt(ctx context.Context, req driver.RetrieveA &eventID, &tenantID, &destinationID, + &destinationType, &topic, &eligibleForRetry, &eventTime, @@ -658,16 +670,17 @@ func (s *logStoreImpl) RetrieveAttempt(ctx context.Context, req driver.RetrieveA return &driver.AttemptRecord{ Attempt: &models.Attempt{ - ID: attemptID, - TenantID: tenantID, - EventID: eventID, - DestinationID: destinationID, - AttemptNumber: int(attemptNumber), - Manual: manual, - Status: status, - Time: attemptTime, - Code: code, - ResponseData: responseData, + ID: attemptID, + TenantID: tenantID, + EventID: eventID, + DestinationID: destinationID, + DestinationType: destinationType, + AttemptNumber: int(attemptNumber), + Manual: manual, + Status: status, + Time: attemptTime, + Code: code, + ResponseData: responseData, }, Event: &models.Event{ ID: eventID, @@ -734,7 +747,7 @@ func (s *logStoreImpl) InsertMany(ctx context.Context, entries []*models.LogEntr // Insert attempts with their paired event data attemptBatch, err := s.chDB.PrepareBatch(ctx, fmt.Sprintf(`INSERT INTO %s ( - event_id, tenant_id, destination_id, topic, eligible_for_retry, event_time, metadata, data, + event_id, tenant_id, destination_id, destination_type, topic, eligible_for_retry, event_time, metadata, data, attempt_id, status, attempt_time, code, response_data, manual, attempt_number )`, s.attemptsTable), ) @@ -759,6 +772,7 @@ func (s *logStoreImpl) InsertMany(ctx context.Context, entries []*models.LogEntr a.EventID, event.TenantID, a.DestinationID, + a.DestinationType, event.Topic, event.EligibleForRetry, event.Time, diff --git a/internal/logstore/driver/driver.go b/internal/logstore/driver/driver.go index d1b530ab6..fae1db1f7 100644 --- a/internal/logstore/driver/driver.go +++ b/internal/logstore/driver/driver.go @@ -49,16 +49,17 @@ type ListEventResponse struct { } type ListAttemptRequest struct { - Next string - Prev string - Limit int - TimeFilter TimeFilter // optional - filter attempts by time - TenantIDs []string // optional - filter by tenant (if empty, returns all tenants) - EventIDs []string // optional - filter by event ID - DestinationIDs []string // optional - Status string // optional: "success", "failed" - Topics []string // optional - SortOrder string // optional: "asc", "desc" (default: "desc") + Next string + Prev string + Limit int + TimeFilter TimeFilter // optional - filter attempts by time + TenantIDs []string // optional - filter by tenant (if empty, returns all tenants) + EventIDs []string // optional - filter by event ID + DestinationIDs []string // optional + DestinationTypes []string // optional - filter by destination type + Status string // optional: "success", "failed" + Topics []string // optional + SortOrder string // optional: "asc", "desc" (default: "desc") } type ListAttemptResponse struct { diff --git a/internal/logstore/memlogstore/memlogstore.go b/internal/logstore/memlogstore/memlogstore.go index 2f4382b75..6d8dc641c 100644 --- a/internal/logstore/memlogstore/memlogstore.go +++ b/internal/logstore/memlogstore/memlogstore.go @@ -392,6 +392,10 @@ func (s *memLogStore) matchesAttemptFilter(a *models.Attempt, event *models.Even } } + if len(req.DestinationTypes) > 0 && !slices.Contains(req.DestinationTypes, a.DestinationType) { + return false + } + if req.Status != "" && a.Status != req.Status { return false } @@ -460,15 +464,16 @@ func copyAttempt(a *models.Attempt) *models.Attempt { return nil } copied := &models.Attempt{ - ID: a.ID, - TenantID: a.TenantID, - EventID: a.EventID, - DestinationID: a.DestinationID, - AttemptNumber: a.AttemptNumber, - Manual: a.Manual, - Status: a.Status, - Time: a.Time, - Code: a.Code, + ID: a.ID, + TenantID: a.TenantID, + EventID: a.EventID, + DestinationID: a.DestinationID, + DestinationType: a.DestinationType, + AttemptNumber: a.AttemptNumber, + Manual: a.Manual, + Status: a.Status, + Time: a.Time, + Code: a.Code, } if a.ResponseData != nil { diff --git a/internal/logstore/pglogstore/pglogstore.go b/internal/logstore/pglogstore/pglogstore.go index de9e1b4d8..69112124e 100644 --- a/internal/logstore/pglogstore/pglogstore.go +++ b/internal/logstore/pglogstore/pglogstore.go @@ -338,6 +338,12 @@ func buildAttemptQuery(req driver.ListAttemptRequest, q pagination.QueryInput) ( argNum++ } + if len(req.DestinationTypes) > 0 { + conditions = append(conditions, fmt.Sprintf("destination_type = ANY($%d)", argNum)) + args = append(args, req.DestinationTypes) + argNum++ + } + if req.Status != "" { conditions = append(conditions, fmt.Sprintf("status = $%d", argNum)) args = append(args, req.Status) @@ -392,6 +398,7 @@ func buildAttemptQuery(req driver.ListAttemptRequest, q pagination.QueryInput) ( event_id, tenant_id, destination_id, + destination_type, topic, status, time, @@ -442,6 +449,7 @@ func scanAttemptRecords(rows pgx.Rows) ([]attemptRecordWithPosition, error) { eventID string tenantID string destinationID string + destinationType string topic string status string attemptTime time.Time @@ -460,6 +468,7 @@ func scanAttemptRecords(rows pgx.Rows) ([]attemptRecordWithPosition, error) { &eventID, &tenantID, &destinationID, + &destinationType, &topic, &status, &attemptTime, @@ -482,16 +491,17 @@ func scanAttemptRecords(rows pgx.Rows) ([]attemptRecordWithPosition, error) { results = append(results, attemptRecordWithPosition{ AttemptRecord: &driver.AttemptRecord{ Attempt: &models.Attempt{ - ID: id, - TenantID: tenantID, - EventID: eventID, - DestinationID: destinationID, - AttemptNumber: attemptNumber, - Manual: manual, - Status: status, - Time: attemptTime, - Code: code, - ResponseData: responseData, + ID: id, + TenantID: tenantID, + EventID: eventID, + DestinationID: destinationID, + DestinationType: destinationType, + AttemptNumber: attemptNumber, + Manual: manual, + Status: status, + Time: attemptTime, + Code: code, + ResponseData: responseData, }, Event: &models.Event{ ID: eventID, @@ -596,6 +606,7 @@ func (s *logStore) RetrieveAttempt(ctx context.Context, req driver.RetrieveAttem event_id, tenant_id, destination_id, + destination_type, topic, status, time, @@ -618,6 +629,7 @@ func (s *logStore) RetrieveAttempt(ctx context.Context, req driver.RetrieveAttem eventID string tenantID string destinationID string + destinationType string topic string status string attemptTime time.Time @@ -636,6 +648,7 @@ func (s *logStore) RetrieveAttempt(ctx context.Context, req driver.RetrieveAttem &eventID, &tenantID, &destinationID, + &destinationType, &topic, &status, &attemptTime, @@ -661,16 +674,17 @@ func (s *logStore) RetrieveAttempt(ctx context.Context, req driver.RetrieveAttem return &driver.AttemptRecord{ Attempt: &models.Attempt{ - ID: id, - TenantID: tenantID, - EventID: eventID, - DestinationID: destinationID, - AttemptNumber: attemptNumber, - Manual: manual, - Status: status, - Time: attemptTime, - Code: code, - ResponseData: responseData, + ID: id, + TenantID: tenantID, + EventID: eventID, + DestinationID: destinationID, + DestinationType: destinationType, + AttemptNumber: attemptNumber, + Manual: manual, + Status: status, + Time: attemptTime, + Code: code, + ResponseData: responseData, }, Event: &models.Event{ ID: eventID, @@ -731,14 +745,14 @@ func (s *logStore) InsertMany(ctx context.Context, entries []*models.LogEntry) e if len(entries) > 0 { _, err = tx.Exec(ctx, ` INSERT INTO attempts ( - id, event_id, tenant_id, destination_id, topic, status, + id, event_id, tenant_id, destination_id, destination_type, topic, status, time, attempt_number, manual, code, response_data, event_time, eligible_for_retry, event_data, event_metadata ) SELECT * FROM unnest( - $1::text[], $2::text[], $3::text[], $4::text[], $5::text[], $6::text[], - $7::timestamptz[], $8::integer[], $9::boolean[], $10::text[], $11::jsonb[], - $12::timestamptz[], $13::boolean[], $14::text[], $15::jsonb[] + $1::text[], $2::text[], $3::text[], $4::text[], $5::text[], $6::text[], $7::text[], + $8::timestamptz[], $9::integer[], $10::boolean[], $11::text[], $12::jsonb[], + $13::timestamptz[], $14::boolean[], $15::text[], $16::jsonb[] ) ON CONFLICT (time, id) DO UPDATE SET status = EXCLUDED.status, @@ -799,6 +813,7 @@ func attemptArrays(entries []*models.LogEntry) []any { eventIDs := make([]string, n) tenantIDs := make([]string, n) destinationIDs := make([]string, n) + destinationTypes := make([]string, n) topics := make([]string, n) statuses := make([]string, n) times := make([]time.Time, n) @@ -819,6 +834,7 @@ func attemptArrays(entries []*models.LogEntry) []any { eventIDs[i] = a.EventID tenantIDs[i] = e.TenantID destinationIDs[i] = a.DestinationID + destinationTypes[i] = a.DestinationType topics[i] = e.Topic statuses[i] = a.Status times[i] = a.Time @@ -837,6 +853,7 @@ func attemptArrays(entries []*models.LogEntry) []any { eventIDs, tenantIDs, destinationIDs, + destinationTypes, topics, statuses, times, diff --git a/internal/migrator/migrations/clickhouse/000003_attempt_destination_type.down.sql b/internal/migrator/migrations/clickhouse/000003_attempt_destination_type.down.sql new file mode 100644 index 000000000..cac34b99e --- /dev/null +++ b/internal/migrator/migrations/clickhouse/000003_attempt_destination_type.down.sql @@ -0,0 +1,2 @@ +ALTER TABLE {deployment_prefix}attempts DROP INDEX IF EXISTS idx_destination_type; +ALTER TABLE {deployment_prefix}attempts DROP COLUMN IF EXISTS destination_type; diff --git a/internal/migrator/migrations/clickhouse/000003_attempt_destination_type.up.sql b/internal/migrator/migrations/clickhouse/000003_attempt_destination_type.up.sql new file mode 100644 index 000000000..3e4874116 --- /dev/null +++ b/internal/migrator/migrations/clickhouse/000003_attempt_destination_type.up.sql @@ -0,0 +1,2 @@ +ALTER TABLE {deployment_prefix}attempts ADD COLUMN destination_type String DEFAULT ''; +ALTER TABLE {deployment_prefix}attempts ADD INDEX idx_destination_type destination_type TYPE bloom_filter GRANULARITY 1; diff --git a/internal/migrator/migrations/postgres/000008_attempt_destination_type.down.sql b/internal/migrator/migrations/postgres/000008_attempt_destination_type.down.sql new file mode 100644 index 000000000..a3bc96213 --- /dev/null +++ b/internal/migrator/migrations/postgres/000008_attempt_destination_type.down.sql @@ -0,0 +1,2 @@ +DROP INDEX IF EXISTS idx_attempts_tenant_desttype_time; +ALTER TABLE attempts DROP COLUMN IF EXISTS destination_type; diff --git a/internal/migrator/migrations/postgres/000008_attempt_destination_type.up.sql b/internal/migrator/migrations/postgres/000008_attempt_destination_type.up.sql new file mode 100644 index 000000000..4d5fd9ae0 --- /dev/null +++ b/internal/migrator/migrations/postgres/000008_attempt_destination_type.up.sql @@ -0,0 +1,2 @@ +ALTER TABLE attempts ADD COLUMN destination_type text NOT NULL DEFAULT ''; +CREATE INDEX idx_attempts_tenant_desttype_time ON attempts (tenant_id, destination_type, time DESC, id DESC); diff --git a/internal/models/entities.go b/internal/models/entities.go index d81c53131..e577d868d 100644 --- a/internal/models/entities.go +++ b/internal/models/entities.go @@ -126,16 +126,17 @@ const ( ) type Attempt struct { - ID string `json:"id"` - TenantID string `json:"tenant_id"` - EventID string `json:"event_id"` - DestinationID string `json:"destination_id"` - AttemptNumber int `json:"attempt_number"` - Manual bool `json:"manual"` - Status string `json:"status"` - Time time.Time `json:"time"` - Code string `json:"code"` - ResponseData map[string]interface{} `json:"response_data"` + ID string `json:"id"` + TenantID string `json:"tenant_id"` + EventID string `json:"event_id"` + DestinationID string `json:"destination_id"` + DestinationType string `json:"destination_type"` + AttemptNumber int `json:"attempt_number"` + Manual bool `json:"manual"` + Status string `json:"status"` + Time time.Time `json:"time"` + Code string `json:"code"` + ResponseData map[string]interface{} `json:"response_data"` } // ============================== Types ============================== From f06640525f4ded089e4b753cd769ee01e939b816 Mon Sep 17 00:00:00 2001 From: Alex Luong Date: Sat, 4 Apr 2026 00:23:34 +0700 Subject: [PATCH 2/6] feat: expose destination_type in attempt API response and filtering Add destination_type to APIAttempt response and support destination_type[] query param for filtering in ListAttempts. Co-Authored-By: Claude Opus 4.6 (1M context) --- internal/apirouter/log_handlers.go | 47 ++++++++++++++++-------------- 1 file changed, 25 insertions(+), 22 deletions(-) diff --git a/internal/apirouter/log_handlers.go b/internal/apirouter/log_handlers.go index 984bde9d4..a15e08fca 100644 --- a/internal/apirouter/log_handlers.go +++ b/internal/apirouter/log_handlers.go @@ -81,14 +81,15 @@ func parseIncludeOptions(c *gin.Context) IncludeOptions { // APIAttempt is the API response for an attempt type APIAttempt struct { - ID string `json:"id"` - TenantID string `json:"tenant_id"` - Status string `json:"status"` - Time time.Time `json:"time"` - Code string `json:"code,omitempty"` - ResponseData map[string]interface{} `json:"response_data,omitempty"` - AttemptNumber int `json:"attempt_number"` - Manual bool `json:"manual"` + ID string `json:"id"` + TenantID string `json:"tenant_id"` + Status string `json:"status"` + Time time.Time `json:"time"` + Code string `json:"code,omitempty"` + ResponseData map[string]interface{} `json:"response_data,omitempty"` + AttemptNumber int `json:"attempt_number"` + Manual bool `json:"manual"` + DestinationType string `json:"destination_type"` EventID string `json:"event_id"` DestinationID string `json:"destination_id"` @@ -148,15 +149,16 @@ type EventPaginatedResult struct { // destination field is populated. func toAPIAttempt(ar *logstore.AttemptRecord, opts IncludeOptions, destDisplay *destregistry.DestinationDisplay) APIAttempt { api := APIAttempt{ - ID: ar.Attempt.ID, - TenantID: ar.Attempt.TenantID, - Status: ar.Attempt.Status, - Time: ar.Attempt.Time, - Code: ar.Attempt.Code, - AttemptNumber: ar.Attempt.AttemptNumber, - Manual: ar.Attempt.Manual, - EventID: ar.Attempt.EventID, - DestinationID: ar.Attempt.DestinationID, + ID: ar.Attempt.ID, + TenantID: ar.Attempt.TenantID, + Status: ar.Attempt.Status, + Time: ar.Attempt.Time, + Code: ar.Attempt.Code, + AttemptNumber: ar.Attempt.AttemptNumber, + Manual: ar.Attempt.Manual, + DestinationType: ar.Attempt.DestinationType, + EventID: ar.Attempt.EventID, + DestinationID: ar.Attempt.DestinationID, } if opts.ResponseData { @@ -261,11 +263,12 @@ func (h *LogHandlers) listAttemptsInternal(c *gin.Context, tenantIDs []string, d } req := logstore.ListAttemptRequest{ - TenantIDs: tenantIDs, - EventIDs: ParseArrayQueryParam(c, "event_id"), - DestinationIDs: destinationIDs, - Status: c.Query("status"), - Topics: ParseArrayQueryParam(c, "topic"), + TenantIDs: tenantIDs, + EventIDs: ParseArrayQueryParam(c, "event_id"), + DestinationIDs: destinationIDs, + DestinationTypes: ParseArrayQueryParam(c, "destination_type"), + Status: c.Query("status"), + Topics: ParseArrayQueryParam(c, "topic"), TimeFilter: logstore.TimeFilter{ GTE: attemptTimeFilter.GTE, LTE: attemptTimeFilter.LTE, From 929f48e88d6ff723219290c5071f0f361c9904d9 Mon Sep 17 00:00:00 2001 From: Alex Luong Date: Sat, 4 Apr 2026 00:23:42 +0700 Subject: [PATCH 3/6] feat: support destination_type as attempt metrics dimension Add destination_type as a groupable dimension and filterable field in attempt metrics queries across all three logstore drivers and the API. Co-Authored-By: Claude Opus 4.6 (1M context) --- internal/apirouter/metrics_handlers.go | 6 ++++-- internal/logstore/bucket/dimkey.go | 2 ++ internal/logstore/chlogstore/metrics.go | 14 ++++++++++++++ internal/logstore/driver/metrics.go | 15 ++++++++------- internal/logstore/memlogstore/metrics.go | 11 +++++++++++ internal/logstore/pglogstore/metrics.go | 14 ++++++++++++++ 6 files changed, 53 insertions(+), 9 deletions(-) diff --git a/internal/apirouter/metrics_handlers.go b/internal/apirouter/metrics_handlers.go index e14d10d44..cc0f4bcaf 100644 --- a/internal/apirouter/metrics_handlers.go +++ b/internal/apirouter/metrics_handlers.go @@ -56,8 +56,8 @@ var ( eventFilters = newStringSet("tenant_id", "topic", "destination_id") attemptMeasures = newStringSet("count", "successful_count", "failed_count", "error_rate", "first_attempt_count", "retry_count", "manual_retry_count", "avg_attempt_number", "rate", "successful_rate", "failed_rate") - attemptDimensions = newStringSet("tenant_id", "destination_id", "topic", "status", "code", "manual", "attempt_number") - attemptFilters = newStringSet("tenant_id", "destination_id", "topic", "status", "code", "manual", "attempt_number") + attemptDimensions = newStringSet("tenant_id", "destination_id", "destination_type", "topic", "status", "code", "manual", "attempt_number") + attemptFilters = newStringSet("tenant_id", "destination_id", "destination_type", "topic", "status", "code", "manual", "attempt_number") ) // --- API response types --- @@ -396,6 +396,8 @@ func attemptDataPointToAPI(dp logstore.AttemptMetricsDataPoint, measures, dimens dims["tenant_id"] = derefString(dp.TenantID) case "destination_id": dims["destination_id"] = derefString(dp.DestinationID) + case "destination_type": + dims["destination_type"] = derefString(dp.DestinationType) case "topic": dims["topic"] = derefString(dp.Topic) case "status": diff --git a/internal/logstore/bucket/dimkey.go b/internal/logstore/bucket/dimkey.go index 23c49dcd2..8a7c4b5fe 100644 --- a/internal/logstore/bucket/dimkey.go +++ b/internal/logstore/bucket/dimkey.go @@ -35,6 +35,8 @@ func AttemptDimKey(dp *driver.AttemptMetricsDataPoint, dims []string) DimKey { parts[i] = derefStr(dp.TenantID) case "destination_id": parts[i] = derefStr(dp.DestinationID) + case "destination_type": + parts[i] = derefStr(dp.DestinationType) case "topic": parts[i] = derefStr(dp.Topic) case "status": diff --git a/internal/logstore/chlogstore/metrics.go b/internal/logstore/chlogstore/metrics.go index a1667c8fd..8b12394e3 100644 --- a/internal/logstore/chlogstore/metrics.go +++ b/internal/logstore/chlogstore/metrics.go @@ -286,6 +286,7 @@ func (s *logStoreImpl) QueryAttemptMetrics(ctx context.Context, req driver.Metri sfTimeBucket sf = iota sfTenantID sfDestID + sfDestType sfTopic sfStatus sfCode @@ -321,6 +322,10 @@ func (s *logStoreImpl) QueryAttemptMetrics(ctx context.Context, req driver.Metri selectExprs = append(selectExprs, "destination_id") groupExprs = append(groupExprs, "destination_id") order = append(order, sfDestID) + case "destination_type": + selectExprs = append(selectExprs, "destination_type") + groupExprs = append(groupExprs, "destination_type") + order = append(order, sfDestType) case "topic": selectExprs = append(selectExprs, "topic") groupExprs = append(groupExprs, "topic") @@ -392,6 +397,9 @@ func (s *logStoreImpl) QueryAttemptMetrics(ctx context.Context, req driver.Metri if dests, ok := req.Filters["destination_id"]; ok { conditions, args = addInFilter(conditions, args, "destination_id", dests) } + if destTypes, ok := req.Filters["destination_type"]; ok { + conditions, args = addInFilter(conditions, args, "destination_type", destTypes) + } if topics, ok := req.Filters["topic"]; ok { conditions, args = addInFilter(conditions, args, "topic", topics) } @@ -430,6 +438,7 @@ func (s *logStoreImpl) QueryAttemptMetrics(ctx context.Context, req driver.Metri tbVal time.Time tenantIDVal string destIDVal string + destTypeVal string topicVal string statusVal string codeVal string @@ -454,6 +463,8 @@ func (s *logStoreImpl) QueryAttemptMetrics(ctx context.Context, req driver.Metri scanDests[i] = &tenantIDVal case sfDestID: scanDests[i] = &destIDVal + case sfDestType: + scanDests[i] = &destTypeVal case sfTopic: scanDests[i] = &topicVal case sfStatus: @@ -501,6 +512,9 @@ func (s *logStoreImpl) QueryAttemptMetrics(ctx context.Context, req driver.Metri case sfDestID: v := destIDVal dp.DestinationID = &v + case sfDestType: + v := destTypeVal + dp.DestinationType = &v case sfTopic: v := topicVal dp.Topic = &v diff --git a/internal/logstore/driver/metrics.go b/internal/logstore/driver/metrics.go index a58ccba83..363bd2f58 100644 --- a/internal/logstore/driver/metrics.go +++ b/internal/logstore/driver/metrics.go @@ -102,13 +102,14 @@ type AttemptMetricsDataPoint struct { SuccessfulRate *float64 FailedRate *float64 // Dimensions - TenantID *string - DestinationID *string - Topic *string - Status *string - Code *string - Manual *bool - AttemptNumber *int + TenantID *string + DestinationID *string + DestinationType *string + Topic *string + Status *string + Code *string + Manual *bool + AttemptNumber *int } type AttemptMetricsResponse struct { diff --git a/internal/logstore/memlogstore/metrics.go b/internal/logstore/memlogstore/metrics.go index 431a703bb..433a8df87 100644 --- a/internal/logstore/memlogstore/metrics.go +++ b/internal/logstore/memlogstore/metrics.go @@ -156,6 +156,7 @@ func (s *memLogStore) QueryAttemptMetrics(ctx context.Context, req driver.Metric timeBucket string tenantID string destID string + destType string topic string status string code string @@ -176,6 +177,8 @@ func (s *memLogStore) QueryAttemptMetrics(ctx context.Context, req driver.Metric key.tenantID = ae.event.TenantID case "destination_id": key.destID = ae.attempt.DestinationID + case "destination_type": + key.destType = ae.attempt.DestinationType case "topic": key.topic = ae.event.Topic case "status": @@ -214,6 +217,9 @@ func (s *memLogStore) QueryAttemptMetrics(ctx context.Context, req driver.Metric case "destination_id": v := key.destID dp.DestinationID = &v + case "destination_type": + v := key.destType + dp.DestinationType = &v case "topic": v := key.topic dp.Topic = &v @@ -362,6 +368,11 @@ func matchesAttemptMetricsFilter(a *models.Attempt, event *models.Event, req dri return false } } + if destTypes, ok := req.Filters["destination_type"]; ok { + if !contains(destTypes, a.DestinationType) { + return false + } + } if topics, ok := req.Filters["topic"]; ok { if !contains(topics, event.Topic) { return false diff --git a/internal/logstore/pglogstore/metrics.go b/internal/logstore/pglogstore/metrics.go index 10ec0e70d..a1f524ca7 100644 --- a/internal/logstore/pglogstore/metrics.go +++ b/internal/logstore/pglogstore/metrics.go @@ -289,6 +289,7 @@ func (s *logStore) QueryAttemptMetrics(ctx context.Context, req driver.MetricsRe sfTimeBucket sf = iota sfTenantID sfDestID + sfDestType sfTopic sfStatus sfCode @@ -324,6 +325,10 @@ func (s *logStore) QueryAttemptMetrics(ctx context.Context, req driver.MetricsRe selectExprs = append(selectExprs, "destination_id") groupExprs = append(groupExprs, "destination_id") order = append(order, sfDestID) + case "destination_type": + selectExprs = append(selectExprs, "destination_type") + groupExprs = append(groupExprs, "destination_type") + order = append(order, sfDestType) case "topic": selectExprs = append(selectExprs, "topic") groupExprs = append(groupExprs, "topic") @@ -390,6 +395,9 @@ func (s *logStore) QueryAttemptMetrics(ctx context.Context, req driver.MetricsRe if dests, ok := req.Filters["destination_id"]; ok { conditions = append(conditions, "destination_id = ANY("+arg(dests)+")") } + if destTypes, ok := req.Filters["destination_type"]; ok { + conditions = append(conditions, "destination_type = ANY("+arg(destTypes)+")") + } if topics, ok := req.Filters["topic"]; ok { conditions = append(conditions, "topic = ANY("+arg(topics)+")") } @@ -425,6 +433,7 @@ func (s *logStore) QueryAttemptMetrics(ctx context.Context, req driver.MetricsRe tbVal time.Time tenantIDVal string destIDVal string + destTypeVal string topicVal string statusVal string codeVal string @@ -449,6 +458,8 @@ func (s *logStore) QueryAttemptMetrics(ctx context.Context, req driver.MetricsRe scanDests[i] = &tenantIDVal case sfDestID: scanDests[i] = &destIDVal + case sfDestType: + scanDests[i] = &destTypeVal case sfTopic: scanDests[i] = &topicVal case sfStatus: @@ -496,6 +507,9 @@ func (s *logStore) QueryAttemptMetrics(ctx context.Context, req driver.MetricsRe case sfDestID: v := destIDVal dp.DestinationID = &v + case sfDestType: + v := destTypeVal + dp.DestinationType = &v case sfTopic: v := topicVal dp.Topic = &v From b0e6c6c9329785e88f37d17fc6332f5d4a76f465 Mon Sep 17 00:00:00 2001 From: Alex Luong Date: Sat, 4 Apr 2026 00:43:53 +0700 Subject: [PATCH 4/6] test: add integration tests for destination_type filter and metrics dimension Add WithDestinationType factory option and tests covering: - ListAttempts destination_type[] filter - destination_type in attempt API response - destination_type as metrics dimension and filter Co-Authored-By: Claude Opus 4.6 (1M context) --- internal/apirouter/log_handlers_test.go | 43 ++++++++++++++++++ internal/apirouter/metrics_handlers_test.go | 49 +++++++++++++++++++++ internal/util/testutil/event.go | 6 +++ 3 files changed, 98 insertions(+) diff --git a/internal/apirouter/log_handlers_test.go b/internal/apirouter/log_handlers_test.go index 7611d6351..bb6ad63ff 100644 --- a/internal/apirouter/log_handlers_test.go +++ b/internal/apirouter/log_handlers_test.go @@ -935,6 +935,49 @@ func TestAPI_Attempts(t *testing.T) { }) }) + t.Run("destination_type filter and response", func(t *testing.T) { + h := newAPITest(t) + + e1 := ef.AnyPointer(ef.WithID("e1"), ef.WithTenantID("t1")) + e2 := ef.AnyPointer(ef.WithID("e2"), ef.WithTenantID("t1")) + a1 := attemptForEvent(e1, af.WithID("a1"), af.WithDestinationType("webhook")) + a2 := attemptForEvent(e2, af.WithID("a2"), af.WithDestinationType("sqs")) + require.NoError(t, h.logStore.InsertMany(t.Context(), []*models.LogEntry{ + {Event: e1, Attempt: a1}, + {Event: e2, Attempt: a2}, + })) + + t.Run("filter by destination_type", func(t *testing.T) { + req := httptest.NewRequest(http.MethodGet, "/api/v1/attempts?destination_type[0]=webhook", nil) + resp := h.do(h.withAPIKey(req)) + + require.Equal(t, http.StatusOK, resp.Code) + + var result apirouter.AttemptPaginatedResult + require.NoError(t, json.Unmarshal(resp.Body.Bytes(), &result)) + require.Len(t, result.Models, 1) + assert.Equal(t, "a1", result.Models[0].ID) + assert.Equal(t, "webhook", result.Models[0].DestinationType) + }) + + t.Run("destination_type present in response", func(t *testing.T) { + req := httptest.NewRequest(http.MethodGet, "/api/v1/attempts", nil) + resp := h.do(h.withAPIKey(req)) + + require.Equal(t, http.StatusOK, resp.Code) + + var result apirouter.AttemptPaginatedResult + require.NoError(t, json.Unmarshal(resp.Body.Bytes(), &result)) + require.Len(t, result.Models, 2) + types := map[string]bool{} + for _, m := range result.Models { + types[m.DestinationType] = true + } + assert.True(t, types["webhook"]) + assert.True(t, types["sqs"]) + }) + }) + t.Run("Validation", func(t *testing.T) { t.Run("invalid dir returns 422", func(t *testing.T) { h := newAPITest(t) diff --git a/internal/apirouter/metrics_handlers_test.go b/internal/apirouter/metrics_handlers_test.go index 8f44f444d..81981c2f4 100644 --- a/internal/apirouter/metrics_handlers_test.go +++ b/internal/apirouter/metrics_handlers_test.go @@ -515,6 +515,55 @@ func TestAPI_MetricsAttempts(t *testing.T) { } }) + t.Run("destination_type dimension and filter", func(t *testing.T) { + h := newAPITest(t) + + attemptTime := baseStart.Add(30 * time.Minute) + e1 := ef.AnyPointer(ef.WithTenantID("t1"), ef.WithTime(attemptTime)) + e2 := ef.AnyPointer(ef.WithTenantID("t1"), ef.WithTime(attemptTime)) + a1 := attemptForEvent(e1, af.WithDestinationType("webhook"), af.WithTime(attemptTime)) + a2 := attemptForEvent(e2, af.WithDestinationType("sqs"), af.WithTime(attemptTime)) + require.NoError(t, h.logStore.InsertMany(t.Context(), []*models.LogEntry{ + {Event: e1, Attempt: a1}, + {Event: e2, Attempt: a2}, + })) + + t.Run("as dimension", func(t *testing.T) { + req := httptest.NewRequest(http.MethodGet, + "/api/v1/metrics/attempts?"+baseQS+"&measures[0]=count&dimensions[0]=destination_type", nil) + resp := h.do(h.withAPIKey(req)) + + require.Equal(t, http.StatusOK, resp.Code) + + var result apirouter.APIMetricsResponse + require.NoError(t, json.Unmarshal(resp.Body.Bytes(), &result)) + require.Len(t, result.Data, 2) + types := map[string]bool{} + for _, dp := range result.Data { + assert.Contains(t, dp.Dimensions, "destination_type") + types[dp.Dimensions["destination_type"].(string)] = true + } + assert.True(t, types["webhook"]) + assert.True(t, types["sqs"]) + }) + + t.Run("as filter", func(t *testing.T) { + req := httptest.NewRequest(http.MethodGet, + "/api/v1/metrics/attempts?"+baseQS+"&measures[0]=count&filters[destination_type][0]=webhook", nil) + resp := h.do(h.withAPIKey(req)) + + require.Equal(t, http.StatusOK, resp.Code) + + var result apirouter.APIMetricsResponse + require.NoError(t, json.Unmarshal(resp.Body.Bytes(), &result)) + if len(result.Data) > 0 { + count, ok := result.Data[0].Metrics["count"] + assert.True(t, ok) + assert.Equal(t, float64(1), count) + } + }) + }) + t.Run("start equals end returns 400", func(t *testing.T) { h := newAPITest(t) sameTime := time.Now().UTC().Truncate(time.Second) diff --git a/internal/util/testutil/event.go b/internal/util/testutil/event.go index 0cbaef40a..bc23e3673 100644 --- a/internal/util/testutil/event.go +++ b/internal/util/testutil/event.go @@ -175,6 +175,12 @@ func (f *mockAttemptFactory) WithDestinationID(destinationID string) func(*model } } +func (f *mockAttemptFactory) WithDestinationType(destinationType string) func(*models.Attempt) { + return func(attempt *models.Attempt) { + attempt.DestinationType = destinationType + } +} + func (f *mockAttemptFactory) WithStatus(status string) func(*models.Attempt) { return func(attempt *models.Attempt) { attempt.Status = status From 8e42724de9feb58c1129752a0d71344e402bcaad Mon Sep 17 00:00:00 2001 From: Alex Luong Date: Sat, 4 Apr 2026 01:06:28 +0700 Subject: [PATCH 5/6] chore: gofmt --- .../destwebhookstandard/destwebhookstandard_publish_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/internal/destregistry/providers/destwebhookstandard/destwebhookstandard_publish_test.go b/internal/destregistry/providers/destwebhookstandard/destwebhookstandard_publish_test.go index 4038b3d66..68460b3c5 100644 --- a/internal/destregistry/providers/destwebhookstandard/destwebhookstandard_publish_test.go +++ b/internal/destregistry/providers/destwebhookstandard/destwebhookstandard_publish_test.go @@ -6,8 +6,8 @@ import ( "encoding/json" "io" "net/http" - "strconv" "net/http/httptest" + "strconv" "strings" "sync" "testing" From 432c66eaea50c584c6ac76d863107c9f05cbd80c Mon Sep 17 00:00:00 2001 From: Alex Luong Date: Sat, 4 Apr 2026 20:31:02 +0700 Subject: [PATCH 6/6] fix: use safe type assertion in metrics destination_type test Co-Authored-By: Claude Opus 4.6 (1M context) --- internal/apirouter/metrics_handlers_test.go | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/internal/apirouter/metrics_handlers_test.go b/internal/apirouter/metrics_handlers_test.go index 81981c2f4..25dda2313 100644 --- a/internal/apirouter/metrics_handlers_test.go +++ b/internal/apirouter/metrics_handlers_test.go @@ -540,8 +540,9 @@ func TestAPI_MetricsAttempts(t *testing.T) { require.Len(t, result.Data, 2) types := map[string]bool{} for _, dp := range result.Data { - assert.Contains(t, dp.Dimensions, "destination_type") - types[dp.Dimensions["destination_type"].(string)] = true + dt, ok := dp.Dimensions["destination_type"].(string) + require.True(t, ok, "destination_type dimension should be a string") + types[dt] = true } assert.True(t, types["webhook"]) assert.True(t, types["sqs"])