From fdcf1f7d681a0cd07435e65ba0f6950296491a52 Mon Sep 17 00:00:00 2001 From: Rishi Ghan Date: Mon, 14 Aug 2023 19:45:40 -0400 Subject: [PATCH] =?UTF-8?q?=F0=9F=A7=B9=20Linted=20code?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- services/jobqueue.service.ts | 76 +++++++++++++++++++++++------------- services/socket.service.ts | 52 ++++++++++++++++-------- 2 files changed, 84 insertions(+), 44 deletions(-) diff --git a/services/jobqueue.service.ts b/services/jobqueue.service.ts index 018f9ec..f52ff73 100644 --- a/services/jobqueue.service.ts +++ b/services/jobqueue.service.ts @@ -1,13 +1,8 @@ -import { - Context, - Service, - ServiceBroker, - ServiceSchema, -} from "moleculer"; +import { Context, Service, ServiceBroker, ServiceSchema } from "moleculer"; const { MoleculerError } = require("moleculer").Errors; import JobResult from "../models/jobresult.model"; import { refineQuery } from "filename-parser"; -import BullMqMixin, { BullMQAdapter, Queue } from 'moleculer-bullmq'; +import BullMqMixin, { BullMQAdapter, Queue } from "moleculer-bullmq"; import { extractFromArchive } from "../utils/uncompression.utils"; import { isNil, isUndefined } from "lodash"; @@ -23,41 +18,52 @@ export default class JobQueueService extends Service { settings: { bullmq: { client: process.env.REDIS_URI, - } + }, }, actions: { getJobStatuses: { rest: "GET /getJobStatuses", handler: async (ctx: Context<{}>) => { - const foo = await this.getJobStatuses('enqueue.async'); + const foo = await this.getJobStatuses("enqueue.async"); console.log(foo); return foo; - } + }, }, enqueue: { queue: true, rest: "/GET enqueue", handler: async (ctx: Context<{}>) => { // Enqueue the job - const job = await this.localQueue(ctx, 'enqueue.async', ctx.params, { priority: 10 }); + const job = await this.localQueue( + ctx, + "enqueue.async", + ctx.params, + { priority: 10 } + ); console.log(`Job ${job.id} enqueued`); return job.id; - } + }, }, // Comic Book Import Job Queue "enqueue.async": { - handler: async (ctx: Context<{ - socketSessionId: String, - }>) => { + handler: async ( + ctx: Context<{ + socketSessionId: String; + }> + ) => { try { - console.log(`Recieved Job ID ${ctx.locals.job.id}, processing...`); + console.log( + `Recieved Job ID ${ctx.locals.job.id}, processing...` + ); // 1. De-structure the job params const { fileObject } = ctx.locals.job.data.params; // 2. Extract metadata from the archive - const result = await extractFromArchive(fileObject.filePath); + const result = await extractFromArchive( + fileObject.filePath + ); const { name, filePath, @@ -112,8 +118,9 @@ export default class JobQueueService extends Service { // "acquisition.directconnect.downloads": [], // mark the metadata source - "acquisition.source.name": ctx.locals.job.data.params.sourcedFrom, - } + "acquisition.source.name": + ctx.locals.job.data.params.sourcedFrom, + }; // 3c. Add the bundleId, if present to the payload let bundleId = null; @@ -123,8 +130,13 @@ export default class JobQueueService extends Service { // 3d. Add the sourcedMetadata, if present if ( - !isNil(ctx.locals.job.data.params.sourcedMetadata) && - !isUndefined(ctx.locals.job.data.params.sourcedMetadata.comicvine) + !isNil( + ctx.locals.job.data.params.sourcedMetadata + ) && + !isUndefined( + ctx.locals.job.data.params.sourcedMetadata + .comicvine + ) ) { Object.assign( payload.sourcedMetadata, @@ -136,7 +148,8 @@ export default class JobQueueService extends Service { const importResult = await this.broker.call( "library.rawImportToDB", { - importType: ctx.locals.job.data.params.importType, + importType: + ctx.locals.job.data.params.importType, bundleId, payload, } @@ -149,10 +162,17 @@ export default class JobQueueService extends Service { socketSessionId: ctx.params.socketSessionId, }; } catch (error) { - console.error(`An error occurred processing Job ID ${ctx.locals.job.id}`); - throw new MoleculerError(error, 500, "IMPORT_JOB_ERROR", {data: ctx.params.socketSessionId}); + console.error( + `An error occurred processing Job ID ${ctx.locals.job.id}` + ); + throw new MoleculerError( + error, + 500, + "IMPORT_JOB_ERROR", + { data: ctx.params.socketSessionId } + ); } - } + }, }, }, @@ -164,13 +184,13 @@ export default class JobQueueService extends Service { async "enqueue.async.completed"(ctx: Context<{ id: Number }>) { const jobState = await this.job(ctx.params.id); + await JobResult.create({ id: ctx.params.id, status: "completed", failedReason: {}, }); console.log(`Job ID ${ctx.params.id} completed.`); - }, async "enqueue.async.failed"(ctx) { @@ -180,8 +200,8 @@ export default class JobQueueService extends Service { status: "failed", failedReason: jobState.failedReason, }); - } - } + }, + }, }); } } diff --git a/services/socket.service.ts b/services/socket.service.ts index ac5d879..40add1c 100644 --- a/services/socket.service.ts +++ b/services/socket.service.ts @@ -34,10 +34,20 @@ export default class SocketService extends Service { }, action: async (data) => { switch (data.type) { - case "RESUME_SESSION": - console.log("Attempting to resume session...") + case "RESUME_SESSION": + console.log( + "Attempting to resume session..." + ); + const sessionRecord = + await Session.find({ + sessionId: + data.session.sessionId, + }); + this.io.emit("yelaveda", { + hagindari: "bhagindari", + }); - break; + break; case "LS_IMPORT": console.log( @@ -46,7 +56,10 @@ export default class SocketService extends Service { // 1. Send task to queue await this.broker.call( "library.newImport", - { data: data.data, socketSessionId }, + { + data: data.data, + socketSessionId, + }, {} ); break; @@ -84,20 +97,22 @@ export default class SocketService extends Service { }, }, hooks: {}, - actions: { - - }, - methods: { - - }, + actions: {}, + methods: {}, async started() { this.io.on("connection", async (socket) => { - console.log(socket); - console.log(`socket.io server connected to client with session ID: ${socket.id}`); + console.log( + `socket.io server connected to client with session ID: ${socket.id}` + ); console.log("Looking up sessionId in Mongo..."); - const sessionIdExists = await Session.find({ sessionId: socket.handshake.query.sessionId }); - if(sessionIdExists.length === 0) { - console.log(`Socket Id ${socket.id} not found in Mongo, creating a new session...`); + const sessionIdExists = await Session.find({ + sessionId: socket.handshake.query.sessionId, + }); + // 1. if sessionId isn't found in Mongo, create one and persist it + if (sessionIdExists.length === 0) { + console.log( + `Socket Id ${socket.id} not found in Mongo, creating a new session...` + ); const sessionId = uuidv4(); socket.sessionId = sessionId; console.log(`Saving session ${sessionId} to Mongo...`); @@ -107,7 +122,12 @@ export default class SocketService extends Service { }); socket.emit("sessionInitialized", sessionId); } - + // 2. else, retrieve it from Mongo and "resume" the socket.io connection + else { + console.log( + `Found socketId ${socket.id}, attempting to resume socket.io connection...` + ); + } }); }, });