Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 9 additions & 5 deletions core/src/main/java/kafka/server/share/SharePartition.java
Original file line number Diff line number Diff line change
Expand Up @@ -2018,7 +2018,14 @@ private boolean checkForStartOffsetWithinBatch(long batchFirstOffset, long batch
*/
private boolean shouldThrottleRecordsDelivery(InFlightBatch inFlightBatch, long requestFirstOffset, long requestLastOffset) {
if (inFlightBatch.offsetState() == null) {
return inFlightBatch.batchDeliveryCount() >= throttleRecordsDeliveryLimit;
// If offsetState is null, the batch has not been split and is tracked as a single batch.
// Throttle only when the batch is in AVAILABLE state and has no ongoing state transition.
// The batch is always within the request's first and last offsets, because only the batches
// from the sub map for the requested range are fetched for consideration.
if (inFlightBatch.batchState() == RecordState.AVAILABLE && !inFlightBatch.batchHasOngoingStateTransition()) {
return inFlightBatch.batchDeliveryCount() >= throttleRecordsDeliveryLimit;
}
return false;
}

return inFlightBatch.offsetState().entrySet().stream().filter(entry -> {
Expand All @@ -2028,10 +2035,7 @@ private boolean shouldThrottleRecordsDelivery(InFlightBatch inFlightBatch, long
if (entry.getKey() > requestLastOffset) {
return false;
}
if (entry.getValue().state() != RecordState.AVAILABLE) {
return false;
}
return true;
return entry.getValue().state() == RecordState.AVAILABLE && !entry.getValue().hasOngoingStateTransition();
}).mapToInt(entry -> entry.getValue().deliveryCount()).max().orElse(0) >= throttleRecordsDeliveryLimit;
}

Expand Down
117 changes: 116 additions & 1 deletion core/src/test/java/kafka/server/share/SharePartitionTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -11201,7 +11201,7 @@ public void testThrottleRecordsWhenPendingDeliveriesExist() {
PartitionFactory.newPartitionAllData(0, 3, 5L, Errors.NONE.code(), Errors.NONE.message(),
List.of(
new PersisterStateBatch(15L, 19L, RecordState.AVAILABLE.id, (short) 2),
new PersisterStateBatch(20L, 22L, RecordState.ARCHIVED.id, (short) 2),
new PersisterStateBatch(20L, 22L, RecordState.ARCHIVED.id, (short) 3),
new PersisterStateBatch(26L, 30L, RecordState.AVAILABLE.id, (short) 3)))))));
Mockito.when(persister.readState(Mockito.any())).thenReturn(CompletableFuture.completedFuture(readShareGroupStateResult));
SharePartition sharePartition = SharePartitionBuilder.builder().withPersister(persister).build();
Expand Down Expand Up @@ -11590,6 +11590,121 @@ public void testAcquisitionNotThrottledIfHighDeliveryCountRecordNotAcquired() {
assertArrayEquals(expectedAcquiredRecords.toArray(), acquiredRecordsList.toArray());
}

/**
 * Verifies, per the expectations asserted below, that records which are AVAILABLE but still
 * have an ongoing state transition do not cause delivery throttling during acquisition.
 *
 * The persisted state is seeded with batches at various delivery counts. Batch 20-24 and
 * offsets 36-37 are acquired and then released while the persister write future is left
 * incomplete, so their state transitions stay pending. Two subsequent acquisitions check that
 * the other available records are acquired while the pending ones are left untouched.
 */
@Test
public void testAcquisitionThrottlingWithOngoingStateTransition() {
Persister persister = Mockito.mock(Persister.class);
ReadShareGroupStateResult readShareGroupStateResult = Mockito.mock(ReadShareGroupStateResult.class);
Mockito.when(readShareGroupStateResult.topicsData()).thenReturn(List.of(
new TopicData<>(TOPIC_ID_PARTITION.topicId(), List.of(
PartitionFactory.newPartitionAllData(0, 3, 15L, Errors.NONE.code(), Errors.NONE.message(),
List.of(
new PersisterStateBatch(15L, 19L, RecordState.AVAILABLE.id, (short) 1),
// Batch 20-24 starts with a delivery count of 2, so the next acquisition raises it to 3.
// After that it would normally be throttled, but because of the pending state
// transition it should not be.
new PersisterStateBatch(20L, 24L, RecordState.AVAILABLE.id, (short) 2),
new PersisterStateBatch(25L, 29L, RecordState.AVAILABLE.id, (short) 2),
new PersisterStateBatch(30L, 34L, RecordState.AVAILABLE.id, (short) 2),
// Similarly, batch 35-39 starts with a delivery count of 2, so after the offset-based
// acquisition below some of its offsets reach a delivery count of 3. Those offsets would
// normally be throttled, but because of the pending state transition they are not.
new PersisterStateBatch(35, 39L, RecordState.AVAILABLE.id, (short) 2),
new PersisterStateBatch(40, 44L, RecordState.ARCHIVED.id, (short) 5),
new PersisterStateBatch(45, 49L, RecordState.AVAILABLE.id, (short) 1)))))));
Mockito.when(persister.readState(Mockito.any())).thenReturn(CompletableFuture.completedFuture(readShareGroupStateResult));
SharePartition sharePartition = SharePartitionBuilder.builder().withPersister(persister).build();

// Initialization is expected to complete synchronously and successfully, since the mocked
// persister returns an already-completed future.
CompletableFuture<Void> result = sharePartition.maybeInitialize();
assertTrue(result.isDone());
assertFalse(result.isCompletedExceptionally());

// Acquire batch 20-24 as a whole and offsets 36-37 individually, so that batch 35-39 is
// tracked per offset. These are the records that will later get a pending state transition.
fetchAcquiredRecords(sharePartition, memoryRecords(20, 5), 5);
assertEquals(RecordState.ACQUIRED, sharePartition.cachedState().get(20L).batchState());
fetchAcquiredRecords(sharePartition, memoryRecords(36, 2), 2);
assertEquals(RecordState.AVAILABLE, sharePartition.cachedState().get(35L).offsetState().get(35L).state());
assertEquals(RecordState.ACQUIRED, sharePartition.cachedState().get(35L).offsetState().get(36L).state());
assertEquals(RecordState.ACQUIRED, sharePartition.cachedState().get(35L).offsetState().get(37L).state());
assertEquals(RecordState.AVAILABLE, sharePartition.cachedState().get(35L).offsetState().get(38L).state());

// Make the persister write return a future that this test never completes, so that the
// state transitions triggered by the acknowledgement below remain ongoing.
CompletableFuture<WriteShareGroupStateResult> future = new CompletableFuture<>();
Mockito.when(persister.writeState(Mockito.any())).thenReturn(future);

// Release batch 20-24 and offsets 36-37; both will have a pending state transition.
sharePartition.acknowledge(
MEMBER_ID,
List.of(new ShareAcknowledgementBatch(20, 24, List.of(AcknowledgeType.RELEASE.id)),
new ShareAcknowledgementBatch(36, 37, List.of(AcknowledgeType.RELEASE.id))));

// The released records already show as AVAILABLE in the cached state...
assertEquals(RecordState.AVAILABLE, sharePartition.cachedState().get(20L).batchState());
assertEquals(RecordState.AVAILABLE, sharePartition.cachedState().get(35L).offsetState().get(36L).state());
assertEquals(RecordState.AVAILABLE, sharePartition.cachedState().get(35L).offsetState().get(37L).state());

// ...but only the released batch and offsets report an ongoing state transition.
assertTrue(sharePartition.cachedState().get(20L).batchHasOngoingStateTransition());
assertFalse(sharePartition.cachedState().get(35L).offsetState().get(35L).hasOngoingStateTransition());
assertTrue(sharePartition.cachedState().get(35L).offsetState().get(36L).hasOngoingStateTransition());
assertTrue(sharePartition.cachedState().get(35L).offsetState().get(37L).hasOngoingStateTransition());
assertFalse(sharePartition.cachedState().get(35L).offsetState().get(38L).hasOngoingStateTransition());

// Build one buffer of fetched records for the acquisitions below; presumably one 5-record
// batch starting at each of offsets 15, 20, ..., 45 (confirm against memoryRecordsBuilder).
ByteBuffer buffer = ByteBuffer.allocate(4096);
memoryRecordsBuilder(buffer, 15, 5).close();
memoryRecordsBuilder(buffer, 20, 5).close();
memoryRecordsBuilder(buffer, 25, 5).close();
memoryRecordsBuilder(buffer, 30, 5).close();
memoryRecordsBuilder(buffer, 35, 5).close();
memoryRecordsBuilder(buffer, 40, 5).close();
memoryRecordsBuilder(buffer, 45, 5).close();
buffer.flip();
MemoryRecords records = MemoryRecords.readableRecords(buffer);

// Acquire 10 records: batches 15-19 and 25-29 are acquired, and batch 20-24 is skipped because
// it has a pending state transition. Without the ongoing-transition check, batch 20-24 would
// have been marked for throttling, limiting the acquisition to that batch alone, even though it
// could not actually be acquired because of the state transition.
// NOTE(review): the 10 and 15 passed to acquire are presumably the max records to fetch and the
// fetch offset - confirm against the acquire signature.
fetchAcquiredRecords(sharePartition.acquire(
MEMBER_ID,
ShareAcquireMode.BATCH_OPTIMIZED,
BATCH_SIZE,
10,
15,
fetchPartitionData(records),
FETCH_ISOLATION_HWM),
10);

// Only batches 15-19 and 25-29 changed to ACQUIRED; everything else is as before, and batch
// 20-24 still has its pending transition.
assertEquals(7, sharePartition.cachedState().size());
assertEquals(RecordState.ACQUIRED, sharePartition.cachedState().get(15L).batchState());
assertEquals(RecordState.AVAILABLE, sharePartition.cachedState().get(20L).batchState());
assertTrue(sharePartition.cachedState().get(20L).batchHasOngoingStateTransition());
assertEquals(RecordState.ACQUIRED, sharePartition.cachedState().get(25L).batchState());
assertEquals(RecordState.AVAILABLE, sharePartition.cachedState().get(30L).batchState());
assertEquals(RecordState.AVAILABLE, sharePartition.cachedState().get(35L).offsetState().get(35L).state());
assertEquals(RecordState.AVAILABLE, sharePartition.cachedState().get(35L).offsetState().get(36L).state());
assertEquals(RecordState.AVAILABLE, sharePartition.cachedState().get(35L).offsetState().get(37L).state());
assertEquals(RecordState.AVAILABLE, sharePartition.cachedState().get(35L).offsetState().get(38L).state());
assertEquals(RecordState.ARCHIVED, sharePartition.cachedState().get(40L).batchState());
assertEquals(RecordState.AVAILABLE, sharePartition.cachedState().get(45L).batchState());

// Re-trigger the acquisition with a larger limit: all the remaining acquirable records are
// acquired, including the individually tracked offsets 35, 38 and 39. Throttling should not
// happen because of the records with a pending state transition (batch 20-24 and offsets
// 36-37), which are absent from the expected result.
List<AcquiredRecords> acquiredRecordsList = fetchAcquiredRecords(sharePartition.acquire(
MEMBER_ID,
ShareAcquireMode.BATCH_OPTIMIZED,
BATCH_SIZE,
500,
15,
fetchPartitionData(records),
FETCH_ISOLATION_HWM),
13);

// Expected ranges with their resulting delivery counts (third argument, presumably): records
// previously at delivery count 2 are now at 3, and batch 45-49 goes from 1 to 2.
List<AcquiredRecords> expectedAcquiredRecords = new ArrayList<>(expectedAcquiredRecord(30, 34, 3));
expectedAcquiredRecords.addAll(expectedAcquiredRecord(35, 35, 3));
expectedAcquiredRecords.addAll(expectedAcquiredRecord(38, 38, 3));
expectedAcquiredRecords.addAll(expectedAcquiredRecord(39, 39, 3));
expectedAcquiredRecords.addAll(expectedAcquiredRecord(45, 49, 2));

assertArrayEquals(expectedAcquiredRecords.toArray(), acquiredRecordsList.toArray());
}

/**
* This function produces transactional data of a given no. of records followed by a transactional marker (COMMIT/ABORT).
*/
Expand Down