Skip to content

Commit b8272d9

Browse files
authored
fix: paginate recalculation job (#2877)
1 parent 43380c1 commit b8272d9

File tree

1 file changed

+37
-10
lines changed

1 file changed

+37
-10
lines changed

openmeter/entitlement/balanceworker/recalculate.go

Lines changed: 37 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ import (
2020
"github.com/openmeterio/openmeter/openmeter/watermill/eventbus"
2121
"github.com/openmeterio/openmeter/openmeter/watermill/marshaler"
2222
"github.com/openmeterio/openmeter/pkg/convert"
23+
"github.com/openmeterio/openmeter/pkg/pagination"
2324
)
2425

2526
const (
@@ -28,6 +29,7 @@ const (
2829
DefaultIncludeDeletedDuration = 24 * time.Hour
2930

3031
defaultLRUCacheSize = 10_000
32+
defaultPageSize = 20_000
3133

3234
metricNameRecalculationTime = "balance_worker.entitlement_recalculation_time_ms"
3335
metricNameRecalculationJobCalculationTime = "balance_worker.entitlement_recalculation_job_calculation_time_ms"
@@ -121,18 +123,43 @@ func (r *Recalculator) Recalculate(ctx context.Context, ns string) error {
121123
return errors.New("namespace is required")
122124
}
123125

124-
affectedEntitlements, err := r.opts.Entitlement.EntitlementRepo.ListEntitlements(
125-
ctx,
126-
entitlement.ListEntitlementsParams{
127-
Namespaces: []string{ns},
128-
IncludeDeleted: true,
129-
IncludeDeletedAfter: time.Now().Add(-DefaultIncludeDeletedDuration),
130-
})
131-
if err != nil {
132-
return err
126+
// Note: this is to support namesapces with more than 64k entitlements, as the subqueries
127+
// to expand the edges uses IN statements in ent. We should rather fix ent to actually chunk
128+
// the subqueries.
129+
affectedEntitlements := []entitlement.Entitlement{}
130+
131+
page := 1
132+
133+
for {
134+
affectedEntitlementsPage, err := r.opts.Entitlement.EntitlementRepo.ListEntitlements(
135+
ctx,
136+
entitlement.ListEntitlementsParams{
137+
Namespaces: []string{ns},
138+
IncludeDeleted: true,
139+
IncludeDeletedAfter: time.Now().Add(-DefaultIncludeDeletedDuration),
140+
Page: pagination.Page{
141+
PageNumber: page,
142+
PageSize: defaultPageSize,
143+
},
144+
})
145+
if err != nil {
146+
return err
147+
}
148+
149+
if len(affectedEntitlementsPage.Items) == 0 {
150+
break
151+
}
152+
153+
affectedEntitlements = append(affectedEntitlements, affectedEntitlementsPage.Items...)
154+
155+
if len(affectedEntitlements) >= affectedEntitlementsPage.TotalCount {
156+
break
157+
}
158+
159+
page++
133160
}
134161

135-
return r.processEntitlements(ctx, affectedEntitlements.Items)
162+
return r.processEntitlements(ctx, affectedEntitlements)
136163
}
137164

138165
func (r *Recalculator) processEntitlements(ctx context.Context, entitlements []entitlement.Entitlement) error {

0 commit comments

Comments
 (0)