🔧 Bull + Redis task queue first draft

This commit is contained in:
2021-10-27 07:30:56 -07:00
parent 289b2ec3bd
commit 0ea5f78e98
3 changed files with 37 additions and 50 deletions

View File

@@ -99,39 +99,24 @@ export default class ApiService extends Service {
this.logger.info("Client connected via websocket!"); this.logger.info("Client connected via websocket!");
client.on("action", (action, done) => { client.on("action", (action, done) => {
console.log(action);
switch (action.type) { switch (action.type) {
case "LS_IMPORT": case "LS_IMPORT":
this.broker this.broker
.call( .call(
"import.processAndImportToDB", "libraryqueue.enqueue",
action.data, action.data,
{} {}
) )
.then((res) => { .then((res) => {
if (done) done(res); if (done) {
done(res);
}
}) })
.catch((err) => this.logger.error(err)); .catch((err) => this.logger.error(err));
break; break;
} }
}); });
// Add a disconnect listener
// client.on("call", ({ action, params, opts }, done) => {
// this.logger.info(
// "Received request from client! Action:",
// action,
// ", Params:",
// params
// );
// this.broker
// .call(action, params, opts)
// .then((res) => {
// if (done) done(res);
// })
// .catch((err) => this.logger.error(err));
// });
client.on("disconnect", () => { client.on("disconnect", () => {
this.logger.info("Client disconnected"); this.logger.info("Client disconnected");
}); });

View File

@@ -91,7 +91,6 @@ export default class ImportService extends Service {
try { try {
const { extractionOptions, walkedFolders } = const { extractionOptions, walkedFolders } =
ctx.params; ctx.params;
// map(walkedFolders, async (folder, idx) => {
let comicExists = await Comic.exists({ let comicExists = await Comic.exists({
"rawFileDetails.name": `${walkedFolders.name}`, "rawFileDetails.name": `${walkedFolders.name}`,
}); });
@@ -127,12 +126,12 @@ export default class ImportService extends Service {
{} {}
); );
return await dbImportResult;
} else { } else {
logger.info( logger.info(
`Comic: \"${walkedFolders.name}\" already exists in the database` `Comic: \"${walkedFolders.name}\" already exists in the database`
); );
} }
// });
} catch (error) { } catch (error) {
logger.error( logger.error(
"Error importing comic books", "Error importing comic books",

View File

@@ -6,7 +6,6 @@ import {
ServiceSchema, ServiceSchema,
Errors, Errors,
} from "moleculer"; } from "moleculer";
import BullMQMixin from "moleculer-bull"; import BullMQMixin from "moleculer-bull";
export default class LibraryQueueService extends Service { export default class LibraryQueueService extends Service {
@@ -23,13 +22,13 @@ export default class LibraryQueueService extends Service {
settings: {}, settings: {},
hooks: {}, hooks: {},
queues: { queues: {
"mail.send": { "process.import": {
async process(job) { async process(job) {
this.logger.info("New job received!", job.data); this.logger.info("New job received!", job.data);
this.logger.info(`Processing queue...`); this.logger.info(`Processing queue...`);
// const accounts = await this.broker.call('v1.users.list'); const result = await this.broker.call('import.processAndImportToDB', job.data);
// this.logger.info(accounts);
return Promise.resolve({ return Promise.resolve({
result,
id: job.id, id: job.id,
worker: process.pid, worker: process.pid,
}); });
@@ -40,36 +39,40 @@ export default class LibraryQueueService extends Service {
enqueue: { enqueue: {
rest: "POST /enqueue", rest: "POST /enqueue",
params: {}, params: {},
async handler(ctx: Context<{}>) { async handler(ctx: Context<{ extractionOptions: object, walkedFolders: object}>) {
console.log(ctx.params);
const job = await this.createJob("mail.send", { const job = await this.createJob("mail.send", {
blah: "blah", extractionOptions: ctx.params.extractionOptions,
}); walkedFolders: ctx.params.walkedFolders,
const failed = await this.getQueue(
"mail.send"
).on("failed", async (job, error) => {
this.logger.error(
`An error occured in 'mail.send' queue on job id '${job.id}': ${error.message}`
);
});
const completed = await this.getQueue(
"mail.send"
).on("completed", async (job, res) => {
this.logger.info(
`Job with the id '${job.id}' completed.`
);
});
const stalled = await this.getQueue(
"mail.send"
).on("stalled", async (job) => {
this.logger.warn(
`The job with the id '${job} got stalled!`
);
}); });
}, },
}, },
}, },
methods: {}, methods: {},
async started(): Promise<any> {}, async started(): Promise<any> {
const failed = await this.getQueue(
"process.import"
).on("failed", async (job, error) => {
this.logger.error(
`An error occured in 'mail.send' queue on job id '${job.id}': ${error.message}`
);
});
const completed = await this.getQueue(
"process.import"
).on("completed", async (job, res) => {
this.logger.info(
`Job with the id '${job.id}' completed.`
);
});
const stalled = await this.getQueue(
"process.import"
).on("stalled", async (job) => {
this.logger.warn(
`The job with the id '${job} got stalled!`
);
});
},
}, },
schema schema
) )