Stage 3 worker v2.1 — saga-size limit + wedge detection + sudoers fixes

Production incident 2026-05-01: F14 re-cascade attempt surfaced three
compounding issues in cascade resilience.

stage3_worker.py changes:
- MAX_CHUNKS_PER_SAGA=10 — large documents split into multiple bulk
  commits, all sharing the same saga tag for Graphiti document linking.
  Original implementation sent all chunks as one saga; 17-19 chunk sagas
  deadlocked sidecar's Python-side coordination.
- recover_wedge() function — restarts aaronai-graphiti.service when
  consecutive_failures hits threshold. Mirrors Stage 2 pattern.
- run() loop adds consecutive_failures counter with threshold-2
  escalation. Resolves F28 + F29 from code review.
- Worker version bumped 2.0 -> 2.1.
- post_bulk() helper extracts shared HTTP POST + error handling.

Outside-repo changes (system config, separately documented):
- WatchdogSec=600 commented out in the stage2 and stage3 systemd unit
  files. The workers have no sd_notify support; per-request timeouts in
  code handle the actual failure modes.
- /etc/sudoers.d/aaron-aaronai created with NOPASSWD entries for
  `systemctl restart ollama` and `systemctl restart aaronai-graphiti.service`.
  Stage 2's existing recover_wedge() had been silently broken since
  deployment because these entries were missing.

.gitignore — added rules for *.bak files, runtime artifacts
(watcher_heartbeat, dreamer_state.json, corpus_integrity_report.json,
watcher_state.json, watcher_status.json), Python cruft, virtual env,
.env, editor/OS files, and Aaron AI runtime data (conversations.db,
sessions.db, memory.md, settings.json).

Untracked 11 files that shouldn't have been committed in 465f2f7
(this morning): backup files and runtime artifacts.

Re-cascading Shop Class (414KB) and BirdAI-Experiments-Log.md (192KB)
through the patched worker after re-extracting full text from disk.
Cascade in progress at commit time.
This commit is contained in:
2026-05-01 05:18:09 +00:00
parent 465f2f725b
commit b936931668
14 changed files with 150 additions and 4459 deletions
+37 -22
View File
@@ -1,34 +1,49 @@
# Environment and secrets # Backup files (rely on git history instead)
.env *.bak
*.env *.bak.*
# Databases # Runtime artifacts
db/ watcher_heartbeat
conversations.db dreamer_state.json
sessions.db corpus_integrity_report.json
watcher_state.json watcher_state.json
watcher_status.json watcher_status.json
# Python # Logs (these belong in /var/log/)
*.log
# Python artifacts
__pycache__/ __pycache__/
*.pyc *.pyc
*.pyo *.pyo
*.pyd
.pytest_cache/
*.egg-info/
# Virtual environment
venv/ venv/
.venv/
# Logs # Environment and secrets
*.log .env
.env.local
.env.*.local
# Memory and settings (personal data) # Editor and OS cruft
memory.md .vscode/
settings.json .idea/
*.swp
# Backups *.swo
Admin/
# OS
.DS_Store .DS_Store
Thumbs.db Thumbs.db
dreamer_state.json
migration_progress.json # Local data not for repo
dreamer_state.json db/
migration_progress.json embeddings/
experiments/summary_embeddings_cache.json
# Aaron AI runtime data (personal, do not commit)
conversations.db
sessions.db
memory.md
settings.json
-161
View File
@@ -1,161 +0,0 @@
{
"timestamp": "2026-04-30T21:58:06.498354",
"summary": {
"filesystem_total": 1399,
"pgvector_total": 1215,
"graphiti_total": 1205,
"both": 998,
"pgvector_only": 10,
"neither": 129,
"graphiti_only": 0,
"failures": 0,
"orphans_pgvector": 207,
"orphans_graphiti": 207
},
"gaps": [
"test.md",
"test-watcher-trigger.md",
"Renders.pptx",
"Ribbon Cutting Slideshow.pptx",
"PRINTERS.docx",
"Course Calender.docx",
"Attendance.docx",
"GH Slicer Notes [Autosaved].pptx",
"GH Slicer Notes.pptx",
"06_Surface Commands.docx",
"03_Precision_1 Homework.docx",
"02_2D Geometry Toolbars.docx",
"00_Roster.docx",
"Homeworks.docx",
"02_Osnap_ModelingAids.docx",
"06_Solids.docx",
"04_Precision_2 Homework.docx",
"~\ufffdMod08_Post_Processing_2023.pptx",
"~\ufffdMod02_Industries_and_Applications_2023.pptx",
"~\ufffdMod04_FDM_Materials_2023.pptx",
"Byron_V Independent Study.pdf",
"IanC.Lorber_Final_2dsketches..pdf",
"Irene_Raptopoulos_Final.pdf",
"rcolon-final-schematic.pdf",
"kellogg_schematic_schem.pdf",
"Visa Soccer Goal Frame Drawing.pdf",
"4_13_2017_0_22_14.pdf",
"4_13_2017_0_50_29.pdf",
"4_13_2017_0_49_25.pdf",
"4_13_2017_12_27_19.pdf",
"4_13_2017_12_26_54.pdf",
"4_12_2017_21_7_14.pdf",
"4_13_2017_12_25_49.pdf",
"4_13_2017_12_26_5.pdf",
"4_13_2017_12_27_8.pdf",
"IanC.Lorber_Final_2dsketches - 2016.pdf",
"MidtermLayout1.pdf",
"MidtermLayout2.pdf",
"pascar_finalSchematic.pdf",
"Week 7.pdf",
"EXAMPLE Art Education IM Rubric and Scaffold UPDATED copy.docx",
"DDF - IM Rubric and Scaffold UPDATED.docx",
"DI_9.pdf",
"DI_6.pdf",
"DI_4.pdf",
"DI_7.pdf",
"DI_8.pdf",
"DI_2.pdf",
"DI_1.pdf",
"DI_3.pdf",
"DI_5.pdf",
"JN EVAL.pdf",
"Amazon.com_ Iwata Eclipse HP-CS Airbrush - Gravity Feed Dual Action, High-Flow Atomization for Fine Detail to Wide Coverage \u2013 E3 Nozzle, 0.pdf",
"RCPA1.pdf",
"image2022-01-07-133846 - CAryn.pdf",
"image2022-01-07-134536 - Conference.pdf",
"image2022-01-07-131439.pdf",
"image2022-01-07-135248 - BSC Revision.pdf",
"image2022-01-07-141250- LAI.pdf",
"image2022-01-07-134938 - Maker in Residence.pdf",
"image2022-01-07-133157 - Teching Evals.pdf",
"image2022-01-07-135538 PRS Medal.pdf",
"image2022-01-07-131217.pdf",
"image2022-01-07-135911 - HVAMC.pdf",
"image2022-01-07-133504 - Sarah and OLiva.pdf",
"image2022-01-07-131903 - Annual Reports.pdf",
"image2022-01-07-132917 - Mastery.pdf",
"Annual Report - 2016.pdf",
"Annual Report - 2018.pdf",
"Annual Report - 2019.pdf",
"Annual Report - 2017.pdf",
"MOU 2018 - 19.pdf",
"Appointment 2016.pdf",
"Appointment 2019 - 2021.pdf",
"MOU 2017.pdf",
"SKMBT_55220060909245.pdf",
"Dean SSE Letter 2018.pdf",
"Dean FPA Letter 2018.pdf",
"Sarah Thesis Article.pdf",
"Caryn Bylott Thesis.pdf",
"Olivia Thesis.pdf",
"Teching Evals.pdf",
"Maker in Residence - Dalles v2.pdf",
"Modela Gallery - Ripples from Stillwater.pdf",
"Maker in Residence - Dalles.pdf",
"Makerbot Innovation Center Director.pdf",
"Design Week 2019.pdf",
"DW Workshops.pdf",
"Ron Rael Lecture.pdf",
"BSC Revision.pdf",
"President's Medal.pdf",
"CNC Workshop.pdf",
"Dorksy.pdf",
"FULL SCAN.pdf",
"DDF Course Fee Rev 2017.pdf",
"SlideSlam 2019.pdf",
"Candy and Cold Cases.pdf",
"Murder ID.pdf",
"HVAMC SuperLab.pdf",
"Certifications.pdf",
"Brazkem Letter.pdf",
"Havana 2017.pdf",
"CAA Conference - 2020.pdf",
"CAA Conference 2019.pdf",
"Central Hudson Grant.pdf",
"3D Printing Updates - Council of Industry.pdf",
"k12 Training.pdf",
"Hudson Valley Futures Summit.pdf",
"StateAssemblymanTour.pdf",
"Best of New Paltz.pdf",
"HVAMC - ColorPage.pdf",
"HVAMC - Daily Freeman.pdf",
"Chocolate Skulls.pdf",
"10_13_2016_21_19_43.pdf",
"10_13_2016_21_19_46.pdf",
"10_13_2016_21_19_1.pdf",
"10_13_2016_21_18_55.pdf",
"161012_102427.pdf",
"161012_102737.pdf",
"161012_114217.pdf",
"Eto Forms.txt",
"AaronNelsonUndergraduteTranscript Unsecured.pdf",
"Aaron Nelson Transcript Undergradute Unsecured.pdf",
"AP023.pdf",
"How Buildings Learn_ What Happens After They are Built -- Stewart Brand .pdf",
"Occupying and connecting _ thoughts on territories and -- Frei Otto; Berthold Burkhardt.pdf",
"SilkwormManual.pdf",
"NELSON commitment letter.pdf",
"i-9.pdf"
],
"failures": [],
"auto_queued": [],
"pgvector_only_sample": [
"dreamer_changelog.md",
"experiments-log-additions-2026-04-30.md",
"2026-04-30-15-59-voice.md",
"2026-04-30-16-59-voice.md",
"2026-04-30-16-53-voice.md",
"2026-04-30-17-06-voice.md",
"2026-04-30-16-23-voice.md",
"2026-04-30-late-rem.md",
"2026-04-30-synthesis.md",
"2026-04-30-nrem.md"
],
"graphiti_only": []
}
-46
View File
@@ -1,46 +0,0 @@
{
"last_dream_timestamp": 1777536047.392913,
"last_dream_mode": "pipeline",
"last_dream_file": "Journal/Dreams/2026-04-30-synthesis.md",
"retrieved_sources": [
"ChatGPT: CV Summary Request",
"ChatGPT: Program response drafting",
"Cognition in the Wild (A Bradford Book) -- Hutchins, Edwin.pdf",
"ChatGPT: Digital Fabrication Cultural Project",
"Dossier Narrative.pdf",
"E1_8-taxonomy-free-cascade-protocol.md",
"References.docx",
"ChatGPT: Career change anxiety",
"Dossier Narrative.docx",
"Dossier Narrative Kill Me PLS_REV_HOME.docx",
"Aaron Nelson Tenure Dossier Narrative.pdf",
"Claude: Importing chat history from ChatGPT",
"aaronai-architecture.md",
"Aaron AI: What should I be the most excited about right now?",
"ChatGPT: Dean Position Evaluation",
"2026-04-27-early-rem-1.md",
"The Poetics of Space -- Gaston Bachelard translated from the French by Maria Jolas -- First Edition, 1994.pdf",
"Dossier Narrative Kill Me PLS.docx",
"Advances in Architectural Geometry 2023 -- Kathrin D\u00f6rfler (editor); Jan Knippers (editor); Achim.pdf",
"Claude: I filling out my annual report...",
"References.pdf",
"2026-04-28-early-rem.md",
"Claude: Law enforcement career options",
"Dossier Narrative Kill Me PLS_REV_2.docx",
"2026-04-29-late-rem.md",
"Dossier Narrative Kill Me PLS_REV.docx",
"Utah MDD - Aaron Nelson - Copy.pptx",
"Dossier Narrative Kill Me PLS_REV_3.docx",
"Aaron AI: Who's covering for me on sabbatical?",
"ChatGPT: GA Proposal Revision Guide",
"BirdAI-Experiments-Log.md",
"Mod06_GrabCAD_Print_and _Advanced_FDM_2023.pptx",
"The Extended Mind _ The Power of Thinking Outside the Brain -- Annie Murphy Paul.pdf",
"Company of One -- Paul Jarvis.pdf",
"Fabrication Processes_Syllabus.docx",
"ChatGPT: Digital fabrication education",
"Fabrication Processes_Syllabus DDF710 V3.docx",
"Claude: Setting up a custom OpenClaw instance",
"Claude: Weighing Utah versus Oklahoma"
]
}
File diff suppressed because it is too large Load Diff
-442
View File
@@ -1,442 +0,0 @@
"""
Consolidator 0.1 — alias resolution agent for BirdAI's Tier 1 substrate.
Reads entities from FalkorDB group_id 'aaron', infers light type labels,
computes pairwise similarity within type blocks using ego summary embedding +
name string distance + neighbor pattern overlap, generates merge proposals
above threshold, writes proposal log for human review.
Does NOT execute merges. 0.1 is the calibration phase — proposals only,
human reviews before any action.
"""
import json
import re
import os
from datetime import datetime, timezone
from collections import defaultdict
from pathlib import Path
from falkordb import FalkorDB
import numpy as np
# Configuration
# Name of the FalkorDB graph selected by generate_proposals().
GROUP_ID = "aaron"
HIGH_CONFIDENCE_THRESHOLD = 0.85 # confidence >= this becomes a merge proposal
LOW_CONFIDENCE_THRESHOLD = 0.65 # confidence >= this (but below HIGH) is logged as low-confidence; lower is dropped
# Directory that receives the proposals-<timestamp>.md / .json files.
PROPOSALS_DIR = Path("/home/aaron/Nextcloud/Journal/Consolidation")
# NOTE: this runs at import time, so merely importing the module creates the directory.
PROPOSALS_DIR.mkdir(parents=True, exist_ok=True)
def cosine_similarity(a, b):
    """Return the cosine similarity of two embedding vectors as a Python float.

    Both inputs are converted to float32 arrays. When either vector has zero
    norm the result is defined as 0.0 instead of dividing by zero.
    """
    vec_a = np.array(a, dtype=np.float32)
    vec_b = np.array(b, dtype=np.float32)
    norm_a = np.linalg.norm(vec_a)
    norm_b = np.linalg.norm(vec_b)
    if norm_a == 0 or norm_b == 0:
        return 0.0
    dot_product = np.dot(vec_a, vec_b)
    return float(dot_product / (norm_a * norm_b))
def name_similarity(name_a, name_b):
    """
    Score how alike two entity names are.

    Checks run in this order and the first one that applies decides the score:
      1. case-insensitive, whitespace-trimmed exact match  -> 1.0
      2. either name contains no word tokens               -> 0.0
      3. one name is a substring of the other              -> between 0.7 and 0.9,
         scaled by the shorter/longer length ratio
      4. one name is an acronym of the other               -> 0.85
      5. otherwise                                         -> Jaccard overlap of word tokens
    """
    def initials_match(short, full):
        # `short` counts as an acronym of `full` when its upper-cased form equals,
        # or is contained in, the initials of a multi-word `full`.
        if len(short) >= len(full):
            return False
        words = full.split()
        if len(words) < 2:
            return False
        initials = ''.join(word[0].upper() for word in words)
        candidate = short.upper()
        return candidate == initials or candidate in initials

    norm_a = name_a.lower().strip()
    norm_b = name_b.lower().strip()
    if norm_a == norm_b:
        return 1.0

    tokens_a = set(re.findall(r'\b\w+\b', norm_a))
    tokens_b = set(re.findall(r'\b\w+\b', norm_b))
    if not (tokens_a and tokens_b):
        return 0.0

    # Containment handles "Aaron" inside "Aaron Nelson": strong, but never 1.0.
    if norm_a in norm_b or norm_b in norm_a:
        len_a, len_b = len(norm_a), len(norm_b)
        return 0.7 + 0.2 * (min(len_a, len_b) / max(len_a, len_b))

    # The acronym test uses the original (un-lowered) names.
    if initials_match(name_a, name_b) or initials_match(name_b, name_a):
        return 0.85

    # Token Jaccard handles reordered names such as "Nelson, Aaron".
    return len(tokens_a & tokens_b) / len(tokens_a | tokens_b)
def infer_type(entity_name, summary):
    """
    Guess a coarse entity type used only for blocking comparisons.

    Returns one of: person, organization, project, place, unknown.
    The categories are tried in that order and the first hit wins; when no
    heuristic fires the result is 'unknown'. `summary` may be None.
    """
    name_lc = entity_name.lower().strip()
    summary_lc = (summary or "").lower()

    def mentions(text, needles):
        return any(needle in text for needle in needles)

    # Person: exactly two capitalised words, or a person-like phrase in the summary.
    if re.match(r'^[A-Z][a-z]+ [A-Z][a-z]+$', entity_name.strip()):
        return "person"
    if mentions(summary_lc, (
        'is a person', 'is a professor', 'is an artist', 'is a colleague',
        'is a friend', 'is a family member', 'works at', 'studied at',
        "'s spouse", "'s child", "'s parent", "'s student",
    )):
        return "person"

    # Organization: company/institution words in the name or summary.
    if mentions(name_lc, (
        ' inc', ' llc', ' corp', ' company', ' university', ' college',
        ' school', ' institute', ' foundation', ' department',
    )):
        return "organization"
    if mentions(summary_lc, (
        'is a company', 'is a university', 'is an organization',
        'is an institution', 'is a department', 'is a nonprofit',
    )):
        return "organization"

    # Project: software/creative work indicators.
    if mentions(summary_lc, (
        'is a project', 'software project', 'is a codebase',
        'is a tool', 'is a system', 'is an application',
        'is a research project', 'is a design project',
    )):
        return "project"
    if mentions(name_lc, (' project', ' system', ' platform')):
        return "project"

    # Place: location phrases in the summary.
    if mentions(summary_lc, (
        'is a city', 'is a town', 'is a state', 'is a country',
        'is a neighborhood', 'is a region', 'is a location',
    )):
        return "place"

    return "unknown"
def get_neighbors(graph, entity_uuid, limit=20):
    """Return the set of names of Entity nodes one RELATES_TO hop from the given entity.

    At most `limit` distinct names are requested; empty or null names are dropped.
    """
    cypher = (
        "\n"
        "MATCH (e:Entity {uuid: $uuid})-[r:RELATES_TO]-(other:Entity)\n"
        "RETURN DISTINCT other.name AS name\n"
        "LIMIT $limit\n"
    )
    params = {"uuid": entity_uuid, "limit": limit}
    rows = graph.query(cypher, params).result_set
    return {row[0] for row in rows if row[0]}
def neighbor_jaccard(neighbors_a, neighbors_b):
    """Return |A ∩ B| / |A ∪ B| for two neighbor-name sets, or 0.0 when both are empty."""
    combined = neighbors_a | neighbors_b
    if not combined:
        return 0.0
    shared = neighbors_a & neighbors_b
    return len(shared) / len(combined)
def get_edge_count(graph, entity_uuid):
    """Return the number of RELATES_TO relationships touching the entity (0 if the query yields no rows)."""
    cypher = (
        "\n"
        "MATCH (e:Entity {uuid: $uuid})-[r:RELATES_TO]-()\n"
        "RETURN count(r) AS c\n"
    )
    outcome = graph.query(cypher, {"uuid": entity_uuid})
    if not outcome.result_set:
        return 0
    return outcome.result_set[0][0]
def combine_signals(name_sim, ego_sim, neighbor_sim):
    """
    Fold the three similarity signals into one confidence score.

    - ego_sim below 0.4: the pair is treated as unrelated whatever the names
      look like, and the score is capped at min(0.5, ego_sim).
    - name_sim >= 0.85 together with ego_sim >= 0.65: weights are
      0.5 ego / 0.3 name / 0.2 neighbor.
    - otherwise: weights are 0.5 ego / 0.25 name / 0.25 neighbor.
    """
    if ego_sim < 0.4:
        return min(0.5, ego_sim)

    strong_name_match = name_sim >= 0.85 and ego_sim >= 0.65
    if strong_name_match:
        name_weight, neighbor_weight = 0.3, 0.2
    else:
        name_weight, neighbor_weight = 0.25, 0.25
    return 0.5 * ego_sim + name_weight * name_sim + neighbor_weight * neighbor_sim
def generate_proposals():
    """
    Scan the graph for likely alias pairs and score them. Nothing is merged.

    Steps:
      1. Load every Entity that has both a name embedding and a summary.
      2. Assign each a coarse type with infer_type() and group by type.
      3. Build candidate pairs: within each typed block, within the
         'unknown' block, and every unknown entity against every typed one.
      4. Score each pair (name similarity, embedding cosine, neighbor
         overlap) and bucket it by the confidence thresholds.

    Returns:
        (proposals, low_confidence, entity_count, pair_count), where the two
        lists hold record dicts for pairs at or above
        HIGH_CONFIDENCE_THRESHOLD and LOW_CONFIDENCE_THRESHOLD respectively,
        and pair_count is the number of candidate pairs built in step 3
        (before the cheap pre-filter).
    """
    db = FalkorDB(host='localhost', port=6379)
    graph = db.select_graph(GROUP_ID)

    # Pull all entities with embeddings.
    # NOTE(review): the vector loaded here is n.name_embedding, although it is
    # reported downstream as the "ego (summary)" signal — confirm this is intended.
    print(f"Fetching entities from group_id '{GROUP_ID}'...")
    result = graph.query("""
        MATCH (n:Entity)
        WHERE n.name_embedding IS NOT NULL AND n.summary IS NOT NULL
        RETURN n.uuid, n.name, n.summary, n.name_embedding
    """)
    entities = [
        {'uuid': row[0], 'name': row[1], 'summary': row[2], 'embedding': row[3]}
        for row in result.result_set
    ]
    print(f" Loaded {len(entities)} entities with embeddings")

    # Infer types and group by them in a single pass.
    print("Inferring entity types for blocking...")
    type_counts = defaultdict(int)
    blocks = defaultdict(list)
    for ent in entities:
        inferred = infer_type(ent['name'], ent['summary'])
        ent['inferred_type'] = inferred
        type_counts[inferred] += 1
        blocks[inferred].append(ent)
    for type_name, count in sorted(type_counts.items(), key=lambda item: -item[1]):
        print(f" {type_name}: {count}")

    # 'unknown' entities get compared against everything (they might be any type).
    # Other types only get compared within their type block + against unknowns.
    print()
    print("Comparing entities within type blocks...")
    typed_blocks = {t: ents for t, ents in blocks.items() if t != 'unknown'}
    unknown_block = blocks.get('unknown', [])

    pairs_to_compare = []
    # Within-type pairs, then unknown-vs-unknown pairs.
    for block in (*typed_blocks.values(), unknown_block):
        for i, first in enumerate(block):
            for second in block[i + 1:]:
                pairs_to_compare.append((first, second))
    # Unknown vs typed.
    for ent_unknown in unknown_block:
        for ents in typed_blocks.values():
            for ent_typed in ents:
                pairs_to_compare.append((ent_unknown, ent_typed))
    print(f" Pairs to compare: {len(pairs_to_compare):,}")

    # Per-run caches so each entity is queried at most once per kind of lookup.
    neighbor_cache = {}
    edge_count_cache = {}

    def neighbors_cached(uuid):
        if uuid not in neighbor_cache:
            neighbor_cache[uuid] = get_neighbors(graph, uuid)
        return neighbor_cache[uuid]

    def edge_count_cached(uuid):
        if uuid not in edge_count_cache:
            edge_count_cache[uuid] = get_edge_count(graph, uuid)
        return edge_count_cache[uuid]

    def describe(ent):
        return {
            'uuid': ent['uuid'],
            'name': ent['name'],
            'type': ent['inferred_type'],
            'summary': ent['summary'][:200],
            'edge_count': edge_count_cached(ent['uuid']),
        }

    proposals = []
    low_confidence = []
    total_pairs = len(pairs_to_compare)
    for comparisons_done, (ent_a, ent_b) in enumerate(pairs_to_compare, start=1):
        if comparisons_done % 5000 == 0:
            print(f" ... {comparisons_done:,} / {total_pairs:,}")

        name_sim = name_similarity(ent_a['name'], ent_b['name'])
        ego_sim = cosine_similarity(ent_a['embedding'], ent_b['embedding'])
        # Pre-filter to avoid the neighbor queries on obviously different pairs.
        if ego_sim < 0.5 and name_sim < 0.3:
            continue

        neighbors_a = neighbors_cached(ent_a['uuid'])
        neighbors_b = neighbors_cached(ent_b['uuid'])
        neighbor_sim = neighbor_jaccard(neighbors_a, neighbors_b)
        confidence = combine_signals(name_sim, ego_sim, neighbor_sim)

        # Pick the bucket first: pairs below the low threshold are dropped, so
        # there is no point paying for their edge-count queries.
        if confidence >= HIGH_CONFIDENCE_THRESHOLD:
            bucket = proposals
        elif confidence >= LOW_CONFIDENCE_THRESHOLD:
            bucket = low_confidence
        else:
            continue

        bucket.append({
            'entity_a': describe(ent_a),
            'entity_b': describe(ent_b),
            'confidence': round(confidence, 3),
            'signals': {
                'name_similarity': round(name_sim, 3),
                'ego_similarity': round(ego_sim, 3),
                'neighbor_overlap': round(neighbor_sim, 3),
            },
            'shared_neighbors': sorted(neighbors_a & neighbors_b)[:10],
        })

    print(f"\nDone. Proposals: {len(proposals)}, Low-confidence: {len(low_confidence)}")
    return proposals, low_confidence, len(entities), total_pairs
def _entity_lines(label, entity):
    """Return the three markdown lines describing one side ("A" or "B") of a proposal."""
    name = entity['name']
    entity_type = entity['type']
    edge_count = entity['edge_count']
    return [
        f"**Entity {label}:** \"{name}\" (type: {entity_type}, {edge_count} edges)",
        f" - uuid: `{entity['uuid']}`",
        f" - summary: {entity['summary']}",
    ]


def write_proposals_log(proposals, low_confidence, total_entities, total_comparisons):
    """
    Write one run's results to PROPOSALS_DIR as markdown plus raw JSON.

    Args:
        proposals: high-confidence record dicts from generate_proposals().
        low_confidence: low-confidence record dicts from generate_proposals().
        total_entities: number of entities scanned (statistics only).
        total_comparisons: number of candidate pairs (statistics only).

    Both files are named proposals-<UTC timestamp to the minute>, so two runs
    within the same minute overwrite each other. Records are written sorted
    by descending confidence; the markdown shows at most 30 low-confidence rows.
    """
    timestamp = datetime.now(timezone.utc).strftime("%Y-%m-%d-%H%M")
    out_path = PROPOSALS_DIR / f"proposals-{timestamp}.md"
    proposals_sorted = sorted(proposals, key=lambda p: -p['confidence'])
    low_sorted = sorted(low_confidence, key=lambda p: -p['confidence'])

    lines = [
        f"# Consolidator 0.1 — Run {timestamp}",
        "",
        "## Statistics",
        f"- Entities scanned: {total_entities:,}",
        f"- Pairwise comparisons: {total_comparisons:,}",
        f"- High-confidence proposals (≥{HIGH_CONFIDENCE_THRESHOLD}): {len(proposals)}",
        f"- Low-confidence candidates ({LOW_CONFIDENCE_THRESHOLD}-{HIGH_CONFIDENCE_THRESHOLD}): {len(low_confidence)}",
        "",
        "## How to review",
        "",
        "For each proposal, mark your decision by changing `[ ]` to one of:",
        "- `[APPROVE]` — execute this merge on next run",
        "- `[REJECT]` — don't merge, don't propose again",
        "- `[DEFER]` — re-surface in next run for further consideration",
        "",
        "Save the file when done. Do not modify proposal_id or uuid fields.",
        "",
        "---",
        "",
        f"## Proposed Merges (n={len(proposals)})",
        "",
    ]

    for i, p in enumerate(proposals_sorted, start=1):
        signals = p['signals']
        lines.extend([
            f"### Proposal {i}",
            "",
            "**Decision:** [ ]",
            "",
            f"**Confidence:** {p['confidence']}",
            "",
        ])
        lines.extend(_entity_lines("A", p['entity_a']))
        lines.append("")
        lines.extend(_entity_lines("B", p['entity_b']))
        lines.extend([
            "",
            "**Signals:**",
            f" - Name similarity: {signals['name_similarity']}",
            f" - Ego (summary) similarity: {signals['ego_similarity']}",
            f" - Neighbor overlap: {signals['neighbor_overlap']}",
        ])
        if p['shared_neighbors']:
            shared_str = ', '.join(f'"{n}"' for n in p['shared_neighbors'][:8])
            lines.append(f" - Shared neighbors (sample): {shared_str}")
        lines.extend([
            "",
            "**Optional rejection note:** ",
            "",
            "---",
            "",
        ])

    lines.extend([
        "",
        f"## Low-Confidence Candidates (n={len(low_confidence)}, informational only, no action)",
        "",
    ])
    for p in low_sorted[:30]:
        signals = p['signals']
        lines.append(
            f"- **{p['confidence']}** \"{p['entity_a']['name']}\" + \"{p['entity_b']['name']}\" "
            f"(name={signals['name_similarity']}, ego={signals['ego_similarity']}, nbr={signals['neighbor_overlap']})"
        )
    if len(low_sorted) > 30:
        lines.append(f"- *(...{len(low_sorted) - 30} more not shown)*")

    # The report contains non-ASCII characters ("—", "≥"); write UTF-8 explicitly
    # rather than depending on the locale's default encoding.
    out_path.write_text("\n".join(lines), encoding="utf-8")
    print(f"\nProposal log written to: {out_path}")

    # Also save raw JSON for downstream tooling.
    json_path = PROPOSALS_DIR / f"proposals-{timestamp}.json"
    with open(json_path, 'w', encoding="utf-8") as f:
        json.dump({
            'run_timestamp': timestamp,
            'statistics': {
                'total_entities': total_entities,
                'total_comparisons': total_comparisons,
                'proposal_count': len(proposals),
                'low_confidence_count': len(low_confidence),
            },
            'proposals': proposals_sorted,
            'low_confidence': low_sorted,
        }, f, indent=2)
    print(f"Raw JSON: {json_path}")
def main():
    """Run one calibration pass: generate proposals, write the logs, print next steps."""
    banner_rule = "=" * 70
    print(banner_rule)
    print("Consolidator 0.1 — Calibration Phase")
    print(banner_rule)
    print()
    run_results = generate_proposals()
    # generate_proposals() returns its values in write_proposals_log()'s argument order.
    write_proposals_log(*run_results)
    print()
    print("Next: review the proposals markdown file and mark APPROVE/REJECT/DEFER")
    print("for each proposal. Re-run will read decisions and execute approved merges.")


if __name__ == "__main__":
    main()
@@ -1,245 +0,0 @@
#!/usr/bin/env python3
"""
corpus_integrity.py — BirdAI Corpus Integrity Check
Compares three sources of truth:
1. Filesystem (Nextcloud) — what files exist
2. pgvector (embeddings table) — what's been through Stage 1
3. Graphiti (migration state + stage_3_queue) — what's been through Stage 3
Usage:
python3 corpus_integrity.py # report only
python3 corpus_integrity.py --fix # report + auto-queue gaps for retry
python3 corpus_integrity.py --json # output JSON to stdout
"""
import os
import sys
import json
import argparse
from pathlib import Path
from datetime import datetime
import psycopg2
from dotenv import load_dotenv
# Load settings from ~/aaronai/.env; override=True lets values from the file
# replace variables that are already set in the process environment.
load_dotenv(Path.home() / "aaronai" / ".env", override=True)
# Root directory walked by get_filesystem_files().
NEXTCLOUD_PATH = "/home/aaron/nextcloud/data/data/aaron/files"
# JSON state file whose "ingested" list is read by get_graphiti_sources().
MIGRATION_STATE = str(Path.home() / "aaronai" / "experiments" / "tier1_migration_state.json")
# Where run_reconciliation() writes its JSON report.
REPORT_PATH = str(Path.home() / "aaronai" / "corpus_integrity_report.json")
# File extensions (lower-case, with leading dot) included in the scan.
SUPPORTED = {".pdf", ".docx", ".pptx", ".txt", ".md"}
# Connection string handed to psycopg2.connect(); None when PG_DSN is not set.
PG_DSN = os.getenv("PG_DSN")
def get_pg():
    """Open and return a new psycopg2 connection built from PG_DSN.

    The caller owns the connection and is responsible for closing it.
    """
    connection = psycopg2.connect(PG_DSN)
    return connection
def get_filesystem_files():
    """
    Walk NEXTCLOUD_PATH and describe every file eligible for the integrity check.

    A path is skipped when it is a directory, has an extension outside
    SUPPORTED, has a name starting with "." or "~$", or lies under a backups
    or Journal/Media location.

    Returns:
        list of dicts with keys "source" (file name only), "filepath",
        "size" and "mtime".
    """
    files = []
    root = Path(NEXTCLOUD_PATH)
    for path in root.rglob("*"):
        # Cheap name-based filters first; they need no filesystem access.
        if path.suffix.lower() not in SUPPORTED:
            continue
        if path.name.startswith((".", "~$")):
            continue
        path_str = str(path)
        if "Admin/Backups" in path_str or "Backups" in path.parts:
            continue
        if "Journal/Media" in path_str:
            continue
        if path.is_dir():
            continue
        try:
            # One stat() call supplies both size and mtime.
            info = path.stat()
        except OSError as e:
            # The tree is live data: a file can vanish or be unreadable between
            # listing and stat. Skip it rather than abort the whole scan.
            print(f"WARNING: could not stat {path}: {e}", file=sys.stderr)
            continue
        files.append({"source": path.name, "filepath": path_str,
                      "size": info.st_size, "mtime": info.st_mtime})
    return files
def get_pgvector_sources():
    """
    Return the set of distinct non-null `source` values in the embeddings table.

    On any database error the problem is reported on stderr and an EMPTY set
    is returned; callers cannot distinguish that from an empty table.
    """
    try:
        pg = get_pg()
        try:
            cur = pg.cursor()
            cur.execute("SELECT DISTINCT source FROM embeddings WHERE source IS NOT NULL")
            return {row[0] for row in cur.fetchall()}
        finally:
            # Close even when the query raises, so a failure does not leak the connection.
            pg.close()
    except Exception as e:
        print(f"ERROR: pgvector: {e}", file=sys.stderr)
        return set()
def get_graphiti_sources():
    """
    Return the set of source names considered ingested into Graphiti.

    Two origins are merged, each best-effort (a failure prints a warning to
    stderr and the other origin is still used):
      1. file names of the paths in the "ingested" list of MIGRATION_STATE;
      2. `source` values of stage_3_queue rows with completed_at set.
    """
    sources = set()
    try:
        state_path = Path(MIGRATION_STATE)
        if state_path.exists():
            state = json.loads(state_path.read_text())
            sources.update(Path(filepath).name for filepath in state.get("ingested", []))
    except Exception as e:
        print(f"WARNING: migration state: {e}", file=sys.stderr)
    try:
        pg = get_pg()
        try:
            cur = pg.cursor()
            cur.execute("SELECT DISTINCT source FROM stage_3_queue WHERE completed_at IS NOT NULL")
            sources.update(row[0] for row in cur.fetchall())
        finally:
            # Close even when the query raises, so a failure does not leak the connection.
            pg.close()
    except Exception as e:
        print(f"WARNING: stage_3_queue: {e}", file=sys.stderr)
    return sources
def get_ingest_failures():
    """
    Return unresolved rows of ingest_failures as a dict keyed by source.

    Each value is a dict with keys source, filepath, error, retry_count,
    first_failed_at and last_failed_at (the two timestamps converted with
    str()). On a database error a warning is printed to stderr and whatever
    was collected so far (possibly nothing) is returned.
    """
    failures = {}
    try:
        pg = get_pg()
        try:
            cur = pg.cursor()
            cur.execute("""
                SELECT source, filepath, error, retry_count, first_failed_at, last_failed_at
                FROM ingest_failures WHERE resolved = FALSE ORDER BY last_failed_at DESC
            """)
            for source, filepath, error, retry_count, first_failed, last_failed in cur.fetchall():
                failures[source] = {"source": source, "filepath": filepath, "error": error,
                                    "retry_count": retry_count,
                                    "first_failed_at": str(first_failed),
                                    "last_failed_at": str(last_failed)}
        finally:
            # Close even when the query raises, so a failure does not leak the connection.
            pg.close()
    except Exception as e:
        print(f"WARNING: ingest_failures: {e}", file=sys.stderr)
    return failures
def extract_text_for_retry(filepath):
    """
    Extract plain text from a .docx, .pdf, .pptx, .txt or .md file.

    The parser libraries are imported lazily inside their branch so that only
    the one actually needed has to be installed.

    Returns:
        The extracted text, or "" when the extension is unsupported or
        extraction raises (a warning is then printed to stderr).
    """
    path = Path(filepath)
    suffix = path.suffix.lower()
    try:
        if suffix == ".docx":
            from docx import Document as D
            return "\n".join(p.text for p in D(path).paragraphs if p.text.strip())
        if suffix == ".pdf":
            from pypdf import PdfReader
            # Extract each page once; extraction is the expensive step and the
            # result is needed for both the emptiness filter and the output.
            page_texts = (page.extract_text() for page in PdfReader(path).pages)
            return "".join(text + "\n" for text in page_texts if text)
        if suffix == ".pptx":
            from pptx import Presentation
            prs = Presentation(path)
            return "\n".join(shape.text for slide in prs.slides for shape in slide.shapes
                             if hasattr(shape, "text") and shape.text.strip())
        if suffix in {".txt", ".md"}:
            return path.read_text(encoding="utf-8", errors="ignore")
    except Exception as e:
        print(f"WARNING: extraction failed {path.name}: {e}", file=sys.stderr)
    return ""
def queue_for_retry(source, full_text, filepath):
    """
    Insert or reset a stage_2_queue row so the document is processed again.

    Args:
        source: queue key (the ON CONFLICT target).
        full_text: extracted text; only the first 50,000 characters are
            stored, while char_length records the full untruncated length.
        filepath: accepted for call-site compatibility; not used here.

    Returns:
        True when the row was committed, False on any error (a warning is
        printed to stderr).
    """
    try:
        pg = get_pg()
        try:
            cur = pg.cursor()
            cur.execute("""
                INSERT INTO stage_2_queue (source, full_text, char_length)
                VALUES (%s, %s, %s)
                ON CONFLICT (source) DO UPDATE SET
                full_text = EXCLUDED.full_text, char_length = EXCLUDED.char_length,
                enqueued_at = NOW(), completed_at = NULL, failed_at = NULL, attempts = 0
            """, (source, full_text[:50000], len(full_text)))
            pg.commit()
        finally:
            # Close even when execute/commit raises, so a failure does not leak the connection.
            pg.close()
        return True
    except Exception as e:
        print(f"WARNING: queue failed {source}: {e}", file=sys.stderr)
        return False
def _record_empty_text_failure(finfo):
    """Upsert an ingest_failures row for a file whose extracted text was empty."""
    try:
        pg = get_pg()
        try:
            cur = pg.cursor()
            cur.execute("""
                INSERT INTO ingest_failures (source, filepath, error, retry_count, first_failed_at, last_failed_at)
                VALUES (%s, %s, %s, 0, NOW(), NOW())
                ON CONFLICT (source) DO UPDATE SET
                error = EXCLUDED.error,
                last_failed_at = NOW()
            """, (finfo["source"], finfo["filepath"],
                  "Empty text — likely scanned, encrypted, or corrupt. Requires manual review or OCR."))
            pg.commit()
        finally:
            # Close even when execute/commit raises, so a failure does not leak the connection.
            pg.close()
    except Exception as e:
        # Warnings go to stderr like every other warning in this module.
        print(f" WARNING: could not record failure: {e}", file=sys.stderr)


def run_reconciliation(fix=False):
    """
    Compare filesystem, pgvector and Graphiti, print a summary, write the report.

    Args:
        fix: when True, every file found in neither store is re-extracted and
            queued via queue_for_retry(); files that yield no text are
            recorded in ingest_failures instead.

    Returns:
        The report dict that was also written to REPORT_PATH as JSON.
    """
    print(f"BirdAI Corpus Integrity Check — {datetime.now().isoformat()}")
    print()
    print("Scanning filesystem...")
    fs_files = get_filesystem_files()
    # Keyed by bare file name: files sharing a name in different folders
    # collapse to one entry, so the category counts below can add up to less
    # than the filesystem total.
    fs_sources = {f["source"]: f for f in fs_files}
    print(f" Filesystem: {len(fs_files)} files")
    print("Querying pgvector...")
    pv_sources = get_pgvector_sources()
    print(f" pgvector: {len(pv_sources)} distinct sources")
    print("Querying Graphiti...")
    gr_sources = get_graphiti_sources()
    print(f" Graphiti: {len(gr_sources)} sources")
    print("Querying ingest failures...")
    failures = get_ingest_failures()
    print(f" Failures: {len(failures)} unresolved")
    print()

    # Classify every filesystem source by which stores know about it.
    both, pv_only, neither, gr_only = [], [], [], []
    for source, finfo in fs_sources.items():
        in_pv = source in pv_sources
        in_gr = source in gr_sources
        if in_pv and in_gr:
            both.append(finfo)
        elif in_pv:
            pv_only.append(finfo)
        elif in_gr:
            gr_only.append(finfo)
        else:
            neither.append(finfo)

    # Orphans: known to a store but no longer present on disk.
    known_on_disk = set(fs_sources)
    orphans_pv = pv_sources - known_on_disk
    orphans_gr = gr_sources - known_on_disk

    print("Results:")
    print(f" Both (pgvector + Graphiti): {len(both)}")
    print(f" pgvector only: {len(pv_only)}")
    print(f" Neither (corpus gap): {len(neither)}")
    print(f" Graphiti only: {len(gr_only)}")
    print(f" Ingest failures: {len(failures)}")
    print(f" pgvector orphans: {len(orphans_pv)}")
    print(f" Graphiti orphans: {len(orphans_gr)}")
    print()

    auto_queued = []
    if fix and neither:
        print(f"Auto-queuing {len(neither)} gap files...")
        for finfo in neither:
            text = extract_text_for_retry(finfo["filepath"])
            if text.strip():
                if queue_for_retry(finfo["source"], text, finfo["filepath"]):
                    auto_queued.append(finfo["source"])
                    print(f" Queued: {finfo['source']}")
            else:
                print(f" Skipped (unreadable): {finfo['source']}")
                _record_empty_text_failure(finfo)
        print()

    report = {
        "timestamp": datetime.now().isoformat(),
        "summary": {
            "filesystem_total": len(fs_files), "pgvector_total": len(pv_sources),
            "graphiti_total": len(gr_sources), "both": len(both),
            "pgvector_only": len(pv_only), "neither": len(neither),
            "graphiti_only": len(gr_only), "failures": len(failures),
            "orphans_pgvector": len(orphans_pv), "orphans_graphiti": len(orphans_gr),
        },
        "gaps": [f["source"] for f in neither],
        "failures": list(failures.values()),
        "auto_queued": auto_queued,
        "pgvector_only_sample": [f["source"] for f in pv_only[:20]],
        # NOTE(review): unlike the lists above, this holds the full file-info
        # dicts rather than source names; kept as-is for report compatibility.
        "graphiti_only": list(gr_only),
    }
    Path(REPORT_PATH).write_text(json.dumps(report, indent=2))
    print(f"Report written to: {REPORT_PATH}")
    return report
def main():
    """
    Command-line entry point.

    --fix   additionally queue gap files for retry.
    --json  print the report as JSON on stdout; the human-readable progress
            text is then sent to stderr so stdout contains only the JSON.
    """
    import contextlib

    parser = argparse.ArgumentParser()
    parser.add_argument("--fix", action="store_true",
                        help="auto-queue gap files for retry")
    parser.add_argument("--json", action="store_true",
                        help="print the report as JSON on stdout")
    args = parser.parse_args()

    if args.json:
        # run_reconciliation() prints progress with plain print(); divert it
        # so a consumer can parse stdout as a single JSON document.
        with contextlib.redirect_stdout(sys.stderr):
            report = run_reconciliation(fix=args.fix)
        print(json.dumps(report, indent=2))
    else:
        run_reconciliation(fix=args.fix)


if __name__ == "__main__":
    main()
-554
View File
@@ -1,554 +0,0 @@
"""
Aaron AI Dreamer — Active Inference Engine
Interdependent stage architecture grounded in sleep consolidation research.
Nightly pipeline: NREM → Early REM → Late REM → Synthesis
Each stage receives the previous stage's output as context.
Lucid mode is on-demand only (Dream Now from settings).
Research basis:
- Singh et al. PNAS 2022: alternating NREM/REM outperforms single-stage approaches
- Klinzing et al. Nature Neuroscience 2019: SO-spindle-ripple coupling is interdependent
- REM operates on what NREM produced — stages are not discrete alternatives
"""
import os
import json
import sqlite3
import argparse
from pathlib import Path
from datetime import datetime, timedelta
from dotenv import load_dotenv
import psycopg2
import hashlib
load_dotenv(Path.home() / "aaronai" / ".env")
PG_DSN = os.getenv("PG_DSN")
def get_pg():
    """Open and return a new psycopg2 connection using ``PG_DSN``."""
    connection = psycopg2.connect(PG_DSN)
    return connection
# ─── Paths ──────────────────────────────────────────────────────────────────
# Local state files kept under ~/aaronai.
CONVERSATIONS_DB = str(Path.home() / "aaronai" / "conversations.db")
WATCHER_STATE = str(Path.home() / "aaronai" / "watcher_state.json")
DREAMER_STATE = str(Path.home() / "aaronai" / "dreamer_state.json")
# NOTE(review): JOURNAL_DIR is not referenced in the visible part of this file — confirm it is still needed.
JOURNAL_DIR = "/home/aaron/nextcloud/data/data/aaron/files/Journal/Daily"
# Nextcloud settings; each falls back to the literal default below when the
# environment variable is unset (the password default is an empty string).
NEXTCLOUD_URL = os.getenv("NEXTCLOUD_URL", "https://nextcloud.aaronnelson.studio")
NEXTCLOUD_USER = os.getenv("NEXTCLOUD_USER", "aaron")
NEXTCLOUD_PASSWORD = os.getenv("NEXTCLOUD_PASSWORD", "")
# WebDAV collection that dream files and manifests are PUT into.
DREAMS_WEBDAV = f"{NEXTCLOUD_URL}/remote.php/dav/files/{NEXTCLOUD_USER}/Journal/Dreams"
# Similarity ranges calibrated for all-MiniLM-L6-v2
# Each value is an inclusive (low, high) similarity band; retrieve() keeps only
# rows whose similarity falls inside the band for the requested mode.
MODE_RANGES = {
"nrem": (0.48, 0.72),
"early-rem": (0.38, 0.55),
"late-rem": (0.22, 0.42),
"lucid": (0.32, 0.72),
}
# ─── Prompt versioning ──────────────────────────────────────────────────────
# Bump the relevant constant manually when changing a prompt.
# These four values are joined into the signature returned by prompt_signature().
PROMPT_VERSION_NREM = "1.0"
PROMPT_VERSION_EREM = "1.1"
PROMPT_VERSION_LREM = "1.2"
PROMPT_VERSION_SYN = "1.0"
def prompt_signature():
    """Return the per-stage prompt versions as one ``key=value|...`` string."""
    versions = (
        ("nrem", PROMPT_VERSION_NREM),
        ("erem", PROMPT_VERSION_EREM),
        ("lrem", PROMPT_VERSION_LREM),
        ("syn", PROMPT_VERSION_SYN),
    )
    return "|".join(f"{label}={version}" for label, version in versions)
def prompt_hash(prompts: list[str]) -> str:
    """Return the first 8 hex digits of the MD5 of the concatenated prompts.

    The digest is a short content fingerprint, not a security measure.
    """
    digest = hashlib.md5()
    for text in prompts:
        # Feeding the pieces one by one hashes the same bytes as joining first.
        digest.update(text.encode())
    return digest.hexdigest()[:8]
# ─── Stage 1: Observe ───────────────────────────────────────────────────────
def observe_corpus():
    """Summarise what changed since the last dream.

    Returns:
        dict with keys ``new_chunks`` (number of watcher-state entries whose
        value is greater than the last dream timestamp), ``days_since_dream``,
        ``recent_topics`` and ``last_dream``.
    """
    state = load_dreamer_state()
    last_dream = state.get("last_dream_timestamp", 0)
    new_chunk_count = 0
    try:
        watcher_state = json.loads(Path(WATCHER_STATE).read_text())
        # Only the values are compared; the keys are not needed.
        for mtime in watcher_state.values():
            if float(mtime) > last_dream:
                new_chunk_count += 1
    except (OSError, ValueError, TypeError, AttributeError) as e:
        # Best-effort: a missing or malformed watcher file must not stop the
        # run, but report it so a low count is explainable.
        print(f"Watcher state unreadable ({e}) — new chunk count may be incomplete")
    days_since = (datetime.now().timestamp() - last_dream) / 86400
    recent_topics = get_recent_conversation_topics()
    return {
        "new_chunks": new_chunk_count,
        "days_since_dream": days_since,
        "recent_topics": recent_topics,
        "last_dream": last_dream,
    }
def get_recent_conversation_topics(days=14):
    """Return up to 20 recent user messages, each truncated to 200 characters.

    Args:
        days: only conversations updated within this many days are considered.

    Returns:
        List of message prefixes, newest first; ``[]`` on any SQLite error.
    """
    cutoff = (datetime.now() - timedelta(days=days)).isoformat()
    conn = None
    try:
        conn = sqlite3.connect(CONVERSATIONS_DB)
        rows = conn.execute(
            """
            SELECT m.content FROM messages m
            JOIN conversations c ON m.conversation_id = c.id
            WHERE m.role = 'user' AND c.updated_at > ?
            ORDER BY m.timestamp DESC LIMIT 20
            """,
            (cutoff,),
        ).fetchall()
    except sqlite3.Error as e:
        print(f"Conversation topics unavailable ({e})")
        return []
    finally:
        # Close even when the query fails so the handle is not leaked.
        if conn is not None:
            conn.close()
    # Skip NULL content rather than letting one bad row discard every topic.
    return [content[:200] for (content,) in rows if content is not None]
# ─── Stage 2: Retrieve ──────────────────────────────────────────────────────
def retrieve(mode, task=None, n_results=8):
    """Fetch corpus chunks whose similarity to a query falls in the mode's band.

    Args:
        mode: key into ``MODE_RANGES``.
        task: optional query text; when falsy a mode-specific query is used.
        n_results: maximum number of chunks returned.

    Returns:
        List of dicts with ``source``, ``content`` and ``relevance``, at most
        one per source. Database errors are printed and whatever was collected
        so far is returned.
    """
    from sentence_transformers import SentenceTransformer

    embedder = SentenceTransformer("all-MiniLM-L6-v2")
    low, high = MODE_RANGES[mode]
    if task:
        query = task
    elif mode == "late-rem":
        delta = observe_corpus()
        topics = delta.get("recent_topics", [])
        query = topics[0] if topics else "practice place memory making"
    elif mode == "early-rem":
        query = "career decision personal change what matters next"
    else:
        query = "research fabrication teaching practice recent work"
    embedding = embedder.encode([query]).tolist()[0]
    chunks = []
    seen_sources = set()
    pg = None
    try:
        pg = get_pg()
        cur = pg.cursor()
        # NOTE(review): the band filter below runs after LIMIT, so only the
        # n_results * 3 nearest rows are ever considered — confirm intended.
        cur.execute("""
            SELECT document, source, 1 - (embedding <=> %s::vector) as similarity
            FROM embeddings
            ORDER BY embedding <=> %s::vector
            LIMIT %s
        """, (embedding, embedding, n_results * 3))
        for doc, source, similarity in cur.fetchall():
            if not (low <= similarity <= high):
                continue
            if source in seen_sources:
                continue
            chunks.append({
                "source": source or "unknown",
                "content": doc,
                "relevance": similarity,
            })
            seen_sources.add(source)
            if len(chunks) >= n_results:
                break
    except Exception as e:
        print(f"pgvector retrieval error: {e}")
    finally:
        # Close on every path; previously an error skipped the close.
        if pg is not None:
            pg.close()
    return chunks
# ─── Stage 3: Synthesize ────────────────────────────────────────────────────
def synthesize_nrem(chunks):
    # NREM stage: render each chunk as "[source]" + content, separate them with
    # "---" rules, and ask Claude for one document-grounded paragraph.
    # Deliberately no docstring: __doc__ of the synthesize_* functions is used
    # as a hash input elsewhere in this file.
    rendered = [f"[{item['source']}]\n{item['content']}" for item in chunks]
    chunk_text = "\n\n---\n\n".join(rendered)
    prompt = f"""You have read everything Aaron Nelson has written and published.
You are a careful colleague who noticed something this week.
Here is material from his corpus:
{chunk_text}
Write to Aaron directly. Identify one specific connection between
this material and something he wrote or worked on previously.
Stay close to the documents — cite them specifically by name.
Do not speculate beyond what the material supports. Do not use
headers or bullet points. Write one paragraph of 200-300 words
that ends with a single concrete question he could act on."""
    reply = _call_claude(prompt)
    return reply
def synthesize_early_rem(chunks, nrem_output):
    # Early REM stage: combines the NREM output with a second set of chunks.
    # v1.1 — removed citation instruction, removed close-friend persona,
    # shifted register from analysis to recognition.
    # Deliberately no docstring: __doc__ of the synthesize_* functions is used
    # as a hash input elsewhere in this file.
    rendered = [f"[{item['source']}]\n{item['content']}" for item in chunks]
    chunk_text = "\n\n---\n\n".join(rendered)
    prompt = f"""Something was noticed earlier tonight, moving through Aaron's recent work:
{nrem_output}
That observation is still with you. Now here is material from a different
time — pulled from further back, from different parts of his corpus:
{chunk_text}
You are not analyzing. You are recognizing.
Something in the earlier observation and something in this older material
are the same thing wearing different clothes. Find it. Don't explain why
they're connected — just let the connection speak. Write from inside the
recognition, not from above it.
The emotional register underneath the career logic is more interesting
than the career logic. The pattern that has been repeating longer than
he has been aware of it is more interesting than the current instance.
Write directly to Aaron. No citations, no references, no analysis.
First person, present tense. Let what you noticed arrive rather than
be delivered. 150-250 words. End with one thing that is true that
he probably already knows but hasn't said out loud yet."""
    reply = _call_claude(prompt)
    return reply
def synthesize_late_rem(chunks, nrem_output, early_rem_output):
    # Late REM stage: both earlier outputs plus a third set of chunks go into
    # one prompt.
    # Deliberately no docstring: __doc__ of the synthesize_* functions is used
    # as a hash input elsewhere in this file.
    rendered = [f"[{item['source']}]\n{item['content']}" for item in chunks]
    chunk_text = "\n\n---\n\n".join(rendered)
    prompt = f"""You have been moving through Aaron Nelson's corpus all night.
First you found this, in the careful light of early consolidation:
{nrem_output}
Then, in the more personal territory that followed:
{early_rem_output}
Now it is late. The boundaries between things have loosened.
Here is material pulled from opposite ends of his work:
{chunk_text}
Do not explain the connections between all of this.
Do not resolve them. Do not summarize what came before.
Something stranger is possible now — let the accumulated
material from the night find its own shape. Compressed,
associative, slightly off. Let the strangeness stand.
No headers. No bullet points. No hedging. No resolution.
No offer. End mid-thought if that is where the material ends.
150-250 words."""
    reply = _call_claude(prompt)
    return reply
def synthesize_final(nrem_output, early_rem_output, late_rem_output):
    # Synthesis stage: no retrieval; the three stage outputs are the only
    # material in the prompt. The reply is capped at 800 tokens.
    # Deliberately no docstring: __doc__ of the synthesize_* functions is used
    # as a hash input elsewhere in this file.
    prompt = f"""You have spent the night moving through Aaron Nelson's corpus
in three passes, each building on the last.
The first pass — careful, close to the documents:
{nrem_output}
The second pass — more personal, following what the first opened:
{early_rem_output}
The third pass — associative, strange, letting things touch that
don't normally touch:
{late_rem_output}
Now synthesize. Not a summary — a synthesis. Find what runs through
all three that none of them said directly. The thing that only becomes
visible when you hold all three passes together.
Write it as a single unbroken piece. No headers, no bullet points,
no stage labels. 200-300 words. End with the one question that
matters most right now."""
    reply = _call_claude(prompt, max_tokens=800)
    return reply
def synthesize_lucid(chunks, task):
    # Lucid mode: answer a specific question from the retrieved chunks. A falsy
    # task is replaced by the default question embedded in the prompt.
    rendered = [f"[{item['source']}]\n{item['content']}" for item in chunks]
    chunk_text = "\n\n---\n\n".join(rendered)
    prompt = f"""Aaron has a question he is sitting with:
{task or "What should I be thinking about that I am not?"}
You have searched his entire corpus and found material that
speaks to this question from unexpected directions. Here is
what you found:
{chunk_text}
Do not summarize. Do not list. Pick the most interesting
tension between what the corpus contains and what he is
asking, and follow it through to its conclusion. Cite
specific documents by name. Be direct about what you think.
No headers, no bullet points. 250-400 words.
End with an offer to work on it together."""
    reply = _call_claude(prompt)
    return reply
def _call_claude(prompt, max_tokens=1000):
    """Send ``prompt`` as a single user message and return the reply text.

    Args:
        prompt: full prompt text.
        max_tokens: passed through as the response token limit.

    Returns:
        The ``text`` of the first content block in the response.
    """
    import anthropic

    api_key = os.getenv("ANTHROPIC_API_KEY")
    client = anthropic.Anthropic(api_key=api_key)
    user_turn = {"role": "user", "content": prompt}
    response = client.messages.create(
        model="claude-sonnet-4-6",
        max_tokens=max_tokens,
        messages=[user_turn],
    )
    first_block = response.content[0]
    return first_block.text
# ─── Stage 4: Deliver ───────────────────────────────────────────────────────
def deliver(dream_text, mode, task=None):
    """Upload a dream as a Markdown file to the Dreams WebDAV collection.

    The file is named ``<date>-<mode>.md``; if that name is taken, a numeric
    suffix (``-1``, ``-2``, ...) is appended until a free name is found.

    Args:
        dream_text: body of the dream.
        mode: stage name, used in the header and the filename.
        task: optional task text added to the header.

    Returns:
        ``"Journal/Dreams/<filename>"`` for the uploaded file.

    Raises:
        requests.HTTPError: if the existence probe or the upload returns an
            error status.
    """
    import requests

    # Sample the clock once so header and filename cannot straddle midnight.
    now = datetime.now()
    date_str = now.strftime("%Y-%m-%d")
    filename = f"{date_str}-{mode}.md"
    header = f"# Dream — {mode.upper()} — {now.strftime('%Y-%m-%d %H:%M')}\n"
    header += f"*prompt_sig: {prompt_signature()}*\n\n"
    if task:
        header += f"*Task: {task}*\n\n"
    header += "---\n\n"
    content = header + dream_text
    auth = (NEXTCLOUD_USER, NEXTCLOUD_PASSWORD)
    # Best-effort collection creation; the response is intentionally ignored.
    requests.request("MKCOL", DREAMS_WEBDAV, auth=auth, timeout=10)
    url = f"{DREAMS_WEBDAV}/{filename}"
    counter = 1
    while True:
        check = requests.request(
            "PROPFIND", url, auth=auth, headers={"Depth": "0"}, timeout=10
        )
        if check.status_code == 404:
            break
        # Any other failure (bad credentials, server error) would never turn
        # into a 404, so raise instead of looping forever.
        check.raise_for_status()
        filename = f"{date_str}-{mode}-{counter}.md"
        url = f"{DREAMS_WEBDAV}/{filename}"
        counter += 1
    response = requests.put(url, data=content.encode("utf-8"), auth=auth, timeout=30)
    response.raise_for_status()
    print(f"Delivered: Journal/Dreams/{filename}")
    return f"Journal/Dreams/{filename}"
def notify_sse(mode, filename):
    """POST a "dream" event to the local notify endpoint.

    Any failure is printed and otherwise ignored.
    """
    endpoint = "http://localhost:8000/api/events/notify"
    try:
        import requests

        event = {
            "type": "dream",
            "mode": mode,
            "filename": filename,
            "timestamp": datetime.now().strftime("%Y-%m-%d %H:%M"),
        }
        requests.post(endpoint, json=event, timeout=3)
    except Exception as e:
        print(f"SSE notify failed (non-critical): {e}")
# ─── State ──────────────────────────────────────────────────────────────────
def load_dreamer_state():
    """Load the persisted dreamer state.

    Returns:
        The parsed JSON object, or ``{}`` when the file is missing, unreadable,
        not valid JSON, or not a JSON object.
    """
    try:
        state = json.loads(Path(DREAMER_STATE).read_text())
    except FileNotFoundError:
        # No state file yet; nothing to report.
        return {}
    except (OSError, ValueError) as e:
        print(f"Dreamer state unreadable ({e}) — starting from empty state")
        return {}
    # Callers use .get() and item assignment, so only a dict is usable.
    return state if isinstance(state, dict) else {}
def save_dreamer_state(state):
    """Persist ``state`` as indented JSON at ``DREAMER_STATE``.

    The JSON is written to a sibling ``.tmp`` file and then renamed over the
    target, so an interrupted run cannot leave a truncated state file.
    """
    target = Path(DREAMER_STATE)
    scratch = target.with_name(target.name + ".tmp")
    scratch.write_text(json.dumps(state, indent=2))
    scratch.replace(target)
# ─── Orchestrators ───────────────────────────────────────────────────────────
def write_manifest(date_str, stage_data, corpus_data):
    """Upload a JSON manifest describing one pipeline run.

    Args:
        date_str: date used in the manifest and in its filename.
        stage_data: per-stage statistics, stored under ``stages``.
        corpus_data: corpus statistics, stored under ``corpus``.

    Upload failures, including HTTP error statuses, are printed and otherwise
    ignored.
    """
    import inspect
    import requests

    def _prompt_text(fn):
        # The prompt lives in the function body, so hash its source. The
        # previous __doc__-based input was always empty because the
        # synthesize_* functions have no docstrings.
        try:
            return inspect.getsource(fn)
        except (OSError, TypeError):
            return fn.__doc__ or ""

    manifest = {
        "date": date_str,
        "prompt_sig": prompt_signature(),
        "prompt_hash": prompt_hash([
            _prompt_text(synthesize_nrem),
            _prompt_text(synthesize_early_rem),
            _prompt_text(synthesize_late_rem),
            _prompt_text(synthesize_final),
        ]),
        "stages": stage_data,
        "corpus": corpus_data,
        "rating": None,
        "notes": "",
    }
    content = json.dumps(manifest, indent=2)
    auth = (NEXTCLOUD_USER, NEXTCLOUD_PASSWORD)
    url = f"{DREAMS_WEBDAV}/dream-manifest-{date_str}.json"
    try:
        response = requests.put(url, data=content.encode("utf-8"), auth=auth, timeout=30)
        # Without this an HTTP error would still be reported as success.
        response.raise_for_status()
        print(f"Manifest written: Journal/Dreams/dream-manifest-{date_str}.json")
    except Exception as e:
        print(f"Manifest write failed (non-critical): {e}")
def dream_pipeline():
"""
Full nightly pipeline — interdependent stages.
NREM output feeds Early REM. Both feed Late REM. All three feed Synthesis.
Returns the delivered synthesis file path, or None when NREM retrieval
yields no chunks.
"""
print(f"Dreamer pipeline starting — {datetime.now().strftime('%Y-%m-%d %H:%M')}")
delta = observe_corpus()
print(f"Corpus: {delta['new_chunks']} new chunks, {delta['days_since_dream']:.1f} days since last dream")
# ── Stage 1: NREM ──────────────────────────────────────────────────────
print("\n[NREM] Retrieving...")
nrem_chunks = retrieve("nrem")
# NREM is the only stage whose empty retrieval aborts the whole run.
if not nrem_chunks:
print("[NREM] No suitable chunks — aborting pipeline")
return None
print(f"[NREM] Retrieved {len(nrem_chunks)} chunks. Synthesizing...")
nrem_output = synthesize_nrem(nrem_chunks)
# NOTE(review): nrem_file is assigned but not read again in this function.
nrem_file = deliver(nrem_output, "nrem")
stage_data = {
"nrem": {
"chunks_retrieved": len(nrem_chunks),
"avg_similarity": round(sum(c["relevance"] for c in nrem_chunks) / len(nrem_chunks), 3),
"query": "research fabrication teaching practice recent work",
"word_count": len(nrem_output.split()),
"status": "ok",
}
}
print(f"[NREM] Done.\n{nrem_output[:200]}...")
# ── Stage 2: Early REM — informed by NREM ──────────────────────────────
print("\n[Early REM] Retrieving...")
early_chunks = retrieve("early-rem")
if not early_chunks:
print("[Early REM] No suitable chunks — skipping")
# With no chunks the NREM text stands in for this stage's output.
early_rem_output = nrem_output # fallback
else:
print(f"[Early REM] Retrieved {len(early_chunks)} chunks. Synthesizing with NREM context...")
early_rem_output = synthesize_early_rem(early_chunks, nrem_output)
deliver(early_rem_output, "early-rem")
stage_data["early_rem"] = {
"chunks_retrieved": len(early_chunks),
"avg_similarity": round(sum(c["relevance"] for c in early_chunks) / len(early_chunks), 3),
"query": "career decision personal change what matters next",
"word_count": len(early_rem_output.split()),
"status": "ok",
}
print(f"[Early REM] Done.\n{early_rem_output[:200]}...")
# ── Stage 3: Late REM — informed by NREM + Early REM ──────────────────
print("\n[Late REM] Retrieving...")
late_chunks = retrieve("late-rem")
if not late_chunks:
print("[Late REM] No suitable chunks — skipping")
# With no chunks the Early REM text stands in for this stage's output.
late_rem_output = early_rem_output # fallback
else:
print(f"[Late REM] Retrieved {len(late_chunks)} chunks. Synthesizing with full context...")
late_rem_output = synthesize_late_rem(late_chunks, nrem_output, early_rem_output)
deliver(late_rem_output, "late-rem")
stage_data["late_rem"] = {
"chunks_retrieved": len(late_chunks),
"avg_similarity": round(sum(c["relevance"] for c in late_chunks) / len(late_chunks), 3),
# NOTE(review): retrieve("late-rem") uses the most recent conversation topic
# when one exists, so this recorded query can differ from the one actually run.
"query": "practice place memory making",
"word_count": len(late_rem_output.split()),
"status": "ok",
}
print(f"[Late REM] Done.\n{late_rem_output[:200]}...")
# ── Stage 4: Synthesis — all three stages ─────────────────────────────
print("\n[Synthesis] Integrating all stages...")
synthesis_output = synthesize_final(nrem_output, early_rem_output, late_rem_output)
synthesis_file = deliver(synthesis_output, "synthesis")
stage_data["synthesis"] = {
"word_count": len(synthesis_output.split()),
"status": "ok",
}
print(f"\n{'='*60}")
print("SYNTHESIS:")
print(synthesis_output)
print(f"{'='*60}")
# Write manifest
# NOTE(review): "total_chunks" is filled from the same new_chunks value as the
# next key, not from a corpus-wide count — confirm whether that is intended.
corpus_data = {
"total_chunks": delta.get("new_chunks", 0),
"new_chunks_since_last_dream": delta.get("new_chunks", 0),
"days_since_last_dream": round(delta.get("days_since_dream", 0), 2),
}
write_manifest(datetime.now().strftime("%Y-%m-%d"), stage_data, corpus_data)
# Update state and notify
state = load_dreamer_state()
state["last_dream_timestamp"] = datetime.now().timestamp()
state["last_dream_mode"] = "pipeline"
state["last_dream_file"] = synthesis_file
save_dreamer_state(state)
# Only the bare filename (last path segment) is sent with the notification.
notify_sse("synthesis", synthesis_file.split("/")[-1])
print(f"\nPipeline complete. Synthesis: {synthesis_file}")
return synthesis_file
def dream_lucid(task):
    """On-demand lucid dream — single mode, used by Dream Now in settings.

    Returns the delivered file path, or None when retrieval finds no chunks.
    """
    preview = task[:80] if task else 'none'
    print(f"Lucid dream starting — task: {preview}")
    found = retrieve("lucid", task=task)
    if not found:
        print("No suitable chunks — aborting")
        return None
    print(f"Retrieved {len(found)} chunks. Synthesizing...")
    text = synthesize_lucid(found, task)
    filepath = deliver(text, "lucid", task=task)

    # Record this run in the persisted state before notifying listeners.
    state = load_dreamer_state()
    state.update(
        last_dream_timestamp=datetime.now().timestamp(),
        last_dream_mode="lucid",
        last_dream_file=filepath,
    )
    save_dreamer_state(state)
    notify_sse("lucid", filepath.rsplit("/", 1)[-1])

    rule = "=" * 60
    print(f"\n{rule}")
    print(text)
    print(rule)
    print(f"\nDelivered to {filepath}")
    return filepath
def dream_single(mode, task=None):
    """
    Single mode — used by Dream Now for non-lucid modes.
    Runs one stage independently (for testing/tuning individual stages).
    The REM stages receive empty strings in place of earlier-stage output;
    any mode other than nrem / early-rem / late-rem uses the lucid synthesis.
    Returns the delivered file path, or None when retrieval finds no chunks.
    """
    print(f"Single mode dream: {mode}")
    found = retrieve(mode, task=task)
    if not found:
        print("No suitable chunks — aborting")
        return None
    print(f"Retrieved {len(found)} chunks. Synthesizing...")

    stage_calls = {
        "nrem": lambda: synthesize_nrem(found),
        "early-rem": lambda: synthesize_early_rem(found, ""),
        "late-rem": lambda: synthesize_late_rem(found, "", ""),
    }
    run_stage = stage_calls.get(mode, lambda: synthesize_lucid(found, task))
    text = run_stage()
    filepath = deliver(text, mode, task=task)

    # Record this run in the persisted state before notifying listeners.
    state = load_dreamer_state()
    state.update(
        last_dream_timestamp=datetime.now().timestamp(),
        last_dream_mode=mode,
        last_dream_file=filepath,
    )
    save_dreamer_state(state)
    notify_sse(mode, filepath.rsplit("/", 1)[-1])

    rule = "=" * 60
    print(f"\n{rule}")
    print(text)
    print(rule)
    print(f"\nDelivered to {filepath}")
    return filepath
# ─── CLI ────────────────────────────────────────────────────────────────────
if __name__ == "__main__":
    # --mode lucid runs the on-demand dream, any other single stage runs alone,
    # and no mode (or "pipeline") runs the full nightly pipeline.
    cli = argparse.ArgumentParser(description="Aaron AI Dreamer")
    cli.add_argument("--mode", choices=["nrem", "early-rem", "late-rem", "lucid", "pipeline"])
    cli.add_argument("--task", type=str)
    opts = cli.parse_args()
    if opts.mode == "lucid":
        dream_lucid(opts.task or "What should I be thinking about that I am not?")
    elif opts.mode in (None, "pipeline"):
        # Default: full pipeline
        dream_pipeline()
    else:
        dream_single(opts.mode, opts.task)
-668
View File
@@ -1,668 +0,0 @@
"""
Aaron AI Dreamer — Active Inference Engine
Interdependent stage architecture grounded in sleep consolidation research.
Nightly pipeline: NREM → Early REM → Late REM → Synthesis
Each stage receives the previous stage's output as context.
Lucid mode is on-demand only (Dream Now from settings).
Research basis:
- Singh et al. PNAS 2022: alternating NREM/REM outperforms single-stage approaches
- Klinzing et al. Nature Neuroscience 2019: SO-spindle-ripple coupling is interdependent
- REM operates on what NREM produced — stages are not discrete alternatives
"""
import os
import json
import sqlite3
import argparse
from pathlib import Path
from datetime import datetime, timedelta
from dotenv import load_dotenv
import psycopg2
import hashlib
load_dotenv(Path.home() / "aaronai" / ".env", override=True)
PG_DSN = os.getenv("PG_DSN")
def get_pg():
    """Open and return a new psycopg2 connection using ``PG_DSN``."""
    connection = psycopg2.connect(PG_DSN)
    return connection
# ─── Paths ──────────────────────────────────────────────────────────────────
# Local state files kept under ~/aaronai.
CONVERSATIONS_DB = str(Path.home() / "aaronai" / "conversations.db")
WATCHER_STATE = str(Path.home() / "aaronai" / "watcher_state.json")
DREAMER_STATE = str(Path.home() / "aaronai" / "dreamer_state.json")
# NOTE(review): JOURNAL_DIR is not referenced in the visible part of this file — confirm it is still needed.
JOURNAL_DIR = "/home/aaron/nextcloud/data/data/aaron/files/Journal/Daily"
# Nextcloud settings; each falls back to the literal default below when the
# environment variable is unset (the password default is an empty string).
NEXTCLOUD_URL = os.getenv("NEXTCLOUD_URL", "https://nextcloud.aaronnelson.studio")
NEXTCLOUD_USER = os.getenv("NEXTCLOUD_USER", "aaron")
NEXTCLOUD_PASSWORD = os.getenv("NEXTCLOUD_PASSWORD", "")
# WebDAV collection that dream files and manifests are PUT into.
DREAMS_WEBDAV = f"{NEXTCLOUD_URL}/remote.php/dav/files/{NEXTCLOUD_USER}/Journal/Dreams"
# Similarity ranges calibrated for all-MiniLM-L6-v2
# Each value is an inclusive (low, high) similarity band; retrieve() keeps only
# rows whose similarity falls inside the band for the requested mode.
MODE_RANGES = {
"nrem": (0.48, 0.72),
"early-rem": (0.38, 0.55),
"late-rem": (0.22, 0.42),
"lucid": (0.32, 0.72),
}
# Recorded in each manifest under "dreamer_version".
DREAMER_VERSION = "1.1" # 1.0=original exclusion logic; 1.1=score-band exclusion
# ─── Prompt versioning ──────────────────────────────────────────────────────
# Bump the relevant constant manually when changing a prompt.
# These four values are joined into the signature returned by prompt_signature().
PROMPT_VERSION_NREM = "1.0"
PROMPT_VERSION_EREM = "1.1"
PROMPT_VERSION_LREM = "1.2"
PROMPT_VERSION_SYN = "1.0"
def prompt_signature():
    """Return the per-stage prompt versions as one ``key=value|...`` string."""
    versions = (
        ("nrem", PROMPT_VERSION_NREM),
        ("erem", PROMPT_VERSION_EREM),
        ("lrem", PROMPT_VERSION_LREM),
        ("syn", PROMPT_VERSION_SYN),
    )
    return "|".join(f"{label}={version}" for label, version in versions)
def prompt_hash(prompts: list[str]) -> str:
    """Return the first 8 hex digits of the MD5 of the concatenated prompts.

    The digest is a short content fingerprint, not a security measure.
    """
    digest = hashlib.md5()
    for text in prompts:
        # Feeding the pieces one by one hashes the same bytes as joining first.
        digest.update(text.encode())
    return digest.hexdigest()[:8]
def extract_folder(source_path):
    """Extract top-level Nextcloud folder from source path.

    Backslashes are treated as path separators and leading separators are
    ignored.

    Args:
        source_path: source path string; may be empty or None.

    Returns:
        The first non-empty path segment, or ``"unknown"`` when there is none.
    """
    if not source_path:
        return "unknown"
    # str.split never returns an empty list, so test each segment rather than
    # the list itself; a leading "/" yields an empty first segment.
    for segment in source_path.replace("\\", "/").split("/"):
        if segment:
            return segment
    return "unknown"
# ─── Stage 1: Observe ───────────────────────────────────────────────────────
def observe_corpus():
    """Summarise what changed since the last dream.

    Returns:
        dict with keys ``new_chunks`` (number of watcher-state entries whose
        value is greater than the last dream timestamp), ``days_since_dream``,
        ``recent_topics`` and ``last_dream``.
    """
    state = load_dreamer_state()
    last_dream = state.get("last_dream_timestamp", 0)
    new_chunk_count = 0
    try:
        watcher_state = json.loads(Path(WATCHER_STATE).read_text())
        # Only the values are compared; the keys are not needed.
        for mtime in watcher_state.values():
            if float(mtime) > last_dream:
                new_chunk_count += 1
    except (OSError, ValueError, TypeError, AttributeError) as e:
        # Best-effort: a missing or malformed watcher file must not stop the
        # run, but report it so a low count is explainable.
        print(f"Watcher state unreadable ({e}) — new chunk count may be incomplete")
    days_since = (datetime.now().timestamp() - last_dream) / 86400
    recent_topics = get_recent_conversation_topics()
    return {
        "new_chunks": new_chunk_count,
        "days_since_dream": days_since,
        "recent_topics": recent_topics,
        "last_dream": last_dream,
    }
def get_recent_conversation_topics(days=14):
    """Return up to 20 recent user messages, each truncated to 200 characters.

    Args:
        days: only conversations updated within this many days are considered.

    Returns:
        List of message prefixes, newest first; ``[]`` on any SQLite error.
    """
    cutoff = (datetime.now() - timedelta(days=days)).isoformat()
    conn = None
    try:
        conn = sqlite3.connect(CONVERSATIONS_DB)
        rows = conn.execute(
            """
            SELECT m.content FROM messages m
            JOIN conversations c ON m.conversation_id = c.id
            WHERE m.role = 'user' AND c.updated_at > ?
            ORDER BY m.timestamp DESC LIMIT 20
            """,
            (cutoff,),
        ).fetchall()
    except sqlite3.Error as e:
        print(f"Conversation topics unavailable ({e})")
        return []
    finally:
        # Close even when the query fails so the handle is not leaked.
        if conn is not None:
            conn.close()
    # Skip NULL content rather than letting one bad row discard every topic.
    return [content[:200] for (content,) in rows if content is not None]
# ─── Stage 2: Retrieve ──────────────────────────────────────────────────────
def retrieve_graphiti(mode, task=None, n_results=8):
    """E3 experiment — Graphiti substrate retrieval.

    Queries Graphiti /search endpoint instead of pgvector.
    Returns chunks in same format as retrieve() for pipeline compatibility.
    Note: content is Graphiti facts (synthesized relationships), not raw chunks.

    Args:
        mode: stage name; selects the default query when ``task`` is falsy.
        task: optional query text.
        n_results: passed to the endpoint as ``limit``.

    Returns:
        List of dicts with ``source``, ``content``, ``relevance`` and
        ``similarity``; ``[]`` on any request or parsing error.
    """
    import requests as req_lib

    if task:
        query = task
    elif mode == "late-rem":
        delta = observe_corpus()
        topics = delta.get("recent_topics", [])
        query = topics[0] if topics else "practice place memory making"
    elif mode == "early-rem":
        query = "career decision personal change what matters next"
    else:
        query = "research fabrication teaching practice recent work"
    try:
        resp = req_lib.get(
            "http://localhost:8001/search",
            params={"query": query, "limit": n_results, "group_id": "aaron"},
            timeout=30,
        )
        resp.raise_for_status()
        results = resp.json().get("results", [])
        chunks = []
        for r in results:
            # A null "fact" must skip this one result, not abort the whole batch.
            fact = r.get("fact") or ""
            if not fact.strip():
                continue
            # A missing or null score falls back to 0.5 so later numeric
            # comparisons on relevance/similarity never see None.
            score = r.get("score")
            if score is None:
                score = 0.5
            chunks.append({
                "source": r.get("source") or "graphiti",
                "content": fact,
                "relevance": score,
                "similarity": score,
            })
        return chunks
    except Exception as e:
        print(f"[Graphiti retrieval error: {e}] — falling back to empty.")
        return []
def retrieve(mode, task=None, n_results=8, excluded_sources=None):
    """Fetch corpus chunks whose similarity to a query falls in the mode's band.

    Args:
        mode: key into ``MODE_RANGES``.
        task: optional query text; when falsy a mode-specific query is used.
        n_results: maximum number of chunks returned.
        excluded_sources: optional collection of source names to leave out of
            the pgvector query. Not forwarded on the Graphiti path.

    Returns:
        List of dicts with ``source``, ``content``, ``relevance`` and
        ``similarity``, at most one per source. Database errors are printed
        and whatever was collected so far is returned.
    """
    # E3 experiment: DREAMER_SUBSTRATE=graphiti routes retrieval to Graphiti /search
    # Default behavior: pgvector similarity search (unchanged)
    substrate = os.getenv("DREAMER_SUBSTRATE", "pgvector")
    if substrate == "graphiti":
        return retrieve_graphiti(mode, task=task, n_results=n_results)
    from sentence_transformers import SentenceTransformer

    embedder = SentenceTransformer("all-MiniLM-L6-v2")
    low, high = MODE_RANGES[mode]
    if task:
        query = task
    elif mode == "late-rem":
        delta = observe_corpus()
        topics = delta.get("recent_topics", [])
        query = topics[0] if topics else "practice place memory making"
    elif mode == "early-rem":
        query = "career decision personal change what matters next"
    else:
        query = "research fabrication teaching practice recent work"
    embedding = embedder.encode([query]).tolist()[0]
    chunks = []
    seen_sources = set()
    pg = None
    try:
        pg = get_pg()
        cur = pg.cursor()
        excluded_sources = excluded_sources or set()
        # NOTE(review): the band filter below runs after LIMIT, so only the
        # n_results * 3 nearest rows are ever considered — confirm intended.
        if excluded_sources:
            # NOTE(review): in SQL, NOT IN also drops rows whose source is NULL.
            cur.execute("""
                SELECT document, source, 1 - (embedding <=> %s::vector) as similarity
                FROM embeddings
                WHERE source NOT IN %s
                ORDER BY embedding <=> %s::vector
                LIMIT %s
            """, (embedding, tuple(excluded_sources), embedding, n_results * 3))
        else:
            cur.execute("""
                SELECT document, source, 1 - (embedding <=> %s::vector) as similarity
                FROM embeddings
                ORDER BY embedding <=> %s::vector
                LIMIT %s
            """, (embedding, embedding, n_results * 3))
        for doc, source, similarity in cur.fetchall():
            if not (low <= similarity <= high):
                continue
            if source in seen_sources:
                continue
            chunks.append({
                "source": source or "unknown",
                "content": doc,
                "relevance": similarity,
                "similarity": similarity,
            })
            seen_sources.add(source)
            if len(chunks) >= n_results:
                break
    except Exception as e:
        print(f"pgvector retrieval error: {e}")
    finally:
        # Close on every path; previously an error skipped the close.
        if pg is not None:
            pg.close()
    return chunks
# ─── Stage 3: Synthesize ────────────────────────────────────────────────────
def synthesize_nrem(chunks):
    # NREM stage: render each chunk as "[source]" + content, separate them with
    # "---" rules, and ask Claude for one document-grounded paragraph.
    # Deliberately no docstring: __doc__ of the synthesize_* functions is used
    # as a hash input elsewhere in this file.
    rendered = [f"[{item['source']}]\n{item['content']}" for item in chunks]
    chunk_text = "\n\n---\n\n".join(rendered)
    prompt = f"""You have read everything Aaron Nelson has written and published.
You are a careful colleague who noticed something this week.
Here is material from his corpus:
{chunk_text}
Write to Aaron directly. Identify one specific connection between
this material and something he wrote or worked on previously.
Stay close to the documents — cite them specifically by name.
Do not speculate beyond what the material supports. Do not use
headers or bullet points. Write one paragraph of 200-300 words
that ends with a single concrete question he could act on."""
    reply = _call_claude(prompt)
    return reply
def synthesize_early_rem(chunks, nrem_output):
    # Early REM stage: combines the NREM output with a second set of chunks.
    # v1.1 — removed citation instruction, removed close-friend persona,
    # shifted register from analysis to recognition.
    # Deliberately no docstring: __doc__ of the synthesize_* functions is used
    # as a hash input elsewhere in this file.
    rendered = [f"[{item['source']}]\n{item['content']}" for item in chunks]
    chunk_text = "\n\n---\n\n".join(rendered)
    prompt = f"""Something was noticed earlier tonight, moving through Aaron's recent work:
{nrem_output}
That observation is still with you. Now here is material from a different
time — pulled from further back, from different parts of his corpus:
{chunk_text}
You are not analyzing. You are recognizing.
Something in the earlier observation and something in this older material
are the same thing wearing different clothes. Find it. Don't explain why
they're connected — just let the connection speak. Write from inside the
recognition, not from above it.
The emotional register underneath the career logic is more interesting
than the career logic. The pattern that has been repeating longer than
he has been aware of it is more interesting than the current instance.
Write directly to Aaron. No citations, no references, no analysis.
First person, present tense. Let what you noticed arrive rather than
be delivered. 150-250 words. End with one thing that is true that
he probably already knows but hasn't said out loud yet."""
    reply = _call_claude(prompt)
    return reply
def synthesize_late_rem(chunks, nrem_output, early_rem_output):
    # Late REM stage: both earlier outputs plus a third set of chunks go into
    # one prompt.
    # Deliberately no docstring: __doc__ of the synthesize_* functions is used
    # as a hash input elsewhere in this file.
    rendered = [f"[{item['source']}]\n{item['content']}" for item in chunks]
    chunk_text = "\n\n---\n\n".join(rendered)
    prompt = f"""You have been moving through Aaron Nelson's corpus all night.
First you found this, in the careful light of early consolidation:
{nrem_output}
Then, in the more personal territory that followed:
{early_rem_output}
Now it is late. The boundaries between things have loosened.
Here is material pulled from opposite ends of his work:
{chunk_text}
Do not explain the connections between all of this.
Do not resolve them. Do not summarize what came before.
Something stranger is possible now — let the accumulated
material from the night find its own shape. Compressed,
associative, slightly off. Let the strangeness stand.
No headers. No bullet points. No hedging. No resolution.
No offer. End mid-thought if that is where the material ends.
150-250 words."""
    reply = _call_claude(prompt)
    return reply
def synthesize_final(nrem_output, early_rem_output, late_rem_output):
    # Synthesis stage: no retrieval; the three stage outputs are the only
    # material in the prompt. The reply is capped at 800 tokens.
    # Deliberately no docstring: __doc__ of the synthesize_* functions is used
    # as a hash input elsewhere in this file.
    prompt = f"""You have spent the night moving through Aaron Nelson's corpus
in three passes, each building on the last.
The first pass — careful, close to the documents:
{nrem_output}
The second pass — more personal, following what the first opened:
{early_rem_output}
The third pass — associative, strange, letting things touch that
don't normally touch:
{late_rem_output}
Now synthesize. Not a summary — a synthesis. Find what runs through
all three that none of them said directly. The thing that only becomes
visible when you hold all three passes together.
Write it as a single unbroken piece. No headers, no bullet points,
no stage labels. 200-300 words. End with the one question that
matters most right now."""
    reply = _call_claude(prompt, max_tokens=800)
    return reply
def synthesize_lucid(chunks, task):
    # Lucid mode: answer a specific question from the retrieved chunks. A falsy
    # task is replaced by the default question embedded in the prompt.
    rendered = [f"[{item['source']}]\n{item['content']}" for item in chunks]
    chunk_text = "\n\n---\n\n".join(rendered)
    prompt = f"""Aaron has a question he is sitting with:
{task or "What should I be thinking about that I am not?"}
You have searched his entire corpus and found material that
speaks to this question from unexpected directions. Here is
what you found:
{chunk_text}
Do not summarize. Do not list. Pick the most interesting
tension between what the corpus contains and what he is
asking, and follow it through to its conclusion. Cite
specific documents by name. Be direct about what you think.
No headers, no bullet points. 250-400 words.
End with an offer to work on it together."""
    reply = _call_claude(prompt)
    return reply
def _call_claude(prompt, max_tokens=1000):
    """Send ``prompt`` as a single user message and return the reply text.

    Args:
        prompt: full prompt text.
        max_tokens: passed through as the response token limit.

    Returns:
        The ``text`` of the first content block in the response.
    """
    import anthropic

    api_key = os.getenv("ANTHROPIC_API_KEY")
    client = anthropic.Anthropic(api_key=api_key)
    user_turn = {"role": "user", "content": prompt}
    response = client.messages.create(
        model="claude-sonnet-4-6",
        max_tokens=max_tokens,
        messages=[user_turn],
    )
    first_block = response.content[0]
    return first_block.text
# ─── Stage 4: Deliver ───────────────────────────────────────────────────────
def deliver(dream_text, mode, task=None):
    """Upload a dream as a Markdown file to the Dreams WebDAV collection.

    The file is named ``<date>-<mode>.md``; if that name is taken, a numeric
    suffix (``-1``, ``-2``, ...) is appended until a free name is found.

    Args:
        dream_text: body of the dream.
        mode: stage name, used in the header and the filename.
        task: optional task text added to the header.

    Returns:
        ``"Journal/Dreams/<filename>"`` for the uploaded file.

    Raises:
        requests.HTTPError: if the existence probe or the upload returns an
            error status.
    """
    import requests

    # Sample the clock once so header and filename cannot straddle midnight.
    now = datetime.now()
    date_str = now.strftime("%Y-%m-%d")
    filename = f"{date_str}-{mode}.md"
    header = f"# Dream — {mode.upper()} — {now.strftime('%Y-%m-%d %H:%M')}\n"
    header += f"*prompt_sig: {prompt_signature()}*\n\n"
    if task:
        header += f"*Task: {task}*\n\n"
    header += "---\n\n"
    content = header + dream_text
    auth = (NEXTCLOUD_USER, NEXTCLOUD_PASSWORD)
    # Best-effort collection creation; the response is intentionally ignored.
    requests.request("MKCOL", DREAMS_WEBDAV, auth=auth, timeout=10)
    url = f"{DREAMS_WEBDAV}/{filename}"
    counter = 1
    while True:
        check = requests.request(
            "PROPFIND", url, auth=auth, headers={"Depth": "0"}, timeout=10
        )
        if check.status_code == 404:
            break
        # Any other failure (bad credentials, server error) would never turn
        # into a 404, so raise instead of looping forever.
        check.raise_for_status()
        filename = f"{date_str}-{mode}-{counter}.md"
        url = f"{DREAMS_WEBDAV}/{filename}"
        counter += 1
    response = requests.put(url, data=content.encode("utf-8"), auth=auth, timeout=30)
    response.raise_for_status()
    print(f"Delivered: Journal/Dreams/{filename}")
    return f"Journal/Dreams/{filename}"
def notify_sse(mode, filename):
    """POST a "dream" event to the local notify endpoint.

    Any failure is printed and otherwise ignored.
    """
    endpoint = "http://localhost:8000/api/events/notify"
    try:
        import requests

        event = {
            "type": "dream",
            "mode": mode,
            "filename": filename,
            "timestamp": datetime.now().strftime("%Y-%m-%d %H:%M"),
        }
        requests.post(endpoint, json=event, timeout=3)
    except Exception as e:
        print(f"SSE notify failed (non-critical): {e}")
# ─── State ──────────────────────────────────────────────────────────────────
def load_dreamer_state():
    """Load the persisted dreamer state.

    Returns:
        The parsed JSON object, or ``{}`` when the file is missing, unreadable,
        not valid JSON, or not a JSON object.
    """
    try:
        state = json.loads(Path(DREAMER_STATE).read_text())
    except FileNotFoundError:
        # No state file yet; nothing to report.
        return {}
    except (OSError, ValueError) as e:
        print(f"Dreamer state unreadable ({e}) — starting from empty state")
        return {}
    # Callers use .get() and item assignment, so only a dict is usable.
    return state if isinstance(state, dict) else {}
def save_dreamer_state(state):
    """Persist ``state`` as indented JSON at ``DREAMER_STATE``.

    The JSON is written to a sibling ``.tmp`` file and then renamed over the
    target, so an interrupted run cannot leave a truncated state file.
    """
    target = Path(DREAMER_STATE)
    scratch = target.with_name(target.name + ".tmp")
    scratch.write_text(json.dumps(state, indent=2))
    scratch.replace(target)
# ─── Orchestrators ───────────────────────────────────────────────────────────
def write_manifest(date_str, stage_data, corpus_data):
    """Upload a JSON manifest describing one pipeline run.

    Args:
        date_str: date used in the manifest and in its filename.
        stage_data: per-stage statistics, stored under ``stages``.
        corpus_data: corpus statistics, stored under ``corpus``.

    Upload failures, including HTTP error statuses, are printed and otherwise
    ignored.
    """
    import inspect
    import requests

    def _prompt_text(fn):
        # The prompt lives in the function body, so hash its source. The
        # previous __doc__-based input was always empty because the
        # synthesize_* functions have no docstrings.
        try:
            return inspect.getsource(fn)
        except (OSError, TypeError):
            return fn.__doc__ or ""

    manifest = {
        "date": date_str,
        "prompt_sig": prompt_signature(),
        "dreamer_version": DREAMER_VERSION,
        "prompt_hash": prompt_hash([
            _prompt_text(synthesize_nrem),
            _prompt_text(synthesize_early_rem),
            _prompt_text(synthesize_late_rem),
            _prompt_text(synthesize_final),
        ]),
        "stages": stage_data,
        "corpus": corpus_data,
        "rating": None,
        "notes": "",
    }
    content = json.dumps(manifest, indent=2)
    auth = (NEXTCLOUD_USER, NEXTCLOUD_PASSWORD)
    url = f"{DREAMS_WEBDAV}/dream-manifest-{date_str}.json"
    try:
        response = requests.put(url, data=content.encode("utf-8"), auth=auth, timeout=30)
        # Without this an HTTP error would still be reported as success.
        response.raise_for_status()
        print(f"Manifest written: Journal/Dreams/dream-manifest-{date_str}.json")
    except Exception as e:
        print(f"Manifest write failed (non-critical): {e}")
def dream_pipeline():
    """
    Full nightly pipeline — interdependent stages.
    NREM output feeds Early REM. Both feed Late REM. All three feed Synthesis.

    Returns:
        The path returned by deliver() for the synthesis file, or None when
        NREM retrieval yields no chunks and the pipeline aborts.
    """
    print(f"Dreamer pipeline starting — {datetime.now().strftime('%Y-%m-%d %H:%M')}")
    state = load_dreamer_state()
    # Keep the persisted order (oldest first) so the cap at the end of the run
    # can drop the oldest sources; a set alone would lose that ordering.
    prior_sources = list(state.get("retrieved_sources", []))
    previously_retrieved = set(prior_sources)
    session_retrieved = set()
    session_order = []  # sources retrieved tonight, in retrieval order
    delta = observe_corpus()
    print(f"Corpus: {delta['new_chunks']} new chunks, {delta['days_since_dream']:.1f} days since last dream")
    print(f"Excluding {len(previously_retrieved)} previously retrieved sources")

    # ── Stage 1: NREM ──────────────────────────────────────────────────────
    print("\n[NREM] Retrieving...")
    nrem_chunks = retrieve("nrem", excluded_sources=previously_retrieved | session_retrieved)
    nrem_sources = [c["source"] for c in nrem_chunks]
    session_retrieved.update(nrem_sources)
    session_order.extend(nrem_sources)
    # Track sources that scored above Early REM ceiling — these are the only ones Early REM should exclude
    # NOTE(review): this filter reads c["similarity"] while the manifest
    # averages c["relevance"] — confirm retrieved chunks carry both keys.
    nrem_high_sources = {c["source"] for c in nrem_chunks if c["similarity"] > 0.55}
    if not nrem_chunks:
        print("[NREM] No suitable chunks — aborting pipeline")
        return None
    print(f"[NREM] Retrieved {len(nrem_chunks)} chunks. Synthesizing...")
    nrem_output = synthesize_nrem(nrem_chunks)
    nrem_file = deliver(nrem_output, "nrem")
    nrem_folders = list({extract_folder(s) for s in nrem_sources})
    stage_data = {
        "nrem": {
            "chunks_retrieved": len(nrem_chunks),
            "avg_similarity": round(sum(c["relevance"] for c in nrem_chunks) / len(nrem_chunks), 3),
            "query": "research fabrication teaching practice recent work",
            "word_count": len(nrem_output.split()),
            "sources": nrem_sources,
            "distinct_folders": nrem_folders,
            "folder_count": len(nrem_folders),
            "status": "ok",
        }
    }
    print(f"[NREM] Done.\n{nrem_output[:200]}...")

    # ── Stage 2: Early REM — informed by NREM ──────────────────────────────
    print("\n[Early REM] Retrieving...")
    # Early REM excludes previously retrieved + NREM high-scorers only (not full session_retrieved)
    # Sources that scored in Early REM band during NREM remain available
    early_chunks = retrieve("early-rem", excluded_sources=previously_retrieved | nrem_high_sources)
    early_sources = [c["source"] for c in early_chunks]
    session_retrieved.update(early_sources)
    session_order.extend(early_sources)
    if not early_chunks:
        print("[Early REM] No suitable chunks — skipping")
        early_rem_output = nrem_output  # fallback
    else:
        print(f"[Early REM] Retrieved {len(early_chunks)} chunks. Synthesizing with NREM context...")
        early_rem_output = synthesize_early_rem(early_chunks, nrem_output)
        deliver(early_rem_output, "early-rem")
        early_folders = list({extract_folder(s) for s in early_sources})
        # Recorded only when chunks exist: the average divides by len(early_chunks).
        stage_data["early_rem"] = {
            "chunks_retrieved": len(early_chunks),
            "avg_similarity": round(sum(c["relevance"] for c in early_chunks) / len(early_chunks), 3),
            "query": "career decision personal change what matters next",
            "word_count": len(early_rem_output.split()),
            "sources": early_sources,
            "distinct_folders": early_folders,
            "folder_count": len(early_folders),
            "status": "ok",
        }
        print(f"[Early REM] Done.\n{early_rem_output[:200]}...")

    # ── Stage 3: Late REM — informed by NREM + Early REM ──────────────────
    print("\n[Late REM] Retrieving...")
    late_chunks = retrieve("late-rem", excluded_sources=previously_retrieved | session_retrieved)
    late_sources = [c["source"] for c in late_chunks]
    session_retrieved.update(late_sources)
    session_order.extend(late_sources)
    if not late_chunks:
        print("[Late REM] No suitable chunks — skipping")
        late_rem_output = early_rem_output  # fallback
    else:
        print(f"[Late REM] Retrieved {len(late_chunks)} chunks. Synthesizing with full context...")
        late_rem_output = synthesize_late_rem(late_chunks, nrem_output, early_rem_output)
        deliver(late_rem_output, "late-rem")
        late_folders = [extract_folder(s) for s in late_sources]
        # Count unordered chunk pairs whose folders differ.
        cross_domain_pairs = sum(
            1 for i in range(len(late_folders))
            for j in range(i + 1, len(late_folders))
            if late_folders[i] != late_folders[j]
        )
        stage_data["late_rem"] = {
            "chunks_retrieved": len(late_chunks),
            "avg_similarity": round(sum(c["relevance"] for c in late_chunks) / len(late_chunks), 3),
            "query": "practice place memory making",
            "word_count": len(late_rem_output.split()),
            "sources": late_sources,
            "distinct_folders": list(set(late_folders)),
            "folder_count": len(set(late_folders)),
            "cross_domain_pairs": cross_domain_pairs,
            "status": "ok",
        }
        print(f"[Late REM] Done.\n{late_rem_output[:200]}...")

    # ── Stage 4: Synthesis — all three stages ─────────────────────────────
    print("\n[Synthesis] Integrating all stages...")
    synthesis_output = synthesize_final(nrem_output, early_rem_output, late_rem_output)
    synthesis_file = deliver(synthesis_output, "synthesis")
    stage_data["synthesis"] = {
        "word_count": len(synthesis_output.split()),
        "status": "ok",
    }
    print(f"\n{'='*60}")
    print("SYNTHESIS:")
    print(synthesis_output)
    print(f"{'='*60}")

    # Write manifest
    all_session_sources = list(session_retrieved)
    all_session_folders = list({extract_folder(s) for s in all_session_sources})
    corpus_data = {
        "total_chunks": delta.get("new_chunks", 0),
        "new_chunks_since_last_dream": delta.get("new_chunks", 0),
        "days_since_last_dream": round(delta.get("days_since_dream", 0), 2),
        "substrate": "pgvector",
        "aggregate": {
            "total_distinct_sources": len(all_session_sources),
            "total_distinct_folders": len(all_session_folders),
            "folders_touched": all_session_folders,
        }
    }
    write_manifest(datetime.now().strftime("%Y-%m-%d"), stage_data, corpus_data)

    # Update state and notify
    state = load_dreamer_state()
    state["last_dream_timestamp"] = datetime.now().timestamp()
    state["last_dream_mode"] = "pipeline"
    state["last_dream_file"] = synthesis_file
    # Accumulate retrieved sources across nights. Cap at 500, trim to 400 on overflow.
    # dict.fromkeys de-duplicates while preserving order (older nights first,
    # tonight last), so the slice below drops the oldest entries rather than
    # an arbitrary subset.
    all_retrieved = list(dict.fromkeys(prior_sources + session_order))
    if len(all_retrieved) > 500:
        all_retrieved = all_retrieved[-400:]
    state["retrieved_sources"] = all_retrieved
    save_dreamer_state(state)
    notify_sse("synthesis", synthesis_file.split("/")[-1])
    print(f"\nPipeline complete. Synthesis: {synthesis_file}")
    return synthesis_file
def dream_lucid(task):
    """On-demand lucid dream — single mode, used by Dream Now in settings.

    Args:
        task: Prompt steering retrieval and synthesis; may be empty/None.

    Returns:
        The path returned by deliver(), or None when retrieval finds nothing.
    """
    task_preview = task[:80] if task else 'none'
    print(f"Lucid dream starting — task: {task_preview}")

    retrieved = retrieve("lucid", task=task)
    if not retrieved:
        print("No suitable chunks — aborting")
        return None

    print(f"Retrieved {len(retrieved)} chunks. Synthesizing...")
    dream_text = synthesize_lucid(retrieved, task)
    delivered_path = deliver(dream_text, "lucid", task=task)

    # Record this run as the most recent dream before notifying listeners.
    state = load_dreamer_state()
    state["last_dream_timestamp"] = datetime.now().timestamp()
    state["last_dream_mode"] = "lucid"
    state["last_dream_file"] = delivered_path
    save_dreamer_state(state)
    notify_sse("lucid", delivered_path.split("/")[-1])

    banner = f"{'='*60}"
    print(f"\n{banner}")
    print(dream_text)
    print(banner)
    print(f"\nDelivered to {delivered_path}")
    return delivered_path
def dream_single(mode, task=None):
    """
    Single mode — used by Dream Now for non-lucid modes.
    Runs one stage independently (for testing/tuning individual stages).

    Args:
        mode: Stage to run; anything other than "nrem", "early-rem" or
            "late-rem" falls through to the lucid synthesizer.
        task: Optional prompt forwarded to retrieval, synthesis and delivery.

    Returns:
        The path returned by deliver(), or None when retrieval finds nothing.
    """
    print(f"Single mode dream: {mode}")
    retrieved = retrieve(mode, task=task)
    if not retrieved:
        print("No suitable chunks — aborting")
        return None
    print(f"Retrieved {len(retrieved)} chunks. Synthesizing...")

    # Later stages normally receive earlier stage output; when run in
    # isolation they get empty strings for that context instead.
    stage_runners = {
        "nrem": lambda: synthesize_nrem(retrieved),
        "early-rem": lambda: synthesize_early_rem(retrieved, ""),
        "late-rem": lambda: synthesize_late_rem(retrieved, "", ""),
    }
    run_stage = stage_runners.get(mode, lambda: synthesize_lucid(retrieved, task))
    dream_text = run_stage()

    delivered_path = deliver(dream_text, mode, task=task)
    state = load_dreamer_state()
    state["last_dream_timestamp"] = datetime.now().timestamp()
    state["last_dream_mode"] = mode
    state["last_dream_file"] = delivered_path
    save_dreamer_state(state)
    notify_sse(mode, delivered_path.split("/")[-1])

    banner = f"{'='*60}"
    print(f"\n{banner}")
    print(dream_text)
    print(banner)
    print(f"\nDelivered to {delivered_path}")
    return delivered_path
# ─── CLI ────────────────────────────────────────────────────────────────────
if __name__ == "__main__":
    # Command-line entry: --mode picks a single stage, lucid, or the pipeline.
    cli = argparse.ArgumentParser(description="Aaron AI Dreamer")
    cli.add_argument("--mode", choices=["nrem", "early-rem", "late-rem", "lucid", "pipeline"])
    cli.add_argument("--task", type=str)
    options = cli.parse_args()

    if options.mode == "lucid":
        dream_lucid(options.task or "What should I be thinking about that I am not?")
    elif not options.mode or options.mode == "pipeline":
        # Default: full pipeline
        dream_pipeline()
    else:
        dream_single(options.mode, options.task)
-171
View File
@@ -1,171 +0,0 @@
"""
Aaron AI — Graphiti Sidecar Service
Wraps graphiti-core in a FastAPI service to avoid asyncio event loop conflicts.
Port 8001 (internal only). No OpenAI dependency.
"""
import os, logging, sys
from contextlib import asynccontextmanager
from datetime import datetime
from pathlib import Path
from dotenv import load_dotenv
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel
# Load settings from ~/aaronai/.env before reading the variables below.
load_dotenv(Path.home() / "aaronai" / ".env")
logging.basicConfig(level=logging.INFO, format="%(asctime)s - %(name)s - %(levelname)s - %(message)s")
log = logging.getLogger("graphiti-sidecar")
# Graph partition used whenever a request does not carry its own group_id.
GROUP_ID = os.getenv("GRAPHITI_GROUP_ID", "aaron")
FALKORDB_HOST = os.getenv("FALKORDB_HOST", "localhost")
FALKORDB_PORT = int(os.getenv("FALKORDB_PORT", "6379"))
# Provider name selects the client class in get_llm_client().
LLM_PROVIDER = os.getenv("LLM_PROVIDER", "anthropic")
LLM_MODEL = os.getenv("LLM_MODEL", "claude-sonnet-4-6")
# LLM_API_KEY wins when set; ANTHROPIC_API_KEY is the fallback.
LLM_API_KEY = os.getenv("LLM_API_KEY") or os.getenv("ANTHROPIC_API_KEY")
# NOTE(review): presumably read by the embedder / graphiti-core to size
# vectors — confirm which component consumes EMBEDDING_DIM.
os.environ["EMBEDDING_DIM"] = "384"
def get_llm_client():
    """Build the graphiti-core LLM client selected by ``LLM_PROVIDER``.

    Client modules are imported lazily so only the chosen provider's module
    is loaded.

    Returns:
        A client instance configured with ``LLM_API_KEY`` and ``LLM_MODEL``.

    Raises:
        ValueError: If ``LLM_PROVIDER`` is not one of the supported names.
    """
    from graphiti_core.llm_client.config import LLMConfig

    llm_config = LLMConfig(api_key=LLM_API_KEY, model=LLM_MODEL)
    provider = LLM_PROVIDER

    if provider == "anthropic":
        from graphiti_core.llm_client.anthropic_client import AnthropicClient
        return AnthropicClient(llm_config)
    if provider == "openai":
        from graphiti_core.llm_client.openai_client import OpenAIClient
        return OpenAIClient(llm_config)
    if provider == "gemini":
        from graphiti_core.llm_client.gemini_client import GeminiClient
        return GeminiClient(llm_config)
    if provider == "groq":
        from graphiti_core.llm_client.groq_client import GroqClient
        return GroqClient(llm_config)

    raise ValueError(f"Unsupported LLM provider: {LLM_PROVIDER}")
# Shared Graphiti client; assigned by lifespan() during application startup.
graphiti_instance = None


async def get_graphiti():
    """Return the shared Graphiti client.

    Raises:
        HTTPException: 503 when the client has not been initialised yet.
    """
    client = graphiti_instance
    if client is not None:
        return client
    raise HTTPException(status_code=503, detail="Graphiti not initialized")
@asynccontextmanager
async def lifespan(app: FastAPI):
    """Create the shared Graphiti client on startup and close it on shutdown.

    Heavy dependencies are imported here rather than at module import time.
    The client is closed in a ``finally`` block so the connection is released
    even when index creation fails or an exception is thrown in at the yield.

    Args:
        app: The FastAPI application this lifespan is attached to.
    """
    global graphiti_instance
    # st_embedder lives in the scripts directory, outside the import path.
    sys.path.insert(0, str(Path.home() / "aaronai" / "scripts"))
    log.info("Loading embedding and reranker models...")
    from st_embedder import SentenceTransformerEmbedder
    from graphiti_core.cross_encoder.bge_reranker_client import BGERerankerClient
    from graphiti_core.driver.falkordb_driver import FalkorDriver
    from graphiti_core import Graphiti
    log.info(f"Connecting to FalkorDB at {FALKORDB_HOST}:{FALKORDB_PORT}...")
    graphiti_instance = Graphiti(
        llm_client=get_llm_client(),
        embedder=SentenceTransformerEmbedder(),
        cross_encoder=BGERerankerClient(),
        graph_driver=FalkorDriver(host=FALKORDB_HOST, port=FALKORDB_PORT),
    )
    try:
        await graphiti_instance.build_indices_and_constraints()
        log.info(f"Graphiti ready — provider: {LLM_PROVIDER}, group: {GROUP_ID}")
        yield
    finally:
        # Clear the global first so get_graphiti() reports 503 instead of
        # handing out a client that is closing or closed.
        closing_instance, graphiti_instance = graphiti_instance, None
        await closing_instance.close()


app = FastAPI(title="Aaron AI Graphiti Sidecar", lifespan=lifespan)
class BulkEpisodeItem(BaseModel):
    """One episode inside a bulk ingestion request."""
    # Episode name, forwarded as RawEpisode.name.
    name: str
    # Episode body text, forwarded as RawEpisode.content.
    content: str
    # Free-text description of where the episode came from.
    source_description: str = ""
    # ISO-format timestamp parsed with datetime.fromisoformat(); when omitted
    # the handler uses the current time.
    timestamp: str | None = None
class BulkEpisodeRequest(BaseModel):
    """Request body for POST /episodes/bulk."""
    # NOTE(review): no `saga` field is declared here, although the Stage 3
    # worker sends a "saga" key. Presumably pydantic's default config drops
    # unknown keys, so that value would never reach Graphiti — confirm.
    episodes: list[BulkEpisodeItem]
    # Graph partition; the handler falls back to GROUP_ID when omitted.
    group_id: str | None = None
class EpisodeRequest(BaseModel):
    """Request body for POST /episodes (single-episode ingestion)."""
    # Episode name passed to Graphiti.add_episode(name=...).
    name: str
    # Episode text passed as episode_body.
    content: str
    # Free-text description of where the episode came from.
    source_description: str = ""
    # ISO-format timestamp parsed with datetime.fromisoformat(); when omitted
    # the handler uses the current time.
    timestamp: str | None = None
    # Graph partition; the handler falls back to GROUP_ID when omitted.
    group_id: str | None = None
@app.get("/health")
async def health():
    """Liveness probe reporting the configured provider and default group."""
    status = {"ok": True, "provider": LLM_PROVIDER, "group": GROUP_ID}
    return status
@app.post("/episodes")
async def add_episode(req: EpisodeRequest):
    """Ingest a single text episode into Graphiti.

    Args:
        req: Episode payload; ``timestamp`` and ``group_id`` are optional.

    Returns:
        ``{"ok": True}`` on success.

    Raises:
        HTTPException: 503 if Graphiti is not initialised; 500 (with the
            error text as detail) if parsing or ingestion fails.
    """
    graphiti = await get_graphiti()
    from graphiti_core.nodes import EpisodeType

    try:
        if req.timestamp:
            reference_time = datetime.fromisoformat(req.timestamp)
        else:
            reference_time = datetime.now()
        target_group = req.group_id or GROUP_ID
        await graphiti.add_episode(
            name=req.name,
            episode_body=req.content,
            source=EpisodeType.text,
            reference_time=reference_time,
            source_description=req.source_description,
            group_id=target_group,
        )
    except Exception as e:
        log.error(f"Episode ingestion failed: {e}")
        raise HTTPException(status_code=500, detail=str(e))
    return {"ok": True}
@app.post("/episodes/bulk")
async def add_episodes_bulk(req: BulkEpisodeRequest):
    """Ingest several text episodes with one Graphiti bulk call.

    Args:
        req: Bulk payload; each episode's ``timestamp`` is optional and
            defaults to the current time.

    Returns:
        ``{"ok": True, "count": <number of episodes submitted>}``.

    Raises:
        HTTPException: 503 if Graphiti is not initialised; 500 (with the
            error text as detail) if timestamp parsing or ingestion fails.
    """
    g = await get_graphiti()
    from graphiti_core.nodes import EpisodeType
    from graphiti_core.utils.bulk_utils import RawEpisode
    try:
        # Timestamp parsing happens inside the try so a malformed value is
        # logged and reported the same way add_episode() reports it.
        raw_episodes = [
            RawEpisode(
                name=ep.name,
                content=ep.content,
                source_description=ep.source_description,
                source=EpisodeType.text,
                reference_time=(
                    datetime.fromisoformat(ep.timestamp) if ep.timestamp else datetime.now()
                ),
            )
            for ep in req.episodes
        ]
        await g.add_episode_bulk(
            bulk_episodes=raw_episodes,
            group_id=req.group_id or GROUP_ID,
        )
        return {"ok": True, "count": len(raw_episodes)}
    except Exception as e:
        log.error(f"Bulk ingestion failed: {e}")
        raise HTTPException(status_code=500, detail=str(e))
@app.get("/search")
async def search(query: str, limit: int = 8, group_id: str | None = None):
    """Search the graph and return matching facts as plain dicts.

    Args:
        query: Search text.
        limit: Maximum number of results requested from Graphiti.
        group_id: Graph partition; defaults to ``GROUP_ID``.

    Returns:
        ``{"results": [...]}`` with one dict per hit. Attributes missing on a
        hit fall back to "" (source, validity dates) or 0 (score).

    Raises:
        HTTPException: 503 if Graphiti is not initialised; 500 on failure.
    """
    graphiti = await get_graphiti()
    try:
        hits = await graphiti.search(
            query=query,
            num_results=limit,
            group_ids=[group_id or GROUP_ID],
        )
        formatted = []
        for hit in hits:
            formatted.append({
                "fact": hit.fact,
                "source": getattr(hit, "source_node_uuid", ""),
                "score": getattr(hit, "score", 0),
                "valid_at": str(getattr(hit, "valid_at", "")),
                "invalid_at": str(getattr(hit, "invalid_at", "")),
            })
        return {"results": formatted}
    except Exception as e:
        log.error(f"Search failed: {e}")
        raise HTTPException(status_code=500, detail=str(e))
# Direct execution: serve the app on the loopback interface only, port 8001.
if __name__ == "__main__":
    import uvicorn
    uvicorn.run(app, host="127.0.0.1", port=8001, log_level="info")
-182
View File
@@ -1,182 +0,0 @@
import os
import sys
import hashlib
from pathlib import Path
from dotenv import load_dotenv
import psycopg2
import psycopg2.extras
import json
from sentence_transformers import SentenceTransformer
from docx import Document
from pypdf import PdfReader
from pptx import Presentation
# Values from ~/aaronai/.env override variables already set in the environment.
load_dotenv(Path.home() / "aaronai" / ".env", override=True)
# The embedding model is loaded at import time, so every run of this script
# pays the load cost once up front.
print("Loading embedding model...")
embedder = SentenceTransformer("all-MiniLM-L6-v2")
# PostgreSQL connection string consumed by get_pg().
PG_DSN = os.getenv("PG_DSN")
def get_pg():
    """Open and return a new PostgreSQL connection built from ``PG_DSN``."""
    connection = psycopg2.connect(PG_DSN)
    return connection
def extract_text_from_docx(path):
    """Return the non-blank paragraphs of a .docx file joined by newlines."""
    paragraphs = Document(path).paragraphs
    non_blank = (p.text for p in paragraphs if p.text.strip())
    return "\n".join(non_blank)
def extract_text_from_pdf(path):
    """Return the text of every PDF page that yields any, newline-terminated."""
    page_texts = []
    for page in PdfReader(path).pages:
        page_text = page.extract_text()
        if page_text:
            page_texts.append(page_text + "\n")
    return "".join(page_texts)
def extract_text_from_pptx(path):
    """Return the text of every non-blank text shape, one per line."""
    deck = Presentation(path)
    shape_texts = [
        shape.text + "\n"
        for slide in deck.slides
        for shape in slide.shapes
        # Not every shape type carries text (e.g. pictures).
        if hasattr(shape, "text") and shape.text.strip()
    ]
    return "".join(shape_texts)
def extract_text_from_txt(path):
    """Read a plain-text file as UTF-8, silently dropping undecodable bytes."""
    return Path(path).read_text(encoding="utf-8", errors="ignore")
def chunk_text(text, chunk_size=500, overlap=50):
    """Split text into overlapping word-based chunks.

    Args:
        text: Input text; split on whitespace.
        chunk_size: Maximum number of words per chunk.
        overlap: Number of words each chunk shares with the previous one.

    Returns:
        A list of chunk strings (words joined by single spaces). Empty or
        whitespace-only text yields an empty list.

    Raises:
        ValueError: If ``overlap >= chunk_size``; the window would never
            advance (previously an infinite loop).
    """
    step = chunk_size - overlap
    if step <= 0:
        raise ValueError(
            f"overlap ({overlap}) must be smaller than chunk_size ({chunk_size})"
        )
    words = text.split()
    windows = (
        " ".join(words[start:start + chunk_size])
        for start in range(0, len(words), step)
    )
    return [chunk for chunk in windows if chunk.strip()]
def make_id(filepath, chunk_index):
    """Build a chunk id: first 8 hex chars of the path's MD5, then the index.

    Args:
        filepath: File path (any object; its ``str()`` form is hashed).
        chunk_index: Position of the chunk within the file.

    Returns:
        A string of the form ``"<8 hex chars>_<chunk_index>"``.
    """
    digest = hashlib.md5(str(filepath).encode()).hexdigest()
    return "_".join((digest[:8], str(chunk_index)))
def enqueue_stage2(source, full_text):
    """Enqueue document for Stage 2 (Mistral orientation) → Stage 3 (Graphiti ingest).
    TEMPORARY: this queue feed will be removed when pgvector is decommissioned
    and the watcher calls Stage 2 directly.

    Re-enqueueing an existing source resets its timestamps and attempt count.
    Failures are printed and swallowed (non-fatal for ingestion).

    Args:
        source: Document identifier (unique key of the queue row).
        full_text: Full extracted text. Only the first 50,000 characters are
            stored, while ``char_length`` records the untruncated length.
    """
    try:
        pg = get_pg()
        try:
            cur = pg.cursor()
            cur.execute("""
                INSERT INTO stage_2_queue (source, full_text, char_length)
                VALUES (%s, %s, %s)
                ON CONFLICT (source) DO UPDATE SET
                full_text = EXCLUDED.full_text,
                char_length = EXCLUDED.char_length,
                enqueued_at = NOW(),
                completed_at = NULL,
                failed_at = NULL,
                attempts = 0
            """, (source, full_text[:50000], len(full_text)))
            pg.commit()
        finally:
            # Close even when execute/commit raises so the connection is
            # not leaked on the failure path.
            pg.close()
    except Exception as e:
        print(f" Stage 2 queue insert failed (non-fatal): {e}")
def ingest_file(filepath):
    """Extract, chunk, embed and store one file, then enqueue it for Stage 2.

    Temporary-lock files (``~$…``), dotfiles, unsupported extensions and
    files without text are skipped. Any error is printed and reported as
    zero chunks so one bad file does not stop a folder run.

    Args:
        filepath: Path of the file to ingest.

    Returns:
        Number of chunks written, or 0 when the file was skipped or failed.
    """
    path = Path(filepath)
    suffix = path.suffix.lower()
    if path.name.startswith("~$") or path.name.startswith("."):
        return 0
    try:
        if suffix == ".docx":
            text = extract_text_from_docx(path)
        elif suffix == ".pdf":
            text = extract_text_from_pdf(path)
        elif suffix == ".pptx":
            text = extract_text_from_pptx(path)
        elif suffix in [".txt", ".md"]:
            text = extract_text_from_txt(path)
        else:
            return 0
        if not text.strip():
            return 0
        chunks = chunk_text(text)
        if not chunks:
            return 0
        embeddings = embedder.encode(chunks).tolist()
        # Metadata is identical for every chunk of a file, so build and
        # serialise it once. The folder is relative to the ingest root given
        # on the command line (empty-relative when no root was given).
        root = Path(sys.argv[1]) if len(sys.argv) > 1 else path.parent
        metadata_json = json.dumps({
            "source": path.name,
            "filepath": str(path),
            "folder": str(path.parent.relative_to(root)),
        })
        # STAGE 1: Write to pgvector (TEMPORARY — remove when chat agent migrates to Graphiti)
        pg = get_pg()
        try:
            cur = pg.cursor()
            for index, (chunk, embedding) in enumerate(zip(chunks, embeddings)):
                cur.execute("""
                    INSERT INTO embeddings (id, document, embedding, source, type, created_at, metadata)
                    VALUES (%s, %s, %s::vector, %s, %s, %s, %s)
                    ON CONFLICT (id) DO UPDATE SET
                    document = EXCLUDED.document,
                    embedding = EXCLUDED.embedding,
                    source = EXCLUDED.source,
                    metadata = EXCLUDED.metadata
                """, (
                    make_id(path, index), chunk, embedding,
                    path.name, "document", None,
                    metadata_json
                ))
            pg.commit()
        finally:
            # Release the connection even when an insert or the commit fails.
            pg.close()
        print(f" Indexed {len(chunks)} chunks: {path.name}")
        # Enqueue for Stage 2 → Stage 3 (Graphiti pipeline)
        # SKIP_STAGE2_ENQUEUE env var set by migration scripts to prevent bulk enqueue
        if not os.getenv("SKIP_STAGE2_ENQUEUE"):
            enqueue_stage2(path.name, text)
        return len(chunks)
    except Exception as e:
        print(f" Error: {path.name}: {e}")
        return 0
def ingest_folder(folder_path):
    """Ingest every supported file found recursively under ``folder_path``.

    Exits the process with status 1 when the folder does not exist or holds
    no supported files.

    Args:
        folder_path: Root directory to scan.
    """
    root = Path(folder_path)
    if not root.exists():
        print(f"Folder not found: {folder_path}")
        sys.exit(1)

    supported = [".docx", ".pdf", ".pptx", ".txt", ".md"]
    candidates = []
    for entry in root.rglob("*"):
        if entry.suffix.lower() not in supported:
            continue
        # Skip Office lock files and hidden files.
        if entry.name.startswith("~$") or entry.name.startswith("."):
            continue
        candidates.append(entry)

    if not candidates:
        print("No supported files found.")
        sys.exit(1)

    print(f"Found {len(candidates)} files to process\n")
    total_chunks = sum(ingest_file(entry) for entry in candidates)
    print(f"\nDone. Total chunks indexed: {total_chunks}")
if __name__ == "__main__":
    # First CLI argument is the folder to ingest; default is ~/aaronai/docs.
    if len(sys.argv) > 1:
        target = sys.argv[1]
    else:
        target = str(Path.home() / "aaronai" / "docs")
    print(f"Ingesting from: {target}\n")
    ingest_folder(target)
+111 -20
View File
@@ -9,10 +9,19 @@ write lock contention during entity deduplication. Chunking at ~500 words
Each document's chunks are linked via Graphiti's saga mechanism, preserving Each document's chunks are linked via Graphiti's saga mechanism, preserving
document structure in the graph. document structure in the graph.
Saga-size limit (MAX_CHUNKS_PER_SAGA): 2026-05-01 incident showed sagas of
17 and 19 chunks deadlock the sidecar's Python-side coordination. Documents
producing more than MAX_CHUNKS_PER_SAGA chunks are split into multiple bulk
commits, each tagged with the same saga value so Graphiti still links them.
Wedge detection: 2026-05-01 incident also surfaced the asymmetry with Stage 2 —
Stage 3 had no recovery path when the sidecar deadlocked. Now mirrors Stage 2's
consecutive_failures pattern with sidecar restart on threshold.
Runs as systemd service: aaronai-stage3.service Runs as systemd service: aaronai-stage3.service
""" """
import os, json, time, logging, requests import os, json, time, logging, subprocess, requests
from pathlib import Path from pathlib import Path
from datetime import datetime from datetime import datetime
from dotenv import load_dotenv from dotenv import load_dotenv
@@ -35,13 +44,16 @@ HEARTBEAT_FILE = Path("/var/log/aaronai/stage3-heartbeat")
RETRY_ATTEMPTS = 2 RETRY_ATTEMPTS = 2
POLL_INTERVAL = 5 POLL_INTERVAL = 5
INGEST_TIMEOUT = 600 INGEST_TIMEOUT = 600
WORKER_VERSION = "2.0" WORKER_VERSION = "2.1"
# Match Stage 1 chunking parameters # Match Stage 1 chunking parameters
CHUNK_SIZE_WORDS = 500 CHUNK_SIZE_WORDS = 500
CHUNK_OVERLAP_WORDS = 50 CHUNK_OVERLAP_WORDS = 50
# Documents under this threshold ingested as single episode (no chunking overhead) # Documents under this threshold ingested as single episode (no chunking overhead)
SINGLE_EPISODE_THRESHOLD = 1500 SINGLE_EPISODE_THRESHOLD = 1500
# Sagas larger than this many chunks split into multiple commits
# (2026-05-01 incident: 17 and 19 chunk sagas deadlocked sidecar)
MAX_CHUNKS_PER_SAGA = 10
def get_pg(): def get_pg():
@@ -56,6 +68,30 @@ def write_heartbeat():
pass pass
def recover_wedge():
    """Restart Graphiti sidecar when consecutive failures suggest deadlock.
    Mirrors Stage 2's recover_wedge() for ollama. Requires passwordless sudo
    for `systemctl restart aaronai-graphiti.service` for the worker's user.

    Returns:
        True once the sidecar's /health endpoint answers 200, False when it
        does not within three attempts.
    """
    log.warning("Graphiti wedge detected — restarting sidecar")
    restart = subprocess.run(
        ["sudo", "systemctl", "restart", "aaronai-graphiti.service"],
        capture_output=True,
        text=True,
    )
    if restart.returncode != 0:
        # Surface the failure (e.g. a missing sudoers entry) instead of
        # silently waiting on a service that was never restarted.
        log.error(
            f"Sidecar restart command failed (exit {restart.returncode}): "
            f"{restart.stderr.strip()[:500]}"
        )
    # Sidecar needs longer than ollama for model loading (sentence-transformers
    # + BGE reranker + Graphiti library init)
    time.sleep(45)
    attempts = 3
    for attempt in range(1, attempts + 1):
        try:
            r = requests.get(f"{GRAPHITI_URL}/health", timeout=10)
            if r.status_code == 200:
                log.info("Graphiti recovered")
                return True
            log.warning(f"Health check {attempt}/{attempts} returned {r.status_code}")
        except requests.RequestException as e:
            log.warning(f"Health check {attempt}/{attempts} failed: {e}")
        # Back off before the next probe, whether the failure was an
        # exception or a non-200 reply.
        if attempt < attempts:
            time.sleep(10)
    log.error("Graphiti recovery failed")
    return False
def chunk_text(text, chunk_size=CHUNK_SIZE_WORDS, overlap=CHUNK_OVERLAP_WORDS): def chunk_text(text, chunk_size=CHUNK_SIZE_WORDS, overlap=CHUNK_OVERLAP_WORDS):
"""Split text into word-based chunks matching Stage 1 chunking.""" """Split text into word-based chunks matching Stage 1 chunking."""
words = text.split() words = text.split()
@@ -70,18 +106,33 @@ def chunk_text(text, chunk_size=CHUNK_SIZE_WORDS, overlap=CHUNK_OVERLAP_WORDS):
return chunks return chunks
def post_bulk(payload, batch_label=""):
    """Single POST to /episodes/bulk with consistent error handling.

    Args:
        payload: JSON-serialisable request body.
        batch_label: Optional label prefixed to the error message.

    Returns:
        The decoded JSON response body.

    Raises:
        RuntimeError: If the sidecar answers with a non-success status; the
            message carries the status code and up to 500 chars of the body.
    """
    endpoint = f"{GRAPHITI_URL}/episodes/bulk"
    resp = requests.post(endpoint, json=payload, timeout=INGEST_TIMEOUT)
    if resp.ok:
        return resp.json()
    prefix = f"{batch_label} " if batch_label else ""
    raise RuntimeError(f"{prefix}Sidecar {resp.status_code}: {resp.text[:500]}")
def ingest_to_graphiti(source, full_text, orientation): def ingest_to_graphiti(source, full_text, orientation):
""" """
Ingest document to Graphiti as chunked episodes linked by saga. Ingest document to Graphiti as chunked episodes linked by saga.
Short documents (<1500 chars) are ingested as a single episode. Three paths:
Long documents are chunked at 500 words (matching Stage 1) and - Short documents (<SINGLE_EPISODE_THRESHOLD): single episode, no saga
ingested as a bulk batch with saga=source linking them together. - Medium documents (chunks <= MAX_CHUNKS_PER_SAGA): one bulk commit, saga-linked
- Large documents (chunks > MAX_CHUNKS_PER_SAGA): split into batches of
MAX_CHUNKS_PER_SAGA, each its own bulk commit, all sharing the same saga tag
so Graphiti links them as one document unit
""" """
char_length = len(full_text) char_length = len(full_text)
if char_length < SINGLE_EPISODE_THRESHOLD: if char_length < SINGLE_EPISODE_THRESHOLD:
# Single episode — short enough that deduplication won't block
episodes = [{ episodes = [{
"name": source, "name": source,
"content": full_text, "content": full_text,
@@ -89,27 +140,54 @@ def ingest_to_graphiti(source, full_text, orientation):
"timestamp": datetime.now().isoformat(), "timestamp": datetime.now().isoformat(),
}] }]
log.info(f" Single episode ({char_length} chars)") log.info(f" Single episode ({char_length} chars)")
payload = {"episodes": episodes, "group_id": "aaron"} return post_bulk({"episodes": episodes, "group_id": "aaron"})
else:
# Chunk document — each chunk becomes a separate episode chunks = chunk_text(full_text)
chunks = chunk_text(full_text) total_chunks = len(chunks)
if total_chunks <= MAX_CHUNKS_PER_SAGA:
episodes = [ episodes = [
{ {
"name": f"{source} [{i+1}/{len(chunks)}]", "name": f"{source} [{i+1}/{total_chunks}]",
"content": chunk, "content": chunk,
"source_description": orientation, "source_description": orientation,
"timestamp": datetime.now().isoformat(), "timestamp": datetime.now().isoformat(),
} }
for i, chunk in enumerate(chunks) for i, chunk in enumerate(chunks)
] ]
log.info(f" Chunked into {len(chunks)} episodes ({char_length} chars)") log.info(f" Chunked into {total_chunks} episodes ({char_length} chars)")
# saga=source links all chunks into a document unit in the graph return post_bulk(
payload = {"episodes": episodes, "group_id": "aaron", "saga": source} {"episodes": episodes, "group_id": "aaron", "saga": source}
)
resp = requests.post(f"{GRAPHITI_URL}/episodes/bulk", json=payload, timeout=INGEST_TIMEOUT) # Large document: split into batches sharing the same saga tag
if not resp.ok: batch_count = (total_chunks + MAX_CHUNKS_PER_SAGA - 1) // MAX_CHUNKS_PER_SAGA
raise RuntimeError(f"Sidecar {resp.status_code}: {resp.text[:500]}") log.info(
return resp.json() f" Chunked into {total_chunks} episodes ({char_length} chars); "
f"splitting into {batch_count} batches of up to {MAX_CHUNKS_PER_SAGA}"
)
last_result = None
for batch_idx in range(batch_count):
start = batch_idx * MAX_CHUNKS_PER_SAGA
end = min(start + MAX_CHUNKS_PER_SAGA, total_chunks)
batch_chunks = chunks[start:end]
episodes = [
{
"name": f"{source} [{start + i + 1}/{total_chunks}]",
"content": chunk,
"source_description": orientation,
"timestamp": datetime.now().isoformat(),
}
for i, chunk in enumerate(batch_chunks)
]
batch_label = f"batch {batch_idx + 1}/{batch_count} (chunks {start + 1}-{end})"
log.info(f" {batch_label} starting")
last_result = post_bulk(
{"episodes": episodes, "group_id": "aaron", "saga": source},
batch_label=batch_label,
)
log.info(f" {batch_label} committed")
return last_result
def process_one(row): def process_one(row):
@@ -145,6 +223,7 @@ def process_one(row):
def run(): def run():
log.info(f"Stage 3 worker starting (v{WORKER_VERSION})") log.info(f"Stage 3 worker starting (v{WORKER_VERSION})")
consecutive_failures = 0
while True: while True:
write_heartbeat() write_heartbeat()
@@ -166,11 +245,23 @@ def run():
pg.close() pg.close()
if not row: if not row:
consecutive_failures = 0
time.sleep(POLL_INTERVAL) time.sleep(POLL_INTERVAL)
continue continue
process_one(row) success = process_one(row)
time.sleep(2)
if not success:
consecutive_failures += 1
if consecutive_failures >= 2:
log.warning("Multiple consecutive failures — checking for Graphiti wedge")
recovered = recover_wedge()
if recovered:
consecutive_failures = 0
time.sleep(10)
else:
consecutive_failures = 0
time.sleep(2)
except Exception as e: except Exception as e:
log.error(f"Worker loop error: {e}") log.error(f"Worker loop error: {e}")
-210
View File
@@ -1,210 +0,0 @@
import time
import subprocess
import logging
import json
import threading
from pathlib import Path
from watchdog.observers import Observer
from watchdog.events import FileSystemEventHandler
# Directory tree that is watched and passed to the ingest script.
NEXTCLOUD_PATH = "/home/aaron/nextcloud/data/data/aaron/files"
INGEST_SCRIPT = "/home/aaron/aaronai/scripts/ingest.py"
PYTHON = "/home/aaron/aaronai/venv/bin/python3"
LOG_FILE = "/home/aaron/aaronai/watcher.log"
# JSON map of file path -> last ingested mtime (as a string).
STATE_FILE = "/home/aaron/aaronai/watcher_state.json"
SUPPORTED = {'.pdf', '.docx', '.pptx', '.txt', '.md'}
# Quiet period after the last file event before ingestion is started.
DEBOUNCE_SECONDS = 120
# JSON snapshot rewritten on every main-loop tick by write_status().
STATUS_FILE = "/home/aaron/aaronai/watcher_status.json"
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(message)s',
    handlers=[
        logging.FileHandler(LOG_FILE),
        logging.StreamHandler()
    ]
)
# Progress of the current/last ingestion run; guarded by ingestion_lock.
ingestion_state = {
    "status": "idle",
    "message": "",
    "file_count": 0,
    "started_at": None,
    "finished_at": None,
    "last_error": "",
}
ingestion_lock = threading.Lock()
# Background thread running run_ingestion(), or None before the first run.
ingestion_thread = None
def set_ingestion_state(**kwargs):
    """Merge the given fields into ``ingestion_state`` under the lock."""
    with ingestion_lock:
        for field, value in kwargs.items():
            ingestion_state[field] = value
def load_state():
    """Load the path -> mtime map from ``STATE_FILE``.

    Returns:
        The decoded JSON content, or an empty dict when the file is missing
        or corrupt. An empty map makes every file count as changed, so a
        damaged state file leads to a full re-ingest instead of an exception
        that would kill the ingestion thread on every trigger.
    """
    try:
        with open(STATE_FILE) as f:
            return json.load(f)
    except FileNotFoundError:
        return {}
    except ValueError as e:
        # ValueError covers json.JSONDecodeError and UnicodeDecodeError.
        logging.warning(f"State file unreadable, starting from empty state: {e}")
        return {}
def save_state(state):
    """Write the path -> mtime map to ``STATE_FILE`` as JSON."""
    with Path(STATE_FILE).open('w') as handle:
        json.dump(state, handle)
def get_changed_files():
    """Find supported files whose mtime differs from the saved state.

    Returns:
        A ``(changed, state)`` tuple: the list of changed paths and the state
        map that was loaded for the comparison.
    """
    known = load_state()
    changed = []
    for candidate in Path(NEXTCLOUD_PATH).rglob("*"):
        skip = (
            candidate.is_dir()
            or candidate.suffix.lower() not in SUPPORTED
            or candidate.name.startswith('.')
            or candidate.name.startswith('~$')
        )
        if skip:
            continue
        # mtimes are stored as strings, so compare in string form.
        current_mtime = str(candidate.stat().st_mtime)
        if known.get(str(candidate)) != current_mtime:
            changed.append(candidate)
    return changed, known
def run_ingestion():
    """Run the ingest script when files changed and record the outcome.

    The script is invoked on the whole watched tree with a 30-minute timeout.
    On success the mtime of every supported file is written to the state
    file; on any failure the state file is left untouched and the error is
    published through ``ingestion_state``.
    """
    changed, state = get_changed_files()
    if not changed:
        logging.info("No new or changed files detected — skipping ingestion.")
        set_ingestion_state(status="idle", message="No changes detected", file_count=0)
        return

    count = len(changed)
    logging.info(f"Found {count} new or changed files — starting ingestion...")
    set_ingestion_state(
        status="ingesting",
        message=f"Ingesting {count} file(s)...",
        file_count=count,
        started_at=time.time(),
        finished_at=None,
        last_error="",
    )

    def report_failure(message, error_text):
        # Shared shape of every error outcome.
        set_ingestion_state(
            status="error",
            message=message,
            last_error=error_text,
            finished_at=time.time(),
        )

    try:
        result = subprocess.run(
            [PYTHON, INGEST_SCRIPT, NEXTCLOUD_PATH],
            capture_output=True,
            text=True,
            timeout=1800
        )
        if result.returncode != 0:
            logging.error(f"Ingestion error: {result.stderr}")
            report_failure("Ingestion failed — see log", result.stderr[-300:])
            return

        # Snapshot current mtimes so these files count as ingested next time.
        for entry in Path(NEXTCLOUD_PATH).rglob("*"):
            if entry.is_file() and entry.suffix.lower() in SUPPORTED:
                state[str(entry)] = str(entry.stat().st_mtime)
        save_state(state)
        logging.info("Ingestion complete. State updated.")
        set_ingestion_state(
            status="idle",
            message=f"Last run: ingested {count} file(s) successfully",
            finished_at=time.time(),
        )
    except subprocess.TimeoutExpired:
        logging.error("Ingestion timed out.")
        report_failure("Ingestion timed out (>30 min)", "TimeoutExpired")
    except Exception as e:
        logging.error(f"Ingestion failed: {e}")
        report_failure(f"Ingestion exception: {e}", str(e))
def start_ingestion_thread():
    """Start run_ingestion() on a daemon thread unless one is still running."""
    global ingestion_thread
    still_running = ingestion_thread is not None and ingestion_thread.is_alive()
    if still_running:
        logging.info("Ingestion already running — skipping.")
        return
    worker = threading.Thread(target=run_ingestion, daemon=True)
    ingestion_thread = worker
    worker.start()
class IngestHandler(FileSystemEventHandler):
    """Records that a relevant file event happened and when.

    The main loop reads ``pending`` and ``last_event`` to debounce ingestion.
    """

    def __init__(self):
        self.pending = False
        self.last_event = 0

    def on_any_event(self, event):
        """Flag pending work for create/modify/move events on supported files."""
        if event.is_directory:
            return
        path = Path(event.src_path)
        path_text = str(path)
        ignored = (
            path.suffix.lower() not in SUPPORTED
            # Hidden files and Office lock files.
            or path.name.startswith('.')
            or path.name.startswith('~$')
            # Backup and media folders are excluded from triggering ingestion.
            or 'Admin/Backups' in path_text
            or 'Backups' in path.parts
            or 'Journal/Media' in path_text
            or event.event_type not in ('modified', 'created', 'moved')
        )
        if ignored:
            return
        logging.info(f"Event: {event.event_type} {event.src_path}")
        self.pending = True
        self.last_event = time.time()
def write_status(handler):
    """Write a JSON snapshot of watcher and ingestion state to ``STATUS_FILE``.

    Args:
        handler: The IngestHandler whose ``pending``/``last_event`` are reported.
    """
    with ingestion_lock:
        # Copy under the lock so the snapshot is internally consistent.
        ingestion_snapshot = dict(ingestion_state)
        snapshot = {
            "running": True,
            "timestamp": time.time(),
            "pending": handler.pending,
            "last_event": handler.last_event,
            "ingestion": ingestion_snapshot,
        }
    with open(STATUS_FILE, 'w') as status_file:
        json.dump(snapshot, status_file)
def main():
    """Watch the Nextcloud tree and start ingestion after a quiet period.

    Every 5 seconds the status file is refreshed; once ``DEBOUNCE_SECONDS``
    have passed since the last relevant event, ingestion is started on a
    background thread. Ctrl-C stops the observer and exits.
    """
    logging.info("Aaron AI Watcher starting...")
    logging.info(f"Watching: {NEXTCLOUD_PATH}")
    handler = IngestHandler()
    observer = Observer()
    observer.schedule(handler, NEXTCLOUD_PATH, recursive=True)
    observer.start()
    try:
        while True:
            write_status(handler)
            quiet_long_enough = (
                handler.pending
                and time.time() - handler.last_event >= DEBOUNCE_SECONDS
            )
            if quiet_long_enough:
                handler.pending = False
                start_ingestion_thread()
            time.sleep(5)
    except KeyboardInterrupt:
        observer.stop()
    observer.join()
    logging.info("Watcher stopped.")
# Script entry point: run the watcher loop.
if __name__ == "__main__":
    main()
-448
View File
@@ -1,448 +0,0 @@
"""
Aaron AI Watcher — Stage 1 of the encoding pipeline.
Watches the Nextcloud directory for new or changed files.
On detection, chunks + embeds documents in-process (no subprocess),
then enqueues to stage_2_queue for async cascade processing.
Design principles:
- Embedding model loaded ONCE at startup, reused across all ingest runs
- In-process ingest (no subprocess) — eliminates per-run model reload memory spike
- Missed-file recovery on startup — ingests anything new since last state
- Heartbeat file updated every loop tick — enables external health monitoring
- Parity principle: no filtering, no decisions, faithful capture
- Does NOT enqueue to stage_2_queue during bulk migration (SKIP_STAGE2_ENQUEUE env var)
Architecture: Stage 1 (watcher) -> stage_2_queue -> Stage 2 (Mistral) -> stage_3_queue -> Stage 3 (Graphiti)
"""
import os
import time
import json
import hashlib
import logging
import threading
from pathlib import Path
import psycopg2
from dotenv import load_dotenv
from sentence_transformers import SentenceTransformer
from watchdog.observers import Observer
from watchdog.events import FileSystemEventHandler
from docx import Document as DocxDocument
from pypdf import PdfReader
from pptx import Presentation
# Load settings from ~/aaronai/.env; override=True lets values in the file
# replace variables already present in the process environment.
load_dotenv(Path.home() / "aaronai" / ".env", override=True)
# Root of the directory tree that is observed and scanned for documents.
NEXTCLOUD_PATH = "/home/aaron/nextcloud/data/data/aaron/files"
# Runtime artifacts written by this process.
LOG_FILE = "/home/aaron/aaronai/watcher.log"
# JSON map of file path -> last ingested mtime (as a string).
STATE_FILE = "/home/aaron/aaronai/watcher_state.json"
# JSON status snapshot rewritten on every main-loop tick.
STATUS_FILE = "/home/aaron/aaronai/watcher_status.json"
# Holds the epoch timestamp of the most recent main-loop tick.
HEARTBEAT_FILE = "/home/aaron/aaronai/watcher_heartbeat"
# Lower-cased file suffixes that are eligible for ingestion.
SUPPORTED = {".pdf", ".docx", ".pptx", ".txt", ".md"}
# Seconds that must pass after the last filesystem event before ingestion starts.
DEBOUNCE_SECONDS = 120
# Chunk window length and overlap, both counted in whitespace-separated words.
CHUNK_SIZE = 500
CHUNK_OVERLAP = 50
# Name passed to SentenceTransformer when the embedder is loaded.
EMBED_MODEL = "all-MiniLM-L6-v2"
# Connection string handed to psycopg2.connect(); None if PG_DSN is unset.
PG_DSN = os.getenv("PG_DSN")
# Log to LOG_FILE only (no console handler is configured here).
logging.basicConfig(
level=logging.INFO,
format="%(asctime)s [watcher] %(levelname)s %(message)s",
handlers=[logging.FileHandler(LOG_FILE)],
)
log = logging.getLogger("watcher")
# Guards ingestion_state and the check/replace of ingestion_thread.
ingestion_lock = threading.Lock()
# Shared progress record; copied into the status snapshot by write_status().
ingestion_state = {
"status": "idle", "message": "", "file_count": 0,
"started_at": None, "finished_at": None, "last_error": "",
}
# Handle of the most recently started background ingestion thread, if any.
ingestion_thread = None
def load_embedder():
    """Build the SentenceTransformer named by EMBED_MODEL and return it.

    Logs one line before the load starts and one after it completes.
    """
    log.info(f"Loading embedding model: {EMBED_MODEL}")
    embedder = SentenceTransformer(EMBED_MODEL)
    log.info("Embedding model ready.")
    return embedder
def get_pg():
    """Open and return a new psycopg2 connection using PG_DSN.

    The caller owns the connection and is responsible for closing it.
    """
    connection = psycopg2.connect(PG_DSN)
    return connection
def extract_text(path: Path) -> str:
    """Return the plain text of a document, or "" when nothing can be extracted.

    The format is chosen from the lower-cased file suffix:
    .docx -> non-blank paragraphs joined by newlines;
    .pdf  -> text of each non-empty page, each followed by a newline;
    .pptx -> text of every non-blank shape, joined by newlines;
    .txt / .md -> file contents decoded as UTF-8, undecodable bytes dropped.

    Any other suffix yields "". Extraction errors are never raised: they are
    logged, recorded via record_ingest_failure(), and "" is returned.
    """
    suffix = path.suffix.lower()
    try:
        if suffix == ".docx":
            doc = DocxDocument(path)
            return "\n".join(p.text for p in doc.paragraphs if p.text.strip())
        if suffix == ".pdf":
            reader = PdfReader(path)
            # Extract each page exactly once; the previous form called
            # extract_text() twice per page (filter + value), doubling the
            # most expensive step of PDF ingestion.
            page_texts = (page.extract_text() for page in reader.pages)
            return "".join(
                f"{page_text}\n" for page_text in page_texts if page_text
            )
        if suffix == ".pptx":
            prs = Presentation(path)
            return "\n".join(
                shape.text
                for slide in prs.slides
                for shape in slide.shapes
                if hasattr(shape, "text") and shape.text.strip()
            )
        if suffix in {".txt", ".md"}:
            return path.read_text(encoding="utf-8", errors="ignore")
    except Exception as e:
        # Best-effort by design: one unreadable file must not stop the batch.
        log.warning(f"Text extraction failed for {path.name}: {e}")
        record_ingest_failure(path, f"Text extraction failed: {e}")
    return ""
def chunk_text(text: str) -> list:
    """Split text into overlapping windows of whitespace-separated words.

    Each window holds up to CHUNK_SIZE words, and consecutive windows start
    CHUNK_SIZE - CHUNK_OVERLAP words apart. Returns the windows as
    space-joined strings; text with no words yields an empty list.
    """
    tokens = text.split()
    stride = CHUNK_SIZE - CHUNK_OVERLAP
    windows = (
        " ".join(tokens[offset:offset + CHUNK_SIZE])
        for offset in range(0, len(tokens), stride)
    )
    return [window for window in windows if window.strip()]
def make_chunk_id(filepath: Path, chunk_index: int) -> str:
    """Build a chunk id: first 8 hex digits of the path's MD5, "_", then the index.

    The digest is taken over ``str(filepath)``, so the id is stable for a
    given path and chunk position.
    """
    path_digest = hashlib.md5(str(filepath).encode()).hexdigest()
    return "{}_{}".format(path_digest[:8], chunk_index)
def enqueue_stage2(source: str, full_text: str):
    """Upsert a document into stage_2_queue for asynchronous cascade processing.

    Does nothing when the SKIP_STAGE2_ENQUEUE environment variable is set to
    a non-empty value. Otherwise inserts (or resets) the queue row keyed by
    ``source``. The stored text is capped at 50,000 characters, while
    ``char_length`` records the full length of ``full_text``.

    Failures are non-fatal: they are logged as warnings and never raised.
    """
    if os.getenv("SKIP_STAGE2_ENQUEUE"):
        return
    try:
        pg = get_pg()
        try:
            cur = pg.cursor()
            cur.execute("""
                INSERT INTO stage_2_queue (source, full_text, char_length)
                VALUES (%s, %s, %s)
                ON CONFLICT (source) DO UPDATE SET
                    full_text = EXCLUDED.full_text,
                    char_length = EXCLUDED.char_length,
                    enqueued_at = NOW(),
                    completed_at = NULL,
                    failed_at = NULL,
                    attempts = 0
            """, (source, full_text[:50000], len(full_text)))
            pg.commit()
        finally:
            # Always release the connection; previously a failed execute or
            # commit skipped close() and leaked the connection.
            pg.close()
    except Exception as e:
        log.warning(f"Stage 2 enqueue failed (non-fatal): {e}")
def record_ingest_failure(filepath: Path, error: str):
    """Write extraction or ingest failure to ingest_failures table for UI visibility.

    The row is keyed by the file name. A repeat failure for the same name
    overwrites the error text, increments ``retry_count``, refreshes
    ``last_failed_at`` and clears ``resolved``. The error text is truncated
    to 1000 characters.

    Failures of this bookkeeping itself are logged and never raised.
    """
    try:
        pg = get_pg()
        try:
            cur = pg.cursor()
            cur.execute("""
                INSERT INTO ingest_failures (source, filepath, error, retry_count, first_failed_at, last_failed_at)
                VALUES (%s, %s, %s, 0, NOW(), NOW())
                ON CONFLICT (source) DO UPDATE SET
                    error = EXCLUDED.error,
                    retry_count = ingest_failures.retry_count + 1,
                    last_failed_at = NOW(),
                    resolved = FALSE
            """, (filepath.name, str(filepath), error[:1000]))
            pg.commit()
        finally:
            # Close even when execute/commit raises so the connection is
            # not leaked on the error path.
            pg.close()
    except Exception as e:
        log.warning(f"Could not record ingest failure (non-fatal): {e}")
def resolve_ingest_failure(source: str):
    """Mark a previously failed file as resolved after successful ingest.

    Sets ``resolved = TRUE`` on the ingest_failures row whose ``source``
    matches; no row matching is not an error. Database problems are logged
    as warnings and never raised.
    """
    try:
        pg = get_pg()
        try:
            cur = pg.cursor()
            cur.execute(
                "UPDATE ingest_failures SET resolved = TRUE WHERE source = %s",
                (source,),
            )
            pg.commit()
        finally:
            # Release the connection on both the success and error paths.
            pg.close()
    except Exception as e:
        log.warning(f"Could not resolve ingest failure record (non-fatal): {e}")
def ingest_file(filepath: Path, embedder) -> int:
    """Chunk, embed and store one document; return the number of chunks indexed.

    Returns 0 without raising when the file is hidden or a lock file (name
    starts with "." or "~$"), has an unsupported suffix, yields no text, or
    when embedding or the database write fails. Embedding and write failures
    are also recorded through record_ingest_failure().

    On success the chunks are upserted into ``embeddings``, any earlier
    failure record for the file is marked resolved, and the full text is
    handed to enqueue_stage2().

    Args:
        filepath: Path of the document to ingest.
        embedder: Object whose ``encode(list_of_str)`` result supports
            ``.tolist()``.
    """
    if filepath.name.startswith(("~$", ".")):
        return 0
    if filepath.suffix.lower() not in SUPPORTED:
        return 0
    text = extract_text(filepath)
    if not text.strip():
        return 0
    chunks = chunk_text(text)
    if not chunks:
        return 0
    try:
        embeddings = embedder.encode(chunks).tolist()
    except Exception as e:
        log.error(f"Embedding failed for {filepath.name}: {e}")
        record_ingest_failure(filepath, f"Embedding failed: {e}")
        return 0

    source = filepath.name
    # Identical for every chunk of this file, so serialize it once.
    metadata = json.dumps({"source": source, "filepath": str(filepath)})
    try:
        pg = get_pg()
        try:
            cur = pg.cursor()
            for i, (chunk, embedding) in enumerate(zip(chunks, embeddings)):
                chunk_id = make_chunk_id(filepath, i)
                cur.execute("""
                    INSERT INTO embeddings (id, document, embedding, source, type, created_at, metadata)
                    VALUES (%s, %s, %s::vector, %s, %s, NOW(), %s)
                    ON CONFLICT (id) DO UPDATE SET
                        document = EXCLUDED.document,
                        embedding = EXCLUDED.embedding,
                        source = EXCLUDED.source,
                        metadata = EXCLUDED.metadata
                """, (chunk_id, chunk, embedding, source, "document", metadata))
            # Single commit: either every chunk of the file lands or none do.
            pg.commit()
        finally:
            # Close on the error path too; a failed write used to leak the
            # connection because close() was only reached on success.
            pg.close()
    except Exception as e:
        log.error(f"pgvector write failed for {filepath.name}: {e}")
        record_ingest_failure(filepath, f"pgvector write failed: {e}")
        return 0

    log.info(f"Indexed {len(chunks)} chunks: {filepath.name}")
    resolve_ingest_failure(source)
    enqueue_stage2(source, text)
    return len(chunks)
def ingest_files(paths: list, embedder, state: dict) -> dict:
    """Ingest each path and record its modification time in ``state``.

    ``state`` maps ``str(path)`` to ``str(st_mtime)``; it is updated in place
    and also returned. A path is recorded whether or not ingest_file()
    produced any chunks.

    The mtime is read *before* the file is ingested, so an edit that lands
    while the file is being processed leaves a stale mtime in ``state`` and
    is picked up by the next scan. A file that can no longer be stat'ed
    (for example, deleted since the scan) is logged and skipped rather than
    aborting the whole batch.
    """
    total = 0
    for path in paths:
        try:
            mtime = str(path.stat().st_mtime)
        except OSError as e:
            log.warning(f"Skipping {path.name}: cannot stat file ({e})")
            continue
        total += ingest_file(path, embedder)
        state[str(path)] = mtime
    log.info(f"Ingestion complete. {total} chunks across {len(paths)} files.")
    return state
def load_state() -> dict:
    """Load the path -> mtime map from STATE_FILE.

    Returns an empty dict when the file is missing, unreadable, not valid
    JSON, or does not contain a JSON object. An empty state makes every
    file look changed, so the unreadable/invalid cases are logged instead of
    being swallowed silently.
    """
    state_path = Path(STATE_FILE)
    if not state_path.exists():
        return {}
    try:
        with open(state_path) as f:
            state = json.load(f)
    except Exception as e:
        log.warning(f"Could not read state file {STATE_FILE}; starting with empty state: {e}")
        return {}
    if not isinstance(state, dict):
        # Callers use state.get(...); anything but a dict would fail there.
        log.warning(f"State file {STATE_FILE} does not contain a JSON object; starting with empty state.")
        return {}
    return state
def save_state(state: dict):
    """Persist the path -> mtime map to STATE_FILE as JSON.

    The data is written to a sibling temporary file and then moved into
    place with os.replace(), so a crash or kill mid-write cannot leave a
    truncated state file behind.
    """
    tmp_path = f"{STATE_FILE}.tmp"
    with open(tmp_path, "w") as f:
        json.dump(state, f)
    os.replace(tmp_path, STATE_FILE)
def get_changed_files(state: dict) -> list:
    """Return supported files under NEXTCLOUD_PATH whose mtime differs from ``state``.

    A file is considered when its suffix is in SUPPORTED, its name does not
    start with "." or "~$", and its path is not under a Backups or
    Journal/Media location. It is reported as changed when ``state`` has no
    entry for it or the stored mtime string differs from the current one.

    Files that disappear between directory listing and stat are skipped
    instead of raising, since callers invoke this outside any try block.
    """
    changed = []
    for path in Path(NEXTCLOUD_PATH).rglob("*"):
        # Cheap name checks first; is_dir()/stat() touch the filesystem.
        if path.suffix.lower() not in SUPPORTED:
            continue
        if path.name.startswith((".", "~$")):
            continue
        path_str = str(path)
        if "Admin/Backups" in path_str or "Backups" in path.parts:
            continue
        if "Journal/Media" in path_str:
            continue
        if path.is_dir():
            continue
        try:
            mtime = str(path.stat().st_mtime)
        except OSError:
            # Vanished (or became unreadable) mid-scan: nothing to ingest.
            continue
        if state.get(path_str) != mtime:
            changed.append(path)
    return changed
def set_ingestion_state(**kwargs):
    """Merge the given fields into ``ingestion_state`` while holding ``ingestion_lock``."""
    ingestion_lock.acquire()
    try:
        for field, value in kwargs.items():
            ingestion_state[field] = value
    finally:
        ingestion_lock.release()
def write_status(handler):
    """Best-effort dump of watcher and ingestion state to STATUS_FILE as JSON.

    The snapshot is built under ``ingestion_lock``. Any error while writing
    the file is ignored so that status reporting cannot stop the main loop.
    """
    ingestion_lock.acquire()
    try:
        snapshot = {
            "running": True,
            "timestamp": time.time(),
            "pending": handler.pending,
            "last_event": handler.last_event,
            "ingestion": dict(ingestion_state),
        }
    finally:
        ingestion_lock.release()

    try:
        with open(STATUS_FILE, "w") as status_out:
            json.dump(snapshot, status_out)
    except Exception:
        pass
def write_heartbeat():
    """Best-effort write of the current epoch time to HEARTBEAT_FILE.

    Errors are ignored so a failed heartbeat write cannot stop the main loop.
    """
    try:
        stamp = str(time.time())
        with open(HEARTBEAT_FILE, "w") as beat:
            beat.write(stamp)
    except Exception:
        pass
def run_ingestion(embedder):
    """Ingest every new or changed file and publish progress in ``ingestion_state``.

    Loads the saved state, finds changed files, and returns early (status
    "idle") when there are none. Otherwise marks the state "ingesting", runs
    ingest_files(), saves the updated state and returns to "idle". Any
    exception during ingest or save is logged and reported as status "error".
    """
    state = load_state()
    targets = get_changed_files(state)
    n_files = len(targets)

    if n_files == 0:
        log.info("No new or changed files — skipping ingestion.")
        set_ingestion_state(status="idle", message="No changes detected", file_count=0)
        return

    log.info(f"Found {n_files} new or changed files — starting ingestion...")
    set_ingestion_state(
        status="ingesting",
        message=f"Ingesting {n_files} file(s)...",
        file_count=n_files,
        started_at=time.time(),
        finished_at=None,
        last_error="",
    )

    try:
        state = ingest_files(targets, embedder, state)
        save_state(state)
        success_message = f"Last run: ingested {n_files} file(s) successfully"
        set_ingestion_state(
            status="idle", message=success_message, finished_at=time.time()
        )
    except Exception as e:
        log.error(f"Ingestion failed: {e}")
        failure_text = str(e)
        set_ingestion_state(
            status="error",
            message=f"Ingestion exception: {e}",
            last_error=failure_text,
            finished_at=time.time(),
        )
def start_ingestion_thread(embedder):
    """Start run_ingestion(embedder) on a daemon thread unless one is still running.

    The liveness check and the replacement of the module-level
    ``ingestion_thread`` handle both happen under ``ingestion_lock``.
    """
    global ingestion_thread
    with ingestion_lock:
        current = ingestion_thread
        if current is not None and current.is_alive():
            log.info("Ingestion already running — skipping.")
            return
        worker = threading.Thread(
            target=run_ingestion,
            args=(embedder,),
            daemon=True,
        )
        ingestion_thread = worker
        worker.start()
class IngestHandler(FileSystemEventHandler):
    """Filesystem event handler that flags when an ingestion run is needed.

    It performs no ingestion itself. For each relevant event it records the
    event time in ``last_event`` and sets ``pending``; the main loop polls
    both and starts ingestion once the debounce interval has elapsed.
    """

    def __init__(self):
        super().__init__()
        # True when at least one relevant event has not yet been acted upon.
        self.pending = False
        # Epoch seconds of the most recent relevant event (0 = none yet).
        self.last_event = 0

    def _should_ignore(self, path: Path) -> bool:
        """Return True for hidden/lock files and Backups or Journal/Media paths."""
        if path.name.startswith((".", "~$")):
            return True
        path_str = str(path)
        if "Admin/Backups" in path_str or "Backups" in path.parts:
            return True
        return "Journal/Media" in path_str

    def _flag_if_relevant(self, event, raw_path, label: str) -> None:
        """Mark ingestion pending if the event targets a supported, non-ignored file."""
        if event.is_directory:
            return
        path = Path(raw_path)
        if path.suffix.lower() not in SUPPORTED or self._should_ignore(path):
            return
        log.info(f"Event: {label} {path}")
        # Timestamp first: the polling loop reads `pending` and then
        # `last_event`, so it must never see pending=True with a stale time
        # (which would bypass the debounce).
        self.last_event = time.time()
        self.pending = True

    def on_created(self, event):
        self._flag_if_relevant(event, event.src_path, "created")

    def on_modified(self, event):
        self._flag_if_relevant(event, event.src_path, "modified")

    def on_moved(self, event):
        # Nextcloud WebDAV writes .part temp files then renames to final path.
        # src_path is the .part file; dest_path is the final filename.
        self._flag_if_relevant(event, event.dest_path, "moved ->")

    def on_closed(self, event):
        # FileClosedEvent fires on the final file after Nextcloud completes write.
        # Belt-and-suspenders catch for any write pattern not caught by on_moved.
        self._flag_if_relevant(event, event.src_path, "closed")
def main():
    """Entry point: load the embedder, recover missed files, then watch for changes.

    Startup: loads the embedding model once, then ingests any file whose
    mtime differs from the saved state before the observer starts.
    Steady state: every 5 seconds writes the heartbeat and status files and,
    when an event is pending and DEBOUNCE_SECONDS have elapsed since the last
    one, clears the flag and starts a background ingestion thread.
    Ctrl-C stops the observer.
    """
    log.info("Aaron AI Watcher starting...")
    log.info(f"Watching: {NEXTCLOUD_PATH}")
    embedder = load_embedder()

    log.info("Startup scan: checking for files missed since last run...")
    state = load_state()
    missed = get_changed_files(state)
    if not missed:
        log.info("Startup scan: no missed files.")
    else:
        backlog = len(missed)
        log.info(f"Startup recovery: {backlog} missed file(s) — ingesting now.")
        set_ingestion_state(
            status="ingesting",
            message=f"Startup recovery: ingesting {backlog} missed file(s)...",
            file_count=backlog,
            started_at=time.time(),
        )
        try:
            state = ingest_files(missed, embedder, state)
            save_state(state)
            set_ingestion_state(
                status="idle",
                message=f"Startup recovery complete: {backlog} file(s) ingested.",
                finished_at=time.time(),
            )
        except Exception as e:
            log.error(f"Startup recovery failed: {e}")
            set_ingestion_state(
                status="error",
                message=str(e),
                last_error=str(e),
                finished_at=time.time(),
            )

    event_handler = IngestHandler()
    fs_observer = Observer()
    fs_observer.schedule(event_handler, NEXTCLOUD_PATH, recursive=True)
    fs_observer.start()
    log.info("Observer started.")

    try:
        while True:
            write_heartbeat()
            write_status(event_handler)
            # Short-circuit keeps the timestamp read conditional on `pending`.
            if (
                event_handler.pending
                and time.time() - event_handler.last_event >= DEBOUNCE_SECONDS
            ):
                event_handler.pending = False
                start_ingestion_thread(embedder)
            time.sleep(5)
    except KeyboardInterrupt:
        log.info("KeyboardInterrupt — stopping.")
        fs_observer.stop()

    fs_observer.join()
    log.info("Watcher stopped.")


if __name__ == "__main__":
    main()
-1
View File
@@ -1 +0,0 @@
1777602395.781194