Compare commits
6 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
| 91166367fa | |||
| 2b3c2380a0 | |||
| 2fb50cce71 | |||
| c08f57a6f2 | |||
| cae7fb8775 | |||
| b53717af5b |
+90
-1
@@ -47,6 +47,7 @@ MODE_RANGES = {
|
||||
"late-rem": (0.22, 0.42),
|
||||
"lucid": (0.32, 0.72),
|
||||
}
|
||||
DREAMER_VERSION = "1.1" # 1.0=original exclusion logic; 1.1=score-band exclusion
|
||||
|
||||
# ─── Prompt versioning ──────────────────────────────────────────────────────
|
||||
# Bump the relevant constant manually when changing a prompt.
|
||||
@@ -63,6 +64,11 @@ def prompt_hash(prompts: list[str]) -> str:
|
||||
combined = "".join(prompts)
|
||||
return hashlib.md5(combined.encode()).hexdigest()[:8]
|
||||
|
||||
def extract_folder(source_path):
|
||||
"""Extract top-level Nextcloud folder from source path."""
|
||||
parts = source_path.replace("\\", "/").split("/")
|
||||
return parts[0] if parts else "unknown"
|
||||
|
||||
# ─── Stage 1: Observe ───────────────────────────────────────────────────────
|
||||
|
||||
def observe_corpus():
|
||||
@@ -104,7 +110,55 @@ def get_recent_conversation_topics(days=14):
|
||||
|
||||
# ─── Stage 2: Retrieve ──────────────────────────────────────────────────────
|
||||
|
||||
|
||||
def retrieve_graphiti(mode, task=None, n_results=8):
|
||||
"""E3 experiment — Graphiti substrate retrieval.
|
||||
Queries Graphiti /search endpoint instead of pgvector.
|
||||
Returns chunks in same format as retrieve() for pipeline compatibility.
|
||||
Note: content is Graphiti facts (synthesized relationships), not raw chunks.
|
||||
"""
|
||||
import requests as req_lib
|
||||
if task:
|
||||
query = task
|
||||
elif mode == "late-rem":
|
||||
delta = observe_corpus()
|
||||
topics = delta.get("recent_topics", [])
|
||||
query = topics[0] if topics else "practice place memory making"
|
||||
elif mode == "early-rem":
|
||||
query = "career decision personal change what matters next"
|
||||
else:
|
||||
query = "research fabrication teaching practice recent work"
|
||||
|
||||
try:
|
||||
resp = req_lib.get(
|
||||
"http://localhost:8001/search",
|
||||
params={"query": query, "limit": n_results, "group_id": "aaron"},
|
||||
timeout=30,
|
||||
)
|
||||
resp.raise_for_status()
|
||||
results = resp.json().get("results", [])
|
||||
chunks = []
|
||||
for r in results:
|
||||
fact = r.get("fact", "")
|
||||
if not fact.strip():
|
||||
continue
|
||||
chunks.append({
|
||||
"source": r.get("source", "graphiti"),
|
||||
"content": fact,
|
||||
"relevance": r.get("score", 0.5),
|
||||
"similarity": r.get("score", 0.5),
|
||||
})
|
||||
return chunks
|
||||
except Exception as e:
|
||||
print(f"[Graphiti retrieval error: {e}] — falling back to empty.")
|
||||
return []
|
||||
|
||||
def retrieve(mode, task=None, n_results=8, excluded_sources=None):
|
||||
# E3 experiment: DREAMER_SUBSTRATE=graphiti routes retrieval to Graphiti /search
|
||||
# Default behavior: pgvector similarity search (unchanged)
|
||||
substrate = os.getenv("DREAMER_SUBSTRATE", "pgvector")
|
||||
if substrate == "graphiti":
|
||||
return retrieve_graphiti(mode, task=task, n_results=n_results)
|
||||
from sentence_transformers import SentenceTransformer
|
||||
embedder = SentenceTransformer("all-MiniLM-L6-v2")
|
||||
low, high = MODE_RANGES[mode]
|
||||
@@ -153,6 +207,7 @@ def retrieve(mode, task=None, n_results=8, excluded_sources=None):
|
||||
"source": source or "unknown",
|
||||
"content": doc,
|
||||
"relevance": similarity,
|
||||
"similarity": similarity,
|
||||
})
|
||||
seen_sources.add(source)
|
||||
if len(chunks) >= n_results:
|
||||
@@ -361,6 +416,7 @@ def write_manifest(date_str, stage_data, corpus_data):
|
||||
manifest = {
|
||||
"date": date_str,
|
||||
"prompt_sig": prompt_signature(),
|
||||
"dreamer_version": DREAMER_VERSION,
|
||||
"prompt_hash": prompt_hash([
|
||||
synthesize_nrem.__doc__ or "",
|
||||
synthesize_early_rem.__doc__ or "",
|
||||
@@ -401,6 +457,8 @@ def dream_pipeline():
|
||||
print("\n[NREM] Retrieving...")
|
||||
nrem_chunks = retrieve("nrem", excluded_sources=previously_retrieved | session_retrieved)
|
||||
session_retrieved.update(c["source"] for c in nrem_chunks)
|
||||
# Track sources that scored above Early REM ceiling — these are the only ones Early REM should exclude
|
||||
nrem_high_sources = {c["source"] for c in nrem_chunks if c["similarity"] > 0.55}
|
||||
if not nrem_chunks:
|
||||
print("[NREM] No suitable chunks — aborting pipeline")
|
||||
return None
|
||||
@@ -408,12 +466,17 @@ def dream_pipeline():
|
||||
print(f"[NREM] Retrieved {len(nrem_chunks)} chunks. Synthesizing...")
|
||||
nrem_output = synthesize_nrem(nrem_chunks)
|
||||
nrem_file = deliver(nrem_output, "nrem")
|
||||
nrem_sources = [c["source"] for c in nrem_chunks]
|
||||
nrem_folders = list({extract_folder(s) for s in nrem_sources})
|
||||
stage_data = {
|
||||
"nrem": {
|
||||
"chunks_retrieved": len(nrem_chunks),
|
||||
"avg_similarity": round(sum(c["relevance"] for c in nrem_chunks) / len(nrem_chunks), 3),
|
||||
"query": "research fabrication teaching practice recent work",
|
||||
"word_count": len(nrem_output.split()),
|
||||
"sources": nrem_sources,
|
||||
"distinct_folders": nrem_folders,
|
||||
"folder_count": len(nrem_folders),
|
||||
"status": "ok",
|
||||
}
|
||||
}
|
||||
@@ -421,7 +484,9 @@ def dream_pipeline():
|
||||
|
||||
# ── Stage 2: Early REM — informed by NREM ──────────────────────────────
|
||||
print("\n[Early REM] Retrieving...")
|
||||
early_chunks = retrieve("early-rem", excluded_sources=previously_retrieved | session_retrieved)
|
||||
# Early REM excludes previously retrieved + NREM high-scorers only (not full session_retrieved)
|
||||
# Sources that scored in Early REM band during NREM remain available
|
||||
early_chunks = retrieve("early-rem", excluded_sources=previously_retrieved | nrem_high_sources)
|
||||
session_retrieved.update(c["source"] for c in early_chunks)
|
||||
if not early_chunks:
|
||||
print("[Early REM] No suitable chunks — skipping")
|
||||
@@ -430,11 +495,16 @@ def dream_pipeline():
|
||||
print(f"[Early REM] Retrieved {len(early_chunks)} chunks. Synthesizing with NREM context...")
|
||||
early_rem_output = synthesize_early_rem(early_chunks, nrem_output)
|
||||
deliver(early_rem_output, "early-rem")
|
||||
early_sources = [c["source"] for c in early_chunks]
|
||||
early_folders = list({extract_folder(s) for s in early_sources})
|
||||
stage_data["early_rem"] = {
|
||||
"chunks_retrieved": len(early_chunks),
|
||||
"avg_similarity": round(sum(c["relevance"] for c in early_chunks) / len(early_chunks), 3),
|
||||
"query": "career decision personal change what matters next",
|
||||
"word_count": len(early_rem_output.split()),
|
||||
"sources": early_sources,
|
||||
"distinct_folders": early_folders,
|
||||
"folder_count": len(early_folders),
|
||||
"status": "ok",
|
||||
}
|
||||
print(f"[Early REM] Done.\n{early_rem_output[:200]}...")
|
||||
@@ -450,11 +520,22 @@ def dream_pipeline():
|
||||
print(f"[Late REM] Retrieved {len(late_chunks)} chunks. Synthesizing with full context...")
|
||||
late_rem_output = synthesize_late_rem(late_chunks, nrem_output, early_rem_output)
|
||||
deliver(late_rem_output, "late-rem")
|
||||
late_sources = [c["source"] for c in late_chunks]
|
||||
late_folders = [extract_folder(s) for s in late_sources]
|
||||
cross_domain_pairs = sum(
|
||||
1 for i in range(len(late_folders))
|
||||
for j in range(i+1, len(late_folders))
|
||||
if late_folders[i] != late_folders[j]
|
||||
)
|
||||
stage_data["late_rem"] = {
|
||||
"chunks_retrieved": len(late_chunks),
|
||||
"avg_similarity": round(sum(c["relevance"] for c in late_chunks) / len(late_chunks), 3),
|
||||
"query": "practice place memory making",
|
||||
"word_count": len(late_rem_output.split()),
|
||||
"sources": late_sources,
|
||||
"distinct_folders": list(set(late_folders)),
|
||||
"folder_count": len(set(late_folders)),
|
||||
"cross_domain_pairs": cross_domain_pairs,
|
||||
"status": "ok",
|
||||
}
|
||||
print(f"[Late REM] Done.\n{late_rem_output[:200]}...")
|
||||
@@ -474,10 +555,18 @@ def dream_pipeline():
|
||||
print(f"{'='*60}")
|
||||
|
||||
# Write manifest
|
||||
all_session_sources = list(session_retrieved)
|
||||
all_session_folders = list({extract_folder(s) for s in all_session_sources})
|
||||
corpus_data = {
|
||||
"total_chunks": delta.get("new_chunks", 0),
|
||||
"new_chunks_since_last_dream": delta.get("new_chunks", 0),
|
||||
"days_since_last_dream": round(delta.get("days_since_dream", 0), 2),
|
||||
"substrate": "pgvector",
|
||||
"aggregate": {
|
||||
"total_distinct_sources": len(all_session_sources),
|
||||
"total_distinct_folders": len(all_session_folders),
|
||||
"folders_touched": all_session_folders,
|
||||
}
|
||||
}
|
||||
write_manifest(datetime.now().strftime("%Y-%m-%d"), stage_data, corpus_data)
|
||||
|
||||
|
||||
@@ -0,0 +1,222 @@
|
||||
#!/usr/bin/env python3
|
||||
"""
|
||||
E3 — Dreamer Substrate Comparison Experiment
|
||||
Runs dream pipeline twice: once against pgvector (Condition A),
|
||||
once against Graphiti (Condition B). Implements blinding per protocol.
|
||||
|
||||
Usage:
|
||||
python3 scripts/experiments/e3_dreamer_substrate.py
|
||||
|
||||
Outputs:
|
||||
Journal/Dreams/e3-dream-X-{mode}.md (one condition, identity concealed)
|
||||
Journal/Dreams/e3-dream-Y-{mode}.md (other condition, identity concealed)
|
||||
Journal/Dreams/e3-blind-mapping.json (kept unread until ratings complete)
|
||||
Journal/Dreams/e3-manifest.json (quantitative metrics for both conditions)
|
||||
"""
|
||||
|
||||
import os
|
||||
import sys
|
||||
import json
|
||||
import random
|
||||
import requests
|
||||
from pathlib import Path
|
||||
from datetime import datetime
|
||||
from dotenv import load_dotenv
|
||||
|
||||
load_dotenv(Path.home() / "aaronai" / ".env", override=True)
|
||||
|
||||
NEXTCLOUD_URL = os.getenv("NEXTCLOUD_URL", "https://nextcloud.aaronnelson.studio")
|
||||
NEXTCLOUD_USER = os.getenv("NEXTCLOUD_USER", "aaron")
|
||||
NEXTCLOUD_PASSWORD = os.getenv("NEXTCLOUD_PASSWORD", "")
|
||||
DREAMS_WEBDAV_BASE = f"{NEXTCLOUD_URL}/remote.php/dav/files/{NEXTCLOUD_USER}/Journal/Dreams"
|
||||
|
||||
auth = (NEXTCLOUD_USER, NEXTCLOUD_PASSWORD)
|
||||
|
||||
def webdav_put(url, content_str):
|
||||
requests.put(url, data=content_str.encode("utf-8"), auth=auth, timeout=60)
|
||||
|
||||
def webdav_mkcol(url):
|
||||
requests.request("MKCOL", url, auth=auth, timeout=10)
|
||||
|
||||
def run_condition(label, substrate, modes=("nrem", "early-rem", "late-rem", "synthesis")):
|
||||
"""Run the dream pipeline for one substrate condition.
|
||||
Returns dict of {mode: dream_text} and metrics.
|
||||
"""
|
||||
print(f"\n{'='*60}")
|
||||
print(f"Running Condition {label} — substrate: {substrate}")
|
||||
print(f"{'='*60}")
|
||||
|
||||
os.environ["DREAMER_SUBSTRATE"] = substrate
|
||||
|
||||
# Import dream module fresh for this run
|
||||
if "dream" in sys.modules:
|
||||
del sys.modules["dream"]
|
||||
sys.path.insert(0, str(Path.home() / "aaronai" / "scripts"))
|
||||
import dream as d
|
||||
|
||||
# Run pipeline stages manually to capture outputs
|
||||
outputs = {}
|
||||
metrics = {}
|
||||
|
||||
# Observe corpus delta
|
||||
delta = d.observe_corpus()
|
||||
print(f"Corpus delta: {delta.get('new_chunks', 0)} new chunks")
|
||||
|
||||
previously_retrieved = set()
|
||||
session_retrieved = set()
|
||||
nrem_high_sources = set()
|
||||
|
||||
# NREM
|
||||
print("\n[NREM] Retrieving...")
|
||||
nrem_chunks = d.retrieve("nrem", excluded_sources=previously_retrieved | session_retrieved)
|
||||
session_retrieved.update(c["source"] for c in nrem_chunks)
|
||||
nrem_high_sources = {c["source"] for c in nrem_chunks if c.get("similarity", 0) > 0.55}
|
||||
if not nrem_chunks:
|
||||
print("[NREM] No chunks — aborting.")
|
||||
return None, None
|
||||
nrem_output = d.synthesize_nrem(nrem_chunks)
|
||||
outputs["nrem"] = nrem_output
|
||||
metrics["nrem"] = {
|
||||
"chunks_retrieved": len(nrem_chunks),
|
||||
"sources": [c["source"] for c in nrem_chunks],
|
||||
"word_count": len(nrem_output.split()),
|
||||
}
|
||||
print(f"[NREM] Done. {len(nrem_chunks)} chunks, {metrics['nrem']['word_count']} words.")
|
||||
|
||||
# Early REM
|
||||
print("\n[Early REM] Retrieving...")
|
||||
early_chunks = d.retrieve("early-rem", excluded_sources=previously_retrieved | nrem_high_sources)
|
||||
session_retrieved.update(c["source"] for c in early_chunks)
|
||||
if early_chunks:
|
||||
early_output = d.synthesize_early_rem(early_chunks, nrem_output)
|
||||
outputs["early-rem"] = early_output
|
||||
metrics["early-rem"] = {
|
||||
"chunks_retrieved": len(early_chunks),
|
||||
"sources": [c["source"] for c in early_chunks],
|
||||
"word_count": len(early_output.split()),
|
||||
}
|
||||
print(f"[Early REM] Done. {len(early_chunks)} chunks.")
|
||||
else:
|
||||
early_output = nrem_output
|
||||
outputs["early-rem"] = None
|
||||
metrics["early-rem"] = {"chunks_retrieved": 0, "sources": [], "word_count": 0}
|
||||
print("[Early REM] No chunks — using NREM fallback.")
|
||||
|
||||
# Late REM
|
||||
print("\n[Late REM] Retrieving...")
|
||||
late_chunks = d.retrieve("late-rem", excluded_sources=previously_retrieved | session_retrieved)
|
||||
session_retrieved.update(c["source"] for c in late_chunks)
|
||||
if late_chunks:
|
||||
late_output = d.synthesize_late_rem(late_chunks, nrem_output, early_output)
|
||||
outputs["late-rem"] = late_output
|
||||
late_sources = [c["source"] for c in late_chunks]
|
||||
metrics["late-rem"] = {
|
||||
"chunks_retrieved": len(late_chunks),
|
||||
"sources": late_sources,
|
||||
"word_count": len(late_output.split()),
|
||||
}
|
||||
print(f"[Late REM] Done. {len(late_chunks)} chunks.")
|
||||
else:
|
||||
late_output = early_output
|
||||
outputs["late-rem"] = None
|
||||
metrics["late-rem"] = {"chunks_retrieved": 0, "sources": [], "word_count": 0}
|
||||
print("[Late REM] No chunks — using fallback.")
|
||||
|
||||
# Synthesis
|
||||
print("\n[Synthesis] Integrating...")
|
||||
synthesis_output = d.synthesize_final(nrem_output, early_output, late_output)
|
||||
outputs["synthesis"] = synthesis_output
|
||||
metrics["synthesis"] = {"word_count": len(synthesis_output.split())}
|
||||
print(f"[Synthesis] Done. {metrics['synthesis']['word_count']} words.")
|
||||
|
||||
return outputs, metrics
|
||||
|
||||
|
||||
def main():
|
||||
date_str = datetime.now().strftime("%Y-%m-%d")
|
||||
|
||||
# Randomize which condition gets label X vs Y for blinding
|
||||
conditions = [("pgvector", "A"), ("graphiti", "B")]
|
||||
labels = ["X", "Y"]
|
||||
random.shuffle(labels)
|
||||
assignment = {conditions[0][0]: labels[0], conditions[1][0]: labels[1]}
|
||||
|
||||
mapping = {
|
||||
"date": date_str,
|
||||
"X": assignment["pgvector"] == "X" and "pgvector" or "graphiti",
|
||||
"Y": assignment["pgvector"] == "Y" and "pgvector" or "graphiti",
|
||||
"reveal_after_rating": True,
|
||||
}
|
||||
# Simpler mapping
|
||||
mapping = {
|
||||
"date": date_str,
|
||||
"X_is": "pgvector" if assignment["pgvector"] == "X" else "graphiti",
|
||||
"Y_is": "pgvector" if assignment["pgvector"] == "Y" else "graphiti",
|
||||
"reveal_after_rating": True,
|
||||
}
|
||||
|
||||
print(f"E3 Dreamer Substrate Comparison — {date_str}")
|
||||
print(f"Blind mapping written (do not read until ratings complete)")
|
||||
|
||||
# Write blind mapping to Nextcloud immediately
|
||||
webdav_mkcol(DREAMS_WEBDAV_BASE)
|
||||
mapping_url = f"{DREAMS_WEBDAV_BASE}/e3-blind-mapping.json"
|
||||
webdav_put(mapping_url, json.dumps(mapping, indent=2))
|
||||
print(f"Mapping written to Journal/Dreams/e3-blind-mapping.json")
|
||||
|
||||
all_metrics = {}
|
||||
|
||||
# Run both conditions
|
||||
for substrate, condition_label in conditions:
|
||||
blind_label = assignment[substrate]
|
||||
outputs, metrics = run_condition(condition_label, substrate)
|
||||
|
||||
if outputs is None:
|
||||
print(f"Condition {condition_label} failed — aborting E3.")
|
||||
return
|
||||
|
||||
all_metrics[blind_label] = {
|
||||
"substrate": "CONCEALED", # concealed until reveal
|
||||
"stages": metrics,
|
||||
}
|
||||
|
||||
# Write dream files with blind labels
|
||||
modes_map = {
|
||||
"nrem": "nrem",
|
||||
"early-rem": "early-rem",
|
||||
"late-rem": "late-rem",
|
||||
"synthesis": "synthesis",
|
||||
}
|
||||
for mode, text in outputs.items():
|
||||
if text is None:
|
||||
continue
|
||||
filename = f"e3-dream-{blind_label}-{mode}.md"
|
||||
header = f"# E3 Dream — {blind_label} — {mode.upper()} — {date_str}\n"
|
||||
header += f"*Substrate concealed until rating complete*\n\n---\n\n"
|
||||
file_content = header + text
|
||||
url = f"{DREAMS_WEBDAV_BASE}/{filename}"
|
||||
webdav_put(url, file_content)
|
||||
print(f"Written: Journal/Dreams/{filename}")
|
||||
|
||||
# Write quantitative metrics (substrate concealed)
|
||||
e3_manifest = {
|
||||
"date": date_str,
|
||||
"experiment": "E3",
|
||||
"protocol": "E3-dreamer-substrate-comparison-protocol.md",
|
||||
"blind_labels": ["X", "Y"],
|
||||
"substrate_concealed": True,
|
||||
"metrics": all_metrics,
|
||||
"rating_dimensions": ["Texture", "Reach", "Specificity", "Forward-facing"],
|
||||
"rating_scale": "1-5 per dimension per dream",
|
||||
}
|
||||
manifest_url = f"{DREAMS_WEBDAV_BASE}/e3-manifest.json"
|
||||
webdav_put(manifest_url, json.dumps(e3_manifest, indent=2))
|
||||
print(f"\nE3 manifest written to Journal/Dreams/e3-manifest.json")
|
||||
print(f"\n{'='*60}")
|
||||
print("E3 runs complete. Eight dream files written.")
|
||||
print("Rate all 8 dreams before reading e3-blind-mapping.json")
|
||||
print(f"{'='*60}")
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
||||
@@ -143,6 +143,8 @@ def ingest_file(filepath):
|
||||
print(f" Indexed {len(chunks)} chunks: {path.name}")
|
||||
|
||||
# Enqueue for Stage 2 → Stage 3 (Graphiti pipeline)
|
||||
# SKIP_STAGE2_ENQUEUE env var set by migration scripts to prevent bulk enqueue
|
||||
if not os.getenv("SKIP_STAGE2_ENQUEUE"):
|
||||
enqueue_stage2(path.name, text)
|
||||
|
||||
return len(chunks)
|
||||
|
||||
@@ -21,7 +21,6 @@ logging.basicConfig(
|
||||
level=logging.INFO,
|
||||
format="%(asctime)s [stage2] %(levelname)s %(message)s",
|
||||
handlers=[
|
||||
logging.StreamHandler(),
|
||||
logging.FileHandler("/var/log/aaronai/stage2.log", mode="a"),
|
||||
]
|
||||
)
|
||||
|
||||
@@ -24,7 +24,6 @@ logging.basicConfig(
|
||||
level=logging.INFO,
|
||||
format="%(asctime)s [stage3] %(levelname)s %(message)s",
|
||||
handlers=[
|
||||
logging.StreamHandler(),
|
||||
logging.FileHandler("/var/log/aaronai/stage3.log", mode="a"),
|
||||
]
|
||||
)
|
||||
|
||||
+270
-103
@@ -1,62 +1,214 @@
|
||||
"""
|
||||
Aaron AI Watcher — Stage 1 of the encoding pipeline.
|
||||
|
||||
Watches the Nextcloud directory for new or changed files.
|
||||
On detection, chunks + embeds documents in-process (no subprocess),
|
||||
then enqueues to stage_2_queue for async cascade processing.
|
||||
|
||||
Design principles:
|
||||
- Embedding model loaded ONCE at startup, reused across all ingest runs
|
||||
- In-process ingest (no subprocess) — eliminates per-run model reload memory spike
|
||||
- Missed-file recovery on startup — ingests anything new since last state
|
||||
- Heartbeat file updated every loop tick — enables external health monitoring
|
||||
- Parity principle: no filtering, no decisions, faithful capture
|
||||
- Does NOT enqueue to stage_2_queue during bulk migration (SKIP_STAGE2_ENQUEUE env var)
|
||||
|
||||
Architecture: Stage 1 (watcher) -> stage_2_queue -> Stage 2 (Mistral) -> stage_3_queue -> Stage 3 (Graphiti)
|
||||
"""
|
||||
|
||||
import os
|
||||
import time
|
||||
import subprocess
|
||||
import logging
|
||||
import json
|
||||
import hashlib
|
||||
import logging
|
||||
import threading
|
||||
from pathlib import Path
|
||||
|
||||
import psycopg2
|
||||
from dotenv import load_dotenv
|
||||
from sentence_transformers import SentenceTransformer
|
||||
from watchdog.observers import Observer
|
||||
from watchdog.events import FileSystemEventHandler
|
||||
|
||||
from docx import Document as DocxDocument
|
||||
from pypdf import PdfReader
|
||||
from pptx import Presentation
|
||||
|
||||
load_dotenv(Path.home() / "aaronai" / ".env", override=True)
|
||||
|
||||
NEXTCLOUD_PATH = "/home/aaron/nextcloud/data/data/aaron/files"
|
||||
INGEST_SCRIPT = "/home/aaron/aaronai/scripts/ingest.py"
|
||||
PYTHON = "/home/aaron/aaronai/venv/bin/python3"
|
||||
LOG_FILE = "/home/aaron/aaronai/watcher.log"
|
||||
STATE_FILE = "/home/aaron/aaronai/watcher_state.json"
|
||||
|
||||
SUPPORTED = {'.pdf', '.docx', '.pptx', '.txt', '.md'}
|
||||
DEBOUNCE_SECONDS = 120
|
||||
STATUS_FILE = "/home/aaron/aaronai/watcher_status.json"
|
||||
HEARTBEAT_FILE = "/home/aaron/aaronai/watcher_heartbeat"
|
||||
|
||||
SUPPORTED = {".pdf", ".docx", ".pptx", ".txt", ".md"}
|
||||
DEBOUNCE_SECONDS = 120
|
||||
CHUNK_SIZE = 500
|
||||
CHUNK_OVERLAP = 50
|
||||
EMBED_MODEL = "all-MiniLM-L6-v2"
|
||||
|
||||
PG_DSN = os.getenv("PG_DSN")
|
||||
|
||||
logging.basicConfig(
|
||||
level=logging.INFO,
|
||||
format='%(asctime)s - %(message)s',
|
||||
handlers=[
|
||||
logging.FileHandler(LOG_FILE),
|
||||
logging.StreamHandler()
|
||||
]
|
||||
format="%(asctime)s [watcher] %(levelname)s %(message)s",
|
||||
handlers=[logging.FileHandler(LOG_FILE)],
|
||||
)
|
||||
log = logging.getLogger("watcher")
|
||||
|
||||
ingestion_state = {
|
||||
"status": "idle",
|
||||
"message": "",
|
||||
"file_count": 0,
|
||||
"started_at": None,
|
||||
"finished_at": None,
|
||||
"last_error": "",
|
||||
}
|
||||
ingestion_lock = threading.Lock()
|
||||
ingestion_state = {
|
||||
"status": "idle", "message": "", "file_count": 0,
|
||||
"started_at": None, "finished_at": None, "last_error": "",
|
||||
}
|
||||
ingestion_thread = None
|
||||
|
||||
|
||||
def set_ingestion_state(**kwargs):
|
||||
with ingestion_lock:
|
||||
ingestion_state.update(kwargs)
|
||||
def load_embedder():
|
||||
log.info(f"Loading embedding model: {EMBED_MODEL}")
|
||||
model = SentenceTransformer(EMBED_MODEL)
|
||||
log.info("Embedding model ready.")
|
||||
return model
|
||||
|
||||
|
||||
def load_state():
|
||||
def get_pg():
|
||||
return psycopg2.connect(PG_DSN)
|
||||
|
||||
|
||||
def extract_text(path: Path) -> str:
|
||||
suffix = path.suffix.lower()
|
||||
try:
|
||||
if suffix == ".docx":
|
||||
doc = DocxDocument(path)
|
||||
return "\n".join(p.text for p in doc.paragraphs if p.text.strip())
|
||||
elif suffix == ".pdf":
|
||||
reader = PdfReader(path)
|
||||
return "".join(
|
||||
page.extract_text() + "\n"
|
||||
for page in reader.pages if page.extract_text()
|
||||
)
|
||||
elif suffix == ".pptx":
|
||||
prs = Presentation(path)
|
||||
return "\n".join(
|
||||
shape.text for slide in prs.slides
|
||||
for shape in slide.shapes
|
||||
if hasattr(shape, "text") and shape.text.strip()
|
||||
)
|
||||
elif suffix in {".txt", ".md"}:
|
||||
return path.read_text(encoding="utf-8", errors="ignore")
|
||||
except Exception as e:
|
||||
log.warning(f"Text extraction failed for {path.name}: {e}")
|
||||
return ""
|
||||
|
||||
|
||||
def chunk_text(text: str) -> list:
|
||||
words = text.split()
|
||||
chunks = []
|
||||
start = 0
|
||||
while start < len(words):
|
||||
chunk = " ".join(words[start:start + CHUNK_SIZE])
|
||||
if chunk.strip():
|
||||
chunks.append(chunk)
|
||||
start += CHUNK_SIZE - CHUNK_OVERLAP
|
||||
return chunks
|
||||
|
||||
|
||||
def make_chunk_id(filepath: Path, chunk_index: int) -> str:
|
||||
return hashlib.md5(str(filepath).encode()).hexdigest()[:8] + f"_{chunk_index}"
|
||||
|
||||
|
||||
def enqueue_stage2(source: str, full_text: str):
|
||||
if os.getenv("SKIP_STAGE2_ENQUEUE"):
|
||||
return
|
||||
try:
|
||||
pg = get_pg()
|
||||
cur = pg.cursor()
|
||||
cur.execute("""
|
||||
INSERT INTO stage_2_queue (source, full_text, char_length)
|
||||
VALUES (%s, %s, %s)
|
||||
ON CONFLICT (source) DO UPDATE SET
|
||||
full_text = EXCLUDED.full_text,
|
||||
char_length = EXCLUDED.char_length,
|
||||
enqueued_at = NOW(),
|
||||
completed_at = NULL,
|
||||
failed_at = NULL,
|
||||
attempts = 0
|
||||
""", (source, full_text[:50000], len(full_text)))
|
||||
pg.commit()
|
||||
pg.close()
|
||||
except Exception as e:
|
||||
log.warning(f"Stage 2 enqueue failed (non-fatal): {e}")
|
||||
|
||||
|
||||
def ingest_file(filepath: Path, embedder) -> int:
|
||||
if filepath.name.startswith(("~$", ".")):
|
||||
return 0
|
||||
if filepath.suffix.lower() not in SUPPORTED:
|
||||
return 0
|
||||
text = extract_text(filepath)
|
||||
if not text.strip():
|
||||
return 0
|
||||
chunks = chunk_text(text)
|
||||
if not chunks:
|
||||
return 0
|
||||
try:
|
||||
embeddings = embedder.encode(chunks).tolist()
|
||||
except Exception as e:
|
||||
log.error(f"Embedding failed for {filepath.name}: {e}")
|
||||
return 0
|
||||
source = filepath.name
|
||||
try:
|
||||
pg = get_pg()
|
||||
cur = pg.cursor()
|
||||
for i, (chunk, embedding) in enumerate(zip(chunks, embeddings)):
|
||||
chunk_id = make_chunk_id(filepath, i)
|
||||
cur.execute("""
|
||||
INSERT INTO embeddings (id, document, embedding, source, type, created_at, metadata)
|
||||
VALUES (%s, %s, %s::vector, %s, %s, NOW(), %s)
|
||||
ON CONFLICT (id) DO UPDATE SET
|
||||
document = EXCLUDED.document,
|
||||
embedding = EXCLUDED.embedding,
|
||||
source = EXCLUDED.source,
|
||||
metadata = EXCLUDED.metadata
|
||||
""", (chunk_id, chunk, embedding, source, "document",
|
||||
json.dumps({"source": source, "filepath": str(filepath)})))
|
||||
pg.commit()
|
||||
pg.close()
|
||||
except Exception as e:
|
||||
log.error(f"pgvector write failed for {filepath.name}: {e}")
|
||||
return 0
|
||||
log.info(f"Indexed {len(chunks)} chunks: {filepath.name}")
|
||||
enqueue_stage2(source, text)
|
||||
return len(chunks)
|
||||
|
||||
|
||||
def ingest_files(paths: list, embedder, state: dict) -> dict:
|
||||
total = 0
|
||||
for path in paths:
|
||||
count = ingest_file(path, embedder)
|
||||
total += count
|
||||
state[str(path)] = str(path.stat().st_mtime)
|
||||
log.info(f"Ingestion complete. {total} chunks across {len(paths)} files.")
|
||||
return state
|
||||
|
||||
|
||||
def load_state() -> dict:
|
||||
if Path(STATE_FILE).exists():
|
||||
try:
|
||||
with open(STATE_FILE) as f:
|
||||
return json.load(f)
|
||||
except Exception:
|
||||
pass
|
||||
return {}
|
||||
|
||||
|
||||
def save_state(state):
|
||||
with open(STATE_FILE, 'w') as f:
|
||||
def save_state(state: dict):
|
||||
with open(STATE_FILE, "w") as f:
|
||||
json.dump(state, f)
|
||||
|
||||
|
||||
def get_changed_files():
|
||||
state = load_state()
|
||||
def get_changed_files(state: dict) -> list:
|
||||
changed = []
|
||||
root = Path(NEXTCLOUD_PATH)
|
||||
for path in root.rglob("*"):
|
||||
@@ -64,84 +216,81 @@ def get_changed_files():
|
||||
continue
|
||||
if path.suffix.lower() not in SUPPORTED:
|
||||
continue
|
||||
if path.name.startswith('.') or path.name.startswith('~$'):
|
||||
if path.name.startswith((".", "~$")):
|
||||
continue
|
||||
mtime = str(path.stat().st_mtime)
|
||||
key = str(path)
|
||||
if state.get(key) != mtime:
|
||||
if "Admin/Backups" in str(path) or "Backups" in path.parts:
|
||||
continue
|
||||
if "Journal/Media" in str(path):
|
||||
continue
|
||||
if state.get(str(path)) != str(path.stat().st_mtime):
|
||||
changed.append(path)
|
||||
return changed, state
|
||||
return changed
|
||||
|
||||
|
||||
def run_ingestion():
|
||||
changed, state = get_changed_files()
|
||||
def set_ingestion_state(**kwargs):
|
||||
with ingestion_lock:
|
||||
ingestion_state.update(kwargs)
|
||||
|
||||
|
||||
def write_status(handler):
|
||||
with ingestion_lock:
|
||||
status = {
|
||||
"running": True, "timestamp": time.time(),
|
||||
"pending": handler.pending, "last_event": handler.last_event,
|
||||
"ingestion": dict(ingestion_state),
|
||||
}
|
||||
try:
|
||||
with open(STATUS_FILE, "w") as f:
|
||||
json.dump(status, f)
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
|
||||
def write_heartbeat():
|
||||
try:
|
||||
Path(HEARTBEAT_FILE).write_text(str(time.time()))
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
|
||||
def run_ingestion(embedder):
|
||||
state = load_state()
|
||||
changed = get_changed_files(state)
|
||||
if not changed:
|
||||
logging.info("No new or changed files detected — skipping ingestion.")
|
||||
log.info("No new or changed files — skipping ingestion.")
|
||||
set_ingestion_state(status="idle", message="No changes detected", file_count=0)
|
||||
return
|
||||
|
||||
count = len(changed)
|
||||
logging.info(f"Found {count} new or changed files — starting ingestion...")
|
||||
log.info(f"Found {count} new or changed files — starting ingestion...")
|
||||
set_ingestion_state(
|
||||
status="ingesting",
|
||||
message=f"Ingesting {count} file(s)...",
|
||||
file_count=count,
|
||||
started_at=time.time(),
|
||||
finished_at=None,
|
||||
last_error="",
|
||||
status="ingesting", message=f"Ingesting {count} file(s)...",
|
||||
file_count=count, started_at=time.time(), finished_at=None, last_error="",
|
||||
)
|
||||
|
||||
try:
|
||||
result = subprocess.run(
|
||||
[PYTHON, INGEST_SCRIPT, NEXTCLOUD_PATH],
|
||||
capture_output=True,
|
||||
text=True,
|
||||
timeout=1800
|
||||
)
|
||||
if result.returncode == 0:
|
||||
root = Path(NEXTCLOUD_PATH)
|
||||
for path in root.rglob("*"):
|
||||
if path.is_file() and path.suffix.lower() in SUPPORTED:
|
||||
state[str(path)] = str(path.stat().st_mtime)
|
||||
state = ingest_files(changed, embedder, state)
|
||||
save_state(state)
|
||||
logging.info("Ingestion complete. State updated.")
|
||||
set_ingestion_state(
|
||||
status="idle",
|
||||
message=f"Last run: ingested {count} file(s) successfully",
|
||||
finished_at=time.time(),
|
||||
)
|
||||
else:
|
||||
logging.error(f"Ingestion error: {result.stderr}")
|
||||
set_ingestion_state(
|
||||
status="error",
|
||||
message="Ingestion failed — see log",
|
||||
last_error=result.stderr[-300:],
|
||||
finished_at=time.time(),
|
||||
)
|
||||
except subprocess.TimeoutExpired:
|
||||
logging.error("Ingestion timed out.")
|
||||
set_ingestion_state(
|
||||
status="error",
|
||||
message="Ingestion timed out (>30 min)",
|
||||
last_error="TimeoutExpired",
|
||||
finished_at=time.time(),
|
||||
)
|
||||
except Exception as e:
|
||||
logging.error(f"Ingestion failed: {e}")
|
||||
log.error(f"Ingestion failed: {e}")
|
||||
set_ingestion_state(
|
||||
status="error",
|
||||
message=f"Ingestion exception: {e}",
|
||||
last_error=str(e),
|
||||
finished_at=time.time(),
|
||||
status="error", message=f"Ingestion exception: {e}",
|
||||
last_error=str(e), finished_at=time.time(),
|
||||
)
|
||||
|
||||
|
||||
def start_ingestion_thread():
|
||||
def start_ingestion_thread(embedder):
|
||||
global ingestion_thread
|
||||
with ingestion_lock:
|
||||
if ingestion_thread and ingestion_thread.is_alive():
|
||||
logging.info("Ingestion already running — skipping.")
|
||||
log.info("Ingestion already running — skipping.")
|
||||
return
|
||||
ingestion_thread = threading.Thread(target=run_ingestion, daemon=True)
|
||||
ingestion_thread = threading.Thread(
|
||||
target=run_ingestion, args=(embedder,), daemon=True
|
||||
)
|
||||
ingestion_thread.start()
|
||||
|
||||
|
||||
@@ -156,54 +305,72 @@ class IngestHandler(FileSystemEventHandler):
|
||||
path = Path(event.src_path)
|
||||
if path.suffix.lower() not in SUPPORTED:
|
||||
return
|
||||
if path.name.startswith('.') or path.name.startswith('~$'):
|
||||
if path.name.startswith((".", "~$")):
|
||||
return
|
||||
if 'Admin/Backups' in str(path) or 'Backups' in path.parts:
|
||||
if "Admin/Backups" in str(path) or "Backups" in path.parts:
|
||||
return
|
||||
if 'Journal/Media' in str(path):
|
||||
if "Journal/Media" in str(path):
|
||||
return
|
||||
if event.event_type not in ('modified', 'created', 'moved'):
|
||||
if event.event_type not in ("modified", "created", "moved"):
|
||||
return
|
||||
logging.info(f"Event: {event.event_type} {event.src_path}")
|
||||
log.info(f"Event: {event.event_type} {event.src_path}")
|
||||
self.pending = True
|
||||
self.last_event = time.time()
|
||||
|
||||
|
||||
def write_status(handler):
|
||||
with ingestion_lock:
|
||||
status = {
|
||||
"running": True,
|
||||
"timestamp": time.time(),
|
||||
"pending": handler.pending,
|
||||
"last_event": handler.last_event,
|
||||
"ingestion": dict(ingestion_state),
|
||||
}
|
||||
with open(STATUS_FILE, 'w') as f:
|
||||
json.dump(status, f)
|
||||
|
||||
|
||||
def main():
|
||||
logging.info("Aaron AI Watcher starting...")
|
||||
logging.info(f"Watching: {NEXTCLOUD_PATH}")
|
||||
log.info("Aaron AI Watcher starting...")
|
||||
log.info(f"Watching: {NEXTCLOUD_PATH}")
|
||||
|
||||
embedder = load_embedder()
|
||||
|
||||
log.info("Startup scan: checking for files missed since last run...")
|
||||
state = load_state()
|
||||
missed = get_changed_files(state)
|
||||
if missed:
|
||||
log.info(f"Startup recovery: {len(missed)} missed file(s) — ingesting now.")
|
||||
set_ingestion_state(
|
||||
status="ingesting",
|
||||
message=f"Startup recovery: ingesting {len(missed)} missed file(s)...",
|
||||
file_count=len(missed), started_at=time.time(),
|
||||
)
|
||||
try:
|
||||
state = ingest_files(missed, embedder, state)
|
||||
save_state(state)
|
||||
set_ingestion_state(
|
||||
status="idle",
|
||||
message=f"Startup recovery complete: {len(missed)} file(s) ingested.",
|
||||
finished_at=time.time(),
|
||||
)
|
||||
except Exception as e:
|
||||
log.error(f"Startup recovery failed: {e}")
|
||||
set_ingestion_state(status="error", message=str(e),
|
||||
last_error=str(e), finished_at=time.time())
|
||||
else:
|
||||
log.info("Startup scan: no missed files.")
|
||||
|
||||
handler = IngestHandler()
|
||||
observer = Observer()
|
||||
observer.schedule(handler, NEXTCLOUD_PATH, recursive=True)
|
||||
observer.start()
|
||||
log.info("Observer started.")
|
||||
|
||||
try:
|
||||
while True:
|
||||
write_heartbeat()
|
||||
write_status(handler)
|
||||
if handler.pending:
|
||||
elapsed = time.time() - handler.last_event
|
||||
if elapsed >= DEBOUNCE_SECONDS:
|
||||
handler.pending = False
|
||||
start_ingestion_thread()
|
||||
start_ingestion_thread(embedder)
|
||||
time.sleep(5)
|
||||
except KeyboardInterrupt:
|
||||
log.info("KeyboardInterrupt — stopping.")
|
||||
observer.stop()
|
||||
|
||||
observer.join()
|
||||
logging.info("Watcher stopped.")
|
||||
log.info("Watcher stopped.")
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
|
||||
Reference in New Issue
Block a user