🏗️ Added some archive-related keys to Comic model

This commit is contained in:
2024-01-06 11:17:40 -05:00
parent f3965437b5
commit bbd2906ebf
5 changed files with 404 additions and 175 deletions

View File

@@ -2,9 +2,16 @@ import { Context, Service, ServiceBroker } from "moleculer";
import JobResult from "../models/jobresult.model";
import { refineQuery } from "filename-parser";
import BullMqMixin from "moleculer-bullmq";
import { extractFromArchive, uncompressEntireArchive } from "../utils/uncompression.utils";
import { DbMixin } from "../mixins/db.mixin";
import Comic from "../models/comic.model";
const ObjectId = require("mongoose").Types.ObjectId;
import {
extractFromArchive,
uncompressEntireArchive,
} from "../utils/uncompression.utils";
import { isNil, isUndefined } from "lodash";
import { pubClient } from "../config/redis.config";
import path from "path";
const { MoleculerError } = require("moleculer").Errors;
@@ -15,7 +22,7 @@ export default class JobQueueService extends Service {
this.parseServiceSchema({
name: "jobqueue",
hooks: {},
mixins: [BullMqMixin],
mixins: [DbMixin("comics", Comic), BullMqMixin],
settings: {
bullmq: {
client: process.env.REDIS_URI,
@@ -47,13 +54,20 @@ export default class JobQueueService extends Service {
enqueue: {
queue: true,
rest: "/GET enqueue",
handler: async (ctx: Context<{ queueName: string; description: string }>) => {
handler: async (
ctx: Context<{ queueName: string; description: string }>
) => {
console.log(ctx.params);
const { queueName, description } = ctx.params;
// Enqueue the job
const job = await this.localQueue(ctx, queueName, ctx.params, {
priority: 10,
});
const job = await this.localQueue(
ctx,
queueName,
ctx.params,
{
priority: 10,
}
);
console.log(`Job ${job.id} enqueued`);
console.log(`${description}`);
@@ -68,13 +82,17 @@ export default class JobQueueService extends Service {
}>
) => {
try {
console.log(`Recieved Job ID ${ctx.locals.job.id}, processing...`);
console.log(
`Recieved Job ID ${ctx.locals.job.id}, processing...`
);
console.log(ctx.params);
// 1. De-structure the job params
const { fileObject } = ctx.locals.job.data.params;
// 2. Extract metadata from the archive
const result = await extractFromArchive(fileObject.filePath);
const result = await extractFromArchive(
fileObject.filePath
);
const {
name,
filePath,
@@ -87,7 +105,9 @@ export default class JobQueueService extends Service {
} = result;
// 3a. Infer any issue-related metadata from the filename
const { inferredIssueDetails } = refineQuery(result.name);
const { inferredIssueDetails } = refineQuery(
result.name
);
console.log(
"Issue metadata inferred: ",
JSON.stringify(inferredIssueDetails, null, 2)
@@ -127,7 +147,8 @@ export default class JobQueueService extends Service {
// "acquisition.directconnect.downloads": [],
// mark the metadata source
"acquisition.source.name": ctx.locals.job.data.params.sourcedFrom,
"acquisition.source.name":
ctx.locals.job.data.params.sourcedFrom,
};
// 3c. Add the bundleId, if present to the payload
@@ -138,8 +159,13 @@ export default class JobQueueService extends Service {
// 3d. Add the sourcedMetadata, if present
if (
!isNil(ctx.locals.job.data.params.sourcedMetadata) &&
!isUndefined(ctx.locals.job.data.params.sourcedMetadata.comicvine)
!isNil(
ctx.locals.job.data.params.sourcedMetadata
) &&
!isUndefined(
ctx.locals.job.data.params.sourcedMetadata
.comicvine
)
) {
Object.assign(
payload.sourcedMetadata,
@@ -148,11 +174,15 @@ export default class JobQueueService extends Service {
}
// 4. write to mongo
const importResult = await this.broker.call("library.rawImportToDB", {
importType: ctx.locals.job.data.params.importType,
bundleId,
payload,
});
const importResult = await this.broker.call(
"library.rawImportToDB",
{
importType:
ctx.locals.job.data.params.importType,
bundleId,
payload,
}
);
return {
data: {
importResult,
@@ -164,9 +194,14 @@ export default class JobQueueService extends Service {
console.error(
`An error occurred processing Job ID ${ctx.locals.job.id}`
);
throw new MoleculerError(error, 500, "IMPORT_JOB_ERROR", {
data: ctx.params.sessionId,
});
throw new MoleculerError(
error,
500,
"IMPORT_JOB_ERROR",
{
data: ctx.params.sessionId,
}
);
}
},
},
@@ -194,7 +229,8 @@ export default class JobQueueService extends Service {
statuses: {
$push: {
status: "$_id.status",
earliestTimestamp: "$earliestTimestamp",
earliestTimestamp:
"$earliestTimestamp",
count: "$count",
},
},
@@ -214,7 +250,10 @@ export default class JobQueueService extends Service {
{
$cond: [
{
$eq: ["$$this.status", "completed"],
$eq: [
"$$this.status",
"completed",
],
},
"$$this.count",
0,
@@ -234,7 +273,10 @@ export default class JobQueueService extends Service {
{
$cond: [
{
$eq: ["$$this.status", "failed"],
$eq: [
"$$this.status",
"failed",
],
},
"$$this.count",
0,
@@ -254,21 +296,62 @@ export default class JobQueueService extends Service {
},
"uncompressFullArchive.async": {
rest: "POST /uncompressFullArchive",
handler: async (ctx: Context<{ filePath: string; options: any }>) => {
const { filePath, options } = ctx.params;
console.log("asd", filePath);
handler: async (
ctx: Context<{
filePath: string;
comicObjectId: string;
options: any;
}>
) => {
console.log(
`Recieved Job ID ${JSON.stringify(
ctx.locals
)}, processing...`
);
const { filePath, options, comicObjectId } = ctx.params;
const comicId = new ObjectId(comicObjectId);
// 2. Extract metadata from the archive
return await uncompressEntireArchive(filePath, options);
const result: string[] = await uncompressEntireArchive(
filePath,
options
);
if (Array.isArray(result) && result.length !== 0) {
// Get the containing directory of the uncompressed archive
const directoryPath = path.dirname(result[0]);
// Add to mongo object
await Comic.findByIdAndUpdate(
comicId,
{
$set: {
"rawFileDetails.archive": {
uncompressed: true,
expandedPath: directoryPath,
},
},
},
{ new: true, safe: true, upsert: true }
);
return result;
}
},
},
},
events: {
async "uncompressFullArchive.async.active"(ctx: Context<{ id: number }>) {
console.log(`Uncompression Job ID ${ctx.params.id} is set to active.`);
async "uncompressFullArchive.async.active"(
ctx: Context<{ id: number }>
) {
console.log(
`Uncompression Job ID ${ctx.params.id} is set to active.`
);
},
async "uncompressFullArchive.async.completed"(ctx: Context<{ id: number }>) {
console.log(`Uncompression Job ID ${ctx.params.id} completed.`);
async "uncompressFullArchive.async.completed"(
ctx: Context<{ id: number }>
) {
console.log(
`Uncompression Job ID ${ctx.params.id} completed.`
);
},
// use the `${QUEUE_NAME}.QUEUE_EVENT` scheme
async "enqueue.async.active"(ctx: Context<{ id: Number }>) {
@@ -292,7 +375,9 @@ export default class JobQueueService extends Service {
// 2. Increment the completed job counter
await pubClient.incr("completedJobCount");
// 3. Fetch the completed job count for the final payload to be sent to the client
const completedJobCount = await pubClient.get("completedJobCount");
const completedJobCount = await pubClient.get(
"completedJobCount"
);
// 4. Emit the LS_COVER_EXTRACTED event with the necessary details
await this.broker.call("socket.broadcast", {
namespace: "/",
@@ -319,7 +404,9 @@ export default class JobQueueService extends Service {
async "enqueue.async.failed"(ctx) {
const job = await this.job(ctx.params.id);
await pubClient.incr("failedJobCount");
const failedJobCount = await pubClient.get("failedJobCount");
const failedJobCount = await pubClient.get(
"failedJobCount"
);
await JobResult.create({
id: ctx.params.id,