🏗️ Massive refactoring around uncompression and resizing methods

This commit is contained in:
2022-12-08 11:15:10 -08:00
parent 5f3c93da73
commit 9a41f34099
6 changed files with 282 additions and 246 deletions

View File

@@ -38,7 +38,7 @@ import { Context, Service, ServiceBroker, ServiceSchema } from "moleculer";
import BullMQMixin, { SandboxedJob } from "moleculer-bull";
import { DbMixin } from "../mixins/db.mixin";
import Comic from "../models/comic.model";
import { extractFromArchive } from "../utils/uncompression.utils";
import { extractFromArchive, uncompressEntireArchive } from "../utils/uncompression.utils";
const REDIS_URI = process.env.REDIS_URI || `redis://localhost:6379`;
const EventEmitter = require("events");
@@ -57,7 +57,7 @@ export default class QueueService extends Service {
settings: {
bullmq: {
maxStalledCount: 0,
}
},
},
hooks: {},
@@ -68,11 +68,9 @@ export default class QueueService extends Service {
console.info("New job received!", job.data);
console.info(`Processing queue...`);
// extract the cover
const result = await extractFromArchive(
job.data.fileObject.filePath
);
const {
name,
filePath,
@@ -87,7 +85,6 @@ export default class QueueService extends Service {
const { inferredIssueDetails } = refineQuery(
result.name
);
console.log(
"Issue metadata inferred: ",
JSON.stringify(inferredIssueDetails, null, 2)
@@ -126,7 +123,7 @@ export default class QueueService extends Service {
},
directconnect: {
downloads: [],
}
},
},
});
return {
@@ -145,8 +142,29 @@ export default class QueueService extends Service {
};
},
},
"process.uncompressAndResize": {
concurrency: 2,
async process(job: SandboxedJob) {
console.log(``);
return await uncompressEntireArchive(job.data.filePath, job.data.options);
},
},
},
actions: {
uncompressResize: {
rest: "POST /uncompressResize",
params: {},
async handler(
ctx: Context<{
data: { filePath: string; options: any };
}>
) {
return await this.createJob(
"process.uncompressAndResize",
ctx.params
);
},
},
processImport: {
rest: "POST /processImport",
params: {},
@@ -164,7 +182,6 @@ export default class QueueService extends Service {
rest: "POST /pauseImportQueue",
params: {},
handler: async (ctx: Context<{ action: string }>) => {
console.log(ctx.params);
switch (ctx.params.action) {
case "pause":
const foo = await this.getQueue(
@@ -183,11 +200,6 @@ export default class QueueService extends Service {
}
},
},
unarchiveComicBook: {
rest: "POST /unarchiveComicBook",
params: {},
handler: async (ctx: Context<{}>) => {},
},
},
methods: {},
async started(): Promise<any> {
@@ -208,19 +220,31 @@ export default class QueueService extends Service {
event: "action",
args: [{ type: "LS_COVER_EXTRACTED", result: res }], //optional
});
console.info(`Job with the id '${job.id}' completed.`);
console.info(`Import Job with the id '${job.id}' completed.`);
}
);
await this.getQueue("process.import").on(
"stalled",
async (job) => {
console.warn(
`The job with the id '${job.id} got stalled!`
`Import job '${job.id} stalled!`
);
console.log(`${JSON.stringify(job, null, 2)}`);
console.log(`is stalled.`);
}
);
await this.getQueue("process.uncompressAndResize").on(
"completed",
async (job, res) => {
await this.broker.call("socket.broadcast", {
namespace: "/",
event: "action",
args: [{ type: "COMICBOOK_EXTRACTION_SUCCESS", result: res }]
});
console.info(`Uncompression Job ${job.id} completed.`)
}
);
},
});
}