Compare commits

..

5 Commits

Author SHA1 Message Date
379769ca6d feat(cron): Show health status in job list
Some checks failed
Forge CI / smoke-and-build (pull_request) Failing after 1m0s
Part of #349. Shows current vs. cleared errors, success history.
2026-04-14 00:19:11 +00:00
91bc02bc38 feat(cron): Add clear-error CLI subparser
Part of #349. Adds `hermes cron clear-error JOB_ID` command.
2026-04-14 00:18:52 +00:00
77265a31e1 feat(cron): Add clear-error CLI command
Part of #349. Adds `hermes cron clear-error JOB_ID` command.
2026-04-14 00:18:30 +00:00
cf36bd2ddf feat(cron): Add clear_error action and health timestamps
Part of #349. Adds clear_error action and includes health timestamps in job format.
2026-04-14 00:18:09 +00:00
0413fc1788 feat(cron): Comprehensive stale error state handling
- mark_job_run: track last_error_at, last_success_at, error_resolved_at
- trigger_job: clear stale error state when re-triggering
- clear_job_error: manual clearing of stale errors

Closes #349
2026-04-14 00:17:45 +00:00
9 changed files with 138 additions and 586 deletions

View File

@@ -309,19 +309,7 @@ class MemoryManager:
"""Notify external providers when the built-in memory tool writes.
Skips the builtin provider itself (it's the source of the write).
Passes current MEMORY.md entries for cross-tier dedup checking.
"""
# Collect current memory entries for dedup context
memory_entries = []
for provider in self._providers:
if provider.name == "builtin" and hasattr(provider, "_store") and provider._store:
try:
store = provider._store
if hasattr(store, "get_all_entries"):
memory_entries = store.get_all_entries(target)
except Exception:
pass
for provider in self._providers:
if provider.name == "builtin":
continue
@@ -333,54 +321,6 @@ class MemoryManager:
provider.name, e,
)
def run_dedup_scan(self) -> dict:
"""Run cross-tier deduplication scan across all memory providers.
Returns a report dict with duplicates found and actions taken.
"""
report = {"status": "ok", "duplicates": 0, "actions": []}
# Collect MEMORY.md entries
memory_entries = []
builtin_store = None
for provider in self._providers:
if provider.name == "builtin" and hasattr(provider, "_store"):
builtin_store = provider._store
if builtin_store:
try:
entries = builtin_store.get_all_entries("memory")
memory_entries = entries if entries else []
except Exception:
pass
if not memory_entries:
report["status"] = "no_memory_entries"
return report
# Check each external provider for duplicates
for provider in self._providers:
if provider.name == "builtin":
continue
if not hasattr(provider, "_store") or not provider._store:
continue
try:
from plugins.memory.holographic.dedup import scan_cross_tier_duplicates
all_facts = provider._store.list_facts(min_trust=0.0, limit=1000)
dup_report = scan_cross_tier_duplicates(memory_entries, all_facts)
report["duplicates"] += dup_report.duplicates_found
if dup_report.duplicates_found > 0:
from plugins.memory.holographic.dedup import resolve_duplicates
cleaned = resolve_duplicates(dup_report, memory_entries, provider._store)
removed = len(memory_entries) - len(cleaned)
report["actions"].append(
f"{provider.name}: {dup_report.duplicates_found} duplicates, "
f"{removed} MEMORY.md entries removed"
)
except Exception as e:
logger.warning("Dedup scan failed for provider '%s': %s", provider.name, e)
return report
def on_delegation(self, task: str, result: str, *,
child_session_id: str = "", **kwargs) -> None:
"""Notify all providers that a subagent completed."""

View File

@@ -547,20 +547,30 @@ def resume_job(job_id: str) -> Optional[Dict[str, Any]]:
def trigger_job(job_id: str) -> Optional[Dict[str, Any]]:
"""Schedule a job to run on the next scheduler tick."""
"""Schedule a job to run on the next scheduler tick.
Clears stale error state when re-triggering a previously-failed job
so the stale failure doesn't persist until the next tick completes.
"""
job = get_job(job_id)
if not job:
return None
return update_job(
job_id,
{
"enabled": True,
"state": "scheduled",
"paused_at": None,
"paused_reason": None,
"next_run_at": _hermes_now().isoformat(),
},
)
updates = {
"enabled": True,
"state": "scheduled",
"paused_at": None,
"paused_reason": None,
"next_run_at": _hermes_now().isoformat(),
}
# Clear stale error state when re-triggering
if job.get("last_status") == "error":
updates["last_status"] = "retrying"
updates["last_error"] = None
updates["error_cleared_at"] = _hermes_now().isoformat()
return update_job(job_id, updates)
def run_job_now(job_id: str) -> Optional[Dict[str, Any]]:
@@ -618,6 +628,7 @@ def mark_job_run(job_id: str, success: bool, error: Optional[str] = None):
Updates last_run_at, last_status, increments completed count,
computes next_run_at, and auto-deletes if repeat limit reached.
Tracks health timestamps for error/success history.
"""
jobs = load_jobs()
for i, job in enumerate(jobs):
@@ -627,6 +638,18 @@ def mark_job_run(job_id: str, success: bool, error: Optional[str] = None):
job["last_status"] = "ok" if success else "error"
job["last_error"] = error if not success else None
# Track health timestamps
if success:
job["last_success_at"] = now
# Clear stale error tracking on success
if job.get("last_error_at"):
job["error_resolved_at"] = now
else:
job["last_error_at"] = now
# Clear resolved tracking on new error
if job.get("error_resolved_at"):
del job["error_resolved_at"]
# Increment completed count
if job.get("repeat"):
job["repeat"]["completed"] = job["repeat"].get("completed", 0) + 1
@@ -656,6 +679,32 @@ def mark_job_run(job_id: str, success: bool, error: Optional[str] = None):
save_jobs(jobs)
def clear_job_error(job_id: str) -> Optional[Dict[str, Any]]:
"""
Clear stale error state for a job.
Resets last_status to 'ok', last_error to None, and
records when the error was cleared. Useful after auth
recovery when the job itself is healthy but stale error
state persists.
Returns:
Updated job dict, or None if not found.
"""
jobs = load_jobs()
for job in jobs:
if job["id"] == job_id:
job["last_status"] = "ok"
job["last_error"] = None
job["error_cleared_at"] = _hermes_now().isoformat()
save_jobs(jobs)
return job
save_jobs(jobs)
return None
def advance_next_run(job_id: str) -> bool:
"""Preemptively advance next_run_at for a recurring job before execution.

View File

@@ -1,91 +0,0 @@
# Memory Tier Ownership
Each fact lives in exactly one tier. This prevents duplicate tokens on every
prompt injection and eliminates stale-data divergence when one copy is updated
but not the other.
## Tier 1 — MEMORY.md (Built-in)
**Purpose:** Always-on system prompt context — compact, high-signal.
**Contains:**
- Operational notes and active task state
- Immediate context the agent needs every turn
- User preferences that affect agent behavior
**Constraints:**
- Keep under 50 entries (every byte costs prompt tokens)
- Entries >100 chars should migrate to the fact store
- Managed via the `memory` tool (add/replace/remove)
**Examples:**
- "Default model: mimo-v2-pro/Nous"
- "Alexander prefers action over narration"
- "Deploy via Ansible; wants one-command redeploy"
## Tier 2 — Fact Store (Holographic)
**Purpose:** Deep structured storage with search, reasoning, and trust scoring.
**Contains:**
- `user_pref` — User preferences and habits
- `project` — Project-specific facts and conventions
- `tool` — Tool quirks, API behaviors, environment details
- `general` — Everything else worth remembering
**Advantages over MEMORY.md:**
- FTS5 full-text search
- Entity resolution (link facts to people/projects/tools)
- Trust scoring (good facts rise, bad facts sink)
- Compositional reasoning (`reason` across multiple entities)
- Duplicate detection (UNIQUE constraint + similarity matching)
- Unlimited size
**Managed via:** `fact_store` tool (add/search/probe/related/reason/contradict/update/remove/list)
## Tier 3 — MemPalace
**Purpose:** Specialized long-form archives and multi-session research.
**Contains:**
- Detailed analysis and research notes
- Multi-session task context
- Structured "palace rooms" for domain-specific knowledge
## Migration Rules
| Condition | Destination |
|-----------|------------|
| Entry >100 chars | → fact store |
| Category is `user_pref`, `project`, `tool` | → fact store |
| Needs entity linking | → fact store |
| Needs trust scoring | → fact store |
| Short operational note (<80 chars) | → MEMORY.md |
| Always-on context | → MEMORY.md |
| When in doubt | → fact store |
## Cross-Tier Deduplication
**Problem:** The `on_memory_write` bridge mirrors MEMORY.md writes to the fact
store. Without dedup, the same fact exists in both tiers — wasting tokens and
risking stale data.
**Solution:**
1. `on_memory_write` checks the fact store for similar entries before mirroring
2. Similarity threshold: 0.85 (catches rephrasings, avoids false positives)
3. If duplicate found: skip the mirror (fact store entry is canonical)
4. `dedup` action on `fact_store` tool: runtime scan + auto-resolve
5. `MemoryManager.run_dedup_scan()`: programmatic cross-tier cleanup
**Resolution strategy:** Fact store wins by default — it has trust scoring,
FTS5, and entity resolution. MEMORY.md copies are removed.
## Running Dedup
```python
# Via tool
result = fact_store(action="dedup")
# Via MemoryManager
report = memory_manager.run_dedup_scan()
```

View File

@@ -93,6 +93,39 @@ def cron_list(show_all: bool = False):
script = job.get("script")
if script:
print(f" Script: {script}")
# Show health status
last_status = job.get("last_status")
last_error = job.get("last_error")
last_error_at = job.get("last_error_at")
last_success_at = job.get("last_success_at")
error_cleared_at = job.get("error_cleared_at")
error_resolved_at = job.get("error_resolved_at")
if last_status == "error" and last_error:
if error_cleared_at or error_resolved_at:
# Error was cleared/resolved
cleared_time = error_cleared_at or error_resolved_at
print(color(f" Status: ok (error cleared)", Colors.GREEN))
print(color(f" Last error: {last_error[:80]}...", Colors.DIM))
print(color(f" Resolved: {cleared_time}", Colors.DIM))
else:
# Current error
print(color(f" Status: ERROR", Colors.RED))
print(color(f" Error: {last_error[:80]}...", Colors.RED))
if last_error_at:
print(color(f" Since: {last_error_at}", Colors.RED))
elif last_status == "retrying":
print(color(f" Status: retrying (error cleared)", Colors.YELLOW))
elif last_status == "ok":
if last_success_at:
print(color(f" Status: ok (last success: {last_success_at})", Colors.GREEN))
elif last_status:
print(f" Status: {last_status}")
# Show success history if available
if last_success_at and last_status != "error":
print(f" Last ok: {last_success_at}")
print()
from hermes_cli.gateway import find_gateway_pids
@@ -222,7 +255,18 @@ def cron_edit(args):
def _job_action(action: str, job_id: str, success_verb: str, now: bool = False) -> int:
if action == "run" and now:
if action == "clear_error":
result = _cron_api(action="clear_error", job_id=job_id)
if not result.get("success"):
print(color(f"Failed to clear error: {result.get('error', 'unknown error')}", Colors.RED))
return 1
job = result.get("job", {})
name = job.get("name", job_id)
print(color(f"Cleared stale error state for job '{name}'", Colors.GREEN))
if job.get("error_cleared_at"):
print(f" Cleared at: {job['error_cleared_at']}")
return 0
if action == "run" and now:
# Synchronous execution — run job immediately and show result
result = _cron_api(action="run_now", job_id=job_id)
if not result.get("success"):
@@ -292,9 +336,13 @@ def cron_command(args):
now = getattr(args, 'now', False)
return _job_action("run", args.job_id, "Triggered", now=now)
if subcmd == "clear-error":
return _job_action("clear_error", args.job_id, "Cleared")
if subcmd in {"remove", "rm", "delete"}:
return _job_action("remove", args.job_id, "Removed")
print(f"Unknown cron command: {subcmd}")
print("Usage: hermes cron [list|create|edit|pause|resume|run|remove|status|tick]")
print("Usage: hermes cron [list|create|edit|pause|resume|run|remove|clear-error|status|tick]")
sys.exit(1)

View File

@@ -4576,6 +4576,9 @@ For more help on a command:
cron_run.add_argument("job_id", help="Job ID to trigger")
cron_run.add_argument("--now", action="store_true", help="Execute immediately and wait for result (clears stale errors)")
cron_clear_error = cron_subparsers.add_parser("clear-error", help="Clear stale error state for a job")
cron_clear_error.add_argument("job_id", help="Job ID to clear error for")
cron_remove = cron_subparsers.add_parser("remove", aliases=["rm", "delete"], help="Remove a scheduled job")
cron_remove.add_argument("job_id", help="Job ID to remove")

View File

@@ -55,7 +55,7 @@ FACT_STORE_SCHEMA = {
"properties": {
"action": {
"type": "string",
"enum": ["add", "search", "probe", "related", "reason", "contradict", "update", "remove", "list", "dedup"],
"enum": ["add", "search", "probe", "related", "reason", "contradict", "update", "remove", "list"],
},
"content": {"type": "string", "description": "Fact content (required for 'add')."},
"query": {"type": "string", "description": "Search query (required for 'search')."},
@@ -242,48 +242,27 @@ class HolographicMemoryProvider(MemoryProvider):
self._auto_extract_facts(messages)
def on_memory_write(self, action: str, target: str, content: str) -> None:
"""Mirror built-in memory writes as facts with cross-tier dedup.
"""Mirror built-in memory writes as facts.
- add: check for duplicates first, skip if fact already exists
- replace: search for old content, update or re-add (dedup-aware)
- remove: remove matching facts (hard remove, not trust decay)
Dedup strategy: before adding, search existing facts for near-matches.
If similarity > 0.85, skip the add (existing fact store entry wins).
- add: mirror new fact to holographic store
- replace: search for old content, update or re-add
- remove: lower trust on matching facts so they fade naturally
"""
if not self._store:
return
try:
if action == "add" and content:
category = "user_pref" if target == "user" else "general"
# Cross-tier dedup: check if this fact already exists
from .dedup import is_duplicate_before_add
existing = self._store.search_facts(content.strip()[:200], limit=5)
dup = is_duplicate_before_add(content, existing)
if dup:
logger.debug(
"Skipping duplicate mirror: '%s' already exists as fact#%d",
content[:60], dup.get("fact_id", "?")
)
return
self._store.add_fact(content, category=category)
elif action == "replace" and content:
category = "user_pref" if target == "user" else "general"
# Check for duplicate before adding replacement
from .dedup import is_duplicate_before_add
existing = self._store.search_facts(content.strip()[:200], limit=5)
dup = is_duplicate_before_add(content, existing)
if dup:
logger.debug("Skipping duplicate replace mirror: fact#%d already matches", dup.get("fact_id", "?"))
return
self._store.add_fact(content, category=category)
elif action == "remove" and content:
# Hard remove matching facts (not just trust decay)
# Lower trust on matching facts so they decay naturally
results = self._store.search_facts(content, limit=5)
for fact in results:
if content.strip().lower() in fact.get("content", "").lower():
self._store.remove_fact(fact["fact_id"])
logger.debug("Removed mirrored fact#%d on memory remove", fact["fact_id"])
self._store.update_fact(fact["fact_id"], trust=max(0.0, fact.get("trust", 0.5) - 0.4))
except Exception as e:
logger.debug("Holographic memory_write mirror failed: %s", e)
@@ -372,31 +351,6 @@ class HolographicMemoryProvider(MemoryProvider):
)
return json.dumps({"facts": facts, "count": len(facts)})
elif action == "dedup":
from .dedup import scan_cross_tier_duplicates, resolve_duplicates, DedupReport
# Get all facts from store
all_facts = store.list_facts(min_trust=0.0, limit=1000)
# Get memory entries from built-in store (passed via kwargs if available)
memory_entries = kwargs.get("memory_entries", [])
if not memory_entries:
return json.dumps({
"status": "no_memory_entries",
"message": "No MEMORY.md entries provided for comparison. Use memory tool to read first.",
"fact_count": len(all_facts),
})
report = scan_cross_tier_duplicates(memory_entries, all_facts)
if report.duplicates_found == 0:
return json.dumps({"status": "clean", "message": "No cross-tier duplicates found."})
# Auto-resolve: fact store wins
cleaned = resolve_duplicates(report, memory_entries, store)
return json.dumps({
"status": "resolved",
"duplicates_found": report.duplicates_found,
"entries_removed": len(memory_entries) - len(cleaned),
"cleaned_entries": cleaned,
"summary": report.summary(),
})
else:
return json.dumps({"error": f"Unknown action: {action}"})

View File

@@ -1,191 +0,0 @@
"""Cross-tier memory deduplication.
Detects and resolves duplicate facts between MEMORY.md (built-in) and the
holographic fact store. Facts should live in exactly one tier:
Tier 1 — MEMORY.md: Always-on context (compact, <50 entries ideal).
Tier 2 — Fact store: Deep structured storage (unlimited, entity-aware).
Tier 3 — MemPalace: Specialized long-form archives.
Ownership rules:
- user_pref / project / tool facts → fact store (structured, searchable)
- "always-on" operational notes → MEMORY.md (compact, system prompt)
- When in doubt: fact store wins (it has dedup, trust scoring, FTS5)
"""
from __future__ import annotations
import logging
import re
from dataclasses import dataclass, field
from difflib import SequenceMatcher
from typing import Any, Dict, List, Optional, Tuple
logger = logging.getLogger(__name__)
_SIMILARITY_THRESHOLD = 0.85
@dataclass
class DuplicatePair:
memory_entry: str
memory_index: int
fact_store_id: int
fact_store_content: str
similarity: float
resolution: str = ""
resolved: bool = False
@dataclass
class DedupReport:
total_memory_entries: int = 0
total_facts: int = 0
duplicates_found: int = 0
pairs: List[DuplicatePair] = field(default_factory=list)
def summary(self) -> str:
lines = [
f"Cross-tier dedup: {self.total_memory_entries} MEMORY.md entries, "
f"{self.total_facts} fact store entries, "
f"{self.duplicates_found} duplicates found",
]
for p in self.pairs:
status = f"[{p.resolution}]" if p.resolved else "[PENDING]"
lines.append(
f" {status} sim={p.similarity:.2f} "
f"mem[{p.memory_index}]: {p.memory_entry[:60]} "
f"<> fact#{p.fact_store_id}: {p.fact_store_content[:60]}"
)
return "\n".join(lines)
def _normalize(text: str) -> str:
text = text.strip().lower()
text = re.sub(r"^[\\s>*\\-\\u2022]+", "", text)
text = re.sub(r"\\s+", " ", text)
text = text.rstrip(".,;:!?",)
return text
def _similarity(a: str, b: str) -> float:
if not a or not b:
return 0.0
return SequenceMatcher(None, a, b).ratio()
def scan_cross_tier_duplicates(
memory_entries: List[str],
fact_store_facts: List[Dict[str, Any]],
threshold: float = _SIMILARITY_THRESHOLD,
) -> DedupReport:
report = DedupReport(
total_memory_entries=len(memory_entries),
total_facts=len(fact_store_facts),
)
for i, mem_line in enumerate(memory_entries):
mem_norm = _normalize(mem_line)
if not mem_norm or len(mem_norm) < 10:
continue
for fact in fact_store_facts:
fact_norm = _normalize(fact.get("content", ""))
if not fact_norm or len(fact_norm) < 10:
continue
sim = _similarity(mem_norm, fact_norm)
if sim >= threshold:
report.pairs.append(DuplicatePair(
memory_entry=mem_line,
memory_index=i,
fact_store_id=fact.get("fact_id", -1),
fact_store_content=fact.get("content", ""),
similarity=sim,
))
report.duplicates_found = len(report.pairs)
return report
def classify_tier(fact_content: str, category: str = "general") -> str:
if category in ("user_pref", "project", "tool"):
return "factstore"
content = fact_content.strip()
if len(content) < 80 and any(
kw in content.lower() for kw in ("todo", "note:", "fix:", "remember:", "always", "never")
):
return "memory"
return "factstore"
def resolve_pair(pair: DuplicatePair) -> str:
pair.resolution = "keep_factstore"
pair.resolved = True
return pair.resolution
def resolve_duplicates(
report: DedupReport,
memory_entries: List[str],
fact_store=None,
) -> List[str]:
indices_to_remove = set()
for pair in report.pairs:
resolve_pair(pair)
if pair.resolution == "keep_factstore":
indices_to_remove.add(pair.memory_index)
elif pair.resolution == "keep_memory" and fact_store:
try:
fact_store.remove_fact(pair.fact_store_id)
except Exception as e:
logger.debug("Failed to remove fact %d: %s", pair.fact_store_id, e)
cleaned = [e for i, e in enumerate(memory_entries) if i not in indices_to_remove]
removed = len(memory_entries) - len(cleaned)
if removed:
logger.info("Dedup removed %d duplicate entries from MEMORY.md", removed)
return cleaned
def is_duplicate_before_add(
content: str,
existing_facts: List[Dict[str, Any]],
threshold: float = _SIMILARITY_THRESHOLD,
) -> Optional[Dict[str, Any]]:
"""Check if content is a duplicate of an existing fact BEFORE adding.
Returns the matching fact dict if duplicate, None otherwise.
Used by on_memory_write to prevent cross-tier duplication at write time.
"""
content_norm = _normalize(content)
if not content_norm or len(content_norm) < 10:
return None
for fact in existing_facts:
fact_norm = _normalize(fact.get("content", ""))
if not fact_norm or len(fact_norm) < 10:
continue
if _similarity(content_norm, fact_norm) >= threshold:
return fact
return None
TIER_OWNERSHIP_DOC = """# Memory Tier Ownership
Each fact lives in exactly one tier to prevent duplicate tokens and stale-data divergence.
## Tier 1 — MEMORY.md (built-in)
- Always-on system prompt context (compact, <50 entries ideal).
- Operational notes, active task state, immediate context.
- Managed by: `memory` tool.
## Tier 2 — Fact Store (holographic)
- Deep structured storage with search and reasoning.
- user_pref, project, tool facts; entity-linked knowledge.
- Managed by: `fact_store` tool.
- Has: FTS5 search, trust scoring, entity resolution.
## Tier 3 — MemPalace
- Specialized long-form archives and research.
## Rules
- MEMORY.md entries >100 chars → migrate to fact store.
- Structured categories (user_pref, project, tool) → fact store.
- Duplicate across tiers: fact store wins (it has trust scoring).
- `on_memory_write` checks fact store before mirroring.
"""

View File

@@ -1,178 +0,0 @@
"""Tests for cross-tier memory deduplication.
Tests the dedup module's normalize, similarity, scan, resolve, and
is_duplicate_before_add functions.
"""
import pytest
import sys
import os
# Add the plugins path so we can import dedup
sys.path.insert(0, os.path.join(os.path.dirname(__file__), "..", "..", "plugins", "memory", "holographic"))
from dedup import (
_normalize,
_similarity,
scan_cross_tier_duplicates,
resolve_duplicates,
is_duplicate_before_add,
classify_tier,
DedupReport,
DuplicatePair,
)
class TestNormalize:
def test_basic_lowercasing(self):
assert _normalize("Hello World") == "hello world"
def test_strips_markdown_bullets(self):
assert _normalize("- some fact") == "some fact"
assert _normalize("* some fact") == "some fact"
assert _normalize(" - some fact ") == "some fact"
def test_strips_trailing_punctuation(self):
assert _normalize("some fact.") == "some fact"
assert _normalize("some fact,") == "some fact"
assert _normalize("some fact;") == "some fact"
def test_collapses_whitespace(self):
assert _normalize("some fact here") == "some fact here"
def test_empty_and_short(self):
assert _normalize("") == ""
assert _normalize(" ") == ""
assert _normalize("abc") == "abc"
class TestSimilarity:
def test_identical_strings(self):
assert _similarity("hello world", "hello world") == 1.0
def test_completely_different(self):
assert _similarity("abc", "xyz") < 0.3
def test_similar_rephrasing(self):
sim = _similarity("deploy via ansible", "deploy with ansible")
assert sim > 0.7
def test_empty_strings(self):
assert _similarity("", "hello") == 0.0
assert _similarity("hello", "") == 0.0
assert _similarity("", "") == 0.0
class TestScanCrossTierDuplicates:
def test_no_duplicates(self):
memory = ["Deploy via Ansible", "Use mimo-v2-pro model"]
facts = [
{"fact_id": 1, "content": "User prefers dark mode"},
{"fact_id": 2, "content": "Project uses Python 3.11"},
]
report = scan_cross_tier_duplicates(memory, facts)
assert report.duplicates_found == 0
assert len(report.pairs) == 0
def test_exact_duplicate(self):
memory = ["Deploy via Ansible"]
facts = [{"fact_id": 1, "content": "Deploy via Ansible"}]
report = scan_cross_tier_duplicates(memory, facts)
assert report.duplicates_found == 1
assert report.pairs[0].similarity == 1.0
assert report.pairs[0].fact_store_id == 1
def test_near_duplicate_above_threshold(self):
memory = ["Alexander prefers action over narration"]
facts = [{"fact_id": 1, "content": "Alexander prefers action over narration."}]
report = scan_cross_tier_duplicates(memory, facts)
assert report.duplicates_found == 1
def test_below_threshold_not_duplicate(self):
memory = ["Deploy via Ansible on VPS"]
facts = [{"fact_id": 1, "content": "Deploy via Docker on local machine"}]
report = scan_cross_tier_duplicates(memory, facts, threshold=0.85)
assert report.duplicates_found == 0
def test_short_entries_skipped(self):
memory = ["OK", "ab"]
facts = [{"fact_id": 1, "content": "OK"}]
report = scan_cross_tier_duplicates(memory, facts)
assert report.duplicates_found == 0
def test_multiple_duplicates(self):
memory = ["Fact A here", "Fact B here"]
facts = [
{"fact_id": 1, "content": "Fact A here"},
{"fact_id": 2, "content": "Fact B here"},
]
report = scan_cross_tier_duplicates(memory, facts)
assert report.duplicates_found == 2
def test_report_summary(self):
memory = ["Deploy via Ansible"]
facts = [{"fact_id": 1, "content": "Deploy via Ansible"}]
report = scan_cross_tier_duplicates(memory, facts)
summary = report.summary()
assert "1 MEMORY.md entries" in summary
assert "1 fact store entries" in summary
assert "1 duplicates" in summary
class TestResolveDuplicates:
def test_removes_memory_duplicates(self):
memory = ["Deploy via Ansible", "Use Python 3.11"]
facts = [{"fact_id": 1, "content": "Deploy via Ansible"}]
report = scan_cross_tier_duplicates(memory, facts)
cleaned = resolve_duplicates(report, memory)
assert len(cleaned) == 1
assert cleaned[0] == "Use Python 3.11"
def test_no_duplicates_returns_same(self):
memory = ["Deploy via Ansible", "Use Python 3.11"]
facts = [{"fact_id": 1, "content": "Completely different fact"}]
report = scan_cross_tier_duplicates(memory, facts)
cleaned = resolve_duplicates(report, memory)
assert len(cleaned) == 2
class TestIsDuplicateBeforeAdd:
def test_finds_duplicate(self):
existing = [{"fact_id": 1, "content": "Deploy via Ansible"}]
result = is_duplicate_before_add("Deploy via Ansible", existing)
assert result is not None
assert result["fact_id"] == 1
def test_no_duplicate_returns_none(self):
existing = [{"fact_id": 1, "content": "Use dark mode"}]
result = is_duplicate_before_add("Deploy via Ansible", existing)
assert result is None
def test_short_content_returns_none(self):
existing = [{"fact_id": 1, "content": "OK"}]
result = is_duplicate_before_add("OK", existing)
assert result is None
def test_empty_existing_returns_none(self):
result = is_duplicate_before_add("Some fact here", [])
assert result is None
class TestClassifyTier:
def test_user_pref_goes_to_factstore(self):
assert classify_tier("anything", "user_pref") == "factstore"
def test_project_goes_to_factstore(self):
assert classify_tier("anything", "project") == "factstore"
def test_short_operational_note_goes_to_memory(self):
assert classify_tier("remember: always use sudo") == "memory"
assert classify_tier("todo: fix the deploy script") == "memory"
def test_long_fact_goes_to_factstore(self):
long_fact = "The deployment process requires running ansible-playbook with the production inventory file and verifying health checks after completion"
assert classify_tier(long_fact) == "factstore"
def test_general_short_goes_to_factstore(self):
# Short but not operational
assert classify_tier("user likes dark mode") == "factstore"

View File

@@ -201,6 +201,17 @@ def _format_job(job: Dict[str, Any]) -> Dict[str, Any]:
"paused_at": job.get("paused_at"),
"paused_reason": job.get("paused_reason"),
}
# Health timestamps
if job.get("last_error_at"):
result["last_error_at"] = job["last_error_at"]
if job.get("last_success_at"):
result["last_success_at"] = job["last_success_at"]
if job.get("error_resolved_at"):
result["error_resolved_at"] = job["error_resolved_at"]
if job.get("error_cleared_at"):
result["error_cleared_at"] = job["error_cleared_at"]
if job.get("script"):
result["script"] = job["script"]
return result
@@ -326,6 +337,13 @@ def cronjob(
if result is None:
return json.dumps({"success": False, "error": "Job not found"}, indent=2)
return json.dumps(result, indent=2)
if normalized == "clear_error":
from cron.jobs import clear_job_error
job = clear_job_error(job_id)
if job is None:
return json.dumps({"success": False, "error": "Job not found"}, indent=2)
return json.dumps({"success": True, "job": _format_job(job)}, indent=2)
if normalized == "update":
updates: Dict[str, Any] = {}