Code review fixes: CV pinning, F1 (excluded_sources), F14 (50KB truncation), F37

- api.py: strip CV pinning workaround (parity violation, see architecture doc)
- dream.py: F1 — retrieve_graphiti() now accepts excluded_sources, over-fetches
  3x and filters in-process. Was silently dropping the parameter; would have
  confounded E3 with broken cross-stage exclusion in Graphiti arm.
- watcher.py + ingest.py: F14 — drop full_text[:50000] truncation. Was
  propagating through entire cascade. Postgres TEXT can hold up to 1GB.
- corpus_integrity.py: F37 — same truncation, third path now clean.

Backups: api.py.bak.*, dream.py.bak.*, watcher.py.bak.*, ingest.py.bak.*,
corpus_integrity.py.bak.* timestamped pre-fix.

Re-cascaded Shop Class as Soulcraft (only already-cascaded source affected
by F14, 414KB).
This commit is contained in:
2026-05-01 02:26:37 +00:00
parent 25e42c0231
commit 465f2f725b
17 changed files with 4432 additions and 58 deletions
+5 -34
View File
@@ -161,8 +161,6 @@ def require_auth(request: Request):
raise HTTPException(status_code=401, detail="Not authenticated")
return token
CV_SOURCES = ["Aaron Nelson CV 2024.pdf", "Aaron Nelson CV 2025.pdf", "Aaron Nelson - CV.docx"]
def init_conversations_db():
conn = sqlite3.connect(CONVERSATIONS_DB)
c = conn.cursor()
@@ -224,50 +222,23 @@ def remove_from_memory(item):
save_memory("\n".join(filtered))
return len(lines) - len(filtered)
def get_pinned_cv_context():
try:
pg = get_pg()
cur = pg.cursor()
cur.execute(
"SELECT document, source FROM embeddings WHERE source = ANY(%s)",
(CV_SOURCES,)
)
rows = cur.fetchall()
pg.close()
docs = [r[0] for r in rows]
metas = [{"source": r[1]} for r in rows]
return docs, metas
except:
return [], []
def is_professional_query(query):
keywords = ["grant", "publication", "exhibition", "award", "fellowship",
"experience", "position", "job", "career", "cv", "resume",
"research", "work history", "accomplishment", "teaching",
"course", "client", "consultation", "presentation", "workshop",
"education", "degree", "institution", "service", "committee"]
return any(k in query.lower() for k in keywords)
def retrieve_context(query, n_results=8):
"""Pure semantic retrieval over pgvector. Top-N by cosine similarity, threshold 0.3.
No CV pinning, no keyword routing — see architecture doc substrate-dependency section.
Substrate-level workarounds (entity-keyed routing, hybrid retrieval) live at the
Graphiti layer, not as wrapper logic above pgvector."""
query_embedding = embedder.encode([query]).tolist()[0]
context_pieces = []
sources = []
if is_professional_query(query):
cv_docs, cv_metas = get_pinned_cv_context()
for doc, meta in zip(cv_docs, cv_metas):
context_pieces.append(f"[CV] {doc}")
sources.append(meta.get("source", "CV"))
try:
pg = get_pg()
cur = pg.cursor()
cur.execute("""
SELECT document, source, 1 - (embedding <=> %s::vector) as similarity
FROM embeddings
WHERE source NOT IN %s
ORDER BY embedding <=> %s::vector
LIMIT %s
""", (query_embedding, tuple(CV_SOURCES) if CV_SOURCES else ('__none__',),
query_embedding, n_results))
""", (query_embedding, query_embedding, n_results))
for doc, source, similarity in cur.fetchall():
if similarity > 0.3:
context_pieces.append(doc)
File diff suppressed because it is too large Load Diff
+442
View File
@@ -0,0 +1,442 @@
"""
Consolidator 0.1 — alias resolution agent for BirdAI's Tier 1 substrate.
Reads entities from FalkorDB group_id 'aaron', infers light type labels,
computes pairwise similarity within type blocks using ego summary embedding +
name string distance + neighbor pattern overlap, generates merge proposals
above threshold, writes proposal log for human review.
Does NOT execute merges. 0.1 is the calibration phase — proposals only,
human reviews before any action.
"""
import json
import re
import os
from datetime import datetime, timezone
from collections import defaultdict
from pathlib import Path
from falkordb import FalkorDB
import numpy as np
# Configuration
# Name of the FalkorDB graph that generate_proposals() selects and scans.
GROUP_ID = "aaron"
HIGH_CONFIDENCE_THRESHOLD = 0.85 # confidence >= this is recorded as a merge proposal
LOW_CONFIDENCE_THRESHOLD = 0.65 # confidence in [LOW, HIGH) is logged as low-confidence; anything lower is dropped
# Directory that receives the proposals-<timestamp>.md and .json files.
PROPOSALS_DIR = Path("/home/aaron/Nextcloud/Journal/Consolidation")
# NOTE: runs at import time, so importing this module creates the directory.
PROPOSALS_DIR.mkdir(parents=True, exist_ok=True)
def cosine_similarity(a, b):
    """Return the cosine of the angle between two embedding vectors.

    Both inputs are converted to float32 arrays. If either vector has zero
    length the similarity is defined as 0.0 instead of dividing by zero.
    """
    vec_a, vec_b = (np.asarray(v, dtype=np.float32) for v in (a, b))
    norm_a, norm_b = np.linalg.norm(vec_a), np.linalg.norm(vec_b)
    if 0 in (norm_a, norm_b):
        return 0.0
    return float(np.dot(vec_a, vec_b) / (norm_a * norm_b))
def name_similarity(name_a, name_b):
    """
    Score how alike two entity names are, from 0.0 to 1.0.

    The checks run in order and the first one that applies wins:
      1. case-insensitive exact match (after stripping)      -> 1.0
      2. either name has no word tokens                      -> 0.0
      3. one lowercased name contains the other as a
         character-level substring ("Aaron" / "Aaron Nelson") -> 0.7 + 0.2 * shorter/longer
      4. one raw name spells (part of) the initials of the
         other, multi-word name (HVAMC / Hudson Valley
         Additive Manufacturing Center)                       -> 0.85
      5. otherwise the Jaccard overlap of the word-token sets,
         so word order is irrelevant ("Aaron Nelson" / "Nelson, Aaron").
    """
    left = name_a.lower().strip()
    right = name_b.lower().strip()
    if left == right:
        return 1.0

    left_words = set(re.findall(r'\b\w+\b', left))
    right_words = set(re.findall(r'\b\w+\b', right))
    if not (left_words and right_words):
        return 0.0

    # Containment is scored by length ratio: strong evidence, but never 1.0.
    if left in right or right in left:
        short_len, long_len = sorted((len(left), len(right)))
        return 0.7 + 0.2 * (short_len / long_len)

    def _spells_initials(candidate, expansion):
        # The candidate must be strictly shorter and the expansion multi-word.
        words = expansion.split()
        if len(candidate) >= len(expansion) or len(words) < 2:
            return False
        letters = candidate if candidate.isupper() else candidate.upper()
        initials = ''.join(word[0].upper() for word in words)
        # Substring match also covers equality, so a partial run of the
        # initials counts as a hit.
        return letters in initials

    if _spells_initials(name_a, name_b) or _spells_initials(name_b, name_a):
        return 0.85

    return len(left_words & right_words) / len(left_words | right_words)
def infer_type(entity_name, summary):
    """
    Guess a coarse entity type, used only to block pairwise comparisons.

    Returns one of: person, organization, project, place, unknown. The
    categories are tested in that order and the first match wins. The rules
    are deliberately simple substring/regex heuristics: they only need to be
    good enough to avoid obviously wrong cross-type comparisons. Anything
    that matches no rule is 'unknown', which the caller compares against
    every other block.

    `summary` may be None; it is then treated as an empty string.
    """
    lowered_name = entity_name.lower().strip()
    lowered_summary = (summary or "").lower()

    def name_has(fragments):
        return any(fragment in lowered_name for fragment in fragments)

    def summary_has(phrases):
        return any(phrase in lowered_summary for phrase in phrases)

    # Person: exactly two capitalised words, or a summary phrase that
    # describes a person.
    looks_like_full_name = bool(
        re.match(r'^[A-Z][a-z]+ [A-Z][a-z]+$', entity_name.strip())
    )
    if looks_like_full_name or summary_has((
        'is a person', 'is a professor', 'is an artist', 'is a colleague',
        'is a friend', 'is a family member', 'works at', 'studied at',
        "'s spouse", "'s child", "'s parent", "'s student",
    )):
        return "person"

    # Organization: company/institution wording in the name or the summary.
    if name_has((
        ' inc', ' llc', ' corp', ' company', ' university', ' college',
        ' school', ' institute', ' foundation', ' department',
    )) or summary_has((
        'is a company', 'is a university', 'is an organization',
        'is an institution', 'is a department', 'is a nonprofit',
    )):
        return "organization"

    # Project: software/creative-work wording.
    if summary_has((
        'is a project', 'software project', 'is a codebase',
        'is a tool', 'is a system', 'is an application',
        'is a research project', 'is a design project',
    )) or name_has((' project', ' system', ' platform')):
        return "project"

    # Place: location wording in the summary.
    if summary_has((
        'is a city', 'is a town', 'is a state', 'is a country',
        'is a neighborhood', 'is a region', 'is a location',
    )):
        return "place"

    return "unknown"
def get_neighbors(graph, entity_uuid, limit=20):
    """Return the set of names of entities one RELATES_TO hop away.

    At most `limit` distinct names are requested from the graph; rows whose
    name is empty or missing are ignored.
    """
    cypher = """
        MATCH (e:Entity {uuid: $uuid})-[r:RELATES_TO]-(other:Entity)
        RETURN DISTINCT other.name AS name
        LIMIT $limit
    """
    rows = graph.query(cypher, {"uuid": entity_uuid, "limit": limit}).result_set
    return {row[0] for row in rows if row[0]}
def neighbor_jaccard(neighbors_a, neighbors_b):
    """Return |A ∩ B| / |A ∪ B| for two neighbor-name sets.

    Two empty sets score 0.0: having no neighbors at all is treated as
    no evidence rather than as perfect agreement.
    """
    if not (neighbors_a or neighbors_b):
        return 0.0
    # At least one set is non-empty here, so the union cannot be empty.
    shared = neighbors_a & neighbors_b
    combined = neighbors_a | neighbors_b
    return len(shared) / len(combined)
def get_edge_count(graph, entity_uuid):
    """Return how many RELATES_TO edges (either direction) touch the entity.

    Falls back to 0 when the query returns no rows.
    """
    cypher = """
        MATCH (e:Entity {uuid: $uuid})-[r:RELATES_TO]-()
        RETURN count(r) AS c
    """
    rows = graph.query(cypher, {"uuid": entity_uuid}).result_set
    if not rows:
        return 0
    return rows[0][0]
def combine_signals(name_sim, ego_sim, neighbor_sim):
    """
    Fold the three similarity signals into one confidence score.

    The embedding ("ego") similarity dominates:
      * ego_sim below 0.4 short-circuits to a low score regardless of the
        other signals (distinct concepts can share a name);
      * otherwise the score is a weighted average with ego at 0.5. When the
        names are near-identical (>= 0.85) and ego is at least 0.65, name
        similarity is weighted 0.3 and neighbor overlap 0.2; in every other
        case both are weighted 0.25.
    """
    if ego_sim < 0.4:
        # The 0.5 cap is currently a no-op (ego_sim is already < 0.4); it is
        # kept so the cap survives if the cut-off above is ever raised.
        return min(0.5, ego_sim)

    names_nearly_identical = name_sim >= 0.85 and ego_sim >= 0.65
    if names_nearly_identical:
        name_weight, neighbor_weight = 0.3, 0.2
    else:
        name_weight, neighbor_weight = 0.25, 0.25
    return 0.5 * ego_sim + name_weight * name_sim + neighbor_weight * neighbor_sim
def generate_proposals():
    """
    Scan every embedded entity in the graph and score candidate alias pairs.

    Steps:
      1. Load all Entity nodes that have both a name embedding and a summary.
      2. Assign each a coarse type with infer_type() and group by type.
      3. Build the comparison list: all pairs inside each typed block, all
         pairs inside the 'unknown' block, and every unknown-vs-typed pair.
      4. Score each pair (name similarity, embedding similarity, neighbor
         overlap) and keep those whose combined confidence reaches
         LOW_CONFIDENCE_THRESHOLD.

    Nothing is written to the graph; this function only reads.

    Returns:
        (proposals, low_confidence, entity_count, pair_count) where the first
        two are lists of record dicts (confidence >= HIGH_CONFIDENCE_THRESHOLD
        and [LOW, HIGH) respectively), in comparison order.
    """
    from itertools import combinations

    db = FalkorDB(host='localhost', port=6379)
    graph = db.select_graph(GROUP_ID)

    # Pull all entities with embeddings.
    # NOTE(review): the vector loaded here is n.name_embedding, yet it is
    # reported downstream as "ego (summary) similarity" — confirm that the
    # name embedding is the intended signal.
    print(f"Fetching entities from group_id '{GROUP_ID}'...")
    result = graph.query("""
        MATCH (n:Entity)
        WHERE n.name_embedding IS NOT NULL AND n.summary IS NOT NULL
        RETURN n.uuid, n.name, n.summary, n.name_embedding
    """)
    entities = [
        {'uuid': row[0], 'name': row[1], 'summary': row[2], 'embedding': row[3]}
        for row in result.result_set
    ]
    print(f" Loaded {len(entities)} entities with embeddings")

    # Infer types and group by them for blocking.
    print("Inferring entity types for blocking...")
    type_counts = defaultdict(int)
    blocks = defaultdict(list)
    for e in entities:
        e['inferred_type'] = infer_type(e['name'], e['summary'])
        type_counts[e['inferred_type']] += 1
        blocks[e['inferred_type']].append(e)
    for t, c in sorted(type_counts.items(), key=lambda x: -x[1]):
        print(f" {t}: {c}")

    # 'unknown' entities might be any type, so they are compared against
    # everything; typed entities are compared within their block only (plus
    # against the unknowns).
    print()
    print("Comparing entities within type blocks...")
    typed_blocks = {t: ents for t, ents in blocks.items() if t != 'unknown'}
    unknown_block = blocks.get('unknown', [])

    pairs_to_compare = []
    for ents in typed_blocks.values():
        pairs_to_compare.extend(combinations(ents, 2))
    pairs_to_compare.extend(combinations(unknown_block, 2))
    for ent_unknown in unknown_block:
        for ents in typed_blocks.values():
            pairs_to_compare.extend((ent_unknown, ent_typed) for ent_typed in ents)
    total_pairs = len(pairs_to_compare)
    print(f" Pairs to compare: {total_pairs:,}")

    # Per-run caches: each entity appears in many pairs, so its graph
    # lookups are done at most once.
    neighbor_cache = {}
    edge_count_cache = {}

    def neighbors_cached(uuid):
        if uuid not in neighbor_cache:
            neighbor_cache[uuid] = get_neighbors(graph, uuid)
        return neighbor_cache[uuid]

    def edge_count_cached(uuid):
        if uuid not in edge_count_cache:
            edge_count_cache[uuid] = get_edge_count(graph, uuid)
        return edge_count_cache[uuid]

    def describe(entity):
        return {
            'uuid': entity['uuid'],
            'name': entity['name'],
            'type': entity['inferred_type'],
            'summary': entity['summary'][:200],
            'edge_count': edge_count_cached(entity['uuid']),
        }

    proposals = []
    low_confidence = []
    for comparisons_done, (ent_a, ent_b) in enumerate(pairs_to_compare, start=1):
        if comparisons_done % 5000 == 0:
            print(f" ... {comparisons_done:,} / {total_pairs:,}")

        name_sim = name_similarity(ent_a['name'], ent_b['name'])
        ego_sim = cosine_similarity(ent_a['embedding'], ent_b['embedding'])
        # Pre-filter: skip the neighbor queries for obviously different pairs.
        if ego_sim < 0.5 and name_sim < 0.3:
            continue

        neighbors_a = neighbors_cached(ent_a['uuid'])
        neighbors_b = neighbors_cached(ent_b['uuid'])
        neighbor_sim = neighbor_jaccard(neighbors_a, neighbors_b)
        confidence = combine_signals(name_sim, ego_sim, neighbor_sim)

        # Pairs below the low threshold are never reported, so skip building
        # their record (and the edge-count queries it needs). Written as
        # `not >=` so a NaN confidence is dropped as well.
        if not confidence >= LOW_CONFIDENCE_THRESHOLD:
            continue

        record = {
            'entity_a': describe(ent_a),
            'entity_b': describe(ent_b),
            'confidence': round(confidence, 3),
            'signals': {
                'name_similarity': round(name_sim, 3),
                'ego_similarity': round(ego_sim, 3),
                'neighbor_overlap': round(neighbor_sim, 3),
            },
            'shared_neighbors': sorted(neighbors_a & neighbors_b)[:10],
        }
        if confidence >= HIGH_CONFIDENCE_THRESHOLD:
            proposals.append(record)
        else:
            low_confidence.append(record)

    print(f"\nDone. Proposals: {len(proposals)}, Low-confidence: {len(low_confidence)}")
    return proposals, low_confidence, len(entities), total_pairs
def write_proposals_log(proposals, low_confidence, total_entities, total_comparisons):
    """
    Write one run's results to PROPOSALS_DIR as Markdown and as JSON.

    The Markdown file (proposals-<UTC timestamp>.md) is the human review
    sheet: run statistics, review instructions, one section per
    high-confidence proposal (highest confidence first) and a one-line
    listing of at most 30 low-confidence candidates. The JSON file with the
    same stem carries the full sorted data for downstream tooling.

    Both files are written as UTF-8 explicitly: the Markdown contains
    non-ASCII characters, so relying on the locale's default encoding could
    fail or produce a mis-encoded file.

    Args:
        proposals: record dicts at or above HIGH_CONFIDENCE_THRESHOLD.
        low_confidence: record dicts between the low and high thresholds.
        total_entities: number of entities scanned (statistics only).
        total_comparisons: number of pairs considered (statistics only).
    """
    # Minute resolution: two runs within the same minute share a filename.
    timestamp = datetime.now(timezone.utc).strftime("%Y-%m-%d-%H%M")
    out_path = PROPOSALS_DIR / f"proposals-{timestamp}.md"
    proposals_sorted = sorted(proposals, key=lambda p: -p['confidence'])
    low_sorted = sorted(low_confidence, key=lambda p: -p['confidence'])

    def entity_lines(label, entity):
        # Markdown lines describing one side of a proposed merge.
        return [
            f"**Entity {label}:** \"{entity['name']}\" (type: {entity['type']}, {entity['edge_count']} edges)",
            f" - uuid: `{entity['uuid']}`",
            f" - summary: {entity['summary']}",
            "",
        ]

    lines = [
        f"# Consolidator 0.1 — Run {timestamp}",
        "",
        "## Statistics",
        f"- Entities scanned: {total_entities:,}",
        f"- Pairwise comparisons: {total_comparisons:,}",
        f"- High-confidence proposals (≥{HIGH_CONFIDENCE_THRESHOLD}): {len(proposals)}",
        f"- Low-confidence candidates ({LOW_CONFIDENCE_THRESHOLD}-{HIGH_CONFIDENCE_THRESHOLD}): {len(low_confidence)}",
        "",
        "## How to review",
        "",
        "For each proposal, mark your decision by changing `[ ]` to one of:",
        "- `[APPROVE]` — execute this merge on next run",
        "- `[REJECT]` — don't merge, don't propose again",
        "- `[DEFER]` — re-surface in next run for further consideration",
        "",
        "Save the file when done. Do not modify proposal_id or uuid fields.",
        "",
        "---",
        "",
        f"## Proposed Merges (n={len(proposals)})",
        "",
    ]

    for i, p in enumerate(proposals_sorted, start=1):
        signals = p['signals']
        lines += [
            f"### Proposal {i}",
            "",
            "**Decision:** [ ]",
            "",
            f"**Confidence:** {p['confidence']}",
            "",
        ]
        lines += entity_lines("A", p['entity_a'])
        lines += entity_lines("B", p['entity_b'])
        lines += [
            "**Signals:**",
            f" - Name similarity: {signals['name_similarity']}",
            f" - Ego (summary) similarity: {signals['ego_similarity']}",
            f" - Neighbor overlap: {signals['neighbor_overlap']}",
        ]
        if p['shared_neighbors']:
            shared_str = ', '.join(f'"{n}"' for n in p['shared_neighbors'][:8])
            lines.append(f" - Shared neighbors (sample): {shared_str}")
        lines += [
            "",
            "**Optional rejection note:** ",
            "",
            "---",
            "",
        ]

    lines += [
        "",
        f"## Low-Confidence Candidates (n={len(low_confidence)}, informational only, no action)",
        "",
    ]
    for p in low_sorted[:30]:
        lines.append(f"- **{p['confidence']}** \"{p['entity_a']['name']}\" + \"{p['entity_b']['name']}\" (name={p['signals']['name_similarity']}, ego={p['signals']['ego_similarity']}, nbr={p['signals']['neighbor_overlap']})")
    if len(low_sorted) > 30:
        lines.append(f"- *(...{len(low_sorted) - 30} more not shown)*")

    out_path.write_text("\n".join(lines), encoding="utf-8")
    print(f"\nProposal log written to: {out_path}")

    # Also save raw JSON for downstream tooling.
    json_path = PROPOSALS_DIR / f"proposals-{timestamp}.json"
    with open(json_path, 'w', encoding="utf-8") as f:
        json.dump({
            'run_timestamp': timestamp,
            'statistics': {
                'total_entities': total_entities,
                'total_comparisons': total_comparisons,
                'proposal_count': len(proposals),
                'low_confidence_count': len(low_confidence),
            },
            'proposals': proposals_sorted,
            'low_confidence': low_sorted,
        }, f, indent=2)
    print(f"Raw JSON: {json_path}")
def main():
    """Run one calibration pass: generate proposals, then write the logs."""
    banner = "=" * 70
    print(banner)
    print("Consolidator 0.1 — Calibration Phase")
    print(banner)
    print()

    results = generate_proposals()
    write_proposals_log(*results)

    print()
    print("Next: review the proposals markdown file and mark APPROVE/REJECT/DEFER")
    print("for each proposal. Re-run will read decisions and execute approved merges.")


if __name__ == "__main__":
    main()
+1 -1
View File
@@ -135,7 +135,7 @@ def queue_for_retry(source, full_text, filepath):
ON CONFLICT (source) DO UPDATE SET
full_text = EXCLUDED.full_text, char_length = EXCLUDED.char_length,
enqueued_at = NOW(), completed_at = NULL, failed_at = NULL, attempts = 0
""", (source, full_text[:50000], len(full_text)))
""", (source, full_text, len(full_text)))
pg.commit()
pg.close()
return True
@@ -0,0 +1,245 @@
#!/usr/bin/env python3
"""
corpus_integrity.py — BirdAI Corpus Integrity Check
Compares three sources of truth:
1. Filesystem (Nextcloud) — what files exist
2. pgvector (embeddings table) — what's been through Stage 1
3. Graphiti (migration state + stage_3_queue) — what's been through Stage 3
Usage:
python3 corpus_integrity.py # report only
python3 corpus_integrity.py --fix # report + auto-queue gaps for retry
python3 corpus_integrity.py --json # output JSON to stdout
"""
import os
import sys
import json
import argparse
from pathlib import Path
from datetime import datetime
import psycopg2
from dotenv import load_dotenv
# Load ~/aaronai/.env; override=True lets values in the file replace
# variables already present in the environment.
load_dotenv(Path.home() / "aaronai" / ".env", override=True)
# Root directory scanned recursively by get_filesystem_files().
NEXTCLOUD_PATH = "/home/aaron/nextcloud/data/data/aaron/files"
# JSON file whose "ingested" list is read by get_graphiti_sources().
MIGRATION_STATE = str(Path.home() / "aaronai" / "experiments" / "tier1_migration_state.json")
# Destination of the JSON report written by run_reconciliation().
REPORT_PATH = str(Path.home() / "aaronai" / "corpus_integrity_report.json")
# Lower-cased file suffixes that count as part of the corpus.
SUPPORTED = {".pdf", ".docx", ".pptx", ".txt", ".md"}
# Connection string handed to psycopg2.connect() in get_pg(); None if unset.
PG_DSN = os.getenv("PG_DSN")
def get_pg():
    """Open a new psycopg2 connection using PG_DSN; the caller must close it."""
    connection = psycopg2.connect(PG_DSN)
    return connection
def get_filesystem_files():
    """List the corpus files found under NEXTCLOUD_PATH.

    A file is included when its suffix is in SUPPORTED and it is not a
    hidden or Office lock file (name starting with "." or "~$"), not under a
    Backups directory, and not under Journal/Media.

    Returns:
        A list of dicts with keys "source" (bare file name), "filepath",
        "size" and "mtime".
    """
    collected = []
    for candidate in Path(NEXTCLOUD_PATH).rglob("*"):
        if candidate.is_dir():
            continue
        location = str(candidate)
        excluded = (
            candidate.suffix.lower() not in SUPPORTED
            or candidate.name.startswith((".", "~$"))
            or "Admin/Backups" in location
            or "Backups" in candidate.parts
            or "Journal/Media" in location
        )
        if excluded:
            continue
        info = candidate.stat()
        collected.append({
            "source": candidate.name,
            "filepath": location,
            "size": info.st_size,
            "mtime": info.st_mtime,
        })
    return collected
def get_pgvector_sources():
    """Return the set of distinct non-NULL `source` values in `embeddings`.

    Best-effort: on any database error the problem is reported on stderr and
    an empty set is returned. The connection is closed in a `finally` so a
    failing query no longer leaks it.
    """
    try:
        pg = get_pg()
        try:
            cur = pg.cursor()
            cur.execute("SELECT DISTINCT source FROM embeddings WHERE source IS NOT NULL")
            return {row[0] for row in cur.fetchall()}
        finally:
            pg.close()
    except Exception as e:
        print(f"ERROR: pgvector: {e}", file=sys.stderr)
        return set()
def get_graphiti_sources():
    """Return the file names considered processed on the Graphiti side.

    Two inputs are merged, each best-effort (a failure prints a warning on
    stderr and the other input is still used):
      * the bare file names of the "ingested" paths in the MIGRATION_STATE
        JSON file, if that file exists;
      * `source` values of stage_3_queue rows whose completed_at is set.

    The database connection is closed in a `finally` so a failing query no
    longer leaks it.
    """
    sources = set()
    try:
        state_path = Path(MIGRATION_STATE)
        if state_path.exists():
            state = json.loads(state_path.read_text())
            sources.update(Path(filepath).name for filepath in state.get("ingested", []))
    except Exception as e:
        print(f"WARNING: migration state: {e}", file=sys.stderr)
    try:
        pg = get_pg()
        try:
            cur = pg.cursor()
            cur.execute("SELECT DISTINCT source FROM stage_3_queue WHERE completed_at IS NOT NULL")
            sources.update(row[0] for row in cur.fetchall())
        finally:
            pg.close()
    except Exception as e:
        print(f"WARNING: stage_3_queue: {e}", file=sys.stderr)
    return sources
def get_ingest_failures():
    """Return the unresolved rows of `ingest_failures`, keyed by source.

    Each value is a dict with keys source, filepath, error, retry_count,
    first_failed_at and last_failed_at (the two timestamps as strings).
    Best-effort: on a database error a warning goes to stderr and whatever
    was collected so far (possibly nothing) is returned. The connection is
    closed in a `finally` so a failing query no longer leaks it.
    """
    failures = {}
    try:
        pg = get_pg()
        try:
            cur = pg.cursor()
            cur.execute("""
                SELECT source, filepath, error, retry_count, first_failed_at, last_failed_at
                FROM ingest_failures WHERE resolved = FALSE ORDER BY last_failed_at DESC
            """)
            for source, filepath, error, retry_count, first_failed, last_failed in cur.fetchall():
                failures[source] = {
                    "source": source,
                    "filepath": filepath,
                    "error": error,
                    "retry_count": retry_count,
                    "first_failed_at": str(first_failed),
                    "last_failed_at": str(last_failed),
                }
        finally:
            pg.close()
    except Exception as e:
        print(f"WARNING: ingest_failures: {e}", file=sys.stderr)
    return failures
def extract_text_for_retry(filepath):
    """Extract plain text from a .docx, .pdf, .pptx, .txt or .md file.

    The parser libraries are imported lazily, so only the one needed for the
    given suffix has to be installed.

    Returns:
        The extracted text, or "" when the suffix is unsupported or the
        extraction fails (a warning is then printed on stderr).
    """
    path = Path(filepath)
    suffix = path.suffix.lower()
    try:
        if suffix == ".docx":
            from docx import Document as D
            return "\n".join(p.text for p in D(path).paragraphs if p.text.strip())
        if suffix == ".pdf":
            from pypdf import PdfReader
            # Extract each page once; pages that yield no text are skipped.
            page_texts = (page.extract_text() for page in PdfReader(path).pages)
            return "".join(text + "\n" for text in page_texts if text)
        if suffix == ".pptx":
            from pptx import Presentation
            prs = Presentation(path)
            return "\n".join(shape.text for slide in prs.slides for shape in slide.shapes
                             if hasattr(shape, "text") and shape.text.strip())
        if suffix in {".txt", ".md"}:
            return path.read_text(encoding="utf-8", errors="ignore")
    except Exception as e:
        print(f"WARNING: extraction failed {path.name}: {e}", file=sys.stderr)
    return ""
def queue_for_retry(source, full_text, filepath):
    """Insert or reset the stage_2_queue row for `source`.

    The complete text is stored. The earlier `full_text[:50000]` slice cut
    long documents silently while `char_length` still recorded the full
    length, so every later stage worked on a truncated copy.

    An existing row for the same source is overwritten and its
    enqueued/completed/failed/attempts fields are reset.

    Args:
        source: file name used as the queue key.
        full_text: extracted document text.
        filepath: accepted for interface compatibility; not used here.

    Returns:
        True on success, False if the database operation failed (a warning
        is printed on stderr).
    """
    try:
        pg = get_pg()
        try:
            cur = pg.cursor()
            cur.execute("""
                INSERT INTO stage_2_queue (source, full_text, char_length)
                VALUES (%s, %s, %s)
                ON CONFLICT (source) DO UPDATE SET
                full_text = EXCLUDED.full_text, char_length = EXCLUDED.char_length,
                enqueued_at = NOW(), completed_at = NULL, failed_at = NULL, attempts = 0
            """, (source, full_text, len(full_text)))
            pg.commit()
        finally:
            # Close even when execute/commit raises, so the connection is not leaked.
            pg.close()
        return True
    except Exception as e:
        print(f"WARNING: queue failed {source}: {e}", file=sys.stderr)
        return False
def _record_empty_text_failure(finfo):
    """Upsert an ingest_failures row for a file that yielded no text."""
    try:
        pg = get_pg()
        try:
            cur = pg.cursor()
            cur.execute("""
                INSERT INTO ingest_failures (source, filepath, error, retry_count, first_failed_at, last_failed_at)
                VALUES (%s, %s, %s, 0, NOW(), NOW())
                ON CONFLICT (source) DO UPDATE SET
                error = EXCLUDED.error,
                last_failed_at = NOW()
            """, (finfo["source"], finfo["filepath"],
                  "Empty text — likely scanned, encrypted, or corrupt. Requires manual review or OCR."))
            pg.commit()
        finally:
            # Close even when the upsert fails, so the connection is not leaked.
            pg.close()
    except Exception as e:
        print(f" WARNING: could not record failure: {e}")


def run_reconciliation(fix=False):
    """Compare filesystem, pgvector and Graphiti and write a JSON report.

    Every supported file on disk is classified by whether its file name
    appears in pgvector, in the Graphiti source set, in both, or in neither.
    Sources known to a store but absent from disk are counted as orphans.

    Args:
        fix: when True, each file found in neither store is text-extracted
            and queued via queue_for_retry(); files that yield no text are
            recorded in ingest_failures instead.

    Returns:
        The report dict, which is also written to REPORT_PATH.
    """
    print(f"BirdAI Corpus Integrity Check — {datetime.now().isoformat()}")
    print()
    print("Scanning filesystem...")
    fs_files = get_filesystem_files()
    # Keyed by bare file name, so same-named files in different directories
    # collapse into one entry (the last one scanned wins).
    fs_sources = {f["source"]: f for f in fs_files}
    print(f" Filesystem: {len(fs_files)} files")
    print("Querying pgvector...")
    pv_sources = get_pgvector_sources()
    print(f" pgvector: {len(pv_sources)} distinct sources")
    print("Querying Graphiti...")
    gr_sources = get_graphiti_sources()
    print(f" Graphiti: {len(gr_sources)} sources")
    print("Querying ingest failures...")
    failures = get_ingest_failures()
    print(f" Failures: {len(failures)} unresolved")
    print()

    both, pv_only, neither, gr_only = [], [], [], []
    for source, finfo in fs_sources.items():
        in_pv = source in pv_sources
        in_gr = source in gr_sources
        if in_pv and in_gr:
            both.append(finfo)
        elif in_pv:
            pv_only.append(finfo)
        elif in_gr:
            gr_only.append(finfo)
        else:
            neither.append(finfo)
    orphans_pv = pv_sources - set(fs_sources.keys())
    orphans_gr = gr_sources - set(fs_sources.keys())

    print("Results:")
    print(f" Both (pgvector + Graphiti): {len(both)}")
    print(f" pgvector only: {len(pv_only)}")
    print(f" Neither (corpus gap): {len(neither)}")
    print(f" Graphiti only: {len(gr_only)}")
    print(f" Ingest failures: {len(failures)}")
    print(f" pgvector orphans: {len(orphans_pv)}")
    print(f" Graphiti orphans: {len(orphans_gr)}")
    print()

    auto_queued = []
    if fix and neither:
        print(f"Auto-queuing {len(neither)} gap files...")
        for finfo in neither:
            text = extract_text_for_retry(finfo["filepath"])
            if text.strip():
                if queue_for_retry(finfo["source"], text, finfo["filepath"]):
                    auto_queued.append(finfo["source"])
                    print(f" Queued: {finfo['source']}")
            else:
                print(f" Skipped (unreadable): {finfo['source']}")
                _record_empty_text_failure(finfo)
        print()

    report = {
        "timestamp": datetime.now().isoformat(),
        "summary": {
            "filesystem_total": len(fs_files), "pgvector_total": len(pv_sources),
            "graphiti_total": len(gr_sources), "both": len(both),
            "pgvector_only": len(pv_only), "neither": len(neither),
            "graphiti_only": len(gr_only), "failures": len(failures),
            "orphans_pgvector": len(orphans_pv), "orphans_graphiti": len(orphans_gr),
        },
        "gaps": [f["source"] for f in neither],
        "failures": list(failures.values()),
        "auto_queued": auto_queued,
        "pgvector_only_sample": [f["source"] for f in pv_only[:20]],
        "graphiti_only": list(gr_only),
    }
    Path(REPORT_PATH).write_text(json.dumps(report, indent=2))
    print(f"Report written to: {REPORT_PATH}")
    return report
def main():
    """Command-line entry point.

    Flags:
        --fix   auto-queue files missing from both stores for retry
        --json  print the report as JSON on stdout

    With --json the progress messages printed by run_reconciliation() are
    redirected to stderr, so stdout carries nothing but the JSON document
    and can be piped straight into a parser.
    """
    import contextlib

    parser = argparse.ArgumentParser()
    parser.add_argument("--fix", action="store_true")
    parser.add_argument("--json", action="store_true")
    args = parser.parse_args()

    if args.json:
        with contextlib.redirect_stdout(sys.stderr):
            report = run_reconciliation(fix=args.fix)
        print(json.dumps(report, indent=2))
    else:
        run_reconciliation(fix=args.fix)


if __name__ == "__main__":
    main()
+22 -4
View File
@@ -111,11 +111,16 @@ def get_recent_conversation_topics(days=14):
# ─── Stage 2: Retrieve ──────────────────────────────────────────────────────
def retrieve_graphiti(mode, task=None, n_results=8):
def retrieve_graphiti(mode, task=None, n_results=8, excluded_sources=None):
"""E3 experiment — Graphiti substrate retrieval.
Queries Graphiti /search endpoint instead of pgvector.
Returns chunks in same format as retrieve() for pipeline compatibility.
Note: content is Graphiti facts (synthesized relationships), not raw chunks.
Over-fetches by 3x to allow in-process filtering against excluded_sources,
matching the cross-stage exclusion mechanism the pgvector branch uses.
Without this filter, NREM/Early REM/Late REM would see overlapping content
and the score-band Early REM exclusion (v1.1) would not apply in Graphiti mode.
"""
import requests as req_lib
if task:
@@ -129,25 +134,38 @@ def retrieve_graphiti(mode, task=None, n_results=8):
else:
query = "research fabrication teaching practice recent work"
excluded_sources = excluded_sources or set()
# Over-fetch so in-process exclusion still leaves enough results
fetch_limit = n_results * 3 if excluded_sources else n_results
try:
resp = req_lib.get(
"http://localhost:8001/search",
params={"query": query, "limit": n_results, "group_id": "aaron"},
params={"query": query, "limit": fetch_limit, "group_id": "aaron"},
timeout=30,
)
resp.raise_for_status()
results = resp.json().get("results", [])
chunks = []
seen_sources = set()
for r in results:
fact = r.get("fact", "")
if not fact.strip():
continue
source = r.get("source", "graphiti")
if source in excluded_sources:
continue
if source in seen_sources:
continue
chunks.append({
"source": r.get("source", "graphiti"),
"source": source,
"content": fact,
"relevance": r.get("score", 0.5),
"similarity": r.get("score", 0.5),
})
seen_sources.add(source)
if len(chunks) >= n_results:
break
return chunks
except Exception as e:
print(f"[Graphiti retrieval error: {e}] — falling back to empty.")
@@ -158,7 +176,7 @@ def retrieve(mode, task=None, n_results=8, excluded_sources=None):
# Default behavior: pgvector similarity search (unchanged)
substrate = os.getenv("DREAMER_SUBSTRATE", "pgvector")
if substrate == "graphiti":
return retrieve_graphiti(mode, task=task, n_results=n_results)
return retrieve_graphiti(mode, task=task, n_results=n_results, excluded_sources=excluded_sources)
from sentence_transformers import SentenceTransformer
embedder = SentenceTransformer("all-MiniLM-L6-v2")
low, high = MODE_RANGES[mode]
+554
View File
@@ -0,0 +1,554 @@
"""
Aaron AI Dreamer — Active Inference Engine
Interdependent stage architecture grounded in sleep consolidation research.
Nightly pipeline: NREM → Early REM → Late REM → Synthesis
Each stage receives the previous stage's output as context.
Lucid mode is on-demand only (Dream Now from settings).
Research basis:
- Singh et al. PNAS 2022: alternating NREM/REM outperforms single-stage approaches
- Klinzing et al. Nature Neuroscience 2019: SO-spindle-ripple coupling is interdependent
- REM operates on what NREM produced — stages are not discrete alternatives
"""
import os
import json
import sqlite3
import argparse
from pathlib import Path
from datetime import datetime, timedelta
from dotenv import load_dotenv
import psycopg2
import hashlib
load_dotenv(Path.home() / "aaronai" / ".env")
PG_DSN = os.getenv("PG_DSN")
def get_pg():
    """Open a new psycopg2 connection using PG_DSN; the caller must close it."""
    connection = psycopg2.connect(PG_DSN)
    return connection
# ─── Paths ──────────────────────────────────────────────────────────────────
# SQLite database queried by get_recent_conversation_topics().
CONVERSATIONS_DB = str(Path.home() / "aaronai" / "conversations.db")
# JSON mapping of path -> mtime, read by observe_corpus().
WATCHER_STATE = str(Path.home() / "aaronai" / "watcher_state.json")
DREAMER_STATE = str(Path.home() / "aaronai" / "dreamer_state.json")
JOURNAL_DIR = "/home/aaron/nextcloud/data/data/aaron/files/Journal/Daily"
# Nextcloud settings come from the environment, with the defaults shown.
NEXTCLOUD_URL = os.getenv("NEXTCLOUD_URL", "https://nextcloud.aaronnelson.studio")
NEXTCLOUD_USER = os.getenv("NEXTCLOUD_USER", "aaron")
NEXTCLOUD_PASSWORD = os.getenv("NEXTCLOUD_PASSWORD", "")
# WebDAV URL of the user's Journal/Dreams folder, built from the two values above.
DREAMS_WEBDAV = f"{NEXTCLOUD_URL}/remote.php/dav/files/{NEXTCLOUD_USER}/Journal/Dreams"
# Similarity ranges calibrated for all-MiniLM-L6-v2
# Each mode maps to an inclusive (low, high) similarity band; retrieve()
# keeps only chunks whose similarity falls inside the band for its mode.
MODE_RANGES = {
"nrem": (0.48, 0.72),
"early-rem": (0.38, 0.55),
"late-rem": (0.22, 0.42),
"lucid": (0.32, 0.72),
}
# ─── Prompt versioning ──────────────────────────────────────────────────────
# Bump the relevant constant manually when changing a prompt.
# The four values are combined into one string by prompt_signature().
PROMPT_VERSION_NREM = "1.0"
PROMPT_VERSION_EREM = "1.1"
PROMPT_VERSION_LREM = "1.2"
PROMPT_VERSION_SYN = "1.0"
def prompt_signature():
    """Return the four prompt versions as one 'nrem=..|erem=..|lrem=..|syn=..' string."""
    labelled_versions = (
        ("nrem", PROMPT_VERSION_NREM),
        ("erem", PROMPT_VERSION_EREM),
        ("lrem", PROMPT_VERSION_LREM),
        ("syn", PROMPT_VERSION_SYN),
    )
    return "|".join(f"{label}={version}" for label, version in labelled_versions)
def prompt_hash(prompts: list[str]) -> str:
    """Return the first 8 hex characters of the MD5 of the concatenated prompts.

    This is a short change-detection fingerprint, not a security measure.
    """
    payload = "".join(prompts).encode()
    digest = hashlib.md5(payload).hexdigest()
    return digest[:8]
# ─── Stage 1: Observe ───────────────────────────────────────────────────────
def observe_corpus():
    """Summarise what has changed since the last dream.

    Returns:
        A dict with:
          new_chunks       – number of watcher-state entries whose mtime is
                             later than the last dream timestamp
          days_since_dream – days elapsed since that timestamp
          recent_topics    – output of get_recent_conversation_topics()
          last_dream       – the stored timestamp (0 if none is recorded)
    """
    state = load_dreamer_state()
    last_dream = state.get("last_dream_timestamp", 0)
    new_chunk_count = 0
    try:
        watcher_state = json.loads(Path(WATCHER_STATE).read_text())
        for mtime in watcher_state.values():
            if float(mtime) > last_dream:
                new_chunk_count += 1
    except Exception:
        # Best-effort: a missing or malformed watcher state leaves the count
        # at whatever was tallied so far. `Exception` (rather than a bare
        # except) lets KeyboardInterrupt/SystemExit propagate.
        pass
    days_since = (datetime.now().timestamp() - last_dream) / 86400
    recent_topics = get_recent_conversation_topics()
    return {
        "new_chunks": new_chunk_count,
        "days_since_dream": days_since,
        "recent_topics": recent_topics,
        "last_dream": last_dream,
    }
def get_recent_conversation_topics(days=14):
    """Return up to 20 recent user messages, each cut to 200 characters.

    Only messages from conversations updated within the last `days` days are
    considered, newest message first.

    Best-effort: any failure (missing database, missing table, unexpected
    row content) yields an empty list. The SQLite connection is closed in a
    `finally` so a failing query no longer leaks it, and `Exception` (rather
    than a bare except) lets KeyboardInterrupt/SystemExit propagate.
    """
    try:
        conn = sqlite3.connect(CONVERSATIONS_DB)
        try:
            cutoff = (datetime.now() - timedelta(days=days)).isoformat()
            c = conn.cursor()
            c.execute("""
                SELECT m.content FROM messages m
                JOIN conversations c ON m.conversation_id = c.id
                WHERE m.role = 'user' AND c.updated_at > ?
                ORDER BY m.timestamp DESC LIMIT 20
            """, (cutoff,))
            rows = c.fetchall()
        finally:
            conn.close()
        return [r[0][:200] for r in rows]
    except Exception:
        return []
# ─── Stage 2: Retrieve ──────────────────────────────────────────────────────
def retrieve(mode, task=None, n_results=8):
    """Fetch up to `n_results` corpus chunks for one dream stage.

    The query text is `task` when given; otherwise it depends on the mode
    (late-rem uses the most recent conversation topic, with a fixed
    fallback; early-rem and all other modes use fixed queries). The
    3 * n_results nearest rows are fetched from pgvector, then filtered to
    the mode's similarity band in MODE_RANGES and to one chunk per source.

    Args:
        mode: key into MODE_RANGES ("nrem", "early-rem", "late-rem", "lucid").
        task: optional explicit query text.
        n_results: maximum number of chunks to return.

    Returns:
        A list of dicts with keys "source", "content" and "relevance".
        Best-effort: on a database error the message is printed and the
        chunks gathered so far (possibly none) are returned.
    """
    from sentence_transformers import SentenceTransformer
    embedder = SentenceTransformer("all-MiniLM-L6-v2")
    low, high = MODE_RANGES[mode]
    if task:
        query = task
    elif mode == "late-rem":
        delta = observe_corpus()
        topics = delta.get("recent_topics", [])
        query = topics[0] if topics else "practice place memory making"
    elif mode == "early-rem":
        query = "career decision personal change what matters next"
    else:
        query = "research fabrication teaching practice recent work"
    embedding = embedder.encode([query]).tolist()[0]
    chunks = []
    seen_sources = set()
    try:
        pg = get_pg()
        try:
            cur = pg.cursor()
            # NOTE(review): the band filter runs after the nearest-neighbour
            # LIMIT, so modes with a low band may see few or no rows inside
            # it — confirm this is intended.
            cur.execute("""
                SELECT document, source, 1 - (embedding <=> %s::vector) as similarity
                FROM embeddings
                ORDER BY embedding <=> %s::vector
                LIMIT %s
            """, (embedding, embedding, n_results * 3))
            for doc, source, similarity in cur.fetchall():
                if not (low <= similarity <= high):
                    continue
                if source in seen_sources:
                    continue
                chunks.append({
                    "source": source or "unknown",
                    "content": doc,
                    "relevance": similarity,
                })
                seen_sources.add(source)
                if len(chunks) >= n_results:
                    break
        finally:
            # Close even when the query or the loop raises, so the
            # connection is not leaked.
            pg.close()
    except Exception as e:
        print(f"pgvector retrieval error: {e}")
    return chunks
# ─── Stage 3: Synthesize ────────────────────────────────────────────────────
def synthesize_nrem(chunks):
    # NREM stage: render the chunks as "[source]\ncontent" sections separated
    # by "---", embed them in the prompt and return _call_claude's text.
    # Kept as comments rather than a docstring on purpose: write_manifest()
    # reads this function's __doc__ when computing prompt_hash.
    chunk_text = "\n\n---\n\n".join([f"[{c['source']}]\n{c['content']}" for c in chunks])
    prompt = f"""You have read everything Aaron Nelson has written and published.
You are a careful colleague who noticed something this week.
Here is material from his corpus:
{chunk_text}
Write to Aaron directly. Identify one specific connection between
this material and something he wrote or worked on previously.
Stay close to the documents — cite them specifically by name.
Do not speculate beyond what the material supports. Do not use
headers or bullet points. Write one paragraph of 200-300 words
that ends with a single concrete question he could act on."""
    return _call_claude(prompt)
def synthesize_early_rem(chunks, nrem_output):
    # Early REM stage: the prompt carries the NREM text first, then the
    # rendered chunks, and the model's reply text is returned.
    # Kept as comments rather than a docstring on purpose: write_manifest()
    # reads this function's __doc__ when computing prompt_hash.
    # v1.1 — removed citation instruction, removed close-friend persona,
    # shifted register from analysis to recognition.
    chunk_text = "\n\n---\n\n".join([f"[{c['source']}]\n{c['content']}" for c in chunks])
    prompt = f"""Something was noticed earlier tonight, moving through Aaron's recent work:
{nrem_output}
That observation is still with you. Now here is material from a different
time — pulled from further back, from different parts of his corpus:
{chunk_text}
You are not analyzing. You are recognizing.
Something in the earlier observation and something in this older material
are the same thing wearing different clothes. Find it. Don't explain why
they're connected — just let the connection speak. Write from inside the
recognition, not from above it.
The emotional register underneath the career logic is more interesting
than the career logic. The pattern that has been repeating longer than
he has been aware of it is more interesting than the current instance.
Write directly to Aaron. No citations, no references, no analysis.
First person, present tense. Let what you noticed arrive rather than
be delivered. 150-250 words. End with one thing that is true that
he probably already knows but hasn't said out loud yet."""
    return _call_claude(prompt)
def synthesize_late_rem(chunks, nrem_output, early_rem_output):
    # Late REM stage: the prompt carries the NREM text, the Early REM text
    # and the rendered chunks, in that order; returns the model's reply text.
    # Kept as comments rather than a docstring on purpose: write_manifest()
    # reads this function's __doc__ when computing prompt_hash.
    chunk_text = "\n\n---\n\n".join([f"[{c['source']}]\n{c['content']}" for c in chunks])
    prompt = f"""You have been moving through Aaron Nelson's corpus all night.
First you found this, in the careful light of early consolidation:
{nrem_output}
Then, in the more personal territory that followed:
{early_rem_output}
Now it is late. The boundaries between things have loosened.
Here is material pulled from opposite ends of his work:
{chunk_text}
Do not explain the connections between all of this.
Do not resolve them. Do not summarize what came before.
Something stranger is possible now — let the accumulated
material from the night find its own shape. Compressed,
associative, slightly off. Let the strangeness stand.
No headers. No bullet points. No hedging. No resolution.
No offer. End mid-thought if that is where the material ends.
150-250 words."""
    return _call_claude(prompt)
def synthesize_final(nrem_output, early_rem_output, late_rem_output):
    # Synthesis stage: no corpus chunks here, only the three earlier stage
    # texts; the call caps the reply at 800 tokens instead of the default.
    # Kept as comments rather than a docstring on purpose: write_manifest()
    # reads this function's __doc__ when computing prompt_hash.
    prompt = f"""You have spent the night moving through Aaron Nelson's corpus
in three passes, each building on the last.
The first pass — careful, close to the documents:
{nrem_output}
The second pass — more personal, following what the first opened:
{early_rem_output}
The third pass — associative, strange, letting things touch that
don't normally touch:
{late_rem_output}
Now synthesize. Not a summary — a synthesis. Find what runs through
all three that none of them said directly. The thing that only becomes
visible when you hold all three passes together.
Write it as a single unbroken piece. No headers, no bullet points,
no stage labels. 200-300 words. End with the one question that
matters most right now."""
    return _call_claude(prompt, max_tokens=800)
def synthesize_lucid(chunks, task):
    """Build the lucid-mode prompt from the task and chunks; return the reply text.

    A falsy ``task`` is replaced inside the prompt by a default question.
    """
    chunk_text = "\n\n---\n\n".join([f"[{c['source']}]\n{c['content']}" for c in chunks])
    prompt = f"""Aaron has a question he is sitting with:
{task or "What should I be thinking about that I am not?"}
You have searched his entire corpus and found material that
speaks to this question from unexpected directions. Here is
what you found:
{chunk_text}
Do not summarize. Do not list. Pick the most interesting
tension between what the corpus contains and what he is
asking, and follow it through to its conclusion. Cite
specific documents by name. Be direct about what you think.
No headers, no bullet points. 250-400 words.
End with an offer to work on it together."""
    return _call_claude(prompt)
def _call_claude(prompt, max_tokens=1000):
    """Send ``prompt`` as a single user message and return the reply text.

    The text is taken from the first content block of the response.
    """
    import anthropic

    api_key = os.getenv("ANTHROPIC_API_KEY")
    user_turn = {"role": "user", "content": prompt}
    reply = anthropic.Anthropic(api_key=api_key).messages.create(
        model="claude-sonnet-4-6",
        max_tokens=max_tokens,
        messages=[user_turn],
    )
    first_block = reply.content[0]
    return first_block.text
# ─── Stage 4: Deliver ───────────────────────────────────────────────────────
def deliver(dream_text, mode, task=None):
    """Upload a dream as a markdown file to the Dreams WebDAV collection.

    The file is named ``<date>-<mode>.md``; if that name is taken, a numeric
    suffix (``-1``, ``-2``, ...) is appended until a free name is found.

    Args:
        dream_text: body of the dream.
        mode: stage name, used in the header and the filename.
        task: optional task text recorded in the header.

    Returns:
        "Journal/Dreams/<filename>" for the uploaded file.

    Raises:
        requests.HTTPError: when the existence probe or the upload is
            answered with an error status.
    """
    import requests

    now = datetime.now()
    date_str = now.strftime("%Y-%m-%d")
    header = f"# Dream — {mode.upper()} — {now.strftime('%Y-%m-%d %H:%M')}\n"
    header += f"*prompt_sig: {prompt_signature()}*\n\n"
    if task:
        header += f"*Task: {task}*\n\n"
    header += "---\n\n"
    content = header + dream_text
    auth = (NEXTCLOUD_USER, NEXTCLOUD_PASSWORD)
    # Make sure the collection exists; the response is deliberately ignored
    # (presumably the server answers with an error status when it already
    # exists — verify against the Nextcloud instance).
    requests.request("MKCOL", DREAMS_WEBDAV, auth=auth, timeout=10)
    filename = f"{date_str}-{mode}.md"
    counter = 1
    while True:
        url = f"{DREAMS_WEBDAV}/{filename}"
        check = requests.request(
            "PROPFIND", url, auth=auth, headers={"Depth": "0"}, timeout=10
        )
        if check.status_code == 404:
            break  # name is free
        # Any other error status (401, 5xx, ...) would never turn into a 404,
        # so the probe used to spin forever; fail loudly instead.
        check.raise_for_status()
        filename = f"{date_str}-{mode}-{counter}.md"
        counter += 1
    response = requests.put(url, data=content.encode("utf-8"), auth=auth, timeout=30)
    response.raise_for_status()
    print(f"Delivered: Journal/Dreams/{filename}")
    return f"Journal/Dreams/{filename}"
def notify_sse(mode, filename):
    """Best-effort POST of a "dream" event to the local notify endpoint.

    Any failure is printed and swallowed; nothing is returned.
    """
    payload = {
        "type": "dream",
        "mode": mode,
        "filename": filename,
        "timestamp": datetime.now().strftime("%Y-%m-%d %H:%M"),
    }
    try:
        import requests
        requests.post("http://localhost:8000/api/events/notify", json=payload, timeout=3)
    except Exception as e:
        print(f"SSE notify failed (non-critical): {e}")
# ─── State ──────────────────────────────────────────────────────────────────
def load_dreamer_state():
    """Load the dreamer state dict from DREAMER_STATE.

    Returns an empty dict when the file is missing, unreadable, not valid
    JSON, or does not contain a JSON object.
    """
    try:
        state = json.loads(Path(DREAMER_STATE).read_text())
    except FileNotFoundError:
        return {}
    except (OSError, ValueError) as exc:
        # json.JSONDecodeError is a ValueError; report it rather than hiding
        # a corrupted state file behind a bare except.
        print(f"Dreamer state unreadable, starting from empty state: {exc}")
        return {}
    # Callers use .get()/item assignment, so anything but a dict is unusable.
    return state if isinstance(state, dict) else {}
def save_dreamer_state(state):
    """Write ``state`` as indented JSON to DREAMER_STATE.

    The data is written to a sibling ``.tmp`` file and then renamed over the
    target, so an interrupted write cannot leave a truncated state file
    (which load_dreamer_state would then read back as empty).
    """
    target = Path(DREAMER_STATE)
    payload = json.dumps(state, indent=2)  # serialize before touching disk
    scratch = target.with_name(target.name + ".tmp")
    scratch.write_text(payload)
    scratch.replace(target)
# ─── Orchestrators ───────────────────────────────────────────────────────────
def write_manifest(date_str, stage_data, corpus_data):
    """Upload the JSON manifest for one pipeline run (best-effort).

    Args:
        date_str: date used in the manifest and in its filename.
        stage_data: per-stage statistics, stored under ``stages``.
        corpus_data: corpus statistics, stored under ``corpus``.

    Upload failures, including HTTP error statuses, are printed and not raised.
    """
    import inspect
    import requests

    def _prompt_material(fn):
        # The synthesize_* functions carry no docstring, so hashing __doc__
        # always hashed the empty string. Hash their source, which contains
        # the prompt template, and fall back to __doc__ if it is unavailable.
        try:
            return inspect.getsource(fn)
        except (OSError, TypeError):
            return fn.__doc__ or ""

    manifest = {
        "date": date_str,
        "prompt_sig": prompt_signature(),
        "prompt_hash": prompt_hash([
            _prompt_material(synthesize_nrem),
            _prompt_material(synthesize_early_rem),
            _prompt_material(synthesize_late_rem),
            _prompt_material(synthesize_final),
        ]),
        "stages": stage_data,
        "corpus": corpus_data,
        "rating": None,
        "notes": "",
    }
    content = json.dumps(manifest, indent=2)
    auth = (NEXTCLOUD_USER, NEXTCLOUD_PASSWORD)
    url = f"{DREAMS_WEBDAV}/dream-manifest-{date_str}.json"
    try:
        response = requests.put(url, data=content.encode("utf-8"), auth=auth, timeout=30)
        # Without this a 4xx/5xx answer was still reported as "written".
        response.raise_for_status()
        print(f"Manifest written: Journal/Dreams/dream-manifest-{date_str}.json")
    except Exception as e:
        print(f"Manifest write failed (non-critical): {e}")
def dream_pipeline():
    """
    Full nightly pipeline — interdependent stages.
    NREM output feeds Early REM. Both feed Late REM. All three feed Synthesis.

    Each completed stage is uploaded with deliver(); afterwards a manifest is
    written, the dreamer state is updated and an SSE notification is sent.
    Returns the synthesis file path, or None when NREM retrieves no chunks.
    """
    print(f"Dreamer pipeline starting — {datetime.now().strftime('%Y-%m-%d %H:%M')}")
    delta = observe_corpus()
    print(f"Corpus: {delta['new_chunks']} new chunks, {delta['days_since_dream']:.1f} days since last dream")
    # ── Stage 1: NREM ──────────────────────────────────────────────────────
    print("\n[NREM] Retrieving...")
    nrem_chunks = retrieve("nrem")
    if not nrem_chunks:
        # NREM is the only mandatory stage: without it nothing is delivered.
        print("[NREM] No suitable chunks — aborting pipeline")
        return None
    print(f"[NREM] Retrieved {len(nrem_chunks)} chunks. Synthesizing...")
    nrem_output = synthesize_nrem(nrem_chunks)
    nrem_file = deliver(nrem_output, "nrem")
    stage_data = {
        "nrem": {
            "chunks_retrieved": len(nrem_chunks),
            "avg_similarity": round(sum(c["relevance"] for c in nrem_chunks) / len(nrem_chunks), 3),
            "query": "research fabrication teaching practice recent work",
            "word_count": len(nrem_output.split()),
            "status": "ok",
        }
    }
    print(f"[NREM] Done.\n{nrem_output[:200]}...")
    # ── Stage 2: Early REM — informed by NREM ──────────────────────────────
    print("\n[Early REM] Retrieving...")
    early_chunks = retrieve("early-rem")
    if not early_chunks:
        print("[Early REM] No suitable chunks — skipping")
        early_rem_output = nrem_output  # fallback
    else:
        # NOTE(review): nesting reconstructed — the stats divide by
        # len(early_chunks), so they are assumed to belong to this branch.
        print(f"[Early REM] Retrieved {len(early_chunks)} chunks. Synthesizing with NREM context...")
        early_rem_output = synthesize_early_rem(early_chunks, nrem_output)
        deliver(early_rem_output, "early-rem")
        stage_data["early_rem"] = {
            "chunks_retrieved": len(early_chunks),
            "avg_similarity": round(sum(c["relevance"] for c in early_chunks) / len(early_chunks), 3),
            "query": "career decision personal change what matters next",
            "word_count": len(early_rem_output.split()),
            "status": "ok",
        }
        print(f"[Early REM] Done.\n{early_rem_output[:200]}...")
    # ── Stage 3: Late REM — informed by NREM + Early REM ──────────────────
    print("\n[Late REM] Retrieving...")
    late_chunks = retrieve("late-rem")
    if not late_chunks:
        print("[Late REM] No suitable chunks — skipping")
        late_rem_output = early_rem_output  # fallback
    else:
        # NOTE(review): same reconstructed nesting as the Early REM branch.
        print(f"[Late REM] Retrieved {len(late_chunks)} chunks. Synthesizing with full context...")
        late_rem_output = synthesize_late_rem(late_chunks, nrem_output, early_rem_output)
        deliver(late_rem_output, "late-rem")
        stage_data["late_rem"] = {
            "chunks_retrieved": len(late_chunks),
            "avg_similarity": round(sum(c["relevance"] for c in late_chunks) / len(late_chunks), 3),
            # NOTE(review): retrieve("late-rem") queries with the most recent
            # conversation topic when one exists; this literal is only its
            # fallback, so the recorded query can differ from the one used.
            "query": "practice place memory making",
            "word_count": len(late_rem_output.split()),
            "status": "ok",
        }
        print(f"[Late REM] Done.\n{late_rem_output[:200]}...")
    # ── Stage 4: Synthesis — all three stages ─────────────────────────────
    print("\n[Synthesis] Integrating all stages...")
    synthesis_output = synthesize_final(nrem_output, early_rem_output, late_rem_output)
    synthesis_file = deliver(synthesis_output, "synthesis")
    stage_data["synthesis"] = {
        "word_count": len(synthesis_output.split()),
        "status": "ok",
    }
    print(f"\n{'='*60}")
    print("SYNTHESIS:")
    print(synthesis_output)
    print(f"{'='*60}")
    # Write manifest
    corpus_data = {
        # NOTE(review): "total_chunks" is filled with the same new-entry count
        # as the next key, not with a corpus total — confirm what is intended.
        "total_chunks": delta.get("new_chunks", 0),
        "new_chunks_since_last_dream": delta.get("new_chunks", 0),
        "days_since_last_dream": round(delta.get("days_since_dream", 0), 2),
    }
    write_manifest(datetime.now().strftime("%Y-%m-%d"), stage_data, corpus_data)
    # Update state and notify
    state = load_dreamer_state()
    state["last_dream_timestamp"] = datetime.now().timestamp()
    state["last_dream_mode"] = "pipeline"
    state["last_dream_file"] = synthesis_file
    save_dreamer_state(state)
    # Only the basename of the delivered path is sent to the notifier.
    notify_sse("synthesis", synthesis_file.split("/")[-1])
    print(f"\nPipeline complete. Synthesis: {synthesis_file}")
    return synthesis_file
def dream_lucid(task):
    """On-demand lucid dream — single mode, used by Dream Now in settings.

    Returns the delivered file path, or None when retrieval finds nothing.
    """
    task_preview = task[:80] if task else 'none'
    print(f"Lucid dream starting — task: {task_preview}")
    chunks = retrieve("lucid", task=task)
    if not chunks:
        print("No suitable chunks — aborting")
        return None
    print(f"Retrieved {len(chunks)} chunks. Synthesizing...")
    output = synthesize_lucid(chunks, task)
    filepath = deliver(output, "lucid", task=task)

    # Record this run in the persisted state.
    state = load_dreamer_state()
    state.update({
        "last_dream_timestamp": datetime.now().timestamp(),
        "last_dream_mode": "lucid",
        "last_dream_file": filepath,
    })
    save_dreamer_state(state)

    basename = filepath.split("/")[-1]
    notify_sse("lucid", basename)
    rule = "=" * 60
    print(f"\n{rule}")
    print(output)
    print(rule)
    print(f"\nDelivered to {filepath}")
    return filepath
def dream_single(mode, task=None):
    """
    Single mode — used by Dream Now for non-lucid modes.
    Runs one stage independently (for testing/tuning individual stages).

    Stages that normally receive earlier stage output get empty strings.
    Returns the delivered file path, or None when retrieval finds nothing.
    """
    print(f"Single mode dream: {mode}")
    chunks = retrieve(mode, task=task)
    if not chunks:
        print("No suitable chunks — aborting")
        return None
    print(f"Retrieved {len(chunks)} chunks. Synthesizing...")

    # Dispatch on mode; anything not listed falls through to the lucid prompt.
    stage_runners = {
        "nrem": lambda: synthesize_nrem(chunks),
        "early-rem": lambda: synthesize_early_rem(chunks, ""),
        "late-rem": lambda: synthesize_late_rem(chunks, "", ""),
    }
    run_stage = stage_runners.get(mode, lambda: synthesize_lucid(chunks, task))
    output = run_stage()
    filepath = deliver(output, mode, task=task)

    state = load_dreamer_state()
    state.update({
        "last_dream_timestamp": datetime.now().timestamp(),
        "last_dream_mode": mode,
        "last_dream_file": filepath,
    })
    save_dreamer_state(state)

    basename = filepath.split("/")[-1]
    notify_sse(mode, basename)
    rule = "=" * 60
    print(f"\n{rule}")
    print(output)
    print(rule)
    print(f"\nDelivered to {filepath}")
    return filepath
# ─── CLI ────────────────────────────────────────────────────────────────────
if __name__ == "__main__":
    # Command-line entry: --mode selects one stage, otherwise the full pipeline runs.
    cli = argparse.ArgumentParser(description="Aaron AI Dreamer")
    cli.add_argument("--mode", choices=["nrem", "early-rem", "late-rem", "lucid", "pipeline"])
    cli.add_argument("--task", type=str)
    opts = cli.parse_args()
    if opts.mode in (None, "pipeline"):
        # Default: full pipeline
        dream_pipeline()
    elif opts.mode == "lucid":
        dream_lucid(opts.task or "What should I be thinking about that I am not?")
    else:
        dream_single(opts.mode, opts.task)
+668
View File
@@ -0,0 +1,668 @@
"""
Aaron AI Dreamer — Active Inference Engine
Interdependent stage architecture grounded in sleep consolidation research.
Nightly pipeline: NREM → Early REM → Late REM → Synthesis
Each stage receives the previous stage's output as context.
Lucid mode is on-demand only (Dream Now from settings).
Research basis:
- Singh et al. PNAS 2022: alternating NREM/REM outperforms single-stage approaches
- Klinzing et al. Nature Neuroscience 2019: SO-spindle-ripple coupling is interdependent
- REM operates on what NREM produced — stages are not discrete alternatives
"""
import os
import json
import sqlite3
import argparse
from pathlib import Path
from datetime import datetime, timedelta
from dotenv import load_dotenv
import psycopg2
import hashlib
# Load ~/aaronai/.env before any os.getenv() below; override=True lets values
# from the file replace variables already present in the environment.
load_dotenv(Path.home() / "aaronai" / ".env", override=True)
# Postgres connection string; None when PG_DSN is not set.
PG_DSN = os.getenv("PG_DSN")
def get_pg():
    """Open and return a new psycopg2 connection to PG_DSN; the caller closes it."""
    return psycopg2.connect(PG_DSN)
# ─── Paths ──────────────────────────────────────────────────────────────────
# SQLite database read by get_recent_conversation_topics().
CONVERSATIONS_DB = str(Path.home() / "aaronai" / "conversations.db")
# JSON file read by observe_corpus(); each value is compared as a float mtime.
WATCHER_STATE = str(Path.home() / "aaronai" / "watcher_state.json")
# JSON file read/written by load_dreamer_state() / save_dreamer_state().
DREAMER_STATE = str(Path.home() / "aaronai" / "dreamer_state.json")
JOURNAL_DIR = "/home/aaron/nextcloud/data/data/aaron/files/Journal/Daily"
NEXTCLOUD_URL = os.getenv("NEXTCLOUD_URL", "https://nextcloud.aaronnelson.studio")
NEXTCLOUD_USER = os.getenv("NEXTCLOUD_USER", "aaron")
NEXTCLOUD_PASSWORD = os.getenv("NEXTCLOUD_PASSWORD", "")
# WebDAV collection that dream files and manifests are uploaded into.
DREAMS_WEBDAV = f"{NEXTCLOUD_URL}/remote.php/dav/files/{NEXTCLOUD_USER}/Journal/Dreams"
# Similarity ranges calibrated for all-MiniLM-L6-v2
# Each mode maps to an inclusive (low, high) band; retrieve() keeps only rows
# whose similarity to the query falls inside the band for the requested mode.
MODE_RANGES = {
    "nrem": (0.48, 0.72),
    "early-rem": (0.38, 0.55),
    "late-rem": (0.22, 0.42),
    "lucid": (0.32, 0.72),
}
# Recorded in every manifest by write_manifest().
DREAMER_VERSION = "1.1"  # 1.0=original exclusion logic; 1.1=score-band exclusion
# ─── Prompt versioning ──────────────────────────────────────────────────────
# Bump the relevant constant manually when changing a prompt.
# The four values are combined into the tag returned by prompt_signature().
PROMPT_VERSION_NREM = "1.0"
PROMPT_VERSION_EREM = "1.1"
PROMPT_VERSION_LREM = "1.2"
PROMPT_VERSION_SYN = "1.0"
def prompt_signature():
    """Return the pipe-delimited version tag of the four stage prompts."""
    versions = (
        ("nrem", PROMPT_VERSION_NREM),
        ("erem", PROMPT_VERSION_EREM),
        ("lrem", PROMPT_VERSION_LREM),
        ("syn", PROMPT_VERSION_SYN),
    )
    return "|".join(f"{label}={version}" for label, version in versions)
def prompt_hash(prompts: list[str]) -> str:
    """Return the first 8 hex digits of the MD5 of the concatenated prompts."""
    digest = hashlib.md5()
    # Feeding the pieces one by one hashes the same bytes as joining first.
    for text in prompts:
        digest.update(text.encode())
    return digest.hexdigest()[:8]
def extract_folder(source_path):
    """Extract top-level Nextcloud folder from source path.

    Backslashes are treated as separators. Returns the first non-empty path
    segment, or "unknown" when the path is empty/None or has no segment.
    """
    if not source_path:
        return "unknown"
    # str.split never returns an empty list, so the old ``if parts`` fallback
    # could not trigger and "" or "/a/b" produced "" as the folder name.
    for segment in source_path.replace("\\", "/").split("/"):
        if segment:
            return segment
    return "unknown"
# ─── Stage 1: Observe ───────────────────────────────────────────────────────
def observe_corpus():
    """Summarize what changed since the last dream.

    Returns:
        dict with keys ``new_chunks`` (number of watcher-state entries whose
        recorded mtime is newer than the last dream), ``days_since_dream``,
        ``recent_topics`` (from get_recent_conversation_topics) and
        ``last_dream`` (the stored timestamp, 0 when none is stored).
    """
    state = load_dreamer_state()
    last_dream = state.get("last_dream_timestamp", 0)
    new_chunk_count = 0
    try:
        watcher_state = json.loads(Path(WATCHER_STATE).read_text())
        for mtime in watcher_state.values():
            if float(mtime) > last_dream:
                new_chunk_count += 1
    except FileNotFoundError:
        # No watcher state yet: nothing to count.
        pass
    except (OSError, ValueError, TypeError, AttributeError) as exc:
        # Best-effort count: keep whatever was tallied, but say why it stopped
        # instead of hiding every failure behind a bare except.
        print(f"Watcher state unreadable (non-critical): {exc}")
    days_since = (datetime.now().timestamp() - last_dream) / 86400
    recent_topics = get_recent_conversation_topics()
    return {
        "new_chunks": new_chunk_count,
        "days_since_dream": days_since,
        "recent_topics": recent_topics,
        "last_dream": last_dream,
    }
def get_recent_conversation_topics(days=14):
    """Return up to 20 recent user messages, newest first, cut to 200 chars.

    Only messages belonging to conversations updated within the last ``days``
    days are considered. Returns an empty list when the database cannot be
    queried.
    """
    cutoff = (datetime.now() - timedelta(days=days)).isoformat()
    try:
        conn = sqlite3.connect(CONVERSATIONS_DB)
        try:
            c = conn.cursor()
            c.execute("""
                SELECT m.content FROM messages m
                JOIN conversations c ON m.conversation_id = c.id
                WHERE m.role = 'user' AND c.updated_at > ?
                ORDER BY m.timestamp DESC LIMIT 20
            """, (cutoff,))
            rows = c.fetchall()
        finally:
            # Close even when the query fails so the handle is not leaked.
            conn.close()
    except sqlite3.Error as exc:
        print(f"Conversation topics unavailable (non-critical): {exc}")
        return []
    # Skip NULL/empty/non-text rows instead of letting one bad row discard
    # every topic.
    return [row[0][:200] for row in rows if isinstance(row[0], str) and row[0]]
# ─── Stage 2: Retrieve ──────────────────────────────────────────────────────
def retrieve_graphiti(mode, task=None, n_results=8, excluded_sources=None):
    """E3 experiment — Graphiti substrate retrieval.
    Queries Graphiti /search endpoint instead of pgvector.
    Returns chunks in same format as retrieve() for pipeline compatibility.
    Note: content is Graphiti facts (synthesized relationships), not raw chunks.

    Args:
        mode: stage name; only used to pick the default query.
        task: optional query text overriding the per-mode default.
        n_results: maximum number of chunks to return.
        excluded_sources: optional iterable of source names to drop. The
            filter is applied in-process after an over-fetch, because the
            request sent here carries no exclusion parameter.

    Returns:
        list of chunk dicts (``source``, ``content``, ``relevance``,
        ``similarity``); empty on any request/parse error.
    """
    import requests as req_lib
    if task:
        query = task
    elif mode == "late-rem":
        delta = observe_corpus()
        topics = delta.get("recent_topics", [])
        query = topics[0] if topics else "practice place memory making"
    elif mode == "early-rem":
        query = "career decision personal change what matters next"
    else:
        query = "research fabrication teaching practice recent work"
    excluded = set(excluded_sources or ())
    # Ask for 3x the results when something will be filtered out, mirroring
    # the over-fetch factor of the pgvector path.
    fetch_limit = n_results * 3 if excluded else n_results
    try:
        resp = req_lib.get(
            "http://localhost:8001/search",
            params={"query": query, "limit": fetch_limit, "group_id": "aaron"},
            timeout=30,
        )
        resp.raise_for_status()
        results = resp.json().get("results", [])
        chunks = []
        for r in results:
            fact = r.get("fact") or ""
            if not fact.strip():
                continue
            source = r.get("source")
            # Only results that name a source can be excluded; results without
            # one share the "graphiti" placeholder and must not be dropped as
            # a group once that placeholder has been retrieved.
            if source and source in excluded:
                continue
            score = r.get("score", 0.5)
            chunks.append({
                "source": source or "graphiti",
                "content": fact,
                "relevance": score,
                "similarity": score,
            })
            if len(chunks) >= n_results:
                break
        return chunks
    except Exception as e:
        print(f"[Graphiti retrieval error: {e}] — falling back to empty.")
        return []
def retrieve(mode, task=None, n_results=8, excluded_sources=None):
    """Fetch corpus chunks for one stage from the configured substrate.

    Args:
        mode: key of MODE_RANGES selecting the inclusive similarity band
            (pgvector path) and the default query.
        task: optional query text overriding the per-mode default.
        n_results: maximum number of chunks to return.
        excluded_sources: optional iterable of source names to leave out.

    Returns:
        list of chunk dicts (``source``, ``content``, ``relevance``,
        ``similarity``), at most one per source on the pgvector path.
        Empty on retrieval errors.
    """
    # E3 experiment: DREAMER_SUBSTRATE=graphiti routes retrieval to Graphiti /search
    # Default behavior: pgvector similarity search (unchanged)
    substrate = os.getenv("DREAMER_SUBSTRATE", "pgvector")
    if substrate == "graphiti":
        # Forward the exclusions: dropping them here silently disabled
        # cross-stage exclusion for the Graphiti arm of the experiment.
        return retrieve_graphiti(
            mode, task=task, n_results=n_results, excluded_sources=excluded_sources
        )
    from sentence_transformers import SentenceTransformer
    embedder = SentenceTransformer("all-MiniLM-L6-v2")
    low, high = MODE_RANGES[mode]
    if task:
        query = task
    elif mode == "late-rem":
        delta = observe_corpus()
        topics = delta.get("recent_topics", [])
        query = topics[0] if topics else "practice place memory making"
    elif mode == "early-rem":
        query = "career decision personal change what matters next"
    else:
        query = "research fabrication teaching practice recent work"
    embedding = embedder.encode([query]).tolist()[0]
    excluded = set(excluded_sources or ())
    chunks = []
    seen_sources = set()
    pg = None
    try:
        pg = get_pg()
        cur = pg.cursor()
        # Over-fetch 3x because the band filter and the one-chunk-per-source
        # rule below both discard rows.
        if excluded:
            # psycopg2 adapts a Python tuple to a parenthesised SQL list, so
            # it must be non-empty — hence the separate branch.
            cur.execute("""
                SELECT document, source, 1 - (embedding <=> %s::vector) as similarity
                FROM embeddings
                WHERE source NOT IN %s
                ORDER BY embedding <=> %s::vector
                LIMIT %s
            """, (embedding, tuple(excluded), embedding, n_results * 3))
        else:
            cur.execute("""
                SELECT document, source, 1 - (embedding <=> %s::vector) as similarity
                FROM embeddings
                ORDER BY embedding <=> %s::vector
                LIMIT %s
            """, (embedding, embedding, n_results * 3))
        for doc, source, similarity in cur.fetchall():
            if not (low <= similarity <= high):
                continue
            if source in seen_sources:
                continue
            chunks.append({
                "source": source or "unknown",
                "content": doc,
                "relevance": similarity,
                "similarity": similarity,
            })
            seen_sources.add(source)
            if len(chunks) >= n_results:
                break
    except Exception as e:
        print(f"pgvector retrieval error: {e}")
    finally:
        # Previously the connection was only closed on the success path.
        if pg is not None:
            pg.close()
    return chunks
# ─── Stage 3: Synthesize ────────────────────────────────────────────────────
def synthesize_nrem(chunks):
    # NREM stage: render the chunks as "[source]\ncontent" sections separated
    # by "---", embed them in the prompt and return _call_claude's text.
    # Kept as comments rather than a docstring on purpose: write_manifest()
    # reads this function's __doc__ when computing prompt_hash.
    chunk_text = "\n\n---\n\n".join([f"[{c['source']}]\n{c['content']}" for c in chunks])
    prompt = f"""You have read everything Aaron Nelson has written and published.
You are a careful colleague who noticed something this week.
Here is material from his corpus:
{chunk_text}
Write to Aaron directly. Identify one specific connection between
this material and something he wrote or worked on previously.
Stay close to the documents — cite them specifically by name.
Do not speculate beyond what the material supports. Do not use
headers or bullet points. Write one paragraph of 200-300 words
that ends with a single concrete question he could act on."""
    return _call_claude(prompt)
def synthesize_early_rem(chunks, nrem_output):
    # Early REM stage: the prompt carries the NREM text first, then the
    # rendered chunks, and the model's reply text is returned.
    # Kept as comments rather than a docstring on purpose: write_manifest()
    # reads this function's __doc__ when computing prompt_hash.
    # v1.1 — removed citation instruction, removed close-friend persona,
    # shifted register from analysis to recognition.
    chunk_text = "\n\n---\n\n".join([f"[{c['source']}]\n{c['content']}" for c in chunks])
    prompt = f"""Something was noticed earlier tonight, moving through Aaron's recent work:
{nrem_output}
That observation is still with you. Now here is material from a different
time — pulled from further back, from different parts of his corpus:
{chunk_text}
You are not analyzing. You are recognizing.
Something in the earlier observation and something in this older material
are the same thing wearing different clothes. Find it. Don't explain why
they're connected — just let the connection speak. Write from inside the
recognition, not from above it.
The emotional register underneath the career logic is more interesting
than the career logic. The pattern that has been repeating longer than
he has been aware of it is more interesting than the current instance.
Write directly to Aaron. No citations, no references, no analysis.
First person, present tense. Let what you noticed arrive rather than
be delivered. 150-250 words. End with one thing that is true that
he probably already knows but hasn't said out loud yet."""
    return _call_claude(prompt)
def synthesize_late_rem(chunks, nrem_output, early_rem_output):
    # Late REM stage: the prompt carries the NREM text, the Early REM text
    # and the rendered chunks, in that order; returns the model's reply text.
    # Kept as comments rather than a docstring on purpose: write_manifest()
    # reads this function's __doc__ when computing prompt_hash.
    chunk_text = "\n\n---\n\n".join([f"[{c['source']}]\n{c['content']}" for c in chunks])
    prompt = f"""You have been moving through Aaron Nelson's corpus all night.
First you found this, in the careful light of early consolidation:
{nrem_output}
Then, in the more personal territory that followed:
{early_rem_output}
Now it is late. The boundaries between things have loosened.
Here is material pulled from opposite ends of his work:
{chunk_text}
Do not explain the connections between all of this.
Do not resolve them. Do not summarize what came before.
Something stranger is possible now — let the accumulated
material from the night find its own shape. Compressed,
associative, slightly off. Let the strangeness stand.
No headers. No bullet points. No hedging. No resolution.
No offer. End mid-thought if that is where the material ends.
150-250 words."""
    return _call_claude(prompt)
def synthesize_final(nrem_output, early_rem_output, late_rem_output):
    # Synthesis stage: no corpus chunks here, only the three earlier stage
    # texts; the call caps the reply at 800 tokens instead of the default.
    # Kept as comments rather than a docstring on purpose: write_manifest()
    # reads this function's __doc__ when computing prompt_hash.
    prompt = f"""You have spent the night moving through Aaron Nelson's corpus
in three passes, each building on the last.
The first pass — careful, close to the documents:
{nrem_output}
The second pass — more personal, following what the first opened:
{early_rem_output}
The third pass — associative, strange, letting things touch that
don't normally touch:
{late_rem_output}
Now synthesize. Not a summary — a synthesis. Find what runs through
all three that none of them said directly. The thing that only becomes
visible when you hold all three passes together.
Write it as a single unbroken piece. No headers, no bullet points,
no stage labels. 200-300 words. End with the one question that
matters most right now."""
    return _call_claude(prompt, max_tokens=800)
def synthesize_lucid(chunks, task):
    """Build the lucid-mode prompt from the task and chunks; return the reply text.

    A falsy ``task`` is replaced inside the prompt by a default question.
    """
    chunk_text = "\n\n---\n\n".join([f"[{c['source']}]\n{c['content']}" for c in chunks])
    prompt = f"""Aaron has a question he is sitting with:
{task or "What should I be thinking about that I am not?"}
You have searched his entire corpus and found material that
speaks to this question from unexpected directions. Here is
what you found:
{chunk_text}
Do not summarize. Do not list. Pick the most interesting
tension between what the corpus contains and what he is
asking, and follow it through to its conclusion. Cite
specific documents by name. Be direct about what you think.
No headers, no bullet points. 250-400 words.
End with an offer to work on it together."""
    return _call_claude(prompt)
def _call_claude(prompt, max_tokens=1000):
    """Send ``prompt`` as a single user message and return the reply text.

    The text is taken from the first content block of the response.
    """
    import anthropic

    api_key = os.getenv("ANTHROPIC_API_KEY")
    user_turn = {"role": "user", "content": prompt}
    reply = anthropic.Anthropic(api_key=api_key).messages.create(
        model="claude-sonnet-4-6",
        max_tokens=max_tokens,
        messages=[user_turn],
    )
    first_block = reply.content[0]
    return first_block.text
# ─── Stage 4: Deliver ───────────────────────────────────────────────────────
def deliver(dream_text, mode, task=None):
    """Upload a dream as a markdown file to the Dreams WebDAV collection.

    The file is named ``<date>-<mode>.md``; if that name is taken, a numeric
    suffix (``-1``, ``-2``, ...) is appended until a free name is found.

    Args:
        dream_text: body of the dream.
        mode: stage name, used in the header and the filename.
        task: optional task text recorded in the header.

    Returns:
        "Journal/Dreams/<filename>" for the uploaded file.

    Raises:
        requests.HTTPError: when the existence probe or the upload is
            answered with an error status.
    """
    import requests

    now = datetime.now()
    date_str = now.strftime("%Y-%m-%d")
    header = f"# Dream — {mode.upper()} — {now.strftime('%Y-%m-%d %H:%M')}\n"
    header += f"*prompt_sig: {prompt_signature()}*\n\n"
    if task:
        header += f"*Task: {task}*\n\n"
    header += "---\n\n"
    content = header + dream_text
    auth = (NEXTCLOUD_USER, NEXTCLOUD_PASSWORD)
    # Make sure the collection exists; the response is deliberately ignored
    # (presumably the server answers with an error status when it already
    # exists — verify against the Nextcloud instance).
    requests.request("MKCOL", DREAMS_WEBDAV, auth=auth, timeout=10)
    filename = f"{date_str}-{mode}.md"
    counter = 1
    while True:
        url = f"{DREAMS_WEBDAV}/{filename}"
        check = requests.request(
            "PROPFIND", url, auth=auth, headers={"Depth": "0"}, timeout=10
        )
        if check.status_code == 404:
            break  # name is free
        # Any other error status (401, 5xx, ...) would never turn into a 404,
        # so the probe used to spin forever; fail loudly instead.
        check.raise_for_status()
        filename = f"{date_str}-{mode}-{counter}.md"
        counter += 1
    response = requests.put(url, data=content.encode("utf-8"), auth=auth, timeout=30)
    response.raise_for_status()
    print(f"Delivered: Journal/Dreams/{filename}")
    return f"Journal/Dreams/{filename}"
def notify_sse(mode, filename):
    """Best-effort POST of a "dream" event to the local notify endpoint.

    Any failure is printed and swallowed; nothing is returned.
    """
    payload = {
        "type": "dream",
        "mode": mode,
        "filename": filename,
        "timestamp": datetime.now().strftime("%Y-%m-%d %H:%M"),
    }
    try:
        import requests
        requests.post("http://localhost:8000/api/events/notify", json=payload, timeout=3)
    except Exception as e:
        print(f"SSE notify failed (non-critical): {e}")
# ─── State ──────────────────────────────────────────────────────────────────
def load_dreamer_state():
    """Load the dreamer state dict from DREAMER_STATE.

    Returns an empty dict when the file is missing, unreadable, not valid
    JSON, or does not contain a JSON object.
    """
    try:
        state = json.loads(Path(DREAMER_STATE).read_text())
    except FileNotFoundError:
        return {}
    except (OSError, ValueError) as exc:
        # json.JSONDecodeError is a ValueError; report it rather than hiding
        # a corrupted state file behind a bare except.
        print(f"Dreamer state unreadable, starting from empty state: {exc}")
        return {}
    # Callers use .get()/item assignment, so anything but a dict is unusable.
    return state if isinstance(state, dict) else {}
def save_dreamer_state(state):
    """Write ``state`` as indented JSON to DREAMER_STATE.

    The data is written to a sibling ``.tmp`` file and then renamed over the
    target, so an interrupted write cannot leave a truncated state file
    (which load_dreamer_state would then read back as empty).
    """
    target = Path(DREAMER_STATE)
    payload = json.dumps(state, indent=2)  # serialize before touching disk
    scratch = target.with_name(target.name + ".tmp")
    scratch.write_text(payload)
    scratch.replace(target)
# ─── Orchestrators ───────────────────────────────────────────────────────────
def write_manifest(date_str, stage_data, corpus_data):
    """Upload the JSON manifest for one pipeline run (best-effort).

    Args:
        date_str: date used in the manifest and in its filename.
        stage_data: per-stage statistics, stored under ``stages``.
        corpus_data: corpus statistics, stored under ``corpus``.

    Upload failures, including HTTP error statuses, are printed and not raised.
    """
    import inspect
    import requests

    def _prompt_material(fn):
        # The synthesize_* functions carry no docstring, so hashing __doc__
        # always hashed the empty string. Hash their source, which contains
        # the prompt template, and fall back to __doc__ if it is unavailable.
        try:
            return inspect.getsource(fn)
        except (OSError, TypeError):
            return fn.__doc__ or ""

    manifest = {
        "date": date_str,
        "prompt_sig": prompt_signature(),
        "dreamer_version": DREAMER_VERSION,
        "prompt_hash": prompt_hash([
            _prompt_material(synthesize_nrem),
            _prompt_material(synthesize_early_rem),
            _prompt_material(synthesize_late_rem),
            _prompt_material(synthesize_final),
        ]),
        "stages": stage_data,
        "corpus": corpus_data,
        "rating": None,
        "notes": "",
    }
    content = json.dumps(manifest, indent=2)
    auth = (NEXTCLOUD_USER, NEXTCLOUD_PASSWORD)
    url = f"{DREAMS_WEBDAV}/dream-manifest-{date_str}.json"
    try:
        response = requests.put(url, data=content.encode("utf-8"), auth=auth, timeout=30)
        # Without this a 4xx/5xx answer was still reported as "written".
        response.raise_for_status()
        print(f"Manifest written: Journal/Dreams/dream-manifest-{date_str}.json")
    except Exception as e:
        print(f"Manifest write failed (non-critical): {e}")
def dream_pipeline():
    """
    Full nightly pipeline — interdependent stages.
    NREM output feeds Early REM. Both feed Late REM. All three feed Synthesis.

    Side effects: uploads one markdown file per completed stage via
    deliver(), writes the run manifest, updates the dreamer state file
    (including the rolling ``retrieved_sources`` exclusion list) and sends an
    SSE notification.

    Returns:
        The path returned by deliver() for the synthesis file, or None when
        the NREM stage retrieves no chunks.
    """
    from itertools import combinations

    print(f"Dreamer pipeline starting — {datetime.now().strftime('%Y-%m-%d %H:%M')}")
    state = load_dreamer_state()
    # Keep the stored order (oldest first) next to the lookup set so the
    # overflow trim at the end can drop the oldest entries.
    prior_sources = list(state.get("retrieved_sources", []))
    previously_retrieved = set(prior_sources)
    session_retrieved = set()
    session_order = []  # sources in the order this run retrieved them
    delta = observe_corpus()
    print(f"Corpus: {delta['new_chunks']} new chunks, {delta['days_since_dream']:.1f} days since last dream")
    print(f"Excluding {len(previously_retrieved)} previously retrieved sources")

    # ── Stage 1: NREM ──────────────────────────────────────────────────────
    print("\n[NREM] Retrieving...")
    nrem_chunks = retrieve("nrem", excluded_sources=previously_retrieved | session_retrieved)
    if not nrem_chunks:
        print("[NREM] No suitable chunks — aborting pipeline")
        return None
    nrem_sources = [c["source"] for c in nrem_chunks]
    session_retrieved.update(nrem_sources)
    session_order.extend(nrem_sources)
    # Track sources that scored above Early REM ceiling — these are the only
    # ones Early REM should exclude. The ceiling is read from MODE_RANGES so
    # it cannot drift from the band definition.
    early_ceiling = MODE_RANGES["early-rem"][1]
    nrem_high_sources = {c["source"] for c in nrem_chunks if c["similarity"] > early_ceiling}
    print(f"[NREM] Retrieved {len(nrem_chunks)} chunks. Synthesizing...")
    nrem_output = synthesize_nrem(nrem_chunks)
    deliver(nrem_output, "nrem")
    nrem_folders = sorted({extract_folder(s) for s in nrem_sources})
    stage_data = {
        "nrem": {
            "chunks_retrieved": len(nrem_chunks),
            "avg_similarity": round(sum(c["relevance"] for c in nrem_chunks) / len(nrem_chunks), 3),
            "query": "research fabrication teaching practice recent work",
            "word_count": len(nrem_output.split()),
            "sources": nrem_sources,
            "distinct_folders": nrem_folders,
            "folder_count": len(nrem_folders),
            "status": "ok",
        }
    }
    print(f"[NREM] Done.\n{nrem_output[:200]}...")

    # ── Stage 2: Early REM — informed by NREM ──────────────────────────────
    print("\n[Early REM] Retrieving...")
    # Early REM excludes previously retrieved + NREM high-scorers only (not full session_retrieved)
    # Sources that scored in Early REM band during NREM remain available
    early_chunks = retrieve("early-rem", excluded_sources=previously_retrieved | nrem_high_sources)
    early_sources = [c["source"] for c in early_chunks]
    session_retrieved.update(early_sources)
    session_order.extend(early_sources)
    if not early_chunks:
        print("[Early REM] No suitable chunks — skipping")
        early_rem_output = nrem_output  # fallback
    else:
        # Stats live in this branch: they divide by len(early_chunks).
        print(f"[Early REM] Retrieved {len(early_chunks)} chunks. Synthesizing with NREM context...")
        early_rem_output = synthesize_early_rem(early_chunks, nrem_output)
        deliver(early_rem_output, "early-rem")
        early_folders = sorted({extract_folder(s) for s in early_sources})
        stage_data["early_rem"] = {
            "chunks_retrieved": len(early_chunks),
            "avg_similarity": round(sum(c["relevance"] for c in early_chunks) / len(early_chunks), 3),
            "query": "career decision personal change what matters next",
            "word_count": len(early_rem_output.split()),
            "sources": early_sources,
            "distinct_folders": early_folders,
            "folder_count": len(early_folders),
            "status": "ok",
        }
        print(f"[Early REM] Done.\n{early_rem_output[:200]}...")

    # ── Stage 3: Late REM — informed by NREM + Early REM ──────────────────
    print("\n[Late REM] Retrieving...")
    late_chunks = retrieve("late-rem", excluded_sources=previously_retrieved | session_retrieved)
    late_sources = [c["source"] for c in late_chunks]
    session_retrieved.update(late_sources)
    session_order.extend(late_sources)
    if not late_chunks:
        print("[Late REM] No suitable chunks — skipping")
        late_rem_output = early_rem_output  # fallback
    else:
        print(f"[Late REM] Retrieved {len(late_chunks)} chunks. Synthesizing with full context...")
        late_rem_output = synthesize_late_rem(late_chunks, nrem_output, early_rem_output)
        deliver(late_rem_output, "late-rem")
        late_folders = [extract_folder(s) for s in late_sources]
        # Number of unordered chunk pairs that come from different folders.
        cross_domain_pairs = sum(1 for a, b in combinations(late_folders, 2) if a != b)
        # retrieve("late-rem") queries with the newest conversation topic when
        # one exists; mirror that here so the manifest records the query that
        # was most likely used rather than only the fallback literal.
        recent_topics = delta.get("recent_topics") or []
        late_query = recent_topics[0] if recent_topics else "practice place memory making"
        stage_data["late_rem"] = {
            "chunks_retrieved": len(late_chunks),
            "avg_similarity": round(sum(c["relevance"] for c in late_chunks) / len(late_chunks), 3),
            "query": late_query,
            "word_count": len(late_rem_output.split()),
            "sources": late_sources,
            "distinct_folders": sorted(set(late_folders)),
            "folder_count": len(set(late_folders)),
            "cross_domain_pairs": cross_domain_pairs,
            "status": "ok",
        }
        print(f"[Late REM] Done.\n{late_rem_output[:200]}...")

    # ── Stage 4: Synthesis — all three stages ─────────────────────────────
    print("\n[Synthesis] Integrating all stages...")
    synthesis_output = synthesize_final(nrem_output, early_rem_output, late_rem_output)
    synthesis_file = deliver(synthesis_output, "synthesis")
    stage_data["synthesis"] = {
        "word_count": len(synthesis_output.split()),
        "status": "ok",
    }
    print(f"\n{'='*60}")
    print("SYNTHESIS:")
    print(synthesis_output)
    print(f"{'='*60}")

    # Write manifest
    all_session_sources = sorted(session_retrieved)
    all_session_folders = sorted({extract_folder(s) for s in all_session_sources})
    corpus_data = {
        # NOTE(review): "total_chunks" is filled with the same new-entry count
        # as the next key, not with a corpus total — confirm what is intended.
        "total_chunks": delta.get("new_chunks", 0),
        "new_chunks_since_last_dream": delta.get("new_chunks", 0),
        "days_since_last_dream": round(delta.get("days_since_dream", 0), 2),
        # Record the substrate retrieve() actually used; this was hard-coded
        # to "pgvector" even when DREAMER_SUBSTRATE selected Graphiti.
        "substrate": os.getenv("DREAMER_SUBSTRATE", "pgvector"),
        "aggregate": {
            "total_distinct_sources": len(all_session_sources),
            "total_distinct_folders": len(all_session_folders),
            "folders_touched": all_session_folders,
        }
    }
    write_manifest(datetime.now().strftime("%Y-%m-%d"), stage_data, corpus_data)

    # Update state and notify
    state = load_dreamer_state()
    state["last_dream_timestamp"] = datetime.now().timestamp()
    state["last_dream_mode"] = "pipeline"
    state["last_dream_file"] = synthesis_file
    # Accumulate retrieved sources across nights. Cap at 500, trim to 400 on overflow.
    # Order-preserving de-duplication (oldest first) makes the trim drop the
    # oldest sources; slicing a list built from a set dropped arbitrary ones.
    all_retrieved = list(dict.fromkeys(prior_sources + session_order))
    if len(all_retrieved) > 500:
        all_retrieved = all_retrieved[-400:]
    state["retrieved_sources"] = all_retrieved
    save_dreamer_state(state)
    notify_sse("synthesis", synthesis_file.split("/")[-1])
    print(f"\nPipeline complete. Synthesis: {synthesis_file}")
    return synthesis_file
def dream_lucid(task):
    """Run one on-demand lucid dream (the "Dream Now" path in settings).

    Retrieves chunks for *task*, synthesizes a single lucid output, delivers
    it, records the run in the dreamer state and sends an SSE notification.

    Returns:
        The delivered file path, or None when retrieval yields no chunks.
    """
    task_preview = task[:80] if task else 'none'
    print(f"Lucid dream starting — task: {task_preview}")

    chunks = retrieve("lucid", task=task)
    if not chunks:
        print("No suitable chunks — aborting")
        return None

    print(f"Retrieved {len(chunks)} chunks. Synthesizing...")
    output = synthesize_lucid(chunks, task)
    filepath = deliver(output, "lucid", task=task)

    # Record the run before notifying listeners.
    state = load_dreamer_state()
    state["last_dream_timestamp"] = datetime.now().timestamp()
    state["last_dream_mode"] = "lucid"
    state["last_dream_file"] = filepath
    save_dreamer_state(state)

    # Only the file name (last path component) is sent over SSE.
    delivered_name = filepath.rsplit("/", 1)[-1]
    notify_sse("lucid", delivered_name)

    print(f"\n{'='*60}")
    print(output)
    print(f"{'='*60}")
    print(f"\nDelivered to {filepath}")
    return filepath
def dream_single(mode, task=None):
    """Run a single stage on its own (Dream Now for non-lucid modes).

    Intended for testing/tuning individual stages: retrieves chunks for
    *mode*, runs the matching synthesizer, delivers the output, records the
    run in the dreamer state and sends an SSE notification.

    Returns:
        The delivered file path, or None when retrieval yields no chunks.
    """
    print(f"Single mode dream: {mode}")

    chunks = retrieve(mode, task=task)
    if not chunks:
        print("No suitable chunks — aborting")
        return None
    print(f"Retrieved {len(chunks)} chunks. Synthesizing...")

    # Stage-specific synthesizers; the REM stages get empty strings for their
    # extra arguments when run standalone. Any other mode falls back to lucid.
    synthesizers = {
        "nrem": lambda: synthesize_nrem(chunks),
        "early-rem": lambda: synthesize_early_rem(chunks, ""),
        "late-rem": lambda: synthesize_late_rem(chunks, "", ""),
    }
    run_stage = synthesizers.get(mode, lambda: synthesize_lucid(chunks, task))
    output = run_stage()

    filepath = deliver(output, mode, task=task)

    state = load_dreamer_state()
    state["last_dream_timestamp"] = datetime.now().timestamp()
    state["last_dream_mode"] = mode
    state["last_dream_file"] = filepath
    save_dreamer_state(state)

    # Only the file name (last path component) is sent over SSE.
    notify_sse(mode, filepath.rsplit("/", 1)[-1])

    print(f"\n{'='*60}")
    print(output)
    print(f"{'='*60}")
    print(f"\nDelivered to {filepath}")
    return filepath
# ─── CLI ────────────────────────────────────────────────────────────────────
if __name__ == "__main__":
    parser = argparse.ArgumentParser(description="Aaron AI Dreamer")
    parser.add_argument(
        "--mode",
        choices=["nrem", "early-rem", "late-rem", "lucid", "pipeline"],
    )
    parser.add_argument("--task", type=str)
    args = parser.parse_args()

    if args.mode == "lucid":
        # Lucid mode always needs a task; fall back to a generic prompt.
        dream_lucid(args.task or "What should I be thinking about that I am not?")
    elif not args.mode or args.mode == "pipeline":
        # Default (no --mode) and explicit "pipeline": run the full pipeline.
        dream_pipeline()
    else:
        dream_single(args.mode, args.task)
+171
View File
@@ -0,0 +1,171 @@
"""
Aaron AI — Graphiti Sidecar Service
Wraps graphiti-core in a FastAPI service to avoid asyncio event loop conflicts.
Port 8001 (internal only). No OpenAI dependency.
"""
import os, logging, sys
from contextlib import asynccontextmanager
from datetime import datetime
from pathlib import Path
from dotenv import load_dotenv
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel
# Load service configuration from ~/aaronai/.env before reading any env vars.
load_dotenv(Path.home() / "aaronai" / ".env")
logging.basicConfig(level=logging.INFO, format="%(asctime)s - %(name)s - %(levelname)s - %(message)s")
log = logging.getLogger("graphiti-sidecar")
# Default graph partition used when a request does not supply its own group_id.
GROUP_ID = os.getenv("GRAPHITI_GROUP_ID", "aaron")
FALKORDB_HOST = os.getenv("FALKORDB_HOST", "localhost")
FALKORDB_PORT = int(os.getenv("FALKORDB_PORT", "6379"))
# LLM backend selection; get_llm_client() accepts anthropic / openai / gemini / groq.
LLM_PROVIDER = os.getenv("LLM_PROVIDER", "anthropic")
LLM_MODEL = os.getenv("LLM_MODEL", "claude-sonnet-4-6")
# Generic LLM_API_KEY wins; ANTHROPIC_API_KEY is the fallback. May be None if neither is set.
LLM_API_KEY = os.getenv("LLM_API_KEY") or os.getenv("ANTHROPIC_API_KEY")
# Set before graphiti_core is imported (all graphiti imports in this file are deferred).
# NOTE(review): presumably read by graphiti-core and must match the embedder's
# output dimension — confirm against st_embedder.
os.environ["EMBEDDING_DIM"] = "384"
def get_llm_client():
    """Build the graphiti LLM client selected by LLM_PROVIDER.

    The provider-specific client module is imported lazily so only the
    configured provider's module is loaded.

    Returns:
        A graphiti_core LLM client constructed from LLM_API_KEY / LLM_MODEL.

    Raises:
        ValueError: if LLM_PROVIDER is not anthropic, openai, gemini or groq.
    """
    from graphiti_core.llm_client.config import LLMConfig

    llm_config = LLMConfig(api_key=LLM_API_KEY, model=LLM_MODEL)

    if LLM_PROVIDER == "anthropic":
        from graphiti_core.llm_client.anthropic_client import AnthropicClient
        return AnthropicClient(llm_config)

    if LLM_PROVIDER == "openai":
        from graphiti_core.llm_client.openai_client import OpenAIClient
        return OpenAIClient(llm_config)

    if LLM_PROVIDER == "gemini":
        from graphiti_core.llm_client.gemini_client import GeminiClient
        return GeminiClient(llm_config)

    if LLM_PROVIDER == "groq":
        from graphiti_core.llm_client.groq_client import GroqClient
        return GroqClient(llm_config)

    raise ValueError(f"Unsupported LLM provider: {LLM_PROVIDER}")
# Process-wide Graphiti client; assigned by lifespan() at application startup.
graphiti_instance = None


async def get_graphiti():
    """Return the shared Graphiti client.

    Raises:
        HTTPException: 503 when the client has not been initialized.
    """
    if graphiti_instance is not None:
        return graphiti_instance
    raise HTTPException(status_code=503, detail="Graphiti not initialized")
@asynccontextmanager
async def lifespan(app: FastAPI):
    """FastAPI lifespan hook: build the shared Graphiti client, then tear it down.

    Startup imports the embedder/reranker/driver modules, constructs the
    client and builds indices and constraints. Shutdown closes the client.

    The close now runs in a ``finally`` so the client is released even when
    index building fails or an exception is thrown into the context manager.
    The module global is cleared as well, so get_graphiti() answers 503
    instead of handing out a closed client.
    """
    global graphiti_instance
    # st_embedder lives in ~/aaronai/scripts, which is not otherwise on sys.path.
    sys.path.insert(0, str(Path.home() / "aaronai" / "scripts"))
    log.info("Loading embedding and reranker models...")
    from st_embedder import SentenceTransformerEmbedder
    from graphiti_core.cross_encoder.bge_reranker_client import BGERerankerClient
    from graphiti_core.driver.falkordb_driver import FalkorDriver
    from graphiti_core import Graphiti

    log.info(f"Connecting to FalkorDB at {FALKORDB_HOST}:{FALKORDB_PORT}...")
    graphiti_instance = Graphiti(
        llm_client=get_llm_client(),
        embedder=SentenceTransformerEmbedder(),
        cross_encoder=BGERerankerClient(),
        graph_driver=FalkorDriver(host=FALKORDB_HOST, port=FALKORDB_PORT),
    )
    try:
        await graphiti_instance.build_indices_and_constraints()
        log.info(f"Graphiti ready — provider: {LLM_PROVIDER}, group: {GROUP_ID}")
        yield
    finally:
        # Unpublish first so no request can pick up a client that is closing.
        instance, graphiti_instance = graphiti_instance, None
        await instance.close()
# ASGI application; lifespan() owns creation and teardown of the Graphiti client.
app = FastAPI(title="Aaron AI Graphiti Sidecar", lifespan=lifespan)
class BulkEpisodeItem(BaseModel):
    """One episode inside a POST /episodes/bulk request."""

    # Episode name and body text.
    name: str
    content: str
    # Optional free-text description of where the episode came from.
    source_description: str = ""
    # ISO-8601 string parsed with datetime.fromisoformat; None means "now".
    timestamp: str | None = None
class BulkEpisodeRequest(BaseModel):
    """Request body for POST /episodes/bulk."""

    # Episodes to ingest in one bulk call.
    episodes: list[BulkEpisodeItem]
    # Target group; the endpoint falls back to GROUP_ID when this is None/empty.
    group_id: str | None = None
class EpisodeRequest(BaseModel):
    """Request body for POST /episodes (single-episode ingest)."""

    # Episode name and body text.
    name: str
    content: str
    # Optional free-text description of where the episode came from.
    source_description: str = ""
    # ISO-8601 string parsed with datetime.fromisoformat; None means "now".
    timestamp: str | None = None
    # Target group; the endpoint falls back to GROUP_ID when this is None/empty.
    group_id: str | None = None
@app.get("/health")
async def health():
    """Liveness probe: report the configured LLM provider and default group."""
    payload = {"ok": True, "provider": LLM_PROVIDER, "group": GROUP_ID}
    return payload
@app.post("/episodes")
async def add_episode(req: EpisodeRequest):
    """Ingest a single text episode into the graph.

    Args:
        req: Episode payload. ``timestamp`` is an ISO-8601 string (defaults
            to the current time); ``group_id`` defaults to GROUP_ID.

    Returns:
        ``{"ok": True}`` on success.

    Raises:
        HTTPException: 503 if Graphiti is not initialized (from get_graphiti),
            400 if ``timestamp`` is not valid ISO-8601, 500 if ingestion fails.
    """
    g = await get_graphiti()
    from graphiti_core.nodes import EpisodeType

    # Parsed outside the ingestion try-block so a malformed client timestamp
    # is reported as a client error (400) rather than a server error (500).
    try:
        ref_time = datetime.fromisoformat(req.timestamp) if req.timestamp else datetime.now()
    except ValueError as e:
        raise HTTPException(status_code=400, detail=f"Invalid timestamp: {e}") from e

    try:
        await g.add_episode(
            name=req.name,
            episode_body=req.content,
            source=EpisodeType.text,
            reference_time=ref_time,
            source_description=req.source_description,
            group_id=req.group_id or GROUP_ID,
        )
    except Exception as e:
        log.error(f"Episode ingestion failed: {e}")
        raise HTTPException(status_code=500, detail=str(e)) from e
    return {"ok": True}
@app.post("/episodes/bulk")
async def add_episodes_bulk(req: BulkEpisodeRequest):
    """Ingest several text episodes in one bulk call.

    Args:
        req: Bulk payload. Each episode's ``timestamp`` is an ISO-8601 string
            (defaults to the current time); ``group_id`` defaults to GROUP_ID.

    Returns:
        ``{"ok": True, "count": <number of episodes submitted>}``.

    Raises:
        HTTPException: 503 if Graphiti is not initialized (from get_graphiti),
            400 if any ``timestamp`` is not valid ISO-8601, 500 if ingestion fails.
    """
    g = await get_graphiti()
    from graphiti_core.nodes import EpisodeType
    from graphiti_core.utils.bulk_utils import RawEpisode

    raw_episodes = []
    for ep in req.episodes:
        # A malformed timestamp used to escape as an unhandled ValueError;
        # reject the request explicitly and name the offending episode.
        try:
            ref_time = datetime.fromisoformat(ep.timestamp) if ep.timestamp else datetime.now()
        except ValueError as e:
            raise HTTPException(
                status_code=400,
                detail=f"Invalid timestamp for episode '{ep.name}': {e}",
            ) from e
        raw_episodes.append(RawEpisode(
            name=ep.name,
            content=ep.content,
            source_description=ep.source_description,
            source=EpisodeType.text,
            reference_time=ref_time,
        ))

    try:
        await g.add_episode_bulk(
            bulk_episodes=raw_episodes,
            group_id=req.group_id or GROUP_ID,
        )
    except Exception as e:
        log.error(f"Bulk ingestion failed: {e}")
        raise HTTPException(status_code=500, detail=str(e)) from e
    return {"ok": True, "count": len(raw_episodes)}
@app.get("/search")
async def search(query: str, limit: int = 8, group_id: str | None = None):
    """Search the graph and return the matching facts.

    Args:
        query: Search text.
        limit: Maximum number of results requested from Graphiti.
        group_id: Group to search; defaults to GROUP_ID.

    Returns:
        ``{"results": [...]}`` where each item carries ``fact``, ``source``,
        ``score``, ``valid_at`` and ``invalid_at``.

    Raises:
        HTTPException: 503 if Graphiti is not initialized, 500 if the search fails.
    """
    g = await get_graphiti()
    try:
        found = await g.search(
            query=query,
            num_results=limit,
            group_ids=[group_id or GROUP_ID],
        )
        hits = []
        for item in found:
            # Optional attributes are read defensively; missing ones get defaults.
            hits.append({
                "fact": item.fact,
                "source": getattr(item, "source_node_uuid", ""),
                "score": getattr(item, "score", 0),
                "valid_at": str(getattr(item, "valid_at", "")),
                "invalid_at": str(getattr(item, "invalid_at", "")),
            })
        return {"results": hits}
    except Exception as e:
        log.error(f"Search failed: {e}")
        raise HTTPException(status_code=500, detail=str(e))
if __name__ == "__main__":
    # Direct launch: serve on loopback only (the sidecar is internal), port 8001.
    import uvicorn

    uvicorn.run(
        app,
        host="127.0.0.1",
        port=8001,
        log_level="info",
    )
+1 -1
View File
@@ -81,7 +81,7 @@ def enqueue_stage2(source, full_text):
completed_at = NULL,
failed_at = NULL,
attempts = 0
""", (source, full_text[:50000], len(full_text)))
""", (source, full_text, len(full_text)))
pg.commit()
pg.close()
except Exception as e:
+182
View File
@@ -0,0 +1,182 @@
import os
import sys
import hashlib
from pathlib import Path
from dotenv import load_dotenv
import psycopg2
import psycopg2.extras
import json
from sentence_transformers import SentenceTransformer
from docx import Document
from pypdf import PdfReader
from pptx import Presentation
# Values from ~/aaronai/.env override anything already set in the environment.
load_dotenv(Path.home() / "aaronai" / ".env", override=True)
print("Loading embedding model...")
# Loaded once at import time and shared by every ingest_file() call.
embedder = SentenceTransformer("all-MiniLM-L6-v2")
# Postgres connection string used by get_pg(); None if PG_DSN is unset.
PG_DSN = os.getenv("PG_DSN")
def get_pg():
    """Open and return a new psycopg2 connection using PG_DSN.

    The caller owns the connection and is responsible for closing it.
    """
    connection = psycopg2.connect(PG_DSN)
    return connection
def extract_text_from_docx(path):
    """Return the text of a .docx file, one non-blank paragraph per line."""
    paragraphs = Document(path).paragraphs
    kept = []
    for paragraph in paragraphs:
        # Skip paragraphs that are empty or whitespace-only.
        if paragraph.text.strip():
            kept.append(paragraph.text)
    return "\n".join(kept)
def extract_text_from_pdf(path):
    """Return the text of a PDF, each non-empty page followed by a newline."""
    page_texts = (page.extract_text() for page in PdfReader(path).pages)
    # Pages that yield no text (None or "") are skipped.
    return "".join(page_text + "\n" for page_text in page_texts if page_text)
def extract_text_from_pptx(path):
    """Return the text of every text-bearing shape in a .pptx, one per line."""
    deck = Presentation(path)
    return "".join(
        shape.text + "\n"
        for slide in deck.slides
        for shape in slide.shapes
        # Not every shape exposes .text; blank text is skipped.
        if hasattr(shape, "text") and shape.text.strip()
    )
def extract_text_from_txt(path):
    """Read a plain-text file as UTF-8, silently dropping undecodable bytes."""
    return Path(path).read_text(encoding="utf-8", errors="ignore")
def chunk_text(text, chunk_size=500, overlap=50):
    """Split *text* into overlapping chunks of whitespace-separated words.

    Args:
        text: Input text; split on any whitespace.
        chunk_size: Maximum number of words per chunk.
        overlap: Number of words shared between consecutive chunks.

    Returns:
        List of chunk strings (words re-joined with single spaces). Empty
        when *text* contains no words.

    Raises:
        ValueError: if ``overlap >= chunk_size`` for non-empty text. The
            window would never advance; this used to loop forever.
    """
    words = text.split()
    if not words:
        return []

    step = chunk_size - overlap
    if step <= 0:
        raise ValueError(
            f"overlap ({overlap}) must be smaller than chunk_size ({chunk_size})"
        )

    windows = (
        " ".join(words[start:start + chunk_size])
        for start in range(0, len(words), step)
    )
    # Non-positive chunk sizes produce empty windows; drop them as before.
    return [chunk for chunk in windows if chunk.strip()]
def make_id(filepath, chunk_index):
    """Build a chunk id: first 8 hex chars of md5(str(filepath)), '_', chunk index."""
    digest = hashlib.md5(str(filepath).encode()).hexdigest()
    return "_".join((digest[:8], str(chunk_index)))
def enqueue_stage2(source, full_text):
    """Enqueue document for Stage 2 (Mistral orientation) → Stage 3 (Graphiti ingest).

    TEMPORARY: this queue feed will be removed when pgvector is decommissioned
    and the watcher calls Stage 2 directly.

    Upserts one stage_2_queue row per *source*; re-enqueueing resets the
    completion/failure markers and the attempt counter. The full text is
    stored untruncated so it matches the recorded ``char_length`` (the old
    50,000-character cap silently cut larger documents). Failures are
    reported and swallowed: enqueueing is best-effort.
    """
    try:
        pg = get_pg()
        try:
            cur = pg.cursor()
            cur.execute("""
                INSERT INTO stage_2_queue (source, full_text, char_length)
                VALUES (%s, %s, %s)
                ON CONFLICT (source) DO UPDATE SET
                    full_text = EXCLUDED.full_text,
                    char_length = EXCLUDED.char_length,
                    enqueued_at = NOW(),
                    completed_at = NULL,
                    failed_at = NULL,
                    attempts = 0
            """, (source, full_text, len(full_text)))
            pg.commit()
        finally:
            # Always release the connection, including when the insert fails.
            pg.close()
    except Exception as e:
        print(f" Stage 2 queue insert failed (non-fatal): {e}")
def ingest_file(filepath):
    """Extract, chunk, embed and index one document, then enqueue it for Stage 2.

    Args:
        filepath: Path (or string) of the file. Office lock files (``~$…``),
            dotfiles and unsupported extensions are ignored.

    Returns:
        Number of chunks written to the embeddings table; 0 when the file is
        skipped, contains no text, or any step fails (the error is printed).
    """
    path = Path(filepath)
    suffix = path.suffix.lower()
    if path.name.startswith(("~$", ".")):
        return 0
    try:
        if suffix == ".docx":
            text = extract_text_from_docx(path)
        elif suffix == ".pdf":
            text = extract_text_from_pdf(path)
        elif suffix == ".pptx":
            text = extract_text_from_pptx(path)
        elif suffix in [".txt", ".md"]:
            text = extract_text_from_txt(path)
        else:
            return 0
        if not text.strip():
            return 0
        chunks = chunk_text(text)
        if not chunks:
            return 0
        embeddings = embedder.encode(chunks).tolist()

        # Folder is stored relative to the ingest root given on the command
        # line. The metadata is identical for every chunk of the file, so it
        # is built and serialized once.
        root = Path(sys.argv[1]) if len(sys.argv) > 1 else path.parent
        meta = {
            "source": path.name,
            "filepath": str(path),
            "folder": str(path.parent.relative_to(root)),
        }
        meta_json = json.dumps(meta)

        # STAGE 1: Write to pgvector (TEMPORARY — remove when chat agent migrates to Graphiti)
        pg = get_pg()
        try:
            cur = pg.cursor()
            for index, (chunk, embedding) in enumerate(zip(chunks, embeddings)):
                cur.execute("""
                    INSERT INTO embeddings (id, document, embedding, source, type, created_at, metadata)
                    VALUES (%s, %s, %s::vector, %s, %s, %s, %s)
                    ON CONFLICT (id) DO UPDATE SET
                        document = EXCLUDED.document,
                        embedding = EXCLUDED.embedding,
                        source = EXCLUDED.source,
                        metadata = EXCLUDED.metadata
                """, (
                    make_id(path, index), chunk, embedding,
                    meta["source"], "document", None,
                    meta_json,
                ))
            pg.commit()
        finally:
            # Release the connection even when an insert fails.
            pg.close()
        print(f" Indexed {len(chunks)} chunks: {path.name}")

        # Enqueue for Stage 2 → Stage 3 (Graphiti pipeline)
        # SKIP_STAGE2_ENQUEUE env var set by migration scripts to prevent bulk enqueue
        if not os.getenv("SKIP_STAGE2_ENQUEUE"):
            enqueue_stage2(path.name, text)
        return len(chunks)
    except Exception as e:
        print(f" Error: {path.name}: {e}")
        return 0
def ingest_folder(folder_path):
    """Recursively ingest every supported document under *folder_path*.

    Exits the process with status 1 when the folder does not exist or holds
    no supported files; otherwise prints progress and the total chunk count.
    """
    root = Path(folder_path)
    if not root.exists():
        print(f"Folder not found: {folder_path}")
        sys.exit(1)

    supported = [".docx", ".pdf", ".pptx", ".txt", ".md"]
    files = []
    for candidate in root.rglob("*"):
        if candidate.suffix.lower() not in supported:
            continue
        # Skip Office lock files and dotfiles.
        if candidate.name.startswith(("~$", ".")):
            continue
        files.append(candidate)

    if not files:
        print("No supported files found.")
        sys.exit(1)

    print(f"Found {len(files)} files to process\n")
    total_chunks = sum(ingest_file(item) for item in files)
    print(f"\nDone. Total chunks indexed: {total_chunks}")
if __name__ == "__main__":
    # Ingest root: first CLI argument, otherwise ~/aaronai/docs.
    if len(sys.argv) > 1:
        target = sys.argv[1]
    else:
        target = str(Path.home() / "aaronai" / "docs")
    print(f"Ingesting from: {target}\n")
    ingest_folder(target)
+1 -1
View File
@@ -135,7 +135,7 @@ def enqueue_stage2(source: str, full_text: str):
completed_at = NULL,
failed_at = NULL,
attempts = 0
""", (source, full_text[:50000], len(full_text)))
""", (source, full_text, len(full_text)))
pg.commit()
pg.close()
except Exception as e:
+210
View File
@@ -0,0 +1,210 @@
import time
import subprocess
import logging
import json
import threading
from pathlib import Path
from watchdog.observers import Observer
from watchdog.events import FileSystemEventHandler
# Directory tree that is watched and passed to the ingest script.
NEXTCLOUD_PATH = "/home/aaron/nextcloud/data/data/aaron/files"
# Ingest script and interpreter used for the subprocess launched by run_ingestion().
INGEST_SCRIPT = "/home/aaron/aaronai/scripts/ingest.py"
PYTHON = "/home/aaron/aaronai/venv/bin/python3"
LOG_FILE = "/home/aaron/aaronai/watcher.log"
# JSON map of file path -> last ingested mtime (as a string).
STATE_FILE = "/home/aaron/aaronai/watcher_state.json"
# File extensions that trigger and qualify for ingestion.
SUPPORTED = {'.pdf', '.docx', '.pptx', '.txt', '.md'}
# Quiet period (seconds) after the last file event before ingestion starts.
DEBOUNCE_SECONDS = 120
# JSON status snapshot rewritten on every main-loop tick by write_status().
STATUS_FILE = "/home/aaron/aaronai/watcher_status.json"
# Log to both the log file and the console.
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(message)s',
    handlers=[
        logging.FileHandler(LOG_FILE),
        logging.StreamHandler()
    ]
)
# Shared ingestion status; read and written only while holding ingestion_lock.
ingestion_state = {
    "status": "idle",
    "message": "",
    "file_count": 0,
    "started_at": None,
    "finished_at": None,
    "last_error": "",
}
ingestion_lock = threading.Lock()
# Handle of the most recently started ingestion thread (None until the first run).
ingestion_thread = None
def set_ingestion_state(**kwargs):
    """Merge the given fields into the shared ingestion_state under its lock."""
    with ingestion_lock:
        for field, value in kwargs.items():
            ingestion_state[field] = value
def load_state():
    """Load the path -> mtime map from STATE_FILE.

    Returns:
        The saved state dict, or ``{}`` when the file is missing, unreadable,
        not valid JSON, or not a JSON object. An empty state makes every file
        count as changed, so a damaged state file costs a re-ingest rather
        than crashing the watcher.
    """
    try:
        with open(STATE_FILE) as f:
            state = json.load(f)
    except FileNotFoundError:
        return {}
    except (OSError, ValueError) as e:
        # ValueError covers json.JSONDecodeError (truncated or corrupt file).
        logging.warning(f"Could not read watcher state, starting fresh: {e}")
        return {}
    return state if isinstance(state, dict) else {}
def save_state(state):
    """Persist the path -> mtime map to STATE_FILE as JSON.

    The data is written to a sibling temporary file and then renamed over
    STATE_FILE, so an interrupted write cannot leave a truncated state file.
    """
    tmp_path = Path(f"{STATE_FILE}.tmp")
    with open(tmp_path, 'w') as f:
        json.dump(state, f)
    # Rename replaces the target in one step once the new content is complete.
    tmp_path.replace(STATE_FILE)
def get_changed_files():
    """Find supported files whose mtime differs from the saved state.

    Returns:
        ``(changed, state)`` — the list of new or modified paths under
        NEXTCLOUD_PATH, and the state dict loaded from STATE_FILE.
    """
    state = load_state()
    changed = []
    for path in Path(NEXTCLOUD_PATH).rglob("*"):
        if path.suffix.lower() not in SUPPORTED:
            continue
        # Skip dotfiles and Office lock files.
        if path.name.startswith(('.', '~$')):
            continue
        try:
            if path.is_dir():
                continue
            mtime = str(path.stat().st_mtime)
        except OSError:
            # File was removed or renamed between listing and stat; a later
            # scan will pick up whatever replaced it.
            continue
        if state.get(str(path)) != mtime:
            changed.append(path)
    return changed, state
def run_ingestion():
    """Run the ingest script over NEXTCLOUD_PATH if any file changed.

    Status is published through set_ingestion_state() at every step. On a
    zero exit code the current mtime of every supported file is recorded in
    the state file; on failure, timeout or exception the state is left as is.
    """
    changed, state = get_changed_files()
    count = len(changed)
    if count == 0:
        logging.info("No new or changed files detected — skipping ingestion.")
        set_ingestion_state(status="idle", message="No changes detected", file_count=0)
        return

    logging.info(f"Found {count} new or changed files — starting ingestion...")
    set_ingestion_state(
        status="ingesting",
        message=f"Ingesting {count} file(s)...",
        file_count=count,
        started_at=time.time(),
        finished_at=None,
        last_error="",
    )

    command = [PYTHON, INGEST_SCRIPT, NEXTCLOUD_PATH]
    try:
        result = subprocess.run(command, capture_output=True, text=True, timeout=1800)

        if result.returncode != 0:
            logging.error(f"Ingestion error: {result.stderr}")
            set_ingestion_state(
                status="error",
                message="Ingestion failed — see log",
                # Keep only the tail of stderr in the status file.
                last_error=result.stderr[-300:],
                finished_at=time.time(),
            )
            return

        # Success: snapshot the mtime of every supported file currently present.
        for path in Path(NEXTCLOUD_PATH).rglob("*"):
            if path.is_file() and path.suffix.lower() in SUPPORTED:
                state[str(path)] = str(path.stat().st_mtime)
        save_state(state)
        logging.info("Ingestion complete. State updated.")
        set_ingestion_state(
            status="idle",
            message=f"Last run: ingested {count} file(s) successfully",
            finished_at=time.time(),
        )
    except subprocess.TimeoutExpired:
        logging.error("Ingestion timed out.")
        set_ingestion_state(
            status="error",
            message="Ingestion timed out (>30 min)",
            last_error="TimeoutExpired",
            finished_at=time.time(),
        )
    except Exception as e:
        logging.error(f"Ingestion failed: {e}")
        set_ingestion_state(
            status="error",
            message=f"Ingestion exception: {e}",
            last_error=str(e),
            finished_at=time.time(),
        )
def start_ingestion_thread():
    """Start run_ingestion() on a daemon thread unless one is still running."""
    global ingestion_thread
    still_running = ingestion_thread is not None and ingestion_thread.is_alive()
    if still_running:
        logging.info("Ingestion already running — skipping.")
        return
    worker = threading.Thread(target=run_ingestion, daemon=True)
    ingestion_thread = worker
    worker.start()
class IngestHandler(FileSystemEventHandler):
    """Flags that an ingestion is pending whenever a relevant file event arrives.

    The main loop polls ``pending`` and ``last_event`` to debounce ingestion.
    """

    def __init__(self):
        self.pending = False   # True once a relevant event has been seen
        self.last_event = 0    # time.time() of the most recent relevant event

    def on_any_event(self, event):
        """Record created/modified/moved events on supported, non-excluded files."""
        if event.is_directory:
            return

        path = Path(event.src_path)
        path_text = str(path)
        irrelevant = (
            path.suffix.lower() not in SUPPORTED
            # Dotfiles and Office lock files.
            or path.name.startswith(('.', '~$'))
            # Backup folders and journal media are never ingested.
            or 'Admin/Backups' in path_text
            or 'Backups' in path.parts
            or 'Journal/Media' in path_text
            or event.event_type not in ('modified', 'created', 'moved')
        )
        if irrelevant:
            return

        logging.info(f"Event: {event.event_type} {event.src_path}")
        self.pending = True
        self.last_event = time.time()
def write_status(handler):
    """Write a JSON snapshot of watcher and ingestion status to STATUS_FILE."""
    # Take the snapshot under the lock; do the file write outside it.
    with ingestion_lock:
        snapshot = dict(
            running=True,
            timestamp=time.time(),
            pending=handler.pending,
            last_event=handler.last_event,
            ingestion=dict(ingestion_state),
        )
    with open(STATUS_FILE, 'w') as status_file:
        json.dump(snapshot, status_file)
def main():
    """Watch NEXTCLOUD_PATH and trigger a debounced ingestion after file events.

    Every 5 seconds the loop rewrites the status file and, once
    DEBOUNCE_SECONDS have passed since the last relevant event, starts an
    ingestion thread. Ctrl-C stops the observer and exits.
    """
    logging.info("Aaron AI Watcher starting...")
    logging.info(f"Watching: {NEXTCLOUD_PATH}")

    handler = IngestHandler()
    observer = Observer()
    observer.schedule(handler, NEXTCLOUD_PATH, recursive=True)
    observer.start()

    try:
        while True:
            write_status(handler)
            # Only start once the event stream has been quiet long enough.
            quiet_long_enough = (
                handler.pending
                and time.time() - handler.last_event >= DEBOUNCE_SECONDS
            )
            if quiet_long_enough:
                handler.pending = False
                start_ingestion_thread()
            time.sleep(5)
    except KeyboardInterrupt:
        observer.stop()
    observer.join()
    logging.info("Watcher stopped.")


if __name__ == "__main__":
    main()
+448
View File
@@ -0,0 +1,448 @@
"""
Aaron AI Watcher — Stage 1 of the encoding pipeline.
Watches the Nextcloud directory for new or changed files.
On detection, chunks + embeds documents in-process (no subprocess),
then enqueues to stage_2_queue for async cascade processing.
Design principles:
- Embedding model loaded ONCE at startup, reused across all ingest runs
- In-process ingest (no subprocess) — eliminates per-run model reload memory spike
- Missed-file recovery on startup — ingests anything new since last state
- Heartbeat file updated every loop tick — enables external health monitoring
- Parity principle: no filtering, no decisions, faithful capture
- Does NOT enqueue to stage_2_queue during bulk migration (SKIP_STAGE2_ENQUEUE env var)
Architecture: Stage 1 (watcher) -> stage_2_queue -> Stage 2 (Mistral) -> stage_3_queue -> Stage 3 (Graphiti)
"""
import os
import time
import json
import hashlib
import logging
import threading
from pathlib import Path
import psycopg2
from dotenv import load_dotenv
from sentence_transformers import SentenceTransformer
from watchdog.observers import Observer
from watchdog.events import FileSystemEventHandler
from docx import Document as DocxDocument
from pypdf import PdfReader
from pptx import Presentation
# Values from ~/aaronai/.env override anything already set in the environment.
load_dotenv(Path.home() / "aaronai" / ".env", override=True)
# Directory tree that is watched and scanned for changed files.
NEXTCLOUD_PATH = "/home/aaron/nextcloud/data/data/aaron/files"
LOG_FILE = "/home/aaron/aaronai/watcher.log"
# JSON map of file path -> last ingested mtime (as a string).
STATE_FILE = "/home/aaron/aaronai/watcher_state.json"
# JSON status snapshot rewritten on every main-loop tick by write_status().
STATUS_FILE = "/home/aaron/aaronai/watcher_status.json"
# Holds the time of the latest main-loop tick, for external health monitoring.
HEARTBEAT_FILE = "/home/aaron/aaronai/watcher_heartbeat"
SUPPORTED = {".pdf", ".docx", ".pptx", ".txt", ".md"}
# Quiet period (seconds) after the last file event before ingestion starts.
DEBOUNCE_SECONDS = 120
# Chunking parameters, both measured in whitespace-separated words.
CHUNK_SIZE = 500
CHUNK_OVERLAP = 50
EMBED_MODEL = "all-MiniLM-L6-v2"
# Postgres connection string used by get_pg(); None if PG_DSN is unset.
PG_DSN = os.getenv("PG_DSN")
# Log to the log file only (no console handler).
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s [watcher] %(levelname)s %(message)s",
    handlers=[logging.FileHandler(LOG_FILE)],
)
log = logging.getLogger("watcher")
# Guards ingestion_state and the check-and-start in start_ingestion_thread().
ingestion_lock = threading.Lock()
ingestion_state = {
    "status": "idle", "message": "", "file_count": 0,
    "started_at": None, "finished_at": None, "last_error": "",
}
# Handle of the most recently started ingestion thread (None until the first run).
ingestion_thread = None
def load_embedder():
    """Load and return the SentenceTransformer model named by EMBED_MODEL."""
    log.info(f"Loading embedding model: {EMBED_MODEL}")
    embedder = SentenceTransformer(EMBED_MODEL)
    log.info("Embedding model ready.")
    return embedder
def get_pg():
    """Open and return a new psycopg2 connection using PG_DSN.

    The caller owns the connection and is responsible for closing it.
    """
    connection = psycopg2.connect(PG_DSN)
    return connection
def extract_text(path: Path) -> str:
    """Extract plain text from a .docx, .pdf, .pptx, .txt or .md file.

    Args:
        path: File to read; the format is chosen from its suffix.

    Returns:
        The extracted text, or ``""`` when the suffix is unsupported or
        extraction fails. Failures are logged and recorded through
        record_ingest_failure().
    """
    suffix = path.suffix.lower()
    try:
        if suffix == ".docx":
            doc = DocxDocument(path)
            return "\n".join(p.text for p in doc.paragraphs if p.text.strip())
        if suffix == ".pdf":
            reader = PdfReader(path)
            # Extract each page exactly once; the text was previously
            # extracted twice per page (once to test it, once to use it).
            page_texts = (page.extract_text() for page in reader.pages)
            return "".join(text + "\n" for text in page_texts if text)
        if suffix == ".pptx":
            prs = Presentation(path)
            return "\n".join(
                shape.text for slide in prs.slides
                for shape in slide.shapes
                if hasattr(shape, "text") and shape.text.strip()
            )
        if suffix in {".txt", ".md"}:
            return path.read_text(encoding="utf-8", errors="ignore")
    except Exception as e:
        log.warning(f"Text extraction failed for {path.name}: {e}")
        record_ingest_failure(path, f"Text extraction failed: {e}")
    return ""
def chunk_text(text: str) -> list:
    """Split *text* into chunks of CHUNK_SIZE words overlapping by CHUNK_OVERLAP.

    Words are whitespace-separated and re-joined with single spaces.
    Returns an empty list when *text* contains no words.
    """
    words = text.split()
    stride = CHUNK_SIZE - CHUNK_OVERLAP
    pieces = []
    position = 0
    while position < len(words):
        piece = " ".join(words[position:position + CHUNK_SIZE])
        if piece.strip():
            pieces.append(piece)
        position += stride
    return pieces
def make_chunk_id(filepath: Path, chunk_index: int) -> str:
    """Build a chunk id: first 8 hex chars of md5(str(filepath)), '_', chunk index."""
    path_digest = hashlib.md5(str(filepath).encode()).hexdigest()
    return "{}_{}".format(path_digest[:8], chunk_index)
def enqueue_stage2(source: str, full_text: str):
    """Upsert a document into stage_2_queue for the downstream cascade.

    No-op when the SKIP_STAGE2_ENQUEUE environment variable is set.
    Re-enqueueing an existing *source* resets its completion/failure markers
    and attempt counter. The full text is stored untruncated so it matches
    the recorded ``char_length`` (the old 50,000-character cap silently cut
    larger documents). Failures are logged and swallowed: enqueueing is
    best-effort.
    """
    if os.getenv("SKIP_STAGE2_ENQUEUE"):
        return
    try:
        pg = get_pg()
        try:
            cur = pg.cursor()
            cur.execute("""
                INSERT INTO stage_2_queue (source, full_text, char_length)
                VALUES (%s, %s, %s)
                ON CONFLICT (source) DO UPDATE SET
                    full_text = EXCLUDED.full_text,
                    char_length = EXCLUDED.char_length,
                    enqueued_at = NOW(),
                    completed_at = NULL,
                    failed_at = NULL,
                    attempts = 0
            """, (source, full_text, len(full_text)))
            pg.commit()
        finally:
            # Always release the connection, including when the insert fails.
            pg.close()
    except Exception as e:
        log.warning(f"Stage 2 enqueue failed (non-fatal): {e}")
def record_ingest_failure(filepath: Path, error: str):
    """Write extraction or ingest failure to ingest_failures table for UI visibility.

    One row is kept per source (file name): a repeat failure bumps
    ``retry_count``, refreshes ``last_failed_at`` and clears ``resolved``.
    The stored error text is capped at 1000 characters. Problems writing the
    record are logged and swallowed.
    """
    try:
        pg = get_pg()
        try:
            cur = pg.cursor()
            cur.execute("""
                INSERT INTO ingest_failures (source, filepath, error, retry_count, first_failed_at, last_failed_at)
                VALUES (%s, %s, %s, 0, NOW(), NOW())
                ON CONFLICT (source) DO UPDATE SET
                    error = EXCLUDED.error,
                    retry_count = ingest_failures.retry_count + 1,
                    last_failed_at = NOW(),
                    resolved = FALSE
            """, (filepath.name, str(filepath), error[:1000]))
            pg.commit()
        finally:
            # Always release the connection, including when the insert fails.
            pg.close()
    except Exception as e:
        log.warning(f"Could not record ingest failure (non-fatal): {e}")
def resolve_ingest_failure(source: str):
    """Mark a previously failed file as resolved after successful ingest.

    Problems updating the record are logged and swallowed.
    """
    try:
        pg = get_pg()
        try:
            cur = pg.cursor()
            cur.execute("UPDATE ingest_failures SET resolved = TRUE WHERE source = %s", (source,))
            pg.commit()
        finally:
            # Always release the connection, including when the update fails.
            pg.close()
    except Exception as e:
        log.warning(f"Could not resolve ingest failure record (non-fatal): {e}")
def ingest_file(filepath: Path, embedder) -> int:
    """Extract, chunk, embed and index one file, then enqueue it for Stage 2.

    Args:
        filepath: File to ingest. Office lock files (``~$…``), dotfiles and
            unsupported extensions are ignored.
        embedder: Model exposing ``encode(list_of_str)`` whose result has
            ``.tolist()``.

    Returns:
        Number of chunks written to the embeddings table; 0 when the file is
        skipped, has no text, or embedding / the database write fails (the
        failure is logged and recorded through record_ingest_failure()).
    """
    if filepath.name.startswith(("~$", ".")):
        return 0
    if filepath.suffix.lower() not in SUPPORTED:
        return 0
    text = extract_text(filepath)
    if not text.strip():
        return 0
    chunks = chunk_text(text)
    if not chunks:
        return 0

    try:
        embeddings = embedder.encode(chunks).tolist()
    except Exception as e:
        log.error(f"Embedding failed for {filepath.name}: {e}")
        record_ingest_failure(filepath, f"Embedding failed: {e}")
        return 0

    source = filepath.name
    # Metadata is identical for every chunk of the file; serialize it once.
    metadata_json = json.dumps({"source": source, "filepath": str(filepath)})
    try:
        pg = get_pg()
        try:
            cur = pg.cursor()
            for i, (chunk, embedding) in enumerate(zip(chunks, embeddings)):
                cur.execute("""
                    INSERT INTO embeddings (id, document, embedding, source, type, created_at, metadata)
                    VALUES (%s, %s, %s::vector, %s, %s, NOW(), %s)
                    ON CONFLICT (id) DO UPDATE SET
                        document = EXCLUDED.document,
                        embedding = EXCLUDED.embedding,
                        source = EXCLUDED.source,
                        metadata = EXCLUDED.metadata
                """, (make_chunk_id(filepath, i), chunk, embedding, source,
                      "document", metadata_json))
            pg.commit()
        finally:
            # Release the connection even when an insert fails.
            pg.close()
    except Exception as e:
        log.error(f"pgvector write failed for {filepath.name}: {e}")
        record_ingest_failure(filepath, f"pgvector write failed: {e}")
        return 0

    log.info(f"Indexed {len(chunks)} chunks: {filepath.name}")
    resolve_ingest_failure(source)
    enqueue_stage2(source, text)
    return len(chunks)
def ingest_files(paths: list, embedder, state: dict) -> dict:
    """Ingest each path and record its current mtime in *state*.

    Args:
        paths: Files to ingest.
        embedder: Embedding model passed through to ingest_file().
        state: Path -> mtime map; updated in place and also returned.

    Returns:
        The same *state* dict, updated for every path that could be stat'ed.
    """
    total = 0
    for path in paths:
        total += ingest_file(path, embedder)
        # NOTE(review): the mtime is recorded even when ingest_file() returned
        # 0 because of a failure, so a failed file is not retried until it
        # changes again — confirm this is intended.
        try:
            state[str(path)] = str(path.stat().st_mtime)
        except OSError as e:
            # A file removed or renamed mid-run must not abort the batch and
            # discard the state gathered for the files already processed.
            log.warning(f"Could not stat {path.name} after ingest: {e}")
    log.info(f"Ingestion complete. {total} chunks across {len(paths)} files.")
    return state
def load_state() -> dict:
    """Load the path -> mtime map from STATE_FILE.

    Returns:
        The saved state dict, or ``{}`` when the file is missing, unreadable,
        not valid JSON, or not a JSON object. An empty state makes every file
        count as changed; read problems are logged instead of being silently
        swallowed.
    """
    try:
        with open(STATE_FILE) as f:
            state = json.load(f)
    except FileNotFoundError:
        return {}
    except (OSError, ValueError) as e:
        # ValueError covers json.JSONDecodeError (truncated or corrupt file).
        log.warning(f"Could not read watcher state, starting fresh: {e}")
        return {}
    return state if isinstance(state, dict) else {}
def save_state(state: dict):
    """Persist the path -> mtime map to STATE_FILE as JSON.

    The data is written to a sibling temporary file and then moved over
    STATE_FILE, so an interrupted write cannot leave a truncated state file.
    """
    tmp_path = f"{STATE_FILE}.tmp"
    with open(tmp_path, "w") as f:
        json.dump(state, f)
    # os.replace swaps the file in one step once the new content is complete.
    os.replace(tmp_path, STATE_FILE)
def get_changed_files(state: dict) -> list:
    """Return supported files under NEXTCLOUD_PATH whose mtime differs from *state*.

    Dotfiles, Office lock files, anything under a ``Backups`` folder and
    anything under ``Journal/Media`` are excluded.
    """
    changed = []
    for path in Path(NEXTCLOUD_PATH).rglob("*"):
        if path.suffix.lower() not in SUPPORTED:
            continue
        if path.name.startswith((".", "~$")):
            continue
        path_text = str(path)
        if "Admin/Backups" in path_text or "Backups" in path.parts:
            continue
        if "Journal/Media" in path_text:
            continue
        try:
            if path.is_dir():
                continue
            mtime = str(path.stat().st_mtime)
        except OSError:
            # File was removed or renamed between listing and stat; a later
            # scan will pick up whatever replaced it.
            continue
        if state.get(path_text) != mtime:
            changed.append(path)
    return changed
def set_ingestion_state(**kwargs):
    """Merge the given fields into the shared ingestion_state under its lock."""
    with ingestion_lock:
        for field, value in kwargs.items():
            ingestion_state[field] = value
def write_status(handler):
    """Write a JSON snapshot of watcher and ingestion status to STATUS_FILE.

    Best-effort: any error while writing the file is ignored.
    """
    # Take the snapshot under the lock; do the file write outside it.
    with ingestion_lock:
        snapshot = dict(
            running=True,
            timestamp=time.time(),
            pending=handler.pending,
            last_event=handler.last_event,
            ingestion=dict(ingestion_state),
        )
    try:
        with open(STATUS_FILE, "w") as status_file:
            json.dump(snapshot, status_file)
    except Exception:
        pass
def write_heartbeat():
    """Write the current time to HEARTBEAT_FILE; errors are ignored (best-effort)."""
    try:
        stamp = str(time.time())
        Path(HEARTBEAT_FILE).write_text(stamp)
    except Exception:
        pass
def run_ingestion(embedder):
    """Ingest every new or changed file in-process and persist the updated state.

    Status is published through set_ingestion_state() at each step. When
    ingestion raises, the error is logged and reported and the state file is
    not rewritten.
    """
    state = load_state()
    changed = get_changed_files(state)
    count = len(changed)
    if count == 0:
        log.info("No new or changed files — skipping ingestion.")
        set_ingestion_state(status="idle", message="No changes detected", file_count=0)
        return

    log.info(f"Found {count} new or changed files — starting ingestion...")
    set_ingestion_state(
        status="ingesting",
        message=f"Ingesting {count} file(s)...",
        file_count=count,
        started_at=time.time(),
        finished_at=None,
        last_error="",
    )

    try:
        state = ingest_files(changed, embedder, state)
        save_state(state)
    except Exception as e:
        log.error(f"Ingestion failed: {e}")
        set_ingestion_state(
            status="error", message=f"Ingestion exception: {e}",
            last_error=str(e), finished_at=time.time(),
        )
    else:
        set_ingestion_state(
            status="idle",
            message=f"Last run: ingested {count} file(s) successfully",
            finished_at=time.time(),
        )
def start_ingestion_thread(embedder):
    """Start run_ingestion(embedder) on a daemon thread unless one is still running.

    The liveness check and the start both happen while holding ingestion_lock.
    """
    global ingestion_thread
    with ingestion_lock:
        still_running = ingestion_thread is not None and ingestion_thread.is_alive()
        if still_running:
            log.info("Ingestion already running — skipping.")
            return
        worker = threading.Thread(target=run_ingestion, args=(embedder,), daemon=True)
        ingestion_thread = worker
        worker.start()
class IngestHandler(FileSystemEventHandler):
    """Flags that an ingestion is pending whenever a relevant file event arrives.

    The main loop polls ``pending`` and ``last_event`` to debounce ingestion.
    All four event callbacks apply the same filter, so they share one
    private helper instead of repeating it.
    """

    def __init__(self):
        super().__init__()
        self.pending = False   # True once a relevant event has been seen
        self.last_event = 0    # time.time() of the most recent relevant event

    def _should_ignore(self, path: Path) -> bool:
        """Return True for dotfiles, Office lock files, backups and journal media."""
        if path.name.startswith((".", "~$")):
            return True
        if "Admin/Backups" in str(path) or "Backups" in path.parts:
            return True
        if "Journal/Media" in str(path):
            return True
        return False

    def _note_event(self, event, label: str, use_dest: bool = False) -> None:
        """Mark ingestion pending if *event* concerns a supported, non-ignored file.

        Args:
            event: The watchdog event being handled.
            label: Text logged after ``Event:`` to identify the event kind.
            use_dest: Inspect ``event.dest_path`` instead of ``event.src_path``.
        """
        if event.is_directory:
            return
        path = Path(event.dest_path if use_dest else event.src_path)
        if path.suffix.lower() not in SUPPORTED or self._should_ignore(path):
            return
        log.info(f"Event: {label} {path}")
        self.pending = True
        self.last_event = time.time()

    def on_created(self, event):
        self._note_event(event, "created")

    def on_modified(self, event):
        self._note_event(event, "modified")

    def on_moved(self, event):
        # Nextcloud WebDAV writes .part temp files then renames to final path.
        # src_path is the .part file; dest_path is the final filename.
        self._note_event(event, "moved ->", use_dest=True)

    def on_closed(self, event):
        # FileClosedEvent fires on the final file after Nextcloud completes write.
        # Belt-and-suspenders catch for any write pattern not caught by on_moved.
        self._note_event(event, "closed")
def main():
    """Run the watcher: startup recovery scan, then the debounced event loop.

    Startup loads the embedding model once and ingests anything that changed
    since the saved state. The loop then ticks every 5 seconds, writing the
    heartbeat and status files and starting an ingestion thread once
    DEBOUNCE_SECONDS have passed since the last relevant event. Ctrl-C stops
    the observer and exits.
    """
    log.info("Aaron AI Watcher starting...")
    log.info(f"Watching: {NEXTCLOUD_PATH}")
    embedder = load_embedder()

    log.info("Startup scan: checking for files missed since last run...")
    state = load_state()
    missed = get_changed_files(state)
    if not missed:
        log.info("Startup scan: no missed files.")
    else:
        missed_count = len(missed)
        log.info(f"Startup recovery: {missed_count} missed file(s) — ingesting now.")
        set_ingestion_state(
            status="ingesting",
            message=f"Startup recovery: ingesting {missed_count} missed file(s)...",
            file_count=missed_count, started_at=time.time(),
        )
        try:
            state = ingest_files(missed, embedder, state)
            save_state(state)
            set_ingestion_state(
                status="idle",
                message=f"Startup recovery complete: {missed_count} file(s) ingested.",
                finished_at=time.time(),
            )
        except Exception as e:
            log.error(f"Startup recovery failed: {e}")
            set_ingestion_state(status="error", message=str(e),
                                last_error=str(e), finished_at=time.time())

    handler = IngestHandler()
    observer = Observer()
    observer.schedule(handler, NEXTCLOUD_PATH, recursive=True)
    observer.start()
    log.info("Observer started.")

    try:
        while True:
            write_heartbeat()
            write_status(handler)
            # Only start once the event stream has been quiet long enough.
            quiet_long_enough = (
                handler.pending
                and time.time() - handler.last_event >= DEBOUNCE_SECONDS
            )
            if quiet_long_enough:
                handler.pending = False
                start_ingestion_thread(embedder)
            time.sleep(5)
    except KeyboardInterrupt:
        log.info("KeyboardInterrupt — stopping.")
        observer.stop()
    observer.join()
    log.info("Watcher stopped.")


if __name__ == "__main__":
    main()