@@ -28,7 +28,6 @@ import (
2828 igrpclog "google.golang.org/grpc/internal/grpclog"
2929 "google.golang.org/grpc/internal/xds/clients"
3030 "google.golang.org/grpc/internal/xds/clients/internal/backoff"
31- "google.golang.org/grpc/internal/xds/clients/internal/buffer"
3231 "google.golang.org/grpc/internal/xds/clients/internal/pretty"
3332 "google.golang.org/grpc/internal/xds/clients/xdsclient/internal/xdsresource"
3433
@@ -48,6 +47,13 @@ const (
4847 perRPCVerbosityLevel = 9
4948)
5049
50+ // request represents a queued request message to be sent on the ADS stream. It
51+ // contains the type of the resource and the list of resource names to be sent.
52+ type request struct {
53+ typ ResourceType
54+ resourceNames []string
55+ }
56+
5157// response represents a response received on the ADS stream. It contains the
5258// type URL, version, and resources for the response.
5359type response struct {
@@ -76,9 +82,7 @@ type adsStreamEventHandler interface {
7682type resourceTypeState struct {
7783 version string // Last acked version. Should not be reset when the stream breaks.
7884 nonce string // Last received nonce. Should be reset when the stream breaks.
79- bufferedRequests chan struct {} // Channel to buffer requests when writing is blocked.
8085 subscribedResources map [string ]* xdsresource.ResourceWatchState // Map of subscribed resource names to their state.
81- pendingWrite bool // True if there is a pending write for this resource type.
8286}
8387
8488// adsStreamImpl provides the functionality associated with an ADS (Aggregated
@@ -99,15 +103,16 @@ type adsStreamImpl struct {
99103 // The following fields are initialized in the constructor and are not
100104 // written to afterwards, and hence can be accessed without a mutex.
101105 streamCh chan clients.Stream // New ADS streams are pushed here.
102- requestCh * buffer.Unbounded // Subscriptions and unsubscriptions are pushed here.
103106 runnerDoneCh chan struct {} // Notify completion of runner goroutine.
104107 cancel context.CancelFunc // To cancel the context passed to the runner goroutine.
105108 fc * adsFlowControl // Flow control for ADS stream.
109+ notifySender chan struct {} // To notify the sending goroutine of a pending request.
106110
107111 // Guards access to the below fields (and to the contents of the map).
108112 mu sync.Mutex
109113 resourceTypeState map [ResourceType ]* resourceTypeState // Map of resource types to their state.
110114 firstRequest bool // False after the first request is sent out.
115+ pendingRequests []request // Subscriptions and unsubscriptions are pushed here.
111116}
112117
113118// adsStreamOpts contains the options for creating a new ADS Stream.
@@ -132,9 +137,9 @@ func newADSStreamImpl(opts adsStreamOpts) *adsStreamImpl {
132137 watchExpiryTimeout : opts .watchExpiryTimeout ,
133138
134139 streamCh : make (chan clients.Stream , 1 ),
135- requestCh : buffer .NewUnbounded (),
136140 runnerDoneCh : make (chan struct {}),
137141 fc : newADSFlowControl (),
142+ notifySender : make (chan struct {}, 1 ),
138143 resourceTypeState : make (map [ResourceType ]* resourceTypeState ),
139144 }
140145
@@ -151,76 +156,79 @@ func newADSStreamImpl(opts adsStreamOpts) *adsStreamImpl {
151156func (s * adsStreamImpl ) Stop () {
152157 s .cancel ()
153158 s .fc .stop ()
154- s .requestCh .Close ()
155159 <- s .runnerDoneCh
156160 s .logger .Infof ("Shutdown ADS stream" )
157161}
158162
159163// subscribe subscribes to the given resource. It is assumed that multiple
160164// subscriptions for the same resource is deduped at the caller. A discovery
161- // request is sent out on the underlying stream for the resource type when there
162- // is sufficient flow control quota .
165+ // request is sent out on the underlying stream, for the resource type with the
166+ // newly subscribed resource .
163167func (s * adsStreamImpl ) subscribe (typ ResourceType , name string ) {
164168 if s .logger .V (2 ) {
165169 s .logger .Infof ("Subscribing to resource %q of type %q" , name , typ .TypeName )
166170 }
167171
168172 s .mu .Lock ()
169- defer s .mu .Unlock ()
170-
171173 state , ok := s .resourceTypeState [typ ]
172174 if ! ok {
173175 // An entry in the type state map is created as part of the first
174176 // subscription request for this type.
175- state = & resourceTypeState {
176- subscribedResources : make (map [string ]* xdsresource.ResourceWatchState ),
177- bufferedRequests : make (chan struct {}, 1 ),
178- }
177+ state = & resourceTypeState {subscribedResources : make (map [string ]* xdsresource.ResourceWatchState )}
179178 s .resourceTypeState [typ ] = state
180179 }
181180
182181 // Create state for the newly subscribed resource. The watch timer will
183182 // be started when a request for this resource is actually sent out.
184183 state .subscribedResources [name ] = & xdsresource.ResourceWatchState {State : xdsresource .ResourceWatchStateStarted }
185- state .pendingWrite = true
186184
187185 // Send a request for the resource type with updated subscriptions.
188- s .requestCh .Put (typ )
186+ s .pendingRequests = append (s .pendingRequests , request {typ : typ , resourceNames : resourceNames (state .subscribedResources )})
187+ s .mu .Unlock ()
188+
189+ select {
190+ case s .notifySender <- struct {}{}:
191+ default :
192+ }
189193}
190194
191- // Unsubscribe cancels the subscription to the given resource. It is a no-op if
195+ // unsubscribe cancels the subscription to the given resource. It is a no-op if
192196// the given resource does not exist. The watch expiry timer associated with the
193197// resource is stopped if one is active. A discovery request is sent out on the
194- // stream for the resource type when there is sufficient flow control quota .
195- func (s * adsStreamImpl ) Unsubscribe (typ ResourceType , name string ) {
198+ // stream for the resource type with the updated set of resource names .
199+ func (s * adsStreamImpl ) unsubscribe (typ ResourceType , name string ) {
196200 if s .logger .V (2 ) {
197201 s .logger .Infof ("Unsubscribing to resource %q of type %q" , name , typ .TypeName )
198202 }
199203
200204 s .mu .Lock ()
201- defer s .mu .Unlock ()
202-
203205 state , ok := s .resourceTypeState [typ ]
204206 if ! ok {
207+ s .mu .Unlock ()
205208 return
206209 }
207-
208210 rs , ok := state .subscribedResources [name ]
209211 if ! ok {
212+ s .mu .Unlock ()
210213 return
211214 }
212215 if rs .ExpiryTimer != nil {
213216 rs .ExpiryTimer .Stop ()
214217 }
215218 delete (state .subscribedResources , name )
216- state .pendingWrite = true
217219
218220 // Send a request for the resource type with updated subscriptions.
219- s .requestCh .Put (typ )
221+ s .pendingRequests = append (s .pendingRequests , request {typ : typ , resourceNames : resourceNames (state .subscribedResources )})
222+ s .mu .Unlock ()
223+
224+ select {
225+ case s .notifySender <- struct {}{}:
226+ default :
227+ }
220228}
221229
222230// runner is a long-running goroutine that handles the lifecycle of the ADS
223- // stream. It spwans another goroutine to handle writes of discovery request
231+ // stream. It spawns another goroutine to handle writes of discovery request
224232// messages on the stream. Whenever an existing stream fails, it performs
225233// exponential backoff (if no messages were received on that stream) before
226234// creating a new stream.
@@ -280,53 +288,44 @@ func (s *adsStreamImpl) send(ctx context.Context) {
280288 stream = nil
281289 continue
282290 }
283- case req , ok := <- s .requestCh .Get ():
284- if ! ok {
285- return
291+ case <- s .notifySender :
292+ // If there's no stream yet, skip the request. This request will be resent
293+ // when a new stream is created. If no stream is created, the watcher will
294+ // time out (same as server not sending response back).
295+ if stream == nil {
296+ continue
286297 }
287- s .requestCh .Load ()
288298
289- typ := req .(ResourceType )
290- if err := s .sendNew (stream , typ ); err != nil {
299+ // Resetting the pendingRequests slice to nil works for both cases:
300+ // - When we successfully send the requests out on the wire.
301+ // - When sending fails. This can happen only when the stream fails,
302+ // and in this case, we rely on `sendExisting` to send out
303+ // requests for all subscriptions when the stream is recreated.
304+ s .mu .Lock ()
305+ if err := s .sendNewLocked (stream , s .pendingRequests ); err != nil {
291306 stream = nil
292- continue
293307 }
308+ s .pendingRequests = nil
309+ s .mu .Unlock ()
294310 }
295311 }
296312}
297313
298- // sendNew attempts to send a discovery request based on a new subscription or
299- // unsubscription. If there is no flow control quota, the request is buffered
300- // and will be sent later. This method also starts the watch expiry timer for
301- // resources that were sent in the request for the first time, i.e. their watch
302- // state is `watchStateStarted`.
303- func (s * adsStreamImpl ) sendNew (stream clients.Stream , typ ResourceType ) error {
304- s .mu .Lock ()
305- defer s .mu .Unlock ()
306-
307- // If there's no stream yet, skip the request. This request will be resent
308- // when a new stream is created. If no stream is created, the watcher will
309- // timeout (same as server not sending response back).
310- if stream == nil {
311- return nil
312- }
313-
314- // If local processing of the most recently received response is not yet
315- // complete, i.e. fc.pending == true, queue this write and return early.
316- // This allows us to batch writes for requests which are generated as part
317- // of local processing of a received response.
318- state := s .resourceTypeState [typ ]
319- bufferRequest := func () {
320- select {
321- case state .bufferedRequests <- struct {}{}:
322- default :
314+ // sendNewLocked attempts to send a discovery request based on a new subscription or
315+ // unsubscription. This method also starts the watch expiry timer for resources
316+ // that were sent in the request for the first time, i.e. their watch state is
317+ // `ResourceWatchStateStarted`.
318+ //
319+ // Caller needs to hold s.mu.
320+ func (s * adsStreamImpl ) sendNewLocked (stream clients.Stream , requests []request ) error {
321+ for _ , req := range requests {
322+ state := s .resourceTypeState [req .typ ]
323+ if err := s .sendMessageLocked (stream , req .resourceNames , req .typ .TypeURL , state .version , state .nonce , nil ); err != nil {
324+ return err
323325 }
326+ s .startWatchTimersLocked (req .typ , req .resourceNames )
324327 }
325- if s .fc .runIfPending (bufferRequest ) {
326- return nil
327- }
328-
329- return s .sendMessageIfWritePendingLocked (stream , typ , state )
328+ return nil
330329}
331330
332331// sendExisting sends out discovery requests for existing resources when
@@ -337,6 +336,10 @@ func (s *adsStreamImpl) sendExisting(stream clients.Stream) error {
337336 s .mu .Lock ()
338337 defer s .mu .Unlock ()
339338
339+ // Clear any queued requests. Requests for all currently subscribed
340+ // resources are sent below.
341+ s .pendingRequests = nil
342+
340343 for typ , state := range s .resourceTypeState {
341344 // Reset only the nonces map when the stream restarts.
342345 //
@@ -355,69 +358,15 @@ func (s *adsStreamImpl) sendExisting(stream clients.Stream) error {
355358 continue
356359 }
357360
358- state . pendingWrite = true
359- if err := s .sendMessageIfWritePendingLocked (stream , typ , state ); err != nil {
361+ names := resourceNames ( state . subscribedResources )
362+ if err := s .sendMessageLocked (stream , names , typ . TypeURL , state . version , state . nonce , nil ); err != nil {
360363 return err
361364 }
365+ s .startWatchTimersLocked (typ , names )
362366 }
363367 return nil
364368}
365369
366- // sendBuffered sends out discovery requests for resources that were buffered
367- // when they were subscribed to, because local processing of the previously
368- // received response was not yet complete.
369- //
370- // The stream argument is guaranteed to be non-nil.
371- func (s * adsStreamImpl ) sendBuffered (stream clients.Stream ) error {
372- s .mu .Lock ()
373- defer s .mu .Unlock ()
374-
375- for typ , state := range s .resourceTypeState {
376- select {
377- case <- state .bufferedRequests :
378- if err := s .sendMessageIfWritePendingLocked (stream , typ , state ); err != nil {
379- return err
380- }
381- default :
382- // No buffered request.
383- continue
384- }
385- }
386- return nil
387- }
388-
389- // sendMessageIfWritePendingLocked attempts to sends a discovery request to the
390- // server, if there is a pending write for the given resource type.
391- //
392- // If the request is successfully sent, the pending write field is cleared and
393- // watch timers are started for the resources in the request.
394- //
395- // Caller needs to hold c.mu.
396- func (s * adsStreamImpl ) sendMessageIfWritePendingLocked (stream clients.Stream , typ ResourceType , state * resourceTypeState ) error {
397- if ! state .pendingWrite {
398- if s .logger .V (2 ) {
399- s .logger .Infof ("Skipping sending request for type %q, because all subscribed resources were already sent" , typ .TypeURL )
400- }
401- return nil
402- }
403-
404- names := resourceNames (state .subscribedResources )
405- if err := s .sendMessageLocked (stream , names , typ .TypeURL , state .version , state .nonce , nil ); err != nil {
406- return err
407- }
408- state .pendingWrite = false
409-
410- // Drain the buffered requests channel because we just sent a request for this
411- // resource type.
412- select {
413- case <- state .bufferedRequests :
414- default :
415- }
416-
417- s .startWatchTimersLocked (typ , names )
418- return nil
419- }
420-
421370// sendMessageLocked sends a discovery request to the server, populating the
422371// different fields of the message with the given parameters. Returns a non-nil
423372// error if the request could not be sent.
@@ -467,11 +416,9 @@ func (s *adsStreamImpl) sendMessageLocked(stream clients.Stream, names []string,
467416// recv is responsible for receiving messages from the ADS stream.
468417//
469418// It performs the following actions:
470- // - Waits for local flow control to be available before sending buffered
471- // requests, if any.
472- // - Receives a message from the ADS stream. If an error is encountered here,
473- // it is handled by the onError method which propagates the error to all
474- // watchers.
419+ // - Waits for local flow control to be available before it receives a message
420+ // from the ADS stream. If an error is encountered here, it is handled by
421+ // the onError method which propagates the error to all watchers.
475422// - Invokes the event handler's OnADSResponse method to process the message.
476423// - Sends an ACK or NACK to the server based on the response.
477424//
@@ -488,10 +435,6 @@ func (s *adsStreamImpl) recv(stream clients.Stream) bool {
488435 return msgReceived
489436 }
490437
491- // Send out a request if anything was buffered while we were waiting for
492- // local processing of the previous response to complete.
493- s .sendBuffered (stream )
494-
495438 resources , url , version , nonce , err := s .recvMessage (stream )
496439 if err != nil {
497440 s .onError (err , msgReceived )
@@ -760,23 +703,6 @@ func (fc *adsFlowControl) setPending(pending bool) {
760703 }
761704}
762705
763- func (fc * adsFlowControl ) runIfPending (f func ()) bool {
764- fc .mu .Lock ()
765- defer fc .mu .Unlock ()
766-
767- if fc .stopped {
768- return false
769- }
770-
771- // If there's a pending update, run the function while still holding the
772- // lock. This ensures that the pending state does not change between the
773- // check and the function call.
774- if fc .pending {
775- f ()
776- }
777- return fc .pending
778- }
779-
780706// wait blocks until all the watchers have consumed the most recent update.
781707// Returns true if the flow control was stopped while waiting, false otherwise.
782708func (fc * adsFlowControl ) wait () bool {
0 commit comments