diff --git a/camel/agents/_utils.py b/camel/agents/_utils.py
index 415ab0a0ee..edae576813 100644
--- a/camel/agents/_utils.py
+++ b/camel/agents/_utils.py
@@ -25,44 +25,6 @@
 logger = logging.getLogger(__name__)


-def build_default_summary_prompt(conversation_text: str) -> str:
-    r"""Create the default prompt used for conversation summarization.
-
-    Args:
-        conversation_text (str): The conversation to be summarized.
-
-    Returns:
-        str: A formatted prompt instructing the model to produce a structured
-            markdown summary.
-    """
-    template = textwrap.dedent(
-        """\
-        Summarize the conversation below.
-        Produce markdown that strictly follows this outline and numbering:
-
-        Summary:
-        1. **Primary Request and Intent**:
-        2. **Key Concepts**:
-        3. **Errors and Fixes**:
-        4. **Problem Solving**:
-        5. **Pending Tasks**:
-        6. **Current Work**:
-        7. **Optional Next Step**:
-
-        Requirements:
-        - Use bullet lists under each section (`- item`). If a section has no
-          information, output `- None noted`.
-        - Keep the ordering, headings, and formatting as written above.
-        - Focus on concrete actions, findings, and decisions.
-        - Do not invent details that are not supported by the conversation.
-
-        Conversation:
-        {conversation_text}
-        """
-    )
-    return template.format(conversation_text=conversation_text)
-
-
 def generate_tool_prompt(tool_schema_list: List[Dict[str, Any]]) -> str:
     r"""Generates a tool prompt based on the provided tool schema list.

diff --git a/camel/agents/chat_agent.py b/camel/agents/chat_agent.py
index 3e8e89af08..3ee64d3a54 100644
--- a/camel/agents/chat_agent.py
+++ b/camel/agents/chat_agent.py
@@ -30,7 +30,6 @@
 import uuid
 import warnings
 from dataclasses import dataclass
-from datetime import datetime
 from pathlib import Path
 from typing import (
     TYPE_CHECKING,
@@ -57,7 +56,6 @@

 from camel.agents._types import ModelResponse, ToolCallRequest
 from camel.agents._utils import (
-    build_default_summary_prompt,
     convert_to_function_tool,
     convert_to_schema,
     get_info_dict,
@@ -103,6 +101,7 @@
 )
 from camel.utils.commons import dependencies_required
 from camel.utils.context_utils import ContextUtility
+from camel.utils.message_summarizer import MessageSummarizer

 TOKEN_LIMIT_ERROR_MARKERS = (
     "context_length_exceeded",
@@ -598,7 +597,8 @@ def __init__(
         self.retry_delay = max(0.0, retry_delay)
         self.step_timeout = step_timeout
         self._context_utility: Optional[ContextUtility] = None
-        self._context_summary_agent: Optional["ChatAgent"] = None
+        self._message_summarizer: Optional["MessageSummarizer"] = None
+        self._message_summarizer_lock = asyncio.Lock()
         self.stream_accumulate = stream_accumulate
         self._last_tool_call_record: Optional[ToolCallingRecord] = None
         self._last_tool_call_signature: Optional[str] = None
@@ -1636,239 +1636,17 @@ def summarize(
             stacklevel=2,
         )

-        result: Dict[str, Any] = {
-            "summary": "",
-            "file_path": None,
-            "status": "",
-        }
-
-        try:
-            # Use external context if set, otherwise create local one
-            if self._context_utility is None:
-                if working_directory is not None:
-                    self._context_utility = ContextUtility(
-                        working_directory=str(working_directory)
-                    )
-                else:
-                    self._context_utility = ContextUtility()
-            context_util = self._context_utility
-
-            # Get conversation directly from agent's memory
-            messages, _ = self.memory.get_context()
-
-            if not messages:
-                status_message = (
-                    "No conversation context available to summarize."
-                )
-                result["status"] = status_message
-                return result
-
-            # Convert messages to conversation text
-            conversation_lines = []
-            user_messages: List[str] = []
-            for message in messages:
-                role = message.get('role', 'unknown')
-                content = message.get('content', '')
-
-                # Skip summary messages if include_summaries is False
-                if not include_summaries and isinstance(content, str):
-                    # Check if this is a summary message by looking for marker
-                    if content.startswith('[CONTEXT_SUMMARY]'):
-                        continue
-
-                # Handle tool call messages (assistant calling tools)
-                tool_calls = message.get('tool_calls')
-                if tool_calls and isinstance(tool_calls, (list, tuple)):
-                    for tool_call in tool_calls:
-                        # Handle both dict and object formats
-                        if isinstance(tool_call, dict):
-                            func_name = tool_call.get('function', {}).get(
-                                'name', 'unknown_tool'
-                            )
-                            func_args_str = tool_call.get('function', {}).get(
-                                'arguments', '{}'
-                            )
-                        else:
-                            # Handle object format (Pydantic or similar)
-                            func_name = getattr(
-                                getattr(tool_call, 'function', None),
-                                'name',
-                                'unknown_tool',
-                            )
-                            func_args_str = getattr(
-                                getattr(tool_call, 'function', None),
-                                'arguments',
-                                '{}',
-                            )
-
-                        # Parse and format arguments for readability
-                        try:
-                            import json
-
-                            args_dict = json.loads(func_args_str)
-                            args_formatted = ', '.join(
-                                f"{k}={v}" for k, v in args_dict.items()
-                            )
-                        except (json.JSONDecodeError, ValueError, TypeError):
-                            args_formatted = func_args_str
-
-                        conversation_lines.append(
-                            f"[TOOL CALL] {func_name}({args_formatted})"
-                        )
-
-                # Handle tool response messages
-                elif role == 'tool':
-                    tool_name = message.get('name', 'unknown_tool')
-                    if not content:
-                        content = str(message.get('content', ''))
-                    conversation_lines.append(
-                        f"[TOOL RESULT] {tool_name} → {content}"
-                    )
-
-                # Handle regular content messages (user/assistant/system)
-                elif content:
-                    content = str(content)
-                    if role == 'user':
-                        user_messages.append(content)
-                    conversation_lines.append(f"{role}: {content}")
-
-            conversation_text = "\n".join(conversation_lines).strip()
-
-            if not conversation_text:
-                status_message = (
-                    "Conversation context is empty; skipping summary."
-                )
-                result["status"] = status_message
-                return result
-
-            if self._context_summary_agent is None:
-                self._context_summary_agent = ChatAgent(
-                    system_message=(
-                        "You are a helpful assistant that summarizes "
-                        "conversations"
-                    ),
-                    model=self.model_backend,
-                    agent_id=f"{self.agent_id}_context_summarizer",
-                )
-            else:
-                self._context_summary_agent.reset()
-
-            if summary_prompt:
-                prompt_text = (
-                    f"{summary_prompt.rstrip()}\n\n"
-                    f"AGENT CONVERSATION TO BE SUMMARIZED:\n"
-                    f"{conversation_text}"
-                )
-            else:
-                prompt_text = build_default_summary_prompt(conversation_text)
-
-            try:
-                # Use structured output if response_format is provided
-                if response_format:
-                    response = self._context_summary_agent.step(
-                        prompt_text, response_format=response_format
-                    )
-                else:
-                    response = self._context_summary_agent.step(prompt_text)
-            except Exception as step_exc:
-                error_message = (
-                    f"Failed to generate summary using model: {step_exc}"
-                )
-                logger.error(error_message)
-                result["status"] = error_message
-                return result
-
-            if not response.msgs:
-                status_message = (
-                    "Failed to generate summary from model response."
-                )
-                result["status"] = status_message
-                return result
-
-            summary_content = response.msgs[-1].content.strip()
-            if not summary_content:
-                status_message = "Generated summary is empty."
- result["status"] = status_message - return result - - # handle structured output if response_format was provided - structured_output = None - if response_format and response.msgs[-1].parsed: - structured_output = response.msgs[-1].parsed - - # determine filename: use provided filename, or extract from - # structured output, or generate timestamp - if filename: - base_filename = filename - elif structured_output and hasattr( - structured_output, 'task_title' - ): - # use task_title from structured output for filename - task_title = structured_output.task_title - clean_title = ContextUtility.sanitize_workflow_filename( - task_title - ) - base_filename = ( - f"{clean_title}_workflow" if clean_title else "workflow" - ) - else: - base_filename = f"context_summary_{datetime.now().strftime('%Y%m%d_%H%M%S')}" # noqa: E501 - - base_filename = Path(base_filename).with_suffix("").name - - metadata = context_util.get_session_metadata() - metadata.update( - { - "agent_id": self.agent_id, - "message_count": len(messages), - } - ) - - # convert structured output to custom markdown if present - if structured_output: - # convert structured output to custom markdown - summary_content = context_util.structured_output_to_markdown( - structured_data=structured_output, metadata=metadata - ) - if add_user_messages: - summary_content = self._append_user_messages_section( - summary_content, user_messages - ) - - # Save the markdown (either custom structured or default) - save_status = context_util.save_markdown_file( - base_filename, - summary_content, - title="Conversation Summary" - if not structured_output - else None, - metadata=metadata if not structured_output else None, - ) - - file_path = ( - context_util.get_working_directory() / f"{base_filename}.md" - ) - summary_content = ( - f"[CONTEXT_SUMMARY] The following is a summary of our " - f"conversation from a previous session: {summary_content}" + # Delegate to asummarize using asyncio.run + return asyncio.run( + self.asummarize( + filename=filename, + summary_prompt=summary_prompt, + response_format=response_format, + working_directory=working_directory, + include_summaries=include_summaries, + add_user_messages=add_user_messages, ) - # Prepare result dictionary - result_dict = { - "summary": summary_content, - "file_path": str(file_path), - "status": save_status, - "structured_summary": structured_output, - } - - result.update(result_dict) - logger.info("Conversation summary saved to %s", file_path) - return result - - except Exception as exc: - error_message = f"Failed to summarize conversation context: {exc}" - logger.error(error_message) - result["status"] = error_message - return result + ) async def asummarize( self, @@ -1912,251 +1690,27 @@ async def asummarize( path, status message, and optionally structured_summary if response_format was provided. """ - - result: Dict[str, Any] = { - "summary": "", - "file_path": None, - "status": "", - } - - try: - # Use external context if set, otherwise create local one - if self._context_utility is None: - if working_directory is not None: - self._context_utility = ContextUtility( - working_directory=str(working_directory) - ) - else: - self._context_utility = ContextUtility() - context_util = self._context_utility - - # Get conversation directly from agent's memory - messages, _ = self.memory.get_context() - - if not messages: - status_message = ( - "No conversation context available to summarize." 
+        # Get conversation directly from agent's memory
+        messages, _ = self.memory.get_context()
+
+        # Initialize MessageSummarizer if not already initialized (async-safe)
+        async with self._message_summarizer_lock:
+            if self._message_summarizer is None:
+                self._message_summarizer = MessageSummarizer(
+                    model_backend=self.model_backend.current_model
                )
-                result["status"] = status_message
-                return result
-
-            # Convert messages to conversation text
-            conversation_lines = []
-            user_messages: List[str] = []
-            for message in messages:
-                role = message.get('role', 'unknown')
-                content = message.get('content', '')
-
-                # Skip summary messages if include_summaries is False
-                if not include_summaries and isinstance(content, str):
-                    # Check if this is a summary message by looking for marker
-                    if content.startswith('[CONTEXT_SUMMARY]'):
-                        continue
-
-                # Handle tool call messages (assistant calling tools)
-                tool_calls = message.get('tool_calls')
-                if tool_calls and isinstance(tool_calls, (list, tuple)):
-                    for tool_call in tool_calls:
-                        # Handle both dict and object formats
-                        if isinstance(tool_call, dict):
-                            func_name = tool_call.get('function', {}).get(
-                                'name', 'unknown_tool'
-                            )
-                            func_args_str = tool_call.get('function', {}).get(
-                                'arguments', '{}'
-                            )
-                        else:
-                            # Handle object format (Pydantic or similar)
-                            func_name = getattr(
-                                getattr(tool_call, 'function', None),
-                                'name',
-                                'unknown_tool',
-                            )
-                            func_args_str = getattr(
-                                getattr(tool_call, 'function', None),
-                                'arguments',
-                                '{}',
-                            )
-
-                        # Parse and format arguments for readability
-                        try:
-                            import json
-
-                            args_dict = json.loads(func_args_str)
-                            args_formatted = ', '.join(
-                                f"{k}={v}" for k, v in args_dict.items()
-                            )
-                        except (json.JSONDecodeError, ValueError, TypeError):
-                            args_formatted = func_args_str
-                        conversation_lines.append(
-                            f"[TOOL CALL] {func_name}({args_formatted})"
-                        )
-
-                # Handle tool response messages
-                elif role == 'tool':
-                    tool_name = message.get('name', 'unknown_tool')
-                    if not content:
-                        content = str(message.get('content', ''))
-                    conversation_lines.append(
-                        f"[TOOL RESULT] {tool_name} → {content}"
-                    )
-
-                # Handle regular content messages (user/assistant/system)
-                elif content:
-                    content = str(content)
-                    if role == 'user':
-                        user_messages.append(content)
-                    conversation_lines.append(f"{role}: {content}")
-
-            conversation_text = "\n".join(conversation_lines).strip()
-
-            if not conversation_text:
-                status_message = (
-                    "Conversation context is empty; skipping summary."
-                )
-                result["status"] = status_message
-                return result
-
-            if self._context_summary_agent is None:
-                self._context_summary_agent = ChatAgent(
-                    system_message=(
-                        "You are a helpful assistant that summarizes "
-                        "conversations"
-                    ),
-                    model=self.model_backend,
-                    agent_id=f"{self.agent_id}_context_summarizer",
-                )
-            else:
-                self._context_summary_agent.reset()
-
-            if summary_prompt:
-                prompt_text = (
-                    f"{summary_prompt.rstrip()}\n\n"
-                    f"AGENT CONVERSATION TO BE SUMMARIZED:\n"
-                    f"{conversation_text}"
-                )
-            else:
-                prompt_text = build_default_summary_prompt(conversation_text)
-
-            try:
-                # Use structured output if response_format is provided
-                if response_format:
-                    response = await self._context_summary_agent.astep(
-                        prompt_text, response_format=response_format
-                    )
-                else:
-                    response = await self._context_summary_agent.astep(
-                        prompt_text
-                    )
-
-                # Handle streaming response
-                if isinstance(response, AsyncStreamingChatAgentResponse):
-                    # Collect final response
-                    final_response = await response
-                    response = final_response
-
-            except Exception as step_exc:
-                error_message = (
-                    f"Failed to generate summary using model: {step_exc}"
-                )
-                logger.error(error_message)
-                result["status"] = error_message
-                return result
-
-            if not response.msgs:
-                status_message = (
-                    "Failed to generate summary from model response."
-                )
-                result["status"] = status_message
-                return result
-
-            summary_content = response.msgs[-1].content.strip()
-            if not summary_content:
-                status_message = "Generated summary is empty."
-                result["status"] = status_message
-                return result
-
-            # handle structured output if response_format was provided
-            structured_output = None
-            if response_format and response.msgs[-1].parsed:
-                structured_output = response.msgs[-1].parsed
-
-            # determine filename: use provided filename, or extract from
-            # structured output, or generate timestamp
-            if filename:
-                base_filename = filename
-            elif structured_output and hasattr(
-                structured_output, 'task_title'
-            ):
-                # use task_title from structured output for filename
-                task_title = structured_output.task_title
-                clean_title = ContextUtility.sanitize_workflow_filename(
-                    task_title
-                )
-                base_filename = (
-                    f"{clean_title}_workflow" if clean_title else "workflow"
-                )
-            else:
-                base_filename = f"context_summary_{datetime.now().strftime('%Y%m%d_%H%M%S')}"  # noqa: E501
-
-            base_filename = Path(base_filename).with_suffix("").name
-
-            metadata = context_util.get_session_metadata()
-            metadata.update(
-                {
-                    "agent_id": self.agent_id,
-                    "message_count": len(messages),
-                }
-            )
-
-            # convert structured output to custom markdown if present
-            if structured_output:
-                # convert structured output to custom markdown
-                summary_content = context_util.structured_output_to_markdown(
-                    structured_data=structured_output, metadata=metadata
-                )
-            if add_user_messages:
-                summary_content = self._append_user_messages_section(
-                    summary_content, user_messages
-                )
-
-            # Save the markdown (either custom structured or default)
-            save_status = context_util.save_markdown_file(
-                base_filename,
-                summary_content,
-                title="Conversation Summary"
-                if not structured_output
-                else None,
-                metadata=metadata if not structured_output else None,
-            )
-
-            file_path = (
-                context_util.get_working_directory() / f"{base_filename}.md"
-            )
-
-            summary_content = (
-                f"[CONTEXT_SUMMARY] The following is a summary of our "
-                f"conversation from a previous session: {summary_content}"
-            )
-
-            # Prepare result dictionary
-            result_dict = {
-                "summary": summary_content,
-                "file_path": str(file_path),
"status": save_status, - "structured_summary": structured_output, - } - - result.update(result_dict) - logger.info("Conversation summary saved to %s", file_path) - return result - - except Exception as exc: - error_message = f"Failed to summarize conversation context: {exc}" - logger.error(error_message) - result["status"] = error_message - return result + # Delegate to MessageSummarizer + return await self._message_summarizer.asummarize( + messages=messages, + agent_id=self.agent_id, + filename=filename, + summary_prompt=summary_prompt, + response_format=response_format, + working_directory=working_directory, + include_summaries=include_summaries, + add_user_messages=add_user_messages, + ) def clear_memory(self) -> None: r"""Clear the agent's memory and reset to initial state. diff --git a/camel/utils/message_summarizer.py b/camel/utils/message_summarizer.py index 26115b6797..381953a191 100644 --- a/camel/utils/message_summarizer.py +++ b/camel/utils/message_summarizer.py @@ -11,14 +11,21 @@ # See the License for the specific language governing permissions and # limitations under the License. # ========= Copyright 2023-2024 @ CAMEL-AI.org. All Rights Reserved. ========= -from typing import List, Optional +import json +import textwrap +from datetime import datetime +from pathlib import Path +from typing import Any, Dict, List, Optional, Type, Union from pydantic import BaseModel, Field -from camel.agents import ChatAgent -from camel.messages import BaseMessage +from camel.logger import get_logger +from camel.messages import BaseMessage, OpenAIMessage from camel.models import BaseModelBackend, ModelFactory from camel.types import ModelPlatformType, ModelType +from camel.utils.context_utils import ContextUtility + +logger = get_logger(__name__) class MessageSummary(BaseModel): @@ -80,6 +87,10 @@ def __init__( ) else: self.model_backend = model_backend + + # Import ChatAgent here to avoid circular import + from camel.agents import ChatAgent + self.agent = ChatAgent( BaseMessage.make_assistant_message( role_name="Message Summarizer", @@ -105,6 +116,157 @@ def __init__( model=self.model_backend, ) + def build_default_summary_prompt(self, conversation_text: str) -> str: + r"""Create the default prompt used for conversation summarization. + + Args: + conversation_text (str): The conversation to be summarized. + + Returns: + str: A formatted prompt instructing the model to produce a + structured markdown summary. + """ + template = textwrap.dedent( + """\ + Summarize the conversation below. + Produce markdown that strictly follows this outline and numbering: + + Summary: + 1. **Primary Request and Intent**: + 2. **Key Concepts**: + 3. **Errors and Fixes**: + 4. **Problem Solving**: + 5. **Pending Tasks**: + 6. **Current Work**: + 7. **Optional Next Step**: + + Requirements: + - Use bullet lists under each section (`- item`). If a section + has no information, output `- None noted`. + - Keep the ordering, headings, and formatting as written above. + - Focus on concrete actions, findings, and decisions. + - Do not invent details that are not supported by the conversation. + + Conversation: + {conversation_text} + """ + ) + return template.format(conversation_text=conversation_text) + + @staticmethod + def _append_user_messages_section( + summary_content: str, user_messages: List[str] + ) -> str: + r"""Append a section with user messages to the summary content. + + Args: + summary_content (str): The existing summary content. + user_messages (List[str]): List of user messages to append. 
+
+        Returns:
+            str: Updated summary content with user messages section.
+        """
+        section_title = "- **All User Messages**:"
+        sanitized_messages: List[str] = []
+        for msg in user_messages:
+            if not isinstance(msg, str):
+                msg = str(msg)
+            cleaned = " ".join(msg.strip().splitlines())
+            if cleaned:
+                sanitized_messages.append(cleaned)
+
+        bullet_block = (
+            "\n".join(f"- {m}" for m in sanitized_messages)
+            if sanitized_messages
+            else "- None noted"
+        )
+        user_section = f"{section_title}\n{bullet_block}"
+
+        summary_clean = summary_content.rstrip()
+        return f"{summary_clean}\n\n{user_section}"
+
+    @staticmethod
+    def _convert_messages_to_text(
+        messages: List[OpenAIMessage], include_summaries: bool
+    ) -> tuple[str, List[str]]:
+        r"""Convert OpenAI messages to conversation text.
+
+        Args:
+            messages (List[OpenAIMessage]): List of OpenAI messages.
+            include_summaries (bool): Whether to include summary messages.
+
+        Returns:
+            tuple[str, List[str]]: Conversation text and list of user messages.
+        """
+        conversation_lines = []
+        user_messages: List[str] = []
+        for message in messages:
+            role = message.get('role', 'unknown')
+            content = message.get('content', '')
+
+            # Skip summary messages if include_summaries is False
+            if not include_summaries and isinstance(content, str):
+                # Check if this is a summary message by looking for marker
+                if content.startswith('[CONTEXT_SUMMARY]'):
+                    continue
+
+            # Handle tool call messages (assistant calling tools)
+            tool_calls = message.get('tool_calls')
+            if tool_calls and isinstance(tool_calls, (list, tuple)):
+                for tool_call in tool_calls:
+                    # Handle both dict and object formats
+                    if isinstance(tool_call, dict):
+                        func_name = tool_call.get('function', {}).get(
+                            'name', 'unknown_tool'
+                        )
+                        func_args_str = tool_call.get('function', {}).get(
+                            'arguments', '{}'
+                        )
+                    else:
+                        # Handle object format (Pydantic or similar)
+                        func_name = getattr(
+                            getattr(tool_call, 'function', None),
+                            'name',
+                            'unknown_tool',
+                        )
+                        func_args_str = getattr(
+                            getattr(tool_call, 'function', None),
+                            'arguments',
+                            '{}',
+                        )
+
+                    # Parse and format arguments for readability
+                    try:
+                        args_dict = json.loads(func_args_str)
+                        args_formatted = ', '.join(
+                            f"{k}={v}" for k, v in args_dict.items()
+                        )
+                    except (json.JSONDecodeError, ValueError, TypeError):
+                        args_formatted = func_args_str
+
+                    conversation_lines.append(
+                        f"[TOOL CALL] {func_name}({args_formatted})"
+                    )
+
+            # Handle tool response messages
+            elif role == 'tool':
+                tool_name = message.get('name', 'unknown_tool')
+                if not content:
+                    content = str(message.get('content', ''))
+                conversation_lines.append(
+                    f"[TOOL RESULT] {tool_name} → {content}"
+                )
+
+            # Handle regular content messages (user/assistant/system)
+            elif content:
+                content = str(content)
+                if role == 'user':
+                    user_messages.append(content)
+                conversation_lines.append(f"{role}: {content}")
+
+        conversation_text = "\n".join(conversation_lines).strip()
+        return conversation_text, user_messages
+
     def summarize(self, messages: List[BaseMessage]) -> MessageSummary:
         r"""Generate a structured summary of the provided messages.

@@ -146,3 +308,211 @@ def summarize(self, messages: List[BaseMessage]) -> MessageSummary:
             raise ValueError("The parsed response is not a MessageSummary.")

         return summary
+
+    async def asummarize(
+        self,
+        messages: List[OpenAIMessage],
+        agent_id: str = "agent",
+        filename: Optional[str] = None,
+        summary_prompt: Optional[str] = None,
+        response_format: Optional[Type[BaseModel]] = None,
+        working_directory: Optional[Union[str, Path]] = None,
+        include_summaries: bool = False,
+        add_user_messages: bool = True,
+    ) -> Dict[str, Any]:
+        r"""Asynchronously summarize conversation messages and persist to file.
+
+        This method converts OpenAI-format messages to a readable conversation
+        text, generates a summary using an LLM, and saves the result to a
+        markdown file.
+
+        Args:
+            messages (List[OpenAIMessage]): List of OpenAI messages to
+                summarize.
+            agent_id (str): Identifier for the agent generating the summary.
+                (default: :obj:`"agent"`)
+            filename (Optional[str]): The base filename (without extension) to
+                use for the markdown file. Defaults to a timestamped name when
+                not provided.
+            summary_prompt (Optional[str]): Custom prompt for the summarizer.
+                When omitted, a default prompt that produces a structured
+                markdown summary is used.
+            response_format (Optional[Type[BaseModel]]): A Pydantic model
+                defining the expected structure of the response. If provided,
+                the summary will be generated as structured output and included
+                in the result.
+            working_directory (Optional[Union[str, Path]]): Optional directory
+                to save the markdown summary file. If provided, overrides the
+                default directory used by ContextUtility.
+            include_summaries (bool): Whether to include previously generated
+                summaries in the content to be summarized. If False (default),
+                only non-summary messages will be summarized. If True, all
+                messages including previous summaries will be summarized
+                (full compression). (default: :obj:`False`)
+            add_user_messages (bool): Whether to add user messages to the
+                summary. (default: :obj:`True`)
+
+        Returns:
+            Dict[str, Any]: A dictionary containing the summary text, file
+                path, status message, and optionally structured_summary if
+                response_format was provided.
+        """
+        result: Dict[str, Any] = {
+            "summary": "",
+            "file_path": None,
+            "status": "",
+        }
+
+        try:
+            # Initialize context utility
+            if working_directory is not None:
+                context_util = ContextUtility(
+                    working_directory=str(working_directory)
+                )
+            else:
+                context_util = ContextUtility()
+
+            if not messages:
+                status_message = (
+                    "No conversation context available to summarize."
+                )
+                result["status"] = status_message
+                return result
+
+            # Convert messages to conversation text
+            conversation_text, user_messages = self._convert_messages_to_text(
+                messages, include_summaries
+            )
+
+            if not conversation_text:
+                status_message = (
+                    "Conversation context is empty; skipping summary."
+                )
+                result["status"] = status_message
+                return result
+
+            # Reset agent for fresh summarization
+            self.agent.reset()
+
+            # Build prompt
+            if summary_prompt:
+                prompt_text = (
+                    f"{summary_prompt.rstrip()}\n\n"
+                    f"AGENT CONVERSATION TO BE SUMMARIZED:\n"
+                    f"{conversation_text}"
+                )
+            else:
+                prompt_text = self.build_default_summary_prompt(
+                    conversation_text
+                )
+
+            try:
+                # Use structured output if response_format is provided
+                if response_format:
+                    response = await self.agent.astep(
+                        prompt_text, response_format=response_format
+                    )
+                else:
+                    response = await self.agent.astep(prompt_text)
+
+            except Exception as step_exc:
+                error_message = (
+                    f"Failed to generate summary using model: {step_exc}"
+                )
+                logger.error(error_message)
+                result["status"] = error_message
+                return result
+
+            if not response.msgs:
+                status_message = (
+                    "Failed to generate summary from model response."
+                )
+                result["status"] = status_message
+                return result
+
+            summary_content = response.msgs[-1].content.strip()
+            if not summary_content:
+                status_message = "Generated summary is empty."
+                result["status"] = status_message
+                return result
+
+            # Handle structured output if response_format was provided
+            structured_output = None
+            if response_format and response.msgs[-1].parsed:
+                structured_output = response.msgs[-1].parsed
+
+            # Determine filename: use provided filename, or extract from
+            # structured output, or generate timestamp
+            if filename:
+                base_filename = filename
+            elif structured_output and hasattr(
+                structured_output, 'task_title'
+            ):
+                # Use task_title from structured output for filename
+                task_title = structured_output.task_title
+                clean_title = ContextUtility.sanitize_workflow_filename(
+                    task_title
+                )
+                base_filename = (
+                    f"{clean_title}_workflow" if clean_title else "workflow"
+                )
+            else:
+                base_filename = f"context_summary_{datetime.now().strftime('%Y%m%d_%H%M%S')}"  # noqa: E501
+
+            base_filename = Path(base_filename).with_suffix("").name
+
+            metadata = context_util.get_session_metadata()
+            metadata.update(
+                {
+                    "agent_id": agent_id,
+                    "message_count": len(messages),
+                }
+            )
+
+            # Convert structured output to custom markdown if present
+            if structured_output:
+                # Convert structured output to custom markdown
+                summary_content = context_util.structured_output_to_markdown(
+                    structured_data=structured_output, metadata=metadata
+                )
+            if add_user_messages:
+                summary_content = self._append_user_messages_section(
+                    summary_content, user_messages
+                )
+
+            # Save the markdown (either custom structured or default)
+            save_status = context_util.save_markdown_file(
+                base_filename,
+                summary_content,
+                title="Conversation Summary"
+                if not structured_output
+                else None,
+                metadata=metadata if not structured_output else None,
+            )
+
+            file_path = (
+                context_util.get_working_directory() / f"{base_filename}.md"
+            )
+
+            summary_content = (
+                f"[CONTEXT_SUMMARY] The following is a summary of our "
+                f"conversation from a previous session: {summary_content}"
+            )
+
+            # Prepare result dictionary
+            result_dict = {
+                "summary": summary_content,
+                "file_path": str(file_path),
+                "status": save_status,
+                "structured_summary": structured_output,
+            }
+
+            result.update(result_dict)
+            logger.info("Conversation summary saved to %s", file_path)
+            return result
+
+        except Exception as exc:
+            error_message = f"Failed to summarize conversation context: {exc}"
+            logger.error(error_message)
+            result["status"] = error_message
+            return result
diff --git a/examples/utils/message_summarizer_example.py b/examples/utils/message_summarizer_example.py
new file mode 100644
index 0000000000..8c991699d3
--- /dev/null
+++ b/examples/utils/message_summarizer_example.py
@@ -0,0 +1,388 @@
+# ========= Copyright 2023-2024 @ CAMEL-AI.org. All Rights Reserved. =========
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+# ========= Copyright 2023-2024 @ CAMEL-AI.org. All Rights Reserved. =========
+"""
+Example demonstrating the use of MessageSummarizer for conversation
+summarization.
+
+This example shows:
+1. Using MessageSummarizer standalone for structured summaries
+2. Using ChatAgent.asummarize() which delegates to MessageSummarizer
+3. Custom prompts and working directories
+4. Parallel summarization of multiple agents
+5. ContextSummarizerToolkit integration with MessageSummarizer
+6. Adding toolkit tools to agents for self-managed memory
+"""
+
+import asyncio
+
+from camel.agents import ChatAgent
+from camel.messages import BaseMessage
+from camel.models import ModelFactory
+from camel.toolkits import ContextSummarizerToolkit
+from camel.types import ModelPlatformType, ModelType
+from camel.utils.message_summarizer import MessageSummarizer
+
+
+def example_1_standalone_message_summarizer():
+    """Example 1: Using MessageSummarizer standalone for summaries."""
+    print("\n" + "=" * 70)
+    print("Example 1: Standalone MessageSummarizer with Structured Output")
+    print("=" * 70 + "\n")
+
+    # Create a MessageSummarizer instance
+    model = ModelFactory.create(
+        model_platform=ModelPlatformType.OPENAI,
+        model_type=ModelType.GPT_4O_MINI,
+    )
+    summarizer = MessageSummarizer(model_backend=model)
+
+    # Create some sample messages
+    messages = [
+        BaseMessage.make_user_message(
+            role_name="User",
+            content=(
+                "I need help building a Python web scraper for news "
+                "articles."
+            ),
+        ),
+        BaseMessage.make_assistant_message(
+            role_name="Assistant",
+            content=(
+                "I can help you build a web scraper. We'll use "
+                "BeautifulSoup and requests."
+            ),
+        ),
+        BaseMessage.make_user_message(
+            role_name="User",
+            content="Great! Can we also add support for handling pagination?",
+        ),
+        BaseMessage.make_assistant_message(
+            role_name="Assistant",
+            content=(
+                "Yes, we can iterate through pages by following 'next' "
+                "links."
+            ),
+        ),
+    ]
+
+    # Generate structured summary using the summarize method
+    summary = summarizer.summarize(messages)
+
+    print("Structured Summary:")
+    print(f"  Summary: {summary.summary}")
+    print(f"  Participants: {summary.participants}")
+    print(f"  Key Topics: {summary.key_topics_and_entities}")
+    print(f"  Decisions: {summary.decisions_and_outcomes}")
+    print(f"  Action Items: {summary.action_items}")
+    print(f"  Progress: {summary.progress_on_main_task}")
+
+
+async def example_2_chat_agent_asummarize():
+    """Example 2: Using ChatAgent.asummarize() for summarization."""
+    print("\n" + "=" * 70)
+    print("Example 2: ChatAgent.asummarize() with File Persistence")
+    print("=" * 70 + "\n")
+
+    # Create a ChatAgent
+    agent = ChatAgent(
+        system_message=BaseMessage.make_assistant_message(
+            role_name="Assistant",
+            content="You are a helpful coding assistant.",
+        ),
+        model=ModelFactory.create(
+            model_platform=ModelPlatformType.OPENAI,
+            model_type=ModelType.GPT_4O_MINI,
+        ),
+    )
+
+    # Simulate a conversation
+    user_messages = [
+        "Can you help me optimize a SQL query?",
+        "It's taking too long to run on large datasets.",
+        "The query joins 3 tables and has multiple WHERE clauses.",
+    ]
+
+    for msg in user_messages:
+        agent.step(msg)
+
+    # Summarize the conversation and save to file
+    result = await agent.asummarize(
+        filename="sql_optimization_session",
+        working_directory="./conversation_summaries",
+        include_summaries=False,
+        add_user_messages=True,
+    )
+
+    print("Summary Result:")
+    print(f"  Status: {result['status']}")
+    print(f"  File Path: {result['file_path']}")
+    print(f"  Summary Preview: {result['summary'][:200]}...")
+
+
+async def example_3_custom_prompt():
+    """Example 3: Using custom summary prompts."""
+    print("\n" + "=" * 70)
+    print("Example 3: Custom Summary Prompt")
+    print("=" * 70 + "\n")
+
+    # Create a ChatAgent
+    agent = ChatAgent(
+        system_message=BaseMessage.make_assistant_message(
+            role_name="Assistant",
+            content="You are a project management assistant.",
+        ),
+        model=ModelFactory.create(
+            model_platform=ModelPlatformType.OPENAI,
+            model_type=ModelType.GPT_4O_MINI,
+        ),
+    )
+
+    # Simulate a project discussion
+    messages = [
+        "We need to discuss the Q4 roadmap.",
+        "What are the key features we should prioritize?",
+        "Let's focus on user authentication and payment integration.",
+    ]
+
+    for msg in messages:
+        agent.step(msg)
+
+    # Custom prompt for project-focused summary
+    custom_prompt = """
+    Summarize the following project discussion.
+    Focus on:
+    1. Project goals and objectives
+    2. Key features and priorities
+    3. Timeline and milestones (if mentioned)
+    4. Team assignments (if mentioned)
+    5. Next steps and action items
+
+    Provide a concise, actionable summary.
+ """ + + # Summarize with custom prompt + result = await agent.asummarize( + summary_prompt=custom_prompt, + filename="q4_roadmap_discussion", + working_directory="./project_summaries", + ) + + print("Custom Summary Result:") + print(f" Status: {result['status']}") + print(f" File Path: {result['file_path']}") + + +async def example_4_parallel_summarization(): + """Example 4: Parallel summarization of multiple agent conversations.""" + print("\n" + "=" * 70) + print("Example 4: Parallel Summarization of Multiple Agents") + print("=" * 70 + "\n") + + # Create multiple agents for different tasks + agent_1 = ChatAgent( + system_message=BaseMessage.make_assistant_message( + role_name="CodeReviewer", + content="You are a code review assistant.", + ), + model=ModelFactory.create( + model_platform=ModelPlatformType.OPENAI, + model_type=ModelType.GPT_4O_MINI, + ), + agent_id="code_reviewer", + ) + + agent_2 = ChatAgent( + system_message=BaseMessage.make_assistant_message( + role_name="TestWriter", + content="You are a test writing assistant.", + ), + model=ModelFactory.create( + model_platform=ModelPlatformType.OPENAI, + model_type=ModelType.GPT_4O_MINI, + ), + agent_id="test_writer", + ) + + # Simulate conversations + agent_1.step("Review this function for performance issues.") + agent_2.step("Write unit tests for the authentication module.") + + # Summarize both agents in parallel + results = await asyncio.gather( + agent_1.asummarize(filename="code_review_session"), + agent_2.asummarize(filename="test_writing_session"), + ) + + print("Parallel Summarization Results:") + for i, result in enumerate(results, 1): + print(f" Agent {i}:") + print(f" Status: {result['status']}") + print(f" File: {result['file_path']}") + + +def example_5_context_summarizer_toolkit(): + """Example 5: Using ContextSummarizerToolkit with MessageSummarizer.""" + print("\n" + "=" * 70) + print("Example 5: ContextSummarizerToolkit Integration") + print("=" * 70 + "\n") + + # Create a ChatAgent + agent = ChatAgent( + system_message=BaseMessage.make_assistant_message( + role_name="Assistant", + content="You are a helpful assistant with context management.", + ), + model=ModelFactory.create( + model_platform=ModelPlatformType.OPENAI, + model_type=ModelType.GPT_4O_MINI, + ), + ) + + # Create ContextSummarizerToolkit with custom prompt template + custom_template = """ + You are analyzing a technical conversation. Extract: + - Main technical challenges discussed + - Solutions implemented or proposed + - Tools and technologies mentioned + - Unresolved issues or blockers + """ + + toolkit = ContextSummarizerToolkit( + agent=agent, + working_directory="./toolkit_summaries", + summary_prompt_template=custom_template, + ) + + # Simulate a long conversation that needs compression + messages = [ + "I'm working on a microservices architecture.", + "We need to implement service discovery.", + "Should we use Consul or Eureka?", + "Let's use Consul with Docker.", + "How do we handle authentication across services?", + "We should implement JWT tokens.", + "What about rate limiting?", + "We can use Redis for distributed rate limiting.", + "We also need to set up monitoring.", + "Let's use Prometheus and Grafana.", + # ... 
+        # ... Many more messages to simulate memory pressure
+    ]
+
+    for msg in messages:
+        agent.step(msg)
+
+    # Check if we should compress context
+    should_compress = toolkit.should_compress_context(
+        message_limit=10,  # Low limit for demo purposes
+    )
+    print(f"Should compress context: {should_compress}")
+
+    if should_compress:
+        # Use the toolkit to summarize and compress
+        result = toolkit.summarize_full_conversation_history()
+        print(f"\nCompression Result: {result}")
+
+    # Get memory info
+    memory_info = toolkit.get_conversation_memory_info()
+    print(f"\nMemory Info:\n{memory_info}")
+
+    # Search through conversation history
+    search_results = toolkit.search_full_conversation_history(
+        keywords=["authentication", "JWT", "tokens"],
+        top_k=3,
+    )
+    print(f"\nSearch Results:\n{search_results}")
+
+    # Get the current summary
+    current_summary = toolkit.get_current_summary()
+    if current_summary:
+        print(f"\nCurrent Summary:\n{current_summary[:200]}...")
+
+
+def example_6_toolkit_with_agent_tools():
+    """Example 6: Using toolkit tools with an agent."""
+    print("\n" + "=" * 70)
+    print("Example 6: Agent with ContextSummarizer Tools")
+    print("=" * 70 + "\n")
+
+    # Create a ChatAgent with tools
+    agent = ChatAgent(
+        system_message=BaseMessage.make_assistant_message(
+            role_name="Assistant",
+            content=(
+                "You are a helpful assistant with memory management tools. "
+                "You can summarize conversations, search history, and check "
+                "memory status."
+            ),
+        ),
+        model=ModelFactory.create(
+            model_platform=ModelPlatformType.OPENAI,
+            model_type=ModelType.GPT_4O_MINI,
+        ),
+    )
+
+    # Create toolkit and add its tools to the agent
+    toolkit = ContextSummarizerToolkit(
+        agent=agent,
+        working_directory="./agent_with_tools_summaries",
+    )
+
+    # Add toolkit tools to agent
+    agent.add_tools(toolkit.get_tools())
+
+    # Simulate conversation
+    print("Simulating conversation with memory management tools...")
+
+    # Agent can now use summarization tools when needed
+    print("\nAvailable Tools:")
+    for tool in toolkit.get_tools():
+        print(f"  - {tool.get_function_name()}")
+
+    # The agent can now call these tools during conversations to manage
+    # its own memory, especially useful for long-running conversations
+    print("\nAgent now has access to:")
+    print("  1. summarize_full_conversation_history()")
+    print("  2. search_full_conversation_history(keywords, top_k)")
+    print("  3. get_conversation_memory_info()")
+    print(
+        "\nThese tools use MessageSummarizer internally for unified "
+        "summarization!"
+    )
+
+
+def main():
+    """Run all examples."""
+    print("\n" + "=" * 70)
+    print("MessageSummarizer Examples")
+    print("=" * 70)
+
+    # Example 1: Standalone MessageSummarizer (synchronous)
+    example_1_standalone_message_summarizer()
+
+    # Examples 2-4: Async examples
+    asyncio.run(example_2_chat_agent_asummarize())
+    asyncio.run(example_3_custom_prompt())
+    asyncio.run(example_4_parallel_summarization())
+
+    # Examples 5-6: ContextSummarizerToolkit integration
+    example_5_context_summarizer_toolkit()
+    example_6_toolkit_with_agent_tools()
+
+    print("\n" + "=" * 70)
+    print("All examples completed!")
+    print("=" * 70 + "\n")
+
+
+if __name__ == "__main__":
+    main()
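
Usage note (not part of the patch): a minimal sketch of calling the new MessageSummarizer.asummarize directly, which is the path ChatAgent.summarize/asummarize now delegates to. The class, method, and keyword arguments are taken from the diff above; the model setup mirrors the example file, while the literal messages, agent id, filename, and directory are hypothetical placeholders.

import asyncio

from camel.models import ModelFactory
from camel.types import ModelPlatformType, ModelType
from camel.utils.message_summarizer import MessageSummarizer


async def main():
    # Same backend setup as in the example file
    model = ModelFactory.create(
        model_platform=ModelPlatformType.OPENAI,
        model_type=ModelType.GPT_4O_MINI,
    )
    summarizer = MessageSummarizer(model_backend=model)

    # OpenAI-format messages, e.g. as returned by agent.memory.get_context();
    # the contents here are hypothetical
    messages = [
        {"role": "user", "content": "Help me plan a database migration."},
        {"role": "assistant", "content": "Start with a schema snapshot."},
    ]

    result = await summarizer.asummarize(
        messages=messages,
        agent_id="demo_agent",  # hypothetical identifier
        filename="demo_summary",  # hypothetical base filename
        working_directory="./summaries",  # hypothetical directory
    )
    # The result dict carries the summary text, file path, and save status
    print(result["status"], result["file_path"])


asyncio.run(main())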