-
Notifications
You must be signed in to change notification settings - Fork 2.1k
Fix smart retry for structured message decoding byte tracking #47243
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: copilot/add-custom-pipeline-policy
Are you sure you want to change the base?
Changes from 5 commits
66bf061
43830fd
b62739d
5038e4e
3e37e24
12360e4
9d78ce1
12fe68d
3af7a7b
4c1a3fa
06bb51d
82ad940
bf975a9
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -60,8 +60,15 @@ public Mono<HttpResponse> process(HttpPipelineCallContext context, HttpPipelineN | |
| Long contentLength = getContentLength(httpResponse.getHeaders()); | ||
|
|
||
| if (contentLength != null && contentLength > 0 && validationOptions != null) { | ||
| // Get or create decoder with state tracking | ||
| DecoderState decoderState = getOrCreateDecoderState(context, contentLength); | ||
| // Check if this is a retry - if so, get the number of decoded bytes to skip | ||
|
||
| long bytesToSkip = context.getData(Constants.STRUCTURED_MESSAGE_DECODED_BYTES_TO_SKIP_CONTEXT_KEY) | ||
| .filter(value -> value instanceof Long) | ||
| .map(value -> (Long) value) | ||
| .orElse(0L); | ||
|
|
||
| // Always create a fresh decoder for each request | ||
| // This is necessary because structured messages must be parsed from the beginning | ||
| DecoderState decoderState = new DecoderState(contentLength, bytesToSkip); | ||
|
|
||
| // Decode using the stateful decoder | ||
| Flux<ByteBuffer> decodedStream = decodeStream(httpResponse.getBody(), decoderState); | ||
|
|
@@ -98,14 +105,10 @@ private Flux<ByteBuffer> decodeStream(Flux<ByteBuffer> encodedFlux, DecoderState | |
| int availableSize = dataToProcess.remaining(); | ||
| ByteBuffer duplicateForDecode = dataToProcess.duplicate(); | ||
| int initialPosition = duplicateForDecode.position(); | ||
|
|
||
| // Decode - this advances duplicateForDecode's position | ||
| ByteBuffer decodedData = state.decoder.decode(duplicateForDecode, availableSize); | ||
|
|
||
| // Track decoded bytes | ||
| int decodedBytes = decodedData.remaining(); | ||
| state.totalBytesDecoded.addAndGet(decodedBytes); | ||
|
|
||
| // Calculate how much of the input buffer was consumed by checking the duplicate's position | ||
| int bytesConsumed = duplicateForDecode.position() - initialPosition; | ||
| int bytesRemaining = availableSize - bytesConsumed; | ||
|
|
@@ -121,9 +124,33 @@ private Flux<ByteBuffer> decodeStream(Flux<ByteBuffer> encodedFlux, DecoderState | |
| state.pendingBuffer = null; | ||
| } | ||
|
|
||
| // Return decoded data if any | ||
| // Handle skipping bytes for retries and tracking decoded bytes | ||
| int decodedBytes = decodedData.remaining(); | ||
| if (decodedBytes > 0) { | ||
| return Flux.just(decodedData); | ||
| // Track total decoded bytes | ||
| long totalDecoded = state.totalBytesDecoded.addAndGet(decodedBytes); | ||
|
|
||
| // If we need to skip bytes (retry scenario), adjust the buffer | ||
| if (state.bytesToSkip > 0) { | ||
| long currentPosition = totalDecoded - decodedBytes; // Where we were before adding these bytes | ||
|
|
||
| if (currentPosition + decodedBytes <= state.bytesToSkip) { | ||
| // All these bytes should be skipped | ||
| return Flux.empty(); | ||
| } else if (currentPosition < state.bytesToSkip) { | ||
| // Some bytes should be skipped | ||
| int skipAmount = (int) (state.bytesToSkip - currentPosition); | ||
| decodedData.position(decodedData.position() + skipAmount); | ||
| } | ||
| // else: no bytes need to be skipped, emit all | ||
| } | ||
|
|
||
| // Return decoded data if any remains after skipping | ||
| if (decodedData.hasRemaining()) { | ||
| return Flux.just(decodedData); | ||
| } else { | ||
| return Flux.empty(); | ||
| } | ||
| } else { | ||
| return Flux.empty(); | ||
| } | ||
|
|
@@ -199,20 +226,6 @@ private Long getContentLength(HttpHeaders headers) { | |
| return null; | ||
| } | ||
|
|
||
| /** | ||
| * Gets or creates a decoder state from context. | ||
| * | ||
| * @param context The pipeline call context. | ||
| * @param contentLength The content length. | ||
| * @return The decoder state. | ||
| */ | ||
| private DecoderState getOrCreateDecoderState(HttpPipelineCallContext context, long contentLength) { | ||
| return context.getData(Constants.STRUCTURED_MESSAGE_DECODER_STATE_CONTEXT_KEY) | ||
| .filter(value -> value instanceof DecoderState) | ||
| .map(value -> (DecoderState) value) | ||
| .orElseGet(() -> new DecoderState(contentLength)); | ||
| } | ||
|
|
||
| /** | ||
| * Checks if the response is a download response. | ||
| * | ||
|
|
@@ -233,18 +246,21 @@ public static class DecoderState { | |
| private final long expectedContentLength; | ||
| private final AtomicLong totalBytesDecoded; | ||
| private final AtomicLong totalEncodedBytesProcessed; | ||
| private final long bytesToSkip; | ||
| private ByteBuffer pendingBuffer; | ||
|
|
||
| /** | ||
| * Creates a new decoder state. | ||
| * | ||
| * @param expectedContentLength The expected length of the encoded content. | ||
| * @param bytesToSkip The number of decoded bytes to skip (for retry scenarios). | ||
| */ | ||
| public DecoderState(long expectedContentLength) { | ||
| public DecoderState(long expectedContentLength, long bytesToSkip) { | ||
| this.expectedContentLength = expectedContentLength; | ||
| this.decoder = new StructuredMessageDecoder(expectedContentLength); | ||
| this.totalBytesDecoded = new AtomicLong(0); | ||
| this.totalEncodedBytesProcessed = new AtomicLong(0); | ||
| this.bytesToSkip = bytesToSkip; | ||
| this.pendingBuffer = null; | ||
| } | ||
|
|
||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@copilot are these changes needed since the retry will happen mid-stream not from the beginning. so if you have forget to revert these changes then please do along with the any other extra changes thats needs to be reverted for smart retry starting from 0 or beginning.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Correct - I've reverted the test expectations and removed the unused constants/unit tests that were specific to the "restart from beginning" approach. The tests now correctly expect progressive offsets for retry requests (first request at offset 0, subsequent retries at higher offsets based on actual decoder progress).
Reverted in commit 9d78ce1.