Conversation

@Soulter Soulter (Member) commented Jan 4, 2026

closes: #4148

related to PR: #4178

Co-authored-by: @kawayiYokami

TODOs:

  • use TokenUsage to calculate context token usage.
  • fully test

Modifications

  • This is NOT a breaking change.

Screenshots or Test Results


Checklist

  • 😊 If the PR adds new features, they have been discussed with the authors through issues/emails, etc.
  • 👀 My changes have been well-tested, and "Verification Steps" and "Screenshots" have been provided above.
  • 🤓 I have ensured that no new dependencies are introduced, or, if new dependencies are introduced, that they have been added to the appropriate locations in requirements.txt and pyproject.toml.
  • 😮 My changes do not introduce malicious code.

Summary by Sourcery

Introduce a configurable context management and compression system for local agents, including LLM-based summarization when context limits are reached.

New Features:

  • Add a ContextManager with pluggable compressors (truncate-by-turns and LLM-based summarization) to manage conversation history within token limits.
  • Support provider-specific max_context_tokens configuration, auto-filled from model metadata in both backend and dashboard when available.
  • Expose context management options in provider settings, including strategies for handling context overflows and LLM compression parameters.

Enhancements:

  • Refactor local agent runner to delegate context truncation/compression to the new ContextManager and ContextTruncator, and wire these through the processing pipeline.
  • Adjust dashboard config UI layout and inputs for better slider/text field alignment and readability.

Co-authored-by: kawayiYokami <[email protected]>
@auto-assign auto-assign bot requested review from Fridemn and anka-afk January 4, 2026 16:31
@dosubot dosubot bot added size:XL This PR changes 500-999 lines, ignoring generated files. area:core The bug / feature is about astrbot's core, backend labels Jan 4, 2026
Contributor

@sourcery-ai sourcery-ai bot left a comment

Hey - I've found 7 issues and left some high-level feedback:

  • In ContextTruncator.truncate_by_turns, the slice (-(keep_most_recent_turns - dequeue_turns + 1) * 2 :) will behave unexpectedly or even drop almost everything when keep_most_recent_turns <= dequeue_turns or very small; consider clamping dequeue_turns relative to keep_most_recent_turns and adding explicit guards/tests for these edge cases.
  • The default LLM compression instruction string is duplicated in both the config default and LLMSummaryCompressor; consider centralizing this default (e.g., a shared constant) so future edits don’t drift between backend logic and configuration.
  • The TokenCounter._estimate_tokens heuristic (0.6/0.3 per-character) is quite ad‑hoc and may diverge significantly from actual model tokenization; if this value is used to gate compression behavior, consider making the heuristic configurable per-provider/model or clearly constraining its use to non-critical, approximate decisions.
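
Regarding the heuristic in the last point: below is a minimal sketch of what such a per-character estimate looks like, assuming the 0.6 factor applies to CJK characters and 0.3 to everything else (the review does not spell out that mapping, and the actual `TokenCounter._estimate_tokens` may differ):

```python
def estimate_tokens(text: str) -> int:
    """Rough token estimate: ~0.6 tokens per CJK character, ~0.3 per other character.

    Illustrative only; real tokenizers can diverge substantially from this.
    """
    cjk = sum(1 for ch in text if "\u4e00" <= ch <= "\u9fff")
    return int(cjk * 0.6 + (len(text) - cjk) * 0.3)
```
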
Prompt for AI Agents
Please address the comments from this code review:

## Overall Comments
- In `ContextTruncator.truncate_by_turns`, the slice `(-(keep_most_recent_turns - dequeue_turns + 1) * 2 :)` will behave unexpectedly or even drop almost everything when `keep_most_recent_turns <= dequeue_turns` or very small; consider clamping `dequeue_turns` relative to `keep_most_recent_turns` and adding explicit guards/tests for these edge cases.
- The default LLM compression instruction string is duplicated in both the config default and `LLMSummaryCompressor`; consider centralizing this default (e.g., a shared constant) so future edits don’t drift between backend logic and configuration.
- The `TokenCounter._estimate_tokens` heuristic (0.6/0.3 per-character) is quite ad‑hoc and may diverge significantly from actual model tokenization; if this value is used to gate compression behavior, consider making the heuristic configurable per-provider/model or clearly constraining its use to non-critical, approximate decisions.

## Individual Comments

### Comment 1
<location> `astrbot/core/agent/context/truncator.py:22-31` </location>
<code_context>
+    def truncate_by_turns(
</code_context>

<issue_to_address>
**issue (bug_risk):** truncate_by_turns semantics are unclear for small/zero keep_most_recent_turns and can lead to surprising slices

The interaction between `keep_most_recent_turns` and `dequeue_turns` is difficult to reason about and behaves oddly for small values. For instance:
- `keep_most_recent_turns == 0` and `dequeue_turns == 1` makes `(keep_most_recent_turns - dequeue_turns + 1) * 2` equal 0, so `messages[-0:]` returns the full list (no truncation).
- `keep_most_recent_turns < dequeue_turns` yields a negative slice index, dropping more than `dequeue_turns` turns.
A clearer approach would be to first compute the exact number of messages to keep based on turn counting, slice once from the end using that count, and then adjust to message boundaries. This would eliminate the negative/zero slice edge cases and make the behavior easier to reason about in all call sites.
</issue_to_address>
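
The `-0` pitfall is easy to reproduce in plain Python, and the suggested "compute the keep count, then slice once" shape avoids it; the helper below is a hypothetical sketch, not the PR's actual signature:

```python
messages = ["u1", "a1", "u2", "a2", "u3", "a3"]

# The pitfall: a computed slice index of 0 keeps everything instead of nothing.
assert messages[-0:] == messages

def keep_recent_turns(messages: list, keep_turns: int) -> list:
    # Derive a non-negative message count first: 2 messages (user + assistant) per turn.
    keep_msgs = max(keep_turns, 0) * 2
    return messages[-keep_msgs:] if keep_msgs > 0 else []

assert keep_recent_turns(messages, 2) == ["u2", "a2", "u3", "a3"]
assert keep_recent_turns(messages, 0) == []
```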

### Comment 2
<location> `astrbot/core/agent/context/truncator.py:68-77` </location>
<code_context>
+
+        return self.fix_messages(truncated_contexts)
+
+    def truncate_by_halving(
+        self,
+        messages: list[Message],
+    ) -> list[Message]:
+        """对半砍策略,删除 50% 的消息"""
+        if len(messages) <= 2:
+            return messages
+
+        first_non_system = 0
+        for i, msg in enumerate(messages):
+            if msg.role != "system":
+                first_non_system = i
+                break
+
+        messages_to_delete = (len(messages) - first_non_system) // 2
+
+        result = messages[:first_non_system]
+        result.extend(messages[first_non_system + messages_to_delete :])
+
+        index = next(
+            (i for i, item in enumerate(result) if item.role == "user"),
+            None,
+        )
+        if index is not None:
+            result = result[index:]
+
+        return self.fix_messages(result)
</code_context>

<issue_to_address>
**issue (bug_risk):** Halving truncation can drop the initial system message and earlier instructions

In `truncate_by_halving`, you first preserve leading system messages, but then you re-slice `result` from the first `user` role:
```python
index = next((i for i, item in enumerate(result) if item.role == "user"), None)
if index is not None:
    result = result[index:]
```
This can drop the preserved system prompt and any other non-user instructions before the first user message, effectively discarding configuration rather than just halving the conversational history. It would be safer to always keep the initial system message (as in `truncate_by_turns`) and apply halving only to the non-system portion, then prepend the system message without applying the `user`-based slicing to it.
</issue_to_address>
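
A minimal sketch of the safer variant described above (assuming `Message` exposes a `role` attribute, as in the excerpts): halve only the non-system portion and re-attach the system prefix unchanged.

```python
def halve_keeping_system(messages: list) -> list:
    # Keep any leading system messages out of the halving entirely.
    prefix_end = next(
        (i for i, m in enumerate(messages) if m.role != "system"), len(messages)
    )
    system_prefix, rest = messages[:prefix_end], messages[prefix_end:]
    # Drop the older half of the conversational portion only.
    return system_prefix + rest[len(rest) // 2 :]
```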

### Comment 3
<location> `astrbot/core/agent/context/compressor.py:42-51` </location>
<code_context>
+        return messages
+
+
+class TruncateByTurnsCompressor(ContextCompressor):
+    """Truncate by turns compressor implementation.
+    Truncates the message list by removing older turns.
+    """
+
+    def __init__(self, truncate_turns: int = 1):
+        """Initialize the truncate by turns compressor.
+
+        Args:
+            truncate_turns: The number of turns to remove when truncating (default: 1).
+        """
+        self.truncate_turns = truncate_turns
+
+    async def compress(self, messages: list[Message]) -> list[Message]:
+        truncator = ContextTruncator()
+        truncated_messages = truncator.truncate_by_turns(
+            messages,
+            keep_most_recent_turns=0,
</code_context>

<issue_to_address>
**issue (bug_risk):** Using keep_most_recent_turns=0 in TruncateByTurnsCompressor may not actually drop any history

`truncate_by_turns` has guard clauses and slice logic that, for `keep_most_recent_turns=0`, can frequently return `messages` unchanged (and in some cases preserve the full list; see prior comment). That means this compressor may not actually remove `truncate_turns` worth of history. Consider either adding a dedicated "drop N oldest turns" operation that doesn’t depend on `keep_most_recent_turns`, or passing a nonzero `keep_most_recent_turns` that matches the intended retained window.
</issue_to_address>
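
A sketch of the dedicated "drop N oldest turns" operation suggested here (hypothetical helper; like the rest of this review it assumes two messages per turn):

```python
def drop_oldest_turns(messages: list, n_turns: int) -> list:
    if n_turns <= 0 or not messages:
        return messages
    # Preserve a leading system message, then discard 2 messages per dropped turn.
    head = messages[:1] if messages[0].role == "system" else []
    body = messages[len(head):]
    return head + body[min(n_turns * 2, len(body)):]
```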

### Comment 4
<location> `astrbot/core/agent/runners/tool_loop_agent_runner.py:57` </location>
<code_context>
         self.req = request
         self.streaming = kwargs.get("streaming", False)
+
+        # enforce max turns, will discard older turns when exceeded BEFORE compression
+        # -1 means no limit
+        self.enforce_max_turns = kwargs.get("enforce_max_turns", -1)
</code_context>

<issue_to_address>
**issue (complexity):** Consider consolidating all truncation and compression handling into a single, config-driven ContextManager.process call so the runner no longer manages context details directly.

You can reduce the added complexity by making `ContextManager` the single entry point for *all* context mutation (turn truncation + compression), and by centralizing configuration and error handling there.

### 1. Unify truncation + compression into one `ContextManager.process` call

Right now `step` coordinates two flows:

```python
# step()
if self.enforce_max_turns != -1:
    try:
        truncated_messages = self.context_truncator.truncate_by_turns(
            self.run_context.messages,
            keep_most_recent_turns=self.enforce_max_turns,
            dequeue_turns=self.truncate_turns,
        )
        self.run_context.messages = truncated_messages
    except Exception as e:
        logger.error(...)

try:
    await self.do_context_compress()
except Exception as e:
    logger.error(...)
```

You can push this into `ContextManager` so the runner just does:

```python
# step()
try:
    self.run_context.messages = await self.context_manager.process(
        self.run_context.messages
    )
except Exception as e:
    logger.error("Error during context processing: %s", e, exc_info=True)
```

And remove `ContextTruncator` and `do_context_compress` from the runner entirely.

### 2. Centralize configuration into a single context config

Instead of keeping half the knobs in the runner and half in `ContextManager`, define a small config object and pass that once:

```python
# context/config.py
from dataclasses import dataclass
from typing import Optional
from astrbot.core.provider.provider import Provider

@dataclass
class ContextConfig:
    max_context_tokens: int
    enforce_max_turns: int  # -1 means no limit
    truncate_turns: int
    llm_compress_instruction: Optional[str]
    llm_compress_keep_recent: int
    llm_compress_provider: Optional[Provider]
```

Use it in `reset`:

```python
# in __init__/reset of the runner
from ..context.config import ContextConfig

self.context_config = ContextConfig(
    max_context_tokens=provider.provider_config.get("max_context_tokens", 0),
    enforce_max_turns=kwargs.get("enforce_max_turns", -1),
    truncate_turns=kwargs.get("truncate_turns", 1),
    llm_compress_instruction=kwargs.get("llm_compress_instruction"),
    llm_compress_keep_recent=kwargs.get("llm_compress_keep_recent", 0),
    llm_compress_provider=kwargs.get("llm_compress_provider"),
)

self.context_manager = ContextManager(self.context_config)
```

Now `ToolLoopAgentRunner` doesn’t need to know about `truncate_by_turns` or compression internals.

### 3. Move truncation + compression + error handling inside `ContextManager`

Implement the combined logic inside `ContextManager.process` so behavior stays the same but is locally coherent:

```python
# context/manager.py
class ContextManager:
    def __init__(self, config: ContextConfig) -> None:
        self.config = config
        self._truncator = ContextTruncator()
        # existing token counter/compressor init...

    async def process(self, messages: list[Message]) -> list[Message]:
        try:
            result = messages

            # 1. turn-based truncation
            if self.config.enforce_max_turns != -1:
                result = self._truncator.truncate_by_turns(
                    result,
                    keep_most_recent_turns=self.config.enforce_max_turns,
                    dequeue_turns=self.config.truncate_turns,
                )

            # 2. token-based compression (existing logic)
            result = await self._compress_if_needed(result)

            return result
        except Exception as e:
            logger.error("Error during context processing: %s", e, exc_info=True)
            # best-effort: return whatever we have (or fall back to original)
            return messages
```

Then delete the runner’s:

```python
self.context_truncator = ContextTruncator()
...
async def do_context_compress(self): ...
...
# and the two separate try/except blocks in step()
```

This keeps all existing functionality (max-turn enforcement, turn dequeueing, compression) but:

- The runner has a single `context_manager.process(...)` call.
- Configuration is in one place (`ContextConfig`).
- Error handling for context logic is localized in `ContextManager`.
</issue_to_address>

### Comment 5
<location> `astrbot/core/agent/context/manager.py:14` </location>
<code_context>
+    from astrbot.core.provider.provider import Provider
+
+
+class ContextManager:
+    """Context compression manager."""
+
</code_context>

<issue_to_address>
**issue (complexity):** Consider inlining the compression and token-check logic directly into `process` to avoid extra helper methods and make the control flow easier to follow in one place.

You can simplify `ContextManager` by inlining the linear compression flow into `process` and removing the two helper methods. This keeps all behavior (threshold check, compression, halving) intact but reduces indirection and mental overhead.

### Simplify `process` and drop `_initial_token_check` / `_run_compression`

```python
class ContextManager:
    COMPRESSION_THRESHOLD = 0.82

    async def process(self, messages: list[Message]) -> list[Message]:
        """Process the messages, compressing/truncating if needed."""
        if self.max_context_tokens <= 0 or not messages:
            return messages

        # Initial token check
        total_tokens = self.token_counter.count_tokens(messages)
        usage_rate = total_tokens / self.max_context_tokens
        logger.debug(
            "ContextManager: total tokens = %s, max_context_tokens = %s",
            total_tokens,
            self.max_context_tokens,
        )

        if usage_rate <= self.COMPRESSION_THRESHOLD:
            return messages

        # First pass: primary compression strategy
        messages = await self.compressor.compress(messages)

        # Second pass: double-check and halve if still too large
        tokens_after = self.token_counter.count_tokens(messages)
        if tokens_after / self.max_context_tokens > self.COMPRESSION_THRESHOLD:
            messages = self.truncator.truncate_by_halving(messages)

        return messages
```

Then you can safely remove `_initial_token_check` and `_run_compression`. The behavior is identical, but the control flow is now a single, easy-to-follow pipeline:

1. Check limit → early return.
2. Count tokens → threshold check.
3. Compress via selected strategy.
4. Recount and optionally halve.
</issue_to_address>

### Comment 6
<location> `astrbot/core/agent/context/compressor.py:14` </location>
<code_context>
+from ..context.truncator import ContextTruncator
+
+
+class ContextCompressor(ABC):
+    """
+    Abstract base class for context compressors.
</code_context>

<issue_to_address>
**issue (complexity):** Consider simplifying the compressor design by replacing the ABC with a callable/Protocol, inlining the no-op default, and extracting message-splitting from LLM calls to separate concerns and reduce indirection.

You can keep all functionality while reducing the abstraction surface a bit.

### 1. Replace the ABC with a lightweight Protocol / callable

If all compressors are known and wired up in code (no dynamic plugin loading), an ABC is heavier than needed. A `Protocol` or even a plain callable type keeps the “strategy” concept without the base-class indirection.

For example:

```python
# compressor.py
from typing import Protocol, TYPE_CHECKING

if TYPE_CHECKING:
    from ..message import Message

class ContextCompressor(Protocol):
    async def __call__(self, messages: list["Message"]) -> list["Message"]:
        ...
```

Then compressors can just be callables:

```python
from dataclasses import dataclass
from ..context.truncator import ContextTruncator

@dataclass
class TruncateByTurnsCompressor:
    truncate_turns: int = 1

    async def __call__(self, messages: list[Message]) -> list[Message]:
        truncator = ContextTruncator()
        return truncator.truncate_by_turns(
            messages,
            keep_most_recent_turns=0,
            dequeue_turns=self.truncate_turns,
        )
```

Usage in `ContextManager` becomes simpler as well:

```python
self._compressor: ContextCompressor = TruncateByTurnsCompressor(truncate_turns=1)

compressed = await self._compressor(messages)
```

This keeps polymorphism but removes the abstract base class and its extra indirection.

### 2. Drop `DefaultCompressor` in favor of a trivial callable

If the default behavior is “no-op compression” and `DefaultCompressor` is only encoding `return messages`, you can avoid another named class:

```python
async def _noop(messages: list[Message]) -> list[Message]:
    return messages

# Assigning the async function directly avoids the redundant lambda wrapper.
NoopCompressor: ContextCompressor = _noop
```

Or just pass `_noop` wherever you currently pass `DefaultCompressor()`. This preserves behavior without an extra type.

### 3. Split message-slicing from LLM invocation in `LLMSummaryCompressor`

`LLMSummaryCompressor` currently does both message partitioning and the provider call. You can keep the class but make the logic easier to follow/test by extracting a pure helper for the slicing:

```python
def _split_history(
    messages: list[Message], keep_recent: int
) -> tuple[Message | None, list[Message], list[Message]]:
    system_msg = messages[0] if messages and messages[0].role == "system" else None
    start_idx = 1 if system_msg else 0

    # Guard keep_recent == 0: messages[start_idx:-0] is messages[start_idx:0], i.e. empty.
    messages_to_summarize = (
        messages[start_idx:-keep_recent] if keep_recent > 0 else messages[start_idx:]
    )
    recent_messages = messages[-keep_recent:] if keep_recent > 0 else []

    return system_msg, messages_to_summarize, recent_messages
```

Then `compress` becomes more linear:

```python
async def compress(self, messages: list[Message]) -> list[Message]:
    if len(messages) <= self.keep_recent + 1:
        return messages

    system_msg, messages_to_summarize, recent_messages = _split_history(
        messages, self.keep_recent
    )
    if not messages_to_summarize:
        return messages

    instruction_message = Message(role="user", content=self.instruction_text)
    llm_payload = messages_to_summarize + [instruction_message]

    try:
        response = await self.provider.text_chat(contexts=llm_payload)
        summary_content = response.completion_text
    except Exception as e:
        logger.error(f"Failed to generate summary: {e}")
        return messages

    result: list[Message] = []
    if system_msg:
        result.append(system_msg)

    result.append(
        Message(
            role="system",
            content=f"History conversation summary: {summary_content}",
        )
    )
    result.extend(recent_messages)
    return result
```

Behavior remains identical, but concerns (partitioning vs. LLM call) are clearer and easier to reason about.
</issue_to_address>

### Comment 7
<location> `astrbot/core/agent/context/truncator.py:4` </location>
<code_context>
+from ..message import Message
+
+
+class ContextTruncator:
+    """Context truncator."""
+
</code_context>

<issue_to_address>
**issue (complexity):** Consider extracting shared sanitization helpers and clarifying the turn-based calculations so both truncation methods reuse the same repair logic and are easier to read and maintain.

You can keep the current behavior while reducing complexity and duplication by:

1. **Extracting shared “sanitize” logic** (first-user trimming + `fix_messages`)
2. **Making the turn math explicit in `truncate_by_turns`**
3. **Reusing helpers in both truncation strategies**

### 1. Centralize the “fix” / sanitize behavior

Both methods:

- strip to the first `user` message, and
- call `fix_messages`.

That can be made a single, named step so callers know truncation is separate from “context repair”:

```python
class ContextTruncator:
    def _strip_before_first_user(self, messages: list[Message]) -> list[Message]:
        index = next(
            (i for i, m in enumerate(messages) if m.role == "user"),
            None,
        )
        if index is not None and index > 0:
            return messages[index:]
        return messages

    def sanitize(self, messages: list[Message]) -> list[Message]:
        """Normalize context order and repair tool segments."""
        messages = self._strip_before_first_user(messages)
        return self.fix_messages(messages)

    def fix_messages(self, messages: list[Message]) -> list[Message]:
        fixed_messages: list[Message] = []
        for message in messages:
            if message.role == "tool":
                if len(fixed_messages) < 2:
                    fixed_messages = []
                else:
                    fixed_messages.append(message)
            else:
                fixed_messages.append(message)
        return fixed_messages
```

Then both truncators just call `sanitize(...)` at the end, making the responsibility more explicit.

### 2. Simplify and document `truncate_by_turns` math

You can keep the logic but make the turn-based calculation more readable:

```python
    def truncate_by_turns(
        self,
        messages: list[Message],
        keep_most_recent_turns: int,
        dequeue_turns: int = 1,
    ) -> list[Message]:
        if keep_most_recent_turns == -1:
            return messages

        if not messages:
            return messages

        system_message: Message | None = None
        non_system = messages

        if messages[0].role == "system":
            system_message = messages[0]
            non_system = messages[1:]

        total_turns = len(non_system) // 2
        if total_turns <= keep_most_recent_turns:
            return messages

        # Drop oldest turns but ensure we drop at least `dequeue_turns`
        turns_to_keep = keep_most_recent_turns
        turns_to_drop = max(total_turns - turns_to_keep, dequeue_turns)
        start_index = turns_to_drop * 2  # 2 messages per turn

        truncated = non_system[start_index:]

        if system_message is not None:
            truncated = [system_message] + truncated

        return self.sanitize(truncated)
```

This keeps the same behavior (“keep N recent turns, drop at least `dequeue_turns`”) but avoids the opaque `-(keep_most_recent_turns - dequeue_turns + 1) * 2` expression and ties everything to explicit turn counts.

### 3. Reuse helpers in `truncate_by_halving`

With `sanitize` in place, `truncate_by_halving` becomes shorter and less duplicated:

```python
    def truncate_by_halving(self, messages: list[Message]) -> list[Message]:
        if len(messages) <= 2:
            return messages

        first_non_system = next(
            (i for i, m in enumerate(messages) if m.role != "system"),
            len(messages),
        )

        messages_to_delete = (len(messages) - first_non_system) // 2

        kept_prefix = messages[:first_non_system]
        kept_suffix = messages[first_non_system + messages_to_delete :]

        result = kept_prefix + kept_suffix
        return self.sanitize(result)
```

This:

- keeps all current behavior,
- consolidates the structural “repair” logic,
- and makes the truncation strategies easier to understand and maintain.
</issue_to_address>

Sourcery is free for open source - if you like our reviews please consider sharing them ✨
Help me be more useful! Please click 👍 or 👎 on each comment and I'll use the feedback to improve your reviews.

Contributor

Copilot AI left a comment

Pull request overview

This PR introduces a comprehensive context management and compression system for the AstrBot agent framework to prevent context overflow during long conversations. The implementation provides two strategies: truncation by conversation turns and LLM-based intelligent summarization, with automatic token counting to trigger compression when usage exceeds 82% of the model's context window.
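
Concretely, with the COMPRESSION_THRESHOLD of 0.82 shown in the review excerpts above, the trigger point for an illustrative window works out as follows (the window size here is arbitrary, not from the PR):

```python
max_context_tokens = 128_000  # illustrative window size
COMPRESSION_THRESHOLD = 0.82  # constant from the ContextManager excerpts above
trigger_at = int(max_context_tokens * COMPRESSION_THRESHOLD)
print(trigger_at)  # 104960 -> compression kicks in once usage exceeds this
```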

Key Changes:

  • Added pluggable context management architecture with ContextManager, ContextCompressor, ContextTruncator, and TokenCounter modules
  • Integrated LLM-based summarization that can compress old conversation history while preserving recent messages
  • Extended provider configuration to include max_context_tokens with automatic inference from model metadata
  • Relocated context management settings from "Others" to a dedicated "Truncate and Compress" section in the UI

Reviewed changes

Copilot reviewed 11 out of 11 changed files in this pull request and generated 15 comments.

| File | Description |
| --- | --- |
| astrbot/core/agent/context/manager.py | Implements the main ContextManager orchestrating token checking and compression strategies |
| astrbot/core/agent/context/compressor.py | Provides the LLMSummaryCompressor and TruncateByTurnsCompressor implementations |
| astrbot/core/agent/context/truncator.py | Handles message-list truncation by turns or by halving |
| astrbot/core/agent/context/token_counter.py | Estimates token usage using heuristic calculations for Chinese and other text |
| astrbot/core/agent/runners/tool_loop_agent_runner.py | Integrates context management into the agent execution loop |
| astrbot/core/pipeline/process_stage/method/agent_sub_stages/internal.py | Wires compression parameters from config to the agent runner and injects model context limits |
| astrbot/core/config/default.py | Adds configuration schema for context compression strategies and LLM compression parameters |
| dashboard/src/i18n/locales/zh-CN/features/config-metadata.json | Moves context settings to the new "truncate_and_compress" section with Chinese labels |
| dashboard/src/i18n/locales/en-US/features/config-metadata.json | Moves context settings to the new "truncate_and_compress" section with English labels |
| dashboard/src/composables/useProviderSources.ts | Automatically populates max_context_tokens from model metadata when creating providers |
| dashboard/src/components/shared/ConfigItemRenderer.vue | Adjusts flex layout for slider/text field controls |

- Implemented a full test suite for ContextManager covering initialization, message processing, token-based compression, and error handling.
- Added tests for ContextTruncator focusing on message fixing, truncation by turns, dropping oldest turns, and halving.
- Ensured that both test suites validate edge cases and maintain expected behavior with various message types, including system and tool messages.
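
The test suites themselves are not shown in this thread; the sketch below illustrates the kind of turn-truncation edge case they would need to cover. It is hypothetical: the Message constructor matches the review excerpts, but the import path is only inferred from the relative import in truncator.py.

```python
from astrbot.core.agent.context.truncator import ContextTruncator
from astrbot.core.agent.message import Message  # path inferred, may differ

def _turn(i: int) -> list[Message]:
    return [
        Message(role="user", content=f"u{i}"),
        Message(role="assistant", content=f"a{i}"),
    ]

def test_truncate_by_turns_bounds_history_length():
    truncator = ContextTruncator()
    history = [m for i in range(6) for m in _turn(i)]  # 6 turns, 12 messages
    result = truncator.truncate_by_turns(history, keep_most_recent_turns=2)
    assert len(result) <= 4  # at most the two requested turns remain
    assert result[-1].content == "a5"  # the newest turn must survive
```
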
@dosubot dosubot bot added size:XXL This PR changes 1000+ lines, ignoring generated files. and removed size:XL This PR changes 500-999 lines, ignoring generated files. labels Jan 5, 2026
@Soulter Soulter (Member, Author) commented Jan 5, 2026

LGTM

@Soulter Soulter merged commit 241f1c2 into master Jan 5, 2026
6 checks passed

Labels

area:core — The bug / feature is about astrbot's core, backend
size:XXL — This PR changes 1000+ lines, ignoring generated files.

Projects

None yet

Development

Successfully merging this pull request may close these issues.

[Feature] Add an automatic compression feature for the agent (为agent增加自动压缩功能)

2 participants