Skip to content

Commit b8ac8f8

Browse files
author
Shailesh Jagannath Padave
committed
moved logic to conductor-client
1 parent 2318b4e commit b8ac8f8

File tree

10 files changed

+390
-131
lines changed

10 files changed

+390
-131
lines changed

conductor-client/src/main/java/com/netflix/conductor/client/http/TaskClient.java

Lines changed: 65 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,10 +34,12 @@
3434
import com.netflix.conductor.client.exception.ConductorClientException;
3535
import com.netflix.conductor.client.http.ConductorClientRequest.Method;
3636
import com.netflix.conductor.common.config.ObjectMapperProvider;
37+
import com.netflix.conductor.common.enums.ReturnStrategy;
3738
import com.netflix.conductor.common.metadata.tasks.PollData;
3839
import com.netflix.conductor.common.metadata.tasks.Task;
3940
import com.netflix.conductor.common.metadata.tasks.TaskExecLog;
4041
import com.netflix.conductor.common.metadata.tasks.TaskResult;
42+
import com.netflix.conductor.common.model.SignalResponse;
4143
import com.netflix.conductor.common.run.ExternalStorageLocation;
4244
import com.netflix.conductor.common.run.SearchResult;
4345
import com.netflix.conductor.common.run.TaskSummary;
@@ -232,6 +234,7 @@ public Optional<String> evaluateAndUploadLargePayload(Map<String, Object> taskOu
232234
throw new ConductorClientException(e);
233235
}
234236
}
237+
235238
/**
236239
* Ack for the task poll.
237240
*
@@ -312,6 +315,67 @@ public Task getTaskDetails(String taskId) {
312315
return resp.getData();
313316
}
314317

318+
/**
319+
* Signals a task with default return strategy (TARGET_WORKFLOW)
320+
*
321+
* @param workflowId Workflow Id of the workflow to be signaled
322+
* @param status Signal status to be set for the workflow
323+
* @param output Output for the task
324+
* @return SignalResponse with target workflow details
325+
*/
326+
public SignalResponse signal(String workflowId, Task.Status status, Map<String, Object> output) {
327+
return signal(workflowId, status, output, ReturnStrategy.TARGET_WORKFLOW);
328+
}
329+
330+
/**
331+
* Signals a task in a workflow synchronously and returns data based on the specified return strategy.
332+
*
333+
* @param workflowId Workflow Id of the workflow to be signaled
334+
* @param status Signal status to be set for the workflow
335+
* @param output Output for the task
336+
* @param returnStrategy Strategy for what data to return
337+
* @return SignalResponse with data based on the return strategy
338+
*/
339+
public SignalResponse signal(String workflowId, Task.Status status, Map<String, Object> output, ReturnStrategy returnStrategy) {
340+
Validate.notBlank(workflowId, "Workflow id cannot be blank");
341+
Validate.notNull(status, "Status cannot be null");
342+
343+
ConductorClientRequest request = ConductorClientRequest.builder()
344+
.method(Method.POST)
345+
.path("/tasks/{workflowId}/{status}/signal/sync")
346+
.addPathParam("workflowId", workflowId)
347+
.addPathParam("status", status.name())
348+
.addQueryParam("returnStrategy", returnStrategy.name())
349+
.body(output)
350+
.build();
351+
352+
ConductorClientResponse<SignalResponse> resp = client.execute(request, new TypeReference<>() {
353+
});
354+
return resp.getData();
355+
}
356+
357+
/**
358+
* Signals a task in a workflow asynchronously.
359+
*
360+
* @param workflowId Workflow Id of the workflow to be signaled
361+
* @param status Signal status to be set for the workflow
362+
* @param output Output for the task
363+
*/
364+
public void signalAsync(String workflowId, Task.Status status, Map<String, Object> output) {
365+
Validate.notBlank(workflowId, "Workflow id cannot be blank");
366+
Validate.notNull(status, "Status cannot be null");
367+
368+
ConductorClientRequest request = ConductorClientRequest.builder()
369+
.method(Method.POST)
370+
.path("/tasks/{workflowId}/{status}/signal")
371+
.addPathParam("workflowId", workflowId)
372+
.addPathParam("status", status.name())
373+
.body(output)
374+
.build();
375+
376+
client.execute(request);
377+
}
378+
315379
/**
316380
* Removes a task from a taskType queue
317381
*
@@ -422,6 +486,7 @@ public String requeuePendingTasksByTaskType(String taskType) {
422486

423487
return resp.getData();
424488
}
489+
425490
/**
426491
* Search for tasks based on payload
427492
*

0 commit comments

Comments
 (0)