Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 6 additions & 2 deletions camel/societies/workforce/workforce.py
Original file line number Diff line number Diff line change
Expand Up @@ -1316,7 +1316,9 @@ def _cleanup_task_tracking(self, task_id: str) -> None:
del self._assignees[task_id]

def _decompose_task(
self, task: Task
self,
task: Task,
stream_callback: Optional[Callable[[str], None]] = None,
) -> Union[List[Task], Generator[List[Task], None, None]]:
r"""Decompose the task into subtasks. This method will also set the
relationship between the task and its subtasks.
Expand All @@ -1341,7 +1343,9 @@ def _decompose_task(
)
)
self.task_agent.reset()
result = task.decompose(self.task_agent, decompose_prompt)
result = task.decompose(
self.task_agent, decompose_prompt, stream_callback=stream_callback
)

# Handle both streaming and non-streaming results
if isinstance(result, Generator):
Expand Down
34 changes: 28 additions & 6 deletions camel/tasks/task.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,12 +14,14 @@

import re
from enum import Enum
from types import GeneratorType
from typing import (
TYPE_CHECKING,
Any,
Callable,
Dict,
Generator,
Iterable,
List,
Literal,
Optional,
Expand All @@ -31,7 +33,6 @@

if TYPE_CHECKING:
from camel.agents import ChatAgent
from camel.agents.chat_agent import StreamingChatAgentResponse
import uuid

from camel.logger import get_logger
Expand Down Expand Up @@ -409,6 +410,7 @@ def decompose(
agent: "ChatAgent",
prompt: Optional[str] = None,
task_parser: Callable[[str, str], List["Task"]] = parse_response,
stream_callback: Optional[Callable[[str], None]] = None,
) -> Union[List["Task"], Generator[List["Task"], None, None]]:
r"""Decompose a task to a list of sub-tasks. Automatically detects
streaming or non-streaming based on agent configuration.
Expand Down Expand Up @@ -441,15 +443,27 @@ def decompose(
# Auto-detect streaming based on response type
from camel.agents.chat_agent import StreamingChatAgentResponse

if isinstance(response, StreamingChatAgentResponse):
return self._decompose_streaming(response, task_parser)
else:
return self._decompose_non_streaming(response, task_parser)
is_streaming = isinstance(
response, StreamingChatAgentResponse
) or isinstance(response, GeneratorType)
if (
not is_streaming
and hasattr(response, "__iter__")
and not hasattr(response, "msg")
):
is_streaming = True

if is_streaming:
return self._decompose_streaming(
response, task_parser, stream_callback=stream_callback
)
return self._decompose_non_streaming(response, task_parser)

def _decompose_streaming(
self,
response: "StreamingChatAgentResponse",
response: Iterable,
task_parser: Callable[[str, str], List["Task"]],
stream_callback: Optional[Callable[[str], None]] = None,
) -> Generator[List["Task"], None, None]:
r"""Handle streaming response for task decomposition.

Expand All @@ -466,6 +480,14 @@ def _decompose_streaming(
# Process streaming response
for chunk in response:
accumulated_content = chunk.msg.content
if stream_callback:
try:
stream_callback(accumulated_content)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is it possible to return the chunk object? This would allow developers to get more information.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

good suggestion! will do it!

except Exception:
logger.warning(
"stream_callback failed during decomposition",
exc_info=True,
)

# Try to parse partial tasks from accumulated content
try:
Expand Down
Loading