📚 Import stats hardening

This commit is contained in:
2026-03-05 15:05:12 -05:00
parent 71267ecc7e
commit 8138e0fe4f
5 changed files with 836 additions and 74 deletions

View File

@@ -766,6 +766,56 @@ export const resolvers = {
throw new Error(`Failed to fetch job result statistics: ${error.message}`);
}
},
/**
 * Get active import session (if any)
 * @async
 * @function getActiveImportSession
 * @param {any} _ - Parent resolver (unused)
 * @param {Object} args - Query arguments (none)
 * @param {Object} context - GraphQL context with broker
 * @returns {Promise<Object|null>} Active import session or null
 * @throws {Error} If the broker is missing from context or the importstate call fails
 * @description Retrieves the currently active import session (if any) by
 * delegating to the `importstate.getActiveSession` service action.
 * Useful for checking if an import is in progress before starting a new one.
 *
 * @example
 * ```graphql
 * query {
 *   getActiveImportSession {
 *     sessionId
 *     type
 *     status
 *     startedAt
 *     stats {
 *       totalFiles
 *       filesProcessed
 *       filesSucceeded
 *       filesFailed
 *     }
 *   }
 * }
 * ```
 */
getActiveImportSession: async (
  _: any,
  args: Record<string, never>,
  context: any
) => {
  try {
    const broker = context?.broker;
    if (!broker) {
      throw new Error("Broker not available in context");
    }
    // Returns null when no import session is currently active.
    return await broker.call("importstate.getActiveSession");
  } catch (error) {
    console.error("Error fetching active import session:", error);
    // Narrow the unknown catch binding before reading `.message`
    // (safe under `useUnknownInCatchVariables` / strict mode).
    const message = error instanceof Error ? error.message : String(error);
    throw new Error(`Failed to fetch active import session: ${message}`);
  }
},
},
Mutation: {
@@ -1373,6 +1423,137 @@ export const resolvers = {
throw new Error(`Failed to update sourced metadata: ${error.message}`);
}
},
/**
 * Start a new full import of the comics directory
 * @async
 * @function startNewImport
 * @param {any} _ - Parent resolver (unused)
 * @param {Object} args - Mutation arguments
 * @param {string} args.sessionId - Session ID for tracking this import batch
 * @param {Object} context - GraphQL context with broker
 * @returns {Promise<Object>} Import job result with success status and jobs queued count
 * @throws {Error} If the broker is missing, another session is active, or the import fails
 * @description Starts a full import of all comics in the comics directory via
 * `library.newImport`. Checks for an active import session first to fail fast
 * with a descriptive message. `jobsQueued` is always 0 in the response because
 * the real count is tracked asynchronously through import-session events.
 *
 * @example
 * ```graphql
 * mutation {
 *   startNewImport(sessionId: "import-2024-01-01") {
 *     success
 *     message
 *     jobsQueued
 *   }
 * }
 * ```
 */
startNewImport: async (
  _: any,
  { sessionId }: { sessionId: string },
  context: any
) => {
  try {
    const broker = context?.broker;
    if (!broker) {
      throw new Error("Broker not available in context");
    }
    // Advisory pre-check for an active session to surface a friendly error.
    // NOTE(review): this is check-then-act and therefore racy across
    // concurrent callers; the authoritative guard is the re-check inside
    // importstate.startSession (invoked by library.newImport) — confirm.
    const activeSession = await broker.call("importstate.getActiveSession");
    if (activeSession) {
      throw new Error(
        `Cannot start new import: Another import session "${activeSession.sessionId}" is already active (${activeSession.type}). Please wait for it to complete.`
      );
    }
    // Kick off the full import; the directory walk and job queueing run
    // asynchronously in the library service.
    await broker.call("library.newImport", {
      sessionId,
    });
    return {
      success: true,
      message: "New import started successfully",
      jobsQueued: 0, // The actual count is tracked asynchronously
    };
  } catch (error) {
    console.error("Error starting new import:", error);
    // Narrow the unknown catch binding before reading `.message`.
    const message = error instanceof Error ? error.message : String(error);
    throw new Error(`Failed to start new import: ${message}`);
  }
},
/**
 * Start an incremental import (only new files)
 * @async
 * @function startIncrementalImport
 * @param {any} _ - Parent resolver (unused)
 * @param {Object} args - Mutation arguments
 * @param {string} args.sessionId - Session ID for tracking this import batch
 * @param {string} [args.directoryPath] - Optional directory path to scan (defaults to COMICS_DIRECTORY)
 * @param {Object} context - GraphQL context with broker
 * @returns {Promise<Object>} Incremental import result with statistics
 * @throws {Error} If the broker is missing, another session is active, or the import fails
 * @description Starts an incremental import that only processes files not
 * already in the database, via `library.incrementalImport`. More efficient
 * than a full import for large libraries. Checks for an active import session
 * first to fail fast with a descriptive message.
 *
 * @example
 * ```graphql
 * mutation {
 *   startIncrementalImport(
 *     sessionId: "incremental-2024-01-01"
 *     directoryPath: "/path/to/comics"
 *   ) {
 *     success
 *     message
 *     stats {
 *       total
 *       alreadyImported
 *       newFiles
 *       queued
 *     }
 *   }
 * }
 * ```
 */
startIncrementalImport: async (
  _: any,
  {
    sessionId,
    directoryPath,
  }: { sessionId: string; directoryPath?: string },
  context: any
) => {
  try {
    const broker = context?.broker;
    if (!broker) {
      throw new Error("Broker not available in context");
    }
    // Advisory pre-check for an active session to surface a friendly error.
    // NOTE(review): check-then-act — the authoritative guard is inside
    // importstate.startSession (invoked by library.incrementalImport).
    const activeSession = await broker.call("importstate.getActiveSession");
    if (activeSession) {
      throw new Error(
        `Cannot start incremental import: Another import session "${activeSession.sessionId}" is already active (${activeSession.type}). Please wait for it to complete.`
      );
    }
    // Delegate to the library service; its result already matches the
    // GraphQL return shape (success/message/stats).
    const result = await broker.call("library.incrementalImport", {
      sessionId,
      directoryPath,
    });
    return result;
  } catch (error) {
    console.error("Error starting incremental import:", error);
    // Narrow the unknown catch binding before reading `.message`.
    const message = error instanceof Error ? error.message : String(error);
    throw new Error(`Failed to start incremental import: ${message}`);
  }
},
},
/**

View File

@@ -353,6 +353,9 @@ export const typeDefs = gql`
# Get job result statistics grouped by session
getJobResultStatistics: [JobResultStatistics!]!
# Get active import session (if any)
getActiveImportSession: ImportSession
}
# Mutations
@@ -780,4 +783,23 @@ export const typeDefs = gql`
failedJobs: Int!
earliestTimestamp: String!
}
# Import session information.
# One row per import run (full scan, incremental scan, or a single
# watcher-triggered file import). Returned by getActiveImportSession.
type ImportSession {
  # Client-supplied identifier used to correlate progress events.
  sessionId: String!
  # Import kind — presumably "full" | "incremental" | "watcher"; confirm against importstate service.
  type: String!
  # Lifecycle state — presumably "starting" | "scanning" | "queueing" | "active" | "completed" | "failed".
  status: String!
  # Serialized timestamp of when the session started.
  startedAt: String!
  # Null while the session is still running.
  completedAt: String
  # Running counters for this session's files.
  stats: ImportSessionStats!
  # Directory that was scanned; absent for watcher sessions.
  directoryPath: String
}
# Per-session file counters. filesProcessed should converge to
# filesSucceeded + filesFailed as jobs complete.
type ImportSessionStats {
  # Total comic files discovered in the scan.
  totalFiles: Int!
  # Files handed to the job queue.
  filesQueued: Int!
  # Files whose jobs have finished (success or failure).
  filesProcessed: Int!
  filesSucceeded: Int!
  filesFailed: Int!
}
`;

View File

@@ -1,7 +1,7 @@
import chokidar, { FSWatcher } from "chokidar";
import fs from "fs";
import path from "path";
import { Service, ServiceBroker, ServiceSchema } from "moleculer";
import { Service, ServiceBroker, ServiceSchema, Context } from "moleculer";
import ApiGateway from "moleculer-web";
import debounce from "lodash/debounce";
import { IFolderData } from "threetwo-ui-typings";
@@ -133,7 +133,103 @@ export default class ApiService extends Service {
logResponseData: true,
assets: { folder: "public", options: {} },
},
events: {},
events: {
/**
* Listen for import session completion to refresh statistics
*/
"IMPORT_SESSION_COMPLETED": {
async handler(ctx: Context<{
sessionId: string;
type: string;
success: boolean;
stats: any;
}>) {
const { sessionId, type, success } = ctx.params;
this.logger.info(
`[Stats Cache] Import session completed: ${sessionId} (${type}, success: ${success})`
);
// Invalidate and refresh statistics cache
await this.actions.invalidateStatsCache();
},
},
/**
* Listen for import progress to update cache incrementally
*/
"IMPORT_PROGRESS": {
async handler(ctx: Context<{
sessionId: string;
stats: any;
}>) {
// Update cache with current progress
if (this.statsCache) {
const { stats } = ctx.params;
// Update alreadyImported count based on files succeeded
if (stats.filesSucceeded) {
this.statsCache.alreadyImported += 1;
this.statsCache.newFiles = Math.max(0, this.statsCache.newFiles - 1);
// Recalculate percentage
if (this.statsCache.totalLocalFiles > 0) {
const percentage = (
(this.statsCache.alreadyImported / this.statsCache.totalLocalFiles) * 100
).toFixed(2);
this.statsCache.percentageImported = `${percentage}%`;
}
this.statsCache.lastUpdated = new Date();
// Trigger debounced broadcast
if (this.broadcastStatsUpdate) {
this.broadcastStatsUpdate();
}
}
}
},
},
/**
* Listen for watcher disable events
*/
"IMPORT_WATCHER_DISABLED": {
async handler(ctx: Context<{ reason: string; sessionId: string }>) {
const { reason, sessionId } = ctx.params;
this.logger.info(`[Watcher] Disabled: ${reason} (session: ${sessionId})`);
// Broadcast to frontend
await this.broker.call("socket.broadcast", {
namespace: "/",
event: "IMPORT_WATCHER_STATUS",
args: [{
enabled: false,
reason,
sessionId,
}],
});
},
},
/**
* Listen for watcher enable events
*/
"IMPORT_WATCHER_ENABLED": {
async handler(ctx: Context<{ sessionId: string }>) {
const { sessionId } = ctx.params;
this.logger.info(`[Watcher] Re-enabled after session: ${sessionId}`);
// Broadcast to frontend
await this.broker.call("socket.broadcast", {
namespace: "/",
event: "IMPORT_WATCHER_STATUS",
args: [{
enabled: true,
sessionId,
}],
});
},
},
},
actions: {
/**
* Get cached import statistics (fast, no filesystem scan)
@@ -427,6 +523,17 @@ export default class ApiService extends Service {
): Promise<void> {
this.logger.info(`File event [${event}]: ${filePath}`);
// Check if watcher should process files (not during manual imports)
if (event === "add") {
const watcherState: any = await this.broker.call("importstate.isWatcherEnabled");
if (!watcherState.enabled) {
this.logger.info(
`[Watcher] Skipping file ${filePath} - manual import in progress (${watcherState.activeSession?.sessionId})`
);
return;
}
}
// Update statistics cache for add/unlink events
if (event === "add" || event === "unlink") {
this.updateStatsCache(event, filePath);
@@ -435,10 +542,26 @@ export default class ApiService extends Service {
if (event === "add" && stats) {
setTimeout(async () => {
try {
// Double-check watcher is still enabled
const watcherState: any = await this.broker.call("importstate.isWatcherEnabled");
if (!watcherState.enabled) {
this.logger.info(
`[Watcher] Skipping delayed import for ${filePath} - manual import started`
);
return;
}
const newStats = await fs.promises.stat(filePath);
if (newStats.mtime.getTime() === stats.mtime.getTime()) {
this.logger.info(`Stable file detected: ${filePath}, importing.`);
// Create a watcher session for this file
const sessionId = `watcher-${Date.now()}`;
await this.broker.call("importstate.startSession", {
sessionId,
type: "watcher",
});
const folderData: IFolderData[] = await this.broker.call(
"library.walkFolders",
{ basePathToWalk: filePath }
@@ -490,19 +613,41 @@ export default class ApiService extends Service {
};
// Call the library service to import the comic
await this.broker.call("library.rawImportToDB", {
const result: any = await this.broker.call("library.rawImportToDB", {
importType: "new",
payload: payload,
});
this.logger.info(`Successfully queued import for: ${filePath}`);
this.logger.info(`Successfully imported: ${filePath}`);
// Mark file as imported in statistics cache
this.markFileAsImported(filePath);
// Complete watcher session
await this.broker.call("importstate.completeSession", {
sessionId,
success: result.success,
});
} else {
// Complete session even if no folder data
await this.broker.call("importstate.completeSession", {
sessionId,
success: false,
});
}
}
} catch (error) {
this.logger.error(`Error importing file ${filePath}:`, error);
// Try to complete session on error
try {
const sessionId = `watcher-${Date.now()}`;
await this.broker.call("importstate.completeSession", {
sessionId,
success: false,
});
} catch (e) {
// Ignore session completion errors
}
}
}, 3000);
}

View File

@@ -0,0 +1,348 @@
/**
* Import State Management Service
*
* Centralized service for tracking import sessions, preventing race conditions,
* and coordinating between file watcher, manual imports, and statistics updates.
*/
import { Service, ServiceBroker, Context } from "moleculer";
import { pubClient } from "../config/redis.config";
/**
* Import session state
*/
interface ImportSession {
  // Caller-supplied identifier; also used as the Redis key suffix
  // (`import:session:<sessionId>`).
  sessionId: string;
  // Origin of the import: a manual full scan, a manual incremental scan,
  // or an automatic watcher-triggered single-file import.
  type: "full" | "incremental" | "watcher";
  // Lifecycle state; "starting"/"scanning"/"queueing"/"active" count as
  // active states, "completed"/"failed" are terminal.
  status: "starting" | "scanning" | "queueing" | "active" | "completed" | "failed";
  startedAt: Date;
  // Set only when the session reaches a terminal status.
  completedAt?: Date;
  // Running counters; filesProcessed = filesSucceeded + filesFailed.
  stats: {
    totalFiles: number;
    filesQueued: number;
    filesProcessed: number;
    filesSucceeded: number;
    filesFailed: number;
  };
  // Directory scanned by full/incremental imports; absent for watcher sessions.
  directoryPath?: string;
}
export default class ImportStateService extends Service {
private activeSessions: Map<string, ImportSession> = new Map();
private watcherEnabled: boolean = true;
public constructor(broker: ServiceBroker) {
super(broker);
this.parseServiceSchema({
name: "importstate",
actions: {
/**
* Start a new import session
*/
startSession: {
params: {
sessionId: "string",
type: { type: "enum", values: ["full", "incremental", "watcher"] },
directoryPath: { type: "string", optional: true },
},
async handler(ctx: Context<{
sessionId: string;
type: "full" | "incremental" | "watcher";
directoryPath?: string;
}>) {
const { sessionId, type, directoryPath } = ctx.params;
// Check for active sessions (prevent race conditions)
const activeSession = this.getActiveSession();
if (activeSession && type !== "watcher") {
throw new Error(
`Cannot start ${type} import: Another import session "${activeSession.sessionId}" is already active (${activeSession.type})`
);
}
// If starting manual import, temporarily disable watcher
if (type !== "watcher") {
this.logger.info(`[Import State] Disabling watcher for ${type} import`);
this.watcherEnabled = false;
await this.broker.broadcast("IMPORT_WATCHER_DISABLED", {
reason: `${type} import started`,
sessionId,
});
}
const session: ImportSession = {
sessionId,
type,
status: "starting",
startedAt: new Date(),
stats: {
totalFiles: 0,
filesQueued: 0,
filesProcessed: 0,
filesSucceeded: 0,
filesFailed: 0,
},
directoryPath,
};
this.activeSessions.set(sessionId, session);
this.logger.info(`[Import State] Started session: ${sessionId} (${type})`);
// Broadcast session started
await this.broker.broadcast("IMPORT_SESSION_STARTED", {
sessionId,
type,
startedAt: session.startedAt,
});
// Store in Redis for persistence
await pubClient.set(
`import:session:${sessionId}`,
JSON.stringify(session),
{ EX: 86400 } // 24 hour expiry
);
return session;
},
},
/**
* Update session status
*/
updateSession: {
params: {
sessionId: "string",
status: {
type: "enum",
values: ["starting", "scanning", "queueing", "active", "completed", "failed"],
optional: true,
},
stats: { type: "object", optional: true },
},
async handler(ctx: Context<{
sessionId: string;
status?: ImportSession["status"];
stats?: Partial<ImportSession["stats"]>;
}>) {
const { sessionId, status, stats } = ctx.params;
const session = this.activeSessions.get(sessionId);
if (!session) {
throw new Error(`Session not found: ${sessionId}`);
}
if (status) {
session.status = status;
}
if (stats) {
session.stats = { ...session.stats, ...stats };
}
// Update Redis
await pubClient.set(
`import:session:${sessionId}`,
JSON.stringify(session),
{ EX: 86400 }
);
// Broadcast update
await this.broker.broadcast("IMPORT_SESSION_UPDATED", {
sessionId,
status: session.status,
stats: session.stats,
});
return session;
},
},
/**
* Complete a session
*/
completeSession: {
params: {
sessionId: "string",
success: "boolean",
},
async handler(ctx: Context<{
sessionId: string;
success: boolean;
}>) {
const { sessionId, success } = ctx.params;
const session = this.activeSessions.get(sessionId);
if (!session) {
this.logger.warn(`[Import State] Session not found: ${sessionId}`);
return null;
}
session.status = success ? "completed" : "failed";
session.completedAt = new Date();
this.logger.info(
`[Import State] Completed session: ${sessionId} (${session.status})`
);
// Re-enable watcher if this was a manual import
if (session.type !== "watcher") {
this.watcherEnabled = true;
this.logger.info("[Import State] Re-enabling watcher");
await this.broker.broadcast("IMPORT_WATCHER_ENABLED", {
sessionId,
});
}
// Broadcast completion
await this.broker.broadcast("IMPORT_SESSION_COMPLETED", {
sessionId,
type: session.type,
success,
stats: session.stats,
duration: session.completedAt.getTime() - session.startedAt.getTime(),
});
// Update Redis with final state
await pubClient.set(
`import:session:${sessionId}:final`,
JSON.stringify(session),
{ EX: 604800 } // 7 day expiry for completed sessions
);
// Remove from active sessions
this.activeSessions.delete(sessionId);
// Trigger statistics refresh
await this.broker.call("api.invalidateStatsCache");
return session;
},
},
/**
* Get current session
*/
getSession: {
params: {
sessionId: "string",
},
async handler(ctx: Context<{ sessionId: string }>) {
const { sessionId } = ctx.params;
return this.activeSessions.get(sessionId) || null;
},
},
/**
* Get active session (if any)
*/
getActiveSession: {
async handler() {
return this.getActiveSession();
},
},
/**
* Check if watcher should process files
*/
isWatcherEnabled: {
async handler() {
return {
enabled: this.watcherEnabled,
activeSession: this.getActiveSession(),
};
},
},
/**
* Increment file processed counter
*/
incrementProcessed: {
params: {
sessionId: "string",
success: "boolean",
},
async handler(ctx: Context<{
sessionId: string;
success: boolean;
}>) {
const { sessionId, success } = ctx.params;
const session = this.activeSessions.get(sessionId);
if (!session) {
return null;
}
session.stats.filesProcessed++;
if (success) {
session.stats.filesSucceeded++;
} else {
session.stats.filesFailed++;
}
// Update Redis
await pubClient.set(
`import:session:${sessionId}`,
JSON.stringify(session),
{ EX: 86400 }
);
// Broadcast progress update
await this.broker.broadcast("IMPORT_PROGRESS", {
sessionId,
stats: session.stats,
});
return session.stats;
},
},
/**
* Get all active sessions
*/
getAllActiveSessions: {
async handler() {
return Array.from(this.activeSessions.values());
},
},
},
methods: {
/**
* Get the currently active session (non-watcher)
*/
getActiveSession(): ImportSession | null {
for (const session of this.activeSessions.values()) {
if (
session.type !== "watcher" &&
["starting", "scanning", "queueing", "active"].includes(session.status)
) {
return session;
}
}
return null;
},
},
events: {
/**
* Listen for job completion events from jobqueue
*/
"JOB_COMPLETED": {
async handler(ctx: Context<{ sessionId?: string; success: boolean }>) {
const { sessionId, success } = ctx.params;
if (sessionId) {
await this.actions.incrementProcessed({ sessionId, success });
}
},
},
},
started: async () => {
this.logger.info("[Import State] Service started");
// Clean up any stale sessions from Redis on startup
const keys = await pubClient.keys("import:session:*");
this.logger.info(`[Import State] Found ${keys.length} session keys in Redis`);
},
});
}
}

View File

@@ -177,7 +177,21 @@ export default class ImportService extends Service {
// Get params to be passed to the import jobs
const { sessionId } = ctx.params;
const resolvedPath = path.resolve(COMICS_DIRECTORY);
// Start import session
await this.broker.call("importstate.startSession", {
sessionId,
type: "full",
directoryPath: resolvedPath,
});
console.log(`Walking comics directory: ${resolvedPath}`);
// Update session status
await this.broker.call("importstate.updateSession", {
sessionId,
status: "scanning",
});
// 1. Walk the Source folder
klaw(resolvedPath)
.on("error", (err) => {
@@ -186,6 +200,8 @@ export default class ImportService extends Service {
// 1.1 Filter on .cb* extensions
.pipe(
through2.obj(function (item, enc, next) {
// Only process files, not directories
if (item.stats.isFile()) {
let fileExtension = path.extname(
item.path
);
@@ -196,6 +212,7 @@ export default class ImportService extends Service {
) {
this.push(item);
}
}
next();
})
)
@@ -213,16 +230,7 @@ export default class ImportService extends Service {
)}`,
});
if (!comicExists) {
// 2.1 Reset the job counters in Redis
await pubClient.set(
"completedJobCount",
0
);
await pubClient.set(
"failedJobCount",
0
);
// 2.2 Send the extraction job to the queue
// Send the extraction job to the queue
this.broker.call("jobqueue.enqueue", {
fileObject: {
filePath: item.path,
@@ -238,11 +246,24 @@ export default class ImportService extends Service {
);
}
})
.on("end", () => {
.on("end", async () => {
console.log("All files traversed.");
// Update session to active (jobs are now being processed)
await this.broker.call("importstate.updateSession", {
sessionId,
status: "active",
});
});
} catch (error) {
console.log(error);
// Mark session as failed
const { sessionId } = ctx.params;
if (sessionId) {
await this.broker.call("importstate.completeSession", {
sessionId,
success: false,
});
}
}
},
},
@@ -270,10 +291,13 @@ export default class ImportService extends Service {
})
.pipe(
through2.obj(function (item, enc, next) {
// Only process files, not directories
if (item.stats.isFile()) {
const fileExtension = path.extname(item.path);
if ([".cbz", ".cbr", ".cb7"].includes(fileExtension)) {
localFiles.push(item.path);
}
}
next();
})
)
@@ -325,6 +349,13 @@ export default class ImportService extends Service {
const resolvedPath = path.resolve(directoryPath || COMICS_DIRECTORY);
console.log(`[Incremental Import] Starting for directory: ${resolvedPath}`);
// Start import session
await this.broker.call("importstate.startSession", {
sessionId,
type: "incremental",
directoryPath: resolvedPath,
});
// Emit start event
this.broker.broadcast("LS_INCREMENTAL_IMPORT_STARTED", {
message: "Starting incremental import analysis...",
@@ -332,6 +363,11 @@ export default class ImportService extends Service {
});
// Step 1: Fetch imported files from database
await this.broker.call("importstate.updateSession", {
sessionId,
status: "scanning",
});
this.broker.broadcast("LS_INCREMENTAL_IMPORT_PROGRESS", {
message: "Fetching imported files from database...",
});
@@ -365,6 +401,8 @@ export default class ImportService extends Service {
})
.pipe(
through2.obj(function (item, enc, next) {
// Only process files, not directories
if (item.stats.isFile()) {
const fileExtension = path.extname(item.path);
if ([".cbz", ".cbr", ".cb7"].includes(fileExtension)) {
const fileName = path.basename(item.path, fileExtension);
@@ -374,6 +412,7 @@ export default class ImportService extends Service {
size: item.stats.size,
});
}
}
next();
})
)
@@ -393,17 +432,21 @@ export default class ImportService extends Service {
console.log(`[Incremental Import] ${newFiles.length} new files to import`);
// Step 4: Reset job counters and queue new files
// Step 4: Queue new files
if (newFiles.length > 0) {
await this.broker.call("importstate.updateSession", {
sessionId,
status: "queueing",
stats: {
totalFiles: localFiles.length,
filesQueued: newFiles.length,
},
});
this.broker.broadcast("LS_INCREMENTAL_IMPORT_PROGRESS", {
message: `Queueing ${newFiles.length} new files for import...`,
});
// Reset counters once at the start
await pubClient.set("completedJobCount", 0);
await pubClient.set("failedJobCount", 0);
console.log("[Incremental Import] Job counters reset");
// Queue all new files
for (const file of newFiles) {
await this.broker.call("jobqueue.enqueue", {
@@ -417,9 +460,21 @@ export default class ImportService extends Service {
action: "enqueue.async",
});
}
// Update session to active
await this.broker.call("importstate.updateSession", {
sessionId,
status: "active",
});
} else {
// No files to import, complete immediately
await this.broker.call("importstate.completeSession", {
sessionId,
success: true,
});
}
// Emit completion event
// Emit completion event (queueing complete, not import complete)
this.broker.broadcast("LS_INCREMENTAL_IMPORT_COMPLETE", {
message: `Successfully queued ${newFiles.length} files for import`,
stats: {
@@ -432,7 +487,9 @@ export default class ImportService extends Service {
return {
success: true,
message: `Incremental import complete: ${newFiles.length} new files queued`,
message: newFiles.length > 0
? `Incremental import started: ${newFiles.length} new files queued`
: "No new files to import",
stats: {
total: localFiles.length,
alreadyImported: localFiles.length - newFiles.length,
@@ -443,6 +500,15 @@ export default class ImportService extends Service {
} catch (error) {
console.error("[Incremental Import] Error:", error);
// Mark session as failed
const { sessionId } = ctx.params;
if (sessionId) {
await this.broker.call("importstate.completeSession", {
sessionId,
success: false,
});
}
// Emit error event
this.broker.broadcast("LS_INCREMENTAL_IMPORT_ERROR", {
message: error.message || "Unknown error during incremental import",