diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractHeartbeatRequestManager.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractHeartbeatRequestManager.java index cba2b65cbba7d..b5d81c681192e 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractHeartbeatRequestManager.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractHeartbeatRequestManager.java @@ -97,6 +97,8 @@ public abstract class AbstractHeartbeatRequestManager getChangelogTopi */ private final Timer pollTimer; + private volatile boolean isClosed = false; + public StreamsGroupHeartbeatRequestManager(final LogContext logContext, final Time time, final ConsumerConfig config, @@ -417,6 +419,11 @@ public NetworkClientDelegate.PollResult pollOnClose(long currentTimeMs) { return EMPTY; } + @Override + public void signalClose() { + isClosed = true; + } + public StreamsMembershipManager membershipManager() { return membershipManager; } @@ -486,7 +493,10 @@ private NetworkClientDelegate.UnsentRequest makeHeartbeatRequestAndLogResponse(f else logger.error("StreamsGroupHeartbeatRequest failed because of {}: {}", error, response); } else { - logger.error("StreamsGroupHeartbeatRequest failed because of unexpected exception.", exception); + if (isClosed) { + logger.debug("StreamsGroupHeartbeatRequest failed because of exception during close: ", exception); + } else + logger.error("StreamsGroupHeartbeatRequest failed because of unexpected exception: ", exception); } }); }