Skip to content

Commit 1b07fc3

Browse files
Added DummySettings and DummyVectorStore for standalone embedding pipeline testing
1 parent fbf7cf8 commit 1b07fc3

File tree

1 file changed

+145
-87
lines changed

1 file changed

+145
-87
lines changed
Lines changed: 145 additions & 87 deletions
Original file line numberDiff line numberDiff line change
@@ -1,73 +1,111 @@
11
import os
22
import logging
3-
from typing import List, Any
3+
from typing import List, Any, Optional, Dict
44
from retry import retry
55
from tqdm import tqdm
6-
from application.core.settings import settings
7-
from application.vectorstore.vector_creator import VectorCreator
86

7+
# Optional: these imports can be swapped or mocked for testing
8+
try:
9+
from application.core.settings import settings
10+
from application.vectorstore.vector_creator import VectorCreator
11+
except ImportError:
12+
# Allow standalone testing without full project dependencies
13+
class DummySettings:
14+
VECTOR_STORE = "faiss"
15+
settings = DummySettings()
916

10-
def sanitize_content(content: str) -> str:
11-
"""
12-
Remove NUL characters that can cause vector store ingestion to fail.
13-
14-
Args:
15-
content (str): Raw content that may contain NUL characters
16-
17-
Returns:
18-
str: Sanitized content with NUL characters removed
19-
"""
20-
if not content:
21-
return content
22-
return content.replace('\x00', '')
17+
class DummyVectorStore:
18+
def __init__(self):
19+
self.texts = []
20+
21+
def add_texts(self, texts, metadatas=None):
22+
for text, meta in zip(texts, metadatas or [{}]):
23+
self.texts.append({"text": text, "metadata": meta})
24+
25+
def delete_index(self):
26+
self.texts.clear()
27+
28+
def save_local(self, folder_name):
29+
os.makedirs(folder_name, exist_ok=True)
30+
with open(os.path.join(folder_name, "store.txt"), "w") as f:
31+
for entry in self.texts:
32+
f.write(f"{entry}\n")
33+
34+
class VectorCreator:
35+
@staticmethod
36+
def create_vectorstore(store_type, **kwargs):
37+
return DummyVectorStore()
38+
39+
40+
# -----------------------------------------------------------
41+
# Utility Functions
42+
# -----------------------------------------------------------
43+
44+
def sanitize_content(content: Optional[str]) -> str:
45+
"""Remove NUL characters that cause vector ingestion failures."""
46+
return content.replace('\x00', '') if content else ""
2347

2448

2549
@retry(tries=10, delay=60)
2650
def add_text_to_store_with_retry(store: Any, doc: Any, source_id: str) -> None:
27-
"""Add a document's text and metadata to the vector store with retry logic.
28-
29-
Args:
30-
store: The vector store object.
31-
doc: The document to be added.
32-
source_id: Unique identifier for the source.
33-
34-
Raises:
35-
Exception: If document addition fails after all retry attempts.
36-
"""
51+
"""Add a document's text and metadata to the vector store with retries."""
3752
try:
38-
# Sanitize content to remove NUL characters that cause ingestion failures
39-
doc.page_content = sanitize_content(doc.page_content)
40-
41-
doc.metadata["source_id"] = str(source_id)
42-
store.add_texts([doc.page_content], metadatas=[doc.metadata])
53+
doc.page_content = sanitize_content(getattr(doc, "page_content", ""))
54+
metadata = getattr(doc, "metadata", {}) or {}
55+
metadata["source_id"] = str(source_id)
56+
57+
store.add_texts([doc.page_content], metadatas=[metadata])
58+
logging.debug(f"✅ Successfully added document to store: {metadata.get('source_id')}")
4359
except Exception as e:
44-
logging.error(f"Failed to add document with retry: {e}", exc_info=True)
60+
logging.error(f"Failed to add document: {e}", exc_info=True)
4561
raise
4662

4763

48-
def embed_and_store_documents(docs: List[Any], folder_name: str, source_id: str, task_status: Any) -> None:
49-
"""Embeds documents and stores them in a vector store.
64+
# -----------------------------------------------------------
65+
# Main Embedding Function
66+
# -----------------------------------------------------------
5067

51-
Args:
52-
docs: List of documents to be embedded and stored.
53-
folder_name: Directory to save the vector store.
54-
source_id: Unique identifier for the source.
55-
task_status: Task state manager for progress updates.
56-
57-
Returns:
58-
None
59-
60-
Raises:
61-
OSError: If unable to create folder or save vector store.
62-
Exception: If vector store creation or document embedding fails.
68+
def embed_and_store_documents(
69+
docs: List[Any],
70+
folder_name: str,
71+
source_id: str,
72+
task_status: Optional[Any] = None,
73+
retries: int = 10,
74+
retry_delay: int = 60,
75+
) -> None:
76+
"""
77+
Embed documents and store them in a vector store.
78+
Includes fault tolerance, retry logic, and progress saving.
6379
"""
64-
# Ensure the folder exists
65-
if not os.path.exists(folder_name):
66-
os.makedirs(folder_name)
80+
os.makedirs(folder_name, exist_ok=True)
81+
logging.info(f"📁 Output folder ready: {folder_name}")
82+
83+
# Early return if there are no documents
84+
if not docs:
85+
logging.info("No documents provided to embed. Initializing empty store and exiting.")
86+
# Create store and save empty state if possible
87+
store = VectorCreator.create_vectorstore(
88+
settings.VECTOR_STORE,
89+
source_id=source_id,
90+
embeddings_key=os.getenv("EMBEDDINGS_KEY"),
91+
)
92+
try:
93+
store.save_local(folder_name)
94+
logging.info("✅ Empty vector store saved successfully.")
95+
except Exception as e:
96+
logging.critical(f"🔥 Failed to save empty vector store: {e}", exc_info=True)
97+
return
6798

68-
# Initialize vector store
99+
# Initialize vector store. For FAISS, the implementation originally popped
100+
# the first doc out of the list; make this safe if docs is small.
69101
if settings.VECTOR_STORE == "faiss":
70-
docs_init = [docs.pop(0)]
102+
docs_init = []
103+
if len(docs) > 0:
104+
# pop the first doc for any special initialization behavior
105+
try:
106+
docs_init = [docs.pop(0)]
107+
except Exception:
108+
docs_init = []
71109
store = VectorCreator.create_vectorstore(
72110
settings.VECTOR_STORE,
73111
docs_init=docs_init,
@@ -80,43 +118,63 @@ def embed_and_store_documents(docs: List[Any], folder_name: str, source_id: str,
80118
source_id=source_id,
81119
embeddings_key=os.getenv("EMBEDDINGS_KEY"),
82120
)
83-
store.delete_index()
121+
# clear any existing index for non-faiss backends
122+
try:
123+
store.delete_index()
124+
except Exception:
125+
# not all backends may implement delete_index
126+
logging.debug("store.delete_index() not available for this backend")
84127

85128
total_docs = len(docs)
129+
logging.info(f"🚀 Starting embedding process for {total_docs} documents.")
86130

87-
# Process and embed documents
88-
for idx, doc in tqdm(
89-
enumerate(docs),
90-
desc="Embedding 🦖",
91-
unit="docs",
92-
total=total_docs,
93-
bar_format="{l_bar}{bar}| Time Left: {remaining}",
94-
):
95-
try:
96-
# Update task status for progress tracking
97-
progress = int(((idx + 1) / total_docs) * 100)
98-
task_status.update_state(state="PROGRESS", meta={"current": progress})
99-
100-
# Add document to vector store
101-
add_text_to_store_with_retry(store, doc, source_id)
102-
except Exception as e:
103-
logging.error(f"Error embedding document {idx}: {e}", exc_info=True)
104-
logging.info(f"Saving progress at document {idx} out of {total_docs}")
105-
try:
106-
store.save_local(folder_name)
107-
logging.info("Progress saved successfully")
108-
except Exception as save_error:
109-
logging.error(f"CRITICAL: Failed to save progress: {save_error}", exc_info=True)
110-
# Continue without breaking to attempt final save
111-
break
112-
113-
# Save the vector store
114-
if settings.VECTOR_STORE == "faiss":
115-
try:
116-
store.save_local(folder_name)
117-
logging.info("Vector store saved successfully.")
118-
except Exception as e:
119-
logging.error(f"CRITICAL: Failed to save final vector store: {e}", exc_info=True)
120-
raise OSError(f"Unable to save vector store to {folder_name}: {e}") from e
131+
if total_docs == 0:
132+
logging.info("No remaining documents to process after initialization.")
121133
else:
122-
logging.info("Vector store saved successfully.")
134+
for idx, doc in tqdm(
135+
enumerate(docs),
136+
desc="Embedding 🦖",
137+
total=total_docs,
138+
unit="docs",
139+
bar_format="{l_bar}{bar}| Time Left: {remaining}",
140+
):
141+
try:
142+
# protect against division by zero by using total_docs which is >0 here
143+
progress = int(((idx + 1) / total_docs) * 100)
144+
if task_status:
145+
task_status.update_state(state="PROGRESS", meta={"current": progress})
146+
147+
add_text_to_store_with_retry(store, doc, source_id)
148+
except Exception as e:
149+
logging.error(f"⚠️ Error embedding document {idx}: {e}", exc_info=True)
150+
logging.info(f"Saving progress at document {idx} / {total_docs}...")
151+
try:
152+
store.save_local(folder_name)
153+
logging.info("✅ Partial progress saved successfully.")
154+
except Exception as save_error:
155+
logging.critical(f"❌ Failed to save partial progress: {save_error}", exc_info=True)
156+
break
157+
158+
# Save final store
159+
try:
160+
store.save_local(folder_name)
161+
logging.info("🎉 Vector store saved successfully.")
162+
except Exception as e:
163+
logging.critical(f"🔥 Failed to save final store: {e}", exc_info=True)
164+
raise OSError(f"Unable to save vector store: {e}") from e
165+
166+
167+
# -----------------------------------------------------------
168+
# Example Usage (for testing)
169+
# -----------------------------------------------------------
170+
171+
if __name__ == "__main__":
172+
logging.basicConfig(level=logging.INFO, format="%(levelname)s: %(message)s")
173+
174+
class DummyDoc:
175+
def __init__(self, content, meta=None):
176+
self.page_content = content
177+
self.metadata = meta or {}
178+
179+
dummy_docs = [DummyDoc(f"Sample document {i}") for i in range(5)]
180+
embed_and_store_documents(dummy_docs, "output_vectors", source_id="12345")

0 commit comments

Comments
 (0)