Files
SubMiner/src/core/services/immersion-tracker-service.ts
sudacode f005f542a3 feat(immersion): add anime metadata, occurrence tracking, and schema upgrades
- Add imm_anime table with AniList integration
- Add imm_subtitle_lines, imm_word_line_occurrences, imm_kanji_line_occurrences
- Add POS fields (part_of_speech, pos1, pos2, pos3) to imm_words
- Add anime metadata parsing with guessit fallback
- Add video duration tracking and watched status
- Add episode, streak, trend, and word/kanji detail queries
- Deduplicate subtitle line recording within sessions
- Pass Anki note IDs through card mining callback chain
2026-03-14 23:11:27 -07:00

1090 lines
33 KiB
TypeScript

import path from 'node:path';
import * as fs from 'node:fs';
import { createLogger } from '../../logger';
import type { CoverArtFetcher } from './anilist/cover-art-fetcher';
import { getLocalVideoMetadata, guessAnimeVideoMetadata } from './immersion-tracker/metadata';
import { pruneRetention, runRollupMaintenance } from './immersion-tracker/maintenance';
import { Database, type DatabaseSync } from './immersion-tracker/sqlite';
import { finalizeSessionRecord, startSessionRecord } from './immersion-tracker/session';
import {
applyPragmas,
createTrackerPreparedStatements,
ensureSchema,
executeQueuedWrite,
getOrCreateAnimeRecord,
getOrCreateVideoRecord,
linkVideoToAnimeRecord,
type TrackerPreparedStatements,
updateVideoMetadataRecord,
updateVideoTitleRecord,
} from './immersion-tracker/storage';
import {
cleanupVocabularyStats,
getAnimeCoverArt,
getAnimeDailyRollups,
getAnimeAnilistEntries,
getAnimeDetail,
getAnimeEpisodes,
getAnimeLibrary,
getAnimeWords,
getEpisodeCardEvents,
getEpisodeSessions,
getEpisodeWords,
getCoverArt,
getDailyRollups,
getEpisodesPerDay,
getKanjiAnimeAppearances,
getKanjiDetail,
getKanjiWords,
getNewAnimePerDay,
getSimilarWords,
getStreakCalendar,
getKanjiOccurrences,
getKanjiStats,
getMediaDailyRollups,
getMediaDetail,
getMediaLibrary,
getMediaSessions,
getMonthlyRollups,
getQueryHints,
getSessionEvents,
getSessionSummaries,
getSessionTimeline,
getVocabularyStats,
getWatchTimePerAnime,
getWordAnimeAppearances,
getWordDetail,
getWordOccurrences,
getVideoDurationMs,
markVideoWatched,
} from './immersion-tracker/query';
import {
buildVideoKey,
calculateTextMetrics,
deriveCanonicalTitle,
isKanji,
isRemoteSource,
normalizeMediaPath,
normalizeText,
resolveBoundedInt,
sanitizePayload,
secToMs,
} from './immersion-tracker/reducer';
import { enqueueWrite } from './immersion-tracker/queue';
import {
DEFAULT_BATCH_SIZE,
DEFAULT_DAILY_ROLLUP_RETENTION_MS,
DEFAULT_EVENTS_RETENTION_MS,
DEFAULT_FLUSH_INTERVAL_MS,
DEFAULT_MAINTENANCE_INTERVAL_MS,
DEFAULT_MAX_PAYLOAD_BYTES,
DEFAULT_MONTHLY_ROLLUP_RETENTION_MS,
DEFAULT_QUEUE_CAP,
DEFAULT_TELEMETRY_RETENTION_MS,
DEFAULT_VACUUM_INTERVAL_MS,
EVENT_CARD_MINED,
EVENT_LOOKUP,
EVENT_MEDIA_BUFFER,
EVENT_PAUSE_END,
EVENT_PAUSE_START,
EVENT_SEEK_BACKWARD,
EVENT_SEEK_FORWARD,
EVENT_SUBTITLE_LINE,
SOURCE_TYPE_LOCAL,
SOURCE_TYPE_REMOTE,
type ImmersionSessionRollupRow,
type EpisodeCardEventRow,
type EpisodesPerDayRow,
type ImmersionTrackerOptions,
type KanjiAnimeAppearanceRow,
type KanjiDetailRow,
type KanjiOccurrenceRow,
type KanjiStatsRow,
type KanjiWordRow,
type LegacyVocabularyPosResolution,
type LegacyVocabularyPosRow,
type AnimeAnilistEntryRow,
type AnimeDetailRow,
type AnimeEpisodeRow,
type AnimeLibraryRow,
type AnimeWordRow,
type MediaArtRow,
type MediaDetailRow,
type MediaLibraryRow,
type NewAnimePerDayRow,
type QueuedWrite,
type SessionEventRow,
type SessionState,
type SessionSummaryQueryRow,
type SessionTimelineRow,
type SimilarWordRow,
type StreakCalendarRow,
type VocabularyCleanupSummary,
type WatchTimePerAnimeRow,
type WordAnimeAppearanceRow,
type WordDetailRow,
type WordOccurrenceRow,
type VocabularyStatsRow,
} from './immersion-tracker/types';
import type { MergedToken } from '../../types';
import { shouldExcludeTokenFromVocabularyPersistence } from './tokenizer/annotation-stage';
import { deriveStoredPartOfSpeech } from './tokenizer/part-of-speech';
export type {
AnimeAnilistEntryRow,
AnimeDetailRow,
AnimeEpisodeRow,
AnimeLibraryRow,
AnimeWordRow,
EpisodeCardEventRow,
EpisodesPerDayRow,
ImmersionSessionRollupRow,
ImmersionTrackerOptions,
ImmersionTrackerPolicy,
KanjiAnimeAppearanceRow,
KanjiDetailRow,
KanjiOccurrenceRow,
KanjiStatsRow,
KanjiWordRow,
MediaArtRow,
MediaDetailRow,
MediaLibraryRow,
NewAnimePerDayRow,
SessionEventRow,
SessionSummaryQueryRow,
SessionTimelineRow,
SimilarWordRow,
StreakCalendarRow,
WatchTimePerAnimeRow,
WordAnimeAppearanceRow,
WordDetailRow,
WordOccurrenceRow,
VocabularyStatsRow,
} from './immersion-tracker/types';
/**
 * Records immersion activity (subtitle lines, playback position, pauses,
 * lookups, mined cards) for the media currently being played into a SQLite
 * database, and exposes the query helpers used to read that data back.
 *
 * Writes are buffered in an in-memory queue and flushed in batched
 * transactions. A maintenance interval prunes old rows, refreshes rollups and
 * periodically vacuums the database.
 */
export class ImmersionTrackerService {
private readonly logger = createLogger('main:immersion-tracker');
private readonly db: DatabaseSync;
// Pending writes; drained in batches by flushNow().
private readonly queue: QueuedWrite[] = [];
// Capacity handed to enqueueWrite(); overflow is reported as dropped writes.
private readonly queueCap: number;
// Maximum writes per flush transaction; reaching it also triggers an immediate flush.
private readonly batchSize: number;
// Default delay used by scheduleFlush().
private readonly flushIntervalMs: number;
// Period of the maintenance interval timer.
private readonly maintenanceIntervalMs: number;
// Byte limit handed to sanitizePayload() for event payloads.
private readonly maxPayloadBytes: number;
// Retention windows (milliseconds) handed to pruneRetention().
private readonly eventsRetentionMs: number;
private readonly telemetryRetentionMs: number;
private readonly dailyRollupRetentionMs: number;
private readonly monthlyRollupRetentionMs: number;
// Minimum time between VACUUM runs.
private readonly vacuumIntervalMs: number;
private readonly dbPath: string;
// `locked` is true only while flushNow() is inside its write transaction.
private readonly writeLock = { locked: false };
private flushTimer: ReturnType<typeof setTimeout> | null = null;
private maintenanceTimer: ReturnType<typeof setInterval> | null = null;
// True while a flush timer is pending.
private flushScheduled = false;
// Running total of writes reported as dropped by enqueueWrite().
private droppedWriteCount = 0;
// Timestamp of the last VACUUM run by this instance; 0 until the first one.
private lastVacuumMs = 0;
private isDestroyed = false;
// State of the active session, or null when no media is being tracked.
private sessionState: SessionState | null = null;
// Despite the name, this holds the normalized media title last reported for the current media.
private currentVideoKey = '';
private currentMediaPathOrUrl = '';
// Statements handed to executeQueuedWrite() for every flushed write.
private readonly preparedStatements: TrackerPreparedStatements;
private coverArtFetcher: CoverArtFetcher | null = null;
// In-flight ensureCoverArt() fetches keyed by video id.
private readonly pendingCoverFetches = new Map<number, Promise<boolean>>();
// `${startMs}:${text}` keys of subtitle lines already recorded; cleared when a session starts.
private readonly recordedSubtitleKeys = new Set<string>();
// In-flight anime metadata captures keyed by video id.
private readonly pendingAnimeMetadataUpdates = new Map<number, Promise<void>>();
// Optional resolver forwarded to cleanupVocabularyStats().
private readonly resolveLegacyVocabularyPos:
| ((row: LegacyVocabularyPosRow) => Promise<LegacyVocabularyPosResolution | null>)
| undefined;
/**
 * Opens the tracker database at `options.dbPath` (creating the parent
 * directory when it does not exist), resolves every policy value into its
 * bounded range, prepares the write statements and starts the maintenance
 * interval and the first flush timer.
 *
 * @param options Database path, optional policy overrides and optional
 *   legacy part-of-speech resolver.
 */
constructor(options: ImmersionTrackerOptions) {
  const MS_PER_DAY = 86_400_000;
  // Clamp a retention setting given in days (minimum 1) and convert it to milliseconds.
  const retentionDaysToMs = (
    days: Parameters<typeof resolveBoundedInt>[0],
    defaultMs: number,
    maxDays: number,
  ): number =>
    resolveBoundedInt(days, Math.floor(defaultMs / MS_PER_DAY), 1, maxDays) * MS_PER_DAY;

  this.dbPath = options.dbPath;
  this.resolveLegacyVocabularyPos = options.resolveLegacyVocabularyPos;

  const dbDirectory = path.dirname(this.dbPath);
  if (!fs.existsSync(dbDirectory)) {
    fs.mkdirSync(dbDirectory, { recursive: true });
  }

  const policy = options.policy ?? {};
  const retention = policy.retention ?? {};

  this.queueCap = resolveBoundedInt(policy.queueCap, DEFAULT_QUEUE_CAP, 100, 100_000);
  this.batchSize = resolveBoundedInt(policy.batchSize, DEFAULT_BATCH_SIZE, 1, 10_000);
  this.flushIntervalMs = resolveBoundedInt(
    policy.flushIntervalMs,
    DEFAULT_FLUSH_INTERVAL_MS,
    50,
    60_000,
  );
  this.maintenanceIntervalMs = resolveBoundedInt(
    policy.maintenanceIntervalMs,
    DEFAULT_MAINTENANCE_INTERVAL_MS,
    60_000,
    7 * 24 * 60 * 60 * 1000,
  );
  this.maxPayloadBytes = resolveBoundedInt(
    policy.payloadCapBytes,
    DEFAULT_MAX_PAYLOAD_BYTES,
    64,
    8192,
  );

  this.eventsRetentionMs = retentionDaysToMs(
    retention.eventsDays,
    DEFAULT_EVENTS_RETENTION_MS,
    3650,
  );
  this.telemetryRetentionMs = retentionDaysToMs(
    retention.telemetryDays,
    DEFAULT_TELEMETRY_RETENTION_MS,
    3650,
  );
  this.dailyRollupRetentionMs = retentionDaysToMs(
    retention.dailyRollupsDays,
    DEFAULT_DAILY_ROLLUP_RETENTION_MS,
    36500,
  );
  this.monthlyRollupRetentionMs = retentionDaysToMs(
    retention.monthlyRollupsDays,
    DEFAULT_MONTHLY_ROLLUP_RETENTION_MS,
    36500,
  );
  this.vacuumIntervalMs = retentionDaysToMs(
    retention.vacuumIntervalDays,
    DEFAULT_VACUUM_INTERVAL_MS,
    3650,
  );

  const database = new Database(this.dbPath);
  this.db = database;
  applyPragmas(database);
  ensureSchema(database);
  this.preparedStatements = createTrackerPreparedStatements(database);

  this.scheduleMaintenance();
  this.scheduleFlush();
}
/**
 * Shuts the tracker down: finalizes the active session, flushes whatever is
 * still queued, cancels both timers and closes the database. Calling it again
 * after the first call is a no-op.
 *
 * The queue is drained batch by batch because a single `flushNow()` writes at
 * most `batchSize` entries and re-arms the flush timer when entries remain;
 * anything left in the queue once the database is closed would be lost.
 * Timers are cleared after draining so that no timer re-armed by `flushNow()`
 * outlives the service.
 *
 * If finalizing or flushing throws, the error still propagates, but the
 * timers are cancelled and the database is closed first.
 */
destroy(): void {
  if (this.isDestroyed) return;
  try {
    this.finalizeActiveSession();
    // Stop as soon as a flush makes no progress (a failed batch is re-queued),
    // otherwise a persistent write error would loop forever.
    let previousLength = Number.POSITIVE_INFINITY;
    while (this.queue.length > 0 && this.queue.length < previousLength) {
      previousLength = this.queue.length;
      this.flushNow();
    }
  } finally {
    if (this.flushTimer) {
      clearTimeout(this.flushTimer);
      this.flushTimer = null;
    }
    this.flushScheduled = false;
    if (this.maintenanceTimer) {
      clearInterval(this.maintenanceTimer);
      this.maintenanceTimer = null;
    }
    this.isDestroyed = true;
    this.db.close();
  }
}
/**
 * Runs the `getSessionSummaries` query against the tracker database.
 *
 * @param limit Forwarded to the query; defaults to 50.
 */
async getSessionSummaries(limit = 50): Promise<SessionSummaryQueryRow[]> {
  const summaries = getSessionSummaries(this.db, limit);
  return summaries;
}
/**
 * Runs the `getSessionTimeline` query for one session.
 *
 * @param sessionId Session to read.
 * @param limit Forwarded to the query; defaults to 200.
 */
async getSessionTimeline(sessionId: number, limit = 200): Promise<SessionTimelineRow[]> {
  const timeline = getSessionTimeline(this.db, sessionId, limit);
  return timeline;
}
/**
 * Runs the `getQueryHints` query and returns its aggregate counters.
 */
async getQueryHints(): Promise<{
  totalSessions: number;
  activeSessions: number;
  episodesToday: number;
  activeAnimeCount: number;
}> {
  const hints = getQueryHints(this.db);
  return hints;
}
/**
 * Runs the `getDailyRollups` query.
 *
 * @param limit Forwarded to the query; defaults to 60.
 */
async getDailyRollups(limit = 60): Promise<ImmersionSessionRollupRow[]> {
  const rollups = getDailyRollups(this.db, limit);
  return rollups;
}
/**
 * Runs the `getMonthlyRollups` query.
 *
 * @param limit Forwarded to the query; defaults to 24.
 */
async getMonthlyRollups(limit = 24): Promise<ImmersionSessionRollupRow[]> {
  const rollups = getMonthlyRollups(this.db, limit);
  return rollups;
}
/**
 * Runs the `getVocabularyStats` query.
 *
 * @param limit Forwarded to the query; defaults to 100.
 * @param excludePos Optional list forwarded to the query unchanged.
 */
async getVocabularyStats(limit = 100, excludePos?: string[]): Promise<VocabularyStatsRow[]> {
  const stats = getVocabularyStats(this.db, limit, excludePos);
  return stats;
}
/**
 * Runs `cleanupVocabularyStats` on the tracker database, handing it the
 * legacy part-of-speech resolver supplied at construction time (may be
 * undefined).
 */
async cleanupVocabularyStats(): Promise<VocabularyCleanupSummary> {
  const cleanupOptions = { resolveLegacyPos: this.resolveLegacyVocabularyPos };
  return cleanupVocabularyStats(this.db, cleanupOptions);
}
/**
 * Runs the `getKanjiStats` query.
 *
 * @param limit Forwarded to the query; defaults to 100.
 */
async getKanjiStats(limit = 100): Promise<KanjiStatsRow[]> {
  const stats = getKanjiStats(this.db, limit);
  return stats;
}
/**
 * Runs the `getWordOccurrences` query for the word identified by
 * `headword`, `word` and `reading`.
 *
 * @param limit Forwarded to the query; defaults to 100.
 * @param offset Forwarded to the query; defaults to 0.
 */
async getWordOccurrences(
  headword: string,
  word: string,
  reading: string,
  limit = 100,
  offset = 0,
): Promise<WordOccurrenceRow[]> {
  const occurrences = getWordOccurrences(this.db, headword, word, reading, limit, offset);
  return occurrences;
}
/**
 * Runs the `getKanjiOccurrences` query for one kanji.
 *
 * @param limit Forwarded to the query; defaults to 100.
 * @param offset Forwarded to the query; defaults to 0.
 */
async getKanjiOccurrences(
  kanji: string,
  limit = 100,
  offset = 0,
): Promise<KanjiOccurrenceRow[]> {
  const occurrences = getKanjiOccurrences(this.db, kanji, limit, offset);
  return occurrences;
}
/**
 * Runs the `getSessionEvents` query for one session.
 *
 * @param limit Forwarded to the query; defaults to 500.
 */
async getSessionEvents(sessionId: number, limit = 500): Promise<SessionEventRow[]> {
  const events = getSessionEvents(this.db, sessionId, limit);
  return events;
}
/** Runs the `getMediaLibrary` query against the tracker database. */
async getMediaLibrary(): Promise<MediaLibraryRow[]> {
  const library = getMediaLibrary(this.db);
  return library;
}
/** Runs the `getMediaDetail` query for one video; the result may be null. */
async getMediaDetail(videoId: number): Promise<MediaDetailRow | null> {
  const detail = getMediaDetail(this.db, videoId);
  return detail;
}
/**
 * Runs the `getMediaSessions` query for one video.
 *
 * @param limit Forwarded to the query; defaults to 100.
 */
async getMediaSessions(videoId: number, limit = 100): Promise<SessionSummaryQueryRow[]> {
  const sessions = getMediaSessions(this.db, videoId, limit);
  return sessions;
}
/**
 * Runs the `getMediaDailyRollups` query for one video.
 *
 * @param limit Forwarded to the query; defaults to 90.
 */
async getMediaDailyRollups(videoId: number, limit = 90): Promise<ImmersionSessionRollupRow[]> {
  const rollups = getMediaDailyRollups(this.db, videoId, limit);
  return rollups;
}
/** Runs the `getCoverArt` query for one video; the result may be null. */
async getCoverArt(videoId: number): Promise<MediaArtRow | null> {
  const art = getCoverArt(this.db, videoId);
  return art;
}
/** Runs the `getAnimeLibrary` query against the tracker database. */
async getAnimeLibrary(): Promise<AnimeLibraryRow[]> {
  const library = getAnimeLibrary(this.db);
  return library;
}
/** Runs the `getAnimeDetail` query for one anime; the result may be null. */
async getAnimeDetail(animeId: number): Promise<AnimeDetailRow | null> {
  const detail = getAnimeDetail(this.db, animeId);
  return detail;
}
/** Runs the `getAnimeEpisodes` query for one anime. */
async getAnimeEpisodes(animeId: number): Promise<AnimeEpisodeRow[]> {
  const episodes = getAnimeEpisodes(this.db, animeId);
  return episodes;
}
/** Runs the `getAnimeAnilistEntries` query for one anime. */
async getAnimeAnilistEntries(animeId: number): Promise<AnimeAnilistEntryRow[]> {
  const entries = getAnimeAnilistEntries(this.db, animeId);
  return entries;
}
/** Runs the `getAnimeCoverArt` query for one anime; the result may be null. */
async getAnimeCoverArt(animeId: number): Promise<MediaArtRow | null> {
  const art = getAnimeCoverArt(this.db, animeId);
  return art;
}
/**
 * Runs the `getAnimeWords` query for one anime.
 *
 * @param limit Forwarded to the query; defaults to 50.
 */
async getAnimeWords(animeId: number, limit = 50): Promise<AnimeWordRow[]> {
  const words = getAnimeWords(this.db, animeId, limit);
  return words;
}
/**
 * Runs the `getEpisodeWords` query for one video.
 *
 * @param limit Forwarded to the query; defaults to 50.
 */
async getEpisodeWords(videoId: number, limit = 50): Promise<AnimeWordRow[]> {
  const words = getEpisodeWords(this.db, videoId, limit);
  return words;
}
/** Runs the `getEpisodeSessions` query for one video. */
async getEpisodeSessions(videoId: number): Promise<SessionSummaryQueryRow[]> {
  const sessions = getEpisodeSessions(this.db, videoId);
  return sessions;
}
/**
 * Sets the watched flag of a video by calling `markVideoWatched` directly on
 * the database (this write does not go through the write queue).
 */
async setVideoWatched(videoId: number, watched: boolean): Promise<void> {
  const { db } = this;
  markVideoWatched(db, videoId, watched);
}
/** Runs the `getEpisodeCardEvents` query for one video. */
async getEpisodeCardEvents(videoId: number): Promise<EpisodeCardEventRow[]> {
  const cardEvents = getEpisodeCardEvents(this.db, videoId);
  return cardEvents;
}
/**
 * Runs the `getAnimeDailyRollups` query for one anime.
 *
 * @param limit Forwarded to the query; defaults to 90.
 */
async getAnimeDailyRollups(animeId: number, limit = 90): Promise<ImmersionSessionRollupRow[]> {
  const rollups = getAnimeDailyRollups(this.db, animeId, limit);
  return rollups;
}
/**
 * Runs the `getStreakCalendar` query.
 *
 * @param days Forwarded to the query; defaults to 90.
 */
async getStreakCalendar(days = 90): Promise<StreakCalendarRow[]> {
  const calendar = getStreakCalendar(this.db, days);
  return calendar;
}
/**
 * Runs the `getEpisodesPerDay` query.
 *
 * @param limit Forwarded to the query; defaults to 90.
 */
async getEpisodesPerDay(limit = 90): Promise<EpisodesPerDayRow[]> {
  const perDay = getEpisodesPerDay(this.db, limit);
  return perDay;
}
/**
 * Runs the `getNewAnimePerDay` query.
 *
 * @param limit Forwarded to the query; defaults to 90.
 */
async getNewAnimePerDay(limit = 90): Promise<NewAnimePerDayRow[]> {
  const perDay = getNewAnimePerDay(this.db, limit);
  return perDay;
}
/**
 * Runs the `getWatchTimePerAnime` query.
 *
 * @param limit Forwarded to the query; defaults to 90.
 */
async getWatchTimePerAnime(limit = 90): Promise<WatchTimePerAnimeRow[]> {
  const watchTime = getWatchTimePerAnime(this.db, limit);
  return watchTime;
}
/** Runs the `getWordDetail` query for one word id; the result may be null. */
async getWordDetail(wordId: number): Promise<WordDetailRow | null> {
  const detail = getWordDetail(this.db, wordId);
  return detail;
}
/** Runs the `getWordAnimeAppearances` query for one word id. */
async getWordAnimeAppearances(wordId: number): Promise<WordAnimeAppearanceRow[]> {
  const appearances = getWordAnimeAppearances(this.db, wordId);
  return appearances;
}
/**
 * Runs the `getSimilarWords` query for one word id.
 *
 * @param limit Forwarded to the query; defaults to 10.
 */
async getSimilarWords(wordId: number, limit = 10): Promise<SimilarWordRow[]> {
  const similar = getSimilarWords(this.db, wordId, limit);
  return similar;
}
/** Runs the `getKanjiDetail` query for one kanji id; the result may be null. */
async getKanjiDetail(kanjiId: number): Promise<KanjiDetailRow | null> {
  const detail = getKanjiDetail(this.db, kanjiId);
  return detail;
}
/** Runs the `getKanjiAnimeAppearances` query for one kanji id. */
async getKanjiAnimeAppearances(kanjiId: number): Promise<KanjiAnimeAppearanceRow[]> {
  const appearances = getKanjiAnimeAppearances(this.db, kanjiId);
  return appearances;
}
/**
 * Runs the `getKanjiWords` query for one kanji id.
 *
 * @param limit Forwarded to the query; defaults to 20.
 */
async getKanjiWords(kanjiId: number, limit = 20): Promise<KanjiWordRow[]> {
  const words = getKanjiWords(this.db, kanjiId, limit);
  return words;
}
/**
 * Installs the fetcher used by `ensureCoverArt`, or removes it when `null`
 * is passed (in which case `ensureCoverArt` returns false for videos without
 * stored art).
 */
setCoverArtFetcher(fetcher: CoverArtFetcher | null): void {
this.coverArtFetcher = fetcher;
}
/**
 * Makes sure cover art exists for a video, fetching it when necessary.
 *
 * Resolves to true right away when a cover blob is already stored, and to
 * false when no fetcher is installed or the video has no canonical title.
 * Concurrent calls for the same video share a single in-flight fetch.
 *
 * @param videoId Video whose cover art is wanted.
 * @returns The fetcher's result, or the short-circuit values described above.
 */
async ensureCoverArt(videoId: number): Promise<boolean> {
  if (getCoverArt(this.db, videoId)?.coverBlob) {
    return true;
  }
  const fetcher = this.coverArtFetcher;
  if (!fetcher) {
    return false;
  }
  const alreadyRunning = this.pendingCoverFetches.get(videoId);
  if (alreadyRunning) {
    return await alreadyRunning;
  }

  const fetchForVideo = async (): Promise<boolean> => {
    const title = getMediaDetail(this.db, videoId)?.canonicalTitle?.trim();
    if (!title) {
      return false;
    }
    return await fetcher.fetchIfMissing(this.db, videoId, title);
  };

  const fetchPromise = fetchForVideo();
  this.pendingCoverFetches.set(videoId, fetchPromise);
  try {
    return await fetchPromise;
  } finally {
    // Drop the entry whether the fetch resolved or rejected so later calls can retry.
    this.pendingCoverFetches.delete(videoId);
  }
}
/**
 * Reacts to the player reporting its current media.
 *
 * When the normalized path equals the one already tracked, only a changed,
 * non-empty title is applied to the active session. Otherwise the active
 * session is finalized and, unless the new path is empty, a video record is
 * looked up or created, a new session is started and the asynchronous
 * metadata captures are kicked off.
 *
 * @param mediaPath Local path or URL of the media, or null when nothing is loaded.
 * @param mediaTitle Title reported for the media, if any.
 */
handleMediaChange(mediaPath: string | null, mediaTitle: string | null): void {
  const nextPath = normalizeMediaPath(mediaPath);
  const nextTitle = normalizeText(mediaTitle);
  this.logger.info(
    `handleMediaChange called with path=${nextPath || '<empty>'} title=${nextTitle || '<empty>'}`,
  );

  if (nextPath === this.currentMediaPathOrUrl) {
    const titleChanged = Boolean(nextTitle) && nextTitle !== this.currentVideoKey;
    if (!titleChanged) {
      this.logger.debug('Media change ignored; path unchanged');
      return;
    }
    this.currentVideoKey = nextTitle;
    this.updateVideoTitleForActiveSession(nextTitle);
    this.logger.debug('Media title updated for existing session');
    return;
  }

  this.finalizeActiveSession();
  this.currentMediaPathOrUrl = nextPath;
  this.currentVideoKey = nextTitle;
  if (!nextPath) {
    this.logger.info('Media path cleared; immersion session tracking paused');
    return;
  }

  const sourceType = isRemoteSource(nextPath) ? SOURCE_TYPE_REMOTE : SOURCE_TYPE_LOCAL;
  const videoKey = buildVideoKey(nextPath, sourceType);
  // Fall back to a title derived from the path when the player supplied none.
  const canonicalTitle = nextTitle || deriveCanonicalTitle(nextPath);
  const sourcePath = sourceType === SOURCE_TYPE_LOCAL ? nextPath : null;
  const sourceUrl = sourceType === SOURCE_TYPE_REMOTE ? nextPath : null;

  const videoId = getOrCreateVideoRecord(this.db, videoKey, {
    canonicalTitle,
    sourcePath,
    sourceUrl,
    sourceType,
  });
  const startedAtMs = Date.now();

  this.logger.info(`Starting immersion session for path=${nextPath} videoId=${videoId}`);
  this.startSession(videoId, startedAtMs);
  this.captureAnimeMetadataAsync(videoId, nextPath, nextTitle || null);
  this.captureVideoMetadataAsync(videoId, sourceType, nextPath);
}
/**
 * Applies a new media title to the active session's video record.
 * Does nothing without an active session or when the normalized title is empty.
 *
 * @param mediaTitle Title reported by the player.
 */
handleMediaTitleUpdate(mediaTitle: string | null): void {
  if (this.sessionState) {
    const title = normalizeText(mediaTitle);
    if (title) {
      this.currentVideoKey = title;
      this.updateVideoTitleForActiveSession(title);
    }
  }
}
/**
 * Records one displayed subtitle line for the active session.
 *
 * The line is ignored when there is no session, when its normalized text is
 * empty, when `endSec` is missing or not positive, or when the same text with
 * the same start time was already recorded in this session. Otherwise the
 * session counters are advanced and two writes are queued: a `subtitleLine`
 * write carrying per-word and per-kanji occurrence counts, and a
 * subtitle-line event.
 *
 * @param text Raw subtitle text.
 * @param startSec Start of the subtitle segment, in seconds.
 * @param endSec End of the subtitle segment, in seconds.
 * @param tokens Optional tokenization of the line used for word occurrences.
 */
recordSubtitleLine(
  text: string,
  startSec: number,
  endSec: number,
  tokens?: MergedToken[] | null,
): void {
  type WordOccurrence = {
    headword: string;
    word: string;
    reading: string;
    partOfSpeech: string;
    pos1: string;
    pos2: string;
    pos3: string;
    occurrenceCount: number;
  };

  const session = this.sessionState;
  if (!session || !text.trim()) return;
  const lineText = normalizeText(text);
  if (!lineText) return;
  if (!endSec || endSec <= 0) return;

  // Skip lines already recorded in this session (same start time and text).
  const segmentStartMs = secToMs(startSec);
  const dedupeKey = `${segmentStartMs}:${lineText}`;
  if (this.recordedSubtitleKeys.has(dedupeKey)) return;
  this.recordedSubtitleKeys.add(dedupeKey);

  const nowMs = Date.now();
  const nowSec = nowMs / 1000;
  const metrics = calculateTextMetrics(lineText);

  session.currentLineIndex += 1;
  session.linesSeen += 1;
  session.wordsSeen += metrics.words;
  session.tokensSeen += metrics.tokens;
  session.pendingTelemetry = true;

  // Collapse repeated tokens of the line into one entry with a count.
  const occurrencesByWord = new Map<string, WordOccurrence>();
  for (const token of tokens ?? []) {
    if (shouldExcludeTokenFromVocabularyPersistence(token)) continue;

    const headword = normalizeText(token.headword || token.surface);
    const word = normalizeText(token.surface || token.headword);
    const reading = normalizeText(token.reading);
    if (!headword || !word) continue;

    const occurrenceKey = [headword, word, reading].join('\u0000');
    const partOfSpeech = deriveStoredPartOfSpeech({
      partOfSpeech: token.partOfSpeech,
      pos1: token.pos1 ?? '',
    });

    const known = occurrencesByWord.get(occurrenceKey);
    if (known) {
      known.occurrenceCount += 1;
    } else {
      occurrencesByWord.set(occurrenceKey, {
        headword,
        word,
        reading,
        partOfSpeech,
        pos1: token.pos1 ?? '',
        pos2: token.pos2 ?? '',
        pos3: token.pos3 ?? '',
        occurrenceCount: 1,
      });
    }
  }

  const kanjiTally = new Map<string, number>();
  for (const character of lineText) {
    if (isKanji(character)) {
      kanjiTally.set(character, (kanjiTally.get(character) ?? 0) + 1);
    }
  }

  const segmentEndMs = secToMs(endSec);
  const lineIndex = session.currentLineIndex;

  this.recordWrite({
    kind: 'subtitleLine',
    sessionId: session.sessionId,
    videoId: session.videoId,
    lineIndex,
    segmentStartMs,
    segmentEndMs,
    text: lineText,
    wordOccurrences: Array.from(occurrencesByWord.values()),
    kanjiOccurrences: Array.from(kanjiTally, ([kanji, occurrenceCount]) => ({
      kanji,
      occurrenceCount,
    })),
    firstSeen: nowSec,
    lastSeen: nowSec,
  });

  this.recordWrite({
    kind: 'event',
    sessionId: session.sessionId,
    sampleMs: nowMs,
    lineIndex,
    segmentStartMs,
    segmentEndMs,
    wordsDelta: metrics.words,
    cardsDelta: 0,
    eventType: EVENT_SUBTITLE_LINE,
    payloadJson: sanitizePayload(
      {
        event: 'subtitle-line',
        text: lineText,
        words: metrics.words,
      },
      this.maxPayloadBytes,
    ),
  });
}
/**
 * Stores the media duration for the active session's video.
 *
 * The row is only rewritten when the stored duration is 0 or differs from
 * the reported one by more than one second. Ignored without an active
 * session or for non-finite / non-positive durations.
 *
 * @param durationSec Duration reported by the player, in seconds.
 */
recordMediaDuration(durationSec: number): void {
  const session = this.sessionState;
  if (!session) return;
  if (!Number.isFinite(durationSec) || durationSec <= 0) return;

  const reportedMs = Math.round(durationSec * 1000);
  const storedMs = getVideoDurationMs(this.db, session.videoId);
  const needsUpdate = storedMs === 0 || Math.abs(storedMs - reportedMs) > 1000;
  if (!needsUpdate) return;

  const updateDuration = this.db.prepare(
    'UPDATE imm_videos SET duration_ms = ?, LAST_UPDATE_DATE = ? WHERE video_id = ?',
  );
  updateDuration.run(reportedMs, Date.now(), session.videoId);
}
/**
 * Feeds a playback position sample into the active session.
 *
 * The first sample only seeds the wall-clock / media-time baseline. Later
 * samples add the elapsed wall-clock time to the watched totals (gaps of a
 * minute or more are discarded), queue a seek event when the media position
 * moved by at least one second since the previous sample, and mark the video
 * as watched once the position reaches 98% of the stored duration.
 *
 * NOTE(review): any jump of one second or more between two samples is counted
 * as a seek, so this appears to rely on the caller sampling more often than
 * once per second during normal playback — confirm against the caller.
 *
 * @param mediaTimeSec Current playback position in seconds; null and
 *   non-finite values are ignored.
 */
recordPlaybackPosition(mediaTimeSec: number | null): void {
  const session = this.sessionState;
  if (!session || mediaTimeSec === null || !Number.isFinite(mediaTimeSec)) {
    return;
  }

  const nowMs = Date.now();
  const mediaMs = Math.round(mediaTimeSec * 1000);

  // First sample of the session: nothing to diff against yet.
  if (session.lastWallClockMs <= 0) {
    session.lastWallClockMs = nowMs;
    session.lastMediaMs = mediaMs;
    return;
  }

  const elapsedWallMs = nowMs - session.lastWallClockMs;
  if (elapsedWallMs > 0 && elapsedWallMs < 60_000) {
    session.totalWatchedMs += elapsedWallMs;
    if (!session.isPaused) {
      session.activeWatchedMs += elapsedWallMs;
    }
  }

  const previousMediaMs = session.lastMediaMs;
  if (previousMediaMs !== null && Math.abs(mediaMs - previousMediaMs) >= 1_000) {
    // The delta is at least one second in magnitude, so it is either forward or backward.
    const movedForward = mediaMs > previousMediaMs;
    if (movedForward) {
      session.seekForwardCount += 1;
    } else {
      session.seekBackwardCount += 1;
    }
    session.pendingTelemetry = true;
    this.recordWrite({
      kind: 'event',
      sessionId: session.sessionId,
      sampleMs: nowMs,
      eventType: movedForward ? EVENT_SEEK_FORWARD : EVENT_SEEK_BACKWARD,
      wordsDelta: 0,
      cardsDelta: 0,
      segmentStartMs: previousMediaMs,
      segmentEndMs: mediaMs,
      payloadJson: sanitizePayload(
        {
          fromMs: previousMediaMs,
          toMs: mediaMs,
        },
        this.maxPayloadBytes,
      ),
    });
  }

  session.lastWallClockMs = nowMs;
  session.lastMediaMs = mediaMs;
  session.pendingTelemetry = true;

  if (session.markedWatched) {
    return;
  }
  const durationMs = getVideoDurationMs(this.db, session.videoId);
  if (durationMs > 0 && mediaMs >= durationMs * 0.98) {
    markVideoWatched(this.db, session.videoId, true);
    session.markedWatched = true;
  }
}
/**
 * Records a pause or resume transition for the active session.
 *
 * Repeated reports of the same state are ignored. Pausing remembers when the
 * pause began and bumps the pause counter; resuming adds the elapsed pause
 * time to the session total. Either transition queues the matching event.
 *
 * @param isPaused True when playback just paused, false when it resumed.
 */
recordPauseState(isPaused: boolean): void {
  const session = this.sessionState;
  if (!session || session.isPaused === isPaused) return;

  const nowMs = Date.now();
  session.isPaused = isPaused;

  if (isPaused) {
    session.lastPauseStartMs = nowMs;
    session.pauseCount += 1;
  } else if (session.lastPauseStartMs) {
    session.pauseMs += Math.max(0, nowMs - session.lastPauseStartMs);
    session.lastPauseStartMs = null;
  }

  this.recordWrite({
    kind: 'event',
    sessionId: session.sessionId,
    sampleMs: nowMs,
    eventType: isPaused ? EVENT_PAUSE_START : EVENT_PAUSE_END,
    cardsDelta: 0,
    wordsDelta: 0,
    payloadJson: sanitizePayload({ paused: isPaused }, this.maxPayloadBytes),
  });
  session.pendingTelemetry = true;
}
/**
 * Counts one dictionary lookup for the active session and queues a lookup event.
 *
 * @param hit Whether the lookup counts as a hit; hits also bump `lookupHits`.
 */
recordLookup(hit: boolean): void {
  const session = this.sessionState;
  if (!session) return;

  session.lookupCount += 1;
  session.lookupHits += hit ? 1 : 0;
  session.pendingTelemetry = true;

  const payloadJson = sanitizePayload({ hit }, this.maxPayloadBytes);
  this.recordWrite({
    kind: 'event',
    sessionId: session.sessionId,
    sampleMs: Date.now(),
    eventType: EVENT_LOOKUP,
    cardsDelta: 0,
    wordsDelta: 0,
    payloadJson,
  });
}
/**
 * Adds mined cards to the active session and queues a card-mined event.
 *
 * @param count Number of cards mined; defaults to 1.
 * @param noteIds Optional note ids; included in the event payload only when
 *   the array is non-empty.
 */
recordCardsMined(count = 1, noteIds?: number[]): void {
  const session = this.sessionState;
  if (!session) return;

  session.cardsMined += count;
  session.pendingTelemetry = true;

  const payload: { cardsMined: number; noteIds?: number[] } = { cardsMined: count };
  if (noteIds?.length) {
    payload.noteIds = noteIds;
  }

  this.recordWrite({
    kind: 'event',
    sessionId: session.sessionId,
    sampleMs: Date.now(),
    eventType: EVENT_CARD_MINED,
    wordsDelta: 0,
    cardsDelta: count,
    payloadJson: sanitizePayload(payload, this.maxPayloadBytes),
  });
}
/**
 * Counts one media buffering event for the active session and queues the
 * corresponding event write. Does nothing without an active session.
 */
recordMediaBufferEvent(): void {
  const session = this.sessionState;
  if (!session) return;

  session.mediaBufferEvents += 1;
  session.pendingTelemetry = true;

  const payloadJson = sanitizePayload({ buffer: true }, this.maxPayloadBytes);
  this.recordWrite({
    kind: 'event',
    sessionId: session.sessionId,
    sampleMs: Date.now(),
    eventType: EVENT_MEDIA_BUFFER,
    cardsDelta: 0,
    wordsDelta: 0,
    payloadJson,
  });
}
/**
 * Appends a write to the queue and requests an immediate flush for events or
 * once the queue holds at least a full batch. Writes are ignored after
 * `destroy()`. Overflow reported by `enqueueWrite` is counted and logged.
 */
private recordWrite(write: QueuedWrite): void {
  if (this.isDestroyed) return;

  const enqueueResult = enqueueWrite(this.queue, write, this.queueCap);
  const droppedNow = enqueueResult.dropped;
  if (droppedNow > 0) {
    this.droppedWriteCount += droppedNow;
    this.logger.warn(`Immersion tracker queue overflow; dropped ${droppedNow} oldest writes`);
  }

  const batchIsFull = this.queue.length >= this.batchSize;
  if (write.kind === 'event' || batchIsFull) {
    this.scheduleFlush(0);
  }
}
/**
 * Queues a telemetry snapshot of the active session's counters and clears
 * the pending flag.
 *
 * @param force When true, a snapshot is queued even if nothing changed since
 *   the previous one.
 */
private flushTelemetry(force = false): void {
  const session = this.sessionState;
  if (!session) return;
  if (!force && !session.pendingTelemetry) return;

  this.recordWrite({
    kind: 'telemetry',
    sessionId: session.sessionId,
    sampleMs: Date.now(),
    totalWatchedMs: session.totalWatchedMs,
    activeWatchedMs: session.activeWatchedMs,
    linesSeen: session.linesSeen,
    wordsSeen: session.wordsSeen,
    tokensSeen: session.tokensSeen,
    cardsMined: session.cardsMined,
    lookupCount: session.lookupCount,
    lookupHits: session.lookupHits,
    pauseCount: session.pauseCount,
    pauseMs: session.pauseMs,
    seekForwardCount: session.seekForwardCount,
    seekBackwardCount: session.seekBackwardCount,
    mediaBufferEvents: session.mediaBufferEvents,
  });
  session.pendingTelemetry = false;
}
/**
 * Arms a one-shot timer that runs `flushNow()`. The request is dropped when
 * a flush is already scheduled or a flush transaction is currently running.
 *
 * @param delayMs Delay before flushing; defaults to the configured flush interval.
 */
private scheduleFlush(delayMs = this.flushIntervalMs): void {
  const busy = this.flushScheduled || this.writeLock.locked;
  if (busy) return;

  this.flushScheduled = true;
  const onTimer = (): void => {
    this.flushScheduled = false;
    this.flushNow();
  };
  this.flushTimer = setTimeout(onTimer, delayMs);
}
/**
 * Writes up to `batchSize` queued entries to the database inside one
 * `BEGIN IMMEDIATE` transaction.
 *
 * Does nothing while another flush is running, after `destroy()`, or when
 * the queue is empty. A pending telemetry snapshot is queued first so it is
 * part of the flushed data. On failure the transaction is rolled back and
 * the batch is put back at the front of the queue so it is retried by the
 * next scheduled flush.
 *
 * The rollback itself is guarded: when `BEGIN IMMEDIATE` is the statement
 * that failed there is no open transaction and `ROLLBACK` throws, which
 * previously escaped the catch block and skipped the re-queue, losing the
 * batch.
 */
private flushNow(): void {
  if (this.writeLock.locked || this.isDestroyed) return;
  if (this.queue.length === 0) {
    this.flushScheduled = false;
    return;
  }
  this.flushTelemetry();
  if (this.queue.length === 0) {
    this.flushScheduled = false;
    return;
  }
  const batch = this.queue.splice(0, Math.min(this.batchSize, this.queue.length));
  this.writeLock.locked = true;
  try {
    this.db.exec('BEGIN IMMEDIATE');
    for (const write of batch) {
      this.flushSingle(write);
    }
    this.db.exec('COMMIT');
  } catch (error) {
    try {
      this.db.exec('ROLLBACK');
    } catch (rollbackError) {
      // No transaction was open (e.g. BEGIN failed); nothing to undo.
      this.logger.debug(
        `Immersion tracker rollback skipped: ${(rollbackError as Error).message}`,
      );
    }
    // Preserve ordering: the failed batch goes back in front of newer writes.
    this.queue.unshift(...batch);
    this.logger.warn('Immersion tracker flush failed, retrying later', error as Error);
  } finally {
    this.writeLock.locked = false;
    this.flushScheduled = false;
    if (this.queue.length > 0) {
      this.scheduleFlush(this.flushIntervalMs);
    }
  }
}
/** Executes one queued write using the tracker's prepared statements. */
private flushSingle(write: QueuedWrite): void {
  const { preparedStatements } = this;
  executeQueuedWrite(write, preparedStatements);
}
/**
 * Starts the recurring maintenance timer and then runs one maintenance pass
 * immediately.
 */
private scheduleMaintenance(): void {
  const tick = (): void => {
    this.runMaintenance();
  };
  this.maintenanceTimer = setInterval(tick, this.maintenanceIntervalMs);
  this.runMaintenance();
}
/**
 * One maintenance pass: flush pending data, prune rows past their retention
 * window, refresh rollups (forcing a rebuild when telemetry rows or ended
 * sessions were deleted) and VACUUM when the vacuum interval has elapsed.
 *
 * Failures are logged and swallowed so the next interval tick can retry.
 * Does nothing after `destroy()`.
 */
private runMaintenance(): void {
  if (this.isDestroyed) return;
  try {
    this.flushTelemetry(true);
    this.flushNow();

    const nowMs = Date.now();
    const pruned = pruneRetention(this.db, nowMs, {
      eventsRetentionMs: this.eventsRetentionMs,
      telemetryRetentionMs: this.telemetryRetentionMs,
      dailyRollupRetentionMs: this.dailyRollupRetentionMs,
      monthlyRollupRetentionMs: this.monthlyRollupRetentionMs,
    });

    const sourceRowsRemoved = pruned.deletedTelemetryRows > 0 || pruned.deletedEndedSessions > 0;
    this.runRollupMaintenance(sourceRowsRemoved);

    const vacuumDue = nowMs - this.lastVacuumMs >= this.vacuumIntervalMs;
    if (vacuumDue && !this.writeLock.locked) {
      this.db.exec('VACUUM');
      this.lastVacuumMs = nowMs;
    }
  } catch (error) {
    this.logger.warn(
      'Immersion tracker maintenance failed, will retry later',
      (error as Error).message,
    );
  }
}
/**
 * Runs the rollup maintenance helper on the tracker database.
 *
 * @param forceRebuild Forwarded to the helper; defaults to false.
 */
private runRollupMaintenance(forceRebuild = false): void {
  const { db } = this;
  runRollupMaintenance(db, forceRebuild);
}
/**
 * Creates a session record for a video, makes it the active session, resets
 * the subtitle de-duplication set, queues an all-zero telemetry sample
 * stamped with the session start time and requests an immediate flush.
 *
 * @param videoId Video the session belongs to.
 * @param startedAtMs Optional start timestamp forwarded to `startSessionRecord`.
 */
private startSession(videoId: number, startedAtMs?: number): void {
  const started = startSessionRecord(this.db, videoId, startedAtMs);
  this.sessionState = started.state;
  this.recordedSubtitleKeys.clear();

  this.recordWrite({
    kind: 'telemetry',
    sessionId: started.sessionId,
    sampleMs: started.state.startedAtMs,
    totalWatchedMs: 0,
    activeWatchedMs: 0,
    linesSeen: 0,
    wordsSeen: 0,
    tokensSeen: 0,
    cardsMined: 0,
    lookupCount: 0,
    lookupHits: 0,
    pauseCount: 0,
    pauseMs: 0,
    seekForwardCount: 0,
    seekBackwardCount: 0,
    mediaBufferEvents: 0,
  });
  this.scheduleFlush(0);
}
/**
 * Closes the active session, if there is one.
 *
 * An open pause and the wall-clock time since the last playback sample
 * (when under a minute) are folded into the session totals, a final
 * telemetry snapshot is queued and flushed, and the session record is
 * finalized with the current time before the active session is cleared.
 */
private finalizeActiveSession(): void {
  const session = this.sessionState;
  if (!session) return;

  const endedAt = Date.now();

  if (session.lastPauseStartMs) {
    session.pauseMs += Math.max(0, endedAt - session.lastPauseStartMs);
    session.lastPauseStartMs = null;
  }

  if (session.lastWallClockMs > 0) {
    const sinceLastSampleMs = endedAt - session.lastWallClockMs;
    const withinWindow = sinceLastSampleMs > 0 && sinceLastSampleMs < 60_000;
    if (withinWindow) {
      session.totalWatchedMs += sinceLastSampleMs;
      if (!session.isPaused) {
        session.activeWatchedMs += sinceLastSampleMs;
      }
    }
  }

  this.flushTelemetry(true);
  this.flushNow();
  session.pendingTelemetry = false;
  finalizeSessionRecord(this.db, session, endedAt);
  this.sessionState = null;
}
/**
 * Reads metadata of a local video file in the background and stores it on
 * the video record. Remote sources are skipped. Failures are logged, never
 * thrown.
 *
 * The result is discarded when the service was destroyed while the metadata
 * was being read, matching `captureAnimeMetadataAsync`; otherwise the update
 * would run against a closed database and only surface as a misleading
 * warning.
 *
 * @param videoId Video record to update.
 * @param sourceType Source type of the media; only `SOURCE_TYPE_LOCAL` is processed.
 * @param mediaPath Path handed to `getLocalVideoMetadata`.
 */
private captureVideoMetadataAsync(videoId: number, sourceType: number, mediaPath: string): void {
  if (sourceType !== SOURCE_TYPE_LOCAL) return;
  void (async () => {
    try {
      const metadata = await getLocalVideoMetadata(mediaPath);
      if (this.isDestroyed) {
        return;
      }
      updateVideoMetadataRecord(this.db, videoId, metadata);
    } catch (error) {
      this.logger.warn('Unable to capture local video metadata', (error as Error).message);
    }
  })();
}
/**
 * Guesses anime metadata for a video in the background, then creates or
 * reuses the matching anime record and links the video to it. Nothing is
 * written when the service was destroyed in the meantime or when no
 * non-blank title could be parsed. Failures are logged, never thrown.
 *
 * The in-flight promise is tracked in `pendingAnimeMetadataUpdates` under
 * the video id. On completion the entry is removed only if it still refers
 * to this capture: a newer capture for the same video replaces the entry,
 * and the older capture finishing must not delete the newer one's tracking.
 *
 * @param videoId Video record to link.
 * @param mediaPath Media path handed to the metadata guesser.
 * @param mediaTitle Media title handed to the metadata guesser.
 */
private captureAnimeMetadataAsync(
  videoId: number,
  mediaPath: string | null,
  mediaTitle: string | null,
): void {
  const updatePromise = (async () => {
    try {
      const parsed = await guessAnimeVideoMetadata(mediaPath, mediaTitle);
      if (this.isDestroyed || !parsed?.parsedTitle.trim()) {
        return;
      }
      const animeId = getOrCreateAnimeRecord(this.db, {
        parsedTitle: parsed.parsedTitle,
        canonicalTitle: parsed.parsedTitle,
        anilistId: null,
        titleRomaji: null,
        titleEnglish: null,
        titleNative: null,
        metadataJson: parsed.parseMetadataJson,
      });
      linkVideoToAnimeRecord(this.db, videoId, {
        animeId,
        parsedBasename: parsed.parsedBasename,
        parsedTitle: parsed.parsedTitle,
        parsedSeason: parsed.parsedSeason,
        parsedEpisode: parsed.parsedEpisode,
        parserSource: parsed.parserSource,
        parserConfidence: parsed.parserConfidence,
        parseMetadataJson: parsed.parseMetadataJson,
      });
    } catch (error) {
      this.logger.warn('Unable to capture anime metadata', (error as Error).message);
    }
  })();
  this.pendingAnimeMetadataUpdates.set(videoId, updatePromise);
  void updatePromise.finally(() => {
    if (this.pendingAnimeMetadataUpdates.get(videoId) === updatePromise) {
      this.pendingAnimeMetadataUpdates.delete(videoId);
    }
  });
}
/**
 * Writes a new canonical title to the active session's video record.
 * Does nothing when no session is active.
 */
private updateVideoTitleForActiveSession(canonicalTitle: string): void {
  const activeVideoId = this.sessionState?.videoId;
  if (!this.sessionState) {
    return;
  }
  updateVideoTitleRecord(this.db, this.sessionState.videoId, canonicalTitle);
  void activeVideoId;
}
}