diff --git a/sdk/core/azure-core-management/CHANGELOG.md b/sdk/core/azure-core-management/CHANGELOG.md index 85f61d6a14e2..ba90e9ce82b7 100644 --- a/sdk/core/azure-core-management/CHANGELOG.md +++ b/sdk/core/azure-core-management/CHANGELOG.md @@ -4,6 +4,12 @@ ### Features Added +- Added continuation token support for ARM Long-Running Operations (LROs): + - `PollingState.toContinuationToken()` - Serialize polling state to a Base64-encoded token + - `PollingState.fromContinuationToken()` - Deserialize token back to polling state + - `SyncPollerFactory.resumeFromToken()` - Resume a SyncPoller from a continuation token + - `ArmLroSyncPoller` - New implementation that supports continuation tokens for ARM LROs + ### Breaking Changes ### Bugs Fixed diff --git a/sdk/core/azure-core-management/src/main/java/com/azure/core/management/implementation/polling/PollingState.java b/sdk/core/azure-core-management/src/main/java/com/azure/core/management/implementation/polling/PollingState.java index fb2bcd1d7b98..7ec25fa524ca 100644 --- a/sdk/core/azure-core-management/src/main/java/com/azure/core/management/implementation/polling/PollingState.java +++ b/sdk/core/azure-core-management/src/main/java/com/azure/core/management/implementation/polling/PollingState.java @@ -169,6 +169,81 @@ public void store(PollingContext context) { } } + /** + * Serializes the current PollingState into a continuation token string that can be used to resume + * the long-running operation at a later time or in a different process. + *

+ * The continuation token is a Base64-encoded JSON representation of the polling state. It contains + * all necessary information to reconstruct the poller and continue polling the operation, including: + *

+ *

+ * Security Note: The continuation token contains the operation URL. Ensure tokens + * are stored securely and transmitted over secure channels. + *

+ * Compatibility Note: The token format is internal and may change between SDK versions. + * Tokens should only be used with the same version of the SDK that generated them. + * + * @return A Base64-encoded continuation token string representing the current polling state. + * @throws RuntimeException if the state cannot be serialized into a token. + */ + public String toContinuationToken() { + try { + String jsonState = this.serializerAdapter.serialize(this, SerializerEncoding.JSON); + return java.util.Base64.getEncoder() + .encodeToString(jsonState.getBytes(java.nio.charset.StandardCharsets.UTF_8)); + } catch (IOException ioe) { + throw LOGGER.logExceptionAsError( + new RuntimeException("Failed to serialize PollingState to continuation token.", ioe)); + } + } + + /** + * Deserializes a continuation token string into a PollingState object that can be used to resume + * a long-running operation. + *

+ * This method is the counterpart to {@link #toContinuationToken()} and reconstructs a PollingState + * from a previously serialized token. The reconstructed state can then be used to create a new + * SyncPoller that continues polling from where the previous poller left off. + * + * @param continuationToken The Base64-encoded continuation token string, previously obtained from + * {@link #toContinuationToken()}. + * @param serializerAdapter The serializer for decoding the token. This should be the same serializer + * type used by the service client. + * @return A PollingState object reconstructed from the continuation token. + * @throws IllegalArgumentException if the {@code continuationToken} or {@code serializerAdapter} is null or empty. + * @throws RuntimeException if the token cannot be decoded or deserialized. This may occur if: + *

+ */ + public static PollingState fromContinuationToken(String continuationToken, SerializerAdapter serializerAdapter) { + Objects.requireNonNull(continuationToken, "'continuationToken' cannot be null."); + Objects.requireNonNull(serializerAdapter, "'serializerAdapter' cannot be null."); + + if (continuationToken.isEmpty()) { + throw LOGGER.logExceptionAsError(new IllegalArgumentException("'continuationToken' cannot be empty.")); + } + + try { + byte[] decodedBytes = java.util.Base64.getDecoder().decode(continuationToken); + String jsonState = new String(decodedBytes, java.nio.charset.StandardCharsets.UTF_8); + return PollingState.from(serializerAdapter, jsonState); + } catch (IllegalArgumentException iae) { + throw LOGGER.logExceptionAsError(new RuntimeException( + "Failed to decode continuation token. The token may be malformed or corrupted.", iae)); + } catch (RuntimeException re) { + throw LOGGER.logExceptionAsError(new RuntimeException("Failed to deserialize continuation token. " + + "The token may have been created with a different SDK version.", re)); + } + } + /** * @return the current status of the long-running-operation. */ @@ -277,7 +352,7 @@ FinalResult getFinalResult() { /** * @return the last response body this PollingState received */ - String getLastResponseBody() { + public String getLastResponseBody() { return this.lastResponseBody; } diff --git a/sdk/core/azure-core-management/src/main/java/com/azure/core/management/polling/ArmLroSyncPoller.java b/sdk/core/azure-core-management/src/main/java/com/azure/core/management/polling/ArmLroSyncPoller.java new file mode 100644 index 000000000000..1e48573fda1b --- /dev/null +++ b/sdk/core/azure-core-management/src/main/java/com/azure/core/management/polling/ArmLroSyncPoller.java @@ -0,0 +1,117 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT License. + +package com.azure.core.management.polling; + +import com.azure.core.management.implementation.polling.PollingState; +import com.azure.core.util.logging.ClientLogger; +import com.azure.core.util.polling.PollResponse; +import com.azure.core.util.polling.PollingContext; +import com.azure.core.util.polling.SyncPoller; +import com.azure.core.util.serializer.SerializerAdapter; + +import java.time.Duration; +import java.util.Objects; + +/** + * Azure Resource Manager (ARM) Long-Running Operation SyncPoller implementation with continuation token support. + *

+ * This implementation wraps a standard {@link SyncPoller} and adds ARM-specific functionality including + * the ability to serialize the poller state to a continuation token and resume from such a token. + *

+ * This class is package-private and should only be created through {@link SyncPollerFactory}. + * + * @param The type of poll response value. + * @param The type of the final result of long-running operation. + */ +final class ArmLroSyncPoller implements SyncPoller, U> { + private static final ClientLogger LOGGER = new ClientLogger(ArmLroSyncPoller.class); + + private final SyncPoller, U> innerPoller; + private final SerializerAdapter serializerAdapter; + // We'll need a way to access the PollingContext from the inner poller + // Since SimpleSyncPoller doesn't expose it, we'll need to track it ourselves + private final PollingContextAccessor> contextAccessor; + + /** + * Functional interface to access the PollingContext from a poller. + */ + @FunctionalInterface + interface PollingContextAccessor { + PollingContext getContext(); + } + + /** + * Creates an ArmLroSyncPoller. + * + * @param innerPoller The underlying SyncPoller implementation. + * @param serializerAdapter The serializer for encoding/decoding. + * @param contextAccessor Accessor to get the current PollingContext. + */ + ArmLroSyncPoller(SyncPoller, U> innerPoller, SerializerAdapter serializerAdapter, + PollingContextAccessor> contextAccessor) { + this.innerPoller = Objects.requireNonNull(innerPoller, "'innerPoller' cannot be null."); + this.serializerAdapter = Objects.requireNonNull(serializerAdapter, "'serializerAdapter' cannot be null."); + this.contextAccessor = Objects.requireNonNull(contextAccessor, "'contextAccessor' cannot be null."); + } + + @Override + public PollResponse> poll() { + return innerPoller.poll(); + } + + @Override + public PollResponse> waitForCompletion() { + return innerPoller.waitForCompletion(); + } + + @Override + public PollResponse> waitForCompletion(Duration timeout) { + return innerPoller.waitForCompletion(timeout); + } + + @Override + public PollResponse> + waitUntil(com.azure.core.util.polling.LongRunningOperationStatus statusToWaitFor) { + return innerPoller.waitUntil(statusToWaitFor); + } + + @Override + public PollResponse> waitUntil(Duration timeout, + com.azure.core.util.polling.LongRunningOperationStatus statusToWaitFor) { + return innerPoller.waitUntil(timeout, statusToWaitFor); + } + + @Override + public U getFinalResult() { + return innerPoller.getFinalResult(); + } + + @Override + public U getFinalResult(Duration timeout) { + return innerPoller.getFinalResult(timeout); + } + + @Override + public void cancelOperation() { + innerPoller.cancelOperation(); + } + + @Override + public SyncPoller, U> setPollInterval(Duration pollInterval) { + innerPoller.setPollInterval(pollInterval); + return this; + } + + @Override + public String serializeContinuationToken() { + try { + PollingContext> context = contextAccessor.getContext(); + PollingState pollingState = PollingState.from(serializerAdapter, context); + return pollingState.toContinuationToken(); + } catch (Exception e) { + throw LOGGER.logExceptionAsError(new RuntimeException("Failed to serialize continuation token. " + + "The poller may not have been started or the polling state may be unavailable.", e)); + } + } +} diff --git a/sdk/core/azure-core-management/src/main/java/com/azure/core/management/polling/SyncPollerFactory.java b/sdk/core/azure-core-management/src/main/java/com/azure/core/management/polling/SyncPollerFactory.java index f325f3825179..7f592a4865c7 100644 --- a/sdk/core/azure-core-management/src/main/java/com/azure/core/management/polling/SyncPollerFactory.java +++ b/sdk/core/azure-core-management/src/main/java/com/azure/core/management/polling/SyncPollerFactory.java @@ -5,14 +5,19 @@ import com.azure.core.http.HttpPipeline; import com.azure.core.http.rest.Response; +import com.azure.core.management.implementation.polling.PollOperation; +import com.azure.core.management.implementation.polling.PollingState; import com.azure.core.management.implementation.polling.SyncPollOperation; import com.azure.core.util.BinaryData; import com.azure.core.util.Context; +import com.azure.core.util.polling.PollResponse; +import com.azure.core.util.polling.PollingContext; import com.azure.core.util.polling.SyncPoller; import com.azure.core.util.serializer.SerializerAdapter; import java.lang.reflect.Type; import java.time.Duration; +import java.util.function.Function; import java.util.function.Supplier; /** @@ -62,10 +67,132 @@ public static SyncPoller, U> create(SerializerAdapter seria public static SyncPoller, U> create(SerializerAdapter serializerAdapter, HttpPipeline httpPipeline, Type pollResultType, Type finalResultType, Duration defaultPollDuration, Supplier> lroInitialResponseSupplier, Context context) { - return SyncPoller.createPoller(defaultPollDuration, - SyncPollOperation.activationFunction(serializerAdapter, pollResultType, lroInitialResponseSupplier), + + // Create a holder for the PollingContext that we can access later + @SuppressWarnings({ "unchecked", "rawtypes" }) + final PollingContext>[] contextHolder = (PollingContext>[]) new PollingContext[1]; + + // Wrap the activation function to capture the context + Function>, PollResponse>> wrappedActivation = pollingContext -> { + contextHolder[0] = pollingContext; + return SyncPollOperation + .activationFunction(serializerAdapter, pollResultType, lroInitialResponseSupplier) + .apply(pollingContext); + }; + + SyncPoller, U> innerPoller = SyncPoller.createPoller(defaultPollDuration, wrappedActivation, + SyncPollOperation.pollFunction(serializerAdapter, httpPipeline, pollResultType, context), + SyncPollOperation.cancelFunction(context), + SyncPollOperation.fetchResultFunction(serializerAdapter, httpPipeline, finalResultType, context)); + + // Wrap in ArmLroSyncPoller to add continuation token support + return new ArmLroSyncPoller<>(innerPoller, serializerAdapter, () -> contextHolder[0]); + } + + /** + * Resumes a SyncPoller for an Azure Resource Manager (ARM) long-running-operation (LRO) from a continuation token. + *

+ * This method recreates a SyncPoller from a previously serialized continuation token, allowing the polling + * operation to be resumed from its last known state. This is useful for scenarios where a process needs to + * survive restarts or where polling needs to be transferred between different processes or instances. + *

+ * The continuation token must have been obtained from a previous poller via + * {@link SyncPoller#serializeContinuationToken()}. + *

+ * Example: Resuming a server creation operation + *

{@code
+     * // Original process - start operation and get token
+     * SyncPoller, ServerInner> poller = 
+     *     client.beginCreate(resourceGroup, serverName, parameters);
+     * String token = poller.serializeContinuationToken();
+     * // Store token...
+     * 
+     * // Later, in a different process - resume from token
+     * SyncPoller, ServerInner> resumedPoller = 
+     *     SyncPollerFactory.resumeFromToken(
+     *         token,
+     *         client.getSerializerAdapter(),
+     *         client.getHttpPipeline(),
+     *         new TypeReference>() {}.getJavaType(),
+     *         ServerInner.class,
+     *         Duration.ofSeconds(30),
+     *         Context.NONE);
+     * 
+     * // Continue polling until completion
+     * ServerInner result = resumedPoller.getFinalResult();
+     * }
+ * + * @param continuationToken The Base64-encoded continuation token string obtained from a previous poller. + * @param serializerAdapter The serializer for any encoding and decoding. This should be the same type + * as used by the original poller. + * @param httpPipeline The HttpPipeline for making HTTP requests (e.g., poll requests). This should be + * configured with the same authentication and policies as the original poller. + * @param pollResultType The type of the poll result. If no result is expected, this should be Void.class. + * @param finalResultType The type of the final result. If no result is expected, this should be Void.class. + * @param defaultPollDuration The default poll interval to use if the service does not return a retry-after value. + * @param context The context shared by all requests. + * @param The type of poll result. + * @param The type of final result. + * @return A SyncPoller that resumes polling from the state captured in the continuation token. + * @throws IllegalArgumentException if {@code continuationToken} or {@code serializerAdapter} is null or empty. + * @throws RuntimeException if the token cannot be decoded or deserialized, which may occur if: + *
    + *
  • The token is malformed or corrupted
  • + *
  • The token was created with a different SDK version
  • + *
  • The token format has changed
  • + *
+ */ + public static SyncPoller, U> resumeFromToken(String continuationToken, + SerializerAdapter serializerAdapter, HttpPipeline httpPipeline, Type pollResultType, Type finalResultType, + Duration defaultPollDuration, Context context) { + + // Deserialize the continuation token to get the PollingState + PollingState pollingState = PollingState.fromContinuationToken(continuationToken, serializerAdapter); + + // Create a holder for the PollingContext + @SuppressWarnings({ "unchecked", "rawtypes" }) + final PollingContext>[] contextHolder = (PollingContext>[]) new PollingContext[1]; + + // Create an activation function that returns the current state as the activation response + Function>, PollResponse>> activationFunction = pollingContext -> { + contextHolder[0] = pollingContext; + pollingState.store(pollingContext); + T result = PollOperation.deserialize(serializerAdapter, pollingState.getLastResponseBody(), pollResultType); + return new PollResponse<>(pollingState.getOperationStatus(), new PollResult<>(result), + pollingState.getPollDelay()); + }; + + // Create the poller with the standard poll, cancel, and fetch result functions + SyncPoller, U> innerPoller = SyncPoller.createPoller(defaultPollDuration, activationFunction, SyncPollOperation.pollFunction(serializerAdapter, httpPipeline, pollResultType, context), SyncPollOperation.cancelFunction(context), SyncPollOperation.fetchResultFunction(serializerAdapter, httpPipeline, finalResultType, context)); + + // Wrap in ArmLroSyncPoller to add continuation token support + return new ArmLroSyncPoller<>(innerPoller, serializerAdapter, () -> contextHolder[0]); + } + + /** + * Resumes a SyncPoller for an Azure Resource Manager (ARM) long-running-operation (LRO) from a continuation token. + *

+ * This is a convenience overload that uses {@link Context#NONE} for the context parameter. + * + * @param continuationToken The Base64-encoded continuation token string obtained from a previous poller. + * @param serializerAdapter The serializer for any encoding and decoding. + * @param httpPipeline The HttpPipeline for making HTTP requests. + * @param pollResultType The type of the poll result. + * @param finalResultType The type of the final result. + * @param defaultPollDuration The default poll interval to use. + * @param The type of poll result. + * @param The type of final result. + * @return A SyncPoller that resumes polling from the state captured in the continuation token. + * @throws IllegalArgumentException if {@code continuationToken} or {@code serializerAdapter} is null or empty. + * @throws RuntimeException if the token cannot be decoded or deserialized. + */ + public static SyncPoller, U> resumeFromToken(String continuationToken, + SerializerAdapter serializerAdapter, HttpPipeline httpPipeline, Type pollResultType, Type finalResultType, + Duration defaultPollDuration) { + return resumeFromToken(continuationToken, serializerAdapter, httpPipeline, pollResultType, finalResultType, + defaultPollDuration, Context.NONE); } } diff --git a/sdk/core/azure-core-management/src/test/java/com/azure/core/management/implementation/polling/FooWithProvisioningState.java b/sdk/core/azure-core-management/src/test/java/com/azure/core/management/implementation/polling/FooWithProvisioningState.java index 13e80ea1e4bc..98d2c1b57a13 100644 --- a/sdk/core/azure-core-management/src/test/java/com/azure/core/management/implementation/polling/FooWithProvisioningState.java +++ b/sdk/core/azure-core-management/src/test/java/com/azure/core/management/implementation/polling/FooWithProvisioningState.java @@ -12,12 +12,12 @@ public class FooWithProvisioningState { public FooWithProvisioningState() { } - FooWithProvisioningState(String state) { + public FooWithProvisioningState(String state) { this.properties = new Properties(); this.properties.provisioningState = state; } - FooWithProvisioningState(String state, String resourceId) { + public FooWithProvisioningState(String state, String resourceId) { this.properties = new Properties(); this.properties.provisioningState = state; this.properties.resourceId = resourceId; diff --git a/sdk/core/azure-core-management/src/test/java/com/azure/core/management/implementation/polling/PollingStateContinuationTokenTest.java b/sdk/core/azure-core-management/src/test/java/com/azure/core/management/implementation/polling/PollingStateContinuationTokenTest.java new file mode 100644 index 000000000000..fd7de0d440ca --- /dev/null +++ b/sdk/core/azure-core-management/src/test/java/com/azure/core/management/implementation/polling/PollingStateContinuationTokenTest.java @@ -0,0 +1,195 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT License. + +package com.azure.core.management.implementation.polling; + +import com.azure.core.http.HttpHeaders; +import com.azure.core.http.HttpMethod; +import com.azure.core.http.HttpRequest; +import com.azure.core.management.serializer.SerializerFactory; +import com.azure.core.util.serializer.SerializerAdapter; +import org.junit.jupiter.api.Test; + +import java.net.URL; +import java.time.Duration; + +import static org.junit.jupiter.api.Assertions.*; + +/** + * Tests for PollingState continuation token serialization and deserialization. + */ +@SuppressWarnings("deprecation") +public class PollingStateContinuationTokenTest { + + private static final SerializerAdapter SERIALIZER = SerializerFactory.createDefaultManagementSerializerAdapter(); + + @Test + public void testToContinuationTokenAndFromContinuationToken() throws Exception { + // Arrange - create a PollingState + URL lroUrl = new URL( + "https://management.azure.com/subscriptions/sub1/resourceGroups/rg1/providers/Microsoft.Compute/virtualMachines/vm1"); + HttpRequest request = new HttpRequest(HttpMethod.PUT, lroUrl); + HttpHeaders headers = new HttpHeaders(); + headers.set("Azure-AsyncOperation", + "https://management.azure.com/subscriptions/sub1/providers/Microsoft.Compute/locations/westus/operations/op1"); + headers.set("Retry-After", "5"); + + String responseBody = "{\"id\":\"vm1\",\"name\":\"myVM\",\"properties\":{\"provisioningState\":\"Creating\"}}"; + + PollingState originalState = PollingState.create(SERIALIZER, request, 201, headers, responseBody); + + // Act - serialize to token + String token = originalState.toContinuationToken(); + + // Assert - token should not be null or empty + assertNotNull(token, "Continuation token should not be null"); + assertFalse(token.isEmpty(), "Continuation token should not be empty"); + + // Act - deserialize from token + PollingState deserializedState = PollingState.fromContinuationToken(token, SERIALIZER); + + // Assert - verify the deserialized state matches the original + assertNotNull(deserializedState, "Deserialized state should not be null"); + assertEquals(originalState.getOperationStatus(), deserializedState.getOperationStatus(), + "Operation status should match"); + assertNotNull(deserializedState.getPollDelay(), "Poll delay should not be null"); + assertEquals(Duration.ofSeconds(5), deserializedState.getPollDelay(), "Poll delay should be 5 seconds"); + } + + @Test + public void testContinuationTokenWithNullValue() { + // Act & Assert + assertThrows(NullPointerException.class, () -> { + PollingState.fromContinuationToken(null, SERIALIZER); + }, "Should throw NullPointerException for null token"); + } + + @Test + public void testContinuationTokenWithEmptyValue() { + // Act & Assert + RuntimeException exception = assertThrows(RuntimeException.class, () -> { + PollingState.fromContinuationToken("", SERIALIZER); + }, "Should throw exception for empty token"); + + assertTrue(exception.getMessage().contains("cannot be empty") + || exception.getCause() instanceof IllegalArgumentException, "Exception should indicate empty token"); + } + + @Test + public void testContinuationTokenWithInvalidBase64() { + // Arrange - create an invalid base64 string + String invalidToken = "This is not valid base64!@#$%"; + + // Act & Assert + assertThrows(RuntimeException.class, () -> { + PollingState.fromContinuationToken(invalidToken, SERIALIZER); + }, "Should throw exception for invalid base64 token"); + } + + @Test + public void testContinuationTokenWithInvalidJson() { + // Arrange - create a valid base64 string but invalid JSON content + String invalidJsonToken = java.util.Base64.getEncoder() + .encodeToString("{this is not valid json".getBytes(java.nio.charset.StandardCharsets.UTF_8)); + + // Act & Assert + assertThrows(RuntimeException.class, () -> { + PollingState.fromContinuationToken(invalidJsonToken, SERIALIZER); + }, "Should throw exception for invalid JSON in token"); + } + + @Test + public void testContinuationTokenPreservesPollingUrl() throws Exception { + // Arrange + URL lroUrl = new URL( + "https://management.azure.com/subscriptions/sub1/resourceGroups/rg1/providers/Microsoft.MySql/servers/server1"); + HttpRequest request = new HttpRequest(HttpMethod.PUT, lroUrl); + HttpHeaders headers = new HttpHeaders(); + String asyncOpUrl + = "https://management.azure.com/subscriptions/sub1/providers/Microsoft.MySql/locations/westus/operationStatuses/op123"; + headers.set("Azure-AsyncOperation", asyncOpUrl); + + String responseBody = "{\"id\":\"server1\",\"properties\":{\"provisioningState\":\"Provisioning\"}}"; + + PollingState originalState = PollingState.create(SERIALIZER, request, 201, headers, responseBody); + + // Act + String token = originalState.toContinuationToken(); + PollingState deserializedState = PollingState.fromContinuationToken(token, SERIALIZER); + + // Assert - verify polling URL is preserved + assertEquals(new URL(asyncOpUrl), deserializedState.getPollUrl(), + "Polling URL should be preserved in the continuation token"); + } + + @Test + public void testContinuationTokenWithLocationHeader() throws Exception { + // Arrange + URL lroUrl = new URL( + "https://management.azure.com/subscriptions/sub1/resourceGroups/rg1/providers/Microsoft.Compute/disks/disk1"); + HttpRequest request = new HttpRequest(HttpMethod.DELETE, lroUrl); + HttpHeaders headers = new HttpHeaders(); + String locationUrl + = "https://management.azure.com/subscriptions/sub1/providers/Microsoft.Compute/locations/westus/operationStatuses/op456"; + headers.set("Location", locationUrl); + + PollingState originalState = PollingState.create(SERIALIZER, request, 202, headers, null); + + // Act + String token = originalState.toContinuationToken(); + PollingState deserializedState = PollingState.fromContinuationToken(token, SERIALIZER); + + // Assert + assertEquals(new URL(locationUrl), deserializedState.getPollUrl(), + "Location URL should be preserved in the continuation token"); + } + + @Test + public void testContinuationTokenRoundTrip() throws Exception { + // Arrange - create, poll, then serialize + URL lroUrl = new URL( + "https://management.azure.com/subscriptions/sub1/resourceGroups/rg1/providers/Microsoft.Storage/storageAccounts/sa1"); + HttpRequest request = new HttpRequest(HttpMethod.PUT, lroUrl); + HttpHeaders initialHeaders = new HttpHeaders(); + initialHeaders.set("Azure-AsyncOperation", + "https://management.azure.com/subscriptions/sub1/providers/Microsoft.Storage/locations/westus/asyncOperations/op789"); + + String initialBody = "{\"id\":\"sa1\",\"properties\":{\"provisioningState\":\"Creating\"}}"; + + PollingState state = PollingState.create(SERIALIZER, request, 201, initialHeaders, initialBody); + + // Simulate a poll update + HttpHeaders pollHeaders = new HttpHeaders(); + pollHeaders.set("Retry-After", "10"); + String pollBody = "{\"status\":\"InProgress\"}"; + state.update(200, pollHeaders, pollBody); + + // Act - serialize and deserialize + String token1 = state.toContinuationToken(); + PollingState state2 = PollingState.fromContinuationToken(token1, SERIALIZER); + String token2 = state2.toContinuationToken(); + + // Assert - tokens should be equivalent (both represent the same state) + PollingState state3 = PollingState.fromContinuationToken(token2, SERIALIZER); + assertEquals(state2.getOperationStatus(), state3.getOperationStatus(), + "Round-trip serialization should preserve operation status"); + assertEquals(state2.getPollDelay(), state3.getPollDelay(), + "Round-trip serialization should preserve poll delay"); + } + + @Test + public void testContinuationTokenWithNullSerializer() throws Exception { + // Arrange + URL lroUrl = new URL("https://management.azure.com/subscriptions/sub1/test"); + HttpRequest request = new HttpRequest(HttpMethod.PUT, lroUrl); + HttpHeaders headers = new HttpHeaders(); + + PollingState state = PollingState.create(SERIALIZER, request, 200, headers, "{}"); + String token = state.toContinuationToken(); + + // Act & Assert + assertThrows(NullPointerException.class, () -> { + PollingState.fromContinuationToken(token, null); + }, "Should throw NullPointerException for null serializer"); + } +} diff --git a/sdk/core/azure-core-management/src/test/java/com/azure/core/management/polling/SyncPollerContinuationTokenTest.java b/sdk/core/azure-core-management/src/test/java/com/azure/core/management/polling/SyncPollerContinuationTokenTest.java new file mode 100644 index 000000000000..f3041fbc3694 --- /dev/null +++ b/sdk/core/azure-core-management/src/test/java/com/azure/core/management/polling/SyncPollerContinuationTokenTest.java @@ -0,0 +1,230 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT License. + +package com.azure.core.management.polling; + +import com.azure.core.http.HttpClient; +import com.azure.core.http.HttpHeaderName; +import com.azure.core.http.HttpHeaders; +import com.azure.core.http.HttpMethod; +import com.azure.core.http.HttpPipeline; +import com.azure.core.http.HttpPipelineBuilder; +import com.azure.core.http.HttpRequest; +import com.azure.core.http.HttpResponse; +import com.azure.core.http.rest.Response; +import com.azure.core.http.rest.SimpleResponse; +import com.azure.core.management.implementation.polling.FooWithProvisioningState; +import com.azure.core.management.serializer.SerializerFactory; +import com.azure.core.util.BinaryData; +import com.azure.core.util.Context; +import com.azure.core.util.polling.LongRunningOperationStatus; +import com.azure.core.util.polling.PollResponse; +import com.azure.core.util.polling.SyncPoller; +import com.azure.core.util.serializer.SerializerAdapter; +import com.azure.core.util.serializer.SerializerEncoding; +import org.junit.jupiter.api.Test; +import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.nio.charset.StandardCharsets; +import java.time.Duration; +import java.util.concurrent.atomic.AtomicInteger; + +import static org.junit.jupiter.api.Assertions.*; + +/** + * Tests for SyncPoller continuation token support. + */ +@SuppressWarnings("deprecation") +public class SyncPollerContinuationTokenTest { + + private static final SerializerAdapter SERIALIZER = SerializerFactory.createDefaultManagementSerializerAdapter(); + private static final Duration POLL_INTERVAL = Duration.ofMillis(50); + + @Test + public void testSerializeAndResumeContinuationToken() throws Exception { + // Arrange - create a mock HTTP client that simulates an LRO + AtomicInteger callCount = new AtomicInteger(0); + String asyncOpUrl + = "https://management.azure.com/subscriptions/sub1/providers/Microsoft.Compute/locations/westus/operations/op1"; + + HttpClient mockHttpClient = new HttpClient() { + @Override + public Mono send(HttpRequest request) { + return Mono.fromCallable(() -> createMockResponse(request, callCount)); + } + }; + + HttpPipeline pipeline = new HttpPipelineBuilder().httpClient(mockHttpClient).build(); + + // Create initial LRO response + Response initialResponse = createInitialResponse(asyncOpUrl); + + // Act - create poller and poll once + SyncPoller, FooWithProvisioningState> originalPoller + = SyncPollerFactory.create(SERIALIZER, pipeline, FooWithProvisioningState.class, + FooWithProvisioningState.class, POLL_INTERVAL, () -> initialResponse, Context.NONE); + + // Poll once to update state + PollResponse> firstPoll = originalPoller.poll(); + assertEquals(LongRunningOperationStatus.IN_PROGRESS, firstPoll.getStatus(), + "First poll should show IN_PROGRESS"); + + // Serialize to continuation token + String token = originalPoller.serializeContinuationToken(); + assertNotNull(token, "Continuation token should not be null"); + assertFalse(token.isEmpty(), "Continuation token should not be empty"); + + // Act - resume from token in a "new" poller + SyncPoller, FooWithProvisioningState> resumedPoller + = SyncPollerFactory.resumeFromToken(token, SERIALIZER, pipeline, FooWithProvisioningState.class, + FooWithProvisioningState.class, POLL_INTERVAL, Context.NONE); + + // Poll the resumed poller + PollResponse> resumedPoll = resumedPoller.poll(); + assertNotNull(resumedPoll, "Resumed poll should not be null"); + + // Continue polling until completion + PollResponse> finalResponse = resumedPoller.waitForCompletion(); + assertEquals(LongRunningOperationStatus.SUCCESSFULLY_COMPLETED, finalResponse.getStatus(), + "Polling should complete successfully"); + + // Verify we can get poll result (note: final result would require a GET to the resource URL, which we're not mocking) + assertNotNull(finalResponse.getValue(), "Final poll result should not be null"); + } + + @Test + public void testContinuationTokenMultipleResumes() throws Exception { + // Arrange + AtomicInteger callCount = new AtomicInteger(0); + String asyncOpUrl + = "https://management.azure.com/subscriptions/sub1/providers/Microsoft.Storage/locations/westus/operations/op2"; + + HttpClient mockHttpClient = request -> Mono.fromCallable(() -> createMockResponse(request, callCount)); + HttpPipeline pipeline = new HttpPipelineBuilder().httpClient(mockHttpClient).build(); + Response initialResponse = createInitialResponse(asyncOpUrl); + + // Create original poller + SyncPoller, FooWithProvisioningState> poller1 + = SyncPollerFactory.create(SERIALIZER, pipeline, FooWithProvisioningState.class, + FooWithProvisioningState.class, POLL_INTERVAL, () -> initialResponse, Context.NONE); + + poller1.poll(); // Poll once + String token1 = poller1.serializeContinuationToken(); + + // Resume from token1 + SyncPoller, FooWithProvisioningState> poller2 + = SyncPollerFactory.resumeFromToken(token1, SERIALIZER, pipeline, FooWithProvisioningState.class, + FooWithProvisioningState.class, POLL_INTERVAL, Context.NONE); + + poller2.poll(); // Poll once more + String token2 = poller2.serializeContinuationToken(); + + // Resume from token2 (second resume) + SyncPoller, FooWithProvisioningState> poller3 + = SyncPollerFactory.resumeFromToken(token2, SERIALIZER, pipeline, FooWithProvisioningState.class, + FooWithProvisioningState.class, POLL_INTERVAL, Context.NONE); + + // Final polling + PollResponse> finalResponse = poller3.waitForCompletion(); + assertEquals(LongRunningOperationStatus.SUCCESSFULLY_COMPLETED, finalResponse.getStatus(), + "Multiple resumes should still lead to completion"); + } + + @Test + public void testContinuationTokenWithInvalidToken() { + // Arrange + HttpPipeline pipeline = new HttpPipelineBuilder().build(); + String invalidToken = "invalid-base64-token!@#"; + + // Act & Assert + assertThrows(RuntimeException.class, () -> { + SyncPollerFactory.resumeFromToken(invalidToken, SERIALIZER, pipeline, FooWithProvisioningState.class, + FooWithProvisioningState.class, POLL_INTERVAL); + }, "Should throw exception for invalid token"); + } + + // Helper method to create initial LRO response + private Response createInitialResponse(String asyncOpUrl) throws IOException { + FooWithProvisioningState foo = new FooWithProvisioningState("Creating"); + + String responseBody = SERIALIZER.serialize(foo, SerializerEncoding.JSON); + + HttpHeaders headers = new HttpHeaders(); + headers.set(HttpHeaderName.AZURE_ASYNCOPERATION, asyncOpUrl); + headers.set(HttpHeaderName.RETRY_AFTER, "1"); + + HttpRequest request = new HttpRequest(HttpMethod.PUT, + "https://management.azure.com/subscriptions/sub1/resourceGroups/rg1/providers/Microsoft.Compute/vms/vm1"); + + return new SimpleResponse<>(request, 201, headers, BinaryData.fromString(responseBody)); + } + + // Helper method to create mock HTTP responses for polling + private HttpResponse createMockResponse(HttpRequest request, AtomicInteger callCount) throws IOException { + int count = callCount.incrementAndGet(); + + // Simulate polling progression: Creating -> Updating -> Succeeded + String provisioningState; + int statusCode = 200; + + if (count <= 2) { + provisioningState = "InProgress"; + } else if (count <= 4) { + provisioningState = "Updating"; + } else { + provisioningState = "Succeeded"; + } + + String responseBody = String.format("{\"status\":\"%s\"}", provisioningState); + + return new HttpResponse(request) { + @Override + public int getStatusCode() { + return statusCode; + } + + @Override + public String getHeaderValue(String name) { + if (HttpHeaderName.RETRY_AFTER.getCaseSensitiveName().equalsIgnoreCase(name)) { + return "1"; + } + return null; + } + + @Override + public HttpHeaders getHeaders() { + HttpHeaders headers = new HttpHeaders(); + headers.set(HttpHeaderName.RETRY_AFTER, "1"); + return headers; + } + + @Override + public Flux getBody() { + return Flux.just(ByteBuffer.wrap(responseBody.getBytes(StandardCharsets.UTF_8))); + } + + @Override + public Mono getBodyAsByteArray() { + return Mono.just(responseBody.getBytes(StandardCharsets.UTF_8)); + } + + @Override + public Mono getBodyAsString() { + return Mono.just(responseBody); + } + + @Override + public Mono getBodyAsString(java.nio.charset.Charset charset) { + return Mono.just(responseBody); + } + + @Override + public BinaryData getBodyAsBinaryData() { + return BinaryData.fromString(responseBody); + } + }; + } +} diff --git a/sdk/core/azure-core/CHANGELOG.md b/sdk/core/azure-core/CHANGELOG.md index d9173bd1bb63..eb0a2dbe497c 100644 --- a/sdk/core/azure-core/CHANGELOG.md +++ b/sdk/core/azure-core/CHANGELOG.md @@ -4,6 +4,8 @@ ### Features Added +- Added `SyncPoller.serializeContinuationToken()` method to support continuation tokens for long-running operations. Default implementation throws `UnsupportedOperationException`; ARM LRO pollers created via `SyncPollerFactory` support this feature. + ### Breaking Changes ### Bugs Fixed diff --git a/sdk/core/azure-core/src/main/java/com/azure/core/util/polling/SyncPoller.java b/sdk/core/azure-core/src/main/java/com/azure/core/util/polling/SyncPoller.java index 27a9259c9b9b..75e25e4d2fd8 100644 --- a/sdk/core/azure-core/src/main/java/com/azure/core/util/polling/SyncPoller.java +++ b/sdk/core/azure-core/src/main/java/com/azure/core/util/polling/SyncPoller.java @@ -165,6 +165,49 @@ default SyncPoller setPollInterval(Duration pollInterval) { return this; } + /** + * Serializes the current state of the poller into a continuation token that can be used to resume the + * long-running operation at a later time or in a different process. + *

+ * The continuation token captures the current state of the polling operation, including the operation URL, + * current status, and other necessary metadata. This token can be stored (e.g., in a database or file) and + * later used to recreate a poller that continues from the same point. + *

+ * Example: Exporting and resuming a long-running operation + *

{@code
+     * // Start a long-running operation
+     * SyncPoller, MyResource> poller = client.beginCreateResource(params);
+     * 
+     * // Export the continuation token (e.g., before process shutdown)
+     * String token = poller.serializeContinuationToken();
+     * // Store token in persistent storage...
+     * 
+     * // Later, in a different process or after restart:
+     * // Retrieve token from persistent storage...
+     * SyncPoller, MyResource> resumedPoller = 
+     *     client.resumeCreateResource(token);
+     * 
+     * // Continue polling until completion
+     * MyResource result = resumedPoller.getFinalResult();
+     * }
+ *

+ * Security Note: The continuation token may contain sensitive information such as + * operation URLs. Ensure tokens are stored securely and transmitted over secure channels. + *

+ * Compatibility Note: The token format is internal to the SDK and may change between + * versions. Tokens should only be used with the same SDK version that generated them. + * + * @return A continuation token string that represents the current state of the poller. + * @throws UnsupportedOperationException if this poller implementation does not support continuation tokens. + * This may occur for pollers that don't maintain sufficient state to be resumed, or for + * operations that cannot be polled externally. + */ + default String serializeContinuationToken() { + throw new UnsupportedOperationException("This poller does not support continuation tokens. " + + "Continuation tokens are only supported for Azure Resource Manager long-running operations " + + "created via SyncPollerFactory."); + } + /** * Creates default SyncPoller. *