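"""Ingest local documents into a persistent ChromaDB vector store.

Walks a folder for .docx, .pdf, .pptx, .txt, and .md files, extracts their
text, splits it into overlapping word chunks, embeds each chunk with
sentence-transformers (all-MiniLM-L6-v2), and upserts the chunks into a
ChromaDB collection configured for cosine similarity.
"""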
import sys
import hashlib
from pathlib import Path

from dotenv import load_dotenv
import chromadb
from sentence_transformers import SentenceTransformer
from docx import Document
from pypdf import PdfReader
from pptx import Presentation

# Load environment variables from the project's .env file.
load_dotenv(Path.home() / "aaronai" / ".env")
print("Loading embedding model...")
|
|
embedder = SentenceTransformer("all-MiniLM-L6-v2")
|
|
|
|
db_path = str(Path.home() / "aaronai" / "db")
|
|
client = chromadb.PersistentClient(path=db_path)
|
|
collection = client.get_or_create_collection(
|
|
name="aaronai",
|
|
metadata={"hnsw:space": "cosine"}
|
|
)
|
|
|
|
def extract_text_from_docx(path):
    doc = Document(path)
    return "\n".join([para.text for para in doc.paragraphs if para.text.strip()])


def extract_text_from_pdf(path):
    reader = PdfReader(path)
    text = ""
    for page in reader.pages:
        extracted = page.extract_text()
        if extracted:
            text += extracted + "\n"
    return text


def extract_text_from_pptx(path):
    prs = Presentation(path)
    text = ""
    for slide in prs.slides:
        for shape in slide.shapes:
            if hasattr(shape, "text") and shape.text.strip():
                text += shape.text + "\n"
    return text


def extract_text_from_txt(path):
    with open(path, "r", encoding="utf-8", errors="ignore") as f:
        return f.read()
def chunk_text(text, chunk_size=500, overlap=50):
    # Split text into overlapping word-window chunks so context carries
    # across chunk boundaries.
    words = text.split()
    chunks = []
    start = 0
    while start < len(words):
        end = start + chunk_size
        chunk = " ".join(words[start:end])
        if chunk.strip():
            chunks.append(chunk)
        start += chunk_size - overlap
    return chunks


def make_id(filepath, chunk_index):
    # Deterministic chunk IDs (path hash + chunk index) so re-ingesting the
    # same file upserts existing entries instead of duplicating them.
    path_hash = hashlib.md5(str(filepath).encode()).hexdigest()[:8]
    return f"{path_hash}_{chunk_index}"
def ingest_file(filepath):
    path = Path(filepath)
    suffix = path.suffix.lower()

    # Skip temp and hidden files (e.g. Office lock files like "~$report.docx")
    if path.name.startswith("~$") or path.name.startswith("."):
        return 0

    try:
        if suffix == ".docx":
            text = extract_text_from_docx(path)
        elif suffix == ".pdf":
            text = extract_text_from_pdf(path)
        elif suffix == ".pptx":
            text = extract_text_from_pptx(path)
        elif suffix in [".txt", ".md"]:
            text = extract_text_from_txt(path)
        else:
            return 0

        if not text.strip():
            return 0

        chunks = chunk_text(text)
        if not chunks:
            return 0

        embeddings = embedder.encode(chunks).tolist()
        ids = [make_id(path, i) for i in range(len(chunks))]
        # Folder metadata is stored relative to the ingest root
        # (CLI argument if given, otherwise the default docs folder).
        root = Path(sys.argv[1]) if len(sys.argv) > 1 else Path.home() / "aaronai" / "docs"
        metadatas = [{
            "source": path.name,
            "filepath": str(path),
            "folder": str(path.parent.relative_to(root))
        } for _ in chunks]

        collection.upsert(
            documents=chunks,
            embeddings=embeddings,
            ids=ids,
            metadatas=metadatas
        )
        print(f" Indexed {len(chunks)} chunks: {path.name}")
        return len(chunks)

    except Exception as e:
        print(f" Error: {path.name}: {e}")
        return 0
def ingest_folder(folder_path):
    folder = Path(folder_path)
    if not folder.exists():
        print(f"Folder not found: {folder_path}")
        sys.exit(1)

    # Recursively collect supported documents, skipping temp and hidden files.
    supported = [".docx", ".pdf", ".pptx", ".txt", ".md"]
    files = [f for f in folder.rglob("*")
             if f.suffix.lower() in supported
             and not f.name.startswith("~$")
             and not f.name.startswith(".")]

    if not files:
        print("No supported files found.")
        sys.exit(1)

    print(f"Found {len(files)} files to process\n")
    total_chunks = 0
    for f in files:
        total_chunks += ingest_file(f)

    print(f"\nDone. Total chunks indexed: {total_chunks}")
    print(f"Database stored at: {db_path}")
if __name__ == "__main__":
|
|
target = sys.argv[1] if len(sys.argv) > 1 else str(Path.home() / "aaronai" / "docs")
|
|
print(f"Ingesting from: {target}\n")
|
|
ingest_folder(target)
|