Compare commits

..

1 Commits

Author SHA1 Message Date
Alexander Whitestone
1fed477af6 feat: sovereign DNS record management (#692)
Some checks failed
Smoke Test / smoke (pull_request) Failing after 19s
2026-04-14 23:59:30 -04:00
5 changed files with 441 additions and 538 deletions

View File

@@ -0,0 +1,13 @@
# Ansible-style variable file for sovereign DNS sync (#692)
# Copy to a private path and fill in provider credentials via env vars.
# Use `auto` to resolve the current VPS public IP at sync time.
dns_provider: cloudflare
# For Cloudflare: zone_id
# For Route53: put the hosted zone ID here (a separate hosted_zone_id key is also accepted)
dns_zone_id: your-zone-id
domain_ip_map:
forge.alexanderwhitestone.com: auto
matrix.alexanderwhitestone.com: auto
timmy.alexanderwhitestone.com: auto

View File

@@ -1,219 +0,0 @@
#!/usr/bin/env python3
"""
Codebase Genome — Test Suite Generator
Scans a Python codebase, identifies uncovered functions/methods,
and generates pytest test cases to fill coverage gaps.
Usage:
python codebase-genome.py <target_dir> [--output tests/test_genome_generated.py]
python codebase-genome.py <target_dir> --dry-run
python codebase-genome.py <target_dir> --max-tests 50
"""
import ast
import os
import sys
import argparse
import subprocess
import json
from pathlib import Path
from typing import List, Dict, Any, Optional, Set
from dataclasses import dataclass, field
@dataclass
class FunctionInfo:
    """Metadata for one function or method discovered while scanning a codebase."""

    # Identity and location of the definition.
    name: str
    module: str
    file_path: str
    line_number: int
    # Class context: is_method is True when the def sits directly inside a ClassDef.
    is_method: bool = False
    class_name: Optional[str] = None
    # Parameter names, excluding self/cls.
    args: List[str] = field(default_factory=list)
    # True if any `return <expr>` appears in the body (bare `return` doesn't count).
    has_return: bool = False
    # Names of exception classes raised via `raise Name(...)` in the body.
    raises: List[str] = field(default_factory=list)
    docstring: Optional[str] = None
    # Leading single underscore (but not dunder) marks the function as private.
    is_private: bool = False
    # NOTE(review): never set anywhere in this file — scan() filters on it,
    # but test functions are already rejected by _extract returning None.
    is_test: bool = False
class CodebaseScanner:
    """Walks a directory tree and collects FunctionInfo for every non-test function."""

    def __init__(self, target_dir: str):
        self.target_dir = Path(target_dir).resolve()
        self.functions: List[FunctionInfo] = []
        # module name -> functions found in that module, in discovery order.
        self.modules: Dict[str, List[FunctionInfo]] = {}

    def scan(self) -> List[FunctionInfo]:
        """Scan all eligible .py files under target_dir; return collected functions."""
        for py_file in self.target_dir.rglob("*.py"):
            if self._should_skip(py_file):
                continue
            try:
                self._scan_file(py_file)
            except SyntaxError:
                print(f"Warning: Syntax error in {py_file}, skipping", file=sys.stderr)
        return self.functions

    def _should_skip(self, path: Path) -> bool:
        """Skip vendored/venv directories, test modules, and packaging boilerplate."""
        skip_dirs = {"__pycache__", ".git", ".venv", "venv", "node_modules", ".tox"}
        if set(path.parts) & skip_dirs:
            return True
        if path.name.startswith("test_") or path.name.endswith("_test.py"):
            return True
        if path.name in ("conftest.py", "setup.py"):
            return True
        return False

    def _scan_file(self, file_path: Path):
        """Parse one file and record every function/method definition in it."""
        content = file_path.read_text(encoding="utf-8", errors="replace")
        tree = ast.parse(content)
        module_name = self._get_module_name(file_path)
        # Map each function node to its directly-enclosing class in ONE pass over
        # this tree. The previous implementation re-parsed the file inside
        # _extract and compared `child is node` against the *re-parsed* tree,
        # whose nodes are different objects — so the identity test never matched
        # and every method was reported as a free function (and the re-opened
        # file handle leaked).
        enclosing_class: Dict[ast.AST, str] = {}
        for parent in ast.walk(tree):
            if isinstance(parent, ast.ClassDef):
                for child in ast.iter_child_nodes(parent):
                    if isinstance(child, (ast.FunctionDef, ast.AsyncFunctionDef)):
                        enclosing_class[child] = parent.name
        for node in ast.walk(tree):
            if isinstance(node, (ast.FunctionDef, ast.AsyncFunctionDef)):
                func = self._extract(node, module_name, file_path,
                                     enclosing_class.get(node))
                if func and not func.is_test:
                    self.functions.append(func)
                    self.modules.setdefault(module_name, []).append(func)

    def _get_module_name(self, file_path: Path) -> str:
        """Derive a dotted module name from the file's path relative to target_dir."""
        rel = file_path.relative_to(self.target_dir)
        parts = list(rel.parts)
        if parts[-1] == "__init__.py":
            parts = parts[:-1]
        else:
            # Strip only the trailing extension; str.replace would also mangle
            # a ".py" occurring mid-name.
            if parts[-1].endswith(".py"):
                parts[-1] = parts[-1][:-3]
        return ".".join(parts)

    def _extract(self, node, module_name: str, file_path: Path,
                 class_name: Optional[str] = None) -> Optional[FunctionInfo]:
        """Build a FunctionInfo for *node*; returns None for test functions.

        *class_name* is the directly-enclosing class, as precomputed by
        _scan_file (None for free functions — also the default so older
        3-argument callers keep working).
        """
        if node.name.startswith("test_"):
            return None
        args = [a.arg for a in node.args.args if a.arg not in ("self", "cls")]
        has_return = any(isinstance(n, ast.Return) and n.value for n in ast.walk(node))
        raises = []
        for n in ast.walk(node):
            if isinstance(n, ast.Raise) and n.exc and isinstance(n.exc, ast.Call):
                if isinstance(n.exc.func, ast.Name):
                    raises.append(n.exc.func.id)
        docstring = ast.get_docstring(node)
        return FunctionInfo(
            name=node.name, module=module_name, file_path=str(file_path),
            line_number=node.lineno, is_method=class_name is not None,
            class_name=class_name,
            args=args, has_return=has_return, raises=raises, docstring=docstring,
            is_private=node.name.startswith("_") and not node.name.startswith("__"),
        )
class TestGenerator:
    """Renders a pytest module with one smoke test per FunctionInfo."""

    HEADER = '''# AUTO-GENERATED by codebase-genome.py — review before committing
import pytest
from unittest.mock import patch, MagicMock
import sys
from pathlib import Path
sys.path.insert(0, str(Path(__file__).resolve().parents[1]))
'''

    # Parameter names that plausibly accept an empty-string probe value.
    STRINGY_ARGS = ("text", "content", "message", "query", "path")

    def generate(self, functions: List[FunctionInfo]) -> str:
        """Render a complete pytest source file covering *functions*, grouped by module."""
        parts = [self.HEADER]
        by_module: Dict[str, List[FunctionInfo]] = {}
        for func in functions:
            by_module.setdefault(func.module, []).append(func)
        for mod, funcs in sorted(by_module.items()):
            parts.append(f"# ═══ {mod} ═══\n")
            imp = mod.replace("-", "_")
            # Guarded star-import so one unimportable module skips rather than
            # failing the whole generated suite.
            parts.append(
                f"try:\n    from {imp} import *\n"
                f"except ImportError:\n"
                f"    pytest.skip('{imp} not importable', allow_module_level=True)\n"
            )
            for func in funcs:
                test = self._gen_test(func)
                if test:
                    parts.append(test + "\n")
        return "\n".join(parts)

    def _gen_test(self, func: FunctionInfo) -> Optional[str]:
        """Render one smoke-test function for *func*, or None when nothing useful."""
        name = f"test_{func.module.replace('.', '_')}_{func.name}"
        lines = [
            f"def {name}():",
            f'    """Auto-generated for {func.module}.{func.name}."""',
        ]
        if not func.args:
            lines += [
                "    try:",
                f"        r = {func.name}()",
                "        assert r is not None or r is None",
                "    except Exception:",
                "        pass",
            ]
        else:
            # Probe with all-None keyword arguments; only swallow the
            # exception types a bad probe value is expected to trigger.
            none_call = ", ".join(a + "=None" for a in func.args)
            lines += [
                "    try:",
                f"        {func.name}({none_call})",
                "    except (TypeError, ValueError, AttributeError):",
                "        pass",
            ]
        if any(a in self.STRINGY_ARGS for a in func.args):
            # Build the argument string OUTSIDE the f-string: a backslash-escaped
            # quote inside an f-string expression is a SyntaxError before
            # Python 3.12 (PEP 701), which made the original version unloadable
            # on 3.8-3.11.
            empty_call = ", ".join(
                a + '=""' if a in self.STRINGY_ARGS else a + "=None"
                for a in func.args
            )
            lines += [
                "    try:",
                f"        {func.name}({empty_call})",
                "    except (TypeError, ValueError):",
                "        pass",
            ]
        if func.raises:
            lines.append(f"    # May raise: {', '.join(func.raises[:2])}")
            lines.append(f"    # with pytest.raises(({', '.join(func.raises[:2])})):")
            lines.append(f"    #     {func.name}()")
        return "\n".join(lines)
def main():
    """CLI entry point: scan a directory and emit generated pytest stubs."""
    parser = argparse.ArgumentParser(description="Codebase Genome — Test Generator")
    parser.add_argument("target_dir")
    parser.add_argument("--output", "-o", default="tests/test_genome_generated.py")
    parser.add_argument("--dry-run", action="store_true")
    parser.add_argument("--max-tests", type=int, default=100)
    opts = parser.parse_args()

    root = Path(opts.target_dir).resolve()
    if not root.is_dir():
        print(f"Error: {root} not a directory", file=sys.stderr)
        return 1

    print(f"Scanning {root}...")
    scanner = CodebaseScanner(str(root))
    found = scanner.scan()
    print(f"Found {len(found)} functions in {len(scanner.modules)} modules")

    # Cap the suite size; functions beyond the limit are simply dropped.
    if len(found) > opts.max_tests:
        print(f"Limiting to {opts.max_tests}")
        found = found[:opts.max_tests]

    code = TestGenerator().generate(found)
    if opts.dry_run:
        print(code)
        return 0

    destination = root / opts.output
    destination.parent.mkdir(parents=True, exist_ok=True)
    destination.write_text(code)
    print(f"Generated {len(found)} tests → {destination}")
    return 0


if __name__ == "__main__":
    sys.exit(main())

265
scripts/sovereign_dns.py Normal file
View File

@@ -0,0 +1,265 @@
#!/usr/bin/env python3
"""Sovereign DNS management for fleet domains.
Supports:
- Cloudflare via REST API token
- Route53 via boto3-compatible client (or injected client in tests)
- add / update / delete A records
- sync mode using an Ansible-style domain -> IP mapping YAML
"""
from __future__ import annotations
import argparse
import json
import os
import urllib.parse
import urllib.request
from pathlib import Path
from typing import Callable
import yaml
DEFAULT_MAPPING_PATH = Path('configs/dns_records.example.yaml')
def load_domain_mapping(path: str | Path) -> dict:
    """Load the Ansible-style YAML vars file and validate its basic shape.

    Guarantees the returned dict has a ``domain_ip_map`` key (defaulting to an
    empty mapping). Raises ValueError when the file's top level or the
    domain_ip_map value is not a mapping.
    """
    cfg = yaml.safe_load(Path(path).read_text()) or {}
    if not isinstance(cfg, dict):
        raise ValueError('mapping file must contain a YAML object')
    mapping = cfg.setdefault('domain_ip_map', {})
    if not isinstance(mapping, dict):
        raise ValueError('domain_ip_map must be a mapping of domain -> IPv4')
    return cfg
def detect_public_ip(urlopen_fn=urllib.request.urlopen, service_url: str = 'https://api.ipify.org') -> str:
    """Return this host's public IP as reported by *service_url*.

    *urlopen_fn* is injectable so tests can stub the network call.
    """
    request = urllib.request.Request(service_url, headers={'User-Agent': 'sovereign-dns/1.0'})
    with urlopen_fn(request, timeout=10) as response:
        body = response.read()
    return body.decode().strip()
def resolve_domain_ip_map(domain_ip_map: dict[str, str], current_public_ip: str) -> dict[str, str]:
    """Replace 'auto'-style placeholder values with the detected public IP.

    Placeholder matching is case-insensitive and ignores surrounding
    whitespace; non-placeholder values pass through untouched.
    """
    placeholders = {'auto', '__public_ip__', '$public_ip'}

    def _resolve(value):
        if isinstance(value, str) and value.strip().lower() in placeholders:
            return current_public_ip
        return value

    return {domain: _resolve(value) for domain, value in domain_ip_map.items()}
def build_sync_plan(current: dict[str, dict], desired: dict[str, str]) -> dict[str, list[dict]]:
    """Diff provider state against the desired domain -> IP map.

    Returns a plan with three lists: records to create, records whose IP
    changed (update), and records present upstream but absent from *desired*
    (delete).
    """
    plan: dict[str, list[dict]] = {'create': [], 'update': [], 'delete': []}
    for name, ip in desired.items():
        record = current.get(name)
        if record is None:
            plan['create'].append({'name': name, 'content': ip})
        elif record.get('content') != ip:
            plan['update'].append({'name': name, 'id': record.get('id'), 'content': ip})
    plan['delete'] = [
        {'name': name, 'id': record.get('id')}
        for name, record in current.items()
        if name not in desired
    ]
    return plan
class CloudflareDNSProvider:
def __init__(self, api_token: str, zone_id: str, request_fn: Callable | None = None):
self.api_token = api_token
self.zone_id = zone_id
self.request_fn = request_fn or self._request
def _request(self, method: str, path: str, payload: dict | None = None) -> dict:
url = 'https://api.cloudflare.com/client/v4' + path
data = None if payload is None else json.dumps(payload).encode()
req = urllib.request.Request(
url,
data=data,
method=method,
headers={
'Authorization': f'Bearer {self.api_token}',
'Content-Type': 'application/json',
},
)
with urllib.request.urlopen(req, timeout=30) as resp:
return json.loads(resp.read().decode())
def list_a_records(self) -> dict[str, dict]:
path = f'/zones/{self.zone_id}/dns_records?type=A&per_page=500'
data = self.request_fn('GET', path)
return {item['name']: {'id': item['id'], 'content': item['content']} for item in data.get('result', [])}
def upsert_a_record(self, name: str, content: str) -> dict:
lookup_path = f'/zones/{self.zone_id}/dns_records?type=A&name={urllib.parse.quote(name)}'
existing = self.request_fn('GET', lookup_path).get('result', [])
payload = {'type': 'A', 'name': name, 'content': content, 'ttl': 120, 'proxied': False}
if existing:
return self.request_fn('PUT', f"/zones/{self.zone_id}/dns_records/{existing[0]['id']}", payload)
return self.request_fn('POST', f'/zones/{self.zone_id}/dns_records', payload)
def delete_record(self, record_id: str) -> dict:
return self.request_fn('DELETE', f'/zones/{self.zone_id}/dns_records/{record_id}')
def apply_plan(self, create: list[dict], update: list[dict], delete: list[dict], current: dict[str, dict] | None = None) -> dict:
results = {'created': [], 'updated': [], 'deleted': []}
for item in create:
self.upsert_a_record(item['name'], item['content'])
results['created'].append(item['name'])
for item in update:
self.upsert_a_record(item['name'], item['content'])
results['updated'].append(item['name'])
current = current or {}
for item in delete:
record_id = item.get('id') or current.get(item['name'], {}).get('id')
if record_id:
self.delete_record(record_id)
results['deleted'].append(item['name'])
return results
class Route53DNSProvider:
def __init__(self, hosted_zone_id: str, client=None):
self.hosted_zone_id = hosted_zone_id
if client is None:
import boto3 # optional runtime dependency
client = boto3.client('route53')
self.client = client
def list_a_records(self) -> dict[str, dict]:
data = self.client.list_resource_record_sets(HostedZoneId=self.hosted_zone_id)
result = {}
for item in data.get('ResourceRecordSets', []):
if item.get('Type') != 'A':
continue
name = item['Name'].rstrip('.')
values = item.get('ResourceRecords', [])
if values:
result[name] = {'content': values[0]['Value']}
return result
def apply_plan(self, create: list[dict], update: list[dict], delete: list[dict], current: dict[str, dict] | None = None) -> dict:
current = current or {}
changes = []
for item in create:
changes.append({
'Action': 'CREATE',
'ResourceRecordSet': {
'Name': item['name'],
'Type': 'A',
'TTL': 120,
'ResourceRecords': [{'Value': item['content']}],
},
})
for item in update:
changes.append({
'Action': 'UPSERT',
'ResourceRecordSet': {
'Name': item['name'],
'Type': 'A',
'TTL': 120,
'ResourceRecords': [{'Value': item['content']}],
},
})
for item in delete:
old = current.get(item['name'], {})
if old.get('content'):
changes.append({
'Action': 'DELETE',
'ResourceRecordSet': {
'Name': item['name'],
'Type': 'A',
'TTL': 120,
'ResourceRecords': [{'Value': old['content']}],
},
})
if changes:
self.client.change_resource_record_sets(
HostedZoneId=self.hosted_zone_id,
ChangeBatch={'Changes': changes, 'Comment': 'sovereign_dns sync'},
)
return {'changes': changes}
def build_provider(provider_name: str, zone_id: str, api_token: str | None = None):
provider_name = provider_name.lower()
if provider_name == 'cloudflare':
if not api_token:
raise ValueError('Cloudflare requires api_token')
return CloudflareDNSProvider(api_token=api_token, zone_id=zone_id)
if provider_name == 'route53':
return Route53DNSProvider(hosted_zone_id=zone_id)
raise ValueError(f'Unsupported provider: {provider_name}')
def main() -> int:
    """CLI entry point: sync a full mapping, or upsert/delete a single A record."""
    parser = argparse.ArgumentParser(description='Manage sovereign DNS A records via provider APIs')
    sub = parser.add_subparsers(dest='command', required=True)

    sync_cmd = sub.add_parser('sync', help='Sync desired domain->IP mapping to provider')
    sync_cmd.add_argument('--mapping', default=str(DEFAULT_MAPPING_PATH))
    sync_cmd.add_argument('--provider')
    sync_cmd.add_argument('--zone-id')
    sync_cmd.add_argument('--api-token-env', default='CLOUDFLARE_API_TOKEN')
    sync_cmd.add_argument('--public-ip-url', default='https://api.ipify.org')

    upsert_cmd = sub.add_parser('upsert', help='Create or update a single A record')
    delete_cmd = sub.add_parser('delete', help='Delete a single A record')
    for cmd in (upsert_cmd, delete_cmd):
        cmd.add_argument('--provider', required=True)
        cmd.add_argument('--zone-id', required=True)
        cmd.add_argument('--name', required=True)
    upsert_cmd.add_argument('--content', required=True)
    for cmd in (upsert_cmd, delete_cmd):
        cmd.add_argument('--api-token-env', default='CLOUDFLARE_API_TOKEN')

    args = parser.parse_args()

    def env_token() -> str:
        # Providers that need no token (Route53) simply ignore the empty string.
        return os.environ.get(args.api_token_env, '')

    if args.command == 'sync':
        cfg = load_domain_mapping(args.mapping)
        # CLI flags win; otherwise fall back to the mapping file's settings.
        provider_name = args.provider or cfg.get('dns_provider', 'cloudflare')
        zone_id = args.zone_id or cfg.get('dns_zone_id') or cfg.get('hosted_zone_id')
        provider = build_provider(provider_name, zone_id=zone_id, api_token=env_token())
        current = provider.list_a_records()
        public_ip = detect_public_ip(service_url=args.public_ip_url)
        desired = resolve_domain_ip_map(cfg['domain_ip_map'], current_public_ip=public_ip)
        plan = build_sync_plan(current=current, desired=desired)
        result = provider.apply_plan(**plan, current=current)
        print(json.dumps({'provider': provider_name, 'zone_id': zone_id, 'public_ip': public_ip, 'plan': plan, 'result': result}, indent=2))
        return 0

    provider = build_provider(args.provider, zone_id=args.zone_id, api_token=env_token())

    if args.command == 'upsert':
        result = provider.upsert_a_record(args.name, args.content)
        print(json.dumps(result, indent=2))
        return 0

    if args.command == 'delete':
        current = provider.list_a_records()
        record = current.get(args.name)
        if not record:
            raise SystemExit(f'No A record found for {args.name}')
        if isinstance(provider, CloudflareDNSProvider):
            result = provider.delete_record(record['id'])
        else:
            # Route53 deletes go through a change batch built from current state.
            result = provider.apply_plan(create=[], update=[], delete=[{'name': args.name}], current=current)
        print(json.dumps(result, indent=2))
        return 0

    raise SystemExit('Unknown command')  # unreachable: subparsers are required


if __name__ == '__main__':
    raise SystemExit(main())

163
tests/test_sovereign_dns.py Normal file
View File

@@ -0,0 +1,163 @@
import json
import sys
from pathlib import Path
from types import SimpleNamespace
sys.path.insert(0, str(Path(__file__).resolve().parent.parent / 'scripts'))
from sovereign_dns import (
CloudflareDNSProvider,
Route53DNSProvider,
build_sync_plan,
detect_public_ip,
load_domain_mapping,
resolve_domain_ip_map,
)
def test_load_domain_mapping_reads_ansible_style_domain_to_ip_map(tmp_path):
    """An Ansible-style vars file round-trips into provider/zone/domain map."""
    source = "\n".join([
        "dns_provider: cloudflare",
        "dns_zone_id: zone-123",
        "domain_ip_map:",
        "  forge.example.com: 1.2.3.4",
        "  matrix.example.com: 5.6.7.8",
    ])
    cfg_path = tmp_path / 'dns_records.yaml'
    cfg_path.write_text(source)

    cfg = load_domain_mapping(cfg_path)

    assert cfg['dns_provider'] == 'cloudflare'
    assert cfg['dns_zone_id'] == 'zone-123'
    assert cfg['domain_ip_map'] == {
        'forge.example.com': '1.2.3.4',
        'matrix.example.com': '5.6.7.8',
    }
def test_build_sync_plan_updates_changed_ip_and_creates_missing_records():
    """Changed IPs land in update, unknown names in create, stale names in delete."""
    existing = {
        'forge.example.com': {'id': 'rec-1', 'content': '1.1.1.1'},
        'old.example.com': {'id': 'rec-2', 'content': '9.9.9.9'},
    }
    wanted = {
        'forge.example.com': '2.2.2.2',
        'new.example.com': '3.3.3.3',
    }

    plan = build_sync_plan(current=existing, desired=wanted)

    assert plan['update'] == [{'name': 'forge.example.com', 'id': 'rec-1', 'content': '2.2.2.2'}]
    assert plan['create'] == [{'name': 'new.example.com', 'content': '3.3.3.3'}]
    assert plan['delete'] == [{'name': 'old.example.com', 'id': 'rec-2'}]
def test_resolve_domain_ip_map_replaces_auto_values_with_detected_public_ip():
    """'auto' entries become the detected IP; literal IPs pass through."""
    mapping = {
        'forge.example.com': 'auto',
        'matrix.example.com': '5.6.7.8',
    }

    resolved = resolve_domain_ip_map(mapping, current_public_ip='8.8.4.4')

    assert resolved == {
        'forge.example.com': '8.8.4.4',
        'matrix.example.com': '5.6.7.8',
    }
def test_detect_public_ip_reads_provider_response():
    """The response body is decoded and stripped to a bare IP string."""
    class StubResponse:
        def __enter__(self):
            return self

        def __exit__(self, *exc_info):
            return False

        def read(self):
            return b'4.3.2.1\n'

    def fake_urlopen(request, timeout=10):
        return StubResponse()

    assert detect_public_ip(fake_urlopen) == '4.3.2.1'
def test_cloudflare_upsert_calls_expected_http_methods():
    """With no existing record, upsert does a GET lookup then a POST create."""
    seen = []

    def record_request(method, path, payload=None):
        seen.append((method, path, payload))
        if method == 'GET':
            return {'success': True, 'result': []}
        return {'success': True, 'result': {'id': 'created-id'}}

    provider = CloudflareDNSProvider(api_token='tok', zone_id='zone-1', request_fn=record_request)
    provider.upsert_a_record('forge.example.com', '1.2.3.4')

    lookup_method, lookup_path, _ = seen[0]
    assert lookup_method == 'GET'
    assert lookup_path == '/zones/zone-1/dns_records?type=A&name=forge.example.com'

    create_method, create_path, create_payload = seen[1]
    assert create_method == 'POST'
    assert create_path == '/zones/zone-1/dns_records'
    assert create_payload['name'] == 'forge.example.com'
    assert create_payload['content'] == '1.2.3.4'
    assert create_payload['type'] == 'A'
def test_cloudflare_upsert_updates_when_record_exists():
    """With an existing record, upsert PUTs to that record's id."""
    seen = []

    def record_request(method, path, payload=None):
        seen.append((method, path, payload))
        if method == 'GET':
            return {'success': True, 'result': [{'id': 'rec-123', 'content': '1.1.1.1'}]}
        return {'success': True, 'result': {'id': 'rec-123'}}

    provider = CloudflareDNSProvider(api_token='tok', zone_id='zone-1', request_fn=record_request)
    provider.upsert_a_record('forge.example.com', '2.2.2.2')

    method, path, payload = seen[1]
    assert method == 'PUT'
    assert path == '/zones/zone-1/dns_records/rec-123'
    assert payload['content'] == '2.2.2.2'
def test_route53_sync_uses_change_batches():
    """apply_plan sends one ChangeBatch with CREATE/UPSERT/DELETE in order."""
    sent = []

    class StubRoute53Client:
        def change_resource_record_sets(self, HostedZoneId, ChangeBatch):
            sent.append({'HostedZoneId': HostedZoneId, 'ChangeBatch': ChangeBatch})
            return {'ChangeInfo': {'Status': 'PENDING'}}

    provider = Route53DNSProvider(hosted_zone_id='ZONE123', client=StubRoute53Client())
    provider.apply_plan(
        create=[{'name': 'new.example.com', 'content': '3.3.3.3'}],
        update=[{'name': 'forge.example.com', 'id': 'ignored', 'content': '2.2.2.2'}],
        delete=[{'name': 'old.example.com', 'id': 'ignored'}],
        current={'old.example.com': {'content': '9.9.9.9'}},
    )

    assert sent[0]['HostedZoneId'] == 'ZONE123'
    actions = [
        (change['Action'], change['ResourceRecordSet']['Name'])
        for change in sent[0]['ChangeBatch']['Changes']
    ]
    assert actions == [
        ('CREATE', 'new.example.com'),
        ('UPSERT', 'forge.example.com'),
        ('DELETE', 'old.example.com'),
    ]

View File

@@ -1,319 +0,0 @@
# GENOME.md — the-nexus
**Generated:** 2026-04-14
**Repo:** Timmy_Foundation/the-nexus
**Analysis:** Codebase Genome #672
---
## Project Overview
The Nexus is Timmy's canonical 3D home-world — a browser-based Three.js application that serves as:
1. **Local-first training ground** for Timmy (the sovereign AI)
2. **Wizardly visualization surface** for the fleet system
3. **Portal architecture** connecting to other worlds and services
The app is a real-time 3D environment with spatial memory, GOFAI reasoning, agent presence, and portal-based navigation.
---
## Architecture
```mermaid
graph TB
subgraph Browser["BROWSER LAYER"]
HTML[index.html]
APP[app.js - 4082 lines]
CSS[style.css]
Worker[gofai_worker.js]
end
subgraph ThreeJS["THREE.JS RENDERING"]
Scene[Scene Management]
Camera[Camera System]
Renderer[WebGL Renderer]
Post[Post-processing<br/>Bloom, SMAA]
Physics[Physics/Player]
end
subgraph Nexus["NEXUS COMPONENTS"]
SM[SpatialMemory]
SA[SpatialAudio]
MB[MemoryBirth]
MO[MemoryOptimizer]
MI[MemoryInspect]
MP[MemoryPulse]
RT[ReasoningTrace]
RV[ResonanceVisualizer]
end
subgraph GOFAI["GOFAI REASONING"]
Worker2[Web Worker]
Rules[Rule Engine]
Facts[Fact Store]
Inference[Inference Loop]
end
subgraph Backend["BACKEND SERVICES"]
Server[server.py<br/>WebSocket Bridge]
L402[L402 Cost API]
Portal[Portal Registry]
end
subgraph Data["DATA/PERSISTENCE"]
Local[localStorage]
IDB[IndexedDB]
JSON[portals.json]
Vision[vision.json]
end
HTML --> APP
APP --> ThreeJS
APP --> Nexus
APP --> GOFAI
APP --> Backend
APP --> Data
Worker2 --> APP
Server --> APP
```
---
## Entry Points
### Primary Entry
- **`index.html`** — Main HTML shell, loads app.js
- **`app.js`** — Main application (4082 lines), Three.js scene setup
### Secondary Entry Points
- **`boot.js`** — Bootstrap sequence
- **`bootstrap.mjs`** — ES module bootstrap
- **`server.py`** — WebSocket bridge server
### Configuration Entry Points
- **`portals.json`** — Portal definitions and destinations
- **`vision.json`** — Vision/agent configuration
- **`config/fleet_agents.json`** — Fleet agent definitions
---
## Data Flow
```
User Input
app.js (Event Loop)
┌─────────────────────────────────────┐
│ Three.js Scene │
│ - Player movement │
│ - Camera controls │
│ - Physics simulation │
│ - Portal detection │
└─────────────────────────────────────┘
┌─────────────────────────────────────┐
│ Nexus Components │
│ - SpatialMemory (room/context) │
│ - MemoryBirth (new memories) │
│ - MemoryPulse (heartbeat) │
│ - ReasoningTrace (GOFAI output) │
└─────────────────────────────────────┘
┌─────────────────────────────────────┐
│ GOFAI Worker (off-thread) │
│ - Rule evaluation │
│ - Fact inference │
│ - Decision making │
└─────────────────────────────────────┘
┌─────────────────────────────────────┐
│ Backend Services │
│ - WebSocket (server.py) │
│ - L402 cost API │
│ - Portal registry │
└─────────────────────────────────────┘
Persistence (localStorage/IndexedDB)
```
---
## Key Abstractions
### 1. Nexus Object (`NEXUS`)
Central configuration and state object containing:
- Color palette
- Room definitions
- Portal configurations
- Agent settings
### 2. SpatialMemory
Manages room-based context for the AI agent:
- Room transitions trigger context switches
- Facts are stored per-room
- NPCs have location awareness
### 3. Portal System
Connects the 3D world to external services:
- Portals defined in `portals.json`
- Each portal links to a service/endpoint
- Visual indicators in 3D space
### 4. GOFAI Worker
Off-thread reasoning engine:
- Rule-based inference
- Fact store with persistence
- Decision making for agent behavior
### 5. Memory Components
- **MemoryBirth**: Creates new memories from interactions
- **MemoryOptimizer**: Compresses and deduplicates memories
- **MemoryPulse**: Heartbeat system for memory health
- **MemoryInspect**: Debug/inspection interface
---
## API Surface
### Internal APIs (JavaScript)
| Module | Export | Purpose |
|--------|--------|---------|
| `app.js` | `NEXUS` | Main config/state object |
| `SpatialMemory` | class | Room-based context management |
| `SpatialAudio` | class | 3D positional audio |
| `MemoryBirth` | class | Memory creation |
| `MemoryOptimizer` | class | Memory compression |
| `ReasoningTrace` | class | GOFAI reasoning visualization |
### External APIs (HTTP/WebSocket)
| Endpoint | Protocol | Purpose |
|----------|----------|---------|
| `ws://localhost:PORT` | WebSocket | Real-time bridge to backend |
| `http://localhost:8080/api/cost-estimate` | HTTP | L402 cost estimation |
| Portal endpoints | Various | External service connections |
---
## Dependencies
### Runtime Dependencies
- **Three.js** — 3D rendering engine
- **Three.js Addons** — Post-processing (Bloom, SMAA)
### Build Dependencies
- **ES Modules** — Native browser modules
- **No bundler** — Direct script loading
### Backend Dependencies
- **Python 3.x** — server.py
- **WebSocket** — Real-time communication
---
## Test Coverage
### Existing Tests
- `tests/boot.test.js` — Bootstrap sequence tests
### Test Gaps
1. **Three.js scene initialization** — No tests
2. **Portal system** — No tests
3. **Memory components** — No tests
4. **GOFAI worker** — No tests
5. **WebSocket communication** — No tests
6. **Spatial memory transitions** — No tests
7. **Physics/player movement** — No tests
### Recommended Test Priorities
1. Portal detection and activation
2. Spatial memory room transitions
3. GOFAI worker message passing
4. WebSocket connection handling
5. Memory persistence (localStorage/IndexedDB)
---
## Security Considerations
### Current Risks
1. **WebSocket without auth** — server.py has no authentication
2. **localStorage sensitive data** — Memories stored unencrypted
3. **CORS open** — No origin restrictions on WebSocket
4. **L402 endpoint** — Cost API may expose internal state
### Mitigations
1. Add WebSocket authentication
2. Encrypt sensitive memories
3. Restrict CORS origins
4. Rate limit L402 endpoint
---
## File Structure
```
the-nexus/
├── app.js # Main app (4082 lines)
├── index.html # HTML shell
├── style.css # Styles
├── server.py # WebSocket bridge
├── boot.js # Bootstrap
├── bootstrap.mjs # ES module bootstrap
├── gofai_worker.js # GOFAI web worker
├── portals.json # Portal definitions
├── vision.json # Vision config
├── nexus/ # Nexus components
│ └── components/
│ ├── spatial-memory.js
│ ├── spatial-audio.js
│ ├── memory-birth.js
│ ├── memory-optimizer.js
│ ├── memory-inspect.js
│ ├── memory-pulse.js
│ ├── reasoning-trace.js
│ └── resonance-visualizer.js
├── config/ # Configuration
├── docs/ # Documentation
├── tests/ # Tests
├── agent/ # Agent components
├── bin/ # Scripts
└── assets/ # Static assets
```
---
## Technical Debt
1. **Large app.js** (4082 lines) — Should be split into modules
2. **No TypeScript** — Pure JavaScript, no type safety
3. **Manual DOM manipulation** — Could use a framework
4. **No build system** — Direct ES modules, no optimization
5. **Limited error handling** — Minimal try/catch coverage
---
## Migration Notes
From CLAUDE.md:
- Current `main` does NOT ship the old root frontend files
- A clean checkout serves a directory listing
- The live browser shell exists in legacy form at `/Users/apayne/the-matrix`
- Migration priorities: #684 (docs), #685 (legacy audit), #686 (smoke tests), #687 (restore shell)
---
## Next Steps
1. **Restore browser shell** — Bring frontend back to main
2. **Add tests** — Cover critical paths (portals, memory, GOFAI)
3. **Split app.js** — Modularize the 4082-line file
4. **Add authentication** — Secure WebSocket and APIs
5. **TypeScript migration** — Add type safety
---
*Generated by Codebase Genome pipeline — Issue #672*