feat: Working PoC of the Dungeon Masters Vault

This commit is contained in:
2026-01-27 21:24:18 +00:00
parent 645e9461ce
commit 4296a4df88
15 changed files with 347 additions and 563 deletions
-231
View File
@@ -1,231 +0,0 @@
import os
from pathlib import Path
from types import SimpleNamespace
import yaml
from .user_config import UserConfig
class Config:
"""Main Config Class for application-level configuration."""
ENVIRONMENT = "dev"
DEBUG = True
LOG_LEVEL = "DEBUG"
FILE_SEARCH_DIRECTORIES = [os.path.expanduser("~")]
FILE_SEARCH_DIRECTORIES.extend(UserConfig.FILE_SEARCH_DIRECTORIES)
class Model:
"""Application-level model configuration with inheritance support."""
# Application-level defaults for all agents
# TODO: We need to decide on what we want our defaults to be,
# would we advise shipping with lm_studio or ollama?
# These can be overridden by user_config.py
PROVIDER = "ollama_chat"
MODEL_NAME = "qwen3:latest"
# Default connection settings (None = no custom connection)
HOST_ADDRESS = None
HOST_PORT = None
HOST_API_KEY = None
HOST_API_PATH = None # e.g., "v1" for OpenAI-compatible APIs
# Application-level agent configurations (usually empty)
ORCHESTRATOR = {}
EXPERTS = {"default": {}, "weather": {}, "games": {}, "lighting": {}}
# Helper method to get merged configuration (app + user)
@classmethod
def _get_base_config(cls):
"""Get base configuration with provider and model settings."""
base_config = {
"provider": cls.PROVIDER,
"model_name": cls.MODEL_NAME,
}
# Add base connection settings only if they exist
if hasattr(cls, "HOST_ADDRESS") and cls.HOST_ADDRESS:
api_path = getattr(cls, "HOST_API_PATH", "") or ""
base_config["api_base"] = (
f"http://{cls.HOST_ADDRESS}:{cls.HOST_PORT}/{api_path}"
)
if hasattr(cls, "HOST_API_KEY") and cls.HOST_API_KEY:
base_config["api_key"] = cls.HOST_API_KEY
return base_config
@classmethod
def _merge_user_config(cls, base_config):
"""Merge user configuration overrides with base config."""
try:
user_model_config = UserConfig.Model
# Override base config with user settings
if hasattr(user_model_config, "PROVIDER"):
base_config["provider"] = user_model_config.PROVIDER
if hasattr(user_model_config, "MODEL_NAME"):
base_config["model_name"] = user_model_config.MODEL_NAME
if (
hasattr(user_model_config, "HOST_ADDRESS")
and user_model_config.HOST_ADDRESS
):
api_path = getattr(user_model_config, "HOST_API_PATH", "") or ""
base_config["api_base"] = (
f"http://{user_model_config.HOST_ADDRESS}:"
f"{user_model_config.HOST_PORT}/{api_path}"
)
if (
hasattr(user_model_config, "HOST_API_KEY")
and user_model_config.HOST_API_KEY
):
base_config["api_key"] = user_model_config.HOST_API_KEY
return user_model_config
except ImportError:
return None
@classmethod
def get_agent_config(cls, agent_type, agent_name=None):
"""Get configuration for a specific agent type and name.
Merges application config with user config overrides.
Args:
agent_type (str): 'orchestrator' or 'expert'
agent_name (str): For experts, specific agent name like
'weather', 'games'
Returns:
dict: Complete configuration for the agent
"""
base_config = cls._get_base_config()
user_model_config = cls._merge_user_config(base_config)
# Get application-level agent config
if agent_type.lower() == "orchestrator":
return cls._get_orchestrator_config(base_config, user_model_config)
elif agent_type.lower() == "expert":
return cls._get_expert_config(
base_config, user_model_config, agent_name
)
else:
return base_config
@classmethod
def _get_orchestrator_config(cls, base_config, user_model_config):
"""Get orchestrator-specific configuration."""
app_agent_config = getattr(cls, "ORCHESTRATOR", {})
user_agent_config = (
getattr(user_model_config, "ORCHESTRATOR", {})
if user_model_config
else {}
)
return {**base_config, **app_agent_config, **user_agent_config}
@classmethod
def _get_expert_config(cls, base_config, user_model_config, agent_name):
"""Get expert-specific configuration."""
app_experts_config = getattr(cls, "EXPERTS", {})
user_experts_config = (
getattr(user_model_config, "EXPERTS", {}) if user_model_config else {}
)
# Start with default expert config
app_expert_config = app_experts_config.get("default", {})
user_expert_default = user_experts_config.get("default", {})
expert_config = {**app_expert_config, **user_expert_default}
# If specific agent name provided, merge its config
if agent_name:
app_specific_config = app_experts_config.get(agent_name, {})
user_specific_config = user_experts_config.get(agent_name, {})
expert_config = {
**expert_config,
**app_specific_config,
**user_specific_config,
}
return {**base_config, **expert_config}
class Weather:
"""Weather-related configuration and mappings."""
CODE_MAP = {
0: "Clear sky",
1: "Mainly clear",
2: "Partly cloudy",
3: "Overcast",
45: "Fog",
48: "Depositing rime fog",
51: "Light drizzle",
53: "Moderate drizzle",
55: "Dense drizzle",
56: "Light freezing drizzle",
57: "Dense freezing drizzle",
61: "Slight rain",
63: "Moderate rain",
65: "Heavy rain",
66: "Light freezing rain",
67: "Heavy freezing rain",
71: "Slight snow",
73: "Moderate snow",
75: "Heavy snow",
77: "Snow grains",
80: "Slight rain showers",
81: "Moderate rain showers",
82: "Violent rain showers",
85: "Slight snow showers",
86: "Heavy snow showers",
95: "Thunderstorm",
96: "Thunderstorm with slight hail",
99: "Thunderstorm with heavy hail",
}
@classmethod # Load from YAML
def load_yaml(cls, file_path="src/config/config.yaml"):
"""Load configuration from YAML file.
Args:
file_path (str): Path to the YAML configuration file.
Returns:
None. Populates Config.Lights and Config.Rooms from the loaded YAML data.
"""
yaml_file = Path(file_path)
if not yaml_file.exists():
default_dict = {"DEBUG": True}
with open(yaml_file, "w") as f:
yaml.dump(default_dict, f)
with open(yaml_file) as f:
config_data = yaml.safe_load(f)
# Populate lights and rooms
lights_data = config_data.get("lights", {})
rooms_data = config_data.get("rooms", {})
class Lights:
pass
class Rooms:
pass
for lightname, light_config in lights_data.items():
light_obj = SimpleNamespace(**light_config)
setattr(Lights, lightname.replace(" ", "").lower(), light_obj)
for roomname, room_config in rooms_data.items():
room_obj = SimpleNamespace(**room_config)
setattr(Rooms, roomname.replace(" ", "").lower(), room_obj)
cls.Lights = Lights
cls.Rooms = Rooms
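# A hypothetical shape for the config.yaml consumed by load_yaml above (the
# light/room names and fields below are illustrative, not part of this commit):
#
#   lights:
#     Desk Lamp: {entity_id: light.desk_lamp, room: office}
#   rooms:
#     Office: {lights: [light.desk_lamp]}
#
# After loading, each entry is exposed as a SimpleNamespace attribute, e.g.
# Config.Lights.desklamp.entity_id == "light.desk_lamp".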
# Load the YAML config when the module is imported
Config.load_yaml()
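# A minimal usage sketch of the config merge described above (agent names come
# from the EXPERTS dict; this snippet is illustrative, not part of the module):
#
#   orchestrator_cfg = Config.Model.get_agent_config("orchestrator")
#   weather_cfg = Config.Model.get_agent_config("expert", agent_name="weather")
#   # Each dict holds provider/model_name, plus api_base/api_key when a
#   # HOST_ADDRESS is configured, with UserConfig values overriding app defaults.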
-31
View File
@@ -1,31 +0,0 @@
"""User-specific configuration file.
DO NOT commit user_config.py to version control!
"""
class UserConfig:
"""User-specific model configurations - override application defaults."""
class Model:
"""Personal model preferences and overrides."""
# Base model overrides (affects all agents unless specifically overridden)
PROVIDER = "lm_studio"
MODEL_NAME = "openai/gpt-oss-20b"
HOST_ADDRESS = "192.168.0.49"
HOST_PORT = "1234"
HOST_API_KEY = "no-key"
HOST_API_PATH = "v1"
# Orchestrator personal config
ORCHESTRATOR = {}
# Expert agents personal config
EXPERTS = {
"default": {
"model_name": "qwen/qwen3-coder-30b",
},
"ingest": {},
"ask": {},
}
-97
View File
@@ -1,97 +0,0 @@
"""User-specific configuration file.
Copy this to user_config.py and customize with your personal settings.
DO NOT commit user_config.py to version control!
"""
class UserConfig:
"""User-specific model configurations - override application defaults."""
# Directories the AI should start from when searching for files.
# Your home directory is already included by default.
FILE_SEARCH_DIRECTORIES = []
class Model:
"""Personal model preferences and overrides."""
# Personal model preferences
# Uncomment and modify as needed
# Base model overrides (affects all agents unless specifically overridden)
PROVIDER = "lm_studio"
MODEL_NAME = "openai/gpt-oss-20b"
HOST_ADDRESS = "127.0.0.1"
HOST_PORT = "1234"
HOST_API_KEY = "your-personal-key"
HOST_API_PATH = "v1"
# Orchestrator personal config
ORCHESTRATOR = {
# 'model_name': 'gpt-4',
# 'api_base': 'https://api.openai.com/v1',
# 'api_key': 'your-openai-key'
}
# Expert agents personal config
# If you are using multiple models from the host configured above,
# you only need to set the model name.
EXPERTS = {
"default": {
# 'model_name': 'claude-3-sonnet',
# 'api_base': 'https://api.anthropic.com',
# 'api_key': 'your-anthropic-key'
},
"weather": {
# 'model_name': 'gpt-4-turbo',
# 'api_base': 'https://api.openai.com/v1',
# 'api_key': 'your-openai-key'
},
"games": {
# 'model_name': 'claude-3-opus',
# 'api_base': 'https://api.anthropic.com',
# 'api_key': 'your-anthropic-key'
},
}
# Example configurations:
#
# Use local Ollama with custom port:
# class Model:
# HOST_ADDRESS = '127.0.0.1'
# HOST_PORT = '11434'
# HOST_API_KEY = 'local'
#
# Use OpenAI for everything:
# class Model:
# PROVIDER = 'openai_chat'
# MODEL_NAME = 'gpt-4'
# ORCHESTRATOR = {
# 'api_base': 'https://api.openai.com/v1',
# 'api_key': 'your-openai-key'
# }
# EXPERTS = {
# 'default': {
# 'api_base': 'https://api.openai.com/v1',
# 'api_key': 'your-openai-key'
# }
# }
#
# Mixed providers:
# class Model:
# ORCHESTRATOR = {
# 'model_name': 'gpt-4',
# 'api_base': 'https://api.openai.com/v1',
# 'api_key': 'your-openai-key'
# }
# EXPERTS = {
# 'weather': {
# 'model_name': 'claude-3-sonnet',
# 'api_base': 'https://api.anthropic.com',
# 'api_key': 'your-anthropic-key'
# },
# 'games': {
# 'model_name': 'llama3:8b' # Uses local Ollama
# }
# }
-1
View File
@@ -4,7 +4,6 @@ Separates model creation logic from configuration.
"""
import dspy
from config import Config
+45
View File
@@ -0,0 +1,45 @@
import requests
from langchain_core.embeddings import Embeddings
class LocalLMEmbeddings(Embeddings):
def __init__(
self, model: str, base_url: str = "http://192.168.0.49:1234", batch_size: int = 32
):
self.url = f"{base_url}/v1/embeddings"
self.model = model
self.batch_size = batch_size
def _post_request(self, input_texts: list[str]) -> list[list[float]]:
"""Handles the actual HTTP POST to the local server."""
payload = {"model": self.model, "input": input_texts}
try:
response = requests.post(
self.url, json=payload, timeout=120
) # Longer timeout for batches
response.raise_for_status()
data = response.json()
return [item["embedding"] for item in data["data"]]
except Exception as e:
print(f"❌ Batch request failed: {e}")
# Returning empty lists to maintain index integrity if needed,
# or you could raise the error to stop the pipeline.
return [[] for _ in input_texts]
def embed_documents(self, texts: list[str]) -> list[list[float]]:
"""Splits 500+ chunks into batches of 32 and processes them."""
all_embeddings = []
for i in range(0, len(texts), self.batch_size):
batch = texts[i : i + self.batch_size]
print(f"🚀 Processing batch {(i // self.batch_size) + 1} (Size: {len(batch)})...")
batch_vectors = self._post_request(batch)
all_embeddings.extend(batch_vectors)
return all_embeddings
def embed_query(self, text: str) -> list[float]:
"""Embeds the single search query."""
result = self._post_request([text])
return result[0] if result else []
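# A minimal usage sketch (illustrative; assumes an LM Studio-compatible server is
# running at the default base_url above). The model name mirrors the one used in
# ingest.py; the sample texts are made up.
if __name__ == "__main__":
    embedder = LocalLMEmbeddings(model="text-embedding-qwen3-embedding-8b")
    docs = [
        "Session 12 recap: the party rescues the baker from the cellar.",
        "NPC: the baker is grateful and offers the party free bread.",
    ]
    vectors = embedder.embed_documents(docs)  # batched POSTs to /v1/embeddings
    query_vector = embedder.embed_query("why do we get free bread?")
    print(f"Embedded {len(vectors)} documents; query vector length: {len(query_vector)}")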
+3 -4
View File
@@ -1,8 +1,9 @@
import dspy
class ingestionSignature(dspy.Signature):
"""You are going to be given dungeon masters notes, on session plans, recaps, npcs, players.
You must summarize these document in one sentence
You must summarize these document in one sentence
and extract as many relevant tags aspossible as a JSON list:
{{'synopsis': '...', 'tags': [...]}}\n\nDocument:\n{content}"
/no_think
@@ -18,6 +19,4 @@ class IngestionAgent(dspy.Module):
def __init__(self):
"""Initialize the Oracle with available expert tools."""
# self.tools = []
self.ingest = dspy.Predict(
signature=ingestionSignature
)
self.ingest = dspy.Predict(signature=ingestionSignature)
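# A hedged usage sketch; the input field name (note) and output field (answer)
# follow the calls made in ingest.py later in this commit, and the LM address is
# the one used there:
#
#   import json
#   import dspy
#   dspy.configure(lm=dspy.LM("lm_studio/qwen/qwen3-8b", api_base="http://192.168.0.49:1234/v1/"))
#   agent = IngestionAgent()
#   response = agent.ingest(note="The party rescues the baker, who offers them free bread.")
#   raw = response.answer
#   metadata = json.loads(raw[raw.find("{"): raw.rfind("}") + 1])
#   # metadata -> {'synopsis': '...', 'tags': [...]}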
+2 -5
View File
@@ -6,8 +6,7 @@ from .file import FileAgent
class OrchestratorSignature(dspy.Signature):
"""
"""
""" """
question: str = dspy.InputField()
history: dspy.History = dspy.InputField()
@@ -22,9 +21,7 @@ class TheOracle(dspy.Module):
self.tools = [
self.consult_file_expert,
]
self.oracle = dspy.ReAct(
signature=OrchestratorSignature, tools=self.tools, max_iters=10
)
self.oracle = dspy.ReAct(signature=OrchestratorSignature, tools=self.tools, max_iters=10)
def consult_file_expert(self, command: str) -> str:
"""Use this expert when you want to save or retrieve information from files.
+70 -180
View File
@@ -1,228 +1,118 @@
# ingest.py
import os
import json
import dspy
import turso
import requests
import json
from concurrent.futures import ThreadPoolExecutor, as_completed
from typing import List
from tqdm import tqdm
from langchain_core.embeddings import Embeddings
from langchain_community.vectorstores import FAISS
from langchain_core.documents import Document
from typing import List
from pathlib import Path
from langchain_community.document_loaders import TextLoader
from langchain_text_splitters import RecursiveCharacterTextSplitter
import dspy
from langchain_community.document_loaders import TextLoader
from langchain_community.vectorstores import FAISS
from langchain_text_splitters import RecursiveCharacterTextSplitter
from tqdm import tqdm
from embedding import LocalLMEmbeddings
from experts.ingestion_agent import IngestionAgent
# exit()
CHROMA_PATH = "vector_vault"
DATA_DIR = "/home/cosmic/DnD"
def load_documents():
"""
Recursively walk through DATA_DIR and load all .md files as plain text.
Each document gets metadata including source filename and full path.
Ideal for RAG embedding pipelines.
"""
docs = []
# Define loader mapping
loaders = {
".md": TextLoader,
}
data_path = Path(DATA_DIR) # Ensure DATA_DIR is defined elsewhere as a string or Path
data_path = Path(DATA_DIR)
if not data_path.exists() or not data_path.is_dir():
print(f"⚠️ Data directory '{DATA_DIR}' does not exist or is not a directory.")
print(f"⚠️ Data directory '{DATA_DIR}' does not exist.")
return docs
# Walk recursively through all files
for file_path in data_path.rglob("*"):
if file_path.is_file() and file_path.suffix.lower() == ".md":
try:
loader = loaders[file_path.suffix](file_path)
loaded_docs = loader.load()
for file_path in data_path.rglob("*.md"):
try:
loader = TextLoader(str(file_path))
loaded_docs = loader.load()
# Add metadata to each document
for doc in loaded_docs:
doc.metadata["source"] = file_path.name # e.g., "document.md"
doc.metadata["full_path"] = str(file_path) # e.g., "/data/docs/document.md"
for doc in loaded_docs:
# Ensure these keys are set before splitting
doc.metadata["source"] = file_path.name
doc.metadata["full_path"] = str(file_path.absolute())
docs.extend(loaded_docs)
print(f"✅ Loaded: {file_path}") # Remove this line if you want it silent
docs.extend(loaded_docs)
print(f"✅ Loaded: {file_path.name}")
except Exception as e:
print(f"❌ Failed to load {file_path}: {e}")
except Exception as e:
print(f"❌ Failed to load {file_path}: {e}")
print(f"📊 Total documents loaded: {len(docs)}")
return docs
def chunk_documents(docs):
# LangChain preserves metadata during splitting automatically
text_splitter = RecursiveCharacterTextSplitter(
chunk_size=800,
chunk_overlap=100,
chunk_size=800,
chunk_overlap=100,
separators=["\n\n", "\n", ". ", " ", ""]
)
return text_splitter.split_documents(docs)
def enrich_chunks(chunks: List) -> List:
enriched = []
# Define the base model name, shared across all 8 model slots
def enrich_chunks(chunks: list) -> list:
MODEL_BASE = "lm_studio/qwen/qwen3-8b"
API_BASE = "http://192.168.0.49:1234/v1/"
dspy.configure(lm=dspy.LM("lm_studio/qwen/qwen3-8b", api_base="http://192.168.0.49:1234/v1/"))
def process_single_chunk(args):
i, chunk = args
lm_index = i % 8
print(f"Processing chunk {i+1}/{len(chunks)} | using model {lm_index}")
def process_single_chunk(indexed_chunk):
idx, chunk = indexed_chunk
lm_index = idx % 8
try:
with dspy.context(lm=dspy.LM(f"{MODEL_BASE}:{lm_index}", api_base = API_BASE)):
response = IngestionAgent().ingest(note=chunk) # ← Uses thread's selected LM!
answer = response.answer
start = answer.find('{')
end = answer.rfind('}') + 1
json_str = answer[start:end]
metadata = json.loads(json_str)
# Configure context for this specific thread
with dspy.context(lm=dspy.LM(f"{MODEL_BASE}:{lm_index}", api_base=API_BASE)):
# Pass the text, but we will update the original chunk object
response = IngestionAgent().ingest(note=chunk.page_content)
answer = response.answer
start = answer.find("{")
end = answer.rfind("}") + 1
metadata_extracted = json.loads(answer[start:end])
# UPDATE: Put AI data in a sub-key to avoid overwriting 'source'
chunk.metadata["enrichment"] = metadata_extracted
# Also flatten tags for easier searching if needed
if "tags" in metadata_extracted:
chunk.metadata["tags"] = metadata_extracted["tags"]
except Exception as e:
print(f"⚠️ Failed to parse JSON for chunk {i}: {e}")
metadata = {"synopsis": "Summary failed", "tags": ["error"]}
# If enrichment fails, we KEEP the chunk but flag the error
# This ensures 'source' and 'full_path' are NEVER lost
chunk.metadata["enrichment_error"] = str(e)
chunk.metadata["tags"] = ["error"]
# Update the chunk's metadata
chunk.metadata.update(metadata)
return chunk
return idx, chunk
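# Hypothetical shape of one enriched chunk's metadata after this step (the file
# name and tag values are made-up examples):
#
#   chunk.metadata == {
#       "source": "session_12.md",
#       "full_path": "/home/cosmic/DnD/session_12.md",
#       "enrichment": {"synopsis": "The party rescues the baker.", "tags": ["baker", "bread"]},
#       "tags": ["baker", "bread"],
#   }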
# Run 8 parallel workers; each picks a different model slot
enriched_results = []
with ThreadPoolExecutor(max_workers=8) as executor:
futures = [executor.submit(process_single_chunk, (i, chunk)) for i, chunk in enumerate(chunks)]
# Wrap chunks in enumerate to keep track of order
futures = [executor.submit(process_single_chunk, (i, c)) for i, c in enumerate(chunks)]
for future in tqdm(as_completed(futures), total=len(chunks), desc="Enriching chunks"):
enriched.append(future.result())
enriched_results.append(future.result())
# Restore original order
enriched.sort(key=lambda x: chunks.index(x))
# Sort by the index (first element of tuple) and return only the chunk
enriched_results.sort(key=lambda x: x[0])
return [item[1] for item in enriched_results]
return enriched
class PrecomputedEmbeddings(Embeddings):
def __init__(self, embeddings: List[List[float]]):
self.embeddings = embeddings # Store all precomputed vectors
def embed_documents(self, texts: List[str]) -> List[List[float]]:
return self.embeddings # Return the precomputed ones (order must match!)
def embed_query(self, text):
return self.embeddings[0]
def embedder(texts: List[str]) -> List[List[float]]:
embeddings = []
base_url = "http://192.168.0.49:1234" # ✅ Add 'http://'
embed_url = f"{base_url}/v1/embeddings"
headers = {"Content-Type": "application/json"}
for text in texts:
payload = {
"model": "text-embedding-qwen3-embedding-8b",
"input": text
}
try:
response = requests.post(embed_url, json=payload, headers=headers) # ✅ POST not GET
if response.status_code == 200:
data = response.json() # ✅ Parse JSON!
embedding = data["data"][0]["embedding"] # ✅ Extract the actual vector
embeddings.append(embedding)
else:
print(f"❌ Embedding failed for '{text[:30]}...': {response.status_code} - {response.text}")
# Optionally: insert placeholder zeros if you need to continue
# embeddings.append([0.0] * 768) # ← adjust dimension as needed!
except Exception as e:
print(f"⚠️ Exception embedding '{text[:30]}...': {e}")
# embeddings.append([0.0] * 768) # fallback
return embeddings
def store_chunks_with_embeddings_locally(chunks, db_path="./local_faiss_db"):
"""
Stores pre-computed chunks and their embeddings into a local FAISS database.
Args:
chunks: list of LangChain Document objects (with page_content and metadata)
embeddings: list of embedding vectors (list of lists of floats) — must match length of chunks
db_path: where to save the FAISS index files locally
"""
texts = [chunk.page_content for chunk in chunks]
embeddings = embedder(texts)
if len(chunks) != len(embeddings):
raise ValueError(f"Mismatch! Got {len(chunks)} chunks but {len(embeddings)} embeddings.")
# Create LangChain Document list (we already have this)
documents = chunks # assuming they're already Document objects
# Build FAISS vectorstore using precomputed embeddings
# FAISS.from_embeddings() lets us pass our own embeddings + texts
vectorstore = FAISS.from_embeddings(
text_embeddings=list(zip([doc.page_content for doc in documents], embeddings)),
embedding=PrecomputedEmbeddings(embeddings[0]) # Well define this next
def store_chunks_locally(chunks, db_path="./local_faiss_db"):
embeddings_model = LocalLMEmbeddings(
model="text-embedding-qwen3-embedding-8b",
base_url="http://192.168.0.49:1234",
batch_size=32,
)
# Save to disk
print(f"Index creation started for {len(chunks)} chunks...")
# FAISS.from_documents extracts metadata directly from the Document objects
vectorstore = FAISS.from_documents(documents=chunks, embedding=embeddings_model)
vectorstore.save_local(db_path)
print(f"✅ Successfully stored {len(chunks)} chunks + embeddings into local FAISS DB at '{db_path}'")
# # Store in Turso
# def store_in_turso(chunks):
# ## needs refactor, not using chroma
# client = turso.PersistentClient(path=CHROMA_PATH)
# collection = client.get_or_create_collection("documents")
# ids = [f"doc_{i}" for i in range(len(chunks))]
# metadatas = [chunk.metadata for chunk in chunks]
# embeddings = embedder(texts)
# collection.add(
# ids=ids,
# documents=texts,
# embeddings=embeddings,
# metadatas=metadatas
# )
# print(f"✅ Successfully stored {len(chunks)} chunks in Chroma DB.")
print(f"✅ Successfully stored in FAISS at '{db_path}'")
return vectorstore
def main():
print("🔍 Loading documents...")
docs = load_documents()
if not docs:
print("⚠️ No files found in 'documents/'. Add some PDFs, TXT, or DOCX.")
return
print(f"📄 Loaded {len(docs)} documents. Splitting into chunks...")
if not docs: return
chunks = chunk_documents(docs)
print(f"🧩 Created {len(chunks)} chunks.")
print("🧠 Generating summaries and tags using local LLM... (this may take a few minutes)")
enriched_chunks = enrich_chunks(chunks)
print("💾 Storing in vector database...")
store_chunks_with_embeddings_locally(enriched_chunks)
store_chunks_locally(enriched_chunks)
print("🎉 Ingestion complete!")
if __name__ == "__main__":
+19 -12
View File
@@ -1,14 +1,16 @@
import chromadb
import streamlit as st
from langchain.embeddings import HuggingFaceEmbeddings
from langchain_community.llms import Ollama
from langchain_core.prompts import PromptTemplate
import chromadb
# CONFIG
BASE_IP = "192.168.0.49"
LM_STUDIO_PORT = "1234"
CHROMA_PATH = "vector_db"
MODEL_NAME = "lmstudio-community/qwen/qwen3-next-80b-a3b-instruct-q8_0.gguf" # Use "llama3", "phi3", etc.
MODEL_NAME = (
"lmstudio-community/qwen/qwen3-next-80b-a3b-instruct-q8_0.gguf" # Use "llama3", "phi3", etc.
)
EMBEDDING_MODEL = "all-MiniLM-L6-v2"
# Load embedding model
@@ -42,8 +44,9 @@ prompt = PromptTemplate.from_template(prompt_template)
st.title("📄 Local RAG Knowledge Assistant")
st.write("Upload files to `documents/` and run `ingest.py` first.")
query = st.text_input("Ask a question about your documents:",
placeholder="What are the key financial metrics?")
query = st.text_input(
"Ask a question about your documents:", placeholder="What are the key financial metrics?"
)
if query:
with st.spinner("Searching for relevant info..."):
@@ -52,9 +55,7 @@ if query:
# Retrieve top 5 most similar chunks
results = collection.query(
query_embeddings=[query_embedding],
n_results=5,
include=["documents", "metadatas"]
query_embeddings=[query_embedding], n_results=5, include=["documents", "metadatas"]
)
documents = results["documents"][0]
@@ -65,8 +66,11 @@ if query:
for i, doc in enumerate(documents):
meta = metadatas[i]
synopsis = meta.get("synopsis", "No summary")
tags = ", ".join(meta.get("tags", [])) if isinstance(
meta.get("tags"), list) else str(meta.get("tags"))
tags = (
", ".join(meta.get("tags", []))
if isinstance(meta.get("tags"), list)
else str(meta.get("tags"))
)
source = meta.get("source", "Unknown")
context += f"""
@@ -92,7 +96,10 @@ Source: {source}
for i, doc in enumerate(documents):
meta = metadatas[i]
source = meta.get("source", "Unknown")
tags = ", ".join(meta.get("tags", [])) if isinstance(
meta.get("tags"), list) else str(meta.get("tags"))
tags = (
", ".join(meta.get("tags", []))
if isinstance(meta.get("tags"), list)
else str(meta.get("tags"))
)
st.markdown(f"**Source**: `{source}` | **Tags**: {tags}")
st.text_area(f"Snippet {i+1}", doc, height=120, disabled=True)
st.text_area(f"Snippet {i + 1}", doc, height=120, disabled=True)
+109
View File
@@ -0,0 +1,109 @@
import sys
import dspy
from langchain_community.vectorstores import FAISS
from embedding import LocalLMEmbeddings
from pathlib import Path
# --- DSPy Signature ---
class DnDContextQA(dspy.Signature):
"""Answer DnD campaign questions using provided snippets and full file context."""
context = dspy.InputField(desc="Relevant chunks and full file contents from the campaign notes.")
question = dspy.InputField()
answer = dspy.OutputField(desc="A detailed answer based on the notes, citing the source file.")
# --- DSPy Module ---
class DnDRAG(dspy.Module):
def __init__(self, db_path="./local_faiss_db", k=3):
super().__init__()
# 1. Setup Embeddings & Load FAISS
self.embeddings = LocalLMEmbeddings(
model="text-embedding-qwen3-embedding-8b",
base_url="http://192.168.0.49:1234"
)
self.vectorstore = FAISS.load_local(
db_path, self.embeddings, allow_dangerous_deserialization=True
)
self.k = k
# 2. Setup the Predictor (Chain of Thought for better reasoning)
self.generate_answer = dspy.ChainOfThought(DnDContextQA)
def get_full_file_content(self, file_path):
"""Helper to read the full source file if it exists."""
try:
return Path(file_path).read_text(encoding='utf-8')
except Exception:
return ""
def forward(self, question):
# 1. Search for top-k chunks
results = self.vectorstore.similarity_search(question, k=self.k)
# 2. Extract unique file paths to load "Full Context"
# This prevents the LLM from being 'blind' to the rest of a relevant session note
unique_paths = list(set([doc.metadata.get("full_path") for doc in results]))
context_parts = []
for i, doc in enumerate(results):
source = doc.metadata.get("source", "Unknown")
context_parts.append(f"--- Chunk {i+1} from {source} ---\n{doc.page_content}")
# 3. Add the Full Content of the top match (optional, but requested!)
# We'll just take the top 1 file to avoid context window explosion
if unique_paths:
top_file_content = self.get_full_file_content(unique_paths[0])
context_parts.append(f"\n=== FULL SOURCE FILE: {Path(unique_paths[0]).name} ===\n{top_file_content[:10000]}")
# 4. Join everything into one context string
context_str = "\n\n".join(context_parts)
# 5. Generate Response
prediction = self.generate_answer(context=context_str, question=question)
return dspy.Prediction(answer=prediction.answer, context=context_str)
def main():
# 1. Setup the LLM
print("🚀 Initializing Qwen-8B via LM Studio...")
lm = dspy.LM("lm_studio/qwen/qwen3-8b", api_base="http://192.168.0.49:1234/v1/")
dspy.configure(lm=lm)
# 2. Load the RAG System (only happens once!)
print("📚 Loading FAISS index and campaign notes...")
try:
rag_system = DnDRAG()
print("✅ Ready! Ask me anything about the campaign. (Type 'exit' or 'q' to quit)")
except Exception as e:
print(f"❌ Failed to initialize: {e}")
return
# 3. Interactive Loop
while True:
try:
print("\n" + "" * 30)
query = input("📝 Query: ").strip()
# Exit conditions
if query.lower() in ["exit", "quit", "q"]:
print("Farewell, traveler. Good luck on your quest!")
break
if not query:
continue
print("🔍 Searching and thinking...")
response = rag_system(question=query)
# Print Response
print("\n📜 AI RESPONSE:")
print(response.answer)
except KeyboardInterrupt:
print("\n\nExiting... See you next session!")
sys.exit(0)
except Exception as e:
print(f"\n⚠️ An error occurred: {e}")
if __name__ == "__main__":
main()
+31
View File
@@ -0,0 +1,31 @@
from langchain_community.vectorstores import FAISS
from embedding import LocalLMEmbeddings
def retrieve_enriched_context(query, db_path="./local_faiss_db"):
# 1. Re-initialize the same embedding model
embeddings_model = LocalLMEmbeddings(
model="text-embedding-qwen3-embedding-8b", base_url="http://192.168.0.49:1234"
)
# 2. Load the index from disk
# allow_dangerous_deserialization is required because FAISS uses pickle
vectorstore = FAISS.load_local(db_path, embeddings_model, allow_dangerous_deserialization=True)
# 3. Perform the search
# k=4 means "bring back the top 4 most relevant chunks"
results_with_scores = vectorstore.similarity_search_with_score(query, k=4)
return results_with_scores
# --- Example Usage ---
query = "the party get free bread but i cant remember why?"
hits = retrieve_enriched_context(query)
for doc, score in hits:
print(f"\n🎯 [Score: {score:.4f}]")
print(f"📄 Content: {doc.page_content[:200]}...")
print(f"🛠️ Metadata (Enrichment): {doc.metadata}")
# print(f"doc: {doc}")