Files
SubMiner/src/core/services/immersion-tracker-service.ts

1303 lines
40 KiB
TypeScript

import path from 'node:path';
import * as fs from 'node:fs';
import { createLogger } from '../../logger';
import type { CoverArtFetcher } from './anilist/cover-art-fetcher';
import { getLocalVideoMetadata, guessAnimeVideoMetadata } from './immersion-tracker/metadata';
import {
pruneRawRetention,
pruneRollupRetention,
runOptimizeMaintenance,
runRollupMaintenance,
} from './immersion-tracker/maintenance';
import { Database, type DatabaseSync } from './immersion-tracker/sqlite';
import { finalizeSessionRecord, startSessionRecord } from './immersion-tracker/session';
import {
applyPragmas,
createTrackerPreparedStatements,
ensureSchema,
executeQueuedWrite,
getOrCreateAnimeRecord,
getOrCreateVideoRecord,
linkVideoToAnimeRecord,
type TrackerPreparedStatements,
updateVideoMetadataRecord,
updateVideoTitleRecord,
} from './immersion-tracker/storage';
import {
applySessionLifetimeSummary,
reconcileStaleActiveSessions,
rebuildLifetimeSummaries as rebuildLifetimeSummaryTables,
shouldBackfillLifetimeSummaries,
} from './immersion-tracker/lifetime';
import {
cleanupVocabularyStats,
getAnimeCoverArt,
getAnimeDailyRollups,
getAnimeAnilistEntries,
getAnimeDetail,
getAnimeEpisodes,
getAnimeLibrary,
getAnimeWords,
getEpisodeCardEvents,
getEpisodeSessions,
getEpisodeWords,
getCoverArt,
getDailyRollups,
getEpisodesPerDay,
getKanjiAnimeAppearances,
getKanjiDetail,
getKanjiWords,
getNewAnimePerDay,
getSimilarWords,
getStreakCalendar,
getKanjiOccurrences,
getKanjiStats,
getMediaDailyRollups,
getMediaDetail,
getMediaLibrary,
getMediaSessions,
getMonthlyRollups,
getQueryHints,
getSessionEvents,
getSessionSummaries,
getSessionTimeline,
getSessionWordsByLine,
getTrendsDashboard,
getAllDistinctHeadwords,
getAnimeDistinctHeadwords,
getMediaDistinctHeadwords,
getVocabularyStats,
getWatchTimePerAnime,
getWordAnimeAppearances,
getWordDetail,
getWordOccurrences,
getVideoDurationMs,
upsertCoverArt,
markVideoWatched,
deleteSession as deleteSessionQuery,
deleteSessions as deleteSessionsQuery,
deleteVideo as deleteVideoQuery,
} from './immersion-tracker/query';
import {
buildVideoKey,
deriveCanonicalTitle,
isKanji,
isRemoteSource,
normalizeMediaPath,
normalizeText,
resolveBoundedInt,
sanitizePayload,
secToMs,
} from './immersion-tracker/reducer';
import { DEFAULT_MIN_WATCH_RATIO } from '../../shared/watch-threshold';
import { enqueueWrite } from './immersion-tracker/queue';
import {
DEFAULT_BATCH_SIZE,
DEFAULT_DAILY_ROLLUP_RETENTION_MS,
DEFAULT_EVENTS_RETENTION_MS,
DEFAULT_FLUSH_INTERVAL_MS,
DEFAULT_MAINTENANCE_INTERVAL_MS,
DEFAULT_MAX_PAYLOAD_BYTES,
DEFAULT_MONTHLY_ROLLUP_RETENTION_MS,
DEFAULT_QUEUE_CAP,
DEFAULT_SESSIONS_RETENTION_MS,
DEFAULT_TELEMETRY_RETENTION_MS,
DEFAULT_VACUUM_INTERVAL_MS,
EVENT_CARD_MINED,
EVENT_LOOKUP,
EVENT_MEDIA_BUFFER,
EVENT_PAUSE_END,
EVENT_PAUSE_START,
EVENT_SEEK_BACKWARD,
EVENT_SEEK_FORWARD,
EVENT_SUBTITLE_LINE,
EVENT_YOMITAN_LOOKUP,
SOURCE_TYPE_LOCAL,
SOURCE_TYPE_REMOTE,
type ImmersionSessionRollupRow,
type EpisodeCardEventRow,
type EpisodesPerDayRow,
type ImmersionTrackerOptions,
type KanjiAnimeAppearanceRow,
type KanjiDetailRow,
type KanjiOccurrenceRow,
type KanjiStatsRow,
type KanjiWordRow,
type LifetimeRebuildSummary,
type LegacyVocabularyPosResolution,
type LegacyVocabularyPosRow,
type AnimeAnilistEntryRow,
type AnimeDetailRow,
type AnimeEpisodeRow,
type AnimeLibraryRow,
type AnimeWordRow,
type MediaArtRow,
type MediaDetailRow,
type MediaLibraryRow,
type NewAnimePerDayRow,
type QueuedWrite,
type SessionEventRow,
type SessionState,
type SessionSummaryQueryRow,
type SessionTimelineRow,
type SimilarWordRow,
type StreakCalendarRow,
type VocabularyCleanupSummary,
type WatchTimePerAnimeRow,
type WordAnimeAppearanceRow,
type WordDetailRow,
type WordOccurrenceRow,
type VocabularyStatsRow,
type CountedWordOccurrence,
} from './immersion-tracker/types';
import type { MergedToken } from '../../types';
import { shouldExcludeTokenFromVocabularyPersistence } from './tokenizer/annotation-stage';
import { deriveStoredPartOfSpeech } from './tokenizer/part-of-speech';
export type {
AnimeAnilistEntryRow,
AnimeDetailRow,
AnimeEpisodeRow,
AnimeLibraryRow,
AnimeWordRow,
EpisodeCardEventRow,
EpisodesPerDayRow,
ImmersionSessionRollupRow,
ImmersionTrackerOptions,
ImmersionTrackerPolicy,
KanjiAnimeAppearanceRow,
KanjiDetailRow,
KanjiOccurrenceRow,
KanjiStatsRow,
KanjiWordRow,
MediaArtRow,
MediaDetailRow,
MediaLibraryRow,
NewAnimePerDayRow,
SessionEventRow,
SessionSummaryQueryRow,
SessionTimelineRow,
SimilarWordRow,
StreakCalendarRow,
WatchTimePerAnimeRow,
WordAnimeAppearanceRow,
WordDetailRow,
WordOccurrenceRow,
VocabularyStatsRow,
} from './immersion-tracker/types';
/**
 * Records immersion activity (subtitle lines, playback position, pauses, lookups, mined cards)
 * for the currently playing media into a local SQLite database, and exposes the query surface
 * built on `./immersion-tracker/query`.
 *
 * Writes produced by the `record*` methods are buffered in an in-memory queue and flushed in
 * batches of at most `batchSize`, each inside one `BEGIN IMMEDIATE` transaction. Maintenance
 * (rollups, retention pruning, periodic VACUUM, optimize) runs on its own interval timer.
 */
export class ImmersionTrackerService {
  private readonly logger = createLogger('main:immersion-tracker');
  private readonly db: DatabaseSync;
  /** Pending writes, oldest first; on overflow `enqueueWrite` drops the oldest entries. */
  private readonly queue: QueuedWrite[] = [];
  private readonly queueCap: number;
  private readonly batchSize: number;
  private readonly flushIntervalMs: number;
  private readonly maintenanceIntervalMs: number;
  private readonly maxPayloadBytes: number;
  // Retention windows; Number.POSITIVE_INFINITY means "keep forever" (configured as 0 days).
  private readonly eventsRetentionMs: number;
  private readonly telemetryRetentionMs: number;
  private readonly sessionsRetentionMs: number;
  private readonly dailyRollupRetentionMs: number;
  private readonly monthlyRollupRetentionMs: number;
  private readonly vacuumIntervalMs: number;
  private readonly dbPath: string;
  /** Set while a flush transaction is open, so nested flush/schedule calls back off. */
  private readonly writeLock = { locked: false };
  private flushTimer: ReturnType<typeof setTimeout> | null = null;
  private maintenanceTimer: ReturnType<typeof setInterval> | null = null;
  /** True exactly while `flushTimer` is armed and has not fired yet. */
  private flushScheduled = false;
  private droppedWriteCount = 0;
  private lastVacuumMs = 0;
  private isDestroyed = false;
  private sessionState: SessionState | null = null;
  // Holds the most recent normalized media title (used to detect title-only changes).
  private currentVideoKey = '';
  private currentMediaPathOrUrl = '';
  private readonly preparedStatements: TrackerPreparedStatements;
  private coverArtFetcher: CoverArtFetcher | null = null;
  /** De-duplicates concurrent cover-art fetches per video id. */
  private readonly pendingCoverFetches = new Map<number, Promise<boolean>>();
  /** `${startMs}:${text}` keys already recorded in the active session (cleared per session). */
  private readonly recordedSubtitleKeys = new Set<string>();
  private readonly pendingAnimeMetadataUpdates = new Map<number, Promise<void>>();
  private readonly resolveLegacyVocabularyPos:
    | ((row: LegacyVocabularyPosRow) => Promise<LegacyVocabularyPosResolution | null>)
    | undefined;

  /**
   * Opens (creating the parent directory if needed) the tracker database, applies pragmas and
   * schema, recovers stale sessions, backfills lifetime summaries when required, and starts the
   * maintenance and flush timers.
   *
   * @param options Database path, write/retention policy and optional legacy POS resolver.
   * @throws Rethrows any database setup error after closing the freshly opened handle.
   */
  constructor(options: ImmersionTrackerOptions) {
    this.dbPath = options.dbPath;
    this.resolveLegacyVocabularyPos = options.resolveLegacyVocabularyPos;
    const parentDir = path.dirname(this.dbPath);
    if (!fs.existsSync(parentDir)) {
      fs.mkdirSync(parentDir, { recursive: true });
    }
    const policy = options.policy ?? {};
    this.queueCap = resolveBoundedInt(policy.queueCap, DEFAULT_QUEUE_CAP, 100, 100_000);
    this.batchSize = resolveBoundedInt(policy.batchSize, DEFAULT_BATCH_SIZE, 1, 10_000);
    this.flushIntervalMs = resolveBoundedInt(
      policy.flushIntervalMs,
      DEFAULT_FLUSH_INTERVAL_MS,
      50,
      60_000,
    );
    this.maintenanceIntervalMs = resolveBoundedInt(
      policy.maintenanceIntervalMs,
      DEFAULT_MAINTENANCE_INTERVAL_MS,
      60_000,
      7 * 24 * 60 * 60 * 1000,
    );
    this.maxPayloadBytes = resolveBoundedInt(
      policy.payloadCapBytes,
      DEFAULT_MAX_PAYLOAD_BYTES,
      64,
      8192,
    );
    const retention = policy.retention ?? {};
    // Converts a day count into milliseconds; 0 days disables the limit (infinite retention).
    const daysToRetentionMs = (
      value: number | undefined,
      fallbackMs: number,
      maxDays: number,
    ): number => {
      const fallbackDays = Math.floor(fallbackMs / 86_400_000);
      const resolvedDays = resolveBoundedInt(value, fallbackDays, 0, maxDays);
      return resolvedDays === 0 ? Number.POSITIVE_INFINITY : resolvedDays * 86_400_000;
    };
    this.eventsRetentionMs = daysToRetentionMs(
      retention.eventsDays,
      DEFAULT_EVENTS_RETENTION_MS,
      3650,
    );
    this.telemetryRetentionMs = daysToRetentionMs(
      retention.telemetryDays,
      DEFAULT_TELEMETRY_RETENTION_MS,
      3650,
    );
    this.sessionsRetentionMs = daysToRetentionMs(
      retention.sessionsDays,
      DEFAULT_SESSIONS_RETENTION_MS,
      3650,
    );
    this.dailyRollupRetentionMs = daysToRetentionMs(
      retention.dailyRollupsDays,
      DEFAULT_DAILY_ROLLUP_RETENTION_MS,
      36500,
    );
    this.monthlyRollupRetentionMs = daysToRetentionMs(
      retention.monthlyRollupsDays,
      DEFAULT_MONTHLY_ROLLUP_RETENTION_MS,
      36500,
    );
    this.vacuumIntervalMs = daysToRetentionMs(
      retention.vacuumIntervalDays,
      DEFAULT_VACUUM_INTERVAL_MS,
      3650,
    );
    this.db = new Database(this.dbPath);
    // If any setup step fails, release the handle before rethrowing so a failed construction
    // does not leave an open database connection behind.
    let preparedStatements: TrackerPreparedStatements;
    try {
      applyPragmas(this.db);
      ensureSchema(this.db);
      const reconciledSessions = reconcileStaleActiveSessions(this.db);
      if (reconciledSessions > 0) {
        this.logger.info(
          `Recovered stale active sessions on startup: reconciledSessions=${reconciledSessions}`,
        );
      }
      if (shouldBackfillLifetimeSummaries(this.db)) {
        const result = rebuildLifetimeSummaryTables(this.db);
        if (result.appliedSessions > 0) {
          this.logger.info(
            `Backfilled lifetime summaries from retained sessions: appliedSessions=${result.appliedSessions}`,
          );
        }
      }
      preparedStatements = createTrackerPreparedStatements(this.db);
    } catch (error) {
      try {
        this.db.close();
      } catch {
        /* keep the original setup error */
      }
      throw error;
    }
    this.preparedStatements = preparedStatements;
    this.scheduleMaintenance();
    this.scheduleFlush();
  }

  /**
   * Finalizes the active session, persists every queued write, stops all timers and closes the
   * database. Calling it again after the first call is a no-op.
   */
  destroy(): void {
    if (this.isDestroyed) return;
    this.cancelScheduledFlush();
    if (this.maintenanceTimer) {
      clearInterval(this.maintenanceTimer);
      this.maintenanceTimer = null;
    }
    try {
      this.finalizeActiveSession();
      // flushNow() writes at most one batch; drain the rest so shutdown does not drop writes.
      this.drainQueue();
    } finally {
      // Finalizing/draining can arm a retry timer; cancel it before the handle is closed.
      this.cancelScheduledFlush();
      this.isDestroyed = true;
      this.db.close();
    }
  }

  // ---- Query facades: each delegates to the same-named helper in ./immersion-tracker/query ----

  async getSessionSummaries(limit = 50): Promise<SessionSummaryQueryRow[]> {
    return getSessionSummaries(this.db, limit);
  }

  async getSessionTimeline(sessionId: number, limit?: number): Promise<SessionTimelineRow[]> {
    return getSessionTimeline(this.db, sessionId, limit);
  }

  async getSessionWordsByLine(
    sessionId: number,
  ): Promise<Array<{ lineIndex: number; headword: string; occurrenceCount: number }>> {
    return getSessionWordsByLine(this.db, sessionId);
  }

  async getAllDistinctHeadwords(): Promise<string[]> {
    return getAllDistinctHeadwords(this.db);
  }

  async getAnimeDistinctHeadwords(animeId: number): Promise<string[]> {
    return getAnimeDistinctHeadwords(this.db, animeId);
  }

  async getMediaDistinctHeadwords(videoId: number): Promise<string[]> {
    return getMediaDistinctHeadwords(this.db, videoId);
  }

  async getQueryHints(): Promise<{
    totalSessions: number;
    activeSessions: number;
    episodesToday: number;
    activeAnimeCount: number;
    totalEpisodesWatched: number;
    totalAnimeCompleted: number;
    totalActiveMin: number;
    totalCards: number;
    activeDays: number;
    totalTokensSeen: number;
    totalLookupCount: number;
    totalLookupHits: number;
    totalYomitanLookupCount: number;
    newWordsToday: number;
    newWordsThisWeek: number;
  }> {
    return getQueryHints(this.db);
  }

  async getDailyRollups(limit = 60): Promise<ImmersionSessionRollupRow[]> {
    return getDailyRollups(this.db, limit);
  }

  async getMonthlyRollups(limit = 24): Promise<ImmersionSessionRollupRow[]> {
    return getMonthlyRollups(this.db, limit);
  }

  async getTrendsDashboard(
    range: '7d' | '30d' | '90d' | 'all' = '30d',
    groupBy: 'day' | 'month' = 'day',
  ): Promise<unknown> {
    return getTrendsDashboard(this.db, range, groupBy);
  }

  async getVocabularyStats(limit = 100, excludePos?: string[]): Promise<VocabularyStatsRow[]> {
    return getVocabularyStats(this.db, limit, excludePos);
  }

  async cleanupVocabularyStats(): Promise<VocabularyCleanupSummary> {
    return cleanupVocabularyStats(this.db, {
      resolveLegacyPos: this.resolveLegacyVocabularyPos,
    });
  }

  /** Persists pending telemetry and queued writes, then rebuilds the lifetime summary tables. */
  async rebuildLifetimeSummaries(): Promise<LifetimeRebuildSummary> {
    this.flushTelemetry(true);
    // Write the whole queue (not just one batch) so the rebuild reads up-to-date data.
    this.drainQueue();
    return rebuildLifetimeSummaryTables(this.db);
  }

  async getKanjiStats(limit = 100): Promise<KanjiStatsRow[]> {
    return getKanjiStats(this.db, limit);
  }

  async getWordOccurrences(
    headword: string,
    word: string,
    reading: string,
    limit = 100,
    offset = 0,
  ): Promise<WordOccurrenceRow[]> {
    return getWordOccurrences(this.db, headword, word, reading, limit, offset);
  }

  async getKanjiOccurrences(kanji: string, limit = 100, offset = 0): Promise<KanjiOccurrenceRow[]> {
    return getKanjiOccurrences(this.db, kanji, limit, offset);
  }

  async getSessionEvents(
    sessionId: number,
    limit = 500,
    eventTypes?: number[],
  ): Promise<SessionEventRow[]> {
    return getSessionEvents(this.db, sessionId, limit, eventTypes);
  }

  async getMediaLibrary(): Promise<MediaLibraryRow[]> {
    return getMediaLibrary(this.db);
  }

  async getMediaDetail(videoId: number): Promise<MediaDetailRow | null> {
    return getMediaDetail(this.db, videoId);
  }

  async getMediaSessions(videoId: number, limit = 100): Promise<SessionSummaryQueryRow[]> {
    return getMediaSessions(this.db, videoId, limit);
  }

  async getMediaDailyRollups(videoId: number, limit = 90): Promise<ImmersionSessionRollupRow[]> {
    return getMediaDailyRollups(this.db, videoId, limit);
  }

  async getCoverArt(videoId: number): Promise<MediaArtRow | null> {
    return getCoverArt(this.db, videoId);
  }

  async getAnimeLibrary(): Promise<AnimeLibraryRow[]> {
    return getAnimeLibrary(this.db);
  }

  async getAnimeDetail(animeId: number): Promise<AnimeDetailRow | null> {
    return getAnimeDetail(this.db, animeId);
  }

  async getAnimeEpisodes(animeId: number): Promise<AnimeEpisodeRow[]> {
    return getAnimeEpisodes(this.db, animeId);
  }

  async getAnimeAnilistEntries(animeId: number): Promise<AnimeAnilistEntryRow[]> {
    return getAnimeAnilistEntries(this.db, animeId);
  }

  async getAnimeCoverArt(animeId: number): Promise<MediaArtRow | null> {
    return getAnimeCoverArt(this.db, animeId);
  }

  async getAnimeWords(animeId: number, limit = 50): Promise<AnimeWordRow[]> {
    return getAnimeWords(this.db, animeId, limit);
  }

  async getEpisodeWords(videoId: number, limit = 50): Promise<AnimeWordRow[]> {
    return getEpisodeWords(this.db, videoId, limit);
  }

  async getEpisodeSessions(videoId: number): Promise<SessionSummaryQueryRow[]> {
    return getEpisodeSessions(this.db, videoId);
  }

  async setVideoWatched(videoId: number, watched: boolean): Promise<void> {
    markVideoWatched(this.db, videoId, watched);
  }

  /** Marks the video of the active session as watched; returns false when no session is active. */
  async markActiveVideoWatched(): Promise<boolean> {
    if (!this.sessionState) return false;
    markVideoWatched(this.db, this.sessionState.videoId, true);
    this.sessionState.markedWatched = true;
    return true;
  }

  /** Deletes a stored session; the currently active session is never deleted. */
  async deleteSession(sessionId: number): Promise<void> {
    if (this.sessionState?.sessionId === sessionId) {
      this.logger.warn(`Ignoring delete request for active immersion session ${sessionId}`);
      return;
    }
    deleteSessionQuery(this.db, sessionId);
  }

  /** Deletes the given sessions, skipping the currently active session if it is included. */
  async deleteSessions(sessionIds: number[]): Promise<void> {
    const activeSessionId = this.sessionState?.sessionId;
    const deletableSessionIds =
      activeSessionId === undefined
        ? sessionIds
        : sessionIds.filter((sessionId) => sessionId !== activeSessionId);
    if (deletableSessionIds.length !== sessionIds.length) {
      this.logger.warn(
        `Ignoring bulk delete request for active immersion session ${activeSessionId}`,
      );
    }
    if (deletableSessionIds.length === 0) return;
    deleteSessionsQuery(this.db, deletableSessionIds);
  }

  /** Deletes a stored video; the video of the active session is never deleted. */
  async deleteVideo(videoId: number): Promise<void> {
    if (this.sessionState?.videoId === videoId) {
      this.logger.warn(`Ignoring delete request for active immersion video ${videoId}`);
      return;
    }
    deleteVideoQuery(this.db, videoId);
  }

  /**
   * Points an anime row at a (new) AniList entry and, when `coverUrl` is given, downloads the
   * cover and stores it for every video linked to that anime.
   *
   * Title/episode fields are only overwritten when a non-null value is supplied (COALESCE);
   * `description` is overwritten whenever the key is present, including with `null`.
   * The cover download is best-effort: failures are logged and the art rows are still upserted
   * with a null blob.
   */
  async reassignAnimeAnilist(
    animeId: number,
    info: {
      anilistId: number;
      titleRomaji?: string | null;
      titleEnglish?: string | null;
      titleNative?: string | null;
      episodesTotal?: number | null;
      description?: string | null;
      coverUrl?: string | null;
    },
  ): Promise<void> {
    this.db
      .prepare(
        `
      UPDATE imm_anime
      SET anilist_id = ?,
          title_romaji = COALESCE(?, title_romaji),
          title_english = COALESCE(?, title_english),
          title_native = COALESCE(?, title_native),
          episodes_total = COALESCE(?, episodes_total),
          description = CASE WHEN ? = 1 THEN ? ELSE description END,
          LAST_UPDATE_DATE = ?
      WHERE anime_id = ?
    `,
      )
      .run(
        info.anilistId,
        info.titleRomaji ?? null,
        info.titleEnglish ?? null,
        info.titleNative ?? null,
        info.episodesTotal ?? null,
        info.description !== undefined ? 1 : 0,
        info.description ?? null,
        Date.now(),
        animeId,
      );
    const coverUrl = info.coverUrl;
    if (!coverUrl) return;

    let coverBlob: Buffer | null = null;
    try {
      const res = await fetch(coverUrl);
      if (res.ok) {
        coverBlob = Buffer.from(await res.arrayBuffer());
      } else {
        this.logger.warn(`AniList cover download failed: status=${res.status} url=${coverUrl}`);
      }
    } catch (error) {
      // Best-effort: the metadata update above already succeeded.
      this.logger.warn('AniList cover download failed', (error as Error).message);
    }
    // The tracker may have been destroyed while the download was in flight.
    if (this.isDestroyed) return;

    // Read the linked videos after the download so the list reflects the current state.
    const videos = this.db
      .prepare('SELECT video_id FROM imm_videos WHERE anime_id = ?')
      .all(animeId) as Array<{ video_id: number }>;
    for (const v of videos) {
      upsertCoverArt(this.db, v.video_id, {
        anilistId: info.anilistId,
        coverUrl,
        coverBlob,
        titleRomaji: info.titleRomaji ?? null,
        titleEnglish: info.titleEnglish ?? null,
        episodesTotal: info.episodesTotal ?? null,
      });
    }
  }

  async getEpisodeCardEvents(videoId: number): Promise<EpisodeCardEventRow[]> {
    return getEpisodeCardEvents(this.db, videoId);
  }

  async getAnimeDailyRollups(animeId: number, limit = 90): Promise<ImmersionSessionRollupRow[]> {
    return getAnimeDailyRollups(this.db, animeId, limit);
  }

  async getStreakCalendar(days = 90): Promise<StreakCalendarRow[]> {
    return getStreakCalendar(this.db, days);
  }

  async getEpisodesPerDay(limit = 90): Promise<EpisodesPerDayRow[]> {
    return getEpisodesPerDay(this.db, limit);
  }

  async getNewAnimePerDay(limit = 90): Promise<NewAnimePerDayRow[]> {
    return getNewAnimePerDay(this.db, limit);
  }

  async getWatchTimePerAnime(limit = 90): Promise<WatchTimePerAnimeRow[]> {
    return getWatchTimePerAnime(this.db, limit);
  }

  async getWordDetail(wordId: number): Promise<WordDetailRow | null> {
    return getWordDetail(this.db, wordId);
  }

  async getWordAnimeAppearances(wordId: number): Promise<WordAnimeAppearanceRow[]> {
    return getWordAnimeAppearances(this.db, wordId);
  }

  async getSimilarWords(wordId: number, limit = 10): Promise<SimilarWordRow[]> {
    return getSimilarWords(this.db, wordId, limit);
  }

  async getKanjiDetail(kanjiId: number): Promise<KanjiDetailRow | null> {
    return getKanjiDetail(this.db, kanjiId);
  }

  async getKanjiAnimeAppearances(kanjiId: number): Promise<KanjiAnimeAppearanceRow[]> {
    return getKanjiAnimeAppearances(this.db, kanjiId);
  }

  async getKanjiWords(kanjiId: number, limit = 20): Promise<KanjiWordRow[]> {
    return getKanjiWords(this.db, kanjiId, limit);
  }

  setCoverArtFetcher(fetcher: CoverArtFetcher | null): void {
    this.coverArtFetcher = fetcher;
  }

  /**
   * Ensures a cover blob exists for the video, fetching it by canonical title when missing.
   * Concurrent calls for the same video share one in-flight fetch.
   *
   * @returns true when a cover blob is stored for the video afterwards.
   */
  async ensureCoverArt(videoId: number): Promise<boolean> {
    const existing = await this.getCoverArt(videoId);
    if (existing?.coverBlob) {
      return true;
    }
    const fetcher = this.coverArtFetcher;
    if (!fetcher) {
      return false;
    }
    const inFlight = this.pendingCoverFetches.get(videoId);
    if (inFlight) {
      return await inFlight;
    }
    const fetchPromise = (async () => {
      const detail = getMediaDetail(this.db, videoId);
      const canonicalTitle = detail?.canonicalTitle?.trim();
      if (!canonicalTitle) {
        return false;
      }
      const fetched = await fetcher.fetchIfMissing(this.db, videoId, canonicalTitle);
      // Do not query a database that was closed while the fetch was in flight.
      if (!fetched || this.isDestroyed) {
        return false;
      }
      const cover = await this.getCoverArt(videoId);
      return cover?.coverBlob != null;
    })();
    this.pendingCoverFetches.set(videoId, fetchPromise);
    try {
      return await fetchPromise;
    } finally {
      this.pendingCoverFetches.delete(videoId);
    }
  }

  /**
   * Reacts to a media change. The same path with a new title only renames the active video;
   * a different path finalizes the current session and, if the path is non-empty, starts a new
   * one and kicks off background metadata capture.
   */
  handleMediaChange(mediaPath: string | null, mediaTitle: string | null): void {
    const normalizedPath = normalizeMediaPath(mediaPath);
    const normalizedTitle = normalizeText(mediaTitle);
    this.logger.info(
      `handleMediaChange called with path=${normalizedPath || '<empty>'} title=${normalizedTitle || '<empty>'}`,
    );
    if (normalizedPath === this.currentMediaPathOrUrl) {
      if (normalizedTitle && normalizedTitle !== this.currentVideoKey) {
        this.currentVideoKey = normalizedTitle;
        this.updateVideoTitleForActiveSession(normalizedTitle);
        this.logger.debug('Media title updated for existing session');
      } else {
        this.logger.debug('Media change ignored; path unchanged');
      }
      return;
    }
    this.finalizeActiveSession();
    this.currentMediaPathOrUrl = normalizedPath;
    this.currentVideoKey = normalizedTitle;
    if (!normalizedPath) {
      this.logger.info('Media path cleared; immersion session tracking paused');
      return;
    }
    const sourceType = isRemoteSource(normalizedPath) ? SOURCE_TYPE_REMOTE : SOURCE_TYPE_LOCAL;
    const videoKey = buildVideoKey(normalizedPath, sourceType);
    const canonicalTitle = normalizedTitle || deriveCanonicalTitle(normalizedPath);
    const sourcePath = sourceType === SOURCE_TYPE_LOCAL ? normalizedPath : null;
    const sourceUrl = sourceType === SOURCE_TYPE_REMOTE ? normalizedPath : null;
    const sessionInfo = {
      videoId: getOrCreateVideoRecord(this.db, videoKey, {
        canonicalTitle,
        sourcePath,
        sourceUrl,
        sourceType,
      }),
      startedAtMs: Date.now(),
    };
    this.logger.info(
      `Starting immersion session for path=${normalizedPath} videoId=${sessionInfo.videoId}`,
    );
    this.startSession(sessionInfo.videoId, sessionInfo.startedAtMs);
    this.captureAnimeMetadataAsync(sessionInfo.videoId, normalizedPath, normalizedTitle || null);
    this.captureVideoMetadataAsync(sessionInfo.videoId, sourceType, normalizedPath);
  }

  /** Renames the active session's video when a non-empty title arrives. */
  handleMediaTitleUpdate(mediaTitle: string | null): void {
    if (!this.sessionState) return;
    const normalizedTitle = normalizeText(mediaTitle);
    if (!normalizedTitle) return;
    this.currentVideoKey = normalizedTitle;
    this.updateVideoTitleForActiveSession(normalizedTitle);
  }

  /**
   * Records one subtitle line (with its per-word and per-kanji occurrence counts) plus a
   * subtitle-line event. Lines without a positive end time, blank lines, and lines already
   * recorded in this session with the same start time and text are ignored.
   */
  recordSubtitleLine(
    text: string,
    startSec: number,
    endSec: number,
    tokens?: MergedToken[] | null,
    secondaryText?: string | null,
  ): void {
    if (!this.sessionState || !text.trim()) return;
    const cleaned = normalizeText(text);
    if (!cleaned) return;
    if (!endSec || endSec <= 0) {
      return;
    }
    const startMs = secToMs(startSec);
    const endMs = secToMs(endSec);
    const subtitleKey = `${startMs}:${cleaned}`;
    if (this.recordedSubtitleKeys.has(subtitleKey)) {
      return;
    }
    this.recordedSubtitleKeys.add(subtitleKey);
    const nowMs = Date.now();
    const nowSec = nowMs / 1000;
    const tokenCount = tokens?.length ?? 0;
    this.sessionState.currentLineIndex += 1;
    this.sessionState.linesSeen += 1;
    this.sessionState.tokensSeen += tokenCount;
    this.sessionState.pendingTelemetry = true;

    // Aggregate identical (headword, surface, reading) tokens within the line.
    const wordOccurrences = new Map<string, CountedWordOccurrence>();
    for (const token of tokens ?? []) {
      if (shouldExcludeTokenFromVocabularyPersistence(token)) {
        continue;
      }
      const headword = normalizeText(token.headword || token.surface);
      const word = normalizeText(token.surface || token.headword);
      const reading = normalizeText(token.reading);
      if (!headword || !word) {
        continue;
      }
      const wordKey = [headword, word, reading].join('\u0000');
      const existing = wordOccurrences.get(wordKey);
      if (existing) {
        existing.occurrenceCount += 1;
        continue;
      }
      wordOccurrences.set(wordKey, {
        headword,
        word,
        reading,
        partOfSpeech: deriveStoredPartOfSpeech({
          partOfSpeech: token.partOfSpeech,
          pos1: token.pos1 ?? '',
        }),
        pos1: token.pos1 ?? '',
        pos2: token.pos2 ?? '',
        pos3: token.pos3 ?? '',
        occurrenceCount: 1,
        frequencyRank: token.frequencyRank ?? null,
      });
    }

    const kanjiCounts = new Map<string, number>();
    for (const char of cleaned) {
      if (!isKanji(char)) {
        continue;
      }
      kanjiCounts.set(char, (kanjiCounts.get(char) ?? 0) + 1);
    }

    this.recordWrite({
      kind: 'subtitleLine',
      sessionId: this.sessionState.sessionId,
      videoId: this.sessionState.videoId,
      lineIndex: this.sessionState.currentLineIndex,
      segmentStartMs: startMs,
      segmentEndMs: endMs,
      text: cleaned,
      secondaryText: secondaryText ?? null,
      wordOccurrences: Array.from(wordOccurrences.values()),
      kanjiOccurrences: Array.from(kanjiCounts.entries()).map(([kanji, occurrenceCount]) => ({
        kanji,
        occurrenceCount,
      })),
      firstSeen: nowSec,
      lastSeen: nowSec,
    });
    this.recordWrite({
      kind: 'event',
      sessionId: this.sessionState.sessionId,
      sampleMs: nowMs,
      lineIndex: this.sessionState.currentLineIndex,
      segmentStartMs: startMs,
      segmentEndMs: endMs,
      tokensDelta: tokenCount,
      cardsDelta: 0,
      eventType: EVENT_SUBTITLE_LINE,
      payloadJson: sanitizePayload(
        {
          event: 'subtitle-line',
          tokens: tokenCount,
        },
        this.maxPayloadBytes,
      ),
    });
  }

  /** Stores the media duration for the active video when unknown or off by more than 1s. */
  recordMediaDuration(durationSec: number): void {
    if (!this.sessionState || !Number.isFinite(durationSec) || durationSec <= 0) return;
    const durationMs = Math.round(durationSec * 1000);
    const current = getVideoDurationMs(this.db, this.sessionState.videoId);
    if (current === 0 || Math.abs(current - durationMs) > 1000) {
      this.db
        .prepare('UPDATE imm_videos SET duration_ms = ?, LAST_UPDATE_DATE = ? WHERE video_id = ?')
        .run(durationMs, Date.now(), this.sessionState.videoId);
    }
  }

  /**
   * Updates watch-time counters from a playback position sample, records seek events, and marks
   * the video watched once the position passes DEFAULT_MIN_WATCH_RATIO of its duration.
   * Wall-clock gaps of 60s or more between samples are not counted as watch time.
   */
  recordPlaybackPosition(mediaTimeSec: number | null): void {
    if (!this.sessionState || mediaTimeSec === null || !Number.isFinite(mediaTimeSec)) {
      return;
    }
    const nowMs = Date.now();
    const mediaMs = Math.round(mediaTimeSec * 1000);
    if (this.sessionState.lastWallClockMs <= 0) {
      // First sample of the session: only establish the baseline.
      this.sessionState.lastWallClockMs = nowMs;
      this.sessionState.lastMediaMs = mediaMs;
      return;
    }
    const wallDeltaMs = nowMs - this.sessionState.lastWallClockMs;
    if (wallDeltaMs > 0 && wallDeltaMs < 60_000) {
      this.sessionState.totalWatchedMs += wallDeltaMs;
      if (!this.sessionState.isPaused) {
        this.sessionState.activeWatchedMs += wallDeltaMs;
      }
    }
    const previousMediaMs = this.sessionState.lastMediaMs;
    if (previousMediaMs !== null) {
      const mediaDeltaMs = mediaMs - previousMediaMs;
      // A media-time jump of at least one second between samples is recorded as a seek.
      // NOTE(review): the delta is not compared against elapsed wall time, so this assumes
      // position samples arrive well under a second apart — confirm the sampling rate.
      if (Math.abs(mediaDeltaMs) >= 1_000) {
        const isForward = mediaDeltaMs > 0;
        if (isForward) {
          this.sessionState.seekForwardCount += 1;
        } else {
          this.sessionState.seekBackwardCount += 1;
        }
        this.sessionState.pendingTelemetry = true;
        this.recordWrite({
          kind: 'event',
          sessionId: this.sessionState.sessionId,
          sampleMs: nowMs,
          eventType: isForward ? EVENT_SEEK_FORWARD : EVENT_SEEK_BACKWARD,
          tokensDelta: 0,
          cardsDelta: 0,
          segmentStartMs: previousMediaMs,
          segmentEndMs: mediaMs,
          payloadJson: sanitizePayload(
            {
              fromMs: previousMediaMs,
              toMs: mediaMs,
            },
            this.maxPayloadBytes,
          ),
        });
      }
    }
    this.sessionState.lastWallClockMs = nowMs;
    this.sessionState.lastMediaMs = mediaMs;
    this.sessionState.pendingTelemetry = true;
    if (!this.sessionState.markedWatched) {
      const durationMs = getVideoDurationMs(this.db, this.sessionState.videoId);
      if (durationMs > 0 && mediaMs >= durationMs * DEFAULT_MIN_WATCH_RATIO) {
        markVideoWatched(this.db, this.sessionState.videoId, true);
        this.sessionState.markedWatched = true;
      }
    }
  }

  /** Records pause start/end transitions and accumulates paused time; repeats are ignored. */
  recordPauseState(isPaused: boolean): void {
    if (!this.sessionState) return;
    if (this.sessionState.isPaused === isPaused) return;
    const nowMs = Date.now();
    this.sessionState.isPaused = isPaused;
    if (isPaused) {
      this.sessionState.lastPauseStartMs = nowMs;
      this.sessionState.pauseCount += 1;
      this.recordWrite({
        kind: 'event',
        sessionId: this.sessionState.sessionId,
        sampleMs: nowMs,
        eventType: EVENT_PAUSE_START,
        cardsDelta: 0,
        tokensDelta: 0,
        payloadJson: sanitizePayload({ paused: true }, this.maxPayloadBytes),
      });
    } else {
      if (this.sessionState.lastPauseStartMs) {
        const pauseMs = Math.max(0, nowMs - this.sessionState.lastPauseStartMs);
        this.sessionState.pauseMs += pauseMs;
        this.sessionState.lastPauseStartMs = null;
      }
      this.recordWrite({
        kind: 'event',
        sessionId: this.sessionState.sessionId,
        sampleMs: nowMs,
        eventType: EVENT_PAUSE_END,
        cardsDelta: 0,
        tokensDelta: 0,
        payloadJson: sanitizePayload({ paused: false }, this.maxPayloadBytes),
      });
    }
    this.sessionState.pendingTelemetry = true;
  }

  /** Records a dictionary lookup and whether it produced a hit. */
  recordLookup(hit: boolean): void {
    if (!this.sessionState) return;
    this.sessionState.lookupCount += 1;
    if (hit) {
      this.sessionState.lookupHits += 1;
    }
    this.sessionState.pendingTelemetry = true;
    this.recordWrite({
      kind: 'event',
      sessionId: this.sessionState.sessionId,
      sampleMs: Date.now(),
      eventType: EVENT_LOOKUP,
      cardsDelta: 0,
      tokensDelta: 0,
      payloadJson: sanitizePayload(
        {
          hit,
        },
        this.maxPayloadBytes,
      ),
    });
  }

  /** Records a Yomitan lookup event (no payload). */
  recordYomitanLookup(): void {
    if (!this.sessionState) return;
    this.sessionState.yomitanLookupCount += 1;
    this.sessionState.pendingTelemetry = true;
    this.recordWrite({
      kind: 'event',
      sessionId: this.sessionState.sessionId,
      sampleMs: Date.now(),
      eventType: EVENT_YOMITAN_LOOKUP,
      cardsDelta: 0,
      tokensDelta: 0,
      payloadJson: null,
    });
  }

  /** Records `count` mined cards, optionally with the created note ids in the payload. */
  recordCardsMined(count = 1, noteIds?: number[]): void {
    if (!this.sessionState) return;
    this.sessionState.cardsMined += count;
    this.sessionState.pendingTelemetry = true;
    this.recordWrite({
      kind: 'event',
      sessionId: this.sessionState.sessionId,
      sampleMs: Date.now(),
      eventType: EVENT_CARD_MINED,
      tokensDelta: 0,
      cardsDelta: count,
      payloadJson: sanitizePayload(
        { cardsMined: count, ...(noteIds?.length ? { noteIds } : {}) },
        this.maxPayloadBytes,
      ),
    });
  }

  /** Records a media buffering event. */
  recordMediaBufferEvent(): void {
    if (!this.sessionState) return;
    this.sessionState.mediaBufferEvents += 1;
    this.sessionState.pendingTelemetry = true;
    this.recordWrite({
      kind: 'event',
      sessionId: this.sessionState.sessionId,
      sampleMs: Date.now(),
      eventType: EVENT_MEDIA_BUFFER,
      cardsDelta: 0,
      tokensDelta: 0,
      payloadJson: sanitizePayload(
        {
          buffer: true,
        },
        this.maxPayloadBytes,
      ),
    });
  }

  /**
   * Enqueues a write (dropping the oldest on overflow) and requests an immediate flush for
   * events or when a full batch is pending. Ignored after destroy().
   */
  private recordWrite(write: QueuedWrite): void {
    if (this.isDestroyed) return;
    const { dropped } = enqueueWrite(this.queue, write, this.queueCap);
    if (dropped > 0) {
      this.droppedWriteCount += dropped;
      this.logger.warn(`Immersion tracker queue overflow; dropped ${dropped} oldest writes`);
    }
    if (write.kind === 'event' || this.queue.length >= this.batchSize) {
      this.scheduleFlush(0);
    }
  }

  /** Enqueues a telemetry snapshot of the active session when pending (or when forced). */
  private flushTelemetry(force = false): void {
    if (!this.sessionState || (!force && !this.sessionState.pendingTelemetry)) {
      return;
    }
    this.recordWrite({
      kind: 'telemetry',
      sessionId: this.sessionState.sessionId,
      sampleMs: Date.now(),
      lastMediaMs: this.sessionState.lastMediaMs,
      totalWatchedMs: this.sessionState.totalWatchedMs,
      activeWatchedMs: this.sessionState.activeWatchedMs,
      linesSeen: this.sessionState.linesSeen,
      tokensSeen: this.sessionState.tokensSeen,
      cardsMined: this.sessionState.cardsMined,
      lookupCount: this.sessionState.lookupCount,
      lookupHits: this.sessionState.lookupHits,
      yomitanLookupCount: this.sessionState.yomitanLookupCount,
      pauseCount: this.sessionState.pauseCount,
      pauseMs: this.sessionState.pauseMs,
      seekForwardCount: this.sessionState.seekForwardCount,
      seekBackwardCount: this.sessionState.seekBackwardCount,
      mediaBufferEvents: this.sessionState.mediaBufferEvents,
    });
    this.sessionState.pendingTelemetry = false;
  }

  /**
   * Arms the flush timer unless one is already pending, a flush is running, or the tracker is
   * destroyed. Only one timer is armed at a time, so `destroy()` can always cancel it.
   */
  private scheduleFlush(delayMs = this.flushIntervalMs): void {
    if (this.flushScheduled || this.writeLock.locked || this.isDestroyed) return;
    this.flushScheduled = true;
    this.flushTimer = setTimeout(() => {
      this.flushTimer = null;
      this.flushScheduled = false;
      this.flushNow();
    }, delayMs);
  }

  /** Cancels a pending flush timer, if any. */
  private cancelScheduledFlush(): void {
    if (this.flushTimer) {
      clearTimeout(this.flushTimer);
      this.flushTimer = null;
    }
    this.flushScheduled = false;
  }

  /**
   * Writes up to `batchSize` queued records in one transaction. On failure the batch is put
   * back at the front of the queue and a retry is scheduled; this method never throws for a
   * failed write, because it runs from timer callbacks.
   */
  private flushNow(): void {
    if (this.writeLock.locked || this.isDestroyed) return;
    if (this.queue.length === 0) return;
    this.flushTelemetry();
    const batch = this.queue.splice(0, Math.min(this.batchSize, this.queue.length));
    this.writeLock.locked = true;
    let inTransaction = false;
    try {
      this.db.exec('BEGIN IMMEDIATE');
      inTransaction = true;
      for (const write of batch) {
        this.flushSingle(write);
      }
      this.db.exec('COMMIT');
      inTransaction = false;
    } catch (error) {
      // Re-queue first so the batch survives even if the rollback below fails.
      this.queue.unshift(...batch);
      if (inTransaction) {
        try {
          this.db.exec('ROLLBACK');
        } catch (rollbackError) {
          // SQLite may already have rolled back on its own (e.g. after a failed COMMIT).
          this.logger.warn('Immersion tracker rollback failed', rollbackError as Error);
        }
      }
      this.logger.warn('Immersion tracker flush failed, retrying later', error as Error);
    } finally {
      this.writeLock.locked = false;
      if (this.queue.length > 0) {
        this.scheduleFlush(this.flushIntervalMs);
      }
    }
  }

  /**
   * Flushes batches until the queue is empty, stopping early if a flush makes no progress
   * (for example when a write keeps failing and is re-queued).
   */
  private drainQueue(): void {
    while (this.queue.length > 0) {
      const before = this.queue.length;
      this.flushNow();
      if (this.queue.length >= before) break;
    }
  }

  private flushSingle(write: QueuedWrite): void {
    executeQueuedWrite(write, this.preparedStatements);
  }

  /** Starts the maintenance interval and runs one maintenance pass immediately. */
  private scheduleMaintenance(): void {
    this.maintenanceTimer = setInterval(() => {
      this.runMaintenance();
    }, this.maintenanceIntervalMs);
    this.runMaintenance();
  }

  /**
   * One maintenance pass: flush pending telemetry, update rollups, prune finite retention
   * windows, VACUUM when the interval elapsed, then optimize. Errors are logged, not thrown.
   */
  private runMaintenance(): void {
    if (this.isDestroyed) return;
    try {
      this.flushTelemetry(true);
      this.flushNow();
      const nowMs = Date.now();
      this.runRollupMaintenance(false);
      if (
        Number.isFinite(this.eventsRetentionMs) ||
        Number.isFinite(this.telemetryRetentionMs) ||
        Number.isFinite(this.sessionsRetentionMs)
      ) {
        pruneRawRetention(this.db, nowMs, {
          eventsRetentionMs: this.eventsRetentionMs,
          telemetryRetentionMs: this.telemetryRetentionMs,
          sessionsRetentionMs: this.sessionsRetentionMs,
        });
      }
      if (
        Number.isFinite(this.dailyRollupRetentionMs) ||
        Number.isFinite(this.monthlyRollupRetentionMs)
      ) {
        pruneRollupRetention(this.db, nowMs, {
          dailyRollupRetentionMs: this.dailyRollupRetentionMs,
          monthlyRollupRetentionMs: this.monthlyRollupRetentionMs,
        });
      }
      // lastVacuumMs starts at 0 and is not persisted, so the first pass after startup vacuums
      // whenever the interval is finite.
      if (
        this.vacuumIntervalMs > 0 &&
        nowMs - this.lastVacuumMs >= this.vacuumIntervalMs &&
        !this.writeLock.locked
      ) {
        this.db.exec('VACUUM');
        this.lastVacuumMs = nowMs;
      }
      runOptimizeMaintenance(this.db);
    } catch (error) {
      this.logger.warn(
        'Immersion tracker maintenance failed, will retry later',
        (error as Error).message,
      );
    }
  }

  private runRollupMaintenance(forceRebuild = false): void {
    runRollupMaintenance(this.db, forceRebuild);
  }

  /** Creates the session row, resets per-session state, and queues a zeroed telemetry sample. */
  private startSession(videoId: number, startedAtMs?: number): void {
    const { sessionId, state } = startSessionRecord(this.db, videoId, startedAtMs);
    this.sessionState = state;
    this.recordedSubtitleKeys.clear();
    this.recordWrite({
      kind: 'telemetry',
      sessionId,
      sampleMs: state.startedAtMs,
      totalWatchedMs: 0,
      activeWatchedMs: 0,
      linesSeen: 0,
      tokensSeen: 0,
      cardsMined: 0,
      lookupCount: 0,
      lookupHits: 0,
      yomitanLookupCount: 0,
      pauseCount: 0,
      pauseMs: 0,
      seekForwardCount: 0,
      seekBackwardCount: 0,
      mediaBufferEvents: 0,
    });
    this.scheduleFlush(0);
  }

  /**
   * Closes out the active session: accounts an open pause and the final wall-clock slice,
   * writes a final telemetry sample, then finalizes the session row and lifetime summaries.
   */
  private finalizeActiveSession(): void {
    if (!this.sessionState) return;
    const endedAt = Date.now();
    if (this.sessionState.lastPauseStartMs) {
      this.sessionState.pauseMs += Math.max(0, endedAt - this.sessionState.lastPauseStartMs);
      this.sessionState.lastPauseStartMs = null;
    }
    if (this.sessionState.lastWallClockMs > 0) {
      const wallDelta = endedAt - this.sessionState.lastWallClockMs;
      if (wallDelta > 0 && wallDelta < 60_000) {
        this.sessionState.totalWatchedMs += wallDelta;
        if (!this.sessionState.isPaused) {
          this.sessionState.activeWatchedMs += wallDelta;
        }
      }
    }
    this.flushTelemetry(true);
    this.flushNow();
    this.sessionState.pendingTelemetry = false;
    finalizeSessionRecord(this.db, this.sessionState, endedAt);
    applySessionLifetimeSummary(this.db, this.sessionState, endedAt);
    this.sessionState = null;
  }

  /** Reads local file metadata in the background and stores it; remote sources are skipped. */
  private captureVideoMetadataAsync(videoId: number, sourceType: number, mediaPath: string): void {
    if (sourceType !== SOURCE_TYPE_LOCAL) return;
    void (async () => {
      try {
        const metadata = await getLocalVideoMetadata(mediaPath);
        // The tracker may have been destroyed (database closed) while metadata was read.
        if (this.isDestroyed) return;
        updateVideoMetadataRecord(this.db, videoId, metadata);
      } catch (error) {
        this.logger.warn('Unable to capture local video metadata', (error as Error).message);
      }
    })();
  }

  /**
   * Guesses anime/episode metadata from the path/title in the background and links the video
   * to an anime record. The in-flight promise is tracked in `pendingAnimeMetadataUpdates`.
   */
  private captureAnimeMetadataAsync(
    videoId: number,
    mediaPath: string | null,
    mediaTitle: string | null,
  ): void {
    const updatePromise = (async () => {
      try {
        const parsed = await guessAnimeVideoMetadata(mediaPath, mediaTitle);
        if (this.isDestroyed || !parsed?.parsedTitle.trim()) {
          return;
        }
        const animeId = getOrCreateAnimeRecord(this.db, {
          parsedTitle: parsed.parsedTitle,
          canonicalTitle: parsed.parsedTitle,
          anilistId: null,
          titleRomaji: null,
          titleEnglish: null,
          titleNative: null,
          metadataJson: parsed.parseMetadataJson,
        });
        linkVideoToAnimeRecord(this.db, videoId, {
          animeId,
          parsedBasename: parsed.parsedBasename,
          parsedTitle: parsed.parsedTitle,
          parsedSeason: parsed.parsedSeason,
          parsedEpisode: parsed.parsedEpisode,
          parserSource: parsed.parserSource,
          parserConfidence: parsed.parserConfidence,
          parseMetadataJson: parsed.parseMetadataJson,
        });
      } catch (error) {
        this.logger.warn('Unable to capture anime metadata', (error as Error).message);
      }
    })();
    this.pendingAnimeMetadataUpdates.set(videoId, updatePromise);
    void updatePromise.finally(() => {
      this.pendingAnimeMetadataUpdates.delete(videoId);
    });
  }

  private updateVideoTitleForActiveSession(canonicalTitle: string): void {
    if (!this.sessionState) return;
    updateVideoTitleRecord(this.db, this.sessionState.videoId, canonicalTitle);
  }
}