diff --git a/internal/dbunique/db_unique.go b/internal/dbunique/db_unique.go index e942f8be..f69e2cea 100644 --- a/internal/dbunique/db_unique.go +++ b/internal/dbunique/db_unique.go @@ -9,6 +9,7 @@ import ( "github.com/tidwall/gjson" "github.com/tidwall/sjson" + "github.com/riverqueue/river/rivershared/structtag" "github.com/riverqueue/river/rivershared/uniquestates" "github.com/riverqueue/river/rivershared/util/ptrutil" "github.com/riverqueue/river/rivershared/util/sliceutil" @@ -64,14 +65,14 @@ func buildUniqueKeyString(timeGen rivertype.TimeGenerator, uniqueOpts *UniqueOpt if uniqueOpts.ByArgs { var encodedArgsForUnique []byte // Get unique JSON keys from the JobArgs struct: - uniqueFields, err := getSortedUniqueFieldsCached(params.Args) + uniqueFields, err := structtag.SortedFieldsWithTag(params.Args, "unique") if err != nil { return "", err } if len(uniqueFields) > 0 { // Extract unique values from the EncodedArgs JSON - uniqueValues := extractUniqueValues(params.EncodedArgs, uniqueFields) + uniqueValues := structtag.ExtractValues(params.EncodedArgs, uniqueFields) // Assemble the JSON object using bytes.Buffer // Better to overallocate a bit than to allocate multiple times, so just diff --git a/rivershared/go.mod b/rivershared/go.mod index d1c90db6..eadd4f9c 100644 --- a/rivershared/go.mod +++ b/rivershared/go.mod @@ -21,6 +21,9 @@ require ( github.com/jackc/pgservicefile v0.0.0-20240606120523-5a60cdf6a761 // indirect github.com/jackc/puddle/v2 v2.2.2 // indirect github.com/pmezard/go-difflib v1.0.0 // indirect + github.com/tidwall/gjson v1.18.0 // indirect + github.com/tidwall/match v1.2.0 // indirect + github.com/tidwall/pretty v1.2.1 // indirect golang.org/x/sync v0.19.0 // indirect golang.org/x/text v0.32.0 // indirect gopkg.in/yaml.v3 v3.0.1 // indirect diff --git a/rivershared/go.sum b/rivershared/go.sum index 5193c6c4..8f51078c 100644 --- a/rivershared/go.sum +++ b/rivershared/go.sum @@ -32,6 +32,14 @@ github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UV github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= github.com/stretchr/testify v1.11.1 h1:7s2iGBzp5EwR7/aIZr8ao5+dra3wiQyKjjFuvgVKu7U= github.com/stretchr/testify v1.11.1/go.mod h1:wZwfW3scLgRK+23gO65QZefKpKQRnfz6sD981Nm4B6U= +github.com/tidwall/gjson v1.18.0 h1:FIDeeyB800efLX89e5a8Y0BNH+LOngJyGrIWxG2FKQY= +github.com/tidwall/gjson v1.18.0/go.mod h1:/wbyibRr2FHMks5tjHJ5F8dMZh3AcwJEMf5vlfC0lxk= +github.com/tidwall/match v1.1.1/go.mod h1:eRSPERbgtNPcGhD8UCthc6PmLEQXEWd3PRB5JTxsfmM= +github.com/tidwall/match v1.2.0 h1:0pt8FlkOwjN2fPt4bIl4BoNxb98gGHN2ObFEDkrfZnM= +github.com/tidwall/match v1.2.0/go.mod h1:eRSPERbgtNPcGhD8UCthc6PmLEQXEWd3PRB5JTxsfmM= +github.com/tidwall/pretty v1.2.0/go.mod h1:ITEVvHYasfjBbM0u2Pg8T2nJnzm8xPwvNhhsoaGGjNU= +github.com/tidwall/pretty v1.2.1 h1:qjsOFOWWQl+N3RsoF5/ssm1pHmJJwhjlSbZ51I6wMl4= +github.com/tidwall/pretty v1.2.1/go.mod h1:ITEVvHYasfjBbM0u2Pg8T2nJnzm8xPwvNhhsoaGGjNU= go.uber.org/goleak v1.3.0 h1:2K3zAYmnTNqV73imy9J1T3WC+gmCePx2hEGkimedGto= go.uber.org/goleak v1.3.0/go.mod h1:CoHD4mav9JJNrW/WLlf7HGZPjdw8EucARQHekz1X6bE= golang.org/x/mod v0.31.0 h1:HaW9xtz0+kOcWKwli0ZXy79Ix+UW/vOfmWI5QVd2tgI= diff --git a/internal/dbunique/unique_fields.go b/rivershared/structtag/struct_tag.go similarity index 70% rename from internal/dbunique/unique_fields.go rename to rivershared/structtag/struct_tag.go index a8696108..69b88211 100644 --- a/internal/dbunique/unique_fields.go +++ b/rivershared/structtag/struct_tag.go @@ -1,4 +1,4 @@ -package dbunique +package structtag import ( "fmt" @@ -12,15 +12,8 @@ import ( "github.com/riverqueue/river/rivertype" ) -var ( - // uniqueFieldsCache caches the unique fields for each JobArgs type. These are - // global to ensure that each struct type's tags are only extracted once. - uniqueFieldsCache = make(map[reflect.Type][]string) //nolint:gochecknoglobals - cacheMutex sync.RWMutex //nolint:gochecknoglobals -) - -// extractUniqueValues extracts the raw JSON values of the specified keys from the JSON-encoded args. -func extractUniqueValues(encodedArgs []byte, uniqueKeys []string) []string { +// ExtractValues extracts the raw JSON values of the specified keys from the JSON-encoded args. +func ExtractValues(encodedArgs []byte, uniqueKeys []string) []string { // Use GetManyBytes to retrieve multiple values at once results := gjson.GetManyBytes(encodedArgs, uniqueKeys...) @@ -29,9 +22,9 @@ func extractUniqueValues(encodedArgs []byte, uniqueKeys []string) []string { if res.Exists() { uniqueValues[i] = res.Raw // Use Raw to get the JSON-encoded value } else { - // Handle missing keys as "undefined" (they'll be skipped when building - // the unique key). We don't want to use "null" here because the JSON may - // actually contain "null" as a value. + // Handle missing keys as "undefined" (they'll be skipped when + // building the key). We don't want to use "null" here because the + // JSON may actually contain "null" as a value. uniqueValues[i] = "undefined" } } @@ -39,13 +32,56 @@ func extractUniqueValues(encodedArgs []byte, uniqueKeys []string) []string { return uniqueValues } -// getSortedUniqueFields uses reflection to retrieve the JSON keys of fields -// marked with `river:"unique"` among potentially other comma-separated values. -// The return values are the JSON keys using the same logic as the `json` struct tag. +type uniqueFieldCacheKey struct { + typ reflect.Type + tagValue string +} + +var ( + // uniqueFieldsCache caches the unique fields for each JobArgs type. These are + // global to ensure that each struct type's tags are only extracted once. + uniqueFieldsCache = make(map[uniqueFieldCacheKey][]string) //nolint:gochecknoglobals + cacheMutex sync.RWMutex //nolint:gochecknoglobals +) + +// SortedFieldsWithTag retrieves unique fields with caching to avoid +// extracting fields from the same struct type repeatedly. +func SortedFieldsWithTag(args rivertype.JobArgs, tagValue string) ([]string, error) { + var ( + typ = reflect.TypeOf(args) + cacheKey = uniqueFieldCacheKey{typ: typ, tagValue: tagValue} + ) + + // Check cache first + cacheMutex.RLock() + if fields, ok := uniqueFieldsCache[cacheKey]; ok { + cacheMutex.RUnlock() + return fields, nil + } + cacheMutex.RUnlock() + + // Not in cache; retrieve using reflection + fields, err := sortedFieldsWithTagUncached(reflect.TypeOf(args), tagValue, nil, make(map[reflect.Type]struct{})) + if err != nil { + return nil, err + } + + // Store in cache + cacheMutex.Lock() + uniqueFieldsCache[cacheKey] = fields + cacheMutex.Unlock() + + return fields, nil +} + +// sortedFieldsWithTagUncached uses reflection to retrieve the JSON keys of fields +// marked with `river:""` among potentially other comma-separated +// values. The return values are the JSON keys using the same logic as the +// `json` struct tag. // // typesSeen should be a map passed through to make sure that recursive types // don't cause a stack overflow. -func getSortedUniqueFields(typ reflect.Type, path []string, typesSeen map[reflect.Type]struct{}) ([]string, error) { +func sortedFieldsWithTagUncached(typ reflect.Type, tagValue string, path []string, typesSeen map[reflect.Type]struct{}) ([]string, error) { // Handle pointer to struct if typ.Kind() == reflect.Ptr { typ = typ.Elem() @@ -93,7 +129,7 @@ func getSortedUniqueFields(typ reflect.Type, path []string, typesSeen map[reflec if riverTag, ok := field.Tag.Lookup("river"); ok { tags := strings.SplitSeq(riverTag, ",") for tag := range tags { - if strings.TrimSpace(tag) == "unique" { + if strings.TrimSpace(tag) == tagValue { hasUniqueTag = true } } @@ -108,7 +144,7 @@ func getSortedUniqueFields(typ reflect.Type, path []string, typesSeen map[reflec fullPath = append(path, uniqueName) //nolint:gocritic } - uniqueSubFields, err := getSortedUniqueFields(field.Type, fullPath, typesSeen) + uniqueSubFields, err := sortedFieldsWithTagUncached(field.Type, tagValue, fullPath, typesSeen) if err != nil { return nil, err } @@ -116,7 +152,7 @@ func getSortedUniqueFields(typ reflect.Type, path []string, typesSeen map[reflec if len(uniqueSubFields) > 0 { uniqueFields = append(uniqueFields, uniqueSubFields...) } else if hasUniqueTag { - // If a struct field is marked `river:"unique"`, use its entire + // If a struct field is marked `river:""`, use its entire // JSON serialization as a unique value. This may not be the // greatest idea practically, but keeping it in place for // backwards compatibility. @@ -137,33 +173,6 @@ func getSortedUniqueFields(typ reflect.Type, path []string, typesSeen map[reflec return uniqueFields, nil } -// getSortedUniqueFieldsCached retrieves unique fields with caching to avoid -// extracting fields from the same struct type repeatedly. -func getSortedUniqueFieldsCached(args rivertype.JobArgs) ([]string, error) { - typ := reflect.TypeOf(args) - - // Check cache first - cacheMutex.RLock() - if fields, ok := uniqueFieldsCache[typ]; ok { - cacheMutex.RUnlock() - return fields, nil - } - cacheMutex.RUnlock() - - // Not in cache; retrieve using reflection - fields, err := getSortedUniqueFields(reflect.TypeOf(args), nil, make(map[reflect.Type]struct{})) - if err != nil { - return nil, err - } - - // Store in cache - cacheMutex.Lock() - uniqueFieldsCache[typ] = fields - cacheMutex.Unlock() - - return fields, nil -} - // parseJSONTag extracts the JSON key from the struct tag. // It handles tags with options, e.g., `json:"recipient,omitempty"`. func parseJSONTag(tag string) string { diff --git a/rivershared/structtag/struct_tag_test.go b/rivershared/structtag/struct_tag_test.go new file mode 100644 index 00000000..3a49c574 --- /dev/null +++ b/rivershared/structtag/struct_tag_test.go @@ -0,0 +1,235 @@ +package structtag + +import ( + "testing" + + "github.com/stretchr/testify/require" + + "github.com/riverqueue/river/rivershared/util/testutil" + "github.com/riverqueue/river/rivertype" +) + +func TestExtractValues(t *testing.T) { + t.Parallel() + + testCases := []struct { + name string + encodedArgs []byte + uniqueKeys []string + expectedValues []string + }{ + { + name: "SimpleStringFields", + encodedArgs: []byte(`{"name":"alice","email":"alice@example.com","age":30}`), + uniqueKeys: []string{"name", "email"}, + expectedValues: []string{`"alice"`, `"alice@example.com"`}, + }, + { + name: "NumericField", + encodedArgs: []byte(`{"name":"bob","count":42}`), + uniqueKeys: []string{"count"}, + expectedValues: []string{"42"}, + }, + { + name: "BooleanField", + encodedArgs: []byte(`{"active":true,"disabled":false}`), + uniqueKeys: []string{"active", "disabled"}, + expectedValues: []string{"true", "false"}, + }, + { + name: "NestedField", + encodedArgs: []byte(`{"user":{"name":"charlie","address":{"city":"NYC"}}}`), + uniqueKeys: []string{"user.name", "user.address.city"}, + expectedValues: []string{`"charlie"`, `"NYC"`}, + }, + { + name: "MissingField", + encodedArgs: []byte(`{"name":"dave"}`), + uniqueKeys: []string{"name", "missing"}, + expectedValues: []string{`"dave"`, "undefined"}, + }, + { + name: "NullValue", + encodedArgs: []byte(`{"name":null}`), + uniqueKeys: []string{"name"}, + expectedValues: []string{"null"}, + }, + { + name: "ObjectValue", + encodedArgs: []byte(`{"user":{"id":1,"name":"eve"}}`), + uniqueKeys: []string{"user"}, + expectedValues: []string{`{"id":1,"name":"eve"}`}, + }, + { + name: "ArrayValue", + encodedArgs: []byte(`{"tags":["a","b","c"]}`), + uniqueKeys: []string{"tags"}, + expectedValues: []string{`["a","b","c"]`}, + }, + { + name: "EmptyKeys", + encodedArgs: []byte(`{"name":"frank"}`), + uniqueKeys: []string{}, + expectedValues: []string{}, + }, + { + name: "AllMissing", + encodedArgs: []byte(`{}`), + uniqueKeys: []string{"a", "b"}, + expectedValues: []string{"undefined", "undefined"}, + }, + } + + for _, tt := range testCases { + t.Run(tt.name, func(t *testing.T) { + t.Parallel() + + actualValues := ExtractValues(tt.encodedArgs, tt.uniqueKeys) + require.Equal(t, tt.expectedValues, actualValues) + }) + } +} + +func TestSortedFieldsWithTag(t *testing.T) { + t.Parallel() + + testCases := []struct { + name string + argsFunc func() rivertype.JobArgs + tagValue string + expectedFields []string + }{ + { + name: "SimpleUniqueFields", + argsFunc: func() rivertype.JobArgs { + type SimpleArgs struct { + testutil.JobArgsReflectKind[SimpleArgs] + + Name string `json:"name" river:"unique"` + Email string `json:"email" river:"unique"` + Age int `json:"age"` + } + + return SimpleArgs{} + }, + tagValue: "unique", + expectedFields: []string{"email", "name"}, + }, + { + name: "NestedStructWithUniqueFields", + argsFunc: func() rivertype.JobArgs { + type Inner struct { + FieldA string `json:"field_a" river:"unique"` + FieldB int `json:"field_b"` + } + + type NestedOuter struct { + testutil.JobArgsReflectKind[NestedOuter] + + InnerField Inner `json:"inner_field"` + FieldC string `json:"field_c" river:"unique"` + FieldD int `json:"field_d" river:"unique,otheroption"` + } + + return NestedOuter{} + }, + tagValue: "unique", + expectedFields: []string{"field_c", "field_d", "inner_field.field_a"}, + }, + { + name: "NoUniqueFields", + argsFunc: func() rivertype.JobArgs { + type NoUniqueArgs struct { + testutil.JobArgsReflectKind[NoUniqueArgs] + + Name string `json:"name"` + Age int `json:"age"` + } + + return NoUniqueArgs{} + }, + tagValue: "unique", + expectedFields: nil, + }, + { + name: "NoJSONTagUsesFieldName", + argsFunc: func() rivertype.JobArgs { + type NoJSONTagArgs struct { + testutil.JobArgsReflectKind[NoJSONTagArgs] + + Name string `river:"unique"` + } + + return NoJSONTagArgs{} + }, + tagValue: "unique", + expectedFields: []string{"Name"}, + }, + { + name: "DifferentTagValue", + argsFunc: func() rivertype.JobArgs { + type DifferentTagArgs struct { + testutil.JobArgsReflectKind[DifferentTagArgs] + + Field1 string `json:"field1" river:"special"` + Field2 string `json:"field2" river:"unique"` + } + + return DifferentTagArgs{} + }, + tagValue: "special", + expectedFields: []string{"field1"}, + }, + { + name: "AnonymousEmbeddedStruct", + argsFunc: func() rivertype.JobArgs { + type EmbeddedBase struct { + BaseField string `json:"base_field" river:"unique"` + } + + type WithEmbeddedArgs struct { + testutil.JobArgsReflectKind[WithEmbeddedArgs] + EmbeddedBase + + OwnField string `json:"own_field" river:"unique"` + } + + return WithEmbeddedArgs{} + }, + tagValue: "unique", + expectedFields: []string{"base_field", "own_field"}, + }, + { + name: "DeeplyNestedStruct", + argsFunc: func() rivertype.JobArgs { + type DeepNested struct { + Value string `json:"value" river:"unique"` + } + + type MiddleLevel struct { + Deep DeepNested `json:"deep"` + } + + type DeepNestedArgs struct { + testutil.JobArgsReflectKind[DeepNestedArgs] + + Middle MiddleLevel `json:"middle"` + } + + return DeepNestedArgs{} + }, + tagValue: "unique", + expectedFields: []string{"middle.deep.value"}, + }, + } + + for _, tt := range testCases { + t.Run(tt.name, func(t *testing.T) { + t.Parallel() + + actualFields, err := SortedFieldsWithTag(tt.argsFunc(), tt.tagValue) + require.NoError(t, err) + require.Equal(t, tt.expectedFields, actualFields) + }) + } +}