jimaku-dl/src/jimaku_dl/downloader.py

784 lines
27 KiB
Python

#!/usr/bin/env python3
from logging import Logger, basicConfig, getLogger
from os import environ
from os.path import abspath, basename, dirname, exists, isdir, join, normpath, splitext
from re import IGNORECASE
from re import compile as re_compile
from re import search
from subprocess import CalledProcessError
from subprocess import run as subprocess_run
from typing import Any, Dict, List, Optional, Tuple, Union
from requests import get as requests_get
from requests import post as requests_post
class JimakuDownloader:
    """
    Main class for downloading subtitles from Jimaku using the AniList API.

    This class provides functionality to search for, select, and download
    subtitles for anime media files or directories.
    """

    # API endpoints
    ANILIST_API_URL = "https://graphql.anilist.co"
    JIMAKU_SEARCH_URL = "https://jimaku.cc/api/entries/search"
    JIMAKU_FILES_BASE = "https://jimaku.cc/api/entries"

    # Seconds to wait on any HTTP request; without a timeout a stalled
    # server would block the caller indefinitely.
    REQUEST_TIMEOUT = 30

    # Name of the per-directory cache file holding the AniList ID.
    _CACHE_FILENAME = ".anilist.id"

    # Directory names that are never treated as a show title.
    _SYSTEM_DIR_NAMES = frozenset(
        ["bin", "etc", "lib", "home", "usr", "var", "tmp", "opt", "media", "mnt"]
    )

    # Patterns used to pull candidate episode numbers out of a file name.
    # Compiled once here instead of on every filter call.
    _EPISODE_PATTERNS = (
        # Standard episode markers: E01, e01, Ep01, EP01, episode01
        re_compile(r"[Ee](?:p(?:isode)?)?[ ._-]*(\d+)", IGNORECASE),
        # Bare number delimited by separators. The trailing separator is a
        # lookahead so that it can also serve as the leading separator of the
        # next number (e.g. both numbers in "show.01.02.srt" are found).
        re_compile(r"(?:^|[\s._-])(\d+)(?=[\s._-]|$)"),
        # Number with hash: #01
        re_compile(r"#(\d+)"),
    )

    # Substrings suggesting that a file covers more than one episode.
    _ALL_EPISODES_KEYWORDS = ("all", "batch", "complete", "season", "full")

    def __init__(self, api_token: Optional[str] = None, log_level: str = "INFO"):
        """
        Initialize the JimakuDownloader with API token and logging configuration.

        Parameters
        ----------
        api_token : str, optional
            Jimaku API token for authentication. If None, will try to get it
            from the JIMAKU_API_TOKEN env var
        log_level : str, default="INFO"
            Logging level to use (DEBUG, INFO, WARNING, ERROR, CRITICAL)

        Raises
        ------
        ValueError
            If ``log_level`` is not a valid logging level name
        """
        self.logger = self._setup_logging(log_level)
        self.api_token = api_token or environ.get("JIMAKU_API_TOKEN", "")
        if not self.api_token:
            self.logger.warning(
                "No API token provided. Will need to be set before downloading."
            )

    def _setup_logging(self, log_level: str) -> Logger:
        """
        Configure logging with the specified level.

        Parameters
        ----------
        log_level : str
            The desired log level (e.g. "INFO", "DEBUG", etc.)

        Returns
        -------
        logger : logging.Logger
            Configured logger instance

        Raises
        ------
        ValueError
            If ``log_level`` does not name an integer level in ``logging``
        """
        import logging

        numeric_level = getattr(logging, log_level.upper(), None)
        if not isinstance(numeric_level, int):
            raise ValueError(f"Invalid log level: {log_level}")
        basicConfig(
            level=numeric_level,
            format="%(asctime)s - %(levelname)s - %(message)s",
            datefmt="%Y-%m-%d %H:%M:%S",
        )
        return getLogger(__name__)

    def _require_api_token(self) -> None:
        """Raise ValueError when no Jimaku API token has been configured."""
        if not self.api_token:
            raise ValueError(
                "API token is required. Set it in the constructor or JIMAKU_API_TOKEN env var."
            )

    def _auth_headers(self) -> Dict[str, str]:
        """Return the HTTP headers sent with every authenticated Jimaku request."""
        return {
            "Authorization": f"{self.api_token}",
            "Accept": "application/json",
            "Content-Type": "application/json",
        }

    def is_directory_input(self, path: str) -> bool:
        """
        Check if the input path is a directory.

        Parameters
        ----------
        path : str
            Path to check

        Returns
        -------
        bool
            True if the path is a directory, False otherwise
        """
        return isdir(path)

    def parse_filename(self, filename: str) -> Tuple[str, int, int]:
        """
        Extract show title, season, and episode number from the filename.

        The filename is expected to contain an ``SxxEyy`` marker. When it does
        not, the user is prompted on stdin for the three values.

        Parameters
        ----------
        filename : str
            The filename to parse

        Returns
        -------
        tuple
            (title, season, episode) where:
            - title (str): Show title
            - season (int): Season number
            - episode (int): Episode number

        Raises
        ------
        ValueError
            If the interactively entered season or episode is not an integer
        """
        match = search(r"(.+?)[. _-]+[Ss](\d+)[Ee](\d+)", filename)
        if match:
            title = match.group(1).replace(".", " ").strip()
            return title, int(match.group(2)), int(match.group(3))

        self.logger.warning("Could not parse filename automatically.")
        title = input(
            "Could not parse media title. Please enter show title: "
        ).strip()
        try:
            # An empty answer counts as 0 ("not applicable").
            season = int(
                input("Enter season number (or 0 if not applicable): ").strip()
                or "0"
            )
            episode = int(
                input("Enter episode number (or 0 if not applicable): ").strip()
                or "0"
            )
        except ValueError as err:
            self.logger.error("Invalid input.")
            raise ValueError("Invalid season or episode number") from err
        return title, season, episode

    def parse_directory_name(self, dirname: str) -> Tuple[bool, str, int, int]:
        """
        Extract show title from the directory name.

        Parameters
        ----------
        dirname : str
            The directory name to parse

        Returns
        -------
        tuple
            (success, title, season, episode) where:
            - success (bool): Whether a title could be extracted
            - title (str): Show title extracted from directory name
            - season (int): Defaults to 1
            - episode (int): Defaults to 0 (indicating all episodes)
        """
        # NOTE: the parameter name shadows the imported ``dirname`` function
        # inside this method; only ``basename`` is needed here.
        title = basename(dirname.rstrip("/"))

        # Skip empty titles or obviously non-anime directories
        if not title or title in [".", "..", "/"]:
            self.logger.debug(f"Directory name '{title}' is not usable")
            return False, "", 1, 0

        # Skip common system directories
        if title.lower() in self._SYSTEM_DIR_NAMES:
            self.logger.debug(
                f"Directory name '{title}' is a common system directory, skipping"
            )
            return False, "", 1, 0

        title = title.replace("_", " ").replace(".", " ").strip()

        # A usable title needs at least 3 characters
        if len(title) < 3:
            self.logger.debug(
                f"Directory name '{title}' too short, likely not a show title"
            )
            return False, "", 1, 0

        self.logger.debug(f"Parsed title from directory name: {title}")
        # For directories, assume season 1 and episode 0 (indicating all episodes)
        return True, title, 1, 0

    def find_anime_title_in_path(self, path: str) -> Tuple[str, int, int]:
        """
        Search for an anime title in the path, walking up through parent
        directories if necessary.

        Parameters
        ----------
        path : str
            Starting directory path

        Returns
        -------
        tuple
            (title, season, episode) - anime title and defaults for season and episode

        Raises
        ------
        ValueError
            If no suitable directory name is found up to root
        """
        original_path = path
        path = abspath(path)
        while path and path != "/":
            success, title, season, episode = self.parse_directory_name(path)
            if success:
                self.logger.debug(f"Found anime title '{title}' from directory: {path}")
                return title, season, episode

            self.logger.debug(f"No anime title in '{path}', trying parent directory")
            parent_path = dirname(path)
            # Stop when the parent is the same as the current path (no
            # further progress is possible).
            if parent_path == path:
                break
            path = parent_path

        # Reached the top without finding a suitable title
        self.logger.error(
            f"Could not extract anime title from directory path: {original_path}"
        )
        self.logger.error("Please specify a directory with a recognizable anime name")
        raise ValueError(f"Could not find anime title in path: {original_path}")

    def load_cached_anilist_id(self, directory: str) -> Optional[int]:
        """
        Look for a file named '.anilist.id' in the given directory and return the AniList ID.

        Parameters
        ----------
        directory : str
            Path to the directory to search for cache file

        Returns
        -------
        int or None
            The cached AniList ID if found and valid, None otherwise
        """
        cache_path = join(directory, self._CACHE_FILENAME)
        if not exists(cache_path):
            return None
        try:
            with open(cache_path, "r", encoding="UTF-8") as f:
                return int(f.read().strip())
        except (OSError, ValueError):
            # Unreadable file, undecodable bytes, or non-integer content.
            self.logger.warning("Failed to read cached AniList ID.")
            return None

    def save_anilist_id(self, directory: str, anilist_id: int) -> None:
        """
        Save the AniList ID to a file named '.anilist.id' in the given directory.

        Failure to write is logged as a warning and otherwise ignored: the
        cache is a best-effort optimisation.

        Parameters
        ----------
        directory : str
            Path to the directory where the cache file should be saved
        anilist_id : int
            The AniList ID to cache

        Returns
        -------
        None
        """
        cache_path = join(directory, self._CACHE_FILENAME)
        try:
            # Same encoding as load_cached_anilist_id uses for reading.
            with open(cache_path, "w", encoding="UTF-8") as f:
                f.write(str(anilist_id))
        except OSError as e:
            self.logger.warning(f"Could not save AniList cache file: {e}")

    def query_anilist(self, title: str) -> int:
        """
        Query AniList's GraphQL API for the given title and return its media ID.

        Parameters
        ----------
        title : str
            The anime title to search for

        Returns
        -------
        int
            The AniList media ID for the title

        Raises
        ------
        ValueError
            If no media is found or an error occurs with the API
        """
        query = """
        query ($search: String) {
          Media(search: $search, type: ANIME) {
            id
            title {
              romaji
              english
              native
            }
          }
        }
        """
        variables = {"search": title}
        try:
            self.logger.debug(f"Sending AniList query for title: {title}")
            response = requests_post(
                self.ANILIST_API_URL,
                json={"query": query, "variables": variables},
                timeout=self.REQUEST_TIMEOUT,
            )
            response.raise_for_status()
            data = response.json()
            self.logger.debug(f"AniList response: {data}")
            # "data" may be present but null; treat that like "no media".
            media = (data.get("data") or {}).get("Media")
            media_id = media.get("id") if media else None
            if media_id is None:
                self.logger.error("AniList: No media found for title.")
                raise ValueError(f"No media found on AniList for title: {title}")
            return media_id
        except Exception as e:
            self.logger.error(f"Error querying AniList: {e}")
            raise ValueError(f"Error querying AniList API: {str(e)}") from e

    def query_jimaku_entries(self, anilist_id: int) -> List[Dict[str, Any]]:
        """
        Query the Jimaku API to list available subtitle entries.

        Parameters
        ----------
        anilist_id : int
            The AniList ID of the anime

        Returns
        -------
        list
            List of entry dictionaries containing subtitle metadata

        Raises
        ------
        ValueError
            If no API token is set, no entries are found, or an error occurs
            with the API
        """
        self._require_api_token()
        try:
            self.logger.debug(f"Querying Jimaku entries for AniList ID: {anilist_id}")
            response = requests_get(
                self.JIMAKU_SEARCH_URL,
                params={"anilist_id": anilist_id},
                headers=self._auth_headers(),
                timeout=self.REQUEST_TIMEOUT,
            )
            response.raise_for_status()
            results = response.json()
            self.logger.debug(f"Jimaku search response: {results}")
            if not results:
                self.logger.error("No subtitle entries found on Jimaku for this media.")
                raise ValueError(
                    f"No subtitle entries found for AniList ID: {anilist_id}"
                )
            return results
        except Exception as e:
            self.logger.error(f"Error querying Jimaku API: {e}")
            raise ValueError(f"Error querying Jimaku API: {str(e)}") from e

    def get_entry_files(self, entry_id: Union[str, int]) -> List[Dict[str, Any]]:
        """
        Retrieve file information for a given entry ID.

        Parameters
        ----------
        entry_id : str or int
            The Jimaku entry ID to retrieve files for

        Returns
        -------
        list
            List of file info dictionaries

        Raises
        ------
        ValueError
            If no API token is set, no files are found, or an error occurs
            with the API
        """
        self._require_api_token()
        url = f"{self.JIMAKU_FILES_BASE}/{entry_id}/files"
        try:
            self.logger.debug(f"Querying files for entry ID: {entry_id}")
            response = requests_get(
                url, headers=self._auth_headers(), timeout=self.REQUEST_TIMEOUT
            )
            response.raise_for_status()
            files = response.json()
            self.logger.debug(f"Entry files response: {files}")
            if not files:
                self.logger.error("No files found for the selected entry.")
                raise ValueError(f"No files found for entry ID: {entry_id}")
            return files
        except Exception as e:
            self.logger.error(f"Error querying files for entry {entry_id}: {e}")
            raise ValueError(f"Error retrieving files: {str(e)}") from e

    def filter_files_by_episode(
        self, files: List[Dict[str, Any]], target_episode: int
    ) -> List[Dict[str, Any]]:
        """
        Filter subtitle files to only include those matching the target episode.

        A file is kept when any number extracted from its name by one of the
        episode patterns equals ``target_episode``, or, failing that, when its
        name contains a keyword hinting at a multi-episode bundle.

        Parameters
        ----------
        files : list
            List of file info dictionaries
        target_episode : int
            Episode number to filter by

        Returns
        -------
        list
            Filtered list of file info dictionaries matching the target episode,
            or all files if no matches are found
        """
        filtered_files = []
        for file_info in files:
            # "name" may be absent or explicitly null in the API payload.
            filename = (file_info.get("name") or "").lower()

            matched = any(
                int(number) == target_episode
                for pattern in self._EPISODE_PATTERNS
                for number in pattern.findall(filename)
            )
            if matched:
                self.logger.debug(f"Matched episode {target_episode} in: {filename}")
                filtered_files.append(file_info)
                continue

            # No explicit match: keep the file if it looks like a batch that
            # could contain the requested episode.
            if any(keyword in filename for keyword in self._ALL_EPISODES_KEYWORDS):
                self.logger.debug(
                    f"Might include episode {target_episode} (batch): {filename}"
                )
                filtered_files.append(file_info)

        if filtered_files:
            self.logger.info(
                f"Found {len(filtered_files)} files matching episode {target_episode}"
            )
            return filtered_files

        # If no matches found, return all files to avoid empty selection
        self.logger.warning(
            f"No files specifically matched episode {target_episode}, showing all options"
        )
        return files

    def fzf_menu(
        self, options: List[str], multi: bool = False
    ) -> Union[str, List[str], None]:
        """
        Launch fzf with the provided options for selection.

        Selected lines are returned exactly as they were passed in (only the
        line breaks are removed), so they can be used as lookup keys.

        Parameters
        ----------
        options : list
            List of strings to present as options
        multi : bool, optional
            Whether to enable multi-select mode (default: False)

        Returns
        -------
        str or list or None
            If multi=False: Selected option string or None if cancelled
            If multi=True: List of selected option strings or empty list if cancelled

        Raises
        ------
        FileNotFoundError
            If the ``fzf`` executable cannot be found
        """
        fzf_args = ["fzf", "--height=40%", "--border"]
        if multi:
            fzf_args.append("--multi")
            self.logger.debug("Launching fzf multi-selection menu")
        else:
            self.logger.debug("Launching fzf single selection menu")

        try:
            proc = subprocess_run(
                fzf_args,
                input="\n".join(options),
                text=True,
                capture_output=True,
                check=True,
            )
        except CalledProcessError:
            # Any non-zero exit status is reported as a cancellation.
            self.logger.warning("User cancelled fzf selection")
            return [] if multi else None

        # Drop blank lines but do not strip the kept ones: stripping would
        # alter options that legitimately carry surrounding whitespace.
        selected = [line for line in proc.stdout.split("\n") if line.strip()]
        if multi:
            return selected
        return selected[0] if selected else ""

    def download_file(self, url: str, dest_path: str) -> str:
        """
        Download the file from the given URL and save it to dest_path.

        Parameters
        ----------
        url : str
            URL to download the file from
        dest_path : str
            Path where the file should be saved

        Returns
        -------
        str
            Path where the file was saved

        Raises
        ------
        ValueError
            If an error occurs during download
        """
        try:
            self.logger.debug(f"Downloading file from: {url}")
            # The context manager closes the streamed response (and releases
            # its connection) even if writing fails part-way.
            with requests_get(
                url, stream=True, timeout=self.REQUEST_TIMEOUT
            ) as response:
                response.raise_for_status()
                with open(dest_path, "wb") as f:
                    for chunk in response.iter_content(chunk_size=8192):
                        if chunk:
                            f.write(chunk)
            self.logger.debug(f"File saved to: {dest_path}")
            return dest_path
        except Exception as e:
            self.logger.error(f"Error downloading subtitle file: {e}")
            raise ValueError(f"Error downloading file: {str(e)}") from e

    @staticmethod
    def _safe_subtitle_filename(
        file_info: Dict[str, Any], media_file: Optional[str] = None
    ) -> str:
        """
        Choose a local file name for a subtitle described by ``file_info``.

        The remote ``name`` is reduced to its final path component so that a
        value such as ``../../x`` cannot escape the destination directory.
        When no usable name is present, the media file's stem plus ``.srt`` is
        used, and ``subtitle.srt`` as a last resort.
        """
        raw_name = file_info.get("name")
        if raw_name:
            # Treat backslashes as separators too before taking the basename.
            candidate = basename(str(raw_name).replace("\\", "/"))
            if candidate and candidate not in (".", ".."):
                return candidate
        if media_file:
            stem = splitext(basename(media_file))[0]
            if stem:
                return f"{stem}.srt"
        return "subtitle.srt"

    def _play_with_mpv(self, media_file: str, subtitle_paths: List[str]) -> None:
        """Launch mpv on ``media_file`` with each downloaded subtitle attached."""
        if not subtitle_paths:
            self.logger.warning("No subtitles were downloaded. Skipping playback.")
            return
        self.logger.info("Launching MPV with the subtitle files...")
        mpv_cmd = ["mpv", media_file]
        # Pass the saved paths, not bare names, so mpv finds the files
        # regardless of the current working directory.
        mpv_cmd.extend(f"--sub-file={path}" for path in subtitle_paths)
        try:
            self.logger.debug(f"Running command: {' '.join(mpv_cmd)}")
            subprocess_run(mpv_cmd)
        except FileNotFoundError:
            self.logger.error(
                "MPV not found. Please install MPV and ensure it is in your PATH."
            )

    def download_subtitles(
        self, media_path: str, dest_dir: Optional[str] = None, play: bool = False
    ) -> List[str]:
        """
        Download subtitles for the given media path.

        This is the main entry point method that orchestrates the entire
        download process: identify the show, resolve its AniList ID (cached
        per directory), let the user pick a Jimaku entry and file(s) via fzf,
        download them, and optionally start playback.

        Parameters
        ----------
        media_path : str
            Path to the media file or directory
        dest_dir : str, optional
            Directory to save downloaded subtitles (default: same directory as media)
        play : bool, default=False
            Whether to launch MPV with the subtitles after download

        Returns
        -------
        list
            List of paths to downloaded subtitle files

        Raises
        ------
        ValueError
            If media path doesn't exist or other errors occur
        """
        if not exists(media_path):
            raise ValueError(f"Path '{media_path}' does not exist")
        self.logger.info("Starting subtitle search and download process")

        # Check if input is a file or directory
        is_directory = self.is_directory_input(media_path)
        self.logger.info(
            f"Processing {'directory' if is_directory else 'file'}: {media_path}"
        )

        # Default destination: the directory itself, or the file's directory
        if not dest_dir:
            dest_dir = media_path if is_directory else dirname(abspath(media_path))
        self.logger.debug(f"Destination directory: {dest_dir}")

        # Parse media information based on input type
        if is_directory:
            title, season, episode = self.find_anime_title_in_path(media_path)
            media_dir = media_path
            media_file = None  # No specific file for directory input
            self.logger.debug(
                f"Found anime title '{title}' but will save subtitles to: {dest_dir}"
            )
        else:
            title, season, episode = self.parse_filename(basename(media_path))
            media_dir = dirname(abspath(media_path))
            media_file = media_path
        self.logger.info(
            f"Identified show: {title}, Season: {season}, Episode: {episode}"
        )

        # Get AniList ID (either from cache or by querying)
        anilist_id = self.load_cached_anilist_id(media_dir)
        if not anilist_id:
            self.logger.info("Querying AniList for media ID...")
            anilist_id = self.query_anilist(title)
            self.logger.info(f"AniList ID for '{title}' is {anilist_id}")
            self.save_anilist_id(media_dir, anilist_id)
        else:
            self.logger.info(f"Using cached AniList ID: {anilist_id}")

        # Query Jimaku for available subtitle entries
        self.logger.info("Querying Jimaku for subtitle entries...")
        entries = self.query_jimaku_entries(anilist_id)

        # Present entries in fzf for selection; the index prefix keeps every
        # option string unique so it can serve as the mapping key.
        entry_options = []
        entry_mapping = {}
        for i, entry in enumerate(entries, start=1):
            opt = f"{i}. {entry.get('english_name', 'No Eng Name')} - {entry.get('japanese_name', 'None')}"
            entry_options.append(opt)
            entry_mapping[opt] = entry
        entry_options.sort()

        self.logger.info("Select a subtitle entry using fzf:")
        # Always single selection for entries
        selected_entry_option = self.fzf_menu(entry_options, multi=False)
        if not selected_entry_option or selected_entry_option not in entry_mapping:
            raise ValueError("No valid entry selected")
        entry_id = entry_mapping[selected_entry_option].get("id")
        if not entry_id:
            raise ValueError("Selected entry does not have a valid ID")

        # Retrieve the files for the selected entry
        self.logger.info(f"Retrieving files for entry ID: {entry_id}")
        files = self.get_entry_files(entry_id)

        # For file input: filter files by episode
        if not is_directory and episode > 0:
            self.logger.info(f"Filtering subtitle files for episode {episode}")
            files = self.filter_files_by_episode(files, episode)

        # Present available subtitle files for selection
        file_options = []
        file_mapping = {}
        for i, file_info in enumerate(files, start=1):
            display = f"{i}. {file_info.get('name', 'Unknown')}"
            file_options.append(display)
            file_mapping[display] = file_info
        file_options.sort()

        # Use multi-select mode only for directory input
        self.logger.info(
            f"Select {'one or more' if is_directory else 'one'} subtitle file(s):"
        )
        selected_files = self.fzf_menu(file_options, multi=is_directory)

        # Normalise the selection to a list regardless of the fzf mode
        if is_directory:
            if not selected_files:
                raise ValueError("No subtitle files selected")
            selected_files_list = selected_files
        else:
            if not selected_files:
                raise ValueError("No subtitle file selected")
            selected_files_list = [selected_files]

        # Download each selected subtitle file
        downloaded_files = []
        for opt in selected_files_list:
            file_info = file_mapping.get(opt)
            if not file_info:
                self.logger.warning(f"Could not find mapping for selected file: {opt}")
                continue
            download_url = file_info.get("url")
            if not download_url:
                self.logger.warning(
                    f"File option '{opt}' does not have a download URL. Skipping."
                )
                continue
            filename = self._safe_subtitle_filename(file_info, media_file)
            dest_path = join(dest_dir, filename)
            self.logger.info(f"Downloading '{opt}' to {dest_path}...")
            self.download_file(download_url, dest_path)
            downloaded_files.append(dest_path)
            self.logger.info(f"Subtitle saved to: {dest_path}")

        # Optionally, launch MPV with the video file and the downloaded subtitles
        if play and is_directory:
            self.logger.warning(
                "Cannot play media with MPV when input is a directory. Skipping playback."
            )
        elif play:
            self._play_with_mpv(media_file, downloaded_files)

        self.logger.info("Subtitle download process completed successfully")
        return downloaded_files