Files
dotfiles/projects/scripts/whisper_record_transcribe.py
2026-02-13 00:01:10 -08:00

938 lines
30 KiB
Python
Executable File

#!/usr/bin/env python3
"""Record microphone audio and transcribe it with whisper.cpp, faster-whisper, or WhisperX."""
from __future__ import annotations
import argparse
import os
import shutil
import signal
import subprocess
import sys
import tempfile
import time
import traceback
import wave
from pathlib import Path
import numpy as np
import sounddevice as sd
DEFAULT_MODEL = "small"
DEFAULT_DURATION = 8.0
DEFAULT_STATE_DIR = Path.home() / ".cache" / "whisper-record-toggle"
DEFAULT_WHISPERCPP_MODEL_DIR = Path.home() / "models" / "whisper.cpp"
APP_NAME = "Whisper Record"
DEFAULT_TOGGLE_DEBOUNCE = 0.0
def _append_log(state_dir: Path, message: str) -> None:
state_dir.mkdir(parents=True, exist_ok=True)
log_file = state_dir / "worker.log"
timestamp = time.strftime("%Y-%m-%d %H:%M:%S")
with log_file.open("a", encoding="utf-8") as fh:
fh.write(f"[{timestamp}] {message}\n")
class Notifier:
"""Best-effort desktop notifications with optional live updates."""
def __init__(self) -> None:
self.enabled = shutil.which("notify-send") is not None
self.notification_id: str | None = None
def send(self, title: str, body: str, timeout_ms: int = 1500) -> None:
if not self.enabled:
return
base_cmd = [
"notify-send",
"-a",
APP_NAME,
"-u",
"normal",
"-t",
str(timeout_ms),
]
if self.notification_id:
base_cmd.extend(["-r", self.notification_id])
# Prefer -p so we can reuse the same ID for replacement updates.
result = subprocess.run(
[*base_cmd, "-p", title, body], capture_output=True, text=True
)
if result.returncode != 0:
# Fallback for environments where -p is unsupported.
subprocess.run([*base_cmd, title, body], capture_output=True, text=True)
return
notification_id = result.stdout.strip().splitlines()[-1].strip()
if notification_id.isdigit():
self.notification_id = notification_id
class Recorder:
"""Stream microphone audio into memory while tracking elapsed time."""
def __init__(self, samplerate: int, channels: int) -> None:
self.samplerate = samplerate
self.channels = channels
self.frames: list[np.ndarray] = []
def _callback(self, indata: np.ndarray, _frames: int, _time, status) -> None:
if status:
print(f"sounddevice warning: {status}", file=sys.stderr)
self.frames.append(indata.copy())
def record(
self, duration: float | None, notifier: Notifier, interval: float
) -> np.ndarray:
start = time.monotonic()
last_update = 0.0
with sd.InputStream(
samplerate=self.samplerate,
channels=self.channels,
dtype="int16",
callback=self._callback,
):
while True:
elapsed = time.monotonic() - start
if elapsed - last_update >= interval:
timer = _format_seconds(elapsed)
if duration is not None:
notifier.send(
"Recording", f"{timer} / {_format_seconds(duration)}"
)
else:
notifier.send(
"Recording", f"Elapsed: {timer} (press keybind again)"
)
last_update = elapsed
if duration is not None and elapsed >= duration:
break
time.sleep(0.05)
if not self.frames:
raise RuntimeError(
"No audio captured. Check your input device and permissions."
)
return np.concatenate(self.frames, axis=0)
def _format_seconds(value: float) -> str:
total = int(value)
minutes, seconds = divmod(total, 60)
return f"{minutes:02d}:{seconds:02d}"
def write_wav(path: Path, audio: np.ndarray, samplerate: int, channels: int) -> None:
with wave.open(str(path), "wb") as wav_file:
wav_file.setnchannels(channels)
wav_file.setsampwidth(2)
wav_file.setframerate(samplerate)
wav_file.writeframes(audio.tobytes())
def transcribe(
backend: str,
model_name_or_path: str,
wav_path: Path,
notifier: Notifier,
device: str,
compute_type: str,
beam_size: int,
task: str,
language: str | None,
whisperx_mode: str,
whisperx_vad_method: str,
whisperx_hf_token: str | None,
whisperx_min_speakers: int | None,
whisperx_max_speakers: int | None,
) -> str:
if backend == "whispercpp":
return transcribe_whispercpp(
model_name_or_path=model_name_or_path,
wav_path=wav_path,
notifier=notifier,
device=device,
beam_size=beam_size,
task=task,
language=language,
)
if backend == "ctranslate2":
return transcribe_ctranslate2(
model_name_or_path=model_name_or_path,
wav_path=wav_path,
notifier=notifier,
device=device,
compute_type=compute_type,
beam_size=beam_size,
task=task,
language=language,
)
if backend == "whisperx":
return transcribe_whisperx(
model_name_or_path=model_name_or_path,
wav_path=wav_path,
notifier=notifier,
device=device,
compute_type=compute_type,
beam_size=beam_size,
task=task,
language=language,
whisperx_mode=whisperx_mode,
whisperx_vad_method=whisperx_vad_method,
whisperx_hf_token=whisperx_hf_token,
whisperx_min_speakers=whisperx_min_speakers,
whisperx_max_speakers=whisperx_max_speakers,
)
raise RuntimeError(f"Unsupported backend: {backend}")
def _resolve_whispercpp_model(model_name_or_path: str) -> Path:
candidate = Path(model_name_or_path).expanduser()
if candidate.exists():
return candidate
name = model_name_or_path.strip()
search_paths = [
DEFAULT_WHISPERCPP_MODEL_DIR / name,
DEFAULT_WHISPERCPP_MODEL_DIR / f"ggml-{name}.bin",
DEFAULT_WHISPERCPP_MODEL_DIR / f"ggml-{name}.en.bin",
]
for path in search_paths:
if path.exists():
return path
raise RuntimeError(
"whisper.cpp model not found. Pass --model as a .bin path or place model at "
f"{DEFAULT_WHISPERCPP_MODEL_DIR}/ggml-<name>.bin (for example ggml-small.bin)."
)
def transcribe_whispercpp(
model_name_or_path: str,
wav_path: Path,
notifier: Notifier,
device: str,
beam_size: int,
task: str,
language: str | None,
) -> str:
whisper_cli = shutil.which("whisper-cli")
if not whisper_cli:
raise RuntimeError("whisper-cli not found in PATH. Install whisper.cpp.")
model_path = _resolve_whispercpp_model(model_name_or_path)
output_prefix = wav_path.parent / wav_path.stem
output_txt = Path(f"{output_prefix}.txt")
if output_txt.exists():
output_txt.unlink()
notifier.send("Transcribing", "Running whisper.cpp...", timeout_ms=1500)
cmd = [
whisper_cli,
"-f",
str(wav_path),
"-m",
str(model_path),
"-otxt",
"-of",
str(output_prefix),
"-bs",
str(beam_size),
"-np",
]
if device == "cpu":
cmd.append("-ng")
if language:
cmd.extend(["-l", language])
if task == "translate":
cmd.append("-tr")
result = subprocess.run(cmd, capture_output=True, text=True)
if result.returncode != 0:
details = (result.stderr or result.stdout or "").strip()
raise RuntimeError(details or "whisper.cpp failed.")
if not output_txt.exists():
details = (result.stderr or result.stdout or "").strip()
raise RuntimeError(
"whisper.cpp completed but no transcript file was produced. "
f"Expected: {output_txt}. {details}"
)
return output_txt.read_text(encoding="utf-8").strip()
def transcribe_ctranslate2(
model_name_or_path: str,
wav_path: Path,
notifier: Notifier,
device: str,
compute_type: str,
beam_size: int,
task: str,
language: str | None,
) -> str:
whisper_cli = shutil.which("whisper-ctranslate2")
if not whisper_cli:
raise RuntimeError(
"whisper-ctranslate2 not found in PATH. Install with: pip install faster-whisper"
)
if model_name_or_path.endswith(".bin"):
raise RuntimeError(
"faster-whisper/ctranslate2 does not use ggml .bin models. "
"Use a model name like 'small' or a CTranslate2 model directory."
)
notifier.send("Transcribing", "Running whisper-ctranslate2...", timeout_ms=1500)
output_dir = wav_path.parent
output_txt = output_dir / f"{wav_path.stem}.txt"
if output_txt.exists():
output_txt.unlink()
cmd = [
whisper_cli,
str(wav_path),
"--output_dir",
str(output_dir),
"--output_format",
"txt",
"--device",
device,
"--compute_type",
compute_type,
"--beam_size",
str(beam_size),
"--task",
task,
"--verbose",
"False",
]
if language:
cmd.extend(["--language", language])
model_dir_candidate = Path(model_name_or_path).expanduser()
if model_dir_candidate.exists() and model_dir_candidate.is_dir():
cmd.extend(["--model_directory", str(model_dir_candidate)])
elif "/" in model_name_or_path or model_name_or_path.startswith("."):
cmd.extend(["--model_directory", model_name_or_path])
else:
cmd.extend(["--model", model_name_or_path])
result = subprocess.run(cmd, capture_output=True, text=True)
if result.returncode != 0:
details = (result.stderr or result.stdout or "").strip()
raise RuntimeError(details or "whisper-ctranslate2 failed.")
if not output_txt.exists():
details = (result.stderr or result.stdout or "").strip()
raise RuntimeError(
"whisper-ctranslate2 completed but no transcript file was produced. "
f"Expected: {output_txt}. {details}"
)
return output_txt.read_text(encoding="utf-8").strip()
def transcribe_whisperx(
model_name_or_path: str,
wav_path: Path,
notifier: Notifier,
device: str,
compute_type: str,
beam_size: int,
task: str,
language: str | None,
whisperx_mode: str,
whisperx_vad_method: str,
whisperx_hf_token: str | None,
whisperx_min_speakers: int | None,
whisperx_max_speakers: int | None,
) -> str:
whisperx_cli = shutil.which("whisperx")
if not whisperx_cli:
raise RuntimeError("whisperx not found in PATH. Install with: pip install whisperx")
output_txt = wav_path.parent / f"{wav_path.stem}.txt"
if output_txt.exists():
output_txt.unlink()
if task == "translate" and not language:
raise RuntimeError("Translation requires --language so WhisperX can translate from the source language.")
def _run_whisperx(out_dir: Path, job_task: str, full_mode: bool) -> tuple[int, str]:
cmd = [
whisperx_cli,
str(wav_path),
"--output_dir",
str(out_dir),
"--output_format",
"txt",
"--device",
device,
"--compute_type",
compute_type,
"--beam_size",
str(beam_size),
"--task",
job_task,
"--vad_method",
whisperx_vad_method,
"--print_progress",
"False",
"--verbose",
"False",
]
if language:
cmd.extend(["--language", language])
model_dir_candidate = Path(model_name_or_path).expanduser()
if model_dir_candidate.exists() and model_dir_candidate.is_dir():
cmd.extend(["--model_dir", str(model_dir_candidate)])
else:
cmd.extend(["--model", model_name_or_path])
if whisperx_mode == "basic":
cmd.append("--no_align")
if full_mode:
cmd.append("--diarize")
if whisperx_hf_token:
cmd.extend(["--hf_token", whisperx_hf_token])
if whisperx_min_speakers is not None:
cmd.extend(["--min_speakers", str(whisperx_min_speakers)])
if whisperx_max_speakers is not None:
cmd.extend(["--max_speakers", str(whisperx_max_speakers)])
result = subprocess.run(cmd, capture_output=True, text=True)
details = (result.stderr or result.stdout or "").strip()
return result.returncode, details
notifier.send("Transcribing", "Running WhisperX...", timeout_ms=1500)
if whisperx_mode == "full":
native_dir = wav_path.parent / "whisperx-native"
native_dir.mkdir(parents=True, exist_ok=True)
notifier.send("Transcribing", "WhisperX full mode: extracting native subtitles", timeout_ms=1500)
rc, details = _run_whisperx(native_dir, "transcribe", full_mode=False)
if rc != 0:
raise RuntimeError(details or "WhisperX native transcription stage failed.")
rc, details = _run_whisperx(wav_path.parent, task, full_mode=(whisperx_mode == "full"))
if rc != 0:
raise RuntimeError(details or "whisperx failed.")
if not output_txt.exists():
raise RuntimeError(
"whisperx completed but no transcript file was produced. "
f"Expected: {output_txt}. {details}"
)
return output_txt.read_text(encoding="utf-8").strip()
def _type_with_tool(text: str) -> None:
if shutil.which("wtype"):
subprocess.run(["wtype", text], check=True)
return
if shutil.which("ydotool"):
subprocess.run(["ydotool", "type", "--", text], check=True)
return
if shutil.which("xdotool"):
subprocess.run(["xdotool", "type", "--clearmodifiers", "--", text], check=True)
return
raise RuntimeError("No typing tool found. Install one of: wtype, ydotool, xdotool.")
def _normalize_text_for_typing(text: str) -> str:
# Prevent simulated Enter key presses from transcript line breaks.
return " ".join(text.split())
def _emit_text(text: str, args: argparse.Namespace, notifier: Notifier) -> int:
if args.output == "print":
print(text)
notifier.send("Done", "Transcription printed to terminal", timeout_ms=1500)
return 0
try:
_type_with_tool(_normalize_text_for_typing(text))
except Exception as exc:
print(f"Failed to simulate typing: {exc}", file=sys.stderr)
notifier.send("Typing error", str(exc), timeout_ms=2500)
return 1
notifier.send("Done", "Transcription typed into active window", timeout_ms=1500)
return 0
def _read_pid(pid_file: Path) -> int | None:
if not pid_file.exists():
return None
try:
return int(pid_file.read_text(encoding="utf-8").strip())
except ValueError:
return None
def _is_alive(pid: int | None) -> bool:
if pid is None:
return False
try:
os.kill(pid, 0)
except ProcessLookupError:
return False
except PermissionError:
return True
return True
def _read_and_clear_error(error_file: Path) -> str | None:
if not error_file.exists():
return None
message = error_file.read_text(encoding="utf-8").strip()
error_file.unlink()
return message or "Worker failed."
def _run_transcription_job(args: argparse.Namespace, duration: float | None) -> str:
notifier = Notifier()
model_name_or_path = args.model
if (
"/" in model_name_or_path
or model_name_or_path.startswith(".")
or model_name_or_path.startswith("~")
):
model_path = Path(model_name_or_path).expanduser()
if not model_path.exists():
raise RuntimeError(f"Model path not found: {model_path}")
model_name_or_path = str(model_path)
notifier.send("Recording", "Starting...", timeout_ms=1200)
recorder = Recorder(samplerate=args.samplerate, channels=args.channels)
try:
audio = recorder.record(
duration=duration,
notifier=notifier,
interval=max(args.notify_interval, 0.2),
)
except KeyboardInterrupt:
if not recorder.frames:
notifier.send("Recording", "Cancelled", timeout_ms=1000)
return ""
audio = np.concatenate(recorder.frames, axis=0)
except Exception as exc:
raise RuntimeError(f"Recording failed: {exc}") from exc
with tempfile.TemporaryDirectory(prefix="whisper-audio-") as tmp_dir:
wav_path = Path(tmp_dir) / "input.wav"
write_wav(wav_path, audio, args.samplerate, args.channels)
notifier.send(
"Transcribing",
f"Running backend: {args.backend}",
timeout_ms=1500,
)
text = transcribe(
backend=args.backend,
model_name_or_path=model_name_or_path,
wav_path=wav_path,
notifier=notifier,
device=args.device,
compute_type=args.compute_type,
beam_size=args.beam_size,
task=args.task,
language=args.language,
whisperx_mode=args.whisperx_mode,
whisperx_vad_method=args.whisperx_vad_method,
whisperx_hf_token=args.whisperx_hf_token,
whisperx_min_speakers=args.whisperx_min_speakers,
whisperx_max_speakers=args.whisperx_max_speakers,
)
return text.strip()
def run_once(args: argparse.Namespace) -> int:
duration = None if args.duration <= 0 else args.duration
try:
text = _run_transcription_job(args, duration=duration)
except Exception as exc:
print(str(exc), file=sys.stderr)
Notifier().send("Transcription error", str(exc), timeout_ms=3000)
return 1
if not text:
print("(No speech detected)")
Notifier().send("Done", "No speech detected", timeout_ms=1500)
return 0
return _emit_text(text, args, Notifier())
def run_worker(args: argparse.Namespace) -> int:
state_dir = Path(args.state_dir)
state_dir.mkdir(parents=True, exist_ok=True)
pid_file = state_dir / "recording.pid"
transcript_file = state_dir / "transcript.txt"
error_file = state_dir / "error.txt"
pid_file.write_text(str(os.getpid()), encoding="utf-8")
if transcript_file.exists():
transcript_file.unlink()
if error_file.exists():
error_file.unlink()
_append_log(
state_dir,
f"worker start model={args.model} device={args.device} compute_type={args.compute_type}",
)
try:
text = _run_transcription_job(args, duration=None)
transcript_file.write_text(text, encoding="utf-8")
_append_log(state_dir, f"worker complete transcript_chars={len(text)}")
except Exception as exc:
details = "".join(
traceback.format_exception(type(exc), exc, exc.__traceback__)
).strip()
error_file.write_text(str(exc), encoding="utf-8")
_append_log(state_dir, f"worker error: {details}")
return 1
finally:
if pid_file.exists():
pid_file.unlink()
return 0
def start_background(args: argparse.Namespace) -> int:
state_dir = Path(args.state_dir)
state_dir.mkdir(parents=True, exist_ok=True)
pid_file = state_dir / "recording.pid"
pid = _read_pid(pid_file)
if _is_alive(pid):
print("Recording is already running.")
return 0
cmd = [
sys.executable,
str(Path(__file__).resolve()),
"--mode",
"once",
"--worker",
"--backend",
args.backend,
"--model",
args.model,
"--samplerate",
str(args.samplerate),
"--channels",
str(args.channels),
"--notify-interval",
str(args.notify_interval),
"--state-dir",
str(state_dir),
"--device",
args.device,
"--compute-type",
args.compute_type,
"--beam-size",
str(args.beam_size),
"--task",
args.task,
"--whisperx-mode",
args.whisperx_mode,
"--whisperx-vad-method",
args.whisperx_vad_method,
]
if args.language:
cmd.extend(["--language", args.language])
if args.whisperx_hf_token:
cmd.extend(["--whisperx-hf-token", args.whisperx_hf_token])
if args.whisperx_min_speakers is not None:
cmd.extend(["--whisperx-min-speakers", str(args.whisperx_min_speakers)])
if args.whisperx_max_speakers is not None:
cmd.extend(["--whisperx-max-speakers", str(args.whisperx_max_speakers)])
log_path = state_dir / "worker.log"
with log_path.open("a", encoding="utf-8") as log_fh:
subprocess.Popen(
cmd,
stdout=log_fh,
stderr=log_fh,
start_new_session=True,
)
_append_log(state_dir, "start requested")
# If worker fails immediately (common with model/device config issues),
# surface that early instead of only showing "No active recording" later.
time.sleep(0.15)
worker_error = _read_and_clear_error(state_dir / "error.txt")
if worker_error:
print(worker_error, file=sys.stderr)
Notifier().send("Transcription error", worker_error, timeout_ms=3000)
return 1
Notifier().send(
"Recording", "Started (press keybind again to stop)", timeout_ms=1200
)
print("Recording started.")
return 0
def stop_background(args: argparse.Namespace) -> int:
state_dir = Path(args.state_dir)
pid_file = state_dir / "recording.pid"
transcript_file = state_dir / "transcript.txt"
error_file = state_dir / "error.txt"
pid = _read_pid(pid_file)
if not _is_alive(pid):
worker_error = _read_and_clear_error(error_file)
if worker_error:
print(worker_error, file=sys.stderr)
Notifier().send("Transcription error", worker_error, timeout_ms=3000)
return 1
if pid_file.exists():
pid_file.unlink()
print(f"No active recording. Check log: {state_dir / 'worker.log'}")
return 1
assert pid is not None
os.kill(pid, signal.SIGINT)
Notifier().send("Recording", "Stopping...", timeout_ms=1200)
deadline = time.monotonic() + max(args.stop_timeout, 1.0)
while _is_alive(pid) and time.monotonic() < deadline:
time.sleep(0.1)
if _is_alive(pid):
print("Timed out waiting for transcription to finish.", file=sys.stderr)
_append_log(state_dir, "stop timeout waiting for worker exit")
return 1
worker_error = _read_and_clear_error(error_file)
if worker_error:
print(worker_error, file=sys.stderr)
Notifier().send("Transcription error", worker_error, timeout_ms=3000)
return 1
text = ""
if transcript_file.exists():
text = transcript_file.read_text(encoding="utf-8").strip()
transcript_file.unlink()
if not text:
print("(No speech detected)")
Notifier().send("Done", "No speech detected", timeout_ms=1500)
return 0
return _emit_text(text, args, Notifier())
def parse_args() -> argparse.Namespace:
parser = argparse.ArgumentParser(
description="Record from microphone and transcribe with whisper.cpp, faster-whisper, or WhisperX"
)
parser.add_argument(
"--mode",
choices=("once", "start", "stop", "toggle"),
default="once",
help="once: record/transcribe immediately, start/stop: background toggle pieces, toggle: start if idle else stop",
)
parser.add_argument("--start", action="store_true", help=argparse.SUPPRESS)
parser.add_argument("--stop", action="store_true", help=argparse.SUPPRESS)
parser.add_argument("--toggle", action="store_true", help=argparse.SUPPRESS)
parser.add_argument(
"--worker",
action="store_true",
help=argparse.SUPPRESS,
)
parser.add_argument(
"--backend",
choices=("whispercpp", "ctranslate2", "whisperx"),
default="whispercpp",
help="Transcription backend (default: whispercpp)",
)
parser.add_argument(
"--model",
default=DEFAULT_MODEL,
help="Model name or path. For whispercpp: ggml .bin path/name. For ctranslate2/whisperx: model name or model directory.",
)
parser.add_argument(
"--task",
choices=("transcribe", "translate"),
default="transcribe",
help="Task to run (default: transcribe).",
)
parser.add_argument(
"--language",
default=None,
help="Source language code/name (for example: en, es, Japanese). Strongly recommended, and required for --task translate.",
)
parser.add_argument(
"--duration",
type=float,
default=DEFAULT_DURATION,
help="Recording length in seconds for --mode once (default: 8). Use 0 for manual stop.",
)
parser.add_argument(
"--samplerate",
type=int,
default=16000,
help="Input sample rate (default: 16000)",
)
parser.add_argument(
"--channels", type=int, default=1, help="Input channels (default: 1)"
)
parser.add_argument(
"--notify-interval",
type=float,
default=1.0,
help="Seconds between notification timer updates (default: 1.0)",
)
parser.add_argument(
"--output",
choices=("print", "type"),
default="print",
help="How to emit transcript text: print to terminal or type into active window",
)
parser.add_argument(
"--device",
default="auto",
help="Inference device for faster-whisper (auto, cpu, cuda)",
)
parser.add_argument(
"--compute-type",
default="auto",
help="faster-whisper compute type (auto, default, float16, int8, int8_float16, ...)",
)
parser.add_argument(
"--beam-size",
type=int,
default=5,
help="Beam size for decoding (default: 5)",
)
parser.add_argument(
"--whisperx-mode",
choices=("basic", "align", "full"),
default="align",
help="WhisperX pipeline mode: basic (no align), align (aligned transcript), full (native transcript + translate/transcribe + align + VAD + diarization).",
)
parser.add_argument(
"--whisperx-vad-method",
choices=("silero", "pyannote"),
default="silero",
help="WhisperX VAD method (default: silero).",
)
parser.add_argument(
"--whisperx-hf-token",
default=None,
help="Optional HuggingFace token for WhisperX diarization models.",
)
parser.add_argument(
"--whisperx-min-speakers",
type=int,
default=None,
help="Optional minimum speaker count for WhisperX diarization.",
)
parser.add_argument(
"--whisperx-max-speakers",
type=int,
default=None,
help="Optional maximum speaker count for WhisperX diarization.",
)
parser.add_argument(
"--state-dir",
default=str(DEFAULT_STATE_DIR),
help="Directory to store toggle state files",
)
parser.add_argument(
"--stop-timeout",
type=float,
default=90.0,
help="Max seconds to wait for background transcription to finish on stop",
)
parser.add_argument(
"--toggle-debounce",
type=float,
default=DEFAULT_TOGGLE_DEBOUNCE,
help="Ignore repeated toggle triggers within this many seconds (default: 0.0, disabled)",
)
args = parser.parse_args()
legacy_modes = [
mode
for flag, mode in (
(args.start, "start"),
(args.stop, "stop"),
(args.toggle, "toggle"),
)
if flag
]
if len(legacy_modes) > 1:
parser.error("Use only one of --start, --stop, or --toggle.")
if legacy_modes:
args.mode = legacy_modes[0]
if args.task == "translate" and not args.language:
parser.error("--task translate requires --language so the source language is explicit.")
return args
def main() -> int:
args = parse_args()
if args.worker:
return run_worker(args)
if args.mode == "once":
return run_once(args)
if args.mode == "start":
return start_background(args)
if args.mode == "stop":
return stop_background(args)
state_dir = Path(args.state_dir)
state_dir.mkdir(parents=True, exist_ok=True)
debounce_file = state_dir / "last-toggle.txt"
now = time.monotonic()
if args.toggle_debounce > 0 and debounce_file.exists():
try:
last = float(debounce_file.read_text(encoding="utf-8").strip())
except ValueError:
last = 0.0
if now - last < args.toggle_debounce:
_append_log(
state_dir,
f"toggle ignored by debounce: delta={now - last:.3f}s < {args.toggle_debounce:.3f}s",
)
return 0
debounce_file.write_text(f"{now:.6f}", encoding="utf-8")
pid = _read_pid(state_dir / "recording.pid")
if _is_alive(pid):
return stop_background(args)
return start_background(args)
if __name__ == "__main__":
try:
raise SystemExit(main())
except Exception as exc:
state_dir = DEFAULT_STATE_DIR
try:
argv = sys.argv[1:]
if "--state-dir" in argv:
idx = argv.index("--state-dir")
if idx + 1 < len(argv):
state_dir = Path(argv[idx + 1]).expanduser()
except Exception:
pass
_append_log(
state_dir,
"fatal exception: "
+ "".join(traceback.format_exception(type(exc), exc, exc.__traceback__)).strip(),
)
raise