Files
dotfiles/.agents/skills/speech/scripts/text_to_speech.py
2026-03-17 16:53:22 -07:00

529 lines
15 KiB
Python

#!/usr/bin/env python3
"""Generate speech audio with the OpenAI Audio API (TTS).
Defaults to gpt-4o-mini-tts-2025-12-15 and a built-in voice (cedar).
"""
from __future__ import annotations
import argparse
import json
import os
from pathlib import Path
import re
import sys
import time
from typing import Any, Dict, List, Optional
DEFAULT_MODEL = "gpt-4o-mini-tts-2025-12-15"
DEFAULT_VOICE = "cedar"
DEFAULT_RESPONSE_FORMAT = "mp3"
DEFAULT_SPEED = 1.0
MAX_INPUT_CHARS = 4096
MAX_RPM = 50
DEFAULT_RPM = 50
DEFAULT_ATTEMPTS = 3
ALLOWED_VOICES = {
"alloy",
"ash",
"ballad",
"cedar",
"coral",
"echo",
"fable",
"marin",
"nova",
"onyx",
"sage",
"shimmer",
"verse",
}
ALLOWED_FORMATS = {"mp3", "opus", "aac", "flac", "wav", "pcm"}
def _die(message: str, code: int = 1) -> None:
print(f"Error: {message}", file=sys.stderr)
raise SystemExit(code)
def _warn(message: str) -> None:
print(f"Warning: {message}", file=sys.stderr)
def _ensure_api_key(dry_run: bool) -> None:
if os.getenv("OPENAI_API_KEY"):
print("OPENAI_API_KEY is set.", file=sys.stderr)
return
if dry_run:
_warn("OPENAI_API_KEY is not set; dry-run only.")
return
_die("OPENAI_API_KEY is not set. Export it before running.")
def _read_text(text: Optional[str], text_file: Optional[str], label: str) -> str:
if text and text_file:
_die(f"Use --{label} or --{label}-file, not both.")
if text_file:
path = Path(text_file)
if not path.exists():
_die(f"{label} file not found: {path}")
return path.read_text(encoding="utf-8").strip()
if text:
return str(text).strip()
_die(f"Missing {label}. Use --{label} or --{label}-file.")
return "" # unreachable
def _validate_input(text: str) -> None:
if not text:
_die("Input text is empty.")
if len(text) > MAX_INPUT_CHARS:
_die(
f"Input text exceeds {MAX_INPUT_CHARS} characters. Split into smaller chunks."
)
def _normalize_voice(voice: Optional[str]) -> str:
if not voice:
return DEFAULT_VOICE
value = str(voice).strip().lower()
if value not in ALLOWED_VOICES:
_die(
"voice must be one of: " + ", ".join(sorted(ALLOWED_VOICES))
)
return value
def _normalize_format(fmt: Optional[str]) -> str:
if not fmt:
return DEFAULT_RESPONSE_FORMAT
value = str(fmt).strip().lower()
if value not in ALLOWED_FORMATS:
_die("response-format must be one of: " + ", ".join(sorted(ALLOWED_FORMATS)))
return value
def _normalize_speed(speed: Optional[float]) -> Optional[float]:
if speed is None:
return None
try:
value = float(speed)
except ValueError:
_die("speed must be a number")
if value < 0.25 or value > 4.0:
_die("speed must be between 0.25 and 4.0")
return value
def _normalize_output_path(out: Optional[str], response_format: str) -> Path:
if out:
path = Path(out)
if path.exists() and path.is_dir():
return path / f"speech.{response_format}"
if path.suffix == "":
return path.with_suffix("." + response_format)
if path.suffix.lstrip(".").lower() != response_format:
_warn(
f"Output extension {path.suffix} does not match response-format {response_format}."
)
return path
return Path(f"speech.{response_format}")
def _create_client():
try:
from openai import OpenAI
except ImportError:
_die("openai SDK not installed. Install with `uv pip install openai`.")
return OpenAI()
def _extract_retry_after_seconds(exc: Exception) -> Optional[float]:
for attr in ("retry_after", "retry_after_seconds"):
val = getattr(exc, attr, None)
if isinstance(val, (int, float)) and val >= 0:
return float(val)
msg = str(exc)
m = re.search(r"retry[- ]after[:= ]+([0-9]+(?:\\.[0-9]+)?)", msg, re.IGNORECASE)
if m:
try:
return float(m.group(1))
except Exception:
return None
return None
def _is_rate_limit_error(exc: Exception) -> bool:
name = exc.__class__.__name__.lower()
if "ratelimit" in name or "rate_limit" in name:
return True
msg = str(exc).lower()
return "429" in msg or "rate limit" in msg or "too many requests" in msg
def _is_transient_error(exc: Exception) -> bool:
if _is_rate_limit_error(exc):
return True
name = exc.__class__.__name__.lower()
if "timeout" in name or "timedout" in name or "tempor" in name:
return True
msg = str(exc).lower()
return "timeout" in msg or "timed out" in msg or "connection reset" in msg
def _maybe_drop_instructions(model: str, instructions: Optional[str]) -> Optional[str]:
if instructions and model in {"tts-1", "tts-1-hd"}:
_warn("instructions are not supported for tts-1 / tts-1-hd; ignoring.")
return None
return instructions
def _print_payload(payload: Dict[str, Any]) -> None:
print(json.dumps(payload, indent=2, sort_keys=True))
def _write_audio(
client: Any,
payload: Dict[str, Any],
out_path: Path,
*,
dry_run: bool,
force: bool,
attempts: int,
) -> None:
if dry_run:
_print_payload(payload)
print(f"Would write {out_path}")
return
_ensure_api_key(dry_run)
if out_path.exists() and not force:
_die(f"Output already exists: {out_path} (use --force to overwrite)")
out_path.parent.mkdir(parents=True, exist_ok=True)
last_exc: Optional[Exception] = None
for attempt in range(1, attempts + 1):
try:
with client.audio.speech.with_streaming_response.create(**payload) as response:
response.stream_to_file(out_path)
print(f"Wrote {out_path}")
return
except Exception as exc:
last_exc = exc
if not _is_transient_error(exc) or attempt >= attempts:
raise
sleep_s = _extract_retry_after_seconds(exc)
if sleep_s is None:
sleep_s = min(60.0, 2.0 ** attempt)
print(
f"Attempt {attempt}/{attempts} failed ({exc.__class__.__name__}); retrying in {sleep_s:.1f}s",
file=sys.stderr,
)
time.sleep(sleep_s)
if last_exc:
raise last_exc
def _slugify(value: str) -> str:
value = value.strip().lower()
value = re.sub(r"[^a-z0-9]+", "-", value)
value = re.sub(r"-+", "-", value).strip("-")
return value[:60] if value else "job"
def _read_jobs_jsonl(path: str) -> List[Dict[str, Any]]:
p = Path(path)
if not p.exists():
_die(f"Input file not found: {p}")
jobs: List[Dict[str, Any]] = []
for line_no, raw in enumerate(p.read_text(encoding="utf-8").splitlines(), start=1):
line = raw.strip()
if not line or line.startswith("#"):
continue
if line.startswith("{"):
try:
item = json.loads(line)
except json.JSONDecodeError as exc:
_die(f"Invalid JSON on line {line_no}: {exc}")
if not isinstance(item, dict):
_die(f"Invalid job on line {line_no}: expected object")
jobs.append(item)
else:
jobs.append({"input": line})
if not jobs:
_die("No jobs found in input file.")
return jobs
def _job_input(job: Dict[str, Any]) -> str:
for key in ("input", "text", "prompt"):
if key in job and str(job[key]).strip():
return str(job[key]).strip()
_die("Job missing input text (use 'input').")
return "" # unreachable
def _merge_non_null(base: Dict[str, Any], extra: Dict[str, Any]) -> Dict[str, Any]:
merged = dict(base)
for k, v in extra.items():
if v is not None:
merged[k] = v
return merged
def _enforce_rpm(rpm: int) -> int:
if rpm <= 0:
_die("rpm must be > 0")
if rpm > MAX_RPM:
_warn(f"rpm capped at {MAX_RPM} (requested {rpm}).")
return MAX_RPM
return rpm
def _sleep_for_rate_limit(last_ts: Optional[float], rpm: int) -> float:
min_interval = 60.0 / float(rpm)
now = time.monotonic()
if last_ts is None:
return now
elapsed = now - last_ts
if elapsed < min_interval:
time.sleep(min_interval - elapsed)
return time.monotonic()
def _list_voices() -> None:
for name in sorted(ALLOWED_VOICES):
print(name)
def _run_speak(args: argparse.Namespace) -> int:
if args.list_voices:
_list_voices()
return 0
input_text = _read_text(args.input, args.input_file, "input")
_validate_input(input_text)
instructions = None
if args.instructions or args.instructions_file:
instructions = _read_text(args.instructions, args.instructions_file, "instructions")
model = str(args.model).strip()
voice = _normalize_voice(args.voice)
response_format = _normalize_format(args.response_format)
speed = _normalize_speed(args.speed)
instructions = _maybe_drop_instructions(model, instructions)
payload: Dict[str, Any] = {
"model": model,
"voice": voice,
"input": input_text,
"response_format": response_format,
}
if instructions:
payload["instructions"] = instructions
if speed is not None:
payload["speed"] = speed
out_path = _normalize_output_path(args.out, response_format)
if args.dry_run:
_ensure_api_key(True)
_print_payload(payload)
print(f"Would write {out_path}")
return 0
client = _create_client()
_write_audio(
client,
payload,
out_path,
dry_run=args.dry_run,
force=args.force,
attempts=args.attempts,
)
return 0
def _run_speak_batch(args: argparse.Namespace) -> int:
jobs = _read_jobs_jsonl(args.input)
out_dir = Path(args.out_dir)
base_instructions = None
if args.instructions or args.instructions_file:
base_instructions = _read_text(args.instructions, args.instructions_file, "instructions")
base_payload = {
"model": str(args.model).strip(),
"voice": _normalize_voice(args.voice),
"response_format": _normalize_format(args.response_format),
"speed": _normalize_speed(args.speed),
"instructions": base_instructions,
}
rpm = _enforce_rpm(args.rpm)
last_ts: Optional[float] = None
if args.dry_run:
_ensure_api_key(True)
client = None if args.dry_run else _create_client()
for idx, job in enumerate(jobs, start=1):
input_text = _job_input(job)
_validate_input(input_text)
job_payload = dict(base_payload)
job_payload["input"] = input_text
overrides: Dict[str, Any] = {}
if "model" in job:
overrides["model"] = str(job["model"]).strip()
if "voice" in job:
overrides["voice"] = _normalize_voice(job["voice"])
if "response_format" in job or "format" in job:
overrides["response_format"] = _normalize_format(job.get("response_format") or job.get("format"))
if "speed" in job and job["speed"] is not None:
overrides["speed"] = _normalize_speed(job["speed"])
if "instructions" in job and str(job["instructions"]).strip():
overrides["instructions"] = str(job["instructions"]).strip()
job_payload = _merge_non_null(job_payload, overrides)
job_payload["instructions"] = _maybe_drop_instructions(
job_payload["model"], job_payload.get("instructions")
)
if job_payload.get("instructions") is None:
job_payload.pop("instructions", None)
response_format = job_payload["response_format"]
explicit_out = job.get("out")
if explicit_out:
out_path = _normalize_output_path(str(explicit_out), response_format)
if out_path.is_absolute():
out_path = out_dir / out_path.name
else:
out_path = out_dir / out_path
else:
slug = _slugify(input_text[:80])
out_path = out_dir / f"{idx:03d}-{slug}.{response_format}"
if args.dry_run:
_print_payload(job_payload)
print(f"Would write {out_path}")
continue
last_ts = _sleep_for_rate_limit(last_ts, rpm)
if client is None:
client = _create_client()
_write_audio(
client,
job_payload,
out_path,
dry_run=False,
force=args.force,
attempts=args.attempts,
)
return 0
def _add_common_args(parser: argparse.ArgumentParser) -> None:
parser.add_argument(
"--model",
default=DEFAULT_MODEL,
help=f"Model to use (default: {DEFAULT_MODEL})",
)
parser.add_argument(
"--voice",
default=DEFAULT_VOICE,
help=f"Voice to use (default: {DEFAULT_VOICE})",
)
parser.add_argument(
"--response-format",
default=DEFAULT_RESPONSE_FORMAT,
help=f"Output format (default: {DEFAULT_RESPONSE_FORMAT})",
)
parser.add_argument(
"--speed",
type=float,
default=DEFAULT_SPEED,
help=f"Speech speed (0.25-4.0, default: {DEFAULT_SPEED})",
)
parser.add_argument(
"--instructions",
help="Style directions for the voice",
)
parser.add_argument(
"--instructions-file",
help="Path to instructions text file",
)
parser.add_argument(
"--attempts",
type=int,
default=DEFAULT_ATTEMPTS,
help=f"Retries on transient errors (default: {DEFAULT_ATTEMPTS})",
)
parser.add_argument(
"--dry-run",
action="store_true",
help="Print payload; do not call the API",
)
parser.add_argument(
"--force",
action="store_true",
help="Overwrite output files if they exist",
)
def main() -> int:
parser = argparse.ArgumentParser(
description="Generate speech audio using the OpenAI Audio API."
)
subparsers = parser.add_subparsers(dest="command", required=True)
list_voices = subparsers.add_parser("list-voices", help="List supported voices")
list_voices.set_defaults(func=lambda _args: (_list_voices() or 0))
speak = subparsers.add_parser("speak", help="Generate a single audio file")
speak.add_argument("--input", help="Input text")
speak.add_argument("--input-file", help="Path to input text file")
speak.add_argument("--out", help="Output file path")
speak.add_argument(
"--list-voices",
action="store_true",
help="Print voices and exit",
)
_add_common_args(speak)
speak.set_defaults(func=_run_speak)
batch = subparsers.add_parser("speak-batch", help="Generate from JSONL jobs")
batch.add_argument("--input", required=True, help="Path to JSONL file")
batch.add_argument(
"--out-dir",
default="out",
help="Output directory (default: out)",
)
batch.add_argument(
"--rpm",
type=int,
default=DEFAULT_RPM,
help=f"Requests per minute cap (default: {DEFAULT_RPM}, max: {MAX_RPM})",
)
_add_common_args(batch)
batch.set_defaults(func=_run_speak_batch)
args = parser.parse_args()
return int(args.func(args))
if __name__ == "__main__":
raise SystemExit(main())