Skip to content

Commit a979478

Browse files
authored
grpc transport v2 (#167)
Signed-off-by: Wei Liu <[email protected]>
1 parent e18ff00 commit a979478

File tree

20 files changed

+1888
-93
lines changed

20 files changed

+1888
-93
lines changed

pkg/cloudevents/constants/constants.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,3 +4,7 @@ const (
44
ConfigTypeMQTT = "mqtt"
55
ConfigTypeGRPC = "grpc"
66
)
7+
8+
// GRPCSubscriptionIDKey is the key for the gRPC subscription ID.
9+
// This ID is generated by the gRPC server after the client subscribes to it.
10+
const GRPCSubscriptionIDKey = "subscription-id"

pkg/cloudevents/generic/clients/agentclient_test.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -72,7 +72,7 @@ func TestAgentResync(t *testing.T) {
7272

7373
go func() {
7474
transport := agent.(*CloudEventAgentClient[*generictesting.MockResource]).transport
75-
err = transport.Receive(ctx, func(event cloudevents.Event) {
75+
err = transport.Receive(ctx, func(ctx context.Context, event cloudevents.Event) {
7676
select {
7777
case eventChan <- receiveEvent{event: event}:
7878
case <-ctx.Done():
@@ -155,7 +155,7 @@ func TestAgentPublish(t *testing.T) {
155155
stop := make(chan bool)
156156
go func() {
157157
cloudEventsClient := agent.(*CloudEventAgentClient[*generictesting.MockResource]).transport
158-
err = cloudEventsClient.Receive(ctx, func(event cloudevents.Event) {
158+
err = cloudEventsClient.Receive(ctx, func(ctx context.Context, event cloudevents.Event) {
159159
select {
160160
case eventChan <- receiveEvent{event: event}:
161161
case <-ctx.Done():
@@ -325,7 +325,7 @@ func TestStatusResyncResponse(t *testing.T) {
325325

326326
go func() {
327327
cloudEventsClient := agent.(*CloudEventAgentClient[*generictesting.MockResource]).transport
328-
_ = cloudEventsClient.Receive(ctx, func(event cloudevents.Event) {
328+
_ = cloudEventsClient.Receive(ctx, func(ctx context.Context, event cloudevents.Event) {
329329
mutex.Lock()
330330
defer mutex.Unlock()
331331
receivedEvents = append(receivedEvents, event)

pkg/cloudevents/generic/clients/baseclient.go

Lines changed: 68 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -79,7 +79,7 @@ func (c *baseClient) connect(ctx context.Context) error {
7979
// TODO enhance the cloudevents SKD to avoid wrapping the error type to distinguish the net connection
8080
// errors
8181
if err != nil {
82-
// failed to reconnect, try agin
82+
// failed to reconnect, try again
8383
runtime.HandleErrorWithContext(ctx, err, "the cloudevents client reconnect failed")
8484
<-wait.RealTimer(DelayFn()).C()
8585
continue
@@ -89,14 +89,11 @@ func (c *baseClient) connect(ctx context.Context) error {
8989
metrics.IncreaseClientReconnectedCounter(c.clientID)
9090
c.setClientReady(true)
9191
c.sendReceiverSignal(restartReceiverSignal)
92-
c.sendReconnectedSignal()
9392
}
9493

9594
select {
9695
case <-ctx.Done():
97-
if c.receiverChan != nil {
98-
close(c.receiverChan)
99-
}
96+
c.closeChannels()
10097
return
10198
case err, ok := <-c.transport.ErrorChan():
10299
if !ok {
@@ -164,22 +161,52 @@ func (c *baseClient) subscribe(ctx context.Context, receive receiveFn) {
164161
return
165162
}
166163

164+
// send subscription request before starting to receive events
165+
if err := c.transport.Subscribe(ctx); err != nil {
166+
runtime.HandleErrorWithContext(ctx, err, "failed to subscribe")
167+
return
168+
}
169+
167170
c.receiverChan = make(chan int)
168171

169172
// start a go routine to handle cloudevents subscription
170173
go func() {
171-
receiverCtx, receiverCancel := context.WithCancel(context.TODO())
174+
receiverCtx, receiverCancel := context.WithCancel(ctx)
172175
startReceiving := true
176+
subscribed := true
173177

174178
for {
179+
if !subscribed {
180+
// resubscribe before restarting the receiver
181+
if err := c.transport.Subscribe(ctx); err != nil {
182+
if ctx.Err() != nil {
183+
receiverCancel()
184+
return
185+
}
186+
187+
runtime.HandleError(fmt.Errorf("failed to resubscribe, %v", err))
188+
select {
189+
case <-ctx.Done():
190+
receiverCancel()
191+
return
192+
case <-wait.RealTimer(DelayFn()).C():
193+
}
194+
continue
195+
}
196+
subscribed = true
197+
// notify the client caller to resync the resources
198+
c.sendReconnectedSignal(ctx)
199+
}
200+
175201
if startReceiving {
176202
go func() {
177-
if err := c.transport.Receive(receiverCtx, func(evt cloudevents.Event) {
203+
if err := c.transport.Receive(receiverCtx, func(ctx context.Context, evt cloudevents.Event) {
204+
logger := klog.FromContext(ctx)
178205
logger.V(2).Info("Received event", "event", evt.Context)
179206
if logger.V(5).Enabled() {
180207
logger.V(5).Info("Received event", "event", evt.String())
181208
}
182-
receive(receiverCtx, evt)
209+
receive(ctx, evt)
183210
}); err != nil {
184211
runtime.HandleError(fmt.Errorf("failed to receive cloudevents, %v", err))
185212
}
@@ -191,7 +218,7 @@ func (c *baseClient) subscribe(ctx context.Context, receive receiveFn) {
191218
case <-ctx.Done():
192219
receiverCancel()
193220
return
194-
case signal, ok := <-c.receiverChan:
221+
case signal, ok := <-c.getReceiverChan():
195222
if !ok {
196223
// receiver channel is closed, stop the receiver
197224
receiverCancel()
@@ -202,8 +229,9 @@ func (c *baseClient) subscribe(ctx context.Context, receive receiveFn) {
202229
case restartReceiverSignal:
203230
logger.V(2).Info("restart the cloudevents receiver")
204231
// rebuild the receiver context and restart receiving
205-
receiverCtx, receiverCancel = context.WithCancel(context.TODO())
232+
receiverCtx, receiverCancel = context.WithCancel(ctx)
206233
startReceiving = true
234+
subscribed = false
207235
case stopReceiverSignal:
208236
logger.V(2).Info("stop the cloudevents receiver")
209237
receiverCancel()
@@ -224,10 +252,32 @@ func (c *baseClient) sendReceiverSignal(signal int) {
224252
}
225253
}
226254

227-
func (c *baseClient) sendReconnectedSignal() {
255+
func (c *baseClient) closeChannels() {
256+
c.Lock()
257+
defer c.Unlock()
258+
259+
if c.receiverChan != nil {
260+
close(c.receiverChan)
261+
c.receiverChan = nil
262+
}
263+
if c.reconnectedChan != nil {
264+
close(c.reconnectedChan)
265+
c.reconnectedChan = nil
266+
}
267+
}
268+
269+
func (c *baseClient) sendReconnectedSignal(ctx context.Context) {
228270
c.RLock()
229271
defer c.RUnlock()
230-
c.reconnectedChan <- struct{}{}
272+
if c.reconnectedChan != nil {
273+
select {
274+
case c.reconnectedChan <- struct{}{}:
275+
// Signal sent successfully
276+
default:
277+
// No receiver listening on reconnectedChan, that's okay - don't block
278+
klog.FromContext(ctx).Info("reconnected signal not sent, no receiver listening")
279+
}
280+
}
231281
}
232282

233283
func (c *baseClient) isClientReady() bool {
@@ -241,3 +291,9 @@ func (c *baseClient) setClientReady(ready bool) {
241291
defer c.Unlock()
242292
c.clientReady = ready
243293
}
294+
295+
func (c *baseClient) getReceiverChan() chan int {
296+
c.RLock()
297+
defer c.RUnlock()
298+
return c.receiverChan
299+
}

pkg/cloudevents/generic/clients/sourceclient_test.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -61,7 +61,7 @@ func TestSourceResync(t *testing.T) {
6161
eventChan := make(chan receiveEvent)
6262
stop := make(chan bool)
6363
go func() {
64-
err = source.transport.Receive(ctx, func(event cloudevents.Event) {
64+
err = source.transport.Receive(ctx, func(ctx context.Context, event cloudevents.Event) {
6565
select {
6666
case eventChan <- receiveEvent{event: event}:
6767
case <-ctx.Done():
@@ -134,7 +134,7 @@ func TestSourcePublish(t *testing.T) {
134134
eventChan := make(chan receiveEvent)
135135
stop := make(chan bool)
136136
go func() {
137-
err = source.transport.Receive(ctx, func(event cloudevents.Event) {
137+
err = source.transport.Receive(ctx, func(ctx context.Context, event cloudevents.Event) {
138138
select {
139139
case eventChan <- receiveEvent{event: event}:
140140
case <-ctx.Done():
@@ -332,7 +332,7 @@ func TestSpecResyncResponse(t *testing.T) {
332332
stop := make(chan bool)
333333
mutex := &sync.Mutex{}
334334
go func() {
335-
_ = source.transport.Receive(ctx, func(event cloudevents.Event) {
335+
_ = source.transport.Receive(ctx, func(ctx context.Context, event cloudevents.Event) {
336336
mutex.Lock()
337337
defer mutex.Unlock()
338338
receivedEvents = append(receivedEvents, event)

pkg/cloudevents/generic/options/builder/optionsbuilder.go

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ import (
77
"open-cluster-management.io/sdk-go/pkg/cloudevents/generic/options"
88
"open-cluster-management.io/sdk-go/pkg/cloudevents/generic/options/grpc"
99
"open-cluster-management.io/sdk-go/pkg/cloudevents/generic/options/mqtt"
10+
grpcv2 "open-cluster-management.io/sdk-go/pkg/cloudevents/generic/options/v2/grpc"
1011
"open-cluster-management.io/sdk-go/pkg/cloudevents/generic/types"
1112
)
1213

@@ -57,7 +58,7 @@ func BuildCloudEventsSourceOptions(config any,
5758
case *mqtt.MQTTOptions:
5859
return mqtt.NewSourceOptions(config, clientId, sourceId), nil
5960
case *grpc.GRPCOptions:
60-
return grpc.NewSourceOptions(config, sourceId, dataType), nil
61+
return grpcv2.NewSourceOptions(config, sourceId, dataType), nil
6162
default:
6263
return nil, fmt.Errorf("unsupported client configuration type %T", config)
6364
}
@@ -70,7 +71,7 @@ func BuildCloudEventsAgentOptions(config any,
7071
case *mqtt.MQTTOptions:
7172
return mqtt.NewAgentOptions(config, clusterName, clientId), nil
7273
case *grpc.GRPCOptions:
73-
return grpc.NewAgentOptions(config, clusterName, clientId, dataType), nil
74+
return grpcv2.NewAgentOptions(config, clusterName, clientId, dataType), nil
7475
default:
7576
return nil, fmt.Errorf("unsupported client configuration type %T", config)
7677
}

pkg/cloudevents/generic/options/builder/optionsbuilder_test.go

Lines changed: 11 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,8 @@
11
package builder
22

33
import (
4-
"encoding/json"
54
"os"
6-
"strings"
5+
"reflect"
76
"testing"
87
"time"
98

@@ -31,10 +30,11 @@ url: grpc
3130
)
3231

3332
type buildingCloudEventsOptionTestCase struct {
34-
name string
35-
configType string
36-
configFile *os.File
37-
expectedOptions any
33+
name string
34+
configType string
35+
configFile *os.File
36+
expectedOptions any
37+
expectedTransportType string
3838
}
3939

4040
func TestBuildCloudEventsSourceOptions(t *testing.T) {
@@ -56,6 +56,7 @@ func TestBuildCloudEventsSourceOptions(t *testing.T) {
5656
Timeout: 60 * time.Second,
5757
},
5858
},
59+
expectedTransportType: "*mqtt.mqttSourceTransport",
5960
},
6061
{
6162
name: "grpc config",
@@ -72,6 +73,7 @@ func TestBuildCloudEventsSourceOptions(t *testing.T) {
7273
},
7374
},
7475
},
76+
expectedTransportType: "*grpc.grpcTransport",
7577
},
7678
}
7779

@@ -107,10 +109,9 @@ func assertOptions(t *testing.T, c buildingCloudEventsOptionTestCase) {
107109
t.Errorf("unexpected error %v", err)
108110
}
109111

110-
optionsRaw, _ := json.Marshal(options)
111-
expectedRaw, _ := json.Marshal(c.expectedOptions)
112+
tt := reflect.TypeOf(options.CloudEventsTransport)
112113

113-
if !strings.Contains(string(optionsRaw), string(expectedRaw)) {
114-
t.Errorf("the results %v\n does not contain the original options %v\n", string(optionsRaw), string(expectedRaw))
114+
if tt.String() != c.expectedTransportType {
115+
t.Errorf("expected %s, but got %s", c.expectedTransportType, tt)
115116
}
116117
}

pkg/cloudevents/generic/options/fake/fakeoptions.go

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,10 @@ func (c *EventChan) Connect(ctx context.Context) error {
4343
return nil
4444
}
4545

46+
func (c *EventChan) Subscribe(ctx context.Context) error {
47+
return nil
48+
}
49+
4650
func (c *EventChan) Send(ctx context.Context, evt cloudevents.Event) error {
4751
select {
4852
case c.evtChan <- evt:
@@ -61,7 +65,7 @@ func (c *EventChan) Receive(ctx context.Context, fn options.ReceiveHandlerFn) er
6165
if !ok {
6266
return nil
6367
}
64-
fn(e)
68+
fn(ctx, e)
6569
case <-ctx.Done():
6670
return ctx.Err()
6771
}

pkg/cloudevents/generic/options/grpc/agentoptions.go

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ type grpcAgentTransport struct {
2020
dataType types.CloudEventsDataType
2121
}
2222

23+
// Deprecated: use v2.grpc.NewAgentOptions instead
2324
func NewAgentOptions(grpcOptions *GRPCOptions,
2425
clusterName, agentID string, dataType types.CloudEventsDataType) *options.CloudEventsAgentOptions {
2526
return &options.CloudEventsAgentOptions{
@@ -82,6 +83,12 @@ func (o *grpcAgentTransport) Send(ctx context.Context, evt cloudevents.Event) er
8283
return nil
8384
}
8485

86+
func (o *grpcAgentTransport) Subscribe(ctx context.Context) error {
87+
// Subscription is handled by the cloudevents client during receiver startup.
88+
// No action needed here.
89+
return nil
90+
}
91+
8592
func (o *grpcAgentTransport) Receive(ctx context.Context, fn options.ReceiveHandlerFn) error {
8693
return o.cloudEventsClient.StartReceiver(ctx, fn)
8794
}

pkg/cloudevents/generic/options/grpc/sourceoptions.go

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -78,6 +78,12 @@ func (o *gRPCSourceTransport) Send(ctx context.Context, evt cloudevents.Event) e
7878
return nil
7979
}
8080

81+
func (o *gRPCSourceTransport) Subscribe(ctx context.Context) error {
82+
// Subscription is handled by the cloudevents client during receiver startup.
83+
// No action needed here.
84+
return nil
85+
}
86+
8187
func (o *gRPCSourceTransport) Receive(ctx context.Context, fn options.ReceiveHandlerFn) error {
8288
return o.cloudEventsClient.StartReceiver(ctx, fn)
8389
}

pkg/cloudevents/generic/options/mqtt/agentoptions.go

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -138,6 +138,14 @@ func (o *mqttAgentTransport) Send(ctx context.Context, evt cloudevents.Event) er
138138
return nil
139139
}
140140

141+
func (o *mqttAgentTransport) Subscribe(ctx context.Context) error {
142+
// Subscription is handled by the cloudevents client during receiver startup.
143+
// No action needed here.
144+
// TODO: consider implementing native subscription logic in v2 to decouple from
145+
// the CloudEvents SDK.
146+
return nil
147+
}
148+
141149
func (o *mqttAgentTransport) Receive(ctx context.Context, fn options.ReceiveHandlerFn) error {
142150
return o.cloudEventsClient.StartReceiver(ctx, fn)
143151
}

0 commit comments

Comments
 (0)