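"""Ingestion pipeline: load Markdown notes modified since the last run, split them
into chunks, enrich each chunk with LLM-generated metadata (synopsis, tags,
entities), embed the chunks, and persist everything to a Turso database."""
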
from concurrent.futures import ThreadPoolExecutor, as_completed
from datetime import datetime
from pathlib import Path
from typing import Any, Dict, List

import dspy
import turso
from langchain_community.document_loaders import TextLoader
from langchain_text_splitters import RecursiveCharacterTextSplitter
from tqdm import tqdm

from config_loader import load_config
from embedding import LocalLMEmbeddings
from experts.ingestion_agent import IngestionAgent
from toon_utils import save_entities_from_chunks

CFG = load_config()
DATA_DIR = CFG["ingestion"]["data_dir"]
DATABASE_PATH = CFG["ingestion"]["db_path"]
DATABASE_NAME = CFG["ingestion"]["db_name"]
MODEL_BASE = CFG["models"]["enrich"]
EMBEDDING_MODEL = CFG["models"]["embedding"]
API_BASE = CFG["api"]["base_url"]
API_VERSION = CFG["api"]["api_version"]
# MAX_WORKERS = CFG["ingestion"]["max_workers"]
ACTIVE_LLMS = CFG["ingestion"]["active_llms"]
PARALLEL_REQUESTS_PER_LLM = CFG["ingestion"]["parallel_requests_per_llm"]
CHUNK_SIZE = CFG["ingestion"]["chunk_size"]
CHUNK_OVERLAP = CFG["ingestion"]["chunk_overlap"]
EMBEDDING_BATCH_SIZE = CFG["ingestion"]["embedding_batch_size"]
TIMEFILE = CFG["ingestion"]["time_file_location"]

RIGHT_NOW = datetime.now().isoformat()

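# Illustrative shape of the config consumed above (keys only; the concrete values
# and file location are whatever config_loader.load_config() provides):
#
#   ingestion:
#     data_dir, db_path, db_name, active_llms, parallel_requests_per_llm,
#     chunk_size, chunk_overlap, embedding_batch_size, time_file_location
#   models:
#     enrich, embedding
#   api:
#     base_url, api_version
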
def load_documents(last_update_time):
    """Load every Markdown file under DATA_DIR that was modified after last_update_time."""
    docs = []
    data_path = Path(DATA_DIR)

    if not data_path.exists() or not data_path.is_dir():
        print(f"⚠️ Data directory '{DATA_DIR}' does not exist.")
        return docs

    for file_path in data_path.rglob("*.md"):
        file_modified_date = datetime.fromtimestamp(file_path.stat().st_mtime)

        if file_modified_date < last_update_time:
            continue
        try:
            loader = TextLoader(str(file_path))
            loaded_docs = loader.load()

            for doc in loaded_docs:
                # Ensure these keys are set before splitting
                doc.metadata["source"] = file_path.name
                doc.metadata["full_path"] = str(file_path.absolute())

            docs.extend(loaded_docs)
            # print(f"✅ Loaded: {file_path.name}")
        except Exception as e:
            print(f"❌ Failed to load {file_path}: {e}")

    print(f"✅ Loaded {len(docs)} files")
    return docs


def normalize_path(path_str):
    """Convert string path to normalized absolute path."""
    return str(Path(path_str).resolve())


def chunk_documents(docs):
    """Split documents into overlapping chunks; LangChain preserves metadata during splitting."""
    text_splitter = RecursiveCharacterTextSplitter(
        chunk_size=CHUNK_SIZE,
        chunk_overlap=CHUNK_OVERLAP,
        separators=["\n\n", "\n", ". ", " ", ""],
    )
    return text_splitter.split_documents(docs)


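# Each resulting chunk is a LangChain Document; for illustration (the field
# values here are hypothetical), a chunk looks roughly like:
#   Document(
#       page_content="...chunk text...",
#       metadata={"source": "note.md", "full_path": "/abs/path/note.md"},
#   )
# enrich_chunks() below adds synopsis / tags / entities to that metadata.
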
def enrich_chunks(chunks: list) -> list:
    """Enrich each chunk's metadata (synopsis, tags, entities) via the IngestionAgent,
    fanning requests out across the configured LLM endpoints."""

    def process_single_chunk(indexed_chunk):
        idx, chunk = indexed_chunk
        # Round-robin the chunk across the active LLM endpoints
        lm_index = idx % ACTIVE_LLMS

        try:
            with dspy.context(
                lm=dspy.LM(model=f"{MODEL_BASE}{lm_index}", api_base=API_BASE + API_VERSION),
                chat_template_kwargs={"enable_thinking": False},
            ):
                response = IngestionAgent().ingest(note=chunk.page_content)

                # response.answer is structured metadata (a mapping with synopsis,
                # tags and entities), not a plain string
                metadata = response.answer

        except Exception as e:
            print(f"⚠️ Failed for chunk {idx}: {e}")
            metadata = {"synopsis": "Summary failed", "tags": ["error"], "entities": []}

        chunk.metadata.update(metadata)
        return (idx, chunk)

    enriched_results = []
    with ThreadPoolExecutor(max_workers=PARALLEL_REQUESTS_PER_LLM * ACTIVE_LLMS) as executor:
        # Wrap chunks in enumerate to keep track of order
        futures = [executor.submit(process_single_chunk, (i, c)) for i, c in enumerate(chunks)]

        for future in tqdm(as_completed(futures), total=len(chunks), desc="Enriching chunks"):
            enriched_results.append(future.result())

    # Sort by the index (first element of the tuple) and return only the chunks
    enriched_results.sort(key=lambda x: x[0])
    return [item[1] for item in enriched_results]


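# For reference, the IngestionAgent response is assumed (based on the fallback
# above and on how chunk.metadata.update() is used) to carry a mapping shaped
# roughly like:
#   response.answer == {
#       "synopsis": "one-paragraph summary of the chunk",
#       "tags": ["tag-a", "tag-b"],
#       "entities": [{"name": "Some Entity"}, ...],
#   }
# The entity dict shape is a guess; save_to_db() only relies on an optional "name" key.
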
def embed_chunks(chunks: List[Any], batch_size: int = EMBEDDING_BATCH_SIZE) -> List[Dict[str, Any]]:
    """
    Embed chunks in batches using the model's batch processing capability.
    Each batch is sent to the embedding model as a single request.

    Args:
        chunks: List of document chunks (e.g., LangChain Document objects)
        batch_size: Number of chunks to embed per batch (defaults to EMBEDDING_BATCH_SIZE from config)

    Returns:
        List of dictionaries with full metadata and embeddings.
    """
    # Initialize embedding model with batch support
    embeddings_model = LocalLMEmbeddings(
        model=EMBEDDING_MODEL,
        base_url=API_BASE,
        batch_size=batch_size,  # Let the model handle batching internally
    )

    embedded_chunks = []
    total_chunks = len(chunks)

    print(f"🚀 Starting batch embedding of {total_chunks} chunks in batches of {batch_size}...")

    # Process chunks in batches
    for i in tqdm(range(0, total_chunks, batch_size), desc="Embedding batches"):
        batch = chunks[i : i + batch_size]
        print(f"🚀 Processing batch {(i // batch_size) + 1} (Size: {len(batch)})...")
        batch_content = [chunk.page_content for chunk in batch]
        try:
            batch_embeddings = embeddings_model.embed_documents(batch_content)
            # Build the output record for each chunk in the batch
            for j, (chunk, embedding) in enumerate(zip(batch, batch_embeddings)):
                file_path_orig = chunk.metadata.get("full_path", "unknown")
                file_path = normalize_path(file_path_orig)

                file_name = chunk.metadata.get("source", "unknown")
                content = chunk.page_content

                synopsis = chunk.metadata.get("synopsis", "No summary")
                tags = chunk.metadata.get("tags", [])
                entities = chunk.metadata.get("entities", [])

                # Create structured dictionary
                chunk_data = {
                    "file_path": file_path,
                    "file_name": file_name,
                    "chunk_data": content,
                    "synopsis": synopsis,
                    "tags": tags,
                    "entities": entities,
                    "embedding": embedding,
                    "timestamp": RIGHT_NOW,
                    "original_index": i + j,  # Track original position
                }

                embedded_chunks.append(chunk_data)

        except Exception as e:
            print(f"⚠️ Batch processing failed at index {i}: {e}")
            # Fallback: embed each chunk of the failed batch individually
            for j, chunk in enumerate(batch):
                try:
                    content = chunk.page_content
                    embedding = embeddings_model.embed_query(content)

                    file_path_orig = chunk.metadata.get("full_path", "unknown")
                    file_path = normalize_path(file_path_orig)
                    file_name = chunk.metadata.get("source", "unknown")
                    synopsis = chunk.metadata.get("synopsis", "No summary")
                    tags = chunk.metadata.get("tags", [])
                    entities = chunk.metadata.get("entities", [])

                    chunk_data = {
                        "file_path": file_path,
                        "file_name": file_name,
                        "chunk_data": content,
                        "synopsis": synopsis,
                        "tags": tags,
                        "entities": entities,
                        "embedding": embedding,
                        "timestamp": RIGHT_NOW,
                        "original_index": i + j,
                    }
                    embedded_chunks.append(chunk_data)

                except Exception as inner_e:
                    print(f"❌ Failed to embed individual chunk {i + j}: {inner_e}")
                    # Record a placeholder row so the chunk is not silently dropped
                    embedded_chunks.append(
                        {
                            "file_path": normalize_path(chunk.metadata.get("full_path", "unknown")),
                            "file_name": chunk.metadata.get("source", "unknown"),
                            "chunk_data": chunk.page_content,
                            "synopsis": "Embedding failed",
                            "tags": ["error"],
                            "entities": [],
                            "embedding": [],
                            "timestamp": RIGHT_NOW,
                            "original_index": i + j,
                        }
                    )

    print(f"✅ Completed embedding {len(embedded_chunks)} chunks in batches of {batch_size}.")
    return embedded_chunks


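# Note: the notes.embedding column created in create_db() below is F32_BLOB(4096),
# so the configured EMBEDDING_MODEL is expected to return 4096-dimensional vectors;
# a model with a different dimension would need a matching schema change.
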
def save_to_db(chunk_dicts):
    """
    Save a list of dictionaries to the Turso database.
    Each dict maps to a row in the 'notes' table.
    """
    print("connecting to db")
    con = turso.connect(DATABASE_PATH + DATABASE_NAME)
    print("opening cursor")
    cur = con.cursor()

    # Parameterised insert; positional placeholders keep the values safely escaped
    insert_sql = """
        INSERT INTO notes (
            file_path, file_name, chunk_data, synopsis, tags, entities, embedding, timestamp
        ) VALUES (?, ?, ?, ?, ?, ?, vector32(?), ?)
    """

    # Prepare batch data: convert each dict to a tuple in the correct column order
    batch_data = []
    for entry in chunk_dicts:
        # str() on the list of floats yields a "[0.1, 0.2, ...]" array string,
        # which is the textual vector form passed to vector32() here
        embedding_str = str(entry["embedding"])

        batch_data.append(
            (
                entry["file_path"],
                entry["file_name"],
                entry["chunk_data"],
                entry["synopsis"],
                ",".join(entry["tags"]),  # Stored as a comma-separated string
                ",".join(
                    e.get("name", str(e)) if isinstance(e, dict) else str(e)
                    for e in entry["entities"]
                ),  # Entity names stored as a comma-separated string
                embedding_str,
                entry["timestamp"],
            )
        )
    print("data to insert:", len(batch_data))
    # Execute batch insert
    cur.executemany(insert_sql, batch_data)
    con.commit()
    con.close()

    print(f"✅ Saved {len(batch_data)} chunks to database.")


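# Illustrative retrieval sketch (not used by this script): assuming the Turso /
# libSQL build exposes vector_distance_cos(), the nearest chunks for a query
# embedding serialised the same way as above could be fetched with roughly:
#   SELECT file_name, synopsis, chunk_data
#   FROM notes
#   ORDER BY vector_distance_cos(embedding, vector32(?))
#   LIMIT 5;
# (idx_embedding created below is an ordinary index; if ANN search is wanted and
# supported by the build, a libsql_vector_idx()-based index would be used instead.)
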
def create_db():
    """Create the database directory, the 'notes' table, and its indexes if they do not already exist."""
    Path(DATABASE_PATH).mkdir(exist_ok=True)
    con = turso.connect(DATABASE_PATH + DATABASE_NAME)
    cur = con.cursor()

    cur.execute("""
        CREATE TABLE IF NOT EXISTS notes (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            file_path TEXT NOT NULL,
            file_name TEXT NOT NULL,
            chunk_data TEXT NOT NULL,
            synopsis TEXT,
            tags TEXT,      -- comma-separated
            entities TEXT,  -- comma-separated
            embedding F32_BLOB(4096),
            timestamp DATETIME DEFAULT CURRENT_TIMESTAMP
        )
    """)
    # A UNIQUE(file_path, chunk_data) constraint could guard against duplicates;
    # instead, delete_from_db() removes a file's old rows before re-inserting.

    # Indexes for faster queries
    cur.execute("CREATE INDEX IF NOT EXISTS idx_embedding ON notes(embedding);")
    cur.execute("CREATE INDEX IF NOT EXISTS idx_file_path ON notes(file_path);")
    cur.execute("CREATE INDEX IF NOT EXISTS idx_tags ON notes(tags);")
    cur.execute("CREATE INDEX IF NOT EXISTS idx_synopsis ON notes(synopsis);")

    con.commit()
    con.close()
    print("✅ Database and indexes created.")


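# The time file holds a single timestamp in the "%Y/%m/%d - %H:%M:%S" format
# written by update_timefile(), e.g. (illustrative value) "2024/01/31 - 13:45:00".
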
def get_last_update_time():
    """Read the last ingestion time from TIMEFILE; if the file is missing, return a date far in the past so every file is ingested."""
    try:
        with open(TIMEFILE, "r") as file:
            last_update_str = file.read()
            last_update = datetime.strptime(last_update_str, "%Y/%m/%d - %H:%M:%S")
    except FileNotFoundError:
        print("Time file not found; setting time so that all files are ingested")
        last_update = datetime(year=2000, month=1, day=1)
    return last_update


def update_timefile():
    """Write the current time to TIMEFILE and return it as a formatted string."""
    current_time = datetime.now()
    current_time_str = current_time.strftime("%Y/%m/%d - %H:%M:%S")
    with open(TIMEFILE, "w") as file:
        file.write(current_time_str)
    return current_time_str


def delete_from_db(embedded_chunks):
    """
    Delete existing rows from the 'notes' table where the file_path matches
    any file_path in the provided embedded_chunks list.
    This lets re-ingested files replace their old chunks instead of accumulating duplicates.
    """
    if not embedded_chunks:
        print("No chunks to process; skipping deletion.")
        return

    # Extract unique file_paths from the chunks
    file_paths = set(normalize_path(entry["file_path"]) for entry in embedded_chunks)

    if not file_paths:
        print("No file paths found in embedded_chunks; skipping deletion.")
        return

    print(f"Deleting existing rows for {len(file_paths)} file(s)")

    con = turso.connect(DATABASE_PATH + DATABASE_NAME)
    cur = con.cursor()

    # Use a single DELETE statement with an IN clause for efficiency
    placeholders = ", ".join("?" for _ in file_paths)
    delete_sql = f"DELETE FROM notes WHERE file_path IN ({placeholders})"

    try:
        cur.execute(delete_sql, list(file_paths))
        deleted_count = cur.rowcount
        con.commit()  # persist the deletions before the replacement rows are inserted
        print(f"✅ Deleted {deleted_count} rows matching file paths.")
    except Exception as e:
        print(f"❌ Error deleting from database: {e}")
        con.rollback()
    finally:
        con.close()


def main():
    """Run the full ingestion pipeline: load, chunk, enrich, embed, and persist."""
    create_db()
    last_update_time = get_last_update_time()
    print(f"Last update time: {last_update_time}")

    docs = load_documents(last_update_time)
    if not docs:
        print("No recently updated files to ingest")
        return

    chunks = chunk_documents(docs)
    print(f"Split into {len(chunks)} chunks.")

    enriched_chunks = enrich_chunks(chunks)
    print(f"Enriched {len(enriched_chunks)} chunks.")

    embedded_chunks = embed_chunks(enriched_chunks)
    print(f"Embedded {len(embedded_chunks)} chunks.")

    save_entities_from_chunks(embedded_chunks)

    # Remove existing rows from the notes table that match each file path,
    # then insert the freshly embedded chunks
    delete_from_db(embedded_chunks)
    save_to_db(embedded_chunks)
    print("🎉 Ingestion complete!")

    updated = update_timefile()
    print(f"Updated timefile to: {updated}")


if __name__ == "__main__":
    main()