test: add immersion tracking startup safety and sqlite tests

This commit is contained in:
2026-02-17 00:59:09 -08:00
parent 4d28efabd0
commit 79bf5ebefb
7 changed files with 457 additions and 50 deletions

View File

@@ -10,8 +10,9 @@ const DEFAULT_QUEUE_CAP = 1_000;
const DEFAULT_BATCH_SIZE = 25;
const DEFAULT_FLUSH_INTERVAL_MS = 500;
const DEFAULT_MAINTENANCE_INTERVAL_MS = 24 * 60 * 60 * 1000;
const WEEK_MS = 7 * 24 * 60 * 60 * 1000;
const EVENTS_RETENTION_MS = 7 * 24 * 60 * 60 * 1000;
const ONE_WEEK_MS = 7 * 24 * 60 * 60 * 1000;
const EVENTS_RETENTION_MS = ONE_WEEK_MS;
const VACUUM_INTERVAL_MS = ONE_WEEK_MS;
const TELEMETRY_RETENTION_MS = 30 * 24 * 60 * 60 * 1000;
const DAILY_ROLLUP_RETENTION_MS = 365 * 24 * 60 * 60 * 1000;
const MONTHLY_ROLLUP_RETENTION_MS = 5 * 365 * 24 * 60 * 60 * 1000;
@@ -159,7 +160,6 @@ export class ImmersionTrackerService {
private maintenanceTimer: ReturnType<typeof setInterval> | null = null;
private flushScheduled = false;
private droppedWriteCount = 0;
private pendingFlush = false;
private lastMaintenanceMs = 0;
private lastVacuumMs = 0;
private isDestroyed = false;
@@ -167,6 +167,8 @@ export class ImmersionTrackerService {
private currentVideoKey = "";
private currentMediaPathOrUrl = "";
private lastQueueWriteAtMs = 0;
private readonly telemetryInsertStmt: ReturnType<DatabaseSync["prepare"]>;
private readonly eventInsertStmt: ReturnType<DatabaseSync["prepare"]>;
constructor(options: ImmersionTrackerOptions) {
this.dbPath = options.dbPath;
@@ -184,6 +186,24 @@ export class ImmersionTrackerService {
this.db = new DatabaseSync(this.dbPath);
this.applyPragmas();
this.ensureSchema();
this.telemetryInsertStmt = this.db.prepare(`
INSERT INTO imm_session_telemetry (
session_id, sample_ms, total_watched_ms, active_watched_ms,
lines_seen, words_seen, tokens_seen, cards_mined, lookup_count,
lookup_hits, pause_count, pause_ms, seek_forward_count,
seek_backward_count, media_buffer_events
) VALUES (
?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?
)
`);
this.eventInsertStmt = this.db.prepare(`
INSERT INTO imm_session_events (
session_id, ts_ms, event_type, line_index, segment_start_ms, segment_end_ms,
words_delta, cards_delta, payload_json
) VALUES (
?, ?, ?, ?, ?, ?, ?, ?, ?
)
`);
this.scheduleMaintenance();
this.scheduleFlush();
}
@@ -199,10 +219,7 @@ export class ImmersionTrackerService {
clearInterval(this.maintenanceTimer);
this.maintenanceTimer = null;
}
this.flushTelemetry(true);
this.flushNow();
this.finalizeActiveSession();
this.flushNow();
this.db.close();
}
@@ -336,7 +353,7 @@ export class ImmersionTrackerService {
}
const sourceType = this.isRemoteSource(normalizedPath) ? SOURCE_TYPE_REMOTE : SOURCE_TYPE_LOCAL;
const videoKey = this.buildVideoKey(normalizedPath, sourceType, normalizedTitle);
const videoKey = this.buildVideoKey(normalizedPath, sourceType);
const canonicalTitle = normalizedTitle || this.deriveCanonicalTitle(normalizedPath);
const sourcePath = sourceType === SOURCE_TYPE_LOCAL ? normalizedPath : null;
const sourceUrl = sourceType === SOURCE_TYPE_REMOTE ? normalizedPath : null;
@@ -622,7 +639,6 @@ export class ImmersionTrackerService {
const batch = this.queue.splice(0, Math.min(this.batchSize, this.queue.length));
this.writeLock.locked = true;
this.pendingFlush = true;
try {
this.db.exec("BEGIN IMMEDIATE");
for (const write of batch) {
@@ -635,7 +651,6 @@ export class ImmersionTrackerService {
this.logger.warn("Immersion tracker flush failed, retrying later", error as Error);
} finally {
this.writeLock.locked = false;
this.pendingFlush = false;
this.flushScheduled = false;
if (this.queue.length > 0) {
this.scheduleFlush(this.flushIntervalMs);
@@ -645,17 +660,7 @@ export class ImmersionTrackerService {
private flushSingle(write: QueuedWrite): void {
if (write.kind === "telemetry") {
const stmt = this.db.prepare(`
INSERT INTO imm_session_telemetry (
session_id, sample_ms, total_watched_ms, active_watched_ms,
lines_seen, words_seen, tokens_seen, cards_mined, lookup_count,
lookup_hits, pause_count, pause_ms, seek_forward_count,
seek_backward_count, media_buffer_events
) VALUES (
?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?
)
`);
stmt.run(
this.telemetryInsertStmt.run(
write.sessionId,
write.sampleMs!,
write.totalWatchedMs!,
@@ -675,15 +680,7 @@ export class ImmersionTrackerService {
return;
}
const eventStmt = this.db.prepare(`
INSERT INTO imm_session_events (
session_id, ts_ms, event_type, line_index, segment_start_ms, segment_end_ms,
words_delta, cards_delta, payload_json
) VALUES (
?, ?, ?, ?, ?, ?, ?, ?, ?
)
`);
eventStmt.run(
this.eventInsertStmt.run(
write.sessionId,
write.sampleMs!,
write.eventType!,
@@ -871,7 +868,7 @@ export class ImmersionTrackerService {
const dailyCutoff = nowMs - DAILY_ROLLUP_RETENTION_MS;
const monthlyCutoff = nowMs - MONTHLY_ROLLUP_RETENTION_MS;
const dayCutoff = Math.floor(dailyCutoff / 86_400_000);
const monthCutoff = Math.floor(monthlyCutoff / 2_592_000_000);
const monthCutoff = this.toMonthKey(monthlyCutoff);
this.db.prepare(`DELETE FROM imm_session_events WHERE ts_ms < ?`).run(eventCutoff);
this.db.prepare(`DELETE FROM imm_session_telemetry WHERE sample_ms < ?`).run(telemetryCutoff);
@@ -882,7 +879,10 @@ export class ImmersionTrackerService {
.run(telemetryCutoff);
this.runRollupMaintenance();
if (nowMs - this.lastVacuumMs >= WEEK_MS && !this.writeLock.locked) {
if (
nowMs - this.lastVacuumMs >= VACUUM_INTERVAL_MS
&& !this.writeLock.locked
) {
this.db.exec("VACUUM");
this.lastVacuumMs = nowMs;
}
@@ -938,7 +938,7 @@ export class ImmersionTrackerService {
total_words_seen, total_tokens_seen, total_cards
)
SELECT
CAST(s.started_at_ms / 2592000000 AS INTEGER) AS rollup_month,
CAST(strftime('%Y%m', s.started_at_ms / 1000, 'unixepoch') AS INTEGER) AS rollup_month,
s.video_id AS video_id,
COUNT(DISTINCT s.session_id) AS total_sessions,
COALESCE(SUM(t.active_watched_ms), 0) / 60000.0 AS total_active_min,
@@ -953,6 +953,11 @@ export class ImmersionTrackerService {
`);
}
private toMonthKey(timestampMs: number): number {
const monthDate = new Date(timestampMs);
return monthDate.getUTCFullYear() * 100 + monthDate.getUTCMonth() + 1;
}
private startSession(videoId: number, startedAtMs?: number): void {
const nowMs = startedAtMs ?? Date.now();
const result = this.startSessionStatement(videoId, nowMs);
@@ -1005,9 +1010,7 @@ export class ImmersionTrackerService {
private startSessionStatement(videoId: number, startedAtMs: number): {
lastInsertRowid: number | bigint;
} {
const sessionUuid = `session-${videoId}-${startedAtMs}-${Math.random()
.toString(16)
.slice(2, 10)}`;
const sessionUuid = crypto.randomUUID();
return this.db
.prepare(`
INSERT INTO imm_sessions (
@@ -1367,7 +1370,7 @@ export class ImmersionTrackerService {
return value.trim().replace(/\s+/g, " ");
}
private buildVideoKey(mediaPath: string, sourceType: number, title: string): string {
private buildVideoKey(mediaPath: string, sourceType: number): string {
if (sourceType === SOURCE_TYPE_REMOTE) {
return `remote:${mediaPath}`;
}