Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -1407,13 +1407,16 @@ Mono<BlobDownloadAsyncResponse> downloadStreamWithResponse(BlobRange range, Down
if (decoderStateObj instanceof StorageContentValidationDecoderPolicy.DecoderState) {
DecoderState decoderState = (DecoderState) decoderStateObj;

// Use totalEncodedBytesProcessed to request NEW bytes from the server
// The pending buffer already contains bytes we've received, so we request
// starting from the next byte after what we've already received
long encodedOffset = decoderState.getTotalEncodedBytesProcessed();
// Use getRetryOffset() to get the correct offset for retry
// This accounts for pending bytes that have been received but not yet consumed
long encodedOffset = decoderState.getRetryOffset();
long remainingCount = finalCount - encodedOffset;
retryRange = new BlobRange(initialOffset + encodedOffset, remainingCount);

LOGGER.info(
"Structured message smart retry: resuming from offset {} (initial={}, encoded={})",
initialOffset + encodedOffset, initialOffset, encodedOffset);

// Preserve the decoder state for the retry
retryContext = retryContext
.addData(Constants.STRUCTURED_MESSAGE_DECODER_STATE_CONTEXT_KEY, decoderState);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,15 +27,19 @@ public class StructuredMessageDecoder {
private int numSegments;
private final long expectedContentLength;

private int messageOffset = 0;
private long messageOffset = 0;
private int currentSegmentNumber = 0;
private int currentSegmentContentLength = 0;
private int currentSegmentContentOffset = 0;
private long currentSegmentContentLength = 0;
private long currentSegmentContentOffset = 0;

private long messageCrc64 = 0;
private long segmentCrc64 = 0;
private final Map<Integer, Long> segmentCrcs = new HashMap<>();

// Track the last complete segment boundary for smart retry
private long lastCompleteSegmentStart = 0;
private long currentSegmentStart = 0;

/**
* Constructs a new StructuredMessageDecoder.
*
Expand All @@ -45,6 +49,50 @@ public StructuredMessageDecoder(long expectedContentLength) {
this.expectedContentLength = expectedContentLength;
}

/**
* Gets the byte offset where the last complete segment ended.
* This is used for smart retry to resume from a segment boundary.
*
* @return The byte offset of the last complete segment boundary.
*/
public long getLastCompleteSegmentStart() {
return lastCompleteSegmentStart;
}

/**
* Gets the current message offset (total bytes consumed from the structured message).
*
* @return The current message offset.
*/
public long getMessageOffset() {
return messageOffset;
}

/**
* Resets the decoder position to the last complete segment boundary.
* This is used during smart retry to ensure the decoder is in sync with
* the data being provided from the retry offset.
*/
public void resetToLastCompleteSegment() {
if (messageOffset != lastCompleteSegmentStart) {
LOGGER.atInfo()
.addKeyValue("fromOffset", messageOffset)
.addKeyValue("toOffset", lastCompleteSegmentStart)
.addKeyValue("currentSegmentNum", currentSegmentNumber)
.addKeyValue("currentSegmentContentOffset", currentSegmentContentOffset)
.addKeyValue("currentSegmentContentLength", currentSegmentContentLength)
.log("Resetting decoder to last complete segment boundary");
messageOffset = lastCompleteSegmentStart;
// Reset current segment state - next decode will read the segment header
currentSegmentContentOffset = 0;
currentSegmentContentLength = 0;
} else {
LOGGER.atVerbose()
.addKeyValue("offset", messageOffset)
.log("Decoder already at last complete segment boundary, no reset needed");
}
}

/**
* Reads the message header from the given buffer.
*
Expand Down Expand Up @@ -79,6 +127,91 @@ private void readMessageHeader(ByteBuffer buffer) {
messageOffset += V1_HEADER_LENGTH;
}

/**
* Converts a ByteBuffer range to hex string for diagnostic purposes.
*/
private static String toHex(ByteBuffer buf, int len) {
int pos = buf.position();
int peek = Math.min(len, buf.remaining());
byte[] out = new byte[peek];
buf.get(out, 0, peek);
buf.position(pos);
StringBuilder sb = new StringBuilder();
for (int i = 0; i < out.length; i++) {
sb.append(String.format("%02X", out[i]));
if (i < out.length - 1)
sb.append(' ');
}
return sb.toString();
}

/**
* Peeks the next segment length without consuming from the buffer.
* Used by the policy to calculate encoded segment size before slicing.
*
* @param buffer The buffer to peek from.
* @param relativeIndex The position in the buffer to start reading from.
* @return The segment content length, or -1 if not enough bytes.
*/
public long peekNextSegmentLength(ByteBuffer buffer, int relativeIndex) {
// Need at least V1_SEGMENT_HEADER_LENGTH bytes to read segment number (2) + segment size (8)
if (relativeIndex + V1_SEGMENT_HEADER_LENGTH > buffer.limit()) {
return -1;
}
// Segment size is at offset 2 (after segment number which is 2 bytes)
return buffer.getLong(relativeIndex + 2);
}

/**
* Gets the flags for the current message (needed to determine if CRC is present).
*
* @return The message flags, or null if header not yet read.
*/
public StructuredMessageFlags getFlags() {
return flags;
}

/**
* Reads and validates segment length with diagnostic logging.
*/
private long readAndValidateSegmentLength(ByteBuffer buffer, long remaining) {
final int SEGMENT_SIZE_BYTES = 8; // segment size is 8 bytes (long)
if (buffer.remaining() < SEGMENT_SIZE_BYTES) {
LOGGER.error("Not enough bytes to read segment size. pos={}, remaining={}", buffer.position(),
buffer.remaining());
throw new IllegalStateException("Not enough bytes to read segment size");
}

// Diagnostic: dump first 16 bytes at this position so we can see what's being read
LOGGER.atInfo()
.addKeyValue("decoderOffset", messageOffset)
.addKeyValue("bufferPos", buffer.position())
.addKeyValue("bufferRemaining", buffer.remaining())
.addKeyValue("peek16", toHex(buffer, 16))
.addKeyValue("lastCompleteSegment", lastCompleteSegmentStart)
.log("Decoder about to read segment length");

long segmentLength = buffer.getLong();

if (segmentLength < 0 || segmentLength > remaining) {
// Peek next bytes for extra detail
String peekNext = toHex(buffer, 16);
LOGGER.error(
"Invalid segment length read: segmentLength={}, remaining={}, decoderOffset={}, "
+ "lastCompleteSegment={}, bufferPos={}, peek-next-bytes={}",
segmentLength, remaining, messageOffset, lastCompleteSegmentStart, buffer.position(), peekNext);
throw new IllegalArgumentException("Invalid segment size detected: " + segmentLength + " (remaining="
+ remaining + ", decoderOffset=" + messageOffset + ")");
}

LOGGER.atVerbose()
.addKeyValue("segmentLength", segmentLength)
.addKeyValue("decoderOffset", messageOffset)
.log("Valid segment length read");

return segmentLength;
}

/**
* Reads the segment header from the given buffer.
*
Expand All @@ -90,13 +223,13 @@ private void readSegmentHeader(ByteBuffer buffer) {
throw LOGGER.logExceptionAsError(new IllegalArgumentException("Segment header is incomplete."));
}

// Mark the start of this segment (before reading the header)
currentSegmentStart = messageOffset;

int segmentNum = Short.toUnsignedInt(buffer.getShort());
int segmentSize = (int) buffer.getLong();

if (segmentSize < 0 || segmentSize > buffer.remaining()) {
throw LOGGER
.logExceptionAsError(new IllegalArgumentException("Invalid segment size detected: " + segmentSize));
}
// Read segment size with validation and diagnostics
long segmentSize = readAndValidateSegmentLength(buffer, buffer.remaining());

if (segmentNum != currentSegmentNumber + 1) {
throw LOGGER.logExceptionAsError(new IllegalArgumentException("Unexpected segment number."));
Expand Down Expand Up @@ -126,8 +259,8 @@ private void readSegmentHeader(ByteBuffer buffer) {
* @throws IllegalArgumentException if there is a segment size mismatch.
*/
private void readSegmentContent(ByteBuffer buffer, ByteArrayOutputStream output, int size) {
int toRead = Math.min(buffer.remaining(), currentSegmentContentLength - currentSegmentContentOffset);
toRead = Math.min(toRead, size);
long remaining = currentSegmentContentLength - currentSegmentContentOffset;
int toRead = (int) Math.min(buffer.remaining(), Math.min(remaining, size));

if (toRead == 0) {
return;
Expand Down Expand Up @@ -182,10 +315,17 @@ private void readSegmentFooter(ByteBuffer buffer) {
messageOffset += CRC64_LENGTH;
}

// Mark that this segment is complete - update the last complete segment boundary
// This is the position where we can safely resume if a retry occurs
lastCompleteSegmentStart = messageOffset;
LOGGER.atInfo()
.addKeyValue("segmentNum", currentSegmentNumber)
.addKeyValue("offset", lastCompleteSegmentStart)
.addKeyValue("segmentLength", currentSegmentContentLength)
.log("Segment complete at byte offset");

if (currentSegmentNumber == numSegments) {
readMessageFooter(buffer);
} else {
readSegmentHeader(buffer);
}
}

Expand Down
Loading