forked from Rockachopa/Timmy-time-dashboard
1
src/loop/__init__.py
Normal file
1
src/loop/__init__.py
Normal file
@@ -0,0 +1 @@
|
||||
"""Three-phase agent loop: Gather → Reason → Act."""
|
||||
37
src/loop/phase1_gather.py
Normal file
37
src/loop/phase1_gather.py
Normal file
@@ -0,0 +1,37 @@
|
||||
"""Phase 1 — Gather: accept raw input, produce structured context.
|
||||
|
||||
This is the sensory phase. It receives a raw ContextPayload and enriches
|
||||
it with whatever context Timmy needs before reasoning. In the stub form,
|
||||
it simply passes the payload through with a phase marker.
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import logging
|
||||
|
||||
from loop.schema import ContextPayload
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
def gather(payload: ContextPayload) -> ContextPayload:
|
||||
"""Accept raw input and return structured context for reasoning.
|
||||
|
||||
Stub: tags the payload with phase=gather and logs transit.
|
||||
Timmy will flesh this out with context selection, memory lookup,
|
||||
adapter polling, and attention-residual weighting.
|
||||
"""
|
||||
logger.info(
|
||||
"Phase 1 (Gather) received: source=%s content_len=%d tokens=%d",
|
||||
payload.source,
|
||||
len(payload.content),
|
||||
payload.token_count,
|
||||
)
|
||||
|
||||
result = payload.with_metadata(phase="gather", gathered=True)
|
||||
|
||||
logger.info(
|
||||
"Phase 1 (Gather) produced: metadata_keys=%s",
|
||||
sorted(result.metadata.keys()),
|
||||
)
|
||||
return result
|
||||
36
src/loop/phase2_reason.py
Normal file
36
src/loop/phase2_reason.py
Normal file
@@ -0,0 +1,36 @@
|
||||
"""Phase 2 — Reason: accept gathered context, produce reasoning output.
|
||||
|
||||
This is the deliberation phase. It receives enriched context from Phase 1
|
||||
and decides what to do. In the stub form, it passes the payload through
|
||||
with a phase marker.
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import logging
|
||||
|
||||
from loop.schema import ContextPayload
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
def reason(payload: ContextPayload) -> ContextPayload:
|
||||
"""Accept gathered context and return a reasoning result.
|
||||
|
||||
Stub: tags the payload with phase=reason and logs transit.
|
||||
Timmy will flesh this out with LLM calls, confidence scoring,
|
||||
plan generation, and judgment logic.
|
||||
"""
|
||||
logger.info(
|
||||
"Phase 2 (Reason) received: source=%s gathered=%s",
|
||||
payload.source,
|
||||
payload.metadata.get("gathered", False),
|
||||
)
|
||||
|
||||
result = payload.with_metadata(phase="reason", reasoned=True)
|
||||
|
||||
logger.info(
|
||||
"Phase 2 (Reason) produced: metadata_keys=%s",
|
||||
sorted(result.metadata.keys()),
|
||||
)
|
||||
return result
|
||||
36
src/loop/phase3_act.py
Normal file
36
src/loop/phase3_act.py
Normal file
@@ -0,0 +1,36 @@
|
||||
"""Phase 3 — Act: accept reasoning output, execute and produce feedback.
|
||||
|
||||
This is the command phase. It receives the reasoning result from Phase 2
|
||||
and takes action. In the stub form, it passes the payload through with a
|
||||
phase marker and produces feedback for the next cycle.
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import logging
|
||||
|
||||
from loop.schema import ContextPayload
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
def act(payload: ContextPayload) -> ContextPayload:
|
||||
"""Accept reasoning result and return action output + feedback.
|
||||
|
||||
Stub: tags the payload with phase=act and logs transit.
|
||||
Timmy will flesh this out with tool execution, delegation,
|
||||
response generation, and feedback construction.
|
||||
"""
|
||||
logger.info(
|
||||
"Phase 3 (Act) received: source=%s reasoned=%s",
|
||||
payload.source,
|
||||
payload.metadata.get("reasoned", False),
|
||||
)
|
||||
|
||||
result = payload.with_metadata(phase="act", acted=True)
|
||||
|
||||
logger.info(
|
||||
"Phase 3 (Act) produced: metadata_keys=%s",
|
||||
sorted(result.metadata.keys()),
|
||||
)
|
||||
return result
|
||||
40
src/loop/runner.py
Normal file
40
src/loop/runner.py
Normal file
@@ -0,0 +1,40 @@
|
||||
"""Loop runner — orchestrates the three phases in sequence.
|
||||
|
||||
Runs Gather → Reason → Act as a single cycle, passing output from each
|
||||
phase as input to the next. The Act output feeds back as input to the
|
||||
next Gather call.
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import logging
|
||||
|
||||
from loop.phase1_gather import gather
|
||||
from loop.phase2_reason import reason
|
||||
from loop.phase3_act import act
|
||||
from loop.schema import ContextPayload
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
def run_cycle(payload: ContextPayload) -> ContextPayload:
|
||||
"""Execute one full Gather → Reason → Act cycle.
|
||||
|
||||
Returns the Act phase output, which can be fed back as input
|
||||
to the next cycle.
|
||||
"""
|
||||
logger.info("=== Loop cycle start: source=%s ===", payload.source)
|
||||
|
||||
gathered = gather(payload)
|
||||
reasoned = reason(gathered)
|
||||
acted = act(reasoned)
|
||||
|
||||
logger.info(
|
||||
"=== Loop cycle complete: phases=%s ===",
|
||||
[
|
||||
gathered.metadata.get("phase"),
|
||||
reasoned.metadata.get("phase"),
|
||||
acted.metadata.get("phase"),
|
||||
],
|
||||
)
|
||||
return acted
|
||||
43
src/loop/schema.py
Normal file
43
src/loop/schema.py
Normal file
@@ -0,0 +1,43 @@
|
||||
"""Data schema for the three-phase loop.
|
||||
|
||||
Each phase passes a ContextPayload forward. The schema is intentionally
|
||||
minimal — Timmy decides what fields matter as the loop matures.
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import logging
|
||||
from dataclasses import dataclass, field
|
||||
from datetime import UTC, datetime
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
@dataclass
|
||||
class ContextPayload:
|
||||
"""Immutable context packet passed between loop phases.
|
||||
|
||||
Attributes:
|
||||
source: Where this payload originated (e.g. "user", "timer", "event").
|
||||
content: The raw content string to process.
|
||||
timestamp: When the payload was created.
|
||||
token_count: Estimated token count for budget tracking. -1 = unknown.
|
||||
metadata: Arbitrary key-value pairs for phase-specific data.
|
||||
"""
|
||||
|
||||
source: str
|
||||
content: str
|
||||
timestamp: datetime = field(default_factory=lambda: datetime.now(UTC))
|
||||
token_count: int = -1
|
||||
metadata: dict = field(default_factory=dict)
|
||||
|
||||
def with_metadata(self, **kwargs: object) -> ContextPayload:
|
||||
"""Return a new payload with additional metadata merged in."""
|
||||
merged = {**self.metadata, **kwargs}
|
||||
return ContextPayload(
|
||||
source=self.source,
|
||||
content=self.content,
|
||||
timestamp=self.timestamp,
|
||||
token_count=self.token_count,
|
||||
metadata=merged,
|
||||
)
|
||||
0
tests/loop/__init__.py
Normal file
0
tests/loop/__init__.py
Normal file
133
tests/loop/test_three_phase.py
Normal file
133
tests/loop/test_three_phase.py
Normal file
@@ -0,0 +1,133 @@
|
||||
"""Tests for the three-phase loop scaffold.
|
||||
|
||||
Validates the acceptance criteria from issue #324:
|
||||
1. Loop accepts context payload as input to Phase 1
|
||||
2. Phase 1 output feeds into Phase 2 without manual intervention
|
||||
3. Phase 2 output feeds into Phase 3 without manual intervention
|
||||
4. Phase 3 output feeds back into Phase 1
|
||||
5. Full cycle completes without crash
|
||||
6. No state leaks between cycles
|
||||
7. Each phase logs what it received and what it produced
|
||||
"""
|
||||
|
||||
from datetime import datetime
|
||||
|
||||
from loop.phase1_gather import gather
|
||||
from loop.phase2_reason import reason
|
||||
from loop.phase3_act import act
|
||||
from loop.runner import run_cycle
|
||||
from loop.schema import ContextPayload
|
||||
|
||||
|
||||
def _make_payload(source: str = "test", content: str = "hello") -> ContextPayload:
|
||||
return ContextPayload(source=source, content=content, token_count=5)
|
||||
|
||||
|
||||
# --- Schema ---
|
||||
|
||||
|
||||
def test_context_payload_defaults():
|
||||
p = ContextPayload(source="user", content="hi")
|
||||
assert p.source == "user"
|
||||
assert p.content == "hi"
|
||||
assert p.token_count == -1
|
||||
assert p.metadata == {}
|
||||
assert isinstance(p.timestamp, datetime)
|
||||
|
||||
|
||||
def test_with_metadata_returns_new_payload():
|
||||
p = _make_payload()
|
||||
p2 = p.with_metadata(foo="bar")
|
||||
assert p2.metadata == {"foo": "bar"}
|
||||
assert p.metadata == {} # original unchanged
|
||||
|
||||
|
||||
def test_with_metadata_merges():
|
||||
p = _make_payload().with_metadata(a=1)
|
||||
p2 = p.with_metadata(b=2)
|
||||
assert p2.metadata == {"a": 1, "b": 2}
|
||||
|
||||
|
||||
# --- Individual phases ---
|
||||
|
||||
|
||||
def test_gather_marks_phase():
|
||||
result = gather(_make_payload())
|
||||
assert result.metadata["phase"] == "gather"
|
||||
assert result.metadata["gathered"] is True
|
||||
|
||||
|
||||
def test_reason_marks_phase():
|
||||
gathered = gather(_make_payload())
|
||||
result = reason(gathered)
|
||||
assert result.metadata["phase"] == "reason"
|
||||
assert result.metadata["reasoned"] is True
|
||||
|
||||
|
||||
def test_act_marks_phase():
|
||||
gathered = gather(_make_payload())
|
||||
reasoned = reason(gathered)
|
||||
result = act(reasoned)
|
||||
assert result.metadata["phase"] == "act"
|
||||
assert result.metadata["acted"] is True
|
||||
|
||||
|
||||
# --- Full cycle ---
|
||||
|
||||
|
||||
def test_full_cycle_completes():
|
||||
"""Acceptance criterion 5: full cycle completes without crash."""
|
||||
payload = _make_payload(source="user", content="What is sovereignty?")
|
||||
result = run_cycle(payload)
|
||||
assert result.metadata["gathered"] is True
|
||||
assert result.metadata["reasoned"] is True
|
||||
assert result.metadata["acted"] is True
|
||||
|
||||
|
||||
def test_full_cycle_preserves_source():
|
||||
"""Source field survives the full pipeline."""
|
||||
result = run_cycle(_make_payload(source="timer"))
|
||||
assert result.source == "timer"
|
||||
|
||||
|
||||
def test_full_cycle_preserves_content():
|
||||
"""Content field survives the full pipeline."""
|
||||
result = run_cycle(_make_payload(content="test data"))
|
||||
assert result.content == "test data"
|
||||
|
||||
|
||||
def test_no_state_leaks_between_cycles():
|
||||
"""Acceptance criterion 6: no state leaks between cycles."""
|
||||
r1 = run_cycle(_make_payload(source="cycle1", content="first"))
|
||||
r2 = run_cycle(_make_payload(source="cycle2", content="second"))
|
||||
assert r1.source == "cycle1"
|
||||
assert r2.source == "cycle2"
|
||||
assert r1.content == "first"
|
||||
assert r2.content == "second"
|
||||
|
||||
|
||||
def test_cycle_output_feeds_back_as_input():
|
||||
"""Acceptance criterion 4: Phase 3 output feeds back into Phase 1."""
|
||||
first = run_cycle(_make_payload(source="initial"))
|
||||
second = run_cycle(first)
|
||||
# Second cycle should still work — no crash, metadata accumulates
|
||||
assert second.metadata["gathered"] is True
|
||||
assert second.metadata["acted"] is True
|
||||
|
||||
|
||||
def test_phases_log(caplog):
|
||||
"""Acceptance criterion 7: each phase logs what it received and produced."""
|
||||
import logging
|
||||
|
||||
with caplog.at_level(logging.INFO):
|
||||
run_cycle(_make_payload())
|
||||
|
||||
messages = caplog.text
|
||||
assert "Phase 1 (Gather) received" in messages
|
||||
assert "Phase 1 (Gather) produced" in messages
|
||||
assert "Phase 2 (Reason) received" in messages
|
||||
assert "Phase 2 (Reason) produced" in messages
|
||||
assert "Phase 3 (Act) received" in messages
|
||||
assert "Phase 3 (Act) produced" in messages
|
||||
assert "Loop cycle start" in messages
|
||||
assert "Loop cycle complete" in messages
|
||||
Reference in New Issue
Block a user