mirror of
https://github.com/ksyasuda/SubMiner.git
synced 2026-03-01 06:22:44 -08:00
feat: optimize immersion rollup maintenance
This commit is contained in:
@@ -10,6 +10,8 @@ SubMiner stores immersion analytics in local SQLite (`immersion.sqlite`) by defa
|
|||||||
- Queue overflow policy is deterministic: drop oldest queued writes, keep newest.
|
- Queue overflow policy is deterministic: drop oldest queued writes, keep newest.
|
||||||
- Flush policy defaults to `25` writes or `500ms` max delay.
|
- Flush policy defaults to `25` writes or `500ms` max delay.
|
||||||
- SQLite pragmas: `journal_mode=WAL`, `synchronous=NORMAL`, `foreign_keys=ON`, `busy_timeout=2500`.
|
- SQLite pragmas: `journal_mode=WAL`, `synchronous=NORMAL`, `foreign_keys=ON`, `busy_timeout=2500`.
|
||||||
|
- Rollups now run incrementally from the last processed telemetry sample. The stored watermark starts at `0`, so the first maintenance pass at startup covers all existing telemetry once, which is equivalent to a full rebuild.
|
||||||
|
- If retention pruning removes telemetry/session rows, maintenance triggers a full rollup rebuild to resync historical aggregates.
|
||||||
|
|
||||||
## Schema (v3)
|
## Schema (v3)
|
||||||
|
|
||||||
|
|||||||
@@ -586,13 +586,15 @@ export class ImmersionTrackerService {
|
|||||||
this.flushTelemetry(true);
|
this.flushTelemetry(true);
|
||||||
this.flushNow();
|
this.flushNow();
|
||||||
const nowMs = Date.now();
|
const nowMs = Date.now();
|
||||||
pruneRetention(this.db, nowMs, {
|
const retentionResult = pruneRetention(this.db, nowMs, {
|
||||||
eventsRetentionMs: this.eventsRetentionMs,
|
eventsRetentionMs: this.eventsRetentionMs,
|
||||||
telemetryRetentionMs: this.telemetryRetentionMs,
|
telemetryRetentionMs: this.telemetryRetentionMs,
|
||||||
dailyRollupRetentionMs: this.dailyRollupRetentionMs,
|
dailyRollupRetentionMs: this.dailyRollupRetentionMs,
|
||||||
monthlyRollupRetentionMs: this.monthlyRollupRetentionMs,
|
monthlyRollupRetentionMs: this.monthlyRollupRetentionMs,
|
||||||
});
|
});
|
||||||
this.runRollupMaintenance();
|
const shouldRebuildRollups =
|
||||||
|
retentionResult.deletedTelemetryRows > 0 || retentionResult.deletedEndedSessions > 0;
|
||||||
|
this.runRollupMaintenance(shouldRebuildRollups);
|
||||||
|
|
||||||
if (nowMs - this.lastVacuumMs >= this.vacuumIntervalMs && !this.writeLock.locked) {
|
if (nowMs - this.lastVacuumMs >= this.vacuumIntervalMs && !this.writeLock.locked) {
|
||||||
this.db.exec('VACUUM');
|
this.db.exec('VACUUM');
|
||||||
@@ -606,8 +608,8 @@ export class ImmersionTrackerService {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private runRollupMaintenance(): void {
|
private runRollupMaintenance(forceRebuild = false): void {
|
||||||
runRollupMaintenance(this.db);
|
runRollupMaintenance(this.db, forceRebuild);
|
||||||
}
|
}
|
||||||
|
|
||||||
private startSession(videoId: number, startedAtMs?: number): void {
|
private startSession(videoId: number, startedAtMs?: number): void {
|
||||||
|
|||||||
@@ -1,5 +1,31 @@
|
|||||||
import type { DatabaseSync } from 'node:sqlite';
|
import type { DatabaseSync } from 'node:sqlite';
|
||||||
|
|
||||||
|
/** `state_key` of the `imm_rollup_state` row that holds the rollup watermark. */
const ROLLUP_STATE_KEY = 'last_rollup_sample_ms';

/** Milliseconds in one day; divides a millisecond cutoff down to a `rollup_day` value. */
const DAILY_MS = 24 * 60 * 60 * 1000;

/**
 * Zero watermark. Returned when no rollup state row exists and used as the
 * starting point when a rebuild is forced.
 */
const ZERO_ID = 0;
|
||||||
|
|
||||||
|
/** Row shape read back from `SELECT state_value FROM imm_rollup_state`. */
interface RollupStateRow {
  /** Stored watermark value for the requested state key. */
  state_value: number;
}
|
||||||
|
|
||||||
|
/** Row shape produced by the affected-groups query (snake_case column aliases). */
interface RollupGroupRow {
  /** `started_at_ms / 86400000`, cast to an integer by the query. */
  rollup_day: number;
  /** `strftime('%Y%m', ...)` of the session start, cast to an integer by the query. */
  rollup_month: number;
  /** Video id of the session that owns the telemetry sample. */
  video_id: number;
}
|
||||||
|
|
||||||
|
/** Row shape of `SELECT MAX(sample_ms) AS maxSampleMs FROM imm_session_telemetry`. */
interface RollupTelemetryResult {
  /** Largest `sample_ms` currently stored, or `null` when the aggregate has no value. */
  maxSampleMs: number | null;
}
|
||||||
|
|
||||||
|
/** Per-table counts of rows removed by one `pruneRetention` pass. */
interface RetentionResult {
  /** Rows deleted from `imm_session_events`. */
  deletedSessionEvents: number;
  /** Rows deleted from `imm_session_telemetry`. */
  deletedTelemetryRows: number;
  /** Rows deleted from `imm_daily_rollups`. */
  deletedDailyRows: number;
  /** Rows deleted from `imm_monthly_rollups`. */
  deletedMonthlyRows: number;
  /** Ended sessions deleted from `imm_sessions`. */
  deletedEndedSessions: number;
}
|
||||||
|
|
||||||
export function toMonthKey(timestampMs: number): number {
|
export function toMonthKey(timestampMs: number): number {
|
||||||
const monthDate = new Date(timestampMs);
|
const monthDate = new Date(timestampMs);
|
||||||
return monthDate.getUTCFullYear() * 100 + monthDate.getUTCMonth() + 1;
|
return monthDate.getUTCFullYear() * 100 + monthDate.getUTCMonth() + 1;
|
||||||
@@ -14,27 +40,65 @@ export function pruneRetention(
|
|||||||
dailyRollupRetentionMs: number;
|
dailyRollupRetentionMs: number;
|
||||||
monthlyRollupRetentionMs: number;
|
monthlyRollupRetentionMs: number;
|
||||||
},
|
},
|
||||||
): void {
|
): RetentionResult {
|
||||||
const eventCutoff = nowMs - policy.eventsRetentionMs;
|
const eventCutoff = nowMs - policy.eventsRetentionMs;
|
||||||
const telemetryCutoff = nowMs - policy.telemetryRetentionMs;
|
const telemetryCutoff = nowMs - policy.telemetryRetentionMs;
|
||||||
const dailyCutoff = nowMs - policy.dailyRollupRetentionMs;
|
const dayCutoff = nowMs - policy.dailyRollupRetentionMs;
|
||||||
const monthlyCutoff = nowMs - policy.monthlyRollupRetentionMs;
|
const monthCutoff = nowMs - policy.monthlyRollupRetentionMs;
|
||||||
const dayCutoff = Math.floor(dailyCutoff / 86_400_000);
|
|
||||||
const monthCutoff = toMonthKey(monthlyCutoff);
|
|
||||||
|
|
||||||
db.prepare(`DELETE FROM imm_session_events WHERE ts_ms < ?`).run(eventCutoff);
|
const deletedSessionEvents = (db
|
||||||
db.prepare(`DELETE FROM imm_session_telemetry WHERE sample_ms < ?`).run(telemetryCutoff);
|
.prepare(`DELETE FROM imm_session_events WHERE ts_ms < ?`)
|
||||||
db.prepare(`DELETE FROM imm_daily_rollups WHERE rollup_day < ?`).run(dayCutoff);
|
.run(eventCutoff) as { changes: number }).changes;
|
||||||
db.prepare(`DELETE FROM imm_monthly_rollups WHERE rollup_month < ?`).run(monthCutoff);
|
const deletedTelemetryRows = (db
|
||||||
db.prepare(`DELETE FROM imm_sessions WHERE ended_at_ms IS NOT NULL AND ended_at_ms < ?`).run(
|
.prepare(`DELETE FROM imm_session_telemetry WHERE sample_ms < ?`)
|
||||||
telemetryCutoff,
|
.run(telemetryCutoff) as { changes: number }).changes;
|
||||||
);
|
const deletedDailyRows = (db
|
||||||
|
.prepare(`DELETE FROM imm_daily_rollups WHERE rollup_day < ?`)
|
||||||
|
.run(Math.floor(dayCutoff / DAILY_MS)) as { changes: number }).changes;
|
||||||
|
const deletedMonthlyRows = (db
|
||||||
|
.prepare(`DELETE FROM imm_monthly_rollups WHERE rollup_month < ?`)
|
||||||
|
.run(toMonthKey(monthCutoff)) as { changes: number }).changes;
|
||||||
|
const deletedEndedSessions = (db
|
||||||
|
.prepare(
|
||||||
|
`DELETE FROM imm_sessions WHERE ended_at_ms IS NOT NULL AND ended_at_ms < ?`,
|
||||||
|
)
|
||||||
|
.run(telemetryCutoff) as { changes: number }).changes;
|
||||||
|
|
||||||
|
return {
|
||||||
|
deletedSessionEvents,
|
||||||
|
deletedTelemetryRows,
|
||||||
|
deletedDailyRows,
|
||||||
|
deletedMonthlyRows,
|
||||||
|
deletedEndedSessions,
|
||||||
|
};
|
||||||
}
|
}
|
||||||
|
|
||||||
export function runRollupMaintenance(db: DatabaseSync): void {
|
function getLastRollupSampleMs(db: DatabaseSync): number {
|
||||||
const rollupNowMs = Date.now();
|
const row = db
|
||||||
|
.prepare(`SELECT state_value FROM imm_rollup_state WHERE state_key = ? LIMIT 1`)
|
||||||
|
.get(ROLLUP_STATE_KEY) as unknown as RollupStateRow | null;
|
||||||
|
return row ? Number(row.state_value) : ZERO_ID;
|
||||||
|
}
|
||||||
|
|
||||||
db.exec(`
|
/**
 * Stores the rollup watermark in `imm_rollup_state`.
 *
 * Inserts the row keyed by `ROLLUP_STATE_KEY`, or overwrites its value when
 * the key already exists.
 *
 * @param db - Open database handle.
 * @param sampleMs - Watermark value to persist.
 */
function setLastRollupSampleMs(db: DatabaseSync, sampleMs: number): void {
  const upsertWatermark = db.prepare(
    `INSERT INTO imm_rollup_state (state_key, state_value)
     VALUES (?, ?)
     ON CONFLICT(state_key) DO UPDATE SET state_value = excluded.state_value`,
  );
  upsertWatermark.run(ROLLUP_STATE_KEY, sampleMs);
}
|
||||||
|
|
||||||
|
function upsertDailyRollupsForGroups(
|
||||||
|
db: DatabaseSync,
|
||||||
|
groups: Array<{ rollupDay: number; videoId: number }>,
|
||||||
|
rollupNowMs: number,
|
||||||
|
): void {
|
||||||
|
if (groups.length === 0) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
const deleteStmt = db.prepare(`DELETE FROM imm_daily_rollups WHERE rollup_day = ? AND video_id = ?`);
|
||||||
|
const upsertStmt = db.prepare(`
|
||||||
INSERT INTO imm_daily_rollups (
|
INSERT INTO imm_daily_rollups (
|
||||||
rollup_day, video_id, total_sessions, total_active_min, total_lines_seen,
|
rollup_day, video_id, total_sessions, total_active_min, total_lines_seen,
|
||||||
total_words_seen, total_tokens_seen, total_cards, cards_per_hour,
|
total_words_seen, total_tokens_seen, total_cards, cards_per_hour,
|
||||||
@@ -64,11 +128,12 @@ export function runRollupMaintenance(db: DatabaseSync): void {
|
|||||||
THEN CAST(COALESCE(SUM(t.lookup_hits), 0) AS REAL) / CAST(SUM(t.lookup_count) AS REAL)
|
THEN CAST(COALESCE(SUM(t.lookup_hits), 0) AS REAL) / CAST(SUM(t.lookup_count) AS REAL)
|
||||||
ELSE NULL
|
ELSE NULL
|
||||||
END AS lookup_hit_rate,
|
END AS lookup_hit_rate,
|
||||||
${rollupNowMs} AS CREATED_DATE,
|
? AS CREATED_DATE,
|
||||||
${rollupNowMs} AS LAST_UPDATE_DATE
|
? AS LAST_UPDATE_DATE
|
||||||
FROM imm_sessions s
|
FROM imm_sessions s
|
||||||
JOIN imm_session_telemetry t
|
JOIN imm_session_telemetry t
|
||||||
ON t.session_id = s.session_id
|
ON t.session_id = s.session_id
|
||||||
|
WHERE CAST(s.started_at_ms / 86400000 AS INTEGER) = ? AND s.video_id = ?
|
||||||
GROUP BY rollup_day, s.video_id
|
GROUP BY rollup_day, s.video_id
|
||||||
ON CONFLICT (rollup_day, video_id) DO UPDATE SET
|
ON CONFLICT (rollup_day, video_id) DO UPDATE SET
|
||||||
total_sessions = excluded.total_sessions,
|
total_sessions = excluded.total_sessions,
|
||||||
@@ -84,7 +149,25 @@ export function runRollupMaintenance(db: DatabaseSync): void {
|
|||||||
LAST_UPDATE_DATE = excluded.LAST_UPDATE_DATE
|
LAST_UPDATE_DATE = excluded.LAST_UPDATE_DATE
|
||||||
`);
|
`);
|
||||||
|
|
||||||
db.exec(`
|
for (const { rollupDay, videoId } of groups) {
|
||||||
|
deleteStmt.run(rollupDay, videoId);
|
||||||
|
upsertStmt.run(rollupNowMs, rollupNowMs, rollupDay, videoId);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
function upsertMonthlyRollupsForGroups(
|
||||||
|
db: DatabaseSync,
|
||||||
|
groups: Array<{ rollupMonth: number; videoId: number }>,
|
||||||
|
rollupNowMs: number,
|
||||||
|
): void {
|
||||||
|
if (groups.length === 0) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
const deleteStmt = db.prepare(
|
||||||
|
`DELETE FROM imm_monthly_rollups WHERE rollup_month = ? AND video_id = ?`,
|
||||||
|
);
|
||||||
|
const upsertStmt = db.prepare(`
|
||||||
INSERT INTO imm_monthly_rollups (
|
INSERT INTO imm_monthly_rollups (
|
||||||
rollup_month, video_id, total_sessions, total_active_min, total_lines_seen,
|
rollup_month, video_id, total_sessions, total_active_min, total_lines_seen,
|
||||||
total_words_seen, total_tokens_seen, total_cards, CREATED_DATE, LAST_UPDATE_DATE
|
total_words_seen, total_tokens_seen, total_cards, CREATED_DATE, LAST_UPDATE_DATE
|
||||||
@@ -98,11 +181,12 @@ export function runRollupMaintenance(db: DatabaseSync): void {
|
|||||||
COALESCE(SUM(t.words_seen), 0) AS total_words_seen,
|
COALESCE(SUM(t.words_seen), 0) AS total_words_seen,
|
||||||
COALESCE(SUM(t.tokens_seen), 0) AS total_tokens_seen,
|
COALESCE(SUM(t.tokens_seen), 0) AS total_tokens_seen,
|
||||||
COALESCE(SUM(t.cards_mined), 0) AS total_cards,
|
COALESCE(SUM(t.cards_mined), 0) AS total_cards,
|
||||||
${rollupNowMs} AS CREATED_DATE,
|
? AS CREATED_DATE,
|
||||||
${rollupNowMs} AS LAST_UPDATE_DATE
|
? AS LAST_UPDATE_DATE
|
||||||
FROM imm_sessions s
|
FROM imm_sessions s
|
||||||
JOIN imm_session_telemetry t
|
JOIN imm_session_telemetry t
|
||||||
ON t.session_id = s.session_id
|
ON t.session_id = s.session_id
|
||||||
|
WHERE CAST(strftime('%Y%m', s.started_at_ms / 1000, 'unixepoch') AS INTEGER) = ? AND s.video_id = ?
|
||||||
GROUP BY rollup_month, s.video_id
|
GROUP BY rollup_month, s.video_id
|
||||||
ON CONFLICT (rollup_month, video_id) DO UPDATE SET
|
ON CONFLICT (rollup_month, video_id) DO UPDATE SET
|
||||||
total_sessions = excluded.total_sessions,
|
total_sessions = excluded.total_sessions,
|
||||||
@@ -114,4 +198,88 @@ export function runRollupMaintenance(db: DatabaseSync): void {
|
|||||||
CREATED_DATE = COALESCE(imm_monthly_rollups.CREATED_DATE, excluded.CREATED_DATE),
|
CREATED_DATE = COALESCE(imm_monthly_rollups.CREATED_DATE, excluded.CREATED_DATE),
|
||||||
LAST_UPDATE_DATE = excluded.LAST_UPDATE_DATE
|
LAST_UPDATE_DATE = excluded.LAST_UPDATE_DATE
|
||||||
`);
|
`);
|
||||||
|
|
||||||
|
for (const { rollupMonth, videoId } of groups) {
|
||||||
|
deleteStmt.run(rollupMonth, videoId);
|
||||||
|
upsertStmt.run(rollupNowMs, rollupNowMs, rollupMonth, videoId);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
 * Lists the distinct (day, month, video) combinations that own at least one
 * telemetry sample newer than the given watermark.
 *
 * Day and month are derived from the owning session's `started_at_ms`, not
 * from the sample timestamp.
 *
 * @param db - Open database handle.
 * @param lastRollupSampleMs - Only samples with `sample_ms` strictly greater than this are considered.
 * @returns One entry per distinct combination, with camelCase field names.
 */
function getAffectedRollupGroups(
  db: DatabaseSync,
  lastRollupSampleMs: number,
): Array<{ rollupDay: number; rollupMonth: number; videoId: number }> {
  const selectGroups = db.prepare(
    `
      SELECT DISTINCT
        CAST(s.started_at_ms / 86400000 AS INTEGER) AS rollup_day,
        CAST(strftime('%Y%m', s.started_at_ms / 1000, 'unixepoch') AS INTEGER) AS rollup_month,
        s.video_id AS video_id
      FROM imm_session_telemetry t
      JOIN imm_sessions s
        ON s.session_id = t.session_id
      WHERE t.sample_ms > ?
    `,
  );
  const rows = selectGroups.all(lastRollupSampleMs) as unknown as RollupGroupRow[];

  // Translate the snake_case column aliases into the camelCase shape callers use.
  const groups: Array<{ rollupDay: number; rollupMonth: number; videoId: number }> = [];
  for (const { rollup_day, rollup_month, video_id } of rows) {
    groups.push({ rollupDay: rollup_day, rollupMonth: rollup_month, videoId: video_id });
  }
  return groups;
}
|
||||||
|
|
||||||
|
/**
 * Removes duplicate rollup groups, keeping the first occurrence of each and
 * preserving input order.
 *
 * Two groups are duplicates when their `rollupDay`, `rollupMonth` and
 * `videoId` all match. Day and month are tagged separately in the key, so a
 * day-keyed group never collides with a month-keyed group that happens to
 * carry the same number.
 *
 * @param groups - Groups keyed by day and/or month plus video id.
 * @returns A new array holding the first occurrence of every distinct group.
 */
function dedupeGroups<T extends { rollupDay?: number; rollupMonth?: number; videoId: number }>(
  groups: Array<T>,
): Array<T> {
  // Map keeps insertion order, so the values come back in first-seen order.
  const firstByKey = new Map<string, T>();
  for (const group of groups) {
    const key = `d:${group.rollupDay ?? ''}|m:${group.rollupMonth ?? ''}|v:${group.videoId}`;
    if (!firstByKey.has(key)) {
      firstByKey.set(key, group);
    }
  }
  return [...firstByKey.values()];
}
|
||||||
|
|
||||||
|
/**
 * Refreshes daily and monthly rollups for every group touched by telemetry
 * newer than the stored watermark, then advances the watermark.
 *
 * - With no telemetry at all, nothing is rolled up; a forced rebuild
 *   additionally resets the watermark to zero.
 * - Without `forceRebuild`, the call returns early when no group has samples
 *   newer than the watermark.
 *
 * @param db - Open database handle.
 * @param forceRebuild - When true, ignore the stored watermark and start from zero.
 */
export function runRollupMaintenance(db: DatabaseSync, forceRebuild = false): void {
  const startedAtMs = Date.now();

  let watermarkMs = ZERO_ID;
  if (!forceRebuild) {
    watermarkMs = getLastRollupSampleMs(db);
  }

  const newest = db
    .prepare('SELECT MAX(sample_ms) AS maxSampleMs FROM imm_session_telemetry')
    .get() as unknown as RollupTelemetryResult | null;
  const newestSampleMs = newest?.maxSampleMs;
  if (!newestSampleMs) {
    // No usable newest sample: only a forced rebuild touches the watermark.
    if (forceRebuild) {
      setLastRollupSampleMs(db, ZERO_ID);
    }
    return;
  }

  const affected = getAffectedRollupGroups(db, watermarkMs);
  if (affected.length === 0 && !forceRebuild) {
    return;
  }

  // Several days can share a month (and vice versa), so dedupe each projection separately.
  const dailyGroups = dedupeGroups(
    affected.map(({ rollupDay, videoId }) => ({ rollupDay, videoId })),
  );
  const monthlyGroups = dedupeGroups(
    affected.map(({ rollupMonth, videoId }) => ({ rollupMonth, videoId })),
  );

  upsertDailyRollupsForGroups(db, dailyGroups, startedAtMs);
  upsertMonthlyRollupsForGroups(db, monthlyGroups, startedAtMs);
  setLastRollupSampleMs(db, Number(newestSampleMs));
}
|
||||||
|
|||||||
@@ -56,6 +56,17 @@ testIfSqlite('ensureSchema creates immersion core tables', () => {
|
|||||||
assert.ok(tableNames.has('imm_monthly_rollups'));
|
assert.ok(tableNames.has('imm_monthly_rollups'));
|
||||||
assert.ok(tableNames.has('imm_words'));
|
assert.ok(tableNames.has('imm_words'));
|
||||||
assert.ok(tableNames.has('imm_kanji'));
|
assert.ok(tableNames.has('imm_kanji'));
|
||||||
|
assert.ok(tableNames.has('imm_rollup_state'));
|
||||||
|
|
||||||
|
const rollupStateRow = db
|
||||||
|
.prepare(
|
||||||
|
'SELECT state_value FROM imm_rollup_state WHERE state_key = ?',
|
||||||
|
)
|
||||||
|
.get('last_rollup_sample_ms') as {
|
||||||
|
state_value: number;
|
||||||
|
} | null;
|
||||||
|
assert.ok(rollupStateRow);
|
||||||
|
assert.equal(rollupStateRow?.state_value, 0);
|
||||||
} finally {
|
} finally {
|
||||||
db.close();
|
db.close();
|
||||||
cleanupDbPath(dbPath);
|
cleanupDbPath(dbPath);
|
||||||
|
|||||||
@@ -42,6 +42,17 @@ export function ensureSchema(db: DatabaseSync): void {
|
|||||||
applied_at_ms INTEGER NOT NULL
|
applied_at_ms INTEGER NOT NULL
|
||||||
);
|
);
|
||||||
`);
|
`);
|
||||||
|
db.exec(`
|
||||||
|
CREATE TABLE IF NOT EXISTS imm_rollup_state(
|
||||||
|
state_key TEXT PRIMARY KEY,
|
||||||
|
state_value INTEGER NOT NULL
|
||||||
|
);
|
||||||
|
`);
|
||||||
|
db.exec(`
|
||||||
|
INSERT INTO imm_rollup_state(state_key, state_value)
|
||||||
|
VALUES ('last_rollup_sample_ms', 0)
|
||||||
|
ON CONFLICT(state_key) DO NOTHING
|
||||||
|
`);
|
||||||
|
|
||||||
const currentVersion = db
|
const currentVersion = db
|
||||||
.prepare('SELECT schema_version FROM imm_schema_version ORDER BY schema_version DESC LIMIT 1')
|
.prepare('SELECT schema_version FROM imm_schema_version ORDER BY schema_version DESC LIMIT 1')
|
||||||
|
|||||||
Reference in New Issue
Block a user