feat: ✨ Ingestion PoC success
@@ -0,0 +1,229 @@
# ingest.py

import json
from concurrent.futures import ThreadPoolExecutor, as_completed
from pathlib import Path
from typing import List

import dspy
import requests
from langchain_community.document_loaders import TextLoader
from langchain_community.vectorstores import FAISS
from langchain_core.embeddings import Embeddings
from langchain_text_splitters import RecursiveCharacterTextSplitter
from tqdm import tqdm

from experts.ingestion_agent import IngestionAgent

CHROMA_PATH = "vector_vault"  # only used by the commented-out Turso/Chroma path below
DATA_DIR = "/home/cosmic/DnD"


def load_documents():
    """
    Recursively walk DATA_DIR and load all .md files as plain text.
    Each document gets metadata including the source filename and full path.
    Ideal for RAG embedding pipelines.
    """
    docs = []

    # Map file extensions to their loaders
    loaders = {
        ".md": TextLoader,
    }

    data_path = Path(DATA_DIR)

    if not data_path.exists() or not data_path.is_dir():
        print(f"⚠️ Data directory '{DATA_DIR}' does not exist or is not a directory.")
        return docs

    # Walk recursively through all files
    for file_path in data_path.rglob("*"):
        if file_path.is_file() and file_path.suffix.lower() == ".md":
            try:
                loader = loaders[file_path.suffix.lower()](file_path)
                loaded_docs = loader.load()

                # Add metadata to each document
                for doc in loaded_docs:
                    doc.metadata["source"] = file_path.name      # e.g. "document.md"
                    doc.metadata["full_path"] = str(file_path)   # e.g. "/data/docs/document.md"

                docs.extend(loaded_docs)
                print(f"✅ Loaded: {file_path}")  # remove for silent loading

            except Exception as e:
                print(f"❌ Failed to load {file_path}: {e}")

    print(f"📊 Total documents loaded: {len(docs)}")
    return docs
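
# Hedged sketch (assumption, not part of the original pipeline): the `loaders` mapping in
# load_documents() could be extended to other formats, e.g. PDFs via langchain_community's
# PyPDFLoader (requires the `pypdf` package). Left commented out because only .md is ingested.
#
# from langchain_community.document_loaders import PyPDFLoader
# loaders = {".md": TextLoader, ".pdf": PyPDFLoader}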


def chunk_documents(docs):
    """Split documents into overlapping chunks sized for embedding and enrichment."""
    text_splitter = RecursiveCharacterTextSplitter(
        chunk_size=800,
        chunk_overlap=100,
        separators=["\n\n", "\n", ". ", " ", ""],
    )
    return text_splitter.split_documents(docs)


def enrich_chunks(chunks: List) -> List:
    """Use the local LLM (via dspy) to add a synopsis and tags to each chunk's metadata."""
    # Base model name served by LM Studio; the ":<n>" suffix below targets one of the
    # eight loaded instances so parallel requests spread across them.
    MODEL_BASE = "lm_studio/qwen/qwen3-8b"
    API_BASE = "http://192.168.0.49:1234/v1/"
    dspy.configure(lm=dspy.LM(MODEL_BASE, api_base=API_BASE))

    def process_single_chunk(args):
        i, chunk = args
        lm_index = i % 8
        print(f"Processing chunk {i + 1}/{len(chunks)} | using model slot {lm_index}")

        try:
            with dspy.context(lm=dspy.LM(f"{MODEL_BASE}:{lm_index}", api_base=API_BASE)):
                response = IngestionAgent().ingest(note=chunk)  # uses this thread's LM

            # The agent's answer is expected to contain a JSON object; pull it out of the text.
            answer = response.answer
            start = answer.find("{")
            end = answer.rfind("}") + 1
            metadata = json.loads(answer[start:end])

        except Exception as e:
            print(f"⚠️ Failed to parse JSON for chunk {i}: {e}")
            metadata = {"synopsis": "Summary failed", "tags": ["error"]}

        # Merge the generated metadata into the chunk's existing metadata.
        chunk.metadata.update(metadata)
        return i, chunk

    # Run 8 parallel workers; each submission carries its index so the original order can be restored.
    with ThreadPoolExecutor(max_workers=8) as executor:
        futures = [executor.submit(process_single_chunk, (i, chunk)) for i, chunk in enumerate(chunks)]

        results = []
        for future in tqdm(as_completed(futures), total=len(chunks), desc="Enriching chunks"):
            results.append(future.result())

    # Restore original chunk order.
    results.sort(key=lambda pair: pair[0])
    return [chunk for _, chunk in results]


class PrecomputedEmbeddings(Embeddings):
    """Minimal Embeddings wrapper that hands back vectors computed elsewhere."""

    def __init__(self, embeddings: List[List[float]]):
        self.embeddings = embeddings  # all precomputed vectors, in chunk order

    def embed_documents(self, texts: List[str]) -> List[List[float]]:
        # Ignores `texts` and returns the precomputed vectors; order must match the texts.
        return self.embeddings

    def embed_query(self, text: str) -> List[float]:
        # Placeholder: returns the first vector, so this wrapper is only suitable for
        # building and saving the index, not for answering real queries.
        return self.embeddings[0]


def embedder(texts: List[str]) -> List[List[float]]:
    """Embed each text via the local LM Studio /v1/embeddings endpoint."""
    embeddings = []
    base_url = "http://192.168.0.49:1234"
    embed_url = f"{base_url}/v1/embeddings"
    headers = {"Content-Type": "application/json"}

    for text in texts:
        payload = {
            "model": "text-embedding-qwen3-embedding-8b",
            "input": text,
        }

        try:
            response = requests.post(embed_url, json=payload, headers=headers)
            if response.status_code == 200:
                data = response.json()
                embedding = data["data"][0]["embedding"]  # extract the actual vector
                embeddings.append(embedding)
            else:
                print(f"❌ Embedding failed for '{text[:30]}...': {response.status_code} - {response.text}")
                # Optionally append placeholder zeros to keep chunk/embedding counts aligned:
                # embeddings.append([0.0] * 768)  # adjust the dimension to the model
        except Exception as e:
            print(f"⚠️ Exception embedding '{text[:30]}...': {e}")
            # embeddings.append([0.0] * 768)  # fallback, same caveat as above

    return embeddings
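

# Hedged sketch (illustrative, not wired into main()): a query-capable Embeddings wrapper
# built on embedder() above, so a saved FAISS index could also serve similarity searches.
# The class name is an assumption, not part of the original pipeline.
class LMStudioEmbeddings(Embeddings):
    def embed_documents(self, texts: List[str]) -> List[List[float]]:
        return embedder(texts)

    def embed_query(self, text: str) -> List[float]:
        return embedder([text])[0]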


def store_chunks_with_embeddings_locally(chunks, db_path="./local_faiss_db"):
    """
    Embed the given chunks and store them in a local FAISS database.

    Args:
        chunks: list of LangChain Document objects (with page_content and metadata)
        db_path: where to save the FAISS index files locally
    """
    texts = [chunk.page_content for chunk in chunks]
    embeddings = embedder(texts)
    if len(chunks) != len(embeddings):
        raise ValueError(f"Mismatch! Got {len(chunks)} chunks but {len(embeddings)} embeddings.")

    # Build the FAISS vectorstore from precomputed embeddings.
    # FAISS.from_embeddings() accepts (text, vector) pairs plus per-document metadata.
    vectorstore = FAISS.from_embeddings(
        text_embeddings=list(zip(texts, embeddings)),
        embedding=PrecomputedEmbeddings(embeddings),
        metadatas=[chunk.metadata for chunk in chunks],
    )

    # Save to disk
    vectorstore.save_local(db_path)
    print(f"✅ Successfully stored {len(chunks)} chunks + embeddings into local FAISS DB at '{db_path}'")


# # Store in Turso
# def store_in_turso(chunks):
#     ## needs refactor, not using chroma
#     client = turso.PersistentClient(path=CHROMA_PATH)
#     collection = client.get_or_create_collection("documents")
#
#     ids = [f"doc_{i}" for i in range(len(chunks))]
#     metadatas = [chunk.metadata for chunk in chunks]
#     embeddings = embedder(texts)
#
#     collection.add(
#         ids=ids,
#         documents=texts,
#         embeddings=embeddings,
#         metadatas=metadatas
#     )
#     print(f"✅ Successfully stored {len(chunks)} chunks in Chroma DB.")


def main():
    print("🔍 Loading documents...")
    docs = load_documents()
    if not docs:
        print(f"⚠️ No Markdown files found in '{DATA_DIR}'. Add some .md notes and rerun.")
        return

    print(f"📄 Loaded {len(docs)} documents. Splitting into chunks...")
    chunks = chunk_documents(docs)
    print(f"🧩 Created {len(chunks)} chunks.")

    print("🧠 Generating summaries and tags using the local LLM... (this may take a few minutes)")
    enriched_chunks = enrich_chunks(chunks)

    print("💾 Storing in vector database...")
    store_chunks_with_embeddings_locally(enriched_chunks)

    print("🎉 Ingestion complete!")


if __name__ == "__main__":
    main()