Skip to content

Commit 207e4d6

Browse files
committed
MINOR: Address flaky wait for producer acks logging for checking when not al… (#20957)
Adds the following for some system tests: (1) an increased timeout for `await_produced_messages`; (2) more logging for `test_fencing_static_consumer`, which has exhibited flakiness and is hard to reproduce. Reviewers: Lianet Magrans <[email protected]>
1 parent dbd32d8 commit 207e4d6

File tree

2 files changed

+47
-6
lines changed

2 files changed

+47
-6
lines changed

tests/kafkatest/services/kafka/kafka.py

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2023,3 +2023,29 @@ def get_offset_shell(self, time=None, topic=None, partitions=None, topic_partiti
20232023

20242024
def java_class_name(self):
    """Return the Java class name string for this service, with the dot backslash-escaped.

    The returned value is the literal text ``kafka\\.Kafka`` (backslash included).
    NOTE(review): the escaped dot suggests callers use this as a regex/grep
    pattern -- confirm against the call sites.
    """
    # Raw string: "\." is not a valid escape sequence in a normal string
    # literal and triggers a SyntaxWarning on Python 3.12+. The runtime value
    # is unchanged.
    return r"kafka\.Kafka"
2026+
2027+
def describe_consumer_group_members(self, group, node=None, command_config=None):
    """Run ``kafka-consumer-groups.sh --describe --members`` for a group.

    :param group: consumer group id passed to ``--group``.
    :param node: node on which the script is executed; defaults to
        ``self.nodes[0]`` when not given.
    :param command_config: optional value passed via ``--command-config``;
        the option is omitted entirely when this is ``None``.
    :return: list of whitespace-stripped output lines, excluding blank lines
        and lines that start with ``SLF4J`` or ``GROUP``.
    """
    target = self.nodes[0] if node is None else node
    script = self.path.script("kafka-consumer-groups.sh", target)
    config_opt = "" if command_config is None else "--command-config " + command_config

    jvm_prefix = fix_opts_for_new_jvm(target)
    describe = "%s --bootstrap-server %s %s --group %s --describe" % (
        script,
        self.bootstrap_servers(self.security_protocol),
        config_opt,
        group,
    )
    cmd = jvm_prefix + describe + " --members"

    # Drop logging noise and the table header so only member rows remain.
    skipped_prefixes = ("SLF4J", "GROUP")
    return [
        raw.strip()
        for raw in target.account.ssh_capture(cmd)
        if raw.strip() != "" and not raw.startswith(skipped_prefixes)
    ]

tests/kafkatest/tests/client/consumer_test.py

Lines changed: 21 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -297,7 +297,7 @@ def test_fencing_static_consumer(self, num_conflict_consumers, fencing_stage, me
297297
producer = self.setup_producer(self.TOPIC)
298298

299299
producer.start()
300-
self.await_produced_messages(producer)
300+
self.await_produced_messages(producer, timeout_sec=120)
301301

302302
consumer = self.setup_consumer(self.TOPIC, static_membership=True, group_protocol=group_protocol)
303303

@@ -324,18 +324,33 @@ def test_fencing_static_consumer(self, num_conflict_consumers, fencing_stage, me
324324
# Consumer protocol: Existing members should remain active and new conflicting ones should not be able to join.
325325
self.await_consumed_messages(consumer)
326326
assert num_rebalances == consumer.num_rebalances(), "Static consumers attempt to join with instance id in use should not cause a rebalance"
327-
assert len(consumer.joined_nodes()) == len(consumer.nodes)
327+
try:
328+
assert len(consumer.joined_nodes()) == len(consumer.nodes)
329+
except AssertionError:
330+
self.logger.debug("All members not in group %s. Describe output is %s", self.group_id,
331+
" ".join(self.kafka.describe_consumer_group_members(self.group_id)))
332+
raise
328333
assert len(conflict_consumer.joined_nodes()) == 0
329-
334+
330335
# Stop existing nodes, so conflicting ones should be able to join.
331336
consumer.stop_all()
332337
wait_until(lambda: len(consumer.dead_nodes()) == len(consumer.nodes),
333338
timeout_sec=60,
334-
err_msg="Timed out waiting for the consumer to shutdown")
339+
err_msg="Timed out waiting for the consumer to shutdown. Describe output is %s" % " ".join(self.kafka.describe_consumer_group_members(self.group_id)))
340+
341+
# Wait until the group becomes empty to ensure the instance ID is released.
342+
# We use a 60-second timeout because the consumer session timeout is 45 seconds, plus some allowance for latency.
343+
wait_until(lambda: self.group_id in self.kafka.list_consumer_groups(state="empty"),
344+
timeout_sec=60,
345+
err_msg="Timed out waiting for the consumers to be removed from the group. Describe output is %s." % " ".join(self.kafka.describe_consumer_group_members(self.group_id)))
346+
335347
conflict_consumer.start()
336-
self.await_members(conflict_consumer, num_conflict_consumers)
348+
try:
349+
self.await_members(conflict_consumer, num_conflict_consumers)
350+
except TimeoutError:
351+
self.logger.debug("All conflict members not in group %s. Describe output is %s", self.group_id, " ".join(self.kafka.describe_consumer_group_members(self.group_id)))
352+
raise
337353

338-
339354
else:
340355
consumer.start()
341356
conflict_consumer.start()

0 commit comments

Comments
 (0)