feat: ✨ remove duplicate files when re-ingesting
+142 -79
@@ -7,6 +7,8 @@ from langchain_community.document_loaders import TextLoader
 from langchain_community.vectorstores import FAISS
 from langchain_text_splitters import RecursiveCharacterTextSplitter
 from tqdm import tqdm
+from typing import List, Dict, Any
+
 
 from config_loader import load_config
 from embedding import LocalLMEmbeddings
@@ -25,6 +27,8 @@ CHUNK_OVERLAP=CFG["ingestion"]["chunk_overlap"]
 EMBEDDING_BATCH_SIZE=CFG["ingestion"]["embedding_batch_size"]
 TIMEFILE = CFG["ingestion"]["time_file_location"]
 
+RIGHT_NOW = datetime.now().isoformat()
+
 def load_documents(last_update_time):
     docs = []
     data_path = Path(DATA_DIR)
@@ -48,12 +52,16 @@ def load_documents(last_update_time):
                 doc.metadata["full_path"] = str(file_path.absolute())
 
             docs.extend(loaded_docs)
-            print(f"✅ Loaded: {file_path.name}")
+            # print(f"✅ Loaded: {file_path.name}")
         except Exception as e:
             print(f"❌ Failed to load {file_path}: {e}")
 
     print(f"✅ Loaded: {len(docs)} Files")
     return docs
 
+def normalize_path(path_str):
+    """Convert string path to normalized absolute path."""
+    return str(Path(path_str).resolve())
+
 def chunk_documents(docs):
     # LangChain preserves metadata during splitting automatically
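
Note: the new normalize_path() is what lets the DELETE added later in this commit match reliably. load_documents() stores str(file_path.absolute()), which keeps ".." segments and symlinks, while resolve() collapses them, so every spelling of a file lands on one canonical key. A minimal sketch of the difference (the path is illustrative):

    from pathlib import Path

    p = Path("data/../data/notes.md")   # illustrative relative path
    print(p.absolute())                 # e.g. /home/user/data/../data/notes.md (not normalized)
    print(p.resolve())                  # e.g. /home/user/data/notes.md (normalized)
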
@@ -96,73 +104,115 @@ def enrich_chunks(chunks: list) -> list:
     return [item[1] for item in enriched_results]
 
 
-def embed_chunks(chunks):
+def embed_chunks(chunks: List[Any], batch_size: int = EMBEDDING_BATCH_SIZE) -> List[Dict[str, Any]]:
     """
-    Embed chunks and return a list of dictionaries with full metadata.
-    Each dict contains:
-    - file_path
-    - file_name
-    - chunk_data
-    - synopsis
-    - tags
-    - entities
-    - embedding
+    Embed chunks in batches using the model's batch processing capability.
+    Each batch is embedded in a single call instead of one call per chunk.
+
+    Args:
+        chunks: List of document chunks (e.g., from LangChain's Document objects)
+        batch_size: Number of chunks to embed per batch (default: EMBEDDING_BATCH_SIZE from config)
+
+    Returns:
+        List of dictionaries with full metadata and embeddings.
     """
     # Initialize embedding model with batch support
     embeddings_model = LocalLMEmbeddings(
         model=EMBEDDING_MODEL,
         base_url=API_BASE,
-        batch_size=EMBEDDING_BATCH_SIZE,
+        batch_size=batch_size,  # Let the model handle batching internally
     )
 
-    # Prepare list of dictionaries for output
+    # Prepare output list
    embedded_chunks = []
 
-    for i, chunk in enumerate(tqdm(chunks, desc="Embedding chunks")):
+    total_chunks = len(chunks)
+
+    print(f"🚀 Starting batch embedding of {total_chunks} chunks in batches of {batch_size}...")
+
+    # Process chunks in batches
+    for i in tqdm(range(0, total_chunks, batch_size), desc="Embedding batches"):
+        batch = chunks[i:i + batch_size]
+        batch_content = [chunk.page_content for chunk in batch]
+
         try:
-            # Extract metadata
-            file_path = chunk.metadata.get("full_path", "unknown")
-            file_name = chunk.metadata.get("source", "unknown")
-            content = chunk.page_content
+            # Use model's batched embedding method
+            # batch_embeddings = embeddings_model.embed_query(batch_content)
+            batch_embeddings = embeddings_model.embed_documents(batch_content)
 
-            # Extract enriched metadata (from IngestionAgent)
-            synopsis = chunk.metadata.get("synopsis", "No summary")
-            tags = chunk.metadata.get("tags", [])
-            entities = chunk.metadata.get("entities", [])
-
-            # Generate embedding
-            embedding = embeddings_model.embed_query(content)
+            # Process each chunk in the batch
+            for j, (chunk, embedding) in enumerate(zip(batch, batch_embeddings)):
+                # Extract metadata
+                file_path_orig = chunk.metadata.get("full_path", "unknown")
+                file_path = normalize_path(file_path_orig)
+
+                file_name = chunk.metadata.get("source", "unknown")
+                content = chunk.page_content
 
-            # Create structured dictionary
-            chunk_data = {
-                "file_path": file_path,
-                "file_name": file_name,
-                "chunk_data": content,
-                "synopsis": synopsis,
-                "tags": tags,
-                "entities": entities,
-                "embedding": embedding,
-                "timestamp": datetime.now().isoformat()
-            }
-
-            embedded_chunks.append(chunk_data)
+                synopsis = chunk.metadata.get("synopsis", "No summary")
+                tags = chunk.metadata.get("tags", [])
+                entities = chunk.metadata.get("entities", [])
+
+                # Create structured dictionary
+                chunk_data = {
+                    "file_path": file_path,
+                    "file_name": file_name,
+                    "chunk_data": content,
+                    "synopsis": synopsis,
+                    "tags": tags,
+                    "entities": entities,
+                    "embedding": embedding,
+                    "timestamp": RIGHT_NOW,
+                    "original_index": i + j  # Track original position
+                }
+
+                embedded_chunks.append(chunk_data)
 
         except Exception as e:
-            print(f"⚠️ Failed to embed chunk {i}: {e}")
-            # Fallback entry
-            embedded_chunks.append({
-                "file_path": chunk.metadata.get("full_path", "unknown"),
-                "file_name": chunk.metadata.get("source", "unknown"),
-                "chunk_data": content,
-                "synopsis": "Embedding failed",
-                "tags": ["error"],
-                "entities": [],
-                "embedding": [],
-                "timestamp": datetime.now().isoformat()
-            })
+            print(f"⚠️ Batch processing failed at index {i}: {e}")
+            # Fallback: process individually (if needed)
+            for j, chunk in enumerate(batch):
+                try:
+                    content = chunk.page_content
+                    embedding = embeddings_model.embed_query(content)
+
+                    file_path_orig = chunk.metadata.get("full_path", "unknown")
+                    file_path = normalize_path(file_path_orig)
+                    file_name = chunk.metadata.get("source", "unknown")
+                    synopsis = chunk.metadata.get("synopsis", "No summary")
+                    tags = chunk.metadata.get("tags", [])
+                    entities = chunk.metadata.get("entities", [])
+
+                    chunk_data = {
+                        "file_path": file_path,
+                        "file_name": file_name,
+                        "chunk_data": content,
+                        "synopsis": synopsis,
+                        "tags": tags,
+                        "entities": entities,
+                        "embedding": embedding,
+                        "timestamp": RIGHT_NOW,
+                        "original_index": i + j
+                    }
+                    embedded_chunks.append(chunk_data)
+
+                except Exception as inner_e:
+                    print(f"❌ Failed to embed individual chunk {i + j}: {inner_e}")
+                    embedded_chunks.append({
+                        "file_path": normalize_path(chunk.metadata.get("full_path", "unknown")),
+                        "file_name": chunk.metadata.get("source", "unknown"),
+                        "chunk_data": content,
+                        "synopsis": "Embedding failed",
+                        "tags": ["error"],
+                        "entities": [],
+                        "embedding": [],
+                        "timestamp": RIGHT_NOW,
+                        "original_index": i + j
+                    })
 
+    print(f"✅ Completed embedding {len(embedded_chunks)} chunks in batches of {batch_size}.")
     return embedded_chunks
 
 
 def save_to_db(chunk_dicts):
     """
     Save a list of dictionaries to the Turso database.
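
Note: the rewrite swaps per-chunk embed_query() calls for one embed_documents() call per batch. The zip() in the new loop assumes the list-in, list-out contract of LangChain's Embeddings interface, which LocalLMEmbeddings evidently implements. A minimal sketch of that assumption:

    # embed_documents takes a list of texts and returns one vector per text.
    # zip() silently drops trailing chunks if the model returns a short batch,
    # so an explicit length check is a cheap safeguard.
    texts = [chunk.page_content for chunk in batch]    # names as in the diff
    vectors = embeddings_model.embed_documents(texts)  # -> List[List[float]]
    assert len(vectors) == len(texts), "model returned a short batch"
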
@@ -218,10 +268,10 @@ def create_db():
             tags TEXT, -- comma-separated
             entities TEXT, -- comma-separated
             embedding F32_BLOB(4096),
-            timestamp DATETIME DEFAULT CURRENT_TIMESTAMP,
-            UNIQUE(file_path, chunk_data) -- avoid duplicates
+            timestamp DATETIME DEFAULT CURRENT_TIMESTAMP
         )
     """)
+    # UNIQUE(file_path, chunk_data) -- avoid duplicates
 
     # Indexes for faster queries
     cur.execute("CREATE INDEX IF NOT EXISTS idx_embedding ON notes(embedding);")
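
Note: dropping UNIQUE(file_path, chunk_data) trades constraint-enforced dedup for the delete-then-insert flow added below. The other way to make re-ingestion idempotent would be to keep the constraint and upsert against it; a hypothetical sketch, assuming the Turso/libSQL driver accepts SQLite's standard ON CONFLICT syntax (column list abbreviated; file_path, content, and vec are placeholder variables):

    cur.execute("""
        INSERT INTO notes (file_path, chunk_data, embedding)
        VALUES (?, ?, ?)
        ON CONFLICT(file_path, chunk_data) DO UPDATE SET
            embedding = excluded.embedding,
            timestamp = CURRENT_TIMESTAMP
    """, (file_path, content, vec))
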
@@ -250,6 +300,43 @@ def update_timefile():
         file.write(current_time_str)
     return current_time_str
 
+def delete_from_db(embedded_chunks):
+    """
+    Delete existing rows from the 'notes' table where the file_path matches
+    any file_path in the provided embedded_chunks list.
+    This ensures we don't violate the UNIQUE constraint when inserting new chunks.
+    """
+    if not embedded_chunks:
+        print("No chunks to process; skipping deletion.")
+        return
+
+    # Extract unique file_paths from the chunks
+    file_paths = set(normalize_path(entry["file_path"]) for entry in embedded_chunks)
+
+    if not file_paths:
+        print("No file paths found in embedded_chunks; skipping deletion.")
+        return
+
+    print(f"Deleting existing rows for {len(file_paths)} file(s)")
+
+    con = turso.connect(DATABASE_PATH)
+    cur = con.cursor()
+
+    # Use a single DELETE statement with IN clause for efficiency
+    placeholders = ", ".join("?" for _ in file_paths)
+    delete_sql = f"DELETE FROM notes WHERE file_path IN ({placeholders})"
+
+    try:
+        cur.execute(delete_sql, list(file_paths))
+        deleted_count = cur.rowcount
+        print(f"✅ Deleted {deleted_count} rows matching file paths.")
+    except Exception as e:
+        print(f"❌ Error deleting from database: {e}")
+        con.rollback()
+    finally:
+        con.close()
+
+
 
 def main():
     create_db()
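
Note: two caveats on delete_from_db() as written. SQLite-family engines cap bound parameters per statement (999 in older builds), so a large enough file set can overflow the single IN clause; and the DELETE is never explicitly committed, so persistence depends on the driver's autocommit behavior. A sketch of a chunked variant covering both, where MAX_VARS is an assumed budget rather than a value read from the driver:

    MAX_VARS = 500  # assumed safe bound-parameter budget
    paths = list(file_paths)
    for start in range(0, len(paths), MAX_VARS):
        part = paths[start:start + MAX_VARS]
        marks = ", ".join("?" for _ in part)
        cur.execute(f"DELETE FROM notes WHERE file_path IN ({marks})", part)
    con.commit()  # explicit, in case the driver does not autocommit
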
@@ -270,6 +357,9 @@ def main():
     embedded_chunks = embed_chunks(enriched_chunks)
     print(f"Embedded {len(embedded_chunks)} chunks.")
 
+    # remove existing rows from notes table that match file path
+    delete_from_db(embedded_chunks)
+
     save_to_db(embedded_chunks)
     print("🎉 Ingestion complete!")
 
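
Note: delete_from_db() and save_to_db() each open their own connection, so a failure between the two calls leaves the deleted files missing from the index until the next run. A sketch of an atomic variant on a single connection, assuming insert_sql and batch_data are built as in save_to_db():

    con = turso.connect(DATABASE_PATH)
    try:
        cur = con.cursor()
        cur.execute(f"DELETE FROM notes WHERE file_path IN ({placeholders})", list(file_paths))
        cur.executemany(insert_sql, batch_data)  # as in save_to_db
        con.commit()  # delete + re-insert is all-or-nothing
    except Exception:
        con.rollback()
        raise
    finally:
        con.close()
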
@@ -277,31 +367,4 @@ def main():
     print(f"Updated timefile to: {updated}")
 
 if __name__ == "__main__":
     main()
-
-#TODO: create function to delete rows that match new files coming in
-# and handle database insert fails
-"""
-Traceback (most recent call last):
-  File "/home/cosmic/source/dungeon_masters_vault/.venv/lib/python3.13/site-packages/turso/lib.py", line 643, in executemany
-    result = _run_execute_with_io(stmt, self._connection.extra_io)
-  File "/home/cosmic/source/dungeon_masters_vault/.venv/lib/python3.13/site-packages/turso/lib.py", line 163, in _run_execute_with_io
-    result = stmt.execute()
-turso.Constraint: UNIQUE constraint failed: notes.(file_path, chunk_data) (19)
-
-During handling of the above exception, another exception occurred:
-
-Traceback (most recent call last):
-  File "/home/cosmic/source/dungeon_masters_vault/src/ingest.py", line 280, in <module>
-    main()
-    ~~~~^^
-  File "/home/cosmic/source/dungeon_masters_vault/src/ingest.py", line 273, in main
-    save_to_db(embedded_chunks)
-    ~~~~~~~~~~^^^^^^^^^^^^^^^^^
-  File "/home/cosmic/source/dungeon_masters_vault/src/ingest.py", line 201, in save_to_db
-    cur.executemany(insert_sql, batch_data)
-    ~~~~~~~~~~~~~~~^^^^^^^^^^^^^^^^^^^^^^^^
-  File "/home/cosmic/source/dungeon_masters_vault/.venv/lib/python3.13/site-packages/turso/lib.py", line 656, in executemany
-    raise _map_turso_exception(exc)
-turso.lib.IntegrityError: UNIQUE constraint failed: notes.(file_path, chunk_data) (19)
-"""
-main()