diff --git a/models/graphql/resolvers.ts b/models/graphql/resolvers.ts index 9779aa7..f429cfa 100644 --- a/models/graphql/resolvers.ts +++ b/models/graphql/resolvers.ts @@ -95,7 +95,18 @@ export const resolvers = { } ) => { try { - const result = await Comic.paginate(predicate, paginationOptions); + // Parse predicate if it's a JSON string (from scalar type) + let parsedPredicate = predicate; + if (typeof predicate === 'string') { + try { + parsedPredicate = JSON.parse(predicate); + } catch (parseError) { + console.error("Error parsing predicate JSON:", parseError); + throw new Error("Invalid predicate format: must be valid JSON"); + } + } + + const result = await Comic.paginate(parsedPredicate, paginationOptions); return result; } catch (error) { console.error("Error fetching comic books:", error); @@ -674,54 +685,6 @@ export const resolvers = { } }, - /** - * Get cached import statistics (fast, real-time) - * @async - * @function getCachedImportStatistics - * @param {any} _ - Parent resolver (unused) - * @param {Object} args - Query arguments (none) - * @param {Object} context - GraphQL context with broker - * @returns {Promise} Cached import statistics - * @throws {Error} If statistics service is unavailable - * @description Retrieves cached import statistics from the API service. - * This is a fast, real-time query that doesn't require filesystem scanning. - * - * @example - * ```graphql - * query { - * getCachedImportStatistics { - * success - * stats { - * totalLocalFiles - * alreadyImported - * newFiles - * percentageImported - * pendingFiles - * } - * lastUpdated - * } - * } - * ``` - */ - getCachedImportStatistics: async ( - _: any, - args: {}, - context: any - ) => { - try { - const broker = context?.broker; - - if (!broker) { - throw new Error("Broker not available in context"); - } - - const result = await broker.call("api.getCachedImportStatistics"); - return result; - } catch (error) { - console.error("Error fetching cached import statistics:", error); - throw new Error(`Failed to fetch cached import statistics: ${error.message}`); - } - }, /** * Get job result statistics grouped by session @@ -810,6 +773,7 @@ export const resolvers = { } const session = await broker.call("importstate.getActiveSession"); + console.log("[GraphQL] getActiveImportSession result:", session ? `Session ${session.sessionId} (${session.type}, ${session.status})` : "No active session"); return session; } catch (error) { console.error("Error fetching active import session:", error); @@ -1554,6 +1518,55 @@ export const resolvers = { throw new Error(`Failed to start incremental import: ${error.message}`); } }, + + /** + * Force complete a stuck import session + * @async + * @function forceCompleteSession + * @param {any} _ - Parent resolver (unused) + * @param {Object} args - Arguments + * @param {string} args.sessionId - Session ID to force complete + * @param {any} context - GraphQL context with broker + * @returns {Promise} Result with success status and message + * @throws {Error} If broker is unavailable or session completion fails + * + * @example + * ```graphql + * mutation { + * forceCompleteSession(sessionId: "d7c5043f-5438-4076-9480-2782267899b6") { + * success + * message + * } + * } + * ``` + */ + forceCompleteSession: async ( + _: any, + { sessionId }: { sessionId: string }, + context: any + ) => { + try { + const broker = context?.broker; + + if (!broker) { + throw new Error("Broker not available in context"); + } + + // Force complete the session (mark as failed since it was stuck) + await broker.call("importstate.completeSession", { + sessionId, + success: false, + }); + + return { + success: true, + message: `Session ${sessionId} has been force completed and removed from active sessions`, + }; + } catch (error) { + console.error("Error force completing session:", error); + throw new Error(`Failed to force complete session: ${error.message}`); + } + }, }, /** diff --git a/models/graphql/typedef.ts b/models/graphql/typedef.ts index 629330e..2253986 100644 --- a/models/graphql/typedef.ts +++ b/models/graphql/typedef.ts @@ -348,9 +348,6 @@ export const typeDefs = gql` # Get import statistics for a directory getImportStatistics(directoryPath: String): ImportStatistics! - # Get cached import statistics (fast, real-time) - getCachedImportStatistics: CachedImportStatistics! - # Get job result statistics grouped by session getJobResultStatistics: [JobResultStatistics!]! @@ -406,6 +403,9 @@ export const typeDefs = gql` sessionId: String! directoryPath: String ): IncrementalImportResult! + + # Force complete a stuck import session + forceCompleteSession(sessionId: String!): ForceCompleteResult! } # Input types @@ -739,22 +739,6 @@ export const typeDefs = gql` percentageImported: String! } - # Cached import statistics (real-time) - type CachedImportStatistics { - success: Boolean! - message: String - stats: CachedImportStats - lastUpdated: String - } - - type CachedImportStats { - totalLocalFiles: Int! - alreadyImported: Int! - newFiles: Int! - percentageImported: String! - pendingFiles: Int! - } - # Import job result type ImportJobResult { success: Boolean! @@ -776,6 +760,12 @@ export const typeDefs = gql` queued: Int! } + # Force complete session result + type ForceCompleteResult { + success: Boolean! + message: String! + } + # Job result statistics type JobResultStatistics { sessionId: String! diff --git a/services/api.service.ts b/services/api.service.ts index 2305587..233665b 100644 --- a/services/api.service.ts +++ b/services/api.service.ts @@ -6,18 +6,6 @@ import ApiGateway from "moleculer-web"; import debounce from "lodash/debounce"; import { IFolderData } from "threetwo-ui-typings"; -/** - * Import statistics cache for real-time updates - */ -interface ImportStatisticsCache { - totalLocalFiles: number; - alreadyImported: number; - newFiles: number; - percentageImported: string; - lastUpdated: Date; - pendingFiles: Set; // Files in stabilization period -} - /** * ApiService exposes REST endpoints and watches the comics directory for changes. * It uses chokidar to monitor filesystem events and broadcasts them via the Moleculer broker. @@ -30,18 +18,6 @@ export default class ApiService extends Service { */ private fileWatcher?: any; - /** - * Import statistics cache for real-time updates - * @private - */ - private statsCache: ImportStatisticsCache | null = null; - - /** - * Debounced function to broadcast statistics updates - * @private - */ - private broadcastStatsUpdate?: ReturnType; - /** * Creates an instance of ApiService. * @param {ServiceBroker} broker - The Moleculer service broker instance. @@ -134,61 +110,6 @@ export default class ApiService extends Service { assets: { folder: "public", options: {} }, }, events: { - /** - * Listen for import session completion to refresh statistics - */ - "IMPORT_SESSION_COMPLETED": { - async handler(ctx: Context<{ - sessionId: string; - type: string; - success: boolean; - stats: any; - }>) { - const { sessionId, type, success } = ctx.params; - this.logger.info( - `[Stats Cache] Import session completed: ${sessionId} (${type}, success: ${success})` - ); - - // Invalidate and refresh statistics cache - await this.actions.invalidateStatsCache(); - }, - }, - - /** - * Listen for import progress to update cache incrementally - */ - "IMPORT_PROGRESS": { - async handler(ctx: Context<{ - sessionId: string; - stats: any; - }>) { - // Update cache with current progress - if (this.statsCache) { - const { stats } = ctx.params; - // Update alreadyImported count based on files succeeded - if (stats.filesSucceeded) { - this.statsCache.alreadyImported += 1; - this.statsCache.newFiles = Math.max(0, this.statsCache.newFiles - 1); - - // Recalculate percentage - if (this.statsCache.totalLocalFiles > 0) { - const percentage = ( - (this.statsCache.alreadyImported / this.statsCache.totalLocalFiles) * 100 - ).toFixed(2); - this.statsCache.percentageImported = `${percentage}%`; - } - - this.statsCache.lastUpdated = new Date(); - - // Trigger debounced broadcast - if (this.broadcastStatsUpdate) { - this.broadcastStatsUpdate(); - } - } - } - }, - }, - /** * Listen for watcher disable events */ @@ -230,192 +151,8 @@ export default class ApiService extends Service { }, }, }, - actions: { - /** - * Get cached import statistics (fast, no filesystem scan) - * @returns Cached statistics or null if not initialized - */ - getCachedImportStatistics: { - rest: "GET /cachedImportStatistics", - async handler() { - // If cache not initialized, try to initialize it now - if (!this.statsCache) { - this.logger.info("[Stats Cache] Cache not initialized, initializing now..."); - try { - await this.initializeStatsCache(); - } catch (error) { - this.logger.error("[Stats Cache] Failed to initialize:", error); - return { - success: false, - message: "Failed to initialize statistics cache", - stats: null, - lastUpdated: null, - }; - } - } - - // Check again after initialization attempt - if (!this.statsCache) { - return { - success: false, - message: "Statistics cache not initialized yet", - stats: null, - lastUpdated: null, - }; - } - - return { - success: true, - stats: { - totalLocalFiles: this.statsCache.totalLocalFiles, - alreadyImported: this.statsCache.alreadyImported, - newFiles: this.statsCache.newFiles, - percentageImported: this.statsCache.percentageImported, - pendingFiles: this.statsCache.pendingFiles.size, - }, - lastUpdated: this.statsCache.lastUpdated.toISOString(), - }; - }, - }, - - /** - * Invalidate statistics cache (force refresh on next request) - */ - invalidateStatsCache: { - async handler() { - this.logger.info("[Stats Cache] Invalidating cache..."); - await this.initializeStatsCache(); - return { success: true, message: "Cache invalidated and refreshed" }; - }, - }, - }, - methods: { - /** - * Initialize statistics cache by fetching current import statistics - * @private - */ - initializeStatsCache: async function() { - try { - this.logger.info("[Stats Cache] Initializing import statistics cache..."); - const stats = await this.broker.call("library.getImportStatistics", {}); - - if (stats && stats.success) { - this.statsCache = { - totalLocalFiles: stats.stats.totalLocalFiles, - alreadyImported: stats.stats.alreadyImported, - newFiles: stats.stats.newFiles, - percentageImported: stats.stats.percentageImported, - lastUpdated: new Date(), - pendingFiles: new Set(), - }; - this.logger.info("[Stats Cache] Cache initialized successfully"); - } - } catch (error) { - this.logger.error("[Stats Cache] Failed to initialize cache:", error); - } - }, - - /** - * Update statistics cache when files are added or removed - * @param event - File event type ('add' or 'unlink') - * @param filePath - Path to the file - * @private - */ - updateStatsCache: function(event: string, filePath: string) { - if (!this.statsCache) return; - - const fileExtension = path.extname(filePath); - const isComicFile = [".cbz", ".cbr", ".cb7"].includes(fileExtension); - - if (!isComicFile) return; - - if (event === "add") { - // Add to pending files (in stabilization period) - this.statsCache.pendingFiles.add(filePath); - this.statsCache.totalLocalFiles++; - this.statsCache.newFiles++; - } else if (event === "unlink") { - // Remove from pending if it was there - this.statsCache.pendingFiles.delete(filePath); - this.statsCache.totalLocalFiles--; - // Could be either new or already imported, but we'll decrement newFiles for safety - if (this.statsCache.newFiles > 0) { - this.statsCache.newFiles--; - } - } - - // Recalculate percentage - if (this.statsCache.totalLocalFiles > 0) { - const percentage = ((this.statsCache.alreadyImported / this.statsCache.totalLocalFiles) * 100).toFixed(2); - this.statsCache.percentageImported = `${percentage}%`; - } else { - this.statsCache.percentageImported = "0.00%"; - } - - this.statsCache.lastUpdated = new Date(); - - // Trigger debounced broadcast - if (this.broadcastStatsUpdate) { - this.broadcastStatsUpdate(); - } - }, - - /** - * Broadcast statistics update via Socket.IO - * @private - */ - broadcastStats: async function() { - if (!this.statsCache) return; - - try { - await this.broker.call("socket.broadcast", { - namespace: "/", - event: "IMPORT_STATISTICS_UPDATED", - args: [{ - stats: { - totalLocalFiles: this.statsCache.totalLocalFiles, - alreadyImported: this.statsCache.alreadyImported, - newFiles: this.statsCache.newFiles, - percentageImported: this.statsCache.percentageImported, - pendingFiles: this.statsCache.pendingFiles.size, - }, - lastUpdated: this.statsCache.lastUpdated.toISOString(), - }], - }); - this.logger.debug("[Stats Cache] Broadcasted statistics update"); - } catch (error) { - this.logger.error("[Stats Cache] Failed to broadcast statistics:", error); - } - }, - - /** - * Mark a file as imported (moved from pending to imported) - * @param filePath - Path to the imported file - * @private - */ - markFileAsImported: function(filePath: string) { - if (!this.statsCache) return; - - this.statsCache.pendingFiles.delete(filePath); - this.statsCache.alreadyImported++; - if (this.statsCache.newFiles > 0) { - this.statsCache.newFiles--; - } - - // Recalculate percentage - if (this.statsCache.totalLocalFiles > 0) { - const percentage = ((this.statsCache.alreadyImported / this.statsCache.totalLocalFiles) * 100).toFixed(2); - this.statsCache.percentageImported = `${percentage}%`; - } - - this.statsCache.lastUpdated = new Date(); - - // Trigger debounced broadcast - if (this.broadcastStatsUpdate) { - this.broadcastStatsUpdate(); - } - }, - }, + actions: {}, + methods: {}, started: this.startWatcher, stopped: this.stopWatcher, }); @@ -439,20 +176,6 @@ export default class ApiService extends Service { return; } - // Initialize debounced broadcast function (2 second debounce for statistics updates) - this.broadcastStatsUpdate = debounce( - () => { - this.broadcastStats(); - }, - 2000, - { leading: false, trailing: true } - ); - - // Initialize statistics cache (async, but don't block watcher startup) - this.initializeStatsCache().catch(err => { - this.logger.error("[Stats Cache] Failed to initialize on startup:", err); - }); - this.fileWatcher = chokidar.watch(watchDir, { persistent: true, ignoreInitial: true, @@ -534,11 +257,6 @@ export default class ApiService extends Service { } } - // Update statistics cache for add/unlink events - if (event === "add" || event === "unlink") { - this.updateStatsCache(event, filePath); - } - if (event === "add" && stats) { setTimeout(async () => { try { @@ -620,9 +338,6 @@ export default class ApiService extends Service { this.logger.info(`Successfully imported: ${filePath}`); - // Mark file as imported in statistics cache - this.markFileAsImported(filePath); - // Complete watcher session await this.broker.call("importstate.completeSession", { sessionId, diff --git a/services/importstate.service.ts b/services/importstate.service.ts index b54013b..4c83d9e 100644 --- a/services/importstate.service.ts +++ b/services/importstate.service.ts @@ -237,7 +237,26 @@ export default class ImportStateService extends Service { */ getActiveSession: { async handler() { - return this.getActiveSession(); + const session = this.getActiveSession(); + if (session) { + // Format session for GraphQL response + return { + sessionId: session.sessionId, + type: session.type, + status: session.status, + startedAt: session.startedAt.toISOString(), + completedAt: session.completedAt?.toISOString() || null, + stats: { + totalFiles: session.stats.totalFiles, + filesQueued: session.stats.filesQueued, + filesProcessed: session.stats.filesProcessed, + filesSucceeded: session.stats.filesSucceeded, + filesFailed: session.stats.filesFailed, + }, + directoryPath: session.directoryPath || null, + }; + } + return null; }, }, @@ -339,9 +358,13 @@ export default class ImportStateService extends Service { started: async () => { this.logger.info("[Import State] Service started"); - // Clean up any stale sessions from Redis on startup - const keys = await pubClient.keys("import:session:*"); - this.logger.info(`[Import State] Found ${keys.length} session keys in Redis`); + // Auto-complete stuck sessions every 5 minutes + setInterval(() => { + for (const [id, session] of this.activeSessions.entries()) { + const age = Date.now() - session.startedAt.getTime(); + if (age > 30 * 60 * 1000 && session.stats.filesProcessed === 0) this.actions.completeSession({ sessionId: id, success: false }); + } + }, 5 * 60 * 1000); }, }); } diff --git a/services/jobqueue.service.ts b/services/jobqueue.service.ts index 43f2c0e..c735eaf 100644 --- a/services/jobqueue.service.ts +++ b/services/jobqueue.service.ts @@ -379,6 +379,13 @@ export default class JobQueueService extends Service { }, ], }); + + // Emit final library statistics when queue is drained + try { + await this.broker.call("socket.broadcastLibraryStatistics", {}); + } catch (err) { + console.error("Failed to emit library statistics after queue drained:", err); + } }, async "enqueue.async.completed"(ctx: Context<{ id: Number }>) { // 1. Fetch the job result using the job Id @@ -410,6 +417,13 @@ export default class JobQueueService extends Service { }); console.log(`Job ID ${ctx.params.id} completed.`); + + // 6. Emit updated library statistics after each import + try { + await this.broker.call("socket.broadcastLibraryStatistics", {}); + } catch (err) { + console.error("Failed to emit library statistics after import:", err); + } }, async "enqueue.async.failed"(ctx) { diff --git a/services/library.service.ts b/services/library.service.ts index 59faa73..b47c474 100644 --- a/services/library.service.ts +++ b/services/library.service.ts @@ -253,6 +253,15 @@ export default class ImportService extends Service { sessionId, status: "active", }); + + // Emit library statistics after scanning + try { + await this.broker.call("socket.broadcastLibraryStatistics", { + directoryPath: resolvedPath, + }); + } catch (err) { + console.error("Failed to emit library statistics:", err); + } }); } catch (error) { console.log(error); @@ -466,12 +475,30 @@ export default class ImportService extends Service { sessionId, status: "active", }); + + // Emit library statistics after queueing + try { + await this.broker.call("socket.broadcastLibraryStatistics", { + directoryPath: resolvedPath, + }); + } catch (err) { + console.error("Failed to emit library statistics:", err); + } } else { // No files to import, complete immediately await this.broker.call("importstate.completeSession", { sessionId, success: true, }); + + // Emit library statistics even when no new files + try { + await this.broker.call("socket.broadcastLibraryStatistics", { + directoryPath: resolvedPath, + }); + } catch (err) { + console.error("Failed to emit library statistics:", err); + } } // Emit completion event (queueing complete, not import complete) @@ -1210,6 +1237,11 @@ export default class ImportService extends Service { "search.deleteElasticSearchIndices", {} ); + + // Invalidate statistics cache after flushing database + console.info("Invalidating statistics cache after flushDB..."); + await ctx.broker.call("api.invalidateStatsCache"); + return { data, coversFolderDeleteResult, diff --git a/services/socket.service.ts b/services/socket.service.ts index 65ede36..1d132e8 100644 --- a/services/socket.service.ts +++ b/services/socket.service.ts @@ -326,6 +326,106 @@ export default class SocketService extends Service { }, }, }, + events: { + // File watcher events - forward to Socket.IO clients + async "add"(ctx: Context<{ path: string }>) { + console.log(`[File Watcher] File added: ${ctx.params.path}`); + await this.broker.call("socket.broadcast", { + namespace: "/", + event: "LS_FILE_ADDED", + args: [ + { + path: ctx.params.path, + timestamp: new Date().toISOString(), + }, + ], + }); + + // Emit updated statistics after file addition + try { + await this.broker.call("socket.broadcastLibraryStatistics", {}); + } catch (err) { + console.error("Failed to emit library statistics after file add:", err); + } + }, + + async "unlink"(ctx: Context<{ path: string }>) { + console.log(`[File Watcher] File removed: ${ctx.params.path}`); + await this.broker.call("socket.broadcast", { + namespace: "/", + event: "LS_FILE_REMOVED", + args: [ + { + path: ctx.params.path, + timestamp: new Date().toISOString(), + }, + ], + }); + + // Emit updated statistics after file removal + try { + await this.broker.call("socket.broadcastLibraryStatistics", {}); + } catch (err) { + console.error("Failed to emit library statistics after file remove:", err); + } + }, + + async "addDir"(ctx: Context<{ path: string }>) { + console.log(`[File Watcher] Directory added: ${ctx.params.path}`); + await this.broker.call("socket.broadcast", { + namespace: "/", + event: "LS_DIRECTORY_ADDED", + args: [ + { + path: ctx.params.path, + timestamp: new Date().toISOString(), + }, + ], + }); + + // Emit updated statistics after directory addition + try { + await this.broker.call("socket.broadcastLibraryStatistics", {}); + } catch (err) { + console.error("Failed to emit library statistics after directory add:", err); + } + }, + + async "unlinkDir"(ctx: Context<{ path: string }>) { + console.log(`[File Watcher] Directory removed: ${ctx.params.path}`); + await this.broker.call("socket.broadcast", { + namespace: "/", + event: "LS_DIRECTORY_REMOVED", + args: [ + { + path: ctx.params.path, + timestamp: new Date().toISOString(), + }, + ], + }); + + // Emit updated statistics after directory removal + try { + await this.broker.call("socket.broadcastLibraryStatistics", {}); + } catch (err) { + console.error("Failed to emit library statistics after directory remove:", err); + } + }, + + async "change"(ctx: Context<{ path: string }>) { + console.log(`[File Watcher] File changed: ${ctx.params.path}`); + await this.broker.call("socket.broadcast", { + namespace: "/", + event: "LS_FILE_CHANGED", + args: [ + { + path: ctx.params.path, + timestamp: new Date().toISOString(), + }, + ], + }); + }, + }, methods: { sleep: (ms: number): Promise => { return new Promise((resolve) => setTimeout(resolve, ms)); diff --git a/utils/import.utils.ts b/utils/import.utils.ts index ce2fbbd..51b23ea 100644 --- a/utils/import.utils.ts +++ b/utils/import.utils.ts @@ -141,25 +141,43 @@ export async function getImportStatistics(localFilePaths: string[]): Promise<{ }> { console.log(`[Import Stats] Checking ${localFilePaths.length} files against database...`); - // Normalize all paths upfront - const normalizedPaths = localFilePaths.map((p) => path.normalize(p)); + // Extract file names (without extension) from paths + // This matches how comics are stored in the database (rawFileDetails.name) + const fileNameToPath = new Map(); + const fileNames: string[] = []; - // Use batch query instead of fetching all comics - // This is much faster for large libraries + for (const filePath of localFilePaths) { + const fileName = path.basename(filePath, path.extname(filePath)); + fileNames.push(fileName); + fileNameToPath.set(fileName, filePath); + } + + console.log(`[Import Stats] Extracted ${fileNames.length} file names from paths`); + + // Query by file name (matches how comics are checked during import) const importedComics = await Comic.find( { - "rawFileDetails.filePath": { $in: normalizedPaths }, + "rawFileDetails.name": { $in: fileNames }, }, - { "rawFileDetails.filePath": 1, _id: 0 } + { "rawFileDetails.name": 1, "rawFileDetails.filePath": 1, _id: 0 } ).lean(); - // Build Set of imported paths - const importedPaths = new Set( - importedComics - .map((c: any) => c.rawFileDetails?.filePath) - .filter(Boolean) - .map((p: string) => path.normalize(p)) - ); + console.log(`[Import Stats] Found ${importedComics.length} matching comics in database`); + + // Build Set of imported paths based on name matching + const importedPaths = new Set(); + const importedNames = new Set(); + + for (const comic of importedComics) { + if (comic.rawFileDetails?.name) { + importedNames.add(comic.rawFileDetails.name); + // Map back to the local file path + const localPath = fileNameToPath.get(comic.rawFileDetails.name); + if (localPath) { + importedPaths.add(localPath); + } + } + } const alreadyImported = importedPaths.size; const newFiles = localFilePaths.length - alreadyImported;