Compare commits

..

4 Commits

Author SHA1 Message Date
Alexander Whitestone
e28d16b324 test: cover patch did-you-mean end-to-end (#960)
All checks were successful
Lint / lint (pull_request) Successful in 18s
- add focused QA tests for replace-mode rich hints without legacy generic hints
- verify ambiguous replace failures do not show did-you-mean noise
- verify V4A patch validation surfaces rich hints
- verify skill patching surfaces rich hints on true no-match failures
2026-04-22 10:43:34 -04:00
Alexander Whitestone
bc32047610 wip: fix patch did-you-mean dependencies (#960)
- restore escape-drift guard needed by current fuzzy-match tests
- import missing typing symbols in file_tools after porting patch hint logic
2026-04-22 10:41:29 -04:00
Teknium
3a24420d7d fix(patch): gate 'did you mean?' to no-match + extend to v4a/skill_manage
Follow-ups on top of @teyrebaz33's cherry-picked commit:

1. New shared helper format_no_match_hint() in fuzzy_match.py with a
   startswith('Could not find') gate so the snippet only appends to
   genuine no-match errors — not to 'Found N matches' (ambiguous),
   'Escape-drift detected', or 'identical strings' errors, which would
   all mislead the model.

2. file_tools.patch_tool suppresses the legacy generic '[Hint: old_string
   not found...]' string when the rich 'Did you mean?' snippet is
   already attached — no more double-hint.

3. Wire the same helper into patch_parser.py (V4A patch mode, both
   _validate_operations and _apply_update) and skill_manager_tool.py so
   all three fuzzy callers surface the hint consistently.

Tests: 7 new gating tests in TestFormatNoMatchHint cover every error
class (ambiguous, drift, identical, non-zero match count, None error,
no similar content, happy path). 34/34 test_fuzzy_match, 96/96
test_file_tools + test_patch_parser + test_skill_manager_tool pass.
E2E verified across all four scenarios: no-match-with-similar,
no-match-no-similar, ambiguous, success. V4A mode confirmed
end-to-end with a non-matching hunk.

(cherry picked from commit 5e6427a42c)
2026-04-22 10:30:00 -04:00
teyrebaz33
d14c1c5a56 feat(patch): add 'did you mean?' feedback when patch fails to match
When patch_replace() cannot find old_string in a file, the error message
now includes the closest matching lines from the file with line numbers
and context. This helps the LLM self-correct without a separate read_file
call.

Implements Phase 1 of #536: enhanced patch error feedback with no
architectural changes.

- tools/fuzzy_match.py: new find_closest_lines() using SequenceMatcher
- tools/file_operations.py: attach closest-lines hint to patch errors
- tests/tools/test_fuzzy_match.py: 5 new tests for find_closest_lines

(cherry picked from commit 15abf4ed8f)
2026-04-22 10:28:40 -04:00
9 changed files with 544 additions and 522 deletions

View File

@@ -1,70 +1,43 @@
from __future__ import annotations
"""
A2A agent card generation for fleet discovery.
Agent Card — A2A-compliant agent discovery.
Part of #843: fix: implement A2A agent card for fleet discovery (#819)
Refs #801.
Closes #802.
Provides metadata about the agent's identity, capabilities, and installed skills
for discovery by other agents in the fleet.
"""
import argparse
import json
import logging
import os
import socket
import sys
from dataclasses import asdict, dataclass, field
from typing import Any, Dict, Iterable, List, Mapping, Sequence
from urllib.parse import urlparse, urlunparse
from pathlib import Path
from typing import Any, Dict, List, Optional
from hermes_cli import __version__
from hermes_cli.config import load_config
from hermes_cli.config import load_config, get_hermes_home
from agent.skill_utils import (
get_all_skills_dirs,
get_disabled_skill_names,
iter_skill_index_files,
parse_frontmatter,
skill_matches_platform,
get_all_skills_dirs,
get_disabled_skill_names,
skill_matches_platform
)
logger = logging.getLogger(__name__)
DEFAULT_DESCRIPTION = "Sovereign AI agent — orchestration, code, research"
DEFAULT_INPUT_MODES = ["text/plain", "application/json"]
DEFAULT_OUTPUT_MODES = ["text/plain", "application/json"]
_REQUIRED_CAPABILITY_FLAGS = (
"streaming",
"pushNotifications",
"stateTransitionHistory",
)
@dataclass
class AgentSkill:
id: str
name: str
description: str = ""
tags: List[str] = field(default_factory=list)
def to_dict(self) -> Dict[str, Any]:
data: Dict[str, Any] = {"id": self.id, "name": self.name}
if self.description:
data["description"] = self.description
if self.tags:
data["tags"] = self.tags
return data
version: str = "1.0.0"
@dataclass
class AgentCapabilities:
streaming: bool = True
pushNotifications: bool = False
stateTransitionHistory: bool = True
def to_dict(self) -> Dict[str, Any]:
return asdict(self)
tools: bool = True
vision: bool = False
reasoning: bool = False
@dataclass
class AgentCard:
@@ -74,81 +47,14 @@ class AgentCard:
version: str = __version__
capabilities: AgentCapabilities = field(default_factory=AgentCapabilities)
skills: List[AgentSkill] = field(default_factory=list)
defaultInputModes: List[str] = field(default_factory=lambda: list(DEFAULT_INPUT_MODES))
defaultOutputModes: List[str] = field(default_factory=lambda: list(DEFAULT_OUTPUT_MODES))
metadata: Dict[str, Any] = field(default_factory=dict)
def to_dict(self) -> Dict[str, Any]:
data: Dict[str, Any] = {
"name": self.name,
"description": self.description,
"url": self.url,
"version": self.version,
"capabilities": self.capabilities.to_dict(),
"skills": [skill.to_dict() for skill in self.skills],
"defaultInputModes": list(self.defaultInputModes),
"defaultOutputModes": list(self.defaultOutputModes),
}
if self.metadata:
data["metadata"] = dict(self.metadata)
return data
def to_json(self, indent: int = 2) -> str:
return json.dumps(self.to_dict(), indent=indent)
def _env_or_empty(key: str) -> str:
return os.environ.get(key, "").strip()
def _as_agent_config(config: Mapping[str, Any] | None) -> Dict[str, Any]:
if not isinstance(config, Mapping):
return {}
agent_cfg = config.get("agent")
return dict(agent_cfg) if isinstance(agent_cfg, Mapping) else {}
def _as_a2a_config(config: Mapping[str, Any] | None) -> Dict[str, Any]:
if not isinstance(config, Mapping):
return {}
a2a_cfg = config.get("a2a")
return dict(a2a_cfg) if isinstance(a2a_cfg, Mapping) else {}
def _normalize_string_list(value: Any) -> List[str]:
if value is None:
return []
if isinstance(value, str):
parts = value.split(",")
elif isinstance(value, Sequence) and not isinstance(value, (bytes, bytearray, str)):
parts = list(value)
else:
parts = [value]
out: List[str] = []
seen = set()
for item in parts:
text = str(item).strip()
if not text or text in seen:
continue
seen.add(text)
out.append(text)
return out
def _normalize_skill_tags(frontmatter: Mapping[str, Any]) -> List[str]:
tags = _normalize_string_list(frontmatter.get("tags"))
category = str(frontmatter.get("category") or "").strip()
if category and category not in tags:
tags.append(category)
return tags
defaultInputModes: List[str] = field(default_factory=lambda: ["text/plain"])
defaultOutputModes: List[str] = field(default_factory=lambda: ["text/plain"])
def _load_skills() -> List[AgentSkill]:
"""Scan enabled skills and return A2A skill metadata."""
skills: List[AgentSkill] = []
"""Scan all enabled skills and return metadata."""
skills = []
disabled = get_disabled_skill_names()
seen_ids = set()
for skills_dir in get_all_skills_dirs():
if not skills_dir.is_dir():
continue
@@ -159,262 +65,71 @@ def _load_skills() -> List[AgentSkill]:
except Exception:
continue
skill_name = frontmatter.get("name") or skill_file.parent.name
if str(skill_name) in disabled:
continue
if not skill_matches_platform(frontmatter):
continue
skill_id = str(frontmatter.get("name") or skill_file.parent.name).strip().lower().replace(" ", "-")
if skill_id in disabled or skill_id in seen_ids:
continue
seen_ids.add(skill_id)
skills.append(AgentSkill(
id=str(skill_name),
name=str(frontmatter.get("name", skill_name)),
description=str(frontmatter.get("description", "")),
version=str(frontmatter.get("version", "1.0.0"))
))
return skills
display_name = str(frontmatter.get("title") or frontmatter.get("name") or skill_file.parent.name).strip()
description = str(frontmatter.get("description") or "").strip()
tags = _normalize_skill_tags(frontmatter)
skills.append(
AgentSkill(
id=skill_id,
name=display_name,
description=description,
tags=tags,
)
)
def build_agent_card() -> AgentCard:
"""Build the agent card from current configuration and environment."""
config = load_config()
# Identity
name = os.environ.get("HERMES_AGENT_NAME") or config.get("agent", {}).get("name") or "hermes"
description = os.environ.get("HERMES_AGENT_DESCRIPTION") or config.get("agent", {}).get("description") or "Sovereign AI agent"
# URL - try to determine from environment or config
port = os.environ.get("HERMES_WEB_PORT") or "9119"
host = os.environ.get("HERMES_WEB_HOST") or "localhost"
url = f"http://{host}:{port}"
# Capabilities
# In a real scenario, we'd check model metadata for vision/reasoning
capabilities = AgentCapabilities(
streaming=True,
tools=True,
vision=False, # Default to false unless we can confirm
reasoning=False
)
# Skills
skills = _load_skills()
return AgentCard(
name=name,
description=description,
url=url,
version=__version__,
capabilities=capabilities,
skills=skills
)
return sorted(skills, key=lambda skill: skill.id)
def _get_agent_name(config: Mapping[str, Any] | None, override: str | None = None) -> str:
if override:
return override
env_name = _env_or_empty("HERMES_AGENT_NAME") or _env_or_empty("AGENT_NAME")
if env_name:
return env_name
agent_cfg = _as_agent_config(config)
if agent_cfg.get("name"):
return str(agent_cfg["name"]).strip()
def get_agent_card_json() -> str:
"""Return the agent card as a JSON string."""
try:
hostname = socket.gethostname().split(".", 1)[0].strip()
if hostname:
return hostname
except Exception:
pass
return "hermes"
def _get_description(config: Mapping[str, Any] | None, override: str | None = None) -> str:
if override:
return override
env_description = _env_or_empty("HERMES_AGENT_DESCRIPTION") or _env_or_empty("AGENT_DESCRIPTION")
if env_description:
return env_description
agent_cfg = _as_agent_config(config)
if agent_cfg.get("description"):
return str(agent_cfg["description"]).strip()
return DEFAULT_DESCRIPTION
def _normalize_a2a_url(url: str) -> str:
raw = (url or "").strip()
if not raw:
return ""
parsed = urlparse(raw if "://" in raw else f"https://{raw}")
scheme = parsed.scheme or "https"
netloc = parsed.netloc or parsed.path
path = parsed.path if parsed.netloc else ""
normalized_path = path.rstrip("/") if path not in ("", "/") else ""
if not normalized_path.endswith("/a2a"):
normalized_path = f"{normalized_path}/a2a" if normalized_path else "/a2a"
return urlunparse((scheme, netloc, normalized_path, "", "", ""))
def _get_agent_url(config: Mapping[str, Any] | None, override: str | None = None) -> str:
if override:
return _normalize_a2a_url(override)
agent_cfg = _as_agent_config(config)
a2a_cfg = _as_a2a_config(config)
explicit = (
_env_or_empty("HERMES_A2A_PUBLIC_URL")
or str(a2a_cfg.get("public_url") or "").strip()
or str(agent_cfg.get("a2a_public_url") or "").strip()
)
if explicit:
return _normalize_a2a_url(explicit)
host = (
_env_or_empty("HERMES_A2A_HOST")
or str(a2a_cfg.get("host") or "").strip()
or _env_or_empty("HERMES_WEB_HOST")
or str(agent_cfg.get("host") or "").strip()
or "localhost"
)
port = (
_env_or_empty("HERMES_A2A_PORT")
or str(a2a_cfg.get("port") or "").strip()
or _env_or_empty("HERMES_WEB_PORT")
or str(agent_cfg.get("port") or "").strip()
or "9119"
)
scheme = (
_env_or_empty("HERMES_A2A_SCHEME")
or str(a2a_cfg.get("scheme") or "").strip()
or ("https" if (_env_or_empty("HERMES_MTLS_CERT") or str(port) == "9443") else "http")
)
return _normalize_a2a_url(f"{scheme}://{host}:{port}")
def _merge_skills(base_skills: Iterable[AgentSkill], extra_skills: Iterable[AgentSkill] | None = None) -> List[AgentSkill]:
merged: Dict[str, AgentSkill] = {}
for skill in list(base_skills) + list(extra_skills or []):
if skill.id not in merged:
merged[skill.id] = skill
return [merged[key] for key in sorted(merged)]
def build_agent_card(
*,
name: str | None = None,
description: str | None = None,
url: str | None = None,
extra_skills: Iterable[AgentSkill] | None = None,
metadata: Mapping[str, Any] | None = None,
) -> AgentCard:
"""Build an A2A-compliant agent card from config, env, and installed skills."""
try:
config = load_config()
except Exception as exc:
logger.debug("Falling back to empty config while building agent card: %s", exc)
config = {}
card = AgentCard(
name=_get_agent_name(config, override=name),
description=_get_description(config, override=description),
url=_get_agent_url(config, override=url),
skills=_merge_skills(_load_skills(), extra_skills),
metadata=dict(metadata or {}),
)
return card
def validate_agent_card(card: AgentCard | Dict[str, Any]) -> List[str]:
"""Return a list of schema-validation errors for an agent card."""
data = card.to_dict() if isinstance(card, AgentCard) else dict(card)
errors: List[str] = []
for field_name in ("name", "description", "url", "version"):
value = data.get(field_name)
if not isinstance(value, str) or not value.strip():
errors.append(f"{field_name} must be a non-empty string")
url_value = str(data.get("url") or "")
parsed = urlparse(url_value)
if not parsed.scheme or not parsed.netloc:
errors.append("url must be an absolute http/https URL")
elif parsed.scheme not in {"http", "https"}:
errors.append("url must use http or https")
elif not parsed.path.rstrip("/").endswith("/a2a"):
errors.append("url must point to the /a2a endpoint")
capabilities = data.get("capabilities")
if not isinstance(capabilities, Mapping):
errors.append("capabilities must be an object")
else:
for capability_name in _REQUIRED_CAPABILITY_FLAGS:
if not isinstance(capabilities.get(capability_name), bool):
errors.append(f"capabilities.{capability_name} must be a boolean")
for field_name, required_modes in (
("defaultInputModes", DEFAULT_INPUT_MODES),
("defaultOutputModes", DEFAULT_OUTPUT_MODES),
):
modes = data.get(field_name)
if not isinstance(modes, list) or not modes:
errors.append(f"{field_name} must be a non-empty list of MIME types")
continue
for mode in modes:
if not isinstance(mode, str) or "/" not in mode:
errors.append(f"{field_name} entries must be MIME types")
for required_mode in required_modes:
if required_mode not in modes:
errors.append(f"{field_name} must include {required_mode}")
skills = data.get("skills")
if not isinstance(skills, list):
errors.append("skills must be a list")
else:
for index, skill in enumerate(skills):
if not isinstance(skill, Mapping):
errors.append(f"skills[{index}] must be an object")
continue
if not str(skill.get("id") or "").strip():
errors.append(f"skills[{index}] missing id")
if not str(skill.get("name") or "").strip():
errors.append(f"skills[{index}] missing name")
tags = skill.get("tags", [])
if tags is None:
tags = []
if not isinstance(tags, list):
errors.append(f"skills[{index}].tags must be a list")
else:
for tag in tags:
if not isinstance(tag, str) or not tag.strip():
errors.append(f"skills[{index}].tags entries must be non-empty strings")
metadata = data.get("metadata")
if metadata is not None and not isinstance(metadata, Mapping):
errors.append("metadata must be an object when present")
return errors
def get_agent_card_json(
*,
name: str | None = None,
description: str | None = None,
url: str | None = None,
metadata: Mapping[str, Any] | None = None,
indent: int = 2,
) -> str:
"""Return the local agent card as JSON, falling back to an error card on failure."""
try:
card = build_agent_card(name=name, description=description, url=url, metadata=metadata)
errors = validate_agent_card(card)
if errors:
raise ValueError("; ".join(errors))
return card.to_json(indent=indent)
except Exception as exc:
logger.error("Failed to build agent card: %s", exc)
card = build_agent_card()
return json.dumps(asdict(card), indent=2)
except Exception as e:
logger.error(f"Failed to build agent card: {e}")
# Minimal fallback card
fallback = {
"name": name or _env_or_empty("HERMES_AGENT_NAME") or "hermes",
"description": "Sovereign AI agent (agent card fallback)",
"url": url or "http://localhost:9119/a2a",
"name": "hermes",
"description": "Sovereign AI agent (fallback)",
"version": __version__,
"capabilities": AgentCapabilities().to_dict(),
"skills": [],
"defaultInputModes": list(DEFAULT_INPUT_MODES),
"defaultOutputModes": list(DEFAULT_OUTPUT_MODES),
"error": str(exc),
"error": str(e)
}
return json.dumps(fallback, indent=indent)
return json.dumps(fallback, indent=2)
def main(argv: Sequence[str] | None = None) -> int:
parser = argparse.ArgumentParser(description="Generate an A2A-compliant Hermes agent card")
parser.add_argument("--name", help="Override the agent name")
parser.add_argument("--description", help="Override the agent description")
parser.add_argument("--url", help="Override the public A2A URL")
parser.add_argument("--validate", action="store_true", help="Validate before printing; exit 1 on schema errors")
args = parser.parse_args(list(argv) if argv is not None else None)
card = build_agent_card(name=args.name, description=args.description, url=args.url)
errors = validate_agent_card(card)
if args.validate and errors:
for error in errors:
print(error, file=sys.stderr)
return 1
print(card.to_json(indent=2))
return 0
if __name__ == "__main__":
raise SystemExit(main())
def validate_agent_card(card_data: Dict[str, Any]) -> bool:
"""Check if the card data complies with the A2A schema."""
required = ["name", "description", "url", "version"]
return all(k in card_data for k in required)

View File

@@ -1,150 +0,0 @@
from __future__ import annotations
import json
from pathlib import Path
import pytest
from agent import agent_card as mod
DEFAULT_DESCRIPTION = "Sovereign AI agent — orchestration, code, research"
def _set_base_context(monkeypatch, *, name: str = "Timmy", description: str = DEFAULT_DESCRIPTION, url: str = "https://timmy.local:9443/a2a", skills=None):
monkeypatch.setattr(mod, "load_config", lambda: {"agent": {"name": name, "description": description}})
monkeypatch.setattr(
mod,
"_load_skills",
lambda: list(
skills
if skills is not None
else [
mod.AgentSkill(
id="code",
name="Code Implementation",
description="Implement and patch code",
tags=["python", "gitea"],
)
]
),
)
monkeypatch.setenv("HERMES_A2A_PUBLIC_URL", url)
monkeypatch.delenv("HERMES_AGENT_NAME", raising=False)
monkeypatch.delenv("AGENT_NAME", raising=False)
monkeypatch.delenv("HERMES_AGENT_DESCRIPTION", raising=False)
monkeypatch.delenv("AGENT_DESCRIPTION", raising=False)
def test_build_agent_card_matches_issue_802_schema(monkeypatch):
_set_base_context(monkeypatch)
card = mod.build_agent_card()
payload = card.to_dict()
assert payload["name"] == "Timmy"
assert payload["description"] == DEFAULT_DESCRIPTION
assert payload["url"] == "https://timmy.local:9443/a2a"
assert payload["capabilities"] == {
"streaming": True,
"pushNotifications": False,
"stateTransitionHistory": True,
}
assert payload["defaultInputModes"] == ["text/plain", "application/json"]
assert payload["defaultOutputModes"] == ["text/plain", "application/json"]
assert payload["skills"][0]["tags"] == ["python", "gitea"]
assert mod.validate_agent_card(payload) == []
@pytest.mark.parametrize(
("name", "url"),
[
("Timmy", "https://timmy.local:9443/a2a"),
("Allegro", "https://allegro.local:9443/a2a"),
("Ezra", "https://ezra.local:9443/a2a"),
],
)
def test_build_agent_card_supports_fleet_members(monkeypatch, name, url):
_set_base_context(monkeypatch, name=name, url=url, skills=[])
payload = mod.build_agent_card().to_dict()
assert payload["name"] == name
assert payload["url"] == url
assert mod.validate_agent_card(payload) == []
def test_load_skills_collects_tags_and_category(monkeypatch, tmp_path):
skill_root = tmp_path / "skills"
skill_dir = skill_root / "code-implementation"
skill_dir.mkdir(parents=True)
(skill_dir / "SKILL.md").write_text(
"""---
name: Code Implementation
description: Implement and patch code
tags: [python, gitea]
category: discovery
---
# Code Implementation
""",
encoding="utf-8",
)
monkeypatch.setattr(mod, "get_all_skills_dirs", lambda: [skill_root])
monkeypatch.setattr(mod, "get_disabled_skill_names", lambda: set())
monkeypatch.setattr(mod, "skill_matches_platform", lambda _frontmatter: True)
skills = mod._load_skills()
assert len(skills) == 1
assert skills[0].id == "code-implementation"
assert skills[0].name == "Code Implementation"
assert skills[0].description == "Implement and patch code"
assert skills[0].tags == ["python", "gitea", "discovery"]
def test_validate_agent_card_reports_schema_errors():
errors = mod.validate_agent_card(
{
"name": "",
"description": "",
"url": "timmy.local",
"version": "",
"capabilities": {"streaming": True},
"skills": [{"id": "", "name": "", "tags": "python"}],
"defaultInputModes": ["text/plain"],
"defaultOutputModes": ["plain"],
"metadata": [],
}
)
assert any("name must be a non-empty string" in error for error in errors)
assert any("url must be an absolute http/https URL" in error for error in errors)
assert any("capabilities.pushNotifications" in error for error in errors)
assert any("skills[0] missing id" in error for error in errors)
assert any("skills[0].tags must be a list" in error for error in errors)
assert any("defaultInputModes must include application/json" in error for error in errors)
assert any("defaultOutputModes entries must be MIME types" in error for error in errors)
assert any("metadata must be an object" in error for error in errors)
def test_get_agent_card_json_emits_valid_json(monkeypatch):
_set_base_context(monkeypatch)
payload = json.loads(mod.get_agent_card_json())
assert payload["name"] == "Timmy"
assert mod.validate_agent_card(payload) == []
def test_main_validate_prints_card(monkeypatch, capsys):
_set_base_context(monkeypatch)
exit_code = mod.main(["--validate"])
captured = capsys.readouterr()
assert exit_code == 0
payload = json.loads(captured.out)
assert payload["url"] == "https://timmy.local:9443/a2a"
assert captured.err == ""

View File

@@ -148,3 +148,184 @@ class TestStrategyNameSurfaced:
assert count == 0
assert strategy is None
assert err is not None
class TestEscapeDriftGuard:
"""Tests for the escape-drift guard that catches bash/JSON serialization
artifacts where an apostrophe gets prefixed with a spurious backslash
in tool-call transport.
"""
def test_drift_blocked_apostrophe(self):
"""File has ', old_string and new_string both have \\' — classic
tool-call drift. Guard must block with a helpful error instead of
writing \\' literals into source code."""
content = "x = \"hello there\"\n"
# Simulate transport-corrupted old_string and new_string where an
# apostrophe-like context got prefixed with a backslash. The content
# itself has no apostrophe, but both strings do — matching via
# whitespace/anchor strategies would otherwise succeed.
old_string = "x = \"hello there\" # don\\'t edit\n"
new_string = "x = \"hi there\" # don\\'t edit\n"
# This particular pair won't match anything, so it exits via
# no-match path. Build a case where a non-exact strategy DOES match.
content = "line\n x = 1\nline"
old_string = "line\n x = \\'a\\'\nline"
new_string = "line\n x = \\'b\\'\nline"
new, count, strategy, err = fuzzy_find_and_replace(content, old_string, new_string)
assert count == 0
assert err is not None and "Escape-drift" in err
assert "backslash" in err.lower()
assert new == content # file untouched
def test_drift_blocked_double_quote(self):
"""Same idea but with \\" drift instead of \\'."""
content = 'line\n x = 1\nline'
old_string = 'line\n x = \\"a\\"\nline'
new_string = 'line\n x = \\"b\\"\nline'
new, count, strategy, err = fuzzy_find_and_replace(content, old_string, new_string)
assert count == 0
assert err is not None and "Escape-drift" in err
def test_drift_allowed_when_file_genuinely_has_backslash_escapes(self):
"""If the file already contains \\' (e.g. inside an existing escaped
string), the model is legitimately preserving it. Guard must NOT
fire."""
content = "line\n x = \\'a\\'\nline"
old_string = "line\n x = \\'a\\'\nline"
new_string = "line\n x = \\'b\\'\nline"
new, count, strategy, err = fuzzy_find_and_replace(content, old_string, new_string)
assert err is None
assert count == 1
assert "\\'b\\'" in new
def test_drift_allowed_on_exact_match(self):
"""Exact matches bypass the drift guard entirely — if the file
really contains the exact bytes old_string specified, it's not
drift."""
content = "hello \\'world\\'"
new, count, strategy, err = fuzzy_find_and_replace(
content, "hello \\'world\\'", "hello \\'there\\'"
)
assert err is None
assert count == 1
assert strategy == "exact"
def test_drift_allowed_when_adding_escaped_strings(self):
"""Model is adding new content with \\' that wasn't in the original.
old_string has no \\', so guard doesn't fire."""
content = "line1\nline2\nline3"
old_string = "line1\nline2\nline3"
new_string = "line1\nprint(\\'added\\')\nline2\nline3"
new, count, strategy, err = fuzzy_find_and_replace(content, old_string, new_string)
assert err is None
assert count == 1
assert "\\'added\\'" in new
def test_no_drift_check_when_new_string_lacks_suspect_chars(self):
"""Fast-path: if new_string has no \\' or \\", guard must not
fire even on fuzzy match."""
content = "def foo():\n pass" # extra space ignored by line_trimmed
old_string = "def foo():\n pass"
new_string = "def bar():\n return 1"
new, count, strategy, err = fuzzy_find_and_replace(content, old_string, new_string)
assert err is None
assert count == 1
class TestFindClosestLines:
def setup_method(self):
from tools.fuzzy_match import find_closest_lines
self.find_closest_lines = find_closest_lines
def test_finds_similar_line(self):
content = "def foo():\n pass\ndef bar():\n return 1\n"
result = self.find_closest_lines("def baz():", content)
assert "def foo" in result or "def bar" in result
def test_returns_empty_for_no_match(self):
content = "completely different content here"
result = self.find_closest_lines("xyzzy_no_match_possible_!!!", content)
assert result == ""
def test_returns_empty_for_empty_inputs(self):
assert self.find_closest_lines("", "some content") == ""
assert self.find_closest_lines("old string", "") == ""
def test_includes_context_lines(self):
content = "line1\nline2\ndef target():\n pass\nline5\n"
result = self.find_closest_lines("def target():", content)
assert "target" in result
def test_includes_line_numbers(self):
content = "line1\nline2\ndef foo():\n pass\n"
result = self.find_closest_lines("def foo():", content)
# Should include line numbers in format "N| content"
assert "|" in result
class TestFormatNoMatchHint:
"""Gating tests for format_no_match_hint — the shared helper that decides
whether a 'Did you mean?' snippet should be appended to an error.
"""
def setup_method(self):
from tools.fuzzy_match import format_no_match_hint
self.fmt = format_no_match_hint
def test_fires_on_could_not_find_with_match(self):
"""Classic no-match: similar content exists → hint fires."""
content = "def foo():\n pass\ndef bar():\n pass\n"
result = self.fmt(
"Could not find a match for old_string in the file",
0, "def baz():", content,
)
assert "Did you mean" in result
assert "foo" in result or "bar" in result
def test_silent_on_ambiguous_match_error(self):
"""'Found N matches' is not a missing-match failure — no hint."""
content = "aaa bbb aaa\n"
result = self.fmt(
"Found 2 matches for old_string. Provide more context to make it unique, or use replace_all=True.",
0, "aaa", content,
)
assert result == ""
def test_silent_on_escape_drift_error(self):
"""Escape-drift errors are intentional blocks — hint would mislead."""
content = "x = 1\n"
result = self.fmt(
"Escape-drift detected: old_string and new_string contain the literal sequence '\\\\''...",
0, "x = \\'1\\'", content,
)
assert result == ""
def test_silent_on_identical_strings(self):
"""old_string == new_string — hint irrelevant."""
result = self.fmt(
"old_string and new_string are identical",
0, "foo", "foo bar\n",
)
assert result == ""
def test_silent_when_match_count_nonzero(self):
"""If match succeeded, we shouldn't be in the error path — defense in depth."""
result = self.fmt(
"Could not find a match for old_string in the file",
1, "foo", "foo bar\n",
)
assert result == ""
def test_silent_on_none_error(self):
"""No error at all — no hint."""
result = self.fmt(None, 0, "foo", "bar\n")
assert result == ""
def test_silent_when_no_similar_content(self):
"""Even for a valid no-match error, skip hint when nothing similar exists."""
result = self.fmt(
"Could not find a match for old_string in the file",
0, "totally_unique_xyzzy_qux", "abc\nxyz\n",
)
assert result == ""

View File

@@ -0,0 +1,114 @@
import json
import os
import textwrap
from pathlib import Path
import tools.skill_manager_tool as skill_manager_tool
from tools.file_tools import patch_tool
from tools.skill_manager_tool import _create_skill, _patch_skill
def _disable_patch_tool_guards(monkeypatch):
monkeypatch.setattr("tools.file_tools._check_sensitive_path", lambda _path: None)
monkeypatch.setattr("tools.file_tools._check_file_staleness", lambda _path, _task_id: None)
monkeypatch.setattr("tools.file_tools._log_and_check_conflict", lambda _path, _task_id, _action: None)
def test_patch_tool_replace_no_match_shows_rich_hint_without_legacy_hint(tmp_path, monkeypatch):
_disable_patch_tool_guards(monkeypatch)
sample = tmp_path / "sample.py"
sample.write_text("def foo():\n return 1\n\ndef bar():\n return 2\n", encoding="utf-8")
raw = patch_tool(
mode="replace",
path=str(sample),
old_string="def barycentric():",
new_string="def barycentric_new():",
task_id="qa960-replace-rich-hint",
)
result = json.loads(raw)
assert result["success"] is False
assert "Could not find a match" in result["error"]
assert "Did you mean one of these sections?" in result["error"]
assert "def bar():" in result["error"] or "def foo():" in result["error"]
assert "[Hint:" not in raw
def test_patch_tool_replace_ambiguous_error_does_not_show_did_you_mean(tmp_path, monkeypatch):
_disable_patch_tool_guards(monkeypatch)
sample = tmp_path / "sample.py"
sample.write_text("aaa\nbbb\naaa\n", encoding="utf-8")
raw = patch_tool(
mode="replace",
path=str(sample),
old_string="aaa",
new_string="ccc",
task_id="qa960-replace-ambiguous",
)
result = json.loads(raw)
assert result["success"] is False
assert "Found 2 matches" in result["error"]
assert "Did you mean one of these sections?" not in result["error"]
assert "[Hint:" not in raw
def test_patch_tool_v4a_no_match_shows_rich_hint(tmp_path, monkeypatch):
_disable_patch_tool_guards(monkeypatch)
sample = tmp_path / "sample.py"
sample.write_text("def foo():\n return 1\n", encoding="utf-8")
patch = textwrap.dedent(
f"""\
*** Begin Patch
*** Update File: {sample}
@@
-def barycentric():
+def barycentric_new():
*** End Patch
"""
)
raw = patch_tool(mode="patch", patch=patch, task_id="qa960-v4a-rich-hint")
result = json.loads(raw)
assert result["success"] is False
assert "Patch validation failed" in result["error"]
assert "Did you mean one of these sections?" in result["error"]
assert "def foo():" in result["error"]
def test_skill_patch_no_match_shows_rich_hint(tmp_path, monkeypatch):
monkeypatch.setenv("HERMES_HOME", str(tmp_path))
skills_dir = tmp_path / "skills"
skills_dir.mkdir(parents=True, exist_ok=True)
monkeypatch.setattr(skill_manager_tool, "SKILLS_DIR", skills_dir)
monkeypatch.setattr(skill_manager_tool, "_security_scan_skill", lambda _skill_dir: None)
_create_skill(
"qa-skill",
textwrap.dedent(
"""\
---
name: qa-skill
description: test
---
Step 1: Do the thing.
Step 2: Verify the thing.
"""
),
)
result = _patch_skill(
"qa-skill",
"Step 1: Do the production rollout.",
"Step 1: Updated.",
)
assert result["success"] is False
assert "Could not find a match" in result["error"]
assert "Did you mean one of these sections?" in result["error"]
assert "Step 1: Do the thing." in result["error"]
assert "file_preview" in result

View File

@@ -757,12 +757,14 @@ class ShellFileOperations(FileOperations):
content, old_string, new_string, replace_all
)
if error:
return PatchResult(error=error)
if match_count == 0:
return PatchResult(error=f"Could not find match for old_string in {path}")
if error or match_count == 0:
err_msg = error or f"Could not find match for old_string in {path}"
try:
from tools.fuzzy_match import format_no_match_hint
err_msg += format_no_match_hint(err_msg, match_count, old_string, content)
except Exception:
pass
return PatchResult(error=err_msg)
# Write back
write_result = self.write_file(path, new_content)
if write_result.error:

View File

@@ -8,6 +8,7 @@ import os
import threading
import time
from pathlib import Path
from typing import Any, Dict, Optional
from tools.binary_extensions import has_binary_extension
from tools.file_operations import ShellFileOperations
from agent.redact import redact_sensitive_text
@@ -690,8 +691,11 @@ def patch_tool(mode: str = "replace", path: str = None, old_string: str = None,
result_json = json.dumps(result_dict, ensure_ascii=False)
# Hint when old_string not found — saves iterations where the agent
# retries with stale content instead of re-reading the file.
# Suppressed when patch_replace already attached a rich "Did you mean?"
# snippet (which is strictly more useful than the generic hint).
if result_dict.get("error") and "Could not find" in str(result_dict["error"]):
result_json += "\n\n[Hint: old_string not found. Use read_file to verify the current content, or search_files to locate the text.]"
if "Did you mean one of these sections?" not in str(result_dict["error"]):
result_json += "\n\n[Hint: old_string not found. Use read_file to verify the current content, or search_files to locate the text.]"
return result_json
except Exception as e:
return tool_error(str(e))

View File

@@ -93,6 +93,21 @@ def fuzzy_find_and_replace(content: str, old_string: str, new_string: str,
f"Provide more context to make it unique, or use replace_all=True."
)
# Escape-drift guard: when the matched strategy is NOT `exact`,
# we matched via some form of normalization. If new_string
# contains shell/JSON-style escape sequences (\\' or \\") that
# would be written literally into the file but the matched
# region of the file has no such sequences, this is almost
# certainly tool-call serialization drift — the model typed
# an apostrophe/quote and the transport added a stray
# backslash. Writing new_string as-is would corrupt the file.
# Block with a helpful error so the model re-reads and retries
# instead of the caller silently persisting garbage (or not).
if strategy_name != "exact":
drift_err = _detect_escape_drift(content, matches, old_string, new_string)
if drift_err:
return content, 0, None, drift_err
# Perform replacement
new_content = _apply_replacements(content, matches, new_string)
return new_content, len(matches), strategy_name, None
@@ -101,6 +116,46 @@ def fuzzy_find_and_replace(content: str, old_string: str, new_string: str,
return content, 0, None, "Could not find a match for old_string in the file"
def _detect_escape_drift(content: str, matches: List[Tuple[int, int]],
old_string: str, new_string: str) -> Optional[str]:
"""Detect tool-call escape-drift artifacts in new_string.
Looks for ``\\'`` or ``\\"`` sequences that are present in both
old_string and new_string (i.e. the model copy-pasted them as "context"
it intended to preserve) but don't exist in the matched region of the
file. That pattern indicates the transport layer inserted spurious
shell-style escapes around apostrophes or quotes — writing new_string
verbatim would literally insert ``\\'`` into source code.
Returns an error string if drift is detected, None otherwise.
"""
# Cheap pre-check: bail out unless new_string actually contains a
# suspect escape sequence. This keeps the guard free for all the
# common, correct cases.
if "\\'" not in new_string and '\\"' not in new_string:
return None
# Aggregate matched regions of the file — that's what new_string will
# replace. If the suspect escapes are present there already, the
# model is genuinely preserving them (valid for some languages /
# escaped strings); accept the patch.
matched_regions = "".join(content[start:end] for start, end in matches)
for suspect in ("\\'", '\\"'):
if suspect in new_string and suspect in old_string and suspect not in matched_regions:
plain = suspect[1] # "'" or '"'
return (
f"Escape-drift detected: old_string and new_string contain "
f"the literal sequence {suspect!r} but the matched region of "
f"the file does not. This is almost always a tool-call "
f"serialization artifact where an apostrophe or quote got "
f"prefixed with a spurious backslash. Re-read the file with "
f"read_file and pass old_string/new_string without "
f"backslash-escaping {plain!r} characters."
)
return None
def _apply_replacements(content: str, matches: List[Tuple[int, int]], new_string: str) -> str:
"""
Apply replacements at the given positions.
@@ -564,3 +619,86 @@ def _map_normalized_positions(original: str, normalized: str,
original_matches.append((orig_start, min(orig_end, len(original))))
return original_matches
def find_closest_lines(old_string: str, content: str, context_lines: int = 2, max_results: int = 3) -> str:
"""Find lines in content most similar to old_string for "did you mean?" feedback.
Returns a formatted string showing the closest matching lines with context,
or empty string if no useful match is found.
"""
if not old_string or not content:
return ""
old_lines = old_string.splitlines()
content_lines = content.splitlines()
if not old_lines or not content_lines:
return ""
# Use first line of old_string as anchor for search
anchor = old_lines[0].strip()
if not anchor:
# Try second line if first is blank
candidates = [l.strip() for l in old_lines if l.strip()]
if not candidates:
return ""
anchor = candidates[0]
# Score each line in content by similarity to anchor
scored = []
for i, line in enumerate(content_lines):
stripped = line.strip()
if not stripped:
continue
ratio = SequenceMatcher(None, anchor, stripped).ratio()
if ratio > 0.3:
scored.append((ratio, i))
if not scored:
return ""
# Take top matches
scored.sort(key=lambda x: -x[0])
top = scored[:max_results]
parts = []
seen_ranges = set()
for _, line_idx in top:
start = max(0, line_idx - context_lines)
end = min(len(content_lines), line_idx + len(old_lines) + context_lines)
key = (start, end)
if key in seen_ranges:
continue
seen_ranges.add(key)
snippet = "\n".join(
f"{start + j + 1:4d}| {content_lines[start + j]}"
for j in range(end - start)
)
parts.append(snippet)
if not parts:
return ""
return "\n---\n".join(parts)
def format_no_match_hint(error: Optional[str], match_count: int,
old_string: str, content: str) -> str:
"""Return a '\\n\\nDid you mean...' snippet for plain no-match errors.
Gated so the hint only fires for actual "old_string not found" failures.
Ambiguous-match ("Found N matches"), escape-drift, and identical-strings
errors all have ``match_count == 0`` but a "did you mean?" snippet would
be misleading — those failed for unrelated reasons.
Returns an empty string when there's nothing useful to append.
"""
if match_count != 0:
return ""
if not error or not error.startswith("Could not find"):
return ""
hint = find_closest_lines(old_string, content)
if not hint:
return ""
return "\n\nDid you mean one of these sections?\n" + hint

View File

@@ -290,10 +290,16 @@ def _validate_operations(
)
if count == 0:
label = f"'{hunk.context_hint}'" if hunk.context_hint else "(no hint)"
errors.append(
msg = (
f"{op.file_path}: hunk {label} not found"
+ (f"{match_error}" if match_error else "")
)
try:
from tools.fuzzy_match import format_no_match_hint
msg += format_no_match_hint(match_error, count, search_pattern, simulated)
except Exception:
pass
errors.append(msg)
else:
# Advance simulation so subsequent hunks validate correctly.
# Reuse the result from the call above — no second fuzzy run.
@@ -537,7 +543,13 @@ def _apply_update(op: PatchOperation, file_ops: Any) -> Tuple[bool, str]:
error = None
if error:
return False, f"Could not apply hunk: {error}"
err_msg = f"Could not apply hunk: {error}"
try:
from tools.fuzzy_match import format_no_match_hint
err_msg += format_no_match_hint(error, 0, search_pattern, new_content)
except Exception:
pass
return False, err_msg
else:
# Addition-only hunk (no context or removed lines).
# Insert at the location indicated by the context hint, or at end of file.

View File

@@ -575,9 +575,15 @@ def _patch_skill(
if match_error:
# Show a short preview of the file so the model can self-correct
preview = content[:500] + ("..." if len(content) > 500 else "")
err_msg = match_error
try:
from tools.fuzzy_match import format_no_match_hint
err_msg += format_no_match_hint(match_error, match_count, old_string, content)
except Exception:
pass
return {
"success": False,
"error": match_error,
"error": err_msg,
"file_preview": preview,
}