Compare commits

..

2 Commits

4 changed files with 324 additions and 273 deletions

View File

@@ -1,162 +0,0 @@
#!/usr/bin/env python3
"""
Gitea Issue Body Parser
Extracts structured data from Gitea issue markdown bodies:
- Title
- Context section
- Acceptance criteria (checkboxes)
- Labels
- Epic/parent references
Usage:
python3 scripts/gitea_issue_parser.py <issue_body.txt
python3 scripts/gitea_issue_parser.py --url https://forge.../api/v1/repos/.../issues/123
echo "issue body" | python3 scripts/gitea_issue_parser.py --stdin
Output: JSON with {title, context, criteria[], labels[], epic_ref}
"""
import argparse
import json
import re
import sys
from typing import Optional
def parse_issue_body(body: str, title: str = "", labels: list = None) -> dict:
"""Parse a Gitea issue body into structured JSON."""
result = {
"title": title,
"context": "",
"criteria": [],
"labels": labels or [],
"epic_ref": None,
"sections": {},
}
if not body:
return result
# Extract epic/parent reference from title or body
epic_pattern = r"#(\d+)"
title_refs = re.findall(epic_pattern, title)
body_refs = re.findall(epic_pattern, body[:200]) # Check early body refs
# Look for "Closes #N" or "Part of #N" or "Epic: #N"
close_match = re.search(r"(?:Closes?|Fixes?|Resolves?)\s+#(\d+)", body, re.IGNORECASE)
part_match = re.search(r"(?:Part of|Epic|Parent|Blocks?)\s+#(\d+)", body, re.IGNORECASE)
if close_match:
result["epic_ref"] = f"#{close_match.group(1)}"
elif part_match:
result["epic_ref"] = f"#{part_match.group(1)}"
elif title_refs:
result["epic_ref"] = f"#{title_refs[0]}"
elif body_refs:
result["epic_ref"] = f"#{body_refs[0]}"
# Split into sections by ## headers
section_pattern = r"^##\s+(.+)$"
lines = body.split("\n")
current_section = None
current_content = []
for line in lines:
header_match = re.match(section_pattern, line)
if header_match:
# Save previous section
if current_section:
result["sections"][current_section] = "\n".join(current_content).strip()
current_section = header_match.group(1).strip().lower()
current_content = []
else:
current_content.append(line)
# Save last section
if current_section:
result["sections"][current_section] = "\n".join(current_content).strip()
# Extract context
for key in ["context", "background", "description", "problem"]:
if key in result["sections"]:
result["context"] = result["sections"][key]
break
# Extract acceptance criteria (checkboxes)
criteria_section = None
for key in ["acceptance criteria", "acceptance_criteria", "criteria", "requirements", "definition of done"]:
if key in result["sections"]:
criteria_section = result["sections"][key]
break
if criteria_section:
checkbox_pattern = r"-\s*\[[ xX]?\]\s*(.+)"
for match in re.finditer(checkbox_pattern, criteria_section):
result["criteria"].append(match.group(1).strip())
# Also try plain numbered/bulleted lists if no checkboxes found
if not result["criteria"]:
list_pattern = r"^\s*(?:\d+\.|-|\*)\s+(.+)"
for match in re.finditer(list_pattern, criteria_section, re.MULTILINE):
result["criteria"].append(match.group(1).strip())
# If no sectioned criteria found, scan whole body for checkboxes
if not result["criteria"]:
for match in re.finditer(r"-\s*\[[ xX]?\]\s*(.+)", body):
result["criteria"].append(match.group(1).strip())
return result
def parse_from_url(api_url: str, token: str = None) -> dict:
"""Parse an issue from a Gitea API URL."""
import urllib.request
headers = {}
if token:
headers["Authorization"] = f"token {token}"
req = urllib.request.Request(api_url, headers=headers)
resp = json.loads(urllib.request.urlopen(req, timeout=30).read())
title = resp.get("title", "")
body = resp.get("body", "")
labels = [l["name"] for l in resp.get("labels", [])]
return parse_issue_body(body, title, labels)
def main():
parser = argparse.ArgumentParser(description="Parse Gitea issue body into structured JSON")
parser.add_argument("input", nargs="?", help="Issue body file (or - for stdin)")
parser.add_argument("--url", help="Gitea API URL for the issue")
parser.add_argument("--stdin", action="store_true", help="Read from stdin")
parser.add_argument("--token", help="Gitea API token (or set GITEA_TOKEN env var)")
parser.add_argument("--title", default="", help="Issue title (for epic ref extraction)")
parser.add_argument("--labels", nargs="*", default=[], help="Issue labels")
parser.add_argument("--pretty", action="store_true", help="Pretty-print JSON output")
args = parser.parse_args()
import os
token = args.token or os.environ.get("GITEA_TOKEN")
if args.url:
result = parse_from_url(args.url, token)
elif args.stdin or (args.input and args.input == "-"):
body = sys.stdin.read()
result = parse_issue_body(body, args.title, args.labels)
elif args.input:
with open(args.input) as f:
body = f.read()
result = parse_issue_body(body, args.title, args.labels)
else:
parser.print_help()
sys.exit(1)
indent = 2 if args.pretty else None
print(json.dumps(result, indent=indent))
if __name__ == "__main__":
main()

View File

@@ -0,0 +1,234 @@
#!/usr/bin/env python3
"""
Session Transcript → Training Pair Harvester
Scans Hermes session JSONL files for Q&A patterns and extracts
terse→rich training pairs. Outputs JSONL matching the timmy-config
training pairs spec.
Usage:
python3 scripts/session_pair_harvester.py ~/.hermes/sessions/
python3 scripts/session_pair_harvester.py session.jsonl --output pairs.jsonl
python3 scripts/session_pair_harvester.py --dir ~/.hermes/sessions/ --min-ratio 2.0
Output format:
{"terse": "user short prompt", "rich": "ai detailed response", "source": "session_id", "model": "..."}
"""
import argparse
import hashlib
import json
import sys
from pathlib import Path
from typing import Optional
def compute_hash(text: str) -> str:
"""Content hash for deduplication."""
return hashlib.sha256(text.encode()).hexdigest()[:16]
def extract_pairs_from_session(session_data: dict, min_ratio: float = 1.5,
min_response_words: int = 20) -> list:
"""Extract terse→rich pairs from a single session object."""
pairs = []
conversations = session_data.get("conversations", [])
session_id = session_data.get("id", "unknown")
model = session_data.get("model", "unknown")
seen_hashes = set()
for i, msg in enumerate(conversations):
# Look for assistant/gpt responses
if msg.get("from") not in ("gpt", "assistant"):
continue
response_text = msg.get("value", "")
if not response_text or len(response_text.split()) < min_response_words:
continue
# Find the preceding human message
prompt_text = ""
for j in range(i - 1, -1, -1):
if conversations[j].get("from") == "human":
prompt_text = conversations[j].get("value", "")
break
if not prompt_text:
continue
# Filter: skip tool results, system messages embedded as human
if prompt_text.startswith("{") and "output" in prompt_text[:100]:
continue # likely a tool result
if prompt_text.startswith("# SOUL.md") or prompt_text.startswith("You are"):
continue # system prompt leak
# Quality filters
prompt_words = len(prompt_text.split())
response_words = len(response_text.split())
# Must have meaningful length ratio
if prompt_words == 0 or response_words == 0:
continue
ratio = response_words / prompt_words
if ratio < min_ratio:
continue
# Skip responses that are mostly code
code_blocks = response_text.count("```")
if code_blocks >= 4 and len(response_text.replace("```", "").strip()) < 50:
continue
# Skip responses with tool call artifacts
if "tool_call" in response_text[:100] or "function_call" in response_text[:100]:
continue
# Deduplicate by content hash
content_hash = compute_hash(prompt_text + response_text[:200])
if content_hash in seen_hashes:
continue
seen_hashes.add(content_hash)
# Clean up response: remove markdown headers if too many
clean_response = response_text
pairs.append({
"terse": prompt_text.strip(),
"rich": clean_response.strip(),
"source": session_id,
"model": model,
"prompt_words": prompt_words,
"response_words": response_words,
"ratio": round(ratio, 2),
})
return pairs
def extract_from_jsonl_file(filepath: str, **kwargs) -> list:
"""Extract pairs from a session JSONL file."""
pairs = []
path = Path(filepath)
if not path.exists():
print(f"Warning: {filepath} not found", file=sys.stderr)
return pairs
content = path.read_text()
lines = content.strip().split("\n")
for line in lines:
line = line.strip()
if not line:
continue
try:
session = json.loads(line)
except json.JSONDecodeError:
continue
session_pairs = extract_pairs_from_session(session, **kwargs)
pairs.extend(session_pairs)
return pairs
def deduplicate_pairs(pairs: list) -> list:
"""Remove duplicate pairs across files."""
seen = set()
unique = []
for pair in pairs:
key = compute_hash(pair["terse"] + pair["rich"][:200])
if key not in seen:
seen.add(key)
unique.append(pair)
return unique
def main():
parser = argparse.ArgumentParser(description="Harvest training pairs from session transcripts")
parser.add_argument("input", nargs="?", help="Session JSONL file or directory")
parser.add_argument("--dir", "-d", help="Directory to scan for session files")
parser.add_argument("--output", "-o", default="harvested_pairs.jsonl", help="Output file")
parser.add_argument("--min-ratio", type=float, default=1.5, help="Min response/prompt word ratio")
parser.add_argument("--min-words", type=int, default=20, help="Min response word count")
parser.add_argument("--dry-run", action="store_true", help="Print stats without writing")
args = parser.parse_args()
all_pairs = []
files_scanned = 0
scan_dir = args.dir or args.input
if not scan_dir:
parser.print_help()
sys.exit(1)
scan_path = Path(scan_dir)
if scan_path.is_dir():
jsonl_files = sorted(scan_path.rglob("*.jsonl"))
print(f"Scanning {len(jsonl_files)} files in {scan_dir}...", file=sys.stderr)
for fpath in jsonl_files:
pairs = extract_from_jsonl_file(
str(fpath),
min_ratio=args.min_ratio,
min_response_words=args.min_words
)
all_pairs.extend(pairs)
files_scanned += 1
else:
pairs = extract_from_jsonl_file(
str(scan_path),
min_ratio=args.min_ratio,
min_response_words=args.min_words
)
all_pairs.extend(pairs)
files_scanned = 1
# Deduplicate
unique_pairs = deduplicate_pairs(all_pairs)
# Stats
if unique_pairs:
avg_prompt = sum(p["prompt_words"] for p in unique_pairs) / len(unique_pairs)
avg_response = sum(p["response_words"] for p in unique_pairs) / len(unique_pairs)
avg_ratio = sum(p["ratio"] for p in unique_pairs) / len(unique_pairs)
else:
avg_prompt = avg_response = avg_ratio = 0
stats = {
"files_scanned": files_scanned,
"raw_pairs": len(all_pairs),
"unique_pairs": len(unique_pairs),
"duplicates_removed": len(all_pairs) - len(unique_pairs),
"avg_prompt_words": round(avg_prompt, 1),
"avg_response_words": round(avg_response, 1),
"avg_ratio": round(avg_ratio, 2),
}
print(json.dumps(stats, indent=2), file=sys.stderr)
if args.dry_run:
# Print sample pairs
for pair in unique_pairs[:3]:
print(f"\n--- Source: {pair['source']} (ratio: {pair['ratio']}) ---", file=sys.stderr)
print(f"TERSE: {pair['terse'][:100]}...", file=sys.stderr)
print(f"RICH: {pair['rich'][:150]}...", file=sys.stderr)
return
# Write output
output_path = Path(args.output)
with open(output_path, "w") as f:
for pair in unique_pairs:
# Strip internal fields for output
output = {
"terse": pair["terse"],
"rich": pair["rich"],
"source": pair["source"],
"model": pair["model"],
}
f.write(json.dumps(output) + "\n")
print(f"\nWrote {len(unique_pairs)} pairs to {output_path}", file=sys.stderr)
if __name__ == "__main__":
main()

View File

@@ -1,111 +0,0 @@
#!/usr/bin/env python3
"""Tests for gitea_issue_parser."""
import json
import sys
import os
sys.path.insert(0, os.path.dirname(__file__))
from gitea_issue_parser import parse_issue_body
def test_basic_structure():
body = """## Context
This is the background.
## Acceptance Criteria
- [ ] First criterion
- [x] Second criterion (already done)
- [ ] Third criterion
## Labels
`pipeline`, `extraction`
"""
result = parse_issue_body(body, "Test Issue", ["pipeline", "extraction"])
assert result["title"] == "Test Issue"
assert "background" in result["context"].lower()
assert len(result["criteria"]) == 3
assert "First criterion" in result["criteria"]
assert result["labels"] == ["pipeline", "extraction"]
print("PASS: test_basic_structure")
def test_epic_ref():
body = "Closes #645\n\nSome description."
result = parse_issue_body(body, "feat: thing (#688)")
assert result["epic_ref"] == "#645"
print("PASS: test_epic_ref")
def test_epic_ref_from_title():
body = "Some description without close ref."
result = parse_issue_body(body, "feat: scene descriptions (#645)")
assert result["epic_ref"] == "#645"
print("PASS: test_epic_ref_from_title")
def test_no_checkboxes():
body = """## Requirements
1. First thing
2. Second thing
3. Third thing
"""
result = parse_issue_body(body)
assert len(result["criteria"]) == 3
print("PASS: test_no_checkboxes")
def test_empty_body():
result = parse_issue_body("", "Empty Issue")
assert result["title"] == "Empty Issue"
assert result["criteria"] == []
assert result["context"] == ""
print("PASS: test_empty_body")
def test_real_issue_format():
body = """Closes #681
## Changes
Add `#!/usr/bin/env python3` shebang to 6 Python scripts.
## Verification
All 6 files confirmed missing shebangs before fix.
## Impact
Scripts can now be executed directly.
"""
result = parse_issue_body(body, "fix: add python3 shebangs (#685)")
assert result["epic_ref"] == "#681"
assert "shebang" in result["context"].lower()
print("PASS: test_real_issue_format")
def test_all_sections_captured():
body = """## Context
Background info.
## Acceptance Criteria
- [ ] Do thing
## Labels
`test`
"""
result = parse_issue_body(body)
assert "context" in result["sections"]
assert "acceptance criteria" in result["sections"]
print("PASS: test_all_sections_captured")
if __name__ == "__main__":
test_basic_structure()
test_epic_ref()
test_epic_ref_from_title()
test_no_checkboxes()
test_empty_body()
test_real_issue_format()
test_all_sections_captured()
print("\nAll tests passed.")

View File

@@ -0,0 +1,90 @@
#!/usr/bin/env python3
"""Tests for session_pair_harvester."""
import json
import sys
import os
import tempfile
sys.path.insert(0, os.path.dirname(__file__))
from session_pair_harvester import extract_pairs_from_session, deduplicate_pairs, compute_hash
def test_basic_extraction():
session = {
"id": "test_001",
"model": "test-model",
"conversations": [
{"from": "system", "value": "You are helpful."},
{"from": "human", "value": "What is Python?"},
{"from": "gpt", "value": "Python is a high-level programming language known for its readability and versatility. It supports multiple paradigms including procedural, object-oriented, and functional programming. Python is widely used in web development, data science, machine learning, and automation."},
]
}
pairs = extract_pairs_from_session(session, min_ratio=1.5, min_response_words=10)
assert len(pairs) == 1
assert pairs[0]["terse"] == "What is Python?"
assert "programming language" in pairs[0]["rich"]
assert pairs[0]["source"] == "test_001"
print("PASS: test_basic_extraction")
def test_filters_short_responses():
session = {
"id": "test_002",
"model": "test",
"conversations": [
{"from": "human", "value": "Hi"},
{"from": "gpt", "value": "Hello!"},
]
}
pairs = extract_pairs_from_session(session, min_ratio=1.5, min_response_words=20)
assert len(pairs) == 0
print("PASS: test_filters_short_responses")
def test_skips_tool_results():
session = {
"id": "test_003",
"model": "test",
"conversations": [
{"from": "human", "value": '{"output": "file content", "exit_code": 0}'},
{"from": "gpt", "value": "The file was read successfully. Now let me analyze the content and provide a detailed summary of what was found in the file system."},
]
}
pairs = extract_pairs_from_session(session, min_ratio=1.5, min_response_words=10)
assert len(pairs) == 0
print("PASS: test_skips_tool_results")
def test_deduplication():
pairs = [
{"terse": "What is X?", "rich": "X is Y.", "source": "s1", "model": "m"},
{"terse": "What is X?", "rich": "X is Y.", "source": "s2", "model": "m"},
{"terse": "What is Z?", "rich": "Z is W.", "source": "s1", "model": "m"},
]
unique = deduplicate_pairs(pairs)
assert len(unique) == 2
print("PASS: test_deduplication")
def test_ratio_filter():
session = {
"id": "test_005",
"model": "test",
"conversations": [
{"from": "human", "value": "Explain quantum computing in detail with examples and applications"},
{"from": "gpt", "value": "OK."},
]
}
pairs = extract_pairs_from_session(session, min_ratio=1.5, min_response_words=10)
assert len(pairs) == 0 # response too short relative to prompt
print("PASS: test_ratio_filter")
if __name__ == "__main__":
test_basic_extraction()
test_filters_short_responses()
test_skips_tool_results()
test_deduplication()
test_ratio_filter()
print("\nAll tests passed.")