feat: add full creative studio + DevOps tools (Pixel, Lyra, Reel personas)

Adds 3 new personas (Pixel, Lyra, Reel) and 5 new tool modules:

- Git/DevOps tools (GitPython): clone, status, diff, log, blame, branch,
  add, commit, push, pull, stash — wired to Forge and Helm personas
- Image generation (FLUX via diffusers): text-to-image, storyboards,
  variations — Pixel persona
- Music generation (ACE-Step 1.5): full songs with vocals+instrumentals,
  instrumental tracks, vocal-only tracks — Lyra persona
- Video generation (Wan 2.1 via diffusers): text-to-video, image-to-video
  clips — Reel persona
- Creative Director pipeline: multi-step orchestration that chains
  storyboard → music → video → assembly into 3+ minute final videos
- Video assembler (MoviePy + FFmpeg): stitch clips, overlay audio,
  title cards, subtitles, final export

Also includes:
- Spark Intelligence tool-level + creative pipeline event capture
- Creative Studio dashboard page (/creative/ui) with 4 tabs
- Config settings for all new models and output directories
- pyproject.toml creative optional extra for GPU dependencies
- 107 new tests covering all modules (624 total, all passing)

https://claude.ai/code/session_01KJm6jQkNi3aA3yoQJn636c
This commit is contained in:
Claude
2026-02-24 16:31:47 +00:00
parent 1ab26d30ad
commit 1103da339c
29 changed files with 3573 additions and 13 deletions

1
src/tools/__init__.py Normal file
View File

@@ -0,0 +1 @@
"""Creative and DevOps tool modules for Timmy's swarm agents."""

281
src/tools/git_tools.py Normal file
View File

@@ -0,0 +1,281 @@
"""Git operations tools for Forge, Helm, and Timmy personas.
Provides a full set of git commands that agents can execute against
local or remote repositories. Uses GitPython under the hood.
All functions return plain dicts so they're easily serialisable for
tool-call results, Spark event capture, and WebSocket broadcast.
"""
from __future__ import annotations
import logging
from pathlib import Path
from typing import Optional
logger = logging.getLogger(__name__)
_GIT_AVAILABLE = True
try:
from git import Repo, InvalidGitRepositoryError, GitCommandNotFound
except ImportError:
_GIT_AVAILABLE = False
def _require_git() -> None:
    """Raise ImportError when GitPython was not importable at module load."""
    if _GIT_AVAILABLE:
        return
    raise ImportError(
        "GitPython is not installed. Run: pip install GitPython"
    )
def _open_repo(repo_path: str | Path) -> "Repo":
    """Return a ``Repo`` handle for the existing repository at *repo_path*.

    Raises ImportError (via ``_require_git``) when GitPython is missing.
    """
    _require_git()
    handle = Repo(str(repo_path))
    return handle
# ── Repository management ────────────────────────────────────────────────────
def git_clone(url: str, dest: str | Path) -> dict:
    """Clone the remote repository at *url* into the local path *dest*.

    Returns a dict with the clone ``path`` and its ``default_branch``.
    """
    _require_git()
    cloned = Repo.clone_from(url, str(dest))
    result = {
        "success": True,
        "path": str(dest),
        "default_branch": cloned.active_branch.name,
    }
    return result
def git_init(path: str | Path) -> dict:
    """Initialise a new (non-bare) git repository at *path*.

    Parent directories are created as needed.
    """
    _require_git()
    target = Path(path)
    target.mkdir(parents=True, exist_ok=True)
    new_repo = Repo.init(str(path))
    return {"success": True, "path": str(path), "bare": new_repo.bare}
# ── Status / inspection ──────────────────────────────────────────────────────
def git_status(repo_path: str | Path) -> dict:
    """Return working-tree status: modified, staged, untracked files.

    Returns a dict with the active ``branch``, an ``is_dirty`` flag
    (untracked files included), and path lists for ``untracked``,
    ``modified`` (unstaged) and ``staged`` files.
    """
    repo = _open_repo(repo_path)
    # index.diff("HEAD") raises on a freshly initialised repo whose HEAD has
    # no commits yet (unborn HEAD). Fall back to listing every index entry,
    # which is exactly the staged set in that state.
    try:
        staged = [item.a_path for item in repo.index.diff("HEAD")]
    except Exception:  # unborn HEAD — no commits yet
        staged = [path for path, _stage in repo.index.entries]
    return {
        "success": True,
        "branch": repo.active_branch.name,
        "is_dirty": repo.is_dirty(untracked_files=True),
        "untracked": repo.untracked_files,
        "modified": [item.a_path for item in repo.index.diff(None)],
        "staged": staged,
    }
def git_diff(
    repo_path: str | Path,
    staged: bool = False,
    file_path: Optional[str] = None,
) -> dict:
    """Produce a textual diff of the working tree, or of the index when
    *staged* is True.

    When *file_path* is given, the diff is restricted to that single file.
    """
    repo = _open_repo(repo_path)
    cli_args: list[str] = ["--cached"] if staged else []
    if file_path:
        cli_args += ["--", file_path]
    return {"success": True, "diff": repo.git.diff(*cli_args), "staged": staged}
def git_log(
    repo_path: str | Path,
    max_count: int = 20,
    branch: Optional[str] = None,
) -> dict:
    """Return up to *max_count* recent commits on *branch* (default: the
    active branch) as a list of plain dicts."""
    repo = _open_repo(repo_path)
    ref = branch if branch else repo.active_branch.name
    history = [
        {
            "sha": c.hexsha,
            "short_sha": c.hexsha[:8],
            "message": c.message.strip(),
            "author": str(c.author),
            "date": c.committed_datetime.isoformat(),
            "files_changed": len(c.stats.files),
        }
        for c in repo.iter_commits(ref, max_count=max_count)
    ]
    return {"success": True, "branch": ref, "commits": history}
def git_blame(repo_path: str | Path, file_path: str) -> dict:
    """Return ``git blame`` output (line-by-line authorship) for *file_path*."""
    annotated = _open_repo(repo_path).git.blame(file_path)
    return {"success": True, "file": file_path, "blame": annotated}
# ── Branching ─────────────────────────────────────────────────────────────────
def git_branch(
    repo_path: str | Path,
    create: Optional[str] = None,
    switch: Optional[str] = None,
) -> dict:
    """List local branches; optionally create one and/or check one out.

    Note: *create* only creates the branch — pass the same name as
    *switch* to also check it out.
    """
    repo = _open_repo(repo_path)
    if create:
        repo.create_head(create)
    if switch:
        repo.heads[switch].checkout()
    return {
        "success": True,
        "branches": [head.name for head in repo.heads],
        "active": repo.active_branch.name,
        "created": create,
        "switched": switch,
    }
# ── Staging & committing ─────────────────────────────────────────────────────
def git_add(repo_path: str | Path, paths: list[str] | None = None) -> dict:
    """Stage files for commit.

    Args:
        paths: Specific paths to stage; when omitted, stages all changes
            (equivalent to ``git add -A``).

    Returns dict with the list of currently staged paths.
    """
    repo = _open_repo(repo_path)
    if paths:
        repo.index.add(paths)
    else:
        # Stage all changes
        repo.git.add(A=True)
    # index.diff("HEAD") raises before the first commit (unborn HEAD), which
    # made a git_init -> git_add sequence fail. In that state every index
    # entry is staged, so list the entries instead.
    try:
        staged = [item.a_path for item in repo.index.diff("HEAD")]
    except Exception:  # unborn HEAD — no commits yet
        staged = [path for path, _stage in repo.index.entries]
    return {"success": True, "staged": staged}
def git_commit(repo_path: str | Path, message: str) -> dict:
    """Record the staged changes as a new commit carrying *message*."""
    new_commit = _open_repo(repo_path).index.commit(message)
    sha = new_commit.hexsha
    return {
        "success": True,
        "sha": sha,
        "short_sha": sha[:8],
        "message": message,
    }
# ── Remote operations ─────────────────────────────────────────────────────────
def git_push(
    repo_path: str | Path,
    remote: str = "origin",
    branch: Optional[str] = None,
) -> dict:
    """Push *branch* (default: the active branch) to *remote*."""
    repo = _open_repo(repo_path)
    target = branch if branch else repo.active_branch.name
    push_infos = repo.remotes[remote].push(target)
    return {
        "success": True,
        "remote": remote,
        "branch": target,
        "summaries": [str(info.summary) for info in push_infos],
    }
def git_pull(
    repo_path: str | Path,
    remote: str = "origin",
    branch: Optional[str] = None,
) -> dict:
    """Pull *branch* (default: the active branch) from *remote* into the
    working tree."""
    repo = _open_repo(repo_path)
    target = branch if branch else repo.active_branch.name
    pull_infos = repo.remotes[remote].pull(target)
    return {
        "success": True,
        "remote": remote,
        "branch": target,
        "summaries": [str(info.summary) for info in pull_infos],
    }
# ── Stashing ──────────────────────────────────────────────────────────────────
def git_stash(
    repo_path: str | Path,
    pop: bool = False,
    message: Optional[str] = None,
) -> dict:
    """Stash working-tree changes, or pop the latest stash when *pop* is True."""
    repo = _open_repo(repo_path)
    if pop:
        repo.git.stash("pop")
        return {"success": True, "action": "pop"}
    stash_args = ["push"] + (["-m", message] if message else [])
    repo.git.stash(*stash_args)
    return {"success": True, "action": "stash", "message": message}
# ── Tool catalogue ────────────────────────────────────────────────────────────
# Maps tool id -> {"name": display name, "description": one-liner, "fn": callable}.
# Presumably consumed by the agents' tool-registration layer — confirm with caller.
GIT_TOOL_CATALOG: dict[str, dict] = {
    "git_clone": {
        "name": "Git Clone",
        "description": "Clone a remote repository to a local path",
        "fn": git_clone,
    },
    "git_status": {
        "name": "Git Status",
        "description": "Show working tree status (modified, staged, untracked)",
        "fn": git_status,
    },
    "git_diff": {
        "name": "Git Diff",
        "description": "Show diff of working tree or staged changes",
        "fn": git_diff,
    },
    "git_log": {
        "name": "Git Log",
        "description": "Show recent commit history",
        "fn": git_log,
    },
    "git_blame": {
        "name": "Git Blame",
        "description": "Show line-by-line authorship for a file",
        "fn": git_blame,
    },
    "git_branch": {
        "name": "Git Branch",
        "description": "List, create, or switch branches",
        "fn": git_branch,
    },
    "git_add": {
        "name": "Git Add",
        "description": "Stage files for commit",
        "fn": git_add,
    },
    "git_commit": {
        "name": "Git Commit",
        "description": "Create a commit with a message",
        "fn": git_commit,
    },
    "git_push": {
        "name": "Git Push",
        "description": "Push branch to remote repository",
        "fn": git_push,
    },
    "git_pull": {
        "name": "Git Pull",
        "description": "Pull from remote repository",
        "fn": git_pull,
    },
    "git_stash": {
        "name": "Git Stash",
        "description": "Stash or pop working tree changes",
        "fn": git_stash,
    },
}

171
src/tools/image_tools.py Normal file
View File

@@ -0,0 +1,171 @@
"""Image generation tools — Pixel persona.
Uses FLUX.2 Klein 4B (or configurable model) via HuggingFace diffusers
for text-to-image generation, storyboard frames, and variations.
All heavy imports are lazy so the module loads instantly even without
a GPU or the ``creative`` extra installed.
"""
from __future__ import annotations
import json
import logging
import uuid
from pathlib import Path
from typing import Optional
logger = logging.getLogger(__name__)
# Lazy-loaded pipeline singleton
_pipeline = None
def _get_pipeline():
    """Lazy-load and cache the FLUX text-to-image diffusers pipeline.

    Raises:
        ImportError: when torch/diffusers (the ``creative`` extra) are missing.
    """
    global _pipeline
    if _pipeline is not None:
        return _pipeline
    try:
        import torch
        from diffusers import FluxPipeline
    except ImportError as exc:
        # Chain the cause so the actual missing package stays visible.
        raise ImportError(
            "Creative dependencies not installed. "
            "Run: pip install 'timmy-time[creative]'"
        ) from exc
    from config import settings
    model_id = getattr(settings, "flux_model_id", "black-forest-labs/FLUX.1-schnell")
    device = "cuda" if torch.cuda.is_available() else "cpu"
    # fp16 halves GPU memory; CPU generally needs fp32 for supported ops.
    dtype = torch.float16 if device == "cuda" else torch.float32
    logger.info("Loading image model %s on %s", model_id, device)
    _pipeline = FluxPipeline.from_pretrained(
        model_id, torch_dtype=dtype,
    ).to(device)
    logger.info("Image model loaded.")
    return _pipeline
def _output_dir() -> Path:
    """Return the image output directory from settings, creating it if needed."""
    from config import settings
    target = Path(getattr(settings, "image_output_dir", "data/images"))
    target.mkdir(parents=True, exist_ok=True)
    return target
def _save_metadata(image_path: Path, meta: dict) -> Path:
meta_path = image_path.with_suffix(".json")
meta_path.write_text(json.dumps(meta, indent=2))
return meta_path
# ── Public tools ──────────────────────────────────────────────────────────────
def generate_image(
    prompt: str,
    negative_prompt: str = "",
    width: int = 1024,
    height: int = 1024,
    steps: int = 4,
    seed: Optional[int] = None,
) -> dict:
    """Render a single image from *prompt* and save it with a JSON sidecar.

    Returns a dict containing ``path`` plus all generation metadata.
    """
    pipe = _get_pipeline()
    import torch
    generator = torch.Generator(device=pipe.device)
    if seed is not None:
        generator.manual_seed(seed)
    output = pipe(
        prompt=prompt,
        negative_prompt=negative_prompt or None,
        width=width,
        height=height,
        num_inference_steps=steps,
        generator=generator,
    )
    rendered = output.images[0]
    uid = uuid.uuid4().hex[:12]
    out_path = _output_dir() / f"{uid}.png"
    rendered.save(out_path)
    meta = {
        "id": uid,
        "prompt": prompt,
        "negative_prompt": negative_prompt,
        "width": width,
        "height": height,
        "steps": steps,
        "seed": seed,
    }
    _save_metadata(out_path, meta)
    return {"success": True, "path": str(out_path), **meta}
def generate_storyboard(
    scenes: list[str],
    width: int = 1024,
    height: int = 576,
    steps: int = 4,
) -> dict:
    """Render one keyframe image per scene description.

    Args:
        scenes: List of scene description strings.

    Returns dict with ``frame_count`` and the per-scene ``frames`` results.
    """
    frames = []
    for idx, description in enumerate(scenes):
        frame = generate_image(
            prompt=description, width=width, height=height, steps=steps,
        )
        frame.update(scene_index=idx, scene_description=description)
        frames.append(frame)
    return {"success": True, "frame_count": len(frames), "frames": frames}
def image_variations(
    prompt: str,
    count: int = 4,
    width: int = 1024,
    height: int = 1024,
    steps: int = 4,
) -> dict:
    """Render *count* takes on the same prompt, each with a fresh random seed."""
    import random
    results = [
        generate_image(
            prompt=prompt,
            width=width,
            height=height,
            steps=steps,
            seed=random.randint(0, 2**32 - 1),
        )
        for _ in range(count)
    ]
    return {"success": True, "count": len(results), "variations": results}
# ── Tool catalogue ────────────────────────────────────────────────────────────
# Maps tool id -> {"name": display name, "description": one-liner, "fn": callable}.
# Presumably consumed by the agents' tool-registration layer — confirm with caller.
IMAGE_TOOL_CATALOG: dict[str, dict] = {
    "generate_image": {
        "name": "Generate Image",
        "description": "Generate an image from a text prompt using FLUX",
        "fn": generate_image,
    },
    "generate_storyboard": {
        "name": "Generate Storyboard",
        "description": "Generate keyframe images for a sequence of scenes",
        "fn": generate_storyboard,
    },
    "image_variations": {
        "name": "Image Variations",
        "description": "Generate multiple variations of the same prompt",
        "fn": image_variations,
    },
}

210
src/tools/music_tools.py Normal file
View File

@@ -0,0 +1,210 @@
"""Music generation tools — Lyra persona.
Uses ACE-Step 1.5 for full song generation with vocals, instrumentals,
and lyrics. Falls back gracefully when the ``creative`` extra is not
installed.
All heavy imports are lazy — the module loads instantly without GPU.
"""
from __future__ import annotations
import json
import logging
import uuid
from pathlib import Path
from typing import Optional
logger = logging.getLogger(__name__)
# Lazy-loaded model singleton
_model = None
def _get_model():
    """Lazy-load and cache the ACE-Step music generation model.

    Raises:
        ImportError: when the ``ace_step`` package (creative extra) is missing.
    """
    global _model
    if _model is not None:
        return _model
    try:
        from ace_step import ACEStep
    except ImportError as exc:
        # Chain the cause so the actual missing package stays visible.
        raise ImportError(
            "ACE-Step not installed. Run: pip install 'timmy-time[creative]'"
        ) from exc
    from config import settings
    model_name = getattr(settings, "ace_step_model", "ace-step/ACE-Step-v1.5")
    logger.info("Loading music model %s", model_name)
    _model = ACEStep(model_name)
    logger.info("Music model loaded.")
    return _model
def _output_dir() -> Path:
    """Return the music output directory from settings, creating it if needed."""
    from config import settings
    target = Path(getattr(settings, "music_output_dir", "data/music"))
    target.mkdir(parents=True, exist_ok=True)
    return target
def _save_metadata(audio_path: Path, meta: dict) -> Path:
meta_path = audio_path.with_suffix(".json")
meta_path.write_text(json.dumps(meta, indent=2))
return meta_path
# ── Supported genres ──────────────────────────────────────────────────────────
# Style tags accepted by the ``genre`` parameter of the generation tools below;
# also surfaced to agents via ``list_genres``.
GENRES = [
    "pop", "rock", "hip-hop", "r&b", "jazz", "blues", "country",
    "electronic", "classical", "folk", "reggae", "metal", "punk",
    "soul", "funk", "latin", "ambient", "lo-fi", "cinematic",
]
# ── Public tools ──────────────────────────────────────────────────────────────
def generate_song(
    lyrics: str,
    genre: str = "pop",
    duration: int = 120,
    language: str = "en",
    title: Optional[str] = None,
) -> dict:
    """Generate a full song with vocals and instrumentals from lyrics.

    Args:
        lyrics: Song lyrics text.
        genre: Musical genre / style tag (see ``GENRES``).
        duration: Target duration in seconds; clamped to 30-240 below.
        language: ISO language code (presumably the 19 languages ACE-Step
            supports — TODO confirm against the model docs).
        title: Optional song title for metadata.

    Returns dict with ``path``, ``duration``, ``genre``, etc.; a JSON
    metadata sidecar is written next to the WAV file.
    """
    model = _get_model()
    # Clamp to the supported duration range.
    duration = max(30, min(240, duration))
    uid = uuid.uuid4().hex[:12]
    out_path = _output_dir() / f"{uid}.wav"
    logger.info("Generating song: genre=%s duration=%ds …", genre, duration)
    audio = model.generate(
        lyrics=lyrics,
        genre=genre,
        duration=duration,
        language=language,
    )
    audio.save(str(out_path))
    meta = {
        "id": uid, "title": title or f"Untitled ({genre})",
        "lyrics": lyrics, "genre": genre,
        "duration": duration, "language": language,
    }
    _save_metadata(out_path, meta)
    return {"success": True, "path": str(out_path), **meta}
def generate_instrumental(
    prompt: str,
    genre: str = "cinematic",
    duration: int = 60,
) -> dict:
    """Generate an instrumental track from a text prompt (no vocals).

    Args:
        prompt: Description of the desired music.
        genre: Musical genre / style tag (see ``GENRES``).
        duration: Target duration in seconds; clamped to 15-180 below.

    Returns dict with ``path`` plus the generation metadata; a JSON
    sidecar is written next to the WAV file.
    """
    model = _get_model()
    # Clamp to the supported duration range.
    duration = max(15, min(180, duration))
    uid = uuid.uuid4().hex[:12]
    out_path = _output_dir() / f"{uid}.wav"
    logger.info("Generating instrumental: genre=%s", genre)
    # Empty lyrics signal "no vocals" to the model — confirm against ACE-Step docs.
    audio = model.generate(
        lyrics="",
        genre=genre,
        duration=duration,
        prompt=prompt,
    )
    audio.save(str(out_path))
    meta = {
        "id": uid, "prompt": prompt, "genre": genre,
        "duration": duration, "instrumental": True,
    }
    _save_metadata(out_path, meta)
    return {"success": True, "path": str(out_path), **meta}
def generate_vocals(
    lyrics: str,
    style: str = "pop",
    duration: int = 60,
    language: str = "en",
) -> dict:
    """Generate a vocal-only (acapella) track from *lyrics*.

    Duration is clamped to 15-180 seconds. Useful for layering over
    custom instrumentals.
    """
    model = _get_model()
    duration = max(15, min(180, duration))
    track_id = uuid.uuid4().hex[:12]
    wav_path = _output_dir() / f"{track_id}.wav"
    audio = model.generate(
        lyrics=lyrics,
        genre=f"{style} acapella vocals",
        duration=duration,
        language=language,
    )
    audio.save(str(wav_path))
    meta = {
        "id": track_id,
        "lyrics": lyrics,
        "style": style,
        "duration": duration,
        "vocals_only": True,
    }
    _save_metadata(wav_path, meta)
    return {"success": True, "path": str(wav_path), **meta}
def list_genres() -> dict:
    """Expose the supported genre / style tags as a tool result."""
    return {
        "success": True,
        "genres": GENRES,
    }
# ── Tool catalogue ────────────────────────────────────────────────────────────
# Maps tool id -> {"name": display name, "description": one-liner, "fn": callable}.
# Presumably consumed by the agents' tool-registration layer — confirm with caller.
MUSIC_TOOL_CATALOG: dict[str, dict] = {
    "generate_song": {
        "name": "Generate Song",
        "description": "Generate a full song with vocals + instrumentals from lyrics",
        "fn": generate_song,
    },
    "generate_instrumental": {
        "name": "Generate Instrumental",
        "description": "Generate an instrumental track from a text prompt",
        "fn": generate_instrumental,
    },
    "generate_vocals": {
        "name": "Generate Vocals",
        "description": "Generate a vocal-only track from lyrics",
        "fn": generate_vocals,
    },
    "list_genres": {
        "name": "List Genres",
        "description": "List supported music genre / style tags",
        "fn": list_genres,
    },
}

206
src/tools/video_tools.py Normal file
View File

@@ -0,0 +1,206 @@
"""Video generation tools — Reel persona.
Uses Wan 2.1 (via HuggingFace diffusers) for text-to-video and
image-to-video generation. Heavy imports are lazy.
"""
from __future__ import annotations
import json
import logging
import uuid
from pathlib import Path
from typing import Optional
logger = logging.getLogger(__name__)
# Lazy-loaded pipeline singletons
_t2v_pipeline = None
_i2v_pipeline = None
def _get_t2v_pipeline():
    """Lazy-load and cache the text-to-video pipeline (Wan 2.1).

    NOTE(review): a separate ``_i2v_pipeline`` singleton is declared at
    module level but nothing loads it — ``image_to_video`` reuses this
    pipeline instead; confirm that is intentional.

    Raises:
        ImportError: when torch/diffusers (the ``creative`` extra) are missing.
    """
    global _t2v_pipeline
    if _t2v_pipeline is not None:
        return _t2v_pipeline
    try:
        import torch
        from diffusers import DiffusionPipeline
    except ImportError as exc:
        # Chain the cause so the actual missing package stays visible.
        raise ImportError(
            "Creative dependencies not installed. "
            "Run: pip install 'timmy-time[creative]'"
        ) from exc
    from config import settings
    model_id = getattr(settings, "wan_model_id", "Wan-AI/Wan2.1-T2V-1.3B")
    device = "cuda" if torch.cuda.is_available() else "cpu"
    # fp16 halves GPU memory; CPU generally needs fp32 for supported ops.
    dtype = torch.float16 if device == "cuda" else torch.float32
    logger.info("Loading video model %s on %s", model_id, device)
    _t2v_pipeline = DiffusionPipeline.from_pretrained(
        model_id, torch_dtype=dtype,
    ).to(device)
    logger.info("Video model loaded.")
    return _t2v_pipeline
def _output_dir() -> Path:
    """Return the video output directory from settings, creating it if needed."""
    from config import settings
    target = Path(getattr(settings, "video_output_dir", "data/video"))
    target.mkdir(parents=True, exist_ok=True)
    return target
def _save_metadata(video_path: Path, meta: dict) -> Path:
meta_path = video_path.with_suffix(".json")
meta_path.write_text(json.dumps(meta, indent=2))
return meta_path
def _export_frames_to_mp4(frames, out_path: Path, fps: int = 24) -> None:
    """Encode a sequence of PIL Image *frames* into an H.264 MP4 via moviepy."""
    import numpy as np
    from moviepy import ImageSequenceClip
    clip = ImageSequenceClip([np.array(frame) for frame in frames], fps=fps)
    clip.write_videofile(
        str(out_path), codec="libx264", audio=False, logger=None,
    )
# ── Resolution presets ────────────────────────────────────────────────────────
# name -> (width, height) in pixels; both presets are 16:9.
RESOLUTION_PRESETS = {
    "480p": (854, 480),
    "720p": (1280, 720),
}
# Style presets surfaced by ``list_video_styles``; note that nothing in this
# module applies them to the pipeline — they appear to be prompt guidance for
# agents (confirm with caller).
VIDEO_STYLES = [
    "cinematic", "anime", "documentary", "abstract",
    "timelapse", "slow-motion", "music-video", "vlog",
]
# ── Public tools ──────────────────────────────────────────────────────────────
def generate_video_clip(
    prompt: str,
    duration: int = 5,
    resolution: str = "480p",
    fps: int = 24,
    seed: Optional[int] = None,
) -> dict:
    """Generate a short video clip from a text prompt.

    Args:
        prompt: Text description of the desired video.
        duration: Target duration in seconds; clamped to 2-10 below.
        resolution: "480p" or "720p" (unknown values fall back to 480p).
        fps: Frames per second.
        seed: Optional seed for reproducibility.

    Returns dict with ``path``, ``duration``, ``resolution``; a JSON
    metadata sidecar is written next to the MP4 file.
    """
    pipe = _get_t2v_pipeline()
    import torch
    # Clamp to the supported duration range.
    duration = max(2, min(10, duration))
    w, h = RESOLUTION_PRESETS.get(resolution, RESOLUTION_PRESETS["480p"])
    num_frames = duration * fps
    generator = torch.Generator(device=pipe.device)
    if seed is not None:
        generator.manual_seed(seed)
    logger.info("Generating %ds video at %s", duration, resolution)
    result = pipe(
        prompt=prompt,
        num_frames=num_frames,
        width=w,
        height=h,
        generator=generator,
    )
    # Diffusers video pipelines return .frames; .images covers other outputs.
    frames = result.frames[0] if hasattr(result, "frames") else result.images
    uid = uuid.uuid4().hex[:12]
    out_path = _output_dir() / f"{uid}.mp4"
    _export_frames_to_mp4(frames, out_path, fps=fps)
    meta = {
        "id": uid, "prompt": prompt, "duration": duration,
        "resolution": resolution, "fps": fps, "seed": seed,
    }
    _save_metadata(out_path, meta)
    return {"success": True, "path": str(out_path), **meta}
def image_to_video(
    image_path: str,
    prompt: str = "",
    duration: int = 5,
    fps: int = 24,
) -> dict:
    """Animate a still image into a video clip.

    Args:
        image_path: Path to the source image.
        prompt: Optional motion / style guidance.
        duration: Target duration in seconds; clamped to 2-10.
        fps: Frames per second of the output clip.

    Returns dict with ``path`` plus the generation metadata.
    """
    # NOTE(review): this reuses the text-to-video pipeline even though an
    # unused _i2v_pipeline singleton exists at module level — confirm the
    # T2V model accepts an ``image=`` conditioning input.
    pipe = _get_t2v_pipeline()
    from PIL import Image
    duration = max(2, min(10, duration))
    img = Image.open(image_path).convert("RGB")
    num_frames = duration * fps
    # Fixed log format: the path and duration previously ran together
    # ("%s%ds" produced e.g. "image foo.png5s video").
    logger.info("Animating image %s into a %ds video …", image_path, duration)
    result = pipe(
        prompt=prompt or "animate this image with natural motion",
        image=img,
        num_frames=num_frames,
    )
    frames = result.frames[0] if hasattr(result, "frames") else result.images
    uid = uuid.uuid4().hex[:12]
    out_path = _output_dir() / f"{uid}.mp4"
    _export_frames_to_mp4(frames, out_path, fps=fps)
    meta = {
        "id": uid, "source_image": image_path,
        "prompt": prompt, "duration": duration, "fps": fps,
    }
    _save_metadata(out_path, meta)
    return {"success": True, "path": str(out_path), **meta}
def list_video_styles() -> dict:
    """Expose the supported style presets and resolution names as a tool result."""
    return {
        "success": True,
        "styles": VIDEO_STYLES,
        "resolutions": list(RESOLUTION_PRESETS),
    }
# ── Tool catalogue ────────────────────────────────────────────────────────────
# Maps tool id -> {"name": display name, "description": one-liner, "fn": callable}.
# Presumably consumed by the agents' tool-registration layer — confirm with caller.
VIDEO_TOOL_CATALOG: dict[str, dict] = {
    "generate_video_clip": {
        "name": "Generate Video Clip",
        "description": "Generate a short video clip from a text prompt using Wan 2.1",
        "fn": generate_video_clip,
    },
    "image_to_video": {
        "name": "Image to Video",
        "description": "Animate a still image into a video clip",
        "fn": image_to_video,
    },
    "list_video_styles": {
        "name": "List Video Styles",
        "description": "List supported video style presets and resolutions",
        "fn": list_video_styles,
    },
}