mirror of
https://github.com/ksyasuda/SubMiner.git
synced 2026-02-28 06:22:45 -08:00
refactor(immersion): split tracker storage and metadata modules
Decompose the immersion tracker facade into focused storage/session/metadata collaborators with dedicated tests and updated ownership docs while preserving runtime behavior.
This commit is contained in:
@@ -1,10 +1,20 @@
|
||||
import crypto from 'node:crypto';
|
||||
import path from 'node:path';
|
||||
import { spawn } from 'node:child_process';
|
||||
import { DatabaseSync } from 'node:sqlite';
|
||||
import * as fs from 'node:fs';
|
||||
import { createLogger } from '../../logger';
|
||||
import { getLocalVideoMetadata } from './immersion-tracker/metadata';
|
||||
import { pruneRetention, runRollupMaintenance } from './immersion-tracker/maintenance';
|
||||
import { finalizeSessionRecord, startSessionRecord } from './immersion-tracker/session';
|
||||
import {
|
||||
applyPragmas,
|
||||
createTrackerPreparedStatements,
|
||||
ensureSchema,
|
||||
executeQueuedWrite,
|
||||
getOrCreateVideoRecord,
|
||||
type TrackerPreparedStatements,
|
||||
updateVideoMetadataRecord,
|
||||
updateVideoTitleRecord,
|
||||
} from './immersion-tracker/storage';
|
||||
import {
|
||||
getDailyRollups,
|
||||
getMonthlyRollups,
|
||||
@@ -15,18 +25,13 @@ import {
|
||||
import {
|
||||
buildVideoKey,
|
||||
calculateTextMetrics,
|
||||
createInitialSessionState,
|
||||
deriveCanonicalTitle,
|
||||
emptyMetadata,
|
||||
hashToCode,
|
||||
isRemoteSource,
|
||||
normalizeMediaPath,
|
||||
normalizeText,
|
||||
parseFps,
|
||||
resolveBoundedInt,
|
||||
sanitizePayload,
|
||||
secToMs,
|
||||
toNullableInt,
|
||||
} from './immersion-tracker/reducer';
|
||||
import { enqueueWrite } from './immersion-tracker/queue';
|
||||
import {
|
||||
@@ -48,9 +53,6 @@ import {
|
||||
EVENT_SEEK_BACKWARD,
|
||||
EVENT_SEEK_FORWARD,
|
||||
EVENT_SUBTITLE_LINE,
|
||||
SCHEMA_VERSION,
|
||||
SESSION_STATUS_ACTIVE,
|
||||
SESSION_STATUS_ENDED,
|
||||
SOURCE_TYPE_LOCAL,
|
||||
SOURCE_TYPE_REMOTE,
|
||||
type ImmersionSessionRollupRow,
|
||||
@@ -59,7 +61,6 @@ import {
|
||||
type SessionState,
|
||||
type SessionSummaryQueryRow,
|
||||
type SessionTimelineRow,
|
||||
type VideoMetadata,
|
||||
} from './immersion-tracker/types';
|
||||
|
||||
export type {
|
||||
@@ -95,8 +96,7 @@ export class ImmersionTrackerService {
|
||||
private sessionState: SessionState | null = null;
|
||||
private currentVideoKey = '';
|
||||
private currentMediaPathOrUrl = '';
|
||||
private readonly telemetryInsertStmt: ReturnType<DatabaseSync['prepare']>;
|
||||
private readonly eventInsertStmt: ReturnType<DatabaseSync['prepare']>;
|
||||
private readonly preparedStatements: TrackerPreparedStatements;
|
||||
|
||||
constructor(options: ImmersionTrackerOptions) {
|
||||
this.dbPath = options.dbPath;
|
||||
@@ -164,26 +164,9 @@ export class ImmersionTrackerService {
|
||||
3650,
|
||||
) * 86_400_000;
|
||||
this.db = new DatabaseSync(this.dbPath);
|
||||
this.applyPragmas();
|
||||
this.ensureSchema();
|
||||
this.telemetryInsertStmt = this.db.prepare(`
|
||||
INSERT INTO imm_session_telemetry (
|
||||
session_id, sample_ms, total_watched_ms, active_watched_ms,
|
||||
lines_seen, words_seen, tokens_seen, cards_mined, lookup_count,
|
||||
lookup_hits, pause_count, pause_ms, seek_forward_count,
|
||||
seek_backward_count, media_buffer_events
|
||||
) VALUES (
|
||||
?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?
|
||||
)
|
||||
`);
|
||||
this.eventInsertStmt = this.db.prepare(`
|
||||
INSERT INTO imm_session_events (
|
||||
session_id, ts_ms, event_type, line_index, segment_start_ms, segment_end_ms,
|
||||
words_delta, cards_delta, payload_json
|
||||
) VALUES (
|
||||
?, ?, ?, ?, ?, ?, ?, ?, ?
|
||||
)
|
||||
`);
|
||||
applyPragmas(this.db);
|
||||
ensureSchema(this.db);
|
||||
this.preparedStatements = createTrackerPreparedStatements(this.db);
|
||||
this.scheduleMaintenance();
|
||||
this.scheduleFlush();
|
||||
}
|
||||
@@ -257,7 +240,7 @@ export class ImmersionTrackerService {
|
||||
const sourceUrl = sourceType === SOURCE_TYPE_REMOTE ? normalizedPath : null;
|
||||
|
||||
const sessionInfo = {
|
||||
videoId: this.getOrCreateVideo(videoKey, {
|
||||
videoId: getOrCreateVideoRecord(this.db, videoKey, {
|
||||
canonicalTitle,
|
||||
sourcePath,
|
||||
sourceUrl,
|
||||
@@ -563,193 +546,7 @@ export class ImmersionTrackerService {
|
||||
}
|
||||
|
||||
private flushSingle(write: QueuedWrite): void {
|
||||
if (write.kind === 'telemetry') {
|
||||
this.telemetryInsertStmt.run(
|
||||
write.sessionId,
|
||||
write.sampleMs!,
|
||||
write.totalWatchedMs!,
|
||||
write.activeWatchedMs!,
|
||||
write.linesSeen!,
|
||||
write.wordsSeen!,
|
||||
write.tokensSeen!,
|
||||
write.cardsMined!,
|
||||
write.lookupCount!,
|
||||
write.lookupHits!,
|
||||
write.pauseCount!,
|
||||
write.pauseMs!,
|
||||
write.seekForwardCount!,
|
||||
write.seekBackwardCount!,
|
||||
write.mediaBufferEvents!,
|
||||
);
|
||||
return;
|
||||
}
|
||||
|
||||
this.eventInsertStmt.run(
|
||||
write.sessionId,
|
||||
write.sampleMs!,
|
||||
write.eventType!,
|
||||
write.lineIndex ?? null,
|
||||
write.segmentStartMs ?? null,
|
||||
write.segmentEndMs ?? null,
|
||||
write.wordsDelta ?? 0,
|
||||
write.cardsDelta ?? 0,
|
||||
write.payloadJson ?? null,
|
||||
);
|
||||
}
|
||||
|
||||
private applyPragmas(): void {
|
||||
this.db.exec('PRAGMA journal_mode = WAL');
|
||||
this.db.exec('PRAGMA synchronous = NORMAL');
|
||||
this.db.exec('PRAGMA foreign_keys = ON');
|
||||
this.db.exec('PRAGMA busy_timeout = 2500');
|
||||
}
|
||||
|
||||
private ensureSchema(): void {
|
||||
this.db.exec(`
|
||||
CREATE TABLE IF NOT EXISTS imm_schema_version (
|
||||
schema_version INTEGER PRIMARY KEY,
|
||||
applied_at_ms INTEGER NOT NULL
|
||||
);
|
||||
`);
|
||||
|
||||
const currentVersion = this.db
|
||||
.prepare('SELECT schema_version FROM imm_schema_version ORDER BY schema_version DESC LIMIT 1')
|
||||
.get() as { schema_version: number } | null;
|
||||
if (currentVersion?.schema_version === SCHEMA_VERSION) {
|
||||
return;
|
||||
}
|
||||
|
||||
this.db.exec(`
|
||||
CREATE TABLE IF NOT EXISTS imm_videos(
|
||||
video_id INTEGER PRIMARY KEY AUTOINCREMENT,
|
||||
video_key TEXT NOT NULL UNIQUE,
|
||||
canonical_title TEXT NOT NULL,
|
||||
source_type INTEGER NOT NULL,
|
||||
source_path TEXT,
|
||||
source_url TEXT,
|
||||
duration_ms INTEGER NOT NULL CHECK(duration_ms>=0),
|
||||
file_size_bytes INTEGER CHECK(file_size_bytes>=0),
|
||||
codec_id INTEGER, container_id INTEGER,
|
||||
width_px INTEGER, height_px INTEGER, fps_x100 INTEGER,
|
||||
bitrate_kbps INTEGER, audio_codec_id INTEGER,
|
||||
hash_sha256 TEXT, screenshot_path TEXT,
|
||||
metadata_json TEXT,
|
||||
created_at_ms INTEGER NOT NULL, updated_at_ms INTEGER NOT NULL
|
||||
);
|
||||
`);
|
||||
this.db.exec(`
|
||||
CREATE TABLE IF NOT EXISTS imm_sessions(
|
||||
session_id INTEGER PRIMARY KEY AUTOINCREMENT,
|
||||
session_uuid TEXT NOT NULL UNIQUE,
|
||||
video_id INTEGER NOT NULL,
|
||||
started_at_ms INTEGER NOT NULL, ended_at_ms INTEGER,
|
||||
status INTEGER NOT NULL,
|
||||
locale_id INTEGER, target_lang_id INTEGER,
|
||||
difficulty_tier INTEGER, subtitle_mode INTEGER,
|
||||
created_at_ms INTEGER NOT NULL, updated_at_ms INTEGER NOT NULL,
|
||||
FOREIGN KEY(video_id) REFERENCES imm_videos(video_id)
|
||||
);
|
||||
`);
|
||||
this.db.exec(`
|
||||
CREATE TABLE IF NOT EXISTS imm_session_telemetry(
|
||||
telemetry_id INTEGER PRIMARY KEY AUTOINCREMENT,
|
||||
session_id INTEGER NOT NULL,
|
||||
sample_ms INTEGER NOT NULL,
|
||||
total_watched_ms INTEGER NOT NULL DEFAULT 0,
|
||||
active_watched_ms INTEGER NOT NULL DEFAULT 0,
|
||||
lines_seen INTEGER NOT NULL DEFAULT 0,
|
||||
words_seen INTEGER NOT NULL DEFAULT 0,
|
||||
tokens_seen INTEGER NOT NULL DEFAULT 0,
|
||||
cards_mined INTEGER NOT NULL DEFAULT 0,
|
||||
lookup_count INTEGER NOT NULL DEFAULT 0,
|
||||
lookup_hits INTEGER NOT NULL DEFAULT 0,
|
||||
pause_count INTEGER NOT NULL DEFAULT 0,
|
||||
pause_ms INTEGER NOT NULL DEFAULT 0,
|
||||
seek_forward_count INTEGER NOT NULL DEFAULT 0,
|
||||
seek_backward_count INTEGER NOT NULL DEFAULT 0,
|
||||
media_buffer_events INTEGER NOT NULL DEFAULT 0,
|
||||
FOREIGN KEY(session_id) REFERENCES imm_sessions(session_id) ON DELETE CASCADE
|
||||
);
|
||||
`);
|
||||
this.db.exec(`
|
||||
CREATE TABLE IF NOT EXISTS imm_session_events(
|
||||
event_id INTEGER PRIMARY KEY AUTOINCREMENT,
|
||||
session_id INTEGER NOT NULL,
|
||||
ts_ms INTEGER NOT NULL,
|
||||
event_type INTEGER NOT NULL,
|
||||
line_index INTEGER,
|
||||
segment_start_ms INTEGER,
|
||||
segment_end_ms INTEGER,
|
||||
words_delta INTEGER NOT NULL DEFAULT 0,
|
||||
cards_delta INTEGER NOT NULL DEFAULT 0,
|
||||
payload_json TEXT,
|
||||
FOREIGN KEY(session_id) REFERENCES imm_sessions(session_id) ON DELETE CASCADE
|
||||
);
|
||||
`);
|
||||
this.db.exec(`
|
||||
CREATE TABLE IF NOT EXISTS imm_daily_rollups(
|
||||
rollup_day INTEGER NOT NULL,
|
||||
video_id INTEGER,
|
||||
total_sessions INTEGER NOT NULL DEFAULT 0,
|
||||
total_active_min REAL NOT NULL DEFAULT 0,
|
||||
total_lines_seen INTEGER NOT NULL DEFAULT 0,
|
||||
total_words_seen INTEGER NOT NULL DEFAULT 0,
|
||||
total_tokens_seen INTEGER NOT NULL DEFAULT 0,
|
||||
total_cards INTEGER NOT NULL DEFAULT 0,
|
||||
cards_per_hour REAL,
|
||||
words_per_min REAL,
|
||||
lookup_hit_rate REAL,
|
||||
PRIMARY KEY (rollup_day, video_id)
|
||||
);
|
||||
`);
|
||||
this.db.exec(`
|
||||
CREATE TABLE IF NOT EXISTS imm_monthly_rollups(
|
||||
rollup_month INTEGER NOT NULL,
|
||||
video_id INTEGER,
|
||||
total_sessions INTEGER NOT NULL DEFAULT 0,
|
||||
total_active_min REAL NOT NULL DEFAULT 0,
|
||||
total_lines_seen INTEGER NOT NULL DEFAULT 0,
|
||||
total_words_seen INTEGER NOT NULL DEFAULT 0,
|
||||
total_tokens_seen INTEGER NOT NULL DEFAULT 0,
|
||||
total_cards INTEGER NOT NULL DEFAULT 0,
|
||||
PRIMARY KEY (rollup_month, video_id)
|
||||
);
|
||||
`);
|
||||
|
||||
this.db.exec(`
|
||||
CREATE INDEX IF NOT EXISTS idx_sessions_video_started
|
||||
ON imm_sessions(video_id, started_at_ms DESC)
|
||||
`);
|
||||
this.db.exec(`
|
||||
CREATE INDEX IF NOT EXISTS idx_sessions_status_started
|
||||
ON imm_sessions(status, started_at_ms DESC)
|
||||
`);
|
||||
this.db.exec(`
|
||||
CREATE INDEX IF NOT EXISTS idx_telemetry_session_sample
|
||||
ON imm_session_telemetry(session_id, sample_ms DESC)
|
||||
`);
|
||||
this.db.exec(`
|
||||
CREATE INDEX IF NOT EXISTS idx_events_session_ts
|
||||
ON imm_session_events(session_id, ts_ms DESC)
|
||||
`);
|
||||
this.db.exec(`
|
||||
CREATE INDEX IF NOT EXISTS idx_events_type_ts
|
||||
ON imm_session_events(event_type, ts_ms DESC)
|
||||
`);
|
||||
this.db.exec(`
|
||||
CREATE INDEX IF NOT EXISTS idx_rollups_day_video
|
||||
ON imm_daily_rollups(rollup_day, video_id)
|
||||
`);
|
||||
this.db.exec(`
|
||||
CREATE INDEX IF NOT EXISTS idx_rollups_month_video
|
||||
ON imm_monthly_rollups(rollup_month, video_id)
|
||||
`);
|
||||
|
||||
this.db.exec(`
|
||||
INSERT INTO imm_schema_version(schema_version, applied_at_ms)
|
||||
VALUES (${SCHEMA_VERSION}, ${Date.now()})
|
||||
ON CONFLICT DO NOTHING
|
||||
`);
|
||||
executeQueuedWrite(write, this.preparedStatements);
|
||||
}
|
||||
|
||||
private scheduleMaintenance(): void {
|
||||
@@ -771,7 +568,7 @@ export class ImmersionTrackerService {
|
||||
dailyRollupRetentionMs: this.dailyRollupRetentionMs,
|
||||
monthlyRollupRetentionMs: this.monthlyRollupRetentionMs,
|
||||
});
|
||||
runRollupMaintenance(this.db);
|
||||
this.runRollupMaintenance();
|
||||
|
||||
if (nowMs - this.lastVacuumMs >= this.vacuumIntervalMs && !this.writeLock.locked) {
|
||||
this.db.exec('VACUUM');
|
||||
@@ -785,15 +582,17 @@ export class ImmersionTrackerService {
|
||||
}
|
||||
}
|
||||
|
||||
private runRollupMaintenance(): void {
|
||||
runRollupMaintenance(this.db);
|
||||
}
|
||||
|
||||
private startSession(videoId: number, startedAtMs?: number): void {
|
||||
const nowMs = startedAtMs ?? Date.now();
|
||||
const result = this.startSessionStatement(videoId, nowMs);
|
||||
const sessionId = Number(result.lastInsertRowid);
|
||||
this.sessionState = createInitialSessionState(sessionId, videoId, nowMs);
|
||||
const { sessionId, state } = startSessionRecord(this.db, videoId, startedAtMs);
|
||||
this.sessionState = state;
|
||||
this.recordWrite({
|
||||
kind: 'telemetry',
|
||||
sessionId,
|
||||
sampleMs: nowMs,
|
||||
sampleMs: state.startedAtMs,
|
||||
totalWatchedMs: 0,
|
||||
activeWatchedMs: 0,
|
||||
linesSeen: 0,
|
||||
@@ -811,24 +610,6 @@ export class ImmersionTrackerService {
|
||||
this.scheduleFlush(0);
|
||||
}
|
||||
|
||||
private startSessionStatement(
|
||||
videoId: number,
|
||||
startedAtMs: number,
|
||||
): {
|
||||
lastInsertRowid: number | bigint;
|
||||
} {
|
||||
const sessionUuid = crypto.randomUUID();
|
||||
return this.db
|
||||
.prepare(
|
||||
`
|
||||
INSERT INTO imm_sessions (
|
||||
session_uuid, video_id, started_at_ms, status, created_at_ms, updated_at_ms
|
||||
) VALUES (?, ?, ?, ?, ?, ?)
|
||||
`,
|
||||
)
|
||||
.run(sessionUuid, videoId, startedAtMs, SESSION_STATUS_ACTIVE, startedAtMs, startedAtMs);
|
||||
}
|
||||
|
||||
private finalizeActiveSession(): void {
|
||||
if (!this.sessionState) return;
|
||||
const endedAt = Date.now();
|
||||
@@ -850,250 +631,24 @@ export class ImmersionTrackerService {
|
||||
this.flushNow();
|
||||
this.sessionState.pendingTelemetry = false;
|
||||
|
||||
this.db
|
||||
.prepare(
|
||||
'UPDATE imm_sessions SET ended_at_ms = ?, status = ?, updated_at_ms = ? WHERE session_id = ?',
|
||||
)
|
||||
.run(endedAt, SESSION_STATUS_ENDED, Date.now(), this.sessionState.sessionId);
|
||||
finalizeSessionRecord(this.db, this.sessionState, endedAt);
|
||||
this.sessionState = null;
|
||||
}
|
||||
|
||||
private getOrCreateVideo(
|
||||
videoKey: string,
|
||||
details: {
|
||||
canonicalTitle: string;
|
||||
sourcePath: string | null;
|
||||
sourceUrl: string | null;
|
||||
sourceType: number;
|
||||
},
|
||||
): number {
|
||||
const existing = this.db
|
||||
.prepare('SELECT video_id FROM imm_videos WHERE video_key = ?')
|
||||
.get(videoKey) as { video_id: number } | null;
|
||||
if (existing?.video_id) {
|
||||
this.db
|
||||
.prepare('UPDATE imm_videos SET canonical_title = ?, updated_at_ms = ? WHERE video_id = ?')
|
||||
.run(details.canonicalTitle || 'unknown', Date.now(), existing.video_id);
|
||||
return existing.video_id;
|
||||
}
|
||||
|
||||
const nowMs = Date.now();
|
||||
const insert = this.db.prepare(`
|
||||
INSERT INTO imm_videos (
|
||||
video_key, canonical_title, source_type, source_path, source_url,
|
||||
duration_ms, file_size_bytes, codec_id, container_id, width_px, height_px,
|
||||
fps_x100, bitrate_kbps, audio_codec_id, hash_sha256, screenshot_path,
|
||||
metadata_json, created_at_ms, updated_at_ms
|
||||
) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
|
||||
`);
|
||||
const result = insert.run(
|
||||
videoKey,
|
||||
details.canonicalTitle || 'unknown',
|
||||
details.sourceType,
|
||||
details.sourcePath,
|
||||
details.sourceUrl,
|
||||
0,
|
||||
null,
|
||||
null,
|
||||
null,
|
||||
null,
|
||||
null,
|
||||
null,
|
||||
null,
|
||||
null,
|
||||
null,
|
||||
null,
|
||||
null,
|
||||
nowMs,
|
||||
nowMs,
|
||||
);
|
||||
return Number(result.lastInsertRowid);
|
||||
}
|
||||
|
||||
private updateVideoMetadata(videoId: number, metadata: VideoMetadata): void {
|
||||
this.db
|
||||
.prepare(
|
||||
`
|
||||
UPDATE imm_videos
|
||||
SET
|
||||
duration_ms = ?,
|
||||
file_size_bytes = ?,
|
||||
codec_id = ?,
|
||||
container_id = ?,
|
||||
width_px = ?,
|
||||
height_px = ?,
|
||||
fps_x100 = ?,
|
||||
bitrate_kbps = ?,
|
||||
audio_codec_id = ?,
|
||||
hash_sha256 = ?,
|
||||
screenshot_path = ?,
|
||||
metadata_json = ?,
|
||||
updated_at_ms = ?
|
||||
WHERE video_id = ?
|
||||
`,
|
||||
)
|
||||
.run(
|
||||
metadata.durationMs,
|
||||
metadata.fileSizeBytes,
|
||||
metadata.codecId,
|
||||
metadata.containerId,
|
||||
metadata.widthPx,
|
||||
metadata.heightPx,
|
||||
metadata.fpsX100,
|
||||
metadata.bitrateKbps,
|
||||
metadata.audioCodecId,
|
||||
metadata.hashSha256,
|
||||
metadata.screenshotPath,
|
||||
metadata.metadataJson,
|
||||
Date.now(),
|
||||
videoId,
|
||||
);
|
||||
}
|
||||
|
||||
private captureVideoMetadataAsync(videoId: number, sourceType: number, mediaPath: string): void {
|
||||
if (sourceType !== SOURCE_TYPE_LOCAL) return;
|
||||
void (async () => {
|
||||
try {
|
||||
const metadata = await this.getLocalVideoMetadata(mediaPath);
|
||||
this.updateVideoMetadata(videoId, metadata);
|
||||
const metadata = await getLocalVideoMetadata(mediaPath);
|
||||
updateVideoMetadataRecord(this.db, videoId, metadata);
|
||||
} catch (error) {
|
||||
this.logger.warn('Unable to capture local video metadata', (error as Error).message);
|
||||
}
|
||||
})();
|
||||
}
|
||||
|
||||
private async getLocalVideoMetadata(mediaPath: string): Promise<VideoMetadata> {
|
||||
const hash = await this.computeSha256(mediaPath);
|
||||
const info = await this.runFfprobe(mediaPath);
|
||||
const stat = await fs.promises.stat(mediaPath);
|
||||
return {
|
||||
sourceType: SOURCE_TYPE_LOCAL,
|
||||
canonicalTitle: deriveCanonicalTitle(mediaPath),
|
||||
durationMs: info.durationMs || 0,
|
||||
fileSizeBytes: Number.isFinite(stat.size) ? stat.size : null,
|
||||
codecId: info.codecId ?? null,
|
||||
containerId: info.containerId ?? null,
|
||||
widthPx: info.widthPx ?? null,
|
||||
heightPx: info.heightPx ?? null,
|
||||
fpsX100: info.fpsX100 ?? null,
|
||||
bitrateKbps: info.bitrateKbps ?? null,
|
||||
audioCodecId: info.audioCodecId ?? null,
|
||||
hashSha256: hash,
|
||||
screenshotPath: null,
|
||||
metadataJson: null,
|
||||
};
|
||||
}
|
||||
|
||||
private async computeSha256(mediaPath: string): Promise<string | null> {
|
||||
return new Promise((resolve) => {
|
||||
const file = fs.createReadStream(mediaPath);
|
||||
const digest = crypto.createHash('sha256');
|
||||
file.on('data', (chunk) => digest.update(chunk));
|
||||
file.on('end', () => resolve(digest.digest('hex')));
|
||||
file.on('error', () => resolve(null));
|
||||
});
|
||||
}
|
||||
|
||||
private runFfprobe(mediaPath: string): Promise<{
|
||||
durationMs: number | null;
|
||||
codecId: number | null;
|
||||
containerId: number | null;
|
||||
widthPx: number | null;
|
||||
heightPx: number | null;
|
||||
fpsX100: number | null;
|
||||
bitrateKbps: number | null;
|
||||
audioCodecId: number | null;
|
||||
}> {
|
||||
return new Promise((resolve) => {
|
||||
const child = spawn('ffprobe', [
|
||||
'-v',
|
||||
'error',
|
||||
'-print_format',
|
||||
'json',
|
||||
'-show_entries',
|
||||
'stream=codec_type,codec_tag_string,width,height,avg_frame_rate,bit_rate',
|
||||
'-show_entries',
|
||||
'format=duration,bit_rate',
|
||||
mediaPath,
|
||||
]);
|
||||
|
||||
let output = '';
|
||||
let errorOutput = '';
|
||||
child.stdout.on('data', (chunk) => {
|
||||
output += chunk.toString('utf-8');
|
||||
});
|
||||
child.stderr.on('data', (chunk) => {
|
||||
errorOutput += chunk.toString('utf-8');
|
||||
});
|
||||
child.on('error', () => resolve(emptyMetadata()));
|
||||
child.on('close', () => {
|
||||
if (errorOutput && output.length === 0) {
|
||||
resolve(emptyMetadata());
|
||||
return;
|
||||
}
|
||||
|
||||
try {
|
||||
const parsed = JSON.parse(output) as {
|
||||
format?: { duration?: string; bit_rate?: string };
|
||||
streams?: Array<{
|
||||
codec_type?: string;
|
||||
codec_tag_string?: string;
|
||||
width?: number;
|
||||
height?: number;
|
||||
avg_frame_rate?: string;
|
||||
bit_rate?: string;
|
||||
}>;
|
||||
};
|
||||
|
||||
const durationText = parsed.format?.duration;
|
||||
const bitrateText = parsed.format?.bit_rate;
|
||||
const durationMs = Number(durationText) ? Math.round(Number(durationText) * 1000) : null;
|
||||
const bitrateKbps = Number(bitrateText) ? Math.round(Number(bitrateText) / 1000) : null;
|
||||
|
||||
let codecId: number | null = null;
|
||||
let containerId: number | null = null;
|
||||
let widthPx: number | null = null;
|
||||
let heightPx: number | null = null;
|
||||
let fpsX100: number | null = null;
|
||||
let audioCodecId: number | null = null;
|
||||
|
||||
for (const stream of parsed.streams ?? []) {
|
||||
if (stream.codec_type === 'video') {
|
||||
widthPx = toNullableInt(stream.width);
|
||||
heightPx = toNullableInt(stream.height);
|
||||
fpsX100 = parseFps(stream.avg_frame_rate);
|
||||
codecId = hashToCode(stream.codec_tag_string);
|
||||
containerId = 0;
|
||||
}
|
||||
if (stream.codec_type === 'audio') {
|
||||
audioCodecId = hashToCode(stream.codec_tag_string);
|
||||
if (audioCodecId && audioCodecId > 0) {
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
resolve({
|
||||
durationMs,
|
||||
codecId,
|
||||
containerId,
|
||||
widthPx,
|
||||
heightPx,
|
||||
fpsX100,
|
||||
bitrateKbps,
|
||||
audioCodecId,
|
||||
});
|
||||
} catch {
|
||||
resolve(emptyMetadata());
|
||||
}
|
||||
});
|
||||
});
|
||||
}
|
||||
|
||||
private updateVideoTitleForActiveSession(canonicalTitle: string): void {
|
||||
if (!this.sessionState) return;
|
||||
this.db
|
||||
.prepare('UPDATE imm_videos SET canonical_title = ?, updated_at_ms = ? WHERE video_id = ?')
|
||||
.run(canonicalTitle, Date.now(), this.sessionState.videoId);
|
||||
updateVideoTitleRecord(this.db, this.sessionState.videoId, canonicalTitle);
|
||||
}
|
||||
}
|
||||
|
||||
148
src/core/services/immersion-tracker/metadata.test.ts
Normal file
148
src/core/services/immersion-tracker/metadata.test.ts
Normal file
@@ -0,0 +1,148 @@
|
||||
import assert from 'node:assert/strict';
|
||||
import { createHash } from 'node:crypto';
|
||||
import { EventEmitter } from 'node:events';
|
||||
import test from 'node:test';
|
||||
import type { spawn as spawnFn } from 'node:child_process';
|
||||
import { SOURCE_TYPE_LOCAL } from './types';
|
||||
import { getLocalVideoMetadata, runFfprobe } from './metadata';
|
||||
|
||||
type Spawn = typeof spawnFn;
|
||||
|
||||
function createSpawnStub(options: {
|
||||
stdout?: string;
|
||||
stderr?: string;
|
||||
emitError?: boolean;
|
||||
}): Spawn {
|
||||
return (() => {
|
||||
const child = new EventEmitter() as EventEmitter & {
|
||||
stdout: EventEmitter;
|
||||
stderr: EventEmitter;
|
||||
};
|
||||
child.stdout = new EventEmitter();
|
||||
child.stderr = new EventEmitter();
|
||||
|
||||
queueMicrotask(() => {
|
||||
if (options.emitError) {
|
||||
child.emit('error', new Error('ffprobe failed'));
|
||||
return;
|
||||
}
|
||||
if (options.stderr) {
|
||||
child.stderr.emit('data', Buffer.from(options.stderr));
|
||||
}
|
||||
if (options.stdout !== undefined) {
|
||||
child.stdout.emit('data', Buffer.from(options.stdout));
|
||||
}
|
||||
child.emit('close', 0);
|
||||
});
|
||||
|
||||
return child as unknown as ReturnType<Spawn>;
|
||||
}) as Spawn;
|
||||
}
|
||||
|
||||
test('runFfprobe parses valid JSON from stream and format sections', async () => {
|
||||
const metadata = await runFfprobe('/tmp/video.mp4', {
|
||||
spawn: createSpawnStub({
|
||||
stdout: JSON.stringify({
|
||||
format: { duration: '12.34', bit_rate: '3456000' },
|
||||
streams: [
|
||||
{
|
||||
codec_type: 'video',
|
||||
codec_tag_string: 'avc1',
|
||||
width: 1920,
|
||||
height: 1080,
|
||||
avg_frame_rate: '24000/1001',
|
||||
},
|
||||
{
|
||||
codec_type: 'audio',
|
||||
codec_tag_string: 'mp4a',
|
||||
},
|
||||
],
|
||||
}),
|
||||
}),
|
||||
});
|
||||
|
||||
assert.equal(metadata.durationMs, 12340);
|
||||
assert.equal(metadata.bitrateKbps, 3456);
|
||||
assert.equal(metadata.widthPx, 1920);
|
||||
assert.equal(metadata.heightPx, 1080);
|
||||
assert.equal(metadata.fpsX100, 2398);
|
||||
assert.equal(metadata.containerId, 0);
|
||||
assert.ok(Number(metadata.codecId) > 0);
|
||||
assert.ok(Number(metadata.audioCodecId) > 0);
|
||||
});
|
||||
|
||||
test('runFfprobe returns empty metadata for invalid JSON and process errors', async () => {
|
||||
const invalidJsonMetadata = await runFfprobe('/tmp/broken.mp4', {
|
||||
spawn: createSpawnStub({ stdout: '{invalid' }),
|
||||
});
|
||||
assert.deepEqual(invalidJsonMetadata, {
|
||||
durationMs: null,
|
||||
codecId: null,
|
||||
containerId: null,
|
||||
widthPx: null,
|
||||
heightPx: null,
|
||||
fpsX100: null,
|
||||
bitrateKbps: null,
|
||||
audioCodecId: null,
|
||||
});
|
||||
|
||||
const errorMetadata = await runFfprobe('/tmp/error.mp4', {
|
||||
spawn: createSpawnStub({ emitError: true }),
|
||||
});
|
||||
assert.deepEqual(errorMetadata, {
|
||||
durationMs: null,
|
||||
codecId: null,
|
||||
containerId: null,
|
||||
widthPx: null,
|
||||
heightPx: null,
|
||||
fpsX100: null,
|
||||
bitrateKbps: null,
|
||||
audioCodecId: null,
|
||||
});
|
||||
});
|
||||
|
||||
test('getLocalVideoMetadata derives title and falls back to null hash on read errors', async () => {
|
||||
const successMetadata = await getLocalVideoMetadata('/tmp/Episode 01.mkv', {
|
||||
spawn: createSpawnStub({ stdout: JSON.stringify({ format: { duration: '0' }, streams: [] }) }),
|
||||
fs: {
|
||||
createReadStream: () => {
|
||||
const stream = new EventEmitter();
|
||||
queueMicrotask(() => {
|
||||
stream.emit('data', Buffer.from('hello world'));
|
||||
stream.emit('end');
|
||||
});
|
||||
return stream as unknown as ReturnType<typeof import('node:fs').createReadStream>;
|
||||
},
|
||||
promises: {
|
||||
stat: (async () => ({ size: 1234 }) as unknown) as typeof import('node:fs').promises.stat,
|
||||
},
|
||||
} as never,
|
||||
});
|
||||
|
||||
assert.equal(successMetadata.sourceType, SOURCE_TYPE_LOCAL);
|
||||
assert.equal(successMetadata.canonicalTitle, 'Episode 01');
|
||||
assert.equal(successMetadata.fileSizeBytes, 1234);
|
||||
assert.equal(
|
||||
successMetadata.hashSha256,
|
||||
createHash('sha256').update('hello world').digest('hex'),
|
||||
);
|
||||
|
||||
const hashFallbackMetadata = await getLocalVideoMetadata('/tmp/Episode 02.mkv', {
|
||||
spawn: createSpawnStub({ stdout: JSON.stringify({ format: {}, streams: [] }) }),
|
||||
fs: {
|
||||
createReadStream: () => {
|
||||
const stream = new EventEmitter();
|
||||
queueMicrotask(() => {
|
||||
stream.emit('error', new Error('read failed'));
|
||||
});
|
||||
return stream as unknown as ReturnType<typeof import('node:fs').createReadStream>;
|
||||
},
|
||||
promises: {
|
||||
stat: (async () => ({ size: 5678 }) as unknown) as typeof import('node:fs').promises.stat,
|
||||
},
|
||||
} as never,
|
||||
});
|
||||
|
||||
assert.equal(hashFallbackMetadata.canonicalTitle, 'Episode 02');
|
||||
assert.equal(hashFallbackMetadata.hashSha256, null);
|
||||
});
|
||||
153
src/core/services/immersion-tracker/metadata.ts
Normal file
153
src/core/services/immersion-tracker/metadata.ts
Normal file
@@ -0,0 +1,153 @@
|
||||
import crypto from 'node:crypto';
|
||||
import { spawn as nodeSpawn } from 'node:child_process';
|
||||
import * as fs from 'node:fs';
|
||||
import {
|
||||
deriveCanonicalTitle,
|
||||
emptyMetadata,
|
||||
hashToCode,
|
||||
parseFps,
|
||||
toNullableInt,
|
||||
} from './reducer';
|
||||
import { SOURCE_TYPE_LOCAL, type ProbeMetadata, type VideoMetadata } from './types';
|
||||
|
||||
type SpawnFn = typeof nodeSpawn;
|
||||
|
||||
interface FsDeps {
|
||||
createReadStream: typeof fs.createReadStream;
|
||||
promises: {
|
||||
stat: typeof fs.promises.stat;
|
||||
};
|
||||
}
|
||||
|
||||
interface MetadataDeps {
|
||||
spawn?: SpawnFn;
|
||||
fs?: FsDeps;
|
||||
}
|
||||
|
||||
export async function computeSha256(
|
||||
mediaPath: string,
|
||||
deps: MetadataDeps = {},
|
||||
): Promise<string | null> {
|
||||
const fileSystem = deps.fs ?? fs;
|
||||
return new Promise((resolve) => {
|
||||
const file = fileSystem.createReadStream(mediaPath);
|
||||
const digest = crypto.createHash('sha256');
|
||||
file.on('data', (chunk) => digest.update(chunk));
|
||||
file.on('end', () => resolve(digest.digest('hex')));
|
||||
file.on('error', () => resolve(null));
|
||||
});
|
||||
}
|
||||
|
||||
export function runFfprobe(mediaPath: string, deps: MetadataDeps = {}): Promise<ProbeMetadata> {
|
||||
const spawn = deps.spawn ?? nodeSpawn;
|
||||
return new Promise((resolve) => {
|
||||
const child = spawn('ffprobe', [
|
||||
'-v',
|
||||
'error',
|
||||
'-print_format',
|
||||
'json',
|
||||
'-show_entries',
|
||||
'stream=codec_type,codec_tag_string,width,height,avg_frame_rate,bit_rate',
|
||||
'-show_entries',
|
||||
'format=duration,bit_rate',
|
||||
mediaPath,
|
||||
]);
|
||||
|
||||
let output = '';
|
||||
let errorOutput = '';
|
||||
child.stdout.on('data', (chunk) => {
|
||||
output += chunk.toString('utf-8');
|
||||
});
|
||||
child.stderr.on('data', (chunk) => {
|
||||
errorOutput += chunk.toString('utf-8');
|
||||
});
|
||||
child.on('error', () => resolve(emptyMetadata()));
|
||||
child.on('close', () => {
|
||||
if (errorOutput && output.length === 0) {
|
||||
resolve(emptyMetadata());
|
||||
return;
|
||||
}
|
||||
|
||||
try {
|
||||
const parsed = JSON.parse(output) as {
|
||||
format?: { duration?: string; bit_rate?: string };
|
||||
streams?: Array<{
|
||||
codec_type?: string;
|
||||
codec_tag_string?: string;
|
||||
width?: number;
|
||||
height?: number;
|
||||
avg_frame_rate?: string;
|
||||
bit_rate?: string;
|
||||
}>;
|
||||
};
|
||||
|
||||
const durationText = parsed.format?.duration;
|
||||
const bitrateText = parsed.format?.bit_rate;
|
||||
const durationMs = Number(durationText) ? Math.round(Number(durationText) * 1000) : null;
|
||||
const bitrateKbps = Number(bitrateText) ? Math.round(Number(bitrateText) / 1000) : null;
|
||||
|
||||
let codecId: number | null = null;
|
||||
let containerId: number | null = null;
|
||||
let widthPx: number | null = null;
|
||||
let heightPx: number | null = null;
|
||||
let fpsX100: number | null = null;
|
||||
let audioCodecId: number | null = null;
|
||||
|
||||
for (const stream of parsed.streams ?? []) {
|
||||
if (stream.codec_type === 'video') {
|
||||
widthPx = toNullableInt(stream.width);
|
||||
heightPx = toNullableInt(stream.height);
|
||||
fpsX100 = parseFps(stream.avg_frame_rate);
|
||||
codecId = hashToCode(stream.codec_tag_string);
|
||||
containerId = 0;
|
||||
}
|
||||
if (stream.codec_type === 'audio') {
|
||||
audioCodecId = hashToCode(stream.codec_tag_string);
|
||||
if (audioCodecId && audioCodecId > 0) {
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
resolve({
|
||||
durationMs,
|
||||
codecId,
|
||||
containerId,
|
||||
widthPx,
|
||||
heightPx,
|
||||
fpsX100,
|
||||
bitrateKbps,
|
||||
audioCodecId,
|
||||
});
|
||||
} catch {
|
||||
resolve(emptyMetadata());
|
||||
}
|
||||
});
|
||||
});
|
||||
}
|
||||
|
||||
export async function getLocalVideoMetadata(
|
||||
mediaPath: string,
|
||||
deps: MetadataDeps = {},
|
||||
): Promise<VideoMetadata> {
|
||||
const fileSystem = deps.fs ?? fs;
|
||||
const hash = await computeSha256(mediaPath, deps);
|
||||
const info = await runFfprobe(mediaPath, deps);
|
||||
const stat = await fileSystem.promises.stat(mediaPath);
|
||||
return {
|
||||
sourceType: SOURCE_TYPE_LOCAL,
|
||||
canonicalTitle: deriveCanonicalTitle(mediaPath),
|
||||
durationMs: info.durationMs || 0,
|
||||
fileSizeBytes: Number.isFinite(stat.size) ? stat.size : null,
|
||||
codecId: info.codecId ?? null,
|
||||
containerId: info.containerId ?? null,
|
||||
widthPx: info.widthPx ?? null,
|
||||
heightPx: info.heightPx ?? null,
|
||||
fpsX100: info.fpsX100 ?? null,
|
||||
bitrateKbps: info.bitrateKbps ?? null,
|
||||
audioCodecId: info.audioCodecId ?? null,
|
||||
hashSha256: hash,
|
||||
screenshotPath: null,
|
||||
metadataJson: null,
|
||||
};
|
||||
}
|
||||
37
src/core/services/immersion-tracker/session.ts
Normal file
37
src/core/services/immersion-tracker/session.ts
Normal file
@@ -0,0 +1,37 @@
|
||||
import crypto from 'node:crypto';
|
||||
import type { DatabaseSync } from 'node:sqlite';
|
||||
import { createInitialSessionState } from './reducer';
|
||||
import { SESSION_STATUS_ACTIVE, SESSION_STATUS_ENDED } from './types';
|
||||
import type { SessionState } from './types';
|
||||
|
||||
export function startSessionRecord(
|
||||
db: DatabaseSync,
|
||||
videoId: number,
|
||||
startedAtMs = Date.now(),
|
||||
): { sessionId: number; state: SessionState } {
|
||||
const sessionUuid = crypto.randomUUID();
|
||||
const result = db
|
||||
.prepare(
|
||||
`
|
||||
INSERT INTO imm_sessions (
|
||||
session_uuid, video_id, started_at_ms, status, created_at_ms, updated_at_ms
|
||||
) VALUES (?, ?, ?, ?, ?, ?)
|
||||
`,
|
||||
)
|
||||
.run(sessionUuid, videoId, startedAtMs, SESSION_STATUS_ACTIVE, startedAtMs, startedAtMs);
|
||||
const sessionId = Number(result.lastInsertRowid);
|
||||
return {
|
||||
sessionId,
|
||||
state: createInitialSessionState(sessionId, videoId, startedAtMs),
|
||||
};
|
||||
}
|
||||
|
||||
export function finalizeSessionRecord(
|
||||
db: DatabaseSync,
|
||||
sessionState: SessionState,
|
||||
endedAtMs = Date.now(),
|
||||
): void {
|
||||
db.prepare(
|
||||
'UPDATE imm_sessions SET ended_at_ms = ?, status = ?, updated_at_ms = ? WHERE session_id = ?',
|
||||
).run(endedAtMs, SESSION_STATUS_ENDED, Date.now(), sessionState.sessionId);
|
||||
}
|
||||
162
src/core/services/immersion-tracker/storage-session.test.ts
Normal file
162
src/core/services/immersion-tracker/storage-session.test.ts
Normal file
@@ -0,0 +1,162 @@
|
||||
import assert from 'node:assert/strict';
|
||||
import fs from 'node:fs';
|
||||
import os from 'node:os';
|
||||
import path from 'node:path';
|
||||
import test from 'node:test';
|
||||
import type { DatabaseSync as NodeDatabaseSync } from 'node:sqlite';
|
||||
import { finalizeSessionRecord, startSessionRecord } from './session';
|
||||
import {
|
||||
createTrackerPreparedStatements,
|
||||
ensureSchema,
|
||||
executeQueuedWrite,
|
||||
getOrCreateVideoRecord,
|
||||
} from './storage';
|
||||
import { EVENT_SUBTITLE_LINE, SESSION_STATUS_ENDED, SOURCE_TYPE_LOCAL } from './types';
|
||||
|
||||
type DatabaseSyncCtor = typeof NodeDatabaseSync;
|
||||
const DatabaseSync: DatabaseSyncCtor | null = (() => {
|
||||
try {
|
||||
return (require('node:sqlite') as { DatabaseSync?: DatabaseSyncCtor }).DatabaseSync ?? null;
|
||||
} catch {
|
||||
return null;
|
||||
}
|
||||
})();
|
||||
const testIfSqlite = DatabaseSync ? test : test.skip;
|
||||
|
||||
function makeDbPath(): string {
|
||||
const dir = fs.mkdtempSync(path.join(os.tmpdir(), 'subminer-imm-storage-session-'));
|
||||
return path.join(dir, 'immersion.sqlite');
|
||||
}
|
||||
|
||||
function cleanupDbPath(dbPath: string): void {
|
||||
const dir = path.dirname(dbPath);
|
||||
if (fs.existsSync(dir)) {
|
||||
fs.rmSync(dir, { recursive: true, force: true });
|
||||
}
|
||||
}
|
||||
|
||||
testIfSqlite('ensureSchema creates immersion core tables', () => {
|
||||
const dbPath = makeDbPath();
|
||||
const db = new DatabaseSync!(dbPath);
|
||||
|
||||
try {
|
||||
ensureSchema(db);
|
||||
const rows = db
|
||||
.prepare(
|
||||
`SELECT name FROM sqlite_master WHERE type = 'table' AND name LIKE 'imm_%' ORDER BY name`,
|
||||
)
|
||||
.all() as Array<{ name: string }>;
|
||||
const tableNames = new Set(rows.map((row) => row.name));
|
||||
|
||||
assert.ok(tableNames.has('imm_videos'));
|
||||
assert.ok(tableNames.has('imm_sessions'));
|
||||
assert.ok(tableNames.has('imm_session_telemetry'));
|
||||
assert.ok(tableNames.has('imm_session_events'));
|
||||
assert.ok(tableNames.has('imm_daily_rollups'));
|
||||
assert.ok(tableNames.has('imm_monthly_rollups'));
|
||||
} finally {
|
||||
db.close();
|
||||
cleanupDbPath(dbPath);
|
||||
}
|
||||
});
|
||||
|
||||
testIfSqlite('start/finalize session updates ended_at and status', () => {
|
||||
const dbPath = makeDbPath();
|
||||
const db = new DatabaseSync!(dbPath);
|
||||
|
||||
try {
|
||||
ensureSchema(db);
|
||||
const videoId = getOrCreateVideoRecord(db, 'local:/tmp/slice-a.mkv', {
|
||||
canonicalTitle: 'Slice A Episode',
|
||||
sourcePath: '/tmp/slice-a.mkv',
|
||||
sourceUrl: null,
|
||||
sourceType: SOURCE_TYPE_LOCAL,
|
||||
});
|
||||
const startedAtMs = 1_234_567_000;
|
||||
const endedAtMs = startedAtMs + 8_500;
|
||||
const { sessionId, state } = startSessionRecord(db, videoId, startedAtMs);
|
||||
|
||||
finalizeSessionRecord(db, state, endedAtMs);
|
||||
|
||||
const row = db
|
||||
.prepare('SELECT ended_at_ms, status FROM imm_sessions WHERE session_id = ?')
|
||||
.get(sessionId) as {
|
||||
ended_at_ms: number | null;
|
||||
status: number;
|
||||
} | null;
|
||||
|
||||
assert.ok(row);
|
||||
assert.equal(row?.ended_at_ms, endedAtMs);
|
||||
assert.equal(row?.status, SESSION_STATUS_ENDED);
|
||||
} finally {
|
||||
db.close();
|
||||
cleanupDbPath(dbPath);
|
||||
}
|
||||
});
|
||||
|
||||
testIfSqlite('executeQueuedWrite inserts event and telemetry rows', () => {
|
||||
const dbPath = makeDbPath();
|
||||
const db = new DatabaseSync!(dbPath);
|
||||
|
||||
try {
|
||||
ensureSchema(db);
|
||||
const stmts = createTrackerPreparedStatements(db);
|
||||
const videoId = getOrCreateVideoRecord(db, 'local:/tmp/slice-a-events.mkv', {
|
||||
canonicalTitle: 'Slice A Events',
|
||||
sourcePath: '/tmp/slice-a-events.mkv',
|
||||
sourceUrl: null,
|
||||
sourceType: SOURCE_TYPE_LOCAL,
|
||||
});
|
||||
const { sessionId } = startSessionRecord(db, videoId, 5_000);
|
||||
|
||||
executeQueuedWrite(
|
||||
{
|
||||
kind: 'telemetry',
|
||||
sessionId,
|
||||
sampleMs: 6_000,
|
||||
totalWatchedMs: 1_000,
|
||||
activeWatchedMs: 900,
|
||||
linesSeen: 3,
|
||||
wordsSeen: 6,
|
||||
tokensSeen: 6,
|
||||
cardsMined: 1,
|
||||
lookupCount: 2,
|
||||
lookupHits: 1,
|
||||
pauseCount: 1,
|
||||
pauseMs: 50,
|
||||
seekForwardCount: 0,
|
||||
seekBackwardCount: 0,
|
||||
mediaBufferEvents: 0,
|
||||
},
|
||||
stmts,
|
||||
);
|
||||
executeQueuedWrite(
|
||||
{
|
||||
kind: 'event',
|
||||
sessionId,
|
||||
sampleMs: 6_100,
|
||||
eventType: EVENT_SUBTITLE_LINE,
|
||||
lineIndex: 1,
|
||||
segmentStartMs: 0,
|
||||
segmentEndMs: 800,
|
||||
wordsDelta: 2,
|
||||
cardsDelta: 0,
|
||||
payloadJson: '{"event":"subtitle-line"}',
|
||||
},
|
||||
stmts,
|
||||
);
|
||||
|
||||
const telemetryCount = db
|
||||
.prepare('SELECT COUNT(*) AS total FROM imm_session_telemetry WHERE session_id = ?')
|
||||
.get(sessionId) as { total: number };
|
||||
const eventCount = db
|
||||
.prepare('SELECT COUNT(*) AS total FROM imm_session_events WHERE session_id = ?')
|
||||
.get(sessionId) as { total: number };
|
||||
|
||||
assert.equal(telemetryCount.total, 1);
|
||||
assert.equal(eventCount.total, 1);
|
||||
} finally {
|
||||
db.close();
|
||||
cleanupDbPath(dbPath);
|
||||
}
|
||||
});
|
||||
328
src/core/services/immersion-tracker/storage.ts
Normal file
328
src/core/services/immersion-tracker/storage.ts
Normal file
@@ -0,0 +1,328 @@
|
||||
import type { DatabaseSync } from 'node:sqlite';
|
||||
import { SCHEMA_VERSION } from './types';
|
||||
import type { QueuedWrite, VideoMetadata } from './types';
|
||||
|
||||
export interface TrackerPreparedStatements {
|
||||
telemetryInsertStmt: ReturnType<DatabaseSync['prepare']>;
|
||||
eventInsertStmt: ReturnType<DatabaseSync['prepare']>;
|
||||
}
|
||||
|
||||
export function applyPragmas(db: DatabaseSync): void {
|
||||
db.exec('PRAGMA journal_mode = WAL');
|
||||
db.exec('PRAGMA synchronous = NORMAL');
|
||||
db.exec('PRAGMA foreign_keys = ON');
|
||||
db.exec('PRAGMA busy_timeout = 2500');
|
||||
}
|
||||
|
||||
export function ensureSchema(db: DatabaseSync): void {
|
||||
db.exec(`
|
||||
CREATE TABLE IF NOT EXISTS imm_schema_version (
|
||||
schema_version INTEGER PRIMARY KEY,
|
||||
applied_at_ms INTEGER NOT NULL
|
||||
);
|
||||
`);
|
||||
|
||||
const currentVersion = db
|
||||
.prepare('SELECT schema_version FROM imm_schema_version ORDER BY schema_version DESC LIMIT 1')
|
||||
.get() as { schema_version: number } | null;
|
||||
if (currentVersion?.schema_version === SCHEMA_VERSION) {
|
||||
return;
|
||||
}
|
||||
|
||||
db.exec(`
|
||||
CREATE TABLE IF NOT EXISTS imm_videos(
|
||||
video_id INTEGER PRIMARY KEY AUTOINCREMENT,
|
||||
video_key TEXT NOT NULL UNIQUE,
|
||||
canonical_title TEXT NOT NULL,
|
||||
source_type INTEGER NOT NULL,
|
||||
source_path TEXT,
|
||||
source_url TEXT,
|
||||
duration_ms INTEGER NOT NULL CHECK(duration_ms>=0),
|
||||
file_size_bytes INTEGER CHECK(file_size_bytes>=0),
|
||||
codec_id INTEGER, container_id INTEGER,
|
||||
width_px INTEGER, height_px INTEGER, fps_x100 INTEGER,
|
||||
bitrate_kbps INTEGER, audio_codec_id INTEGER,
|
||||
hash_sha256 TEXT, screenshot_path TEXT,
|
||||
metadata_json TEXT,
|
||||
created_at_ms INTEGER NOT NULL, updated_at_ms INTEGER NOT NULL
|
||||
);
|
||||
`);
|
||||
db.exec(`
|
||||
CREATE TABLE IF NOT EXISTS imm_sessions(
|
||||
session_id INTEGER PRIMARY KEY AUTOINCREMENT,
|
||||
session_uuid TEXT NOT NULL UNIQUE,
|
||||
video_id INTEGER NOT NULL,
|
||||
started_at_ms INTEGER NOT NULL, ended_at_ms INTEGER,
|
||||
status INTEGER NOT NULL,
|
||||
locale_id INTEGER, target_lang_id INTEGER,
|
||||
difficulty_tier INTEGER, subtitle_mode INTEGER,
|
||||
created_at_ms INTEGER NOT NULL, updated_at_ms INTEGER NOT NULL,
|
||||
FOREIGN KEY(video_id) REFERENCES imm_videos(video_id)
|
||||
);
|
||||
`);
|
||||
db.exec(`
|
||||
CREATE TABLE IF NOT EXISTS imm_session_telemetry(
|
||||
telemetry_id INTEGER PRIMARY KEY AUTOINCREMENT,
|
||||
session_id INTEGER NOT NULL,
|
||||
sample_ms INTEGER NOT NULL,
|
||||
total_watched_ms INTEGER NOT NULL DEFAULT 0,
|
||||
active_watched_ms INTEGER NOT NULL DEFAULT 0,
|
||||
lines_seen INTEGER NOT NULL DEFAULT 0,
|
||||
words_seen INTEGER NOT NULL DEFAULT 0,
|
||||
tokens_seen INTEGER NOT NULL DEFAULT 0,
|
||||
cards_mined INTEGER NOT NULL DEFAULT 0,
|
||||
lookup_count INTEGER NOT NULL DEFAULT 0,
|
||||
lookup_hits INTEGER NOT NULL DEFAULT 0,
|
||||
pause_count INTEGER NOT NULL DEFAULT 0,
|
||||
pause_ms INTEGER NOT NULL DEFAULT 0,
|
||||
seek_forward_count INTEGER NOT NULL DEFAULT 0,
|
||||
seek_backward_count INTEGER NOT NULL DEFAULT 0,
|
||||
media_buffer_events INTEGER NOT NULL DEFAULT 0,
|
||||
FOREIGN KEY(session_id) REFERENCES imm_sessions(session_id) ON DELETE CASCADE
|
||||
);
|
||||
`);
|
||||
db.exec(`
|
||||
CREATE TABLE IF NOT EXISTS imm_session_events(
|
||||
event_id INTEGER PRIMARY KEY AUTOINCREMENT,
|
||||
session_id INTEGER NOT NULL,
|
||||
ts_ms INTEGER NOT NULL,
|
||||
event_type INTEGER NOT NULL,
|
||||
line_index INTEGER,
|
||||
segment_start_ms INTEGER,
|
||||
segment_end_ms INTEGER,
|
||||
words_delta INTEGER NOT NULL DEFAULT 0,
|
||||
cards_delta INTEGER NOT NULL DEFAULT 0,
|
||||
payload_json TEXT,
|
||||
FOREIGN KEY(session_id) REFERENCES imm_sessions(session_id) ON DELETE CASCADE
|
||||
);
|
||||
`);
|
||||
db.exec(`
|
||||
CREATE TABLE IF NOT EXISTS imm_daily_rollups(
|
||||
rollup_day INTEGER NOT NULL,
|
||||
video_id INTEGER,
|
||||
total_sessions INTEGER NOT NULL DEFAULT 0,
|
||||
total_active_min REAL NOT NULL DEFAULT 0,
|
||||
total_lines_seen INTEGER NOT NULL DEFAULT 0,
|
||||
total_words_seen INTEGER NOT NULL DEFAULT 0,
|
||||
total_tokens_seen INTEGER NOT NULL DEFAULT 0,
|
||||
total_cards INTEGER NOT NULL DEFAULT 0,
|
||||
cards_per_hour REAL,
|
||||
words_per_min REAL,
|
||||
lookup_hit_rate REAL,
|
||||
PRIMARY KEY (rollup_day, video_id)
|
||||
);
|
||||
`);
|
||||
db.exec(`
|
||||
CREATE TABLE IF NOT EXISTS imm_monthly_rollups(
|
||||
rollup_month INTEGER NOT NULL,
|
||||
video_id INTEGER,
|
||||
total_sessions INTEGER NOT NULL DEFAULT 0,
|
||||
total_active_min REAL NOT NULL DEFAULT 0,
|
||||
total_lines_seen INTEGER NOT NULL DEFAULT 0,
|
||||
total_words_seen INTEGER NOT NULL DEFAULT 0,
|
||||
total_tokens_seen INTEGER NOT NULL DEFAULT 0,
|
||||
total_cards INTEGER NOT NULL DEFAULT 0,
|
||||
PRIMARY KEY (rollup_month, video_id)
|
||||
);
|
||||
`);
|
||||
|
||||
db.exec(`
|
||||
CREATE INDEX IF NOT EXISTS idx_sessions_video_started
|
||||
ON imm_sessions(video_id, started_at_ms DESC)
|
||||
`);
|
||||
db.exec(`
|
||||
CREATE INDEX IF NOT EXISTS idx_sessions_status_started
|
||||
ON imm_sessions(status, started_at_ms DESC)
|
||||
`);
|
||||
db.exec(`
|
||||
CREATE INDEX IF NOT EXISTS idx_telemetry_session_sample
|
||||
ON imm_session_telemetry(session_id, sample_ms DESC)
|
||||
`);
|
||||
db.exec(`
|
||||
CREATE INDEX IF NOT EXISTS idx_events_session_ts
|
||||
ON imm_session_events(session_id, ts_ms DESC)
|
||||
`);
|
||||
db.exec(`
|
||||
CREATE INDEX IF NOT EXISTS idx_events_type_ts
|
||||
ON imm_session_events(event_type, ts_ms DESC)
|
||||
`);
|
||||
db.exec(`
|
||||
CREATE INDEX IF NOT EXISTS idx_rollups_day_video
|
||||
ON imm_daily_rollups(rollup_day, video_id)
|
||||
`);
|
||||
db.exec(`
|
||||
CREATE INDEX IF NOT EXISTS idx_rollups_month_video
|
||||
ON imm_monthly_rollups(rollup_month, video_id)
|
||||
`);
|
||||
|
||||
db.exec(`
|
||||
INSERT INTO imm_schema_version(schema_version, applied_at_ms)
|
||||
VALUES (${SCHEMA_VERSION}, ${Date.now()})
|
||||
ON CONFLICT DO NOTHING
|
||||
`);
|
||||
}
|
||||
|
||||
export function createTrackerPreparedStatements(db: DatabaseSync): TrackerPreparedStatements {
|
||||
return {
|
||||
telemetryInsertStmt: db.prepare(`
|
||||
INSERT INTO imm_session_telemetry (
|
||||
session_id, sample_ms, total_watched_ms, active_watched_ms,
|
||||
lines_seen, words_seen, tokens_seen, cards_mined, lookup_count,
|
||||
lookup_hits, pause_count, pause_ms, seek_forward_count,
|
||||
seek_backward_count, media_buffer_events
|
||||
) VALUES (
|
||||
?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?
|
||||
)
|
||||
`),
|
||||
eventInsertStmt: db.prepare(`
|
||||
INSERT INTO imm_session_events (
|
||||
session_id, ts_ms, event_type, line_index, segment_start_ms, segment_end_ms,
|
||||
words_delta, cards_delta, payload_json
|
||||
) VALUES (
|
||||
?, ?, ?, ?, ?, ?, ?, ?, ?
|
||||
)
|
||||
`),
|
||||
};
|
||||
}
|
||||
|
||||
export function executeQueuedWrite(write: QueuedWrite, stmts: TrackerPreparedStatements): void {
|
||||
if (write.kind === 'telemetry') {
|
||||
stmts.telemetryInsertStmt.run(
|
||||
write.sessionId,
|
||||
write.sampleMs!,
|
||||
write.totalWatchedMs!,
|
||||
write.activeWatchedMs!,
|
||||
write.linesSeen!,
|
||||
write.wordsSeen!,
|
||||
write.tokensSeen!,
|
||||
write.cardsMined!,
|
||||
write.lookupCount!,
|
||||
write.lookupHits!,
|
||||
write.pauseCount!,
|
||||
write.pauseMs!,
|
||||
write.seekForwardCount!,
|
||||
write.seekBackwardCount!,
|
||||
write.mediaBufferEvents!,
|
||||
);
|
||||
return;
|
||||
}
|
||||
|
||||
stmts.eventInsertStmt.run(
|
||||
write.sessionId,
|
||||
write.sampleMs!,
|
||||
write.eventType!,
|
||||
write.lineIndex ?? null,
|
||||
write.segmentStartMs ?? null,
|
||||
write.segmentEndMs ?? null,
|
||||
write.wordsDelta ?? 0,
|
||||
write.cardsDelta ?? 0,
|
||||
write.payloadJson ?? null,
|
||||
);
|
||||
}
|
||||
|
||||
export function getOrCreateVideoRecord(
|
||||
db: DatabaseSync,
|
||||
videoKey: string,
|
||||
details: {
|
||||
canonicalTitle: string;
|
||||
sourcePath: string | null;
|
||||
sourceUrl: string | null;
|
||||
sourceType: number;
|
||||
},
|
||||
): number {
|
||||
const existing = db
|
||||
.prepare('SELECT video_id FROM imm_videos WHERE video_key = ?')
|
||||
.get(videoKey) as { video_id: number } | null;
|
||||
if (existing?.video_id) {
|
||||
db.prepare(
|
||||
'UPDATE imm_videos SET canonical_title = ?, updated_at_ms = ? WHERE video_id = ?',
|
||||
).run(details.canonicalTitle || 'unknown', Date.now(), existing.video_id);
|
||||
return existing.video_id;
|
||||
}
|
||||
|
||||
const nowMs = Date.now();
|
||||
const insert = db.prepare(`
|
||||
INSERT INTO imm_videos (
|
||||
video_key, canonical_title, source_type, source_path, source_url,
|
||||
duration_ms, file_size_bytes, codec_id, container_id, width_px, height_px,
|
||||
fps_x100, bitrate_kbps, audio_codec_id, hash_sha256, screenshot_path,
|
||||
metadata_json, created_at_ms, updated_at_ms
|
||||
) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
|
||||
`);
|
||||
const result = insert.run(
|
||||
videoKey,
|
||||
details.canonicalTitle || 'unknown',
|
||||
details.sourceType,
|
||||
details.sourcePath,
|
||||
details.sourceUrl,
|
||||
0,
|
||||
null,
|
||||
null,
|
||||
null,
|
||||
null,
|
||||
null,
|
||||
null,
|
||||
null,
|
||||
null,
|
||||
null,
|
||||
null,
|
||||
null,
|
||||
nowMs,
|
||||
nowMs,
|
||||
);
|
||||
return Number(result.lastInsertRowid);
|
||||
}
|
||||
|
||||
export function updateVideoMetadataRecord(
|
||||
db: DatabaseSync,
|
||||
videoId: number,
|
||||
metadata: VideoMetadata,
|
||||
): void {
|
||||
db.prepare(
|
||||
`
|
||||
UPDATE imm_videos
|
||||
SET
|
||||
duration_ms = ?,
|
||||
file_size_bytes = ?,
|
||||
codec_id = ?,
|
||||
container_id = ?,
|
||||
width_px = ?,
|
||||
height_px = ?,
|
||||
fps_x100 = ?,
|
||||
bitrate_kbps = ?,
|
||||
audio_codec_id = ?,
|
||||
hash_sha256 = ?,
|
||||
screenshot_path = ?,
|
||||
metadata_json = ?,
|
||||
updated_at_ms = ?
|
||||
WHERE video_id = ?
|
||||
`,
|
||||
).run(
|
||||
metadata.durationMs,
|
||||
metadata.fileSizeBytes,
|
||||
metadata.codecId,
|
||||
metadata.containerId,
|
||||
metadata.widthPx,
|
||||
metadata.heightPx,
|
||||
metadata.fpsX100,
|
||||
metadata.bitrateKbps,
|
||||
metadata.audioCodecId,
|
||||
metadata.hashSha256,
|
||||
metadata.screenshotPath,
|
||||
metadata.metadataJson,
|
||||
Date.now(),
|
||||
videoId,
|
||||
);
|
||||
}
|
||||
|
||||
export function updateVideoTitleRecord(
|
||||
db: DatabaseSync,
|
||||
videoId: number,
|
||||
canonicalTitle: string,
|
||||
): void {
|
||||
db.prepare('UPDATE imm_videos SET canonical_title = ?, updated_at_ms = ? WHERE video_id = ?').run(
|
||||
canonicalTitle,
|
||||
Date.now(),
|
||||
videoId,
|
||||
);
|
||||
}
|
||||
Reference in New Issue
Block a user