🐂 Migration to moleculer-bullMQ #6

Merged
rishighan merged 21 commits from migration-to-bullmq into master 2023-08-30 17:50:47 +00:00
11 changed files with 2908 additions and 2483 deletions

10
config/redis.config.ts Normal file
View File

@@ -0,0 +1,10 @@
import { createClient } from "redis";
const redisURL = new URL(process.env.REDIS_URI);
const pubClient = createClient({ url: `redis://${redisURL.hostname}:6379` });
(async () => {
await pubClient.connect();
})();
const subClient = pubClient.duplicate();
export { subClient, pubClient };

12
models/jobresult.model.ts Normal file
View File

@@ -0,0 +1,12 @@
const mongoose = require("mongoose");
const JobResultScehma = mongoose.Schema({
id: Number,
status: String,
sessionId: String,
failedReason: Object,
timestamp: Date,
});
const JobResult = mongoose.model("JobResult", JobResultScehma);
export default JobResult;

9
models/session.model.ts Normal file
View File

@@ -0,0 +1,9 @@
const mongoose = require("mongoose");
const SessionScehma = mongoose.Schema({
sessionId: String,
socketId: String,
});
const Session = mongoose.model("Session", SessionScehma);
export default Session;

4283
package-lock.json generated

File diff suppressed because it is too large Load Diff

View File

@@ -34,11 +34,12 @@
"npm": "^8.4.1",
"ts-jest": "^29.0.5",
"ts-node": "^10.9.1",
"typescript": "^5.0.2"
"typescript": "^5.0.2",
"uuid": "^9.0.0"
},
"dependencies": {
"@elastic/elasticsearch": "^8.6.0",
"@bluelovers/fast-glob": "https://github.com/rishighan/fast-glob-v2-api.git",
"@elastic/elasticsearch": "^8.6.0",
"@jorgeferrero/stream-to-buffer": "^2.0.6",
"@npcz/magic": "^1.3.14",
"@root/walk": "^1.1.0",
@@ -64,8 +65,7 @@
"leven": "^3.1.0",
"lodash": "^4.17.21",
"mkdirp": "^0.5.5",
"moleculer": "^0.14.29",
"moleculer-bull": "github:rishighan/moleculer-bull#1.0.0",
"moleculer-bullmq": "^3.0.0",
"moleculer-db": "^0.8.23",
"moleculer-db-adapter-mongoose": "^0.9.2",
"moleculer-io": "^2.2.0",

View File

@@ -1,291 +0,0 @@
/*
* MIT License
*
* Copyright (c) 2022 Rishi Ghan
*
The MIT License (MIT)
Copyright (c) 2015 Rishi Ghan
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE.
*/
/*
* Revision History:
* Initial: 2022/01/28 Rishi Ghan
*/
"use strict";
import { refineQuery } from "filename-parser";
import { isNil, isUndefined } from "lodash";
import { Context, Service, ServiceBroker, ServiceSchema } from "moleculer";
import BullMQMixin, { SandboxedJob } from "moleculer-bull";
import { DbMixin } from "../mixins/db.mixin";
import Comic from "../models/comic.model";
import {
extractFromArchive,
uncompressEntireArchive,
} from "../utils/uncompression.utils";
const REDIS_URI = process.env.REDIS_URI || `redis://localhost:6379`;
const EventEmitter = require("events");
EventEmitter.defaultMaxListeners = 20;
console.log(`REDIS -> ${REDIS_URI}`);
export default class QueueService extends Service {
public constructor(
public broker: ServiceBroker,
schema: ServiceSchema<{}> = { name: "importqueue" }
) {
super(broker);
this.parseServiceSchema({
name: "importqueue",
mixins: [BullMQMixin(REDIS_URI), DbMixin("comics", Comic)],
settings: {
bullmq: {
maxStalledCount: 0,
},
},
hooks: {},
queues: {
"process.import": {
concurrency: 10,
async process(job: SandboxedJob) {
console.info("New job received!", job.data);
console.info(`Processing queue...`);
// extract the cover
const result = await extractFromArchive(
job.data.fileObject.filePath
);
const {
name,
filePath,
fileSize,
extension,
mimeType,
cover,
containedIn,
comicInfoJSON,
} = result;
// Infer any issue-related metadata from the filename
const { inferredIssueDetails } = refineQuery(
result.name
);
console.log(
"Issue metadata inferred: ",
JSON.stringify(inferredIssueDetails, null, 2)
);
// Add the bundleId, if present to the payload
let bundleId = null;
if (!isNil(job.data.bundleId)) {
bundleId = job.data.bundleId;
}
// Orchestrate the payload
const payload = {
importStatus: {
isImported: true,
tagged: false,
matchedResult: {
score: "0",
},
},
rawFileDetails: {
name,
filePath,
fileSize,
extension,
mimeType,
containedIn,
cover,
},
inferredMetadata: {
issue: inferredIssueDetails,
},
sourcedMetadata: {
// except for ComicInfo.xml, everything else should be copied over from the
// parent comic
comicInfo: comicInfoJSON,
},
// since we already have at least 1 copy
// mark it as not wanted by default
"acquisition.source.wanted": false,
// clear out the downloads array
// "acquisition.directconnect.downloads": [],
// mark the metadata source
"acquisition.source.name": job.data.sourcedFrom,
};
// Add the sourcedMetadata, if present
if (!isNil(job.data.sourcedMetadata) && !isUndefined(job.data.sourcedMetadata.comicvine)) {
Object.assign(
payload.sourcedMetadata,
job.data.sourcedMetadata
);
}
// write to mongo
const importResult = await this.broker.call(
"library.rawImportToDB",
{
importType: job.data.importType,
bundleId,
payload,
}
);
return {
data: {
importResult,
},
id: job.id,
worker: process.pid,
};
},
},
"process.uncompressAndResize": {
concurrency: 2,
async process(job: SandboxedJob) {
console.log(`Initiating uncompression job...`);
return await uncompressEntireArchive(
job.data.filePath,
job.data.options
);
},
},
},
actions: {
uncompressResize: {
rest: "POST /uncompressResize",
params: {},
async handler(
ctx: Context<{
data: { filePath: string; options: any };
}>
) {
return await this.createJob(
"process.uncompressAndResize",
ctx.params
);
},
},
processImport: {
rest: "POST /processImport",
params: {},
async handler(
ctx: Context<{
fileObject: object;
importType: string;
bundleId: number;
sourcedFrom?: string;
sourcedMetadata: object;
}>
) {
return await this.createJob("process.import", {
fileObject: ctx.params.fileObject,
importType: ctx.params.importType,
bundleId: ctx.params.bundleId,
sourcedFrom: ctx.params.sourcedFrom,
sourcedMetadata: ctx.params.sourcedMetadata,
});
},
},
toggleImportQueue: {
rest: "POST /pauseImportQueue",
params: {},
handler: async (ctx: Context<{ action: string }>) => {
switch (ctx.params.action) {
case "pause":
const foo = await this.getQueue(
"process.import"
).pause();
console.log("paused", foo);
return foo;
case "resume":
const soo = await this.getQueue(
"process.import"
).resume();
console.log("resumed", soo);
return soo;
default:
console.log("Unrecognized queue action.");
}
},
},
},
methods: {},
async started(): Promise<any> {
await this.getQueue("process.import").on(
"failed",
async (job, error) => {
console.error(
`An error occured in 'process.import' queue on job id '${job.id}': ${error.message}`
);
console.error(job.data);
}
);
await this.getQueue("process.import").on(
"completed",
async (job, res) => {
await this.broker.call("socket.broadcast", {
namespace: "/", //optional
event: "action",
args: [{ type: "LS_COVER_EXTRACTED", result: res }], //optional
});
console.info(
`Import Job with the id '${job.id}' completed.`
);
}
);
await this.getQueue("process.import").on(
"stalled",
async (job) => {
console.warn(`Import job '${job.id} stalled!`);
console.log(`${JSON.stringify(job, null, 2)}`);
console.log(`is stalled.`);
}
);
await this.getQueue("process.uncompressAndResize").on(
"completed",
async (job, res) => {
await this.broker.call("socket.broadcast", {
namespace: "/",
event: "action",
args: [
{
type: "COMICBOOK_EXTRACTION_SUCCESS",
result: {
files: res,
purpose: job.data.options.purpose,
},
},
],
});
console.info(`Uncompression Job ${job.id} completed.`);
}
);
},
});
}
}

View File

@@ -0,0 +1,331 @@
import { Context, Service, ServiceBroker } from "moleculer";
import JobResult from "../models/jobresult.model";
import { refineQuery } from "filename-parser";
import BullMqMixin from "moleculer-bullmq";
import { extractFromArchive } from "../utils/uncompression.utils";
import { isNil, isUndefined } from "lodash";
import { pubClient } from "../config/redis.config";
const { MoleculerError } = require("moleculer").Errors;
console.log(process.env.REDIS_URI);
export default class JobQueueService extends Service {
public constructor(public broker: ServiceBroker) {
super(broker);
this.parseServiceSchema({
name: "jobqueue",
hooks: {},
mixins: [BullMqMixin],
settings: {
bullmq: {
client: process.env.REDIS_URI,
},
},
actions: {
getJobCountsByType: {
rest: "GET /getJobCountsByType",
handler: async (ctx: Context<{}>) => {
console.log(ctx.params);
return await this.$resolve("jobqueue").getJobCounts();
},
},
toggle: {
rest: "GET /toggle",
handler: async (ctx: Context<{ action: String }>) => {
switch (ctx.params.action) {
case "pause":
this.pause();
break;
case "resume":
this.resume();
break;
default:
console.log(`Unknown queue action.`);
}
},
},
enqueue: {
queue: true,
rest: "/GET enqueue",
handler: async (ctx: Context<{}>) => {
// Enqueue the job
const job = await this.localQueue(ctx, "enqueue.async", ctx.params, {
priority: 10,
});
console.log(`Job ${job.id} enqueued`);
return job.id;
},
},
// Comic Book Import Job Queue
"enqueue.async": {
handler: async (
ctx: Context<{
sessionId: String;
}>
) => {
try {
console.log(`Recieved Job ID ${ctx.locals.job.id}, processing...`);
console.log(ctx.params);
// 1. De-structure the job params
const { fileObject } = ctx.locals.job.data.params;
// 2. Extract metadata from the archive
const result = await extractFromArchive(fileObject.filePath);
const {
name,
filePath,
fileSize,
extension,
mimeType,
cover,
containedIn,
comicInfoJSON,
} = result;
// 3a. Infer any issue-related metadata from the filename
const { inferredIssueDetails } = refineQuery(result.name);
console.log(
"Issue metadata inferred: ",
JSON.stringify(inferredIssueDetails, null, 2)
);
// 3b. Orchestrate the payload
const payload = {
importStatus: {
isImported: true,
tagged: false,
matchedResult: {
score: "0",
},
},
rawFileDetails: {
name,
filePath,
fileSize,
extension,
mimeType,
containedIn,
cover,
},
inferredMetadata: {
issue: inferredIssueDetails,
},
sourcedMetadata: {
// except for ComicInfo.xml, everything else should be copied over from the
// parent comic
comicInfo: comicInfoJSON,
},
// since we already have at least 1 copy
// mark it as not wanted by default
"acquisition.source.wanted": false,
// clear out the downloads array
// "acquisition.directconnect.downloads": [],
// mark the metadata source
"acquisition.source.name": ctx.locals.job.data.params.sourcedFrom,
};
// 3c. Add the bundleId, if present to the payload
let bundleId = null;
if (!isNil(ctx.locals.job.data.params.bundleId)) {
bundleId = ctx.locals.job.data.params.bundleId;
}
// 3d. Add the sourcedMetadata, if present
if (
!isNil(ctx.locals.job.data.params.sourcedMetadata) &&
!isUndefined(ctx.locals.job.data.params.sourcedMetadata.comicvine)
) {
Object.assign(
payload.sourcedMetadata,
ctx.locals.job.data.params.sourcedMetadata
);
}
// 4. write to mongo
const importResult = await this.broker.call("library.rawImportToDB", {
importType: ctx.locals.job.data.params.importType,
bundleId,
payload,
});
return {
data: {
importResult,
},
id: ctx.locals.job.id,
sessionId: ctx.params.sessionId,
};
} catch (error) {
console.error(
`An error occurred processing Job ID ${ctx.locals.job.id}`
);
throw new MoleculerError(error, 500, "IMPORT_JOB_ERROR", {
data: ctx.params.sessionId,
});
}
},
},
getJobResultStatistics: {
rest: "GET /getJobResultStatistics",
handler: async (ctx: Context<{}>) => {
return await JobResult.aggregate([
{
$group: {
_id: {
sessionId: "$sessionId",
status: "$status",
},
earliestTimestamp: {
$min: "$timestamp",
},
count: {
$sum: 1,
},
},
},
{
$group: {
_id: "$_id.sessionId",
statuses: {
$push: {
status: "$_id.status",
earliestTimestamp: "$earliestTimestamp",
count: "$count",
},
},
},
},
{
$project: {
_id: 0,
sessionId: "$_id",
completedJobs: {
$reduce: {
input: "$statuses",
initialValue: 0,
in: {
$sum: [
"$$value",
{
$cond: [
{
$eq: ["$$this.status", "completed"],
},
"$$this.count",
0,
],
},
],
},
},
},
failedJobs: {
$reduce: {
input: "$statuses",
initialValue: 0,
in: {
$sum: [
"$$value",
{
$cond: [
{
$eq: ["$$this.status", "failed"],
},
"$$this.count",
0,
],
},
],
},
},
},
earliestTimestamp: {
$min: "$statuses.earliestTimestamp",
},
},
},
]);
},
},
},
events: {
// use the `${QUEUE_NAME}.QUEUE_EVENT` scheme
async "enqueue.async.active"(ctx: Context<{ id: Number }>) {
console.log(`Job ID ${ctx.params.id} is set to active.`);
},
async drained(ctx) {
console.log("Queue drained.");
await this.broker.call("socket.broadcast", {
namespace: "/",
event: "action",
args: [
{
type: "LS_IMPORT_QUEUE_DRAINED",
},
],
});
},
async "enqueue.async.completed"(ctx: Context<{ id: Number }>) {
// 1. Fetch the job result using the job Id
const job = await this.job(ctx.params.id);
// 2. Increment the completed job counter
await pubClient.incr("completedJobCount");
// 3. Fetch the completed job count for the final payload to be sent to the client
const completedJobCount = await pubClient.get("completedJobCount");
// 4. Emit the LS_COVER_EXTRACTED event with the necessary details
await this.broker.call("socket.broadcast", {
namespace: "/",
event: "action",
args: [
{
type: "LS_COVER_EXTRACTED",
completedJobCount,
importResult: job.returnvalue.data.importResult,
},
],
});
// 5. Persist the job results in mongo for posterity
await JobResult.create({
id: ctx.params.id,
status: "completed",
timestamp: job.timestamp,
sessionId: job.returnvalue.sessionId,
failedReason: {},
});
console.log(`Job ID ${ctx.params.id} completed.`);
},
async "enqueue.async.failed"(ctx) {
const job = await this.job(ctx.params.id);
await pubClient.incr("failedJobCount");
const failedJobCount = await pubClient.get("failedJobCount");
await JobResult.create({
id: ctx.params.id,
status: "failed",
failedReason: job.failedReason,
sessionId: job.data.params.sessionId,
timestamp: job.timestamp,
});
// 4. Emit the LS_COVER_EXTRACTION_FAILED event with the necessary details
await this.broker.call("socket.broadcast", {
namespace: "/",
event: "action",
args: [
{
type: "LS_COVER_EXTRACTION_FAILED",
failedJobCount,
importResult: job,
},
],
});
},
},
});
}
}

View File

@@ -33,13 +33,7 @@ SOFTWARE.
"use strict";
import { isNil } from "lodash";
import {
Context,
Service,
ServiceBroker,
ServiceSchema,
Errors,
} from "moleculer";
import { Context, Service, ServiceBroker, ServiceSchema, Errors } from "moleculer";
import { DbMixin } from "../mixins/db.mixin";
import Comic from "../models/comic.model";
import { walkFolder, getSizeOfDirectory } from "../utils/file.utils";
@@ -51,6 +45,7 @@ import {
IExtractionOptions,
} from "threetwo-ui-typings";
const ObjectId = require("mongoose").Types.ObjectId;
import { pubClient } from "../config/redis.config";
import fsExtra from "fs-extra";
const through2 = require("through2");
import klaw from "klaw";
@@ -66,6 +61,17 @@ export default class ImportService extends Service {
mixins: [DbMixin("comics", Comic)],
hooks: {},
actions: {
getHealthInformation: {
rest: "GET /getHealthInformation",
params: {},
handler: async (ctx: Context<{}>) => {
try {
return await ctx.broker.call("$node.services");
} catch (error) {
return new Error("Service is down.");
}
},
},
walkFolders: {
rest: "POST /walkFolders",
params: {
@@ -89,9 +95,7 @@ export default class ImportService extends Service {
uncompressFullArchive: {
rest: "POST /uncompressFullArchive",
params: {},
handler: async (
ctx: Context<{ filePath: string; options: any }>
) => {
handler: async (ctx: Context<{ filePath: string; options: any }>) => {
await broker.call("importqueue.uncompressResize", {
filePath: ctx.params.filePath,
options: ctx.params.options,
@@ -109,8 +113,7 @@ export default class ImportService extends Service {
});
// Determine source where the comic was added from
// and gather identifying information about it
const sourceName =
referenceComicObject[0].acquisition.source.name;
const sourceName = referenceComicObject[0].acquisition.source.name;
const { sourcedMetadata } = referenceComicObject[0];
const filePath = `${COMICS_DIRECTORY}/${ctx.params.bundle.data.name}`;
@@ -139,64 +142,63 @@ export default class ImportService extends Service {
},
newImport: {
rest: "POST /newImport",
params: {},
// params: {},
async handler(
ctx: Context<{
extractionOptions?: any;
sessionId: string;
}>
) {
// 1. Walk the Source folder
klaw(path.resolve(COMICS_DIRECTORY))
// 1.1 Filter on .cb* extensions
.pipe(
through2.obj(function (item, enc, next) {
let fileExtension = path.extname(item.path);
if (
[".cbz", ".cbr", ".cb7"].includes(
fileExtension
)
) {
this.push(item);
}
next();
})
)
// 1.2 Pipe filtered results to the next step
.on("data", async (item) => {
console.info(
"Found a file at path: %s",
item.path
);
let comicExists = await Comic.exists({
"rawFileDetails.name": `${path.basename(
item.path,
path.extname(item.path)
)}`,
});
if (!comicExists) {
// 2. Send the extraction job to the queue
await broker.call(
"importqueue.processImport",
{
try {
// Get params to be passed to the import jobs
const { sessionId } = ctx.params;
// 1. Walk the Source folder
klaw(path.resolve(COMICS_DIRECTORY))
// 1.1 Filter on .cb* extensions
.pipe(
through2.obj(function (item, enc, next) {
let fileExtension = path.extname(item.path);
if ([".cbz", ".cbr", ".cb7"].includes(fileExtension)) {
this.push(item);
}
next();
})
)
// 1.2 Pipe filtered results to the next step
// Enqueue the job in the queue
.on("data", async (item) => {
console.info("Found a file at path: %s", item.path);
let comicExists = await Comic.exists({
"rawFileDetails.name": `${path.basename(
item.path,
path.extname(item.path)
)}`,
});
if (!comicExists) {
// 2.1 Reset the job counters in Redis
await pubClient.set("completedJobCount", 0);
await pubClient.set("failedJobCount", 0);
// 2.2 Send the extraction job to the queue
this.broker.call("jobqueue.enqueue", {
fileObject: {
filePath: item.path,
fileSize: item.stats.size,
},
sessionId,
importType: "new",
}
);
} else {
console.log(
"Comic already exists in the library."
);
}
})
.on("end", () => {
console.log("All files traversed.");
});
});
} else {
console.log("Comic already exists in the library.");
}
})
.on("end", () => {
console.log("All files traversed.");
});
} catch (error) {
console.log(error);
}
},
},
rawImportToDB: {
rest: "POST /rawImportToDB",
params: {},
@@ -238,28 +240,19 @@ export default class ImportService extends Service {
// we solicit volume information and add that to mongo
if (
comicMetadata.sourcedMetadata.comicvine &&
!isNil(
comicMetadata.sourcedMetadata.comicvine
.volume
)
!isNil(comicMetadata.sourcedMetadata.comicvine.volume)
) {
volumeDetails = await this.broker.call(
"comicvine.getVolumes",
{
volumeURI:
comicMetadata.sourcedMetadata
.comicvine.volume
.api_detail_url,
}
);
volumeDetails = await this.broker.call("comicvine.getVolumes", {
volumeURI:
comicMetadata.sourcedMetadata.comicvine.volume
.api_detail_url,
});
comicMetadata.sourcedMetadata.comicvine.volumeInformation =
volumeDetails.results;
}
console.log("Saving to Mongo...");
console.log(
`Import type: [${ctx.params.importType}]`
);
console.log(`Import type: [${ctx.params.importType}]`);
switch (ctx.params.importType) {
case "new":
return await Comic.create(comicMetadata);
@@ -280,10 +273,7 @@ export default class ImportService extends Service {
}
} catch (error) {
console.log(error);
throw new Errors.MoleculerError(
"Import failed.",
500
);
throw new Errors.MoleculerError("Import failed.", 500);
}
},
},
@@ -301,9 +291,7 @@ export default class ImportService extends Service {
) {
// 1. Find mongo object by id
// 2. Import payload into sourcedMetadata.comicvine
const comicObjectId = new ObjectId(
ctx.params.comicObjectId
);
const comicObjectId = new ObjectId(ctx.params.comicObjectId);
return new Promise(async (resolve, reject) => {
let volumeDetails = {};
@@ -312,18 +300,15 @@ export default class ImportService extends Service {
const volumeDetails = await this.broker.call(
"comicvine.getVolumes",
{
volumeURI:
matchedResult.volume.api_detail_url,
volumeURI: matchedResult.volume.api_detail_url,
}
);
matchedResult.volumeInformation =
volumeDetails.results;
matchedResult.volumeInformation = volumeDetails.results;
Comic.findByIdAndUpdate(
comicObjectId,
{
$set: {
"sourcedMetadata.comicvine":
matchedResult,
"sourcedMetadata.comicvine": matchedResult,
},
},
{ new: true },
@@ -354,9 +339,7 @@ export default class ImportService extends Service {
}>
) {
console.log(JSON.stringify(ctx.params, null, 2));
const comicObjectId = new ObjectId(
ctx.params.comicObjectId
);
const comicObjectId = new ObjectId(ctx.params.comicObjectId);
return new Promise((resolve, reject) => {
Comic.findByIdAndUpdate(
@@ -410,9 +393,7 @@ export default class ImportService extends Service {
params: { ids: "array" },
handler: async (ctx: Context<{ ids: [string] }>) => {
console.log(ctx.params.ids);
const queryIds = ctx.params.ids.map(
(id) => new ObjectId(id)
);
const queryIds = ctx.params.ids.map((id) => new ObjectId(id));
return await Comic.find({
_id: {
$in: queryIds,
@@ -428,8 +409,7 @@ export default class ImportService extends Service {
const volumes = await Comic.aggregate([
{
$project: {
volumeInfo:
"$sourcedMetadata.comicvine.volumeInformation",
volumeInfo: "$sourcedMetadata.comicvine.volumeInformation",
},
},
{
@@ -475,52 +455,46 @@ export default class ImportService extends Service {
const { queryObjects } = ctx.params;
// construct the query for ElasticSearch
let elasticSearchQuery = {};
const elasticSearchQueries = queryObjects.map(
(queryObject) => {
console.log("Volume: ", queryObject.volumeName);
console.log("Issue: ", queryObject.issueName);
if (queryObject.issueName === null) {
queryObject.issueName = "";
}
if (queryObject.volumeName === null) {
queryObject.volumeName = "";
}
elasticSearchQuery = {
bool: {
must: [
{
match_phrase: {
"rawFileDetails.name":
queryObject.volumeName,
},
},
{
term: {
"inferredMetadata.issue.number":
parseInt(
queryObject.issueNumber,
10
),
},
},
],
},
};
return [
{
index: "comics",
search_type: "dfs_query_then_fetch",
},
{
query: elasticSearchQuery,
},
];
const elasticSearchQueries = queryObjects.map((queryObject) => {
console.log("Volume: ", queryObject.volumeName);
console.log("Issue: ", queryObject.issueName);
if (queryObject.issueName === null) {
queryObject.issueName = "";
}
);
console.log(
JSON.stringify(elasticSearchQueries, null, 2)
);
if (queryObject.volumeName === null) {
queryObject.volumeName = "";
}
elasticSearchQuery = {
bool: {
must: [
{
match_phrase: {
"rawFileDetails.name": queryObject.volumeName,
},
},
{
term: {
"inferredMetadata.issue.number": parseInt(
queryObject.issueNumber,
10
),
},
},
],
},
};
return [
{
index: "comics",
search_type: "dfs_query_then_fetch",
},
{
query: elasticSearchQuery,
},
];
});
console.log(JSON.stringify(elasticSearchQueries, null, 2));
return await ctx.broker.call("search.searchComic", {
elasticSearchQueries,
@@ -533,10 +507,11 @@ export default class ImportService extends Service {
rest: "GET /libraryStatistics",
params: {},
handler: async (ctx: Context<{}>) => {
const comicDirectorySize = await getSizeOfDirectory(
COMICS_DIRECTORY,
[".cbz", ".cbr", ".cb7"]
);
const comicDirectorySize = await getSizeOfDirectory(COMICS_DIRECTORY, [
".cbz",
".cbr",
".cb7",
]);
const totalCount = await Comic.countDocuments({});
const statistics = await Comic.aggregate([
{
@@ -545,11 +520,7 @@ export default class ImportService extends Service {
{
$match: {
"rawFileDetails.extension": {
$in: [
".cbr",
".cbz",
".cb7",
],
$in: [".cbr", ".cbz", ".cb7"],
},
},
},
@@ -563,10 +534,9 @@ export default class ImportService extends Service {
issues: [
{
$match: {
"sourcedMetadata.comicvine.volumeInformation":
{
$gt: {},
},
"sourcedMetadata.comicvine.volumeInformation": {
$gt: {},
},
},
},
{
@@ -629,23 +599,16 @@ export default class ImportService extends Service {
.drop()
.then(async (data) => {
console.info(data);
const coversFolderDeleteResult =
fsExtra.emptyDirSync(
path.resolve(
`${USERDATA_DIRECTORY}/covers`
)
);
const expandedFolderDeleteResult =
fsExtra.emptyDirSync(
path.resolve(
`${USERDATA_DIRECTORY}/expanded`
)
);
const eSIndicesDeleteResult =
await ctx.broker.call(
"search.deleteElasticSearchIndices",
{}
);
const coversFolderDeleteResult = fsExtra.emptyDirSync(
path.resolve(`${USERDATA_DIRECTORY}/covers`)
);
const expandedFolderDeleteResult = fsExtra.emptyDirSync(
path.resolve(`${USERDATA_DIRECTORY}/expanded`)
);
const eSIndicesDeleteResult = await ctx.broker.call(
"search.deleteElasticSearchIndices",
{}
);
return {
data,
coversFolderDeleteResult,

View File

@@ -46,14 +46,15 @@ export default class SettingsService extends Service {
.map((item) => JSON.stringify(item))
.join("\n");
queries += "\n";
const { body } = await eSClient.msearch({
const { responses } = await eSClient.msearch({
body: queries,
});
body.responses.forEach((match) => {
responses.forEach((match) => {
console.log(match.hits);
});
return body.responses;
return responses;
},
},
issue: {

View File

@@ -1,16 +1,14 @@
"use strict";
import { Service, ServiceBroker, ServiceSchema } from "moleculer";
import { Service, ServiceBroker, ServiceSchema, Context } from "moleculer";
import { JobType } from "moleculer-bullmq";
import { createClient } from "redis";
import { createAdapter } from "@socket.io/redis-adapter";
import Session from "../models/session.model";
import { pubClient, subClient } from "../config/redis.config";
const { MoleculerError } = require("moleculer").Errors;
const SocketIOService = require("moleculer-io");
const redisURL = new URL(process.env.REDIS_URI);
// console.log(redisURL.hostname);
const { v4: uuidv4 } = require("uuid");
const pubClient = createClient({ url: `redis://${redisURL.hostname}:6379` });
(async () => {
await pubClient.connect();
})();
const subClient = pubClient.duplicate();
export default class SocketService extends Service {
// @ts-ignore
public constructor(
@@ -30,31 +28,75 @@ export default class SocketService extends Service {
call: {
// whitelist: ["math.*", "say.*", "accounts.*", "rooms.*", "io.*"],
},
action: async (data, ack) => {
// write your handler function here.
action: async (data) => {
switch (data.type) {
case "LS_IMPORT":
console.log(
`Recieved ${data.type} event.`
);
// 1. Send task to queue
await this.broker.call(
"library.newImport",
data.data,
{}
);
case "RESUME_SESSION":
console.log("Attempting to resume session...");
try {
const sessionRecord = await Session.find({
sessionId: data.session.sessionId,
});
// 1. Check for sessionId's existence, and a match
if (
sessionRecord.length !== 0 &&
sessionRecord[0].sessionId ===
data.session.sessionId
) {
// 2. Find if the queue has active jobs
const jobs: JobType = await this.broker.call(
"jobqueue.getJobCountsByType",
{}
);
const { active } = jobs;
if (active > 0) {
// 3. Get job counts
const completedJobCount =
await pubClient.get(
"completedJobCount"
);
const failedJobCount = await pubClient.get(
"failedJobCount"
);
// 4. Send the counts to the active socket.io session
await this.broker.call("socket.broadcast", {
namespace: "/",
event: "action",
args: [
{
type: "RESTORE_JOB_COUNTS_AFTER_SESSION_RESTORATION",
completedJobCount,
failedJobCount,
queueStatus: "running",
},
],
});
}
}
} catch (err) {
throw new MoleculerError(
err,
500,
"SESSION_ID_NOT_FOUND",
{
data: data.session.sessionId,
}
);
}
break;
case "LS_TOGGLE_IMPORT_QUEUE":
case "LS_SET_QUEUE_STATUS":
console.log(data);
await this.broker.call(
"importqueue.toggleImportQueue",
data.data,
"jobqueue.toggle",
{ action: data.data.queueAction },
{}
);
break;
case "LS_SINGLE_IMPORT":
console.info(
"AirDC++ finished a download -> "
);
console.info("AirDC++ finished a download -> ");
console.log(data);
await this.broker.call(
"library.importDownloadedComic",
@@ -80,9 +122,36 @@ export default class SocketService extends Service {
actions: {},
methods: {},
async started() {
this.io.on("connection", (data) =>
console.log("socket.io server initialized.")
);
this.io.on("connection", async (socket) => {
console.log(
`socket.io server connected to client with session ID: ${socket.id}`
);
console.log("Looking up sessionId in Mongo...");
const sessionIdExists = await Session.find({
sessionId: socket.handshake.query.sessionId,
});
// 1. if sessionId isn't found in Mongo, create one and persist it
if (sessionIdExists.length === 0) {
console.log(
`Socket Id ${socket.id} not found in Mongo, creating a new session...`
);
const sessionId = uuidv4();
socket.sessionId = sessionId;
console.log(`Saving session ${sessionId} to Mongo...`);
await Session.create({
sessionId,
socketId: socket.id,
});
socket.emit("sessionInitialized", sessionId);
}
// 2. else, retrieve it from Mongo and "resume" the socket.io connection
else {
console.log(
`Found socketId ${socket.id}, attempting to resume socket.io connection...`
);
console.log(socket.handshake.query.sessionId);
}
});
},
});
}

View File

@@ -47,6 +47,7 @@ import {
getMimeType,
} from "../utils/file.utils";
import { convertXMLToJSON } from "./xml.utils";
const { MoleculerError } = require("moleculer").Errors;
const fse = require("fs-extra");
const Unrar = require("unrar");
interface RarFile {
@@ -254,7 +255,7 @@ export const extractComicInfoXMLFromZip = async (
// Push the first file (cover) to our extraction target
extractionTargets.push(files[0].name);
filesToWriteToDisk.coverFile = path.basename(files[0].name);
if (!isEmpty(comicInfoXMLFileObject)) {
filesToWriteToDisk.comicInfoXML = comicInfoXMLFileObject[0].name;
extractionTargets.push(filesToWriteToDisk.comicInfoXML);
@@ -364,10 +365,13 @@ export const extractFromArchive = async (filePath: string) => {
return Object.assign({}, ...cbrResult);
default:
console.log(
console.error(
"Error inferring filetype for comicinfo.xml extraction."
);
break;
throw new MoleculerError({}, 500, "FILETYPE_INFERENCE_ERROR", {
data: { message: "Cannot infer filetype."},
});
}
};