diff --git a/src/config.yaml b/src/config.yaml
index af23feb..377153a 100644
--- a/src/config.yaml
+++ b/src/config.yaml
@@ -7,7 +7,7 @@ api:
   models:
     enrich: "lm_studio/qwen/qwen3-8b"
     embedding: "text-embedding-qwen3-embedding-8b"
-    retrieval: "lm_studio/qwen/qwen3-next-80b"
+    retrieval: "lm_studio/qwen/qwen3-30b-a3b-2507"
 
 # --- Ingestion Settings ---
 ingestion:
diff --git a/src/ingest.py b/src/ingest.py
index 7cc81cd..9188013 100644
--- a/src/ingest.py
+++ b/src/ingest.py
@@ -4,12 +4,10 @@ from pathlib import Path
 from datetime import datetime
 import dspy
 from langchain_community.document_loaders import TextLoader
-from langchain_community.vectorstores import FAISS
 from langchain_text_splitters import RecursiveCharacterTextSplitter
 from tqdm import tqdm
 from typing import List, Dict, Any
-
 from config_loader import load_config
 from embedding import LocalLMEmbeddings
 from experts.ingestion_agent import IngestionAgent
@@ -21,14 +19,15 @@ MODEL_BASE = CFG["models"]["enrich"]
 EMBEDDING_MODEL = CFG["models"]["embedding"]
 API_BASE = CFG["api"]["base_url"]
 API_VERSION = CFG["api"]["api_version"]
-MAX_WORKERS=CFG["ingestion"]["max_workers"]
-CHUNK_SIZE=CFG["ingestion"]["chunk_size"]
-CHUNK_OVERLAP=CFG["ingestion"]["chunk_overlap"]
-EMBEDDING_BATCH_SIZE=CFG["ingestion"]["embedding_batch_size"]
+MAX_WORKERS = CFG["ingestion"]["max_workers"]
+CHUNK_SIZE = CFG["ingestion"]["chunk_size"]
+CHUNK_OVERLAP = CFG["ingestion"]["chunk_overlap"]
+EMBEDDING_BATCH_SIZE = CFG["ingestion"]["embedding_batch_size"]
 TIMEFILE = CFG["ingestion"]["time_file_location"]
 RIGHT_NOW = datetime.now().isoformat()
 
+
 def load_documents(last_update_time):
     docs = []
     data_path = Path(DATA_DIR)
@@ -59,10 +58,12 @@ def load_documents(last_update_time):
     print(f"✅ Loaded: {len(docs)} Files")
     return docs
 
+
 def normalize_path(path_str):
     """Convert string path to normalized absolute path."""
     return str(Path(path_str).resolve())
 
+
 def chunk_documents(docs):
     # LangChain preserves metadata during splitting automatically
     text_splitter = RecursiveCharacterTextSplitter(
@@ -77,11 +78,11 @@ def enrich_chunks(chunks: list) -> list:
         lm_index = idx % 8
 
         try:
-            with dspy.context(lm=dspy.LM(model=MODEL_BASE, api_base=API_BASE+API_VERSION)):
+            with dspy.context(lm=dspy.LM(model=MODEL_BASE, api_base=API_BASE + API_VERSION)):
                 response = IngestionAgent().ingest(note=chunk.page_content)
-            
+
             # This is now an object, not a string!
-            metadata = response.answer 
+            metadata = response.answer
         except Exception as e:
             print(f"⚠️ Failed for chunk {idx}: {e}")
 
@@ -89,7 +90,6 @@ def enrich_chunks(chunks: list) -> list:
         chunk.metadata.update(metadata)
         return (idx, chunk)
-
     enriched_results = []
 
     with ThreadPoolExecutor(max_workers=MAX_WORKERS) as executor:
@@ -132,7 +132,7 @@ def embed_chunks(chunks: List[Any], batch_size: int = EMBEDDING_BATCH_SIZE) -> L
 
     # Process chunks in batches
     for i in tqdm(range(0, total_chunks, batch_size), desc="Embedding batches"):
-        batch = chunks[i:i + batch_size]
+        batch = chunks[i : i + batch_size]
         batch_content = [chunk.page_content for chunk in batch]
 
         try:
@@ -163,7 +163,7 @@ def embed_chunks(chunks: List[Any], batch_size: int = EMBEDDING_BATCH_SIZE) -> L
                     "entities": entities,
                     "embedding": embedding,
                     "timestamp": RIGHT_NOW,
-                    "original_index": i + j  # Track original position
+                    "original_index": i + j,  # Track original position
                 }
                 embedded_chunks.append(chunk_data)
 
@@ -192,35 +192,38 @@ def embed_chunks(chunks: List[Any], batch_size: int = EMBEDDING_BATCH_SIZE) -> L
                             "entities": entities,
                             "embedding": embedding,
                             "timestamp": RIGHT_NOW,
-                            "original_index": i + j
+                            "original_index": i + j,
                         }
                         embedded_chunks.append(chunk_data)
                     except Exception as inner_e:
                         print(f"❌ Failed to embed individual chunk {i + j}: {inner_e}")
-                        embedded_chunks.append({
-                            "file_path": normalize_path(chunk.metadata.get("full_path", "unknown")),
-                            "file_name": chunk.metadata.get("source", "unknown"),
-                            "chunk_data": content,
-                            "synopsis": "Embedding failed",
-                            "tags": ["error"],
-                            "entities": [],
-                            "embedding": [],
-                            "timestamp": RIGHT_NOW,
-                            "original_index": i + j
-                        })
+                        embedded_chunks.append(
+                            {
+                                "file_path": normalize_path(chunk.metadata.get("full_path", "unknown")),
+                                "file_name": chunk.metadata.get("source", "unknown"),
+                                "chunk_data": content,
+                                "synopsis": "Embedding failed",
+                                "tags": ["error"],
+                                "entities": [],
+                                "embedding": [],
+                                "timestamp": RIGHT_NOW,
+                                "original_index": i + j,
+                            }
+                        )
 
     print(f"✅ Completed embedding {len(embedded_chunks)} chunks in batches of {batch_size}.")
     return embedded_chunks
 
+
 def save_to_db(chunk_dicts):
     """
     Save a list of dictionaries to the Turso database.
     Each dict maps to a row in the 'notes' table.
     """
-    print('connecting to db')
+    print("connecting to db")
     con = turso.connect(DATABASE_PATH)
-    print('opening cursor')
+    print("opening cursor")
     cur = con.cursor()
 
     # SQL with named placeholders for clarity and safety
@@ -236,17 +239,19 @@ def save_to_db(chunk_dicts):
 
         # Convert list of floats to comma-separated string for Turso vector32
         embedding_str = str(entry["embedding"])
-        batch_data.append((
-            entry["file_path"],
-            entry["file_name"],
-            entry["chunk_data"],
-            entry["synopsis"],
-            ",".join(entry["tags"]),  # Store as comma-separated string
-            ",".join(entry["entities"]),  # Store as comma-separated string
-            embedding_str,
-            entry["timestamp"]
-        ))
-    print('data to insert:',len(batch_data))
+        batch_data.append(
+            (
+                entry["file_path"],
+                entry["file_name"],
+                entry["chunk_data"],
+                entry["synopsis"],
+                ",".join(entry["tags"]),  # Store as comma-separated string
+                ",".join(entry["entities"]),  # Store as comma-separated string
+                embedding_str,
+                entry["timestamp"],
+            )
+        )
+    print("data to insert:", len(batch_data))
     # Execute batch insert
     cur.executemany(insert_sql, batch_data)
     con.commit()
@@ -254,6 +259,7 @@ def save_to_db(chunk_dicts):
 
     print(f"✅ Saved {len(batch_data)} chunks to database.")
 
+
 def create_db():
     con = turso.connect(DATABASE_PATH)
     cur = con.cursor()
@@ -271,7 +277,7 @@ def create_db():
             timestamp DATETIME DEFAULT CURRENT_TIMESTAMP
         )
     """)
-    # UNIQUE(file_path, chunk_data) -- avoid duplicates 
+    # UNIQUE(file_path, chunk_data) -- avoid duplicates
 
     # Indexes for faster queries
     cur.execute("CREATE INDEX IF NOT EXISTS idx_embedding ON notes(embedding);")
@@ -283,16 +289,18 @@ def create_db():
     con.close()
     print("✅ Database and indexes created.")
 
+
 def get_last_update_time():
-    try: 
+    try:
         with open(TIMEFILE, "r") as file:
             last_update_str = file.read()
-        last_update = datetime.strptime(last_update_str,"%Y/%m/%d - %H:%M:%S")
+        last_update = datetime.strptime(last_update_str, "%Y/%m/%d - %H:%M:%S")
     except FileNotFoundError:
         print("File Not found, setting time to ingest all files")
-        last_update = datetime(year=2000,month=1,day=1)
+        last_update = datetime(year=2000, month=1, day=1)
     return last_update
 
+
 def update_timefile():
     current_time = datetime.now()
     current_time_str = current_time.strftime("%Y/%m/%d - %H:%M:%S")
@@ -300,6 +308,7 @@ def update_timefile():
         file.write(current_time_str)
     return current_time_str
 
+
 def delete_from_db(embedded_chunks):
     """
     Delete existing rows from the 'notes' table where the file_path matches
@@ -337,7 +346,6 @@ def delete_from_db(embedded_chunks):
 
     con.close()
 
-
 def main():
     create_db()
     last_update_time = get_last_update_time()
@@ -366,5 +374,7 @@ def main():
     updated = update_timefile()
     print(f"Updated timefile to: {updated}")
 
+
 if __name__ == "__main__":
-    main()
\ No newline at end of file
+    main()
+