@@ -134,6 +134,7 @@ def send_fetches(self):
134134 future = self ._client .send (node_id , request , wakeup = False )
135135 future .add_callback (self ._handle_fetch_response , node_id , fetch_offsets , time .time ())
136136 future .add_errback (self ._handle_fetch_error , node_id )
137+ future .add_both (self ._clear_pending_fetch_request , node_id )
137138 futures .append (future )
138139 self ._fetch_futures .extend (futures )
139140 self ._clean_done_fetch_futures ()
@@ -610,36 +611,42 @@ def _create_fetch_requests(self):
610611 log .log (0 , "Skipping fetch for partition %s because node %s is throttled" ,
611612 partition , node_id )
612613
614+ elif not self ._client .ready (node_id ):
615+ # Until we support send request queues, any attempt to send to a not-ready node will be
616+ # immediately failed with NodeNotReadyError.
617+ log .debug ("Skipping fetch for partition %s because connection to leader node is not ready yet" )
618+
613619 elif node_id in self ._nodes_with_pending_fetch_requests :
614620 log .log (0 , "Skipping fetch for partition %s because there is a pending fetch request to node %s" ,
615621 partition , node_id )
616- continue
617622
618- if version < 5 :
619- partition_info = (
620- partition .partition ,
621- position .offset ,
622- self .config ['max_partition_fetch_bytes' ]
623- )
624- elif version <= 8 :
625- partition_info = (
626- partition .partition ,
627- position .offset ,
628- - 1 , # log_start_offset is used internally by brokers / replicas only
629- self .config ['max_partition_fetch_bytes' ],
630- )
631623 else :
632- partition_info = (
633- partition .partition ,
634- position .leader_epoch ,
635- position .offset ,
636- - 1 , # log_start_offset is used internally by brokers / replicas only
637- self .config ['max_partition_fetch_bytes' ],
638- )
639-
640- fetchable [node_id ][partition ] = partition_info
641- log .debug ("Adding fetch request for partition %s at offset %d" ,
642- partition , position .offset )
624+ # Leader is connected and does not have a pending fetch request
625+ if version < 5 :
626+ partition_info = (
627+ partition .partition ,
628+ position .offset ,
629+ self .config ['max_partition_fetch_bytes' ]
630+ )
631+ elif version <= 8 :
632+ partition_info = (
633+ partition .partition ,
634+ position .offset ,
635+ - 1 , # log_start_offset is used internally by brokers / replicas only
636+ self .config ['max_partition_fetch_bytes' ],
637+ )
638+ else :
639+ partition_info = (
640+ partition .partition ,
641+ position .leader_epoch ,
642+ position .offset ,
643+ - 1 , # log_start_offset is used internally by brokers / replicas only
644+ self .config ['max_partition_fetch_bytes' ],
645+ )
646+
647+ fetchable [node_id ][partition ] = partition_info
648+ log .debug ("Adding fetch request for partition %s at offset %d" ,
649+ partition , position .offset )
643650
644651 requests = {}
645652 for node_id , next_partitions in six .iteritems (fetchable ):
@@ -728,14 +735,18 @@ def _handle_fetch_response(self, node_id, fetch_offsets, send_time, response):
728735
729736 if self ._sensors :
730737 self ._sensors .fetch_latency .record ((time .time () - send_time ) * 1000 )
731- self ._nodes_with_pending_fetch_requests .remove (node_id )
732738
733739 def _handle_fetch_error (self , node_id , exception ):
734740 level = logging .INFO if isinstance (exception , Errors .Cancelled ) else logging .ERROR
735741 log .log (level , 'Fetch to node %s failed: %s' , node_id , exception )
736742 if node_id in self ._session_handlers :
737743 self ._session_handlers [node_id ].handle_error (exception )
738- self ._nodes_with_pending_fetch_requests .remove (node_id )
744+
745+ def _clear_pending_fetch_request (self , node_id , _ ):
746+ try :
747+ self ._nodes_with_pending_fetch_requests .remove (node_id )
748+ except KeyError :
749+ pass
739750
740751 def _parse_fetched_data (self , completed_fetch ):
741752 tp = completed_fetch .topic_partition
0 commit comments