Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions internal/dbunique/db_unique.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"github.com/tidwall/gjson"
"github.com/tidwall/sjson"

"github.com/riverqueue/river/rivershared/structtag"
"github.com/riverqueue/river/rivershared/uniquestates"
"github.com/riverqueue/river/rivershared/util/ptrutil"
"github.com/riverqueue/river/rivershared/util/sliceutil"
Expand Down Expand Up @@ -64,14 +65,14 @@ func buildUniqueKeyString(timeGen rivertype.TimeGenerator, uniqueOpts *UniqueOpt
if uniqueOpts.ByArgs {
var encodedArgsForUnique []byte
// Get unique JSON keys from the JobArgs struct:
uniqueFields, err := getSortedUniqueFieldsCached(params.Args)
uniqueFields, err := structtag.SortedFieldsWithTag(params.Args, "unique")
if err != nil {
return "", err
}

if len(uniqueFields) > 0 {
// Extract unique values from the EncodedArgs JSON
uniqueValues := extractUniqueValues(params.EncodedArgs, uniqueFields)
uniqueValues := structtag.ExtractValues(params.EncodedArgs, uniqueFields)

// Assemble the JSON object using bytes.Buffer
// Better to overallocate a bit than to allocate multiple times, so just
Expand Down
3 changes: 3 additions & 0 deletions rivershared/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,9 @@ require (
github.com/jackc/pgservicefile v0.0.0-20240606120523-5a60cdf6a761 // indirect
github.com/jackc/puddle/v2 v2.2.2 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
github.com/tidwall/gjson v1.18.0 // indirect
github.com/tidwall/match v1.2.0 // indirect
github.com/tidwall/pretty v1.2.1 // indirect
golang.org/x/sync v0.19.0 // indirect
golang.org/x/text v0.32.0 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
Expand Down
8 changes: 8 additions & 0 deletions rivershared/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,14 @@ github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UV
github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
github.com/stretchr/testify v1.11.1 h1:7s2iGBzp5EwR7/aIZr8ao5+dra3wiQyKjjFuvgVKu7U=
github.com/stretchr/testify v1.11.1/go.mod h1:wZwfW3scLgRK+23gO65QZefKpKQRnfz6sD981Nm4B6U=
github.com/tidwall/gjson v1.18.0 h1:FIDeeyB800efLX89e5a8Y0BNH+LOngJyGrIWxG2FKQY=
github.com/tidwall/gjson v1.18.0/go.mod h1:/wbyibRr2FHMks5tjHJ5F8dMZh3AcwJEMf5vlfC0lxk=
github.com/tidwall/match v1.1.1/go.mod h1:eRSPERbgtNPcGhD8UCthc6PmLEQXEWd3PRB5JTxsfmM=
github.com/tidwall/match v1.2.0 h1:0pt8FlkOwjN2fPt4bIl4BoNxb98gGHN2ObFEDkrfZnM=
github.com/tidwall/match v1.2.0/go.mod h1:eRSPERbgtNPcGhD8UCthc6PmLEQXEWd3PRB5JTxsfmM=
github.com/tidwall/pretty v1.2.0/go.mod h1:ITEVvHYasfjBbM0u2Pg8T2nJnzm8xPwvNhhsoaGGjNU=
github.com/tidwall/pretty v1.2.1 h1:qjsOFOWWQl+N3RsoF5/ssm1pHmJJwhjlSbZ51I6wMl4=
github.com/tidwall/pretty v1.2.1/go.mod h1:ITEVvHYasfjBbM0u2Pg8T2nJnzm8xPwvNhhsoaGGjNU=
go.uber.org/goleak v1.3.0 h1:2K3zAYmnTNqV73imy9J1T3WC+gmCePx2hEGkimedGto=
go.uber.org/goleak v1.3.0/go.mod h1:CoHD4mav9JJNrW/WLlf7HGZPjdw8EucARQHekz1X6bE=
golang.org/x/mod v0.31.0 h1:HaW9xtz0+kOcWKwli0ZXy79Ix+UW/vOfmWI5QVd2tgI=
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package dbunique
package structtag

import (
"fmt"
Expand All @@ -12,15 +12,8 @@ import (
"github.com/riverqueue/river/rivertype"
)

var (
// uniqueFieldsCache caches the unique fields for each JobArgs type. These are
// global to ensure that each struct type's tags are only extracted once.
uniqueFieldsCache = make(map[reflect.Type][]string) //nolint:gochecknoglobals
cacheMutex sync.RWMutex //nolint:gochecknoglobals
)

// extractUniqueValues extracts the raw JSON values of the specified keys from the JSON-encoded args.
func extractUniqueValues(encodedArgs []byte, uniqueKeys []string) []string {
// ExtractValues extracts the raw JSON values of the specified keys from the JSON-encoded args.
func ExtractValues(encodedArgs []byte, uniqueKeys []string) []string {
// Use GetManyBytes to retrieve multiple values at once
results := gjson.GetManyBytes(encodedArgs, uniqueKeys...)

Expand All @@ -29,23 +22,66 @@ func extractUniqueValues(encodedArgs []byte, uniqueKeys []string) []string {
if res.Exists() {
uniqueValues[i] = res.Raw // Use Raw to get the JSON-encoded value
} else {
// Handle missing keys as "undefined" (they'll be skipped when building
// the unique key). We don't want to use "null" here because the JSON may
// actually contain "null" as a value.
// Handle missing keys as "undefined" (they'll be skipped when
// building the key). We don't want to use "null" here because the
// JSON may actually contain "null" as a value.
uniqueValues[i] = "undefined"
}
}

return uniqueValues
}

// getSortedUniqueFields uses reflection to retrieve the JSON keys of fields
// marked with `river:"unique"` among potentially other comma-separated values.
// The return values are the JSON keys using the same logic as the `json` struct tag.
type uniqueFieldCacheKey struct {
typ reflect.Type
tagValue string
}

var (
// uniqueFieldsCache caches the unique fields for each JobArgs type. These are
// global to ensure that each struct type's tags are only extracted once.
uniqueFieldsCache = make(map[uniqueFieldCacheKey][]string) //nolint:gochecknoglobals
cacheMutex sync.RWMutex //nolint:gochecknoglobals
)

// SortedFieldsWithTag retrieves unique fields with caching to avoid
// extracting fields from the same struct type repeatedly.
func SortedFieldsWithTag(args rivertype.JobArgs, tagValue string) ([]string, error) {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Also renamed this from GetSortedTagFieldsCached to SortedFieldsWithTag. Seems a little better for a public API name.

var (
typ = reflect.TypeOf(args)
cacheKey = uniqueFieldCacheKey{typ: typ, tagValue: tagValue}
)

// Check cache first
cacheMutex.RLock()
if fields, ok := uniqueFieldsCache[cacheKey]; ok {
cacheMutex.RUnlock()
return fields, nil
}
cacheMutex.RUnlock()

// Not in cache; retrieve using reflection
fields, err := sortedFieldsWithTagUncached(reflect.TypeOf(args), tagValue, nil, make(map[reflect.Type]struct{}))
if err != nil {
return nil, err
}

// Store in cache
cacheMutex.Lock()
uniqueFieldsCache[cacheKey] = fields
cacheMutex.Unlock()

return fields, nil
}

// sortedFieldsWithTagUncached uses reflection to retrieve the JSON keys of fields
// marked with `river:"<tagValue>"` among potentially other comma-separated
// values. The return values are the JSON keys using the same logic as the
// `json` struct tag.
//
// typesSeen should be a map passed through to make sure that recursive types
// don't cause a stack overflow.
func getSortedUniqueFields(typ reflect.Type, path []string, typesSeen map[reflect.Type]struct{}) ([]string, error) {
func sortedFieldsWithTagUncached(typ reflect.Type, tagValue string, path []string, typesSeen map[reflect.Type]struct{}) ([]string, error) {
// Handle pointer to struct
if typ.Kind() == reflect.Ptr {
typ = typ.Elem()
Expand Down Expand Up @@ -93,7 +129,7 @@ func getSortedUniqueFields(typ reflect.Type, path []string, typesSeen map[reflec
if riverTag, ok := field.Tag.Lookup("river"); ok {
tags := strings.SplitSeq(riverTag, ",")
for tag := range tags {
if strings.TrimSpace(tag) == "unique" {
if strings.TrimSpace(tag) == tagValue {
hasUniqueTag = true
}
}
Expand All @@ -108,15 +144,15 @@ func getSortedUniqueFields(typ reflect.Type, path []string, typesSeen map[reflec
fullPath = append(path, uniqueName) //nolint:gocritic
}

uniqueSubFields, err := getSortedUniqueFields(field.Type, fullPath, typesSeen)
uniqueSubFields, err := sortedFieldsWithTagUncached(field.Type, tagValue, fullPath, typesSeen)
if err != nil {
return nil, err
}

if len(uniqueSubFields) > 0 {
uniqueFields = append(uniqueFields, uniqueSubFields...)
} else if hasUniqueTag {
// If a struct field is marked `river:"unique"`, use its entire
// If a struct field is marked `river:"<tagValue>"`, use its entire
// JSON serialization as a unique value. This may not be the
// greatest idea practically, but keeping it in place for
// backwards compatibility.
Expand All @@ -137,33 +173,6 @@ func getSortedUniqueFields(typ reflect.Type, path []string, typesSeen map[reflec
return uniqueFields, nil
}

// getSortedUniqueFieldsCached retrieves unique fields with caching to avoid
// extracting fields from the same struct type repeatedly.
func getSortedUniqueFieldsCached(args rivertype.JobArgs) ([]string, error) {
typ := reflect.TypeOf(args)

// Check cache first
cacheMutex.RLock()
if fields, ok := uniqueFieldsCache[typ]; ok {
cacheMutex.RUnlock()
return fields, nil
}
cacheMutex.RUnlock()

// Not in cache; retrieve using reflection
fields, err := getSortedUniqueFields(reflect.TypeOf(args), nil, make(map[reflect.Type]struct{}))
if err != nil {
return nil, err
}

// Store in cache
cacheMutex.Lock()
uniqueFieldsCache[typ] = fields
cacheMutex.Unlock()

return fields, nil
}

// parseJSONTag extracts the JSON key from the struct tag.
// It handles tags with options, e.g., `json:"recipient,omitempty"`.
func parseJSONTag(tag string) string {
Expand Down
Loading
Loading