Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion src/rdkafka_queue.c
Original file line number Diff line number Diff line change
Expand Up @@ -829,7 +829,10 @@ int rd_kafka_q_serve_rkmessages(rd_kafka_q_t *rkq,
rko = (rd_kafka_op_t *)rkmessages[i]->_private;
rd_kafka_toppar_t *rktp = rko->rko_rktp;
int64_t offset = rkmessages[i]->offset + 1;
if (unlikely(rktp && (rktp->rktp_app_pos.offset < offset)))
/* Only update position for messages that are not EOF */
if (unlikely(rktp && (rktp->rktp_app_pos.offset < offset) &&
(rkmessages[i]->err !=
RD_KAFKA_RESP_ERR__PARTITION_EOF)))
rd_kafka_update_app_pos(
rk, rktp,
RD_KAFKA_FETCH_POS(
Expand Down
28 changes: 15 additions & 13 deletions src/rdkafka_ssl.c
Original file line number Diff line number Diff line change
Expand Up @@ -470,10 +470,9 @@ RD_EXPORT
#else
static
#endif
const char *
rd_kafka_ssl_normalize_hostname(const char *hostname,
char *normalized,
size_t size) {
const char *rd_kafka_ssl_normalize_hostname(const char *hostname,
char *normalized,
size_t size) {
size_t len;

rd_snprintf(normalized, size, "%s", hostname);
Expand Down Expand Up @@ -510,16 +509,19 @@ static int rd_kafka_transport_ssl_set_endpoint_id(rd_kafka_transport_t *rktrans,
if ((t = strrchr(name, ':')))
*t = '\0';

/* Normalize hostname (remove trailing dot) for both SNI and certificate verification */
/* Normalize hostname (remove trailing dot) for both SNI and certificate
* verification */
rd_kafka_ssl_normalize_hostname(name, name_for_verify,
sizeof(name_for_verify));
sizeof(name_for_verify));

#if (OPENSSL_VERSION_NUMBER >= 0x0090806fL) && !defined(OPENSSL_NO_TLSEXT)
/* If non-numerical hostname, send it for SNI */
if (!(/*ipv6*/ (strchr(name_for_verify, ':') &&
strspn(name_for_verify, "0123456789abcdefABCDEF:.[]%") ==
strlen(name_for_verify)) ||
/*ipv4*/ strspn(name_for_verify, "0123456789.") == strlen(name_for_verify)) &&
if (!(/*ipv6*/ (
strchr(name_for_verify, ':') &&
strspn(name_for_verify, "0123456789abcdefABCDEF:.[]%") ==
strlen(name_for_verify)) ||
/*ipv4*/ strspn(name_for_verify, "0123456789.") ==
strlen(name_for_verify)) &&
!SSL_set_tlsext_host_name(rktrans->rktrans_ssl, name_for_verify))
goto fail;
#endif
Expand All @@ -545,9 +547,9 @@ static int rd_kafka_transport_ssl_set_endpoint_id(rd_kafka_transport_t *rktrans,

param = SSL_get0_param(rktrans->rktrans_ssl);

if (!X509_VERIFY_PARAM_set1_host(param, name_for_verify,
strnlen(name_for_verify,
sizeof(name_for_verify))))
if (!X509_VERIFY_PARAM_set1_host(
param, name_for_verify,
strnlen(name_for_verify, sizeof(name_for_verify))))
goto fail;
}
#else
Expand Down
133 changes: 133 additions & 0 deletions tests/0137-barrier_batch_consume.c
Original file line number Diff line number Diff line change
Expand Up @@ -608,12 +608,145 @@ static void do_test_consume_batch_control_msgs(void) {
}


/**
* @brief Test that rd_kafka_consume_batch_queue correctly updates consumer
* position when EOF messages are received with
* enable.partition.eof=true.
*
* This is a regression test for the bug where EOF messages incorrectly
* advanced the consumer position by 2 instead of 1 (last_offset + 2 instead
* of last_offset + 1).
*/
static void do_test_consume_batch_eof_position(void) {
const char *topic;
rd_kafka_t *consumer;
rd_kafka_conf_t *conf;
rd_kafka_queue_t *rkq;
uint64_t testid;
const int partition_cnt = 1;
const int partition = 0;
const int produce_msg_cnt = 5;
const int consume_msg_cnt = 10;
const int timeout_ms = 5000;
const int session_timeout_s = 60;
const int replication_factor = -1;
const int topic_creation_timeout_ms = 5000;
rd_kafka_message_t **rkmessages;
int msg_cnt, i;
int64_t last_real_offset = -1;
int64_t eof_offset = -1;
int64_t position_after_eof;
rd_kafka_topic_partition_list_t *positions;
rd_kafka_resp_err_t err;
int eof_received = 0;

SUB_TEST("Testing EOF position with consume_batch_queue");

/* Create consumer configuration with enable.partition.eof=true */
test_conf_init(&conf, NULL, session_timeout_s);
test_conf_set(conf, "enable.auto.commit", "false");
test_conf_set(conf, "auto.offset.reset", "earliest");
test_conf_set(conf, "enable.partition.eof", "true");

testid = test_id_generate();

topic = test_mk_topic_name("0137-barrier_batch_consume", 1);

/* Create topic */
test_create_topic_wait_exists(NULL, topic, partition_cnt,
replication_factor,
topic_creation_timeout_ms);
test_produce_msgs_easy(topic, testid, partition, produce_msg_cnt);

consumer = test_create_consumer(topic, NULL, conf, NULL);
test_consumer_subscribe(consumer, topic);
test_consumer_wait_assignment(consumer, rd_false);

/* Create generic consume queue */
rkq = rd_kafka_queue_get_consumer(consumer);

/* Consume messages in batches until we get EOF */
rkmessages = malloc(consume_msg_cnt * sizeof(*rkmessages));

while (!eof_received) {
msg_cnt = (int)rd_kafka_consume_batch_queue(
rkq, timeout_ms, rkmessages, consume_msg_cnt);

TEST_ASSERT(msg_cnt >= 0, "consume_batch_queue failed");

if (msg_cnt == 0) {
TEST_WARN("No messages received, retrying...");
continue;
}

/* Check if EOF messages are received */
for (i = 0; i < msg_cnt; i++) {
rd_kafka_message_t *rkm = rkmessages[i];

if (rkm->err == RD_KAFKA_RESP_ERR__PARTITION_EOF) {
eof_received = 1;
eof_offset = rkm->offset;
TEST_SAY("Received EOF at offset %" PRId64 "\n",
eof_offset);
} else if (!rkm->err) {
last_real_offset = rkm->offset;
}
}

/* Destroy messages */
for (i = 0; i < msg_cnt; i++)
rd_kafka_message_destroy(rkmessages[i]);
}

rd_free(rkmessages);

/* Test that the last real message offset is the expected value */
TEST_ASSERT(last_real_offset == produce_msg_cnt - 1,
"Expected last message offset %" PRId64 ", got %" PRId64,
(int64_t)(produce_msg_cnt - 1), last_real_offset);

/* Get consumer position after EOF */
positions = rd_kafka_topic_partition_list_new(1);
rd_kafka_topic_partition_list_add(positions, topic, partition);

err = rd_kafka_position(consumer, positions);
TEST_ASSERT(!err, "rd_kafka_position failed: %s",
rd_kafka_err2str(err));

/* Extract the position value from the partition list */
position_after_eof = positions->elems[0].offset;

TEST_SAY("Last real message offset: %" PRId64
"\n"
"EOF offset: %" PRId64
"\n"
"Position after EOF: %" PRId64 "\n",
last_real_offset, eof_offset, position_after_eof);

TEST_ASSERT(position_after_eof == last_real_offset + 1,
"Position after EOF should be %" PRId64
" (last_offset + 1), "
"but got %" PRId64
". This indicates the EOF offset bug where "
"position incorrectly advances by +2 instead of +1",
last_real_offset + 1, position_after_eof);

rd_kafka_topic_partition_list_destroy(positions);
rd_kafka_queue_destroy(rkq);
test_consumer_close(consumer);
rd_kafka_destroy(consumer);

SUB_TEST_PASS();
}


int main_0137_barrier_batch_consume(int argc, char **argv) {
do_test_consume_batch_with_seek();
do_test_consume_batch_store_offset();
do_test_consume_batch_with_pause_and_resume_different_batch();
do_test_consume_batch_with_pause_and_resume_same_batch();
do_test_consume_batch_control_msgs();
do_test_consume_batch_eof_position();

return 0;
}
6 changes: 3 additions & 3 deletions tests/0154-ssl_hostname_normalize.c
Original file line number Diff line number Diff line change
Expand Up @@ -45,8 +45,8 @@

/* Function under test (exposed via ENABLE_DEVEL) */
extern const char *rd_kafka_ssl_normalize_hostname(const char *hostname,
char *normalized,
size_t size);
char *normalized,
size_t size);


/**
Expand Down Expand Up @@ -96,7 +96,7 @@ static void test_hostname_normalize(void) {

/* Call the actual function under test */
result = rd_kafka_ssl_normalize_hostname(input, normalized,
sizeof(normalized));
sizeof(normalized));

TEST_SAYL(3, "Test case %d: %s\n", i + 1, desc);
TEST_SAYL(3, " Input: \"%s\"\n", input);
Expand Down