#!/usr/bin/env python3
"""
MPV Immersion Tracker Data Analyzer
Analyzes the CSV data generated by the immersion tracker script
"""

import argparse
import contextlib
import csv
import io
import os
import sys
from collections import Counter, defaultdict
from datetime import datetime, timedelta


class ImmersionAnalyzer:
    """Loads immersion-tracker sessions from a CSV file and reports statistics.

    Expected CSV columns: Title, Duration (seconds), Watch Time (seconds),
    Progress (percent), Start Time, End Time ("%Y-%m-%d %H:%M:%S").
    """

    def __init__(self, csv_file):
        self.csv_file = csv_file
        self.data = []
        self.load_data()

    def load_data(self):
        """Load data from CSV file.

        Rows whose numeric fields or timestamps fail to parse are silently
        skipped so one corrupt line does not abort the whole analysis.
        Exits the process if the file is missing or unreadable.
        """
        if not os.path.exists(self.csv_file):
            print(f"Error: CSV file '{self.csv_file}' not found!")
            sys.exit(1)

        try:
            with open(self.csv_file, "r", encoding="utf-8") as file:
                reader = csv.DictReader(file)
                for row in reader:
                    # Convert numeric fields
                    try:
                        row["Duration"] = int(row["Duration"])
                        row["Watch Time"] = int(row["Watch Time"])
                        row["Progress"] = float(row["Progress"])
                    except (ValueError, KeyError):
                        continue

                    # Parse timestamps
                    try:
                        row["Start Time"] = datetime.strptime(
                            row["Start Time"], "%Y-%m-%d %H:%M:%S"
                        )
                        row["End Time"] = datetime.strptime(
                            row["End Time"], "%Y-%m-%d %H:%M:%S"
                        )
                    except (ValueError, KeyError):
                        continue

                    self.data.append(row)

            print(f"Loaded {len(self.data)} sessions from {self.csv_file}")
        except Exception as e:
            print(f"Error loading CSV file: {e}")
            sys.exit(1)

    def total_watch_time(self):
        """Calculate total watch time.

        Returns:
            (hours, minutes, total_seconds) across all sessions.
        """
        total_seconds = sum(row["Watch Time"] for row in self.data)
        hours = total_seconds // 3600
        minutes = (total_seconds % 3600) // 60
        return hours, minutes, total_seconds

    def total_content_duration(self):
        """Calculate total content duration.

        Returns:
            (hours, minutes, total_seconds) summed over each session's
            full media duration (not just the watched portion).
        """
        total_seconds = sum(row["Duration"] for row in self.data)
        hours = total_seconds // 3600
        minutes = (total_seconds % 3600) // 60
        return hours, minutes, total_seconds

    def average_progress(self):
        """Calculate average completion percentage (0 when no data)."""
        if not self.data:
            return 0
        return sum(row["Progress"] for row in self.data) / len(self.data)

    def most_watched_content(self, limit=10):
        """Find most watched content.

        Returns:
            Up to *limit* (title, total_watch_seconds) pairs, sorted by
            watch time descending.
        """
        content_watch_time = defaultdict(int)
        for row in self.data:
            content_watch_time[row["Title"]] += row["Watch Time"]
        return sorted(content_watch_time.items(), key=lambda x: x[1], reverse=True)[
            :limit
        ]

    def daily_progress(self):
        """Calculate daily watch time as sorted (date, seconds) pairs."""
        daily_watch = defaultdict(int)
        for row in self.data:
            date = row["Start Time"].date()
            daily_watch[date] += row["Watch Time"]
        return sorted(daily_watch.items())

    def weekly_progress(self):
        """Calculate weekly watch time as sorted (week_start, seconds) pairs."""
        weekly_watch = defaultdict(int)
        for row in self.data:
            # Get the week start (Monday)
            date = row["Start Time"].date()
            week_start = date - timedelta(days=date.weekday())
            weekly_watch[week_start] += row["Watch Time"]
        return sorted(weekly_watch.items())

    def monthly_progress(self):
        """Calculate monthly watch time as sorted (month_start, seconds) pairs."""
        monthly_watch = defaultdict(int)
        for row in self.data:
            month_start = row["Start Time"].replace(day=1).date()
            monthly_watch[month_start] += row["Watch Time"]
        return sorted(monthly_watch.items())

    def format_duration(self, seconds):
        """Format a duration in seconds as "Xh Ym" (or "Ym" under an hour)."""
        hours = seconds // 3600
        minutes = (seconds % 3600) // 60
        if hours > 0:
            return f"{hours}h {minutes}m"
        else:
            return f"{minutes}m"

    def print_summary(self):
        """Print a comprehensive summary"""
        print("\n" + "=" * 60)
        print("MPV IMMERSION TRACKER - DATA SUMMARY")
        print("=" * 60)

        if not self.data:
            print("No data found!")
            return

        # Basic stats
        total_hours, total_minutes, total_seconds = self.total_watch_time()
        content_hours, content_minutes, content_seconds = self.total_content_duration()
        avg_progress = self.average_progress()

        print("\nšŸ“Š BASIC STATISTICS:")
        print(f"   Total sessions: {len(self.data)}")
        print(
            f"   Total watch time: {total_hours}h {total_minutes}m ({total_seconds:,} seconds)"
        )
        print(
            f"   Total content duration: {content_hours}h {content_minutes}m ({content_seconds:,} seconds)"
        )
        print(f"   Average completion: {avg_progress:.1f}%")

        # Time analysis
        print("\nā° TIME ANALYSIS:")
        daily_data = self.daily_progress()
        if daily_data:
            total_days = len(daily_data)
            # Integer division so the report shows whole hours/minutes
            # (float division printed e.g. "2.0h 30.0m" here).
            avg_daily = total_seconds // total_days
            avg_daily_h, avg_daily_m = avg_daily // 3600, (avg_daily % 3600) // 60
            print(f"   Active days: {total_days}")
            print(f"   Average daily watch time: {avg_daily_h}h {avg_daily_m}m")

            # Most active day
            most_active_day, most_active_time = max(daily_data, key=lambda x: x[1])
            print(
                f"   Most active day: {most_active_day} ({self.format_duration(most_active_time)})"
            )

        # Content analysis
        print("\nšŸŽ¬ CONTENT ANALYSIS:")
        most_watched = self.most_watched_content(5)
        for i, (title, watch_time) in enumerate(most_watched, 1):
            print(f"   {i}. {title[:50]}{'...' if len(title) > 50 else ''}")
            print(f"      Watch time: {self.format_duration(watch_time)}")

        # Recent activity
        print("\nšŸ“… RECENT ACTIVITY:")
        recent_sessions = sorted(
            self.data, key=lambda x: x["Start Time"], reverse=True
        )[:5]
        for session in recent_sessions:
            date_str = session["Start Time"].strftime("%Y-%m-%d %H:%M")
            print(
                f"   {date_str}: {session['Title'][:40]}{'...' if len(session['Title']) > 40 else ''}"
            )
            print(
                f"      Progress: {session['Progress']:.1f}% ({self.format_duration(session['Watch Time'])})"
            )

    def print_detailed_analysis(self):
        """Print detailed analysis with charts"""
        print("\n" + "=" * 60)
        print("DETAILED ANALYSIS")
        print("=" * 60)

        # Weekly progress chart
        print("\nšŸ“ˆ WEEKLY PROGRESS (Last 8 weeks):")
        weekly_data = self.weekly_progress()
        recent_weeks = weekly_data[-8:] if len(weekly_data) >= 8 else weekly_data
        for week_start, watch_time in recent_weeks:
            week_end = week_start + timedelta(days=6)
            week_range = (
                f"{week_start.strftime('%m/%d')} - {week_end.strftime('%m/%d')}"
            )
            duration_str = self.format_duration(watch_time)
            print(f"   Week of {week_range}: {duration_str}")

        # Monthly progress chart
        print("\nšŸ“Š MONTHLY PROGRESS:")
        monthly_data = self.monthly_progress()
        for month_start, watch_time in monthly_data:
            month_name = month_start.strftime("%B %Y")
            duration_str = self.format_duration(watch_time)
            print(f"   {month_name}: {duration_str}")

        # Progress distribution
        print("\nšŸŽÆ PROGRESS DISTRIBUTION:")
        progress_ranges = {"0-25%": 0, "26-50%": 0, "51-75%": 0, "76-99%": 0, "100%": 0}
        for row in self.data:
            progress = row["Progress"]
            if progress == 100:
                progress_ranges["100%"] += 1
            elif progress >= 76:
                progress_ranges["76-99%"] += 1
            elif progress >= 51:
                progress_ranges["51-75%"] += 1
            elif progress >= 26:
                progress_ranges["26-50%"] += 1
            else:
                progress_ranges["0-25%"] += 1

        for range_name, count in progress_ranges.items():
            percentage = (count / len(self.data)) * 100 if self.data else 0
            print(f"   {range_name}: {count} sessions ({percentage:.1f}%)")

    def export_summary(self, output_file):
        """Export summary to a text file.

        Captures the console report into a buffer and writes it to
        *output_file*. Uses redirect_stdout so stdout is restored even if
        the report code raises (the original left stdout pointed at the
        buffer on error).
        """
        try:
            buffer = io.StringIO()
            with contextlib.redirect_stdout(buffer):
                self.print_summary()
                self.print_detailed_analysis()

            with open(output_file, "w", encoding="utf-8") as f:
                f.write(buffer.getvalue())
            print(f"Summary exported to: {output_file}")
        except Exception as e:
            print(f"Error exporting summary: {e}")


def main():
    """CLI entry point: parse arguments and run the requested analysis."""
    parser = argparse.ArgumentParser(description="Analyze MPV Immersion Tracker data")
    parser.add_argument(
        "csv_file",
        nargs="?",
        default="data/immersion_log.csv",
        help="Path to the CSV file (default: data/immersion_log.csv)",
    )
    parser.add_argument(
        "--export", "-e", metavar="FILE", help="Export summary to specified file"
    )
    parser.add_argument(
        "--detailed", "-d", action="store_true", help="Show detailed analysis"
    )

    args = parser.parse_args()

    # Check if CSV file exists
    if not os.path.exists(args.csv_file):
        print(f"Error: CSV file '{args.csv_file}' not found!")
        print("Make sure you have run the MPV immersion tracker first.")
        sys.exit(1)

    # Create analyzer and run analysis
    analyzer = ImmersionAnalyzer(args.csv_file)

    if args.export:
        analyzer.export_summary(args.export)
    else:
        analyzer.print_summary()
        if args.detailed:
            analyzer.print_detailed_analysis()

    print("\nšŸ’” Tip: Use --export FILENAME to save the analysis to a file")
    print("šŸ’” Tip: Use --detailed for more comprehensive analysis")


if __name__ == "__main__":
    main()