From 60773d2c70fed051c4017b87dc48c4fb78d620ed Mon Sep 17 00:00:00 2001 From: JimmyWang6 Date: Mon, 24 Nov 2025 19:53:50 +0800 Subject: [PATCH 1/6] Move ShareAcquireMode to internal package. --- .../org/apache/kafka/clients/consumer/ConsumerConfig.java | 1 + .../clients/consumer/{ => internals}/ShareAcquireMode.java | 4 ++-- .../consumer/internals/ShareConsumeRequestManager.java | 1 - .../kafka/clients/consumer/internals/ShareFetchConfig.java | 1 - .../org/apache/kafka/common/requests/ShareFetchRequest.java | 2 +- .../consumer/internals/ShareConsumeRequestManagerTest.java | 1 - .../clients/consumer/internals/ShareSessionHandlerTest.java | 1 - core/src/main/java/kafka/server/share/SharePartition.java | 2 +- .../test/java/kafka/server/share/DelayedShareFetchTest.java | 2 +- .../src/test/java/kafka/server/share/ShareFetchUtilsTest.java | 2 +- .../java/kafka/server/share/SharePartitionManagerTest.java | 2 +- core/src/test/java/kafka/server/share/SharePartitionTest.java | 2 +- .../integration/kafka/api/AuthorizerIntegrationTest.scala | 2 +- .../src/test/scala/unit/kafka/server/ReplicaManagerTest.scala | 2 +- .../unit/kafka/server/ShareFetchAcknowledgeRequestTest.scala | 2 +- .../java/org/apache/kafka/server/share/fetch/ShareFetch.java | 2 +- .../org/apache/kafka/server/share/fetch/ShareFetchTest.java | 2 +- 17 files changed, 14 insertions(+), 17 deletions(-) rename clients/src/main/java/org/apache/kafka/clients/consumer/{ => internals}/ShareAcquireMode.java (97%) diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java b/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java index 2f527bdf9a1a4..7ad52c2122627 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java @@ -21,6 +21,7 @@ import org.apache.kafka.clients.MetadataRecoveryStrategy; import org.apache.kafka.clients.consumer.internals.AutoOffsetResetStrategy; import org.apache.kafka.clients.consumer.internals.ShareAcknowledgementMode; +import org.apache.kafka.clients.consumer.internals.ShareAcquireMode; import org.apache.kafka.common.IsolationLevel; import org.apache.kafka.common.config.AbstractConfig; import org.apache.kafka.common.config.ConfigDef; diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/ShareAcquireMode.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareAcquireMode.java similarity index 97% rename from clients/src/main/java/org/apache/kafka/clients/consumer/ShareAcquireMode.java rename to clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareAcquireMode.java index ec19a43e730d0..ec0d988ec65c1 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/ShareAcquireMode.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareAcquireMode.java @@ -14,7 +14,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.kafka.clients.consumer; +package org.apache.kafka.clients.consumer.internals; import org.apache.kafka.common.config.ConfigDef; import org.apache.kafka.common.config.ConfigException; @@ -29,7 +29,7 @@ public enum ShareAcquireMode { public final String name; - public final byte id; + final byte id; ShareAcquireMode(final String name, final byte id) { this.name = name; diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareConsumeRequestManager.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareConsumeRequestManager.java index c727cfad62542..9c684c0be9933 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareConsumeRequestManager.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareConsumeRequestManager.java @@ -18,7 +18,6 @@ import org.apache.kafka.clients.ClientResponse; import org.apache.kafka.clients.Metadata; -import org.apache.kafka.clients.consumer.ShareAcquireMode; import org.apache.kafka.clients.consumer.internals.NetworkClientDelegate.PollResult; import org.apache.kafka.clients.consumer.internals.NetworkClientDelegate.UnsentRequest; import org.apache.kafka.clients.consumer.internals.events.ShareAcknowledgementEvent; diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareFetchConfig.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareFetchConfig.java index 3c606e1f18d5e..5dd4c1db04c80 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareFetchConfig.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareFetchConfig.java @@ -17,7 +17,6 @@ package org.apache.kafka.clients.consumer.internals; import org.apache.kafka.clients.consumer.ConsumerConfig; -import org.apache.kafka.clients.consumer.ShareAcquireMode; import org.apache.kafka.common.IsolationLevel; import static org.apache.kafka.clients.consumer.internals.ConsumerUtils.configuredIsolationLevel; diff --git a/clients/src/main/java/org/apache/kafka/common/requests/ShareFetchRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/ShareFetchRequest.java index 53f9373d35fb1..de977fe31b4eb 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/ShareFetchRequest.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/ShareFetchRequest.java @@ -16,7 +16,7 @@ */ package org.apache.kafka.common.requests; -import org.apache.kafka.clients.consumer.ShareAcquireMode; +import org.apache.kafka.clients.consumer.internals.ShareAcquireMode; import org.apache.kafka.common.TopicIdPartition; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.Uuid; diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareConsumeRequestManagerTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareConsumeRequestManagerTest.java index e024818c42722..b6040950ccdd6 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareConsumeRequestManagerTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareConsumeRequestManagerTest.java @@ -24,7 +24,6 @@ import org.apache.kafka.clients.consumer.AcknowledgeType; import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.consumer.ConsumerRecord; -import org.apache.kafka.clients.consumer.ShareAcquireMode; import org.apache.kafka.clients.consumer.internals.events.BackgroundEventHandler; import org.apache.kafka.clients.consumer.internals.events.ShareAcknowledgementEvent; import org.apache.kafka.clients.consumer.internals.events.ShareAcknowledgementEventHandler; diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareSessionHandlerTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareSessionHandlerTest.java index 870f4ac7b57a5..72d5d7c4e2106 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareSessionHandlerTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareSessionHandlerTest.java @@ -18,7 +18,6 @@ import org.apache.kafka.clients.consumer.AcknowledgeType; import org.apache.kafka.clients.consumer.ConsumerConfig; -import org.apache.kafka.clients.consumer.ShareAcquireMode; import org.apache.kafka.common.IsolationLevel; import org.apache.kafka.common.TopicIdPartition; import org.apache.kafka.common.TopicPartition; diff --git a/core/src/main/java/kafka/server/share/SharePartition.java b/core/src/main/java/kafka/server/share/SharePartition.java index cb268e81e9313..f068a5086091a 100644 --- a/core/src/main/java/kafka/server/share/SharePartition.java +++ b/core/src/main/java/kafka/server/share/SharePartition.java @@ -20,7 +20,7 @@ import kafka.server.share.SharePartitionManager.SharePartitionListener; import org.apache.kafka.clients.consumer.AcknowledgeType; -import org.apache.kafka.clients.consumer.ShareAcquireMode; +import org.apache.kafka.clients.consumer.internals.ShareAcquireMode; import org.apache.kafka.common.KafkaException; import org.apache.kafka.common.TopicIdPartition; import org.apache.kafka.common.Uuid; diff --git a/core/src/test/java/kafka/server/share/DelayedShareFetchTest.java b/core/src/test/java/kafka/server/share/DelayedShareFetchTest.java index a7b138a370883..63869fdf17f35 100644 --- a/core/src/test/java/kafka/server/share/DelayedShareFetchTest.java +++ b/core/src/test/java/kafka/server/share/DelayedShareFetchTest.java @@ -21,7 +21,7 @@ import kafka.server.ReplicaManager; import kafka.server.ReplicaQuota; -import org.apache.kafka.clients.consumer.ShareAcquireMode; +import org.apache.kafka.clients.consumer.internals.ShareAcquireMode; import org.apache.kafka.common.TopicIdPartition; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.Uuid; diff --git a/core/src/test/java/kafka/server/share/ShareFetchUtilsTest.java b/core/src/test/java/kafka/server/share/ShareFetchUtilsTest.java index 37aa3c8c28b53..a0257ac65dc6f 100644 --- a/core/src/test/java/kafka/server/share/ShareFetchUtilsTest.java +++ b/core/src/test/java/kafka/server/share/ShareFetchUtilsTest.java @@ -18,7 +18,7 @@ import kafka.server.ReplicaManager; -import org.apache.kafka.clients.consumer.ShareAcquireMode; +import org.apache.kafka.clients.consumer.internals.ShareAcquireMode; import org.apache.kafka.common.TopicIdPartition; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.Uuid; diff --git a/core/src/test/java/kafka/server/share/SharePartitionManagerTest.java b/core/src/test/java/kafka/server/share/SharePartitionManagerTest.java index 5fdf84363a479..84e2c40a843ca 100644 --- a/core/src/test/java/kafka/server/share/SharePartitionManagerTest.java +++ b/core/src/test/java/kafka/server/share/SharePartitionManagerTest.java @@ -22,7 +22,7 @@ import kafka.server.share.SharePartitionManager.SharePartitionListener; import org.apache.kafka.clients.consumer.AcknowledgeType; -import org.apache.kafka.clients.consumer.ShareAcquireMode; +import org.apache.kafka.clients.consumer.internals.ShareAcquireMode; import org.apache.kafka.common.TopicIdPartition; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.Uuid; diff --git a/core/src/test/java/kafka/server/share/SharePartitionTest.java b/core/src/test/java/kafka/server/share/SharePartitionTest.java index c8be18c027ead..1833f7240bcd4 100644 --- a/core/src/test/java/kafka/server/share/SharePartitionTest.java +++ b/core/src/test/java/kafka/server/share/SharePartitionTest.java @@ -22,7 +22,7 @@ import kafka.server.share.SharePartitionManager.SharePartitionListener; import org.apache.kafka.clients.consumer.AcknowledgeType; -import org.apache.kafka.clients.consumer.ShareAcquireMode; +import org.apache.kafka.clients.consumer.internals.ShareAcquireMode; import org.apache.kafka.common.TopicIdPartition; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.Uuid; diff --git a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala b/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala index 71f582ed8d52d..3d7a54ec2e259 100644 --- a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala +++ b/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala @@ -22,7 +22,7 @@ import kafka.utils.{TestInfoUtils, TestUtils} import kafka.utils.TestUtils.waitUntilTrue import org.apache.kafka.clients.admin.{Admin, AlterConfigOp, ListGroupsOptions, NewTopic} import org.apache.kafka.clients.consumer._ -import org.apache.kafka.clients.consumer.internals.{StreamsRebalanceData, StreamsRebalanceListener} +import org.apache.kafka.clients.consumer.internals.{ShareAcquireMode, StreamsRebalanceData, StreamsRebalanceListener} import org.apache.kafka.clients.producer._ import org.apache.kafka.common.acl.AclOperation._ import org.apache.kafka.common.acl.AclPermissionType.{ALLOW, DENY} diff --git a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala index d37991a98c660..753b831b594df 100644 --- a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala +++ b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala @@ -30,7 +30,7 @@ import kafka.server.share.{DelayedShareFetch, SharePartition} import kafka.utils.TestUtils.waitUntilTrue import kafka.utils.TestUtils import org.apache.kafka.clients.FetchSessionHandler -import org.apache.kafka.clients.consumer.ShareAcquireMode +import org.apache.kafka.clients.consumer.internals.ShareAcquireMode import org.apache.kafka.common.{DirectoryId, IsolationLevel, Node, TopicIdPartition, TopicPartition, Uuid} import org.apache.kafka.common.compress.Compression import org.apache.kafka.common.config.TopicConfig diff --git a/core/src/test/scala/unit/kafka/server/ShareFetchAcknowledgeRequestTest.scala b/core/src/test/scala/unit/kafka/server/ShareFetchAcknowledgeRequestTest.scala index a5520ab764c4d..597d1dc7036c0 100644 --- a/core/src/test/scala/unit/kafka/server/ShareFetchAcknowledgeRequestTest.scala +++ b/core/src/test/scala/unit/kafka/server/ShareFetchAcknowledgeRequestTest.scala @@ -18,7 +18,7 @@ package kafka.server import kafka.utils.TestUtils import org.apache.kafka.clients.admin.DescribeShareGroupsOptions -import org.apache.kafka.clients.consumer.ShareAcquireMode +import org.apache.kafka.clients.consumer.internals.ShareAcquireMode import org.apache.kafka.common.test.api.{ClusterConfigProperty, ClusterFeature, ClusterTest, ClusterTestDefaults, ClusterTests, Type} import org.apache.kafka.common.message.ShareFetchResponseData.AcquiredRecords import org.apache.kafka.common.message.{FindCoordinatorRequestData, ShareAcknowledgeRequestData, ShareAcknowledgeResponseData, ShareFetchRequestData, ShareFetchResponseData, ShareGroupHeartbeatRequestData} diff --git a/server/src/main/java/org/apache/kafka/server/share/fetch/ShareFetch.java b/server/src/main/java/org/apache/kafka/server/share/fetch/ShareFetch.java index 9b16a94f39559..518266d9c0dcf 100644 --- a/server/src/main/java/org/apache/kafka/server/share/fetch/ShareFetch.java +++ b/server/src/main/java/org/apache/kafka/server/share/fetch/ShareFetch.java @@ -17,7 +17,7 @@ package org.apache.kafka.server.share.fetch; -import org.apache.kafka.clients.consumer.ShareAcquireMode; +import org.apache.kafka.clients.consumer.internals.ShareAcquireMode; import org.apache.kafka.common.TopicIdPartition; import org.apache.kafka.common.message.ShareFetchResponseData.PartitionData; import org.apache.kafka.common.protocol.Errors; diff --git a/server/src/test/java/org/apache/kafka/server/share/fetch/ShareFetchTest.java b/server/src/test/java/org/apache/kafka/server/share/fetch/ShareFetchTest.java index 013ee0c2c27f5..053d2f5d95326 100644 --- a/server/src/test/java/org/apache/kafka/server/share/fetch/ShareFetchTest.java +++ b/server/src/test/java/org/apache/kafka/server/share/fetch/ShareFetchTest.java @@ -16,7 +16,7 @@ */ package org.apache.kafka.server.share.fetch; -import org.apache.kafka.clients.consumer.ShareAcquireMode; +import org.apache.kafka.clients.consumer.internals.ShareAcquireMode; import org.apache.kafka.common.TopicIdPartition; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.Uuid; From b48f2a22216cf70469f987e73deb507ad4833928 Mon Sep 17 00:00:00 2001 From: JimmyWang6 Date: Mon, 24 Nov 2025 20:49:52 +0800 Subject: [PATCH 2/6] address comments --- .../kafka/clients/consumer/internals/ShareAcquireMode.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareAcquireMode.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareAcquireMode.java index ec0d988ec65c1..5548b68b9d505 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareAcquireMode.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareAcquireMode.java @@ -65,7 +65,7 @@ public static ShareAcquireMode forId(byte id) { @Override public String toString() { - return "ShareAcquireMode(" + name + " (" + id + "))"; + return name; } public static class Validator implements ConfigDef.Validator { From 0943bb16b5955536b991e5e485ff4790203364cb Mon Sep 17 00:00:00 2001 From: JimmyWang6 Date: Wed, 26 Nov 2025 15:44:08 +0800 Subject: [PATCH 3/6] address comment --- core/src/main/java/kafka/server/share/SharePartition.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/core/src/main/java/kafka/server/share/SharePartition.java b/core/src/main/java/kafka/server/share/SharePartition.java index f068a5086091a..5da182413d10e 100644 --- a/core/src/main/java/kafka/server/share/SharePartition.java +++ b/core/src/main/java/kafka/server/share/SharePartition.java @@ -1827,13 +1827,13 @@ private List createBatches( sharePartitionMetrics); int delayMs = recordLockDurationMsOrDefault(groupConfigManager, groupId, defaultRecordLockDurationMs); long lastOffset = acquiredRecords.firstOffset() + maxFetchRecords - 1; + sharePartitionMetrics.recordInFlightBatchMessageCount( + acquiredRecords.lastOffset() - acquiredRecords.firstOffset() + 1); acquiredRecords.setLastOffset(lastOffset); inFlightBatch.maybeInitializeOffsetStateUpdate(lastOffset, delayMs); updateFindNextFetchOffset(true); cachedState.put(acquiredRecords.firstOffset(), inFlightBatch); - sharePartitionMetrics.recordInFlightBatchMessageCount( - acquiredRecords.lastOffset() - acquiredRecords.firstOffset() + 1); return List.of(acquiredRecords); } } From cec6ae707c2cf4d43b469740669701a44b4f24e1 Mon Sep 17 00:00:00 2001 From: JimmyWang6 Date: Wed, 26 Nov 2025 19:53:29 +0800 Subject: [PATCH 4/6] address comment --- core/src/main/java/kafka/server/share/SharePartition.java | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/core/src/main/java/kafka/server/share/SharePartition.java b/core/src/main/java/kafka/server/share/SharePartition.java index 5da182413d10e..43e30d29c5608 100644 --- a/core/src/main/java/kafka/server/share/SharePartition.java +++ b/core/src/main/java/kafka/server/share/SharePartition.java @@ -1827,13 +1827,13 @@ private List createBatches( sharePartitionMetrics); int delayMs = recordLockDurationMsOrDefault(groupConfigManager, groupId, defaultRecordLockDurationMs); long lastOffset = acquiredRecords.firstOffset() + maxFetchRecords - 1; - sharePartitionMetrics.recordInFlightBatchMessageCount( - acquiredRecords.lastOffset() - acquiredRecords.firstOffset() + 1); - acquiredRecords.setLastOffset(lastOffset); inFlightBatch.maybeInitializeOffsetStateUpdate(lastOffset, delayMs); updateFindNextFetchOffset(true); cachedState.put(acquiredRecords.firstOffset(), inFlightBatch); + sharePartitionMetrics.recordInFlightBatchMessageCount( + acquiredRecords.lastOffset() - acquiredRecords.firstOffset() + 1); + acquiredRecords.setLastOffset(lastOffset); return List.of(acquiredRecords); } } From d8df8472890e5564a5b86a55af2f131a611d4903 Mon Sep 17 00:00:00 2001 From: JimmyWang6 Date: Thu, 27 Nov 2025 15:28:30 +0800 Subject: [PATCH 5/6] Add a couple of tests to cover related metrics in record_limit mode. --- .../server/share/SharePartitionTest.java | 141 +++++++++++++++--- 1 file changed, 124 insertions(+), 17 deletions(-) diff --git a/core/src/test/java/kafka/server/share/SharePartitionTest.java b/core/src/test/java/kafka/server/share/SharePartitionTest.java index 1833f7240bcd4..1ccf4198b6e95 100644 --- a/core/src/test/java/kafka/server/share/SharePartitionTest.java +++ b/core/src/test/java/kafka/server/share/SharePartitionTest.java @@ -10273,13 +10273,14 @@ public void testRenewAcknowledgeWithPerOffsetAndBatchMix() { } @Test - public void testAcquireSingleBatchInRecordLimitMode() { + public void testAcquireSingleBatchInRecordLimitMode() throws InterruptedException { Persister persister = Mockito.mock(Persister.class); - SharePartition sharePartition = Mockito.spy(SharePartitionBuilder.builder() + SharePartition sharePartition = SharePartitionBuilder.builder() .withState(SharePartitionState.ACTIVE) .withDefaultAcquisitionLockTimeoutMs(ACQUISITION_LOCK_TIMEOUT_MS) .withPersister(persister) - .build()); + .withSharePartitionMetrics(sharePartitionMetrics) + .build(); // Member-1 attempts to acquire records in strict mode with a maximum fetch limit of 5 records. MemoryRecords records = memoryRecords(10); @@ -10339,16 +10340,26 @@ public void testAcquireSingleBatchInRecordLimitMode() { assertEquals("member-2", sharePartition.cachedState().get(0L).offsetState().get(5L).memberId()); assertEquals(RecordState.ACQUIRED, sharePartition.cachedState().get(0L).offsetState().get(9L).state()); assertEquals("member-2", sharePartition.cachedState().get(0L).offsetState().get(5L).memberId()); + + TestUtils.waitForCondition(() -> yammerMetricValue(SharePartitionMetrics.IN_FLIGHT_BATCH_COUNT).intValue() == 1, + "In-flight batch count should be 1."); + TestUtils.waitForCondition(() -> yammerMetricValue(SharePartitionMetrics.IN_FLIGHT_MESSAGE_COUNT).longValue() == 10, + "In-flight message count should be 10."); + assertEquals(10, sharePartitionMetrics.inFlightBatchMessageCount().sum()); + assertEquals(1, sharePartitionMetrics.inFlightBatchMessageCount().count()); + assertEquals(10, sharePartitionMetrics.inFlightBatchMessageCount().min()); + assertEquals(10, sharePartitionMetrics.inFlightBatchMessageCount().max()); } @Test public void testAcquireMultipleBatchesInRecordLimitMode() throws InterruptedException { Persister persister = Mockito.mock(Persister.class); - SharePartition sharePartition = Mockito.spy(SharePartitionBuilder.builder() + SharePartition sharePartition = SharePartitionBuilder.builder() .withState(SharePartitionState.ACTIVE) .withDefaultAcquisitionLockTimeoutMs(ACQUISITION_LOCK_TIMEOUT_MS) .withPersister(persister) - .build()); + .withSharePartitionMetrics(sharePartitionMetrics) + .build(); // Create 3 batches of records. ByteBuffer buffer = ByteBuffer.allocate(4096); @@ -10382,16 +10393,26 @@ public void testAcquireMultipleBatchesInRecordLimitMode() throws InterruptedExce assertEquals(RecordState.ACQUIRED, sharePartition.cachedState().get(10L).offsetState().get(19L).state()); assertEquals(MEMBER_ID, sharePartition.cachedState().get(10L).offsetState().get(19L).memberId()); assertEquals(RecordState.AVAILABLE, sharePartition.cachedState().get(10L).offsetState().get(20L).state()); + + TestUtils.waitForCondition(() -> yammerMetricValue(SharePartitionMetrics.IN_FLIGHT_BATCH_COUNT).intValue() == 1, + "In-flight batch count should be 1."); + TestUtils.waitForCondition(() -> yammerMetricValue(SharePartitionMetrics.IN_FLIGHT_MESSAGE_COUNT).longValue() == 20, + "In-flight message count should be 20."); + assertEquals(20, sharePartitionMetrics.inFlightBatchMessageCount().sum()); + assertEquals(1, sharePartitionMetrics.inFlightBatchMessageCount().count()); + assertEquals(20, sharePartitionMetrics.inFlightBatchMessageCount().min()); + assertEquals(20, sharePartitionMetrics.inFlightBatchMessageCount().max()); } @Test - public void testAcquireWhenInsufficientRecordsInRecordLimitMode() { + public void testAcquireWhenInsufficientRecordsInRecordLimitMode() throws InterruptedException { Persister persister = Mockito.mock(Persister.class); - SharePartition sharePartition = Mockito.spy(SharePartitionBuilder.builder() + SharePartition sharePartition = SharePartitionBuilder.builder() .withState(SharePartitionState.ACTIVE) .withDefaultAcquisitionLockTimeoutMs(ACQUISITION_LOCK_TIMEOUT_MS) .withPersister(persister) - .build()); + .withSharePartitionMetrics(sharePartitionMetrics) + .build(); // Create 3 batches of records. ByteBuffer buffer = ByteBuffer.allocate(4096); @@ -10422,12 +10443,22 @@ public void testAcquireWhenInsufficientRecordsInRecordLimitMode() { assertNull(sharePartition.cachedState().get(10L).offsetState()); assertEquals(RecordState.ACQUIRED, sharePartition.cachedState().get(10L).batchState()); assertEquals(1, sharePartition.timer().size()); + + TestUtils.waitForCondition(() -> yammerMetricValue(SharePartitionMetrics.IN_FLIGHT_BATCH_COUNT).intValue() == 1, + "In-flight batch count should be 1."); + TestUtils.waitForCondition(() -> yammerMetricValue(SharePartitionMetrics.IN_FLIGHT_MESSAGE_COUNT).longValue() == 15, + "In-flight message count should be 15."); + assertEquals(15, sharePartitionMetrics.inFlightBatchMessageCount().sum()); + assertEquals(1, sharePartitionMetrics.inFlightBatchMessageCount().count()); + assertEquals(15, sharePartitionMetrics.inFlightBatchMessageCount().min()); + assertEquals(15, sharePartitionMetrics.inFlightBatchMessageCount().max()); } @Test - public void testAcquireAndAcknowledgeMultipleSubsetRecordInRecordLimitMode() { + public void testAcquireAndAcknowledgeMultipleSubsetRecordInRecordLimitMode() throws InterruptedException { SharePartition sharePartition = SharePartitionBuilder.builder() .withState(SharePartitionState.ACTIVE) + .withSharePartitionMetrics(sharePartitionMetrics) .build(); ByteBuffer buffer = ByteBuffer.allocate(4096); @@ -10511,11 +10542,25 @@ public void testAcquireAndAcknowledgeMultipleSubsetRecordInRecordLimitMode() { expectedOffsetStateMap.put(20L, new InFlightState(RecordState.ACQUIRED, (short) 1, "member-2")); assertEquals(expectedOffsetStateMap, sharePartition.cachedState().get(5L).offsetState()); + + TestUtils.waitForCondition(() -> yammerMetricValue(SharePartitionMetrics.IN_FLIGHT_BATCH_COUNT).intValue() == 1, + "In-flight batch count should be 1."); + // End offset(20) - Start offset(10) + 1 = 11 + TestUtils.waitForCondition(() -> yammerMetricValue(SharePartitionMetrics.IN_FLIGHT_MESSAGE_COUNT).longValue() == 11, + "In-flight message count should be 11."); + // 16 messages(5-20) + assertEquals(16, sharePartitionMetrics.inFlightBatchMessageCount().sum()); + assertEquals(1, sharePartitionMetrics.inFlightBatchMessageCount().count()); + assertEquals(16, sharePartitionMetrics.inFlightBatchMessageCount().min()); + assertEquals(16, sharePartitionMetrics.inFlightBatchMessageCount().max()); } @Test - public void testAcquireMultipleRecordsWithOverlapAndNewBatchInRecordLimitMode() { - SharePartition sharePartition = SharePartitionBuilder.builder().withState(SharePartitionState.ACTIVE).build(); + public void testAcquireMultipleRecordsWithOverlapAndNewBatchInRecordLimitMode() throws InterruptedException { + SharePartition sharePartition = SharePartitionBuilder.builder() + .withState(SharePartitionState.ACTIVE) + .withSharePartitionMetrics(sharePartitionMetrics) + .build(); MemoryRecords records = memoryRecords(5); List acquiredRecordsList = fetchAcquiredRecords(sharePartition.acquire( @@ -10559,6 +10604,15 @@ public void testAcquireMultipleRecordsWithOverlapAndNewBatchInRecordLimitMode() expectedOffsetStateMap.put(3L, new InFlightState(RecordState.ACQUIRED, (short) 1, MEMBER_ID)); expectedOffsetStateMap.put(4L, new InFlightState(RecordState.ACQUIRED, (short) 1, MEMBER_ID)); assertEquals(expectedOffsetStateMap, sharePartition.cachedState().get(0L).offsetState()); + + TestUtils.waitForCondition(() -> yammerMetricValue(SharePartitionMetrics.IN_FLIGHT_BATCH_COUNT).intValue() == 2, + "In-flight batch count should be 2."); + TestUtils.waitForCondition(() -> yammerMetricValue(SharePartitionMetrics.IN_FLIGHT_MESSAGE_COUNT).longValue() == 10, + "In-flight message count should be 10."); + assertEquals(10, sharePartitionMetrics.inFlightBatchMessageCount().sum()); + assertEquals(2, sharePartitionMetrics.inFlightBatchMessageCount().count()); + assertEquals(5, sharePartitionMetrics.inFlightBatchMessageCount().min()); + assertEquals(5, sharePartitionMetrics.inFlightBatchMessageCount().max()); } @Test @@ -10639,6 +10693,7 @@ public void testAcquisitionLockSingleRecordBatchInRecordLimitMode() throws Inter SharePartition sharePartition = SharePartitionBuilder.builder() .withDefaultAcquisitionLockTimeoutMs(ACQUISITION_LOCK_TIMEOUT_MS) .withState(SharePartitionState.ACTIVE) + .withSharePartitionMetrics(sharePartitionMetrics) .build(); fetchAcquiredRecords(sharePartition, memoryRecords(10, 5), 5); @@ -10651,6 +10706,7 @@ public void testAcquisitionLockSingleRecordBatchInRecordLimitMode() throws Inter DEFAULT_MAX_WAIT_ACQUISITION_LOCK_TIMEOUT_MS, () -> assertionFailedMessage(sharePartition, Map.of(10L, List.of()))); + assertEquals(5, sharePartitionMetrics.acquisitionLockTimeoutPerSec().count()); List acquiredRecordsList = fetchAcquiredRecords(sharePartition.acquire( MEMBER_ID, ShareAcquireMode.RECORD_LIMIT, @@ -10674,16 +10730,26 @@ public void testAcquisitionLockSingleRecordBatchInRecordLimitMode() throws Inter expectedOffsetStateMap.put(13L, new InFlightState(RecordState.AVAILABLE, (short) 1, EMPTY_MEMBER_ID)); expectedOffsetStateMap.put(14L, new InFlightState(RecordState.AVAILABLE, (short) 1, EMPTY_MEMBER_ID)); assertEquals(expectedOffsetStateMap, sharePartition.cachedState().get(10L).offsetState()); + + TestUtils.waitForCondition(() -> yammerMetricValue(SharePartitionMetrics.IN_FLIGHT_BATCH_COUNT).intValue() == 1, + "In-flight batch count should be 1."); + TestUtils.waitForCondition(() -> yammerMetricValue(SharePartitionMetrics.IN_FLIGHT_MESSAGE_COUNT).longValue() == 5, + "In-flight message count should be 5."); + assertEquals(5, sharePartitionMetrics.inFlightBatchMessageCount().sum()); + assertEquals(1, sharePartitionMetrics.inFlightBatchMessageCount().count()); + assertEquals(5, sharePartitionMetrics.inFlightBatchMessageCount().min()); + assertEquals(5, sharePartitionMetrics.inFlightBatchMessageCount().max()); } @Test public void testAcquisitionLockTimeoutMultipleRecordBatchInRecordLimitMode() throws InterruptedException { Persister persister = Mockito.mock(Persister.class); - SharePartition sharePartition = Mockito.spy(SharePartitionBuilder.builder() + SharePartition sharePartition = SharePartitionBuilder.builder() .withState(SharePartitionState.ACTIVE) .withDefaultAcquisitionLockTimeoutMs(ACQUISITION_LOCK_TIMEOUT_MS) .withPersister(persister) - .build()); + .withSharePartitionMetrics(sharePartitionMetrics) + .build(); // Create 3 batches of records. ByteBuffer buffer = ByteBuffer.allocate(4096); @@ -10729,6 +10795,12 @@ public void testAcquisitionLockTimeoutMultipleRecordBatchInRecordLimitMode() thr sharePartition.timer().size() == 0, DEFAULT_MAX_WAIT_ACQUISITION_LOCK_TIMEOUT_MS, () -> assertionFailedMessage(sharePartition, Map.of(0L, List.of(0L, 1L)))); + + assertEquals(2, sharePartitionMetrics.acquisitionLockTimeoutPerSec().count()); + assertEquals(5, sharePartitionMetrics.inFlightBatchMessageCount().sum()); + assertEquals(1, sharePartitionMetrics.inFlightBatchMessageCount().count()); + assertEquals(5, sharePartitionMetrics.inFlightBatchMessageCount().min()); + assertEquals(5, sharePartitionMetrics.inFlightBatchMessageCount().max()); // Acquisition lock timeout task has run already and next fetch offset is moved to 0. assertEquals(0, sharePartition.nextFetchOffset()); assertEquals(1, sharePartition.cachedState().get(0L).offsetState().get(0L).deliveryCount()); @@ -10763,11 +10835,20 @@ public void testAcquisitionLockTimeoutMultipleRecordBatchInRecordLimitMode() thr DEFAULT_MAX_WAIT_ACQUISITION_LOCK_TIMEOUT_MS, () -> assertionFailedMessage(sharePartition, Map.of(0L, List.of(0L, 1L)))); + TestUtils.waitForCondition(() -> yammerMetricValue(SharePartitionMetrics.IN_FLIGHT_BATCH_COUNT).intValue() == 1, + "In-flight batch count should be 1."); + TestUtils.waitForCondition(() -> yammerMetricValue(SharePartitionMetrics.IN_FLIGHT_MESSAGE_COUNT).longValue() == 5, + "In-flight message count should be 5."); + assertEquals(3, sharePartitionMetrics.acquisitionLockTimeoutPerSec().count()); + assertEquals(5, sharePartitionMetrics.inFlightBatchMessageCount().sum()); + assertEquals(1, sharePartitionMetrics.inFlightBatchMessageCount().count()); + assertEquals(5, sharePartitionMetrics.inFlightBatchMessageCount().min()); + assertEquals(5, sharePartitionMetrics.inFlightBatchMessageCount().max()); assertEquals(0, sharePartition.nextFetchOffset()); } @Test - public void testAcquireCachedStateInitialGapOverlapsWithActualPartitionGapInRecordLimitMode() { + public void testAcquireCachedStateInitialGapOverlapsWithActualPartitionGapInRecordLimitMode() throws InterruptedException { Persister persister = Mockito.mock(Persister.class); ReadShareGroupStateResult readShareGroupStateResult = Mockito.mock(ReadShareGroupStateResult.class); Mockito.when(readShareGroupStateResult.topicsData()).thenReturn(List.of( @@ -10779,7 +10860,10 @@ public void testAcquireCachedStateInitialGapOverlapsWithActualPartitionGapInReco )))))); Mockito.when(persister.readState(Mockito.any())).thenReturn(CompletableFuture.completedFuture(readShareGroupStateResult)); - SharePartition sharePartition = SharePartitionBuilder.builder().withPersister(persister).build(); + SharePartition sharePartition = SharePartitionBuilder.builder() + .withPersister(persister) + .withSharePartitionMetrics(sharePartitionMetrics) + .build(); sharePartition.maybeInitialize(); // Creating 2 batches starting from 16, such that there is a natural gap from 11 to 15 @@ -10818,10 +10902,19 @@ public void testAcquireCachedStateInitialGapOverlapsWithActualPartitionGapInReco GapWindow persisterReadResultGapWindow = sharePartition.persisterReadResultGapWindow(); assertNull(persisterReadResultGapWindow); + + TestUtils.waitForCondition(() -> yammerMetricValue(SharePartitionMetrics.IN_FLIGHT_BATCH_COUNT).intValue() == 5, + "In-flight batch count should be 5."); + TestUtils.waitForCondition(() -> yammerMetricValue(SharePartitionMetrics.IN_FLIGHT_MESSAGE_COUNT).longValue() == 45, + "In-flight message count should be 45."); + assertEquals(45, sharePartitionMetrics.inFlightBatchMessageCount().sum()); + assertEquals(5, sharePartitionMetrics.inFlightBatchMessageCount().count()); + assertEquals(5, sharePartitionMetrics.inFlightBatchMessageCount().min()); + assertEquals(10, sharePartitionMetrics.inFlightBatchMessageCount().max()); } @Test - public void testAcquireCachedStateGapInBetweenOverlapsWithActualPartitionGapInRecordLimitMode() { + public void testAcquireCachedStateGapInBetweenOverlapsWithActualPartitionGapInRecordLimitMode() throws InterruptedException { Persister persister = Mockito.mock(Persister.class); ReadShareGroupStateResult readShareGroupStateResult = Mockito.mock(ReadShareGroupStateResult.class); Mockito.when(readShareGroupStateResult.topicsData()).thenReturn(List.of( @@ -10833,7 +10926,10 @@ public void testAcquireCachedStateGapInBetweenOverlapsWithActualPartitionGapInRe )))))); Mockito.when(persister.readState(Mockito.any())).thenReturn(CompletableFuture.completedFuture(readShareGroupStateResult)); - SharePartition sharePartition = SharePartitionBuilder.builder().withPersister(persister).build(); + SharePartition sharePartition = SharePartitionBuilder.builder() + .withPersister(persister) + .withSharePartitionMetrics(sharePartitionMetrics) + .build(); sharePartition.maybeInitialize(); // Creating 3 batches starting from 11, such that there is a natural gap from 36 to 40 @@ -10873,6 +10969,17 @@ public void testAcquireCachedStateGapInBetweenOverlapsWithActualPartitionGapInRe // Gap still exists from 36 to 40 assertEquals(36L, persisterReadResultGapWindow.gapStartOffset()); assertEquals(50L, persisterReadResultGapWindow.endOffset()); + + TestUtils.waitForCondition(() -> yammerMetricValue(SharePartitionMetrics.IN_FLIGHT_BATCH_COUNT).intValue() == 4, + "In-flight batch count should be 4."); + // End offset(50) - Start offset(11) + 1 = 40 + TestUtils.waitForCondition(() -> yammerMetricValue(SharePartitionMetrics.IN_FLIGHT_MESSAGE_COUNT).longValue() == 40, + "In-flight message count should be 40."); + // 35 messages: 10 (11-20) + 10 (21-30) + 5 (31-35) + 10 (41-50) + assertEquals(35, sharePartitionMetrics.inFlightBatchMessageCount().sum()); + assertEquals(4, sharePartitionMetrics.inFlightBatchMessageCount().count()); + assertEquals(5, sharePartitionMetrics.inFlightBatchMessageCount().min()); + assertEquals(10, sharePartitionMetrics.inFlightBatchMessageCount().max()); } @Test From dbeec65448ff3278a023843fcc4e609beef8f373 Mon Sep 17 00:00:00 2001 From: jimmy <961370183@qq.com> Date: Fri, 28 Nov 2025 01:18:06 +0800 Subject: [PATCH 6/6] unit test for ShareAcquireMode. --- .../consumer/internals/ShareAcquireMode.java | 3 + .../internals/ShareAcquireModeTest.java | 57 +++++++++++++++++++ 2 files changed, 60 insertions(+) create mode 100644 clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareAcquireModeTest.java diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareAcquireMode.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareAcquireMode.java index 5548b68b9d505..ac8568d536f7a 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareAcquireMode.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareAcquireMode.java @@ -40,6 +40,9 @@ public enum ShareAcquireMode { * Case-insensitive acquire mode lookup by string name. */ public static ShareAcquireMode of(final String name) { + if (name == null) { + throw new IllegalArgumentException("ShareAcquireMode is null"); + } try { return ShareAcquireMode.valueOf(name.toUpperCase(Locale.ROOT)); } catch (IllegalArgumentException e) { diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareAcquireModeTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareAcquireModeTest.java new file mode 100644 index 0000000000000..ccfa866c90dd0 --- /dev/null +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareAcquireModeTest.java @@ -0,0 +1,57 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.clients.consumer.internals; + +import org.apache.kafka.common.config.ConfigException; + +import org.junit.jupiter.api.Test; + +import static org.junit.jupiter.api.Assertions.assertDoesNotThrow; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertThrows; + +public class ShareAcquireModeTest { + + @Test + public void testFromString() { + assertEquals(ShareAcquireMode.BATCH_OPTIMIZED, ShareAcquireMode.of("batch_optimized")); + assertEquals(ShareAcquireMode.BATCH_OPTIMIZED, ShareAcquireMode.of("BATCH_OPTIMIZED")); + assertEquals(ShareAcquireMode.RECORD_LIMIT, ShareAcquireMode.of("record_limit")); + assertEquals(ShareAcquireMode.RECORD_LIMIT, ShareAcquireMode.of("RECORD_LIMIT")); + assertThrows(IllegalArgumentException.class, () -> ShareAcquireMode.of("invalid_mode")); + assertThrows(IllegalArgumentException.class, () -> ShareAcquireMode.of("")); + assertThrows(IllegalArgumentException.class, () -> ShareAcquireMode.of(null)); + } + + @Test + public void testValidator() { + ShareAcquireMode.Validator validator = new ShareAcquireMode.Validator(); + assertDoesNotThrow(() -> validator.ensureValid("test", "batch_optimized")); + assertDoesNotThrow(() -> validator.ensureValid("test", "BATCH_OPTIMIZED")); + assertDoesNotThrow(() -> validator.ensureValid("test", "record_limit")); + assertDoesNotThrow(() -> validator.ensureValid("test", "RECORD_LIMIT")); + assertThrows(ConfigException.class, () -> validator.ensureValid("test", "invalid_mode")); + assertThrows(ConfigException.class, () -> validator.ensureValid("test", "")); + assertThrows(ConfigException.class, () -> validator.ensureValid("test", null)); + } + + @Test + public void testValidatorToString() { + ShareAcquireMode.Validator validator = new ShareAcquireMode.Validator(); + assertEquals("[batch_optimized, record_limit]", validator.toString()); + } +}