This commit is contained in:
2026-02-11 07:32:38 +00:00
parent 8d0a74e865
commit 09508931af
2 changed files with 54 additions and 44 deletions
+1 -1
View File
@@ -7,7 +7,7 @@ api:
models: models:
enrich: "lm_studio/qwen/qwen3-8b" enrich: "lm_studio/qwen/qwen3-8b"
embedding: "text-embedding-qwen3-embedding-8b" embedding: "text-embedding-qwen3-embedding-8b"
retrieval: "lm_studio/qwen/qwen3-next-80b" retrieval: "lm_studio/qwen/qwen3-30b-a3b-2507"
# --- Ingestion Settings --- # --- Ingestion Settings ---
ingestion: ingestion:
+53 -43
View File
@@ -4,12 +4,10 @@ from pathlib import Path
from datetime import datetime from datetime import datetime
import dspy import dspy
from langchain_community.document_loaders import TextLoader from langchain_community.document_loaders import TextLoader
from langchain_community.vectorstores import FAISS
from langchain_text_splitters import RecursiveCharacterTextSplitter from langchain_text_splitters import RecursiveCharacterTextSplitter
from tqdm import tqdm from tqdm import tqdm
from typing import List, Dict, Any from typing import List, Dict, Any
from config_loader import load_config from config_loader import load_config
from embedding import LocalLMEmbeddings from embedding import LocalLMEmbeddings
from experts.ingestion_agent import IngestionAgent from experts.ingestion_agent import IngestionAgent
@@ -21,14 +19,15 @@ MODEL_BASE = CFG["models"]["enrich"]
EMBEDDING_MODEL = CFG["models"]["embedding"] EMBEDDING_MODEL = CFG["models"]["embedding"]
API_BASE = CFG["api"]["base_url"] API_BASE = CFG["api"]["base_url"]
API_VERSION = CFG["api"]["api_version"] API_VERSION = CFG["api"]["api_version"]
MAX_WORKERS=CFG["ingestion"]["max_workers"] MAX_WORKERS = CFG["ingestion"]["max_workers"]
CHUNK_SIZE=CFG["ingestion"]["chunk_size"] CHUNK_SIZE = CFG["ingestion"]["chunk_size"]
CHUNK_OVERLAP=CFG["ingestion"]["chunk_overlap"] CHUNK_OVERLAP = CFG["ingestion"]["chunk_overlap"]
EMBEDDING_BATCH_SIZE=CFG["ingestion"]["embedding_batch_size"] EMBEDDING_BATCH_SIZE = CFG["ingestion"]["embedding_batch_size"]
TIMEFILE = CFG["ingestion"]["time_file_location"] TIMEFILE = CFG["ingestion"]["time_file_location"]
RIGHT_NOW = datetime.now().isoformat() RIGHT_NOW = datetime.now().isoformat()
def load_documents(last_update_time): def load_documents(last_update_time):
docs = [] docs = []
data_path = Path(DATA_DIR) data_path = Path(DATA_DIR)
@@ -59,10 +58,12 @@ def load_documents(last_update_time):
print(f"✅ Loaded: {len(docs)} Files") print(f"✅ Loaded: {len(docs)} Files")
return docs return docs
def normalize_path(path_str): def normalize_path(path_str):
"""Convert string path to normalized absolute path.""" """Convert string path to normalized absolute path."""
return str(Path(path_str).resolve()) return str(Path(path_str).resolve())
def chunk_documents(docs): def chunk_documents(docs):
# LangChain preserves metadata during splitting automatically # LangChain preserves metadata during splitting automatically
text_splitter = RecursiveCharacterTextSplitter( text_splitter = RecursiveCharacterTextSplitter(
@@ -77,11 +78,11 @@ def enrich_chunks(chunks: list) -> list:
lm_index = idx % 8 lm_index = idx % 8
try: try:
with dspy.context(lm=dspy.LM(model=MODEL_BASE, api_base=API_BASE+API_VERSION)): with dspy.context(lm=dspy.LM(model=MODEL_BASE, api_base=API_BASE + API_VERSION)):
response = IngestionAgent().ingest(note=chunk.page_content) response = IngestionAgent().ingest(note=chunk.page_content)
# This is now an object, not a string! # This is now an object, not a string!
metadata = response.answer metadata = response.answer
except Exception as e: except Exception as e:
print(f"⚠️ Failed for chunk {idx}: {e}") print(f"⚠️ Failed for chunk {idx}: {e}")
@@ -89,7 +90,6 @@ def enrich_chunks(chunks: list) -> list:
chunk.metadata.update(metadata) chunk.metadata.update(metadata)
return (idx, chunk) return (idx, chunk)
enriched_results = [] enriched_results = []
with ThreadPoolExecutor(max_workers=MAX_WORKERS) as executor: with ThreadPoolExecutor(max_workers=MAX_WORKERS) as executor:
@@ -132,7 +132,7 @@ def embed_chunks(chunks: List[Any], batch_size: int = EMBEDDING_BATCH_SIZE) -> L
# Process chunks in batches # Process chunks in batches
for i in tqdm(range(0, total_chunks, batch_size), desc="Embedding batches"): for i in tqdm(range(0, total_chunks, batch_size), desc="Embedding batches"):
batch = chunks[i:i + batch_size] batch = chunks[i : i + batch_size]
batch_content = [chunk.page_content for chunk in batch] batch_content = [chunk.page_content for chunk in batch]
try: try:
@@ -163,7 +163,7 @@ def embed_chunks(chunks: List[Any], batch_size: int = EMBEDDING_BATCH_SIZE) -> L
"entities": entities, "entities": entities,
"embedding": embedding, "embedding": embedding,
"timestamp": RIGHT_NOW, "timestamp": RIGHT_NOW,
"original_index": i + j # Track original position "original_index": i + j, # Track original position
} }
embedded_chunks.append(chunk_data) embedded_chunks.append(chunk_data)
@@ -192,35 +192,38 @@ def embed_chunks(chunks: List[Any], batch_size: int = EMBEDDING_BATCH_SIZE) -> L
"entities": entities, "entities": entities,
"embedding": embedding, "embedding": embedding,
"timestamp": RIGHT_NOW, "timestamp": RIGHT_NOW,
"original_index": i + j "original_index": i + j,
} }
embedded_chunks.append(chunk_data) embedded_chunks.append(chunk_data)
except Exception as inner_e: except Exception as inner_e:
print(f"❌ Failed to embed individual chunk {i + j}: {inner_e}") print(f"❌ Failed to embed individual chunk {i + j}: {inner_e}")
embedded_chunks.append({ embedded_chunks.append(
"file_path": normalize_path(chunk.metadata.get("full_path", "unknown")), {
"file_name": chunk.metadata.get("source", "unknown"), "file_path": normalize_path(chunk.metadata.get("full_path", "unknown")),
"chunk_data": content, "file_name": chunk.metadata.get("source", "unknown"),
"synopsis": "Embedding failed", "chunk_data": content,
"tags": ["error"], "synopsis": "Embedding failed",
"entities": [], "tags": ["error"],
"embedding": [], "entities": [],
"timestamp": RIGHT_NOW, "embedding": [],
"original_index": i + j "timestamp": RIGHT_NOW,
}) "original_index": i + j,
}
)
print(f"✅ Completed embedding {len(embedded_chunks)} chunks in batches of {batch_size}.") print(f"✅ Completed embedding {len(embedded_chunks)} chunks in batches of {batch_size}.")
return embedded_chunks return embedded_chunks
def save_to_db(chunk_dicts): def save_to_db(chunk_dicts):
""" """
Save a list of dictionaries to the Turso database. Save a list of dictionaries to the Turso database.
Each dict maps to a row in the 'notes' table. Each dict maps to a row in the 'notes' table.
""" """
print('connecting to db') print("connecting to db")
con = turso.connect(DATABASE_PATH) con = turso.connect(DATABASE_PATH)
print('opening cursor') print("opening cursor")
cur = con.cursor() cur = con.cursor()
# SQL with named placeholders for clarity and safety # SQL with named placeholders for clarity and safety
@@ -236,17 +239,19 @@ def save_to_db(chunk_dicts):
# Convert list of floats to comma-separated string for Turso vector32 # Convert list of floats to comma-separated string for Turso vector32
embedding_str = str(entry["embedding"]) embedding_str = str(entry["embedding"])
batch_data.append(( batch_data.append(
entry["file_path"], (
entry["file_name"], entry["file_path"],
entry["chunk_data"], entry["file_name"],
entry["synopsis"], entry["chunk_data"],
",".join(entry["tags"]), # Store as comma-separated string entry["synopsis"],
",".join(entry["entities"]), # Store as comma-separated string ",".join(entry["tags"]), # Store as comma-separated string
embedding_str, ",".join(entry["entities"]), # Store as comma-separated string
entry["timestamp"] embedding_str,
)) entry["timestamp"],
print('data to insert:',len(batch_data)) )
)
print("data to insert:", len(batch_data))
# Execute batch insert # Execute batch insert
cur.executemany(insert_sql, batch_data) cur.executemany(insert_sql, batch_data)
con.commit() con.commit()
@@ -254,6 +259,7 @@ def save_to_db(chunk_dicts):
print(f"✅ Saved {len(batch_data)} chunks to database.") print(f"✅ Saved {len(batch_data)} chunks to database.")
def create_db(): def create_db():
con = turso.connect(DATABASE_PATH) con = turso.connect(DATABASE_PATH)
cur = con.cursor() cur = con.cursor()
@@ -271,7 +277,7 @@ def create_db():
timestamp DATETIME DEFAULT CURRENT_TIMESTAMP timestamp DATETIME DEFAULT CURRENT_TIMESTAMP
) )
""") """)
# UNIQUE(file_path, chunk_data) -- avoid duplicates # UNIQUE(file_path, chunk_data) -- avoid duplicates
# Indexes for faster queries # Indexes for faster queries
cur.execute("CREATE INDEX IF NOT EXISTS idx_embedding ON notes(embedding);") cur.execute("CREATE INDEX IF NOT EXISTS idx_embedding ON notes(embedding);")
@@ -283,16 +289,18 @@ def create_db():
con.close() con.close()
print("✅ Database and indexes created.") print("✅ Database and indexes created.")
def get_last_update_time(): def get_last_update_time():
try: try:
with open(TIMEFILE, "r") as file: with open(TIMEFILE, "r") as file:
last_update_str = file.read() last_update_str = file.read()
last_update = datetime.strptime(last_update_str,"%Y/%m/%d - %H:%M:%S") last_update = datetime.strptime(last_update_str, "%Y/%m/%d - %H:%M:%S")
except FileNotFoundError: except FileNotFoundError:
print("File Not found, setting time to ingest all files") print("File Not found, setting time to ingest all files")
last_update = datetime(year=2000,month=1,day=1) last_update = datetime(year=2000, month=1, day=1)
return last_update return last_update
def update_timefile(): def update_timefile():
current_time = datetime.now() current_time = datetime.now()
current_time_str = current_time.strftime("%Y/%m/%d - %H:%M:%S") current_time_str = current_time.strftime("%Y/%m/%d - %H:%M:%S")
@@ -300,6 +308,7 @@ def update_timefile():
file.write(current_time_str) file.write(current_time_str)
return current_time_str return current_time_str
def delete_from_db(embedded_chunks): def delete_from_db(embedded_chunks):
""" """
Delete existing rows from the 'notes' table where the file_path matches Delete existing rows from the 'notes' table where the file_path matches
@@ -337,7 +346,6 @@ def delete_from_db(embedded_chunks):
con.close() con.close()
def main(): def main():
create_db() create_db()
last_update_time = get_last_update_time() last_update_time = get_last_update_time()
@@ -366,5 +374,7 @@ def main():
updated = update_timefile() updated = update_timefile()
print(f"Updated timefile to: {updated}") print(f"Updated timefile to: {updated}")
if __name__ == "__main__": if __name__ == "__main__":
main() main()