Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
232 changes: 145 additions & 87 deletions application/parser/embedding_pipeline.py
Original file line number Diff line number Diff line change
@@ -1,73 +1,111 @@
import os
import logging
from typing import List, Any
from typing import List, Any, Optional, Dict
from retry import retry
from tqdm import tqdm
from application.core.settings import settings
from application.vectorstore.vector_creator import VectorCreator

# Optional: these imports can be swapped or mocked for testing
try:
    from application.core.settings import settings
    from application.vectorstore.vector_creator import VectorCreator
except ImportError:
    # Allow standalone testing without full project dependencies: everything
    # below is defined only when the project imports are unavailable, so the
    # real settings / VectorCreator are never shadowed.

    class DummySettings:
        """Minimal stand-in for the project settings object."""

        VECTOR_STORE = "faiss"

    settings = DummySettings()

    class DummyVectorStore:
        """In-memory stand-in for a vector store; keeps texts in a list."""

        def __init__(self):
            self.texts = []

        def add_texts(self, texts, metadatas=None):
            """Record each text together with its metadata.

            When no metadata is supplied, every text gets its own empty dict
            (a single shared ``[{}]`` would make ``zip`` drop all texts after
            the first one).
            """
            texts = list(texts)
            if not metadatas:
                metadatas = [{} for _ in texts]
            for text, meta in zip(texts, metadatas):
                self.texts.append({"text": text, "metadata": meta})

        def delete_index(self):
            """Forget every stored text."""
            self.texts.clear()

        def save_local(self, folder_name):
            """Write one line per stored entry to ``<folder_name>/store.txt``."""
            os.makedirs(folder_name, exist_ok=True)
            target = os.path.join(folder_name, "store.txt")
            # Explicit encoding so non-ASCII text does not depend on the locale.
            with open(target, "w", encoding="utf-8") as f:
                f.writelines(f"{entry}\n" for entry in self.texts)

    class VectorCreator:
        """Fallback factory that always hands back a DummyVectorStore."""

        @staticmethod
        def create_vectorstore(store_type, **kwargs):
            # store_type and kwargs are accepted for call compatibility only.
            return DummyVectorStore()


# NOTE(review): sanitize_content is defined a second time further down this
# file; the later definition is the one in effect at call time.
def sanitize_content(content: str) -> str:
    """
    Remove NUL characters that can cause vector store ingestion to fail.

    Args:
        content (str): Raw content that may contain NUL characters

    Returns:
        str: Sanitized content with NUL characters removed; empty or missing
        content yields an empty string so the declared return type holds.
    """
    if not content:
        return ""
    return content.replace('\x00', '')


# -----------------------------------------------------------
# Utility Functions
# -----------------------------------------------------------

def sanitize_content(content: Optional[str]) -> str:
    """Remove NUL characters that cause vector ingestion failures.

    Args:
        content: Raw text, or None.

    Returns:
        The text with every NUL character removed, or an empty string when
        ``content`` is None or empty.
    """
    if not content:
        return ""
    return content.replace('\x00', '')


@retry(tries=10, delay=60)
def add_text_to_store_with_retry(store: Any, doc: Any, source_id: str) -> None:
    """Add a document's text and metadata to the vector store with retries.

    Args:
        store: The vector store object; must provide ``add_texts``.
        doc: The document to be added. Its ``page_content`` is overwritten
            with the sanitized text.
        source_id: Unique identifier for the source; stored as a string under
            the ``"source_id"`` metadata key.

    Raises:
        Exception: Any failure is logged and re-raised so the ``retry``
            decorator can attempt the call again.
    """
    try:
        # Strip NUL characters, which cause ingestion failures.
        doc.page_content = sanitize_content(getattr(doc, "page_content", ""))

        # Only substitute a new dict when metadata is truly absent, so an
        # existing (even empty) dict on the document is updated in place.
        metadata = getattr(doc, "metadata", None)
        if metadata is None:
            metadata = {}
        metadata["source_id"] = str(source_id)

        # Exactly one insertion per call; a second add_texts here would
        # store the document twice.
        store.add_texts([doc.page_content], metadatas=[metadata])
        logging.debug(f"✅ Successfully added document to store: {metadata.get('source_id')}")
    except Exception as e:
        logging.error(f"Failed to add document: {e}", exc_info=True)
        raise


def embed_and_store_documents(
    docs: List[Any],
    folder_name: str,
    source_id: str,
    task_status: Optional[Any] = None,
    retries: int = 10,
    retry_delay: int = 60,
) -> None:
    """
    Embed documents and store them in a vector store.

    Args:
        docs: List of documents to be embedded and stored. For the "faiss"
            backend the first document is popped off this list (the caller's
            list is mutated) and passed to the store as ``docs_init``.
        folder_name: Directory to save the vector store; created if missing.
        source_id: Unique identifier for the source.
        task_status: Optional task state manager; when given, its
            ``update_state`` is called with a 0-100 progress value.
        retries: Accepted for API compatibility.
            NOTE(review): not wired into add_text_to_store_with_retry, whose
            retry policy is fixed by its decorator.
        retry_delay: Accepted for API compatibility; see ``retries``.

    Returns:
        None

    Raises:
        OSError: If the final vector store cannot be saved.
        Exception: If vector store creation fails.
    """
    # Ensure the folder exists
    os.makedirs(folder_name, exist_ok=True)
    logging.info(f"📁 Output folder ready: {folder_name}")

    # Early return if there are no documents
    if not docs:
        logging.info("No documents provided to embed. Initializing empty store and exiting.")
        store = VectorCreator.create_vectorstore(
            settings.VECTOR_STORE,
            source_id=source_id,
            embeddings_key=os.getenv("EMBEDDINGS_KEY"),
        )
        # Best effort: failing to persist an empty store is logged, not raised.
        try:
            store.save_local(folder_name)
            logging.info("✅ Empty vector store saved successfully.")
        except Exception as e:
            logging.critical(f"🔥 Failed to save empty vector store: {e}", exc_info=True)
        return

    # Initialize vector store
    if settings.VECTOR_STORE == "faiss":
        # docs is non-empty here (guarded above), so the pop cannot fail.
        docs_init = [docs.pop(0)]
        # NOTE(review): the tail of this call and the `else:` branch header
        # were collapsed in the reviewed diff; reconstructed from the
        # surrounding context lines — confirm against the file.
        store = VectorCreator.create_vectorstore(
            settings.VECTOR_STORE,
            docs_init=docs_init,
            source_id=source_id,
            embeddings_key=os.getenv("EMBEDDINGS_KEY"),
        )
    else:
        store = VectorCreator.create_vectorstore(
            settings.VECTOR_STORE,
            source_id=source_id,
            embeddings_key=os.getenv("EMBEDDINGS_KEY"),
        )
        # Clear any existing index. Only a missing method is tolerated; a
        # delete that exists but fails still propagates instead of being
        # hidden behind a debug message.
        delete_index = getattr(store, "delete_index", None)
        if callable(delete_index):
            delete_index()
        else:
            logging.debug("store.delete_index() not available for this backend")

    total_docs = len(docs)
    logging.info(f"🚀 Starting embedding process for {total_docs} documents.")

    if total_docs == 0:
        logging.info("No remaining documents to process after initialization.")
    else:
        # Process and embed documents (a single pass: each document is
        # handed to the store exactly once).
        for idx, doc in tqdm(
            enumerate(docs),
            desc="Embedding 🦖",
            total=total_docs,
            unit="docs",
            bar_format="{l_bar}{bar}| Time Left: {remaining}",
        ):
            try:
                # total_docs > 0 in this branch, so the division is safe.
                progress = int(((idx + 1) / total_docs) * 100)
                if task_status is not None:
                    task_status.update_state(state="PROGRESS", meta={"current": progress})

                add_text_to_store_with_retry(store, doc, source_id)
            except Exception as e:
                logging.error(f"⚠️ Error embedding document {idx}: {e}", exc_info=True)
                logging.info(f"Saving progress at document {idx} / {total_docs}...")
                try:
                    store.save_local(folder_name)
                    logging.info("✅ Partial progress saved successfully.")
                except Exception as save_error:
                    logging.critical(f"❌ Failed to save partial progress: {save_error}", exc_info=True)
                # Stop embedding, but still fall through to the final save.
                break

    # Save final store. Backends without a local save step are skipped
    # rather than turned into an OSError.
    save_local = getattr(store, "save_local", None)
    if not callable(save_local):
        logging.debug("store.save_local() not available for this backend")
        return
    try:
        save_local(folder_name)
        logging.info("🎉 Vector store saved successfully.")
    except Exception as e:
        logging.critical(f"🔥 Failed to save final store: {e}", exc_info=True)
        raise OSError(f"Unable to save vector store: {e}") from e


# -----------------------------------------------------------
# Example Usage (for testing)
# -----------------------------------------------------------

if __name__ == "__main__":
    # Manual smoke run: embeds five throwaway documents into ./output_vectors.
    logging.basicConfig(level=logging.INFO, format="%(levelname)s: %(message)s")

    class DummyDoc:
        """Bare-bones document carrying ``page_content`` and ``metadata``."""

        def __init__(self, content, meta=None):
            self.page_content = content
            self.metadata = meta or {}

    dummy_docs = [DummyDoc(f"Sample document {i}") for i in range(5)]
    embed_and_store_documents(dummy_docs, "output_vectors", source_id="12345")