From 4deef6992842c67dbacb4613ce6634a9eada91e2 Mon Sep 17 00:00:00 2001 From: sudacode Date: Sun, 22 Feb 2026 14:03:19 -0800 Subject: [PATCH] refactor(immersion): split tracker storage and metadata modules Decompose the immersion tracker facade into focused storage/session/metadata collaborators with dedicated tests and updated ownership docs while preserving runtime behavior. --- ...to-storage-session-and-metadata-modules.md | 63 ++- docs/immersion-tracking.md | 9 +- ...immersion-modules-20260222T195109Z-r3m7.md | 43 ++ .../services/immersion-tracker-service.ts | 505 ++---------------- .../immersion-tracker/metadata.test.ts | 148 +++++ .../services/immersion-tracker/metadata.ts | 153 ++++++ .../services/immersion-tracker/session.ts | 37 ++ .../immersion-tracker/storage-session.test.ts | 162 ++++++ .../services/immersion-tracker/storage.ts | 328 ++++++++++++ 9 files changed, 960 insertions(+), 488 deletions(-) create mode 100644 docs/subagents/agents/opencode-task106-immersion-modules-20260222T195109Z-r3m7.md create mode 100644 src/core/services/immersion-tracker/metadata.test.ts create mode 100644 src/core/services/immersion-tracker/metadata.ts create mode 100644 src/core/services/immersion-tracker/session.ts create mode 100644 src/core/services/immersion-tracker/storage-session.test.ts create mode 100644 src/core/services/immersion-tracker/storage.ts diff --git a/backlog/tasks/task-106 - Decompose-immersion-tracker-service-into-storage-session-and-metadata-modules.md b/backlog/tasks/task-106 - Decompose-immersion-tracker-service-into-storage-session-and-metadata-modules.md index dee9d2e..d464bc0 100644 --- a/backlog/tasks/task-106 - Decompose-immersion-tracker-service-into-storage-session-and-metadata-modules.md +++ b/backlog/tasks/task-106 - Decompose-immersion-tracker-service-into-storage-session-and-metadata-modules.md @@ -1,10 +1,11 @@ --- id: TASK-106 title: Decompose immersion tracker service into storage session and metadata modules -status: To Do 
-assignee: [] +status: Done +assignee: + - opencode-task106-immersion-modules created_date: '2026-02-22 07:14' -updated_date: '2026-02-22 07:14' +updated_date: '2026-02-22 21:58' labels: - refactor - maintainability @@ -39,16 +40,56 @@ Further decomposition is needed to keep ownership boundaries clear and reduce re ## Acceptance Criteria -- [ ] #1 `immersion-tracker-service.ts` no longer embeds full schema SQL and metadata probing logic directly. -- [ ] #2 Extracted modules have focused tests for session transitions, DB writes, and metadata parsing. -- [ ] #3 Tracker behavior remains unchanged (session lifecycle, rollups, retention, queue semantics). -- [ ] #4 Build and tracker-related source tests pass. +- [x] #1 `immersion-tracker-service.ts` no longer embeds full schema SQL and metadata probing logic directly. +- [x] #2 Extracted modules have focused tests for session transitions, DB writes, and metadata parsing. +- [x] #3 Tracker behavior remains unchanged (session lifecycle, rollups, retention, queue semantics). +- [x] #4 Build and tracker-related source tests pass. +## Implementation Plan + + +Plan file: docs/plans/2026-02-22-task-106-immersion-tracker-storage-session-metadata.md + +Execution plan: +1) Add failing seam tests in src/core/services/immersion-tracker-service.test.ts for extracted storage/session/metadata APIs. +2) Parallel slice A: extract storage/schema + session lifecycle collaborators into src/core/services/immersion-tracker/storage.ts and session.ts; rewire service. +3) Parallel slice B: extract ffprobe/hash/local metadata probing into src/core/services/immersion-tracker/metadata.ts with injected seams; rewire service. +4) Merge/wire both slices in immersion-tracker-service facade; verify behavior parity and file-size reduction. +5) Run required gates: bun run build, bun test src/core/services/immersion-tracker-service.test.ts, bun run test:core:src. 
+6) Update ownership docs (docs/immersion-tracking.md and/or docs/architecture.md) and finalize backlog AC/DoD notes/status (no commit). + + +## Implementation Notes + + +Implemented decomposition with new modules: `src/core/services/immersion-tracker/storage.ts` (schema/DB writes/prepared statements), `src/core/services/immersion-tracker/session.ts` (session start/finalize persistence), and `src/core/services/immersion-tracker/metadata.ts` (ffprobe/hash/local metadata probing with injectable deps). + +`src/core/services/immersion-tracker-service.ts` reduced from 1099 LOC baseline to 654 LOC; now orchestration facade delegating storage/session/metadata concerns while preserving public API and queue/maintenance semantics. + +Added focused tests: `src/core/services/immersion-tracker/storage-session.test.ts` (schema creation, session transitions, DB writes) and `src/core/services/immersion-tracker/metadata.test.ts` (ffprobe parsing/fallbacks, hash fallback). + +Validation: `bun test src/core/services/immersion-tracker/metadata.test.ts src/core/services/immersion-tracker/storage-session.test.ts src/core/services/immersion-tracker-service.test.ts` => 7 pass, 9 skip, 0 fail; `bun run test:core:src` => 236 pass, 6 skip, 0 fail. + +Build gate currently fails due to pre-existing unrelated TypeScript errors in `src/anki-integration/*` and `src/main/runtime/*` test/type contracts; no new immersion-tracker errors observed in this pass. + + +## Final Summary + + +Decomposed `src/core/services/immersion-tracker-service.ts` into focused collaborators while preserving tracker behavior and public API. Introduced `src/core/services/immersion-tracker/storage.ts` (schema bootstrap, prepared statements, DB writes), `src/core/services/immersion-tracker/session.ts` (session start/finalize persistence), and `src/core/services/immersion-tracker/metadata.ts` (ffprobe/hash/local metadata probing with injectable deps).
Reduced service file from 1099 LOC baseline to 654 LOC and kept queue/flush/maintenance orchestration in the facade. + +Added focused regression coverage via `src/core/services/immersion-tracker/storage-session.test.ts` and `src/core/services/immersion-tracker/metadata.test.ts`, and updated ownership documentation in `docs/immersion-tracking.md` to reflect new boundaries. + +Verification: +- `bun run build` ✅ +- `bun test src/core/services/immersion-tracker/metadata.test.ts src/core/services/immersion-tracker/storage-session.test.ts src/core/services/immersion-tracker-service.test.ts` ✅ (7 pass, 9 skip) +- `bun run test:core:src` ✅ (236 pass, 6 skip) + + ## Definition of Done -- [ ] #1 Service file size reduced materially from current baseline. -- [ ] #2 Ownership boundaries documented in `docs/architecture.md` or relevant service docs. -- [ ] #3 No regression in `bun run test:core:src` immersion tracker coverage. +- [x] #1 Service file size reduced materially from current baseline. +- [x] #2 Ownership boundaries documented in `docs/architecture.md` or relevant service docs. +- [x] #3 No regression in `bun run test:core:src` immersion tracker coverage. - diff --git a/docs/immersion-tracking.md b/docs/immersion-tracking.md index 5a476f1..46bf958 100644 --- a/docs/immersion-tracking.md +++ b/docs/immersion-tracking.md @@ -36,7 +36,12 @@ Primary index coverage: - event timeline/type reads: `idx_events_session_ts`, `idx_events_type_ts` - rollup reads: `idx_rollups_day_video`, `idx_rollups_month_video` -Reference implementation lives in `src/core/services/immersion-tracker-service.ts` (`ensureSchema`). +Ownership boundaries: + +- `src/core/services/immersion-tracker-service.ts`: orchestration facade (queueing, flush cadence, runtime event/session coordination). +- `src/core/services/immersion-tracker/storage.ts`: schema bootstrap, prepared statement construction, and DB record writes/updates. 
+- `src/core/services/immersion-tracker/session.ts`: session row lifecycle transitions (start/finalize). +- `src/core/services/immersion-tracker/metadata.ts`: local media metadata probing (`ffprobe`, sha256, parsed metadata shaping). ## Retention and Maintenance Defaults @@ -47,7 +52,7 @@ Reference implementation lives in `src/core/services/immersion-tracker-service.t - Maintenance cadence: startup + every `24h` - Vacuum cadence: idle weekly (`7d` minimum spacing) -Retention cleanup, rollup refresh, and vacuum scheduling are implemented in `runMaintenance` / `runRollupMaintenance`. +Retention cleanup and rollup refresh stay in service maintenance orchestration + `src/core/services/immersion-tracker/maintenance.ts`. ## Configurable Policy Knobs diff --git a/docs/subagents/agents/opencode-task106-immersion-modules-20260222T195109Z-r3m7.md b/docs/subagents/agents/opencode-task106-immersion-modules-20260222T195109Z-r3m7.md new file mode 100644 index 0000000..b213d79 --- /dev/null +++ b/docs/subagents/agents/opencode-task106-immersion-modules-20260222T195109Z-r3m7.md @@ -0,0 +1,43 @@ +# Agent Session: opencode-task106-immersion-modules-20260222T195109Z-r3m7 + +- alias: `opencode-task106-immersion-modules` +- mission: `Execute TASK-106 decomposition of immersion tracker service into storage session and metadata modules end-to-end without commit` +- status: `done` +- last_update_utc: `2026-02-22T21:58:45Z` + +## Intent + +- Load TASK-106 context from Backlog MCP. +- Produce implementation plan via writing-plans skill. +- Execute with executing-plans skill; parallel subagents where safe. + +## Planned Files (expected) + +- `src/core/services/immersion-tracker-service.ts` +- `src/core/services/immersion-tracker/*` +- `src/core/services/immersion-tracker-service.test.ts` +- `docs/architecture.md` + +## Assumptions + +- Task scope excludes commit/push. +- Existing behavior must remain stable; refactor + focused tests only. 
+ +## Progress Log + +- `2026-02-22T19:51:09Z` session started; backlog overview/task/execution guides loaded. +- `2026-02-22T19:55:30Z` user approved proceed; executing plan with parallel slices (storage/session + metadata) then integration/test/docs/finalization. +- `2026-02-22T20:01:45Z` implementation complete: extracted `storage.ts`, `session.ts`, `metadata.ts`; rewired `immersion-tracker-service.ts` facade; added `storage-session.test.ts` + `metadata.test.ts`; updated `docs/immersion-tracking.md` boundaries. +- `2026-02-22T20:01:45Z` verification: tracker tests green (7 pass/9 skip), `test:core:src` green (236 pass/6 skip), `bun run build` blocked by unrelated pre-existing TS errors in `src/anki-integration/*` + `src/main/runtime/*`; left TASK-106 in progress with AC#4 unchecked. +- `2026-02-22T21:58:45Z` user confirmed build fixed; reran `bun run build` + tracker tests + `test:core:src` all green; finalized TASK-106 Done with AC/DoD complete and final summary. + +## Files Touched + +- `src/core/services/immersion-tracker-service.ts` +- `src/core/services/immersion-tracker/storage.ts` +- `src/core/services/immersion-tracker/session.ts` +- `src/core/services/immersion-tracker/metadata.ts` +- `src/core/services/immersion-tracker/storage-session.test.ts` +- `src/core/services/immersion-tracker/metadata.test.ts` +- `docs/immersion-tracking.md` +- `docs/plans/2026-02-22-task-106-immersion-tracker-storage-session-metadata.md` diff --git a/src/core/services/immersion-tracker-service.ts b/src/core/services/immersion-tracker-service.ts index 59510d5..a61124f 100644 --- a/src/core/services/immersion-tracker-service.ts +++ b/src/core/services/immersion-tracker-service.ts @@ -1,10 +1,20 @@ -import crypto from 'node:crypto'; import path from 'node:path'; -import { spawn } from 'node:child_process'; import { DatabaseSync } from 'node:sqlite'; import * as fs from 'node:fs'; import { createLogger } from '../../logger'; +import { getLocalVideoMetadata } from 
'./immersion-tracker/metadata'; import { pruneRetention, runRollupMaintenance } from './immersion-tracker/maintenance'; +import { finalizeSessionRecord, startSessionRecord } from './immersion-tracker/session'; +import { + applyPragmas, + createTrackerPreparedStatements, + ensureSchema, + executeQueuedWrite, + getOrCreateVideoRecord, + type TrackerPreparedStatements, + updateVideoMetadataRecord, + updateVideoTitleRecord, +} from './immersion-tracker/storage'; import { getDailyRollups, getMonthlyRollups, @@ -15,18 +25,13 @@ import { import { buildVideoKey, calculateTextMetrics, - createInitialSessionState, deriveCanonicalTitle, - emptyMetadata, - hashToCode, isRemoteSource, normalizeMediaPath, normalizeText, - parseFps, resolveBoundedInt, sanitizePayload, secToMs, - toNullableInt, } from './immersion-tracker/reducer'; import { enqueueWrite } from './immersion-tracker/queue'; import { @@ -48,9 +53,6 @@ import { EVENT_SEEK_BACKWARD, EVENT_SEEK_FORWARD, EVENT_SUBTITLE_LINE, - SCHEMA_VERSION, - SESSION_STATUS_ACTIVE, - SESSION_STATUS_ENDED, SOURCE_TYPE_LOCAL, SOURCE_TYPE_REMOTE, type ImmersionSessionRollupRow, @@ -59,7 +61,6 @@ import { type SessionState, type SessionSummaryQueryRow, type SessionTimelineRow, - type VideoMetadata, } from './immersion-tracker/types'; export type { @@ -95,8 +96,7 @@ export class ImmersionTrackerService { private sessionState: SessionState | null = null; private currentVideoKey = ''; private currentMediaPathOrUrl = ''; - private readonly telemetryInsertStmt: ReturnType; - private readonly eventInsertStmt: ReturnType; + private readonly preparedStatements: TrackerPreparedStatements; constructor(options: ImmersionTrackerOptions) { this.dbPath = options.dbPath; @@ -164,26 +164,9 @@ export class ImmersionTrackerService { 3650, ) * 86_400_000; this.db = new DatabaseSync(this.dbPath); - this.applyPragmas(); - this.ensureSchema(); - this.telemetryInsertStmt = this.db.prepare(` - INSERT INTO imm_session_telemetry ( - session_id, sample_ms, 
total_watched_ms, active_watched_ms, - lines_seen, words_seen, tokens_seen, cards_mined, lookup_count, - lookup_hits, pause_count, pause_ms, seek_forward_count, - seek_backward_count, media_buffer_events - ) VALUES ( - ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ? - ) - `); - this.eventInsertStmt = this.db.prepare(` - INSERT INTO imm_session_events ( - session_id, ts_ms, event_type, line_index, segment_start_ms, segment_end_ms, - words_delta, cards_delta, payload_json - ) VALUES ( - ?, ?, ?, ?, ?, ?, ?, ?, ? - ) - `); + applyPragmas(this.db); + ensureSchema(this.db); + this.preparedStatements = createTrackerPreparedStatements(this.db); this.scheduleMaintenance(); this.scheduleFlush(); } @@ -257,7 +240,7 @@ export class ImmersionTrackerService { const sourceUrl = sourceType === SOURCE_TYPE_REMOTE ? normalizedPath : null; const sessionInfo = { - videoId: this.getOrCreateVideo(videoKey, { + videoId: getOrCreateVideoRecord(this.db, videoKey, { canonicalTitle, sourcePath, sourceUrl, @@ -563,193 +546,7 @@ export class ImmersionTrackerService { } private flushSingle(write: QueuedWrite): void { - if (write.kind === 'telemetry') { - this.telemetryInsertStmt.run( - write.sessionId, - write.sampleMs!, - write.totalWatchedMs!, - write.activeWatchedMs!, - write.linesSeen!, - write.wordsSeen!, - write.tokensSeen!, - write.cardsMined!, - write.lookupCount!, - write.lookupHits!, - write.pauseCount!, - write.pauseMs!, - write.seekForwardCount!, - write.seekBackwardCount!, - write.mediaBufferEvents!, - ); - return; - } - - this.eventInsertStmt.run( - write.sessionId, - write.sampleMs!, - write.eventType!, - write.lineIndex ?? null, - write.segmentStartMs ?? null, - write.segmentEndMs ?? null, - write.wordsDelta ?? 0, - write.cardsDelta ?? 0, - write.payloadJson ?? 
null, - ); - } - - private applyPragmas(): void { - this.db.exec('PRAGMA journal_mode = WAL'); - this.db.exec('PRAGMA synchronous = NORMAL'); - this.db.exec('PRAGMA foreign_keys = ON'); - this.db.exec('PRAGMA busy_timeout = 2500'); - } - - private ensureSchema(): void { - this.db.exec(` - CREATE TABLE IF NOT EXISTS imm_schema_version ( - schema_version INTEGER PRIMARY KEY, - applied_at_ms INTEGER NOT NULL - ); - `); - - const currentVersion = this.db - .prepare('SELECT schema_version FROM imm_schema_version ORDER BY schema_version DESC LIMIT 1') - .get() as { schema_version: number } | null; - if (currentVersion?.schema_version === SCHEMA_VERSION) { - return; - } - - this.db.exec(` - CREATE TABLE IF NOT EXISTS imm_videos( - video_id INTEGER PRIMARY KEY AUTOINCREMENT, - video_key TEXT NOT NULL UNIQUE, - canonical_title TEXT NOT NULL, - source_type INTEGER NOT NULL, - source_path TEXT, - source_url TEXT, - duration_ms INTEGER NOT NULL CHECK(duration_ms>=0), - file_size_bytes INTEGER CHECK(file_size_bytes>=0), - codec_id INTEGER, container_id INTEGER, - width_px INTEGER, height_px INTEGER, fps_x100 INTEGER, - bitrate_kbps INTEGER, audio_codec_id INTEGER, - hash_sha256 TEXT, screenshot_path TEXT, - metadata_json TEXT, - created_at_ms INTEGER NOT NULL, updated_at_ms INTEGER NOT NULL - ); - `); - this.db.exec(` - CREATE TABLE IF NOT EXISTS imm_sessions( - session_id INTEGER PRIMARY KEY AUTOINCREMENT, - session_uuid TEXT NOT NULL UNIQUE, - video_id INTEGER NOT NULL, - started_at_ms INTEGER NOT NULL, ended_at_ms INTEGER, - status INTEGER NOT NULL, - locale_id INTEGER, target_lang_id INTEGER, - difficulty_tier INTEGER, subtitle_mode INTEGER, - created_at_ms INTEGER NOT NULL, updated_at_ms INTEGER NOT NULL, - FOREIGN KEY(video_id) REFERENCES imm_videos(video_id) - ); - `); - this.db.exec(` - CREATE TABLE IF NOT EXISTS imm_session_telemetry( - telemetry_id INTEGER PRIMARY KEY AUTOINCREMENT, - session_id INTEGER NOT NULL, - sample_ms INTEGER NOT NULL, - total_watched_ms 
INTEGER NOT NULL DEFAULT 0, - active_watched_ms INTEGER NOT NULL DEFAULT 0, - lines_seen INTEGER NOT NULL DEFAULT 0, - words_seen INTEGER NOT NULL DEFAULT 0, - tokens_seen INTEGER NOT NULL DEFAULT 0, - cards_mined INTEGER NOT NULL DEFAULT 0, - lookup_count INTEGER NOT NULL DEFAULT 0, - lookup_hits INTEGER NOT NULL DEFAULT 0, - pause_count INTEGER NOT NULL DEFAULT 0, - pause_ms INTEGER NOT NULL DEFAULT 0, - seek_forward_count INTEGER NOT NULL DEFAULT 0, - seek_backward_count INTEGER NOT NULL DEFAULT 0, - media_buffer_events INTEGER NOT NULL DEFAULT 0, - FOREIGN KEY(session_id) REFERENCES imm_sessions(session_id) ON DELETE CASCADE - ); - `); - this.db.exec(` - CREATE TABLE IF NOT EXISTS imm_session_events( - event_id INTEGER PRIMARY KEY AUTOINCREMENT, - session_id INTEGER NOT NULL, - ts_ms INTEGER NOT NULL, - event_type INTEGER NOT NULL, - line_index INTEGER, - segment_start_ms INTEGER, - segment_end_ms INTEGER, - words_delta INTEGER NOT NULL DEFAULT 0, - cards_delta INTEGER NOT NULL DEFAULT 0, - payload_json TEXT, - FOREIGN KEY(session_id) REFERENCES imm_sessions(session_id) ON DELETE CASCADE - ); - `); - this.db.exec(` - CREATE TABLE IF NOT EXISTS imm_daily_rollups( - rollup_day INTEGER NOT NULL, - video_id INTEGER, - total_sessions INTEGER NOT NULL DEFAULT 0, - total_active_min REAL NOT NULL DEFAULT 0, - total_lines_seen INTEGER NOT NULL DEFAULT 0, - total_words_seen INTEGER NOT NULL DEFAULT 0, - total_tokens_seen INTEGER NOT NULL DEFAULT 0, - total_cards INTEGER NOT NULL DEFAULT 0, - cards_per_hour REAL, - words_per_min REAL, - lookup_hit_rate REAL, - PRIMARY KEY (rollup_day, video_id) - ); - `); - this.db.exec(` - CREATE TABLE IF NOT EXISTS imm_monthly_rollups( - rollup_month INTEGER NOT NULL, - video_id INTEGER, - total_sessions INTEGER NOT NULL DEFAULT 0, - total_active_min REAL NOT NULL DEFAULT 0, - total_lines_seen INTEGER NOT NULL DEFAULT 0, - total_words_seen INTEGER NOT NULL DEFAULT 0, - total_tokens_seen INTEGER NOT NULL DEFAULT 0, - total_cards INTEGER 
NOT NULL DEFAULT 0, - PRIMARY KEY (rollup_month, video_id) - ); - `); - - this.db.exec(` - CREATE INDEX IF NOT EXISTS idx_sessions_video_started - ON imm_sessions(video_id, started_at_ms DESC) - `); - this.db.exec(` - CREATE INDEX IF NOT EXISTS idx_sessions_status_started - ON imm_sessions(status, started_at_ms DESC) - `); - this.db.exec(` - CREATE INDEX IF NOT EXISTS idx_telemetry_session_sample - ON imm_session_telemetry(session_id, sample_ms DESC) - `); - this.db.exec(` - CREATE INDEX IF NOT EXISTS idx_events_session_ts - ON imm_session_events(session_id, ts_ms DESC) - `); - this.db.exec(` - CREATE INDEX IF NOT EXISTS idx_events_type_ts - ON imm_session_events(event_type, ts_ms DESC) - `); - this.db.exec(` - CREATE INDEX IF NOT EXISTS idx_rollups_day_video - ON imm_daily_rollups(rollup_day, video_id) - `); - this.db.exec(` - CREATE INDEX IF NOT EXISTS idx_rollups_month_video - ON imm_monthly_rollups(rollup_month, video_id) - `); - - this.db.exec(` - INSERT INTO imm_schema_version(schema_version, applied_at_ms) - VALUES (${SCHEMA_VERSION}, ${Date.now()}) - ON CONFLICT DO NOTHING - `); + executeQueuedWrite(write, this.preparedStatements); } private scheduleMaintenance(): void { @@ -771,7 +568,7 @@ export class ImmersionTrackerService { dailyRollupRetentionMs: this.dailyRollupRetentionMs, monthlyRollupRetentionMs: this.monthlyRollupRetentionMs, }); - runRollupMaintenance(this.db); + this.runRollupMaintenance(); if (nowMs - this.lastVacuumMs >= this.vacuumIntervalMs && !this.writeLock.locked) { this.db.exec('VACUUM'); @@ -785,15 +582,17 @@ export class ImmersionTrackerService { } } + private runRollupMaintenance(): void { + runRollupMaintenance(this.db); + } + private startSession(videoId: number, startedAtMs?: number): void { - const nowMs = startedAtMs ?? 
Date.now(); - const result = this.startSessionStatement(videoId, nowMs); - const sessionId = Number(result.lastInsertRowid); - this.sessionState = createInitialSessionState(sessionId, videoId, nowMs); + const { sessionId, state } = startSessionRecord(this.db, videoId, startedAtMs); + this.sessionState = state; this.recordWrite({ kind: 'telemetry', sessionId, - sampleMs: nowMs, + sampleMs: state.startedAtMs, totalWatchedMs: 0, activeWatchedMs: 0, linesSeen: 0, @@ -811,24 +610,6 @@ export class ImmersionTrackerService { this.scheduleFlush(0); } - private startSessionStatement( - videoId: number, - startedAtMs: number, - ): { - lastInsertRowid: number | bigint; - } { - const sessionUuid = crypto.randomUUID(); - return this.db - .prepare( - ` - INSERT INTO imm_sessions ( - session_uuid, video_id, started_at_ms, status, created_at_ms, updated_at_ms - ) VALUES (?, ?, ?, ?, ?, ?) - `, - ) - .run(sessionUuid, videoId, startedAtMs, SESSION_STATUS_ACTIVE, startedAtMs, startedAtMs); - } - private finalizeActiveSession(): void { if (!this.sessionState) return; const endedAt = Date.now(); @@ -850,250 +631,24 @@ export class ImmersionTrackerService { this.flushNow(); this.sessionState.pendingTelemetry = false; - this.db - .prepare( - 'UPDATE imm_sessions SET ended_at_ms = ?, status = ?, updated_at_ms = ? WHERE session_id = ?', - ) - .run(endedAt, SESSION_STATUS_ENDED, Date.now(), this.sessionState.sessionId); + finalizeSessionRecord(this.db, this.sessionState, endedAt); this.sessionState = null; } - private getOrCreateVideo( - videoKey: string, - details: { - canonicalTitle: string; - sourcePath: string | null; - sourceUrl: string | null; - sourceType: number; - }, - ): number { - const existing = this.db - .prepare('SELECT video_id FROM imm_videos WHERE video_key = ?') - .get(videoKey) as { video_id: number } | null; - if (existing?.video_id) { - this.db - .prepare('UPDATE imm_videos SET canonical_title = ?, updated_at_ms = ? 
WHERE video_id = ?') - .run(details.canonicalTitle || 'unknown', Date.now(), existing.video_id); - return existing.video_id; - } - - const nowMs = Date.now(); - const insert = this.db.prepare(` - INSERT INTO imm_videos ( - video_key, canonical_title, source_type, source_path, source_url, - duration_ms, file_size_bytes, codec_id, container_id, width_px, height_px, - fps_x100, bitrate_kbps, audio_codec_id, hash_sha256, screenshot_path, - metadata_json, created_at_ms, updated_at_ms - ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) - `); - const result = insert.run( - videoKey, - details.canonicalTitle || 'unknown', - details.sourceType, - details.sourcePath, - details.sourceUrl, - 0, - null, - null, - null, - null, - null, - null, - null, - null, - null, - null, - null, - nowMs, - nowMs, - ); - return Number(result.lastInsertRowid); - } - - private updateVideoMetadata(videoId: number, metadata: VideoMetadata): void { - this.db - .prepare( - ` - UPDATE imm_videos - SET - duration_ms = ?, - file_size_bytes = ?, - codec_id = ?, - container_id = ?, - width_px = ?, - height_px = ?, - fps_x100 = ?, - bitrate_kbps = ?, - audio_codec_id = ?, - hash_sha256 = ?, - screenshot_path = ?, - metadata_json = ?, - updated_at_ms = ? - WHERE video_id = ? 
- `, - ) - .run( - metadata.durationMs, - metadata.fileSizeBytes, - metadata.codecId, - metadata.containerId, - metadata.widthPx, - metadata.heightPx, - metadata.fpsX100, - metadata.bitrateKbps, - metadata.audioCodecId, - metadata.hashSha256, - metadata.screenshotPath, - metadata.metadataJson, - Date.now(), - videoId, - ); - } - private captureVideoMetadataAsync(videoId: number, sourceType: number, mediaPath: string): void { if (sourceType !== SOURCE_TYPE_LOCAL) return; void (async () => { try { - const metadata = await this.getLocalVideoMetadata(mediaPath); - this.updateVideoMetadata(videoId, metadata); + const metadata = await getLocalVideoMetadata(mediaPath); + updateVideoMetadataRecord(this.db, videoId, metadata); } catch (error) { this.logger.warn('Unable to capture local video metadata', (error as Error).message); } })(); } - private async getLocalVideoMetadata(mediaPath: string): Promise { - const hash = await this.computeSha256(mediaPath); - const info = await this.runFfprobe(mediaPath); - const stat = await fs.promises.stat(mediaPath); - return { - sourceType: SOURCE_TYPE_LOCAL, - canonicalTitle: deriveCanonicalTitle(mediaPath), - durationMs: info.durationMs || 0, - fileSizeBytes: Number.isFinite(stat.size) ? stat.size : null, - codecId: info.codecId ?? null, - containerId: info.containerId ?? null, - widthPx: info.widthPx ?? null, - heightPx: info.heightPx ?? null, - fpsX100: info.fpsX100 ?? null, - bitrateKbps: info.bitrateKbps ?? null, - audioCodecId: info.audioCodecId ?? 
null, - hashSha256: hash, - screenshotPath: null, - metadataJson: null, - }; - } - - private async computeSha256(mediaPath: string): Promise { - return new Promise((resolve) => { - const file = fs.createReadStream(mediaPath); - const digest = crypto.createHash('sha256'); - file.on('data', (chunk) => digest.update(chunk)); - file.on('end', () => resolve(digest.digest('hex'))); - file.on('error', () => resolve(null)); - }); - } - - private runFfprobe(mediaPath: string): Promise<{ - durationMs: number | null; - codecId: number | null; - containerId: number | null; - widthPx: number | null; - heightPx: number | null; - fpsX100: number | null; - bitrateKbps: number | null; - audioCodecId: number | null; - }> { - return new Promise((resolve) => { - const child = spawn('ffprobe', [ - '-v', - 'error', - '-print_format', - 'json', - '-show_entries', - 'stream=codec_type,codec_tag_string,width,height,avg_frame_rate,bit_rate', - '-show_entries', - 'format=duration,bit_rate', - mediaPath, - ]); - - let output = ''; - let errorOutput = ''; - child.stdout.on('data', (chunk) => { - output += chunk.toString('utf-8'); - }); - child.stderr.on('data', (chunk) => { - errorOutput += chunk.toString('utf-8'); - }); - child.on('error', () => resolve(emptyMetadata())); - child.on('close', () => { - if (errorOutput && output.length === 0) { - resolve(emptyMetadata()); - return; - } - - try { - const parsed = JSON.parse(output) as { - format?: { duration?: string; bit_rate?: string }; - streams?: Array<{ - codec_type?: string; - codec_tag_string?: string; - width?: number; - height?: number; - avg_frame_rate?: string; - bit_rate?: string; - }>; - }; - - const durationText = parsed.format?.duration; - const bitrateText = parsed.format?.bit_rate; - const durationMs = Number(durationText) ? Math.round(Number(durationText) * 1000) : null; - const bitrateKbps = Number(bitrateText) ? 
Math.round(Number(bitrateText) / 1000) : null; - - let codecId: number | null = null; - let containerId: number | null = null; - let widthPx: number | null = null; - let heightPx: number | null = null; - let fpsX100: number | null = null; - let audioCodecId: number | null = null; - - for (const stream of parsed.streams ?? []) { - if (stream.codec_type === 'video') { - widthPx = toNullableInt(stream.width); - heightPx = toNullableInt(stream.height); - fpsX100 = parseFps(stream.avg_frame_rate); - codecId = hashToCode(stream.codec_tag_string); - containerId = 0; - } - if (stream.codec_type === 'audio') { - audioCodecId = hashToCode(stream.codec_tag_string); - if (audioCodecId && audioCodecId > 0) { - break; - } - } - } - - resolve({ - durationMs, - codecId, - containerId, - widthPx, - heightPx, - fpsX100, - bitrateKbps, - audioCodecId, - }); - } catch { - resolve(emptyMetadata()); - } - }); - }); - } - private updateVideoTitleForActiveSession(canonicalTitle: string): void { if (!this.sessionState) return; - this.db - .prepare('UPDATE imm_videos SET canonical_title = ?, updated_at_ms = ? 
WHERE video_id = ?') - .run(canonicalTitle, Date.now(), this.sessionState.videoId); + updateVideoTitleRecord(this.db, this.sessionState.videoId, canonicalTitle); } } diff --git a/src/core/services/immersion-tracker/metadata.test.ts b/src/core/services/immersion-tracker/metadata.test.ts new file mode 100644 index 0000000..b9da9d4 --- /dev/null +++ b/src/core/services/immersion-tracker/metadata.test.ts @@ -0,0 +1,148 @@ +import assert from 'node:assert/strict'; +import { createHash } from 'node:crypto'; +import { EventEmitter } from 'node:events'; +import test from 'node:test'; +import type { spawn as spawnFn } from 'node:child_process'; +import { SOURCE_TYPE_LOCAL } from './types'; +import { getLocalVideoMetadata, runFfprobe } from './metadata'; + +type Spawn = typeof spawnFn; + +function createSpawnStub(options: { + stdout?: string; + stderr?: string; + emitError?: boolean; +}): Spawn { + return (() => { + const child = new EventEmitter() as EventEmitter & { + stdout: EventEmitter; + stderr: EventEmitter; + }; + child.stdout = new EventEmitter(); + child.stderr = new EventEmitter(); + + queueMicrotask(() => { + if (options.emitError) { + child.emit('error', new Error('ffprobe failed')); + return; + } + if (options.stderr) { + child.stderr.emit('data', Buffer.from(options.stderr)); + } + if (options.stdout !== undefined) { + child.stdout.emit('data', Buffer.from(options.stdout)); + } + child.emit('close', 0); + }); + + return child as unknown as ReturnType; + }) as Spawn; +} + +test('runFfprobe parses valid JSON from stream and format sections', async () => { + const metadata = await runFfprobe('/tmp/video.mp4', { + spawn: createSpawnStub({ + stdout: JSON.stringify({ + format: { duration: '12.34', bit_rate: '3456000' }, + streams: [ + { + codec_type: 'video', + codec_tag_string: 'avc1', + width: 1920, + height: 1080, + avg_frame_rate: '24000/1001', + }, + { + codec_type: 'audio', + codec_tag_string: 'mp4a', + }, + ], + }), + }), + }); + + 
assert.equal(metadata.durationMs, 12340); + assert.equal(metadata.bitrateKbps, 3456); + assert.equal(metadata.widthPx, 1920); + assert.equal(metadata.heightPx, 1080); + assert.equal(metadata.fpsX100, 2398); + assert.equal(metadata.containerId, 0); + assert.ok(Number(metadata.codecId) > 0); + assert.ok(Number(metadata.audioCodecId) > 0); +}); + +test('runFfprobe returns empty metadata for invalid JSON and process errors', async () => { + const invalidJsonMetadata = await runFfprobe('/tmp/broken.mp4', { + spawn: createSpawnStub({ stdout: '{invalid' }), + }); + assert.deepEqual(invalidJsonMetadata, { + durationMs: null, + codecId: null, + containerId: null, + widthPx: null, + heightPx: null, + fpsX100: null, + bitrateKbps: null, + audioCodecId: null, + }); + + const errorMetadata = await runFfprobe('/tmp/error.mp4', { + spawn: createSpawnStub({ emitError: true }), + }); + assert.deepEqual(errorMetadata, { + durationMs: null, + codecId: null, + containerId: null, + widthPx: null, + heightPx: null, + fpsX100: null, + bitrateKbps: null, + audioCodecId: null, + }); +}); + +test('getLocalVideoMetadata derives title and falls back to null hash on read errors', async () => { + const successMetadata = await getLocalVideoMetadata('/tmp/Episode 01.mkv', { + spawn: createSpawnStub({ stdout: JSON.stringify({ format: { duration: '0' }, streams: [] }) }), + fs: { + createReadStream: () => { + const stream = new EventEmitter(); + queueMicrotask(() => { + stream.emit('data', Buffer.from('hello world')); + stream.emit('end'); + }); + return stream as unknown as ReturnType; + }, + promises: { + stat: (async () => ({ size: 1234 }) as unknown) as typeof import('node:fs').promises.stat, + }, + } as never, + }); + + assert.equal(successMetadata.sourceType, SOURCE_TYPE_LOCAL); + assert.equal(successMetadata.canonicalTitle, 'Episode 01'); + assert.equal(successMetadata.fileSizeBytes, 1234); + assert.equal( + successMetadata.hashSha256, + createHash('sha256').update('hello 
world').digest('hex'), + ); + + const hashFallbackMetadata = await getLocalVideoMetadata('/tmp/Episode 02.mkv', { + spawn: createSpawnStub({ stdout: JSON.stringify({ format: {}, streams: [] }) }), + fs: { + createReadStream: () => { + const stream = new EventEmitter(); + queueMicrotask(() => { + stream.emit('error', new Error('read failed')); + }); + return stream as unknown as ReturnType; + }, + promises: { + stat: (async () => ({ size: 5678 }) as unknown) as typeof import('node:fs').promises.stat, + }, + } as never, + }); + + assert.equal(hashFallbackMetadata.canonicalTitle, 'Episode 02'); + assert.equal(hashFallbackMetadata.hashSha256, null); +}); diff --git a/src/core/services/immersion-tracker/metadata.ts b/src/core/services/immersion-tracker/metadata.ts new file mode 100644 index 0000000..394da91 --- /dev/null +++ b/src/core/services/immersion-tracker/metadata.ts @@ -0,0 +1,153 @@ +import crypto from 'node:crypto'; +import { spawn as nodeSpawn } from 'node:child_process'; +import * as fs from 'node:fs'; +import { + deriveCanonicalTitle, + emptyMetadata, + hashToCode, + parseFps, + toNullableInt, +} from './reducer'; +import { SOURCE_TYPE_LOCAL, type ProbeMetadata, type VideoMetadata } from './types'; + +type SpawnFn = typeof nodeSpawn; + +interface FsDeps { + createReadStream: typeof fs.createReadStream; + promises: { + stat: typeof fs.promises.stat; + }; +} + +interface MetadataDeps { + spawn?: SpawnFn; + fs?: FsDeps; +} + +export async function computeSha256( + mediaPath: string, + deps: MetadataDeps = {}, +): Promise { + const fileSystem = deps.fs ?? fs; + return new Promise((resolve) => { + const file = fileSystem.createReadStream(mediaPath); + const digest = crypto.createHash('sha256'); + file.on('data', (chunk) => digest.update(chunk)); + file.on('end', () => resolve(digest.digest('hex'))); + file.on('error', () => resolve(null)); + }); +} + +export function runFfprobe(mediaPath: string, deps: MetadataDeps = {}): Promise { + const spawn = deps.spawn ?? 
nodeSpawn; + return new Promise((resolve) => { + const child = spawn('ffprobe', [ + '-v', + 'error', + '-print_format', + 'json', + '-show_entries', + 'stream=codec_type,codec_tag_string,width,height,avg_frame_rate,bit_rate', + '-show_entries', + 'format=duration,bit_rate', + mediaPath, + ]); + + let output = ''; + let errorOutput = ''; + child.stdout.on('data', (chunk) => { + output += chunk.toString('utf-8'); + }); + child.stderr.on('data', (chunk) => { + errorOutput += chunk.toString('utf-8'); + }); + child.on('error', () => resolve(emptyMetadata())); + child.on('close', () => { + if (errorOutput && output.length === 0) { + resolve(emptyMetadata()); + return; + } + + try { + const parsed = JSON.parse(output) as { + format?: { duration?: string; bit_rate?: string }; + streams?: Array<{ + codec_type?: string; + codec_tag_string?: string; + width?: number; + height?: number; + avg_frame_rate?: string; + bit_rate?: string; + }>; + }; + + const durationText = parsed.format?.duration; + const bitrateText = parsed.format?.bit_rate; + const durationMs = Number(durationText) ? Math.round(Number(durationText) * 1000) : null; + const bitrateKbps = Number(bitrateText) ? Math.round(Number(bitrateText) / 1000) : null; + + let codecId: number | null = null; + let containerId: number | null = null; + let widthPx: number | null = null; + let heightPx: number | null = null; + let fpsX100: number | null = null; + let audioCodecId: number | null = null; + + for (const stream of parsed.streams ?? 
[]) { + if (stream.codec_type === 'video') { + widthPx = toNullableInt(stream.width); + heightPx = toNullableInt(stream.height); + fpsX100 = parseFps(stream.avg_frame_rate); + codecId = hashToCode(stream.codec_tag_string); + containerId = 0; + } + if (stream.codec_type === 'audio') { + audioCodecId = hashToCode(stream.codec_tag_string); + if (audioCodecId && audioCodecId > 0) { + break; + } + } + } + + resolve({ + durationMs, + codecId, + containerId, + widthPx, + heightPx, + fpsX100, + bitrateKbps, + audioCodecId, + }); + } catch { + resolve(emptyMetadata()); + } + }); + }); +} + +export async function getLocalVideoMetadata( + mediaPath: string, + deps: MetadataDeps = {}, +): Promise { + const fileSystem = deps.fs ?? fs; + const hash = await computeSha256(mediaPath, deps); + const info = await runFfprobe(mediaPath, deps); + const stat = await fileSystem.promises.stat(mediaPath); + return { + sourceType: SOURCE_TYPE_LOCAL, + canonicalTitle: deriveCanonicalTitle(mediaPath), + durationMs: info.durationMs || 0, + fileSizeBytes: Number.isFinite(stat.size) ? stat.size : null, + codecId: info.codecId ?? null, + containerId: info.containerId ?? null, + widthPx: info.widthPx ?? null, + heightPx: info.heightPx ?? null, + fpsX100: info.fpsX100 ?? null, + bitrateKbps: info.bitrateKbps ?? null, + audioCodecId: info.audioCodecId ?? 
null, + hashSha256: hash, + screenshotPath: null, + metadataJson: null, + }; +} diff --git a/src/core/services/immersion-tracker/session.ts b/src/core/services/immersion-tracker/session.ts new file mode 100644 index 0000000..2c39de1 --- /dev/null +++ b/src/core/services/immersion-tracker/session.ts @@ -0,0 +1,37 @@ +import crypto from 'node:crypto'; +import type { DatabaseSync } from 'node:sqlite'; +import { createInitialSessionState } from './reducer'; +import { SESSION_STATUS_ACTIVE, SESSION_STATUS_ENDED } from './types'; +import type { SessionState } from './types'; + +export function startSessionRecord( + db: DatabaseSync, + videoId: number, + startedAtMs = Date.now(), +): { sessionId: number; state: SessionState } { + const sessionUuid = crypto.randomUUID(); + const result = db + .prepare( + ` + INSERT INTO imm_sessions ( + session_uuid, video_id, started_at_ms, status, created_at_ms, updated_at_ms + ) VALUES (?, ?, ?, ?, ?, ?) + `, + ) + .run(sessionUuid, videoId, startedAtMs, SESSION_STATUS_ACTIVE, startedAtMs, startedAtMs); + const sessionId = Number(result.lastInsertRowid); + return { + sessionId, + state: createInitialSessionState(sessionId, videoId, startedAtMs), + }; +} + +export function finalizeSessionRecord( + db: DatabaseSync, + sessionState: SessionState, + endedAtMs = Date.now(), +): void { + db.prepare( + 'UPDATE imm_sessions SET ended_at_ms = ?, status = ?, updated_at_ms = ? 
WHERE session_id = ?', + ).run(endedAtMs, SESSION_STATUS_ENDED, Date.now(), sessionState.sessionId); +} diff --git a/src/core/services/immersion-tracker/storage-session.test.ts b/src/core/services/immersion-tracker/storage-session.test.ts new file mode 100644 index 0000000..5d89008 --- /dev/null +++ b/src/core/services/immersion-tracker/storage-session.test.ts @@ -0,0 +1,162 @@ +import assert from 'node:assert/strict'; +import fs from 'node:fs'; +import os from 'node:os'; +import path from 'node:path'; +import test from 'node:test'; +import type { DatabaseSync as NodeDatabaseSync } from 'node:sqlite'; +import { finalizeSessionRecord, startSessionRecord } from './session'; +import { + createTrackerPreparedStatements, + ensureSchema, + executeQueuedWrite, + getOrCreateVideoRecord, +} from './storage'; +import { EVENT_SUBTITLE_LINE, SESSION_STATUS_ENDED, SOURCE_TYPE_LOCAL } from './types'; + +type DatabaseSyncCtor = typeof NodeDatabaseSync; +const DatabaseSync: DatabaseSyncCtor | null = (() => { + try { + return (require('node:sqlite') as { DatabaseSync?: DatabaseSyncCtor }).DatabaseSync ?? null; + } catch { + return null; + } +})(); +const testIfSqlite = DatabaseSync ? 
test : test.skip; + +function makeDbPath(): string { + const dir = fs.mkdtempSync(path.join(os.tmpdir(), 'subminer-imm-storage-session-')); + return path.join(dir, 'immersion.sqlite'); +} + +function cleanupDbPath(dbPath: string): void { + const dir = path.dirname(dbPath); + if (fs.existsSync(dir)) { + fs.rmSync(dir, { recursive: true, force: true }); + } +} + +testIfSqlite('ensureSchema creates immersion core tables', () => { + const dbPath = makeDbPath(); + const db = new DatabaseSync!(dbPath); + + try { + ensureSchema(db); + const rows = db + .prepare( + `SELECT name FROM sqlite_master WHERE type = 'table' AND name LIKE 'imm_%' ORDER BY name`, + ) + .all() as Array<{ name: string }>; + const tableNames = new Set(rows.map((row) => row.name)); + + assert.ok(tableNames.has('imm_videos')); + assert.ok(tableNames.has('imm_sessions')); + assert.ok(tableNames.has('imm_session_telemetry')); + assert.ok(tableNames.has('imm_session_events')); + assert.ok(tableNames.has('imm_daily_rollups')); + assert.ok(tableNames.has('imm_monthly_rollups')); + } finally { + db.close(); + cleanupDbPath(dbPath); + } +}); + +testIfSqlite('start/finalize session updates ended_at and status', () => { + const dbPath = makeDbPath(); + const db = new DatabaseSync!(dbPath); + + try { + ensureSchema(db); + const videoId = getOrCreateVideoRecord(db, 'local:/tmp/slice-a.mkv', { + canonicalTitle: 'Slice A Episode', + sourcePath: '/tmp/slice-a.mkv', + sourceUrl: null, + sourceType: SOURCE_TYPE_LOCAL, + }); + const startedAtMs = 1_234_567_000; + const endedAtMs = startedAtMs + 8_500; + const { sessionId, state } = startSessionRecord(db, videoId, startedAtMs); + + finalizeSessionRecord(db, state, endedAtMs); + + const row = db + .prepare('SELECT ended_at_ms, status FROM imm_sessions WHERE session_id = ?') + .get(sessionId) as { + ended_at_ms: number | null; + status: number; + } | null; + + assert.ok(row); + assert.equal(row?.ended_at_ms, endedAtMs); + assert.equal(row?.status, SESSION_STATUS_ENDED); + 
} finally { + db.close(); + cleanupDbPath(dbPath); + } +}); + +testIfSqlite('executeQueuedWrite inserts event and telemetry rows', () => { + const dbPath = makeDbPath(); + const db = new DatabaseSync!(dbPath); + + try { + ensureSchema(db); + const stmts = createTrackerPreparedStatements(db); + const videoId = getOrCreateVideoRecord(db, 'local:/tmp/slice-a-events.mkv', { + canonicalTitle: 'Slice A Events', + sourcePath: '/tmp/slice-a-events.mkv', + sourceUrl: null, + sourceType: SOURCE_TYPE_LOCAL, + }); + const { sessionId } = startSessionRecord(db, videoId, 5_000); + + executeQueuedWrite( + { + kind: 'telemetry', + sessionId, + sampleMs: 6_000, + totalWatchedMs: 1_000, + activeWatchedMs: 900, + linesSeen: 3, + wordsSeen: 6, + tokensSeen: 6, + cardsMined: 1, + lookupCount: 2, + lookupHits: 1, + pauseCount: 1, + pauseMs: 50, + seekForwardCount: 0, + seekBackwardCount: 0, + mediaBufferEvents: 0, + }, + stmts, + ); + executeQueuedWrite( + { + kind: 'event', + sessionId, + sampleMs: 6_100, + eventType: EVENT_SUBTITLE_LINE, + lineIndex: 1, + segmentStartMs: 0, + segmentEndMs: 800, + wordsDelta: 2, + cardsDelta: 0, + payloadJson: '{"event":"subtitle-line"}', + }, + stmts, + ); + + const telemetryCount = db + .prepare('SELECT COUNT(*) AS total FROM imm_session_telemetry WHERE session_id = ?') + .get(sessionId) as { total: number }; + const eventCount = db + .prepare('SELECT COUNT(*) AS total FROM imm_session_events WHERE session_id = ?') + .get(sessionId) as { total: number }; + + assert.equal(telemetryCount.total, 1); + assert.equal(eventCount.total, 1); + } finally { + db.close(); + cleanupDbPath(dbPath); + } +}); diff --git a/src/core/services/immersion-tracker/storage.ts b/src/core/services/immersion-tracker/storage.ts new file mode 100644 index 0000000..bd40c69 --- /dev/null +++ b/src/core/services/immersion-tracker/storage.ts @@ -0,0 +1,328 @@ +import type { DatabaseSync } from 'node:sqlite'; +import { SCHEMA_VERSION } from './types'; +import type { QueuedWrite, 
VideoMetadata } from './types'; + +export interface TrackerPreparedStatements { + telemetryInsertStmt: ReturnType; + eventInsertStmt: ReturnType; +} + +export function applyPragmas(db: DatabaseSync): void { + db.exec('PRAGMA journal_mode = WAL'); + db.exec('PRAGMA synchronous = NORMAL'); + db.exec('PRAGMA foreign_keys = ON'); + db.exec('PRAGMA busy_timeout = 2500'); +} + +export function ensureSchema(db: DatabaseSync): void { + db.exec(` + CREATE TABLE IF NOT EXISTS imm_schema_version ( + schema_version INTEGER PRIMARY KEY, + applied_at_ms INTEGER NOT NULL + ); + `); + + const currentVersion = db + .prepare('SELECT schema_version FROM imm_schema_version ORDER BY schema_version DESC LIMIT 1') + .get() as { schema_version: number } | null; + if (currentVersion?.schema_version === SCHEMA_VERSION) { + return; + } + + db.exec(` + CREATE TABLE IF NOT EXISTS imm_videos( + video_id INTEGER PRIMARY KEY AUTOINCREMENT, + video_key TEXT NOT NULL UNIQUE, + canonical_title TEXT NOT NULL, + source_type INTEGER NOT NULL, + source_path TEXT, + source_url TEXT, + duration_ms INTEGER NOT NULL CHECK(duration_ms>=0), + file_size_bytes INTEGER CHECK(file_size_bytes>=0), + codec_id INTEGER, container_id INTEGER, + width_px INTEGER, height_px INTEGER, fps_x100 INTEGER, + bitrate_kbps INTEGER, audio_codec_id INTEGER, + hash_sha256 TEXT, screenshot_path TEXT, + metadata_json TEXT, + created_at_ms INTEGER NOT NULL, updated_at_ms INTEGER NOT NULL + ); + `); + db.exec(` + CREATE TABLE IF NOT EXISTS imm_sessions( + session_id INTEGER PRIMARY KEY AUTOINCREMENT, + session_uuid TEXT NOT NULL UNIQUE, + video_id INTEGER NOT NULL, + started_at_ms INTEGER NOT NULL, ended_at_ms INTEGER, + status INTEGER NOT NULL, + locale_id INTEGER, target_lang_id INTEGER, + difficulty_tier INTEGER, subtitle_mode INTEGER, + created_at_ms INTEGER NOT NULL, updated_at_ms INTEGER NOT NULL, + FOREIGN KEY(video_id) REFERENCES imm_videos(video_id) + ); + `); + db.exec(` + CREATE TABLE IF NOT EXISTS imm_session_telemetry( + 
telemetry_id INTEGER PRIMARY KEY AUTOINCREMENT, + session_id INTEGER NOT NULL, + sample_ms INTEGER NOT NULL, + total_watched_ms INTEGER NOT NULL DEFAULT 0, + active_watched_ms INTEGER NOT NULL DEFAULT 0, + lines_seen INTEGER NOT NULL DEFAULT 0, + words_seen INTEGER NOT NULL DEFAULT 0, + tokens_seen INTEGER NOT NULL DEFAULT 0, + cards_mined INTEGER NOT NULL DEFAULT 0, + lookup_count INTEGER NOT NULL DEFAULT 0, + lookup_hits INTEGER NOT NULL DEFAULT 0, + pause_count INTEGER NOT NULL DEFAULT 0, + pause_ms INTEGER NOT NULL DEFAULT 0, + seek_forward_count INTEGER NOT NULL DEFAULT 0, + seek_backward_count INTEGER NOT NULL DEFAULT 0, + media_buffer_events INTEGER NOT NULL DEFAULT 0, + FOREIGN KEY(session_id) REFERENCES imm_sessions(session_id) ON DELETE CASCADE + ); + `); + db.exec(` + CREATE TABLE IF NOT EXISTS imm_session_events( + event_id INTEGER PRIMARY KEY AUTOINCREMENT, + session_id INTEGER NOT NULL, + ts_ms INTEGER NOT NULL, + event_type INTEGER NOT NULL, + line_index INTEGER, + segment_start_ms INTEGER, + segment_end_ms INTEGER, + words_delta INTEGER NOT NULL DEFAULT 0, + cards_delta INTEGER NOT NULL DEFAULT 0, + payload_json TEXT, + FOREIGN KEY(session_id) REFERENCES imm_sessions(session_id) ON DELETE CASCADE + ); + `); + db.exec(` + CREATE TABLE IF NOT EXISTS imm_daily_rollups( + rollup_day INTEGER NOT NULL, + video_id INTEGER, + total_sessions INTEGER NOT NULL DEFAULT 0, + total_active_min REAL NOT NULL DEFAULT 0, + total_lines_seen INTEGER NOT NULL DEFAULT 0, + total_words_seen INTEGER NOT NULL DEFAULT 0, + total_tokens_seen INTEGER NOT NULL DEFAULT 0, + total_cards INTEGER NOT NULL DEFAULT 0, + cards_per_hour REAL, + words_per_min REAL, + lookup_hit_rate REAL, + PRIMARY KEY (rollup_day, video_id) + ); + `); + db.exec(` + CREATE TABLE IF NOT EXISTS imm_monthly_rollups( + rollup_month INTEGER NOT NULL, + video_id INTEGER, + total_sessions INTEGER NOT NULL DEFAULT 0, + total_active_min REAL NOT NULL DEFAULT 0, + total_lines_seen INTEGER NOT NULL DEFAULT 0, + 
total_words_seen INTEGER NOT NULL DEFAULT 0, + total_tokens_seen INTEGER NOT NULL DEFAULT 0, + total_cards INTEGER NOT NULL DEFAULT 0, + PRIMARY KEY (rollup_month, video_id) + ); + `); + + db.exec(` + CREATE INDEX IF NOT EXISTS idx_sessions_video_started + ON imm_sessions(video_id, started_at_ms DESC) + `); + db.exec(` + CREATE INDEX IF NOT EXISTS idx_sessions_status_started + ON imm_sessions(status, started_at_ms DESC) + `); + db.exec(` + CREATE INDEX IF NOT EXISTS idx_telemetry_session_sample + ON imm_session_telemetry(session_id, sample_ms DESC) + `); + db.exec(` + CREATE INDEX IF NOT EXISTS idx_events_session_ts + ON imm_session_events(session_id, ts_ms DESC) + `); + db.exec(` + CREATE INDEX IF NOT EXISTS idx_events_type_ts + ON imm_session_events(event_type, ts_ms DESC) + `); + db.exec(` + CREATE INDEX IF NOT EXISTS idx_rollups_day_video + ON imm_daily_rollups(rollup_day, video_id) + `); + db.exec(` + CREATE INDEX IF NOT EXISTS idx_rollups_month_video + ON imm_monthly_rollups(rollup_month, video_id) + `); + + db.exec(` + INSERT INTO imm_schema_version(schema_version, applied_at_ms) + VALUES (${SCHEMA_VERSION}, ${Date.now()}) + ON CONFLICT DO NOTHING + `); +} + +export function createTrackerPreparedStatements(db: DatabaseSync): TrackerPreparedStatements { + return { + telemetryInsertStmt: db.prepare(` + INSERT INTO imm_session_telemetry ( + session_id, sample_ms, total_watched_ms, active_watched_ms, + lines_seen, words_seen, tokens_seen, cards_mined, lookup_count, + lookup_hits, pause_count, pause_ms, seek_forward_count, + seek_backward_count, media_buffer_events + ) VALUES ( + ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ? + ) + `), + eventInsertStmt: db.prepare(` + INSERT INTO imm_session_events ( + session_id, ts_ms, event_type, line_index, segment_start_ms, segment_end_ms, + words_delta, cards_delta, payload_json + ) VALUES ( + ?, ?, ?, ?, ?, ?, ?, ?, ? 
+ ) + `), + }; +} + +export function executeQueuedWrite(write: QueuedWrite, stmts: TrackerPreparedStatements): void { + if (write.kind === 'telemetry') { + stmts.telemetryInsertStmt.run( + write.sessionId, + write.sampleMs!, + write.totalWatchedMs!, + write.activeWatchedMs!, + write.linesSeen!, + write.wordsSeen!, + write.tokensSeen!, + write.cardsMined!, + write.lookupCount!, + write.lookupHits!, + write.pauseCount!, + write.pauseMs!, + write.seekForwardCount!, + write.seekBackwardCount!, + write.mediaBufferEvents!, + ); + return; + } + + stmts.eventInsertStmt.run( + write.sessionId, + write.sampleMs!, + write.eventType!, + write.lineIndex ?? null, + write.segmentStartMs ?? null, + write.segmentEndMs ?? null, + write.wordsDelta ?? 0, + write.cardsDelta ?? 0, + write.payloadJson ?? null, + ); +} + +export function getOrCreateVideoRecord( + db: DatabaseSync, + videoKey: string, + details: { + canonicalTitle: string; + sourcePath: string | null; + sourceUrl: string | null; + sourceType: number; + }, +): number { + const existing = db + .prepare('SELECT video_id FROM imm_videos WHERE video_key = ?') + .get(videoKey) as { video_id: number } | null; + if (existing?.video_id) { + db.prepare( + 'UPDATE imm_videos SET canonical_title = ?, updated_at_ms = ? WHERE video_id = ?', + ).run(details.canonicalTitle || 'unknown', Date.now(), existing.video_id); + return existing.video_id; + } + + const nowMs = Date.now(); + const insert = db.prepare(` + INSERT INTO imm_videos ( + video_key, canonical_title, source_type, source_path, source_url, + duration_ms, file_size_bytes, codec_id, container_id, width_px, height_px, + fps_x100, bitrate_kbps, audio_codec_id, hash_sha256, screenshot_path, + metadata_json, created_at_ms, updated_at_ms + ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) 
+ `); + const result = insert.run( + videoKey, + details.canonicalTitle || 'unknown', + details.sourceType, + details.sourcePath, + details.sourceUrl, + 0, + null, + null, + null, + null, + null, + null, + null, + null, + null, + null, + null, + nowMs, + nowMs, + ); + return Number(result.lastInsertRowid); +} + +export function updateVideoMetadataRecord( + db: DatabaseSync, + videoId: number, + metadata: VideoMetadata, +): void { + db.prepare( + ` + UPDATE imm_videos + SET + duration_ms = ?, + file_size_bytes = ?, + codec_id = ?, + container_id = ?, + width_px = ?, + height_px = ?, + fps_x100 = ?, + bitrate_kbps = ?, + audio_codec_id = ?, + hash_sha256 = ?, + screenshot_path = ?, + metadata_json = ?, + updated_at_ms = ? + WHERE video_id = ? + `, + ).run( + metadata.durationMs, + metadata.fileSizeBytes, + metadata.codecId, + metadata.containerId, + metadata.widthPx, + metadata.heightPx, + metadata.fpsX100, + metadata.bitrateKbps, + metadata.audioCodecId, + metadata.hashSha256, + metadata.screenshotPath, + metadata.metadataJson, + Date.now(), + videoId, + ); +} + +export function updateVideoTitleRecord( + db: DatabaseSync, + videoId: number, + canonicalTitle: string, +): void { + db.prepare('UPDATE imm_videos SET canonical_title = ?, updated_at_ms = ? WHERE video_id = ?').run( + canonicalTitle, + Date.now(), + videoId, + ); +}