Merge pull request #6 from rishighan/migration-to-bullmq
🐂 Migration to moleculer-bullMQ
This commit was merged in pull request #6.
config/redis.config.ts (new file, 10 lines)
@@ -0,0 +1,10 @@
+import { createClient } from "redis";
+const redisURL = new URL(process.env.REDIS_URI);
+
+const pubClient = createClient({ url: `redis://${redisURL.hostname}:6379` });
+(async () => {
+  await pubClient.connect();
+})();
+const subClient = pubClient.duplicate();
+
+export { subClient, pubClient };
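Note: these shared clients are consumed in two places later in this diff — the socket service feeds them to @socket.io/redis-adapter, and the job queue uses pubClient for its completed/failed counters. A sketch of the adapter wiring, assuming a standalone socket.io server (the port is illustrative); one thing to watch is that redis v4 clients returned by duplicate() start unconnected, and the config above only connects pubClient:

import { Server } from "socket.io";
import { createAdapter } from "@socket.io/redis-adapter";
import { pubClient, subClient } from "./config/redis.config";

(async () => {
  // duplicate() returns an unconnected client in redis v4,
  // so the subscriber needs its own connect() call.
  await subClient.connect();

  const io = new Server(3001); // illustrative port
  io.adapter(createAdapter(pubClient, subClient));
})();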
models/jobresult.model.ts (new file, 12 lines)
@@ -0,0 +1,12 @@
+const mongoose = require("mongoose");
+
+const JobResultSchema = mongoose.Schema({
+  id: Number,
+  status: String,
+  sessionId: String,
+  failedReason: Object,
+  timestamp: Date,
+});
+
+const JobResult = mongoose.model("JobResult", JobResultSchema);
+export default JobResult;
models/session.model.ts (new file, 9 lines)
@@ -0,0 +1,9 @@
+const mongoose = require("mongoose");
+
+const SessionSchema = mongoose.Schema({
+  sessionId: String,
+  socketId: String,
+});
+
+const Session = mongoose.model("Session", SessionSchema);
+export default Session;
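Note: together these two models back the resumable-import flow — Session maps a browser session to its socket connection, while JobResult records one document per completed or failed job, keyed by sessionId (which is also what the getJobResultStatistics aggregation below groups on). A minimal sketch of the per-session lookup; the sessionId value is illustrative:

import JobResult from "./models/jobresult.model";
import Session from "./models/session.model";

(async () => {
  const sessionId = "0b7f2c34-example"; // illustrative
  // Every job outcome recorded for one import session...
  const results = await JobResult.find({ sessionId });
  // ...and the socket this session was last attached to.
  const [session] = await Session.find({ sessionId });
  console.log(results.length, session?.socketId);
})();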
package-lock.json (generated, 4,283 lines) — file diff suppressed because it is too large
package.json
@@ -34,11 +34,12 @@
     "npm": "^8.4.1",
     "ts-jest": "^29.0.5",
     "ts-node": "^10.9.1",
-    "typescript": "^5.0.2"
+    "typescript": "^5.0.2",
+    "uuid": "^9.0.0"
   },
   "dependencies": {
-    "@elastic/elasticsearch": "^8.6.0",
     "@bluelovers/fast-glob": "https://github.com/rishighan/fast-glob-v2-api.git",
+    "@elastic/elasticsearch": "^8.6.0",
     "@jorgeferrero/stream-to-buffer": "^2.0.6",
     "@npcz/magic": "^1.3.14",
     "@root/walk": "^1.1.0",
@@ -64,8 +65,7 @@
     "leven": "^3.1.0",
     "lodash": "^4.17.21",
     "mkdirp": "^0.5.5",
-    "moleculer": "^0.14.29",
-    "moleculer-bull": "github:rishighan/moleculer-bull#1.0.0",
+    "moleculer-bullmq": "^3.0.0",
     "moleculer-db": "^0.8.23",
     "moleculer-db-adapter-mongoose": "^0.9.2",
     "moleculer-io": "^2.2.0",
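Note: this dependency swap is the heart of the migration. moleculer-bull exposed a mixin factory that took the Redis URI, while moleculer-bullmq is mixed in as-is and reads its connection from settings.bullmq.client. Both registration styles appear verbatim in the service diffs below; side by side:

import BullMQMixin from "moleculer-bull"; // before
import BullMqMixin from "moleculer-bullmq"; // after

// Before: the mixin is a factory parameterized by the Redis URI.
const oldSchema = {
  name: "importqueue",
  mixins: [BullMQMixin(process.env.REDIS_URI)],
};

// After: the mixin is used directly; the connection moves into settings.
const newSchema = {
  name: "jobqueue",
  mixins: [BullMqMixin],
  settings: {
    bullmq: { client: process.env.REDIS_URI },
  },
};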
@@ -1,291 +0,0 @@
-/*
- * MIT License
- *
- * Copyright (c) 2022 Rishi Ghan
- *
-The MIT License (MIT)
-
-Copyright (c) 2015 Rishi Ghan
-
-Permission is hereby granted, free of charge, to any person obtaining a copy
-of this software and associated documentation files (the "Software"), to deal
-in the Software without restriction, including without limitation the rights
-to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
-copies of the Software, and to permit persons to whom the Software is
-furnished to do so, subject to the following conditions:
-
-The above copyright notice and this permission notice shall be included in all
-copies or substantial portions of the Software.
-
-THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
-IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
-FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
-AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
-LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
-OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
-SOFTWARE.
-*/
-
-/*
- * Revision History:
- * Initial: 2022/01/28 Rishi Ghan
- */
-
-"use strict";
-
-import { refineQuery } from "filename-parser";
-import { isNil, isUndefined } from "lodash";
-import { Context, Service, ServiceBroker, ServiceSchema } from "moleculer";
-import BullMQMixin, { SandboxedJob } from "moleculer-bull";
-import { DbMixin } from "../mixins/db.mixin";
-import Comic from "../models/comic.model";
-import {
-  extractFromArchive,
-  uncompressEntireArchive,
-} from "../utils/uncompression.utils";
-
-const REDIS_URI = process.env.REDIS_URI || `redis://localhost:6379`;
-const EventEmitter = require("events");
-EventEmitter.defaultMaxListeners = 20;
-
-console.log(`REDIS -> ${REDIS_URI}`);
-export default class QueueService extends Service {
-  public constructor(
-    public broker: ServiceBroker,
-    schema: ServiceSchema<{}> = { name: "importqueue" }
-  ) {
-    super(broker);
-    this.parseServiceSchema({
-      name: "importqueue",
-      mixins: [BullMQMixin(REDIS_URI), DbMixin("comics", Comic)],
-      settings: {
-        bullmq: {
-          maxStalledCount: 0,
-        },
-      },
-      hooks: {},
-      queues: {
-        "process.import": {
-          concurrency: 10,
-          async process(job: SandboxedJob) {
-            console.info("New job received!", job.data);
-            console.info(`Processing queue...`);
-            // extract the cover
-            const result = await extractFromArchive(
-              job.data.fileObject.filePath
-            );
-            const {
-              name,
-              filePath,
-              fileSize,
-              extension,
-              mimeType,
-              cover,
-              containedIn,
-              comicInfoJSON,
-            } = result;
-
-            // Infer any issue-related metadata from the filename
-            const { inferredIssueDetails } = refineQuery(result.name);
-            console.log(
-              "Issue metadata inferred: ",
-              JSON.stringify(inferredIssueDetails, null, 2)
-            );
-            // Add the bundleId, if present, to the payload
-            let bundleId = null;
-            if (!isNil(job.data.bundleId)) {
-              bundleId = job.data.bundleId;
-            }
-
-            // Orchestrate the payload
-            const payload = {
-              importStatus: {
-                isImported: true,
-                tagged: false,
-                matchedResult: {
-                  score: "0",
-                },
-              },
-              rawFileDetails: {
-                name,
-                filePath,
-                fileSize,
-                extension,
-                mimeType,
-                containedIn,
-                cover,
-              },
-              inferredMetadata: {
-                issue: inferredIssueDetails,
-              },
-              sourcedMetadata: {
-                // except for ComicInfo.xml, everything else should be copied over from the
-                // parent comic
-                comicInfo: comicInfoJSON,
-              },
-              // since we already have at least 1 copy,
-              // mark it as not wanted by default
-              "acquisition.source.wanted": false,
-
-              // clear out the downloads array
-              // "acquisition.directconnect.downloads": [],
-
-              // mark the metadata source
-              "acquisition.source.name": job.data.sourcedFrom,
-            };
-
-            // Add the sourcedMetadata, if present
-            if (!isNil(job.data.sourcedMetadata) && !isUndefined(job.data.sourcedMetadata.comicvine)) {
-              Object.assign(
-                payload.sourcedMetadata,
-                job.data.sourcedMetadata
-              );
-            }
-
-            // write to mongo
-            const importResult = await this.broker.call(
-              "library.rawImportToDB",
-              {
-                importType: job.data.importType,
-                bundleId,
-                payload,
-              }
-            );
-            return {
-              data: {
-                importResult,
-              },
-              id: job.id,
-              worker: process.pid,
-            };
-          },
-        },
-        "process.uncompressAndResize": {
-          concurrency: 2,
-          async process(job: SandboxedJob) {
-            console.log(`Initiating uncompression job...`);
-            return await uncompressEntireArchive(
-              job.data.filePath,
-              job.data.options
-            );
-          },
-        },
-      },
-      actions: {
-        uncompressResize: {
-          rest: "POST /uncompressResize",
-          params: {},
-          async handler(
-            ctx: Context<{
-              data: { filePath: string; options: any };
-            }>
-          ) {
-            return await this.createJob(
-              "process.uncompressAndResize",
-              ctx.params
-            );
-          },
-        },
-        processImport: {
-          rest: "POST /processImport",
-          params: {},
-          async handler(
-            ctx: Context<{
-              fileObject: object;
-              importType: string;
-              bundleId: number;
-              sourcedFrom?: string;
-              sourcedMetadata: object;
-            }>
-          ) {
-            return await this.createJob("process.import", {
-              fileObject: ctx.params.fileObject,
-              importType: ctx.params.importType,
-              bundleId: ctx.params.bundleId,
-              sourcedFrom: ctx.params.sourcedFrom,
-              sourcedMetadata: ctx.params.sourcedMetadata,
-            });
-          },
-        },
-        toggleImportQueue: {
-          rest: "POST /pauseImportQueue",
-          params: {},
-          handler: async (ctx: Context<{ action: string }>) => {
-            switch (ctx.params.action) {
-              case "pause":
-                const foo = await this.getQueue(
-                  "process.import"
-                ).pause();
-                console.log("paused", foo);
-                return foo;
-              case "resume":
-                const soo = await this.getQueue(
-                  "process.import"
-                ).resume();
-                console.log("resumed", soo);
-                return soo;
-              default:
-                console.log("Unrecognized queue action.");
-            }
-          },
-        },
-      },
-      methods: {},
-      async started(): Promise<any> {
-        await this.getQueue("process.import").on(
-          "failed",
-          async (job, error) => {
-            console.error(
-              `An error occurred in the 'process.import' queue on job id '${job.id}': ${error.message}`
-            );
-            console.error(job.data);
-          }
-        );
-        await this.getQueue("process.import").on(
-          "completed",
-          async (job, res) => {
-            await this.broker.call("socket.broadcast", {
-              namespace: "/", // optional
-              event: "action",
-              args: [{ type: "LS_COVER_EXTRACTED", result: res }], // optional
-            });
-            console.info(
-              `Import Job with the id '${job.id}' completed.`
-            );
-          }
-        );
-        await this.getQueue("process.import").on(
-          "stalled",
-          async (job) => {
-            console.warn(`Import job '${job.id}' stalled!`);
-            console.log(`${JSON.stringify(job, null, 2)}`);
-          }
-        );
-
-        await this.getQueue("process.uncompressAndResize").on(
-          "completed",
-          async (job, res) => {
-            await this.broker.call("socket.broadcast", {
-              namespace: "/",
-              event: "action",
-              args: [
-                {
-                  type: "COMICBOOK_EXTRACTION_SUCCESS",
-                  result: {
-                    files: res,
-                    purpose: job.data.options.purpose,
-                  },
-                },
-              ],
-            });
-            console.info(`Uncompression Job ${job.id} completed.`);
-          }
-        );
-      },
-    });
-  }
-}
services/jobqueue.service.ts (new file, 331 lines)
@@ -0,0 +1,331 @@
+import { Context, Service, ServiceBroker } from "moleculer";
+import JobResult from "../models/jobresult.model";
+import { refineQuery } from "filename-parser";
+import BullMqMixin from "moleculer-bullmq";
+import { extractFromArchive } from "../utils/uncompression.utils";
+import { isNil, isUndefined } from "lodash";
+import { pubClient } from "../config/redis.config";
+
+const { MoleculerError } = require("moleculer").Errors;
+
+console.log(process.env.REDIS_URI);
+export default class JobQueueService extends Service {
+  public constructor(public broker: ServiceBroker) {
+    super(broker);
+    this.parseServiceSchema({
+      name: "jobqueue",
+      hooks: {},
+      mixins: [BullMqMixin],
+      settings: {
+        bullmq: {
+          client: process.env.REDIS_URI,
+        },
+      },
+      actions: {
+        getJobCountsByType: {
+          rest: "GET /getJobCountsByType",
+          handler: async (ctx: Context<{}>) => {
+            console.log(ctx.params);
+            return await this.$resolve("jobqueue").getJobCounts();
+          },
+        },
+        toggle: {
+          rest: "GET /toggle",
+          handler: async (ctx: Context<{ action: string }>) => {
+            switch (ctx.params.action) {
+              case "pause":
+                this.pause();
+                break;
+              case "resume":
+                this.resume();
+                break;
+              default:
+                console.log(`Unknown queue action.`);
+            }
+          },
+        },
+        enqueue: {
+          queue: true,
+          rest: "GET /enqueue",
+          handler: async (ctx: Context<{}>) => {
+            // Enqueue the job
+            const job = await this.localQueue(ctx, "enqueue.async", ctx.params, {
+              priority: 10,
+            });
+            console.log(`Job ${job.id} enqueued`);
+
+            return job.id;
+          },
+        },
+        // Comic Book Import Job Queue
+        "enqueue.async": {
+          handler: async (
+            ctx: Context<{
+              sessionId: string;
+            }>
+          ) => {
+            try {
+              console.log(`Received Job ID ${ctx.locals.job.id}, processing...`);
+              console.log(ctx.params);
+              // 1. Destructure the job params
+              const { fileObject } = ctx.locals.job.data.params;
+
+              // 2. Extract metadata from the archive
+              const result = await extractFromArchive(fileObject.filePath);
+              const {
+                name,
+                filePath,
+                fileSize,
+                extension,
+                mimeType,
+                cover,
+                containedIn,
+                comicInfoJSON,
+              } = result;
+
+              // 3a. Infer any issue-related metadata from the filename
+              const { inferredIssueDetails } = refineQuery(result.name);
+              console.log(
+                "Issue metadata inferred: ",
+                JSON.stringify(inferredIssueDetails, null, 2)
+              );
+
+              // 3b. Orchestrate the payload
+              const payload = {
+                importStatus: {
+                  isImported: true,
+                  tagged: false,
+                  matchedResult: {
+                    score: "0",
+                  },
+                },
+                rawFileDetails: {
+                  name,
+                  filePath,
+                  fileSize,
+                  extension,
+                  mimeType,
+                  containedIn,
+                  cover,
+                },
+                inferredMetadata: {
+                  issue: inferredIssueDetails,
+                },
+                sourcedMetadata: {
+                  // except for ComicInfo.xml, everything else should be copied over from the
+                  // parent comic
+                  comicInfo: comicInfoJSON,
+                },
+                // since we already have at least 1 copy,
+                // mark it as not wanted by default
+                "acquisition.source.wanted": false,
+
+                // clear out the downloads array
+                // "acquisition.directconnect.downloads": [],
+
+                // mark the metadata source
+                "acquisition.source.name": ctx.locals.job.data.params.sourcedFrom,
+              };
+
+              // 3c. Add the bundleId, if present, to the payload
+              let bundleId = null;
+              if (!isNil(ctx.locals.job.data.params.bundleId)) {
+                bundleId = ctx.locals.job.data.params.bundleId;
+              }
+
+              // 3d. Add the sourcedMetadata, if present
+              if (
+                !isNil(ctx.locals.job.data.params.sourcedMetadata) &&
+                !isUndefined(ctx.locals.job.data.params.sourcedMetadata.comicvine)
+              ) {
+                Object.assign(
+                  payload.sourcedMetadata,
+                  ctx.locals.job.data.params.sourcedMetadata
+                );
+              }
+
+              // 4. Write to mongo
+              const importResult = await this.broker.call("library.rawImportToDB", {
+                importType: ctx.locals.job.data.params.importType,
+                bundleId,
+                payload,
+              });
+              return {
+                data: {
+                  importResult,
+                },
+                id: ctx.locals.job.id,
+                sessionId: ctx.params.sessionId,
+              };
+            } catch (error) {
+              console.error(
+                `An error occurred processing Job ID ${ctx.locals.job.id}`
+              );
+              throw new MoleculerError(error, 500, "IMPORT_JOB_ERROR", {
+                data: ctx.params.sessionId,
+              });
+            }
+          },
+        },
+        getJobResultStatistics: {
+          rest: "GET /getJobResultStatistics",
+          handler: async (ctx: Context<{}>) => {
+            return await JobResult.aggregate([
+              {
+                $group: {
+                  _id: {
+                    sessionId: "$sessionId",
+                    status: "$status",
+                  },
+                  earliestTimestamp: {
+                    $min: "$timestamp",
+                  },
+                  count: {
+                    $sum: 1,
+                  },
+                },
+              },
+              {
+                $group: {
+                  _id: "$_id.sessionId",
+                  statuses: {
+                    $push: {
+                      status: "$_id.status",
+                      earliestTimestamp: "$earliestTimestamp",
+                      count: "$count",
+                    },
+                  },
+                },
+              },
+              {
+                $project: {
+                  _id: 0,
+                  sessionId: "$_id",
+                  completedJobs: {
+                    $reduce: {
+                      input: "$statuses",
+                      initialValue: 0,
+                      in: {
+                        $sum: [
+                          "$$value",
+                          {
+                            $cond: [
+                              {
+                                $eq: ["$$this.status", "completed"],
+                              },
+                              "$$this.count",
+                              0,
+                            ],
+                          },
+                        ],
+                      },
+                    },
+                  },
+                  failedJobs: {
+                    $reduce: {
+                      input: "$statuses",
+                      initialValue: 0,
+                      in: {
+                        $sum: [
+                          "$$value",
+                          {
+                            $cond: [
+                              {
+                                $eq: ["$$this.status", "failed"],
+                              },
+                              "$$this.count",
+                              0,
+                            ],
+                          },
+                        ],
+                      },
+                    },
+                  },
+                  earliestTimestamp: {
+                    $min: "$statuses.earliestTimestamp",
+                  },
+                },
+              },
+            ]);
+          },
+        },
+      },
+
+      events: {
+        // use the `${QUEUE_NAME}.QUEUE_EVENT` scheme
+        async "enqueue.async.active"(ctx: Context<{ id: number }>) {
+          console.log(`Job ID ${ctx.params.id} is set to active.`);
+        },
+        async drained(ctx) {
+          console.log("Queue drained.");
+          await this.broker.call("socket.broadcast", {
+            namespace: "/",
+            event: "action",
+            args: [
+              {
+                type: "LS_IMPORT_QUEUE_DRAINED",
+              },
+            ],
+          });
+        },
+        async "enqueue.async.completed"(ctx: Context<{ id: number }>) {
+          // 1. Fetch the job result using the job id
+          const job = await this.job(ctx.params.id);
+          // 2. Increment the completed job counter
+          await pubClient.incr("completedJobCount");
+          // 3. Fetch the completed job count for the final payload to be sent to the client
+          const completedJobCount = await pubClient.get("completedJobCount");
+          // 4. Emit the LS_COVER_EXTRACTED event with the necessary details
+          await this.broker.call("socket.broadcast", {
+            namespace: "/",
+            event: "action",
+            args: [
+              {
+                type: "LS_COVER_EXTRACTED",
+                completedJobCount,
+                importResult: job.returnvalue.data.importResult,
+              },
+            ],
+          });
+          // 5. Persist the job results in mongo for posterity
+          await JobResult.create({
+            id: ctx.params.id,
+            status: "completed",
+            timestamp: job.timestamp,
+            sessionId: job.returnvalue.sessionId,
+            failedReason: {},
+          });
+
+          console.log(`Job ID ${ctx.params.id} completed.`);
+        },
+
+        async "enqueue.async.failed"(ctx) {
+          const job = await this.job(ctx.params.id);
+          await pubClient.incr("failedJobCount");
+          const failedJobCount = await pubClient.get("failedJobCount");
+
+          await JobResult.create({
+            id: ctx.params.id,
+            status: "failed",
+            failedReason: job.failedReason,
+            sessionId: job.data.params.sessionId,
+            timestamp: job.timestamp,
+          });
+
+          // Emit the LS_COVER_EXTRACTION_FAILED event with the necessary details
+          await this.broker.call("socket.broadcast", {
+            namespace: "/",
+            event: "action",
+            args: [
+              {
+                type: "LS_COVER_EXTRACTION_FAILED",
+                failedJobCount,
+                importResult: job,
+              },
+            ],
+          });
+        },
+      },
+    });
+  }
+}
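Note: producers never touch BullMQ directly — they call the jobqueue.enqueue action, which localQueue()s an enqueue.async job and returns its id; completion and failure then surface through the `${QUEUE_NAME}.QUEUE_EVENT` handlers above and socket.io broadcasts. A trimmed example of the call the library service below makes during newImport (path and session id are illustrative):

import { ServiceBroker } from "moleculer";

// Assumes the jobqueue (and downstream library) services are loaded.
const broker = new ServiceBroker();

(async () => {
  await broker.start();
  const jobId = await broker.call("jobqueue.enqueue", {
    fileObject: {
      filePath: "/comics/Example Comic 001.cbz", // illustrative path
      fileSize: 52428800,
    },
    sessionId: "0b7f2c34-example", // illustrative session id
    importType: "new",
  });
  console.log(`Enqueued job ${jobId}`);
})();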
@@ -33,13 +33,7 @@ SOFTWARE.

 "use strict";
 import { isNil } from "lodash";
-import {
-  Context,
-  Service,
-  ServiceBroker,
-  ServiceSchema,
-  Errors,
-} from "moleculer";
+import { Context, Service, ServiceBroker, ServiceSchema, Errors } from "moleculer";
 import { DbMixin } from "../mixins/db.mixin";
 import Comic from "../models/comic.model";
 import { walkFolder, getSizeOfDirectory } from "../utils/file.utils";
@@ -51,6 +45,7 @@ import {
   IExtractionOptions,
 } from "threetwo-ui-typings";
 const ObjectId = require("mongoose").Types.ObjectId;
+import { pubClient } from "../config/redis.config";
 import fsExtra from "fs-extra";
 const through2 = require("through2");
 import klaw from "klaw";
@@ -66,6 +61,17 @@ export default class ImportService extends Service {
       mixins: [DbMixin("comics", Comic)],
       hooks: {},
       actions: {
+        getHealthInformation: {
+          rest: "GET /getHealthInformation",
+          params: {},
+          handler: async (ctx: Context<{}>) => {
+            try {
+              return await ctx.broker.call("$node.services");
+            } catch (error) {
+              return new Error("Service is down.");
+            }
+          },
+        },
         walkFolders: {
           rest: "POST /walkFolders",
           params: {
@@ -89,9 +95,7 @@ export default class ImportService extends Service {
         uncompressFullArchive: {
           rest: "POST /uncompressFullArchive",
           params: {},
-          handler: async (
-            ctx: Context<{ filePath: string; options: any }>
-          ) => {
+          handler: async (ctx: Context<{ filePath: string; options: any }>) => {
             await broker.call("importqueue.uncompressResize", {
               filePath: ctx.params.filePath,
               options: ctx.params.options,
@@ -109,8 +113,7 @@ export default class ImportService extends Service {
             });
             // Determine source where the comic was added from
             // and gather identifying information about it
-            const sourceName =
-              referenceComicObject[0].acquisition.source.name;
+            const sourceName = referenceComicObject[0].acquisition.source.name;
             const { sourcedMetadata } = referenceComicObject[0];

             const filePath = `${COMICS_DIRECTORY}/${ctx.params.bundle.data.name}`;
@@ -139,64 +142,63 @@ export default class ImportService extends Service {
         },
         newImport: {
           rest: "POST /newImport",
-          params: {},
+          // params: {},
           async handler(
             ctx: Context<{
               extractionOptions?: any;
+              sessionId: string;
             }>
           ) {
-            // 1. Walk the Source folder
-            klaw(path.resolve(COMICS_DIRECTORY))
-              // 1.1 Filter on .cb* extensions
-              .pipe(
-                through2.obj(function (item, enc, next) {
-                  let fileExtension = path.extname(item.path);
-                  if (
-                    [".cbz", ".cbr", ".cb7"].includes(
-                      fileExtension
-                    )
-                  ) {
-                    this.push(item);
-                  }
-                  next();
-                })
-              )
-              // 1.2 Pipe filtered results to the next step
+            try {
+              // Get params to be passed to the import jobs
+              const { sessionId } = ctx.params;
+              // 1. Walk the Source folder
+              klaw(path.resolve(COMICS_DIRECTORY))
+                // 1.1 Filter on .cb* extensions
+                .pipe(
+                  through2.obj(function (item, enc, next) {
+                    let fileExtension = path.extname(item.path);
+                    if ([".cbz", ".cbr", ".cb7"].includes(fileExtension)) {
+                      this.push(item);
+                    }
+                    next();
+                  })
+                )
+                // 1.2 Pipe filtered results to the next step
+                // Enqueue the job in the queue
               .on("data", async (item) => {
-                console.info(
-                  "Found a file at path: %s",
-                  item.path
-                );
+                console.info("Found a file at path: %s", item.path);
                 let comicExists = await Comic.exists({
                   "rawFileDetails.name": `${path.basename(
                     item.path,
                     path.extname(item.path)
                   )}`,
                 });
                 if (!comicExists) {
-                  // 2. Send the extraction job to the queue
-                  await broker.call(
-                    "importqueue.processImport",
-                    {
+                  // 2.1 Reset the job counters in Redis
+                  await pubClient.set("completedJobCount", 0);
+                  await pubClient.set("failedJobCount", 0);
+                  // 2.2 Send the extraction job to the queue
+                  this.broker.call("jobqueue.enqueue", {
                     fileObject: {
                       filePath: item.path,
                       fileSize: item.stats.size,
                     },
+                    sessionId,
                     importType: "new",
-                    }
-                  );
+                  });
                 } else {
-                  console.log(
-                    "Comic already exists in the library."
-                  );
+                  console.log("Comic already exists in the library.");
                 }
               })
               .on("end", () => {
                 console.log("All files traversed.");
               });
+            } catch (error) {
+              console.log(error);
+            }
           },
         },

         rawImportToDB: {
           rest: "POST /rawImportToDB",
           params: {},
@@ -238,28 +240,19 @@ export default class ImportService extends Service {
           // we solicit volume information and add that to mongo
           if (
             comicMetadata.sourcedMetadata.comicvine &&
-            !isNil(
-              comicMetadata.sourcedMetadata.comicvine
-                .volume
-            )
+            !isNil(comicMetadata.sourcedMetadata.comicvine.volume)
           ) {
-            volumeDetails = await this.broker.call(
-              "comicvine.getVolumes",
-              {
-                volumeURI:
-                  comicMetadata.sourcedMetadata
-                    .comicvine.volume
-                    .api_detail_url,
-              }
-            );
+            volumeDetails = await this.broker.call("comicvine.getVolumes", {
+              volumeURI:
+                comicMetadata.sourcedMetadata.comicvine.volume
+                  .api_detail_url,
+            });
             comicMetadata.sourcedMetadata.comicvine.volumeInformation =
               volumeDetails.results;
           }

           console.log("Saving to Mongo...");
-          console.log(
-            `Import type: [${ctx.params.importType}]`
-          );
+          console.log(`Import type: [${ctx.params.importType}]`);
           switch (ctx.params.importType) {
             case "new":
               return await Comic.create(comicMetadata);
@@ -280,10 +273,7 @@ export default class ImportService extends Service {
           }
         } catch (error) {
           console.log(error);
-          throw new Errors.MoleculerError(
-            "Import failed.",
-            500
-          );
+          throw new Errors.MoleculerError("Import failed.", 500);
         }
       },
     },
@@ -301,9 +291,7 @@ export default class ImportService extends Service {
       ) {
         // 1. Find mongo object by id
        // 2. Import payload into sourcedMetadata.comicvine
-        const comicObjectId = new ObjectId(
-          ctx.params.comicObjectId
-        );
+        const comicObjectId = new ObjectId(ctx.params.comicObjectId);

        return new Promise(async (resolve, reject) => {
          let volumeDetails = {};
@@ -312,18 +300,15 @@ export default class ImportService extends Service {
          const volumeDetails = await this.broker.call(
            "comicvine.getVolumes",
            {
-              volumeURI:
-                matchedResult.volume.api_detail_url,
+              volumeURI: matchedResult.volume.api_detail_url,
            }
          );
-          matchedResult.volumeInformation =
-            volumeDetails.results;
+          matchedResult.volumeInformation = volumeDetails.results;
          Comic.findByIdAndUpdate(
            comicObjectId,
            {
              $set: {
-                "sourcedMetadata.comicvine":
-                  matchedResult,
+                "sourcedMetadata.comicvine": matchedResult,
              },
            },
            { new: true },
@@ -354,9 +339,7 @@ export default class ImportService extends Service {
        }>
      ) {
        console.log(JSON.stringify(ctx.params, null, 2));
-        const comicObjectId = new ObjectId(
-          ctx.params.comicObjectId
-        );
+        const comicObjectId = new ObjectId(ctx.params.comicObjectId);

        return new Promise((resolve, reject) => {
          Comic.findByIdAndUpdate(
@@ -410,9 +393,7 @@ export default class ImportService extends Service {
        params: { ids: "array" },
        handler: async (ctx: Context<{ ids: [string] }>) => {
          console.log(ctx.params.ids);
-          const queryIds = ctx.params.ids.map(
-            (id) => new ObjectId(id)
-          );
+          const queryIds = ctx.params.ids.map((id) => new ObjectId(id));
          return await Comic.find({
            _id: {
              $in: queryIds,
@@ -428,8 +409,7 @@ export default class ImportService extends Service {
        const volumes = await Comic.aggregate([
          {
            $project: {
-              volumeInfo:
-                "$sourcedMetadata.comicvine.volumeInformation",
+              volumeInfo: "$sourcedMetadata.comicvine.volumeInformation",
            },
          },
          {
@@ -475,52 +455,46 @@ export default class ImportService extends Service {
        const { queryObjects } = ctx.params;
        // construct the query for ElasticSearch
        let elasticSearchQuery = {};
-        const elasticSearchQueries = queryObjects.map(
-          (queryObject) => {
-            console.log("Volume: ", queryObject.volumeName);
-            console.log("Issue: ", queryObject.issueName);
-            if (queryObject.issueName === null) {
-              queryObject.issueName = "";
-            }
-            if (queryObject.volumeName === null) {
-              queryObject.volumeName = "";
-            }
-            elasticSearchQuery = {
-              bool: {
-                must: [
-                  {
-                    match_phrase: {
-                      "rawFileDetails.name":
-                        queryObject.volumeName,
-                    },
-                  },
-                  {
-                    term: {
-                      "inferredMetadata.issue.number":
-                        parseInt(
-                          queryObject.issueNumber,
-                          10
-                        ),
-                    },
-                  },
-                ],
-              },
-            };
-
-            return [
-              {
-                index: "comics",
-                search_type: "dfs_query_then_fetch",
-              },
-              {
-                query: elasticSearchQuery,
-              },
-            ];
-          }
-        );
-        console.log(
-          JSON.stringify(elasticSearchQueries, null, 2)
-        );
+        const elasticSearchQueries = queryObjects.map((queryObject) => {
+          console.log("Volume: ", queryObject.volumeName);
+          console.log("Issue: ", queryObject.issueName);
+          if (queryObject.issueName === null) {
+            queryObject.issueName = "";
+          }
+          if (queryObject.volumeName === null) {
+            queryObject.volumeName = "";
+          }
+          elasticSearchQuery = {
+            bool: {
+              must: [
+                {
+                  match_phrase: {
+                    "rawFileDetails.name": queryObject.volumeName,
+                  },
+                },
+                {
+                  term: {
+                    "inferredMetadata.issue.number": parseInt(
+                      queryObject.issueNumber,
+                      10
+                    ),
+                  },
+                },
+              ],
+            },
+          };
+
+          return [
+            {
+              index: "comics",
+              search_type: "dfs_query_then_fetch",
+            },
+            {
+              query: elasticSearchQuery,
+            },
+          ];
+        });
+        console.log(JSON.stringify(elasticSearchQueries, null, 2));

        return await ctx.broker.call("search.searchComic", {
          elasticSearchQueries,
@@ -533,10 +507,11 @@ export default class ImportService extends Service {
        rest: "GET /libraryStatistics",
        params: {},
        handler: async (ctx: Context<{}>) => {
-          const comicDirectorySize = await getSizeOfDirectory(
-            COMICS_DIRECTORY,
-            [".cbz", ".cbr", ".cb7"]
-          );
+          const comicDirectorySize = await getSizeOfDirectory(COMICS_DIRECTORY, [
+            ".cbz",
+            ".cbr",
+            ".cb7",
+          ]);
          const totalCount = await Comic.countDocuments({});
          const statistics = await Comic.aggregate([
            {
@@ -545,11 +520,7 @@ export default class ImportService extends Service {
            {
              $match: {
                "rawFileDetails.extension": {
-                  $in: [
-                    ".cbr",
-                    ".cbz",
-                    ".cb7",
-                  ],
+                  $in: [".cbr", ".cbz", ".cb7"],
                },
              },
            },
@@ -563,10 +534,9 @@ export default class ImportService extends Service {
            issues: [
              {
                $match: {
-                  "sourcedMetadata.comicvine.volumeInformation":
-                    {
-                      $gt: {},
-                    },
+                  "sourcedMetadata.comicvine.volumeInformation": {
+                    $gt: {},
+                  },
                },
              },
              {
@@ -629,23 +599,16 @@ export default class ImportService extends Service {
            .drop()
            .then(async (data) => {
              console.info(data);
-              const coversFolderDeleteResult =
-                fsExtra.emptyDirSync(
-                  path.resolve(
-                    `${USERDATA_DIRECTORY}/covers`
-                  )
-                );
-              const expandedFolderDeleteResult =
-                fsExtra.emptyDirSync(
-                  path.resolve(
-                    `${USERDATA_DIRECTORY}/expanded`
-                  )
-                );
-              const eSIndicesDeleteResult =
-                await ctx.broker.call(
-                  "search.deleteElasticSearchIndices",
-                  {}
-                );
+              const coversFolderDeleteResult = fsExtra.emptyDirSync(
+                path.resolve(`${USERDATA_DIRECTORY}/covers`)
+              );
+              const expandedFolderDeleteResult = fsExtra.emptyDirSync(
+                path.resolve(`${USERDATA_DIRECTORY}/expanded`)
+              );
+              const eSIndicesDeleteResult = await ctx.broker.call(
+                "search.deleteElasticSearchIndices",
+                {}
+              );
              return {
                data,
                coversFolderDeleteResult,
@@ -46,14 +46,15 @@ export default class SettingsService extends Service {
        .map((item) => JSON.stringify(item))
        .join("\n");
      queries += "\n";
-      const { body } = await eSClient.msearch({
+      const { responses } = await eSClient.msearch({
        body: queries,
      });
-      body.responses.forEach((match) => {
+      responses.forEach((match) => {
        console.log(match.hits);
      });

-      return body.responses;
+      return responses;
    },
  },
  issue: {
@@ -1,16 +1,14 @@
 "use strict";
-import { Service, ServiceBroker, ServiceSchema } from "moleculer";
+import { Service, ServiceBroker, ServiceSchema, Context } from "moleculer";
+import { JobType } from "moleculer-bullmq";
 import { createClient } from "redis";
 import { createAdapter } from "@socket.io/redis-adapter";
+import Session from "../models/session.model";
+import { pubClient, subClient } from "../config/redis.config";
+const { MoleculerError } = require("moleculer").Errors;
 const SocketIOService = require("moleculer-io");
-const redisURL = new URL(process.env.REDIS_URI);
-// console.log(redisURL.hostname);
-
-const pubClient = createClient({ url: `redis://${redisURL.hostname}:6379` });
-(async () => {
-  await pubClient.connect();
-})();
-const subClient = pubClient.duplicate();
+const { v4: uuidv4 } = require("uuid");
+
 export default class SocketService extends Service {
   // @ts-ignore
   public constructor(
@@ -30,31 +28,75 @@ export default class SocketService extends Service {
      call: {
        // whitelist: ["math.*", "say.*", "accounts.*", "rooms.*", "io.*"],
      },
-      action: async (data, ack) => {
-        // write your handler function here.
+      action: async (data) => {
        switch (data.type) {
-          case "LS_IMPORT":
-            console.log(
-              `Received ${data.type} event.`
-            );
-            // 1. Send task to queue
-            await this.broker.call(
-              "library.newImport",
-              data.data,
-              {}
-            );
-            break;
-          case "LS_TOGGLE_IMPORT_QUEUE":
+          case "RESUME_SESSION":
+            console.log("Attempting to resume session...");
+            try {
+              const sessionRecord = await Session.find({
+                sessionId: data.session.sessionId,
+              });
+              // 1. Check for the sessionId's existence, and a match
+              if (
+                sessionRecord.length !== 0 &&
+                sessionRecord[0].sessionId === data.session.sessionId
+              ) {
+                // 2. Find if the queue has active jobs
+                const jobs: JobType = await this.broker.call(
+                  "jobqueue.getJobCountsByType",
+                  {}
+                );
+                const { active } = jobs;
+
+                if (active > 0) {
+                  // 3. Get job counts
+                  const completedJobCount = await pubClient.get(
+                    "completedJobCount"
+                  );
+                  const failedJobCount = await pubClient.get(
+                    "failedJobCount"
+                  );
+
+                  // 4. Send the counts to the active socket.io session
+                  await this.broker.call("socket.broadcast", {
+                    namespace: "/",
+                    event: "action",
+                    args: [
+                      {
+                        type: "RESTORE_JOB_COUNTS_AFTER_SESSION_RESTORATION",
+                        completedJobCount,
+                        failedJobCount,
+                        queueStatus: "running",
+                      },
+                    ],
+                  });
+                }
+              }
+            } catch (err) {
+              throw new MoleculerError(
+                err,
+                500,
+                "SESSION_ID_NOT_FOUND",
+                {
+                  data: data.session.sessionId,
+                }
+              );
+            }
+
+            break;
+
+          case "LS_SET_QUEUE_STATUS":
+            console.log(data);
            await this.broker.call(
-              "importqueue.toggleImportQueue",
-              data.data,
+              "jobqueue.toggle",
+              { action: data.data.queueAction },
              {}
            );
            break;
          case "LS_SINGLE_IMPORT":
-            console.info(
-              "AirDC++ finished a download -> "
-            );
+            console.info("AirDC++ finished a download -> ");
            console.log(data);
            await this.broker.call(
              "library.importDownloadedComic",
@@ -80,9 +122,36 @@ export default class SocketService extends Service {
    actions: {},
    methods: {},
    async started() {
-      this.io.on("connection", (data) =>
-        console.log("socket.io server initialized.")
-      );
+      this.io.on("connection", async (socket) => {
+        console.log(
+          `socket.io server connected to client with session ID: ${socket.id}`
+        );
+        console.log("Looking up sessionId in Mongo...");
+        const sessionIdExists = await Session.find({
+          sessionId: socket.handshake.query.sessionId,
+        });
+        // 1. if the sessionId isn't found in Mongo, create one and persist it
+        if (sessionIdExists.length === 0) {
+          console.log(
+            `Socket Id ${socket.id} not found in Mongo, creating a new session...`
+          );
+          const sessionId = uuidv4();
+          socket.sessionId = sessionId;
+          console.log(`Saving session ${sessionId} to Mongo...`);
+          await Session.create({
+            sessionId,
+            socketId: socket.id,
+          });
+          socket.emit("sessionInitialized", sessionId);
+        }
+        // 2. else, retrieve it from Mongo and "resume" the socket.io connection
+        else {
+          console.log(
+            `Found socketId ${socket.id}, attempting to resume socket.io connection...`
+          );
+          console.log(socket.handshake.query.sessionId);
+        }
+      });
    },
  });
 }
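Note: the RESUME_SESSION / sessionInitialized handshake above implies a client that sends its persisted sessionId as a handshake query parameter (read server-side as socket.handshake.query.sessionId). A hypothetical socket.io-client counterpart, just to show the shape — the storage key and URL are illustrative:

import { io } from "socket.io-client";

const sessionId = localStorage.getItem("sessionId") ?? ""; // illustrative storage
const socket = io("http://localhost:3001", { query: { sessionId } });

// First connect: the server mints a UUID, persists it to Mongo, and echoes it back.
socket.on("sessionInitialized", (id: string) => {
  localStorage.setItem("sessionId", id);
});

// Job-count restoration and import progress arrive as "action" events.
socket.on("action", (action: { type: string }) => console.log(action.type));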
@@ -47,6 +47,7 @@ import {
   getMimeType,
 } from "../utils/file.utils";
 import { convertXMLToJSON } from "./xml.utils";
+const { MoleculerError } = require("moleculer").Errors;
 const fse = require("fs-extra");
 const Unrar = require("unrar");
 interface RarFile {
@@ -254,7 +255,7 @@ export const extractComicInfoXMLFromZip = async (
   // Push the first file (cover) to our extraction target
   extractionTargets.push(files[0].name);
   filesToWriteToDisk.coverFile = path.basename(files[0].name);

   if (!isEmpty(comicInfoXMLFileObject)) {
     filesToWriteToDisk.comicInfoXML = comicInfoXMLFileObject[0].name;
     extractionTargets.push(filesToWriteToDisk.comicInfoXML);
@@ -364,10 +365,13 @@ export const extractFromArchive = async (filePath: string) => {
       return Object.assign({}, ...cbrResult);

     default:
-      console.log(
+      console.error(
         "Error inferring filetype for comicinfo.xml extraction."
       );
-      break;
+      throw new MoleculerError({}, 500, "FILETYPE_INFERENCE_ERROR", {
+        data: { message: "Cannot infer filetype." },
+      });
   }
 };