diff --git a/services/jobqueue.service.ts b/services/jobqueue.service.ts index 43dc86f..f7ef78f 100644 --- a/services/jobqueue.service.ts +++ b/services/jobqueue.service.ts @@ -7,7 +7,7 @@ import { } from "moleculer"; // import { BullMQAdapter, JobStatus, BullMqMixin } from 'moleculer-bullmq'; import { refineQuery } from "filename-parser"; -import BullMqMixin from 'moleculer-bullmq'; +import BullMqMixin, { BullMQAdapter, Queue } from 'moleculer-bullmq'; import { extractFromArchive } from "../utils/uncompression.utils"; import { isNil, isUndefined } from "lodash"; @@ -26,6 +26,14 @@ export default class JobQueueService extends Service { } }, actions: { + getJobStatuses: { + rest: "GET /getJobStatuses", + handler: async (ctx: Context<{}>) => { + const foo = await this.getJobStatuses('enqueue.async'); + console.log(foo); + return foo; + } + }, enqueue: { queue: true, rest: "/GET enqueue", @@ -33,118 +41,128 @@ export default class JobQueueService extends Service { // Enqueue the job const job = await this.localQueue(ctx, 'enqueue.async', ctx.params, { priority: 10 }); console.log(`Job ${job.id} enqueued`); + return job.id; } }, "enqueue.async": { handler: async (ctx: Context<{}>) => { - console.log(`Recieved Job ID ${ctx.locals.job.id}, processing...`); + try { + console.log(`Recieved Job ID ${ctx.locals.job.id}, processing...`); - // 1. De-structure the job params - const { fileObject } = ctx.locals.job.data.params; + // 1. De-structure the job params + const { fileObject } = ctx.locals.job.data.params; - // 2. Extract metadata from the archive - const result = await extractFromArchive(fileObject.filePath); - const { - name, - filePath, - fileSize, - extension, - mimeType, - cover, - containedIn, - comicInfoJSON, - } = result; - - // 3a. Infer any issue-related metadata from the filename - const { inferredIssueDetails } = refineQuery( - result.name - ); - console.log( - "Issue metadata inferred: ", - JSON.stringify(inferredIssueDetails, null, 2) - ); - - // 3c. Orchestrate the payload - const payload = { - importStatus: { - isImported: true, - tagged: false, - matchedResult: { - score: "0", - }, - }, - rawFileDetails: { + // 2. Extract metadata from the archive + const result = await extractFromArchive(fileObject.filePath); + const { name, filePath, fileSize, extension, mimeType, - containedIn, cover, - }, - inferredMetadata: { - issue: inferredIssueDetails, - }, - sourcedMetadata: { - // except for ComicInfo.xml, everything else should be copied over from the - // parent comic - comicInfo: comicInfoJSON, - }, - // since we already have at least 1 copy - // mark it as not wanted by default - "acquisition.source.wanted": false, + containedIn, + comicInfoJSON, + } = result; - // clear out the downloads array - // "acquisition.directconnect.downloads": [], - - // mark the metadata source - "acquisition.source.name": ctx.locals.job.data.params.sourcedFrom, - } - - // Add the bundleId, if present to the payload - let bundleId = null; - if (!isNil(ctx.locals.job.data.params.bundleId)) { - bundleId = ctx.locals.job.data.params.bundleId; - } - - // Add the sourcedMetadata, if present - if ( - !isNil(ctx.locals.job.data.params.sourcedMetadata) && - !isUndefined(ctx.locals.job.data.params.sourcedMetadata.comicvine) - ) { - Object.assign( - payload.sourcedMetadata, - ctx.locals.job.data.paramssourcedMetadata + // 3a. Infer any issue-related metadata from the filename + const { inferredIssueDetails } = refineQuery( + result.name + ); + console.log( + "Issue metadata inferred: ", + JSON.stringify(inferredIssueDetails, null, 2) ); - } - // write to mongo - const importResult = await this.broker.call( - "library.rawImportToDB", - { - importType: ctx.locals.job.data.params.importType, - bundleId, - payload, + // 3b. Orchestrate the payload + const payload = { + importStatus: { + isImported: true, + tagged: false, + matchedResult: { + score: "0", + }, + }, + rawFileDetails: { + name, + filePath, + fileSize, + extension, + mimeType, + containedIn, + cover, + }, + inferredMetadata: { + issue: inferredIssueDetails, + }, + sourcedMetadata: { + // except for ComicInfo.xml, everything else should be copied over from the + // parent comic + comicInfo: comicInfoJSON, + }, + // since we already have at least 1 copy + // mark it as not wanted by default + "acquisition.source.wanted": false, + + // clear out the downloads array + // "acquisition.directconnect.downloads": [], + + // mark the metadata source + "acquisition.source.name": ctx.locals.job.data.params.sourcedFrom, } - ); - return { - data: { - importResult, - }, - id: ctx.locals.job.id, - }; + + // 3c. Add the bundleId, if present to the payload + let bundleId = null; + if (!isNil(ctx.locals.job.data.params.bundleId)) { + bundleId = ctx.locals.job.data.params.bundleId; + } + + // 3d. Add the sourcedMetadata, if present + if ( + !isNil(ctx.locals.job.data.params.sourcedMetadata) && + !isUndefined(ctx.locals.job.data.params.sourcedMetadata.comicvine) + ) { + Object.assign( + payload.sourcedMetadata, + ctx.locals.job.data.paramssourcedMetadata + ); + } + + // 4. write to mongo + const importResult = await this.broker.call( + "library.rawImportToDB", + { + importType: ctx.locals.job.data.params.importType, + bundleId, + payload, + } + ); + return { + data: { + importResult, + }, + id: ctx.locals.job.id, + }; + } catch (error) { + console.error(`An error occurred processing Job ID ${ctx.locals.job.id}`); + } } }, }, + events: { // use the `${QUEUE_NAME}.QUEUE_EVENT` scheme async "enqueue.async.active"(ctx) { console.log(`Job ID ${ctx.params.id} is set to active.`); }, - async "enqueue.async.completed" (ctx) { + async "enqueue.async.completed"(ctx) { console.log(`Job ID ${ctx.params.id} completed.`); + }, + + async "enqueue.async.failed"(ctx) { + console.log("ch-----++++++++++-"); } } });