chore: 🧹 removing clutter

2026-01-27 22:04:31 +00:00
parent 4296a4df88
commit d5f8d72e46
12 changed files with 235 additions and 387 deletions
+97
View File
@@ -0,0 +1,97 @@
# 🐉 DnD Campaign Oracle: Local RAG Assistant
An advanced Retrieval-Augmented Generation (RAG) system designed for Dungeon Masters. This tool ingests markdown-based campaign notes, enriches them with AI-generated metadata, and provides an interactive terminal interface to query your world's lore using **DSPy** and **Local LLMs**.
## ⚔️ Key Features
* **Parallel Enrichment:** Utilizes a `ThreadPoolExecutor` to process multiple document chunks simultaneously across local LLM slots for high-speed ingestion.
* **Structured Metadata:** Uses **DSPy TypedPredictors** and **Pydantic** to force LLMs to output valid JSON synopses, tags, and entity lists.
* **Deep Context Retrieval:** Unlike standard RAG, this system retrieves relevant chunks and then "peeks" at the full source file to provide the LLM with broader narrative context.
* **Local-First:** Designed to run entirely on your hardware using **LM Studio** and **FAISS**, keeping your campaign secrets private.
---
## 🏗️ Architecture
1. **Ingestion:** Scans `DATA_DIR` for `.md` files.
2. **Chunking:** Splits documents into 800-character segments with overlap.
3. **Enrichment:** A DSPy `IngestionAgent` analyzes each chunk to extract:
* **Synopsis:** A one-sentence summary.
* **Tags:** Plot points, item names, or themes.
* **Entities:** Specific NPCs, Locations, or Factions.
4. **Vector Store:** Chunks and metadata are embedded using `text-embedding-qwen3` and stored in a local **FAISS** index.
5. **Interactive RAG:** A terminal loop that uses **Chain of Thought (CoT)** reasoning to answer queries based on retrieved context.
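For reference, here is a condensed sketch of steps 1, 2, and 4 (loading, chunking, and vector storage), assuming the LangChain helpers and the `LocalLMEmbeddings` wrapper used elsewhere in this repo. The real `ingest.py` additionally runs the enrichment step (step 3) in parallel between chunking and storage, and exact import paths may vary with your LangChain version.

```python
from pathlib import Path

from langchain_core.documents import Document
from langchain_text_splitters import RecursiveCharacterTextSplitter
from langchain_community.vectorstores import FAISS

from embedding import LocalLMEmbeddings  # local wrapper shipped in this repo

# Step 1: load markdown notes; "source" and "full_path" metadata are used
# later by the retriever to pull in the full file for deep context.
docs = [
    Document(
        page_content=p.read_text(encoding="utf-8"),
        metadata={"source": p.name, "full_path": str(p)},
    )
    for p in Path("/home/cosmic/DnD").rglob("*.md")
]

# Step 2: split into overlapping 800-character chunks.
splitter = RecursiveCharacterTextSplitter(
    chunk_size=800, chunk_overlap=100, separators=["\n\n", "\n", ". ", " ", ""]
)
chunks = splitter.split_documents(docs)

# Step 4: embed via the local LM Studio server and persist a FAISS index.
embeddings = LocalLMEmbeddings(
    model="text-embedding-qwen3-embedding-8b",
    base_url="http://192.168.0.49:1234",
)
FAISS.from_documents(chunks, embeddings).save_local("./local_faiss_db")
```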
---
## 🛠️ Setup
### Prerequisites
* **Python 3.10+**
* **LM Studio:** Running a local server at `http://192.168.0.49:1234` (or your specific IP).
* **Models:**
  * Inference: `qwen3-8b` (or similar).
  * Embedding: `text-embedding-qwen3-embedding-8b`.
### Installation
```bash
uv sync
```
---
## 🚀 Usage
### 1. Ingest & Enrich
Run the ingestion script to process your markdown files and build the vector database.
```bash
uv run src/ingest.py
```
### 2. Query the Oracle
Launch the interactive session to ask questions about your campaign.
```bash
uv run src/retrieve.py
```
**Example Query:**
> `📝 Query: Why did the party get free bread at the Golden Grain Inn?`
> `📜 AI RESPONSE: Based on the session notes from 'Session_12.md', the party received free bread because the Rogue successfully intimidated the baker's assistant, and the Cleric later performed a minor miracle (Thaumaturgy) that impressed the owner.`
---
## 📂 File Structure
* `ingest.py`: Handles file loading, multi-threaded enrichment, and FAISS storage.
* `retrieve.py`: The interactive terminal-based retrieval loop.
* `experts/ingestion_agent.py`: Contains the `IngestionAgent` and Pydantic schemas.
* `embedding.py`: Custom wrapper for `LocalLMEmbeddings` with batch processing support.
* `local_faiss_db/`: Directory where the vector index and metadata are persisted.
---
## ⚙️ Configuration
In `src/config.yaml`, you can tune the processing speed:
* `max_workers: 8`: Adjust based on your GPU/CPU capability to handle concurrent LLM requests.
* `chunk_size: 800`: Increase for more context per chunk, decrease for more granular searching.
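For reference, the relevant section of `src/config.yaml` added in this commit looks like:

```yaml
ingestion:
  data_dir: "/home/cosmic/DnD"
  db_path: "./local_faiss_db"
  max_workers: 8
  chunk_size: 800
  chunk_overlap: 100
```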
---
-66
View File
@@ -1,66 +0,0 @@
# class PrecomputedEmbeddings(Embeddings):
# def __init__(self, embeddings: List[List[float]]):
# self.embeddings = embeddings # Store all precomputed vectors
# def embed_documents(self, texts: List[str]) -> List[List[float]]:
# return self.embeddings # Return the precomputed ones (order must match!)
# def embed_query(self, text):
# return self.embeddings[0]
# def embedder(texts: List[str]) -> List[List[float]]:
# embeddings = []
# base_url = "http://192.168.0.49:1234" # ✅ Add 'http://'
# embed_url = f"{base_url}/v1/embeddings"
# headers = {"Content-Type": "application/json"}
# for text in texts:
# payload = {
# "model": "text-embedding-qwen3-embedding-8b",
# "input": text
# }
# try:
# response = requests.post(embed_url, json=payload, headers=headers) # ✅ POST not GET
# if response.status_code == 200:
# data = response.json() # ✅ Parse JSON!
# embedding = data["data"][0]["embedding"] # ✅ Extract the actual vector
# embeddings.append(embedding)
# else:
# print(f"❌ Embedding failed for '{text[:30]}...': {response.status_code} - {response.text}")
# # Optionally: insert placeholder zeros if you need to continue
# # embeddings.append([0.0] * 768) # ← adjust dimension as needed!
# except Exception as e:
# print(f"⚠️ Exception embedding '{text[:30]}...': {e}")
# # embeddings.append([0.0] * 768) # fallback
# return embeddings
# def store_chunks_with_embeddings_locally(chunks, db_path="./local_faiss_db"):
# """
# Stores pre-computed chunks and their embeddings into a local FAISS database.
# Args:
# chunks: list of LangChain Document objects (with page_content and metadata)
# embeddings: list of embedding vectors (list of lists of floats) — must match length of chunks
# db_path: where to save the FAISS index files locally
# """
# texts = [chunk.page_content for chunk in chunks]
# embeddings = embedder(texts)
# if len(chunks) != len(embeddings):
# raise ValueError(f"Mismatch! Got {len(chunks)} chunks but {len(embeddings)} embeddings.")
# # Create LangChain Document list (we already have this)
# documents = chunks # assuming they're already Document objects
# # Build FAISS vectorstore using precomputed embeddings
# # FAISS.from_embeddings() lets us pass our own embeddings + texts
# vectorstore = FAISS.from_embeddings(
# text_embeddings=list(zip([doc.page_content for doc in documents], embeddings)),
# embedding=PrecomputedEmbeddings(embeddings[0]) # Well define this next
# )
# # Save to disk
# vectorstore.save_local(db_path)
# print(f"✅ Successfully stored {len(chunks)} chunks + embeddings into local FAISS DB at '{db_path}'")
+22
View File
@@ -0,0 +1,22 @@
# --- Connection Settings ---
api:
base_url: "http://192.168.0.49:1234"
api_version: "/v1/"
# --- Model Settings ---
models:
inference: "lm_studio/qwen/qwen3-8b"
embedding: "text-embedding-qwen3-embedding-8b"
# --- Ingestion Settings ---
ingestion:
data_dir: "/home/cosmic/DnD"
db_path: "./local_faiss_db"
max_workers: 8
chunk_size: 800
chunk_overlap: 100
# --- Retrieval Settings ---
retrieval:
top_k: 4
context_limit: 10000 # Max characters from full file context
+10
View File
@@ -0,0 +1,10 @@
import yaml
from pathlib import Path
def load_config(config_path: str = "src/config.yaml") -> dict:
    """Load the YAML config file and return it as a plain dict."""
    with Path(config_path).open("r") as f:
        return yaml.safe_load(f)
# Usage example:
# CFG = load_config()
# print(CFG['api']['base_url'])
-50
View File
@@ -1,50 +0,0 @@
"""Model Factory for creating language model instances.
Separates model creation logic from configuration.
"""
import dspy
from config import Config
class ModelFactory:
"""Factory class for creating language model instances based on configuration."""
@staticmethod
def create_dspy_model(agent_type: str, agent_name: str = None) -> dspy.LM:
"""Create a dspy.LM object for a specific agent with conditional parameters.
Only includes api_base and api_key if they are configured.
Args:
agent_type (str): 'orchestrator' or 'expert'
agent_name (str): For experts, specific agent name like 'weather', 'games'
Returns:
dspy.LM: Configured language model object
"""
config = Config.Model.get_agent_config(agent_type, agent_name)
# Build dspy.LM parameters conditionally
lm_params = {"model": f"{config['provider']}/{config['model_name']}"}
# Only add api_base if it's configured (not None)
if config.get("api_base"):
lm_params["api_base"] = config["api_base"]
# Only add api_key if it's configured (not None)
if config.get("api_key"):
lm_params["api_key"] = config["api_key"]
return dspy.LM(**lm_params)
@staticmethod
def create_orchestrator_model() -> dspy.LM:
"""Create orchestrator model."""
return ModelFactory.create_dspy_model("orchestrator")
@staticmethod
def create_weather_model() -> dspy.LM:
"""Create weather expert model."""
return ModelFactory.create_dspy_model("expert", "ingest")
+62
View File
@@ -0,0 +1,62 @@
import dspy
from langchain_community.vectorstores import FAISS
from embedding import LocalLMEmbeddings
from pathlib import Path
# --- DSPy Signature ---
class DnDContextQA(dspy.Signature):
"""Answer DnD campaign questions using provided snippets and full file context.
/no_think"""
context = dspy.InputField(desc="Relevant chunks and full file contents from the campaign notes.")
question = dspy.InputField()
answer = dspy.OutputField(desc="A detailed answer based on the notes, citing the source file.")
# --- DSPy Module ---
class DnDRAG(dspy.Module):
def __init__(self, db_path="./local_faiss_db", k=3):
super().__init__()
# 1. Setup Embeddings & Load FAISS
self.embeddings = LocalLMEmbeddings(
model="text-embedding-qwen3-embedding-8b",
base_url="http://192.168.0.49:1234"
)
self.vectorstore = FAISS.load_local(
db_path, self.embeddings, allow_dangerous_deserialization=True
)
self.k = k
# 2. Setup the Predictor (Chain of Thought for better reasoning)
self.generate_answer = dspy.ChainOfThought(DnDContextQA)
def get_full_file_content(self, file_path):
"""Helper to read the full source file if it exists."""
try:
return Path(file_path).read_text(encoding='utf-8')
except Exception:
return ""
def forward(self, question):
# 1. Search for top-k chunks
results = self.vectorstore.similarity_search(question, k=self.k)
# 2. Extract unique file paths to load "Full Context"
# This prevents the LLM from being 'blind' to the rest of a relevant session note
unique_paths = list(set([doc.metadata.get("full_path") for doc in results]))
context_parts = []
for i, doc in enumerate(results):
source = doc.metadata.get("source", "Unknown")
context_parts.append(f"--- Chunk {i+1} from {source} ---\n{doc.page_content}")
# 3. Add the Full Content of the top match (optional, but requested!)
# We'll just take the top 1 file to avoid context window explosion
if unique_paths:
top_file_content = self.get_full_file_content(unique_paths[0])
context_parts.append(f"\n=== FULL SOURCE FILE: {Path(unique_paths[0]).name} ===\n{top_file_content[:10000]}")
# 4. Join everything into one context string
context_str = "\n\n".join(context_parts)
# 5. Generate Response
prediction = self.generate_answer(context=context_str, question=question)
return dspy.Prediction(answer=prediction.answer, context=context_str)
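A usage sketch for this module (illustrative, not part of the committed file): configure a DSPy LM pointed at the LM Studio server, then query the module. The model identifier and base URL mirror the values in `config.yaml`; adapt them to your setup.

```python
import dspy
from experts.dnd_agent import DnDRAG

# Point DSPy at the local LM Studio server (values mirror config.yaml).
lm = dspy.LM("lm_studio/qwen/qwen3-8b", api_base="http://192.168.0.49:1234/v1/")
dspy.configure(lm=lm)

rag = DnDRAG(db_path="./local_faiss_db", k=3)
result = rag(question="Why did the party get free bread at the Golden Grain Inn?")
print(result.answer)
```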
+24 -15
View File
@@ -1,22 +1,31 @@
import dspy
from pydantic import BaseModel, Field
from typing import List
# 1. Define the structure of your metadata
class DocMetadata(BaseModel):
synopsis: str = Field(description="A one-sentence summary of the document.")
tags: List[str] = Field(description="Relevant tags (NPCs, Locations, Items, Plot Points).")
entities: List[str] = Field(description="Key names of people, places, or factions.")
class ingestionSignature(dspy.Signature):
"""You are going to be given dungeon masters notes, on session plans, recaps, npcs, players.
You must summarize these document in one sentence
and extract as many relevant tags aspossible as a JSON list:
{{'synopsis': '...', 'tags': [...]}}\n\nDocument:\n{content}"
/no_think
class IngestionSignature(dspy.Signature):
"""
note: str = dspy.InputField()
answer: str = dspy.OutputField()
You are an expert Dungeon Master's assistant.
Analyze the provided notes and extract a concise synopsis and relevant metadata.
"""
note: str = dspy.InputField(desc="The DM notes or session recap content.")
# By using the Pydantic model as the type, DSPy handles the JSON formatting for you
answer: DocMetadata = dspy.OutputField()
class IngestionAgent(dspy.Module):
"""The Ingestion Agent is responsible for Document tagging and summarising."""
def __init__(self):
"""Initialize the Oracle with available expert tools."""
# self.tools = []
self.ingest = dspy.Predict(signature=ingestionSignature)
super().__init__()
        # TypedPredictor enforces the Pydantic schema, so the model's output is
        # parsed and validated into a DocMetadata object rather than a raw string.
self.process = dspy.TypedPredictor(IngestionSignature)
def forward(self, note: str):
# The .answer will now be a DocMetadata object, not a string!
prediction = self.process(note=note)
return prediction
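A minimal, hedged sketch of calling the agent on one chunk (the model name, base URL, and note text are illustrative, taken from `config.yaml` and the README example). `result.answer` comes back as a validated `DocMetadata` instance; if parsing fails, `ingest.py` catches the exception and writes a fallback.

```python
import dspy
from experts.ingestion_agent import IngestionAgent

lm = dspy.LM("lm_studio/qwen/qwen3-8b", api_base="http://192.168.0.49:1234/v1/")
with dspy.context(lm=lm):
    result = IngestionAgent().forward(
        note="Session 12 recap: the party reached the Golden Grain Inn..."
    )

meta = result.answer  # DocMetadata(synopsis=..., tags=[...], entities=[...])
print(meta.synopsis, meta.tags, meta.entities)
```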
-33
View File
@@ -1,33 +0,0 @@
import dspy
from core import ModelFactory
from .file import FileAgent
class OrchestratorSignature(dspy.Signature):
""" """
question: str = dspy.InputField()
history: dspy.History = dspy.InputField()
answer: str = dspy.OutputField()
class TheOracle(dspy.Module):
"""The Oracle is the orchestrator of all the agents."""
def __init__(self):
"""Initialize the Oracle with available expert tools."""
self.tools = [
self.consult_file_expert,
]
self.oracle = dspy.ReAct(signature=OrchestratorSignature, tools=self.tools, max_iters=10)
def consult_file_expert(self, command: str) -> str:
"""Use this expert when you want to save or retrieve information from files.
Also used to find files and update files
"""
with dspy.context(lm=ModelFactory.create_file_model()):
result = FileAgent().file_agent(command=command)
return result.answer
+19 -25
View File
@@ -10,8 +10,11 @@ from tqdm import tqdm
from embedding import LocalLMEmbeddings
from experts.ingestion_agent import IngestionAgent
from config_loader import load_config
DATA_DIR = "/home/cosmic/DnD"
CFG = load_config()
DATA_DIR = CFG["ingestion"]["data_dir"]
def load_documents():
docs = []
@@ -41,47 +44,38 @@ def load_documents():
def chunk_documents(docs):
# LangChain preserves metadata during splitting automatically
text_splitter = RecursiveCharacterTextSplitter(
chunk_size=800,
chunk_overlap=100,
chunk_size=CFG["ingestion"]["chunk_size"],
chunk_overlap=CFG["ingestion"]["chunk_overlap"],
separators=["\n\n", "\n", ". ", " ", ""]
)
return text_splitter.split_documents(docs)
def enrich_chunks(chunks: list) -> list:
MODEL_BASE = "lm_studio/qwen/qwen3-8b"
API_BASE = "http://192.168.0.49:1234/v1/"
MODEL_BASE = CFG["models"]["inference"]
API_BASE = CFG["api"]["base_url"]
API_VERSION = CFG["api"]["api_version"]
def process_single_chunk(indexed_chunk):
idx, chunk = indexed_chunk
lm_index = idx % 8
try:
# Configure context for this specific thread
with dspy.context(lm=dspy.LM(f"{MODEL_BASE}:{lm_index}", api_base=API_BASE)):
# Pass the text, but we will update the original chunk object
response = IngestionAgent().ingest(note=chunk.page_content)
with dspy.context(lm=dspy.LM(f"{MODEL_BASE}:{lm_index}", api_base=API_BASE+API_VERSION)):
response = IngestionAgent().forward(note=chunk.page_content)
answer = response.answer
start = answer.find("{")
end = answer.rfind("}") + 1
metadata_extracted = json.loads(answer[start:end])
# UPDATE: Put AI data in a sub-key to avoid overwriting 'source'
chunk.metadata["enrichment"] = metadata_extracted
# Also flatten tags for easier searching if needed
if "tags" in metadata_extracted:
chunk.metadata["tags"] = metadata_extracted["tags"]
# This is now an object, not a string!
metadata = response.answer.dict()
except Exception as e:
# If enrichment fails, we KEEP the chunk but flag the error
# This ensures 'source' and 'full_path' are NEVER lost
chunk.metadata["enrichment_error"] = str(e)
chunk.metadata["tags"] = ["error"]
print(f"⚠️ Failed for chunk {idx}: {e}")
metadata = {"synopsis": "Summary failed", "tags": ["error"], "entities": []}
chunk.metadata.update(metadata)
return chunk
return idx, chunk
enriched_results = []
with ThreadPoolExecutor(max_workers=8) as executor:
with ThreadPoolExecutor(max_workers=CFG["ingestion"]["max_workers"]) as executor:
# Wrap chunks in enumerate to keep track of order
futures = [executor.submit(process_single_chunk, (i, c)) for i, c in enumerate(chunks)]
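The remainder of this hunk is not shown in the diff; given the `return idx, chunk` change above, the collection step presumably restores the original document order after the threads finish. A hypothetical sketch (not the repository's actual code) of how that could look:

```python
from concurrent.futures import as_completed

# Hypothetical collection step: gather (idx, chunk) pairs as they complete,
# then sort by index so chunk order matches the source documents.
indexed_results = []
for future in tqdm(as_completed(futures), total=len(futures), desc="Enriching"):
    indexed_results.append(future.result())

enriched_results = [chunk for _, chunk in sorted(indexed_results, key=lambda pair: pair[0])]
```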
-105
View File
@@ -1,105 +0,0 @@
import chromadb
import streamlit as st
from langchain.embeddings import HuggingFaceEmbeddings
from langchain_community.llms import Ollama
from langchain_core.prompts import PromptTemplate
# CONFIG
BASE_IP = "192.168.0.49"
LM_STUDIO_PORT = "1234"
CHROMA_PATH = "vector_db"
MODEL_NAME = (
"lmstudio-community/qwen/qwen3-next-80b-a3b-instruct-q8_0.gguf" # Use "llama3", "phi3", etc.
)
EMBEDDING_MODEL = "all-MiniLM-L6-v2"
# Load embedding model
embedder = HuggingFaceEmbeddings(model_name=EMBEDDING_MODEL)
# Load local LLM for answering
llm = Ollama(model=MODEL_NAME, temperature=0.3)
# Initialize Chroma client
client = chromadb.PersistentClient(path=CHROMA_PATH)
collection = client.get_collection("documents")
# Prompt template
prompt_template = """
You are a helpful assistant that answers questions using ONLY the context provided.
Do not make up information or use external knowledge.
Question: {question}
Context:
{context}
If you cannot find an answer, say "I don't know based on the provided documents."
Answer:
"""
prompt = PromptTemplate.from_template(prompt_template)
# Streamlit UI
st.title("📄 Local RAG Knowledge Assistant")
st.write("Upload files to `documents/` and run `ingest.py` first.")
query = st.text_input(
"Ask a question about your documents:", placeholder="What are the key financial metrics?"
)
if query:
with st.spinner("Searching for relevant info..."):
# Embed query
query_embedding = embedder.embed_query(query)
# Retrieve top 5 most similar chunks
results = collection.query(
query_embeddings=[query_embedding], n_results=5, include=["documents", "metadatas"]
)
documents = results["documents"][0]
metadatas = results["metadatas"][0]
# Build context from retrieved chunks + metadata
context = ""
for i, doc in enumerate(documents):
meta = metadatas[i]
synopsis = meta.get("synopsis", "No summary")
tags = (
", ".join(meta.get("tags", []))
if isinstance(meta.get("tags"), list)
else str(meta.get("tags"))
)
source = meta.get("source", "Unknown")
context += f"""
--- Document Snippet ---
{doc}
Synopsis: {synopsis}
Tags: {tags}
Source: {source}
---
"""
# Ask LLM
full_prompt = prompt.format(question=query, context=context)
with st.spinner("Generating answer..."):
response = llm.invoke(full_prompt)
st.subheader("🔍 Answer:")
st.write(response)
st.subheader("📚 Sources (retrieved chunks):")
for i, doc in enumerate(documents):
meta = metadatas[i]
source = meta.get("source", "Unknown")
tags = (
", ".join(meta.get("tags", []))
if isinstance(meta.get("tags"), list)
else str(meta.get("tags"))
)
st.markdown(f"**Source**: `{source}` | **Tags**: {tags}")
st.text_area(f"Snippet {i + 1}", doc, height=120, disabled=True)
+1 -62
View File
@@ -1,67 +1,6 @@
import sys
import dspy
from langchain_community.vectorstores import FAISS
from embedding import LocalLMEmbeddings
from pathlib import Path
# --- DSPy Signature ---
class DnDContextQA(dspy.Signature):
"""Answer DnD campaign questions using provided snippets and full file context."""
context = dspy.InputField(desc="Relevant chunks and full file contents from the campaign notes.")
question = dspy.InputField()
answer = dspy.OutputField(desc="A detailed answer based on the notes, citing the source file.")
# --- DSPy Module ---
class DnDRAG(dspy.Module):
def __init__(self, db_path="./local_faiss_db", k=3):
super().__init__()
# 1. Setup Embeddings & Load FAISS
self.embeddings = LocalLMEmbeddings(
model="text-embedding-qwen3-embedding-8b",
base_url="http://192.168.0.49:1234"
)
self.vectorstore = FAISS.load_local(
db_path, self.embeddings, allow_dangerous_deserialization=True
)
self.k = k
# 2. Setup the Predictor (Chain of Thought for better reasoning)
self.generate_answer = dspy.ChainOfThought(DnDContextQA)
def get_full_file_content(self, file_path):
"""Helper to read the full source file if it exists."""
try:
return Path(file_path).read_text(encoding='utf-8')
except Exception:
return ""
def forward(self, question):
# 1. Search for top-k chunks
results = self.vectorstore.similarity_search(question, k=self.k)
# 2. Extract unique file paths to load "Full Context"
# This prevents the LLM from being 'blind' to the rest of a relevant session note
unique_paths = list(set([doc.metadata.get("full_path") for doc in results]))
context_parts = []
for i, doc in enumerate(results):
source = doc.metadata.get("source", "Unknown")
context_parts.append(f"--- Chunk {i+1} from {source} ---\n{doc.page_content}")
# 3. Add the Full Content of the top match (optional, but requested!)
# We'll just take the top 1 file to avoid context window explosion
if unique_paths:
top_file_content = self.get_full_file_content(unique_paths[0])
context_parts.append(f"\n=== FULL SOURCE FILE: {Path(unique_paths[0]).name} ===\n{top_file_content[:10000]}")
# 4. Join everything into one context string
context_str = "\n\n".join(context_parts)
# 5. Generate Response
prediction = self.generate_answer(context=context_str, question=question)
return dspy.Prediction(answer=prediction.answer, context=context_str)
from experts.dnd_agent import DnDRAG
def main():
# 1. Setup the LLM
-31
View File
@@ -1,31 +0,0 @@
from langchain_community.vectorstores import FAISS
from embedding import LocalLMEmbeddings
def retrieve_enriched_context(query, db_path="./local_faiss_db"):
# 1. Re-initialize the same embedding model
embeddings_model = LocalLMEmbeddings(
model="text-embedding-qwen3-embedding-8b", base_url="http://192.168.0.49:1234"
)
# 2. Load the index from disk
# allow_dangerous_deserialization is required because FAISS uses pickle
vectorstore = FAISS.load_local(db_path, embeddings_model, allow_dangerous_deserialization=True)
# 3. Perform the search
# k=4 means "bring back the top 4 most relevant chunks"
results_with_scores = vectorstore.similarity_search_with_score(query, k=4)
return results_with_scores
# --- Example Usage ---
query = "the party get free bread but i cant remember why?"
hits = retrieve_enriched_context(query)
for doc, score in hits:
print(f"\n🎯 [Score: {score:.4f}]")
print(f"📄 Content: {doc.page_content[:200]}...")
print(f"🛠️ Metadata (Enrichment): {doc.metadata}")
# print(f"doc: {doc}")