"""Views for the Flask app.""" import re from urllib.parse import urlparse from flask import Blueprint, abort, current_app, g, jsonify, request from sqlalchemy import inspect from sqlalchemy.exc import SQLAlchemyError from app.database import get_db_session from app.models import SavedQueue, WatchHistory from app.mpv import send_to_mpv from app.utils import (fetch_video_info, is_valid_url, sanitize_video_data, validate_video_data) bp = Blueprint("views", __name__) @bp.before_request def before_request(): g.db_session = get_db_session() @bp.teardown_app_request def shutdown_session(exception=None): if hasattr(g, "db_session"): g.db_session.close() @bp.route("/save_queue", methods=["POST"]) def save_queue(): data = request.get_json() if not data or "urls" not in data: return jsonify(message="Invalid JSON data"), 400 db_session = g.db_session db_session.query(SavedQueue).delete() for url in data["urls"]: new_entry = SavedQueue(video_url=url) db_session.add(new_entry) try: db_session.commit() return jsonify(message="Data added to saved queue"), 200 except SQLAlchemyError as e: db_session.rollback() current_app.logger.error(f"Failed to insert data: {e}") return jsonify(message="Database error"), 500 @bp.route("/load_queue", methods=["GET"]) def load_queue(): """ Retrieves the saved queue of video URLs. """ current_app.logger.debug("Loading saved queue") db_session = g.db_session urls = [entry.video_url for entry in db_session.query(SavedQueue).all()] current_app.logger.debug(f"Loaded {len(urls)} URLs from the saved queue") return jsonify(urls), 200 @bp.route("/add_video", methods=["POST"]) def add_video(): data = request.get_json() if not data or "video_url" not in data: current_app.logger.error("Missing video_url field") return jsonify(message="Missing video_url field"), 400 video_url = data.get("video_url") # Check if it's a local directory or file path (not starting with http/https) is_local_path = not video_url.startswith("http://") and not video_url.startswith( "https://" ) if is_local_path: current_app.logger.info(f"Processing local path: {video_url}") # Create basic metadata for local path sanitized_data = { "video_url": video_url, "video_name": video_url.split("/")[-1] or "Local Directory", "channel_url": "local://", "channel_name": "Local Media", "category": "Local", "view_count": 0, "subscribers": 0, "thumbnail_url": "", "upload_date": None, } else: # Regular YouTube URL processing # Validate video URL format and allowed domains if not is_valid_url(video_url, allowed_domains=["youtube.com", "youtu.be"]): current_app.logger.error("Invalid video URL format or domain") return jsonify(message="Invalid video URL"), 400 # Check if we only have video_url if all(key == "video_url" for key in data.keys()): current_app.logger.info( "Only video_url provided. Fetching additional information..." ) video_info = fetch_video_info(video_url) if not video_info: return jsonify(message="Failed to fetch video information"), 400 # Replace the data with our fetched info data = video_info # Validate all required fields exist if not all( k in data for k in ["video_url", "video_name", "channel_url", "channel_name"] ): current_app.logger.error("Missing required fields after fetching") return jsonify(message="Missing required fields"), 400 # Validate and sanitize all inputs validation_errors = validate_video_data(data) if validation_errors: current_app.logger.error(f"Validation errors: {validation_errors}") return jsonify(message="Invalid input data", errors=validation_errors), 400 # Sanitize string inputs sanitized_data = sanitize_video_data(data) new_entry = WatchHistory( video_url=sanitized_data["video_url"], video_name=sanitized_data["video_name"], channel_url=sanitized_data["channel_url"], channel_name=sanitized_data["channel_name"], category=sanitized_data.get("category"), view_count=sanitized_data.get("view_count"), subscriber_count=sanitized_data.get("subscribers"), thumbnail_url=sanitized_data.get("thumbnail_url"), upload_date=sanitized_data.get("upload_date"), ) db_session = g.db_session try: current_app.logger.debug("Adding video to watch history") current_app.logger.debug(f"Data: {sanitized_data}") db_session.add(new_entry) db_session.commit() current_app.logger.debug("Video added to watch history") return jsonify(message="Video added"), 200 except SQLAlchemyError as e: db_session.rollback() current_app.logger.error(f"Database error: {e}") return jsonify(message="Failed to add video"), 500 @bp.route("/queue", methods=["POST"]) def handle_queue(): data = request.get_json() if not data or "url" not in data: current_app.logger.warning("Request missing 'url' parameter") return jsonify(message="Missing 'url' parameter"), 400 video_url = data["url"] # Basic URL validation if not isinstance(video_url, str) or not video_url.strip(): current_app.logger.warning(f"Invalid URL format: {repr(video_url)}") return jsonify(message="Invalid URL format"), 400 # URL validation to check for http/https protocol if not (video_url.startswith("http://") or video_url.startswith("https://")): current_app.logger.warning(f"URL missing protocol: {repr(video_url)}") return jsonify(message="URL must start with http:// or https://"), 400 # Validate URL structure try: parsed_url = urlparse(video_url) if not all([parsed_url.scheme, parsed_url.netloc]): current_app.logger.warning(f"Invalid URL structure: {repr(video_url)}") return jsonify(message="Invalid URL structure"), 400 except Exception as e: current_app.logger.error(f"Failed to parse URL {repr(video_url)}: {str(e)}") return jsonify(message="Failed to parse URL"), 400 # Sanitize the URL to prevent command injection sanitized_url = video_url.replace('"', '\\"') command = f'{{"command": ["script-message", "add_to_youtube_queue", "{sanitized_url}"]}}\n' current_app.logger.debug(f"Sending URL to MPV: {repr(sanitized_url)}") if send_to_mpv(command): current_app.logger.info("Successfully added URL to MPV queue") return jsonify(message="URL added to mpv queue"), 200 current_app.logger.error("Failed to add URL to MPV queue") return jsonify(message="Failed to add URL to mpv queue"), 500 @bp.route("/migrate_watch_history", methods=["POST"]) def migrate_watch_history(): db_session = g.db_session engine = db_session.get_bind() try: # First check and add missing columns inspector = inspect(engine) existing_columns = [ col["name"] for col in inspector.get_columns("watch_history") ] # Define new columns and their SQL new_columns = { "category": "ALTER TABLE watch_history ADD COLUMN category VARCHAR(100)", "view_count": "ALTER TABLE watch_history ADD COLUMN view_count INTEGER", "subscriber_count": "ALTER TABLE watch_history ADD COLUMN subscriber_count INTEGER", "thumbnail_url": "ALTER TABLE watch_history ADD COLUMN thumbnail_url VARCHAR(255)", "upload_date": "ALTER TABLE watch_history ADD COLUMN upload_date TIMESTAMP", } # Add missing columns columns_added = [] for col_name, sql in new_columns.items(): if col_name not in existing_columns: engine.execute(sql) columns_added.append(col_name) current_app.logger.info(f"Added column: {col_name}") # Now backfill with default values entries = db_session.query(WatchHistory).all() updated_count = 0 for entry in entries: updated = False # Check and set defaults for new columns if they're None if entry.category is None: entry.category = "Unknown" updated = True if entry.view_count is None: entry.view_count = 0 updated = True if entry.subscriber_count is None: entry.subscriber_count = 0 updated = True if entry.thumbnail_url is None: entry.thumbnail_url = "" updated = True if entry.upload_date is None: # Set to watch_date as a fallback entry.upload_date = entry.watch_date updated = True if updated: updated_count += 1 db_session.commit() return ( jsonify( { "message": "Migration completed successfully", "columns_added": columns_added, "records_updated": updated_count, "total_records": len(entries), } ), 200, ) except SQLAlchemyError as e: db_session.rollback() current_app.logger.error(f"Migration failed: {e}") return jsonify(message=f"Failed to migrate watch history: {str(e)}"), 500