From 6ee609f2b9b4106b949636ca7e632bdbc3f58ec8 Mon Sep 17 00:00:00 2001 From: Rishi Ghan Date: Wed, 23 Aug 2023 11:47:47 -0500 Subject: [PATCH] =?UTF-8?q?=F0=9F=94=A7=20Refactor=20and=20added=20getJobC?= =?UTF-8?q?ounts?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- services/jobqueue.service.ts | 21 ++++++++++------ services/socket.service.ts | 46 +++++++++++++++++++++++------------- 2 files changed, 43 insertions(+), 24 deletions(-) diff --git a/services/jobqueue.service.ts b/services/jobqueue.service.ts index 7d5b282..fe3c7d3 100644 --- a/services/jobqueue.service.ts +++ b/services/jobqueue.service.ts @@ -23,6 +23,13 @@ export default class JobQueueService extends Service { }, }, actions: { + getJobCountsByType: { + rest: "GET /getJobCountsByType", + handler: async (ctx: Context<{}>) => { + console.log(ctx.params); + return await this.$resolve("jobqueue").getJobCounts(); + } + }, toggle: { rest: "GET /toggle", handler: async (ctx: Context<{ action: String }>) => { @@ -203,35 +210,35 @@ export default class JobQueueService extends Service { status: "completed", failedReason: {}, }); - // 6. Purge it from Redis - await job.remove(); + console.log(`Job ID ${ctx.params.id} completed.`); }, async "enqueue.async.failed"(ctx) { - const jobState = await this.job(ctx.params.id); + const job = await this.job(ctx.params.id); await pubClient.incr("failedJobCount"); const failedJobCount = await pubClient.get("failedJobCount"); await JobResult.create({ id: ctx.params.id, status: "failed", - failedReason: jobState.failedReason, + failedReason: job.failedReason, }); // 4. Emit the LS_COVER_EXTRACTION_FAILED event with the necessary details await this.broker.call("socket.broadcast", { - namespace: "/", //optional + namespace: "/", event: "action", args: [ { type: "LS_COVER_EXTRACTION_FAILED", failedJobCount, - importResult: jobState, + importResult: job, }, - ], //optional + ], }); + }, }, }); diff --git a/services/socket.service.ts b/services/socket.service.ts index c8ad6e6..ae6d316 100644 --- a/services/socket.service.ts +++ b/services/socket.service.ts @@ -1,5 +1,6 @@ "use strict"; import { Service, ServiceBroker, ServiceSchema, Context } from "moleculer"; +import {JobType} from "moleculer-bullmq"; import { createClient } from "redis"; import { createAdapter } from "@socket.io/redis-adapter"; import Session from "../models/session.model"; @@ -36,27 +37,38 @@ export default class SocketService extends Service { const sessionRecord = await Session.find({ sessionId: data.session.sessionId, }); - // check for sessionId's existence + // 1. Check for sessionId's existence, and a match if ( sessionRecord.length !== 0 && sessionRecord[0].sessionId === - data.session.sessionId + data.session.sessionId ) { - // 1. Get job counts - console.log("yea?") - const completedJobCount = await pubClient.get("completedJobCount"); - const failedJobCount = await pubClient.get("failedJobCount"); - await this.broker.call("socket.broadcast", { - namespace: "/", //optional - event: "action", - args: [ - { - type: "RESTORE_JOB_COUNTS_AFTER_SESSION_RESTORATION", - completedJobCount, - failedJobCount, - }, - ], - }); + // 2. Find if the queue has active jobs + const jobs: JobType = await this.broker.call("jobqueue.getJobCountsByType", {}) + const { active, prioritized } = jobs; + + if (active > 0 && prioritized > 0) { + // 3. Get job counts + const completedJobCount = await pubClient.get("completedJobCount"); + const failedJobCount = await pubClient.get("failedJobCount"); + + // 4. Send the counts to the active socket.io session + await this.broker.call("socket.broadcast", { + namespace: "/", + event: "action", + args: [ + { + type: "RESTORE_JOB_COUNTS_AFTER_SESSION_RESTORATION", + completedJobCount, + failedJobCount, + queueStatus: "running", + }, + ], + }); + } + + + } } catch (err) { throw new MoleculerError(