From 7a6b6f39251d209751efa579b468644a23e706c5 Mon Sep 17 00:00:00 2001 From: Emmanuel Hugonnet Date: Fri, 7 Nov 2025 11:55:01 +0100 Subject: [PATCH] WIP Signed-off-by: Emmanuel Hugonnet --- .../main/java/io/a2a/client/MessageEvent.java | 17 +- examples/helloworld/client/pom.xml | 2 +- .../examples/helloworld/HelloWorldClient.java | 9 +- examples/helloworld/server/pom.xml | 7 +- .../helloworld/AgentExecutorProducer.java | 5 +- extensions/timestamp/pom.xml | 40 ++ .../TimeStampAgentExecutorWrapper.java | 51 ++ .../timestamp/TimeStampEventQueue.java | 133 ++++++ .../timestamp/TimeStampEventQueueTest.java | 437 ++++++++++++++++++ pom.xml | 1 + .../java/io/a2a/server/events/EventQueue.java | 12 +- spec/src/main/java/io/a2a/spec/Message.java | 1 - .../io/a2a/spec/TaskStatusUpdateEvent.java | 1 + 13 files changed, 700 insertions(+), 16 deletions(-) create mode 100644 extensions/timestamp/pom.xml create mode 100644 extensions/timestamp/src/main/java/io/a2a/extension/timestamp/TimeStampAgentExecutorWrapper.java create mode 100644 extensions/timestamp/src/main/java/io/a2a/extension/timestamp/TimeStampEventQueue.java create mode 100644 extensions/timestamp/src/test/java/io/a2a/extension/timestamp/TimeStampEventQueueTest.java diff --git a/client/base/src/main/java/io/a2a/client/MessageEvent.java b/client/base/src/main/java/io/a2a/client/MessageEvent.java index b5970ab78..94c8a1058 100644 --- a/client/base/src/main/java/io/a2a/client/MessageEvent.java +++ b/client/base/src/main/java/io/a2a/client/MessageEvent.java @@ -21,6 +21,19 @@ public MessageEvent(Message message) { public Message getMessage() { return message; } -} - + @Override + public String toString() { + String messageAsString = "{" + + "role=" + message.getRole() + + ", parts=" + message.getParts() + + ", messageId=" + message.getMessageId() + + ", contextId=" + message.getContextId() + + ", taskId=" + message.getTaskId() + + ", metadata=" + message.getMetadata() + + ", kind=" + message.getKind() + + ", referenceTaskIds=" + message.getReferenceTaskIds() + + ", extensions=" + message.getExtensions() + '}'; + return "MessageEvent{" + "message=" + messageAsString + '}'; + } +} diff --git a/examples/helloworld/client/pom.xml b/examples/helloworld/client/pom.xml index 8f5b63406..eb1b747be 100644 --- a/examples/helloworld/client/pom.xml +++ b/examples/helloworld/client/pom.xml @@ -12,7 +12,7 @@ a2a-java-sdk-examples-client - Java SDK A2A Examples + Java SDK A2A HelloWorld Example - Client Examples for the Java SDK for the Agent2Agent Protocol (A2A) diff --git a/examples/helloworld/client/src/main/java/io/a2a/examples/helloworld/HelloWorldClient.java b/examples/helloworld/client/src/main/java/io/a2a/examples/helloworld/HelloWorldClient.java index a82438a35..5922624fd 100644 --- a/examples/helloworld/client/src/main/java/io/a2a/examples/helloworld/HelloWorldClient.java +++ b/examples/helloworld/client/src/main/java/io/a2a/examples/helloworld/HelloWorldClient.java @@ -12,16 +12,18 @@ import io.a2a.A2A; import io.a2a.client.Client; -import io.a2a.client.ClientBuilder; import io.a2a.client.ClientEvent; import io.a2a.client.MessageEvent; import io.a2a.client.http.A2ACardResolver; import io.a2a.client.transport.jsonrpc.JSONRPCTransport; import io.a2a.client.transport.jsonrpc.JSONRPCTransportConfig; +import io.a2a.client.transport.spi.interceptors.ClientCallContext; +import io.a2a.common.A2AHeaders; import io.a2a.spec.AgentCard; import io.a2a.spec.Message; import io.a2a.spec.Part; import io.a2a.spec.TextPart; +import java.util.Collections; /** * A simple example of using the A2A Java SDK to communicate with an A2A server. @@ -61,6 +63,7 @@ public static void main(String[] args) { List> consumers = new ArrayList<>(); consumers.add((event, agentCard) -> { if (event instanceof MessageEvent messageEvent) { + System.out.println("Received client MessageEvent: " + messageEvent); Message responseMessage = messageEvent.getMessage(); StringBuilder textBuilder = new StringBuilder(); if (responseMessage.getParts() != null) { @@ -89,11 +92,11 @@ public static void main(String[] args) { .streamingErrorHandler(streamingErrorHandler) .withTransport(JSONRPCTransport.class, new JSONRPCTransportConfig()) .build(); + ClientCallContext clientContext = new ClientCallContext(Collections.emptyMap(), Map.of(A2AHeaders.X_A2A_EXTENSIONS, "https://github.com/a2aproject/a2a-samples/extensions/timestamp/v1")); Message message = A2A.toUserMessage(MESSAGE_TEXT); // the message ID will be automatically generated for you - System.out.println("Sending message: " + MESSAGE_TEXT); - client.sendMessage(message); + client.sendMessage(message, clientContext); System.out.println("Message sent successfully. Responses will be handled by the configured consumers."); try { diff --git a/examples/helloworld/server/pom.xml b/examples/helloworld/server/pom.xml index 5c660336f..92792fb7f 100644 --- a/examples/helloworld/server/pom.xml +++ b/examples/helloworld/server/pom.xml @@ -12,7 +12,7 @@ a2a-java-sdk-examples-server - Java SDK A2A Examples + Java SDK A2A HelloWorld Example - Server Examples for the Java SDK for the Agent2Agent Protocol (A2A) @@ -20,6 +20,11 @@ io.github.a2asdk a2a-java-sdk-client + + io.github.a2asdk + a2a-java-timestamp-extension + ${project.version} + io.github.a2asdk a2a-java-sdk-reference-jsonrpc diff --git a/examples/helloworld/server/src/main/java/io/a2a/examples/helloworld/AgentExecutorProducer.java b/examples/helloworld/server/src/main/java/io/a2a/examples/helloworld/AgentExecutorProducer.java index 1d7519b60..dcc246580 100644 --- a/examples/helloworld/server/src/main/java/io/a2a/examples/helloworld/AgentExecutorProducer.java +++ b/examples/helloworld/server/src/main/java/io/a2a/examples/helloworld/AgentExecutorProducer.java @@ -7,6 +7,7 @@ import io.a2a.server.agentexecution.RequestContext; import io.a2a.server.events.EventQueue; import io.a2a.A2A; +import io.a2a.extension.timestamp.TimeStampAgentExecutorWrapper; import io.a2a.spec.JSONRPCError; import io.a2a.spec.UnsupportedOperationError; @@ -15,7 +16,7 @@ public class AgentExecutorProducer { @Produces public AgentExecutor agentExecutor() { - return new AgentExecutor() { + return new TimeStampAgentExecutorWrapper(new AgentExecutor() { @Override public void execute(RequestContext context, EventQueue eventQueue) throws JSONRPCError { eventQueue.enqueueEvent(A2A.toAgentMessage("Hello World")); @@ -25,6 +26,6 @@ public void execute(RequestContext context, EventQueue eventQueue) throws JSONRP public void cancel(RequestContext context, EventQueue eventQueue) throws JSONRPCError { throw new UnsupportedOperationError(); } - }; + }); } } diff --git a/extensions/timestamp/pom.xml b/extensions/timestamp/pom.xml new file mode 100644 index 000000000..0ee8f4a9c --- /dev/null +++ b/extensions/timestamp/pom.xml @@ -0,0 +1,40 @@ + + + 4.0.0 + + + io.github.a2asdk + a2a-java-sdk-parent + 0.4.0.Alpha1-SNAPSHOT + ../../pom.xml + + + a2a-java-timestamp-extension + A2A Java SDK :: Extension :: Timestamp + Simple Timestamp Extension + + + + io.github.a2asdk + a2a-java-sdk-server-common + + + + org.junit.jupiter + junit-jupiter + test + + + org.mockito + mockito-core + test + + + org.mockito + mockito-junit-jupiter + test + + + diff --git a/extensions/timestamp/src/main/java/io/a2a/extension/timestamp/TimeStampAgentExecutorWrapper.java b/extensions/timestamp/src/main/java/io/a2a/extension/timestamp/TimeStampAgentExecutorWrapper.java new file mode 100644 index 000000000..ccee969ca --- /dev/null +++ b/extensions/timestamp/src/main/java/io/a2a/extension/timestamp/TimeStampAgentExecutorWrapper.java @@ -0,0 +1,51 @@ +package io.a2a.extension.timestamp; + +import io.a2a.server.agentexecution.AgentExecutor; +import io.a2a.server.agentexecution.RequestContext; +import io.a2a.server.events.EventQueue; +import io.a2a.spec.JSONRPCError; +import java.util.logging.Logger; + +public class TimeStampAgentExecutorWrapper implements AgentExecutor { + + public static final String CORE_PATH = "github.com/a2aproject/a2a-samples/extensions/timestamp/v1"; + public static final String URI = "https://" + CORE_PATH; + public static final String TIMESTAMP_FIELD = CORE_PATH + "/timestamp"; + + private static final Logger logger = Logger.getLogger(TimeStampAgentExecutorWrapper.class.getName()); + private final AgentExecutor delegate; + + public TimeStampAgentExecutorWrapper(AgentExecutor delegate) { + this.delegate = delegate; + } + + @Override + public void execute(RequestContext context, EventQueue eventQueue) throws JSONRPCError { + if(isActivated(context)) { + delegate.execute(context, new TimeStampEventQueue(eventQueue)); + } else { + delegate.execute(context, eventQueue); + } + } + + @Override + public void cancel(RequestContext context, EventQueue eventQueue) throws JSONRPCError { + if(isActivated(context)) { + delegate.cancel(context, new TimeStampEventQueue(eventQueue)); + } else { + delegate.cancel(context, eventQueue); + } + } + + private boolean isActivated(final RequestContext context) { + if (context.getCallContext().isExtensionActivated(URI)) { + return true; + } + if (context.getCallContext().isExtensionRequested(URI)) { + logger.info("Activated extension: " + URI); + context.getCallContext().activateExtension(URI); + return true; + } + return false; + } +} diff --git a/extensions/timestamp/src/main/java/io/a2a/extension/timestamp/TimeStampEventQueue.java b/extensions/timestamp/src/main/java/io/a2a/extension/timestamp/TimeStampEventQueue.java new file mode 100644 index 000000000..e02a4ca1f --- /dev/null +++ b/extensions/timestamp/src/main/java/io/a2a/extension/timestamp/TimeStampEventQueue.java @@ -0,0 +1,133 @@ +package io.a2a.extension.timestamp; + +import static io.a2a.extension.timestamp.TimeStampAgentExecutorWrapper.TIMESTAMP_FIELD; +import static io.a2a.extension.timestamp.TimeStampAgentExecutorWrapper.URI; + +import io.a2a.server.events.EventQueue; +import io.a2a.spec.Artifact; +import io.a2a.spec.Event; +import io.a2a.spec.Message; +import io.a2a.spec.Task; +import io.a2a.spec.TaskArtifactUpdateEvent; +import io.a2a.spec.TaskStatusUpdateEvent; +import java.time.OffsetDateTime; +import java.time.ZoneOffset; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +public class TimeStampEventQueue extends EventQueue { + + private final EventQueue delegate; + + public TimeStampEventQueue(EventQueue delegate) { + this.delegate = delegate; + } + + @Override + public void enqueueEvent(Event event) { + this.delegate.enqueueEvent(timestampEvent(event)); + } + + private Event timestampEvent(Event event) { + if (event instanceof Message message) { + return processMessage(message); + } + if (event instanceof TaskArtifactUpdateEvent taskArtifactUpdateEvent) { + return processTaskArtifactUpdateEvent(taskArtifactUpdateEvent); + } + if (event instanceof TaskStatusUpdateEvent taskStatusUpdateEvent) { + return processTaskStatusUpdateEvent(taskStatusUpdateEvent); + } + if (event instanceof Task task) { + return processTask(task); + } + return event; + } + + private Message processMessage(Message message) { + Map metadata = message.getMetadata() == null ? new HashMap<>() : new HashMap<>(message.getMetadata()); + if (!metadata.containsKey(TIMESTAMP_FIELD)) { + metadata.put(TIMESTAMP_FIELD, OffsetDateTime.now(ZoneOffset.UTC)); + } + List extensions = message.getExtensions() == null ? new ArrayList<>() : new ArrayList<>(message.getExtensions()); + if (!extensions.contains(URI)) { + extensions.add(URI); + } + return new Message.Builder(message).metadata(metadata).extensions(extensions).build(); + } + + private Task processTask(Task task) { + Map metadata = task.getMetadata() == null ? new HashMap<>() : new HashMap<>(task.getMetadata()); + if (!metadata.containsKey(TIMESTAMP_FIELD)) { + metadata.put(TIMESTAMP_FIELD, OffsetDateTime.now(ZoneOffset.UTC)); + } + List artifacts = new ArrayList<>(); + for (Artifact artifact : task.getArtifacts()) { + artifacts.add(processArtifact(artifact)); + } + return new Task.Builder(task).artifacts(artifacts).metadata(metadata).build(); + } + + private TaskStatusUpdateEvent processTaskStatusUpdateEvent(TaskStatusUpdateEvent taskStatusUpdateEvent) { + Map metadata = taskStatusUpdateEvent.getMetadata() == null ? new HashMap<>() : new HashMap<>(taskStatusUpdateEvent.getMetadata()); + if (!metadata.containsKey(TIMESTAMP_FIELD)) { + metadata.put(TIMESTAMP_FIELD, OffsetDateTime.now(ZoneOffset.UTC)); + } + return new TaskStatusUpdateEvent.Builder(taskStatusUpdateEvent).metadata(metadata).build(); + } + + private TaskArtifactUpdateEvent processTaskArtifactUpdateEvent(TaskArtifactUpdateEvent taskArtifactUpdateEvent) { + Map metadata = taskArtifactUpdateEvent.getMetadata() == null ? new HashMap<>() : new HashMap<>(taskArtifactUpdateEvent.getMetadata()); + if (!metadata.containsKey(TIMESTAMP_FIELD)) { + metadata.put(TIMESTAMP_FIELD, OffsetDateTime.now(ZoneOffset.UTC)); + } + if (taskArtifactUpdateEvent.getArtifact() != null) { + return new TaskArtifactUpdateEvent.Builder(taskArtifactUpdateEvent).artifact(processArtifact(taskArtifactUpdateEvent.getArtifact())).metadata(metadata).build(); + } + return new TaskArtifactUpdateEvent.Builder(taskArtifactUpdateEvent).metadata(metadata).build(); + } + + private Artifact processArtifact(Artifact artifact) { + Map metadata = artifact.metadata() == null ? new HashMap<>() : new HashMap<>(artifact.metadata()); + if (!metadata.containsKey(TIMESTAMP_FIELD)) { + metadata.put(TIMESTAMP_FIELD, OffsetDateTime.now(ZoneOffset.UTC)); + } + List extensions = artifact.extensions() == null ? new ArrayList<>() : new ArrayList<>(artifact.extensions()); + if (!extensions.contains(URI)) { + extensions.add(URI); + } + return new Artifact.Builder(artifact).metadata(metadata).extensions(extensions).build(); + } + + @Override + public void awaitQueuePollerStart() throws InterruptedException { + this.delegate.awaitQueuePollerStart(); + } + + @Override + public void close() { + this.delegate.close(); + } + + @Override + public void close(boolean immediate) { + this.delegate.close(immediate); + } + + @Override + public void close(boolean immediate, boolean notifyParent) { + this.delegate.close(immediate, notifyParent); + } + + @Override + public void signalQueuePollerStarted() { + this.delegate.signalQueuePollerStarted(); + } + + @Override + public EventQueue tap() { + return this; + } +} diff --git a/extensions/timestamp/src/test/java/io/a2a/extension/timestamp/TimeStampEventQueueTest.java b/extensions/timestamp/src/test/java/io/a2a/extension/timestamp/TimeStampEventQueueTest.java new file mode 100644 index 000000000..4bbb1536f --- /dev/null +++ b/extensions/timestamp/src/test/java/io/a2a/extension/timestamp/TimeStampEventQueueTest.java @@ -0,0 +1,437 @@ +package io.a2a.extension.timestamp; + +import static io.a2a.extension.timestamp.TimeStampAgentExecutorWrapper.TIMESTAMP_FIELD; +import static io.a2a.extension.timestamp.TimeStampAgentExecutorWrapper.URI; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.verify; + +import io.a2a.server.events.EventQueue; +import io.a2a.spec.Artifact; +import io.a2a.spec.Event; +import io.a2a.spec.Message; +import io.a2a.spec.Message.Role; +import io.a2a.spec.Task; +import io.a2a.spec.TaskArtifactUpdateEvent; +import io.a2a.spec.TaskState; +import io.a2a.spec.TaskStatus; +import io.a2a.spec.TaskStatusUpdateEvent; +import io.a2a.spec.TextPart; +import java.time.OffsetDateTime; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.mockito.ArgumentCaptor; + +public class TimeStampEventQueueTest { + + private EventQueue delegateQueue; + private TimeStampEventQueue timestampQueue; + + @BeforeEach + public void setUp() { + delegateQueue = mock(EventQueue.class); + timestampQueue = new TimeStampEventQueue(delegateQueue); + } + + @Test + public void testEnqueueEvent_delegatesEvent() { + Event event = mock(Event.class); + + timestampQueue.enqueueEvent(event); + + verify(delegateQueue).enqueueEvent(any(Event.class)); + } + + @Test + public void testProcessMessage_withNullMetadataAndExtensions() { + Message message = new Message.Builder() + .role(Role.USER) + .parts(List.of(new TextPart("test message"))) + .build(); + + ArgumentCaptor eventCaptor = ArgumentCaptor.forClass(Event.class); + timestampQueue.enqueueEvent(message); + + verify(delegateQueue).enqueueEvent(eventCaptor.capture()); + Message processedMessage = (Message) eventCaptor.getValue(); + + assertNotNull(processedMessage.getMetadata()); + assertTrue(processedMessage.getMetadata().containsKey(TIMESTAMP_FIELD)); + assertNotNull(processedMessage.getMetadata().get(TIMESTAMP_FIELD)); + assertTrue(processedMessage.getMetadata().get(TIMESTAMP_FIELD) instanceof OffsetDateTime); + + assertNotNull(processedMessage.getExtensions()); + assertTrue(processedMessage.getExtensions().contains(URI)); + } + + @Test + public void testProcessMessage_withEmptyMetadata() { + Map metadata = new HashMap<>(); + List extensions = new ArrayList<>(); + + Message message = new Message.Builder() + .role(Role.USER) + .parts(List.of(new TextPart("test message"))) + .metadata(metadata) + .extensions(extensions) + .build(); + + ArgumentCaptor eventCaptor = ArgumentCaptor.forClass(Event.class); + timestampQueue.enqueueEvent(message); + + verify(delegateQueue).enqueueEvent(eventCaptor.capture()); + Message processedMessage = (Message) eventCaptor.getValue(); + + assertTrue(processedMessage.getMetadata().containsKey(TIMESTAMP_FIELD)); + assertTrue(processedMessage.getExtensions().contains(URI)); + } + + @Test + public void testProcessMessage_withExistingMetadata() { + Map metadata = new HashMap<>(); + metadata.put("existing", "value"); + List extensions = new ArrayList<>(); + extensions.add("existing-extension"); + + Message message = new Message.Builder() + .role(Role.USER) + .parts(List.of(new TextPart("test message"))) + .metadata(metadata) + .extensions(extensions) + .build(); + + ArgumentCaptor eventCaptor = ArgumentCaptor.forClass(Event.class); + timestampQueue.enqueueEvent(message); + + verify(delegateQueue).enqueueEvent(eventCaptor.capture()); + Message processedMessage = (Message) eventCaptor.getValue(); + + assertTrue(processedMessage.getMetadata().containsKey(TIMESTAMP_FIELD)); + assertTrue(processedMessage.getMetadata().containsKey("existing")); + assertTrue(processedMessage.getExtensions().contains(URI)); + assertTrue(processedMessage.getExtensions().contains("existing-extension")); + } + + @Test + public void testProcessMessage_withExistingTimestamp() { + OffsetDateTime existingTimestamp = OffsetDateTime.now(); + Map metadata = new HashMap<>(); + metadata.put(TIMESTAMP_FIELD, existingTimestamp); + List extensions = new ArrayList<>(); + extensions.add(URI); + + Message message = new Message.Builder() + .role(Role.USER) + .parts(List.of(new TextPart("test message"))) + .metadata(metadata) + .extensions(extensions) + .build(); + + ArgumentCaptor eventCaptor = ArgumentCaptor.forClass(Event.class); + timestampQueue.enqueueEvent(message); + + verify(delegateQueue).enqueueEvent(eventCaptor.capture()); + Message processedMessage = (Message) eventCaptor.getValue(); + + assertEquals(existingTimestamp, processedMessage.getMetadata().get(TIMESTAMP_FIELD)); + } + + @Test + public void testProcessTask_withNullMetadata() { + Task task = new Task.Builder() + .id("test task") + .contextId("context-id") + .status(new TaskStatus(TaskState.COMPLETED)) + .build(); + + ArgumentCaptor eventCaptor = ArgumentCaptor.forClass(Event.class); + timestampQueue.enqueueEvent(task); + + verify(delegateQueue).enqueueEvent(eventCaptor.capture()); + Task processedTask = (Task) eventCaptor.getValue(); + + assertNotNull(processedTask.getMetadata()); + assertTrue(processedTask.getMetadata().containsKey(TIMESTAMP_FIELD)); + assertNotNull(processedTask.getMetadata().get(TIMESTAMP_FIELD)); + } + + @Test + public void testProcessTask_withExistingMetadata() { + Map metadata = new HashMap<>(); + metadata.put("existing", "value"); + + Task task = new Task.Builder() + .id("test task") + .contextId("context-id") + .metadata(metadata) + .status(new TaskStatus(TaskState.COMPLETED)) + .build(); + + ArgumentCaptor eventCaptor = ArgumentCaptor.forClass(Event.class); + timestampQueue.enqueueEvent(task); + + verify(delegateQueue).enqueueEvent(eventCaptor.capture()); + Task processedTask = (Task) eventCaptor.getValue(); + + assertTrue(processedTask.getMetadata().containsKey(TIMESTAMP_FIELD)); + assertTrue(processedTask.getMetadata().containsKey("existing")); + } + + @Test + public void testProcessTask_withExistingTimestamp() { + OffsetDateTime existingTimestamp = OffsetDateTime.now(); + Map metadata = new HashMap<>(); + metadata.put(TIMESTAMP_FIELD, existingTimestamp); + + Task task = new Task.Builder() + .id("test task") + .contextId("context-id") + .metadata(metadata) + .status(new TaskStatus(TaskState.COMPLETED)) + .build(); + + ArgumentCaptor eventCaptor = ArgumentCaptor.forClass(Event.class); + timestampQueue.enqueueEvent(task); + + verify(delegateQueue).enqueueEvent(eventCaptor.capture()); + Task processedTask = (Task) eventCaptor.getValue(); + + assertEquals(existingTimestamp, processedTask.getMetadata().get(TIMESTAMP_FIELD)); + } + + @Test + public void testProcessTaskStatusUpdateEvent_withNullMetadata() { + TaskStatusUpdateEvent event = new TaskStatusUpdateEvent.Builder() + .taskId("task-1") + .contextId("context-1") + .status(new TaskStatus(TaskState.COMPLETED)) + .build(); + + ArgumentCaptor eventCaptor = ArgumentCaptor.forClass(Event.class); + timestampQueue.enqueueEvent(event); + + verify(delegateQueue).enqueueEvent(eventCaptor.capture()); + TaskStatusUpdateEvent processedEvent = (TaskStatusUpdateEvent) eventCaptor.getValue(); + + assertNotNull(processedEvent.getMetadata()); + assertTrue(processedEvent.getMetadata().containsKey(TIMESTAMP_FIELD)); + } + + @Test + public void testProcessTaskStatusUpdateEvent_withExistingMetadata() { + Map metadata = new HashMap<>(); + metadata.put("existing", "value"); + + TaskStatusUpdateEvent event = new TaskStatusUpdateEvent.Builder() + .taskId("task-1") + .contextId("context-1") + .metadata(metadata) + .status(new TaskStatus(TaskState.COMPLETED)) + .build(); + + ArgumentCaptor eventCaptor = ArgumentCaptor.forClass(Event.class); + timestampQueue.enqueueEvent(event); + + verify(delegateQueue).enqueueEvent(eventCaptor.capture()); + TaskStatusUpdateEvent processedEvent = (TaskStatusUpdateEvent) eventCaptor.getValue(); + + assertTrue(processedEvent.getMetadata().containsKey(TIMESTAMP_FIELD)); + assertTrue(processedEvent.getMetadata().containsKey("existing")); + } + + @Test + public void testProcessTaskStatusUpdateEvent_withExistingTimestamp() { + OffsetDateTime existingTimestamp = OffsetDateTime.now(); + Map metadata = new HashMap<>(); + metadata.put(TIMESTAMP_FIELD, existingTimestamp); + + TaskStatusUpdateEvent event = new TaskStatusUpdateEvent.Builder() + .taskId("task-1") + .contextId("context-1") + .metadata(metadata) + .status(new TaskStatus(TaskState.COMPLETED)) + .build(); + + ArgumentCaptor eventCaptor = ArgumentCaptor.forClass(Event.class); + timestampQueue.enqueueEvent(event); + + verify(delegateQueue).enqueueEvent(eventCaptor.capture()); + TaskStatusUpdateEvent processedEvent = (TaskStatusUpdateEvent) eventCaptor.getValue(); + + assertEquals(existingTimestamp, processedEvent.getMetadata().get(TIMESTAMP_FIELD)); + } + + @Test + public void testProcessTaskArtifactUpdateEvent_withNullMetadata() { + TaskArtifactUpdateEvent event = new TaskArtifactUpdateEvent.Builder() + .taskId("task-1") + .contextId("context-1") + .append(false) + .artifact(new Artifact.Builder() + .artifactId("artifact-id") + .description("Test artifact") + .name("Artifact") + .parts(List.of(new TextPart("test message"))) + .build()) + .build(); + + ArgumentCaptor eventCaptor = ArgumentCaptor.forClass(Event.class); + timestampQueue.enqueueEvent(event); + + verify(delegateQueue).enqueueEvent(eventCaptor.capture()); + TaskArtifactUpdateEvent processedEvent = (TaskArtifactUpdateEvent) eventCaptor.getValue(); + + assertNotNull(processedEvent.getMetadata()); + assertTrue(processedEvent.getMetadata().containsKey(TIMESTAMP_FIELD)); + } + + @Test + public void testProcessTaskArtifactUpdateEvent_withExistingMetadata() { + Map metadata = new HashMap<>(); + metadata.put("existing", "value"); + + TaskArtifactUpdateEvent event = new TaskArtifactUpdateEvent.Builder() + .taskId("task-1") + .contextId("context-1") + .append(false) + .metadata(metadata) + .artifact(new Artifact.Builder() + .artifactId("artifact-id") + .description("Test artifact") + .name("Artifact") + .parts(List.of(new TextPart("test message"))) + .build()) + .build(); + + ArgumentCaptor eventCaptor = ArgumentCaptor.forClass(Event.class); + timestampQueue.enqueueEvent(event); + + verify(delegateQueue).enqueueEvent(eventCaptor.capture()); + TaskArtifactUpdateEvent processedEvent = (TaskArtifactUpdateEvent) eventCaptor.getValue(); + + assertTrue(processedEvent.getMetadata().containsKey(TIMESTAMP_FIELD)); + assertTrue(processedEvent.getMetadata().containsKey("existing")); + } + + @Test + public void testProcessTaskArtifactUpdateEvent_withExistingTimestamp() { + OffsetDateTime existingTimestamp = OffsetDateTime.now(); + Map metadata = new HashMap<>(); + metadata.put(TIMESTAMP_FIELD, existingTimestamp); + + TaskArtifactUpdateEvent event = new TaskArtifactUpdateEvent.Builder() + .taskId("task-1") + .contextId("context-1") + .append(false) + .metadata(metadata) + .artifact(new Artifact.Builder() + .artifactId("artifact-id") + .description("Test artifact") + .name("Artifact") + .parts(List.of(new TextPart("test message"))) + .build()) + .build(); + + ArgumentCaptor eventCaptor = ArgumentCaptor.forClass(Event.class); + timestampQueue.enqueueEvent(event); + + verify(delegateQueue).enqueueEvent(eventCaptor.capture()); + TaskArtifactUpdateEvent processedEvent = (TaskArtifactUpdateEvent) eventCaptor.getValue(); + + assertEquals(existingTimestamp, processedEvent.getMetadata().get(TIMESTAMP_FIELD)); + } + + @Test + public void testProcessTaskArtifactUpdateEvent_withArtifact() { + Map artifactMetadata = new HashMap<>(); + List extensions = new ArrayList<>(); + + Artifact artifact = new Artifact.Builder() + .artifactId("artifact-id") + .parts(List.of(new TextPart("test part"))) + .metadata(artifactMetadata) + .extensions(extensions) + .build(); + + Map metadata = new HashMap<>(); + + TaskArtifactUpdateEvent event = new TaskArtifactUpdateEvent.Builder() + .taskId("task-1") + .contextId("context-1") + .append(false) + .artifact(artifact) + .metadata(metadata) + .build(); + + ArgumentCaptor eventCaptor = ArgumentCaptor.forClass(Event.class); + timestampQueue.enqueueEvent(event); + + verify(delegateQueue).enqueueEvent(eventCaptor.capture()); + TaskArtifactUpdateEvent processedEvent = (TaskArtifactUpdateEvent) eventCaptor.getValue(); + + assertTrue(processedEvent.getMetadata().containsKey(TIMESTAMP_FIELD)); + + // Verify artifact was processed + assertNotNull(processedEvent.getArtifact()); + assertTrue(processedEvent.getArtifact().metadata().containsKey(TIMESTAMP_FIELD)); + assertTrue(processedEvent.getArtifact().extensions().contains(URI)); + } + + @Test + public void testUnknownEventType_passesThrough() { + Event unknownEvent = mock(Event.class); + + timestampQueue.enqueueEvent(unknownEvent); + + verify(delegateQueue).enqueueEvent(unknownEvent); + } + + @Test + public void testAwaitQueuePollerStart() throws InterruptedException { + timestampQueue.awaitQueuePollerStart(); + + verify(delegateQueue).awaitQueuePollerStart(); + } + + @Test + public void testClose() { + timestampQueue.close(); + + verify(delegateQueue).close(); + } + + @Test + public void testCloseWithImmediate() { + timestampQueue.close(true); + + verify(delegateQueue).close(true); + } + + @Test + public void testCloseWithImmediateAndNotifyParent() { + timestampQueue.close(true, true); + + verify(delegateQueue).close(true, true); + } + + @Test + public void testSignalQueuePollerStarted() { + timestampQueue.signalQueuePollerStarted(); + + verify(delegateQueue).signalQueuePollerStarted(); + } + + @Test + public void testTap() { + EventQueue tappedQueue = timestampQueue.tap(); + + assertEquals(timestampQueue, tappedQueue); + } +} diff --git a/pom.xml b/pom.xml index 02129d631..59fbf8426 100644 --- a/pom.xml +++ b/pom.xml @@ -457,6 +457,7 @@ transport/jsonrpc transport/grpc transport/rest + extensions/timestamp diff --git a/server-common/src/main/java/io/a2a/server/events/EventQueue.java b/server-common/src/main/java/io/a2a/server/events/EventQueue.java index d590d8890..6a8a154ac 100644 --- a/server-common/src/main/java/io/a2a/server/events/EventQueue.java +++ b/server-common/src/main/java/io/a2a/server/events/EventQueue.java @@ -96,7 +96,7 @@ public int getQueueSize() { public abstract void awaitQueuePollerStart() throws InterruptedException ; - abstract void signalQueuePollerStarted(); + public abstract void signalQueuePollerStarted(); public void enqueueEvent(Event event) { enqueueItem(new LocalEventQueueItem(event)); @@ -119,7 +119,7 @@ public void enqueueItem(EventQueueItem item) { LOGGER.debug("Enqueued event {} {}", event instanceof Throwable ? event.toString() : event, this); } - abstract EventQueue tap(); + public abstract EventQueue tap(); /** * Dequeues an EventQueueItem from the queue. @@ -265,7 +265,7 @@ static class MainQueue extends EventQueue { taskId, onCloseCallbacks.size(), taskStateProvider != null); } - EventQueue tap() { + public EventQueue tap() { ChildQueue child = new ChildQueue(this); children.add(child); return child; @@ -310,7 +310,7 @@ public void awaitQueuePollerStart() throws InterruptedException { } @Override - void signalQueuePollerStarted() { + public void signalQueuePollerStarted() { if (pollingStarted.get()) { return; } @@ -415,7 +415,7 @@ private void internalEnqueueItem(EventQueueItem item) { } @Override - EventQueue tap() { + public EventQueue tap() { throw new IllegalStateException("Can only tap the main queue"); } @@ -425,7 +425,7 @@ public void awaitQueuePollerStart() throws InterruptedException { } @Override - void signalQueuePollerStarted() { + public void signalQueuePollerStarted() { parent.signalQueuePollerStarted(); } diff --git a/spec/src/main/java/io/a2a/spec/Message.java b/spec/src/main/java/io/a2a/spec/Message.java index dd7e860a5..3e9df40dd 100644 --- a/spec/src/main/java/io/a2a/spec/Message.java +++ b/spec/src/main/java/io/a2a/spec/Message.java @@ -128,7 +128,6 @@ public String asString() { return this.role; } } - public static class Builder { private Role role; diff --git a/spec/src/main/java/io/a2a/spec/TaskStatusUpdateEvent.java b/spec/src/main/java/io/a2a/spec/TaskStatusUpdateEvent.java index 25e2cd170..10050dfa5 100644 --- a/spec/src/main/java/io/a2a/spec/TaskStatusUpdateEvent.java +++ b/spec/src/main/java/io/a2a/spec/TaskStatusUpdateEvent.java @@ -96,6 +96,7 @@ public Builder(TaskStatusUpdateEvent existingTaskStatusUpdateEvent) { this.isFinal = existingTaskStatusUpdateEvent.isFinal; this.metadata = existingTaskStatusUpdateEvent.metadata; } + public Builder taskId(String id) { this.taskId = id; return this;