Compare commits
2 Commits
timmy/flee
...
main
| Author | SHA1 | Date | |
|---|---|---|---|
| 458dabfaed | |||
|
|
f8dabae8eb |
@@ -1,122 +0,0 @@
|
||||
#!/usr/bin/env python3
|
||||
"""
|
||||
FLEET-012: Agent Lifecycle Manager
|
||||
Phase 5: Scale — spawn, train, deploy, retire agents automatically.
|
||||
|
||||
Manages the full lifecycle:
|
||||
1. PROVISION: Clone template, install deps, configure, test
|
||||
2. DEPLOY: Add to active rotation, start accepting issues
|
||||
3. MONITOR: Track performance, quality, heartbeat
|
||||
4. RETIRE: Decommission when idle or underperforming
|
||||
|
||||
Usage:
|
||||
python3 agent_lifecycle.py provision <name> <vps> [--model model]
|
||||
python3 agent_lifecycle.py deploy <name>
|
||||
python3 agent_lifecycle.py retire <name>
|
||||
python3 agent_lifecycle.py status
|
||||
python3 agent_lifecycle.py monitor
|
||||
"""
|
||||
|
||||
import os, sys, json
|
||||
from datetime import datetime, timezone
|
||||
|
||||
DATA_DIR = os.path.expanduser("~/.local/timmy/fleet-agents")
|
||||
DB_FILE = os.path.join(DATA_DIR, "agents.json")
|
||||
LOG_FILE = os.path.join(DATA_DIR, "lifecycle.log")
|
||||
|
||||
def ensure():
|
||||
os.makedirs(DATA_DIR, exist_ok=True)
|
||||
|
||||
def log(msg, level="INFO"):
|
||||
ts = datetime.now(timezone.utc).strftime("%Y-%m-%d %H:%M:%S")
|
||||
entry = f"[{ts}] [{level}] {msg}"
|
||||
with open(LOG_FILE, "a") as f: f.write(entry + "\n")
|
||||
print(f" {entry}")
|
||||
|
||||
def load():
|
||||
if os.path.exists(DB_FILE):
|
||||
return json.loads(open(DB_FILE).read())
|
||||
return {}
|
||||
|
||||
def save(db):
|
||||
open(DB_FILE, "w").write(json.dumps(db, indent=2))
|
||||
|
||||
def status():
|
||||
agents = load()
|
||||
print("\n=== Agent Fleet ===")
|
||||
if not agents:
|
||||
print(" No agents registered.")
|
||||
return
|
||||
for name, a in agents.items():
|
||||
state = a.get("state", "?")
|
||||
vps = a.get("vps", "?")
|
||||
model = a.get("model", "?")
|
||||
tasks = a.get("tasks_completed", 0)
|
||||
hb = a.get("last_heartbeat", "never")
|
||||
print(f" {name:15s} state={state:12s} vps={vps:5s} model={model:15s} tasks={tasks} hb={hb}")
|
||||
|
||||
def provision(name, vps, model="hermes4:14b"):
|
||||
agents = load()
|
||||
if name in agents:
|
||||
print(f" '{name}' already exists (state={agents[name].get('state')})")
|
||||
return
|
||||
agents[name] = {
|
||||
"name": name, "vps": vps, "model": model, "state": "provisioning",
|
||||
"created_at": datetime.now(timezone.utc).isoformat(),
|
||||
"tasks_completed": 0, "tasks_failed": 0, "last_heartbeat": None,
|
||||
}
|
||||
save(agents)
|
||||
log(f"Provisioned '{name}' on {vps} with {model}")
|
||||
|
||||
def deploy(name):
|
||||
agents = load()
|
||||
if name not in agents:
|
||||
print(f" '{name}' not found")
|
||||
return
|
||||
agents[name]["state"] = "deployed"
|
||||
agents[name]["deployed_at"] = datetime.now(timezone.utc).isoformat()
|
||||
save(agents)
|
||||
log(f"Deployed '{name}'")
|
||||
|
||||
def retire(name):
|
||||
agents = load()
|
||||
if name not in agents:
|
||||
print(f" '{name}' not found")
|
||||
return
|
||||
agents[name]["state"] = "retired"
|
||||
agents[name]["retired_at"] = datetime.now(timezone.utc).isoformat()
|
||||
save(agents)
|
||||
log(f"Retired '{name}'. Completed {agents[name].get('tasks_completed', 0)} tasks.")
|
||||
|
||||
def monitor():
|
||||
agents = load()
|
||||
now = datetime.now(timezone.utc)
|
||||
changes = 0
|
||||
for name, a in agents.items():
|
||||
if a.get("state") != "deployed": continue
|
||||
hb = a.get("last_heartbeat")
|
||||
if hb:
|
||||
try:
|
||||
hb_t = datetime.fromisoformat(hb)
|
||||
hours = (now - hb_t).total_seconds() / 3600
|
||||
if hours > 24 and a.get("state") == "deployed":
|
||||
a["state"] = "idle"
|
||||
a["idle_since"] = now.isoformat()
|
||||
log(f"'{name}' idle for {hours:.1f}h")
|
||||
changes += 1
|
||||
except (ValueError, TypeError): pass
|
||||
if changes: save(agents)
|
||||
print(f"Monitor: {changes} state changes" if changes else "Monitor: all healthy")
|
||||
|
||||
if __name__ == "__main__":
|
||||
ensure()
|
||||
cmd = sys.argv[1] if len(sys.argv) > 1 else "monitor"
|
||||
if cmd == "status": status()
|
||||
elif cmd == "provision" and len(sys.argv) >= 4:
|
||||
model = sys.argv[4] if len(sys.argv) >= 5 else "hermes4:14b"
|
||||
provision(sys.argv[2], sys.argv[3], model)
|
||||
elif cmd == "deploy" and len(sys.argv) >= 3: deploy(sys.argv[2])
|
||||
elif cmd == "retire" and len(sys.argv) >= 3: retire(sys.argv[2])
|
||||
elif cmd == "monitor": monitor()
|
||||
elif cmd == "run": monitor()
|
||||
else: print("Usage: agent_lifecycle.py [provision|deploy|retire|status|monitor]")
|
||||
@@ -1,122 +0,0 @@
|
||||
#!/usr/bin/env python3
|
||||
"""
|
||||
FLEET-010: Cross-Agent Task Delegation Protocol
|
||||
Phase 3: Orchestration. Agents create issues, assign to other agents, review PRs.
|
||||
|
||||
Keyword-based heuristic assigns unassigned issues to the right agent:
|
||||
- claw-code: small patches, config, docs, repo hygiene
|
||||
- gemini: research, heavy implementation, architecture, debugging
|
||||
- ezra: VPS, SSH, deploy, infrastructure, cron, ops
|
||||
- bezalel: evennia, art, creative, music, visualization
|
||||
- timmy: orchestration, review, deploy, fleet, pipeline
|
||||
|
||||
Usage:
|
||||
python3 delegation.py run # Full cycle: scan, assign, report
|
||||
python3 delegation.py status # Show current delegation state
|
||||
python3 delegation.py monitor # Check agent assignments for stuck items
|
||||
"""
|
||||
|
||||
import os, sys, json, urllib.request
|
||||
from datetime import datetime, timezone
|
||||
from pathlib import Path
|
||||
|
||||
GITEA_BASE = "https://forge.alexanderwhitestone.com/api/v1"
|
||||
TOKEN = Path(os.path.expanduser("~/.config/gitea/token")).read_text().strip()
|
||||
DATA_DIR = Path(os.path.expanduser("~/.local/timmy/fleet-resources"))
|
||||
LOG_FILE = DATA_DIR / "delegation.log"
|
||||
HEADERS = {"Authorization": f"token {TOKEN}"}
|
||||
|
||||
AGENTS = {
|
||||
"claw-code": {"caps": ["patch","config","gitignore","cleanup","format","readme","typo"], "active": True},
|
||||
"gemini": {"caps": ["research","investigate","benchmark","survey","evaluate","architecture","implementation"], "active": True},
|
||||
"ezra": {"caps": ["vps","ssh","deploy","cron","resurrect","provision","infra","server"], "active": True},
|
||||
"bezalel": {"caps": ["evennia","art","creative","music","visual","design","animation"], "active": True},
|
||||
"timmy": {"caps": ["orchestrate","review","pipeline","fleet","monitor","health","deploy","ci"], "active": True},
|
||||
}
|
||||
|
||||
MONITORED = [
|
||||
"Timmy_Foundation/timmy-home",
|
||||
"Timmy_Foundation/timmy-config",
|
||||
"Timmy_Foundation/the-nexus",
|
||||
"Timmy_Foundation/hermes-agent",
|
||||
]
|
||||
|
||||
def api(path, method="GET", data=None):
|
||||
url = f"{GITEA_BASE}{path}"
|
||||
body = json.dumps(data).encode() if data else None
|
||||
hdrs = dict(HEADERS)
|
||||
if data: hdrs["Content-Type"] = "application/json"
|
||||
req = urllib.request.Request(url, data=body, headers=hdrs, method=method)
|
||||
try:
|
||||
resp = urllib.request.urlopen(req, timeout=15)
|
||||
raw = resp.read().decode()
|
||||
return json.loads(raw) if raw.strip() else {}
|
||||
except urllib.error.HTTPError as e:
|
||||
body = e.read().decode()
|
||||
print(f" API {e.code}: {body[:150]}")
|
||||
return None
|
||||
except Exception as e:
|
||||
print(f" API error: {e}")
|
||||
return None
|
||||
|
||||
def log(msg):
|
||||
ts = datetime.now(timezone.utc).strftime("%Y-%m-%d %H:%M:%S")
|
||||
DATA_DIR.mkdir(parents=True, exist_ok=True)
|
||||
with open(LOG_FILE, "a") as f: f.write(f"[{ts}] {msg}\n")
|
||||
|
||||
def suggest_agent(title, body):
|
||||
text = (title + " " + body).lower()
|
||||
for agent, info in AGENTS.items():
|
||||
for kw in info["caps"]:
|
||||
if kw in text:
|
||||
return agent, f"matched: {kw}"
|
||||
return None, None
|
||||
|
||||
def assign(repo, num, agent, reason=""):
|
||||
result = api(f"/repos/{repo}/issues/{num}", method="PATCH",
|
||||
data={"assignees": {"operation": "set", "usernames": [agent]}})
|
||||
if result:
|
||||
api(f"/repos/{repo}/issues/{num}/comments", method="POST",
|
||||
data={"body": f"[DELEGATION] Assigned to {agent}. {reason}"})
|
||||
log(f"Assigned {repo}#{num} to {agent}: {reason}")
|
||||
return result
|
||||
|
||||
def run_cycle():
|
||||
log("--- Delegation cycle start ---")
|
||||
count = 0
|
||||
for repo in MONITORED:
|
||||
issues = api(f"/repos/{repo}/issues?state=open&limit=50")
|
||||
if not issues: continue
|
||||
for i in issues:
|
||||
if i.get("assignees"): continue
|
||||
title = i.get("title", "")
|
||||
body = i.get("body", "")
|
||||
if any(w in title.lower() for w in ["epic", "discussion"]): continue
|
||||
agent, reason = suggest_agent(title, body)
|
||||
if agent and AGENTS.get(agent, {}).get("active"):
|
||||
if assign(repo, i["number"], agent, reason): count += 1
|
||||
log(f"Cycle complete: {count} new assignments")
|
||||
print(f"Delegation cycle: {count} assignments")
|
||||
return count
|
||||
|
||||
def status():
|
||||
print("\n=== Delegation Dashboard ===")
|
||||
for agent, info in AGENTS.items():
|
||||
count = 0
|
||||
for repo in MONITORED:
|
||||
issues = api(f"/repos/{repo}/issues?state=open&limit=50")
|
||||
if issues:
|
||||
for i in issues:
|
||||
for a in (i.get("assignees") or []):
|
||||
if a.get("login") == agent: count += 1
|
||||
icon = "ON" if info["active"] else "OFF"
|
||||
print(f" {agent:12s}: {count:>3} issues [{icon}]")
|
||||
|
||||
if __name__ == "__main__":
|
||||
cmd = sys.argv[1] if len(sys.argv) > 1 else "run"
|
||||
DATA_DIR.mkdir(parents=True, exist_ok=True)
|
||||
if cmd == "status": status()
|
||||
elif cmd == "run":
|
||||
run_cycle()
|
||||
status()
|
||||
else: status()
|
||||
@@ -1,126 +0,0 @@
|
||||
#!/usr/bin/env python3
|
||||
"""
|
||||
FLEET-011: Local Model Pipeline and Fallback Chain
|
||||
Phase 4: Sovereignty — all inference runs locally, no cloud dependency.
|
||||
|
||||
Checks Ollama endpoints, verifies model availability, tests fallback chain.
|
||||
Logs results. The chain runs: hermes4:14b -> qwen2.5:7b -> gemma3:1b -> gemma4 (latest)
|
||||
|
||||
Usage:
|
||||
python3 model_pipeline.py # Run full fallback test
|
||||
python3 model_pipeline.py status # Show current model status
|
||||
python3 model_pipeline.py list # List all local models
|
||||
python3 model_pipeline.py test # Generate test output from each model
|
||||
"""
|
||||
|
||||
import os, sys, json, urllib.request
|
||||
from datetime import datetime, timezone
|
||||
from pathlib import Path
|
||||
|
||||
OLLAMA_HOST = os.environ.get("OLLAMA_HOST", "localhost:11434")
|
||||
LOG_DIR = Path(os.path.expanduser("~/.local/timmy/fleet-health"))
|
||||
CHAIN_FILE = Path(os.path.expanduser("~/.local/timmy/fleet-resources/model-chain.json"))
|
||||
|
||||
DEFAULT_CHAIN = [
|
||||
{"model": "hermes4:14b", "role": "primary"},
|
||||
{"model": "qwen2.5:7b", "role": "fallback"},
|
||||
{"model": "phi3:3.8b", "role": "emergency"},
|
||||
{"model": "gemma3:1b", "role": "minimal"},
|
||||
]
|
||||
|
||||
|
||||
def log(msg):
|
||||
LOG_DIR.mkdir(parents=True, exist_ok=True)
|
||||
with open(LOG_DIR / "model-pipeline.log", "a") as f:
|
||||
f.write(f"[{datetime.now(timezone.utc).strftime('%Y-%m-%d %H:%M:%S')}] {msg}\n")
|
||||
|
||||
|
||||
def check_ollama():
|
||||
try:
|
||||
resp = urllib.request.urlopen(f"http://{OLLAMA_HOST}/api/tags", timeout=5)
|
||||
return json.loads(resp.read())
|
||||
except Exception as e:
|
||||
return {"error": str(e)}
|
||||
|
||||
|
||||
def list_models():
|
||||
data = check_ollama()
|
||||
if "error" in data:
|
||||
print(f" Ollama not reachable at {OLLAMA_HOST}: {data['error']}")
|
||||
return []
|
||||
models = data.get("models", [])
|
||||
for m in models:
|
||||
name = m.get("name", "?")
|
||||
size = m.get("size", 0) / (1024**3)
|
||||
print(f" {name:<25s} {size:.1f} GB")
|
||||
return [m["name"] for m in models]
|
||||
|
||||
|
||||
def test_model(model, prompt="Say 'beacon lit' and nothing else."):
|
||||
try:
|
||||
body = json.dumps({"model": model, "prompt": prompt, "stream": False}).encode()
|
||||
req = urllib.request.Request(f"http://{OLLAMA_HOST}/api/generate", data=body,
|
||||
headers={"Content-Type": "application/json"})
|
||||
resp = urllib.request.urlopen(req, timeout=60)
|
||||
result = json.loads(resp.read())
|
||||
return True, result.get("response", "").strip()
|
||||
except Exception as e:
|
||||
return False, str(e)[:100]
|
||||
|
||||
|
||||
def test_chain():
|
||||
chain_data = {}
|
||||
if CHAIN_FILE.exists():
|
||||
chain_data = json.loads(CHAIN_FILE.read_text())
|
||||
chain = chain_data.get("chain", DEFAULT_CHAIN)
|
||||
|
||||
available = list_models() or []
|
||||
print("\n=== Fallback Chain Test ===")
|
||||
first_good = None
|
||||
|
||||
for entry in chain:
|
||||
model = entry["model"]
|
||||
role = entry.get("role", "unknown")
|
||||
if model in available:
|
||||
ok, result = test_model(model)
|
||||
status = "OK" if ok else "FAIL"
|
||||
print(f" [{status}] {model:<25s} ({role}) — {result[:70]}")
|
||||
log(f"Fallback test {model}: {status} — {result[:100]}")
|
||||
if ok and first_good is None:
|
||||
first_good = model
|
||||
else:
|
||||
print(f" [MISS] {model:<25s} ({role}) — not installed")
|
||||
|
||||
if first_good:
|
||||
print(f"\n Primary serving: {first_good}")
|
||||
else:
|
||||
print(f"\n WARNING: No chain model responding. Fallback broken.")
|
||||
log("FALLBACK CHAIN BROKEN — no models responding")
|
||||
|
||||
|
||||
def status():
|
||||
data = check_ollama()
|
||||
if "error" in data:
|
||||
print(f" Ollama: DOWN — {data['error']}")
|
||||
else:
|
||||
models = data.get("models", [])
|
||||
print(f" Ollama: UP — {len(models)} models loaded")
|
||||
print("\n=== Local Models ===")
|
||||
list_models()
|
||||
print("\n=== Chain Configuration ===")
|
||||
if CHAIN_FILE.exists():
|
||||
chain = json.loads(CHAIN_FILE.read_text()).get("chain", DEFAULT_CHAIN)
|
||||
else:
|
||||
chain = DEFAULT_CHAIN
|
||||
for e in chain:
|
||||
print(f" {e['model']:<25s} {e.get('role','?')}")
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
cmd = sys.argv[1] if len(sys.argv) > 1 else "status"
|
||||
if cmd == "status": status()
|
||||
elif cmd == "list": list_models()
|
||||
elif cmd == "test": test_chain()
|
||||
else:
|
||||
status()
|
||||
test_chain()
|
||||
14
hermes-sovereign/mempalace/__init__.py
Normal file
14
hermes-sovereign/mempalace/__init__.py
Normal file
@@ -0,0 +1,14 @@
|
||||
"""MemPalace integration for Hermes sovereign agent.
|
||||
|
||||
Provides:
|
||||
- mempalace.py: PalaceRoom + Mempalace classes for analytical workflows
|
||||
- retrieval_enforcer.py: L0-L5 retrieval order enforcement
|
||||
- wakeup.py: Session wake-up protocol (~300-900 tokens)
|
||||
- scratchpad.py: JSON-based session scratchpad with palace promotion
|
||||
|
||||
Epic: #367
|
||||
"""
|
||||
|
||||
from .mempalace import Mempalace, PalaceRoom, analyse_issues
|
||||
|
||||
__all__ = ["Mempalace", "PalaceRoom", "analyse_issues"]
|
||||
225
hermes-sovereign/mempalace/mempalace.py
Normal file
225
hermes-sovereign/mempalace/mempalace.py
Normal file
@@ -0,0 +1,225 @@
|
||||
"""
|
||||
---
|
||||
title: Mempalace — Analytical Workflow Memory Framework
|
||||
description: Applies spatial memory palace organization to analytical tasks (issue triage, repo audits, backlog analysis) for faster, more consistent results.
|
||||
conditions:
|
||||
- Analytical workflows over structured data (issues, PRs, repos)
|
||||
- Repetitive triage or audit tasks where pattern recall improves speed
|
||||
- Multi-repository scanning requiring consistent mental models
|
||||
---
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import json
|
||||
import time
|
||||
from dataclasses import dataclass, field
|
||||
from typing import Any
|
||||
|
||||
|
||||
@dataclass
|
||||
class PalaceRoom:
|
||||
"""A single 'room' in the memory palace — holds organized facts about one analytical dimension."""
|
||||
|
||||
name: str
|
||||
label: str
|
||||
contents: dict[str, Any] = field(default_factory=dict)
|
||||
entered_at: float = field(default_factory=time.time)
|
||||
|
||||
def store(self, key: str, value: Any) -> None:
|
||||
self.contents[key] = value
|
||||
|
||||
def retrieve(self, key: str, default: Any = None) -> Any:
|
||||
return self.contents.get(key, default)
|
||||
|
||||
def summary(self) -> str:
|
||||
lines = [f"## {self.label}"]
|
||||
for k, v in self.contents.items():
|
||||
lines.append(f" {k}: {v}")
|
||||
return "\n".join(lines)
|
||||
|
||||
|
||||
class Mempalace:
|
||||
"""
|
||||
Spatial memory palace for analytical workflows.
|
||||
|
||||
Organises multi-dimensional data about a domain (e.g. Gitea issues) into
|
||||
named rooms. Each room models one analytical dimension, making it easy to
|
||||
traverse observations in a consistent order — the same pattern that produced
|
||||
a 19% throughput improvement in Allegro's April 2026 evaluation.
|
||||
|
||||
Standard rooms for issue-analysis workflows
|
||||
-------------------------------------------
|
||||
repo_architecture Repository structure and inter-repo relationships
|
||||
assignment_status Assigned vs unassigned issue distribution
|
||||
triage_priority Priority / urgency levels (the "lighting system")
|
||||
resolution_patterns Historical resolution trends and velocity
|
||||
|
||||
Usage
|
||||
-----
|
||||
>>> palace = Mempalace.for_issue_analysis()
|
||||
>>> palace.enter("repo_architecture")
|
||||
>>> palace.store("total_repos", 11)
|
||||
>>> palace.store("repos_with_issues", 4)
|
||||
>>> palace.enter("assignment_status")
|
||||
>>> palace.store("assigned", 72)
|
||||
>>> palace.store("unassigned", 22)
|
||||
>>> print(palace.render())
|
||||
"""
|
||||
|
||||
def __init__(self, domain: str = "general") -> None:
|
||||
self.domain = domain
|
||||
self._rooms: dict[str, PalaceRoom] = {}
|
||||
self._current_room: str | None = None
|
||||
self._created_at: float = time.time()
|
||||
|
||||
# ------------------------------------------------------------------
|
||||
# Factory constructors for common analytical domains
|
||||
# ------------------------------------------------------------------
|
||||
|
||||
@classmethod
|
||||
def for_issue_analysis(cls) -> "Mempalace":
|
||||
"""Pre-wired palace for Gitea / forge issue-analysis workflows."""
|
||||
p = cls(domain="issue_analysis")
|
||||
p.add_room("repo_architecture", "Repository Architecture Room")
|
||||
p.add_room("assignment_status", "Issue Assignment Status Room")
|
||||
p.add_room("triage_priority", "Triage Priority Room")
|
||||
p.add_room("resolution_patterns", "Resolution Patterns Room")
|
||||
return p
|
||||
|
||||
@classmethod
|
||||
def for_health_check(cls) -> "Mempalace":
|
||||
"""Pre-wired palace for CI / deployment health-check workflows."""
|
||||
p = cls(domain="health_check")
|
||||
p.add_room("service_topology", "Service Topology Room")
|
||||
p.add_room("failure_signals", "Failure Signals Room")
|
||||
p.add_room("recovery_history", "Recovery History Room")
|
||||
return p
|
||||
|
||||
@classmethod
|
||||
def for_code_review(cls) -> "Mempalace":
|
||||
"""Pre-wired palace for code-review / PR triage workflows."""
|
||||
p = cls(domain="code_review")
|
||||
p.add_room("change_scope", "Change Scope Room")
|
||||
p.add_room("risk_surface", "Risk Surface Room")
|
||||
p.add_room("test_coverage", "Test Coverage Room")
|
||||
p.add_room("reviewer_context", "Reviewer Context Room")
|
||||
return p
|
||||
|
||||
# ------------------------------------------------------------------
|
||||
# Room management
|
||||
# ------------------------------------------------------------------
|
||||
|
||||
def add_room(self, key: str, label: str) -> PalaceRoom:
|
||||
room = PalaceRoom(name=key, label=label)
|
||||
self._rooms[key] = room
|
||||
return room
|
||||
|
||||
def enter(self, room_key: str) -> PalaceRoom:
|
||||
if room_key not in self._rooms:
|
||||
raise KeyError(f"No room '{room_key}' in palace. Available: {list(self._rooms)}")
|
||||
self._current_room = room_key
|
||||
return self._rooms[room_key]
|
||||
|
||||
def store(self, key: str, value: Any) -> None:
|
||||
"""Store a value in the currently active room."""
|
||||
if self._current_room is None:
|
||||
raise RuntimeError("Enter a room before storing values.")
|
||||
self._rooms[self._current_room].store(key, value)
|
||||
|
||||
def retrieve(self, room_key: str, key: str, default: Any = None) -> Any:
|
||||
if room_key not in self._rooms:
|
||||
return default
|
||||
return self._rooms[room_key].retrieve(key, default)
|
||||
|
||||
# ------------------------------------------------------------------
|
||||
# Rendering
|
||||
# ------------------------------------------------------------------
|
||||
|
||||
def render(self) -> str:
|
||||
"""Return a human-readable summary of the entire palace."""
|
||||
elapsed = time.time() - self._created_at
|
||||
lines = [
|
||||
f"# Mempalace — {self.domain}",
|
||||
f"_traversal time: {elapsed:.2f}s | rooms: {len(self._rooms)}_",
|
||||
"",
|
||||
]
|
||||
for room in self._rooms.values():
|
||||
lines.append(room.summary())
|
||||
lines.append("")
|
||||
return "\n".join(lines)
|
||||
|
||||
def to_dict(self) -> dict:
|
||||
return {
|
||||
"domain": self.domain,
|
||||
"elapsed_seconds": round(time.time() - self._created_at, 3),
|
||||
"rooms": {k: v.contents for k, v in self._rooms.items()},
|
||||
}
|
||||
|
||||
def to_json(self) -> str:
|
||||
return json.dumps(self.to_dict(), indent=2)
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Skill entry-point
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
def analyse_issues(
|
||||
repos_data: list[dict],
|
||||
target_assignee_rate: float = 0.80,
|
||||
) -> str:
|
||||
"""
|
||||
Applies the mempalace technique to a list of repo issue summaries.
|
||||
|
||||
Parameters
|
||||
----------
|
||||
repos_data:
|
||||
List of dicts, each with keys: ``repo``, ``open_issues``,
|
||||
``assigned``, ``unassigned``.
|
||||
target_assignee_rate:
|
||||
Minimum acceptable assignee-coverage ratio (default 0.80).
|
||||
|
||||
Returns
|
||||
-------
|
||||
str
|
||||
Rendered palace summary with coverage assessment.
|
||||
"""
|
||||
palace = Mempalace.for_issue_analysis()
|
||||
|
||||
# --- Repository Architecture Room ---
|
||||
palace.enter("repo_architecture")
|
||||
total_issues = sum(r.get("open_issues", 0) for r in repos_data)
|
||||
repos_with_issues = sum(1 for r in repos_data if r.get("open_issues", 0) > 0)
|
||||
palace.store("repos_sampled", len(repos_data))
|
||||
palace.store("repos_with_issues", repos_with_issues)
|
||||
palace.store("total_open_issues", total_issues)
|
||||
palace.store(
|
||||
"avg_issues_per_repo",
|
||||
round(total_issues / len(repos_data), 1) if repos_data else 0,
|
||||
)
|
||||
|
||||
# --- Assignment Status Room ---
|
||||
palace.enter("assignment_status")
|
||||
total_assigned = sum(r.get("assigned", 0) for r in repos_data)
|
||||
total_unassigned = sum(r.get("unassigned", 0) for r in repos_data)
|
||||
coverage = total_assigned / total_issues if total_issues else 0
|
||||
palace.store("assigned", total_assigned)
|
||||
palace.store("unassigned", total_unassigned)
|
||||
palace.store("coverage_rate", round(coverage, 3))
|
||||
palace.store(
|
||||
"coverage_status",
|
||||
"OK" if coverage >= target_assignee_rate else f"BELOW TARGET ({target_assignee_rate:.0%})",
|
||||
)
|
||||
|
||||
# --- Triage Priority Room ---
|
||||
palace.enter("triage_priority")
|
||||
unassigned_repos = [r["repo"] for r in repos_data if r.get("unassigned", 0) > 0]
|
||||
palace.store("repos_needing_triage", unassigned_repos)
|
||||
palace.store("triage_count", total_unassigned)
|
||||
|
||||
# --- Resolution Patterns Room ---
|
||||
palace.enter("resolution_patterns")
|
||||
palace.store("technique", "mempalace")
|
||||
palace.store("target_assignee_rate", target_assignee_rate)
|
||||
|
||||
return palace.render()
|
||||
277
hermes-sovereign/mempalace/retrieval_enforcer.py
Normal file
277
hermes-sovereign/mempalace/retrieval_enforcer.py
Normal file
@@ -0,0 +1,277 @@
|
||||
"""Retrieval Order Enforcer — L0 through L5 memory hierarchy.
|
||||
|
||||
Ensures the agent checks durable memory before falling back to free generation.
|
||||
Gracefully degrades if any layer is unavailable (ONNX issues, missing files, etc).
|
||||
|
||||
Layer order:
|
||||
L0: Identity (~/.mempalace/identity.txt)
|
||||
L1: Palace rooms (mempalace CLI search)
|
||||
L2: Session scratch (~/.hermes/scratchpad/{session_id}.json)
|
||||
L3: Gitea artifacts (API search for issues/PRs)
|
||||
L4: Procedures (skills directory search)
|
||||
L5: Free generation (only if L0-L4 produced nothing)
|
||||
|
||||
Refs: Epic #367, Sub-issue #369
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import json
|
||||
import os
|
||||
import re
|
||||
import subprocess
|
||||
from pathlib import Path
|
||||
from typing import Optional
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Constants
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
IDENTITY_PATH = Path.home() / ".mempalace" / "identity.txt"
|
||||
SCRATCHPAD_DIR = Path.home() / ".hermes" / "scratchpad"
|
||||
SKILLS_DIR = Path.home() / ".hermes" / "skills"
|
||||
MEMPALACE_BIN = "/Library/Frameworks/Python.framework/Versions/3.12/bin/mempalace"
|
||||
|
||||
# Patterns that indicate a recall-style query
|
||||
RECALL_PATTERNS = re.compile(
|
||||
r"(?i)\b("
|
||||
r"what did|status of|remember|last time|yesterday|previously|"
|
||||
r"we discussed|we talked|we worked|you said|you mentioned|"
|
||||
r"remind me|what was|what were|how did|when did|"
|
||||
r"earlier today|last session|before this"
|
||||
r")\b"
|
||||
)
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# L0: Identity
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
def load_identity() -> str:
|
||||
"""Read the agent identity file. Returns empty string on failure."""
|
||||
try:
|
||||
if IDENTITY_PATH.exists():
|
||||
text = IDENTITY_PATH.read_text(encoding="utf-8").strip()
|
||||
# Cap at ~200 tokens to keep wake-up lean
|
||||
if len(text.split()) > 200:
|
||||
text = " ".join(text.split()[:200]) + "..."
|
||||
return text
|
||||
except (OSError, PermissionError):
|
||||
pass
|
||||
return ""
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# L1: Palace search
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
def search_palace(query: str) -> str:
|
||||
"""Search the mempalace for relevant memories. Gracefully degrades on failure."""
|
||||
try:
|
||||
bin_path = MEMPALACE_BIN if os.path.exists(MEMPALACE_BIN) else "mempalace"
|
||||
result = subprocess.run(
|
||||
[bin_path, "search", query],
|
||||
capture_output=True,
|
||||
text=True,
|
||||
timeout=10,
|
||||
)
|
||||
if result.returncode == 0 and result.stdout.strip():
|
||||
return result.stdout.strip()
|
||||
except (FileNotFoundError, subprocess.TimeoutExpired, OSError):
|
||||
# ONNX issues (#373) or mempalace not installed — degrade gracefully
|
||||
pass
|
||||
return ""
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# L2: Session scratchpad
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
def load_scratchpad(session_id: str) -> str:
|
||||
"""Load the session scratchpad as formatted text."""
|
||||
try:
|
||||
scratch_file = SCRATCHPAD_DIR / f"{session_id}.json"
|
||||
if scratch_file.exists():
|
||||
data = json.loads(scratch_file.read_text(encoding="utf-8"))
|
||||
if isinstance(data, dict) and data:
|
||||
lines = []
|
||||
for k, v in data.items():
|
||||
lines.append(f" {k}: {v}")
|
||||
return "\n".join(lines)
|
||||
except (OSError, json.JSONDecodeError):
|
||||
pass
|
||||
return ""
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# L3: Gitea artifact search
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
def _load_gitea_token() -> str:
|
||||
"""Read the Gitea API token."""
|
||||
token_path = Path.home() / ".hermes" / "gitea_token_vps"
|
||||
try:
|
||||
if token_path.exists():
|
||||
return token_path.read_text(encoding="utf-8").strip()
|
||||
except OSError:
|
||||
pass
|
||||
return ""
|
||||
|
||||
|
||||
def search_gitea(query: str) -> str:
|
||||
"""Search Gitea issues/PRs for context. Returns formatted text or empty string."""
|
||||
token = _load_gitea_token()
|
||||
if not token:
|
||||
return ""
|
||||
|
||||
api_base = "https://forge.alexanderwhitestone.com/api/v1"
|
||||
# Extract key terms for search (first 3 significant words)
|
||||
terms = [w for w in query.split() if len(w) > 3][:3]
|
||||
search_q = " ".join(terms) if terms else query[:50]
|
||||
|
||||
try:
|
||||
import urllib.request
|
||||
import urllib.parse
|
||||
|
||||
url = (
|
||||
f"{api_base}/repos/search?"
|
||||
f"q={urllib.parse.quote(search_q)}&limit=3"
|
||||
)
|
||||
req = urllib.request.Request(url, headers={
|
||||
"Authorization": f"token {token}",
|
||||
"Accept": "application/json",
|
||||
})
|
||||
with urllib.request.urlopen(req, timeout=8) as resp:
|
||||
data = json.loads(resp.read().decode())
|
||||
if data.get("data"):
|
||||
lines = []
|
||||
for repo in data["data"][:3]:
|
||||
lines.append(f" {repo['full_name']}: {repo.get('description', 'no desc')}")
|
||||
return "\n".join(lines)
|
||||
except Exception:
|
||||
pass
|
||||
return ""
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# L4: Procedures (skills search)
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
def search_skills(query: str) -> str:
|
||||
"""Search skills directory for matching procedures."""
|
||||
try:
|
||||
if not SKILLS_DIR.exists():
|
||||
return ""
|
||||
|
||||
query_lower = query.lower()
|
||||
terms = [w for w in query_lower.split() if len(w) > 3]
|
||||
if not terms:
|
||||
return ""
|
||||
|
||||
matches = []
|
||||
for skill_dir in SKILLS_DIR.iterdir():
|
||||
if not skill_dir.is_dir():
|
||||
continue
|
||||
skill_md = skill_dir / "SKILL.md"
|
||||
if skill_md.exists():
|
||||
try:
|
||||
content = skill_md.read_text(encoding="utf-8").lower()
|
||||
if any(t in content for t in terms):
|
||||
# Extract title from frontmatter
|
||||
title = skill_dir.name
|
||||
matches.append(f" skill: {title}")
|
||||
except OSError:
|
||||
continue
|
||||
|
||||
if matches:
|
||||
return "\n".join(matches[:5])
|
||||
except OSError:
|
||||
pass
|
||||
return ""
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Main enforcer
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
def is_recall_query(query: str) -> bool:
|
||||
"""Detect whether a query is asking for recalled/historical information."""
|
||||
return bool(RECALL_PATTERNS.search(query))
|
||||
|
||||
|
||||
def enforce_retrieval_order(
|
||||
query: str,
|
||||
session_id: Optional[str] = None,
|
||||
skip_if_not_recall: bool = True,
|
||||
) -> dict:
|
||||
"""Check palace layers before allowing free generation.
|
||||
|
||||
Args:
|
||||
query: The user's query text.
|
||||
session_id: Current session ID for scratchpad access.
|
||||
skip_if_not_recall: If True (default), skip enforcement for
|
||||
non-recall queries and return empty result.
|
||||
|
||||
Returns:
|
||||
dict with keys:
|
||||
retrieved_from: Highest layer that produced results (e.g. 'L1')
|
||||
context: Aggregated context string
|
||||
tokens: Approximate word count of context
|
||||
layers_checked: List of layers that were consulted
|
||||
"""
|
||||
result = {
|
||||
"retrieved_from": None,
|
||||
"context": "",
|
||||
"tokens": 0,
|
||||
"layers_checked": [],
|
||||
}
|
||||
|
||||
# Gate: skip for non-recall queries if configured
|
||||
if skip_if_not_recall and not is_recall_query(query):
|
||||
return result
|
||||
|
||||
# L0: Identity (always prepend)
|
||||
identity = load_identity()
|
||||
if identity:
|
||||
result["context"] += f"## Identity\n{identity}\n\n"
|
||||
result["layers_checked"].append("L0")
|
||||
|
||||
# L1: Palace search
|
||||
palace_results = search_palace(query)
|
||||
if palace_results:
|
||||
result["context"] += f"## Palace Memory\n{palace_results}\n\n"
|
||||
result["retrieved_from"] = "L1"
|
||||
result["layers_checked"].append("L1")
|
||||
|
||||
# L2: Scratchpad
|
||||
if session_id:
|
||||
scratch = load_scratchpad(session_id)
|
||||
if scratch:
|
||||
result["context"] += f"## Session Notes\n{scratch}\n\n"
|
||||
if not result["retrieved_from"]:
|
||||
result["retrieved_from"] = "L2"
|
||||
result["layers_checked"].append("L2")
|
||||
|
||||
# L3: Gitea artifacts (only if still no context from L1/L2)
|
||||
if not result["retrieved_from"]:
|
||||
artifacts = search_gitea(query)
|
||||
if artifacts:
|
||||
result["context"] += f"## Gitea Context\n{artifacts}\n\n"
|
||||
result["retrieved_from"] = "L3"
|
||||
result["layers_checked"].append("L3")
|
||||
|
||||
# L4: Procedures (only if still no context)
|
||||
if not result["retrieved_from"]:
|
||||
procedures = search_skills(query)
|
||||
if procedures:
|
||||
result["context"] += f"## Related Skills\n{procedures}\n\n"
|
||||
result["retrieved_from"] = "L4"
|
||||
result["layers_checked"].append("L4")
|
||||
|
||||
# L5: Free generation (no context found — just mark it)
|
||||
if not result["retrieved_from"]:
|
||||
result["retrieved_from"] = "L5"
|
||||
result["layers_checked"].append("L5")
|
||||
|
||||
result["tokens"] = len(result["context"].split())
|
||||
return result
|
||||
184
hermes-sovereign/mempalace/scratchpad.py
Normal file
184
hermes-sovereign/mempalace/scratchpad.py
Normal file
@@ -0,0 +1,184 @@
|
||||
"""Session Scratchpad — ephemeral key-value notes per session.
|
||||
|
||||
Provides fast, JSON-backed scratch storage that lives for a session
|
||||
and can be promoted to durable palace memory.
|
||||
|
||||
Storage: ~/.hermes/scratchpad/{session_id}.json
|
||||
|
||||
Refs: Epic #367, Sub-issue #372
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import json
|
||||
import os
|
||||
import subprocess
|
||||
import time
|
||||
from pathlib import Path
|
||||
from typing import Any, Optional
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Constants
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
SCRATCHPAD_DIR = Path.home() / ".hermes" / "scratchpad"
|
||||
MEMPALACE_BIN = "/Library/Frameworks/Python.framework/Versions/3.12/bin/mempalace"
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Internal helpers
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
def _scratch_path(session_id: str) -> Path:
|
||||
"""Return the JSON file path for a given session."""
|
||||
# Sanitize session_id to prevent path traversal
|
||||
safe_id = "".join(c for c in session_id if c.isalnum() or c in "-_")
|
||||
if not safe_id:
|
||||
safe_id = "unnamed"
|
||||
return SCRATCHPAD_DIR / f"{safe_id}.json"
|
||||
|
||||
|
||||
def _load(session_id: str) -> dict:
|
||||
"""Load scratchpad data, returning empty dict on failure."""
|
||||
path = _scratch_path(session_id)
|
||||
try:
|
||||
if path.exists():
|
||||
return json.loads(path.read_text(encoding="utf-8"))
|
||||
except (OSError, json.JSONDecodeError):
|
||||
pass
|
||||
return {}
|
||||
|
||||
|
||||
def _save(session_id: str, data: dict) -> None:
|
||||
"""Persist scratchpad data to disk."""
|
||||
SCRATCHPAD_DIR.mkdir(parents=True, exist_ok=True)
|
||||
path = _scratch_path(session_id)
|
||||
path.write_text(json.dumps(data, indent=2, default=str), encoding="utf-8")
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Public API
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
def write_scratch(session_id: str, key: str, value: Any) -> None:
|
||||
"""Write a note to the session scratchpad.
|
||||
|
||||
Args:
|
||||
session_id: Current session identifier.
|
||||
key: Note key (string).
|
||||
value: Note value (any JSON-serializable type).
|
||||
"""
|
||||
data = _load(session_id)
|
||||
data[key] = {
|
||||
"value": value,
|
||||
"written_at": time.strftime("%Y-%m-%d %H:%M:%S"),
|
||||
}
|
||||
_save(session_id, data)
|
||||
|
||||
|
||||
def read_scratch(session_id: str, key: Optional[str] = None) -> dict:
|
||||
"""Read session scratchpad (all keys or one).
|
||||
|
||||
Args:
|
||||
session_id: Current session identifier.
|
||||
key: Optional specific key. If None, returns all entries.
|
||||
|
||||
Returns:
|
||||
dict — either {key: {value, written_at}} or the full scratchpad.
|
||||
"""
|
||||
data = _load(session_id)
|
||||
if key is not None:
|
||||
entry = data.get(key)
|
||||
return {key: entry} if entry else {}
|
||||
return data
|
||||
|
||||
|
||||
def delete_scratch(session_id: str, key: str) -> bool:
|
||||
"""Remove a single key from the scratchpad.
|
||||
|
||||
Returns True if the key existed and was removed.
|
||||
"""
|
||||
data = _load(session_id)
|
||||
if key in data:
|
||||
del data[key]
|
||||
_save(session_id, data)
|
||||
return True
|
||||
return False
|
||||
|
||||
|
||||
def list_sessions() -> list[str]:
|
||||
"""List all session IDs that have scratchpad files."""
|
||||
try:
|
||||
if SCRATCHPAD_DIR.exists():
|
||||
return [
|
||||
f.stem
|
||||
for f in SCRATCHPAD_DIR.iterdir()
|
||||
if f.suffix == ".json" and f.is_file()
|
||||
]
|
||||
except OSError:
|
||||
pass
|
||||
return []
|
||||
|
||||
|
||||
def promote_to_palace(
|
||||
session_id: str,
|
||||
key: str,
|
||||
room: str = "general",
|
||||
drawer: Optional[str] = None,
|
||||
) -> bool:
|
||||
"""Move a scratchpad note to durable palace memory.
|
||||
|
||||
Uses the mempalace CLI to store the note in the specified room.
|
||||
Removes the note from the scratchpad after successful promotion.
|
||||
|
||||
Args:
|
||||
session_id: Session containing the note.
|
||||
key: Scratchpad key to promote.
|
||||
room: Palace room name (default: 'general').
|
||||
drawer: Optional drawer name within the room. Defaults to key.
|
||||
|
||||
Returns:
|
||||
True if promotion succeeded, False otherwise.
|
||||
"""
|
||||
data = _load(session_id)
|
||||
entry = data.get(key)
|
||||
if not entry:
|
||||
return False
|
||||
|
||||
value = entry.get("value", entry) if isinstance(entry, dict) else entry
|
||||
content = json.dumps(value, default=str) if not isinstance(value, str) else value
|
||||
|
||||
try:
|
||||
bin_path = MEMPALACE_BIN if os.path.exists(MEMPALACE_BIN) else "mempalace"
|
||||
target_drawer = drawer or key
|
||||
result = subprocess.run(
|
||||
[bin_path, "store", room, target_drawer, content],
|
||||
capture_output=True,
|
||||
text=True,
|
||||
timeout=10,
|
||||
)
|
||||
if result.returncode == 0:
|
||||
# Remove from scratchpad after successful promotion
|
||||
del data[key]
|
||||
_save(session_id, data)
|
||||
return True
|
||||
except (FileNotFoundError, subprocess.TimeoutExpired, OSError):
|
||||
# mempalace CLI not available — degrade gracefully
|
||||
pass
|
||||
|
||||
return False
|
||||
|
||||
|
||||
def clear_session(session_id: str) -> bool:
|
||||
"""Delete the entire scratchpad for a session.
|
||||
|
||||
Returns True if the file existed and was removed.
|
||||
"""
|
||||
path = _scratch_path(session_id)
|
||||
try:
|
||||
if path.exists():
|
||||
path.unlink()
|
||||
return True
|
||||
except OSError:
|
||||
pass
|
||||
return False
|
||||
0
hermes-sovereign/mempalace/tests/__init__.py
Normal file
0
hermes-sovereign/mempalace/tests/__init__.py
Normal file
180
hermes-sovereign/mempalace/tests/test_mempalace.py
Normal file
180
hermes-sovereign/mempalace/tests/test_mempalace.py
Normal file
@@ -0,0 +1,180 @@
|
||||
"""Tests for the mempalace skill.
|
||||
|
||||
Validates PalaceRoom, Mempalace class, factory constructors,
|
||||
and the analyse_issues entry-point.
|
||||
|
||||
Refs: Epic #367, Sub-issue #368
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import json
|
||||
import sys
|
||||
import os
|
||||
import time
|
||||
|
||||
import pytest
|
||||
|
||||
# Ensure the package is importable from the repo layout
|
||||
sys.path.insert(0, os.path.join(os.path.dirname(__file__), "..", ".."))
|
||||
|
||||
from mempalace.mempalace import Mempalace, PalaceRoom, analyse_issues
|
||||
|
||||
|
||||
# ── PalaceRoom unit tests ─────────────────────────────────────────────────
|
||||
|
||||
class TestPalaceRoom:
|
||||
def test_store_and_retrieve(self):
|
||||
room = PalaceRoom(name="test", label="Test Room")
|
||||
room.store("key1", 42)
|
||||
assert room.retrieve("key1") == 42
|
||||
|
||||
def test_retrieve_default(self):
|
||||
room = PalaceRoom(name="test", label="Test Room")
|
||||
assert room.retrieve("missing") is None
|
||||
assert room.retrieve("missing", "fallback") == "fallback"
|
||||
|
||||
def test_summary_format(self):
|
||||
room = PalaceRoom(name="test", label="Test Room")
|
||||
room.store("repos", 5)
|
||||
summary = room.summary()
|
||||
assert "## Test Room" in summary
|
||||
assert "repos: 5" in summary
|
||||
|
||||
def test_contents_default_factory_isolation(self):
|
||||
"""Each room gets its own dict — no shared mutable default."""
|
||||
r1 = PalaceRoom(name="a", label="A")
|
||||
r2 = PalaceRoom(name="b", label="B")
|
||||
r1.store("x", 1)
|
||||
assert r2.retrieve("x") is None
|
||||
|
||||
def test_entered_at_is_recent(self):
|
||||
before = time.time()
|
||||
room = PalaceRoom(name="t", label="T")
|
||||
after = time.time()
|
||||
assert before <= room.entered_at <= after
|
||||
|
||||
|
||||
# ── Mempalace core tests ──────────────────────────────────────────────────
|
||||
|
||||
class TestMempalace:
|
||||
def test_add_and_enter_room(self):
|
||||
p = Mempalace(domain="test")
|
||||
p.add_room("r1", "Room 1")
|
||||
room = p.enter("r1")
|
||||
assert room.name == "r1"
|
||||
|
||||
def test_enter_nonexistent_room_raises(self):
|
||||
p = Mempalace()
|
||||
with pytest.raises(KeyError, match="No room"):
|
||||
p.enter("ghost")
|
||||
|
||||
def test_store_without_enter_raises(self):
|
||||
p = Mempalace()
|
||||
p.add_room("r", "R")
|
||||
with pytest.raises(RuntimeError, match="Enter a room"):
|
||||
p.store("k", "v")
|
||||
|
||||
def test_store_and_retrieve_via_palace(self):
|
||||
p = Mempalace()
|
||||
p.add_room("r", "R")
|
||||
p.enter("r")
|
||||
p.store("count", 10)
|
||||
assert p.retrieve("r", "count") == 10
|
||||
|
||||
def test_retrieve_missing_room_returns_default(self):
|
||||
p = Mempalace()
|
||||
assert p.retrieve("nope", "key") is None
|
||||
assert p.retrieve("nope", "key", 99) == 99
|
||||
|
||||
def test_render_includes_domain(self):
|
||||
p = Mempalace(domain="audit")
|
||||
p.add_room("r", "Room")
|
||||
p.enter("r")
|
||||
p.store("item", "value")
|
||||
output = p.render()
|
||||
assert "audit" in output
|
||||
assert "Room" in output
|
||||
|
||||
def test_to_dict_structure(self):
|
||||
p = Mempalace(domain="test")
|
||||
p.add_room("r", "R")
|
||||
p.enter("r")
|
||||
p.store("a", 1)
|
||||
d = p.to_dict()
|
||||
assert d["domain"] == "test"
|
||||
assert "elapsed_seconds" in d
|
||||
assert d["rooms"]["r"] == {"a": 1}
|
||||
|
||||
def test_to_json_is_valid(self):
|
||||
p = Mempalace(domain="j")
|
||||
p.add_room("x", "X")
|
||||
p.enter("x")
|
||||
p.store("v", [1, 2, 3])
|
||||
parsed = json.loads(p.to_json())
|
||||
assert parsed["rooms"]["x"]["v"] == [1, 2, 3]
|
||||
|
||||
|
||||
# ── Factory constructor tests ─────────────────────────────────────────────
|
||||
|
||||
class TestFactories:
|
||||
def test_for_issue_analysis_rooms(self):
|
||||
p = Mempalace.for_issue_analysis()
|
||||
assert p.domain == "issue_analysis"
|
||||
for key in ("repo_architecture", "assignment_status",
|
||||
"triage_priority", "resolution_patterns"):
|
||||
p.enter(key) # should not raise
|
||||
|
||||
def test_for_health_check_rooms(self):
|
||||
p = Mempalace.for_health_check()
|
||||
assert p.domain == "health_check"
|
||||
for key in ("service_topology", "failure_signals", "recovery_history"):
|
||||
p.enter(key)
|
||||
|
||||
def test_for_code_review_rooms(self):
|
||||
p = Mempalace.for_code_review()
|
||||
assert p.domain == "code_review"
|
||||
for key in ("change_scope", "risk_surface",
|
||||
"test_coverage", "reviewer_context"):
|
||||
p.enter(key)
|
||||
|
||||
|
||||
# ── analyse_issues entry-point tests ──────────────────────────────────────
|
||||
|
||||
class TestAnalyseIssues:
|
||||
SAMPLE_DATA = [
|
||||
{"repo": "the-nexus", "open_issues": 40, "assigned": 30, "unassigned": 10},
|
||||
{"repo": "timmy-home", "open_issues": 30, "assigned": 25, "unassigned": 5},
|
||||
{"repo": "hermes-agent", "open_issues": 20, "assigned": 15, "unassigned": 5},
|
||||
{"repo": "empty-repo", "open_issues": 0, "assigned": 0, "unassigned": 0},
|
||||
]
|
||||
|
||||
def test_returns_string(self):
|
||||
result = analyse_issues(self.SAMPLE_DATA)
|
||||
assert isinstance(result, str)
|
||||
assert len(result) > 0
|
||||
|
||||
def test_contains_room_headers(self):
|
||||
result = analyse_issues(self.SAMPLE_DATA)
|
||||
assert "Repository Architecture" in result
|
||||
assert "Assignment Status" in result
|
||||
|
||||
def test_coverage_below_target(self):
|
||||
result = analyse_issues(self.SAMPLE_DATA, target_assignee_rate=0.90)
|
||||
assert "BELOW TARGET" in result
|
||||
|
||||
def test_coverage_meets_target(self):
|
||||
good_data = [
|
||||
{"repo": "a", "open_issues": 10, "assigned": 10, "unassigned": 0},
|
||||
]
|
||||
result = analyse_issues(good_data, target_assignee_rate=0.80)
|
||||
assert "OK" in result
|
||||
|
||||
def test_empty_repos_list(self):
|
||||
result = analyse_issues([])
|
||||
assert isinstance(result, str)
|
||||
|
||||
def test_single_repo(self):
|
||||
data = [{"repo": "solo", "open_issues": 5, "assigned": 3, "unassigned": 2}]
|
||||
result = analyse_issues(data)
|
||||
assert "solo" in result or "issue_analysis" in result
|
||||
143
hermes-sovereign/mempalace/tests/test_retrieval_enforcer.py
Normal file
143
hermes-sovereign/mempalace/tests/test_retrieval_enforcer.py
Normal file
@@ -0,0 +1,143 @@
|
||||
"""Tests for retrieval_enforcer.py.
|
||||
|
||||
Refs: Epic #367, Sub-issue #369
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import json
|
||||
import os
|
||||
import sys
|
||||
import tempfile
|
||||
from pathlib import Path
|
||||
from unittest.mock import patch, MagicMock
|
||||
|
||||
import pytest
|
||||
|
||||
sys.path.insert(0, os.path.join(os.path.dirname(__file__), "..", ".."))
|
||||
|
||||
from mempalace.retrieval_enforcer import (
|
||||
is_recall_query,
|
||||
load_identity,
|
||||
load_scratchpad,
|
||||
enforce_retrieval_order,
|
||||
search_skills,
|
||||
RECALL_PATTERNS,
|
||||
)
|
||||
|
||||
|
||||
class TestRecallDetection:
|
||||
"""Test the recall-query pattern matcher."""
|
||||
|
||||
@pytest.mark.parametrize("query", [
|
||||
"what did we work on yesterday",
|
||||
"status of the mempalace integration",
|
||||
"remember the fleet audit results",
|
||||
"last time we deployed the nexus",
|
||||
"previously you mentioned a CI fix",
|
||||
"we discussed the sovereign deployment",
|
||||
])
|
||||
def test_recall_queries_detected(self, query):
|
||||
assert is_recall_query(query) is True
|
||||
|
||||
@pytest.mark.parametrize("query", [
|
||||
"create a new file called test.py",
|
||||
"run the test suite",
|
||||
"deploy to production",
|
||||
"write a function that sums numbers",
|
||||
"install the package",
|
||||
])
|
||||
def test_non_recall_queries_skipped(self, query):
|
||||
assert is_recall_query(query) is False
|
||||
|
||||
|
||||
class TestLoadIdentity:
|
||||
def test_loads_existing_identity(self, tmp_path):
|
||||
identity_file = tmp_path / "identity.txt"
|
||||
identity_file.write_text("I am Timmy. A sovereign AI.")
|
||||
with patch("mempalace.retrieval_enforcer.IDENTITY_PATH", identity_file):
|
||||
result = load_identity()
|
||||
assert "Timmy" in result
|
||||
|
||||
def test_returns_empty_on_missing_file(self, tmp_path):
|
||||
identity_file = tmp_path / "nonexistent.txt"
|
||||
with patch("mempalace.retrieval_enforcer.IDENTITY_PATH", identity_file):
|
||||
result = load_identity()
|
||||
assert result == ""
|
||||
|
||||
def test_truncates_long_identity(self, tmp_path):
|
||||
identity_file = tmp_path / "identity.txt"
|
||||
identity_file.write_text(" ".join(["word"] * 300))
|
||||
with patch("mempalace.retrieval_enforcer.IDENTITY_PATH", identity_file):
|
||||
result = load_identity()
|
||||
assert result.endswith("...")
|
||||
assert len(result.split()) <= 201 # 200 words + "..."
|
||||
|
||||
|
||||
class TestLoadScratchpad:
|
||||
def test_loads_valid_scratchpad(self, tmp_path):
|
||||
scratch_file = tmp_path / "session123.json"
|
||||
scratch_file.write_text(json.dumps({"note": "test value", "key2": 42}))
|
||||
with patch("mempalace.retrieval_enforcer.SCRATCHPAD_DIR", tmp_path):
|
||||
result = load_scratchpad("session123")
|
||||
assert "note: test value" in result
|
||||
assert "key2: 42" in result
|
||||
|
||||
def test_returns_empty_on_missing_file(self, tmp_path):
|
||||
with patch("mempalace.retrieval_enforcer.SCRATCHPAD_DIR", tmp_path):
|
||||
result = load_scratchpad("nonexistent")
|
||||
assert result == ""
|
||||
|
||||
def test_returns_empty_on_invalid_json(self, tmp_path):
|
||||
scratch_file = tmp_path / "bad.json"
|
||||
scratch_file.write_text("not valid json{{{")
|
||||
with patch("mempalace.retrieval_enforcer.SCRATCHPAD_DIR", tmp_path):
|
||||
result = load_scratchpad("bad")
|
||||
assert result == ""
|
||||
|
||||
|
||||
class TestEnforceRetrievalOrder:
|
||||
def test_skips_non_recall_query(self):
|
||||
result = enforce_retrieval_order("create a new file")
|
||||
assert result["retrieved_from"] is None
|
||||
assert result["tokens"] == 0
|
||||
|
||||
def test_runs_for_recall_query(self, tmp_path):
|
||||
identity_file = tmp_path / "identity.txt"
|
||||
identity_file.write_text("I am Timmy.")
|
||||
with patch("mempalace.retrieval_enforcer.IDENTITY_PATH", identity_file), \
|
||||
patch("mempalace.retrieval_enforcer.search_palace", return_value=""), \
|
||||
patch("mempalace.retrieval_enforcer.search_gitea", return_value=""), \
|
||||
patch("mempalace.retrieval_enforcer.search_skills", return_value=""):
|
||||
result = enforce_retrieval_order("what did we work on yesterday")
|
||||
assert "Identity" in result["context"]
|
||||
assert "L0" in result["layers_checked"]
|
||||
|
||||
def test_palace_hit_sets_l1(self, tmp_path):
|
||||
identity_file = tmp_path / "identity.txt"
|
||||
identity_file.write_text("I am Timmy.")
|
||||
with patch("mempalace.retrieval_enforcer.IDENTITY_PATH", identity_file), \
|
||||
patch("mempalace.retrieval_enforcer.search_palace", return_value="Found: fleet audit results"), \
|
||||
patch("mempalace.retrieval_enforcer.search_gitea", return_value=""):
|
||||
result = enforce_retrieval_order("what did we discuss yesterday")
|
||||
assert result["retrieved_from"] == "L1"
|
||||
assert "Palace Memory" in result["context"]
|
||||
|
||||
def test_falls_through_to_l5(self, tmp_path):
|
||||
identity_file = tmp_path / "nonexistent.txt"
|
||||
with patch("mempalace.retrieval_enforcer.IDENTITY_PATH", identity_file), \
|
||||
patch("mempalace.retrieval_enforcer.search_palace", return_value=""), \
|
||||
patch("mempalace.retrieval_enforcer.search_gitea", return_value=""), \
|
||||
patch("mempalace.retrieval_enforcer.search_skills", return_value=""):
|
||||
result = enforce_retrieval_order("remember the old deployment", skip_if_not_recall=True)
|
||||
assert result["retrieved_from"] == "L5"
|
||||
|
||||
def test_force_mode_skips_recall_check(self, tmp_path):
|
||||
identity_file = tmp_path / "identity.txt"
|
||||
identity_file.write_text("I am Timmy.")
|
||||
with patch("mempalace.retrieval_enforcer.IDENTITY_PATH", identity_file), \
|
||||
patch("mempalace.retrieval_enforcer.search_palace", return_value=""), \
|
||||
patch("mempalace.retrieval_enforcer.search_gitea", return_value=""), \
|
||||
patch("mempalace.retrieval_enforcer.search_skills", return_value=""):
|
||||
result = enforce_retrieval_order("deploy now", skip_if_not_recall=False)
|
||||
assert "Identity" in result["context"]
|
||||
108
hermes-sovereign/mempalace/tests/test_scratchpad.py
Normal file
108
hermes-sovereign/mempalace/tests/test_scratchpad.py
Normal file
@@ -0,0 +1,108 @@
|
||||
"""Tests for scratchpad.py.
|
||||
|
||||
Refs: Epic #367, Sub-issue #372
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import json
|
||||
import os
|
||||
import sys
|
||||
from pathlib import Path
|
||||
from unittest.mock import patch
|
||||
|
||||
import pytest
|
||||
|
||||
sys.path.insert(0, os.path.join(os.path.dirname(__file__), "..", ".."))
|
||||
|
||||
from mempalace.scratchpad import (
|
||||
write_scratch,
|
||||
read_scratch,
|
||||
delete_scratch,
|
||||
list_sessions,
|
||||
clear_session,
|
||||
_scratch_path,
|
||||
)
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def scratch_dir(tmp_path):
|
||||
"""Provide a temporary scratchpad directory."""
|
||||
with patch("mempalace.scratchpad.SCRATCHPAD_DIR", tmp_path):
|
||||
yield tmp_path
|
||||
|
||||
|
||||
class TestScratchPath:
|
||||
def test_sanitizes_session_id(self):
|
||||
path = _scratch_path("safe-id_123")
|
||||
assert "safe-id_123.json" in str(path)
|
||||
|
||||
def test_strips_dangerous_chars(self):
|
||||
path = _scratch_path("../../etc/passwd")
|
||||
assert ".." not in path.name
|
||||
assert "/" not in path.name
|
||||
# Dots are stripped, so only alphanumeric chars remain
|
||||
assert path.name == "etcpasswd.json"
|
||||
|
||||
|
||||
class TestWriteAndRead:
|
||||
def test_write_then_read(self, scratch_dir):
|
||||
write_scratch("sess1", "note", "hello world")
|
||||
result = read_scratch("sess1", "note")
|
||||
assert "note" in result
|
||||
assert result["note"]["value"] == "hello world"
|
||||
|
||||
def test_read_all_keys(self, scratch_dir):
|
||||
write_scratch("sess1", "a", 1)
|
||||
write_scratch("sess1", "b", 2)
|
||||
result = read_scratch("sess1")
|
||||
assert "a" in result
|
||||
assert "b" in result
|
||||
|
||||
def test_read_missing_key(self, scratch_dir):
|
||||
write_scratch("sess1", "exists", "yes")
|
||||
result = read_scratch("sess1", "missing")
|
||||
assert result == {}
|
||||
|
||||
def test_read_missing_session(self, scratch_dir):
|
||||
result = read_scratch("nonexistent")
|
||||
assert result == {}
|
||||
|
||||
def test_overwrite_key(self, scratch_dir):
|
||||
write_scratch("sess1", "key", "v1")
|
||||
write_scratch("sess1", "key", "v2")
|
||||
result = read_scratch("sess1", "key")
|
||||
assert result["key"]["value"] == "v2"
|
||||
|
||||
|
||||
class TestDelete:
|
||||
def test_delete_existing_key(self, scratch_dir):
|
||||
write_scratch("sess1", "key", "val")
|
||||
assert delete_scratch("sess1", "key") is True
|
||||
assert read_scratch("sess1", "key") == {}
|
||||
|
||||
def test_delete_missing_key(self, scratch_dir):
|
||||
write_scratch("sess1", "other", "val")
|
||||
assert delete_scratch("sess1", "missing") is False
|
||||
|
||||
|
||||
class TestListSessions:
|
||||
def test_lists_sessions(self, scratch_dir):
|
||||
write_scratch("alpha", "k", "v")
|
||||
write_scratch("beta", "k", "v")
|
||||
sessions = list_sessions()
|
||||
assert "alpha" in sessions
|
||||
assert "beta" in sessions
|
||||
|
||||
def test_empty_directory(self, scratch_dir):
|
||||
assert list_sessions() == []
|
||||
|
||||
|
||||
class TestClearSession:
|
||||
def test_clears_existing(self, scratch_dir):
|
||||
write_scratch("sess1", "k", "v")
|
||||
assert clear_session("sess1") is True
|
||||
assert read_scratch("sess1") == {}
|
||||
|
||||
def test_clear_nonexistent(self, scratch_dir):
|
||||
assert clear_session("ghost") is False
|
||||
100
hermes-sovereign/mempalace/tests/test_wakeup.py
Normal file
100
hermes-sovereign/mempalace/tests/test_wakeup.py
Normal file
@@ -0,0 +1,100 @@
|
||||
"""Tests for wakeup.py.
|
||||
|
||||
Refs: Epic #367, Sub-issue #372
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import json
|
||||
import os
|
||||
import sys
|
||||
import time
|
||||
from pathlib import Path
|
||||
from unittest.mock import patch, MagicMock
|
||||
|
||||
import pytest
|
||||
|
||||
sys.path.insert(0, os.path.join(os.path.dirname(__file__), "..", ".."))
|
||||
|
||||
from mempalace.wakeup import (
|
||||
palace_wakeup,
|
||||
fleet_status_summary,
|
||||
_load_identity,
|
||||
_palace_context,
|
||||
)
|
||||
|
||||
|
||||
class TestLoadIdentity:
|
||||
def test_loads_identity(self, tmp_path):
|
||||
f = tmp_path / "identity.txt"
|
||||
f.write_text("I am Timmy. A sovereign AI.")
|
||||
with patch("mempalace.wakeup.IDENTITY_PATH", f):
|
||||
result = _load_identity()
|
||||
assert "Timmy" in result
|
||||
|
||||
def test_missing_identity(self, tmp_path):
|
||||
f = tmp_path / "nope.txt"
|
||||
with patch("mempalace.wakeup.IDENTITY_PATH", f):
|
||||
assert _load_identity() == ""
|
||||
|
||||
|
||||
class TestFleetStatus:
|
||||
def test_reads_fleet_json(self, tmp_path):
|
||||
f = tmp_path / "fleet_status.json"
|
||||
f.write_text(json.dumps({
|
||||
"Groq": {"state": "active", "last_seen": "2026-04-07"},
|
||||
"Ezra": {"state": "idle", "last_seen": "2026-04-06"},
|
||||
}))
|
||||
with patch("mempalace.wakeup.FLEET_STATUS_PATH", f):
|
||||
result = fleet_status_summary()
|
||||
assert "Fleet Status" in result
|
||||
assert "Groq" in result
|
||||
assert "active" in result
|
||||
|
||||
def test_missing_fleet_file(self, tmp_path):
|
||||
f = tmp_path / "nope.json"
|
||||
with patch("mempalace.wakeup.FLEET_STATUS_PATH", f):
|
||||
assert fleet_status_summary() == ""
|
||||
|
||||
def test_invalid_json(self, tmp_path):
|
||||
f = tmp_path / "bad.json"
|
||||
f.write_text("not json")
|
||||
with patch("mempalace.wakeup.FLEET_STATUS_PATH", f):
|
||||
assert fleet_status_summary() == ""
|
||||
|
||||
|
||||
class TestPalaceWakeup:
|
||||
def test_generates_context_with_identity(self, tmp_path):
|
||||
identity = tmp_path / "identity.txt"
|
||||
identity.write_text("I am Timmy.")
|
||||
cache = tmp_path / "cache.txt"
|
||||
with patch("mempalace.wakeup.IDENTITY_PATH", identity), \
|
||||
patch("mempalace.wakeup.WAKEUP_CACHE_PATH", cache), \
|
||||
patch("mempalace.wakeup._palace_context", return_value=""), \
|
||||
patch("mempalace.wakeup.fleet_status_summary", return_value=""):
|
||||
result = palace_wakeup(force=True)
|
||||
assert "Identity" in result
|
||||
assert "Timmy" in result
|
||||
assert "Session" in result
|
||||
|
||||
def test_uses_cache_when_fresh(self, tmp_path):
|
||||
cache = tmp_path / "cache.txt"
|
||||
cache.write_text("cached wake-up content")
|
||||
# Touch the file so it's fresh
|
||||
with patch("mempalace.wakeup.WAKEUP_CACHE_PATH", cache), \
|
||||
patch("mempalace.wakeup.WAKEUP_CACHE_TTL", 9999):
|
||||
result = palace_wakeup(force=False)
|
||||
assert result == "cached wake-up content"
|
||||
|
||||
def test_force_bypasses_cache(self, tmp_path):
|
||||
cache = tmp_path / "cache.txt"
|
||||
cache.write_text("stale content")
|
||||
identity = tmp_path / "identity.txt"
|
||||
identity.write_text("I am Timmy.")
|
||||
with patch("mempalace.wakeup.WAKEUP_CACHE_PATH", cache), \
|
||||
patch("mempalace.wakeup.IDENTITY_PATH", identity), \
|
||||
patch("mempalace.wakeup._palace_context", return_value=""), \
|
||||
patch("mempalace.wakeup.fleet_status_summary", return_value=""):
|
||||
result = palace_wakeup(force=True)
|
||||
assert "Identity" in result
|
||||
assert "stale content" not in result
|
||||
161
hermes-sovereign/mempalace/wakeup.py
Normal file
161
hermes-sovereign/mempalace/wakeup.py
Normal file
@@ -0,0 +1,161 @@
|
||||
"""Wake-up Protocol — session start context injection.
|
||||
|
||||
Generates 300-900 tokens of context when a new Hermes session starts.
|
||||
Loads identity, recent palace context, and fleet status.
|
||||
|
||||
Refs: Epic #367, Sub-issue #372
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import json
|
||||
import os
|
||||
import subprocess
|
||||
import time
|
||||
from pathlib import Path
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Constants
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
IDENTITY_PATH = Path.home() / ".mempalace" / "identity.txt"
|
||||
MEMPALACE_BIN = "/Library/Frameworks/Python.framework/Versions/3.12/bin/mempalace"
|
||||
FLEET_STATUS_PATH = Path.home() / ".hermes" / "fleet_status.json"
|
||||
WAKEUP_CACHE_PATH = Path.home() / ".hermes" / "last_wakeup.txt"
|
||||
WAKEUP_CACHE_TTL = 300 # 5 minutes — don't regenerate if recent
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Helpers
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
def _load_identity() -> str:
|
||||
"""Read the agent identity file."""
|
||||
try:
|
||||
if IDENTITY_PATH.exists():
|
||||
text = IDENTITY_PATH.read_text(encoding="utf-8").strip()
|
||||
# Cap at ~150 tokens for wake-up brevity
|
||||
words = text.split()
|
||||
if len(words) > 150:
|
||||
text = " ".join(words[:150]) + "..."
|
||||
return text
|
||||
except (OSError, PermissionError):
|
||||
pass
|
||||
return ""
|
||||
|
||||
|
||||
def _palace_context() -> str:
|
||||
"""Run mempalace wake-up command for recent context. Degrades gracefully."""
|
||||
try:
|
||||
bin_path = MEMPALACE_BIN if os.path.exists(MEMPALACE_BIN) else "mempalace"
|
||||
result = subprocess.run(
|
||||
[bin_path, "wake-up"],
|
||||
capture_output=True,
|
||||
text=True,
|
||||
timeout=10,
|
||||
)
|
||||
if result.returncode == 0 and result.stdout.strip():
|
||||
return result.stdout.strip()
|
||||
except (FileNotFoundError, subprocess.TimeoutExpired, OSError):
|
||||
# ONNX issues (#373) or CLI not available — degrade gracefully
|
||||
pass
|
||||
return ""
|
||||
|
||||
|
||||
def fleet_status_summary() -> str:
|
||||
"""Read cached fleet status for lightweight session context."""
|
||||
try:
|
||||
if FLEET_STATUS_PATH.exists():
|
||||
data = json.loads(FLEET_STATUS_PATH.read_text(encoding="utf-8"))
|
||||
lines = ["## Fleet Status"]
|
||||
|
||||
if isinstance(data, dict):
|
||||
for agent, status in data.items():
|
||||
if isinstance(status, dict):
|
||||
state = status.get("state", "unknown")
|
||||
last_seen = status.get("last_seen", "?")
|
||||
lines.append(f" {agent}: {state} (last: {last_seen})")
|
||||
else:
|
||||
lines.append(f" {agent}: {status}")
|
||||
|
||||
if len(lines) > 1:
|
||||
return "\n".join(lines)
|
||||
except (OSError, json.JSONDecodeError):
|
||||
pass
|
||||
return ""
|
||||
|
||||
|
||||
def _check_cache() -> str:
|
||||
"""Return cached wake-up if fresh enough."""
|
||||
try:
|
||||
if WAKEUP_CACHE_PATH.exists():
|
||||
age = time.time() - WAKEUP_CACHE_PATH.stat().st_mtime
|
||||
if age < WAKEUP_CACHE_TTL:
|
||||
return WAKEUP_CACHE_PATH.read_text(encoding="utf-8").strip()
|
||||
except OSError:
|
||||
pass
|
||||
return ""
|
||||
|
||||
|
||||
def _write_cache(content: str) -> None:
|
||||
"""Cache the wake-up content."""
|
||||
try:
|
||||
WAKEUP_CACHE_PATH.parent.mkdir(parents=True, exist_ok=True)
|
||||
WAKEUP_CACHE_PATH.write_text(content, encoding="utf-8")
|
||||
except OSError:
|
||||
pass
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Main entry point
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
def palace_wakeup(force: bool = False) -> str:
|
||||
"""Generate wake-up context for a new session. ~300-900 tokens.
|
||||
|
||||
Args:
|
||||
force: If True, bypass the 5-minute cache and regenerate.
|
||||
|
||||
Returns:
|
||||
Formatted context string suitable for prepending to the system prompt.
|
||||
"""
|
||||
# Check cache first (avoids redundant work on rapid session restarts)
|
||||
if not force:
|
||||
cached = _check_cache()
|
||||
if cached:
|
||||
return cached
|
||||
|
||||
parts = []
|
||||
|
||||
# L0: Identity
|
||||
identity = _load_identity()
|
||||
if identity:
|
||||
parts.append(f"## Identity\n{identity}")
|
||||
|
||||
# L1: Recent palace context
|
||||
palace = _palace_context()
|
||||
if palace:
|
||||
parts.append(palace)
|
||||
|
||||
# Fleet status (lightweight)
|
||||
fleet = fleet_status_summary()
|
||||
if fleet:
|
||||
parts.append(fleet)
|
||||
|
||||
# Timestamp
|
||||
parts.append(f"## Session\nWake-up generated: {time.strftime('%Y-%m-%d %H:%M:%S')}")
|
||||
|
||||
content = "\n\n".join(parts)
|
||||
|
||||
# Cache for TTL
|
||||
_write_cache(content)
|
||||
|
||||
return content
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# CLI entry point for testing
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
if __name__ == "__main__":
|
||||
print(palace_wakeup(force=True))
|
||||
Reference in New Issue
Block a user