🐂 Added moleculer-bull

1. Added Bull MQ for enqueuing uncompression job
2. Added socket.io init code
3. Added a queue microservice
This commit is contained in:
2021-10-27 14:24:55 -07:00
parent 0ea5f78e98
commit a15bfafa01
3 changed files with 15 additions and 18 deletions

View File

@@ -98,21 +98,16 @@ export default class ApiService extends Service {
this.io.on("connection", (client) => { this.io.on("connection", (client) => {
this.logger.info("Client connected via websocket!"); this.logger.info("Client connected via websocket!");
client.on("action", (action, done) => { client.on("action", async (action) => {
switch (action.type) { switch (action.type) {
case "LS_IMPORT": case "LS_IMPORT":
this.broker // 1. Send task to queue
.call( const result = await this.broker.call(
"libraryqueue.enqueue", "libraryqueue.enqueue",
action.data, action.data,
{} {}
) );
.then((res) => {
if (done) {
done(res);
}
})
.catch((err) => this.logger.error(err));
break; break;
} }
}); });

View File

@@ -126,7 +126,7 @@ export default class ImportService extends Service {
{} {}
); );
return await dbImportResult; return { comicBookCoverMetadata, dbImportResult };
} else { } else {
logger.info( logger.info(
`Comic: \"${walkedFolders.name}\" already exists in the database` `Comic: \"${walkedFolders.name}\" already exists in the database`
@@ -377,7 +377,7 @@ export default class ImportService extends Service {
new Promise((resolve, reject) => new Promise((resolve, reject) =>
https https
.get( .get(
`${apiDetailURL}?api_key=a5fa0663683df8145a85d694b5da4b87e1c92c69&format=json&limit=1&offset=0&field_list=id,name,description,image,first_issue,last_issue,publisher,count_of_issues,character_credits,person_credits,aliases`, `${apiDetailURL}?api_key=${process.env.COMICVINE_API_KEY}&format=json&limit=1&offset=0&field_list=id,name,description,image,first_issue,last_issue,publisher,count_of_issues,character_credits,person_credits,aliases`,
(resp) => { (resp) => {
let data = ""; let data = "";
resp.on("data", (chunk) => { resp.on("data", (chunk) => {

View File

@@ -7,6 +7,7 @@ import {
Errors, Errors,
} from "moleculer"; } from "moleculer";
import BullMQMixin from "moleculer-bull"; import BullMQMixin from "moleculer-bull";
const REDIS_URI = process.env.REDIS_URI || `redis://0.0.0.0:6379`;
export default class LibraryQueueService extends Service { export default class LibraryQueueService extends Service {
public constructor( public constructor(
@@ -18,7 +19,7 @@ export default class LibraryQueueService extends Service {
Service.mergeSchemas( Service.mergeSchemas(
{ {
name: "libraryqueue", name: "libraryqueue",
mixins: [BullMQMixin("redis://0.0.0.0:6379")], mixins: [BullMQMixin(REDIS_URI)],
settings: {}, settings: {},
hooks: {}, hooks: {},
queues: { queues: {
@@ -27,6 +28,7 @@ export default class LibraryQueueService extends Service {
this.logger.info("New job received!", job.data); this.logger.info("New job received!", job.data);
this.logger.info(`Processing queue...`); this.logger.info(`Processing queue...`);
const result = await this.broker.call('import.processAndImportToDB', job.data); const result = await this.broker.call('import.processAndImportToDB', job.data);
return Promise.resolve({ return Promise.resolve({
result, result,
id: job.id, id: job.id,
@@ -40,12 +42,12 @@ export default class LibraryQueueService extends Service {
rest: "POST /enqueue", rest: "POST /enqueue",
params: {}, params: {},
async handler(ctx: Context<{ extractionOptions: object, walkedFolders: object}>) { async handler(ctx: Context<{ extractionOptions: object, walkedFolders: object}>) {
console.log(ctx.params); return await this.createJob("process.import", {
const job = await this.createJob("mail.send", {
extractionOptions: ctx.params.extractionOptions, extractionOptions: ctx.params.extractionOptions,
walkedFolders: ctx.params.walkedFolders, walkedFolders: ctx.params.walkedFolders,
}); });
}, },
}, },
}, },