Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 26 additions & 0 deletions tests/kafkatest/services/kafka/kafka.py
Original file line number Diff line number Diff line change
Expand Up @@ -2012,3 +2012,29 @@ def get_offset_shell(self, time=None, topic=None, partitions=None, topic_partiti

def java_class_name(self):
return "kafka\.Kafka"

def describe_consumer_group_members(self, group, node=None, command_config=None):
""" Describe a consumer group.
"""
if node is None:
node = self.nodes[0]
consumer_group_script = self.path.script("kafka-consumer-groups.sh", node)

if command_config is None:
command_config = ""
else:
command_config = "--command-config " + command_config

cmd = fix_opts_for_new_jvm(node)
cmd += "%s --bootstrap-server %s %s --group %s --describe" % \
(consumer_group_script,
self.bootstrap_servers(self.security_protocol),
command_config, group)

cmd += " --members"

output_lines = []
for line in node.account.ssh_capture(cmd):
if not (line.startswith("SLF4J") or line.startswith("GROUP") or line.strip() == ""):
output_lines.append(line.strip())
return output_lines
26 changes: 18 additions & 8 deletions tests/kafkatest/tests/client/consumer_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -306,7 +306,7 @@ def test_fencing_static_consumer(self, num_conflict_consumers, fencing_stage, me
producer = self.setup_producer(self.TOPIC)

producer.start()
self.await_produced_messages(producer)
self.await_produced_messages(producer, timeout_sec=120)
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We've seen starting a producer can take longer than expected but it does start so increasing the timeout to reduce flakiness


consumer = self.setup_consumer(self.TOPIC, static_membership=True, group_protocol=group_protocol)

Expand All @@ -333,7 +333,12 @@ def test_fencing_static_consumer(self, num_conflict_consumers, fencing_stage, me
# Consumer protocol: Existing members should remain active and new conflicting ones should not be able to join.
self.await_consumed_messages(consumer)
assert num_rebalances == consumer.num_rebalances(), "Static consumers attempt to join with instance id in use should not cause a rebalance"
assert len(consumer.joined_nodes()) == len(consumer.nodes)
try:
assert len(consumer.joined_nodes()) == len(consumer.nodes)
except AssertionError:
self.logger.debug("All members not in group %s. Describe output is %s", self.group_id,
" ".join(self.kafka.describe_consumer_group_members(self.group_id)))
raise
assert len(conflict_consumer.joined_nodes()) == 0

# Conflict consumers will terminate due to a fatal UnreleasedInstanceIdException error.
Expand All @@ -344,16 +349,21 @@ def test_fencing_static_consumer(self, num_conflict_consumers, fencing_stage, me
consumer.stop_all()
wait_until(lambda: len(consumer.dead_nodes()) == len(consumer.nodes),
timeout_sec=60,
err_msg="Timed out waiting for the consumer to shutdown")
err_msg="Timed out waiting for the consumer to shutdown. Describe output is %s" % " ".join(self.kafka.describe_consumer_group_members(self.group_id)))

# Wait until the group becomes empty to ensure the instance ID is released.
# We use the 50-second timeout because the consumer session timeout is 45 seconds.
# We use the 60-second timeout because the consumer session timeout is 45 seconds adding some time for latency.
wait_until(lambda: self.group_id in self.kafka.list_consumer_groups(state="empty"),
timeout_sec=50,
err_msg="Timed out waiting for the consumers to be removed from the group.")
timeout_sec=60,
err_msg="Timed out waiting for the consumers to be removed from the group. Describe output is %s." % " ".join(self.kafka.describe_consumer_group_members(self.group_id)))

conflict_consumer.start()
self.await_members(conflict_consumer, num_conflict_consumers)
try:
self.await_members(conflict_consumer, num_conflict_consumers)
except TimeoutError:
self.logger.debug("All conflict members not in group %s. Describe output is %s", self.group_id, " ".join(self.kafka.describe_consumer_group_members(self.group_id)))
raise


else:
consumer.start()
conflict_consumer.start()
Expand Down