2026-02-19 00:57:31 -08:00
#!/usr/bin/env python3
"""
Memory Tool Module - Persistent Curated Memory
Provides bounded , file - backed memory that persists across sessions . Two stores :
- MEMORY . md : agent ' s personal notes and observations (environment facts, project
conventions , tool quirks , things learned )
- USER . md : what the agent knows about the user ( preferences , communication style ,
expectations , workflow habits )
Both are injected into the system prompt as a frozen snapshot at session start .
Mid - session writes update files on disk immediately ( durable ) but do NOT change
the system prompt - - this preserves the prefix cache for the entire session .
The snapshot refreshes on the next session start .
Entry delimiter : § ( section sign ) . Entries can be multiline .
Character limits ( not tokens ) because char counts are model - independent .
Design :
- Single `memory` tool with action parameter: add, replace, remove
- replace / remove use short unique substring matching ( not full text or IDs )
- Behavioral guidance lives in the tool schema description
- Frozen snapshot pattern : system prompt is stable , tool responses show live state
"""
2026-03-17 04:19:11 -07:00
import fcntl
2026-02-19 00:57:31 -08:00
import json
Harden agent attack surface: scan writes to memory, skills, cron, and context files
The security scanner (skills_guard.py) was only wired into the hub install path.
All other write paths to persistent state — skills created by the agent, memory
entries, cron prompts, and context files — bypassed it entirely. This closes
those gaps:
- file_operations: deny-list blocks writes to ~/.ssh, ~/.aws, ~/.hermes/.env, etc.
- code_execution_tool: filter secret env vars from sandbox child process
- skill_manager_tool: wire scan_skill() into create/edit/patch/write_file with rollback
- skills_guard: add "agent-created" trust level (same policy as community)
- memory_tool: scan content for injection/exfil before system prompt injection
- prompt_builder: scan AGENTS.md, .cursorrules, SOUL.md for prompt injection
- cronjob_tools: scan cron prompts for critical threats before scheduling
Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-25 23:43:15 -05:00
import logging
2026-02-19 00:57:31 -08:00
import os
Harden agent attack surface: scan writes to memory, skills, cron, and context files
The security scanner (skills_guard.py) was only wired into the hub install path.
All other write paths to persistent state — skills created by the agent, memory
entries, cron prompts, and context files — bypassed it entirely. This closes
those gaps:
- file_operations: deny-list blocks writes to ~/.ssh, ~/.aws, ~/.hermes/.env, etc.
- code_execution_tool: filter secret env vars from sandbox child process
- skill_manager_tool: wire scan_skill() into create/edit/patch/write_file with rollback
- skills_guard: add "agent-created" trust level (same policy as community)
- memory_tool: scan content for injection/exfil before system prompt injection
- prompt_builder: scan AGENTS.md, .cursorrules, SOUL.md for prompt injection
- cronjob_tools: scan cron prompts for critical threats before scheduling
Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-25 23:43:15 -05:00
import re
2026-02-20 02:32:15 -08:00
import tempfile
2026-03-17 04:19:11 -07:00
from contextlib import contextmanager
2026-02-19 00:57:31 -08:00
from pathlib import Path
refactor: consolidate get_hermes_home() and parse_reasoning_effort() (#3062)
Centralizes two widely-duplicated patterns into hermes_constants.py:
1. get_hermes_home() — Path resolution for ~/.hermes (HERMES_HOME env var)
- Was copy-pasted inline across 30+ files as:
Path(os.getenv("HERMES_HOME", Path.home() / ".hermes"))
- Now defined once in hermes_constants.py (zero-dependency module)
- hermes_cli/config.py re-exports it for backward compatibility
- Removed local wrapper functions in honcho_integration/client.py,
tools/website_policy.py, tools/tirith_security.py, hermes_cli/uninstall.py
2. parse_reasoning_effort() — Reasoning effort string validation
- Was copy-pasted in cli.py, gateway/run.py, cron/scheduler.py
- Same validation logic: check against (xhigh, high, medium, low, minimal, none)
- Now defined once in hermes_constants.py, called from all 3 locations
- Warning log for unknown values kept at call sites (context-specific)
31 files changed, net +31 lines (125 insertions, 94 deletions)
Full test suite: 6179 passed, 0 failed
2026-03-25 15:54:28 -07:00
from hermes_constants import get_hermes_home
2026-02-19 00:57:31 -08:00
from typing import Dict , Any , List , Optional
Harden agent attack surface: scan writes to memory, skills, cron, and context files
The security scanner (skills_guard.py) was only wired into the hub install path.
All other write paths to persistent state — skills created by the agent, memory
entries, cron prompts, and context files — bypassed it entirely. This closes
those gaps:
- file_operations: deny-list blocks writes to ~/.ssh, ~/.aws, ~/.hermes/.env, etc.
- code_execution_tool: filter secret env vars from sandbox child process
- skill_manager_tool: wire scan_skill() into create/edit/patch/write_file with rollback
- skills_guard: add "agent-created" trust level (same policy as community)
- memory_tool: scan content for injection/exfil before system prompt injection
- prompt_builder: scan AGENTS.md, .cursorrules, SOUL.md for prompt injection
- cronjob_tools: scan cron prompts for critical threats before scheduling
Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-25 23:43:15 -05:00
# Module-level logger, named after this module via __name__.
logger = logging . getLogger ( __name__ )
2026-02-19 00:57:31 -08:00
# Where memory files live
refactor: consolidate get_hermes_home() and parse_reasoning_effort() (#3062)
Centralizes two widely-duplicated patterns into hermes_constants.py:
1. get_hermes_home() — Path resolution for ~/.hermes (HERMES_HOME env var)
- Was copy-pasted inline across 30+ files as:
Path(os.getenv("HERMES_HOME", Path.home() / ".hermes"))
- Now defined once in hermes_constants.py (zero-dependency module)
- hermes_cli/config.py re-exports it for backward compatibility
- Removed local wrapper functions in honcho_integration/client.py,
tools/website_policy.py, tools/tirith_security.py, hermes_cli/uninstall.py
2. parse_reasoning_effort() — Reasoning effort string validation
- Was copy-pasted in cli.py, gateway/run.py, cron/scheduler.py
- Same validation logic: check against (xhigh, high, medium, low, minimal, none)
- Now defined once in hermes_constants.py, called from all 3 locations
- Warning log for unknown values kept at call sites (context-specific)
31 files changed, net +31 lines (125 insertions, 94 deletions)
Full test suite: 6179 passed, 0 failed
2026-03-25 15:54:28 -07:00
# Directory holding MEMORY.md and USER.md, located under the path returned by
# get_hermes_home(). The literal must be exactly "memories" (no padding).
MEMORY_DIR = get_hermes_home() / "memories"
2026-02-19 00:57:31 -08:00
# Separator written between entries on disk: a section sign on its own line.
# Entries themselves may span multiple lines.
ENTRY_DELIMITER = "\n§\n"
Harden agent attack surface: scan writes to memory, skills, cron, and context files
The security scanner (skills_guard.py) was only wired into the hub install path.
All other write paths to persistent state — skills created by the agent, memory
entries, cron prompts, and context files — bypassed it entirely. This closes
those gaps:
- file_operations: deny-list blocks writes to ~/.ssh, ~/.aws, ~/.hermes/.env, etc.
- code_execution_tool: filter secret env vars from sandbox child process
- skill_manager_tool: wire scan_skill() into create/edit/patch/write_file with rollback
- skills_guard: add "agent-created" trust level (same policy as community)
- memory_tool: scan content for injection/exfil before system prompt injection
- prompt_builder: scan AGENTS.md, .cursorrules, SOUL.md for prompt injection
- cronjob_tools: scan cron prompts for critical threats before scheduling
Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-25 23:43:15 -05:00
# ---------------------------------------------------------------------------
# Memory content scanning — lightweight check for injection/exfiltration
# in content that gets injected into the system prompt.
# ---------------------------------------------------------------------------
_MEMORY_THREAT_PATTERNS = [
# Prompt injection
( r ' ignore \ s+(previous|all|above|prior) \ s+instructions ' , " prompt_injection " ) ,
( r ' you \ s+are \ s+now \ s+ ' , " role_hijack " ) ,
( r ' do \ s+not \ s+tell \ s+the \ s+user ' , " deception_hide " ) ,
( r ' system \ s+prompt \ s+override ' , " sys_prompt_override " ) ,
( r ' disregard \ s+(your|all|any) \ s+(instructions|rules|guidelines) ' , " disregard_rules " ) ,
( r ' act \ s+as \ s+(if|though) \ s+you \ s+(have \ s+no|don \' t \ s+have) \ s+(restrictions|limits|rules) ' , " bypass_restrictions " ) ,
# Exfiltration via curl/wget with secrets
( r ' curl \ s+[^ \ n]* \ $ \ { ? \ w*(KEY|TOKEN|SECRET|PASSWORD|CREDENTIAL|API) ' , " exfil_curl " ) ,
( r ' wget \ s+[^ \ n]* \ $ \ { ? \ w*(KEY|TOKEN|SECRET|PASSWORD|CREDENTIAL|API) ' , " exfil_wget " ) ,
( r ' cat \ s+[^ \ n]*( \ .env|credentials| \ .netrc| \ .pgpass| \ .npmrc| \ .pypirc) ' , " read_secrets " ) ,
# Persistence via shell rc
( r ' authorized_keys ' , " ssh_backdoor " ) ,
( r ' \ $HOME/ \ .ssh| \ ~/ \ .ssh ' , " ssh_access " ) ,
( r ' \ $HOME/ \ .hermes/ \ .env| \ ~/ \ .hermes/ \ .env ' , " hermes_env " ) ,
]
# Subset of invisible chars for injection detection
_INVISIBLE_CHARS = {
' \u200b ' , ' \u200c ' , ' \u200d ' , ' \u2060 ' , ' \ufeff ' ,
' \u202a ' , ' \u202b ' , ' \u202c ' , ' \u202d ' , ' \u202e ' ,
}
def _scan_memory_content ( content : str ) - > Optional [ str ] :
""" Scan memory content for injection/exfil patterns. Returns error string if blocked. """
# Check invisible unicode
for char in _INVISIBLE_CHARS :
if char in content :
return f " Blocked: content contains invisible unicode character U+ { ord ( char ) : 04X } (possible injection). "
# Check threat patterns
for pattern , pid in _MEMORY_THREAT_PATTERNS :
if re . search ( pattern , content , re . IGNORECASE ) :
return f " Blocked: content matches threat pattern ' { pid } ' . Memory entries are injected into the system prompt and must not contain injection or exfiltration payloads. "
return None
2026-02-19 00:57:31 -08:00
class MemoryStore:
    """
    Bounded curated memory with file persistence. One instance per AIAgent.

    Maintains two parallel states:
      - _system_prompt_snapshot: frozen at load time, used for system prompt injection.
        Never mutated mid-session. Keeps prefix cache stable.
      - memory_entries / user_entries: live state, mutated by tool calls, persisted to disk.
        Tool responses always reflect this live state.

    Every `target` argument is either "user" (USER.md) or anything else, which
    is treated as "memory" (MEMORY.md).
    """

    def __init__(self, memory_char_limit: int = 2200, user_char_limit: int = 1375):
        """Create an empty store; call load_from_disk() to populate it.

        Args:
            memory_char_limit: Max characters of the joined "memory" entries.
            user_char_limit: Max characters of the joined "user" entries.
        """
        self.memory_entries: List[str] = []
        self.user_entries: List[str] = []
        self.memory_char_limit = memory_char_limit
        self.user_char_limit = user_char_limit
        # Frozen snapshot for system prompt -- set once at load_from_disk()
        self._system_prompt_snapshot: Dict[str, str] = {"memory": "", "user": ""}

    def load_from_disk(self):
        """Load entries from MEMORY.md and USER.md, capture system prompt snapshot."""
        MEMORY_DIR.mkdir(parents=True, exist_ok=True)

        # _reload_target reads the file and de-duplicates
        # (order preserved, first occurrence kept).
        for target in ("memory", "user"):
            self._reload_target(target)

        # Capture frozen snapshot for system prompt injection
        self._system_prompt_snapshot = {
            "memory": self._render_block("memory", self.memory_entries),
            "user": self._render_block("user", self.user_entries),
        }

    @staticmethod
    @contextmanager
    def _file_lock(path: Path):
        """Acquire an exclusive file lock for read-modify-write safety.

        Uses a separate .lock file so the memory file itself can still be
        atomically replaced via os.replace().
        """
        lock_path = path.with_suffix(path.suffix + ".lock")
        lock_path.parent.mkdir(parents=True, exist_ok=True)
        # The `with` guarantees the lock file handle is closed on every path.
        with open(lock_path, "w") as lock_file:
            fcntl.flock(lock_file, fcntl.LOCK_EX)
            try:
                yield
            finally:
                fcntl.flock(lock_file, fcntl.LOCK_UN)

    @staticmethod
    def _path_for(target: str) -> Path:
        """Return the backing file for `target` (USER.md or MEMORY.md)."""
        if target == "user":
            return MEMORY_DIR / "USER.md"
        return MEMORY_DIR / "MEMORY.md"

    def _reload_target(self, target: str):
        """Re-read entries from disk into in-memory state.

        Called under file lock to get the latest state before mutating.
        """
        fresh = self._read_file(self._path_for(target))
        # dict.fromkeys de-duplicates while preserving first-seen order.
        self._set_entries(target, list(dict.fromkeys(fresh)))

    def save_to_disk(self, target: str):
        """Persist entries to the appropriate file. Called after every mutation.

        Raises:
            RuntimeError: If the file cannot be written (see _write_file).
        """
        MEMORY_DIR.mkdir(parents=True, exist_ok=True)
        self._write_file(self._path_for(target), self._entries_for(target))

    def _entries_for(self, target: str) -> List[str]:
        """Return the live entry list for `target` (the list object itself, not a copy)."""
        if target == "user":
            return self.user_entries
        return self.memory_entries

    def _set_entries(self, target: str, entries: List[str]):
        """Replace the live entry list for `target`."""
        if target == "user":
            self.user_entries = entries
        else:
            self.memory_entries = entries

    def _char_count(self, target: str) -> int:
        """Return the length of the entries as they would be joined on disk."""
        entries = self._entries_for(target)
        if not entries:
            return 0
        return len(ENTRY_DELIMITER.join(entries))

    def _char_limit(self, target: str) -> int:
        """Return the configured character budget for `target`."""
        if target == "user":
            return self.user_char_limit
        return self.memory_char_limit

    @staticmethod
    def _usage_pct(current: int, limit: int) -> int:
        """Return usage as a whole percentage capped at 100 (0 when limit <= 0)."""
        if limit <= 0:
            return 0
        return min(100, int((current / limit) * 100))

    @staticmethod
    def _locate_entry(entries: List[str], old_text: str):
        """Resolve `old_text` to the index of the single entry containing it.

        Returns:
            (index, None) on success, or (None, error_response) when nothing
            matches or when several *different* entries match. If every match
            has identical text, the first one is selected.
        """
        matches = [(i, e) for i, e in enumerate(entries) if old_text in e]
        if not matches:
            return None, {"success": False, "error": f"No entry matched '{old_text}'."}

        # More than one distinct text means the substring is ambiguous.
        if len({text for _, text in matches}) > 1:
            previews = [text[:80] + ("..." if len(text) > 80 else "") for _, text in matches]
            return None, {
                "success": False,
                "error": f"Multiple entries matched '{old_text}'. Be more specific.",
                "matches": previews,
            }

        return matches[0][0], None

    def add(self, target: str, content: str) -> Dict[str, Any]:
        """Append a new entry. Returns error if it would exceed the char limit.

        The content is stripped and scanned for injection/exfiltration before
        being accepted. An exact duplicate is reported as success without
        writing anything.
        """
        content = content.strip()
        if not content:
            return {"success": False, "error": "Content cannot be empty."}

        # Scan for injection/exfiltration before accepting
        scan_error = _scan_memory_content(content)
        if scan_error:
            return {"success": False, "error": scan_error}

        with self._file_lock(self._path_for(target)):
            # Re-read from disk under lock to pick up writes from other sessions
            self._reload_target(target)

            entries = self._entries_for(target)
            limit = self._char_limit(target)

            # Reject exact duplicates
            if content in entries:
                return self._success_response(target, "Entry already exists (no duplicate added).")

            # Calculate what the new total would be
            new_total = len(ENTRY_DELIMITER.join(entries + [content]))
            if new_total > limit:
                current = self._char_count(target)
                return {
                    "success": False,
                    "error": (
                        f"Memory at {current:,}/{limit:,} chars. "
                        f"Adding this entry ({len(content)} chars) would exceed the limit. "
                        f"Replace or remove existing entries first."
                    ),
                    "current_entries": entries,
                    "usage": f"{current:,}/{limit:,}",
                }

            entries.append(content)
            self._set_entries(target, entries)
            self.save_to_disk(target)

        return self._success_response(target, "Entry added.")

    def replace(self, target: str, old_text: str, new_content: str) -> Dict[str, Any]:
        """Find entry containing old_text substring, replace it with new_content.

        Fails when old_text matches nothing, matches several different
        entries, when new_content is blocked by the content scan, or when the
        replacement would exceed the character limit.
        """
        old_text = old_text.strip()
        new_content = new_content.strip()
        if not old_text:
            return {"success": False, "error": "old_text cannot be empty."}
        if not new_content:
            return {"success": False, "error": "new_content cannot be empty. Use 'remove' to delete entries."}

        # Scan replacement content for injection/exfiltration
        scan_error = _scan_memory_content(new_content)
        if scan_error:
            return {"success": False, "error": scan_error}

        with self._file_lock(self._path_for(target)):
            self._reload_target(target)
            entries = self._entries_for(target)

            idx, error = self._locate_entry(entries, old_text)
            if error is not None:
                return error

            limit = self._char_limit(target)

            # Check that replacement doesn't blow the budget
            candidate = entries.copy()
            candidate[idx] = new_content
            new_total = len(ENTRY_DELIMITER.join(candidate))
            if new_total > limit:
                return {
                    "success": False,
                    "error": (
                        f"Replacement would put memory at {new_total:,}/{limit:,} chars. "
                        f"Shorten the new content or remove other entries first."
                    ),
                }

            entries[idx] = new_content
            self._set_entries(target, entries)
            self.save_to_disk(target)

        return self._success_response(target, "Entry replaced.")

    def remove(self, target: str, old_text: str) -> Dict[str, Any]:
        """Remove the entry containing old_text substring.

        Fails when old_text matches nothing or matches several different entries.
        """
        old_text = old_text.strip()
        if not old_text:
            return {"success": False, "error": "old_text cannot be empty."}

        with self._file_lock(self._path_for(target)):
            self._reload_target(target)
            entries = self._entries_for(target)

            idx, error = self._locate_entry(entries, old_text)
            if error is not None:
                return error

            entries.pop(idx)
            self._set_entries(target, entries)
            self.save_to_disk(target)

        return self._success_response(target, "Entry removed.")

    def format_for_system_prompt(self, target: str) -> Optional[str]:
        """
        Return the frozen snapshot for system prompt injection.

        This returns the state captured at load_from_disk() time, NOT the live
        state. Mid-session writes do not affect this. This keeps the system
        prompt stable across all turns, preserving the prefix cache.

        Returns None if the snapshot is empty (no entries at load time).
        """
        block = self._system_prompt_snapshot.get(target, "")
        return block if block else None

    # -- Internal helpers --

    def _success_response(self, target: str, message: Optional[str] = None) -> Dict[str, Any]:
        """Build the success payload reflecting the live state of `target`.

        The "message" key is only present when `message` is truthy.
        """
        entries = self._entries_for(target)
        current = self._char_count(target)
        limit = self._char_limit(target)
        pct = self._usage_pct(current, limit)

        resp = {
            "success": True,
            "target": target,
            "entries": entries,
            "usage": f"{pct}% — {current:,}/{limit:,} chars",
            "entry_count": len(entries),
        }
        if message:
            resp["message"] = message
        return resp

    def _render_block(self, target: str, entries: List[str]) -> str:
        """Render a system prompt block with header and usage indicator.

        Returns an empty string when there are no entries.
        """
        if not entries:
            return ""

        limit = self._char_limit(target)
        content = ENTRY_DELIMITER.join(entries)
        current = len(content)
        pct = self._usage_pct(current, limit)

        if target == "user":
            header = f"USER PROFILE (who the user is) [{pct}% — {current:,}/{limit:,} chars]"
        else:
            header = f"MEMORY (your personal notes) [{pct}% — {current:,}/{limit:,} chars]"

        separator = "═" * 46
        return f"{separator}\n{header}\n{separator}\n{content}"

    @staticmethod
    def _read_file(path: Path) -> List[str]:
        """Read a memory file and split into entries.

        No file locking needed: _write_file uses atomic rename, so readers
        always see either the previous complete file or the new complete file.

        Returns an empty list when the file is missing, unreadable, or blank.
        """
        try:
            raw = path.read_text(encoding="utf-8")
        except FileNotFoundError:
            # A missing file is the normal "nothing saved yet" case.
            return []
        except OSError as e:
            # Still best-effort, but leave a trace: an unreadable file looks
            # empty to callers and will be overwritten by the next save.
            logging.getLogger(__name__).warning("Could not read memory file %s: %s", path, e)
            return []

        if not raw.strip():
            return []

        # Use ENTRY_DELIMITER for consistency with _write_file. Splitting by "§"
        # alone would incorrectly split entries that contain "§" in their content.
        stripped = (chunk.strip() for chunk in raw.split(ENTRY_DELIMITER))
        return [chunk for chunk in stripped if chunk]

    @staticmethod
    def _write_file(path: Path, entries: List[str]):
        """Write entries to a memory file using atomic temp-file + rename.

        Previous implementation used open("w") + flock, but "w" truncates the
        file *before* the lock is acquired, creating a race window where
        concurrent readers see an empty file. Atomic rename avoids this:
        readers always see either the old complete file or the new one.

        Raises:
            RuntimeError: If the temp file cannot be created, written, or
                renamed; the underlying OSError is chained as the cause.
        """
        content = ENTRY_DELIMITER.join(entries) if entries else ""
        try:
            # Write to temp file in same directory (same filesystem for atomic rename)
            fd, tmp_path = tempfile.mkstemp(
                dir=str(path.parent), suffix=".tmp", prefix=".mem_"
            )
            try:
                with os.fdopen(fd, "w", encoding="utf-8") as f:
                    f.write(content)
                    f.flush()
                    os.fsync(f.fileno())
                os.replace(tmp_path, str(path))  # Atomic on same filesystem
            except BaseException:
                # Clean up temp file on any failure
                try:
                    os.unlink(tmp_path)
                except OSError:
                    pass
                raise
        except OSError as e:
            raise RuntimeError(f"Failed to write memory file {path}: {e}") from e
def memory_tool(
    action: str,
    target: str = "memory",
    content: Optional[str] = None,
    old_text: Optional[str] = None,
    store: Optional["MemoryStore"] = None,
) -> str:
    """
    Single entry point for the memory tool. Dispatches to MemoryStore methods.

    Args:
        action: One of "add", "replace", "remove".
        target: "memory" or "user".
        content: Entry text; required for "add" and "replace".
        old_text: Substring identifying the entry; required for "replace" and "remove".
        store: The store to operate on; None means memory is unavailable.

    Returns:
        JSON string with results. Validation problems and store I/O failures
        are reported as {"success": false, "error": ...} rather than raised.
    """

    def _error(message: str) -> str:
        return json.dumps({"success": False, "error": message}, ensure_ascii=False)

    if store is None:
        return _error("Memory is not available. It may be disabled in config or this environment.")

    if target not in ("memory", "user"):
        return _error(f"Invalid target '{target}'. Use 'memory' or 'user'.")

    try:
        if action == "add":
            if not content:
                return _error("Content is required for 'add' action.")
            result = store.add(target, content)

        elif action == "replace":
            if not old_text:
                return _error("old_text is required for 'replace' action.")
            if not content:
                return _error("content is required for 'replace' action.")
            result = store.replace(target, old_text, content)

        elif action == "remove":
            if not old_text:
                return _error("old_text is required for 'remove' action.")
            result = store.remove(target, old_text)

        else:
            return _error(f"Unknown action '{action}'. Use: add, replace, remove")
    except (RuntimeError, OSError) as e:
        # The store raises RuntimeError when the memory file cannot be written
        # and OSError when the lock file cannot be opened; keep the
        # "always returns JSON" contract instead of letting these escape.
        return _error(f"Memory operation failed: {e}")

    return json.dumps(result, ensure_ascii=False)
def check_memory_requirements() -> bool:
    """Report whether the memory tool can be used.

    The tool has no external requirements, so this is unconditionally True.
    """
    return True
# =============================================================================
# OpenAI Function-Calling Schema
# =============================================================================
# Function-calling schema for the `memory` tool. The description carries the
# behavioral guidance (when to save, what to skip); the enums must stay in
# sync with the actions/targets accepted by memory_tool().
MEMORY_SCHEMA = {
    "name": "memory",
    "description": (
        "Save durable information to persistent memory that survives across sessions. "
        "Memory is injected into future turns, so keep it compact and focused on facts "
        "that will still matter later.\n\n"
        "WHEN TO SAVE (do this proactively, don't wait to be asked):\n"
        "- User corrects you or says 'remember this' / 'don't do that again'\n"
        "- User shares a preference, habit, or personal detail (name, role, timezone, coding style)\n"
        "- You discover something about the environment (OS, installed tools, project structure)\n"
        "- You learn a convention, API quirk, or workflow specific to this user's setup\n"
        "- You identify a stable fact that will be useful again in future sessions\n\n"
        "PRIORITY: User preferences and corrections > environment facts > procedural knowledge. "
        "The most valuable memory prevents the user from having to repeat themselves.\n\n"
        "Do NOT save task progress, session outcomes, completed-work logs, or temporary TODO "
        "state to memory; use session_search to recall those from past transcripts.\n"
        "If you've discovered a new way to do something, solved a problem that could be "
        "necessary later, save it as a skill with the skill tool.\n\n"
        "TWO TARGETS:\n"
        "- 'user': who the user is -- name, role, preferences, communication style, pet peeves\n"
        "- 'memory': your notes -- environment facts, project conventions, tool quirks, lessons learned\n\n"
        "ACTIONS: add (new entry), replace (update existing -- old_text identifies it), "
        "remove (delete -- old_text identifies it).\n\n"
        "SKIP: trivial/obvious info, things easily re-discovered, raw data dumps, and temporary task state."
    ),
    "parameters": {
        "type": "object",
        "properties": {
            "action": {
                "type": "string",
                "enum": ["add", "replace", "remove"],
                "description": "The action to perform.",
            },
            "target": {
                "type": "string",
                "enum": ["memory", "user"],
                "description": "Which memory store: 'memory' for personal notes, 'user' for user profile.",
            },
            "content": {
                "type": "string",
                "description": "The entry content. Required for 'add' and 'replace'.",
            },
            "old_text": {
                "type": "string",
                "description": "Short unique substring identifying the entry to replace or remove.",
            },
        },
        "required": ["action", "target"],
    },
}
2026-02-20 02:32:15 -08:00
2026-02-21 20:22:33 -08:00
# --- Registry ---
from tools.registry import registry

# Register the tool at import time. The handler unpacks the raw argument dict
# into memory_tool(); the MemoryStore instance is read from the "store"
# keyword argument passed to the handler.
registry.register(
    name="memory",
    toolset="memory",
    schema=MEMORY_SCHEMA,
    handler=lambda args, **kw: memory_tool(
        action=args.get("action", ""),
        target=args.get("target", "memory"),
        content=args.get("content"),
        old_text=args.get("old_text"),
        store=kw.get("store"),
    ),
    check_fn=check_memory_requirements,
    emoji="🧠",
)
2026-02-20 02:32:15 -08:00