ruff check and format
This commit is contained in:
+22
-19
@@ -1,4 +1,3 @@
|
||||
import json
|
||||
from concurrent.futures import ThreadPoolExecutor, as_completed
|
||||
from pathlib import Path
|
||||
|
||||
@@ -8,10 +7,9 @@ from langchain_community.vectorstores import FAISS
|
||||
from langchain_text_splitters import RecursiveCharacterTextSplitter
|
||||
from tqdm import tqdm
|
||||
|
||||
from config_loader import load_config
|
||||
from embedding import LocalLMEmbeddings
|
||||
from experts.ingestion_agent import IngestionAgent
|
||||
from config_loader import load_config
|
||||
|
||||
|
||||
CFG = load_config()
|
||||
DATA_DIR = CFG["ingestion"]["data_dir"]
|
||||
@@ -20,10 +18,11 @@ MODEL_BASE = CFG["models"]["enrich"]
|
||||
EMBEDDING_MODEL = CFG["models"]["embedding"]
API_BASE = CFG["api"]["base_url"]
API_VERSION = CFG["api"]["api_version"]

# Ingestion tuning knobs, read once from config at import time.
MAX_WORKERS = CFG["ingestion"]["max_workers"]
# BUG FIX: a stray trailing comma previously made this a 1-tuple
# (`CHUNK_SIZE = (CFG[...]["chunk_size"],)` after ruff format), which would
# be passed where the text splitter expects a plain int. Bind the int itself.
CHUNK_SIZE = CFG["ingestion"]["chunk_size"]
CHUNK_OVERLAP = CFG["ingestion"]["chunk_overlap"]
EMBEDDING_BATCH_SIZE = CFG["ingestion"]["embedding_batch_size"]
|
||||
|
||||
|
||||
def load_documents():
|
||||
docs = []
|
||||
@@ -50,27 +49,28 @@ def load_documents():
|
||||
|
||||
return docs
|
||||
|
||||
|
||||
def chunk_documents(docs):
    """Split loaded documents into overlapping text chunks.

    LangChain preserves each document's metadata during splitting
    automatically, so nothing needs to be copied over by hand.

    Args:
        docs: List of LangChain ``Document`` objects from ``load_documents``.

    Returns:
        List of ``Document`` chunks of at most ``CHUNK_SIZE`` characters,
        adjacent chunks overlapping by ``CHUNK_OVERLAP``.
    """
    text_splitter = RecursiveCharacterTextSplitter(
        chunk_size=CHUNK_SIZE,
        chunk_overlap=CHUNK_OVERLAP,
        # Prefer paragraph, then line, then sentence boundaries before
        # falling back to word- and character-level splits.
        separators=["\n\n", "\n", ". ", " ", ""],
    )
    return text_splitter.split_documents(docs)
|
||||
|
||||
|
||||
def enrich_chunks(chunks: list) -> list:
|
||||
|
||||
def process_single_chunk(indexed_chunk):
|
||||
idx, chunk = indexed_chunk
|
||||
lm_index = idx % 8
|
||||
|
||||
|
||||
try:
|
||||
with dspy.context(lm=dspy.LM(f"{MODEL_BASE}:{lm_index}", api_base=API_BASE+API_VERSION)):
|
||||
with dspy.context(
|
||||
lm=dspy.LM(f"{MODEL_BASE}:{lm_index}", api_base=API_BASE + API_VERSION)
|
||||
):
|
||||
response = IngestionAgent().forward(note=chunk.page_content)
|
||||
|
||||
|
||||
# This is now an object, not a string!
|
||||
metadata = response.answer.dict()
|
||||
metadata = response.answer.dict()
|
||||
|
||||
except Exception as e:
|
||||
print(f"⚠️ Failed for chunk {idx}: {e}")
|
||||
@@ -78,7 +78,6 @@ def enrich_chunks(chunks: list) -> list:
|
||||
|
||||
chunk.metadata.update(metadata)
|
||||
return chunk
|
||||
|
||||
|
||||
enriched_results = []
|
||||
with ThreadPoolExecutor(max_workers=MAX_WORKERS) as executor:
|
||||
@@ -92,6 +91,7 @@ def enrich_chunks(chunks: list) -> list:
|
||||
enriched_results.sort(key=lambda x: x[0])
|
||||
return [item[1] for item in enriched_results]
|
||||
|
||||
|
||||
def store_chunks_locally(chunks, db_path=DATABASE_PATH):
|
||||
embeddings_model = LocalLMEmbeddings(
|
||||
model=EMBEDDING_MODEL,
|
||||
@@ -106,14 +106,17 @@ def store_chunks_locally(chunks, db_path=DATABASE_PATH):
|
||||
print(f"✅ Successfully stored in FAISS at '{db_path}'")
|
||||
return vectorstore
|
||||
|
||||
|
||||
def main():
    """Run the full ingestion pipeline: load -> chunk -> enrich -> store."""
    docs = load_documents()
    if not docs:
        # Nothing to ingest; bail out before building the splitter/embedder.
        return

    chunks = chunk_documents(docs)
    enriched_chunks = enrich_chunks(chunks)
    store_chunks_locally(enriched_chunks)
    print("🎉 Ingestion complete!")


if __name__ == "__main__":
    main()
|
||||
|
||||
Reference in New Issue
Block a user