refactor(core): normalize service naming across app runtime

This commit is contained in:
2026-02-17 19:00:27 -08:00
parent e38a1c945e
commit 1233e3630f
87 changed files with 2813 additions and 1636 deletions

View File

@@ -11,12 +11,12 @@ const DEFAULT_BATCH_SIZE = 25;
const DEFAULT_FLUSH_INTERVAL_MS = 500;
const DEFAULT_MAINTENANCE_INTERVAL_MS = 24 * 60 * 60 * 1000;
const ONE_WEEK_MS = 7 * 24 * 60 * 60 * 1000;
const EVENTS_RETENTION_MS = ONE_WEEK_MS;
const VACUUM_INTERVAL_MS = ONE_WEEK_MS;
const TELEMETRY_RETENTION_MS = 30 * 24 * 60 * 60 * 1000;
const DAILY_ROLLUP_RETENTION_MS = 365 * 24 * 60 * 60 * 1000;
const MONTHLY_ROLLUP_RETENTION_MS = 5 * 365 * 24 * 60 * 60 * 1000;
const MAX_PAYLOAD_BYTES = 256;
const DEFAULT_EVENTS_RETENTION_MS = ONE_WEEK_MS;
const DEFAULT_VACUUM_INTERVAL_MS = ONE_WEEK_MS;
const DEFAULT_TELEMETRY_RETENTION_MS = 30 * 24 * 60 * 60 * 1000;
const DEFAULT_DAILY_ROLLUP_RETENTION_MS = 365 * 24 * 60 * 60 * 1000;
const DEFAULT_MONTHLY_ROLLUP_RETENTION_MS = 5 * 365 * 24 * 60 * 60 * 1000;
const DEFAULT_MAX_PAYLOAD_BYTES = 256;
const SOURCE_TYPE_LOCAL = 1;
const SOURCE_TYPE_REMOTE = 2;
@@ -35,6 +35,22 @@ const EVENT_PAUSE_END = 8;
export interface ImmersionTrackerOptions {
dbPath: string;
policy?: ImmersionTrackerPolicy;
}
export interface ImmersionTrackerPolicy {
queueCap?: number;
batchSize?: number;
flushIntervalMs?: number;
maintenanceIntervalMs?: number;
payloadCapBytes?: number;
retention?: {
eventsDays?: number;
telemetryDays?: number;
dailyRollupsDays?: number;
monthlyRollupsDays?: number;
vacuumIntervalDays?: number;
};
}
interface TelemetryAccumulator {
@@ -154,6 +170,12 @@ export class ImmersionTrackerService {
private readonly batchSize: number;
private readonly flushIntervalMs: number;
private readonly maintenanceIntervalMs: number;
private readonly maxPayloadBytes: number;
private readonly eventsRetentionMs: number;
private readonly telemetryRetentionMs: number;
private readonly dailyRollupRetentionMs: number;
private readonly monthlyRollupRetentionMs: number;
private readonly vacuumIntervalMs: number;
private readonly dbPath: string;
private readonly writeLock = { locked: false };
private flushTimer: ReturnType<typeof setTimeout> | null = null;
@@ -177,10 +199,69 @@ export class ImmersionTrackerService {
fs.mkdirSync(parentDir, { recursive: true });
}
this.queueCap = DEFAULT_QUEUE_CAP;
this.batchSize = DEFAULT_BATCH_SIZE;
this.flushIntervalMs = DEFAULT_FLUSH_INTERVAL_MS;
this.maintenanceIntervalMs = DEFAULT_MAINTENANCE_INTERVAL_MS;
const policy = options.policy ?? {};
this.queueCap = this.resolveBoundedInt(
policy.queueCap,
DEFAULT_QUEUE_CAP,
100,
100_000,
);
this.batchSize = this.resolveBoundedInt(
policy.batchSize,
DEFAULT_BATCH_SIZE,
1,
10_000,
);
this.flushIntervalMs = this.resolveBoundedInt(
policy.flushIntervalMs,
DEFAULT_FLUSH_INTERVAL_MS,
50,
60_000,
);
this.maintenanceIntervalMs = this.resolveBoundedInt(
policy.maintenanceIntervalMs,
DEFAULT_MAINTENANCE_INTERVAL_MS,
60_000,
7 * 24 * 60 * 60 * 1000,
);
this.maxPayloadBytes = this.resolveBoundedInt(
policy.payloadCapBytes,
DEFAULT_MAX_PAYLOAD_BYTES,
64,
8192,
);
const retention = policy.retention ?? {};
this.eventsRetentionMs = this.resolveBoundedInt(
retention.eventsDays,
Math.floor(DEFAULT_EVENTS_RETENTION_MS / 86_400_000),
1,
3650,
) * 86_400_000;
this.telemetryRetentionMs = this.resolveBoundedInt(
retention.telemetryDays,
Math.floor(DEFAULT_TELEMETRY_RETENTION_MS / 86_400_000),
1,
3650,
) * 86_400_000;
this.dailyRollupRetentionMs = this.resolveBoundedInt(
retention.dailyRollupsDays,
Math.floor(DEFAULT_DAILY_ROLLUP_RETENTION_MS / 86_400_000),
1,
36500,
) * 86_400_000;
this.monthlyRollupRetentionMs = this.resolveBoundedInt(
retention.monthlyRollupsDays,
Math.floor(DEFAULT_MONTHLY_ROLLUP_RETENTION_MS / 86_400_000),
1,
36500,
) * 86_400_000;
this.vacuumIntervalMs = this.resolveBoundedInt(
retention.vacuumIntervalDays,
Math.floor(DEFAULT_VACUUM_INTERVAL_MS / 86_400_000),
1,
3650,
) * 86_400_000;
this.lastMaintenanceMs = Date.now();
this.db = new DatabaseSync(this.dbPath);
@@ -223,9 +304,7 @@ export class ImmersionTrackerService {
this.db.close();
}
async getSessionSummaries(
limit = 50,
): Promise<SessionSummaryQueryRow[]> {
async getSessionSummaries(limit = 50): Promise<SessionSummaryQueryRow[]> {
const prepared = this.db.prepare(`
SELECT
s.video_id AS videoId,
@@ -273,7 +352,9 @@ export class ImmersionTrackerService {
totalSessions: number;
activeSessions: number;
}> {
const sessions = this.db.prepare("SELECT COUNT(*) AS total FROM imm_sessions");
const sessions = this.db.prepare(
"SELECT COUNT(*) AS total FROM imm_sessions",
);
const active = this.db.prepare(
"SELECT COUNT(*) AS total FROM imm_sessions WHERE ended_at_ms IS NULL",
);
@@ -282,9 +363,7 @@ export class ImmersionTrackerService {
return { totalSessions, activeSessions };
}
async getDailyRollups(
limit = 60,
): Promise<ImmersionSessionRollupRow[]> {
async getDailyRollups(limit = 60): Promise<ImmersionSessionRollupRow[]> {
const prepared = this.db.prepare(`
SELECT
rollup_day AS rollupDayOrMonth,
@@ -305,9 +384,7 @@ export class ImmersionTrackerService {
return prepared.all(limit) as unknown as ImmersionSessionRollupRow[];
}
async getMonthlyRollups(
limit = 24,
): Promise<ImmersionSessionRollupRow[]> {
async getMonthlyRollups(limit = 24): Promise<ImmersionSessionRollupRow[]> {
const prepared = this.db.prepare(`
SELECT
rollup_month AS rollupDayOrMonth,
@@ -352,9 +429,12 @@ export class ImmersionTrackerService {
return;
}
const sourceType = this.isRemoteSource(normalizedPath) ? SOURCE_TYPE_REMOTE : SOURCE_TYPE_LOCAL;
const sourceType = this.isRemoteSource(normalizedPath)
? SOURCE_TYPE_REMOTE
: SOURCE_TYPE_LOCAL;
const videoKey = this.buildVideoKey(normalizedPath, sourceType);
const canonicalTitle = normalizedTitle || this.deriveCanonicalTitle(normalizedPath);
const canonicalTitle =
normalizedTitle || this.deriveCanonicalTitle(normalizedPath);
const sourcePath = sourceType === SOURCE_TYPE_LOCAL ? normalizedPath : null;
const sourceUrl = sourceType === SOURCE_TYPE_REMOTE ? normalizedPath : null;
@@ -372,7 +452,11 @@ export class ImmersionTrackerService {
`Starting immersion session for path=${normalizedPath} videoId=${sessionInfo.videoId}`,
);
this.startSession(sessionInfo.videoId, sessionInfo.startedAtMs);
this.captureVideoMetadataAsync(sessionInfo.videoId, sourceType, normalizedPath);
this.captureVideoMetadataAsync(
sessionInfo.videoId,
sourceType,
normalizedPath,
);
}
handleMediaTitleUpdate(mediaTitle: string | null): void {
@@ -383,11 +467,7 @@ export class ImmersionTrackerService {
this.updateVideoTitleForActiveSession(normalizedTitle);
}
recordSubtitleLine(
text: string,
startSec: number,
endSec: number,
): void {
recordSubtitleLine(text: string, startSec: number, endSec: number): void {
if (!this.sessionState || !text.trim()) return;
const cleaned = this.normalizeText(text);
if (!cleaned) return;
@@ -418,7 +498,11 @@ export class ImmersionTrackerService {
}
recordPlaybackPosition(mediaTimeSec: number | null): void {
if (!this.sessionState || mediaTimeSec === null || !Number.isFinite(mediaTimeSec)) {
if (
!this.sessionState ||
mediaTimeSec === null ||
!Number.isFinite(mediaTimeSec)
) {
return;
}
const nowMs = Date.now();
@@ -637,7 +721,10 @@ export class ImmersionTrackerService {
return;
}
const batch = this.queue.splice(0, Math.min(this.batchSize, this.queue.length));
const batch = this.queue.splice(
0,
Math.min(this.batchSize, this.queue.length),
);
this.writeLock.locked = true;
try {
this.db.exec("BEGIN IMMEDIATE");
@@ -648,7 +735,10 @@ export class ImmersionTrackerService {
} catch (error) {
this.db.exec("ROLLBACK");
this.queue.unshift(...batch);
this.logger.warn("Immersion tracker flush failed, retrying later", error as Error);
this.logger.warn(
"Immersion tracker flush failed, retrying later",
error as Error,
);
} finally {
this.writeLock.locked = false;
this.flushScheduled = false;
@@ -850,6 +940,18 @@ export class ImmersionTrackerService {
`);
}
private resolveBoundedInt(
value: number | undefined,
fallback: number,
min: number,
max: number,
): number {
if (!Number.isFinite(value)) return fallback;
const candidate = Math.floor(value as number);
if (candidate < min || candidate > max) return fallback;
return candidate;
}
private scheduleMaintenance(): void {
this.maintenanceTimer = setInterval(() => {
this.runMaintenance();
@@ -863,26 +965,33 @@ export class ImmersionTrackerService {
this.flushTelemetry(true);
this.flushNow();
const nowMs = Date.now();
const eventCutoff = nowMs - EVENTS_RETENTION_MS;
const telemetryCutoff = nowMs - TELEMETRY_RETENTION_MS;
const dailyCutoff = nowMs - DAILY_ROLLUP_RETENTION_MS;
const monthlyCutoff = nowMs - MONTHLY_ROLLUP_RETENTION_MS;
const eventCutoff = nowMs - this.eventsRetentionMs;
const telemetryCutoff = nowMs - this.telemetryRetentionMs;
const dailyCutoff = nowMs - this.dailyRollupRetentionMs;
const monthlyCutoff = nowMs - this.monthlyRollupRetentionMs;
const dayCutoff = Math.floor(dailyCutoff / 86_400_000);
const monthCutoff = this.toMonthKey(monthlyCutoff);
this.db.prepare(`DELETE FROM imm_session_events WHERE ts_ms < ?`).run(eventCutoff);
this.db.prepare(`DELETE FROM imm_session_telemetry WHERE sample_ms < ?`).run(telemetryCutoff);
this.db.prepare(`DELETE FROM imm_daily_rollups WHERE rollup_day < ?`).run(dayCutoff);
this.db.prepare(`DELETE FROM imm_monthly_rollups WHERE rollup_month < ?`).run(monthCutoff);
this.db
.prepare(`DELETE FROM imm_sessions WHERE ended_at_ms IS NOT NULL AND ended_at_ms < ?`)
.prepare(`DELETE FROM imm_session_events WHERE ts_ms < ?`)
.run(eventCutoff);
this.db
.prepare(`DELETE FROM imm_session_telemetry WHERE sample_ms < ?`)
.run(telemetryCutoff);
this.db
.prepare(`DELETE FROM imm_daily_rollups WHERE rollup_day < ?`)
.run(dayCutoff);
this.db
.prepare(`DELETE FROM imm_monthly_rollups WHERE rollup_month < ?`)
.run(monthCutoff);
this.db
.prepare(
`DELETE FROM imm_sessions WHERE ended_at_ms IS NOT NULL AND ended_at_ms < ?`,
)
.run(telemetryCutoff);
this.runRollupMaintenance();
if (
nowMs - this.lastVacuumMs >= VACUUM_INTERVAL_MS
&& !this.writeLock.locked
) {
if (nowMs - this.lastVacuumMs >= this.vacuumIntervalMs && !this.writeLock.locked) {
this.db.exec("VACUUM");
this.lastVacuumMs = nowMs;
}
@@ -1007,16 +1116,21 @@ export class ImmersionTrackerService {
this.scheduleFlush(0);
}
private startSessionStatement(videoId: number, startedAtMs: number): {
private startSessionStatement(
videoId: number,
startedAtMs: number,
): {
lastInsertRowid: number | bigint;
} {
const sessionUuid = crypto.randomUUID();
return this.db
.prepare(`
.prepare(
`
INSERT INTO imm_sessions (
session_uuid, video_id, started_at_ms, status, created_at_ms, updated_at_ms
) VALUES (?, ?, ?, ?, ?, ?)
`)
`,
)
.run(
sessionUuid,
videoId,
@@ -1055,16 +1169,24 @@ export class ImmersionTrackerService {
.prepare(
"UPDATE imm_sessions SET ended_at_ms = ?, status = ?, updated_at_ms = ? WHERE session_id = ?",
)
.run(endedAt, SESSION_STATUS_ENDED, Date.now(), this.sessionState.sessionId);
.run(
endedAt,
SESSION_STATUS_ENDED,
Date.now(),
this.sessionState.sessionId,
);
this.sessionState = null;
}
private getOrCreateVideo(videoKey: string, details: {
canonicalTitle: string;
sourcePath: string | null;
sourceUrl: string | null;
sourceType: number;
}): number {
private getOrCreateVideo(
videoKey: string,
details: {
canonicalTitle: string;
sourcePath: string | null;
sourceUrl: string | null;
sourceType: number;
},
): number {
const existing = this.db
.prepare("SELECT video_id FROM imm_videos WHERE video_key = ?")
.get(videoKey) as { video_id: number } | null;
@@ -1073,7 +1195,11 @@ export class ImmersionTrackerService {
.prepare(
"UPDATE imm_videos SET canonical_title = ?, updated_at_ms = ? WHERE video_id = ?",
)
.run(details.canonicalTitle || "unknown", Date.now(), existing.video_id);
.run(
details.canonicalTitle || "unknown",
Date.now(),
existing.video_id,
);
return existing.video_id;
}
@@ -1112,7 +1238,8 @@ export class ImmersionTrackerService {
private updateVideoMetadata(videoId: number, metadata: VideoMetadata): void {
this.db
.prepare(`
.prepare(
`
UPDATE imm_videos
SET
duration_ms = ?,
@@ -1129,7 +1256,8 @@ export class ImmersionTrackerService {
metadata_json = ?,
updated_at_ms = ?
WHERE video_id = ?
`)
`,
)
.run(
metadata.durationMs,
metadata.fileSizeBytes,
@@ -1167,7 +1295,9 @@ export class ImmersionTrackerService {
})();
}
private async getLocalVideoMetadata(mediaPath: string): Promise<VideoMetadata> {
private async getLocalVideoMetadata(
mediaPath: string,
): Promise<VideoMetadata> {
const hash = await this.computeSha256(mediaPath);
const info = await this.runFfprobe(mediaPath);
const stat = await fs.promises.stat(mediaPath);
@@ -1342,14 +1472,17 @@ export class ImmersionTrackerService {
private sanitizePayload(payload: Record<string, unknown>): string {
const json = JSON.stringify(payload);
return json.length <= MAX_PAYLOAD_BYTES
return json.length <= this.maxPayloadBytes
? json
: JSON.stringify({ truncated: true });
}
private calculateTextMetrics(value: string): { words: number; tokens: number } {
private calculateTextMetrics(value: string): {
words: number;
tokens: number;
} {
const words = value.split(/\s+/).filter(Boolean).length;
const cjkCount = (value.match(/[\u3040-\u30ff\u4e00-\u9fff]/g)?.length ?? 0);
const cjkCount = value.match(/[\u3040-\u30ff\u4e00-\u9fff]/g)?.length ?? 0;
const tokens = Math.max(words, cjkCount);
return { words, tokens };
}
@@ -1401,7 +1534,8 @@ export class ImmersionTrackerService {
}
private toNullableInt(value: number | null | undefined): number | null {
if (value === null || value === undefined || !Number.isFinite(value)) return null;
if (value === null || value === undefined || !Number.isFinite(value))
return null;
return value;
}