Compare commits

..

6 Commits

Author SHA1 Message Date
Alexander Whitestone
103b641bc0 feat: Mnemosyne foundation — ingest, index, CLI, tests (18/18)
Some checks failed
CI / test (pull_request) Failing after 10s
CI / validate (pull_request) Failing after 15s
Review Approval Gate / verify-review (pull_request) Failing after 3s
- ingest.py: chunking with overlap, dedup via SHA256, SQLite+FTS5
- index.py: keyword search (BM25), semantic search (cosine), RRF fusion
- cli.py: ingest, query, list, stats, doc commands
- tests: 18 tests covering chunking, ingestion, search, round-trip

Closes #1242
2026-04-11 20:16:51 -04:00
Alexander Whitestone
f9b5b2340c wip: tests — 18/18 passing, round-trip verified 2026-04-11 20:16:33 -04:00
Alexander Whitestone
3a0bd1aa3f wip: CLI with ingest, query, list, stats, doc commands 2026-04-11 20:15:58 -04:00
Alexander Whitestone
71c51d2e8c wip: holographic index with keyword + semantic search and RRF 2026-04-11 20:15:36 -04:00
Alexander Whitestone
f895998581 wip: ingest module with chunking, FTS5, dedup 2026-04-11 20:15:07 -04:00
Alexander Whitestone
aa1a6349ac wip: mnemosyne package init 2026-04-11 20:14:33 -04:00
41 changed files with 1361 additions and 2717 deletions

View File

@@ -1,201 +0,0 @@
#!/usr/bin/env bash
# ═══════════════════════════════════════════════════════════════
# stale-pr-closer.sh — Auto-close conflicted PRs superseded by
# already-merged work.
#
# Designed for cron on Hermes:
# 0 */6 * * * /path/to/the-nexus/.githooks/stale-pr-closer.sh
#
# Closes #1250 (parent epic #1248)
# ═══════════════════════════════════════════════════════════════
set -euo pipefail

# ─── Configuration ──────────────────────────────────────────
# Exported so the python3 child process below sees the same values even
# when a caller set them as plain (non-exported) shell variables.
export GITEA_URL="${GITEA_URL:-https://forge.alexanderwhitestone.com}"
export GITEA_TOKEN="${GITEA_TOKEN:?Set GITEA_TOKEN env var}"
export REPO="${REPO:-Timmy_Foundation/the-nexus}"
export GRACE_HOURS="${GRACE_HOURS:-24}"   # PRs updated within this window are never touched
export DRY_RUN="${DRY_RUN:-false}"        # "true" = log what would happen, change nothing

log() { echo "[$(date -u +%Y-%m-%dT%H:%M:%SZ)] $*"; }

log "Checking open PRs for $REPO (grace period: ${GRACE_HOURS}h, dry_run: $DRY_RUN)"

# All API work (fetching open/merged PRs, supersession checks, commenting,
# closing) happens in the Python block below. The previous version also
# pre-fetched PRs with curl and piped them into a no-op `python3 -c` stub;
# that dead code and the duplicate API calls have been removed.
python3 << 'PYEOF'
"""Close open PRs that are both conflicted AND superseded by merged work."""
import json, os, re, sys
import urllib.request
from datetime import datetime, timedelta, timezone

GITEA_URL = os.environ.get("GITEA_URL", "https://forge.alexanderwhitestone.com")
GITEA_TOKEN = os.environ["GITEA_TOKEN"]
REPO = os.environ.get("REPO", "Timmy_Foundation/the-nexus")
GRACE_HOURS = int(os.environ.get("GRACE_HOURS", "24"))
DRY_RUN = os.environ.get("DRY_RUN", "false") == "true"
API = f"{GITEA_URL}/api/v1"
HEADERS = {"Authorization": f"token {GITEA_TOKEN}", "Content-Type": "application/json"}

def _request(method, path, data=None):
    """Issue one authenticated Gitea API call and return the parsed JSON body."""
    body = json.dumps(data).encode() if data is not None else None
    req = urllib.request.Request(f"{API}{path}", data=body, headers=HEADERS, method=method)
    with urllib.request.urlopen(req) as resp:
        return json.loads(resp.read())

def api_get(path):
    return _request("GET", path)

def api_post(path, data):
    return _request("POST", path, data)

def api_patch(path, data):
    return _request("PATCH", path, data)

def log(msg):
    """Print a UTC-timestamped log line (matches the shell log() format)."""
    ts = datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ")
    print(f"[{ts}] {msg}")

now = datetime.now(timezone.utc)
cutoff = now - timedelta(hours=GRACE_HOURS)

# Fetch open PRs
open_prs = api_get(f"/repos/{REPO}/pulls?state=open&limit=50")
if not open_prs:
    log("No open PRs. Done.")
    sys.exit(0)
log(f"Found {len(open_prs)} open PR(s)")

# Recently merged PRs are the candidate "superseders".
merged_prs = api_get(f"/repos/{REPO}/pulls?state=closed&limit=100&sort=updated&direction=desc")
merged_prs = [p for p in merged_prs if p.get("merged")]

def extract_closes(body):
    """Return the set of issue numbers referenced by closing keywords
    ("closes #N", "fixes #N", "resolves #N") in a PR body."""
    if not body:
        return set()
    return set(int(m) for m in re.findall(r'(?:closes?|fixes?|resolves?)\s+#(\d+)', body, re.IGNORECASE))

# Lookup: issue number -> merged PR whose body claims to close it.
merged_by_issue = {}
for mp in merged_prs:
    for issue_num in extract_closes(mp.get("body", "")):
        merged_by_issue[issue_num] = mp

# Lookup: merged PR number -> (title word set, PR). Used for fuzzy title
# matching when two PRs implement the same feature without citing the
# same issue.
merged_by_title_words = {}
for mp in merged_prs:
    # Strip tags/prefixes, keep meaningful words (>3 chars).
    title = re.sub(r'\[claude\]|\[.*?\]|feat\(.*?\):', '', mp.get("title", "")).strip().lower()
    words = set(w for w in re.findall(r'\w+', title) if len(w) > 3)
    if words:
        merged_by_title_words[mp["number"]] = (words, mp)

closed_count = 0
for pr in open_prs:
    pr_num = pr["number"]
    pr_title = pr["title"]
    mergeable = pr.get("mergeable", True)
    updated_at = datetime.fromisoformat(pr["updated_at"].replace("Z", "+00:00"))

    # Skip PRs touched within the grace window.
    if updated_at > cutoff:
        log(f" PR #{pr_num}: within grace period, skipping")
        continue

    # Check 1: only conflicted PRs are candidates.
    if mergeable:
        log(f" PR #{pr_num}: mergeable, skipping")
        continue

    # Check 2: does a merged PR close the same issue?
    pr_closes = extract_closes(pr.get("body", ""))
    superseded_by = None
    for issue_num in pr_closes:
        if issue_num in merged_by_issue:
            superseded_by = merged_by_issue[issue_num]
            break

    # Check 3: fall back to title similarity (>=60% word overlap).
    if not superseded_by:
        pr_title_clean = re.sub(r'\[.*?\]|feat\(.*?\):', '', pr_title).strip().lower()
        pr_words = set(w for w in re.findall(r'\w+', pr_title_clean) if len(w) > 3)
        best_overlap = 0
        for mp_num, (mp_words, mp) in merged_by_title_words.items():
            if mp_num == pr_num:
                continue
            overlap = len(pr_words & mp_words)
            if pr_words and overlap / len(pr_words) >= 0.6 and overlap > best_overlap:
                best_overlap = overlap
                superseded_by = mp

    if not superseded_by:
        log(f" PR #{pr_num}: conflicted but no superseding PR found, skipping")
        continue

    sup_num = superseded_by["number"]
    sup_title = superseded_by["title"]
    # `or "unknown"` also guards a null merged_at value — the original
    # crashed slicing None when the key existed but was null.
    merged_at = (superseded_by.get("merged_at") or "unknown")[:10]
    comment = (
        f"**Auto-closed by stale-pr-closer**\n\n"
        f"This PR has merge conflicts and has been superseded by #{sup_num} "
        f"(\"{sup_title}\"), merged {merged_at}.\n\n"
        f"If this PR contains unique work not covered by #{sup_num}, "
        f"please reopen and rebase against `main`."
    )
    if DRY_RUN:
        log(f" [DRY RUN] Would close PR #{pr_num} — superseded by #{sup_num}")
    else:
        # Post explanation, then close.
        api_post(f"/repos/{REPO}/issues/{pr_num}/comments", {"body": comment})
        api_patch(f"/repos/{REPO}/pulls/{pr_num}", {"state": "closed"})
        log(f" Closed PR #{pr_num} — superseded by #{sup_num} ({sup_title})")
    closed_count += 1

log(f"Done. {'Would close' if DRY_RUN else 'Closed'} {closed_count} stale PR(s).")
PYEOF

18
.gitignore vendored
View File

@@ -1,18 +1,10 @@
# === Python bytecode (recursive — covers all subdirectories) ===
**/__pycache__/
*.pyc
*.pyo
# === Node ===
node_modules/
# === Test artifacts ===
test-results/
test-screenshots/
# === Tool configs ===
nexus/__pycache__/
tests/__pycache__/
mempalace/__pycache__/
.aider*
# === Path guardrails (see issue #1145) ===
# Prevent agents from writing to wrong path
# Prevent agents from writing to wrong path (see issue #1145)
public/nexus/
test-screenshots/

View File

@@ -1,54 +1,206 @@
# Contributing to The Nexus
## Issue Assignment — The Lock Protocol
**Rule: Assign before you code.**
Before starting work on any issue, you **must** assign it to yourself. If an issue is already assigned to someone else, **do not submit a competing PR**.
### For Humans
1. Check the issue is unassigned
2. Assign yourself via the Gitea UI (right sidebar → Assignees)
3. Start coding
### For Agents (Claude, Perplexity, Mimo, etc.)
1. Before generating code, call the Gitea API to check assignment:
```
GET /api/v1/repos/{owner}/{repo}/issues/{number}
→ Check assignees array
```
2. If unassigned, self-assign:
```
POST /api/v1/repos/{owner}/{repo}/issues/{number}/assignees
{"assignees": ["your-username"]}
```
3. If already assigned, **stop**. Post a comment offering to help instead.
### Why This Matters
On April 11, 2026, we found 12 stale PRs caused by Rockachopa and the `[claude]` auto-bot racing on the same issues. The auto-bot merged first, orphaning the manual PRs. Assignment-as-lock prevents this race condition.
---
# Contribution & Code Review Policy
## Branch Protection & Review Policy
All repositories enforce these rules on `main`:
All repositories enforce these rules on the `main` branch:
- ✅ Require Pull Request for merge
- ✅ Require 1 approval before merge
- ✅ Dismiss stale approvals on new commits
- ⚠️ Require CI to pass (where CI exists)
- ✅ Block force pushes to `main`
- ✅ Block deletion of `main` branch
| Rule | Status |
|------|--------|
| Require Pull Request for merge | ✅ Enabled |
| Require 1 approval before merge | ✅ Enabled |
| Dismiss stale approvals on new commits | ✅ Enabled |
| Require CI to pass (where CI exists) | ⚠️ Conditional |
| Block force pushes to `main` | ✅ Enabled |
| Block deletion of `main` branch | ✅ Enabled |
### Default Reviewer Assignments
| Repository | Required Reviewers |
|------------------|---------------------------------|
| `hermes-agent` | `@perplexity`, `@Timmy` |
| `the-nexus` | `@perplexity` |
| `timmy-home` | `@perplexity` |
| `timmy-config` | `@perplexity` |
### CI Enforcement Status
| Repository | CI Status |
|------------------|---------------------------------|
| `hermes-agent` | ✅ Active |
| `the-nexus` | ⚠️ CI runner pending (#915) |
| `timmy-home` | ❌ No CI |
| `timmy-config` | ❌ Limited CI |
### Workflow Requirements
1. Create feature branch from `main`
2. Submit PR with clear description
3. Wait for @perplexity review
4. Address feedback if any
5. Merge after approval and passing CI
### Emergency Exceptions
Hotfixes require:
- ✅ @Timmy approval
- ✅ Post-merge documentation
- ✅ Follow-up PR for full review
### Abandoned PR Policy
- PRs inactive >7 days: 🧹 archived
- Unreviewed PRs >14 days: ❌ closed
### Policy Enforcement
These rules are enforced by Gitea branch protection settings. Direct pushes to main will be blocked.
- Require rebase to re-enable
## Enforcement
These rules are enforced by Gitea's branch protection settings. Violations will be blocked at the platform level.
# Contribution and Code Review Policy
## Branch Protection Rules
All repositories must enforce the following rules on the `main` branch:
- ✅ Require Pull Request for merge
- ✅ Require 1 approval before merge
- ✅ Dismiss stale approvals when new commits are pushed
- ✅ Require status checks to pass (where CI is configured)
- ✅ Block force-pushing to `main`
- ✅ Block deleting the `main` branch
## Default Reviewer Assignment
All repositories must configure the following default reviewers:
- `@perplexity` as default reviewer for all repositories
- `@Timmy` as required reviewer for `hermes-agent`
- Repo-specific owners for specialized areas
## Implementation Status
| Repository | Branch Protection | CI Enforcement | Default Reviewers |
|------------------|------------------|----------------|-------------------|
| hermes-agent | ✅ Enabled | ✅ Active | @perplexity, @Timmy |
| the-nexus | ✅ Enabled | ⚠️ CI pending | @perplexity |
| timmy-home | ✅ Enabled | ❌ No CI | @perplexity |
| timmy-config | ✅ Enabled | ❌ No CI | @perplexity |
## Compliance Requirements
All contributors must:
1. Never push directly to `main`
2. Create a pull request for all changes
3. Get at least one approval before merging
4. Ensure CI passes before merging (where applicable)
## Policy Enforcement
This policy is enforced via Gitea branch protection rules. Violations will be blocked at the platform level.
For questions about this policy, contact @perplexity or @Timmy.
### Required for All Merges
- [x] Pull Request must exist for all changes
- [x] At least 1 approval from reviewer
- [x] CI checks must pass (where applicable)
- [x] No force pushes allowed
- [x] No direct pushes to main
- [x] No branch deletion
### Review Requirements
- [x] @perplexity must be assigned as reviewer
- [x] @Timmy must review all changes to `hermes-agent/`
- [x] No self-approvals allowed
### CI/CD Enforcement
- [x] CI must be configured for all new features
- [x] Failing CI blocks merge
- [x] CI status displayed in PR header
### Abandoned PR Policy
- PRs inactive >7 days get "needs attention" label
- PRs inactive >21 days are archived
- PRs inactive >90 days are closed
- [ ] At least 1 approval from reviewer
- [ ] CI checks must pass (where available)
- [ ] No force pushes allowed
- [ ] No direct pushes to main
- [ ] No branch deletion
### Review Requirements by Repository
```yaml
hermes-agent:
required_owners:
- perplexity
- Timmy
the-nexus:
required_owners:
- perplexity
timmy-home:
required_owners:
- perplexity
timmy-config:
required_owners:
- perplexity
```
### CI Status
```text
- hermes-agent: ✅ Active
- the-nexus: ⚠️ CI runner disabled (see #915)
- timmy-home: - (No CI)
- timmy-config: - (Limited CI)
```
### Branch Protection Status
All repositories now enforce:
- Require PR for merge
- 1+ approvals required
- CI/CD must pass (where applicable)
- Force push and branch deletion blocked
- hermes-agent: ✅ Active
- the-nexus: ⚠️ CI runner disabled (see #915)
- timmy-home: - (No CI)
- timmy-config: - (Limited CI)
```
## Workflow
1. Create feature branch
2. Open PR against main
3. Get 1+ approvals
4. Ensure CI passes
5. Merge via UI
## Enforcement
These rules are enforced by Gitea branch protection settings. Direct pushes to main will be blocked.
## Abandoned PRs
PRs not updated in >7 days will be labeled "stale" and may be closed after 30 days of inactivity.
# Contributing to the Nexus
**Every PR: net ≤ 10 added lines.** Not a guideline — a hard limit.
Add 40, remove 30. Can't remove? You're homebrewing. Import instead.
## Branch Protection & Review Policy
### Branch Protection Rules
All repositories enforce the following rules on the `main` branch:
| Rule | Status | Applies To |
|------|--------|------------|
| Require Pull Request for merge | ✅ Enabled | All |
| Require 1 approval before merge | ✅ Enabled | All |
| Dismiss stale approvals on new commits | ✅ Enabled | All |
| Require CI to pass (where CI exists) | ⚠️ Conditional | All |
| Block force pushes to `main` | ✅ Enabled | All |
| Block deletion of `main` branch | ✅ Enabled | All |
### Default Reviewer Assignments
| Repository | Required Reviewers |
|------------|-------------------|
|------------|------------------|
| `hermes-agent` | `@perplexity`, `@Timmy` |
| `the-nexus` | `@perplexity` |
| `timmy-home` | `@perplexity` |
@@ -63,93 +215,199 @@ All repositories enforce these rules on `main`:
| `timmy-home` | ❌ No CI |
| `timmy-config` | ❌ Limited CI |
---
### Review Requirements
## Branch Naming
- All PRs must be reviewed by at least one reviewer
- `@perplexity` is the default reviewer for all repositories
- `@Timmy` is a required reviewer for `hermes-agent`
Use descriptive prefixes:
All repositories enforce:
- ✅ Require Pull Request for merge
- ✅ Require 1 approval
- ⚠️ Require CI to pass (CI runner pending)
- ✅ Dismiss stale approvals on new commits
- ✅ Block force pushes
- ✅ Block branch deletion
| Prefix | Use |
|--------|-----|
| `feat/` | New features |
| `fix/` | Bug fixes |
| `epic/` | Multi-issue epic branches |
| `docs/` | Documentation only |
## Review Requirements
Example: `feat/mnemosyne-memory-decay`
- Mandatory reviewer: `@perplexity` for all repos
- Mandatory reviewer: `@Timmy` for `hermes-agent/`
- Optional: Add repo-specific owners for specialized areas
---
## Implementation Status
## PR Requirements
- ✅ hermes-agent: All protections enabled
- ✅ the-nexus: PR + 1 approval enforced
- ✅ timmy-home: PR + 1 approval enforced
- ✅ timmy-config: PR + 1 approval enforced
1. **Rebase before merge** — PRs must be up-to-date with `main`. If you have merge conflicts, rebase locally and force-push.
2. **Reference the issue** — Use `Closes #NNN` in the PR body so Gitea auto-closes the issue on merge.
3. **No bytecode** — Never commit `__pycache__/` or `.pyc` files. The `.gitignore` handles this, but double-check.
4. **One feature per PR** — Avoid omnibus PRs that bundle multiple unrelated features. They're harder to review and more likely to conflict.
> CI enforcement pending runner restoration (#915)
---
## What gets preserved from legacy Matrix
## Path Conventions
High-value candidates include:
- visitor movement / embodiment
- chat, bark, and presence systems
- transcript logging
- ambient / visual atmosphere systems
- economy / satflow visualizations
- smoke and browser validation discipline
| Module | Canon Path |
|--------|-----------|
| Mnemosyne (backend) | `nexus/mnemosyne/` |
| Mnemosyne (frontend) | `nexus/components/` |
| MemPalace | `nexus/mempalace/` |
| Scripts/tools | `bin/` |
| Git hooks/automation | `.githooks/` |
| Tests | `nexus/mnemosyne/tests/` |
Those
```
**Never** create a duplicate module at the repo root (e.g., `mnemosyne/` when `nexus/mnemosyne/` already exists). Check `FEATURES.yaml` manifests for the canonical path.
README.md
````
<<<<<<< SEARCH
# Contribution & Code Review Policy
---
## Branch Protection Rules (Enforced via Gitea)
All repositories must have the following branch protection rules enabled on the `main` branch:
## Feature Manifests
1. **Require Pull Request for Merge**
- Prevent direct commits to `main`
- All changes must go through PR process
Each major module maintains a `FEATURES.yaml` manifest that declares:
- What exists (status: `shipped`)
- What's in progress (status: `in-progress`, with assignee)
- What's planned (status: `planned`)
# Contribution & Code Review Policy
**Check the manifest before creating new PRs.** If your feature is already shipped, you're duplicating work. If it's in-progress by someone else, coordinate.
## Branch Protection & Review Policy
Current manifests:
- [`nexus/mnemosyne/FEATURES.yaml`](nexus/mnemosyne/FEATURES.yaml)
See [POLICY.md](POLICY.md) for full branch protection rules and review requirements. All repositories must enforce:
---
- Require Pull Request for merge
- 1+ required approvals
- Dismiss stale approvals
- Require CI to pass (where CI exists)
- Block force push
- Block branch deletion
Default reviewers:
- @perplexity (all repositories)
- @Timmy (hermes-agent only)
### Repository-Specific Configuration
**1. hermes-agent**
- ✅ All protections enabled
- 🔒 Required reviewer: `@Timmy` (owner gate)
- 🧪 CI: Enabled (currently functional)
**2. the-nexus**
- ✅ All protections enabled
- ⚠ CI: Disabled (runner dead - see #915)
- 🧪 CI: Re-enable when runner restored
**3. timmy-home**
- ✅ PR + 1 approval required
- 🧪 CI: No CI configured
**4. timmy-config**
- ✅ PR + 1 approval required
- 🧪 CI: Limited CI
### Default Reviewer Assignment
All repositories must:
- 🧑‍ Default reviewer: `@perplexity` (QA gate)
- 🧑 Required reviewer: `@Timmy` for `hermes-agent/` only
### Acceptance Criteria
- [x] All four repositories have protection rules applied
- [x] Default reviewers configured per matrix above
- [x] This policy documented in all repositories
- [x] Policy enforced for 72 hours with no unreviewed merges
> This policy replaces all previous ad-hoc workflows. Any exceptions require written approval from @Timmy and @perplexity.
All repositories enforce:
- ✅ Require Pull Request for merge
- ✅ Minimum 1 approval required
- ✅ Dismiss stale approvals on new commits
- ⚠️ Require CI to pass (CI runner pending for the-nexus)
- ✅ Block force push to `main`
- ✅ Block deletion of `main` branch
## Review Requirement
- 🧑‍ Default reviewer: `@perplexity` (QA gate)
- 🧑 Required reviewer: `@Timmy` for `hermes-agent/` only
## Workflow
1. Check the issue is unassigned → self-assign
2. Check `FEATURES.yaml` for the relevant module
3. Create feature branch from `main`
4. Submit PR with clear description and `Closes #NNN`
5. Wait for reviewer approval
6. Rebase if needed, then merge
### Emergency Exceptions
Hotfixes require:
- ✅ @Timmy approval
- ✅ Post-merge documentation
- ✅ Follow-up PR for full review
---
## Stale PR Policy
A cron job runs every 6 hours and auto-closes PRs that are:
1. **Conflicted** (not mergeable)
2. **Superseded** by a merged PR that closes the same issue or implements the same feature
Closed PRs receive a comment explaining which PR superseded them. If your PR was auto-closed but contains unique work, reopen it, rebase against `main`, and update the feature manifest.
---
1. Create feature branch from `main`
2. Submit PR with clear description
3. Wait for @perplexity review
4. Address feedback if any
5. Merge after approval and passing CI
## CI/CD Requirements
- All main branch merge require:
- ✅ Linting
- ✅ Unit tests
- ⚠️ Integration tests (pending for the-nexus)
- ✅ Security scans
All main branch merges require (where applicable):
- ✅ Linting
- ✅ Unit tests
- ⚠️ Integration tests (pending for the-nexus, see #915)
- ✅ Security scans
## Exceptions
- Emergency hotfixes require:
- ✅ @Timmy approval
- ✅ Post-merge documentation
- ✅ Follow-up PR for full review
## Abandoned PRs
- PRs inactive >7 days: 🧹 archived
- Unreviewed PRs >14 days: ❌ closed
## CI Status
- ✅ hermes-agent: CI active
- ⚠️ the-nexus: CI runner dead (see #915)
- ✅ timmy-home: No CI
- ⚠️ timmy-config: Limited CI
>>>>>>> replace
```
CODEOWNERS
```text
<<<<<<< search
# Contribution & Code Review Policy
## Branch Protection Rules
All repositories must:
- ✅ Require PR for merge
- ✅ Require 1 approval
- ✅ Dismiss stale approvals
- ⚠️ Require CI to pass (where exists)
- ✅ Block force push
- ✅ block branch deletion
## Review Requirements
- 🧑 Default reviewer: `@perplexity` for all repos
- 🧑 Required reviewer: `@Timmy` for `hermes-agent/`
## Workflow
1. Create feature branch from `main`
2. Submit PR with clear description
3. Wait for @perplexity review
4. Address feedback if any
5. Merge after approval and passing CI
## CI/CD Requirements
- All main branch merges require:
- ✅ Linting
- ✅ Unit tests
- ⚠️ Integration tests (pending for the-nexus)
- ✅ Security scans
## Exceptions
- Emergency hotfixes require:
- ✅ @Timmy approval
- ✅ Post-merge documentation
- ✅ Follow-up PR for full review
## Abandoned PRs
- PRs inactive >7 days: 🧹 archived
- Unreviewed PRs >14 days: ❌ closed
## CI Status
- ✅ hermes-agent: ci active
- ⚠️ the-nexus: ci runner dead (see #915)
- ✅ timmy-home: No ci
- ⚠️ timmy-config: Limited ci

3
app.js
View File

@@ -7,7 +7,6 @@ import { SpatialMemory } from './nexus/components/spatial-memory.js';
import { MemoryBirth } from './nexus/components/memory-birth.js';
import { MemoryOptimizer } from './nexus/components/memory-optimizer.js';
import { MemoryInspect } from './nexus/components/memory-inspect.js';
import { MemoryPulse } from './nexus/components/memory-pulse.js';
// ═══════════════════════════════════════════
// NEXUS v1.1 — Portal System Update
@@ -716,7 +715,6 @@ async function init() {
MemoryBirth.wrapSpatialMemory(SpatialMemory);
SpatialMemory.setCamera(camera);
MemoryInspect.init({ onNavigate: _navigateToMemory });
MemoryPulse.init(scene);
updateLoad(90);
loadSession();
@@ -1949,7 +1947,6 @@ function setupControls() {
SpatialMemory.highlightMemory(entry.data.id);
const regionDef = SpatialMemory.REGIONS[entry.region] || SpatialMemory.REGIONS.working;
MemoryInspect.show(entry.data, regionDef);
MemoryPulse.trigger(entry.data.id, SpatialMemory);
}
} else {
// Clicked empty space — close inspect panel and deselect crystal

Binary file not shown.

Binary file not shown.

View File

@@ -477,10 +477,6 @@ index.html
<div id="memory-inspect-panel" class="memory-inspect-panel" style="display:none;" aria-label="Memory Inspect Panel">
</div>
<!-- Memory Connections Panel (Mnemosyne) -->
<div id="memory-connections-panel" class="memory-connections-panel" style="display:none;" aria-label="Memory Connections Panel">
</div>
<script>
// ─── MNEMOSYNE: Memory Filter Panel ───────────────────
function openMemoryFilter() {

81
mnemosyne/README.md Normal file
View File

@@ -0,0 +1,81 @@
# Mnemosyne — The Living Holographic Archive
A sovereign, on-chain anchored memory system that ingests documents, conversations, and artifacts into a searchable holographic index.
## Design Principles
- **No network calls** at ingest time — embeddings are optional, compute locally or skip
- **SQLite + FTS5 only** — no external vector DB dependency
- **Pluggable embedding backend** (sentence-transformers, Ollama, or none)
- **Compact** — the whole module < 500 lines of Python
## Quick Start
### Ingest documents
```bash
# Single file
python -m mnemosyne.cli ingest path/to/document.md
# Directory tree
python -m mnemosyne.cli ingest path/to/docs/
# Custom chunk size
python -m mnemosyne.cli ingest docs/ --chunk-size 1024 --overlap 128
```
### Query the archive
```bash
python -m mnemosyne.cli query "sovereignty and Bitcoin"
```
### Browse the archive
```bash
python -m mnemosyne.cli list
python -m mnemosyne.cli stats
python -m mnemosyne.cli doc 42
```
## Python API
```python
from mnemosyne.ingest import ingest_text, ingest_file
from mnemosyne.index import query
# Ingest
doc_id = ingest_text("Your content here", source="manual", title="My Note")
# Search
results = query("sovereignty and Bitcoin")
for r in results:
print(f"[{r['score']:.4f}] {r['title']}: {r['content'][:100]}")
```
## Architecture
```
mnemosyne/
├── __init__.py # Package metadata
├── ingest.py # Document ingestion + chunking + SQLite storage
├── index.py # Holographic index: keyword + semantic search + RRF
├── cli.py # CLI entry point
└── README.md # This file
```
### Storage Schema
- **documents** — raw documents with source, title, content, metadata, dedup hash
- **chunks** — overlapping text chunks linked to documents
- **chunks_fts** — FTS5 virtual table with porter stemming + unicode61 tokenizer
### Search Modes
1. **Keyword** (default) — FTS5 full-text search with BM25 scoring
2. **Semantic** — cosine similarity over pre-computed embeddings (requires embedding backend)
3. **Hybrid** — Reciprocal Rank Fusion merging both result sets
## Closes
[#1242](https://forge.alexanderwhitestone.com/Timmy_Foundation/the-nexus/issues/1242)

10
mnemosyne/__init__.py Normal file
View File

@@ -0,0 +1,10 @@
"""
Mnemosyne — The Living Holographic Archive
A sovereign, on-chain anchored memory system that ingests documents,
conversations, and artifacts into a searchable holographic index.
No network calls at ingest time. SQLite + FTS5 only. Pluggable embedding backend.
"""
__version__ = "0.1.0"

Binary file not shown.

Binary file not shown.

Binary file not shown.

163
mnemosyne/cli.py Normal file
View File

@@ -0,0 +1,163 @@
"""
Mnemosyne CLI
Usage:
mnemosyne ingest <path> [--db PATH] [--chunk-size N] [--overlap N]
mnemosyne query <text> [--db PATH] [--limit N]
mnemosyne list [--db PATH] [--limit N]
mnemosyne stats [--db PATH]
mnemosyne doc <id> [--db PATH]
"""
import argparse
import json
import sys
from pathlib import Path
from .ingest import ingest_file, ingest_directory, get_stats, DEFAULT_DB_PATH, DEFAULT_CHUNK_SIZE, DEFAULT_CHUNK_OVERLAP
from .index import query, list_documents, get_document
def cmd_ingest(args):
    """Ingest a file or a directory tree into the archive.

    Prints a per-file summary; exits with status 1 if the path does not
    exist (or is neither a regular file nor a directory).
    """
    db = args.db or DEFAULT_DB_PATH
    target = Path(args.path)
    if target.is_dir():
        # Recursive ingest: report counts plus any per-file errors.
        summary = ingest_directory(
            str(target), db_path=db,
            chunk_size=args.chunk_size, chunk_overlap=args.overlap,
        )
        print(f"Ingested: {summary['ingested']} files")
        print(f"Skipped (duplicates): {summary['skipped']}")
        if summary["errors"]:
            print(f"Errors: {len(summary['errors'])}")
            for failure in summary["errors"]:
                print(f" {failure['file']}: {failure['error']}")
    elif target.is_file():
        # Single-file ingest: ingest_file returns None for a dedup hit.
        doc_id = ingest_file(
            str(target), db_path=db,
            chunk_size=args.chunk_size, chunk_overlap=args.overlap,
        )
        if doc_id is None:
            print(f"Skipped (duplicate): {target.name}")
        else:
            print(f"Ingested: {target.name} (doc_id={doc_id})")
    else:
        print(f"Error: {args.path} not found", file=sys.stderr)
        sys.exit(1)
def cmd_query(args):
    """Search the holographic archive and print ranked results."""
    db = args.db or DEFAULT_DB_PATH
    hits = query(args.text, db_path=db, limit=args.limit)
    if not hits:
        print("No results found.")
        return
    for rank, hit in enumerate(hits, 1):
        source = hit.get("source", "?")
        # Fall back to the source filename when no title was stored.
        title = hit.get("title") or Path(source).name
        # Hybrid results carry rrf_score/methods; single-mode carry score/method.
        score = hit.get("rrf_score") or hit.get("score", 0)
        methods = hit.get("methods") or [hit.get("method", "?")]
        preview = hit["content"][:200].replace("\n", " ")
        print(f"[{rank}] {title}")
        print(f" Source: {source}")
        print(f" Score: {score:.4f} ({', '.join(methods)})")
        print(f" {preview}...")
        print()
def cmd_list(args):
    """Print a one-line table row for each document in the archive."""
    db = args.db or DEFAULT_DB_PATH
    docs = list_documents(db_path=db, limit=args.limit)
    if not docs:
        print("Archive is empty.")
        return
    print(f"{'ID':>5} {'Chunks':>6} {'Title':<40} Source")
    print("-" * 90)
    for doc in docs:
        # Truncate long fields so rows stay on one line.
        title = (doc["title"] or "?")[:40]
        source = Path(doc["source"]).name[:30] if doc["source"] else "?"
        print(f"{doc['id']:>5} {doc['chunks']:>6} {title:<40} {source}")
def cmd_stats(args):
    """Print aggregate archive counts (documents, chunks, sources)."""
    stats = get_stats(db_path=args.db or DEFAULT_DB_PATH)
    for label, key in (("Documents", "documents"),
                       ("Chunks", "chunks"),
                       ("Sources", "sources")):
        print(f"{label}: {stats[key]}")
def cmd_doc(args):
    """Print one document's metadata and full content by ID.

    Exits with status 1 when the ID is unknown.
    """
    db = args.db or DEFAULT_DB_PATH
    doc = get_document(args.id, db_path=db)
    if not doc:
        print(f"Document #{args.id} not found.")
        sys.exit(1)
    print(f"ID: {doc['id']}")
    print(f"Title: {doc['title']}")
    print(f"Source: {doc['source']}")
    print(f"Ingested: {doc['ingested_at']}")
    print(f"Metadata: {json.dumps(doc['metadata'], indent=2)}")
    print(f"\n--- Content ({len(doc['content'])} chars) ---\n")
    print(doc["content"])
def main():
    """Parse CLI arguments and dispatch to the matching subcommand.

    Prints the top-level help when invoked with no subcommand.
    """
    parser = argparse.ArgumentParser(
        prog="mnemosyne",
        description="Mnemosyne — The Living Holographic Archive",
    )
    parser.add_argument("--db", help="Database path (default: mnemosyne.db)")
    sub = parser.add_subparsers(dest="command")

    ingest_parser = sub.add_parser("ingest", help="Ingest files or directories")
    ingest_parser.add_argument("path", help="File or directory to ingest")
    ingest_parser.add_argument("--chunk-size", type=int, default=DEFAULT_CHUNK_SIZE)
    ingest_parser.add_argument("--overlap", type=int, default=DEFAULT_CHUNK_OVERLAP)

    query_parser = sub.add_parser("query", help="Search the archive")
    query_parser.add_argument("text", help="Search query")
    query_parser.add_argument("--limit", type=int, default=10)

    list_parser = sub.add_parser("list", help="List documents in archive")
    list_parser.add_argument("--limit", type=int, default=50)

    sub.add_parser("stats", help="Show archive statistics")

    doc_parser = sub.add_parser("doc", help="Show document by ID")
    doc_parser.add_argument("id", type=int, help="Document ID")

    args = parser.parse_args()

    # Dispatch table instead of an if/elif chain; unknown/missing
    # command falls through to the help text, as before.
    handlers = {
        "ingest": cmd_ingest,
        "query": cmd_query,
        "list": cmd_list,
        "stats": cmd_stats,
        "doc": cmd_doc,
    }
    handler = handlers.get(args.command)
    if handler is None:
        parser.print_help()
    else:
        handler(args)


if __name__ == "__main__":
    main()

228
mnemosyne/index.py Normal file
View File

@@ -0,0 +1,228 @@
"""
Mnemosyne Holographic Index
Query interface: keyword search (FTS5) + semantic search (embedding similarity).
Merges results with reciprocal rank fusion.
"""
import json
import sqlite3
import math
from typing import Optional
from .ingest import get_db, DEFAULT_DB_PATH
def keyword_search(
    query: str,
    db_path: str = DEFAULT_DB_PATH,
    limit: int = 10,
) -> list[dict]:
    """Full-text search via FTS5 ranked by BM25.

    Returns a list of {chunk_id, doc_id, content, source, title, metadata,
    score, method} dicts, best match first.
    """
    conn = get_db(db_path)
    rows = conn.execute("""
        SELECT
            c.id as chunk_id,
            c.doc_id,
            c.content,
            d.source,
            d.title,
            d.metadata,
            rank as bm25_score
        FROM chunks_fts fts
        JOIN chunks c ON c.id = fts.rowid
        JOIN documents d ON d.id = c.doc_id
        WHERE chunks_fts MATCH ?
        ORDER BY rank
        LIMIT ?
    """, (query, limit)).fetchall()
    conn.close()
    hits = []
    for chunk_id, doc_id, content, source, title, meta, bm25 in rows:
        hits.append({
            "chunk_id": chunk_id,
            "doc_id": doc_id,
            "content": content,
            "source": source,
            "title": title,
            "metadata": json.loads(meta) if meta else {},
            # SQLite's BM25 rank is negative (more negative = better);
            # abs() keeps the convention "higher score = better match".
            "score": abs(bm25),
            "method": "keyword",
        })
    return hits
def semantic_search(
    query_embedding: list[float],
    db_path: str = DEFAULT_DB_PATH,
    limit: int = 10,
) -> list[dict]:
    """Cosine-similarity search over stored chunk embeddings.

    Embeddings must have been pre-computed and stored as float32 BLOBs in
    the chunks table. Returns an empty list when none are available.
    """
    conn = get_db(db_path)
    embedded_count = conn.execute(
        "SELECT COUNT(*) FROM chunks WHERE embedding IS NOT NULL"
    ).fetchone()[0]
    if embedded_count == 0:
        conn.close()
        return []
    rows = conn.execute("""
        SELECT
            c.id as chunk_id,
            c.doc_id,
            c.content,
            c.embedding,
            d.source,
            d.title,
            d.metadata
        FROM chunks c
        JOIN documents d ON d.id = c.doc_id
        WHERE c.embedding IS NOT NULL
    """).fetchall()
    conn.close()
    import struct
    # Guard against zero vectors with `or 1.0` (avoids division by zero).
    query_norm = math.sqrt(sum(x * x for x in query_embedding)) or 1.0
    scored = []
    for chunk_id, doc_id, content, blob, source, title, meta in rows:
        # Each embedding BLOB is a packed run of little-endian float32s.
        vector = struct.unpack(f"{len(blob) // 4}f", blob)
        dot = sum(a * b for a, b in zip(query_embedding, vector))
        vector_norm = math.sqrt(sum(x * x for x in vector)) or 1.0
        scored.append({
            "chunk_id": chunk_id,
            "doc_id": doc_id,
            "content": content,
            "source": source,
            "title": title,
            "metadata": json.loads(meta) if meta else {},
            "score": dot / (query_norm * vector_norm),
            "method": "semantic",
        })
    scored.sort(key=lambda hit: hit["score"], reverse=True)
    return scored[:limit]
def reciprocal_rank_fusion(
    keyword_results: list[dict],
    semantic_results: list[dict],
    k: int = 60,
    limit: int = 10,
) -> list[dict]:
    """Merge keyword and semantic results using Reciprocal Rank Fusion.

    RRF score = sum(1 / (k + rank_i)) across the result lists, where rank_i
    is the 1-based rank of the chunk in each list it appears in.

    Args:
        keyword_results: Ranked hits from keyword_search() (best first).
        semantic_results: Ranked hits from semantic_search() (best first).
        k: RRF damping constant (60 is the commonly used default).
        limit: Maximum number of merged results to return.

    Returns:
        Copies of the input result dicts, sorted by descending "rrf_score",
        each annotated with "rrf_score" and "methods" — the subset of
        ["keyword", "semantic"] that contributed the chunk.
    """
    rrf_scores: dict[int, float] = {}
    chunk_map: dict[int, dict] = {}
    for rank, result in enumerate(keyword_results):
        cid = result["chunk_id"]
        rrf_scores[cid] = rrf_scores.get(cid, 0.0) + 1.0 / (k + rank + 1)
        chunk_map[cid] = result
    for rank, result in enumerate(semantic_results):
        cid = result["chunk_id"]
        rrf_scores[cid] = rrf_scores.get(cid, 0.0) + 1.0 / (k + rank + 1)
        chunk_map[cid] = result
    # Precompute membership once; the previous any(...) scans re-walked
    # both result lists for every merged entry (O(n*m)).
    keyword_ids = {r["chunk_id"] for r in keyword_results}
    semantic_ids = {r["chunk_id"] for r in semantic_results}
    # Sort by RRF score, best first.
    merged = sorted(rrf_scores.items(), key=lambda item: item[1], reverse=True)
    results = []
    for cid, score in merged[:limit]:
        entry = chunk_map[cid].copy()  # copy so callers' dicts stay untouched
        entry["rrf_score"] = score
        entry["methods"] = [
            name
            for name, ids in (("keyword", keyword_ids), ("semantic", semantic_ids))
            if cid in ids
        ]
        results.append(entry)
    return results
def query(
    text: str,
    db_path: str = DEFAULT_DB_PATH,
    limit: int = 10,
    query_embedding: Optional[list[float]] = None,
) -> list[dict]:
    """Unified query: keyword search, optionally fused with semantic search.

    When query_embedding is supplied and the DB holds embeddings, keyword
    and semantic hits are merged with reciprocal rank fusion; otherwise the
    keyword results are returned as-is.
    """
    keyword_hits = keyword_search(text, db_path=db_path, limit=limit)
    if query_embedding is None:
        return keyword_hits
    semantic_hits = semantic_search(query_embedding, db_path=db_path, limit=limit)
    if not semantic_hits:
        return keyword_hits
    return reciprocal_rank_fusion(keyword_hits, semantic_hits, limit=limit)
def get_document(doc_id: int, db_path: str = DEFAULT_DB_PATH) -> Optional[dict]:
    """Fetch a full document row by ID; None if it does not exist."""
    conn = get_db(db_path)
    row = conn.execute(
        "SELECT id, source, title, content, metadata, ingested_at FROM documents WHERE id = ?",
        (doc_id,),
    ).fetchone()
    conn.close()
    if row is None:
        return None
    ident, source, title, content, metadata, ingested_at = row
    return {
        "id": ident,
        "source": source,
        "title": title,
        "content": content,
        "metadata": json.loads(metadata) if metadata else {},
        "ingested_at": ingested_at,
    }
def list_documents(
    db_path: str = DEFAULT_DB_PATH,
    limit: int = 50,
    offset: int = 0,
) -> list[dict]:
    """List archive documents (newest first) with per-document chunk counts."""
    conn = get_db(db_path)
    rows = conn.execute("""
        SELECT d.id, d.source, d.title, d.ingested_at,
               COUNT(c.id) as chunk_count
        FROM documents d
        LEFT JOIN chunks c ON c.doc_id = d.id
        GROUP BY d.id
        ORDER BY d.ingested_at DESC
        LIMIT ? OFFSET ?
    """, (limit, offset)).fetchall()
    conn.close()
    keys = ("id", "source", "title", "ingested_at", "chunks")
    return [dict(zip(keys, row)) for row in rows]

267
mnemosyne/ingest.py Normal file
View File

@@ -0,0 +1,267 @@
"""
Mnemosyne Ingestion Pipeline
Accepts text/JSON/markdown inputs, chunks them with overlap,
stores in local SQLite + FTS5 for keyword search.
Embedding backend is pluggable (compute locally or skip).
"""
import json
import os
import re
import sqlite3
import hashlib
from pathlib import Path
from typing import Optional
DEFAULT_CHUNK_SIZE = 512      # target chunk length, in characters
DEFAULT_CHUNK_OVERLAP = 64    # characters shared between consecutive chunks
DEFAULT_DB_PATH = "mnemosyne.db"


def get_db(db_path: str = DEFAULT_DB_PATH) -> sqlite3.Connection:
    """Open or create the Mnemosyne SQLite database with FTS5 tables.

    Idempotently creates the documents and chunks tables, an external-content
    FTS5 index over chunk text, and the triggers that keep that index in sync
    with inserts/updates/deletes on chunks.

    Args:
        db_path: Filesystem path of the database (":memory:" also works).

    Returns:
        An open sqlite3.Connection with WAL journaling and foreign keys on.
    """
    conn = sqlite3.connect(db_path)
    conn.execute("PRAGMA journal_mode=WAL")
    conn.execute("PRAGMA foreign_keys=ON")
    conn.executescript("""
        CREATE TABLE IF NOT EXISTS documents (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            doc_hash TEXT UNIQUE NOT NULL,
            source TEXT NOT NULL,
            title TEXT,
            content TEXT NOT NULL,
            metadata TEXT DEFAULT '{}',
            ingested_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
        );
        CREATE TABLE IF NOT EXISTS chunks (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            doc_id INTEGER NOT NULL REFERENCES documents(id) ON DELETE CASCADE,
            chunk_index INTEGER NOT NULL,
            content TEXT NOT NULL,
            embedding BLOB,
            created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
            UNIQUE(doc_id, chunk_index)
        );
        CREATE VIRTUAL TABLE IF NOT EXISTS chunks_fts USING fts5(
            content,
            content=chunks,
            content_rowid=id,
            tokenize='porter unicode61'
        );
        -- Triggers to keep FTS5 in sync
        CREATE TRIGGER IF NOT EXISTS chunks_ai AFTER INSERT ON chunks BEGIN
            INSERT INTO chunks_fts(rowid, content) VALUES (new.id, new.content);
        END;
        CREATE TRIGGER IF NOT EXISTS chunks_ad AFTER DELETE ON chunks BEGIN
            INSERT INTO chunks_fts(chunks_fts, rowid, content)
            VALUES('delete', old.id, old.content);
        END;
        CREATE TRIGGER IF NOT EXISTS chunks_au AFTER UPDATE ON chunks BEGIN
            INSERT INTO chunks_fts(chunks_fts, rowid, content)
            VALUES('delete', old.id, old.content);
            INSERT INTO chunks_fts(rowid, content) VALUES (new.id, new.content);
        END;
    """)
    conn.commit()
    return conn


def chunk_text(
    text: str,
    chunk_size: int = DEFAULT_CHUNK_SIZE,
    overlap: int = DEFAULT_CHUNK_OVERLAP,
) -> list[str]:
    """Split text into overlapping chunks by character count.

    Tries to break at paragraph > sentence > word boundaries. All chunks are
    whitespace-stripped and empty chunks are dropped, so blank or
    whitespace-only input yields [] (previously the short path returned the
    text unstripped, and "" came back as [""]).

    Args:
        text: The text to split.
        chunk_size: Maximum chunk length in characters.
        overlap: Characters of context carried over between chunks.

    Returns:
        List of non-empty, stripped chunk strings in document order.
    """
    if len(text) <= chunk_size:
        stripped = text.strip()
        return [stripped] if stripped else []
    chunks = []
    start = 0
    while start < len(text):
        end = start + chunk_size
        if end >= len(text):
            # Final (possibly short) tail — keep it only if non-blank.
            tail = text[start:].strip()
            if tail:
                chunks.append(tail)
            break
        # Try to find a clean break point within the window; only accept a
        # boundary in the second half so chunks don't get too small.
        segment = text[start:end]
        last_para = segment.rfind("\n\n")
        if last_para > chunk_size * 0.5:
            end = start + last_para + 2
        else:
            last_period = max(
                segment.rfind(". "),
                segment.rfind("! "),
                segment.rfind("? "),
                segment.rfind(".\n"),
            )
            if last_period > chunk_size * 0.5:
                end = start + last_period + 2
            else:
                last_space = segment.rfind(" ")
                if last_space > chunk_size * 0.5:
                    end = start + last_space + 1
        chunk = text[start:end].strip()
        if chunk:
            chunks.append(chunk)
        # Step back by `overlap` for continuity; max() guarantees forward
        # progress even when overlap >= the chunk we just emitted.
        start = max(start + 1, end - overlap)
    return chunks
def _hash_content(content: str, source: str) -> str:
"""Deterministic hash for deduplication."""
return hashlib.sha256(f"{source}:{content}".encode()).hexdigest()[:32]
def ingest_text(
    content: str,
    source: str = "inline",
    title: Optional[str] = None,
    metadata: Optional[dict] = None,
    db_path: str = DEFAULT_DB_PATH,
    chunk_size: int = DEFAULT_CHUNK_SIZE,
    chunk_overlap: int = DEFAULT_CHUNK_OVERLAP,
) -> Optional[int]:
    """Ingest a single text document into the archive.

    Returns the new doc_id, or None when an identical (source, content)
    pair was already ingested (deduplicated via doc_hash).
    """
    conn = get_db(db_path)
    fingerprint = _hash_content(content, source)
    # Deduplicate on the content fingerprint before inserting anything.
    duplicate = conn.execute(
        "SELECT id FROM documents WHERE doc_hash = ?", (fingerprint,)
    ).fetchone()
    if duplicate:
        conn.close()
        return None
    doc_id = conn.execute(
        "INSERT INTO documents (doc_hash, source, title, content, metadata) VALUES (?, ?, ?, ?, ?)",
        (fingerprint, source, title, content, json.dumps(metadata or {})),
    ).lastrowid
    chunk_rows = [
        (doc_id, index, piece)
        for index, piece in enumerate(chunk_text(content, chunk_size, chunk_overlap))
    ]
    conn.executemany(
        "INSERT INTO chunks (doc_id, chunk_index, content) VALUES (?, ?, ?)",
        chunk_rows,
    )
    conn.commit()
    conn.close()
    return doc_id
def ingest_file(
    path: str,
    db_path: str = DEFAULT_DB_PATH,
    chunk_size: int = DEFAULT_CHUNK_SIZE,
    chunk_overlap: int = DEFAULT_CHUNK_OVERLAP,
) -> Optional[int]:
    """Ingest a file (text, markdown, JSON) into the archive.

    For JSON files, extracts text from common fields (body, text, content,
    message). A top-level JSON array is ingested as one document per item.

    Args:
        path: File to ingest.
        db_path: SQLite database location.
        chunk_size: Target chunk size in characters.
        chunk_overlap: Overlap between consecutive chunks.

    Returns:
        The doc_id of the (first) newly ingested document, or None when
        everything extracted was a duplicate.

    Raises:
        FileNotFoundError: If *path* does not exist.
        json.JSONDecodeError: If a .json file does not parse.
    """
    p = Path(path)
    if not p.exists():
        raise FileNotFoundError(f"File not found: {path}")
    source = str(p.resolve())
    title = p.stem
    if p.suffix.lower() == ".json":
        data = json.loads(p.read_text())
        if isinstance(data, str):
            content = data
        elif isinstance(data, dict):
            content = data.get("body") or data.get("text") or data.get("content") or data.get("message") or json.dumps(data, indent=2)
            title = data.get("title", title)
        elif isinstance(data, list):
            # Array of records — ingest each as a separate doc.
            ids = []
            for item in data:
                if isinstance(item, str):
                    rid = ingest_text(item, source=source, db_path=db_path, chunk_size=chunk_size, chunk_overlap=chunk_overlap)
                elif isinstance(item, dict):
                    text_content = item.get("body") or item.get("text") or item.get("content") or json.dumps(item, indent=2)
                    item_title = item.get("title", title)
                    rid = ingest_text(text_content, source=source, title=item_title, metadata=item, db_path=db_path, chunk_size=chunk_size, chunk_overlap=chunk_overlap)
                else:
                    # Scalars / nested arrays (numbers, bools, lists) used to
                    # crash on .get(); serialize them as JSON text instead.
                    rid = ingest_text(json.dumps(item, indent=2), source=source, title=title, db_path=db_path, chunk_size=chunk_size, chunk_overlap=chunk_overlap)
                if rid is not None:
                    ids.append(rid)
            return ids[0] if ids else None
        else:
            # Bare numbers/booleans/null: store the JSON rendering.
            content = json.dumps(data, indent=2)
    else:
        content = p.read_text(encoding="utf-8", errors="replace")
    return ingest_text(content, source=source, title=title, db_path=db_path, chunk_size=chunk_size, chunk_overlap=chunk_overlap)
def ingest_directory(
    dir_path: str,
    extensions: tuple[str, ...] = (".txt", ".md", ".json", ".py", ".js", ".yaml", ".yml"),
    db_path: str = DEFAULT_DB_PATH,
    chunk_size: int = DEFAULT_CHUNK_SIZE,
    chunk_overlap: int = DEFAULT_CHUNK_OVERLAP,
) -> dict:
    """Ingest all matching files from a directory tree.

    Returns {"ingested": N, "skipped": N, "errors": [...]} where "skipped"
    counts duplicates and "errors" holds {"file", "error"} records.
    """
    root = Path(dir_path)
    if not root.is_dir():
        raise NotADirectoryError(f"Not a directory: {dir_path}")
    summary = {"ingested": 0, "skipped": 0, "errors": []}
    for candidate in sorted(root.rglob("*")):
        if not candidate.is_file() or candidate.suffix.lower() not in extensions:
            continue
        # Skip anything under hidden directories or __pycache__.
        rel_parts = candidate.relative_to(root).parts
        if any(part.startswith(".") or part == "__pycache__" for part in rel_parts):
            continue
        try:
            new_id = ingest_file(
                str(candidate), db_path=db_path,
                chunk_size=chunk_size, chunk_overlap=chunk_overlap,
            )
        except Exception as exc:
            summary["errors"].append({"file": str(candidate), "error": str(exc)})
        else:
            summary["ingested" if new_id is not None else "skipped"] += 1
    return summary
def get_stats(db_path: str = DEFAULT_DB_PATH) -> dict:
    """Return archive statistics: document, chunk, and distinct-source counts."""
    conn = get_db(db_path)
    counts = {
        "documents": "SELECT COUNT(*) FROM documents",
        "chunks": "SELECT COUNT(*) FROM chunks",
        "sources": "SELECT COUNT(DISTINCT source) FROM documents",
    }
    stats = {name: conn.execute(sql).fetchone()[0] for name, sql in counts.items()}
    conn.close()
    return stats

View File

@@ -1,291 +0,0 @@
// ═══════════════════════════════════════════════════════════
// MNEMOSYNE — Memory Connection Panel
// ═══════════════════════════════════════════════════════════
//
// Interactive panel for browsing, adding, and removing memory
// connections. Opens as a sub-panel from MemoryInspect when
// a memory crystal is selected.
//
// Usage from app.js:
// MemoryConnections.init({
// onNavigate: fn(memId), // fly to another memory
// onConnectionChange: fn(memId, newConnections) // update hooks
// });
// MemoryConnections.show(memData, allMemories);
// MemoryConnections.hide();
//
// Depends on: SpatialMemory (for updateMemory + highlightMemory)
// ═══════════════════════════════════════════════════════════
const MemoryConnections = (() => {
  let _panel = null;               // root DOM node of the panel
  let _onNavigate = null;          // host callback: fly camera to a memory id
  let _onConnectionChange = null;  // host callback: a memory's connections changed
  let _currentMemId = null;        // id of the memory currently displayed (null when closed)
  let _hoveredConnId = null;       // id of the hovered connection row (drives 3D highlight)

  // ─── INIT ────────────────────────────────────────────────
  // Resolve the panel element and store host callbacks. Only warns if the
  // panel element is missing — show()/hide() then become no-ops.
  function init(opts = {}) {
    _onNavigate = opts.onNavigate || null;
    _onConnectionChange = opts.onConnectionChange || null;
    _panel = document.getElementById('memory-connections-panel');
    if (!_panel) {
      console.warn('[MemoryConnections] Panel element #memory-connections-panel not found in DOM');
    }
  }

  // ─── SHOW ────────────────────────────────────────────────
  // Render the panel for memData: its linked memories plus suggested
  // (nearby, unconnected) ones, then wire all buttons and hover handlers.
  // Re-invoked after every add/remove to refresh the lists.
  function show(memData, allMemories) {
    if (!_panel || !memData) return;
    _currentMemId = memData.id;
    const connections = memData.connections || [];
    const connectedSet = new Set(connections);
    // Build lookup for connected memories
    const memLookup = {};
    (allMemories || []).forEach(m => { memLookup[m.id] = m; });
    // Connected memories list
    let connectedHtml = '';
    if (connections.length > 0) {
      connectedHtml = connections.map(cid => {
        const cm = memLookup[cid];
        // Fall back to the raw id when the connected memory isn't loaded.
        const label = cm ? _truncate(cm.content || cid, 40) : cid;
        const cat = cm ? cm.category : '';
        const strength = cm ? Math.round((cm.strength || 0.7) * 100) : 70;
        return `
          <div class="mc-conn-item" data-memid="${_esc(cid)}">
            <div class="mc-conn-info">
              <span class="mc-conn-label" title="${_esc(cid)}">${_esc(label)}</span>
              <span class="mc-conn-meta">${_esc(cat)} · ${strength}%</span>
            </div>
            <div class="mc-conn-actions">
              <button class="mc-btn mc-btn-nav" data-nav="${_esc(cid)}" title="Navigate to memory">⮞</button>
              <button class="mc-btn mc-btn-remove" data-remove="${_esc(cid)}" title="Remove connection">✕</button>
            </div>
          </div>`;
      }).join('');
    } else {
      connectedHtml = '<div class="mc-empty">No connections yet</div>';
    }
    // Find nearby unconnected memories (same region, then other regions)
    const suggestions = _findSuggestions(memData, allMemories, connectedSet);
    let suggestHtml = '';
    if (suggestions.length > 0) {
      suggestHtml = suggestions.map(s => {
        const label = _truncate(s.content || s.id, 36);
        const cat = s.category || '';
        const proximity = s._proximity || '';
        return `
          <div class="mc-suggest-item" data-memid="${_esc(s.id)}">
            <div class="mc-suggest-info">
              <span class="mc-suggest-label" title="${_esc(s.id)}">${_esc(label)}</span>
              <span class="mc-suggest-meta">${_esc(cat)} · ${_esc(proximity)}</span>
            </div>
            <button class="mc-btn mc-btn-add" data-add="${_esc(s.id)}" title="Add connection">+</button>
          </div>`;
      }).join('');
    } else {
      suggestHtml = '<div class="mc-empty">No nearby memories to connect</div>';
    }
    _panel.innerHTML = `
      <div class="mc-header">
        <span class="mc-title">⬡ Connections</span>
        <button class="mc-close" id="mc-close-btn" aria-label="Close connections panel">✕</button>
      </div>
      <div class="mc-section">
        <div class="mc-section-label">LINKED (${connections.length})</div>
        <div class="mc-conn-list" id="mc-conn-list">${connectedHtml}</div>
      </div>
      <div class="mc-section">
        <div class="mc-section-label">SUGGESTED</div>
        <div class="mc-suggest-list" id="mc-suggest-list">${suggestHtml}</div>
      </div>
    `;
    // Wire close button
    _panel.querySelector('#mc-close-btn')?.addEventListener('click', hide);
    // Wire navigation buttons
    _panel.querySelectorAll('[data-nav]').forEach(btn => {
      btn.addEventListener('click', () => {
        if (_onNavigate) _onNavigate(btn.dataset.nav);
      });
    });
    // Wire remove buttons
    _panel.querySelectorAll('[data-remove]').forEach(btn => {
      btn.addEventListener('click', () => _removeConnection(btn.dataset.remove));
    });
    // Wire add buttons
    _panel.querySelectorAll('[data-add]').forEach(btn => {
      btn.addEventListener('click', () => _addConnection(btn.dataset.add));
    });
    // Wire hover highlight for connection items
    _panel.querySelectorAll('.mc-conn-item').forEach(item => {
      item.addEventListener('mouseenter', () => _highlightConnection(item.dataset.memid));
      item.addEventListener('mouseleave', _clearConnectionHighlight);
    });
    _panel.style.display = 'flex';
    requestAnimationFrame(() => _panel.classList.add('mc-visible'));
  }

  // ─── HIDE ────────────────────────────────────────────────
  // Fade the panel out via the mc-visible class; the setTimeout is a
  // fallback in case the CSS transitionend event never fires.
  function hide() {
    if (!_panel) return;
    _clearConnectionHighlight();
    _panel.classList.remove('mc-visible');
    const onEnd = () => {
      _panel.style.display = 'none';
      _panel.removeEventListener('transitionend', onEnd);
    };
    _panel.addEventListener('transitionend', onEnd);
    setTimeout(() => { if (_panel) _panel.style.display = 'none'; }, 350);
    _currentMemId = null;
  }

  // ─── SUGGESTION ENGINE ──────────────────────────────────
  // Rank unconnected memories (same category first, then Euclidean
  // distance from memData.position) and return at most 8, each annotated
  // with _dist and a human-readable _proximity tag.
  function _findSuggestions(memData, allMemories, connectedSet) {
    if (!allMemories) return [];
    const suggestions = [];
    const pos = memData.position || [0, 0, 0];
    const sameRegion = memData.category || 'working';
    for (const m of allMemories) {
      if (m.id === memData.id) continue;
      if (connectedSet.has(m.id)) continue;
      const mpos = m.position || [0, 0, 0];
      const dist = Math.sqrt(
        (pos[0] - mpos[0]) ** 2 +
        (pos[1] - mpos[1]) ** 2 +
        (pos[2] - mpos[2]) ** 2
      );
      // Categorize proximity
      let proximity = 'nearby';
      if (m.category === sameRegion) {
        proximity = dist < 5 ? 'same region · close' : 'same region';
      } else {
        proximity = dist < 10 ? 'adjacent' : 'distant';
      }
      suggestions.push({ ...m, _dist: dist, _proximity: proximity });
    }
    // Sort: same region first, then by distance
    suggestions.sort((a, b) => {
      const aSame = a.category === sameRegion ? 0 : 1;
      const bSame = b.category === sameRegion ? 0 : 1;
      if (aSame !== bSame) return aSame - bSame;
      return a._dist - b._dist;
    });
    return suggestions.slice(0, 8); // Cap at 8 suggestions
  }

  // ─── CONNECTION ACTIONS ─────────────────────────────────
  // Add a bidirectional link between the current memory and targetId,
  // persist it via SpatialMemory.updateMemory, notify the host, and
  // re-render the panel with the updated lists.
  function _addConnection(targetId) {
    if (!_currentMemId) return;
    // Get current memory data via SpatialMemory
    const allMems = typeof SpatialMemory !== 'undefined' ? SpatialMemory.getAllMemories() : [];
    const current = allMems.find(m => m.id === _currentMemId);
    if (!current) return;
    const conns = [...(current.connections || [])];
    if (conns.includes(targetId)) return;
    conns.push(targetId);
    // Update SpatialMemory
    if (typeof SpatialMemory !== 'undefined') {
      SpatialMemory.updateMemory(_currentMemId, { connections: conns });
    }
    // Also create reverse connection on target
    const target = allMems.find(m => m.id === targetId);
    if (target) {
      const targetConns = [...(target.connections || [])];
      if (!targetConns.includes(_currentMemId)) {
        targetConns.push(_currentMemId);
        SpatialMemory.updateMemory(targetId, { connections: targetConns });
      }
    }
    if (_onConnectionChange) _onConnectionChange(_currentMemId, conns);
    // Re-render panel
    const updatedMem = { ...current, connections: conns };
    show(updatedMem, allMems);
  }

  // Remove the link in both directions, notify the host, and re-render.
  function _removeConnection(targetId) {
    if (!_currentMemId) return;
    const allMems = typeof SpatialMemory !== 'undefined' ? SpatialMemory.getAllMemories() : [];
    const current = allMems.find(m => m.id === _currentMemId);
    if (!current) return;
    const conns = (current.connections || []).filter(c => c !== targetId);
    if (typeof SpatialMemory !== 'undefined') {
      SpatialMemory.updateMemory(_currentMemId, { connections: conns });
    }
    // Also remove reverse connection
    const target = allMems.find(m => m.id === targetId);
    if (target) {
      const targetConns = (target.connections || []).filter(c => c !== _currentMemId);
      SpatialMemory.updateMemory(targetId, { connections: targetConns });
    }
    if (_onConnectionChange) _onConnectionChange(_currentMemId, conns);
    const updatedMem = { ...current, connections: conns };
    show(updatedMem, allMems);
  }

  // ─── 3D HIGHLIGHT ───────────────────────────────────────
  // Ask SpatialMemory to highlight the hovered memory's crystal in 3D.
  function _highlightConnection(memId) {
    _hoveredConnId = memId;
    if (typeof SpatialMemory !== 'undefined') {
      SpatialMemory.highlightMemory(memId);
    }
  }

  // Clear any active 3D highlight (no-op when nothing is hovered).
  function _clearConnectionHighlight() {
    if (_hoveredConnId && typeof SpatialMemory !== 'undefined') {
      SpatialMemory.clearHighlight();
    }
    _hoveredConnId = null;
  }

  // ─── HELPERS ────────────────────────────────────────────
  // Minimal HTML-attribute/text escaping for values interpolated into
  // the template strings above.
  function _esc(str) {
    return String(str)
      .replace(/&/g, '&amp;')
      .replace(/</g, '&lt;')
      .replace(/>/g, '&gt;')
      .replace(/"/g, '&quot;');
  }

  // Truncate to n chars, replacing the last char with an ellipsis.
  function _truncate(str, n) {
    return str.length > n ? str.slice(0, n - 1) + '\u2026' : str;
  }

  // True while the panel is present and not display:none.
  function isOpen() {
    return _panel != null && _panel.style.display !== 'none';
  }

  return { init, show, hide, isOpen };
})();

export { MemoryConnections };

View File

@@ -1,256 +0,0 @@
// ═══════════════════════════════════════════════════════════
// MNEMOSYNE — Memory Pulse
// ═══════════════════════════════════════════════════════════
//
// Visual pulse wave that radiates through the connection graph
// when a memory crystal is clicked. Illuminates linked memories
// by BFS hop distance — closer neighbors light up first.
//
// Usage from app.js:
// import { MemoryPulse } from './nexus/components/memory-pulse.js';
// MemoryPulse.init(scene);
// MemoryPulse.trigger(clickedMemId, SpatialMemory);
//
// Depends on: SpatialMemory (getAllMemories, getMemoryFromMesh)
// ═══════════════════════════════════════════════════════════
const MemoryPulse = (() => {
  let _scene = null;        // THREE.Scene set in init(); trigger() is a no-op without it
  let _activePulses = [];   // track running animations for cleanup
  const HOP_DELAY = 300;    // ms between each BFS hop wave
  const GLOW_DURATION = 800;   // ms each crystal glows at peak
  const FADE_DURATION = 600;   // ms to fade back to normal
  const PULSE_COLOR = 0x4af0c0;   // cyan-green pulse glow
  const PULSE_INTENSITY = 6.0;    // peak emissive during pulse
  const RING_DURATION = 1200;     // ms for the expanding ring effect

  // ─── INIT ────────────────────────────────────────────────
  // Store the scene that rings are added to / removed from.
  function init(scene) {
    _scene = scene;
  }

  // ─── BFS TRAVERSAL ───────────────────────────────────────
  // Returns array of arrays: [[hop-0 ids], [hop-1 ids], [hop-2 ids], ...]
  // Hop 0 is [startId]; unknown startId yields []. Each id appears at most
  // once, at its minimum hop distance.
  function bfsHops(startId, allMemories) {
    const memMap = {};
    for (const m of allMemories) {
      memMap[m.id] = m;
    }
    if (!memMap[startId]) return [];
    const visited = new Set([startId]);
    const hops = [];
    let frontier = [startId];
    while (frontier.length > 0) {
      hops.push([...frontier]);
      const next = [];
      for (const id of frontier) {
        const mem = memMap[id];
        if (!mem || !mem.connections) continue;
        for (const connId of mem.connections) {
          if (!visited.has(connId)) {
            visited.add(connId);
            next.push(connId);
          }
        }
      }
      frontier = next;
    }
    return hops;
  }

  // ─── EXPANDING RING ──────────────────────────────────────
  // Creates a flat ring geometry that expands outward from a position.
  // Caller is responsible for animating and eventually disposing it
  // (animateRing does both).
  function createExpandingRing(position, color) {
    const ringGeo = new THREE.RingGeometry(0.1, 0.2, 32);
    const ringMat = new THREE.MeshBasicMaterial({
      color: color,
      transparent: true,
      opacity: 0.8,
      side: THREE.DoubleSide,
      depthWrite: false
    });
    const ring = new THREE.Mesh(ringGeo, ringMat);
    ring.position.copy(position);
    ring.position.y += 0.1; // slightly above crystal
    ring.rotation.x = -Math.PI / 2; // flat horizontal
    ring.scale.set(0.1, 0.1, 0.1);
    _scene.add(ring);
    return ring;
  }

  // ─── ANIMATE RING ────────────────────────────────────────
  // Scale the ring up while fading it out over RING_DURATION, then remove
  // it from the scene and dispose its geometry/material.
  function animateRing(ring, onComplete) {
    const startTime = performance.now();
    function tick() {
      const elapsed = performance.now() - startTime;
      const t = Math.min(1, elapsed / RING_DURATION);
      // Expand outward
      const scale = 0.1 + t * 4.0;
      ring.scale.set(scale, scale, scale);
      // Fade out (quadratic ease so it lingers early, vanishes fast)
      ring.material.opacity = 0.8 * (1 - t * t);
      if (t < 1) {
        requestAnimationFrame(tick);
      } else {
        _scene.remove(ring);
        ring.geometry.dispose();
        ring.material.dispose();
        if (onComplete) onComplete();
      }
    }
    requestAnimationFrame(tick);
  }

  // ─── PULSE CRYSTAL GLOW ──────────────────────────────────
  // Temporarily boosts a crystal's emissive intensity (and any child
  // point light), delayed by hopIndex * HOP_DELAY so waves ripple
  // outward, then eases everything back to the pre-pulse values.
  function pulseGlow(mesh, hopIndex) {
    if (!mesh || !mesh.material) return;
    const originalIntensity = mesh.material.emissiveIntensity;
    const originalColor = mesh.material.emissive ? mesh.material.emissive.clone() : null;
    const delay = hopIndex * HOP_DELAY;
    setTimeout(() => {
      if (!mesh.material) return;
      // Store original for restore (re-read here in case an earlier pulse
      // already changed it between scheduling and firing)
      const origInt = mesh.material.emissiveIntensity;
      // Flash to pulse color
      if (mesh.material.emissive) {
        mesh.material.emissive.setHex(PULSE_COLOR);
      }
      mesh.material.emissiveIntensity = PULSE_INTENSITY;
      // Also boost point light if present
      let origLightIntensity = null;
      let origLightColor = null;
      if (mesh.children) {
        for (const child of mesh.children) {
          if (child.isPointLight) {
            origLightIntensity = child.intensity;
            origLightColor = child.color.clone();
            child.intensity = 3.0;
            child.color.setHex(PULSE_COLOR);
          }
        }
      }
      // Hold at peak, then fade
      setTimeout(() => {
        const fadeStart = performance.now();
        function fadeTick() {
          const elapsed = performance.now() - fadeStart;
          const t = Math.min(1, elapsed / FADE_DURATION);
          const eased = 1 - (1 - t) * (1 - t); // ease-out quad
          mesh.material.emissiveIntensity = PULSE_INTENSITY + (origInt - PULSE_INTENSITY) * eased;
          if (originalColor) {
            // Lerp each RGB channel from the pulse color back to the
            // crystal's original emissive color.
            const pr = ((PULSE_COLOR >> 16) & 0xff) / 255;
            const pg = ((PULSE_COLOR >> 8) & 0xff) / 255;
            const pb = (PULSE_COLOR & 0xff) / 255;
            mesh.material.emissive.setRGB(
              pr + (originalColor.r - pr) * eased,
              pg + (originalColor.g - pg) * eased,
              pb + (originalColor.b - pb) * eased
            );
          }
          // Restore point light
          if (origLightIntensity !== null && mesh.children) {
            for (const child of mesh.children) {
              if (child.isPointLight) {
                child.intensity = 3.0 + (origLightIntensity - 3.0) * eased;
                if (origLightColor) {
                  const pr = ((PULSE_COLOR >> 16) & 0xff) / 255;
                  const pg = ((PULSE_COLOR >> 8) & 0xff) / 255;
                  const pb = (PULSE_COLOR & 0xff) / 255;
                  child.color.setRGB(
                    pr + (origLightColor.r - pr) * eased,
                    pg + (origLightColor.g - pg) * eased,
                    pb + (origLightColor.b - pb) * eased
                  );
                }
              }
            }
          }
          if (t < 1) {
            requestAnimationFrame(fadeTick);
          }
        }
        requestAnimationFrame(fadeTick);
      }, GLOW_DURATION);
    }, delay);
  }

  // ─── TRIGGER ─────────────────────────────────────────────
  // Main entry point: fire a pulse wave from the given memory ID.
  // Computes BFS hop levels through the connection graph and schedules a
  // glow + expanding ring for every reachable crystal, staggered by hop.
  function trigger(memId, spatialMemory) {
    if (!_scene) return;
    const allMemories = spatialMemory.getAllMemories();
    const hops = bfsHops(memId, allMemories);
    if (hops.length <= 1) {
      // No connections — just do a local ring
      const obj = spatialMemory.getMemoryFromMesh(
        spatialMemory.getCrystalMeshes().find(m => m.userData.memId === memId)
      );
      if (obj && obj.mesh) {
        const ring = createExpandingRing(obj.mesh.position, PULSE_COLOR);
        animateRing(ring);
      }
      return;
    }
    // For each hop level, create expanding rings and pulse glows
    for (let hopIdx = 0; hopIdx < hops.length; hopIdx++) {
      const idsInHop = hops[hopIdx];
      for (const id of idsInHop) {
        // Find mesh for this memory
        const meshes = spatialMemory.getCrystalMeshes();
        let targetMesh = null;
        for (const m of meshes) {
          if (m.userData && m.userData.memId === id) {
            targetMesh = m;
            break;
          }
        }
        if (!targetMesh) continue;
        // Schedule pulse glow
        pulseGlow(targetMesh, hopIdx);
        // Create expanding ring at this hop's delay (IIFE captures the
        // mesh/delay pair for the deferred callback)
        ((mesh, delay) => {
          setTimeout(() => {
            const ring = createExpandingRing(mesh.position, PULSE_COLOR);
            animateRing(ring);
          }, delay * HOP_DELAY);
        })(targetMesh, hopIdx);
      }
    }
  }

  // ─── CLEANUP ─────────────────────────────────────────────
  function dispose() {
    // Active pulses will self-clean via their animation callbacks
    _activePulses = [];
  }

  return { init, trigger, dispose, bfsHops };
})();

export { MemoryPulse };

Binary file not shown.

Binary file not shown.

Binary file not shown.

View File

@@ -1,193 +0,0 @@
# ═══════════════════════════════════════════════════════════════
# FEATURES.yaml — Mnemosyne Module Manifest
# ═══════════════════════════════════════════════════════════════
#
# Single source of truth for what exists, what's planned, and
# who owns what. Agents and humans MUST check this before
# creating new PRs for Mnemosyne features.
#
# Statuses: shipped | in-progress | planned | deprecated
# Canon path: nexus/mnemosyne/
#
# Parent epic: #1248 (IaC Workflow)
# Created: 2026-04-12
# ═══════════════════════════════════════════════════════════════
project: mnemosyne
canon_path: nexus/mnemosyne/
description: The Living Holographic Archive — memory persistence, search, and graph analysis
# ─── Backend Modules ───────────────────────────────────────
modules:
archive:
status: shipped
files: [archive.py]
description: Core MnemosyneArchive class — CRUD, search, graph analysis
features:
- add / get / remove entries
- keyword search (substring match)
- semantic search (Jaccard + link-boost via HolographicLinker)
- linked entry traversal (BFS by depth)
- topic filtering and counts
- export (JSON/Markdown)
- graph data export (nodes + edges for 3D viz)
- graph clusters (connected components)
- hub entries (highest degree centrality)
- bridge entries (articulation points via DFS)
- tag management (add_tags, remove_tags, retag)
- entry update with content dedup (content_hash)
- find_duplicate (content hash matching)
- temporal queries (by_date_range, temporal_neighbors)
- rebuild_links (re-run linker across all entries)
merged_prs:
- "#1217" # Phase 1 foundation
- "#1225" # Semantic search
- "#1220" # Export, deletion, richer stats
- "#1234" # Graph clusters, hubs, bridges
- "#1238" # Tag management
- "#1241" # Entry update + content dedup
- "#1246" # Temporal queries
entry:
status: shipped
files: [entry.py]
description: ArchiveEntry dataclass — id, title, content, topics, links, timestamps, content_hash
ingest:
status: shipped
files: [ingest.py]
description: Document ingestion pipeline — chunking, dedup, auto-linking
linker:
status: shipped
files: [linker.py]
description: HolographicLinker — Jaccard token similarity, auto-link discovery
cli:
status: shipped
files: [cli.py]
description: CLI interface — stats, search, ingest, link, topics, remove, export, clusters, hubs, bridges, rebuild, tag/untag/retag, timeline, neighbors, consolidate
tests:
status: shipped
files:
- tests/__init__.py
- tests/test_archive.py
- tests/test_graph_clusters.py
description: Test suite covering archive CRUD, search, graph analysis, clusters
# ─── Frontend Components ───────────────────────────────────
# Located in nexus/components/ (shared with other Nexus features)
frontend:
spatial_memory:
status: shipped
files: [nexus/components/spatial-memory.js]
description: 3D memory crystal rendering and spatial layout
memory_search:
status: shipped
files: [nexus/components/spatial-memory.js]
description: searchByContent() — text search through holographic archive
merged_prs:
- "#1201" # Spatial search
memory_filter:
status: shipped
files: [] # inline in index.html
description: Toggle memory categories by region
merged_prs:
- "#1213"
memory_inspector:
status: shipped
files: [nexus/components/memory-inspect.js]
description: Click-to-inspect detail panel for memory crystals
merged_prs:
- "#1229"
memory_connections:
status: shipped
files: [nexus/components/memory-connections.js]
description: Browse, add, remove memory relationships panel
merged_prs:
- "#1247"
memory_birth:
status: shipped
files: [nexus/components/memory-birth.js]
description: Birth animation when new memories are created
merged_prs:
- "#1222"
memory_particles:
status: shipped
files: [nexus/components/memory-particles.js]
description: Ambient particle system — memory activity visualization
merged_prs:
- "#1205"
memory_optimizer:
status: shipped
files: [nexus/components/memory-optimizer.js]
description: Performance optimization for large memory sets
timeline_scrubber:
status: shipped
files: [nexus/components/timeline-scrubber.js]
description: Temporal navigation scrubber for memory timeline
health_dashboard:
status: shipped
files: [] # overlay in index.html
description: Archive statistics overlay panel
merged_prs:
- "#1211"
# ─── Planned / In-Flight ──────────────────────────────────
# (Several entries below are already marked `status: shipped` and are
# awaiting promotion into the modules section above.)
planned:
memory_decay:
status: shipped
files: [entry.py, archive.py]
description: >
Memories have living energy that fades with neglect and
brightens with access. Vitality score based on access
frequency and recency. Exponential decay with 30-day half-life.
Touch boost with diminishing returns.
priority: medium
merged_prs:
- "#TBD" # Will be filled when PR is created
memory_pulse:
status: planned
description: >
Visual pulse wave radiates through connection graph when
a crystal is clicked, illuminating linked memories by BFS
hop distance. Was attempted in PR #1226 — needs rebasing.
priority: medium
embedding_backend:
status: shipped
files: [embeddings.py]
description: >
Pluggable embedding backend for true semantic search.
Supports Ollama (local models) and TF-IDF fallback.
Auto-detects best available backend.
priority: high
merged_prs:
- "#TBD" # Will be filled when PR is created
memory_consolidation:
status: shipped
files: [archive.py, cli.py, tests/test_consolidation.py]
description: >
Automatic merging of duplicate/near-duplicate memories
using content_hash and semantic similarity. Periodic
consolidation pass.
priority: low
merged_prs:
- "#1260"

View File

@@ -14,12 +14,6 @@ from nexus.mnemosyne.archive import MnemosyneArchive
from nexus.mnemosyne.entry import ArchiveEntry
from nexus.mnemosyne.linker import HolographicLinker
from nexus.mnemosyne.ingest import ingest_from_mempalace, ingest_event
from nexus.mnemosyne.embeddings import (
EmbeddingBackend,
OllamaEmbeddingBackend,
TfidfEmbeddingBackend,
get_embedding_backend,
)
__all__ = [
"MnemosyneArchive",
@@ -27,8 +21,4 @@ __all__ = [
"HolographicLinker",
"ingest_from_mempalace",
"ingest_event",
"EmbeddingBackend",
"OllamaEmbeddingBackend",
"TfidfEmbeddingBackend",
"get_embedding_backend",
]

View File

@@ -7,13 +7,12 @@ and provides query interfaces for retrieving connected knowledge.
from __future__ import annotations
import json
from datetime import datetime, timedelta, timezone
from datetime import datetime, timezone
from pathlib import Path
from typing import Optional
from nexus.mnemosyne.entry import ArchiveEntry, _compute_content_hash
from nexus.mnemosyne.linker import HolographicLinker
from nexus.mnemosyne.embeddings import get_embedding_backend, EmbeddingBackend
_EXPORT_VERSION = "1"
@@ -25,21 +24,10 @@ class MnemosyneArchive:
MemPalace (ChromaDB) for vector-semantic search.
"""
def __init__(
self,
archive_path: Optional[Path] = None,
embedding_backend: Optional[EmbeddingBackend] = None,
auto_embed: bool = True,
):
def __init__(self, archive_path: Optional[Path] = None):
self.path = archive_path or Path.home() / ".hermes" / "mnemosyne" / "archive.json"
self.path.parent.mkdir(parents=True, exist_ok=True)
self._embedding_backend = embedding_backend
if embedding_backend is None and auto_embed:
try:
self._embedding_backend = get_embedding_backend()
except Exception:
self._embedding_backend = None
self.linker = HolographicLinker(embedding_backend=self._embedding_backend)
self.linker = HolographicLinker()
self._entries: dict[str, ArchiveEntry] = {}
self._load()
@@ -155,51 +143,33 @@ class MnemosyneArchive:
return [e for _, e in scored[:limit]]
def semantic_search(self, query: str, limit: int = 10, threshold: float = 0.05) -> list[ArchiveEntry]:
"""Semantic search using embeddings or holographic linker similarity.
"""Semantic search using holographic linker similarity.
With an embedding backend: cosine similarity between query vector and
entry vectors, boosted by inbound link count.
Without: Jaccard similarity on tokens with link boost.
Falls back to keyword search if nothing meets the threshold.
Scores each entry by Jaccard similarity between query tokens and entry
tokens, then boosts entries with more inbound links (more "holographic").
Falls back to keyword search if no entries meet the similarity threshold.
Args:
query: Natural language query string.
limit: Maximum number of results to return.
threshold: Minimum similarity score to include in results.
threshold: Minimum Jaccard similarity to be considered a semantic match.
Returns:
List of ArchiveEntry sorted by combined relevance score, descending.
"""
# Count inbound links for link-boost
query_tokens = HolographicLinker._tokenize(query)
if not query_tokens:
return []
# Count inbound links for each entry (how many entries link TO this one)
inbound: dict[str, int] = {eid: 0 for eid in self._entries}
for entry in self._entries.values():
for linked_id in entry.links:
if linked_id in inbound:
inbound[linked_id] += 1
max_inbound = max(inbound.values(), default=1) or 1
# Try embedding-based search first
if self._embedding_backend:
query_vec = self._embedding_backend.embed(query)
if query_vec:
scored = []
for entry in self._entries.values():
text = f"{entry.title} {entry.content} {' '.join(entry.topics)}"
entry_vec = self._embedding_backend.embed(text)
if not entry_vec:
continue
sim = self._embedding_backend.similarity(query_vec, entry_vec)
if sim >= threshold:
link_boost = inbound[entry.id] / max_inbound * 0.15
scored.append((sim + link_boost, entry))
if scored:
scored.sort(key=lambda x: x[0], reverse=True)
return [e for _, e in scored[:limit]]
# Fallback: Jaccard token similarity
query_tokens = HolographicLinker._tokenize(query)
if not query_tokens:
return []
scored = []
for entry in self._entries.values():
entry_tokens = HolographicLinker._tokenize(f"{entry.title} {entry.content} {' '.join(entry.topics)}")
@@ -209,13 +179,14 @@ class MnemosyneArchive:
union = query_tokens | entry_tokens
jaccard = len(intersection) / len(union)
if jaccard >= threshold:
link_boost = inbound[entry.id] / max_inbound * 0.2
link_boost = inbound[entry.id] / max_inbound * 0.2 # up to 20% boost
scored.append((jaccard + link_boost, entry))
if scored:
scored.sort(key=lambda x: x[0], reverse=True)
return [e for _, e in scored[:limit]]
# Final fallback: keyword search
# Graceful fallback to keyword search
return self.search(query, limit=limit)
def get_linked(self, entry_id: str, depth: int = 1) -> list[ArchiveEntry]:
@@ -389,17 +360,6 @@ class MnemosyneArchive:
oldest_entry = timestamps[0] if timestamps else None
newest_entry = timestamps[-1] if timestamps else None
# Vitality summary
if n > 0:
vitalities = [self._compute_vitality(e) for e in entries]
avg_vitality = round(sum(vitalities) / n, 4)
fading_count = sum(1 for v in vitalities if v < 0.3)
vibrant_count = sum(1 for v in vitalities if v > 0.7)
else:
avg_vitality = 0.0
fading_count = 0
vibrant_count = 0
return {
"entries": n,
"total_links": total_links,
@@ -409,9 +369,6 @@ class MnemosyneArchive:
"link_density": link_density,
"oldest_entry": oldest_entry,
"newest_entry": newest_entry,
"avg_vitality": avg_vitality,
"fading_count": fading_count,
"vibrant_count": vibrant_count,
}
def _build_adjacency(self) -> dict[str, set[str]]:
@@ -694,371 +651,6 @@ class MnemosyneArchive:
self._save()
return entry
@staticmethod
def _parse_dt(dt_str: str) -> datetime:
"""Parse an ISO datetime string. Assumes UTC if no timezone is specified."""
dt = datetime.fromisoformat(dt_str)
if dt.tzinfo is None:
dt = dt.replace(tzinfo=timezone.utc)
return dt
def by_date_range(self, start: str, end: str) -> list[ArchiveEntry]:
    """Return entries whose ``created_at`` lies inside [start, end], inclusive.

    Args:
        start: ISO datetime string marking the range start (e.g. "2024-01-01"
            or "2024-01-01T00:00:00Z"); timezone-naive strings are read as UTC.
        end: ISO datetime string marking the range end; timezone-naive
            strings are read as UTC.

    Returns:
        Matching ArchiveEntry objects sorted by ``created_at`` ascending.
    """
    lo = self._parse_dt(start)
    hi = self._parse_dt(end)
    matches = [
        entry for entry in self._entries.values()
        if lo <= self._parse_dt(entry.created_at) <= hi
    ]
    return sorted(matches, key=lambda entry: entry.created_at)
def temporal_neighbors(self, entry_id: str, window_days: int = 7) -> list[ArchiveEntry]:
    """Return entries created within ``window_days`` of a given entry.

    The anchor entry itself is never included in the results.

    Args:
        entry_id: ID of the anchor entry.
        window_days: Size of the window, in days, on each side of the
            anchor's ``created_at``.

    Returns:
        Matching ArchiveEntry objects sorted by ``created_at`` ascending.

    Raises:
        KeyError: If ``entry_id`` does not exist in the archive.
    """
    # Direct indexing raises KeyError(entry_id) for unknown IDs.
    anchor = self._entries[entry_id]
    center = self._parse_dt(anchor.created_at)
    span = timedelta(days=window_days)
    lo, hi = center - span, center + span
    neighbors = [
        entry for entry in self._entries.values()
        if entry.id != entry_id and lo <= self._parse_dt(entry.created_at) <= hi
    ]
    neighbors.sort(key=lambda entry: entry.created_at)
    return neighbors
# ─── Memory Decay ─────────────────────────────────────────
# Decay parameters shared by touch(), _compute_vitality() and apply_decay().
_DECAY_HALF_LIFE_DAYS: float = 30.0  # Days for an untouched entry's vitality to halve
_TOUCH_BOOST_FACTOR: float = 0.1  # Fraction of remaining headroom added per access (diminishes as vitality → 1.0)
def touch(self, entry_id: str) -> ArchiveEntry:
    """Record an access to an entry, boosting its vitality.

    The applied boost is ``_TOUCH_BOOST_FACTOR * (1 - current_vitality)``:
    the closer an entry already is to full vitality, the smaller the gain,
    so touching alone can never push vitality past 1.0.

    Args:
        entry_id: ID of the entry being accessed.

    Returns:
        The updated ArchiveEntry.

    Raises:
        KeyError: If ``entry_id`` does not exist.
    """
    entry = self._entries.get(entry_id)
    if entry is None:
        raise KeyError(entry_id)
    # Decay first, then boost: the boost applies to the *current*
    # time-decayed vitality, not the stored value.
    decayed = self._compute_vitality(entry)
    boosted = decayed + self._TOUCH_BOOST_FACTOR * (1.0 - decayed)
    entry.vitality = min(1.0, boosted)
    entry.last_accessed = datetime.now(timezone.utc).isoformat()
    self._save()
    return entry
def _compute_vitality(self, entry: ArchiveEntry) -> float:
    """Return the entry's current vitality after exponential time decay.

    Decay model: ``v = stored * 0.5 ** (hours_elapsed / half_life_hours)``,
    measured from ``last_accessed`` or — for never-accessed entries — from
    ``created_at``, so new entries start at full vitality.

    Args:
        entry: The archive entry to evaluate.

    Returns:
        Current vitality clamped to [0.0, 1.0].
    """
    # Never-accessed entries decay from their creation time instead.
    reference = entry.created_at if entry.last_accessed is None else entry.last_accessed
    elapsed_h = (datetime.now(timezone.utc) - self._parse_dt(reference)).total_seconds() / 3600
    half_life_h = self._DECAY_HALF_LIFE_DAYS * 24
    if elapsed_h <= 0 or half_life_h <= 0:
        # Clock skew (or a degenerate half-life): report the stored value as-is.
        return entry.vitality
    decayed = entry.vitality * 0.5 ** (elapsed_h / half_life_h)
    return max(0.0, min(1.0, decayed))
def get_vitality(self, entry_id: str) -> dict:
    """Report the current vitality status of a single entry.

    Args:
        entry_id: ID of the entry.

    Returns:
        Dict with keys: entry_id, title, vitality, last_accessed, age_days.

    Raises:
        KeyError: If ``entry_id`` does not exist.
    """
    entry = self._entries.get(entry_id)
    if entry is None:
        raise KeyError(entry_id)
    born = self._parse_dt(entry.created_at)
    return {
        "entry_id": entry.id,
        "title": entry.title,
        "vitality": round(self._compute_vitality(entry), 4),
        "last_accessed": entry.last_accessed,
        "age_days": (datetime.now(timezone.utc) - born).days,
    }
def fading(self, limit: int = 10) -> list[dict]:
    """Return the entries with the lowest vitality (most neglected first).

    Args:
        limit: Maximum number of entries to return.

    Returns:
        Dicts sorted by vitality ascending, each with keys:
        entry_id, title, vitality, last_accessed, age_days.
    """
    now = datetime.now(timezone.utc)
    report = [
        {
            "entry_id": entry.id,
            "title": entry.title,
            "vitality": round(self._compute_vitality(entry), 4),
            "last_accessed": entry.last_accessed,
            "age_days": (now - self._parse_dt(entry.created_at)).days,
        }
        for entry in self._entries.values()
    ]
    report.sort(key=lambda row: row["vitality"])
    return report[:limit]
def vibrant(self, limit: int = 10) -> list[dict]:
    """Return the entries with the highest vitality (most alive first).

    Args:
        limit: Maximum number of entries to return.

    Returns:
        Dicts sorted by vitality descending, each with keys:
        entry_id, title, vitality, last_accessed, age_days.
    """
    now = datetime.now(timezone.utc)
    report = [
        {
            "entry_id": entry.id,
            "title": entry.title,
            "vitality": round(self._compute_vitality(entry), 4),
            "last_accessed": entry.last_accessed,
            "age_days": (now - self._parse_dt(entry.created_at)).days,
        }
        for entry in self._entries.values()
    ]
    report.sort(key=lambda row: row["vitality"], reverse=True)
    return report[:limit]
def apply_decay(self) -> dict:
    """Recompute every entry's vitality from elapsed time and persist.

    Each entry's stored vitality is replaced by its current time-decayed
    value (see ``_compute_vitality``), then the archive is saved.

    Returns:
        Dict with keys: total_entries, decayed_count, avg_vitality,
        fading_count (vitality < 0.3), vibrant_count (vitality > 0.7).
    """
    changed = 0
    vitality_sum = 0.0
    fading_count = 0
    vibrant_count = 0
    for entry in self._entries.values():
        fresh = self._compute_vitality(entry)
        # Only count an entry as "decayed" when the value actually moved.
        if abs(fresh - entry.vitality) > 1e-6:
            entry.vitality = fresh
            changed += 1
        vitality_sum += entry.vitality
        if entry.vitality < 0.3:
            fading_count += 1
        elif entry.vitality > 0.7:
            vibrant_count += 1
    total = len(self._entries)
    self._save()
    return {
        "total_entries": total,
        "decayed_count": changed,
        "avg_vitality": round(vitality_sum / total, 4) if total else 0.0,
        "fading_count": fading_count,
        "vibrant_count": vibrant_count,
    }
def consolidate(
    self,
    threshold: float = 0.9,
    dry_run: bool = False,
) -> list[dict]:
    """Scan the archive and merge duplicate/near-duplicate entries.

    Two entries are considered duplicates if:
    - They share the same ``content_hash`` (exact duplicate), or
    - Their similarity score (via HolographicLinker) exceeds ``threshold``
      (near-duplicate when an embedding backend is available or Jaccard is
      high enough at the given threshold).

    Merge strategy:
    - Keep the *older* entry (earlier ``created_at``).
    - Union topics from both entries (case-deduped).
    - Merge metadata from newer into older (older values win on conflicts).
    - Transfer all links from the newer entry to the older entry.
    - Delete the newer entry.

    Args:
        threshold: Similarity threshold for near-duplicate detection (0.0-1.0).
            Default 0.9 is intentionally conservative.
        dry_run: If True, return the list of would-be merges without mutating
            the archive.

    Returns:
        List of dicts, one per merged pair::

            {
                "kept": <entry_id of survivor>,
                "removed": <entry_id of duplicate>,
                "reason": "exact_hash" | "semantic_similarity",
                "score": float,  # 1.0 for exact hash matches
                "dry_run": bool,
            }
    """
    merges: list[dict] = []
    entries = list(self._entries.values())
    removed_ids: set[str] = set()  # entries already deleted in this pass
    # O(n^2) pairwise scan; entry_b always comes after entry_a so each
    # unordered pair is examined exactly once.
    for i, entry_a in enumerate(entries):
        if entry_a.id in removed_ids:
            continue
        for entry_b in entries[i + 1:]:
            if entry_b.id in removed_ids:
                continue
            # Determine if they are duplicates
            reason: Optional[str] = None
            score: float = 0.0
            if (
                entry_a.content_hash is not None
                and entry_b.content_hash is not None
                and entry_a.content_hash == entry_b.content_hash
            ):
                reason = "exact_hash"
                score = 1.0
            else:
                sim = self.linker.compute_similarity(entry_a, entry_b)
                if sim >= threshold:
                    reason = "semantic_similarity"
                    score = sim
            if reason is None:
                continue
            # Decide which entry to keep (older survives)
            if entry_a.created_at <= entry_b.created_at:
                kept, removed = entry_a, entry_b
            else:
                kept, removed = entry_b, entry_a
            merges.append({
                "kept": kept.id,
                "removed": removed.id,
                "reason": reason,
                "score": round(score, 4),
                "dry_run": dry_run,
            })
            if not dry_run:
                # Merge topics (case-deduped)
                existing_lower = {t.lower() for t in kept.topics}
                for tag in removed.topics:
                    if tag.lower() not in existing_lower:
                        kept.topics.append(tag)
                        existing_lower.add(tag.lower())
                # Merge metadata (kept wins on key conflicts)
                for k, v in removed.metadata.items():
                    if k not in kept.metadata:
                        kept.metadata[k] = v
                # Transfer links: add removed's links to kept
                kept_links_set = set(kept.links)
                for lid in removed.links:
                    if lid != kept.id and lid not in kept_links_set and lid not in removed_ids:
                        kept.links.append(lid)
                        kept_links_set.add(lid)
                        # Update the other entry's back-link
                        other = self._entries.get(lid)
                        if other and kept.id not in other.links:
                            other.links.append(kept.id)
                # Remove back-links pointing at the removed entry
                for other in self._entries.values():
                    if removed.id in other.links:
                        other.links.remove(removed.id)
                        if other.id != kept.id and kept.id not in other.links:
                            other.links.append(kept.id)
                del self._entries[removed.id]
                # NOTE(review): removed_ids is only updated when not dry_run,
                # so a dry run may report one entry in several would-be
                # merges — confirm this is intended.
                removed_ids.add(removed.id)
    if not dry_run and merges:
        self._save()
    return merges
def rebuild_links(self, threshold: Optional[float] = None) -> int:
"""Recompute all links from scratch.

View File

@@ -3,8 +3,7 @@
Provides: mnemosyne ingest, mnemosyne search, mnemosyne link, mnemosyne stats,
mnemosyne topics, mnemosyne remove, mnemosyne export,
mnemosyne clusters, mnemosyne hubs, mnemosyne bridges, mnemosyne rebuild,
mnemosyne tag, mnemosyne untag, mnemosyne retag,
mnemosyne timeline, mnemosyne neighbors
mnemosyne tag, mnemosyne untag, mnemosyne retag
"""
from __future__ import annotations
@@ -25,16 +24,7 @@ def cmd_stats(args):
def cmd_search(args):
from nexus.mnemosyne.embeddings import get_embedding_backend
backend = None
if getattr(args, "backend", "auto") != "auto":
backend = get_embedding_backend(prefer=args.backend)
elif getattr(args, "semantic", False):
try:
backend = get_embedding_backend()
except Exception:
pass
archive = MnemosyneArchive(embedding_backend=backend)
archive = MnemosyneArchive()
if getattr(args, "semantic", False):
results = archive.semantic_search(args.query, limit=args.limit)
else:
@@ -190,55 +180,6 @@ def cmd_retag(args):
print(f" Topics: {', '.join(entry.topics) if entry.topics else '(none)'}")
def cmd_timeline(args):
    """CLI: list archive entries created within an ISO date range."""
    archive = MnemosyneArchive()
    try:
        rows = archive.by_date_range(args.start, args.end)
    except ValueError as e:
        print(f"Invalid date format: {e}")
        sys.exit(1)
    if not rows:
        print("No entries found in that date range.")
        return
    for entry in rows:
        topics = ', '.join(entry.topics) if entry.topics else '(none)'
        print(f"[{entry.id[:8]}] {entry.created_at[:10]} {entry.title}")
        print(f" Topics: {topics}")
        print()
def cmd_consolidate(args):
    """CLI: merge duplicate/near-duplicate entries, optionally as a dry run."""
    archive = MnemosyneArchive()
    merges = archive.consolidate(threshold=args.threshold, dry_run=args.dry_run)
    if not merges:
        print("No duplicates found.")
        return
    label = "[DRY RUN] " if args.dry_run else ""
    for m in merges:
        print(f"{label}Merge ({m['reason']}, score={m['score']:.4f}):")
        print(f" kept: {m['kept'][:8]}")
        print(f" removed: {m['removed'][:8]}")
    summary = (
        f"\n{len(merges)} pair(s) would be merged. Re-run without --dry-run to apply."
        if args.dry_run
        else f"\nMerged {len(merges)} duplicate pair(s)."
    )
    print(summary)
def cmd_neighbors(args):
    """CLI: list entries created temporally near a given entry."""
    archive = MnemosyneArchive()
    try:
        rows = archive.temporal_neighbors(args.entry_id, window_days=args.days)
    except KeyError:
        print(f"Entry not found: {args.entry_id}")
        sys.exit(1)
    if not rows:
        print("No temporal neighbors found.")
        return
    for entry in rows:
        topics = ', '.join(entry.topics) if entry.topics else '(none)'
        print(f"[{entry.id[:8]}] {entry.created_at[:10]} {entry.title}")
        print(f" Topics: {topics}")
        print()
def main():
parser = argparse.ArgumentParser(prog="mnemosyne", description="The Living Holographic Archive")
sub = parser.add_subparsers(dest="command")
@@ -292,18 +233,6 @@ def main():
rt.add_argument("entry_id", help="Entry ID")
rt.add_argument("tags", help="Comma-separated new tag list")
tl = sub.add_parser("timeline", help="Show entries within an ISO date range")
tl.add_argument("start", help="Start datetime (ISO format, e.g. 2024-01-01 or 2024-01-01T00:00:00Z)")
tl.add_argument("end", help="End datetime (ISO format)")
nb = sub.add_parser("neighbors", help="Show entries temporally near a given entry")
nb.add_argument("entry_id", help="Anchor entry ID")
nb.add_argument("--days", type=int, default=7, help="Window in days (default: 7)")
co = sub.add_parser("consolidate", help="Merge duplicate/near-duplicate entries")
co.add_argument("--dry-run", action="store_true", help="Show what would be merged without applying")
co.add_argument("--threshold", type=float, default=0.9, help="Similarity threshold (default: 0.9)")
args = parser.parse_args()
if not args.command:
parser.print_help()
@@ -324,9 +253,6 @@ def main():
"tag": cmd_tag,
"untag": cmd_untag,
"retag": cmd_retag,
"timeline": cmd_timeline,
"neighbors": cmd_neighbors,
"consolidate": cmd_consolidate,
}
dispatch[args.command](args)

View File

@@ -1,170 +0,0 @@
"""Pluggable embedding backends for Mnemosyne semantic search.
Provides an abstract EmbeddingBackend interface and concrete implementations:
- OllamaEmbeddingBackend: local models via Ollama (sovereign, no cloud)
- TfidfEmbeddingBackend: pure-Python TF-IDF fallback (no dependencies)
Usage:
from nexus.mnemosyne.embeddings import get_embedding_backend
backend = get_embedding_backend() # auto-detects best available
vec = backend.embed("hello world")
score = backend.similarity(vec_a, vec_b)
"""
from __future__ import annotations
import abc, json, math, os, re, urllib.request
from typing import Optional
class EmbeddingBackend(abc.ABC):
    """Abstract interface for embedding-based similarity."""

    @abc.abstractmethod
    def embed(self, text: str) -> list[float]:
        """Return an embedding vector for the given text."""

    @abc.abstractmethod
    def similarity(self, a: list[float], b: list[float]) -> float:
        """Return cosine similarity between two vectors, in [0, 1]."""

    @property
    def name(self) -> str:
        # Human-readable backend label; subclasses may override.
        return type(self).__name__

    @property
    def dimension(self) -> int:
        # Vector dimensionality; 0 unless a subclass reports one.
        return 0
def cosine_similarity(a: list[float], b: list[float]) -> float:
    """Cosine similarity between two vectors.

    Returns 0.0 when either vector has zero magnitude.

    Raises:
        ValueError: If the vectors have different lengths.
    """
    if len(a) != len(b):
        raise ValueError(f"Vector dimension mismatch: {len(a)} vs {len(b)}")
    dot = 0.0
    sq_a = 0.0
    sq_b = 0.0
    for x, y in zip(a, b):
        dot += x * y
        sq_a += x * x
        sq_b += y * y
    denom = math.sqrt(sq_a) * math.sqrt(sq_b)
    return dot / denom if denom else 0.0
class OllamaEmbeddingBackend(EmbeddingBackend):
    """Embedding backend using a local Ollama instance.

    Default model: nomic-embed-text (768 dims).
    """

    def __init__(self, base_url: str | None = None, model: str | None = None):
        # Configuration falls back to env vars, then hard defaults.
        self.base_url = base_url or os.environ.get("OLLAMA_URL", "http://localhost:11434")
        self.model = model or os.environ.get("MNEMOSYNE_EMBED_MODEL", "nomic-embed-text")
        self._dim: int = 0  # learned from the first successful embed()
        self._available: bool | None = None  # tri-state: None = not probed yet

    def _check_available(self) -> bool:
        """Probe the Ollama server once and cache whether the model exists."""
        if self._available is not None:
            return self._available
        try:
            req = urllib.request.Request(f"{self.base_url}/api/tags", method="GET")
            # Close the HTTP response deterministically (previously leaked).
            with urllib.request.urlopen(req, timeout=3) as resp:
                tags = json.loads(resp.read())
            models = [m["name"].split(":")[0] for m in tags.get("models", [])]
            # Substring match tolerates tags like "nomic-embed-text:latest".
            self._available = any(self.model in m for m in models)
        except Exception:
            # Any network/parse failure just means "not available".
            self._available = False
        return self._available

    @property
    def name(self) -> str:
        return f"Ollama({self.model})"

    @property
    def dimension(self) -> int:
        return self._dim

    def embed(self, text: str) -> list[float]:
        """Embed *text* via the Ollama ``/api/embeddings`` endpoint.

        Returns:
            The embedding vector (possibly empty if the server omits one).

        Raises:
            RuntimeError: If Ollama is unreachable or the model is missing.
        """
        if not self._check_available():
            raise RuntimeError(f"Ollama not available or model {self.model} not found")
        data = json.dumps({"model": self.model, "prompt": text}).encode()
        req = urllib.request.Request(
            f"{self.base_url}/api/embeddings", data=data,
            headers={"Content-Type": "application/json"}, method="POST")
        # Context-manage the response so the socket is always released.
        with urllib.request.urlopen(req, timeout=30) as resp:
            result = json.loads(resp.read())
        vec = result.get("embedding", [])
        if vec:
            self._dim = len(vec)
        return vec

    def similarity(self, a: list[float], b: list[float]) -> float:
        # Map raw cosine from [-1, 1] into [0, 1] per the interface contract.
        raw = cosine_similarity(a, b)
        return (raw + 1.0) / 2.0
class TfidfEmbeddingBackend(EmbeddingBackend):
    """Pure-Python TF-IDF embedding. No dependencies. Always available."""

    def __init__(self):
        self._vocab: dict[str, int] = {}     # token -> vector index
        self._idf: dict[str, float] = {}     # token -> inverse document frequency
        self._doc_count: int = 0             # documents folded in so far
        self._doc_freq: dict[str, int] = {}  # token -> number of docs containing it

    @property
    def name(self) -> str:
        return "TF-IDF (local)"

    @property
    def dimension(self) -> int:
        return len(self._vocab)

    @staticmethod
    def _tokenize(text: str) -> list[str]:
        # Lowercased word tokens, dropping anything 2 characters or shorter.
        return [tok for tok in re.findall(r"\w+", text.lower()) if len(tok) > 2]

    def _update_idf(self, tokens: list[str]):
        """Fold one document's tokens into the running IDF statistics."""
        self._doc_count += 1
        for tok in set(tokens):
            self._doc_freq[tok] = self._doc_freq.get(tok, 0) + 1
        # Smoothed IDF: log((N + 1) / (df + 1)) + 1.
        for tok, df in self._doc_freq.items():
            self._idf[tok] = math.log((self._doc_count + 1) / (df + 1)) + 1.0

    def embed(self, text: str) -> list[float]:
        """Return an L2-normalized TF-IDF vector for *text*.

        Each call grows the vocabulary and updates the IDF statistics, so
        vectors from earlier calls may be shorter than later ones;
        ``similarity`` zero-pads to compensate.
        """
        tokens = self._tokenize(text)
        if not tokens:
            return []
        for tok in tokens:
            # setdefault evaluates len() before any insertion, so a new
            # token gets the next free index.
            self._vocab.setdefault(tok, len(self._vocab))
        self._update_idf(tokens)
        counts: dict[str, int] = {}
        for tok in tokens:
            counts[tok] = counts.get(tok, 0) + 1
        vec = [0.0] * len(self._vocab)
        total = len(tokens)
        for tok, cnt in counts.items():
            vec[self._vocab[tok]] = (cnt / total) * self._idf.get(tok, 1.0)
        norm = math.sqrt(sum(component * component for component in vec))
        return [component / norm for component in vec] if norm > 0 else vec

    def similarity(self, a: list[float], b: list[float]) -> float:
        # Vectors may differ in length (vocab grew between calls): zero-pad.
        if len(a) != len(b):
            width = max(len(a), len(b))
            a = a + [0.0] * (width - len(a))
            b = b + [0.0] * (width - len(b))
        return max(0.0, cosine_similarity(a, b))
def get_embedding_backend(prefer: str | None = None, ollama_url: str | None = None,
                          model: str | None = None) -> EmbeddingBackend:
    """Auto-detect best available embedding backend. Priority: Ollama > TF-IDF.

    Args:
        prefer: "ollama" or "tfidf" to force a backend; when None the
            MNEMOSYNE_EMBED_BACKEND env var is consulted, then auto-detect.
        ollama_url: Base URL for the Ollama server (optional).
        model: Embedding model name (optional).

    Returns:
        A ready-to-use EmbeddingBackend instance.

    Raises:
        RuntimeError: If "ollama" was explicitly requested but unavailable.
    """
    choice = prefer or os.environ.get("MNEMOSYNE_EMBED_BACKEND")
    if choice == "tfidf":
        return TfidfEmbeddingBackend()
    if choice in (None, "ollama"):
        candidate = OllamaEmbeddingBackend(base_url=ollama_url, model=model)
        if candidate._check_available():
            return candidate
        if choice == "ollama":
            raise RuntimeError("Ollama backend requested but not available")
    # Default: the dependency-free local backend.
    return TfidfEmbeddingBackend()

View File

@@ -34,8 +34,6 @@ class ArchiveEntry:
updated_at: Optional[str] = None # Set on mutation; None means same as created_at
links: list[str] = field(default_factory=list) # IDs of related entries
content_hash: Optional[str] = None # SHA-256 of title+content for dedup
vitality: float = 1.0 # 0.0 (dead) to 1.0 (fully alive)
last_accessed: Optional[str] = None # ISO datetime of last access; None = never accessed
def __post_init__(self):
if self.content_hash is None:
@@ -54,8 +52,6 @@ class ArchiveEntry:
"updated_at": self.updated_at,
"links": self.links,
"content_hash": self.content_hash,
"vitality": self.vitality,
"last_accessed": self.last_accessed,
}
@classmethod

View File

@@ -2,63 +2,31 @@
Computes semantic similarity between archive entries and creates
bidirectional links, forming the holographic graph structure.
Supports pluggable embedding backends for true semantic search.
Falls back to Jaccard token similarity when no backend is available.
"""
from __future__ import annotations
from typing import Optional, TYPE_CHECKING
from typing import Optional
from nexus.mnemosyne.entry import ArchiveEntry
if TYPE_CHECKING:
from nexus.mnemosyne.embeddings import EmbeddingBackend
class HolographicLinker:
"""Links archive entries via semantic similarity.
With an embedding backend: cosine similarity on vectors.
Without: Jaccard similarity on token sets (legacy fallback).
Phase 1 uses simple keyword overlap as the similarity metric.
Phase 2 will integrate ChromaDB embeddings from MemPalace.
"""
def __init__(
self,
similarity_threshold: float = 0.15,
embedding_backend: Optional["EmbeddingBackend"] = None,
):
def __init__(self, similarity_threshold: float = 0.15):
self.threshold = similarity_threshold
self._backend = embedding_backend
self._embed_cache: dict[str, list[float]] = {}
@property
def using_embeddings(self) -> bool:
    """True when an embedding backend is configured; False means the Jaccard fallback is in use."""
    return self._backend is not None
def _get_embedding(self, entry: ArchiveEntry) -> list[float]:
"""Get or compute cached embedding for an entry."""
if entry.id in self._embed_cache:
return self._embed_cache[entry.id]
text = f"{entry.title} {entry.content}"
vec = self._backend.embed(text) if self._backend else []
if vec:
self._embed_cache[entry.id] = vec
return vec
def compute_similarity(self, a: ArchiveEntry, b: ArchiveEntry) -> float:
"""Compute similarity score between two entries.
Returns float in [0, 1]. Uses embedding cosine similarity if
a backend is configured, otherwise falls back to Jaccard.
Returns float in [0, 1]. Phase 1: Jaccard similarity on
combined title+content tokens. Phase 2: cosine similarity
on ChromaDB embeddings.
"""
if self._backend:
vec_a = self._get_embedding(a)
vec_b = self._get_embedding(b)
if vec_a and vec_b:
return self._backend.similarity(vec_a, vec_b)
# Fallback: Jaccard on tokens
tokens_a = self._tokenize(f"{a.title} {a.content}")
tokens_b = self._tokenize(f"{b.title} {b.content}")
if not tokens_a or not tokens_b:
@@ -67,10 +35,11 @@ class HolographicLinker:
union = tokens_a | tokens_b
return len(intersection) / len(union)
def find_links(
self, entry: ArchiveEntry, candidates: list[ArchiveEntry]
) -> list[tuple[str, float]]:
"""Find entries worth linking to. Returns (entry_id, score) tuples."""
def find_links(self, entry: ArchiveEntry, candidates: list[ArchiveEntry]) -> list[tuple[str, float]]:
"""Find entries worth linking to.
Returns list of (entry_id, similarity_score) tuples above threshold.
"""
results = []
for candidate in candidates:
if candidate.id == entry.id:
@@ -89,18 +58,16 @@ class HolographicLinker:
if eid not in entry.links:
entry.links.append(eid)
new_links += 1
# Bidirectional
for c in candidates:
if c.id == eid and entry.id not in c.links:
c.links.append(entry.id)
return new_links
def clear_cache(self):
    """Forget every cached embedding vector.

    Call after bulk entry changes so stale vectors are not reused.
    """
    self._embed_cache.clear()
@staticmethod
def _tokenize(text: str) -> set[str]:
"""Simple whitespace + punctuation tokenizer."""
import re
tokens = set(re.findall(r"\w+", text.lower()))
# Remove very short tokens
return {t for t in tokens if len(t) > 2}

View File

@@ -2,7 +2,6 @@
import json
import tempfile
from datetime import datetime, timezone, timedelta
from pathlib import Path
from nexus.mnemosyne.entry import ArchiveEntry
@@ -667,189 +666,3 @@ def test_update_entry_no_change_no_crash():
e = ingest_event(archive, title="T", content="c")
result = archive.update_entry(e.id)
assert result.title == "T"
# --- by_date_range tests ---
def _make_entry_at(archive: MnemosyneArchive, title: str, dt: datetime) -> ArchiveEntry:
    """Helper: ingest an entry, then backdate its created_at and persist."""
    entry = ingest_event(archive, title=title, content=title)
    entry.created_at = dt.isoformat()
    archive._save()
    return entry
def test_by_date_range_empty_archive():
    """An empty archive yields no entries for any range."""
    with tempfile.TemporaryDirectory() as tmp:
        archive = MnemosyneArchive(archive_path=Path(tmp) / "a.json")
        assert archive.by_date_range("2024-01-01", "2024-12-31") == []
def test_by_date_range_returns_matching_entries():
    """Only entries created inside the range are returned."""
    with tempfile.TemporaryDirectory() as tmp:
        archive = MnemosyneArchive(archive_path=Path(tmp) / "a.json")
        inside_jan = _make_entry_at(archive, "Jan entry", datetime(2024, 1, 15, tzinfo=timezone.utc))
        inside_mar = _make_entry_at(archive, "Mar entry", datetime(2024, 3, 10, tzinfo=timezone.utc))
        outside_jun = _make_entry_at(archive, "Jun entry", datetime(2024, 6, 1, tzinfo=timezone.utc))
        found = {e.id for e in archive.by_date_range("2024-01-01", "2024-04-01")}
        assert inside_jan.id in found
        assert inside_mar.id in found
        assert outside_jun.id not in found
def test_by_date_range_boundary_inclusive():
    """Range endpoints are inclusive: an entry exactly on the bound matches."""
    with tempfile.TemporaryDirectory() as tmp:
        archive = MnemosyneArchive(archive_path=Path(tmp) / "a.json")
        boundary = "2024-03-01T00:00:00+00:00"
        entry = _make_entry_at(archive, "Exact boundary", datetime(2024, 3, 1, tzinfo=timezone.utc))
        hits = archive.by_date_range(boundary, boundary)
        assert [e.id for e in hits] == [entry.id]
def test_by_date_range_no_results():
    """A range that predates every entry returns an empty list."""
    with tempfile.TemporaryDirectory() as tmp:
        archive = MnemosyneArchive(archive_path=Path(tmp) / "a.json")
        _make_entry_at(archive, "Jan entry", datetime(2024, 1, 15, tzinfo=timezone.utc))
        assert archive.by_date_range("2023-01-01", "2023-12-31") == []
def test_by_date_range_timezone_naive_treated_as_utc():
    """Naive start/end strings are interpreted as UTC and still match."""
    with tempfile.TemporaryDirectory() as tmp:
        archive = MnemosyneArchive(archive_path=Path(tmp) / "a.json")
        summer = _make_entry_at(archive, "Summer", datetime(2024, 6, 15, tzinfo=timezone.utc))
        matches = archive.by_date_range("2024-06-01", "2024-07-01")
        assert summer.id in {m.id for m in matches}
def test_by_date_range_sorted_ascending():
    """Results come back ordered by created_at, oldest first."""
    with tempfile.TemporaryDirectory() as tmp:
        archive = MnemosyneArchive(archive_path=Path(tmp) / "a.json")
        scrambled = [
            datetime(2024, 3, 5, tzinfo=timezone.utc),
            datetime(2024, 1, 10, tzinfo=timezone.utc),
            datetime(2024, 2, 20, tzinfo=timezone.utc),
        ]
        for idx, when in enumerate(scrambled):
            _make_entry_at(archive, f"Entry {idx}", when)
        ordered = archive.by_date_range("2024-01-01", "2024-12-31")
        assert len(ordered) == 3
        stamps = [e.created_at for e in ordered]
        assert stamps[0] < stamps[1] < stamps[2]
def test_by_date_range_single_entry_archive():
    """A one-entry archive matches in range and misses out of range."""
    with tempfile.TemporaryDirectory() as tmp:
        arch = MnemosyneArchive(archive_path=Path(tmp) / "a.json")
        only = _make_entry_at(arch, "Only", datetime(2024, 5, 1, tzinfo=timezone.utc))
        assert arch.by_date_range("2024-01-01", "2024-12-31") == [only]
        assert arch.by_date_range("2025-01-01", "2025-12-31") == []
# --- temporal_neighbors tests ---
def test_temporal_neighbors_empty_archive():
    """The only entry in an archive has no temporal neighbors."""
    with tempfile.TemporaryDirectory() as tmp:
        arch = MnemosyneArchive(archive_path=Path(tmp) / "a.json")
        lone = ingest_event(arch, title="Lone", content="c")
        assert arch.temporal_neighbors(lone.id, window_days=7) == []
def test_temporal_neighbors_missing_entry_raises():
    """Asking for neighbors of an unknown id raises KeyError."""
    with tempfile.TemporaryDirectory() as tmp:
        arch = MnemosyneArchive(archive_path=Path(tmp) / "a.json")
        raised = False
        try:
            arch.temporal_neighbors("nonexistent-id")
        except KeyError:
            raised = True
        assert raised, "Expected KeyError"
def test_temporal_neighbors_returns_within_window():
    """Entries inside the window are returned; the anchor and far entries are not."""
    with tempfile.TemporaryDirectory() as tmp:
        arch = MnemosyneArchive(archive_path=Path(tmp) / "a.json")
        base = datetime(2024, 4, 10, tzinfo=timezone.utc)
        anchor = _make_entry_at(arch, "Anchor", base)
        near = _make_entry_at(arch, "Near", datetime(2024, 4, 14, tzinfo=timezone.utc))  # +4 days
        far = _make_entry_at(arch, "Far", datetime(2024, 4, 20, tzinfo=timezone.utc))  # +10 days
        neighbor_ids = {n.id for n in arch.temporal_neighbors(anchor.id, window_days=7)}
        assert near.id in neighbor_ids
        assert far.id not in neighbor_ids
        assert anchor.id not in neighbor_ids
def test_temporal_neighbors_excludes_anchor():
    """The anchor never appears in its own neighbor set, even at window 0."""
    with tempfile.TemporaryDirectory() as tmp:
        arch = MnemosyneArchive(archive_path=Path(tmp) / "a.json")
        day = datetime(2024, 4, 10, tzinfo=timezone.utc)
        anchor = _make_entry_at(arch, "Anchor", day)
        sibling = _make_entry_at(arch, "Same day", day)
        neighbor_ids = {n.id for n in arch.temporal_neighbors(anchor.id, window_days=0)}
        assert anchor.id not in neighbor_ids
        assert sibling.id in neighbor_ids
def test_temporal_neighbors_custom_window():
    """window_days narrows which entries count as neighbors."""
    with tempfile.TemporaryDirectory() as tmp:
        arch = MnemosyneArchive(archive_path=Path(tmp) / "a.json")
        base = datetime(2024, 4, 10, tzinfo=timezone.utc)
        anchor = _make_entry_at(arch, "Anchor", base)
        inside = _make_entry_at(arch, "Near", datetime(2024, 4, 12, tzinfo=timezone.utc))  # +2 days
        outside = _make_entry_at(arch, "Far", datetime(2024, 4, 15, tzinfo=timezone.utc))  # +5 days
        neighbor_ids = {n.id for n in arch.temporal_neighbors(anchor.id, window_days=3)}
        assert inside.id in neighbor_ids
        assert outside.id not in neighbor_ids
def test_temporal_neighbors_sorted_ascending():
    """Neighbors are returned in chronological order."""
    with tempfile.TemporaryDirectory() as tmp:
        arch = MnemosyneArchive(archive_path=Path(tmp) / "a.json")
        base = datetime(2024, 6, 15, tzinfo=timezone.utc)
        anchor = _make_entry_at(arch, "Anchor", base)
        for offset in [5, 1, 3]:
            _make_entry_at(arch, f"Offset {offset}", base + timedelta(days=offset))
        neighbors = arch.temporal_neighbors(anchor.id, window_days=7)
        assert len(neighbors) == 3
        assert neighbors[0].created_at < neighbors[1].created_at < neighbors[2].created_at
def test_temporal_neighbors_boundary_inclusive():
    """An entry exactly window_days away still counts as a neighbor."""
    with tempfile.TemporaryDirectory() as tmp:
        arch = MnemosyneArchive(archive_path=Path(tmp) / "a.json")
        base = datetime(2024, 6, 15, tzinfo=timezone.utc)
        anchor = _make_entry_at(arch, "Anchor", base)
        edge = _make_entry_at(arch, "Boundary", base + timedelta(days=7))  # exactly on the edge
        returned_ids = [n.id for n in arch.temporal_neighbors(anchor.id, window_days=7)]
        assert edge.id in returned_ids

View File

@@ -1,176 +0,0 @@
"""Tests for MnemosyneArchive.consolidate() — duplicate/near-duplicate merging."""
import tempfile
from pathlib import Path
from nexus.mnemosyne.archive import MnemosyneArchive
from nexus.mnemosyne.entry import ArchiveEntry
from nexus.mnemosyne.ingest import ingest_event
def _archive(tmp: str) -> MnemosyneArchive:
    """Build a throwaway archive rooted in *tmp* with auto-embedding disabled."""
    archive_file = Path(tmp) / "archive.json"
    return MnemosyneArchive(archive_path=archive_file, auto_embed=False)
def test_consolidate_exact_duplicate_removed():
    """Two entries with identical content_hash are merged; only one survives.

    Fix: the original bound the first ingest_event() result to an unused
    local ``e1``; the binding has been dropped (the call is kept for its
    side effect of populating the archive).
    """
    with tempfile.TemporaryDirectory() as tmp:
        archive = _archive(tmp)
        ingest_event(archive, title="Hello world", content="Exactly the same content", topics=["a"])
        # Manually add a second entry with the same hash to simulate a duplicate.
        # Writing into _entries bypasses the add() dedup guard so consolidate()
        # itself is what gets exercised.
        dup = ArchiveEntry(title="Hello world", content="Exactly the same content", topics=["b"])
        archive._entries[dup.id] = dup
        archive._save()
        assert archive.count == 2
        merges = archive.consolidate(dry_run=False)
        assert len(merges) == 1
        assert merges[0]["reason"] == "exact_hash"
        assert merges[0]["score"] == 1.0
        assert archive.count == 1
def test_consolidate_keeps_older_entry():
    """The older entry (earlier created_at) is kept, the newer is removed."""
    with tempfile.TemporaryDirectory() as tmp:
        archive = _archive(tmp)
        older = ingest_event(archive, title="Hello world", content="Same content here", topics=[])
        newer = ArchiveEntry(title="Hello world", content="Same content here", topics=[])
        newer.created_at = "2099-01-01T00:00:00+00:00"  # force a clearly later timestamp
        archive._entries[newer.id] = newer
        archive._save()
        merges = archive.consolidate(dry_run=False)
        assert len(merges) == 1
        assert merges[0]["kept"] == older.id
        assert merges[0]["removed"] == newer.id
def test_consolidate_merges_topics():
    """Topics from the removed entry are merged (unioned) into the kept entry."""
    with tempfile.TemporaryDirectory() as tmp:
        archive = _archive(tmp)
        kept = ingest_event(archive, title="Memory item", content="Shared content body", topics=["alpha"])
        doomed = ArchiveEntry(title="Memory item", content="Shared content body", topics=["beta", "gamma"])
        doomed.created_at = "2099-01-01T00:00:00+00:00"
        archive._entries[doomed.id] = doomed
        archive._save()
        archive.consolidate(dry_run=False)
        survivor = archive.get(kept.id)
        assert survivor is not None
        lowered = {topic.lower() for topic in survivor.topics}
        for expected in ("alpha", "beta", "gamma"):
            assert expected in lowered
def test_consolidate_merges_metadata():
    """Metadata from the removed entry is merged into the kept entry; kept values win."""
    with tempfile.TemporaryDirectory() as tmp:
        archive = _archive(tmp)
        kept = ArchiveEntry(
            title="Shared", content="Identical body here", topics=[], metadata={"k1": "v1", "shared": "kept"}
        )
        archive._entries[kept.id] = kept
        doomed = ArchiveEntry(
            title="Shared", content="Identical body here", topics=[], metadata={"k2": "v2", "shared": "removed"}
        )
        doomed.created_at = "2099-01-01T00:00:00+00:00"
        archive._entries[doomed.id] = doomed
        archive._save()
        archive.consolidate(dry_run=False)
        merged = archive.get(kept.id).metadata
        assert merged["k1"] == "v1"
        assert merged["k2"] == "v2"
        assert merged["shared"] == "kept"  # on collision, the kept entry's value wins
def test_consolidate_dry_run_no_mutation():
    """Dry-run mode returns the merge plan but does not alter the archive."""
    with tempfile.TemporaryDirectory() as tmp:
        archive = _archive(tmp)
        ingest_event(archive, title="Same", content="Identical content to dedup", topics=[])
        dup = ArchiveEntry(title="Same", content="Identical content to dedup", topics=[])
        dup.created_at = "2099-01-01T00:00:00+00:00"
        archive._entries[dup.id] = dup
        archive._save()
        plan = archive.consolidate(dry_run=True)
        assert len(plan) == 1
        assert plan[0]["dry_run"] is True
        assert archive.count == 2  # nothing was actually removed
def test_consolidate_no_duplicates():
    """When no duplicates exist, consolidate returns an empty list."""
    with tempfile.TemporaryDirectory() as tmp:
        archive = _archive(tmp)
        ingest_event(archive, title="Unique A", content="This is completely unique content for A")
        ingest_event(archive, title="Unique B", content="Totally different words here for B")
        assert archive.consolidate(threshold=0.9) == []
def test_consolidate_transfers_links():
    """Links from the removed entry are inherited by the kept entry."""
    with tempfile.TemporaryDirectory() as tmp:
        archive = _archive(tmp)
        # A third entry acts as the link target.
        target = ingest_event(archive, title="Target", content="The link target entry", topics=[])
        kept = ArchiveEntry(title="Dup", content="Exact duplicate body text", topics=[], links=[target.id])
        archive._entries[kept.id] = kept
        target.links.append(kept.id)
        doomed = ArchiveEntry(title="Dup", content="Exact duplicate body text", topics=[])
        doomed.created_at = "2099-01-01T00:00:00+00:00"
        archive._entries[doomed.id] = doomed
        archive._save()
        archive.consolidate(dry_run=False)
        survivor = archive.get(kept.id)
        assert survivor is not None
        assert target.id in survivor.links
def test_consolidate_near_duplicate_semantic():
    """Near-duplicate entries above the similarity threshold are merged."""
    with tempfile.TemporaryDirectory() as tmp:
        archive = _archive(tmp)
        # Two bodies with very high token overlap (Jaccard-wise almost identical).
        text_a = "python automation scripting building tools workflows"
        text_b = "python automation scripting building tools workflows tasks"
        first = ArchiveEntry(title="Automator", content=text_a, topics=[])
        second = ArchiveEntry(title="Automator", content=text_b, topics=[])
        second.created_at = "2099-01-01T00:00:00+00:00"
        archive._entries[first.id] = first
        archive._entries[second.id] = second
        archive._save()
        # A generous threshold guarantees these near-identical entries match.
        merges = archive.consolidate(threshold=0.7, dry_run=False)
        assert len(merges) >= 1
        assert merges[0]["reason"] == "semantic_similarity"
def test_consolidate_persists_after_reload():
    """After consolidation, the reduced archive survives a save/reload cycle."""
    with tempfile.TemporaryDirectory() as tmp:
        store = Path(tmp) / "archive.json"
        archive = MnemosyneArchive(archive_path=store, auto_embed=False)
        ingest_event(archive, title="Persist test", content="Body to dedup and persist", topics=[])
        dup = ArchiveEntry(title="Persist test", content="Body to dedup and persist", topics=[])
        dup.created_at = "2099-01-01T00:00:00+00:00"
        archive._entries[dup.id] = dup
        archive._save()
        archive.consolidate(dry_run=False)
        assert archive.count == 1
        assert MnemosyneArchive(archive_path=store, auto_embed=False).count == 1

View File

@@ -1,112 +0,0 @@
"""Tests for the embedding backend module."""
from __future__ import annotations
import math
import pytest
from nexus.mnemosyne.embeddings import (
EmbeddingBackend,
TfidfEmbeddingBackend,
cosine_similarity,
get_embedding_backend,
)
class TestCosineSimilarity:
    """Behavioral checks for the cosine_similarity helper."""

    def test_identical_vectors(self):
        v = [1.0, 2.0, 3.0]
        assert abs(cosine_similarity(v, v) - 1.0) < 1e-9

    def test_orthogonal_vectors(self):
        assert abs(cosine_similarity([1.0, 0.0], [0.0, 1.0]) - 0.0) < 1e-9

    def test_opposite_vectors(self):
        assert abs(cosine_similarity([1.0, 0.0], [-1.0, 0.0]) - (-1.0)) < 1e-9

    def test_zero_vector(self):
        # Convention under test: similarity against the zero vector is 0.0.
        assert cosine_similarity([0.0, 0.0], [1.0, 2.0]) == 0.0

    def test_dimension_mismatch(self):
        with pytest.raises(ValueError):
            cosine_similarity([1.0], [1.0, 2.0])
class TestTfidfEmbeddingBackend:
    """Exercises the pure-Python TF-IDF embedding backend."""

    def test_basic_embed(self):
        vec = TfidfEmbeddingBackend().embed("hello world test")
        assert len(vec) > 0
        assert all(isinstance(component, float) for component in vec)

    def test_empty_text(self):
        assert TfidfEmbeddingBackend().embed("") == []

    def test_identical_texts_similar(self):
        backend = TfidfEmbeddingBackend()
        text = "the cat sat on the mat"
        assert backend.similarity(backend.embed(text), backend.embed(text)) > 0.99

    def test_different_texts_less_similar(self):
        backend = TfidfEmbeddingBackend()
        code = backend.embed("python programming language")
        food = backend.embed("cooking recipes italian food")
        assert backend.similarity(code, food) < 0.5

    def test_related_texts_more_similar(self):
        backend = TfidfEmbeddingBackend()
        ml = backend.embed("machine learning neural networks")
        dl = backend.embed("deep learning artificial neural nets")
        bread = backend.embed("baking bread sourdough recipe")
        assert backend.similarity(ml, dl) > backend.similarity(ml, bread)

    def test_name(self):
        assert "TF-IDF" in TfidfEmbeddingBackend().name

    def test_dimension_grows(self):
        backend = TfidfEmbeddingBackend()
        before = backend.dimension
        backend.embed("new unique tokens here")
        assert backend.dimension > before

    def test_padding_different_lengths(self):
        backend = TfidfEmbeddingBackend()
        short_vec = backend.embed("short")
        long_vec = backend.embed("this is a much longer text with many more tokens")
        # Must not raise despite mismatched vector lengths.
        assert 0.0 <= backend.similarity(short_vec, long_vec) <= 1.0
class TestGetEmbeddingBackend:
    """Backend selection / fallback behavior of get_embedding_backend()."""

    def test_tfidf_preferred(self):
        backend = get_embedding_backend(prefer="tfidf")
        assert isinstance(backend, TfidfEmbeddingBackend)

    def test_auto_returns_something(self):
        backend = get_embedding_backend()
        assert isinstance(backend, EmbeddingBackend)

    def test_ollama_unavailable_falls_back(self):
        # With no explicit preference, an unreachable Ollama URL must fall
        # back to the TF-IDF backend instead of raising.
        # Fix: the original also called
        # get_embedding_backend(prefer="ollama", ollama_url=...) and discarded
        # the result; per its own comments that call raises when Ollama is
        # unavailable, so the dead (and potentially failing) call was removed.
        backend = get_embedding_backend(ollama_url="http://localhost:1")
        assert isinstance(backend, TfidfEmbeddingBackend)

View File

@@ -1,278 +0,0 @@
"""Tests for Mnemosyne memory decay system."""
import json
import os
import tempfile
from datetime import datetime, timedelta, timezone
from pathlib import Path
import pytest
from nexus.mnemosyne.archive import MnemosyneArchive
from nexus.mnemosyne.entry import ArchiveEntry
@pytest.fixture
def archive(tmp_path):
    """A fresh, empty archive backed by a per-test temp file."""
    return MnemosyneArchive(archive_path=tmp_path / "test_archive.json")
@pytest.fixture
def populated_archive(tmp_path):
    """An archive pre-loaded with three entries across two topics."""
    arch = MnemosyneArchive(archive_path=tmp_path / "test_archive.json")
    seed = [
        ("Fresh Entry", "Just added", ["test"]),
        ("Old Entry", "Been here a while", ["test"]),
        ("Another Entry", "Some content", ["other"]),
    ]
    for title, content, topics in seed:
        arch.add(ArchiveEntry(title=title, content=content, topics=topics))
    return arch
class TestVitalityFields:
    """Vitality bookkeeping fields exist on entries and round-trip cleanly."""

    def test_entry_has_vitality_default(self):
        assert ArchiveEntry(title="Test", content="Content").vitality == 1.0

    def test_entry_has_last_accessed_default(self):
        assert ArchiveEntry(title="Test", content="Content").last_accessed is None

    def test_entry_roundtrip_with_vitality(self):
        stamp = "2024-01-01T00:00:00+00:00"
        entry = ArchiveEntry(title="Test", content="Content", vitality=0.75, last_accessed=stamp)
        payload = entry.to_dict()
        assert payload["vitality"] == 0.75
        assert payload["last_accessed"] == stamp
        revived = ArchiveEntry.from_dict(payload)
        assert revived.vitality == 0.75
        assert revived.last_accessed == stamp
class TestTouch:
    """touch() records access time and nudges vitality upward."""

    def test_touch_sets_last_accessed(self, archive):
        entry = archive.add(ArchiveEntry(title="Test", content="Content"))
        assert entry.last_accessed is None
        assert archive.touch(entry.id).last_accessed is not None

    def test_touch_boosts_vitality(self, archive):
        entry = archive.add(ArchiveEntry(title="Test", content="Content", vitality=0.5))
        touched = archive.touch(entry.id)
        # Boost = 0.1 * (1 - 0.5) = 0.05 -> roughly 0.55 (no decay elapses in-test).
        assert 0.5 < touched.vitality <= 1.0

    def test_touch_diminishing_returns(self, archive):
        entry = archive.add(ArchiveEntry(title="Test", content="Content", vitality=0.9))
        touched = archive.touch(entry.id)
        # Boost = 0.1 * (1 - 0.9) = 0.01 -> roughly 0.91.
        assert 0.9 < touched.vitality < 0.92

    def test_touch_never_exceeds_one(self, archive):
        entry = archive.add(ArchiveEntry(title="Test", content="Content", vitality=0.99))
        for _ in range(10):
            entry = archive.touch(entry.id)
        assert entry.vitality <= 1.0

    def test_touch_missing_entry_raises(self, archive):
        with pytest.raises(KeyError):
            archive.touch("nonexistent-id")

    def test_touch_persists(self, archive):
        entry = archive.add(ArchiveEntry(title="Test", content="Content"))
        archive.touch(entry.id)
        # A freshly loaded archive must see the recorded access.
        reloaded = MnemosyneArchive(archive_path=archive._path)
        assert reloaded.get(entry.id).last_accessed is not None
class TestGetVitality:
    """get_vitality() reports a status dict for a single entry."""

    def test_get_vitality_basic(self, archive):
        entry = archive.add(ArchiveEntry(title="Test", content="Content"))
        report = archive.get_vitality(entry.id)
        assert report["entry_id"] == entry.id
        assert report["title"] == "Test"
        assert 0.0 <= report["vitality"] <= 1.0
        assert report["age_days"] == 0

    def test_get_vitality_missing_raises(self, archive):
        with pytest.raises(KeyError):
            archive.get_vitality("nonexistent-id")
class TestComputeVitality:
    """Decay math inside _compute_vitality()."""

    def test_new_entry_full_vitality(self, archive):
        entry = archive.add(ArchiveEntry(title="Test", content="Content"))
        assert archive._compute_vitality(entry) == 1.0

    def test_recently_touched_high_vitality(self, archive):
        entry = archive.add(ArchiveEntry(title="Test", content="Content"))
        archive.touch(entry.id)
        # Just touched, so effectively no decay has occurred yet.
        assert archive._compute_vitality(entry) > 0.99

    def test_old_entry_decays(self, archive):
        entry = archive.add(ArchiveEntry(title="Test", content="Content"))
        # Backdate the last access by 60 days.
        entry.last_accessed = (datetime.now(timezone.utc) - timedelta(days=60)).isoformat()
        entry.vitality = 1.0
        archive._save()
        # 60 days with a 30-day half-life: 1.0 * 0.5^(60/30) = 0.25.
        assert 0.2 < archive._compute_vitality(entry) < 0.3

    def test_very_old_entry_nearly_zero(self, archive):
        entry = archive.add(ArchiveEntry(title="Test", content="Content"))
        entry.last_accessed = (datetime.now(timezone.utc) - timedelta(days=365)).isoformat()
        entry.vitality = 1.0
        archive._save()
        # 365 days / 30-day half-life ~= 12 half-lives -> ~0.0002.
        assert archive._compute_vitality(entry) < 0.01
class TestFading:
    """fading() surfaces the most neglected entries first."""

    def test_fading_returns_lowest_first(self, populated_archive):
        entries = list(populated_archive._entries.values())
        # Age one entry far beyond the rest.
        neglected = entries[1]
        neglected.last_accessed = (datetime.now(timezone.utc) - timedelta(days=90)).isoformat()
        neglected.vitality = 1.0
        populated_archive._save()
        fading = populated_archive.fading(limit=3)
        assert len(fading) <= 3
        assert fading[0]["entry_id"] == neglected.id  # most-decayed leads
        vitals = [row["vitality"] for row in fading]
        assert vitals == sorted(vitals)  # ascending order

    def test_fading_empty_archive(self, archive):
        assert archive.fading() == []

    def test_fading_limit(self, populated_archive):
        assert len(populated_archive.fading(limit=2)) == 2
class TestVibrant:
    """vibrant() surfaces the most alive entries first."""

    def test_vibrant_returns_highest_first(self, populated_archive):
        entries = list(populated_archive._entries.values())
        # Age one entry so it cannot possibly lead the vibrant list.
        stale = entries[1]
        stale.last_accessed = (datetime.now(timezone.utc) - timedelta(days=90)).isoformat()
        stale.vitality = 1.0
        populated_archive._save()
        vibrant = populated_archive.vibrant(limit=3)
        vitals = [row["vitality"] for row in vibrant]
        assert vitals == sorted(vitals, reverse=True)  # descending order
        assert vibrant[0]["entry_id"] != stale.id

    def test_vibrant_empty_archive(self, archive):
        assert archive.vibrant() == []
class TestApplyDecay:
    """apply_decay() recomputes vitality across the whole archive."""

    def test_apply_decay_returns_stats(self, populated_archive):
        stats = populated_archive.apply_decay()
        assert stats["total_entries"] == 3
        for key in ("decayed_count", "avg_vitality", "fading_count", "vibrant_count"):
            assert key in stats

    def test_apply_decay_persists(self, populated_archive):
        populated_archive.apply_decay()
        # A reloaded archive must still account for every entry.
        reloaded = MnemosyneArchive(archive_path=populated_archive._path)
        assert reloaded.apply_decay()["total_entries"] == 3

    def test_apply_decay_on_empty(self, archive):
        stats = archive.apply_decay()
        assert stats["total_entries"] == 0
        assert stats["avg_vitality"] == 0.0
class TestStatsVitality:
    """stats() carries a vitality summary."""

    def test_stats_includes_vitality(self, populated_archive):
        summary = populated_archive.stats()
        for key in ("avg_vitality", "fading_count", "vibrant_count"):
            assert key in summary
        assert 0.0 <= summary["avg_vitality"] <= 1.0

    def test_stats_empty_archive(self, archive):
        summary = archive.stats()
        assert summary["avg_vitality"] == 0.0
        assert summary["fading_count"] == 0
        assert summary["vibrant_count"] == 0
class TestDecayLifecycle:
    """Integration: an entry goes from fresh -> vibrant -> fading."""

    def test_entry_lifecycle(self, archive):
        # Freshly created entries start at full vitality.
        entry = archive.add(ArchiveEntry(title="Memory", content="A thing happened"))
        assert entry.vitality == 1.0
        # Repeated access keeps it at the top of the vibrant list.
        for _ in range(5):
            archive.touch(entry.id)
        vibrant = archive.vibrant(limit=1)
        assert len(vibrant) == 1
        assert vibrant[0]["entry_id"] == entry.id
        # Simulate 45 days of neglect.
        entry.last_accessed = (datetime.now(timezone.utc) - timedelta(days=45)).isoformat()
        entry.vitality = 0.8
        archive._save()
        assert archive.apply_decay()["total_entries"] == 1
        # It should now lead the fading list with low vitality.
        fading = archive.fading(limit=1)
        assert fading[0]["entry_id"] == entry.id
        assert fading[0]["vitality"] < 0.5

160
style.css
View File

@@ -1917,163 +1917,3 @@ canvas#nexus-canvas {
background: rgba(74, 240, 192, 0.18);
border-color: #4af0c0;
}
/* ═══ MNEMOSYNE: Memory Connections Panel ═══ */

/* Panel shell: fixed overlay to the left of the sidebar. Hidden by default
   (opacity 0, nudged 12px right) until .mc-visible is added. */
.memory-connections-panel {
  position: fixed;
  top: 50%;
  right: 280px;
  transform: translateY(-50%) translateX(12px);
  width: 260px;
  max-height: 70vh;
  background: rgba(10, 12, 18, 0.92);
  border: 1px solid rgba(74, 240, 192, 0.15);
  border-radius: 8px;
  box-shadow: 0 8px 32px rgba(0,0,0,0.5);
  z-index: 310;
  display: flex;
  flex-direction: column;
  opacity: 0;
  transition: opacity 0.2s ease, transform 0.2s ease;
  backdrop-filter: blur(8px);
  -webkit-backdrop-filter: blur(8px);
  font-family: var(--font-mono, monospace);
}
/* Shown state: fade in and slide the 12px offset away. */
.memory-connections-panel.mc-visible {
  opacity: 1;
  transform: translateY(-50%) translateX(0);
}

/* Header row: title left, close button right. */
.mc-header {
  display: flex;
  align-items: center;
  justify-content: space-between;
  padding: 10px 14px;
  border-bottom: 1px solid rgba(255, 255, 255, 0.06);
}
.mc-title {
  color: rgba(74, 240, 192, 0.8);
  font-size: 11px;
  font-weight: 600;
  text-transform: uppercase;
  letter-spacing: 0.5px;
}
.mc-close {
  background: none;
  border: none;
  color: rgba(255, 255, 255, 0.4);
  font-size: 14px;
  cursor: pointer;
  padding: 2px 6px;
  border-radius: 4px;
  line-height: 1;
}
.mc-close:hover {
  color: #fff;
  background: rgba(255, 255, 255, 0.1);
}

/* Sections stack vertically; the divider is dropped on the last one. */
.mc-section {
  padding: 8px 14px 10px;
  border-bottom: 1px solid rgba(255, 255, 255, 0.05);
}
.mc-section:last-child { border-bottom: none; }
.mc-section-label {
  color: rgba(74, 240, 192, 0.5);
  font-size: 9px;
  text-transform: uppercase;
  letter-spacing: 1px;
  margin-bottom: 8px;
}

/* Scrollable lists: existing connections and suggestions share styling. */
.mc-conn-list, .mc-suggest-list {
  max-height: 200px;
  overflow-y: auto;
}
/* Thin custom scrollbar (WebKit-only selectors). */
.mc-conn-list::-webkit-scrollbar, .mc-suggest-list::-webkit-scrollbar { width: 3px; }
.mc-conn-list::-webkit-scrollbar-thumb, .mc-suggest-list::-webkit-scrollbar-thumb {
  background: rgba(74, 240, 192, 0.15);
  border-radius: 2px;
}
.mc-conn-item, .mc-suggest-item {
  display: flex;
  align-items: center;
  justify-content: space-between;
  padding: 6px 8px;
  border-radius: 5px;
  margin-bottom: 4px;
  transition: background 0.15s ease;
}
/* Hover tints: teal for existing connections, violet for suggestions. */
.mc-conn-item:hover {
  background: rgba(74, 240, 192, 0.06);
}
.mc-suggest-item:hover {
  background: rgba(123, 92, 255, 0.06);
}
.mc-conn-info, .mc-suggest-info {
  flex: 1;
  min-width: 0;
  overflow: hidden;
}
/* Labels truncate with an ellipsis rather than wrapping. */
.mc-conn-label, .mc-suggest-label {
  display: block;
  color: var(--color-text, #ccc);
  font-size: 11px;
  white-space: nowrap;
  overflow: hidden;
  text-overflow: ellipsis;
}
.mc-conn-meta, .mc-suggest-meta {
  display: block;
  color: rgba(255, 255, 255, 0.3);
  font-size: 9px;
  margin-top: 1px;
}
.mc-conn-actions {
  display: flex;
  gap: 4px;
  flex-shrink: 0;
  margin-left: 8px;
}

/* Action buttons: base style, then per-role hover accents
   (navigate = teal, remove = red, add-suggestion = violet). */
.mc-btn {
  background: none;
  border: 1px solid rgba(255, 255, 255, 0.12);
  color: rgba(255, 255, 255, 0.5);
  cursor: pointer;
  border-radius: 4px;
  font-size: 12px;
  padding: 2px 6px;
  line-height: 1;
  transition: all 0.15s ease;
}
.mc-btn-nav:hover {
  border-color: #4af0c0;
  color: #4af0c0;
  background: rgba(74, 240, 192, 0.08);
}
.mc-btn-remove:hover {
  border-color: #ff4466;
  color: #ff4466;
  background: rgba(255, 68, 102, 0.08);
}
.mc-btn-add {
  border-color: rgba(123, 92, 255, 0.3);
  color: rgba(123, 92, 255, 0.7);
}
.mc-btn-add:hover {
  border-color: #7b5cff;
  color: #7b5cff;
  background: rgba(123, 92, 255, 0.12);
}

/* Placeholder shown when a list has no items. */
.mc-empty {
  color: rgba(255, 255, 255, 0.25);
  font-size: 11px;
  font-style: italic;
  padding: 4px 0;
}

205
tests/test_mnemosyne.py Normal file
View File

@@ -0,0 +1,205 @@
"""
Tests for Mnemosyne — The Living Holographic Archive.
Round-trip: ingest sample docs → query → verify results.
"""
import json
import os
import tempfile
import pytest
# Make the repository root importable so `mnemosyne` resolves without installation.
import sys
_repo_root = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
sys.path.insert(0, _repo_root)
from mnemosyne.ingest import (
chunk_text, ingest_text, ingest_file, ingest_directory,
get_stats, get_db,
)
from mnemosyne.index import keyword_search, query, list_documents, get_document
@pytest.fixture
def db_path(tmp_path):
    """Path to a throwaway SQLite database, unique per test."""
    return str(tmp_path / "test_mnemosyne.db")
@pytest.fixture
def sample_docs(tmp_path):
    """Write a small corpus (txt, md, JSON object, JSON array) and return paths."""
    docs = {}

    # Plain text
    alice = tmp_path / "alice.txt"
    alice.write_text(
        "Alice was beginning to get very tired of sitting by her sister on the bank. "
        "She had peeped into the book her sister was reading, but it had no pictures "
        "or conversations in it. 'And what is the use of a book,' thought Alice, "
        "'without pictures or conversations?'"
    )
    docs["txt"] = str(alice)

    # Markdown
    readme = tmp_path / "readme.md"
    readme.write_text(
        "# Project Mnemosyne\n\n"
        "Mnemosyne is a sovereign holographic archive system.\n\n"
        "## Features\n\n"
        "- Full-text search with FTS5\n"
        "- Semantic search with embeddings\n"
        "- Reciprocal rank fusion for hybrid results\n"
        "- SQLite-backed, no external dependencies\n"
    )
    docs["md"] = str(readme)

    # Single JSON object
    obj_file = tmp_path / "data.json"
    obj_file.write_text(json.dumps({
        "title": "The Sovereignty Principle",
        "body": "Every person has the right to run their own intelligence on their own hardware, "
                "answerable to no one. This is the foundation of digital sovereignty.",
    }))
    docs["json"] = str(obj_file)

    # JSON array of records
    arr_file = tmp_path / "records.json"
    arr_file.write_text(json.dumps([
        {"title": "Record A", "text": "First record about Bitcoin and the blockchain."},
        {"title": "Record B", "text": "Second record about AI and language models."},
    ]))
    docs["json_array"] = str(arr_file)

    return docs
class TestChunking:
    """chunk_text() splitting behavior."""

    def test_short_text_no_split(self):
        text = "Short text."
        pieces = chunk_text(text, chunk_size=100)
        assert len(pieces) == 1
        assert pieces[0] == text

    def test_long_text_splits(self):
        long_text = "word " * 200  # ~1000 chars, well past the chunk size
        assert len(chunk_text(long_text, chunk_size=200, overlap=20)) > 1

    def test_overlap_exists(self):
        text = "aaa " * 100 + "bbb " * 100
        pieces = chunk_text(text, chunk_size=200, overlap=50)
        # Overlapping windows should straddle the aaa/bbb boundary somewhere.
        assert any("aaa" in piece and "bbb" in piece for piece in pieces)
class TestIngestion:
    """ingest_text / ingest_file / ingest_directory behavior."""

    def test_ingest_text_returns_id(self, db_path):
        doc_id = ingest_text("Hello world", source="test", db_path=db_path)
        assert doc_id is not None
        assert doc_id > 0

    def test_ingest_text_dedup(self, db_path):
        first = ingest_text("Hello world", source="test", db_path=db_path)
        second = ingest_text("Hello world", source="test", db_path=db_path)
        assert first is not None
        assert second is None  # identical content is rejected as a duplicate

    def test_ingest_file_txt(self, db_path, sample_docs):
        assert ingest_file(sample_docs["txt"], db_path=db_path) is not None

    def test_ingest_file_json(self, db_path, sample_docs):
        assert ingest_file(sample_docs["json"], db_path=db_path) is not None

    def test_ingest_file_json_array(self, db_path, sample_docs):
        assert ingest_file(sample_docs["json_array"], db_path=db_path) is not None
        # The array held two records; both should be ingested as documents.
        assert get_stats(db_path)["documents"] == 2

    def test_ingest_directory(self, db_path, sample_docs, tmp_path):
        outcome = ingest_directory(str(tmp_path), db_path=db_path)
        assert outcome["ingested"] >= 4
        assert len(outcome["errors"]) == 0

    def test_stats(self, db_path, sample_docs):
        ingest_file(sample_docs["txt"], db_path=db_path)
        ingest_file(sample_docs["md"], db_path=db_path)
        stats = get_stats(db_path)
        assert stats["documents"] == 2
        assert stats["chunks"] >= 2
class TestSearch:
    """keyword_search() and hybrid query() behavior."""

    def test_keyword_search(self, db_path, sample_docs):
        ingest_file(sample_docs["md"], db_path=db_path)
        hits = keyword_search("Mnemosyne archive", db_path=db_path)
        assert len(hits) > 0
        top = hits[0]["content"].lower()
        assert "mnemosyne" in top or "archive" in top

    def test_query_returns_results(self, db_path, sample_docs):
        ingest_file(sample_docs["txt"], db_path=db_path)
        assert len(query("Alice tired bank", db_path=db_path)) > 0

    def test_query_empty_db(self, db_path):
        assert query("anything", db_path=db_path) == []

    def test_query_no_match(self, db_path, sample_docs):
        ingest_file(sample_docs["txt"], db_path=db_path)
        assert query("xyzzyplugh quantum entanglement", db_path=db_path) == []

    def test_list_documents(self, db_path, sample_docs):
        ingest_file(sample_docs["txt"], db_path=db_path)
        ingest_file(sample_docs["md"], db_path=db_path)
        listing = list_documents(db_path=db_path)
        assert len(listing) == 2
        assert all("chunks" in row for row in listing)

    def test_get_document(self, db_path, sample_docs):
        doc_id = ingest_file(sample_docs["txt"], db_path=db_path)
        doc = get_document(doc_id, db_path=db_path)
        assert doc is not None
        assert "Alice" in doc["content"]
        assert doc["title"] == "alice"

    def test_get_document_not_found(self, db_path):
        assert get_document(9999, db_path=db_path) is None
class TestRoundTrip:
"""Full round-trip: ingest → query → verify recall."""
def test_round_trip(self, db_path, sample_docs, tmp_path):
# Ingest all sample docs
result = ingest_directory(str(tmp_path), db_path=db_path)
assert result["ingested"] >= 4
# Verify stats
stats = get_stats(db_path)
assert stats["documents"] >= 4
assert stats["chunks"] > 0
# Query for Alice
results = query("Alice pictures conversations", db_path=db_path)
assert len(results) > 0
assert any("alice" in r.get("title", "").lower() or "Alice" in r["content"] for r in results)
# Query for Mnemosyne
results = query("Mnemosyne sovereign archive", db_path=db_path)
assert len(results) > 0
# Query for sovereignty
results = query("sovereignty intelligence hardware", db_path=db_path)
assert len(results) > 0
# List all documents
docs = list_documents(db_path=db_path)
assert len(docs) >= 4