diff --git a/core/src/main/java/kafka/server/share/SharePartition.java b/core/src/main/java/kafka/server/share/SharePartition.java index 6f7041b92c0be..21d4e67ff2fa0 100644 --- a/core/src/main/java/kafka/server/share/SharePartition.java +++ b/core/src/main/java/kafka/server/share/SharePartition.java @@ -2018,7 +2018,14 @@ private boolean checkForStartOffsetWithinBatch(long batchFirstOffset, long batch */ private boolean shouldThrottleRecordsDelivery(InFlightBatch inFlightBatch, long requestFirstOffset, long requestLastOffset) { if (inFlightBatch.offsetState() == null) { - return inFlightBatch.batchDeliveryCount() >= throttleRecordsDeliveryLimit; + // If offsetState is null, it means the batch is not split and represents a single batch. + // Check if the batch is in AVAILABLE state and has no ongoing transition. + // The requested batch shall always be within the request first and last offset as the sub + // map batches are only fetched to consider. + if (inFlightBatch.batchState() == RecordState.AVAILABLE && !inFlightBatch.batchHasOngoingStateTransition()) { + return inFlightBatch.batchDeliveryCount() >= throttleRecordsDeliveryLimit; + } + return false; } return inFlightBatch.offsetState().entrySet().stream().filter(entry -> { @@ -2028,10 +2035,7 @@ private boolean shouldThrottleRecordsDelivery(InFlightBatch inFlightBatch, long if (entry.getKey() > requestLastOffset) { return false; } - if (entry.getValue().state() != RecordState.AVAILABLE) { - return false; - } - return true; + return entry.getValue().state() == RecordState.AVAILABLE && !entry.getValue().hasOngoingStateTransition(); }).mapToInt(entry -> entry.getValue().deliveryCount()).max().orElse(0) >= throttleRecordsDeliveryLimit; } diff --git a/core/src/test/java/kafka/server/share/SharePartitionTest.java b/core/src/test/java/kafka/server/share/SharePartitionTest.java index c8be18c027ead..37ae617a25a75 100644 --- a/core/src/test/java/kafka/server/share/SharePartitionTest.java +++ b/core/src/test/java/kafka/server/share/SharePartitionTest.java @@ -11201,7 +11201,7 @@ public void testThrottleRecordsWhenPendingDeliveriesExist() { PartitionFactory.newPartitionAllData(0, 3, 5L, Errors.NONE.code(), Errors.NONE.message(), List.of( new PersisterStateBatch(15L, 19L, RecordState.AVAILABLE.id, (short) 2), - new PersisterStateBatch(20L, 22L, RecordState.ARCHIVED.id, (short) 2), + new PersisterStateBatch(20L, 22L, RecordState.ARCHIVED.id, (short) 3), new PersisterStateBatch(26L, 30L, RecordState.AVAILABLE.id, (short) 3))))))); Mockito.when(persister.readState(Mockito.any())).thenReturn(CompletableFuture.completedFuture(readShareGroupStateResult)); SharePartition sharePartition = SharePartitionBuilder.builder().withPersister(persister).build(); @@ -11590,6 +11590,121 @@ public void testAcquisitionNotThrottledIfHighDeliveryCountRecordNotAcquired() { assertArrayEquals(expectedAcquiredRecords.toArray(), acquiredRecordsList.toArray()); } + @Test + public void testAcquisitionThrottlingWithOngoingStateTransition() { + Persister persister = Mockito.mock(Persister.class); + ReadShareGroupStateResult readShareGroupStateResult = Mockito.mock(ReadShareGroupStateResult.class); + Mockito.when(readShareGroupStateResult.topicsData()).thenReturn(List.of( + new TopicData<>(TOPIC_ID_PARTITION.topicId(), List.of( + PartitionFactory.newPartitionAllData(0, 3, 15L, Errors.NONE.code(), Errors.NONE.message(), + List.of( + new PersisterStateBatch(15L, 19L, RecordState.AVAILABLE.id, (short) 1), + // Batch of 20-24 has been set to delivery count of 2 so in next acquisition it will be 3, + // and post that it should be throttled but because of pending state transition it + // should not be throttled. + new PersisterStateBatch(20L, 24L, RecordState.AVAILABLE.id, (short) 2), + new PersisterStateBatch(25L, 29L, RecordState.AVAILABLE.id, (short) 2), + new PersisterStateBatch(30L, 34L, RecordState.AVAILABLE.id, (short) 2), + // Similarly, batch of 35-39 has been set to delivery count of 2 so in next offset + // acquisition, some offsets will be at 3 delivery count, and post that offsets + // should be throttled but because of pending state transition they will not be throttled. + new PersisterStateBatch(35, 39L, RecordState.AVAILABLE.id, (short) 2), + new PersisterStateBatch(40, 44L, RecordState.ARCHIVED.id, (short) 5), + new PersisterStateBatch(45, 49L, RecordState.AVAILABLE.id, (short) 1))))))); + Mockito.when(persister.readState(Mockito.any())).thenReturn(CompletableFuture.completedFuture(readShareGroupStateResult)); + SharePartition sharePartition = SharePartitionBuilder.builder().withPersister(persister).build(); + + CompletableFuture result = sharePartition.maybeInitialize(); + assertTrue(result.isDone()); + assertFalse(result.isCompletedExceptionally()); + + // Acquire batches 20-24 and 36-37 (offset based) and create a pending state transition. + fetchAcquiredRecords(sharePartition, memoryRecords(20, 5), 5); + assertEquals(RecordState.ACQUIRED, sharePartition.cachedState().get(20L).batchState()); + fetchAcquiredRecords(sharePartition, memoryRecords(36, 2), 2); + assertEquals(RecordState.AVAILABLE, sharePartition.cachedState().get(35L).offsetState().get(35L).state()); + assertEquals(RecordState.ACQUIRED, sharePartition.cachedState().get(35L).offsetState().get(36L).state()); + assertEquals(RecordState.ACQUIRED, sharePartition.cachedState().get(35L).offsetState().get(37L).state()); + assertEquals(RecordState.AVAILABLE, sharePartition.cachedState().get(35L).offsetState().get(38L).state()); + + // Create a pending future which will block state updates. + CompletableFuture future = new CompletableFuture<>(); + Mockito.when(persister.writeState(Mockito.any())).thenReturn(future); + + // Release batch of 20-24 and offset 36-37, which will have pending state transition. + sharePartition.acknowledge( + MEMBER_ID, + List.of(new ShareAcknowledgementBatch(20, 24, List.of(AcknowledgeType.RELEASE.id)), + new ShareAcknowledgementBatch(36, 37, List.of(AcknowledgeType.RELEASE.id)))); + + assertEquals(RecordState.AVAILABLE, sharePartition.cachedState().get(20L).batchState()); + assertEquals(RecordState.AVAILABLE, sharePartition.cachedState().get(35L).offsetState().get(36L).state()); + assertEquals(RecordState.AVAILABLE, sharePartition.cachedState().get(35L).offsetState().get(37L).state()); + + assertTrue(sharePartition.cachedState().get(20L).batchHasOngoingStateTransition()); + assertFalse(sharePartition.cachedState().get(35L).offsetState().get(35L).hasOngoingStateTransition()); + assertTrue(sharePartition.cachedState().get(35L).offsetState().get(36L).hasOngoingStateTransition()); + assertTrue(sharePartition.cachedState().get(35L).offsetState().get(37L).hasOngoingStateTransition()); + assertFalse(sharePartition.cachedState().get(35L).offsetState().get(38L).hasOngoingStateTransition()); + + ByteBuffer buffer = ByteBuffer.allocate(4096); + memoryRecordsBuilder(buffer, 15, 5).close(); + memoryRecordsBuilder(buffer, 20, 5).close(); + memoryRecordsBuilder(buffer, 25, 5).close(); + memoryRecordsBuilder(buffer, 30, 5).close(); + memoryRecordsBuilder(buffer, 35, 5).close(); + memoryRecordsBuilder(buffer, 40, 5).close(); + memoryRecordsBuilder(buffer, 45, 5).close(); + buffer.flip(); + MemoryRecords records = MemoryRecords.readableRecords(buffer); + + // Acquire batches and batch 15-19, 25-29 will be acquired as batch 20-24 has pending state transition. + // Without pending transition, the acquisition would have happened only for 20-24 batch as the batch + // 20-24 would have marked to be throttled but eventually couldn't be acquired because of state transition. + fetchAcquiredRecords(sharePartition.acquire( + MEMBER_ID, + ShareAcquireMode.BATCH_OPTIMIZED, + BATCH_SIZE, + 10, + 15, + fetchPartitionData(records), + FETCH_ISOLATION_HWM), + 10); + + assertEquals(7, sharePartition.cachedState().size()); + assertEquals(RecordState.ACQUIRED, sharePartition.cachedState().get(15L).batchState()); + assertEquals(RecordState.AVAILABLE, sharePartition.cachedState().get(20L).batchState()); + assertTrue(sharePartition.cachedState().get(20L).batchHasOngoingStateTransition()); + assertEquals(RecordState.ACQUIRED, sharePartition.cachedState().get(25L).batchState()); + assertEquals(RecordState.AVAILABLE, sharePartition.cachedState().get(30L).batchState()); + assertEquals(RecordState.AVAILABLE, sharePartition.cachedState().get(35L).offsetState().get(35L).state()); + assertEquals(RecordState.AVAILABLE, sharePartition.cachedState().get(35L).offsetState().get(36L).state()); + assertEquals(RecordState.AVAILABLE, sharePartition.cachedState().get(35L).offsetState().get(37L).state()); + assertEquals(RecordState.AVAILABLE, sharePartition.cachedState().get(35L).offsetState().get(38L).state()); + assertEquals(RecordState.ARCHIVED, sharePartition.cachedState().get(40L).batchState()); + assertEquals(RecordState.AVAILABLE, sharePartition.cachedState().get(45L).batchState()); + + // Re-trigger the acquisition and rest all the records will be acquired, including the offsets + // ones. The throttling should not happen because of pending state transition. + List acquiredRecordsList = fetchAcquiredRecords(sharePartition.acquire( + MEMBER_ID, + ShareAcquireMode.BATCH_OPTIMIZED, + BATCH_SIZE, + 500, + 15, + fetchPartitionData(records), + FETCH_ISOLATION_HWM), + 13); + + List expectedAcquiredRecords = new ArrayList<>(expectedAcquiredRecord(30, 34, 3)); + expectedAcquiredRecords.addAll(expectedAcquiredRecord(35, 35, 3)); + expectedAcquiredRecords.addAll(expectedAcquiredRecord(38, 38, 3)); + expectedAcquiredRecords.addAll(expectedAcquiredRecord(39, 39, 3)); + expectedAcquiredRecords.addAll(expectedAcquiredRecord(45, 49, 2)); + + assertArrayEquals(expectedAcquiredRecords.toArray(), acquiredRecordsList.toArray()); + } + /** * This function produces transactional data of a given no. of records followed by a transactional marker (COMMIT/ABORT). */