Compare commits

...

10 Commits

Author SHA1 Message Date
Alexander Whitestone
341abab2a0 feat: Priority Rebalancer — re-score issues from pipeline data (#174)
Monthly pipeline tool that:
- Reads knowledge store, metrics, and staleness data
- Scores all open issues across the org
- Suggests priority upgrades/downgrades based on accumulated signals
- Generates JSON + markdown reports
- Optional --apply mode to push changes via Gitea API

Signals detected:
- Stale/missing knowledge entries
- Empty knowledge store
- Missing metrics output
- Low repo coverage
- Issue age, activity, assignment status
- Keyword/label analysis

Usage:
  python3 scripts/priority_rebalancer.py --org Timmy_Foundation
  python3 scripts/priority_rebalancer.py --org Timmy_Foundation --apply
  python3 scripts/priority_rebalancer.py --org Timmy_Foundation --json

23 tests, all passing.
2026-04-15 10:52:51 -04:00
e6f1b07f16 Merge pull request 'feat: Knowledge store staleness detector (closes #179)' (#185) from feat/179-staleness-check into main 2026-04-15 06:09:14 +00:00
81c02f6709 feat: Add staleness detector tests (closes #179) 2026-04-15 04:00:46 +00:00
c2c3c6a3b9 feat: Add knowledge staleness detector (closes #179) 2026-04-15 04:00:12 +00:00
8d716ff03f Add comprehensive test script for harvest prompt validation 2026-04-14 19:02:41 +00:00
920510996e Add test session 5: Session with questions 2026-04-14 19:01:03 +00:00
1fafeaf5a4 Add test session 4: Session with patterns 2026-04-14 19:01:00 +00:00
36b440f998 Add test session 3: Partial session with tool quirks 2026-04-14 19:00:58 +00:00
9f3caabf42 Add test session 2: Failed session with pitfalls 2026-04-14 19:00:56 +00:00
a21f3a44e1 Add test session 1: Successful session 2026-04-14 18:58:05 +00:00
10 changed files with 1526 additions and 0 deletions
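
For a sense of the output shape, one suggestion entry in metrics/priority_suggestions.md looks roughly like the following (issue number and scores invented for illustration; the format follows generate_markdown_report in the script below):

- **[compounding-intelligence#57]** Fix stale knowledge entries
  P2 → P1 (score: 27.4)
  Matches signal: 12/40 facts are stale (source files changed); Unassigned — needs triage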

View File

@@ -0,0 +1,131 @@
#!/usr/bin/env python3
"""
Knowledge Store Staleness Detector — Detect stale knowledge entries by comparing source file hashes.

Usage:
    python3 scripts/knowledge_staleness_check.py --index knowledge/index.json
    python3 scripts/knowledge_staleness_check.py --index knowledge/index.json --json
    python3 scripts/knowledge_staleness_check.py --index knowledge/index.json --fix
"""
import argparse
import hashlib
import json
import os
import sys
from datetime import datetime, timezone
from pathlib import Path
from typing import Dict, List, Any, Optional


def compute_file_hash(filepath: str) -> Optional[str]:
    """Compute SHA-256 hash of a file. Returns None if file doesn't exist."""
    try:
        with open(filepath, "rb") as f:
            return "sha256:" + hashlib.sha256(f.read()).hexdigest()
    except (FileNotFoundError, IsADirectoryError, PermissionError):
        return None


def check_staleness(index_path: str, repo_root: str = ".") -> List[Dict[str, Any]]:
    """Check all entries in knowledge index for staleness.

    Returns list of entries with staleness info:
      - status: "fresh" | "stale" | "missing_source" | "no_hash" | "no_source"
      - current_hash: computed hash (if source exists)
      - stored_hash: hash from index
    """
    with open(index_path) as f:
        data = json.load(f)
    facts = data.get("facts", [])
    results = []
    for entry in facts:
        source_file = entry.get("source_file")
        stored_hash = entry.get("source_hash")
        if not source_file:
            results.append({**entry, "status": "no_source", "current_hash": None})
            continue
        full_path = os.path.join(repo_root, source_file)
        current_hash = compute_file_hash(full_path)
        if current_hash is None:
            results.append({**entry, "status": "missing_source", "current_hash": None})
        elif not stored_hash:
            results.append({**entry, "status": "no_hash", "current_hash": current_hash})
        elif current_hash != stored_hash:
            results.append({**entry, "status": "stale", "current_hash": current_hash})
        else:
            results.append({**entry, "status": "fresh", "current_hash": current_hash})
    return results


def fix_hashes(index_path: str, repo_root: str = ".") -> int:
    """Add hashes to entries missing them. Returns count of fixed entries."""
    with open(index_path) as f:
        data = json.load(f)
    fixed = 0
    for entry in data.get("facts", []):
        if entry.get("source_hash"):
            continue
        source_file = entry.get("source_file")
        if not source_file:
            continue
        full_path = os.path.join(repo_root, source_file)
        h = compute_file_hash(full_path)
        if h:
            entry["source_hash"] = h
            fixed += 1
    with open(index_path, "w") as f:
        json.dump(data, f, indent=2)
    return fixed


def main():
    parser = argparse.ArgumentParser(description="Check knowledge store staleness")
    parser.add_argument("--index", required=True, help="Path to knowledge/index.json")
    parser.add_argument("--repo", default=".", help="Repo root for source file resolution")
    parser.add_argument("--json", action="store_true", help="Output as JSON")
    parser.add_argument("--fix", action="store_true", help="Add hashes to entries missing them")
    args = parser.parse_args()
    if args.fix:
        fixed = fix_hashes(args.index, args.repo)
        print(f"Fixed {fixed} entries with missing hashes.")
        return
    results = check_staleness(args.index, args.repo)
    if args.json:
        print(json.dumps(results, indent=2))
    else:
        stale = [r for r in results if r["status"] != "fresh"]
        fresh = [r for r in results if r["status"] == "fresh"]
        print("Knowledge Store Staleness Check")
        print(f"  Total entries: {len(results)}")
        print(f"  Fresh: {len(fresh)}")
        print(f"  Stale/Issues: {len(stale)}")
        print()
        if stale:
            print("Issues found:")
            for r in stale:
                status = r["status"]
                fact = r.get("fact", "?")[:60]
                source = r.get("source_file", "?")
                print(f"  [{status}] {source}: {fact}")
        else:
            print("All entries are fresh!")


if __name__ == "__main__":
    main()
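
For context, the index file the checker consumes is a JSON object with a top-level "facts" list, where each entry names its source file and the hash recorded at harvest time. A minimal hypothetical index (values invented for illustration):

{
  "facts": [
    {
      "fact": "Harvester emits one JSON object per session line",
      "source_file": "scripts/harvester.py",
      "source_hash": "sha256:<hex digest of scripts/harvester.py at harvest time>"
    }
  ]
}

Against this index the entry reports fresh while the stored digest still matches, and stale once scripts/harvester.py changes.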

View File

@@ -0,0 +1,682 @@
#!/usr/bin/env python3
"""
Priority Rebalancer — Re-evaluate issue priorities based on accumulated data.

Reads pipeline outputs, knowledge store, and Gitea issues to suggest
priority changes based on what the fleet has learned.

Usage:
    python3 scripts/priority_rebalancer.py --org Timmy_Foundation
    python3 scripts/priority_rebalancer.py --org Timmy_Foundation --repo compounding-intelligence
    python3 scripts/priority_rebalancer.py --org Timmy_Foundation --dry-run
    python3 scripts/priority_rebalancer.py --org Timmy_Foundation --apply

Output:
    metrics/priority_report.json — full analysis
    metrics/priority_suggestions.md — human-readable suggestions
"""
import argparse
import json
import os
import sys
from datetime import datetime, timezone, timedelta
from pathlib import Path
from typing import Dict, List, Any, Optional, Tuple
from dataclasses import dataclass, field, asdict
from collections import Counter, defaultdict
import urllib.request
import urllib.error

# ============================================================
# Data Models
# ============================================================

@dataclass
class IssueScore:
    issue_id: int
    repo: str
    title: str
    current_labels: List[str]
    current_priority: Optional[str]
    suggested_priority: Optional[str]
    score: float
    reasons: List[str]
    age_days: int
    comment_count: int
    assignee: Optional[str]
    dependencies: List[str] = field(default_factory=list)
    blocking: List[str] = field(default_factory=list)


@dataclass
class PipelineSignal:
    source: str        # "knowledge", "metrics", "sessions", "staleness"
    signal_type: str   # "stale_knowledge", "high_error_rate", "missing_coverage", etc.
    weight: float      # 0.0 - 1.0
    detail: str
    affected_repos: List[str] = field(default_factory=list)
    affected_issues: List[int] = field(default_factory=list)

# ============================================================
# Gitea API Client
# ============================================================

class GiteaClient:
    def __init__(self, base_url: str, token: str):
        self.base_url = base_url.rstrip("/")
        self.token = token

    def _request(self, path: str, params: Dict = None) -> Any:
        url = f"{self.base_url}/api/v1{path}"
        if params:
            qs = "&".join(f"{k}={v}" for k, v in params.items() if v is not None)
            url += f"?{qs}"
        req = urllib.request.Request(url)
        req.add_header("Authorization", f"token {self.token}")
        req.add_header("Content-Type", "application/json")
        try:
            with urllib.request.urlopen(req, timeout=30) as resp:
                return json.loads(resp.read().decode())
        except urllib.error.HTTPError as e:
            print(f"API error {e.code} for {path}: {e.read().decode()[:200]}", file=sys.stderr)
            return None

    def get_org_repos(self, org: str) -> List[Dict]:
        repos = []
        page = 1
        while True:
            batch = self._request(f"/orgs/{org}/repos", {"limit": 50, "page": page})
            if not batch:
                break
            repos.extend(batch)
            if len(batch) < 50:
                break
            page += 1
        return repos

    def get_issues(self, org: str, repo: str, state: str = "open") -> List[Dict]:
        issues = []
        page = 1
        while True:
            batch = self._request(f"/repos/{org}/{repo}/issues",
                                  {"state": state, "limit": 50, "page": page, "type": "issues"})
            if not batch:
                break
            issues.extend(batch)
            if len(batch) < 50:
                break
            page += 1
        return issues

    def add_label_to_issue(self, org: str, repo: str, issue_num: int, label_ids: List[int]) -> bool:
        url = f"{self.base_url}/api/v1/repos/{org}/{repo}/issues/{issue_num}/labels"
        data = json.dumps({"labels": label_ids}).encode()
        req = urllib.request.Request(url, data=data, method="POST")
        req.add_header("Authorization", f"token {self.token}")
        req.add_header("Content-Type", "application/json")
        try:
            with urllib.request.urlopen(req, timeout=15) as resp:
                return resp.status == 200
        except Exception:
            return False

    def remove_label_from_issue(self, org: str, repo: str, issue_num: int, label_id: int) -> bool:
        url = f"{self.base_url}/api/v1/repos/{org}/{repo}/issues/{issue_num}/labels/{label_id}"
        req = urllib.request.Request(url, method="DELETE")
        req.add_header("Authorization", f"token {self.token}")
        try:
            with urllib.request.urlopen(req, timeout=15) as resp:
                return resp.status == 200
        except Exception:
            return False

    def get_repo_labels(self, org: str, repo: str) -> List[Dict]:
        labels = []
        page = 1
        while True:
            batch = self._request(f"/repos/{org}/{repo}/labels", {"limit": 50, "page": page})
            if not batch:
                break
            labels.extend(batch)
            if len(batch) < 50:
                break
            page += 1
        return labels

    def add_comment(self, org: str, repo: str, issue_num: int, body: str) -> bool:
        url = f"{self.base_url}/api/v1/repos/{org}/{repo}/issues/{issue_num}/comments"
        data = json.dumps({"body": body}).encode()
        req = urllib.request.Request(url, data=data, method="POST")
        req.add_header("Authorization", f"token {self.token}")
        req.add_header("Content-Type", "application/json")
        try:
            with urllib.request.urlopen(req, timeout=15) as resp:
                return resp.status == 201
        except Exception:
            return False
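
# Usage sketch (illustrative only; the token value is a placeholder). The
# client paginates in batches of 50 and returns parsed JSON, so callers can
# iterate directly over the results:
#
#   client = GiteaClient("https://forge.alexanderwhitestone.com", "<GITEA_TOKEN>")
#   for issue in client.get_issues("Timmy_Foundation", "compounding-intelligence"):
#       print(issue["number"], issue["title"])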

# ============================================================
# Pipeline Signal Collectors
# ============================================================

def collect_knowledge_signals(knowledge_dir: str) -> List[PipelineSignal]:
    """Analyze knowledge store for coverage gaps and staleness."""
    signals = []
    index_path = os.path.join(knowledge_dir, "index.json")
    if not os.path.exists(index_path):
        signals.append(PipelineSignal(
            source="knowledge",
            signal_type="missing_index",
            weight=0.8,
            detail="knowledge/index.json not found — no knowledge base exists"
        ))
        return signals
    try:
        with open(index_path) as f:
            data = json.load(f)
    except (json.JSONDecodeError, IOError) as e:
        signals.append(PipelineSignal(
            source="knowledge",
            signal_type="corrupt_index",
            weight=0.9,
            detail=f"knowledge/index.json is corrupt: {e}"
        ))
        return signals
    facts = data.get("facts", [])
    total = len(facts)
    if total == 0:
        signals.append(PipelineSignal(
            source="knowledge",
            signal_type="empty_knowledge",
            weight=0.7,
            detail="Knowledge store has 0 facts — harvester not running or not finding sessions"
        ))
        return signals
    # Check staleness
    stale_count = 0
    missing_source = 0
    for fact in facts:
        status = fact.get("status", "unknown")
        if status == "stale":
            stale_count += 1
        elif status in ("missing_source", "no_source"):
            missing_source += 1
    if stale_count > 0:
        signals.append(PipelineSignal(
            source="knowledge",
            signal_type="stale_knowledge",
            weight=min(1.0, stale_count / max(1, total)),
            detail=f"{stale_count}/{total} facts are stale (source files changed)"
        ))
    if missing_source > 0:
        signals.append(PipelineSignal(
            source="knowledge",
            signal_type="missing_sources",
            weight=min(1.0, missing_source / max(1, total)),
            detail=f"{missing_source}/{total} facts have missing source files"
        ))
    # Coverage by repo
    repo_counts = Counter(f.get("repo", "unknown") for f in facts)
    if len(repo_counts) < 3:
        signals.append(PipelineSignal(
            source="knowledge",
            signal_type="low_coverage",
            weight=0.5,
            detail=f"Knowledge covers only {len(repo_counts)} repos — expand harvester scope",
            affected_repos=list(repo_counts.keys())
        ))
    return signals


def collect_staleness_signals(scripts_dir: str, knowledge_dir: str) -> List[PipelineSignal]:
    """Run staleness checker if available."""
    signals = []
    checker = os.path.join(scripts_dir, "knowledge_staleness_check.py")
    index_path = os.path.join(knowledge_dir, "index.json")
    if not os.path.exists(checker) or not os.path.exists(index_path):
        return signals
    try:
        import subprocess
        result = subprocess.run(
            ["python3", checker, "--index", index_path, "--json"],
            capture_output=True, text=True, timeout=30
        )
        if result.returncode == 0:
            # check_staleness --json emits a list of entries, each with a "status" field
            entries = json.loads(result.stdout)
            total = len(entries)
            stale = sum(1 for e in entries if e.get("status") != "fresh")
            if stale > 0:
                signals.append(PipelineSignal(
                    source="staleness",
                    signal_type="stale_knowledge",
                    weight=min(1.0, stale / max(1, total)),
                    detail=f"Staleness checker found {stale}/{total} stale entries"
                ))
    except Exception:
        pass
    return signals


def collect_metrics_signals(metrics_dir: str) -> List[PipelineSignal]:
    """Analyze metrics directory for pipeline health."""
    signals = []
    if not os.path.isdir(metrics_dir):
        return signals
    files = os.listdir(metrics_dir)
    if len(files) <= 1:  # Only .gitkeep
        signals.append(PipelineSignal(
            source="metrics",
            signal_type="no_metrics",
            weight=0.6,
            detail="Metrics directory is empty — measurer pipeline not producing output"
        ))
    return signals

# ============================================================
# Priority Scoring Engine
# ============================================================

PRIORITY_LEVELS = ["P0", "P1", "P2", "P3", "none"]
PRIORITY_LABELS = {"p0": "P0", "p1": "P1", "p2": "P2", "p3": "P3",
                   "priority:critical": "P0", "priority:high": "P1",
                   "priority:medium": "P2", "priority:low": "P3"}


def extract_priority(labels: List[str]) -> Optional[str]:
    """Extract priority level from issue labels."""
    for label in labels:
        lower = label.lower().strip()
        if lower in PRIORITY_LABELS:
            return PRIORITY_LABELS[lower]
    return None


def compute_issue_score(
    issue: Dict,
    repo: str,
    signals: List[PipelineSignal],
    now: datetime
) -> IssueScore:
    """Compute priority score for a single issue."""
    labels = [l.get("name", "") if isinstance(l, dict) else l for l in issue.get("labels", [])]
    title = issue.get("title", "")
    issue_id = issue.get("number", 0)
    current_priority = extract_priority(labels)
    # Parse dates
    created_str = issue.get("created_at", "")
    try:
        created = datetime.fromisoformat(created_str.replace("Z", "+00:00"))
        age_days = (now - created.replace(tzinfo=None)).days
    except (ValueError, AttributeError):
        age_days = 0
    comment_count = issue.get("comments", 0)
    assignee = None
    assignees = issue.get("assignees") or []
    if assignees:
        assignee = assignees[0].get("login") if isinstance(assignees[0], dict) else str(assignees[0])
    # Base score
    score = 0.0
    reasons = []
    # Age factor: older issues drift down unless actively discussed
    if age_days > 90 and comment_count < 2:
        score -= 15
        reasons.append(f"Dormant: {age_days} days old with only {comment_count} comments")
    elif age_days > 30:
        score -= 5
        reasons.append(f"Aging: {age_days} days old")
    # Activity factor: recent discussion suggests urgency
    if comment_count > 5:
        score += 10
        reasons.append(f"Active discussion: {comment_count} comments")
    elif comment_count > 0:
        score += 3
    # Assignment factor: unassigned issues need triage
    if not assignee:
        score += 5
        reasons.append("Unassigned — needs triage")
    # Pipeline signal alignment
    for signal in signals:
        title_lower = title.lower()
        if signal.signal_type == "stale_knowledge" and "stale" in title_lower:
            score += signal.weight * 20
            reasons.append(f"Matches signal: {signal.detail}")
        elif signal.signal_type == "empty_knowledge" and ("harvester" in title_lower or "knowledge" in title_lower):
            score += signal.weight * 25
            reasons.append(f"Critical gap: {signal.detail}")
        elif signal.signal_type == "no_metrics" and "measur" in title_lower:
            score += signal.weight * 15
            reasons.append(f"Pipeline gap: {signal.detail}")
        elif signal.signal_type == "low_coverage" and any(r.lower() in title_lower for r in signal.affected_repos):
            score += signal.weight * 10
            reasons.append(f"Coverage gap: {signal.detail}")
    # Keyword boosts
    keyword_scores = {
        "broken": 20, "bug": 15, "fix": 10, "error": 12, "fail": 15,
        "security": 25, "auth": 20, "data loss": 30, "crash": 25,
        "blocker": 20, "urgent": 15, "critical": 15,
        "epic": 8, "feature": -3, "nice to have": -10, "someday": -15
    }
    title_lower = title.lower()
    for keyword, boost in keyword_scores.items():
        if keyword in title_lower:
            score += boost
            if boost > 0:
                reasons.append(f"Keyword match: '{keyword}' (+{boost})")
    # Label-based adjustments
    for label in labels:
        lower = label.lower()
        if lower == "pipeline":
            score += 5  # Pipeline issues are infrastructure
        elif lower == "bug":
            score += 12
        elif lower == "enhancement":
            score -= 2
        elif lower == "documentation":
            score -= 5
        elif "epic" in lower:
            score += 3
    # Determine suggested priority
    if score >= 40:
        suggested = "P0"
    elif score >= 25:
        suggested = "P1"
    elif score >= 10:
        suggested = "P2"
    elif score >= 0:
        suggested = "P3"
    else:
        suggested = None  # Consider closing or deprioritizing
    return IssueScore(
        issue_id=issue_id,
        repo=repo,
        title=title,
        current_labels=labels,
        current_priority=current_priority,
        suggested_priority=suggested,
        score=round(score, 1),
        reasons=reasons if reasons else ["No strong signals"],
        age_days=age_days,
        comment_count=comment_count,
        assignee=assignee
    )
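
# Worked example (hypothetical issue; weights taken from the tables above):
# a fresh, unassigned issue titled "Fix crash in harvester" with a "bug" label
# scores +10 ("fix") + 25 ("crash") + 12 (bug label) + 5 (unassigned) = 52,
# which clears the 40 threshold and is suggested P0. Without "crash" in the
# title the same issue lands at 27, i.e. P1.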

# ============================================================
# Report Generation
# ============================================================

def generate_report(
    scores: List[IssueScore],
    signals: List[PipelineSignal],
    org: str,
    repos_scanned: List[str]
) -> Dict[str, Any]:
    """Generate the full priority report."""
    now = datetime.now(timezone.utc).isoformat()
    # Categorize changes
    upgrades = [s for s in scores if s.suggested_priority and s.current_priority and
                PRIORITY_LEVELS.index(s.suggested_priority) < PRIORITY_LEVELS.index(s.current_priority)]
    downgrades = [s for s in scores if s.suggested_priority and s.current_priority and
                  PRIORITY_LEVELS.index(s.suggested_priority) > PRIORITY_LEVELS.index(s.current_priority)]
    new_assignments = [s for s in scores if s.suggested_priority and not s.current_priority]
    no_change = [s for s in scores if s.suggested_priority == s.current_priority]
    return {
        "generated_at": now,
        "org": org,
        "repos_scanned": repos_scanned,
        "total_issues": len(scores),
        "signals": [asdict(s) for s in signals],
        "summary": {
            "suggested_upgrades": len(upgrades),
            "suggested_downgrades": len(downgrades),
            "suggested_new_priorities": len(new_assignments),
            "unchanged": len(no_change)
        },
        "top_priority": [asdict(s) for s in sorted(scores, key=lambda x: x.score, reverse=True)[:10]],
        "upgrades": [asdict(s) for s in upgrades],
        "downgrades": [asdict(s) for s in downgrades],
        "new_assignments": [asdict(s) for s in new_assignments],
        "all_scores": [asdict(s) for s in sorted(scores, key=lambda x: x.score, reverse=True)]
    }


def generate_markdown_report(report: Dict[str, Any]) -> str:
    """Generate human-readable markdown report."""
    lines = []
    lines.append("# Priority Rebalancer Report")
    lines.append(f"**Generated:** {report['generated_at']}")
    lines.append(f"**Org:** {report['org']}")
    lines.append(f"**Repos scanned:** {', '.join(report['repos_scanned'])}")
    lines.append(f"**Issues analyzed:** {report['total_issues']}")
    lines.append("")
    # Signals
    if report["signals"]:
        lines.append("## Pipeline Signals")
        for sig in report["signals"]:
            weight_bar = "█" * int(sig["weight"] * 10) + "░" * (10 - int(sig["weight"] * 10))
            lines.append(f"- [{weight_bar}] **{sig['source']}/{sig['signal_type']}** — {sig['detail']}")
        lines.append("")
    # Summary
    s = report["summary"]
    lines.append("## Summary")
    lines.append(f"- Suggested upgrades: **{s['suggested_upgrades']}**")
    lines.append(f"- Suggested downgrades: **{s['suggested_downgrades']}**")
    lines.append(f"- New priority assignments: **{s['suggested_new_priorities']}**")
    lines.append(f"- Unchanged: **{s['unchanged']}**")
    lines.append("")
    # Top 10
    lines.append("## Top 10 by Score")
    for i, item in enumerate(report["top_priority"][:10], 1):
        cur = item["current_priority"] or "none"
        sug = item["suggested_priority"] or "none"
        arrow = "↑" if PRIORITY_LEVELS.index(sug) < PRIORITY_LEVELS.index(cur) else "↓" if PRIORITY_LEVELS.index(sug) > PRIORITY_LEVELS.index(cur) else "→"
        lines.append(f"{i}. **[{item['repo']}#{item['issue_id']}]** {item['title']}")
        lines.append(f"   Score: {item['score']} | Current: {cur} {arrow} Suggested: {sug}")
        lines.append(f"   Reasons: {'; '.join(item['reasons'][:3])}")
    lines.append("")
    # Upgrades
    if report["upgrades"]:
        lines.append("## Suggested Upgrades")
        for item in report["upgrades"]:
            lines.append(f"- **[{item['repo']}#{item['issue_id']}]** {item['title']}")
            lines.append(f"  {item['current_priority']} → {item['suggested_priority']} (score: {item['score']})")
            lines.append(f"  {'; '.join(item['reasons'][:2])}")
        lines.append("")
    # Downgrades
    if report["downgrades"]:
        lines.append("## Suggested Downgrades")
        for item in report["downgrades"]:
            lines.append(f"- **[{item['repo']}#{item['issue_id']}]** {item['title']}")
            lines.append(f"  {item['current_priority']} → {item['suggested_priority']} (score: {item['score']})")
            lines.append(f"  {'; '.join(item['reasons'][:2])}")
        lines.append("")
    # New assignments
    if report["new_assignments"]:
        lines.append("## New Priority Suggestions (currently unlabelled)")
        for item in report["new_assignments"][:20]:
            lines.append(f"- **[{item['repo']}#{item['issue_id']}]** {item['title']}")
            lines.append(f"  Suggested: {item['suggested_priority']} (score: {item['score']})")
            lines.append(f"  {'; '.join(item['reasons'][:2])}")
        lines.append("")
    return "\n".join(lines)

# ============================================================
# Main
# ============================================================

def main():
    parser = argparse.ArgumentParser(description="Priority Rebalancer — re-score issues based on pipeline data")
    parser.add_argument("--org", default="Timmy_Foundation", help="Gitea org name")
    parser.add_argument("--repo", help="Single repo to analyze (default: all)")
    parser.add_argument("--base-url", default="https://forge.alexanderwhitestone.com", help="Gitea base URL")
    parser.add_argument("--knowledge-dir", default=None, help="Path to knowledge directory")
    parser.add_argument("--metrics-dir", default=None, help="Path to metrics directory")
    parser.add_argument("--scripts-dir", default=None, help="Path to scripts directory")
    parser.add_argument("--output-dir", default=None, help="Path to output directory")
    parser.add_argument("--dry-run", action="store_true", help="Show what would change without applying")
    parser.add_argument("--apply", action="store_true", help="Apply priority changes via API")
    parser.add_argument("--json", action="store_true", help="Output JSON instead of markdown")
    args = parser.parse_args()
    # Resolve paths relative to script location
    script_dir = Path(__file__).parent
    repo_root = script_dir.parent
    knowledge_dir = args.knowledge_dir or str(repo_root / "knowledge")
    metrics_dir = args.metrics_dir or str(repo_root / "metrics")
    scripts_dir = args.scripts_dir or str(repo_root / "scripts")
    output_dir = args.output_dir or str(repo_root / "metrics")
    # Get token
    token = os.environ.get("GITEA_TOKEN")
    if not token:
        token_path = os.path.expanduser("~/.config/gitea/token")
        if os.path.exists(token_path):
            with open(token_path) as f:
                token = f.read().strip()
    if not token:
        print("Error: No Gitea token found. Set GITEA_TOKEN or create ~/.config/gitea/token", file=sys.stderr)
        sys.exit(1)
    client = GiteaClient(args.base_url, token)
    now = datetime.utcnow()
    # Collect pipeline signals
    print("Collecting pipeline signals...", file=sys.stderr)
    signals = []
    signals.extend(collect_knowledge_signals(knowledge_dir))
    signals.extend(collect_staleness_signals(scripts_dir, knowledge_dir))
    signals.extend(collect_metrics_signals(metrics_dir))
    print(f"  Found {len(signals)} signals", file=sys.stderr)
    # Get repos
    if args.repo:
        repos = [{"name": args.repo}]
    else:
        repos = client.get_org_repos(args.org)
    repo_names = [r["name"] for r in repos]
    print(f"Scanning {len(repo_names)} repos: {', '.join(repo_names[:5])}{'...' if len(repo_names) > 5 else ''}", file=sys.stderr)
    # Score all issues
    all_scores = []
    for repo in repos:
        repo_name = repo["name"]
        issues = client.get_issues(args.org, repo_name)
        print(f"  {repo_name}: {len(issues)} open issues", file=sys.stderr)
        for issue in issues:
            if issue.get("pull_request"):
                continue
            score = compute_issue_score(issue, repo_name, signals, now)
            all_scores.append(score)
    # Generate report
    report = generate_report(all_scores, signals, args.org, repo_names)
    # Output
    os.makedirs(output_dir, exist_ok=True)
    if args.json:
        print(json.dumps(report, indent=2))
    else:
        md = generate_markdown_report(report)
        print(md)
    # Write files
    report_path = os.path.join(output_dir, "priority_report.json")
    with open(report_path, "w") as f:
        json.dump(report, f, indent=2)
    print(f"\nFull report: {report_path}", file=sys.stderr)
    md_path = os.path.join(output_dir, "priority_suggestions.md")
    with open(md_path, "w") as f:
        f.write(generate_markdown_report(report))
    print(f"Suggestions: {md_path}", file=sys.stderr)
    # Apply changes if requested
    if args.apply:
        print("\nApplying priority changes...", file=sys.stderr)
        applied = 0
        # Get label IDs for priority labels
        priority_label_map = {}
        for repo_name in repo_names:
            labels = client.get_repo_labels(args.org, repo_name)
            for label in labels:
                name = label.get("name", "").lower()
                if name in ("p0", "p1", "p2", "p3"):
                    priority_label_map[(repo_name, name)] = label["id"]
        for score in all_scores:
            if score.suggested_priority and score.suggested_priority != score.current_priority:
                sug_lower = score.suggested_priority.lower()
                label_id = priority_label_map.get((score.repo, sug_lower))
                if label_id:
                    ok = client.add_label_to_issue(args.org, score.repo, score.issue_id, [label_id])
                    if ok:
                        applied += 1
                        print(f"  Applied: [{score.repo}#{score.issue_id}] → {score.suggested_priority}", file=sys.stderr)
                        # Add comment explaining the change
                        comment = f"**Priority Rebalancer** suggested: **{score.suggested_priority}** (was: {score.current_priority or 'none'})\n\n"
                        comment += f"Score: {score.score}\n"
                        comment += "Reasons:\n"
                        for r in score.reasons[:5]:
                            comment += f"- {r}\n"
                        client.add_comment(args.org, score.repo, score.issue_id, comment)
        print(f"Applied {applied} priority changes", file=sys.stderr)
    elif args.dry_run:
        print(f"\nDry run — {report['summary']['suggested_upgrades'] + report['summary']['suggested_downgrades'] + report['summary']['suggested_new_priorities']} changes would be applied", file=sys.stderr)


if __name__ == "__main__":
    main()

View File

@@ -0,0 +1,212 @@
#!/usr/bin/env python3
"""
Comprehensive test script for knowledge extraction prompt.
Validates prompt structure, requirements, and consistency.
"""
import json
import re
from datetime import datetime, timezone
from pathlib import Path


def test_prompt_structure():
    """Test that the prompt has the required structure."""
    prompt_path = Path("templates/harvest-prompt.md")
    if not prompt_path.exists():
        return False, "harvest-prompt.md not found"
    content = prompt_path.read_text()
    # Check for required sections
    required_sections = [
        "System Prompt",
        "Instructions",
        "Categories",
        "Output Format",
        "Confidence Scoring",
        "Constraints",
        "Example"
    ]
    for section in required_sections:
        if section.lower() not in content.lower():
            return False, f"Missing required section: {section}"
    # Check for required categories
    required_categories = ["fact", "pitfall", "pattern", "tool-quirk", "question"]
    for category in required_categories:
        if category not in content:
            return False, f"Missing required category: {category}"
    # Check for required output fields
    required_fields = ["fact", "category", "repo", "confidence"]
    for field in required_fields:
        if field not in content:
            return False, f"Missing required output field: {field}"
    # Check prompt size (should be ~1k tokens, roughly 4k chars)
    if len(content) > 5000:
        return False, f"Prompt too large: {len(content)} chars (max ~5000)"
    if len(content) < 1000:
        return False, f"Prompt too small: {len(content)} chars (min ~1000)"
    return True, "Prompt structure is valid"


def test_confidence_scoring():
    """Test that confidence scoring is properly defined."""
    prompt_path = Path("templates/harvest-prompt.md")
    content = prompt_path.read_text()
    # Check for confidence scale definitions
    confidence_levels = [
        ("0.9-1.0", "explicitly stated"),
        ("0.7-0.8", "clearly implied"),
        ("0.5-0.6", "suggested"),
        ("0.3-0.4", "inferred"),
        ("0.1-0.2", "speculative")
    ]
    for level, description in confidence_levels:
        if level not in content:
            return False, f"Missing confidence level: {level}"
        if description.lower() not in content.lower():
            return False, f"Missing confidence description: {description}"
    return True, "Confidence scoring is properly defined"


def test_example_quality():
    """Test that examples are clear and complete."""
    prompt_path = Path("templates/harvest-prompt.md")
    content = prompt_path.read_text()
    # Check for example input/output
    if "example" not in content.lower():
        return False, "No examples provided"
    # Check that example includes all categories
    example_section = content[content.lower().find("example"):]
    # Look for JSON example
    json_match = re.search(r'\{[\s\S]*"knowledge"[\s\S]*\}', example_section)
    if not json_match:
        return False, "No JSON example found"
    example_json = json_match.group(0)
    # Check for all categories in example
    for category in ["fact", "pitfall", "pattern", "tool-quirk", "question"]:
        if category not in example_json:
            return False, f"Example missing category: {category}"
    return True, "Examples are clear and complete"


def test_constraint_coverage():
    """Test that constraints cover all requirements."""
    prompt_path = Path("templates/harvest-prompt.md")
    content = prompt_path.read_text()
    required_constraints = [
        "No hallucination",
        "only extract",
        "explicitly",
        "partial",
        "failed sessions",
        "1k tokens"
    ]
    for constraint in required_constraints:
        if constraint.lower() not in content.lower():
            return False, f"Missing constraint: {constraint}"
    return True, "Constraints cover all requirements"


def test_test_sessions():
    """Test that test sessions exist and are valid."""
    test_sessions_dir = Path("test_sessions")
    if not test_sessions_dir.exists():
        return False, "test_sessions directory not found"
    session_files = list(test_sessions_dir.glob("*.jsonl"))
    if len(session_files) < 5:
        return False, f"Only {len(session_files)} test sessions found, need 5"
    # Check each session file
    for session_file in session_files:
        content = session_file.read_text()
        lines = content.strip().split("\n")
        # Check that each line is valid JSON
        for i, line in enumerate(lines, 1):
            try:
                json.loads(line)
            except json.JSONDecodeError as e:
                return False, f"Invalid JSON in {session_file.name}, line {i}: {e}"
    return True, f"Found {len(session_files)} valid test sessions"


def run_all_tests():
    """Run all tests and return results."""
    tests = [
        ("Prompt Structure", test_prompt_structure),
        ("Confidence Scoring", test_confidence_scoring),
        ("Example Quality", test_example_quality),
        ("Constraint Coverage", test_constraint_coverage),
        ("Test Sessions", test_test_sessions)
    ]
    results = []
    all_passed = True
    for test_name, test_func in tests:
        try:
            passed, message = test_func()
            results.append({
                "test": test_name,
                "passed": passed,
                "message": message
            })
            if not passed:
                all_passed = False
        except Exception as e:
            results.append({
                "test": test_name,
                "passed": False,
                "message": f"Error: {str(e)}"
            })
            all_passed = False
    # Print results
    print("=" * 60)
    print("HARVEST PROMPT TEST RESULTS")
    print("=" * 60)
    for result in results:
        status = "✓ PASS" if result["passed"] else "✗ FAIL"
        print(f"{status}: {result['test']}")
        print(f"  {result['message']}")
        print()
    print("=" * 60)
    if all_passed:
        print("ALL TESTS PASSED!")
    else:
        print("SOME TESTS FAILED!")
    print("=" * 60)
    return all_passed, results


if __name__ == "__main__":
    all_passed, results = run_all_tests()
    # Save results to file
    with open("test_results.json", "w") as f:
        json.dump({
            "all_passed": all_passed,
            "results": results,
            "timestamp": datetime.now(timezone.utc).isoformat()
        }, f, indent=2)
    print("Results saved to test_results.json")
    # Exit with appropriate code
    exit(0 if all_passed else 1)
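
The prompt file itself is not part of this diff; a skeleton that would satisfy these checks (section names, categories, and output fields are taken from the tests above, the wording is hypothetical) looks roughly like:

# Knowledge Extraction Prompt
## System Prompt
...
## Instructions
Only extract knowledge the transcript explicitly supports, including partial and failed sessions.
## Categories
fact, pitfall, pattern, tool-quirk, question
## Output Format
JSON object with a "knowledge" list; each item has fact, category, repo, confidence.
## Confidence Scoring
0.9-1.0 explicitly stated; 0.7-0.8 clearly implied; 0.5-0.6 suggested; 0.3-0.4 inferred; 0.1-0.2 speculative.
## Constraints
No hallucination; keep the prompt under ~1k tokens.
## Example
{ "knowledge": [ ... ] }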

View File

@@ -0,0 +1,129 @@
#!/usr/bin/env python3
"""Tests for scripts/knowledge_staleness_check.py — 8 tests."""
import json
import os
import sys
import tempfile

sys.path.insert(0, os.path.dirname(__file__) or ".")
import importlib.util

spec = importlib.util.spec_from_file_location("ks", os.path.join(os.path.dirname(__file__) or ".", "knowledge_staleness_check.py"))
mod = importlib.util.module_from_spec(spec)
spec.loader.exec_module(mod)
check_staleness = mod.check_staleness
fix_hashes = mod.fix_hashes
compute_file_hash = mod.compute_file_hash


def test_fresh_entry():
    with tempfile.TemporaryDirectory() as tmpdir:
        src = os.path.join(tmpdir, "source.py")
        with open(src, "w") as f:
            f.write("print('hello')")
        h = compute_file_hash(src)
        idx = os.path.join(tmpdir, "index.json")
        with open(idx, "w") as f:
            json.dump({"facts": [{"fact": "hello", "source_file": "source.py", "source_hash": h}]}, f)
        results = check_staleness(idx, tmpdir)
        assert results[0]["status"] == "fresh"
    print("PASS: test_fresh_entry")


def test_stale_entry():
    with tempfile.TemporaryDirectory() as tmpdir:
        src = os.path.join(tmpdir, "source.py")
        with open(src, "w") as f:
            f.write("original content")
        idx = os.path.join(tmpdir, "index.json")
        with open(idx, "w") as f:
            json.dump({"facts": [{"fact": "old", "source_file": "source.py", "source_hash": "sha256:wrong"}]}, f)
        # Now change the source
        with open(src, "w") as f:
            f.write("modified content")
        results = check_staleness(idx, tmpdir)
        assert results[0]["status"] == "stale"
    print("PASS: test_stale_entry")


def test_missing_source():
    with tempfile.TemporaryDirectory() as tmpdir:
        idx = os.path.join(tmpdir, "index.json")
        with open(idx, "w") as f:
            json.dump({"facts": [{"fact": "gone", "source_file": "nonexistent.py", "source_hash": "sha256:abc"}]}, f)
        results = check_staleness(idx, tmpdir)
        assert results[0]["status"] == "missing_source"
    print("PASS: test_missing_source")


def test_no_hash():
    with tempfile.TemporaryDirectory() as tmpdir:
        src = os.path.join(tmpdir, "source.py")
        with open(src, "w") as f:
            f.write("content")
        idx = os.path.join(tmpdir, "index.json")
        with open(idx, "w") as f:
            json.dump({"facts": [{"fact": "no hash", "source_file": "source.py"}]}, f)
        results = check_staleness(idx, tmpdir)
        assert results[0]["status"] == "no_hash"
        assert results[0]["current_hash"].startswith("sha256:")
    print("PASS: test_no_hash")


def test_no_source_field():
    with tempfile.TemporaryDirectory() as tmpdir:
        idx = os.path.join(tmpdir, "index.json")
        with open(idx, "w") as f:
            json.dump({"facts": [{"fact": "orphan"}]}, f)
        results = check_staleness(idx, tmpdir)
        assert results[0]["status"] == "no_source"
    print("PASS: test_no_source_field")


def test_fix_hashes():
    with tempfile.TemporaryDirectory() as tmpdir:
        src = os.path.join(tmpdir, "source.py")
        with open(src, "w") as f:
            f.write("content for hashing")
        idx = os.path.join(tmpdir, "index.json")
        with open(idx, "w") as f:
            json.dump({"facts": [{"fact": "needs hash", "source_file": "source.py"}]}, f)
        fixed = fix_hashes(idx, tmpdir)
        assert fixed == 1
        # Verify hash was added
        with open(idx) as f:
            data = json.load(f)
        assert data["facts"][0]["source_hash"].startswith("sha256:")
    print("PASS: test_fix_hashes")


def test_empty_index():
    with tempfile.TemporaryDirectory() as tmpdir:
        idx = os.path.join(tmpdir, "index.json")
        with open(idx, "w") as f:
            json.dump({"facts": []}, f)
        results = check_staleness(idx, tmpdir)
        assert results == []
    print("PASS: test_empty_index")


def test_compute_hash_nonexistent():
    h = compute_file_hash("/nonexistent/path/file.py")
    assert h is None
    print("PASS: test_compute_hash_nonexistent")


def run_all():
    test_fresh_entry()
    test_stale_entry()
    test_missing_source()
    test_no_hash()
    test_no_source_field()
    test_fix_hashes()
    test_empty_index()
    test_compute_hash_nonexistent()
    print("\nAll 8 tests passed!")


if __name__ == "__main__":
    run_all()

View File

@@ -0,0 +1,305 @@
#!/usr/bin/env python3
"""
Tests for Priority Rebalancer
"""
import json
import os
import sys
import tempfile
from datetime import datetime, timedelta
from pathlib import Path

# Add script dir to path
sys.path.insert(0, str(Path(__file__).parent))

from priority_rebalancer import (
    GiteaClient,
    IssueScore,
    PipelineSignal,
    compute_issue_score,
    collect_knowledge_signals,
    collect_metrics_signals,
    extract_priority,
    generate_report,
    generate_markdown_report,
    PRIORITY_LEVELS,
)

# ============================================================
# Test Helpers
# ============================================================

PASS = 0
FAIL = 0


def test(name):
    """Decorator that runs each test immediately at definition time and records the result."""
    def decorator(fn):
        global PASS, FAIL
        try:
            fn()
            PASS += 1
            print(f"  [PASS] {name}")
        except Exception as e:
            FAIL += 1
            print(f"  [FAIL] {name}: {e}")
    return decorator


def assert_eq(a, b, msg=""):
    if a != b:
        raise AssertionError(f"{msg} expected {b!r}, got {a!r}")


def assert_true(v, msg=""):
    if not v:
        raise AssertionError(msg or "Expected True")


def assert_false(v, msg=""):
    if v:
        raise AssertionError(msg or "Expected False")

# ============================================================
# Priority Extraction Tests
# ============================================================

print("=== Priority Rebalancer Tests ===\n")
print("-- Priority Extraction --")

@test("extract P0 from label")
def _():
    assert_eq(extract_priority(["P0", "bug"]), "P0")

@test("extract P1 from priority:high")
def _():
    assert_eq(extract_priority(["priority:high"]), "P1")

@test("extract P2 from priority:medium")
def _():
    assert_eq(extract_priority(["priority:medium"]), "P2")

@test("extract P3 from priority:low")
def _():
    assert_eq(extract_priority(["priority:low"]), "P3")

@test("returns None for no priority")
def _():
    assert_eq(extract_priority(["bug", "enhancement"]), None)

@test("case insensitive")
def _():
    assert_eq(extract_priority(["p1"]), "P1")
    assert_eq(extract_priority(["PRIORITY:CRITICAL"]), "P0")

# ============================================================
# Issue Scoring Tests
# ============================================================

print("\n-- Issue Scoring --")

def make_issue(**kwargs):
    defaults = {
        "number": 1,
        "title": "Test issue",
        "labels": [],
        "created_at": (datetime.utcnow() - timedelta(days=5)).isoformat() + "Z",
        "comments": 0,
        "assignees": None,
    }
    defaults.update(kwargs)
    return defaults

@test("bug gets score boost")
def _():
    issue = make_issue(title="Incorrect output format", labels=["bug"])
    score = compute_issue_score(issue, "test-repo", [], datetime.utcnow())
    assert_true(score.score > 0, f"Bug should boost score, got {score.score}")
    # Bug label alone should be P2 or P3 (not P0)
    assert_true(score.suggested_priority in ("P2", "P3"),
                f"Bug label alone should be P2/P3, got {score.suggested_priority}")

@test("security gets high score")
def _():
    issue = make_issue(title="Security: auth bypass", labels=["bug"])
    score = compute_issue_score(issue, "test-repo", [], datetime.utcnow())
    assert_true(score.score >= 25, f"Security should score high, got {score.score}")

@test("old dormant issue gets penalized")
def _():
    issue = make_issue(
        title="Some old feature",
        created_at=(datetime.utcnow() - timedelta(days=120)).isoformat() + "Z",
        comments=0
    )
    score = compute_issue_score(issue, "test-repo", [], datetime.utcnow())
    assert_true(score.score < 0, f"Old dormant should be negative, got {score.score}")
    assert_true(any("Dormant" in r for r in score.reasons), "Should mention dormancy")

@test("active discussion boosts score")
def _():
    issue = make_issue(title="Important fix", comments=8)
    score = compute_issue_score(issue, "test-repo", [], datetime.utcnow())
    assert_true(score.score > 5, f"Active discussion should boost, got {score.score}")
    assert_true(any("Active" in r for r in score.reasons))

@test("unassigned gets slight boost")
def _():
    issue = make_issue(title="Fix bug", assignees=None)
    score = compute_issue_score(issue, "test-repo", [], datetime.utcnow())
    assert_true(any("Unassigned" in r for r in score.reasons))

@test("assigned issue notes assignee")
def _():
    issue = make_issue(title="Fix bug", assignees=[{"login": "alice"}])
    score = compute_issue_score(issue, "test-repo", [], datetime.utcnow())
    assert_eq(score.assignee, "alice")

@test("nice-to-have gets penalized")
def _():
    issue = make_issue(title="Nice to have: fancy animation")
    score = compute_issue_score(issue, "test-repo", [], datetime.utcnow())
    assert_true(score.score < 0, f"Nice-to-have should be negative, got {score.score}")

# ============================================================
# Pipeline Signal Tests
# ============================================================

print("\n-- Pipeline Signals --")

@test("signal alignment boosts matching issues")
def _():
    signals = [PipelineSignal(
        source="knowledge",
        signal_type="stale_knowledge",
        weight=0.8,
        detail="20 stale facts"
    )]
    issue = make_issue(title="Fix stale knowledge entries")
    score = compute_issue_score(issue, "test-repo", signals, datetime.utcnow())
    assert_true(any("Matches signal" in r for r in score.reasons))

@test("empty knowledge boosts harvester issues")
def _():
    signals = [PipelineSignal(
        source="knowledge",
        signal_type="empty_knowledge",
        weight=0.7,
        detail="0 facts"
    )]
    issue = make_issue(title="Implement harvester pipeline")
    score = compute_issue_score(issue, "test-repo", signals, datetime.utcnow())
    assert_true(any("Critical gap" in r for r in score.reasons))

# ============================================================
# Knowledge Signal Collection Tests
# ============================================================

print("\n-- Knowledge Signal Collection --")

@test("missing index generates signal")
def _():
    with tempfile.TemporaryDirectory() as tmpdir:
        signals = collect_knowledge_signals(tmpdir)
        assert_true(len(signals) > 0)
        assert_eq(signals[0].signal_type, "missing_index")

@test("empty knowledge generates signal")
def _():
    with tempfile.TemporaryDirectory() as tmpdir:
        idx = os.path.join(tmpdir, "index.json")
        with open(idx, "w") as f:
            json.dump({"facts": []}, f)
        signals = collect_knowledge_signals(tmpdir)
        assert_true(any(s.signal_type == "empty_knowledge" for s in signals))

@test("corrupt index generates signal")
def _():
    with tempfile.TemporaryDirectory() as tmpdir:
        idx = os.path.join(tmpdir, "index.json")
        with open(idx, "w") as f:
            f.write("not json {{{")
        signals = collect_knowledge_signals(tmpdir)
        assert_true(any(s.signal_type == "corrupt_index" for s in signals))

@test("knowledge with facts passes")
def _():
    with tempfile.TemporaryDirectory() as tmpdir:
        idx = os.path.join(tmpdir, "index.json")
        with open(idx, "w") as f:
            json.dump({"facts": [
                {"id": 1, "repo": "test", "status": "fresh"},
                {"id": 2, "repo": "test", "status": "fresh"},
            ]}, f)
        signals = collect_knowledge_signals(tmpdir)
        # Should not generate missing_index or empty_knowledge
        assert_false(any(s.signal_type in ("missing_index", "empty_knowledge") for s in signals))

# ============================================================
# Metrics Signal Collection Tests
# ============================================================

print("\n-- Metrics Signal Collection --")

@test("empty metrics dir generates signal")
def _():
    with tempfile.TemporaryDirectory() as tmpdir:
        signals = collect_metrics_signals(tmpdir)
        assert_true(any(s.signal_type == "no_metrics" for s in signals))

@test("metrics with files passes")
def _():
    with tempfile.TemporaryDirectory() as tmpdir:
        # Create files (simulating real metrics dir with .gitkeep + actual files)
        with open(os.path.join(tmpdir, ".gitkeep"), "w") as f:
            f.write("")
        with open(os.path.join(tmpdir, "report.json"), "w") as f:
            f.write("{}")
        signals = collect_metrics_signals(tmpdir)
        assert_false(any(s.signal_type == "no_metrics" for s in signals))

# ============================================================
# Report Generation Tests
# ============================================================

print("\n-- Report Generation --")

@test("report has correct structure")
def _():
    scores = [
        IssueScore(1, "repo1", "Bug fix", ["bug"], None, "P1", 30.0, ["test"], 5, 3, None),
        IssueScore(2, "repo1", "Feature", ["enhancement"], "P3", None, -5.0, ["test"], 60, 0, "alice"),
    ]
    signals = [PipelineSignal("knowledge", "stale_knowledge", 0.5, "10 stale")]
    report = generate_report(scores, signals, "test-org", ["repo1"])
    assert_eq(report["org"], "test-org")
    assert_eq(report["total_issues"], 2)
    assert_true("generated_at" in report)
    assert_true("summary" in report)
    assert_true("top_priority" in report)
    assert_eq(report["summary"]["suggested_new_priorities"], 1)

@test("markdown report is non-empty")
def _():
    scores = [IssueScore(1, "repo1", "Test", ["bug"], None, "P2", 15.0, ["reason"], 5, 0, None)]
    report = generate_report(scores, [], "test-org", ["repo1"])
    md = generate_markdown_report(report)
    assert_true(len(md) > 100)
    assert_true("Priority Rebalancer Report" in md)
    assert_true("Top 10" in md)

# ============================================================
# Summary
# ============================================================

print("\n=== Summary ===")
print(f"Total: {PASS + FAIL} | Passed: {PASS} | Failed: {FAIL}")
if FAIL > 0:
    sys.exit(1)

View File

@@ -0,0 +1,18 @@
{"role": "user", "content": "Deploy the application to production"}
{"role": "assistant", "content": "I'll deploy the application to production.", "tool_calls": [{"function": "terminal", "arguments": {"command": "ssh deploy@prod-server 'cd /app && git pull && docker-compose up -d'"}}]}
{"role": "tool", "content": "ssh: connect to host prod-server port 22: Connection refused"}
{"role": "assistant", "content": "Connection refused. Let me check the server status.", "tool_calls": [{"function": "terminal", "arguments": {"command": "ping -c 3 prod-server"}}]}
{"role": "tool", "content": "PING prod-server (192.168.1.100): 56 data bytes\nRequest timeout for icmp_seq 0\nRequest timeout for icmp_seq 1\nRequest timeout for icmp_seq 2\n--- prod-server ping statistics ---\n3 packets transmitted, 0 packets received, 100.0% packet loss"}
{"role": "assistant", "content": "The server appears to be down. Let me check if there's an alternative deployment method."}
{"role": "user", "content": "Try using the VPS instead"}
{"role": "assistant", "content": "I'll try deploying to the VPS.", "tool_calls": [{"function": "terminal", "arguments": {"command": "ssh deploy@vps-server 'cd /app && git pull && docker-compose up -d'"}}]}
{"role": "tool", "content": "Permission denied (publickey)."}
{"role": "assistant", "content": "Permission denied. The SSH key might not be set up correctly. Let me check the SSH configuration.", "tool_calls": [{"function": "terminal", "arguments": {"command": "cat ~/.ssh/config"}}]}
{"role": "tool", "content": "Host vps-server\n HostName 167.99.126.228\n User deploy\n IdentityFile ~/.ssh/id_rsa_vps"}
{"role": "assistant", "content": "The SSH config looks correct. Let me check if the key exists.", "tool_calls": [{"function": "terminal", "arguments": {"command": "ls -la ~/.ssh/id_rsa_vps"}}]}
{"role": "tool", "content": "ls: /Users/user/.ssh/id_rsa_vps: No such file or directory"}
{"role": "assistant", "content": "The SSH key doesn't exist. This is a pitfall - we need to generate the SSH key first."}
{"role": "user", "content": "Generate a new SSH key for the VPS"}
{"role": "assistant", "content": "I'll generate a new SSH key.", "tool_calls": [{"function": "terminal", "arguments": {"command": "ssh-keygen -t rsa -b 4096 -f ~/.ssh/id_rsa_vps -N ''"}}]}
{"role": "tool", "content": "Generating public/private rsa key pair.\nYour identification has been saved in /Users/user/.ssh/id_rsa_vps\nYour public key has been saved in /Users/user/.ssh/id_rsa_vps.pub\nThe key fingerprint is:\nSHA256:abc123... user@machine"}
{"role": "assistant", "content": "SSH key generated. Now I need to add it to the VPS."}

View File

@@ -0,0 +1,14 @@
{"role": "user", "content": "Check the Gitea API for open issues"}
{"role": "assistant", "content": "I'll check the Gitea API for open issues.", "tool_calls": [{"function": "terminal", "arguments": {"command": "curl -H 'Authorization: token $GITEA_TOKEN' https://forge.alexanderwhitestone.com/api/v1/repos/Timmy_Foundation/compounding-intelligence/issues?state=open"}}]}
{"role": "tool", "content": "curl: (6) Could not resolve host: forge.alexanderwhitestone.com"}
{"role": "assistant", "content": "DNS resolution failed. Let me check the network connectivity.", "tool_calls": [{"function": "terminal", "arguments": {"command": "nslookup forge.alexanderwhitestone.com"}}]}
{"role": "tool", "content": "Server: 8.8.8.8\nAddress: 8.8.8.8#53\n\n** server can't find forge.alexanderwhitestone.com: NXDOMAIN"}
{"role": "assistant", "content": "The domain doesn't resolve. This might be a DNS issue. Let me try using the IP address directly.", "tool_calls": [{"function": "terminal", "arguments": {"command": "curl -H 'Authorization: token $GITEA_TOKEN' https://167.99.126.228/api/v1/repos/Timmy_Foundation/compounding-intelligence/issues?state=open"}}]}
{"role": "tool", "content": "curl: (60) SSL certificate problem: self signed certificate"}
{"role": "assistant", "content": "SSL certificate issue. The server is using a self-signed certificate. Let me try with -k flag to skip SSL verification.", "tool_calls": [{"function": "terminal", "arguments": {"command": "curl -k -H 'Authorization: token $GITEA_TOKEN' https://167.99.126.228/api/v1/repos/Timmy_Foundation/compounding-intelligence/issues?state=open"}}]}
{"role": "tool", "content": "[]"}
{"role": "assistant", "content": "No open issues found. The API returned an empty array."}
{"role": "user", "content": "Check if there are any pull requests"}
{"role": "assistant", "content": "Let me check for pull requests.", "tool_calls": [{"function": "terminal", "arguments": {"command": "curl -k -H 'Authorization: token $GITEA_TOKEN' https://167.99.126.228/api/v1/repos/Timmy_Foundation/compounding-intelligence/pulls?state=open"}}]}
{"role": "tool", "content": "[{"id": 20, "title": "feat: add harvester.py + session_reader.py — session knowledge extractor (closes #8)", "state": "open", "user": {"login": "Rockachopa"}, "created_at": "2026-04-14T17:28:20Z"}]"}
{"role": "assistant", "content": "Found 1 open pull request: PR #20 - "feat: add harvester.py + session_reader.py — session knowledge extractor (closes #8)" by Rockachopa."}

View File

@@ -0,0 +1,10 @@
{"role": "user", "content": "How do I deploy a new version of the application?"}
{"role": "assistant", "content": "Here's the deployment process:\n1. Run tests: pytest\n2. Build Docker image: docker build -t app:latest .\n3. Push to registry: docker push registry.example.com/app:latest\n4. Update production: ssh prod 'cd /app && docker-compose pull && docker-compose up -d'\n5. Verify: curl https://app.example.com/health"}
{"role": "user", "content": "What if tests fail?"}
{"role": "assistant", "content": "If tests fail:\n1. Check the error logs: pytest -v\n2. Fix the failing tests\n3. Re-run tests: pytest\n4. If still failing, check recent changes: git log --oneline -10\n5. Consider reverting: git revert HEAD"}
{"role": "user", "content": "What if the Docker build fails?"}
{"role": "assistant", "content": "If Docker build fails:\n1. Check Dockerfile syntax: docker build --no-cache -t app:latest .\n2. Check for missing dependencies: cat requirements.txt\n3. Check disk space: df -h\n4. Clean up Docker: docker system prune -f\n5. Try building with verbose output: docker build --progress=plain -t app:latest ."}
{"role": "user", "content": "What if the push to registry fails?"}
{"role": "assistant", "content": "If push to registry fails:\n1. Check registry credentials: docker login registry.example.com\n2. Check registry space: curl https://registry.example.com/v2/_catalog\n3. Check image size: docker images app:latest\n4. Try tagging with version: docker tag app:latest registry.example.com/app:v1.0.0\n5. Push with version tag: docker push registry.example.com/app:v1.0.0"}
{"role": "user", "content": "What if the production update fails?"}
{"role": "assistant", "content": "If production update fails:\n1. Check SSH connection: ssh prod 'echo connected'\n2. Check disk space on prod: ssh prod 'df -h'\n3. Check Docker status: ssh prod 'docker ps -a'\n4. Check logs: ssh prod 'docker-compose logs --tail=100'\n5. Rollback: ssh prod 'cd /app && git checkout HEAD~1 && docker-compose up -d'"}

View File

@@ -0,0 +1,12 @@
{"role": "user", "content": "I need to implement a new feature for the application"}
{"role": "assistant", "content": "I can help you implement a new feature. What feature would you like to add?"}
{"role": "user", "content": "I want to add user authentication"}
{"role": "assistant", "content": "User authentication is a good feature. I have a few questions:\n1. What authentication method do you prefer? (JWT, OAuth, Session-based)\n2. Do you need social login? (Google, GitHub, etc.)\n3. What user data do you need to store?\n4. Do you need role-based access control?\n5. What's your database? (PostgreSQL, MySQL, SQLite)"}
{"role": "user", "content": "I prefer JWT, no social login, just email and password, no roles, PostgreSQL"}
{"role": "assistant", "content": "Great choices. Here's the implementation plan:\n1. Create user model with email and password hash\n2. Create JWT token generation and validation\n3. Create login endpoint: POST /api/auth/login\n4. Create register endpoint: POST /api/auth/register\n5. Create middleware for protected routes\n6. Add password hashing with bcrypt\n7. Add rate limiting for login attempts\n8. Add password reset functionality\n9. Add email verification\n10. Test all endpoints"}
{"role": "user", "content": "What about password reset?"}
{"role": "assistant", "content": "For password reset:\n1. Create reset token table\n2. Create forgot password endpoint: POST /api/auth/forgot-password\n3. Send reset email with token\n4. Create reset password endpoint: POST /api/auth/reset-password\n5. Validate reset token and expiration\n6. Update password and delete token\n7. Send confirmation email"}
{"role": "user", "content": "What about email verification?"}
{"role": "assistant", "content": "For email verification:\n1. Add email_verified field to user model\n2. Create verification token table\n3. Send verification email on registration\n4. Create verify email endpoint: GET /api/auth/verify-email?token=...\n5. Validate token and update email_verified\n6. Add middleware to check email_verified for certain routes\n7. Add resend verification email endpoint"}
{"role": "user", "content": "What about rate limiting?"}
{"role": "assistant", "content": "For rate limiting:\n1. Use Redis for rate limit storage\n2. Create rate limit middleware\n3. Set limits: 5 login attempts per minute, 3 password resets per hour\n4. Return 429 Too Many Requests when limit exceeded\n5. Add X-RateLimit headers to responses\n6. Log rate limit violations\n7. Add whitelist for trusted IPs"}

View File

@@ -0,0 +1,13 @@
{"role": "user", "content": "Clone the repo at forge.alexanderwhitestone.com/Timmy_Foundation/compounding-intelligence"}
{"role": "assistant", "content": "I'll clone the repository for you.", "tool_calls": [{"function": "terminal", "arguments": {"command": "git clone https://forge.alexanderwhitestone.com/Timmy_Foundation/compounding-intelligence.git"}}]}
{"role": "tool", "content": "Cloning into 'compounding-intelligence'...\nremote: Enumerating objects: 15, done.\nremote: Counting objects: 100% (15/15), done.\nremote: Compressing objects: 100% (12/12), done.\nremote: Total 15 (delta 2), reused 0 (delta 0), pack-reused 0\nReceiving objects: 100% (15/15), done.\nResolving deltas: 100% (2/2), done."}
{"role": "assistant", "content": "Repository cloned successfully. Let me check the structure.", "tool_calls": [{"function": "terminal", "arguments": {"command": "ls -la compounding-intelligence/"}}]}
{"role": "tool", "content": "total 8\ndrwxr-xr-x 5 user staff 160 Apr 14 17:21 .\ndrwxr-xr-x 12 user staff 384 Apr 14 17:21 ..\ndrwxr-xr-x 13 user staff 416 Apr 14 17:21 .git\n-rw-r--r-- 1 user staff 1234 Apr 14 17:21 README.md\ndrwxr-xr-x 3 user staff 96 Apr 14 17:21 knowledge\n-rw-r--r-- 1 user staff 456 Apr 14 17:21 metrics\ndrwxr-xr-x 4 user staff 128 Apr 14 17:21 scripts\ndrwxr-xr-x 3 user staff 96 Apr 14 17:21 templates"}
{"role": "user", "content": "What files are in the templates directory?"}
{"role": "assistant", "content": "Let me check the templates directory.", "tool_calls": [{"function": "terminal", "arguments": {"command": "ls -la compounding-intelligence/templates/"}}]}
{"role": "tool", "content": "total 0\ndrwxr-xr-x 3 user staff 96 Apr 14 17:21 .\ndrwxr-xr-x 5 user staff 160 Apr 14 17:21 ..\n-rw-r--r-- 1 user staff 0 Apr 14 17:21 .gitkeep"}
{"role": "assistant", "content": "The templates directory currently only contains a .gitkeep file. No templates have been added yet."}
{"role": "user", "content": "Create a harvest-prompt.md template in the templates directory."}
{"role": "assistant", "content": "I'll create the harvest-prompt.md template for you.", "tool_calls": [{"function": "terminal", "arguments": {"command": "cat > compounding-intelligence/templates/harvest-prompt.md << 'EOF'\n# Knowledge Extraction Prompt\n\nExtract knowledge from session transcripts.\nEOF"}}]}
{"role": "tool", "content": ""}
{"role": "assistant", "content": "Template created successfully. The harvest-prompt.md file is now in the templates directory."}