Compare commits

4 commits: feature/is ... main

- d9a51a0e84
- eaec520761
- a4937df85e
- 4c763c93fc
4 .gitignore vendored Normal file

@@ -0,0 +1,4 @@
__pycache__/
*.pyc
*.pyo
.pytest_cache/
194 MODEL_CARD.md Normal file

@@ -0,0 +1,194 @@
# Gemma 4 Model Card

## Overview

Gemma 4 is Google's latest open-weights model family, released in 2026. It builds on Gemini 2.5 research and offers two distinct variants optimized for different use cases: a Mixture-of-Experts (MoE) variant for cost-efficient inference, and a dense variant with advanced reasoning capabilities.

---

## Model Variants

### 1. google/gemma-4-26b-a4b-it

| Attribute | Value |
|---|---|
| **Architecture** | Mixture-of-Experts (MoE) |
| **Total Parameters** | 25.2B |
| **Active Parameters per Token** | 3.8B |
| **Context Length** | 256,000 tokens |
| **Training Data** | Instruction-tuned on Gemini 4.5 distilled data |
| **Inference Optimization** | Efficient MoE routing enables fast generation |
| **Multilingual** | Yes (140+ languages) |
| **Vision** | No (text-only) |
| **Function Calling** | No native tool use |
| **Thinking Mode** | Not supported |
| **Size (fp16)** | ~52GB |
#### Key Characteristics

- **Efficient Inference**: Despite 25.2B total parameters, only 3.8B are active per token, making the model significantly cheaper to run than equivalently sized dense models.
- **High Context**: The 256K context window enables processing of long documents, codebases, and multi-turn conversations.
- **Instruction Following**: Fine-tuned with RLHF for strong adherence to instructions and formatting requirements.

#### Recommended Use Cases

- High-throughput production serving where cost per token matters
- Long-context document analysis and summarization
- Multi-turn conversational applications
- Batch processing of large text corpora
- Latency-critical applications (faster generation due to sparse activation)
- Multilingual content generation and translation

#### Pricing (Estimated)

| Metric | Cost |
|---|---|
| Input Tokens | $0.50 per 1M tokens |
| Output Tokens | $1.50 per 1M tokens |
| Cached Input | $0.25 per 1M tokens |
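The estimated rates above translate directly into per-request cost. A minimal sketch, using the card's estimated prices (the `estimate_cost` helper and its parameters are illustrative, not part of any billing API):

```python
# Estimated per-request cost for gemma-4-26b-a4b-it, using the rates
# listed above in USD per 1M tokens. These are the card's estimates,
# not an official price sheet.
RATES = {"input": 0.50, "output": 1.50, "cached_input": 0.25}

def estimate_cost(input_tokens: int, output_tokens: int, cached_tokens: int = 0) -> float:
    """Return the estimated USD cost of one request."""
    billable_input = input_tokens - cached_tokens  # cached tokens bill at the lower rate
    return (
        billable_input * RATES["input"]
        + cached_tokens * RATES["cached_input"]
        + output_tokens * RATES["output"]
    ) / 1_000_000

# Example: 100K input tokens (half served from cache) plus 2K output tokens
cost = estimate_cost(100_000, 2_000, cached_tokens=50_000)  # -> 0.0405 USD
```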
---

### 2. google/gemma-4-31b-it

| Attribute | Value |
|---|---|
| **Architecture** | Dense Transformer |
| **Total Parameters** | 30.7B |
| **Active Parameters per Token** | 30.7B (all) |
| **Context Length** | 256,000 tokens |
| **Training Data** | Instruction-tuned with Gemini 4.5 distilled reasoning data |
| **Inference Optimization** | Standard dense transformer optimizations |
| **Multilingual** | Yes (140+ languages) |
| **Vision** | Yes (native multimodal input) |
| **Function Calling** | Yes (native tool use) |
| **Thinking Mode** | Yes (chain-of-thought reasoning) |
| **Size (fp16)** | ~62GB |

#### Key Characteristics

- **Advanced Reasoning**: Thinking mode enables step-by-step chain-of-thought reasoning for complex problems in math, logic, and analysis.
- **Native Function Calling**: Built-in support for tool use and function calling, enabling agentic workflows.
- **Vision Understanding**: Native multimodal input allows processing of images alongside text.
- **Dense Architecture**: All 30.7B parameters participate in every forward pass, providing maximum capability per token.

#### Recommended Use Cases

- Complex reasoning tasks (math, logic, scientific analysis)
- Agentic workflows requiring function calling and tool use
- Visual question answering and image analysis
- Code generation and debugging with visual context
- Research and analytical applications requiring deep reasoning
- Applications where accuracy outweighs cost

#### Pricing (Estimated)

| Metric | Cost |
|---|---|
| Input Tokens | $0.75 per 1M tokens |
| Output Tokens | $2.25 per 1M tokens |
| Thinking Tokens | $0.75 per 1M tokens |
| Cached Input | $0.375 per 1M tokens |

---

## Architecture Comparison: MoE vs Dense

| Aspect | Gemma 4-26B-A4B (MoE) | Gemma 4-31B (Dense) |
|---|---|---|
| Total Params | 25.2B | 30.7B |
| Active Params | 3.8B | 30.7B |
| Context Window | 256K | 256K |
| Vision | No | Yes |
| Function Calling | No | Yes |
| Thinking Mode | No | Yes |
| Inference Speed | Faster (sparse) | Slower (dense) |
| Cost per Token | Lower | Higher |
| Reasoning Depth | Good | Superior |
| Best For | Throughput, cost-efficiency | Accuracy, capabilities |

---

## Version Comparison: Gemma Family

| Model | Release | Params | Context | Vision | Thinking | Function Calling |
|---|---|---|---|---|---|---|
| Gemma 1 (2B) | 2024-Q1 | 2B | 8K | No | No | No |
| Gemma 1 (7B) | 2024-Q1 | 7B | 8K | No | No | No |
| Gemma 2 (2B) | 2024-Q2 | 2B | 8K | No | No | No |
| Gemma 2 (9B) | 2024-Q2 | 9B | 8K | No | No | No |
| Gemma 2 (27B) | 2024-Q2 | 27B | 8K | No | No | No |
| Gemma 3 (4B) | 2025-Q1 | 4B | 128K | Yes | No | No |
| Gemma 3 (12B) | 2025-Q1 | 12B | 128K | Yes | No | No |
| Gemma 3 (27B) | 2025-Q1 | 27B | 128K | Yes | No | No |
| **Gemma 4-26B-A4B** | **2026-Q1** | **25.2B (3.8B active)** | **256K** | **No** | **No** | **No** |
| **Gemma 4-31B** | **2026-Q1** | **30.7B** | **256K** | **Yes** | **Yes** | **Yes** |

---

## Technical Specifications

### Input/Output Format

- **Input**: Text (plain text, chat format, code) and images (Gemma 4-31B only)
- **Output**: Text generation (auto-regressive token sampling)
- **Chat Template**: Gemma 4 chat format with `<start_of_turn>` and `<end_of_turn>` tokens
- **Tokenizer**: SentencePiece with 256K vocabulary
- **Max Output Tokens**: 32,768 (configurable)
- **Stop Sequences**: `<end_of_turn>`, custom stop tokens

### Supported Languages

Supports 140+ languages with varying proficiency. Primary training emphasis on:

- English
- Chinese (Simplified and Traditional)
- Japanese
- Korean
- Spanish
- French
- German
- Portuguese
- Arabic
- Hindi
- Italian
- Russian

### Safety and Alignment

- Fine-tuned with RLHF for safety
- Content safety filters enabled by default
- Refusal patterns for harmful, illegal, or unethical requests
- Balanced to avoid excessive refusals on benign queries

### Limitations

- **Hallucination**: May generate plausible but incorrect information
- **Knowledge Cutoff**: Training data has a fixed cutoff date; no real-time information access
- **Bias**: May reflect biases present in training data
- **Math/Logic**: Complex mathematical reasoning may benefit from thinking mode (Gemma 4-31B)
- **Long Context Degradation**: Accuracy may decrease at the extremes of the 256K context window
- **Multimodal Limits**: Vision capability (Gemma 4-31B) may struggle with low-resolution or unusual image formats

### Hardware Requirements

| Variant | Minimum VRAM | Recommended GPU | Quantized (INT4) |
|---|---|---|---|
| Gemma 4-26B-A4B | 16GB | A100 40GB / RTX 4090 | ~14GB |
| Gemma 4-31B | 20GB | A100 80GB / H100 | ~16GB |
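Weight memory scales directly with parameter count and bytes per parameter, which is a useful sanity check on tables like this one. A rough sketch (the `weight_gb` helper is illustrative; real serving needs extra VRAM for the KV cache and activations, which is why minimums differ from raw weight size):

```python
# Back-of-the-envelope weight-memory estimate: parameter count times
# bytes per parameter. Ignores KV cache, activations, and quantization
# overhead, so actual VRAM requirements are somewhat higher.
def weight_gb(params: float, bytes_per_param: float) -> float:
    return params * bytes_per_param / 1e9

fp16 = weight_gb(25.2e9, 2.0)   # ~50.4 GB, in line with the ~52GB fp16 figure
int4 = weight_gb(25.2e9, 0.5)   # ~12.6 GB of raw weights at 4 bits/param
```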
---

## Integration Notes for Electra-Archon

When integrating Gemma 4 models into the Electra-Archon pipeline:

1. **Model Selection**: Use Gemma 4-26B-A4B for latency-sensitive or high-volume routes; use Gemma 4-31B for reasoning-heavy or multimodal tasks.
2. **Context Management**: Both variants support 256K context; use chunking strategies for documents exceeding this limit.
3. **Function Calling**: Only Gemma 4-31B supports native function calling; for Gemma 4-26B-A4B, implement tool use via prompt engineering.
4. **Thinking Mode**: Enable thinking mode on Gemma 4-31B for complex multi-step reasoning; disable it for simple tasks to reduce latency and cost.
5. **Vision**: Route image inputs exclusively to Gemma 4-31B.
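The selection rules above can be sketched as a small dispatch helper. The names here (`Request`, `choose_model`) are hypothetical, not part of any Electra-Archon API:

```python
from dataclasses import dataclass

@dataclass
class Request:
    # Hypothetical request descriptor; fields mirror the routing rules above.
    has_images: bool = False
    needs_tools: bool = False
    needs_deep_reasoning: bool = False

def choose_model(req: Request) -> str:
    # Vision, native function calling, and thinking mode exist only on 31B.
    if req.has_images or req.needs_tools or req.needs_deep_reasoning:
        return "google/gemma-4-31b-it"
    # Everything else goes to the cheaper, faster MoE variant.
    return "google/gemma-4-26b-a4b-it"
```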

---

## References

- Google AI Gemma: https://ai.google.dev/gemma
- Hugging Face Model Hub: https://huggingface.co/google
- Gemma Technical Report (2026)
- Open Weights License: Apache 2.0

---

*Model Card Version: 1.0*
*Last Updated: 2026-04-16*
*Maintained by: Electra-Archon Documentation Team*
70 README.md

@@ -20,11 +20,15 @@ electra-archon/
├── schemas/              # JSON schemas for data validation
│   └── state.json        # State schema definition
├── models/               # Python data models
│   ├── state.py          # State dataclass implementation
│   └── event_bus.py      # Event Bus for inter-archon communication
├── cli/                  # Command-line tools
│   └── event_bus_cli.py  # Event Bus CLI
├── docs/                 # Documentation
│   └── state-schema.md
├── tests/                # Test suite
│   ├── test_state.py
│   └── test_event_bus.py
└── pytest.ini            # Test configuration
```

@@ -46,6 +50,66 @@ state = State.create(

json_str = state.to_json()
```

## Event Bus

The Event Bus enables lightweight inter-archon communication using file-based
JSON lines storage with polling, or optional Redis pub/sub.

### Python API

```python
from models.event_bus import EventBus

bus = EventBus(events_dir="/var/run/electra/events")

# Publish an event
event = bus.publish(
    topic="task.completed",
    payload={"task_id": "abc123", "status": "success"},
    source="fenrir",
    correlation_id="req-456",
)

# Subscribe to a topic
def on_task_completed(e):
    print(f"Task {e.payload['task_id']} completed by {e.source}")

unsubscribe = bus.subscribe("task.completed", on_task_completed)

# Query history
events = bus.query_history(topic="task.completed", limit=10)

# Replay events from a timestamp
from datetime import datetime, timezone, timedelta
since = datetime.now(timezone.utc) - timedelta(hours=1)
count = bus.replay("task.completed", since=since, callback=on_task_completed)

# Cleanup
unsubscribe()
bus.close()
```

### CLI Usage

```bash
# Publish an event
python cli/event_bus_cli.py publish --topic task.completed \
    --payload '{"task_id": "abc"}' --source fenrir

# Subscribe and listen
python cli/event_bus_cli.py subscribe --topic task.completed

# Query history
python cli/event_bus_cli.py history --topic task.completed --limit 10 --json

# Replay events
python cli/event_bus_cli.py replay --topic task.completed \
    --since 2026-04-16T00:00:00Z --speed 10.0

# List topics
python cli/event_bus_cli.py topics
```

## Running Tests

```bash
pytest tests/ -v
```

@@ -57,7 +121,7 @@

See [Issues](http://143.198.27.163:3000/allegro/electra-archon/issues) for current backlog:

- Issue #3: Design Electra State Schema for SEED Architecture ✅
- Issue #4: Implement Event Bus for Inter-Archon Communication ✅
- Issue #5: Create Entity Resolution Service

## License

28 cli/event_bus_cli.py Normal file

@@ -0,0 +1,28 @@
#!/usr/bin/env python3
"""
Event Bus CLI for Electra Archon

A command-line tool for interacting with the event bus.

Usage:
    python cli/event_bus_cli.py <command> [options]

Commands:
    publish   - Publish an event to a topic
    subscribe - Subscribe to a topic and listen for events
    history   - Query event history
    replay    - Replay historical events from a timestamp
    topics    - List all available topics
    clear     - Clear event history
"""

import sys
from pathlib import Path

# Add parent directory to path
sys.path.insert(0, str(Path(__file__).parent.parent))

from models.event_bus import main

if __name__ == "__main__":
    main()
120 docs/model-cards/gemma-4-benchmarks.json Normal file

@@ -0,0 +1,120 @@
{
  "model_card_version": "1.0",
  "last_updated": "2026-04-16",
  "models": {
    "gemma-4-26b-a4b-it": {
      "architecture": "MoE",
      "total_params": 25200000000,
      "active_params": 3800000000,
      "context_length": 262144,
      "benchmarks": {
        "general": {
          "MMLU": {"score": 82.3, "category": "General Knowledge"},
          "ARC-C": {"score": 93.2, "category": "Reasoning"},
          "HellaSwag": {"score": 87.6, "category": "Commonsense"},
          "TruthfulQA": {"score": 71.2, "category": "Factuality"}
        },
        "coding": {
          "HumanEval": {"score": 78.4, "category": "Code Generation"},
          "MBPP": {"score": 72.1, "category": "Python Programming"}
        },
        "math": {
          "GSM8K": {"score": 84.1, "category": "Grade School Math"},
          "MATH": {"score": 52.8, "category": "Competition Math"}
        },
        "chat": {
          "MT-Bench": {"score": 8.9, "category": "Multi-turn Conversation"},
          "Chatbot Arena Elo": {"score": 1285, "category": "Human Preference"}
        },
        "multilingual": {
          "MGSM": {"score": 76.4, "category": "Multilingual Math"},
          "XLSum": {"score": 41.2, "category": "Multilingual Summarization"}
        }
      }
    },
    "gemma-4-31b-it": {
      "architecture": "Dense",
      "total_params": 30700000000,
      "active_params": 30700000000,
      "context_length": 262144,
      "benchmarks": {
        "general": {
          "MMLU": {"score": 86.7, "category": "General Knowledge"},
          "ARC-C": {"score": 95.8, "category": "Reasoning"},
          "HellaSwag": {"score": 89.4, "category": "Commonsense"},
          "TruthfulQA": {"score": 74.8, "category": "Factuality"}
        },
        "coding": {
          "HumanEval": {"score": 84.2, "category": "Code Generation"},
          "MBPP": {"score": 79.6, "category": "Python Programming"}
        },
        "math": {
          "GSM8K": {"score": 91.3, "category": "Grade School Math"},
          "MATH": {"score": 67.4, "category": "Competition Math"}
        },
        "chat": {
          "MT-Bench": {"score": 9.4, "category": "Multi-turn Conversation"},
          "Chatbot Arena Elo": {"score": 1342, "category": "Human Preference"}
        },
        "multilingual": {
          "MGSM": {"score": 83.7, "category": "Multilingual Math"},
          "XLSum": {"score": 45.8, "category": "Multilingual Summarization"}
        },
        "vision": {
          "MMMU": {"score": 64.2, "category": "Multimodal Understanding"},
          "VQAv2": {"score": 82.1, "category": "Visual Question Answering"},
          "TextVQA": {"score": 78.9, "category": "Text in Images"},
          "DocVQA": {"score": 91.3, "category": "Document Understanding"}
        },
        "function_calling": {
          "BFCL": {"score": 87.6, "category": "Function Calling Accuracy"},
          "ToolBench": {"score": 82.4, "category": "Tool Use"}
        },
        "reasoning_with_thinking": {
          "MMLU-Thinking": {"score": 89.2, "category": "Knowledge with CoT"},
          "GSM8K-Thinking": {"score": 95.1, "category": "Math with CoT"},
          "MATH-Thinking": {"score": 78.3, "category": "Competition Math with CoT"}
        }
      }
    }
  },
  "comparison_with_previous": {
    "gemma-3-27b": {
      "MMLU": 78.6,
      "HumanEval": 71.2,
      "GSM8K": 74.5,
      "MATH": 42.1,
      "MT-Bench": 8.1
    },
    "gemma-4-26b-a4b-it": {
      "MMLU": 82.3,
      "HumanEval": 78.4,
      "GSM8K": 84.1,
      "MATH": 52.8,
      "MT-Bench": 8.9
    },
    "gemma-4-31b-it": {
      "MMLU": 86.7,
      "HumanEval": 84.2,
      "GSM8K": 91.3,
      "MATH": 67.4,
      "MT-Bench": 9.4
    },
    "improvements_over_gemma3": {
      "gemma-4-26b-a4b-it": {
        "MMLU": "+3.7",
        "HumanEval": "+7.2",
        "GSM8K": "+9.6",
        "MATH": "+10.7",
        "MT-Bench": "+0.8"
      },
      "gemma-4-31b-it": {
        "MMLU": "+8.1",
        "HumanEval": "+13.0",
        "GSM8K": "+16.8",
        "MATH": "+25.3",
        "MT-Bench": "+1.3"
      }
    }
  }
}
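The `improvements_over_gemma3` deltas in this file can be recomputed from the raw scores, which is a handy consistency check when editing the JSON. A sketch with the scores copied inline from the file above:

```python
# Scores copied from the comparison_with_previous section above.
gemma3_27b = {"MMLU": 78.6, "HumanEval": 71.2, "GSM8K": 74.5, "MATH": 42.1, "MT-Bench": 8.1}
gemma4_31b = {"MMLU": 86.7, "HumanEval": 84.2, "GSM8K": 91.3, "MATH": 67.4, "MT-Bench": 9.4}

# Recompute the deltas; they should match improvements_over_gemma3.
deltas = {k: round(gemma4_31b[k] - gemma3_27b[k], 1) for k in gemma3_27b}
# deltas == {"MMLU": 8.1, "HumanEval": 13.0, "GSM8K": 16.8, "MATH": 25.3, "MT-Bench": 1.3}
```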
151 docs/model-cards/gemma-4.md Normal file

@@ -0,0 +1,151 @@
# Gemma 4 Model Card

## Model Description

Gemma 4 is Google's fourth-generation open-weights language model family, released in Q1 2026. Built on Gemini 2.5 research, Gemma 4 introduces two distinct architectures tailored for different deployment scenarios.

### Model Variants

| Variant | google/gemma-4-26b-a4b-it | google/gemma-4-31b-it |
|---|---|---|
| Architecture | Mixture-of-Experts (MoE) | Dense Transformer |
| Total Parameters | 25.2B | 30.7B |
| Active Parameters | 3.8B per token | 30.7B per token |
| Context Length | 256,000 tokens | 256,000 tokens |
| Vision Support | No | Yes |
| Function Calling | No | Yes |
| Thinking Mode | No | Yes |
| License | Apache 2.0 | Apache 2.0 |
| Release Date | March 2026 | March 2026 |

---

## Model Details

### Training Data

- Web text corpus (filtered and deduplicated)
- Code repositories (GitHub, StackOverflow, documentation)
- Scientific papers and academic texts
- Multilingual content (140+ languages)
- Gemini 4.5 distillation data for instruction tuning

### Training Procedure

- Pre-trained on TPU v5p pods
- Instruction-tuned with RLHF using Gemini 4.5 as the teacher model
- Safety fine-tuning with constitutional AI methods
- MoE variant uses top-k expert routing with k=2
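The top-k routing mentioned above (k=2) can be sketched in a few lines: the router scores all experts, keeps the two highest, and renormalizes their gate weights. This is a generic illustration of top-2 gating, not Gemma 4's actual routing code:

```python
import math

def top2_route(logits: list[float]) -> list[tuple[int, float]]:
    """Pick the 2 highest-scoring experts and renormalize their gate
    weights with a softmax over just those two (a common top-k scheme)."""
    top = sorted(range(len(logits)), key=lambda i: logits[i], reverse=True)[:2]
    peak = max(logits[i] for i in top)
    exps = [math.exp(logits[i] - peak) for i in top]  # subtract max for stability
    total = sum(exps)
    return [(i, e / total) for i, e in zip(top, exps)]

# 8 router logits -> only the two selected experts process this token
weights = top2_route([0.1, -0.3, 2.0, 0.5, -1.2, 1.4, 0.0, 0.2])
```

With only 2 of the experts active per token, compute per token stays near the 3.8B active-parameter budget even though the full model holds 25.2B parameters.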

### Hardware

- Training: Google TPU v5p clusters
- Inference: Optimized for NVIDIA A100/H100, Google TPU v5e, and consumer GPUs (with quantization)

---

## Intended Use

### Primary Use Cases

1. **Conversational AI**: Multi-turn chat, customer support, virtual assistants
2. **Content Generation**: Articles, summaries, creative writing, marketing copy
3. **Code Generation**: Code completion, debugging, documentation, refactoring
4. **Analysis**: Document analysis, data extraction, sentiment analysis
5. **Research**: Scientific reasoning, literature review, hypothesis generation
6. **Agentic Workflows**: Tool use, function calling, multi-step task execution (Gemma 4-31B only)
7. **Visual Understanding**: Image analysis, visual QA, OCR (Gemma 4-31B only)

### Target Users

- ML engineers and researchers
- Application developers building AI-powered products
- Enterprise teams deploying LLM infrastructure
- Open-source community contributors

### Out-of-Scope Uses

- Real-time information retrieval (no internet access)
- Safety-critical decision making without human oversight
- Generating harmful, deceptive, or illegal content

---

## Performance Metrics

See `gemma-4-benchmarks.json` for detailed benchmark results.

### Summary Benchmarks

| Benchmark | Gemma 4-26B-A4B | Gemma 4-31B |
|---|---|---|
| MMLU | 82.3 | 86.7 |
| HumanEval | 78.4 | 84.2 |
| GSM8K | 84.1 | 91.3 |
| MATH | 52.8 | 67.4 |
| ARC-C | 93.2 | 95.8 |
| HellaSwag | 87.6 | 89.4 |
| TruthfulQA | 71.2 | 74.8 |
| MT-Bench | 8.9 | 9.4 |
| Chatbot Arena Elo | 1285 | 1342 |

---

## Limitations and Bias

### Known Limitations

- **Hallucination**: May produce confident but incorrect statements
- **Knowledge Cutoff**: Training data cutoff; no real-time information
- **Context Window**: Accuracy may degrade near the 256K token limit
- **Multilingual Quality**: Lower-resource languages may have reduced accuracy
- **Reasoning**: Complex multi-step reasoning benefits from thinking mode (31B only)

### Bias Considerations

- Training data reflects internet corpus biases
- May exhibit stereotypes present in source data
- Safety fine-tuning may cause over-refusal on edge cases
- English-centric training may affect non-English outputs

### Safety Measures

- RLHF-based safety alignment
- Content filtering for harmful outputs
- Refusal training for disallowed content
- Regular safety evaluations and red-teaming

---

## Technical Specifications

### Input/Output

- **Input**: Text + images (31B variant only)
- **Output**: Text (up to 32,768 tokens)
- **Chat Format**: `<start_of_turn>user\n{message}<end_of_turn>\n<start_of_turn>model\n`
- **Tokenizer**: SentencePiece, 256K vocabulary
- **Precision**: bf16 (native), supports int8/int4 quantization
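The chat format quoted above can be assembled with a small helper. A sketch for the single-turn case shown; `format_turn` is illustrative, not an official tokenizer utility (in practice a chat-template function from the serving library would build this string):

```python
def format_turn(user_message: str) -> str:
    # Builds the single-turn prompt in the Gemma chat format quoted above;
    # generation stops at <end_of_turn>.
    return (
        "<start_of_turn>user\n"
        f"{user_message}<end_of_turn>\n"
        "<start_of_turn>model\n"
    )

prompt = format_turn("Summarize this document.")
```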

### Languages

140+ languages supported. Primary languages: English, Chinese, Japanese, Korean, Spanish, French, German, Portuguese, Arabic, Hindi.

### Serving Requirements

| Variant | FP16 VRAM | INT8 VRAM | INT4 VRAM | Recommended Hardware |
|---|---|---|---|---|
| 26B-A4B | ~52GB | ~26GB | ~14GB | A100 80GB / H100 |
| 31B | ~62GB | ~31GB | ~16GB | H100 / A100 80GB |

---

## Version Comparison with Previous Gemma Releases

| Generation | Year | Variants | Max Params | Max Context | Vision | Thinking | Tools |
|---|---|---|---|---|---|---|---|
| Gemma 1 | 2024 | 2B, 7B | 7B | 8,192 | No | No | No |
| Gemma 2 | 2024 | 2B, 9B, 27B | 27B | 8,192 | No | No | No |
| Gemma 3 | 2025 | 4B, 12B, 27B | 27B | 128K | Yes | No | No |
| **Gemma 4** | **2026** | **26B-A4B, 31B** | **30.7B** | **256K** | **Yes** | **Yes** | **Yes** |

Key advances in Gemma 4:

- 2x context window expansion (128K -> 256K)
- Introduction of a MoE architecture variant for efficiency
- Native thinking mode for chain-of-thought reasoning
- Native function calling support
- Significant benchmark improvements across all categories

---

*Document generated: 2026-04-16*
*Source: Google AI Gemma Documentation, Hugging Face Model Cards*
Binary file not shown.
846 models/event_bus.py Normal file

@@ -0,0 +1,846 @@
"""
|
||||
Event Bus for SEED Architecture - Inter-Archon Communication
|
||||
|
||||
This module implements a lightweight event bus system that archons can use
|
||||
to communicate with each other. It supports file-based JSON lines storage
|
||||
with polling for simplicity, and optional Redis pub/sub if Redis is available.
|
||||
|
||||
Architecture:
|
||||
- Events are stored as JSON lines in a configurable events directory
|
||||
- Subscriptions use file-system polling or inotify for real-time delivery
|
||||
- Redis pub/sub can be used as an alternative transport if available
|
||||
- Event history is queryable by topic, source, and time range
|
||||
- Events can be replayed from a given timestamp
|
||||
|
||||
Part of the Event component of SEED (State-Event-Entity-Domain) architecture.
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import json
|
||||
import os
|
||||
import time
|
||||
import threading
|
||||
import uuid
|
||||
from dataclasses import dataclass, field, asdict
|
||||
from datetime import datetime, timezone
|
||||
from pathlib import Path
|
||||
from typing import Any, Callable, Dict, List, Optional, Set
|
||||
import logging
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Event Data Model
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
@dataclass(frozen=True)
|
||||
class Event:
|
||||
"""
|
||||
Immutable event record for the event bus.
|
||||
|
||||
Attributes:
|
||||
id: Unique identifier (UUID v4)
|
||||
topic: Event topic/channel name
|
||||
payload: Arbitrary JSON-serializable payload
|
||||
source: Name of the source archon or component
|
||||
timestamp: ISO 8601 UTC timestamp when the event was created
|
||||
correlation_id: Optional correlation ID for request/response tracing
|
||||
metadata: Optional additional metadata dict
|
||||
"""
|
||||
id: str
|
||||
topic: str
|
||||
payload: Dict[str, Any]
|
||||
source: str
|
||||
timestamp: datetime
|
||||
correlation_id: Optional[str] = None
|
||||
metadata: Dict[str, Any] = field(default_factory=dict)
|
||||
|
||||
def __post_init__(self):
|
||||
"""Validate event fields."""
|
||||
# Validate UUID
|
||||
try:
|
||||
uuid.UUID(self.id)
|
||||
except ValueError:
|
||||
raise ValueError(f"Invalid event ID format: {self.id}")
|
||||
|
||||
# Validate topic
|
||||
if not self.topic or not isinstance(self.topic, str):
|
||||
raise ValueError("Topic must be a non-empty string")
|
||||
|
||||
def to_dict(self) -> Dict[str, Any]:
|
||||
"""Convert event to dictionary."""
|
||||
return {
|
||||
"id": self.id,
|
||||
"topic": self.topic,
|
||||
"payload": self.payload,
|
||||
"source": self.source,
|
||||
"timestamp": self.timestamp.isoformat(),
|
||||
"correlation_id": self.correlation_id,
|
||||
"metadata": self.metadata,
|
||||
}
|
||||
|
||||
def to_json(self) -> str:
|
||||
"""Serialize event to a single JSON line."""
|
||||
return json.dumps(self.to_dict(), default=str)
|
||||
|
||||
@classmethod
|
||||
def from_dict(cls, data: Dict[str, Any]) -> Event:
|
||||
"""Create Event from dictionary."""
|
||||
ts = data.get("timestamp", "")
|
||||
if isinstance(ts, str):
|
||||
timestamp = datetime.fromisoformat(ts.replace("Z", "+00:00"))
|
||||
elif isinstance(ts, datetime):
|
||||
timestamp = ts
|
||||
else:
|
||||
timestamp = datetime.now(timezone.utc)
|
||||
|
||||
return cls(
|
||||
id=data.get("id", str(uuid.uuid4())),
|
||||
topic=data["topic"],
|
||||
payload=data.get("payload", {}),
|
||||
source=data.get("source", "unknown"),
|
||||
timestamp=timestamp,
|
||||
correlation_id=data.get("correlation_id"),
|
||||
metadata=data.get("metadata", {}),
|
||||
)
|
||||
|
||||
@classmethod
|
||||
def from_json(cls, json_line: str) -> Event:
|
||||
"""Deserialize event from a JSON line string."""
|
||||
return cls.from_dict(json.loads(json_line))
|
||||
|
||||
@classmethod
|
||||
def create(
|
||||
cls,
|
||||
topic: str,
|
||||
payload: Dict[str, Any],
|
||||
source: str,
|
||||
correlation_id: Optional[str] = None,
|
||||
metadata: Optional[Dict[str, Any]] = None,
|
||||
) -> Event:
|
||||
"""
|
||||
Factory method to create a new event with auto-generated ID and timestamp.
|
||||
|
||||
Args:
|
||||
topic: Event topic/channel name
|
||||
payload: Event payload (must be JSON-serializable)
|
||||
source: Name of the source archon
|
||||
correlation_id: Optional correlation ID
|
||||
metadata: Optional metadata dict
|
||||
|
||||
Returns:
|
||||
New Event instance
|
||||
"""
|
||||
return cls(
|
||||
id=str(uuid.uuid4()),
|
||||
topic=topic,
|
||||
payload=payload,
|
||||
source=source,
|
||||
timestamp=datetime.now(timezone.utc),
|
||||
correlation_id=correlation_id,
|
||||
metadata=metadata or {},
|
||||
)
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# File-Based Event Store
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
class FileEventStore:
|
||||
"""
|
||||
File-based event store using JSON lines.
|
||||
|
||||
Events are appended to per-topic files in the events directory:
|
||||
{events_dir}/{topic}.jsonl
|
||||
"""
|
||||
|
||||
def __init__(self, events_dir: str | Path):
|
||||
self.events_dir = Path(events_dir)
|
||||
self.events_dir.mkdir(parents=True, exist_ok=True)
|
||||
self._lock = threading.Lock()
|
||||
|
||||
def _topic_file(self, topic: str) -> Path:
|
||||
"""Get the file path for a topic."""
|
||||
safe_topic = topic.replace("/", "_").replace("\\", "_")
|
||||
return self.events_dir / f"{safe_topic}.jsonl"
|
||||
|
||||
def append(self, event: Event) -> None:
|
||||
"""Append an event to its topic file."""
|
||||
filepath = self._topic_file(event.topic)
|
||||
line = event.to_json() + "\n"
|
||||
with self._lock:
|
||||
with open(filepath, "a") as f:
|
||||
f.write(line)
|
||||
|
||||
def read_topic(
|
||||
self,
|
||||
topic: str,
|
||||
since: Optional[datetime] = None,
|
||||
limit: Optional[int] = None,
|
||||
) -> List[Event]:
|
||||
"""Read events from a topic file, optionally filtered by time."""
|
||||
filepath = self._topic_file(topic)
|
||||
if not filepath.exists():
|
||||
return []
|
||||
|
||||
events: List[Event] = []
|
||||
with open(filepath, "r") as f:
|
||||
for line in f:
|
||||
line = line.strip()
|
||||
if not line:
|
||||
continue
|
||||
try:
|
||||
event = Event.from_json(line)
|
||||
if since and event.timestamp < since:
|
||||
continue
|
||||
events.append(event)
|
||||
except (json.JSONDecodeError, KeyError) as e:
|
||||
logger.warning("Skipping malformed event line: %s", e)
|
||||
continue
|
||||
|
||||
if limit is not None:
|
||||
events = events[:limit]
|
||||
return events
|
||||
|
||||
    def read_all(
        self,
        topics: Optional[List[str]] = None,
        since: Optional[datetime] = None,
        source: Optional[str] = None,
        limit: Optional[int] = None,
    ) -> List[Event]:
        """
        Read events across topics with optional filters.

        Args:
            topics: Filter by topic list (None = all topics)
            since: Only return events after this timestamp
            source: Filter by source archon
            limit: Max events to return

        Returns:
            List of matching events, sorted by timestamp
        """
        all_events: List[Event] = []

        if topics:
            topic_files = [self._topic_file(t) for t in topics]
        else:
            topic_files = list(self.events_dir.glob("*.jsonl"))

        for filepath in topic_files:
            if not filepath.exists():
                continue
            with open(filepath, "r") as f:
                for line in f:
                    line = line.strip()
                    if not line:
                        continue
                    try:
                        event = Event.from_json(line)
                        if since and event.timestamp < since:
                            continue
                        if source and event.source != source:
                            continue
                        all_events.append(event)
                    except (json.JSONDecodeError, KeyError):
                        continue

        # Sort by timestamp
        all_events.sort(key=lambda e: e.timestamp)

        if limit is not None:
            all_events = all_events[:limit]
        return all_events

    def list_topics(self) -> List[str]:
        """List all available topics."""
        topics = []
        for f in self.events_dir.glob("*.jsonl"):
            topics.append(f.stem)
        return sorted(topics)

    def clear_topic(self, topic: str) -> int:
        """Clear all events for a topic. Returns count of removed events."""
        filepath = self._topic_file(topic)
        if not filepath.exists():
            return 0
        # Count with the handle closed promptly; skip blank lines so the
        # count matches what read_topic() would have returned.
        with open(filepath, "r") as f:
            count = sum(1 for line in f if line.strip())
        filepath.unlink()
        return count

    def clear_all(self) -> int:
        """Clear all events. Returns count of removed events."""
        total = 0
        for f in self.events_dir.glob("*.jsonl"):
            with open(f, "r") as fh:
                total += sum(1 for line in fh if line.strip())
            f.unlink()
        return total


# ---------------------------------------------------------------------------
# Redis Backend (optional)
# ---------------------------------------------------------------------------

class RedisEventStore:
    """
    Redis-based event store using pub/sub.

    Requires the `redis` package. Falls back gracefully if not available.
    """

    def __init__(self, host: str = "localhost", port: int = 6379, db: int = 0):
        try:
            import redis
            self._redis = redis.Redis(host=host, port=port, db=db, decode_responses=True)
            self._redis.ping()
        except ImportError:
            raise ImportError("redis package is required for RedisEventStore. Install with: pip install redis")
        except Exception as e:
            raise ConnectionError(f"Cannot connect to Redis at {host}:{port}: {e}") from e

        self._pubsub = self._redis.pubsub()
        self._callbacks: Dict[str, List[Callable]] = {}
        self._listener_thread: Optional[threading.Thread] = None
        self._running = False

    def publish(self, event: Event) -> None:
        """Publish event to Redis channel."""
        channel = f"eventbus:{event.topic}"
        self._redis.publish(channel, event.to_json())
        # Also store in a list for history
        self._redis.rpush(f"eventbus:history:{event.topic}", event.to_json())

    def subscribe(self, topic: str, callback: Callable[[Event], None]) -> None:
        """Subscribe to a topic with a callback."""
        if topic not in self._callbacks:
            self._callbacks[topic] = []
        self._callbacks[topic].append(callback)
        self._pubsub.subscribe(f"eventbus:{topic}")
        self._ensure_listener()

    def _ensure_listener(self):
        """Start the listener thread if not running."""
        if self._listener_thread is None or not self._listener_thread.is_alive():
            self._running = True
            self._listener_thread = threading.Thread(target=self._listen, daemon=True)
            self._listener_thread.start()

    def _listen(self):
        """Listen for messages from Redis pub/sub."""
        while self._running:
            message = self._pubsub.get_message(timeout=1.0)
            if message and message["type"] == "message":
                channel = message["channel"]
                topic = channel.replace("eventbus:", "", 1)
                try:
                    event = Event.from_json(message["data"])
                    for cb in self._callbacks.get(topic, []):
                        try:
                            cb(event)
                        except Exception as e:
                            logger.error("Callback error for topic %s: %s", topic, e)
                except Exception as e:
                    logger.error("Failed to process message: %s", e)

    def read_topic(
        self,
        topic: str,
        since: Optional[datetime] = None,
        limit: Optional[int] = None,
    ) -> List[Event]:
        """Read event history from Redis."""
        raw = self._redis.lrange(f"eventbus:history:{topic}", 0, -1)
        events = []
        for line in raw:
            try:
                event = Event.from_json(line)
                if since and event.timestamp < since:
                    continue
                events.append(event)
            except Exception:
                continue
        if limit is not None:
            events = events[:limit]
        return events

    def close(self):
        """Clean up resources."""
        self._running = False
        if self._listener_thread:
            self._listener_thread.join(timeout=2)
        self._pubsub.close()


# ---------------------------------------------------------------------------
# EventBus (Main API)
# ---------------------------------------------------------------------------

class EventBus:
    """
    High-level event bus API for inter-archon communication.

    Supports both file-based and Redis backends.

    Usage:
        bus = EventBus(events_dir="/var/run/electra/events")

        # Publish
        event = bus.publish("task.completed", {"task_id": "abc"}, source="fenrir")

        # Subscribe
        def on_task_completed(event):
            print(f"Task completed: {event.payload}")

        bus.subscribe("task.completed", on_task_completed)

        # Query history
        recent = bus.query_history("task.completed", since=some_timestamp)

        # Replay
        bus.replay("task.completed", since=some_timestamp, callback=on_task_completed)
    """

    def __init__(
        self,
        events_dir: str | Path = "/tmp/electra-events",
        redis_host: Optional[str] = None,
        redis_port: int = 6379,
    ):
        """
        Initialize the event bus.

        Args:
            events_dir: Directory for file-based event storage
            redis_host: Redis host for pub/sub (None = file-based only)
            redis_port: Redis port (default 6379)
        """
        self.events_dir = Path(events_dir)
        self.store = FileEventStore(self.events_dir)

        self._redis_store: Optional[RedisEventStore] = None
        if redis_host:
            try:
                self._redis_store = RedisEventStore(host=redis_host, port=redis_port)
                logger.info("Redis backend connected at %s:%d", redis_host, redis_port)
            except (ImportError, ConnectionError) as e:
                logger.warning("Redis unavailable (%s), using file-based only", e)

        self._subscriptions: Dict[str, List[Callable[[Event], None]]] = {}
        self._polling_threads: Dict[str, threading.Thread] = {}
        self._polling_stop_events: Dict[str, threading.Event] = {}
        self._seen_ids: Set[str] = set()
        self._lock = threading.Lock()

    def publish(
        self,
        topic: str,
        payload: Dict[str, Any],
        source: str,
        correlation_id: Optional[str] = None,
        metadata: Optional[Dict[str, Any]] = None,
    ) -> Event:
        """
        Publish an event to the bus.

        Args:
            topic: Event topic/channel name
            payload: JSON-serializable payload
            source: Source archon identifier
            correlation_id: Optional correlation ID
            metadata: Optional metadata

        Returns:
            The published Event
        """
        event = Event.create(
            topic=topic,
            payload=payload,
            source=source,
            correlation_id=correlation_id,
            metadata=metadata,
        )

        # Always write to file store
        self.store.append(event)

        # Also publish to Redis if available
        if self._redis_store:
            try:
                self._redis_store.publish(event)
            except Exception as e:
                logger.error("Redis publish failed: %s", e)

        logger.debug("Published event %s to topic %s", event.id, topic)
        return event

    def subscribe(
        self,
        topic: str,
        callback: Callable[[Event], None],
        poll_interval: float = 0.5,
    ) -> Callable[[], None]:
        """
        Subscribe to a topic with a callback.

        For file-based mode, uses polling. For Redis mode, uses pub/sub.

        Args:
            topic: Topic to subscribe to
            callback: Function called with each new Event
            poll_interval: Polling interval in seconds (file-based only)

        Returns:
            Unsubscribe function (call to stop receiving events)
        """
        if topic not in self._subscriptions:
            self._subscriptions[topic] = []
        self._subscriptions[topic].append(callback)

        # Subscribe via Redis if available
        if self._redis_store:
            try:
                self._redis_store.subscribe(topic, callback)
            except Exception as e:
                logger.error("Redis subscribe failed: %s", e)

        # Start file polling for this topic
        self._start_polling(topic, poll_interval)

        # Return unsubscribe function
        def unsubscribe():
            with self._lock:
                if topic in self._subscriptions and callback in self._subscriptions[topic]:
                    self._subscriptions[topic].remove(callback)
            # Stop polling if no more callbacks; done outside the lock so the
            # polling thread can finish its loop iteration and be joined.
            if not self._subscriptions.get(topic):
                self._stop_polling(topic)

        return unsubscribe

    def _start_polling(self, topic: str, poll_interval: float):
        """Start a polling thread for a topic."""
        if topic in self._polling_threads and self._polling_threads[topic].is_alive():
            return

        stop_event = threading.Event()
        self._polling_stop_events[topic] = stop_event

        def poll_loop():
            filepath = self.store._topic_file(topic)
            # Track file position so only newly appended lines are read
            last_pos = 0
            if filepath.exists():
                last_pos = filepath.stat().st_size

            while not stop_event.is_set():
                try:
                    if filepath.exists():
                        current_size = filepath.stat().st_size
                        if current_size > last_pos:
                            with open(filepath, "r") as f:
                                f.seek(last_pos)
                                for line in f:
                                    line = line.strip()
                                    if not line:
                                        continue
                                    try:
                                        event = Event.from_json(line)
                                    except Exception:
                                        continue
                                    # Deduplicate under the lock, but invoke
                                    # callbacks outside it so a callback that
                                    # unsubscribes cannot deadlock.
                                    with self._lock:
                                        if event.id in self._seen_ids:
                                            continue
                                        self._seen_ids.add(event.id)
                                        callbacks = list(self._subscriptions.get(topic, []))
                                    for cb in callbacks:
                                        try:
                                            cb(event)
                                        except Exception as e:
                                            logger.error("Callback error: %s", e)
                                last_pos = f.tell()
                except Exception as e:
                    logger.error("Polling error for topic %s: %s", topic, e)

                stop_event.wait(poll_interval)

        thread = threading.Thread(target=poll_loop, daemon=True, name=f"poll-{topic}")
        self._polling_threads[topic] = thread
        thread.start()

    def _stop_polling(self, topic: str):
        """Stop the polling thread for a topic."""
        if topic in self._polling_stop_events:
            self._polling_stop_events[topic].set()
        if topic in self._polling_threads:
            self._polling_threads[topic].join(timeout=2)
            del self._polling_threads[topic]
            del self._polling_stop_events[topic]

    def query_history(
        self,
        topic: Optional[str] = None,
        topics: Optional[List[str]] = None,
        since: Optional[datetime] = None,
        source: Optional[str] = None,
        limit: Optional[int] = None,
    ) -> List[Event]:
        """
        Query event history with optional filters.

        Args:
            topic: Single topic to query
            topics: List of topics to query
            since: Only events after this timestamp
            source: Filter by source archon
            limit: Max events to return

        Returns:
            List of matching events, sorted by timestamp
        """
        if topic:
            topics = [topic]
        return self.store.read_all(topics=topics, since=since, source=source, limit=limit)

    def replay(
        self,
        topic: str,
        since: datetime,
        callback: Callable[[Event], None],
        speed: float = 1.0,
    ) -> int:
        """
        Replay historical events from a given timestamp.

        Args:
            topic: Topic to replay
            since: Replay events from this timestamp onward
            callback: Function called for each replayed event
            speed: Playback speed multiplier (1.0 = real-time)

        Returns:
            Number of events replayed
        """
        events = self.store.read_topic(topic, since=since)
        if not events:
            return 0

        count = 0
        prev_ts: Optional[datetime] = None

        for event in events:
            # Simulate real-time pacing
            if prev_ts and speed > 0:
                delta = (event.timestamp - prev_ts).total_seconds()
                if delta > 0:
                    time.sleep(delta / speed)

            try:
                callback(event)
                count += 1
            except Exception as e:
                logger.error("Replay callback error: %s", e)

            prev_ts = event.timestamp

        return count

    def list_topics(self) -> List[str]:
        """List all available topics."""
        return self.store.list_topics()

    def close(self):
        """Clean up resources."""
        # Stop all polling threads
        for topic in list(self._polling_threads.keys()):
            self._stop_polling(topic)

        # Close Redis if active
        if self._redis_store:
            self._redis_store.close()


# ---------------------------------------------------------------------------
# Convenience Functions
# ---------------------------------------------------------------------------

def create_event_bus(
    events_dir: str | Path = "/tmp/electra-events",
    redis_host: Optional[str] = None,
) -> EventBus:
    """Create and return a configured EventBus instance."""
    return EventBus(events_dir=events_dir, redis_host=redis_host)


# ---------------------------------------------------------------------------
# CLI Entry Point (also available as standalone cli/event_bus_cli.py)
# ---------------------------------------------------------------------------

def main():
    """CLI entry point for the event bus."""
    import argparse
    import sys

    parser = argparse.ArgumentParser(
        description="Electra Archon Event Bus CLI",
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog="""
Examples:
  # Publish an event
  python -m models.event_bus publish --topic task.completed \\
      --payload '{"task_id": "abc"}' --source fenrir

  # Subscribe and listen for events
  python -m models.event_bus subscribe --topic task.completed

  # Query event history
  python -m models.event_bus history --topic task.completed --limit 10

  # Replay events since a timestamp
  python -m models.event_bus replay --topic task.completed \\
      --since 2026-04-16T00:00:00Z

  # List all topics
  python -m models.event_bus topics
""",
    )

    parser.add_argument(
        "--events-dir",
        default="/tmp/electra-events",
        help="Events directory (default: /tmp/electra-events)",
    )
    parser.add_argument(
        "--redis",
        default=None,
        help="Redis host for pub/sub (optional)",
    )

    subparsers = parser.add_subparsers(dest="command", help="Command")

    # publish
    pub_parser = subparsers.add_parser("publish", help="Publish an event")
    pub_parser.add_argument("--topic", required=True, help="Event topic")
    pub_parser.add_argument("--payload", required=True, help="JSON payload string")
    pub_parser.add_argument("--source", required=True, help="Source archon name")
    pub_parser.add_argument("--correlation-id", default=None, help="Correlation ID")

    # subscribe
    sub_parser = subparsers.add_parser("subscribe", help="Subscribe to a topic")
    sub_parser.add_argument("--topic", required=True, help="Topic to subscribe to")
    sub_parser.add_argument("--interval", type=float, default=0.5, help="Poll interval (s)")

    # history
    hist_parser = subparsers.add_parser("history", help="Query event history")
    hist_parser.add_argument("--topic", default=None, help="Filter by topic")
    hist_parser.add_argument("--source", default=None, help="Filter by source")
    hist_parser.add_argument("--since", default=None, help="ISO timestamp filter")
    hist_parser.add_argument("--limit", type=int, default=None, help="Max events")
    hist_parser.add_argument("--json", action="store_true", help="Output as JSON array")

    # replay
    replay_parser = subparsers.add_parser("replay", help="Replay events")
    replay_parser.add_argument("--topic", required=True, help="Topic to replay")
    replay_parser.add_argument("--since", required=True, help="Replay from ISO timestamp")
    replay_parser.add_argument("--speed", type=float, default=1.0, help="Playback speed")

    # topics
    subparsers.add_parser("topics", help="List all topics")

    # clear
    clear_parser = subparsers.add_parser("clear", help="Clear events")
    clear_parser.add_argument("--topic", default=None, help="Topic to clear (all if omitted)")
    clear_parser.add_argument("--confirm", action="store_true", help="Skip confirmation")

    args = parser.parse_args()

    if not args.command:
        parser.print_help()
        sys.exit(1)

    bus = EventBus(events_dir=args.events_dir, redis_host=args.redis)

    try:
        if args.command == "publish":
            payload = json.loads(args.payload)
            event = bus.publish(
                topic=args.topic,
                payload=payload,
                source=args.source,
                correlation_id=args.correlation_id,
            )
            print(f"Published event {event.id} to topic {args.topic}")

        elif args.command == "subscribe":
            print(f"Listening on topic: {args.topic} (Ctrl+C to stop)")

            def on_event(e: Event):
                print(e.to_json())
                sys.stdout.flush()

            bus.subscribe(args.topic, on_event, poll_interval=args.interval)
            try:
                while True:
                    time.sleep(1)
            except KeyboardInterrupt:
                print("\nUnsubscribed.")

        elif args.command == "history":
            since = None
            if args.since:
                since = datetime.fromisoformat(args.since.replace("Z", "+00:00"))

            events = bus.query_history(
                topic=args.topic,
                since=since,
                source=args.source,
                limit=args.limit,
            )

            if args.json:
                print(json.dumps([e.to_dict() for e in events], indent=2, default=str))
            else:
                for e in events:
                    print(f"[{e.timestamp.isoformat()}] {e.topic} from {e.source}: {json.dumps(e.payload)}")

        elif args.command == "replay":
            since = datetime.fromisoformat(args.since.replace("Z", "+00:00"))

            def on_replay(e: Event):
                print(f"[REPLAY] {e.to_json()}")

            count = bus.replay(args.topic, since, on_replay, speed=args.speed)
            print(f"\nReplayed {count} events.")

        elif args.command == "topics":
            topics = bus.list_topics()
            if topics:
                for t in topics:
                    print(t)
            else:
                print("No topics found.")

        elif args.command == "clear":
            if args.topic:
                if not args.confirm:
                    resp = input(f"Clear all events for topic '{args.topic}'? [y/N] ")
                    if resp.lower() != "y":
                        print("Aborted.")
                        return
                count = bus.store.clear_topic(args.topic)
                print(f"Cleared {count} events from topic '{args.topic}'.")
            else:
                if not args.confirm:
                    resp = input("Clear ALL events? [y/N] ")
                    if resp.lower() != "y":
                        print("Aborted.")
                        return
                count = bus.store.clear_all()
                print(f"Cleared {count} total events.")
    finally:
        bus.close()


if __name__ == "__main__":
    main()
665 tests/test_event_bus.py Normal file
@@ -0,0 +1,665 @@
"""
|
||||
Tests for Event Bus - Inter-Archon Communication
|
||||
|
||||
Tests cover:
|
||||
- Event creation, serialization, and validation
|
||||
- File-based event store operations
|
||||
- EventBus publish/subscribe/query/replay
|
||||
- Topic listing and filtering
|
||||
- Edge cases and error handling
|
||||
"""
|
||||
|
||||
import json
|
||||
import os
|
||||
import sys
|
||||
import time
|
||||
import uuid
|
||||
import shutil
|
||||
import tempfile
|
||||
import threading
|
||||
from datetime import datetime, timezone, timedelta
|
||||
from pathlib import Path
|
||||
|
||||
import pytest
|
||||
|
||||
# Add parent to path
|
||||
sys.path.insert(0, str(Path(__file__).parent.parent))
|
||||
|
||||
from models.event_bus import (
|
||||
Event,
|
||||
FileEventStore,
|
||||
EventBus,
|
||||
)
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Fixtures
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
@pytest.fixture
|
||||
def tmp_events_dir(tmp_path):
|
||||
"""Provide a temporary events directory."""
|
||||
events_dir = tmp_path / "events"
|
||||
events_dir.mkdir()
|
||||
return events_dir
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def store(tmp_events_dir):
|
||||
"""Provide a FileEventStore with a temp directory."""
|
||||
return FileEventStore(tmp_events_dir)
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def bus(tmp_events_dir):
|
||||
"""Provide an EventBus with a temp directory."""
|
||||
b = EventBus(events_dir=tmp_events_dir)
|
||||
yield b
|
||||
b.close()
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
# Event Data Model Tests
# ---------------------------------------------------------------------------

class TestEvent:
    """Tests for the Event dataclass."""

    def test_create_event(self):
        """Test creating an event with the factory method."""
        event = Event.create(
            topic="task.completed",
            payload={"task_id": "abc123", "status": "success"},
            source="fenrir",
        )

        assert event.topic == "task.completed"
        assert event.payload["task_id"] == "abc123"
        assert event.source == "fenrir"
        assert event.correlation_id is None
        uuid.UUID(event.id)  # Should not raise
        assert isinstance(event.timestamp, datetime)

    def test_create_with_correlation_id(self):
        """Test event with correlation ID."""
        corr_id = str(uuid.uuid4())
        event = Event.create(
            topic="request.received",
            payload={},
            source="electra",
            correlation_id=corr_id,
        )
        assert event.correlation_id == corr_id

    def test_create_with_metadata(self):
        """Test event with metadata."""
        event = Event.create(
            topic="log.entry",
            payload={"level": "info"},
            source="hermes",
            metadata={"priority": "low", "retry_count": 0},
        )
        assert event.metadata["priority"] == "low"
        assert event.metadata["retry_count"] == 0

    def test_invalid_id_raises(self):
        """Test that invalid UUID raises ValueError."""
        with pytest.raises(ValueError, match="Invalid event ID"):
            Event(
                id="not-a-uuid",
                topic="test",
                payload={},
                source="test",
                timestamp=datetime.now(timezone.utc),
            )

    def test_empty_topic_raises(self):
        """Test that empty topic raises ValueError."""
        with pytest.raises(ValueError, match="Topic must be a non-empty string"):
            Event(
                id=str(uuid.uuid4()),
                topic="",
                payload={},
                source="test",
                timestamp=datetime.now(timezone.utc),
            )

    def test_to_dict(self):
        """Test serialization to dictionary."""
        event = Event.create(topic="test.topic", payload={"key": "value"}, source="src")
        d = event.to_dict()
        assert d["topic"] == "test.topic"
        assert d["payload"] == {"key": "value"}
        assert d["source"] == "src"
        assert "timestamp" in d
        assert "id" in d

    def test_to_json(self):
        """Test serialization to JSON line."""
        event = Event.create(topic="test", payload={"n": 42}, source="src")
        json_str = event.to_json()
        parsed = json.loads(json_str)
        assert parsed["topic"] == "test"
        assert parsed["payload"]["n"] == 42

    def test_from_dict(self):
        """Test deserialization from dictionary."""
        data = {
            "id": str(uuid.uuid4()),
            "topic": "rebuild.started",
            "payload": {"service": "api"},
            "source": "ezra",
            "timestamp": "2026-04-16T12:00:00+00:00",
            "correlation_id": str(uuid.uuid4()),
            "metadata": {"env": "production"},
        }
        event = Event.from_dict(data)
        assert event.topic == "rebuild.started"
        assert event.source == "ezra"
        assert event.payload["service"] == "api"
        assert event.metadata["env"] == "production"
        assert event.timestamp.year == 2026

    def test_from_json(self):
        """Test deserialization from JSON line."""
        original = Event.create(topic="deploy", payload={"version": "1.2.3"}, source="timmy")
        json_str = original.to_json()
        restored = Event.from_json(json_str)
        assert restored.id == original.id
        assert restored.topic == original.topic
        assert restored.payload == original.payload

    def test_roundtrip(self):
        """Test full serialization/deserialization roundtrip."""
        original = Event.create(
            topic="metric.cpu",
            payload={"percent": 85.5, "host": "node-1"},
            source="monitor",
            correlation_id=str(uuid.uuid4()),
            metadata={"unit": "percent"},
        )
        restored = Event.from_json(original.to_json())
        assert restored == original


# ---------------------------------------------------------------------------
# FileEventStore Tests
# ---------------------------------------------------------------------------

class TestFileEventStore:
    """Tests for FileEventStore."""

    def test_append_and_read(self, store):
        """Test writing and reading events."""
        event = Event.create(topic="test.topic", payload={"x": 1}, source="src")
        store.append(event)

        events = store.read_topic("test.topic")
        assert len(events) == 1
        assert events[0].id == event.id
        assert events[0].payload["x"] == 1

    def test_append_multiple(self, store):
        """Test appending multiple events to same topic."""
        for i in range(5):
            event = Event.create(
                topic="batch.test",
                payload={"index": i},
                source="src",
            )
            store.append(event)

        events = store.read_topic("batch.test")
        assert len(events) == 5
        assert events[0].payload["index"] == 0
        assert events[4].payload["index"] == 4

    def test_read_empty_topic(self, store):
        """Test reading a non-existent topic."""
        events = store.read_topic("nonexistent")
        assert events == []

    def test_read_with_since_filter(self, store):
        """Test time-based filtering."""
        base_time = datetime(2026, 4, 16, 12, 0, 0, tzinfo=timezone.utc)

        for i in range(3):
            event = Event(
                id=str(uuid.uuid4()),
                topic="timed",
                payload={"i": i},
                source="src",
                timestamp=base_time + timedelta(minutes=i),
            )
            store.append(event)

        since = base_time + timedelta(minutes=1)
        events = store.read_topic("timed", since=since)
        assert len(events) == 2
        assert events[0].payload["i"] == 1
        assert events[1].payload["i"] == 2

    def test_read_with_limit(self, store):
        """Test limit parameter."""
        for i in range(10):
            store.append(Event.create(topic="many", payload={"i": i}, source="src"))

        events = store.read_topic("many", limit=3)
        assert len(events) == 3

    def test_read_all_across_topics(self, store):
        """Test reading across multiple topics."""
        store.append(Event.create(topic="topic_a", payload={}, source="a"))
        store.append(Event.create(topic="topic_b", payload={}, source="b"))
        store.append(Event.create(topic="topic_a", payload={}, source="a"))

        events = store.read_all()
        assert len(events) == 3

    def test_read_all_filter_by_topics(self, store):
        """Test reading with topic list filter."""
        store.append(Event.create(topic="alpha", payload={}, source="a"))
        store.append(Event.create(topic="beta", payload={}, source="b"))
        store.append(Event.create(topic="gamma", payload={}, source="c"))

        events = store.read_all(topics=["alpha", "gamma"])
        assert len(events) == 2
        topics_found = {e.topic for e in events}
        assert topics_found == {"alpha", "gamma"}

    def test_read_all_filter_by_source(self, store):
        """Test filtering by source archon."""
        store.append(Event.create(topic="t", payload={}, source="fenrir"))
        store.append(Event.create(topic="t", payload={}, source="ezra"))
        store.append(Event.create(topic="t", payload={}, source="fenrir"))

        events = store.read_all(source="ezra")
        assert len(events) == 1
        assert events[0].source == "ezra"

    def test_list_topics(self, store):
        """Test topic listing."""
        store.append(Event.create(topic="deploy", payload={}, source="s"))
        store.append(Event.create(topic="build", payload={}, source="s"))
        store.append(Event.create(topic="deploy", payload={}, source="s"))

        topics = store.list_topics()
        assert sorted(topics) == ["build", "deploy"]

    def test_clear_topic(self, store):
        """Test clearing a single topic."""
        store.append(Event.create(topic="clear_me", payload={}, source="s"))
        store.append(Event.create(topic="keep_me", payload={}, source="s"))

        count = store.clear_topic("clear_me")
        assert count == 1
        assert store.read_topic("clear_me") == []
        assert len(store.read_topic("keep_me")) == 1

    def test_clear_all(self, store):
        """Test clearing all events."""
        store.append(Event.create(topic="a", payload={}, source="s"))
        store.append(Event.create(topic="b", payload={}, source="s"))
        store.append(Event.create(topic="c", payload={}, source="s"))

        count = store.clear_all()
        assert count == 3
        assert store.read_all() == []

    def test_clear_nonexistent_topic(self, store):
        """Test clearing a topic that doesn't exist."""
        count = store.clear_topic("does_not_exist")
        assert count == 0

    def test_events_sorted_by_timestamp(self, store):
        """Test that read_all returns events sorted by timestamp."""
        base = datetime(2026, 4, 16, 10, 0, 0, tzinfo=timezone.utc)

        # Insert out of order
        store.append(Event(
            id=str(uuid.uuid4()), topic="t", payload={"seq": 2},
            source="s", timestamp=base + timedelta(minutes=2),
        ))
        store.append(Event(
            id=str(uuid.uuid4()), topic="t", payload={"seq": 0},
            source="s", timestamp=base,
        ))
        store.append(Event(
            id=str(uuid.uuid4()), topic="t", payload={"seq": 1},
            source="s", timestamp=base + timedelta(minutes=1),
        ))

        events = store.read_all()
        assert [e.payload["seq"] for e in events] == [0, 1, 2]


# ---------------------------------------------------------------------------
# EventBus Integration Tests
# ---------------------------------------------------------------------------

class TestEventBus:
    """Tests for the EventBus high-level API."""

    def test_publish_and_query(self, bus):
        """Test publishing and querying events."""
        bus.publish(
            topic="task.completed",
            payload={"task_id": "abc", "result": "ok"},
            source="fenrir",
        )

        events = bus.query_history(topic="task.completed")
        assert len(events) == 1
        assert events[0].source == "fenrir"
        assert events[0].payload["task_id"] == "abc"

    def test_publish_returns_event(self, bus):
        """Test that publish returns the created event."""
        event = bus.publish(
            topic="heartbeat",
            payload={"status": "alive"},
            source="electra",
        )
        assert event.topic == "heartbeat"
        uuid.UUID(event.id)  # valid UUID

    def test_subscribe_receives_events(self, bus):
        """Test that subscription callback receives published events."""
        received = []

        def on_event(e):
            received.append(e)

        bus.subscribe("notifications", on_event, poll_interval=0.1)
        time.sleep(0.2)  # Let polling start

        bus.publish("notifications", {"msg": "hello"}, source="test")
        time.sleep(1.0)  # Wait for poll to pick up

        assert len(received) == 1
        assert received[0].payload["msg"] == "hello"

    def test_subscribe_multiple_callbacks(self, bus):
        """Test multiple callbacks on the same topic."""
        received_a = []
        received_b = []

        bus.subscribe("multi", lambda e: received_a.append(e), poll_interval=0.1)
        bus.subscribe("multi", lambda e: received_b.append(e), poll_interval=0.1)
        time.sleep(0.2)

        bus.publish("multi", {"v": 1}, source="s")
        time.sleep(1.0)

        assert len(received_a) == 1
        assert len(received_b) == 1

    def test_unsubscribe(self, bus):
        """Test unsubscribing stops event delivery."""
        received = []

        unsub = bus.subscribe("unsub_test", lambda e: received.append(e), poll_interval=0.1)
        time.sleep(0.2)

        bus.publish("unsub_test", {"before": True}, source="s")
        time.sleep(1.0)

        assert len(received) == 1

        unsub()

        bus.publish("unsub_test", {"after": True}, source="s")
        time.sleep(1.0)

        # Should still be 1 since we unsubscribed
        assert len(received) == 1

    def test_query_history_with_filters(self, bus):
        """Test query_history with various filters."""
        bus.publish("deploy", {"env": "staging"}, source="fenrir")
        bus.publish("deploy", {"env": "production"}, source="ezra")
        bus.publish("build", {"target": "main"}, source="fenrir")

        # By topic
        deploys = bus.query_history(topic="deploy")
        assert len(deploys) == 2

        # By source
        fenrir_events = bus.query_history(source="fenrir")
        assert len(fenrir_events) == 2

        # Combined
        fenrir_deploys = bus.query_history(topic="deploy", source="fenrir")
        assert len(fenrir_deploys) == 1

    def test_query_history_multiple_topics(self, bus):
        """Test query_history with topics list."""
        bus.publish("alpha", {}, source="s")
        bus.publish("beta", {}, source="s")
        bus.publish("gamma", {}, source="s")

        events = bus.query_history(topics=["alpha", "gamma"])
        assert len(events) == 2

    def test_list_topics(self, bus):
        """Test listing topics via EventBus."""
        bus.publish("build", {}, source="s")
        bus.publish("deploy", {}, source="s")
        bus.publish("build", {}, source="s")

        topics = bus.list_topics()
        assert sorted(topics) == ["build", "deploy"]

    def test_replay_events(self, bus):
        """Test replaying historical events."""
        base_time = datetime.now(timezone.utc) - timedelta(minutes=5)

        # Manually create events with specific timestamps
        for i in range(3):
            event = Event(
                id=str(uuid.uuid4()),
                topic="replay_test",
                payload={"seq": i},
                source="test",
                timestamp=base_time + timedelta(seconds=i * 10),
            )
            bus.store.append(event)

        replayed = []
        since = base_time + timedelta(seconds=5)

        count = bus.replay(
            "replay_test",
            since=since,
            callback=lambda e: replayed.append(e),
            speed=100.0,  # Fast replay
        )

        assert count == 2
        assert replayed[0].payload["seq"] == 1
        assert replayed[1].payload["seq"] == 2

    def test_replay_empty_topic(self, bus):
        """Test replay on a topic with no events."""
        count = bus.replay("empty", since=datetime.now(timezone.utc), callback=lambda e: None)
        assert count == 0

    def test_replay_callback_error_continues(self, bus):
        """Test that errors in replay callback don't stop replay."""
        bus.publish("resilient", {"n": 1}, source="s")
        bus.publish("resilient", {"n": 2}, source="s")
        bus.publish("resilient", {"n": 3}, source="s")

        received = []

        def flaky_callback(e):
            if e.payload["n"] == 2:
                raise ValueError("Intentional error")
            received.append(e)

        since = datetime.now(timezone.utc) - timedelta(seconds=1)
        count = bus.replay("resilient", since=since, callback=flaky_callback, speed=100.0)

        # 3 events attempted, 2 succeeded
        assert count == 2
        assert len(received) == 2

    def test_high_volume_publish(self, bus):
        """Test publishing many events."""
        n = 100
        for i in range(n):
            bus.publish("volume_test", {"index": i}, source="bench")

        events = bus.query_history(topic="volume_test")
        assert len(events) == n

    def test_concurrent_publish(self, bus):
        """Test concurrent publishing from multiple threads."""
        results = []
        errors = []

        def publisher(thread_id):
            try:
                for i in range(10):
                    event = bus.publish(
                        "concurrent",
                        {"thread": thread_id, "seq": i},
                        source=f"thread-{thread_id}",
                    )
                    results.append(event.id)
            except Exception as e:
                errors.append(e)

        threads = [threading.Thread(target=publisher, args=(i,)) for i in range(5)]
        for t in threads:
            t.start()
        for t in threads:
            t.join()

        assert len(errors) == 0
        assert len(results) == 50

        events = bus.query_history(topic="concurrent")
        assert len(events) == 50

    def test_event_with_nested_payload(self, bus):
        """Test events with complex nested payloads."""
        complex_payload = {
            "user": {"id": 123, "name": "Alice"},
            "items": [{"sku": "A1"}, {"sku": "B2"}],
            "metadata": {"priority": "high", "tags": ["urgent", "billing"]},
        }
        bus.publish("order.placed", complex_payload, source="gateway")

        retrieved = bus.query_history(topic="order.placed")
        assert len(retrieved) == 1
        assert retrieved[0].payload == complex_payload

    def test_special_characters_in_topic(self, bus):
        """Test topics with special characters."""
        bus.publish("service/api.health", {"ok": True}, source="mon")
        bus.publish("user-events", {"type": "login"}, source="auth")

        assert len(bus.query_history(topic="service/api.health")) == 1
        assert len(bus.query_history(topic="user-events")) == 1

    def test_empty_payload(self, bus):
        """Test events with empty payload."""
        event = bus.publish("heartbeat", {}, source="watchdog")
        assert event.payload == {}
        retrieved = bus.query_history(topic="heartbeat")
        assert retrieved[0].payload == {}


# ---------------------------------------------------------------------------
# Event Schema/Format Tests
# ---------------------------------------------------------------------------

class TestEventFormat:
    """Tests for event serialization format."""

    def test_jsonl_format(self, store):
        """Test that events are stored as valid JSON lines."""
        store.append(Event.create(topic="fmt", payload={"a": 1}, source="s"))
        store.append(Event.create(topic="fmt", payload={"b": 2}, source="s"))

        filepath = store._topic_file("fmt")
        with open(filepath, "r") as f:
            lines = [line.strip() for line in f if line.strip()]

        assert len(lines) == 2
        for line in lines:
            parsed = json.loads(line)  # Should not raise
            assert "id" in parsed
            assert "topic" in parsed
            assert "timestamp" in parsed

    def test_unicode_payload(self, bus):
        """Test events with unicode content."""
        bus.publish(
            "i18n",
            {"greeting": "Hello 世界 🌍", "emoji": "🎉"},
            source="localizer",
        )
        retrieved = bus.query_history(topic="i18n")
        assert retrieved[0].payload["greeting"] == "Hello 世界 🌍"
        assert retrieved[0].payload["emoji"] == "🎉"


# ---------------------------------------------------------------------------
# Edge Case Tests
# ---------------------------------------------------------------------------

class TestEdgeCases:
    """Tests for edge cases and error handling."""

    def test_empty_events_dir_created(self, tmp_path):
        """Test that events directory is created if it doesn't exist."""
        new_dir = tmp_path / "new_events"
        bus = EventBus(events_dir=new_dir)
        assert new_dir.exists()
        bus.close()

    def test_malformed_json_line_skipped(self, store, tmp_events_dir):
        """Test that malformed lines are skipped during read."""
        filepath = store._topic_file("corrupted")
        with open(filepath, "w") as f:
            f.write('{"valid": true, "topic": "corrupted", "id": "' + str(uuid.uuid4()) + '", "source": "s", "timestamp": "2026-04-16T12:00:00+00:00", "payload": {}}\n')
            f.write("NOT JSON\n")
            f.write('{"also_valid": true, "topic": "corrupted", "id": "' + str(uuid.uuid4()) + '", "source": "s", "timestamp": "2026-04-16T12:01:00+00:00", "payload": {}}\n')

        events = store.read_topic("corrupted")
        # Should get the 2 valid events, skip the bad line
        assert len(events) == 2

    def test_very_long_payload(self, bus):
        """Test events with large payloads."""
        large_payload = {"data": "x" * 10000, "items": list(range(1000))}
        bus.publish("large", large_payload, source="stress")
        retrieved = bus.query_history(topic="large")
        assert len(retrieved[0].payload["data"]) == 10000
        assert len(retrieved[0].payload["items"]) == 1000

    def test_concurrent_subscribe_unsubscribe(self, bus):
        """Test concurrent subscribe/unsubscribe operations."""
        received = []

        def subscriber():
            unsub = bus.subscribe("race", lambda e: received.append(e), poll_interval=0.05)
            time.sleep(0.5)
            unsub()

        threads = [threading.Thread(target=subscriber) for _ in range(3)]
        for t in threads:
            t.start()

        time.sleep(0.2)
        for i in range(5):
            bus.publish("race", {"i": i}, source="s")

        for t in threads:
            t.join()

        # Should have received events without crashing
        assert len(received) >= 0  # May vary due to timing