Skip to content

Commit 9a893b3

Browse files
committed
MINOR: Address flaky wait for producer acks logging for checking when not al… (#20957)
Adding the following for some system tests - Increased timeout for `await_produced_messages` - Added more logging for `test_fencing_consumer` which has exhibited flakiness and is hard to reproduce Reviewers: Lianet Magrans<[email protected]>
1 parent e8397fc commit 9a893b3

File tree

2 files changed

+44
-8
lines changed

2 files changed

+44
-8
lines changed

tests/kafkatest/services/kafka/kafka.py

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2012,3 +2012,29 @@ def get_offset_shell(self, time=None, topic=None, partitions=None, topic_partiti
20122012

20132013
def java_class_name(self):
20142014
return "kafka\.Kafka"
2015+
2016+
def describe_consumer_group_members(self, group, node=None, command_config=None):
2017+
""" Describe a consumer group.
2018+
"""
2019+
if node is None:
2020+
node = self.nodes[0]
2021+
consumer_group_script = self.path.script("kafka-consumer-groups.sh", node)
2022+
2023+
if command_config is None:
2024+
command_config = ""
2025+
else:
2026+
command_config = "--command-config " + command_config
2027+
2028+
cmd = fix_opts_for_new_jvm(node)
2029+
cmd += "%s --bootstrap-server %s %s --group %s --describe" % \
2030+
(consumer_group_script,
2031+
self.bootstrap_servers(self.security_protocol),
2032+
command_config, group)
2033+
2034+
cmd += " --members"
2035+
2036+
output_lines = []
2037+
for line in node.account.ssh_capture(cmd):
2038+
if not (line.startswith("SLF4J") or line.startswith("GROUP") or line.strip() == ""):
2039+
output_lines.append(line.strip())
2040+
return output_lines

tests/kafkatest/tests/client/consumer_test.py

Lines changed: 18 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -306,7 +306,7 @@ def test_fencing_static_consumer(self, num_conflict_consumers, fencing_stage, me
306306
producer = self.setup_producer(self.TOPIC)
307307

308308
producer.start()
309-
self.await_produced_messages(producer)
309+
self.await_produced_messages(producer, timeout_sec=120)
310310

311311
consumer = self.setup_consumer(self.TOPIC, static_membership=True, group_protocol=group_protocol)
312312

@@ -333,7 +333,12 @@ def test_fencing_static_consumer(self, num_conflict_consumers, fencing_stage, me
333333
# Consumer protocol: Existing members should remain active and new conflicting ones should not be able to join.
334334
self.await_consumed_messages(consumer)
335335
assert num_rebalances == consumer.num_rebalances(), "Static consumers attempt to join with instance id in use should not cause a rebalance"
336-
assert len(consumer.joined_nodes()) == len(consumer.nodes)
336+
try:
337+
assert len(consumer.joined_nodes()) == len(consumer.nodes)
338+
except AssertionError:
339+
self.logger.debug("All members not in group %s. Describe output is %s", self.group_id,
340+
" ".join(self.kafka.describe_consumer_group_members(self.group_id)))
341+
raise
337342
assert len(conflict_consumer.joined_nodes()) == 0
338343

339344
# Conflict consumers will terminate due to a fatal UnreleasedInstanceIdException error.
@@ -344,16 +349,21 @@ def test_fencing_static_consumer(self, num_conflict_consumers, fencing_stage, me
344349
consumer.stop_all()
345350
wait_until(lambda: len(consumer.dead_nodes()) == len(consumer.nodes),
346351
timeout_sec=60,
347-
err_msg="Timed out waiting for the consumer to shutdown")
352+
err_msg="Timed out waiting for the consumer to shutdown. Describe output is %s" % " ".join(self.kafka.describe_consumer_group_members(self.group_id)))
353+
348354
# Wait until the group becomes empty to ensure the instance ID is released.
349-
# We use the 50-second timeout because the consumer session timeout is 45 seconds.
355+
# We use the 60-second timeout because the consumer session timeout is 45 seconds adding some time for latency.
350356
wait_until(lambda: self.group_id in self.kafka.list_consumer_groups(state="empty"),
351-
timeout_sec=50,
352-
err_msg="Timed out waiting for the consumers to be removed from the group.")
357+
timeout_sec=60,
358+
err_msg="Timed out waiting for the consumers to be removed from the group. Describe output is %s." % " ".join(self.kafka.describe_consumer_group_members(self.group_id)))
359+
353360
conflict_consumer.start()
354-
self.await_members(conflict_consumer, num_conflict_consumers)
361+
try:
362+
self.await_members(conflict_consumer, num_conflict_consumers)
363+
except TimeoutError:
364+
self.logger.debug("All conflict members not in group %s. Describe output is %s", self.group_id, " ".join(self.kafka.describe_consumer_group_members(self.group_id)))
365+
raise
355366

356-
357367
else:
358368
consumer.start()
359369
conflict_consumer.start()

0 commit comments

Comments
 (0)