Skip to content

Commit 3235330

Browse files
committed
[Fix #1087] A2A implementation
Signed-off-by: fjtirado <ftirados@ibm.com>
1 parent 3b5f7d3 commit 3235330

20 files changed

Lines changed: 938 additions & 6 deletions

impl/a2a/pom.xml

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,30 @@
1+
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
2+
<modelVersion>4.0.0</modelVersion>
3+
<parent>
4+
<groupId>io.serverlessworkflow</groupId>
5+
<artifactId>serverlessworkflow-impl</artifactId>
6+
<version>8.0.0-SNAPSHOT</version>
7+
</parent>
8+
<artifactId>serverlessworkflow-impl-a2a</artifactId>
9+
<name>Serverless Workflow :: Impl :: A2A</name>
10+
<dependencies>
11+
<dependency>
12+
<groupId>io.serverlessworkflow</groupId>
13+
<artifactId>serverlessworkflow-impl-core</artifactId>
14+
</dependency>
15+
<dependency>
16+
<groupId>org.a2aproject.sdk</groupId>
17+
<artifactId>a2a-java-sdk-client</artifactId>
18+
</dependency>
19+
<dependency>
20+
<groupId>org.junit.jupiter</groupId>
21+
<artifactId>junit-jupiter-engine</artifactId>
22+
<scope>test</scope>
23+
</dependency>
24+
<dependency>
25+
<groupId>org.junit.jupiter</groupId>
26+
<artifactId>junit-jupiter-params</artifactId>
27+
<scope>test</scope>
28+
</dependency>
29+
</dependencies>
30+
</project>
Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,37 @@
1+
/*
2+
* Copyright 2020-Present The Serverless Workflow Specification Authors
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package io.serverlessworkflow.impl.executors.a2a;
17+
18+
import io.serverlessworkflow.impl.WorkflowModel;
19+
import io.serverlessworkflow.impl.WorkflowPosition;
20+
import java.util.concurrent.CompletableFuture;
21+
import java.util.function.Consumer;
22+
23+
class A2AExceptionHandler implements Consumer<Throwable> {
24+
25+
private final CompletableFuture<WorkflowModel> future;
26+
private final WorkflowPosition position;
27+
28+
A2AExceptionHandler(CompletableFuture<WorkflowModel> future, WorkflowPosition position) {
29+
this.future = future;
30+
this.position = position;
31+
}
32+
33+
@Override
34+
public void accept(Throwable ex) {
35+
future.completeExceptionally(A2AUtils.workflowException(position, ex));
36+
}
37+
}
Lines changed: 70 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,70 @@
1+
/*
2+
* Copyright 2020-Present The Serverless Workflow Specification Authors
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package io.serverlessworkflow.impl.executors.a2a;
17+
18+
import io.serverlessworkflow.impl.TaskContext;
19+
import io.serverlessworkflow.impl.WorkflowContext;
20+
import io.serverlessworkflow.impl.WorkflowModel;
21+
import io.serverlessworkflow.impl.WorkflowValueResolver;
22+
import io.serverlessworkflow.impl.executors.CallableTask;
23+
import java.net.URI;
24+
import java.util.Map;
25+
import java.util.concurrent.CompletableFuture;
26+
import org.a2aproject.sdk.client.Client;
27+
import org.a2aproject.sdk.client.config.ClientConfig;
28+
import org.a2aproject.sdk.client.http.A2ACardResolver;
29+
import org.a2aproject.sdk.client.transport.jsonrpc.JSONRPCTransport;
30+
import org.a2aproject.sdk.client.transport.jsonrpc.JSONRPCTransportConfig;
31+
import org.a2aproject.sdk.spec.A2AClientException;
32+
33+
class A2AExecutor implements CallableTask {
34+
35+
private final WorkflowValueResolver<URI> uriSupplier;
36+
private final A2ARequestDispatcher dispatcher;
37+
private final WorkflowValueResolver<Map<String, Object>> mapResolver;
38+
39+
public A2AExecutor(
40+
WorkflowValueResolver<URI> uriSupplier,
41+
A2ARequestDispatcher dispatcher,
42+
WorkflowValueResolver<Map<String, Object>> mapResolver) {
43+
this.uriSupplier = uriSupplier;
44+
this.dispatcher = dispatcher;
45+
this.mapResolver = mapResolver;
46+
}
47+
48+
@Override
49+
public CompletableFuture<WorkflowModel> apply(
50+
WorkflowContext workflowContext, TaskContext taskContext, WorkflowModel input) {
51+
URI uri = uriSupplier.apply(workflowContext, taskContext, input);
52+
try {
53+
return dispatcher.apply(
54+
Client.builder(
55+
A2ACardResolver.builder()
56+
.baseUrl(uri.resolve("/").toString())
57+
.agentCardPath(uri.getPath())
58+
.build()
59+
.getAgentCard())
60+
.clientConfig(new ClientConfig.Builder().build())
61+
.withTransport(JSONRPCTransport.class, new JSONRPCTransportConfig())
62+
.build(),
63+
mapResolver.apply(workflowContext, taskContext, input),
64+
workflowContext,
65+
taskContext);
66+
} catch (A2AClientException ex) {
67+
throw A2AUtils.workflowException(taskContext.position(), ex);
68+
}
69+
}
70+
}
Lines changed: 97 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,97 @@
1+
/*
2+
* Copyright 2020-Present The Serverless Workflow Specification Authors
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package io.serverlessworkflow.impl.executors.a2a;
17+
18+
import io.serverlessworkflow.api.types.A2AArguments;
19+
import io.serverlessworkflow.api.types.CallA2A;
20+
import io.serverlessworkflow.api.types.TaskBase;
21+
import io.serverlessworkflow.impl.TaskContext;
22+
import io.serverlessworkflow.impl.WorkflowContext;
23+
import io.serverlessworkflow.impl.WorkflowDefinition;
24+
import io.serverlessworkflow.impl.WorkflowMutablePosition;
25+
import io.serverlessworkflow.impl.WorkflowUtils;
26+
import io.serverlessworkflow.impl.WorkflowValueResolver;
27+
import io.serverlessworkflow.impl.executors.CallableTaskBuilder;
28+
import io.serverlessworkflow.impl.executors.CallableTaskFactory;
29+
import java.net.URI;
30+
31+
public class A2AExecutorBuilder implements CallableTaskBuilder<CallA2A> {
32+
33+
@Override
34+
public boolean accept(Class<? extends TaskBase> clazz) {
35+
return CallA2A.class.equals(clazz);
36+
}
37+
38+
@Override
39+
public CallableTaskFactory init(
40+
CallA2A task, WorkflowDefinition definition, WorkflowMutablePosition position) {
41+
A2AArguments args = task.getWith();
42+
43+
WorkflowValueResolver<URI> uriSupplier;
44+
if (args.getServer() != null) {
45+
uriSupplier = definition.resourceLoader().uriSupplier(args.getServer());
46+
} else if (args.getAgentCard() != null) {
47+
uriSupplier = definition.resourceLoader().uriSupplier(args.getAgentCard().getEndpoint());
48+
} else {
49+
throw new IllegalArgumentException("Neither server nor agent card is set for task:" + task);
50+
}
51+
52+
A2ARequestDispatcher dispatcher =
53+
switch (args.getMethod()) {
54+
case MESSAGE_SEND ->
55+
new MessageDispatcher(
56+
new MessageConsumerFactory() {
57+
@Override
58+
protected MessageConsumer buildConsumer(
59+
WorkflowContext workflowContext, TaskContext taskContext) {
60+
return new MessageSendConsumer(
61+
workflowContext.definition(), completableFuture);
62+
}
63+
});
64+
case MESSAGE_STREAM ->
65+
new MessageDispatcher(
66+
new MessageConsumerFactory() {
67+
68+
@Override
69+
protected MessageConsumer buildConsumer(
70+
WorkflowContext workflowContext, TaskContext taskContext) {
71+
return new MessageStreamConsumer(
72+
workflowContext.definition(), completableFuture, taskContext.position());
73+
}
74+
});
75+
case TASKS_LIST -> new ListTaskParamsDispatcher();
76+
case TASKS_GET -> new GetTaskParamsDispatcher();
77+
case TASKS_CANCEL -> new CancelTaskParamsDispatcher();
78+
// TODO handle missing cases
79+
case AGENT_GET_AUTHENTICATED_EXTENDED_CARD,
80+
TASKS_PUSH_NOTIFICATION_CONFIG_DELETE,
81+
TASKS_PUSH_NOTIFICATION_CONFIG_GET,
82+
TASKS_PUSH_NOTIFICATION_CONFIG_LIST,
83+
TASKS_PUSH_NOTIFICATION_CONFIG_SET,
84+
TASKS_RESUBSCRIBE ->
85+
throw new UnsupportedOperationException("Unimplemented case: " + args.getMethod());
86+
};
87+
88+
return () ->
89+
new A2AExecutor(
90+
uriSupplier,
91+
dispatcher,
92+
WorkflowUtils.buildMapResolver(
93+
definition.application(),
94+
args.getParameters().getString(),
95+
args.getParameters().getWithA2AParameters().getAdditionalProperties()));
96+
}
97+
}
Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,32 @@
1+
/*
2+
* Copyright 2020-Present The Serverless Workflow Specification Authors
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package io.serverlessworkflow.impl.executors.a2a;
17+
18+
import io.serverlessworkflow.impl.TaskContext;
19+
import io.serverlessworkflow.impl.WorkflowContext;
20+
import io.serverlessworkflow.impl.WorkflowModel;
21+
import java.util.Map;
22+
import java.util.concurrent.CompletableFuture;
23+
import org.a2aproject.sdk.client.Client;
24+
25+
@FunctionalInterface
26+
interface A2ARequestDispatcher {
27+
CompletableFuture<WorkflowModel> apply(
28+
Client client,
29+
Map<String, Object> parameters,
30+
WorkflowContext workflowContext,
31+
TaskContext taskContext);
32+
}

0 commit comments

Comments
 (0)