Skip to content

Conversation

@lordgamez
Copy link
Contributor

@lordgamez lordgamez commented Nov 6, 2025

  • Create SSL certificates for each MiNiFi container for SSL tests

https://issues.apache.org/jira/browse/MINIFICPP-2666


Thank you for submitting a contribution to Apache NiFi - MiNiFi C++.

In order to streamline the review of the contribution we ask you
to ensure the following steps have been taken:

For all changes:

  • Is there a JIRA ticket associated with this PR? Is it referenced
    in the commit message?

  • Does your PR title start with MINIFICPP-XXXX where XXXX is the JIRA number you are trying to resolve? Pay particular attention to the hyphen "-" character.

  • Has your PR been rebased against the latest commit within the target branch (typically main)?

  • Is your initial contribution a single, squashed commit?

For code changes:

  • If adding new dependencies to the code, are these dependencies licensed in a way that is compatible for inclusion under ASF 2.0?
  • If applicable, have you updated the LICENSE file?
  • If applicable, have you updated the NOTICE file?

For documentation related changes:

  • Have you ensured that format looks appropriate for the output in which it is rendered?

Note:

Please ensure that once the PR is submitted, you check GitHub Actions CI results for build issues and submit an update to your PR as soon as possible.

Copy link

Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull Request Overview

This PR migrates Kafka integration tests from the legacy test framework to the new modular docker-based testing framework, enabling SSL certificate generation for each MiNiFi container and modernizing the test infrastructure.

Key Changes:

  • Introduces a new KafkaServer container class with SSL/SASL authentication support
  • Migrates all Kafka test scenarios to use the new step definition framework
  • Adds SSL certificate generation utilities for secure communication testing

Reviewed Changes

Copilot reviewed 25 out of 25 changed files in this pull request and generated 6 comments.

Show a summary per file
File Description
extensions/kafka/tests/features/steps/kafka_server_container.py New KafkaServer container implementation with SSL/SASL configuration
extensions/kafka/tests/features/steps/steps.py Migrated Kafka test step definitions to new framework
extensions/kafka/tests/features/publishkafka.feature Updated PublishKafka scenarios to use new test framework
extensions/kafka/tests/features/consumekafka.feature Updated ConsumeKafka scenarios to use new test framework
extensions/kafka/tests/features/environment.py Added Kafka helper Docker image builder and scenario hooks
behave_framework/src/minifi_test_framework/core/ssl_utils.py New SSL certificate generation utilities
behave_framework/src/minifi_test_framework/containers/minifi_container.py Enhanced to generate SSL certificates for each container
behave_framework/src/minifi_test_framework/containers/container.py Added support for binary files and container lifecycle management
behave_framework/src/minifi_test_framework/steps/flow_building_steps.py Added property removal support and SSL context service setup
behave_framework/src/minifi_test_framework/steps/checking_steps.py Added regex matching and enhanced file verification capabilities
behave_framework/pyproject.toml Added SSL-related dependencies (m2crypto, pyopenssl, pyjks)
docker/test/integration/minifi/processors/ConsumeKafka.py Removed old ConsumeKafka processor class
docker/test/integration/cluster/containers/KafkaBrokerContainer.py Removed old KafkaBrokerContainer implementation
docker/test/integration/cluster/checkers/KafkaHelper.py Removed old KafkaHelper class
docker/RunBehaveTests.sh Added Kafka tests to the test execution script
Comments suppressed due to low confidence (6)

extensions/kafka/tests/features/steps/steps.py:91

  • There are duplicate function definitions with the same name step_impl in this file. Python will only keep the last definition, causing earlier step definitions to be overridden. Each step function should have a unique name.

For example, lines 32, 37, 67, 74, 81, 88 all define def step_impl(context): or similar signatures. These should be renamed to unique names like step_impl_kafka_server_setup, step_impl_consume_kafka_setup, etc.
extensions/kafka/tests/features/steps/steps.py:215

  • There are two different functions with the same name wait_for_consumer_registration defined on lines 203 and 211. Python will only keep the last definition, making the first function (line 203) unreachable. These functions should have unique names like wait_for_consumer_initial_registration and wait_for_consumer_reregistration.
    extensions/kafka/tests/features/steps/steps.py:22
  • Import of 'checking_steps' is not used.
    extensions/kafka/tests/features/steps/steps.py:23
  • Import of 'configuration_steps' is not used.
    extensions/kafka/tests/features/steps/steps.py:24
  • Import of 'core_steps' is not used.
    extensions/kafka/tests/features/steps/steps.py:25
  • Import of 'flow_building_steps' is not used.

💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.

@lordgamez lordgamez force-pushed the MINIFICPP-2666 branch 3 times, most recently from 9d8ce58 to 0f60d4a Compare November 17, 2025 13:17
@lordgamez lordgamez changed the base branch from MINIFICPP-2624_opcua to main November 21, 2025 08:39
Comment on lines +100 to +114
def start(self):
if self.container:
self.container.start()

def stop(self):
if self.container:
self.container.stop()

def kill(self):
if self.container:
self.container.kill()

def restart(self):
if self.container:
self.container.restart()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could we log something if self.container does not exist? I assume that shouldn't happen normally, so a log would help with debugging if it does.


return os.path.join(temp_dir, os.path.basename(directory_path.strip('/')))
except Exception as e:
logging.error(f"Error extracting files from container: {e}")
Copy link
Contributor

@fgerlits fgerlits Dec 2, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does e contain the container's name and directory_path? If it doesn't, then I would add these two to the error message.

Comment on lines +100 to +113
def create_topic(self, topic_name: str):
(code, output) = self.exec_run(["/bin/bash", "-c", f"/opt/kafka/bin/kafka-topics.sh --create --topic {topic_name} --bootstrap-server {self.container_name}:9092"])
logging.info("Create topic output: %s", output)
return code == 0

def produce_message(self, topic_name: str, message: str):
(code, output) = self.exec_run(["/bin/bash", "-c", f"/opt/kafka/bin/kafka-console-producer.sh --topic {topic_name} --bootstrap-server {self.container_name}:9092 <<< '{message}'"])
logging.info("Produce message output: %s", output)
return code == 0

def produce_message_with_key(self, topic_name: str, message: str, message_key: str):
(code, output) = self.exec_run(["/bin/bash", "-c", f"/opt/kafka/bin/kafka-console-producer.sh --property 'key.separator=:' --property 'parse.key=true' --topic {topic_name} --bootstrap-server {self.container_name}:9092 <<< '{message_key}:{message}'"])
logging.info("Produce message with key output: %s", output)
return code == 0
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we log code, as well, please?

And a message with content "<message 2>" is published to the "ConsumeKafkaTest" topic with key "consume_kafka_test_key"

Then two flowfiles with the contents "<message 1>" and "<message 2>" are placed in the monitored directory in less than 45 seconds
Then the contents of "/tmp/output" in less than 30 seconds are: "<message 1>" and "<message 2>"
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The old phrasing sounds more natural and clearer to me: "happens in less than x seconds" requires an event ("flowfiles are placed"), not a state ("the contents are"). Can we change it back?

Examples: Topic names and formats to test
| message 1 | message 2 | topic names | topic name format |
| Ulysses | James Joyce | ConsumeKafkaTest | (not set) |
| The Great Gatsby | F. Scott Fitzgerald | ConsumeKafkaTest | Names |
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why was the first example line removed?

return code == 0

def produce_message(self, topic_name: str, message: str):
(code, output) = self.exec_run(["/bin/bash", "-c", f"/opt/kafka/bin/kafka-console-producer.sh --topic {topic_name} --bootstrap-server {self.container_name}:9092 <<< '{message}'"])
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

optional: I'd avoid bashisms and use a standard pipe.

Suggested change
(code, output) = self.exec_run(["/bin/bash", "-c", f"/opt/kafka/bin/kafka-console-producer.sh --topic {topic_name} --bootstrap-server {self.container_name}:9092 <<< '{message}'"])
(code, output) = self.exec_run(["/bin/sh", "-c", f"echo '{message}' | /opt/kafka/bin/kafka-console-producer.sh --topic '{topic_name}' --bootstrap-server '{self.container_name}':9092"])

This will also likely fail on Windows, with @martinzink 's changes to run the tests on windows containers.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

thats moot, because we wont be able to support this on windows anyways, there is no kafka server windows container and as of now we cant mix and match windows and linux containers

return code == 0

def produce_message_with_key(self, topic_name: str, message: str, message_key: str):
(code, output) = self.exec_run(["/bin/bash", "-c", f"/opt/kafka/bin/kafka-console-producer.sh --property 'key.separator=:' --property 'parse.key=true' --topic {topic_name} --bootstrap-server {self.container_name}:9092 <<< '{message_key}:{message}'"])
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
(code, output) = self.exec_run(["/bin/bash", "-c", f"/opt/kafka/bin/kafka-console-producer.sh --property 'key.separator=:' --property 'parse.key=true' --topic {topic_name} --bootstrap-server {self.container_name}:9092 <<< '{message_key}:{message}'"])
(code, output) = self.exec_run(["/bin/sh", "-c", f" echo ''{message_key}:{message}'' | /opt/kafka/bin/kafka-console-producer.sh --property 'key.separator=:' --property 'parse.key=true' --topic '{topic_name}' --bootstrap-server '{self.container_name}':9092"])

Comment on lines +336 to +339
if self.container.status == "running":
return self._verify_file_contents_in_running_container(directory_path, expected_contents)

return self._verify_file_contents_in_stopped_container(directory_path, expected_contents)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

what if the container stops after the check, before/during scanning the directory?

context=None)

def create_topic(self, topic_name: str):
(code, output) = self.exec_run(["/bin/bash", "-c", f"/opt/kafka/bin/kafka-topics.sh --create --topic {topic_name} --bootstrap-server {self.container_name}:9092"])
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

optional: sh + quotes to handle more special characters

Suggested change
(code, output) = self.exec_run(["/bin/bash", "-c", f"/opt/kafka/bin/kafka-topics.sh --create --topic {topic_name} --bootstrap-server {self.container_name}:9092"])
(code, output) = self.exec_run(["/bin/sh", "-c", f"/opt/kafka/bin/kafka-topics.sh --create --topic '{topic_name}' --bootstrap-server '{self.container_name}':9092"])

Comment on lines +54 to +58
And RouteOnAttribute is EVENT_DRIVEN
And a LogAttribute processor
And LogAttribute is EVENT_DRIVEN
And a PutFile processor with the "Directory" property set to "/tmp/output"
And PutFile is EVENT_DRIVEN
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Shouldn't it be default in the test framework to have non-source processors use event driven scheduling? to reduce boilerplate

And the publisher performs a <transaction type> transaction publishing to the "ConsumeKafkaTest" topic these messages: <messages sent>

Then <number of flowfiles expected> flowfiles are placed in the monitored directory in less than 15 seconds
Then there are <number of flowfiles expected> files in the "/tmp/output" directory in less than 15 seconds
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@fgerlits comment applies to this too: it's clearer to expect an event than a state, so the old version was better.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

low-impact Test only or trivial change that's most likely not gonna introduce any new bugs

Projects

None yet

Development

Successfully merging this pull request may close these issues.

4 participants