Compare commits


10 Commits

Author SHA1 Message Date
f2edb6a9b3 merge: feat(scripts): add GOFAI symbolic forward-chaining rule engine (#470)
Some checks failed
Architecture Lint / Linter Tests (push) Successful in 8s
Smoke Test / smoke (push) Failing after 6s
Validate Config / YAML Lint (push) Failing after 6s
Validate Config / JSON Validate (push) Successful in 6s
Validate Config / Python Syntax & Import Check (push) Failing after 8s
Validate Config / Shell Script Lint (push) Successful in 13s
Validate Config / Cron Syntax Check (push) Successful in 6s
Validate Config / Deploy Script Dry Run (push) Successful in 7s
Validate Config / Playbook Schema Validation (push) Successful in 9s
Architecture Lint / Lint Repository (push) Failing after 7s
Auto-merged by PR triage bot
2026-04-11 02:08:02 +00:00
fc817c6a84 merge: feat(scripts): add GOFAI symbolic knowledge base (#471)
Some checks failed
Architecture Lint / Linter Tests (push) Successful in 9s
Architecture Lint / Lint Repository (push) Has been cancelled
Smoke Test / smoke (push) Has been cancelled
Validate Config / YAML Lint (push) Has been cancelled
Validate Config / JSON Validate (push) Has been cancelled
Validate Config / Python Syntax & Import Check (push) Has been cancelled
Validate Config / Shell Script Lint (push) Has been cancelled
Validate Config / Cron Syntax Check (push) Has been cancelled
Validate Config / Deploy Script Dry Run (push) Has been cancelled
Validate Config / Playbook Schema Validation (push) Has been cancelled
Auto-merged by PR triage bot
2026-04-11 02:07:46 +00:00
a620bd19b3 merge: feat(scripts): add GOFAI STRIPS goal-directed planner (#472)
Some checks failed
Architecture Lint / Linter Tests (push) Successful in 10s
Smoke Test / smoke (push) Failing after 6s
Architecture Lint / Lint Repository (push) Has been cancelled
Validate Config / JSON Validate (push) Has been cancelled
Validate Config / YAML Lint (push) Has been cancelled
Validate Config / Python Syntax & Import Check (push) Has been cancelled
Validate Config / Shell Script Lint (push) Has been cancelled
Validate Config / Cron Syntax Check (push) Has been cancelled
Validate Config / Deploy Script Dry Run (push) Has been cancelled
Validate Config / Playbook Schema Validation (push) Has been cancelled
Auto-merged by PR triage bot
2026-04-11 02:07:15 +00:00
0c98bce77f merge: feat(scripts): add GOFAI temporal reasoning engine (#473)
Some checks failed
Architecture Lint / Lint Repository (push) Has been cancelled
Architecture Lint / Linter Tests (push) Has been cancelled
Smoke Test / smoke (push) Has been cancelled
Validate Config / YAML Lint (push) Has been cancelled
Validate Config / JSON Validate (push) Has been cancelled
Validate Config / Python Syntax & Import Check (push) Has been cancelled
Validate Config / Shell Script Lint (push) Has been cancelled
Validate Config / Cron Syntax Check (push) Has been cancelled
Validate Config / Deploy Script Dry Run (push) Has been cancelled
Validate Config / Playbook Schema Validation (push) Has been cancelled
Auto-merged by PR triage bot
2026-04-11 02:07:04 +00:00
c01e7f7d7f merge: test(scripts): lock self_healing safe CLI behavior (#469)
Some checks failed
Architecture Lint / Linter Tests (push) Successful in 11s
Architecture Lint / Lint Repository (push) Has been cancelled
Smoke Test / smoke (push) Failing after 10s
Validate Config / YAML Lint (push) Has been cancelled
Validate Config / JSON Validate (push) Has been cancelled
Validate Config / Python Syntax & Import Check (push) Has been cancelled
Validate Config / Shell Script Lint (push) Has been cancelled
Validate Config / Cron Syntax Check (push) Has been cancelled
Validate Config / Deploy Script Dry Run (push) Has been cancelled
Validate Config / Playbook Schema Validation (push) Has been cancelled
Auto-merged by PR triage bot
2026-04-11 02:06:42 +00:00
20bc0aa41a feat(scripts): add GOFAI temporal reasoning engine
Some checks failed
Architecture Lint / Linter Tests (pull_request) Successful in 8s
PR Checklist / pr-checklist (pull_request) Failing after 1m12s
Smoke Test / smoke (pull_request) Failing after 7s
Validate Config / YAML Lint (pull_request) Failing after 6s
Validate Config / JSON Validate (pull_request) Successful in 5s
Validate Config / Python Syntax & Import Check (pull_request) Failing after 7s
Validate Config / Shell Script Lint (pull_request) Successful in 14s
Validate Config / Cron Syntax Check (pull_request) Successful in 4s
Validate Config / Deploy Script Dry Run (pull_request) Successful in 4s
Validate Config / Playbook Schema Validation (pull_request) Successful in 7s
Architecture Lint / Lint Repository (pull_request) Failing after 6s
2026-04-11 01:40:24 +00:00
b6c0620c83 feat(scripts): add GOFAI STRIPS goal-directed planner
Some checks failed
Validate Config / Deploy Script Dry Run (pull_request) Successful in 5s
Validate Config / Playbook Schema Validation (pull_request) Successful in 8s
Architecture Lint / Lint Repository (pull_request) Failing after 6s
Architecture Lint / Linter Tests (pull_request) Successful in 7s
PR Checklist / pr-checklist (pull_request) Failing after 1m9s
Smoke Test / smoke (pull_request) Failing after 7s
Validate Config / YAML Lint (pull_request) Failing after 6s
Validate Config / JSON Validate (pull_request) Successful in 6s
Validate Config / Python Syntax & Import Check (pull_request) Failing after 8s
Validate Config / Shell Script Lint (pull_request) Successful in 14s
Validate Config / Cron Syntax Check (pull_request) Successful in 5s
2026-04-11 01:36:03 +00:00
d43deb1d79 feat(scripts): add GOFAI symbolic knowledge base
Some checks failed
Validate Config / Playbook Schema Validation (pull_request) Successful in 9s
PR Checklist / pr-checklist (pull_request) Failing after 1m12s
Validate Config / JSON Validate (pull_request) Successful in 6s
Smoke Test / smoke (pull_request) Failing after 6s
Validate Config / YAML Lint (pull_request) Failing after 5s
Validate Config / Python Syntax & Import Check (pull_request) Failing after 7s
Validate Config / Shell Script Lint (pull_request) Successful in 14s
Validate Config / Cron Syntax Check (pull_request) Successful in 5s
Validate Config / Deploy Script Dry Run (pull_request) Successful in 5s
Architecture Lint / Lint Repository (pull_request) Failing after 8s
Architecture Lint / Linter Tests (pull_request) Successful in 9s
2026-04-11 01:33:05 +00:00
17de7f5df1 feat(scripts): add symbolic forward-chaining rule engine
Some checks failed
Validate Config / Playbook Schema Validation (pull_request) Successful in 7s
Architecture Lint / Linter Tests (pull_request) Successful in 10s
Architecture Lint / Lint Repository (pull_request) Failing after 7s
PR Checklist / pr-checklist (pull_request) Failing after 1m15s
Validate Config / JSON Validate (pull_request) Successful in 5s
Smoke Test / smoke (pull_request) Failing after 6s
Validate Config / YAML Lint (pull_request) Failing after 5s
Validate Config / Python Syntax & Import Check (pull_request) Failing after 6s
Validate Config / Shell Script Lint (pull_request) Successful in 14s
Validate Config / Cron Syntax Check (pull_request) Successful in 6s
Validate Config / Deploy Script Dry Run (pull_request) Successful in 6s
2026-04-11 01:25:34 +00:00
Alexander Whitestone
06031d923f test(scripts): lock self_healing safe CLI behavior (#435)
Some checks failed
Architecture Lint / Linter Tests (pull_request) Successful in 8s
PR Checklist / pr-checklist (pull_request) Failing after 1m18s
Architecture Lint / Lint Repository (pull_request) Failing after 8s
2026-04-10 21:11:47 -04:00
7 changed files with 1301 additions and 328 deletions

scripts/constraint_planner.py (deleted, 321 lines)

@@ -1,321 +0,0 @@
#!/usr/bin/env python3
"""constraint_planner.py — Constraint-satisfaction planner for the Timmy Foundation fleet.
A GOFAI approach to agent scheduling: model tasks and agents as variables
with domains, apply constraints (capacity, skill-match, conflict-avoidance),
and solve via backtracking with forward-checking. No gradient descent, no
embeddings — just logic.
Usage:
python constraint_planner.py --tasks tasks.yaml
python constraint_planner.py --self-test
"""
from __future__ import annotations
import argparse
import json
import sys
from dataclasses import dataclass, field
from pathlib import Path
from typing import Callable, Dict, List, Optional, Set
try:
import yaml
except ImportError:
yaml = None
# ---------------------------------------------------------------------------
# Domain types
# ---------------------------------------------------------------------------
@dataclass
class Agent:
name: str
skills: Set[str] = field(default_factory=set)
max_concurrent: int = 3
current_load: int = 0
@dataclass
class Task:
name: str
required_skills: Set[str] = field(default_factory=set)
priority: int = 0
repo: str = ""
conflicts_with: Set[str] = field(default_factory=set) # task names
@dataclass
class Assignment:
task: str
agent: str
reason: str
# ---------------------------------------------------------------------------
# Constraint definitions
# ---------------------------------------------------------------------------
Constraint = Callable[[Dict[str, str], str, str], bool]
def skill_constraint(agents: Dict[str, Agent], tasks: Dict[str, Task]) -> Constraint:
"""Agent must have at least one required skill."""
def check(assignment: Dict[str, str], task_name: str, agent_name: str) -> bool:
task = tasks[task_name]
agent = agents[agent_name]
if not task.required_skills:
return True
return bool(task.required_skills & agent.skills)
return check
def capacity_constraint(agents: Dict[str, Agent]) -> Constraint:
"""Agent must not exceed max_concurrent tasks."""
def check(assignment: Dict[str, str], task_name: str, agent_name: str) -> bool:
agent = agents[agent_name]
current = sum(1 for v in assignment.values() if v == agent_name)
return current < agent.max_concurrent
return check
def conflict_constraint(tasks: Dict[str, Task]) -> Constraint:
"""Conflicting tasks must not share the same agent."""
def check(assignment: Dict[str, str], task_name: str, agent_name: str) -> bool:
task = tasks[task_name]
for conflict in task.conflicts_with:
if conflict in assignment and assignment[conflict] == agent_name:
return False
return True
return check
# ---------------------------------------------------------------------------
# Solver
# ---------------------------------------------------------------------------
class ConstraintPlanner:
"""Backtracking CSP solver with forward-checking."""
def __init__(
self,
agents: List[Agent],
tasks: List[Task],
extra_constraints: Optional[List[Constraint]] = None,
):
self._agents = {a.name: a for a in agents}
self._tasks = {t.name: t for t in sorted(tasks, key=lambda t: -t.priority)}
self._constraints: List[Constraint] = [
skill_constraint(self._agents, self._tasks),
capacity_constraint(self._agents),
conflict_constraint(self._tasks),
]
if extra_constraints:
self._constraints.extend(extra_constraints)
self._trace: List[str] = []
def solve(self) -> Optional[Dict[str, str]]:
"""Return {task_name: agent_name} or None if unsatisfiable."""
task_names = list(self._tasks.keys())
agent_names = list(self._agents.keys())
assignment: Dict[str, str] = {}
if self._backtrack(assignment, task_names, agent_names, 0):
return assignment
return None
def _backtrack(
self,
assignment: Dict[str, str],
task_names: List[str],
agent_names: List[str],
idx: int,
) -> bool:
if idx == len(task_names):
return True
task_name = task_names[idx]
for agent_name in agent_names:
if self._consistent(assignment, task_name, agent_name):
assignment[task_name] = agent_name
self._trace.append(f"assign {task_name} -> {agent_name}")
if self._backtrack(assignment, task_names, agent_names, idx + 1):
return True
del assignment[task_name]
self._trace.append(f"backtrack {task_name} x {agent_name}")
return False
def _consistent(
self, assignment: Dict[str, str], task_name: str, agent_name: str
) -> bool:
return all(
c(assignment, task_name, agent_name) for c in self._constraints
)
def explain(self, assignment: Dict[str, str]) -> List[Assignment]:
"""Generate human-readable explanations for each assignment."""
results = []
for task_name, agent_name in assignment.items():
task = self._tasks[task_name]
agent = self._agents[agent_name]
overlap = task.required_skills & agent.skills
reason = f"skills={','.join(sorted(overlap))}" if overlap else "no-skill-required"
results.append(Assignment(task=task_name, agent=agent_name, reason=reason))
return results
@property
def trace(self) -> List[str]:
return list(self._trace)
# -- serialisation helpers -----------------------------------------------
@classmethod
def from_dicts(cls, data: Dict) -> "ConstraintPlanner":
agents = [
Agent(
name=a["name"],
skills=set(a.get("skills", [])),
max_concurrent=a.get("max_concurrent", 3),
)
for a in data.get("agents", [])
]
tasks = [
Task(
name=t["name"],
required_skills=set(t.get("required_skills", [])),
priority=t.get("priority", 0),
repo=t.get("repo", ""),
conflicts_with=set(t.get("conflicts_with", [])),
)
for t in data.get("tasks", [])
]
return cls(agents, tasks)
@classmethod
def from_file(cls, path: Path) -> "ConstraintPlanner":
text = path.read_text()
if path.suffix in (".yaml", ".yml"):
if yaml is None:
raise RuntimeError("PyYAML required for .yaml")
data = yaml.safe_load(text)
else:
data = json.loads(text)
return cls.from_dicts(data)
# ---------------------------------------------------------------------------
# Built-in fleet configuration
# ---------------------------------------------------------------------------
DEFAULT_FLEET = {
"agents": [
{"name": "timmy", "skills": ["ci", "security", "infra", "hotfix"], "max_concurrent": 4},
{"name": "gemini", "skills": ["architecture", "sovereignty", "review"], "max_concurrent": 3},
{"name": "allegro", "skills": ["review", "docs", "quality"], "max_concurrent": 2},
{"name": "claude", "skills": ["frontend", "3d", "volume"], "max_concurrent": 5},
{"name": "rockachopa", "skills": ["sovereignty", "design"], "max_concurrent": 2},
],
"tasks": [
{"name": "fix-ci-pipeline", "required_skills": ["ci"], "priority": 10},
{"name": "security-audit", "required_skills": ["security"], "priority": 9},
{"name": "nexus-ui-overhaul", "required_skills": ["frontend", "3d"], "priority": 5},
{"name": "sovereign-orchestrator", "required_skills": ["sovereignty", "architecture"], "priority": 8},
{"name": "review-pr-backlog", "required_skills": ["review"], "priority": 7},
{"name": "fleet-ops-hardening", "required_skills": ["infra"], "priority": 6},
{"name": "mnemosyne-persistence", "required_skills": ["frontend"], "priority": 4,
"conflicts_with": ["nexus-ui-overhaul"]},
],
}
# ---------------------------------------------------------------------------
# Self-test
# ---------------------------------------------------------------------------
def _self_test() -> bool:
planner = ConstraintPlanner.from_dicts(DEFAULT_FLEET)
result = planner.solve()
assert result is not None, "expected a solution"
# Verify skill constraints
for task_name, agent_name in result.items():
task = planner._tasks[task_name]
agent = planner._agents[agent_name]
if task.required_skills:
assert task.required_skills & agent.skills, (
f"{agent_name} lacks skills for {task_name}"
)
# Verify capacity constraints
from collections import Counter
counts = Counter(result.values())
for agent_name, count in counts.items():
agent = planner._agents[agent_name]
assert count <= agent.max_concurrent, (
f"{agent_name} overloaded: {count} > {agent.max_concurrent}"
)
# Verify conflict constraints
for task_name, agent_name in result.items():
task = planner._tasks[task_name]
for conflict in task.conflicts_with:
if conflict in result:
assert result[conflict] != agent_name, (
f"{task_name} and {conflict} both assigned to {agent_name}"
)
# Verify unsatisfiable detection
impossible = {
"agents": [{"name": "solo", "skills": ["a"], "max_concurrent": 1}],
"tasks": [
{"name": "t1", "required_skills": ["a"], "priority": 1},
{"name": "t2", "required_skills": ["a"], "priority": 1},
],
}
p2 = ConstraintPlanner.from_dicts(impossible)
assert p2.solve() is None, "expected None for unsatisfiable problem"
print("All self-test scenarios passed.")
explanations = planner.explain(result)
for e in explanations:
print(f" {e.task} -> {e.agent} ({e.reason})")
return True
# ---------------------------------------------------------------------------
# CLI
# ---------------------------------------------------------------------------
def main():
ap = argparse.ArgumentParser(description=__doc__)
ap.add_argument("--tasks", type=Path, help="YAML/JSON task+agent file")
ap.add_argument("--self-test", action="store_true")
ap.add_argument("--json", action="store_true")
args = ap.parse_args()
if args.self_test:
sys.exit(0 if _self_test() else 1)
if args.tasks:
planner = ConstraintPlanner.from_file(args.tasks)
else:
planner = ConstraintPlanner.from_dicts(DEFAULT_FLEET)
result = planner.solve()
if result is None:
print("No valid assignment found. Constraints are unsatisfiable.")
sys.exit(1)
explanations = planner.explain(result)
if args.json:
print(json.dumps(
{"assignments": [{"task": e.task, "agent": e.agent, "reason": e.reason} for e in explanations],
"trace": planner.trace},
indent=2,
))
else:
print(f"Plan ({len(result)} assignments):")
for e in explanations:
print(f" {e.task} -> {e.agent} ({e.reason})")
if planner.trace:
print(f"\nSolver trace ({len(planner.trace)} steps):")
for line in planner.trace:
print(f" {line}")
if __name__ == "__main__":
main()
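
Although this compare removes the file, the listing above still documents the planner's Python API; a minimal usage sketch, assuming the module were importable as constraint_planner (names taken from the listing, import path assumed):

# Sketch only: reconstructed from the removed listing above.
from constraint_planner import ConstraintPlanner, DEFAULT_FLEET

planner = ConstraintPlanner.from_dicts(DEFAULT_FLEET)
result = planner.solve()              # {task_name: agent_name} or None
if result is None:
    raise SystemExit("constraints unsatisfiable")
for a in planner.explain(result):
    print(f"{a.task} -> {a.agent} ({a.reason})")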

scripts/knowledge_base.py (new file, 341 lines)

@@ -0,0 +1,341 @@
#!/usr/bin/env python3
"""knowledge_base.py - GOFAI symbolic knowledge base for the Timmy Foundation fleet.
A classical AI knowledge representation system: stores facts as ground atoms,
supports first-order-logic-style queries, and maintains a provenance chain so
every belief can be traced back to its source. No neural nets, no embeddings -
just structured symbolic reasoning over a typed fact store.
Usage:
kb = KnowledgeBase()
kb.assert_fact('agent', 'online', 'timmy')
kb.assert_fact('task', 'assigned_to', 'task-42', 'timmy')
results = kb.query('task', 'assigned_to', '?x', 'timmy')
# results -> [{'?x': 'task-42'}]
CLI:
python knowledge_base.py --assert "agent online hermes"
python knowledge_base.py --query "agent online ?who"
python knowledge_base.py --dump
"""
from __future__ import annotations
import argparse
import json
import sys
import time
from dataclasses import dataclass, field
from pathlib import Path
from typing import Dict, Iterator, List, Optional, Tuple
# ---------------------------------------------------------------------------
# Data model
# ---------------------------------------------------------------------------
VAR_PREFIX = "?"
def is_var(term: str) -> bool:
"""Return True if *term* is a logic variable (starts with '?')."""
return term.startswith(VAR_PREFIX)
@dataclass(frozen=True)
class Fact:
"""An immutable ground atom: (relation, *args)."""
relation: str
args: Tuple[str, ...]
source: str = "user"
timestamp: float = field(default_factory=time.time)
def __str__(self) -> str:
args_str = " ".join(self.args)
return f"({self.relation} {args_str})"
Bindings = Dict[str, str]
# ---------------------------------------------------------------------------
# Unification
# ---------------------------------------------------------------------------
def unify_term(pattern: str, value: str, bindings: Bindings) -> Optional[Bindings]:
"""Unify a single pattern term against a ground value.
Returns updated bindings on success, or None on failure.
"""
if is_var(pattern):
if pattern in bindings:
return bindings if bindings[pattern] == value else None
return {**bindings, pattern: value}
return bindings if pattern == value else None
def unify_fact(
pattern: Tuple[str, ...], fact_args: Tuple[str, ...], bindings: Bindings
) -> Optional[Bindings]:
"""Unify a full argument tuple, returning final bindings or None."""
if len(pattern) != len(fact_args):
return None
b = bindings
for p, v in zip(pattern, fact_args):
b = unify_term(p, v, b)
if b is None:
return None
return b
# ---------------------------------------------------------------------------
# Knowledge Base
# ---------------------------------------------------------------------------
class KnowledgeBase:
"""In-memory symbolic knowledge base with optional JSON persistence."""
def __init__(self, persist_path: Optional[Path] = None) -> None:
self._facts: List[Fact] = []
self._persist_path = persist_path
if persist_path and persist_path.exists():
self._load(persist_path)
# ------------------------------------------------------------------
# Fact management
# ------------------------------------------------------------------
def assert_fact(
self, relation: str, *args: str, source: str = "user"
) -> Fact:
"""Add a ground fact to the knowledge base.
Idempotent: duplicate (relation, args) pairs are not added twice.
"""
f = Fact(relation=relation, args=tuple(args), source=source, timestamp=time.time())
for existing in self._facts:
if existing.relation == f.relation and existing.args == f.args:
return existing # already known
self._facts.append(f)
if self._persist_path:
self._save(self._persist_path)
return f
def retract_fact(self, relation: str, *args: str) -> int:
"""Remove all facts matching (relation, *args). Returns count removed."""
before = len(self._facts)
self._facts = [
f
for f in self._facts
if not (f.relation == relation and f.args == tuple(args))
]
removed = before - len(self._facts)
if removed and self._persist_path:
self._save(self._persist_path)
return removed
# ------------------------------------------------------------------
# Query
# ------------------------------------------------------------------
def query(
self, relation: str, *pattern_args: str, source_filter: Optional[str] = None
) -> List[Bindings]:
"""Return all binding dictionaries satisfying the query pattern.
Variables in *pattern_args* start with '?'. Ground terms must match
exactly. An empty binding dict means the fact matched with no
variables to bind.
Args:
relation: The relation name to match.
*pattern_args: Mixed ground/variable argument tuple.
source_filter: Optional provenance filter (e.g. 'scheduler').
Returns:
List of binding dicts, one per matching fact.
"""
results: List[Bindings] = []
for fact in self._facts:
if fact.relation != relation:
continue
if source_filter and fact.source != source_filter:
continue
b = unify_fact(tuple(pattern_args), fact.args, {})
if b is not None:
results.append(b)
return results
def query_one(
self, relation: str, *pattern_args: str
) -> Optional[Bindings]:
"""Return the first matching binding dict or None."""
for b in self.query(relation, *pattern_args):
return b
return None
def facts_for(self, relation: str) -> Iterator[Fact]:
"""Iterate over all facts with the given relation."""
for f in self._facts:
if f.relation == relation:
yield f
# ------------------------------------------------------------------
# Bulk operations
# ------------------------------------------------------------------
def all_facts(self) -> List[Fact]:
"""Return a snapshot of all stored facts."""
return list(self._facts)
def fact_count(self) -> int:
return len(self._facts)
def clear(self) -> None:
"""Remove all facts from memory (does not touch disk)."""
self._facts.clear()
# ------------------------------------------------------------------
# Persistence
# ------------------------------------------------------------------
def _save(self, path: Path) -> None:
records = [
{
"relation": f.relation,
"args": list(f.args),
"source": f.source,
"timestamp": f.timestamp,
}
for f in self._facts
]
path.write_text(json.dumps(records, indent=2))
def _load(self, path: Path) -> None:
try:
records = json.loads(path.read_text())
for r in records:
self._facts.append(
Fact(
relation=r["relation"],
args=tuple(r["args"]),
source=r.get("source", "persisted"),
timestamp=r.get("timestamp", 0.0),
)
)
except (json.JSONDecodeError, KeyError) as exc:
print(f"[kb] Warning: could not load {path}: {exc}", file=sys.stderr)
def save_to(self, path: Path) -> None:
"""Explicitly save to a given path."""
self._save(path)
# ------------------------------------------------------------------
# Debug / display
# ------------------------------------------------------------------
def dump(self, relation_filter: Optional[str] = None) -> None:
"""Print all (or filtered) facts to stdout."""
for f in self._facts:
if relation_filter and f.relation != relation_filter:
continue
print(f" {f} [source={f.source}]")
# ---------------------------------------------------------------------------
# CLI
# ---------------------------------------------------------------------------
def _parse_terms(raw: str) -> List[str]:
"""Split a whitespace-separated string into terms."""
return raw.strip().split()
def main() -> None:
parser = argparse.ArgumentParser(
description="GOFAI symbolic knowledge base CLI"
)
parser.add_argument(
"--db",
default="kb.json",
help="Path to persistent JSON store (default: kb.json)",
)
parser.add_argument(
"--assert",
dest="assert_stmt",
metavar="RELATION ARG...",
help='Assert a fact, e.g. --assert "agent online timmy"',
)
parser.add_argument(
"--retract",
dest="retract_stmt",
metavar="RELATION ARG...",
help='Retract a fact, e.g. --retract "agent online timmy"',
)
parser.add_argument(
"--query",
dest="query_stmt",
metavar="RELATION ARG...",
help='Query the KB, e.g. --query "agent online ?who"',
)
parser.add_argument(
"--dump",
action="store_true",
help="Dump all facts",
)
parser.add_argument(
"--relation",
help="Filter --dump to a specific relation",
)
args = parser.parse_args()
db_path = Path(args.db)
kb = KnowledgeBase(persist_path=db_path)
if args.assert_stmt:
terms = _parse_terms(args.assert_stmt)
if len(terms) < 2:
print("ERROR: --assert requires at least RELATION and one ARG", file=sys.stderr)
sys.exit(1)
fact = kb.assert_fact(terms[0], *terms[1:], source="cli")
print(f"Asserted: {fact}")
if args.retract_stmt:
terms = _parse_terms(args.retract_stmt)
if len(terms) < 2:
print("ERROR: --retract requires at least RELATION and one ARG", file=sys.stderr)
sys.exit(1)
n = kb.retract_fact(terms[0], *terms[1:])
print(f"Retracted {n} fact(s).")
if args.query_stmt:
terms = _parse_terms(args.query_stmt)
if len(terms) < 2:
print("ERROR: --query requires at least RELATION and one ARG", file=sys.stderr)
sys.exit(1)
results = kb.query(terms[0], *terms[1:])
if not results:
print("No results.")
else:
for i, b in enumerate(results, 1):
if b:
bindings_str = ", ".join(f"{k}={v}" for k, v in b.items())
print(f" [{i}] {bindings_str}")
else:
print(f" [{i}] (ground match)")
if args.dump:
count = kb.fact_count()
print(f"Knowledge Base — {count} fact(s):")
kb.dump(relation_filter=args.relation)
if not any([args.assert_stmt, args.retract_stmt, args.query_stmt, args.dump]):
parser.print_help()
if __name__ == "__main__":
main()
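
A minimal sketch of the API the docstring describes, assuming knowledge_base.py is importable; terms beginning with '?' are variables, and each query returns one bindings dict per matching fact:

# Sketch only: import path assumed.
from knowledge_base import KnowledgeBase

kb = KnowledgeBase()  # in-memory; pass persist_path=Path("kb.json") to persist
kb.assert_fact("agent", "online", "timmy", source="demo")
kb.assert_fact("task", "assigned_to", "task-42", "timmy")

print(kb.query("task", "assigned_to", "?t", "timmy"))  # [{'?t': 'task-42'}]
print(kb.query_one("agent", "online", "timmy"))        # {} -> ground match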

scripts/self_healing.py (modified)

@@ -14,6 +14,7 @@ import subprocess
 import argparse
 import requests
 import datetime
+from typing import Sequence
 
 # --- CONFIGURATION ---
 FLEET = {
@@ -192,10 +193,10 @@ EXAMPLES:
     """
     print(help_text)
 
-def main():
+def build_parser() -> argparse.ArgumentParser:
     parser = argparse.ArgumentParser(
         description="Self-healing infrastructure script (safe-by-default).",
-        add_help=False  # We'll handle --help ourselves
+        add_help=False,
     )
     parser.add_argument("--dry-run", action="store_true", default=False,
                         help="Run in dry-run mode (default behavior).")
@@ -209,25 +210,28 @@ def main():
                         help="Show detailed help about safety features.")
     parser.add_argument("--help", "-h", action="store_true", default=False,
                         help="Show standard help.")
+    return parser
 
-    args = parser.parse_args()
+def main(argv: Sequence[str] | None = None):
+    parser = build_parser()
+    args = parser.parse_args(list(argv) if argv is not None else None)
 
     if args.help_safe:
         print_help_safe()
-        sys.exit(0)
+        raise SystemExit(0)
 
     if args.help:
         parser.print_help()
-        sys.exit(0)
+        raise SystemExit(0)
 
     # Determine mode: if --execute is given, disable dry-run
     dry_run = not args.execute
     # If --dry-run is explicitly given, ensure dry-run (redundant but clear)
     if args.dry_run:
         dry_run = True
 
     healer = SelfHealer(dry_run=dry_run, confirm_kill=args.confirm_kill, yes=args.yes)
     healer.run()
 
 if __name__ == "__main__":
     main()
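
The build_parser()/main(argv) split shown in this hunk is what the new tests at the end of this compare rely on; a minimal sketch of the pattern, assuming scripts/self_healing.py is importable:

# Sketch only: exercising the entry point without touching sys.argv.
import self_healing as sh

args = sh.build_parser().parse_args(["--execute", "--yes"])
assert args.execute and args.yes

try:
    sh.main(["--help"])      # prints help, then raises SystemExit(0)
except SystemExit as exc:
    assert exc.code == 0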

scripts/strips_planner.py (new file, 304 lines)

@@ -0,0 +1,304 @@
#!/usr/bin/env python3
"""strips_planner.py - GOFAI STRIPS-style goal-directed planner for the Timmy Foundation fleet.
Implements a classical STRIPS planner over an action representation where
each action has preconditions, an add-list, and a delete-list. The planner
runs iterative-deepening forward search (progression) from the initial state
to find a linear action sequence that achieves all goal conditions.
No ML, no embeddings - just symbolic state-space search.
Representation:
State: frozenset of ground literals, e.g. {'agent_idle', 'task_queued'}
Action: (name, preconditions, add_effects, delete_effects)
Goal: set of literals that must hold in the final state
Algorithm:
Iterative-deepening DFS (IDDFS) over the forward (progression) search space.
Cycle detection via visited-state set per path.
Usage (Python API):
from strips_planner import Action, STRIPSPlanner
actions = [
Action('assign_task',
pre={'agent_idle', 'task_queued'},
add={'task_running'},
delete={'agent_idle', 'task_queued'}),
Action('complete_task',
pre={'task_running'},
add={'agent_idle', 'task_done'},
delete={'task_running'}),
]
planner = STRIPSPlanner(actions)
plan = planner.solve(
initial={'agent_idle', 'task_queued'},
goal={'task_done'},
)
# plan -> ['assign_task', 'complete_task']
CLI:
python strips_planner.py --demo
python strips_planner.py --max-depth 15
"""
from __future__ import annotations
import argparse
from dataclasses import dataclass
from typing import FrozenSet, List, Optional, Set, Tuple
# ---------------------------------------------------------------------------
# Data model
# ---------------------------------------------------------------------------
Literal = str
State = FrozenSet[Literal]
@dataclass(frozen=True)
class Action:
"""A STRIPS operator with preconditions and add/delete effects."""
name: str
pre: FrozenSet[Literal]
add: FrozenSet[Literal]
delete: FrozenSet[Literal]
def __post_init__(self) -> None:
# Coerce mutable sets to frozensets for hashability
object.__setattr__(self, 'pre', frozenset(self.pre))
object.__setattr__(self, 'add', frozenset(self.add))
object.__setattr__(self, 'delete', frozenset(self.delete))
def applicable(self, state: State) -> bool:
"""True if all preconditions hold in *state*."""
return self.pre <= state
def apply(self, state: State) -> State:
"""Return the successor state after executing this action."""
return (state - self.delete) | self.add
def __str__(self) -> str:
return self.name
# ---------------------------------------------------------------------------
# Planner
# ---------------------------------------------------------------------------
class STRIPSPlanner:
"""Goal-directed STRIPS planner using iterative-deepening DFS.
Searches forward from the initial state, abandoning a branch once the
remaining depth budget is exhausted.
"""
def __init__(self, actions: List[Action]) -> None:
self.actions = list(actions)
# ------------------------------------------------------------------
# Public API
# ------------------------------------------------------------------
def solve(
self,
initial: Set[Literal] | FrozenSet[Literal],
goal: Set[Literal] | FrozenSet[Literal],
max_depth: int = 20,
) -> Optional[List[str]]:
"""Find a plan that achieves *goal* from *initial*.
Args:
initial: Initial world state (set of ground literals).
goal: Goal conditions (set of ground literals to achieve).
max_depth: Maximum plan length to consider.
Returns:
Ordered list of action names, or None if no plan found.
"""
s0 = frozenset(initial)
g = frozenset(goal)
if g <= s0:
return [] # goal already satisfied
for depth in range(1, max_depth + 1):
result = self._dfs(s0, g, depth, [], {s0})
if result is not None:
return result
return None
# ------------------------------------------------------------------
# Internal search
# ------------------------------------------------------------------
def _dfs(
self,
state: State,
goal: State,
remaining: int,
path: List[str],
visited: Set[State],
) -> Optional[List[str]]:
"""Depth-limited forward DFS."""
if remaining == 0:
return None
for action in self.actions:
if not action.applicable(state):
continue
next_state = action.apply(state)
if next_state in visited:
continue
new_path = path + [action.name]
if goal <= next_state:
return new_path
visited.add(next_state)
result = self._dfs(next_state, goal, remaining - 1, new_path, visited)
visited.discard(next_state)
if result is not None:
return result
return None
# ------------------------------------------------------------------
# Helpers
# ------------------------------------------------------------------
def explain_plan(
self, initial: Set[Literal], plan: List[str]
) -> List[Tuple[str, State]]:
"""Trace each action with the resulting state for debugging.
Returns:
List of (action_name, resulting_state) tuples.
"""
state: State = frozenset(initial)
trace: List[Tuple[str, State]] = []
action_map = {a.name: a for a in self.actions}
for name in plan:
action = action_map[name]
state = action.apply(state)
trace.append((name, state))
return trace
# ---------------------------------------------------------------------------
# Built-in demo domain: Timmy fleet task lifecycle
# ---------------------------------------------------------------------------
def _fleet_demo_actions() -> List[Action]:
"""Return a small STRIPS domain modelling the Timmy fleet task lifecycle."""
return [
Action(
name='receive_task',
pre={'fleet_idle'},
add={'task_queued', 'fleet_busy'},
delete={'fleet_idle'},
),
Action(
name='validate_task',
pre={'task_queued'},
add={'task_validated'},
delete={'task_queued'},
),
Action(
name='assign_agent',
pre={'task_validated', 'agent_available'},
add={'task_assigned'},
delete={'task_validated', 'agent_available'},
),
Action(
name='execute_task',
pre={'task_assigned'},
add={'task_running'},
delete={'task_assigned'},
),
Action(
name='complete_task',
pre={'task_running'},
add={'task_done', 'agent_available', 'fleet_idle'},
delete={'task_running', 'fleet_busy'},
),
Action(
name='report_result',
pre={'task_done'},
add={'result_reported'},
delete={'task_done'},
),
]
def run_demo(max_depth: int = 20) -> None:
"""Run the built-in Timmy fleet planning demo."""
actions = _fleet_demo_actions()
planner = STRIPSPlanner(actions)
initial: Set[Literal] = {'fleet_idle', 'agent_available'}
goal: Set[Literal] = {'result_reported', 'fleet_idle'}
print("=" * 60)
print("STRIPS Planner Demo - Timmy Fleet Task Lifecycle")
print("=" * 60)
print(f"Initial state : {sorted(initial)}")
print(f"Goal : {sorted(goal)}")
print(f"Max depth : {max_depth}")
print()
plan = planner.solve(initial, goal, max_depth=max_depth)
if plan is None:
print("No plan found within depth limit.")
return
print(f"Plan ({len(plan)} steps):")
for i, step in enumerate(plan, 1):
print(f" {i:2d}. {step}")
print()
print("Execution trace:")
    trace = planner.explain_plan(initial, plan)
    for name, resulting_state in trace:
        print(f"  -> {name}")
        print(f"     state: {sorted(resulting_state)}")
    print()
    final_state = trace[-1][1] if trace else frozenset(initial)
    print(f"Goal satisfied: {frozenset(goal) <= final_state}")
# ---------------------------------------------------------------------------
# CLI
# ---------------------------------------------------------------------------
def main() -> None:
parser = argparse.ArgumentParser(
description="GOFAI STRIPS-style goal-directed planner"
)
parser.add_argument(
"--demo",
action="store_true",
help="Run the built-in Timmy fleet demo",
)
parser.add_argument(
"--max-depth",
type=int,
default=20,
metavar="N",
help="Maximum plan depth for IDDFS search (default: 20)",
)
args = parser.parse_args()
if args.demo or not any(vars(args).values()):
run_demo(max_depth=args.max_depth)
else:
parser.print_help()
if __name__ == "__main__":
main()
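
Complementing the docstring's happy-path example, a short sketch of the failure path, assuming strips_planner is importable: a goal literal that no action can produce exhausts the IDDFS depth budget, and solve() returns None rather than raising:

# Sketch only: import path assumed.
from strips_planner import Action, STRIPSPlanner

actions = [
    Action("step", pre=frozenset({"a"}), add=frozenset({"b"}), delete=frozenset({"a"})),
]
planner = STRIPSPlanner(actions)

assert planner.solve(initial={"a"}, goal={"b"}) == ["step"]
assert planner.solve(initial={"a"}, goal={"c"}, max_depth=5) is None  # 'c' is never produced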

scripts/symbolic_reasoner.py (new file, 276 lines)

@@ -0,0 +1,276 @@
#!/usr/bin/env python3
"""symbolic_reasoner.py — Forward-chaining rule engine for the Timmy Foundation fleet.
A classical GOFAI approach: declarative IF-THEN rules evaluated over a
working-memory of facts. Rules fire until quiescence (no new facts) or
a configurable cycle limit. Designed to sit *beside* the LLM layer so
that hard policy constraints never depend on probabilistic inference.
Usage:
python symbolic_reasoner.py --rules rules.yaml --facts facts.yaml
python symbolic_reasoner.py --self-test
"""
from __future__ import annotations
import argparse
import json
import sys
from dataclasses import dataclass
from pathlib import Path
from typing import Dict, FrozenSet, List, Optional, Set, Tuple
try:
import yaml
except ImportError:
yaml = None # graceful fallback — JSON-only mode
# ---------------------------------------------------------------------------
# Domain types
# ---------------------------------------------------------------------------
Fact = Tuple[str, ...] # e.g. ("agent", "timmy", "role", "infrastructure")
@dataclass(frozen=True)
class Rule:
"""A single IF-THEN production rule."""
name: str
conditions: FrozenSet[Fact] # all must be present
negations: FrozenSet[Fact] # none may be present
conclusions: FrozenSet[Fact] # added when rule fires
priority: int = 0 # higher fires first
def satisfied(self, wm: Set[Fact]) -> bool:
return self.conditions.issubset(wm) and self.negations.isdisjoint(wm)
# ---------------------------------------------------------------------------
# Engine
# ---------------------------------------------------------------------------
class SymbolicReasoner:
"""Forward-chaining production system."""
def __init__(self, rules: List[Rule], *, cycle_limit: int = 200):
self._rules = sorted(rules, key=lambda r: -r.priority)
self._cycle_limit = cycle_limit
self._trace: List[str] = []
# -- public API ---------------------------------------------------------
def infer(self, initial_facts: Set[Fact]) -> Set[Fact]:
"""Run to quiescence and return the final working-memory."""
wm = set(initial_facts)
fired: Set[str] = set()
for cycle in range(self._cycle_limit):
progress = False
for rule in self._rules:
if rule.name in fired:
continue
if rule.satisfied(wm):
new = rule.conclusions - wm
if new:
wm |= new
fired.add(rule.name)
self._trace.append(
f"cycle {cycle}: {rule.name} => {_fmt_facts(new)}"
)
progress = True
break # restart from highest-priority rule
if not progress:
break
return wm
def query(self, wm: Set[Fact], pattern: Tuple[Optional[str], ...]) -> List[Fact]:
"""Return facts matching *pattern* (None = wildcard)."""
return [
f for f in wm
if len(f) == len(pattern)
and all(p is None or p == v for p, v in zip(pattern, f))
]
@property
def trace(self) -> List[str]:
return list(self._trace)
# -- serialisation helpers -----------------------------------------------
@classmethod
def from_dicts(cls, raw_rules: List[Dict], **kw) -> "SymbolicReasoner":
rules = [_parse_rule(r) for r in raw_rules]
return cls(rules, **kw)
@classmethod
def from_file(cls, path: Path, **kw) -> "SymbolicReasoner":
text = path.read_text()
if path.suffix in (".yaml", ".yml"):
if yaml is None:
raise RuntimeError("PyYAML required for .yaml rules")
data = yaml.safe_load(text)
else:
data = json.loads(text)
return cls.from_dicts(data["rules"], **kw)
# ---------------------------------------------------------------------------
# Parsing helpers
# ---------------------------------------------------------------------------
def _parse_fact(raw: list | str) -> Fact:
if isinstance(raw, str):
return tuple(raw.split())
return tuple(str(x) for x in raw)
def _parse_rule(d: Dict) -> Rule:
return Rule(
name=d["name"],
conditions=frozenset(_parse_fact(c) for c in d.get("if", [])),
negations=frozenset(_parse_fact(c) for c in d.get("unless", [])),
conclusions=frozenset(_parse_fact(c) for c in d.get("then", [])),
priority=d.get("priority", 0),
)
def _fmt_facts(facts: Set[Fact]) -> str:
return ", ".join(" ".join(f) for f in sorted(facts))
# ---------------------------------------------------------------------------
# Built-in fleet rules (loaded when no --rules file is given)
# ---------------------------------------------------------------------------
DEFAULT_FLEET_RULES: List[Dict] = [
{
"name": "route-ci-to-timmy",
"if": [["task", "category", "ci"]],
"then": [["assign", "timmy"], ["reason", "timmy", "best-ci-merge-rate"]],
"priority": 10,
},
{
"name": "route-security-to-timmy",
"if": [["task", "category", "security"]],
"then": [["assign", "timmy"], ["reason", "timmy", "security-specialist"]],
"priority": 10,
},
{
"name": "route-architecture-to-gemini",
"if": [["task", "category", "architecture"]],
"unless": [["assign", "timmy"]],
"then": [["assign", "gemini"], ["reason", "gemini", "architecture-strength"]],
"priority": 8,
},
{
"name": "route-review-to-allegro",
"if": [["task", "category", "review"]],
"then": [["assign", "allegro"], ["reason", "allegro", "highest-quality-per-pr"]],
"priority": 9,
},
{
"name": "route-frontend-to-claude",
"if": [["task", "category", "frontend"]],
"unless": [["task", "repo", "fleet-ops"]],
"then": [["assign", "claude"], ["reason", "claude", "high-volume-frontend"]],
"priority": 5,
},
{
"name": "block-merge-without-review",
"if": [["pr", "status", "open"], ["pr", "reviews", "0"]],
"then": [["pr", "action", "block-merge"], ["reason", "policy", "no-unreviewed-merges"]],
"priority": 20,
},
{
"name": "block-merge-ci-failing",
"if": [["pr", "status", "open"], ["pr", "ci", "failing"]],
"then": [["pr", "action", "block-merge"], ["reason", "policy", "ci-must-pass"]],
"priority": 20,
},
{
"name": "auto-label-hotfix",
"if": [["pr", "title-prefix", "hotfix"]],
"then": [["pr", "label", "hotfix"], ["pr", "priority", "urgent"]],
"priority": 15,
},
]
# ---------------------------------------------------------------------------
# Self-test
# ---------------------------------------------------------------------------
def _self_test() -> bool:
"""Verify core behaviour; returns True on success."""
engine = SymbolicReasoner.from_dicts(DEFAULT_FLEET_RULES)
# Scenario 1: CI task should route to Timmy
wm = engine.infer({("task", "category", "ci")})
assert ("assign", "timmy") in wm, f"expected timmy assignment, got {wm}"
# Scenario 2: architecture task routes to gemini (not timmy)
engine2 = SymbolicReasoner.from_dicts(DEFAULT_FLEET_RULES)
wm2 = engine2.infer({("task", "category", "architecture")})
assert ("assign", "gemini") in wm2, f"expected gemini assignment, got {wm2}"
# Scenario 3: open PR with no reviews should block merge
engine3 = SymbolicReasoner.from_dicts(DEFAULT_FLEET_RULES)
wm3 = engine3.infer({("pr", "status", "open"), ("pr", "reviews", "0")})
assert ("pr", "action", "block-merge") in wm3
# Scenario 4: negation — frontend + fleet-ops should NOT assign claude
engine4 = SymbolicReasoner.from_dicts(DEFAULT_FLEET_RULES)
wm4 = engine4.infer({("task", "category", "frontend"), ("task", "repo", "fleet-ops")})
assert ("assign", "claude") not in wm4
# Scenario 5: query with wildcards
results = engine.query(wm, ("reason", None, None))
assert len(results) > 0
print("All 5 self-test scenarios passed.")
for line in engine.trace:
print(f" {line}")
return True
# ---------------------------------------------------------------------------
# CLI
# ---------------------------------------------------------------------------
def main():
ap = argparse.ArgumentParser(description=__doc__)
ap.add_argument("--rules", type=Path, help="YAML/JSON rule file")
ap.add_argument("--facts", type=Path, help="YAML/JSON initial facts")
ap.add_argument("--self-test", action="store_true")
ap.add_argument("--json", action="store_true", help="output as JSON")
args = ap.parse_args()
if args.self_test:
ok = _self_test()
sys.exit(0 if ok else 1)
if args.rules:
engine = SymbolicReasoner.from_file(args.rules)
else:
engine = SymbolicReasoner.from_dicts(DEFAULT_FLEET_RULES)
if args.facts:
text = args.facts.read_text()
        if args.facts.suffix in (".yaml", ".yml"):
            if yaml is None:
                raise RuntimeError("PyYAML required for .yaml facts")
            raw = yaml.safe_load(text)
else:
raw = json.loads(text)
initial = {_parse_fact(f) for f in raw.get("facts", [])}
else:
initial = set()
print("No --facts provided; running with empty working memory.")
wm = engine.infer(initial)
if args.json:
print(json.dumps({"facts": [list(f) for f in sorted(wm)], "trace": engine.trace}, indent=2))
else:
print(f"Final working memory ({len(wm)} facts):")
for f in sorted(wm):
print(f" {' '.join(f)}")
if engine.trace:
print(f"\nInference trace ({len(engine.trace)} firings):")
for line in engine.trace:
print(f" {line}")
if __name__ == "__main__":
main()
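
A short sketch of layering a custom rule on top of the built-in fleet rules, assuming the module is importable; rules are the same dict shape as DEFAULT_FLEET_RULES, and the higher-priority escalation fires once auto-label-hotfix has asserted its conclusions:

# Sketch only: import path assumed; the escalation rule is hypothetical.
from symbolic_reasoner import SymbolicReasoner, DEFAULT_FLEET_RULES

rules = DEFAULT_FLEET_RULES + [{
    "name": "escalate-urgent-hotfix",
    "if": [["pr", "label", "hotfix"], ["pr", "priority", "urgent"]],
    "then": [["notify", "oncall"]],
    "priority": 25,
}]
engine = SymbolicReasoner.from_dicts(rules)
wm = engine.infer({("pr", "title-prefix", "hotfix")})
assert ("notify", "oncall") in wm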

scripts/temporal_reasoner.py (new file, 307 lines)

@@ -0,0 +1,307 @@
#!/usr/bin/env python3
"""temporal_reasoner.py - GOFAI temporal reasoning engine for the Timmy Foundation fleet.
A symbolic temporal constraint network (TCN) for scheduling and ordering events.
Models Allen's interval algebra relations (before, after, meets, overlaps, etc.)
and propagates temporal constraints via path-consistency to detect conflicts.
No ML, no embeddings - just constraint propagation over a temporal graph.
Core concepts:
TimePoint: A named instant on a symbolic timeline.
Interval: A pair of time-points (start, end) with start < end.
Constraint: A relation between two time-points or intervals
(e.g. A.before(B), A.meets(B)).
Usage (Python API):
from temporal_reasoner import TemporalNetwork, Interval
tn = TemporalNetwork()
deploy = tn.add_interval('deploy', duration=(10, 30))
test = tn.add_interval('test', duration=(5, 15))
tn.add_constraint(deploy, 'before', test)
consistent = tn.propagate()
CLI:
python temporal_reasoner.py --demo
"""
from __future__ import annotations
import argparse
from dataclasses import dataclass, field
from enum import Enum
from typing import Dict, List, Optional, Tuple
INF = float('inf')
# ---------------------------------------------------------------------------
# Data model
# ---------------------------------------------------------------------------
@dataclass(frozen=True)
class TimePoint:
"""A named instant on the timeline."""
name: str
id: int = field(default=0)
def __str__(self) -> str:
return self.name
@dataclass
class Interval:
"""A named interval bounded by two time-points."""
name: str
start: int # index into the distance matrix
end: int # index into the distance matrix
def __str__(self) -> str:
return self.name
class Relation(Enum):
"""Allen's interval algebra relations (simplified subset)."""
BEFORE = 'before'
AFTER = 'after'
MEETS = 'meets'
MET_BY = 'met_by'
OVERLAPS = 'overlaps'
DURING = 'during'
EQUALS = 'equals'
# ---------------------------------------------------------------------------
# Simple Temporal Network (STN) via distance matrix
# ---------------------------------------------------------------------------
class TemporalNetwork:
"""Simple Temporal Network with Floyd-Warshall propagation.
Internally maintains a distance matrix D where D[i][j] is the
maximum allowed distance from time-point i to time-point j.
Negative cycles indicate inconsistency.
"""
def __init__(self) -> None:
self._n = 0
self._names: List[str] = []
self._dist: List[List[float]] = []
self._intervals: Dict[str, Interval] = {}
self._origin_idx: int = -1
self._add_point('origin')
self._origin_idx = 0
# ------------------------------------------------------------------
# Point management
# ------------------------------------------------------------------
def _add_point(self, name: str) -> int:
"""Add a time-point and return its index."""
idx = self._n
self._n += 1
self._names.append(name)
# Extend distance matrix
for row in self._dist:
row.append(INF)
self._dist.append([INF] * self._n)
self._dist[idx][idx] = 0.0
return idx
# ------------------------------------------------------------------
# Interval management
# ------------------------------------------------------------------
def add_interval(
self,
name: str,
duration: Optional[Tuple[float, float]] = None,
) -> Interval:
"""Add a named interval with optional duration bounds [lo, hi].
Returns the Interval object with start/end indices.
"""
s = self._add_point(f"{name}.start")
e = self._add_point(f"{name}.end")
# start < end (at least 1 time unit)
self._dist[s][e] = min(self._dist[s][e], duration[1] if duration else INF)
self._dist[e][s] = min(self._dist[e][s], -(duration[0] if duration else 1))
interval = Interval(name=name, start=s, end=e)
self._intervals[name] = interval
return interval
# ------------------------------------------------------------------
# Constraint management
# ------------------------------------------------------------------
def add_distance_constraint(
self, i: int, j: int, lo: float, hi: float
) -> None:
"""Add constraint: lo <= t_j - t_i <= hi."""
self._dist[i][j] = min(self._dist[i][j], hi)
self._dist[j][i] = min(self._dist[j][i], -lo)
def add_constraint(
self, a: Interval, relation: str, b: Interval, gap: Tuple[float, float] = (0, INF)
) -> None:
"""Add an Allen-style relation between two intervals.
Supported relations: before, after, meets, met_by, equals.
"""
rel = relation.lower()
if rel == 'before':
# a.end + gap <= b.start
self.add_distance_constraint(a.end, b.start, gap[0], gap[1])
elif rel == 'after':
self.add_distance_constraint(b.end, a.start, gap[0], gap[1])
elif rel == 'meets':
# a.end == b.start
self.add_distance_constraint(a.end, b.start, 0, 0)
elif rel == 'met_by':
self.add_distance_constraint(b.end, a.start, 0, 0)
elif rel == 'equals':
self.add_distance_constraint(a.start, b.start, 0, 0)
self.add_distance_constraint(a.end, b.end, 0, 0)
else:
raise ValueError(f"Unsupported relation: {relation}")
# ------------------------------------------------------------------
# Propagation (Floyd-Warshall)
# ------------------------------------------------------------------
def propagate(self) -> bool:
"""Run Floyd-Warshall to propagate all constraints.
Returns True if the network is consistent (no negative cycles).
"""
n = self._n
d = self._dist
for k in range(n):
for i in range(n):
for j in range(n):
if d[i][k] + d[k][j] < d[i][j]:
d[i][j] = d[i][k] + d[k][j]
# Check for negative cycles
for i in range(n):
if d[i][i] < 0:
return False
return True
def is_consistent(self) -> bool:
"""Check consistency without mutating (copies matrix first)."""
import copy
saved = copy.deepcopy(self._dist)
result = self.propagate()
self._dist = saved
return result
# ------------------------------------------------------------------
# Query
# ------------------------------------------------------------------
def earliest(self, point_idx: int) -> float:
"""Earliest possible time for a point (relative to origin)."""
return -self._dist[point_idx][self._origin_idx]
def latest(self, point_idx: int) -> float:
"""Latest possible time for a point (relative to origin)."""
return self._dist[self._origin_idx][point_idx]
def interval_bounds(self, interval: Interval) -> Dict[str, Tuple[float, float]]:
"""Return earliest/latest start and end for an interval."""
return {
'start': (self.earliest(interval.start), self.latest(interval.start)),
'end': (self.earliest(interval.end), self.latest(interval.end)),
}
# ------------------------------------------------------------------
# Display
# ------------------------------------------------------------------
def dump(self) -> None:
"""Print the current distance matrix and interval bounds."""
print(f"Temporal Network — {self._n} time-points, {len(self._intervals)} intervals")
print()
for name, interval in self._intervals.items():
bounds = self.interval_bounds(interval)
s_lo, s_hi = bounds['start']
e_lo, e_hi = bounds['end']
print(f" {name}:")
print(f" start: [{s_lo:.1f}, {s_hi:.1f}]")
print(f" end: [{e_lo:.1f}, {e_hi:.1f}]")
# ---------------------------------------------------------------------------
# Demo: Timmy fleet deployment pipeline
# ---------------------------------------------------------------------------
def run_demo() -> None:
"""Run a demo temporal reasoning scenario for the Timmy fleet."""
print("=" * 60)
print("Temporal Reasoner Demo - Fleet Deployment Pipeline")
print("=" * 60)
print()
tn = TemporalNetwork()
# Define pipeline stages with duration bounds [min, max]
build = tn.add_interval('build', duration=(5, 15))
test = tn.add_interval('test', duration=(10, 30))
review = tn.add_interval('review', duration=(2, 10))
deploy = tn.add_interval('deploy', duration=(1, 5))
monitor = tn.add_interval('monitor', duration=(20, 60))
# Temporal constraints
tn.add_constraint(build, 'meets', test) # test starts when build ends
tn.add_constraint(test, 'before', review, gap=(0, 5)) # review within 5 of test
tn.add_constraint(review, 'meets', deploy) # deploy immediately after review
tn.add_constraint(deploy, 'before', monitor, gap=(0, 2)) # monitor within 2 of deploy
# Global deadline: everything done within 120 time units
tn.add_distance_constraint(tn._origin_idx, monitor.end, 0, 120)
# Build must start within first 10 units
tn.add_distance_constraint(tn._origin_idx, build.start, 0, 10)
print("Constraints added. Propagating...")
consistent = tn.propagate()
print(f"Network consistent: {consistent}")
print()
if consistent:
tn.dump()
print()
# Now add a conflicting constraint to show inconsistency detection
print("--- Adding conflicting constraint: monitor.before(build) ---")
tn.add_constraint(monitor, 'before', build)
consistent2 = tn.propagate()
print(f"Network consistent after conflict: {consistent2}")
# ---------------------------------------------------------------------------
# CLI
# ---------------------------------------------------------------------------
def main() -> None:
parser = argparse.ArgumentParser(
description="GOFAI temporal reasoning engine"
)
parser.add_argument(
"--demo",
action="store_true",
help="Run the fleet deployment pipeline demo",
)
args = parser.parse_args()
if args.demo or not any(vars(args).values()):
run_demo()
else:
parser.print_help()
if __name__ == "__main__":
main()
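
A minimal sketch of querying derived bounds after propagation, assuming temporal_reasoner is importable; earliest/latest times are relative to the implicit origin point (the demo above pins points the same way via tn._origin_idx):

# Sketch only: import path assumed.
from temporal_reasoner import TemporalNetwork

tn = TemporalNetwork()
build = tn.add_interval("build", duration=(5, 15))
test = tn.add_interval("test", duration=(10, 30))
tn.add_constraint(build, "meets", test)                        # test starts when build ends
tn.add_distance_constraint(tn._origin_idx, build.start, 0, 0)  # pin build.start to t=0

assert tn.propagate()
print(tn.interval_bounds(test))  # start in [5, 15], end in [15, 45]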

tests for scripts/self_healing.py (new file, 62 lines)

@@ -0,0 +1,62 @@
"""Tests for scripts/self_healing.py safe CLI behavior."""
from __future__ import annotations
import importlib.util
from pathlib import Path
from unittest.mock import MagicMock
import pytest
REPO_ROOT = Path(__file__).parent.parent
spec = importlib.util.spec_from_file_location("self_healing", REPO_ROOT / "scripts" / "self_healing.py")
sh = importlib.util.module_from_spec(spec)
spec.loader.exec_module(sh)
class TestMainCli:
def test_help_exits_without_running_healer(self, monkeypatch, capsys):
healer_cls = MagicMock()
monkeypatch.setattr(sh, "SelfHealer", healer_cls)
with pytest.raises(SystemExit) as excinfo:
sh.main(["--help"])
assert excinfo.value.code == 0
healer_cls.assert_not_called()
out = capsys.readouterr().out
assert "--execute" in out
assert "--help-safe" in out
def test_help_safe_exits_without_running_healer(self, monkeypatch, capsys):
healer_cls = MagicMock()
monkeypatch.setattr(sh, "SelfHealer", healer_cls)
with pytest.raises(SystemExit) as excinfo:
sh.main(["--help-safe"])
assert excinfo.value.code == 0
healer_cls.assert_not_called()
out = capsys.readouterr().out
assert "DRY-RUN" in out
assert "--confirm-kill" in out
def test_default_invocation_runs_in_dry_run_mode(self, monkeypatch):
healer = MagicMock()
healer_cls = MagicMock(return_value=healer)
monkeypatch.setattr(sh, "SelfHealer", healer_cls)
sh.main([])
healer_cls.assert_called_once_with(dry_run=True, confirm_kill=False, yes=False)
healer.run.assert_called_once_with()
def test_execute_flag_disables_dry_run(self, monkeypatch):
healer = MagicMock()
healer_cls = MagicMock(return_value=healer)
monkeypatch.setattr(sh, "SelfHealer", healer_cls)
sh.main(["--execute", "--yes", "--confirm-kill"])
healer_cls.assert_called_once_with(dry_run=False, confirm_kill=True, yes=True)
healer.run.assert_called_once_with()