diff --git a/models/jobresult.model.ts b/models/jobresult.model.ts new file mode 100644 index 0000000..4308d6b --- /dev/null +++ b/models/jobresult.model.ts @@ -0,0 +1,11 @@ +const mongoose = require("mongoose"); +const paginate = require("mongoose-paginate-v2"); + +const JobResultScehma = mongoose.Schema({ + id: Number, + status: String, + failedReason: Object +}); + +const JobResult = mongoose.model("JobResult", JobResultScehma); +export default JobResult; diff --git a/services/jobqueue.service.ts b/services/jobqueue.service.ts index f7ef78f..63ffae0 100644 --- a/services/jobqueue.service.ts +++ b/services/jobqueue.service.ts @@ -3,9 +3,9 @@ import { Service, ServiceBroker, ServiceSchema, - Errors, } from "moleculer"; -// import { BullMQAdapter, JobStatus, BullMqMixin } from 'moleculer-bullmq'; +const MoleculerError = require("moleculer").Errors; +import JobResult from "../models/jobresult.model"; import { refineQuery } from "filename-parser"; import BullMqMixin, { BullMQAdapter, Queue } from 'moleculer-bullmq'; import { extractFromArchive } from "../utils/uncompression.utils"; @@ -45,8 +45,11 @@ export default class JobQueueService extends Service { return job.id; } }, + // Comic Book Import Job Queue "enqueue.async": { - handler: async (ctx: Context<{}>) => { + handler: async (ctx: Context<{ + socketSessionId: String, + }>) => { try { console.log(`Recieved Job ID ${ctx.locals.job.id}, processing...`); @@ -125,7 +128,7 @@ export default class JobQueueService extends Service { ) { Object.assign( payload.sourcedMetadata, - ctx.locals.job.data.paramssourcedMetadata + ctx.locals.job.data.params.sourcedMetadata ); } @@ -143,9 +146,11 @@ export default class JobQueueService extends Service { importResult, }, id: ctx.locals.job.id, + socketSessionId: ctx.params.socketSessionId, }; } catch (error) { console.error(`An error occurred processing Job ID ${ctx.locals.job.id}`); + throw new MoleculerError(error, 500, "IMPORT_JOB_ERROR", ctx.params.socketSessionId); } } }, @@ -153,16 +158,28 @@ export default class JobQueueService extends Service { events: { // use the `${QUEUE_NAME}.QUEUE_EVENT` scheme - async "enqueue.async.active"(ctx) { + async "enqueue.async.active"(ctx: Context<{ id: Number }>) { console.log(`Job ID ${ctx.params.id} is set to active.`); }, - async "enqueue.async.completed"(ctx) { + async "enqueue.async.completed"(ctx: Context<{ id: Number }>) { + const jobState = await this.job(ctx.params.id); + await JobResult.create({ + id: ctx.params.id, + status: "completed", + failedReason: {}, + }); console.log(`Job ID ${ctx.params.id} completed.`); + }, async "enqueue.async.failed"(ctx) { - console.log("ch-----++++++++++-"); + const jobState = await this.job(ctx.params.id); + await JobResult.create({ + id: ctx.params.id, + status: "failed", + failedReason: jobState.failedReason, + }); } } }); diff --git a/services/library.service.ts b/services/library.service.ts index 732e8d8..bb85cd0 100644 --- a/services/library.service.ts +++ b/services/library.service.ts @@ -154,6 +154,7 @@ export default class ImportService extends Service { async handler( ctx: Context<{ extractionOptions?: any; + socketSessionId: String, }> ) { try { @@ -203,6 +204,7 @@ export default class ImportService extends Service { fileSize: item.stats.size, }, importType: "new", + socketSessionId: ctx.params.socketSessionId, }); } else { console.log( diff --git a/services/search.service.ts b/services/search.service.ts index e50edac..29d1205 100644 --- a/services/search.service.ts +++ b/services/search.service.ts @@ -46,14 +46,15 @@ export default class SettingsService extends Service { .map((item) => JSON.stringify(item)) .join("\n"); queries += "\n"; - const { body } = await eSClient.msearch({ + const { responses } = await eSClient.msearch({ body: queries, }); - body.responses.forEach((match) => { + + responses.forEach((match) => { console.log(match.hits); }); - return body.responses; + return responses; }, }, issue: { diff --git a/services/socket.service.ts b/services/socket.service.ts index 402b0c3..2b2ce6d 100644 --- a/services/socket.service.ts +++ b/services/socket.service.ts @@ -1,16 +1,16 @@ "use strict"; -import { Service, ServiceBroker, ServiceSchema } from "moleculer"; +import { Service, ServiceBroker, ServiceSchema, Context } from "moleculer"; import { createClient } from "redis"; import { createAdapter } from "@socket.io/redis-adapter"; const SocketIOService = require("moleculer-io"); -const redisURL = new URL(process.env.REDIS_URI); -// console.log(redisURL.hostname); +const redisURL = new URL(process.env.REDIS_URI); const pubClient = createClient({ url: `redis://${redisURL.hostname}:6379` }); (async () => { await pubClient.connect(); })(); const subClient = pubClient.duplicate(); + export default class SocketService extends Service { // @ts-ignore public constructor( @@ -18,6 +18,7 @@ export default class SocketService extends Service { schema: ServiceSchema<{}> = { name: "socket" } ) { super(broker); + let socketSessionId = null; this.parseServiceSchema({ name: "socket", mixins: [SocketIOService], @@ -30,8 +31,7 @@ export default class SocketService extends Service { call: { // whitelist: ["math.*", "say.*", "accounts.*", "rooms.*", "io.*"], }, - action: async (data, ack) => { - // write your handler function here. + action: async (data) => { switch (data.type) { case "LS_IMPORT": console.log( @@ -40,10 +40,11 @@ export default class SocketService extends Service { // 1. Send task to queue await this.broker.call( "library.newImport", - data.data, + { data: data.data, socketSessionId }, {} ); break; + case "LS_TOGGLE_IMPORT_QUEUE": await this.broker.call( "importqueue.toggleImportQueue", @@ -77,12 +78,18 @@ export default class SocketService extends Service { }, }, hooks: {}, - actions: {}, - methods: {}, + actions: { + + }, + methods: { + + }, async started() { - this.io.on("connection", (data) => - console.log("socket.io server initialized.") - ); + this.io.on("connection", (socket) => { + console.log(`socket.io server initialized with session ID: ${socket.id}`); + socket.emit("sessionId", socket.id); + socketSessionId = socket.id; + }); }, }); } diff --git a/utils/uncompression.utils.ts b/utils/uncompression.utils.ts index d29058c..69f5293 100644 --- a/utils/uncompression.utils.ts +++ b/utils/uncompression.utils.ts @@ -254,7 +254,7 @@ export const extractComicInfoXMLFromZip = async ( // Push the first file (cover) to our extraction target extractionTargets.push(files[0].name); filesToWriteToDisk.coverFile = path.basename(files[0].name); - + if (!isEmpty(comicInfoXMLFileObject)) { filesToWriteToDisk.comicInfoXML = comicInfoXMLFileObject[0].name; extractionTargets.push(filesToWriteToDisk.comicInfoXML); @@ -364,10 +364,11 @@ export const extractFromArchive = async (filePath: string) => { return Object.assign({}, ...cbrResult); default: - console.log( + console.error( "Error inferring filetype for comicinfo.xml extraction." ); - break; + throw new Error("Cannot infer filetype"); + } };