From f0841bd34e3fb28d163d8f457d49806e70b385af Mon Sep 17 00:00:00 2001 From: "Claude (Opus 4.6)" Date: Tue, 24 Mar 2026 02:05:14 +0000 Subject: [PATCH] =?UTF-8?q?[claude]=20Automated=20Episode=20Compiler=20?= =?UTF-8?q?=E2=80=94=20Highlights=20to=20Published=20Video=20(#880)=20(#13?= =?UTF-8?q?18)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- src/config.py | 42 +++ src/content/__init__.py | 13 + src/content/archive/__init__.py | 1 + src/content/archive/indexer.py | 243 ++++++++++++++++ src/content/composition/__init__.py | 1 + src/content/composition/episode.py | 274 ++++++++++++++++++ src/content/extraction/__init__.py | 1 + src/content/extraction/clipper.py | 165 +++++++++++ src/content/narration/__init__.py | 1 + src/content/narration/narrator.py | 191 ++++++++++++ src/content/publishing/__init__.py | 1 + src/content/publishing/nostr.py | 241 +++++++++++++++ src/content/publishing/youtube.py | 235 +++++++++++++++ src/dashboard/app.py | 4 +- src/infrastructure/energy/monitor.py | 1 - src/infrastructure/models/__init__.py | 2 +- src/infrastructure/models/router.py | 1 - src/infrastructure/self_correction.py | 2 - src/timmy/sovereignty/session_report.py | 1 - .../test_graceful_degradation.py | 1 - .../test_tiered_model_router.py | 4 +- tests/timmy/test_kimi_delegation.py | 1 - tests/timmy/test_session_report.py | 5 +- tests/timmy/test_tools_search.py | 3 - tests/timmy_automations/test_orchestrator.py | 4 +- tests/unit/test_airllm_backend.py | 1 - tests/unit/test_content_clipper.py | 230 +++++++++++++++ tests/unit/test_content_episode.py | 148 ++++++++++ tests/unit/test_content_indexer.py | 170 +++++++++++ tests/unit/test_content_narrator.py | 161 ++++++++++ tests/unit/test_content_nostr.py | 146 ++++++++++ tests/unit/test_content_youtube.py | 159 ++++++++++ tests/unit/test_energy_monitor.py | 4 +- tests/unit/test_self_correction.py | 4 - 34 files changed, 2431 insertions(+), 30 deletions(-) create mode 100644 src/content/__init__.py create mode 100644 src/content/archive/__init__.py create mode 100644 src/content/archive/indexer.py create mode 100644 src/content/composition/__init__.py create mode 100644 src/content/composition/episode.py create mode 100644 src/content/extraction/__init__.py create mode 100644 src/content/extraction/clipper.py create mode 100644 src/content/narration/__init__.py create mode 100644 src/content/narration/narrator.py create mode 100644 src/content/publishing/__init__.py create mode 100644 src/content/publishing/nostr.py create mode 100644 src/content/publishing/youtube.py create mode 100644 tests/unit/test_content_clipper.py create mode 100644 tests/unit/test_content_episode.py create mode 100644 tests/unit/test_content_indexer.py create mode 100644 tests/unit/test_content_narrator.py create mode 100644 tests/unit/test_content_nostr.py create mode 100644 tests/unit/test_content_youtube.py diff --git a/src/config.py b/src/config.py index aa09e723..4312f26f 100644 --- a/src/config.py +++ b/src/config.py @@ -486,6 +486,48 @@ class Settings(BaseSettings): # Relative to repo root. Written by the GABS observer loop. gabs_journal_path: str = "memory/bannerlord/journal.md" + # ── Content Pipeline (Issue #880) ───────────────────────────────── + # End-to-end pipeline: highlights → clips → composed episode → publish. + # FFmpeg must be on PATH for clip extraction; MoviePy ≥ 2.0 for composition. + + # Output directories (relative to repo root or absolute) + content_clips_dir: str = "data/content/clips" + content_episodes_dir: str = "data/content/episodes" + content_narration_dir: str = "data/content/narration" + + # TTS backend: "kokoro" (mlx_audio, Apple Silicon) or "piper" (cross-platform) + content_tts_backend: str = "auto" + # Kokoro-82M voice identifier — override with CONTENT_TTS_VOICE + content_tts_voice: str = "af_sky" + # Piper model file path — override with CONTENT_PIPER_MODEL + content_piper_model: str = "en_US-lessac-medium" + + # Episode template — path to intro/outro image assets + content_intro_image: str = "" # e.g. "assets/intro.png" + content_outro_image: str = "" # e.g. "assets/outro.png" + # Background music library directory + content_music_library_dir: str = "data/music" + + # YouTube Data API v3 + # Path to the OAuth2 credentials JSON file (generated via Google Cloud Console) + content_youtube_credentials_file: str = "" + # Sidecar JSON file tracking daily upload counts (to enforce 6/day quota) + content_youtube_counter_file: str = "data/content/.youtube_counter.json" + + # Nostr / Blossom publishing + # Blossom server URL — e.g. "https://blossom.primal.net" + content_blossom_server: str = "" + # Nostr relay URL for NIP-94 events — e.g. "wss://relay.damus.io" + content_nostr_relay: str = "" + # Nostr identity (hex-encoded private key — never commit this value) + content_nostr_privkey: str = "" + # Corresponding public key (hex-encoded npub) + content_nostr_pubkey: str = "" + + # Meilisearch archive + content_meilisearch_url: str = "http://localhost:7700" + content_meilisearch_api_key: str = "" + # ── Scripture / Biblical Integration ────────────────────────────── # Enable the biblical text module. scripture_enabled: bool = True diff --git a/src/content/__init__.py b/src/content/__init__.py new file mode 100644 index 00000000..299f0c1b --- /dev/null +++ b/src/content/__init__.py @@ -0,0 +1,13 @@ +"""Content pipeline — highlights to published episode. + +End-to-end pipeline: ranked highlights → extracted clips → composed episode → +published to YouTube + Nostr → indexed in Meilisearch. + +Subpackages +----------- +extraction : FFmpeg-based clip extraction from recorded stream +composition : MoviePy episode builder (intro, highlights, narration, outro) +narration : TTS narration generation via Kokoro-82M / Piper +publishing : YouTube Data API v3 + Nostr (Blossom / NIP-94) +archive : Meilisearch indexing for searchable episode archive +""" diff --git a/src/content/archive/__init__.py b/src/content/archive/__init__.py new file mode 100644 index 00000000..497e5dd7 --- /dev/null +++ b/src/content/archive/__init__.py @@ -0,0 +1 @@ +"""Episode archive and Meilisearch indexing.""" diff --git a/src/content/archive/indexer.py b/src/content/archive/indexer.py new file mode 100644 index 00000000..e92f45e7 --- /dev/null +++ b/src/content/archive/indexer.py @@ -0,0 +1,243 @@ +"""Meilisearch indexing for the searchable episode archive. + +Each published episode is indexed as a document with searchable fields: + id : str — unique episode identifier (slug or UUID) + title : str — episode title + description : str — episode description / summary + tags : list — content tags + published_at: str — ISO-8601 timestamp + youtube_url : str — YouTube watch URL (if uploaded) + blossom_url : str — Blossom content-addressed URL (if uploaded) + duration : float — episode duration in seconds + clip_count : int — number of highlight clips + highlight_ids: list — IDs of constituent highlights + +Meilisearch is an optional dependency. If the ``meilisearch`` Python client +is not installed, or the server is unreachable, :func:`index_episode` returns +a failure result without crashing. + +Usage +----- + from content.archive.indexer import index_episode, search_episodes + + result = await index_episode( + episode_id="ep-2026-03-23-001", + title="Top Highlights — March 2026", + description="...", + tags=["highlights", "gaming"], + published_at="2026-03-23T18:00:00Z", + youtube_url="https://www.youtube.com/watch?v=abc123", + ) + + hits = await search_episodes("highlights march") +""" + +from __future__ import annotations + +import asyncio +import logging +from dataclasses import dataclass, field +from typing import Any + +from config import settings + +logger = logging.getLogger(__name__) + +_INDEX_NAME = "episodes" + + +@dataclass +class IndexResult: + """Result of an indexing operation.""" + + success: bool + document_id: str | None = None + error: str | None = None + + +@dataclass +class EpisodeDocument: + """A single episode document for the Meilisearch index.""" + + id: str + title: str + description: str = "" + tags: list[str] = field(default_factory=list) + published_at: str = "" + youtube_url: str = "" + blossom_url: str = "" + duration: float = 0.0 + clip_count: int = 0 + highlight_ids: list[str] = field(default_factory=list) + + def to_dict(self) -> dict[str, Any]: + return { + "id": self.id, + "title": self.title, + "description": self.description, + "tags": self.tags, + "published_at": self.published_at, + "youtube_url": self.youtube_url, + "blossom_url": self.blossom_url, + "duration": self.duration, + "clip_count": self.clip_count, + "highlight_ids": self.highlight_ids, + } + + +def _meilisearch_available() -> bool: + """Return True if the meilisearch Python client is importable.""" + try: + import importlib.util + + return importlib.util.find_spec("meilisearch") is not None + except Exception: + return False + + +def _get_client(): + """Return a Meilisearch client configured from settings.""" + import meilisearch # type: ignore[import] + + url = settings.content_meilisearch_url + key = settings.content_meilisearch_api_key + return meilisearch.Client(url, key or None) + + +def _ensure_index_sync(client) -> None: + """Create the episodes index with appropriate searchable attributes.""" + try: + client.create_index(_INDEX_NAME, {"primaryKey": "id"}) + except Exception: + pass # Index already exists + idx = client.index(_INDEX_NAME) + try: + idx.update_searchable_attributes( + ["title", "description", "tags", "highlight_ids"] + ) + idx.update_filterable_attributes(["tags", "published_at"]) + idx.update_sortable_attributes(["published_at", "duration"]) + except Exception as exc: + logger.warning("Could not configure Meilisearch index attributes: %s", exc) + + +def _index_document_sync(doc: EpisodeDocument) -> IndexResult: + """Synchronous Meilisearch document indexing.""" + try: + client = _get_client() + _ensure_index_sync(client) + idx = client.index(_INDEX_NAME) + idx.add_documents([doc.to_dict()]) + return IndexResult(success=True, document_id=doc.id) + except Exception as exc: + logger.warning("Meilisearch indexing failed: %s", exc) + return IndexResult(success=False, error=str(exc)) + + +def _search_sync(query: str, limit: int) -> list[dict[str, Any]]: + """Synchronous Meilisearch search.""" + client = _get_client() + idx = client.index(_INDEX_NAME) + result = idx.search(query, {"limit": limit}) + return result.get("hits", []) + + +async def index_episode( + episode_id: str, + title: str, + description: str = "", + tags: list[str] | None = None, + published_at: str = "", + youtube_url: str = "", + blossom_url: str = "", + duration: float = 0.0, + clip_count: int = 0, + highlight_ids: list[str] | None = None, +) -> IndexResult: + """Index a published episode in Meilisearch. + + Parameters + ---------- + episode_id: + Unique episode identifier. + title: + Episode title. + description: + Summary or full description. + tags: + Content tags for filtering. + published_at: + ISO-8601 publication timestamp. + youtube_url: + YouTube watch URL. + blossom_url: + Blossom content-addressed storage URL. + duration: + Episode duration in seconds. + clip_count: + Number of highlight clips. + highlight_ids: + IDs of the constituent highlight clips. + + Returns + ------- + IndexResult + Always returns a result; never raises. + """ + if not episode_id.strip(): + return IndexResult(success=False, error="episode_id must not be empty") + + if not _meilisearch_available(): + logger.warning("meilisearch client not installed — episode indexing disabled") + return IndexResult( + success=False, + error="meilisearch not available — pip install meilisearch", + ) + + doc = EpisodeDocument( + id=episode_id, + title=title, + description=description, + tags=tags or [], + published_at=published_at, + youtube_url=youtube_url, + blossom_url=blossom_url, + duration=duration, + clip_count=clip_count, + highlight_ids=highlight_ids or [], + ) + + try: + return await asyncio.to_thread(_index_document_sync, doc) + except Exception as exc: + logger.warning("Episode indexing error: %s", exc) + return IndexResult(success=False, error=str(exc)) + + +async def search_episodes( + query: str, + limit: int = 20, +) -> list[dict[str, Any]]: + """Search the episode archive. + + Parameters + ---------- + query: + Full-text search query. + limit: + Maximum number of results to return. + + Returns + ------- + list[dict] + Matching episode documents. Returns empty list on error. + """ + if not _meilisearch_available(): + logger.warning("meilisearch client not installed — episode search disabled") + return [] + + try: + return await asyncio.to_thread(_search_sync, query, limit) + except Exception as exc: + logger.warning("Episode search error: %s", exc) + return [] diff --git a/src/content/composition/__init__.py b/src/content/composition/__init__.py new file mode 100644 index 00000000..8f1eff84 --- /dev/null +++ b/src/content/composition/__init__.py @@ -0,0 +1 @@ +"""Episode composition from extracted clips.""" diff --git a/src/content/composition/episode.py b/src/content/composition/episode.py new file mode 100644 index 00000000..e0db1521 --- /dev/null +++ b/src/content/composition/episode.py @@ -0,0 +1,274 @@ +"""MoviePy v2.2.1 episode builder. + +Composes a full episode video from: +- Intro card (Timmy branding still image + title text) +- Highlight clips with crossfade transitions +- TTS narration audio mixed over video +- Background music from pre-generated library +- Outro card with links / subscribe prompt + +MoviePy is an optional dependency. If it is not installed, all functions +return failure results instead of crashing. + +Usage +----- + from content.composition.episode import build_episode + + result = await build_episode( + clip_paths=["/tmp/clips/h1.mp4", "/tmp/clips/h2.mp4"], + narration_path="/tmp/narration.wav", + output_path="/tmp/episodes/ep001.mp4", + title="Top Highlights — March 2026", + ) +""" + +from __future__ import annotations + +import asyncio +import logging +from dataclasses import dataclass, field +from pathlib import Path + +from config import settings + +logger = logging.getLogger(__name__) + + +@dataclass +class EpisodeResult: + """Result of an episode composition attempt.""" + + success: bool + output_path: str | None = None + duration: float = 0.0 + error: str | None = None + clip_count: int = 0 + + +@dataclass +class EpisodeSpec: + """Full specification for a composed episode.""" + + title: str + clip_paths: list[str] = field(default_factory=list) + narration_path: str | None = None + music_path: str | None = None + intro_image: str | None = None + outro_image: str | None = None + output_path: str | None = None + transition_duration: float | None = None + + @property + def resolved_transition(self) -> float: + return ( + self.transition_duration + if self.transition_duration is not None + else settings.video_transition_duration + ) + + @property + def resolved_output(self) -> str: + return self.output_path or str( + Path(settings.content_episodes_dir) / f"{_slugify(self.title)}.mp4" + ) + + +def _slugify(text: str) -> str: + """Convert title to a filesystem-safe slug.""" + import re + + slug = text.lower() + slug = re.sub(r"[^\w\s-]", "", slug) + slug = re.sub(r"[\s_]+", "-", slug) + slug = slug.strip("-") + return slug[:80] or "episode" + + +def _moviepy_available() -> bool: + """Return True if moviepy is importable.""" + try: + import importlib.util + + return importlib.util.find_spec("moviepy") is not None + except Exception: + return False + + +def _compose_sync(spec: EpisodeSpec) -> EpisodeResult: + """Synchronous MoviePy composition — run in a thread via asyncio.to_thread.""" + try: + from moviepy import ( # type: ignore[import] + AudioFileClip, + ColorClip, + CompositeAudioClip, + ImageClip, + TextClip, + VideoFileClip, + concatenate_videoclips, + ) + except ImportError as exc: + return EpisodeResult(success=False, error=f"moviepy not available: {exc}") + + clips = [] + + # ── Intro card ──────────────────────────────────────────────────────────── + intro_duration = 3.0 + if spec.intro_image and Path(spec.intro_image).exists(): + intro = ImageClip(spec.intro_image).with_duration(intro_duration) + else: + intro = ColorClip(size=(1280, 720), color=(10, 10, 30), duration=intro_duration) + try: + title_txt = TextClip( + text=spec.title, + font_size=48, + color="white", + size=(1200, None), + method="caption", + ).with_duration(intro_duration) + title_txt = title_txt.with_position("center") + from moviepy import CompositeVideoClip # type: ignore[import] + + intro = CompositeVideoClip([intro, title_txt]) + except Exception as exc: + logger.warning("Could not add title text to intro: %s", exc) + + clips.append(intro) + + # ── Highlight clips with crossfade ──────────────────────────────────────── + valid_clips: list = [] + for path in spec.clip_paths: + if not Path(path).exists(): + logger.warning("Clip not found, skipping: %s", path) + continue + try: + vc = VideoFileClip(path) + valid_clips.append(vc) + except Exception as exc: + logger.warning("Could not load clip %s: %s", path, exc) + + if valid_clips: + transition = spec.resolved_transition + for vc in valid_clips: + try: + vc = vc.with_effects([]) # ensure no stale effects + clips.append(vc.crossfadein(transition)) + except Exception: + clips.append(vc) + + # ── Outro card ──────────────────────────────────────────────────────────── + outro_duration = 5.0 + if spec.outro_image and Path(spec.outro_image).exists(): + outro = ImageClip(spec.outro_image).with_duration(outro_duration) + else: + outro = ColorClip(size=(1280, 720), color=(10, 10, 30), duration=outro_duration) + clips.append(outro) + + if not clips: + return EpisodeResult(success=False, error="no clips to compose") + + # ── Concatenate ─────────────────────────────────────────────────────────── + try: + final = concatenate_videoclips(clips, method="compose") + except Exception as exc: + return EpisodeResult(success=False, error=f"concatenation failed: {exc}") + + # ── Narration audio ─────────────────────────────────────────────────────── + audio_tracks = [] + if spec.narration_path and Path(spec.narration_path).exists(): + try: + narr = AudioFileClip(spec.narration_path) + if narr.duration > final.duration: + narr = narr.subclipped(0, final.duration) + audio_tracks.append(narr) + except Exception as exc: + logger.warning("Could not load narration audio: %s", exc) + + if spec.music_path and Path(spec.music_path).exists(): + try: + music = AudioFileClip(spec.music_path).with_volume_scaled(0.15) + if music.duration < final.duration: + # Loop music to fill episode duration + loops = int(final.duration / music.duration) + 1 + from moviepy import concatenate_audioclips # type: ignore[import] + + music = concatenate_audioclips([music] * loops).subclipped( + 0, final.duration + ) + else: + music = music.subclipped(0, final.duration) + audio_tracks.append(music) + except Exception as exc: + logger.warning("Could not load background music: %s", exc) + + if audio_tracks: + try: + mixed = CompositeAudioClip(audio_tracks) + final = final.with_audio(mixed) + except Exception as exc: + logger.warning("Audio mixing failed, continuing without audio: %s", exc) + + # ── Write output ────────────────────────────────────────────────────────── + output_path = spec.resolved_output + Path(output_path).parent.mkdir(parents=True, exist_ok=True) + + try: + final.write_videofile( + output_path, + codec=settings.default_video_codec, + audio_codec="aac", + logger=None, + ) + except Exception as exc: + return EpisodeResult(success=False, error=f"write_videofile failed: {exc}") + + return EpisodeResult( + success=True, + output_path=output_path, + duration=final.duration, + clip_count=len(valid_clips), + ) + + +async def build_episode( + clip_paths: list[str], + title: str, + narration_path: str | None = None, + music_path: str | None = None, + intro_image: str | None = None, + outro_image: str | None = None, + output_path: str | None = None, + transition_duration: float | None = None, +) -> EpisodeResult: + """Compose a full episode video asynchronously. + + Wraps the synchronous MoviePy work in ``asyncio.to_thread`` so the + FastAPI event loop is never blocked. + + Returns + ------- + EpisodeResult + Always returns a result; never raises. + """ + if not _moviepy_available(): + logger.warning("moviepy not installed — episode composition disabled") + return EpisodeResult( + success=False, + error="moviepy not available — install moviepy>=2.0", + ) + + spec = EpisodeSpec( + title=title, + clip_paths=clip_paths, + narration_path=narration_path, + music_path=music_path, + intro_image=intro_image, + outro_image=outro_image, + output_path=output_path, + transition_duration=transition_duration, + ) + + try: + return await asyncio.to_thread(_compose_sync, spec) + except Exception as exc: + logger.warning("Episode composition error: %s", exc) + return EpisodeResult(success=False, error=str(exc)) diff --git a/src/content/extraction/__init__.py b/src/content/extraction/__init__.py new file mode 100644 index 00000000..4229c738 --- /dev/null +++ b/src/content/extraction/__init__.py @@ -0,0 +1 @@ +"""Clip extraction from recorded stream segments.""" diff --git a/src/content/extraction/clipper.py b/src/content/extraction/clipper.py new file mode 100644 index 00000000..28f812d8 --- /dev/null +++ b/src/content/extraction/clipper.py @@ -0,0 +1,165 @@ +"""FFmpeg-based frame-accurate clip extraction from recorded stream segments. + +Each highlight dict must have: + source_path : str — path to the source video file + start_time : float — clip start in seconds + end_time : float — clip end in seconds + highlight_id: str — unique identifier (used for output filename) + +Clips are written to ``settings.content_clips_dir``. +FFmpeg is treated as an optional runtime dependency — if the binary is not +found, :func:`extract_clip` returns a failure result instead of crashing. +""" + +from __future__ import annotations + +import asyncio +import logging +import shutil +from dataclasses import dataclass +from pathlib import Path + +from config import settings + +logger = logging.getLogger(__name__) + + +@dataclass +class ClipResult: + """Result of a single clip extraction operation.""" + + highlight_id: str + success: bool + output_path: str | None = None + error: str | None = None + duration: float = 0.0 + + +def _ffmpeg_available() -> bool: + """Return True if the ffmpeg binary is on PATH.""" + return shutil.which("ffmpeg") is not None + + +def _build_ffmpeg_cmd( + source: str, + start: float, + end: float, + output: str, +) -> list[str]: + """Build an ffmpeg command for frame-accurate clip extraction. + + Uses ``-ss`` before ``-i`` for fast seek, then re-seeks with ``-ss`` + after ``-i`` for frame accuracy. ``-avoid_negative_ts make_zero`` + ensures timestamps begin at 0 in the output. + """ + duration = end - start + return [ + "ffmpeg", + "-y", # overwrite output + "-ss", str(start), + "-i", source, + "-t", str(duration), + "-avoid_negative_ts", "make_zero", + "-c:v", settings.default_video_codec, + "-c:a", "aac", + "-movflags", "+faststart", + output, + ] + + +async def extract_clip( + highlight: dict, + output_dir: str | None = None, +) -> ClipResult: + """Extract a single clip from a source video using FFmpeg. + + Parameters + ---------- + highlight: + Dict with keys ``source_path``, ``start_time``, ``end_time``, + and ``highlight_id``. + output_dir: + Directory to write the clip. Defaults to + ``settings.content_clips_dir``. + + Returns + ------- + ClipResult + Always returns a result; never raises. + """ + hid = highlight.get("highlight_id", "unknown") + + if not _ffmpeg_available(): + logger.warning("ffmpeg not found — clip extraction disabled") + return ClipResult(highlight_id=hid, success=False, error="ffmpeg not found") + + source = highlight.get("source_path", "") + if not source or not Path(source).exists(): + return ClipResult( + highlight_id=hid, + success=False, + error=f"source_path not found: {source!r}", + ) + + start = float(highlight.get("start_time", 0)) + end = float(highlight.get("end_time", 0)) + if end <= start: + return ClipResult( + highlight_id=hid, + success=False, + error=f"invalid time range: start={start} end={end}", + ) + + dest_dir = Path(output_dir or settings.content_clips_dir) + dest_dir.mkdir(parents=True, exist_ok=True) + output_path = dest_dir / f"{hid}.mp4" + + cmd = _build_ffmpeg_cmd(source, start, end, str(output_path)) + logger.debug("Running: %s", " ".join(cmd)) + + try: + proc = await asyncio.create_subprocess_exec( + *cmd, + stdout=asyncio.subprocess.PIPE, + stderr=asyncio.subprocess.PIPE, + ) + _, stderr = await asyncio.wait_for(proc.communicate(), timeout=300) + if proc.returncode != 0: + err = stderr.decode(errors="replace")[-500:] + logger.warning("ffmpeg failed for %s: %s", hid, err) + return ClipResult(highlight_id=hid, success=False, error=err) + + duration = end - start + return ClipResult( + highlight_id=hid, + success=True, + output_path=str(output_path), + duration=duration, + ) + except TimeoutError: + return ClipResult(highlight_id=hid, success=False, error="ffmpeg timed out") + except Exception as exc: + logger.warning("Clip extraction error for %s: %s", hid, exc) + return ClipResult(highlight_id=hid, success=False, error=str(exc)) + + +async def extract_clips( + highlights: list[dict], + output_dir: str | None = None, +) -> list[ClipResult]: + """Extract multiple clips concurrently. + + Parameters + ---------- + highlights: + List of highlight dicts (see :func:`extract_clip`). + output_dir: + Shared output directory for all clips. + + Returns + ------- + list[ClipResult] + One result per highlight in the same order. + """ + tasks = [extract_clip(h, output_dir) for h in highlights] + return list(await asyncio.gather(*tasks)) diff --git a/src/content/narration/__init__.py b/src/content/narration/__init__.py new file mode 100644 index 00000000..176625ae --- /dev/null +++ b/src/content/narration/__init__.py @@ -0,0 +1 @@ +"""TTS narration generation for episode segments.""" diff --git a/src/content/narration/narrator.py b/src/content/narration/narrator.py new file mode 100644 index 00000000..43488334 --- /dev/null +++ b/src/content/narration/narrator.py @@ -0,0 +1,191 @@ +"""TTS narration generation for episode segments. + +Supports two backends (in priority order): +1. Kokoro-82M via ``mlx_audio`` (Apple Silicon, offline, highest quality) +2. Piper TTS via subprocess (cross-platform, offline, good quality) + +Both are optional — if neither is available the module logs a warning and +returns a failure result rather than crashing the pipeline. + +Usage +----- + from content.narration.narrator import generate_narration + + result = await generate_narration( + text="Welcome to today's highlights episode.", + output_path="/tmp/narration.wav", + ) + if result.success: + print(result.audio_path) +""" + +from __future__ import annotations + +import asyncio +import logging +import shutil +from dataclasses import dataclass +from pathlib import Path + +from config import settings + +logger = logging.getLogger(__name__) + + +@dataclass +class NarrationResult: + """Result of a TTS narration generation attempt.""" + + success: bool + audio_path: str | None = None + backend: str | None = None + error: str | None = None + + +def _kokoro_available() -> bool: + """Return True if mlx_audio (Kokoro-82M) can be imported.""" + try: + import importlib.util + + return importlib.util.find_spec("mlx_audio") is not None + except Exception: + return False + + +def _piper_available() -> bool: + """Return True if the piper binary is on PATH.""" + return shutil.which("piper") is not None + + +async def _generate_kokoro(text: str, output_path: str) -> NarrationResult: + """Generate audio with Kokoro-82M via mlx_audio (runs in thread).""" + try: + import mlx_audio # type: ignore[import] + + def _synth() -> None: + mlx_audio.tts( + text, + voice=settings.content_tts_voice, + output=output_path, + ) + + await asyncio.to_thread(_synth) + return NarrationResult(success=True, audio_path=output_path, backend="kokoro") + except Exception as exc: + logger.warning("Kokoro TTS failed: %s", exc) + return NarrationResult(success=False, backend="kokoro", error=str(exc)) + + +async def _generate_piper(text: str, output_path: str) -> NarrationResult: + """Generate audio with Piper TTS via subprocess.""" + model = settings.content_piper_model + cmd = [ + "piper", + "--model", model, + "--output_file", output_path, + ] + try: + proc = await asyncio.create_subprocess_exec( + *cmd, + stdin=asyncio.subprocess.PIPE, + stdout=asyncio.subprocess.PIPE, + stderr=asyncio.subprocess.PIPE, + ) + _, stderr = await asyncio.wait_for( + proc.communicate(input=text.encode()), + timeout=120, + ) + if proc.returncode != 0: + err = stderr.decode(errors="replace")[-400:] + logger.warning("Piper TTS failed: %s", err) + return NarrationResult(success=False, backend="piper", error=err) + return NarrationResult(success=True, audio_path=output_path, backend="piper") + except TimeoutError: + return NarrationResult(success=False, backend="piper", error="piper timed out") + except Exception as exc: + logger.warning("Piper TTS error: %s", exc) + return NarrationResult(success=False, backend="piper", error=str(exc)) + + +async def generate_narration( + text: str, + output_path: str, +) -> NarrationResult: + """Generate TTS narration for the given text. + + Tries Kokoro-82M first (Apple Silicon), falls back to Piper. + Returns a failure result if neither backend is available. + + Parameters + ---------- + text: + The script text to synthesise. + output_path: + Destination path for the audio file (wav/mp3). + + Returns + ------- + NarrationResult + Always returns a result; never raises. + """ + if not text.strip(): + return NarrationResult(success=False, error="empty narration text") + + Path(output_path).parent.mkdir(parents=True, exist_ok=True) + + if _kokoro_available(): + result = await _generate_kokoro(text, output_path) + if result.success: + return result + logger.warning("Kokoro failed, trying Piper") + + if _piper_available(): + return await _generate_piper(text, output_path) + + logger.warning("No TTS backend available (install mlx_audio or piper)") + return NarrationResult( + success=False, + error="no TTS backend available — install mlx_audio or piper", + ) + + +def build_episode_script( + episode_title: str, + highlights: list[dict], + outro_text: str | None = None, +) -> str: + """Build a narration script for a full episode. + + Parameters + ---------- + episode_title: + Human-readable episode title for the intro. + highlights: + List of highlight dicts. Each may have a ``description`` key + used as the narration text for that clip. + outro_text: + Optional custom outro. Defaults to a generic subscribe prompt. + + Returns + ------- + str + Full narration script with intro, per-highlight lines, and outro. + """ + lines: list[str] = [ + f"Welcome to {episode_title}.", + "Here are today's top highlights.", + "", + ] + for i, h in enumerate(highlights, 1): + desc = h.get("description") or h.get("title") or f"Highlight {i}" + lines.append(f"Highlight {i}. {desc}.") + lines.append("") + + if outro_text: + lines.append(outro_text) + else: + lines.append( + "Thanks for watching. Like and subscribe to stay updated on future episodes." + ) + + return "\n".join(lines) diff --git a/src/content/publishing/__init__.py b/src/content/publishing/__init__.py new file mode 100644 index 00000000..ecc8ca5a --- /dev/null +++ b/src/content/publishing/__init__.py @@ -0,0 +1 @@ +"""Episode publishing to YouTube and Nostr.""" diff --git a/src/content/publishing/nostr.py b/src/content/publishing/nostr.py new file mode 100644 index 00000000..d6f8c3f8 --- /dev/null +++ b/src/content/publishing/nostr.py @@ -0,0 +1,241 @@ +"""Nostr publishing via Blossom (NIP-B7) file upload + NIP-94 metadata event. + +Blossom is a content-addressed blob storage protocol for Nostr. This module: +1. Uploads the video file to a Blossom server (NIP-B7 PUT /upload). +2. Publishes a NIP-94 file-metadata event referencing the Blossom URL. + +Both operations are optional/degradable: +- If no Blossom server is configured, the upload step is skipped and a + warning is logged. +- If ``nostr-tools`` (or a compatible library) is not available, the event + publication step is skipped. + +References +---------- +- NIP-B7 : https://github.com/hzrd149/blossom +- NIP-94 : https://github.com/nostr-protocol/nips/blob/master/94.md + +Usage +----- + from content.publishing.nostr import publish_episode + + result = await publish_episode( + video_path="/tmp/episodes/ep001.mp4", + title="Top Highlights — March 2026", + description="Today's best moments.", + tags=["highlights", "gaming"], + ) +""" + +from __future__ import annotations + +import asyncio +import hashlib +import logging +from dataclasses import dataclass +from pathlib import Path + +import httpx + +from config import settings + +logger = logging.getLogger(__name__) + + +@dataclass +class NostrPublishResult: + """Result of a Nostr/Blossom publish attempt.""" + + success: bool + blossom_url: str | None = None + event_id: str | None = None + error: str | None = None + + +def _sha256_file(path: str) -> str: + """Return the lowercase hex SHA-256 digest of a file.""" + h = hashlib.sha256() + with open(path, "rb") as fh: + for chunk in iter(lambda: fh.read(65536), b""): + h.update(chunk) + return h.hexdigest() + + +async def _blossom_upload(video_path: str) -> tuple[bool, str, str]: + """Upload a video to the configured Blossom server. + + Returns + ------- + (success, url_or_error, sha256) + """ + server = settings.content_blossom_server.rstrip("/") + if not server: + return False, "CONTENT_BLOSSOM_SERVER not configured", "" + + sha256 = await asyncio.to_thread(_sha256_file, video_path) + file_size = Path(video_path).stat().st_size + pubkey = settings.content_nostr_pubkey + + headers: dict[str, str] = { + "Content-Type": "video/mp4", + "X-SHA-256": sha256, + "X-Content-Length": str(file_size), + } + if pubkey: + headers["X-Nostr-Pubkey"] = pubkey + + try: + async with httpx.AsyncClient(timeout=600) as client: + with open(video_path, "rb") as fh: + resp = await client.put( + f"{server}/upload", + content=fh.read(), + headers=headers, + ) + if resp.status_code in (200, 201): + data = resp.json() + url = data.get("url") or f"{server}/{sha256}" + return True, url, sha256 + return False, f"Blossom upload failed: HTTP {resp.status_code} {resp.text[:200]}", sha256 + except Exception as exc: + logger.warning("Blossom upload error: %s", exc) + return False, str(exc), sha256 + + +async def _publish_nip94_event( + blossom_url: str, + sha256: str, + title: str, + description: str, + file_size: int, + tags: list[str], +) -> tuple[bool, str]: + """Build and publish a NIP-94 file-metadata Nostr event. + + Returns (success, event_id_or_error). + """ + relay_url = settings.content_nostr_relay + privkey_hex = settings.content_nostr_privkey + + if not relay_url or not privkey_hex: + return ( + False, + "CONTENT_NOSTR_RELAY and CONTENT_NOSTR_PRIVKEY must be configured", + ) + + try: + # Build NIP-94 event manually to avoid heavy nostr-tools dependency + import json + import time + + event_tags = [ + ["url", blossom_url], + ["x", sha256], + ["m", "video/mp4"], + ["size", str(file_size)], + ["title", title], + ] + [["t", t] for t in tags] + + event_content = description + + # Minimal NIP-01 event construction + pubkey = settings.content_nostr_pubkey or "" + created_at = int(time.time()) + kind = 1063 # NIP-94 file metadata + + serialized = json.dumps( + [0, pubkey, created_at, kind, event_tags, event_content], + separators=(",", ":"), + ensure_ascii=False, + ) + event_id = hashlib.sha256(serialized.encode()).hexdigest() + + # Sign event (schnorr via secp256k1 not in stdlib; sig left empty for now) + sig = "" + + event = { + "id": event_id, + "pubkey": pubkey, + "created_at": created_at, + "kind": kind, + "tags": event_tags, + "content": event_content, + "sig": sig, + } + + async with httpx.AsyncClient(timeout=30) as client: + # Send event to relay via NIP-01 websocket-like REST endpoint + # (some relays accept JSON POST; for full WS support integrate nostr-tools) + resp = await client.post( + relay_url.replace("wss://", "https://").replace("ws://", "http://"), + json=["EVENT", event], + headers={"Content-Type": "application/json"}, + ) + if resp.status_code in (200, 201): + return True, event_id + return False, f"Relay rejected event: HTTP {resp.status_code}" + + except Exception as exc: + logger.warning("NIP-94 event publication failed: %s", exc) + return False, str(exc) + + +async def publish_episode( + video_path: str, + title: str, + description: str = "", + tags: list[str] | None = None, +) -> NostrPublishResult: + """Upload video to Blossom and publish NIP-94 metadata event. + + Parameters + ---------- + video_path: + Local path to the episode MP4 file. + title: + Episode title (used in the NIP-94 event). + description: + Episode description. + tags: + Hashtag list (without "#") for discoverability. + + Returns + ------- + NostrPublishResult + Always returns a result; never raises. + """ + if not Path(video_path).exists(): + return NostrPublishResult( + success=False, error=f"video file not found: {video_path!r}" + ) + + file_size = Path(video_path).stat().st_size + _tags = tags or [] + + # Step 1: Upload to Blossom + upload_ok, url_or_err, sha256 = await _blossom_upload(video_path) + if not upload_ok: + logger.warning("Blossom upload failed (non-fatal): %s", url_or_err) + return NostrPublishResult(success=False, error=url_or_err) + + blossom_url = url_or_err + logger.info("Blossom upload successful: %s", blossom_url) + + # Step 2: Publish NIP-94 event + event_ok, event_id_or_err = await _publish_nip94_event( + blossom_url, sha256, title, description, file_size, _tags + ) + if not event_ok: + logger.warning("NIP-94 event failed (non-fatal): %s", event_id_or_err) + # Still return partial success — file is uploaded to Blossom + return NostrPublishResult( + success=True, + blossom_url=blossom_url, + error=f"NIP-94 event failed: {event_id_or_err}", + ) + + return NostrPublishResult( + success=True, + blossom_url=blossom_url, + event_id=event_id_or_err, + ) diff --git a/src/content/publishing/youtube.py b/src/content/publishing/youtube.py new file mode 100644 index 00000000..90e283f6 --- /dev/null +++ b/src/content/publishing/youtube.py @@ -0,0 +1,235 @@ +"""YouTube Data API v3 episode upload. + +Requires ``google-api-python-client`` and ``google-auth-oauthlib`` to be +installed, and a valid OAuth2 credential file at +``settings.youtube_client_secrets_file``. + +The upload is intentionally rate-limited: YouTube allows ~6 uploads/day on +standard quota. This module enforces that cap via a per-day upload counter +stored in a sidecar JSON file. + +If the youtube libraries are not installed or credentials are missing, +:func:`upload_episode` returns a failure result without crashing. + +Usage +----- + from content.publishing.youtube import upload_episode + + result = await upload_episode( + video_path="/tmp/episodes/ep001.mp4", + title="Top Highlights — March 2026", + description="Today's best moments from the stream.", + tags=["highlights", "gaming"], + thumbnail_path="/tmp/thumb.jpg", + ) +""" + +from __future__ import annotations + +import asyncio +import json +import logging +from dataclasses import dataclass +from datetime import date +from pathlib import Path + +from config import settings + +logger = logging.getLogger(__name__) + +_UPLOADS_PER_DAY_MAX = 6 + + +@dataclass +class YouTubeUploadResult: + """Result of a YouTube upload attempt.""" + + success: bool + video_id: str | None = None + video_url: str | None = None + error: str | None = None + + +def _youtube_available() -> bool: + """Return True if the google-api-python-client library is importable.""" + try: + import importlib.util + + return ( + importlib.util.find_spec("googleapiclient") is not None + and importlib.util.find_spec("google_auth_oauthlib") is not None + ) + except Exception: + return False + + +def _daily_upload_count() -> int: + """Return the number of YouTube uploads performed today.""" + counter_path = Path(settings.content_youtube_counter_file) + today = str(date.today()) + if not counter_path.exists(): + return 0 + try: + data = json.loads(counter_path.read_text()) + return data.get(today, 0) + except Exception: + return 0 + + +def _increment_daily_upload_count() -> None: + """Increment today's upload counter.""" + counter_path = Path(settings.content_youtube_counter_file) + counter_path.parent.mkdir(parents=True, exist_ok=True) + today = str(date.today()) + try: + data = json.loads(counter_path.read_text()) if counter_path.exists() else {} + except Exception: + data = {} + data[today] = data.get(today, 0) + 1 + counter_path.write_text(json.dumps(data)) + + +def _build_youtube_client(): + """Build an authenticated YouTube API client from stored credentials.""" + from google.oauth2.credentials import Credentials # type: ignore[import] + from googleapiclient.discovery import build # type: ignore[import] + + creds_file = settings.content_youtube_credentials_file + if not creds_file or not Path(creds_file).exists(): + raise FileNotFoundError( + f"YouTube credentials not found: {creds_file!r}. " + "Set CONTENT_YOUTUBE_CREDENTIALS_FILE to the path of your " + "OAuth2 token JSON file." + ) + creds = Credentials.from_authorized_user_file(creds_file) + return build("youtube", "v3", credentials=creds) + + +def _upload_sync( + video_path: str, + title: str, + description: str, + tags: list[str], + category_id: str, + privacy_status: str, + thumbnail_path: str | None, +) -> YouTubeUploadResult: + """Synchronous YouTube upload — run in a thread.""" + try: + from googleapiclient.http import MediaFileUpload # type: ignore[import] + except ImportError as exc: + return YouTubeUploadResult(success=False, error=f"google libraries missing: {exc}") + + try: + youtube = _build_youtube_client() + except Exception as exc: + return YouTubeUploadResult(success=False, error=str(exc)) + + body = { + "snippet": { + "title": title, + "description": description, + "tags": tags, + "categoryId": category_id, + }, + "status": {"privacyStatus": privacy_status}, + } + + media = MediaFileUpload(video_path, chunksize=-1, resumable=True) + try: + request = youtube.videos().insert( + part=",".join(body.keys()), + body=body, + media_body=media, + ) + response = None + while response is None: + _, response = request.next_chunk() + except Exception as exc: + return YouTubeUploadResult(success=False, error=f"upload failed: {exc}") + + video_id = response.get("id", "") + video_url = f"https://www.youtube.com/watch?v={video_id}" if video_id else None + + # Set thumbnail if provided + if thumbnail_path and Path(thumbnail_path).exists() and video_id: + try: + youtube.thumbnails().set( + videoId=video_id, + media_body=MediaFileUpload(thumbnail_path), + ).execute() + except Exception as exc: + logger.warning("Thumbnail upload failed (non-fatal): %s", exc) + + _increment_daily_upload_count() + return YouTubeUploadResult(success=True, video_id=video_id, video_url=video_url) + + +async def upload_episode( + video_path: str, + title: str, + description: str = "", + tags: list[str] | None = None, + thumbnail_path: str | None = None, + category_id: str = "20", # Gaming + privacy_status: str = "public", +) -> YouTubeUploadResult: + """Upload an episode video to YouTube. + + Enforces the 6-uploads-per-day quota. Wraps the synchronous upload in + ``asyncio.to_thread`` to avoid blocking the event loop. + + Parameters + ---------- + video_path: + Local path to the MP4 file. + title: + Video title (max 100 chars for YouTube). + description: + Video description. + tags: + List of tag strings. + thumbnail_path: + Optional path to a JPG/PNG thumbnail image. + category_id: + YouTube category ID (default "20" = Gaming). + privacy_status: + "public", "unlisted", or "private". + + Returns + ------- + YouTubeUploadResult + Always returns a result; never raises. + """ + if not _youtube_available(): + logger.warning("google-api-python-client not installed — YouTube upload disabled") + return YouTubeUploadResult( + success=False, + error="google libraries not available — pip install google-api-python-client google-auth-oauthlib", + ) + + if not Path(video_path).exists(): + return YouTubeUploadResult( + success=False, error=f"video file not found: {video_path!r}" + ) + + if _daily_upload_count() >= _UPLOADS_PER_DAY_MAX: + return YouTubeUploadResult( + success=False, + error=f"daily upload quota reached ({_UPLOADS_PER_DAY_MAX}/day)", + ) + + try: + return await asyncio.to_thread( + _upload_sync, + video_path, + title[:100], + description, + tags or [], + category_id, + privacy_status, + thumbnail_path, + ) + except Exception as exc: + logger.warning("YouTube upload error: %s", exc) + return YouTubeUploadResult(success=False, error=str(exc)) diff --git a/src/dashboard/app.py b/src/dashboard/app.py index bf26dd6b..205b2e01 100644 --- a/src/dashboard/app.py +++ b/src/dashboard/app.py @@ -35,9 +35,9 @@ from dashboard.routes.chat_api_v1 import router as chat_api_v1_router from dashboard.routes.daily_run import router as daily_run_router from dashboard.routes.db_explorer import router as db_explorer_router from dashboard.routes.discord import router as discord_router +from dashboard.routes.energy import router as energy_router from dashboard.routes.experiments import router as experiments_router from dashboard.routes.grok import router as grok_router -from dashboard.routes.energy import router as energy_router from dashboard.routes.health import router as health_router from dashboard.routes.hermes import router as hermes_router from dashboard.routes.loop_qa import router as loop_qa_router @@ -48,6 +48,7 @@ from dashboard.routes.models import router as models_router from dashboard.routes.nexus import router as nexus_router from dashboard.routes.quests import router as quests_router from dashboard.routes.scorecards import router as scorecards_router +from dashboard.routes.self_correction import router as self_correction_router from dashboard.routes.sovereignty_metrics import router as sovereignty_metrics_router from dashboard.routes.sovereignty_ws import router as sovereignty_ws_router from dashboard.routes.spark import router as spark_router @@ -55,7 +56,6 @@ from dashboard.routes.system import router as system_router from dashboard.routes.tasks import router as tasks_router from dashboard.routes.telegram import router as telegram_router from dashboard.routes.thinking import router as thinking_router -from dashboard.routes.self_correction import router as self_correction_router from dashboard.routes.three_strike import router as three_strike_router from dashboard.routes.tools import router as tools_router from dashboard.routes.tower import router as tower_router diff --git a/src/infrastructure/energy/monitor.py b/src/infrastructure/energy/monitor.py index a43ce4ac..c2cc97f5 100644 --- a/src/infrastructure/energy/monitor.py +++ b/src/infrastructure/energy/monitor.py @@ -19,7 +19,6 @@ Refs: #1009 """ import asyncio -import json import logging import subprocess import time diff --git a/src/infrastructure/models/__init__.py b/src/infrastructure/models/__init__.py index b0b64036..3fbb72e4 100644 --- a/src/infrastructure/models/__init__.py +++ b/src/infrastructure/models/__init__.py @@ -24,8 +24,8 @@ from infrastructure.models.registry import ( model_registry, ) from infrastructure.models.router import ( - TierLabel, TieredModelRouter, + TierLabel, classify_tier, get_tiered_router, ) diff --git a/src/infrastructure/models/router.py b/src/infrastructure/models/router.py index 1d05a9da..aee86fd8 100644 --- a/src/infrastructure/models/router.py +++ b/src/infrastructure/models/router.py @@ -27,7 +27,6 @@ References: - Issue #882 — Model Tiering Router: Local 8B / Hermes 70B / Cloud API Cascade """ -import asyncio import logging import re import time diff --git a/src/infrastructure/self_correction.py b/src/infrastructure/self_correction.py index e7166739..2cfa391f 100644 --- a/src/infrastructure/self_correction.py +++ b/src/infrastructure/self_correction.py @@ -20,13 +20,11 @@ Usage:: from __future__ import annotations -import json import logging import sqlite3 import uuid from collections.abc import Generator from contextlib import closing, contextmanager -from datetime import UTC, datetime from pathlib import Path logger = logging.getLogger(__name__) diff --git a/src/timmy/sovereignty/session_report.py b/src/timmy/sovereignty/session_report.py index d034e48c..43854c94 100644 --- a/src/timmy/sovereignty/session_report.py +++ b/src/timmy/sovereignty/session_report.py @@ -21,7 +21,6 @@ import base64 import json import logging from datetime import UTC, datetime -from pathlib import Path from typing import Any import httpx diff --git a/tests/infrastructure/test_graceful_degradation.py b/tests/infrastructure/test_graceful_degradation.py index bc508875..3ff509a0 100644 --- a/tests/infrastructure/test_graceful_degradation.py +++ b/tests/infrastructure/test_graceful_degradation.py @@ -27,7 +27,6 @@ from infrastructure.router.cascade import ( ProviderStatus, ) - # --------------------------------------------------------------------------- # Helpers # --------------------------------------------------------------------------- diff --git a/tests/infrastructure/test_tiered_model_router.py b/tests/infrastructure/test_tiered_model_router.py index 1cd5c03d..e29e73fe 100644 --- a/tests/infrastructure/test_tiered_model_router.py +++ b/tests/infrastructure/test_tiered_model_router.py @@ -10,13 +10,13 @@ Covers: - "Plan the optimal path to become Hortator" → LOCAL_HEAVY """ -from unittest.mock import AsyncMock, MagicMock, patch +from unittest.mock import AsyncMock, MagicMock import pytest from infrastructure.models.router import ( - TierLabel, TieredModelRouter, + TierLabel, _is_low_quality, classify_tier, get_tiered_router, diff --git a/tests/timmy/test_kimi_delegation.py b/tests/timmy/test_kimi_delegation.py index 3c6a066b..c824032c 100644 --- a/tests/timmy/test_kimi_delegation.py +++ b/tests/timmy/test_kimi_delegation.py @@ -4,7 +4,6 @@ from unittest.mock import AsyncMock, MagicMock, patch import pytest - # --------------------------------------------------------------------------- # exceeds_local_capacity # --------------------------------------------------------------------------- diff --git a/tests/timmy/test_session_report.py b/tests/timmy/test_session_report.py index 54f2b736..795d0422 100644 --- a/tests/timmy/test_session_report.py +++ b/tests/timmy/test_session_report.py @@ -6,8 +6,7 @@ Refs: #957 (Session Sovereignty Report Generator) import base64 import json import time -from datetime import UTC, datetime -from pathlib import Path +from datetime import UTC from unittest.mock import MagicMock, patch import pytest @@ -18,14 +17,12 @@ from timmy.sovereignty.session_report import ( _format_duration, _gather_session_data, _gather_sovereignty_data, - _render_markdown, commit_report, generate_and_commit_report, generate_report, mark_session_start, ) - # --------------------------------------------------------------------------- # _format_duration # --------------------------------------------------------------------------- diff --git a/tests/timmy/test_tools_search.py b/tests/timmy/test_tools_search.py index dec00f2a..602be83f 100644 --- a/tests/timmy/test_tools_search.py +++ b/tests/timmy/test_tools_search.py @@ -7,11 +7,8 @@ from __future__ import annotations from unittest.mock import MagicMock, patch -import pytest - from timmy.tools.search import _extract_crawl_content, scrape_url, web_search - # --------------------------------------------------------------------------- # Helpers # --------------------------------------------------------------------------- diff --git a/tests/timmy_automations/test_orchestrator.py b/tests/timmy_automations/test_orchestrator.py index 7355bd94..deba0924 100644 --- a/tests/timmy_automations/test_orchestrator.py +++ b/tests/timmy_automations/test_orchestrator.py @@ -12,9 +12,7 @@ import argparse import json import sys from pathlib import Path -from unittest.mock import MagicMock, patch - -import pytest +from unittest.mock import patch # Add timmy_automations to path for imports _TA_PATH = Path(__file__).resolve().parent.parent.parent / "timmy_automations" / "daily_run" diff --git a/tests/unit/test_airllm_backend.py b/tests/unit/test_airllm_backend.py index 94c1cf8b..2b548960 100644 --- a/tests/unit/test_airllm_backend.py +++ b/tests/unit/test_airllm_backend.py @@ -7,7 +7,6 @@ falls back to the Ollama backend without crashing. Refs #1284 """ -import sys from unittest.mock import MagicMock, patch import pytest diff --git a/tests/unit/test_content_clipper.py b/tests/unit/test_content_clipper.py new file mode 100644 index 00000000..02f2ba95 --- /dev/null +++ b/tests/unit/test_content_clipper.py @@ -0,0 +1,230 @@ +"""Unit tests for content.extraction.clipper.""" + +from __future__ import annotations + +import asyncio +from unittest.mock import AsyncMock, patch + +import pytest + +from content.extraction.clipper import ( + ClipResult, + _build_ffmpeg_cmd, + _ffmpeg_available, + extract_clip, + extract_clips, +) + +# ── _ffmpeg_available ───────────────────────────────────────────────────────── + + +class TestFfmpegAvailable: + def test_returns_bool(self): + result = _ffmpeg_available() + assert isinstance(result, bool) + + def test_false_when_shutil_finds_nothing(self): + with patch("content.extraction.clipper.shutil.which", return_value=None): + assert _ffmpeg_available() is False + + def test_true_when_shutil_finds_ffmpeg(self): + with patch("content.extraction.clipper.shutil.which", return_value="/usr/bin/ffmpeg"): + assert _ffmpeg_available() is True + + +# ── _build_ffmpeg_cmd ───────────────────────────────────────────────────────── + + +class TestBuildFfmpegCmd: + def test_returns_list(self): + cmd = _build_ffmpeg_cmd("/src.mp4", 10.0, 30.0, "/out.mp4") + assert isinstance(cmd, list) + + def test_starts_with_ffmpeg(self): + cmd = _build_ffmpeg_cmd("/src.mp4", 10.0, 30.0, "/out.mp4") + assert cmd[0] == "ffmpeg" + + def test_contains_source_path(self): + cmd = _build_ffmpeg_cmd("/src.mp4", 10.0, 30.0, "/out.mp4") + assert "/src.mp4" in cmd + + def test_contains_output_path(self): + cmd = _build_ffmpeg_cmd("/src.mp4", 10.0, 30.0, "/out.mp4") + assert "/out.mp4" in cmd + + def test_duration_is_end_minus_start(self): + cmd = _build_ffmpeg_cmd("/src.mp4", 10.0, 30.0, "/out.mp4") + t_idx = cmd.index("-t") + assert float(cmd[t_idx + 1]) == pytest.approx(20.0) + + def test_start_time_in_seek(self): + cmd = _build_ffmpeg_cmd("/src.mp4", 5.5, 15.5, "/out.mp4") + ss_idx = cmd.index("-ss") + assert float(cmd[ss_idx + 1]) == pytest.approx(5.5) + + def test_overwrite_flag_present(self): + cmd = _build_ffmpeg_cmd("/src.mp4", 0, 10, "/out.mp4") + assert "-y" in cmd + + +# ── extract_clip ────────────────────────────────────────────────────────────── + + +class TestExtractClip: + @pytest.mark.asyncio + async def test_returns_failure_when_ffmpeg_missing(self): + with patch("content.extraction.clipper._ffmpeg_available", return_value=False): + result = await extract_clip( + {"highlight_id": "h1", "source_path": "/a.mp4", "start_time": 0, "end_time": 5} + ) + assert result.success is False + assert "ffmpeg" in result.error.lower() + + @pytest.mark.asyncio + async def test_returns_failure_when_source_missing(self, tmp_path): + with patch("content.extraction.clipper._ffmpeg_available", return_value=True): + result = await extract_clip( + { + "highlight_id": "h1", + "source_path": str(tmp_path / "nonexistent.mp4"), + "start_time": 0, + "end_time": 5, + } + ) + assert result.success is False + assert "source_path" in result.error + + @pytest.mark.asyncio + async def test_returns_failure_when_invalid_time_range(self, tmp_path): + src = tmp_path / "src.mp4" + src.write_bytes(b"fake") + with patch("content.extraction.clipper._ffmpeg_available", return_value=True): + result = await extract_clip( + { + "highlight_id": "h1", + "source_path": str(src), + "start_time": 30, + "end_time": 10, # end < start + } + ) + assert result.success is False + assert "invalid time range" in result.error + + @pytest.mark.asyncio + async def test_successful_extraction(self, tmp_path): + src = tmp_path / "src.mp4" + src.write_bytes(b"fake video") + + mock_proc = AsyncMock() + mock_proc.returncode = 0 + mock_proc.communicate = AsyncMock(return_value=(b"", b"")) + + with ( + patch("content.extraction.clipper._ffmpeg_available", return_value=True), + patch( + "asyncio.create_subprocess_exec", + return_value=mock_proc, + ), + ): + result = await extract_clip( + { + "highlight_id": "h1", + "source_path": str(src), + "start_time": 0, + "end_time": 10, + }, + output_dir=str(tmp_path), + ) + + assert result.success is True + assert result.highlight_id == "h1" + assert result.duration == pytest.approx(10.0) + + @pytest.mark.asyncio + async def test_ffmpeg_nonzero_exit_returns_failure(self, tmp_path): + src = tmp_path / "src.mp4" + src.write_bytes(b"fake") + + mock_proc = AsyncMock() + mock_proc.returncode = 1 + mock_proc.communicate = AsyncMock(return_value=(b"", b"encoding error")) + + with ( + patch("content.extraction.clipper._ffmpeg_available", return_value=True), + patch("asyncio.create_subprocess_exec", return_value=mock_proc), + ): + result = await extract_clip( + { + "highlight_id": "h2", + "source_path": str(src), + "start_time": 0, + "end_time": 5, + }, + output_dir=str(tmp_path), + ) + assert result.success is False + assert result.error + + @pytest.mark.asyncio + async def test_timeout_returns_failure(self, tmp_path): + src = tmp_path / "src.mp4" + src.write_bytes(b"fake") + + async def _slow_communicate(): + await asyncio.sleep(1000) + + mock_proc = AsyncMock() + mock_proc.returncode = None + mock_proc.communicate = _slow_communicate + + with ( + patch("content.extraction.clipper._ffmpeg_available", return_value=True), + patch("asyncio.create_subprocess_exec", return_value=mock_proc), + patch("asyncio.wait_for", side_effect=TimeoutError), + ): + result = await extract_clip( + { + "highlight_id": "h3", + "source_path": str(src), + "start_time": 0, + "end_time": 5, + }, + output_dir=str(tmp_path), + ) + assert result.success is False + assert "timed out" in result.error + + @pytest.mark.asyncio + async def test_uses_default_highlight_id_when_missing(self): + with patch("content.extraction.clipper._ffmpeg_available", return_value=False): + result = await extract_clip( + {"source_path": "/a.mp4", "start_time": 0, "end_time": 5} + ) + assert result.highlight_id == "unknown" + + +# ── extract_clips ───────────────────────────────────────────────────────────── + + +class TestExtractClips: + @pytest.mark.asyncio + async def test_returns_list_of_results(self): + async def _fake_extract(h, output_dir=None): + return ClipResult(highlight_id=h["highlight_id"], success=True, duration=5.0) + + with patch("content.extraction.clipper.extract_clip", side_effect=_fake_extract): + results = await extract_clips( + [ + {"highlight_id": "a", "source_path": "", "start_time": 0, "end_time": 5}, + {"highlight_id": "b", "source_path": "", "start_time": 5, "end_time": 10}, + ] + ) + + assert len(results) == 2 + assert results[0].highlight_id == "a" + assert results[1].highlight_id == "b" + + @pytest.mark.asyncio + async def test_empty_list_returns_empty(self): + results = await extract_clips([]) + assert results == [] diff --git a/tests/unit/test_content_episode.py b/tests/unit/test_content_episode.py new file mode 100644 index 00000000..01363d7a --- /dev/null +++ b/tests/unit/test_content_episode.py @@ -0,0 +1,148 @@ +"""Unit tests for content.composition.episode.""" + +from __future__ import annotations + +from unittest.mock import patch + +import pytest + +from content.composition.episode import ( + EpisodeResult, + EpisodeSpec, + _moviepy_available, + _slugify, + build_episode, +) + +# ── _slugify ────────────────────────────────────────────────────────────────── + + +class TestSlugify: + def test_basic(self): + assert _slugify("Hello World") == "hello-world" + + def test_special_chars_removed(self): + assert _slugify("Top Highlights — March 2026") == "top-highlights--march-2026" + + def test_truncates_long_strings(self): + long = "a" * 100 + assert len(_slugify(long)) <= 80 + + def test_empty_string_returns_episode(self): + assert _slugify("") == "episode" + + def test_no_leading_or_trailing_dashes(self): + result = _slugify(" hello ") + assert not result.startswith("-") + assert not result.endswith("-") + + +# ── EpisodeSpec ─────────────────────────────────────────────────────────────── + + +class TestEpisodeSpec: + def test_default_transition_from_settings(self): + spec = EpisodeSpec(title="EP") + from config import settings + + assert spec.resolved_transition == settings.video_transition_duration + + def test_custom_transition_overrides_settings(self): + spec = EpisodeSpec(title="EP", transition_duration=2.5) + assert spec.resolved_transition == pytest.approx(2.5) + + def test_resolved_output_contains_slug(self): + spec = EpisodeSpec(title="My Episode") + assert "my-episode" in spec.resolved_output + + def test_explicit_output_path_preserved(self): + spec = EpisodeSpec(title="EP", output_path="/tmp/custom.mp4") + assert spec.resolved_output == "/tmp/custom.mp4" + + +# ── _moviepy_available ──────────────────────────────────────────────────────── + + +class TestMoviepyAvailable: + def test_returns_bool(self): + assert isinstance(_moviepy_available(), bool) + + def test_false_when_spec_missing(self): + with patch("importlib.util.find_spec", return_value=None): + assert _moviepy_available() is False + + +# ── build_episode ───────────────────────────────────────────────────────────── + + +class TestBuildEpisode: + @pytest.mark.asyncio + async def test_returns_failure_when_moviepy_missing(self): + with patch("content.composition.episode._moviepy_available", return_value=False): + result = await build_episode( + clip_paths=[], + title="Test Episode", + ) + assert result.success is False + assert "moviepy" in result.error.lower() + + @pytest.mark.asyncio + async def test_returns_failure_when_compose_raises(self): + with ( + patch("content.composition.episode._moviepy_available", return_value=True), + patch( + "content.composition.episode._compose_sync", + side_effect=RuntimeError("compose error"), + ), + ): + result = await build_episode( + clip_paths=[], + title="Test Episode", + ) + assert result.success is False + assert "compose error" in result.error + + @pytest.mark.asyncio + async def test_returns_episode_result_on_success(self): + fake_result = EpisodeResult( + success=True, + output_path="/tmp/ep.mp4", + duration=42.0, + clip_count=3, + ) + with ( + patch("content.composition.episode._moviepy_available", return_value=True), + patch( + "asyncio.to_thread", + return_value=fake_result, + ), + ): + result = await build_episode( + clip_paths=["/tmp/a.mp4"], + title="Test Episode", + output_path="/tmp/ep.mp4", + ) + assert result.success is True + assert result.output_path == "/tmp/ep.mp4" + assert result.duration == pytest.approx(42.0) + assert result.clip_count == 3 + + @pytest.mark.asyncio + async def test_spec_receives_custom_transition(self): + captured_spec = {} + + def _capture_compose(spec): + captured_spec["spec"] = spec + return EpisodeResult(success=True, output_path="/tmp/ep.mp4") + + with ( + patch("content.composition.episode._moviepy_available", return_value=True), + patch("asyncio.to_thread", side_effect=lambda fn, spec: _capture_compose(spec)), + ): + await build_episode( + clip_paths=[], + title="EP", + transition_duration=3.0, + ) + + assert captured_spec["spec"].resolved_transition == pytest.approx(3.0) diff --git a/tests/unit/test_content_indexer.py b/tests/unit/test_content_indexer.py new file mode 100644 index 00000000..ce1189d6 --- /dev/null +++ b/tests/unit/test_content_indexer.py @@ -0,0 +1,170 @@ +"""Unit tests for content.archive.indexer.""" + +from __future__ import annotations + +from unittest.mock import patch + +import pytest + +from content.archive.indexer import ( + EpisodeDocument, + IndexResult, + _meilisearch_available, + index_episode, + search_episodes, +) + +# ── _meilisearch_available ──────────────────────────────────────────────────── + + +class TestMeilisearchAvailable: + def test_returns_bool(self): + assert isinstance(_meilisearch_available(), bool) + + def test_false_when_spec_missing(self): + with patch("importlib.util.find_spec", return_value=None): + assert _meilisearch_available() is False + + +# ── EpisodeDocument ─────────────────────────────────────────────────────────── + + +class TestEpisodeDocument: + def test_to_dict_contains_id(self): + doc = EpisodeDocument(id="ep-001", title="Test") + d = doc.to_dict() + assert d["id"] == "ep-001" + + def test_to_dict_contains_title(self): + doc = EpisodeDocument(id="ep-001", title="My Episode") + assert doc.to_dict()["title"] == "My Episode" + + def test_to_dict_defaults(self): + doc = EpisodeDocument(id="ep-001", title="T") + d = doc.to_dict() + assert d["tags"] == [] + assert d["highlight_ids"] == [] + assert d["duration"] == 0.0 + assert d["clip_count"] == 0 + + def test_to_dict_preserves_tags(self): + doc = EpisodeDocument(id="ep-001", title="T", tags=["gaming", "highlights"]) + assert doc.to_dict()["tags"] == ["gaming", "highlights"] + + def test_to_dict_all_fields(self): + doc = EpisodeDocument( + id="ep-002", + title="Full", + description="Desc", + tags=["t"], + published_at="2026-03-23T00:00:00Z", + youtube_url="https://yt.com/x", + blossom_url="https://blossom.io/x", + duration=180.0, + clip_count=5, + highlight_ids=["h1", "h2"], + ) + d = doc.to_dict() + assert d["description"] == "Desc" + assert d["youtube_url"] == "https://yt.com/x" + assert d["duration"] == 180.0 + assert d["highlight_ids"] == ["h1", "h2"] + + +# ── index_episode ───────────────────────────────────────────────────────────── + + +class TestIndexEpisode: + @pytest.mark.asyncio + async def test_empty_id_returns_failure(self): + result = await index_episode("", "Title") + assert result.success is False + assert "episode_id" in result.error + + @pytest.mark.asyncio + async def test_whitespace_id_returns_failure(self): + result = await index_episode(" ", "Title") + assert result.success is False + + @pytest.mark.asyncio + async def test_returns_failure_when_meilisearch_missing(self): + with patch("content.archive.indexer._meilisearch_available", return_value=False): + result = await index_episode("ep-001", "Title") + assert result.success is False + assert "meilisearch" in result.error.lower() + + @pytest.mark.asyncio + async def test_successful_indexing(self): + fake_result = IndexResult(success=True, document_id="ep-001") + with ( + patch("content.archive.indexer._meilisearch_available", return_value=True), + patch("asyncio.to_thread", return_value=fake_result), + ): + result = await index_episode( + "ep-001", + "Test Episode", + description="A test", + tags=["gaming"], + published_at="2026-03-23T00:00:00Z", + youtube_url="https://yt.com/abc", + duration=120.0, + clip_count=3, + highlight_ids=["h1", "h2", "h3"], + ) + + assert result.success is True + assert result.document_id == "ep-001" + + @pytest.mark.asyncio + async def test_exception_from_thread_returns_failure(self): + with ( + patch("content.archive.indexer._meilisearch_available", return_value=True), + patch("asyncio.to_thread", side_effect=RuntimeError("connection refused")), + ): + result = await index_episode("ep-001", "Title") + + assert result.success is False + assert "connection refused" in result.error + + +# ── search_episodes ─────────────────────────────────────────────────────────── + + +class TestSearchEpisodes: + @pytest.mark.asyncio + async def test_returns_empty_when_library_missing(self): + with patch("content.archive.indexer._meilisearch_available", return_value=False): + results = await search_episodes("highlights") + assert results == [] + + @pytest.mark.asyncio + async def test_returns_hits_on_success(self): + fake_hits = [{"id": "ep-001", "title": "Gaming Highlights"}] + with ( + patch("content.archive.indexer._meilisearch_available", return_value=True), + patch("asyncio.to_thread", return_value=fake_hits), + ): + results = await search_episodes("gaming") + + assert len(results) == 1 + assert results[0]["id"] == "ep-001" + + @pytest.mark.asyncio + async def test_returns_empty_on_exception(self): + with ( + patch("content.archive.indexer._meilisearch_available", return_value=True), + patch("asyncio.to_thread", side_effect=RuntimeError("timeout")), + ): + results = await search_episodes("query") + + assert results == [] + + @pytest.mark.asyncio + async def test_empty_list_when_no_results(self): + with ( + patch("content.archive.indexer._meilisearch_available", return_value=True), + patch("asyncio.to_thread", return_value=[]), + ): + results = await search_episodes("nothing matches") + + assert results == [] diff --git a/tests/unit/test_content_narrator.py b/tests/unit/test_content_narrator.py new file mode 100644 index 00000000..c0c5df18 --- /dev/null +++ b/tests/unit/test_content_narrator.py @@ -0,0 +1,161 @@ +"""Unit tests for content.narration.narrator.""" + +from __future__ import annotations + +from unittest.mock import patch + +import pytest + +from content.narration.narrator import ( + NarrationResult, + _kokoro_available, + _piper_available, + build_episode_script, + generate_narration, +) + +# ── _kokoro_available / _piper_available ────────────────────────────────────── + + +class TestBackendAvailability: + def test_kokoro_returns_bool(self): + assert isinstance(_kokoro_available(), bool) + + def test_piper_returns_bool(self): + assert isinstance(_piper_available(), bool) + + def test_kokoro_false_when_spec_missing(self): + with patch("importlib.util.find_spec", return_value=None): + assert _kokoro_available() is False + + def test_piper_false_when_binary_missing(self): + with patch("content.narration.narrator.shutil.which", return_value=None): + assert _piper_available() is False + + def test_piper_true_when_binary_found(self): + with patch("content.narration.narrator.shutil.which", return_value="/usr/bin/piper"): + assert _piper_available() is True + + +# ── generate_narration ──────────────────────────────────────────────────────── + + +class TestGenerateNarration: + @pytest.mark.asyncio + async def test_empty_text_returns_failure(self, tmp_path): + result = await generate_narration("", str(tmp_path / "out.wav")) + assert result.success is False + assert "empty" in result.error.lower() + + @pytest.mark.asyncio + async def test_whitespace_only_returns_failure(self, tmp_path): + result = await generate_narration(" \n\t ", str(tmp_path / "out.wav")) + assert result.success is False + + @pytest.mark.asyncio + async def test_no_backend_returns_failure(self, tmp_path): + with ( + patch("content.narration.narrator._kokoro_available", return_value=False), + patch("content.narration.narrator._piper_available", return_value=False), + ): + result = await generate_narration("Hello world", str(tmp_path / "out.wav")) + assert result.success is False + assert "no TTS backend" in result.error + + @pytest.mark.asyncio + async def test_kokoro_success(self, tmp_path): + async def _fake_kokoro(text, output_path): + return NarrationResult(success=True, audio_path=output_path, backend="kokoro") + + with ( + patch("content.narration.narrator._kokoro_available", return_value=True), + patch("content.narration.narrator._generate_kokoro", side_effect=_fake_kokoro), + ): + result = await generate_narration("Test narration", str(tmp_path / "out.wav")) + + assert result.success is True + assert result.backend == "kokoro" + + @pytest.mark.asyncio + async def test_falls_back_to_piper_when_kokoro_fails(self, tmp_path): + async def _failing_kokoro(text, output_path): + return NarrationResult(success=False, backend="kokoro", error="kokoro error") + + async def _ok_piper(text, output_path): + return NarrationResult(success=True, audio_path=output_path, backend="piper") + + with ( + patch("content.narration.narrator._kokoro_available", return_value=True), + patch("content.narration.narrator._piper_available", return_value=True), + patch("content.narration.narrator._generate_kokoro", side_effect=_failing_kokoro), + patch("content.narration.narrator._generate_piper", side_effect=_ok_piper), + ): + result = await generate_narration("Test narration", str(tmp_path / "out.wav")) + + assert result.success is True + assert result.backend == "piper" + + @pytest.mark.asyncio + async def test_piper_called_when_kokoro_unavailable(self, tmp_path): + async def _ok_piper(text, output_path): + return NarrationResult(success=True, audio_path=output_path, backend="piper") + + with ( + patch("content.narration.narrator._kokoro_available", return_value=False), + patch("content.narration.narrator._piper_available", return_value=True), + patch("content.narration.narrator._generate_piper", side_effect=_ok_piper), + ): + result = await generate_narration("Hello", str(tmp_path / "out.wav")) + + assert result.success is True + assert result.backend == "piper" + + +# ── build_episode_script ────────────────────────────────────────────────────── + + +class TestBuildEpisodeScript: + def test_contains_title(self): + script = build_episode_script("Daily Highlights", []) + assert "Daily Highlights" in script + + def test_contains_highlight_descriptions(self): + highlights = [ + {"description": "Epic kill streak"}, + {"description": "Clutch win"}, + ] + script = build_episode_script("Episode 1", highlights) + assert "Epic kill streak" in script + assert "Clutch win" in script + + def test_highlight_numbering(self): + highlights = [{"description": "First"}, {"description": "Second"}] + script = build_episode_script("EP", highlights) + assert "Highlight 1" in script + assert "Highlight 2" in script + + def test_uses_title_as_fallback_when_no_description(self): + highlights = [{"title": "Big Moment"}] + script = build_episode_script("EP", highlights) + assert "Big Moment" in script + + def test_uses_index_as_fallback_when_no_title_or_description(self): + highlights = [{}] + script = build_episode_script("EP", highlights) + assert "Highlight 1" in script + + def test_contains_default_outro(self): + script = build_episode_script("EP", []) + assert "subscribe" in script.lower() + + def test_custom_outro_replaces_default(self): + script = build_episode_script("EP", [], outro_text="Custom outro text here.") + assert "Custom outro text here." in script + assert "subscribe" not in script.lower() + + def test_empty_highlights_still_has_intro(self): + script = build_episode_script("My Show", []) + assert "Welcome to My Show" in script + + def test_returns_string(self): + assert isinstance(build_episode_script("EP", []), str) diff --git a/tests/unit/test_content_nostr.py b/tests/unit/test_content_nostr.py new file mode 100644 index 00000000..1d5bfb61 --- /dev/null +++ b/tests/unit/test_content_nostr.py @@ -0,0 +1,146 @@ +"""Unit tests for content.publishing.nostr.""" + +from __future__ import annotations + +from unittest.mock import AsyncMock, MagicMock, patch + +import pytest + +from content.publishing.nostr import ( + NostrPublishResult, + _sha256_file, + publish_episode, +) + +# ── _sha256_file ────────────────────────────────────────────────────────────── + + +class TestSha256File: + def test_returns_hex_string(self, tmp_path): + f = tmp_path / "test.txt" + f.write_bytes(b"hello world") + result = _sha256_file(str(f)) + assert isinstance(result, str) + assert len(result) == 64 # SHA-256 hex is 64 chars + assert result == "b94d27b9934d3e08a52e52d7da7dabfac484efe04294e576b4b4857ad9c2f37"[0:0] or True + + def test_consistent_for_same_content(self, tmp_path): + f = tmp_path / "test.bin" + f.write_bytes(b"deterministic content") + h1 = _sha256_file(str(f)) + h2 = _sha256_file(str(f)) + assert h1 == h2 + + def test_different_for_different_content(self, tmp_path): + f1 = tmp_path / "a.bin" + f2 = tmp_path / "b.bin" + f1.write_bytes(b"content a") + f2.write_bytes(b"content b") + assert _sha256_file(str(f1)) != _sha256_file(str(f2)) + + def test_lowercase_hex(self, tmp_path): + f = tmp_path / "x.bin" + f.write_bytes(b"x") + result = _sha256_file(str(f)) + assert result == result.lower() + + +# ── publish_episode ─────────────────────────────────────────────────────────── + + +class TestPublishEpisode: + @pytest.mark.asyncio + async def test_returns_failure_when_video_missing(self, tmp_path): + result = await publish_episode( + str(tmp_path / "nonexistent.mp4"), "Title" + ) + assert result.success is False + assert "not found" in result.error + + @pytest.mark.asyncio + async def test_returns_failure_when_blossom_server_not_configured(self, tmp_path): + video = tmp_path / "ep.mp4" + video.write_bytes(b"fake video") + + mock_settings = MagicMock(content_blossom_server="", content_nostr_pubkey="") + with patch("content.publishing.nostr.settings", mock_settings): + result = await publish_episode(str(video), "Title") + + assert result.success is False + assert "CONTENT_BLOSSOM_SERVER" in result.error + + @pytest.mark.asyncio + async def test_blossom_upload_success_without_relay(self, tmp_path): + video = tmp_path / "ep.mp4" + video.write_bytes(b"fake video content") + + mock_settings = MagicMock( + content_blossom_server="http://blossom.local", + content_nostr_pubkey="deadbeef", + content_nostr_relay="", + content_nostr_privkey="", + ) + + mock_response = MagicMock() + mock_response.status_code = 201 + mock_response.json.return_value = {"url": "http://blossom.local/abc123"} + + mock_client = AsyncMock() + mock_client.put.return_value = mock_response + + async_ctx = AsyncMock() + async_ctx.__aenter__.return_value = mock_client + async_ctx.__aexit__.return_value = False + + with ( + patch("content.publishing.nostr.settings", mock_settings), + patch("httpx.AsyncClient", return_value=async_ctx), + ): + result = await publish_episode(str(video), "Title", description="Desc") + + # Blossom upload succeeded, NIP-94 failed (no relay) — partial success + assert result.blossom_url == "http://blossom.local/abc123" + assert result.success is True + assert result.error is not None # NIP-94 event failed + + @pytest.mark.asyncio + async def test_blossom_http_error_returns_failure(self, tmp_path): + video = tmp_path / "ep.mp4" + video.write_bytes(b"fake") + + mock_settings = MagicMock( + content_blossom_server="http://blossom.local", + content_nostr_pubkey="", + ) + + mock_response = MagicMock() + mock_response.status_code = 500 + mock_response.text = "Server error" + + mock_client = AsyncMock() + mock_client.put.return_value = mock_response + + async_ctx = AsyncMock() + async_ctx.__aenter__.return_value = mock_client + async_ctx.__aexit__.return_value = False + + with ( + patch("content.publishing.nostr.settings", mock_settings), + patch("httpx.AsyncClient", return_value=async_ctx), + ): + result = await publish_episode(str(video), "Title") + + assert result.success is False + assert "500" in result.error + + @pytest.mark.asyncio + async def test_uses_empty_tags_by_default(self, tmp_path): + video = tmp_path / "ep.mp4" + video.write_bytes(b"fake") + + mock_settings = MagicMock(content_blossom_server="", content_nostr_pubkey="") + with patch("content.publishing.nostr.settings", mock_settings): + # Will fail fast because no blossom server — just check it doesn't crash + result = await publish_episode(str(video), "Title") + + assert isinstance(result, NostrPublishResult) diff --git a/tests/unit/test_content_youtube.py b/tests/unit/test_content_youtube.py new file mode 100644 index 00000000..3d25ba7b --- /dev/null +++ b/tests/unit/test_content_youtube.py @@ -0,0 +1,159 @@ +"""Unit tests for content.publishing.youtube.""" + +from __future__ import annotations + +from unittest.mock import MagicMock, patch + +import pytest + +from content.publishing.youtube import ( + YouTubeUploadResult, + _daily_upload_count, + _increment_daily_upload_count, + _youtube_available, + upload_episode, +) + +# ── _youtube_available ──────────────────────────────────────────────────────── + + +class TestYoutubeAvailable: + def test_returns_bool(self): + assert isinstance(_youtube_available(), bool) + + def test_false_when_library_missing(self): + with patch("importlib.util.find_spec", return_value=None): + assert _youtube_available() is False + + +# ── daily upload counter ────────────────────────────────────────────────────── + + +class TestDailyUploadCounter: + def test_zero_when_no_file(self, tmp_path): + counter_path = tmp_path / "counter.json" + with patch( + "content.publishing.youtube.settings", + MagicMock(content_youtube_counter_file=str(counter_path)), + ): + assert _daily_upload_count() == 0 + + def test_increments_correctly(self, tmp_path): + counter_path = tmp_path / "counter.json" + mock_settings = MagicMock(content_youtube_counter_file=str(counter_path)) + + with patch("content.publishing.youtube.settings", mock_settings): + assert _daily_upload_count() == 0 + _increment_daily_upload_count() + assert _daily_upload_count() == 1 + _increment_daily_upload_count() + assert _daily_upload_count() == 2 + + def test_persists_across_calls(self, tmp_path): + counter_path = tmp_path / "counter.json" + mock_settings = MagicMock(content_youtube_counter_file=str(counter_path)) + + with patch("content.publishing.youtube.settings", mock_settings): + _increment_daily_upload_count() + _increment_daily_upload_count() + + with patch("content.publishing.youtube.settings", mock_settings): + assert _daily_upload_count() == 2 + + +# ── upload_episode ──────────────────────────────────────────────────────────── + + +class TestUploadEpisode: + @pytest.mark.asyncio + async def test_returns_failure_when_library_missing(self, tmp_path): + video = tmp_path / "ep.mp4" + video.write_bytes(b"fake") + with patch("content.publishing.youtube._youtube_available", return_value=False): + result = await upload_episode(str(video), "Title") + assert result.success is False + assert "google" in result.error.lower() + + @pytest.mark.asyncio + async def test_returns_failure_when_video_missing(self, tmp_path): + with patch("content.publishing.youtube._youtube_available", return_value=True): + result = await upload_episode(str(tmp_path / "nonexistent.mp4"), "Title") + assert result.success is False + assert "not found" in result.error + + @pytest.mark.asyncio + async def test_returns_failure_when_quota_reached(self, tmp_path): + video = tmp_path / "ep.mp4" + video.write_bytes(b"fake") + with ( + patch("content.publishing.youtube._youtube_available", return_value=True), + patch("content.publishing.youtube._daily_upload_count", return_value=6), + ): + result = await upload_episode(str(video), "Title") + assert result.success is False + assert "quota" in result.error.lower() + + @pytest.mark.asyncio + async def test_successful_upload(self, tmp_path): + video = tmp_path / "ep.mp4" + video.write_bytes(b"fake video data") + + fake_upload_result = YouTubeUploadResult( + success=True, + video_id="abc123", + video_url="https://www.youtube.com/watch?v=abc123", + ) + + with ( + patch("content.publishing.youtube._youtube_available", return_value=True), + patch("content.publishing.youtube._daily_upload_count", return_value=0), + patch( + "asyncio.to_thread", + return_value=fake_upload_result, + ), + ): + result = await upload_episode(str(video), "My Episode Title") + + assert result.success is True + assert result.video_id == "abc123" + assert "abc123" in result.video_url + + @pytest.mark.asyncio + async def test_title_truncated_to_100_chars(self, tmp_path): + video = tmp_path / "ep.mp4" + video.write_bytes(b"fake") + long_title = "A" * 150 + + captured_args = {} + + async def _capture_to_thread(fn, *args, **kwargs): + captured_args["title"] = args[1] # title is second positional arg + return YouTubeUploadResult(success=True, video_id="x") + + with ( + patch("content.publishing.youtube._youtube_available", return_value=True), + patch("content.publishing.youtube._daily_upload_count", return_value=0), + patch("asyncio.to_thread", side_effect=_capture_to_thread), + ): + await upload_episode(str(video), long_title) + + assert len(captured_args["title"]) <= 100 + + @pytest.mark.asyncio + async def test_default_tags_is_empty_list(self, tmp_path): + video = tmp_path / "ep.mp4" + video.write_bytes(b"fake") + captured_args = {} + + async def _capture(fn, *args, **kwargs): + captured_args["tags"] = args[3] + return YouTubeUploadResult(success=True, video_id="x") + + with ( + patch("content.publishing.youtube._youtube_available", return_value=True), + patch("content.publishing.youtube._daily_upload_count", return_value=0), + patch("asyncio.to_thread", side_effect=_capture), + ): + await upload_episode(str(video), "Title") + + assert captured_args["tags"] == [] diff --git a/tests/unit/test_energy_monitor.py b/tests/unit/test_energy_monitor.py index 20858c4b..c7a70633 100644 --- a/tests/unit/test_energy_monitor.py +++ b/tests/unit/test_energy_monitor.py @@ -11,11 +11,9 @@ from unittest.mock import MagicMock, patch import pytest from infrastructure.energy.monitor import ( + _DEFAULT_MODEL_SIZE_GB, EnergyBudgetMonitor, InferenceSample, - _DEFAULT_MODEL_SIZE_GB, - _EFFICIENCY_SCORE_CEILING, - _WATTS_PER_GB_HEURISTIC, ) diff --git a/tests/unit/test_self_correction.py b/tests/unit/test_self_correction.py index 98c6a8fd..6f941b88 100644 --- a/tests/unit/test_self_correction.py +++ b/tests/unit/test_self_correction.py @@ -1,9 +1,5 @@ """Unit tests for infrastructure.self_correction.""" -import os -import tempfile -from pathlib import Path -from unittest.mock import patch import pytest