🔨 Fixed import flow

This commit is contained in:
2026-02-27 00:00:46 -05:00
parent a1fa12f181
commit 8c224bad68
6 changed files with 360 additions and 5 deletions

View File

@@ -55,7 +55,7 @@ services:
- "27017:27017"
volumes:
- "mongodb_data:/bitnami/mongodb"
redis:
image: "bitnami/redis:latest"
container_name: queue

View File

@@ -0,0 +1,137 @@
/**
* Migration script to add indexes for import performance optimization
*
* This migration adds indexes to the Comic collection to dramatically improve
* the performance of import statistics queries, especially for large libraries.
*
* Run this script once to add indexes to an existing database:
* npx ts-node migrations/add-import-indexes.ts
*/
import mongoose from "mongoose";
import Comic from "../models/comic.model";
// Suppress Mongoose 7 deprecation warning
mongoose.set('strictQuery', false);
const MONGO_URI = process.env.MONGO_URI || "mongodb://localhost:27017/threetwo";
/**
 * Connects to MongoDB, ensures the three import-performance indexes exist on
 * the Comic collection, logs the before/after index listings, and always
 * disconnects (even on failure).
 *
 * Idempotent: an index is skipped when its key spec already exists locally,
 * or when the server reports an IndexOptionsConflict (code 85) for it.
 *
 * @throws rethrows any connection or index-creation error after logging it,
 *         so the CLI runner can exit non-zero.
 */
async function addIndexes() {
  try {
    console.log("Connecting to MongoDB...");
    await mongoose.connect(MONGO_URI);
    console.log("Connected successfully");
    console.log("\nAdding indexes to Comic collection...");
    // Get the collection — the raw driver collection backing the Mongoose model,
    // so we can manage indexes directly rather than through the schema.
    const collection = Comic.collection;
    // Check existing indexes
    console.log("\nExisting indexes:");
    const existingIndexes = await collection.indexes();
    // Map of JSON-serialized key spec -> index name, used for the idempotency
    // check below. JSON.stringify is order-sensitive for compound keys, but
    // every index created here is single-field, so the lookup is reliable.
    const existingIndexMap = new Map();
    existingIndexes.forEach((index) => {
      const keyStr = JSON.stringify(index.key);
      console.log(` - ${keyStr} (name: ${index.name})`);
      existingIndexMap.set(keyStr, index.name);
    });
    // Helper function to create index if it doesn't exist.
    // Falls through to the server-side conflict check in case the local map
    // missed an equivalent index created under a different name.
    async function createIndexIfNeeded(
      key: any,
      options: any,
      description: string
    ) {
      const keyStr = JSON.stringify(key);
      if (existingIndexMap.has(keyStr)) {
        console.log(` ⏭️ Index on ${description} already exists (${existingIndexMap.get(keyStr)})`);
        return;
      }
      console.log(` Creating index on ${description}...`);
      try {
        await collection.createIndex(key, options);
        console.log(" ✓ Created");
      } catch (error: any) {
        // If index already exists with different name, that's okay
        // (MongoDB error code 85 = IndexOptionsConflict).
        if (error.code === 85 || error.codeName === 'IndexOptionsConflict') {
          console.log(` ⏭️ Index already exists (skipping)`);
        } else {
          throw error;
        }
      }
    }
    // Add new indexes
    console.log("\nCreating new indexes...");
    // Index for import statistics queries (most important)
    await createIndexIfNeeded(
      { "rawFileDetails.filePath": 1 },
      {
        name: "rawFileDetails_filePath_1",
        // NOTE(review): `background: true` is deprecated and ignored since
        // MongoDB 4.2 (all index builds use the optimized build process);
        // harmless to pass, but confirm target server version.
        background: true // Create in background to avoid blocking
      },
      "rawFileDetails.filePath"
    );
    // Index for duplicate detection
    await createIndexIfNeeded(
      { "rawFileDetails.name": 1 },
      {
        name: "rawFileDetails_name_1",
        background: true
      },
      "rawFileDetails.name"
    );
    // Index for wanted comics queries
    await createIndexIfNeeded(
      { "wanted.volume.id": 1 },
      {
        name: "wanted_volume_id_1",
        background: true,
        sparse: true // Only index documents that have this field
      },
      "wanted.volume.id"
    );
    // Verify indexes were created
    console.log("\nFinal indexes:");
    const finalIndexes = await collection.indexes();
    finalIndexes.forEach((index) => {
      console.log(` - ${JSON.stringify(index.key)} (name: ${index.name})`);
    });
    console.log("\n✅ Migration completed successfully!");
    console.log("\nPerformance improvements:");
    console.log(" - Import statistics queries should be 10-100x faster");
    console.log(" - Large libraries (10,000+ comics) will see the most benefit");
    console.log(" - Timeout errors should be eliminated");
  } catch (error) {
    console.error("\n❌ Migration failed:", error);
    throw error;
  } finally {
    // Always release the connection so the process can exit cleanly.
    await mongoose.disconnect();
    console.log("\nDisconnected from MongoDB");
  }
}
// Run the migration
// Run the migration when this file is executed directly as a CLI script;
// importing the module elsewhere does not trigger it.
if (require.main === module) {
  void (async () => {
    try {
      await addIndexes();
      console.log("\nMigration script completed");
      process.exit(0);
    } catch (error) {
      console.error("\nMigration script failed:", error);
      process.exit(1);
    }
  })();
}
export default addIndexes;

View File

@@ -387,5 +387,10 @@ ComicSchema.plugin(mongoosastic, {
} as MongoosasticPluginOpts);
ComicSchema.plugin(paginate);
// Add indexes for performance. These mirror the indexes created by
// migrations/add-import-indexes.ts: schema-level declarations cover fresh
// databases (Mongoose builds them on connect when autoIndex is enabled —
// the default; TODO confirm deployment config), while the migration script
// handles existing deployments explicitly.
ComicSchema.index({ "rawFileDetails.filePath": 1 }); // For import statistics queries
ComicSchema.index({ "rawFileDetails.name": 1 }); // For duplicate detection
ComicSchema.index({ "wanted.volume.id": 1 }); // For wanted comics queries
const Comic = mongoose.model("Comic", ComicSchema);
export default Comic;

View File

@@ -102,7 +102,6 @@ const brokerConfig: BrokerOptions = {
serializer: "JSON",
// Number of milliseconds to wait before reject a request with a RequestTimeout error. Disabled: 0
// Increased to 60 seconds to handle long-running operations like import statistics on large libraries
requestTimeout: 60 * 1000,
// Retry policy settings. More info: https://moleculer.services/docs/0.14/fault-tolerance.html#Retry

View File

@@ -12,7 +12,8 @@
"lint": "eslint --ext .js,.ts .",
"dc:up": "docker-compose up --build -d",
"dc:logs": "docker-compose logs -f",
"dc:down": "docker-compose down"
"dc:down": "docker-compose down",
"migrate:indexes": "ts-node migrations/add-import-indexes.ts"
},
"keywords": [
"microservices",

View File

@@ -59,6 +59,7 @@ import path from "path";
import { COMICS_DIRECTORY, USERDATA_DIRECTORY } from "../constants/directories";
import AirDCPPSocket from "../shared/airdcpp.socket";
import { importComicViaGraphQL } from "../utils/import.graphql.utils";
import { getImportStatistics as getImportStats } from "../utils/import.utils";
console.log(`MONGO -> ${process.env.MONGO_URI}`);
export default class ImportService extends Service {
@@ -86,7 +87,7 @@ export default class ImportService extends Service {
async handler(
ctx: Context<{
basePathToWalk: string;
extensions: string[];
extensions?: string[];
}>
) {
console.log(ctx.params);
@@ -94,7 +95,7 @@ export default class ImportService extends Service {
".cbz",
".cbr",
".cb7",
...ctx.params.extensions,
...(ctx.params.extensions || []),
]);
},
},
@@ -245,6 +246,218 @@ export default class ImportService extends Service {
}
},
},
getImportStatistics: {
  rest: "POST /getImportStatistics",
  timeout: 300000, // 5 minute timeout for large libraries
  /**
   * Walks a directory tree for comic archives (.cbz/.cbr/.cb7), compares the
   * result against the database (via getImportStats), and returns counts of
   * already-imported vs. new files plus a percentage string.
   *
   * @param ctx.params.directoryPath optional; defaults to COMICS_DIRECTORY.
   * @returns { success, directory, stats: { totalLocalFiles, alreadyImported,
   *           newFiles, percentageImported } }
   * @throws MoleculerError (500, IMPORT_STATS_ERROR) on any failure.
   */
  async handler(
    ctx: Context<{
      directoryPath?: string;
    }>
  ) {
    try {
      const { directoryPath } = ctx.params;
      // Fall back to the configured comics directory when no path is supplied.
      const resolvedPath = path.resolve(directoryPath || COMICS_DIRECTORY);
      console.log(`[Import Statistics] Analyzing directory: ${resolvedPath}`);
      // Collect all comic files from the directory
      const localFiles: string[] = [];
      await new Promise<void>((resolve, reject) => {
        klaw(resolvedPath)
          .on("error", (err) => {
            console.error(`Error walking directory ${resolvedPath}:`, err);
            reject(err);
          })
          .pipe(
            through2.obj(function (item, enc, next) {
              const fileExtension = path.extname(item.path);
              if ([".cbz", ".cbr", ".cb7"].includes(fileExtension)) {
                localFiles.push(item.path);
              }
              next();
            })
          )
          // FIX: errors emitted by the transform stream previously had no
          // handler, which left this promise pending forever and raised an
          // unhandled 'error' event. Reject so the outer catch reports it.
          .on("error", reject)
          .on("data", () => {}) // Required for stream to work
          .on("end", () => {
            console.log(`[Import Statistics] Found ${localFiles.length} comic files`);
            resolve();
          });
      });
      // Get statistics by comparing with database
      const stats = await getImportStats(localFiles);
      // Guard against division by zero for an empty directory.
      const percentageImported = stats.total > 0
        ? ((stats.alreadyImported / stats.total) * 100).toFixed(2)
        : "0.00";
      return {
        success: true,
        directory: resolvedPath,
        stats: {
          totalLocalFiles: stats.total,
          alreadyImported: stats.alreadyImported,
          newFiles: stats.newFiles,
          percentageImported: `${percentageImported}%`,
        },
      };
    } catch (error) {
      console.error("[Import Statistics] Error:", error);
      throw new Errors.MoleculerError(
        "Failed to calculate import statistics",
        500,
        "IMPORT_STATS_ERROR",
        { error: error.message }
      );
    }
  },
},
incrementalImport: {
  rest: "POST /incrementalImport",
  timeout: 60000, // 60 second timeout
  /**
   * Scans a directory, diffs it against comics already present in the
   * database (matched by file name, sans extension), and enqueues only the
   * new files on the job queue. Progress is broadcast over the broker via
   * LS_INCREMENTAL_IMPORT_* events so the UI can follow along.
   *
   * @param ctx.params.sessionId  forwarded to the job queue for tracking.
   * @param ctx.params.directoryPath optional; defaults to COMICS_DIRECTORY.
   * @returns { success, message, stats: { total, alreadyImported, newFiles,
   *           queued } }
   * @throws MoleculerError (500, INCREMENTAL_IMPORT_ERROR) after broadcasting
   *         LS_INCREMENTAL_IMPORT_ERROR.
   */
  async handler(
    ctx: Context<{
      sessionId: string;
      directoryPath?: string;
    }>
  ) {
    try {
      const { sessionId, directoryPath } = ctx.params;
      const resolvedPath = path.resolve(directoryPath || COMICS_DIRECTORY);
      console.log(`[Incremental Import] Starting for directory: ${resolvedPath}`);
      // Emit start event
      this.broker.broadcast("LS_INCREMENTAL_IMPORT_STARTED", {
        message: "Starting incremental import analysis...",
        directory: resolvedPath,
      });
      // Step 1: Fetch imported files from database
      this.broker.broadcast("LS_INCREMENTAL_IMPORT_PROGRESS", {
        message: "Fetching imported files from database...",
      });
      const importedFileNames = new Set<string>();
      // Projection keeps the query light; .lean() skips Mongoose hydration.
      const comics = await Comic.find(
        { "rawFileDetails.name": { $exists: true, $ne: null } },
        { "rawFileDetails.name": 1, _id: 0 }
      ).lean();
      for (const comic of comics) {
        if (comic.rawFileDetails?.name) {
          importedFileNames.add(comic.rawFileDetails.name);
        }
      }
      console.log(`[Incremental Import] Found ${importedFileNames.size} imported files in database`);
      // Step 2: Scan directory for comic files
      this.broker.broadcast("LS_INCREMENTAL_IMPORT_PROGRESS", {
        message: "Scanning directory for comic files...",
      });
      const localFiles: Array<{ path: string; name: string; size: number }> = [];
      await new Promise<void>((resolve, reject) => {
        klaw(resolvedPath)
          .on("error", (err) => {
            console.error(`Error walking directory ${resolvedPath}:`, err);
            reject(err);
          })
          .pipe(
            through2.obj(function (item, enc, next) {
              const fileExtension = path.extname(item.path);
              if ([".cbz", ".cbr", ".cb7"].includes(fileExtension)) {
                const fileName = path.basename(item.path, fileExtension);
                localFiles.push({
                  path: item.path,
                  name: fileName,
                  size: item.stats.size,
                });
              }
              next();
            })
          )
          // FIX: errors emitted by the transform stream previously had no
          // handler, which left this promise pending forever and raised an
          // unhandled 'error' event. Reject so the outer catch reports it.
          .on("error", reject)
          .on("data", () => {}) // Required for stream to work
          .on("end", () => {
            console.log(`[Incremental Import] Found ${localFiles.length} comic files in directory`);
            resolve();
          });
      });
      // Step 3: Filter to only new files.
      // NOTE(review): matching is by base file name only — two files with the
      // same name in different directories collide; confirm this is intended.
      this.broker.broadcast("LS_INCREMENTAL_IMPORT_PROGRESS", {
        message: `Found ${localFiles.length} comic files, filtering...`,
      });
      const newFiles = localFiles.filter(file => !importedFileNames.has(file.name));
      console.log(`[Incremental Import] ${newFiles.length} new files to import`);
      // Step 4: Reset job counters and queue new files
      if (newFiles.length > 0) {
        this.broker.broadcast("LS_INCREMENTAL_IMPORT_PROGRESS", {
          message: `Queueing ${newFiles.length} new files for import...`,
        });
        // Reset counters once at the start.
        // NOTE(review): these Redis counters look global, not per-session —
        // concurrent imports would clobber each other; verify.
        await pubClient.set("completedJobCount", 0);
        await pubClient.set("failedJobCount", 0);
        console.log("[Incremental Import] Job counters reset");
        // Queue all new files
        for (const file of newFiles) {
          await this.broker.call("jobqueue.enqueue", {
            fileObject: {
              filePath: file.path,
              fileSize: file.size,
            },
            sessionId,
            importType: "new",
            sourcedFrom: "library",
            action: "enqueue.async",
          });
        }
      }
      // Emit completion event
      this.broker.broadcast("LS_INCREMENTAL_IMPORT_COMPLETE", {
        message: `Successfully queued ${newFiles.length} files for import`,
        stats: {
          total: localFiles.length,
          alreadyImported: localFiles.length - newFiles.length,
          newFiles: newFiles.length,
          queued: newFiles.length,
        },
      });
      return {
        success: true,
        message: `Incremental import complete: ${newFiles.length} new files queued`,
        stats: {
          total: localFiles.length,
          alreadyImported: localFiles.length - newFiles.length,
          newFiles: newFiles.length,
          queued: newFiles.length,
        },
      };
    } catch (error) {
      console.error("[Incremental Import] Error:", error);
      // Emit error event
      this.broker.broadcast("LS_INCREMENTAL_IMPORT_ERROR", {
        message: error.message || "Unknown error during incremental import",
        error: error,
      });
      throw new Errors.MoleculerError(
        "Failed to perform incremental import",
        500,
        "INCREMENTAL_IMPORT_ERROR",
        { error: error.message }
      );
    }
  },
},
rawImportToDB: {
rest: "POST /rawImportToDB",
params: {},