diff --git a/services/api.service.ts b/services/api.service.ts index 0d27a8c..717a2a4 100644 --- a/services/api.service.ts +++ b/services/api.service.ts @@ -99,39 +99,24 @@ export default class ApiService extends Service { this.logger.info("Client connected via websocket!"); client.on("action", (action, done) => { - console.log(action); switch (action.type) { case "LS_IMPORT": this.broker .call( - "import.processAndImportToDB", + "libraryqueue.enqueue", action.data, {} ) .then((res) => { - if (done) done(res); + if (done) { + done(res); + } }) .catch((err) => this.logger.error(err)); break; } }); - - // client.on("call", ({ action, params, opts }, done) => { - // this.logger.info( - // "Received request from client! Action:", - // action, - // ", Params:", - // params - // ); - - // this.broker - // .call(action, params, opts) - // .then((res) => { - // if (done) done(res); - // }) - // .catch((err) => this.logger.error(err)); - // }); - + // Add a disconnect listener client.on("disconnect", () => { this.logger.info("Client disconnected"); }); diff --git a/services/import.service.ts b/services/import.service.ts index c88ed48..2a945b6 100644 --- a/services/import.service.ts +++ b/services/import.service.ts @@ -91,7 +91,6 @@ export default class ImportService extends Service { try { const { extractionOptions, walkedFolders } = ctx.params; - // map(walkedFolders, async (folder, idx) => { let comicExists = await Comic.exists({ "rawFileDetails.name": `${walkedFolders.name}`, }); @@ -127,12 +126,12 @@ export default class ImportService extends Service { {} ); + return await dbImportResult; } else { logger.info( `Comic: \"${walkedFolders.name}\" already exists in the database` ); } - // }); } catch (error) { logger.error( "Error importing comic books", diff --git a/services/library.queue.service.ts b/services/library.queue.service.ts index 2c48a11..9c18069 100644 --- a/services/library.queue.service.ts +++ b/services/library.queue.service.ts @@ -6,7 +6,6 @@ import { ServiceSchema, Errors, } from "moleculer"; - import BullMQMixin from "moleculer-bull"; export default class LibraryQueueService extends Service { @@ -23,13 +22,13 @@ export default class LibraryQueueService extends Service { settings: {}, hooks: {}, queues: { - "mail.send": { + "process.import": { async process(job) { this.logger.info("New job received!", job.data); this.logger.info(`Processing queue...`); - // const accounts = await this.broker.call('v1.users.list'); - // this.logger.info(accounts); + const result = await this.broker.call('import.processAndImportToDB', job.data); return Promise.resolve({ + result, id: job.id, worker: process.pid, }); @@ -40,36 +39,40 @@ export default class LibraryQueueService extends Service { enqueue: { rest: "POST /enqueue", params: {}, - async handler(ctx: Context<{}>) { + async handler(ctx: Context<{ extractionOptions: object, walkedFolders: object}>) { + console.log(ctx.params); const job = await this.createJob("mail.send", { - blah: "blah", - }); - const failed = await this.getQueue( - "mail.send" - ).on("failed", async (job, error) => { - this.logger.error( - `An error occured in 'mail.send' queue on job id '${job.id}': ${error.message}` - ); - }); - const completed = await this.getQueue( - "mail.send" - ).on("completed", async (job, res) => { - this.logger.info( - `Job with the id '${job.id}' completed.` - ); - }); - const stalled = await this.getQueue( - "mail.send" - ).on("stalled", async (job) => { - this.logger.warn( - `The job with the id '${job} got stalled!` - ); + extractionOptions: ctx.params.extractionOptions, + walkedFolders: ctx.params.walkedFolders, }); + }, }, }, methods: {}, - async started(): Promise {}, + async started(): Promise { + const failed = await this.getQueue( + "process.import" + ).on("failed", async (job, error) => { + this.logger.error( + `An error occured in 'mail.send' queue on job id '${job.id}': ${error.message}` + ); + }); + const completed = await this.getQueue( + "process.import" + ).on("completed", async (job, res) => { + this.logger.info( + `Job with the id '${job.id}' completed.` + ); + }); + const stalled = await this.getQueue( + "process.import" + ).on("stalled", async (job) => { + this.logger.warn( + `The job with the id '${job} got stalled!` + ); + }); + }, }, schema )