import logging import os import socket import time import urllib.parse from datetime import date import mysql.connector from flask import Flask, jsonify, request from mysql.connector import Error MAX_RETRIES = 5 SOCKET_RETRY_DELAY = 1.5 # Configuration MPV_SOCKET: str = os.getenv("MPV_SOCKET", "/tmp/mpvsocket") HOST_NAME: str = os.getenv("HOST_NAME", "0.0.0.0") PORT_NUMBER: int = int(os.getenv("PORT_NUMBER", "8080")) # MySQL Configuration MYSQL_HOST: str = os.getenv("MYSQL_HOST", "localhost") MYSQL_DATABASE: str = os.getenv("MYSQL_DATABASE", "your_database") MYSQL_USER: str = os.getenv("MYSQL_USER", "your_username") MYSQL_PASSWORD: str = os.getenv("MYSQL_PASSWORD", "your_password") MYSQL_PORT: int = int(os.getenv("MYSQL_PORT", "3306")) LOGLEVEL = os.getenv("LOGLEVEL", "INFO").strip().upper() # Initialize Flask app = Flask(__name__) # Set up logging def setup_logging(): """Sets up logging for both the app and flask.""" if not app.logger.hasHandlers(): # Check if there are already handlers # Create a formatter formatter = logging.Formatter( "%(asctime)s - %(name)s - %(levelname)s - %(message)s" ) # Create a console handler console_handler = logging.StreamHandler() console_handler.setFormatter(formatter) # Configure the root logger app.logger.addHandler(console_handler) # Set the log level based on LOGLEVEL if LOGLEVEL == "DEBUG": app.logger.setLevel(logging.DEBUG) elif LOGLEVEL == "WARNING": app.logger.setLevel(logging.WARNING) elif LOGLEVEL == "ERROR": app.logger.setLevel(logging.ERROR) else: app.logger.setLevel(logging.INFO) # Silence noisy logs from certain libraries if necessary logging.getLogger("logger").setLevel(logging.INFO) # Set up logging setup_logging() def get_mysql_connection(): """ Get a MySQL database connection. -------- Returns -------- connection: mysql.connector.connection.MySQLConnection The MySQL connection object if successful, otherwise None. """ try: connection = mysql.connector.connect( host=MYSQL_HOST, user=MYSQL_USER, password=MYSQL_PASSWORD, port=MYSQL_PORT, ) if connection.is_connected(): app.logger.debug("Connected to database successfully.") return connection except Error as e: app.logger.error(f"Error while connecting to MySQL: {e}") return None def ensure_watch_history_table_exists(): """Ensure the watch_history table exists in the mpv schema, otherwise create it.""" connection = get_mysql_connection() if connection: try: cursor = connection.cursor() cursor.execute("CREATE DATABASE IF NOT EXISTS mpv") cursor.execute( """ CREATE TABLE IF NOT EXISTS mpv.watch_history ( whid INT AUTO_INCREMENT PRIMARY KEY, video_url VARCHAR(255) NOT NULL, video_name VARCHAR(255) NOT NULL, channel_url VARCHAR(255) NOT NULL, channel_name VARCHAR(255) NOT NULL, watch_date DATE NOT NULL ) """ ) connection.commit() app.logger.info("Ensured watch_history table exists") except Error as e: app.logger.error(f"Failed to ensure watch_history table exists: {e}") finally: cursor.close() connection.close() def send_to_mpv(command: str): """Send a command to the mpv socket, retrying up to MAX_RETRIES times if the socket is not available.""" attempts = 0 while attempts < MAX_RETRIES: try: with socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) as client_socket: client_socket.connect(MPV_SOCKET) client_socket.sendall(command.encode("utf-8")) app.logger.info("Command sent to mpv successfully.") return True except socket.error as e: attempts += 1 app.logger.error( f"Failed to connect to socket (attempt {attempts}/{MAX_RETRIES}): {e}. Retrying in {SOCKET_RETRY_DELAY} seconds..." ) time.sleep(SOCKET_RETRY_DELAY) app.logger.error(f"Exceeded maximum retries ({MAX_RETRIES}). Ignoring the request.") return False @app.route("/add_video", methods=["POST"]) def add_video(): """ Adds a video to the mpv queue and the MySQL database. -------- Parameters -------- data: dict The JSON data containing the video information. Required fields: - video_url: str - video_name: str - channel_url: str - channel_name: str e.g. {"video_url": "https://www.youtube.com/watch?v=video_id", "video_name": "video_name", "channel_url": "https://www.youtube.com/channel/channel_id", "channel_name": "channel_name"} """ data = request.get_json() if data: video_url: str = data.get("video_url") video_name: str = data.get("video_name") channel_url: str = data.get("channel_url") channel_name: str = data.get("channel_name") watch_date: date = date.today().strftime("%Y-%m-%d") if video_url and video_name and channel_url and channel_name and watch_date: app.logger.debug(f"Received data: {data}") app.logger.debug(f"Watch date: {watch_date}") # Insert the data into the MySQL database connection = get_mysql_connection() if connection: try: query = """ INSERT INTO mpv.watch_history (video_url, video_name, channel_url, channel_name, watch_date) VALUES (%s, %s, %s, %s, %s) """ cursor = connection.cursor() cursor.execute( query, ( video_url, video_name, channel_url, channel_name, watch_date, ), ) connection.commit() app.logger.info("Data inserted into MySQL database") return ( jsonify(message="Data added to mpv queue and database"), 200, ) except Error as e: app.logger.error(f"Failed to insert data into MySQL database: {e}") return jsonify(message="Failed to add data to database"), 500 finally: cursor.close() connection.close() else: return jsonify(message="Failed to connect to MySQL database"), 500 else: app.logger.error("Missing required data fields") return jsonify(message="Missing required data fields"), 400 else: app.logger.error("Invalid JSON data") return jsonify(message="Invalid JSON data"), 400 @app.route("/", methods=["GET"]) def handle_request(): """ Handle GET requests to the root URL. This function is used to add a video to the mpv queue. """ video_url = request.args.get("url") if video_url: video_url = urllib.parse.unquote(video_url) # Decode the URL app.logger.info(f"Received URL: {video_url}") # Create the command to send to mpv command = f'{{"command": ["script-message", "add_to_youtube_queue", "{video_url}"]}}\n' # Try to send the command to mpv if send_to_mpv(command): return "URL added to mpv queue", 200 else: return "Failed to add URL to mpv queue after max retries", 500 else: app.logger.error("Missing 'url' parameter") return "Missing 'url' parameter", 400 if __name__ == "__main__": app.logger.info(f"Starting server on {HOST_NAME}:{PORT_NUMBER}...") ensure_watch_history_table_exists() try: app.run(host=HOST_NAME, port=PORT_NUMBER) except Exception as e: app.logger.exception(f"Error occurred: {e}") except KeyboardInterrupt: app.logger.info("Server is shutting down...") app.logger.info("Server stopped.")