This commit is contained in:
2026-02-09 22:30:57 -08:00
parent 4e76d0db9a
commit d23a385861
11 changed files with 360 additions and 1451 deletions

View File

@@ -1,15 +1,15 @@
#!/usr/bin/env bash
THEME="${THEME:-/opt/mpv-yomitan/catppuccin-macchiato.rasi}"
THEME="${THEME:-$HOME/.local/share/SubMiner/themes/subminer.rasi}"
FONTCONFIG_FILE=$HOME/.config/mpv/mpv-fonts.conf
COMMAND=mpv
VIDEO_EXTENSIONS="mkv|mp4|avi|webm|mov|flv|wmv|m4v|ts|m2ts"
# Parse command-line options first
while getopts ":it:" opt; do
while getopts ":st:" opt; do
case $opt in
i)
COMMAND="$COMMAND --profile=immersion"
s)
COMMAND="$COMMAND --profile=subminer"
;;
t)
THEME="$OPTARG"

View File

@@ -1,28 +1,37 @@
#!/usr/bin/env python3
"""Record microphone audio and transcribe it with whisper.cpp."""
"""Record microphone audio and transcribe it with whisper.cpp or faster-whisper."""
from __future__ import annotations
import argparse
import os
import re
import shutil
import signal
import subprocess
import sys
import tempfile
import threading
import time
import traceback
import wave
from pathlib import Path
import numpy as np
import sounddevice as sd
DEFAULT_MODEL = "~/models/whisper.cpp/ggml-small.bin"
DEFAULT_MODEL = "small"
DEFAULT_DURATION = 8.0
DEFAULT_STATE_DIR = Path.home() / ".cache" / "whisper-record-toggle"
DEFAULT_WHISPERCPP_MODEL_DIR = Path.home() / "models" / "whisper.cpp"
APP_NAME = "Whisper Record"
DEFAULT_TOGGLE_DEBOUNCE = 0.0
def _append_log(state_dir: Path, message: str) -> None:
state_dir.mkdir(parents=True, exist_ok=True)
log_file = state_dir / "worker.log"
timestamp = time.strftime("%Y-%m-%d %H:%M:%S")
with log_file.open("a", encoding="utf-8") as fh:
fh.write(f"[{timestamp}] {message}\n")
class Notifier:
@@ -120,20 +129,6 @@ def _format_seconds(value: float) -> str:
return f"{minutes:02d}:{seconds:02d}"
def find_whisper_binary(explicit: str | None) -> str:
if explicit:
return explicit
for candidate in ("whisper-cli", "main", "whisper"):
path = shutil.which(candidate)
if path:
return path
raise RuntimeError(
"Could not find whisper.cpp binary. Pass --whisper-bin /path/to/whisper-cli"
)
def write_wav(path: Path, audio: np.ndarray, samplerate: int, channels: int) -> None:
with wave.open(str(path), "wb") as wav_file:
wav_file.setnchannels(channels)
@@ -142,72 +137,164 @@ def write_wav(path: Path, audio: np.ndarray, samplerate: int, channels: int) ->
wav_file.writeframes(audio.tobytes())
def transcribe(whisper_bin: str, model: str, wav_path: Path, notifier: Notifier) -> str:
with tempfile.TemporaryDirectory(prefix="whisper-out-") as out_dir:
out_base = Path(out_dir) / "transcript"
cmd = [
whisper_bin,
"-m",
model,
"-f",
str(wav_path),
"-otxt",
"-of",
str(out_base),
"-nt",
]
process = subprocess.Popen(
cmd,
stdout=subprocess.PIPE,
stderr=subprocess.STDOUT,
text=True,
bufsize=1,
def transcribe(
backend: str,
model_name_or_path: str,
wav_path: Path,
notifier: Notifier,
device: str,
compute_type: str,
beam_size: int,
) -> str:
if backend == "whispercpp":
return transcribe_whispercpp(
model_name_or_path=model_name_or_path,
wav_path=wav_path,
notifier=notifier,
device=device,
beam_size=beam_size,
)
output_lines: list[str] = []
progress: dict[str, int | None] = {"pct": None}
if backend == "ctranslate2":
return transcribe_ctranslate2(
model_name_or_path=model_name_or_path,
wav_path=wav_path,
notifier=notifier,
device=device,
compute_type=compute_type,
beam_size=beam_size,
)
raise RuntimeError(f"Unsupported backend: {backend}")
def _reader() -> None:
assert process.stdout is not None
for line in process.stdout:
output_lines.append(line)
match = re.search(r"(?<!\d)(\d{1,3})%", line)
if match:
progress["pct"] = min(100, int(match.group(1)))
reader = threading.Thread(target=_reader, daemon=True)
reader.start()
def _resolve_whispercpp_model(model_name_or_path: str) -> Path:
candidate = Path(model_name_or_path).expanduser()
if candidate.exists():
return candidate
spinner = "|/-\\"
frame = 0
while process.poll() is None:
pct = progress["pct"]
status = (
f"Transcribing... {pct}%"
if pct is not None
else f"Transcribing... {spinner[frame % len(spinner)]}"
)
notifier.send("Transcribing", status, timeout_ms=1200)
print(f"\r{status}", end="", file=sys.stderr, flush=True)
frame += 1
time.sleep(0.35)
name = model_name_or_path.strip()
search_paths = [
DEFAULT_WHISPERCPP_MODEL_DIR / name,
DEFAULT_WHISPERCPP_MODEL_DIR / f"ggml-{name}.bin",
DEFAULT_WHISPERCPP_MODEL_DIR / f"ggml-{name}.en.bin",
]
for path in search_paths:
if path.exists():
return path
reader.join(timeout=1.0)
print("\r" + (" " * 48) + "\r", end="", file=sys.stderr, flush=True)
result_stdout = "".join(output_lines).strip()
raise RuntimeError(
"whisper.cpp model not found. Pass --model as a .bin path or place model at "
f"{DEFAULT_WHISPERCPP_MODEL_DIR}/ggml-<name>.bin (for example ggml-small.bin)."
)
if process.returncode != 0:
stderr = result_stdout
raise RuntimeError(f"whisper.cpp failed: {stderr}")
txt_file = out_base.with_suffix(".txt")
if txt_file.exists():
return txt_file.read_text(encoding="utf-8").strip()
def transcribe_whispercpp(
model_name_or_path: str,
wav_path: Path,
notifier: Notifier,
device: str,
beam_size: int,
) -> str:
whisper_cli = shutil.which("whisper-cli")
if not whisper_cli:
raise RuntimeError("whisper-cli not found in PATH. Install whisper.cpp.")
fallback = result_stdout
if fallback:
return fallback
model_path = _resolve_whispercpp_model(model_name_or_path)
output_prefix = wav_path.parent / wav_path.stem
output_txt = Path(f"{output_prefix}.txt")
if output_txt.exists():
output_txt.unlink()
raise RuntimeError("Transcription finished but no output text was produced.")
notifier.send("Transcribing", "Running whisper.cpp...", timeout_ms=1500)
cmd = [
whisper_cli,
"-f",
str(wav_path),
"-m",
str(model_path),
"-otxt",
"-of",
str(output_prefix),
"-bs",
str(beam_size),
"-np",
]
if device == "cpu":
cmd.append("-ng")
result = subprocess.run(cmd, capture_output=True, text=True)
if result.returncode != 0:
details = (result.stderr or result.stdout or "").strip()
raise RuntimeError(details or "whisper.cpp failed.")
if not output_txt.exists():
details = (result.stderr or result.stdout or "").strip()
raise RuntimeError(
"whisper.cpp completed but no transcript file was produced. "
f"Expected: {output_txt}. {details}"
)
return output_txt.read_text(encoding="utf-8").strip()
def transcribe_ctranslate2(
model_name_or_path: str,
wav_path: Path,
notifier: Notifier,
device: str,
compute_type: str,
beam_size: int,
) -> str:
whisper_cli = shutil.which("whisper-ctranslate2")
if not whisper_cli:
raise RuntimeError(
"whisper-ctranslate2 not found in PATH. Install with: pip install faster-whisper"
)
if model_name_or_path.endswith(".bin"):
raise RuntimeError(
"faster-whisper/ctranslate2 does not use ggml .bin models. "
"Use a model name like 'small' or a CTranslate2 model directory."
)
notifier.send("Transcribing", "Running whisper-ctranslate2...", timeout_ms=1500)
output_dir = wav_path.parent
output_txt = output_dir / f"{wav_path.stem}.txt"
if output_txt.exists():
output_txt.unlink()
cmd = [
whisper_cli,
str(wav_path),
"--output_dir",
str(output_dir),
"--output_format",
"txt",
"--device",
device,
"--compute_type",
compute_type,
"--beam_size",
str(beam_size),
"--verbose",
"False",
]
model_dir_candidate = Path(model_name_or_path).expanduser()
if model_dir_candidate.exists() and model_dir_candidate.is_dir():
cmd.extend(["--model_directory", str(model_dir_candidate)])
elif "/" in model_name_or_path or model_name_or_path.startswith("."):
cmd.extend(["--model_directory", model_name_or_path])
else:
cmd.extend(["--model", model_name_or_path])
result = subprocess.run(cmd, capture_output=True, text=True)
if result.returncode != 0:
details = (result.stderr or result.stdout or "").strip()
raise RuntimeError(details or "whisper-ctranslate2 failed.")
if not output_txt.exists():
details = (result.stderr or result.stdout or "").strip()
raise RuntimeError(
"whisper-ctranslate2 completed but no transcript file was produced. "
f"Expected: {output_txt}. {details}"
)
return output_txt.read_text(encoding="utf-8").strip()
def _type_with_tool(text: str) -> None:
@@ -261,13 +348,26 @@ def _is_alive(pid: int | None) -> bool:
return True
def _read_and_clear_error(error_file: Path) -> str | None:
if not error_file.exists():
return None
message = error_file.read_text(encoding="utf-8").strip()
error_file.unlink()
return message or "Worker failed."
def _run_transcription_job(args: argparse.Namespace, duration: float | None) -> str:
notifier = Notifier()
model_path = Path(args.model).expanduser()
if not model_path.exists():
raise RuntimeError(f"Model file not found: {model_path}")
whisper_bin = find_whisper_binary(args.whisper_bin)
model_name_or_path = args.model
if (
"/" in model_name_or_path
or model_name_or_path.startswith(".")
or model_name_or_path.startswith("~")
):
model_path = Path(model_name_or_path).expanduser()
if not model_path.exists():
raise RuntimeError(f"Model path not found: {model_path}")
model_name_or_path = str(model_path)
notifier.send("Recording", "Starting...", timeout_ms=1200)
recorder = Recorder(samplerate=args.samplerate, channels=args.channels)
@@ -289,8 +389,20 @@ def _run_transcription_job(args: argparse.Namespace, duration: float | None) ->
with tempfile.TemporaryDirectory(prefix="whisper-audio-") as tmp_dir:
wav_path = Path(tmp_dir) / "input.wav"
write_wav(wav_path, audio, args.samplerate, args.channels)
notifier.send("Transcribing", "Running whisper.cpp...", timeout_ms=1500)
text = transcribe(whisper_bin, str(model_path), wav_path, notifier)
notifier.send(
"Transcribing",
f"Running backend: {args.backend}",
timeout_ms=1500,
)
text = transcribe(
backend=args.backend,
model_name_or_path=model_name_or_path,
wav_path=wav_path,
notifier=notifier,
device=args.device,
compute_type=args.compute_type,
beam_size=args.beam_size,
)
return text.strip()
@@ -324,12 +436,21 @@ def run_worker(args: argparse.Namespace) -> int:
transcript_file.unlink()
if error_file.exists():
error_file.unlink()
_append_log(
state_dir,
f"worker start model={args.model} device={args.device} compute_type={args.compute_type}",
)
try:
text = _run_transcription_job(args, duration=None)
transcript_file.write_text(text, encoding="utf-8")
_append_log(state_dir, f"worker complete transcript_chars={len(text)}")
except Exception as exc:
details = "".join(
traceback.format_exception(type(exc), exc, exc.__traceback__)
).strip()
error_file.write_text(str(exc), encoding="utf-8")
_append_log(state_dir, f"worker error: {details}")
return 1
finally:
if pid_file.exists():
@@ -354,6 +475,8 @@ def start_background(args: argparse.Namespace) -> int:
"--mode",
"once",
"--worker",
"--backend",
args.backend,
"--model",
args.model,
"--samplerate",
@@ -364,16 +487,32 @@ def start_background(args: argparse.Namespace) -> int:
str(args.notify_interval),
"--state-dir",
str(state_dir),
"--device",
args.device,
"--compute-type",
args.compute_type,
"--beam-size",
str(args.beam_size),
]
if args.whisper_bin:
cmd.extend(["--whisper-bin", args.whisper_bin])
subprocess.Popen(
cmd,
stdout=subprocess.DEVNULL,
stderr=subprocess.DEVNULL,
start_new_session=True,
)
log_path = state_dir / "worker.log"
with log_path.open("a", encoding="utf-8") as log_fh:
subprocess.Popen(
cmd,
stdout=log_fh,
stderr=log_fh,
start_new_session=True,
)
_append_log(state_dir, "start requested")
# If worker fails immediately (common with model/device config issues),
# surface that early instead of only showing "No active recording" later.
time.sleep(0.15)
worker_error = _read_and_clear_error(state_dir / "error.txt")
if worker_error:
print(worker_error, file=sys.stderr)
Notifier().send("Transcription error", worker_error, timeout_ms=3000)
return 1
Notifier().send(
"Recording", "Started (press keybind again to stop)", timeout_ms=1200
)
@@ -389,9 +528,14 @@ def stop_background(args: argparse.Namespace) -> int:
pid = _read_pid(pid_file)
if not _is_alive(pid):
worker_error = _read_and_clear_error(error_file)
if worker_error:
print(worker_error, file=sys.stderr)
Notifier().send("Transcription error", worker_error, timeout_ms=3000)
return 1
if pid_file.exists():
pid_file.unlink()
print("No active recording.")
print(f"No active recording. Check log: {state_dir / 'worker.log'}")
return 1
assert pid is not None
@@ -404,12 +548,13 @@ def stop_background(args: argparse.Namespace) -> int:
if _is_alive(pid):
print("Timed out waiting for transcription to finish.", file=sys.stderr)
_append_log(state_dir, "stop timeout waiting for worker exit")
return 1
if error_file.exists():
message = error_file.read_text(encoding="utf-8").strip()
error_file.unlink()
print(message or "Worker failed.", file=sys.stderr)
worker_error = _read_and_clear_error(error_file)
if worker_error:
print(worker_error, file=sys.stderr)
Notifier().send("Transcription error", worker_error, timeout_ms=3000)
return 1
text = ""
@@ -427,7 +572,7 @@ def stop_background(args: argparse.Namespace) -> int:
def parse_args() -> argparse.Namespace:
parser = argparse.ArgumentParser(
description="Record from microphone and transcribe with whisper.cpp"
description="Record from microphone and transcribe with whisper.cpp or faster-whisper"
)
parser.add_argument(
"--mode",
@@ -435,18 +580,24 @@ def parse_args() -> argparse.Namespace:
default="once",
help="once: record/transcribe immediately, start/stop: background toggle pieces, toggle: start if idle else stop",
)
parser.add_argument("--start", action="store_true", help=argparse.SUPPRESS)
parser.add_argument("--stop", action="store_true", help=argparse.SUPPRESS)
parser.add_argument("--toggle", action="store_true", help=argparse.SUPPRESS)
parser.add_argument(
"--worker",
action="store_true",
help=argparse.SUPPRESS,
)
parser.add_argument(
"--model", default=DEFAULT_MODEL, help="Path to whisper.cpp model"
"--backend",
choices=("whispercpp", "ctranslate2"),
default="whispercpp",
help="Transcription backend (default: whispercpp)",
)
parser.add_argument(
"--whisper-bin",
default=None,
help="Path to whisper.cpp binary (default: auto-detect whisper-cli/main)",
"--model",
default=DEFAULT_MODEL,
help="Model name or path. For whispercpp: ggml .bin path/name. For ctranslate2: model name or model directory.",
)
parser.add_argument(
"--duration",
@@ -475,6 +626,22 @@ def parse_args() -> argparse.Namespace:
default="print",
help="How to emit transcript text: print to terminal or type into active window",
)
parser.add_argument(
"--device",
default="auto",
help="Inference device for faster-whisper (auto, cpu, cuda)",
)
parser.add_argument(
"--compute-type",
default="auto",
help="faster-whisper compute type (auto, default, float16, int8, int8_float16, ...)",
)
parser.add_argument(
"--beam-size",
type=int,
default=5,
help="Beam size for decoding (default: 5)",
)
parser.add_argument(
"--state-dir",
default=str(DEFAULT_STATE_DIR),
@@ -486,7 +653,27 @@ def parse_args() -> argparse.Namespace:
default=90.0,
help="Max seconds to wait for background transcription to finish on stop",
)
return parser.parse_args()
parser.add_argument(
"--toggle-debounce",
type=float,
default=DEFAULT_TOGGLE_DEBOUNCE,
help="Ignore repeated toggle triggers within this many seconds (default: 0.0, disabled)",
)
args = parser.parse_args()
legacy_modes = [
mode
for flag, mode in (
(args.start, "start"),
(args.stop, "stop"),
(args.toggle, "toggle"),
)
if flag
]
if len(legacy_modes) > 1:
parser.error("Use only one of --start, --stop, or --toggle.")
if legacy_modes:
args.mode = legacy_modes[0]
return args
def main() -> int:
@@ -505,6 +692,22 @@ def main() -> int:
return stop_background(args)
state_dir = Path(args.state_dir)
state_dir.mkdir(parents=True, exist_ok=True)
debounce_file = state_dir / "last-toggle.txt"
now = time.monotonic()
if args.toggle_debounce > 0 and debounce_file.exists():
try:
last = float(debounce_file.read_text(encoding="utf-8").strip())
except ValueError:
last = 0.0
if now - last < args.toggle_debounce:
_append_log(
state_dir,
f"toggle ignored by debounce: delta={now - last:.3f}s < {args.toggle_debounce:.3f}s",
)
return 0
debounce_file.write_text(f"{now:.6f}", encoding="utf-8")
pid = _read_pid(state_dir / "recording.pid")
if _is_alive(pid):
return stop_background(args)
@@ -512,4 +715,21 @@ def main() -> int:
if __name__ == "__main__":
raise SystemExit(main())
try:
raise SystemExit(main())
except Exception as exc:
state_dir = DEFAULT_STATE_DIR
try:
argv = sys.argv[1:]
if "--state-dir" in argv:
idx = argv.index("--state-dir")
if idx + 1 < len(argv):
state_dir = Path(argv[idx + 1]).expanduser()
except Exception:
pass
_append_log(
state_dir,
"fatal exception: "
+ "".join(traceback.format_exception(type(exc), exc, exc.__traceback__)).strip(),
)
raise