Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
37 changes: 28 additions & 9 deletions openviking/models/rerank/openai_rerank.py
Original file line number Diff line number Diff line change
Expand Up @@ -63,10 +63,28 @@ def rerank_batch(self, query: str, documents: List[str]) -> Optional[List[float]
if not documents:
return []

# Filter out empty/blank documents that providers like SiliconFlow silently drop.
# Track original indices so we can reconstruct a full scores array.
non_empty_docs: List[str] = []
non_empty_indices: List[int] = []
for i, doc in enumerate(documents):
text = (doc or "").strip()
if text:
non_empty_docs.append(text)
non_empty_indices.append(i)
else:
logger.debug(
"[OpenAIRerankClient] Skipping empty document at index %s", i
)

if not non_empty_docs:
return [0.0] * len(documents)

req_body = {
"model": self.model_name,
"query": query,
"documents": documents,
"documents": non_empty_docs,
"top_n": len(non_empty_docs),
}

try:
Expand All @@ -87,34 +105,35 @@ def rerank_batch(self, query: str, documents: List[str]) -> Optional[List[float]
result = response.json()

# Update token usage tracking (estimate, OpenAI rerank doesn't provide token info)
self._extract_and_update_token_usage(result, query, documents)
self._extract_and_update_token_usage(result, query, non_empty_docs)

# Standard OpenAI/Cohere rerank format: results[].{index, relevance_score}
results = result.get("results")
if not results:
logger.warning(f"[OpenAIRerankClient] Unexpected response format: {result}")
return None

if len(results) != len(documents):
if len(results) != len(non_empty_docs):
logger.warning(
"[OpenAIRerankClient] Unexpected rerank result length: expected=%s actual=%s",
len(documents),
len(non_empty_docs),
len(results),
)
return None

# Results may not be in original order — sort by index
# Map API results (indexed against non_empty_docs) back to original indices
scores = [0.0] * len(documents)
for item in results:
idx = item.get("index")
if idx is None or not (0 <= idx < len(documents)):
api_idx = item.get("index")
if api_idx is None or not (0 <= api_idx < len(non_empty_docs)):
logger.warning(
"[OpenAIRerankClient] Out-of-bounds or missing index in result: %s", item
)
return None
scores[idx] = item.get("relevance_score", 0.0)
original_idx = non_empty_indices[api_idx]
scores[original_idx] = item.get("relevance_score", 0.0)

logger.debug(f"[OpenAIRerankClient] Reranked {len(documents)} documents")
logger.debug(f"[OpenAIRerankClient] Reranked {len(non_empty_docs)} documents")
return scores

except Exception as e:
Expand Down
19 changes: 11 additions & 8 deletions openviking/storage/viking_fs.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
from dataclasses import dataclass, field
from datetime import datetime, timezone
from pathlib import PurePath
from typing import TYPE_CHECKING, Any, Dict, List, Optional, Union
from typing import TYPE_CHECKING, Any, Dict, List, Optional, Tuple, Union

from openviking.core.namespace import (
canonicalize_uri,
Expand Down Expand Up @@ -2381,19 +2381,22 @@ async def _write_relation_table(
async def read_batch(
    self, uris: List[str], level: str = "l0", ctx: Optional["RequestContext"] = None
) -> Dict[str, str]:
    """Batch read content from multiple URIs concurrently.

    Args:
        uris: URIs to read. A URI listed more than once is read only once,
            because the result is keyed by URI.
        level: ``"l0"`` reads each URI through ``self.abstract``; ``"l1"``
            reads through ``self.overview``. Any other value yields an
            empty string for every URI without performing a read.
        ctx: Request context forwarded unchanged to the reader.

    Returns:
        A dict mapping every requested URI to its content. A URI whose read
        raises is mapped to ``""`` rather than being omitted, so every
        requested URI is always present as a key.
    """
    # The level is the same for every URI, so pick the reader once instead of
    # re-evaluating the branch inside each per-URI coroutine.
    if level == "l0":
        reader = self.abstract
    elif level == "l1":
        reader = self.overview
    else:
        reader = None

    # dict.fromkeys de-duplicates while keeping first-occurrence order, which
    # matches the key order a dict built from the raw list would have.
    unique_uris = list(dict.fromkeys(uris))

    if reader is None:
        # Unsupported level: nothing to read, so skip task scheduling entirely.
        return {uri: "" for uri in unique_uris}

    async def _read_one(uri: str) -> Tuple[str, str]:
        try:
            return uri, await reader(uri, ctx=ctx)
        except Exception:
            # Best-effort batch: one failing URI must not fail the others.
            # Cancellation is a BaseException on Python 3.8+ and is
            # therefore not swallowed here.
            return uri, ""

    gathered = await asyncio.gather(*(_read_one(uri) for uri in unique_uris))
    return dict(gathered)

# ========== Other Preserved Methods ==========

Expand Down