11package pubsub
22
33import (
4- "context"
5- "fmt"
6-
7- "cloud.google.com/go/pubsub/v2"
8- cloudevents "github.com/cloudevents/sdk-go/v2"
9- "google.golang.org/api/option"
10- "google.golang.org/grpc"
11- "google.golang.org/grpc/credentials/insecure"
12- "k8s.io/klog/v2"
13-
144 "open-cluster-management.io/sdk-go/pkg/cloudevents/generic/options"
15- "open-cluster-management.io/sdk-go/pkg/cloudevents/generic/types"
165)
176
18- var _ options.CloudEventTransport = & pubsubAgentTransport {}
19-
20- // pubsubAgentTransport is a CloudEventTransport implementation for Pub/Sub for agents.
21- type pubsubAgentTransport struct {
22- PubSubOptions
23- clusterName string
24- client * pubsub.Client
25- // Publisher for status updates
26- publisher * pubsub.Publisher
27- // Publisher for resync broadcasts
28- resyncPublisher * pubsub.Publisher
29- // Subscriber for spec updates
30- subscriber * pubsub.Subscriber
31- // Subscriber for resync broadcasts
32- resyncSubscriber * pubsub.Subscriber
33- // TODO: handle error channel
34- errorChan chan error
35- }
36-
377// NewAgentOptions creates a new CloudEventsAgentOptions for Pub/Sub.
388func NewAgentOptions (pubsubOptions * PubSubOptions ,
399 clusterName , agentID string ) * options.CloudEventsAgentOptions {
4010 return & options.CloudEventsAgentOptions {
41- CloudEventsTransport : & pubsubAgentTransport {
11+ CloudEventsTransport : & pubsubTransport {
4212 PubSubOptions : * pubsubOptions ,
4313 clusterName : clusterName ,
4414 errorChan : make (chan error ),
@@ -47,119 +17,3 @@ func NewAgentOptions(pubsubOptions *PubSubOptions,
4717 ClusterName : clusterName ,
4818 }
4919}
50-
51- func (o * pubsubAgentTransport ) Connect (ctx context.Context ) error {
52- options := []option.ClientOption {}
53- if o .CredentialsFile != "" {
54- options = append (options , option .WithCredentialsFile (o .CredentialsFile ))
55- }
56- if o .Endpoint != "" {
57- options = append (options , option .WithEndpoint (o .Endpoint ))
58- if o .CredentialsFile == "" {
59- // use the insecure connection for local development/testing when no credentials are provided
60- pubsubConn , err := grpc .NewClient (o .Endpoint , grpc .WithTransportCredentials (insecure .NewCredentials ()))
61- if err != nil {
62- return err
63- }
64- options = append (options , option .WithGRPCConn (pubsubConn ))
65- }
66- }
67-
68- client , err := pubsub .NewClient (ctx , o .ProjectID , options ... )
69- if err != nil {
70- return err
71- }
72-
73- // initialize pubsub client, publisher and subscriber
74- o .client = client
75- o .publisher = client .Publisher (o .Topics .AgentEvents )
76- o .resyncPublisher = client .Publisher (o .Topics .AgentBroadcast )
77- o .subscriber = client .Subscriber (o .Subscriptions .SourceEvents )
78- o .resyncSubscriber = client .Subscriber (o .Subscriptions .SourceBroadcast )
79-
80- return nil
81- }
82-
83- func (o * pubsubAgentTransport ) Send (ctx context.Context , evt cloudevents.Event ) error {
84- msg , err := Encode (evt )
85- if err != nil {
86- return err
87- }
88-
89- eventType , err := types .ParseCloudEventsType (evt .Context .GetType ())
90- if err != nil {
91- return fmt .Errorf ("unsupported event type %s, %v" , eventType , err )
92- }
93-
94- // determine publisher based on event type
95- var result * pubsub.PublishResult
96- if eventType .Action == types .ResyncRequestAction {
97- result = o .resyncPublisher .Publish (ctx , msg )
98- } else {
99- result = o .publisher .Publish (ctx , msg )
100- }
101-
102- // block until the result is returned
103- _ , err = result .Get (ctx )
104- return err
105- }
106-
107- func (o * pubsubAgentTransport ) Receive (ctx context.Context , fn options.ReceiveHandlerFn ) error {
108- // create error channels for both subscribers
109- subscriberErrChan := make (chan error , 1 )
110- resyncSubscriberErrChan := make (chan error , 1 )
111-
112- // start the subscriber for spec updates
113- go func () {
114- err := o .subscriber .Receive (ctx , func (ctx context.Context , msg * pubsub.Message ) {
115- evt , err := Decode (msg )
116- if err != nil {
117- // also send ACK on decode error since redelivery won't fix it.
118- klog .Errorf ("failed to decode pubsub message to resource spec event: %v" , err )
119- } else {
120- fn (evt )
121- }
122- // send ACK after all receiver handlers complete.
123- msg .Ack ()
124- })
125- if err != nil {
126- subscriberErrChan <- fmt .Errorf ("subscriber is interrupted by error: %w" , err )
127- }
128- }()
129-
130- // start the resync subscriber for broadcast messages
131- go func () {
132- err := o .resyncSubscriber .Receive (ctx , func (ctx context.Context , msg * pubsub.Message ) {
133- evt , err := Decode (msg )
134- if err != nil {
135- // also send ACK on decode error since redelivery won't fix it.
136- klog .Errorf ("failed to decode pubsub message to resource statusresync event: %v" , err )
137- } else {
138- fn (evt )
139- }
140- // send ACK after all receiver handlers complete.
141- msg .Ack ()
142- })
143- if err != nil {
144- resyncSubscriberErrChan <- fmt .Errorf ("resync subscriber is interrupted by error: %w" , err )
145- }
146- }()
147-
148- // wait for either subscriber to error or context cancellation
149- select {
150- case err := <- subscriberErrChan :
151- return err
152- case err := <- resyncSubscriberErrChan :
153- return err
154- case <- ctx .Done ():
155- return ctx .Err ()
156- }
157- }
158-
159- func (o * pubsubAgentTransport ) Close (ctx context.Context ) error {
160- return o .client .Close ()
161- }
162-
163- func (o * pubsubAgentTransport ) ErrorChan () <- chan error {
164- return o .errorChan
165- }
0 commit comments