From fd4dd1d5c49802dbf4c80867dea33f3e5cb96f1e Mon Sep 17 00:00:00 2001 From: Rishi Ghan Date: Thu, 24 Aug 2023 23:18:27 -0400 Subject: [PATCH] =?UTF-8?q?=F0=9F=A5=AD=20Aggregation=20for=20jobResult?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- services/jobqueue.service.ts | 26 +++++++++++++++++++++----- services/library.service.ts | 16 +++------------- 2 files changed, 24 insertions(+), 18 deletions(-) diff --git a/services/jobqueue.service.ts b/services/jobqueue.service.ts index 7f9d807..3bbdede 100644 --- a/services/jobqueue.service.ts +++ b/services/jobqueue.service.ts @@ -1,7 +1,7 @@ -import { Context, Service, ServiceBroker, ServiceSchema } from "moleculer"; +import { Context, Service, ServiceBroker } from "moleculer"; import JobResult from "../models/jobresult.model"; import { refineQuery } from "filename-parser"; -import BullMqMixin, { BullMQAdapter, Queue } from "moleculer-bullmq"; +import BullMqMixin from "moleculer-bullmq"; import { extractFromArchive } from "../utils/uncompression.utils"; import { isNil, isUndefined } from "lodash"; import { pubClient } from "../config/redis.config"; @@ -12,7 +12,6 @@ console.log(process.env.REDIS_URI); export default class JobQueueService extends Service { public constructor(public broker: ServiceBroker) { super(broker); - this.parseServiceSchema({ name: "jobqueue", hooks: {}, @@ -168,6 +167,23 @@ export default class JobQueueService extends Service { } }, }, + getJobResultStatistics: { + rest: "GET /getJobResultStatistics", + handler: async (ctx: Context<{}>) => { + const result = await JobResult.aggregate([ + { + $group: { + _id: "$_id", + jobResults: { $addToSet: "status" } + }, + + }, + { $sort: { timestamp: -1 } }, + { $skip: 0 }, + { $limit: 5 }, + ]) + } + }, }, events: { @@ -188,14 +204,13 @@ export default class JobQueueService extends Service { async "enqueue.async.completed"(ctx: Context<{ id: Number }>) { // 1. Fetch the job result using the job Id const job = await this.job(ctx.params.id); - console.log(job); // 2. Increment the completed job counter await pubClient.incr("completedJobCount"); // 3. Fetch the completed job count for the final payload to be sent to the client const completedJobCount = await pubClient.get("completedJobCount"); // 4. Emit the LS_COVER_EXTRACTED event with the necessary details await this.broker.call("socket.broadcast", { - namespace: "/", //optional + namespace: "/", event: "action", args: [ { @@ -226,6 +241,7 @@ export default class JobQueueService extends Service { id: ctx.params.id, status: "failed", failedReason: job.failedReason, + timestamp: job.timestamp, }); // 4. Emit the LS_COVER_EXTRACTION_FAILED event with the necessary details diff --git a/services/library.service.ts b/services/library.service.ts index f8c5eaa..1acd626 100644 --- a/services/library.service.ts +++ b/services/library.service.ts @@ -175,6 +175,7 @@ export default class ImportService extends Service { }) ) // 1.2 Pipe filtered results to the next step + // Enqueue the job in the queue .on("data", async (item) => { console.info( "Found a file at path: %s", @@ -187,20 +188,10 @@ export default class ImportService extends Service { )}`, }); if (!comicExists) { - // 2. Send the extraction job to the queue - // await broker.call( - // "importqueue.processImport", - // { - // fileObject: { - // filePath: item.path, - // fileSize: item.stats.size, - // }, - // importType: "new", - // } - // ); - // reset the job counters in Redis + // 2.1 Reset the job counters in Redis await pubClient.set("completedJobCount", 0); await pubClient.set("failedJobCount", 0); + // 2.2 Send the extraction job to the queue this.broker.call('jobqueue.enqueue', { fileObject: { filePath: item.path, @@ -222,7 +213,6 @@ export default class ImportService extends Service { } }, }, - rawImportToDB: { rest: "POST /rawImportToDB", params: {},