Compare commits

..

6 Commits

6 changed files with 597 additions and 119 deletions
+90 -1
View File
@@ -47,6 +47,7 @@ MODE_RANGES = {
"late-rem": (0.22, 0.42), "late-rem": (0.22, 0.42),
"lucid": (0.32, 0.72), "lucid": (0.32, 0.72),
} }
DREAMER_VERSION = "1.1" # 1.0=original exclusion logic; 1.1=score-band exclusion
# ─── Prompt versioning ────────────────────────────────────────────────────── # ─── Prompt versioning ──────────────────────────────────────────────────────
# Bump the relevant constant manually when changing a prompt. # Bump the relevant constant manually when changing a prompt.
@@ -63,6 +64,11 @@ def prompt_hash(prompts: list[str]) -> str:
combined = "".join(prompts) combined = "".join(prompts)
return hashlib.md5(combined.encode()).hexdigest()[:8] return hashlib.md5(combined.encode()).hexdigest()[:8]
def extract_folder(source_path):
"""Extract top-level Nextcloud folder from source path."""
parts = source_path.replace("\\", "/").split("/")
return parts[0] if parts else "unknown"
# ─── Stage 1: Observe ─────────────────────────────────────────────────────── # ─── Stage 1: Observe ───────────────────────────────────────────────────────
def observe_corpus(): def observe_corpus():
@@ -104,7 +110,55 @@ def get_recent_conversation_topics(days=14):
# ─── Stage 2: Retrieve ────────────────────────────────────────────────────── # ─── Stage 2: Retrieve ──────────────────────────────────────────────────────
def retrieve_graphiti(mode, task=None, n_results=8):
"""E3 experiment — Graphiti substrate retrieval.
Queries Graphiti /search endpoint instead of pgvector.
Returns chunks in same format as retrieve() for pipeline compatibility.
Note: content is Graphiti facts (synthesized relationships), not raw chunks.
"""
import requests as req_lib
if task:
query = task
elif mode == "late-rem":
delta = observe_corpus()
topics = delta.get("recent_topics", [])
query = topics[0] if topics else "practice place memory making"
elif mode == "early-rem":
query = "career decision personal change what matters next"
else:
query = "research fabrication teaching practice recent work"
try:
resp = req_lib.get(
"http://localhost:8001/search",
params={"query": query, "limit": n_results, "group_id": "aaron"},
timeout=30,
)
resp.raise_for_status()
results = resp.json().get("results", [])
chunks = []
for r in results:
fact = r.get("fact", "")
if not fact.strip():
continue
chunks.append({
"source": r.get("source", "graphiti"),
"content": fact,
"relevance": r.get("score", 0.5),
"similarity": r.get("score", 0.5),
})
return chunks
except Exception as e:
print(f"[Graphiti retrieval error: {e}] — falling back to empty.")
return []
def retrieve(mode, task=None, n_results=8, excluded_sources=None): def retrieve(mode, task=None, n_results=8, excluded_sources=None):
# E3 experiment: DREAMER_SUBSTRATE=graphiti routes retrieval to Graphiti /search
# Default behavior: pgvector similarity search (unchanged)
substrate = os.getenv("DREAMER_SUBSTRATE", "pgvector")
if substrate == "graphiti":
return retrieve_graphiti(mode, task=task, n_results=n_results)
from sentence_transformers import SentenceTransformer from sentence_transformers import SentenceTransformer
embedder = SentenceTransformer("all-MiniLM-L6-v2") embedder = SentenceTransformer("all-MiniLM-L6-v2")
low, high = MODE_RANGES[mode] low, high = MODE_RANGES[mode]
@@ -153,6 +207,7 @@ def retrieve(mode, task=None, n_results=8, excluded_sources=None):
"source": source or "unknown", "source": source or "unknown",
"content": doc, "content": doc,
"relevance": similarity, "relevance": similarity,
"similarity": similarity,
}) })
seen_sources.add(source) seen_sources.add(source)
if len(chunks) >= n_results: if len(chunks) >= n_results:
@@ -361,6 +416,7 @@ def write_manifest(date_str, stage_data, corpus_data):
manifest = { manifest = {
"date": date_str, "date": date_str,
"prompt_sig": prompt_signature(), "prompt_sig": prompt_signature(),
"dreamer_version": DREAMER_VERSION,
"prompt_hash": prompt_hash([ "prompt_hash": prompt_hash([
synthesize_nrem.__doc__ or "", synthesize_nrem.__doc__ or "",
synthesize_early_rem.__doc__ or "", synthesize_early_rem.__doc__ or "",
@@ -401,6 +457,8 @@ def dream_pipeline():
print("\n[NREM] Retrieving...") print("\n[NREM] Retrieving...")
nrem_chunks = retrieve("nrem", excluded_sources=previously_retrieved | session_retrieved) nrem_chunks = retrieve("nrem", excluded_sources=previously_retrieved | session_retrieved)
session_retrieved.update(c["source"] for c in nrem_chunks) session_retrieved.update(c["source"] for c in nrem_chunks)
# Track sources that scored above Early REM ceiling — these are the only ones Early REM should exclude
nrem_high_sources = {c["source"] for c in nrem_chunks if c["similarity"] > 0.55}
if not nrem_chunks: if not nrem_chunks:
print("[NREM] No suitable chunks — aborting pipeline") print("[NREM] No suitable chunks — aborting pipeline")
return None return None
@@ -408,12 +466,17 @@ def dream_pipeline():
print(f"[NREM] Retrieved {len(nrem_chunks)} chunks. Synthesizing...") print(f"[NREM] Retrieved {len(nrem_chunks)} chunks. Synthesizing...")
nrem_output = synthesize_nrem(nrem_chunks) nrem_output = synthesize_nrem(nrem_chunks)
nrem_file = deliver(nrem_output, "nrem") nrem_file = deliver(nrem_output, "nrem")
nrem_sources = [c["source"] for c in nrem_chunks]
nrem_folders = list({extract_folder(s) for s in nrem_sources})
stage_data = { stage_data = {
"nrem": { "nrem": {
"chunks_retrieved": len(nrem_chunks), "chunks_retrieved": len(nrem_chunks),
"avg_similarity": round(sum(c["relevance"] for c in nrem_chunks) / len(nrem_chunks), 3), "avg_similarity": round(sum(c["relevance"] for c in nrem_chunks) / len(nrem_chunks), 3),
"query": "research fabrication teaching practice recent work", "query": "research fabrication teaching practice recent work",
"word_count": len(nrem_output.split()), "word_count": len(nrem_output.split()),
"sources": nrem_sources,
"distinct_folders": nrem_folders,
"folder_count": len(nrem_folders),
"status": "ok", "status": "ok",
} }
} }
@@ -421,7 +484,9 @@ def dream_pipeline():
# ── Stage 2: Early REM — informed by NREM ────────────────────────────── # ── Stage 2: Early REM — informed by NREM ──────────────────────────────
print("\n[Early REM] Retrieving...") print("\n[Early REM] Retrieving...")
early_chunks = retrieve("early-rem", excluded_sources=previously_retrieved | session_retrieved) # Early REM excludes previously retrieved + NREM high-scorers only (not full session_retrieved)
# Sources that scored in Early REM band during NREM remain available
early_chunks = retrieve("early-rem", excluded_sources=previously_retrieved | nrem_high_sources)
session_retrieved.update(c["source"] for c in early_chunks) session_retrieved.update(c["source"] for c in early_chunks)
if not early_chunks: if not early_chunks:
print("[Early REM] No suitable chunks — skipping") print("[Early REM] No suitable chunks — skipping")
@@ -430,11 +495,16 @@ def dream_pipeline():
print(f"[Early REM] Retrieved {len(early_chunks)} chunks. Synthesizing with NREM context...") print(f"[Early REM] Retrieved {len(early_chunks)} chunks. Synthesizing with NREM context...")
early_rem_output = synthesize_early_rem(early_chunks, nrem_output) early_rem_output = synthesize_early_rem(early_chunks, nrem_output)
deliver(early_rem_output, "early-rem") deliver(early_rem_output, "early-rem")
early_sources = [c["source"] for c in early_chunks]
early_folders = list({extract_folder(s) for s in early_sources})
stage_data["early_rem"] = { stage_data["early_rem"] = {
"chunks_retrieved": len(early_chunks), "chunks_retrieved": len(early_chunks),
"avg_similarity": round(sum(c["relevance"] for c in early_chunks) / len(early_chunks), 3), "avg_similarity": round(sum(c["relevance"] for c in early_chunks) / len(early_chunks), 3),
"query": "career decision personal change what matters next", "query": "career decision personal change what matters next",
"word_count": len(early_rem_output.split()), "word_count": len(early_rem_output.split()),
"sources": early_sources,
"distinct_folders": early_folders,
"folder_count": len(early_folders),
"status": "ok", "status": "ok",
} }
print(f"[Early REM] Done.\n{early_rem_output[:200]}...") print(f"[Early REM] Done.\n{early_rem_output[:200]}...")
@@ -450,11 +520,22 @@ def dream_pipeline():
print(f"[Late REM] Retrieved {len(late_chunks)} chunks. Synthesizing with full context...") print(f"[Late REM] Retrieved {len(late_chunks)} chunks. Synthesizing with full context...")
late_rem_output = synthesize_late_rem(late_chunks, nrem_output, early_rem_output) late_rem_output = synthesize_late_rem(late_chunks, nrem_output, early_rem_output)
deliver(late_rem_output, "late-rem") deliver(late_rem_output, "late-rem")
late_sources = [c["source"] for c in late_chunks]
late_folders = [extract_folder(s) for s in late_sources]
cross_domain_pairs = sum(
1 for i in range(len(late_folders))
for j in range(i+1, len(late_folders))
if late_folders[i] != late_folders[j]
)
stage_data["late_rem"] = { stage_data["late_rem"] = {
"chunks_retrieved": len(late_chunks), "chunks_retrieved": len(late_chunks),
"avg_similarity": round(sum(c["relevance"] for c in late_chunks) / len(late_chunks), 3), "avg_similarity": round(sum(c["relevance"] for c in late_chunks) / len(late_chunks), 3),
"query": "practice place memory making", "query": "practice place memory making",
"word_count": len(late_rem_output.split()), "word_count": len(late_rem_output.split()),
"sources": late_sources,
"distinct_folders": list(set(late_folders)),
"folder_count": len(set(late_folders)),
"cross_domain_pairs": cross_domain_pairs,
"status": "ok", "status": "ok",
} }
print(f"[Late REM] Done.\n{late_rem_output[:200]}...") print(f"[Late REM] Done.\n{late_rem_output[:200]}...")
@@ -474,10 +555,18 @@ def dream_pipeline():
print(f"{'='*60}") print(f"{'='*60}")
# Write manifest # Write manifest
all_session_sources = list(session_retrieved)
all_session_folders = list({extract_folder(s) for s in all_session_sources})
corpus_data = { corpus_data = {
"total_chunks": delta.get("new_chunks", 0), "total_chunks": delta.get("new_chunks", 0),
"new_chunks_since_last_dream": delta.get("new_chunks", 0), "new_chunks_since_last_dream": delta.get("new_chunks", 0),
"days_since_last_dream": round(delta.get("days_since_dream", 0), 2), "days_since_last_dream": round(delta.get("days_since_dream", 0), 2),
"substrate": "pgvector",
"aggregate": {
"total_distinct_sources": len(all_session_sources),
"total_distinct_folders": len(all_session_folders),
"folders_touched": all_session_folders,
}
} }
write_manifest(datetime.now().strftime("%Y-%m-%d"), stage_data, corpus_data) write_manifest(datetime.now().strftime("%Y-%m-%d"), stage_data, corpus_data)
+222
View File
@@ -0,0 +1,222 @@
#!/usr/bin/env python3
"""
E3 — Dreamer Substrate Comparison Experiment
Runs dream pipeline twice: once against pgvector (Condition A),
once against Graphiti (Condition B). Implements blinding per protocol.
Usage:
python3 scripts/experiments/e3_dreamer_substrate.py
Outputs:
Journal/Dreams/e3-dream-X-{mode}.md (one condition, identity concealed)
Journal/Dreams/e3-dream-Y-{mode}.md (other condition, identity concealed)
Journal/Dreams/e3-blind-mapping.json (kept unread until ratings complete)
Journal/Dreams/e3-manifest.json (quantitative metrics for both conditions)
"""
import os
import sys
import json
import random
import requests
from pathlib import Path
from datetime import datetime
from dotenv import load_dotenv
load_dotenv(Path.home() / "aaronai" / ".env", override=True)
NEXTCLOUD_URL = os.getenv("NEXTCLOUD_URL", "https://nextcloud.aaronnelson.studio")
NEXTCLOUD_USER = os.getenv("NEXTCLOUD_USER", "aaron")
NEXTCLOUD_PASSWORD = os.getenv("NEXTCLOUD_PASSWORD", "")
DREAMS_WEBDAV_BASE = f"{NEXTCLOUD_URL}/remote.php/dav/files/{NEXTCLOUD_USER}/Journal/Dreams"
auth = (NEXTCLOUD_USER, NEXTCLOUD_PASSWORD)
def webdav_put(url, content_str):
requests.put(url, data=content_str.encode("utf-8"), auth=auth, timeout=60)
def webdav_mkcol(url):
requests.request("MKCOL", url, auth=auth, timeout=10)
def run_condition(label, substrate, modes=("nrem", "early-rem", "late-rem", "synthesis")):
"""Run the dream pipeline for one substrate condition.
Returns dict of {mode: dream_text} and metrics.
"""
print(f"\n{'='*60}")
print(f"Running Condition {label} — substrate: {substrate}")
print(f"{'='*60}")
os.environ["DREAMER_SUBSTRATE"] = substrate
# Import dream module fresh for this run
if "dream" in sys.modules:
del sys.modules["dream"]
sys.path.insert(0, str(Path.home() / "aaronai" / "scripts"))
import dream as d
# Run pipeline stages manually to capture outputs
outputs = {}
metrics = {}
# Observe corpus delta
delta = d.observe_corpus()
print(f"Corpus delta: {delta.get('new_chunks', 0)} new chunks")
previously_retrieved = set()
session_retrieved = set()
nrem_high_sources = set()
# NREM
print("\n[NREM] Retrieving...")
nrem_chunks = d.retrieve("nrem", excluded_sources=previously_retrieved | session_retrieved)
session_retrieved.update(c["source"] for c in nrem_chunks)
nrem_high_sources = {c["source"] for c in nrem_chunks if c.get("similarity", 0) > 0.55}
if not nrem_chunks:
print("[NREM] No chunks — aborting.")
return None, None
nrem_output = d.synthesize_nrem(nrem_chunks)
outputs["nrem"] = nrem_output
metrics["nrem"] = {
"chunks_retrieved": len(nrem_chunks),
"sources": [c["source"] for c in nrem_chunks],
"word_count": len(nrem_output.split()),
}
print(f"[NREM] Done. {len(nrem_chunks)} chunks, {metrics['nrem']['word_count']} words.")
# Early REM
print("\n[Early REM] Retrieving...")
early_chunks = d.retrieve("early-rem", excluded_sources=previously_retrieved | nrem_high_sources)
session_retrieved.update(c["source"] for c in early_chunks)
if early_chunks:
early_output = d.synthesize_early_rem(early_chunks, nrem_output)
outputs["early-rem"] = early_output
metrics["early-rem"] = {
"chunks_retrieved": len(early_chunks),
"sources": [c["source"] for c in early_chunks],
"word_count": len(early_output.split()),
}
print(f"[Early REM] Done. {len(early_chunks)} chunks.")
else:
early_output = nrem_output
outputs["early-rem"] = None
metrics["early-rem"] = {"chunks_retrieved": 0, "sources": [], "word_count": 0}
print("[Early REM] No chunks — using NREM fallback.")
# Late REM
print("\n[Late REM] Retrieving...")
late_chunks = d.retrieve("late-rem", excluded_sources=previously_retrieved | session_retrieved)
session_retrieved.update(c["source"] for c in late_chunks)
if late_chunks:
late_output = d.synthesize_late_rem(late_chunks, nrem_output, early_output)
outputs["late-rem"] = late_output
late_sources = [c["source"] for c in late_chunks]
metrics["late-rem"] = {
"chunks_retrieved": len(late_chunks),
"sources": late_sources,
"word_count": len(late_output.split()),
}
print(f"[Late REM] Done. {len(late_chunks)} chunks.")
else:
late_output = early_output
outputs["late-rem"] = None
metrics["late-rem"] = {"chunks_retrieved": 0, "sources": [], "word_count": 0}
print("[Late REM] No chunks — using fallback.")
# Synthesis
print("\n[Synthesis] Integrating...")
synthesis_output = d.synthesize_final(nrem_output, early_output, late_output)
outputs["synthesis"] = synthesis_output
metrics["synthesis"] = {"word_count": len(synthesis_output.split())}
print(f"[Synthesis] Done. {metrics['synthesis']['word_count']} words.")
return outputs, metrics
def main():
date_str = datetime.now().strftime("%Y-%m-%d")
# Randomize which condition gets label X vs Y for blinding
conditions = [("pgvector", "A"), ("graphiti", "B")]
labels = ["X", "Y"]
random.shuffle(labels)
assignment = {conditions[0][0]: labels[0], conditions[1][0]: labels[1]}
mapping = {
"date": date_str,
"X": assignment["pgvector"] == "X" and "pgvector" or "graphiti",
"Y": assignment["pgvector"] == "Y" and "pgvector" or "graphiti",
"reveal_after_rating": True,
}
# Simpler mapping
mapping = {
"date": date_str,
"X_is": "pgvector" if assignment["pgvector"] == "X" else "graphiti",
"Y_is": "pgvector" if assignment["pgvector"] == "Y" else "graphiti",
"reveal_after_rating": True,
}
print(f"E3 Dreamer Substrate Comparison — {date_str}")
print(f"Blind mapping written (do not read until ratings complete)")
# Write blind mapping to Nextcloud immediately
webdav_mkcol(DREAMS_WEBDAV_BASE)
mapping_url = f"{DREAMS_WEBDAV_BASE}/e3-blind-mapping.json"
webdav_put(mapping_url, json.dumps(mapping, indent=2))
print(f"Mapping written to Journal/Dreams/e3-blind-mapping.json")
all_metrics = {}
# Run both conditions
for substrate, condition_label in conditions:
blind_label = assignment[substrate]
outputs, metrics = run_condition(condition_label, substrate)
if outputs is None:
print(f"Condition {condition_label} failed — aborting E3.")
return
all_metrics[blind_label] = {
"substrate": "CONCEALED", # concealed until reveal
"stages": metrics,
}
# Write dream files with blind labels
modes_map = {
"nrem": "nrem",
"early-rem": "early-rem",
"late-rem": "late-rem",
"synthesis": "synthesis",
}
for mode, text in outputs.items():
if text is None:
continue
filename = f"e3-dream-{blind_label}-{mode}.md"
header = f"# E3 Dream — {blind_label}{mode.upper()}{date_str}\n"
header += f"*Substrate concealed until rating complete*\n\n---\n\n"
file_content = header + text
url = f"{DREAMS_WEBDAV_BASE}/{filename}"
webdav_put(url, file_content)
print(f"Written: Journal/Dreams/{filename}")
# Write quantitative metrics (substrate concealed)
e3_manifest = {
"date": date_str,
"experiment": "E3",
"protocol": "E3-dreamer-substrate-comparison-protocol.md",
"blind_labels": ["X", "Y"],
"substrate_concealed": True,
"metrics": all_metrics,
"rating_dimensions": ["Texture", "Reach", "Specificity", "Forward-facing"],
"rating_scale": "1-5 per dimension per dream",
}
manifest_url = f"{DREAMS_WEBDAV_BASE}/e3-manifest.json"
webdav_put(manifest_url, json.dumps(e3_manifest, indent=2))
print(f"\nE3 manifest written to Journal/Dreams/e3-manifest.json")
print(f"\n{'='*60}")
print("E3 runs complete. Eight dream files written.")
print("Rate all 8 dreams before reading e3-blind-mapping.json")
print(f"{'='*60}")
if __name__ == "__main__":
main()
+2
View File
@@ -143,6 +143,8 @@ def ingest_file(filepath):
print(f" Indexed {len(chunks)} chunks: {path.name}") print(f" Indexed {len(chunks)} chunks: {path.name}")
# Enqueue for Stage 2 → Stage 3 (Graphiti pipeline) # Enqueue for Stage 2 → Stage 3 (Graphiti pipeline)
# SKIP_STAGE2_ENQUEUE env var set by migration scripts to prevent bulk enqueue
if not os.getenv("SKIP_STAGE2_ENQUEUE"):
enqueue_stage2(path.name, text) enqueue_stage2(path.name, text)
return len(chunks) return len(chunks)
-1
View File
@@ -21,7 +21,6 @@ logging.basicConfig(
level=logging.INFO, level=logging.INFO,
format="%(asctime)s [stage2] %(levelname)s %(message)s", format="%(asctime)s [stage2] %(levelname)s %(message)s",
handlers=[ handlers=[
logging.StreamHandler(),
logging.FileHandler("/var/log/aaronai/stage2.log", mode="a"), logging.FileHandler("/var/log/aaronai/stage2.log", mode="a"),
] ]
) )
-1
View File
@@ -24,7 +24,6 @@ logging.basicConfig(
level=logging.INFO, level=logging.INFO,
format="%(asctime)s [stage3] %(levelname)s %(message)s", format="%(asctime)s [stage3] %(levelname)s %(message)s",
handlers=[ handlers=[
logging.StreamHandler(),
logging.FileHandler("/var/log/aaronai/stage3.log", mode="a"), logging.FileHandler("/var/log/aaronai/stage3.log", mode="a"),
] ]
) )
+270 -103
View File
@@ -1,62 +1,214 @@
"""
Aaron AI Watcher — Stage 1 of the encoding pipeline.
Watches the Nextcloud directory for new or changed files.
On detection, chunks + embeds documents in-process (no subprocess),
then enqueues to stage_2_queue for async cascade processing.
Design principles:
- Embedding model loaded ONCE at startup, reused across all ingest runs
- In-process ingest (no subprocess) — eliminates per-run model reload memory spike
- Missed-file recovery on startup — ingests anything new since last state
- Heartbeat file updated every loop tick — enables external health monitoring
- Parity principle: no filtering, no decisions, faithful capture
- Does NOT enqueue to stage_2_queue during bulk migration (SKIP_STAGE2_ENQUEUE env var)
Architecture: Stage 1 (watcher) -> stage_2_queue -> Stage 2 (Mistral) -> stage_3_queue -> Stage 3 (Graphiti)
"""
import os
import time import time
import subprocess
import logging
import json import json
import hashlib
import logging
import threading import threading
from pathlib import Path from pathlib import Path
import psycopg2
from dotenv import load_dotenv
from sentence_transformers import SentenceTransformer
from watchdog.observers import Observer from watchdog.observers import Observer
from watchdog.events import FileSystemEventHandler from watchdog.events import FileSystemEventHandler
from docx import Document as DocxDocument
from pypdf import PdfReader
from pptx import Presentation
load_dotenv(Path.home() / "aaronai" / ".env", override=True)
NEXTCLOUD_PATH = "/home/aaron/nextcloud/data/data/aaron/files" NEXTCLOUD_PATH = "/home/aaron/nextcloud/data/data/aaron/files"
INGEST_SCRIPT = "/home/aaron/aaronai/scripts/ingest.py"
PYTHON = "/home/aaron/aaronai/venv/bin/python3"
LOG_FILE = "/home/aaron/aaronai/watcher.log" LOG_FILE = "/home/aaron/aaronai/watcher.log"
STATE_FILE = "/home/aaron/aaronai/watcher_state.json" STATE_FILE = "/home/aaron/aaronai/watcher_state.json"
SUPPORTED = {'.pdf', '.docx', '.pptx', '.txt', '.md'}
DEBOUNCE_SECONDS = 120
STATUS_FILE = "/home/aaron/aaronai/watcher_status.json" STATUS_FILE = "/home/aaron/aaronai/watcher_status.json"
HEARTBEAT_FILE = "/home/aaron/aaronai/watcher_heartbeat"
SUPPORTED = {".pdf", ".docx", ".pptx", ".txt", ".md"}
DEBOUNCE_SECONDS = 120
CHUNK_SIZE = 500
CHUNK_OVERLAP = 50
EMBED_MODEL = "all-MiniLM-L6-v2"
PG_DSN = os.getenv("PG_DSN")
logging.basicConfig( logging.basicConfig(
level=logging.INFO, level=logging.INFO,
format='%(asctime)s - %(message)s', format="%(asctime)s [watcher] %(levelname)s %(message)s",
handlers=[ handlers=[logging.FileHandler(LOG_FILE)],
logging.FileHandler(LOG_FILE),
logging.StreamHandler()
]
) )
log = logging.getLogger("watcher")
ingestion_state = {
"status": "idle",
"message": "",
"file_count": 0,
"started_at": None,
"finished_at": None,
"last_error": "",
}
ingestion_lock = threading.Lock() ingestion_lock = threading.Lock()
ingestion_state = {
"status": "idle", "message": "", "file_count": 0,
"started_at": None, "finished_at": None, "last_error": "",
}
ingestion_thread = None ingestion_thread = None
def set_ingestion_state(**kwargs): def load_embedder():
with ingestion_lock: log.info(f"Loading embedding model: {EMBED_MODEL}")
ingestion_state.update(kwargs) model = SentenceTransformer(EMBED_MODEL)
log.info("Embedding model ready.")
return model
def load_state(): def get_pg():
return psycopg2.connect(PG_DSN)
def extract_text(path: Path) -> str:
suffix = path.suffix.lower()
try:
if suffix == ".docx":
doc = DocxDocument(path)
return "\n".join(p.text for p in doc.paragraphs if p.text.strip())
elif suffix == ".pdf":
reader = PdfReader(path)
return "".join(
page.extract_text() + "\n"
for page in reader.pages if page.extract_text()
)
elif suffix == ".pptx":
prs = Presentation(path)
return "\n".join(
shape.text for slide in prs.slides
for shape in slide.shapes
if hasattr(shape, "text") and shape.text.strip()
)
elif suffix in {".txt", ".md"}:
return path.read_text(encoding="utf-8", errors="ignore")
except Exception as e:
log.warning(f"Text extraction failed for {path.name}: {e}")
return ""
def chunk_text(text: str) -> list:
words = text.split()
chunks = []
start = 0
while start < len(words):
chunk = " ".join(words[start:start + CHUNK_SIZE])
if chunk.strip():
chunks.append(chunk)
start += CHUNK_SIZE - CHUNK_OVERLAP
return chunks
def make_chunk_id(filepath: Path, chunk_index: int) -> str:
return hashlib.md5(str(filepath).encode()).hexdigest()[:8] + f"_{chunk_index}"
def enqueue_stage2(source: str, full_text: str):
if os.getenv("SKIP_STAGE2_ENQUEUE"):
return
try:
pg = get_pg()
cur = pg.cursor()
cur.execute("""
INSERT INTO stage_2_queue (source, full_text, char_length)
VALUES (%s, %s, %s)
ON CONFLICT (source) DO UPDATE SET
full_text = EXCLUDED.full_text,
char_length = EXCLUDED.char_length,
enqueued_at = NOW(),
completed_at = NULL,
failed_at = NULL,
attempts = 0
""", (source, full_text[:50000], len(full_text)))
pg.commit()
pg.close()
except Exception as e:
log.warning(f"Stage 2 enqueue failed (non-fatal): {e}")
def ingest_file(filepath: Path, embedder) -> int:
if filepath.name.startswith(("~$", ".")):
return 0
if filepath.suffix.lower() not in SUPPORTED:
return 0
text = extract_text(filepath)
if not text.strip():
return 0
chunks = chunk_text(text)
if not chunks:
return 0
try:
embeddings = embedder.encode(chunks).tolist()
except Exception as e:
log.error(f"Embedding failed for {filepath.name}: {e}")
return 0
source = filepath.name
try:
pg = get_pg()
cur = pg.cursor()
for i, (chunk, embedding) in enumerate(zip(chunks, embeddings)):
chunk_id = make_chunk_id(filepath, i)
cur.execute("""
INSERT INTO embeddings (id, document, embedding, source, type, created_at, metadata)
VALUES (%s, %s, %s::vector, %s, %s, NOW(), %s)
ON CONFLICT (id) DO UPDATE SET
document = EXCLUDED.document,
embedding = EXCLUDED.embedding,
source = EXCLUDED.source,
metadata = EXCLUDED.metadata
""", (chunk_id, chunk, embedding, source, "document",
json.dumps({"source": source, "filepath": str(filepath)})))
pg.commit()
pg.close()
except Exception as e:
log.error(f"pgvector write failed for {filepath.name}: {e}")
return 0
log.info(f"Indexed {len(chunks)} chunks: {filepath.name}")
enqueue_stage2(source, text)
return len(chunks)
def ingest_files(paths: list, embedder, state: dict) -> dict:
total = 0
for path in paths:
count = ingest_file(path, embedder)
total += count
state[str(path)] = str(path.stat().st_mtime)
log.info(f"Ingestion complete. {total} chunks across {len(paths)} files.")
return state
def load_state() -> dict:
if Path(STATE_FILE).exists(): if Path(STATE_FILE).exists():
try:
with open(STATE_FILE) as f: with open(STATE_FILE) as f:
return json.load(f) return json.load(f)
except Exception:
pass
return {} return {}
def save_state(state): def save_state(state: dict):
with open(STATE_FILE, 'w') as f: with open(STATE_FILE, "w") as f:
json.dump(state, f) json.dump(state, f)
def get_changed_files(): def get_changed_files(state: dict) -> list:
state = load_state()
changed = [] changed = []
root = Path(NEXTCLOUD_PATH) root = Path(NEXTCLOUD_PATH)
for path in root.rglob("*"): for path in root.rglob("*"):
@@ -64,84 +216,81 @@ def get_changed_files():
continue continue
if path.suffix.lower() not in SUPPORTED: if path.suffix.lower() not in SUPPORTED:
continue continue
if path.name.startswith('.') or path.name.startswith('~$'): if path.name.startswith((".", "~$")):
continue continue
mtime = str(path.stat().st_mtime) if "Admin/Backups" in str(path) or "Backups" in path.parts:
key = str(path) continue
if state.get(key) != mtime: if "Journal/Media" in str(path):
continue
if state.get(str(path)) != str(path.stat().st_mtime):
changed.append(path) changed.append(path)
return changed, state return changed
def run_ingestion(): def set_ingestion_state(**kwargs):
changed, state = get_changed_files() with ingestion_lock:
ingestion_state.update(kwargs)
def write_status(handler):
with ingestion_lock:
status = {
"running": True, "timestamp": time.time(),
"pending": handler.pending, "last_event": handler.last_event,
"ingestion": dict(ingestion_state),
}
try:
with open(STATUS_FILE, "w") as f:
json.dump(status, f)
except Exception:
pass
def write_heartbeat():
try:
Path(HEARTBEAT_FILE).write_text(str(time.time()))
except Exception:
pass
def run_ingestion(embedder):
state = load_state()
changed = get_changed_files(state)
if not changed: if not changed:
logging.info("No new or changed files detected — skipping ingestion.") log.info("No new or changed files — skipping ingestion.")
set_ingestion_state(status="idle", message="No changes detected", file_count=0) set_ingestion_state(status="idle", message="No changes detected", file_count=0)
return return
count = len(changed) count = len(changed)
logging.info(f"Found {count} new or changed files — starting ingestion...") log.info(f"Found {count} new or changed files — starting ingestion...")
set_ingestion_state( set_ingestion_state(
status="ingesting", status="ingesting", message=f"Ingesting {count} file(s)...",
message=f"Ingesting {count} file(s)...", file_count=count, started_at=time.time(), finished_at=None, last_error="",
file_count=count,
started_at=time.time(),
finished_at=None,
last_error="",
) )
try: try:
result = subprocess.run( state = ingest_files(changed, embedder, state)
[PYTHON, INGEST_SCRIPT, NEXTCLOUD_PATH],
capture_output=True,
text=True,
timeout=1800
)
if result.returncode == 0:
root = Path(NEXTCLOUD_PATH)
for path in root.rglob("*"):
if path.is_file() and path.suffix.lower() in SUPPORTED:
state[str(path)] = str(path.stat().st_mtime)
save_state(state) save_state(state)
logging.info("Ingestion complete. State updated.")
set_ingestion_state( set_ingestion_state(
status="idle", status="idle",
message=f"Last run: ingested {count} file(s) successfully", message=f"Last run: ingested {count} file(s) successfully",
finished_at=time.time(), finished_at=time.time(),
) )
else:
logging.error(f"Ingestion error: {result.stderr}")
set_ingestion_state(
status="error",
message="Ingestion failed — see log",
last_error=result.stderr[-300:],
finished_at=time.time(),
)
except subprocess.TimeoutExpired:
logging.error("Ingestion timed out.")
set_ingestion_state(
status="error",
message="Ingestion timed out (>30 min)",
last_error="TimeoutExpired",
finished_at=time.time(),
)
except Exception as e: except Exception as e:
logging.error(f"Ingestion failed: {e}") log.error(f"Ingestion failed: {e}")
set_ingestion_state( set_ingestion_state(
status="error", status="error", message=f"Ingestion exception: {e}",
message=f"Ingestion exception: {e}", last_error=str(e), finished_at=time.time(),
last_error=str(e),
finished_at=time.time(),
) )
def start_ingestion_thread(): def start_ingestion_thread(embedder):
global ingestion_thread global ingestion_thread
with ingestion_lock:
if ingestion_thread and ingestion_thread.is_alive(): if ingestion_thread and ingestion_thread.is_alive():
logging.info("Ingestion already running — skipping.") log.info("Ingestion already running — skipping.")
return return
ingestion_thread = threading.Thread(target=run_ingestion, daemon=True) ingestion_thread = threading.Thread(
target=run_ingestion, args=(embedder,), daemon=True
)
ingestion_thread.start() ingestion_thread.start()
@@ -156,54 +305,72 @@ class IngestHandler(FileSystemEventHandler):
path = Path(event.src_path) path = Path(event.src_path)
if path.suffix.lower() not in SUPPORTED: if path.suffix.lower() not in SUPPORTED:
return return
if path.name.startswith('.') or path.name.startswith('~$'): if path.name.startswith((".", "~$")):
return return
if 'Admin/Backups' in str(path) or 'Backups' in path.parts: if "Admin/Backups" in str(path) or "Backups" in path.parts:
return return
if 'Journal/Media' in str(path): if "Journal/Media" in str(path):
return return
if event.event_type not in ('modified', 'created', 'moved'): if event.event_type not in ("modified", "created", "moved"):
return return
logging.info(f"Event: {event.event_type} {event.src_path}") log.info(f"Event: {event.event_type} {event.src_path}")
self.pending = True self.pending = True
self.last_event = time.time() self.last_event = time.time()
def write_status(handler):
with ingestion_lock:
status = {
"running": True,
"timestamp": time.time(),
"pending": handler.pending,
"last_event": handler.last_event,
"ingestion": dict(ingestion_state),
}
with open(STATUS_FILE, 'w') as f:
json.dump(status, f)
def main(): def main():
logging.info("Aaron AI Watcher starting...") log.info("Aaron AI Watcher starting...")
logging.info(f"Watching: {NEXTCLOUD_PATH}") log.info(f"Watching: {NEXTCLOUD_PATH}")
embedder = load_embedder()
log.info("Startup scan: checking for files missed since last run...")
state = load_state()
missed = get_changed_files(state)
if missed:
log.info(f"Startup recovery: {len(missed)} missed file(s) — ingesting now.")
set_ingestion_state(
status="ingesting",
message=f"Startup recovery: ingesting {len(missed)} missed file(s)...",
file_count=len(missed), started_at=time.time(),
)
try:
state = ingest_files(missed, embedder, state)
save_state(state)
set_ingestion_state(
status="idle",
message=f"Startup recovery complete: {len(missed)} file(s) ingested.",
finished_at=time.time(),
)
except Exception as e:
log.error(f"Startup recovery failed: {e}")
set_ingestion_state(status="error", message=str(e),
last_error=str(e), finished_at=time.time())
else:
log.info("Startup scan: no missed files.")
handler = IngestHandler() handler = IngestHandler()
observer = Observer() observer = Observer()
observer.schedule(handler, NEXTCLOUD_PATH, recursive=True) observer.schedule(handler, NEXTCLOUD_PATH, recursive=True)
observer.start() observer.start()
log.info("Observer started.")
try: try:
while True: while True:
write_heartbeat()
write_status(handler) write_status(handler)
if handler.pending: if handler.pending:
elapsed = time.time() - handler.last_event elapsed = time.time() - handler.last_event
if elapsed >= DEBOUNCE_SECONDS: if elapsed >= DEBOUNCE_SECONDS:
handler.pending = False handler.pending = False
start_ingestion_thread() start_ingestion_thread(embedder)
time.sleep(5) time.sleep(5)
except KeyboardInterrupt: except KeyboardInterrupt:
log.info("KeyboardInterrupt — stopping.")
observer.stop() observer.stop()
observer.join() observer.join()
logging.info("Watcher stopped.") log.info("Watcher stopped.")
if __name__ == "__main__": if __name__ == "__main__":