@@ -11221,7 +11221,7 @@ public void testThrottleRecordsWhenPendingDeliveriesExist() {
1122111221 PartitionFactory.newPartitionAllData(0, 3, 5L, Errors.NONE.code(), Errors.NONE.message(),
1122211222 List.of(
1122311223 new PersisterStateBatch(15L, 19L, RecordState.AVAILABLE.id, (short) 2),
11224- new PersisterStateBatch(20L, 22L, RecordState.ARCHIVED.id, (short) 2 ),
11224+ new PersisterStateBatch(20L, 22L, RecordState.ARCHIVED.id, (short) 3 ),
1122511225 new PersisterStateBatch(26L, 30L, RecordState.AVAILABLE.id, (short) 3)))))));
1122611226 Mockito.when(persister.readState(Mockito.any())).thenReturn(CompletableFuture.completedFuture(readShareGroupStateResult));
1122711227 SharePartition sharePartition = SharePartitionBuilder.builder().withPersister(persister).build();
@@ -11610,6 +11610,121 @@ public void testAcquisitionNotThrottledIfHighDeliveryCountRecordNotAcquired() {
1161011610 assertArrayEquals(expectedAcquiredRecords.toArray(), acquiredRecordsList.toArray());
1161111611 }
1161211612
11613+ @Test
11614+ public void testAcquisitionThrottlingWithOngoingStateTransition() {
11615+ Persister persister = Mockito.mock(Persister.class);
11616+ ReadShareGroupStateResult readShareGroupStateResult = Mockito.mock(ReadShareGroupStateResult.class);
11617+ Mockito.when(readShareGroupStateResult.topicsData()).thenReturn(List.of(
11618+ new TopicData<>(TOPIC_ID_PARTITION.topicId(), List.of(
11619+ PartitionFactory.newPartitionAllData(0, 3, 15L, Errors.NONE.code(), Errors.NONE.message(),
11620+ List.of(
11621+ new PersisterStateBatch(15L, 19L, RecordState.AVAILABLE.id, (short) 1),
11622+ // Batch of 20-24 has been set to delivery count of 2 so in next acquisition it will be 3,
11623+ // and post that it should be throttled but because of pending state transition it
11624+ // should not be throttled.
11625+ new PersisterStateBatch(20L, 24L, RecordState.AVAILABLE.id, (short) 2),
11626+ new PersisterStateBatch(25L, 29L, RecordState.AVAILABLE.id, (short) 2),
11627+ new PersisterStateBatch(30L, 34L, RecordState.AVAILABLE.id, (short) 2),
11628+ // Similarly, batch of 35-39 has been set to delivery count of 2 so in next offset
11629+ // acquisition, some offsets will be at 3 delivery count, and post that offsets
11630+ // should be throttled but because of pending state transition they will not be throttled.
11631+ new PersisterStateBatch(35, 39L, RecordState.AVAILABLE.id, (short) 2),
11632+ new PersisterStateBatch(40, 44L, RecordState.ARCHIVED.id, (short) 5),
11633+ new PersisterStateBatch(45, 49L, RecordState.AVAILABLE.id, (short) 1)))))));
11634+ Mockito.when(persister.readState(Mockito.any())).thenReturn(CompletableFuture.completedFuture(readShareGroupStateResult));
11635+ SharePartition sharePartition = SharePartitionBuilder.builder().withPersister(persister).build();
11636+
11637+ CompletableFuture<Void> result = sharePartition.maybeInitialize();
11638+ assertTrue(result.isDone());
11639+ assertFalse(result.isCompletedExceptionally());
11640+
11641+ // Acquire batches 20-24 and 36-37 (offset based) and create a pending state transition.
11642+ fetchAcquiredRecords(sharePartition, memoryRecords(20, 5), 5);
11643+ assertEquals(RecordState.ACQUIRED, sharePartition.cachedState().get(20L).batchState());
11644+ fetchAcquiredRecords(sharePartition, memoryRecords(36, 2), 2);
11645+ assertEquals(RecordState.AVAILABLE, sharePartition.cachedState().get(35L).offsetState().get(35L).state());
11646+ assertEquals(RecordState.ACQUIRED, sharePartition.cachedState().get(35L).offsetState().get(36L).state());
11647+ assertEquals(RecordState.ACQUIRED, sharePartition.cachedState().get(35L).offsetState().get(37L).state());
11648+ assertEquals(RecordState.AVAILABLE, sharePartition.cachedState().get(35L).offsetState().get(38L).state());
11649+
11650+ // Create a pending future which will block state updates.
11651+ CompletableFuture<WriteShareGroupStateResult> future = new CompletableFuture<>();
11652+ Mockito.when(persister.writeState(Mockito.any())).thenReturn(future);
11653+
11654+ // Release batch of 20-24 and offset 36-37, which will have pending state transition.
11655+ sharePartition.acknowledge(
11656+ MEMBER_ID,
11657+ List.of(new ShareAcknowledgementBatch(20, 24, List.of(AcknowledgeType.RELEASE.id)),
11658+ new ShareAcknowledgementBatch(36, 37, List.of(AcknowledgeType.RELEASE.id))));
11659+
11660+ assertEquals(RecordState.AVAILABLE, sharePartition.cachedState().get(20L).batchState());
11661+ assertEquals(RecordState.AVAILABLE, sharePartition.cachedState().get(35L).offsetState().get(36L).state());
11662+ assertEquals(RecordState.AVAILABLE, sharePartition.cachedState().get(35L).offsetState().get(37L).state());
11663+
11664+ assertTrue(sharePartition.cachedState().get(20L).batchHasOngoingStateTransition());
11665+ assertFalse(sharePartition.cachedState().get(35L).offsetState().get(35L).hasOngoingStateTransition());
11666+ assertTrue(sharePartition.cachedState().get(35L).offsetState().get(36L).hasOngoingStateTransition());
11667+ assertTrue(sharePartition.cachedState().get(35L).offsetState().get(37L).hasOngoingStateTransition());
11668+ assertFalse(sharePartition.cachedState().get(35L).offsetState().get(38L).hasOngoingStateTransition());
11669+
11670+ ByteBuffer buffer = ByteBuffer.allocate(4096);
11671+ memoryRecordsBuilder(buffer, 15, 5).close();
11672+ memoryRecordsBuilder(buffer, 20, 5).close();
11673+ memoryRecordsBuilder(buffer, 25, 5).close();
11674+ memoryRecordsBuilder(buffer, 30, 5).close();
11675+ memoryRecordsBuilder(buffer, 35, 5).close();
11676+ memoryRecordsBuilder(buffer, 40, 5).close();
11677+ memoryRecordsBuilder(buffer, 45, 5).close();
11678+ buffer.flip();
11679+ MemoryRecords records = MemoryRecords.readableRecords(buffer);
11680+
11681+ // Acquire batches and batch 15-19, 25-29 will be acquired as batch 20-24 has pending state transition.
11682+ // Without pending transition, the acquisition would have happened only for 20-24 batch as the batch
11683+ // 20-24 would have marked to be throttled but eventually couldn't be acquired because of state transition.
11684+ fetchAcquiredRecords(sharePartition.acquire(
11685+ MEMBER_ID,
11686+ ShareAcquireMode.BATCH_OPTIMIZED,
11687+ BATCH_SIZE,
11688+ 10,
11689+ 15,
11690+ fetchPartitionData(records),
11691+ FETCH_ISOLATION_HWM),
11692+ 10);
11693+
11694+ assertEquals(7, sharePartition.cachedState().size());
11695+ assertEquals(RecordState.ACQUIRED, sharePartition.cachedState().get(15L).batchState());
11696+ assertEquals(RecordState.AVAILABLE, sharePartition.cachedState().get(20L).batchState());
11697+ assertTrue(sharePartition.cachedState().get(20L).batchHasOngoingStateTransition());
11698+ assertEquals(RecordState.ACQUIRED, sharePartition.cachedState().get(25L).batchState());
11699+ assertEquals(RecordState.AVAILABLE, sharePartition.cachedState().get(30L).batchState());
11700+ assertEquals(RecordState.AVAILABLE, sharePartition.cachedState().get(35L).offsetState().get(35L).state());
11701+ assertEquals(RecordState.AVAILABLE, sharePartition.cachedState().get(35L).offsetState().get(36L).state());
11702+ assertEquals(RecordState.AVAILABLE, sharePartition.cachedState().get(35L).offsetState().get(37L).state());
11703+ assertEquals(RecordState.AVAILABLE, sharePartition.cachedState().get(35L).offsetState().get(38L).state());
11704+ assertEquals(RecordState.ARCHIVED, sharePartition.cachedState().get(40L).batchState());
11705+ assertEquals(RecordState.AVAILABLE, sharePartition.cachedState().get(45L).batchState());
11706+
11707+ // Re-trigger the acquisition and rest all the records will be acquired, including the offsets
11708+ // ones. The throttling should not happen because of pending state transition.
11709+ List<AcquiredRecords> acquiredRecordsList = fetchAcquiredRecords(sharePartition.acquire(
11710+ MEMBER_ID,
11711+ ShareAcquireMode.BATCH_OPTIMIZED,
11712+ BATCH_SIZE,
11713+ 500,
11714+ 15,
11715+ fetchPartitionData(records),
11716+ FETCH_ISOLATION_HWM),
11717+ 13);
11718+
11719+ List<AcquiredRecords> expectedAcquiredRecords = new ArrayList<>(expectedAcquiredRecord(30, 34, 3));
11720+ expectedAcquiredRecords.addAll(expectedAcquiredRecord(35, 35, 3));
11721+ expectedAcquiredRecords.addAll(expectedAcquiredRecord(38, 38, 3));
11722+ expectedAcquiredRecords.addAll(expectedAcquiredRecord(39, 39, 3));
11723+ expectedAcquiredRecords.addAll(expectedAcquiredRecord(45, 49, 2));
11724+
11725+ assertArrayEquals(expectedAcquiredRecords.toArray(), acquiredRecordsList.toArray());
11726+ }
11727+
1161311728 /**
1161411729 * This function produces transactional data of a given no. of records followed by a transactional marker (COMMIT/ABORT).
1161511730 */
0 commit comments