Files
threetwo-core-service/services/jobqueue.service.ts
2025-10-29 12:25:05 -04:00

684 lines
19 KiB
TypeScript

import { Context, Service, ServiceBroker } from "moleculer";
import JobResult from "../models/jobresult.model";
import { refineQuery } from "filename-parser";
import BullMqMixin from "moleculer-bullmq";
import { DbMixin } from "../mixins/db.mixin";
import Comic from "../models/comic.model";
const ObjectId = require("mongoose").Types.ObjectId;
import {
extractFromArchive,
uncompressEntireArchive,
} from "../utils/uncompression.utils";
import { isNil, isUndefined } from "lodash";
import { pubClient } from "../config/redis.config";
import path from "path";
const { MoleculerError } = require("moleculer").Errors;
console.log(process.env.REDIS_URI);
export default class JobQueueService extends Service {
public constructor(public broker: ServiceBroker) {
super(broker);
this.parseServiceSchema({
name: "jobqueue",
hooks: {},
mixins: [DbMixin("comics", Comic), BullMqMixin],
settings: {
bullmq: {
client: process.env.REDIS_URI,
},
},
actions: {
getJobCountsByType: {
rest: "GET /getJobCountsByType",
handler: async (ctx: Context<{}>) => {
console.log(ctx.params);
return await this.$resolve("jobqueue").getJobCounts();
},
},
toggle: {
rest: "GET /toggle",
handler: async (ctx: Context<{ action: String }>) => {
switch (ctx.params.action) {
case "pause":
this.pause();
break;
case "resume":
this.resume();
break;
default:
console.log(`Unknown queue action.`);
}
},
},
enqueue: {
queue: true,
rest: "GET /enqueue",
handler: async (
ctx: Context<{ action: string; description: string }>
) => {
const { action, description } = ctx.params;
// Enqueue the job
const job = await this.localQueue(
ctx,
action,
ctx.params,
{
priority: 10,
}
);
console.log(`Job ${job.id} enqueued`);
console.log(`${description}`);
return job.id;
},
},
// Comic Book Import Job Queue - Enhanced for better metadata handling
"enqueue.async": {
handler: async (
ctx: Context<{
sessionId: String;
}>
) => {
try {
console.log(
`Received Job ID ${ctx.locals.job.id}, processing...`
);
// 1. De-structure the job params
const { fileObject } = ctx.locals.job.data.params;
// 2. Extract metadata from the archive
const result = await extractFromArchive(
fileObject.filePath
);
const {
name,
filePath,
fileSize,
extension,
mimeType,
cover,
containedIn,
comicInfoJSON,
} = result;
// 3a. Infer any issue-related metadata from the filename
const { inferredIssueDetails } = refineQuery(
result.name
);
console.log(
"Issue metadata inferred: ",
JSON.stringify(inferredIssueDetails, null, 2)
);
// 3b. Prepare sourced metadata from various sources
let sourcedMetadata = {
comicInfo: comicInfoJSON || {},
comicvine: {},
metron: {},
gcd: {},
locg: {}
};
// Include any external metadata if provided
if (!isNil(ctx.locals.job.data.params.sourcedMetadata)) {
const providedMetadata = ctx.locals.job.data.params.sourcedMetadata;
sourcedMetadata = {
...sourcedMetadata,
...providedMetadata
};
}
// 3c. Prepare inferred metadata matching Comic model structure
const inferredMetadata = {
series: inferredIssueDetails?.name || "Unknown Series",
issue: {
name: inferredIssueDetails?.name || "Unknown Series",
number: inferredIssueDetails?.number || 1,
subtitle: inferredIssueDetails?.subtitle || "",
year: inferredIssueDetails?.year || new Date().getFullYear().toString()
},
volume: 1, // Default volume since not available in inferredIssueDetails
title: inferredIssueDetails?.name || path.basename(filePath, path.extname(filePath))
};
// 3d. Create canonical metadata - user-curated values with source attribution
const canonicalMetadata = this.createCanonicalMetadata(sourcedMetadata, inferredMetadata);
// 3e. Create comic payload with canonical metadata structure
const comicPayload = {
// File details
rawFileDetails: {
name,
filePath,
fileSize,
extension,
mimeType,
containedIn,
cover,
},
// Enhanced sourced metadata (now supports more sources)
sourcedMetadata,
// Original inferred metadata
inferredMetadata,
// New canonical metadata - user-curated values with source attribution
canonicalMetadata,
// Import status
"acquisition.source.wanted": false,
"acquisition.source.name": ctx.locals.job.data.params.sourcedFrom,
};
// 3f. Add bundleId if present
let bundleId = null;
if (!isNil(ctx.locals.job.data.params.bundleId)) {
bundleId = ctx.locals.job.data.params.bundleId;
}
// 4. Use library service to import with enhanced metadata
const importResult = await this.broker.call(
"library.importFromJob",
{
importType: ctx.locals.job.data.params.importType,
bundleId,
payload: comicPayload,
}
);
return {
data: {
importResult,
},
id: ctx.locals.job.id,
sessionId: ctx.params.sessionId,
};
} catch (error) {
console.error(
`An error occurred processing Job ID ${ctx.locals.job.id}`
);
throw new MoleculerError(
error,
500,
"ENHANCED_IMPORT_JOB_ERROR",
{
data: ctx.params.sessionId,
}
);
}
},
},
getJobResultStatistics: {
rest: "GET /getJobResultStatistics",
handler: async (ctx: Context<{}>) => {
return await JobResult.aggregate([
{
$group: {
_id: {
sessionId: "$sessionId",
status: "$status",
},
earliestTimestamp: {
$min: "$timestamp",
},
count: {
$sum: 1,
},
},
},
{
$group: {
_id: "$_id.sessionId",
statuses: {
$push: {
status: "$_id.status",
earliestTimestamp:
"$earliestTimestamp",
count: "$count",
},
},
},
},
{
$project: {
_id: 0,
sessionId: "$_id",
completedJobs: {
$reduce: {
input: "$statuses",
initialValue: 0,
in: {
$sum: [
"$$value",
{
$cond: [
{
$eq: [
"$$this.status",
"completed",
],
},
"$$this.count",
0,
],
},
],
},
},
},
failedJobs: {
$reduce: {
input: "$statuses",
initialValue: 0,
in: {
$sum: [
"$$value",
{
$cond: [
{
$eq: [
"$$this.status",
"failed",
],
},
"$$this.count",
0,
],
},
],
},
},
},
earliestTimestamp: {
$min: "$statuses.earliestTimestamp",
},
},
},
]);
},
},
"uncompressFullArchive.async": {
rest: "POST /uncompressFullArchive",
handler: async (
ctx: Context<{
filePath: string;
comicObjectId: string;
options: any;
}>
) => {
console.log(
`Received Job ID ${JSON.stringify(
ctx.locals
)}, processing...`
);
const { filePath, options, comicObjectId } = ctx.params;
const comicId = new ObjectId(comicObjectId);
// 2. Extract metadata from the archive
const result: string[] = await uncompressEntireArchive(
filePath,
options
);
if (Array.isArray(result) && result.length !== 0) {
// Get the containing directory of the uncompressed archive
const directoryPath = path.dirname(result[0]);
// Add to mongo object
await Comic.findByIdAndUpdate(
comicId,
{
$set: {
"rawFileDetails.archive": {
uncompressed: true,
expandedPath: directoryPath,
},
},
},
{ new: true, safe: true, upsert: true }
);
return result;
}
},
},
},
events: {
async "uncompressFullArchive.async.active"(
ctx: Context<{ id: number }>
) {
console.log(
`Uncompression Job ID ${ctx.params.id} is set to active.`
);
},
async "uncompressFullArchive.async.completed"(
ctx: Context<{ id: number }>
) {
console.log(
`Uncompression Job ID ${ctx.params.id} completed.`
);
const job = await this.job(ctx.params.id);
await this.broker.call("socket.broadcast", {
namespace: "/",
event: "LS_UNCOMPRESSION_JOB_COMPLETE",
args: [
{
uncompressedArchive: job.returnvalue,
},
],
});
return job.returnvalue;
},
// use the `${QUEUE_NAME}.QUEUE_EVENT` scheme
async "enqueue.async.active"(ctx: Context<{ id: Number }>) {
console.log(`Job ID ${ctx.params.id} is set to active.`);
},
async drained(ctx) {
console.log("Queue drained.");
await this.broker.call("socket.broadcast", {
namespace: "/",
event: "LS_IMPORT_QUEUE_DRAINED",
args: [
{
message: "drained",
},
],
});
},
async "enqueue.async.completed"(ctx: Context<{ id: Number }>) {
// 1. Fetch the job result using the job Id
const job = await this.job(ctx.params.id);
// 2. Increment the completed job counter
await pubClient.incr("completedJobCount");
// 3. Fetch the completed job count for the final payload to be sent to the client
const completedJobCount = await pubClient.get(
"completedJobCount"
);
// 4. Emit the LS_COVER_EXTRACTED event with the necessary details
await this.broker.call("socket.broadcast", {
namespace: "/",
event: "LS_COVER_EXTRACTED",
args: [
{
completedJobCount,
importResult: job.returnvalue.data.importResult,
},
],
});
// 5. Persist the job results in mongo for posterity
await JobResult.create({
id: ctx.params.id,
status: "completed",
timestamp: job.timestamp,
sessionId: job.returnvalue.sessionId,
failedReason: {},
});
console.log(`Job ID ${ctx.params.id} completed.`);
},
async "enqueue.async.failed"(ctx) {
const job = await this.job(ctx.params.id);
await pubClient.incr("failedJobCount");
const failedJobCount = await pubClient.get(
"failedJobCount"
);
await JobResult.create({
id: ctx.params.id,
status: "failed",
failedReason: job.failedReason,
sessionId: job.data.params.sessionId,
timestamp: job.timestamp,
});
// 4. Emit the LS_COVER_EXTRACTION_FAILED event with the necessary details
await this.broker.call("socket.broadcast", {
namespace: "/",
event: "LS_COVER_EXTRACTION_FAILED",
args: [
{
failedJobCount,
importResult: job,
},
],
});
},
},
methods: {
/**
* Create canonical metadata structure with source attribution for user-driven curation
* @param sourcedMetadata - Metadata from various external sources
* @param inferredMetadata - Metadata inferred from filename/file analysis
*/
createCanonicalMetadata(sourcedMetadata: any, inferredMetadata: any) {
const currentTime = new Date();
// Priority order: comicInfo -> comicvine -> metron -> gcd -> locg -> inferred
const sourcePriority = ['comicInfo', 'comicvine', 'metron', 'gcd', 'locg'];
// Helper function to extract actual value from metadata (handle arrays, etc.)
const extractValue = (value: any) => {
if (Array.isArray(value)) {
return value.length > 0 ? value[0] : null;
}
return value;
};
// Helper function to find the best value and its source
const findBestValue = (fieldName: string, defaultValue: any = null, defaultSource: string = 'inferred') => {
for (const source of sourcePriority) {
const rawValue = sourcedMetadata[source]?.[fieldName];
if (rawValue !== undefined && rawValue !== null && rawValue !== '') {
const extractedValue = extractValue(rawValue);
if (extractedValue !== null && extractedValue !== '') {
return {
value: extractedValue,
source: source,
userSelected: false,
lastModified: currentTime
};
}
}
}
return {
value: defaultValue,
source: defaultSource,
userSelected: false,
lastModified: currentTime
};
};
// Helper function for series-specific field resolution
const findSeriesValue = (fieldNames: string[], defaultValue: any = null) => {
for (const source of sourcePriority) {
const metadata = sourcedMetadata[source];
if (metadata) {
for (const fieldName of fieldNames) {
const rawValue = metadata[fieldName];
if (rawValue !== undefined && rawValue !== null && rawValue !== '') {
const extractedValue = extractValue(rawValue);
if (extractedValue !== null && extractedValue !== '') {
return {
value: extractedValue,
source: source,
userSelected: false,
lastModified: currentTime
};
}
}
}
}
}
return {
value: defaultValue,
source: 'inferred',
userSelected: false,
lastModified: currentTime
};
};
const canonical: any = {
// Core identifying information
title: findBestValue('title', inferredMetadata.title),
// Series information
series: {
name: findSeriesValue(['series', 'seriesName', 'name'], inferredMetadata.series),
volume: findBestValue('volume', inferredMetadata.volume || 1),
startYear: findBestValue('startYear', inferredMetadata.issue?.year ? parseInt(inferredMetadata.issue.year) : new Date().getFullYear())
},
// Issue information
issueNumber: findBestValue('issueNumber', inferredMetadata.issue?.number?.toString() || "1"),
// Publishing information
publisher: findBestValue('publisher', null),
publicationDate: findBestValue('publicationDate', null),
coverDate: findBestValue('coverDate', null),
// Content information
pageCount: findBestValue('pageCount', null),
summary: findBestValue('summary', null),
// Creator information - collect from all sources for richer data
creators: [],
// Character and genre arrays with source tracking
characters: {
values: [],
source: 'inferred',
userSelected: false,
lastModified: currentTime
},
genres: {
values: [],
source: 'inferred',
userSelected: false,
lastModified: currentTime
},
// Canonical metadata tracking
lastCanonicalUpdate: currentTime,
hasUserModifications: false,
// Quality and completeness tracking
completeness: {
score: 0,
missingFields: [],
lastCalculated: currentTime
}
};
// Handle creators - combine from all sources but track source attribution
const allCreators: any[] = [];
for (const source of sourcePriority) {
const metadata = sourcedMetadata[source];
if (metadata?.creators) {
metadata.creators.forEach((creator: any) => {
allCreators.push({
name: extractValue(creator.name),
role: extractValue(creator.role),
source: source,
userSelected: false,
lastModified: currentTime
});
});
} else {
// Handle legacy writer/artist fields
if (metadata?.writer) {
allCreators.push({
name: extractValue(metadata.writer),
role: 'Writer',
source: source,
userSelected: false,
lastModified: currentTime
});
}
if (metadata?.artist) {
allCreators.push({
name: extractValue(metadata.artist),
role: 'Artist',
source: source,
userSelected: false,
lastModified: currentTime
});
}
}
}
canonical.creators = allCreators;
// Handle characters - combine from all sources
const allCharacters = new Set();
let characterSource = 'inferred';
for (const source of sourcePriority) {
if (sourcedMetadata[source]?.characters && sourcedMetadata[source].characters.length > 0) {
sourcedMetadata[source].characters.forEach((char: string) => allCharacters.add(char));
if (characterSource === 'inferred') characterSource = source; // Use the first source found
}
}
canonical.characters = {
values: Array.from(allCharacters),
source: characterSource,
userSelected: false,
lastModified: currentTime
};
// Handle genres - combine from all sources
const allGenres = new Set();
let genreSource = 'inferred';
for (const source of sourcePriority) {
if (sourcedMetadata[source]?.genres && sourcedMetadata[source].genres.length > 0) {
sourcedMetadata[source].genres.forEach((genre: string) => allGenres.add(genre));
if (genreSource === 'inferred') genreSource = source; // Use the first source found
}
}
canonical.genres = {
values: Array.from(allGenres),
source: genreSource,
userSelected: false,
lastModified: currentTime
};
// Calculate completeness score
const requiredFields = ['title', 'series.name', 'issueNumber', 'publisher'];
const optionalFields = ['publicationDate', 'coverDate', 'pageCount', 'summary'];
const missingFields = [];
let filledCount = 0;
// Check required fields
requiredFields.forEach(field => {
const fieldPath = field.split('.');
let value = canonical;
for (const path of fieldPath) {
value = value?.[path];
}
if (value?.value) {
filledCount++;
} else {
missingFields.push(field);
}
});
// Check optional fields
optionalFields.forEach(field => {
if (canonical[field]?.value) {
filledCount++;
}
});
const totalFields = requiredFields.length + optionalFields.length;
canonical.completeness = {
score: Math.round((filledCount / totalFields) * 100),
missingFields: missingFields,
lastCalculated: currentTime
};
return canonical;
}
},
});
}
}