initial commit

This commit is contained in:
2025-08-28 20:48:10 -07:00
commit 69a7bd4f98
7 changed files with 702 additions and 0 deletions

332
teppei.py Executable file
View File

@@ -0,0 +1,332 @@
#!/usr/bin/env python
import logging
import os
import time
from argparse import ArgumentParser, RawDescriptionHelpFormatter
from pathlib import Path
from typing import Optional, Tuple
import requests
from selenium import webdriver
from selenium.webdriver.chrome.options import Options
from selenium.webdriver.common.by import By
from selenium.webdriver.support import expected_conditions as EC
from selenium.webdriver.support.ui import WebDriverWait
# Route timestamped records at INFO and above through the root logger.
_LOG_FORMAT = "%(asctime)s - %(levelname)s - %(message)s"
logging.basicConfig(level=logging.INFO, format=_LOG_FORMAT)
logger = logging.getLogger(__name__)

# Constants
# URL templates: "{}" is replaced with the episode number.
AUDIO_BASE_URL = "https://www.heypera.com/listen/nihongo-con-teppei-for-beginners/{}/next"
SUB_BASE_URL = (
    "https://storage.googleapis.com/pera-transcripts/"
    "nihongo-con-teppei-for-beginners/transcripts/{}.vtt"
)

# Seconds between requests.
DEFAULT_DELAY = 5
# Seconds allowed for each HTTP request.
DEFAULT_TIMEOUT = 30
# Default number of attempts per file download.
MAX_RETRIES = 3
class TeppeiDownloader:
    """Fetch Nihongo con Teppei episode audio and subtitle files.

    Audio URLs are discovered by loading the episode page in headless Chrome
    (Selenium) and reading the ``src`` of its ``<audio>`` element; subtitle
    URLs are built directly from ``SUB_BASE_URL``. Files are fetched through
    a shared ``requests.Session``.

    Attributes:
        delay: Seconds slept between download retries, between the audio and
            subtitle downloads of one episode, and between episodes.
        timeout: Timeout in seconds passed to every HTTP GET.
        session: ``requests.Session`` carrying a browser-like User-Agent.
    """

    def __init__(self, delay: float = DEFAULT_DELAY, timeout: int = DEFAULT_TIMEOUT):
        """Store the pacing settings and create the shared HTTP session."""
        self.delay = delay
        self.timeout = timeout
        self.session = requests.Session()
        self.session.headers.update({
            'User-Agent': 'Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36'
        })

    def get_audio_url(self, episode_num: int) -> Optional[str]:
        """Get audio URL using Selenium scraping.

        Starts a fresh headless Chrome instance, loads the episode page and
        waits up to 15 seconds for an ``<audio>`` element.

        Returns:
            The element's ``src`` attribute, or ``None`` when it is empty or
            any error occurs (the error is logged, not raised).
        """
        chrome_options = Options()
        chrome_options.add_argument("--headless")
        chrome_options.add_argument("--disable-gpu")
        chrome_options.add_argument("--no-sandbox")
        chrome_options.add_argument("--disable-dev-shm-usage")
        chrome_options.add_argument("--window-size=1920,1080")

        driver = None
        try:
            driver = webdriver.Chrome(options=chrome_options)
            driver.get(AUDIO_BASE_URL.format(episode_num))

            # Wait for the audio element to be present
            audio_element = WebDriverWait(driver, 15).until(
                EC.presence_of_element_located((By.TAG_NAME, "audio"))
            )
            audio_url = audio_element.get_attribute("src")
            if audio_url:
                logger.info(f"Found audio URL for episode {episode_num}")
                return audio_url
            logger.error(f"No audio URL found for episode {episode_num}")
            return None
        except Exception as e:
            # Best-effort boundary: any browser/driver failure is reported to
            # the caller as "no URL" so the remaining work can continue.
            logger.error(f"Error getting audio URL for episode {episode_num}: {e}")
            return None
        finally:
            # Always shut the browser down, even when scraping failed.
            if driver is not None:
                driver.quit()

    def get_sub_url(self, episode_num: int) -> str:
        """Get subtitle URL (direct URL construction)."""
        return SUB_BASE_URL.format(episode_num)

    def verify_download(self, filename: str, expected_size: Optional[int] = None) -> bool:
        """Verify that a downloaded file exists and has reasonable content.

        Args:
            filename: Path of the file to check.
            expected_size: Expected size in bytes; when given (and non-zero)
                the file must be within 10% of it.

        Returns:
            ``True`` when every check passes. ``False`` when the file is
            missing, empty, a ``.vtt`` file with fewer than 10 characters of
            stripped text or that is not valid UTF-8, differs too much from
            ``expected_size``, or cannot be read.
        """
        if not os.path.exists(filename):
            logger.warning(f"File verification failed: {filename} does not exist")
            return False

        try:
            file_size = os.path.getsize(filename)
            if file_size == 0:
                logger.warning(f"File verification failed: {filename} is empty (0 bytes)")
                return False

            # For subtitle files, check if they have at least some basic content
            if filename.endswith('.vtt'):
                with open(filename, 'r', encoding='utf-8') as f:
                    content = f.read().strip()
                if len(content) < 10:  # Very basic check for minimal content
                    logger.warning(f"File verification failed: {filename} appears to have insufficient content")
                    return False

            # If expected size is provided, check if it's reasonable
            if expected_size and abs(file_size - expected_size) > expected_size * 0.1:
                logger.warning(f"File verification failed: {filename} size ({file_size} bytes) differs significantly from expected ({expected_size} bytes)")
                # A logged failure must also be reported as one; previously
                # execution fell through and returned True.
                return False

            logger.debug(f"File verification passed: {filename} ({file_size} bytes)")
            return True
        except (OSError, UnicodeDecodeError) as e:
            # UnicodeDecodeError: a .vtt that is not valid UTF-8 is treated as
            # a failed verification instead of crashing the whole run.
            logger.warning(f"File verification failed: {filename} - {e}")
            return False

    def download_file(self, url: str, filename: str, retries: int = MAX_RETRIES) -> bool:
        """Download a file with retry logic and proper error handling.

        The response body is held in memory, written to ``filename`` and then
        checked with ``verify_download``; a file that fails verification is
        removed before the next attempt. ``self.delay`` seconds are slept
        between attempts.

        Args:
            url: Address to GET through the shared session.
            filename: Destination path.
            retries: Maximum number of attempts.

        Returns:
            ``True`` once a download has been written and verified, ``False``
            after all attempts have failed.

        Raises:
            OSError: If the destination file cannot be written.
        """
        for attempt in range(retries):
            try:
                logger.info(f"Downloading {filename} (attempt {attempt + 1}/{retries})")
                response = self.session.get(url, timeout=self.timeout)

                if response.status_code == 200:
                    with open(filename, "wb") as file:
                        file.write(response.content)

                    # Verify the download was successful
                    if self.verify_download(filename, len(response.content)):
                        logger.info(f"Successfully downloaded and verified {filename}")
                        return True

                    logger.warning(f"Download verification failed for {filename}, retrying...")
                    # Remove the corrupted/incomplete file
                    try:
                        os.remove(filename)
                    except OSError:
                        pass  # Ignore if file doesn't exist or can't be removed
                else:
                    logger.warning(f"HTTP {response.status_code} for {filename}")
            except requests.exceptions.RequestException as e:
                logger.warning(f"Download attempt {attempt + 1} failed for {filename}: {e}")

            # Pause before the next attempt, but not after the last one.
            if attempt < retries - 1:
                time.sleep(self.delay)

        logger.error(f"Failed to download {filename} after {retries} attempts")
        return False

    def file_exists(self, filepath: str) -> bool:
        """Check if file exists and has content."""
        if not os.path.exists(filepath):
            return False
        # Check if file has content (not empty)
        try:
            return os.path.getsize(filepath) > 0
        except OSError:
            return False

    def download_episode(self, episode_num: int, output_dir: str, force: bool = False) -> Tuple[bool, bool]:
        """Download both audio and subtitle files for an episode.

        Args:
            episode_num: Episode to fetch.
            output_dir: Directory for the files; created if missing.
            force: Re-download even when a non-empty file already exists.

        Returns:
            ``(audio_success, sub_success)``. A file that already exists and
            is skipped counts as a success.
        """
        output_path = Path(output_dir)
        output_path.mkdir(parents=True, exist_ok=True)

        audio_filename = output_path / f"Nihongo-Con-Teppei-E{episode_num:02d}.mp3"
        sub_filename = output_path / f"Nihongo-Con-Teppei-E{episode_num:02d}.vtt"

        audio_success = True
        sub_success = True

        # Download audio file
        if force or not self.file_exists(str(audio_filename)):
            audio_url = self.get_audio_url(episode_num)
            if audio_url:
                audio_success = self.download_file(audio_url, str(audio_filename))
            else:
                audio_success = False
                logger.error(f"Could not get audio URL for episode {episode_num}")

            # Add delay between audio and subtitle downloads
            if audio_success:
                time.sleep(self.delay)
        else:
            logger.info(f"Audio file already exists: {audio_filename}")

        # Download subtitle file
        if force or not self.file_exists(str(sub_filename)):
            sub_url = self.get_sub_url(episode_num)
            sub_success = self.download_file(sub_url, str(sub_filename))
        else:
            logger.info(f"Subtitle file already exists: {sub_filename}")

        return audio_success, sub_success

    def download_range(self, start: int, end: int, output_dir: str, force: bool = False) -> None:
        """Download a range of episodes with progress tracking.

        Episodes ``start`` through ``end`` (inclusive) are processed in order
        with ``self.delay`` seconds between them. Progress and the final
        success count are reported through the log only.
        """
        if start > end:
            logger.error("Start episode must be less than or equal to end episode")
            return

        total_episodes = end - start + 1
        successful_downloads = 0

        logger.info(f"Starting download of episodes {start} to {end} ({total_episodes} episodes)")

        for episode in range(start, end + 1):
            logger.info(f"Processing episode {episode}/{end} ({episode-start+1}/{total_episodes})")

            audio_success, sub_success = self.download_episode(episode, output_dir, force)
            if audio_success and sub_success:
                successful_downloads += 1

            # Add delay between episodes to be respectful to the server
            if episode < end:  # Don't delay after the last episode
                logger.info(f"Waiting {self.delay} seconds before next episode...")
                time.sleep(self.delay)

        logger.info(f"Download complete! Successfully downloaded {successful_downloads}/{total_episodes} episodes")
def parse_args(argv=None):
    """Parse and validate the command-line options.

    Args:
        argv: Argument strings to parse. ``None`` (the default) makes
            argparse read the process command line.

    Returns:
        The populated ``argparse.Namespace`` with ``episode_num``, ``start``,
        ``end``, ``download``, ``output``, ``force``, ``delay`` and
        ``timeout``.

    Exits through ``parser.error`` when ``--delay`` is negative or not a
    finite number, or when ``--timeout`` is not greater than zero.
    """
    parser = ArgumentParser(
        description="Download Nihongo Con Teppei episodes with audio and subtitles",
        formatter_class=RawDescriptionHelpFormatter,
        epilog="""
Examples:
%(prog)s 11 --download # Download episode 11
%(prog)s --start 11 --end 15 --download # Download episodes 11-15
%(prog)s 11 --download --force # Force re-download episode 11
%(prog)s --start 1 --end 20 --download --output ./teppei_episodes
"""
    )

    # Single episode mode
    parser.add_argument(
        "episode_num",
        type=int,
        nargs='?',
        help="Episode number to download (for single episode mode)"
    )

    # Range mode
    parser.add_argument(
        "--start",
        type=int,
        help="Starting episode number for range download"
    )
    parser.add_argument(
        "--end",
        type=int,
        help="Ending episode number for range download"
    )

    # Download options
    parser.add_argument(
        "-d", "--download",
        action="store_true",
        help="Download the files (if not specified, only show URLs)"
    )
    parser.add_argument(
        "-o", "--output",
        default=".",
        help="Output directory (default: current directory)"
    )
    parser.add_argument(
        "--force",
        action="store_true",
        help="Force re-download even if files already exist"
    )
    parser.add_argument(
        "--delay",
        type=float,
        default=DEFAULT_DELAY,
        help=f"Delay between requests in seconds (default: {DEFAULT_DELAY})"
    )
    parser.add_argument(
        "--timeout",
        type=int,
        default=DEFAULT_TIMEOUT,
        help=f"HTTP request timeout in seconds (default: {DEFAULT_TIMEOUT})"
    )

    args = parser.parse_args(argv)

    # The delay is handed to time.sleep(), which rejects negative values; the
    # chained comparison also rejects NaN and infinity.
    if not (0 <= args.delay < float("inf")):
        parser.error("--delay must be a non-negative, finite number of seconds")
    # The timeout is handed to the HTTP client, which needs a positive value.
    if args.timeout <= 0:
        parser.error("--timeout must be greater than 0")

    return args
def main():
    """Run the command-line tool.

    Supports a single episode (positional number) or an inclusive range
    (``--start`` and ``--end`` together). Without ``--download`` only the
    audio and subtitle URLs are printed; with it the files are downloaded.

    Returns:
        Process exit status: ``0`` on success, ``1`` when the arguments are
        invalid, an audio URL could not be retrieved, or a single-episode
        download failed.
    """
    args = parse_args()
    range_mode = args.start is not None and args.end is not None

    # Validate arguments
    if range_mode:
        if args.start < 1 or args.end < 1:
            logger.error("Episode numbers must be greater than 0")
            return 1
        if args.start > args.end:
            logger.error("Start episode must be less than or equal to end episode")
            return 1
        episodes = range(args.start, args.end + 1)
    elif args.episode_num is not None:
        if args.episode_num < 1:
            logger.error("Episode number must be greater than 0")
            return 1
        episodes = [args.episode_num]
    else:
        logger.error("Must specify either a single episode number or a range (--start and --end)")
        return 1

    # Create downloader instance
    downloader = TeppeiDownloader(delay=args.delay, timeout=args.timeout)

    if args.download:
        if range_mode:
            # download_range reports per-episode results through the log only,
            # so there is no failure status to propagate here.
            downloader.download_range(args.start, args.end, args.output, args.force)
            return 0

        episode = args.episode_num
        audio_success, sub_success = downloader.download_episode(episode, args.output, args.force)
        if audio_success and sub_success:
            logger.info(f"Successfully downloaded episode {episode}")
            return 0
        logger.error(f"Failed to download episode {episode}")
        return 1

    # Just show URLs. This applies to range mode as well, so that omitting
    # --download never downloads anything.
    all_found = True
    for index, episode in enumerate(episodes):
        if index:
            # Pace the page loads the same way downloads are paced.
            time.sleep(args.delay)
        if range_mode:
            print(f"Episode {episode}")
        audio_url = downloader.get_audio_url(episode)
        sub_url = downloader.get_sub_url(episode)
        if audio_url:
            print(f"Audio URL: {audio_url}")
        else:
            print("Could not retrieve audio URL")
            all_found = False
        print(f"Subtitle URL: {sub_url}")
    return 0 if all_found else 1


if __name__ == "__main__":
    # Propagate main()'s status so shells and scripts can detect failures.
    raise SystemExit(main())