import crypto from "node:crypto";
import path from "node:path";
import { spawn } from "node:child_process";
import { DatabaseSync } from "node:sqlite";
import * as fs from "node:fs";
import { createLogger } from "../../logger";

const SCHEMA_VERSION = 1;
const DEFAULT_QUEUE_CAP = 1_000;
const DEFAULT_BATCH_SIZE = 25;
const DEFAULT_FLUSH_INTERVAL_MS = 500;
const DEFAULT_MAINTENANCE_INTERVAL_MS = 24 * 60 * 60 * 1000;
const ONE_WEEK_MS = 7 * 24 * 60 * 60 * 1000;
const EVENTS_RETENTION_MS = ONE_WEEK_MS;
const VACUUM_INTERVAL_MS = ONE_WEEK_MS;
const TELEMETRY_RETENTION_MS = 30 * 24 * 60 * 60 * 1000;
const DAILY_ROLLUP_RETENTION_MS = 365 * 24 * 60 * 60 * 1000;
const MONTHLY_ROLLUP_RETENTION_MS = 5 * 365 * 24 * 60 * 60 * 1000;
// Compared against JSON string length (UTF-16 code units), not encoded bytes.
const MAX_PAYLOAD_BYTES = 256;

const SOURCE_TYPE_LOCAL = 1;
const SOURCE_TYPE_REMOTE = 2;

const SESSION_STATUS_ACTIVE = 1;
const SESSION_STATUS_ENDED = 2;

// Values stored in imm_session_events.event_type.
const EVENT_SUBTITLE_LINE = 1;
const EVENT_MEDIA_BUFFER = 2;
const EVENT_LOOKUP = 3;
const EVENT_CARD_MINED = 4;
const EVENT_SEEK_FORWARD = 5;
const EVENT_SEEK_BACKWARD = 6;
const EVENT_PAUSE_START = 7;
const EVENT_PAUSE_END = 8;

export interface ImmersionTrackerOptions {
  dbPath: string;
}

/** Running (cumulative) per-session counters; snapshots are written to imm_session_telemetry. */
interface TelemetryAccumulator {
  totalWatchedMs: number;
  activeWatchedMs: number;
  linesSeen: number;
  wordsSeen: number;
  tokensSeen: number;
  cardsMined: number;
  lookupCount: number;
  lookupHits: number;
  pauseCount: number;
  pauseMs: number;
  seekForwardCount: number;
  seekBackwardCount: number;
  mediaBufferEvents: number;
}

/** In-memory state for the single active session. */
interface SessionState extends TelemetryAccumulator {
  sessionId: number;
  videoId: number;
  startedAtMs: number;
  currentLineIndex: number;
  // 0 means "no playback position observed yet".
  lastWallClockMs: number;
  lastMediaMs: number | null;
  lastPauseStartMs: number | null;
  isPaused: boolean;
  // True when counters changed since the last queued telemetry snapshot.
  pendingTelemetry: boolean;
}

/**
 * A pending database write. `kind: "telemetry"` uses the counter fields;
 * `kind: "event"` uses the event fields. `sampleMs` is the row timestamp for both.
 */
interface QueuedWrite {
  kind: "telemetry" | "event";
  sessionId: number;
  sampleMs?: number;
  totalWatchedMs?: number;
  activeWatchedMs?: number;
  linesSeen?: number;
  wordsSeen?: number;
  tokensSeen?: number;
  cardsMined?: number;
  lookupCount?: number;
  lookupHits?: number;
  pauseCount?: number;
  pauseMs?: number;
  seekForwardCount?: number;
  seekBackwardCount?: number;
  mediaBufferEvents?: number;
  eventType?: number;
  lineIndex?: number | null;
  segmentStartMs?: number | null;
  segmentEndMs?: number | null;
  wordsDelta?: number;
  cardsDelta?: number;
  payloadJson?: string | null;
}

interface VideoMetadata {
  sourceType: number;
  canonicalTitle: string;
  durationMs: number;
  fileSizeBytes: number | null;
  codecId: number | null;
  containerId: number | null;
  widthPx: number | null;
  heightPx: number | null;
  fpsX100: number | null;
  bitrateKbps: number | null;
  audioCodecId: number | null;
  hashSha256: string | null;
  screenshotPath: string | null;
  metadataJson: string | null;
}

export interface SessionSummaryQueryRow {
  videoId: number | null;
  startedAtMs: number;
  endedAtMs: number | null;
  totalWatchedMs: number;
  activeWatchedMs: number;
  linesSeen: number;
  wordsSeen: number;
  tokensSeen: number;
  cardsMined: number;
  lookupCount: number;
  lookupHits: number;
}

export interface SessionTimelineRow {
  sampleMs: number;
  totalWatchedMs: number;
  activeWatchedMs: number;
  linesSeen: number;
  wordsSeen: number;
  tokensSeen: number;
  cardsMined: number;
}

export interface ImmersionSessionRollupRow {
  rollupDayOrMonth: number;
  videoId: number | null;
  totalSessions: number;
  totalActiveMin: number;
  totalLinesSeen: number;
  totalWordsSeen: number;
  totalTokensSeen: number;
  totalCards: number;
  cardsPerHour: number | null;
  wordsPerMin: number | null;
  lookupHitRate: number | null;
}

/**
 * Records immersion (media watching) sessions into a local SQLite database.
 *
 * Writes are queued in memory and flushed in batched transactions on a timer;
 * a periodic maintenance pass prunes old rows and recomputes daily/monthly rollups.
 */
export class ImmersionTrackerService {
  private readonly logger = createLogger("main:immersion-tracker");
  private readonly db: DatabaseSync;
  private readonly queue: QueuedWrite[] = [];
  private readonly queueCap: number;
  private readonly batchSize: number;
  private readonly flushIntervalMs: number;
  private readonly maintenanceIntervalMs: number;
  private readonly dbPath: string;
  // Guards against re-entrant flushes while a transaction is open.
  private readonly writeLock = { locked: false };
  private flushTimer: ReturnType<typeof setTimeout> | null = null;
  private maintenanceTimer: ReturnType<typeof setInterval> | null = null;
  private flushScheduled = false;
  private droppedWriteCount = 0;
  private lastMaintenanceMs = 0;
  private lastVacuumMs = 0;
  private isDestroyed = false;
  private sessionState: SessionState | null = null;
  private currentVideoKey = "";
  private currentMediaPathOrUrl = "";
  private lastQueueWriteAtMs = 0;
  private readonly telemetryInsertStmt: ReturnType<DatabaseSync["prepare"]>;
  private readonly eventInsertStmt: ReturnType<DatabaseSync["prepare"]>;

  /**
   * Opens (creating if needed) the database at `options.dbPath`, ensures the schema,
   * prepares insert statements, runs an initial maintenance pass and starts the timers.
   */
  constructor(options: ImmersionTrackerOptions) {
    this.dbPath = options.dbPath;
    const parentDir = path.dirname(this.dbPath);
    if (!fs.existsSync(parentDir)) {
      fs.mkdirSync(parentDir, { recursive: true });
    }

    this.queueCap = DEFAULT_QUEUE_CAP;
    this.batchSize = DEFAULT_BATCH_SIZE;
    this.flushIntervalMs = DEFAULT_FLUSH_INTERVAL_MS;
    this.maintenanceIntervalMs = DEFAULT_MAINTENANCE_INTERVAL_MS;
    this.lastMaintenanceMs = Date.now();

    this.db = new DatabaseSync(this.dbPath);
    this.applyPragmas();
    this.ensureSchema();

    this.telemetryInsertStmt = this.db.prepare(`
      INSERT INTO imm_session_telemetry (
        session_id, sample_ms, total_watched_ms, active_watched_ms,
        lines_seen, words_seen, tokens_seen, cards_mined, lookup_count,
        lookup_hits, pause_count, pause_ms, seek_forward_count,
        seek_backward_count, media_buffer_events
      ) VALUES (
        ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?
      )
    `);
    this.eventInsertStmt = this.db.prepare(`
      INSERT INTO imm_session_events (
        session_id, ts_ms, event_type, line_index, segment_start_ms,
        segment_end_ms, words_delta, cards_delta, payload_json
      ) VALUES (
        ?, ?, ?, ?, ?, ?, ?, ?, ?
      )
    `);

    this.scheduleMaintenance();
    this.scheduleFlush();
  }

  /**
   * Ends the active session, writes every remaining queued write, stops all
   * timers and closes the database. Idempotent.
   */
  destroy(): void {
    if (this.isDestroyed) return;
    if (this.maintenanceTimer) {
      clearInterval(this.maintenanceTimer);
      this.maintenanceTimer = null;
    }
    this.cancelScheduledFlush();
    try {
      this.finalizeActiveSession();
      // flushNow() writes at most one batch; drain so queued writes are not lost.
      this.drainQueue();
    } finally {
      // Flushes above may have scheduled a new timer; cancel it before closing.
      this.cancelScheduledFlush();
      this.isDestroyed = true;
      this.db.close();
    }
  }

  /**
   * Returns the most recent sessions with their final counters.
   *
   * Telemetry rows are cumulative snapshots, so the session totals are the
   * per-session MAX of each counter (summing snapshots would overcount).
   */
  async getSessionSummaries(
    limit = 50,
  ): Promise<SessionSummaryQueryRow[]> {
    const prepared = this.db.prepare(`
      SELECT
        s.video_id AS videoId,
        s.started_at_ms AS startedAtMs,
        s.ended_at_ms AS endedAtMs,
        COALESCE(MAX(t.total_watched_ms), 0) AS totalWatchedMs,
        COALESCE(MAX(t.active_watched_ms), 0) AS activeWatchedMs,
        COALESCE(MAX(t.lines_seen), 0) AS linesSeen,
        COALESCE(MAX(t.words_seen), 0) AS wordsSeen,
        COALESCE(MAX(t.tokens_seen), 0) AS tokensSeen,
        COALESCE(MAX(t.cards_mined), 0) AS cardsMined,
        COALESCE(MAX(t.lookup_count), 0) AS lookupCount,
        COALESCE(MAX(t.lookup_hits), 0) AS lookupHits
      FROM imm_sessions s
      LEFT JOIN imm_session_telemetry t ON t.session_id = s.session_id
      GROUP BY s.session_id
      ORDER BY s.started_at_ms DESC
      LIMIT ?
    `);
    return prepared.all(limit) as unknown as SessionSummaryQueryRow[];
  }

  /** Returns up to `limit` telemetry snapshots for a session, newest first. */
  async getSessionTimeline(
    sessionId: number,
    limit = 200,
  ): Promise<SessionTimelineRow[]> {
    const prepared = this.db.prepare(`
      SELECT
        sample_ms AS sampleMs,
        total_watched_ms AS totalWatchedMs,
        active_watched_ms AS activeWatchedMs,
        lines_seen AS linesSeen,
        words_seen AS wordsSeen,
        tokens_seen AS tokensSeen,
        cards_mined AS cardsMined
      FROM imm_session_telemetry
      WHERE session_id = ?
      ORDER BY sample_ms DESC
      LIMIT ?
    `);
    return prepared.all(sessionId, limit) as unknown as SessionTimelineRow[];
  }

  /** Counts all sessions and those not yet ended (ended_at_ms IS NULL). */
  async getQueryHints(): Promise<{
    totalSessions: number;
    activeSessions: number;
  }> {
    const sessions = this.db.prepare("SELECT COUNT(*) AS total FROM imm_sessions");
    const active = this.db.prepare(
      "SELECT COUNT(*) AS total FROM imm_sessions WHERE ended_at_ms IS NULL",
    );
    const totalSessions = Number(sessions.get()?.total ?? 0);
    const activeSessions = Number(active.get()?.total ?? 0);
    return { totalSessions, activeSessions };
  }

  /** Returns daily rollups (rollup_day = UTC days since epoch), newest first. */
  async getDailyRollups(
    limit = 60,
  ): Promise<ImmersionSessionRollupRow[]> {
    const prepared = this.db.prepare(`
      SELECT
        rollup_day AS rollupDayOrMonth,
        video_id AS videoId,
        total_sessions AS totalSessions,
        total_active_min AS totalActiveMin,
        total_lines_seen AS totalLinesSeen,
        total_words_seen AS totalWordsSeen,
        total_tokens_seen AS totalTokensSeen,
        total_cards AS totalCards,
        cards_per_hour AS cardsPerHour,
        words_per_min AS wordsPerMin,
        lookup_hit_rate AS lookupHitRate
      FROM imm_daily_rollups
      ORDER BY rollup_day DESC, video_id DESC
      LIMIT ?
    `);
    return prepared.all(limit) as unknown as ImmersionSessionRollupRow[];
  }

  /**
   * Returns monthly rollups (rollup_month = YYYYMM), newest first.
   * The monthly table stores no rate columns, so the rate fields are returned as 0.
   */
  async getMonthlyRollups(
    limit = 24,
  ): Promise<ImmersionSessionRollupRow[]> {
    const prepared = this.db.prepare(`
      SELECT
        rollup_month AS rollupDayOrMonth,
        video_id AS videoId,
        total_sessions AS totalSessions,
        total_active_min AS totalActiveMin,
        total_lines_seen AS totalLinesSeen,
        total_words_seen AS totalWordsSeen,
        total_tokens_seen AS totalTokensSeen,
        total_cards AS totalCards,
        0 AS cardsPerHour,
        0 AS wordsPerMin,
        0 AS lookupHitRate
      FROM imm_monthly_rollups
      ORDER BY rollup_month DESC, video_id DESC
      LIMIT ?
    `);
    return prepared.all(limit) as unknown as ImmersionSessionRollupRow[];
  }

  /**
   * Reacts to the player switching media. Same path + new title only updates the
   * title; a different path ends the current session and (unless the path is
   * empty) starts a new one, kicking off async metadata capture for local files.
   */
  handleMediaChange(mediaPath: string | null, mediaTitle: string | null): void {
    const normalizedPath = this.normalizeMediaPath(mediaPath);
    const normalizedTitle = this.normalizeText(mediaTitle);
    this.logger.info(
      `handleMediaChange called with path=${normalizedPath || ""} title=${normalizedTitle || ""}`,
    );

    if (normalizedPath === this.currentMediaPathOrUrl) {
      if (normalizedTitle && normalizedTitle !== this.currentVideoKey) {
        this.currentVideoKey = normalizedTitle;
        this.updateVideoTitleForActiveSession(normalizedTitle);
        this.logger.debug("Media title updated for existing session");
      } else {
        this.logger.debug("Media change ignored; path unchanged");
      }
      return;
    }

    this.finalizeActiveSession();
    this.currentMediaPathOrUrl = normalizedPath;
    this.currentVideoKey = normalizedTitle;
    if (!normalizedPath) {
      this.logger.info("Media path cleared; immersion session tracking paused");
      return;
    }

    const sourceType = this.isRemoteSource(normalizedPath)
      ? SOURCE_TYPE_REMOTE
      : SOURCE_TYPE_LOCAL;
    const videoKey = this.buildVideoKey(normalizedPath, sourceType);
    const canonicalTitle = normalizedTitle || this.deriveCanonicalTitle(normalizedPath);
    const sourcePath = sourceType === SOURCE_TYPE_LOCAL ? normalizedPath : null;
    const sourceUrl = sourceType === SOURCE_TYPE_REMOTE ? normalizedPath : null;

    const sessionInfo = {
      videoId: this.getOrCreateVideo(videoKey, {
        canonicalTitle,
        sourcePath,
        sourceUrl,
        sourceType,
      }),
      startedAtMs: Date.now(),
    };

    this.logger.info(
      `Starting immersion session for path=${normalizedPath} videoId=${sessionInfo.videoId}`,
    );
    this.startSession(sessionInfo.videoId, sessionInfo.startedAtMs);
    this.captureVideoMetadataAsync(sessionInfo.videoId, sourceType, normalizedPath);
  }

  /** Updates the active session's video title; ignored without a session or a non-empty title. */
  handleMediaTitleUpdate(mediaTitle: string | null): void {
    if (!this.sessionState) return;
    const normalizedTitle = this.normalizeText(mediaTitle);
    if (!normalizedTitle) return;
    this.currentVideoKey = normalizedTitle;
    this.updateVideoTitleForActiveSession(normalizedTitle);
  }

  /** Counts a displayed subtitle line and queues a subtitle-line event. */
  recordSubtitleLine(
    text: string,
    startSec: number,
    endSec: number,
  ): void {
    if (!this.sessionState || !text.trim()) return;

    const cleaned = this.normalizeText(text);
    if (!cleaned) return;

    const metrics = this.calculateTextMetrics(cleaned);
    this.sessionState.currentLineIndex += 1;
    this.sessionState.linesSeen += 1;
    this.sessionState.wordsSeen += metrics.words;
    this.sessionState.tokensSeen += metrics.tokens;
    this.sessionState.pendingTelemetry = true;

    this.recordWrite({
      kind: "event",
      sessionId: this.sessionState.sessionId,
      sampleMs: Date.now(),
      lineIndex: this.sessionState.currentLineIndex,
      segmentStartMs: this.secToMs(startSec),
      segmentEndMs: this.secToMs(endSec),
      wordsDelta: metrics.words,
      cardsDelta: 0,
      eventType: EVENT_SUBTITLE_LINE,
      payloadJson: this.sanitizePayload({
        event: "subtitle-line",
        text: cleaned,
        words: metrics.words,
      }),
    });
  }

  /**
   * Accrues watched time since the previous position update (gaps of 60s or more
   * are ignored) and records a seek event when media time jumps by at least 1s.
   */
  recordPlaybackPosition(mediaTimeSec: number | null): void {
    if (!this.sessionState || mediaTimeSec === null || !Number.isFinite(mediaTimeSec)) {
      return;
    }

    const nowMs = Date.now();
    const mediaMs = Math.round(mediaTimeSec * 1000);
    if (this.sessionState.lastWallClockMs <= 0) {
      // First observation: establish the baseline only.
      this.sessionState.lastWallClockMs = nowMs;
      this.sessionState.lastMediaMs = mediaMs;
      return;
    }

    const wallDeltaMs = nowMs - this.sessionState.lastWallClockMs;
    if (wallDeltaMs > 0 && wallDeltaMs < 60_000) {
      this.sessionState.totalWatchedMs += wallDeltaMs;
      if (!this.sessionState.isPaused) {
        this.sessionState.activeWatchedMs += wallDeltaMs;
      }
    }

    if (this.sessionState.lastMediaMs !== null) {
      const mediaDeltaMs = mediaMs - this.sessionState.lastMediaMs;
      if (Math.abs(mediaDeltaMs) >= 1_000) {
        if (mediaDeltaMs > 0) {
          this.sessionState.seekForwardCount += 1;
          this.sessionState.pendingTelemetry = true;
          this.recordWrite({
            kind: "event",
            sessionId: this.sessionState.sessionId,
            sampleMs: nowMs,
            eventType: EVENT_SEEK_FORWARD,
            wordsDelta: 0,
            cardsDelta: 0,
            segmentStartMs: this.sessionState.lastMediaMs,
            segmentEndMs: mediaMs,
            payloadJson: this.sanitizePayload({
              fromMs: this.sessionState.lastMediaMs,
              toMs: mediaMs,
            }),
          });
        } else if (mediaDeltaMs < 0) {
          this.sessionState.seekBackwardCount += 1;
          this.sessionState.pendingTelemetry = true;
          this.recordWrite({
            kind: "event",
            sessionId: this.sessionState.sessionId,
            sampleMs: nowMs,
            eventType: EVENT_SEEK_BACKWARD,
            wordsDelta: 0,
            cardsDelta: 0,
            segmentStartMs: this.sessionState.lastMediaMs,
            segmentEndMs: mediaMs,
            payloadJson: this.sanitizePayload({
              fromMs: this.sessionState.lastMediaMs,
              toMs: mediaMs,
            }),
          });
        }
      }
    }

    this.sessionState.lastWallClockMs = nowMs;
    this.sessionState.lastMediaMs = mediaMs;
    this.sessionState.pendingTelemetry = true;
  }

  /** Records a pause/resume transition; repeated identical states are ignored. */
  recordPauseState(isPaused: boolean): void {
    if (!this.sessionState) return;
    if (this.sessionState.isPaused === isPaused) return;

    const nowMs = Date.now();
    this.sessionState.isPaused = isPaused;
    if (isPaused) {
      this.sessionState.lastPauseStartMs = nowMs;
      this.sessionState.pauseCount += 1;
      this.recordWrite({
        kind: "event",
        sessionId: this.sessionState.sessionId,
        sampleMs: nowMs,
        eventType: EVENT_PAUSE_START,
        cardsDelta: 0,
        wordsDelta: 0,
        payloadJson: this.sanitizePayload({ paused: true }),
      });
    } else {
      if (this.sessionState.lastPauseStartMs) {
        const pauseMs = Math.max(0, nowMs - this.sessionState.lastPauseStartMs);
        this.sessionState.pauseMs += pauseMs;
        this.sessionState.lastPauseStartMs = null;
      }
      this.recordWrite({
        kind: "event",
        sessionId: this.sessionState.sessionId,
        sampleMs: nowMs,
        eventType: EVENT_PAUSE_END,
        cardsDelta: 0,
        wordsDelta: 0,
        payloadJson: this.sanitizePayload({ paused: false }),
      });
    }

    this.sessionState.pendingTelemetry = true;
  }

  /** Counts a dictionary lookup (and whether it hit) and queues a lookup event. */
  recordLookup(hit: boolean): void {
    if (!this.sessionState) return;
    this.sessionState.lookupCount += 1;
    if (hit) {
      this.sessionState.lookupHits += 1;
    }
    this.sessionState.pendingTelemetry = true;
    this.recordWrite({
      kind: "event",
      sessionId: this.sessionState.sessionId,
      sampleMs: Date.now(),
      eventType: EVENT_LOOKUP,
      cardsDelta: 0,
      wordsDelta: 0,
      payloadJson: this.sanitizePayload({
        hit,
      }),
    });
  }

  /** Adds `count` mined cards to the session and queues a card-mined event. */
  recordCardsMined(count = 1): void {
    if (!this.sessionState) return;
    this.sessionState.cardsMined += count;
    this.sessionState.pendingTelemetry = true;
    this.recordWrite({
      kind: "event",
      sessionId: this.sessionState.sessionId,
      sampleMs: Date.now(),
      eventType: EVENT_CARD_MINED,
      wordsDelta: 0,
      cardsDelta: count,
      payloadJson: this.sanitizePayload({ cardsMined: count }),
    });
  }

  /** Counts a media buffering event and queues it. */
  recordMediaBufferEvent(): void {
    if (!this.sessionState) return;
    this.sessionState.mediaBufferEvents += 1;
    this.sessionState.pendingTelemetry = true;
    this.recordWrite({
      kind: "event",
      sessionId: this.sessionState.sessionId,
      sampleMs: Date.now(),
      eventType: EVENT_MEDIA_BUFFER,
      cardsDelta: 0,
      wordsDelta: 0,
      payloadJson: this.sanitizePayload({
        buffer: true,
      }),
    });
  }

  /**
   * Enqueues a write. When the queue is at capacity the oldest writes are dropped.
   * Events (or a full batch) request an immediate flush.
   */
  private recordWrite(write: QueuedWrite): void {
    if (this.isDestroyed) return;
    if (this.queue.length >= this.queueCap) {
      const overflow = this.queue.length - this.queueCap + 1;
      this.queue.splice(0, overflow);
      this.droppedWriteCount += overflow;
      this.logger.warn(
        `Immersion tracker queue overflow; dropped ${overflow} oldest writes`,
      );
    }
    this.queue.push(write);
    this.lastQueueWriteAtMs = Date.now();
    if (write.kind === "event" || this.queue.length >= this.batchSize) {
      this.scheduleFlush(0);
    }
  }

  /**
   * Queues a snapshot of the active session's cumulative counters (running
   * totals, not deltas). Skipped unless `force` or counters changed.
   */
  private flushTelemetry(force = false): void {
    if (!this.sessionState || (!force && !this.sessionState.pendingTelemetry)) {
      return;
    }
    this.recordWrite({
      kind: "telemetry",
      sessionId: this.sessionState.sessionId,
      sampleMs: Date.now(),
      totalWatchedMs: this.sessionState.totalWatchedMs,
      activeWatchedMs: this.sessionState.activeWatchedMs,
      linesSeen: this.sessionState.linesSeen,
      wordsSeen: this.sessionState.wordsSeen,
      tokensSeen: this.sessionState.tokensSeen,
      cardsMined: this.sessionState.cardsMined,
      lookupCount: this.sessionState.lookupCount,
      lookupHits: this.sessionState.lookupHits,
      pauseCount: this.sessionState.pauseCount,
      pauseMs: this.sessionState.pauseMs,
      seekForwardCount: this.sessionState.seekForwardCount,
      seekBackwardCount: this.sessionState.seekBackwardCount,
      mediaBufferEvents: this.sessionState.mediaBufferEvents,
    });
    this.sessionState.pendingTelemetry = false;
  }

  /** Arms a single flush timer unless one is already pending, a flush is running, or we are destroyed. */
  private scheduleFlush(delayMs = this.flushIntervalMs): void {
    if (this.flushScheduled || this.writeLock.locked || this.isDestroyed) return;
    this.flushScheduled = true;
    this.flushTimer = setTimeout(() => {
      this.flushTimer = null;
      this.flushScheduled = false;
      this.flushNow();
    }, delayMs);
  }

  /** Clears any pending flush timer so at most one is ever outstanding. */
  private cancelScheduledFlush(): void {
    if (this.flushTimer) {
      clearTimeout(this.flushTimer);
      this.flushTimer = null;
    }
    this.flushScheduled = false;
  }

  /**
   * Writes up to one batch of queued writes in a single transaction.
   * On failure the batch is put back at the head of the queue for a later retry.
   * Reschedules itself while writes remain.
   *
   * @returns false when the flush could not run or the transaction failed.
   */
  private flushNow(): boolean {
    if (this.writeLock.locked) return false;
    // A direct call supersedes any pending timer; avoids orphaned duplicate timers.
    this.cancelScheduledFlush();
    if (this.isDestroyed) return false;
    if (this.queue.length === 0) return true;

    this.flushTelemetry();

    const batch = this.queue.splice(0, this.batchSize);
    this.writeLock.locked = true;
    let succeeded = false;
    try {
      this.db.exec("BEGIN IMMEDIATE");
      for (const write of batch) {
        this.flushSingle(write);
      }
      this.db.exec("COMMIT");
      succeeded = true;
    } catch (error) {
      try {
        this.db.exec("ROLLBACK");
      } catch {
        // BEGIN itself failed (e.g. database busy); no transaction to roll back.
      }
      this.queue.unshift(...batch);
      this.logger.warn("Immersion tracker flush failed, retrying later", error as Error);
    } finally {
      this.writeLock.locked = false;
      if (this.queue.length > 0) {
        this.scheduleFlush(this.flushIntervalMs);
      }
    }
    return succeeded;
  }

  /** Flushes batches synchronously until the queue is empty or a flush makes no progress. */
  private drainQueue(): void {
    while (this.queue.length > 0) {
      const before = this.queue.length;
      if (!this.flushNow() || this.queue.length >= before) {
        break;
      }
    }
  }

  /** Executes the prepared insert matching the write's kind. Must run inside a transaction. */
  private flushSingle(write: QueuedWrite): void {
    if (write.kind === "telemetry") {
      this.telemetryInsertStmt.run(
        write.sessionId,
        write.sampleMs!,
        write.totalWatchedMs!,
        write.activeWatchedMs!,
        write.linesSeen!,
        write.wordsSeen!,
        write.tokensSeen!,
        write.cardsMined!,
        write.lookupCount!,
        write.lookupHits!,
        write.pauseCount!,
        write.pauseMs!,
        write.seekForwardCount!,
        write.seekBackwardCount!,
        write.mediaBufferEvents!,
      );
      return;
    }

    this.eventInsertStmt.run(
      write.sessionId,
      write.sampleMs!,
      write.eventType!,
      write.lineIndex ?? null,
      write.segmentStartMs ?? null,
      write.segmentEndMs ?? null,
      write.wordsDelta ?? 0,
      write.cardsDelta ?? 0,
      write.payloadJson ?? null,
    );
  }

  private applyPragmas(): void {
    this.db.exec("PRAGMA journal_mode = WAL");
    this.db.exec("PRAGMA synchronous = NORMAL");
    this.db.exec("PRAGMA foreign_keys = ON");
    this.db.exec("PRAGMA busy_timeout = 2500");
  }

  /** Creates tables and indexes unless the recorded schema version already matches. */
  private ensureSchema(): void {
    this.db.exec(`
      CREATE TABLE IF NOT EXISTS imm_schema_version (
        schema_version INTEGER PRIMARY KEY,
        applied_at_ms INTEGER NOT NULL
      );
    `);

    const currentVersion = this.db
      .prepare(
        "SELECT schema_version FROM imm_schema_version ORDER BY schema_version DESC LIMIT 1",
      )
      .get() as { schema_version: number } | null;
    if (currentVersion?.schema_version === SCHEMA_VERSION) {
      return;
    }

    this.db.exec(`
      CREATE TABLE IF NOT EXISTS imm_videos(
        video_id INTEGER PRIMARY KEY AUTOINCREMENT,
        video_key TEXT NOT NULL UNIQUE,
        canonical_title TEXT NOT NULL,
        source_type INTEGER NOT NULL,
        source_path TEXT,
        source_url TEXT,
        duration_ms INTEGER NOT NULL CHECK(duration_ms>=0),
        file_size_bytes INTEGER CHECK(file_size_bytes>=0),
        codec_id INTEGER, container_id INTEGER,
        width_px INTEGER, height_px INTEGER, fps_x100 INTEGER,
        bitrate_kbps INTEGER, audio_codec_id INTEGER,
        hash_sha256 TEXT, screenshot_path TEXT,
        metadata_json TEXT,
        created_at_ms INTEGER NOT NULL,
        updated_at_ms INTEGER NOT NULL
      );
    `);
    this.db.exec(`
      CREATE TABLE IF NOT EXISTS imm_sessions(
        session_id INTEGER PRIMARY KEY AUTOINCREMENT,
        session_uuid TEXT NOT NULL UNIQUE,
        video_id INTEGER NOT NULL,
        started_at_ms INTEGER NOT NULL,
        ended_at_ms INTEGER,
        status INTEGER NOT NULL,
        locale_id INTEGER, target_lang_id INTEGER,
        difficulty_tier INTEGER, subtitle_mode INTEGER,
        created_at_ms INTEGER NOT NULL,
        updated_at_ms INTEGER NOT NULL,
        FOREIGN KEY(video_id) REFERENCES imm_videos(video_id)
      );
    `);
    this.db.exec(`
      CREATE TABLE IF NOT EXISTS imm_session_telemetry(
        telemetry_id INTEGER PRIMARY KEY AUTOINCREMENT,
        session_id INTEGER NOT NULL,
        sample_ms INTEGER NOT NULL,
        total_watched_ms INTEGER NOT NULL DEFAULT 0,
        active_watched_ms INTEGER NOT NULL DEFAULT 0,
        lines_seen INTEGER NOT NULL DEFAULT 0,
        words_seen INTEGER NOT NULL DEFAULT 0,
        tokens_seen INTEGER NOT NULL DEFAULT 0,
        cards_mined INTEGER NOT NULL DEFAULT 0,
        lookup_count INTEGER NOT NULL DEFAULT 0,
        lookup_hits INTEGER NOT NULL DEFAULT 0,
        pause_count INTEGER NOT NULL DEFAULT 0,
        pause_ms INTEGER NOT NULL DEFAULT 0,
        seek_forward_count INTEGER NOT NULL DEFAULT 0,
        seek_backward_count INTEGER NOT NULL DEFAULT 0,
        media_buffer_events INTEGER NOT NULL DEFAULT 0,
        FOREIGN KEY(session_id) REFERENCES imm_sessions(session_id) ON DELETE CASCADE
      );
    `);
    this.db.exec(`
      CREATE TABLE IF NOT EXISTS imm_session_events(
        event_id INTEGER PRIMARY KEY AUTOINCREMENT,
        session_id INTEGER NOT NULL,
        ts_ms INTEGER NOT NULL,
        event_type INTEGER NOT NULL,
        line_index INTEGER,
        segment_start_ms INTEGER,
        segment_end_ms INTEGER,
        words_delta INTEGER NOT NULL DEFAULT 0,
        cards_delta INTEGER NOT NULL DEFAULT 0,
        payload_json TEXT,
        FOREIGN KEY(session_id) REFERENCES imm_sessions(session_id) ON DELETE CASCADE
      );
    `);
    this.db.exec(`
      CREATE TABLE IF NOT EXISTS imm_daily_rollups(
        rollup_day INTEGER NOT NULL,
        video_id INTEGER,
        total_sessions INTEGER NOT NULL DEFAULT 0,
        total_active_min REAL NOT NULL DEFAULT 0,
        total_lines_seen INTEGER NOT NULL DEFAULT 0,
        total_words_seen INTEGER NOT NULL DEFAULT 0,
        total_tokens_seen INTEGER NOT NULL DEFAULT 0,
        total_cards INTEGER NOT NULL DEFAULT 0,
        cards_per_hour REAL,
        words_per_min REAL,
        lookup_hit_rate REAL,
        PRIMARY KEY (rollup_day, video_id)
      );
    `);
    this.db.exec(`
      CREATE TABLE IF NOT EXISTS imm_monthly_rollups(
        rollup_month INTEGER NOT NULL,
        video_id INTEGER,
        total_sessions INTEGER NOT NULL DEFAULT 0,
        total_active_min REAL NOT NULL DEFAULT 0,
        total_lines_seen INTEGER NOT NULL DEFAULT 0,
        total_words_seen INTEGER NOT NULL DEFAULT 0,
        total_tokens_seen INTEGER NOT NULL DEFAULT 0,
        total_cards INTEGER NOT NULL DEFAULT 0,
        PRIMARY KEY (rollup_month, video_id)
      );
    `);

    this.db.exec(`
      CREATE INDEX IF NOT EXISTS idx_sessions_video_started
      ON imm_sessions(video_id, started_at_ms DESC)
    `);
    this.db.exec(`
      CREATE INDEX IF NOT EXISTS idx_sessions_status_started
      ON imm_sessions(status, started_at_ms DESC)
    `);
    this.db.exec(`
      CREATE INDEX IF NOT EXISTS idx_telemetry_session_sample
      ON imm_session_telemetry(session_id, sample_ms DESC)
    `);
    this.db.exec(`
      CREATE INDEX IF NOT EXISTS idx_events_session_ts
      ON imm_session_events(session_id, ts_ms DESC)
    `);
    this.db.exec(`
      CREATE INDEX IF NOT EXISTS idx_events_type_ts
      ON imm_session_events(event_type, ts_ms DESC)
    `);
    this.db.exec(`
      CREATE INDEX IF NOT EXISTS idx_rollups_day_video
      ON imm_daily_rollups(rollup_day, video_id)
    `);
    this.db.exec(`
      CREATE INDEX IF NOT EXISTS idx_rollups_month_video
      ON imm_monthly_rollups(rollup_month, video_id)
    `);

    this.db.exec(`
      INSERT INTO imm_schema_version(schema_version, applied_at_ms)
      VALUES (${SCHEMA_VERSION}, ${Date.now()})
      ON CONFLICT DO NOTHING
    `);
  }

  /** Starts the periodic maintenance interval and runs one pass immediately. */
  private scheduleMaintenance(): void {
    this.maintenanceTimer = setInterval(() => {
      this.runMaintenance();
    }, this.maintenanceIntervalMs);
    this.runMaintenance();
  }

  /**
   * Flushes pending data, prunes rows past their retention windows, refreshes
   * rollups and VACUUMs at most once per VACUUM_INTERVAL_MS. Failures are logged, not thrown.
   */
  private runMaintenance(): void {
    if (this.isDestroyed) return;
    try {
      this.flushTelemetry(true);
      this.flushNow();
      const nowMs = Date.now();
      const eventCutoff = nowMs - EVENTS_RETENTION_MS;
      const telemetryCutoff = nowMs - TELEMETRY_RETENTION_MS;
      const dailyCutoff = nowMs - DAILY_ROLLUP_RETENTION_MS;
      const monthlyCutoff = nowMs - MONTHLY_ROLLUP_RETENTION_MS;
      const dayCutoff = Math.floor(dailyCutoff / 86_400_000);
      const monthCutoff = this.toMonthKey(monthlyCutoff);

      this.db.prepare(`DELETE FROM imm_session_events WHERE ts_ms < ?`).run(eventCutoff);
      this.db.prepare(`DELETE FROM imm_session_telemetry WHERE sample_ms < ?`).run(telemetryCutoff);
      this.db.prepare(`DELETE FROM imm_daily_rollups WHERE rollup_day < ?`).run(dayCutoff);
      this.db.prepare(`DELETE FROM imm_monthly_rollups WHERE rollup_month < ?`).run(monthCutoff);
      this.db
        .prepare(`DELETE FROM imm_sessions WHERE ended_at_ms IS NOT NULL AND ended_at_ms < ?`)
        .run(telemetryCutoff);
      // Uses the same clock reading as the pruning above so the intact-window math agrees.
      this.runRollupMaintenance(nowMs);

      if (
        nowMs - this.lastVacuumMs >= VACUUM_INTERVAL_MS &&
        !this.writeLock.locked
      ) {
        this.db.exec("VACUUM");
        this.lastVacuumMs = nowMs;
      }
      this.lastMaintenanceMs = nowMs;
    } catch (error) {
      this.logger.warn(
        "Immersion tracker maintenance failed, will retry later",
        (error as Error).message,
      );
    }
  }

  /**
   * Recomputes daily and monthly rollups from data that is still complete.
   *
   * - Daily: only days starting at/after the telemetry cutoff are recomputed; no
   *   session started on such a day can have lost telemetry to pruning, whereas
   *   older days would be overwritten with partial totals.
   *   Telemetry rows are cumulative snapshots, so each session contributes the
   *   MAX of each counter, which is then summed per (day, video).
   * - Monthly: aggregated from daily rollups, only for months whose every day is
   *   still within daily-rollup retention; older months keep their stored values.
   */
  private runRollupMaintenance(nowMs = Date.now()): void {
    const dayMs = 86_400_000;

    const firstIntactDay = Math.ceil((nowMs - TELEMETRY_RETENTION_MS) / dayMs);
    this.db
      .prepare(`
        INSERT OR REPLACE INTO imm_daily_rollups (
          rollup_day, video_id, total_sessions, total_active_min, total_lines_seen,
          total_words_seen, total_tokens_seen, total_cards, cards_per_hour,
          words_per_min, lookup_hit_rate
        )
        SELECT
          ps.rollup_day AS rollup_day,
          ps.video_id AS video_id,
          COUNT(*) AS total_sessions,
          SUM(ps.active_watched_ms) / 60000.0 AS total_active_min,
          SUM(ps.lines_seen) AS total_lines_seen,
          SUM(ps.words_seen) AS total_words_seen,
          SUM(ps.tokens_seen) AS total_tokens_seen,
          SUM(ps.cards_mined) AS total_cards,
          CASE
            WHEN SUM(ps.active_watched_ms) > 0
              THEN (SUM(ps.cards_mined) * 60.0) / (SUM(ps.active_watched_ms) / 60000.0)
            ELSE NULL
          END AS cards_per_hour,
          CASE
            WHEN SUM(ps.active_watched_ms) > 0
              THEN SUM(ps.words_seen) / (SUM(ps.active_watched_ms) / 60000.0)
            ELSE NULL
          END AS words_per_min,
          CASE
            WHEN SUM(ps.lookup_count) > 0
              THEN CAST(SUM(ps.lookup_hits) AS REAL) / CAST(SUM(ps.lookup_count) AS REAL)
            ELSE NULL
          END AS lookup_hit_rate
        FROM (
          SELECT
            CAST(s.started_at_ms / 86400000 AS INTEGER) AS rollup_day,
            s.video_id AS video_id,
            COALESCE(MAX(t.active_watched_ms), 0) AS active_watched_ms,
            COALESCE(MAX(t.lines_seen), 0) AS lines_seen,
            COALESCE(MAX(t.words_seen), 0) AS words_seen,
            COALESCE(MAX(t.tokens_seen), 0) AS tokens_seen,
            COALESCE(MAX(t.cards_mined), 0) AS cards_mined,
            COALESCE(MAX(t.lookup_count), 0) AS lookup_count,
            COALESCE(MAX(t.lookup_hits), 0) AS lookup_hits
          FROM imm_sessions s
          JOIN imm_session_telemetry t ON t.session_id = s.session_id
          WHERE s.started_at_ms >= ?
          GROUP BY s.session_id
        ) ps
        GROUP BY ps.rollup_day, ps.video_id
      `)
      .run(firstIntactDay * dayMs);

    // First month that starts on or after the oldest retained daily rollup.
    const dailyCutoffDay = Math.floor((nowMs - DAILY_ROLLUP_RETENTION_MS) / dayMs);
    const cutoffDate = new Date(dailyCutoffDay * dayMs);
    const firstFullMonthStartMs =
      cutoffDate.getUTCDate() === 1
        ? cutoffDate.getTime()
        : Date.UTC(cutoffDate.getUTCFullYear(), cutoffDate.getUTCMonth() + 1, 1);
    const firstFullMonthDay = Math.floor(firstFullMonthStartMs / dayMs);

    this.db
      .prepare(`
        INSERT OR REPLACE INTO imm_monthly_rollups (
          rollup_month, video_id, total_sessions, total_active_min,
          total_lines_seen, total_words_seen, total_tokens_seen, total_cards
        )
        SELECT
          CAST(strftime('%Y%m', d.rollup_day * 86400, 'unixepoch') AS INTEGER) AS rollup_month,
          d.video_id AS video_id,
          SUM(d.total_sessions) AS total_sessions,
          SUM(d.total_active_min) AS total_active_min,
          SUM(d.total_lines_seen) AS total_lines_seen,
          SUM(d.total_words_seen) AS total_words_seen,
          SUM(d.total_tokens_seen) AS total_tokens_seen,
          SUM(d.total_cards) AS total_cards
        FROM imm_daily_rollups d
        WHERE d.rollup_day >= ?
        GROUP BY rollup_month, d.video_id
      `)
      .run(firstFullMonthDay);
  }

  /** Converts a UTC timestamp to a YYYYMM integer key. */
  private toMonthKey(timestampMs: number): number {
    const monthDate = new Date(timestampMs);
    return monthDate.getUTCFullYear() * 100 + monthDate.getUTCMonth() + 1;
  }

  /** Inserts a new session row, resets in-memory counters and queues an all-zero telemetry snapshot. */
  private startSession(videoId: number, startedAtMs?: number): void {
    const nowMs = startedAtMs ?? Date.now();
    const result = this.startSessionStatement(videoId, nowMs);
    const sessionId = Number(result.lastInsertRowid);
    this.sessionState = {
      sessionId,
      videoId,
      startedAtMs: nowMs,
      currentLineIndex: 0,
      totalWatchedMs: 0,
      activeWatchedMs: 0,
      linesSeen: 0,
      wordsSeen: 0,
      tokensSeen: 0,
      cardsMined: 0,
      lookupCount: 0,
      lookupHits: 0,
      pauseCount: 0,
      pauseMs: 0,
      seekForwardCount: 0,
      seekBackwardCount: 0,
      mediaBufferEvents: 0,
      lastWallClockMs: 0,
      lastMediaMs: null,
      lastPauseStartMs: null,
      isPaused: false,
      pendingTelemetry: true,
    };
    this.recordWrite({
      kind: "telemetry",
      sessionId,
      sampleMs: nowMs,
      totalWatchedMs: 0,
      activeWatchedMs: 0,
      linesSeen: 0,
      wordsSeen: 0,
      tokensSeen: 0,
      cardsMined: 0,
      lookupCount: 0,
      lookupHits: 0,
      pauseCount: 0,
      pauseMs: 0,
      seekForwardCount: 0,
      seekBackwardCount: 0,
      mediaBufferEvents: 0,
    });
    this.scheduleFlush(0);
  }

  /** Inserts an active imm_sessions row with a random UUID. */
  private startSessionStatement(videoId: number, startedAtMs: number): {
    lastInsertRowid: number | bigint;
  } {
    const sessionUuid = crypto.randomUUID();
    return this.db
      .prepare(`
        INSERT INTO imm_sessions (
          session_uuid, video_id, started_at_ms, status, created_at_ms, updated_at_ms
        ) VALUES (?, ?, ?, ?, ?, ?)
      `)
      .run(
        sessionUuid,
        videoId,
        startedAtMs,
        SESSION_STATUS_ACTIVE,
        startedAtMs,
        startedAtMs,
      );
  }

  /**
   * Closes the active session: accounts for an open pause and the final
   * wall-clock interval, writes a final telemetry snapshot, flushes a batch and
   * marks the session row ended.
   */
  private finalizeActiveSession(): void {
    if (!this.sessionState) return;
    const endedAt = Date.now();
    if (this.sessionState.lastPauseStartMs) {
      this.sessionState.pauseMs += Math.max(
        0,
        endedAt - this.sessionState.lastPauseStartMs,
      );
      this.sessionState.lastPauseStartMs = null;
    }
    const finalWallNow = endedAt;
    if (this.sessionState.lastWallClockMs > 0) {
      const wallDelta = finalWallNow - this.sessionState.lastWallClockMs;
      if (wallDelta > 0 && wallDelta < 60_000) {
        this.sessionState.totalWatchedMs += wallDelta;
        if (!this.sessionState.isPaused) {
          this.sessionState.activeWatchedMs += wallDelta;
        }
      }
    }
    this.flushTelemetry(true);
    this.flushNow();
    this.sessionState.pendingTelemetry = false;

    this.db
      .prepare(
        "UPDATE imm_sessions SET ended_at_ms = ?, status = ?, updated_at_ms = ? WHERE session_id = ?",
      )
      .run(endedAt, SESSION_STATUS_ENDED, Date.now(), this.sessionState.sessionId);
    this.sessionState = null;
  }

  /**
   * Returns the video_id for `videoKey`, refreshing its title if it exists,
   * or inserts a new row with empty technical metadata.
   */
  private getOrCreateVideo(videoKey: string, details: {
    canonicalTitle: string;
    sourcePath: string | null;
    sourceUrl: string | null;
    sourceType: number;
  }): number {
    const existing = this.db
      .prepare("SELECT video_id FROM imm_videos WHERE video_key = ?")
      .get(videoKey) as { video_id: number } | null;
    if (existing?.video_id) {
      this.db
        .prepare(
          "UPDATE imm_videos SET canonical_title = ?, updated_at_ms = ? WHERE video_id = ?",
        )
        .run(details.canonicalTitle || "unknown", Date.now(), existing.video_id);
      return existing.video_id;
    }

    const nowMs = Date.now();
    const insert = this.db.prepare(`
      INSERT INTO imm_videos (
        video_key, canonical_title, source_type, source_path, source_url,
        duration_ms, file_size_bytes, codec_id, container_id, width_px, height_px,
        fps_x100, bitrate_kbps, audio_codec_id, hash_sha256, screenshot_path,
        metadata_json, created_at_ms, updated_at_ms
      ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
    `);
    const result = insert.run(
      videoKey,
      details.canonicalTitle || "unknown",
      details.sourceType,
      details.sourcePath,
      details.sourceUrl,
      0,
      null,
      null,
      null,
      null,
      null,
      null,
      null,
      null,
      null,
      null,
      null,
      nowMs,
      nowMs,
    );
    return Number(result.lastInsertRowid);
  }

  /** Overwrites the technical metadata columns of a video row. */
  private updateVideoMetadata(videoId: number, metadata: VideoMetadata): void {
    this.db
      .prepare(`
        UPDATE imm_videos
        SET
          duration_ms = ?,
          file_size_bytes = ?,
          codec_id = ?,
          container_id = ?,
          width_px = ?,
          height_px = ?,
          fps_x100 = ?,
          bitrate_kbps = ?,
          audio_codec_id = ?,
          hash_sha256 = ?,
          screenshot_path = ?,
          metadata_json = ?,
          updated_at_ms = ?
        WHERE video_id = ?
      `)
      .run(
        metadata.durationMs,
        metadata.fileSizeBytes,
        metadata.codecId,
        metadata.containerId,
        metadata.widthPx,
        metadata.heightPx,
        metadata.fpsX100,
        metadata.bitrateKbps,
        metadata.audioCodecId,
        metadata.hashSha256,
        metadata.screenshotPath,
        metadata.metadataJson,
        Date.now(),
        videoId,
      );
  }

  /** Fire-and-forget metadata capture for local files; failures are logged only. */
  private captureVideoMetadataAsync(
    videoId: number,
    sourceType: number,
    mediaPath: string,
  ): void {
    if (sourceType !== SOURCE_TYPE_LOCAL) return;

    void (async () => {
      try {
        const metadata = await this.getLocalVideoMetadata(mediaPath);
        this.updateVideoMetadata(videoId, metadata);
      } catch (error) {
        this.logger.warn(
          "Unable to capture local video metadata",
          (error as Error).message,
        );
      }
    })();
  }

  /** Gathers SHA-256, ffprobe stream info and file size for a local media file. */
  private async getLocalVideoMetadata(mediaPath: string): Promise<VideoMetadata> {
    const hash = await this.computeSha256(mediaPath);
    const info = await this.runFfprobe(mediaPath);
    const stat = await fs.promises.stat(mediaPath);

    return {
      sourceType: SOURCE_TYPE_LOCAL,
      canonicalTitle: this.deriveCanonicalTitle(mediaPath),
      durationMs: info.durationMs || 0,
      fileSizeBytes: Number.isFinite(stat.size) ? stat.size : null,
      codecId: info.codecId ?? null,
      containerId: info.containerId ?? null,
      widthPx: info.widthPx ?? null,
      heightPx: info.heightPx ?? null,
      fpsX100: info.fpsX100 ?? null,
      bitrateKbps: info.bitrateKbps ?? null,
      audioCodecId: info.audioCodecId ?? null,
      hashSha256: hash,
      screenshotPath: null,
      metadataJson: null,
    };
  }

  /** Streams the whole file through SHA-256; resolves null on read error. */
  private async computeSha256(mediaPath: string): Promise<string | null> {
    return new Promise((resolve) => {
      const file = fs.createReadStream(mediaPath);
      const digest = crypto.createHash("sha256");
      file.on("data", (chunk) => digest.update(chunk));
      file.on("end", () => resolve(digest.digest("hex")));
      file.on("error", () => resolve(null));
    });
  }

  /**
   * Runs `ffprobe` and extracts duration, bitrate and basic stream properties.
   * Never rejects: spawn/parse failures resolve to all-null metadata.
   */
  private runFfprobe(mediaPath: string): Promise<{
    durationMs: number | null;
    codecId: number | null;
    containerId: number | null;
    widthPx: number | null;
    heightPx: number | null;
    fpsX100: number | null;
    bitrateKbps: number | null;
    audioCodecId: number | null;
  }> {
    return new Promise((resolve) => {
      const child = spawn("ffprobe", [
        "-v",
        "error",
        "-print_format",
        "json",
        "-show_entries",
        "stream=codec_type,codec_tag_string,width,height,avg_frame_rate,bit_rate",
        "-show_entries",
        "format=duration,bit_rate",
        mediaPath,
      ]);

      let output = "";
      let errorOutput = "";
      child.stdout.on("data", (chunk) => {
        output += chunk.toString("utf-8");
      });
      child.stderr.on("data", (chunk) => {
        errorOutput += chunk.toString("utf-8");
      });
      child.on("error", () => resolve(this.emptyMetadata()));
      child.on("close", () => {
        if (errorOutput && output.length === 0) {
          resolve(this.emptyMetadata());
          return;
        }

        try {
          const parsed = JSON.parse(output) as {
            format?: { duration?: string; bit_rate?: string };
            streams?: Array<{
              codec_type?: string;
              codec_tag_string?: string;
              width?: number;
              height?: number;
              avg_frame_rate?: string;
              bit_rate?: string;
            }>;
          };

          const durationText = parsed.format?.duration;
          const bitrateText = parsed.format?.bit_rate;
          const durationMs = Number(durationText)
            ? Math.round(Number(durationText) * 1000)
            : null;
          const bitrateKbps = Number(bitrateText)
            ? Math.round(Number(bitrateText) / 1000)
            : null;

          let codecId: number | null = null;
          let containerId: number | null = null;
          let widthPx: number | null = null;
          let heightPx: number | null = null;
          let fpsX100: number | null = null;
          let audioCodecId: number | null = null;

          for (const stream of parsed.streams ?? []) {
            if (stream.codec_type === "video") {
              widthPx = this.toNullableInt(stream.width);
              heightPx = this.toNullableInt(stream.height);
              fpsX100 = this.parseFps(stream.avg_frame_rate);
              codecId = this.hashToCode(stream.codec_tag_string);
              containerId = 0;
            }
            if (stream.codec_type === "audio") {
              audioCodecId = this.hashToCode(stream.codec_tag_string);
              if (audioCodecId && audioCodecId > 0) {
                break;
              }
            }
          }

          resolve({
            durationMs,
            codecId,
            containerId,
            widthPx,
            heightPx,
            fpsX100,
            bitrateKbps,
            audioCodecId,
          });
        } catch {
          resolve(this.emptyMetadata());
        }
      });
    });
  }

  private emptyMetadata(): {
    durationMs: number | null;
    codecId: number | null;
    containerId: number | null;
    widthPx: number | null;
    heightPx: number | null;
    fpsX100: number | null;
    bitrateKbps: number | null;
    audioCodecId: number | null;
  } {
    return {
      durationMs: null,
      codecId: null,
      containerId: null,
      widthPx: null,
      heightPx: null,
      fpsX100: null,
      bitrateKbps: null,
      audioCodecId: null,
    };
  }

  /** Parses an ffprobe rational like "30000/1001" into fps * 100, rounded; null if invalid. */
  private parseFps(value?: string): number | null {
    if (!value || typeof value !== "string") return null;
    const [num, den] = value.split("/");
    const n = Number(num);
    const d = Number(den);
    if (!Number.isFinite(n) || !Number.isFinite(d) || d === 0) return null;
    const fps = n / d;
    return Number.isFinite(fps) ? Math.round(fps * 100) : null;
  }

  /** Deterministic 31-bit string hash used as a compact codec id; null for empty input or a zero hash. */
  private hashToCode(input?: string): number | null {
    if (!input) return null;
    let hash = 0;
    for (let i = 0; i < input.length; i += 1) {
      hash = (hash * 31 + input.charCodeAt(i)) & 0x7fffffff;
    }
    return hash || null;
  }

  /** Serializes an event payload to JSON; payloads within MAX_PAYLOAD_BYTES characters are returned unchanged. */
  private sanitizePayload(payload: Record<string, unknown>): string {
    const json = JSON.stringify(payload);
    return json.length <= MAX_PAYLOAD_BYTES ?
json : JSON.stringify({ truncated: true }); } private calculateTextMetrics(value: string): { words: number; tokens: number } { const words = value.split(/\s+/).filter(Boolean).length; const cjkCount = (value.match(/[\u3040-\u30ff\u4e00-\u9fff]/g)?.length ?? 0); const tokens = Math.max(words, cjkCount); return { words, tokens }; } private secToMs(seconds: number): number { const coerced = Number(seconds); if (!Number.isFinite(coerced)) return 0; return Math.round(coerced * 1000); } private normalizeMediaPath(mediaPath: string | null): string { if (!mediaPath || !mediaPath.trim()) return ""; return mediaPath.trim(); } private normalizeText(value: string | null | undefined): string { if (!value) return ""; return value.trim().replace(/\s+/g, " "); } private buildVideoKey(mediaPath: string, sourceType: number): string { if (sourceType === SOURCE_TYPE_REMOTE) { return `remote:${mediaPath}`; } return `local:${mediaPath}`; } private isRemoteSource(mediaPath: string): boolean { return /^[a-z][a-z0-9+.-]*:\/\//i.test(mediaPath); } private deriveCanonicalTitle(mediaPath: string): string { if (this.isRemoteSource(mediaPath)) { try { const parsed = new URL(mediaPath); const parts = parsed.pathname.split("/").filter(Boolean); if (parts.length > 0) { const leaf = decodeURIComponent(parts[parts.length - 1]); return this.normalizeText(leaf.replace(/\.[^/.]+$/, "")); } return this.normalizeText(parsed.hostname) || "unknown"; } catch { return this.normalizeText(mediaPath); } } const filename = path.basename(mediaPath); return this.normalizeText(filename.replace(/\.[^/.]+$/, "")); } private toNullableInt(value: number | null | undefined): number | null { if (value === null || value === undefined || !Number.isFinite(value)) return null; return value; } private updateVideoTitleForActiveSession(canonicalTitle: string): void { if (!this.sessionState) return; this.db .prepare( "UPDATE imm_videos SET canonical_title = ?, updated_at_ms = ? 
WHERE video_id = ?", ) .run(canonicalTitle, Date.now(), this.sessionState.videoId); } }