🏗️ Applying the refactor patc

This commit is contained in:
2023-07-05 23:14:46 -04:00
parent cb84e4893f
commit 007ce4b66f

View File

@@ -7,7 +7,7 @@ import {
} from "moleculer"; } from "moleculer";
// import { BullMQAdapter, JobStatus, BullMqMixin } from 'moleculer-bullmq'; // import { BullMQAdapter, JobStatus, BullMqMixin } from 'moleculer-bullmq';
import { refineQuery } from "filename-parser"; import { refineQuery } from "filename-parser";
import BullMqMixin from 'moleculer-bullmq'; import BullMqMixin, { BullMQAdapter, Queue } from 'moleculer-bullmq';
import { extractFromArchive } from "../utils/uncompression.utils"; import { extractFromArchive } from "../utils/uncompression.utils";
import { isNil, isUndefined } from "lodash"; import { isNil, isUndefined } from "lodash";
@@ -26,6 +26,14 @@ export default class JobQueueService extends Service {
} }
}, },
actions: { actions: {
getJobStatuses: {
rest: "GET /getJobStatuses",
handler: async (ctx: Context<{}>) => {
const foo = await this.getJobStatuses('enqueue.async');
console.log(foo);
return foo;
}
},
enqueue: { enqueue: {
queue: true, queue: true,
rest: "/GET enqueue", rest: "/GET enqueue",
@@ -33,11 +41,13 @@ export default class JobQueueService extends Service {
// Enqueue the job // Enqueue the job
const job = await this.localQueue(ctx, 'enqueue.async', ctx.params, { priority: 10 }); const job = await this.localQueue(ctx, 'enqueue.async', ctx.params, { priority: 10 });
console.log(`Job ${job.id} enqueued`); console.log(`Job ${job.id} enqueued`);
return job.id; return job.id;
} }
}, },
"enqueue.async": { "enqueue.async": {
handler: async (ctx: Context<{}>) => { handler: async (ctx: Context<{}>) => {
try {
console.log(`Recieved Job ID ${ctx.locals.job.id}, processing...`); console.log(`Recieved Job ID ${ctx.locals.job.id}, processing...`);
// 1. De-structure the job params // 1. De-structure the job params
@@ -65,7 +75,7 @@ export default class JobQueueService extends Service {
JSON.stringify(inferredIssueDetails, null, 2) JSON.stringify(inferredIssueDetails, null, 2)
); );
// 3c. Orchestrate the payload // 3b. Orchestrate the payload
const payload = { const payload = {
importStatus: { importStatus: {
isImported: true, isImported: true,
@@ -102,13 +112,13 @@ export default class JobQueueService extends Service {
"acquisition.source.name": ctx.locals.job.data.params.sourcedFrom, "acquisition.source.name": ctx.locals.job.data.params.sourcedFrom,
} }
// Add the bundleId, if present to the payload // 3c. Add the bundleId, if present to the payload
let bundleId = null; let bundleId = null;
if (!isNil(ctx.locals.job.data.params.bundleId)) { if (!isNil(ctx.locals.job.data.params.bundleId)) {
bundleId = ctx.locals.job.data.params.bundleId; bundleId = ctx.locals.job.data.params.bundleId;
} }
// Add the sourcedMetadata, if present // 3d. Add the sourcedMetadata, if present
if ( if (
!isNil(ctx.locals.job.data.params.sourcedMetadata) && !isNil(ctx.locals.job.data.params.sourcedMetadata) &&
!isUndefined(ctx.locals.job.data.params.sourcedMetadata.comicvine) !isUndefined(ctx.locals.job.data.params.sourcedMetadata.comicvine)
@@ -119,7 +129,7 @@ export default class JobQueueService extends Service {
); );
} }
// write to mongo // 4. write to mongo
const importResult = await this.broker.call( const importResult = await this.broker.call(
"library.rawImportToDB", "library.rawImportToDB",
{ {
@@ -134,17 +144,25 @@ export default class JobQueueService extends Service {
}, },
id: ctx.locals.job.id, id: ctx.locals.job.id,
}; };
} catch (error) {
console.error(`An error occurred processing Job ID ${ctx.locals.job.id}`);
}
} }
}, },
}, },
events: { events: {
// use the `${QUEUE_NAME}.QUEUE_EVENT` scheme // use the `${QUEUE_NAME}.QUEUE_EVENT` scheme
async "enqueue.async.active"(ctx) { async "enqueue.async.active"(ctx) {
console.log(`Job ID ${ctx.params.id} is set to active.`); console.log(`Job ID ${ctx.params.id} is set to active.`);
}, },
async "enqueue.async.completed" (ctx) { async "enqueue.async.completed"(ctx) {
console.log(`Job ID ${ctx.params.id} completed.`); console.log(`Job ID ${ctx.params.id} completed.`);
},
async "enqueue.async.failed"(ctx) {
console.log("ch-----++++++++++-");
} }
} }
}); });