From 8138e0fe4fe6ad289c377836fb9df1416375eccd Mon Sep 17 00:00:00 2001 From: Rishi Ghan Date: Thu, 5 Mar 2026 15:05:12 -0500 Subject: [PATCH] =?UTF-8?q?=F0=9F=93=9A=20Import=20stats=20hardening?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- models/graphql/resolvers.ts | 187 ++++++++++++++++- models/graphql/typedef.ts | 22 ++ services/api.service.ts | 209 ++++++++++++++++--- services/importstate.service.ts | 348 ++++++++++++++++++++++++++++++++ services/library.service.ts | 144 +++++++++---- 5 files changed, 836 insertions(+), 74 deletions(-) create mode 100644 services/importstate.service.ts diff --git a/models/graphql/resolvers.ts b/models/graphql/resolvers.ts index 195b9c3..9779aa7 100644 --- a/models/graphql/resolvers.ts +++ b/models/graphql/resolvers.ts @@ -766,6 +766,56 @@ export const resolvers = { throw new Error(`Failed to fetch job result statistics: ${error.message}`); } }, + + /** + * Get active import session (if any) + * @async + * @function getActiveImportSession + * @param {any} _ - Parent resolver (unused) + * @param {Object} args - Query arguments (none) + * @param {Object} context - GraphQL context with broker + * @returns {Promise} Active import session or null + * @throws {Error} If import state service is unavailable + * @description Retrieves the currently active import session (if any). + * Useful for checking if an import is in progress before starting a new one. + * + * @example + * ```graphql + * query { + * getActiveImportSession { + * sessionId + * type + * status + * startedAt + * stats { + * totalFiles + * filesProcessed + * filesSucceeded + * filesFailed + * } + * } + * } + * ``` + */ + getActiveImportSession: async ( + _: any, + args: {}, + context: any + ) => { + try { + const broker = context?.broker; + + if (!broker) { + throw new Error("Broker not available in context"); + } + + const session = await broker.call("importstate.getActiveSession"); + return session; + } catch (error) { + console.error("Error fetching active import session:", error); + throw new Error(`Failed to fetch active import session: ${error.message}`); + } + }, }, Mutation: { @@ -1373,12 +1423,143 @@ export const resolvers = { throw new Error(`Failed to update sourced metadata: ${error.message}`); } }, + + /** + * Start a new full import of the comics directory + * @async + * @function startNewImport + * @param {any} _ - Parent resolver (unused) + * @param {Object} args - Mutation arguments + * @param {string} args.sessionId - Session ID for tracking this import batch + * @param {Object} context - GraphQL context with broker + * @returns {Promise} Import job result with success status and jobs queued count + * @throws {Error} If import service is unavailable or import fails + * @description Starts a full import of all comics in the comics directory. + * Scans the entire directory and queues jobs for all comic files that haven't + * been imported yet. Checks for active import sessions to prevent race conditions. + * + * @example + * ```graphql + * mutation { + * startNewImport(sessionId: "import-2024-01-01") { + * success + * message + * jobsQueued + * } + * } + * ``` + */ + startNewImport: async ( + _: any, + { sessionId }: { sessionId: string }, + context: any + ) => { + try { + const broker = context?.broker; + + if (!broker) { + throw new Error("Broker not available in context"); + } + + // Check for active import sessions (race condition prevention) + const activeSession = await broker.call("importstate.getActiveSession"); + if (activeSession) { + throw new Error( + `Cannot start new import: Another import session "${activeSession.sessionId}" is already active (${activeSession.type}). Please wait for it to complete.` + ); + } + + // Call the library service to start new import + await broker.call("library.newImport", { + sessionId, + }); + + return { + success: true, + message: "New import started successfully", + jobsQueued: 0, // The actual count is tracked asynchronously + }; + } catch (error) { + console.error("Error starting new import:", error); + throw new Error(`Failed to start new import: ${error.message}`); + } + }, + + /** + * Start an incremental import (only new files) + * @async + * @function startIncrementalImport + * @param {any} _ - Parent resolver (unused) + * @param {Object} args - Mutation arguments + * @param {string} args.sessionId - Session ID for tracking this import batch + * @param {string} [args.directoryPath] - Optional directory path to scan (defaults to COMICS_DIRECTORY) + * @param {Object} context - GraphQL context with broker + * @returns {Promise} Incremental import result with statistics + * @throws {Error} If import service is unavailable or import fails + * @description Starts an incremental import that only processes new files + * not already in the database. More efficient than full import for large libraries. + * Checks for active import sessions to prevent race conditions. + * + * @example + * ```graphql + * mutation { + * startIncrementalImport( + * sessionId: "incremental-2024-01-01" + * directoryPath: "/path/to/comics" + * ) { + * success + * message + * stats { + * total + * alreadyImported + * newFiles + * queued + * } + * } + * } + * ``` + */ + startIncrementalImport: async ( + _: any, + { + sessionId, + directoryPath, + }: { sessionId: string; directoryPath?: string }, + context: any + ) => { + try { + const broker = context?.broker; + + if (!broker) { + throw new Error("Broker not available in context"); + } + + // Check for active import sessions (race condition prevention) + const activeSession = await broker.call("importstate.getActiveSession"); + if (activeSession) { + throw new Error( + `Cannot start incremental import: Another import session "${activeSession.sessionId}" is already active (${activeSession.type}). Please wait for it to complete.` + ); + } + + // Call the library service to start incremental import + const result = await broker.call("library.incrementalImport", { + sessionId, + directoryPath, + }); + + return result; + } catch (error) { + console.error("Error starting incremental import:", error); + throw new Error(`Failed to start incremental import: ${error.message}`); + } + }, }, /** - * Field resolvers for Comic type - * @description Custom field resolvers for transforming Comic data - */ + * Field resolvers for Comic type + * @description Custom field resolvers for transforming Comic data + */ Comic: { /** * Resolve Comic ID field diff --git a/models/graphql/typedef.ts b/models/graphql/typedef.ts index 091f491..629330e 100644 --- a/models/graphql/typedef.ts +++ b/models/graphql/typedef.ts @@ -353,6 +353,9 @@ export const typeDefs = gql` # Get job result statistics grouped by session getJobResultStatistics: [JobResultStatistics!]! + + # Get active import session (if any) + getActiveImportSession: ImportSession } # Mutations @@ -780,4 +783,23 @@ export const typeDefs = gql` failedJobs: Int! earliestTimestamp: String! } + + # Import session information + type ImportSession { + sessionId: String! + type: String! + status: String! + startedAt: String! + completedAt: String + stats: ImportSessionStats! + directoryPath: String + } + + type ImportSessionStats { + totalFiles: Int! + filesQueued: Int! + filesProcessed: Int! + filesSucceeded: Int! + filesFailed: Int! + } `; diff --git a/services/api.service.ts b/services/api.service.ts index 76f8fa2..2305587 100644 --- a/services/api.service.ts +++ b/services/api.service.ts @@ -1,7 +1,7 @@ import chokidar, { FSWatcher } from "chokidar"; import fs from "fs"; import path from "path"; -import { Service, ServiceBroker, ServiceSchema } from "moleculer"; +import { Service, ServiceBroker, ServiceSchema, Context } from "moleculer"; import ApiGateway from "moleculer-web"; import debounce from "lodash/debounce"; import { IFolderData } from "threetwo-ui-typings"; @@ -133,7 +133,103 @@ export default class ApiService extends Service { logResponseData: true, assets: { folder: "public", options: {} }, }, - events: {}, + events: { + /** + * Listen for import session completion to refresh statistics + */ + "IMPORT_SESSION_COMPLETED": { + async handler(ctx: Context<{ + sessionId: string; + type: string; + success: boolean; + stats: any; + }>) { + const { sessionId, type, success } = ctx.params; + this.logger.info( + `[Stats Cache] Import session completed: ${sessionId} (${type}, success: ${success})` + ); + + // Invalidate and refresh statistics cache + await this.actions.invalidateStatsCache(); + }, + }, + + /** + * Listen for import progress to update cache incrementally + */ + "IMPORT_PROGRESS": { + async handler(ctx: Context<{ + sessionId: string; + stats: any; + }>) { + // Update cache with current progress + if (this.statsCache) { + const { stats } = ctx.params; + // Update alreadyImported count based on files succeeded + if (stats.filesSucceeded) { + this.statsCache.alreadyImported += 1; + this.statsCache.newFiles = Math.max(0, this.statsCache.newFiles - 1); + + // Recalculate percentage + if (this.statsCache.totalLocalFiles > 0) { + const percentage = ( + (this.statsCache.alreadyImported / this.statsCache.totalLocalFiles) * 100 + ).toFixed(2); + this.statsCache.percentageImported = `${percentage}%`; + } + + this.statsCache.lastUpdated = new Date(); + + // Trigger debounced broadcast + if (this.broadcastStatsUpdate) { + this.broadcastStatsUpdate(); + } + } + } + }, + }, + + /** + * Listen for watcher disable events + */ + "IMPORT_WATCHER_DISABLED": { + async handler(ctx: Context<{ reason: string; sessionId: string }>) { + const { reason, sessionId } = ctx.params; + this.logger.info(`[Watcher] Disabled: ${reason} (session: ${sessionId})`); + + // Broadcast to frontend + await this.broker.call("socket.broadcast", { + namespace: "/", + event: "IMPORT_WATCHER_STATUS", + args: [{ + enabled: false, + reason, + sessionId, + }], + }); + }, + }, + + /** + * Listen for watcher enable events + */ + "IMPORT_WATCHER_ENABLED": { + async handler(ctx: Context<{ sessionId: string }>) { + const { sessionId } = ctx.params; + this.logger.info(`[Watcher] Re-enabled after session: ${sessionId}`); + + // Broadcast to frontend + await this.broker.call("socket.broadcast", { + namespace: "/", + event: "IMPORT_WATCHER_STATUS", + args: [{ + enabled: true, + sessionId, + }], + }); + }, + }, + }, actions: { /** * Get cached import statistics (fast, no filesystem scan) @@ -421,30 +517,57 @@ export default class ApiService extends Service { * @private */ private async handleFileEvent( - event: string, - filePath: string, - stats?: fs.Stats + event: string, + filePath: string, + stats?: fs.Stats ): Promise { - this.logger.info(`File event [${event}]: ${filePath}`); - - // Update statistics cache for add/unlink events - if (event === "add" || event === "unlink") { - this.updateStatsCache(event, filePath); - } - - if (event === "add" && stats) { + this.logger.info(`File event [${event}]: ${filePath}`); + + // Check if watcher should process files (not during manual imports) + if (event === "add") { + const watcherState: any = await this.broker.call("importstate.isWatcherEnabled"); + if (!watcherState.enabled) { + this.logger.info( + `[Watcher] Skipping file ${filePath} - manual import in progress (${watcherState.activeSession?.sessionId})` + ); + return; + } + } + + // Update statistics cache for add/unlink events + if (event === "add" || event === "unlink") { + this.updateStatsCache(event, filePath); + } + + if (event === "add" && stats) { setTimeout(async () => { - try { - const newStats = await fs.promises.stat(filePath); - if (newStats.mtime.getTime() === stats.mtime.getTime()) { - this.logger.info(`Stable file detected: ${filePath}, importing.`); - - const folderData: IFolderData[] = await this.broker.call( - "library.walkFolders", - { basePathToWalk: filePath } - ); - - if (folderData && folderData.length > 0) { + try { + // Double-check watcher is still enabled + const watcherState: any = await this.broker.call("importstate.isWatcherEnabled"); + if (!watcherState.enabled) { + this.logger.info( + `[Watcher] Skipping delayed import for ${filePath} - manual import started` + ); + return; + } + + const newStats = await fs.promises.stat(filePath); + if (newStats.mtime.getTime() === stats.mtime.getTime()) { + this.logger.info(`Stable file detected: ${filePath}, importing.`); + + // Create a watcher session for this file + const sessionId = `watcher-${Date.now()}`; + await this.broker.call("importstate.startSession", { + sessionId, + type: "watcher", + }); + + const folderData: IFolderData[] = await this.broker.call( + "library.walkFolders", + { basePathToWalk: filePath } + ); + + if (folderData && folderData.length > 0) { const fileData = folderData[0]; const fileName = path.basename(filePath, path.extname(filePath)); const extension = path.extname(filePath); @@ -490,20 +613,42 @@ export default class ApiService extends Service { }; // Call the library service to import the comic - await this.broker.call("library.rawImportToDB", { - importType: "new", - payload: payload, + const result: any = await this.broker.call("library.rawImportToDB", { + importType: "new", + payload: payload, }); - this.logger.info(`Successfully queued import for: ${filePath}`); + this.logger.info(`Successfully imported: ${filePath}`); // Mark file as imported in statistics cache this.markFileAsImported(filePath); + + // Complete watcher session + await this.broker.call("importstate.completeSession", { + sessionId, + success: result.success, + }); + } else { + // Complete session even if no folder data + await this.broker.call("importstate.completeSession", { + sessionId, + success: false, + }); + } } - } - } catch (error) { - this.logger.error(`Error importing file ${filePath}:`, error); - } + } catch (error) { + this.logger.error(`Error importing file ${filePath}:`, error); + // Try to complete session on error + try { + const sessionId = `watcher-${Date.now()}`; + await this.broker.call("importstate.completeSession", { + sessionId, + success: false, + }); + } catch (e) { + // Ignore session completion errors + } + } }, 3000); } diff --git a/services/importstate.service.ts b/services/importstate.service.ts new file mode 100644 index 0000000..b54013b --- /dev/null +++ b/services/importstate.service.ts @@ -0,0 +1,348 @@ +/** + * Import State Management Service + * + * Centralized service for tracking import sessions, preventing race conditions, + * and coordinating between file watcher, manual imports, and statistics updates. + */ + +import { Service, ServiceBroker, Context } from "moleculer"; +import { pubClient } from "../config/redis.config"; + +/** + * Import session state + */ +interface ImportSession { + sessionId: string; + type: "full" | "incremental" | "watcher"; + status: "starting" | "scanning" | "queueing" | "active" | "completed" | "failed"; + startedAt: Date; + completedAt?: Date; + stats: { + totalFiles: number; + filesQueued: number; + filesProcessed: number; + filesSucceeded: number; + filesFailed: number; + }; + directoryPath?: string; +} + +export default class ImportStateService extends Service { + private activeSessions: Map = new Map(); + private watcherEnabled: boolean = true; + + public constructor(broker: ServiceBroker) { + super(broker); + this.parseServiceSchema({ + name: "importstate", + actions: { + /** + * Start a new import session + */ + startSession: { + params: { + sessionId: "string", + type: { type: "enum", values: ["full", "incremental", "watcher"] }, + directoryPath: { type: "string", optional: true }, + }, + async handler(ctx: Context<{ + sessionId: string; + type: "full" | "incremental" | "watcher"; + directoryPath?: string; + }>) { + const { sessionId, type, directoryPath } = ctx.params; + + // Check for active sessions (prevent race conditions) + const activeSession = this.getActiveSession(); + if (activeSession && type !== "watcher") { + throw new Error( + `Cannot start ${type} import: Another import session "${activeSession.sessionId}" is already active (${activeSession.type})` + ); + } + + // If starting manual import, temporarily disable watcher + if (type !== "watcher") { + this.logger.info(`[Import State] Disabling watcher for ${type} import`); + this.watcherEnabled = false; + await this.broker.broadcast("IMPORT_WATCHER_DISABLED", { + reason: `${type} import started`, + sessionId, + }); + } + + const session: ImportSession = { + sessionId, + type, + status: "starting", + startedAt: new Date(), + stats: { + totalFiles: 0, + filesQueued: 0, + filesProcessed: 0, + filesSucceeded: 0, + filesFailed: 0, + }, + directoryPath, + }; + + this.activeSessions.set(sessionId, session); + this.logger.info(`[Import State] Started session: ${sessionId} (${type})`); + + // Broadcast session started + await this.broker.broadcast("IMPORT_SESSION_STARTED", { + sessionId, + type, + startedAt: session.startedAt, + }); + + // Store in Redis for persistence + await pubClient.set( + `import:session:${sessionId}`, + JSON.stringify(session), + { EX: 86400 } // 24 hour expiry + ); + + return session; + }, + }, + + /** + * Update session status + */ + updateSession: { + params: { + sessionId: "string", + status: { + type: "enum", + values: ["starting", "scanning", "queueing", "active", "completed", "failed"], + optional: true, + }, + stats: { type: "object", optional: true }, + }, + async handler(ctx: Context<{ + sessionId: string; + status?: ImportSession["status"]; + stats?: Partial; + }>) { + const { sessionId, status, stats } = ctx.params; + const session = this.activeSessions.get(sessionId); + + if (!session) { + throw new Error(`Session not found: ${sessionId}`); + } + + if (status) { + session.status = status; + } + + if (stats) { + session.stats = { ...session.stats, ...stats }; + } + + // Update Redis + await pubClient.set( + `import:session:${sessionId}`, + JSON.stringify(session), + { EX: 86400 } + ); + + // Broadcast update + await this.broker.broadcast("IMPORT_SESSION_UPDATED", { + sessionId, + status: session.status, + stats: session.stats, + }); + + return session; + }, + }, + + /** + * Complete a session + */ + completeSession: { + params: { + sessionId: "string", + success: "boolean", + }, + async handler(ctx: Context<{ + sessionId: string; + success: boolean; + }>) { + const { sessionId, success } = ctx.params; + const session = this.activeSessions.get(sessionId); + + if (!session) { + this.logger.warn(`[Import State] Session not found: ${sessionId}`); + return null; + } + + session.status = success ? "completed" : "failed"; + session.completedAt = new Date(); + + this.logger.info( + `[Import State] Completed session: ${sessionId} (${session.status})` + ); + + // Re-enable watcher if this was a manual import + if (session.type !== "watcher") { + this.watcherEnabled = true; + this.logger.info("[Import State] Re-enabling watcher"); + await this.broker.broadcast("IMPORT_WATCHER_ENABLED", { + sessionId, + }); + } + + // Broadcast completion + await this.broker.broadcast("IMPORT_SESSION_COMPLETED", { + sessionId, + type: session.type, + success, + stats: session.stats, + duration: session.completedAt.getTime() - session.startedAt.getTime(), + }); + + // Update Redis with final state + await pubClient.set( + `import:session:${sessionId}:final`, + JSON.stringify(session), + { EX: 604800 } // 7 day expiry for completed sessions + ); + + // Remove from active sessions + this.activeSessions.delete(sessionId); + + // Trigger statistics refresh + await this.broker.call("api.invalidateStatsCache"); + + return session; + }, + }, + + /** + * Get current session + */ + getSession: { + params: { + sessionId: "string", + }, + async handler(ctx: Context<{ sessionId: string }>) { + const { sessionId } = ctx.params; + return this.activeSessions.get(sessionId) || null; + }, + }, + + /** + * Get active session (if any) + */ + getActiveSession: { + async handler() { + return this.getActiveSession(); + }, + }, + + /** + * Check if watcher should process files + */ + isWatcherEnabled: { + async handler() { + return { + enabled: this.watcherEnabled, + activeSession: this.getActiveSession(), + }; + }, + }, + + /** + * Increment file processed counter + */ + incrementProcessed: { + params: { + sessionId: "string", + success: "boolean", + }, + async handler(ctx: Context<{ + sessionId: string; + success: boolean; + }>) { + const { sessionId, success } = ctx.params; + const session = this.activeSessions.get(sessionId); + + if (!session) { + return null; + } + + session.stats.filesProcessed++; + if (success) { + session.stats.filesSucceeded++; + } else { + session.stats.filesFailed++; + } + + // Update Redis + await pubClient.set( + `import:session:${sessionId}`, + JSON.stringify(session), + { EX: 86400 } + ); + + // Broadcast progress update + await this.broker.broadcast("IMPORT_PROGRESS", { + sessionId, + stats: session.stats, + }); + + return session.stats; + }, + }, + + /** + * Get all active sessions + */ + getAllActiveSessions: { + async handler() { + return Array.from(this.activeSessions.values()); + }, + }, + }, + + methods: { + /** + * Get the currently active session (non-watcher) + */ + getActiveSession(): ImportSession | null { + for (const session of this.activeSessions.values()) { + if ( + session.type !== "watcher" && + ["starting", "scanning", "queueing", "active"].includes(session.status) + ) { + return session; + } + } + return null; + }, + }, + + events: { + /** + * Listen for job completion events from jobqueue + */ + "JOB_COMPLETED": { + async handler(ctx: Context<{ sessionId?: string; success: boolean }>) { + const { sessionId, success } = ctx.params; + if (sessionId) { + await this.actions.incrementProcessed({ sessionId, success }); + } + }, + }, + }, + + started: async () => { + this.logger.info("[Import State] Service started"); + // Clean up any stale sessions from Redis on startup + const keys = await pubClient.keys("import:session:*"); + this.logger.info(`[Import State] Found ${keys.length} session keys in Redis`); + }, + }); + } +} diff --git a/services/library.service.ts b/services/library.service.ts index ac97920..59faa73 100644 --- a/services/library.service.ts +++ b/services/library.service.ts @@ -177,7 +177,21 @@ export default class ImportService extends Service { // Get params to be passed to the import jobs const { sessionId } = ctx.params; const resolvedPath = path.resolve(COMICS_DIRECTORY); + + // Start import session + await this.broker.call("importstate.startSession", { + sessionId, + type: "full", + directoryPath: resolvedPath, + }); + console.log(`Walking comics directory: ${resolvedPath}`); + + // Update session status + await this.broker.call("importstate.updateSession", { + sessionId, + status: "scanning", + }); // 1. Walk the Source folder klaw(resolvedPath) .on("error", (err) => { @@ -186,15 +200,18 @@ export default class ImportService extends Service { // 1.1 Filter on .cb* extensions .pipe( through2.obj(function (item, enc, next) { - let fileExtension = path.extname( - item.path - ); - if ( - [".cbz", ".cbr", ".cb7"].includes( - fileExtension - ) - ) { - this.push(item); + // Only process files, not directories + if (item.stats.isFile()) { + let fileExtension = path.extname( + item.path + ); + if ( + [".cbz", ".cbr", ".cb7"].includes( + fileExtension + ) + ) { + this.push(item); + } } next(); }) @@ -213,16 +230,7 @@ export default class ImportService extends Service { )}`, }); if (!comicExists) { - // 2.1 Reset the job counters in Redis - await pubClient.set( - "completedJobCount", - 0 - ); - await pubClient.set( - "failedJobCount", - 0 - ); - // 2.2 Send the extraction job to the queue + // Send the extraction job to the queue this.broker.call("jobqueue.enqueue", { fileObject: { filePath: item.path, @@ -238,11 +246,24 @@ export default class ImportService extends Service { ); } }) - .on("end", () => { + .on("end", async () => { console.log("All files traversed."); + // Update session to active (jobs are now being processed) + await this.broker.call("importstate.updateSession", { + sessionId, + status: "active", + }); }); } catch (error) { console.log(error); + // Mark session as failed + const { sessionId } = ctx.params; + if (sessionId) { + await this.broker.call("importstate.completeSession", { + sessionId, + success: false, + }); + } } }, }, @@ -270,9 +291,12 @@ export default class ImportService extends Service { }) .pipe( through2.obj(function (item, enc, next) { - const fileExtension = path.extname(item.path); - if ([".cbz", ".cbr", ".cb7"].includes(fileExtension)) { - localFiles.push(item.path); + // Only process files, not directories + if (item.stats.isFile()) { + const fileExtension = path.extname(item.path); + if ([".cbz", ".cbr", ".cb7"].includes(fileExtension)) { + localFiles.push(item.path); + } } next(); }) @@ -325,6 +349,13 @@ export default class ImportService extends Service { const resolvedPath = path.resolve(directoryPath || COMICS_DIRECTORY); console.log(`[Incremental Import] Starting for directory: ${resolvedPath}`); + // Start import session + await this.broker.call("importstate.startSession", { + sessionId, + type: "incremental", + directoryPath: resolvedPath, + }); + // Emit start event this.broker.broadcast("LS_INCREMENTAL_IMPORT_STARTED", { message: "Starting incremental import analysis...", @@ -332,6 +363,11 @@ export default class ImportService extends Service { }); // Step 1: Fetch imported files from database + await this.broker.call("importstate.updateSession", { + sessionId, + status: "scanning", + }); + this.broker.broadcast("LS_INCREMENTAL_IMPORT_PROGRESS", { message: "Fetching imported files from database...", }); @@ -365,14 +401,17 @@ export default class ImportService extends Service { }) .pipe( through2.obj(function (item, enc, next) { - const fileExtension = path.extname(item.path); - if ([".cbz", ".cbr", ".cb7"].includes(fileExtension)) { - const fileName = path.basename(item.path, fileExtension); - localFiles.push({ - path: item.path, - name: fileName, - size: item.stats.size, - }); + // Only process files, not directories + if (item.stats.isFile()) { + const fileExtension = path.extname(item.path); + if ([".cbz", ".cbr", ".cb7"].includes(fileExtension)) { + const fileName = path.basename(item.path, fileExtension); + localFiles.push({ + path: item.path, + name: fileName, + size: item.stats.size, + }); + } } next(); }) @@ -393,17 +432,21 @@ export default class ImportService extends Service { console.log(`[Incremental Import] ${newFiles.length} new files to import`); - // Step 4: Reset job counters and queue new files + // Step 4: Queue new files if (newFiles.length > 0) { + await this.broker.call("importstate.updateSession", { + sessionId, + status: "queueing", + stats: { + totalFiles: localFiles.length, + filesQueued: newFiles.length, + }, + }); + this.broker.broadcast("LS_INCREMENTAL_IMPORT_PROGRESS", { message: `Queueing ${newFiles.length} new files for import...`, }); - // Reset counters once at the start - await pubClient.set("completedJobCount", 0); - await pubClient.set("failedJobCount", 0); - console.log("[Incremental Import] Job counters reset"); - // Queue all new files for (const file of newFiles) { await this.broker.call("jobqueue.enqueue", { @@ -417,9 +460,21 @@ export default class ImportService extends Service { action: "enqueue.async", }); } + + // Update session to active + await this.broker.call("importstate.updateSession", { + sessionId, + status: "active", + }); + } else { + // No files to import, complete immediately + await this.broker.call("importstate.completeSession", { + sessionId, + success: true, + }); } - // Emit completion event + // Emit completion event (queueing complete, not import complete) this.broker.broadcast("LS_INCREMENTAL_IMPORT_COMPLETE", { message: `Successfully queued ${newFiles.length} files for import`, stats: { @@ -432,7 +487,9 @@ export default class ImportService extends Service { return { success: true, - message: `Incremental import complete: ${newFiles.length} new files queued`, + message: newFiles.length > 0 + ? `Incremental import started: ${newFiles.length} new files queued` + : "No new files to import", stats: { total: localFiles.length, alreadyImported: localFiles.length - newFiles.length, @@ -443,6 +500,15 @@ export default class ImportService extends Service { } catch (error) { console.error("[Incremental Import] Error:", error); + // Mark session as failed + const { sessionId } = ctx.params; + if (sessionId) { + await this.broker.call("importstate.completeSession", { + sessionId, + success: false, + }); + } + // Emit error event this.broker.broadcast("LS_INCREMENTAL_IMPORT_ERROR", { message: error.message || "Unknown error during incremental import",