54 Commits

Author SHA1 Message Date
94cb95f4bf 📚 Changes to CV model 2024-04-14 00:25:41 -04:00
c6651cdd91 Merge pull request #9 from rishighan/qbittorrent-settings
🏗️ Added torrent attrs to comic model
2024-03-30 21:40:02 -04:00
b35e2140b5 🧲 Created a dedicated queue for torrent ops 2024-03-29 19:36:16 -04:00
f053dcb789 🧲 Massaging data to be sent to UI 2024-03-27 22:22:40 -05:00
aea7a24f76 🧲 Added a job for deleted torrents clean-up 2024-03-24 17:31:31 -04:00
8f0c2f4302 ⚙️ getAllSettings is parameterized 2024-03-12 16:39:44 -05:00
7dbe2b2701 🏗️ Added torrent attrs to comic model 2024-03-03 12:22:40 -05:00
5b9ef9fbbb Merge pull request #8 from rishighan/qbittorrent-settings
Miscellaneous Settings
2024-01-08 16:42:54 -05:00
4cdb11fcbd Cleaned the console.logs 2024-01-08 16:40:12 -05:00
78f7c1b595 🤐 Added uncompression event 2024-01-07 22:13:02 -05:00
bbd2906ebf 🏗️ Added some archive-related keys to Comic model 2024-01-06 11:17:40 -05:00
1861c2eeed Merge pull request #7 from rishighan/qbittorrent-settings
🌊 Modified settings model schema
2023-12-30 00:52:17 -05:00
f3965437b5 🏗 Added a job for full archive extraction 2023-12-30 00:50:06 -05:00
78e0e9f8ce 🏗️ Refactored the searchIssue method 2023-12-28 22:52:33 -05:00
c926758db6 🏗️ Added a downloads array to bittorent schema 2023-12-20 00:08:38 -05:00
b2b35aedc0 🏗️ Fixed a mongo update query 2023-11-27 02:14:16 -05:00
f35e3ccbe0 Removed useless code 2023-11-15 16:02:07 -06:00
7b0c0a7420 Added the importSingleIssue action 2023-11-15 15:59:27 -06:00
c2bbbf311d 🏗️ Fixed setQueueStatus 2023-11-14 13:24:49 -06:00
b8ca03220f 🏗 Implemented setQueueStatus 2023-11-13 22:01:01 -05:00
b87b0c875d 🏗️ Fleshed out resumeSession event 2023-11-13 21:18:19 -05:00
11fbaf10db 🏗 Wired up the events correctly 2023-11-13 16:41:58 -05:00
1229feb69c 🏗️ Refactor for zustand and tanstack react query support 2023-11-09 10:22:45 -06:00
3efdc7c2e2 ⚙️ Refactored saveSettings endpoint 2023-09-15 15:49:13 -04:00
1fff931941 🌊 Modified settings model schema 2023-09-13 22:09:25 -05:00
f4e2db5a5f 📦 Instruction for paths for unrar and p7zip 2023-09-01 09:44:02 -05:00
1d7561279b 📕 Updated local dev instructions in README 2023-09-01 09:22:02 -05:00
9e47ae0436 Merge pull request #6 from rishighan/migration-to-bullmq
🐂 Migration to moleculer-bullMQ
2023-08-30 13:50:47 -04:00
b1b1cb527b 🔧 Moved moleculer-bullmq to dependencies 2023-08-30 13:16:58 -04:00
cfa09691e8 🔧 Fixed an errant condition
This error was because I checked for active AND prioritized jobs in BullMQ, when none existed, because everything was active, and the socket.io event never fired, causing the browser to be in a bad state and never "resuming" an import even when one was in progress.
2023-08-30 12:21:43 -04:00
356b093db9 🔧 Fixed a dep 2023-08-30 00:08:05 -04:00
b4b83e5c75 🔧 Reworked the jobResults aggregation 2023-08-29 23:58:06 -04:00
c718456adc 🔧 jobResult aggregate query first draft 2023-08-28 23:56:44 -04:00
76d4e6b10f 🔢 Persisting the sessionId in the JobResult 2023-08-27 20:25:04 -04:00
bde548695c 🔧 Refactored the getJobResultStatistics method 2023-08-24 23:45:51 -04:00
fd4dd1d5c4 🥭 Aggregation for jobResult 2023-08-24 23:18:27 -04:00
5540bb16ec ⏱️ Added a timestamp to job results schema 2023-08-24 09:06:38 -05:00
6ee609f2b9 🔧 Refactor and added getJobCounts 2023-08-23 11:47:47 -05:00
8b584080e2 🐂 Queue controls 2023-08-22 22:07:51 -05:00
01975079e3 🐂 Queue drain event 2023-08-22 05:20:24 -04:00
fe9fbe9c3a 🔢 Getting job counts 2023-08-22 00:04:47 -04:00
df6652cce9 🐂 Queue pause/resume functionality 2023-08-21 17:55:08 -04:00
e5fc879b2d 🐂 Added some job counters 2023-08-18 11:39:18 -04:00
625447e7f1 🧊 Added shared Redis config 2023-08-14 22:15:19 -04:00
fdcf1f7d68 🧹 Linted code 2023-08-14 19:45:40 -04:00
4003f666cf 🔧 Tooling for resumable socket.io sessions 2023-07-27 11:09:26 -07:00
7b855f8cf1 🐂 BullMQ support code 2023-07-13 08:02:12 -07:00
007ce4b66f 🏗️ Applying the refactor patc 2023-07-05 23:14:46 -04:00
cb84e4893f 🐂 Migration to moleculer-bullMQ 2023-06-29 14:16:58 -04:00
795ac561c7 🔼 Updated deps 2023-04-19 09:14:22 -04:00
175e01dc2d Added elasticsearch dep 2023-04-09 15:53:36 -04:00
66e0a26c68 Merge pull request #4 from elgohr-update/master 2023-04-04 17:46:31 -04:00
Lars Gohr
959d248273 Updated elgohr/Publish-Docker-Github-Action to a supported version (v5) 2023-03-30 06:52:23 +02:00
745ec5d774 Merge pull request #3 from rishighan/mimetype-check
 MIMEtype check for comic book archives
2023-03-23 23:59:30 -04:00
20 changed files with 3625 additions and 2943 deletions

View File

@@ -12,7 +12,7 @@ jobs:
steps:
- uses: actions/checkout@master
- name: Publish to Registry
uses: elgohr/Publish-Docker-Github-Action@master
uses: elgohr/Publish-Docker-Github-Action@v5
with:
name: frishi/threetwo-core-service
username: ${{ secrets.DOCKER_USERNAME }}

View File

@@ -3,21 +3,35 @@
This [moleculer-based](https://github.com/moleculerjs/moleculer-web) microservice houses endpoints for the following functions:
1. Local import of a comic library into mongo (currently supports `cbr` and `cbz` files)
2. Metadata extraction from file, `comicinfo.xml`
2. Metadata extraction from file, `comicinfo.xml`
3. Mongo comic object orchestration
4. CRUD operations on `Comic` model
5. Helper utils to help with image metadata extraction, file operations and more.
## Local Development
1. ~~You need `calibre` in your local path.
On `macOS` you can `brew install calibre` and make sure that `ebook-meta` is present on the path~~ Calibre is no longer required as a dependency. Ignore this step.
2. You need `mongo` for the data store. on `macOS` you can use [these instructions](https://docs.mongodb.com/manual/tutorial/install-mongodb-on-os-x/) to install it
1. You need the following dependencies installed: `mongo`, `elasticsearch` and `redis`
2. You also need binaries for `unrar` and `p7zip`
3. Clone this repo
4. Run `npm i`
5. Assuming you installed mongo correctly, run `MONGO_URI=mongodb://localhost:27017/threetwo npm run dev` to start the service
5. Assuming you installed the dependencies correctly, run:
```
COMICS_DIRECTORY=<PATH_TO_COMICS_DIRECTORY> \
USERDATA_DIRECTORY=<PATH_TO_USERDATA_DIRECTORY> \
REDIS_URI=redis://<REDIS_HOST:REDIS_PORT> \
ELASTICSEARCH_URI=<ELASTICSEARCH_HOST:ELASTICSEARCH_PORT> \
MONGO_URI=mongodb://<MONGO_HOST:MONGO_PORT>/threetwo \
UNRAR_BIN_PATH=<UNRAR_BIN_PATH> \
SEVENZ_BINARY_PATH=<SEVENZ_BINARY_PATH> \
npm run dev
```
to start the service
6. You should see the service spin up and a list of all the endpoints in the terminal
7. The service can be accessed through `http://localhost:3000/api/import/*`
7. The service can be accessed through `http://localhost:3000/api/<serviceName>/*`
## Docker Instructions
1. Build the image using `docker build . -t frishi/threetwo-import-service`. Give it a hot minute.

10
config/redis.config.ts Normal file
View File

@@ -0,0 +1,10 @@
import { createClient } from "redis";
const redisURL = new URL(process.env.REDIS_URI);
const pubClient = createClient({ url: `redis://${redisURL.hostname}:6379` });
(async () => {
await pubClient.connect();
})();
const subClient = pubClient.duplicate();
export { subClient, pubClient };

View File

@@ -1,8 +1,7 @@
const paginate = require("mongoose-paginate-v2");
const { Client } = require("@elastic/elasticsearch");
import ComicVineMetadataSchema from "./comicvine.metadata.model";
import { mongoosastic } from "mongoosastic-ts";
const mongoose = require("mongoose")
const mongoose = require("mongoose");
import {
MongoosasticDocument,
MongoosasticModel,
@@ -28,6 +27,10 @@ const RawFileDetailsSchema = mongoose.Schema({
mimeType: String,
containedIn: String,
pageCount: Number,
archive: {
uncompressed: Boolean,
expandedPath: String,
},
cover: {
filePath: String,
stats: Object,
@@ -52,6 +55,29 @@ const DirectConnectBundleSchema = mongoose.Schema({
size: String,
type: {},
});
const wantedSchema = new mongoose.Schema({
source: { type: String, default: null },
markEntireVolumeWanted: Boolean,
issues: {
type: [
{
id: Number,
url: String,
image: { type: Array, default: [] },
},
],
default: null, // Set default to null for issues
},
volume: {
type: {
id: Number,
url: String,
image: { type: Array, default: [] },
name: String,
},
default: null, // Set default to null for volume
},
});
const ComicSchema = mongoose.Schema(
{
@@ -68,7 +94,7 @@ const ComicSchema = mongoose.Schema(
sourcedMetadata: {
comicInfo: { type: mongoose.Schema.Types.Mixed, default: {} },
comicvine: {
type: ComicVineMetadataSchema,
type: Object,
es_indexed: true,
default: {},
},
@@ -98,11 +124,9 @@ const ComicSchema = mongoose.Schema(
subtitle: { type: String, es_indexed: true },
},
},
wanted: wantedSchema,
acquisition: {
source: {
wanted: Boolean,
name: String,
},
release: {},
directconnect: {
downloads: {
@@ -111,12 +135,13 @@ const ComicSchema = mongoose.Schema(
default: [],
},
},
torrent: {
sourceApplication: String,
magnet: String,
tracker: String,
status: String,
},
torrent: [
{
infoHash: String,
name: String,
announce: [String],
},
],
usenet: {
sourceApplication: String,
},

View File

@@ -1,95 +0,0 @@
const mongoose = require("mongoose");
const Things = mongoose.Schema({
_id: false,
api_detail_url: String,
id: Number,
name: String,
site_detail_url: String,
count: String,
});
const Issue = mongoose.Schema({
_id: false,
api_detail_url: String,
id: Number,
name: String,
issue_number: String,
});
const VolumeInformation = mongoose.Schema({
_id: false,
aliases: [String],
api_detail_url: String,
characters: [Things],
concepts: [Things],
count_of_issues: String,
date_added: String,
date_last_updated: String,
deck: String,
description: String,
first_issue: Issue,
id: Number,
image: {
icon_url: String,
medium_url: String,
screen_url: String,
screen_large_url: String,
small_url: String,
super_url: String,
thumb_url: String,
tiny_url: String,
original_url: String,
image_tags: String,
},
issues: [
{
api_detail_url: String,
id: Number,
name: String,
issue_number: String,
site_detail_url: String,
},
],
last_issue: Issue,
locations: [Things],
name: String,
objects: [Things],
people: [Things],
publisher: {
api_detail_url: String,
id: Number,
name: String,
},
site_detail_url: String,
start_year: String,
});
const ComicVineMetadataSchema = mongoose.Schema({
_id: false,
aliases: [String],
api_detail_url: String,
has_staff_review: { type: mongoose.Schema.Types.Mixed },
cover_date: Date,
date_added: String,
date_last_updated: String,
deck: String,
description: String,
image: {
icon_url: String,
medium_url: String,
screen_url: String,
screen_large_url: String,
small_url: String,
super_url: String,
thumb_url: String,
tiny_url: String,
original_url: String,
image_tags: String,
},
id: Number,
name: String,
resource_type: String,
volumeInformation: VolumeInformation,
});
export default ComicVineMetadataSchema;

12
models/jobresult.model.ts Normal file
View File

@@ -0,0 +1,12 @@
const mongoose = require("mongoose");
const JobResultScehma = mongoose.Schema({
id: Number,
status: String,
sessionId: String,
failedReason: Object,
timestamp: Date,
});
const JobResult = mongoose.model("JobResult", JobResultScehma);
export default JobResult;

9
models/session.model.ts Normal file
View File

@@ -0,0 +1,9 @@
const mongoose = require("mongoose");
const SessionScehma = mongoose.Schema({
sessionId: String,
socketId: String,
});
const Session = mongoose.model("Session", SessionScehma);
export default Session;

View File

@@ -1,21 +1,34 @@
const mongoose = require("mongoose");
const paginate = require("mongoose-paginate-v2");
const HostSchema = mongoose.Schema({
_id: false,
username: String,
password: String,
hostname: String,
port: String,
protocol: String,
});
const SettingsScehma = mongoose.Schema({
directConnect: {
client: {
host: {
username: String,
password: String,
hostname: String,
port: String,
protocol: String,
},
host: HostSchema,
airDCPPUserSettings: Object,
hubs: Array,
},
},
bittorrent: {
client: {
name: String,
host: HostSchema,
},
},
prowlarr: {
client: {
host: HostSchema,
apiKey: String,
},
},
});
const Settings = mongoose.model("Settings", SettingsScehma);

4846
package-lock.json generated

File diff suppressed because it is too large Load Diff

View File

@@ -20,7 +20,6 @@
],
"author": "Rishi Ghan",
"devDependencies": {
"@elastic/elasticsearch": "^8.6.0",
"@types/lodash": "^4.14.168",
"@typescript-eslint/eslint-plugin": "^5.56.0",
"@typescript-eslint/parser": "^5.56.0",
@@ -35,15 +34,16 @@
"npm": "^8.4.1",
"ts-jest": "^29.0.5",
"ts-node": "^10.9.1",
"typescript": "^5.0.2"
"typescript": "^5.0.2",
"uuid": "^9.0.0"
},
"dependencies": {
"@npcz/magic": "^1.3.14",
"redis": "^4.6.5",
"@socket.io/redis-adapter": "^8.1.0",
"@bluelovers/fast-glob": "https://github.com/rishighan/fast-glob-v2-api.git",
"@elastic/elasticsearch": "^8.6.0",
"@jorgeferrero/stream-to-buffer": "^2.0.6",
"@npcz/magic": "^1.3.14",
"@root/walk": "^1.1.0",
"@socket.io/redis-adapter": "^8.1.0",
"@types/jest": "^27.4.1",
"@types/mkdirp": "^1.0.0",
"@types/node": "^13.9.8",
@@ -65,8 +65,7 @@
"leven": "^3.1.0",
"lodash": "^4.17.21",
"mkdirp": "^0.5.5",
"moleculer": "^0.14.29",
"moleculer-bull": "github:rishighan/moleculer-bull#1.0.0",
"moleculer-bullmq": "^3.0.0",
"moleculer-db": "^0.8.23",
"moleculer-db-adapter-mongoose": "^0.9.2",
"moleculer-io": "^2.2.0",
@@ -77,6 +76,7 @@
"nats": "^1.3.2",
"opds-extra": "^3.0.9",
"p7zip-threetwo": "^1.0.4",
"redis": "^4.6.5",
"sanitize-filename-ts": "^1.0.2",
"sharp": "^0.30.4",
"threetwo-ui-typings": "^1.0.14",

View File

@@ -7,6 +7,8 @@ import {
ServiceSchema,
Errors,
} from "moleculer";
import { DbMixin } from "../mixins/db.mixin";
import Comic from "../models/comic.model";
import path from "path";
import {
analyze,
@@ -22,16 +24,13 @@ export default class ImageTransformation extends Service {
super(broker);
this.parseServiceSchema({
name: "imagetransformation",
mixins: [],
mixins: [DbMixin("comics", Comic)],
settings: {
// Available fields in the responses
fields: ["_id", "name", "quantity", "price"],
fields: ["_id"],
// Validator for the `create` & `insert` actions.
entityValidator: {
name: "string|min:3",
price: "number|positive",
},
entityValidator: {},
},
hooks: {},
actions: {

View File

@@ -1,291 +0,0 @@
/*
* MIT License
*
* Copyright (c) 2022 Rishi Ghan
*
The MIT License (MIT)
Copyright (c) 2015 Rishi Ghan
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE.
*/
/*
* Revision History:
* Initial: 2022/01/28 Rishi Ghan
*/
"use strict";
import { refineQuery } from "filename-parser";
import { isNil, isUndefined } from "lodash";
import { Context, Service, ServiceBroker, ServiceSchema } from "moleculer";
import BullMQMixin, { SandboxedJob } from "moleculer-bull";
import { DbMixin } from "../mixins/db.mixin";
import Comic from "../models/comic.model";
import {
extractFromArchive,
uncompressEntireArchive,
} from "../utils/uncompression.utils";
const REDIS_URI = process.env.REDIS_URI || `redis://localhost:6379`;
const EventEmitter = require("events");
EventEmitter.defaultMaxListeners = 20;
console.log(`REDIS -> ${REDIS_URI}`);
export default class QueueService extends Service {
public constructor(
public broker: ServiceBroker,
schema: ServiceSchema<{}> = { name: "importqueue" }
) {
super(broker);
this.parseServiceSchema({
name: "importqueue",
mixins: [BullMQMixin(REDIS_URI), DbMixin("comics", Comic)],
settings: {
bullmq: {
maxStalledCount: 0,
},
},
hooks: {},
queues: {
"process.import": {
concurrency: 10,
async process(job: SandboxedJob) {
console.info("New job received!", job.data);
console.info(`Processing queue...`);
// extract the cover
const result = await extractFromArchive(
job.data.fileObject.filePath
);
const {
name,
filePath,
fileSize,
extension,
mimeType,
cover,
containedIn,
comicInfoJSON,
} = result;
// Infer any issue-related metadata from the filename
const { inferredIssueDetails } = refineQuery(
result.name
);
console.log(
"Issue metadata inferred: ",
JSON.stringify(inferredIssueDetails, null, 2)
);
// Add the bundleId, if present to the payload
let bundleId = null;
if (!isNil(job.data.bundleId)) {
bundleId = job.data.bundleId;
}
// Orchestrate the payload
const payload = {
importStatus: {
isImported: true,
tagged: false,
matchedResult: {
score: "0",
},
},
rawFileDetails: {
name,
filePath,
fileSize,
extension,
mimeType,
containedIn,
cover,
},
inferredMetadata: {
issue: inferredIssueDetails,
},
sourcedMetadata: {
// except for ComicInfo.xml, everything else should be copied over from the
// parent comic
comicInfo: comicInfoJSON,
},
// since we already have at least 1 copy
// mark it as not wanted by default
"acquisition.source.wanted": false,
// clear out the downloads array
// "acquisition.directconnect.downloads": [],
// mark the metadata source
"acquisition.source.name": job.data.sourcedFrom,
};
// Add the sourcedMetadata, if present
if (!isNil(job.data.sourcedMetadata) && !isUndefined(job.data.sourcedMetadata.comicvine)) {
Object.assign(
payload.sourcedMetadata,
job.data.sourcedMetadata
);
}
// write to mongo
const importResult = await this.broker.call(
"library.rawImportToDB",
{
importType: job.data.importType,
bundleId,
payload,
}
);
return {
data: {
importResult,
},
id: job.id,
worker: process.pid,
};
},
},
"process.uncompressAndResize": {
concurrency: 2,
async process(job: SandboxedJob) {
console.log(`Initiating uncompression job...`);
return await uncompressEntireArchive(
job.data.filePath,
job.data.options
);
},
},
},
actions: {
uncompressResize: {
rest: "POST /uncompressResize",
params: {},
async handler(
ctx: Context<{
data: { filePath: string; options: any };
}>
) {
return await this.createJob(
"process.uncompressAndResize",
ctx.params
);
},
},
processImport: {
rest: "POST /processImport",
params: {},
async handler(
ctx: Context<{
fileObject: object;
importType: string;
bundleId: number;
sourcedFrom?: string;
sourcedMetadata: object;
}>
) {
return await this.createJob("process.import", {
fileObject: ctx.params.fileObject,
importType: ctx.params.importType,
bundleId: ctx.params.bundleId,
sourcedFrom: ctx.params.sourcedFrom,
sourcedMetadata: ctx.params.sourcedMetadata,
});
},
},
toggleImportQueue: {
rest: "POST /pauseImportQueue",
params: {},
handler: async (ctx: Context<{ action: string }>) => {
switch (ctx.params.action) {
case "pause":
const foo = await this.getQueue(
"process.import"
).pause();
console.log("paused", foo);
return foo;
case "resume":
const soo = await this.getQueue(
"process.import"
).resume();
console.log("resumed", soo);
return soo;
default:
console.log("Unrecognized queue action.");
}
},
},
},
methods: {},
async started(): Promise<any> {
await this.getQueue("process.import").on(
"failed",
async (job, error) => {
console.error(
`An error occured in 'process.import' queue on job id '${job.id}': ${error.message}`
);
console.error(job.data);
}
);
await this.getQueue("process.import").on(
"completed",
async (job, res) => {
await this.broker.call("socket.broadcast", {
namespace: "/", //optional
event: "action",
args: [{ type: "LS_COVER_EXTRACTED", result: res }], //optional
});
console.info(
`Import Job with the id '${job.id}' completed.`
);
}
);
await this.getQueue("process.import").on(
"stalled",
async (job) => {
console.warn(`Import job '${job.id} stalled!`);
console.log(`${JSON.stringify(job, null, 2)}`);
console.log(`is stalled.`);
}
);
await this.getQueue("process.uncompressAndResize").on(
"completed",
async (job, res) => {
await this.broker.call("socket.broadcast", {
namespace: "/",
event: "action",
args: [
{
type: "COMICBOOK_EXTRACTION_SUCCESS",
result: {
files: res,
purpose: job.data.options.purpose,
},
},
],
});
console.info(`Uncompression Job ${job.id} completed.`);
}
);
},
});
}
}

View File

@@ -0,0 +1,444 @@
import { Context, Service, ServiceBroker } from "moleculer";
import JobResult from "../models/jobresult.model";
import { refineQuery } from "filename-parser";
import BullMqMixin from "moleculer-bullmq";
import { DbMixin } from "../mixins/db.mixin";
import Comic from "../models/comic.model";
const ObjectId = require("mongoose").Types.ObjectId;
import {
extractFromArchive,
uncompressEntireArchive,
} from "../utils/uncompression.utils";
import { isNil, isUndefined } from "lodash";
import { pubClient } from "../config/redis.config";
import path from "path";
const { MoleculerError } = require("moleculer").Errors;
console.log(process.env.REDIS_URI);
export default class JobQueueService extends Service {
public constructor(public broker: ServiceBroker) {
super(broker);
this.parseServiceSchema({
name: "jobqueue",
hooks: {},
mixins: [DbMixin("comics", Comic), BullMqMixin],
settings: {
bullmq: {
client: process.env.REDIS_URI,
},
},
actions: {
getJobCountsByType: {
rest: "GET /getJobCountsByType",
handler: async (ctx: Context<{}>) => {
console.log(ctx.params);
return await this.$resolve("jobqueue").getJobCounts();
},
},
toggle: {
rest: "GET /toggle",
handler: async (ctx: Context<{ action: String }>) => {
switch (ctx.params.action) {
case "pause":
this.pause();
break;
case "resume":
this.resume();
break;
default:
console.log(`Unknown queue action.`);
}
},
},
enqueue: {
queue: true,
rest: "GET /enqueue",
handler: async (
ctx: Context<{ action: string; description: string }>
) => {
const { action, description } = ctx.params;
// Enqueue the job
const job = await this.localQueue(
ctx,
action,
ctx.params,
{
priority: 10,
}
);
console.log(`Job ${job.id} enqueued`);
console.log(`${description}`);
return job.id;
},
},
// Comic Book Import Job Queue
"enqueue.async": {
handler: async (
ctx: Context<{
sessionId: String;
}>
) => {
try {
console.log(
`Recieved Job ID ${ctx.locals.job.id}, processing...`
);
// 1. De-structure the job params
const { fileObject } = ctx.locals.job.data.params;
// 2. Extract metadata from the archive
const result = await extractFromArchive(
fileObject.filePath
);
const {
name,
filePath,
fileSize,
extension,
mimeType,
cover,
containedIn,
comicInfoJSON,
} = result;
// 3a. Infer any issue-related metadata from the filename
const { inferredIssueDetails } = refineQuery(
result.name
);
console.log(
"Issue metadata inferred: ",
JSON.stringify(inferredIssueDetails, null, 2)
);
// 3b. Orchestrate the payload
const payload = {
importStatus: {
isImported: true,
tagged: false,
matchedResult: {
score: "0",
},
},
rawFileDetails: {
name,
filePath,
fileSize,
extension,
mimeType,
containedIn,
cover,
},
inferredMetadata: {
issue: inferredIssueDetails,
},
sourcedMetadata: {
// except for ComicInfo.xml, everything else should be copied over from the
// parent comic
comicInfo: comicInfoJSON,
},
// since we already have at least 1 copy
// mark it as not wanted by default
"acquisition.source.wanted": false,
// clear out the downloads array
// "acquisition.directconnect.downloads": [],
// mark the metadata source
"acquisition.source.name":
ctx.locals.job.data.params.sourcedFrom,
};
// 3c. Add the bundleId, if present to the payload
let bundleId = null;
if (!isNil(ctx.locals.job.data.params.bundleId)) {
bundleId = ctx.locals.job.data.params.bundleId;
}
// 3d. Add the sourcedMetadata, if present
if (
!isNil(
ctx.locals.job.data.params.sourcedMetadata
) &&
!isUndefined(
ctx.locals.job.data.params.sourcedMetadata
.comicvine
)
) {
Object.assign(
payload.sourcedMetadata,
ctx.locals.job.data.params.sourcedMetadata
);
}
// 4. write to mongo
const importResult = await this.broker.call(
"library.rawImportToDB",
{
importType:
ctx.locals.job.data.params.importType,
bundleId,
payload,
}
);
return {
data: {
importResult,
},
id: ctx.locals.job.id,
sessionId: ctx.params.sessionId,
};
} catch (error) {
console.error(
`An error occurred processing Job ID ${ctx.locals.job.id}`
);
throw new MoleculerError(
error,
500,
"IMPORT_JOB_ERROR",
{
data: ctx.params.sessionId,
}
);
}
},
},
getJobResultStatistics: {
rest: "GET /getJobResultStatistics",
handler: async (ctx: Context<{}>) => {
return await JobResult.aggregate([
{
$group: {
_id: {
sessionId: "$sessionId",
status: "$status",
},
earliestTimestamp: {
$min: "$timestamp",
},
count: {
$sum: 1,
},
},
},
{
$group: {
_id: "$_id.sessionId",
statuses: {
$push: {
status: "$_id.status",
earliestTimestamp:
"$earliestTimestamp",
count: "$count",
},
},
},
},
{
$project: {
_id: 0,
sessionId: "$_id",
completedJobs: {
$reduce: {
input: "$statuses",
initialValue: 0,
in: {
$sum: [
"$$value",
{
$cond: [
{
$eq: [
"$$this.status",
"completed",
],
},
"$$this.count",
0,
],
},
],
},
},
},
failedJobs: {
$reduce: {
input: "$statuses",
initialValue: 0,
in: {
$sum: [
"$$value",
{
$cond: [
{
$eq: [
"$$this.status",
"failed",
],
},
"$$this.count",
0,
],
},
],
},
},
},
earliestTimestamp: {
$min: "$statuses.earliestTimestamp",
},
},
},
]);
},
},
"uncompressFullArchive.async": {
rest: "POST /uncompressFullArchive",
handler: async (
ctx: Context<{
filePath: string;
comicObjectId: string;
options: any;
}>
) => {
console.log(
`Recieved Job ID ${JSON.stringify(
ctx.locals
)}, processing...`
);
const { filePath, options, comicObjectId } = ctx.params;
const comicId = new ObjectId(comicObjectId);
// 2. Extract metadata from the archive
const result: string[] = await uncompressEntireArchive(
filePath,
options
);
if (Array.isArray(result) && result.length !== 0) {
// Get the containing directory of the uncompressed archive
const directoryPath = path.dirname(result[0]);
// Add to mongo object
await Comic.findByIdAndUpdate(
comicId,
{
$set: {
"rawFileDetails.archive": {
uncompressed: true,
expandedPath: directoryPath,
},
},
},
{ new: true, safe: true, upsert: true }
);
return result;
}
},
},
},
events: {
async "uncompressFullArchive.async.active"(
ctx: Context<{ id: number }>
) {
console.log(
`Uncompression Job ID ${ctx.params.id} is set to active.`
);
},
async "uncompressFullArchive.async.completed"(
ctx: Context<{ id: number }>
) {
console.log(
`Uncompression Job ID ${ctx.params.id} completed.`
);
const job = await this.job(ctx.params.id);
await this.broker.call("socket.broadcast", {
namespace: "/",
event: "LS_UNCOMPRESSION_JOB_COMPLETE",
args: [
{
uncompressedArchive: job.returnvalue,
},
],
});
return job.returnvalue;
},
// use the `${QUEUE_NAME}.QUEUE_EVENT` scheme
async "enqueue.async.active"(ctx: Context<{ id: Number }>) {
console.log(`Job ID ${ctx.params.id} is set to active.`);
},
async drained(ctx) {
console.log("Queue drained.");
await this.broker.call("socket.broadcast", {
namespace: "/",
event: "LS_IMPORT_QUEUE_DRAINED",
args: [
{
message: "drained",
},
],
});
},
async "enqueue.async.completed"(ctx: Context<{ id: Number }>) {
// 1. Fetch the job result using the job Id
const job = await this.job(ctx.params.id);
// 2. Increment the completed job counter
await pubClient.incr("completedJobCount");
// 3. Fetch the completed job count for the final payload to be sent to the client
const completedJobCount = await pubClient.get(
"completedJobCount"
);
// 4. Emit the LS_COVER_EXTRACTED event with the necessary details
await this.broker.call("socket.broadcast", {
namespace: "/",
event: "LS_COVER_EXTRACTED",
args: [
{
completedJobCount,
importResult: job.returnvalue.data.importResult,
},
],
});
// 5. Persist the job results in mongo for posterity
await JobResult.create({
id: ctx.params.id,
status: "completed",
timestamp: job.timestamp,
sessionId: job.returnvalue.sessionId,
failedReason: {},
});
console.log(`Job ID ${ctx.params.id} completed.`);
},
async "enqueue.async.failed"(ctx) {
const job = await this.job(ctx.params.id);
await pubClient.incr("failedJobCount");
const failedJobCount = await pubClient.get(
"failedJobCount"
);
await JobResult.create({
id: ctx.params.id,
status: "failed",
failedReason: job.failedReason,
sessionId: job.data.params.sessionId,
timestamp: job.timestamp,
});
// 4. Emit the LS_COVER_EXTRACTION_FAILED event with the necessary details
await this.broker.call("socket.broadcast", {
namespace: "/",
event: "LS_COVER_EXTRACTION_FAILED",
args: [
{
failedJobCount,
importResult: job,
},
],
});
},
},
methods: {},
});
}
}

View File

@@ -51,6 +51,7 @@ import {
IExtractionOptions,
} from "threetwo-ui-typings";
const ObjectId = require("mongoose").Types.ObjectId;
import { pubClient } from "../config/redis.config";
import fsExtra from "fs-extra";
const through2 = require("through2");
import klaw from "klaw";
@@ -66,16 +67,32 @@ export default class ImportService extends Service {
mixins: [DbMixin("comics", Comic)],
hooks: {},
actions: {
getHealthInformation: {
rest: "GET /getHealthInformation",
params: {},
handler: async (ctx: Context<{}>) => {
try {
return await ctx.broker.call("$node.services");
} catch (error) {
return new Error("Service is down.");
}
},
},
walkFolders: {
rest: "POST /walkFolders",
params: {
basePathToWalk: "string",
},
async handler(ctx: Context<{ basePathToWalk: string }>) {
params: {},
async handler(
ctx: Context<{
basePathToWalk: string;
extensions: string[];
}>
) {
console.log(ctx.params);
return await walkFolder(ctx.params.basePathToWalk, [
".cbz",
".cbr",
".cb7",
...ctx.params.extensions,
]);
},
},
@@ -90,11 +107,18 @@ export default class ImportService extends Service {
rest: "POST /uncompressFullArchive",
params: {},
handler: async (
ctx: Context<{ filePath: string; options: any }>
ctx: Context<{
filePath: string;
comicObjectId: string;
options: any;
}>
) => {
await broker.call("importqueue.uncompressResize", {
this.broker.call("jobqueue.enqueue", {
filePath: ctx.params.filePath,
comicObjectId: ctx.params.comicObjectId,
options: ctx.params.options,
action: "uncompressFullArchive.async",
description: `Job for uncompressing archive at ${ctx.params.filePath}`,
});
},
},
@@ -139,64 +163,81 @@ export default class ImportService extends Service {
},
newImport: {
rest: "POST /newImport",
params: {},
// params: {},
async handler(
ctx: Context<{
extractionOptions?: any;
sessionId: string;
}>
) {
// 1. Walk the Source folder
klaw(path.resolve(COMICS_DIRECTORY))
// 1.1 Filter on .cb* extensions
.pipe(
through2.obj(function (item, enc, next) {
let fileExtension = path.extname(item.path);
if (
[".cbz", ".cbr", ".cb7"].includes(
fileExtension
)
) {
this.push(item);
}
next();
})
)
// 1.2 Pipe filtered results to the next step
.on("data", async (item) => {
console.info(
"Found a file at path: %s",
item.path
);
let comicExists = await Comic.exists({
"rawFileDetails.name": `${path.basename(
item.path,
path.extname(item.path)
)}`,
});
if (!comicExists) {
// 2. Send the extraction job to the queue
await broker.call(
"importqueue.processImport",
{
try {
// Get params to be passed to the import jobs
const { sessionId } = ctx.params;
// 1. Walk the Source folder
klaw(path.resolve(COMICS_DIRECTORY))
// 1.1 Filter on .cb* extensions
.pipe(
through2.obj(function (item, enc, next) {
let fileExtension = path.extname(
item.path
);
if (
[".cbz", ".cbr", ".cb7"].includes(
fileExtension
)
) {
this.push(item);
}
next();
})
)
// 1.2 Pipe filtered results to the next step
// Enqueue the job in the queue
.on("data", async (item) => {
console.info(
"Found a file at path: %s",
item.path
);
let comicExists = await Comic.exists({
"rawFileDetails.name": `${path.basename(
item.path,
path.extname(item.path)
)}`,
});
if (!comicExists) {
// 2.1 Reset the job counters in Redis
await pubClient.set(
"completedJobCount",
0
);
await pubClient.set(
"failedJobCount",
0
);
// 2.2 Send the extraction job to the queue
this.broker.call("jobqueue.enqueue", {
fileObject: {
filePath: item.path,
fileSize: item.stats.size,
},
sessionId,
importType: "new",
}
);
} else {
console.log(
"Comic already exists in the library."
);
}
})
.on("end", () => {
console.log("All files traversed.");
});
action: "enqueue.async",
});
} else {
console.log(
"Comic already exists in the library."
);
}
})
.on("end", () => {
console.log("All files traversed.");
});
} catch (error) {
console.log(error);
}
},
},
rawImportToDB: {
rest: "POST /rawImportToDB",
params: {},
@@ -219,11 +260,13 @@ export default class ImportService extends Service {
rawFileDetails: {
name: string;
};
wanted: {
issues: [];
volume: {};
source: string;
markEntireVolumeWanted: Boolean;
};
acquisition: {
source: {
wanted: boolean;
name?: string;
};
directconnect: {
downloads: [];
};
@@ -234,27 +277,6 @@ export default class ImportService extends Service {
try {
let volumeDetails;
const comicMetadata = ctx.params.payload;
// When an issue is added from the search CV feature
// we solicit volume information and add that to mongo
if (
comicMetadata.sourcedMetadata.comicvine &&
!isNil(
comicMetadata.sourcedMetadata.comicvine
.volume
)
) {
volumeDetails = await this.broker.call(
"comicvine.getVolumes",
{
volumeURI:
comicMetadata.sourcedMetadata
.comicvine.volume
.api_detail_url,
}
);
comicMetadata.sourcedMetadata.comicvine.volumeInformation =
volumeDetails.results;
}
console.log("Saving to Mongo...");
console.log(
@@ -262,6 +284,7 @@ export default class ImportService extends Service {
);
switch (ctx.params.importType) {
case "new":
console.log(comicMetadata);
return await Comic.create(comicMetadata);
case "update":
return await Comic.findOneAndUpdate(
@@ -383,6 +406,66 @@ export default class ImportService extends Service {
});
},
},
applyTorrentDownloadMetadata: {
rest: "POST /applyTorrentDownloadMetadata",
handler: async (
ctx: Context<{
torrentToDownload: any;
comicObjectId: String;
infoHash: String;
name: String;
announce: [String];
}>
) => {
const {
name,
torrentToDownload,
comicObjectId,
announce,
infoHash,
} = ctx.params;
console.log(JSON.stringify(ctx.params, null, 4));
try {
return await Comic.findByIdAndUpdate(
new ObjectId(comicObjectId),
{
$push: {
"acquisition.torrent": {
infoHash,
name,
announce,
},
},
},
{ new: true, safe: true, upsert: true }
);
} catch (err) {
console.log(err);
}
},
},
getInfoHashes: {
rest: "GET /getInfoHashes",
handler: async (ctx: Context<{}>) => {
try {
return await Comic.aggregate([
{
$unwind: "$acquisition.torrent",
},
{
$group: {
_id: "$_id",
infoHashes: {
$push: "$acquisition.torrent.infoHash",
},
},
},
]);
} catch (err) {
return err;
}
},
},
getComicBooks: {
rest: "POST /getComicBooks",
params: {},
@@ -402,6 +485,7 @@ export default class ImportService extends Service {
rest: "POST /getComicBookById",
params: { id: "string" },
async handler(ctx: Context<{ id: string }>) {
console.log(ctx.params.id);
return await Comic.findById(ctx.params.id);
},
},

View File

@@ -46,14 +46,15 @@ export default class SettingsService extends Service {
.map((item) => JSON.stringify(item))
.join("\n");
queries += "\n";
const { body } = await eSClient.msearch({
const { responses } = await eSClient.msearch({
body: queries,
});
body.responses.forEach((match) => {
responses.forEach((match) => {
console.log(match.hits);
});
return body.responses;
return responses;
},
},
issue: {
@@ -74,9 +75,9 @@ export default class SettingsService extends Service {
) => {
try {
console.log(ctx.params);
const { query, pagination } = ctx.params;
const { query, pagination, type } = ctx.params;
let eSQuery = {};
switch (ctx.params.type) {
switch (type) {
case "all":
Object.assign(eSQuery, {
match_all: {},
@@ -99,12 +100,19 @@ export default class SettingsService extends Service {
case "wanted":
Object.assign(eSQuery, {
bool: {
must: {
term: {
"acquisition.source.wanted":
true,
should: [
{
exists: {
field: "wanted.issues",
},
},
},
{
exists: {
field: "wanted.volume",
},
},
],
minimum_should_match: 1,
},
});
break;

View File

@@ -8,7 +8,7 @@ import {
} from "moleculer";
import { DbMixin } from "../mixins/db.mixin";
import Settings from "../models/settings.model";
import { isEmpty, pickBy, identity, map } from "lodash";
import { isEmpty, pickBy, identity, map, isNil } from "lodash";
const ObjectId = require("mongoose").Types.ObjectId;
export default class SettingsService extends Service {
@@ -28,12 +28,31 @@ export default class SettingsService extends Service {
rest: "GET /getAllSettings",
params: {},
async handler(ctx: Context<{ settingsKey: string }>) {
const settings = await Settings.find({});
if (isEmpty(settings)) {
const { settingsKey } = ctx.params;
// Initialize a projection object. Include everything by default.
let projection = settingsKey
? { _id: 0, [settingsKey]: 1 }
: {};
// Find the settings with the dynamic projection
const settings = await Settings.find({}, projection);
if (settings.length === 0) {
return {};
}
console.log(settings[0]);
return settings[0];
// If settingsKey is provided, return the specific part of the settings.
// Otherwise, return the entire settings document.
if (settingsKey) {
// Check if the specific key exists in the settings document.
// Since `settings` is an array, we access the first element.
// Then, we use the settingsKey to return only that part of the document.
return settings[0][settingsKey] || {};
} else {
// Return the entire settings document
return settings[0];
}
},
},
@@ -42,44 +61,106 @@ export default class SettingsService extends Service {
params: {},
async handler(
ctx: Context<{
settingsPayload: {
host: object;
airDCPPUserSettings: object;
hubs: [];
settingsPayload?: {
protocol: string;
hostname: string;
port: string;
username: string;
password: string;
_id?: string;
airDCPPUserSettings?: object;
hubs?: [];
};
settingsObjectId: string;
settingsObjectId?: string;
settingsKey: string;
}>
) {
console.log("varan bhat", ctx.params);
const { host, airDCPPUserSettings, hubs } =
ctx.params.settingsPayload;
let query = {
host,
airDCPPUserSettings,
hubs,
};
const keysToUpdate = pickBy(query, identity);
let updateQuery = {};
try {
let query = {};
const { settingsKey, settingsObjectId } =
ctx.params;
const {
hostname,
protocol,
port,
username,
password,
} = ctx.params.settingsPayload;
const host = {
hostname,
protocol,
port,
username,
password,
};
const undefinedPropsInHostname = Object.values(
host
).filter((value) => value === undefined);
map(Object.keys(keysToUpdate), (key) => {
updateQuery[`directConnect.client.${key}`] =
query[key];
});
const options = {
upsert: true,
new: true,
setDefaultsOnInsert: true,
};
const filter = {
_id: new ObjectId(ctx.params.settingsObjectId),
};
const result = Settings.findOneAndUpdate(
filter,
{ $set: updateQuery },
options
);
// Update, depending what key was passed in params
// 1. Construct the update query
switch (settingsKey) {
case "bittorrent":
console.log(
`Recieved settings for ${settingsKey}, building query...`
);
query = {
...(undefinedPropsInHostname.length ===
0 && {
$set: {
"bittorrent.client.host": host,
},
}),
};
break;
case "directConnect":
console.log(
`Recieved settings for ${settingsKey}, building query...`
);
const { hubs, airDCPPUserSettings } =
ctx.params.settingsPayload;
query = {
...(undefinedPropsInHostname.length ===
0 && {
$set: {
"directConnect.client.host":
host,
},
}),
...(!isNil(hubs) && {
$set: {
"directConnect.client.hubs":
hubs,
},
}),
};
console.log(JSON.stringify(query, null, 4));
break;
return result;
default:
return false;
}
// 2. Set up options, filters
const options = {
upsert: true,
setDefaultsOnInsert: true,
returnDocument: "after",
};
const filter = settingsObjectId
? { _id: settingsObjectId }
: {};
// 3. Execute the mongo query
const result = await Settings.findOneAndUpdate(
filter,
query,
options
);
return result;
} catch (err) {
return err;
}
},
},
deleteSettings: {

View File

@@ -1,16 +1,14 @@
"use strict";
import { Service, ServiceBroker, ServiceSchema } from "moleculer";
import { Service, ServiceBroker, ServiceSchema, Context } from "moleculer";
import { JobType } from "moleculer-bullmq";
import { createClient } from "redis";
import { createAdapter } from "@socket.io/redis-adapter";
import Session from "../models/session.model";
import { pubClient, subClient } from "../config/redis.config";
const { MoleculerError } = require("moleculer").Errors;
const SocketIOService = require("moleculer-io");
const redisURL = new URL(process.env.REDIS_URI);
// console.log(redisURL.hostname);
const { v4: uuidv4 } = require("uuid");
const pubClient = createClient({ url: `redis://${redisURL.hostname}:6379` });
(async () => {
await pubClient.connect();
})();
const subClient = pubClient.duplicate();
export default class SocketService extends Service {
// @ts-ignore
public constructor(
@@ -28,45 +26,7 @@ export default class SocketService extends Service {
"/": {
events: {
call: {
// whitelist: ["math.*", "say.*", "accounts.*", "rooms.*", "io.*"],
},
action: async (data, ack) => {
// write your handler function here.
switch (data.type) {
case "LS_IMPORT":
console.log(
`Recieved ${data.type} event.`
);
// 1. Send task to queue
await this.broker.call(
"library.newImport",
data.data,
{}
);
break;
case "LS_TOGGLE_IMPORT_QUEUE":
await this.broker.call(
"importqueue.toggleImportQueue",
data.data,
{}
);
break;
case "LS_SINGLE_IMPORT":
console.info(
"AirDC++ finished a download -> "
);
console.log(data);
await this.broker.call(
"library.importDownloadedComic",
{ bundle: data },
{}
);
break;
// uncompress archive events
case "COMICBOOK_EXTRACTION_SUCCESS":
console.log(data);
return data;
}
whitelist: ["socket.*"],
},
},
},
@@ -77,12 +37,113 @@ export default class SocketService extends Service {
},
},
hooks: {},
actions: {},
actions: {
resumeSession: async (ctx: Context<{ sessionId: string }>) => {
const { sessionId } = ctx.params;
console.log("Attempting to resume session...");
try {
const sessionRecord = await Session.find({
sessionId,
});
// 1. Check for sessionId's existence, and a match
if (
sessionRecord.length !== 0 &&
sessionRecord[0].sessionId === sessionId
) {
// 2. Find if the queue has active, paused or waiting jobs
const jobs: JobType = await this.broker.call(
"jobqueue.getJobCountsByType",
{}
);
const { active, paused, waiting } = jobs;
if (active > 0 || paused > 0 || waiting > 0) {
// 3. Get job counts
const completedJobCount = await pubClient.get(
"completedJobCount"
);
const failedJobCount = await pubClient.get(
"failedJobCount"
);
// 4. Send the counts to the active socket.io session
await this.broker.call("socket.broadcast", {
namespace: "/",
event: "RESTORE_JOB_COUNTS_AFTER_SESSION_RESTORATION",
args: [
{
completedJobCount,
failedJobCount,
queueStatus: "running",
},
],
});
}
}
} catch (err) {
throw new MoleculerError(
err,
500,
"SESSION_ID_NOT_FOUND",
{
data: sessionId,
}
);
}
},
setQueueStatus: async (
ctx: Context<{
queueAction: string;
queueStatus: string;
}>
) => {
const { queueAction } = ctx.params;
await this.broker.call(
"jobqueue.toggle",
{ action: queueAction },
{}
);
},
importSingleIssue: async (ctx: Context<{}>) => {
console.info("AirDC++ finished a download -> ");
console.log(ctx.params);
// await this.broker.call(
// "library.importDownloadedComic",
// { bundle: data },
// {}
// );
},
},
methods: {},
async started() {
this.io.on("connection", (data) =>
console.log("socket.io server initialized.")
);
this.io.on("connection", async (socket) => {
console.log(
`socket.io server connected to client with session ID: ${socket.id}`
);
console.log("Looking up sessionId in Mongo...");
const sessionIdExists = await Session.find({
sessionId: socket.handshake.query.sessionId,
});
// 1. if sessionId isn't found in Mongo, create one and persist it
if (sessionIdExists.length === 0) {
console.log(
`Socket Id ${socket.id} not found in Mongo, creating a new session...`
);
const sessionId = uuidv4();
socket.sessionId = sessionId;
console.log(`Saving session ${sessionId} to Mongo...`);
await Session.create({
sessionId,
socketId: socket.id,
});
socket.emit("sessionInitialized", sessionId);
}
// 2. else, retrieve it from Mongo and "resume" the socket.io connection
else {
console.log(`Found socketId ${socket.id}, no-op.`);
}
});
},
});
}

View File

@@ -0,0 +1,98 @@
"use strict";
import {
Context,
Service,
ServiceBroker,
ServiceSchema,
Errors,
} from "moleculer";
import { DbMixin } from "../mixins/db.mixin";
import Comic from "../models/comic.model";
import BullMqMixin from "moleculer-bullmq";
const { MoleculerError } = require("moleculer").Errors;
export default class ImageTransformation extends Service {
// @ts-ignore
public constructor(
public broker: ServiceBroker,
schema: ServiceSchema<{}> = { name: "imagetransformation" }
) {
super(broker);
this.parseServiceSchema({
name: "torrentjobs",
mixins: [DbMixin("comics", Comic), BullMqMixin],
settings: {
bullmq: {
client: process.env.REDIS_URI,
},
},
hooks: {},
actions: {
getTorrentData: {
queue: true,
rest: "GET /getTorrentData",
handler: async (ctx: Context<{ trigger: string }>) => {
const { trigger } = ctx.params;
console.log(`Recieved ${trigger} as the trigger...`);
const jobOptions = {
jobId: "retrieveTorrentData",
name: "bossy",
repeat: {
every: 10000, // Repeat every 10000 ms
limit: 100, // Limit to 100 repeats
},
};
const job = await this.localQueue(
ctx,
"fetchTorrentData",
ctx.params,
jobOptions
);
return job;
},
},
fetchTorrentData: {
rest: "GET /fetchTorrentData",
handler: async (
ctx: Context<{
birdName: String;
}>
) => {
const repeatableJob = await this.$resolve(
"torrentjobs"
).getRepeatableJobs();
console.info(repeatableJob);
console.info(
`Scheduled job for fetching torrent data fired.`
);
// 1. query mongo for infohashes
const infoHashes = await this.broker.call(
"library.getInfoHashes",
{}
);
// 2. query qbittorrent to see if they exist
const torrents: any = await this.broker.call(
"qbittorrent.getTorrentRealTimeStats",
{ infoHashes }
);
// 4. Emit the LS_COVER_EXTRACTION_FAILED event with the necessary details
await this.broker.call("socket.broadcast", {
namespace: "/",
event: "AS_TORRENT_DATA",
args: [
{
torrents,
},
],
});
// 3. If they do, don't do anything
// 4. If they don't purge them from mongo
},
},
},
methods: {},
});
}
}

View File

@@ -21,7 +21,7 @@ const ALLOWED_IMAGE_FILE_FORMATS = [".jpg", ".jpeg", ".png"];
// Tell FileMagic where to find the magic.mgc file
FileMagic.magicFile = require.resolve("@npcz/magic/dist/magic.mgc");
// We can onlu use MAGIC_PRESERVE_ATIME on operating suystems that support
// We can only use MAGIC_PRESERVE_ATIME on operating suystems that support
// it and that includes OS X for example. It's a good practice as we don't
// want to change the last access time because we are just checking the file
// contents type
@@ -108,6 +108,14 @@ export const isValidImageFileExtension = (fileName: string): boolean => {
return includes(ALLOWED_IMAGE_FILE_FORMATS, path.extname(fileName));
};
/**
* This function constructs paths for a target extraction folder and an input file based on extraction
* options and a walked folder.
* @param {IExtractionOptions} extractionOptions - An object containing options for the extraction
* process, such as the target extraction folder.
* @param {IFolderData} walkedFolder - `walkedFolder` is an object that represents a folder that has
* been walked through during a file extraction process. It contains the following properties:
*/
export const constructPaths = (
extractionOptions: IExtractionOptions,
walkedFolder: IFolderData
@@ -142,7 +150,7 @@ export const getFileConstituents = (filePath: string) => {
};
/**
* Method that infers MIME type from a filepath
* Method that infers MIME type from a filepath
* @param {string} filePath
* @returns {Promise} string
*/
@@ -155,6 +163,15 @@ export const getMimeType = async (filePath: string) => {
});
};
/**
* This function creates a directory at a specified path using the fse.ensureDir method and throws an
* error if it fails.
* @param {any} options - The options parameter is an optional object that can be passed to the
* fse.ensureDir method to configure its behavior. It can include properties such as mode, which sets
* the permissions of the directory, and fs, which specifies the file system module to use.
* @param {string} directoryPath - The `directoryPath` parameter is a string that represents the path
* of the directory that needs to be created.
*/
export const createDirectory = async (options: any, directoryPath: string) => {
try {
await fse.ensureDir(directoryPath, options);

View File

@@ -47,6 +47,7 @@ import {
getMimeType,
} from "../utils/file.utils";
import { convertXMLToJSON } from "./xml.utils";
const { MoleculerError } = require("moleculer").Errors;
const fse = require("fs-extra");
const Unrar = require("unrar");
interface RarFile {
@@ -73,7 +74,7 @@ const errors = [];
*/
export const extractComicInfoXMLFromRar = async (
filePath: string,
mimeType: string,
mimeType: string
): Promise<any> => {
try {
// Create the target directory
@@ -209,7 +210,7 @@ export const extractComicInfoXMLFromRar = async (
export const extractComicInfoXMLFromZip = async (
filePath: string,
mimeType: string,
mimeType: string
): Promise<any> => {
try {
// Create the target directory
@@ -254,7 +255,7 @@ export const extractComicInfoXMLFromZip = async (
// Push the first file (cover) to our extraction target
extractionTargets.push(files[0].name);
filesToWriteToDisk.coverFile = path.basename(files[0].name);
if (!isEmpty(comicInfoXMLFileObject)) {
filesToWriteToDisk.comicInfoXML = comicInfoXMLFileObject[0].name;
extractionTargets.push(filesToWriteToDisk.comicInfoXML);
@@ -356,18 +357,26 @@ export const extractFromArchive = async (filePath: string) => {
switch (mimeType) {
case "application/x-7z-compressed; charset=binary":
case "application/zip; charset=binary":
const cbzResult = await extractComicInfoXMLFromZip(filePath, mimeType);
const cbzResult = await extractComicInfoXMLFromZip(
filePath,
mimeType
);
return Object.assign({}, ...cbzResult);
case "application/x-rar; charset=binary":
const cbrResult = await extractComicInfoXMLFromRar(filePath, mimeType);
const cbrResult = await extractComicInfoXMLFromRar(
filePath,
mimeType
);
return Object.assign({}, ...cbrResult);
default:
console.log(
console.error(
"Error inferring filetype for comicinfo.xml extraction."
);
break;
throw new MoleculerError({}, 500, "FILETYPE_INFERENCE_ERROR", {
data: { message: "Cannot infer filetype." },
});
}
};