MINIFICPP-2666 Move Kafka tests to modular docker tests #2059
Conversation
Pull Request Overview
This PR migrates Kafka integration tests from the legacy test framework to the new modular docker-based testing framework, enabling SSL certificate generation for each MiNiFi container and modernizing the test infrastructure.
Key Changes:
- Introduces a new KafkaServer container class with SSL/SASL authentication support
- Migrates all Kafka test scenarios to use the new step definition framework
- Adds SSL certificate generation utilities for secure communication testing
Reviewed Changes
Copilot reviewed 25 out of 25 changed files in this pull request and generated 6 comments.
| File | Description |
|---|---|
| extensions/kafka/tests/features/steps/kafka_server_container.py | New KafkaServer container implementation with SSL/SASL configuration |
| extensions/kafka/tests/features/steps/steps.py | Migrated Kafka test step definitions to new framework |
| extensions/kafka/tests/features/publishkafka.feature | Updated PublishKafka scenarios to use new test framework |
| extensions/kafka/tests/features/consumekafka.feature | Updated ConsumeKafka scenarios to use new test framework |
| extensions/kafka/tests/features/environment.py | Added Kafka helper Docker image builder and scenario hooks |
| behave_framework/src/minifi_test_framework/core/ssl_utils.py | New SSL certificate generation utilities |
| behave_framework/src/minifi_test_framework/containers/minifi_container.py | Enhanced to generate SSL certificates for each container |
| behave_framework/src/minifi_test_framework/containers/container.py | Added support for binary files and container lifecycle management |
| behave_framework/src/minifi_test_framework/steps/flow_building_steps.py | Added property removal support and SSL context service setup |
| behave_framework/src/minifi_test_framework/steps/checking_steps.py | Added regex matching and enhanced file verification capabilities |
| behave_framework/pyproject.toml | Added SSL-related dependencies (m2crypto, pyopenssl, pyjks) |
| docker/test/integration/minifi/processors/ConsumeKafka.py | Removed old ConsumeKafka processor class |
| docker/test/integration/cluster/containers/KafkaBrokerContainer.py | Removed old KafkaBrokerContainer implementation |
| docker/test/integration/cluster/checkers/KafkaHelper.py | Removed old KafkaHelper class |
| docker/RunBehaveTests.sh | Added Kafka tests to the test execution script |
Comments suppressed due to low confidence (6)
extensions/kafka/tests/features/steps/steps.py:91
- There are duplicate function definitions with the same name `step_impl` in this file. Python will only keep the last definition, causing earlier step definitions to be overridden. Each step function should have a unique name. For example, lines 32, 37, 67, 74, 81, 88 all define `def step_impl(context):` or similar signatures. These should be renamed to unique names like `step_impl_kafka_server_setup`, `step_impl_consume_kafka_setup`, etc.
extensions/kafka/tests/features/steps/steps.py:215
- There are two different functions with the same name `wait_for_consumer_registration` defined on lines 203 and 211. Python will only keep the last definition, making the first function (line 203) unreachable. These functions should have unique names like `wait_for_consumer_initial_registration` and `wait_for_consumer_reregistration`.
extensions/kafka/tests/features/steps/steps.py:22 - Import of 'checking_steps' is not used.
extensions/kafka/tests/features/steps/steps.py:23 - Import of 'configuration_steps' is not used.
extensions/kafka/tests/features/steps/steps.py:24 - Import of 'core_steps' is not used.
extensions/kafka/tests/features/steps/steps.py:25 - Import of 'flow_building_steps' is not used.
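The first suppressed comment above concerns duplicate `def step_impl` definitions. A minimal, self-contained demonstration of the Python name-shadowing it describes (the function names here are illustrative, not from the PR):

```python
def step_impl(context):
    return "kafka server setup"

first = step_impl  # keep a reference before the name is rebound

def step_impl(context):  # noqa: F811 -- deliberate redefinition for the demo
    return "consume kafka setup"

# Only the last definition is reachable by name; the earlier function
# object survives only if something else (like the reference above,
# or a registration decorator) still holds it.
print(step_impl(None))  # "consume kafka setup"
print(first(None))      # "kafka server setup"
```

Worth noting: behave registers step functions through its `@given`/`@when`/`@then` decorators at definition time, so duplicate `step_impl` names do not necessarily break step matching; unique names do, however, keep tracebacks and tooling readable.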
```python
def start(self):
    if self.container:
        self.container.start()

def stop(self):
    if self.container:
        self.container.stop()

def kill(self):
    if self.container:
        self.container.kill()

def restart(self):
    if self.container:
        self.container.restart()
```
Could we log something if self.container does not exist? I assume that shouldn't happen normally, so a log would help with debugging if it does.
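A minimal sketch of what the reviewer asks for, assuming a hypothetical wrapper class (`ManagedContainer`, `name` attribute, and the dummy container below are illustrative, not the PR's real API):

```python
import logging

class ManagedContainer:
    """Hypothetical sketch: the lifecycle wrappers quoted above,
    extended with a warning log when self.container is missing."""

    def __init__(self, container=None, name: str = "unknown"):
        self.container = container
        self.name = name

    def start(self):
        if self.container:
            self.container.start()
        else:
            # Should not happen in normal operation; log it to aid debugging.
            logging.warning("start() called on '%s' but the underlying container does not exist", self.name)
```

The same `else` branch would be repeated in `stop()`, `kill()`, and `restart()`.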
```python
        return os.path.join(temp_dir, os.path.basename(directory_path.strip('/')))
    except Exception as e:
        logging.error(f"Error extracting files from container: {e}")
```
Does e contain the container's name and directory_path? If it doesn't, then I would add these two to the error message.
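A sketch of the suggested enrichment, with the real copy logic replaced by a stand-in failure so the example is self-contained (the function name and `container_name` parameter are hypothetical):

```python
import logging

def extract_files_from_container(container_name: str, directory_path: str) -> str:
    """Hypothetical sketch: the except block quoted above, with the
    container name and directory path added to the error message."""
    try:
        # Stand-in for the real archive-copy logic, forced to fail for the demo.
        raise FileNotFoundError(directory_path)
    except Exception as e:
        message = f"Error extracting '{directory_path}' from container '{container_name}': {e}"
        logging.error(message)
        return message
```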
```python
def create_topic(self, topic_name: str):
    (code, output) = self.exec_run(["/bin/bash", "-c", f"/opt/kafka/bin/kafka-topics.sh --create --topic {topic_name} --bootstrap-server {self.container_name}:9092"])
    logging.info("Create topic output: %s", output)
    return code == 0

def produce_message(self, topic_name: str, message: str):
    (code, output) = self.exec_run(["/bin/bash", "-c", f"/opt/kafka/bin/kafka-console-producer.sh --topic {topic_name} --bootstrap-server {self.container_name}:9092 <<< '{message}'"])
    logging.info("Produce message output: %s", output)
    return code == 0

def produce_message_with_key(self, topic_name: str, message: str, message_key: str):
    (code, output) = self.exec_run(["/bin/bash", "-c", f"/opt/kafka/bin/kafka-console-producer.sh --property 'key.separator=:' --property 'parse.key=true' --topic {topic_name} --bootstrap-server {self.container_name}:9092 <<< '{message_key}:{message}'"])
    logging.info("Produce message with key output: %s", output)
    return code == 0
```
Can we log code, as well, please?
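A sketch of the requested change, with `exec_run` passed in as a parameter (a stand-in for the container method) so the example is self-contained:

```python
import logging

def create_topic(exec_run, topic_name: str, bootstrap: str) -> bool:
    """Sketch of the reviewer's request: log the exit code alongside
    the command output. In the real class, exec_run is a method on the
    container; it is injected here to keep the example runnable."""
    code, output = exec_run(["/bin/bash", "-c",
                             f"/opt/kafka/bin/kafka-topics.sh --create --topic {topic_name} --bootstrap-server {bootstrap}:9092"])
    logging.info("Create topic exit code: %d, output: %s", code, output)
    return code == 0
```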
```diff
  And a message with content "<message 2>" is published to the "ConsumeKafkaTest" topic with key "consume_kafka_test_key"

- Then two flowfiles with the contents "<message 1>" and "<message 2>" are placed in the monitored directory in less than 45 seconds
+ Then the contents of "/tmp/output" in less than 30 seconds are: "<message 1>" and "<message 2>"
```
The old phrasing sounds more natural and clearer to me: "happens in less than x seconds" requires an event ("flowfiles are placed"), not a state ("the contents are"). Can we change it back?
```gherkin
Examples: Topic names and formats to test
  | message 1        | message 2           | topic names      | topic name format |
  | Ulysses          | James Joyce         | ConsumeKafkaTest | (not set)         |
  | The Great Gatsby | F. Scott Fitzgerald | ConsumeKafkaTest | Names             |
```
Why was the first example line removed?
```python
    return code == 0

def produce_message(self, topic_name: str, message: str):
    (code, output) = self.exec_run(["/bin/bash", "-c", f"/opt/kafka/bin/kafka-console-producer.sh --topic {topic_name} --bootstrap-server {self.container_name}:9092 <<< '{message}'"])
```
optional: I'd avoid bashisms and use a standard pipe.
```diff
- (code, output) = self.exec_run(["/bin/bash", "-c", f"/opt/kafka/bin/kafka-console-producer.sh --topic {topic_name} --bootstrap-server {self.container_name}:9092 <<< '{message}'"])
+ (code, output) = self.exec_run(["/bin/sh", "-c", f"echo '{message}' | /opt/kafka/bin/kafka-console-producer.sh --topic '{topic_name}' --bootstrap-server '{self.container_name}':9092"])
```
This will also likely fail on Windows, with @martinzink 's changes to run the tests on windows containers.
That's moot, because we won't be able to support this on Windows anyway: there is no Kafka server Windows container, and as of now we can't mix and match Windows and Linux containers.
```python
    return code == 0

def produce_message_with_key(self, topic_name: str, message: str, message_key: str):
    (code, output) = self.exec_run(["/bin/bash", "-c", f"/opt/kafka/bin/kafka-console-producer.sh --property 'key.separator=:' --property 'parse.key=true' --topic {topic_name} --bootstrap-server {self.container_name}:9092 <<< '{message_key}:{message}'"])
```
```diff
- (code, output) = self.exec_run(["/bin/bash", "-c", f"/opt/kafka/bin/kafka-console-producer.sh --property 'key.separator=:' --property 'parse.key=true' --topic {topic_name} --bootstrap-server {self.container_name}:9092 <<< '{message_key}:{message}'"])
+ (code, output) = self.exec_run(["/bin/sh", "-c", f" echo ''{message_key}:{message}'' | /opt/kafka/bin/kafka-console-producer.sh --property 'key.separator=:' --property 'parse.key=true' --topic '{topic_name}' --bootstrap-server '{self.container_name}':9092"])
```
```python
if self.container.status == "running":
    return self._verify_file_contents_in_running_container(directory_path, expected_contents)

return self._verify_file_contents_in_stopped_container(directory_path, expected_contents)
```
What if the container stops after the status check, before or during the directory scan?
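One way to close the race the reviewer points out, sketched with stand-in helpers (`verify_running` and `verify_stopped` are hypothetical substitutes for the PR's private `_verify_file_contents_in_*` methods): if the container exits between the status check and the scan, the running-container path raises, and we fall back to the stopped-container path.

```python
def verify_file_contents(container, directory_path, expected_contents):
    """Sketch: tolerate the container stopping mid-scan by falling
    back to the stopped-container verification path."""
    if container.status == "running":
        try:
            return verify_running(container, directory_path, expected_contents)
        except RuntimeError:
            pass  # container stopped mid-scan; retry via the stopped path
    return verify_stopped(container, directory_path, expected_contents)


def verify_running(container, directory_path, expected_contents):
    # Stand-in that simulates the race: the container exits during the scan.
    raise RuntimeError("container exited during scan")


def verify_stopped(container, directory_path, expected_contents):
    # Stand-in for scanning the stopped container's filesystem.
    return True
```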
```python
    context=None)

def create_topic(self, topic_name: str):
    (code, output) = self.exec_run(["/bin/bash", "-c", f"/opt/kafka/bin/kafka-topics.sh --create --topic {topic_name} --bootstrap-server {self.container_name}:9092"])
```
optional: sh + quotes to handle more special characters
```diff
- (code, output) = self.exec_run(["/bin/bash", "-c", f"/opt/kafka/bin/kafka-topics.sh --create --topic {topic_name} --bootstrap-server {self.container_name}:9092"])
+ (code, output) = self.exec_run(["/bin/sh", "-c", f"/opt/kafka/bin/kafka-topics.sh --create --topic '{topic_name}' --bootstrap-server '{self.container_name}':9092"])
```
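The "sh + quotes" suggestion can be taken one step further with `shlex.quote`, which escapes any shell metacharacter in the interpolated values, not just the cases manual single quotes cover (the helper name below is hypothetical, not from the PR):

```python
import shlex

def build_create_topic_cmd(topic_name: str, container_name: str) -> str:
    """Sketch: build the kafka-topics.sh command line with shlex.quote
    so topic names containing spaces, quotes, or other shell
    metacharacters cannot break out of the command."""
    return ("/opt/kafka/bin/kafka-topics.sh --create "
            f"--topic {shlex.quote(topic_name)} "
            f"--bootstrap-server {shlex.quote(container_name)}:9092")
```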
```gherkin
And RouteOnAttribute is EVENT_DRIVEN
And a LogAttribute processor
And LogAttribute is EVENT_DRIVEN
And a PutFile processor with the "Directory" property set to "/tmp/output"
And PutFile is EVENT_DRIVEN
```
Shouldn't it be the default in the test framework for non-source processors to use event-driven scheduling, to reduce boilerplate?
```diff
  And the publisher performs a <transaction type> transaction publishing to the "ConsumeKafkaTest" topic these messages: <messages sent>

- Then <number of flowfiles expected> flowfiles are placed in the monitored directory in less than 15 seconds
+ Then there are <number of flowfiles expected> files in the "/tmp/output" directory in less than 15 seconds
```
@fgerlits comment applies to this too: it's clearer to expect an event than a state, so the old version was better.
https://issues.apache.org/jira/browse/MINIFICPP-2666
Thank you for submitting a contribution to Apache NiFi - MiNiFi C++.
In order to streamline the review of the contribution we ask you
to ensure the following steps have been taken:
For all changes:
- Is there a JIRA ticket associated with this PR? Is it referenced in the commit message?
- Does your PR title start with MINIFICPP-XXXX where XXXX is the JIRA number you are trying to resolve? Pay particular attention to the hyphen "-" character.
- Has your PR been rebased against the latest commit within the target branch (typically main)?
- Is your initial contribution a single, squashed commit?
For code changes:
For documentation related changes:
Note:
Please ensure that once the PR is submitted, you check GitHub Actions CI results for build issues and submit an update to your PR as soon as possible.