Compare commits

Comparing `mimetype-c…` → `qbittorren…` — 50 commits
| SHA1 |
|---|
| b35e2140b5 |
| f053dcb789 |
| aea7a24f76 |
| 8f0c2f4302 |
| 7dbe2b2701 |
| 4cdb11fcbd |
| 78f7c1b595 |
| bbd2906ebf |
| f3965437b5 |
| 78e0e9f8ce |
| c926758db6 |
| b2b35aedc0 |
| f35e3ccbe0 |
| 7b0c0a7420 |
| c2bbbf311d |
| b8ca03220f |
| b87b0c875d |
| 11fbaf10db |
| 1229feb69c |
| 3efdc7c2e2 |
| 1fff931941 |
| f4e2db5a5f |
| 1d7561279b |
| 9e47ae0436 |
| b1b1cb527b |
| cfa09691e8 |
| 356b093db9 |
| b4b83e5c75 |
| c718456adc |
| 76d4e6b10f |
| bde548695c |
| fd4dd1d5c4 |
| 5540bb16ec |
| 6ee609f2b9 |
| 8b584080e2 |
| 01975079e3 |
| fe9fbe9c3a |
| df6652cce9 |
| e5fc879b2d |
| 625447e7f1 |
| fdcf1f7d68 |
| 4003f666cf |
| 7b855f8cf1 |
| 007ce4b66f |
| cb84e4893f |
| 795ac561c7 |
| 175e01dc2d |
| 66e0a26c68 |
| 959d248273 |
| 745ec5d774 |
`.github/workflows/main.yml` (vendored, 2 lines changed):

```diff
@@ -12,7 +12,7 @@ jobs:
     steps:
       - uses: actions/checkout@master
       - name: Publish to Registry
-        uses: elgohr/Publish-Docker-Github-Action@master
+        uses: elgohr/Publish-Docker-Github-Action@v5
        with:
          name: frishi/threetwo-core-service
          username: ${{ secrets.DOCKER_USERNAME }}
```
`README.md` (26 lines changed):

````diff
@@ -3,21 +3,35 @@
 This [moleculer-based](https://github.com/moleculerjs/moleculer-web) microservice houses endpoints for the following functions:
 
 1. Local import of a comic library into mongo (currently supports `cbr` and `cbz` files)
-2. Metadata extraction from file, `comicinfo.xml`
+2. Metadata extraction from file, `comicinfo.xml`
 3. Mongo comic object orchestration
 4. CRUD operations on `Comic` model
 5. Helper utils to help with image metadata extraction, file operations and more.
 
 ## Local Development
 
-1. ~~You need `calibre` in your local path.
-   On `macOS` you can `brew install calibre` and make sure that `ebook-meta` is present on the path~~ Calibre is no longer required as a dependency. Ignore this step.
-2. You need `mongo` for the data store. on `macOS` you can use [these instructions](https://docs.mongodb.com/manual/tutorial/install-mongodb-on-os-x/) to install it
+1. You need the following dependencies installed: `mongo`, `elasticsearch` and `redis`
+2. You also need binaries for `unrar` and `p7zip`
 3. Clone this repo
 4. Run `npm i`
-5. Assuming you installed mongo correctly, run `MONGO_URI=mongodb://localhost:27017/threetwo npm run dev` to start the service
+5. Assuming you installed the dependencies correctly, run:
+
+```
+COMICS_DIRECTORY=<PATH_TO_COMICS_DIRECTORY> \
+USERDATA_DIRECTORY=<PATH_TO_USERDATA_DIRECTORY> \
+REDIS_URI=redis://<REDIS_HOST:REDIS_PORT> \
+ELASTICSEARCH_URI=<ELASTICSEARCH_HOST:ELASTICSEARCH_PORT> \
+MONGO_URI=mongodb://<MONGO_HOST:MONGO_PORT>/threetwo \
+UNRAR_BIN_PATH=<UNRAR_BIN_PATH> \
+SEVENZ_BINARY_PATH=<SEVENZ_BINARY_PATH> \
+npm run dev
+```
+
+to start the service
+
 6. You should see the service spin up and a list of all the endpoints in the terminal
-7. The service can be accessed through `http://localhost:3000/api/import/*`
+7. The service can be accessed through `http://localhost:3000/api/<serviceName>/*`
 
 ## Docker Instructions
 
 1. Build the image using `docker build . -t frishi/threetwo-import-service`. Give it a hot minute.
````
`config/redis.config.ts` (new file, 10 lines):

```ts
import { createClient } from "redis";
const redisURL = new URL(process.env.REDIS_URI);

const pubClient = createClient({ url: `redis://${redisURL.hostname}:6379` });
(async () => {
	await pubClient.connect();
})();
const subClient = pubClient.duplicate();

export { subClient, pubClient };
```
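
These shared clients are consumed by the socket service later in this diff, together with `createAdapter` from `@socket.io/redis-adapter` (added to `package.json` below). A minimal sketch of how such an adapter is typically wired up — the standalone `Server` instance here is an assumption for illustration; in this repo, `moleculer-io` owns the real server:

```ts
import { Server } from "socket.io";
import { createAdapter } from "@socket.io/redis-adapter";
import { pubClient, subClient } from "../config/redis.config";

// Assumed standalone server for illustration; moleculer-io manages the
// actual instance. The Redis adapter relays socket.io broadcasts between
// service instances through the pub/sub client pair exported above.
const io = new Server();
io.adapter(createAdapter(pubClient, subClient));
```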
`models/comic.model.ts`:

```diff
@@ -2,7 +2,7 @@ const paginate = require("mongoose-paginate-v2");
 const { Client } = require("@elastic/elasticsearch");
 import ComicVineMetadataSchema from "./comicvine.metadata.model";
 import { mongoosastic } from "mongoosastic-ts";
-const mongoose = require("mongoose")
+const mongoose = require("mongoose");
 import {
 	MongoosasticDocument,
 	MongoosasticModel,
@@ -28,6 +28,10 @@ const RawFileDetailsSchema = mongoose.Schema({
 	mimeType: String,
 	containedIn: String,
 	pageCount: Number,
+	archive: {
+		uncompressed: Boolean,
+		expandedPath: String,
+	},
 	cover: {
 		filePath: String,
 		stats: Object,
@@ -111,12 +115,13 @@ const ComicSchema = mongoose.Schema(
 			default: [],
 		},
 	},
-	torrent: {
-		sourceApplication: String,
-		magnet: String,
-		tracker: String,
-		status: String,
-	},
+	torrent: [
+		{
+			infoHash: String,
+			name: String,
+			announce: [String],
+		},
+	],
 	usenet: {
 		sourceApplication: String,
 	},
```
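
The `torrent` object-to-array change means callers now `$push` one entry per tracked torrent; the library service's new `applyTorrentDownloadMetadata` action later in this diff does exactly that. A minimal sketch against the new shape (the document id and torrent fields are illustrative values, not from the source):

```ts
import Comic from "../models/comic.model";
const { Types } = require("mongoose");

// Append one tracked torrent to a comic's acquisition metadata.
async function trackTorrent() {
	return Comic.findByIdAndUpdate(
		new Types.ObjectId("64a0c0ffee0ddba11ab1efab"), // illustrative id
		{
			$push: {
				"acquisition.torrent": {
					infoHash: "c12fe1c06bba254a9dc9f519b335aa7c1367a88a", // illustrative
					name: "Example Comic 001.cbz", // illustrative
					announce: ["udp://tracker.example.org:1337"], // illustrative
				},
			},
		},
		{ new: true }
	);
}
```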
`models/jobresult.model.ts` (new file, 12 lines):

```ts
const mongoose = require("mongoose");

const JobResultScehma = mongoose.Schema({
	id: Number,
	status: String,
	sessionId: String,
	failedReason: Object,
	timestamp: Date,
});

const JobResult = mongoose.model("JobResult", JobResultScehma);
export default JobResult;
```
`models/session.model.ts` (new file, 9 lines):

```ts
const mongoose = require("mongoose");

const SessionScehma = mongoose.Schema({
	sessionId: String,
	socketId: String,
});

const Session = mongoose.model("Session", SessionScehma);
export default Session;
```
`models/settings.model.ts`:

```diff
@@ -1,21 +1,34 @@
 const mongoose = require("mongoose");
 const paginate = require("mongoose-paginate-v2");
 
+const HostSchema = mongoose.Schema({
+	_id: false,
+	username: String,
+	password: String,
+	hostname: String,
+	port: String,
+	protocol: String,
+});
 const SettingsScehma = mongoose.Schema({
 	directConnect: {
 		client: {
-			host: {
-				username: String,
-				password: String,
-				hostname: String,
-				port: String,
-				protocol: String,
-			},
+			host: HostSchema,
 			airDCPPUserSettings: Object,
 
 			hubs: Array,
 		},
 	},
+	bittorrent: {
+		client: {
+			name: String,
+			host: HostSchema,
+		},
+	},
+	prowlarr: {
+		client: {
+			host: HostSchema,
+			apiKey: String,
+		},
+	},
 });
 
 const Settings = mongoose.model("Settings", SettingsScehma);
```
`package-lock.json` (generated, 4846 lines changed): diff suppressed because it is too large.
`package.json` (14 lines changed):

```diff
@@ -20,7 +20,6 @@
 	],
 	"author": "Rishi Ghan",
 	"devDependencies": {
-		"@elastic/elasticsearch": "^8.6.0",
 		"@types/lodash": "^4.14.168",
 		"@typescript-eslint/eslint-plugin": "^5.56.0",
 		"@typescript-eslint/parser": "^5.56.0",
@@ -35,15 +34,16 @@
 		"npm": "^8.4.1",
 		"ts-jest": "^29.0.5",
 		"ts-node": "^10.9.1",
-		"typescript": "^5.0.2"
+		"typescript": "^5.0.2",
+		"uuid": "^9.0.0"
 	},
 	"dependencies": {
-		"@npcz/magic": "^1.3.14",
-		"redis": "^4.6.5",
-		"@socket.io/redis-adapter": "^8.1.0",
+		"@bluelovers/fast-glob": "https://github.com/rishighan/fast-glob-v2-api.git",
+		"@elastic/elasticsearch": "^8.6.0",
+		"@jorgeferrero/stream-to-buffer": "^2.0.6",
+		"@npcz/magic": "^1.3.14",
+		"@root/walk": "^1.1.0",
+		"@socket.io/redis-adapter": "^8.1.0",
 		"@types/jest": "^27.4.1",
 		"@types/mkdirp": "^1.0.0",
 		"@types/node": "^13.9.8",
@@ -65,8 +65,7 @@
 		"leven": "^3.1.0",
 		"lodash": "^4.17.21",
 		"mkdirp": "^0.5.5",
 		"moleculer": "^0.14.29",
-		"moleculer-bull": "github:rishighan/moleculer-bull#1.0.0",
 		"moleculer-bullmq": "^3.0.0",
 		"moleculer-db": "^0.8.23",
 		"moleculer-db-adapter-mongoose": "^0.9.2",
 		"moleculer-io": "^2.2.0",
@@ -77,6 +76,7 @@
 		"nats": "^1.3.2",
 		"opds-extra": "^3.0.9",
 		"p7zip-threetwo": "^1.0.4",
+		"redis": "^4.6.5",
 		"sanitize-filename-ts": "^1.0.2",
 		"sharp": "^0.30.4",
 		"threetwo-ui-typings": "^1.0.14",
```
`services/imagetransformation.service.ts`:

```diff
@@ -7,6 +7,8 @@ import {
 	ServiceSchema,
 	Errors,
 } from "moleculer";
+import { DbMixin } from "../mixins/db.mixin";
+import Comic from "../models/comic.model";
 import path from "path";
 import {
 	analyze,
@@ -22,16 +24,13 @@ export default class ImageTransformation extends Service {
 		super(broker);
 		this.parseServiceSchema({
 			name: "imagetransformation",
-			mixins: [],
+			mixins: [DbMixin("comics", Comic)],
 			settings: {
 				// Available fields in the responses
-				fields: ["_id", "name", "quantity", "price"],
+				fields: ["_id"],
 
 				// Validator for the `create` & `insert` actions.
-				entityValidator: {
-					name: "string|min:3",
-					price: "number|positive",
-				},
+				entityValidator: {},
 			},
 			hooks: {},
 			actions: {
```
`services/importqueue.service.ts` (deleted; the entire 291-line file follows):

```ts
/*
 * MIT License
 *
 * Copyright (c) 2022 Rishi Ghan
 *
 The MIT License (MIT)

 Copyright (c) 2015 Rishi Ghan

 Permission is hereby granted, free of charge, to any person obtaining a copy
 of this software and associated documentation files (the "Software"), to deal
 in the Software without restriction, including without limitation the rights
 to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
 copies of the Software, and to permit persons to whom the Software is
 furnished to do so, subject to the following conditions:

 The above copyright notice and this permission notice shall be included in all
 copies or substantial portions of the Software.

 THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
 IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
 FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
 AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
 LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
 OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
 SOFTWARE.
 */

/*
 * Revision History:
 *     Initial: 2022/01/28        Rishi Ghan
 */

"use strict";

import { refineQuery } from "filename-parser";
import { isNil, isUndefined } from "lodash";
import { Context, Service, ServiceBroker, ServiceSchema } from "moleculer";
import BullMQMixin, { SandboxedJob } from "moleculer-bull";
import { DbMixin } from "../mixins/db.mixin";
import Comic from "../models/comic.model";
import {
	extractFromArchive,
	uncompressEntireArchive,
} from "../utils/uncompression.utils";

const REDIS_URI = process.env.REDIS_URI || `redis://localhost:6379`;
const EventEmitter = require("events");
EventEmitter.defaultMaxListeners = 20;

console.log(`REDIS -> ${REDIS_URI}`);
export default class QueueService extends Service {
	public constructor(
		public broker: ServiceBroker,
		schema: ServiceSchema<{}> = { name: "importqueue" }
	) {
		super(broker);
		this.parseServiceSchema({
			name: "importqueue",
			mixins: [BullMQMixin(REDIS_URI), DbMixin("comics", Comic)],
			settings: {
				bullmq: {
					maxStalledCount: 0,
				},
			},

			hooks: {},
			queues: {
				"process.import": {
					concurrency: 10,
					async process(job: SandboxedJob) {
						console.info("New job received!", job.data);
						console.info(`Processing queue...`);
						// extract the cover
						const result = await extractFromArchive(
							job.data.fileObject.filePath
						);
						const {
							name,
							filePath,
							fileSize,
							extension,
							mimeType,
							cover,
							containedIn,
							comicInfoJSON,
						} = result;

						// Infer any issue-related metadata from the filename
						const { inferredIssueDetails } = refineQuery(
							result.name
						);
						console.log(
							"Issue metadata inferred: ",
							JSON.stringify(inferredIssueDetails, null, 2)
						);
						// Add the bundleId, if present to the payload
						let bundleId = null;
						if (!isNil(job.data.bundleId)) {
							bundleId = job.data.bundleId;
						}

						// Orchestrate the payload
						const payload = {
							importStatus: {
								isImported: true,
								tagged: false,
								matchedResult: {
									score: "0",
								},
							},
							rawFileDetails: {
								name,
								filePath,
								fileSize,
								extension,
								mimeType,
								containedIn,
								cover,
							},
							inferredMetadata: {
								issue: inferredIssueDetails,
							},
							sourcedMetadata: {
								// except for ComicInfo.xml, everything else should be copied over from the
								// parent comic
								comicInfo: comicInfoJSON,
							},
							// since we already have at least 1 copy
							// mark it as not wanted by default
							"acquisition.source.wanted": false,

							// clear out the downloads array
							// "acquisition.directconnect.downloads": [],

							// mark the metadata source
							"acquisition.source.name": job.data.sourcedFrom,
						};

						// Add the sourcedMetadata, if present
						if (!isNil(job.data.sourcedMetadata) && !isUndefined(job.data.sourcedMetadata.comicvine)) {
							Object.assign(
								payload.sourcedMetadata,
								job.data.sourcedMetadata
							);
						}

						// write to mongo
						const importResult = await this.broker.call(
							"library.rawImportToDB",
							{
								importType: job.data.importType,
								bundleId,
								payload,
							}
						);
						return {
							data: {
								importResult,
							},
							id: job.id,
							worker: process.pid,
						};
					},
				},
				"process.uncompressAndResize": {
					concurrency: 2,
					async process(job: SandboxedJob) {
						console.log(`Initiating uncompression job...`);
						return await uncompressEntireArchive(
							job.data.filePath,
							job.data.options
						);
					},
				},
			},
			actions: {
				uncompressResize: {
					rest: "POST /uncompressResize",
					params: {},
					async handler(
						ctx: Context<{
							data: { filePath: string; options: any };
						}>
					) {
						return await this.createJob(
							"process.uncompressAndResize",
							ctx.params
						);
					},
				},
				processImport: {
					rest: "POST /processImport",
					params: {},
					async handler(
						ctx: Context<{
							fileObject: object;
							importType: string;
							bundleId: number;
							sourcedFrom?: string;
							sourcedMetadata: object;
						}>
					) {
						return await this.createJob("process.import", {
							fileObject: ctx.params.fileObject,
							importType: ctx.params.importType,
							bundleId: ctx.params.bundleId,
							sourcedFrom: ctx.params.sourcedFrom,
							sourcedMetadata: ctx.params.sourcedMetadata,
						});
					},
				},
				toggleImportQueue: {
					rest: "POST /pauseImportQueue",
					params: {},
					handler: async (ctx: Context<{ action: string }>) => {
						switch (ctx.params.action) {
							case "pause":
								const foo = await this.getQueue(
									"process.import"
								).pause();
								console.log("paused", foo);
								return foo;
							case "resume":
								const soo = await this.getQueue(
									"process.import"
								).resume();
								console.log("resumed", soo);
								return soo;
							default:
								console.log("Unrecognized queue action.");
						}
					},
				},
			},
			methods: {},
			async started(): Promise<any> {
				await this.getQueue("process.import").on(
					"failed",
					async (job, error) => {
						console.error(
							`An error occured in 'process.import' queue on job id '${job.id}': ${error.message}`
						);
						console.error(job.data);
					}
				);
				await this.getQueue("process.import").on(
					"completed",
					async (job, res) => {
						await this.broker.call("socket.broadcast", {
							namespace: "/", //optional
							event: "action",
							args: [{ type: "LS_COVER_EXTRACTED", result: res }], //optional
						});
						console.info(
							`Import Job with the id '${job.id}' completed.`
						);
					}
				);
				await this.getQueue("process.import").on(
					"stalled",
					async (job) => {
						console.warn(`Import job '${job.id} stalled!`);
						console.log(`${JSON.stringify(job, null, 2)}`);
						console.log(`is stalled.`);
					}
				);

				await this.getQueue("process.uncompressAndResize").on(
					"completed",
					async (job, res) => {
						await this.broker.call("socket.broadcast", {
							namespace: "/",
							event: "action",
							args: [
								{
									type: "COMICBOOK_EXTRACTION_SUCCESS",
									result: {
										files: res,
										purpose: job.data.options.purpose,
									},
								},
							],
						});
						console.info(`Uncompression Job ${job.id} completed.`);
					}
				);
			},
		});
	}
}
```
`services/jobqueue.service.ts` (new file, 444 lines):

```ts
import { Context, Service, ServiceBroker } from "moleculer";
import JobResult from "../models/jobresult.model";
import { refineQuery } from "filename-parser";
import BullMqMixin from "moleculer-bullmq";
import { DbMixin } from "../mixins/db.mixin";
import Comic from "../models/comic.model";
const ObjectId = require("mongoose").Types.ObjectId;
import {
	extractFromArchive,
	uncompressEntireArchive,
} from "../utils/uncompression.utils";
import { isNil, isUndefined } from "lodash";
import { pubClient } from "../config/redis.config";
import path from "path";
const { MoleculerError } = require("moleculer").Errors;

console.log(process.env.REDIS_URI);
export default class JobQueueService extends Service {
	public constructor(public broker: ServiceBroker) {
		super(broker);
		this.parseServiceSchema({
			name: "jobqueue",
			hooks: {},
			mixins: [DbMixin("comics", Comic), BullMqMixin],
			settings: {
				bullmq: {
					client: process.env.REDIS_URI,
				},
			},
			actions: {
				getJobCountsByType: {
					rest: "GET /getJobCountsByType",
					handler: async (ctx: Context<{}>) => {
						console.log(ctx.params);
						return await this.$resolve("jobqueue").getJobCounts();
					},
				},
				toggle: {
					rest: "GET /toggle",
					handler: async (ctx: Context<{ action: String }>) => {
						switch (ctx.params.action) {
							case "pause":
								this.pause();
								break;
							case "resume":
								this.resume();
								break;
							default:
								console.log(`Unknown queue action.`);
						}
					},
				},

				enqueue: {
					queue: true,
					rest: "GET /enqueue",
					handler: async (
						ctx: Context<{ action: string; description: string }>
					) => {
						const { action, description } = ctx.params;
						// Enqueue the job
						const job = await this.localQueue(
							ctx,
							action,
							ctx.params,
							{
								priority: 10,
							}
						);
						console.log(`Job ${job.id} enqueued`);
						console.log(`${description}`);

						return job.id;
					},
				},

				// Comic Book Import Job Queue
				"enqueue.async": {
					handler: async (
						ctx: Context<{
							sessionId: String;
						}>
					) => {
						try {
							console.log(
								`Recieved Job ID ${ctx.locals.job.id}, processing...`
							);
							// 1. De-structure the job params
							const { fileObject } = ctx.locals.job.data.params;

							// 2. Extract metadata from the archive
							const result = await extractFromArchive(
								fileObject.filePath
							);
							const {
								name,
								filePath,
								fileSize,
								extension,
								mimeType,
								cover,
								containedIn,
								comicInfoJSON,
							} = result;

							// 3a. Infer any issue-related metadata from the filename
							const { inferredIssueDetails } = refineQuery(
								result.name
							);
							console.log(
								"Issue metadata inferred: ",
								JSON.stringify(inferredIssueDetails, null, 2)
							);

							// 3b. Orchestrate the payload
							const payload = {
								importStatus: {
									isImported: true,
									tagged: false,
									matchedResult: {
										score: "0",
									},
								},
								rawFileDetails: {
									name,
									filePath,
									fileSize,
									extension,
									mimeType,
									containedIn,
									cover,
								},
								inferredMetadata: {
									issue: inferredIssueDetails,
								},
								sourcedMetadata: {
									// except for ComicInfo.xml, everything else should be copied over from the
									// parent comic
									comicInfo: comicInfoJSON,
								},
								// since we already have at least 1 copy
								// mark it as not wanted by default
								"acquisition.source.wanted": false,

								// clear out the downloads array
								// "acquisition.directconnect.downloads": [],

								// mark the metadata source
								"acquisition.source.name":
									ctx.locals.job.data.params.sourcedFrom,
							};

							// 3c. Add the bundleId, if present to the payload
							let bundleId = null;
							if (!isNil(ctx.locals.job.data.params.bundleId)) {
								bundleId = ctx.locals.job.data.params.bundleId;
							}

							// 3d. Add the sourcedMetadata, if present
							if (
								!isNil(
									ctx.locals.job.data.params.sourcedMetadata
								) &&
								!isUndefined(
									ctx.locals.job.data.params.sourcedMetadata
										.comicvine
								)
							) {
								Object.assign(
									payload.sourcedMetadata,
									ctx.locals.job.data.params.sourcedMetadata
								);
							}

							// 4. write to mongo
							const importResult = await this.broker.call(
								"library.rawImportToDB",
								{
									importType:
										ctx.locals.job.data.params.importType,
									bundleId,
									payload,
								}
							);
							return {
								data: {
									importResult,
								},
								id: ctx.locals.job.id,
								sessionId: ctx.params.sessionId,
							};
						} catch (error) {
							console.error(
								`An error occurred processing Job ID ${ctx.locals.job.id}`
							);
							throw new MoleculerError(
								error,
								500,
								"IMPORT_JOB_ERROR",
								{
									data: ctx.params.sessionId,
								}
							);
						}
					},
				},
				getJobResultStatistics: {
					rest: "GET /getJobResultStatistics",
					handler: async (ctx: Context<{}>) => {
						return await JobResult.aggregate([
							{
								$group: {
									_id: {
										sessionId: "$sessionId",
										status: "$status",
									},
									earliestTimestamp: {
										$min: "$timestamp",
									},
									count: {
										$sum: 1,
									},
								},
							},
							{
								$group: {
									_id: "$_id.sessionId",
									statuses: {
										$push: {
											status: "$_id.status",
											earliestTimestamp:
												"$earliestTimestamp",
											count: "$count",
										},
									},
								},
							},
							{
								$project: {
									_id: 0,
									sessionId: "$_id",
									completedJobs: {
										$reduce: {
											input: "$statuses",
											initialValue: 0,
											in: {
												$sum: [
													"$$value",
													{
														$cond: [
															{
																$eq: [
																	"$$this.status",
																	"completed",
																],
															},
															"$$this.count",
															0,
														],
													},
												],
											},
										},
									},
									failedJobs: {
										$reduce: {
											input: "$statuses",
											initialValue: 0,
											in: {
												$sum: [
													"$$value",
													{
														$cond: [
															{
																$eq: [
																	"$$this.status",
																	"failed",
																],
															},
															"$$this.count",
															0,
														],
													},
												],
											},
										},
									},
									earliestTimestamp: {
										$min: "$statuses.earliestTimestamp",
									},
								},
							},
						]);
					},
				},
				"uncompressFullArchive.async": {
					rest: "POST /uncompressFullArchive",
					handler: async (
						ctx: Context<{
							filePath: string;
							comicObjectId: string;
							options: any;
						}>
					) => {
						console.log(
							`Recieved Job ID ${JSON.stringify(
								ctx.locals
							)}, processing...`
						);
						const { filePath, options, comicObjectId } = ctx.params;
						const comicId = new ObjectId(comicObjectId);
						// 2. Extract metadata from the archive
						const result: string[] = await uncompressEntireArchive(
							filePath,
							options
						);
						if (Array.isArray(result) && result.length !== 0) {
							// Get the containing directory of the uncompressed archive
							const directoryPath = path.dirname(result[0]);
							// Add to mongo object
							await Comic.findByIdAndUpdate(
								comicId,
								{
									$set: {
										"rawFileDetails.archive": {
											uncompressed: true,
											expandedPath: directoryPath,
										},
									},
								},
								{ new: true, safe: true, upsert: true }
							);
							return result;
						}
					},
				},
			},

			events: {
				async "uncompressFullArchive.async.active"(
					ctx: Context<{ id: number }>
				) {
					console.log(
						`Uncompression Job ID ${ctx.params.id} is set to active.`
					);
				},
				async "uncompressFullArchive.async.completed"(
					ctx: Context<{ id: number }>
				) {
					console.log(
						`Uncompression Job ID ${ctx.params.id} completed.`
					);
					const job = await this.job(ctx.params.id);
					await this.broker.call("socket.broadcast", {
						namespace: "/",
						event: "LS_UNCOMPRESSION_JOB_COMPLETE",
						args: [
							{
								uncompressedArchive: job.returnvalue,
							},
						],
					});
					return job.returnvalue;
				},
				// use the `${QUEUE_NAME}.QUEUE_EVENT` scheme
				async "enqueue.async.active"(ctx: Context<{ id: Number }>) {
					console.log(`Job ID ${ctx.params.id} is set to active.`);
				},
				async drained(ctx) {
					console.log("Queue drained.");
					await this.broker.call("socket.broadcast", {
						namespace: "/",
						event: "LS_IMPORT_QUEUE_DRAINED",
						args: [
							{
								message: "drained",
							},
						],
					});
				},
				async "enqueue.async.completed"(ctx: Context<{ id: Number }>) {
					// 1. Fetch the job result using the job Id
					const job = await this.job(ctx.params.id);
					// 2. Increment the completed job counter
					await pubClient.incr("completedJobCount");
					// 3. Fetch the completed job count for the final payload to be sent to the client
					const completedJobCount = await pubClient.get(
						"completedJobCount"
					);
					// 4. Emit the LS_COVER_EXTRACTED event with the necessary details
					await this.broker.call("socket.broadcast", {
						namespace: "/",
						event: "LS_COVER_EXTRACTED",
						args: [
							{
								completedJobCount,
								importResult: job.returnvalue.data.importResult,
							},
						],
					});
					// 5. Persist the job results in mongo for posterity
					await JobResult.create({
						id: ctx.params.id,
						status: "completed",
						timestamp: job.timestamp,
						sessionId: job.returnvalue.sessionId,
						failedReason: {},
					});

					console.log(`Job ID ${ctx.params.id} completed.`);
				},

				async "enqueue.async.failed"(ctx) {
					const job = await this.job(ctx.params.id);
					await pubClient.incr("failedJobCount");
					const failedJobCount = await pubClient.get(
						"failedJobCount"
					);

					await JobResult.create({
						id: ctx.params.id,
						status: "failed",
						failedReason: job.failedReason,
						sessionId: job.data.params.sessionId,
						timestamp: job.timestamp,
					});

					// 4. Emit the LS_COVER_EXTRACTION_FAILED event with the necessary details
					await this.broker.call("socket.broadcast", {
						namespace: "/",
						event: "LS_COVER_EXTRACTION_FAILED",
						args: [
							{
								failedJobCount,
								importResult: job,
							},
						],
					});
				},
			},
			methods: {},
		});
	}
}
```
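
Putting the new service together: callers enqueue work through the generic `enqueue` action, whose `action` param names the BullMQ queue, and results come back via the `${QUEUE_NAME}.QUEUE_EVENT` events above. A sketch of a caller, mirroring how the library service invokes it later in this diff (the file path and `sessionId` values are illustrative):

```ts
import { ServiceBroker } from "moleculer";

// Assumes a configured broker with the jobqueue service loaded.
const broker = new ServiceBroker();

async function importOneFile() {
	// "enqueue" proxies to this.localQueue(ctx, action, ctx.params, ...),
	// so the "action" field below selects the "enqueue.async" queue.
	const jobId = await broker.call("jobqueue.enqueue", {
		action: "enqueue.async",
		description: "Import a single comic archive", // logged by the handler
		fileObject: { filePath: "/comics/example.cbz", fileSize: 1024 }, // illustrative
		sessionId: "0000-illustrative-session", // illustrative
		importType: "new",
	});
	// Completion is signaled by the "enqueue.async.completed" event,
	// which broadcasts LS_COVER_EXTRACTED over socket.io.
	return jobId;
}
```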
`services/library.service.ts`:

```diff
@@ -51,6 +51,7 @@ import {
 	IExtractionOptions,
 } from "threetwo-ui-typings";
 const ObjectId = require("mongoose").Types.ObjectId;
+import { pubClient } from "../config/redis.config";
 import fsExtra from "fs-extra";
 const through2 = require("through2");
 import klaw from "klaw";
@@ -66,16 +67,32 @@ export default class ImportService extends Service {
 			mixins: [DbMixin("comics", Comic)],
 			hooks: {},
 			actions: {
+				getHealthInformation: {
+					rest: "GET /getHealthInformation",
+					params: {},
+					handler: async (ctx: Context<{}>) => {
+						try {
+							return await ctx.broker.call("$node.services");
+						} catch (error) {
+							return new Error("Service is down.");
+						}
+					},
+				},
 				walkFolders: {
 					rest: "POST /walkFolders",
-					params: {
-						basePathToWalk: "string",
-					},
-					async handler(ctx: Context<{ basePathToWalk: string }>) {
+					params: {},
+					async handler(
+						ctx: Context<{
+							basePathToWalk: string;
+							extensions: string[];
+						}>
+					) {
+						console.log(ctx.params);
 						return await walkFolder(ctx.params.basePathToWalk, [
 							".cbz",
 							".cbr",
 							".cb7",
+							...ctx.params.extensions,
 						]);
 					},
 				},
@@ -90,11 +107,18 @@ export default class ImportService extends Service {
 					rest: "POST /uncompressFullArchive",
 					params: {},
 					handler: async (
-						ctx: Context<{ filePath: string; options: any }>
+						ctx: Context<{
+							filePath: string;
+							comicObjectId: string;
+							options: any;
+						}>
 					) => {
-						await broker.call("importqueue.uncompressResize", {
+						this.broker.call("jobqueue.enqueue", {
 							filePath: ctx.params.filePath,
+							comicObjectId: ctx.params.comicObjectId,
 							options: ctx.params.options,
+							queueName: "uncompressFullArchive.async",
+							description: `Job for uncompressing archive at ${ctx.params.filePath}`,
 						});
 					},
 				},
@@ -139,64 +163,81 @@ export default class ImportService extends Service {
 				},
 				newImport: {
 					rest: "POST /newImport",
-					params: {},
+					// params: {},
 					async handler(
 						ctx: Context<{
 							extractionOptions?: any;
 							sessionId: string;
 						}>
 					) {
-						// 1. Walk the Source folder
-						klaw(path.resolve(COMICS_DIRECTORY))
-							// 1.1 Filter on .cb* extensions
-							.pipe(
-								through2.obj(function (item, enc, next) {
-									let fileExtension = path.extname(item.path);
-									if (
-										[".cbz", ".cbr", ".cb7"].includes(
-											fileExtension
-										)
-									) {
-										this.push(item);
-									}
-									next();
-								})
-							)
-							// 1.2 Pipe filtered results to the next step
-							.on("data", async (item) => {
-								console.info(
-									"Found a file at path: %s",
-									item.path
-								);
-								let comicExists = await Comic.exists({
-									"rawFileDetails.name": `${path.basename(
-										item.path,
-										path.extname(item.path)
-									)}`,
-								});
-								if (!comicExists) {
-									// 2. Send the extraction job to the queue
-									await broker.call(
-										"importqueue.processImport",
-										{
-											fileObject: {
-												filePath: item.path,
-												fileSize: item.stats.size,
-											},
-											importType: "new",
-										}
-									);
-								} else {
-									console.log(
-										"Comic already exists in the library."
-									);
-								}
-							})
-							.on("end", () => {
-								console.log("All files traversed.");
-							});
+						try {
+							// Get params to be passed to the import jobs
+							const { sessionId } = ctx.params;
+							// 1. Walk the Source folder
+							klaw(path.resolve(COMICS_DIRECTORY))
+								// 1.1 Filter on .cb* extensions
+								.pipe(
+									through2.obj(function (item, enc, next) {
+										let fileExtension = path.extname(
+											item.path
+										);
+										if (
+											[".cbz", ".cbr", ".cb7"].includes(
+												fileExtension
+											)
+										) {
+											this.push(item);
+										}
+										next();
+									})
+								)
+								// 1.2 Pipe filtered results to the next step
+								// Enqueue the job in the queue
+								.on("data", async (item) => {
+									console.info(
+										"Found a file at path: %s",
+										item.path
+									);
+									let comicExists = await Comic.exists({
+										"rawFileDetails.name": `${path.basename(
+											item.path,
+											path.extname(item.path)
+										)}`,
+									});
+									if (!comicExists) {
+										// 2.1 Reset the job counters in Redis
+										await pubClient.set(
+											"completedJobCount",
+											0
+										);
+										await pubClient.set(
+											"failedJobCount",
+											0
+										);
+										// 2.2 Send the extraction job to the queue
+										this.broker.call("jobqueue.enqueue", {
+											fileObject: {
+												filePath: item.path,
+												fileSize: item.stats.size,
+											},
+											sessionId,
+											importType: "new",
+											action: "enqueue.async",
+										});
+									} else {
+										console.log(
+											"Comic already exists in the library."
+										);
+									}
+								})
+								.on("end", () => {
+									console.log("All files traversed.");
+								});
+						} catch (error) {
+							console.log(error);
+						}
 					},
 				},
 
 				rawImportToDB: {
 					rest: "POST /rawImportToDB",
 					params: {},
@@ -262,6 +303,7 @@ export default class ImportService extends Service {
 						);
 						switch (ctx.params.importType) {
 							case "new":
+								console.log(comicMetadata);
 								return await Comic.create(comicMetadata);
 							case "update":
 								return await Comic.findOneAndUpdate(
@@ -383,6 +425,66 @@ export default class ImportService extends Service {
 						});
 					},
 				},
+				applyTorrentDownloadMetadata: {
+					rest: "POST /applyTorrentDownloadMetadata",
+					handler: async (
+						ctx: Context<{
+							torrentToDownload: any;
+							comicObjectId: String;
+							infoHash: String;
+							name: String;
+							announce: [String];
+						}>
+					) => {
+						const {
+							name,
+							torrentToDownload,
+							comicObjectId,
+							announce,
+							infoHash,
+						} = ctx.params;
+						console.log(JSON.stringify(ctx.params, null, 4));
+						try {
+							return await Comic.findByIdAndUpdate(
+								new ObjectId(comicObjectId),
+								{
+									$push: {
+										"acquisition.torrent": {
+											infoHash,
+											name,
+											announce,
+										},
+									},
+								},
+								{ new: true, safe: true, upsert: true }
+							);
+						} catch (err) {
+							console.log(err);
+						}
+					},
+				},
+				getInfoHashes: {
+					rest: "GET /getInfoHashes",
+					handler: async (ctx: Context<{}>) => {
+						try {
+							return await Comic.aggregate([
+								{
+									$unwind: "$acquisition.torrent",
+								},
+								{
+									$group: {
+										_id: "$_id",
+										infoHashes: {
+											$push: "$acquisition.torrent.infoHash",
+										},
+									},
+								},
+							]);
+						} catch (err) {
+							return err;
+						}
+					},
+				},
 				getComicBooks: {
 					rest: "POST /getComicBooks",
 					params: {},
@@ -402,6 +504,7 @@ export default class ImportService extends Service {
 					rest: "POST /getComicBookById",
 					params: { id: "string" },
 					async handler(ctx: Context<{ id: string }>) {
+						console.log(ctx.params.id);
 						return await Comic.findById(ctx.params.id);
 					},
 				},
```
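
With their `rest` annotations, these new actions surface through the API gateway (the README above notes the `http://localhost:3000/api/<serviceName>/*` scheme). A sketch of exercising the new torrent-metadata endpoint with `axios` (already a dependency); the exact route shape is an assumption based on that scheme, and all field values are illustrative:

```ts
import axios from "axios";

async function tagTorrent(comicObjectId: string) {
	// Assumed route: "POST /applyTorrentDownloadMetadata" under the
	// "library" service → /api/library/applyTorrentDownloadMetadata
	const { data } = await axios.post(
		"http://localhost:3000/api/library/applyTorrentDownloadMetadata",
		{
			comicObjectId,
			infoHash: "c12fe1c06bba254a9dc9f519b335aa7c1367a88a", // illustrative
			name: "Example Comic 001.cbz", // illustrative
			announce: ["udp://tracker.example.org:1337"], // illustrative
		}
	);
	return data;
}
```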
```diff
@@ -46,14 +46,15 @@ export default class SettingsService extends Service {
 					.map((item) => JSON.stringify(item))
 					.join("\n");
 				queries += "\n";
-				const { body } = await eSClient.msearch({
+				const { responses } = await eSClient.msearch({
 					body: queries,
 				});
-				body.responses.forEach((match) => {
+
+				responses.forEach((match) => {
 					console.log(match.hits);
 				});
 
-				return body.responses;
+				return responses;
 			},
 		},
 		issue: {
@@ -74,9 +75,9 @@ export default class SettingsService extends Service {
 			) => {
 				try {
 					console.log(ctx.params);
-					const { query, pagination } = ctx.params;
+					const { query, pagination, type } = ctx.params;
 					let eSQuery = {};
-					switch (ctx.params.type) {
+					switch (type) {
 						case "all":
 							Object.assign(eSQuery, {
 								match_all: {},
```
`services/settings.service.ts`:

```diff
@@ -8,7 +8,7 @@ import {
 } from "moleculer";
 import { DbMixin } from "../mixins/db.mixin";
 import Settings from "../models/settings.model";
-import { isEmpty, pickBy, identity, map } from "lodash";
+import { isEmpty, pickBy, identity, map, isNil } from "lodash";
 const ObjectId = require("mongoose").Types.ObjectId;
 
 export default class SettingsService extends Service {
@@ -28,12 +28,31 @@ export default class SettingsService extends Service {
 				rest: "GET /getAllSettings",
 				params: {},
 				async handler(ctx: Context<{ settingsKey: string }>) {
-					const settings = await Settings.find({});
-					if (isEmpty(settings)) {
+					const { settingsKey } = ctx.params;
+
+					// Initialize a projection object. Include everything by default.
+					let projection = settingsKey
+						? { _id: 0, [settingsKey]: 1 }
+						: {};
+
+					// Find the settings with the dynamic projection
+					const settings = await Settings.find({}, projection);
+
+					if (settings.length === 0) {
 						return {};
 					}
-					console.log(settings[0]);
-					return settings[0];
+
+					// If settingsKey is provided, return the specific part of the settings.
+					// Otherwise, return the entire settings document.
+					if (settingsKey) {
+						// Check if the specific key exists in the settings document.
+						// Since `settings` is an array, we access the first element.
+						// Then, we use the settingsKey to return only that part of the document.
+						return settings[0][settingsKey] || {};
+					} else {
+						// Return the entire settings document
+						return settings[0];
+					}
 				},
 			},
 
@@ -42,44 +61,106 @@ export default class SettingsService extends Service {
 				params: {},
 				async handler(
 					ctx: Context<{
-						settingsPayload: {
-							host: object;
-							airDCPPUserSettings: object;
-							hubs: [];
+						settingsPayload?: {
+							protocol: string;
+							hostname: string;
+							port: string;
+							username: string;
+							password: string;
+							_id?: string;
+							airDCPPUserSettings?: object;
+							hubs?: [];
 						};
-						settingsObjectId: string;
+						settingsObjectId?: string;
+						settingsKey: string;
 					}>
 				) {
-					console.log("varan bhat", ctx.params);
-					const { host, airDCPPUserSettings, hubs } =
-						ctx.params.settingsPayload;
-					let query = {
-						host,
-						airDCPPUserSettings,
-						hubs,
-					};
-					const keysToUpdate = pickBy(query, identity);
-					let updateQuery = {};
+					try {
+						let query = {};
+						const { settingsKey, settingsObjectId } =
+							ctx.params;
+						const {
+							hostname,
+							protocol,
+							port,
+							username,
+							password,
+						} = ctx.params.settingsPayload;
+						const host = {
+							hostname,
+							protocol,
+							port,
+							username,
+							password,
+						};
+						const undefinedPropsInHostname = Object.values(
+							host
+						).filter((value) => value === undefined);
 
-					map(Object.keys(keysToUpdate), (key) => {
-						updateQuery[`directConnect.client.${key}`] =
-							query[key];
-					});
-					const options = {
-						upsert: true,
-						new: true,
-						setDefaultsOnInsert: true,
-					};
-					const filter = {
-						_id: new ObjectId(ctx.params.settingsObjectId),
-					};
-					const result = Settings.findOneAndUpdate(
-						filter,
-						{ $set: updateQuery },
-						options
-					);
+						// Update, depending what key was passed in params
+						// 1. Construct the update query
+						switch (settingsKey) {
+							case "bittorrent":
+								console.log(
+									`Recieved settings for ${settingsKey}, building query...`
+								);
+								query = {
+									...(undefinedPropsInHostname.length ===
+										0 && {
+										$set: {
+											"bittorrent.client.host": host,
+										},
+									}),
+								};
+								break;
+							case "directConnect":
+								console.log(
+									`Recieved settings for ${settingsKey}, building query...`
+								);
+								const { hubs, airDCPPUserSettings } =
+									ctx.params.settingsPayload;
+								query = {
+									...(undefinedPropsInHostname.length ===
+										0 && {
+										$set: {
+											"directConnect.client.host":
+												host,
+										},
+									}),
+									...(!isNil(hubs) && {
+										$set: {
+											"directConnect.client.hubs":
+												hubs,
+										},
+									}),
+								};
+								console.log(JSON.stringify(query, null, 4));
+								break;
 
-					return result;
+							default:
+								return false;
+						}
+
+						// 2. Set up options, filters
+						const options = {
+							upsert: true,
+							setDefaultsOnInsert: true,
+							returnDocument: "after",
+						};
+						const filter = settingsObjectId
+							? { _id: settingsObjectId }
+							: {};
+
+						// 3. Execute the mongo query
+						const result = await Settings.findOneAndUpdate(
+							filter,
+							query,
+							options
+						);
+						return result;
+					} catch (err) {
+						return err;
+					}
 				},
 			},
 			deleteSettings: {
```
`services/socket.service.ts`:

```diff
@@ -1,16 +1,14 @@
 "use strict";
-import { Service, ServiceBroker, ServiceSchema } from "moleculer";
+import { Service, ServiceBroker, ServiceSchema, Context } from "moleculer";
+import { JobType } from "moleculer-bullmq";
 import { createClient } from "redis";
 import { createAdapter } from "@socket.io/redis-adapter";
+import Session from "../models/session.model";
+import { pubClient, subClient } from "../config/redis.config";
+const { MoleculerError } = require("moleculer").Errors;
 const SocketIOService = require("moleculer-io");
 const redisURL = new URL(process.env.REDIS_URI);
 // console.log(redisURL.hostname);
+const { v4: uuidv4 } = require("uuid");
 
-const pubClient = createClient({ url: `redis://${redisURL.hostname}:6379` });
-(async () => {
-	await pubClient.connect();
-})();
-const subClient = pubClient.duplicate();
 export default class SocketService extends Service {
 	// @ts-ignore
 	public constructor(
@@ -28,45 +26,7 @@ export default class SocketService extends Service {
 				"/": {
 					events: {
 						call: {
-							// whitelist: ["math.*", "say.*", "accounts.*", "rooms.*", "io.*"],
-						},
-						action: async (data, ack) => {
-							// write your handler function here.
-							switch (data.type) {
-								case "LS_IMPORT":
-									console.log(
-										`Recieved ${data.type} event.`
-									);
-									// 1. Send task to queue
-									await this.broker.call(
-										"library.newImport",
-										data.data,
-										{}
-									);
-									break;
-								case "LS_TOGGLE_IMPORT_QUEUE":
-									await this.broker.call(
-										"importqueue.toggleImportQueue",
-										data.data,
-										{}
-									);
-									break;
-								case "LS_SINGLE_IMPORT":
-									console.info(
-										"AirDC++ finished a download -> "
-									);
-									console.log(data);
-									await this.broker.call(
-										"library.importDownloadedComic",
-										{ bundle: data },
-										{}
-									);
-									break;
-								// uncompress archive events
-								case "COMICBOOK_EXTRACTION_SUCCESS":
-									console.log(data);
-									return data;
-							}
+							whitelist: ["socket.*"],
 						},
 					},
 				},
@@ -77,12 +37,113 @@ export default class SocketService extends Service {
 				},
 			},
 			hooks: {},
-			actions: {},
+			actions: {
+				resumeSession: async (ctx: Context<{ sessionId: string }>) => {
+					const { sessionId } = ctx.params;
+					console.log("Attempting to resume session...");
+					try {
+						const sessionRecord = await Session.find({
+							sessionId,
+						});
+						// 1. Check for sessionId's existence, and a match
+						if (
+							sessionRecord.length !== 0 &&
+							sessionRecord[0].sessionId === sessionId
+						) {
+							// 2. Find if the queue has active, paused or waiting jobs
+							const jobs: JobType = await this.broker.call(
+								"jobqueue.getJobCountsByType",
+								{}
+							);
+							const { active, paused, waiting } = jobs;
+
+							if (active > 0 || paused > 0 || waiting > 0) {
+								// 3. Get job counts
+								const completedJobCount = await pubClient.get(
+									"completedJobCount"
+								);
+								const failedJobCount = await pubClient.get(
+									"failedJobCount"
+								);
+
+								// 4. Send the counts to the active socket.io session
+								await this.broker.call("socket.broadcast", {
+									namespace: "/",
+									event: "RESTORE_JOB_COUNTS_AFTER_SESSION_RESTORATION",
+									args: [
+										{
+											completedJobCount,
+											failedJobCount,
+											queueStatus: "running",
+										},
+									],
+								});
+							}
+						}
+					} catch (err) {
+						throw new MoleculerError(
+							err,
+							500,
+							"SESSION_ID_NOT_FOUND",
+							{
+								data: sessionId,
+							}
+						);
+					}
+				},
+
+				setQueueStatus: async (
+					ctx: Context<{
+						queueAction: string;
+						queueStatus: string;
+					}>
+				) => {
+					const { queueAction } = ctx.params;
+					await this.broker.call(
+						"jobqueue.toggle",
+						{ action: queueAction },
+						{}
+					);
+				},
+				importSingleIssue: async (ctx: Context<{}>) => {
+					console.info("AirDC++ finished a download -> ");
+					console.log(ctx.params);
+					// await this.broker.call(
+					// 	"library.importDownloadedComic",
+					// 	{ bundle: data },
+					// 	{}
+					// );
+				},
+			},
 			methods: {},
 			async started() {
-				this.io.on("connection", (data) =>
-					console.log("socket.io server initialized.")
-				);
+				this.io.on("connection", async (socket) => {
+					console.log(
+						`socket.io server connected to client with session ID: ${socket.id}`
+					);
+					console.log("Looking up sessionId in Mongo...");
+					const sessionIdExists = await Session.find({
+						sessionId: socket.handshake.query.sessionId,
+					});
+					// 1. if sessionId isn't found in Mongo, create one and persist it
+					if (sessionIdExists.length === 0) {
+						console.log(
+							`Socket Id ${socket.id} not found in Mongo, creating a new session...`
+						);
+						const sessionId = uuidv4();
+						socket.sessionId = sessionId;
+						console.log(`Saving session ${sessionId} to Mongo...`);
+						await Session.create({
+							sessionId,
+							socketId: socket.id,
+						});
+						socket.emit("sessionInitialized", sessionId);
+					}
+					// 2. else, retrieve it from Mongo and "resume" the socket.io connection
+					else {
+						console.log(`Found socketId ${socket.id}, no-op.`);
+					}
+				});
 			},
 		});
 	}
```
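
On the client side, the session flow above implies a handshake like the following: connect with any previously stored `sessionId`, store the one the server mints otherwise, then ask the server to resume. This is a sketch under those assumptions — the endpoint URL and storage choice are illustrative — using the standard `socket.io-client` API and `moleculer-io`'s `call` event (note the `socket.*` whitelist above):

```ts
import { io } from "socket.io-client";

// Reuse a previously issued sessionId if we have one.
const stored = localStorage.getItem("sessionId");
const socket = io("http://localhost:3000", {
	query: stored ? { sessionId: stored } : {},
});

// First visit: the server creates a session and emits its id.
socket.on("sessionInitialized", (sessionId: string) => {
	localStorage.setItem("sessionId", sessionId);
});

// Returning visit: ask the server to replay the job counts.
socket.on("connect", () => {
	if (stored) {
		socket.emit("call", "socket.resumeSession", { sessionId: stored });
	}
});
```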
`services/torrentjobs.service.ts` (new file, 101 lines):

```ts
"use strict";
import axios from "axios";
import {
	Context,
	Service,
	ServiceBroker,
	ServiceSchema,
	Errors,
} from "moleculer";
import { DbMixin } from "../mixins/db.mixin";
import Comic from "../models/comic.model";
const ObjectId = require("mongoose").Types.ObjectId;
import { isNil, isUndefined } from "lodash";
import BullMqMixin from "moleculer-bullmq";
const { MoleculerError } = require("moleculer").Errors;

export default class ImageTransformation extends Service {
	// @ts-ignore
	public constructor(
		public broker: ServiceBroker,
		schema: ServiceSchema<{}> = { name: "imagetransformation" }
	) {
		super(broker);
		this.parseServiceSchema({
			name: "torrentjobs",
			mixins: [DbMixin("comics", Comic), BullMqMixin],
			settings: {
				bullmq: {
					client: process.env.REDIS_URI,
				},
			},
			hooks: {},
			actions: {
				getTorrentData: {
					queue: true,
					rest: "GET /getTorrentData",
					handler: async (ctx: Context<{ trigger: string }>) => {
						const { trigger } = ctx.params;
						console.log(`Recieved ${trigger} as the trigger...`);

						const jobOptions = {
							jobId: "retrieveTorrentData",
							name: "bossy",
							repeat: {
								every: 10000, // Repeat every 10000 ms
								limit: 100, // Limit to 100 repeats
							},
						};

						const job = await this.localQueue(
							ctx,
							"fetchTorrentData",
							ctx.params,
							jobOptions
						);
						return job;
					},
				},
				fetchTorrentData: {
					rest: "GET /fetchTorrentData",
					handler: async (
						ctx: Context<{
							birdName: String;
						}>
					) => {
						const repeatableJob = await this.$resolve(
							"torrentjobs"
						).getRepeatableJobs();
						console.info(repeatableJob);
						console.info(
							`Scheduled job for fetching torrent data fired.`
						);
						// 1. query mongo for infohashes
						const infoHashes = await this.broker.call(
							"library.getInfoHashes",
							{}
						);
						// 2. query qbittorrent to see if they exist
						const torrents: any = await this.broker.call(
							"qbittorrent.getTorrentRealTimeStats",
							{ infoHashes }
						);
						// 4. Emit the LS_COVER_EXTRACTION_FAILED event with the necessary details
						await this.broker.call("socket.broadcast", {
							namespace: "/",
							event: "AS_TORRENT_DATA",
							args: [
								{
									torrents,
								},
							],
						});
						// 3. If they do, don't do anything
						// 4. If they don't purge them from mongo
					},
				},
			},
			methods: {},
		});
	}
}
```
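
`getTorrentData` is the entry point that schedules the repeatable BullMQ job; `fetchTorrentData` then fires every 10 seconds, up to 100 times. A sketch of kicking the poller off once at startup (the trigger string is illustrative):

```ts
import { ServiceBroker } from "moleculer";

const broker = new ServiceBroker();

async function startTorrentPolling() {
	// Schedules the repeatable "fetchTorrentData" job defined above; the
	// fixed jobId "retrieveTorrentData" appears intended to avoid
	// duplicate schedules if this is called more than once.
	await broker.call("torrentjobs.getTorrentData", {
		trigger: "startup", // illustrative
	});
}
```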
`utils/file.utils.ts`:

```diff
@@ -21,7 +21,7 @@ const ALLOWED_IMAGE_FILE_FORMATS = [".jpg", ".jpeg", ".png"];
 // Tell FileMagic where to find the magic.mgc file
 FileMagic.magicFile = require.resolve("@npcz/magic/dist/magic.mgc");
 
-// We can onlu use MAGIC_PRESERVE_ATIME on operating suystems that support
+// We can only use MAGIC_PRESERVE_ATIME on operating suystems that support
 // it and that includes OS X for example. It's a good practice as we don't
 // want to change the last access time because we are just checking the file
 // contents type
@@ -108,6 +108,14 @@ export const isValidImageFileExtension = (fileName: string): boolean => {
 	return includes(ALLOWED_IMAGE_FILE_FORMATS, path.extname(fileName));
 };
 
+/**
+ * This function constructs paths for a target extraction folder and an input file based on extraction
+ * options and a walked folder.
+ * @param {IExtractionOptions} extractionOptions - An object containing options for the extraction
+ * process, such as the target extraction folder.
+ * @param {IFolderData} walkedFolder - `walkedFolder` is an object that represents a folder that has
+ * been walked through during a file extraction process. It contains the following properties:
+ */
 export const constructPaths = (
 	extractionOptions: IExtractionOptions,
 	walkedFolder: IFolderData
@@ -142,7 +150,7 @@ export const getFileConstituents = (filePath: string) => {
 };
 
 /**
- * Method that infers MIME type from a filepath
+ * Method that infers MIME type from a filepath
 * @param {string} filePath
 * @returns {Promise} string
 */
@@ -155,6 +163,15 @@ export const getMimeType = async (filePath: string) => {
 	});
 };
 
+/**
+ * This function creates a directory at a specified path using the fse.ensureDir method and throws an
+ * error if it fails.
+ * @param {any} options - The options parameter is an optional object that can be passed to the
+ * fse.ensureDir method to configure its behavior. It can include properties such as mode, which sets
+ * the permissions of the directory, and fs, which specifies the file system module to use.
+ * @param {string} directoryPath - The `directoryPath` parameter is a string that represents the path
+ * of the directory that needs to be created.
+ */
 export const createDirectory = async (options: any, directoryPath: string) => {
 	try {
 		await fse.ensureDir(directoryPath, options);
```
`utils/uncompression.utils.ts`:

```diff
@@ -47,6 +47,7 @@ import {
 	getMimeType,
 } from "../utils/file.utils";
 import { convertXMLToJSON } from "./xml.utils";
+const { MoleculerError } = require("moleculer").Errors;
 const fse = require("fs-extra");
 const Unrar = require("unrar");
 interface RarFile {
@@ -73,7 +74,7 @@ const errors = [];
 */
 export const extractComicInfoXMLFromRar = async (
 	filePath: string,
-	mimeType: string,
+	mimeType: string
 ): Promise<any> => {
 	try {
 		// Create the target directory
@@ -209,7 +210,7 @@ export const extractComicInfoXMLFromRar = async (
 
 export const extractComicInfoXMLFromZip = async (
 	filePath: string,
-	mimeType: string,
+	mimeType: string
 ): Promise<any> => {
 	try {
 		// Create the target directory
@@ -254,7 +255,7 @@ export const extractComicInfoXMLFromZip = async (
 	// Push the first file (cover) to our extraction target
 	extractionTargets.push(files[0].name);
-	filesToWriteToDisk.coverFile = path.basename(files[0].name);
+	filesToWriteToDisk.coverFile = path.basename(files[0].name);
 
 	if (!isEmpty(comicInfoXMLFileObject)) {
 		filesToWriteToDisk.comicInfoXML = comicInfoXMLFileObject[0].name;
 		extractionTargets.push(filesToWriteToDisk.comicInfoXML);
@@ -356,18 +357,26 @@ export const extractFromArchive = async (filePath: string) => {
 	switch (mimeType) {
 		case "application/x-7z-compressed; charset=binary":
 		case "application/zip; charset=binary":
-			const cbzResult = await extractComicInfoXMLFromZip(filePath, mimeType);
+			const cbzResult = await extractComicInfoXMLFromZip(
+				filePath,
+				mimeType
+			);
 			return Object.assign({}, ...cbzResult);
 
 		case "application/x-rar; charset=binary":
-			const cbrResult = await extractComicInfoXMLFromRar(filePath, mimeType);
+			const cbrResult = await extractComicInfoXMLFromRar(
+				filePath,
+				mimeType
+			);
 			return Object.assign({}, ...cbrResult);
 
 		default:
-			console.log(
+			console.error(
 				"Error inferring filetype for comicinfo.xml extraction."
 			);
-			break;
+			throw new MoleculerError({}, 500, "FILETYPE_INFERENCE_ERROR", {
+				data: { message: "Cannot infer filetype." },
+			});
 	}
 };
```
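
Since `extractFromArchive` now throws a `MoleculerError` instead of silently breaking on unrecognized MIME types, callers need to be prepared to catch it. A brief sketch:

```ts
import { extractFromArchive } from "../utils/uncompression.utils";

async function safeExtract(filePath: string) {
	try {
		return await extractFromArchive(filePath);
	} catch (err: any) {
		// err.type is "FILETYPE_INFERENCE_ERROR" for unrecognized archives
		console.error(`Skipping ${filePath}:`, err.message);
		return null;
	}
}
```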