diff --git a/examples/frontend/graphql-queries/importQueries.ts b/examples/frontend/graphql-queries/importQueries.ts new file mode 100644 index 0000000..09865cf --- /dev/null +++ b/examples/frontend/graphql-queries/importQueries.ts @@ -0,0 +1,117 @@ +/** + * GraphQL queries and mutations for import operations + * @module examples/frontend/graphql-queries/importQueries + */ + +/** + * Query to get import statistics for a directory + * Shows how many files are already imported vs. new files + */ +export const GET_IMPORT_STATISTICS = ` + query GetImportStatistics($directoryPath: String) { + getImportStatistics(directoryPath: $directoryPath) { + success + directory + stats { + totalLocalFiles + alreadyImported + newFiles + percentageImported + } + } + } +`; + +/** + * Mutation to start a new full import + * Imports all comics in the directory, skipping already imported files + */ +export const START_NEW_IMPORT = ` + mutation StartNewImport($sessionId: String!) { + startNewImport(sessionId: $sessionId) { + success + message + jobsQueued + } + } +`; + +/** + * Mutation to start an incremental import + * Only imports new files not already in the database + */ +export const START_INCREMENTAL_IMPORT = ` + mutation StartIncrementalImport($sessionId: String!, $directoryPath: String) { + startIncrementalImport(sessionId: $sessionId, directoryPath: $directoryPath) { + success + message + stats { + total + alreadyImported + newFiles + queued + } + } + } +`; + +/** + * Example usage with variables + */ +export const exampleUsage = { + // Get import statistics + getStatistics: { + query: GET_IMPORT_STATISTICS, + variables: { + directoryPath: "/comics", // Optional, defaults to COMICS_DIRECTORY + }, + }, + + // Start new full import + startNewImport: { + query: START_NEW_IMPORT, + variables: { + sessionId: `import-${Date.now()}`, + }, + }, + + // Start incremental import + startIncrementalImport: { + query: START_INCREMENTAL_IMPORT, + variables: { + sessionId: `incremental-${Date.now()}`, + directoryPath: "/comics", // Optional + }, + }, +}; + +/** + * TypeScript types for the responses + */ +export interface ImportStatistics { + success: boolean; + directory: string; + stats: { + totalLocalFiles: number; + alreadyImported: number; + newFiles: number; + percentageImported: string; + }; +} + +export interface ImportJobResult { + success: boolean; + message: string; + jobsQueued: number; +} + +export interface IncrementalImportResult { + success: boolean; + message: string; + stats: { + total: number; + alreadyImported: number; + newFiles: number; + queued: number; + }; +} diff --git a/models/graphql/resolvers.ts b/models/graphql/resolvers.ts index 61f25e5..f0973dd 100644 --- a/models/graphql/resolvers.ts +++ b/models/graphql/resolvers.ts @@ -621,6 +621,99 @@ export const resolvers = { throw new Error("Failed to preview canonical metadata"); } }, + + /** + * Get cached import statistics (fast, real-time) + * @async + * @function getCachedImportStatistics + * @param {any} _ - Parent resolver (unused) + * @param {Object} args - Query arguments (none) + * @param {Object} context - GraphQL context with broker + * @returns {Promise} Cached import statistics + * @throws {Error} If statistics service is unavailable + * @description Retrieves cached import statistics from the API service. + * This is a fast, real-time query that doesn't require filesystem scanning. + * + * @example + * ```graphql + * query { + * getCachedImportStatistics { + * success + * stats { + * totalLocalFiles + * alreadyImported + * newFiles + * percentageImported + * pendingFiles + * } + * lastUpdated + * } + * } + * ``` + */ + getCachedImportStatistics: async ( + _: any, + args: {}, + context: any + ) => { + try { + const broker = context?.broker; + + if (!broker) { + throw new Error("Broker not available in context"); + } + + const result = await broker.call("api.getCachedImportStatistics"); + return result; + } catch (error) { + console.error("Error fetching cached import statistics:", error); + throw new Error(`Failed to fetch cached import statistics: ${error.message}`); + } + }, + + /** + * Get job result statistics grouped by session + * @async + * @function getJobResultStatistics + * @param {any} _ - Parent resolver (unused) + * @param {Object} args - Query arguments (none) + * @param {Object} context - GraphQL context with broker + * @returns {Promise} Array of job result statistics by session + * @throws {Error} If job queue service is unavailable + * @description Retrieves job result statistics grouped by session ID, + * including counts of completed and failed jobs and earliest timestamp. + * + * @example + * ```graphql + * query { + * getJobResultStatistics { + * sessionId + * completedJobs + * failedJobs + * earliestTimestamp + * } + * } + * ``` + */ + getJobResultStatistics: async ( + _: any, + args: {}, + context: any + ) => { + try { + const broker = context?.broker; + + if (!broker) { + throw new Error("Broker not available in context"); + } + + const result = await broker.call("jobqueue.getJobResultStatistics"); + return result; + } catch (error) { + console.error("Error fetching job result statistics:", error); + throw new Error(`Failed to fetch job result statistics: ${error.message}`); + } + }, }, Mutation: { diff --git a/models/graphql/typedef.ts b/models/graphql/typedef.ts index 2071cac..091f491 100644 --- a/models/graphql/typedef.ts +++ b/models/graphql/typedef.ts @@ -344,6 +344,15 @@ export const typeDefs = gql` comicId: ID! preferences: UserPreferencesInput ): CanonicalMetadata + + # Get import statistics for a directory + getImportStatistics(directoryPath: String): ImportStatistics! + + # Get cached import statistics (fast, real-time) + getCachedImportStatistics: CachedImportStatistics! + + # Get job result statistics grouped by session + getJobResultStatistics: [JobResultStatistics!]! } # Mutations @@ -385,6 +394,15 @@ export const typeDefs = gql` source: MetadataSource! metadata: String! ): Comic! + + # Start a new full import of the comics directory + startNewImport(sessionId: String!): ImportJobResult! + + # Start an incremental import (only new files) + startIncrementalImport( + sessionId: String! + directoryPath: String + ): IncrementalImportResult! } # Input types @@ -703,4 +721,63 @@ export const typeDefs = gql` _score: Float _source: Comic! } + + # Import statistics + type ImportStatistics { + success: Boolean! + directory: String! + stats: ImportStats! + } + + type ImportStats { + totalLocalFiles: Int! + alreadyImported: Int! + newFiles: Int! + percentageImported: String! + } + + # Cached import statistics (real-time) + type CachedImportStatistics { + success: Boolean! + message: String + stats: CachedImportStats + lastUpdated: String + } + + type CachedImportStats { + totalLocalFiles: Int! + alreadyImported: Int! + newFiles: Int! + percentageImported: String! + pendingFiles: Int! + } + + # Import job result + type ImportJobResult { + success: Boolean! + message: String! + jobsQueued: Int! + } + + # Incremental import result + type IncrementalImportResult { + success: Boolean! + message: String! + stats: IncrementalImportStats! + } + + type IncrementalImportStats { + total: Int! + alreadyImported: Int! + newFiles: Int! + queued: Int! + } + + # Job result statistics + type JobResultStatistics { + sessionId: String! + completedJobs: Int! + failedJobs: Int! + earliestTimestamp: String! + } `; diff --git a/services/api.service.ts b/services/api.service.ts index da1a0b6..60c8133 100644 --- a/services/api.service.ts +++ b/services/api.service.ts @@ -6,6 +6,18 @@ import ApiGateway from "moleculer-web"; import debounce from "lodash/debounce"; import { IFolderData } from "threetwo-ui-typings"; +/** + * Import statistics cache for real-time updates + */ +interface ImportStatisticsCache { + totalLocalFiles: number; + alreadyImported: number; + newFiles: number; + percentageImported: string; + lastUpdated: Date; + pendingFiles: Set; // Files in stabilization period +} + /** * ApiService exposes REST endpoints and watches the comics directory for changes. * It uses chokidar to monitor filesystem events and broadcasts them via the Moleculer broker. @@ -18,6 +30,18 @@ export default class ApiService extends Service { */ private fileWatcher?: any; + /** + * Import statistics cache for real-time updates + * @private + */ + private statsCache: ImportStatisticsCache | null = null; + + /** + * Debounced function to broadcast statistics updates + * @private + */ + private broadcastStatsUpdate?: ReturnType; + /** * Creates an instance of ApiService. * @param {ServiceBroker} broker - The Moleculer service broker instance. @@ -109,7 +133,192 @@ export default class ApiService extends Service { assets: { folder: "public", options: {} }, }, events: {}, - methods: {}, + actions: { + /** + * Get cached import statistics (fast, no filesystem scan) + * @returns Cached statistics or null if not initialized + */ + getCachedImportStatistics: { + rest: "GET /cachedImportStatistics", + async handler() { + // If cache not initialized, try to initialize it now + if (!this.statsCache) { + this.logger.info("[Stats Cache] Cache not initialized, initializing now..."); + try { + await this.initializeStatsCache(); + } catch (error) { + this.logger.error("[Stats Cache] Failed to initialize:", error); + return { + success: false, + message: "Failed to initialize statistics cache", + stats: null, + lastUpdated: null, + }; + } + } + + // Check again after initialization attempt + if (!this.statsCache) { + return { + success: false, + message: "Statistics cache not initialized yet", + stats: null, + lastUpdated: null, + }; + } + + return { + success: true, + stats: { + totalLocalFiles: this.statsCache.totalLocalFiles, + alreadyImported: this.statsCache.alreadyImported, + newFiles: this.statsCache.newFiles, + percentageImported: this.statsCache.percentageImported, + pendingFiles: this.statsCache.pendingFiles.size, + }, + lastUpdated: this.statsCache.lastUpdated.toISOString(), + }; + }, + }, + + /** + * Invalidate statistics cache (force refresh on next request) + */ + invalidateStatsCache: { + async handler() { + this.logger.info("[Stats Cache] Invalidating cache..."); + await this.initializeStatsCache(); + return { success: true, message: "Cache invalidated and refreshed" }; + }, + }, + }, + methods: { + /** + * Initialize statistics cache by fetching current import statistics + * @private + */ + initializeStatsCache: async function() { + try { + this.logger.info("[Stats Cache] Initializing import statistics cache..."); + const stats = await this.broker.call("library.getImportStatistics", {}); + + if (stats && stats.success) { + this.statsCache = { + totalLocalFiles: stats.stats.totalLocalFiles, + alreadyImported: stats.stats.alreadyImported, + newFiles: stats.stats.newFiles, + percentageImported: stats.stats.percentageImported, + lastUpdated: new Date(), + pendingFiles: new Set(), + }; + this.logger.info("[Stats Cache] Cache initialized successfully"); + } + } catch (error) { + this.logger.error("[Stats Cache] Failed to initialize cache:", error); + } + }, + + /** + * Update statistics cache when files are added or removed + * @param event - File event type ('add' or 'unlink') + * @param filePath - Path to the file + * @private + */ + updateStatsCache: function(event: string, filePath: string) { + if (!this.statsCache) return; + + const fileExtension = path.extname(filePath); + const isComicFile = [".cbz", ".cbr", ".cb7"].includes(fileExtension); + + if (!isComicFile) return; + + if (event === "add") { + // Add to pending files (in stabilization period) + this.statsCache.pendingFiles.add(filePath); + this.statsCache.totalLocalFiles++; + this.statsCache.newFiles++; + } else if (event === "unlink") { + // Remove from pending if it was there + this.statsCache.pendingFiles.delete(filePath); + this.statsCache.totalLocalFiles--; + // Could be either new or already imported, but we'll decrement newFiles for safety + if (this.statsCache.newFiles > 0) { + this.statsCache.newFiles--; + } + } + + // Recalculate percentage + if (this.statsCache.totalLocalFiles > 0) { + const percentage = ((this.statsCache.alreadyImported / this.statsCache.totalLocalFiles) * 100).toFixed(2); + this.statsCache.percentageImported = `${percentage}%`; + } else { + this.statsCache.percentageImported = "0.00%"; + } + + this.statsCache.lastUpdated = new Date(); + + // Trigger debounced broadcast + if (this.broadcastStatsUpdate) { + this.broadcastStatsUpdate(); + } + }, + + /** + * Broadcast statistics update via Socket.IO + * @private + */ + broadcastStats: async function() { + if (!this.statsCache) return; + + try { + await this.broker.call("socket.broadcast", { + namespace: "/", + event: "IMPORT_STATISTICS_UPDATED", + args: [{ + stats: { + totalLocalFiles: this.statsCache.totalLocalFiles, + alreadyImported: this.statsCache.alreadyImported, + newFiles: this.statsCache.newFiles, + percentageImported: this.statsCache.percentageImported, + pendingFiles: this.statsCache.pendingFiles.size, + }, + lastUpdated: this.statsCache.lastUpdated.toISOString(), + }], + }); + this.logger.debug("[Stats Cache] Broadcasted statistics update"); + } catch (error) { + this.logger.error("[Stats Cache] Failed to broadcast statistics:", error); + } + }, + + /** + * Mark a file as imported (moved from pending to imported) + * @param filePath - Path to the imported file + * @private + */ + markFileAsImported: function(filePath: string) { + if (!this.statsCache) return; + + this.statsCache.pendingFiles.delete(filePath); + this.statsCache.alreadyImported++; + if (this.statsCache.newFiles > 0) { + this.statsCache.newFiles--; + } + + // Recalculate percentage + if (this.statsCache.totalLocalFiles > 0) { + const percentage = ((this.statsCache.alreadyImported / this.statsCache.totalLocalFiles) * 100).toFixed(2); + this.statsCache.percentageImported = `${percentage}%`; + } + + this.statsCache.lastUpdated = new Date(); + + // Trigger debounced broadcast + if (this.broadcastStatsUpdate) { + this.broadcastStatsUpdate(); + } + }, + }, started: this.startWatcher, stopped: this.stopWatcher, }); @@ -120,7 +329,7 @@ export default class ApiService extends Service { * Debounces rapid events and logs initial scan completion. * @private */ - private startWatcher(): void { + private async startWatcher(): Promise { const rawDir = process.env.COMICS_DIRECTORY; if (!rawDir) { this.logger.error("COMICS_DIRECTORY not set; cannot start watcher"); @@ -133,6 +342,20 @@ export default class ApiService extends Service { return; } + // Initialize debounced broadcast function (2 second debounce for statistics updates) + this.broadcastStatsUpdate = debounce( + () => { + this.broadcastStats(); + }, + 2000, + { leading: false, trailing: true } + ); + + // Initialize statistics cache (async, but don't block watcher startup) + this.initializeStatsCache().catch(err => { + this.logger.error("[Stats Cache] Failed to initialize on startup:", err); + }); + this.fileWatcher = chokidar.watch(watchDir, { persistent: true, ignoreInitial: true, @@ -202,13 +425,19 @@ export default class ApiService extends Service { stats?: fs.Stats ): Promise { this.logger.info(`File event [${event}]: ${filePath}`); + + // Update statistics cache for add/unlink events + if (event === "add" || event === "unlink") { + this.updateStatsCache(event, filePath); + } + if (event === "add" && stats) { setTimeout(async () => { - const newStats = await fs.promises.stat(filePath); - if (newStats.mtime.getTime() === stats.mtime.getTime()) { - this.logger.info(`Stable file detected: ${filePath}, importing.`); - - try { + try { + const newStats = await fs.promises.stat(filePath); + if (newStats.mtime.getTime() === stats.mtime.getTime()) { + this.logger.info(`Stable file detected: ${filePath}, importing.`); + const folderData: IFolderData[] = await this.broker.call( "library.walkFolders", { basePathToWalk: filePath } @@ -266,13 +495,18 @@ export default class ApiService extends Service { }); this.logger.info(`Successfully queued import for: ${filePath}`); + + // Mark file as imported in statistics cache + this.markFileAsImported(filePath); } - } catch (error) { - this.logger.error(`Error importing file ${filePath}:`, error); } + } catch (error) { + this.logger.error(`Error importing file ${filePath}:`, error); } }, 3000); } + + // Broadcast file system event this.broker.broadcast(event, { path: filePath }); } }