diff --git a/src/config.yaml b/src/config.yaml
index 1be9f7d..af23feb 100644
--- a/src/config.yaml
+++ b/src/config.yaml
@@ -11,7 +11,7 @@ models:
 
 # --- Ingestion Settings ---
 ingestion:
-  data_dir: "/home/cosmic/DnD"
+  data_dir: "/home/devin/DnD"
   db_path: "./data/dmv.db"
   max_workers: 8
   chunk_size: 800
diff --git a/src/experts/dnd_agent.py b/src/experts/dnd_agent.py
index ec92cc0..d312c29 100644
--- a/src/experts/dnd_agent.py
+++ b/src/experts/dnd_agent.py
@@ -12,6 +12,27 @@ DATABASE_PATH = CFG["ingestion"]["db_path"]
 EMBEDDING_MODEL = CFG["models"]["embedding"]
 API_BASE = CFG["api"]["base_url"]
 
+import turso
+
+# Inside your retrieval logic:
+def retrieve_from_turso(question, k=5):
+    # Example query: pull candidate notes via full-text matching.
+    # Note: Turso speaks SQLite, so FTS5 or a vector extension can replace
+    # the LIKE filter where available.
+    query = """
+        SELECT file_name, synopsis, tags, entities, chunk_data, embedding
+        FROM notes
+        WHERE chunk_data LIKE ? OR synopsis LIKE ?
+        LIMIT ?
+    """
+    # Embeddings are stored with each row, but stock SQLite has no
+    # similarity() function, so rank candidates in Python: embed the
+    # question and compare it against the returned vectors.
+    con = turso.connect(DATABASE_PATH)
+    cur = con.cursor()
+    results = cur.execute(query, (f"%{question}%", f"%{question}%", k)).fetchall()
+    con.close()
+    return results
 
 # --- DSPy Signature ---
 class DnDContextQA(dspy.Signature):
diff --git a/src/ingest.py b/src/ingest.py
index 44d7b21..7cc81cd 100644
--- a/src/ingest.py
+++ b/src/ingest.py
@@ -7,6 +7,8 @@ from langchain_community.document_loaders import TextLoader
 from langchain_community.vectorstores import FAISS
 from langchain_text_splitters import RecursiveCharacterTextSplitter
 from tqdm import tqdm
+from typing import List, Dict, Any
+
 from config_loader import load_config
 from embedding import LocalLMEmbeddings
@@ -25,6 +27,8 @@ CHUNK_OVERLAP=CFG["ingestion"]["chunk_overlap"]
 EMBEDDING_BATCH_SIZE=CFG["ingestion"]["embedding_batch_size"]
 TIMEFILE = CFG["ingestion"]["time_file_location"]
 
+RIGHT_NOW = datetime.now().isoformat()  # one timestamp shared by every chunk in this run
+
 def load_documents(last_update_time):
     docs = []
     data_path = Path(DATA_DIR)
@@ -48,12 +52,16 @@ def load_documents(last_update_time):
             doc.metadata["full_path"] = str(file_path.absolute())
             docs.extend(loaded_docs)
-            print(f"✅ Loaded: {file_path.name}")
+            # print(f"✅ Loaded: {file_path.name}")
         except Exception as e:
             print(f"❌ Failed to load {file_path}: {e}")
+    print(f"✅ Loaded: {len(docs)} files")
     return docs
 
+def normalize_path(path_str):
+    """Convert a string path to a normalized absolute path."""
+    return str(Path(path_str).resolve())
 
 def chunk_documents(docs):
     # LangChain preserves metadata during splitting automatically
@@ -96,73 +104,114 @@ def enrich_chunks(chunks: list) -> list:
     return [item[1] for item in enriched_results]
 
-def embed_chunks(chunks):
+def embed_chunks(chunks: List[Any], batch_size: int = EMBEDDING_BATCH_SIZE) -> List[Dict[str, Any]]:
     """
-    Embed chunks and return a list of dictionaries with full metadata.
-    Each dict contains:
-    - file_path
-    - file_name
-    - chunk_data
-    - synopsis
-    - tags
-    - entities
-    - embedding
+    Embed chunks in batches using the model's batch-embedding support.
+    Each batch is embedded with a single endpoint request rather than one call per chunk.
+
+    Args:
+        chunks: List of document chunks (e.g., LangChain Document objects)
+        batch_size: Chunks per batch (defaults to EMBEDDING_BATCH_SIZE from config)
+
+    Returns:
+        List of dictionaries with full metadata and embeddings.
""" + # Initialize embedding model with batch support embeddings_model = LocalLMEmbeddings( model=EMBEDDING_MODEL, base_url=API_BASE, - batch_size=EMBEDDING_BATCH_SIZE, + batch_size=batch_size, # Let the model handle batching internally ) - # Prepare list of dictionaries for output + # Prepare output list embedded_chunks = [] - for i, chunk in enumerate(tqdm(chunks, desc="Embedding chunks")): + total_chunks = len(chunks) + + print(f"🚀 Starting batch embedding of {total_chunks} chunks in batches of {batch_size}...") + + # Process chunks in batches + for i in tqdm(range(0, total_chunks, batch_size), desc="Embedding batches"): + batch = chunks[i:i + batch_size] + batch_content = [chunk.page_content for chunk in batch] + try: - # Extract metadata - file_path = chunk.metadata.get("full_path", "unknown") - file_name = chunk.metadata.get("source", "unknown") - content = chunk.page_content + # Use model's batched embedding method + # batch_embeddings = embeddings_model.embed_query(batch_content) + batch_embeddings = embeddings_model.embed_documents(batch_content) - # Extract enriched metadata (from IngestionAgent) - synopsis = chunk.metadata.get("synopsis", "No summary") - tags = chunk.metadata.get("tags", []) - entities = chunk.metadata.get("entities", []) + # Process each chunk in the batch + for j, (chunk, embedding) in enumerate(zip(batch, batch_embeddings)): + # Extract metadata + file_path_orig = chunk.metadata.get("full_path", "unknown") + file_path = normalize_path(file_path_orig) - # Generate embedding - embedding = embeddings_model.embed_query(content) + file_name = chunk.metadata.get("source", "unknown") + content = chunk.page_content - # Create structured dictionary - chunk_data = { - "file_path": file_path, - "file_name": file_name, - "chunk_data": content, - "synopsis": synopsis, - "tags": tags, - "entities": entities, - "embedding": embedding, - "timestamp": datetime.now().isoformat() - } + synopsis = chunk.metadata.get("synopsis", "No summary") + tags = chunk.metadata.get("tags", []) + entities = chunk.metadata.get("entities", []) - embedded_chunks.append(chunk_data) + # Create structured dictionary + chunk_data = { + "file_path": file_path, + "file_name": file_name, + "chunk_data": content, + "synopsis": synopsis, + "tags": tags, + "entities": entities, + "embedding": embedding, + "timestamp": RIGHT_NOW, + "original_index": i + j # Track original position + } + + embedded_chunks.append(chunk_data) except Exception as e: - print(f"⚠️ Failed to embed chunk {i}: {e}") - # Fallback entry - embedded_chunks.append({ - "file_path": chunk.metadata.get("full_path", "unknown"), - "file_name": chunk.metadata.get("source", "unknown"), - "chunk_data": content, - "synopsis": "Embedding failed", - "tags": ["error"], - "entities": [], - "embedding": [], - "timestamp": datetime.now().isoformat() - }) + print(f"⚠️ Batch processing failed at index {i}: {e}") + # Fallback: process individually (if needed) + for j, chunk in enumerate(batch): + try: + content = chunk.page_content + embedding = embeddings_model.embed_query(content) + file_path_orig = chunk.metadata.get("full_path", "unknown") + file_path = normalize_path(file_path_orig) + file_name = chunk.metadata.get("source", "unknown") + synopsis = chunk.metadata.get("synopsis", "No summary") + tags = chunk.metadata.get("tags", []) + entities = chunk.metadata.get("entities", []) + + chunk_data = { + "file_path": file_path, + "file_name": file_name, + "chunk_data": content, + "synopsis": synopsis, + "tags": tags, + "entities": entities, + 
"embedding": embedding, + "timestamp": RIGHT_NOW, + "original_index": i + j + } + embedded_chunks.append(chunk_data) + + except Exception as inner_e: + print(f"❌ Failed to embed individual chunk {i + j}: {inner_e}") + embedded_chunks.append({ + "file_path": normalize_path(chunk.metadata.get("full_path", "unknown")), + "file_name": chunk.metadata.get("source", "unknown"), + "chunk_data": content, + "synopsis": "Embedding failed", + "tags": ["error"], + "entities": [], + "embedding": [], + "timestamp": RIGHT_NOW, + "original_index": i + j + }) + + print(f"✅ Completed embedding {len(embedded_chunks)} chunks in batches of {batch_size}.") return embedded_chunks - def save_to_db(chunk_dicts): """ Save a list of dictionaries to the Turso database. @@ -218,10 +268,10 @@ def create_db(): tags TEXT, -- comma-separated entities TEXT, -- comma-separated embedding F32_BLOB(4096), - timestamp DATETIME DEFAULT CURRENT_TIMESTAMP, - UNIQUE(file_path, chunk_data) -- avoid duplicates + timestamp DATETIME DEFAULT CURRENT_TIMESTAMP ) """) + # UNIQUE(file_path, chunk_data) -- avoid duplicates # Indexes for faster queries cur.execute("CREATE INDEX IF NOT EXISTS idx_embedding ON notes(embedding);") @@ -250,6 +300,43 @@ def update_timefile(): file.write(current_time_str) return current_time_str +def delete_from_db(embedded_chunks): + """ + Delete existing rows from the 'notes' table where the file_path matches + any file_path in the provided embedded_chunks list. + This ensures we don't violate the UNIQUE constraint when inserting new chunks. + """ + if not embedded_chunks: + print("No chunks to process; skipping deletion.") + return + + # Extract unique file_paths from the chunks + file_paths = set(normalize_path(entry["file_path"]) for entry in embedded_chunks) + + if not file_paths: + print("No file paths found in embedded_chunks; skipping deletion.") + return + + print(f"Deleting existing rows for {len(file_paths)} file(s)") + + con = turso.connect(DATABASE_PATH) + cur = con.cursor() + + # Use a single DELETE statement with IN clause for efficiency + placeholders = ", ".join("?" 
+    placeholders = ", ".join("?" for _ in file_paths)
+    delete_sql = f"DELETE FROM notes WHERE file_path IN ({placeholders})"
+
+    try:
+        cur.execute(delete_sql, list(file_paths))
+        deleted_count = cur.rowcount
+        con.commit()  # persist the deletes before the connection closes
+        print(f"✅ Deleted {deleted_count} rows matching file paths.")
+    except Exception as e:
+        print(f"❌ Error deleting from database: {e}")
+        con.rollback()
+    finally:
+        con.close()
+
+
 def main():
     create_db()
@@ -270,6 +357,9 @@ def main():
     embedded_chunks = embed_chunks(enriched_chunks)
     print(f"Embedded {len(embedded_chunks)} chunks.")
 
+    # Remove existing rows from the notes table that match incoming file paths
+    delete_from_db(embedded_chunks)
+
     save_to_db(embedded_chunks)
 
     print("🎉 Ingestion complete!")
@@ -277,31 +367,4 @@ def main():
     print(f"Updated timefile to: {updated}")
 
 if __name__ == "__main__":
-    main()
-
-#TODO: create function to delete rows that match new files coming in
-# and handle danabase insert fails
-"""
-Traceback (most recent call last):
-  File "/home/cosmic/source/dungeon_masters_vault/.venv/lib/python3.13/site-packages/turso/lib.py", line 643, in executemany
-    result = _run_execute_with_io(stmt, self._connection.extra_io)
-  File "/home/cosmic/source/dungeon_masters_vault/.venv/lib/python3.13/site-packages/turso/lib.py", line 163, in _run_execute_with_io
-    result = stmt.execute()
-turso.Constraint: UNIQUE constraint failed: notes.(file_path, chunk_data) (19)
-
-During handling of the above exception, another exception occurred:
-
-Traceback (most recent call last):
-  File "/home/cosmic/source/dungeon_masters_vault/src/ingest.py", line 280, in <module>
-    main()
-    ~~~~^^
-  File "/home/cosmic/source/dungeon_masters_vault/src/ingest.py", line 273, in main
-    save_to_db(embedded_chunks)
-    ~~~~~~~~~~^^^^^^^^^^^^^^^^^
-  File "/home/cosmic/source/dungeon_masters_vault/src/ingest.py", line 201, in save_to_db
-    cur.executemany(insert_sql, batch_data)
-    ~~~~~~~~~~~~~~~^^^^^^^^^^^^^^^^^^^^^^^^
-  File "/home/cosmic/source/dungeon_masters_vault/.venv/lib/python3.13/site-packages/turso/lib.py", line 656, in executemany
-    raise _map_turso_exception(exc)
-turso.lib.IntegrityError: UNIQUE constraint failed: notes.(file_path, chunk_data) (19)
-"""
\ No newline at end of file
+    main()
\ No newline at end of file
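
# --- Follow-up sketch: client-side similarity ranking for retrieve_from_turso ---
# Stock SQLite has no similarity() function, so the LIKE query only filters
# candidates; ranking has to happen in Python. A minimal sketch, assuming the
# embedding column round-trips as a raw float32 blob and reusing the
# LocalLMEmbeddings client and the EMBEDDING_MODEL / API_BASE constants from
# dnd_agent.py. rank_by_similarity is a hypothetical helper, not in the diff.
import numpy as np

from embedding import LocalLMEmbeddings

def rank_by_similarity(question, rows, k=5):
    """Re-rank rows from retrieve_from_turso() by cosine similarity."""
    model = LocalLMEmbeddings(model=EMBEDDING_MODEL, base_url=API_BASE)
    q = np.asarray(model.embed_query(question), dtype=np.float32)
    scored = []
    for row in rows:
        # row[5] is the embedding column; assumed to arrive as float32 bytes
        v = np.frombuffer(row[5], dtype=np.float32)
        score = float(np.dot(q, v) / (np.linalg.norm(q) * np.linalg.norm(v) + 1e-9))
        scored.append((score, row))
    scored.sort(key=lambda pair: pair[0], reverse=True)
    return [row for _, row in scored[:k]]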
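
# --- Follow-up sketch: server-side vector search ---
# The schema already declares embedding F32_BLOB(4096), libSQL's vector column
# type, so ranking could move into SQL via libSQL's vector32() and
# vector_distance_cos() functions. Whether the turso Python client exposes
# them is an assumption here; verify before relying on this.
import json

import turso

def retrieve_by_vector(question_embedding, k=5):
    """Return the k nearest chunks by cosine distance (smallest first)."""
    con = turso.connect(DATABASE_PATH)  # DATABASE_PATH as in dnd_agent.py
    cur = con.cursor()
    rows = cur.execute(
        """
        SELECT file_name, chunk_data
        FROM notes
        ORDER BY vector_distance_cos(embedding, vector32(?)) ASC
        LIMIT ?
        """,
        (json.dumps(question_embedding), k),
    ).fetchall()
    con.close()
    return rows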
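
# --- Follow-up sketch: upsert instead of delete-then-insert ---
# The deleted traceback shows the UNIQUE(file_path, chunk_data) failure that
# motivated delete_from_db(). On databases that still carry that constraint,
# save_to_db() could upsert instead (SQLite >= 3.24). This statement is a
# hypothetical stand-in for the insert_sql the diff doesn't show.
UPSERT_SQL = """
    INSERT INTO notes
        (file_path, file_name, chunk_data, synopsis, tags, entities, embedding, timestamp)
    VALUES (?, ?, ?, ?, ?, ?, ?, ?)
    ON CONFLICT(file_path, chunk_data) DO UPDATE SET
        synopsis = excluded.synopsis,
        tags = excluded.tags,
        entities = excluded.entities,
        embedding = excluded.embedding,
        timestamp = excluded.timestamp
"""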