Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import org.apache.kafka.clients.MetadataRecoveryStrategy;
import org.apache.kafka.clients.consumer.internals.AutoOffsetResetStrategy;
import org.apache.kafka.clients.consumer.internals.ShareAcknowledgementMode;
import org.apache.kafka.clients.consumer.internals.ShareAcquireMode;
import org.apache.kafka.common.IsolationLevel;
import org.apache.kafka.common.config.AbstractConfig;
import org.apache.kafka.common.config.ConfigDef;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.clients.consumer;
package org.apache.kafka.clients.consumer.internals;

import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.common.config.ConfigException;
Expand All @@ -29,7 +29,7 @@ public enum ShareAcquireMode {

public final String name;

public final byte id;
final byte id;

ShareAcquireMode(final String name, final byte id) {
this.name = name;
Expand Down Expand Up @@ -65,7 +65,7 @@ public static ShareAcquireMode forId(byte id) {

@Override
public String toString() {
return "ShareAcquireMode(" + name + " (" + id + "))";
return name;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just for my knowledge: Why this change?

Copy link
Contributor Author

@JimmyWang6 JimmyWang6 Nov 26, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This originates from the comment at #20246 (comment), and also I find the valid values here from website seem a bit odd:
image

They appear to come directly from the Validator's toString() output, so it would be better to make the enum consistent and also fix this.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for letting me know.

}

public static class Validator implements ConfigDef.Validator {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@

import org.apache.kafka.clients.ClientResponse;
import org.apache.kafka.clients.Metadata;
import org.apache.kafka.clients.consumer.ShareAcquireMode;
import org.apache.kafka.clients.consumer.internals.NetworkClientDelegate.PollResult;
import org.apache.kafka.clients.consumer.internals.NetworkClientDelegate.UnsentRequest;
import org.apache.kafka.clients.consumer.internals.events.ShareAcknowledgementEvent;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@
package org.apache.kafka.clients.consumer.internals;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ShareAcquireMode;
import org.apache.kafka.common.IsolationLevel;

import static org.apache.kafka.clients.consumer.internals.ConsumerUtils.configuredIsolationLevel;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
*/
package org.apache.kafka.common.requests;

import org.apache.kafka.clients.consumer.ShareAcquireMode;
import org.apache.kafka.clients.consumer.internals.ShareAcquireMode;
import org.apache.kafka.common.TopicIdPartition;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.Uuid;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@
import org.apache.kafka.clients.consumer.AcknowledgeType;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ShareAcquireMode;
import org.apache.kafka.clients.consumer.internals.events.BackgroundEventHandler;
import org.apache.kafka.clients.consumer.internals.events.ShareAcknowledgementEvent;
import org.apache.kafka.clients.consumer.internals.events.ShareAcknowledgementEventHandler;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@

import org.apache.kafka.clients.consumer.AcknowledgeType;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ShareAcquireMode;
import org.apache.kafka.common.IsolationLevel;
import org.apache.kafka.common.TopicIdPartition;
import org.apache.kafka.common.TopicPartition;
Expand Down
4 changes: 2 additions & 2 deletions core/src/main/java/kafka/server/share/SharePartition.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
import kafka.server.share.SharePartitionManager.SharePartitionListener;

import org.apache.kafka.clients.consumer.AcknowledgeType;
import org.apache.kafka.clients.consumer.ShareAcquireMode;
import org.apache.kafka.clients.consumer.internals.ShareAcquireMode;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.TopicIdPartition;
import org.apache.kafka.common.Uuid;
Expand Down Expand Up @@ -1827,13 +1827,13 @@ private List<AcquiredRecords> createBatches(
sharePartitionMetrics);
int delayMs = recordLockDurationMsOrDefault(groupConfigManager, groupId, defaultRecordLockDurationMs);
long lastOffset = acquiredRecords.firstOffset() + maxFetchRecords - 1;
acquiredRecords.setLastOffset(lastOffset);
inFlightBatch.maybeInitializeOffsetStateUpdate(lastOffset, delayMs);
updateFindNextFetchOffset(true);

cachedState.put(acquiredRecords.firstOffset(), inFlightBatch);
sharePartitionMetrics.recordInFlightBatchMessageCount(
acquiredRecords.lastOffset() - acquiredRecords.firstOffset() + 1);
acquiredRecords.setLastOffset(lastOffset);
return List.of(acquiredRecords);
Comment on lines -1830 to 1837
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I do not see any tests changes so are we testing the metrics in record limit mode, if we would be then we should have caught the issue earlier?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You're right — I did overlook the metrics-related test cases in record limit mode. I will add them. Thank you for pointing this out :)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Besides this, the current inFlightBatchMessageCount records only batch-level inFlightMessageCount and may not accurately reflect per-offset counts; perhaps we should add an offset-level metric. What do you think?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

perhaps we should add an offset-level metric.

Out of curiosity, is the difference between them equal to the limit?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As @chirag-wadhwa5 noted in comment #20246 ,InFlightMessageCount currently reports only the total number of in-flight messages for the share partition and does not indicate how many records within that partition are in each state; for example, records in ACKNOWLEDGED or ARCHIVED should not be considered in-flight. Perhaps we could add a Gauge metric to track the number of records in each state.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hi @apoorvmittal10
I've added the relevant tests to cover the metrics in record limit mode. Please take a look.

}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
import kafka.server.ReplicaManager;
import kafka.server.ReplicaQuota;

import org.apache.kafka.clients.consumer.ShareAcquireMode;
import org.apache.kafka.clients.consumer.internals.ShareAcquireMode;
import org.apache.kafka.common.TopicIdPartition;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.Uuid;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@

import kafka.server.ReplicaManager;

import org.apache.kafka.clients.consumer.ShareAcquireMode;
import org.apache.kafka.clients.consumer.internals.ShareAcquireMode;
import org.apache.kafka.common.TopicIdPartition;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.Uuid;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
import kafka.server.share.SharePartitionManager.SharePartitionListener;

import org.apache.kafka.clients.consumer.AcknowledgeType;
import org.apache.kafka.clients.consumer.ShareAcquireMode;
import org.apache.kafka.clients.consumer.internals.ShareAcquireMode;
import org.apache.kafka.common.TopicIdPartition;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.Uuid;
Expand Down
Loading