Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,8 @@ public abstract class AbstractHeartbeatRequestManager<R extends AbstractResponse
*/
private final HeartbeatMetricsManager metricsManager;

private volatile boolean isClosed = false;

public static final String CONSUMER_PROTOCOL_NOT_SUPPORTED_MSG = "The cluster does not support the new CONSUMER " +
"group protocol. Set group.protocol=classic on the consumer configs to revert to the CLASSIC protocol " +
"until the cluster is upgraded.";
Expand Down Expand Up @@ -257,6 +259,11 @@ public long maximumTimeToWait(long currentTimeMs) {
return Math.min(pollTimer.remainingMs() / 2, heartbeatRequestState.timeToNextHeartbeatMs(currentTimeMs));
}

@Override
public void signalClose() {
isClosed = true;
}

/**
* Reset the poll timer, indicating that the user has called consumer.poll(). If the member
* is in {@link MemberState#STALE} state due to expired poll timer, this will transition the
Expand Down Expand Up @@ -315,7 +322,10 @@ private NetworkClientDelegate.UnsentRequest logResponse(final NetworkClientDeleg
else
logger.error("{} failed because of {}: {}", heartbeatRequestName(), error, response);
} else {
logger.error("{} failed because of unexpected exception.", heartbeatRequestName(), exception);
if (isClosed) {
logger.debug("{} failed because of exception during close: {}", heartbeatRequestName(), exception);
} else
logger.error("{} failed because of unexpected exception: {}", heartbeatRequestName(), exception);
}
});
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -297,6 +297,8 @@ private static List<StreamsGroupHeartbeatRequestData.TopicInfo> getChangelogTopi
*/
private final Timer pollTimer;

private volatile boolean isClosed = false;

public StreamsGroupHeartbeatRequestManager(final LogContext logContext,
final Time time,
final ConsumerConfig config,
Expand Down Expand Up @@ -417,6 +419,11 @@ public NetworkClientDelegate.PollResult pollOnClose(long currentTimeMs) {
return EMPTY;
}

@Override
public void signalClose() {
isClosed = true;
}

public StreamsMembershipManager membershipManager() {
return membershipManager;
}
Expand Down Expand Up @@ -486,7 +493,10 @@ private NetworkClientDelegate.UnsentRequest makeHeartbeatRequestAndLogResponse(f
else
logger.error("StreamsGroupHeartbeatRequest failed because of {}: {}", error, response);
} else {
logger.error("StreamsGroupHeartbeatRequest failed because of unexpected exception.", exception);
if (isClosed) {
logger.debug("StreamsGroupHeartbeatRequest failed because of exception during close: ", exception);
} else
logger.error("StreamsGroupHeartbeatRequest failed because of unexpected exception: ", exception);
}
});
}
Expand Down