🧹 Linted code

This commit is contained in:
2023-08-14 19:45:40 -04:00
parent 4003f666cf
commit fdcf1f7d68
2 changed files with 84 additions and 44 deletions

View File

@@ -1,13 +1,8 @@
import { import { Context, Service, ServiceBroker, ServiceSchema } from "moleculer";
Context,
Service,
ServiceBroker,
ServiceSchema,
} from "moleculer";
const { MoleculerError } = require("moleculer").Errors; const { MoleculerError } = require("moleculer").Errors;
import JobResult from "../models/jobresult.model"; import JobResult from "../models/jobresult.model";
import { refineQuery } from "filename-parser"; import { refineQuery } from "filename-parser";
import BullMqMixin, { BullMQAdapter, Queue } from 'moleculer-bullmq'; import BullMqMixin, { BullMQAdapter, Queue } from "moleculer-bullmq";
import { extractFromArchive } from "../utils/uncompression.utils"; import { extractFromArchive } from "../utils/uncompression.utils";
import { isNil, isUndefined } from "lodash"; import { isNil, isUndefined } from "lodash";
@@ -23,41 +18,52 @@ export default class JobQueueService extends Service {
settings: { settings: {
bullmq: { bullmq: {
client: process.env.REDIS_URI, client: process.env.REDIS_URI,
} },
}, },
actions: { actions: {
getJobStatuses: { getJobStatuses: {
rest: "GET /getJobStatuses", rest: "GET /getJobStatuses",
handler: async (ctx: Context<{}>) => { handler: async (ctx: Context<{}>) => {
const foo = await this.getJobStatuses('enqueue.async'); const foo = await this.getJobStatuses("enqueue.async");
console.log(foo); console.log(foo);
return foo; return foo;
} },
}, },
enqueue: { enqueue: {
queue: true, queue: true,
rest: "/GET enqueue", rest: "/GET enqueue",
handler: async (ctx: Context<{}>) => { handler: async (ctx: Context<{}>) => {
// Enqueue the job // Enqueue the job
const job = await this.localQueue(ctx, 'enqueue.async', ctx.params, { priority: 10 }); const job = await this.localQueue(
ctx,
"enqueue.async",
ctx.params,
{ priority: 10 }
);
console.log(`Job ${job.id} enqueued`); console.log(`Job ${job.id} enqueued`);
return job.id; return job.id;
} },
}, },
// Comic Book Import Job Queue // Comic Book Import Job Queue
"enqueue.async": { "enqueue.async": {
handler: async (ctx: Context<{ handler: async (
socketSessionId: String, ctx: Context<{
}>) => { socketSessionId: String;
}>
) => {
try { try {
console.log(`Recieved Job ID ${ctx.locals.job.id}, processing...`); console.log(
`Recieved Job ID ${ctx.locals.job.id}, processing...`
);
// 1. De-structure the job params // 1. De-structure the job params
const { fileObject } = ctx.locals.job.data.params; const { fileObject } = ctx.locals.job.data.params;
// 2. Extract metadata from the archive // 2. Extract metadata from the archive
const result = await extractFromArchive(fileObject.filePath); const result = await extractFromArchive(
fileObject.filePath
);
const { const {
name, name,
filePath, filePath,
@@ -112,8 +118,9 @@ export default class JobQueueService extends Service {
// "acquisition.directconnect.downloads": [], // "acquisition.directconnect.downloads": [],
// mark the metadata source // mark the metadata source
"acquisition.source.name": ctx.locals.job.data.params.sourcedFrom, "acquisition.source.name":
} ctx.locals.job.data.params.sourcedFrom,
};
// 3c. Add the bundleId, if present to the payload // 3c. Add the bundleId, if present to the payload
let bundleId = null; let bundleId = null;
@@ -123,8 +130,13 @@ export default class JobQueueService extends Service {
// 3d. Add the sourcedMetadata, if present // 3d. Add the sourcedMetadata, if present
if ( if (
!isNil(ctx.locals.job.data.params.sourcedMetadata) && !isNil(
!isUndefined(ctx.locals.job.data.params.sourcedMetadata.comicvine) ctx.locals.job.data.params.sourcedMetadata
) &&
!isUndefined(
ctx.locals.job.data.params.sourcedMetadata
.comicvine
)
) { ) {
Object.assign( Object.assign(
payload.sourcedMetadata, payload.sourcedMetadata,
@@ -136,7 +148,8 @@ export default class JobQueueService extends Service {
const importResult = await this.broker.call( const importResult = await this.broker.call(
"library.rawImportToDB", "library.rawImportToDB",
{ {
importType: ctx.locals.job.data.params.importType, importType:
ctx.locals.job.data.params.importType,
bundleId, bundleId,
payload, payload,
} }
@@ -149,10 +162,17 @@ export default class JobQueueService extends Service {
socketSessionId: ctx.params.socketSessionId, socketSessionId: ctx.params.socketSessionId,
}; };
} catch (error) { } catch (error) {
console.error(`An error occurred processing Job ID ${ctx.locals.job.id}`); console.error(
throw new MoleculerError(error, 500, "IMPORT_JOB_ERROR", {data: ctx.params.socketSessionId}); `An error occurred processing Job ID ${ctx.locals.job.id}`
);
throw new MoleculerError(
error,
500,
"IMPORT_JOB_ERROR",
{ data: ctx.params.socketSessionId }
);
} }
} },
}, },
}, },
@@ -164,13 +184,13 @@ export default class JobQueueService extends Service {
async "enqueue.async.completed"(ctx: Context<{ id: Number }>) { async "enqueue.async.completed"(ctx: Context<{ id: Number }>) {
const jobState = await this.job(ctx.params.id); const jobState = await this.job(ctx.params.id);
await JobResult.create({ await JobResult.create({
id: ctx.params.id, id: ctx.params.id,
status: "completed", status: "completed",
failedReason: {}, failedReason: {},
}); });
console.log(`Job ID ${ctx.params.id} completed.`); console.log(`Job ID ${ctx.params.id} completed.`);
}, },
async "enqueue.async.failed"(ctx) { async "enqueue.async.failed"(ctx) {
@@ -180,8 +200,8 @@ export default class JobQueueService extends Service {
status: "failed", status: "failed",
failedReason: jobState.failedReason, failedReason: jobState.failedReason,
}); });
} },
} },
}); });
} }
} }

View File

@@ -34,10 +34,20 @@ export default class SocketService extends Service {
}, },
action: async (data) => { action: async (data) => {
switch (data.type) { switch (data.type) {
case "RESUME_SESSION": case "RESUME_SESSION":
console.log("Attempting to resume session...") console.log(
"Attempting to resume session..."
);
const sessionRecord =
await Session.find({
sessionId:
data.session.sessionId,
});
this.io.emit("yelaveda", {
hagindari: "bhagindari",
});
break; break;
case "LS_IMPORT": case "LS_IMPORT":
console.log( console.log(
@@ -46,7 +56,10 @@ export default class SocketService extends Service {
// 1. Send task to queue // 1. Send task to queue
await this.broker.call( await this.broker.call(
"library.newImport", "library.newImport",
{ data: data.data, socketSessionId }, {
data: data.data,
socketSessionId,
},
{} {}
); );
break; break;
@@ -84,20 +97,22 @@ export default class SocketService extends Service {
}, },
}, },
hooks: {}, hooks: {},
actions: { actions: {},
methods: {},
},
methods: {
},
async started() { async started() {
this.io.on("connection", async (socket) => { this.io.on("connection", async (socket) => {
console.log(socket); console.log(
console.log(`socket.io server connected to client with session ID: ${socket.id}`); `socket.io server connected to client with session ID: ${socket.id}`
);
console.log("Looking up sessionId in Mongo..."); console.log("Looking up sessionId in Mongo...");
const sessionIdExists = await Session.find({ sessionId: socket.handshake.query.sessionId }); const sessionIdExists = await Session.find({
if(sessionIdExists.length === 0) { sessionId: socket.handshake.query.sessionId,
console.log(`Socket Id ${socket.id} not found in Mongo, creating a new session...`); });
// 1. if sessionId isn't found in Mongo, create one and persist it
if (sessionIdExists.length === 0) {
console.log(
`Socket Id ${socket.id} not found in Mongo, creating a new session...`
);
const sessionId = uuidv4(); const sessionId = uuidv4();
socket.sessionId = sessionId; socket.sessionId = sessionId;
console.log(`Saving session ${sessionId} to Mongo...`); console.log(`Saving session ${sessionId} to Mongo...`);
@@ -107,7 +122,12 @@ export default class SocketService extends Service {
}); });
socket.emit("sessionInitialized", sessionId); socket.emit("sessionInitialized", sessionId);
} }
// 2. else, retrieve it from Mongo and "resume" the socket.io connection
else {
console.log(
`Found socketId ${socket.id}, attempting to resume socket.io connection...`
);
}
}); });
}, },
}); });