10 changes: 10 additions & 0 deletions camel/societies/workforce/events.py
@@ -32,6 +32,7 @@ class WorkforceEventBase(BaseModel):
         "worker_deleted",
         "queue_status",
         "all_tasks_completed",
+        "task_streaming_chunk",
     ]
     metadata: Optional[Dict[str, Any]] = None
     timestamp: datetime = Field(
@@ -96,6 +97,14 @@ class TaskFailedEvent(WorkforceEventBase):
     worker_id: Optional[str] = None
 
 
+class TaskStreamingChunkEvent(WorkforceEventBase):
+    event_type: Literal["task_streaming_chunk"] = "task_streaming_chunk"
+    task_id: str
+    worker_id: str
+    chunk: str
+    chunk_index: int
+
+
 class AllTasksCompletedEvent(WorkforceEventBase):
     event_type: Literal["all_tasks_completed"] = "all_tasks_completed"
 
@@ -109,6 +118,7 @@ class QueueStatusEvent(WorkforceEventBase):
 
 
 WorkforceEvent = Union[
+    TaskStreamingChunkEvent,
     TaskDecomposedEvent,
     TaskCreatedEvent,
     TaskAssignedEvent,
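For reference, a minimal sketch of how the new event could be constructed and inspected, assuming Pydantic v2's model_dump and a default factory on the inherited timestamp field; the field values are made up for illustration:

from camel.societies.workforce.events import TaskStreamingChunkEvent

# Build an event for one streamed segment of a task's output.
event = TaskStreamingChunkEvent(
    task_id="task-0",
    worker_id="worker-0",
    chunk="partial output...",
    chunk_index=0,
)

# The Literal default pins event_type, so the event discriminates the
# WorkforceEvent union without being passed explicitly.
assert event.event_type == "task_streaming_chunk"
print(event.model_dump()["chunk"])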
58 changes: 53 additions & 5 deletions camel/societies/workforce/single_agent_worker.py
@@ -24,6 +24,7 @@
 from camel.agents import ChatAgent
 from camel.agents.chat_agent import AsyncStreamingChatAgentResponse
 from camel.logger import get_logger
+from camel.societies.workforce.events import TaskStreamingChunkEvent
 from camel.societies.workforce.prompts import PROCESS_TASK_PROMPT
 from camel.societies.workforce.structured_output_handler import (
     StructuredOutputHandler,
@@ -393,9 +394,32 @@ async def _process_task(
             # Handle streaming response
             if isinstance(response, AsyncStreamingChatAgentResponse):
                 content = ""
+                chunk_index = 0
                 async for chunk in response:
-                    if chunk.msg:
-                        content = chunk.msg.content
+                    if chunk.msg and chunk.msg.content:
+                        chunk_event = TaskStreamingChunkEvent(
+                            task_id=task.id,
+                            worker_id=self.node_id,
+                            chunk=chunk.msg.content,
+                            chunk_index=chunk_index,
+                        )
+                        if (
+                            hasattr(self, 'callback')
+                            and self.callback is not None
+                        ):
+                            try:
+                                await (
+                                    self.callback.log_task_streaming_chunk(
+                                        chunk_event
+                                    )
+                                )
+                            except Exception as e:
+                                logger.warning(
+                                    f"Failed to log streaming chunk: {e}"
+                                )
+
+                        content += chunk.msg.content
+                        chunk_index += 1
                 response_content = content
             else:
                 # Regular ChatAgentResponse

Collaborator (on the hasattr(self, 'callback') check): This assumes self.callback exists in SingleAgentWorker, but it doesn't. Workers have no way to access the callbacks registered in Workforce.

Collaborator: Maybe we need to add a mechanism to propagate callbacks from Workforce to Worker instances.

Collaborator: For the worker callback, I suggest implementing callback support at the BaseNode level, since all node types (both Worker and Workforce) could benefit from this change. However, it might be good to get some feedback from @Wendong-Fan on this idea.

Collaborator (on the awaited callback call): log_task_streaming_chunk is defined as a synchronous method in WorkforceCallback.
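One possible shape for the base-level propagation the reviewers suggest, sketched under the assumption that workers and Workforce share a common base class; the simplified classes and the set_callback/add_worker names are illustrative only, not the existing camel API:

from typing import List, Optional

from camel.societies.workforce.workforce_callback import WorkforceCallback


class BaseNodeSketch:
    # Stand-in for the shared base class: every node owns a callback slot,
    # so self.callback always exists and the hasattr() guard above becomes
    # unnecessary.
    def __init__(self) -> None:
        self.callback: Optional[WorkforceCallback] = None

    def set_callback(self, callback: WorkforceCallback) -> None:
        self.callback = callback


class WorkforceSketch(BaseNodeSketch):
    # Stand-in for Workforce: it pushes its registered callback down to each
    # child worker, so events emitted inside workers reach the same sink.
    def __init__(self) -> None:
        super().__init__()
        self._children: List[BaseNodeSketch] = []

    def add_worker(self, worker: BaseNodeSketch) -> None:
        self._children.append(worker)
        if self.callback is not None:
            worker.set_callback(self.callback)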
@@ -422,10 +446,34 @@ async def _process_task(
                 # Handle streaming response for native output
                 if isinstance(response, AsyncStreamingChatAgentResponse):
                     task_result = None
+                    chunk_index = 0
                     async for chunk in response:
-                        if chunk.msg and chunk.msg.parsed:
-                            task_result = chunk.msg.parsed
-                            response_content = chunk.msg.content
+                        if chunk.msg:
+                            if chunk.msg.content:
+                                chunk_event = TaskStreamingChunkEvent(
+                                    task_id=task.id,
+                                    worker_id=self.node_id,
+                                    chunk=chunk.msg.content,
+                                    chunk_index=chunk_index,
+                                )
+                                if (
+                                    hasattr(self, 'callback')
+                                    and self.callback is not None
+                                ):
+                                    try:
+                                        await self.callback.log_task_streaming_chunk(  # noqa: E501
+                                            chunk_event
+                                        )
+                                    except Exception as e:
+                                        logger.warning(
+                                            f"Failed to log chunk: {e}"
+                                        )
+                                chunk_index += 1
+
+                            if chunk.msg.parsed:
+                                task_result = chunk.msg.parsed
+                                response_content = chunk.msg.content
+
                     # If no parsed result found in streaming, create fallback
                     if task_result is None:
                         task_result = TaskResult(
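On the review note that log_task_streaming_chunk is synchronous: a hedged sketch of how the call sites above could invoke it without await. The emit_chunk helper is illustrative and not part of this PR; the imports follow the files shown in this diff.

from typing import Optional

from camel.logger import get_logger
from camel.societies.workforce.events import TaskStreamingChunkEvent
from camel.societies.workforce.workforce_callback import WorkforceCallback

logger = get_logger(__name__)


def emit_chunk(
    callback: Optional[WorkforceCallback],
    chunk_event: TaskStreamingChunkEvent,
) -> None:
    """Forward one streaming chunk to a synchronous callback sink."""
    if callback is None:
        return
    try:
        # A plain method call, not awaited, matching the synchronous
        # signature declared in WorkforceCallback.
        callback.log_task_streaming_chunk(chunk_event)
    except Exception as e:
        # Never let logging failures interrupt the streaming loop.
        logger.warning(f"Failed to log streaming chunk: {e}")

If a callback implementation may block on file or network I/O, the call could instead be offloaded from the async streaming loop with asyncio.to_thread(callback.log_task_streaming_chunk, chunk_event).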
5 changes: 5 additions & 0 deletions camel/societies/workforce/workforce_callback.py
@@ -23,6 +23,7 @@
     TaskDecomposedEvent,
     TaskFailedEvent,
     TaskStartedEvent,
+    TaskStreamingChunkEvent,
     WorkerCreatedEvent,
     WorkerDeletedEvent,
 )
@@ -72,3 +73,7 @@ def log_worker_deleted(self, event: WorkerDeletedEvent) -> None:
     @abstractmethod
     def log_all_tasks_completed(self, event: AllTasksCompletedEvent) -> None:
         pass
+
+    @abstractmethod
+    def log_task_streaming_chunk(self, event: TaskStreamingChunkEvent) -> None:
+        pass
27 changes: 27 additions & 0 deletions camel/societies/workforce/workforce_logger.py
@@ -25,6 +25,7 @@
     TaskDecomposedEvent,
     TaskFailedEvent,
     TaskStartedEvent,
+    TaskStreamingChunkEvent,
     WorkerCreatedEvent,
     WorkerDeletedEvent,
 )
@@ -49,6 +50,7 @@ def __init__(self, workforce_id: str):
         self._task_hierarchy: Dict[str, Dict[str, Any]] = {}
         self._worker_information: Dict[str, Dict[str, Any]] = {}
         self._initial_worker_logs: List[Dict[str, Any]] = []
+        self._streaming_chunks: Dict[str, List[Dict[str, Any]]] = {}
 
     def _log_event(self, event_type: str, **kwargs: Any) -> None:
         r"""Internal method to create and store a log entry.
@@ -67,6 +69,31 @@ def _log_event(self, event_type: str, **kwargs: Any) -> None:
         if event_type == 'worker_created':
             self._initial_worker_logs.append(log_entry)
 
+    def log_task_streaming_chunk(self, event: TaskStreamingChunkEvent) -> None:
+        r"""Logs a streaming chunk from task execution.
+
+        Args:
+            event (TaskStreamingChunkEvent): The streaming chunk event.
+        """
+        if event.task_id not in self._streaming_chunks:
+            self._streaming_chunks[event.task_id] = []
+
+        chunk_data = {
+            'chunk_index': event.chunk_index,
+            'chunk': event.chunk,
+            'worker_id': event.worker_id,
+        }
+        self._streaming_chunks[event.task_id].append(chunk_data)
+
+        self._log_event(
+            event_type=event.event_type,
+            task_id=event.task_id,
+            worker_id=event.worker_id,
+            chunk=event.chunk,
+            chunk_index=event.chunk_index,
+            metadata=event.metadata or {},
+        )
+
     def log_task_created(
         self,
         event: TaskCreatedEvent,
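A usage sketch for the per-task chunk buffer introduced above, assuming the logger class in workforce_logger.py is named WorkforceLogger; the helper itself is illustrative and reads the private _streaming_chunks attribute:

from camel.societies.workforce.workforce_logger import WorkforceLogger


def streamed_text(wf_logger: WorkforceLogger, task_id: str) -> str:
    # Rebuild the full streamed output of a task from the chunks recorded by
    # log_task_streaming_chunk, ordering defensively by chunk_index.
    chunks = wf_logger._streaming_chunks.get(task_id, [])
    ordered = sorted(chunks, key=lambda c: c['chunk_index'])
    return "".join(c['chunk'] for c in ordered)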