🏗 Added a job for full archive extraction

This commit is contained in:
2023-12-30 00:50:06 -05:00
parent 78e0e9f8ce
commit f3965437b5
4 changed files with 246 additions and 402 deletions

View File

@@ -2,7 +2,7 @@ import { Context, Service, ServiceBroker } from "moleculer";
import JobResult from "../models/jobresult.model";
import { refineQuery } from "filename-parser";
import BullMqMixin from "moleculer-bullmq";
import { extractFromArchive } from "../utils/uncompression.utils";
import { extractFromArchive, uncompressEntireArchive } from "../utils/uncompression.utils";
import { isNil, isUndefined } from "lodash";
import { pubClient } from "../config/redis.config";
@@ -47,17 +47,15 @@ export default class JobQueueService extends Service {
enqueue: {
queue: true,
rest: "/GET enqueue",
handler: async (ctx: Context<{}>) => {
handler: async (ctx: Context<{ queueName: string; description: string }>) => {
console.log(ctx.params);
const { queueName, description } = ctx.params;
// Enqueue the job
const job = await this.localQueue(
ctx,
"enqueue.async",
ctx.params,
{
priority: 10,
}
);
const job = await this.localQueue(ctx, queueName, ctx.params, {
priority: 10,
});
console.log(`Job ${job.id} enqueued`);
console.log(`${description}`);
return job.id;
},
@@ -70,17 +68,13 @@ export default class JobQueueService extends Service {
}>
) => {
try {
console.log(
`Recieved Job ID ${ctx.locals.job.id}, processing...`
);
console.log(`Recieved Job ID ${ctx.locals.job.id}, processing...`);
console.log(ctx.params);
// 1. De-structure the job params
const { fileObject } = ctx.locals.job.data.params;
// 2. Extract metadata from the archive
const result = await extractFromArchive(
fileObject.filePath
);
const result = await extractFromArchive(fileObject.filePath);
const {
name,
filePath,
@@ -93,9 +87,7 @@ export default class JobQueueService extends Service {
} = result;
// 3a. Infer any issue-related metadata from the filename
const { inferredIssueDetails } = refineQuery(
result.name
);
const { inferredIssueDetails } = refineQuery(result.name);
console.log(
"Issue metadata inferred: ",
JSON.stringify(inferredIssueDetails, null, 2)
@@ -135,8 +127,7 @@ export default class JobQueueService extends Service {
// "acquisition.directconnect.downloads": [],
// mark the metadata source
"acquisition.source.name":
ctx.locals.job.data.params.sourcedFrom,
"acquisition.source.name": ctx.locals.job.data.params.sourcedFrom,
};
// 3c. Add the bundleId, if present to the payload
@@ -147,13 +138,8 @@ export default class JobQueueService extends Service {
// 3d. Add the sourcedMetadata, if present
if (
!isNil(
ctx.locals.job.data.params.sourcedMetadata
) &&
!isUndefined(
ctx.locals.job.data.params.sourcedMetadata
.comicvine
)
!isNil(ctx.locals.job.data.params.sourcedMetadata) &&
!isUndefined(ctx.locals.job.data.params.sourcedMetadata.comicvine)
) {
Object.assign(
payload.sourcedMetadata,
@@ -162,15 +148,11 @@ export default class JobQueueService extends Service {
}
// 4. write to mongo
const importResult = await this.broker.call(
"library.rawImportToDB",
{
importType:
ctx.locals.job.data.params.importType,
bundleId,
payload,
}
);
const importResult = await this.broker.call("library.rawImportToDB", {
importType: ctx.locals.job.data.params.importType,
bundleId,
payload,
});
return {
data: {
importResult,
@@ -182,14 +164,9 @@ export default class JobQueueService extends Service {
console.error(
`An error occurred processing Job ID ${ctx.locals.job.id}`
);
throw new MoleculerError(
error,
500,
"IMPORT_JOB_ERROR",
{
data: ctx.params.sessionId,
}
);
throw new MoleculerError(error, 500, "IMPORT_JOB_ERROR", {
data: ctx.params.sessionId,
});
}
},
},
@@ -217,8 +194,7 @@ export default class JobQueueService extends Service {
statuses: {
$push: {
status: "$_id.status",
earliestTimestamp:
"$earliestTimestamp",
earliestTimestamp: "$earliestTimestamp",
count: "$count",
},
},
@@ -238,10 +214,7 @@ export default class JobQueueService extends Service {
{
$cond: [
{
$eq: [
"$$this.status",
"completed",
],
$eq: ["$$this.status", "completed"],
},
"$$this.count",
0,
@@ -261,10 +234,7 @@ export default class JobQueueService extends Service {
{
$cond: [
{
$eq: [
"$$this.status",
"failed",
],
$eq: ["$$this.status", "failed"],
},
"$$this.count",
0,
@@ -282,9 +252,24 @@ export default class JobQueueService extends Service {
]);
},
},
"uncompressFullArchive.async": {
rest: "POST /uncompressFullArchive",
handler: async (ctx: Context<{ filePath: string; options: any }>) => {
const { filePath, options } = ctx.params;
console.log("asd", filePath);
// 2. Extract metadata from the archive
return await uncompressEntireArchive(filePath, options);
},
},
},
events: {
async "uncompressFullArchive.async.active"(ctx: Context<{ id: number }>) {
console.log(`Uncompression Job ID ${ctx.params.id} is set to active.`);
},
async "uncompressFullArchive.async.completed"(ctx: Context<{ id: number }>) {
console.log(`Uncompression Job ID ${ctx.params.id} completed.`);
},
// use the `${QUEUE_NAME}.QUEUE_EVENT` scheme
async "enqueue.async.active"(ctx: Context<{ id: Number }>) {
console.log(`Job ID ${ctx.params.id} is set to active.`);
@@ -307,9 +292,7 @@ export default class JobQueueService extends Service {
// 2. Increment the completed job counter
await pubClient.incr("completedJobCount");
// 3. Fetch the completed job count for the final payload to be sent to the client
const completedJobCount = await pubClient.get(
"completedJobCount"
);
const completedJobCount = await pubClient.get("completedJobCount");
// 4. Emit the LS_COVER_EXTRACTED event with the necessary details
await this.broker.call("socket.broadcast", {
namespace: "/",
@@ -336,9 +319,7 @@ export default class JobQueueService extends Service {
async "enqueue.async.failed"(ctx) {
const job = await this.job(ctx.params.id);
await pubClient.incr("failedJobCount");
const failedJobCount = await pubClient.get(
"failedJobCount"
);
const failedJobCount = await pubClient.get("failedJobCount");
await JobResult.create({
id: ctx.params.id,

View File

@@ -33,13 +33,7 @@ SOFTWARE.
"use strict";
import { isNil } from "lodash";
import {
Context,
Service,
ServiceBroker,
ServiceSchema,
Errors,
} from "moleculer";
import { Context, Service, ServiceBroker, ServiceSchema, Errors } from "moleculer";
import { DbMixin } from "../mixins/db.mixin";
import Comic from "../models/comic.model";
import { walkFolder, getSizeOfDirectory } from "../utils/file.utils";
@@ -101,9 +95,7 @@ export default class ImportService extends Service {
uncompressFullArchive: {
rest: "POST /uncompressFullArchive",
params: {},
handler: async (
ctx: Context<{ filePath: string; options: any }>
) => {
handler: async (ctx: Context<{ filePath: string; options: any }>) => {
await broker.call("importqueue.uncompressResize", {
filePath: ctx.params.filePath,
options: ctx.params.options,
@@ -121,8 +113,7 @@ export default class ImportService extends Service {
});
// Determine source where the comic was added from
// and gather identifying information about it
const sourceName =
referenceComicObject[0].acquisition.source.name;
const sourceName = referenceComicObject[0].acquisition.source.name;
const { sourcedMetadata } = referenceComicObject[0];
const filePath = `${COMICS_DIRECTORY}/${ctx.params.bundle.data.name}`;
@@ -166,14 +157,8 @@ export default class ImportService extends Service {
// 1.1 Filter on .cb* extensions
.pipe(
through2.obj(function (item, enc, next) {
let fileExtension = path.extname(
item.path
);
if (
[".cbz", ".cbr", ".cb7"].includes(
fileExtension
)
) {
let fileExtension = path.extname(item.path);
if ([".cbz", ".cbr", ".cb7"].includes(fileExtension)) {
this.push(item);
}
next();
@@ -182,10 +167,7 @@ export default class ImportService extends Service {
// 1.2 Pipe filtered results to the next step
// Enqueue the job in the queue
.on("data", async (item) => {
console.info(
"Found a file at path: %s",
item.path
);
console.info("Found a file at path: %s", item.path);
let comicExists = await Comic.exists({
"rawFileDetails.name": `${path.basename(
item.path,
@@ -194,14 +176,8 @@ export default class ImportService extends Service {
});
if (!comicExists) {
// 2.1 Reset the job counters in Redis
await pubClient.set(
"completedJobCount",
0
);
await pubClient.set(
"failedJobCount",
0
);
await pubClient.set("completedJobCount", 0);
await pubClient.set("failedJobCount", 0);
// 2.2 Send the extraction job to the queue
this.broker.call("jobqueue.enqueue", {
fileObject: {
@@ -210,11 +186,10 @@ export default class ImportService extends Service {
},
sessionId,
importType: "new",
queueName: "enqueue.async",
});
} else {
console.log(
"Comic already exists in the library."
);
console.log("Comic already exists in the library.");
}
})
.on("end", () => {
@@ -266,28 +241,19 @@ export default class ImportService extends Service {
// we solicit volume information and add that to mongo
if (
comicMetadata.sourcedMetadata.comicvine &&
!isNil(
comicMetadata.sourcedMetadata.comicvine
.volume
)
!isNil(comicMetadata.sourcedMetadata.comicvine.volume)
) {
volumeDetails = await this.broker.call(
"comicvine.getVolumes",
{
volumeURI:
comicMetadata.sourcedMetadata
.comicvine.volume
.api_detail_url,
}
);
volumeDetails = await this.broker.call("comicvine.getVolumes", {
volumeURI:
comicMetadata.sourcedMetadata.comicvine.volume
.api_detail_url,
});
comicMetadata.sourcedMetadata.comicvine.volumeInformation =
volumeDetails.results;
}
console.log("Saving to Mongo...");
console.log(
`Import type: [${ctx.params.importType}]`
);
console.log(`Import type: [${ctx.params.importType}]`);
switch (ctx.params.importType) {
case "new":
return await Comic.create(comicMetadata);
@@ -308,10 +274,7 @@ export default class ImportService extends Service {
}
} catch (error) {
console.log(error);
throw new Errors.MoleculerError(
"Import failed.",
500
);
throw new Errors.MoleculerError("Import failed.", 500);
}
},
},
@@ -329,9 +292,7 @@ export default class ImportService extends Service {
) {
// 1. Find mongo object by id
// 2. Import payload into sourcedMetadata.comicvine
const comicObjectId = new ObjectId(
ctx.params.comicObjectId
);
const comicObjectId = new ObjectId(ctx.params.comicObjectId);
return new Promise(async (resolve, reject) => {
let volumeDetails = {};
@@ -340,18 +301,15 @@ export default class ImportService extends Service {
const volumeDetails = await this.broker.call(
"comicvine.getVolumes",
{
volumeURI:
matchedResult.volume.api_detail_url,
volumeURI: matchedResult.volume.api_detail_url,
}
);
matchedResult.volumeInformation =
volumeDetails.results;
matchedResult.volumeInformation = volumeDetails.results;
Comic.findByIdAndUpdate(
comicObjectId,
{
$set: {
"sourcedMetadata.comicvine":
matchedResult,
"sourcedMetadata.comicvine": matchedResult,
},
},
{ new: true },
@@ -382,9 +340,7 @@ export default class ImportService extends Service {
}>
) {
console.log(JSON.stringify(ctx.params, null, 2));
const comicObjectId = new ObjectId(
ctx.params.comicObjectId
);
const comicObjectId = new ObjectId(ctx.params.comicObjectId);
return new Promise((resolve, reject) => {
Comic.findByIdAndUpdate(
@@ -439,9 +395,7 @@ export default class ImportService extends Service {
params: { ids: "array" },
handler: async (ctx: Context<{ ids: [string] }>) => {
console.log(ctx.params.ids);
const queryIds = ctx.params.ids.map(
(id) => new ObjectId(id)
);
const queryIds = ctx.params.ids.map((id) => new ObjectId(id));
return await Comic.find({
_id: {
$in: queryIds,
@@ -457,8 +411,7 @@ export default class ImportService extends Service {
const volumes = await Comic.aggregate([
{
$project: {
volumeInfo:
"$sourcedMetadata.comicvine.volumeInformation",
volumeInfo: "$sourcedMetadata.comicvine.volumeInformation",
},
},
{
@@ -504,52 +457,46 @@ export default class ImportService extends Service {
const { queryObjects } = ctx.params;
// construct the query for ElasticSearch
let elasticSearchQuery = {};
const elasticSearchQueries = queryObjects.map(
(queryObject) => {
console.log("Volume: ", queryObject.volumeName);
console.log("Issue: ", queryObject.issueName);
if (queryObject.issueName === null) {
queryObject.issueName = "";
}
if (queryObject.volumeName === null) {
queryObject.volumeName = "";
}
elasticSearchQuery = {
bool: {
must: [
{
match_phrase: {
"rawFileDetails.name":
queryObject.volumeName,
},
},
{
term: {
"inferredMetadata.issue.number":
parseInt(
queryObject.issueNumber,
10
),
},
},
],
},
};
return [
{
index: "comics",
search_type: "dfs_query_then_fetch",
},
{
query: elasticSearchQuery,
},
];
const elasticSearchQueries = queryObjects.map((queryObject) => {
console.log("Volume: ", queryObject.volumeName);
console.log("Issue: ", queryObject.issueName);
if (queryObject.issueName === null) {
queryObject.issueName = "";
}
);
console.log(
JSON.stringify(elasticSearchQueries, null, 2)
);
if (queryObject.volumeName === null) {
queryObject.volumeName = "";
}
elasticSearchQuery = {
bool: {
must: [
{
match_phrase: {
"rawFileDetails.name": queryObject.volumeName,
},
},
{
term: {
"inferredMetadata.issue.number": parseInt(
queryObject.issueNumber,
10
),
},
},
],
},
};
return [
{
index: "comics",
search_type: "dfs_query_then_fetch",
},
{
query: elasticSearchQuery,
},
];
});
console.log(JSON.stringify(elasticSearchQueries, null, 2));
return await ctx.broker.call("search.searchComic", {
elasticSearchQueries,
@@ -562,10 +509,11 @@ export default class ImportService extends Service {
rest: "GET /libraryStatistics",
params: {},
handler: async (ctx: Context<{}>) => {
const comicDirectorySize = await getSizeOfDirectory(
COMICS_DIRECTORY,
[".cbz", ".cbr", ".cb7"]
);
const comicDirectorySize = await getSizeOfDirectory(COMICS_DIRECTORY, [
".cbz",
".cbr",
".cb7",
]);
const totalCount = await Comic.countDocuments({});
const statistics = await Comic.aggregate([
{
@@ -574,11 +522,7 @@ export default class ImportService extends Service {
{
$match: {
"rawFileDetails.extension": {
$in: [
".cbr",
".cbz",
".cb7",
],
$in: [".cbr", ".cbz", ".cb7"],
},
},
},
@@ -592,10 +536,9 @@ export default class ImportService extends Service {
issues: [
{
$match: {
"sourcedMetadata.comicvine.volumeInformation":
{
$gt: {},
},
"sourcedMetadata.comicvine.volumeInformation": {
$gt: {},
},
},
},
{
@@ -658,23 +601,16 @@ export default class ImportService extends Service {
.drop()
.then(async (data) => {
console.info(data);
const coversFolderDeleteResult =
fsExtra.emptyDirSync(
path.resolve(
`${USERDATA_DIRECTORY}/covers`
)
);
const expandedFolderDeleteResult =
fsExtra.emptyDirSync(
path.resolve(
`${USERDATA_DIRECTORY}/expanded`
)
);
const eSIndicesDeleteResult =
await ctx.broker.call(
"search.deleteElasticSearchIndices",
{}
);
const coversFolderDeleteResult = fsExtra.emptyDirSync(
path.resolve(`${USERDATA_DIRECTORY}/covers`)
);
const expandedFolderDeleteResult = fsExtra.emptyDirSync(
path.resolve(`${USERDATA_DIRECTORY}/expanded`)
);
const eSIndicesDeleteResult = await ctx.broker.call(
"search.deleteElasticSearchIndices",
{}
);
return {
data,
coversFolderDeleteResult,