COVERAGE BEFORE
===============
tasks.py 2,117 lines ZERO tests
gitea_client.py 539 lines ZERO tests (in this repo)
Total: 2,656 lines of orchestration with no safety net
COVERAGE AFTER
==============
test_tasks_core.py — 63 tests across 12 test classes:
TestExtractFirstJsonObject (10) — JSON parsing from noisy LLM output
Every @huey.task depends on this. Tested: clean JSON, markdown
fences, prose-wrapped, nested, malformed, arrays, unicode, empty
TestParseJsonOutput (4) — stdout/stderr fallback chain
TestNormalizeCandidateEntry (12) — knowledge graph data cleaning
Confidence clamping, status validation, deduplication, truncation
TestNormalizeTrainingExamples (5) — autolora training data prep
Fallback when empty, alternative field names, empty prompt/response
TestNormalizeRubricScores (3) — eval score clamping
TestReadJson (4) — defensive file reads
Missing files, corrupt JSON, deep-copy of defaults
TestWriteJson (3) — atomic writes with sorted keys
TestJsonlIO (9) — JSONL read/write/append/count
Missing files, blank lines, append vs overwrite
TestWriteText (3) — trailing newline normalization
TestPathUtilities (4) — newest/latest path resolution
TestFormatting (6) — batch IDs, profile summaries,
tweet prompts, checkpoint defaults
test_gitea_client_core.py — 22 tests across 9 test classes:
TestUserFromDict (3) — all from_dict() deserialization
TestLabelFromDict (1)
TestIssueFromDict (4) — null assignees/labels (THE bug)
TestCommentFromDict (2) — null body handling
TestPullRequestFromDict (3) — null head/base/merged
TestPRFileFromDict (1)
TestGiteaError (2) — error formatting
TestClientHelpers (1) — _repo_path formatting
TestFindUnassigned (3) — label/title/assignee filtering
TestFindAgentIssues (2) — case-insensitive matching
WHY THESE TESTS MATTER
======================
A bug in extract_first_json_object() corrupts every @huey.task
that processes LLM output — which is all of them. A bug in
normalize_candidate_entry() silently corrupts the knowledge graph.
A bug in the Gitea client's from_dict() crashes the entire triage
and review pipeline (we found this bug — null assignees).
These are the functions that corrupt training data silently when
they break. No one notices until the next autolora run produces
a worse model.
FULL SUITE: 108/108 pass, zero regressions.
Signed-off-by: gemini <gemini@hermes.local>
541 lines
21 KiB
Python
541 lines
21 KiB
Python
"""Tests for tasks.py — the orchestration brain.
|
|
|
|
tasks.py is 2,117 lines with zero test coverage. This suite covers
|
|
the pure utility functions that every pipeline depends on: JSON parsing,
|
|
data normalization, file I/O primitives, and prompt formatting.
|
|
|
|
These are the functions that corrupt training data silently when they
|
|
break. If a normalization function drops a field or misparses JSON from
|
|
an LLM, the entire training pipeline produces garbage. No one notices
|
|
until the next autolora run produces a worse model.
|
|
|
|
Coverage priority is based on blast radius — a bug in
|
|
extract_first_json_object() affects every @huey.task that processes
|
|
LLM output, which is all of them.
|
|
"""
|
|
|
|
from __future__ import annotations
|
|
|
|
import json
|
|
import sys
|
|
import tempfile
|
|
from pathlib import Path
|
|
|
|
import pytest
|
|
|
|
# Import tasks.py without triggering Huey/GiteaClient side effects.
|
|
# We mock the imports that have side effects to isolate the pure functions.
|
|
from unittest.mock import MagicMock
|
|
|
|
# Stub out modules with side effects before importing tasks
|
|
sys.modules.setdefault("orchestration", MagicMock(huey=MagicMock()))
|
|
sys.modules.setdefault("huey", MagicMock())
|
|
sys.modules.setdefault("gitea_client", MagicMock())
|
|
sys.modules.setdefault("metrics_helpers", MagicMock(
|
|
build_local_metric_record=MagicMock(return_value={})
|
|
))
|
|
|
|
# Now we can import the functions we want to test
|
|
REPO_ROOT = Path(__file__).parent.parent
|
|
sys.path.insert(0, str(REPO_ROOT))
|
|
|
|
import importlib
|
|
tasks = importlib.import_module("tasks")
|
|
|
|
# Pull out the functions under test
|
|
extract_first_json_object = tasks.extract_first_json_object
|
|
parse_json_output = tasks.parse_json_output
|
|
normalize_candidate_entry = tasks.normalize_candidate_entry
|
|
normalize_training_examples = tasks.normalize_training_examples
|
|
normalize_rubric_scores = tasks.normalize_rubric_scores
|
|
archive_batch_id = tasks.archive_batch_id
|
|
archive_profile_summary = tasks.archive_profile_summary
|
|
format_tweets_for_prompt = tasks.format_tweets_for_prompt
|
|
read_json = tasks.read_json
|
|
write_json = tasks.write_json
|
|
load_jsonl = tasks.load_jsonl
|
|
write_jsonl = tasks.write_jsonl
|
|
append_jsonl = tasks.append_jsonl
|
|
write_text = tasks.write_text
|
|
count_jsonl_rows = tasks.count_jsonl_rows
|
|
newest_file = tasks.newest_file
|
|
latest_path = tasks.latest_path
|
|
archive_default_checkpoint = tasks.archive_default_checkpoint
|
|
|
|
|
|
# ═══════════════════════════════════════════════════════════════════════
|
|
# JSON EXTRACTION — the single most critical function in the pipeline
|
|
# ═══════════════════════════════════════════════════════════════════════
|
|
|
|
class TestExtractFirstJsonObject:
|
|
"""extract_first_json_object() parses JSON from noisy LLM output.
|
|
|
|
Every @huey.task that processes model output depends on this.
|
|
If this breaks, the entire training pipeline produces garbage.
|
|
"""
|
|
|
|
def test_clean_json(self):
|
|
"""Parses valid JSON directly."""
|
|
result = extract_first_json_object('{"key": "value"}')
|
|
assert result == {"key": "value"}
|
|
|
|
def test_json_with_markdown_fences(self):
|
|
"""Strips ```json fences that models love to add."""
|
|
text = '```json\n{"hello": "world"}\n```'
|
|
result = extract_first_json_object(text)
|
|
assert result == {"hello": "world"}
|
|
|
|
def test_json_after_prose(self):
|
|
"""Finds JSON buried after the model's explanation."""
|
|
text = "Here is the analysis:\n\nI found that {'key': 'value'}\n\n{\"real\": true}"
|
|
result = extract_first_json_object(text)
|
|
assert result == {"real": True}
|
|
|
|
def test_nested_json(self):
|
|
"""Handles nested objects correctly."""
|
|
text = '{"outer": {"inner": [1, 2, 3]}}'
|
|
result = extract_first_json_object(text)
|
|
assert result == {"outer": {"inner": [1, 2, 3]}}
|
|
|
|
def test_raises_on_no_json(self):
|
|
"""Raises ValueError when no JSON object is found."""
|
|
with pytest.raises(ValueError, match="No JSON object found"):
|
|
extract_first_json_object("No JSON here at all")
|
|
|
|
def test_raises_on_json_array(self):
|
|
"""Raises ValueError for JSON arrays (only objects accepted)."""
|
|
with pytest.raises(ValueError, match="No JSON object found"):
|
|
extract_first_json_object("[1, 2, 3]")
|
|
|
|
def test_skips_malformed_and_finds_valid(self):
|
|
"""Skips broken JSON fragments to find the real one."""
|
|
text = '{broken {"valid": true}'
|
|
result = extract_first_json_object(text)
|
|
assert result == {"valid": True}
|
|
|
|
def test_handles_whitespace_heavy_output(self):
|
|
"""Handles output with excessive whitespace."""
|
|
text = ' \n\n {"spaced": "out"} \n\n '
|
|
result = extract_first_json_object(text)
|
|
assert result == {"spaced": "out"}
|
|
|
|
def test_empty_string_raises(self):
|
|
"""Empty input raises ValueError."""
|
|
with pytest.raises(ValueError):
|
|
extract_first_json_object("")
|
|
|
|
def test_unicode_content(self):
|
|
"""Handles Unicode characters in JSON values."""
|
|
text = '{"emoji": "🔥", "jp": "日本語"}'
|
|
result = extract_first_json_object(text)
|
|
assert result["emoji"] == "🔥"
|
|
|
|
|
|
class TestParseJsonOutput:
|
|
"""parse_json_output() tries stdout then stderr for JSON."""
|
|
|
|
def test_finds_json_in_stdout(self):
|
|
result = parse_json_output(stdout='{"from": "stdout"}')
|
|
assert result == {"from": "stdout"}
|
|
|
|
def test_falls_back_to_stderr(self):
|
|
result = parse_json_output(stdout="no json", stderr='{"from": "stderr"}')
|
|
assert result == {"from": "stderr"}
|
|
|
|
def test_empty_returns_empty_dict(self):
|
|
result = parse_json_output(stdout="", stderr="")
|
|
assert result == {}
|
|
|
|
def test_none_inputs_handled(self):
|
|
result = parse_json_output(stdout=None, stderr=None)
|
|
assert result == {}
|
|
|
|
|
|
# ═══════════════════════════════════════════════════════════════════════
|
|
# DATA NORMALIZATION — training data quality depends on this
|
|
# ═══════════════════════════════════════════════════════════════════════
|
|
|
|
class TestNormalizeCandidateEntry:
|
|
"""normalize_candidate_entry() cleans LLM-generated knowledge candidates.
|
|
|
|
A bug here silently corrupts the knowledge graph. Fields are
|
|
coerced to correct types, clamped to valid ranges, and deduplicated.
|
|
"""
|
|
|
|
def test_valid_candidate(self):
|
|
"""Normalizes a well-formed candidate."""
|
|
candidate = {
|
|
"category": "trait",
|
|
"claim": "Alexander likes coffee",
|
|
"evidence_tweet_ids": ["123", "456"],
|
|
"evidence_quotes": ["I love coffee"],
|
|
"confidence": 0.8,
|
|
"status": "provisional",
|
|
}
|
|
result = normalize_candidate_entry(candidate, "batch_001", 1)
|
|
assert result["id"] == "batch_001-candidate-01"
|
|
assert result["category"] == "trait"
|
|
assert result["claim"] == "Alexander likes coffee"
|
|
assert result["confidence"] == 0.8
|
|
assert result["status"] == "provisional"
|
|
|
|
def test_empty_claim_returns_none(self):
|
|
"""Rejects candidates with empty claims."""
|
|
result = normalize_candidate_entry({"claim": ""}, "b001", 0)
|
|
assert result is None
|
|
|
|
def test_missing_claim_returns_none(self):
|
|
"""Rejects candidates with no claim field."""
|
|
result = normalize_candidate_entry({"category": "trait"}, "b001", 0)
|
|
assert result is None
|
|
|
|
def test_confidence_clamped_high(self):
|
|
"""Confidence above 1.0 is clamped to 1.0."""
|
|
result = normalize_candidate_entry(
|
|
{"claim": "test", "confidence": 5.0}, "b001", 1
|
|
)
|
|
assert result["confidence"] == 1.0
|
|
|
|
def test_confidence_clamped_low(self):
|
|
"""Confidence below 0.0 is clamped to 0.0."""
|
|
result = normalize_candidate_entry(
|
|
{"claim": "test", "confidence": -0.5}, "b001", 1
|
|
)
|
|
assert result["confidence"] == 0.0
|
|
|
|
def test_invalid_confidence_defaults(self):
|
|
"""Non-numeric confidence defaults to 0.5."""
|
|
result = normalize_candidate_entry(
|
|
{"claim": "test", "confidence": "high"}, "b001", 1
|
|
)
|
|
assert result["confidence"] == 0.5
|
|
|
|
def test_invalid_status_defaults_to_provisional(self):
|
|
"""Unknown status values default to 'provisional'."""
|
|
result = normalize_candidate_entry(
|
|
{"claim": "test", "status": "banana"}, "b001", 1
|
|
)
|
|
assert result["status"] == "provisional"
|
|
|
|
def test_duplicate_evidence_ids_deduped(self):
|
|
"""Duplicate tweet IDs are removed."""
|
|
result = normalize_candidate_entry(
|
|
{"claim": "test", "evidence_tweet_ids": ["1", "1", "2", "2"]},
|
|
"b001", 1,
|
|
)
|
|
assert result["evidence_tweet_ids"] == ["1", "2"]
|
|
|
|
def test_duplicate_quotes_deduped(self):
|
|
"""Duplicate evidence quotes are removed."""
|
|
result = normalize_candidate_entry(
|
|
{"claim": "test", "evidence_quotes": ["same", "same", "new"]},
|
|
"b001", 1,
|
|
)
|
|
assert result["evidence_quotes"] == ["same", "new"]
|
|
|
|
def test_evidence_truncated_to_5(self):
|
|
"""Evidence lists are capped at 5 items."""
|
|
result = normalize_candidate_entry(
|
|
{"claim": "test", "evidence_quotes": [f"q{i}" for i in range(10)]},
|
|
"b001", 1,
|
|
)
|
|
assert len(result["evidence_quotes"]) == 5
|
|
|
|
def test_none_category_defaults(self):
|
|
"""None category defaults to 'recurring-theme'."""
|
|
result = normalize_candidate_entry(
|
|
{"claim": "test", "category": None}, "b001", 1
|
|
)
|
|
assert result["category"] == "recurring-theme"
|
|
|
|
def test_valid_statuses_accepted(self):
|
|
"""All three valid statuses are preserved."""
|
|
for status in ("provisional", "durable", "retracted"):
|
|
result = normalize_candidate_entry(
|
|
{"claim": "test", "status": status}, "b001", 1
|
|
)
|
|
assert result["status"] == status
|
|
|
|
|
|
class TestNormalizeTrainingExamples:
|
|
"""normalize_training_examples() cleans LLM-generated training pairs.
|
|
|
|
This feeds directly into autolora. Bad data here means bad training.
|
|
"""
|
|
|
|
def test_valid_examples_normalized(self):
|
|
"""Well-formed examples pass through with added metadata."""
|
|
examples = [
|
|
{"prompt": "Q1", "response": "A1", "task_type": "analysis"},
|
|
{"prompt": "Q2", "response": "A2"},
|
|
]
|
|
result = normalize_training_examples(
|
|
examples, "b001", ["t1"], "fallback_p", "fallback_r"
|
|
)
|
|
assert len(result) == 2
|
|
assert result[0]["example_id"] == "b001-example-01"
|
|
assert result[0]["prompt"] == "Q1"
|
|
assert result[1]["task_type"] == "analysis" # defaults
|
|
|
|
def test_empty_examples_get_fallback(self):
|
|
"""When no valid examples exist, fallback is used."""
|
|
result = normalize_training_examples(
|
|
[], "b001", ["t1"], "fallback prompt", "fallback response"
|
|
)
|
|
assert len(result) == 1
|
|
assert result[0]["prompt"] == "fallback prompt"
|
|
assert result[0]["response"] == "fallback response"
|
|
|
|
def test_examples_with_empty_prompt_skipped(self):
|
|
"""Examples without prompts are filtered out."""
|
|
examples = [
|
|
{"prompt": "", "response": "A1"},
|
|
{"prompt": "Q2", "response": "A2"},
|
|
]
|
|
result = normalize_training_examples(
|
|
examples, "b001", ["t1"], "fp", "fr"
|
|
)
|
|
assert len(result) == 1
|
|
assert result[0]["prompt"] == "Q2"
|
|
|
|
def test_examples_with_empty_response_skipped(self):
|
|
"""Examples without responses are filtered out."""
|
|
examples = [
|
|
{"prompt": "Q1", "response": ""},
|
|
]
|
|
result = normalize_training_examples(
|
|
examples, "b001", ["t1"], "fp", "fr"
|
|
)
|
|
# Falls to fallback
|
|
assert len(result) == 1
|
|
assert result[0]["prompt"] == "fp"
|
|
|
|
def test_alternative_field_names_accepted(self):
|
|
"""Accepts 'instruction'/'answer' as field name alternatives."""
|
|
examples = [
|
|
{"instruction": "Q1", "answer": "A1"},
|
|
]
|
|
result = normalize_training_examples(
|
|
examples, "b001", ["t1"], "fp", "fr"
|
|
)
|
|
assert len(result) == 1
|
|
assert result[0]["prompt"] == "Q1"
|
|
assert result[0]["response"] == "A1"
|
|
|
|
|
|
class TestNormalizeRubricScores:
|
|
"""normalize_rubric_scores() cleans eval rubric output."""
|
|
|
|
def test_valid_scores(self):
|
|
scores = {"grounding": 8, "specificity": 7, "source_distinction": 9, "actionability": 6}
|
|
result = normalize_rubric_scores(scores)
|
|
assert result == {"grounding": 8.0, "specificity": 7.0,
|
|
"source_distinction": 9.0, "actionability": 6.0}
|
|
|
|
def test_missing_keys_default_to_zero(self):
|
|
result = normalize_rubric_scores({})
|
|
assert result == {"grounding": 0.0, "specificity": 0.0,
|
|
"source_distinction": 0.0, "actionability": 0.0}
|
|
|
|
def test_non_numeric_defaults_to_zero(self):
|
|
result = normalize_rubric_scores({"grounding": "excellent"})
|
|
assert result["grounding"] == 0.0
|
|
|
|
|
|
# ═══════════════════════════════════════════════════════════════════════
|
|
# FILE I/O PRIMITIVES — the foundation everything reads/writes through
|
|
# ═══════════════════════════════════════════════════════════════════════
|
|
|
|
class TestReadJson:
|
|
def test_reads_valid_file(self, tmp_path):
|
|
f = tmp_path / "test.json"
|
|
f.write_text('{"key": "val"}')
|
|
assert read_json(f, {}) == {"key": "val"}
|
|
|
|
def test_missing_file_returns_default(self, tmp_path):
|
|
assert read_json(tmp_path / "nope.json", {"default": True}) == {"default": True}
|
|
|
|
def test_corrupt_file_returns_default(self, tmp_path):
|
|
f = tmp_path / "bad.json"
|
|
f.write_text("{corrupt json!!!}")
|
|
assert read_json(f, {"safe": True}) == {"safe": True}
|
|
|
|
def test_default_is_deep_copied(self, tmp_path):
|
|
"""Default is deep-copied, not shared between calls."""
|
|
default = {"nested": {"key": "val"}}
|
|
result1 = read_json(tmp_path / "a.json", default)
|
|
result2 = read_json(tmp_path / "b.json", default)
|
|
result1["nested"]["key"] = "mutated"
|
|
assert result2["nested"]["key"] == "val"
|
|
|
|
|
|
class TestWriteJson:
|
|
def test_creates_file_with_indent(self, tmp_path):
|
|
f = tmp_path / "out.json"
|
|
write_json(f, {"key": "val"})
|
|
content = f.read_text()
|
|
assert '"key": "val"' in content
|
|
assert content.endswith("\n")
|
|
|
|
def test_creates_parent_dirs(self, tmp_path):
|
|
f = tmp_path / "deep" / "nested" / "out.json"
|
|
write_json(f, {"ok": True})
|
|
assert f.exists()
|
|
|
|
def test_sorted_keys(self, tmp_path):
|
|
f = tmp_path / "sorted.json"
|
|
write_json(f, {"z": 1, "a": 2})
|
|
content = f.read_text()
|
|
assert content.index('"a"') < content.index('"z"')
|
|
|
|
|
|
class TestJsonlIO:
|
|
def test_load_jsonl_valid(self, tmp_path):
|
|
f = tmp_path / "data.jsonl"
|
|
f.write_text('{"a":1}\n{"b":2}\n')
|
|
rows = load_jsonl(f)
|
|
assert len(rows) == 2
|
|
assert rows[0] == {"a": 1}
|
|
|
|
def test_load_jsonl_missing_file(self, tmp_path):
|
|
assert load_jsonl(tmp_path / "nope.jsonl") == []
|
|
|
|
def test_load_jsonl_skips_blank_lines(self, tmp_path):
|
|
f = tmp_path / "data.jsonl"
|
|
f.write_text('{"a":1}\n\n\n{"b":2}\n')
|
|
rows = load_jsonl(f)
|
|
assert len(rows) == 2
|
|
|
|
def test_write_jsonl(self, tmp_path):
|
|
f = tmp_path / "out.jsonl"
|
|
write_jsonl(f, [{"a": 1}, {"b": 2}])
|
|
lines = f.read_text().strip().split("\n")
|
|
assert len(lines) == 2
|
|
assert json.loads(lines[0]) == {"a": 1}
|
|
|
|
def test_append_jsonl(self, tmp_path):
|
|
f = tmp_path / "append.jsonl"
|
|
f.write_text('{"existing":true}\n')
|
|
append_jsonl(f, [{"new": True}])
|
|
rows = load_jsonl(f)
|
|
assert len(rows) == 2
|
|
|
|
def test_append_jsonl_empty_list_noop(self, tmp_path):
|
|
"""Appending empty list doesn't create file."""
|
|
f = tmp_path / "nope.jsonl"
|
|
append_jsonl(f, [])
|
|
assert not f.exists()
|
|
|
|
def test_count_jsonl_rows(self, tmp_path):
|
|
f = tmp_path / "count.jsonl"
|
|
f.write_text('{"a":1}\n{"b":2}\n{"c":3}\n')
|
|
assert count_jsonl_rows(f) == 3
|
|
|
|
def test_count_jsonl_missing_file(self, tmp_path):
|
|
assert count_jsonl_rows(tmp_path / "nope.jsonl") == 0
|
|
|
|
def test_count_jsonl_skips_blank_lines(self, tmp_path):
|
|
f = tmp_path / "sparse.jsonl"
|
|
f.write_text('{"a":1}\n\n{"b":2}\n\n')
|
|
assert count_jsonl_rows(f) == 2
|
|
|
|
|
|
class TestWriteText:
|
|
def test_writes_with_trailing_newline(self, tmp_path):
|
|
f = tmp_path / "text.md"
|
|
write_text(f, "hello")
|
|
assert f.read_text() == "hello\n"
|
|
|
|
def test_strips_trailing_whitespace(self, tmp_path):
|
|
f = tmp_path / "text.md"
|
|
write_text(f, "hello \n\n\n")
|
|
assert f.read_text() == "hello\n"
|
|
|
|
def test_empty_content_writes_empty_file(self, tmp_path):
|
|
f = tmp_path / "text.md"
|
|
write_text(f, " ")
|
|
assert f.read_text() == ""
|
|
|
|
|
|
# ═══════════════════════════════════════════════════════════════════════
|
|
# PATH UTILITIES
|
|
# ═══════════════════════════════════════════════════════════════════════
|
|
|
|
class TestPathUtilities:
|
|
def test_newest_file(self, tmp_path):
|
|
(tmp_path / "a.txt").write_text("a")
|
|
(tmp_path / "b.txt").write_text("b")
|
|
(tmp_path / "c.txt").write_text("c")
|
|
result = newest_file(tmp_path, "*.txt")
|
|
assert result.name == "c.txt" # sorted, last = newest
|
|
|
|
def test_newest_file_empty_dir(self, tmp_path):
|
|
assert newest_file(tmp_path, "*.txt") is None
|
|
|
|
def test_latest_path(self, tmp_path):
|
|
(tmp_path / "batch_001.json").write_text("{}")
|
|
(tmp_path / "batch_002.json").write_text("{}")
|
|
result = latest_path(tmp_path, "batch_*.json")
|
|
assert result.name == "batch_002.json"
|
|
|
|
def test_latest_path_no_matches(self, tmp_path):
|
|
assert latest_path(tmp_path, "*.nope") is None
|
|
|
|
|
|
# ═══════════════════════════════════════════════════════════════════════
|
|
# FORMATTING & HELPERS
|
|
# ═══════════════════════════════════════════════════════════════════════
|
|
|
|
class TestFormatting:
|
|
def test_archive_batch_id(self):
|
|
assert archive_batch_id(1) == "batch_001"
|
|
assert archive_batch_id(42) == "batch_042"
|
|
assert archive_batch_id(100) == "batch_100"
|
|
|
|
def test_archive_profile_summary(self):
|
|
profile = {
|
|
"claims": [
|
|
{"status": "durable", "claim": "a"},
|
|
{"status": "durable", "claim": "b"},
|
|
{"status": "provisional", "claim": "c"},
|
|
{"status": "retracted", "claim": "d"},
|
|
]
|
|
}
|
|
summary = archive_profile_summary(profile)
|
|
assert len(summary["durable_claims"]) == 2
|
|
assert len(summary["provisional_claims"]) == 1
|
|
|
|
def test_archive_profile_summary_truncates(self):
|
|
"""Summaries are capped at 12 durable and 8 provisional."""
|
|
profile = {
|
|
"claims": [{"status": "durable", "claim": f"d{i}"} for i in range(20)]
|
|
+ [{"status": "provisional", "claim": f"p{i}"} for i in range(15)]
|
|
}
|
|
summary = archive_profile_summary(profile)
|
|
assert len(summary["durable_claims"]) <= 12
|
|
assert len(summary["provisional_claims"]) <= 8
|
|
|
|
def test_archive_profile_summary_empty(self):
|
|
assert archive_profile_summary({}) == {
|
|
"durable_claims": [],
|
|
"provisional_claims": [],
|
|
}
|
|
|
|
def test_format_tweets_for_prompt(self):
|
|
rows = [
|
|
{"tweet_id": "123", "created_at": "2024-01-01", "full_text": "Hello world"},
|
|
{"tweet_id": "456", "created_at": "2024-01-02", "full_text": "Goodbye world"},
|
|
]
|
|
result = format_tweets_for_prompt(rows)
|
|
assert "tweet_id=123" in result
|
|
assert "Hello world" in result
|
|
assert "2." in result # 1-indexed
|
|
|
|
def test_archive_default_checkpoint(self):
|
|
"""Default checkpoint has all required fields."""
|
|
cp = archive_default_checkpoint()
|
|
assert cp["phase"] == "discovery"
|
|
assert cp["next_offset"] == 0
|
|
assert cp["batch_size"] == 50
|
|
assert cp["batches_completed"] == 0
|