diff --git a/agentscope-extensions/agentscope-extensions-studio/src/main/java/io/agentscope/core/studio/StudioMessageHook.java b/agentscope-extensions/agentscope-extensions-studio/src/main/java/io/agentscope/core/studio/StudioMessageHook.java index b739c2ed5..57c25e9f0 100644 --- a/agentscope-extensions/agentscope-extensions-studio/src/main/java/io/agentscope/core/studio/StudioMessageHook.java +++ b/agentscope-extensions/agentscope-extensions-studio/src/main/java/io/agentscope/core/studio/StudioMessageHook.java @@ -15,10 +15,19 @@ */ package io.agentscope.core.studio; +import io.agentscope.core.hook.ActingChunkEvent; import io.agentscope.core.hook.Hook; import io.agentscope.core.hook.HookEvent; +import io.agentscope.core.hook.PostActingEvent; import io.agentscope.core.hook.PostCallEvent; +import io.agentscope.core.hook.PostReasoningEvent; +import io.agentscope.core.hook.ReasoningChunkEvent; import io.agentscope.core.message.Msg; +import io.agentscope.core.message.MsgRole; +import io.agentscope.core.message.ToolResultBlock; +import java.util.HashMap; +import java.util.List; +import java.util.Map; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import reactor.core.publisher.Mono; @@ -91,6 +100,112 @@ public Mono onEvent(T event) { return Mono.just(event); }); } + + // Reasoning incremental chunk + if (event instanceof ReasoningChunkEvent) { + ReasoningChunkEvent e = (ReasoningChunkEvent) event; + Msg chunk = e.getIncrementalChunk(); + if (chunk != null && studioClient != null) { + Msg tagged = withEventMetadata(chunk, "reasoning", false); + return studioClient + .pushMessage(tagged) + .thenReturn(event) + .onErrorResume( + ex -> { + logger.error("Failed to push reasoning chunk to Studio", ex); + return Mono.just(event); + }); + } + return Mono.just(event); + } + + // Reasoning final result (after stream completes) + if (event instanceof PostReasoningEvent) { + PostReasoningEvent e = (PostReasoningEvent) event; + Msg finalMsg = e.getReasoningMessage(); + if (finalMsg != null && studioClient != null) { + Msg tagged = withEventMetadata(finalMsg, "reasoning", true); + return studioClient + .pushMessage(tagged) + .thenReturn(event) + .onErrorResume( + ex -> { + logger.error("Failed to push final reasoning to Studio", ex); + return Mono.just(event); + }); + } + return Mono.just(event); + } + + // Acting (tool) incremental chunk + if (event instanceof ActingChunkEvent) { + ActingChunkEvent e = (ActingChunkEvent) event; + ToolResultBlock chunk = e.getChunk(); + if (chunk != null && studioClient != null) { + // build a Msg with TOOL role similar to streaming hook + Msg toolMsg = + Msg.builder() + .name("tool") + .role(MsgRole.TOOL) + .content(List.of(chunk)) + .build(); + Msg tagged = withEventMetadata(toolMsg, "tool_result", false); + return studioClient + .pushMessage(tagged) + .thenReturn(event) + .onErrorResume( + ex -> { + logger.error("Failed to push acting chunk to Studio", ex); + return Mono.just(event); + }); + } + return Mono.just(event); + } + + // Acting (tool) final result + if (event instanceof PostActingEvent) { + PostActingEvent e = (PostActingEvent) event; + ToolResultBlock result = e.getToolResult(); + if (result != null && studioClient != null) { + Msg toolMsg = + Msg.builder() + .name("tool") + .role(MsgRole.TOOL) + .content(List.of(result)) + .build(); + Msg tagged = withEventMetadata(toolMsg, "tool_result", true); + return studioClient + .pushMessage(tagged) + .thenReturn(event) + .onErrorResume( + ex -> { + logger.error("Failed to push tool result to Studio", ex); + return Mono.just(event); + }); + } + return Mono.just(event); + } + return Mono.just(event); } + + /** + * Wrap/augment a Msg with metadata that indicates event type and whether it's the last chunk. + */ + private Msg withEventMetadata(Msg orig, String eventType, boolean isLast) { + Map meta = new HashMap<>(); + if (orig.getMetadata() != null) { + meta.putAll(orig.getMetadata()); + } + meta.put("studio_event_type", eventType); + meta.put("studio_is_last", isLast); + // Rebuild msg copying fields and injecting metadata; adapt if Msg.Builder has different + // methods + return Msg.builder() + .name(orig.getName()) + .role(orig.getRole()) + .content(orig.getContent()) + .metadata(meta) + .build(); + } } diff --git a/agentscope-extensions/agentscope-extensions-studio/src/test/java/io/agentscope/core/studio/StudioMessageHookTest.java b/agentscope-extensions/agentscope-extensions-studio/src/test/java/io/agentscope/core/studio/StudioMessageHookTest.java index 48b07bc8b..4ae64efac 100644 --- a/agentscope-extensions/agentscope-extensions-studio/src/test/java/io/agentscope/core/studio/StudioMessageHookTest.java +++ b/agentscope-extensions/agentscope-extensions-studio/src/test/java/io/agentscope/core/studio/StudioMessageHookTest.java @@ -17,6 +17,7 @@ import static org.junit.jupiter.api.Assertions.assertNotNull; import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.argThat; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.never; import static org.mockito.Mockito.times; @@ -24,15 +25,23 @@ import static org.mockito.Mockito.when; import io.agentscope.core.agent.Agent; +import io.agentscope.core.hook.ActingChunkEvent; import io.agentscope.core.hook.HookEvent; +import io.agentscope.core.hook.PostActingEvent; import io.agentscope.core.hook.PostCallEvent; +import io.agentscope.core.hook.PostReasoningEvent; import io.agentscope.core.hook.PreCallEvent; +import io.agentscope.core.hook.ReasoningChunkEvent; import io.agentscope.core.message.Msg; import io.agentscope.core.message.MsgRole; import io.agentscope.core.message.TextBlock; +import io.agentscope.core.message.ToolResultBlock; +import io.agentscope.core.message.ToolUseBlock; +import io.agentscope.core.tool.Toolkit; import java.io.IOException; import java.util.ArrayList; import java.util.List; +import java.util.Map; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.DisplayName; import org.junit.jupiter.api.Test; @@ -229,4 +238,150 @@ void testMultipleEvents() { verify(mockStudioClient, times(1)).pushMessage(msg1); verify(mockStudioClient, times(1)).pushMessage(msg2); } + + @Test + @DisplayName("Should forward reasoning chunk to Studio") + void testReasoningChunkEventForwards() { + // Mock push success + when(mockStudioClient.pushMessage(any(Msg.class))).thenReturn(Mono.empty()); + + // Build incremental and accumulated msg + Msg incremental = + Msg.builder() + .name("Assistant") + .role(MsgRole.ASSISTANT) + .content(TextBlock.builder().text("partial chunk").build()) + .build(); + Msg accumulated = + Msg.builder() + .name("Assistant") + .role(MsgRole.ASSISTANT) + .content(TextBlock.builder().text("accumulated so far").build()) + .build(); + + ReasoningChunkEvent event = + new ReasoningChunkEvent(mockAgent, "test-model", null, incremental, accumulated); + + Mono result = hook.onEvent(event); + + StepVerifier.create(result).expectNext(event).verifyComplete(); + + verify(mockStudioClient, times(1)) + .pushMessage( + argThat( + m -> { + Map md = m.getMetadata(); + return md != null + && "reasoning".equals(md.get("studio_event_type")) + && Boolean.FALSE.equals(md.get("studio_is_last")); + })); + } + + @Test + @DisplayName("Should forward final reasoning result to Studio") + void testPostReasoningEventForwards() { + // Mock push success + when(mockStudioClient.pushMessage(any(Msg.class))).thenReturn(Mono.empty()); + + Msg finalMsg = + Msg.builder() + .name("Assistant") + .role(MsgRole.ASSISTANT) + .content(TextBlock.builder().text("final reasoning").build()) + .build(); + + PostReasoningEvent event = new PostReasoningEvent(mockAgent, "test-model", null, finalMsg); + + Mono result = hook.onEvent(event); + + StepVerifier.create(result).expectNext(event).verifyComplete(); + + verify(mockStudioClient, times(1)) + .pushMessage( + argThat( + m -> { + Map md = m.getMetadata(); + return md != null + && "reasoning".equals(md.get("studio_event_type")) + && Boolean.TRUE.equals(md.get("studio_is_last")); + })); + } + + @Test + @DisplayName("Should forward acting (tool) chunk to Studio") + void testActingChunkEventForwards() { + // Mock push success + when(mockStudioClient.pushMessage(any(Msg.class))).thenReturn(Mono.empty()); + + Toolkit toolkit = new Toolkit(); + ToolUseBlock toolUse = + ToolUseBlock.builder().id("call-1").name("test_tool").input(Map.of()).build(); + ToolResultBlock chunk = ToolResultBlock.text("progress update"); + + ActingChunkEvent event = new ActingChunkEvent(mockAgent, toolkit, toolUse, chunk); + + Mono result = hook.onEvent(event); + + StepVerifier.create(result).expectNext(event).verifyComplete(); + + verify(mockStudioClient, times(1)) + .pushMessage( + argThat( + m -> { + // Expect a TOOL role message with metadata and a + // ToolResultBlock in content + Map md = m.getMetadata(); + boolean hasMeta = + md != null + && "tool_result" + .equals(md.get("studio_event_type")) + && Boolean.FALSE.equals( + md.get("studio_is_last")); + boolean isToolRole = m.getRole() == MsgRole.TOOL; + boolean hasToolResult = + !m.getContentBlocks( + io.agentscope.core.message + .ToolResultBlock.class) + .isEmpty(); + return hasMeta && isToolRole && hasToolResult; + })); + } + + @Test + @DisplayName("Should forward final acting (tool) result to Studio") + void testPostActingEventForwards() { + // Mock push success + when(mockStudioClient.pushMessage(any(Msg.class))).thenReturn(Mono.empty()); + + Toolkit toolkit = new Toolkit(); + ToolUseBlock toolUse = + ToolUseBlock.builder().id("call-1").name("test_tool").input(Map.of()).build(); + ToolResultBlock resultBlock = ToolResultBlock.text("final result"); + + PostActingEvent event = new PostActingEvent(mockAgent, toolkit, toolUse, resultBlock); + + Mono result = hook.onEvent(event); + + StepVerifier.create(result).expectNext(event).verifyComplete(); + + verify(mockStudioClient, times(1)) + .pushMessage( + argThat( + m -> { + Map md = m.getMetadata(); + boolean hasMeta = + md != null + && "tool_result" + .equals(md.get("studio_event_type")) + && Boolean.TRUE.equals( + md.get("studio_is_last")); + boolean isToolRole = m.getRole() == MsgRole.TOOL; + boolean hasToolResult = + !m.getContentBlocks( + io.agentscope.core.message + .ToolResultBlock.class) + .isEmpty(); + return hasMeta && isToolRole && hasToolResult; + })); + } }