#!/usr/bin/env python3 """ Build 1,000 deterministic code training pairs for Gitea API & Forge operations. Issue: timmy-config #591 Domains: - gitea_api (250 pairs) HTTP layer, auth, pagination, retries, error handling - repo_management (250 pairs) Creating repos, webhooks, branches, labels, milestones - issue_automation (250 pairs) Fetching, filtering, commenting, assigning, closing, triage - pr_workflow (250 pairs) Creating, reviewing, merging, deduplicating, rebasing Output: - training-data/code-patterns-gitea-api-&-forge.jsonl - ~/.hermes/training-data/code-patterns-gitea-api-&-forge.jsonl Format: JSONL with prompt/response keys, plus metadata fields for provenance. """ from __future__ import annotations import json import random from pathlib import Path # Deterministic seed so the dataset is reproducible random.seed(591_000) REPO_OUTPUT = Path(__file__).resolve().parent.parent / "training-data" / "code-patterns-gitea-api-&-forge.jsonl" HERMES_OUTPUT = Path.home() / ".hermes" / "training-data" / "code-patterns-gitea-api-&-forge.jsonl" # --------------------------------------------------------------------------- # Domain 1/4: gitea_api # --------------------------------------------------------------------------- GITEA_API_PROBLEMS = [ "Write a Python function that pings a Gitea instance to check if it is alive.", "Write a Python function that reads a Gitea API token from standard local paths and returns it.", "Write a Python function that builds a Gitea API URL with query parameters for pagination.", "Write a Python function that makes an authenticated GET request to the Gitea API with retries and exponential backoff.", "Write a Python function that handles HTTP 401, 403, 404, and 500 errors from the Gitea API gracefully.", "Write a Python function that iterates through all pages of a paginated Gitea API endpoint.", "Write a Python function that uses urllib (no external deps) to POST JSON data to the Gitea API.", "Write a Python function that wraps the Gitea API 
in a typed dataclass response (Issue, PR, Comment).", "Write a Python function that sets proper request headers (Authorization, Content-Type, Accept) for Gitea API calls.", "Write a Python function that implements a circuit breaker for Gitea API requests after 3 consecutive failures.", "Write a Python function that logs Gitea API requests and responses for debugging.", "Write a Python function that validates a Gitea API token by calling the /user endpoint.", "Write a Python function that converts a repo string 'owner/name' into a Gitea API path '/repos/owner/name'.", "Write a Python function that measures the latency of a Gitea API call and raises a warning if it exceeds 2 seconds.", "Write a Python function that caches Gitea API responses for 60 seconds to avoid redundant requests.", "Write a Python function that implements rate-limit awareness by reading the X-RateLimit-Remaining header.", "Write a Python function that serializes a Python dict into JSON and sends it as the body of a PATCH request to Gitea.", "Write a Python function that safely reads the response body from a Gitea API call even when the server returns an empty body.", "Write a Python function that compares two Gitea API base URLs and normalizes them by stripping trailing slashes.", "Write a Python function that creates a reusable urllib Request object with authentication for repeated Gitea API calls.", "Write a Python function that retries only on 5xx and 429 errors, but fails immediately on 4xx (except 429).", "Write a Python function that parses a Gitea API error response and extracts the human-readable message.", "Write a Python function that builds a Gitea API search query for issues across all repos in an organization.", "Write a Python function that attaches a custom User-Agent header to all Gitea API requests.", "Write a Python function that closes idle Gitea API connections after 30 seconds to prevent socket exhaustion.", ] GITEA_API_SOLUTIONS = [ '''import urllib.request import 
urllib.error import json def ping_gitea(base_url: str) -> bool: """Check if Gitea is responding.""" try: req = urllib.request.Request(f"{base_url}/api/v1/version") with urllib.request.urlopen(req, timeout=5) as resp: return resp.status == 200 except Exception: return False ''', '''from pathlib import Path def read_gitea_token() -> str: """Read token from standard locations.""" for path in [ Path.home() / ".hermes" / "gitea_token", Path.home() / ".config" / "gitea" / "token", ]: if path.exists(): return path.read_text().strip() raise FileNotFoundError("No Gitea token found") ''', '''import urllib.parse def build_api_url(base_url: str, path: str, params: dict | None = None) -> str: """Build a Gitea API URL with optional query params.""" url = f"{base_url.rstrip('/')}/api/v1{path}" if params: clean = {k: v for k, v in params.items() if v is not None} url += "?" + urllib.parse.urlencode(clean) return url ''', '''import urllib.request import urllib.error import json import time def gitea_get_with_retries(url: str, token: str, retries: int = 3, backoff: float = 1.5): """Authenticated GET with exponential backoff.""" for attempt in range(retries): req = urllib.request.Request(url) req.add_header("Authorization", f"token {token}") try: with urllib.request.urlopen(req, timeout=30) as resp: return json.loads(resp.read().decode()) except urllib.error.HTTPError as e: if 400 <= e.code < 500 and e.code != 429: raise if attempt == retries - 1: raise except urllib.error.URLError: if attempt == retries - 1: raise time.sleep(backoff ** attempt) ''', '''import urllib.error class GiteaError(Exception): def __init__(self, status: int, message: str): self.status = status super().__init__(f"Gitea {status}: {message}") def handle_gitea_error(e: urllib.error.HTTPError) -> None: """Map HTTP errors to typed exceptions.""" body = e.read().decode(errors="replace") if e.fp else "" if e.code == 401: raise GiteaError(401, f"Unauthorized. Check your token. 
{body}") if e.code == 403: raise GiteaError(403, f"Forbidden. {body}") if e.code == 404: raise GiteaError(404, f"Not found. {body}") if e.code >= 500: raise GiteaError(e.code, f"Server error. {body}") raise GiteaError(e.code, body) ''', '''import urllib.request import json def paginate_all(url_template: str, token: str, params: dict, limit: int = 50): """Iterate all pages of a paginated Gitea endpoint.""" page = 1 while True: params["page"] = page params["limit"] = limit req = urllib.request.Request(url_template + "?" + urllib.parse.urlencode(params)) req.add_header("Authorization", f"token {token}") with urllib.request.urlopen(req, timeout=30) as resp: batch = json.loads(resp.read().decode()) if not batch: break yield from batch if len(batch) < limit: break page += 1 ''', '''import urllib.request import json def gitea_post_json(base_url: str, path: str, token: str, data: dict): """POST JSON to Gitea API using only stdlib.""" url = f"{base_url.rstrip('/')}/api/v1{path}" body = json.dumps(data).encode("utf-8") req = urllib.request.Request(url, data=body, method="POST") req.add_header("Authorization", f"token {token}") req.add_header("Content-Type", "application/json") req.add_header("Accept", "application/json") with urllib.request.urlopen(req, timeout=30) as resp: raw = resp.read().decode() return json.loads(raw) if raw.strip() else {} ''', '''from dataclasses import dataclass from typing import Any @dataclass class Issue: number: int title: str state: str @classmethod def from_dict(cls, d: dict) -> "Issue": return cls( number=d.get("number", 0), title=d.get("title", ""), state=d.get("state", ""), ) def parse_issues(raw: list[dict]) -> list[Issue]: """Convert raw API response into typed Issue objects.""" return [Issue.from_dict(i) for i in raw] ''', '''def build_gitea_request(url: str, token: str, method: str = "GET", data: bytes | None = None): """Build a request with standard headers.""" req = urllib.request.Request(url, data=data, method=method) 
req.add_header("Authorization", f"token {token}") req.add_header("Content-Type", "application/json") req.add_header("Accept", "application/json") return req ''', '''class CircuitBreaker: def __init__(self, threshold: int = 3): self.failures = 0 self.threshold = threshold self.open = False def call(self, fn, *args, **kwargs): if self.open: raise RuntimeError("Circuit breaker is OPEN") try: result = fn(*args, **kwargs) self.failures = 0 return result except Exception: self.failures += 1 if self.failures >= self.threshold: self.open = True raise ''', '''import logging import urllib.request logger = logging.getLogger("gitea") def logged_request(req: urllib.request.Request): """Execute a request and log the exchange.""" logger.debug("%s %s", req.get_method(), req.full_url) with urllib.request.urlopen(req, timeout=30) as resp: body = resp.read().decode() logger.debug("Response %d: %s", resp.status, body[:500]) return body ''', '''import urllib.request import json def validate_token(base_url: str, token: str) -> bool: """Verify a token by calling the /user endpoint.""" req = urllib.request.Request(f"{base_url}/api/v1/user") req.add_header("Authorization", f"token {token}") try: with urllib.request.urlopen(req, timeout=10) as resp: data = json.loads(resp.read().decode()) return "login" in data except urllib.error.HTTPError as e: if e.code == 401: return False raise ''', '''def repo_path(repo: str) -> str: """Convert 'owner/name' to '/repos/owner/name'.""" if "/" not in repo: raise ValueError("Repo must be 'owner/name'") return f"/repos/{repo}" ''', '''import time import urllib.request def timed_request(req: urllib.request.Request, threshold: float = 2.0): """Execute request and warn if latency exceeds threshold.""" start = time.time() with urllib.request.urlopen(req, timeout=30) as resp: body = resp.read().decode() elapsed = time.time() - start if elapsed > threshold: print(f"WARNING: Request took {elapsed:.2f}s") return body ''', '''import time import json class 
TimedCache: def __init__(self, ttl: int = 60): self.ttl = ttl self.store = {} def get(self, key: str, fetch_fn): now = time.time() if key in self.store: value, expiry = self.store[key] if now < expiry: return value value = fetch_fn() self.store[key] = (value, now + self.ttl) return value ''', '''import urllib.request def check_rate_limit(resp: urllib.request.addinfourl) -> int: """Read X-RateLimit-Remaining from response headers.""" remaining = resp.headers.get("X-RateLimit-Remaining") return int(remaining) if remaining else -1 ''', '''import urllib.request import json def gitea_patch(base_url: str, path: str, token: str, data: dict): """Serialize dict to JSON and PATCH.""" url = f"{base_url.rstrip('/')}/api/v1{path}" body = json.dumps(data).encode() req = urllib.request.Request(url, data=body, method="PATCH") req.add_header("Authorization", f"token {token}") req.add_header("Content-Type", "application/json") with urllib.request.urlopen(req, timeout=30) as resp: return json.loads(resp.read().decode()) ''', '''import urllib.request def safe_read_response(resp: urllib.request.addinfourl) -> dict: """Return parsed JSON or empty dict if body is empty.""" raw = resp.read().decode() if not raw.strip(): return {} import json return json.loads(raw) ''', '''def normalize_base_url(url: str) -> str: """Strip trailing slashes and ensure no duplicate slashes.""" return url.rstrip("/") ''', '''import urllib.request class AuthenticatedRequest: def __init__(self, base_url: str, token: str): self.base_url = base_url.rstrip("/") self.token = token def build(self, path: str, method: str = "GET", data: bytes | None = None): url = f"{self.base_url}/api/v1{path}" req = urllib.request.Request(url, data=data, method=method) req.add_header("Authorization", f"token {self.token}") req.add_header("Content-Type", "application/json") return req ''', '''import urllib.request import urllib.error import time def smart_retry(req: urllib.request.Request, retries: int = 3): """Retry on 5xx and 429; 
fail fast on other 4xx.""" for attempt in range(retries): try: return urllib.request.urlopen(req, timeout=30) except urllib.error.HTTPError as e: if e.code == 429 or e.code >= 500: if attempt == retries - 1: raise time.sleep(2 ** attempt) else: raise ''', '''import json import urllib.error def parse_error_body(e: urllib.error.HTTPError) -> str: """Extract human-readable message from Gitea error response.""" try: body = json.loads(e.read().decode()) return body.get("message", str(e)) except Exception: return str(e) ''', '''import urllib.request import urllib.parse def search_issues_url(org: str, query: str, state: str = "open") -> str: """Build cross-repo issue search URL.""" params = urllib.parse.urlencode({ "q": query, "state": state, "type": "issues", }) return f"/repos/search?{params}" ''', '''import urllib.request def request_with_user_agent(url: str, token: str, agent: str = "timmy-bot/1.0"): req = urllib.request.Request(url) req.add_header("Authorization", f"token {token}") req.add_header("User-Agent", agent) return req ''', '''import urllib.request import socket class TimeoutHTTPHandler(urllib.request.HTTPHandler): def __init__(self, timeout: int = 30): self.timeout = timeout super().__init__() def http_open(self, req): return self.do_open(lambda host, port: http.client.HTTPConnection(host, port, timeout=self.timeout), req) ''', ] # --------------------------------------------------------------------------- # Domain 2/4: repo_management # --------------------------------------------------------------------------- REPO_MANAGEMENT_PROBLEMS = [ "Write a Python function that creates a new Gitea repo under an organization via API.", "Write a Python function that lists all repos in a Gitea organization with pagination.", "Write a Python function that sets a webhook on a Gitea repo to trigger on push events.", "Write a Python function that lists all branches in a Gitea repo.", "Write a Python function that protects a branch in Gitea by requiring PR reviews before 
merge.", "Write a Python function that creates a custom label in a Gitea repo with a specific color.", "Write a Python function that lists all labels in a Gitea repo.", "Write a Python function that creates a milestone in a Gitea repo with a due date.", "Write a Python function that clones a Gitea repo with --depth 1 for a shallow clone.", "Write a Python function that performs a sparse checkout of a single directory from a Gitea repo.", "Write a Python function that updates a repo's description and website via the Gitea API.", "Write a Python function that deletes a webhook from a Gitea repo by its ID.", "Write a Python function that checks whether a branch exists in a remote Gitea repo without cloning.", "Write a Python function that forks a Gitea repo into another organization via API.", "Write a Python function that sets branch protection rules: no force-push, require status checks.", "Write a Python function that lists all collaborators on a Gitea repo.", "Write a Python function that adds a deploy key to a Gitea repo for CI access.", "Write a Python function that enables issue tracking on a Gitea repo via API.", "Write a Python function that archives a Gitea repo (read-only) via API.", "Write a Python function that un-archives a Gitea repo via API.", "Write a Python function that transfers a Gitea repo from one owner to another.", "Write a Python function that sets a repo topic (tag) via the Gitea API.", "Write a Python function that gets the default branch of a Gitea repo.", "Write a Python function that renames a branch in a Gitea repo via API.", "Write a Python function that syncs a fork with its upstream parent repo via Gitea API.", ] REPO_MANAGEMENT_SOLUTIONS = [ '''import urllib.request import json def create_repo(org: str, name: str, token: str, description: str = "", private: bool = False): url = f"https://forge.example.com/api/v1/orgs/{org}/repos" data = {"name": name, "description": description, "private": private} body = json.dumps(data).encode() 
req = urllib.request.Request(url, data=body, method="POST") req.add_header("Authorization", f"token {token}") req.add_header("Content-Type", "application/json") with urllib.request.urlopen(req, timeout=30) as resp: return json.loads(resp.read().decode()) ''', '''import urllib.request import json def list_org_repos(org: str, token: str, limit: int = 50, page: int = 1): url = f"https://forge.example.com/api/v1/orgs/{org}/repos?limit={limit}&page={page}" req = urllib.request.Request(url) req.add_header("Authorization", f"token {token}") with urllib.request.urlopen(req, timeout=30) as resp: return json.loads(resp.read().decode()) ''', '''import urllib.request import json def set_push_webhook(repo: str, token: str, webhook_url: str): owner, name = repo.split("/") url = f"https://forge.example.com/api/v1/repos/{owner}/{name}/hooks" data = { "type": "gitea", "config": {"url": webhook_url, "content_type": "json"}, "events": ["push"], "active": True, } body = json.dumps(data).encode() req = urllib.request.Request(url, data=body, method="POST") req.add_header("Authorization", f"token {token}") req.add_header("Content-Type", "application/json") with urllib.request.urlopen(req, timeout=30) as resp: return json.loads(resp.read().decode()) ''', '''import urllib.request import json def list_branches(repo: str, token: str): owner, name = repo.split("/") url = f"https://forge.example.com/api/v1/repos/{owner}/{name}/branches" req = urllib.request.Request(url) req.add_header("Authorization", f"token {token}") with urllib.request.urlopen(req, timeout=30) as resp: return json.loads(resp.read().decode()) ''', '''import urllib.request import json def protect_branch(repo: str, branch: str, token: str): owner, name = repo.split("/") url = f"https://forge.example.com/api/v1/repos/{owner}/{name}/branch_protections" data = {"branch_name": branch, "enable_push": False, "enable_push_whitelist": False} body = json.dumps(data).encode() req = urllib.request.Request(url, data=body, method="POST") 
req.add_header("Authorization", f"token {token}") req.add_header("Content-Type", "application/json") with urllib.request.urlopen(req, timeout=30) as resp: return json.loads(resp.read().decode()) ''', '''import urllib.request import json def create_label(repo: str, name: str, color: str, token: str): owner, repo_name = repo.split("/") url = f"https://forge.example.com/api/v1/repos/{owner}/{repo_name}/labels" data = {"name": name, "color": color} body = json.dumps(data).encode() req = urllib.request.Request(url, data=body, method="POST") req.add_header("Authorization", f"token {token}") req.add_header("Content-Type", "application/json") with urllib.request.urlopen(req, timeout=30) as resp: return json.loads(resp.read().decode()) ''', '''import urllib.request import json def list_labels(repo: str, token: str): owner, name = repo.split("/") url = f"https://forge.example.com/api/v1/repos/{owner}/{name}/labels" req = urllib.request.Request(url) req.add_header("Authorization", f"token {token}") with urllib.request.urlopen(req, timeout=30) as resp: return json.loads(resp.read().decode()) ''', '''import urllib.request import json from datetime import datetime, timezone def create_milestone(repo: str, title: str, due_date: str, token: str): owner, name = repo.split("/") url = f"https://forge.example.com/api/v1/repos/{owner}/{name}/milestones" data = {"title": title, "due_on": due_date} body = json.dumps(data).encode() req = urllib.request.Request(url, data=body, method="POST") req.add_header("Authorization", f"token {token}") req.add_header("Content-Type", "application/json") with urllib.request.urlopen(req, timeout=30) as resp: return json.loads(resp.read().decode()) ''', '''import subprocess def shallow_clone(repo_url: str, dest: str, branch: str = "main"): """Clone with --depth 1 to save time and space.""" subprocess.run( ["git", "clone", "--depth", "1", "--branch", branch, repo_url, dest], check=True, capture_output=True, ) ''', '''import subprocess from pathlib import 
Path def sparse_checkout(repo_url: str, dest: str, directory: str): """Sparse checkout a single directory.""" Path(dest).mkdir(parents=True, exist_ok=True) subprocess.run(["git", "init", dest], check=True, capture_output=True) subprocess.run(["git", "-C", dest, "remote", "add", "origin", repo_url], check=True, capture_output=True) subprocess.run(["git", "-C", dest, "config", "core.sparseCheckout", "true"], check=True, capture_output=True) sparse_file = Path(dest) / ".git" / "info" / "sparse-checkout" sparse_file.write_text(directory + "\\n") subprocess.run(["git", "-C", dest, "pull", "origin", "main"], check=True, capture_output=True) ''', '''import urllib.request import json def update_repo_description(repo: str, description: str, website: str, token: str): owner, name = repo.split("/") url = f"https://forge.example.com/api/v1/repos/{owner}/{name}" data = {"description": description, "website": website} body = json.dumps(data).encode() req = urllib.request.Request(url, data=body, method="PATCH") req.add_header("Authorization", f"token {token}") req.add_header("Content-Type", "application/json") with urllib.request.urlopen(req, timeout=30) as resp: return json.loads(resp.read().decode()) ''', '''import urllib.request def delete_webhook(repo: str, hook_id: int, token: str): owner, name = repo.split("/") url = f"https://forge.example.com/api/v1/repos/{owner}/{name}/hooks/{hook_id}" req = urllib.request.Request(url, method="DELETE") req.add_header("Authorization", f"token {token}") with urllib.request.urlopen(req, timeout=30) as resp: return resp.status == 204 ''', '''import urllib.request import json def branch_exists(repo: str, branch: str, token: str) -> bool: owner, name = repo.split("/") url = f"https://forge.example.com/api/v1/repos/{owner}/{name}/branches/{branch}" req = urllib.request.Request(url) req.add_header("Authorization", f"token {token}") try: with urllib.request.urlopen(req, timeout=10) as resp: return resp.status == 200 except urllib.error.HTTPError 
as e: if e.code == 404: return False raise ''', '''import urllib.request import json def fork_repo(repo: str, org: str, token: str): owner, name = repo.split("/") url = f"https://forge.example.com/api/v1/repos/{owner}/{name}/forks" data = {"organization": org} body = json.dumps(data).encode() req = urllib.request.Request(url, data=body, method="POST") req.add_header("Authorization", f"token {token}") req.add_header("Content-Type", "application/json") with urllib.request.urlopen(req, timeout=30) as resp: return json.loads(resp.read().decode()) ''', '''import urllib.request import json def set_strict_protection(repo: str, branch: str, token: str): owner, name = repo.split("/") url = f"https://forge.example.com/api/v1/repos/{owner}/{name}/branch_protections" data = { "branch_name": branch, "enable_push": False, "enable_status_check": True, "status_check_contexts": ["ci/tests"], } body = json.dumps(data).encode() req = urllib.request.Request(url, data=body, method="POST") req.add_header("Authorization", f"token {token}") req.add_header("Content-Type", "application/json") with urllib.request.urlopen(req, timeout=30) as resp: return json.loads(resp.read().decode()) ''', '''import urllib.request import json def list_collaborators(repo: str, token: str): owner, name = repo.split("/") url = f"https://forge.example.com/api/v1/repos/{owner}/{name}/collaborators" req = urllib.request.Request(url) req.add_header("Authorization", f"token {token}") with urllib.request.urlopen(req, timeout=30) as resp: return json.loads(resp.read().decode()) ''', '''import urllib.request import json def add_deploy_key(repo: str, title: str, key: str, token: str, read_only: bool = True): owner, name = repo.split("/") url = f"https://forge.example.com/api/v1/repos/{owner}/{name}/keys" data = {"title": title, "key": key, "read_only": read_only} body = json.dumps(data).encode() req = urllib.request.Request(url, data=body, method="POST") req.add_header("Authorization", f"token {token}") 
req.add_header("Content-Type", "application/json") with urllib.request.urlopen(req, timeout=30) as resp: return json.loads(resp.read().decode()) ''', '''import urllib.request import json def enable_issues(repo: str, token: str): owner, name = repo.split("/") url = f"https://forge.example.com/api/v1/repos/{owner}/{name}" data = {"has_issues": True} body = json.dumps(data).encode() req = urllib.request.Request(url, data=body, method="PATCH") req.add_header("Authorization", f"token {token}") req.add_header("Content-Type", "application/json") with urllib.request.urlopen(req, timeout=30) as resp: return json.loads(resp.read().decode()) ''', '''import urllib.request import json def archive_repo(repo: str, token: str): owner, name = repo.split("/") url = f"https://forge.example.com/api/v1/repos/{owner}/{name}" data = {"archived": True} body = json.dumps(data).encode() req = urllib.request.Request(url, data=body, method="PATCH") req.add_header("Authorization", f"token {token}") req.add_header("Content-Type", "application/json") with urllib.request.urlopen(req, timeout=30) as resp: return json.loads(resp.read().decode()) ''', '''import urllib.request import json def unarchive_repo(repo: str, token: str): owner, name = repo.split("/") url = f"https://forge.example.com/api/v1/repos/{owner}/{name}" data = {"archived": False} body = json.dumps(data).encode() req = urllib.request.Request(url, data=body, method="PATCH") req.add_header("Authorization", f"token {token}") req.add_header("Content-Type", "application/json") with urllib.request.urlopen(req, timeout=30) as resp: return json.loads(resp.read().decode()) ''', '''import urllib.request import json def transfer_repo(repo: str, new_owner: str, token: str): owner, name = repo.split("/") url = f"https://forge.example.com/api/v1/repos/{owner}/{name}/transfer" data = {"new_owner": new_owner} body = json.dumps(data).encode() req = urllib.request.Request(url, data=body, method="POST") req.add_header("Authorization", f"token 
{token}") req.add_header("Content-Type", "application/json") with urllib.request.urlopen(req, timeout=30) as resp: return json.loads(resp.read().decode()) ''', '''import urllib.request import json def set_repo_topic(repo: str, topic: str, token: str): owner, name = repo.split("/") url = f"https://forge.example.com/api/v1/repos/{owner}/{name}/topics" data = {"topics": [topic]} body = json.dumps(data).encode() req = urllib.request.Request(url, data=body, method="PUT") req.add_header("Authorization", f"token {token}") req.add_header("Content-Type", "application/json") with urllib.request.urlopen(req, timeout=30) as resp: return json.loads(resp.read().decode()) ''', '''import urllib.request import json def get_default_branch(repo: str, token: str) -> str: owner, name = repo.split("/") url = f"https://forge.example.com/api/v1/repos/{owner}/{name}" req = urllib.request.Request(url) req.add_header("Authorization", f"token {token}") with urllib.request.urlopen(req, timeout=30) as resp: data = json.loads(resp.read().decode()) return data.get("default_branch", "main") ''', '''import urllib.request import json def rename_branch(repo: str, old: str, new: str, token: str): owner, name = repo.split("/") url = f"https://forge.example.com/api/v1/repos/{owner}/{name}/branches/{old}" data = {"name": new} body = json.dumps(data).encode() req = urllib.request.Request(url, data=body, method="POST") req.add_header("Authorization", f"token {token}") req.add_header("Content-Type", "application/json") with urllib.request.urlopen(req, timeout=30) as resp: return json.loads(resp.read().decode()) ''', '''import urllib.request import json def sync_fork(repo: str, token: str, branch: str = "main"): owner, name = repo.split("/") url = f"https://forge.example.com/api/v1/repos/{owner}/{name}/pulls" data = {"title": "Sync upstream", "head": f"upstream:{branch}", "base": branch, "body": "Automated sync"} body = json.dumps(data).encode() req = urllib.request.Request(url, data=body, method="POST") 
req.add_header("Authorization", f"token {token}") req.add_header("Content-Type", "application/json") with urllib.request.urlopen(req, timeout=30) as resp: return json.loads(resp.read().decode()) ''', ] # --------------------------------------------------------------------------- # Domain 3/4: issue_automation # --------------------------------------------------------------------------- ISSUE_AUTOMATION_PROBLEMS = [ "Write a Python function that fetches open issues from a Gitea repo and filters out pull requests.", "Write a Python function that filters Gitea issues by label name.", "Write a Python function that posts a comment on a Gitea issue via API.", "Write a Python function that closes a Gitea issue via API.", "Write a Python function that assigns a Gitea issue to a specific user.", "Write a Python function that checks whether a Gitea issue already has a bot comment to avoid duplicate comments.", "Write a Python function that labels a batch of Gitea issues with the same label.", "Write a Python function that searches Gitea issues across an organization by keyword.", "Write a Python function that calculates the age of a Gitea issue in days from its created_at timestamp.", "Write a Python function that finds all unassigned open issues in a Gitea repo.", "Write a Python function that counts open issues per label in a Gitea repo.", "Write a Python function that reopens a closed Gitea issue via API.", "Write a Python function that fetches the full timeline (comments) of a Gitea issue.", "Write a Python function that detects stale issues (no activity for 30 days) in a Gitea repo.", "Write a Python function that auto-triangles Gitea issues by running a classifier on the title and body.", "Write a Python function that migrates issues from one Gitea repo to another by creating copies.", "Write a Python function that generates a markdown summary report of open issues for a repo.", "Write a Python function that locks a Gitea issue to prevent further comments.", "Write a 
Python function that unlocks a previously locked Gitea issue.", "Write a Python function that subscribes a user to notifications on a Gitea issue.", "Write a Python function that unsubscribes a user from notifications on a Gitea issue.", "Write a Python function that sets a milestone on a Gitea issue.", "Write a Python function that removes a milestone from a Gitea issue.", "Write a Python function that finds issues created by a specific user in a Gitea repo.", "Write a Python function that updates the title of a Gitea issue via API.", ] ISSUE_AUTOMATION_SOLUTIONS = [ '''import urllib.request import json def fetch_open_issues(repo: str, token: str): owner, name = repo.split("/") url = f"https://forge.example.com/api/v1/repos/{owner}/{name}/issues?state=open&type=issues&limit=100" req = urllib.request.Request(url) req.add_header("Authorization", f"token {token}") with urllib.request.urlopen(req, timeout=30) as resp: return [i for i in json.loads(resp.read().decode()) if not i.get("pull_request")] ''', '''def filter_by_label(issues: list[dict], label: str) -> list[dict]: return [i for i in issues if any(l.get("name") == label for l in i.get("labels", []))] ''', '''import urllib.request import json def comment_on_issue(repo: str, number: int, body: str, token: str): owner, name = repo.split("/") url = f"https://forge.example.com/api/v1/repos/{owner}/{name}/issues/{number}/comments" data = {"body": body} req = urllib.request.Request(url, data=json.dumps(data).encode(), method="POST") req.add_header("Authorization", f"token {token}") req.add_header("Content-Type", "application/json") with urllib.request.urlopen(req, timeout=30) as resp: return json.loads(resp.read().decode()) ''', '''import urllib.request import json def close_issue(repo: str, number: int, token: str): owner, name = repo.split("/") url = f"https://forge.example.com/api/v1/repos/{owner}/{name}/issues/{number}" data = {"state": "closed"} req = urllib.request.Request(url, data=json.dumps(data).encode(), 
method="PATCH") req.add_header("Authorization", f"token {token}") req.add_header("Content-Type", "application/json") with urllib.request.urlopen(req, timeout=30) as resp: return json.loads(resp.read().decode()) ''', '''import urllib.request import json def assign_issue(repo: str, number: int, assignees: list[str], token: str): owner, name = repo.split("/") url = f"https://forge.example.com/api/v1/repos/{owner}/{name}/issues/{number}" data = {"assignees": assignees} req = urllib.request.Request(url, data=json.dumps(data).encode(), method="PATCH") req.add_header("Authorization", f"token {token}") req.add_header("Content-Type", "application/json") with urllib.request.urlopen(req, timeout=30) as resp: return json.loads(resp.read().decode()) ''', '''def has_bot_comment(comments: list[dict], bot_marker: str = "🤖") -> bool: return any(bot_marker in c.get("body", "") for c in comments) ''', '''import urllib.request import json def batch_label_issues(repo: str, numbers: list[int], label_id: int, token: str): owner, name = repo.split("/") for num in numbers: url = f"https://forge.example.com/api/v1/repos/{owner}/{name}/issues/{num}/labels" data = {"labels": [label_id]} req = urllib.request.Request(url, data=json.dumps(data).encode(), method="POST") req.add_header("Authorization", f"token {token}") req.add_header("Content-Type", "application/json") with urllib.request.urlopen(req, timeout=30): pass ''', '''import urllib.request import json def search_issues(org: str, keyword: str, token: str): url = f"https://forge.example.com/api/v1/repos/search?q={keyword}&uid=0&limit=50" req = urllib.request.Request(url) req.add_header("Authorization", f"token {token}") with urllib.request.urlopen(req, timeout=30) as resp: return json.loads(resp.read().decode()) ''', '''from datetime import datetime, timezone def issue_age_days(created_at: str) -> int: """Calculate age of an issue in days.""" fmt = "%Y-%m-%dT%H:%M:%SZ" created = datetime.strptime(created_at, 
fmt).replace(tzinfo=timezone.utc) return (datetime.now(timezone.utc) - created).days ''', '''import urllib.request import json def find_unassigned_issues(repo: str, token: str): owner, name = repo.split("/") url = f"https://forge.example.com/api/v1/repos/{owner}/{name}/issues?state=open&type=issues&limit=100" req = urllib.request.Request(url) req.add_header("Authorization", f"token {token}") with urllib.request.urlopen(req, timeout=30) as resp: issues = json.loads(resp.read().decode()) return [i for i in issues if not i.get("assignee") and not i.get("pull_request")] ''', '''from collections import Counter def count_issues_per_label(issues: list[dict]) -> Counter: counts = Counter() for i in issues: for label in i.get("labels", []): counts[label.get("name", "unknown")] += 1 return counts ''', '''import urllib.request import json def reopen_issue(repo: str, number: int, token: str): owner, name = repo.split("/") url = f"https://forge.example.com/api/v1/repos/{owner}/{name}/issues/{number}" data = {"state": "open"} req = urllib.request.Request(url, data=json.dumps(data).encode(), method="PATCH") req.add_header("Authorization", f"token {token}") req.add_header("Content-Type", "application/json") with urllib.request.urlopen(req, timeout=30) as resp: return json.loads(resp.read().decode()) ''', '''import urllib.request import json def get_issue_timeline(repo: str, number: int, token: str): owner, name = repo.split("/") url = f"https://forge.example.com/api/v1/repos/{owner}/{name}/issues/{number}/comments" req = urllib.request.Request(url) req.add_header("Authorization", f"token {token}") with urllib.request.urlopen(req, timeout=30) as resp: return json.loads(resp.read().decode()) ''', '''from datetime import datetime, timezone def find_stale_issues(issues: list[dict], days: int = 30) -> list[dict]: now = datetime.now(timezone.utc) stale = [] for i in issues: updated = datetime.fromisoformat(i["updated_at"].replace("Z", "+00:00")) if (now - updated).days > days: 
stale.append(i) return stale ''', '''import random def triage_issue(title: str, body: str) -> str: """Simple rule-based triage classifier.""" text = (title + " " + body).lower() if any(w in text for w in ["bug", "crash", "error", "fix"]): return "bug" if any(w in text for w in ["feature", "add", "implement", "support"]): return "feature" if any(w in text for w in ["docs", "documentation", "readme"]): return "documentation" return "ops" ''', '''import urllib.request import json def migrate_issue(source_repo: str, target_repo: str, issue: dict, token: str): owner, name = target_repo.split("/") url = f"https://forge.example.com/api/v1/repos/{owner}/{name}/issues" data = { "title": f"[Migrated] {issue['title']}", "body": issue.get("body", ""), } req = urllib.request.Request(url, data=json.dumps(data).encode(), method="POST") req.add_header("Authorization", f"token {token}") req.add_header("Content-Type", "application/json") with urllib.request.urlopen(req, timeout=30) as resp: return json.loads(resp.read().decode()) ''', '''def generate_issue_report(repo: str, issues: list[dict]) -> str: lines = [f"# Issue Report for {repo}", ""] for i in issues: lines.append(f"- #{i['number']} [{i['state']}] {i['title']}") return "\n".join(lines) ''', '''import urllib.request import json def lock_issue(repo: str, number: int, token: str): owner, name = repo.split("/") url = f"https://forge.example.com/api/v1/repos/{owner}/{name}/issues/{number}/lock" req = urllib.request.Request(url, data=json.dumps({}).encode(), method="POST") req.add_header("Authorization", f"token {token}") req.add_header("Content-Type", "application/json") with urllib.request.urlopen(req, timeout=30) as resp: return resp.status == 204 ''', '''import urllib.request def unlock_issue(repo: str, number: int, token: str): owner, name = repo.split("/") url = f"https://forge.example.com/api/v1/repos/{owner}/{name}/issues/{number}/lock" req = urllib.request.Request(url, method="DELETE") req.add_header("Authorization", 
f"token {token}") with urllib.request.urlopen(req, timeout=30) as resp: return resp.status == 204 ''', '''import urllib.request import json def subscribe_issue(repo: str, number: int, token: str): owner, name = repo.split("/") url = f"https://forge.example.com/api/v1/repos/{owner}/{name}/issues/{number}/subscriptions" req = urllib.request.Request(url, data=json.dumps({}).encode(), method="PUT") req.add_header("Authorization", f"token {token}") req.add_header("Content-Type", "application/json") with urllib.request.urlopen(req, timeout=30) as resp: return json.loads(resp.read().decode()) ''', '''import urllib.request def unsubscribe_issue(repo: str, number: int, token: str): owner, name = repo.split("/") url = f"https://forge.example.com/api/v1/repos/{owner}/{name}/issues/{number}/subscriptions" req = urllib.request.Request(url, method="DELETE") req.add_header("Authorization", f"token {token}") with urllib.request.urlopen(req, timeout=30) as resp: return resp.status == 204 ''', '''import urllib.request import json def set_milestone(repo: str, issue_number: int, milestone_id: int, token: str): owner, name = repo.split("/") url = f"https://forge.example.com/api/v1/repos/{owner}/{name}/issues/{issue_number}" data = {"milestone": milestone_id} req = urllib.request.Request(url, data=json.dumps(data).encode(), method="PATCH") req.add_header("Authorization", f"token {token}") req.add_header("Content-Type", "application/json") with urllib.request.urlopen(req, timeout=30) as resp: return json.loads(resp.read().decode()) ''', '''import urllib.request import json def remove_milestone(repo: str, issue_number: int, token: str): owner, name = repo.split("/") url = f"https://forge.example.com/api/v1/repos/{owner}/{name}/issues/{issue_number}" data = {"milestone": 0} req = urllib.request.Request(url, data=json.dumps(data).encode(), method="PATCH") req.add_header("Authorization", f"token {token}") req.add_header("Content-Type", "application/json") with urllib.request.urlopen(req, 
timeout=30) as resp: return json.loads(resp.read().decode()) ''', '''def find_issues_by_author(issues: list[dict], username: str) -> list[dict]: return [i for i in issues if i.get("user", {}).get("login") == username] ''', '''import urllib.request import json def update_issue_title(repo: str, number: int, new_title: str, token: str): owner, name = repo.split("/") url = f"https://forge.example.com/api/v1/repos/{owner}/{name}/issues/{number}" data = {"title": new_title} req = urllib.request.Request(url, data=json.dumps(data).encode(), method="PATCH") req.add_header("Authorization", f"token {token}") req.add_header("Content-Type", "application/json") with urllib.request.urlopen(req, timeout=30) as resp: return json.loads(resp.read().decode()) ''', ] # --------------------------------------------------------------------------- # Domain 4/4: pr_workflow # --------------------------------------------------------------------------- PR_WORKFLOW_PROBLEMS = [ "Write a Python function that creates a pull request on Gitea via API.", "Write a Python function that checks if an open pull request already exists for a given issue (by scanning title/body/branch).", "Write a Python function that lists all open pull requests for a Gitea repo.", "Write a Python function that merges a Gitea pull request with retry logic for merge conflicts.", "Write a Python function that posts a review comment on a Gitea pull request.", "Write a Python function that requests a specific user to review a Gitea pull request.", "Write a Python function that checks whether a Gitea pull request is mergeable.", "Write a Python function that rebases a feature branch onto the base branch before opening a PR.", "Write a Python function that updates the description of an existing Gitea pull request.", "Write a Python function that lists the files changed in a Gitea pull request.", "Write a Python function that calculates the net line change (additions - deletions) across all files in a PR.", "Write a Python 
function that detects destructive deletions in a PR (any file losing >50% of lines).", "Write a Python function that closes a pull request without merging it.", "Write a Python function that approves a Gitea pull request via API.", "Write a Python function that dismisses a review on a Gitea pull request.", "Write a Python function that gets the diff of a Gitea pull request as raw text.", "Write a Python function that triggers a CI pipeline by pushing an empty commit on a PR branch.", "Write a Python function that checks if a PR branch is behind the base branch and needs rebasing.", "Write a Python function that auto-squashes all commits on a PR branch before merging.", "Write a Python function that generates a release notes draft from merged PRs since the last tag.", "Write a Python function that adds a label to a Gitea pull request.", "Write a Python function that removes a label from a Gitea pull request.", "Write a Python function that blocks merging if a PR does not have at least one approved review.", "Write a Python function that fetches the commit history of a PR branch.", "Write a Python function that cherry-picks a commit from one branch to another via Gitea API.", ] PR_WORKFLOW_SOLUTIONS = [ '''import urllib.request import json def create_pull_request(repo: str, title: str, head: str, base: str, body: str, token: str): owner, name = repo.split("/") url = f"https://forge.example.com/api/v1/repos/{owner}/{name}/pulls" data = {"title": title, "head": head, "base": base, "body": body} req = urllib.request.Request(url, data=json.dumps(data).encode(), method="POST") req.add_header("Authorization", f"token {token}") req.add_header("Content-Type", "application/json") with urllib.request.urlopen(req, timeout=30) as resp: return json.loads(resp.read().decode()) ''', '''def issue_has_open_pr(prs: list[dict], issue_number: int) -> bool: num_str = str(issue_number) for pr in prs: combined = f"{pr.get('title','')} {pr.get('body','')} {pr.get('head',{}).get('ref','')}" 
if num_str in combined: return True return False ''', '''import urllib.request import json def list_open_prs(repo: str, token: str): owner, name = repo.split("/") url = f"https://forge.example.com/api/v1/repos/{owner}/{name}/pulls?state=open&limit=100" req = urllib.request.Request(url) req.add_header("Authorization", f"token {token}") with urllib.request.urlopen(req, timeout=30) as resp: return json.loads(resp.read().decode()) ''', '''import urllib.request import urllib.error import json import time def merge_pr(repo: str, number: int, token: str, method: str = "squash", retries: int = 3): owner, name = repo.split("/") url = f"https://forge.example.com/api/v1/repos/{owner}/{name}/pulls/{number}/merge" data = {"Do": method} for attempt in range(retries): req = urllib.request.Request(url, data=json.dumps(data).encode(), method="POST") req.add_header("Authorization", f"token {token}") req.add_header("Content-Type", "application/json") try: with urllib.request.urlopen(req, timeout=30) as resp: return True except urllib.error.HTTPError as e: if e.code == 405 and attempt < retries - 1: time.sleep(5) continue raise return False ''', '''import urllib.request import json def review_comment(repo: str, pr_number: int, body: str, token: str, path: str = "", position: int = 0): owner, name = repo.split("/") url = f"https://forge.example.com/api/v1/repos/{owner}/{name}/pulls/{pr_number}/reviews" data = {"body": body, "event": "COMMENT"} if path: data["comments"] = [{"path": path, "position": position, "body": body}] req = urllib.request.Request(url, data=json.dumps(data).encode(), method="POST") req.add_header("Authorization", f"token {token}") req.add_header("Content-Type", "application/json") with urllib.request.urlopen(req, timeout=30) as resp: return json.loads(resp.read().decode()) ''', '''import urllib.request import json def request_review(repo: str, pr_number: int, reviewers: list[str], token: str): owner, name = repo.split("/") url = 
f"https://forge.example.com/api/v1/repos/{owner}/{name}/pulls/{pr_number}/requested_reviewers" data = {"reviewers": reviewers} req = urllib.request.Request(url, data=json.dumps(data).encode(), method="POST") req.add_header("Authorization", f"token {token}") req.add_header("Content-Type", "application/json") with urllib.request.urlopen(req, timeout=30) as resp: return json.loads(resp.read().decode()) ''', '''import urllib.request import json def is_mergeable(repo: str, pr_number: int, token: str) -> bool: owner, name = repo.split("/") url = f"https://forge.example.com/api/v1/repos/{owner}/{name}/pulls/{pr_number}" req = urllib.request.Request(url) req.add_header("Authorization", f"token {token}") with urllib.request.urlopen(req, timeout=30) as resp: data = json.loads(resp.read().decode()) return data.get("mergeable", False) ''', '''import subprocess def rebase_branch(feature: str, base: str = "main"): subprocess.run(["git", "checkout", feature], check=True, capture_output=True) subprocess.run(["git", "fetch", "origin", base], check=True, capture_output=True) subprocess.run(["git", "rebase", f"origin/{base}"], check=True, capture_output=True) ''', '''import urllib.request import json def update_pr_description(repo: str, pr_number: int, new_body: str, token: str): owner, name = repo.split("/") url = f"https://forge.example.com/api/v1/repos/{owner}/{name}/pulls/{pr_number}" data = {"body": new_body} req = urllib.request.Request(url, data=json.dumps(data).encode(), method="PATCH") req.add_header("Authorization", f"token {token}") req.add_header("Content-Type", "application/json") with urllib.request.urlopen(req, timeout=30) as resp: return json.loads(resp.read().decode()) ''', '''import urllib.request import json def list_pr_files(repo: str, pr_number: int, token: str): owner, name = repo.split("/") url = f"https://forge.example.com/api/v1/repos/{owner}/{name}/pulls/{pr_number}/files" req = urllib.request.Request(url) req.add_header("Authorization", f"token {token}") 
with urllib.request.urlopen(req, timeout=30) as resp: return json.loads(resp.read().decode()) ''', '''def net_line_change(files: list[dict]) -> int: return sum(f.get("additions", 0) - f.get("deletions", 0) for f in files) ''', '''def detect_destructive(files: list[dict], threshold: float = 0.5) -> list[str]: flagged = [] for f in files: if f.get("status") == "modified" and f.get("deletions", 0) > 0: total = f.get("additions", 0) + f.get("deletions", 0) if total > 0 and f["deletions"] / total > threshold: flagged.append(f["filename"]) return flagged ''', '''import urllib.request import json def close_pr(repo: str, pr_number: int, token: str): owner, name = repo.split("/") url = f"https://forge.example.com/api/v1/repos/{owner}/{name}/pulls/{pr_number}" data = {"state": "closed"} req = urllib.request.Request(url, data=json.dumps(data).encode(), method="PATCH") req.add_header("Authorization", f"token {token}") req.add_header("Content-Type", "application/json") with urllib.request.urlopen(req, timeout=30) as resp: return json.loads(resp.read().decode()) ''', '''import urllib.request import json def approve_pr(repo: str, pr_number: int, token: str): owner, name = repo.split("/") url = f"https://forge.example.com/api/v1/repos/{owner}/{name}/pulls/{pr_number}/reviews" data = {"body": "LGTM", "event": "APPROVE"} req = urllib.request.Request(url, data=json.dumps(data).encode(), method="POST") req.add_header("Authorization", f"token {token}") req.add_header("Content-Type", "application/json") with urllib.request.urlopen(req, timeout=30) as resp: return json.loads(resp.read().decode()) ''', '''import urllib.request import json def dismiss_review(repo: str, pr_number: int, review_id: int, token: str, message: str = ""): owner, name = repo.split("/") url = f"https://forge.example.com/api/v1/repos/{owner}/{name}/pulls/{pr_number}/reviews/{review_id}/dismissals" data = {"message": message or "Dismissed"} req = urllib.request.Request(url, data=json.dumps(data).encode(), 
method="POST") req.add_header("Authorization", f"token {token}") req.add_header("Content-Type", "application/json") with urllib.request.urlopen(req, timeout=30) as resp: return json.loads(resp.read().decode()) ''', '''import urllib.request def get_pr_diff(repo: str, pr_number: int, token: str) -> str: owner, name = repo.split("/") url = f"https://forge.example.com/api/v1/repos/{owner}/{name}/pulls/{pr_number}.diff" req = urllib.request.Request(url) req.add_header("Authorization", f"token {token}") with urllib.request.urlopen(req, timeout=30) as resp: return resp.read().decode() ''', '''import subprocess def trigger_ci(repo_url: str, branch: str): subprocess.run( ["git", "commit", "--allow-empty", "-m", "ci: trigger pipeline"], check=True, capture_output=True, ) subprocess.run(["git", "push", repo_url, branch], check=True, capture_output=True) ''', '''import subprocess def is_behind(base: str, feature: str) -> bool: result = subprocess.run( ["git", "merge-base", f"origin/{base}", feature], capture_output=True, text=True, check=True, ) merge_base = result.stdout.strip() result2 = subprocess.run( ["git", "rev-parse", f"origin/{base}"], capture_output=True, text=True, check=True, ) return merge_base != result2.stdout.strip() ''', '''import subprocess def autosquash_branch(branch: str, base: str = "main"): subprocess.run(["git", "checkout", branch], check=True, capture_output=True) subprocess.run(["git", "reset", f"origin/{base}"], check=True, capture_output=True) subprocess.run(["git", "add", "-A"], check=True, capture_output=True) subprocess.run(["git", "commit", "-m", f"feat: squash {branch}"], check=True, capture_output=True) subprocess.run(["git", "push", "--force-with-lease", "origin", branch], check=True, capture_output=True) ''', '''def generate_release_notes(merged_prs: list[dict], tag: str) -> str: lines = [f"## Release {tag}", ""] for pr in merged_prs: lines.append(f"- #{pr['number']} {pr['title']}") return "\n".join(lines) ''', '''import urllib.request 
import json def add_pr_label(repo: str, pr_number: int, label_id: int, token: str): owner, name = repo.split("/") url = f"https://forge.example.com/api/v1/repos/{owner}/{name}/pulls/{pr_number}/labels" data = {"labels": [label_id]} req = urllib.request.Request(url, data=json.dumps(data).encode(), method="POST") req.add_header("Authorization", f"token {token}") req.add_header("Content-Type", "application/json") with urllib.request.urlopen(req, timeout=30) as resp: return json.loads(resp.read().decode()) ''', '''import urllib.request def remove_pr_label(repo: str, pr_number: int, label_id: int, token: str): owner, name = repo.split("/") url = f"https://forge.example.com/api/v1/repos/{owner}/{name}/pulls/{pr_number}/labels/{label_id}" req = urllib.request.Request(url, method="DELETE") req.add_header("Authorization", f"token {token}") with urllib.request.urlopen(req, timeout=30) as resp: return resp.status == 204 ''', '''def require_approval(pr: dict, reviews: list[dict]) -> bool: if pr.get("state") != "open": return False approvals = [r for r in reviews if r.get("state") == "APPROVED"] return len(approvals) >= 1 ''', '''import subprocess def get_pr_commits(branch: str, base: str = "main") -> list[str]: result = subprocess.run( ["git", "log", f"origin/{base}..{branch}", "--pretty=format:%H %s"], capture_output=True, text=True, check=True, ) return result.stdout.strip().split("\n") if result.stdout.strip() else [] ''', '''import urllib.request import json def cherry_pick_commit(repo: str, commit_sha: str, target_branch: str, token: str): owner, name = repo.split("/") url = f"https://forge.example.com/api/v1/repos/{owner}/{name}/git/commits" data = {"message": f"cherry-pick {commit_sha}", "parents": [target_branch], "tree": commit_sha} req = urllib.request.Request(url, data=json.dumps(data).encode(), method="POST") req.add_header("Authorization", f"token {token}") req.add_header("Content-Type", "application/json") with urllib.request.urlopen(req, timeout=30) as resp: 
# ---------------------------------------------------------------------------
# Variation generators
# ---------------------------------------------------------------------------

PROMPT_PREFIXES = [
    "",
    "Using only the Python standard library, ",
    "Write a robust, production-ready function that ",
    "Create a typed Python utility that ",
    "Implement a sovereign, zero-dependency helper that ",
    "Design a concise Python snippet that ",
    "Build a reusable module function that ",
    "Author a defensive Python routine that ",
    "Draft a minimal but complete solution that ",
    "Engineer a fault-tolerant Python method that ",
]

SUFFIXES = [
    " Include error handling.",
    " Add retry logic with exponential backoff.",
    " Return typed dataclasses where appropriate.",
    " Log all failures for observability.",
    " Validate inputs before making API calls.",
    " Handle empty responses gracefully.",
    " Include docstrings and type hints.",
    " Optimize for minimal external dependencies.",
    " Add timeout and connection cleanup.",
    " Surface clear exception messages on failure.",
]

# Lead-in used by the problem statements; it is dropped when the chosen prefix
# already ends in "... that " so the two do not stack up.
_PROBLEM_LEAD = "Write a Python function that "

# Number of pairs generated for each of the four domains.
_PAIRS_PER_DOMAIN = 250
_EXPECTED_TOTAL = 4 * _PAIRS_PER_DOMAIN

# Same value the module seeds the global RNG with; used for a private RNG so
# the shuffle does not depend on global random state.
_SHUFFLE_SEED = 591_000


# ---------------------------------------------------------------------------
# Build pairs
# ---------------------------------------------------------------------------

def _compose_prompt(prefix: str, problem: str, suffix: str) -> str:
    """Join prefix, problem and suffix into one grammatical prompt."""
    if not prefix:
        core = problem
    elif prefix.endswith("that ") and problem.startswith(_PROBLEM_LEAD):
        # "Create a typed Python utility that " + "pings a Gitea instance ..."
        core = problem[len(_PROBLEM_LEAD):]
    else:
        # "Using only the Python standard library, " + "write a Python ..."
        core = problem[:1].lower() + problem[1:]
    return f"{prefix}{core}{suffix}"


def build_domain_pairs(problems: list[str], solutions: list[str], domain: str, count: int) -> list[dict]:
    """Generate ``count`` prompt/response rows for one domain.

    Row ``idx`` uses problem ``idx % len(problems)`` (and the solution at the
    same index), prefix ``idx % len(PROMPT_PREFIXES)`` and suffix
    ``(idx // len(problems)) % len(SUFFIXES)``.

    Args:
        problems: Problem statements; ``problems[i]`` is answered by ``solutions[i]``.
        solutions: Response text for each problem.
        domain: Domain name, used in the row id, the ``domain`` field and the tags.
        count: Number of rows to produce.

    Returns:
        A list of ``count`` row dictionaries with ids ``<domain>_0001`` onwards.

    Raises:
        ValueError: If rows are requested but there are no problems, or if
            some problem has no solution at the same index.
    """
    if len(solutions) < len(problems):
        raise ValueError(
            f"{domain}: {len(problems)} problems but only {len(solutions)} solutions"
        )
    if count > 0 and not problems:
        raise ValueError(f"{domain}: cannot build {count} pairs from an empty problem list")

    n_problems = len(problems)
    pairs = []
    for idx in range(count):
        p_idx = idx % n_problems
        prefix = PROMPT_PREFIXES[idx % len(PROMPT_PREFIXES)]
        # The suffix advances once per full pass over the problem list.
        suffix = SUFFIXES[(idx // n_problems) % len(SUFFIXES)]
        pairs.append({
            "id": f"{domain}_{idx+1:04d}",
            "domain": domain,
            "prompt": _compose_prompt(prefix, problems[p_idx], suffix),
            "response": solutions[p_idx],
            "task_type": "code_pattern",
            "tags": [domain, "gitea", "forge", "python", "api"],
            "source_issue": 591,
        })
    return pairs


def build_all_pairs() -> list[dict]:
    """Build the rows for all four domains and return them in shuffled order.

    The shuffle uses a private ``random.Random`` seeded with a fixed value, so
    every call returns the same order regardless of the global RNG state.
    """
    domains = (
        (GITEA_API_PROBLEMS, GITEA_API_SOLUTIONS, "gitea_api"),
        (REPO_MANAGEMENT_PROBLEMS, REPO_MANAGEMENT_SOLUTIONS, "repo_management"),
        (ISSUE_AUTOMATION_PROBLEMS, ISSUE_AUTOMATION_SOLUTIONS, "issue_automation"),
        (PR_WORKFLOW_PROBLEMS, PR_WORKFLOW_SOLUTIONS, "pr_workflow"),
    )
    rows = []
    for problems, solutions, domain in domains:
        rows.extend(build_domain_pairs(problems, solutions, domain, _PAIRS_PER_DOMAIN))
    random.Random(_SHUFFLE_SEED).shuffle(rows)
    return rows


def write_dataset(rows: list[dict]):
    """Write ``rows`` as JSONL to both REPO_OUTPUT and HERMES_OUTPUT.

    Parent directories are created when missing and existing files are
    overwritten. One line is printed per file written.
    """
    # Serialise once; both destinations receive identical content.
    lines = [json.dumps(row, ensure_ascii=False) + "\n" for row in rows]
    for out_path in (REPO_OUTPUT, HERMES_OUTPUT):
        out_path.parent.mkdir(parents=True, exist_ok=True)
        with out_path.open("w", encoding="utf-8") as fh:
            fh.writelines(lines)
        print(f"Wrote {len(rows)} pairs to {out_path}")


def validate_dataset(rows: list[dict]) -> list[str]:
    """Return a list of human-readable problems found in ``rows``.

    A row is reported when its prompt or response is missing or blank, and
    when its response is shorter than 20 characters. Row numbers in the
    messages start at 1. An empty list means no problems were found.
    """
    errors = []
    for i, row in enumerate(rows, 1):
        prompt = row.get("prompt", "")
        response = row.get("response", "")
        if not prompt.strip():
            errors.append(f"Row {i}: empty prompt")
        if not response.strip():
            errors.append(f"Row {i}: empty response")
        if len(response) < 20:
            errors.append(f"Row {i}: response too short")
    return errors


def main() -> None:
    """Build, validate and write the dataset; exit non-zero on any failure."""
    rows = build_all_pairs()
    # Explicit check instead of ``assert`` so it still runs under ``python -O``.
    if len(rows) != _EXPECTED_TOTAL:
        raise SystemExit(f"Expected {_EXPECTED_TOTAL}, got {len(rows)}")
    errs = validate_dataset(rows)
    if errs:
        print("Validation errors:")
        for e in errs[:10]:
            print(" ", e)
        if len(errs) > 10:
            print(" ", f"... and {len(errs) - 10} more")
        raise SystemExit(1)
    write_dataset(rows)
    print("Done.")


if __name__ == "__main__":
    main()