From d0f29cfeae5b88048b6af572fcdf276834c822b6 Mon Sep 17 00:00:00 2001 From: sudacode Date: Sun, 1 Mar 2026 00:08:21 -0800 Subject: [PATCH] feat: optimize immersion rollup maintenance --- docs/immersion-tracking.md | 2 + .../services/immersion-tracker-service.ts | 10 +- .../services/immersion-tracker/maintenance.ts | 208 ++++++++++++++++-- .../immersion-tracker/storage-session.test.ts | 11 + .../services/immersion-tracker/storage.ts | 11 + 5 files changed, 218 insertions(+), 24 deletions(-) diff --git a/docs/immersion-tracking.md b/docs/immersion-tracking.md index 5573493..1d0af4c 100644 --- a/docs/immersion-tracking.md +++ b/docs/immersion-tracking.md @@ -10,6 +10,8 @@ SubMiner stores immersion analytics in local SQLite (`immersion.sqlite`) by defa - Queue overflow policy is deterministic: drop oldest queued writes, keep newest. - Flush policy defaults to `25` writes or `500ms` max delay. - SQLite pragmas: `journal_mode=WAL`, `synchronous=NORMAL`, `foreign_keys=ON`, `busy_timeout=2500`. +- Rollups now run incrementally from the last processed telemetry sample; on startup, a one-time bootstrap pass runs that is equivalent to a full rebuild. +- If retention pruning removes telemetry/session rows, maintenance triggers a full rollup rebuild to resync historical aggregates. 
## Schema (v3) diff --git a/src/core/services/immersion-tracker-service.ts b/src/core/services/immersion-tracker-service.ts index 576d814..5445036 100644 --- a/src/core/services/immersion-tracker-service.ts +++ b/src/core/services/immersion-tracker-service.ts @@ -586,13 +586,15 @@ export class ImmersionTrackerService { this.flushTelemetry(true); this.flushNow(); const nowMs = Date.now(); - pruneRetention(this.db, nowMs, { + const retentionResult = pruneRetention(this.db, nowMs, { eventsRetentionMs: this.eventsRetentionMs, telemetryRetentionMs: this.telemetryRetentionMs, dailyRollupRetentionMs: this.dailyRollupRetentionMs, monthlyRollupRetentionMs: this.monthlyRollupRetentionMs, }); - this.runRollupMaintenance(); + const shouldRebuildRollups = + retentionResult.deletedTelemetryRows > 0 || retentionResult.deletedEndedSessions > 0; + this.runRollupMaintenance(shouldRebuildRollups); if (nowMs - this.lastVacuumMs >= this.vacuumIntervalMs && !this.writeLock.locked) { this.db.exec('VACUUM'); @@ -606,8 +608,8 @@ export class ImmersionTrackerService { } } - private runRollupMaintenance(): void { - runRollupMaintenance(this.db); + private runRollupMaintenance(forceRebuild = false): void { + runRollupMaintenance(this.db, forceRebuild); } private startSession(videoId: number, startedAtMs?: number): void { diff --git a/src/core/services/immersion-tracker/maintenance.ts b/src/core/services/immersion-tracker/maintenance.ts index 17c449d..1c611b5 100644 --- a/src/core/services/immersion-tracker/maintenance.ts +++ b/src/core/services/immersion-tracker/maintenance.ts @@ -1,5 +1,31 @@ import type { DatabaseSync } from 'node:sqlite'; +const ROLLUP_STATE_KEY = 'last_rollup_sample_ms'; +const DAILY_MS = 86_400_000; +const ZERO_ID = 0; + +interface RollupStateRow { + state_value: number; +} + +interface RollupGroupRow { + rollup_day: number; + rollup_month: number; + video_id: number; +} + +interface RollupTelemetryResult { + maxSampleMs: number | null; +} + +interface RetentionResult { 
+ deletedSessionEvents: number; + deletedTelemetryRows: number; + deletedDailyRows: number; + deletedMonthlyRows: number; + deletedEndedSessions: number; +} + export function toMonthKey(timestampMs: number): number { const monthDate = new Date(timestampMs); return monthDate.getUTCFullYear() * 100 + monthDate.getUTCMonth() + 1; @@ -14,27 +40,65 @@ export function pruneRetention( dailyRollupRetentionMs: number; monthlyRollupRetentionMs: number; }, -): void { +): RetentionResult { const eventCutoff = nowMs - policy.eventsRetentionMs; const telemetryCutoff = nowMs - policy.telemetryRetentionMs; - const dailyCutoff = nowMs - policy.dailyRollupRetentionMs; - const monthlyCutoff = nowMs - policy.monthlyRollupRetentionMs; - const dayCutoff = Math.floor(dailyCutoff / 86_400_000); - const monthCutoff = toMonthKey(monthlyCutoff); + const dayCutoff = nowMs - policy.dailyRollupRetentionMs; + const monthCutoff = nowMs - policy.monthlyRollupRetentionMs; - db.prepare(`DELETE FROM imm_session_events WHERE ts_ms < ?`).run(eventCutoff); - db.prepare(`DELETE FROM imm_session_telemetry WHERE sample_ms < ?`).run(telemetryCutoff); - db.prepare(`DELETE FROM imm_daily_rollups WHERE rollup_day < ?`).run(dayCutoff); - db.prepare(`DELETE FROM imm_monthly_rollups WHERE rollup_month < ?`).run(monthCutoff); - db.prepare(`DELETE FROM imm_sessions WHERE ended_at_ms IS NOT NULL AND ended_at_ms < ?`).run( - telemetryCutoff, - ); + const deletedSessionEvents = (db + .prepare(`DELETE FROM imm_session_events WHERE ts_ms < ?`) + .run(eventCutoff) as { changes: number }).changes; + const deletedTelemetryRows = (db + .prepare(`DELETE FROM imm_session_telemetry WHERE sample_ms < ?`) + .run(telemetryCutoff) as { changes: number }).changes; + const deletedDailyRows = (db + .prepare(`DELETE FROM imm_daily_rollups WHERE rollup_day < ?`) + .run(Math.floor(dayCutoff / DAILY_MS)) as { changes: number }).changes; + const deletedMonthlyRows = (db + .prepare(`DELETE FROM imm_monthly_rollups WHERE rollup_month < ?`) 
+ .run(toMonthKey(monthCutoff)) as { changes: number }).changes; + const deletedEndedSessions = (db + .prepare( + `DELETE FROM imm_sessions WHERE ended_at_ms IS NOT NULL AND ended_at_ms < ?`, + ) + .run(telemetryCutoff) as { changes: number }).changes; + + return { + deletedSessionEvents, + deletedTelemetryRows, + deletedDailyRows, + deletedMonthlyRows, + deletedEndedSessions, + }; } -export function runRollupMaintenance(db: DatabaseSync): void { - const rollupNowMs = Date.now(); +function getLastRollupSampleMs(db: DatabaseSync): number { + const row = db + .prepare(`SELECT state_value FROM imm_rollup_state WHERE state_key = ? LIMIT 1`) + .get(ROLLUP_STATE_KEY) as unknown as RollupStateRow | null; + return row ? Number(row.state_value) : ZERO_ID; +} - db.exec(` +function setLastRollupSampleMs(db: DatabaseSync, sampleMs: number): void { + db.prepare( + `INSERT INTO imm_rollup_state (state_key, state_value) + VALUES (?, ?) + ON CONFLICT(state_key) DO UPDATE SET state_value = excluded.state_value`, + ).run(ROLLUP_STATE_KEY, sampleMs); +} + +function upsertDailyRollupsForGroups( + db: DatabaseSync, + groups: Array<{ rollupDay: number; videoId: number }>, + rollupNowMs: number, +): void { + if (groups.length === 0) { + return; + } + + const deleteStmt = db.prepare(`DELETE FROM imm_daily_rollups WHERE rollup_day = ? AND video_id = ?`); + const upsertStmt = db.prepare(` INSERT INTO imm_daily_rollups ( rollup_day, video_id, total_sessions, total_active_min, total_lines_seen, total_words_seen, total_tokens_seen, total_cards, cards_per_hour, @@ -64,11 +128,12 @@ export function runRollupMaintenance(db: DatabaseSync): void { THEN CAST(COALESCE(SUM(t.lookup_hits), 0) AS REAL) / CAST(SUM(t.lookup_count) AS REAL) ELSE NULL END AS lookup_hit_rate, - ${rollupNowMs} AS CREATED_DATE, - ${rollupNowMs} AS LAST_UPDATE_DATE + ? AS CREATED_DATE, + ? 
AS LAST_UPDATE_DATE FROM imm_sessions s JOIN imm_session_telemetry t ON t.session_id = s.session_id + WHERE CAST(s.started_at_ms / 86400000 AS INTEGER) = ? AND s.video_id = ? GROUP BY rollup_day, s.video_id ON CONFLICT (rollup_day, video_id) DO UPDATE SET total_sessions = excluded.total_sessions, @@ -84,7 +149,25 @@ export function runRollupMaintenance(db: DatabaseSync): void { LAST_UPDATE_DATE = excluded.LAST_UPDATE_DATE `); - db.exec(` + for (const { rollupDay, videoId } of groups) { + deleteStmt.run(rollupDay, videoId); + upsertStmt.run(rollupNowMs, rollupNowMs, rollupDay, videoId); + } +} + +function upsertMonthlyRollupsForGroups( + db: DatabaseSync, + groups: Array<{ rollupMonth: number; videoId: number }>, + rollupNowMs: number, +): void { + if (groups.length === 0) { + return; + } + + const deleteStmt = db.prepare( + `DELETE FROM imm_monthly_rollups WHERE rollup_month = ? AND video_id = ?`, + ); + const upsertStmt = db.prepare(` INSERT INTO imm_monthly_rollups ( rollup_month, video_id, total_sessions, total_active_min, total_lines_seen, total_words_seen, total_tokens_seen, total_cards, CREATED_DATE, LAST_UPDATE_DATE @@ -98,11 +181,12 @@ export function runRollupMaintenance(db: DatabaseSync): void { COALESCE(SUM(t.words_seen), 0) AS total_words_seen, COALESCE(SUM(t.tokens_seen), 0) AS total_tokens_seen, COALESCE(SUM(t.cards_mined), 0) AS total_cards, - ${rollupNowMs} AS CREATED_DATE, - ${rollupNowMs} AS LAST_UPDATE_DATE + ? AS CREATED_DATE, + ? AS LAST_UPDATE_DATE FROM imm_sessions s JOIN imm_session_telemetry t ON t.session_id = s.session_id + WHERE CAST(strftime('%Y%m', s.started_at_ms / 1000, 'unixepoch') AS INTEGER) = ? AND s.video_id = ? 
GROUP BY rollup_month, s.video_id ON CONFLICT (rollup_month, video_id) DO UPDATE SET total_sessions = excluded.total_sessions, @@ -114,4 +198,88 @@ export function runRollupMaintenance(db: DatabaseSync): void { CREATED_DATE = COALESCE(imm_monthly_rollups.CREATED_DATE, excluded.CREATED_DATE), LAST_UPDATE_DATE = excluded.LAST_UPDATE_DATE `); + + for (const { rollupMonth, videoId } of groups) { + deleteStmt.run(rollupMonth, videoId); + upsertStmt.run(rollupNowMs, rollupNowMs, rollupMonth, videoId); + } +} + +function getAffectedRollupGroups( + db: DatabaseSync, + lastRollupSampleMs: number, +): Array<{ rollupDay: number; rollupMonth: number; videoId: number }> { + return ( + db + .prepare( + ` + SELECT DISTINCT + CAST(s.started_at_ms / 86400000 AS INTEGER) AS rollup_day, + CAST(strftime('%Y%m', s.started_at_ms / 1000, 'unixepoch') AS INTEGER) AS rollup_month, + s.video_id AS video_id + FROM imm_session_telemetry t + JOIN imm_sessions s + ON s.session_id = t.session_id + WHERE t.sample_ms > ? + `, + ) + .all(lastRollupSampleMs) as unknown as RollupGroupRow[] + ).map((row) => ({ + rollupDay: row.rollup_day, + rollupMonth: row.rollup_month, + videoId: row.video_id, + })); +} + +function dedupeGroups( + groups: Array, +): Array { + const seen = new Set(); + const result: Array = []; + for (const group of groups) { + const key = `${group.rollupDay ?? group.rollupMonth}-${group.videoId}`; + if (seen.has(key)) { + continue; + } + seen.add(key); + result.push(group); + } + return result; +} + +export function runRollupMaintenance(db: DatabaseSync, forceRebuild = false): void { + const rollupNowMs = Date.now(); + const lastRollupSampleMs = forceRebuild ? 
ZERO_ID : getLastRollupSampleMs(db); + + const maxSampleRow = db + .prepare('SELECT MAX(sample_ms) AS maxSampleMs FROM imm_session_telemetry') + .get() as unknown as RollupTelemetryResult | null; + if (!maxSampleRow?.maxSampleMs) { + if (forceRebuild) { + setLastRollupSampleMs(db, ZERO_ID); + } + return; + } + + const affectedGroups = getAffectedRollupGroups(db, lastRollupSampleMs); + if (!forceRebuild && affectedGroups.length === 0) { + return; + } + + const dailyGroups = dedupeGroups( + affectedGroups.map((group) => ({ + rollupDay: group.rollupDay, + videoId: group.videoId, + })), + ); + const monthlyGroups = dedupeGroups( + affectedGroups.map((group) => ({ + rollupMonth: group.rollupMonth, + videoId: group.videoId, + })), + ); + + upsertDailyRollupsForGroups(db, dailyGroups, rollupNowMs); + upsertMonthlyRollupsForGroups(db, monthlyGroups, rollupNowMs); + setLastRollupSampleMs(db, Number(maxSampleRow.maxSampleMs)); } diff --git a/src/core/services/immersion-tracker/storage-session.test.ts b/src/core/services/immersion-tracker/storage-session.test.ts index 1fd586b..8243c8d 100644 --- a/src/core/services/immersion-tracker/storage-session.test.ts +++ b/src/core/services/immersion-tracker/storage-session.test.ts @@ -56,6 +56,17 @@ testIfSqlite('ensureSchema creates immersion core tables', () => { assert.ok(tableNames.has('imm_monthly_rollups')); assert.ok(tableNames.has('imm_words')); assert.ok(tableNames.has('imm_kanji')); + assert.ok(tableNames.has('imm_rollup_state')); + + const rollupStateRow = db + .prepare( + 'SELECT state_value FROM imm_rollup_state WHERE state_key = ?', + ) + .get('last_rollup_sample_ms') as { + state_value: number; + } | null; + assert.ok(rollupStateRow); + assert.equal(rollupStateRow?.state_value, 0); } finally { db.close(); cleanupDbPath(dbPath); diff --git a/src/core/services/immersion-tracker/storage.ts b/src/core/services/immersion-tracker/storage.ts index fb822c8..1266e23 100644 --- a/src/core/services/immersion-tracker/storage.ts +++ 
b/src/core/services/immersion-tracker/storage.ts @@ -42,6 +42,17 @@ export function ensureSchema(db: DatabaseSync): void { applied_at_ms INTEGER NOT NULL ); `); + db.exec(` + CREATE TABLE IF NOT EXISTS imm_rollup_state( + state_key TEXT PRIMARY KEY, + state_value INTEGER NOT NULL + ); + `); + db.exec(` + INSERT INTO imm_rollup_state(state_key, state_value) + VALUES ('last_rollup_sample_ms', 0) + ON CONFLICT(state_key) DO NOTHING + `); const currentVersion = db .prepare('SELECT schema_version FROM imm_schema_version ORDER BY schema_version DESC LIMIT 1')