From f200993110073b1de61a61a3a4f52d12a4cbf2b0 Mon Sep 17 00:00:00 2001 From: njuboy11 Date: Mon, 15 Jun 2026 19:49:45 +0800 Subject: [PATCH] fix(rerank): filter empty docs before rerank API call; concurrent read_batch 1. openai_rerank.py: Filter out empty/blank documents before sending to rerank API (SiliconFlow drops empty docs, causing result count mismatch). Map results back to original indices with 0.0 score for empty docs. 2. viking_fs.py: Make read_batch truly concurrent with asyncio.gather instead of sequential file reads. Added Tuple to typing imports. These two issues combined could add 3-5s to search latency with 24 candidates (empty doc causes rerank fallback + serial file reads). --- openviking/models/rerank/openai_rerank.py | 37 +++++++++++++++++------ openviking/storage/viking_fs.py | 19 +++++++----- 2 files changed, 39 insertions(+), 17 deletions(-) diff --git a/openviking/models/rerank/openai_rerank.py b/openviking/models/rerank/openai_rerank.py index 50ff96843d..d251eb965d 100644 --- a/openviking/models/rerank/openai_rerank.py +++ b/openviking/models/rerank/openai_rerank.py @@ -63,10 +63,28 @@ def rerank_batch(self, query: str, documents: List[str]) -> Optional[List[float] if not documents: return [] + # Filter out empty/blank documents that providers like SiliconFlow silently drop. + # Track original indices so we can reconstruct a full scores array. + non_empty_docs: List[str] = [] + non_empty_indices: List[int] = [] + for i, doc in enumerate(documents): + text = (doc or "").strip() + if text: + non_empty_docs.append(text) + non_empty_indices.append(i) + else: + logger.debug( + "[OpenAIRerankClient] Skipping empty document at index %s", i + ) + + if not non_empty_docs: + return [0.0] * len(documents) + req_body = { "model": self.model_name, "query": query, - "documents": documents, + "documents": non_empty_docs, + "top_n": len(non_empty_docs), } try: @@ -87,7 +105,7 @@ def rerank_batch(self, query: str, documents: List[str]) -> Optional[List[float] result = response.json() # Update token usage tracking (estimate, OpenAI rerank doesn't provide token info) - self._extract_and_update_token_usage(result, query, documents) + self._extract_and_update_token_usage(result, query, non_empty_docs) # Standard OpenAI/Cohere rerank format: results[].{index, relevance_score} results = result.get("results") @@ -95,26 +113,27 @@ def rerank_batch(self, query: str, documents: List[str]) -> Optional[List[float] logger.warning(f"[OpenAIRerankClient] Unexpected response format: {result}") return None - if len(results) != len(documents): + if len(results) != len(non_empty_docs): logger.warning( "[OpenAIRerankClient] Unexpected rerank result length: expected=%s actual=%s", - len(documents), + len(non_empty_docs), len(results), ) return None - # Results may not be in original order — sort by index + # Map API results (indexed against non_empty_docs) back to original indices scores = [0.0] * len(documents) for item in results: - idx = item.get("index") - if idx is None or not (0 <= idx < len(documents)): + api_idx = item.get("index") + if api_idx is None or not (0 <= api_idx < len(non_empty_docs)): logger.warning( "[OpenAIRerankClient] Out-of-bounds or missing index in result: %s", item ) return None - scores[idx] = item.get("relevance_score", 0.0) + original_idx = non_empty_indices[api_idx] + scores[original_idx] = item.get("relevance_score", 0.0) - logger.debug(f"[OpenAIRerankClient] Reranked {len(documents)} documents") + logger.debug(f"[OpenAIRerankClient] Reranked {len(non_empty_docs)} documents") return scores except Exception as e: diff --git a/openviking/storage/viking_fs.py b/openviking/storage/viking_fs.py index 3bc90b3b1f..2f780aa150 100644 --- a/openviking/storage/viking_fs.py +++ b/openviking/storage/viking_fs.py @@ -22,7 +22,7 @@ from dataclasses import dataclass, field from datetime import datetime, timezone from pathlib import PurePath -from typing import TYPE_CHECKING, Any, Dict, List, Optional, Union +from typing import TYPE_CHECKING, Any, Dict, List, Optional, Tuple, Union from openviking.core.namespace import ( canonicalize_uri, @@ -2381,19 +2381,22 @@ async def _write_relation_table( async def read_batch( self, uris: List[str], level: str = "l0", ctx: Optional[RequestContext] = None ) -> Dict[str, str]: - """Batch read content from multiple URIs.""" - results = {} - for uri in uris: + """Batch read content from multiple URIs concurrently.""" + async def _read_one(uri: str) -> Tuple[str, str]: try: - content = "" if level == "l0": content = await self.abstract(uri, ctx=ctx) elif level == "l1": content = await self.overview(uri, ctx=ctx) - results[uri] = content + else: + content = "" + return uri, content except Exception: - pass - return results + return uri, "" + + tasks = [_read_one(uri) for uri in uris] + gathered = await asyncio.gather(*tasks) + return {uri: content for uri, content in gathered} # ========== Other Preserved Methods ==========