Compare commits

...

1 Commits

Author SHA1 Message Date
Alexander Whitestone
1fed477af6 feat: sovereign DNS record management (#692)
Some checks failed
Smoke Test / smoke (pull_request) Failing after 19s
2026-04-14 23:59:30 -04:00
3 changed files with 441 additions and 0 deletions

View File

@@ -0,0 +1,13 @@
# Ansible-style variable file for sovereign DNS sync (#692)
# Copy to a private path and fill in provider credentials via env vars.
# Use `auto` to resolve the current VPS public IP at sync time.
dns_provider: cloudflare
# For Cloudflare: zone_id
# For Route53: hosted zone ID (also accepted under dns_zone_id)
dns_zone_id: your-zone-id
domain_ip_map:
forge.alexanderwhitestone.com: auto
matrix.alexanderwhitestone.com: auto
timmy.alexanderwhitestone.com: auto

265
scripts/sovereign_dns.py Normal file
View File

@@ -0,0 +1,265 @@
#!/usr/bin/env python3
"""Sovereign DNS management for fleet domains.
Supports:
- Cloudflare via REST API token
- Route53 via boto3-compatible client (or injected client in tests)
- add / update / delete A records
- sync mode using an Ansible-style domain -> IP mapping YAML
"""
from __future__ import annotations
import argparse
import json
import os
import urllib.parse
import urllib.request
from pathlib import Path
from typing import Callable
import yaml
DEFAULT_MAPPING_PATH = Path('configs/dns_records.example.yaml')
def load_domain_mapping(path: str | Path) -> dict:
data = yaml.safe_load(Path(path).read_text()) or {}
if not isinstance(data, dict):
raise ValueError('mapping file must contain a YAML object')
data.setdefault('domain_ip_map', {})
if not isinstance(data['domain_ip_map'], dict):
raise ValueError('domain_ip_map must be a mapping of domain -> IPv4')
return data
def detect_public_ip(urlopen_fn=urllib.request.urlopen, service_url: str = 'https://api.ipify.org') -> str:
req = urllib.request.Request(service_url, headers={'User-Agent': 'sovereign-dns/1.0'})
with urlopen_fn(req, timeout=10) as resp:
return resp.read().decode().strip()
def resolve_domain_ip_map(domain_ip_map: dict[str, str], current_public_ip: str) -> dict[str, str]:
resolved = {}
for domain, value in domain_ip_map.items():
if isinstance(value, str) and value.strip().lower() in {'auto', '__public_ip__', '$public_ip'}:
resolved[domain] = current_public_ip
else:
resolved[domain] = value
return resolved
def build_sync_plan(current: dict[str, dict], desired: dict[str, str]) -> dict[str, list[dict]]:
create: list[dict] = []
update: list[dict] = []
delete: list[dict] = []
for name, ip in desired.items():
existing = current.get(name)
if existing is None:
create.append({'name': name, 'content': ip})
elif existing.get('content') != ip:
update.append({'name': name, 'id': existing.get('id'), 'content': ip})
for name, record in current.items():
if name not in desired:
delete.append({'name': name, 'id': record.get('id')})
return {'create': create, 'update': update, 'delete': delete}
class CloudflareDNSProvider:
def __init__(self, api_token: str, zone_id: str, request_fn: Callable | None = None):
self.api_token = api_token
self.zone_id = zone_id
self.request_fn = request_fn or self._request
def _request(self, method: str, path: str, payload: dict | None = None) -> dict:
url = 'https://api.cloudflare.com/client/v4' + path
data = None if payload is None else json.dumps(payload).encode()
req = urllib.request.Request(
url,
data=data,
method=method,
headers={
'Authorization': f'Bearer {self.api_token}',
'Content-Type': 'application/json',
},
)
with urllib.request.urlopen(req, timeout=30) as resp:
return json.loads(resp.read().decode())
def list_a_records(self) -> dict[str, dict]:
path = f'/zones/{self.zone_id}/dns_records?type=A&per_page=500'
data = self.request_fn('GET', path)
return {item['name']: {'id': item['id'], 'content': item['content']} for item in data.get('result', [])}
def upsert_a_record(self, name: str, content: str) -> dict:
lookup_path = f'/zones/{self.zone_id}/dns_records?type=A&name={urllib.parse.quote(name)}'
existing = self.request_fn('GET', lookup_path).get('result', [])
payload = {'type': 'A', 'name': name, 'content': content, 'ttl': 120, 'proxied': False}
if existing:
return self.request_fn('PUT', f"/zones/{self.zone_id}/dns_records/{existing[0]['id']}", payload)
return self.request_fn('POST', f'/zones/{self.zone_id}/dns_records', payload)
def delete_record(self, record_id: str) -> dict:
return self.request_fn('DELETE', f'/zones/{self.zone_id}/dns_records/{record_id}')
def apply_plan(self, create: list[dict], update: list[dict], delete: list[dict], current: dict[str, dict] | None = None) -> dict:
results = {'created': [], 'updated': [], 'deleted': []}
for item in create:
self.upsert_a_record(item['name'], item['content'])
results['created'].append(item['name'])
for item in update:
self.upsert_a_record(item['name'], item['content'])
results['updated'].append(item['name'])
current = current or {}
for item in delete:
record_id = item.get('id') or current.get(item['name'], {}).get('id')
if record_id:
self.delete_record(record_id)
results['deleted'].append(item['name'])
return results
class Route53DNSProvider:
def __init__(self, hosted_zone_id: str, client=None):
self.hosted_zone_id = hosted_zone_id
if client is None:
import boto3 # optional runtime dependency
client = boto3.client('route53')
self.client = client
def list_a_records(self) -> dict[str, dict]:
data = self.client.list_resource_record_sets(HostedZoneId=self.hosted_zone_id)
result = {}
for item in data.get('ResourceRecordSets', []):
if item.get('Type') != 'A':
continue
name = item['Name'].rstrip('.')
values = item.get('ResourceRecords', [])
if values:
result[name] = {'content': values[0]['Value']}
return result
def apply_plan(self, create: list[dict], update: list[dict], delete: list[dict], current: dict[str, dict] | None = None) -> dict:
current = current or {}
changes = []
for item in create:
changes.append({
'Action': 'CREATE',
'ResourceRecordSet': {
'Name': item['name'],
'Type': 'A',
'TTL': 120,
'ResourceRecords': [{'Value': item['content']}],
},
})
for item in update:
changes.append({
'Action': 'UPSERT',
'ResourceRecordSet': {
'Name': item['name'],
'Type': 'A',
'TTL': 120,
'ResourceRecords': [{'Value': item['content']}],
},
})
for item in delete:
old = current.get(item['name'], {})
if old.get('content'):
changes.append({
'Action': 'DELETE',
'ResourceRecordSet': {
'Name': item['name'],
'Type': 'A',
'TTL': 120,
'ResourceRecords': [{'Value': old['content']}],
},
})
if changes:
self.client.change_resource_record_sets(
HostedZoneId=self.hosted_zone_id,
ChangeBatch={'Changes': changes, 'Comment': 'sovereign_dns sync'},
)
return {'changes': changes}
def build_provider(provider_name: str, zone_id: str, api_token: str | None = None):
provider_name = provider_name.lower()
if provider_name == 'cloudflare':
if not api_token:
raise ValueError('Cloudflare requires api_token')
return CloudflareDNSProvider(api_token=api_token, zone_id=zone_id)
if provider_name == 'route53':
return Route53DNSProvider(hosted_zone_id=zone_id)
raise ValueError(f'Unsupported provider: {provider_name}')
def main() -> int:
parser = argparse.ArgumentParser(description='Manage sovereign DNS A records via provider APIs')
sub = parser.add_subparsers(dest='command', required=True)
sync_p = sub.add_parser('sync', help='Sync desired domain->IP mapping to provider')
sync_p.add_argument('--mapping', default=str(DEFAULT_MAPPING_PATH))
sync_p.add_argument('--provider')
sync_p.add_argument('--zone-id')
sync_p.add_argument('--api-token-env', default='CLOUDFLARE_API_TOKEN')
sync_p.add_argument('--public-ip-url', default='https://api.ipify.org')
upsert_p = sub.add_parser('upsert', help='Create or update a single A record')
upsert_p.add_argument('--provider', required=True)
upsert_p.add_argument('--zone-id', required=True)
upsert_p.add_argument('--name', required=True)
upsert_p.add_argument('--content', required=True)
upsert_p.add_argument('--api-token-env', default='CLOUDFLARE_API_TOKEN')
delete_p = sub.add_parser('delete', help='Delete a single A record')
delete_p.add_argument('--provider', required=True)
delete_p.add_argument('--zone-id', required=True)
delete_p.add_argument('--name', required=True)
delete_p.add_argument('--api-token-env', default='CLOUDFLARE_API_TOKEN')
args = parser.parse_args()
if args.command == 'sync':
cfg = load_domain_mapping(args.mapping)
provider_name = args.provider or cfg.get('dns_provider', 'cloudflare')
zone_id = args.zone_id or cfg.get('dns_zone_id') or cfg.get('hosted_zone_id')
token = os.environ.get(args.api_token_env, '')
provider = build_provider(provider_name, zone_id=zone_id, api_token=token)
current = provider.list_a_records()
public_ip = detect_public_ip(service_url=args.public_ip_url)
desired = resolve_domain_ip_map(cfg['domain_ip_map'], current_public_ip=public_ip)
plan = build_sync_plan(current=current, desired=desired)
result = provider.apply_plan(**plan, current=current)
print(json.dumps({'provider': provider_name, 'zone_id': zone_id, 'public_ip': public_ip, 'plan': plan, 'result': result}, indent=2))
return 0
if args.command == 'upsert':
token = os.environ.get(args.api_token_env, '')
provider = build_provider(args.provider, zone_id=args.zone_id, api_token=token)
result = provider.upsert_a_record(args.name, args.content)
print(json.dumps(result, indent=2))
return 0
if args.command == 'delete':
token = os.environ.get(args.api_token_env, '')
provider = build_provider(args.provider, zone_id=args.zone_id, api_token=token)
current = provider.list_a_records()
record = current.get(args.name)
if not record:
raise SystemExit(f'No A record found for {args.name}')
if isinstance(provider, CloudflareDNSProvider):
result = provider.delete_record(record['id'])
else:
result = provider.apply_plan(create=[], update=[], delete=[{'name': args.name}], current=current)
print(json.dumps(result, indent=2))
return 0
raise SystemExit('Unknown command')
if __name__ == '__main__':
raise SystemExit(main())

163
tests/test_sovereign_dns.py Normal file
View File

@@ -0,0 +1,163 @@
import json
import sys
from pathlib import Path
from types import SimpleNamespace
sys.path.insert(0, str(Path(__file__).resolve().parent.parent / 'scripts'))
from sovereign_dns import (
CloudflareDNSProvider,
Route53DNSProvider,
build_sync_plan,
detect_public_ip,
load_domain_mapping,
resolve_domain_ip_map,
)
def test_load_domain_mapping_reads_ansible_style_domain_to_ip_map(tmp_path):
cfg = tmp_path / 'dns_records.yaml'
cfg.write_text(
"""
dns_provider: cloudflare
dns_zone_id: zone-123
domain_ip_map:
forge.example.com: 1.2.3.4
matrix.example.com: 5.6.7.8
"""
)
loaded = load_domain_mapping(cfg)
assert loaded['dns_provider'] == 'cloudflare'
assert loaded['dns_zone_id'] == 'zone-123'
assert loaded['domain_ip_map'] == {
'forge.example.com': '1.2.3.4',
'matrix.example.com': '5.6.7.8',
}
def test_build_sync_plan_updates_changed_ip_and_creates_missing_records():
current = {
'forge.example.com': {'id': 'rec-1', 'content': '1.1.1.1'},
'old.example.com': {'id': 'rec-2', 'content': '9.9.9.9'},
}
desired = {
'forge.example.com': '2.2.2.2',
'new.example.com': '3.3.3.3',
}
plan = build_sync_plan(current=current, desired=desired)
assert plan['update'] == [
{'name': 'forge.example.com', 'id': 'rec-1', 'content': '2.2.2.2'}
]
assert plan['create'] == [
{'name': 'new.example.com', 'content': '3.3.3.3'}
]
assert plan['delete'] == [
{'name': 'old.example.com', 'id': 'rec-2'}
]
def test_resolve_domain_ip_map_replaces_auto_values_with_detected_public_ip():
resolved = resolve_domain_ip_map(
{
'forge.example.com': 'auto',
'matrix.example.com': '5.6.7.8',
},
current_public_ip='8.8.4.4',
)
assert resolved == {
'forge.example.com': '8.8.4.4',
'matrix.example.com': '5.6.7.8',
}
def test_detect_public_ip_reads_provider_response():
class FakeResponse:
def __enter__(self):
return self
def __exit__(self, exc_type, exc, tb):
return False
def read(self):
return b'4.3.2.1\n'
ip = detect_public_ip(lambda req, timeout=10: FakeResponse())
assert ip == '4.3.2.1'
def test_cloudflare_upsert_calls_expected_http_methods():
calls = []
def fake_request(method, path, payload=None):
calls.append({'method': method, 'path': path, 'payload': payload})
if method == 'GET':
return {'success': True, 'result': []}
return {'success': True, 'result': {'id': 'created-id'}}
provider = CloudflareDNSProvider(
api_token='tok',
zone_id='zone-1',
request_fn=fake_request,
)
provider.upsert_a_record('forge.example.com', '1.2.3.4')
assert calls[0]['method'] == 'GET'
assert calls[0]['path'] == '/zones/zone-1/dns_records?type=A&name=forge.example.com'
assert calls[1]['method'] == 'POST'
assert calls[1]['path'] == '/zones/zone-1/dns_records'
assert calls[1]['payload']['name'] == 'forge.example.com'
assert calls[1]['payload']['content'] == '1.2.3.4'
assert calls[1]['payload']['type'] == 'A'
def test_cloudflare_upsert_updates_when_record_exists():
calls = []
def fake_request(method, path, payload=None):
calls.append({'method': method, 'path': path, 'payload': payload})
if method == 'GET':
return {'success': True, 'result': [{'id': 'rec-123', 'content': '1.1.1.1'}]}
return {'success': True, 'result': {'id': 'rec-123'}}
provider = CloudflareDNSProvider(
api_token='tok',
zone_id='zone-1',
request_fn=fake_request,
)
provider.upsert_a_record('forge.example.com', '2.2.2.2')
assert calls[1]['method'] == 'PUT'
assert calls[1]['path'] == '/zones/zone-1/dns_records/rec-123'
assert calls[1]['payload']['content'] == '2.2.2.2'
def test_route53_sync_uses_change_batches():
batches = []
class FakeClient:
def change_resource_record_sets(self, HostedZoneId, ChangeBatch):
batches.append({'HostedZoneId': HostedZoneId, 'ChangeBatch': ChangeBatch})
return {'ChangeInfo': {'Status': 'PENDING'}}
provider = Route53DNSProvider(hosted_zone_id='ZONE123', client=FakeClient())
provider.apply_plan(
create=[{'name': 'new.example.com', 'content': '3.3.3.3'}],
update=[{'name': 'forge.example.com', 'id': 'ignored', 'content': '2.2.2.2'}],
delete=[{'name': 'old.example.com', 'id': 'ignored'}],
current={'old.example.com': {'content': '9.9.9.9'}},
)
batch = batches[0]
assert batch['HostedZoneId'] == 'ZONE123'
changes = batch['ChangeBatch']['Changes']
assert changes[0]['Action'] == 'CREATE'
assert changes[0]['ResourceRecordSet']['Name'] == 'new.example.com'
assert changes[1]['Action'] == 'UPSERT'
assert changes[1]['ResourceRecordSet']['Name'] == 'forge.example.com'
assert changes[2]['Action'] == 'DELETE'
assert changes[2]['ResourceRecordSet']['Name'] == 'old.example.com'