diff --git a/models/jobresult.model.ts b/models/jobresult.model.ts index 01233e9..8f20944 100644 --- a/models/jobresult.model.ts +++ b/models/jobresult.model.ts @@ -3,6 +3,7 @@ const mongoose = require("mongoose"); const JobResultScehma = mongoose.Schema({ id: Number, status: String, + sessionId: String, failedReason: Object, timestamp: Date, }); diff --git a/services/jobqueue.service.ts b/services/jobqueue.service.ts index ba96a90..76c1def 100644 --- a/services/jobqueue.service.ts +++ b/services/jobqueue.service.ts @@ -61,12 +61,12 @@ export default class JobQueueService extends Service { "enqueue.async": { handler: async ( ctx: Context<{ - socketSessionId: String; + sessionId: String; }> ) => { try { console.log(`Recieved Job ID ${ctx.locals.job.id}, processing...`); - + console.log(ctx.params); // 1. De-structure the job params const { fileObject } = ctx.locals.job.data.params; @@ -155,14 +155,14 @@ export default class JobQueueService extends Service { importResult, }, id: ctx.locals.job.id, - socketSessionId: ctx.params.socketSessionId, + sessionId: ctx.params.sessionId, }; } catch (error) { console.error( `An error occurred processing Job ID ${ctx.locals.job.id}` ); throw new MoleculerError(error, 500, "IMPORT_JOB_ERROR", { - data: ctx.params.socketSessionId, + data: ctx.params.sessionId, }); } }, @@ -227,6 +227,7 @@ export default class JobQueueService extends Service { id: ctx.params.id, status: "completed", timestamp: job.timestamp, + sessionId: job.returnvalue.sessionId, failedReason: {}, }); @@ -242,6 +243,7 @@ export default class JobQueueService extends Service { id: ctx.params.id, status: "failed", failedReason: job.failedReason, + sessionId: job.returnvalue.sessionId, timestamp: job.timestamp, }); diff --git a/services/library.service.ts b/services/library.service.ts index 1acd626..b885628 100644 --- a/services/library.service.ts +++ b/services/library.service.ts @@ -33,13 +33,7 @@ SOFTWARE. "use strict"; import { isNil } from "lodash"; -import { - Context, - Service, - ServiceBroker, - ServiceSchema, - Errors, -} from "moleculer"; +import { Context, Service, ServiceBroker, ServiceSchema, Errors } from "moleculer"; import { DbMixin } from "../mixins/db.mixin"; import Comic from "../models/comic.model"; import { walkFolder, getSizeOfDirectory } from "../utils/file.utils"; @@ -101,9 +95,7 @@ export default class ImportService extends Service { uncompressFullArchive: { rest: "POST /uncompressFullArchive", params: {}, - handler: async ( - ctx: Context<{ filePath: string; options: any }> - ) => { + handler: async (ctx: Context<{ filePath: string; options: any }>) => { await broker.call("importqueue.uncompressResize", { filePath: ctx.params.filePath, options: ctx.params.options, @@ -121,8 +113,7 @@ export default class ImportService extends Service { }); // Determine source where the comic was added from // and gather identifying information about it - const sourceName = - referenceComicObject[0].acquisition.source.name; + const sourceName = referenceComicObject[0].acquisition.source.name; const { sourcedMetadata } = referenceComicObject[0]; const filePath = `${COMICS_DIRECTORY}/${ctx.params.bundle.data.name}`; @@ -155,32 +146,28 @@ export default class ImportService extends Service { async handler( ctx: Context<{ extractionOptions?: any; + sessionId: string; }> ) { try { + // Get params to be passed to the import jobs + const { sessionId } = ctx.params; // 1. Walk the Source folder klaw(path.resolve(COMICS_DIRECTORY)) // 1.1 Filter on .cb* extensions .pipe( - through2.obj(function(item, enc, next) { + through2.obj(function (item, enc, next) { let fileExtension = path.extname(item.path); - if ( - [".cbz", ".cbr", ".cb7"].includes( - fileExtension - ) - ) { + if ([".cbz", ".cbr", ".cb7"].includes(fileExtension)) { this.push(item); } next(); }) ) // 1.2 Pipe filtered results to the next step - // Enqueue the job in the queue + // Enqueue the job in the queue .on("data", async (item) => { - console.info( - "Found a file at path: %s", - item.path - ); + console.info("Found a file at path: %s", item.path); let comicExists = await Comic.exists({ "rawFileDetails.name": `${path.basename( item.path, @@ -192,17 +179,16 @@ export default class ImportService extends Service { await pubClient.set("completedJobCount", 0); await pubClient.set("failedJobCount", 0); // 2.2 Send the extraction job to the queue - this.broker.call('jobqueue.enqueue', { + this.broker.call("jobqueue.enqueue", { fileObject: { filePath: item.path, fileSize: item.stats.size, }, + sessionId, importType: "new", }); } else { - console.log( - "Comic already exists in the library." - ); + console.log("Comic already exists in the library."); } }) .on("end", () => { @@ -254,28 +240,19 @@ export default class ImportService extends Service { // we solicit volume information and add that to mongo if ( comicMetadata.sourcedMetadata.comicvine && - !isNil( - comicMetadata.sourcedMetadata.comicvine - .volume - ) + !isNil(comicMetadata.sourcedMetadata.comicvine.volume) ) { - volumeDetails = await this.broker.call( - "comicvine.getVolumes", - { - volumeURI: - comicMetadata.sourcedMetadata - .comicvine.volume - .api_detail_url, - } - ); + volumeDetails = await this.broker.call("comicvine.getVolumes", { + volumeURI: + comicMetadata.sourcedMetadata.comicvine.volume + .api_detail_url, + }); comicMetadata.sourcedMetadata.comicvine.volumeInformation = volumeDetails.results; } console.log("Saving to Mongo..."); - console.log( - `Import type: [${ctx.params.importType}]` - ); + console.log(`Import type: [${ctx.params.importType}]`); switch (ctx.params.importType) { case "new": return await Comic.create(comicMetadata); @@ -296,10 +273,7 @@ export default class ImportService extends Service { } } catch (error) { console.log(error); - throw new Errors.MoleculerError( - "Import failed.", - 500 - ); + throw new Errors.MoleculerError("Import failed.", 500); } }, }, @@ -317,9 +291,7 @@ export default class ImportService extends Service { ) { // 1. Find mongo object by id // 2. Import payload into sourcedMetadata.comicvine - const comicObjectId = new ObjectId( - ctx.params.comicObjectId - ); + const comicObjectId = new ObjectId(ctx.params.comicObjectId); return new Promise(async (resolve, reject) => { let volumeDetails = {}; @@ -328,18 +300,15 @@ export default class ImportService extends Service { const volumeDetails = await this.broker.call( "comicvine.getVolumes", { - volumeURI: - matchedResult.volume.api_detail_url, + volumeURI: matchedResult.volume.api_detail_url, } ); - matchedResult.volumeInformation = - volumeDetails.results; + matchedResult.volumeInformation = volumeDetails.results; Comic.findByIdAndUpdate( comicObjectId, { $set: { - "sourcedMetadata.comicvine": - matchedResult, + "sourcedMetadata.comicvine": matchedResult, }, }, { new: true }, @@ -370,9 +339,7 @@ export default class ImportService extends Service { }> ) { console.log(JSON.stringify(ctx.params, null, 2)); - const comicObjectId = new ObjectId( - ctx.params.comicObjectId - ); + const comicObjectId = new ObjectId(ctx.params.comicObjectId); return new Promise((resolve, reject) => { Comic.findByIdAndUpdate( @@ -426,9 +393,7 @@ export default class ImportService extends Service { params: { ids: "array" }, handler: async (ctx: Context<{ ids: [string] }>) => { console.log(ctx.params.ids); - const queryIds = ctx.params.ids.map( - (id) => new ObjectId(id) - ); + const queryIds = ctx.params.ids.map((id) => new ObjectId(id)); return await Comic.find({ _id: { $in: queryIds, @@ -444,8 +409,7 @@ export default class ImportService extends Service { const volumes = await Comic.aggregate([ { $project: { - volumeInfo: - "$sourcedMetadata.comicvine.volumeInformation", + volumeInfo: "$sourcedMetadata.comicvine.volumeInformation", }, }, { @@ -491,52 +455,46 @@ export default class ImportService extends Service { const { queryObjects } = ctx.params; // construct the query for ElasticSearch let elasticSearchQuery = {}; - const elasticSearchQueries = queryObjects.map( - (queryObject) => { - console.log("Volume: ", queryObject.volumeName); - console.log("Issue: ", queryObject.issueName); - if (queryObject.issueName === null) { - queryObject.issueName = ""; - } - if (queryObject.volumeName === null) { - queryObject.volumeName = ""; - } - elasticSearchQuery = { - bool: { - must: [ - { - match_phrase: { - "rawFileDetails.name": - queryObject.volumeName, - }, - }, - { - term: { - "inferredMetadata.issue.number": - parseInt( - queryObject.issueNumber, - 10 - ), - }, - }, - ], - }, - }; - - return [ - { - index: "comics", - search_type: "dfs_query_then_fetch", - }, - { - query: elasticSearchQuery, - }, - ]; + const elasticSearchQueries = queryObjects.map((queryObject) => { + console.log("Volume: ", queryObject.volumeName); + console.log("Issue: ", queryObject.issueName); + if (queryObject.issueName === null) { + queryObject.issueName = ""; } - ); - console.log( - JSON.stringify(elasticSearchQueries, null, 2) - ); + if (queryObject.volumeName === null) { + queryObject.volumeName = ""; + } + elasticSearchQuery = { + bool: { + must: [ + { + match_phrase: { + "rawFileDetails.name": queryObject.volumeName, + }, + }, + { + term: { + "inferredMetadata.issue.number": parseInt( + queryObject.issueNumber, + 10 + ), + }, + }, + ], + }, + }; + + return [ + { + index: "comics", + search_type: "dfs_query_then_fetch", + }, + { + query: elasticSearchQuery, + }, + ]; + }); + console.log(JSON.stringify(elasticSearchQueries, null, 2)); return await ctx.broker.call("search.searchComic", { elasticSearchQueries, @@ -549,10 +507,11 @@ export default class ImportService extends Service { rest: "GET /libraryStatistics", params: {}, handler: async (ctx: Context<{}>) => { - const comicDirectorySize = await getSizeOfDirectory( - COMICS_DIRECTORY, - [".cbz", ".cbr", ".cb7"] - ); + const comicDirectorySize = await getSizeOfDirectory(COMICS_DIRECTORY, [ + ".cbz", + ".cbr", + ".cb7", + ]); const totalCount = await Comic.countDocuments({}); const statistics = await Comic.aggregate([ { @@ -561,11 +520,7 @@ export default class ImportService extends Service { { $match: { "rawFileDetails.extension": { - $in: [ - ".cbr", - ".cbz", - ".cb7", - ], + $in: [".cbr", ".cbz", ".cb7"], }, }, }, @@ -579,8 +534,7 @@ export default class ImportService extends Service { issues: [ { $match: { - "sourcedMetadata.comicvine.volumeInformation": - { + "sourcedMetadata.comicvine.volumeInformation": { $gt: {}, }, }, @@ -645,23 +599,16 @@ export default class ImportService extends Service { .drop() .then(async (data) => { console.info(data); - const coversFolderDeleteResult = - fsExtra.emptyDirSync( - path.resolve( - `${USERDATA_DIRECTORY}/covers` - ) - ); - const expandedFolderDeleteResult = - fsExtra.emptyDirSync( - path.resolve( - `${USERDATA_DIRECTORY}/expanded` - ) - ); - const eSIndicesDeleteResult = - await ctx.broker.call( - "search.deleteElasticSearchIndices", - {} - ); + const coversFolderDeleteResult = fsExtra.emptyDirSync( + path.resolve(`${USERDATA_DIRECTORY}/covers`) + ); + const expandedFolderDeleteResult = fsExtra.emptyDirSync( + path.resolve(`${USERDATA_DIRECTORY}/expanded`) + ); + const eSIndicesDeleteResult = await ctx.broker.call( + "search.deleteElasticSearchIndices", + {} + ); return { data, coversFolderDeleteResult, diff --git a/services/socket.service.ts b/services/socket.service.ts index ae6d316..b09627e 100644 --- a/services/socket.service.ts +++ b/services/socket.service.ts @@ -1,6 +1,6 @@ "use strict"; import { Service, ServiceBroker, ServiceSchema, Context } from "moleculer"; -import {JobType} from "moleculer-bullmq"; +import { JobType } from "moleculer-bullmq"; import { createClient } from "redis"; import { createAdapter } from "@socket.io/redis-adapter"; import Session from "../models/session.model"; @@ -16,7 +16,6 @@ export default class SocketService extends Service { schema: ServiceSchema<{}> = { name: "socket" } ) { super(broker); - let socketSessionId = null; this.parseServiceSchema({ name: "socket", mixins: [SocketIOService], @@ -41,17 +40,25 @@ export default class SocketService extends Service { if ( sessionRecord.length !== 0 && sessionRecord[0].sessionId === - data.session.sessionId + data.session.sessionId ) { // 2. Find if the queue has active jobs - const jobs: JobType = await this.broker.call("jobqueue.getJobCountsByType", {}) + const jobs: JobType = await this.broker.call( + "jobqueue.getJobCountsByType", + {} + ); const { active, prioritized } = jobs; if (active > 0 && prioritized > 0) { // 3. Get job counts - const completedJobCount = await pubClient.get("completedJobCount"); - const failedJobCount = await pubClient.get("failedJobCount"); - + const completedJobCount = + await pubClient.get( + "completedJobCount" + ); + const failedJobCount = await pubClient.get( + "failedJobCount" + ); + // 4. Send the counts to the active socket.io session await this.broker.call("socket.broadcast", { namespace: "/", @@ -66,9 +73,6 @@ export default class SocketService extends Service { ], }); } - - - } } catch (err) { throw new MoleculerError( @@ -83,19 +87,6 @@ export default class SocketService extends Service { break; - case "LS_IMPORT": - console.log(`Recieved ${data.type} event.`); - // 1. Send task to queue - await this.broker.call( - "library.newImport", - { - data: data.data, - socketSessionId, - }, - {} - ); - break; - case "LS_SET_QUEUE_STATUS": console.log(data); await this.broker.call(