1 Commits

Author SHA1 Message Date
Jake 1e20a5452f fix: 🐛 more stable ingestion 2026-03-08 17:28:29 +00:00
11 changed files with 124 additions and 393 deletions
-4
@@ -1,4 +0,0 @@
[submodule "toon-python"]
path = toon-python
url = https://github.com/toon-format/toon-python.git
branch = main
+7 -3
@@ -11,11 +11,15 @@
## Planned Next
* AI in the middle - make the llm generate multiple queries for a wider search
* database retrieve for tag or entity
## Planned Later
* entity chunking & re-ranking
* Logging in Ingestion
* database retrieve for tag or entity
* More robust ingestion - llm response is sometimes outside the expected format
## Done
* AI in the middle - make the llm generate multiple queries for a wider search
+30 -21
@@ -12,12 +12,11 @@ models:
# --- Ingestion Settings ---
ingestion:
data_dir: "/home/jake/dnd_test/"
data_dir: "/home/jake/DnD"
db_path: "./data/"
db_name: "dmv.db"
toon_dir: "./data/toon_files"
active_llms: 2
parallel_requests_per_llm: 2
parallel_requests_per_llm: 4
chunk_size: 800
chunk_overlap: 100
embedding_batch_size: 32
@@ -26,27 +25,37 @@ ingestion:
# ---- Agent Settings ----
ingestion_agent:
ingestion_signature: |
You are an expert Dungeon Master's assistant.
Analyze the provided notes and extract a concise synopsis and relevant metadata.
synopsis = A one-sentence summary of the document.
tags = Relevant tags (NPCs, Locations, Items, Plot Points).
entities = A list of Key names of people, places, or factions found in the document.
relationships = A list of object relationships between entities. For each pair of entities that appear together,
specify their relationship type (ally, enemy, mentor, servant, family, business_partner, etc.)
and connection strength (1-5 based on how often they appear together).
Format: [{"entity1": "Name", "entity2": "Name", "type": "relationship_type", "strength": int}, ...]
Output ONLY the metadata dictionary with these keys.
You are an expert Dungeon Master's assistant specialized in campaign note enrichment.
Your task is to analyze DnD session notes and extract structured metadata.
Follow these guidelines:
- SYNOPSIS: One concise sentence capturing the key event or development (use active voice)
- TAGS: Extract 3-7 relevant tags from: Campaign arcs, NPC names, Locations, Items, Spells, Factions, Plot hooks, Themes
- ENTITIES: List all proper nouns (NPCs, locations, organizations) - be specific and consistent with naming
The TAGS and ENTITIES must be lists of strings, not JSON objects
Format output as JSON with keys: synopsis, tags, entities
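A minimal sketch of metadata conforming to the new prompt's contract above (all values invented for illustration):

```python
import json

# Invented example conforming to the prompt's contract:
# synopsis is a string, tags and entities are lists of strings.
metadata = {
    "synopsis": "The party ambushes the Goblin King at the northern caves.",
    "tags": ["Goblin King", "Northern Caves", "Ambush"],
    "entities": ["Goblin King", "Northern Caves"],
}
# Both list fields hold plain strings, as the prompt requires.
assert all(isinstance(x, str) for x in metadata["tags"] + metadata["entities"])
encoded = json.dumps(metadata)  # the serialized form the model would emit
```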
retrieval_agent:
retrieval_signature: |
You are an expert Dungeon Master's assistant.
Given the context and the question, answer the question.
Do not make things up, base all of your answers on the context.
Always cite the file location of your source of information.
You are an expert Dungeon Master's assistant helping to run a campaign.
When answering questions about your DnD world:
1. Strictly use ONLY the provided context from campaign notes
2. If information is incomplete, infer plausibly based on established lore (flag inferences)
3. Always cite sources: "Per [filename], [quote/summary]"
4. Maintain character voice and narrative style when appropriate
5. For rules questions, distinguish between rules-as-written and DM interpretation
Provide comprehensive answers that help you run the game, including relevant details about NPCs, locations, or plot points.
expansion_agent:
expansion_signature: |
You are a query expansion expert, specialised in Dungeons and Dragons.
Given a user's question, generate 3-5 similar but enhanced search queries that would help find more relevant information.
Each expanded query should be distinct and add a different perspective to the original question.
Return only the queries as a JSON list with key "queries".
You are a query expansion expert specialized in Dungeons & Dragons campaign management.
Given a user question about their DnD world, generate 3-5 enhanced search queries that:
- Cover different aspects (characters, locations, lore, rules)
- Include synonyms and related terms (e.g., "dragon" → "wyrm", "scales" → "armor")
- Address potential follow-up questions the DM might have
- Vary specificity (broad to narrow)
Return ONLY a JSON array with key "queries". Keep queries concise (5-10 words each).
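For reference, the array shape the expansion prompt above asks for (queries invented for illustration):

```python
import json

# Invented illustration of the expected expansion response shape.
raw = '{"queries": ["goblin king allies and rivals", "who leads the goblin tribes", "history of the goblin war"]}'
parsed = json.loads(raw)
queries = parsed["queries"]
# 3-5 concise queries, each a plain string
assert 3 <= len(queries) <= 5
assert all(isinstance(q, str) for q in queries)
```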
-13
@@ -1,13 +0,0 @@
the idea here is to drop the vectors and semantic search, in favour of an optimised knowledge base and llm tool calling.
the current implementation loads the closest chunks based on semantic similarity.
what if.
we ingest and enrich with a focus on tagging entities (knowing our qa will be around entities)
we transform, grouping all entity related information together
we load that grouped information out into toon files.
we give the agent a tool to load 1 or more toon files based on entities in the question.
the context window for a modern llm is big enough to fit the entire campaign notes, but we still risk poisoning or confusing the model if we fill the context window with irrelevant notes.
also wonder if we should give the full file at enrichment rather than chunks? worth experimenting...
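A minimal sketch of the tool-calling idea in the note above (`load_entity_files` and the flat `.toon` filename layout are assumptions for illustration, not the repo's actual API):

```python
from pathlib import Path

TOON_DIR = Path("./data/toon_files")  # assumed location, mirrors the config

def load_entity_files(entity_names: list[str], toon_dir: Path = TOON_DIR) -> str:
    """Tool sketch: load the grouped TOON file for each entity named
    in the question, skipping entities with no file on disk."""
    parts = []
    for name in entity_names:
        # same sanitization idea as the repo: lowercase, spaces to underscores
        fname = name.lower().strip().replace(" ", "_") + ".toon"
        path = toon_dir / fname
        if path.exists():
            parts.append(f"--- {name} ---\n{path.read_text(encoding='utf-8')}")
    return "\n\n".join(parts) if parts else "No entity files found."
```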
+3 -13
@@ -6,16 +6,6 @@ def load_config(config_path="config.yaml"):
return yaml.safe_load(f)
def update_ingestion_signature(new_signature: str):
"""Update the ingestion signature in config.yaml for relationship extraction."""
import yaml
with open("config.yaml") as f:
cfg = yaml.safe_load(f)
cfg["ingestion_agent"]["ingestion_signature"] = new_signature
with open("config.yaml", "w") as f:
yaml.dump(cfg, f, default_flow_style=False)
return cfg
# Usage example:
# CFG = load_config()
# print(CFG['api']['base_url'])
+1 -28
@@ -11,37 +11,10 @@ class IngestionSignature(dspy.Signature):
note: str = dspy.InputField(desc="The DM notes or session recap content.")
answer: dict[str, str | List] = dspy.OutputField(
desc="the metadata dictionary with the keys; synopsis, tags, entities, relationships"
desc="the metadata dictionary with the keys; synopsis, tags, entities"
)
class IngestionAgent(dspy.Module):
def __init__(self):
self.ingest = dspy.Predict(IngestionSignature)
def ingest_with_relationships(self, note: str) -> dict:
"""Ingest notes and return metadata including extracted relationships."""
response = self.ingest(note=note)
result = response.answer
if not isinstance(result, dict):
result = {
"synopsis": "Failed to parse",
"tags": [],
"entities": [],
"relationships": [],
}
if "relationships" not in result:
entities = result.get("entities", [])
relationships = []
for i, ent1 in enumerate(entities):
for ent2 in entities[i + 1 :]:
relationships.append(
{"entity1": ent1, "entity2": ent2, "type": "co-occurs_with", "strength": 1}
)
result["relationships"] = relationships
return result
+77 -72
@@ -1,31 +1,34 @@
import os
from pathlib import Path
import dspy
import turso
from config_loader import load_config
from toon_utils import decode_entity_toon, sanitize_entity_name
from embedding import LocalLMEmbeddings
CFG = load_config()
TOON_DIR = CFG["ingestion"]["toon_dir"]
DATABASE_PATH = CFG["ingestion"]["db_path"]
DATABASE_NAME = CFG["ingestion"]["db_name"]
EMBEDDING_MODEL = CFG["models"]["embedding"]
API_BASE = CFG["api"]["base_url"]
RETRIEVAL_CONFIG = CFG["retrieval_agent"]
EXPANSION_CONFIG = CFG["expansion_agent"]
class EntityLookupSignature(dspy.Signature):
"""Look up entity information from TOON files."""
question: str = dspy.InputField(desc="The user's question containing entity names.")
answer: str = dspy.OutputField(
desc="Comma-separated list of entity names found in the question."
)
class FileLookupSignature(dspy.Signature):
"""Extract file paths mentioned in questions."""
question: str = dspy.InputField()
answer: str = dspy.OutputField(desc="Comma-separated list of file paths.")
def retrieve_from_turso(embedded_question, k=5):
query = f"""
SELECT file_path, synopsis, tags, entities, chunk_data,
vector_distance_cos(embedding, vector32('{embedded_question}')) AS distance
FROM notes
ORDER BY distance ASC
LIMIT {k};
"""
con = turso.connect(DATABASE_PATH + DATABASE_NAME)
cur = con.cursor()
cur.execute(query)
rows = cur.fetchall()
return rows
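The query above interpolates the serialized embedding and `k` directly into the SQL string. If the turso client follows DB-API parameter binding (an assumption here, illustrated with the stdlib `sqlite3` driver and without the vector function), the limit — and ideally the vector literal — could be bound instead of formatted:

```python
import sqlite3

def retrieve_top_k(con: sqlite3.Connection, k: int = 5):
    # Bind k as a parameter instead of formatting it into the string;
    # the same pattern would apply to the embedding literal if the
    # driver supports binding it.
    cur = con.cursor()
    cur.execute(
        "SELECT file_path, chunk_data FROM notes ORDER BY file_path LIMIT ?;",
        (k,),
    )
    return cur.fetchall()
```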
class DnDContextQA(dspy.Signature):
@@ -36,81 +39,83 @@ class DnDContextQA(dspy.Signature):
answer = dspy.OutputField(desc="A detailed answer based on the notes, citing the source file.")
class ExpansionSignature(dspy.Signature):
f"{EXPANSION_CONFIG['expansion_signature']}"
question = dspy.InputField()
answer = dspy.OutputField(
desc="A list of questions that will be used to vector search the database."
)
class DnDRAG(dspy.Module):
def __init__(self):
super().__init__()
self.embeddings_model = LocalLMEmbeddings(
model=EMBEDDING_MODEL,
base_url=API_BASE,
# batch_size=1,
)
self.retrieval_lm = dspy.LM(
model=CFG["models"]["retrieval"],
api_base=CFG["api"]["base_url"] + CFG["api"]["api_version"],
model=CFG["models"]["retrieval"], api_base=API_BASE + CFG["api"]["api_version"]
)
with dspy.context(lm=self.retrieval_lm, signature=ExpansionSignature):
self.query_expander = dspy.Predict("question -> queries:list[str]")
self.entity_extractor = dspy.Predict(EntityLookupSignature)
self.file_extractor = dspy.Predict(FileLookupSignature)
self.generate_answer = dspy.ReAct(
signature=DnDContextQA, tools=[self.load_entity, self.load_file]
)
self.tools = [self.load_file]
self.generate_answer = dspy.ReAct(signature=DnDContextQA, tools=self.tools)
def forward(self, question):
print("Processing query with TOON-based retrieval...")
print("Enhancing Question")
with dspy.context(lm=self.retrieval_lm):
entities_resp = self.entity_extractor(question=question)
entity_list = [e.strip() for e in entities_resp.answer.split(",")]
expanded_queries = self.query_expander(question=question).queries
print("Enhanced Queries:")
for q in expanded_queries:
print(" ", q)
all_embeddings = self.embeddings_model.embed_documents([question] + expanded_queries)
# print(all_embeddings)
all_results = []
for embedded_question in all_embeddings:
results = retrieve_from_turso(embedded_question, k=5)
all_results.extend(results)
for entity_name in entity_list:
if not entity_name:
continue
entity_data = self.load_entity(entity_name)
if entity_data:
all_results.append(f"Entity: {entity_name}\n{entity_data}")
seen = set()
unique_results = []
for row in all_results:
key = (row[0], row[4])
if key not in seen:
seen.add(key)
unique_results.append(row)
with dspy.context(lm=self.retrieval_lm):
files_resp = self.file_extractor(question=question)
context_parts = []
for i, row in enumerate(unique_results):
source = row[0]
synopsis = row[1]
tags = row[2]
entities = row[3]
content = row[4]
closeness = row[5]
file_list = [f.strip() for f in files_resp.answer.split(",")]
context_parts.append(f"""
--- Chunk {i + 1} from {source} ---
synopsis: {synopsis},
tags: {tags},
entities: {entities},
closeness: {closeness},
{content}
""")
for file_path in file_list:
if not file_path:
continue
file_content = self.load_file(file_path)
if file_content:
all_results.append(f"File: {file_path}\n{file_content}")
context = "\n\n".join(all_results) if all_results else "No relevant information found."
context = "\n\n".join(context_parts)
prediction = self.generate_answer(context=context, question=question)
return dspy.Prediction(answer=prediction.answer, context=context)
def load_entity(self, entity_name: str) -> str | None:
"""Load and decode entity data from TOON file."""
sanitized = sanitize_entity_name(entity_name)
toon_path = Path(TOON_DIR) / f"{sanitized}.toon"
if not toon_path.exists():
return None
try:
with open(toon_path, "r", encoding="utf-8") as f:
content = f.read()
decoded = decode_entity_toon(content)
return str(decoded)
except Exception as e:
print(f"Error loading entity {entity_name}: {e}")
return None
def load_file(self, file_path: str) -> str | None:
"""Load and return specified file content."""
def load_file(self, file_path) -> str | None:
"""Load and return specified file."""
if os.path.exists(file_path):
try:
with open(file_path, encoding="utf-8") as f:
return f.read()
except Exception as e:
print(f"Error reading file {file_path}: {e}")
with open(file_path) as file:
return file.read()
except Exception:
return None
else:
return None
+6 -6
@@ -12,7 +12,6 @@ from tqdm import tqdm
from config_loader import load_config
from embedding import LocalLMEmbeddings
from experts.ingestion_agent import IngestionAgent
from toon_utils import save_entities_from_chunks
CFG = load_config()
DATA_DIR = CFG["ingestion"]["data_dir"]
@@ -177,8 +176,8 @@ def embed_chunks(chunks: List[Any], batch_size: int = EMBEDDING_BATCH_SIZE) -> L
print(f"⚠️ Batch processing failed at index {i}: {e}")
# Fallback: process individually (if needed)
for j, chunk in enumerate(batch):
content = chunk.page_content
try:
content = chunk.page_content
embedding = embeddings_model.embed_query(content)
file_path_orig = chunk.metadata.get("full_path", "unknown")
@@ -207,7 +206,7 @@ def embed_chunks(chunks: List[Any], batch_size: int = EMBEDDING_BATCH_SIZE) -> L
{
"file_path": normalize_path(chunk.metadata.get("full_path", "unknown")),
"file_name": chunk.metadata.get("source", "unknown"),
"chunk_data": chunk.page_content,
"chunk_data": content,
"synopsis": "Embedding failed",
"tags": ["error"],
"entities": [],
@@ -251,7 +250,10 @@ def save_to_db(chunk_dicts):
entry["chunk_data"],
entry["synopsis"],
",".join(entry["tags"]), # Store as comma-separated string
",".join(e.get("name", str(e)) if isinstance(e, dict) else str(e) for e in entry["entities"]), # Store as comma-separated string
",".join(
str(e) if isinstance(e, str) else e.get("name", str(e))
for e in entry["entities"]
), # Store as comma-separated string
embedding_str,
entry["timestamp"],
)
@@ -371,8 +373,6 @@ def main():
embedded_chunks = embed_chunks(enriched_chunks)
print(f"Embedded {len(embedded_chunks)} chunks.")
save_entities_from_chunks(embedded_chunks)
# remove existing rows from notes table that match file path
delete_from_db(embedded_chunks)
-11
@@ -1,11 +0,0 @@
from toon_utils import encode_entity_toon, sanitize_entity_name
test_name = "Goblin King"
sanitized = sanitize_entity_name(test_name)
print(f"Original: {test_name} -> Sanitized: {sanitized}")
relationships = [
{"entity1": "Goblin King", "entity2": "Orc Commander", "type": "enemy", "strength": 5}
]
content_refs = [{"file": "session_001.txt", "chunk_index": 0}]
toon_data = encode_entity_toon(test_name, "npc", relationships, content_refs)
print(f"TOON encoded (first 200 chars): {toon_data[:200]}")
-221
@@ -1,221 +0,0 @@
import sys
from pathlib import Path
from typing import Any
sys.path.insert(0, "/home/jake/source/dungeon_masters_vault/toon-python/src")
try:
from toon_format import decode as toon_decode
from toon_format import encode as toon_encode
except ImportError:
raise ImportError(
"toon_format not found. Ensure the toon-python library is installed and available.\n"
"Install with: pip install -e /path/to/toon-python"
)
from config_loader import load_config
CFG = load_config()
TOON_DIR = Path(CFG["ingestion"]["toon_dir"])
def sanitize_entity_name(name: str) -> str:
"""Convert entity name to valid filename: lowercase, underscores for spaces, remove special chars."""
import re
name = name.lower().strip()
name = name.replace(" ", "_")
name = re.sub(r"[^a-z0-9_]", "", name)
return name
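A quick check of the sanitization rules above (the second input is an invented name, just for illustration):

```python
import re

def sanitize_entity_name(name: str) -> str:
    # Same three steps as above: lowercase/trim, spaces to
    # underscores, then drop anything outside [a-z0-9_].
    name = name.lower().strip()
    name = name.replace(" ", "_")
    return re.sub(r"[^a-z0-9_]", "", name)

print(sanitize_entity_name("Goblin King"))  # goblin_king
print(sanitize_entity_name("D'jinn #3"))    # djinn_3
```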
def encode_entity_toon(
entity_name: str, entity_type: str, relationships: list[dict], content_references: list[dict]
) -> str:
"""Encode entity data to TOON format."""
data = {
"entity": [{"name": entity_name, "type": entity_type}],
"relationships": relationships,
"content_references": content_references,
}
return toon_encode(data)
def decode_entity_toon(toon_content: str) -> dict[str, Any]:
"""Decode TOON content back to Python dictionary."""
return toon_decode(toon_content)
def save_entity_toon(
entity_name: str,
entity_type: str,
relationships: list[dict],
content_references: list[dict],
output_dir: Path | None = None,
) -> Path:
"""Save entity data as a TOON file and return the path."""
if output_dir is None:
output_dir = Path(TOON_DIR)
output_dir.mkdir(parents=True, exist_ok=True)
sanitized_name = sanitize_entity_name(entity_name)
toon_path = output_dir / f"{sanitized_name}.toon"
toon_content = encode_entity_toon(entity_name, entity_type, relationships, content_references)
with open(toon_path, "w", encoding="utf-8") as f:
f.write(toon_content)
return toon_path
def load_entity_toon(entity_name: str, input_dir: Path | None = None) -> dict[str, Any] | None:
"""Load and decode a TOON file for an entity."""
if input_dir is None:
input_dir = Path(TOON_DIR)
sanitized_name = sanitize_entity_name(entity_name)
toon_path = input_dir / f"{sanitized_name}.toon"
if not toon_path.exists():
return None
with open(toon_path, "r", encoding="utf-8") as f:
content = f.read()
return decode_entity_toon(content)
def build_co_occurrence_graph(chunks_with_entities: list[dict]) -> dict[str, dict]:
"""
Build a co-occurrence graph from enriched chunks.
Each chunk contains entities field with list of entity names found in that chunk.
Returns: dict mapping each entity to dict of related entities
"""
graph = {}
for chunk_data in chunks_with_entities:
entities_in_chunk = chunk_data.get("entities", [])
if not isinstance(entities_in_chunk, list) or len(entities_in_chunk) < 2:
continue
for i, entity1 in enumerate(entities_in_chunk):
if entity1 not in graph:
graph[entity1] = {}
for entity2 in entities_in_chunk[i + 1 :]:
if entity2 not in graph[entity1]:
graph[entity1][entity2] = {
"relationship_type": "co-occurs_with",
"count": 0,
"sources": [],
}
graph[entity1][entity2]["count"] += 1
source_info = {
"file": chunk_data.get("file_name", "unknown"),
"chunk_index": chunk_data.get("original_index", 0),
}
if source_info not in graph[entity1][entity2]["sources"]:
graph[entity1][entity2]["sources"].append(source_info)
return graph
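For illustration, the co-occurrence logic above can be exercised on two toy chunks (a trimmed re-statement of the function for self-containment, not the module itself):

```python
def build_co_occurrence_graph(chunks: list[dict]) -> dict:
    # For every unordered entity pair sharing a chunk, count the
    # co-occurrences and remember which chunks they came from.
    graph: dict = {}
    for chunk in chunks:
        entities = chunk.get("entities", [])
        if not isinstance(entities, list) or len(entities) < 2:
            continue
        for i, e1 in enumerate(entities):
            graph.setdefault(e1, {})
            for e2 in entities[i + 1:]:
                rel = graph[e1].setdefault(
                    e2,
                    {"relationship_type": "co-occurs_with", "count": 0, "sources": []},
                )
                rel["count"] += 1
                src = {
                    "file": chunk.get("file_name", "unknown"),
                    "chunk_index": chunk.get("original_index", 0),
                }
                if src not in rel["sources"]:
                    rel["sources"].append(src)
    return graph

g = build_co_occurrence_graph([
    {"entities": ["Goblin King", "Orc Commander"], "file_name": "s1.txt", "original_index": 0},
    {"entities": ["Goblin King", "Orc Commander"], "file_name": "s2.txt", "original_index": 1},
])
print(g["Goblin King"]["Orc Commander"]["count"])  # 2
```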
def format_relationships_for_toon(relationships: dict[str, dict]) -> list[dict]:
"""Convert relationship graph data to TOON-friendly format."""
result = []
for related_entity, info in relationships.items():
result.append(
{
"entity_name": related_entity,
"relationship_type": info.get("relationship_type", "co-occurs_with"),
"connection_strength": info.get("count", 1),
"source_count": len(info.get("sources", [])),
}
)
return result
def save_entities_from_chunks(
enriched_chunks: list[dict], output_dir: Path | None = None
) -> dict[str, str]:
"""
Extract unique entities from chunks and save as individual TOON files.
Args:
enriched_chunks: List of chunk dicts with 'entities' and 'relationships' fields
output_dir: Directory to save TOON files (defaults to config toon_dir)
Returns:
Dict mapping entity names to their TOON file paths
"""
if output_dir is None:
output_dir = TOON_DIR
output_dir.mkdir(parents=True, exist_ok=True)
entity_to_file_map = {}
for chunk_data in enriched_chunks:
entities = chunk_data.get("entities", [])
relationships = chunk_data.get("relationships", [])
if not isinstance(entities, list) or len(entities) == 0:
continue
source_info = {
"file": chunk_data.get("file_name", "unknown"),
"chunk_index": chunk_data.get("original_index", 0),
}
for entity_item in entities:
if isinstance(entity_item, dict):
entity_name = entity_item.get("name", entity_item.get("entity", ""))
else:
entity_name = str(entity_item)
if not entity_name:
continue
sanitized = sanitize_entity_name(entity_name)
if sanitized not in entity_to_file_map:
toon_path = output_dir / f"{sanitized}.toon"
entity_type = "npc"
content_refs = [source_info]
rels_for_entity = format_relationships_for_toon(
{
r.get("entity2", r.get("entity_name", "")): r
for r in relationships
if r.get("entity1") == entity_name or r.get("entity_name") == entity_name
}
)
toon_content = encode_entity_toon(
entity_name, entity_type, rels_for_entity, content_refs
)
with open(toon_path, "w", encoding="utf-8") as f:
f.write(toon_content)
entity_to_file_map[sanitized] = str(toon_path)
else:
toon_path = Path(entity_to_file_map[sanitized])
existing = load_entity_toon(entity_name, output_dir) or {}
if "content_references" not in existing:
existing["content_references"] = []
existing["content_references"].append(source_info)
with open(toon_path, "w", encoding="utf-8") as f:
f.write(toon_encode(existing))
return entity_to_file_map
Submodule toon-python deleted from 90861444e5