From 8c224bad68d85e119b6e7e9d1b69584f6cba5228 Mon Sep 17 00:00:00 2001
From: Rishi Ghan
Date: Fri, 27 Feb 2026 00:00:46 -0500
Subject: [PATCH] =?UTF-8?q?=F0=9F=94=A8=20Fixed=20import=20flow?=
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

---
 dependencies.docker-compose.yml  |   2 +-
 migrations/add-import-indexes.ts | 137 +++++++++++++++++++
 models/comic.model.ts            |   5 +
 moleculer.config.ts              |   1 -
 package.json                     |   3 +-
 services/library.service.ts      | 217 ++++++++++++++++++++++++++++++-
 6 files changed, 360 insertions(+), 5 deletions(-)
 create mode 100644 migrations/add-import-indexes.ts

diff --git a/dependencies.docker-compose.yml b/dependencies.docker-compose.yml
index 785d5ff..3a30f72 100644
--- a/dependencies.docker-compose.yml
+++ b/dependencies.docker-compose.yml
@@ -55,7 +55,7 @@ services:
       - "27017:27017"
     volumes:
       - "mongodb_data:/bitnami/mongodb"
-    
+
   redis:
     image: "bitnami/redis:latest"
     container_name: queue
diff --git a/migrations/add-import-indexes.ts b/migrations/add-import-indexes.ts
new file mode 100644
index 0000000..fc09a49
--- /dev/null
+++ b/migrations/add-import-indexes.ts
@@ -0,0 +1,137 @@
+/**
+ * Migration script to add indexes for import performance optimization
+ *
+ * This migration adds indexes to the Comic collection to dramatically improve
+ * the performance of import statistics queries, especially for large libraries.
+ *
+ * Run this script once to add indexes to an existing database:
+ * npx ts-node migrations/add-import-indexes.ts
+ */
+
+import mongoose from "mongoose";
+import Comic from "../models/comic.model";
+
+// Suppress Mongoose 7 deprecation warning
+mongoose.set('strictQuery', false);
+
+const MONGO_URI = process.env.MONGO_URI || "mongodb://localhost:27017/threetwo";
+
+async function addIndexes() {
+	try {
+		console.log("Connecting to MongoDB...");
+		await mongoose.connect(MONGO_URI);
+		console.log("Connected successfully");
+
+		console.log("\nAdding indexes to Comic collection...");
+
+		// Get the collection
+		const collection = Comic.collection;
+
+		// Check existing indexes
+		console.log("\nExisting indexes:");
+		const existingIndexes = await collection.indexes();
+		const existingIndexMap = new Map();
+
+		existingIndexes.forEach((index) => {
+			const keyStr = JSON.stringify(index.key);
+			console.log(` - ${keyStr} (name: ${index.name})`);
+			existingIndexMap.set(keyStr, index.name);
+		});
+
+		// Helper function to create index if it doesn't exist
+		async function createIndexIfNeeded(
+			key: any,
+			options: any,
+			description: string
+		) {
+			const keyStr = JSON.stringify(key);
+
+			if (existingIndexMap.has(keyStr)) {
+				console.log(` ⏭️ Index on ${description} already exists (${existingIndexMap.get(keyStr)})`);
+				return;
+			}
+
+			console.log(` Creating index on ${description}...`);
+			try {
+				await collection.createIndex(key, options);
+				console.log(" ✓ Created");
+			} catch (error: any) {
+				// If index already exists with different name, that's okay
+				if (error.code === 85 || error.codeName === 'IndexOptionsConflict') {
+					console.log(` ⏭️ Index already exists (skipping)`);
+				} else {
+					throw error;
+				}
+			}
+		}
+
+		// Add new indexes
+		console.log("\nCreating new indexes...");
+
+		// Index for import statistics queries (most important)
+		await createIndexIfNeeded(
+			{ "rawFileDetails.filePath": 1 },
+			{
+				name: "rawFileDetails.filePath_1", // MongoDB default name, i.e. what ComicSchema.index() builds
+				background: true // Create in background to avoid blocking
+			},
+			"rawFileDetails.filePath"
+		);
+
+		// Index for duplicate detection
+		await createIndexIfNeeded(
+			{ "rawFileDetails.name": 1 },
+			{
+				name: "rawFileDetails.name_1", // MongoDB default name, i.e. what ComicSchema.index() builds
+				background: true
+			},
+			"rawFileDetails.name"
+		);
+
+		// Index for wanted comics queries
+		await createIndexIfNeeded(
+			{ "wanted.volume.id": 1 },
+			{
+				name: "wanted.volume.id_1", // MongoDB default name, i.e. what ComicSchema.index() builds
+				background: true,
+				sparse: true // Only index documents that have this field
+			},
+			"wanted.volume.id"
+		);
+
+		// Verify indexes were created
+		console.log("\nFinal indexes:");
+		const finalIndexes = await collection.indexes();
+		finalIndexes.forEach((index) => {
+			console.log(` - ${JSON.stringify(index.key)} (name: ${index.name})`);
+		});
+
+		console.log("\n✅ Migration completed successfully!");
+		console.log("\nPerformance improvements:");
+		console.log(" - Import statistics queries should be 10-100x faster");
+		console.log(" - Large libraries (10,000+ comics) will see the most benefit");
+		console.log(" - Timeout errors should be eliminated");
+
+	} catch (error) {
+		console.error("\n❌ Migration failed:", error);
+		throw error;
+	} finally {
+		await mongoose.disconnect();
+		console.log("\nDisconnected from MongoDB");
+	}
+}
+
+// Run the migration
+if (require.main === module) {
+	addIndexes()
+		.then(() => {
+			console.log("\nMigration script completed");
+			process.exit(0);
+		})
+		.catch((error) => {
+			console.error("\nMigration script failed:", error);
+			process.exit(1);
+		});
+}
+
+export default addIndexes;
diff --git a/models/comic.model.ts b/models/comic.model.ts
index d10fd29..f26d4f4 100644
--- a/models/comic.model.ts
+++ b/models/comic.model.ts
@@ -387,5 +387,10 @@ ComicSchema.plugin(mongoosastic, {
 } as MongoosasticPluginOpts);
 ComicSchema.plugin(paginate);
 
+// Add indexes for performance
+ComicSchema.index({ "rawFileDetails.filePath": 1 }); // For import statistics queries
+ComicSchema.index({ "rawFileDetails.name": 1 }); // For duplicate detection
+ComicSchema.index({ "wanted.volume.id": 1 }, { sparse: true }); // For wanted comics queries (sparse, as in migrations/add-import-indexes.ts)
+
 const Comic = mongoose.model("Comic", ComicSchema);
 export default Comic;
diff --git a/moleculer.config.ts b/moleculer.config.ts
index 059c768..a1f6de1 100644
--- a/moleculer.config.ts
+++ b/moleculer.config.ts
@@ -102,7 +102,6 @@ const brokerConfig: BrokerOptions = {
 	serializer: "JSON",
 
 	// Number of milliseconds to wait before reject a request with a RequestTimeout error. Disabled: 0
-	// Increased to 60 seconds to handle long-running operations like import statistics on large libraries
 	requestTimeout: 60 * 1000,
 
 	// Retry policy settings. More info: https://moleculer.services/docs/0.14/fault-tolerance.html#Retry
diff --git a/package.json b/package.json
index 1f9feee..01c7585 100644
--- a/package.json
+++ b/package.json
@@ -12,7 +12,8 @@
     "lint": "eslint --ext .js,.ts .",
     "dc:up": "docker-compose up --build -d",
     "dc:logs": "docker-compose logs -f",
-    "dc:down": "docker-compose down"
+    "dc:down": "docker-compose down",
+    "migrate:indexes": "ts-node migrations/add-import-indexes.ts"
   },
   "keywords": [
     "microservices",
diff --git a/services/library.service.ts b/services/library.service.ts
index 1910080..0e732d7 100644
--- a/services/library.service.ts
+++ b/services/library.service.ts
@@ -59,6 +59,7 @@ import path from "path";
 import { COMICS_DIRECTORY, USERDATA_DIRECTORY } from "../constants/directories";
 import AirDCPPSocket from "../shared/airdcpp.socket";
 import { importComicViaGraphQL } from "../utils/import.graphql.utils";
+import { getImportStatistics as getImportStats } from "../utils/import.utils";
 
 console.log(`MONGO -> ${process.env.MONGO_URI}`);
 export default class ImportService extends Service {
@@ -86,7 +87,7 @@ export default class ImportService extends Service {
 					async handler(
 						ctx: Context<{
 							basePathToWalk: string;
-							extensions: string[];
+							extensions?: string[];
 						}>
 					) {
 						console.log(ctx.params);
@@ -94,7 +95,7 @@ export default class ImportService extends Service {
 							".cbz",
 							".cbr",
 							".cb7",
-							...ctx.params.extensions,
+							...(ctx.params.extensions || []),
 						]);
 					},
 				},
@@ -245,6 +246,218 @@ export default class ImportService extends Service {
 						}
 					},
 				},
+				getImportStatistics: {
+					rest: "POST /getImportStatistics",
+					timeout: 300000, // 5 minute timeout for large libraries
+					async handler(
+						ctx: Context<{
+							directoryPath?: string;
+						}>
+					) {
+						try {
+							const { directoryPath } = ctx.params;
+							const resolvedPath = path.resolve(directoryPath || COMICS_DIRECTORY);
+							console.log(`[Import Statistics] Analyzing directory: ${resolvedPath}`);
+
+							// Collect all comic files from the directory
+							const localFiles: string[] = [];
+
+							await new Promise<void>((resolve, reject) => { // <void> so the zero-argument resolve() type-checks
+								klaw(resolvedPath)
+									.on("error", (err) => {
+										console.error(`Error walking directory ${resolvedPath}:`, err);
+										reject(err);
+									})
+									.pipe(
+										through2.obj(function (item, enc, next) {
+											const fileExtension = path.extname(item.path);
+											if ([".cbz", ".cbr", ".cb7"].includes(fileExtension)) {
+												localFiles.push(item.path);
+											}
+											next();
+										})
+									)
+									.on("data", () => {}) // Required for stream to work
+									.on("end", () => {
+										console.log(`[Import Statistics] Found ${localFiles.length} comic files`);
+										resolve();
+									});
+							});
+
+							// Get statistics by comparing with database
+							const stats = await getImportStats(localFiles);
+							const percentageImported = stats.total > 0
+								? ((stats.alreadyImported / stats.total) * 100).toFixed(2)
+								: "0.00";
+
+							return {
+								success: true,
+								directory: resolvedPath,
+								stats: {
+									totalLocalFiles: stats.total,
+									alreadyImported: stats.alreadyImported,
+									newFiles: stats.newFiles,
+									percentageImported: `${percentageImported}%`,
+								},
+							};
+						} catch (error) {
+							console.error("[Import Statistics] Error:", error);
+							throw new Errors.MoleculerError(
+								"Failed to calculate import statistics",
+								500,
+								"IMPORT_STATS_ERROR",
+								{ error: error.message }
+							);
+						}
+					},
+				},
+				incrementalImport: {
+					rest: "POST /incrementalImport",
+					timeout: 60000, // 60 second timeout
+					async handler(
+						ctx: Context<{
+							sessionId: string;
+							directoryPath?: string;
+						}>
+					) {
+						try {
+							const { sessionId, directoryPath } = ctx.params;
+							const resolvedPath = path.resolve(directoryPath || COMICS_DIRECTORY);
+							console.log(`[Incremental Import] Starting for directory: ${resolvedPath}`);
+
+							// Emit start event
+							this.broker.broadcast("LS_INCREMENTAL_IMPORT_STARTED", {
+								message: "Starting incremental import analysis...",
+								directory: resolvedPath,
+							});
+
+							// Step 1: Fetch imported files from database
+							this.broker.broadcast("LS_INCREMENTAL_IMPORT_PROGRESS", {
+								message: "Fetching imported files from database...",
+							});
+
+							const importedFileNames = new Set<string>();
+							const comics = await Comic.find(
+								{ "rawFileDetails.name": { $exists: true, $ne: null } },
+								{ "rawFileDetails.name": 1, _id: 0 }
+							).lean();
+
+							for (const comic of comics) {
+								if (comic.rawFileDetails?.name) {
+									importedFileNames.add(comic.rawFileDetails.name);
+								}
+							}
+
+							console.log(`[Incremental Import] Found ${importedFileNames.size} imported files in database`);
+
+							// Step 2: Scan directory for comic files
+							this.broker.broadcast("LS_INCREMENTAL_IMPORT_PROGRESS", {
+								message: "Scanning directory for comic files...",
+							});
+
+							const localFiles: Array<{ path: string; name: string; size: number }> = [];
+
+							await new Promise<void>((resolve, reject) => { // <void> so the zero-argument resolve() type-checks
+								klaw(resolvedPath)
+									.on("error", (err) => {
+										console.error(`Error walking directory ${resolvedPath}:`, err);
+										reject(err);
+									})
+									.pipe(
+										through2.obj(function (item, enc, next) {
+											const fileExtension = path.extname(item.path);
+											if ([".cbz", ".cbr", ".cb7"].includes(fileExtension)) {
+												const fileName = path.basename(item.path, fileExtension);
+												localFiles.push({
+													path: item.path,
+													name: fileName,
+													size: item.stats.size,
+												});
+											}
+											next();
+										})
+									)
+									.on("data", () => {}) // Required for stream to work
+									.on("end", () => {
+										console.log(`[Incremental Import] Found ${localFiles.length} comic files in directory`);
+										resolve();
+									});
+							});
+
+							// Step 3: Filter to only new files
+							this.broker.broadcast("LS_INCREMENTAL_IMPORT_PROGRESS", {
+								message: `Found ${localFiles.length} comic files, filtering...`,
+							});
+
+							const newFiles = localFiles.filter(file => !importedFileNames.has(file.name));
+
+							console.log(`[Incremental Import] ${newFiles.length} new files to import`);
+
+							// Step 4: Reset job counters and queue new files
+							if (newFiles.length > 0) {
+								this.broker.broadcast("LS_INCREMENTAL_IMPORT_PROGRESS", {
+									message: `Queueing ${newFiles.length} new files for import...`,
+								});
+
+								// Reset counters once at the start
+								await pubClient.set("completedJobCount", 0);
+								await pubClient.set("failedJobCount", 0);
+								console.log("[Incremental Import] Job counters reset");
+
+								// Queue all new files
+								for (const file of newFiles) {
+									await this.broker.call("jobqueue.enqueue", {
+										fileObject: {
+											filePath: file.path,
+											fileSize: file.size,
+										},
+										sessionId,
+										importType: "new",
+										sourcedFrom: "library",
+										action: "enqueue.async",
+									});
+								}
+							}
+
+							// Emit completion event
+							this.broker.broadcast("LS_INCREMENTAL_IMPORT_COMPLETE", {
+								message: `Successfully queued ${newFiles.length} files for import`,
+								stats: {
+									total: localFiles.length,
+									alreadyImported: localFiles.length - newFiles.length,
+									newFiles: newFiles.length,
+									queued: newFiles.length,
+								},
+							});
+
+							return {
+								success: true,
+								message: `Incremental import complete: ${newFiles.length} new files queued`,
+								stats: {
+									total: localFiles.length,
+									alreadyImported: localFiles.length - newFiles.length,
+									newFiles: newFiles.length,
+									queued: newFiles.length,
+								},
+							};
+						} catch (error) {
+							console.error("[Incremental Import] Error:", error);
+
+							// Emit error event
+							this.broker.broadcast("LS_INCREMENTAL_IMPORT_ERROR", {
+								message: error.message || "Unknown error during incremental import",
+								error: error,
+							});
+
+							throw new Errors.MoleculerError(
+								"Failed to perform incremental import",
+								500,
+								"INCREMENTAL_IMPORT_ERROR",
+								{ error: error.message }
+							);
+						}
+					},
+				},
 				rawImportToDB: {
 					rest: "POST /rawImportToDB",
 					params: {},