Files
hermes-agent/tools/fleet_ca.py

72 lines
4.4 KiB
Python

"""Fleet CA for agent-to-agent mTLS (#806)."""
import argparse, datetime, os, sys
from pathlib import Path
def init_ca(output_dir, ca_name="Timmy Fleet CA", days=3650):
    """Create a self-signed RSA-4096 root CA in *output_dir*.

    Writes ``ca.key`` (unencrypted PEM, owner-only permissions) and
    ``ca.crt`` (PEM), overwriting any files of the same name already in the
    directory, then prints the certificate path.

    Args:
        output_dir: Directory for the CA files; created if it is missing.
        ca_name: Common Name used for both the subject and the issuer.
        days: Validity period in days, counted from now; must be positive.

    Raises:
        ValueError: If *days* is not positive.
    """
    # Fail fast: checked before the (slow) 4096-bit key generation.
    if days <= 0:
        raise ValueError(f"days must be positive, got {days!r}")

    from cryptography import x509
    from cryptography.x509.oid import NameOID
    from cryptography.hazmat.primitives import hashes, serialization
    from cryptography.hazmat.primitives.asymmetric import rsa

    out = Path(output_dir)
    out.mkdir(parents=True, exist_ok=True)

    ca_key = rsa.generate_private_key(65537, 4096)
    name = x509.Name([x509.NameAttribute(NameOID.COMMON_NAME, ca_name)])
    # One timezone-aware timestamp so both validity bounds derive from the
    # same instant (datetime.utcnow() is deprecated and was called twice).
    now = datetime.datetime.now(datetime.timezone.utc)

    ca_cert = (
        x509.CertificateBuilder()
        .subject_name(name)
        .issuer_name(name)
        .public_key(ca_key.public_key())
        .serial_number(x509.random_serial_number())
        .not_valid_before(now)
        .not_valid_after(now + datetime.timedelta(days=days))
        .add_extension(x509.BasicConstraints(ca=True, path_length=None), critical=True)
        # RFC 5280 requires CA certificates to carry KeyUsage with
        # keyCertSign; strict X.509 verifiers reject CAs without it.
        .add_extension(
            x509.KeyUsage(
                digital_signature=False,
                content_commitment=False,
                key_encipherment=False,
                data_encipherment=False,
                key_agreement=False,
                key_cert_sign=True,
                crl_sign=True,
                encipher_only=False,
                decipher_only=False,
            ),
            critical=True,
        )
        .add_extension(
            x509.SubjectKeyIdentifier.from_public_key(ca_key.public_key()),
            critical=False,
        )
        .sign(ca_key, hashes.SHA256())
    )

    key_pem = ca_key.private_bytes(
        serialization.Encoding.PEM,
        serialization.PrivateFormat.TraditionalOpenSSL,
        serialization.NoEncryption(),
    )
    key_path = out / "ca.key"
    # Create the file with mode 0600 from the start instead of chmod-ing after
    # the write, so the key is never readable under a permissive umask.
    fd = os.open(key_path, os.O_WRONLY | os.O_CREAT | os.O_TRUNC, 0o600)
    with os.fdopen(fd, "wb") as f:
        # A pre-existing file keeps its old mode; tighten it before writing.
        os.chmod(key_path, 0o600)
        f.write(key_pem)

    with open(out / "ca.crt", "wb") as f:
        f.write(ca_cert.public_bytes(serialization.Encoding.PEM))
    print(f"CA created: {out}/ca.crt")
def issue_cert(agent_name, output_dir, days=365):
    """Issue an RSA-2048 certificate for *agent_name*, signed by the fleet CA.

    Reads ``ca.key`` and ``ca.crt`` from *output_dir* and writes
    ``<agent_name>.key`` (unencrypted PEM, owner-only permissions) and
    ``<agent_name>.crt`` (PEM) next to them, then prints the certificate
    path. The certificate carries DNS SANs for ``<agent_name>`` and
    ``<agent_name>.local`` and is usable for both TLS client and server
    authentication.

    Args:
        agent_name: Agent identifier; used as the Common Name, the SAN
            entries and the output file stem.
        output_dir: Directory holding the CA files and receiving the output.
        days: Validity period in days, counted from now; must be positive.

    Raises:
        ValueError: If *agent_name* is empty, is ``.``/``..``, contains a
            path separator or NUL, or is ``ca`` (any case), or if *days* is
            not positive.
        FileNotFoundError: If the CA key or certificate is missing.
    """
    # Validate before touching any file: the name becomes a file stem, so an
    # unchecked value could escape output_dir or clobber ca.key / ca.crt.
    if not agent_name or agent_name in (".", ".."):
        raise ValueError(f"invalid agent name: {agent_name!r}")
    if any(ch in agent_name for ch in ("/", "\\", "\0")):
        raise ValueError(f"agent name must not contain path separators: {agent_name!r}")
    if agent_name.casefold() == "ca":
        raise ValueError("agent name 'ca' is reserved for the CA's own files")
    if days <= 0:
        raise ValueError(f"days must be positive, got {days!r}")

    from cryptography import x509
    from cryptography.x509.oid import ExtendedKeyUsageOID, NameOID
    from cryptography.hazmat.primitives import hashes, serialization
    from cryptography.hazmat.primitives.asymmetric import rsa

    out = Path(output_dir)
    with open(out / "ca.key", "rb") as f:
        ca_key = serialization.load_pem_private_key(f.read(), None)
    with open(out / "ca.crt", "rb") as f:
        ca_cert = x509.load_pem_x509_certificate(f.read())

    # Reuse the CA's own SubjectKeyIdentifier when present so the key ids
    # chain exactly; CAs without one get an id derived from the public key.
    try:
        ca_ski = ca_cert.extensions.get_extension_for_class(x509.SubjectKeyIdentifier)
        authority_key_id = x509.AuthorityKeyIdentifier.from_issuer_subject_key_identifier(ca_ski.value)
    except x509.ExtensionNotFound:
        authority_key_id = x509.AuthorityKeyIdentifier.from_issuer_public_key(ca_cert.public_key())

    key = rsa.generate_private_key(65537, 2048)
    # One timezone-aware timestamp so both validity bounds derive from the
    # same instant (datetime.utcnow() is deprecated and was called twice).
    now = datetime.datetime.now(datetime.timezone.utc)

    cert = (
        x509.CertificateBuilder()
        .subject_name(x509.Name([x509.NameAttribute(NameOID.COMMON_NAME, agent_name)]))
        .issuer_name(ca_cert.subject)
        .public_key(key.public_key())
        .serial_number(x509.random_serial_number())
        .not_valid_before(now)
        .not_valid_after(now + datetime.timedelta(days=days))
        .add_extension(x509.BasicConstraints(ca=False, path_length=None), critical=True)
        .add_extension(
            x509.SubjectAlternativeName(
                [x509.DNSName(agent_name), x509.DNSName(f"{agent_name}.local")]
            ),
            critical=False,
        )
        .add_extension(
            x509.KeyUsage(
                digital_signature=True,
                content_commitment=False,
                key_encipherment=True,
                data_encipherment=False,
                key_agreement=False,
                key_cert_sign=False,
                crl_sign=False,
                encipher_only=False,
                decipher_only=False,
            ),
            critical=True,
        )
        # mTLS: each agent acts as both TLS client and TLS server.
        .add_extension(
            x509.ExtendedKeyUsage(
                [ExtendedKeyUsageOID.CLIENT_AUTH, ExtendedKeyUsageOID.SERVER_AUTH]
            ),
            critical=False,
        )
        .add_extension(
            x509.SubjectKeyIdentifier.from_public_key(key.public_key()),
            critical=False,
        )
        .add_extension(authority_key_id, critical=False)
        .sign(ca_key, hashes.SHA256())
    )

    key_pem = key.private_bytes(
        serialization.Encoding.PEM,
        serialization.PrivateFormat.TraditionalOpenSSL,
        serialization.NoEncryption(),
    )
    key_path = out / f"{agent_name}.key"
    # Create the file with mode 0600 from the start instead of chmod-ing after
    # the write, so the key is never readable under a permissive umask.
    fd = os.open(key_path, os.O_WRONLY | os.O_CREAT | os.O_TRUNC, 0o600)
    with os.fdopen(fd, "wb") as f:
        # A pre-existing file keeps its old mode; tighten it before writing.
        os.chmod(key_path, 0o600)
        f.write(key_pem)

    with open(out / f"{agent_name}.crt", "wb") as f:
        f.write(cert.public_bytes(serialization.Encoding.PEM))
    print(f"Cert issued: {out}/{agent_name}.crt")
def verify_cert(cert_path, ca_path):
    """Check that the certificate at *cert_path* was issued by the CA at *ca_path*.

    Three checks are made, in order: the certificate's issuer name equals the
    CA's subject name, the current time lies inside the certificate's
    validity window, and the CA public key verifies the certificate's
    signature (PKCS#1 v1.5 padding, i.e. an RSA CA key). A one-line ``OK:``
    or ``FAIL:`` result is printed.

    Args:
        cert_path: Path to the PEM certificate to check.
        ca_path: Path to the PEM CA certificate.

    Returns:
        True if every check passes, False otherwise.

    Raises:
        OSError: If either file cannot be read.
        ValueError: If either file is not a parseable PEM certificate.
    """
    from cryptography import x509
    from cryptography.exceptions import InvalidSignature
    from cryptography.hazmat.primitives.asymmetric import padding

    with open(ca_path, "rb") as f:
        ca = x509.load_pem_x509_certificate(f.read())
    with open(cert_path, "rb") as f:
        cert = x509.load_pem_x509_certificate(f.read())

    if cert.issuer != ca.subject:
        print(f"FAIL: issuer {cert.issuer} does not match CA subject {ca.subject}")
        return False

    utc = datetime.timezone.utc
    try:
        # Timezone-aware accessors exist in newer cryptography releases.
        not_before, not_after = cert.not_valid_before_utc, cert.not_valid_after_utc
    except AttributeError:
        # Older releases only expose naive datetimes, which are in UTC.
        not_before = cert.not_valid_before.replace(tzinfo=utc)
        not_after = cert.not_valid_after.replace(tzinfo=utc)
    now = datetime.datetime.now(utc)
    if not (not_before <= now <= not_after):
        print(
            f"FAIL: certificate is not valid now "
            f"(valid from {not_before.isoformat()} to {not_after.isoformat()})"
        )
        return False

    try:
        ca.public_key().verify(
            cert.signature,
            cert.tbs_certificate_bytes,
            padding.PKCS1v15(),
            cert.signature_hash_algorithm,
        )
    except InvalidSignature:
        # InvalidSignature carries no message, so spell the reason out.
        print("FAIL: signature verification failed")
        return False
    except Exception as e:
        # Best-effort by design: e.g. a non-RSA CA key rejects these arguments.
        print(f"FAIL: {type(e).__name__}: {e}")
        return False

    print(f"OK: {cert.subject} signed by {ca.subject}")
    return True
def main(argv=None):
    """Command-line entry point: ``init``, ``issue`` and ``verify`` subcommands.

    Args:
        argv: Argument list to parse, without the program name. ``None``
            (the default) makes argparse read ``sys.argv[1:]``.

    With no subcommand the help text is printed and the function returns.
    ``verify`` always exits the process: status 0 when the certificate
    checks out, 1 otherwise. File and validation errors raised by a
    subcommand are reported on stderr and exit with status 1.
    """
    default_dir = os.path.expanduser("~/.hermes/certs")

    p = argparse.ArgumentParser(description="Fleet CA for mTLS")
    sub = p.add_subparsers(dest="cmd")

    pi = sub.add_parser("init")
    pi.add_argument("--output-dir", default=default_dir)
    pi.add_argument("--ca-name", default="Timmy Fleet CA")
    pi.add_argument("--days", type=int, default=3650)

    pu = sub.add_parser("issue")
    pu.add_argument("--agent", required=True)
    pu.add_argument("--output-dir", default=default_dir)
    # Mirrors `init --days`; the default matches issue_cert's own default.
    pu.add_argument("--days", type=int, default=365)

    pv = sub.add_parser("verify")
    pv.add_argument("--cert", required=True)
    pv.add_argument("--ca", required=True)

    args = p.parse_args(argv)
    if args.cmd is None:
        p.print_help()
        return

    try:
        if args.cmd == "init":
            init_ca(args.output_dir, args.ca_name, args.days)
        elif args.cmd == "issue":
            issue_cert(args.agent, args.output_dir, args.days)
        elif args.cmd == "verify":
            sys.exit(0 if verify_cert(args.cert, args.ca) else 1)
    except (OSError, ValueError) as exc:
        # Missing/unreadable files or rejected arguments: report without a traceback.
        print(f"error: {exc}", file=sys.stderr)
        sys.exit(1)
# Run the CLI only when this file is executed as a script, not on import.
if __name__ == "__main__": main()