Skip to content

Commit 425ed66

Browse files
Copilotaepfli
andcommitted
Add flagd-selector header support to in-process service with backward compatibility
Co-authored-by: aepfli <[email protected]>
1 parent e3b6e66 commit 425ed66

File tree

3 files changed

+214
-2
lines changed

3 files changed

+214
-2
lines changed

providers/flagd/README.md

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -189,6 +189,25 @@ The flagd provider currently support following flag evaluation metadata,
189189
| `scope` | string | "selector" set for the associated source in flagd |
190190
| `providerID` | string | "providerID" set for the associated source in flagd |
191191

192+
## Selector Handling
193+
194+
When using the in-process resolver with a gRPC sync source, the provider supports filtering flag configurations using a selector. The selector can be configured using the `WithSelector` option or the `FLAGD_SOURCE_SELECTOR` environment variable.
195+
196+
### Header-based Selector (Recommended)
197+
198+
The provider now sends the selector as a `flagd-selector` gRPC metadata header when communicating with flagd sync services. This approach is consistent with how selectors are handled across all flagd services (sync, evaluation, and OFREP).
199+
200+
```go
201+
provider, err := flagd.NewProvider(
202+
flagd.WithInProcessResolver(),
203+
flagd.WithSelector("source=database,app=myapp"),
204+
)
205+
```
206+
207+
### Backward Compatibility
208+
209+
For backward compatibility with older flagd versions, the provider continues to include the selector in the gRPC request fields alongside the header. This dual approach ensures compatibility during the migration period until all flagd instances are updated.
210+
192211
## Logging
193212

194213
If not configured, logging falls back to the standard Go log package at error level only.

providers/flagd/pkg/service/in_process/service.go

Lines changed: 70 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ import (
99

1010
"go.uber.org/zap"
1111
googlegrpc "google.golang.org/grpc"
12+
"google.golang.org/grpc/metadata"
1213

1314
"github.com/open-feature/flagd/core/pkg/evaluator"
1415
"github.com/open-feature/flagd/core/pkg/logger"
@@ -320,14 +321,47 @@ func makeSyncProvider(cfg Configuration, log *logger.Logger) (sync.ISync, string
320321

321322
log.Info("operating in in-process mode with flags sourced from " + uri)
322323

324+
// Prepare gRPC dial options with selector interceptors
325+
var dialOptions []googlegrpc.DialOption
326+
327+
if len(cfg.GrpcDialOptionsOverride) > 0 {
328+
// User has provided custom dial options
329+
dialOptions = cfg.GrpcDialOptionsOverride
330+
331+
// Add interceptors to inject flagd-selector header if selector is configured
332+
if cfg.Selector != "" {
333+
dialOptions = append(dialOptions,
334+
googlegrpc.WithChainUnaryInterceptor(selectorUnaryInterceptor(cfg.Selector)),
335+
googlegrpc.WithChainStreamInterceptor(selectorStreamInterceptor(cfg.Selector)),
336+
)
337+
}
338+
} else if cfg.Selector != "" {
339+
// No custom dial options, but we need to add selector interceptors
340+
// Build the dial options including credentials and interceptors
341+
credBuilder := &credentials.CredentialBuilder{}
342+
tCredentials, err := credBuilder.Build(cfg.TLSEnabled, cfg.CertificatePath)
343+
if err != nil {
344+
log.Error("error building transport credentials, falling back to using Selector field only", zap.Error(err))
345+
// Fall back to not using GrpcDialOptionsOverride, which means only the Selector field will be used
346+
dialOptions = nil
347+
} else {
348+
dialOptions = []googlegrpc.DialOption{
349+
googlegrpc.WithTransportCredentials(tCredentials),
350+
googlegrpc.WithChainUnaryInterceptor(selectorUnaryInterceptor(cfg.Selector)),
351+
googlegrpc.WithChainStreamInterceptor(selectorStreamInterceptor(cfg.Selector)),
352+
}
353+
}
354+
}
355+
// else: No selector and no custom options, let grpc.Sync handle everything
356+
323357
return &grpc.Sync{
324358
CredentialBuilder: &credentials.CredentialBuilder{},
325-
GrpcDialOptionsOverride: cfg.GrpcDialOptionsOverride,
359+
GrpcDialOptionsOverride: dialOptions,
326360
Logger: log,
327361
Secure: cfg.TLSEnabled,
328362
CertPath: cfg.CertificatePath,
329363
ProviderID: cfg.ProviderID,
330-
Selector: cfg.Selector,
364+
Selector: cfg.Selector, // Keep for backward compatibility
331365
URI: uri,
332366
}, uri
333367
}
@@ -352,3 +386,37 @@ func isValidTargetScheme(targetUri string) bool {
352386
regx := regexp.MustCompile("^" + grpc.SupportedScheme)
353387
return regx.Match([]byte(targetUri))
354388
}
389+
390+
// selectorUnaryInterceptor adds the flagd-selector metadata header to unary gRPC calls
391+
func selectorUnaryInterceptor(selector string) googlegrpc.UnaryClientInterceptor {
392+
return func(
393+
ctx context.Context,
394+
method string,
395+
req, reply interface{},
396+
cc *googlegrpc.ClientConn,
397+
invoker googlegrpc.UnaryInvoker,
398+
opts ...googlegrpc.CallOption,
399+
) error {
400+
if selector != "" {
401+
ctx = metadata.AppendToOutgoingContext(ctx, "flagd-selector", selector)
402+
}
403+
return invoker(ctx, method, req, reply, cc, opts...)
404+
}
405+
}
406+
407+
// selectorStreamInterceptor adds the flagd-selector metadata header to streaming gRPC calls
408+
func selectorStreamInterceptor(selector string) googlegrpc.StreamClientInterceptor {
409+
return func(
410+
ctx context.Context,
411+
desc *googlegrpc.StreamDesc,
412+
cc *googlegrpc.ClientConn,
413+
method string,
414+
streamer googlegrpc.Streamer,
415+
opts ...googlegrpc.CallOption,
416+
) (googlegrpc.ClientStream, error) {
417+
if selector != "" {
418+
ctx = metadata.AppendToOutgoingContext(ctx, "flagd-selector", selector)
419+
}
420+
return streamer(ctx, desc, cc, method, opts...)
421+
}
422+
}
Lines changed: 125 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,125 @@
1+
package process
2+
3+
import (
4+
v1 "buf.build/gen/go/open-feature/flagd/protocolbuffers/go/flagd/sync/v1"
5+
"buf.build/gen/go/open-feature/flagd/grpc/go/flagd/sync/v1/syncv1grpc"
6+
"context"
7+
"fmt"
8+
"google.golang.org/grpc"
9+
"google.golang.org/grpc/metadata"
10+
"net"
11+
"testing"
12+
"time"
13+
)
14+
15+
// Test that the flagd-selector header is sent in gRPC metadata
16+
func TestSelectorHeaderIsSent(t *testing.T) {
17+
// given
18+
host := "localhost"
19+
port := 8091
20+
selector := "source=test,app=selector-test"
21+
headerReceived := make(chan string, 1)
22+
23+
listen, err := net.Listen("tcp", fmt.Sprintf("%s:%d", host, port))
24+
if err != nil {
25+
t.Fatal(err)
26+
}
27+
28+
// Mock server that captures the flagd-selector header
29+
mockServer := &selectorHeaderCapturingServer{
30+
listener: listen,
31+
headerReceived: headerReceived,
32+
mockResponse: &v1.SyncFlagsResponse{
33+
FlagConfiguration: flagRsp,
34+
},
35+
}
36+
37+
inProcessService := NewInProcessService(Configuration{
38+
Host: host,
39+
Port: port,
40+
Selector: selector,
41+
TLSEnabled: false,
42+
})
43+
44+
// when
45+
go func() {
46+
server := grpc.NewServer()
47+
syncv1grpc.RegisterFlagSyncServiceServer(server, mockServer)
48+
if err := server.Serve(mockServer.listener); err != nil {
49+
t.Logf("Server exited with error: %v", err)
50+
}
51+
}()
52+
53+
// Initialize service
54+
err = inProcessService.Init()
55+
if err != nil {
56+
t.Fatal(err)
57+
}
58+
59+
// then - verify that the flagd-selector header was sent
60+
select {
61+
case receivedSelector := <-headerReceived:
62+
if receivedSelector != selector {
63+
t.Fatalf("Expected selector header to be %q, but got %q", selector, receivedSelector)
64+
}
65+
case <-time.After(3 * time.Second):
66+
t.Fatal("Timeout waiting for flagd-selector header to be received")
67+
}
68+
69+
inProcessService.Shutdown()
70+
}
71+
72+
// Mock server that captures the flagd-selector header from incoming requests
73+
type selectorHeaderCapturingServer struct {
74+
listener net.Listener
75+
headerReceived chan string
76+
mockResponse *v1.SyncFlagsResponse
77+
}
78+
79+
func (s *selectorHeaderCapturingServer) SyncFlags(req *v1.SyncFlagsRequest, stream syncv1grpc.FlagSyncService_SyncFlagsServer) error {
80+
// Extract metadata from context
81+
md, ok := metadata.FromIncomingContext(stream.Context())
82+
if ok {
83+
// Check for flagd-selector header
84+
if values := md.Get("flagd-selector"); len(values) > 0 {
85+
s.headerReceived <- values[0]
86+
} else {
87+
s.headerReceived <- ""
88+
}
89+
} else {
90+
s.headerReceived <- ""
91+
}
92+
93+
// Send mock response
94+
err := stream.Send(s.mockResponse)
95+
if err != nil {
96+
return err
97+
}
98+
99+
// Keep stream open for a bit
100+
time.Sleep(1 * time.Second)
101+
return nil
102+
}
103+
104+
func (s *selectorHeaderCapturingServer) FetchAllFlags(ctx context.Context, req *v1.FetchAllFlagsRequest) (*v1.FetchAllFlagsResponse, error) {
105+
// Extract metadata from context
106+
md, ok := metadata.FromIncomingContext(ctx)
107+
if ok {
108+
// Check for flagd-selector header
109+
if values := md.Get("flagd-selector"); len(values) > 0 {
110+
s.headerReceived <- values[0]
111+
} else {
112+
s.headerReceived <- ""
113+
}
114+
} else {
115+
s.headerReceived <- ""
116+
}
117+
118+
return &v1.FetchAllFlagsResponse{
119+
FlagConfiguration: flagRsp,
120+
}, nil
121+
}
122+
123+
func (s *selectorHeaderCapturingServer) GetMetadata(ctx context.Context, req *v1.GetMetadataRequest) (*v1.GetMetadataResponse, error) {
124+
return &v1.GetMetadataResponse{}, nil
125+
}

0 commit comments

Comments
 (0)