Skip to content

Commit 8d38aa7

Browse files
authored
Initiate Coordinator Reconnect w/ Backoff from Heartbeat Thread (#2695)
1 parent 7e38cf2 commit 8d38aa7

File tree

2 files changed

+4
-1
lines changed

2 files changed

+4
-1
lines changed

kafka/consumer/group.py

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -747,7 +747,9 @@ def _poll_once(self, timer, max_records, update_offsets=True):
747747

748748
# We do not want to be stuck blocking in poll if we are missing some positions
749749
# since the offset lookup may be backing off after a failure
750-
poll_timeout_ms = min(timer.timeout_ms, self._coordinator.time_to_next_poll() * 1000)
750+
poll_timeout_ms = timer.timeout_ms
751+
if self.config['group_id'] is not None:
752+
poll_timeout_ms = min(poll_timeout_ms, self._coordinator.time_to_next_poll() * 1000)
751753
if not has_all_fetch_positions:
752754
log.debug('poll: do not have all fetch positions...')
753755
poll_timeout_ms = min(poll_timeout_ms, self.config['retry_backoff_ms'])

kafka/coordinator/base.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1198,6 +1198,7 @@ def _run_once(self):
11981198
self.coordinator._lock.wait(self.coordinator.config['retry_backoff_ms'] / 1000)
11991199

12001200
elif not self.coordinator.connected():
1201+
self.coordinator._client.maybe_connect(self.coordinator.coordinator_id)
12011202
self.coordinator._client._lock.release()
12021203
self.coordinator._lock.wait(self.coordinator.config['retry_backoff_ms'] / 1000)
12031204

0 commit comments

Comments
 (0)