Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 12 additions & 2 deletions camel/societies/workforce/workforce.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
from .workforce_metrics import WorkforceMetrics

if TYPE_CHECKING:
from camel.responses import ChatAgentResponse
from camel.utils.context_utils import ContextUtility, WorkflowSummary

from colorama import Fore
Expand Down Expand Up @@ -1316,13 +1317,20 @@ def _cleanup_task_tracking(self, task_id: str) -> None:
del self._assignees[task_id]

def _decompose_task(
self, task: Task
self,
task: Task,
stream_callback: Optional[
Callable[["ChatAgentResponse"], None]
] = None,
) -> Union[List[Task], Generator[List[Task], None, None]]:
r"""Decompose the task into subtasks. This method will also set the
relationship between the task and its subtasks.

Args:
task (Task): The task to decompose.
stream_callback (Callable[[ChatAgentResponse], None], optional): A
callback function that receives each chunk (ChatAgentResponse)
during streaming decomposition.

Returns:
Union[List[Task], Generator[List[Task], None, None]]:
Expand All @@ -1341,7 +1349,9 @@ def _decompose_task(
)
)
self.task_agent.reset()
result = task.decompose(self.task_agent, decompose_prompt)
result = task.decompose(
self.task_agent, decompose_prompt, stream_callback=stream_callback
)

# Handle both streaming and non-streaming results
if isinstance(result, Generator):
Expand Down
46 changes: 40 additions & 6 deletions camel/tasks/task.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,12 +14,14 @@

import re
from enum import Enum
from types import GeneratorType
from typing import (
TYPE_CHECKING,
Any,
Callable,
Dict,
Generator,
Iterable,
List,
Literal,
Optional,
Expand All @@ -31,7 +33,7 @@

if TYPE_CHECKING:
from camel.agents import ChatAgent
from camel.agents.chat_agent import StreamingChatAgentResponse
from camel.responses import ChatAgentResponse
import uuid

from camel.logger import get_logger
Expand Down Expand Up @@ -409,6 +411,9 @@ def decompose(
agent: "ChatAgent",
prompt: Optional[str] = None,
task_parser: Callable[[str, str], List["Task"]] = parse_response,
stream_callback: Optional[
Callable[["ChatAgentResponse"], None]
] = None,
) -> Union[List["Task"], Generator[List["Task"], None, None]]:
r"""Decompose a task to a list of sub-tasks. Automatically detects
streaming or non-streaming based on agent configuration.
Expand All @@ -420,6 +425,10 @@ def decompose(
task_parser (Callable[[str, str], List[Task]], optional): A
function to extract Task from response. If not provided,
the default parse_response will be used.
stream_callback (Callable[[ChatAgentResponse], None], optional): A
callback function that receives each chunk (ChatAgentResponse)
during streaming. This allows tracking the decomposition
progress in real-time.

Returns:
Union[List[Task], Generator[List[Task], None, None]]: If agent is
Expand All @@ -441,21 +450,38 @@ def decompose(
# Auto-detect streaming based on response type
from camel.agents.chat_agent import StreamingChatAgentResponse

if isinstance(response, StreamingChatAgentResponse):
return self._decompose_streaming(response, task_parser)
else:
return self._decompose_non_streaming(response, task_parser)
is_streaming = isinstance(
response, StreamingChatAgentResponse
) or isinstance(response, GeneratorType)
if (
not is_streaming
and hasattr(response, "__iter__")
and not hasattr(response, "msg")
):
is_streaming = True

if is_streaming:
return self._decompose_streaming(
response, task_parser, stream_callback=stream_callback
)
return self._decompose_non_streaming(response, task_parser)

def _decompose_streaming(
self,
response: "StreamingChatAgentResponse",
response: Iterable,
task_parser: Callable[[str, str], List["Task"]],
stream_callback: Optional[
Callable[["ChatAgentResponse"], None]
] = None,
) -> Generator[List["Task"], None, None]:
r"""Handle streaming response for task decomposition.

Args:
response: Streaming response from agent
task_parser: Function to parse tasks from response
stream_callback (Callable[[ChatAgentResponse], None], optional): A
callback function that receives each chunk (ChatAgentResponse)
during streaming.

Yields:
List[Task]: New tasks as they are parsed from streaming response
Expand All @@ -466,6 +492,14 @@ def _decompose_streaming(
# Process streaming response
for chunk in response:
accumulated_content = chunk.msg.content
if stream_callback:
try:
stream_callback(chunk)
except Exception:
logger.warning(
"stream_callback failed during decomposition",
exc_info=True,
)

# Try to parse partial tasks from accumulated content
try:
Expand Down
Loading