Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
46 changes: 46 additions & 0 deletions openviking/session/session.py
Original file line number Diff line number Diff line change
Expand Up @@ -416,11 +416,13 @@ async def load(self):
pass

# Load .meta.json
meta_loaded = False
try:
meta_content = await self._viking_fs.read_file(
f"{self._session_uri}/.meta.json", ctx=self.ctx
)
self._meta = SessionMeta.from_dict(json.loads(meta_content))
meta_loaded = True
except Exception:
# Old session without meta — derive from existing data
self._meta.message_count = len(self._messages)
Expand All @@ -431,6 +433,15 @@ async def load(self):
self._meta.created_by_account_id = self.ctx.account_id
if not self._meta.created_by_user_id:
self._meta.created_by_user_id = self.ctx.user.user_id
# Self-heal a lost commit clear: if a commit persisted .meta.json (and the
# archive .done marker) but the live messages.jsonl clear never reached
# disk, load() would read the already-archived messages back as live and
# re-insert them as active context (#1487). Since add_message keeps
# .meta.json's message_count in lockstep with messages.jsonl, a loaded
# message count that exceeds the committed message_count means the clear
# was lost; trust the committed count and drop the stale leading prefix.
if meta_loaded:
await self._reconcile_stale_live_messages()
# WM v2: always rebuild pending_tokens from current messages so the
# counter stays consistent across restarts and is also backfilled for
# legacy sessions whose .meta.json predates these fields. O(n) once,
Expand All @@ -439,6 +450,41 @@ async def load(self):

self._loaded = True

async def _reconcile_stale_live_messages(self) -> None:
    """Trim loaded messages down to the count recorded in ``.meta.json``.

    Compares ``len(self._messages)`` with ``self._meta.message_count``
    (a falsy count is treated as 0). When more messages were loaded than the
    metadata says are live, the surplus at the front of the list is treated
    as already-archived rows left behind by a commit whose ``messages.jsonl``
    clear never landed (#1487). Only the newest ``message_count`` entries are
    kept, a warning is logged, and ``messages.jsonl`` is rewritten so the
    repair persists.

    The rewrite is best-effort: a failure is logged at error level and not
    re-raised, so the in-memory list stays trimmed even if the disk copy
    could not be healed.
    """
    # NOTE(review): this trusts message_count whenever .meta.json parsed. If
    # an older .meta.json can lack the field and it defaults to 0, every live
    # message would be dropped here — confirm against SessionMeta.from_dict.
    expected = int(self._meta.message_count or 0)
    loaded = len(self._messages)
    if expected < 0 or loaded <= expected:
        # Negative counts are not acted on; equal or fewer loaded messages
        # means there is no stale prefix to remove.
        return

    surplus = loaded - expected
    if expected:
        # Keep the newest `expected` messages (the retained tail).
        self._messages = self._messages[surplus:]
    else:
        self._messages = []

    logger.warning(
        "Session %s: dropped %d stale live message(s) from a lost commit clear "
        "(messages.jsonl exceeded committed message_count=%d)",
        self.session_id,
        surplus,
        expected,
    )

    try:
        await self._write_to_agfs_async(messages=self._messages)
    except Exception as exc:
        logger.error(
            "Session %s: failed to rewrite messages.jsonl during stale-message "
            "reconciliation: %s",
            self.session_id,
            exc,
        )

def _rebuild_pending_tokens(self) -> None:
"""Recompute ``pending_tokens`` from the current message list.

Expand Down
150 changes: 150 additions & 0 deletions tests/session/test_session_stale_message_recovery.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,150 @@
# Copyright (c) 2026 Beijing Volcano Engine Technology Co., Ltd.
# SPDX-License-Identifier: AGPL-3.0

"""Regression tests for #1487.

``commit_async`` clears the live ``messages.jsonl`` and persists the new
(lower) ``message_count`` to ``.meta.json``. If that clear is lost while
``.meta.json`` and the archive ``.done`` marker land on disk, ``load()`` used
to read the already-archived messages back as live messages and re-insert them
as active context.

``Session.load()`` now reconciles this on load: since ``add_message`` keeps
``.meta.json``'s ``message_count`` in lockstep with the ``messages.jsonl`` line
count, a loaded message count that exceeds the committed ``message_count`` is
the signature of a lost clear, and the stale leading prefix is dropped.

These tests drive a real ``Session`` against a minimal in-memory filesystem, so
they exercise the actual ``load()`` path without the AGFS subprocess or an LLM.
"""

import json

import pytest

from openviking.message import TextPart
from openviking.session import Session


class FakeVikingFS:
    """In-memory stand-in for VikingFS backed by a ``{uri: text}`` dict.

    Implements only ``read_file``, ``write_file`` and ``ls``. The ``ctx``
    argument on each method and the ``offset``/``limit`` arguments of
    ``read_file`` are accepted but ignored.
    """

    def __init__(self):
        # Maps a full file URI to its text content.
        self.files: dict[str, str] = {}

    async def read_file(self, uri, offset=0, limit=-1, ctx=None):
        """Return the stored text for ``uri``; raise ``FileNotFoundError`` if absent."""
        if uri in self.files:
            return self.files[uri]
        raise FileNotFoundError(uri)

    async def write_file(self, uri, content, ctx=None):
        """Store ``content`` under ``uri``, decoding ``bytes`` as UTF-8 first."""
        text = content.decode("utf-8") if isinstance(content, bytes) else content
        self.files[uri] = text

    async def ls(self, uri, ctx=None):
        """List the immediate children of ``uri`` as ``{"name": ...}`` dicts.

        A child is the first path segment after ``uri`` of any stored file
        URI, so nested files surface as their top-level directory name.
        Entries are de-duplicated and sorted by name.
        """
        prefix = uri.rstrip("/") + "/"
        children = {
            path[len(prefix) :].partition("/")[0]
            for path in self.files
            if path.startswith(prefix)
        }
        return [{"name": child} for child in sorted(children)]


def _msg_line(idx: int, role: str, text: str) -> str:
    """Return the JSONL form of a single-text-part message with id ``m<idx>``."""
    # Imported locally, as in the original helper.
    from openviking.message import Message

    message = Message(id=f"m{idx}", role=role, parts=[TextPart(text)])
    return message.to_jsonl()


def _seed(fs: FakeVikingFS, session_uri: str, jsonl_messages, message_count):
    """Write a session's ``messages.jsonl`` and ``.meta.json`` straight into ``fs``.

    ``jsonl_messages`` is an iterable of ``(role, text)`` pairs; each becomes
    one newline-terminated JSONL row whose id is its position in the
    iterable. ``message_count`` is stored verbatim in the metadata, so a test
    can deliberately make it disagree with the number of rows.
    """
    rows = [
        _msg_line(position, role, text) + "\n"
        for position, (role, text) in enumerate(jsonl_messages)
    ]
    fs.files[f"{session_uri}/messages.jsonl"] = "".join(rows)

    meta = {"session_id": "s1487", "message_count": message_count, "last_commit_at": "t0"}
    fs.files[f"{session_uri}/.meta.json"] = json.dumps(meta)


class TestStaleLiveMessageReconciliation:
    """Drive ``Session.load()`` against hand-seeded session states in a fake FS."""

    # NOTE(review): the async tests carry no explicit asyncio marker;
    # presumably the project runs pytest-asyncio in auto mode — confirm.

    def _session(self, fs):
        """Build a fresh ``Session`` with the fixed test id on top of ``fs``."""
        return Session(viking_fs=fs, session_id="s1487")

    async def test_lost_clear_drops_all_stale_messages(self):
        """keep=0 commit: message_count=0 but jsonl still holds archived rows."""
        fs = FakeVikingFS()
        uri = self._session(fs)._session_uri
        archived_rows = [("user", "old A"), ("assistant", "old B")]
        # The committed clear says zero live messages remain.
        _seed(fs, uri, archived_rows, message_count=0)

        session = self._session(fs)
        await session.load()

        assert len(session.messages) == 0
        # messages.jsonl is rewritten so the heal survives a restart.
        assert fs.files[f"{uri}/messages.jsonl"].strip() == ""

    async def test_lost_clear_keeps_retained_tail(self):
        """keep>0 commit: message_count=1 but jsonl holds archived prefix + tail."""
        fs = FakeVikingFS()
        uri = self._session(fs)._session_uri
        rows = [("user", "old A"), ("assistant", "old B"), ("user", "kept tail")]
        # Only the last message should remain live.
        _seed(fs, uri, rows, message_count=1)

        first = self._session(fs)
        await first.load()
        assert [m.content for m in first.messages] == ["kept tail"]

        # Disk healed to exactly the retained tail.
        second = self._session(fs)
        await second.load()
        assert [m.content for m in second.messages] == ["kept tail"]

    async def test_consistent_state_is_untouched(self):
        """Normal case: message_count matches jsonl line count -> no change."""
        fs = FakeVikingFS()
        uri = self._session(fs)._session_uri
        rows = [("user", "live one"), ("assistant", "live two")]
        _seed(fs, uri, rows, message_count=2)
        jsonl_key = f"{uri}/messages.jsonl"
        snapshot = fs.files[jsonl_key]

        session = self._session(fs)
        await session.load()

        assert [m.content for m in session.messages] == ["live one", "live two"]
        # No spurious rewrite of messages.jsonl.
        assert fs.files[jsonl_key] == snapshot

    async def test_legacy_session_without_meta_is_untouched(self):
        """No .meta.json (legacy): reconciliation must not run."""
        fs = FakeVikingFS()
        uri = self._session(fs)._session_uri
        legacy_rows = (
            _msg_line(0, "user", "legacy one"),
            _msg_line(1, "assistant", "legacy two"),
        )
        # Seed only messages.jsonl; deliberately no .meta.json.
        fs.files[f"{uri}/messages.jsonl"] = "".join(row + "\n" for row in legacy_rows)

        session = self._session(fs)
        await session.load()

        assert [m.content for m in session.messages] == ["legacy one", "legacy two"]


if __name__ == "__main__":
    # Running this file directly executes its tests verbosely and exits with
    # pytest's return code.
    raise SystemExit(pytest.main([__file__, "-v"]))