feat: ✨ Refactor ingestion to Turso!
+2 -1
@@ -12,11 +12,12 @@ models:
 # --- Ingestion Settings ---
 ingestion:
   data_dir: "/home/cosmic/DnD"
-  db_path: "./local_faiss_db"
+  db_path: "./data/dmv.db"
   max_workers: 8
   chunk_size: 800
   chunk_overlap: 100
   embedding_batch_size: 32
+  time_file_location: "./data/time_file.txt"

 # --- Retrieval Settings ---
 retrieval:

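For reference, a minimal sketch of the load_config() helper the new code leans on. The config_loader module exists in this repo, but this body and the "config.yaml" filename are assumptions, not the actual implementation:

# Hypothetical sketch of config_loader.load_config; the real module may differ.
import yaml

def load_config(path: str = "config.yaml") -> dict:
    # Parse the YAML settings into a nested dict,
    # e.g. cfg["ingestion"]["chunk_size"] -> 800
    with open(path, "r") as fh:
        return yaml.safe_load(fh)
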
src/ingest.py: +198 -11
@@ -1,6 +1,7 @@
+import turso
 from concurrent.futures import ThreadPoolExecutor, as_completed
 from pathlib import Path

+from datetime import datetime
 import dspy
 from langchain_community.document_loaders import TextLoader
-from langchain_community.vectorstores import FAISS
@@ -22,8 +23,9 @@ MAX_WORKERS=CFG["ingestion"]["max_workers"]
 CHUNK_SIZE=CFG["ingestion"]["chunk_size"]
 CHUNK_OVERLAP=CFG["ingestion"]["chunk_overlap"]
 EMBEDDING_BATCH_SIZE=CFG["ingestion"]["embedding_batch_size"]
+TIMEFILE = CFG["ingestion"]["time_file_location"]

-def load_documents():
+def load_documents(last_update_time):
     docs = []
     data_path = Path(DATA_DIR)

@@ -32,6 +34,10 @@ def load_documents():
         return docs

     for file_path in data_path.rglob("*.md"):
+        file_modified_date = datetime.fromtimestamp(file_path.stat().st_mtime)
+
+        if file_modified_date < last_update_time:
+            continue
         try:
             loader = TextLoader(str(file_path))
             loaded_docs = loader.load()
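The new last_update_time parameter drives incremental ingestion: any file whose mtime predates the watermark is skipped. The same check as a standalone sketch (path and watermark are illustrative):

# Standalone sketch of the mtime filter used above.
from datetime import datetime
from pathlib import Path

watermark = datetime(2000, 1, 1)  # illustrative; normally read from the time file
for md_file in Path("/home/cosmic/DnD").rglob("*.md"):
    modified = datetime.fromtimestamp(md_file.stat().st_mtime)
    if modified >= watermark:
        print(f"would ingest: {md_file}")
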
@@ -90,31 +96,212 @@ def enrich_chunks(chunks: list) -> list:
     return [item[1] for item in enriched_results]


-def store_chunks_locally(chunks, db_path=DATABASE_PATH):
+def embed_chunks(chunks):
+    """
+    Embed chunks and return a list of dictionaries with full metadata.
+    Each dict contains:
+      - file_path
+      - file_name
+      - chunk_data
+      - synopsis
+      - tags
+      - entities
+      - embedding
+    """
     embeddings_model = LocalLMEmbeddings(
         model=EMBEDDING_MODEL,
         base_url=API_BASE,
         batch_size=EMBEDDING_BATCH_SIZE,
     )

-    print(f"Index creation started for {len(chunks)} chunks...")
-    # FAISS.from_documents extracts metadata directly from the Document objects
-    vectorstore = FAISS.from_documents(documents=chunks, embedding=embeddings_model)
-    vectorstore.save_local(db_path)
-    print(f"✅ Successfully stored in FAISS at '{db_path}'")
-    return vectorstore
+    # Prepare list of dictionaries for output
+    embedded_chunks = []
+
+    for i, chunk in enumerate(tqdm(chunks, desc="Embedding chunks")):
+        try:
+            # Extract metadata
+            file_path = chunk.metadata.get("full_path", "unknown")
+            file_name = chunk.metadata.get("source", "unknown")
+            content = chunk.page_content
+
+            # Extract enriched metadata (from IngestionAgent)
+            synopsis = chunk.metadata.get("synopsis", "No summary")
+            tags = chunk.metadata.get("tags", [])
+            entities = chunk.metadata.get("entities", [])
+
+            # Generate embedding
+            embedding = embeddings_model.embed_query(content)
+
+            # Create structured dictionary
+            chunk_data = {
+                "file_path": file_path,
+                "file_name": file_name,
+                "chunk_data": content,
+                "synopsis": synopsis,
+                "tags": tags,
+                "entities": entities,
+                "embedding": embedding,
+                "timestamp": datetime.now().isoformat()
+            }
+
+            embedded_chunks.append(chunk_data)
+
+        except Exception as e:
+            print(f"⚠️ Failed to embed chunk {i}: {e}")
+            # Fallback entry (read page_content directly: the local
+            # `content` variable may be unbound if the failure happened
+            # before it was assigned)
+            embedded_chunks.append({
+                "file_path": chunk.metadata.get("full_path", "unknown"),
+                "file_name": chunk.metadata.get("source", "unknown"),
+                "chunk_data": chunk.page_content,
+                "synopsis": "Embedding failed",
+                "tags": ["error"],
+                "entities": [],
+                "embedding": [],
+                "timestamp": datetime.now().isoformat()
+            })
+
+    return embedded_chunks

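Note that embed_chunks() calls embed_query() once per chunk, so EMBEDDING_BATCH_SIZE never comes into play here. Judging by the scratch script at the bottom of this commit, LocalLMEmbeddings._post_request() accepts a list of texts; under that assumption, a batched variant could look like this sketch:

# Hedged sketch: push all chunk texts through one _post_request call,
# assuming it returns one embedding per input text (as the scratch
# script below suggests).
def embed_chunks_batched(chunks, embeddings_model):
    texts = [chunk.page_content for chunk in chunks]
    vectors = embeddings_model._post_request(texts)
    return [
        {
            "file_path": chunk.metadata.get("full_path", "unknown"),
            "file_name": chunk.metadata.get("source", "unknown"),
            "chunk_data": text,
            "embedding": vector,
        }
        for chunk, text, vector in zip(chunks, texts, vectors)
    ]
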
+def save_to_db(chunk_dicts):
+    """
+    Save a list of dictionaries to the Turso database.
+    Each dict maps to a row in the 'notes' table.
+    """
+    print('connecting to db')
+    con = turso.connect(DATABASE_PATH)
+    print('opening cursor')
+    cur = con.cursor()
+
+    # Parameterized SQL (positional placeholders) for clarity and safety
+    insert_sql = """
+        INSERT INTO notes (
+            file_path, file_name, chunk_data, synopsis, tags, entities, embedding, timestamp
+        ) VALUES (?, ?, ?, ?, ?, ?, vector32(?), ?)
+    """
+
+    # Prepare batch data: convert each dict to a tuple in correct order
+    batch_data = []
+    for entry in chunk_dicts:
+        # Serialize the float list as a bracketed text literal for vector32()
+        embedding_str = str(entry["embedding"])
+
+        batch_data.append((
+            entry["file_path"],
+            entry["file_name"],
+            entry["chunk_data"],
+            entry["synopsis"],
+            ",".join(entry["tags"]),      # Store as comma-separated string
+            ",".join(entry["entities"]),  # Store as comma-separated string
+            embedding_str,
+            entry["timestamp"]
+        ))
+    print('data to insert:', len(batch_data))
+    # Execute batch insert
+    cur.executemany(insert_sql, batch_data)
+    con.commit()
+    con.close()
+
+    print(f"✅ Saved {len(batch_data)} chunks to database.")
+
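A note on the vector32(?) binding above: str() on a Python list of floats yields a bracketed literal like "[0.1, 0.2]", which is the text form libSQL's vector32() parses (assuming the turso package follows libSQL here). A quick sanity check, values illustrative:

# Illustrative only: confirm the serialized form fed to vector32().
embedding = [0.12, -0.5, 0.33]
assert str(embedding) == "[0.12, -0.5, 0.33]"  # bracketed, comma-separated
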
+def create_db():
+    con = turso.connect(DATABASE_PATH)
+    cur = con.cursor()
+
+    cur.execute("""
+        CREATE TABLE IF NOT EXISTS notes (
+            id INTEGER PRIMARY KEY AUTOINCREMENT,
+            file_path TEXT NOT NULL,
+            file_name TEXT NOT NULL,
+            chunk_data TEXT NOT NULL,
+            synopsis TEXT,
+            tags TEXT,      -- comma-separated
+            entities TEXT,  -- comma-separated
+            embedding F32_BLOB(4096),
+            timestamp DATETIME DEFAULT CURRENT_TIMESTAMP,
+            UNIQUE(file_path, chunk_data)  -- avoid duplicates
+        )
+    """)
+
+    # Indexes for faster queries
+    cur.execute("CREATE INDEX IF NOT EXISTS idx_embedding ON notes(embedding);")
+    cur.execute("CREATE INDEX IF NOT EXISTS idx_file_path ON notes(file_path);")
+    cur.execute("CREATE INDEX IF NOT EXISTS idx_tags ON notes(tags);")
+    cur.execute("CREATE INDEX IF NOT EXISTS idx_synopsis ON notes(synopsis);")
+
+    con.commit()
+    con.close()
+    print("✅ Database and indexes created.")

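Worth noting: idx_embedding above is an ordinary B-tree over the blob column, which vector_distance_cos() cannot exploit, so similarity queries still scan every row. If this turso/libSQL build exposes native vector indexes (an assumption; libsql_vector_idx and vector_top_k are libSQL features), an indexed top-k lookup would look like this sketch:

# Hedged sketch: native vector index plus top-k query, assuming libSQL's
# vector index support is available through the turso package.
cur.execute(
    "CREATE INDEX IF NOT EXISTS idx_embedding_vector "
    "ON notes(libsql_vector_idx(embedding));"
)
cur.execute(
    """
    SELECT notes.id, notes.file_name, notes.synopsis
    FROM vector_top_k('idx_embedding_vector', vector32(?), 5) AS top
    JOIN notes ON notes.rowid = top.id;
    """,
    (str(query_embedding),),  # query_embedding: a hypothetical query vector
)
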
+def get_last_update_time():
+    try:
+        with open(TIMEFILE, "r") as file:
+            last_update_str = file.read()
+        last_update = datetime.strptime(last_update_str, "%Y/%m/%d - %H:%M:%S")
+    except FileNotFoundError:
+        print("Time file not found; setting time to ingest all files")
+        last_update = datetime(year=2000, month=1, day=1)
+    return last_update
+

+def update_timefile():
+    current_time = datetime.now()
+    current_time_str = current_time.strftime("%Y/%m/%d - %H:%M:%S")
+    with open(TIMEFILE, "w") as file:
+        file.write(current_time_str)
+    return current_time_str

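One timing subtlety: update_timefile() stamps datetime.now() after ingestion finishes, so a file edited while the run is in flight lands before the new watermark and is skipped on the next run. Capturing the timestamp at the start avoids that; a sketch of the alternative (not what this commit does):

# Sketch: take the watermark before loading, write it only after success,
# so files modified mid-run are re-ingested next time.
start_time = datetime.now()
docs = load_documents(get_last_update_time())
# ... chunk, enrich, embed, save ...
with open(TIMEFILE, "w") as file:
    file.write(start_time.strftime("%Y/%m/%d - %H:%M:%S"))
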
 def main():
-    docs = load_documents()
+    create_db()
+    last_update_time = get_last_update_time()
+    print(f"Last update time: {last_update_time}")
+
+    docs = load_documents(last_update_time)
+    if not docs:
+        print("No Recently Updated Files to Ingest")
+        return

     chunks = chunk_documents(docs)
     print(f"Split into {len(chunks)} chunks.")

     enriched_chunks = enrich_chunks(chunks)
-    store_chunks_locally(enriched_chunks)
     print(f"Enriched {len(enriched_chunks)} chunks.")

+    embedded_chunks = embed_chunks(enriched_chunks)
+    print(f"Embedded {len(embedded_chunks)} chunks.")
+
+    save_to_db(embedded_chunks)
+    print("🎉 Ingestion complete!")
+
+    updated = update_timefile()
+    print(f"Updated timefile to: {updated}")

 if __name__ == "__main__":
     main()

+# TODO: create function to delete rows that match new files coming in
+#       and handle database insert fails
+"""
+Traceback (most recent call last):
+  File "/home/cosmic/source/dungeon_masters_vault/.venv/lib/python3.13/site-packages/turso/lib.py", line 643, in executemany
+    result = _run_execute_with_io(stmt, self._connection.extra_io)
+  File "/home/cosmic/source/dungeon_masters_vault/.venv/lib/python3.13/site-packages/turso/lib.py", line 163, in _run_execute_with_io
+    result = stmt.execute()
+turso.Constraint: UNIQUE constraint failed: notes.(file_path, chunk_data) (19)
+
+During handling of the above exception, another exception occurred:
+
+Traceback (most recent call last):
+  File "/home/cosmic/source/dungeon_masters_vault/src/ingest.py", line 280, in <module>
+    main()
+    ~~~~^^
+  File "/home/cosmic/source/dungeon_masters_vault/src/ingest.py", line 273, in main
+    save_to_db(embedded_chunks)
+    ~~~~~~~~~~^^^^^^^^^^^^^^^^^
+  File "/home/cosmic/source/dungeon_masters_vault/src/ingest.py", line 201, in save_to_db
+    cur.executemany(insert_sql, batch_data)
+    ~~~~~~~~~~~~~~~^^^^^^^^^^^^^^^^^^^^^^^^
+  File "/home/cosmic/source/dungeon_masters_vault/.venv/lib/python3.13/site-packages/turso/lib.py", line 656, in executemany
+    raise _map_turso_exception(exc)
+turso.lib.IntegrityError: UNIQUE constraint failed: notes.(file_path, chunk_data) (19)
+"""
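The traceback above is the TODO in action: re-ingesting a file trips the UNIQUE(file_path, chunk_data) constraint because the old rows are still there. Assuming the turso package follows SQLite conflict-clause semantics, two sketches:

# Option 1: delete a file's stale rows before re-inserting its chunks.
file_paths = {entry["file_path"] for entry in chunk_dicts}
cur.executemany(
    "DELETE FROM notes WHERE file_path = ?;",
    [(p,) for p in file_paths],
)

# Option 2: let exact duplicates no-op instead of raising IntegrityError.
insert_sql = """
    INSERT OR IGNORE INTO notes (
        file_path, file_name, chunk_data, synopsis, tags, entities, embedding, timestamp
    ) VALUES (?, ?, ?, ?, ?, ?, vector32(?), ?)
"""

Option 1 also clears out chunks that no longer exist in the re-edited file, which INSERT OR IGNORE alone would leave behind.
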
@@ -0,0 +1,83 @@
+import turso
+
+from config_loader import load_config
+from embedding import LocalLMEmbeddings
+
+CFG = load_config()
+EMBEDDING_MODEL = CFG["models"]["embedding"]
+API_BASE = CFG["api"]["base_url"]
+EMBEDDING_BATCH_SIZE=CFG["ingestion"]["embedding_batch_size"]
+
+con = turso.connect("dmv.db")
+cur = con.cursor()
+cur.execute("""
+    CREATE TABLE IF NOT EXISTS notes (
+        id INTEGER PRIMARY KEY AUTOINCREMENT,
+        file_path TEXT NOT NULL,
+        file_name TEXT NOT NULL,
+        chunk_data TEXT,
+        embedding F32_BLOB(4096),
+        timestamp DATETIME DEFAULT CURRENT_TIMESTAMP
+    )""")
+
+cur.execute("CREATE INDEX IF NOT EXISTS idx_embedding ON notes(embedding);")
+# OR, if using the libsql vector extension:
+# cur.execute("CREATE INDEX IF NOT EXISTS idx_embedding_vector ON notes(libsql_vector_idx(embedding));")
+
+embeddings_model = LocalLMEmbeddings(
+    model=EMBEDDING_MODEL,
+    base_url=API_BASE,
+    batch_size=EMBEDDING_BATCH_SIZE,
+)
+
+texts_to_embed = [
+    "The quick brown fox jumped over the lazy dog",
+    "Tiffany is my wife, she writes books and watches films",
+    "Mazie and Bella are my labrador dogs that are two and three years old, they are white and have a pink nose",
+    "The movie Titanic is about a love story on a big boat. But the boat sinks in the end"
+]
+
+reply = embeddings_model._post_request(texts_to_embed)
+zipped = zip(texts_to_embed, reply)
+
+# Instead of looping and executing one INSERT at a time,
+# batch insert via executemany
+batch_insert_sql = """
+    INSERT INTO notes (file_path, file_name, chunk_data, embedding)
+    VALUES (?, ?, ?, vector32(?))
+"""
+
+# Prepare batch data
+batch_data = []
+for number, (text, embed) in enumerate(zipped):
+    batch_data.append((
+        f"path/to/file_{number}",
+        f"file_{number}",
+        text,
+        str(embed)  # bracketed text literal for vector32()
+    ))
+
+cur.executemany(batch_insert_sql, batch_data)
+con.commit()
+
+query_string = ["tell me about a film on a ship"]
+query_reply = embeddings_model._post_request(query_string)
+
+cur.execute(f"""
+    SELECT id,
+           file_path,
+           file_name,
+           chunk_data,
+           vector_distance_cos(embedding, vector32('{query_reply[0]}')) AS distance
+    FROM notes
+    ORDER BY distance ASC;
+""")
+# vector_extract(embedding)
+
+print(query_string[0])
+
+rows = cur.fetchall()
+for row in rows:
+    print(row)
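The SELECT above interpolates the query embedding into the SQL via an f-string. The inserts already bind with ? placeholders, and the same binding works inside vector32(); a LIMIT also bounds the full-table scan. A parameterized sketch:

# Parameter-bound version of the cosine-distance query, top 5 only.
cur.execute(
    """
    SELECT id, file_path, file_name, chunk_data,
           vector_distance_cos(embedding, vector32(?)) AS distance
    FROM notes
    ORDER BY distance ASC
    LIMIT 5;
    """,
    (str(query_reply[0]),),
)
for row in cur.fetchall():
    print(row)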