Skip to content

Conversation

@JimmyWang6
Copy link
Contributor

This PR moves ShareAcquireMode to the consumer.internal package and addresses comments from #20246 (review)

@github-actions github-actions bot added triage PRs from the community core Kafka Broker consumer KIP-932 Queues for Kafka clients small Small PRs labels Nov 24, 2025
@AndrewJSchofield AndrewJSchofield added ci-approved and removed triage PRs from the community labels Nov 24, 2025
Copy link
Member

@AndrewJSchofield AndrewJSchofield left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the PR. Please update KIP-1206 to remove the enum.

@JimmyWang6
Copy link
Contributor Author

@AndrewJSchofield Thanks for the comment, already updated the KIP.

sharePartitionMetrics);
int delayMs = recordLockDurationMsOrDefault(groupConfigManager, groupId, defaultRecordLockDurationMs);
long lastOffset = acquiredRecords.firstOffset() + maxFetchRecords - 1;
sharePartitionMetrics.recordInFlightBatchMessageCount(
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: can we please update the metric after the batch has been added to the cached state ? Thanks

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good idea, done.

@chirag-wadhwa5
Copy link
Collaborator

Thanks for the PR. LGTM !

Copy link
Contributor

@apoorvmittal10 apoorvmittal10 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looks good, some comments.

@Override
public String toString() {
return "ShareAcquireMode(" + name + " (" + id + "))";
return name;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just for my knowledge: Why this change?

Copy link
Contributor Author

@JimmyWang6 JimmyWang6 Nov 26, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This originates from the comment at #20246 (comment), and also I find the valid values here from website seem a bit odd:
image

They appear to come directly from the Validator's toString() output, so it would be better to make the enum consistent and also fix this.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for letting me know.

Comment on lines -1830 to 1837
acquiredRecords.setLastOffset(lastOffset);
inFlightBatch.maybeInitializeOffsetStateUpdate(lastOffset, delayMs);
updateFindNextFetchOffset(true);

cachedState.put(acquiredRecords.firstOffset(), inFlightBatch);
sharePartitionMetrics.recordInFlightBatchMessageCount(
acquiredRecords.lastOffset() - acquiredRecords.firstOffset() + 1);
acquiredRecords.setLastOffset(lastOffset);
return List.of(acquiredRecords);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I do not see any tests changes so are we testing the metrics in record limit mode, if we would be then we should have caught the issue earlier?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You're right — I did overlook the metrics-related test cases in record limit mode. I will add them. Thank you for pointing this out :)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Besides this, the current inFlightBatchMessageCount records only batch-level inFlightMessageCount and may not accurately reflect per-offset counts; perhaps we should add an offset-level metric. What do you think?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

perhaps we should add an offset-level metric.

Out of curiosity, is the difference between them equal to the limit?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As @chirag-wadhwa5 noted in comment #20246 ,InFlightMessageCount currently reports only the total number of in-flight messages for the share partition and does not indicate how many records within that partition are in each state; for example, records in ACKNOWLEDGED or ARCHIVED should not be considered in-flight. Perhaps we could add a Gauge metric to track the number of records in each state.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

Projects

None yet

Development

Successfully merging this pull request may close these issues.

5 participants