Compare commits

...

9 Commits

Author SHA1 Message Date
4e76d0db9a update 2026-02-08 18:56:20 -08:00
2e1716c32f update 2026-02-08 18:51:49 -08:00
0778ea116a update 2026-02-08 18:51:10 -08:00
056406456b update 2026-02-08 18:50:52 -08:00
7f07e2dd73 update 2026-02-08 18:48:04 -08:00
9ae8dfd974 Merge branch 'master' of github.com:ksyasuda/dotfiles 2026-02-08 18:46:30 -08:00
ed1014edcc mpv: point ytdl-preload submodule to gist fork 2026-02-08 18:44:20 -08:00
4f9f513ed0 Merge branch 'master' of github.com:ksyasuda/dotfiles 2026-02-08 15:54:01 -08:00
640efd074b add transcribe script 2026-02-08 15:53:54 -08:00
10 changed files with 1743 additions and 98 deletions

View File

@@ -152,3 +152,6 @@ bind = $mainMod, a, exec, ~/.config/rofi/scripts/rofi-anki-script.sh
bindl = , mouse:275, exec, xdotool key alt+w # top mouse to texthooker
bindl = , mouse:276, exec, xdotool key alt+grave # bottom mouse to overlay
bind = ALT, g, exec, /opt/mpv-yomitan/mpv-yomitan.AppImage --toggle
# F5
bind = ,code:71, exec, ~/projects/scripts/whisper_record_transcribe.py --mode toggle --output type

View File

@@ -1 +0,0 @@
temp=/tmp/ytdl-preload

View File

@@ -1 +0,0 @@
../submodules/ytdl-preload/ytdl-preload.lua

View File

@@ -0,0 +1,405 @@
----------------------
-- #example ytdl_preload.conf
-- # make sure lines do not have trailing whitespace
-- # ytdl_opt has no sanity check and should be formatted exactly how it would appear in yt-dlp CLI, they are split into a key/value pair on whitespace
-- # at least on Windows, do not escape '\' in temp, just us a single one for each divider
-- #temp=R:\ytdltest
-- #ytdl_opt1=-r 50k
-- #ytdl_opt2=-N 5
-- #ytdl_opt#=etc
----------------------
local nextIndex
local caught = true
-- local pop = false
local ytdl = "yt-dlp"
local utils = require 'mp.utils'
local options = require 'mp.options'
local opts = {
temp = "/tmp/ytdl-preload",
ytdl_opt1 = "",
ytdl_opt2 = "",
ytdl_opt3 = "",
ytdl_opt4 = "",
ytdl_opt5 = "",
ytdl_opt6 = "",
ytdl_opt7 = "",
ytdl_opt8 = "",
ytdl_opt9 = "",
}
options.read_options(opts, "ytdl_preload")
local additionalOpts = {}
for k, v in pairs(opts) do
if k:find("ytdl_opt%d") and v ~= "" then
additionalOpts[k] = v
-- print("entry")
-- print(k .. v)
end
end
local cachePath = opts.temp
local chapter_list = {}
local json = ""
local filesToDelete = {}
local function exists(file)
local ok, err, code = os.rename(file, file)
if not ok then
if code == 13 then -- Permission denied, but it exists
return true
end
end
return ok, err
end
local function useNewLoadfile()
for _, c in pairs(mp.get_property_native("command-list")) do
if c["name"] == "loadfile" then
for _, a in pairs(c["args"]) do
if a["name"] == "index" then
return true
end
end
end
end
end
--from ytdl_hook
local function time_to_secs(time_string)
local ret
local a, b, c = time_string:match("(%d+):(%d%d?):(%d%d)")
if a ~= nil then
ret = (a * 3600 + b * 60 + c)
else
a, b = time_string:match("(%d%d?):(%d%d)")
if a ~= nil then
ret = (a * 60 + b)
end
end
return ret
end
local function extract_chapters(data, video_length)
local ret = {}
for line in data:gmatch("[^\r\n]+") do
local time = time_to_secs(line)
if time and (time < video_length) then
table.insert(ret, { time = time, title = line })
end
end
table.sort(ret, function(a, b) return a.time < b.time end)
return ret
end
local function chapters()
if json.chapters then
for i = 1, #json.chapters do
local chapter = json.chapters[i]
local title = chapter.title or ""
if title == "" then
title = string.format('Chapter %02d', i)
end
table.insert(chapter_list, { time = chapter.start_time, title = title })
end
elseif not (json.description == nil) and not (json.duration == nil) then
chapter_list = extract_chapters(json.description, json.duration)
end
end
--end ytdl_hook
local title = ""
local fVideo = ""
local fAudio = ""
local function load_files(dtitle, destination, audio, wait)
if wait then
if exists(destination .. ".mka") then
print("---wait success: found mka---")
audio = "audio-file=" .. destination .. '.mka,'
else
print("---could not find mka after wait, audio may be missing---")
end
end
-- if audio ~= "" then
-- table.insert(filesToDelete, destination .. ".mka")
-- end
-- table.insert(filesToDelete, destination .. ".mkv")
dtitle = dtitle:gsub("-" .. ("[%w_-]"):rep(11) .. "$", "")
dtitle = dtitle:gsub("^" .. ("%d"):rep(10) .. "%-", "")
if useNewLoadfile() then
mp.commandv("loadfile", destination .. ".mkv", "append", -1,
audio .. 'force-media-title="' .. dtitle .. '",demuxer-max-back-bytes=1MiB,demuxer-max-bytes=3MiB,ytdl=no')
else
mp.commandv("loadfile", destination .. ".mkv", "append",
audio .. 'force-media-title="' .. dtitle .. '",demuxer-max-back-bytes=1MiB,demuxer-max-bytes=3MiB,ytdl=no') --,sub-file="..destination..".en.vtt") --in case they are not set up to autoload
end
mp.commandv("playlist_move", mp.get_property("playlist-count") - 1, nextIndex)
mp.commandv("playlist_remove", nextIndex + 1)
caught = true
title = ""
-- pop = true
end
local listenID = ""
local function listener(event)
if not caught and event.prefix == mp.get_script_name() and string.find(event.text, listenID) then
local destination = string.match(event.text, "%[download%] Destination: (.+).mkv") or
string.match(event.text, "%[download%] (.+).mkv has already been downloaded")
-- if destination then print("---"..cachePath) end;
if destination and string.find(destination, string.gsub(cachePath, '~/', '')) then
-- print(listenID)
mp.unregister_event(listener)
_, title = utils.split_path(destination)
local audio = ""
if fAudio == "" then
load_files(title, destination, audio, false)
else
if exists(destination .. ".mka") then
audio = "audio-file=" .. destination .. '.mka,'
load_files(title, destination, audio, false)
else
print("---expected mka but could not find it, waiting for 2 seconds---")
mp.add_timeout(2, function()
load_files(title, destination, audio, true)
end)
end
end
end
end
end
--from ytdl_hook
mp.add_hook("on_preloaded", 10, function()
if string.find(mp.get_property("path"), cachePath) then
chapters()
if next(chapter_list) ~= nil then
mp.set_property_native("chapter-list", chapter_list)
chapter_list = {}
json = ""
end
end
end)
--end ytdl_hook
function dump(o)
if type(o) == 'table' then
local s = '{ '
for k, v in pairs(o) do
if type(k) ~= 'number' then k = '"' .. k .. '"' end
s = s .. '[' .. k .. '] = ' .. dump(v) .. ','
end
return s .. '} '
else
return tostring(o)
end
end
local function addOPTS(old)
for k, v in pairs(additionalOpts) do
-- print(k)
if string.find(v, "%s") then
for l, w in string.gmatch(v, "([-%w]+) (.+)") do
table.insert(old, l)
table.insert(old, w)
end
else
table.insert(old, v)
end
end
-- print(dump(old))
return old
end
local AudioDownloadHandle = {}
local VideoDownloadHandle = {}
local JsonDownloadHandle = {}
local function download_files(id, success, result, error)
if result.killed_by_us then
return
end
local jfile = cachePath .. "/" .. id .. ".json"
local jfileIO = io.open(jfile, "w")
jfileIO:write(result.stdout)
jfileIO:close()
json = utils.parse_json(result.stdout)
-- print(dump(json))
if json.requested_downloads[1].requested_formats ~= nil then
local args = { ytdl, "--no-continue", "-q", "-f", fAudio, "--restrict-filenames", "--no-playlist", "--no-part",
"-o", cachePath .. "/" .. id .. "-%(title)s-%(id)s.mka", "--load-info-json", jfile }
args = addOPTS(args)
AudioDownloadHandle = mp.command_native_async({
name = "subprocess",
args = args,
playback_only = false
}, function()
end)
else
fAudio = ""
fVideo = fVideo:gsub("bestvideo", "best")
fVideo = fVideo:gsub("bv", "best")
end
local args = { ytdl, "--no-continue", "-f", fVideo .. '/best', "--restrict-filenames", "--no-playlist",
"--no-part", "-o", cachePath .. "/" .. id .. "-%(title)s-%(id)s.mkv", "--load-info-json", jfile }
args = addOPTS(args)
VideoDownloadHandle = mp.command_native_async({
name = "subprocess",
args = args,
playback_only = false
}, function()
end)
end
local function DL()
local index = tonumber(mp.get_property("playlist-pos"))
if mp.get_property("playlist/" .. index .. "/filename"):find("/videos$") and mp.get_property("playlist/" .. index + 1 .. "/filename"):find("/shorts$") then
return
end
if tonumber(mp.get_property("playlist-pos-1")) > 0 and mp.get_property("playlist-pos-1") ~= mp.get_property("playlist-count") then
nextIndex = index + 1
local nextFile = mp.get_property("playlist/" .. nextIndex .. "/filename")
if nextFile and caught and nextFile:find("://", 0, false) then
caught = false
mp.enable_messages("info")
mp.register_event("log-message", listener)
local ytFormat = mp.get_property("ytdl-format")
fVideo = string.match(ytFormat, '(.+)%+.+//?') or 'bestvideo'
fAudio = string.match(ytFormat, '.+%+(.+)//?') or 'bestaudio'
-- print("start"..nextFile)
listenID = tostring(os.time())
local args = { ytdl, "--dump-single-json", "--no-simulate", "--skip-download",
"--restrict-filenames",
"--no-playlist", "--sub-lang", "en", "--write-sub", "--no-part", "-o",
cachePath .. "/" .. listenID .. "-%(title)s-%(id)s.%(ext)s", nextFile }
args = addOPTS(args)
-- print(dump(args))
table.insert(filesToDelete, listenID)
JsonDownloadHandle = mp.command_native_async({
name = "subprocess",
args = args,
capture_stdout = true,
capture_stderr = true,
playback_only = false
}, function(...)
download_files(listenID, ...)
end)
end
end
end
local function clearCache()
-- print(pop)
--if pop == true then
mp.abort_async_command(AudioDownloadHandle)
mp.abort_async_command(VideoDownloadHandle)
mp.abort_async_command(JsonDownloadHandle)
-- for k, v in pairs(filesToDelete) do
-- print("remove: " .. v)
-- os.remove(v)
-- end
local ftd = io.open(cachePath .. "/temp.files", "a")
for k, v in pairs(filesToDelete) do
ftd:write(v .. "\n")
if package.config:sub(1, 1) ~= '/' then
os.execute('del /Q /F "' .. cachePath .. "\\" .. v .. '*"')
else
os.execute('rm -f ' .. cachePath .. "/" .. v .. "*")
end
end
ftd:close()
print('clear')
mp.command("quit")
--end
end
mp.add_hook("on_unload", 50, function()
-- mp.abort_async_command(AudioDownloadHandle)
-- mp.abort_async_command(VideoDownloadHandle)
mp.abort_async_command(JsonDownloadHandle)
mp.unregister_event(listener)
caught = true
listenID = "resetYtdlPreloadListener"
-- print(listenID)
end)
local skipInitial
mp.observe_property("playlist-count", "number", function()
if skipInitial then
DL()
else
skipInitial = true
end
end)
--from ytdl_hook
local platform_is_windows = (package.config:sub(1, 1) == "\\")
local o = {
exclude = "",
try_ytdl_first = false,
use_manifests = false,
all_formats = false,
force_all_formats = true,
ytdl_path = "",
}
local paths_to_search = { "yt-dlp", "yt-dlp_x86", "youtube-dl" }
--local options = require 'mp.options'
options.read_options(o, "ytdl_hook")
local separator = platform_is_windows and ";" or ":"
if o.ytdl_path:match("[^" .. separator .. "]") then
paths_to_search = {}
for path in o.ytdl_path:gmatch("[^" .. separator .. "]+") do
table.insert(paths_to_search, path)
end
end
local function exec(args)
local ret = mp.command_native({
name = "subprocess",
args = args,
capture_stdout = true,
capture_stderr = true
})
return ret.status, ret.stdout, ret, ret.killed_by_us
end
local msg = require 'mp.msg'
local command = {}
for _, path in pairs(paths_to_search) do
-- search for youtube-dl in mpv's config dir
local exesuf = platform_is_windows and ".exe" or ""
local ytdl_cmd = mp.find_config_file(path .. exesuf)
if ytdl_cmd then
msg.verbose("Found youtube-dl at: " .. ytdl_cmd)
ytdl = ytdl_cmd
break
else
msg.verbose("No youtube-dl found with path " .. path .. exesuf .. " in config directories")
--search in PATH
command[1] = path
es, json, result, aborted = exec(command)
if result.error_string == "init" then
msg.verbose("youtube-dl with path " .. path .. exesuf .. " not found in PATH or not enough permissions")
else
msg.verbose("Found youtube-dl with path " .. path .. exesuf .. " in PATH")
ytdl = path
break
end
end
end
--end ytdl_hook
mp.register_event("start-file", DL)
mp.register_event("shutdown", clearCache)
local ftd = io.open(cachePath .. "/temp.files", "r")
while ftd ~= nil do
local line = ftd:read()
if line == nil or line == "" then
ftd:close()
io.open(cachePath .. "/temp.files", "w"):close()
break
end
-- print("DEL::"..line)
if package.config:sub(1, 1) ~= '/' then
os.execute('del /Q /F "' .. cachePath .. "\\" .. line .. '*" >nul 2>nul')
else
os.execute('rm -f ' .. cachePath .. "/" .. line .. "* &> /dev/null")
end
end

View File

@@ -0,0 +1,405 @@
----------------------
-- #example ytdl_preload.conf
-- # make sure lines do not have trailing whitespace
-- # ytdl_opt has no sanity check and should be formatted exactly how it would appear in yt-dlp CLI, they are split into a key/value pair on whitespace
-- # at least on Windows, do not escape '\' in temp, just us a single one for each divider
-- #temp=R:\ytdltest
-- #ytdl_opt1=-r 50k
-- #ytdl_opt2=-N 5
-- #ytdl_opt#=etc
----------------------
local nextIndex
local caught = true
-- local pop = false
local ytdl = "yt-dlp"
local utils = require 'mp.utils'
local options = require 'mp.options'
local opts = {
temp = "/tmp/ytdl-preload",
ytdl_opt1 = "",
ytdl_opt2 = "",
ytdl_opt3 = "",
ytdl_opt4 = "",
ytdl_opt5 = "",
ytdl_opt6 = "",
ytdl_opt7 = "",
ytdl_opt8 = "",
ytdl_opt9 = "",
}
options.read_options(opts, "ytdl_preload")
local additionalOpts = {}
for k, v in pairs(opts) do
if k:find("ytdl_opt%d") and v ~= "" then
additionalOpts[k] = v
-- print("entry")
-- print(k .. v)
end
end
local cachePath = opts.temp
local chapter_list = {}
local json = ""
local filesToDelete = {}
local function exists(file)
local ok, err, code = os.rename(file, file)
if not ok then
if code == 13 then -- Permission denied, but it exists
return true
end
end
return ok, err
end
local function useNewLoadfile()
for _, c in pairs(mp.get_property_native("command-list")) do
if c["name"] == "loadfile" then
for _, a in pairs(c["args"]) do
if a["name"] == "index" then
return true
end
end
end
end
end
--from ytdl_hook
local function time_to_secs(time_string)
local ret
local a, b, c = time_string:match("(%d+):(%d%d?):(%d%d)")
if a ~= nil then
ret = (a * 3600 + b * 60 + c)
else
a, b = time_string:match("(%d%d?):(%d%d)")
if a ~= nil then
ret = (a * 60 + b)
end
end
return ret
end
local function extract_chapters(data, video_length)
local ret = {}
for line in data:gmatch("[^\r\n]+") do
local time = time_to_secs(line)
if time and (time < video_length) then
table.insert(ret, { time = time, title = line })
end
end
table.sort(ret, function(a, b) return a.time < b.time end)
return ret
end
local function chapters()
if json.chapters then
for i = 1, #json.chapters do
local chapter = json.chapters[i]
local title = chapter.title or ""
if title == "" then
title = string.format('Chapter %02d', i)
end
table.insert(chapter_list, { time = chapter.start_time, title = title })
end
elseif not (json.description == nil) and not (json.duration == nil) then
chapter_list = extract_chapters(json.description, json.duration)
end
end
--end ytdl_hook
local title = ""
local fVideo = ""
local fAudio = ""
local function load_files(dtitle, destination, audio, wait)
if wait then
if exists(destination .. ".mka") then
print("---wait success: found mka---")
audio = "audio-file=" .. destination .. '.mka,'
else
print("---could not find mka after wait, audio may be missing---")
end
end
-- if audio ~= "" then
-- table.insert(filesToDelete, destination .. ".mka")
-- end
-- table.insert(filesToDelete, destination .. ".mkv")
dtitle = dtitle:gsub("-" .. ("[%w_-]"):rep(11) .. "$", "")
dtitle = dtitle:gsub("^" .. ("%d"):rep(10) .. "%-", "")
if useNewLoadfile() then
mp.commandv("loadfile", destination .. ".mkv", "append", -1,
audio .. 'force-media-title="' .. dtitle .. '",demuxer-max-back-bytes=1MiB,demuxer-max-bytes=3MiB,ytdl=no')
else
mp.commandv("loadfile", destination .. ".mkv", "append",
audio .. 'force-media-title="' .. dtitle .. '",demuxer-max-back-bytes=1MiB,demuxer-max-bytes=3MiB,ytdl=no') --,sub-file="..destination..".en.vtt") --in case they are not set up to autoload
end
mp.commandv("playlist_move", mp.get_property("playlist-count") - 1, nextIndex)
mp.commandv("playlist_remove", nextIndex + 1)
caught = true
title = ""
-- pop = true
end
local listenID = ""
local function listener(event)
if not caught and event.prefix == mp.get_script_name() and string.find(event.text, listenID) then
local destination = string.match(event.text, "%[download%] Destination: (.+).mkv") or
string.match(event.text, "%[download%] (.+).mkv has already been downloaded")
-- if destination then print("---"..cachePath) end;
if destination and string.find(destination, string.gsub(cachePath, '~/', '')) then
-- print(listenID)
mp.unregister_event(listener)
_, title = utils.split_path(destination)
local audio = ""
if fAudio == "" then
load_files(title, destination, audio, false)
else
if exists(destination .. ".mka") then
audio = "audio-file=" .. destination .. '.mka,'
load_files(title, destination, audio, false)
else
print("---expected mka but could not find it, waiting for 2 seconds---")
mp.add_timeout(2, function()
load_files(title, destination, audio, true)
end)
end
end
end
end
end
--from ytdl_hook
mp.add_hook("on_preloaded", 10, function()
if string.find(mp.get_property("path"), cachePath) then
chapters()
if next(chapter_list) ~= nil then
mp.set_property_native("chapter-list", chapter_list)
chapter_list = {}
json = ""
end
end
end)
--end ytdl_hook
function dump(o)
if type(o) == 'table' then
local s = '{ '
for k, v in pairs(o) do
if type(k) ~= 'number' then k = '"' .. k .. '"' end
s = s .. '[' .. k .. '] = ' .. dump(v) .. ','
end
return s .. '} '
else
return tostring(o)
end
end
local function addOPTS(old)
for k, v in pairs(additionalOpts) do
-- print(k)
if string.find(v, "%s") then
for l, w in string.gmatch(v, "([-%w]+) (.+)") do
table.insert(old, l)
table.insert(old, w)
end
else
table.insert(old, v)
end
end
-- print(dump(old))
return old
end
local AudioDownloadHandle = {}
local VideoDownloadHandle = {}
local JsonDownloadHandle = {}
local function download_files(id, success, result, error)
if result.killed_by_us then
return
end
local jfile = cachePath .. "/" .. id .. ".json"
local jfileIO = io.open(jfile, "w")
jfileIO:write(result.stdout)
jfileIO:close()
json = utils.parse_json(result.stdout)
-- print(dump(json))
if json.requested_downloads[1].requested_formats ~= nil then
local args = { ytdl, "--no-continue", "-q", "-f", fAudio, "--restrict-filenames", "--no-playlist", "--no-part",
"-o", cachePath .. "/" .. id .. "-%(title)s-%(id)s.mka", "--load-info-json", jfile }
args = addOPTS(args)
AudioDownloadHandle = mp.command_native_async({
name = "subprocess",
args = args,
playback_only = false
}, function()
end)
else
fAudio = ""
fVideo = fVideo:gsub("bestvideo", "best")
fVideo = fVideo:gsub("bv", "best")
end
local args = { ytdl, "--no-continue", "-f", fVideo .. '/best', "--restrict-filenames", "--no-playlist",
"--no-part", "-o", cachePath .. "/" .. id .. "-%(title)s-%(id)s.mkv", "--load-info-json", jfile }
args = addOPTS(args)
VideoDownloadHandle = mp.command_native_async({
name = "subprocess",
args = args,
playback_only = false
}, function()
end)
end
local function DL()
local index = tonumber(mp.get_property("playlist-pos"))
if mp.get_property("playlist/" .. index .. "/filename"):find("/videos$") and mp.get_property("playlist/" .. index + 1 .. "/filename"):find("/shorts$") then
return
end
if tonumber(mp.get_property("playlist-pos-1")) > 0 and mp.get_property("playlist-pos-1") ~= mp.get_property("playlist-count") then
nextIndex = index + 1
local nextFile = mp.get_property("playlist/" .. nextIndex .. "/filename")
if nextFile and caught and nextFile:find("://", 0, false) then
caught = false
mp.enable_messages("info")
mp.register_event("log-message", listener)
local ytFormat = mp.get_property("ytdl-format")
fVideo = string.match(ytFormat, '(.+)%+.+//?') or 'bestvideo'
fAudio = string.match(ytFormat, '.+%+(.+)//?') or 'bestaudio'
-- print("start"..nextFile)
listenID = tostring(os.time())
local args = { ytdl, "--dump-single-json", "--no-simulate", "--skip-download",
"--restrict-filenames",
"--no-playlist", "--sub-lang", "en", "--write-sub", "--no-part", "-o",
cachePath .. "/" .. listenID .. "-%(title)s-%(id)s.%(ext)s", nextFile }
args = addOPTS(args)
-- print(dump(args))
table.insert(filesToDelete, listenID)
JsonDownloadHandle = mp.command_native_async({
name = "subprocess",
args = args,
capture_stdout = true,
capture_stderr = true,
playback_only = false
}, function(...)
download_files(listenID, ...)
end)
end
end
end
local function clearCache()
-- print(pop)
--if pop == true then
mp.abort_async_command(AudioDownloadHandle)
mp.abort_async_command(VideoDownloadHandle)
mp.abort_async_command(JsonDownloadHandle)
-- for k, v in pairs(filesToDelete) do
-- print("remove: " .. v)
-- os.remove(v)
-- end
local ftd = io.open(cachePath .. "/temp.files", "a")
for k, v in pairs(filesToDelete) do
ftd:write(v .. "\n")
if package.config:sub(1, 1) ~= '/' then
os.execute('del /Q /F "' .. cachePath .. "\\" .. v .. '*"')
else
os.execute('rm -f ' .. cachePath .. "/" .. v .. "*")
end
end
ftd:close()
print('clear')
mp.command("quit")
--end
end
mp.add_hook("on_unload", 50, function()
-- mp.abort_async_command(AudioDownloadHandle)
-- mp.abort_async_command(VideoDownloadHandle)
mp.abort_async_command(JsonDownloadHandle)
mp.unregister_event(listener)
caught = true
listenID = "resetYtdlPreloadListener"
-- print(listenID)
end)
local skipInitial
mp.observe_property("playlist-count", "number", function()
if skipInitial then
DL()
else
skipInitial = true
end
end)
--from ytdl_hook
local platform_is_windows = (package.config:sub(1, 1) == "\\")
local o = {
exclude = "",
try_ytdl_first = false,
use_manifests = false,
all_formats = false,
force_all_formats = true,
ytdl_path = "",
}
local paths_to_search = { "yt-dlp", "yt-dlp_x86", "youtube-dl" }
--local options = require 'mp.options'
options.read_options(o, "ytdl_hook")
local separator = platform_is_windows and ";" or ":"
if o.ytdl_path:match("[^" .. separator .. "]") then
paths_to_search = {}
for path in o.ytdl_path:gmatch("[^" .. separator .. "]+") do
table.insert(paths_to_search, path)
end
end
local function exec(args)
local ret = mp.command_native({
name = "subprocess",
args = args,
capture_stdout = true,
capture_stderr = true
})
return ret.status, ret.stdout, ret, ret.killed_by_us
end
local msg = require 'mp.msg'
local command = {}
for _, path in pairs(paths_to_search) do
-- search for youtube-dl in mpv's config dir
local exesuf = platform_is_windows and ".exe" or ""
local ytdl_cmd = mp.find_config_file(path .. exesuf)
if ytdl_cmd then
msg.verbose("Found youtube-dl at: " .. ytdl_cmd)
ytdl = ytdl_cmd
break
else
msg.verbose("No youtube-dl found with path " .. path .. exesuf .. " in config directories")
--search in PATH
command[1] = path
es, json, result, aborted = exec(command)
if result.error_string == "init" then
msg.verbose("youtube-dl with path " .. path .. exesuf .. " not found in PATH or not enough permissions")
else
msg.verbose("Found youtube-dl with path " .. path .. exesuf .. " in PATH")
ytdl = path
break
end
end
end
--end ytdl_hook
mp.register_event("start-file", DL)
mp.register_event("shutdown", clearCache)
local ftd = io.open(cachePath .. "/temp.files", "r")
while ftd ~= nil do
local line = ftd:read()
if line == nil or line == "" then
ftd:close()
io.open(cachePath .. "/temp.files", "w"):close()
break
end
-- print("DEL::"..line)
if package.config:sub(1, 1) ~= '/' then
os.execute('del /Q /F "' .. cachePath .. "\\" .. line .. '*" >nul 2>nul')
else
os.execute('rm -f ' .. cachePath .. "/" .. line .. "* &> /dev/null")
end
end

View File

@@ -0,0 +1,405 @@
----------------------
-- #example ytdl_preload.conf
-- # make sure lines do not have trailing whitespace
-- # ytdl_opt has no sanity check and should be formatted exactly how it would appear in yt-dlp CLI, they are split into a key/value pair on whitespace
-- # at least on Windows, do not escape '\' in temp, just us a single one for each divider
-- #temp=R:\ytdltest
-- #ytdl_opt1=-r 50k
-- #ytdl_opt2=-N 5
-- #ytdl_opt#=etc
----------------------
local nextIndex
local caught = true
-- local pop = false
local ytdl = "yt-dlp"
local utils = require 'mp.utils'
local options = require 'mp.options'
local opts = {
temp = "/tmp/ytdl-preload",
ytdl_opt1 = "",
ytdl_opt2 = "",
ytdl_opt3 = "",
ytdl_opt4 = "",
ytdl_opt5 = "",
ytdl_opt6 = "",
ytdl_opt7 = "",
ytdl_opt8 = "",
ytdl_opt9 = "",
}
options.read_options(opts, "ytdl_preload")
local additionalOpts = {}
for k, v in pairs(opts) do
if k:find("ytdl_opt%d") and v ~= "" then
additionalOpts[k] = v
-- print("entry")
-- print(k .. v)
end
end
local cachePath = opts.temp
local chapter_list = {}
local json = ""
local filesToDelete = {}
local function exists(file)
local ok, err, code = os.rename(file, file)
if not ok then
if code == 13 then -- Permission denied, but it exists
return true
end
end
return ok, err
end
local function useNewLoadfile()
for _, c in pairs(mp.get_property_native("command-list")) do
if c["name"] == "loadfile" then
for _, a in pairs(c["args"]) do
if a["name"] == "index" then
return true
end
end
end
end
end
--from ytdl_hook
local function time_to_secs(time_string)
local ret
local a, b, c = time_string:match("(%d+):(%d%d?):(%d%d)")
if a ~= nil then
ret = (a * 3600 + b * 60 + c)
else
a, b = time_string:match("(%d%d?):(%d%d)")
if a ~= nil then
ret = (a * 60 + b)
end
end
return ret
end
local function extract_chapters(data, video_length)
local ret = {}
for line in data:gmatch("[^\r\n]+") do
local time = time_to_secs(line)
if time and (time < video_length) then
table.insert(ret, { time = time, title = line })
end
end
table.sort(ret, function(a, b) return a.time < b.time end)
return ret
end
local function chapters()
if json.chapters then
for i = 1, #json.chapters do
local chapter = json.chapters[i]
local title = chapter.title or ""
if title == "" then
title = string.format('Chapter %02d', i)
end
table.insert(chapter_list, { time = chapter.start_time, title = title })
end
elseif not (json.description == nil) and not (json.duration == nil) then
chapter_list = extract_chapters(json.description, json.duration)
end
end
--end ytdl_hook
local title = ""
local fVideo = ""
local fAudio = ""
local function load_files(dtitle, destination, audio, wait)
if wait then
if exists(destination .. ".mka") then
print("---wait success: found mka---")
audio = "audio-file=" .. destination .. '.mka,'
else
print("---could not find mka after wait, audio may be missing---")
end
end
-- if audio ~= "" then
-- table.insert(filesToDelete, destination .. ".mka")
-- end
-- table.insert(filesToDelete, destination .. ".mkv")
dtitle = dtitle:gsub("-" .. ("[%w_-]"):rep(11) .. "$", "")
dtitle = dtitle:gsub("^" .. ("%d"):rep(10) .. "%-", "")
if useNewLoadfile() then
mp.commandv("loadfile", destination .. ".mkv", "append", -1,
audio .. 'force-media-title="' .. dtitle .. '",demuxer-max-back-bytes=1MiB,demuxer-max-bytes=3MiB,ytdl=no')
else
mp.commandv("loadfile", destination .. ".mkv", "append",
audio .. 'force-media-title="' .. dtitle .. '",demuxer-max-back-bytes=1MiB,demuxer-max-bytes=3MiB,ytdl=no') --,sub-file="..destination..".en.vtt") --in case they are not set up to autoload
end
mp.commandv("playlist_move", mp.get_property("playlist-count") - 1, nextIndex)
mp.commandv("playlist_remove", nextIndex + 1)
caught = true
title = ""
-- pop = true
end
local listenID = ""
local function listener(event)
if not caught and event.prefix == mp.get_script_name() and string.find(event.text, listenID) then
local destination = string.match(event.text, "%[download%] Destination: (.+).mkv") or
string.match(event.text, "%[download%] (.+).mkv has already been downloaded")
-- if destination then print("---"..cachePath) end;
if destination and string.find(destination, string.gsub(cachePath, '~/', '')) then
-- print(listenID)
mp.unregister_event(listener)
_, title = utils.split_path(destination)
local audio = ""
if fAudio == "" then
load_files(title, destination, audio, false)
else
if exists(destination .. ".mka") then
audio = "audio-file=" .. destination .. '.mka,'
load_files(title, destination, audio, false)
else
print("---expected mka but could not find it, waiting for 2 seconds---")
mp.add_timeout(2, function()
load_files(title, destination, audio, true)
end)
end
end
end
end
end
--from ytdl_hook
mp.add_hook("on_preloaded", 10, function()
if string.find(mp.get_property("path"), cachePath) then
chapters()
if next(chapter_list) ~= nil then
mp.set_property_native("chapter-list", chapter_list)
chapter_list = {}
json = ""
end
end
end)
--end ytdl_hook
function dump(o)
if type(o) == 'table' then
local s = '{ '
for k, v in pairs(o) do
if type(k) ~= 'number' then k = '"' .. k .. '"' end
s = s .. '[' .. k .. '] = ' .. dump(v) .. ','
end
return s .. '} '
else
return tostring(o)
end
end
local function addOPTS(old)
for k, v in pairs(additionalOpts) do
-- print(k)
if string.find(v, "%s") then
for l, w in string.gmatch(v, "([-%w]+) (.+)") do
table.insert(old, l)
table.insert(old, w)
end
else
table.insert(old, v)
end
end
-- print(dump(old))
return old
end
local AudioDownloadHandle = {}
local VideoDownloadHandle = {}
local JsonDownloadHandle = {}
local function download_files(id, success, result, error)
if result.killed_by_us then
return
end
local jfile = cachePath .. "/" .. id .. ".json"
local jfileIO = io.open(jfile, "w")
jfileIO:write(result.stdout)
jfileIO:close()
json = utils.parse_json(result.stdout)
-- print(dump(json))
if json.requested_downloads[1].requested_formats ~= nil then
local args = { ytdl, "--no-continue", "-q", "-f", fAudio, "--restrict-filenames", "--no-playlist", "--no-part",
"-o", cachePath .. "/" .. id .. "-%(title)s-%(id)s.mka", "--load-info-json", jfile }
args = addOPTS(args)
AudioDownloadHandle = mp.command_native_async({
name = "subprocess",
args = args,
playback_only = false
}, function()
end)
else
fAudio = ""
fVideo = fVideo:gsub("bestvideo", "best")
fVideo = fVideo:gsub("bv", "best")
end
local args = { ytdl, "--no-continue", "-f", fVideo .. '/best', "--restrict-filenames", "--no-playlist",
"--no-part", "-o", cachePath .. "/" .. id .. "-%(title)s-%(id)s.mkv", "--load-info-json", jfile }
args = addOPTS(args)
VideoDownloadHandle = mp.command_native_async({
name = "subprocess",
args = args,
playback_only = false
}, function()
end)
end
local function DL()
local index = tonumber(mp.get_property("playlist-pos"))
if mp.get_property("playlist/" .. index .. "/filename"):find("/videos$") and mp.get_property("playlist/" .. index + 1 .. "/filename"):find("/shorts$") then
return
end
if tonumber(mp.get_property("playlist-pos-1")) > 0 and mp.get_property("playlist-pos-1") ~= mp.get_property("playlist-count") then
nextIndex = index + 1
local nextFile = mp.get_property("playlist/" .. nextIndex .. "/filename")
if nextFile and caught and nextFile:find("://", 0, false) then
caught = false
mp.enable_messages("info")
mp.register_event("log-message", listener)
local ytFormat = mp.get_property("ytdl-format")
fVideo = string.match(ytFormat, '(.+)%+.+//?') or 'bestvideo'
fAudio = string.match(ytFormat, '.+%+(.+)//?') or 'bestaudio'
-- print("start"..nextFile)
listenID = tostring(os.time())
local args = { ytdl, "--dump-single-json", "--no-simulate", "--skip-download",
"--restrict-filenames",
"--no-playlist", "--sub-lang", "en", "--write-sub", "--no-part", "-o",
cachePath .. "/" .. listenID .. "-%(title)s-%(id)s.%(ext)s", nextFile }
args = addOPTS(args)
-- print(dump(args))
table.insert(filesToDelete, listenID)
JsonDownloadHandle = mp.command_native_async({
name = "subprocess",
args = args,
capture_stdout = true,
capture_stderr = true,
playback_only = false
}, function(...)
download_files(listenID, ...)
end)
end
end
end
local function clearCache()
-- print(pop)
--if pop == true then
mp.abort_async_command(AudioDownloadHandle)
mp.abort_async_command(VideoDownloadHandle)
mp.abort_async_command(JsonDownloadHandle)
-- for k, v in pairs(filesToDelete) do
-- print("remove: " .. v)
-- os.remove(v)
-- end
local ftd = io.open(cachePath .. "/temp.files", "a")
for k, v in pairs(filesToDelete) do
ftd:write(v .. "\n")
if package.config:sub(1, 1) ~= '/' then
os.execute('del /Q /F "' .. cachePath .. "\\" .. v .. '*"')
else
os.execute('rm -f ' .. cachePath .. "/" .. v .. "*")
end
end
ftd:close()
print('clear')
mp.command("quit")
--end
end
mp.add_hook("on_unload", 50, function()
-- mp.abort_async_command(AudioDownloadHandle)
-- mp.abort_async_command(VideoDownloadHandle)
mp.abort_async_command(JsonDownloadHandle)
mp.unregister_event(listener)
caught = true
listenID = "resetYtdlPreloadListener"
-- print(listenID)
end)
local skipInitial
mp.observe_property("playlist-count", "number", function()
if skipInitial then
DL()
else
skipInitial = true
end
end)
--from ytdl_hook
local platform_is_windows = (package.config:sub(1, 1) == "\\")
local o = {
exclude = "",
try_ytdl_first = false,
use_manifests = false,
all_formats = false,
force_all_formats = true,
ytdl_path = "",
}
local paths_to_search = { "yt-dlp", "yt-dlp_x86", "youtube-dl" }
--local options = require 'mp.options'
options.read_options(o, "ytdl_hook")
local separator = platform_is_windows and ";" or ":"
if o.ytdl_path:match("[^" .. separator .. "]") then
paths_to_search = {}
for path in o.ytdl_path:gmatch("[^" .. separator .. "]+") do
table.insert(paths_to_search, path)
end
end
local function exec(args)
local ret = mp.command_native({
name = "subprocess",
args = args,
capture_stdout = true,
capture_stderr = true
})
return ret.status, ret.stdout, ret, ret.killed_by_us
end
local msg = require 'mp.msg'
local command = {}
for _, path in pairs(paths_to_search) do
-- search for youtube-dl in mpv's config dir
local exesuf = platform_is_windows and ".exe" or ""
local ytdl_cmd = mp.find_config_file(path .. exesuf)
if ytdl_cmd then
msg.verbose("Found youtube-dl at: " .. ytdl_cmd)
ytdl = ytdl_cmd
break
else
msg.verbose("No youtube-dl found with path " .. path .. exesuf .. " in config directories")
--search in PATH
command[1] = path
es, json, result, aborted = exec(command)
if result.error_string == "init" then
msg.verbose("youtube-dl with path " .. path .. exesuf .. " not found in PATH or not enough permissions")
else
msg.verbose("Found youtube-dl with path " .. path .. exesuf .. " in PATH")
ytdl = path
break
end
end
end
--end ytdl_hook
mp.register_event("start-file", DL)
mp.register_event("shutdown", clearCache)
local ftd = io.open(cachePath .. "/temp.files", "r")
while ftd ~= nil do
local line = ftd:read()
if line == nil or line == "" then
ftd:close()
io.open(cachePath .. "/temp.files", "w"):close()
break
end
-- print("DEL::"..line)
if package.config:sub(1, 1) ~= '/' then
os.execute('del /Q /F "' .. cachePath .. "\\" .. line .. '*" >nul 2>nul')
else
os.execute('rm -f ' .. cachePath .. "/" .. line .. "* &> /dev/null")
end
end

View File

@@ -300,14 +300,14 @@ return {
-- log_level = "TRACE",
},
extensions = {
mcphub = {
callback = "mcphub.extensions.codecompanion",
opts = {
show_result_in_chat = true, -- Show the mcp tool result in the chat buffer
make_vars = true, -- make chat #variables from MCP server resources
make_slash_commands = true, -- make /slash_commands from MCP server prompts
},
},
-- mcphub = {
-- callback = "mcphub.extensions.codecompanion",
-- opts = {
-- show_result_in_chat = true, -- Show the mcp tool result in the chat buffer
-- make_vars = true, -- make chat #variables from MCP server resources
-- make_slash_commands = true, -- make /slash_commands from MCP server prompts
-- },
-- },
},
memory = {
opts = {

View File

@@ -1,86 +0,0 @@
return {
"ravitemer/mcphub.nvim",
dependencies = {
"nvim-lua/plenary.nvim",
},
build = "npm install -g mcp-hub@latest", -- Installs `mcp-hub` node binary globally
config = function()
require("mcphub").setup({
--- `mcp-hub` binary related options-------------------
config = vim.fn.expand("~/.config/mcphub/servers.json"), -- Absolute path to MCP Servers config file (will create if not exists)
port = 37373, -- The port `mcp-hub` server listens to
shutdown_delay = 5 * 60 * 000, -- Delay in ms before shutting down the server when last instance closes (default: 5 minutes)
use_bundled_binary = false, -- Use local `mcp-hub` binary (set this to true when using build = "bundled_build.lua")
mcp_request_timeout = 60000, --Max time allowed for a MCP tool or resource to execute in milliseconds, set longer for long running tasks
global_env = {}, -- Global environment variables available to all MCP servers (can be a table or a function returning a table)
workspace = {
enabled = true, -- Enable project-local configuration files
look_for = { ".mcphub/servers.json", ".vscode/mcp.json", ".cursor/mcp.json" }, -- Files to look for when detecting project boundaries (VS Code format supported)
reload_on_dir_changed = true, -- Automatically switch hubs on DirChanged event
port_range = { min = 40000, max = 41000 }, -- Port range for generating unique workspace ports
get_port = nil, -- Optional function returning custom port number. Called when generating ports to allow custom port assignment logic
},
---Chat-plugin related options-----------------
auto_approve = false, -- Auto approve mcp tool calls
auto_toggle_mcp_servers = true, -- Let LLMs start and stop MCP servers automatically
extensions = {
avante = {
make_slash_commands = true, -- make /slash commands from MCP server prompts
},
},
--- Plugin specific options-------------------
native_servers = {}, -- add your custom lua native servers here
builtin_tools = {
edit_file = {
parser = {
track_issues = true,
extract_inline_content = true,
},
locator = {
fuzzy_threshold = 0.8,
enable_fuzzy_matching = true,
},
ui = {
go_to_origin_on_complete = true,
keybindings = {
accept = ".",
reject = ",",
next = "n",
prev = "p",
accept_all = "ga",
reject_all = "gr",
},
},
},
},
ui = {
window = {
width = 0.8, -- 0-1 (ratio); "50%" (percentage); 50 (raw number)
height = 0.8, -- 0-1 (ratio); "50%" (percentage); 50 (raw number)
align = "center", -- "center", "top-left", "top-right", "bottom-left", "bottom-right", "top", "bottom", "left", "right"
relative = "editor",
zindex = 50,
border = "rounded", -- "none", "single", "double", "rounded", "solid", "shadow"
},
wo = { -- window-scoped options (vim.wo)
winhl = "Normal:MCPHubNormal,FloatBorder:MCPHubBorder",
},
},
json_decode = nil, -- Custom JSON parser function (e.g., require('json5').parse for JSON5 support)
on_ready = function(hub)
-- Called when hub is ready
end,
on_error = function(err)
-- Called on errors
end,
log = {
level = vim.log.levels.WARN,
to_file = false,
file_path = nil,
prefix = "MCPHub",
},
})
end,
}

2
.gitmodules vendored
View File

@@ -9,7 +9,7 @@
url = git@github.com:Samillion/ModernZ.git
[submodule ".config/mpv-modules/ytdl-preload"]
path = .config/mpv-modules/ytdl-preload
url = git@gist.github.com:17d90e3deeb35b5f75e55adb19098f58.git
url = git@gist.github.com:484107d131ba0abd878c5a9489d9b664.git
[submodule ".config/mpv-modules/mpv-anilist-updater"]
path = .config/mpv-modules/mpv-anilist-updater
url = git@github.com:AzuredBlue/mpv-anilist-updater.git

View File

@@ -0,0 +1,515 @@
#!/usr/bin/env python3
"""Record microphone audio and transcribe it with whisper.cpp."""
from __future__ import annotations
import argparse
import os
import re
import shutil
import signal
import subprocess
import sys
import tempfile
import threading
import time
import wave
from pathlib import Path
import numpy as np
import sounddevice as sd
DEFAULT_MODEL = "~/models/whisper.cpp/ggml-small.bin"
DEFAULT_DURATION = 8.0
DEFAULT_STATE_DIR = Path.home() / ".cache" / "whisper-record-toggle"
APP_NAME = "Whisper Record"
class Notifier:
"""Best-effort desktop notifications with optional live updates."""
def __init__(self) -> None:
self.enabled = shutil.which("notify-send") is not None
self.notification_id: str | None = None
def send(self, title: str, body: str, timeout_ms: int = 1500) -> None:
if not self.enabled:
return
base_cmd = [
"notify-send",
"-a",
APP_NAME,
"-u",
"normal",
"-t",
str(timeout_ms),
]
if self.notification_id:
base_cmd.extend(["-r", self.notification_id])
# Prefer -p so we can reuse the same ID for replacement updates.
result = subprocess.run(
[*base_cmd, "-p", title, body], capture_output=True, text=True
)
if result.returncode != 0:
# Fallback for environments where -p is unsupported.
subprocess.run([*base_cmd, title, body], capture_output=True, text=True)
return
notification_id = result.stdout.strip().splitlines()[-1].strip()
if notification_id.isdigit():
self.notification_id = notification_id
class Recorder:
"""Stream microphone audio into memory while tracking elapsed time."""
def __init__(self, samplerate: int, channels: int) -> None:
self.samplerate = samplerate
self.channels = channels
self.frames: list[np.ndarray] = []
def _callback(self, indata: np.ndarray, _frames: int, _time, status) -> None:
if status:
print(f"sounddevice warning: {status}", file=sys.stderr)
self.frames.append(indata.copy())
def record(
self, duration: float | None, notifier: Notifier, interval: float
) -> np.ndarray:
start = time.monotonic()
last_update = 0.0
with sd.InputStream(
samplerate=self.samplerate,
channels=self.channels,
dtype="int16",
callback=self._callback,
):
while True:
elapsed = time.monotonic() - start
if elapsed - last_update >= interval:
timer = _format_seconds(elapsed)
if duration is not None:
notifier.send(
"Recording", f"{timer} / {_format_seconds(duration)}"
)
else:
notifier.send(
"Recording", f"Elapsed: {timer} (press keybind again)"
)
last_update = elapsed
if duration is not None and elapsed >= duration:
break
time.sleep(0.05)
if not self.frames:
raise RuntimeError(
"No audio captured. Check your input device and permissions."
)
return np.concatenate(self.frames, axis=0)
def _format_seconds(value: float) -> str:
total = int(value)
minutes, seconds = divmod(total, 60)
return f"{minutes:02d}:{seconds:02d}"
def find_whisper_binary(explicit: str | None) -> str:
if explicit:
return explicit
for candidate in ("whisper-cli", "main", "whisper"):
path = shutil.which(candidate)
if path:
return path
raise RuntimeError(
"Could not find whisper.cpp binary. Pass --whisper-bin /path/to/whisper-cli"
)
def write_wav(path: Path, audio: np.ndarray, samplerate: int, channels: int) -> None:
with wave.open(str(path), "wb") as wav_file:
wav_file.setnchannels(channels)
wav_file.setsampwidth(2)
wav_file.setframerate(samplerate)
wav_file.writeframes(audio.tobytes())
def transcribe(whisper_bin: str, model: str, wav_path: Path, notifier: Notifier) -> str:
with tempfile.TemporaryDirectory(prefix="whisper-out-") as out_dir:
out_base = Path(out_dir) / "transcript"
cmd = [
whisper_bin,
"-m",
model,
"-f",
str(wav_path),
"-otxt",
"-of",
str(out_base),
"-nt",
]
process = subprocess.Popen(
cmd,
stdout=subprocess.PIPE,
stderr=subprocess.STDOUT,
text=True,
bufsize=1,
)
output_lines: list[str] = []
progress: dict[str, int | None] = {"pct": None}
def _reader() -> None:
assert process.stdout is not None
for line in process.stdout:
output_lines.append(line)
match = re.search(r"(?<!\d)(\d{1,3})%", line)
if match:
progress["pct"] = min(100, int(match.group(1)))
reader = threading.Thread(target=_reader, daemon=True)
reader.start()
spinner = "|/-\\"
frame = 0
while process.poll() is None:
pct = progress["pct"]
status = (
f"Transcribing... {pct}%"
if pct is not None
else f"Transcribing... {spinner[frame % len(spinner)]}"
)
notifier.send("Transcribing", status, timeout_ms=1200)
print(f"\r{status}", end="", file=sys.stderr, flush=True)
frame += 1
time.sleep(0.35)
reader.join(timeout=1.0)
print("\r" + (" " * 48) + "\r", end="", file=sys.stderr, flush=True)
result_stdout = "".join(output_lines).strip()
if process.returncode != 0:
stderr = result_stdout
raise RuntimeError(f"whisper.cpp failed: {stderr}")
txt_file = out_base.with_suffix(".txt")
if txt_file.exists():
return txt_file.read_text(encoding="utf-8").strip()
fallback = result_stdout
if fallback:
return fallback
raise RuntimeError("Transcription finished but no output text was produced.")
def _type_with_tool(text: str) -> None:
if shutil.which("wtype"):
subprocess.run(["wtype", text], check=True)
return
if shutil.which("ydotool"):
subprocess.run(["ydotool", "type", "--", text], check=True)
return
if shutil.which("xdotool"):
subprocess.run(["xdotool", "type", "--clearmodifiers", "--", text], check=True)
return
raise RuntimeError("No typing tool found. Install one of: wtype, ydotool, xdotool.")
def _emit_text(text: str, args: argparse.Namespace, notifier: Notifier) -> int:
if args.output == "print":
print(text)
notifier.send("Done", "Transcription printed to terminal", timeout_ms=1500)
return 0
try:
_type_with_tool(text)
except Exception as exc:
print(f"Failed to simulate typing: {exc}", file=sys.stderr)
notifier.send("Typing error", str(exc), timeout_ms=2500)
return 1
notifier.send("Done", "Transcription typed into active window", timeout_ms=1500)
return 0
def _read_pid(pid_file: Path) -> int | None:
if not pid_file.exists():
return None
try:
return int(pid_file.read_text(encoding="utf-8").strip())
except ValueError:
return None
def _is_alive(pid: int | None) -> bool:
if pid is None:
return False
try:
os.kill(pid, 0)
except ProcessLookupError:
return False
except PermissionError:
return True
return True
def _run_transcription_job(args: argparse.Namespace, duration: float | None) -> str:
notifier = Notifier()
model_path = Path(args.model).expanduser()
if not model_path.exists():
raise RuntimeError(f"Model file not found: {model_path}")
whisper_bin = find_whisper_binary(args.whisper_bin)
notifier.send("Recording", "Starting...", timeout_ms=1200)
recorder = Recorder(samplerate=args.samplerate, channels=args.channels)
try:
audio = recorder.record(
duration=duration,
notifier=notifier,
interval=max(args.notify_interval, 0.2),
)
except KeyboardInterrupt:
if not recorder.frames:
notifier.send("Recording", "Cancelled", timeout_ms=1000)
return ""
audio = np.concatenate(recorder.frames, axis=0)
except Exception as exc:
raise RuntimeError(f"Recording failed: {exc}") from exc
with tempfile.TemporaryDirectory(prefix="whisper-audio-") as tmp_dir:
wav_path = Path(tmp_dir) / "input.wav"
write_wav(wav_path, audio, args.samplerate, args.channels)
notifier.send("Transcribing", "Running whisper.cpp...", timeout_ms=1500)
text = transcribe(whisper_bin, str(model_path), wav_path, notifier)
return text.strip()
def run_once(args: argparse.Namespace) -> int:
duration = None if args.duration <= 0 else args.duration
try:
text = _run_transcription_job(args, duration=duration)
except Exception as exc:
print(str(exc), file=sys.stderr)
Notifier().send("Transcription error", str(exc), timeout_ms=3000)
return 1
if not text:
print("(No speech detected)")
Notifier().send("Done", "No speech detected", timeout_ms=1500)
return 0
return _emit_text(text, args, Notifier())
def run_worker(args: argparse.Namespace) -> int:
state_dir = Path(args.state_dir)
state_dir.mkdir(parents=True, exist_ok=True)
pid_file = state_dir / "recording.pid"
transcript_file = state_dir / "transcript.txt"
error_file = state_dir / "error.txt"
pid_file.write_text(str(os.getpid()), encoding="utf-8")
if transcript_file.exists():
transcript_file.unlink()
if error_file.exists():
error_file.unlink()
try:
text = _run_transcription_job(args, duration=None)
transcript_file.write_text(text, encoding="utf-8")
except Exception as exc:
error_file.write_text(str(exc), encoding="utf-8")
return 1
finally:
if pid_file.exists():
pid_file.unlink()
return 0
def start_background(args: argparse.Namespace) -> int:
state_dir = Path(args.state_dir)
state_dir.mkdir(parents=True, exist_ok=True)
pid_file = state_dir / "recording.pid"
pid = _read_pid(pid_file)
if _is_alive(pid):
print("Recording is already running.")
return 0
cmd = [
sys.executable,
str(Path(__file__).resolve()),
"--mode",
"once",
"--worker",
"--model",
args.model,
"--samplerate",
str(args.samplerate),
"--channels",
str(args.channels),
"--notify-interval",
str(args.notify_interval),
"--state-dir",
str(state_dir),
]
if args.whisper_bin:
cmd.extend(["--whisper-bin", args.whisper_bin])
subprocess.Popen(
cmd,
stdout=subprocess.DEVNULL,
stderr=subprocess.DEVNULL,
start_new_session=True,
)
Notifier().send(
"Recording", "Started (press keybind again to stop)", timeout_ms=1200
)
print("Recording started.")
return 0
def stop_background(args: argparse.Namespace) -> int:
state_dir = Path(args.state_dir)
pid_file = state_dir / "recording.pid"
transcript_file = state_dir / "transcript.txt"
error_file = state_dir / "error.txt"
pid = _read_pid(pid_file)
if not _is_alive(pid):
if pid_file.exists():
pid_file.unlink()
print("No active recording.")
return 1
assert pid is not None
os.kill(pid, signal.SIGINT)
Notifier().send("Recording", "Stopping...", timeout_ms=1200)
deadline = time.monotonic() + max(args.stop_timeout, 1.0)
while _is_alive(pid) and time.monotonic() < deadline:
time.sleep(0.1)
if _is_alive(pid):
print("Timed out waiting for transcription to finish.", file=sys.stderr)
return 1
if error_file.exists():
message = error_file.read_text(encoding="utf-8").strip()
error_file.unlink()
print(message or "Worker failed.", file=sys.stderr)
return 1
text = ""
if transcript_file.exists():
text = transcript_file.read_text(encoding="utf-8").strip()
transcript_file.unlink()
if not text:
print("(No speech detected)")
Notifier().send("Done", "No speech detected", timeout_ms=1500)
return 0
return _emit_text(text, args, Notifier())
def parse_args() -> argparse.Namespace:
parser = argparse.ArgumentParser(
description="Record from microphone and transcribe with whisper.cpp"
)
parser.add_argument(
"--mode",
choices=("once", "start", "stop", "toggle"),
default="once",
help="once: record/transcribe immediately, start/stop: background toggle pieces, toggle: start if idle else stop",
)
parser.add_argument(
"--worker",
action="store_true",
help=argparse.SUPPRESS,
)
parser.add_argument(
"--model", default=DEFAULT_MODEL, help="Path to whisper.cpp model"
)
parser.add_argument(
"--whisper-bin",
default=None,
help="Path to whisper.cpp binary (default: auto-detect whisper-cli/main)",
)
parser.add_argument(
"--duration",
type=float,
default=DEFAULT_DURATION,
help="Recording length in seconds for --mode once (default: 8). Use 0 for manual stop.",
)
parser.add_argument(
"--samplerate",
type=int,
default=16000,
help="Input sample rate (default: 16000)",
)
parser.add_argument(
"--channels", type=int, default=1, help="Input channels (default: 1)"
)
parser.add_argument(
"--notify-interval",
type=float,
default=1.0,
help="Seconds between notification timer updates (default: 1.0)",
)
parser.add_argument(
"--output",
choices=("print", "type"),
default="print",
help="How to emit transcript text: print to terminal or type into active window",
)
parser.add_argument(
"--state-dir",
default=str(DEFAULT_STATE_DIR),
help="Directory to store toggle state files",
)
parser.add_argument(
"--stop-timeout",
type=float,
default=90.0,
help="Max seconds to wait for background transcription to finish on stop",
)
return parser.parse_args()
def main() -> int:
args = parse_args()
if args.worker:
return run_worker(args)
if args.mode == "once":
return run_once(args)
if args.mode == "start":
return start_background(args)
if args.mode == "stop":
return stop_background(args)
state_dir = Path(args.state_dir)
pid = _read_pid(state_dir / "recording.pid")
if _is_alive(pid):
return stop_background(args)
return start_background(args)
if __name__ == "__main__":
raise SystemExit(main())