24 Commits

Author SHA1 Message Date
b35e2140b5 🧲 Created a dedicated queue for torrent ops 2024-03-29 19:36:16 -04:00
f053dcb789 🧲 Massaging data to be sent to UI 2024-03-27 22:22:40 -05:00
aea7a24f76 🧲 Added a job for deleted torrents clean-up 2024-03-24 17:31:31 -04:00
8f0c2f4302 ⚙️ getAllSettings is parameterized 2024-03-12 16:39:44 -05:00
7dbe2b2701 🏗️ Added torrent attrs to comic model 2024-03-03 12:22:40 -05:00
4cdb11fcbd Cleaned the console.logs 2024-01-08 16:40:12 -05:00
78f7c1b595 🤐 Added uncompression event 2024-01-07 22:13:02 -05:00
bbd2906ebf 🏗️ Added some archive-related keys to Comic model 2024-01-06 11:17:40 -05:00
f3965437b5 🏗 Added a job for full archive extraction 2023-12-30 00:50:06 -05:00
78e0e9f8ce 🏗️ Refactored the searchIssue method 2023-12-28 22:52:33 -05:00
c926758db6 🏗️ Added a downloads array to bittorent schema 2023-12-20 00:08:38 -05:00
b2b35aedc0 🏗️ Fixed a mongo update query 2023-11-27 02:14:16 -05:00
f35e3ccbe0 Removed useless code 2023-11-15 16:02:07 -06:00
7b0c0a7420 Added the importSingleIssue action 2023-11-15 15:59:27 -06:00
c2bbbf311d 🏗️ Fixed setQueueStatus 2023-11-14 13:24:49 -06:00
b8ca03220f 🏗 Implemented setQueueStatus 2023-11-13 22:01:01 -05:00
b87b0c875d 🏗️ Fleshed out resumeSession event 2023-11-13 21:18:19 -05:00
11fbaf10db 🏗 Wired up the events correctly 2023-11-13 16:41:58 -05:00
1229feb69c 🏗️ Refactor for zustand and tanstack react query support 2023-11-09 10:22:45 -06:00
3efdc7c2e2 ⚙️ Refactored saveSettings endpoint 2023-09-15 15:49:13 -04:00
1fff931941 🌊 Modified settings model schema 2023-09-13 22:09:25 -05:00
f4e2db5a5f 📦 Instruction for paths for unrar and p7zip 2023-09-01 09:44:02 -05:00
1d7561279b 📕 Updated local dev instructions in README 2023-09-01 09:22:02 -05:00
9e47ae0436 Merge pull request #6 from rishighan/migration-to-bullmq
🐂 Migration to moleculer-bullMQ
2023-08-30 13:50:47 -04:00
12 changed files with 1169 additions and 715 deletions

View File

@@ -3,21 +3,35 @@
This [moleculer-based](https://github.com/moleculerjs/moleculer-web) microservice houses endpoints for the following functions:
1. Local import of a comic library into mongo (currently supports `cbr` and `cbz` files)
2. Metadata extraction from file, `comicinfo.xml`
2. Metadata extraction from file, `comicinfo.xml`
3. Mongo comic object orchestration
4. CRUD operations on `Comic` model
5. Helper utils to help with image metadata extraction, file operations and more.
## Local Development
1. ~~You need `calibre` in your local path.
On `macOS` you can `brew install calibre` and make sure that `ebook-meta` is present on the path~~ Calibre is no longer required as a dependency. Ignore this step.
2. You need `mongo` for the data store. on `macOS` you can use [these instructions](https://docs.mongodb.com/manual/tutorial/install-mongodb-on-os-x/) to install it
1. You need the following dependencies installed: `mongo`, `elasticsearch` and `redis`
2. You also need binaries for `unrar` and `p7zip`
3. Clone this repo
4. Run `npm i`
5. Assuming you installed mongo correctly, run `MONGO_URI=mongodb://localhost:27017/threetwo npm run dev` to start the service
5. Assuming you installed the dependencies correctly, run:
```
COMICS_DIRECTORY=<PATH_TO_COMICS_DIRECTORY> \
USERDATA_DIRECTORY=<PATH_TO_USERDATA_DIRECTORY> \
REDIS_URI=redis://<REDIS_HOST:REDIS_PORT> \
ELASTICSEARCH_URI=<ELASTICSEARCH_HOST:ELASTICSEARCH_PORT> \
MONGO_URI=mongodb://<MONGO_HOST:MONGO_PORT>/threetwo \
UNRAR_BIN_PATH=<UNRAR_BIN_PATH> \
SEVENZ_BINARY_PATH=<SEVENZ_BINARY_PATH> \
npm run dev
```
to start the service
6. You should see the service spin up and a list of all the endpoints in the terminal
7. The service can be accessed through `http://localhost:3000/api/import/*`
7. The service can be accessed through `http://localhost:3000/api/<serviceName>/*`
## Docker Instructions
1. Build the image using `docker build . -t frishi/threetwo-import-service`. Give it a hot minute.

View File

@@ -2,7 +2,7 @@ const paginate = require("mongoose-paginate-v2");
const { Client } = require("@elastic/elasticsearch");
import ComicVineMetadataSchema from "./comicvine.metadata.model";
import { mongoosastic } from "mongoosastic-ts";
const mongoose = require("mongoose")
const mongoose = require("mongoose");
import {
MongoosasticDocument,
MongoosasticModel,
@@ -28,6 +28,10 @@ const RawFileDetailsSchema = mongoose.Schema({
mimeType: String,
containedIn: String,
pageCount: Number,
archive: {
uncompressed: Boolean,
expandedPath: String,
},
cover: {
filePath: String,
stats: Object,
@@ -111,12 +115,13 @@ const ComicSchema = mongoose.Schema(
default: [],
},
},
torrent: {
sourceApplication: String,
magnet: String,
tracker: String,
status: String,
},
torrent: [
{
infoHash: String,
name: String,
announce: [String],
},
],
usenet: {
sourceApplication: String,
},

View File

@@ -1,21 +1,34 @@
const mongoose = require("mongoose");
const paginate = require("mongoose-paginate-v2");
const HostSchema = mongoose.Schema({
_id: false,
username: String,
password: String,
hostname: String,
port: String,
protocol: String,
});
const SettingsScehma = mongoose.Schema({
directConnect: {
client: {
host: {
username: String,
password: String,
hostname: String,
port: String,
protocol: String,
},
host: HostSchema,
airDCPPUserSettings: Object,
hubs: Array,
},
},
bittorrent: {
client: {
name: String,
host: HostSchema,
},
},
prowlarr: {
client: {
host: HostSchema,
apiKey: String,
},
},
});
const Settings = mongoose.model("Settings", SettingsScehma);

861
package-lock.json generated

File diff suppressed because it is too large Load Diff

View File

@@ -7,6 +7,8 @@ import {
ServiceSchema,
Errors,
} from "moleculer";
import { DbMixin } from "../mixins/db.mixin";
import Comic from "../models/comic.model";
import path from "path";
import {
analyze,
@@ -22,16 +24,13 @@ export default class ImageTransformation extends Service {
super(broker);
this.parseServiceSchema({
name: "imagetransformation",
mixins: [],
mixins: [DbMixin("comics", Comic)],
settings: {
// Available fields in the responses
fields: ["_id", "name", "quantity", "price"],
fields: ["_id"],
// Validator for the `create` & `insert` actions.
entityValidator: {
name: "string|min:3",
price: "number|positive",
},
entityValidator: {},
},
hooks: {},
actions: {

View File

@@ -2,10 +2,16 @@ import { Context, Service, ServiceBroker } from "moleculer";
import JobResult from "../models/jobresult.model";
import { refineQuery } from "filename-parser";
import BullMqMixin from "moleculer-bullmq";
import { extractFromArchive } from "../utils/uncompression.utils";
import { DbMixin } from "../mixins/db.mixin";
import Comic from "../models/comic.model";
const ObjectId = require("mongoose").Types.ObjectId;
import {
extractFromArchive,
uncompressEntireArchive,
} from "../utils/uncompression.utils";
import { isNil, isUndefined } from "lodash";
import { pubClient } from "../config/redis.config";
import path from "path";
const { MoleculerError } = require("moleculer").Errors;
console.log(process.env.REDIS_URI);
@@ -15,7 +21,7 @@ export default class JobQueueService extends Service {
this.parseServiceSchema({
name: "jobqueue",
hooks: {},
mixins: [BullMqMixin],
mixins: [DbMixin("comics", Comic), BullMqMixin],
settings: {
bullmq: {
client: process.env.REDIS_URI,
@@ -44,19 +50,30 @@ export default class JobQueueService extends Service {
}
},
},
enqueue: {
queue: true,
rest: "/GET enqueue",
handler: async (ctx: Context<{}>) => {
rest: "GET /enqueue",
handler: async (
ctx: Context<{ action: string; description: string }>
) => {
const { action, description } = ctx.params;
// Enqueue the job
const job = await this.localQueue(ctx, "enqueue.async", ctx.params, {
priority: 10,
});
const job = await this.localQueue(
ctx,
action,
ctx.params,
{
priority: 10,
}
);
console.log(`Job ${job.id} enqueued`);
console.log(`${description}`);
return job.id;
},
},
// Comic Book Import Job Queue
"enqueue.async": {
handler: async (
@@ -65,13 +82,16 @@ export default class JobQueueService extends Service {
}>
) => {
try {
console.log(`Recieved Job ID ${ctx.locals.job.id}, processing...`);
console.log(ctx.params);
console.log(
`Recieved Job ID ${ctx.locals.job.id}, processing...`
);
// 1. De-structure the job params
const { fileObject } = ctx.locals.job.data.params;
// 2. Extract metadata from the archive
const result = await extractFromArchive(fileObject.filePath);
const result = await extractFromArchive(
fileObject.filePath
);
const {
name,
filePath,
@@ -84,7 +104,9 @@ export default class JobQueueService extends Service {
} = result;
// 3a. Infer any issue-related metadata from the filename
const { inferredIssueDetails } = refineQuery(result.name);
const { inferredIssueDetails } = refineQuery(
result.name
);
console.log(
"Issue metadata inferred: ",
JSON.stringify(inferredIssueDetails, null, 2)
@@ -124,7 +146,8 @@ export default class JobQueueService extends Service {
// "acquisition.directconnect.downloads": [],
// mark the metadata source
"acquisition.source.name": ctx.locals.job.data.params.sourcedFrom,
"acquisition.source.name":
ctx.locals.job.data.params.sourcedFrom,
};
// 3c. Add the bundleId, if present to the payload
@@ -135,8 +158,13 @@ export default class JobQueueService extends Service {
// 3d. Add the sourcedMetadata, if present
if (
!isNil(ctx.locals.job.data.params.sourcedMetadata) &&
!isUndefined(ctx.locals.job.data.params.sourcedMetadata.comicvine)
!isNil(
ctx.locals.job.data.params.sourcedMetadata
) &&
!isUndefined(
ctx.locals.job.data.params.sourcedMetadata
.comicvine
)
) {
Object.assign(
payload.sourcedMetadata,
@@ -145,11 +173,15 @@ export default class JobQueueService extends Service {
}
// 4. write to mongo
const importResult = await this.broker.call("library.rawImportToDB", {
importType: ctx.locals.job.data.params.importType,
bundleId,
payload,
});
const importResult = await this.broker.call(
"library.rawImportToDB",
{
importType:
ctx.locals.job.data.params.importType,
bundleId,
payload,
}
);
return {
data: {
importResult,
@@ -161,9 +193,14 @@ export default class JobQueueService extends Service {
console.error(
`An error occurred processing Job ID ${ctx.locals.job.id}`
);
throw new MoleculerError(error, 500, "IMPORT_JOB_ERROR", {
data: ctx.params.sessionId,
});
throw new MoleculerError(
error,
500,
"IMPORT_JOB_ERROR",
{
data: ctx.params.sessionId,
}
);
}
},
},
@@ -191,7 +228,8 @@ export default class JobQueueService extends Service {
statuses: {
$push: {
status: "$_id.status",
earliestTimestamp: "$earliestTimestamp",
earliestTimestamp:
"$earliestTimestamp",
count: "$count",
},
},
@@ -211,7 +249,10 @@ export default class JobQueueService extends Service {
{
$cond: [
{
$eq: ["$$this.status", "completed"],
$eq: [
"$$this.status",
"completed",
],
},
"$$this.count",
0,
@@ -231,7 +272,10 @@ export default class JobQueueService extends Service {
{
$cond: [
{
$eq: ["$$this.status", "failed"],
$eq: [
"$$this.status",
"failed",
],
},
"$$this.count",
0,
@@ -249,9 +293,75 @@ export default class JobQueueService extends Service {
]);
},
},
"uncompressFullArchive.async": {
rest: "POST /uncompressFullArchive",
handler: async (
ctx: Context<{
filePath: string;
comicObjectId: string;
options: any;
}>
) => {
console.log(
`Recieved Job ID ${JSON.stringify(
ctx.locals
)}, processing...`
);
const { filePath, options, comicObjectId } = ctx.params;
const comicId = new ObjectId(comicObjectId);
// 2. Extract metadata from the archive
const result: string[] = await uncompressEntireArchive(
filePath,
options
);
if (Array.isArray(result) && result.length !== 0) {
// Get the containing directory of the uncompressed archive
const directoryPath = path.dirname(result[0]);
// Add to mongo object
await Comic.findByIdAndUpdate(
comicId,
{
$set: {
"rawFileDetails.archive": {
uncompressed: true,
expandedPath: directoryPath,
},
},
},
{ new: true, safe: true, upsert: true }
);
return result;
}
},
},
},
events: {
async "uncompressFullArchive.async.active"(
ctx: Context<{ id: number }>
) {
console.log(
`Uncompression Job ID ${ctx.params.id} is set to active.`
);
},
async "uncompressFullArchive.async.completed"(
ctx: Context<{ id: number }>
) {
console.log(
`Uncompression Job ID ${ctx.params.id} completed.`
);
const job = await this.job(ctx.params.id);
await this.broker.call("socket.broadcast", {
namespace: "/",
event: "LS_UNCOMPRESSION_JOB_COMPLETE",
args: [
{
uncompressedArchive: job.returnvalue,
},
],
});
return job.returnvalue;
},
// use the `${QUEUE_NAME}.QUEUE_EVENT` scheme
async "enqueue.async.active"(ctx: Context<{ id: Number }>) {
console.log(`Job ID ${ctx.params.id} is set to active.`);
@@ -260,10 +370,10 @@ export default class JobQueueService extends Service {
console.log("Queue drained.");
await this.broker.call("socket.broadcast", {
namespace: "/",
event: "action",
event: "LS_IMPORT_QUEUE_DRAINED",
args: [
{
type: "LS_IMPORT_QUEUE_DRAINED",
message: "drained",
},
],
});
@@ -274,14 +384,15 @@ export default class JobQueueService extends Service {
// 2. Increment the completed job counter
await pubClient.incr("completedJobCount");
// 3. Fetch the completed job count for the final payload to be sent to the client
const completedJobCount = await pubClient.get("completedJobCount");
const completedJobCount = await pubClient.get(
"completedJobCount"
);
// 4. Emit the LS_COVER_EXTRACTED event with the necessary details
await this.broker.call("socket.broadcast", {
namespace: "/",
event: "action",
event: "LS_COVER_EXTRACTED",
args: [
{
type: "LS_COVER_EXTRACTED",
completedJobCount,
importResult: job.returnvalue.data.importResult,
},
@@ -302,7 +413,9 @@ export default class JobQueueService extends Service {
async "enqueue.async.failed"(ctx) {
const job = await this.job(ctx.params.id);
await pubClient.incr("failedJobCount");
const failedJobCount = await pubClient.get("failedJobCount");
const failedJobCount = await pubClient.get(
"failedJobCount"
);
await JobResult.create({
id: ctx.params.id,
@@ -315,10 +428,9 @@ export default class JobQueueService extends Service {
// 4. Emit the LS_COVER_EXTRACTION_FAILED event with the necessary details
await this.broker.call("socket.broadcast", {
namespace: "/",
event: "action",
event: "LS_COVER_EXTRACTION_FAILED",
args: [
{
type: "LS_COVER_EXTRACTION_FAILED",
failedJobCount,
importResult: job,
},
@@ -326,6 +438,7 @@ export default class JobQueueService extends Service {
});
},
},
methods: {},
});
}
}

View File

@@ -33,7 +33,13 @@ SOFTWARE.
"use strict";
import { isNil } from "lodash";
import { Context, Service, ServiceBroker, ServiceSchema, Errors } from "moleculer";
import {
Context,
Service,
ServiceBroker,
ServiceSchema,
Errors,
} from "moleculer";
import { DbMixin } from "../mixins/db.mixin";
import Comic from "../models/comic.model";
import { walkFolder, getSizeOfDirectory } from "../utils/file.utils";
@@ -74,14 +80,19 @@ export default class ImportService extends Service {
},
walkFolders: {
rest: "POST /walkFolders",
params: {
basePathToWalk: "string",
},
async handler(ctx: Context<{ basePathToWalk: string }>) {
params: {},
async handler(
ctx: Context<{
basePathToWalk: string;
extensions: string[];
}>
) {
console.log(ctx.params);
return await walkFolder(ctx.params.basePathToWalk, [
".cbz",
".cbr",
".cb7",
...ctx.params.extensions,
]);
},
},
@@ -95,10 +106,19 @@ export default class ImportService extends Service {
uncompressFullArchive: {
rest: "POST /uncompressFullArchive",
params: {},
handler: async (ctx: Context<{ filePath: string; options: any }>) => {
await broker.call("importqueue.uncompressResize", {
handler: async (
ctx: Context<{
filePath: string;
comicObjectId: string;
options: any;
}>
) => {
this.broker.call("jobqueue.enqueue", {
filePath: ctx.params.filePath,
comicObjectId: ctx.params.comicObjectId,
options: ctx.params.options,
queueName: "uncompressFullArchive.async",
description: `Job for uncompressing archive at ${ctx.params.filePath}`,
});
},
},
@@ -113,7 +133,8 @@ export default class ImportService extends Service {
});
// Determine source where the comic was added from
// and gather identifying information about it
const sourceName = referenceComicObject[0].acquisition.source.name;
const sourceName =
referenceComicObject[0].acquisition.source.name;
const { sourcedMetadata } = referenceComicObject[0];
const filePath = `${COMICS_DIRECTORY}/${ctx.params.bundle.data.name}`;
@@ -157,8 +178,14 @@ export default class ImportService extends Service {
// 1.1 Filter on .cb* extensions
.pipe(
through2.obj(function (item, enc, next) {
let fileExtension = path.extname(item.path);
if ([".cbz", ".cbr", ".cb7"].includes(fileExtension)) {
let fileExtension = path.extname(
item.path
);
if (
[".cbz", ".cbr", ".cb7"].includes(
fileExtension
)
) {
this.push(item);
}
next();
@@ -167,7 +194,10 @@ export default class ImportService extends Service {
// 1.2 Pipe filtered results to the next step
// Enqueue the job in the queue
.on("data", async (item) => {
console.info("Found a file at path: %s", item.path);
console.info(
"Found a file at path: %s",
item.path
);
let comicExists = await Comic.exists({
"rawFileDetails.name": `${path.basename(
item.path,
@@ -176,8 +206,14 @@ export default class ImportService extends Service {
});
if (!comicExists) {
// 2.1 Reset the job counters in Redis
await pubClient.set("completedJobCount", 0);
await pubClient.set("failedJobCount", 0);
await pubClient.set(
"completedJobCount",
0
);
await pubClient.set(
"failedJobCount",
0
);
// 2.2 Send the extraction job to the queue
this.broker.call("jobqueue.enqueue", {
fileObject: {
@@ -186,9 +222,12 @@ export default class ImportService extends Service {
},
sessionId,
importType: "new",
action: "enqueue.async",
});
} else {
console.log("Comic already exists in the library.");
console.log(
"Comic already exists in the library."
);
}
})
.on("end", () => {
@@ -240,21 +279,31 @@ export default class ImportService extends Service {
// we solicit volume information and add that to mongo
if (
comicMetadata.sourcedMetadata.comicvine &&
!isNil(comicMetadata.sourcedMetadata.comicvine.volume)
!isNil(
comicMetadata.sourcedMetadata.comicvine
.volume
)
) {
volumeDetails = await this.broker.call("comicvine.getVolumes", {
volumeURI:
comicMetadata.sourcedMetadata.comicvine.volume
.api_detail_url,
});
volumeDetails = await this.broker.call(
"comicvine.getVolumes",
{
volumeURI:
comicMetadata.sourcedMetadata
.comicvine.volume
.api_detail_url,
}
);
comicMetadata.sourcedMetadata.comicvine.volumeInformation =
volumeDetails.results;
}
console.log("Saving to Mongo...");
console.log(`Import type: [${ctx.params.importType}]`);
console.log(
`Import type: [${ctx.params.importType}]`
);
switch (ctx.params.importType) {
case "new":
console.log(comicMetadata);
return await Comic.create(comicMetadata);
case "update":
return await Comic.findOneAndUpdate(
@@ -273,7 +322,10 @@ export default class ImportService extends Service {
}
} catch (error) {
console.log(error);
throw new Errors.MoleculerError("Import failed.", 500);
throw new Errors.MoleculerError(
"Import failed.",
500
);
}
},
},
@@ -291,7 +343,9 @@ export default class ImportService extends Service {
) {
// 1. Find mongo object by id
// 2. Import payload into sourcedMetadata.comicvine
const comicObjectId = new ObjectId(ctx.params.comicObjectId);
const comicObjectId = new ObjectId(
ctx.params.comicObjectId
);
return new Promise(async (resolve, reject) => {
let volumeDetails = {};
@@ -300,15 +354,18 @@ export default class ImportService extends Service {
const volumeDetails = await this.broker.call(
"comicvine.getVolumes",
{
volumeURI: matchedResult.volume.api_detail_url,
volumeURI:
matchedResult.volume.api_detail_url,
}
);
matchedResult.volumeInformation = volumeDetails.results;
matchedResult.volumeInformation =
volumeDetails.results;
Comic.findByIdAndUpdate(
comicObjectId,
{
$set: {
"sourcedMetadata.comicvine": matchedResult,
"sourcedMetadata.comicvine":
matchedResult,
},
},
{ new: true },
@@ -339,7 +396,9 @@ export default class ImportService extends Service {
}>
) {
console.log(JSON.stringify(ctx.params, null, 2));
const comicObjectId = new ObjectId(ctx.params.comicObjectId);
const comicObjectId = new ObjectId(
ctx.params.comicObjectId
);
return new Promise((resolve, reject) => {
Comic.findByIdAndUpdate(
@@ -366,6 +425,66 @@ export default class ImportService extends Service {
});
},
},
applyTorrentDownloadMetadata: {
rest: "POST /applyTorrentDownloadMetadata",
handler: async (
ctx: Context<{
torrentToDownload: any;
comicObjectId: String;
infoHash: String;
name: String;
announce: [String];
}>
) => {
const {
name,
torrentToDownload,
comicObjectId,
announce,
infoHash,
} = ctx.params;
console.log(JSON.stringify(ctx.params, null, 4));
try {
return await Comic.findByIdAndUpdate(
new ObjectId(comicObjectId),
{
$push: {
"acquisition.torrent": {
infoHash,
name,
announce,
},
},
},
{ new: true, safe: true, upsert: true }
);
} catch (err) {
console.log(err);
}
},
},
getInfoHashes: {
rest: "GET /getInfoHashes",
handler: async (ctx: Context<{}>) => {
try {
return await Comic.aggregate([
{
$unwind: "$acquisition.torrent",
},
{
$group: {
_id: "$_id",
infoHashes: {
$push: "$acquisition.torrent.infoHash",
},
},
},
]);
} catch (err) {
return err;
}
},
},
getComicBooks: {
rest: "POST /getComicBooks",
params: {},
@@ -385,6 +504,7 @@ export default class ImportService extends Service {
rest: "POST /getComicBookById",
params: { id: "string" },
async handler(ctx: Context<{ id: string }>) {
console.log(ctx.params.id);
return await Comic.findById(ctx.params.id);
},
},
@@ -393,7 +513,9 @@ export default class ImportService extends Service {
params: { ids: "array" },
handler: async (ctx: Context<{ ids: [string] }>) => {
console.log(ctx.params.ids);
const queryIds = ctx.params.ids.map((id) => new ObjectId(id));
const queryIds = ctx.params.ids.map(
(id) => new ObjectId(id)
);
return await Comic.find({
_id: {
$in: queryIds,
@@ -409,7 +531,8 @@ export default class ImportService extends Service {
const volumes = await Comic.aggregate([
{
$project: {
volumeInfo: "$sourcedMetadata.comicvine.volumeInformation",
volumeInfo:
"$sourcedMetadata.comicvine.volumeInformation",
},
},
{
@@ -455,46 +578,52 @@ export default class ImportService extends Service {
const { queryObjects } = ctx.params;
// construct the query for ElasticSearch
let elasticSearchQuery = {};
const elasticSearchQueries = queryObjects.map((queryObject) => {
console.log("Volume: ", queryObject.volumeName);
console.log("Issue: ", queryObject.issueName);
if (queryObject.issueName === null) {
queryObject.issueName = "";
}
if (queryObject.volumeName === null) {
queryObject.volumeName = "";
}
elasticSearchQuery = {
bool: {
must: [
{
match_phrase: {
"rawFileDetails.name": queryObject.volumeName,
const elasticSearchQueries = queryObjects.map(
(queryObject) => {
console.log("Volume: ", queryObject.volumeName);
console.log("Issue: ", queryObject.issueName);
if (queryObject.issueName === null) {
queryObject.issueName = "";
}
if (queryObject.volumeName === null) {
queryObject.volumeName = "";
}
elasticSearchQuery = {
bool: {
must: [
{
match_phrase: {
"rawFileDetails.name":
queryObject.volumeName,
},
},
},
{
term: {
"inferredMetadata.issue.number": parseInt(
queryObject.issueNumber,
10
),
{
term: {
"inferredMetadata.issue.number":
parseInt(
queryObject.issueNumber,
10
),
},
},
},
],
},
};
],
},
};
return [
{
index: "comics",
search_type: "dfs_query_then_fetch",
},
{
query: elasticSearchQuery,
},
];
});
console.log(JSON.stringify(elasticSearchQueries, null, 2));
return [
{
index: "comics",
search_type: "dfs_query_then_fetch",
},
{
query: elasticSearchQuery,
},
];
}
);
console.log(
JSON.stringify(elasticSearchQueries, null, 2)
);
return await ctx.broker.call("search.searchComic", {
elasticSearchQueries,
@@ -507,11 +636,10 @@ export default class ImportService extends Service {
rest: "GET /libraryStatistics",
params: {},
handler: async (ctx: Context<{}>) => {
const comicDirectorySize = await getSizeOfDirectory(COMICS_DIRECTORY, [
".cbz",
".cbr",
".cb7",
]);
const comicDirectorySize = await getSizeOfDirectory(
COMICS_DIRECTORY,
[".cbz", ".cbr", ".cb7"]
);
const totalCount = await Comic.countDocuments({});
const statistics = await Comic.aggregate([
{
@@ -520,7 +648,11 @@ export default class ImportService extends Service {
{
$match: {
"rawFileDetails.extension": {
$in: [".cbr", ".cbz", ".cb7"],
$in: [
".cbr",
".cbz",
".cb7",
],
},
},
},
@@ -534,9 +666,10 @@ export default class ImportService extends Service {
issues: [
{
$match: {
"sourcedMetadata.comicvine.volumeInformation": {
$gt: {},
},
"sourcedMetadata.comicvine.volumeInformation":
{
$gt: {},
},
},
},
{
@@ -599,16 +732,23 @@ export default class ImportService extends Service {
.drop()
.then(async (data) => {
console.info(data);
const coversFolderDeleteResult = fsExtra.emptyDirSync(
path.resolve(`${USERDATA_DIRECTORY}/covers`)
);
const expandedFolderDeleteResult = fsExtra.emptyDirSync(
path.resolve(`${USERDATA_DIRECTORY}/expanded`)
);
const eSIndicesDeleteResult = await ctx.broker.call(
"search.deleteElasticSearchIndices",
{}
);
const coversFolderDeleteResult =
fsExtra.emptyDirSync(
path.resolve(
`${USERDATA_DIRECTORY}/covers`
)
);
const expandedFolderDeleteResult =
fsExtra.emptyDirSync(
path.resolve(
`${USERDATA_DIRECTORY}/expanded`
)
);
const eSIndicesDeleteResult =
await ctx.broker.call(
"search.deleteElasticSearchIndices",
{}
);
return {
data,
coversFolderDeleteResult,

View File

@@ -75,9 +75,9 @@ export default class SettingsService extends Service {
) => {
try {
console.log(ctx.params);
const { query, pagination } = ctx.params;
const { query, pagination, type } = ctx.params;
let eSQuery = {};
switch (ctx.params.type) {
switch (type) {
case "all":
Object.assign(eSQuery, {
match_all: {},

View File

@@ -8,7 +8,7 @@ import {
} from "moleculer";
import { DbMixin } from "../mixins/db.mixin";
import Settings from "../models/settings.model";
import { isEmpty, pickBy, identity, map } from "lodash";
import { isEmpty, pickBy, identity, map, isNil } from "lodash";
const ObjectId = require("mongoose").Types.ObjectId;
export default class SettingsService extends Service {
@@ -28,12 +28,31 @@ export default class SettingsService extends Service {
rest: "GET /getAllSettings",
params: {},
async handler(ctx: Context<{ settingsKey: string }>) {
const settings = await Settings.find({});
if (isEmpty(settings)) {
const { settingsKey } = ctx.params;
// Initialize a projection object. Include everything by default.
let projection = settingsKey
? { _id: 0, [settingsKey]: 1 }
: {};
// Find the settings with the dynamic projection
const settings = await Settings.find({}, projection);
if (settings.length === 0) {
return {};
}
console.log(settings[0]);
return settings[0];
// If settingsKey is provided, return the specific part of the settings.
// Otherwise, return the entire settings document.
if (settingsKey) {
// Check if the specific key exists in the settings document.
// Since `settings` is an array, we access the first element.
// Then, we use the settingsKey to return only that part of the document.
return settings[0][settingsKey] || {};
} else {
// Return the entire settings document
return settings[0];
}
},
},
@@ -42,44 +61,106 @@ export default class SettingsService extends Service {
params: {},
async handler(
ctx: Context<{
settingsPayload: {
host: object;
airDCPPUserSettings: object;
hubs: [];
settingsPayload?: {
protocol: string;
hostname: string;
port: string;
username: string;
password: string;
_id?: string;
airDCPPUserSettings?: object;
hubs?: [];
};
settingsObjectId: string;
settingsObjectId?: string;
settingsKey: string;
}>
) {
console.log("varan bhat", ctx.params);
const { host, airDCPPUserSettings, hubs } =
ctx.params.settingsPayload;
let query = {
host,
airDCPPUserSettings,
hubs,
};
const keysToUpdate = pickBy(query, identity);
let updateQuery = {};
try {
let query = {};
const { settingsKey, settingsObjectId } =
ctx.params;
const {
hostname,
protocol,
port,
username,
password,
} = ctx.params.settingsPayload;
const host = {
hostname,
protocol,
port,
username,
password,
};
const undefinedPropsInHostname = Object.values(
host
).filter((value) => value === undefined);
map(Object.keys(keysToUpdate), (key) => {
updateQuery[`directConnect.client.${key}`] =
query[key];
});
const options = {
upsert: true,
new: true,
setDefaultsOnInsert: true,
};
const filter = {
_id: new ObjectId(ctx.params.settingsObjectId),
};
const result = Settings.findOneAndUpdate(
filter,
{ $set: updateQuery },
options
);
// Update, depending what key was passed in params
// 1. Construct the update query
switch (settingsKey) {
case "bittorrent":
console.log(
`Recieved settings for ${settingsKey}, building query...`
);
query = {
...(undefinedPropsInHostname.length ===
0 && {
$set: {
"bittorrent.client.host": host,
},
}),
};
break;
case "directConnect":
console.log(
`Recieved settings for ${settingsKey}, building query...`
);
const { hubs, airDCPPUserSettings } =
ctx.params.settingsPayload;
query = {
...(undefinedPropsInHostname.length ===
0 && {
$set: {
"directConnect.client.host":
host,
},
}),
...(!isNil(hubs) && {
$set: {
"directConnect.client.hubs":
hubs,
},
}),
};
console.log(JSON.stringify(query, null, 4));
break;
return result;
default:
return false;
}
// 2. Set up options, filters
const options = {
upsert: true,
setDefaultsOnInsert: true,
returnDocument: "after",
};
const filter = settingsObjectId
? { _id: settingsObjectId }
: {};
// 3. Execute the mongo query
const result = await Settings.findOneAndUpdate(
filter,
query,
options
);
return result;
} catch (err) {
return err;
}
},
},
deleteSettings: {

View File

@@ -26,89 +26,7 @@ export default class SocketService extends Service {
"/": {
events: {
call: {
// whitelist: ["math.*", "say.*", "accounts.*", "rooms.*", "io.*"],
},
action: async (data) => {
switch (data.type) {
case "RESUME_SESSION":
console.log("Attempting to resume session...");
try {
const sessionRecord = await Session.find({
sessionId: data.session.sessionId,
});
// 1. Check for sessionId's existence, and a match
if (
sessionRecord.length !== 0 &&
sessionRecord[0].sessionId ===
data.session.sessionId
) {
// 2. Find if the queue has active jobs
const jobs: JobType = await this.broker.call(
"jobqueue.getJobCountsByType",
{}
);
const { active } = jobs;
if (active > 0) {
// 3. Get job counts
const completedJobCount =
await pubClient.get(
"completedJobCount"
);
const failedJobCount = await pubClient.get(
"failedJobCount"
);
// 4. Send the counts to the active socket.io session
await this.broker.call("socket.broadcast", {
namespace: "/",
event: "action",
args: [
{
type: "RESTORE_JOB_COUNTS_AFTER_SESSION_RESTORATION",
completedJobCount,
failedJobCount,
queueStatus: "running",
},
],
});
}
}
} catch (err) {
throw new MoleculerError(
err,
500,
"SESSION_ID_NOT_FOUND",
{
data: data.session.sessionId,
}
);
}
break;
case "LS_SET_QUEUE_STATUS":
console.log(data);
await this.broker.call(
"jobqueue.toggle",
{ action: data.data.queueAction },
{}
);
break;
case "LS_SINGLE_IMPORT":
console.info("AirDC++ finished a download -> ");
console.log(data);
await this.broker.call(
"library.importDownloadedComic",
{ bundle: data },
{}
);
break;
// uncompress archive events
case "COMICBOOK_EXTRACTION_SUCCESS":
console.log(data);
return data;
}
whitelist: ["socket.*"],
},
},
},
@@ -119,7 +37,84 @@ export default class SocketService extends Service {
},
},
hooks: {},
actions: {},
actions: {
resumeSession: async (ctx: Context<{ sessionId: string }>) => {
const { sessionId } = ctx.params;
console.log("Attempting to resume session...");
try {
const sessionRecord = await Session.find({
sessionId,
});
// 1. Check for sessionId's existence, and a match
if (
sessionRecord.length !== 0 &&
sessionRecord[0].sessionId === sessionId
) {
// 2. Find if the queue has active, paused or waiting jobs
const jobs: JobType = await this.broker.call(
"jobqueue.getJobCountsByType",
{}
);
const { active, paused, waiting } = jobs;
if (active > 0 || paused > 0 || waiting > 0) {
// 3. Get job counts
const completedJobCount = await pubClient.get(
"completedJobCount"
);
const failedJobCount = await pubClient.get(
"failedJobCount"
);
// 4. Send the counts to the active socket.io session
await this.broker.call("socket.broadcast", {
namespace: "/",
event: "RESTORE_JOB_COUNTS_AFTER_SESSION_RESTORATION",
args: [
{
completedJobCount,
failedJobCount,
queueStatus: "running",
},
],
});
}
}
} catch (err) {
throw new MoleculerError(
err,
500,
"SESSION_ID_NOT_FOUND",
{
data: sessionId,
}
);
}
},
setQueueStatus: async (
ctx: Context<{
queueAction: string;
queueStatus: string;
}>
) => {
const { queueAction } = ctx.params;
await this.broker.call(
"jobqueue.toggle",
{ action: queueAction },
{}
);
},
importSingleIssue: async (ctx: Context<{}>) => {
console.info("AirDC++ finished a download -> ");
console.log(ctx.params);
// await this.broker.call(
// "library.importDownloadedComic",
// { bundle: data },
// {}
// );
},
},
methods: {},
async started() {
this.io.on("connection", async (socket) => {
@@ -146,10 +141,7 @@ export default class SocketService extends Service {
}
// 2. else, retrieve it from Mongo and "resume" the socket.io connection
else {
console.log(
`Found socketId ${socket.id}, attempting to resume socket.io connection...`
);
console.log(socket.handshake.query.sessionId);
console.log(`Found socketId ${socket.id}, no-op.`);
}
});
},

View File

@@ -0,0 +1,101 @@
"use strict";
import axios from "axios";
import {
Context,
Service,
ServiceBroker,
ServiceSchema,
Errors,
} from "moleculer";
import { DbMixin } from "../mixins/db.mixin";
import Comic from "../models/comic.model";
const ObjectId = require("mongoose").Types.ObjectId;
import { isNil, isUndefined } from "lodash";
import BullMqMixin from "moleculer-bullmq";
const { MoleculerError } = require("moleculer").Errors;
export default class ImageTransformation extends Service {
// @ts-ignore
public constructor(
public broker: ServiceBroker,
schema: ServiceSchema<{}> = { name: "imagetransformation" }
) {
super(broker);
this.parseServiceSchema({
name: "torrentjobs",
mixins: [DbMixin("comics", Comic), BullMqMixin],
settings: {
bullmq: {
client: process.env.REDIS_URI,
},
},
hooks: {},
actions: {
getTorrentData: {
queue: true,
rest: "GET /getTorrentData",
handler: async (ctx: Context<{ trigger: string }>) => {
const { trigger } = ctx.params;
console.log(`Recieved ${trigger} as the trigger...`);
const jobOptions = {
jobId: "retrieveTorrentData",
name: "bossy",
repeat: {
every: 10000, // Repeat every 10000 ms
limit: 100, // Limit to 100 repeats
},
};
const job = await this.localQueue(
ctx,
"fetchTorrentData",
ctx.params,
jobOptions
);
return job;
},
},
fetchTorrentData: {
rest: "GET /fetchTorrentData",
handler: async (
ctx: Context<{
birdName: String;
}>
) => {
const repeatableJob = await this.$resolve(
"torrentjobs"
).getRepeatableJobs();
console.info(repeatableJob);
console.info(
`Scheduled job for fetching torrent data fired.`
);
// 1. query mongo for infohashes
const infoHashes = await this.broker.call(
"library.getInfoHashes",
{}
);
// 2. query qbittorrent to see if they exist
const torrents: any = await this.broker.call(
"qbittorrent.getTorrentRealTimeStats",
{ infoHashes }
);
// 4. Emit the LS_COVER_EXTRACTION_FAILED event with the necessary details
await this.broker.call("socket.broadcast", {
namespace: "/",
event: "AS_TORRENT_DATA",
args: [
{
torrents,
},
],
});
// 3. If they do, don't do anything
// 4. If they don't purge them from mongo
},
},
},
methods: {},
});
}
}

View File

@@ -74,7 +74,7 @@ const errors = [];
*/
export const extractComicInfoXMLFromRar = async (
filePath: string,
mimeType: string,
mimeType: string
): Promise<any> => {
try {
// Create the target directory
@@ -210,7 +210,7 @@ export const extractComicInfoXMLFromRar = async (
export const extractComicInfoXMLFromZip = async (
filePath: string,
mimeType: string,
mimeType: string
): Promise<any> => {
try {
// Create the target directory
@@ -357,11 +357,17 @@ export const extractFromArchive = async (filePath: string) => {
switch (mimeType) {
case "application/x-7z-compressed; charset=binary":
case "application/zip; charset=binary":
const cbzResult = await extractComicInfoXMLFromZip(filePath, mimeType);
const cbzResult = await extractComicInfoXMLFromZip(
filePath,
mimeType
);
return Object.assign({}, ...cbzResult);
case "application/x-rar; charset=binary":
const cbrResult = await extractComicInfoXMLFromRar(filePath, mimeType);
const cbrResult = await extractComicInfoXMLFromRar(
filePath,
mimeType
);
return Object.assign({}, ...cbrResult);
default:
@@ -369,9 +375,8 @@ export const extractFromArchive = async (filePath: string) => {
"Error inferring filetype for comicinfo.xml extraction."
);
throw new MoleculerError({}, 500, "FILETYPE_INFERENCE_ERROR", {
data: { message: "Cannot infer filetype."},
data: { message: "Cannot infer filetype." },
});
}
};