Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
47 changes: 25 additions & 22 deletions internal/apirouter/log_handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,14 +81,15 @@ func parseIncludeOptions(c *gin.Context) IncludeOptions {

// APIAttempt is the API response for an attempt
type APIAttempt struct {
ID string `json:"id"`
TenantID string `json:"tenant_id"`
Status string `json:"status"`
Time time.Time `json:"time"`
Code string `json:"code,omitempty"`
ResponseData map[string]interface{} `json:"response_data,omitempty"`
AttemptNumber int `json:"attempt_number"`
Manual bool `json:"manual"`
ID string `json:"id"`
TenantID string `json:"tenant_id"`
Status string `json:"status"`
Time time.Time `json:"time"`
Code string `json:"code,omitempty"`
ResponseData map[string]interface{} `json:"response_data,omitempty"`
AttemptNumber int `json:"attempt_number"`
Manual bool `json:"manual"`
DestinationType string `json:"destination_type"`

EventID string `json:"event_id"`
DestinationID string `json:"destination_id"`
Expand Down Expand Up @@ -148,15 +149,16 @@ type EventPaginatedResult struct {
// destination field is populated.
func toAPIAttempt(ar *logstore.AttemptRecord, opts IncludeOptions, destDisplay *destregistry.DestinationDisplay) APIAttempt {
api := APIAttempt{
ID: ar.Attempt.ID,
TenantID: ar.Attempt.TenantID,
Status: ar.Attempt.Status,
Time: ar.Attempt.Time,
Code: ar.Attempt.Code,
AttemptNumber: ar.Attempt.AttemptNumber,
Manual: ar.Attempt.Manual,
EventID: ar.Attempt.EventID,
DestinationID: ar.Attempt.DestinationID,
ID: ar.Attempt.ID,
TenantID: ar.Attempt.TenantID,
Status: ar.Attempt.Status,
Time: ar.Attempt.Time,
Code: ar.Attempt.Code,
AttemptNumber: ar.Attempt.AttemptNumber,
Manual: ar.Attempt.Manual,
DestinationType: ar.Attempt.DestinationType,
EventID: ar.Attempt.EventID,
DestinationID: ar.Attempt.DestinationID,
}

if opts.ResponseData {
Expand Down Expand Up @@ -261,11 +263,12 @@ func (h *LogHandlers) listAttemptsInternal(c *gin.Context, tenantIDs []string, d
}

req := logstore.ListAttemptRequest{
TenantIDs: tenantIDs,
EventIDs: ParseArrayQueryParam(c, "event_id"),
DestinationIDs: destinationIDs,
Status: c.Query("status"),
Topics: ParseArrayQueryParam(c, "topic"),
TenantIDs: tenantIDs,
EventIDs: ParseArrayQueryParam(c, "event_id"),
DestinationIDs: destinationIDs,
DestinationTypes: ParseArrayQueryParam(c, "destination_type"),
Status: c.Query("status"),
Topics: ParseArrayQueryParam(c, "topic"),
TimeFilter: logstore.TimeFilter{
GTE: attemptTimeFilter.GTE,
LTE: attemptTimeFilter.LTE,
Expand Down
43 changes: 43 additions & 0 deletions internal/apirouter/log_handlers_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -935,6 +935,49 @@ func TestAPI_Attempts(t *testing.T) {
})
})

t.Run("destination_type filter and response", func(t *testing.T) {
h := newAPITest(t)

e1 := ef.AnyPointer(ef.WithID("e1"), ef.WithTenantID("t1"))
e2 := ef.AnyPointer(ef.WithID("e2"), ef.WithTenantID("t1"))
a1 := attemptForEvent(e1, af.WithID("a1"), af.WithDestinationType("webhook"))
a2 := attemptForEvent(e2, af.WithID("a2"), af.WithDestinationType("sqs"))
require.NoError(t, h.logStore.InsertMany(t.Context(), []*models.LogEntry{
{Event: e1, Attempt: a1},
{Event: e2, Attempt: a2},
}))

t.Run("filter by destination_type", func(t *testing.T) {
req := httptest.NewRequest(http.MethodGet, "/api/v1/attempts?destination_type[0]=webhook", nil)
resp := h.do(h.withAPIKey(req))

require.Equal(t, http.StatusOK, resp.Code)

var result apirouter.AttemptPaginatedResult
require.NoError(t, json.Unmarshal(resp.Body.Bytes(), &result))
require.Len(t, result.Models, 1)
assert.Equal(t, "a1", result.Models[0].ID)
assert.Equal(t, "webhook", result.Models[0].DestinationType)
})

t.Run("destination_type present in response", func(t *testing.T) {
req := httptest.NewRequest(http.MethodGet, "/api/v1/attempts", nil)
resp := h.do(h.withAPIKey(req))

require.Equal(t, http.StatusOK, resp.Code)

var result apirouter.AttemptPaginatedResult
require.NoError(t, json.Unmarshal(resp.Body.Bytes(), &result))
require.Len(t, result.Models, 2)
types := map[string]bool{}
for _, m := range result.Models {
types[m.DestinationType] = true
}
assert.True(t, types["webhook"])
assert.True(t, types["sqs"])
})
})

t.Run("Validation", func(t *testing.T) {
t.Run("invalid dir returns 422", func(t *testing.T) {
h := newAPITest(t)
Expand Down
6 changes: 4 additions & 2 deletions internal/apirouter/metrics_handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,8 +56,8 @@ var (
eventFilters = newStringSet("tenant_id", "topic", "destination_id")

attemptMeasures = newStringSet("count", "successful_count", "failed_count", "error_rate", "first_attempt_count", "retry_count", "manual_retry_count", "avg_attempt_number", "rate", "successful_rate", "failed_rate")
attemptDimensions = newStringSet("tenant_id", "destination_id", "topic", "status", "code", "manual", "attempt_number")
attemptFilters = newStringSet("tenant_id", "destination_id", "topic", "status", "code", "manual", "attempt_number")
attemptDimensions = newStringSet("tenant_id", "destination_id", "destination_type", "topic", "status", "code", "manual", "attempt_number")
attemptFilters = newStringSet("tenant_id", "destination_id", "destination_type", "topic", "status", "code", "manual", "attempt_number")
)

// --- API response types ---
Expand Down Expand Up @@ -396,6 +396,8 @@ func attemptDataPointToAPI(dp logstore.AttemptMetricsDataPoint, measures, dimens
dims["tenant_id"] = derefString(dp.TenantID)
case "destination_id":
dims["destination_id"] = derefString(dp.DestinationID)
case "destination_type":
dims["destination_type"] = derefString(dp.DestinationType)
case "topic":
dims["topic"] = derefString(dp.Topic)
case "status":
Expand Down
50 changes: 50 additions & 0 deletions internal/apirouter/metrics_handlers_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -515,6 +515,56 @@ func TestAPI_MetricsAttempts(t *testing.T) {
}
})

t.Run("destination_type dimension and filter", func(t *testing.T) {
h := newAPITest(t)

attemptTime := baseStart.Add(30 * time.Minute)
e1 := ef.AnyPointer(ef.WithTenantID("t1"), ef.WithTime(attemptTime))
e2 := ef.AnyPointer(ef.WithTenantID("t1"), ef.WithTime(attemptTime))
a1 := attemptForEvent(e1, af.WithDestinationType("webhook"), af.WithTime(attemptTime))
a2 := attemptForEvent(e2, af.WithDestinationType("sqs"), af.WithTime(attemptTime))
require.NoError(t, h.logStore.InsertMany(t.Context(), []*models.LogEntry{
{Event: e1, Attempt: a1},
{Event: e2, Attempt: a2},
}))

t.Run("as dimension", func(t *testing.T) {
req := httptest.NewRequest(http.MethodGet,
"/api/v1/metrics/attempts?"+baseQS+"&measures[0]=count&dimensions[0]=destination_type", nil)
resp := h.do(h.withAPIKey(req))

require.Equal(t, http.StatusOK, resp.Code)

var result apirouter.APIMetricsResponse
require.NoError(t, json.Unmarshal(resp.Body.Bytes(), &result))
require.Len(t, result.Data, 2)
types := map[string]bool{}
for _, dp := range result.Data {
dt, ok := dp.Dimensions["destination_type"].(string)
require.True(t, ok, "destination_type dimension should be a string")
types[dt] = true
}
assert.True(t, types["webhook"])
assert.True(t, types["sqs"])
})

t.Run("as filter", func(t *testing.T) {
req := httptest.NewRequest(http.MethodGet,
"/api/v1/metrics/attempts?"+baseQS+"&measures[0]=count&filters[destination_type][0]=webhook", nil)
resp := h.do(h.withAPIKey(req))

require.Equal(t, http.StatusOK, resp.Code)

var result apirouter.APIMetricsResponse
require.NoError(t, json.Unmarshal(resp.Body.Bytes(), &result))
if len(result.Data) > 0 {
count, ok := result.Data[0].Metrics["count"]
assert.True(t, ok)
assert.Equal(t, float64(1), count)
}
})
})

t.Run("start equals end returns 400", func(t *testing.T) {
h := newAPITest(t)
sameTime := time.Now().UTC().Truncate(time.Second)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,8 @@ import (
"encoding/json"
"io"
"net/http"
"strconv"
"net/http/httptest"
"strconv"
"strings"
"sync"
"testing"
Expand Down
7 changes: 4 additions & 3 deletions internal/destregistry/registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -142,9 +142,10 @@ func (r *registry) PublishEvent(ctx context.Context, destination *models.Destina
}

attempt := &models.Attempt{
ID: idgen.Attempt(),
DestinationID: destination.ID,
EventID: event.ID,
ID: idgen.Attempt(),
DestinationID: destination.ID,
DestinationType: destination.Type,
EventID: event.ID,
}

// Create a new context with timeout
Expand Down
2 changes: 2 additions & 0 deletions internal/logstore/bucket/dimkey.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,8 @@ func AttemptDimKey(dp *driver.AttemptMetricsDataPoint, dims []string) DimKey {
parts[i] = derefStr(dp.TenantID)
case "destination_id":
parts[i] = derefStr(dp.DestinationID)
case "destination_type":
parts[i] = derefStr(dp.DestinationType)
case "topic":
parts[i] = derefStr(dp.Topic)
case "status":
Expand Down
56 changes: 35 additions & 21 deletions internal/logstore/chlogstore/chlogstore.go
Original file line number Diff line number Diff line change
Expand Up @@ -337,6 +337,11 @@ func buildAttemptQuery(table string, req driver.ListAttemptRequest, q pagination
args = append(args, req.DestinationIDs)
}

if len(req.DestinationTypes) > 0 {
conditions = append(conditions, "destination_type IN ?")
args = append(args, req.DestinationTypes)
}

if req.Status != "" {
conditions = append(conditions, "status = ?")
args = append(args, req.Status)
Expand Down Expand Up @@ -383,6 +388,7 @@ func buildAttemptQuery(table string, req driver.ListAttemptRequest, q pagination
event_id,
tenant_id,
destination_id,
destination_type,
topic,
eligible_for_retry,
event_time,
Expand Down Expand Up @@ -411,6 +417,7 @@ func scanAttemptRecords(rows clickhouse.Rows) ([]attemptRecordWithPosition, erro
eventID string
tenantID string
destinationID string
destinationType string
topic string
eligibleForRetry bool
eventTime time.Time
Expand All @@ -429,6 +436,7 @@ func scanAttemptRecords(rows clickhouse.Rows) ([]attemptRecordWithPosition, erro
&eventID,
&tenantID,
&destinationID,
&destinationType,
&topic,
&eligibleForRetry,
&eventTime,
Expand Down Expand Up @@ -463,16 +471,17 @@ func scanAttemptRecords(rows clickhouse.Rows) ([]attemptRecordWithPosition, erro
results = append(results, attemptRecordWithPosition{
AttemptRecord: &driver.AttemptRecord{
Attempt: &models.Attempt{
ID: attemptID,
TenantID: tenantID,
EventID: eventID,
DestinationID: destinationID,
AttemptNumber: int(attemptNumber),
Manual: manual,
Status: status,
Time: attemptTime,
Code: code,
ResponseData: responseData,
ID: attemptID,
TenantID: tenantID,
EventID: eventID,
DestinationID: destinationID,
DestinationType: destinationType,
AttemptNumber: int(attemptNumber),
Manual: manual,
Status: status,
Time: attemptTime,
Code: code,
ResponseData: responseData,
},
Event: &models.Event{
ID: eventID,
Expand Down Expand Up @@ -582,6 +591,7 @@ func (s *logStoreImpl) RetrieveAttempt(ctx context.Context, req driver.RetrieveA
event_id,
tenant_id,
destination_id,
destination_type,
topic,
eligible_for_retry,
event_time,
Expand All @@ -604,6 +614,7 @@ func (s *logStoreImpl) RetrieveAttempt(ctx context.Context, req driver.RetrieveA
eventID string
tenantID string
destinationID string
destinationType string
topic string
eligibleForRetry bool
eventTime time.Time
Expand All @@ -622,6 +633,7 @@ func (s *logStoreImpl) RetrieveAttempt(ctx context.Context, req driver.RetrieveA
&eventID,
&tenantID,
&destinationID,
&destinationType,
&topic,
&eligibleForRetry,
&eventTime,
Expand Down Expand Up @@ -658,16 +670,17 @@ func (s *logStoreImpl) RetrieveAttempt(ctx context.Context, req driver.RetrieveA

return &driver.AttemptRecord{
Attempt: &models.Attempt{
ID: attemptID,
TenantID: tenantID,
EventID: eventID,
DestinationID: destinationID,
AttemptNumber: int(attemptNumber),
Manual: manual,
Status: status,
Time: attemptTime,
Code: code,
ResponseData: responseData,
ID: attemptID,
TenantID: tenantID,
EventID: eventID,
DestinationID: destinationID,
DestinationType: destinationType,
AttemptNumber: int(attemptNumber),
Manual: manual,
Status: status,
Time: attemptTime,
Code: code,
ResponseData: responseData,
},
Event: &models.Event{
ID: eventID,
Expand Down Expand Up @@ -734,7 +747,7 @@ func (s *logStoreImpl) InsertMany(ctx context.Context, entries []*models.LogEntr
// Insert attempts with their paired event data
attemptBatch, err := s.chDB.PrepareBatch(ctx,
fmt.Sprintf(`INSERT INTO %s (
event_id, tenant_id, destination_id, topic, eligible_for_retry, event_time, metadata, data,
event_id, tenant_id, destination_id, destination_type, topic, eligible_for_retry, event_time, metadata, data,
attempt_id, status, attempt_time, code, response_data, manual, attempt_number
)`, s.attemptsTable),
)
Expand All @@ -759,6 +772,7 @@ func (s *logStoreImpl) InsertMany(ctx context.Context, entries []*models.LogEntr
a.EventID,
event.TenantID,
a.DestinationID,
a.DestinationType,
event.Topic,
event.EligibleForRetry,
event.Time,
Expand Down
Loading
Loading