🐂 Queue pause/resume functionality

This commit is contained in:
2023-08-21 17:55:08 -04:00
parent e5fc879b2d
commit df6652cce9
4 changed files with 109 additions and 146 deletions

View File

@@ -23,12 +23,19 @@ export default class JobQueueService extends Service {
},
},
actions: {
getJobStatuses: {
rest: "GET /getJobStatuses",
handler: async (ctx: Context<{}>) => {
const foo = await this.getJobStatuses("enqueue.async");
console.log(foo);
return foo;
toggle: {
rest: "GET /toggle",
handler: async (ctx: Context<{ action: String }>) => {
switch (ctx.params.action) {
case "pause":
this.pause();
break;
case "resume":
this.resume();
break;
default:
console.log(`Unknown queue action.`);
}
},
},
enqueue: {
@@ -36,12 +43,9 @@ export default class JobQueueService extends Service {
rest: "/GET enqueue",
handler: async (ctx: Context<{}>) => {
// Enqueue the job
const job = await this.localQueue(
ctx,
"enqueue.async",
ctx.params,
{ priority: 10 }
);
const job = await this.localQueue(ctx, "enqueue.async", ctx.params, {
priority: 10,
});
console.log(`Job ${job.id} enqueued`);
return job.id;
@@ -55,17 +59,13 @@ export default class JobQueueService extends Service {
}>
) => {
try {
console.log(
`Recieved Job ID ${ctx.locals.job.id}, processing...`
);
console.log(`Recieved Job ID ${ctx.locals.job.id}, processing...`);
// 1. De-structure the job params
const { fileObject } = ctx.locals.job.data.params;
// 2. Extract metadata from the archive
const result = await extractFromArchive(
fileObject.filePath
);
const result = await extractFromArchive(fileObject.filePath);
const {
name,
filePath,
@@ -78,9 +78,7 @@ export default class JobQueueService extends Service {
} = result;
// 3a. Infer any issue-related metadata from the filename
const { inferredIssueDetails } = refineQuery(
result.name
);
const { inferredIssueDetails } = refineQuery(result.name);
console.log(
"Issue metadata inferred: ",
JSON.stringify(inferredIssueDetails, null, 2)
@@ -120,8 +118,7 @@ export default class JobQueueService extends Service {
// "acquisition.directconnect.downloads": [],
// mark the metadata source
"acquisition.source.name":
ctx.locals.job.data.params.sourcedFrom,
"acquisition.source.name": ctx.locals.job.data.params.sourcedFrom,
};
// 3c. Add the bundleId, if present to the payload
@@ -132,13 +129,8 @@ export default class JobQueueService extends Service {
// 3d. Add the sourcedMetadata, if present
if (
!isNil(
ctx.locals.job.data.params.sourcedMetadata
) &&
!isUndefined(
ctx.locals.job.data.params.sourcedMetadata
.comicvine
)
!isNil(ctx.locals.job.data.params.sourcedMetadata) &&
!isUndefined(ctx.locals.job.data.params.sourcedMetadata.comicvine)
) {
Object.assign(
payload.sourcedMetadata,
@@ -147,15 +139,11 @@ export default class JobQueueService extends Service {
}
// 4. write to mongo
const importResult = await this.broker.call(
"library.rawImportToDB",
{
importType:
ctx.locals.job.data.params.importType,
bundleId,
payload,
}
);
const importResult = await this.broker.call("library.rawImportToDB", {
importType: ctx.locals.job.data.params.importType,
bundleId,
payload,
});
return {
data: {
importResult,
@@ -167,12 +155,9 @@ export default class JobQueueService extends Service {
console.error(
`An error occurred processing Job ID ${ctx.locals.job.id}`
);
throw new MoleculerError(
error,
500,
"IMPORT_JOB_ERROR",
{ data: ctx.params.socketSessionId }
);
throw new MoleculerError(error, 500, "IMPORT_JOB_ERROR", {
data: ctx.params.socketSessionId,
});
}
},
},
@@ -186,8 +171,8 @@ export default class JobQueueService extends Service {
async "enqueue.async.completed"(ctx: Context<{ id: Number }>) {
// 1. Fetch the job result using the job Id
const jobState = await this.job(ctx.params.id);
// 2. Incremement the completed job counter
const job = await this.job(ctx.params.id);
// 2. Increment the completed job counter
await pubClient.incr("completedJobCount");
// 3. Fetch the completed job count for the final payload to be sent to the client
const completedJobCount = await pubClient.get("completedJobCount");
@@ -195,19 +180,26 @@ export default class JobQueueService extends Service {
await this.broker.call("socket.broadcast", {
namespace: "/", //optional
event: "action",
args: [{ type: "LS_COVER_EXTRACTED", completedJobCount, importResult: jobState.returnvalue.data.importResult }], //optional
args: [
{
type: "LS_COVER_EXTRACTED",
completedJobCount,
importResult: job.returnvalue.data.importResult,
},
],
});
// 5. Persist the job results in mongo for posterity
// 5. Persist the job results in mongo for posterity
await JobResult.create({
id: ctx.params.id,
status: "completed",
failedReason: {},
});
// 6. Purge it from Redis
await job.remove();
console.log(`Job ID ${ctx.params.id} completed.`);
},
async "enqueue.async.failed"(ctx) {
console.log("FAILED FAILED FAILED FAILED FAILED")
const jobState = await this.job(ctx.params.id);
await pubClient.incr("failedJobCount");
const failedJobCount = await pubClient.get("failedJobCount");
@@ -222,10 +214,26 @@ export default class JobQueueService extends Service {
await this.broker.call("socket.broadcast", {
namespace: "/", //optional
event: "action",
args: [{ type: "LS_COVER_EXTRACTION_FAILED", failedJobCount, importResult: jobState }], //optional
args: [
{
type: "LS_COVER_EXTRACTION_FAILED",
failedJobCount,
importResult: jobState,
},
], //optional
});
},
async "enqueue.async.drained"(ctx) {
console.log(`Queue drained, all jobs processed.`);
await this.broker.call("socket.broadcast", {
namespace: "/",
event: "action",
args: [
{
type: "LS_IMPORT_QUEUE_DRAINED",
},
],
});
},
},
});