Compare commits
24 commits: migration-… ... qbittorren…
| SHA1 |
|---|
| b35e2140b5 |
| f053dcb789 |
| aea7a24f76 |
| 8f0c2f4302 |
| 7dbe2b2701 |
| 4cdb11fcbd |
| 78f7c1b595 |
| bbd2906ebf |
| f3965437b5 |
| 78e0e9f8ce |
| c926758db6 |
| b2b35aedc0 |
| f35e3ccbe0 |
| 7b0c0a7420 |
| c2bbbf311d |
| b8ca03220f |
| b87b0c875d |
| 11fbaf10db |
| 1229feb69c |
| 3efdc7c2e2 |
| 1fff931941 |
| f4e2db5a5f |
| 1d7561279b |
| 9e47ae0436 |
README.md (24 changes)
@@ -10,14 +10,28 @@ This [moleculer-based](https://github.com/moleculerjs/moleculer-web) microservic
 
 ## Local Development
 
-1. ~~You need `calibre` in your local path.
-   On `macOS` you can `brew install calibre` and make sure that `ebook-meta` is present on the path~~ Calibre is no longer required as a dependency. Ignore this step.
-2. You need `mongo` for the data store. on `macOS` you can use [these instructions](https://docs.mongodb.com/manual/tutorial/install-mongodb-on-os-x/) to install it
+1. You need the following dependencies installed: `mongo`, `elasticsearch` and `redis`
+2. You also need binaries for `unrar` and `p7zip`
 3. Clone this repo
 4. Run `npm i`
-5. Assuming you installed mongo correctly, run `MONGO_URI=mongodb://localhost:27017/threetwo npm run dev` to start the service
+5. Assuming you installed the dependencies correctly, run:
+
+   ```
+   COMICS_DIRECTORY=<PATH_TO_COMICS_DIRECTORY> \
+   USERDATA_DIRECTORY=<PATH_TO_USERDATA_DIRECTORY> \
+   REDIS_URI=redis://<REDIS_HOST:REDIS_PORT> \
+   ELASTICSEARCH_URI=<ELASTICSEARCH_HOST:ELASTICSEARCH_PORT> \
+   MONGO_URI=mongodb://<MONGO_HOST:MONGO_PORT>/threetwo \
+   UNRAR_BIN_PATH=<UNRAR_BIN_PATH> \
+   SEVENZ_BINARY_PATH=<SEVENZ_BINARY_PATH> \
+   npm run dev
+   ```
+
+   to start the service
+
 6. You should see the service spin up and a list of all the endpoints in the terminal
-7. The service can be accessed through `http://localhost:3000/api/import/*`
+7. The service can be accessed through `http://localhost:3000/api/<serviceName>/*`
 
 ## Docker Instructions
 
 1. Build the image using `docker build . -t frishi/threetwo-import-service`. Give it a hot minute.
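The new run command threads seven environment variables into the service. As a sketch (not code from this PR), a startup guard along these lines makes a missing variable fail fast instead of surfacing later as a connection error:

```ts
// Hypothetical startup guard: every name below comes from the README's
// run command; the guard itself is an illustrative addition.
const REQUIRED_ENV_VARS = [
	"COMICS_DIRECTORY",
	"USERDATA_DIRECTORY",
	"REDIS_URI",
	"ELASTICSEARCH_URI",
	"MONGO_URI",
	"UNRAR_BIN_PATH",
	"SEVENZ_BINARY_PATH",
] as const;

const missing = REQUIRED_ENV_VARS.filter((name) => !process.env[name]);
if (missing.length > 0) {
	throw new Error(`Missing required environment variables: ${missing.join(", ")}`);
}
```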
@@ -2,7 +2,7 @@ const paginate = require("mongoose-paginate-v2");
 const { Client } = require("@elastic/elasticsearch");
 import ComicVineMetadataSchema from "./comicvine.metadata.model";
 import { mongoosastic } from "mongoosastic-ts";
-const mongoose = require("mongoose")
+const mongoose = require("mongoose");
 import {
 	MongoosasticDocument,
 	MongoosasticModel,
@@ -28,6 +28,10 @@ const RawFileDetailsSchema = mongoose.Schema({
 	mimeType: String,
 	containedIn: String,
 	pageCount: Number,
+	archive: {
+		uncompressed: Boolean,
+		expandedPath: String,
+	},
 	cover: {
 		filePath: String,
 		stats: Object,
@@ -111,12 +115,13 @@ const ComicSchema = mongoose.Schema(
 				default: [],
 			},
 		},
-		torrent: {
-			sourceApplication: String,
-			magnet: String,
-			tracker: String,
-			status: String,
+		torrent: [
+			{
+				infoHash: String,
+				name: String,
+				announce: [String],
 			},
+		],
 		usenet: {
 			sourceApplication: String,
 		},
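The `torrent` field moves from a single object (`sourceApplication`, `magnet`, `tracker`, `status`) to an array of per-torrent entries. The mongoose schema is untyped, so the following TypeScript shape is illustrative only:

```ts
// Assumed TypeScript shape of the reworked acquisition.torrent field;
// the mongoose schema above is plain JavaScript, so this interface is
// an illustration, not code from the PR.
interface TorrentAcquisition {
	infoHash: string; // used later by getInfoHashes and the qbittorrent lookup
	name: string; // display name of the torrent
	announce: string[]; // tracker announce URLs
}

interface Acquisition {
	torrent: TorrentAcquisition[]; // was a single object before this change
	usenet: { sourceApplication?: string };
}
```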
@@ -1,21 +1,34 @@
 const mongoose = require("mongoose");
 const paginate = require("mongoose-paginate-v2");
 
-const SettingsScehma = mongoose.Schema({
-	directConnect: {
-		client: {
-			host: {
+const HostSchema = mongoose.Schema({
+	_id: false,
 	username: String,
 	password: String,
 	hostname: String,
 	port: String,
 	protocol: String,
-			},
+});
+
+const SettingsScehma = mongoose.Schema({
+	directConnect: {
+		client: {
+			host: HostSchema,
 			airDCPPUserSettings: Object,
 
 			hubs: Array,
 		},
 	},
+	bittorrent: {
+		client: {
+			name: String,
+			host: HostSchema,
+		},
+	},
+	prowlarr: {
+		client: {
+			host: HostSchema,
+			apiKey: String,
+		},
+	},
 });
 
 const Settings = mongoose.model("Settings", SettingsScehma);
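Factoring the connection fields into `HostSchema` lets `directConnect`, `bittorrent`, and `prowlarr` share one endpoint shape. An illustrative type for it (the schema itself stays plain mongoose):

```ts
// Illustrative type mirroring the extracted HostSchema. `_id: false` in the
// schema stops mongoose from giving each embedded host its own ObjectId.
interface Host {
	username: string;
	password: string;
	hostname: string;
	port: string; // stored as a string in the schema, not a number
	protocol: string; // e.g. "http" or "https" (assumed values)
}
```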
package-lock.json (generated, 861 changes): file diff suppressed because it is too large.
@@ -7,6 +7,8 @@ import {
 	ServiceSchema,
 	Errors,
 } from "moleculer";
+import { DbMixin } from "../mixins/db.mixin";
+import Comic from "../models/comic.model";
 import path from "path";
 import {
 	analyze,
@@ -22,16 +24,13 @@ export default class ImageTransformation extends Service {
 		super(broker);
 		this.parseServiceSchema({
 			name: "imagetransformation",
-			mixins: [],
+			mixins: [DbMixin("comics", Comic)],
 			settings: {
 				// Available fields in the responses
-				fields: ["_id", "name", "quantity", "price"],
+				fields: ["_id"],
 
 				// Validator for the `create` & `insert` actions.
-				entityValidator: {
-					name: "string|min:3",
-					price: "number|positive",
-				},
+				entityValidator: {},
 			},
 			hooks: {},
 			actions: {
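Three services now mix in `DbMixin("comics", Comic)`. The mixin's source is not part of this diff; judging only from the call shape, the factory presumably looks something like this sketch:

```ts
// Sketch of the factory shape implied by DbMixin("comics", Comic). The real
// mixins/db.mixin.ts is not shown in this diff, so everything here is assumed.
import type { ServiceSchema } from "moleculer";

export function DbMixin(collection: string, model: unknown): Partial<ServiceSchema> {
	return {
		settings: { collection },
		created() {
			// Expose the mongoose model to the service's actions.
			(this as any).model = model;
		},
	};
}
```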
@@ -2,10 +2,16 @@ import { Context, Service, ServiceBroker } from "moleculer";
 import JobResult from "../models/jobresult.model";
 import { refineQuery } from "filename-parser";
 import BullMqMixin from "moleculer-bullmq";
-import { extractFromArchive } from "../utils/uncompression.utils";
+import { DbMixin } from "../mixins/db.mixin";
+import Comic from "../models/comic.model";
+const ObjectId = require("mongoose").Types.ObjectId;
+import {
+	extractFromArchive,
+	uncompressEntireArchive,
+} from "../utils/uncompression.utils";
 import { isNil, isUndefined } from "lodash";
 import { pubClient } from "../config/redis.config";
+import path from "path";
 const { MoleculerError } = require("moleculer").Errors;
 
 console.log(process.env.REDIS_URI);
@@ -15,7 +21,7 @@ export default class JobQueueService extends Service {
 		this.parseServiceSchema({
 			name: "jobqueue",
 			hooks: {},
-			mixins: [BullMqMixin],
+			mixins: [DbMixin("comics", Comic), BullMqMixin],
 			settings: {
 				bullmq: {
 					client: process.env.REDIS_URI,
@@ -44,19 +50,30 @@ export default class JobQueueService extends Service {
 					}
 				},
 			},
 
 			enqueue: {
 				queue: true,
-				rest: "/GET enqueue",
-				handler: async (ctx: Context<{}>) => {
+				rest: "GET /enqueue",
+				handler: async (
+					ctx: Context<{ action: string; description: string }>
+				) => {
+					const { action, description } = ctx.params;
 					// Enqueue the job
-					const job = await this.localQueue(ctx, "enqueue.async", ctx.params, {
+					const job = await this.localQueue(
+						ctx,
+						action,
+						ctx.params,
+						{
 						priority: 10,
-					});
+						}
+					);
 					console.log(`Job ${job.id} enqueued`);
+					console.log(`${description}`);
+
 					return job.id;
 				},
 			},
 
 			// Comic Book Import Job Queue
 			"enqueue.async": {
 				handler: async (
@@ -65,13 +82,16 @@ export default class JobQueueService extends Service {
 				}>
 			) => {
 				try {
-					console.log(`Recieved Job ID ${ctx.locals.job.id}, processing...`);
-					console.log(ctx.params);
+					console.log(
+						`Recieved Job ID ${ctx.locals.job.id}, processing...`
+					);
 					// 1. De-structure the job params
 					const { fileObject } = ctx.locals.job.data.params;
 
 					// 2. Extract metadata from the archive
-					const result = await extractFromArchive(fileObject.filePath);
+					const result = await extractFromArchive(
+						fileObject.filePath
+					);
 					const {
 						name,
 						filePath,
@@ -84,7 +104,9 @@ export default class JobQueueService extends Service {
 					} = result;
 
 					// 3a. Infer any issue-related metadata from the filename
-					const { inferredIssueDetails } = refineQuery(result.name);
+					const { inferredIssueDetails } = refineQuery(
+						result.name
+					);
 					console.log(
 						"Issue metadata inferred: ",
 						JSON.stringify(inferredIssueDetails, null, 2)
@@ -124,7 +146,8 @@ export default class JobQueueService extends Service {
 						// "acquisition.directconnect.downloads": [],
 
 						// mark the metadata source
-						"acquisition.source.name": ctx.locals.job.data.params.sourcedFrom,
+						"acquisition.source.name":
+							ctx.locals.job.data.params.sourcedFrom,
 					};
 
 					// 3c. Add the bundleId, if present to the payload
@@ -135,8 +158,13 @@ export default class JobQueueService extends Service {
 
 					// 3d. Add the sourcedMetadata, if present
 					if (
-						!isNil(ctx.locals.job.data.params.sourcedMetadata) &&
-						!isUndefined(ctx.locals.job.data.params.sourcedMetadata.comicvine)
+						!isNil(
+							ctx.locals.job.data.params.sourcedMetadata
+						) &&
+						!isUndefined(
+							ctx.locals.job.data.params.sourcedMetadata
+								.comicvine
+						)
 					) {
 						Object.assign(
 							payload.sourcedMetadata,
@@ -145,11 +173,15 @@ export default class JobQueueService extends Service {
 					}
 
 					// 4. write to mongo
-					const importResult = await this.broker.call("library.rawImportToDB", {
-						importType: ctx.locals.job.data.params.importType,
+					const importResult = await this.broker.call(
+						"library.rawImportToDB",
+						{
+							importType:
+								ctx.locals.job.data.params.importType,
 							bundleId,
 							payload,
-					});
+						}
+					);
 					return {
 						data: {
 							importResult,
@@ -161,9 +193,14 @@ export default class JobQueueService extends Service {
 					console.error(
 						`An error occurred processing Job ID ${ctx.locals.job.id}`
 					);
-					throw new MoleculerError(error, 500, "IMPORT_JOB_ERROR", {
+					throw new MoleculerError(
+						error,
+						500,
+						"IMPORT_JOB_ERROR",
+						{
 							data: ctx.params.sessionId,
-					});
+						}
+					);
 				}
 			},
 		},
@@ -191,7 +228,8 @@ export default class JobQueueService extends Service {
 				statuses: {
 					$push: {
 						status: "$_id.status",
-						earliestTimestamp: "$earliestTimestamp",
+						earliestTimestamp:
+							"$earliestTimestamp",
 						count: "$count",
 					},
 				},
@@ -211,7 +249,10 @@ export default class JobQueueService extends Service {
 						{
 							$cond: [
 								{
-									$eq: ["$$this.status", "completed"],
+									$eq: [
+										"$$this.status",
+										"completed",
+									],
 								},
 								"$$this.count",
 								0,
@@ -231,7 +272,10 @@ export default class JobQueueService extends Service {
 						{
 							$cond: [
 								{
-									$eq: ["$$this.status", "failed"],
+									$eq: [
+										"$$this.status",
+										"failed",
+									],
 								},
 								"$$this.count",
 								0,
@@ -249,9 +293,75 @@ export default class JobQueueService extends Service {
 					]);
 				},
 			},
+			"uncompressFullArchive.async": {
+				rest: "POST /uncompressFullArchive",
+				handler: async (
+					ctx: Context<{
+						filePath: string;
+						comicObjectId: string;
+						options: any;
+					}>
+				) => {
+					console.log(
+						`Recieved Job ID ${JSON.stringify(
+							ctx.locals
+						)}, processing...`
+					);
+					const { filePath, options, comicObjectId } = ctx.params;
+					const comicId = new ObjectId(comicObjectId);
+					// 2. Extract metadata from the archive
+					const result: string[] = await uncompressEntireArchive(
+						filePath,
+						options
+					);
+					if (Array.isArray(result) && result.length !== 0) {
+						// Get the containing directory of the uncompressed archive
+						const directoryPath = path.dirname(result[0]);
+						// Add to mongo object
+						await Comic.findByIdAndUpdate(
+							comicId,
+							{
+								$set: {
+									"rawFileDetails.archive": {
+										uncompressed: true,
+										expandedPath: directoryPath,
+									},
+								},
+							},
+							{ new: true, safe: true, upsert: true }
+						);
+						return result;
+					}
+				},
+			},
 		},
 
 		events: {
+			async "uncompressFullArchive.async.active"(
+				ctx: Context<{ id: number }>
+			) {
+				console.log(
+					`Uncompression Job ID ${ctx.params.id} is set to active.`
+				);
+			},
+			async "uncompressFullArchive.async.completed"(
+				ctx: Context<{ id: number }>
+			) {
+				console.log(
+					`Uncompression Job ID ${ctx.params.id} completed.`
+				);
+				const job = await this.job(ctx.params.id);
+				await this.broker.call("socket.broadcast", {
+					namespace: "/",
+					event: "LS_UNCOMPRESSION_JOB_COMPLETE",
+					args: [
+						{
+							uncompressedArchive: job.returnvalue,
+						},
+					],
+				});
+				return job.returnvalue;
+			},
 			// use the `${QUEUE_NAME}.QUEUE_EVENT` scheme
 			async "enqueue.async.active"(ctx: Context<{ id: Number }>) {
 				console.log(`Job ID ${ctx.params.id} is set to active.`);
@@ -260,10 +370,10 @@ export default class JobQueueService extends Service {
 			console.log("Queue drained.");
 			await this.broker.call("socket.broadcast", {
 				namespace: "/",
-				event: "action",
+				event: "LS_IMPORT_QUEUE_DRAINED",
 				args: [
 					{
-						type: "LS_IMPORT_QUEUE_DRAINED",
+						message: "drained",
 					},
 				],
 			});
@@ -274,14 +384,15 @@ export default class JobQueueService extends Service {
 			// 2. Increment the completed job counter
 			await pubClient.incr("completedJobCount");
 			// 3. Fetch the completed job count for the final payload to be sent to the client
-			const completedJobCount = await pubClient.get("completedJobCount");
+			const completedJobCount = await pubClient.get(
+				"completedJobCount"
+			);
 			// 4. Emit the LS_COVER_EXTRACTED event with the necessary details
 			await this.broker.call("socket.broadcast", {
 				namespace: "/",
-				event: "action",
+				event: "LS_COVER_EXTRACTED",
 				args: [
 					{
-						type: "LS_COVER_EXTRACTED",
 						completedJobCount,
 						importResult: job.returnvalue.data.importResult,
 					},
@@ -302,7 +413,9 @@ export default class JobQueueService extends Service {
 		async "enqueue.async.failed"(ctx) {
 			const job = await this.job(ctx.params.id);
 			await pubClient.incr("failedJobCount");
-			const failedJobCount = await pubClient.get("failedJobCount");
+			const failedJobCount = await pubClient.get(
+				"failedJobCount"
+			);
 
 			await JobResult.create({
 				id: ctx.params.id,
@@ -315,10 +428,9 @@ export default class JobQueueService extends Service {
 			// 4. Emit the LS_COVER_EXTRACTION_FAILED event with the necessary details
 			await this.broker.call("socket.broadcast", {
 				namespace: "/",
-				event: "action",
+				event: "LS_COVER_EXTRACTION_FAILED",
 				args: [
 					{
-						type: "LS_COVER_EXTRACTION_FAILED",
 						failedJobCount,
 						importResult: job,
 					},
@@ -326,6 +438,7 @@ export default class JobQueueService extends Service {
 			});
 			},
 		},
+		methods: {},
 		});
 	}
 }
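`enqueue` now routes to whatever queue action the caller names instead of the hard-coded `"enqueue.async"`. The two call shapes that appear later in this diff, with illustrative values:

```ts
import { ServiceBroker } from "moleculer";

// All parameter values below are illustrative; the two shapes are the ones
// used by the import/library service elsewhere in this diff.
async function enqueueExamples(broker: ServiceBroker): Promise<void> {
	// Shape used when importing a newly discovered comic file:
	await broker.call("jobqueue.enqueue", {
		fileObject: { filePath: "/comics/example.cbz" }, // illustrative
		sessionId: "some-session-id", // illustrative
		importType: "new",
		action: "enqueue.async",
	});

	// Shape used by uncompressFullArchive. Note it passes `queueName` while
	// the enqueue handler destructures `action`, so the two names need to be
	// reconciled for the job to route to "uncompressFullArchive.async".
	await broker.call("jobqueue.enqueue", {
		filePath: "/comics/example.cbz", // illustrative
		comicObjectId: "64b0c0ffee0ddba11ca11ab1", // illustrative 24-hex id
		options: {},
		queueName: "uncompressFullArchive.async",
		description: "Job for uncompressing archive at /comics/example.cbz",
	});
}
```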
@@ -33,7 +33,13 @@ SOFTWARE.
 
 "use strict";
 import { isNil } from "lodash";
-import { Context, Service, ServiceBroker, ServiceSchema, Errors } from "moleculer";
+import {
+	Context,
+	Service,
+	ServiceBroker,
+	ServiceSchema,
+	Errors,
+} from "moleculer";
 import { DbMixin } from "../mixins/db.mixin";
 import Comic from "../models/comic.model";
 import { walkFolder, getSizeOfDirectory } from "../utils/file.utils";
@@ -74,14 +80,19 @@ export default class ImportService extends Service {
 			},
 			walkFolders: {
 				rest: "POST /walkFolders",
-				params: {
-					basePathToWalk: "string",
-				},
-				async handler(ctx: Context<{ basePathToWalk: string }>) {
+				params: {},
+				async handler(
+					ctx: Context<{
+						basePathToWalk: string;
+						extensions: string[];
+					}>
+				) {
+					console.log(ctx.params);
 					return await walkFolder(ctx.params.basePathToWalk, [
 						".cbz",
 						".cbr",
 						".cb7",
+						...ctx.params.extensions,
 					]);
 				},
 			},
@@ -95,10 +106,19 @@ export default class ImportService extends Service {
 			uncompressFullArchive: {
 				rest: "POST /uncompressFullArchive",
 				params: {},
-				handler: async (ctx: Context<{ filePath: string; options: any }>) => {
-					await broker.call("importqueue.uncompressResize", {
+				handler: async (
+					ctx: Context<{
+						filePath: string;
+						comicObjectId: string;
+						options: any;
+					}>
+				) => {
+					this.broker.call("jobqueue.enqueue", {
 						filePath: ctx.params.filePath,
+						comicObjectId: ctx.params.comicObjectId,
 						options: ctx.params.options,
+						queueName: "uncompressFullArchive.async",
+						description: `Job for uncompressing archive at ${ctx.params.filePath}`,
 					});
 				},
 			},
@@ -113,7 +133,8 @@ export default class ImportService extends Service {
 				});
 				// Determine source where the comic was added from
 				// and gather identifying information about it
-				const sourceName = referenceComicObject[0].acquisition.source.name;
+				const sourceName =
+					referenceComicObject[0].acquisition.source.name;
 				const { sourcedMetadata } = referenceComicObject[0];
 
 				const filePath = `${COMICS_DIRECTORY}/${ctx.params.bundle.data.name}`;
@@ -157,8 +178,14 @@ export default class ImportService extends Service {
 				// 1.1 Filter on .cb* extensions
 				.pipe(
 					through2.obj(function (item, enc, next) {
-						let fileExtension = path.extname(item.path);
-						if ([".cbz", ".cbr", ".cb7"].includes(fileExtension)) {
+						let fileExtension = path.extname(
+							item.path
+						);
+						if (
+							[".cbz", ".cbr", ".cb7"].includes(
+								fileExtension
+							)
+						) {
 							this.push(item);
 						}
 						next();
@@ -167,7 +194,10 @@ export default class ImportService extends Service {
 				// 1.2 Pipe filtered results to the next step
 				// Enqueue the job in the queue
 				.on("data", async (item) => {
-					console.info("Found a file at path: %s", item.path);
+					console.info(
+						"Found a file at path: %s",
+						item.path
+					);
 					let comicExists = await Comic.exists({
 						"rawFileDetails.name": `${path.basename(
 							item.path,
@@ -176,8 +206,14 @@ export default class ImportService extends Service {
 					});
 					if (!comicExists) {
 						// 2.1 Reset the job counters in Redis
-						await pubClient.set("completedJobCount", 0);
-						await pubClient.set("failedJobCount", 0);
+						await pubClient.set(
+							"completedJobCount",
+							0
+						);
+						await pubClient.set(
+							"failedJobCount",
+							0
+						);
 						// 2.2 Send the extraction job to the queue
 						this.broker.call("jobqueue.enqueue", {
 							fileObject: {
@@ -186,9 +222,12 @@ export default class ImportService extends Service {
 							},
 							sessionId,
 							importType: "new",
+							action: "enqueue.async",
 						});
 					} else {
-						console.log("Comic already exists in the library.");
+						console.log(
+							"Comic already exists in the library."
+						);
 					}
 				})
 				.on("end", () => {
@@ -240,21 +279,31 @@ export default class ImportService extends Service {
 				// we solicit volume information and add that to mongo
 				if (
 					comicMetadata.sourcedMetadata.comicvine &&
-					!isNil(comicMetadata.sourcedMetadata.comicvine.volume)
+					!isNil(
+						comicMetadata.sourcedMetadata.comicvine
+							.volume
+					)
 				) {
-					volumeDetails = await this.broker.call("comicvine.getVolumes", {
+					volumeDetails = await this.broker.call(
+						"comicvine.getVolumes",
+						{
 						volumeURI:
-							comicMetadata.sourcedMetadata.comicvine.volume
+							comicMetadata.sourcedMetadata
+								.comicvine.volume
 								.api_detail_url,
-					});
+						}
+					);
 					comicMetadata.sourcedMetadata.comicvine.volumeInformation =
 						volumeDetails.results;
 				}
 
 				console.log("Saving to Mongo...");
-				console.log(`Import type: [${ctx.params.importType}]`);
+				console.log(
+					`Import type: [${ctx.params.importType}]`
+				);
 				switch (ctx.params.importType) {
 					case "new":
+						console.log(comicMetadata);
 						return await Comic.create(comicMetadata);
 					case "update":
 						return await Comic.findOneAndUpdate(
@@ -273,7 +322,10 @@ export default class ImportService extends Service {
 				}
 			} catch (error) {
 				console.log(error);
-				throw new Errors.MoleculerError("Import failed.", 500);
+				throw new Errors.MoleculerError(
+					"Import failed.",
+					500
+				);
 			}
 		},
 	},
@@ -291,7 +343,9 @@ export default class ImportService extends Service {
 		) {
 			// 1. Find mongo object by id
 			// 2. Import payload into sourcedMetadata.comicvine
-			const comicObjectId = new ObjectId(ctx.params.comicObjectId);
+			const comicObjectId = new ObjectId(
+				ctx.params.comicObjectId
+			);
 
 			return new Promise(async (resolve, reject) => {
 				let volumeDetails = {};
@@ -300,15 +354,18 @@ export default class ImportService extends Service {
 				const volumeDetails = await this.broker.call(
 					"comicvine.getVolumes",
 					{
-						volumeURI: matchedResult.volume.api_detail_url,
+						volumeURI:
+							matchedResult.volume.api_detail_url,
 					}
 				);
-				matchedResult.volumeInformation = volumeDetails.results;
+				matchedResult.volumeInformation =
+					volumeDetails.results;
 				Comic.findByIdAndUpdate(
 					comicObjectId,
 					{
 						$set: {
-							"sourcedMetadata.comicvine": matchedResult,
+							"sourcedMetadata.comicvine":
+								matchedResult,
 						},
 					},
 					{ new: true },
@@ -339,7 +396,9 @@ export default class ImportService extends Service {
 			}>
 		) {
 			console.log(JSON.stringify(ctx.params, null, 2));
-			const comicObjectId = new ObjectId(ctx.params.comicObjectId);
+			const comicObjectId = new ObjectId(
+				ctx.params.comicObjectId
+			);
 
 			return new Promise((resolve, reject) => {
 				Comic.findByIdAndUpdate(
@@ -366,6 +425,66 @@ export default class ImportService extends Service {
 				});
 			},
 		},
+		applyTorrentDownloadMetadata: {
+			rest: "POST /applyTorrentDownloadMetadata",
+			handler: async (
+				ctx: Context<{
+					torrentToDownload: any;
+					comicObjectId: String;
+					infoHash: String;
+					name: String;
+					announce: [String];
+				}>
+			) => {
+				const {
+					name,
+					torrentToDownload,
+					comicObjectId,
+					announce,
+					infoHash,
+				} = ctx.params;
+				console.log(JSON.stringify(ctx.params, null, 4));
+				try {
+					return await Comic.findByIdAndUpdate(
+						new ObjectId(comicObjectId),
+						{
+							$push: {
+								"acquisition.torrent": {
+									infoHash,
+									name,
+									announce,
+								},
+							},
+						},
+						{ new: true, safe: true, upsert: true }
+					);
+				} catch (err) {
+					console.log(err);
+				}
+			},
+		},
+		getInfoHashes: {
+			rest: "GET /getInfoHashes",
+			handler: async (ctx: Context<{}>) => {
+				try {
+					return await Comic.aggregate([
+						{
+							$unwind: "$acquisition.torrent",
+						},
+						{
+							$group: {
+								_id: "$_id",
+								infoHashes: {
+									$push: "$acquisition.torrent.infoHash",
+								},
+							},
+						},
+					]);
+				} catch (err) {
+					return err;
+				}
+			},
+		},
 		getComicBooks: {
 			rest: "POST /getComicBooks",
 			params: {},
@@ -385,6 +504,7 @@ export default class ImportService extends Service {
 			rest: "POST /getComicBookById",
 			params: { id: "string" },
 			async handler(ctx: Context<{ id: string }>) {
+				console.log(ctx.params.id);
 				return await Comic.findById(ctx.params.id);
 			},
 		},
@@ -393,7 +513,9 @@ export default class ImportService extends Service {
 			params: { ids: "array" },
 			handler: async (ctx: Context<{ ids: [string] }>) => {
 				console.log(ctx.params.ids);
-				const queryIds = ctx.params.ids.map((id) => new ObjectId(id));
+				const queryIds = ctx.params.ids.map(
+					(id) => new ObjectId(id)
+				);
 				return await Comic.find({
 					_id: {
 						$in: queryIds,
@@ -409,7 +531,8 @@ export default class ImportService extends Service {
 			const volumes = await Comic.aggregate([
 				{
 					$project: {
-						volumeInfo: "$sourcedMetadata.comicvine.volumeInformation",
+						volumeInfo:
+							"$sourcedMetadata.comicvine.volumeInformation",
 					},
 				},
 				{
@@ -455,7 +578,8 @@ export default class ImportService extends Service {
 			const { queryObjects } = ctx.params;
 			// construct the query for ElasticSearch
 			let elasticSearchQuery = {};
-			const elasticSearchQueries = queryObjects.map((queryObject) => {
+			const elasticSearchQueries = queryObjects.map(
+				(queryObject) => {
 				console.log("Volume: ", queryObject.volumeName);
 				console.log("Issue: ", queryObject.issueName);
 				if (queryObject.issueName === null) {
@@ -469,12 +593,14 @@ export default class ImportService extends Service {
 					must: [
 						{
 							match_phrase: {
-								"rawFileDetails.name": queryObject.volumeName,
+								"rawFileDetails.name":
+									queryObject.volumeName,
 							},
 						},
 						{
 							term: {
-								"inferredMetadata.issue.number": parseInt(
+								"inferredMetadata.issue.number":
+									parseInt(
 									queryObject.issueNumber,
 									10
 								),
@@ -493,8 +619,11 @@ export default class ImportService extends Service {
 					query: elasticSearchQuery,
 				},
 			];
-			});
-			console.log(JSON.stringify(elasticSearchQueries, null, 2));
+				}
+			);
+			console.log(
+				JSON.stringify(elasticSearchQueries, null, 2)
+			);
 
 			return await ctx.broker.call("search.searchComic", {
 				elasticSearchQueries,
@@ -507,11 +636,10 @@ export default class ImportService extends Service {
 			rest: "GET /libraryStatistics",
 			params: {},
 			handler: async (ctx: Context<{}>) => {
-				const comicDirectorySize = await getSizeOfDirectory(COMICS_DIRECTORY, [
-					".cbz",
-					".cbr",
-					".cb7",
-				]);
+				const comicDirectorySize = await getSizeOfDirectory(
+					COMICS_DIRECTORY,
+					[".cbz", ".cbr", ".cb7"]
+				);
+
 				const totalCount = await Comic.countDocuments({});
 				const statistics = await Comic.aggregate([
 					{
@@ -520,7 +648,11 @@ export default class ImportService extends Service {
 					{
 						$match: {
 							"rawFileDetails.extension": {
-								$in: [".cbr", ".cbz", ".cb7"],
+								$in: [
+									".cbr",
+									".cbz",
+									".cb7",
+								],
 							},
 						},
 					},
@@ -534,7 +666,8 @@ export default class ImportService extends Service {
 					issues: [
 						{
 							$match: {
-								"sourcedMetadata.comicvine.volumeInformation": {
+								"sourcedMetadata.comicvine.volumeInformation":
+									{
 									$gt: {},
 								},
 							},
@@ -599,13 +732,20 @@ export default class ImportService extends Service {
 				.drop()
 				.then(async (data) => {
 					console.info(data);
-					const coversFolderDeleteResult = fsExtra.emptyDirSync(
-						path.resolve(`${USERDATA_DIRECTORY}/covers`)
-					);
-					const expandedFolderDeleteResult = fsExtra.emptyDirSync(
-						path.resolve(`${USERDATA_DIRECTORY}/expanded`)
-					);
-					const eSIndicesDeleteResult = await ctx.broker.call(
+					const coversFolderDeleteResult =
+						fsExtra.emptyDirSync(
+							path.resolve(
+								`${USERDATA_DIRECTORY}/covers`
+							)
+						);
+					const expandedFolderDeleteResult =
+						fsExtra.emptyDirSync(
+							path.resolve(
+								`${USERDATA_DIRECTORY}/expanded`
+							)
+						);
+					const eSIndicesDeleteResult =
+						await ctx.broker.call(
 							"search.deleteElasticSearchIndices",
 							{}
 						);
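`applyTorrentDownloadMetadata` pushes `{ infoHash, name, announce }` entries onto `acquisition.torrent`, and `getInfoHashes` regroups them per comic. The aggregation's result shape, inferred from the `$unwind`/`$group` pipeline above (values illustrative):

```ts
// Expected result shape of library.getInfoHashes, inferred from the
// $unwind + $group pipeline; the values are illustrative.
type InfoHashesByComic = Array<{
	_id: string; // the comic's ObjectId
	infoHashes: string[]; // one entry per element of acquisition.torrent
}>;

const example: InfoHashesByComic = [
	{
		_id: "64b0c0ffee0ddba11ca11ab1",
		infoHashes: ["a94a8fe5ccb19ba61c4c0873d391e987982fbbd3"],
	},
];
```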
@@ -75,9 +75,9 @@ export default class SettingsService extends Service {
 			) => {
 				try {
 					console.log(ctx.params);
-					const { query, pagination } = ctx.params;
+					const { query, pagination, type } = ctx.params;
 					let eSQuery = {};
-					switch (ctx.params.type) {
+					switch (type) {
 						case "all":
 							Object.assign(eSQuery, {
 								match_all: {},
@@ -8,7 +8,7 @@ import {
 } from "moleculer";
 import { DbMixin } from "../mixins/db.mixin";
 import Settings from "../models/settings.model";
-import { isEmpty, pickBy, identity, map } from "lodash";
+import { isEmpty, pickBy, identity, map, isNil } from "lodash";
 const ObjectId = require("mongoose").Types.ObjectId;
 
 export default class SettingsService extends Service {
@@ -28,12 +28,31 @@ export default class SettingsService extends Service {
 			rest: "GET /getAllSettings",
 			params: {},
 			async handler(ctx: Context<{ settingsKey: string }>) {
-				const settings = await Settings.find({});
-				if (isEmpty(settings)) {
+				const { settingsKey } = ctx.params;
+
+				// Initialize a projection object. Include everything by default.
+				let projection = settingsKey
+					? { _id: 0, [settingsKey]: 1 }
+					: {};
+
+				// Find the settings with the dynamic projection
+				const settings = await Settings.find({}, projection);
+
+				if (settings.length === 0) {
 					return {};
 				}
-				console.log(settings[0]);
+
+				// If settingsKey is provided, return the specific part of the settings.
+				// Otherwise, return the entire settings document.
+				if (settingsKey) {
+					// Check if the specific key exists in the settings document.
+					// Since `settings` is an array, we access the first element.
+					// Then, we use the settingsKey to return only that part of the document.
+					return settings[0][settingsKey] || {};
+				} else {
+					// Return the entire settings document
 					return settings[0];
+				}
 			},
 		},
 
@@ -42,44 +61,106 @@ export default class SettingsService extends Service {
 			params: {},
 			async handler(
 				ctx: Context<{
-					settingsPayload: {
-						host: object;
-						airDCPPUserSettings: object;
-						hubs: [];
+					settingsPayload?: {
+						protocol: string;
+						hostname: string;
+						port: string;
+						username: string;
+						password: string;
+						_id?: string;
+						airDCPPUserSettings?: object;
+						hubs?: [];
 					};
-					settingsObjectId: string;
+					settingsObjectId?: string;
+					settingsKey: string;
 				}>
 			) {
-				console.log("varan bhat", ctx.params);
-				const { host, airDCPPUserSettings, hubs } =
-					ctx.params.settingsPayload;
-				let query = {
-					host,
-					airDCPPUserSettings,
-					hubs,
-				};
-				const keysToUpdate = pickBy(query, identity);
-				let updateQuery = {};
-
-				map(Object.keys(keysToUpdate), (key) => {
-					updateQuery[`directConnect.client.${key}`] =
-						query[key];
-				});
+				try {
+					let query = {};
+					const { settingsKey, settingsObjectId } =
+						ctx.params;
+					const {
+						hostname,
+						protocol,
+						port,
+						username,
+						password,
+					} = ctx.params.settingsPayload;
+					const host = {
+						hostname,
+						protocol,
+						port,
+						username,
+						password,
+					};
+					const undefinedPropsInHostname = Object.values(
+						host
+					).filter((value) => value === undefined);
+
+					// Update, depending what key was passed in params
+					// 1. Construct the update query
+					switch (settingsKey) {
+						case "bittorrent":
+							console.log(
+								`Recieved settings for ${settingsKey}, building query...`
+							);
+							query = {
+								...(undefinedPropsInHostname.length === 0 && {
+									$set: {
+										"bittorrent.client.host": host,
+									},
+								}),
+							};
+							break;
+						case "directConnect":
+							console.log(
+								`Recieved settings for ${settingsKey}, building query...`
+							);
+							const { hubs, airDCPPUserSettings } =
+								ctx.params.settingsPayload;
+							query = {
+								...(undefinedPropsInHostname.length === 0 && {
+									$set: {
+										"directConnect.client.host":
+											host,
+									},
+								}),
+								...(!isNil(hubs) && {
+									$set: {
+										"directConnect.client.hubs":
+											hubs,
+									},
+								}),
+							};
+							console.log(JSON.stringify(query, null, 4));
+							break;
+
+						default:
+							return false;
+					}
+
+					// 2. Set up options, filters
 				const options = {
 					upsert: true,
-					new: true,
 					setDefaultsOnInsert: true,
+					returnDocument: "after",
 				};
-				const filter = {
-					_id: new ObjectId(ctx.params.settingsObjectId),
-				};
-				const result = Settings.findOneAndUpdate(
+				const filter = settingsObjectId
+					? { _id: settingsObjectId }
+					: {};
+
+				// 3. Execute the mongo query
+				const result = await Settings.findOneAndUpdate(
 					filter,
-					{ $set: updateQuery },
+					query,
 					options
 				);
+
 				return result;
+				} catch (err) {
+					return err;
+				}
 			},
 		},
 		deleteSettings: {
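`saveSettings` now switches on `settingsKey` and only applies the host `$set` when every host field is defined. An illustrative params object for the `bittorrent` branch (field names from the handler; values assumed):

```ts
// Illustrative saveSettings params for the "bittorrent" branch; the handler
// skips the $set entirely if any host field below is undefined.
const params = {
	settingsKey: "bittorrent",
	settingsPayload: {
		protocol: "http", // assumed value
		hostname: "localhost", // assumed value
		port: "8080", // assumed value
		username: "admin", // assumed value
		password: "adminadmin", // assumed value
	},
	// settingsObjectId is optional; without it the upsert targets the first
	// (only) settings document.
};
```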
@@ -26,33 +26,40 @@ export default class SocketService extends Service {
 			"/": {
 				events: {
 					call: {
-						// whitelist: ["math.*", "say.*", "accounts.*", "rooms.*", "io.*"],
+						whitelist: ["socket.*"],
 					},
-					action: async (data) => {
-						switch (data.type) {
-							case "RESUME_SESSION":
+				},
+			},
+		},
+		options: {
+			adapter: createAdapter(pubClient, subClient),
+		},
+	},
+},
+hooks: {},
+actions: {
+	resumeSession: async (ctx: Context<{ sessionId: string }>) => {
+		const { sessionId } = ctx.params;
 		console.log("Attempting to resume session...");
 		try {
 			const sessionRecord = await Session.find({
-				sessionId: data.session.sessionId,
+				sessionId,
 			});
 			// 1. Check for sessionId's existence, and a match
 			if (
 				sessionRecord.length !== 0 &&
-				sessionRecord[0].sessionId ===
-					data.session.sessionId
+				sessionRecord[0].sessionId === sessionId
 			) {
-				// 2. Find if the queue has active jobs
+				// 2. Find if the queue has active, paused or waiting jobs
 				const jobs: JobType = await this.broker.call(
 					"jobqueue.getJobCountsByType",
 					{}
 				);
-				const { active } = jobs;
+				const { active, paused, waiting } = jobs;
 
-				if (active > 0) {
+				if (active > 0 || paused > 0 || waiting > 0) {
 					// 3. Get job counts
-					const completedJobCount =
-						await pubClient.get(
+					const completedJobCount = await pubClient.get(
 						"completedJobCount"
 					);
 					const failedJobCount = await pubClient.get(
@@ -62,10 +69,9 @@ export default class SocketService extends Service {
 					// 4. Send the counts to the active socket.io session
 					await this.broker.call("socket.broadcast", {
 						namespace: "/",
-						event: "action",
+						event: "RESTORE_JOB_COUNTS_AFTER_SESSION_RESTORATION",
 						args: [
 							{
-								type: "RESTORE_JOB_COUNTS_AFTER_SESSION_RESTORATION",
 								completedJobCount,
 								failedJobCount,
 								queueStatus: "running",
@@ -80,46 +86,35 @@ export default class SocketService extends Service {
 					500,
 					"SESSION_ID_NOT_FOUND",
 					{
-						data: data.session.sessionId,
+						data: sessionId,
 					}
 				);
 			}
+		},
+
-					break;
-				case "LS_SET_QUEUE_STATUS":
-					console.log(data);
+		setQueueStatus: async (
+			ctx: Context<{
+				queueAction: string;
+				queueStatus: string;
+			}>
+		) => {
+			const { queueAction } = ctx.params;
 			await this.broker.call(
 				"jobqueue.toggle",
-				{ action: data.data.queueAction },
+				{ action: queueAction },
 				{}
 			);
-					break;
-				case "LS_SINGLE_IMPORT":
+		},
+		importSingleIssue: async (ctx: Context<{}>) => {
 			console.info("AirDC++ finished a download -> ");
-					console.log(data);
-					await this.broker.call(
-						"library.importDownloadedComic",
-						{ bundle: data },
-						{}
-					);
-					break;
-				// uncompress archive events
-				case "COMICBOOK_EXTRACTION_SUCCESS":
-					console.log(data);
-					return data;
-			}
+			console.log(ctx.params);
+			// await this.broker.call(
+			// 	"library.importDownloadedComic",
+			// 	{ bundle: data },
+			// 	{}
+			// );
 		},
 	},
-		},
-	},
-	options: {
-		adapter: createAdapter(pubClient, subClient),
-	},
-},
-},
-hooks: {},
-actions: {},
 	methods: {},
 	async started() {
 		this.io.on("connection", async (socket) => {
@@ -146,10 +141,7 @@ export default class SocketService extends Service {
 			}
 			// 2. else, retrieve it from Mongo and "resume" the socket.io connection
 			else {
-				console.log(
-					`Found socketId ${socket.id}, attempting to resume socket.io connection...`
-				);
-				console.log(socket.handshake.query.sessionId);
+				console.log(`Found socketId ${socket.id}, no-op.`);
 			}
 		});
 	},
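The `data.type` switch inside the gateway's catch-all `action` event becomes proper service actions (`resumeSession`, `setQueueStatus`, `importSingleIssue`) reachable through the `call` event under the `socket.*` whitelist. A client-side sketch using moleculer-io's `call` convention (URL, port, and values are assumptions):

```ts
// Browser/Node client sketch; the gateway port and sessionId are illustrative.
import { io } from "socket.io-client";

const socket = io("http://localhost:3001", {
	query: { sessionId: "some-session-id" }, // illustrative
});

// Replaces the old emit of an "action" event with { type: "RESUME_SESSION" }.
socket.emit(
	"call",
	"socket.resumeSession",
	{ sessionId: "some-session-id" },
	(err: unknown, res: unknown) => {
		if (err) console.error(err);
		else console.log(res);
	}
);
```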
services/torrentjobs.service.ts (new file, 101 lines)
@@ -0,0 +1,101 @@
+"use strict";
+import axios from "axios";
+import {
+	Context,
+	Service,
+	ServiceBroker,
+	ServiceSchema,
+	Errors,
+} from "moleculer";
+import { DbMixin } from "../mixins/db.mixin";
+import Comic from "../models/comic.model";
+const ObjectId = require("mongoose").Types.ObjectId;
+import { isNil, isUndefined } from "lodash";
+import BullMqMixin from "moleculer-bullmq";
+const { MoleculerError } = require("moleculer").Errors;
+
+export default class ImageTransformation extends Service {
+	// @ts-ignore
+	public constructor(
+		public broker: ServiceBroker,
+		schema: ServiceSchema<{}> = { name: "imagetransformation" }
+	) {
+		super(broker);
+		this.parseServiceSchema({
+			name: "torrentjobs",
+			mixins: [DbMixin("comics", Comic), BullMqMixin],
+			settings: {
+				bullmq: {
+					client: process.env.REDIS_URI,
+				},
+			},
+			hooks: {},
+			actions: {
+				getTorrentData: {
+					queue: true,
+					rest: "GET /getTorrentData",
+					handler: async (ctx: Context<{ trigger: string }>) => {
+						const { trigger } = ctx.params;
+						console.log(`Recieved ${trigger} as the trigger...`);
+
+						const jobOptions = {
+							jobId: "retrieveTorrentData",
+							name: "bossy",
+							repeat: {
+								every: 10000, // Repeat every 10000 ms
+								limit: 100, // Limit to 100 repeats
+							},
+						};
+
+						const job = await this.localQueue(
+							ctx,
+							"fetchTorrentData",
+							ctx.params,
+							jobOptions
+						);
+						return job;
+					},
+				},
+				fetchTorrentData: {
+					rest: "GET /fetchTorrentData",
+					handler: async (
+						ctx: Context<{
+							birdName: String;
+						}>
+					) => {
+						const repeatableJob = await this.$resolve(
+							"torrentjobs"
+						).getRepeatableJobs();
+						console.info(repeatableJob);
+						console.info(
+							`Scheduled job for fetching torrent data fired.`
+						);
+						// 1. query mongo for infohashes
+						const infoHashes = await this.broker.call(
+							"library.getInfoHashes",
+							{}
+						);
+						// 2. query qbittorrent to see if they exist
+						const torrents: any = await this.broker.call(
+							"qbittorrent.getTorrentRealTimeStats",
+							{ infoHashes }
+						);
+						// 4. Emit the LS_COVER_EXTRACTION_FAILED event with the necessary details
+						await this.broker.call("socket.broadcast", {
+							namespace: "/",
+							event: "AS_TORRENT_DATA",
+							args: [
+								{
+									torrents,
+								},
+							],
+						});
+						// 3. If they do, don't do anything
+						// 4. If they don't purge them from mongo
+					},
+				},
+			},
+			methods: {},
+		});
+	}
+}
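`getTorrentData` registers `fetchTorrentData` as a repeatable BullMQ job (every 10 seconds, at most 100 repeats) through the mixin's `localQueue`. The same schedule expressed directly against BullMQ, as a sketch rather than code from this PR:

```ts
// Sketch only: the service above goes through moleculer-bullmq, not through
// BullMQ directly. Queue name and job name mirror the service's.
import { Queue } from "bullmq";
import IORedis from "ioredis";

const connection = new IORedis(process.env.REDIS_URI as string, {
	maxRetriesPerRequest: null, // BullMQ requires this on shared connections
});
const queue = new Queue("torrentjobs", { connection });

async function scheduleTorrentPolling(): Promise<void> {
	await queue.add(
		"fetchTorrentData",
		{},
		{ repeat: { every: 10_000, limit: 100 } } // every 10 s, at most 100 runs
	);
}
```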
@@ -74,7 +74,7 @@ const errors = [];
 */
 export const extractComicInfoXMLFromRar = async (
 	filePath: string,
-	mimeType: string,
+	mimeType: string
 ): Promise<any> => {
 	try {
 		// Create the target directory
@@ -210,7 +210,7 @@ export const extractComicInfoXMLFromRar = async (
 
 export const extractComicInfoXMLFromZip = async (
 	filePath: string,
-	mimeType: string,
+	mimeType: string
 ): Promise<any> => {
 	try {
 		// Create the target directory
@@ -357,11 +357,17 @@ export const extractFromArchive = async (filePath: string) => {
 	switch (mimeType) {
 		case "application/x-7z-compressed; charset=binary":
 		case "application/zip; charset=binary":
-			const cbzResult = await extractComicInfoXMLFromZip(filePath, mimeType);
+			const cbzResult = await extractComicInfoXMLFromZip(
+				filePath,
+				mimeType
+			);
 			return Object.assign({}, ...cbzResult);
 
 		case "application/x-rar; charset=binary":
-			const cbrResult = await extractComicInfoXMLFromRar(filePath, mimeType);
+			const cbrResult = await extractComicInfoXMLFromRar(
+				filePath,
+				mimeType
+			);
 			return Object.assign({}, ...cbrResult);
 
 		default:
@@ -369,9 +375,8 @@ export const extractFromArchive = async (filePath: string) => {
 			"Error inferring filetype for comicinfo.xml extraction."
 			);
 			throw new MoleculerError({}, 500, "FILETYPE_INFERENCE_ERROR", {
-				data: { message: "Cannot infer filetype."},
+				data: { message: "Cannot infer filetype." },
 			});
 	}
 };
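With the trailing commas gone, both extractors are plain two-argument functions and `extractFromArchive` dispatches on the sniffed mime type, throwing `FILETYPE_INFERENCE_ERROR` for anything else. A usage sketch (import path assumed):

```ts
// Usage sketch: extractFromArchive sniffs the mime type itself and routes to
// the zip/7z or rar extractor; unknown types raise FILETYPE_INFERENCE_ERROR.
import { extractFromArchive } from "./utils/uncompression.utils";

async function readArchiveMetadata(filePath: string) {
	try {
		// The merged result carries fields such as name, filePath, and the
		// values destructured in the jobqueue service above.
		return await extractFromArchive(filePath);
	} catch (err) {
		// MoleculerError with code FILETYPE_INFERENCE_ERROR for unsupported types
		console.error(err);
		throw err;
	}
}
```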