Files
aaronAI/scripts/ingest.py
T
2026-04-25 02:05:42 +00:00

142 lines
4.0 KiB
Python

import os
import sys
import hashlib
from pathlib import Path
from dotenv import load_dotenv
import chromadb
from sentence_transformers import SentenceTransformer
from docx import Document
from pypdf import PdfReader
from pptx import Presentation
# Load environment variables from ~/aaronai/.env before anything else runs.
load_dotenv(Path.home().joinpath("aaronai", ".env"))

# The embedding model is loaded once at import time and shared by every
# ingest call below.
print("Loading embedding model...")
embedder = SentenceTransformer("all-MiniLM-L6-v2")

# On-disk Chroma database under ~/aaronai/db; the "aaronai" collection is
# created on first use with "hnsw:space" set to "cosine".
db_path = str(Path.home().joinpath("aaronai", "db"))
client = chromadb.PersistentClient(path=db_path)
collection = client.get_or_create_collection(
    name="aaronai",
    metadata={"hnsw:space": "cosine"},
)
def extract_text_from_docx(path):
    """Return the text of a .docx file, one non-blank paragraph per line.

    Only the document's ``paragraphs`` collection is read; paragraphs whose
    text is empty or whitespace-only are skipped.
    """
    kept = []
    for paragraph in Document(path).paragraphs:
        content = paragraph.text
        if content.strip():
            kept.append(content)
    return "\n".join(kept)
def extract_text_from_pdf(path):
    """Return the extracted text of every page of a PDF.

    Each page that yields any text contributes that text followed by a
    newline; pages for which ``extract_text()`` returns nothing are skipped.
    """
    pieces = []
    for page in PdfReader(path).pages:
        page_text = page.extract_text()
        if page_text:
            pieces.append(page_text + "\n")
    return "".join(pieces)
def extract_text_from_pptx(path):
    """Return the text of every text-bearing shape in a .pptx deck.

    Shapes are visited slide by slide; a shape contributes its text followed
    by a newline when it exposes a ``text`` attribute that is not blank.
    """
    lines = []
    for slide in Presentation(path).slides:
        for shape in slide.shapes:
            # Not every shape type carries text (e.g. pictures).
            if not hasattr(shape, "text"):
                continue
            shape_text = shape.text
            if shape_text.strip():
                lines.append(shape_text + "\n")
    return "".join(lines)
def extract_text_from_txt(path):
    """Return the full contents of a plain-text file decoded as UTF-8.

    Bytes that are not valid UTF-8 are silently dropped instead of raising.
    """
    return Path(path).read_text(encoding="utf-8", errors="ignore")
def chunk_text(text, chunk_size=500, overlap=50):
    """Split text into overlapping chunks of whitespace-separated words.

    Args:
        text: The text to split. Words are produced by ``str.split()``, so
            any run of whitespace is a separator and chunks are re-joined
            with single spaces.
        chunk_size: Maximum number of words per chunk; must be positive.
        overlap: Number of words shared between consecutive chunks; must
            satisfy ``0 <= overlap < chunk_size``.

    Returns:
        A list of chunk strings in document order. Empty or whitespace-only
        text yields an empty list.

    Raises:
        ValueError: If ``chunk_size`` or ``overlap`` is out of range. An
            overlap that is not smaller than the chunk size would keep the
            window from ever advancing.
    """
    if chunk_size <= 0:
        raise ValueError("chunk_size must be a positive integer")
    if not 0 <= overlap < chunk_size:
        raise ValueError("overlap must satisfy 0 <= overlap < chunk_size")

    words = text.split()
    step = chunk_size - overlap
    chunks = []
    for start in range(0, len(words), step):
        end = start + chunk_size
        chunks.append(" ".join(words[start:end]))
        # Once a window reaches the last word, any further window would hold
        # only words already present in this chunk's tail, so stop here.
        if end >= len(words):
            break
    return chunks
def make_id(filepath, chunk_index):
    """Build the collection ID for one chunk of a file.

    The ID is the first eight hex digits of the MD5 of the file path's string
    form, an underscore, and the chunk index. The hash is used only to derive
    a short, stable key, not for security.
    """
    encoded_path = str(filepath).encode()
    prefix = hashlib.md5(encoded_path).hexdigest()[:8]
    return prefix + "_" + str(chunk_index)
# File-name prefixes that are never ingested: "~$" and ".".
_SKIPPED_PREFIXES = ("~$", ".")


def _extractors():
    """Return the mapping of supported file suffix to its text extractor."""
    return {
        ".docx": extract_text_from_docx,
        ".pdf": extract_text_from_pdf,
        ".pptx": extract_text_from_pptx,
        ".txt": extract_text_from_txt,
        ".md": extract_text_from_txt,
    }


def _folder_label(path, root):
    """Return path's parent relative to root, or the full parent if outside root."""
    try:
        return str(path.parent.relative_to(root))
    except ValueError:
        # The file does not live under root; keep the whole parent path
        # rather than failing the ingest over a metadata field.
        return str(path.parent)


def ingest_file(filepath, root=None):
    """Extract, chunk, embed and store a single file in the collection.

    Args:
        filepath: Path of the file to ingest (``str`` or ``Path``).
        root: Directory that the stored ``folder`` metadata is made relative
            to. When omitted, the previous behaviour is kept: the first
            command-line argument is used if present, otherwise the file's
            own parent directory (which yields ``"."``).

    Returns:
        The number of chunks written, or 0 when the file is skipped (temp or
        hidden file, unsupported suffix, no extractable text) or when any
        step fails. Failures are printed, not raised, so one bad file does
        not abort a folder-wide run.
    """
    path = Path(filepath)

    # Skip temp files
    if path.name.startswith(_SKIPPED_PREFIXES):
        return 0

    extractor = _extractors().get(path.suffix.lower())
    if extractor is None:
        return 0

    if root is None:
        root = Path(sys.argv[1]) if len(sys.argv) > 1 else path.parent

    try:
        text = extractor(path)
        if not text.strip():
            return 0

        chunks = chunk_text(text)
        if not chunks:
            return 0

        embeddings = embedder.encode(chunks).tolist()
        ids = [make_id(path, index) for index in range(len(chunks))]
        metadata = {
            "source": path.name,
            "filepath": str(path),
            "folder": _folder_label(path, root),
        }
        # One independent dict per chunk, as the store receives them per item.
        metadatas = [dict(metadata) for _ in chunks]

        # Remove chunks left over from an earlier ingest of this file. IDs
        # are positional, so a file that now produces fewer chunks would
        # otherwise keep its old higher-index chunks in the collection.
        collection.delete(where={"filepath": str(path)})
        collection.upsert(
            documents=chunks,
            embeddings=embeddings,
            ids=ids,
            metadatas=metadatas,
        )
    except Exception as e:
        # Deliberate best-effort boundary: report the failure and move on.
        print(f" Error: {path.name}: {e}")
        return 0

    print(f" Indexed {len(chunks)} chunks: {path.name}")
    return len(chunks)


def ingest_folder(folder_path):
    """Ingest every supported file found recursively under a folder.

    Prints a summary when finished. Exits the process with status 1 when the
    folder does not exist or contains no supported files.

    Args:
        folder_path: Directory to scan (``str`` or ``Path``).
    """
    folder = Path(folder_path)
    if not folder.exists():
        print(f"Folder not found: {folder_path}")
        sys.exit(1)

    supported = _extractors()
    files = [
        candidate
        for candidate in folder.rglob("*")
        # rglob also yields directories; one named like "notes.md" is not a
        # document.
        if candidate.is_file()
        and candidate.suffix.lower() in supported
        and not candidate.name.startswith(_SKIPPED_PREFIXES)
    ]
    if not files:
        print("No supported files found.")
        sys.exit(1)

    print(f"Found {len(files)} files to process\n")
    # Pass the scanned folder as root so "folder" metadata reflects each
    # file's subdirectory whichever way the target was chosen.
    total_chunks = sum(ingest_file(found, root=folder) for found in files)
    print(f"\nDone. Total chunks indexed: {total_chunks}")
    print(f"Database stored at: {db_path}")
if __name__ == "__main__":
    # Ingest the folder named by the first CLI argument, or ~/aaronai/docs
    # when no argument is given.
    if len(sys.argv) > 1:
        target = sys.argv[1]
    else:
        target = str(Path.home() / "aaronai" / "docs")
    print(f"Ingesting from: {target}\n")
    ingest_folder(target)