🐂 BullMQ support code

This commit is contained in:
2023-07-13 08:02:12 -07:00
parent 007ce4b66f
commit 7b855f8cf1
6 changed files with 63 additions and 24 deletions

11
models/jobresult.model.ts Normal file
View File

@@ -0,0 +1,11 @@
const mongoose = require("mongoose");
const paginate = require("mongoose-paginate-v2");
const JobResultScehma = mongoose.Schema({
id: Number,
status: String,
failedReason: Object
});
const JobResult = mongoose.model("JobResult", JobResultScehma);
export default JobResult;

View File

@@ -3,9 +3,9 @@ import {
Service, Service,
ServiceBroker, ServiceBroker,
ServiceSchema, ServiceSchema,
Errors,
} from "moleculer"; } from "moleculer";
// import { BullMQAdapter, JobStatus, BullMqMixin } from 'moleculer-bullmq'; const MoleculerError = require("moleculer").Errors;
import JobResult from "../models/jobresult.model";
import { refineQuery } from "filename-parser"; import { refineQuery } from "filename-parser";
import BullMqMixin, { BullMQAdapter, Queue } from 'moleculer-bullmq'; import BullMqMixin, { BullMQAdapter, Queue } from 'moleculer-bullmq';
import { extractFromArchive } from "../utils/uncompression.utils"; import { extractFromArchive } from "../utils/uncompression.utils";
@@ -45,8 +45,11 @@ export default class JobQueueService extends Service {
return job.id; return job.id;
} }
}, },
// Comic Book Import Job Queue
"enqueue.async": { "enqueue.async": {
handler: async (ctx: Context<{}>) => { handler: async (ctx: Context<{
socketSessionId: String,
}>) => {
try { try {
console.log(`Recieved Job ID ${ctx.locals.job.id}, processing...`); console.log(`Recieved Job ID ${ctx.locals.job.id}, processing...`);
@@ -125,7 +128,7 @@ export default class JobQueueService extends Service {
) { ) {
Object.assign( Object.assign(
payload.sourcedMetadata, payload.sourcedMetadata,
ctx.locals.job.data.paramssourcedMetadata ctx.locals.job.data.params.sourcedMetadata
); );
} }
@@ -143,9 +146,11 @@ export default class JobQueueService extends Service {
importResult, importResult,
}, },
id: ctx.locals.job.id, id: ctx.locals.job.id,
socketSessionId: ctx.params.socketSessionId,
}; };
} catch (error) { } catch (error) {
console.error(`An error occurred processing Job ID ${ctx.locals.job.id}`); console.error(`An error occurred processing Job ID ${ctx.locals.job.id}`);
throw new MoleculerError(error, 500, "IMPORT_JOB_ERROR", ctx.params.socketSessionId);
} }
} }
}, },
@@ -153,16 +158,28 @@ export default class JobQueueService extends Service {
events: { events: {
// use the `${QUEUE_NAME}.QUEUE_EVENT` scheme // use the `${QUEUE_NAME}.QUEUE_EVENT` scheme
async "enqueue.async.active"(ctx) { async "enqueue.async.active"(ctx: Context<{ id: Number }>) {
console.log(`Job ID ${ctx.params.id} is set to active.`); console.log(`Job ID ${ctx.params.id} is set to active.`);
}, },
async "enqueue.async.completed"(ctx) { async "enqueue.async.completed"(ctx: Context<{ id: Number }>) {
const jobState = await this.job(ctx.params.id);
await JobResult.create({
id: ctx.params.id,
status: "completed",
failedReason: {},
});
console.log(`Job ID ${ctx.params.id} completed.`); console.log(`Job ID ${ctx.params.id} completed.`);
}, },
async "enqueue.async.failed"(ctx) { async "enqueue.async.failed"(ctx) {
console.log("ch-----++++++++++-"); const jobState = await this.job(ctx.params.id);
await JobResult.create({
id: ctx.params.id,
status: "failed",
failedReason: jobState.failedReason,
});
} }
} }
}); });

View File

@@ -154,6 +154,7 @@ export default class ImportService extends Service {
async handler( async handler(
ctx: Context<{ ctx: Context<{
extractionOptions?: any; extractionOptions?: any;
socketSessionId: String,
}> }>
) { ) {
try { try {
@@ -203,6 +204,7 @@ export default class ImportService extends Service {
fileSize: item.stats.size, fileSize: item.stats.size,
}, },
importType: "new", importType: "new",
socketSessionId: ctx.params.socketSessionId,
}); });
} else { } else {
console.log( console.log(

View File

@@ -46,14 +46,15 @@ export default class SettingsService extends Service {
.map((item) => JSON.stringify(item)) .map((item) => JSON.stringify(item))
.join("\n"); .join("\n");
queries += "\n"; queries += "\n";
const { body } = await eSClient.msearch({ const { responses } = await eSClient.msearch({
body: queries, body: queries,
}); });
body.responses.forEach((match) => {
responses.forEach((match) => {
console.log(match.hits); console.log(match.hits);
}); });
return body.responses; return responses;
}, },
}, },
issue: { issue: {

View File

@@ -1,16 +1,16 @@
"use strict"; "use strict";
import { Service, ServiceBroker, ServiceSchema } from "moleculer"; import { Service, ServiceBroker, ServiceSchema, Context } from "moleculer";
import { createClient } from "redis"; import { createClient } from "redis";
import { createAdapter } from "@socket.io/redis-adapter"; import { createAdapter } from "@socket.io/redis-adapter";
const SocketIOService = require("moleculer-io"); const SocketIOService = require("moleculer-io");
const redisURL = new URL(process.env.REDIS_URI);
// console.log(redisURL.hostname);
const redisURL = new URL(process.env.REDIS_URI);
const pubClient = createClient({ url: `redis://${redisURL.hostname}:6379` }); const pubClient = createClient({ url: `redis://${redisURL.hostname}:6379` });
(async () => { (async () => {
await pubClient.connect(); await pubClient.connect();
})(); })();
const subClient = pubClient.duplicate(); const subClient = pubClient.duplicate();
export default class SocketService extends Service { export default class SocketService extends Service {
// @ts-ignore // @ts-ignore
public constructor( public constructor(
@@ -18,6 +18,7 @@ export default class SocketService extends Service {
schema: ServiceSchema<{}> = { name: "socket" } schema: ServiceSchema<{}> = { name: "socket" }
) { ) {
super(broker); super(broker);
let socketSessionId = null;
this.parseServiceSchema({ this.parseServiceSchema({
name: "socket", name: "socket",
mixins: [SocketIOService], mixins: [SocketIOService],
@@ -30,8 +31,7 @@ export default class SocketService extends Service {
call: { call: {
// whitelist: ["math.*", "say.*", "accounts.*", "rooms.*", "io.*"], // whitelist: ["math.*", "say.*", "accounts.*", "rooms.*", "io.*"],
}, },
action: async (data, ack) => { action: async (data) => {
// write your handler function here.
switch (data.type) { switch (data.type) {
case "LS_IMPORT": case "LS_IMPORT":
console.log( console.log(
@@ -40,10 +40,11 @@ export default class SocketService extends Service {
// 1. Send task to queue // 1. Send task to queue
await this.broker.call( await this.broker.call(
"library.newImport", "library.newImport",
data.data, { data: data.data, socketSessionId },
{} {}
); );
break; break;
case "LS_TOGGLE_IMPORT_QUEUE": case "LS_TOGGLE_IMPORT_QUEUE":
await this.broker.call( await this.broker.call(
"importqueue.toggleImportQueue", "importqueue.toggleImportQueue",
@@ -77,12 +78,18 @@ export default class SocketService extends Service {
}, },
}, },
hooks: {}, hooks: {},
actions: {}, actions: {
methods: {},
},
methods: {
},
async started() { async started() {
this.io.on("connection", (data) => this.io.on("connection", (socket) => {
console.log("socket.io server initialized.") console.log(`socket.io server initialized with session ID: ${socket.id}`);
); socket.emit("sessionId", socket.id);
socketSessionId = socket.id;
});
}, },
}); });
} }

View File

@@ -254,7 +254,7 @@ export const extractComicInfoXMLFromZip = async (
// Push the first file (cover) to our extraction target // Push the first file (cover) to our extraction target
extractionTargets.push(files[0].name); extractionTargets.push(files[0].name);
filesToWriteToDisk.coverFile = path.basename(files[0].name); filesToWriteToDisk.coverFile = path.basename(files[0].name);
if (!isEmpty(comicInfoXMLFileObject)) { if (!isEmpty(comicInfoXMLFileObject)) {
filesToWriteToDisk.comicInfoXML = comicInfoXMLFileObject[0].name; filesToWriteToDisk.comicInfoXML = comicInfoXMLFileObject[0].name;
extractionTargets.push(filesToWriteToDisk.comicInfoXML); extractionTargets.push(filesToWriteToDisk.comicInfoXML);
@@ -364,10 +364,11 @@ export const extractFromArchive = async (filePath: string) => {
return Object.assign({}, ...cbrResult); return Object.assign({}, ...cbrResult);
default: default:
console.log( console.error(
"Error inferring filetype for comicinfo.xml extraction." "Error inferring filetype for comicinfo.xml extraction."
); );
break; throw new Error("Cannot infer filetype");
} }
}; };