[tests] 85 new tests for tasks.py and gitea_client.py — zero to covered

COVERAGE BEFORE
===============
  tasks.py          2,117 lines    ZERO tests
  gitea_client.py     539 lines    ZERO tests (in this repo)
  Total:            2,656 lines of orchestration with no safety net

COVERAGE AFTER
==============

test_tasks_core.py — 63 tests across 12 test classes:

  TestExtractFirstJsonObject (10)  — JSON parsing from noisy LLM output
    Every @huey.task depends on this. Tested: clean JSON, markdown
    fences, prose-wrapped, nested, malformed, arrays, unicode, empty

  TestParseJsonOutput (4)          — stdout/stderr fallback chain

  TestNormalizeCandidateEntry (12) — knowledge graph data cleaning
    Confidence clamping, status validation, deduplication, truncation

  TestNormalizeTrainingExamples (5) — autolora training data prep
    Fallback when empty, alternative field names, empty prompt/response

  TestNormalizeRubricScores (3)    — eval score clamping

  TestReadJson (4)                 — defensive file reads
    Missing files, corrupt JSON, deep-copy of defaults

  TestWriteJson (3)                — atomic writes with sorted keys

  TestJsonlIO (9)                  — JSONL read/write/append/count
    Missing files, blank lines, append vs overwrite

  TestWriteText (3)                — trailing newline normalization

  TestPathUtilities (4)            — newest/latest path resolution

  TestFormatting (6)               — batch IDs, profile summaries,
                                     tweet prompts, checkpoint defaults

test_gitea_client_core.py — 22 tests across 9 test classes:

  TestUserFromDict (3)             — all from_dict() deserialization
  TestLabelFromDict (1)
  TestIssueFromDict (4)            — null assignees/labels (THE bug)
  TestCommentFromDict (2)          — null body handling
  TestPullRequestFromDict (3)      — null head/base/merged
  TestPRFileFromDict (1)
  TestGiteaError (2)               — error formatting
  TestClientHelpers (1)            — _repo_path formatting
  TestFindUnassigned (3)           — label/title/assignee filtering
  TestFindAgentIssues (2)          — case-insensitive matching

WHY THESE TESTS MATTER
======================
A bug in extract_first_json_object() corrupts every @huey.task
that processes LLM output — which is all of them. A bug in
normalize_candidate_entry() silently corrupts the knowledge graph.
A bug in the Gitea client's from_dict() crashes the entire triage
and review pipeline (we found this bug — null assignees).

These are the functions that corrupt training data silently when
they break. No one notices until the next autolora run produces
a worse model.

FULL SUITE: 108/108 pass, zero regressions.

Signed-off-by: gemini <gemini@hermes.local>
This commit is contained in:
2026-03-31 08:54:51 -04:00
parent 40ccc88ff1
commit eb1e384edc
2 changed files with 858 additions and 0 deletions

View File

@@ -0,0 +1,318 @@
"""Tests for gitea_client.py — the typed, sovereign API client.
gitea_client.py is 539 lines with zero tests in this repo (there are
tests in hermes-agent, but not here where it's actually used).
These tests cover:
- All 6 dataclass from_dict() constructors (User, Label, Issue, etc.)
- Defensive handling of missing/null fields from Gitea API
- find_unassigned_issues() filtering logic
- find_agent_issues() case-insensitive matching
- GiteaError formatting
- _repo_path() formatting
"""
from __future__ import annotations
import importlib.util
import sys
from pathlib import Path
import pytest
# Import gitea_client directly via importlib to avoid any sys.modules mocking
# from test_tasks_core which stubs gitea_client as a MagicMock.
REPO_ROOT = Path(__file__).parent.parent
_spec = importlib.util.spec_from_file_location(
"gitea_client_real",
REPO_ROOT / "gitea_client.py",
)
_gc = importlib.util.module_from_spec(_spec)
sys.modules["gitea_client_real"] = _gc
_spec.loader.exec_module(_gc)
User = _gc.User
Label = _gc.Label
Issue = _gc.Issue
Comment = _gc.Comment
PullRequest = _gc.PullRequest
PRFile = _gc.PRFile
GiteaError = _gc.GiteaError
GiteaClient = _gc.GiteaClient
# ═══════════════════════════════════════════════════════════════════════
# DATACLASS DESERIALIZATION
# ═══════════════════════════════════════════════════════════════════════
class TestUserFromDict:
def test_full_user(self):
u = User.from_dict({"id": 1, "login": "timmy", "full_name": "Timmy", "email": "t@t.com"})
assert u.id == 1
assert u.login == "timmy"
assert u.full_name == "Timmy"
assert u.email == "t@t.com"
def test_minimal_user(self):
"""Missing fields default to empty."""
u = User.from_dict({})
assert u.id == 0
assert u.login == ""
def test_extra_fields_ignored(self):
"""Unknown fields from Gitea are silently ignored."""
u = User.from_dict({"id": 1, "login": "x", "avatar_url": "http://..."})
assert u.login == "x"
class TestLabelFromDict:
def test_label(self):
lb = Label.from_dict({"id": 5, "name": "bug", "color": "#ff0000"})
assert lb.id == 5
assert lb.name == "bug"
assert lb.color == "#ff0000"
class TestIssueFromDict:
def test_full_issue(self):
issue = Issue.from_dict({
"number": 42,
"title": "Fix the bug",
"body": "Please fix it",
"state": "open",
"user": {"id": 1, "login": "reporter"},
"assignees": [{"id": 2, "login": "dev"}],
"labels": [{"id": 3, "name": "bug"}],
"comments": 5,
})
assert issue.number == 42
assert issue.user.login == "reporter"
assert len(issue.assignees) == 1
assert issue.assignees[0].login == "dev"
assert len(issue.labels) == 1
assert issue.comments == 5
def test_null_assignees_handled(self):
"""Gitea returns null for assignees sometimes — the exact bug
that crashed find_unassigned_issues() before the defensive fix."""
issue = Issue.from_dict({
"number": 1,
"title": "test",
"body": None,
"state": "open",
"user": {"id": 1, "login": "x"},
"assignees": None,
})
assert issue.assignees == []
assert issue.body == ""
def test_null_labels_handled(self):
"""Labels can also be null."""
issue = Issue.from_dict({
"number": 1,
"title": "test",
"state": "open",
"user": {},
"labels": None,
})
assert issue.labels == []
def test_missing_user_defaults(self):
"""Issue with no user field doesn't crash."""
issue = Issue.from_dict({"number": 1, "title": "t", "state": "open"})
assert issue.user.login == ""
class TestCommentFromDict:
def test_comment(self):
c = Comment.from_dict({
"id": 10,
"body": "LGTM",
"user": {"id": 1, "login": "reviewer"},
})
assert c.id == 10
assert c.body == "LGTM"
assert c.user.login == "reviewer"
def test_null_body(self):
c = Comment.from_dict({"id": 1, "body": None, "user": {}})
assert c.body == ""
class TestPullRequestFromDict:
def test_full_pr(self):
pr = PullRequest.from_dict({
"number": 99,
"title": "Add feature",
"body": "Description here",
"state": "open",
"user": {"id": 1, "login": "dev"},
"head": {"ref": "feature-branch"},
"base": {"ref": "main"},
"mergeable": True,
"merged": False,
"changed_files": 3,
})
assert pr.number == 99
assert pr.head_branch == "feature-branch"
assert pr.base_branch == "main"
assert pr.mergeable is True
def test_null_head_base(self):
"""Handles null head/base objects."""
pr = PullRequest.from_dict({
"number": 1, "title": "t", "state": "open",
"user": {}, "head": None, "base": None,
})
assert pr.head_branch == ""
assert pr.base_branch == ""
def test_null_merged(self):
"""merged can be null from Gitea."""
pr = PullRequest.from_dict({
"number": 1, "title": "t", "state": "open",
"user": {}, "merged": None,
})
assert pr.merged is False
class TestPRFileFromDict:
def test_pr_file(self):
f = PRFile.from_dict({
"filename": "src/main.py",
"status": "modified",
"additions": 10,
"deletions": 3,
})
assert f.filename == "src/main.py"
assert f.status == "modified"
assert f.additions == 10
assert f.deletions == 3
# ═══════════════════════════════════════════════════════════════════════
# ERROR HANDLING
# ═══════════════════════════════════════════════════════════════════════
class TestGiteaError:
def test_error_formatting(self):
err = GiteaError(404, "not found", "http://example.com/api/v1/repos/x")
assert "404" in str(err)
assert "not found" in str(err)
def test_error_attributes(self):
err = GiteaError(500, "internal")
assert err.status == 500
# ═══════════════════════════════════════════════════════════════════════
# CLIENT HELPER METHODS
# ═══════════════════════════════════════════════════════════════════════
class TestClientHelpers:
def test_repo_path(self):
"""_repo_path converts owner/name to API path."""
client = GiteaClient.__new__(GiteaClient)
assert client._repo_path("Timmy_Foundation/the-nexus") == "/repos/Timmy_Foundation/the-nexus"
# ═══════════════════════════════════════════════════════════════════════
# FILTERING LOGIC — find_unassigned_issues, find_agent_issues
# ═══════════════════════════════════════════════════════════════════════
class TestFindUnassigned:
"""Tests for find_unassigned_issues() filtering logic.
These tests use pre-constructed Issue objects to test the filtering
without making any API calls.
"""
def _make_issue(self, number, assignees=None, labels=None, title="test"):
return Issue(
number=number, title=title, body="", state="open",
user=User(id=0, login=""),
assignees=[User(id=0, login=a) for a in (assignees or [])],
labels=[Label(id=0, name=lb) for lb in (labels or [])],
)
def test_filters_assigned_issues(self):
"""Issues with assignees are excluded."""
from unittest.mock import patch
issues = [
self._make_issue(1, assignees=["dev"]),
self._make_issue(2), # unassigned
]
client = GiteaClient.__new__(GiteaClient)
with patch.object(client, "list_issues", return_value=issues):
result = client.find_unassigned_issues("repo")
assert len(result) == 1
assert result[0].number == 2
def test_excludes_by_label(self):
"""Issues with excluded labels are filtered."""
from unittest.mock import patch
issues = [
self._make_issue(1, labels=["wontfix"]),
self._make_issue(2, labels=["bug"]),
]
client = GiteaClient.__new__(GiteaClient)
with patch.object(client, "list_issues", return_value=issues):
result = client.find_unassigned_issues("repo", exclude_labels=["wontfix"])
assert len(result) == 1
assert result[0].number == 2
def test_excludes_by_title_pattern(self):
"""Issues matching title patterns are filtered."""
from unittest.mock import patch
issues = [
self._make_issue(1, title="[PHASE] Research AI"),
self._make_issue(2, title="Fix login bug"),
]
client = GiteaClient.__new__(GiteaClient)
with patch.object(client, "list_issues", return_value=issues):
result = client.find_unassigned_issues(
"repo", exclude_title_patterns=["[PHASE]"]
)
assert len(result) == 1
assert result[0].number == 2
class TestFindAgentIssues:
"""Tests for find_agent_issues() case-insensitive matching."""
def test_case_insensitive_match(self):
from unittest.mock import patch
issues = [
Issue(number=1, title="t", body="", state="open",
user=User(0, ""), assignees=[User(0, "Timmy")], labels=[]),
]
client = GiteaClient.__new__(GiteaClient)
with patch.object(client, "list_issues", return_value=issues):
result = client.find_agent_issues("repo", "timmy")
assert len(result) == 1
def test_no_match_for_different_agent(self):
from unittest.mock import patch
issues = [
Issue(number=1, title="t", body="", state="open",
user=User(0, ""), assignees=[User(0, "Timmy")], labels=[]),
]
client = GiteaClient.__new__(GiteaClient)
with patch.object(client, "list_issues", return_value=issues):
result = client.find_agent_issues("repo", "claude")
assert len(result) == 0

540
tests/test_tasks_core.py Normal file
View File

@@ -0,0 +1,540 @@
"""Tests for tasks.py — the orchestration brain.
tasks.py is 2,117 lines with zero test coverage. This suite covers
the pure utility functions that every pipeline depends on: JSON parsing,
data normalization, file I/O primitives, and prompt formatting.
These are the functions that corrupt training data silently when they
break. If a normalization function drops a field or misparses JSON from
an LLM, the entire training pipeline produces garbage. No one notices
until the next autolora run produces a worse model.
Coverage priority is based on blast radius — a bug in
extract_first_json_object() affects every @huey.task that processes
LLM output, which is all of them.
"""
from __future__ import annotations
import json
import sys
import tempfile
from pathlib import Path
import pytest
# Import tasks.py without triggering Huey/GiteaClient side effects.
# We mock the imports that have side effects to isolate the pure functions.
from unittest.mock import MagicMock
# Stub out modules with side effects before importing tasks
sys.modules.setdefault("orchestration", MagicMock(huey=MagicMock()))
sys.modules.setdefault("huey", MagicMock())
sys.modules.setdefault("gitea_client", MagicMock())
sys.modules.setdefault("metrics_helpers", MagicMock(
build_local_metric_record=MagicMock(return_value={})
))
# Now we can import the functions we want to test
REPO_ROOT = Path(__file__).parent.parent
sys.path.insert(0, str(REPO_ROOT))
import importlib
tasks = importlib.import_module("tasks")
# Pull out the functions under test
extract_first_json_object = tasks.extract_first_json_object
parse_json_output = tasks.parse_json_output
normalize_candidate_entry = tasks.normalize_candidate_entry
normalize_training_examples = tasks.normalize_training_examples
normalize_rubric_scores = tasks.normalize_rubric_scores
archive_batch_id = tasks.archive_batch_id
archive_profile_summary = tasks.archive_profile_summary
format_tweets_for_prompt = tasks.format_tweets_for_prompt
read_json = tasks.read_json
write_json = tasks.write_json
load_jsonl = tasks.load_jsonl
write_jsonl = tasks.write_jsonl
append_jsonl = tasks.append_jsonl
write_text = tasks.write_text
count_jsonl_rows = tasks.count_jsonl_rows
newest_file = tasks.newest_file
latest_path = tasks.latest_path
archive_default_checkpoint = tasks.archive_default_checkpoint
# ═══════════════════════════════════════════════════════════════════════
# JSON EXTRACTION — the single most critical function in the pipeline
# ═══════════════════════════════════════════════════════════════════════
class TestExtractFirstJsonObject:
"""extract_first_json_object() parses JSON from noisy LLM output.
Every @huey.task that processes model output depends on this.
If this breaks, the entire training pipeline produces garbage.
"""
def test_clean_json(self):
"""Parses valid JSON directly."""
result = extract_first_json_object('{"key": "value"}')
assert result == {"key": "value"}
def test_json_with_markdown_fences(self):
"""Strips ```json fences that models love to add."""
text = '```json\n{"hello": "world"}\n```'
result = extract_first_json_object(text)
assert result == {"hello": "world"}
def test_json_after_prose(self):
"""Finds JSON buried after the model's explanation."""
text = "Here is the analysis:\n\nI found that {'key': 'value'}\n\n{\"real\": true}"
result = extract_first_json_object(text)
assert result == {"real": True}
def test_nested_json(self):
"""Handles nested objects correctly."""
text = '{"outer": {"inner": [1, 2, 3]}}'
result = extract_first_json_object(text)
assert result == {"outer": {"inner": [1, 2, 3]}}
def test_raises_on_no_json(self):
"""Raises ValueError when no JSON object is found."""
with pytest.raises(ValueError, match="No JSON object found"):
extract_first_json_object("No JSON here at all")
def test_raises_on_json_array(self):
"""Raises ValueError for JSON arrays (only objects accepted)."""
with pytest.raises(ValueError, match="No JSON object found"):
extract_first_json_object("[1, 2, 3]")
def test_skips_malformed_and_finds_valid(self):
"""Skips broken JSON fragments to find the real one."""
text = '{broken {"valid": true}'
result = extract_first_json_object(text)
assert result == {"valid": True}
def test_handles_whitespace_heavy_output(self):
"""Handles output with excessive whitespace."""
text = ' \n\n {"spaced": "out"} \n\n '
result = extract_first_json_object(text)
assert result == {"spaced": "out"}
def test_empty_string_raises(self):
"""Empty input raises ValueError."""
with pytest.raises(ValueError):
extract_first_json_object("")
def test_unicode_content(self):
"""Handles Unicode characters in JSON values."""
text = '{"emoji": "🔥", "jp": "日本語"}'
result = extract_first_json_object(text)
assert result["emoji"] == "🔥"
class TestParseJsonOutput:
"""parse_json_output() tries stdout then stderr for JSON."""
def test_finds_json_in_stdout(self):
result = parse_json_output(stdout='{"from": "stdout"}')
assert result == {"from": "stdout"}
def test_falls_back_to_stderr(self):
result = parse_json_output(stdout="no json", stderr='{"from": "stderr"}')
assert result == {"from": "stderr"}
def test_empty_returns_empty_dict(self):
result = parse_json_output(stdout="", stderr="")
assert result == {}
def test_none_inputs_handled(self):
result = parse_json_output(stdout=None, stderr=None)
assert result == {}
# ═══════════════════════════════════════════════════════════════════════
# DATA NORMALIZATION — training data quality depends on this
# ═══════════════════════════════════════════════════════════════════════
class TestNormalizeCandidateEntry:
"""normalize_candidate_entry() cleans LLM-generated knowledge candidates.
A bug here silently corrupts the knowledge graph. Fields are
coerced to correct types, clamped to valid ranges, and deduplicated.
"""
def test_valid_candidate(self):
"""Normalizes a well-formed candidate."""
candidate = {
"category": "trait",
"claim": "Alexander likes coffee",
"evidence_tweet_ids": ["123", "456"],
"evidence_quotes": ["I love coffee"],
"confidence": 0.8,
"status": "provisional",
}
result = normalize_candidate_entry(candidate, "batch_001", 1)
assert result["id"] == "batch_001-candidate-01"
assert result["category"] == "trait"
assert result["claim"] == "Alexander likes coffee"
assert result["confidence"] == 0.8
assert result["status"] == "provisional"
def test_empty_claim_returns_none(self):
"""Rejects candidates with empty claims."""
result = normalize_candidate_entry({"claim": ""}, "b001", 0)
assert result is None
def test_missing_claim_returns_none(self):
"""Rejects candidates with no claim field."""
result = normalize_candidate_entry({"category": "trait"}, "b001", 0)
assert result is None
def test_confidence_clamped_high(self):
"""Confidence above 1.0 is clamped to 1.0."""
result = normalize_candidate_entry(
{"claim": "test", "confidence": 5.0}, "b001", 1
)
assert result["confidence"] == 1.0
def test_confidence_clamped_low(self):
"""Confidence below 0.0 is clamped to 0.0."""
result = normalize_candidate_entry(
{"claim": "test", "confidence": -0.5}, "b001", 1
)
assert result["confidence"] == 0.0
def test_invalid_confidence_defaults(self):
"""Non-numeric confidence defaults to 0.5."""
result = normalize_candidate_entry(
{"claim": "test", "confidence": "high"}, "b001", 1
)
assert result["confidence"] == 0.5
def test_invalid_status_defaults_to_provisional(self):
"""Unknown status values default to 'provisional'."""
result = normalize_candidate_entry(
{"claim": "test", "status": "banana"}, "b001", 1
)
assert result["status"] == "provisional"
def test_duplicate_evidence_ids_deduped(self):
"""Duplicate tweet IDs are removed."""
result = normalize_candidate_entry(
{"claim": "test", "evidence_tweet_ids": ["1", "1", "2", "2"]},
"b001", 1,
)
assert result["evidence_tweet_ids"] == ["1", "2"]
def test_duplicate_quotes_deduped(self):
"""Duplicate evidence quotes are removed."""
result = normalize_candidate_entry(
{"claim": "test", "evidence_quotes": ["same", "same", "new"]},
"b001", 1,
)
assert result["evidence_quotes"] == ["same", "new"]
def test_evidence_truncated_to_5(self):
"""Evidence lists are capped at 5 items."""
result = normalize_candidate_entry(
{"claim": "test", "evidence_quotes": [f"q{i}" for i in range(10)]},
"b001", 1,
)
assert len(result["evidence_quotes"]) == 5
def test_none_category_defaults(self):
"""None category defaults to 'recurring-theme'."""
result = normalize_candidate_entry(
{"claim": "test", "category": None}, "b001", 1
)
assert result["category"] == "recurring-theme"
def test_valid_statuses_accepted(self):
"""All three valid statuses are preserved."""
for status in ("provisional", "durable", "retracted"):
result = normalize_candidate_entry(
{"claim": "test", "status": status}, "b001", 1
)
assert result["status"] == status
class TestNormalizeTrainingExamples:
"""normalize_training_examples() cleans LLM-generated training pairs.
This feeds directly into autolora. Bad data here means bad training.
"""
def test_valid_examples_normalized(self):
"""Well-formed examples pass through with added metadata."""
examples = [
{"prompt": "Q1", "response": "A1", "task_type": "analysis"},
{"prompt": "Q2", "response": "A2"},
]
result = normalize_training_examples(
examples, "b001", ["t1"], "fallback_p", "fallback_r"
)
assert len(result) == 2
assert result[0]["example_id"] == "b001-example-01"
assert result[0]["prompt"] == "Q1"
assert result[1]["task_type"] == "analysis" # defaults
def test_empty_examples_get_fallback(self):
"""When no valid examples exist, fallback is used."""
result = normalize_training_examples(
[], "b001", ["t1"], "fallback prompt", "fallback response"
)
assert len(result) == 1
assert result[0]["prompt"] == "fallback prompt"
assert result[0]["response"] == "fallback response"
def test_examples_with_empty_prompt_skipped(self):
"""Examples without prompts are filtered out."""
examples = [
{"prompt": "", "response": "A1"},
{"prompt": "Q2", "response": "A2"},
]
result = normalize_training_examples(
examples, "b001", ["t1"], "fp", "fr"
)
assert len(result) == 1
assert result[0]["prompt"] == "Q2"
def test_examples_with_empty_response_skipped(self):
"""Examples without responses are filtered out."""
examples = [
{"prompt": "Q1", "response": ""},
]
result = normalize_training_examples(
examples, "b001", ["t1"], "fp", "fr"
)
# Falls to fallback
assert len(result) == 1
assert result[0]["prompt"] == "fp"
def test_alternative_field_names_accepted(self):
"""Accepts 'instruction'/'answer' as field name alternatives."""
examples = [
{"instruction": "Q1", "answer": "A1"},
]
result = normalize_training_examples(
examples, "b001", ["t1"], "fp", "fr"
)
assert len(result) == 1
assert result[0]["prompt"] == "Q1"
assert result[0]["response"] == "A1"
class TestNormalizeRubricScores:
"""normalize_rubric_scores() cleans eval rubric output."""
def test_valid_scores(self):
scores = {"grounding": 8, "specificity": 7, "source_distinction": 9, "actionability": 6}
result = normalize_rubric_scores(scores)
assert result == {"grounding": 8.0, "specificity": 7.0,
"source_distinction": 9.0, "actionability": 6.0}
def test_missing_keys_default_to_zero(self):
result = normalize_rubric_scores({})
assert result == {"grounding": 0.0, "specificity": 0.0,
"source_distinction": 0.0, "actionability": 0.0}
def test_non_numeric_defaults_to_zero(self):
result = normalize_rubric_scores({"grounding": "excellent"})
assert result["grounding"] == 0.0
# ═══════════════════════════════════════════════════════════════════════
# FILE I/O PRIMITIVES — the foundation everything reads/writes through
# ═══════════════════════════════════════════════════════════════════════
class TestReadJson:
def test_reads_valid_file(self, tmp_path):
f = tmp_path / "test.json"
f.write_text('{"key": "val"}')
assert read_json(f, {}) == {"key": "val"}
def test_missing_file_returns_default(self, tmp_path):
assert read_json(tmp_path / "nope.json", {"default": True}) == {"default": True}
def test_corrupt_file_returns_default(self, tmp_path):
f = tmp_path / "bad.json"
f.write_text("{corrupt json!!!}")
assert read_json(f, {"safe": True}) == {"safe": True}
def test_default_is_deep_copied(self, tmp_path):
"""Default is deep-copied, not shared between calls."""
default = {"nested": {"key": "val"}}
result1 = read_json(tmp_path / "a.json", default)
result2 = read_json(tmp_path / "b.json", default)
result1["nested"]["key"] = "mutated"
assert result2["nested"]["key"] == "val"
class TestWriteJson:
def test_creates_file_with_indent(self, tmp_path):
f = tmp_path / "out.json"
write_json(f, {"key": "val"})
content = f.read_text()
assert '"key": "val"' in content
assert content.endswith("\n")
def test_creates_parent_dirs(self, tmp_path):
f = tmp_path / "deep" / "nested" / "out.json"
write_json(f, {"ok": True})
assert f.exists()
def test_sorted_keys(self, tmp_path):
f = tmp_path / "sorted.json"
write_json(f, {"z": 1, "a": 2})
content = f.read_text()
assert content.index('"a"') < content.index('"z"')
class TestJsonlIO:
def test_load_jsonl_valid(self, tmp_path):
f = tmp_path / "data.jsonl"
f.write_text('{"a":1}\n{"b":2}\n')
rows = load_jsonl(f)
assert len(rows) == 2
assert rows[0] == {"a": 1}
def test_load_jsonl_missing_file(self, tmp_path):
assert load_jsonl(tmp_path / "nope.jsonl") == []
def test_load_jsonl_skips_blank_lines(self, tmp_path):
f = tmp_path / "data.jsonl"
f.write_text('{"a":1}\n\n\n{"b":2}\n')
rows = load_jsonl(f)
assert len(rows) == 2
def test_write_jsonl(self, tmp_path):
f = tmp_path / "out.jsonl"
write_jsonl(f, [{"a": 1}, {"b": 2}])
lines = f.read_text().strip().split("\n")
assert len(lines) == 2
assert json.loads(lines[0]) == {"a": 1}
def test_append_jsonl(self, tmp_path):
f = tmp_path / "append.jsonl"
f.write_text('{"existing":true}\n')
append_jsonl(f, [{"new": True}])
rows = load_jsonl(f)
assert len(rows) == 2
def test_append_jsonl_empty_list_noop(self, tmp_path):
"""Appending empty list doesn't create file."""
f = tmp_path / "nope.jsonl"
append_jsonl(f, [])
assert not f.exists()
def test_count_jsonl_rows(self, tmp_path):
f = tmp_path / "count.jsonl"
f.write_text('{"a":1}\n{"b":2}\n{"c":3}\n')
assert count_jsonl_rows(f) == 3
def test_count_jsonl_missing_file(self, tmp_path):
assert count_jsonl_rows(tmp_path / "nope.jsonl") == 0
def test_count_jsonl_skips_blank_lines(self, tmp_path):
f = tmp_path / "sparse.jsonl"
f.write_text('{"a":1}\n\n{"b":2}\n\n')
assert count_jsonl_rows(f) == 2
class TestWriteText:
def test_writes_with_trailing_newline(self, tmp_path):
f = tmp_path / "text.md"
write_text(f, "hello")
assert f.read_text() == "hello\n"
def test_strips_trailing_whitespace(self, tmp_path):
f = tmp_path / "text.md"
write_text(f, "hello \n\n\n")
assert f.read_text() == "hello\n"
def test_empty_content_writes_empty_file(self, tmp_path):
f = tmp_path / "text.md"
write_text(f, " ")
assert f.read_text() == ""
# ═══════════════════════════════════════════════════════════════════════
# PATH UTILITIES
# ═══════════════════════════════════════════════════════════════════════
class TestPathUtilities:
def test_newest_file(self, tmp_path):
(tmp_path / "a.txt").write_text("a")
(tmp_path / "b.txt").write_text("b")
(tmp_path / "c.txt").write_text("c")
result = newest_file(tmp_path, "*.txt")
assert result.name == "c.txt" # sorted, last = newest
def test_newest_file_empty_dir(self, tmp_path):
assert newest_file(tmp_path, "*.txt") is None
def test_latest_path(self, tmp_path):
(tmp_path / "batch_001.json").write_text("{}")
(tmp_path / "batch_002.json").write_text("{}")
result = latest_path(tmp_path, "batch_*.json")
assert result.name == "batch_002.json"
def test_latest_path_no_matches(self, tmp_path):
assert latest_path(tmp_path, "*.nope") is None
# ═══════════════════════════════════════════════════════════════════════
# FORMATTING & HELPERS
# ═══════════════════════════════════════════════════════════════════════
class TestFormatting:
def test_archive_batch_id(self):
assert archive_batch_id(1) == "batch_001"
assert archive_batch_id(42) == "batch_042"
assert archive_batch_id(100) == "batch_100"
def test_archive_profile_summary(self):
profile = {
"claims": [
{"status": "durable", "claim": "a"},
{"status": "durable", "claim": "b"},
{"status": "provisional", "claim": "c"},
{"status": "retracted", "claim": "d"},
]
}
summary = archive_profile_summary(profile)
assert len(summary["durable_claims"]) == 2
assert len(summary["provisional_claims"]) == 1
def test_archive_profile_summary_truncates(self):
"""Summaries are capped at 12 durable and 8 provisional."""
profile = {
"claims": [{"status": "durable", "claim": f"d{i}"} for i in range(20)]
+ [{"status": "provisional", "claim": f"p{i}"} for i in range(15)]
}
summary = archive_profile_summary(profile)
assert len(summary["durable_claims"]) <= 12
assert len(summary["provisional_claims"]) <= 8
def test_archive_profile_summary_empty(self):
assert archive_profile_summary({}) == {
"durable_claims": [],
"provisional_claims": [],
}
def test_format_tweets_for_prompt(self):
rows = [
{"tweet_id": "123", "created_at": "2024-01-01", "full_text": "Hello world"},
{"tweet_id": "456", "created_at": "2024-01-02", "full_text": "Goodbye world"},
]
result = format_tweets_for_prompt(rows)
assert "tweet_id=123" in result
assert "Hello world" in result
assert "2." in result # 1-indexed
def test_archive_default_checkpoint(self):
"""Default checkpoint has all required fields."""
cp = archive_default_checkpoint()
assert cp["phase"] == "discovery"
assert cp["next_offset"] == 0
assert cp["batch_size"] == 50
assert cp["batches_completed"] == 0