diff --git a/conductor-client/src/main/java/com/netflix/conductor/client/http/ConductorClient.java b/conductor-client/src/main/java/com/netflix/conductor/client/http/ConductorClient.java index f790eb9a..335bbe4b 100644 --- a/conductor-client/src/main/java/com/netflix/conductor/client/http/ConductorClient.java +++ b/conductor-client/src/main/java/com/netflix/conductor/client/http/ConductorClient.java @@ -210,12 +210,29 @@ private T deserialize(Response response, Type returnType) { return null; } + String contentType = response.header("Content-Type"); + + // Handle binary data (byte[]) for application/octet-stream + if ("application/octet-stream".equals(contentType) && returnType.equals(byte[].class)) { + if (response.body() == null) { + return null; + } + try { + //noinspection unchecked + return (T) response.body().bytes(); + } catch (IOException e) { + throw new ConductorClientException(response.message(), + e, + response.code(), + response.headers().toMultimap()); + } + } + String body = bodyAsString(response); if (body == null || "".equals(body)) { return null; } - String contentType = response.header("Content-Type"); if (contentType == null || isJsonMime(contentType)) { // This is hacky. It's required because Conductor's API is returning raw strings as JSON if (returnType.equals(String.class)) { diff --git a/conductor-client/src/main/java/com/netflix/conductor/client/http/IncomingWebhookClient.java b/conductor-client/src/main/java/com/netflix/conductor/client/http/IncomingWebhookClient.java new file mode 100644 index 00000000..b2c05a54 --- /dev/null +++ b/conductor-client/src/main/java/com/netflix/conductor/client/http/IncomingWebhookClient.java @@ -0,0 +1,96 @@ +/* + * Copyright 2020 Conductor Authors. + *

+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ +package com.netflix.conductor.client.http; + + +import org.apache.commons.lang3.Validate; + +import com.netflix.conductor.client.http.ConductorClientRequest.Method; + +import com.fasterxml.jackson.core.type.TypeReference; + +/** + * Client for incoming webhook operations in Conductor + */ +public final class IncomingWebhookClient { + + private ConductorClient client; + + /** Creates a default incoming webhook client */ + public IncomingWebhookClient() { + } + + public IncomingWebhookClient(ConductorClient client) { + this.client = client; + } + + /** + * Kept only for backwards compatibility + * + * @param rootUri basePath for the ApiClient + */ + @Deprecated + public void setRootURI(String rootUri) { + if (client != null) { + client.shutdown(); + } + client = new ConductorClient(rootUri); + } + + /** + * Handle incoming webhook with POST method + * + * @param id The webhook ID + * @param payload The webhook payload as string + * @return The response from the webhook handler + */ + public String handleWebhook(String id, String payload) { + Validate.notBlank(id, "Webhook ID cannot be blank"); + Validate.notNull(payload, "Payload cannot be null"); + + ConductorClientRequest.Builder requestBuilder = ConductorClientRequest.builder() + .method(Method.POST) + .path("/webhook/{id}") + .addPathParam("id", id) + .body(payload); + + ConductorClientRequest request = requestBuilder.build(); + + ConductorClientResponse resp = client.execute(request, new TypeReference<>() { + }); + + return resp.getData(); + } + + /** + * Handle incoming webhook with GET method (typically for ping/health checks) + * + * @param id The webhook ID + * @return The response from the webhook handler + */ + public String handlePing(String id) { + Validate.notBlank(id, "Webhook ID cannot be blank"); + + ConductorClientRequest.Builder requestBuilder = ConductorClientRequest.builder() + .method(Method.GET) + .path("/webhook/{id}") + .addPathParam("id", id); + + ConductorClientRequest request = requestBuilder.build(); + + ConductorClientResponse resp = client.execute(request, new TypeReference<>() { + }); + + return resp.getData(); + } +} diff --git a/conductor-client/src/main/java/com/netflix/conductor/client/http/ResourceSharingClient.java b/conductor-client/src/main/java/com/netflix/conductor/client/http/ResourceSharingClient.java new file mode 100644 index 00000000..7428d155 --- /dev/null +++ b/conductor-client/src/main/java/com/netflix/conductor/client/http/ResourceSharingClient.java @@ -0,0 +1,151 @@ +/* + * Copyright 2020 Conductor Authors. + *

+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ +package com.netflix.conductor.client.http; + +import java.util.List; + +import org.apache.commons.lang3.Validate; + +import com.netflix.conductor.client.http.ConductorClientRequest.Method; +import com.netflix.conductor.common.metadata.workflow.SharedResourceModel; + +import com.fasterxml.jackson.core.type.TypeReference; + +/** + * Client for resource sharing operations in Conductor + */ +public final class ResourceSharingClient { + + private ConductorClient client; + + /** Creates a default resource sharing client */ + public ResourceSharingClient() { + } + + public ResourceSharingClient(ConductorClient client) { + this.client = client; + } + + /** + * Kept only for backwards compatibility + * + * @param rootUri basePath for the ApiClient + */ + @Deprecated + public void setRootURI(String rootUri) { + if (client != null) { + client.shutdown(); + } + client = new ConductorClient(rootUri); + } + + /** + * Share a resource with another user or group + * + * @param resourceType Type of resource to share (e.g., "WORKFLOW", "TASK") + * @param resourceName Name of the resource to share + * @param sharedWith User or group to share the resource with + */ + public void shareResource(String resourceType, String resourceName, String sharedWith) { + Validate.notBlank(resourceType, "ResourceType cannot be blank"); + Validate.notBlank(resourceName, "ResourceName cannot be blank"); + Validate.notBlank(sharedWith, "SharedWith cannot be blank"); + + ConductorClientRequest request = ConductorClientRequest.builder() + .method(Method.POST) + .path("/share/shareResource") + .addQueryParam("resourceType", resourceType) + .addQueryParam("resourceName", resourceName) + .addQueryParam("sharedWith", sharedWith) + .build(); + + client.execute(request); + } + + /** + * Remove sharing of a resource with a user or group + * + * @param resourceType Type of resource to unshare + * @param resourceName Name of the resource to unshare + * @param sharedWith User or group to remove sharing from + */ + public void removeSharingResource(String resourceType, String resourceName, String sharedWith) { + Validate.notBlank(resourceType, "ResourceType cannot be blank"); + Validate.notBlank(resourceName, "ResourceName cannot be blank"); + Validate.notBlank(sharedWith, "SharedWith cannot be blank"); + + ConductorClientRequest request = ConductorClientRequest.builder() + .method(Method.DELETE) + .path("/share/removeSharingResource") + .addQueryParam("resourceType", resourceType) + .addQueryParam("resourceName", resourceName) + .addQueryParam("sharedWith", sharedWith) + .build(); + + client.execute(request); + } + + /** + * Get all shared resources accessible to the current user + * + * @return List of shared resources + */ + public List getSharedResources() { + ConductorClientRequest request = ConductorClientRequest.builder() + .method(Method.GET) + .path("/share/getSharedResources") + .build(); + + ConductorClientResponse> resp = client.execute(request, new TypeReference<>() { + }); + + return resp.getData(); + } + + /** + * Get shared resources filtered by resource type + * + * @param resourceType Type of resource to filter by + * @return List of shared resources of the specified type + */ + public List getSharedResources(String resourceType) { + Validate.notBlank(resourceType, "ResourceType cannot be blank"); + + ConductorClientRequest request = ConductorClientRequest.builder() + .method(Method.GET) + .path("/share/getSharedResources") + .addQueryParam("resourceType", resourceType) + .build(); + + ConductorClientResponse> resp = client.execute(request, new TypeReference<>() { + }); + + return resp.getData(); + } + + /** + * Check if a specific resource is shared with a user or group + * + * @param resourceType Type of resource + * @param resourceName Name of the resource + * @param sharedWith User or group to check + * @return true if the resource is shared, false otherwise + */ + public boolean isResourceShared(String resourceType, String resourceName, String sharedWith) { + List sharedResources = getSharedResources(resourceType); + return sharedResources.stream() + .anyMatch(resource -> + resource.getResourceName().equals(resourceName) && + resource.getSharedWith().equals(sharedWith)); + } +} diff --git a/conductor-client/src/main/java/com/netflix/conductor/client/http/TaskClient.java b/conductor-client/src/main/java/com/netflix/conductor/client/http/TaskClient.java index 2f09b2cd..02726e4c 100644 --- a/conductor-client/src/main/java/com/netflix/conductor/client/http/TaskClient.java +++ b/conductor-client/src/main/java/com/netflix/conductor/client/http/TaskClient.java @@ -34,10 +34,12 @@ import com.netflix.conductor.client.exception.ConductorClientException; import com.netflix.conductor.client.http.ConductorClientRequest.Method; import com.netflix.conductor.common.config.ObjectMapperProvider; +import com.netflix.conductor.common.enums.ReturnStrategy; import com.netflix.conductor.common.metadata.tasks.PollData; import com.netflix.conductor.common.metadata.tasks.Task; import com.netflix.conductor.common.metadata.tasks.TaskExecLog; import com.netflix.conductor.common.metadata.tasks.TaskResult; +import com.netflix.conductor.common.model.SignalResponse; import com.netflix.conductor.common.run.ExternalStorageLocation; import com.netflix.conductor.common.run.SearchResult; import com.netflix.conductor.common.run.TaskSummary; @@ -47,7 +49,9 @@ import com.fasterxml.jackson.databind.ObjectMapper; import lombok.extern.slf4j.Slf4j; -/** Client for conductor task management including polling for task, updating task status etc. */ +/** + * Client for conductor task management including polling for task, updating task status etc. + */ @Slf4j public final class TaskClient { @@ -61,7 +65,9 @@ public final class TaskClient { private ConductorClient client; - /** Creates a default task client */ + /** + * Creates a default task client + */ public TaskClient() { // client will be set once root uri is set this(null, new DefaultConductorClientConfiguration()); @@ -99,11 +105,11 @@ public void registerListener(TaskClientListener listener) { * Perform a poll for a task of a specific task type. * * @param taskType The taskType to poll for - * @param domain The domain of the task type + * @param domain The domain of the task type * @param workerId Name of the client worker. Used for logging. * @return Task waiting to be executed. */ - public Task pollTask(String taskType, String workerId, String domain){ + public Task pollTask(String taskType, String workerId, String domain) { Validate.notBlank(taskType, "Task type cannot be blank"); Validate.notBlank(workerId, "Worker id cannot be blank"); @@ -126,10 +132,10 @@ public Task pollTask(String taskType, String workerId, String domain){ /** * Perform a batch poll for tasks by task type. Batch size is configurable by count. * - * @param taskType Type of task to poll for - * @param workerId Name of the client worker. Used for logging. - * @param count Maximum number of tasks to be returned. Actual number of tasks returned can be - * less than this number. + * @param taskType Type of task to poll for + * @param workerId Name of the client worker. Used for logging. + * @param count Maximum number of tasks to be returned. Actual number of tasks returned can be + * less than this number. * @param timeoutInMillisecond Long poll wait timeout. * @return List of tasks awaiting to be executed. */ @@ -146,15 +152,15 @@ public List batchPollTasksByTaskType(String taskType, String workerId, int /** * Batch poll for tasks in a domain. Batch size is configurable by count. * - * @param taskType Type of task to poll for - * @param domain The domain of the task type - * @param workerId Name of the client worker. Used for logging. - * @param count Maximum number of tasks to be returned. Actual number of tasks returned can be - * less than this number. + * @param taskType Type of task to poll for + * @param domain The domain of the task type + * @param workerId Name of the client worker. Used for logging. + * @param count Maximum number of tasks to be returned. Actual number of tasks returned can be + * less than this number. * @param timeoutInMillisecond Long poll wait timeout. * @return List of tasks awaiting to be executed. */ - public List batchPollTasksInDomain(String taskType, String domain, String workerId, int count, int timeoutInMillisecond){ + public List batchPollTasksInDomain(String taskType, String domain, String workerId, int count, int timeoutInMillisecond) { Validate.notBlank(taskType, "Task type cannot be blank"); Validate.notBlank(workerId, "Worker id cannot be blank"); Validate.isTrue(count > 0, "Count must be greater than 0"); @@ -192,10 +198,10 @@ public void updateTask(TaskResult taskResult) { public Task updateTaskV2(TaskResult taskResult) { Validate.notNull(taskResult, "Task result cannot be null"); ConductorClientRequest request = ConductorClientRequest.builder() - .method(Method.POST) - .path("/tasks/update-v2") - .body(taskResult) - .build(); + .method(Method.POST) + .path("/tasks/update-v2") + .body(taskResult) + .build(); ConductorClientResponse response = client.execute(request, new TypeReference<>() { }); @@ -214,7 +220,7 @@ public Optional evaluateAndUploadLargePayload(Map taskOu eventDispatcher.publish(new TaskResultPayloadSizeEvent(taskType, taskResultSize)); long payloadSizeThreshold = conductorClientConfiguration.getTaskOutputPayloadThresholdKB() * 1024L; if (taskResultSize > payloadSizeThreshold) { - if (!conductorClientConfiguration.isExternalPayloadStorageEnabled() || taskResultSize + if (!conductorClientConfiguration.isExternalPayloadStorageEnabled() || taskResultSize > conductorClientConfiguration.getTaskOutputMaxPayloadThresholdKB() * 1024L) { throw new IllegalArgumentException( String.format("The TaskResult payload size: %d is greater than the permissible %d bytes", @@ -232,13 +238,14 @@ public Optional evaluateAndUploadLargePayload(Map taskOu throw new ConductorClientException(e); } } + /** * Ack for the task poll. * - * @param taskId Id of the task to be polled + * @param taskId Id of the task to be polled * @param workerId user identified worker. * @return true if the task was found with the given ID and acknowledged. False otherwise. If - * the server returns false, the client should NOT attempt to ack again. + * the server returns false, the client should NOT attempt to ack again. */ public Boolean ack(String taskId, String workerId) { Validate.notBlank(taskId, "Task id cannot be blank"); @@ -258,7 +265,7 @@ public Boolean ack(String taskId, String workerId) { /** * Log execution messages for a task. * - * @param taskId id of the task + * @param taskId id of the task * @param logMessage the message to be logged */ public void logMessageForTask(String taskId, String logMessage) { @@ -278,7 +285,7 @@ public void logMessageForTask(String taskId, String logMessage) { * * @param taskId id of the task. */ - public List getTaskLogs(String taskId){ + public List getTaskLogs(String taskId) { Validate.notBlank(taskId, "Task id cannot be blank"); ConductorClientRequest request = ConductorClientRequest.builder() .method(Method.GET) @@ -312,11 +319,72 @@ public Task getTaskDetails(String taskId) { return resp.getData(); } + /** + * Signals a task with default return strategy (TARGET_WORKFLOW) + * + * @param workflowId Workflow Id of the workflow to be signaled + * @param status Signal status to be set for the workflow + * @param output Output for the task + * @return SignalResponse with target workflow details + */ + public SignalResponse signal(String workflowId, Task.Status status, Map output) { + return signal(workflowId, status, output, ReturnStrategy.TARGET_WORKFLOW); + } + + /** + * Signals a task in a workflow synchronously and returns data based on the specified return strategy. + * + * @param workflowId Workflow Id of the workflow to be signaled + * @param status Signal status to be set for the workflow + * @param output Output for the task + * @param returnStrategy Strategy for what data to return + * @return SignalResponse with data based on the return strategy + */ + public SignalResponse signal(String workflowId, Task.Status status, Map output, ReturnStrategy returnStrategy) { + Validate.notBlank(workflowId, "Workflow id cannot be blank"); + Validate.notNull(status, "Status cannot be null"); + + ConductorClientRequest request = ConductorClientRequest.builder() + .method(Method.POST) + .path("/tasks/{workflowId}/{status}/signal/sync") + .addPathParam("workflowId", workflowId) + .addPathParam("status", status.name()) + .addQueryParam("returnStrategy", returnStrategy.name()) + .body(output) + .build(); + + ConductorClientResponse resp = client.execute(request, new TypeReference<>() { + }); + return resp.getData(); + } + + /** + * Signals a task in a workflow asynchronously. + * + * @param workflowId Workflow Id of the workflow to be signaled + * @param status Signal status to be set for the workflow + * @param output Output for the task + */ + public void signalAsync(String workflowId, Task.Status status, Map output) { + Validate.notBlank(workflowId, "Workflow id cannot be blank"); + Validate.notNull(status, "Status cannot be null"); + + ConductorClientRequest request = ConductorClientRequest.builder() + .method(Method.POST) + .path("/tasks/{workflowId}/{status}/signal") + .addPathParam("workflowId", workflowId) + .addPathParam("status", status.name()) + .body(output) + .build(); + + client.execute(request); + } + /** * Removes a task from a taskType queue * * @param taskType the taskType to identify the queue - * @param taskId the id of the task to be removed + * @param taskId the id of the task to be removed */ public void removeTaskFromQueue(String taskType, String taskId) { Validate.notBlank(taskType, "Task type cannot be blank"); @@ -422,12 +490,13 @@ public String requeuePendingTasksByTaskType(String taskType) { return resp.getData(); } + /** * Search for tasks based on payload * * @param query the search string * @return returns the {@link SearchResult} containing the {@link TaskSummary} matching the - * query + * query */ public SearchResult search(String query) { ConductorClientRequest request = ConductorClientRequest.builder() @@ -464,11 +533,11 @@ public SearchResult searchV2(String query) { /** * Paginated search for tasks based on payload * - * @param start start value of page - * @param size number of tasks to be returned - * @param sort sort order + * @param start start value of page + * @param size number of tasks to be returned + * @param sort sort order * @param freeText additional free text query - * @param query the search query + * @param query the search query * @return the {@link SearchResult} containing the {@link TaskSummary} that match the query */ public SearchResult search(Integer start, Integer size, String sort, String freeText, String query) { @@ -491,11 +560,11 @@ public SearchResult search(Integer start, Integer size, String sort /** * Paginated search for tasks based on payload * - * @param start start value of page - * @param size number of tasks to be returned - * @param sort sort order + * @param start start value of page + * @param size number of tasks to be returned + * @param sort sort order * @param freeText additional free text query - * @param query the search query + * @param query the search query * @return the {@link SearchResult} containing the {@link Task} that match the query */ public SearchResult searchV2(Integer start, Integer size, String sort, String freeText, String query) { diff --git a/conductor-client/src/main/java/com/netflix/conductor/client/http/WebhookConfigClient.java b/conductor-client/src/main/java/com/netflix/conductor/client/http/WebhookConfigClient.java new file mode 100644 index 00000000..c3a34c52 --- /dev/null +++ b/conductor-client/src/main/java/com/netflix/conductor/client/http/WebhookConfigClient.java @@ -0,0 +1,253 @@ +/* + * Copyright 2020 Conductor Authors. + *

+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ +package com.netflix.conductor.client.http; + +import java.util.List; + +import org.apache.commons.lang3.Validate; + +import com.netflix.conductor.client.http.ConductorClientRequest.Method; +import com.netflix.conductor.common.model.Tag; +import com.netflix.conductor.common.model.WebhookConfig; + +import com.fasterxml.jackson.core.type.TypeReference; + +/** + * Client for webhook configuration operations in Conductor + */ +public final class WebhookConfigClient { + + private ConductorClient client; + + /** + * Creates a default webhook config client + */ + public WebhookConfigClient() { + } + + public WebhookConfigClient(ConductorClient client) { + this.client = client; + } + + /** + * Kept only for backwards compatibility + * + * @param rootUri basePath for the ApiClient + */ + @Deprecated + public void setRootURI(String rootUri) { + if (client != null) { + client.shutdown(); + } + client = new ConductorClient(rootUri); + } + + /** + * Create a new webhook configuration + * + * @param webhookConfig The webhook configuration to create + * @return The created webhook configuration with assigned ID + */ + public WebhookConfig createWebhook(WebhookConfig webhookConfig) { + Validate.notNull(webhookConfig, "WebhookConfig cannot be null"); + Validate.notBlank(webhookConfig.getName(), "Webhook name cannot be blank"); + + ConductorClientRequest request = ConductorClientRequest.builder() + .method(Method.POST) + .path("/metadata/webhook") + .body(webhookConfig) + .build(); + + ConductorClientResponse resp = client.execute(request, new TypeReference<>() { + }); + + return resp.getData(); + } + + /** + * Update an existing webhook configuration + * + * @param id The webhook ID to update + * @param webhookConfig The updated webhook configuration + * @return The updated webhook configuration + */ + public WebhookConfig updateWebhook(String id, WebhookConfig webhookConfig) { + Validate.notBlank(id, "Webhook ID cannot be blank"); + Validate.notNull(webhookConfig, "WebhookConfig cannot be null"); + + ConductorClientRequest request = ConductorClientRequest.builder() + .method(Method.PUT) + .path("/metadata/webhook/{id}") + .addPathParam("id", id) + .body(webhookConfig) + .build(); + + ConductorClientResponse resp = client.execute(request, new TypeReference<>() { + }); + + return resp.getData(); + } + + /** + * Delete a webhook configuration + * + * @param id The webhook ID to delete + */ + public void deleteWebhook(String id) { + Validate.notBlank(id, "Webhook ID cannot be blank"); + + ConductorClientRequest request = ConductorClientRequest.builder() + .method(Method.DELETE) + .path("/metadata/webhook/{id}") + .addPathParam("id", id) + .build(); + + client.execute(request); + } + + /** + * Get a webhook configuration by ID + * + * @param id The webhook ID + * @return The webhook configuration + */ + public WebhookConfig getWebhook(String id) { + Validate.notBlank(id, "Webhook ID cannot be blank"); + + ConductorClientRequest request = ConductorClientRequest.builder() + .method(Method.GET) + .path("/metadata/webhook/{id}") + .addPathParam("id", id) + .build(); + + ConductorClientResponse resp = client.execute(request, new TypeReference<>() { + }); + + return resp.getData(); + } + + /** + * Get all webhook configurations + * + * @return List of all webhook configurations + */ + public List getAllWebhooks() { + ConductorClientRequest request = ConductorClientRequest.builder() + .method(Method.GET) + .path("/metadata/webhook") + .build(); + + ConductorClientResponse> resp = client.execute(request, new TypeReference<>() { + }); + + return resp.getData(); + } + + /** + * Get tags for a webhook + * + * @param id The webhook ID + * @return List of tags associated with the webhook + */ + public List getTagsForWebhook(String id) { + Validate.notBlank(id, "Webhook ID cannot be blank"); + + ConductorClientRequest request = ConductorClientRequest.builder() + .method(Method.GET) + .path("/metadata/webhook/{id}/tags") + .addPathParam("id", id) + .build(); + + ConductorClientResponse> resp = client.execute(request, new TypeReference<>() { + }); + + return resp.getData(); + } + + /** + * Set tags for a webhook (replaces existing tags) + * + * @param id The webhook ID + * @param tags The tags to set + */ + public void putTagsForWebhook(String id, List tags) { + Validate.notBlank(id, "Webhook ID cannot be blank"); + Validate.notNull(tags, "Tags list cannot be null"); + + ConductorClientRequest request = ConductorClientRequest.builder() + .method(Method.PUT) + .path("/metadata/webhook/{id}/tags") + .addPathParam("id", id) + .body(tags) + .build(); + + client.execute(request); + } + + /** + * Delete specific tags from a webhook + * + * @param id The webhook ID + * @param tags The tags to remove + */ + public void deleteTagsForWebhook(String id, List tags) { + Validate.notBlank(id, "Webhook ID cannot be blank"); + Validate.notNull(tags, "Tags list cannot be null"); + + ConductorClientRequest request = ConductorClientRequest.builder() + .method(Method.DELETE) + .path("/metadata/webhook/{id}/tags") + .addPathParam("id", id) + .body(tags) + .build(); + + client.execute(request); + } + + /** + * Add a single tag to a webhook + * + * @param id The webhook ID + * @param tag The tag to add + */ + public void addTagToWebhook(String id, Tag tag) { + Validate.notNull(tag, "Tag cannot be null"); + putTagsForWebhook(id, List.of(tag)); + } + + /** + * Remove a single tag from a webhook + * + * @param id The webhook ID + * @param tag The tag to remove + */ + public void removeTagFromWebhook(String id, Tag tag) { + Validate.notNull(tag, "Tag cannot be null"); + deleteTagsForWebhook(id, List.of(tag)); + } + + /** + * Check if a webhook exists + * + * @param id The webhook ID + * @return true if the webhook exists, false otherwise + */ + public boolean webhookExists(String id) { + try { + getWebhook(id); + return true; + } catch (Exception e) { + return false; + } + } +} \ No newline at end of file diff --git a/conductor-client/src/main/java/com/netflix/conductor/client/http/WorkflowClient.java b/conductor-client/src/main/java/com/netflix/conductor/client/http/WorkflowClient.java index 48e425e4..cd99f03a 100644 --- a/conductor-client/src/main/java/com/netflix/conductor/client/http/WorkflowClient.java +++ b/conductor-client/src/main/java/com/netflix/conductor/client/http/WorkflowClient.java @@ -18,9 +18,15 @@ import java.io.InputStream; import java.util.List; import java.util.Map; +import java.util.UUID; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.ThreadFactory; import org.apache.commons.lang3.StringUtils; import org.apache.commons.lang3.Validate; +import org.apache.commons.lang3.concurrent.BasicThreadFactory; import com.netflix.conductor.client.config.ConductorClientConfiguration; import com.netflix.conductor.client.config.DefaultConductorClientConfiguration; @@ -34,15 +40,14 @@ import com.netflix.conductor.client.exception.ConductorClientException; import com.netflix.conductor.client.http.ConductorClientRequest.Method; import com.netflix.conductor.common.config.ObjectMapperProvider; +import com.netflix.conductor.common.enums.Consistency; +import com.netflix.conductor.common.enums.ReturnStrategy; import com.netflix.conductor.common.metadata.workflow.RerunWorkflowRequest; import com.netflix.conductor.common.metadata.workflow.SkipTaskRequest; import com.netflix.conductor.common.metadata.workflow.StartWorkflowRequest; import com.netflix.conductor.common.model.BulkResponse; -import com.netflix.conductor.common.run.ExternalStorageLocation; -import com.netflix.conductor.common.run.SearchResult; -import com.netflix.conductor.common.run.Workflow; -import com.netflix.conductor.common.run.WorkflowSummary; -import com.netflix.conductor.common.run.WorkflowTestRequest; +import com.netflix.conductor.common.model.SignalResponse; +import com.netflix.conductor.common.run.*; import com.netflix.conductor.common.utils.ExternalPayloadStorage; import com.fasterxml.jackson.core.type.TypeReference; @@ -57,12 +62,13 @@ public final class WorkflowClient { private final ConductorClientConfiguration conductorClientConfiguration; private final EventDispatcher eventDispatcher = new EventDispatcher<>(); - + private final ExecutorService executorService; private ConductorClient client; - private PayloadStorage payloadStorage; - /** Creates a default workflow client */ + /** + * Creates a default workflow client + */ public WorkflowClient() { // client will be set once root uri is set this(null, new DefaultConductorClientConfiguration()); @@ -76,6 +82,11 @@ public WorkflowClient(ConductorClient client, ConductorClientConfiguration confi this.client = client; this.payloadStorage = new PayloadStorage(client); this.conductorClientConfiguration = config; + + ThreadFactory factory = new BasicThreadFactory.Builder() + .namingPattern("WorkflowClient Executor %d") + .build(); + this.executorService = Executors.newCachedThreadPool(factory); } /** @@ -139,7 +150,7 @@ public void checkAndUploadToExternalStorage(StartWorkflowRequest startWorkflowRe if (!conductorClientConfiguration.isExternalPayloadStorageEnabled() || (workflowInputSize > conductorClientConfiguration.getWorkflowInputMaxPayloadThresholdKB() * 1024L)) { String errorMsg = String.format("Input payload larger than the allowed threshold of: %d KB", - conductorClientConfiguration.getWorkflowInputPayloadThresholdKB()); + conductorClientConfiguration.getWorkflowInputPayloadThresholdKB()); throw new ConductorClientException(errorMsg); } else { eventDispatcher.publish(new WorkflowPayloadUsedEvent(startWorkflowRequest.getName(), @@ -148,15 +159,15 @@ public void checkAndUploadToExternalStorage(StartWorkflowRequest startWorkflowRe ExternalPayloadStorage.PayloadType.WORKFLOW_INPUT.name())); String externalStoragePath = uploadToExternalPayloadStorage( - workflowInputBytes, - workflowInputSize); + workflowInputBytes, + workflowInputSize); startWorkflowRequest.setExternalInputPayloadStoragePath(externalStoragePath); startWorkflowRequest.setInput(null); } } } catch (IOException e) { String errorMsg = String.format("Unable to start workflow:%s, version:%s", - startWorkflowRequest.getName(), startWorkflowRequest.getVersion()); + startWorkflowRequest.getName(), startWorkflowRequest.getVersion()); log.error(errorMsg, e); eventDispatcher.publish(new WorkflowStartedEvent(startWorkflowRequest.getName(), @@ -179,7 +190,7 @@ private String uploadToExternalPayloadStorage(byte[] payloadBytes, long payloadS /** * Retrieve a workflow by workflow id * - * @param workflowId the id of the workflow + * @param workflowId the id of the workflow * @param includeTasks specify if the tasks in the workflow need to be returned * @return the requested workflow */ @@ -203,13 +214,13 @@ public Workflow getWorkflow(String workflowId, boolean includeTasks) { /** * Retrieve all workflows for a given correlation id and name * - * @param name the name of the workflow + * @param name the name of the workflow * @param correlationId the correlation id * @param includeClosed specify if all workflows are to be returned or only running workflows - * @param includeTasks specify if the tasks in the workflow need to be returned + * @param includeTasks specify if the tasks in the workflow need to be returned * @return list of workflows for the given correlation id and name */ - public List getWorkflows(String name, String correlationId, boolean includeClosed, boolean includeTasks){ + public List getWorkflows(String name, String correlationId, boolean includeClosed, boolean includeTasks) { Validate.notBlank(name, "name cannot be blank"); Validate.notBlank(correlationId, "correlationId cannot be blank"); @@ -233,7 +244,7 @@ public List getWorkflows(String name, String correlationId, boolean in /** * Removes a workflow from the system * - * @param workflowId the id of the workflow to be deleted + * @param workflowId the id of the workflow to be deleted * @param archiveWorkflow flag to indicate if the workflow should be archived before deletion */ public void deleteWorkflow(String workflowId, boolean archiveWorkflow) { @@ -252,7 +263,7 @@ public void deleteWorkflow(String workflowId, boolean archiveWorkflow) { * Terminates the execution of all given workflows instances * * @param workflowIds the ids of the workflows to be terminated - * @param reason the reason to be logged and displayed + * @param reason the reason to be logged and displayed * @return the {@link BulkResponse} contains bulkErrorResults and bulkSuccessfulResults */ public BulkResponse terminateWorkflows(List workflowIds, String reason) { @@ -275,7 +286,7 @@ public BulkResponse terminateWorkflows(List workflowIds, String reason) * Retrieve all running workflow instances for a given name and version * * @param workflowName the name of the workflow - * @param version the version of the wokflow definition. Defaults to 1. + * @param version the version of the wokflow definition. Defaults to 1. * @return the list of running workflow instances */ public List getRunningWorkflow(String workflowName, Integer version) { @@ -286,9 +297,9 @@ public List getRunningWorkflow(String workflowName, Integer version) { * Retrieve all workflow instances for a given workflow name between a specific time period * * @param workflowName the name of the workflow - * @param version the version of the workflow definition. Defaults to 1. - * @param startTime the start time of the period - * @param endTime the end time of the period + * @param version the version of the workflow definition. Defaults to 1. + * @param startTime the start time of the period + * @param endTime the end time of the period * @return returns a list of workflows created during the specified during the time period */ public List getWorkflowsByTimePeriod(String workflowName, int version, Long startTime, Long endTime) { @@ -350,7 +361,7 @@ public void resumeWorkflow(String workflowId) { /** * Skips a given task from a current RUNNING workflow * - * @param workflowId the id of the workflow instance + * @param workflowId the id of the workflow instance * @param taskReferenceName the reference name of the task to be skipped */ public void skipTaskFromWorkflow(String workflowId, String taskReferenceName) { @@ -373,7 +384,7 @@ public void skipTaskFromWorkflow(String workflowId, String taskReferenceName) { /** * Reruns the workflow from a specific task * - * @param workflowId the id of the workflow + * @param workflowId the id of the workflow * @param rerunWorkflowRequest the request containing the task to rerun from * @return the id of the workflow */ @@ -396,10 +407,10 @@ public String rerunWorkflow(String workflowId, RerunWorkflowRequest rerunWorkflo /** * Restart a completed workflow * - * @param workflowId the workflow id of the workflow to be restarted + * @param workflowId the workflow id of the workflow to be restarted * @param useLatestDefinitions if true, use the latest workflow and task definitions when - * restarting the workflow if false, use the workflow and task definitions embedded in the - * workflow execution when restarting the workflow + * restarting the workflow if false, use the workflow and task definitions embedded in the + * workflow execution when restarting the workflow */ public void restart(String workflowId, boolean useLatestDefinitions) { Validate.notBlank(workflowId, "workflow id cannot be blank"); @@ -448,7 +459,7 @@ public void resetCallbacksForInProgressTasks(String workflowId) { * Terminates the execution of the given workflow instance * * @param workflowId the id of the workflow to be terminated - * @param reason the reason to be logged and displayed + * @param reason the reason to be logged and displayed */ public void terminateWorkflow(String workflowId, String reason) { Validate.notBlank(workflowId, "workflow id cannot be blank"); @@ -494,11 +505,11 @@ public SearchResult searchV2(String query) { /** * Paginated search for workflows based on payload * - * @param start start value of page - * @param size number of workflows to be returned - * @param sort sort order + * @param start start value of page + * @param size number of workflows to be returned + * @param sort sort order * @param freeText additional free text query - * @param query the search query + * @param query the search query * @return the {@link SearchResult} containing the {@link WorkflowSummary} that match the query */ public SearchResult search( @@ -522,11 +533,11 @@ public SearchResult search( /** * Paginated search for workflows based on payload * - * @param start start value of page - * @param size number of workflows to be returned - * @param sort sort order + * @param start start value of page + * @param size number of workflows to be returned + * @param sort sort order * @param freeText additional free text query - * @param query the search query + * @param query the search query * @return the {@link SearchResult} containing the {@link Workflow} that match the query */ public SearchResult searchV2(Integer start, Integer size, String sort, String freeText, String query) { @@ -604,4 +615,171 @@ private List getRunningWorkflow(String name, Integer version, Long start return resp.getData(); } + + /** + * Shuts down the WorkflowClient and cleans up resources + */ + public void shutdown() { + if (executorService != null) { + executorService.shutdown(); + } + } + + /** + * Executes a workflow with return strategy - basic version with server defaults + * Uses server defaults: waitForSeconds=10, consistency=DURABLE, returnStrategy=TARGET_WORKFLOW + */ + public CompletableFuture executeWorkflowWithReturnStrategy( + StartWorkflowRequest request) { + return executeWorkflowWithReturnStrategy(request, null, null, null, null); + } + + /** + * Executes a workflow with specified return strategy only + */ + public CompletableFuture executeWorkflowWithReturnStrategy( + StartWorkflowRequest request, + ReturnStrategy returnStrategy) { + return executeWorkflowWithReturnStrategy(request, null, null, null, returnStrategy); + } + + /** + * Executes a workflow with single task reference to wait for + */ + public CompletableFuture executeWorkflowWithReturnStrategy( + StartWorkflowRequest request, + String waitUntilTaskRef) { + List taskRefs = waitUntilTaskRef != null ? List.of(waitUntilTaskRef) : null; + return executeWorkflowWithReturnStrategy(request, taskRefs, null, null, null); + } + + /** + * Executes a workflow with single task reference and wait time + */ + public CompletableFuture executeWorkflowWithReturnStrategy( + StartWorkflowRequest request, + String waitUntilTaskRef, + Integer waitForSeconds) { + List taskRefs = waitUntilTaskRef != null ? List.of(waitUntilTaskRef) : null; + return executeWorkflowWithReturnStrategy(request, taskRefs, waitForSeconds, null, null); + } + + /** + * Executes a workflow with single task reference, wait time, and return strategy + */ + public CompletableFuture executeWorkflowWithReturnStrategy( + StartWorkflowRequest request, + String waitUntilTaskRef, + Integer waitForSeconds, + ReturnStrategy returnStrategy) { + List taskRefs = waitUntilTaskRef != null ? List.of(waitUntilTaskRef) : null; + return executeWorkflowWithReturnStrategy(request, taskRefs, waitForSeconds, null, returnStrategy); + } + + /** + * Executes a workflow with multiple task references to wait for + */ + public CompletableFuture executeWorkflowWithReturnStrategy( + StartWorkflowRequest request, + List waitUntilTaskRef) { + return executeWorkflowWithReturnStrategy(request, waitUntilTaskRef, null, null, null); + } + + /** + * Executes a workflow with multiple task references and wait time + */ + public CompletableFuture executeWorkflowWithReturnStrategy( + StartWorkflowRequest request, + List waitUntilTaskRef, + Integer waitForSeconds) { + return executeWorkflowWithReturnStrategy(request, waitUntilTaskRef, waitForSeconds, null, null); + } + + /** + * Executes a workflow with multiple task references, wait time, and return strategy + */ + public CompletableFuture executeWorkflowWithReturnStrategy( + StartWorkflowRequest request, + List waitUntilTaskRef, + Integer waitForSeconds, + ReturnStrategy returnStrategy) { + return executeWorkflowWithReturnStrategy(request, waitUntilTaskRef, waitForSeconds, null, returnStrategy); + } + + /** + * Executes a workflow with consistency and return strategy + */ + public CompletableFuture executeWorkflowWithReturnStrategy( + StartWorkflowRequest request, + Consistency consistency, + ReturnStrategy returnStrategy) { + return executeWorkflowWithReturnStrategy(request, null, null, consistency, returnStrategy); + } + + public CompletableFuture executeWorkflowWithReturnStrategy( + StartWorkflowRequest request, + List waitUntilTaskRef, + Integer waitForSeconds, + Consistency consistency, + ReturnStrategy returnStrategy) { + + CompletableFuture future = new CompletableFuture<>(); + String requestId = UUID.randomUUID().toString(); + + executorService.submit(() -> { + try { + SignalResponse response = executeWorkflowWithReturnStrategySync( + request, waitUntilTaskRef, waitForSeconds, consistency, returnStrategy, requestId); + future.complete(response); + } catch (Throwable t) { + future.completeExceptionally(t); + } + }); + + return future; + } + + private SignalResponse executeWorkflowWithReturnStrategySync( + StartWorkflowRequest request, + List waitUntilTaskRef, + Integer waitForSeconds, + Consistency consistency, + ReturnStrategy returnStrategy, + String requestId) { + + String waitUntilTaskRefStr = null; + if (waitUntilTaskRef != null && !waitUntilTaskRef.isEmpty()) { + waitUntilTaskRefStr = String.join(",", waitUntilTaskRef); + } + + ConductorClientRequest.Builder requestBuilder = ConductorClientRequest.builder() + .method(Method.POST) + .path("/workflow/execute/{name}/{version}") + .addPathParam("name", request.getName()) + .addPathParam("version", request.getVersion()) + .body(request); + + // Only add query parameters if they are not null - let server use defaults + if (requestId != null) { + requestBuilder.addQueryParam("requestId", requestId); + } + if (waitUntilTaskRefStr != null) { + requestBuilder.addQueryParam("waitUntilTaskRef", waitUntilTaskRefStr); + } + if (waitForSeconds != null) { + requestBuilder.addQueryParam("waitForSeconds", waitForSeconds); + } + if (consistency != null) { + requestBuilder.addQueryParam("consistency", consistency.name()); + } + if (returnStrategy != null) { + requestBuilder.addQueryParam("returnStrategy", returnStrategy.name()); + } + + ConductorClientRequest httpRequest = requestBuilder.build(); + ConductorClientResponse resp = client.execute(httpRequest, new TypeReference() { + }); + + return resp.getData(); + } } diff --git a/orkes-client/src/main/java/io/orkes/conductor/client/enums/Consistency.java b/conductor-client/src/main/java/com/netflix/conductor/common/enums/Consistency.java similarity index 94% rename from orkes-client/src/main/java/io/orkes/conductor/client/enums/Consistency.java rename to conductor-client/src/main/java/com/netflix/conductor/common/enums/Consistency.java index db296d06..2cfd9f08 100644 --- a/orkes-client/src/main/java/io/orkes/conductor/client/enums/Consistency.java +++ b/conductor-client/src/main/java/com/netflix/conductor/common/enums/Consistency.java @@ -10,7 +10,7 @@ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the * specific language governing permissions and limitations under the License. */ -package io.orkes.conductor.client.enums; +package com.netflix.conductor.common.enums; public enum Consistency { SYNCHRONOUS, diff --git a/orkes-client/src/main/java/io/orkes/conductor/client/enums/ReturnStrategy.java b/conductor-client/src/main/java/com/netflix/conductor/common/enums/ReturnStrategy.java similarity index 94% rename from orkes-client/src/main/java/io/orkes/conductor/client/enums/ReturnStrategy.java rename to conductor-client/src/main/java/com/netflix/conductor/common/enums/ReturnStrategy.java index cf4d1945..288903a8 100644 --- a/orkes-client/src/main/java/io/orkes/conductor/client/enums/ReturnStrategy.java +++ b/conductor-client/src/main/java/com/netflix/conductor/common/enums/ReturnStrategy.java @@ -10,7 +10,7 @@ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the * specific language governing permissions and limitations under the License. */ -package io.orkes.conductor.client.enums; +package com.netflix.conductor.common.enums; public enum ReturnStrategy { TARGET_WORKFLOW, // Default diff --git a/conductor-client/src/main/java/com/netflix/conductor/common/metadata/workflow/SharedResourceModel.java b/conductor-client/src/main/java/com/netflix/conductor/common/metadata/workflow/SharedResourceModel.java new file mode 100644 index 00000000..1b199a30 --- /dev/null +++ b/conductor-client/src/main/java/com/netflix/conductor/common/metadata/workflow/SharedResourceModel.java @@ -0,0 +1,130 @@ +/* + * Copyright 2020 Conductor Authors. + *

+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ +package com.netflix.conductor.common.metadata.workflow; + +import java.util.Objects; + +import com.fasterxml.jackson.annotation.JsonProperty; + +/** + * Model representing a shared resource between users/groups + */ +public class SharedResourceModel { + + @JsonProperty("resourceType") + private String resourceType; + + @JsonProperty("resourceName") + private String resourceName; + + @JsonProperty("sharedWith") + private String sharedWith; + + @JsonProperty("owner") + private String owner; + + @JsonProperty("sharedAt") + private Long sharedAt; + + public SharedResourceModel() { + } + + public SharedResourceModel(String resourceType, String resourceName, String sharedWith) { + this.resourceType = resourceType; + this.resourceName = resourceName; + this.sharedWith = sharedWith; + } + + /** + * @return The type of resource being shared (e.g., "WORKFLOW", "TASK") + */ + public String getResourceType() { + return resourceType; + } + + public void setResourceType(String resourceType) { + this.resourceType = resourceType; + } + + /** + * @return The name of the resource being shared + */ + public String getResourceName() { + return resourceName; + } + + public void setResourceName(String resourceName) { + this.resourceName = resourceName; + } + + /** + * @return The user/group the resource is shared with + */ + public String getSharedWith() { + return sharedWith; + } + + public void setSharedWith(String sharedWith) { + this.sharedWith = sharedWith; + } + + /** + * @return The owner of the resource + */ + public String getOwner() { + return owner; + } + + public void setOwner(String owner) { + this.owner = owner; + } + + /** + * @return Timestamp when the resource was shared + */ + public Long getSharedAt() { + return sharedAt; + } + + public void setSharedAt(Long sharedAt) { + this.sharedAt = sharedAt; + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + SharedResourceModel that = (SharedResourceModel) o; + return Objects.equals(resourceType, that.resourceType) && + Objects.equals(resourceName, that.resourceName) && + Objects.equals(sharedWith, that.sharedWith) && + Objects.equals(owner, that.owner) && + Objects.equals(sharedAt, that.sharedAt); + } + + @Override + public int hashCode() { + return Objects.hash(resourceType, resourceName, sharedWith, owner, sharedAt); + } + + @Override + public String toString() { + return "SharedResourceModel{" + + "resourceType='" + resourceType + '\'' + + ", resourceName='" + resourceName + '\'' + + ", sharedWith='" + sharedWith + '\'' + + ", owner='" + owner + '\'' + + ", sharedAt=" + sharedAt + + '}'; + } +} diff --git a/conductor-client/src/main/java/com/netflix/conductor/common/model/ProtoRegistryEntry.java b/conductor-client/src/main/java/com/netflix/conductor/common/model/ProtoRegistryEntry.java index 6a66980c..1dcc2ccc 100644 --- a/conductor-client/src/main/java/com/netflix/conductor/common/model/ProtoRegistryEntry.java +++ b/conductor-client/src/main/java/com/netflix/conductor/common/model/ProtoRegistryEntry.java @@ -16,7 +16,7 @@ @Data public class ProtoRegistryEntry { - private final String serviceName; - private final String filename; - private final byte[] data; + private String serviceName; + private String filename; + private byte[] data; } diff --git a/conductor-client/src/main/java/com/netflix/conductor/common/model/ServiceRegistry.java b/conductor-client/src/main/java/com/netflix/conductor/common/model/ServiceRegistry.java index b96c946a..f92574ef 100644 --- a/conductor-client/src/main/java/com/netflix/conductor/common/model/ServiceRegistry.java +++ b/conductor-client/src/main/java/com/netflix/conductor/common/model/ServiceRegistry.java @@ -25,7 +25,8 @@ public class ServiceRegistry { private String serviceURI; private final List methods = new ArrayList<>(); private final List requestParams = new ArrayList<>(); - private final Config config = new Config(); + private Config config = new Config(); + private boolean circuitBreakerEnabled = false; public enum Type { HTTP, gRPC diff --git a/orkes-client/src/main/java/io/orkes/conductor/client/model/SignalResponse.java b/conductor-client/src/main/java/com/netflix/conductor/common/model/SignalResponse.java similarity index 97% rename from orkes-client/src/main/java/io/orkes/conductor/client/model/SignalResponse.java rename to conductor-client/src/main/java/com/netflix/conductor/common/model/SignalResponse.java index 9d54a009..97aa1db9 100644 --- a/orkes-client/src/main/java/io/orkes/conductor/client/model/SignalResponse.java +++ b/conductor-client/src/main/java/com/netflix/conductor/common/model/SignalResponse.java @@ -10,16 +10,15 @@ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the * specific language governing permissions and limitations under the License. */ -package io.orkes.conductor.client.model; +package com.netflix.conductor.common.model; import java.util.List; import java.util.Map; +import com.netflix.conductor.common.enums.ReturnStrategy; import com.netflix.conductor.common.metadata.tasks.Task; import com.netflix.conductor.common.run.Workflow; -import io.orkes.conductor.client.enums.ReturnStrategy; - import lombok.*; @EqualsAndHashCode diff --git a/conductor-client/src/main/java/com/netflix/conductor/common/model/Tag.java b/conductor-client/src/main/java/com/netflix/conductor/common/model/Tag.java new file mode 100644 index 00000000..c39a09a4 --- /dev/null +++ b/conductor-client/src/main/java/com/netflix/conductor/common/model/Tag.java @@ -0,0 +1,60 @@ +/* + * Copyright 2025 Conductor Authors. + *

+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ +package com.netflix.conductor.common.model; + +import java.util.Objects; + +import lombok.AllArgsConstructor; +import lombok.Builder; +import lombok.Data; +import lombok.NoArgsConstructor; + +@Data +@NoArgsConstructor +@AllArgsConstructor +@Builder +public class Tag { + + private String key; + private String value; + + @Deprecated(since = "11/21/23") + private String type; + + public static Tag of(String key, String value) { + return Tag.builder().key(key).value(value).build(); + } + + public static Tag of(String keyValue) { + String[] kv = keyValue.split(":"); + return Tag.builder().key(kv[0]).value(kv[1]).build(); + } + + @Override + public String toString() { + return String.format("%s:%s", key, value); + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + Tag tag = (Tag) o; + return key.equals(tag.key) && value.equals(tag.value); + } + + @Override + public int hashCode() { + return Objects.hash(key, value); + } +} \ No newline at end of file diff --git a/conductor-client/src/main/java/com/netflix/conductor/common/model/WebhookConfig.java b/conductor-client/src/main/java/com/netflix/conductor/common/model/WebhookConfig.java new file mode 100644 index 00000000..4e4da9d2 --- /dev/null +++ b/conductor-client/src/main/java/com/netflix/conductor/common/model/WebhookConfig.java @@ -0,0 +1,46 @@ +/* + * Copyright 2020 Conductor Authors. + *

+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ +package com.netflix.conductor.common.model; + +import java.util.Map; + +import lombok.AllArgsConstructor; +import lombok.Data; +import lombok.RequiredArgsConstructor; + +@Data +@RequiredArgsConstructor +@AllArgsConstructor +public class WebhookConfig { + + private String id; + + private String name; + + private Map receiverWorkflowNamesToVersions; + + private Map workflowsToStart; + + private Map headers; + + private Verifier verifier; + + private String sourcePlatform; + + public enum Verifier { + SLACK_BASED, + SIGNATURE_BASED, + HEADER_BASED, + TWITTER + } +} diff --git a/java-sdk/src/main/java/com/netflix/conductor/sdk/workflow/executor/task/AnnotatedWorkerExecutor.java b/java-sdk/src/main/java/com/netflix/conductor/sdk/workflow/executor/task/AnnotatedWorkerExecutor.java index 2125e0f5..d4cddc3f 100644 --- a/java-sdk/src/main/java/com/netflix/conductor/sdk/workflow/executor/task/AnnotatedWorkerExecutor.java +++ b/java-sdk/src/main/java/com/netflix/conductor/sdk/workflow/executor/task/AnnotatedWorkerExecutor.java @@ -43,6 +43,8 @@ public class AnnotatedWorkerExecutor { private TaskRunnerConfigurer taskRunner; + protected List executors = new ArrayList<>(); + protected List workers = new ArrayList<>(); protected Map workerToThreadCount = new HashMap<>(); diff --git a/java-sdk/src/main/java/com/netflix/conductor/sdk/workflow/executor/task/WorkerConfiguration.java b/java-sdk/src/main/java/com/netflix/conductor/sdk/workflow/executor/task/WorkerConfiguration.java index 45a61577..255e175c 100644 --- a/java-sdk/src/main/java/com/netflix/conductor/sdk/workflow/executor/task/WorkerConfiguration.java +++ b/java-sdk/src/main/java/com/netflix/conductor/sdk/workflow/executor/task/WorkerConfiguration.java @@ -20,7 +20,8 @@ public WorkerConfiguration(int defaultPollingInterval) { this.defaultPollingInterval = defaultPollingInterval; } - public WorkerConfiguration() {} + public WorkerConfiguration() { + } public int getPollingInterval(String taskName) { return defaultPollingInterval; diff --git a/orkes-client/build.gradle b/orkes-client/build.gradle index fc299d50..f468a91f 100644 --- a/orkes-client/build.gradle +++ b/orkes-client/build.gradle @@ -19,6 +19,7 @@ apply plugin: 'publish-config' dependencies { implementation project(':conductor-client') + implementation project(':java-sdk') implementation "com.squareup.okhttp3:okhttp:${versions.okHttp}" // test dependencies @@ -29,8 +30,11 @@ dependencies { testImplementation "org.powermock:powermock-api-mockito2:2.0.9" testImplementation 'org.spockframework:spock-core:2.3-groovy-3.0' + implementation 'org.springframework:spring-context:6.2.8' testImplementation 'org.codehaus.groovy:groovy:3.0.25' testImplementation 'ch.qos.logback:logback-classic:1.5.6' + + implementation "org.apache.kafka:kafka-clients:${versions.kafka}" } java { diff --git a/orkes-client/src/main/java/io/orkes/conductor/client/ApiClient.java b/orkes-client/src/main/java/io/orkes/conductor/client/ApiClient.java index 74a1be76..fe509539 100644 --- a/orkes-client/src/main/java/io/orkes/conductor/client/ApiClient.java +++ b/orkes-client/src/main/java/io/orkes/conductor/client/ApiClient.java @@ -42,6 +42,12 @@ */ public final class ApiClient extends ConductorClient { + private boolean useGRPC; + private boolean useSSL; + + private String grpcHost = "localhost"; + private int grpcPort = 8090; + public ApiClient(String rootUri, String keyId, String secret) { this(ApiClient.builder() .basePath(rootUri) @@ -56,14 +62,43 @@ public ApiClient() { this(createBuilderWithEnvVars()); } + private ApiClient(ApiClientBuilder builder) { + super(builder); + } + private static ApiClientBuilder createBuilderWithEnvVars() { ApiClientBuilder builder = builder().useEnvVariables(true); builder.applyEnvVariables(); return builder; } - private ApiClient(ApiClientBuilder builder) { - super(builder); + public static ApiClientBuilder builder() { + return new ApiClientBuilder(); + } + + public boolean isUseGRPC() { + return useGRPC; + } + + /** + * Used for GRPC + * + * @param useSSL set f using SSL connection for gRPC + */ + public void setUseSSL(boolean useSSL) { + this.useSSL = useSSL; + } + + /** + * Set the gRPC host and port to use for worker communication. + * + * @param host gRPC server host + * @param port gRPC server port + */ + public void setUseGRPC(String host, int port) { + this.grpcHost = host; + this.grpcPort = port; + this.useGRPC = true; } public Call buildCall( @@ -114,7 +149,7 @@ public void executeAsync(Call call, final Type returnType, final ApiCallback public void onResponse(@NotNull Call call, @NotNull Response response) { T result; try { - result = (T) handleResponse(response, returnType); + result = handleResponse(response, returnType); } catch (ConductorClientException e) { callback.onFailure(e, response.code(), response.headers().toMultimap()); return; @@ -154,10 +189,6 @@ public ApiResponse execute(Call call, Type returnType) { } } - public static ApiClientBuilder builder() { - return new ApiClientBuilder(); - } - public static class ApiClientBuilder extends Builder { public ApiClientBuilder credentials(String key, String secret) { diff --git a/orkes-client/src/main/java/io/orkes/conductor/client/http/OrkesTaskClient.java b/orkes-client/src/main/java/io/orkes/conductor/client/http/OrkesTaskClient.java index fd83db20..3fe5b6fc 100644 --- a/orkes-client/src/main/java/io/orkes/conductor/client/http/OrkesTaskClient.java +++ b/orkes-client/src/main/java/io/orkes/conductor/client/http/OrkesTaskClient.java @@ -25,16 +25,15 @@ import com.netflix.conductor.client.http.ConductorClientResponse; import com.netflix.conductor.client.http.TaskClient; import com.netflix.conductor.common.config.ObjectMapperProvider; +import com.netflix.conductor.common.enums.ReturnStrategy; import com.netflix.conductor.common.metadata.tasks.Task; import com.netflix.conductor.common.metadata.tasks.TaskExecLog; import com.netflix.conductor.common.metadata.tasks.TaskResult; +import com.netflix.conductor.common.model.SignalResponse; import com.netflix.conductor.common.run.SearchResult; import com.netflix.conductor.common.run.TaskSummary; import com.netflix.conductor.common.run.Workflow; -import io.orkes.conductor.client.enums.ReturnStrategy; -import io.orkes.conductor.client.model.SignalResponse; - import com.fasterxml.jackson.core.type.TypeReference; import com.fasterxml.jackson.databind.ObjectMapper; @@ -141,7 +140,7 @@ private Workflow updateTaskSync(Map output, * Signals a task with default return strategy (TARGET_WORKFLOW) */ public SignalResponse signal(String workflowId, Task.Status status, Map output) { - return signal(workflowId, status, output, ReturnStrategy.TARGET_WORKFLOW); + return taskClient.signal(workflowId, status, output, ReturnStrategy.TARGET_WORKFLOW); } /** @@ -154,18 +153,7 @@ public SignalResponse signal(String workflowId, Task.Status status, Map output, ReturnStrategy returnStrategy) { - ConductorClientRequest request = ConductorClientRequest.builder() - .method(ConductorClientRequest.Method.POST) - .path("/tasks/{workflowId}/{status}/signal/sync") - .addPathParam("workflowId", workflowId) - .addPathParam("status", status.name()) - .addQueryParam("returnStrategy", returnStrategy.name()) - .body(output) - .build(); - - ConductorClientResponse resp = client.execute(request, new TypeReference() { - }); - return resp.getData(); + return taskClient.signal(workflowId, status, output, returnStrategy); } /** @@ -176,15 +164,7 @@ public SignalResponse signal(String workflowId, Task.Status status, Map output) { - ConductorClientRequest request = ConductorClientRequest.builder() - .method(ConductorClientRequest.Method.POST) - .path("/tasks/{workflowId}/{status}/signal") - .addPathParam("workflowId", workflowId) - .addPathParam("status", status.name()) - .body(output) - .build(); - - client.execute(request); + taskClient.signalAsync(workflowId, status, output); } public Task pollTask(String taskType, String workerId, String domain) { diff --git a/orkes-client/src/main/java/io/orkes/conductor/client/http/OrkesWorkflowClient.java b/orkes-client/src/main/java/io/orkes/conductor/client/http/OrkesWorkflowClient.java index b3ff235e..ceb14c16 100644 --- a/orkes-client/src/main/java/io/orkes/conductor/client/http/OrkesWorkflowClient.java +++ b/orkes-client/src/main/java/io/orkes/conductor/client/http/OrkesWorkflowClient.java @@ -17,30 +17,28 @@ import java.util.List; import java.util.Map; import java.util.UUID; -import java.util.concurrent.CompletableFuture; -import java.util.concurrent.ExecutionException; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.concurrent.ThreadFactory; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.TimeoutException; +import java.util.concurrent.*; import org.apache.commons.lang3.Validate; import org.apache.commons.lang3.concurrent.BasicThreadFactory; import com.netflix.conductor.client.http.ConductorClient; import com.netflix.conductor.client.http.WorkflowClient; +import com.netflix.conductor.common.enums.Consistency; +import com.netflix.conductor.common.enums.ReturnStrategy; import com.netflix.conductor.common.metadata.workflow.StartWorkflowRequest; import com.netflix.conductor.common.metadata.workflow.UpgradeWorkflowRequest; import com.netflix.conductor.common.model.BulkResponse; +import com.netflix.conductor.common.model.SignalResponse; import com.netflix.conductor.common.run.SearchResult; import com.netflix.conductor.common.run.Workflow; import com.netflix.conductor.common.run.WorkflowSummary; import com.netflix.conductor.common.run.WorkflowTestRequest; -import io.orkes.conductor.client.enums.Consistency; -import io.orkes.conductor.client.enums.ReturnStrategy; -import io.orkes.conductor.client.model.*; +import io.orkes.conductor.client.model.CorrelationIdsSearchRequest; +import io.orkes.conductor.client.model.WorkflowRun; +import io.orkes.conductor.client.model.WorkflowStateUpdate; +import io.orkes.conductor.client.model.WorkflowStatus; public class OrkesWorkflowClient implements AutoCloseable { @@ -78,9 +76,10 @@ public CompletableFuture executeWorkflow(StartWorkflowRequest reque /** * Synchronously executes a workflow - * @param request workflow execution request - * @param waitUntilTask waits until workflow has reached this task. - * Useful for executing it synchronously until this task and then continuing asynchronous execution + * + * @param request workflow execution request + * @param waitUntilTask waits until workflow has reached this task. + * Useful for executing it synchronously until this task and then continuing asynchronous execution * @param waitForSeconds maximum amount of time to wait before returning * @return WorkflowRun */ @@ -90,7 +89,8 @@ public CompletableFuture executeWorkflow(StartWorkflowRequest reque /** * Synchronously executes a workflow - * @param request workflow execution request + * + * @param request workflow execution request * @param waitUntilTasks waits until workflow has reached one of these tasks. * Useful for executing it synchronously until this task and then continuing asynchronous execution * Useful when workflow has multiple branches to wait for any of the branches to reach the task @@ -104,10 +104,11 @@ public CompletableFuture executeWorkflow(StartWorkflowRequest reque /** * Synchronously executes a workflow - * @param request workflow execution request + * + * @param request workflow execution request * @param waitUntilTask waits until workflow has reached one of these tasks. - * Useful for executing it synchronously until this task and then continuing asynchronous execution - * @param waitTimeout maximum amount of time to wait before returning + * Useful for executing it synchronously until this task and then continuing asynchronous execution + * @param waitTimeout maximum amount of time to wait before returning * @return WorkflowRun */ public WorkflowRun executeWorkflow(StartWorkflowRequest request, String waitUntilTask, Duration waitTimeout) throws ExecutionException, InterruptedException, TimeoutException { @@ -265,34 +266,98 @@ private CompletableFuture executeWorkflowHttp(StartWorkflowRequest } /** - * Executes a workflow with return strategy - new unified API - * - * @param request workflow execution request - * @return SignalResponse with target workflow details (default strategy) + * Executes a workflow with return strategy - basic version with server defaults + * Uses server defaults: waitForSeconds=10, consistency=DURABLE, returnStrategy=TARGET_WORKFLOW */ public CompletableFuture executeWorkflowWithReturnStrategy(StartWorkflowRequest request) { - return executeWorkflowWithReturnStrategy(request, null, 10, Consistency.SYNCHRONOUS, ReturnStrategy.TARGET_WORKFLOW); + return workflowClient.executeWorkflowWithReturnStrategy(request); } /** - * Executes a workflow with specified return strategy - * - * @param request workflow execution request - * @param returnStrategy strategy for what data to return - * @return SignalResponse based on the return strategy + * Executes a workflow with specified return strategy only */ public CompletableFuture executeWorkflowWithReturnStrategy(StartWorkflowRequest request, ReturnStrategy returnStrategy) { - return executeWorkflowWithReturnStrategy(request, null, 10, Consistency.SYNCHRONOUS, returnStrategy); + return workflowClient.executeWorkflowWithReturnStrategy(request, returnStrategy); + } + + /** + * Executes a workflow with single task reference to wait for + */ + public CompletableFuture executeWorkflowWithReturnStrategy( + StartWorkflowRequest request, + String waitUntilTaskRef) { + return workflowClient.executeWorkflowWithReturnStrategy(request, waitUntilTaskRef); + } + + /** + * Executes a workflow with single task reference and wait time + */ + public CompletableFuture executeWorkflowWithReturnStrategy( + StartWorkflowRequest request, + String waitUntilTaskRef, + Integer waitForSeconds) { + return workflowClient.executeWorkflowWithReturnStrategy(request, waitUntilTaskRef, waitForSeconds); + } + + /** + * Executes a workflow with single task reference, wait time, and return strategy + */ + public CompletableFuture executeWorkflowWithReturnStrategy( + StartWorkflowRequest request, + String waitUntilTaskRef, + Integer waitForSeconds, + ReturnStrategy returnStrategy) { + return workflowClient.executeWorkflowWithReturnStrategy(request, waitUntilTaskRef, waitForSeconds, returnStrategy); + } + + /** + * Executes a workflow with multiple task references to wait for + */ + public CompletableFuture executeWorkflowWithReturnStrategy( + StartWorkflowRequest request, + List waitUntilTaskRef) { + return workflowClient.executeWorkflowWithReturnStrategy(request, waitUntilTaskRef); + } + + /** + * Executes a workflow with multiple task references and wait time + */ + public CompletableFuture executeWorkflowWithReturnStrategy( + StartWorkflowRequest request, + List waitUntilTaskRef, + Integer waitForSeconds) { + return workflowClient.executeWorkflowWithReturnStrategy(request, waitUntilTaskRef, waitForSeconds); + } + + /** + * Executes a workflow with multiple task references, wait time, and return strategy + */ + public CompletableFuture executeWorkflowWithReturnStrategy( + StartWorkflowRequest request, + List waitUntilTaskRef, + Integer waitForSeconds, + ReturnStrategy returnStrategy) { + return workflowClient.executeWorkflowWithReturnStrategy(request, waitUntilTaskRef, waitForSeconds, returnStrategy); + } + + /** + * Executes a workflow with consistency and return strategy + */ + public CompletableFuture executeWorkflowWithReturnStrategy( + StartWorkflowRequest request, + Consistency consistency, + ReturnStrategy returnStrategy) { + return workflowClient.executeWorkflowWithReturnStrategy(request, consistency, returnStrategy); } /** * Executes a workflow with full control over execution parameters * - * @param request workflow execution request + * @param request workflow execution request * @param waitUntilTaskRef reference name of the task to wait for - * @param waitForSeconds maximum time to wait in seconds - * @param consistency execution consistency mode - * @param returnStrategy strategy for what data to return + * @param waitForSeconds maximum time to wait in seconds + * @param consistency execution consistency mode + * @param returnStrategy strategy for what data to return * @return SignalResponse based on the return strategy */ public CompletableFuture executeWorkflowWithReturnStrategy( @@ -301,27 +366,6 @@ public CompletableFuture executeWorkflowWithReturnStrategy( Integer waitForSeconds, Consistency consistency, ReturnStrategy returnStrategy) { - - CompletableFuture future = new CompletableFuture<>(); - String requestId = UUID.randomUUID().toString(); - - executorService.submit(() -> { - try { - SignalResponse response = workflowResource.executeWorkflowWithReturnStrategy( - request, - request.getName(), - request.getVersion(), - waitUntilTaskRef, - requestId, - waitForSeconds, - consistency, - returnStrategy); - future.complete(response); - } catch (Throwable t) { - future.completeExceptionally(t); - } - }); - - return future; + return workflowClient.executeWorkflowWithReturnStrategy(request, waitUntilTaskRef, waitForSeconds, consistency, returnStrategy); } -} \ No newline at end of file +} diff --git a/orkes-client/src/main/java/io/orkes/conductor/client/http/WorkflowResource.java b/orkes-client/src/main/java/io/orkes/conductor/client/http/WorkflowResource.java index 06073e47..8214c01f 100644 --- a/orkes-client/src/main/java/io/orkes/conductor/client/http/WorkflowResource.java +++ b/orkes-client/src/main/java/io/orkes/conductor/client/http/WorkflowResource.java @@ -19,12 +19,13 @@ import com.netflix.conductor.client.http.ConductorClientRequest; import com.netflix.conductor.client.http.ConductorClientRequest.Method; import com.netflix.conductor.client.http.ConductorClientResponse; +import com.netflix.conductor.common.enums.Consistency; +import com.netflix.conductor.common.enums.ReturnStrategy; import com.netflix.conductor.common.metadata.workflow.StartWorkflowRequest; import com.netflix.conductor.common.metadata.workflow.UpgradeWorkflowRequest; +import com.netflix.conductor.common.model.SignalResponse; import com.netflix.conductor.common.run.Workflow; -import io.orkes.conductor.client.enums.Consistency; -import io.orkes.conductor.client.enums.ReturnStrategy; import io.orkes.conductor.client.model.*; import com.fasterxml.jackson.core.type.TypeReference; @@ -234,18 +235,31 @@ SignalResponse executeWorkflowWithReturnStrategy(StartWorkflowRequest req, waitUntilTaskRefStr = String.join(",", waitUntilTaskRef); } - ConductorClientRequest request = ConductorClientRequest.builder() + ConductorClientRequest.Builder requestBuilder = ConductorClientRequest.builder() .method(Method.POST) .path("/workflow/execute/{name}/{version}") .addPathParam("name", name) .addPathParam("version", version) - .addQueryParam("requestId", requestId) - .addQueryParam("waitUntilTaskRef", waitUntilTaskRefStr) - .addQueryParam("waitForSeconds", waitForSeconds) - .addQueryParam("consistency", consistency.name()) - .addQueryParam("returnStrategy", returnStrategy.name()) - .body(req) - .build(); + .body(req); + + // Only add query parameters if they are not null - let server use defaults + if (requestId != null) { + requestBuilder.addQueryParam("requestId", requestId); + } + if (waitUntilTaskRefStr != null) { + requestBuilder.addQueryParam("waitUntilTaskRef", waitUntilTaskRefStr); + } + if (waitForSeconds != null) { + requestBuilder.addQueryParam("waitForSeconds", waitForSeconds); + } + if (consistency != null) { + requestBuilder.addQueryParam("consistency", consistency.name()); + } + if (returnStrategy != null) { + requestBuilder.addQueryParam("returnStrategy", returnStrategy.name()); + } + + ConductorClientRequest request = requestBuilder.build(); ConductorClientResponse resp = client.execute(request, new TypeReference() { }); diff --git a/orkes-client/src/main/java/io/orkes/conductor/client/model/event/kafka/KafkaConfiguration.java b/orkes-client/src/main/java/io/orkes/conductor/client/model/event/kafka/KafkaConfiguration.java new file mode 100644 index 00000000..6a32b051 --- /dev/null +++ b/orkes-client/src/main/java/io/orkes/conductor/client/model/event/kafka/KafkaConfiguration.java @@ -0,0 +1,23 @@ +/* + * Copyright 2022 Conductor Authors. + *

+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ +package io.orkes.conductor.client.model.event.kafka; + +import io.orkes.conductor.client.model.event.QueueConfiguration; + +public class KafkaConfiguration extends QueueConfiguration { + private static String QUEUE_NAME = "kafka"; + + public KafkaConfiguration(String queueTopicName) { + super(QUEUE_NAME, queueTopicName); + } +} diff --git a/orkes-client/src/main/java/io/orkes/conductor/client/model/event/kafka/KafkaConsumer.java b/orkes-client/src/main/java/io/orkes/conductor/client/model/event/kafka/KafkaConsumer.java new file mode 100644 index 00000000..e89ff1ac --- /dev/null +++ b/orkes-client/src/main/java/io/orkes/conductor/client/model/event/kafka/KafkaConsumer.java @@ -0,0 +1,27 @@ +/* + * Copyright 2022 Conductor Authors. + *

+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ +package io.orkes.conductor.client.model.event.kafka; + +import org.apache.kafka.clients.consumer.ConsumerConfig; + +import io.orkes.conductor.client.model.event.QueueWorkerConfiguration; + +public class KafkaConsumer extends QueueWorkerConfiguration { + private static final String MAX_POLL_RECORDS_CONFIG = "1000"; + + public KafkaConsumer(String bootstrapServersConfig) throws Exception { + super(ConsumerConfig.configNames()); + withConfiguration(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServersConfig); + withConfiguration(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, MAX_POLL_RECORDS_CONFIG); + } +} diff --git a/orkes-client/src/main/java/io/orkes/conductor/client/model/event/kafka/KafkaProducer.java b/orkes-client/src/main/java/io/orkes/conductor/client/model/event/kafka/KafkaProducer.java new file mode 100644 index 00000000..0f7035a2 --- /dev/null +++ b/orkes-client/src/main/java/io/orkes/conductor/client/model/event/kafka/KafkaProducer.java @@ -0,0 +1,24 @@ +/* + * Copyright 2022 Conductor Authors. + *

+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ +package io.orkes.conductor.client.model.event.kafka; + +import org.apache.kafka.clients.producer.ProducerConfig; + +import io.orkes.conductor.client.model.event.QueueWorkerConfiguration; + +public class KafkaProducer extends QueueWorkerConfiguration { + public KafkaProducer(String bootstrapServersConfig) throws Exception { + super(ProducerConfig.configNames()); + withConfiguration(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServersConfig); + } +} diff --git a/orkes-client/src/main/java/io/orkes/conductor/client/spring/ApiClientAutoConfiguration.java b/orkes-client/src/main/java/io/orkes/conductor/client/spring/ApiClientAutoConfiguration.java new file mode 100644 index 00000000..a251ef7c --- /dev/null +++ b/orkes-client/src/main/java/io/orkes/conductor/client/spring/ApiClientAutoConfiguration.java @@ -0,0 +1,71 @@ +/* + * Copyright 2020 Conductor Authors. + *

+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ +package io.orkes.conductor.client.spring; + + +import org.apache.commons.lang3.StringUtils; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.core.env.Environment; + +import io.orkes.conductor.client.ApiClient; + +import lombok.extern.slf4j.Slf4j; + +@Configuration(proxyBeanMethods = false) +@Slf4j +public class ApiClientAutoConfiguration { + + public static final String CONDUCTOR_SERVER_URL = "conductor.server.url"; + public static final String CONDUCTOR_CLIENT_KEY_ID = "conductor.security.client.key-id"; + public static final String CONDUCTOR_CLIENT_SECRET = "conductor.security.client.secret"; + public static final String CONDUCTOR_GRPC_SERVER = "conductor.grpc.host"; + + public static final String CONDUCTOR_GRPC_PORT = "conductor.grpc.port"; + + public static final String CONDUCTOR_GRPC_SSL = "conductor.grpc.ssl"; + + @Bean + public ApiClient getApiClient(Environment env) { + String rootUri = env.getProperty(CONDUCTOR_SERVER_URL); + String keyId = env.getProperty(CONDUCTOR_CLIENT_KEY_ID); + String secret = env.getProperty(CONDUCTOR_CLIENT_SECRET); + if (rootUri != null && rootUri.endsWith("/")) { + rootUri = rootUri.substring(0, rootUri.length() - 1); + } + if (StringUtils.isNotBlank(keyId) && StringUtils.isNotBlank(secret)) { + ApiClient apiClient = new ApiClient(rootUri, keyId, secret); + apiClient = configureGrpc(apiClient, env); + return apiClient; + } + + ApiClient apiClient = new ApiClient(rootUri); + apiClient = configureGrpc(apiClient, env); + return apiClient; + } + + private ApiClient configureGrpc(ApiClient apiClient, Environment env) { + + String grpcHost = env.getProperty(CONDUCTOR_GRPC_SERVER); + String grpcPort = env.getProperty(CONDUCTOR_GRPC_PORT); + boolean useSSL = Boolean.parseBoolean(env.getProperty(CONDUCTOR_GRPC_SSL)); + if (StringUtils.isNotBlank(grpcHost)) { + log.info("Using gRPC for worker communication. Server {}, port {}, using SSL? {}", grpcHost, grpcPort, useSSL); + int port = Integer.parseInt(grpcPort); + apiClient.setUseGRPC(grpcHost, port); + apiClient.setUseSSL(useSSL); + } + return apiClient; + } + +} diff --git a/orkes-client/src/main/java/io/orkes/conductor/client/spring/OrkesAnnotatedWorkerExecutor.java b/orkes-client/src/main/java/io/orkes/conductor/client/spring/OrkesAnnotatedWorkerExecutor.java new file mode 100644 index 00000000..4d88dc2b --- /dev/null +++ b/orkes-client/src/main/java/io/orkes/conductor/client/spring/OrkesAnnotatedWorkerExecutor.java @@ -0,0 +1,61 @@ +/* + * Copyright 2023 Conductor Authors. + *

+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ +package io.orkes.conductor.client.spring; + +import com.netflix.conductor.client.automator.TaskRunnerConfigurer; +import com.netflix.conductor.client.http.TaskClient; +import com.netflix.conductor.sdk.workflow.executor.task.AnnotatedWorkerExecutor; +import com.netflix.conductor.sdk.workflow.executor.task.WorkerConfiguration; + +import lombok.extern.slf4j.Slf4j; + +@Slf4j +public class OrkesAnnotatedWorkerExecutor extends AnnotatedWorkerExecutor { + + private final TaskClient taskClient; + + private TaskRunnerConfigurer taskRunner; + + public OrkesAnnotatedWorkerExecutor(TaskClient taskClient, WorkerConfiguration workerConfiguration) { + super(taskClient, workerConfiguration); + this.taskClient = taskClient; + } + + + @Override + public void shutdown() { + if (this.taskRunner != null) { + this.taskRunner.shutdown(); + } + } + + @Override + public void startPolling() { + + if (executors.isEmpty()) { + return; + } + + log.info("Starting workers with threadCount {}", workerToThreadCount); + log.info("Worker domains {}", workerDomains); + + this.taskRunner = new TaskRunnerConfigurer.Builder(this.taskClient, executors) + .withTaskThreadCount(workerToThreadCount) + .withTaskToDomain(workerDomains) + .withTaskPollTimeout(5) + .build(); + + taskRunner.init(); + + } +} diff --git a/orkes-client/src/main/java/io/orkes/conductor/client/spring/OrkesConductorClientAutoConfiguration.java b/orkes-client/src/main/java/io/orkes/conductor/client/spring/OrkesConductorClientAutoConfiguration.java new file mode 100644 index 00000000..03b6c35c --- /dev/null +++ b/orkes-client/src/main/java/io/orkes/conductor/client/spring/OrkesConductorClientAutoConfiguration.java @@ -0,0 +1,94 @@ +/* + * Copyright 2020 Conductor Authors. + *

+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ +package io.orkes.conductor.client.spring; + + +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; + +import com.netflix.conductor.client.http.EventClient; +import com.netflix.conductor.client.http.MetadataClient; +import com.netflix.conductor.client.http.TaskClient; +import com.netflix.conductor.client.http.WorkflowClient; +import com.netflix.conductor.sdk.workflow.executor.WorkflowExecutor; +import com.netflix.conductor.sdk.workflow.executor.task.AnnotatedWorkerExecutor; +import com.netflix.conductor.sdk.workflow.executor.task.WorkerConfiguration; + +import io.orkes.conductor.client.*; + +import lombok.extern.slf4j.Slf4j; + +@Configuration(proxyBeanMethods = false) +@Slf4j +public class OrkesConductorClientAutoConfiguration { + + @Bean + public TaskClient taskClient(ApiClient clients) { + return new TaskClient(clients); + } + + @Bean + public MetadataClient metadataClient(ApiClient clients) { + return new MetadataClient(clients); + } + + @Bean + public WorkflowClient workflowClient(ApiClient clients) { + return new WorkflowClient(clients); + } + + @Bean + public AuthorizationClient authorizationClient(OrkesClients clients) { + return clients.getAuthorizationClient(); + } + + @Bean + public EventClient eventClient(ApiClient clients) { + return new EventClient(clients); + } + + @Bean + public SchedulerClient schedulerClient(OrkesClients clients) { + return clients.getSchedulerClient(); + } + + @Bean + public SecretClient secretClient(OrkesClients clients) { + return clients.getSecretClient(); + } + + @Bean + public OrkesClients orkesClients(ApiClient apiClient) { + OrkesClients clients = new OrkesClients(apiClient); + return clients; + } + + @Bean + public WorkflowExecutor workflowExecutor(ApiClient apiClient, AnnotatedWorkerExecutor annotatedWorkerExecutor) { + ApiClient client = ApiClient.builder() + .useEnvVariables(true) + .build(); + return new WorkflowExecutor( + new TaskClient(client), + new WorkflowClient(client), + new MetadataClient(client), + annotatedWorkerExecutor + ); + } + + @Bean + public AnnotatedWorkerExecutor annotatedWorkerExecutor( + TaskClient taskClient, WorkerConfiguration workerConfiguration) { + return new OrkesAnnotatedWorkerExecutor(taskClient, workerConfiguration); + } +} diff --git a/orkes-client/src/main/java/io/orkes/conductor/client/spring/OrkesConductorWorkerAutoConfiguration.java b/orkes-client/src/main/java/io/orkes/conductor/client/spring/OrkesConductorWorkerAutoConfiguration.java new file mode 100644 index 00000000..557f935e --- /dev/null +++ b/orkes-client/src/main/java/io/orkes/conductor/client/spring/OrkesConductorWorkerAutoConfiguration.java @@ -0,0 +1,49 @@ +/* + * Copyright 2023 Conductor Authors. + *

+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ +package io.orkes.conductor.client.spring; + +import java.util.Map; + +import org.springframework.context.ApplicationContext; +import org.springframework.context.ApplicationListener; +import org.springframework.context.event.ContextRefreshedEvent; +import org.springframework.stereotype.Component; + +import com.netflix.conductor.client.http.TaskClient; +import com.netflix.conductor.sdk.workflow.executor.task.AnnotatedWorkerExecutor; + +@Component +public class OrkesConductorWorkerAutoConfiguration implements ApplicationListener { + + private final TaskClient taskClient; + + private final AnnotatedWorkerExecutor annotatedWorkerExecutor; + + public OrkesConductorWorkerAutoConfiguration( + TaskClient taskClient, AnnotatedWorkerExecutor annotatedWorkerExecutor) { + this.taskClient = taskClient; + this.annotatedWorkerExecutor = annotatedWorkerExecutor; + } + + @Override + public void onApplicationEvent(ContextRefreshedEvent refreshedEvent) { + ApplicationContext applicationContext = refreshedEvent.getApplicationContext(); + Map beans = applicationContext.getBeansWithAnnotation(Component.class); + beans.values() + .forEach( + bean -> { + annotatedWorkerExecutor.addBean(bean); + }); + annotatedWorkerExecutor.startPolling(); + } +} diff --git a/orkes-client/src/main/java/io/orkes/conductor/client/spring/OrkesSpringWorkerConfiguration.java b/orkes-client/src/main/java/io/orkes/conductor/client/spring/OrkesSpringWorkerConfiguration.java new file mode 100644 index 00000000..91685575 --- /dev/null +++ b/orkes-client/src/main/java/io/orkes/conductor/client/spring/OrkesSpringWorkerConfiguration.java @@ -0,0 +1,53 @@ +/* + * Copyright 2023 Conductor Authors. + *

+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ +package io.orkes.conductor.client.spring; + +import org.springframework.core.env.Environment; +import org.springframework.stereotype.Component; + +import com.netflix.conductor.sdk.workflow.executor.task.WorkerConfiguration; + +@Component +public class OrkesSpringWorkerConfiguration extends WorkerConfiguration { + + private final Environment environment; + + public OrkesSpringWorkerConfiguration(Environment environment) { + this.environment = environment; + } + + @Override + public int getPollingInterval(String taskName) { + return getProperty(taskName, "pollingInterval", Integer.class, 0); + } + + @Override + public int getThreadCount(String taskName) { + return getProperty(taskName, "threadCount", Integer.class, 0); + } + + @Override + public String getDomain(String taskName) { + return getProperty(taskName, "domain", String.class, null); + } + + private T getProperty(String taskName, String property, Class type, T defaultValue) { + String key = "conductor.worker." + taskName + "." + property; + T value = environment.getProperty(key, type, defaultValue); + if(value == null || value == defaultValue) { + key = "conductor.worker.all." + property; + value = environment.getProperty(key, type, defaultValue); + } + return value; + } +} diff --git a/tests/src/test/java/io/orkes/conductor/client/ServiceRegistryClientTest.java b/tests/src/test/java/io/orkes/conductor/client/ServiceRegistryClientTest.java index 4029b0a3..2daaaa8b 100644 --- a/tests/src/test/java/io/orkes/conductor/client/ServiceRegistryClientTest.java +++ b/tests/src/test/java/io/orkes/conductor/client/ServiceRegistryClientTest.java @@ -27,8 +27,7 @@ import io.orkes.conductor.client.util.ClientTestUtil; -import static org.junit.jupiter.api.Assertions.assertEquals; -import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.junit.jupiter.api.Assertions.*; public class ServiceRegistryClientTest { @@ -50,10 +49,11 @@ void setUp() { @Test public void testHttpServiceRegistry() throws InterruptedException { + String serviceUrl = "http://httpBin:8081/api-docs"; ServiceRegistry serviceRegistry = new ServiceRegistry(); serviceRegistry.setName(HTTP_SERVICE_NAME); serviceRegistry.setType(ServiceRegistry.Type.HTTP); - serviceRegistry.setServiceURI("https://petstore.swagger.io/v2/swagger.json"); + serviceRegistry.setServiceURI(serviceUrl); client.addOrUpdateService(serviceRegistry); client.discover(HTTP_SERVICE_NAME, true); @@ -64,10 +64,18 @@ public void testHttpServiceRegistry() throws InterruptedException { .findFirst() .orElseThrow(() -> new NoSuchElementException("No http service found with name: " + HTTP_SERVICE_NAME)); - assertEquals(actualService.getName(), HTTP_SERVICE_NAME); - assertEquals(actualService.getType(), ServiceRegistry.Type.HTTP); - assertEquals(actualService.getServiceURI(), "https://petstore.swagger.io/v2/swagger.json"); + assertEquals(HTTP_SERVICE_NAME, actualService.getName()); + assertEquals(ServiceRegistry.Type.HTTP, actualService.getType()); + assertEquals(serviceUrl, actualService.getServiceURI()); assertTrue(actualService.getMethods().size() > 0); + assertFalse(actualService.isCircuitBreakerEnabled()); + + // Enabled CB for Service registry + serviceRegistry.setCircuitBreakerEnabled(true); + client.addOrUpdateService(serviceRegistry); + + actualService = client.getService(HTTP_SERVICE_NAME); + assertTrue(actualService.isCircuitBreakerEnabled()); int size = actualService.getMethods().size(); @@ -126,7 +134,7 @@ void testGrpcService() throws IOException { client.addOrUpdateServiceMethod(GRPC_SERVICE_NAME, method); actualService = client.getService(GRPC_SERVICE_NAME); assertEquals(size + 1, actualService.getMethods().size()); - + byte[] binaryData; try (InputStream inputStream = getClass().getResourceAsStream("/compiled.bin")) { binaryData = inputStream.readAllBytes(); @@ -147,6 +155,17 @@ void testGrpcService() throws IOException { assertEquals(actualConfig.getSlowCallRateThreshold(), 50); assertEquals(actualConfig.getMaxWaitDurationInHalfOpenState(), 1); + client.getAllProtos(GRPC_SERVICE_NAME) + .forEach(proto -> assertEquals(PROTO_FILENAME, proto.getFilename())); + + byte[] protoData = client.getProtoData(GRPC_SERVICE_NAME, PROTO_FILENAME); + assertEquals(binaryData.length, protoData.length); + assertNotNull(protoData); + + client.deleteProto(GRPC_SERVICE_NAME, PROTO_FILENAME); + // check if proto deleted successfully + client.getAllProtos(GRPC_SERVICE_NAME) + .forEach(proto -> assertNotEquals(PROTO_FILENAME, proto.getFilename())); client.removeService(GRPC_SERVICE_NAME); } } diff --git a/tests/src/test/java/io/orkes/conductor/client/http/TaskClientTests.java b/tests/src/test/java/io/orkes/conductor/client/http/TaskClientTests.java index 1c7f3c78..40e5810d 100644 --- a/tests/src/test/java/io/orkes/conductor/client/http/TaskClientTests.java +++ b/tests/src/test/java/io/orkes/conductor/client/http/TaskClientTests.java @@ -30,20 +30,20 @@ import com.netflix.conductor.client.exception.ConductorClientException; import com.netflix.conductor.common.config.ObjectMapperProvider; +import com.netflix.conductor.common.enums.Consistency; +import com.netflix.conductor.common.enums.ReturnStrategy; import com.netflix.conductor.common.metadata.tasks.Task; import com.netflix.conductor.common.metadata.tasks.TaskDef; import com.netflix.conductor.common.metadata.tasks.TaskExecLog; import com.netflix.conductor.common.metadata.tasks.TaskResult; import com.netflix.conductor.common.metadata.workflow.StartWorkflowRequest; import com.netflix.conductor.common.metadata.workflow.WorkflowDef; +import com.netflix.conductor.common.model.SignalResponse; import com.netflix.conductor.common.run.Workflow; import com.netflix.conductor.sdk.workflow.def.ConductorWorkflow; import com.netflix.conductor.sdk.workflow.def.tasks.SimpleTask; import com.netflix.conductor.sdk.workflow.executor.WorkflowExecutor; -import io.orkes.conductor.client.enums.Consistency; -import io.orkes.conductor.client.enums.ReturnStrategy; -import io.orkes.conductor.client.model.SignalResponse; import io.orkes.conductor.client.util.ClientTestUtil; import io.orkes.conductor.client.util.TestUtil; @@ -125,7 +125,7 @@ public void testUpdateByRefName() { try { taskClient.updateTaskSync(workflowId, referenceName, TaskResult.Status.COMPLETED, Map.of("k", "value")); } catch (ConductorClientException cce) { - if(cce.getStatusCode() != 404) { + if (cce.getStatusCode() != 404) { throw cce; } } diff --git a/versions.gradle b/versions.gradle index 519ae11d..26994edb 100644 --- a/versions.gradle +++ b/versions.gradle @@ -1,6 +1,7 @@ ext { versions = [ okHttp : '4.12.0', + kafka : '3.6.1', guava : '32.1.2-jre', jackson : '2.17.1', archaius : '0.7.12',