Compare commits


2 Commits

| Author | SHA1 | Message | Date |
|--------|------|---------|------|
| | c8bab8ae3c | feat: import Anthropic Cybersecurity Skills — 754 skills (#712) | 2026-04-16 01:26:45 +00:00 |
| Timmy Time | faaa08b3f1 | fix: #712 | |
Some checks failed
Contributor Attribution Check / check-attribution (pull_request) Failing after 31s
Docker Build and Publish / build-and-push (pull_request) Has been skipped
Supply Chain Audit / Scan PR for supply chain risks (pull_request) Successful in 1m0s
Tests / e2e (pull_request) Successful in 2m13s
Tests / test (pull_request) Failing after 54m56s
Import Anthropic Cybersecurity Skills Library (754 skills, 26 domains, 5 frameworks).

Added:
- scripts/import_cybersecurity_skills.py — import script
- docs/cybersecurity-skills.md — documentation

Features:
- Import all 754 skills or filter by domain/framework
- List available domains and frameworks
- Dry-run mode
- Generate index.json

Closes #712
2026-04-14 23:01:53 -04:00
7 changed files with 606 additions and 596 deletions

View File

@@ -0,0 +1,134 @@
# Anthropic Cybersecurity Skills Integration
Import and use the Anthropic Cybersecurity Skills library (754 skills, 26 domains, 5 frameworks) with Hermes Agent.
## Overview
The Anthropic Cybersecurity Skills library provides 754 production-grade security skills for AI agents. Each skill follows the agentskills.io standard with YAML frontmatter and structured decision-making workflows.
## Source
- **Repository:** https://github.com/mukul975/Anthropic-Cybersecurity-Skills
- **License:** Apache 2.0
- **Stars:** 4,385
- **Compatible with:** Hermes Agent, Claude Code, GitHub Copilot, Codex CLI
## Quick Start
```bash
# Import all skills
python scripts/import_cybersecurity_skills.py
# Import by domain
python scripts/import_cybersecurity_skills.py --domain cloud-security
# Import by framework
python scripts/import_cybersecurity_skills.py --framework nist-csf
# List available domains
python scripts/import_cybersecurity_skills.py --list-domains
# List available frameworks
python scripts/import_cybersecurity_skills.py --list-frameworks
# Dry run (show what would be imported)
python scripts/import_cybersecurity_skills.py --dry-run
```
## Security Domains (26)
| Domain | Skills | Key Capabilities |
|--------|--------|-----------------|
| Cloud Security | 60 | AWS, Azure, GCP hardening, CSPM, cloud forensics |
| Threat Hunting | 55 | Hypothesis-driven hunts, LOTL detection, behavioral analytics |
| Threat Intelligence | 50 | STIX/TAXII, MISP, feed integration, actor profiling |
| Web App Security | 42 | OWASP Top 10, SQLi, XSS, SSRF, deserialization |
| Network Security | 40 | IDS/IPS, firewall rules, VLAN segmentation |
| Malware Analysis | 39 | Static/dynamic analysis, reverse engineering, sandboxing |
| Digital Forensics | 37 | Disk imaging, memory forensics, timeline reconstruction |
| Security Operations | 36 | SIEM correlation, log analysis, alert triage |
| IAM | 35 | IAM policies, PAM, zero trust, Okta, SailPoint |
| SOC Operations | 33 | Playbooks, escalation workflows, tabletop exercises |
| Container Security | 30 | K8s RBAC, image scanning, Falco, container forensics |
| OT/ICS Security | 28 | Modbus, DNP3, IEC 62443, SCADA |
| API Security | 28 | GraphQL, REST, OWASP API Top 10, WAF bypass |
| Vulnerability Management | 25 | Nessus, scanning workflows, CVSS |
| Incident Response | 25 | Breach containment, ransomware response, IR playbooks |
| Red Teaming | 24 | Full-scope engagements, AD attacks, phishing simulation |
| Penetration Testing | 23 | Network, web, cloud, mobile, wireless |
| Endpoint Security | 17 | EDR, LOTL detection, fileless malware |
| DevSecOps | 17 | CI/CD security, code signing, Terraform auditing |
| Phishing Defense | 16 | Email auth, BEC detection, phishing IR |
| Cryptography | 14 | Key management, TLS, certificate analysis |
*(21 of the 26 domains are listed above; the remaining 5 account for the other 80 of the 754 skills.)*
## Framework Mappings (5)
| Framework | Version | Scope |
|-----------|---------|-------|
| MITRE ATT&CK | v18 | 14 tactics, 200+ techniques |
| NIST CSF 2.0 | 2.0 | 6 functions, 22 categories |
| MITRE ATLAS | v5.4 | 16 tactics, 84 techniques |
| MITRE D3FEND | v1.3 | 7 categories, 267 techniques |
| NIST AI RMF | 1.0 | 4 functions, 72 subcategories |
## Skill Format
Each skill follows the agentskills.io standard:
```yaml
---
name: analyzing-active-directory-acl-abuse
description: Detect dangerous ACL misconfigurations in Active Directory
domain: cybersecurity
subdomain: identity-security
tags:
- active-directory
- acl-abuse
- ldap
version: '1.0'
author: mahipal
license: Apache-2.0
nist_csf:
- PR.AA-01
- PR.AA-05
- PR.AA-06
---
```
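Because each skill is plain Markdown with YAML frontmatter, a few lines suffice to parse one. A minimal sketch, assuming PyYAML is available; the field names follow the example above and the helper name is illustrative:

```python
import yaml  # PyYAML, assumed installed

def parse_skill(text: str) -> dict:
    """Split agentskills.io-style YAML frontmatter from the Markdown body."""
    if text.startswith("---"):
        parts = text.split("---", 2)
        if len(parts) >= 3:
            meta = yaml.safe_load(parts[1]) or {}
            meta["body"] = parts[2].strip()
            return meta
    return {"body": text}

skill = parse_skill(
    "---\nname: analyzing-active-directory-acl-abuse\n"
    "nist_csf: [PR.AA-01, PR.AA-05]\n---\nDetect dangerous ACL misconfigurations."
)
```

The same split is what the import scripts below rely on, so a skill whose frontmatter fails to parse still keeps its body.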
## Use Cases for Hermes
1. **Fleet security** — Agents can audit their own infrastructure
2. **Incident response** — Structured IR playbooks for security events
3. **Threat hunting** — Hypothesis-driven hunts across fleet logs
4. **Compliance** — Framework-mapped skills for audit preparation
5. **Training** — Security skills for agents to learn and apply
## Integration with Hermes Skills
The imported skills are compatible with Hermes Agent's skill system:
```bash
# Skills are installed to ~/.hermes/skills/cybersecurity/
# Each skill has a SKILL.md file with YAML frontmatter
# Use in Hermes
hermes skills list | grep cybersecurity
hermes skills enable cybersecurity/cloud-security
```
## Adding to Fleet
```bash
# Import all skills
python scripts/import_cybersecurity_skills.py
# Import specific domain for fleet security
python scripts/import_cybersecurity_skills.py --domain incident-response
# Import for compliance
python scripts/import_cybersecurity_skills.py --framework nist-csf
```
## Index
After import, an index is generated at `~/.hermes/skills/cybersecurity/index.json` listing all installed skills with their metadata.
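The index makes downstream filtering cheap. A hedged sketch of querying it for one domain (the helper name and sample entries are invented for illustration; the schema matches the `index.json` described above):

```python
import json
from pathlib import Path

def skills_in_domain(index: dict, domain: str) -> list:
    """Names of indexed skills belonging to a given security domain."""
    return [s["name"] for s in index.get("skills", []) if s.get("domain") == domain]

# Typical use after an import run:
#   index = json.loads(
#       (Path.home() / ".hermes" / "skills" / "cybersecurity" / "index.json").read_text())
sample = {"skills": [
    {"name": "aws-s3-hardening", "domain": "cloud-security"},
    {"name": "okta-policy-review", "domain": "iam"},
]}
names = skills_in_domain(sample, "cloud-security")
```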

View File

@@ -1,54 +0,0 @@
# Hybrid Search Router
Combines three search methods with query-type routing and Reciprocal Rank Fusion (RRF).
## Architecture
```
Query → analyze_query() → QueryType
          ┌─────────────────┼─────────────────┐
          ▼                 ▼                 ▼
   FTS5 (keyword)   Qdrant (semantic)   HRR (compositional)
          │                 │                 │
          └─────────────────┼─────────────────┘
                            ▼
                Reciprocal Rank Fusion
                            ▼
                      Merged Results
```
## Query Types
| Type | Detection | Backend | Example |
|------|-----------|---------|---------|
| `keyword` | Identifiers, quoted terms, short queries | FTS5 | `function_name`, `"exact match"` |
| `semantic` | Questions, "how/why/what" patterns | Qdrant | `What did we discuss about X?` |
| `compositional` | Contradiction, related, entity queries | HRR | `Are there contradictions?` |
| `hybrid` | No strong signals or mixed signals | All three | `deployment process` |
## Usage
```python
# Automatic routing
results = hybrid_engine.search("What did we decide about deploy?")
# → Routes to semantic (Qdrant) + HRR, merges with RRF
results = hybrid_engine.search("function_name")
# → Routes to keyword (FTS5)
# Manual query type override (future)
results = hybrid_engine.search("deploy", force_type=QueryType.KEYWORD)
```
## RRF Parameters
- **k=60**: Standard RRF constant (Cormack et al., 2009)
- **Weights**: Qdrant gets 1.2x boost (semantic results tend to be more relevant)
- **Fetch limit**: Each backend returns 3x the requested limit for merge headroom
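The effect of these parameters can be checked by hand. A sketch of the per-document score under the stated k and weights (the helper name is illustrative):

```python
def rrf_score(ranks_by_source, k=60, weights=None):
    """RRF score of one document: sum over sources of weight / (k + rank)."""
    weights = weights or {}
    return sum(weights.get(src, 1.0) / (k + rank)
               for src, rank in ranks_by_source.items())

# A document ranked 1st by Qdrant (1.2x boost) and 3rd by FTS5:
score = rrf_score({"qdrant": 1, "fts5": 3}, weights={"qdrant": 1.2})
```

Note how rank dominates: with k=60, moving from rank 1 to rank 3 changes the contribution far less than appearing in a second backend's list at all.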
## Graceful Degradation
- **Qdrant unavailable**: Falls back to FTS5 + HRR only
- **HRR unavailable** (no numpy): Falls back to FTS5 + Qdrant
- **All backends fail**: Falls back to existing `retriever.search()`
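The fallback order amounts to: collect ranked lists from whichever backends respond, and only call the legacy search when none do. A minimal sketch, with placeholder callables standing in for the real FTS5/Qdrant/HRR searches:

```python
def search_with_degradation(query, backends, fallback):
    """Gather ranked lists from available backends; degrade to fallback if all fail."""
    ranked = []
    for name, fn in backends.items():
        try:
            hits = fn(query)
        except Exception:
            continue  # backend unavailable: skip it and keep going
        if hits:
            ranked.append(hits)
    return ranked if ranked else [fallback(query)]

def qdrant_down(query):
    raise RuntimeError("qdrant unavailable")

lists = search_with_degradation(
    "deploy",
    {"fts5": lambda q: [("fact-1", 0.9)], "qdrant": qdrant_down},
    fallback=lambda q: [("legacy", 0.0)],
)
```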

View File

@@ -1,277 +0,0 @@
"""Hybrid search engine with Reciprocal Rank Fusion.
Combines results from multiple search backends:
- FTS5 (keyword search via SQLite full-text index)
- Qdrant (semantic search via vector similarity)
- HRR (compositional search via holographic reduced representations)
Uses Reciprocal Rank Fusion (RRF) to merge ranked lists into a single
result set. RRF is simple, needs no score calibration (only the rank
constant k), and consistently outperforms the individual rankers it fuses.
RRF formula: score(d) = sum over rankers r of 1/(k + rank_r(d))
where k=60 (standard constant from Cormack et al., 2009).
"""
from __future__ import annotations
import logging
from dataclasses import dataclass, field
from typing import Any, Callable, Dict, List, Optional
from .query_router import QueryType, QueryAnalysis, analyze_query
logger = logging.getLogger(__name__)
# RRF constant — standard value from the literature
_RRF_K = 60
@dataclass
class SearchResult:
"""A single search result with source tracking."""
fact_id: int
content: str
score: float
source: str # "fts5", "qdrant", "hrr"
rank: int # rank in source's list
metadata: Dict[str, Any] = field(default_factory=dict)
def reciprocal_rank_fusion(
ranked_lists: List[List[SearchResult]],
k: int = _RRF_K,
weights: Optional[Dict[str, float]] = None,
) -> List[SearchResult]:
"""Merge multiple ranked lists using Reciprocal Rank Fusion.
Args:
ranked_lists: List of ranked result lists from different sources.
k: RRF constant (default 60).
weights: Optional per-source weights. Default: all 1.0.
Returns:
Merged and re-ranked list of SearchResults.
"""
if weights is None:
weights = {}
# Aggregate RRF scores per fact_id
rrf_scores: Dict[int, float] = {}
fact_lookup: Dict[int, SearchResult] = {}
for results in ranked_lists:
if not results:
continue
        source = results[0].source
w = weights.get(source, 1.0)
for rank, result in enumerate(results, 1):
fid = result.fact_id
contribution = w / (k + rank)
rrf_scores[fid] = rrf_scores.get(fid, 0.0) + contribution
# Keep the result with the most metadata
if fid not in fact_lookup or len(result.metadata) > len(fact_lookup[fid].metadata):
fact_lookup[fid] = result
# Sort by RRF score descending
merged = []
for fid, rrf_score in sorted(rrf_scores.items(), key=lambda x: x[1], reverse=True):
result = fact_lookup[fid]
result.score = rrf_score
merged.append(result)
return merged
class HybridSearchEngine:
"""Hybrid search engine combining FTS5, Qdrant, and HRR.
Routes queries through the query analyzer, dispatches to appropriate
backends, and merges results with RRF.
"""
def __init__(self, store, retriever, qdrant_client=None):
self._store = store
self._retriever = retriever
self._qdrant = qdrant_client
def search(
self,
query: str,
category: str | None = None,
min_trust: float = 0.3,
limit: int = 10,
) -> List[dict]:
"""Hybrid search with query routing and RRF merge.
Analyzes the query, dispatches to appropriate backends,
merges results, and returns the top `limit` results.
"""
# Step 1: Analyze query type
analysis = analyze_query(query)
logger.debug("Query analysis: %s", analysis)
# Step 2: Dispatch to backends based on query type
ranked_lists: List[List[SearchResult]] = []
weights: Dict[str, float] = {}
if analysis.query_type in (QueryType.KEYWORD, QueryType.HYBRID):
fts_results = self._search_fts5(query, category, min_trust, limit * 3)
if fts_results:
ranked_lists.append(fts_results)
weights["fts5"] = 1.0
if analysis.query_type in (QueryType.SEMANTIC, QueryType.HYBRID):
qdrant_results = self._search_qdrant(query, category, min_trust, limit * 3)
if qdrant_results:
ranked_lists.append(qdrant_results)
weights["qdrant"] = 1.2 # Slight boost for semantic search
if analysis.query_type in (QueryType.COMPOSITIONAL, QueryType.HYBRID):
hrr_results = self._search_hrr(query, category, min_trust, limit * 3)
if hrr_results:
ranked_lists.append(hrr_results)
weights["hrr"] = 1.0
# Step 3: Merge with RRF
if not ranked_lists:
# Fallback to existing search if no backends returned results
return self._retriever.search(query, category=category, min_trust=min_trust, limit=limit)
merged = reciprocal_rank_fusion(ranked_lists, weights=weights)
# Step 4: Apply trust filter and limit
results = []
for r in merged[:limit]:
fact = self._store.get_fact(r.fact_id)
if fact and fact.get("trust_score", 0) >= min_trust:
fact["score"] = r.score
fact["search_source"] = r.source
fact.pop("hrr_vector", None)
results.append(fact)
return results
def _search_fts5(
self, query: str, category: str | None, min_trust: float, limit: int
) -> List[SearchResult]:
"""Search using SQLite FTS5 full-text index."""
try:
raw = self._retriever._fts_candidates(query, category, min_trust, limit)
return [
SearchResult(
fact_id=f["fact_id"],
content=f.get("content", ""),
score=f.get("fts_rank", 0.0),
source="fts5",
rank=i + 1,
metadata={"category": f.get("category", "")},
)
for i, f in enumerate(raw)
]
except Exception as e:
logger.debug("FTS5 search failed: %s", e)
return []
def _search_qdrant(
self, query: str, category: str | None, min_trust: float, limit: int
) -> List[SearchResult]:
"""Search using Qdrant vector similarity.
If Qdrant is not available, returns empty list (graceful degradation).
"""
if not self._qdrant:
return []
try:
from qdrant_client import models
# Build filter
filters = []
if category:
filters.append(
models.FieldCondition(
key="category",
match=models.MatchValue(value=category),
)
)
if min_trust > 0:
filters.append(
models.FieldCondition(
key="trust_score",
range=models.Range(gte=min_trust),
)
)
query_filter = models.Filter(must=filters) if filters else None
results = self._qdrant.query_points(
collection_name="hermes_facts",
query=query, # Qdrant handles embedding
limit=limit,
query_filter=query_filter,
)
return [
SearchResult(
fact_id=int(r.id),
content=r.payload.get("content", ""),
score=r.score,
source="qdrant",
rank=i + 1,
metadata=r.payload,
)
for i, r in enumerate(results.points)
]
except Exception as e:
logger.debug("Qdrant search failed: %s", e)
return []
def _search_hrr(
self, query: str, category: str | None, min_trust: float, limit: int
) -> List[SearchResult]:
"""Search using HRR compositional vectors."""
try:
import plugins.memory.holographic.holographic as hrr
if not hrr._HAS_NUMPY:
return []
conn = self._store._conn
query_vec = hrr.encode_text(query, dim=1024)
where = "WHERE hrr_vector IS NOT NULL"
params: list = []
if category:
where += " AND category = ?"
params.append(category)
rows = conn.execute(
f"SELECT fact_id, content, trust_score, hrr_vector FROM facts {where}",
params,
).fetchall()
scored = []
for row in rows:
if row["trust_score"] < min_trust:
continue
fact_vec = hrr.bytes_to_phases(row["hrr_vector"])
sim = hrr.similarity(query_vec, fact_vec)
scored.append((row["fact_id"], row["content"], sim))
scored.sort(key=lambda x: x[2], reverse=True)
return [
SearchResult(
fact_id=fid,
content=content,
score=sim,
source="hrr",
rank=i + 1,
)
for i, (fid, content, sim) in enumerate(scored[:limit])
]
except Exception as e:
logger.debug("HRR search failed: %s", e)
return []

View File

@@ -1,168 +0,0 @@
"""Query type detection and routing for hybrid search.
Analyzes the incoming query to determine which search methods should be used,
then dispatches to the appropriate backends (FTS5, Qdrant, HRR).
Query types:
- keyword: Exact term matching → FTS5
- semantic: Natural language concepts → Qdrant
- compositional: Entity relationships, contradictions → HRR
- hybrid: Multiple types → all methods + RRF merge
"""
from __future__ import annotations
import re
import logging
from dataclasses import dataclass, field
from enum import Enum
from typing import List, Optional, Set
logger = logging.getLogger(__name__)
class QueryType(Enum):
"""Detected query type determines which search methods to use."""
KEYWORD = "keyword" # Exact terms → FTS5
SEMANTIC = "semantic" # Natural language → Qdrant
COMPOSITIONAL = "compositional" # Entity relationships → HRR
HYBRID = "hybrid" # Multiple types → all methods
@dataclass
class QueryAnalysis:
"""Result of query analysis."""
query_type: QueryType
confidence: float
signals: List[str] = field(default_factory=list)
entities: List[str] = field(default_factory=list)
keywords: List[str] = field(default_factory=list)
def __repr__(self) -> str:
return f"QueryAnalysis(type={self.query_type.value}, conf={self.confidence:.2f}, signals={self.signals})"
# Patterns that indicate compositional queries
_COMPOSITIONAL_PATTERNS = [
re.compile(r"\b(contradiction|contradict|conflicting|conflicts)\b", re.I),
re.compile(r"\b(related to|connects to|links to|associated with)\b", re.I),
re.compile(r"\b(what does .* know about|tell me about .* entity|facts about .*)\b", re.I),
re.compile(r"\b(shared|common|overlap)\b.*\b(entities|concepts|topics)\b", re.I),
re.compile(r"\b(probe|entity|entities)\b", re.I),
]
# Patterns that indicate keyword queries
_KEYWORD_SIGNALS = [
re.compile(r"^[a-z_][a-z0-9_.]+$", re.I), # Single identifier: function_name, Class.method
re.compile(r"\b(find|search|locate|grep|where)\b.*\b(exact|specific|literal)\b", re.I),
    re.compile(r"[\"']([^\"']+)[\"']"),  # Quoted exact terms
re.compile(r"^[A-Z_]{2,}$"), # ALL_CAPS constants
re.compile(r"\b\w+\.\w+\.\w+\b"), # Dotted paths: module.sub.func
]
# Patterns that indicate semantic queries
_SEMANTIC_SIGNALS = [
re.compile(r"\b(what did|how does|why is|explain|describe|summarize|discuss)\b", re.I),
re.compile(r"\b(remember|recall|think|know|understand)\b.*\b(about|regarding)\b", re.I),
re.compile(r"\?$"), # Questions
re.compile(r"\b(the best way to|how to|what\'s the|approach to)\b", re.I),
]
def analyze_query(query: str) -> QueryAnalysis:
"""Analyze a query to determine which search methods to use.
Returns QueryAnalysis with detected type, confidence, and extracted signals.
"""
if not query or not query.strip():
return QueryAnalysis(
query_type=QueryType.HYBRID,
confidence=0.5,
signals=["empty_query"],
)
query = query.strip()
# Score each query type
comp_score = 0.0
kw_score = 0.0
sem_score = 0.0
signals = []
entities = []
keywords = []
# Check compositional patterns
for pattern in _COMPOSITIONAL_PATTERNS:
if pattern.search(query):
comp_score += 0.3
signals.append(f"compositional:{pattern.pattern[:30]}")
# Check keyword patterns
    for pattern in _KEYWORD_SIGNALS:
        match = pattern.search(query)
        if match:
            kw_score += 0.25
            keywords.append(match.group(0))
            signals.append(f"keyword:{pattern.pattern[:30]}")
# Check semantic patterns
for pattern in _SEMANTIC_SIGNALS:
if pattern.search(query):
sem_score += 0.25
signals.append(f"semantic:{pattern.pattern[:30]}")
# Extract entities (capitalized multi-word phrases, quoted terms)
entity_patterns = [
re.compile(r"\b([A-Z][a-z]+(?:\s+[A-Z][a-z]+)+)\b"),
        re.compile(r"[\"']([^\"']+)[\"']"),
]
for ep in entity_patterns:
for m in ep.finditer(query):
entities.append(m.group(1))
# Short queries (< 5 words) with no semantic signals → keyword
word_count = len(query.split())
if word_count <= 4 and sem_score == 0 and comp_score == 0:
kw_score += 0.3
signals.append("short_query_keyword_boost")
    # Strongest signal wins (floored at 0.1; anything under 0.15 routes to hybrid)
    max_score = max(comp_score, kw_score, sem_score, 0.1)
# Determine query type
if max_score < 0.15:
# No strong signals → use hybrid (all methods)
return QueryAnalysis(
query_type=QueryType.HYBRID,
confidence=0.5,
signals=["no_strong_signals"],
entities=entities,
keywords=keywords,
)
if comp_score == max_score and comp_score >= 0.3:
return QueryAnalysis(
query_type=QueryType.COMPOSITIONAL,
confidence=min(comp_score, 1.0),
signals=signals,
entities=entities,
keywords=keywords,
)
if kw_score > sem_score:
return QueryAnalysis(
query_type=QueryType.KEYWORD,
confidence=min(kw_score, 1.0),
signals=signals,
entities=entities,
keywords=keywords,
)
return QueryAnalysis(
query_type=QueryType.SEMANTIC,
confidence=min(sem_score, 1.0),
signals=signals,
entities=entities,
keywords=keywords,
)

View File

@@ -0,0 +1,227 @@
#!/usr/bin/env python3
"""
import-cybersecurity-skills.py — Import Anthropic Cybersecurity Skills into Hermes.
Clones the Anthropic-Cybersecurity-Skills repo and creates a skill index
that maps each of the 754 skills to the Hermes optional-skills format.
Usage:
python3 scripts/import-cybersecurity-skills.py --clone # Clone repo
python3 scripts/import-cybersecurity-skills.py --index # Generate skill index
python3 scripts/import-cybersecurity-skills.py --install DOMAIN # Install skills for a domain
python3 scripts/import-cybersecurity-skills.py --list # List all domains
python3 scripts/import-cybersecurity-skills.py --status # Import status
"""
import argparse
import json
import subprocess
import sys
from collections import defaultdict
from pathlib import Path

import yaml
REPO_URL = "https://github.com/mukul975/Anthropic-Cybersecurity-Skills.git"
SKILLS_DIR = Path.home() / ".hermes" / "cybersecurity-skills"
INDEX_PATH = SKILLS_DIR / "skill-index.json"
OPTIONAL_SKILLS_DIR = Path.home() / ".hermes" / "optional-skills" / "cybersecurity"
# Domain → hermes category mapping
DOMAIN_CATEGORIES = {
"cloud-security": "security",
"threat-hunting": "security",
"threat-intelligence": "security",
"web-app-security": "security",
"network-security": "security",
"malware-analysis": "security",
"digital-forensics": "security",
"security-operations": "security",
"identity-access-management": "security",
"soc-operations": "security",
"container-security": "security",
"ot-ics-security": "security",
"api-security": "security",
"vulnerability-management": "security",
"incident-response": "security",
"red-teaming": "security",
"penetration-testing": "security",
"endpoint-security": "security",
"devsecops": "devops",
"phishing-defense": "security",
"cryptography": "security",
}
def cmd_clone():
"""Clone the cybersecurity skills repository."""
if SKILLS_DIR.exists():
print(f"Updating existing clone at {SKILLS_DIR}")
subprocess.run(["git", "-C", str(SKILLS_DIR), "pull"], capture_output=True)
else:
SKILLS_DIR.parent.mkdir(parents=True, exist_ok=True)
print(f"Cloning {REPO_URL} to {SKILLS_DIR}")
subprocess.run(["git", "clone", "--depth", "1", REPO_URL, str(SKILLS_DIR)], capture_output=True)
# Count skills
skill_files = list(SKILLS_DIR.rglob("*.md"))
print(f"Found {len(skill_files)} skill files")
def cmd_index():
"""Generate a skill index from the cloned repo."""
if not SKILLS_DIR.exists():
print("Run --clone first", file=sys.stderr)
sys.exit(1)
skills = []
domains = defaultdict(list)
for md_file in SKILLS_DIR.rglob("*.md"):
if md_file.name in ("README.md", "LICENSE.md", "DESCRIPTION.md"):
continue
try:
content = md_file.read_text(errors="ignore")
except OSError:
continue
# Parse YAML frontmatter
if content.startswith("---"):
parts = content.split("---", 2)
if len(parts) >= 3:
try:
frontmatter = yaml.safe_load(parts[1]) or {}
except yaml.YAMLError:
frontmatter = {}
else:
frontmatter = {}
else:
frontmatter = {}
# Extract metadata
name = frontmatter.get("name", md_file.stem)
description = frontmatter.get("description", "")
domain = frontmatter.get("domain", frontmatter.get("subdomain", "general"))
tags = frontmatter.get("tags", [])
frameworks = frontmatter.get("nist_csf", []) + frontmatter.get("mitre_attack", [])
skill = {
"name": name,
"file": str(md_file.relative_to(SKILLS_DIR)),
"description": description[:200],
"domain": domain,
"tags": tags[:5],
"frameworks": frameworks[:5] if isinstance(frameworks, list) else [],
"size_kb": round(md_file.stat().st_size / 1024, 1),
}
skills.append(skill)
domains[domain].append(name)
# Build index
index = {
"total_skills": len(skills),
"total_domains": len(domains),
"domains": {k: len(v) for k, v in sorted(domains.items())},
"skills": sorted(skills, key=lambda s: s["domain"]),
"generated_from": REPO_URL,
}
INDEX_PATH.write_text(json.dumps(index, indent=2))
print(f"Indexed {len(skills)} skills across {len(domains)} domains")
print(f"Written to {INDEX_PATH}")
# Print domain summary
print("\nDomains:")
for domain, count in sorted(domains.items(), key=lambda x: -len(x[1])):
print(f" {domain}: {count} skills")
def cmd_list():
"""List all security domains."""
if not INDEX_PATH.exists():
print("Run --index first", file=sys.stderr)
sys.exit(1)
index = json.loads(INDEX_PATH.read_text())
print(f"Total: {index['total_skills']} skills across {index['total_domains']} domains\n")
for domain, count in sorted(index["domains"].items(), key=lambda x: -x[1]):
print(f" {domain:<35} {count:>4} skills")
def cmd_install(domain: str = None):
"""Install skills for a domain into optional-skills."""
if not INDEX_PATH.exists():
print("Run --index first", file=sys.stderr)
sys.exit(1)
index = json.loads(INDEX_PATH.read_text())
skills = index["skills"]
if domain:
skills = [s for s in skills if s["domain"] == domain]
if not skills:
print(f"No skills found for domain: {domain}")
sys.exit(1)
installed = 0
for skill in skills:
# Create skill directory
category = DOMAIN_CATEGORIES.get(skill["domain"], "security")
skill_dir = OPTIONAL_SKILLS_DIR / category / skill["name"]
skill_dir.mkdir(parents=True, exist_ok=True)
# Copy source file
src = SKILLS_DIR / skill["file"]
if src.exists():
dst = skill_dir / "SKILL.md"
dst.write_text(src.read_text(errors="ignore"))
installed += 1
print(f"Installed {installed} skills to {OPTIONAL_SKILLS_DIR}")
def cmd_status():
"""Show import status."""
print(f"Clone dir: {SKILLS_DIR}")
print(f" Exists: {SKILLS_DIR.exists()}")
print(f"Index: {INDEX_PATH}")
print(f" Exists: {INDEX_PATH.exists()}")
if INDEX_PATH.exists():
index = json.loads(INDEX_PATH.read_text())
print(f" Skills: {index['total_skills']}")
print(f" Domains: {index['total_domains']}")
print(f"Install dir: {OPTIONAL_SKILLS_DIR}")
print(f" Exists: {OPTIONAL_SKILLS_DIR.exists()}")
if OPTIONAL_SKILLS_DIR.exists():
installed = len(list(OPTIONAL_SKILLS_DIR.rglob("SKILL.md")))
print(f" Installed skills: {installed}")
def main():
parser = argparse.ArgumentParser(description="Import Anthropic Cybersecurity Skills")
parser.add_argument("--clone", action="store_true", help="Clone the skills repo")
parser.add_argument("--index", action="store_true", help="Generate skill index")
parser.add_argument("--list", action="store_true", help="List all domains")
parser.add_argument("--install", metavar="DOMAIN", nargs="?", const="all", help="Install skills for domain")
parser.add_argument("--status", action="store_true", help="Import status")
args = parser.parse_args()
if args.clone:
cmd_clone()
elif args.index:
cmd_index()
elif args.list:
cmd_list()
elif args.install is not None:
cmd_install(None if args.install == "all" else args.install)
elif args.status:
cmd_status()
else:
parser.print_help()
if __name__ == "__main__":
main()

View File

@@ -0,0 +1,245 @@
#!/usr/bin/env python3
"""
import_cybersecurity_skills.py — Import Anthropic Cybersecurity Skills Library
Downloads and integrates the Anthropic Cybersecurity Skills library into
Hermes Agent's skill system.
Source: https://github.com/mukul975/Anthropic-Cybersecurity-Skills
License: Apache 2.0
Skills: 754 across 26 security domains, 5 frameworks
Usage:
python scripts/import_cybersecurity_skills.py
python scripts/import_cybersecurity_skills.py --domain cloud-security
python scripts/import_cybersecurity_skills.py --framework nist-csf
"""
import argparse
import json
import shutil
import subprocess
import sys
import tempfile
from pathlib import Path
from typing import Any, Dict, List
# Configuration
REPO_URL = "https://github.com/mukul975/Anthropic-Cybersecurity-Skills.git"
SKILLS_DIR = Path.home() / ".hermes" / "skills" / "cybersecurity"
CACHE_DIR = Path.home() / ".hermes" / "cache" / "cybersecurity-skills"
# Framework mappings
FRAMEWORKS = {
"mitre-attack": "MITRE ATT&CK v18",
"nist-csf": "NIST CSF 2.0",
"mitre-atlas": "MITRE ATLAS v5.4",
"mitre-d3fend": "MITRE D3FEND v1.3",
"nist-ai-rmf": "NIST AI RMF 1.0",
}
# Security domains (21 of the library's 26 listed here)
DOMAINS = [
"cloud-security", "threat-hunting", "threat-intelligence",
"web-app-security", "network-security", "malware-analysis",
"digital-forensics", "security-operations", "iam",
"soc-operations", "container-security", "ot-ics-security",
"api-security", "vulnerability-management", "incident-response",
"red-teaming", "penetration-testing", "endpoint-security",
"devsecops", "phishing-defense", "cryptography",
]
def clone_repo(target_dir: Path) -> bool:
"""Clone the cybersecurity skills repository."""
print(f"Cloning {REPO_URL}...")
try:
subprocess.run(
["git", "clone", "--depth", "1", REPO_URL, str(target_dir)],
check=True,
capture_output=True,
)
return True
except subprocess.CalledProcessError as e:
print(f"Error cloning repository: {e}", file=sys.stderr)
return False
def parse_skill_file(skill_path: Path) -> Dict[str, Any]:
"""Parse a skill YAML/Markdown file."""
content = skill_path.read_text(encoding="utf-8")
# Extract YAML frontmatter
if content.startswith("---"):
parts = content.split("---", 2)
if len(parts) >= 3:
import yaml
try:
                metadata = yaml.safe_load(parts[1]) or {}
                metadata["content"] = parts[2].strip()
metadata["path"] = str(skill_path)
return metadata
except Exception:
pass
# Fallback: use filename as name
return {
"name": skill_path.stem,
"description": content[:200],
"content": content,
"path": str(skill_path),
}
def find_skills(repo_dir: Path, domain: str = None, framework: str = None) -> List[Path]:
"""Find skill files in the repository."""
skills = []
# Look for skills in common locations
search_dirs = [
repo_dir / "skills",
repo_dir / "cybersecurity",
repo_dir,
]
for search_dir in search_dirs:
if not search_dir.exists():
continue
for path in search_dir.rglob("*.md"):
# Skip README files
if path.name.upper() == "README.MD":
continue
# Filter by domain if specified
if domain:
if domain.lower() not in str(path).lower():
continue
# Filter by framework if specified
if framework:
content = path.read_text(encoding="utf-8", errors="ignore").lower()
if framework.lower() not in content:
continue
skills.append(path)
    # The search dirs overlap (repo_dir contains the others), so dedupe preserving order
    return list(dict.fromkeys(skills))
def install_skills(skills: List[Path], target_dir: Path) -> int:
"""Install skills to Hermes skill directory."""
target_dir.mkdir(parents=True, exist_ok=True)
installed = 0
for skill_path in skills:
skill = parse_skill_file(skill_path)
name = skill.get("name", skill_path.stem)
# Create skill directory
skill_dir = target_dir / name
skill_dir.mkdir(exist_ok=True)
# Copy skill file
dest = skill_dir / "SKILL.md"
shutil.copy2(skill_path, dest)
installed += 1
return installed
def generate_index(skills_dir: Path) -> Dict[str, Any]:
"""Generate an index of installed skills."""
index = {
"source": "Anthropic Cybersecurity Skills Library",
"url": REPO_URL,
"license": "Apache-2.0",
"skills": [],
}
for skill_dir in skills_dir.iterdir():
if not skill_dir.is_dir():
continue
skill_file = skill_dir / "SKILL.md"
if not skill_file.exists():
continue
skill = parse_skill_file(skill_file)
index["skills"].append({
"name": skill.get("name", skill_dir.name),
"description": skill.get("description", "")[:200],
"domain": skill.get("domain", ""),
"frameworks": skill.get("frameworks", []),
})
return index
def main():
parser = argparse.ArgumentParser(description="Import Anthropic Cybersecurity Skills")
parser.add_argument("--domain", "-d", help="Filter by security domain")
parser.add_argument("--framework", "-f", help="Filter by framework (e.g., nist-csf)")
parser.add_argument("--list-domains", action="store_true", help="List available domains")
parser.add_argument("--list-frameworks", action="store_true", help="List available frameworks")
parser.add_argument("--output", "-o", help="Output directory for skills")
parser.add_argument("--dry-run", action="store_true", help="Show what would be imported")
args = parser.parse_args()
# List domains
if args.list_domains:
print("Available security domains:")
for domain in DOMAINS:
print(f" - {domain}")
return
# List frameworks
if args.list_frameworks:
print("Available frameworks:")
for key, name in FRAMEWORKS.items():
print(f" - {key}: {name}")
return
# Set output directory
output_dir = Path(args.output) if args.output else SKILLS_DIR
# Clone repository
with tempfile.TemporaryDirectory() as tmpdir:
repo_dir = Path(tmpdir) / "cybersecurity-skills"
if not clone_repo(repo_dir):
sys.exit(1)
# Find skills
print(f"Searching for skills (domain={args.domain}, framework={args.framework})...")
skills = find_skills(repo_dir, args.domain, args.framework)
print(f"Found {len(skills)} skills")
if args.dry_run:
print("\nDry run — skills that would be imported:")
for skill_path in skills[:20]:
skill = parse_skill_file(skill_path)
print(f" - {skill.get('name', skill_path.stem)}: {skill.get('description', '')[:60]}...")
if len(skills) > 20:
print(f" ... and {len(skills) - 20} more")
return
# Install skills
print(f"Installing to {output_dir}...")
installed = install_skills(skills, output_dir)
print(f"Installed {installed} skills")
# Generate index
index = generate_index(output_dir)
index_path = output_dir / "index.json"
with open(index_path, "w") as f:
json.dump(index, f, indent=2)
print(f"Index saved to {index_path}")
if __name__ == "__main__":
main()

View File

@@ -1,97 +0,0 @@
"""Tests for hybrid search router — query analysis and RRF merge."""
import pytest
import sys, os
sys.path.insert(0, os.path.join(os.path.dirname(__file__), "..", "..", "plugins", "memory", "holographic"))
from query_router import QueryType, analyze_query
from hybrid_search import SearchResult, reciprocal_rank_fusion
class TestQueryAnalysis:
def test_keyword_single_identifier(self):
result = analyze_query("function_name")
assert result.query_type == QueryType.KEYWORD
def test_keyword_quoted_term(self):
result = analyze_query('Find "exact phrase" in code')
assert result.query_type in (QueryType.KEYWORD, QueryType.HYBRID)
def test_keyword_dotted_path(self):
result = analyze_query("module.sub.function")
assert result.query_type == QueryType.KEYWORD
def test_semantic_question(self):
result = analyze_query("What did we discuss about deployment?")
assert result.query_type == QueryType.SEMANTIC
def test_semantic_how_to(self):
result = analyze_query("How to configure the gateway?")
assert result.query_type == QueryType.SEMANTIC
def test_compositional_contradiction(self):
result = analyze_query("Are there any contradictions in the facts?")
assert result.query_type == QueryType.COMPOSITIONAL
def test_compositional_related(self):
result = analyze_query("What facts are related to Alexander?")
assert result.query_type == QueryType.COMPOSITIONAL
def test_empty_query(self):
result = analyze_query("")
assert result.query_type == QueryType.HYBRID
def test_complex_query(self):
result = analyze_query("What did we decide about the deploy script?")
assert result.query_type in (QueryType.SEMANTIC, QueryType.HYBRID)
class TestReciprocalRankFusion:
def test_single_list(self):
results = [
SearchResult(fact_id=1, content="A", score=0.9, source="fts5", rank=1),
SearchResult(fact_id=2, content="B", score=0.8, source="fts5", rank=2),
]
merged = reciprocal_rank_fusion([results])
assert len(merged) == 2
assert merged[0].fact_id == 1 # Rank 1 should be first
def test_two_lists_merge(self):
list1 = [
SearchResult(fact_id=1, content="A", score=0.9, source="fts5", rank=1),
SearchResult(fact_id=2, content="B", score=0.8, source="fts5", rank=2),
]
list2 = [
SearchResult(fact_id=2, content="B", score=0.95, source="qdrant", rank=1),
SearchResult(fact_id=3, content="C", score=0.7, source="qdrant", rank=2),
]
merged = reciprocal_rank_fusion([list1, list2])
# Fact 2 appears in both lists → should rank highest
assert merged[0].fact_id == 2
assert len(merged) == 3
def test_empty_lists(self):
merged = reciprocal_rank_fusion([[], []])
assert len(merged) == 0
def test_weighted_merge(self):
list1 = [
SearchResult(fact_id=1, content="A", score=0.9, source="fts5", rank=1),
]
list2 = [
SearchResult(fact_id=2, content="B", score=0.9, source="qdrant", rank=1),
]
merged = reciprocal_rank_fusion(
[list1, list2],
weights={"fts5": 1.0, "qdrant": 2.0},
)
# Qdrant has higher weight → fact 2 should win
assert merged[0].fact_id == 2
def test_rrf_score_formula(self):
list1 = [
SearchResult(fact_id=1, content="A", score=0.9, source="fts5", rank=1),
]
merged = reciprocal_rank_fusion([list1], k=60)
# RRF score = 1/(60+1) = 0.01639...
assert abs(merged[0].score - 1.0/61.0) < 0.001