Compare commits


1 commit

Author: Alexander Whitestone
SHA1: 1fed477af6
Message: feat: sovereign DNS record management (#692)
Checks: Smoke Test / smoke (pull_request) failing after 19s
Date: 2026-04-14 23:59:30 -04:00
7 changed files with 441 additions and 353 deletions


@@ -0,0 +1,13 @@
# Ansible-style variable file for sovereign DNS sync (#692)
# Copy to a private path and fill in provider credentials via env vars.
# Use `auto` to resolve the current VPS public IP at sync time.
dns_provider: cloudflare
# For Cloudflare: zone_id
# For Route53: hosted zone ID (also accepted under dns_zone_id)
dns_zone_id: your-zone-id
domain_ip_map:
  forge.alexanderwhitestone.com: auto
  matrix.alexanderwhitestone.com: auto
  timmy.alexanderwhitestone.com: auto
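The `auto` convention in the mapping above resolves to the VPS public IP at sync time. A minimal standalone sketch of that substitution (the function name `resolve_auto_entries` is illustrative; the PR's `scripts/sovereign_dns.py` implements the same idea as `resolve_domain_ip_map`):

```python
# Sketch: replace `auto` placeholders in a domain -> IP mapping with the
# detected public IP, leaving explicit IPs untouched.
def resolve_auto_entries(domain_ip_map: dict[str, str], public_ip: str) -> dict[str, str]:
    resolved = {}
    for domain, value in domain_ip_map.items():
        if isinstance(value, str) and value.strip().lower() == "auto":
            resolved[domain] = public_ip
        else:
            resolved[domain] = value
    return resolved

mapping = {"forge.example.com": "auto", "matrix.example.com": "5.6.7.8"}
print(resolve_auto_entries(mapping, "203.0.113.7"))
```

With the example mapping, only `forge.example.com` picks up the detected address; the hardcoded entry passes through unchanged.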


@@ -1,61 +0,0 @@
# [PHASE-1] Survival - Keep the Lights On
Phase 1 is the manual-clicker stage of the fleet. The machines exist. The services exist. The human is still the automation loop.
## Phase Definition
- Current state: fleet exists, agents run, everything important still depends on human vigilance.
- Resources tracked here: Capacity, Uptime.
- Next phase: [PHASE-2] Automation - Self-Healing Infrastructure
## Current Buildings
- VPS hosts: Ezra, Allegro, Bezalel
- Agents: Timmy harness, Code Claw heartbeat, Gemini AI Studio worker
- Gitea forge
- Evennia worlds
## Current Resource Snapshot
- Fleet operational: yes
- Uptime baseline: 0.0%
- Days at or above 95% uptime: 0
- Capacity utilization: 0.0%
## Next Phase Trigger
To unlock [PHASE-2] Automation - Self-Healing Infrastructure, the fleet must hold both of these conditions at once:
- Uptime >= 95% for 30 consecutive days
- Capacity utilization > 60%
- Current trigger state: NOT READY
## Missing Requirements
- Uptime 0.0% / 95.0%
- Days at or above 95% uptime: 0/30
- Capacity utilization 0.0% / >60.0%
## Manual Clicker Interpretation
Paperclips analogy: Phase 1 = Manual clicker. You ARE the automation.
Every restart, every SSH, every check is a manual click.
## Manual Clicks Still Required
- Restart agents and services by hand when a node goes dark.
- SSH into machines to verify health, disk, and memory.
- Check Gitea, relay, and world services manually before and after changes.
- Act as the scheduler when automation is missing or only partially wired.
## Repo Signals Already Present
- `scripts/fleet_health_probe.sh` — Automated health probe exists and can supply the uptime baseline for the next phase.
- `scripts/fleet_milestones.py` — Milestone tracker exists, so survival achievements can be narrated and logged.
- `scripts/auto_restart_agent.sh` — Auto-restart tooling already exists as phase-2 groundwork.
- `scripts/backup_pipeline.sh` — Backup pipeline scaffold exists for post-survival automation work.
- `infrastructure/timmy-bridge/reports/generate_report.py` — Bridge reporting exists and can summarize heartbeat-driven uptime.
## Notes
- The fleet is alive, but the human is still the control loop.
- Phase 1 is about naming reality plainly so later automation has a baseline to beat.
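The "Next Phase Trigger" gate in this doc reduces to a single predicate. A minimal sketch using the thresholds stated above (the function name `phase2_ready` is illustrative, not from the repo):

```python
# Sketch of the phase-2 unlock gate: uptime held at or above 95% for 30
# consecutive days AND capacity utilization strictly above 60%.
def phase2_ready(uptime_percent: float, days_at_or_above_95: int,
                 capacity_percent: float) -> bool:
    return (
        uptime_percent >= 95.0
        and days_at_or_above_95 >= 30
        and capacity_percent > 60.0
    )

print(phase2_ready(0.0, 0, 0.0))     # False: the current snapshot, NOT READY
print(phase2_ready(97.5, 31, 65.0))  # True
```

Note that capacity is a strict inequality, so sitting exactly at 60% does not unlock the phase.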


@@ -12,7 +12,6 @@ Quick-reference index for common operational tasks across the Timmy Foundation i
| Check fleet health | fleet-ops | `python3 scripts/fleet_readiness.py` |
| Agent scorecard | fleet-ops | `python3 scripts/agent_scorecard.py` |
| View fleet manifest | fleet-ops | `cat manifest.yaml` |
| Render Phase-1 survival report | timmy-home | `python3 scripts/fleet_phase_status.py --output docs/FLEET_PHASE_1_SURVIVAL.md` |
## the-nexus (Frontend + Brain)


@@ -1,224 +0,0 @@
#!/usr/bin/env python3
"""Render the current fleet survival phase as a durable report."""
from __future__ import annotations
import argparse
import json
from copy import deepcopy
from pathlib import Path
from typing import Any
PHASE_NAME = "[PHASE-1] Survival - Keep the Lights On"
NEXT_PHASE_NAME = "[PHASE-2] Automation - Self-Healing Infrastructure"
TARGET_UPTIME_PERCENT = 95.0
TARGET_UPTIME_DAYS = 30
TARGET_CAPACITY_PERCENT = 60.0
DEFAULT_BUILDINGS = [
    "VPS hosts: Ezra, Allegro, Bezalel",
    "Agents: Timmy harness, Code Claw heartbeat, Gemini AI Studio worker",
    "Gitea forge",
    "Evennia worlds",
]
DEFAULT_MANUAL_CLICKS = [
    "Restart agents and services by hand when a node goes dark.",
    "SSH into machines to verify health, disk, and memory.",
    "Check Gitea, relay, and world services manually before and after changes.",
    "Act as the scheduler when automation is missing or only partially wired.",
]
REPO_SIGNAL_FILES = {
    "scripts/fleet_health_probe.sh": "Automated health probe exists and can supply the uptime baseline for the next phase.",
    "scripts/fleet_milestones.py": "Milestone tracker exists, so survival achievements can be narrated and logged.",
    "scripts/auto_restart_agent.sh": "Auto-restart tooling already exists as phase-2 groundwork.",
    "scripts/backup_pipeline.sh": "Backup pipeline scaffold exists for post-survival automation work.",
    "infrastructure/timmy-bridge/reports/generate_report.py": "Bridge reporting exists and can summarize heartbeat-driven uptime.",
}
DEFAULT_SNAPSHOT = {
    "fleet_operational": True,
    "resources": {
        "uptime_percent": 0.0,
        "days_at_or_above_95_percent": 0,
        "capacity_utilization_percent": 0.0,
    },
    "current_buildings": DEFAULT_BUILDINGS,
    "manual_clicks": DEFAULT_MANUAL_CLICKS,
    "notes": [
        "The fleet is alive, but the human is still the control loop.",
        "Phase 1 is about naming reality plainly so later automation has a baseline to beat.",
    ],
}
def default_snapshot() -> dict[str, Any]:
    return deepcopy(DEFAULT_SNAPSHOT)


def _deep_merge(base: dict[str, Any], override: dict[str, Any]) -> dict[str, Any]:
    result = deepcopy(base)
    for key, value in override.items():
        if isinstance(value, dict) and isinstance(result.get(key), dict):
            result[key] = _deep_merge(result[key], value)
        else:
            result[key] = value
    return result


def load_snapshot(snapshot_path: Path | None = None) -> dict[str, Any]:
    snapshot = default_snapshot()
    if snapshot_path is None:
        return snapshot
    override = json.loads(snapshot_path.read_text(encoding="utf-8"))
    return _deep_merge(snapshot, override)


def collect_repo_signals(repo_root: Path) -> list[str]:
    signals: list[str] = []
    for rel_path, description in REPO_SIGNAL_FILES.items():
        if (repo_root / rel_path).exists():
            signals.append(f"`{rel_path}` — {description}")
    return signals

def compute_phase_status(snapshot: dict[str, Any], repo_root: Path | None = None) -> dict[str, Any]:
    repo_root = repo_root or Path(__file__).resolve().parents[1]
    resources = snapshot.get("resources", {})
    uptime_percent = float(resources.get("uptime_percent", 0.0))
    uptime_days = int(resources.get("days_at_or_above_95_percent", 0))
    capacity_percent = float(resources.get("capacity_utilization_percent", 0.0))
    fleet_operational = bool(snapshot.get("fleet_operational", False))
    missing: list[str] = []
    if not fleet_operational:
        missing.append("Fleet operational flag is false.")
    if uptime_percent < TARGET_UPTIME_PERCENT:
        missing.append(f"Uptime {uptime_percent:.1f}% / {TARGET_UPTIME_PERCENT:.1f}%")
    if uptime_days < TARGET_UPTIME_DAYS:
        missing.append(f"Days at or above 95% uptime: {uptime_days}/{TARGET_UPTIME_DAYS}")
    if capacity_percent <= TARGET_CAPACITY_PERCENT:
        missing.append(f"Capacity utilization {capacity_percent:.1f}% / >{TARGET_CAPACITY_PERCENT:.1f}%")
    return {
        "title": PHASE_NAME,
        "current_phase": "PHASE-1 Survival",
        "fleet_operational": fleet_operational,
        "resources": {
            "uptime_percent": uptime_percent,
            "days_at_or_above_95_percent": uptime_days,
            "capacity_utilization_percent": capacity_percent,
        },
        "current_buildings": list(snapshot.get("current_buildings", DEFAULT_BUILDINGS)),
        "manual_clicks": list(snapshot.get("manual_clicks", DEFAULT_MANUAL_CLICKS)),
        "notes": list(snapshot.get("notes", [])),
        "repo_signals": collect_repo_signals(repo_root),
        "next_phase": NEXT_PHASE_NAME,
        "next_phase_ready": fleet_operational and not missing,
        "missing_requirements": missing,
    }

def render_markdown(status: dict[str, Any]) -> str:
    resources = status["resources"]
    missing = status["missing_requirements"]
    ready_line = "READY" if status["next_phase_ready"] else "NOT READY"
    lines = [
        f"# {status['title']}",
        "",
        "Phase 1 is the manual-clicker stage of the fleet. The machines exist. The services exist. The human is still the automation loop.",
        "",
        "## Phase Definition",
        "",
        "- Current state: fleet exists, agents run, everything important still depends on human vigilance.",
        "- Resources tracked here: Capacity, Uptime.",
        f"- Next phase: {status['next_phase']}",
        "",
        "## Current Buildings",
        "",
    ]
    lines.extend(f"- {item}" for item in status["current_buildings"])
    lines.extend([
        "",
        "## Current Resource Snapshot",
        "",
        f"- Fleet operational: {'yes' if status['fleet_operational'] else 'no'}",
        f"- Uptime baseline: {resources['uptime_percent']:.1f}%",
        f"- Days at or above 95% uptime: {resources['days_at_or_above_95_percent']}",
        f"- Capacity utilization: {resources['capacity_utilization_percent']:.1f}%",
        "",
        "## Next Phase Trigger",
        "",
        f"To unlock {status['next_phase']}, the fleet must hold both of these conditions at once:",
        f"- Uptime >= {TARGET_UPTIME_PERCENT:.0f}% for {TARGET_UPTIME_DAYS} consecutive days",
        f"- Capacity utilization > {TARGET_CAPACITY_PERCENT:.0f}%",
        f"- Current trigger state: {ready_line}",
        "",
        "## Missing Requirements",
        "",
    ])
    if missing:
        lines.extend(f"- {item}" for item in missing)
    else:
        lines.append("- None. Phase 2 can unlock now.")
    lines.extend([
        "",
        "## Manual Clicker Interpretation",
        "",
        "Paperclips analogy: Phase 1 = Manual clicker. You ARE the automation.",
        "Every restart, every SSH, every check is a manual click.",
        "",
        "## Manual Clicks Still Required",
        "",
    ])
    lines.extend(f"- {item}" for item in status["manual_clicks"])
    lines.extend([
        "",
        "## Repo Signals Already Present",
        "",
    ])
    if status["repo_signals"]:
        lines.extend(f"- {item}" for item in status["repo_signals"])
    else:
        lines.append("- No survival-adjacent repo signals detected.")
    if status["notes"]:
        lines.extend(["", "## Notes", ""])
        lines.extend(f"- {item}" for item in status["notes"])
    return "\n".join(lines).rstrip() + "\n"

def main() -> None:
    parser = argparse.ArgumentParser(description="Render the fleet phase-1 survival report")
    parser.add_argument("--snapshot", help="Optional JSON snapshot overriding the default phase-1 baseline")
    parser.add_argument("--output", help="Write markdown report to this path")
    parser.add_argument("--json", action="store_true", help="Print computed status as JSON instead of markdown")
    args = parser.parse_args()
    snapshot = load_snapshot(Path(args.snapshot).expanduser() if args.snapshot else None)
    repo_root = Path(__file__).resolve().parents[1]
    status = compute_phase_status(snapshot, repo_root=repo_root)
    if args.json:
        rendered = json.dumps(status, indent=2)
    else:
        rendered = render_markdown(status)
    if args.output:
        output_path = Path(args.output).expanduser()
        output_path.parent.mkdir(parents=True, exist_ok=True)
        output_path.write_text(rendered, encoding="utf-8")
        print(f"Phase status written to {output_path}")
    else:
        print(rendered)


if __name__ == "__main__":
    main()
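The `--snapshot` override path above relies on a recursive merge: nested dicts merge key by key, everything else is replaced outright, and the base is never mutated. A standalone illustration of those semantics (the name `deep_merge` and the sample values are mine, mirroring the script's `_deep_merge`):

```python
from copy import deepcopy

# Standalone illustration of the deep-merge semantics used by load_snapshot:
# nested dicts merge key-by-key, scalars and lists are replaced outright.
def deep_merge(base: dict, override: dict) -> dict:
    result = deepcopy(base)
    for key, value in override.items():
        if isinstance(value, dict) and isinstance(result.get(key), dict):
            result[key] = deep_merge(result[key], value)
        else:
            result[key] = value
    return result

base = {"resources": {"uptime_percent": 0.0, "days_at_or_above_95_percent": 0}}
print(deep_merge(base, {"resources": {"uptime_percent": 96.2}}))
```

Because the merge recurses, a snapshot JSON only needs to carry the keys it changes; sibling defaults like `days_at_or_above_95_percent` survive untouched.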

scripts/sovereign_dns.py

@@ -0,0 +1,265 @@
#!/usr/bin/env python3
"""Sovereign DNS management for fleet domains.
Supports:
- Cloudflare via REST API token
- Route53 via boto3-compatible client (or injected client in tests)
- add / update / delete A records
- sync mode using an Ansible-style domain -> IP mapping YAML
"""
from __future__ import annotations
import argparse
import json
import os
import urllib.parse
import urllib.request
from pathlib import Path
from typing import Callable
import yaml
DEFAULT_MAPPING_PATH = Path('configs/dns_records.example.yaml')
def load_domain_mapping(path: str | Path) -> dict:
    data = yaml.safe_load(Path(path).read_text()) or {}
    if not isinstance(data, dict):
        raise ValueError('mapping file must contain a YAML object')
    data.setdefault('domain_ip_map', {})
    if not isinstance(data['domain_ip_map'], dict):
        raise ValueError('domain_ip_map must be a mapping of domain -> IPv4')
    return data


def detect_public_ip(urlopen_fn=urllib.request.urlopen, service_url: str = 'https://api.ipify.org') -> str:
    req = urllib.request.Request(service_url, headers={'User-Agent': 'sovereign-dns/1.0'})
    with urlopen_fn(req, timeout=10) as resp:
        return resp.read().decode().strip()


def resolve_domain_ip_map(domain_ip_map: dict[str, str], current_public_ip: str) -> dict[str, str]:
    resolved = {}
    for domain, value in domain_ip_map.items():
        if isinstance(value, str) and value.strip().lower() in {'auto', '__public_ip__', '$public_ip'}:
            resolved[domain] = current_public_ip
        else:
            resolved[domain] = value
    return resolved


def build_sync_plan(current: dict[str, dict], desired: dict[str, str]) -> dict[str, list[dict]]:
    create: list[dict] = []
    update: list[dict] = []
    delete: list[dict] = []
    for name, ip in desired.items():
        existing = current.get(name)
        if existing is None:
            create.append({'name': name, 'content': ip})
        elif existing.get('content') != ip:
            update.append({'name': name, 'id': existing.get('id'), 'content': ip})
    for name, record in current.items():
        if name not in desired:
            delete.append({'name': name, 'id': record.get('id')})
    return {'create': create, 'update': update, 'delete': delete}

class CloudflareDNSProvider:
    def __init__(self, api_token: str, zone_id: str, request_fn: Callable | None = None):
        self.api_token = api_token
        self.zone_id = zone_id
        self.request_fn = request_fn or self._request

    def _request(self, method: str, path: str, payload: dict | None = None) -> dict:
        url = 'https://api.cloudflare.com/client/v4' + path
        data = None if payload is None else json.dumps(payload).encode()
        req = urllib.request.Request(
            url,
            data=data,
            method=method,
            headers={
                'Authorization': f'Bearer {self.api_token}',
                'Content-Type': 'application/json',
            },
        )
        with urllib.request.urlopen(req, timeout=30) as resp:
            return json.loads(resp.read().decode())

    def list_a_records(self) -> dict[str, dict]:
        path = f'/zones/{self.zone_id}/dns_records?type=A&per_page=500'
        data = self.request_fn('GET', path)
        return {item['name']: {'id': item['id'], 'content': item['content']} for item in data.get('result', [])}

    def upsert_a_record(self, name: str, content: str) -> dict:
        lookup_path = f'/zones/{self.zone_id}/dns_records?type=A&name={urllib.parse.quote(name)}'
        existing = self.request_fn('GET', lookup_path).get('result', [])
        payload = {'type': 'A', 'name': name, 'content': content, 'ttl': 120, 'proxied': False}
        if existing:
            return self.request_fn('PUT', f"/zones/{self.zone_id}/dns_records/{existing[0]['id']}", payload)
        return self.request_fn('POST', f'/zones/{self.zone_id}/dns_records', payload)

    def delete_record(self, record_id: str) -> dict:
        return self.request_fn('DELETE', f'/zones/{self.zone_id}/dns_records/{record_id}')

    def apply_plan(self, create: list[dict], update: list[dict], delete: list[dict], current: dict[str, dict] | None = None) -> dict:
        results = {'created': [], 'updated': [], 'deleted': []}
        for item in create:
            self.upsert_a_record(item['name'], item['content'])
            results['created'].append(item['name'])
        for item in update:
            self.upsert_a_record(item['name'], item['content'])
            results['updated'].append(item['name'])
        current = current or {}
        for item in delete:
            record_id = item.get('id') or current.get(item['name'], {}).get('id')
            if record_id:
                self.delete_record(record_id)
                results['deleted'].append(item['name'])
        return results

class Route53DNSProvider:
    def __init__(self, hosted_zone_id: str, client=None):
        self.hosted_zone_id = hosted_zone_id
        if client is None:
            import boto3  # optional runtime dependency
            client = boto3.client('route53')
        self.client = client

    def list_a_records(self) -> dict[str, dict]:
        data = self.client.list_resource_record_sets(HostedZoneId=self.hosted_zone_id)
        result = {}
        for item in data.get('ResourceRecordSets', []):
            if item.get('Type') != 'A':
                continue
            name = item['Name'].rstrip('.')
            values = item.get('ResourceRecords', [])
            if values:
                result[name] = {'content': values[0]['Value']}
        return result

    def apply_plan(self, create: list[dict], update: list[dict], delete: list[dict], current: dict[str, dict] | None = None) -> dict:
        current = current or {}
        changes = []
        for item in create:
            changes.append({
                'Action': 'CREATE',
                'ResourceRecordSet': {
                    'Name': item['name'],
                    'Type': 'A',
                    'TTL': 120,
                    'ResourceRecords': [{'Value': item['content']}],
                },
            })
        for item in update:
            changes.append({
                'Action': 'UPSERT',
                'ResourceRecordSet': {
                    'Name': item['name'],
                    'Type': 'A',
                    'TTL': 120,
                    'ResourceRecords': [{'Value': item['content']}],
                },
            })
        for item in delete:
            old = current.get(item['name'], {})
            if old.get('content'):
                changes.append({
                    'Action': 'DELETE',
                    'ResourceRecordSet': {
                        'Name': item['name'],
                        'Type': 'A',
                        'TTL': 120,
                        'ResourceRecords': [{'Value': old['content']}],
                    },
                })
        if changes:
            self.client.change_resource_record_sets(
                HostedZoneId=self.hosted_zone_id,
                ChangeBatch={'Changes': changes, 'Comment': 'sovereign_dns sync'},
            )
        return {'changes': changes}

def build_provider(provider_name: str, zone_id: str, api_token: str | None = None):
    provider_name = provider_name.lower()
    if provider_name == 'cloudflare':
        if not api_token:
            raise ValueError('Cloudflare requires api_token')
        return CloudflareDNSProvider(api_token=api_token, zone_id=zone_id)
    if provider_name == 'route53':
        return Route53DNSProvider(hosted_zone_id=zone_id)
    raise ValueError(f'Unsupported provider: {provider_name}')

def main() -> int:
    parser = argparse.ArgumentParser(description='Manage sovereign DNS A records via provider APIs')
    sub = parser.add_subparsers(dest='command', required=True)
    sync_p = sub.add_parser('sync', help='Sync desired domain->IP mapping to provider')
    sync_p.add_argument('--mapping', default=str(DEFAULT_MAPPING_PATH))
    sync_p.add_argument('--provider')
    sync_p.add_argument('--zone-id')
    sync_p.add_argument('--api-token-env', default='CLOUDFLARE_API_TOKEN')
    sync_p.add_argument('--public-ip-url', default='https://api.ipify.org')
    upsert_p = sub.add_parser('upsert', help='Create or update a single A record')
    upsert_p.add_argument('--provider', required=True)
    upsert_p.add_argument('--zone-id', required=True)
    upsert_p.add_argument('--name', required=True)
    upsert_p.add_argument('--content', required=True)
    upsert_p.add_argument('--api-token-env', default='CLOUDFLARE_API_TOKEN')
    delete_p = sub.add_parser('delete', help='Delete a single A record')
    delete_p.add_argument('--provider', required=True)
    delete_p.add_argument('--zone-id', required=True)
    delete_p.add_argument('--name', required=True)
    delete_p.add_argument('--api-token-env', default='CLOUDFLARE_API_TOKEN')
    args = parser.parse_args()
    if args.command == 'sync':
        cfg = load_domain_mapping(args.mapping)
        provider_name = args.provider or cfg.get('dns_provider', 'cloudflare')
        zone_id = args.zone_id or cfg.get('dns_zone_id') or cfg.get('hosted_zone_id')
        token = os.environ.get(args.api_token_env, '')
        provider = build_provider(provider_name, zone_id=zone_id, api_token=token)
        current = provider.list_a_records()
        public_ip = detect_public_ip(service_url=args.public_ip_url)
        desired = resolve_domain_ip_map(cfg['domain_ip_map'], current_public_ip=public_ip)
        plan = build_sync_plan(current=current, desired=desired)
        result = provider.apply_plan(**plan, current=current)
        print(json.dumps({'provider': provider_name, 'zone_id': zone_id, 'public_ip': public_ip, 'plan': plan, 'result': result}, indent=2))
        return 0
    if args.command == 'upsert':
        token = os.environ.get(args.api_token_env, '')
        provider = build_provider(args.provider, zone_id=args.zone_id, api_token=token)
        if isinstance(provider, CloudflareDNSProvider):
            result = provider.upsert_a_record(args.name, args.content)
        else:
            # Route53 has no single-record upsert helper; route through apply_plan's UPSERT batch.
            result = provider.apply_plan(create=[], update=[{'name': args.name, 'content': args.content}], delete=[])
        print(json.dumps(result, indent=2))
        return 0
    if args.command == 'delete':
        token = os.environ.get(args.api_token_env, '')
        provider = build_provider(args.provider, zone_id=args.zone_id, api_token=token)
        current = provider.list_a_records()
        record = current.get(args.name)
        if not record:
            raise SystemExit(f'No A record found for {args.name}')
        if isinstance(provider, CloudflareDNSProvider):
            result = provider.delete_record(record['id'])
        else:
            result = provider.apply_plan(create=[], update=[], delete=[{'name': args.name}], current=current)
        print(json.dumps(result, indent=2))
        return 0
    raise SystemExit('Unknown command')


if __name__ == '__main__':
    raise SystemExit(main())
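The heart of sync mode is a three-way diff between current and desired records. A condensed restatement of that contract (the name `diff_records` and the sample records are illustrative; the script's `build_sync_plan` carries the same logic):

```python
# Condensed restatement of the sync planning: desired names missing from
# current are created, changed IPs are updated, and current names absent
# from the desired mapping are deleted.
def diff_records(current: dict[str, dict], desired: dict[str, str]) -> dict[str, list]:
    create = [{"name": n, "content": ip} for n, ip in desired.items() if n not in current]
    update = [
        {"name": n, "id": current[n].get("id"), "content": ip}
        for n, ip in desired.items()
        if n in current and current[n].get("content") != ip
    ]
    delete = [{"name": n, "id": r.get("id")} for n, r in current.items() if n not in desired]
    return {"create": create, "update": update, "delete": delete}

plan = diff_records(
    {"old.example.com": {"id": "r1", "content": "9.9.9.9"}},
    {"new.example.com": "3.3.3.3"},
)
print(plan)
```

One consequence of this design worth noting: any A record in the zone that is not listed in `domain_ip_map` gets scheduled for deletion, so the mapping file must be the complete source of truth for the zone's A records.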


@@ -1,67 +0,0 @@
from __future__ import annotations
import importlib.util
from pathlib import Path
ROOT = Path(__file__).resolve().parents[1]
SCRIPT_PATH = ROOT / "scripts" / "fleet_phase_status.py"
DOC_PATH = ROOT / "docs" / "FLEET_PHASE_1_SURVIVAL.md"
def _load_module(path: Path, name: str):
    assert path.exists(), f"missing {path.relative_to(ROOT)}"
    spec = importlib.util.spec_from_file_location(name, path)
    assert spec and spec.loader
    module = importlib.util.module_from_spec(spec)
    spec.loader.exec_module(module)
    return module


def test_compute_phase_status_tracks_survival_gate_requirements() -> None:
    mod = _load_module(SCRIPT_PATH, "fleet_phase_status")
    status = mod.compute_phase_status(
        {
            "fleet_operational": True,
            "resources": {
                "uptime_percent": 94.5,
                "days_at_or_above_95_percent": 12,
                "capacity_utilization_percent": 45.0,
            },
        }
    )
    assert status["current_phase"] == "PHASE-1 Survival"
    assert status["next_phase_ready"] is False
    assert any("94.5% / 95.0%" in item for item in status["missing_requirements"])
    assert any("12/30" in item for item in status["missing_requirements"])
    assert any("45.0% / >60.0%" in item for item in status["missing_requirements"])


def test_render_markdown_preserves_phase_buildings_and_manual_clicker_language() -> None:
    mod = _load_module(SCRIPT_PATH, "fleet_phase_status")
    status = mod.compute_phase_status(mod.default_snapshot())
    report = mod.render_markdown(status)
    for snippet in (
        "# [PHASE-1] Survival - Keep the Lights On",
        "VPS hosts: Ezra, Allegro, Bezalel",
        "Timmy harness",
        "Gitea forge",
        "Evennia worlds",
        "Every restart, every SSH, every check is a manual click.",
    ):
        assert snippet in report


def test_repo_contains_generated_phase_1_doc() -> None:
    assert DOC_PATH.exists(), "missing committed phase-1 survival doc"
    text = DOC_PATH.read_text(encoding="utf-8")
    for snippet in (
        "# [PHASE-1] Survival - Keep the Lights On",
        "## Current Buildings",
        "## Next Phase Trigger",
        "## Manual Clicker Interpretation",
    ):
        assert snippet in text

tests/test_sovereign_dns.py

@@ -0,0 +1,163 @@
import json
import sys
from pathlib import Path
from types import SimpleNamespace
sys.path.insert(0, str(Path(__file__).resolve().parent.parent / 'scripts'))
from sovereign_dns import (
    CloudflareDNSProvider,
    Route53DNSProvider,
    build_sync_plan,
    detect_public_ip,
    load_domain_mapping,
    resolve_domain_ip_map,
)
def test_load_domain_mapping_reads_ansible_style_domain_to_ip_map(tmp_path):
    cfg = tmp_path / 'dns_records.yaml'
    cfg.write_text(
        """
dns_provider: cloudflare
dns_zone_id: zone-123
domain_ip_map:
  forge.example.com: 1.2.3.4
  matrix.example.com: 5.6.7.8
"""
    )
    loaded = load_domain_mapping(cfg)
    assert loaded['dns_provider'] == 'cloudflare'
    assert loaded['dns_zone_id'] == 'zone-123'
    assert loaded['domain_ip_map'] == {
        'forge.example.com': '1.2.3.4',
        'matrix.example.com': '5.6.7.8',
    }


def test_build_sync_plan_updates_changed_ip_and_creates_missing_records():
    current = {
        'forge.example.com': {'id': 'rec-1', 'content': '1.1.1.1'},
        'old.example.com': {'id': 'rec-2', 'content': '9.9.9.9'},
    }
    desired = {
        'forge.example.com': '2.2.2.2',
        'new.example.com': '3.3.3.3',
    }
    plan = build_sync_plan(current=current, desired=desired)
    assert plan['update'] == [
        {'name': 'forge.example.com', 'id': 'rec-1', 'content': '2.2.2.2'}
    ]
    assert plan['create'] == [
        {'name': 'new.example.com', 'content': '3.3.3.3'}
    ]
    assert plan['delete'] == [
        {'name': 'old.example.com', 'id': 'rec-2'}
    ]


def test_resolve_domain_ip_map_replaces_auto_values_with_detected_public_ip():
    resolved = resolve_domain_ip_map(
        {
            'forge.example.com': 'auto',
            'matrix.example.com': '5.6.7.8',
        },
        current_public_ip='8.8.4.4',
    )
    assert resolved == {
        'forge.example.com': '8.8.4.4',
        'matrix.example.com': '5.6.7.8',
    }


def test_detect_public_ip_reads_provider_response():
    class FakeResponse:
        def __enter__(self):
            return self

        def __exit__(self, exc_type, exc, tb):
            return False

        def read(self):
            return b'4.3.2.1\n'

    ip = detect_public_ip(lambda req, timeout=10: FakeResponse())
    assert ip == '4.3.2.1'

def test_cloudflare_upsert_calls_expected_http_methods():
    calls = []

    def fake_request(method, path, payload=None):
        calls.append({'method': method, 'path': path, 'payload': payload})
        if method == 'GET':
            return {'success': True, 'result': []}
        return {'success': True, 'result': {'id': 'created-id'}}

    provider = CloudflareDNSProvider(
        api_token='tok',
        zone_id='zone-1',
        request_fn=fake_request,
    )
    provider.upsert_a_record('forge.example.com', '1.2.3.4')
    assert calls[0]['method'] == 'GET'
    assert calls[0]['path'] == '/zones/zone-1/dns_records?type=A&name=forge.example.com'
    assert calls[1]['method'] == 'POST'
    assert calls[1]['path'] == '/zones/zone-1/dns_records'
    assert calls[1]['payload']['name'] == 'forge.example.com'
    assert calls[1]['payload']['content'] == '1.2.3.4'
    assert calls[1]['payload']['type'] == 'A'


def test_cloudflare_upsert_updates_when_record_exists():
    calls = []

    def fake_request(method, path, payload=None):
        calls.append({'method': method, 'path': path, 'payload': payload})
        if method == 'GET':
            return {'success': True, 'result': [{'id': 'rec-123', 'content': '1.1.1.1'}]}
        return {'success': True, 'result': {'id': 'rec-123'}}

    provider = CloudflareDNSProvider(
        api_token='tok',
        zone_id='zone-1',
        request_fn=fake_request,
    )
    provider.upsert_a_record('forge.example.com', '2.2.2.2')
    assert calls[1]['method'] == 'PUT'
    assert calls[1]['path'] == '/zones/zone-1/dns_records/rec-123'
    assert calls[1]['payload']['content'] == '2.2.2.2'

def test_route53_sync_uses_change_batches():
    batches = []

    class FakeClient:
        def change_resource_record_sets(self, HostedZoneId, ChangeBatch):
            batches.append({'HostedZoneId': HostedZoneId, 'ChangeBatch': ChangeBatch})
            return {'ChangeInfo': {'Status': 'PENDING'}}

    provider = Route53DNSProvider(hosted_zone_id='ZONE123', client=FakeClient())
    provider.apply_plan(
        create=[{'name': 'new.example.com', 'content': '3.3.3.3'}],
        update=[{'name': 'forge.example.com', 'id': 'ignored', 'content': '2.2.2.2'}],
        delete=[{'name': 'old.example.com', 'id': 'ignored'}],
        current={'old.example.com': {'content': '9.9.9.9'}},
    )
    batch = batches[0]
    assert batch['HostedZoneId'] == 'ZONE123'
    changes = batch['ChangeBatch']['Changes']
    assert changes[0]['Action'] == 'CREATE'
    assert changes[0]['ResourceRecordSet']['Name'] == 'new.example.com'
    assert changes[1]['Action'] == 'UPSERT'
    assert changes[1]['ResourceRecordSet']['Name'] == 'forge.example.com'
    assert changes[2]['Action'] == 'DELETE'
    assert changes[2]['ResourceRecordSet']['Name'] == 'old.example.com'