"""Ingestion pipeline: load markdown notes, chunk them, enrich them with an LLM, embed them, and store the results in a Turso database."""

from concurrent.futures import ThreadPoolExecutor, as_completed
from datetime import datetime
from pathlib import Path
from typing import Any, Dict, List

import dspy
import turso
from langchain_community.document_loaders import TextLoader
from langchain_text_splitters import RecursiveCharacterTextSplitter
from tqdm import tqdm

from config_loader import load_config
from embedding import LocalLMEmbeddings
from experts.ingestion_agent import IngestionAgent
from toon_utils import save_entities_from_chunks

CFG = load_config()

DATA_DIR = CFG["ingestion"]["data_dir"]
DATABASE_PATH = CFG["ingestion"]["db_path"]
DATABASE_NAME = CFG["ingestion"]["db_name"]
MODEL_BASE = CFG["models"]["enrich"]
EMBEDDING_MODEL = CFG["models"]["embedding"]
API_BASE = CFG["api"]["base_url"]
API_VERSION = CFG["api"]["api_version"]
# MAX_WORKERS = CFG["ingestion"]["max_workers"]
ACTIVE_LLMS = CFG["ingestion"]["active_llms"]
PARALLEL_REQUESTS_PER_LLM = CFG["ingestion"]["parallel_requests_per_llm"]
CHUNK_SIZE = CFG["ingestion"]["chunk_size"]
CHUNK_OVERLAP = CFG["ingestion"]["chunk_overlap"]
EMBEDDING_BATCH_SIZE = CFG["ingestion"]["embedding_batch_size"]
TIMEFILE = CFG["ingestion"]["time_file_location"]

RIGHT_NOW = datetime.now().isoformat()


def load_documents(last_update_time):
    """Load markdown files under DATA_DIR that were modified after last_update_time."""
    docs = []
    data_path = Path(DATA_DIR)
    if not data_path.exists() or not data_path.is_dir():
        print(f"⚠️ Data directory '{DATA_DIR}' does not exist.")
        return docs
    for file_path in data_path.rglob("*.md"):
        file_modified_date = datetime.fromtimestamp(file_path.stat().st_mtime)
        if file_modified_date < last_update_time:
            continue
        try:
            loader = TextLoader(str(file_path))
            loaded_docs = loader.load()
            for doc in loaded_docs:
                # Ensure these keys are set before splitting
                doc.metadata["source"] = file_path.name
                doc.metadata["full_path"] = str(file_path.absolute())
            docs.extend(loaded_docs)
            # print(f"✅ Loaded: {file_path.name}")
        except Exception as e:
            print(f"❌ Failed to load {file_path}: {e}")
    print(f"✅ Loaded: {len(docs)} Files")
    return docs


def normalize_path(path_str):
    """Convert string path to normalized absolute path."""
    return str(Path(path_str).resolve())


def chunk_documents(docs):
    """Split documents into overlapping chunks; LangChain preserves metadata during splitting."""
    text_splitter = RecursiveCharacterTextSplitter(
        chunk_size=CHUNK_SIZE,
        chunk_overlap=CHUNK_OVERLAP,
        separators=["\n\n", "\n", ". ", " ", ""],
    )
    return text_splitter.split_documents(docs)


def enrich_chunks(chunks: list) -> list:
    """Add synopsis/tags/entities metadata to each chunk, round-robining requests across the active LLM endpoints."""

    def process_single_chunk(indexed_chunk):
        idx, chunk = indexed_chunk
        lm_index = idx % ACTIVE_LLMS
        try:
            with dspy.context(
                lm=dspy.LM(model=f"{MODEL_BASE}{lm_index}", api_base=API_BASE + API_VERSION),
                chat_template_kwargs={"enable_thinking": False},
            ):
                response = IngestionAgent().ingest(note=chunk.page_content)
                # This is now an object, not a string!
                metadata = response.answer
        except Exception as e:
            print(f"⚠️ Failed for chunk {idx}: {e}")
            metadata = {"synopsis": "Summary failed", "tags": ["error"], "entities": []}
        chunk.metadata.update(metadata)
        return (idx, chunk)

    enriched_results = []
    with ThreadPoolExecutor(max_workers=PARALLEL_REQUESTS_PER_LLM * ACTIVE_LLMS) as executor:
        # Wrap chunks in enumerate to keep track of order
        futures = [executor.submit(process_single_chunk, (i, c)) for i, c in enumerate(chunks)]
        for future in tqdm(as_completed(futures), total=len(chunks), desc="Enriching chunks"):
            enriched_results.append(future.result())

    # Sort by the index (first element of the tuple) and return only the chunks
    enriched_results.sort(key=lambda x: x[0])
    return [item[1] for item in enriched_results]


def embed_chunks(chunks: List[Any], batch_size: int = EMBEDDING_BATCH_SIZE) -> List[Dict[str, Any]]:
    """
    Embed chunks in batches using the model's batch processing capability.
    Each batch is embedded in a single call to the model.

    Args:
        chunks: List of document chunks (e.g., LangChain Document objects)
        batch_size: Number of chunks to embed per batch (defaults to EMBEDDING_BATCH_SIZE from config)

    Returns:
        List of dictionaries with full metadata and embeddings.
    """
    # Initialize embedding model with batch support
    embeddings_model = LocalLMEmbeddings(
        model=EMBEDDING_MODEL,
        base_url=API_BASE,
        batch_size=batch_size,  # Let the model handle batching internally
    )

    embedded_chunks = []
    total_chunks = len(chunks)

    print(f"🚀 Starting batch embedding of {total_chunks} chunks in batches of {batch_size}...")

    # Process chunks in batches
    for i in tqdm(range(0, total_chunks, batch_size), desc="Embedding batches"):
        batch = chunks[i : i + batch_size]
        print(f"🚀 Processing batch {(i // batch_size) + 1} (Size: {len(batch)})...")
        batch_content = [chunk.page_content for chunk in batch]

        try:
            batch_embeddings = embeddings_model.embed_documents(batch_content)

            # Process each chunk in the batch
            for j, (chunk, embedding) in enumerate(zip(batch, batch_embeddings)):
                # Extract metadata
                file_path_orig = chunk.metadata.get("full_path", "unknown")
                file_path = normalize_path(file_path_orig)
                file_name = chunk.metadata.get("source", "unknown")
                content = chunk.page_content
                synopsis = chunk.metadata.get("synopsis", "No summary")
                tags = chunk.metadata.get("tags", [])
                entities = chunk.metadata.get("entities", [])

                # Create structured dictionary
                chunk_data = {
                    "file_path": file_path,
                    "file_name": file_name,
                    "chunk_data": content,
                    "synopsis": synopsis,
                    "tags": tags,
                    "entities": entities,
                    "embedding": embedding,
                    "timestamp": RIGHT_NOW,
                    "original_index": i + j,  # Track original position
                }
                embedded_chunks.append(chunk_data)

        except Exception as e:
            print(f"⚠️ Batch processing failed at index {i}: {e}")
            # Fallback: embed each chunk in the failed batch individually
            for j, chunk in enumerate(batch):
                try:
                    content = chunk.page_content
                    embedding = embeddings_model.embed_query(content)
                    file_path_orig = chunk.metadata.get("full_path", "unknown")
                    file_path = normalize_path(file_path_orig)
                    file_name = chunk.metadata.get("source", "unknown")
                    synopsis = chunk.metadata.get("synopsis", "No summary")
                    tags = chunk.metadata.get("tags", [])
                    entities = chunk.metadata.get("entities", [])
                    chunk_data = {
                        "file_path": file_path,
                        "file_name": file_name,
                        "chunk_data": content,
                        "synopsis": synopsis,
                        "tags": tags,
                        "entities": entities,
                        "embedding": embedding,
                        "timestamp": RIGHT_NOW,
                        "original_index": i + j,
                    }
                    embedded_chunks.append(chunk_data)
                except Exception as inner_e:
                    print(f"❌ Failed to embed individual chunk {i + j}: {inner_e}")
                    embedded_chunks.append(
                        {
                            "file_path": normalize_path(chunk.metadata.get("full_path", "unknown")),
                            "file_name": chunk.metadata.get("source", "unknown"),
                            "chunk_data": chunk.page_content,
                            "synopsis": "Embedding failed",
                            "tags": ["error"],
                            "entities": [],
                            "embedding": [],
                            "timestamp": RIGHT_NOW,
                            "original_index": i + j,
                        }
                    )

    print(f"✅ Completed embedding {len(embedded_chunks)} chunks in batches of {batch_size}.")
    return embedded_chunks


def save_to_db(chunk_dicts):
    """
    Save a list of dictionaries to the Turso database.
    Each dict maps to a row in the 'notes' table.
    """
    print("connecting to db")
    con = turso.connect(DATABASE_PATH + DATABASE_NAME)
    print("opening cursor")
    cur = con.cursor()

    # Parameterized SQL with positional placeholders; vector32() parses the embedding string
    insert_sql = """
        INSERT INTO notes (
            file_path, file_name, chunk_data, synopsis, tags, entities, embedding, timestamp
        ) VALUES (?, ?, ?, ?, ?, ?, vector32(?), ?)
    """

    # Prepare batch data: convert each dict to a tuple in the correct column order
    batch_data = []
    for entry in chunk_dicts:
        # str() of the float list yields a bracketed "[v1, v2, ...]" string, which vector32() accepts
        embedding_str = str(entry["embedding"])
        batch_data.append(
            (
                entry["file_path"],
                entry["file_name"],
                entry["chunk_data"],
                entry["synopsis"],
                ",".join(entry["tags"]),  # Store as comma-separated string
                ",".join(
                    e.get("name", str(e)) if isinstance(e, dict) else str(e)
                    for e in entry["entities"]
                ),  # Store as comma-separated string
                embedding_str,
                entry["timestamp"],
            )
        )
    print("data to insert:", len(batch_data))

    # Execute batch insert
    cur.executemany(insert_sql, batch_data)
    con.commit()
    con.close()
    print(f"✅ Saved {len(batch_data)} chunks to database.")


def create_db():
    """Create the database directory, the 'notes' table, and supporting indexes if they do not exist."""
    Path(DATABASE_PATH).mkdir(exist_ok=True)
    con = turso.connect(DATABASE_PATH + DATABASE_NAME)
    cur = con.cursor()
    cur.execute("""
        CREATE TABLE IF NOT EXISTS notes (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            file_path TEXT NOT NULL,
            file_name TEXT NOT NULL,
            chunk_data TEXT NOT NULL,
            synopsis TEXT,
            tags TEXT,      -- comma-separated
            entities TEXT,  -- comma-separated
            embedding F32_BLOB(4096),
            timestamp DATETIME DEFAULT CURRENT_TIMESTAMP
        )
    """)
    # UNIQUE(file_path, chunk_data) -- avoid duplicates

    # Indexes for faster queries
    cur.execute("CREATE INDEX IF NOT EXISTS idx_embedding ON notes(embedding);")
    cur.execute("CREATE INDEX IF NOT EXISTS idx_file_path ON notes(file_path);")
    cur.execute("CREATE INDEX IF NOT EXISTS idx_tags ON notes(tags);")
    cur.execute("CREATE INDEX IF NOT EXISTS idx_synopsis ON notes(synopsis);")

    con.commit()
    con.close()
    print("✅ Database and indexes created.")


def get_last_update_time():
    """Read the last ingestion time from TIMEFILE; fall back to 2000-01-01 so all files are ingested."""
    try:
        with open(TIMEFILE, "r") as file:
            last_update_str = file.read()
        last_update = datetime.strptime(last_update_str, "%Y/%m/%d - %H:%M:%S")
    except FileNotFoundError:
        print("Timefile not found, setting time to ingest all files")
        last_update = datetime(year=2000, month=1, day=1)
    return last_update


def update_timefile():
    """Write the current time to TIMEFILE and return it as a formatted string."""
    current_time = datetime.now()
    current_time_str = current_time.strftime("%Y/%m/%d - %H:%M:%S")
    with open(TIMEFILE, "w") as file:
        file.write(current_time_str)
    return current_time_str


def delete_from_db(embedded_chunks):
    """
    Delete existing rows from the 'notes' table whose file_path matches
    any file_path in the provided embedded_chunks list, so re-ingested
    files do not produce duplicate rows.
""" if not embedded_chunks: print("No chunks to process; skipping deletion.") return # Extract unique file_paths from the chunks file_paths = set(normalize_path(entry["file_path"]) for entry in embedded_chunks) if not file_paths: print("No file paths found in embedded_chunks; skipping deletion.") return print(f"Deleting existing rows for {len(file_paths)} file(s)") con = turso.connect(DATABASE_PATH + DATABASE_NAME) cur = con.cursor() # Use a single DELETE statement with IN clause for efficiency placeholders = ", ".join("?" for _ in file_paths) delete_sql = f"DELETE FROM notes WHERE file_path IN ({placeholders})" try: cur.execute(delete_sql, list(file_paths)) deleted_count = cur.rowcount print(f"✅ Deleted {deleted_count} rows matching file paths.") except Exception as e: print(f"❌ Error deleting from database: {e}") con.rollback() finally: con.close() def main(): create_db() last_update_time = get_last_update_time() print(f"Last update time: {last_update_time}") docs = load_documents(last_update_time) if not docs: print("No Recently Updated Files to Ingest") return chunks = chunk_documents(docs) print(f"Split into {len(chunks)} chunks.") enriched_chunks = enrich_chunks(chunks) print(f"Enriched {len(enriched_chunks)} chunks.") embedded_chunks = embed_chunks(enriched_chunks) print(f"Embedded {len(embedded_chunks)} chunks.") save_entities_from_chunks(embedded_chunks) # remove existing rows from notes table that match file path delete_from_db(embedded_chunks) save_to_db(embedded_chunks) print("🎉 Ingestion complete!") updated = update_timefile() print(f"Updated timefile to: {updated}") if __name__ == "__main__": main()