Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions .env.example
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,9 @@ LANGSMITH_API_KEY=
LANGSMITH_PROJECT=
LANGSMITH_TRACING=

# Local SearxNG (search engine) instance
# Set `SEARXNG_URL` to your local SearxNG server (default below) or a remote instance.
SEARXNG_URL=http://localhost:8080
# Only necessary for Open Agent Platform
SUPABASE_KEY=
SUPABASE_URL=
Expand Down
2 changes: 2 additions & 0 deletions src/open_deep_research/configuration.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ class SearchAPI(Enum):
ANTHROPIC = "anthropic"
OPENAI = "openai"
TAVILY = "tavily"
SEARXNG = "searxng"
NONE = "none"

class MCPConfig(BaseModel):
Expand Down Expand Up @@ -85,6 +86,7 @@ class Configuration(BaseModel):
"options": [
{"label": "Tavily", "value": SearchAPI.TAVILY.value},
{"label": "OpenAI Native Web Search", "value": SearchAPI.OPENAI.value},
{"label": "SearxNG (local)", "value": SearchAPI.SEARXNG.value},
{"label": "Anthropic Native Web Search", "value": SearchAPI.ANTHROPIC.value},
{"label": "None", "value": SearchAPI.NONE.value}
]
Expand Down
161 changes: 159 additions & 2 deletions src/open_deep_research/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,32 @@ async def noop():

return formatted_output


# Wrapper: try Tavily first, fallback to SearxNG if Tavily fails or returns no results
@tool(description=TAVILY_SEARCH_DESCRIPTION)
async def tavily_search_with_fallback(
queries: List[str],
max_results: Annotated[int, InjectedToolArg] = 5,
topic: Annotated[Literal["general", "news", "finance"], InjectedToolArg] = "general",
config: RunnableConfig = None,
) -> str:
"""Try Tavily search and fall back to local SearxNG if Tavily fails or returns no results."""
try:
tavily_result = await tavily_search(queries, max_results=max_results, topic=topic, config=config)
if tavily_result and "No valid search results found" not in tavily_result:
return tavily_result
except Exception as e:
logging.warning(f"Tavily search failed: {e}; falling back to SearxNG")

# Fall back to SearxNG
try:
searxng_result = await searxng_search(queries, max_results=max_results, config=config)
return searxng_result
except Exception as e:
logging.error(f"SearxNG fallback also failed: {e}")
# If both fail, return a helpful message
return "Search failed for both Tavily and SearxNG. Please check search provider configuration."

async def tavily_search_async(
search_queries,
max_results: int = 5,
Expand Down Expand Up @@ -172,6 +198,128 @@ async def tavily_search_async(
search_results = await asyncio.gather(*search_tasks)
return search_results


##########################
# SearxNG Search Tool Utils
##########################


async def searxng_search(
queries: List[str],
max_results: Annotated[int, InjectedToolArg] = 5,
config: RunnableConfig = None,
) -> str:
"""Fetch and summarize search results from a local SearxNG instance.

Expects SearxNG to be available at http://localhost:8080/ by default.
Returns a formatted string suitable for the LLM consumption, matching the
structure used by `tavily_search`.
"""
search_results = await searxng_search_async(
queries, max_results=max_results, config=config
)

# Reuse the same formatting logic as tavily_search
unique_results = {}
for response in search_results:
for result in response.get('results', []):
url = result.get('url')
if not url:
continue
if url not in unique_results:
unique_results[url] = {**result, "query": response.get('query')}

configurable = Configuration.from_runnable_config(config)
max_char_to_include = configurable.max_content_length

# Summarize content if available
async def noop():
return None

summarization_tasks = [
noop() if not result.get("raw_content")
else summarize_webpage(
init_chat_model(
model=configurable.summarization_model,
max_tokens=configurable.summarization_model_max_tokens,
api_key=get_api_key_for_model(configurable.summarization_model, config),
).with_structured_output(Summary),
result['raw_content'][:max_char_to_include]
)
for result in unique_results.values()
]

summaries = await asyncio.gather(*summarization_tasks)

summarized_results = {
url: {
'title': result.get('title'),
'content': result.get('content') if summary is None else summary
}
for url, result, summary in zip(
unique_results.keys(), unique_results.values(), summaries
)
}

if not summarized_results:
return "No valid search results found. Please try different search queries or use a different search API."

formatted_output = "Search results: \n\n"
for i, (url, result) in enumerate(summarized_results.items()):
formatted_output += f"\n\n--- SOURCE {i+1}: {result['title']} ---\n"
formatted_output += f"URL: {url}\n\n"
formatted_output += f"SUMMARY:\n{result['content']}\n\n"
formatted_output += "\n\n" + "-" * 80 + "\n"

return formatted_output


async def searxng_search_async(
search_queries: List[str],
max_results: int = 5,
config: RunnableConfig = None,
):
"""Query a local SearxNG instance and normalize results.

Assumes SearxNG is available at http://localhost:8080/ unless
environment variable SEARXNG_URL is set.
"""
searx_base = os.getenv("SEARXNG_URL", "http://localhost:8080")
endpoint = searx_base.rstrip("/") + "/search"

async def fetch(session: aiohttp.ClientSession, query: str):
params = {
"q": query,
"format": "json",
"categories": "general",
"safesearch": 1,
"engines": "",
}
try:
async with session.get(endpoint, params=params, timeout=30) as resp:
if resp.status != 200:
return {"query": query, "results": []}
data = await resp.json()

results = []
for hit in data.get("results", [])[:max_results]:
results.append({
"url": hit.get("url"),
"title": hit.get("title") or hit.get("url"),
"content": hit.get("content") or hit.get("excerpt") or "",
"raw_content": hit.get("content") or hit.get("excerpt") or "",
})

return {"query": query, "results": results}
except Exception as e:
logging.warning(f"SearxNG query failed for '{query}': {e}")
return {"query": query, "results": []}

async with aiohttp.ClientSession() as session:
tasks = [fetch(session, q) for q in search_queries]
responses = await asyncio.gather(*tasks)
return responses

async def summarize_webpage(model: BaseChatModel, webpage_content: str) -> str:
"""Summarize webpage content using AI model with timeout protection.

Expand Down Expand Up @@ -550,14 +698,23 @@ async def get_search_tool(search_api: SearchAPI):
return [{"type": "web_search_preview"}]

elif search_api == SearchAPI.TAVILY:
# Configure Tavily search tool with metadata
search_tool = tavily_search
# Configure Tavily search tool with SearxNG fallback
search_tool = tavily_search_with_fallback
search_tool.metadata = {
**(search_tool.metadata or {}),
"type": "search",
"name": "web_search"
}
return [search_tool]
elif search_api == SearchAPI.SEARXNG:
# Configure local SearxNG search tool
search_tool = searxng_search
search_tool.metadata = {
**(getattr(search_tool, "metadata", {}) or {}),
"type": "search",
"name": "web_search_searxng",
}
return [search_tool]

elif search_api == SearchAPI.NONE:
# No search functionality configured
Expand Down