-
Notifications
You must be signed in to change notification settings - Fork 14.8k
MINOR: Move ShareAcquireMode to consumer.internal package #20973
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: trunk
Are you sure you want to change the base?
Conversation
AndrewJSchofield
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for the PR. Please update KIP-1206 to remove the enum.
|
@AndrewJSchofield Thanks for the comment, already updated the KIP. |
| sharePartitionMetrics); | ||
| int delayMs = recordLockDurationMsOrDefault(groupConfigManager, groupId, defaultRecordLockDurationMs); | ||
| long lastOffset = acquiredRecords.firstOffset() + maxFetchRecords - 1; | ||
| sharePartitionMetrics.recordInFlightBatchMessageCount( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: can we please update the metric after the batch has been added to the cached state ? Thanks
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Good idea, done.
|
Thanks for the PR. LGTM ! |
apoorvmittal10
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Looks good, some comments.
| @Override | ||
| public String toString() { | ||
| return "ShareAcquireMode(" + name + " (" + id + "))"; | ||
| return name; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Just for my knowledge: Why this change?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This originates from the comment at #20246 (comment), and also I find the valid values here from website seem a bit odd:

They appear to come directly from the Validator's toString() output, so it would be better to make the enum consistent and also fix this.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for letting me know.
| acquiredRecords.setLastOffset(lastOffset); | ||
| inFlightBatch.maybeInitializeOffsetStateUpdate(lastOffset, delayMs); | ||
| updateFindNextFetchOffset(true); | ||
|
|
||
| cachedState.put(acquiredRecords.firstOffset(), inFlightBatch); | ||
| sharePartitionMetrics.recordInFlightBatchMessageCount( | ||
| acquiredRecords.lastOffset() - acquiredRecords.firstOffset() + 1); | ||
| acquiredRecords.setLastOffset(lastOffset); | ||
| return List.of(acquiredRecords); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I do not see any tests changes so are we testing the metrics in record limit mode, if we would be then we should have caught the issue earlier?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
You're right — I did overlook the metrics-related test cases in record limit mode. I will add them. Thank you for pointing this out :)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Besides this, the current inFlightBatchMessageCount records only batch-level inFlightMessageCount and may not accurately reflect per-offset counts; perhaps we should add an offset-level metric. What do you think?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
perhaps we should add an offset-level metric.
Out of curiosity, is the difference between them equal to the limit?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
As @chirag-wadhwa5 noted in comment #20246 ,InFlightMessageCount currently reports only the total number of in-flight messages for the share partition and does not indicate how many records within that partition are in each state; for example, records in ACKNOWLEDGED or ARCHIVED should not be considered in-flight. Perhaps we could add a Gauge metric to track the number of records in each state.
This PR moves
ShareAcquireModeto theconsumer.internalpackage and addresses comments from #20246 (review)