Skip to content

Commit 5c074a9

Browse files
committed
feat(meter): ignore late events
1 parent 5dcd40d commit 5c074a9

File tree

4 files changed

+43
-24
lines changed

4 files changed

+43
-24
lines changed

openmeter/billing/httpdriver/invoicecost.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -130,6 +130,8 @@ func (h *handler) GetInvoiceLineCost() GetInvoiceLineCostHandler {
130130
To: &line.Period.End,
131131
FilterGroupBy: meterGroupByFilters,
132132
FilterCustomer: []streaming.Customer{*customer},
133+
// We ignore late events because the data is ingested after the invoice is collected
134+
IgnoreLateEvents: invoice.CollectionAt,
133135
}
134136

135137
if request.Params.GroupBy != nil {

openmeter/streaming/clickhouse/connector.go

Lines changed: 13 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -142,18 +142,19 @@ func (c *Connector) QueryMeter(ctx context.Context, namespace string, meter mete
142142
sort.Strings(groupBy)
143143

144144
query := queryMeter{
145-
Database: c.config.Database,
146-
EventsTableName: c.config.EventsTableName,
147-
Namespace: namespace,
148-
Meter: meter,
149-
From: params.From,
150-
To: params.To,
151-
FilterCustomer: params.FilterCustomer,
152-
FilterSubject: params.FilterSubject,
153-
FilterGroupBy: params.FilterGroupBy,
154-
GroupBy: groupBy,
155-
WindowSize: params.WindowSize,
156-
WindowTimeZone: params.WindowTimeZone,
145+
Database: c.config.Database,
146+
EventsTableName: c.config.EventsTableName,
147+
Namespace: namespace,
148+
Meter: meter,
149+
From: params.From,
150+
To: params.To,
151+
IgnoreLateEvents: params.IgnoreLateEvents,
152+
FilterCustomer: params.FilterCustomer,
153+
FilterSubject: params.FilterSubject,
154+
FilterGroupBy: params.FilterGroupBy,
155+
GroupBy: groupBy,
156+
WindowSize: params.WindowSize,
157+
WindowTimeZone: params.WindowTimeZone,
157158
}
158159

159160
// Load cached rows if any

openmeter/streaming/clickhouse/meter_query.go

Lines changed: 20 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -18,18 +18,19 @@ import (
1818
)
1919

2020
type queryMeter struct {
21-
Database string
22-
EventsTableName string
23-
Namespace string
24-
Meter meterpkg.Meter
25-
FilterCustomer []streaming.Customer
26-
FilterSubject []string
27-
FilterGroupBy map[string][]string
28-
From *time.Time
29-
To *time.Time
30-
GroupBy []string
31-
WindowSize *meterpkg.WindowSize
32-
WindowTimeZone *time.Location
21+
Database string
22+
EventsTableName string
23+
Namespace string
24+
Meter meterpkg.Meter
25+
FilterCustomer []streaming.Customer
26+
FilterSubject []string
27+
FilterGroupBy map[string][]string
28+
From *time.Time
29+
To *time.Time
30+
IgnoreLateEvents *time.Time
31+
GroupBy []string
32+
WindowSize *meterpkg.WindowSize
33+
WindowTimeZone *time.Location
3334
}
3435

3536
// from returns the from time for the query.
@@ -286,6 +287,13 @@ func (d *queryMeter) timeWhere(query *sqlbuilder.SelectBuilder) *sqlbuilder.Sele
286287
query = query.Where(query.LessThan(timeColumn, d.To.Unix()))
287288
}
288289

290+
// If ignore late events is set, we filter out data that was ingested after the query period
291+
// Note: This filter runs on a non sorthed column
292+
if d.IgnoreLateEvents != nil {
293+
ingestedAtColumn := getColumn("ingested_at")
294+
query = query.Where(query.LessThan(ingestedAtColumn, d.IgnoreLateEvents.Unix()))
295+
}
296+
289297
return query
290298
}
291299

openmeter/streaming/query_params.go

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,9 @@ type QueryParams struct {
1919
GroupBy []string
2020
WindowSize *meter.WindowSize
2121
WindowTimeZone *time.Location
22+
// IgnoreLateEvents is used to filter out data that was ingested after the query period.
23+
// This is useful for queries that are not real-time, such as billing queries.
24+
IgnoreLateEvents *time.Time
2225
}
2326

2427
// Validate validates query params focusing on `from` and `to` being aligned with query and meter window sizes
@@ -41,6 +44,11 @@ func (p *QueryParams) Validate() error {
4144
}
4245
}
4346

47+
// Check that ignore late events is after to
48+
if p.IgnoreLateEvents != nil && p.To != nil && p.To.After(*p.IgnoreLateEvents) {
49+
errs = append(errs, errors.New("ignore late events must be after to"))
50+
}
51+
4452
// This is required because otherwise the response would be ambiguous
4553
if len(p.FilterSubject) > 1 && !slices.Contains(p.GroupBy, "subject") {
4654
errs = append(errs, errors.New("multiple subject filters are only allowed with subject group by"))

0 commit comments

Comments
 (0)