OpenViking is multi-tenant and requires X-OpenViking-Account and X-OpenViking-User headers. Without them, API calls like POST /api/v1/search/find fail on authenticated servers. Add both headers to _VikingClient._headers(), read from env vars OPENVIKING_ACCOUNT (default: root) and OPENVIKING_USER (default: default). All instantiation sites inherit the fix automatically. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
592 lines
22 KiB
Python
592 lines
22 KiB
Python
"""OpenViking memory plugin — full bidirectional MemoryProvider interface.

Context database by Volcengine (ByteDance) that organizes agent knowledge
into a filesystem hierarchy (viking:// URIs) with tiered context loading,
automatic memory extraction, and session management.

Original PR #3369 by Mibayy, rewritten to use the full OpenViking session
lifecycle instead of read-only search endpoints.

Config via environment variables (profile-scoped via each profile's .env):
  OPENVIKING_ENDPOINT — Server URL (default: http://127.0.0.1:1933)
  OPENVIKING_API_KEY — API key (required for authenticated servers)
  OPENVIKING_ACCOUNT — Tenant account (default: root)
  OPENVIKING_USER — Tenant user (default: default)

Capabilities:
  - Automatic memory extraction on session commit (6 categories)
  - Tiered context: L0 (~100 tokens), L1 (~2k), L2 (full)
  - Semantic search with hierarchical directory retrieval
  - Filesystem-style browsing via viking:// URIs
  - Resource ingestion (URLs, docs, code)
"""
|
|
|
|
from __future__ import annotations
|
|
|
|
import json
|
|
import logging
|
|
import os
|
|
import threading
|
|
from typing import Any, Dict, List, Optional
|
|
|
|
from agent.memory_provider import MemoryProvider
|
|
|
|
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)

# Server URL used when OPENVIKING_ENDPOINT is not set.
_DEFAULT_ENDPOINT = "http://127.0.0.1:1933"

# Timeout handed to httpx for every _VikingClient.get/post request
# (httpx interprets a bare number as seconds).
_TIMEOUT = 30.0
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# HTTP helper — uses httpx to avoid requiring the openviking SDK
|
|
# ---------------------------------------------------------------------------
|
|
|
|
def _get_httpx():
    """Import httpx on demand.

    Returns:
        The ``httpx`` module, or ``None`` when it cannot be imported.
    """
    try:
        import httpx
    except ImportError:
        # Callers treat None as "httpx unavailable".
        return None
    else:
        return httpx
|
|
|
|
|
|
class _VikingClient:
    """Thin HTTP client for the OpenViking REST API.

    Every request made through ``get``/``post`` carries the tenant headers
    ``X-OpenViking-Account`` and ``X-OpenViking-User`` and, when an API key
    is configured, ``X-API-Key``.
    """

    def __init__(self, endpoint: str, api_key: str = "",
                 account: str = "", user: str = ""):
        """Store connection settings and load httpx.

        Args:
            endpoint: Base server URL; trailing slashes are stripped.
            api_key: Value for the ``X-API-Key`` header; empty means the
                header is not sent.
            account: Tenant account. When empty, read from the
                OPENVIKING_ACCOUNT env var, falling back to ``"root"``.
            user: Tenant user. When empty, read from the OPENVIKING_USER
                env var, falling back to ``"default"``.

        Raises:
            ImportError: If httpx is not installed.
        """
        self._endpoint = endpoint.rstrip("/")
        self._api_key = api_key
        self._account = self._tenant_value(account, "OPENVIKING_ACCOUNT", "root")
        self._user = self._tenant_value(user, "OPENVIKING_USER", "default")
        self._httpx = _get_httpx()
        if self._httpx is None:
            raise ImportError("httpx is required for OpenViking: pip install httpx")

    @staticmethod
    def _tenant_value(explicit: str, env_var: str, default: str) -> str:
        """Pick a tenant value: explicit argument, then env var, then default.

        Empty strings are skipped at every step, so an env var that is set
        but blank (e.g. ``OPENVIKING_ACCOUNT=`` in a .env file) falls back
        to the default instead of producing an empty header value.
        """
        return explicit or os.environ.get(env_var) or default

    def _headers(self) -> dict:
        """Build the request headers (content type, tenant, optional API key)."""
        h = {
            "Content-Type": "application/json",
            "X-OpenViking-Account": self._account,
            "X-OpenViking-User": self._user,
        }
        if self._api_key:
            h["X-API-Key"] = self._api_key
        return h

    def _url(self, path: str) -> str:
        """Join the base endpoint with ``path`` (expected to start with '/')."""
        return f"{self._endpoint}{path}"

    def get(self, path: str, **kwargs) -> dict:
        """GET ``path`` and return the decoded JSON body.

        Extra keyword arguments are forwarded to ``httpx.get``. A non-2xx
        response raises via ``raise_for_status()``.
        """
        resp = self._httpx.get(
            self._url(path), headers=self._headers(), timeout=_TIMEOUT, **kwargs
        )
        resp.raise_for_status()
        return resp.json()

    def post(self, path: str, payload: Optional[dict] = None, **kwargs) -> dict:
        """POST ``payload`` as JSON to ``path`` and return the decoded JSON body.

        A missing or empty payload is sent as ``{}``. Extra keyword arguments
        are forwarded to ``httpx.post``. A non-2xx response raises via
        ``raise_for_status()``.
        """
        resp = self._httpx.post(
            self._url(path), json=payload or {}, headers=self._headers(),
            timeout=_TIMEOUT, **kwargs
        )
        resp.raise_for_status()
        return resp.json()

    def health(self) -> bool:
        """Return True when GET /health answers with status 200.

        Uses a short 3-second timeout and sends no auth/tenant headers; any
        exception (connection error, timeout, ...) is reported as False.
        """
        try:
            resp = self._httpx.get(
                self._url("/health"), timeout=3.0
            )
            return resp.status_code == 200
        except Exception:
            return False
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Tool schemas
|
|
# ---------------------------------------------------------------------------
|
|
|
|
# Tool schema for viking_search: semantic search with optional mode, scope
# and result limit. Only "query" is required.
SEARCH_SCHEMA = {
    "name": "viking_search",
    "description": (
        "Semantic search over the OpenViking knowledge base. "
        "Returns ranked results with viking:// URIs for deeper reading. "
        "Use mode='deep' for complex queries that need reasoning across "
        "multiple sources, 'fast' for simple lookups."
    ),
    "parameters": {
        "type": "object",
        "properties": {
            "query": {
                "type": "string",
                "description": "Search query.",
            },
            "mode": {
                "type": "string",
                "enum": ["auto", "fast", "deep"],
                "description": "Search depth (default: auto).",
            },
            "scope": {
                "type": "string",
                "description": "Viking URI prefix to scope search (e.g. 'viking://resources/docs/').",
            },
            "limit": {
                "type": "integer",
                "description": "Max results (default: 10).",
            },
        },
        "required": ["query"],
    },
}
|
|
|
|
# Tool schema for viking_read: fetch content at a viking:// URI at one of
# three detail levels. Only "uri" is required.
READ_SCHEMA = {
    "name": "viking_read",
    "description": (
        "Read content at a viking:// URI. Three detail levels:\n"
        " abstract — ~100 token summary (L0)\n"
        " overview — ~2k token key points (L1)\n"
        " full — complete content (L2)\n"
        "Start with abstract/overview, only use full when you need details."
    ),
    "parameters": {
        "type": "object",
        "properties": {
            "uri": {
                "type": "string",
                "description": "viking:// URI to read.",
            },
            "level": {
                "type": "string",
                "enum": ["abstract", "overview", "full"],
                "description": "Detail level (default: overview).",
            },
        },
        "required": ["uri"],
    },
}
|
|
|
|
# Tool schema for viking_browse: filesystem-style navigation (tree / list /
# stat) over viking:// paths. Only "action" is required.
BROWSE_SCHEMA = {
    "name": "viking_browse",
    "description": (
        "Browse the OpenViking knowledge store like a filesystem.\n"
        " list — show directory contents\n"
        " tree — show hierarchy\n"
        " stat — show metadata for a URI"
    ),
    "parameters": {
        "type": "object",
        "properties": {
            "action": {
                "type": "string",
                "enum": ["tree", "list", "stat"],
                "description": "Browse action.",
            },
            "path": {
                "type": "string",
                "description": "Viking URI path (default: viking://). Examples: 'viking://resources/', 'viking://user/memories/'.",
            },
        },
        "required": ["action"],
    },
}
|
|
|
|
# Tool schema for viking_remember: explicitly store a fact, optionally with
# a category hint. Only "content" is required.
REMEMBER_SCHEMA = {
    "name": "viking_remember",
    "description": (
        "Explicitly store a fact or memory in the OpenViking knowledge base. "
        "Use for important information the agent should remember long-term. "
        "The system automatically categorizes and indexes the memory."
    ),
    "parameters": {
        "type": "object",
        "properties": {
            "content": {
                "type": "string",
                "description": "The information to remember.",
            },
            "category": {
                "type": "string",
                "enum": ["preference", "entity", "event", "case", "pattern"],
                "description": "Memory category (default: auto-detected).",
            },
        },
        "required": ["content"],
    },
}
|
|
|
|
# Tool schema for viking_add_resource: ingest a URL or document, optionally
# with a relevance note. Only "url" is required.
ADD_RESOURCE_SCHEMA = {
    "name": "viking_add_resource",
    "description": (
        "Add a URL or document to the OpenViking knowledge base. "
        "Supports web pages, GitHub repos, PDFs, markdown, code files. "
        "The system automatically parses, indexes, and generates summaries."
    ),
    "parameters": {
        "type": "object",
        "properties": {
            "url": {
                "type": "string",
                "description": "URL or path of the resource to add.",
            },
            "reason": {
                "type": "string",
                "description": "Why this resource is relevant (improves search).",
            },
        },
        "required": ["url"],
    },
}
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# MemoryProvider implementation
|
|
# ---------------------------------------------------------------------------
|
|
|
|
class OpenVikingMemoryProvider(MemoryProvider):
    """Full bidirectional memory via OpenViking context database.

    Conversation turns are posted to an OpenViking session from background
    threads, the session is committed when it ends, and five ``viking_*``
    tools (search, read, browse, remember, add_resource) are exposed to the
    model. When the server is unreachable or httpx is missing, the client is
    left unset and every hook becomes a no-op.
    """

    # Plural result buckets read from the search response, mapped to the
    # singular label reported in tool output. A table is used because
    # str.rstrip("s") only strips trailing "s" characters and would turn
    # "memories" into "memorie".
    _RESULT_TYPE_LABELS = {
        "memories": "memory",
        "resources": "resource",
        "skills": "skill",
    }

    def __init__(self):
        """Create an unconfigured provider; ``initialize`` sets up the client."""
        self._client: Optional[_VikingClient] = None
        self._endpoint = ""
        self._api_key = ""
        self._session_id = ""
        # Number of turns handed to sync_turn since initialize().
        self._turn_count = 0
        self._sync_thread: Optional[threading.Thread] = None
        # Text produced by the latest background prefetch; guarded by the lock.
        self._prefetch_result = ""
        self._prefetch_lock = threading.Lock()
        self._prefetch_thread: Optional[threading.Thread] = None

    @property
    def name(self) -> str:
        """Provider identifier."""
        return "openviking"

    def is_available(self) -> bool:
        """Check if OpenViking endpoint is configured. No network calls."""
        return bool(os.environ.get("OPENVIKING_ENDPOINT"))

    def get_config_schema(self):
        """Describe the configurable settings and the env vars backing them."""
        return [
            {
                "key": "endpoint",
                "description": "OpenViking server URL",
                "required": True,
                "default": _DEFAULT_ENDPOINT,
                "env_var": "OPENVIKING_ENDPOINT",
            },
            {
                "key": "api_key",
                "description": "OpenViking API key",
                "secret": True,
                "env_var": "OPENVIKING_API_KEY",
            },
        ]

    def initialize(self, session_id: str, **kwargs) -> None:
        """Read configuration from the environment and connect to the server.

        Resets the turn counter, builds a client and probes ``/health``. If
        the probe fails or httpx is not installed, a warning is logged and
        the client is cleared so the provider stays inactive.

        Args:
            session_id: Session identifier used in the sessions API paths.
            **kwargs: Accepted for interface compatibility; unused here.
        """
        self._endpoint = os.environ.get("OPENVIKING_ENDPOINT", _DEFAULT_ENDPOINT)
        self._api_key = os.environ.get("OPENVIKING_API_KEY", "")
        self._session_id = session_id
        self._turn_count = 0

        try:
            self._client = _VikingClient(self._endpoint, self._api_key)
            if not self._client.health():
                logger.warning("OpenViking server at %s is not reachable", self._endpoint)
                self._client = None
        except ImportError:
            logger.warning("httpx not installed — OpenViking plugin disabled")
            self._client = None

    def system_prompt_block(self) -> str:
        """Return a short system-prompt section describing the viking tools.

        Returns an empty string when no client is connected or when the root
        stat reports zero children. If the stat request itself fails, a
        shorter generic block is returned instead.
        """
        if not self._client:
            return ""
        try:
            # Stat the root URI to see whether the knowledge base has content.
            resp = self._client.post("/api/v1/browse", {"action": "stat", "path": "viking://"})
            result = resp.get("result", {})
            children = result.get("children", 0)
            if children == 0:
                return ""
            return (
                "# OpenViking Knowledge Base\n"
                f"Active. Endpoint: {self._endpoint}\n"
                "Use viking_search to find information, viking_read for details "
                "(abstract/overview/full), viking_browse to explore.\n"
                "Use viking_remember to store facts, viking_add_resource to index URLs/docs."
            )
        except Exception:
            # Best effort: still advertise the tools when the stat call fails.
            return (
                "# OpenViking Knowledge Base\n"
                f"Active. Endpoint: {self._endpoint}\n"
                "Use viking_search, viking_read, viking_browse, "
                "viking_remember, viking_add_resource."
            )

    def prefetch(self, query: str, *, session_id: str = "") -> str:
        """Return prefetched results from the background thread.

        Waits up to 3 seconds for a running prefetch, then hands out and
        clears the stored result. ``query`` and ``session_id`` are not used
        here; the search text was fixed when ``queue_prefetch`` was called.

        Returns:
            A ``## OpenViking Context`` block, or ``""`` when nothing is stored.
        """
        if self._prefetch_thread and self._prefetch_thread.is_alive():
            self._prefetch_thread.join(timeout=3.0)
        with self._prefetch_lock:
            result = self._prefetch_result
            self._prefetch_result = ""
        if not result:
            return ""
        return f"## OpenViking Context\n{result}"

    def queue_prefetch(self, query: str, *, session_id: str = "") -> None:
        """Fire a background search to pre-load relevant context.

        Does nothing without a client or with an empty query. The worker
        keeps up to three items with a non-empty abstract from each of the
        "memories" and "resources" buckets and stores them as bullet lines
        for the next ``prefetch`` call. Failures are logged at debug level.
        """
        if not self._client or not query:
            return

        def _run():
            try:
                # Separate client instance for the worker thread.
                client = _VikingClient(self._endpoint, self._api_key)
                resp = client.post("/api/v1/search/find", {
                    "query": query,
                    "top_k": 5,
                })
                result = resp.get("result", {})
                parts = []
                for ctx_type in ("memories", "resources"):
                    for item in result.get(ctx_type, [])[:3]:
                        abstract = item.get("abstract", "")
                        if not abstract:
                            continue
                        uri = item.get("uri", "")
                        score = item.get("score", 0)
                        parts.append(f"- [{score:.2f}] {abstract} ({uri})")
                if parts:
                    with self._prefetch_lock:
                        self._prefetch_result = "\n".join(parts)
            except Exception as e:
                logger.debug("OpenViking prefetch failed: %s", e)

        self._prefetch_thread = threading.Thread(
            target=_run, daemon=True, name="openviking-prefetch"
        )
        self._prefetch_thread.start()

    def sync_turn(self, user_content: str, assistant_content: str, *, session_id: str = "") -> None:
        """Record the conversation turn in OpenViking's session.

        The two messages are posted from a daemon thread. Before starting it,
        this call waits up to 5 seconds for a still-running previous sync.
        Each message is cut to its first 4000 characters. Failures are logged
        at debug level. ``session_id`` is unused; the id from ``initialize``
        is used instead.
        """
        if not self._client:
            return

        self._turn_count += 1

        def _sync():
            try:
                client = _VikingClient(self._endpoint, self._api_key)
                sid = self._session_id

                # Add user message
                client.post(f"/api/v1/sessions/{sid}/messages", {
                    "role": "user",
                    "content": user_content[:4000],  # trim very long messages
                })
                # Add assistant message
                client.post(f"/api/v1/sessions/{sid}/messages", {
                    "role": "assistant",
                    "content": assistant_content[:4000],
                })
            except Exception as e:
                logger.debug("OpenViking sync_turn failed: %s", e)

        # Wait for any previous sync to finish before starting a new one
        if self._sync_thread and self._sync_thread.is_alive():
            self._sync_thread.join(timeout=5.0)

        self._sync_thread = threading.Thread(
            target=_sync, daemon=True, name="openviking-sync"
        )
        self._sync_thread.start()

    def on_session_end(self, messages: List[Dict[str, Any]]) -> None:
        """Commit the session to trigger memory extraction.

        OpenViking automatically extracts 6 categories of memories:
        profile, preferences, entities, events, cases, and patterns.

        Skipped when no client is connected or no turn was synced. Waits up
        to 10 seconds for a pending sync first. A failed commit is logged as
        a warning, not raised. ``messages`` is not used by this provider.
        """
        if not self._client or self._turn_count == 0:
            return

        # Wait for any pending sync to finish first
        if self._sync_thread and self._sync_thread.is_alive():
            self._sync_thread.join(timeout=10.0)

        try:
            self._client.post(f"/api/v1/sessions/{self._session_id}/commit")
            logger.info("OpenViking session %s committed (%d turns)", self._session_id, self._turn_count)
        except Exception as e:
            logger.warning("OpenViking session commit failed: %s", e)

    def on_memory_write(self, action: str, target: str, content: str) -> None:
        """Mirror built-in memory writes to OpenViking as explicit memories.

        Only ``action == "add"`` with non-empty content is mirrored. The note
        is posted as a user message from a daemon thread; failures are
        logged at debug level.
        """
        if not self._client or action != "add" or not content:
            return

        def _write():
            try:
                client = _VikingClient(self._endpoint, self._api_key)
                # Add as a user message with memory context so the commit
                # picks it up as an explicit memory during extraction
                client.post(f"/api/v1/sessions/{self._session_id}/messages", {
                    "role": "user",
                    "parts": [
                        {"type": "text", "text": f"[Memory note — {target}] {content}"},
                    ],
                })
            except Exception as e:
                logger.debug("OpenViking memory mirror failed: %s", e)

        t = threading.Thread(target=_write, daemon=True, name="openviking-memwrite")
        t.start()

    def get_tool_schemas(self) -> List[Dict[str, Any]]:
        """Return the schemas of the five viking_* tools."""
        return [SEARCH_SCHEMA, READ_SCHEMA, BROWSE_SCHEMA, REMEMBER_SCHEMA, ADD_RESOURCE_SCHEMA]

    def handle_tool_call(self, tool_name: str, args: dict, **kwargs) -> str:
        """Dispatch a tool call and return its result as a JSON string.

        Never raises: a missing client, an unknown tool name, or any
        exception from the tool is reported as ``{"error": ...}``.
        """
        if not self._client:
            return json.dumps({"error": "OpenViking server not connected"})

        try:
            handlers = {
                "viking_search": self._tool_search,
                "viking_read": self._tool_read,
                "viking_browse": self._tool_browse,
                "viking_remember": self._tool_remember,
                "viking_add_resource": self._tool_add_resource,
            }
            handler = handlers.get(tool_name)
            if handler is None:
                return json.dumps({"error": f"Unknown tool: {tool_name}"})
            return handler(args)
        except Exception as e:
            return json.dumps({"error": str(e)})

    def shutdown(self) -> None:
        """Give the sync and prefetch threads up to 5 seconds each to finish."""
        # Wait for background threads to finish
        for t in (self._sync_thread, self._prefetch_thread):
            if t and t.is_alive():
                t.join(timeout=5.0)

    # -- Tool implementations ------------------------------------------------

    def _tool_search(self, args: dict) -> str:
        """Run a semantic search and return a compact JSON result list.

        ``mode`` is only sent when it differs from "auto"; ``scope`` is sent
        as ``target_uri`` and ``limit`` as ``top_k``. Each entry carries uri,
        singular type, rounded score, abstract and up to three related URIs.
        """
        query = args.get("query", "")
        if not query:
            return json.dumps({"error": "query is required"})

        payload: Dict[str, Any] = {"query": query}
        mode = args.get("mode", "auto")
        if mode != "auto":
            payload["mode"] = mode
        if args.get("scope"):
            payload["target_uri"] = args["scope"]
        if args.get("limit"):
            payload["top_k"] = args["limit"]

        resp = self._client.post("/api/v1/search/find", payload)
        result = resp.get("result", {})

        # Format results for the model — keep it concise
        formatted = []
        for ctx_type, label in self._RESULT_TYPE_LABELS.items():
            for item in result.get(ctx_type, []):
                entry = {
                    "uri": item.get("uri", ""),
                    "type": label,
                    "score": round(item.get("score", 0), 3),
                    "abstract": item.get("abstract", ""),
                }
                if item.get("relations"):
                    entry["related"] = [r.get("uri") for r in item["relations"][:3]]
                formatted.append(entry)

        return json.dumps({
            "results": formatted,
            "total": result.get("total", len(formatted)),
        }, ensure_ascii=False)

    def _tool_read(self, args: dict) -> str:
        """Read a viking:// URI at the requested detail level.

        "abstract" uses /api/v1/read/abstract; "full" and everything else
        (treated as overview) use /api/v1/read. Content longer than 8000
        characters is cut and a truncation notice appended.
        """
        uri = args.get("uri", "")
        if not uri:
            return json.dumps({"error": "uri is required"})

        level = args.get("level", "overview")
        # Map our level names to OpenViking endpoints
        if level == "abstract":
            resp = self._client.post("/api/v1/read/abstract", {"uri": uri})
        elif level == "full":
            resp = self._client.post("/api/v1/read", {"uri": uri, "level": "read"})
        else:  # overview
            resp = self._client.post("/api/v1/read", {"uri": uri, "level": "overview"})

        result = resp.get("result", {})
        content = result.get("content", "")

        # Truncate very long content to avoid flooding the context
        if len(content) > 8000:
            content = content[:8000] + "\n\n[... truncated, use a more specific URI or abstract level]"

        return json.dumps({
            "uri": uri,
            "level": level,
            "content": content,
        }, ensure_ascii=False)

    def _tool_browse(self, args: dict) -> str:
        """Browse the store (tree / list / stat) and return JSON.

        For "list" responses containing "entries", at most 50 entries are
        returned, reduced to name, uri and a dir/file type. Any other
        response is passed through unchanged.
        """
        action = args.get("action", "list")
        path = args.get("path", "viking://")

        resp = self._client.post("/api/v1/browse", {
            "action": action,
            "path": path,
        })
        result = resp.get("result", {})

        # Format for readability
        if action == "list" and "entries" in result:
            entries = [
                {
                    "name": e.get("name", ""),
                    "uri": e.get("uri", ""),
                    "type": "dir" if e.get("is_dir") else "file",
                }
                for e in result["entries"][:50]  # cap at 50 entries
            ]
            return json.dumps({"path": path, "entries": entries}, ensure_ascii=False)

        return json.dumps(result, ensure_ascii=False)

    def _tool_remember(self, args: dict) -> str:
        """Store an explicit memory as a tagged session message.

        The text is prefixed with ``[Remember]`` (or ``[Remember — <category>]``
        when a category is given) and posted to the current session.
        """
        content = args.get("content", "")
        if not content:
            return json.dumps({"error": "content is required"})

        # Store as a session message that will be extracted during commit.
        # The category hint helps OpenViking's extraction classify correctly.
        category = args.get("category", "")
        text = f"[Remember] {content}"
        if category:
            text = f"[Remember — {category}] {content}"

        self._client.post(f"/api/v1/sessions/{self._session_id}/messages", {
            "role": "user",
            "parts": [
                {"type": "text", "text": text},
            ],
        })

        return json.dumps({
            "status": "stored",
            "message": "Memory recorded. Will be extracted and indexed on session commit.",
        })

    def _tool_add_resource(self, args: dict) -> str:
        """Submit a URL or path for ingestion and report the resulting root URI.

        ``url`` is sent as ``path``; ``reason`` is included only when given.
        """
        url = args.get("url", "")
        if not url:
            return json.dumps({"error": "url is required"})

        payload: Dict[str, Any] = {"path": url}
        if args.get("reason"):
            payload["reason"] = args["reason"]

        resp = self._client.post("/api/v1/resources", payload)
        result = resp.get("result", {})

        return json.dumps({
            "status": "added",
            "root_uri": result.get("root_uri", ""),
            "message": "Resource queued for processing. Use viking_search after a moment to find it.",
        }, ensure_ascii=False)
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Plugin entry point
|
|
# ---------------------------------------------------------------------------
|
|
|
|
def register(ctx) -> None:
    """Plugin entry point: hand a new OpenViking provider to ``ctx``."""
    provider = OpenVikingMemoryProvider()
    ctx.register_memory_provider(provider)
|