diff --git a/services/jobqueue.service.ts b/services/jobqueue.service.ts index f52ff73..0a1f190 100644 --- a/services/jobqueue.service.ts +++ b/services/jobqueue.service.ts @@ -1,10 +1,12 @@ import { Context, Service, ServiceBroker, ServiceSchema } from "moleculer"; -const { MoleculerError } = require("moleculer").Errors; import JobResult from "../models/jobresult.model"; import { refineQuery } from "filename-parser"; import BullMqMixin, { BullMQAdapter, Queue } from "moleculer-bullmq"; import { extractFromArchive } from "../utils/uncompression.utils"; import { isNil, isUndefined } from "lodash"; +import { pubClient } from "../config/redis.config"; + +const { MoleculerError } = require("moleculer").Errors; console.log(process.env.REDIS_URI); export default class JobQueueService extends Service { @@ -183,8 +185,19 @@ export default class JobQueueService extends Service { }, async "enqueue.async.completed"(ctx: Context<{ id: Number }>) { + // 1. Fetch the job result using the job Id const jobState = await this.job(ctx.params.id); - + // 2. Incremement the completed job counter + await pubClient.incr("completedJobCount"); + // 3. Fetch the completed job count for the final payload to be sent to the client + const completedJobCount = await pubClient.get("completedJobCount"); + // 4. Emit the LS_COVER_EXTRACTED event with the necessary details + await this.broker.call("socket.broadcast", { + namespace: "/", //optional + event: "action", + args: [{ type: "LS_COVER_EXTRACTED", completedJobCount, importResult: jobState.returnvalue.data.importResult }], //optional + }); + // 5. Persist the job results in mongo for posterity await JobResult.create({ id: ctx.params.id, status: "completed", @@ -194,12 +207,25 @@ export default class JobQueueService extends Service { }, async "enqueue.async.failed"(ctx) { + console.log("FAILED FAILED FAILED FAILED FAILED") const jobState = await this.job(ctx.params.id); + await pubClient.incr("failedJobCount"); + const failedJobCount = await pubClient.get("failedJobCount"); + await JobResult.create({ id: ctx.params.id, status: "failed", failedReason: jobState.failedReason, }); + + // 4. Emit the LS_COVER_EXTRACTION_FAILED event with the necessary details + await this.broker.call("socket.broadcast", { + namespace: "/", //optional + event: "action", + args: [{ type: "LS_COVER_EXTRACTION_FAILED", failedJobCount, importResult: jobState }], //optional + }); + + }, }, }); diff --git a/services/library.service.ts b/services/library.service.ts index 732e8d8..f8c5eaa 100644 --- a/services/library.service.ts +++ b/services/library.service.ts @@ -51,6 +51,7 @@ import { IExtractionOptions, } from "threetwo-ui-typings"; const ObjectId = require("mongoose").Types.ObjectId; +import { pubClient } from "../config/redis.config"; import fsExtra from "fs-extra"; const through2 = require("through2"); import klaw from "klaw"; @@ -197,6 +198,9 @@ export default class ImportService extends Service { // importType: "new", // } // ); + // reset the job counters in Redis + await pubClient.set("completedJobCount", 0); + await pubClient.set("failedJobCount", 0); this.broker.call('jobqueue.enqueue', { fileObject: { filePath: item.path, diff --git a/services/socket.service.ts b/services/socket.service.ts index eedd2f5..5403e47 100644 --- a/services/socket.service.ts +++ b/services/socket.service.ts @@ -4,6 +4,7 @@ import { createClient } from "redis"; import { createAdapter } from "@socket.io/redis-adapter"; import Session from "../models/session.model"; import { pubClient, subClient } from "../config/redis.config"; +const { MoleculerError } = require("moleculer").Errors; const SocketIOService = require("moleculer-io"); const { v4: uuidv4 } = require("uuid"); @@ -33,14 +34,25 @@ export default class SocketService extends Service { console.log( "Attempting to resume session..." ); - const sessionRecord = - await Session.find({ - sessionId: - data.session.sessionId, - }); - this.io.emit("yelaveda", { - hagindari: "bhagindari", - }); + try { + const sessionRecord = + await Session.find({ + sessionId: + data.session.sessionId, + }); + if (sessionRecord.length !== 0 && sessionRecord[0].sessionId === data.session.sessionId) { + this.io.emit("yelaveda", { + hagindari: "bhagindari", + }); + } + } catch (err) { + throw new MoleculerError( + err, + 500, + "SESSION_ID_NOT_FOUND", + { data: data.session.sessionId } + ); + } break;