85 Commits

Author SHA1 Message Date
30168844f3 Merge branch 'master' into getbundles-fix 2024-10-24 10:47:31 -04:00
2e60e2e3d5 Added package-lock.json 2024-10-24 10:45:19 -04:00
8254ec2093 ⌫ package.json deleted 2024-10-24 10:41:59 -04:00
7381d03045 🔧 Fixed getBundles endpoint 2024-10-23 23:14:21 -04:00
d7e865f84f 🔧 Prettification 2024-10-23 14:26:24 -04:00
baa5a99855 🔧 Removed indirection for getBundles 2024-10-23 13:42:05 -04:00
68c2dacff4 🔧 getBundles endpoint WIP 2024-10-21 18:04:16 -04:00
55e0ce6d36 🖌️ Formatting changes 2024-10-18 13:19:57 -04:00
4ffad69c44 🔧 Todo to move the method from UI 2024-10-16 18:50:14 -04:00
f9438f2129 🔧 Fixing broken AirDCPP search 2024-09-26 21:33:02 -04:00
2247411ac8 🪳 Added kafka to the docker-compose deps 2024-05-28 08:40:33 -04:00
e61ecb1143 🔧 Refactor for docker-compose 2024-05-24 14:22:59 -04:00
e01421f17b 🐳 Added all other deps 2024-05-23 23:15:03 -04:00
cc271021e0 🐳 Created a deps docker-compose stack 2024-05-19 21:19:15 -04:00
cc772504ae 🔧 Fixed the Redis disconnection issue 2024-05-17 01:26:16 -04:00
8dcb17a6a0 🔧 Reverted 2024-05-16 14:15:09 -04:00
a06896ffcc 🔧 Reverting to nats for transporter needs 2024-05-16 14:03:07 -04:00
03f6623ed0 🔧 Fixes 2024-05-15 21:27:38 -05:00
66f9d63b44 🔧 Debugging Redis connectivity issue 2024-05-15 16:58:49 -05:00
a936df3144 🪲 Added a console.log 2024-05-15 12:13:50 -05:00
dc9dabca48 🔧 Fixed REDIS_URI 2024-05-15 12:00:51 -05:00
4680fd0875 ⬅️ Reverted changes 2024-05-15 11:47:08 -05:00
323548c0ff 🔧 WIP Dockerfile fixes 2024-05-15 11:32:11 -05:00
f4563c12c6 🔧 Added startup scripts fixing MongoDB timeouts 2024-05-12 23:35:01 -04:00
1b0cada848 🏗️ Added validation to db mixin 2024-05-11 13:31:10 -04:00
750a74cd9f Merge pull request #10 from rishighan/automated-download-loop
Automated download loop
2024-05-10 22:59:08 -04:00
402ee4d81b 🏗️ Updated Dockerfile 2024-05-10 22:55:57 -04:00
1fa35ac0e3 🏗️ Automatic downloads endpoint support 2024-05-09 13:49:26 -04:00
680594e67c 🔧 Added wiring for AirDC++ service 2024-04-23 22:47:32 -05:00
5593fcb4a0 🔧 Added DC++ search and download actions 2024-04-17 21:14:48 -05:00
d7f3d3a7cf 🔧 Modified Comic model 2024-04-16 22:41:33 -05:00
94cb95f4bf 📚 Changes to CV model 2024-04-14 00:25:41 -04:00
c6651cdd91 Merge pull request #9 from rishighan/qbittorrent-settings
🏗️ Added torrent attrs to comic model
2024-03-30 21:40:02 -04:00
b35e2140b5 🧲 Created a dedicated queue for torrent ops 2024-03-29 19:36:16 -04:00
f053dcb789 🧲 Massaging data to be sent to UI 2024-03-27 22:22:40 -05:00
aea7a24f76 🧲 Added a job for deleted torrents clean-up 2024-03-24 17:31:31 -04:00
8f0c2f4302 ⚙️ getAllSettings is parameterized 2024-03-12 16:39:44 -05:00
7dbe2b2701 🏗️ Added torrent attrs to comic model 2024-03-03 12:22:40 -05:00
5b9ef9fbbb Merge pull request #8 from rishighan/qbittorrent-settings
Miscellaneous Settings
2024-01-08 16:42:54 -05:00
4cdb11fcbd Cleaned the console.logs 2024-01-08 16:40:12 -05:00
78f7c1b595 🤐 Added uncompression event 2024-01-07 22:13:02 -05:00
bbd2906ebf 🏗️ Added some archive-related keys to Comic model 2024-01-06 11:17:40 -05:00
1861c2eeed Merge pull request #7 from rishighan/qbittorrent-settings
🌊 Modified settings model schema
2023-12-30 00:52:17 -05:00
f3965437b5 🏗 Added a job for full archive extraction 2023-12-30 00:50:06 -05:00
78e0e9f8ce 🏗️ Refactored the searchIssue method 2023-12-28 22:52:33 -05:00
c926758db6 🏗️ Added a downloads array to bittorrent schema 2023-12-20 00:08:38 -05:00
b2b35aedc0 🏗️ Fixed a mongo update query 2023-11-27 02:14:16 -05:00
f35e3ccbe0 Removed useless code 2023-11-15 16:02:07 -06:00
7b0c0a7420 Added the importSingleIssue action 2023-11-15 15:59:27 -06:00
c2bbbf311d 🏗️ Fixed setQueueStatus 2023-11-14 13:24:49 -06:00
b8ca03220f 🏗 Implemented setQueueStatus 2023-11-13 22:01:01 -05:00
b87b0c875d 🏗️ Fleshed out resumeSession event 2023-11-13 21:18:19 -05:00
11fbaf10db 🏗 Wired up the events correctly 2023-11-13 16:41:58 -05:00
1229feb69c 🏗️ Refactor for zustand and tanstack react query support 2023-11-09 10:22:45 -06:00
3efdc7c2e2 ⚙️ Refactored saveSettings endpoint 2023-09-15 15:49:13 -04:00
1fff931941 🌊 Modified settings model schema 2023-09-13 22:09:25 -05:00
f4e2db5a5f 📦 Instruction for paths for unrar and p7zip 2023-09-01 09:44:02 -05:00
1d7561279b 📕 Updated local dev instructions in README 2023-09-01 09:22:02 -05:00
9e47ae0436 Merge pull request #6 from rishighan/migration-to-bullmq
🐂 Migration to moleculer-bullMQ
2023-08-30 13:50:47 -04:00
b1b1cb527b 🔧 Moved moleculer-bullmq to dependencies 2023-08-30 13:16:58 -04:00
cfa09691e8 🔧 Fixed an errant condition
This error occurred because I checked for active AND prioritized jobs in BullMQ; since every job was already active, no prioritized jobs existed, the socket.io event never fired, and the browser was left in a bad state, never "resuming" an import even when one was in progress.
2023-08-30 12:21:43 -04:00
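A minimal sketch of the condition fixed in the commit above, assuming BullMQ's `Queue#getJobCounts` API (the queue name and connection details are illustrative):
```
import { Queue } from "bullmq";

// Illustrative queue handle; the real service wires this up via moleculer-bullmq.
const importQueue = new Queue("process.import", {
	connection: { host: "localhost", port: 6379 },
});

async function importInProgress(): Promise<boolean> {
	const counts = await importQueue.getJobCounts("active", "prioritized");
	// Buggy check: required BOTH active and prioritized jobs, which is never
	// true once everything is already active, so the event never fired.
	// return counts.active > 0 && counts.prioritized > 0;
	return counts.active > 0 || counts.prioritized > 0;
}
```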
356b093db9 🔧 Fixed a dep 2023-08-30 00:08:05 -04:00
b4b83e5c75 🔧 Reworked the jobResults aggregation 2023-08-29 23:58:06 -04:00
c718456adc 🔧 jobResult aggregate query first draft 2023-08-28 23:56:44 -04:00
76d4e6b10f 🔢 Persisting the sessionId in the JobResult 2023-08-27 20:25:04 -04:00
bde548695c 🔧 Refactored the getJobResultStatistics method 2023-08-24 23:45:51 -04:00
fd4dd1d5c4 🥭 Aggregation for jobResult 2023-08-24 23:18:27 -04:00
5540bb16ec ⏱️ Added a timestamp to job results schema 2023-08-24 09:06:38 -05:00
6ee609f2b9 🔧 Refactor and added getJobCounts 2023-08-23 11:47:47 -05:00
8b584080e2 🐂 Queue controls 2023-08-22 22:07:51 -05:00
01975079e3 🐂 Queue drain event 2023-08-22 05:20:24 -04:00
fe9fbe9c3a 🔢 Getting job counts 2023-08-22 00:04:47 -04:00
df6652cce9 🐂 Queue pause/resume functionality 2023-08-21 17:55:08 -04:00
e5fc879b2d 🐂 Added some job counters 2023-08-18 11:39:18 -04:00
625447e7f1 🧊 Added shared Redis config 2023-08-14 22:15:19 -04:00
fdcf1f7d68 🧹 Linted code 2023-08-14 19:45:40 -04:00
4003f666cf 🔧 Tooling for resumable socket.io sessions 2023-07-27 11:09:26 -07:00
7b855f8cf1 🐂 BullMQ support code 2023-07-13 08:02:12 -07:00
007ce4b66f 🏗️ Applying the refactor patch 2023-07-05 23:14:46 -04:00
cb84e4893f 🐂 Migration to moleculer-bullMQ 2023-06-29 14:16:58 -04:00
795ac561c7 🔼 Updated deps 2023-04-19 09:14:22 -04:00
175e01dc2d Added elasticsearch dep 2023-04-09 15:53:36 -04:00
66e0a26c68 Merge pull request #4 from elgohr-update/master 2023-04-04 17:46:31 -04:00
Lars Gohr
959d248273 Updated elgohr/Publish-Docker-Github-Action to a supported version (v5) 2023-03-30 06:52:23 +02:00
745ec5d774 Merge pull request #3 from rishighan/mimetype-check
 MIMEtype check for comic book archives
2023-03-23 23:59:30 -04:00
30 changed files with 5419 additions and 3863 deletions


@@ -12,7 +12,7 @@ jobs:
steps:
- uses: actions/checkout@master
- name: Publish to Registry
uses: elgohr/Publish-Docker-Github-Action@master
uses: elgohr/Publish-Docker-Github-Action@v5
with:
name: frishi/threetwo-core-service
username: ${{ secrets.DOCKER_USERNAME }}


@@ -1,40 +1,62 @@
FROM alpine:3.14
# Use a base image with Node.js 22.1.0
FROM node:22.1.0
# Set metadata for contact
LABEL maintainer="Rishi Ghan <rishi.ghan@gmail.com>"
# Show all node logs
# Set environment variables
ENV NPM_CONFIG_LOGLEVEL warn
ENV NODE_ENV=production
# Set the working directory
WORKDIR /core-services
# Install required packages
RUN apt-get update && apt-get install -y \
libvips-tools \
wget \
imagemagick \
python3 \
xvfb \
xz-utils \
curl \
bash \
software-properties-common
RUN apk add --update \
--repository http://nl.alpinelinux.org/alpine/v3.14/main \
vips-tools \
wget \
imagemagick \
python3 \
unrar \
p7zip \
nodejs \
npm \
xvfb \
xz
# Install p7zip
RUN apt-get update && apt-get install -y p7zip
# Install unrar directly from RARLAB
RUN wget https://www.rarlab.com/rar/rarlinux-x64-621.tar.gz \
&& tar -zxvf rarlinux-x64-621.tar.gz \
&& cp rar/unrar /usr/bin/ \
&& rm -rf rarlinux-x64-621.tar.gz rar
# Clean up package lists
RUN rm -rf /var/lib/apt/lists/*
# Verify Node.js installation
RUN node -v && npm -v
# Copy application configuration files
COPY package.json package-lock.json ./
COPY moleculer.config.ts ./
COPY tsconfig.json ./
COPY scripts ./scripts
RUN chmod +x ./scripts/*
RUN npm i
# Install Dependencies
# Install application dependencies
RUN npm install
RUN npm install -g typescript ts-node
# Copy the rest of the application files
COPY . .
# Build and cleanup
RUN npm run build \
&& npm prune
# clean up
RUN npm prune
# Expose the application's port
EXPOSE 3000
# Start server
CMD ["npm", "start"]
# Command to run the application
CMD ["npm", "start"]


@@ -3,21 +3,35 @@
This [moleculer-based](https://github.com/moleculerjs/moleculer-web) microservice houses endpoints for the following functions:
1. Local import of a comic library into mongo (currently supports `cbr` and `cbz` files)
2. Metadata extraction from file, `comicinfo.xml`
3. Mongo comic object orchestration
4. CRUD operations on `Comic` model
5. Helper utilities for image metadata extraction, file operations, and more.
## Local Development
1. ~~You need `calibre` in your local path.
On `macOS` you can `brew install calibre` and make sure that `ebook-meta` is present on the path~~ Calibre is no longer required as a dependency. Ignore this step.
2. You need `mongo` for the data store. On `macOS` you can use [these instructions](https://docs.mongodb.com/manual/tutorial/install-mongodb-on-os-x/) to install it.
1. You need the following dependencies installed: `mongo`, `elasticsearch` and `redis`
2. You also need binaries for `unrar` and `p7zip`
3. Clone this repo
4. Run `npm i`
5. Assuming you installed mongo correctly, run `MONGO_URI=mongodb://localhost:27017/threetwo npm run dev` to start the service
5. Assuming you installed the dependencies correctly, run:
```
COMICS_DIRECTORY=<PATH_TO_COMICS_DIRECTORY> \
USERDATA_DIRECTORY=<PATH_TO_USERDATA_DIRECTORY> \
REDIS_URI=redis://<REDIS_HOST:REDIS_PORT> \
ELASTICSEARCH_URI=<ELASTICSEARCH_HOST:ELASTICSEARCH_PORT> \
MONGO_URI=mongodb://<MONGO_HOST:MONGO_PORT>/threetwo \
UNRAR_BIN_PATH=<UNRAR_BIN_PATH> \
SEVENZ_BINARY_PATH=<SEVENZ_BINARY_PATH> \
npm run dev
```
to start the service
6. You should see the service spin up and a list of all the endpoints in the terminal
7. The service can be accessed through `http://localhost:3000/api/import/*`
7. The service can be accessed through `http://localhost:3000/api/<serviceName>/*`
## Docker Instructions
1. Build the image using `docker build . -t frishi/threetwo-import-service`. Give it a hot minute.

config/redis.config.ts (new file)

@@ -0,0 +1,30 @@
// Import the Redis library
import IORedis from "ioredis";
// Environment variable for Redis URI
const redisURI = process.env.REDIS_URI || "redis://localhost:6379";
console.log(`process.env.REDIS_URI is ${process.env.REDIS_URI}`);
// Creating the publisher client
const pubClient = new IORedis(redisURI);
// Creating the subscriber client
const subClient = new IORedis(redisURI);
// Handle connection events for the publisher
pubClient.on("connect", () => {
console.log("Publisher client connected to Redis.");
});
pubClient.on("error", (err) => {
console.error("Publisher client failed to connect to Redis:", err);
});
// Handle connection events for the subscriber
subClient.on("connect", () => {
console.log("Subscriber client connected to Redis.");
});
subClient.on("error", (err) => {
console.error("Subscriber client failed to connect to Redis:", err);
});
// Export the clients for use in other parts of the application
export { pubClient, subClient };
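As a usage note, these shared clients are what the queue wiring and job counters consume; a minimal sketch mirroring the jobqueue service later in this diff:
```
import { pubClient } from "../config/redis.config";

async function resetImportCounters() {
	// Reset the per-run counters at the start of an import...
	await pubClient.set("completedJobCount", 0);
	await pubClient.set("failedJobCount", 0);
	// ...and increment them as jobs finish.
	await pubClient.incr("completedJobCount");
}
```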


@@ -0,0 +1,103 @@
services:
zoo1:
image: confluentinc/cp-zookeeper:7.3.2
hostname: zoo1
container_name: zoo1
ports:
- "2181:2181"
environment:
ZOOKEEPER_CLIENT_PORT: 2181
ZOOKEEPER_SERVER_ID: 1
ZOOKEEPER_SERVERS: zoo1:2888:3888
networks:
- kafka-net
kafka1:
image: confluentinc/cp-kafka:7.3.2
hostname: kafka1
container_name: kafka1
ports:
- "9092:9092"
- "29092:29092"
- "9999:9999"
environment:
KAFKA_ADVERTISED_LISTENERS: INTERNAL://kafka1:19092,EXTERNAL://${DOCKER_HOST_IP:-127.0.0.1}:9092,DOCKER://host.docker.internal:29092
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: INTERNAL:PLAINTEXT,EXTERNAL:PLAINTEXT,DOCKER:PLAINTEXT
KAFKA_INTER_BROKER_LISTENER_NAME: INTERNAL
KAFKA_ZOOKEEPER_CONNECT: "zoo1:2181"
KAFKA_BROKER_ID: 1
KAFKA_LOG4J_LOGGERS: "kafka.controller=INFO,kafka.producer.async.DefaultEventHandler=INFO,state.change.logger=INFO"
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
KAFKA_JMX_PORT: 9999
KAFKA_JMX_HOSTNAME: ${DOCKER_HOST_IP:-127.0.0.1}
KAFKA_AUTHORIZER_CLASS_NAME: kafka.security.authorizer.AclAuthorizer
KAFKA_ALLOW_EVERYONE_IF_NO_ACL_FOUND: "true"
depends_on:
- zoo1
networks:
- kafka-net
kafka-ui:
container_name: kafka-ui
image: provectuslabs/kafka-ui:latest
ports:
- 8087:8080
environment:
DYNAMIC_CONFIG_ENABLED: true
volumes:
- /Users/rishi/work/config/kafka-ui/config.yml:/etc/kafkaui/dynamic_config.yaml
depends_on:
- kafka1
- zoo1
networks:
- kafka-net
db:
image: "mongo:latest"
container_name: database
networks:
- kafka-net
ports:
- "27017:27017"
volumes:
- "mongodb_data:/bitnami/mongodb"
redis:
image: "bitnami/redis:latest"
container_name: queue
environment:
ALLOW_EMPTY_PASSWORD: "yes"
networks:
- kafka-net
ports:
- "6379:6379"
elasticsearch:
image: docker.elastic.co/elasticsearch/elasticsearch:7.16.2
container_name: elasticsearch
environment:
- "discovery.type=single-node"
- "ES_JAVA_OPTS=-Xms512m -Xmx512m"
- "xpack.security.enabled=true"
- "xpack.security.authc.api_key.enabled=true"
- "ELASTIC_PASSWORD=password"
ulimits:
memlock:
soft: -1
hard: -1
ports:
- "9200:9200"
networks:
- kafka-net
networks:
kafka-net:
driver: bridge
volumes:
mongodb_data:
driver: local
elasticsearch:
driver: local


@@ -3,7 +3,15 @@ LOGGER=true
LOGLEVEL=info
SERVICEDIR=dist/services
TRANSPORTER=nats://nats:4222
VITE_UNDERLYING_HOST=localhost
COMICS_DIRECTORY=/Users/rishi/work/threetwo-core-service/comics
USERDATA_DIRECTORY=/Users/rishi/work/threetwo-core-service/userdata
REDIS_URI=redis://redis:6379
KAFKA_BROKER=kafka1:9092
ELASTICSEARCH_URI=http://elasticsearch:9200
MONGO_URI=mongodb://db:27017/threetwo
UNRAR_BIN_PATH=/opt/homebrew/bin/unrar
SEVENZ_BINARY_PATH=/opt/homebrew/bin/7za
CACHER=Memory


@@ -1,58 +1,125 @@
version: "3.3"
x-userdata-volume: &userdata-volume
type: bind
source: ${USERDATA_DIRECTORY}
target: /userdata
x-comics-volume: &comics-volume
type: bind
source: ${COMICS_DIRECTORY}
target: /comics
services:
api:
core-services:
build:
context: .
image: threetwo-library-service
env_file: docker-compose.env
environment:
SERVICES: api
PORT: 3000
depends_on:
- nats
labels:
- "traefik.enable=true"
- "traefik.http.routers.api-gw.rule=PathPrefix(`/`)"
- "traefik.http.services.api-gw.loadbalancer.server.port=3000"
networks:
- internal
greeter:
build:
context: .
image: threetwo-library-service
env_file: docker-compose.env
environment:
SERVICES: greeter
depends_on:
- nats
networks:
- internal
nats:
image: nats:2
networks:
- internal
traefik:
image: traefik:v2.1
command:
- "--api.insecure=true" # Don't do that in production!
- "--providers.docker=true"
- "--providers.docker.exposedbydefault=false"
# context: https://github.com/rishighan/threetwo-core-service.git
context: ./
dockerfile: Dockerfile
image: frishi/threetwo-core-service
container_name: core-services
ports:
- 3000:80
- 3001:8080
- "3000:3000"
- "3001:3001"
depends_on:
- db
- redis
- elasticsearch
- kafka1
- zoo1
environment:
name: core-services
SERVICES: api,library,imagetransformation,opds,search,settings,jobqueue,socket,torrentjobs
env_file: docker-compose.env
volumes:
- /var/run/docker.sock:/var/run/docker.sock:ro
- *comics-volume
- *userdata-volume
networks:
- internal
- default
- proxy
zoo1:
image: confluentinc/cp-zookeeper:7.3.2
hostname: zoo1
container_name: zoo1
ports:
- "2181:2181"
environment:
ZOOKEEPER_CLIENT_PORT: 2181
ZOOKEEPER_SERVER_ID: 1
ZOOKEEPER_SERVERS: zoo1:2888:3888
networks:
- proxy
kafka1:
image: confluentinc/cp-kafka:7.3.2
hostname: kafka1
container_name: kafka1
ports:
- "9092:9092"
- "29092:29092"
- "9999:9999"
environment:
KAFKA_ADVERTISED_LISTENERS: INTERNAL://kafka1:19092,EXTERNAL://${DOCKER_HOST_IP:-127.0.0.1}:9092,DOCKER://host.docker.internal:29092
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: INTERNAL:PLAINTEXT,EXTERNAL:PLAINTEXT,DOCKER:PLAINTEXT
KAFKA_INTER_BROKER_LISTENER_NAME: INTERNAL
KAFKA_ZOOKEEPER_CONNECT: "zoo1:2181"
KAFKA_BROKER_ID: 1
KAFKA_LOG4J_LOGGERS: "kafka.controller=INFO,kafka.producer.async.DefaultEventHandler=INFO,state.change.logger=INFO"
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
KAFKA_JMX_PORT: 9999
KAFKA_JMX_HOSTNAME: ${DOCKER_HOST_IP:-127.0.0.1}
KAFKA_AUTHORIZER_CLASS_NAME: kafka.security.authorizer.AclAuthorizer
KAFKA_ALLOW_EVERYONE_IF_NO_ACL_FOUND: "true"
depends_on:
- zoo1
networks:
- proxy
db:
image: "mongo:latest"
container_name: database
networks:
- proxy
ports:
- "27017:27017"
volumes:
- "mongodb_data:/bitnami/mongodb"
redis:
image: "bitnami/redis:latest"
container_name: redis
hostname: redis
environment:
ALLOW_EMPTY_PASSWORD: "yes"
networks:
- proxy
ports:
- "6379:6379"
elasticsearch:
image: docker.elastic.co/elasticsearch/elasticsearch:7.16.2
container_name: elasticsearch
environment:
- "discovery.type=single-node"
- "ES_JAVA_OPTS=-Xms512m -Xmx512m"
- "xpack.security.enabled=true"
- "xpack.security.authc.api_key.enabled=true"
- "ELASTIC_PASSWORD=password"
ulimits:
memlock:
soft: -1
hard: -1
ports:
- 9200:9200
networks:
- proxy
networks:
internal:
proxy:
external: true
volumes:
data:
mongodb_data:
driver: local
elasticsearch:
driver: local


@@ -2,21 +2,60 @@ const path = require("path");
const mkdir = require("mkdirp").sync;
const DbService = require("moleculer-db");
export const DbMixin = (collection, model) => {
if (process.env.MONGO_URI) {
const MongooseAdapter = require("moleculer-db-adapter-mongoose");
return {
mixins: [DbService],
adapter: new MongooseAdapter(process.env.MONGO_URI, {
user: process.env.MONGO_INITDB_ROOT_USERNAME,
pass: process.env.MONGO_INITDB_ROOT_PASSWORD,
keepAlive: true,
useUnifiedTopology: true,
family: 4,
}),
model,
};
if (!process.env.MONGO_URI) {
console.log("MONGO_URI not provided, initializing local storage...");
mkdir(path.resolve("./data"));
return { mixins: [DbService] }; // Handle case where no DB URI is provided
}
mkdir(path.resolve("./data"));
const MongooseAdapter = require("moleculer-db-adapter-mongoose");
const adapter = new MongooseAdapter(process.env.MONGO_URI, {
user: process.env.MONGO_INITDB_ROOT_USERNAME,
pass: process.env.MONGO_INITDB_ROOT_PASSWORD,
keepAlive: true,
useNewUrlParser: true,
useUnifiedTopology: true,
});
const connectWithRetry = async (
adapter,
maxRetries = 5,
interval = 5000
) => {
for (let retry = 0; retry < maxRetries; retry++) {
try {
await adapter.connect();
console.log("MongoDB connected successfully!");
return;
} catch (err) {
console.error("MongoDB connection error:", err);
console.log(
`Retrying MongoDB connection in ${
interval / 1000
} seconds...`
);
await new Promise((resolve) => setTimeout(resolve, interval));
}
}
console.error("Failed to connect to MongoDB after several attempts.");
};
return {
mixins: [DbService],
adapter,
model,
collection,
async started() {
await connectWithRetry(this.adapter);
},
async stopped() {
try {
await this.adapter.disconnect();
console.log("MongoDB disconnected");
} catch (err) {
console.error("MongoDB disconnection error:", err);
}
},
};
};
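For context, a minimal sketch of how a service consumes this mixin, mirroring the `mixins: [DbMixin("comics", Comic)]` usage in the service diffs below:
```
import { DbMixin } from "../mixins/db.mixin";
import Comic from "../models/comic.model";

export default {
	name: "library",
	// The mixin contributes the Mongoose adapter plus the retrying
	// started()/stopped() lifecycle hooks defined above.
	mixins: [DbMixin("comics", Comic)],
};
```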


@@ -1,8 +1,7 @@
const paginate = require("mongoose-paginate-v2");
const { Client } = require("@elastic/elasticsearch");
import ComicVineMetadataSchema from "./comicvine.metadata.model";
import { mongoosastic } from "mongoosastic-ts";
const mongoose = require("mongoose")
const mongoose = require("mongoose");
import {
MongoosasticDocument,
MongoosasticModel,
@@ -28,6 +27,10 @@ const RawFileDetailsSchema = mongoose.Schema({
mimeType: String,
containedIn: String,
pageCount: Number,
archive: {
uncompressed: Boolean,
expandedPath: String,
},
cover: {
filePath: String,
stats: Object,
@@ -51,7 +54,38 @@ const DirectConnectBundleSchema = mongoose.Schema({
name: String,
size: String,
type: {},
_id: false,
});
const wantedSchema = mongoose.Schema(
{
source: { type: String, default: null },
markEntireVolumeWanted: Boolean,
issues: {
type: [
{
_id: false, // Disable automatic ObjectId creation for each issue
id: Number,
url: String,
image: { type: Array, default: [] },
coverDate: String,
issueNumber: String,
},
],
default: null,
},
volume: {
type: {
_id: false, // Disable automatic ObjectId creation for volume
id: Number,
url: String,
image: { type: Array, default: [] },
name: String,
},
default: null,
},
},
{ _id: false }
); // Disable automatic ObjectId creation for the wanted object itself
const ComicSchema = mongoose.Schema(
{
@@ -67,18 +101,12 @@ const ComicSchema = mongoose.Schema(
},
sourcedMetadata: {
comicInfo: { type: mongoose.Schema.Types.Mixed, default: {} },
comicvine: {
type: ComicVineMetadataSchema,
es_indexed: true,
default: {},
},
shortboxed: {},
comicvine: { type: mongoose.Schema.Types.Mixed, default: {} }, // Set as a freeform object
locg: {
type: LOCGSchema,
es_indexed: true,
default: {},
},
gcd: {},
},
rawFileDetails: {
type: RawFileDetailsSchema,
@@ -98,11 +126,9 @@ const ComicSchema = mongoose.Schema(
subtitle: { type: String, es_indexed: true },
},
},
wanted: wantedSchema,
acquisition: {
source: {
wanted: Boolean,
name: String,
},
release: {},
directconnect: {
downloads: {
@@ -111,12 +137,13 @@ const ComicSchema = mongoose.Schema(
default: [],
},
},
torrent: {
sourceApplication: String,
magnet: String,
tracker: String,
status: String,
},
torrent: [
{
infoHash: String,
name: String,
announce: [String],
},
],
usenet: {
sourceApplication: String,
},


@@ -1,95 +0,0 @@
const mongoose = require("mongoose");
const Things = mongoose.Schema({
_id: false,
api_detail_url: String,
id: Number,
name: String,
site_detail_url: String,
count: String,
});
const Issue = mongoose.Schema({
_id: false,
api_detail_url: String,
id: Number,
name: String,
issue_number: String,
});
const VolumeInformation = mongoose.Schema({
_id: false,
aliases: [String],
api_detail_url: String,
characters: [Things],
concepts: [Things],
count_of_issues: String,
date_added: String,
date_last_updated: String,
deck: String,
description: String,
first_issue: Issue,
id: Number,
image: {
icon_url: String,
medium_url: String,
screen_url: String,
screen_large_url: String,
small_url: String,
super_url: String,
thumb_url: String,
tiny_url: String,
original_url: String,
image_tags: String,
},
issues: [
{
api_detail_url: String,
id: Number,
name: String,
issue_number: String,
site_detail_url: String,
},
],
last_issue: Issue,
locations: [Things],
name: String,
objects: [Things],
people: [Things],
publisher: {
api_detail_url: String,
id: Number,
name: String,
},
site_detail_url: String,
start_year: String,
});
const ComicVineMetadataSchema = mongoose.Schema({
_id: false,
aliases: [String],
api_detail_url: String,
has_staff_review: { type: mongoose.Schema.Types.Mixed },
cover_date: Date,
date_added: String,
date_last_updated: String,
deck: String,
description: String,
image: {
icon_url: String,
medium_url: String,
screen_url: String,
screen_large_url: String,
small_url: String,
super_url: String,
thumb_url: String,
tiny_url: String,
original_url: String,
image_tags: String,
},
id: Number,
name: String,
resource_type: String,
volumeInformation: VolumeInformation,
});
export default ComicVineMetadataSchema;

models/jobresult.model.ts (new file)

@@ -0,0 +1,12 @@
const mongoose = require("mongoose");
const JobResultSchema = mongoose.Schema({
id: Number,
status: String,
sessionId: String,
failedReason: Object,
timestamp: Date,
});
const JobResult = mongoose.model("JobResult", JobResultSchema);
export default JobResult;

models/session.model.ts (new file)

@@ -0,0 +1,9 @@
const mongoose = require("mongoose");
const SessionSchema = mongoose.Schema({
sessionId: String,
socketId: String,
});
const Session = mongoose.model("Session", SessionSchema);
export default Session;


@@ -1,21 +1,34 @@
const mongoose = require("mongoose");
const paginate = require("mongoose-paginate-v2");
const HostSchema = mongoose.Schema({
_id: false,
username: String,
password: String,
hostname: String,
port: String,
protocol: String,
});
const SettingsSchema = mongoose.Schema({
directConnect: {
client: {
host: {
username: String,
password: String,
hostname: String,
port: String,
protocol: String,
},
host: HostSchema,
airDCPPUserSettings: Object,
hubs: Array,
},
},
bittorrent: {
client: {
name: String,
host: HostSchema,
},
},
prowlarr: {
client: {
host: HostSchema,
apiKey: String,
},
},
});
const Settings = mongoose.model("Settings", SettingsSchema);


@@ -5,6 +5,7 @@ import {
MetricRegistry,
ServiceBroker,
} from "moleculer";
const RedisTransporter = require("moleculer").Transporters.Redis;
/**
* Moleculer ServiceBroker configuration file
@@ -90,7 +91,7 @@ const brokerConfig: BrokerOptions = {
// More info: https://moleculer.services/docs/0.14/networking.html
// Note: During the development, you don't need to define it because all services will be loaded locally.
// In production you can set it via `TRANSPORTER=nats://localhost:4222` environment variable.
transporter: process.env.REDIS_URI || "redis://localhost:6379",
transporter: new RedisTransporter(process.env.REDIS_URI),
// Define a cacher.
// More info: https://moleculer.services/docs/0.14/caching.html
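For context, a minimal sketch of the resulting broker configuration, assuming Moleculer's exported `Transporters` namespace (only the relevant field is shown):
```
import { BrokerOptions, Transporters } from "moleculer";

const brokerConfig: BrokerOptions = {
	// Pass an explicit Redis transporter instance instead of a bare URI string.
	transporter: new Transporters.Redis(process.env.REDIS_URI),
};

export default brokerConfig;
```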

package-lock.json (generated): diff suppressed because it is too large


@@ -4,8 +4,8 @@
"description": "Endpoints for common operations in ThreeTwo",
"scripts": {
"build": "tsc --build tsconfig.json",
"dev": "ts-node ./node_modules/moleculer/bin/moleculer-runner.js --hot --repl --config moleculer.config.ts services/**/*.service.ts",
"start": "moleculer-runner --config dist/moleculer.config.js",
"dev": "./scripts/start.sh dev",
"start": "npm run build && ./scripts/start.sh prod",
"cli": "moleculer connect NATS",
"ci": "jest --watch",
"test": "jest --coverage",
@@ -20,7 +20,6 @@
],
"author": "Rishi Ghan",
"devDependencies": {
"@elastic/elasticsearch": "^8.6.0",
"@types/lodash": "^4.14.168",
"@typescript-eslint/eslint-plugin": "^5.56.0",
"@typescript-eslint/parser": "^5.56.0",
@@ -28,6 +27,7 @@
"eslint-plugin-import": "^2.20.2",
"eslint-plugin-prefer-arrow": "^1.2.2",
"install": "^0.13.0",
"ioredis": "^5.4.1",
"jest": "^29.5.0",
"jest-cli": "^29.5.0",
"moleculer-repl": "^0.7.0",
@@ -35,20 +35,22 @@
"npm": "^8.4.1",
"ts-jest": "^29.0.5",
"ts-node": "^10.9.1",
"typescript": "^5.0.2"
"typescript": "^5.0.2",
"uuid": "^9.0.0"
},
"dependencies": {
"@npcz/magic": "^1.3.14",
"redis": "^4.6.5",
"@socket.io/redis-adapter": "^8.1.0",
"@bluelovers/fast-glob": "https://github.com/rishighan/fast-glob-v2-api.git",
"@elastic/elasticsearch": "^8.13.1",
"@jorgeferrero/stream-to-buffer": "^2.0.6",
"@npcz/magic": "^1.3.14",
"@root/walk": "^1.1.0",
"@socket.io/redis-adapter": "^8.1.0",
"@types/jest": "^27.4.1",
"@types/mkdirp": "^1.0.0",
"@types/node": "^13.9.8",
"@types/string-similarity": "^4.0.0",
"axios": "^0.25.0",
"airdcpp-apisocket": "^2.4.4",
"axios": "^1.6.8",
"axios-retry": "^3.2.4",
"bree": "^7.1.5",
"calibre-opds": "^1.0.7",
@@ -65,24 +67,24 @@
"leven": "^3.1.0",
"lodash": "^4.17.21",
"mkdirp": "^0.5.5",
"moleculer": "^0.14.29",
"moleculer-bull": "github:rishighan/moleculer-bull#1.0.0",
"moleculer-bullmq": "^3.0.0",
"moleculer-db": "^0.8.23",
"moleculer-db-adapter-mongoose": "^0.9.2",
"moleculer-db-adapter-mongoose": "^0.9.4",
"moleculer-io": "^2.2.0",
"moleculer-web": "^0.10.5",
"moleculer-web": "^0.10.7",
"mongoosastic-ts": "^6.0.3",
"mongoose": "^6.10.4",
"mongoose-paginate-v2": "^1.3.18",
"nats": "^1.3.2",
"opds-extra": "^3.0.9",
"opds-extra": "^3.0.10",
"p7zip-threetwo": "^1.0.4",
"redis": "^4.6.14",
"sanitize-filename-ts": "^1.0.2",
"sharp": "^0.30.4",
"sharp": "^0.33.3",
"threetwo-ui-typings": "^1.0.14",
"through2": "^4.0.2",
"unrar": "^0.2.0",
"xml2js": "^0.4.23"
"xml2js": "^0.6.2"
},
"engines": {
"node": ">= 18.x.x"

scripts/start.sh (new executable file)

@@ -0,0 +1,26 @@
#!/bin/bash
# Accept the mode as the first positional argument (package.json invokes
# `./scripts/start.sh dev` and `./scripts/start.sh prod`), falling back to $MODE.
MODE="${1:-$MODE}"
echo "Starting script with mode: $MODE"
# Extract the host and port from MONGO_URI
HOST_PORT=$(echo $MONGO_URI | sed -e 's/mongodb:\/\///' -e 's/\/.*$//')
# Assuming the script is called from the project root
PROJECT_ROOT=$(pwd)
echo "Project root: $PROJECT_ROOT"
CONFIG_PATH="$PROJECT_ROOT/moleculer.config.ts"
echo "Configuration path: $CONFIG_PATH"
# Set the correct path for moleculer-runner based on the mode
if [ "$MODE" == "dev" ]; then
# For development: use ts-node
MOLECULER_RUNNER="ts-node $PROJECT_ROOT/node_modules/moleculer/bin/moleculer-runner.js --hot --repl --config $CONFIG_PATH $PROJECT_ROOT/services/**/*.service.ts"
echo "Moleculer Runner for dev: $MOLECULER_RUNNER"
else
# For production: direct node execution of the compiled JavaScript
MOLECULER_RUNNER="moleculer-runner --config $PROJECT_ROOT/dist/moleculer.config.js $PROJECT_ROOT/dist/services/**/*.service.js"
echo "Moleculer Runner for prod: $MOLECULER_RUNNER"
fi
# Run wait-for-it, then start the application
./scripts/wait-for-it.sh $HOST_PORT -- $MOLECULER_RUNNER

scripts/wait-for-it.sh (new executable file)

@@ -0,0 +1,190 @@
#!/usr/bin/env bash
# Use this script to test if a given TCP host/port are available
WAITFORIT_cmdname=${0##*/}
if [[ $OSTYPE == 'darwin'* ]]; then
if ! command -v gtimeout &> /dev/null
then
echo "missing gtimeout (`brew install coreutils`)"
exit
fi
# Aliases are not expanded in non-interactive shells, so use a function instead.
timeout() { gtimeout "$@"; }
fi
echoerr() { if [[ $WAITFORIT_QUIET -ne 1 ]]; then echo "$@" 1>&2; fi }
usage()
{
cat << USAGE >&2
Usage:
$WAITFORIT_cmdname host:port [-s] [-t timeout] [-- command args]
-h HOST | --host=HOST Host or IP under test
-p PORT | --port=PORT TCP port under test
Alternatively, you specify the host and port as host:port
-s | --strict Only execute subcommand if the test succeeds
-q | --quiet Don't output any status messages
-t TIMEOUT | --timeout=TIMEOUT
Timeout in seconds, zero for no timeout
-- COMMAND ARGS Execute command with args after the test finishes
USAGE
exit 1
}
wait_for()
{
if [[ $WAITFORIT_TIMEOUT -gt 0 ]]; then
echoerr "$WAITFORIT_cmdname: waiting $WAITFORIT_TIMEOUT seconds for $WAITFORIT_HOST:$WAITFORIT_PORT"
else
echoerr "$WAITFORIT_cmdname: waiting for $WAITFORIT_HOST:$WAITFORIT_PORT without a timeout"
fi
WAITFORIT_start_ts=$(date +%s)
while :
do
if [[ $WAITFORIT_ISBUSY -eq 1 ]]; then
nc -z $WAITFORIT_HOST $WAITFORIT_PORT
WAITFORIT_result=$?
else
(echo -n > /dev/tcp/$WAITFORIT_HOST/$WAITFORIT_PORT) >/dev/null 2>&1
WAITFORIT_result=$?
fi
if [[ $WAITFORIT_result -eq 0 ]]; then
WAITFORIT_end_ts=$(date +%s)
echoerr "$WAITFORIT_cmdname: $WAITFORIT_HOST:$WAITFORIT_PORT is available after $((WAITFORIT_end_ts - WAITFORIT_start_ts)) seconds"
break
fi
sleep 1
done
return $WAITFORIT_result
}
wait_for_wrapper()
{
# In order to support SIGINT during timeout: http://unix.stackexchange.com/a/57692
if [[ $WAITFORIT_QUIET -eq 1 ]]; then
timeout $WAITFORIT_BUSYTIMEFLAG $WAITFORIT_TIMEOUT $0 --quiet --child --host=$WAITFORIT_HOST --port=$WAITFORIT_PORT --timeout=$WAITFORIT_TIMEOUT &
else
timeout $WAITFORIT_BUSYTIMEFLAG $WAITFORIT_TIMEOUT $0 --child --host=$WAITFORIT_HOST --port=$WAITFORIT_PORT --timeout=$WAITFORIT_TIMEOUT &
fi
WAITFORIT_PID=$!
trap "kill -INT -$WAITFORIT_PID" INT
wait $WAITFORIT_PID
WAITFORIT_RESULT=$?
if [[ $WAITFORIT_RESULT -ne 0 ]]; then
echoerr "$WAITFORIT_cmdname: timeout occurred after waiting $WAITFORIT_TIMEOUT seconds for $WAITFORIT_HOST:$WAITFORIT_PORT"
fi
return $WAITFORIT_RESULT
}
# process arguments
while [[ $# -gt 0 ]]
do
case "$1" in
*:* )
WAITFORIT_hostport=(${1//:/ })
WAITFORIT_HOST=${WAITFORIT_hostport[0]}
WAITFORIT_PORT=${WAITFORIT_hostport[1]}
shift 1
;;
--child)
WAITFORIT_CHILD=1
shift 1
;;
-q | --quiet)
WAITFORIT_QUIET=1
shift 1
;;
-s | --strict)
WAITFORIT_STRICT=1
shift 1
;;
-h)
WAITFORIT_HOST="$2"
if [[ $WAITFORIT_HOST == "" ]]; then break; fi
shift 2
;;
--host=*)
WAITFORIT_HOST="${1#*=}"
shift 1
;;
-p)
WAITFORIT_PORT="$2"
if [[ $WAITFORIT_PORT == "" ]]; then break; fi
shift 2
;;
--port=*)
WAITFORIT_PORT="${1#*=}"
shift 1
;;
-t)
WAITFORIT_TIMEOUT="$2"
if [[ $WAITFORIT_TIMEOUT == "" ]]; then break; fi
shift 2
;;
--timeout=*)
WAITFORIT_TIMEOUT="${1#*=}"
shift 1
;;
--)
shift
WAITFORIT_CLI=("$@")
break
;;
--help)
usage
;;
*)
echoerr "Unknown argument: $1"
usage
;;
esac
done
if [[ "$WAITFORIT_HOST" == "" || "$WAITFORIT_PORT" == "" ]]; then
echoerr "Error: you need to provide a host and port to test."
usage
fi
WAITFORIT_TIMEOUT=${WAITFORIT_TIMEOUT:-15}
WAITFORIT_STRICT=${WAITFORIT_STRICT:-0}
WAITFORIT_CHILD=${WAITFORIT_CHILD:-0}
WAITFORIT_QUIET=${WAITFORIT_QUIET:-0}
# Check to see if timeout is from busybox?
WAITFORIT_TIMEOUT_PATH=$(type -p timeout)
WAITFORIT_TIMEOUT_PATH=$(realpath $WAITFORIT_TIMEOUT_PATH 2>/dev/null || readlink -f $WAITFORIT_TIMEOUT_PATH)
WAITFORIT_BUSYTIMEFLAG=""
if [[ $WAITFORIT_TIMEOUT_PATH =~ "busybox" ]]; then
WAITFORIT_ISBUSY=1
# Check if busybox timeout uses -t flag
# (recent Alpine versions don't support -t anymore)
if timeout &>/dev/stdout | grep -q -e '-t '; then
WAITFORIT_BUSYTIMEFLAG="-t"
fi
else
WAITFORIT_ISBUSY=0
fi
if [[ $WAITFORIT_CHILD -gt 0 ]]; then
wait_for
WAITFORIT_RESULT=$?
exit $WAITFORIT_RESULT
else
if [[ $WAITFORIT_TIMEOUT -gt 0 ]]; then
wait_for_wrapper
WAITFORIT_RESULT=$?
else
wait_for
WAITFORIT_RESULT=$?
fi
fi
if [[ $WAITFORIT_CLI != "" ]]; then
if [[ $WAITFORIT_RESULT -ne 0 && $WAITFORIT_STRICT -eq 1 ]]; then
echoerr "$WAITFORIT_cmdname: strict mode, refusing to execute subprocess"
exit $WAITFORIT_RESULT
fi
exec "${WAITFORIT_CLI[@]}"
else
exit $WAITFORIT_RESULT
fi

services/airdcpp.service.ts (new file)

@@ -0,0 +1,156 @@
"use strict";
import {
Context,
Service,
ServiceBroker,
ServiceSchema,
Errors,
} from "moleculer";
import axios from "axios";
import AirDCPPSocket from "../shared/airdcpp.socket";
export default class AirDCPPService extends Service {
// @ts-ignore
public constructor(
public broker: ServiceBroker,
schema: ServiceSchema<{}> = { name: "airdcpp" }
) {
super(broker);
this.parseServiceSchema({
name: "airdcpp",
mixins: [],
hooks: {},
actions: {
initialize: {
rest: "POST /initialize",
handler: async (
ctx: Context<{
host: {
hostname: string;
port: string;
protocol: string;
username: string;
password: string;
};
}>
) => {
try {
const {
host: {
hostname,
protocol,
port,
username,
password,
},
} = ctx.params;
const airDCPPSocket = new AirDCPPSocket({
protocol,
hostname: `${hostname}:${port}`,
username,
password,
});
return await airDCPPSocket.connect();
} catch (err) {
console.error(err);
}
},
},
getHubs: {
rest: "POST /getHubs",
timeout: 70000,
handler: async (
ctx: Context<{
host: {
hostname: string;
port: string;
protocol: string;
username: string;
password: string;
};
}>
) => {
const {
host: {
hostname,
port,
protocol,
username,
password,
},
} = ctx.params;
try {
const airDCPPSocket = new AirDCPPSocket({
protocol,
hostname: `${hostname}:${port}`,
username,
password,
});
await airDCPPSocket.connect();
return await airDCPPSocket.get(`hubs`);
} catch (err) {
throw err;
}
},
},
search: {
rest: "POST /search",
timeout: 20000,
handler: async (
ctx: Context<{
host: {
hostname;
port;
protocol;
username;
password;
};
dcppSearchQuery;
}>
) => {
try {
const {
host: {
hostname,
port,
protocol,
username,
password,
},
dcppSearchQuery,
} = ctx.params;
const airDCPPSocket = new AirDCPPSocket({
protocol,
hostname: `${hostname}:${port}`,
username,
password,
});
await airDCPPSocket.connect();
const searchInstance = await airDCPPSocket.post(
`search`
);
// Post the search
const searchInfo = await airDCPPSocket.post(
`search/${searchInstance.id}/hub_search`,
dcppSearchQuery
);
await this.sleep(10000);
const results = await airDCPPSocket.get(
`search/${searchInstance.id}/results/0/5`
);
return results;
} catch (err) {
throw err;
}
},
},
},
methods: {
sleep: (ms: number) => {
return new Promise((resolve) => setTimeout(resolve, ms));
},
},
});
}
}
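A hedged usage sketch for these actions; the `host` and `dcppSearchQuery` shapes follow the handler signatures above, and the credentials, hub address, and search pattern are all placeholders:
```
// From another service or the REPL, assuming a running broker:
const host = {
	hostname: "localhost",
	port: "5600",
	protocol: "http",
	username: "user",
	password: "pass",
};

const hubs = await broker.call("airdcpp.getHubs", { host });

const results = await broker.call("airdcpp.search", {
	host,
	// Assumed query shape for AirDC++'s hub_search endpoint.
	dcppSearchQuery: { query: { pattern: "Detective Comics 027" } },
});
```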


@@ -7,6 +7,8 @@ import {
ServiceSchema,
Errors,
} from "moleculer";
import { DbMixin } from "../mixins/db.mixin";
import Comic from "../models/comic.model";
import path from "path";
import {
analyze,
@@ -22,16 +24,13 @@ export default class ImageTransformation extends Service {
super(broker);
this.parseServiceSchema({
name: "imagetransformation",
mixins: [],
mixins: [DbMixin("comics", Comic)],
settings: {
// Available fields in the responses
fields: ["_id", "name", "quantity", "price"],
fields: ["_id"],
// Validator for the `create` & `insert` actions.
entityValidator: {
name: "string|min:3",
price: "number|positive",
},
entityValidator: {},
},
hooks: {},
actions: {


@@ -1,291 +0,0 @@
/*
* MIT License
*
* Copyright (c) 2022 Rishi Ghan
*
The MIT License (MIT)
Copyright (c) 2015 Rishi Ghan
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE.
*/
/*
* Revision History:
* Initial: 2022/01/28 Rishi Ghan
*/
"use strict";
import { refineQuery } from "filename-parser";
import { isNil, isUndefined } from "lodash";
import { Context, Service, ServiceBroker, ServiceSchema } from "moleculer";
import BullMQMixin, { SandboxedJob } from "moleculer-bull";
import { DbMixin } from "../mixins/db.mixin";
import Comic from "../models/comic.model";
import {
extractFromArchive,
uncompressEntireArchive,
} from "../utils/uncompression.utils";
const REDIS_URI = process.env.REDIS_URI || `redis://localhost:6379`;
const EventEmitter = require("events");
EventEmitter.defaultMaxListeners = 20;
console.log(`REDIS -> ${REDIS_URI}`);
export default class QueueService extends Service {
public constructor(
public broker: ServiceBroker,
schema: ServiceSchema<{}> = { name: "importqueue" }
) {
super(broker);
this.parseServiceSchema({
name: "importqueue",
mixins: [BullMQMixin(REDIS_URI), DbMixin("comics", Comic)],
settings: {
bullmq: {
maxStalledCount: 0,
},
},
hooks: {},
queues: {
"process.import": {
concurrency: 10,
async process(job: SandboxedJob) {
console.info("New job received!", job.data);
console.info(`Processing queue...`);
// extract the cover
const result = await extractFromArchive(
job.data.fileObject.filePath
);
const {
name,
filePath,
fileSize,
extension,
mimeType,
cover,
containedIn,
comicInfoJSON,
} = result;
// Infer any issue-related metadata from the filename
const { inferredIssueDetails } = refineQuery(
result.name
);
console.log(
"Issue metadata inferred: ",
JSON.stringify(inferredIssueDetails, null, 2)
);
// Add the bundleId, if present to the payload
let bundleId = null;
if (!isNil(job.data.bundleId)) {
bundleId = job.data.bundleId;
}
// Orchestrate the payload
const payload = {
importStatus: {
isImported: true,
tagged: false,
matchedResult: {
score: "0",
},
},
rawFileDetails: {
name,
filePath,
fileSize,
extension,
mimeType,
containedIn,
cover,
},
inferredMetadata: {
issue: inferredIssueDetails,
},
sourcedMetadata: {
// except for ComicInfo.xml, everything else should be copied over from the
// parent comic
comicInfo: comicInfoJSON,
},
// since we already have at least 1 copy
// mark it as not wanted by default
"acquisition.source.wanted": false,
// clear out the downloads array
// "acquisition.directconnect.downloads": [],
// mark the metadata source
"acquisition.source.name": job.data.sourcedFrom,
};
// Add the sourcedMetadata, if present
if (!isNil(job.data.sourcedMetadata) && !isUndefined(job.data.sourcedMetadata.comicvine)) {
Object.assign(
payload.sourcedMetadata,
job.data.sourcedMetadata
);
}
// write to mongo
const importResult = await this.broker.call(
"library.rawImportToDB",
{
importType: job.data.importType,
bundleId,
payload,
}
);
return {
data: {
importResult,
},
id: job.id,
worker: process.pid,
};
},
},
"process.uncompressAndResize": {
concurrency: 2,
async process(job: SandboxedJob) {
console.log(`Initiating uncompression job...`);
return await uncompressEntireArchive(
job.data.filePath,
job.data.options
);
},
},
},
actions: {
uncompressResize: {
rest: "POST /uncompressResize",
params: {},
async handler(
ctx: Context<{
data: { filePath: string; options: any };
}>
) {
return await this.createJob(
"process.uncompressAndResize",
ctx.params
);
},
},
processImport: {
rest: "POST /processImport",
params: {},
async handler(
ctx: Context<{
fileObject: object;
importType: string;
bundleId: number;
sourcedFrom?: string;
sourcedMetadata: object;
}>
) {
return await this.createJob("process.import", {
fileObject: ctx.params.fileObject,
importType: ctx.params.importType,
bundleId: ctx.params.bundleId,
sourcedFrom: ctx.params.sourcedFrom,
sourcedMetadata: ctx.params.sourcedMetadata,
});
},
},
toggleImportQueue: {
rest: "POST /pauseImportQueue",
params: {},
handler: async (ctx: Context<{ action: string }>) => {
switch (ctx.params.action) {
case "pause":
const foo = await this.getQueue(
"process.import"
).pause();
console.log("paused", foo);
return foo;
case "resume":
const soo = await this.getQueue(
"process.import"
).resume();
console.log("resumed", soo);
return soo;
default:
console.log("Unrecognized queue action.");
}
},
},
},
methods: {},
async started(): Promise<any> {
await this.getQueue("process.import").on(
"failed",
async (job, error) => {
console.error(
`An error occurred in 'process.import' queue on job id '${job.id}': ${error.message}`
);
console.error(job.data);
}
);
await this.getQueue("process.import").on(
"completed",
async (job, res) => {
await this.broker.call("socket.broadcast", {
namespace: "/", //optional
event: "action",
args: [{ type: "LS_COVER_EXTRACTED", result: res }], //optional
});
console.info(
`Import Job with the id '${job.id}' completed.`
);
}
);
await this.getQueue("process.import").on(
"stalled",
async (job) => {
console.warn(`Import job '${job.id}' stalled!`);
console.log(`${JSON.stringify(job, null, 2)}`);
}
);
await this.getQueue("process.uncompressAndResize").on(
"completed",
async (job, res) => {
await this.broker.call("socket.broadcast", {
namespace: "/",
event: "action",
args: [
{
type: "COMICBOOK_EXTRACTION_SUCCESS",
result: {
files: res,
purpose: job.data.options.purpose,
},
},
],
});
console.info(`Uncompression Job ${job.id} completed.`);
}
);
},
});
}
}


@@ -0,0 +1,448 @@
import { Context, Service, ServiceBroker } from "moleculer";
import JobResult from "../models/jobresult.model";
import { refineQuery } from "filename-parser";
import BullMqMixin from "moleculer-bullmq";
import { DbMixin } from "../mixins/db.mixin";
import Comic from "../models/comic.model";
const ObjectId = require("mongoose").Types.ObjectId;
import {
extractFromArchive,
uncompressEntireArchive,
} from "../utils/uncompression.utils";
import { isNil, isUndefined } from "lodash";
import { pubClient } from "../config/redis.config";
import path from "path";
const { MoleculerError } = require("moleculer").Errors;
export default class JobQueueService extends Service {
public constructor(public broker: ServiceBroker) {
super(broker);
this.parseServiceSchema({
name: "jobqueue",
hooks: {},
mixins: [DbMixin("comics", Comic), BullMqMixin],
settings: {
bullmq: {
client: pubClient,
},
},
actions: {
getJobCountsByType: {
rest: "GET /getJobCountsByType",
handler: async (ctx: Context<{}>) => {
console.log(ctx.params);
return await this.$resolve("jobqueue").getJobCounts();
},
},
toggle: {
rest: "GET /toggle",
handler: async (ctx: Context<{ action: String }>) => {
switch (ctx.params.action) {
case "pause":
this.pause();
break;
case "resume":
this.resume();
break;
default:
console.log(`Unknown queue action.`);
}
},
},
enqueue: {
queue: true,
rest: "GET /enqueue",
handler: async (
ctx: Context<{ action: string; description: string }>
) => {
try {
const { action, description } = ctx.params;
// Enqueue the job
const job = await this.localQueue(
ctx,
action,
{},
{
priority: 10,
}
);
console.log(`Job ${job.id} enqueued`);
console.log(`${description}`);
return job.id;
} catch (error) {
console.error("Failed to enqueue job:", error);
}
},
},
// Comic Book Import Job Queue
"enqueue.async": {
handler: async (
ctx: Context<{
sessionId: String;
}>
) => {
try {
console.log(
`Received Job ID ${ctx.locals.job.id}, processing...`
);
// 1. De-structure the job params
const { fileObject } = ctx.locals.job.data.params;
// 2. Extract metadata from the archive
const result = await extractFromArchive(
fileObject.filePath
);
const {
name,
filePath,
fileSize,
extension,
mimeType,
cover,
containedIn,
comicInfoJSON,
} = result;
// 3a. Infer any issue-related metadata from the filename
const { inferredIssueDetails } = refineQuery(
result.name
);
console.log(
"Issue metadata inferred: ",
JSON.stringify(inferredIssueDetails, null, 2)
);
// 3b. Orchestrate the payload
const payload = {
importStatus: {
isImported: true,
tagged: false,
matchedResult: {
score: "0",
},
},
rawFileDetails: {
name,
filePath,
fileSize,
extension,
mimeType,
containedIn,
cover,
},
inferredMetadata: {
issue: inferredIssueDetails,
},
sourcedMetadata: {
// except for ComicInfo.xml, everything else should be copied over from the
// parent comic
comicInfo: comicInfoJSON,
},
// since we already have at least 1 copy
// mark it as not wanted by default
"acquisition.source.wanted": false,
// clear out the downloads array
// "acquisition.directconnect.downloads": [],
// mark the metadata source
"acquisition.source.name":
ctx.locals.job.data.params.sourcedFrom,
};
// 3c. Add the bundleId, if present to the payload
let bundleId = null;
if (!isNil(ctx.locals.job.data.params.bundleId)) {
bundleId = ctx.locals.job.data.params.bundleId;
}
// 3d. Add the sourcedMetadata, if present
if (
!isNil(
ctx.locals.job.data.params.sourcedMetadata
) &&
!isUndefined(
ctx.locals.job.data.params.sourcedMetadata
.comicvine
)
) {
Object.assign(
payload.sourcedMetadata,
ctx.locals.job.data.params.sourcedMetadata
);
}
// 4. write to mongo
const importResult = await this.broker.call(
"library.rawImportToDB",
{
importType:
ctx.locals.job.data.params.importType,
bundleId,
payload,
}
);
return {
data: {
importResult,
},
id: ctx.locals.job.id,
sessionId: ctx.params.sessionId,
};
} catch (error) {
console.error(
`An error occurred processing Job ID ${ctx.locals.job.id}`
);
throw new MoleculerError(
error,
500,
"IMPORT_JOB_ERROR",
{
data: ctx.params.sessionId,
}
);
}
},
},
getJobResultStatistics: {
rest: "GET /getJobResultStatistics",
handler: async (ctx: Context<{}>) => {
return await JobResult.aggregate([
{
$group: {
_id: {
sessionId: "$sessionId",
status: "$status",
},
earliestTimestamp: {
$min: "$timestamp",
},
count: {
$sum: 1,
},
},
},
{
$group: {
_id: "$_id.sessionId",
statuses: {
$push: {
status: "$_id.status",
earliestTimestamp:
"$earliestTimestamp",
count: "$count",
},
},
},
},
{
$project: {
_id: 0,
sessionId: "$_id",
completedJobs: {
$reduce: {
input: "$statuses",
initialValue: 0,
in: {
$sum: [
"$$value",
{
$cond: [
{
$eq: [
"$$this.status",
"completed",
],
},
"$$this.count",
0,
],
},
],
},
},
},
failedJobs: {
$reduce: {
input: "$statuses",
initialValue: 0,
in: {
$sum: [
"$$value",
{
$cond: [
{
$eq: [
"$$this.status",
"failed",
],
},
"$$this.count",
0,
],
},
],
},
},
},
earliestTimestamp: {
$min: "$statuses.earliestTimestamp",
},
},
},
]);
},
},
"uncompressFullArchive.async": {
rest: "POST /uncompressFullArchive",
handler: async (
ctx: Context<{
filePath: string;
comicObjectId: string;
options: any;
}>
) => {
console.log(
`Received Job ID ${JSON.stringify(
ctx.locals
)}, processing...`
);
const { filePath, options, comicObjectId } = ctx.params;
const comicId = new ObjectId(comicObjectId);
// 2. Extract metadata from the archive
const result: string[] = await uncompressEntireArchive(
filePath,
options
);
if (Array.isArray(result) && result.length !== 0) {
// Get the containing directory of the uncompressed archive
const directoryPath = path.dirname(result[0]);
// Add to mongo object
await Comic.findByIdAndUpdate(
comicId,
{
$set: {
"rawFileDetails.archive": {
uncompressed: true,
expandedPath: directoryPath,
},
},
},
{ new: true, safe: true, upsert: true }
);
return result;
}
},
},
},
events: {
async "uncompressFullArchive.async.active"(
ctx: Context<{ id: number }>
) {
console.log(
`Uncompression Job ID ${ctx.params.id} is set to active.`
);
},
async "uncompressFullArchive.async.completed"(
ctx: Context<{ id: number }>
) {
console.log(
`Uncompression Job ID ${ctx.params.id} completed.`
);
const job = await this.job(ctx.params.id);
await this.broker.call("socket.broadcast", {
namespace: "/",
event: "LS_UNCOMPRESSION_JOB_COMPLETE",
args: [
{
uncompressedArchive: job.returnvalue,
},
],
});
return job.returnvalue;
},
// use the `${QUEUE_NAME}.QUEUE_EVENT` scheme
async "enqueue.async.active"(ctx: Context<{ id: Number }>) {
console.log(`Job ID ${ctx.params.id} is set to active.`);
},
async drained(ctx) {
console.log("Queue drained.");
await this.broker.call("socket.broadcast", {
namespace: "/",
event: "LS_IMPORT_QUEUE_DRAINED",
args: [
{
message: "drained",
},
],
});
},
async "enqueue.async.completed"(ctx: Context<{ id: Number }>) {
// 1. Fetch the job result using the job Id
const job = await this.job(ctx.params.id);
// 2. Increment the completed job counter
await pubClient.incr("completedJobCount");
// 3. Fetch the completed job count for the final payload to be sent to the client
const completedJobCount = await pubClient.get(
"completedJobCount"
);
// 4. Emit the LS_COVER_EXTRACTED event with the necessary details
await this.broker.call("socket.broadcast", {
namespace: "/",
event: "LS_COVER_EXTRACTED",
args: [
{
completedJobCount,
importResult: job.returnvalue.data.importResult,
},
],
});
// 5. Persist the job results in mongo for posterity
await JobResult.create({
id: ctx.params.id,
status: "completed",
timestamp: job.timestamp,
sessionId: job.returnvalue.sessionId,
failedReason: {},
});
console.log(`Job ID ${ctx.params.id} completed.`);
},
async "enqueue.async.failed"(ctx) {
const job = await this.job(ctx.params.id);
await pubClient.incr("failedJobCount");
const failedJobCount = await pubClient.get(
"failedJobCount"
);
await JobResult.create({
id: ctx.params.id,
status: "failed",
failedReason: job.failedReason,
sessionId: job.data.params.sessionId,
timestamp: job.timestamp,
});
// 4. Emit the LS_COVER_EXTRACTION_FAILED event with the necessary details
await this.broker.call("socket.broadcast", {
namespace: "/",
event: "LS_COVER_EXTRACTION_FAILED",
args: [
{
failedJobCount,
importResult: job,
},
],
});
},
},
methods: {},
});
}
}


@@ -51,31 +51,51 @@ import {
IExtractionOptions,
} from "threetwo-ui-typings";
const ObjectId = require("mongoose").Types.ObjectId;
import { pubClient } from "../config/redis.config";
import fsExtra from "fs-extra";
const through2 = require("through2");
import klaw from "klaw";
import path from "path";
import { COMICS_DIRECTORY, USERDATA_DIRECTORY } from "../constants/directories";
import AirDCPPSocket from "../shared/airdcpp.socket";
console.log(`MONGO -> ${process.env.MONGO_URI}`);
export default class ImportService extends Service {
public constructor(public broker: ServiceBroker) {
export default class LibraryService extends Service {
public constructor(
public broker: ServiceBroker,
schema: ServiceSchema<{}> = { name: "library" }
) {
super(broker);
this.parseServiceSchema({
name: "library",
mixins: [DbMixin("comics", Comic)],
hooks: {},
actions: {
getHealthInformation: {
rest: "GET /getHealthInformation",
params: {},
handler: async (ctx: Context<{}>) => {
try {
return await ctx.broker.call("$node.services");
} catch (error) {
return new Error("Service is down.");
}
},
},
walkFolders: {
rest: "POST /walkFolders",
params: {
basePathToWalk: "string",
},
async handler(ctx: Context<{ basePathToWalk: string }>) {
params: {},
async handler(
ctx: Context<{
basePathToWalk: string;
extensions: string[];
}>
) {
console.log(ctx.params);
return await walkFolder(ctx.params.basePathToWalk, [
".cbz",
".cbr",
".cb7",
...ctx.params.extensions,
]);
},
},
@@ -90,11 +110,18 @@ export default class ImportService extends Service {
rest: "POST /uncompressFullArchive",
params: {},
handler: async (
ctx: Context<{ filePath: string; options: any }>
ctx: Context<{
filePath: string;
comicObjectId: string;
options: any;
}>
) => {
await broker.call("importqueue.uncompressResize", {
this.broker.call("jobqueue.enqueue", {
filePath: ctx.params.filePath,
comicObjectId: ctx.params.comicObjectId,
options: ctx.params.options,
action: "uncompressFullArchive.async",
description: `Job for uncompressing archive at ${ctx.params.filePath}`,
});
},
},
@@ -139,64 +166,55 @@ export default class ImportService extends Service {
},
newImport: {
rest: "POST /newImport",
params: {},
async handler(
ctx: Context<{
extractionOptions?: any;
}>
) {
// 1. Walk the Source folder
klaw(path.resolve(COMICS_DIRECTORY))
// 1.1 Filter on .cb* extensions
.pipe(
through2.obj(function (item, enc, next) {
let fileExtension = path.extname(item.path);
if (
[".cbz", ".cbr", ".cb7"].includes(
fileExtension
)
) {
this.push(item);
}
next();
})
)
// 1.2 Pipe filtered results to the next step
.on("data", async (item) => {
async handler(ctx) {
const { sessionId } = ctx.params;
try {
// Initialize Redis counters once at the start of the import
await pubClient.set("completedJobCount", 0);
await pubClient.set("failedJobCount", 0);
// Convert klaw to use a promise-based approach for better flow control
const files = await this.getComicFiles(
COMICS_DIRECTORY
);
for (const file of files) {
console.info(
"Found a file at path: %s",
item.path
"Found a file at path:",
file.path
);
let comicExists = await Comic.exists({
"rawFileDetails.name": `${path.basename(
item.path,
path.extname(item.path)
)}`,
const comicExists = await Comic.exists({
"rawFileDetails.name": path.basename(
file.path,
path.extname(file.path)
),
});
if (!comicExists) {
// 2. Send the extraction job to the queue
await broker.call(
"importqueue.processImport",
{
fileObject: {
filePath: item.path,
fileSize: item.stats.size,
},
importType: "new",
}
);
// Send the extraction job to the queue
await this.broker.call("jobqueue.enqueue", {
fileObject: {
filePath: file.path,
fileSize: file.stats.size,
},
sessionId,
importType: "new",
action: "enqueue.async",
});
} else {
console.log(
"Comic already exists in the library."
);
}
})
.on("end", () => {
console.log("All files traversed.");
});
}
console.log("All files traversed.");
} catch (error) {
console.error(
"Error during newImport processing:",
error
);
}
},
},
rawImportToDB: {
rest: "POST /rawImportToDB",
params: {},
@@ -207,10 +225,7 @@ export default class ImportService extends Service {
payload: {
_id?: string;
sourcedMetadata: {
comicvine?: {
volume: { api_detail_url: string };
volumeInformation: {};
};
comicvine?: any;
locg?: {};
};
inferredMetadata: {
@@ -219,11 +234,13 @@ export default class ImportService extends Service {
rawFileDetails: {
name: string;
};
wanted: {
issues: [];
volume: { id: number };
source: string;
markEntireVolumeWanted: Boolean;
};
acquisition: {
source: {
wanted: boolean;
name?: string;
};
directconnect: {
downloads: [];
};
@@ -232,61 +249,109 @@ export default class ImportService extends Service {
}>
) {
try {
let volumeDetails;
const comicMetadata = ctx.params.payload;
// When an issue is added from the search CV feature
// we solicit volume information and add that to mongo
if (
comicMetadata.sourcedMetadata.comicvine &&
!isNil(
comicMetadata.sourcedMetadata.comicvine
.volume
)
) {
volumeDetails = await this.broker.call(
"comicvine.getVolumes",
{
volumeURI:
comicMetadata.sourcedMetadata
.comicvine.volume
.api_detail_url,
}
);
comicMetadata.sourcedMetadata.comicvine.volumeInformation =
volumeDetails.results;
}
console.log(
JSON.stringify(ctx.params.payload, null, 4)
);
const { payload } = ctx.params;
const { wanted } = payload;
console.log("Saving to Mongo...");
console.log(
`Import type: [${ctx.params.importType}]`
);
switch (ctx.params.importType) {
case "new":
return await Comic.create(comicMetadata);
case "update":
return await Comic.findOneAndUpdate(
{
"acquisition.directconnect.downloads.bundleId":
ctx.params.bundleId,
},
comicMetadata,
{
upsert: true,
new: true,
}
);
default:
return false;
if (
!wanted ||
!wanted.volume ||
!wanted.volume.id
) {
console.log(
"No valid identifier for upsert. Attempting to create a new document with minimal data..."
);
const newDocument = new Comic(payload); // Using the entire payload for the new document
await newDocument.save();
return {
success: true,
message:
"New document created due to lack of valid identifiers.",
data: newDocument,
};
}
let condition = {
"wanted.volume.id": wanted.volume.id,
};
let update: any = {
// Using 'any' to bypass strict type checks; alternatively, define a more accurate type
$set: {
rawFileDetails: payload.rawFileDetails,
inferredMetadata: payload.inferredMetadata,
sourcedMetadata: payload.sourcedMetadata,
},
$setOnInsert: {
"wanted.source": payload.wanted.source,
"wanted.markEntireVolumeWanted":
payload.wanted.markEntireVolumeWanted,
"wanted.volume": payload.wanted.volume,
},
};
if (wanted.issues && wanted.issues.length > 0) {
update.$addToSet = {
"wanted.issues": { $each: wanted.issues },
};
}
const options = {
upsert: true,
new: true,
};
const result = await Comic.findOneAndUpdate(
condition,
update,
options
);
console.log(
"Operation completed. Document updated or inserted:",
result
);
return {
success: true,
message: "Document successfully upserted.",
data: result,
};
} catch (error) {
console.log(error);
throw new Errors.MoleculerError(
"Import failed.",
"Operation failed.",
500
);
}
},
},
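The update document above mixes three operators with distinct semantics; here is a condensed, illustrative sketch of the same pattern (model import assumed, values are placeholders):

import Comic from "../models/comic.model";

// Illustrative: $set overwrites on every match, $setOnInsert applies
// only when the upsert inserts a new document, and $addToSet appends
// array members without duplicating ones already present.
async function upsertWantedVolume() {
    return Comic.findOneAndUpdate(
        { "wanted.volume.id": 4050 }, // placeholder volume id
        {
            $set: { inferredMetadata: { issue: {} } },
            $setOnInsert: { "wanted.source": "search" },
            $addToSet: { "wanted.issues": { $each: [{ id: 1 }] } },
        },
        { upsert: true, new: true }
    );
}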
getComicsMarkedAsWanted: {
rest: "GET /getComicsMarkedAsWanted",
handler: async (ctx: Context<{}>) => {
try {
// Query to find comics where 'markEntireVolumeWanted' is true or the 'issues' array is not empty
const wantedComics = await Comic.find({
wanted: { $exists: true },
$or: [
{ "wanted.markEntireVolumeWanted": true },
{ "wanted.issues": { $not: { $size: 0 } } },
],
});
console.log(wantedComics); // Output the found comics
return wantedComics;
} catch (error) {
console.error("Error finding comics:", error);
throw error;
}
},
},
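One subtlety in the filter above: { $not: { $size: 0 } } also matches documents where wanted.issues is missing entirely, which is why the wanted: { $exists: true } guard is load-bearing. A distilled sketch of the filter:

// Illustrative shape of the filter used by getComicsMarkedAsWanted:
const wantedFilter = {
    wanted: { $exists: true },
    $or: [
        { "wanted.markEntireVolumeWanted": true },
        { "wanted.issues": { $not: { $size: 0 } } },
    ],
};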
applyComicVineMetadata: {
rest: "POST /applyComicVineMetadata",
params: {},
@@ -383,6 +448,66 @@ export default class ImportService extends Service {
});
},
},
applyTorrentDownloadMetadata: {
rest: "POST /applyTorrentDownloadMetadata",
handler: async (
ctx: Context<{
torrentToDownload: any;
comicObjectId: string;
infoHash: string;
name: string;
announce: string[];
}>
) => {
const {
name,
torrentToDownload,
comicObjectId,
announce,
infoHash,
} = ctx.params;
console.log(JSON.stringify(ctx.params, null, 4));
try {
return await Comic.findByIdAndUpdate(
new ObjectId(comicObjectId),
{
$push: {
"acquisition.torrent": {
infoHash,
name,
announce,
},
},
},
{ new: true, safe: true, upsert: true }
);
} catch (err) {
console.log(err);
}
},
},
getInfoHashes: {
rest: "GET /getInfoHashes",
handler: async (ctx: Context<{}>) => {
try {
return await Comic.aggregate([
{
$unwind: "$acquisition.torrent",
},
{
$group: {
_id: "$_id",
infoHashes: {
$push: "$acquisition.torrent.infoHash",
},
},
},
]);
} catch (err) {
return err;
}
},
},
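For readers of the aggregation above, a sketch of the assumed result shape (one element per comic with at least one torrent entry):

// Assumed result shape (illustrative):
type InfoHashGroup = {
    _id: string; // the comic document id the $group stage keys on
    infoHashes: string[]; // one entry per unwound acquisition.torrent
};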
getComicBooks: {
rest: "POST /getComicBooks",
params: {},
@@ -402,7 +527,10 @@ export default class ImportService extends Service {
rest: "POST /getComicBookById",
params: { id: "string" },
async handler(ctx: Context<{ id: string }>) {
return await Comic.findById(ctx.params.id);
console.log(ctx.params.id);
return await Comic.findById(
new ObjectId(ctx.params.id)
);
},
},
getComicBooksByIds: {
@@ -621,6 +749,48 @@ export default class ImportService extends Service {
},
},
// This method belongs in library service,
// because bundles can only exist for comics _in the library_
// (wanted or imported)
getBundles: {
rest: "POST /getBundles",
params: {},
handler: async (
ctx: Context<{
comicObjectId: string;
config: any;
}>
) => {
try {
// 1. Get the comic object Id
const { config } = ctx.params;
const comicObject = await Comic.findById(
new ObjectId(ctx.params.comicObjectId)
);
// 2. Init AirDC++
const ADCPPSocket = new AirDCPPSocket(config);
await ADCPPSocket.connect();
// 3. Get the bundles for the comic object
if (comicObject) {
// make the call to get the bundles from AirDC++ using the bundleId
const bundles =
comicObject.acquisition.directconnect.downloads.map(
async (bundle) => {
return await ADCPPSocket.get(
`queue/bundles/${bundle.bundleId}`
);
}
);
return Promise.all(bundles);
}
} catch (error) {
throw new Errors.MoleculerError(
"Couldn't fetch bundles from AirDC++",
500
);
}
},
},
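A hypothetical caller for this endpoint; the config keys mirror what the AirDCPPSocket wrapper (added in shared/airdcpp.socket.ts later in this changeset) reads in its constructor, and the hostname and credentials are placeholders:

import { ServiceBroker } from "moleculer";

// Hypothetical caller sketch for library.getBundles.
async function fetchBundles(broker: ServiceBroker, comicObjectId: string) {
    return broker.call("library.getBundles", {
        comicObjectId,
        config: {
            protocol: "http",
            hostname: "localhost:5600",
            username: "user",
            password: "pass",
        },
    });
}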
flushDB: {
rest: "POST /flushDB",
params: {},
@@ -670,7 +840,35 @@ export default class ImportService extends Service {
},
},
},
methods: {},
methods: {
// Method to walk the directory and filter comic files
getComicFiles: (directory) => {
return new Promise((resolve, reject) => {
const files = [];
klaw(directory)
.pipe(
through2.obj(function (item, enc, next) {
const fileExtension = path.extname(
item.path
);
if (
[".cbz", ".cbr", ".cb7"].includes(
fileExtension
)
) {
this.push(item);
}
next();
})
)
.on("data", (item) => {
files.push(item);
})
.on("end", () => resolve(files))
.on("error", (err) => reject(err));
});
},
},
});
}
}

View File

@@ -46,14 +46,15 @@ export default class SettingsService extends Service {
.map((item) => JSON.stringify(item))
.join("\n");
queries += "\n";
const { body } = await eSClient.msearch({
const { responses } = await eSClient.msearch({
body: queries,
});
body.responses.forEach((match) => {
responses.forEach((match) => {
console.log(match.hits);
});
return body.responses;
return responses;
},
},
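The switch from body.responses to responses tracks the elasticsearch-js v8 client, which resolves with the response body directly. For reference, a sketch of the NDJSON convention the queries string follows (header line, then body line, newline-terminated):

// Illustrative msearch payload: alternating header/body lines.
const searches = [
    { index: "comics" }, // header; the index name is a placeholder
    { query: { match_all: {} } }, // body
];
const queries =
    searches.map((item) => JSON.stringify(item)).join("\n") + "\n";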
issue: {
@@ -74,9 +75,9 @@ export default class SettingsService extends Service {
) => {
try {
console.log(ctx.params);
const { query, pagination } = ctx.params;
const { query, pagination, type } = ctx.params;
let eSQuery = {};
switch (ctx.params.type) {
switch (type) {
case "all":
Object.assign(eSQuery, {
match_all: {},
@@ -99,12 +100,19 @@ export default class SettingsService extends Service {
case "wanted":
Object.assign(eSQuery, {
bool: {
must: {
term: {
"acquisition.source.wanted":
true,
should: [
{
exists: {
field: "wanted.issues",
},
},
},
{
exists: {
field: "wanted.volume",
},
},
],
minimum_should_match: 1,
},
});
break;
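Assembled, the "wanted" branch above amounts to the following query, matching any document that carries either wanted issues or a wanted volume:

// Final shape of eSQuery for type === "wanted":
const eSQuery = {
    bool: {
        should: [
            { exists: { field: "wanted.issues" } },
            { exists: { field: "wanted.volume" } },
        ],
        minimum_should_match: 1,
    },
};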

View File

@@ -8,7 +8,7 @@ import {
} from "moleculer";
import { DbMixin } from "../mixins/db.mixin";
import Settings from "../models/settings.model";
import { isEmpty, pickBy, identity, map } from "lodash";
import { isEmpty, pickBy, identity, map, isNil } from "lodash";
const ObjectId = require("mongoose").Types.ObjectId;
export default class SettingsService extends Service {
@@ -28,12 +28,31 @@ export default class SettingsService extends Service {
rest: "GET /getAllSettings",
params: {},
async handler(ctx: Context<{ settingsKey: string }>) {
const settings = await Settings.find({});
if (isEmpty(settings)) {
const { settingsKey } = ctx.params;
// Initialize a projection object. Include everything by default.
let projection = settingsKey
? { _id: 0, [settingsKey]: 1 }
: {};
// Find the settings with the dynamic projection
const settings = await Settings.find({}, projection);
if (settings.length === 0) {
return {};
}
console.log(settings[0]);
return settings[0];
// If settingsKey is provided, return the specific part of the settings.
// Otherwise, return the entire settings document.
if (settingsKey) {
// Check if the specific key exists in the settings document.
// Since `settings` is an array, we access the first element.
// Then, we use the settingsKey to return only that part of the document.
return settings[0][settingsKey] || {};
} else {
// Return the entire settings document
return settings[0];
}
},
},
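A quick sketch of the dynamic projection this handler relies on: the computed key becomes a Mongo inclusion projection, so only the requested subtree is fetched:

// Illustrative: settingsKey "directConnect" produces
// { _id: 0, directConnect: 1 }, restricting the query to that subtree.
const settingsKey = "directConnect";
const projection = settingsKey ? { _id: 0, [settingsKey]: 1 } : {};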
@@ -42,44 +61,107 @@ export default class SettingsService extends Service {
params: {},
async handler(
ctx: Context<{
settingsPayload: {
host: object;
airDCPPUserSettings: object;
hubs: [];
settingsPayload?: {
protocol: string;
hostname: string;
port: string;
username: string;
password: string;
_id?: string;
airDCPPUserSettings?: object;
hubs?: [];
};
settingsObjectId: string;
settingsObjectId?: string;
settingsKey: string;
}>
) {
console.log("varan bhat", ctx.params);
const { host, airDCPPUserSettings, hubs } =
ctx.params.settingsPayload;
let query = {
host,
airDCPPUserSettings,
hubs,
};
const keysToUpdate = pickBy(query, identity);
let updateQuery = {};
try {
console.log(ctx.params);
let query = {};
const { settingsKey, settingsObjectId } =
ctx.params;
const {
hostname,
protocol,
port,
username,
password,
} = ctx.params.settingsPayload;
const host = {
hostname,
protocol,
port,
username,
password,
};
const undefinedPropsInHostname = Object.values(
host
).filter((value) => value === undefined);
map(Object.keys(keysToUpdate), (key) => {
updateQuery[`directConnect.client.${key}`] =
query[key];
});
const options = {
upsert: true,
new: true,
setDefaultsOnInsert: true,
};
const filter = {
_id: new ObjectId(ctx.params.settingsObjectId),
};
const result = Settings.findOneAndUpdate(
filter,
{ $set: updateQuery },
options
);
// Update, depending what key was passed in params
// 1. Construct the update query
switch (settingsKey) {
case "bittorrent":
console.log(
`Received settings for ${settingsKey}, building query...`
);
query = {
...(undefinedPropsInHostname.length ===
0 && {
$set: {
"bittorrent.client.host": host,
},
}),
};
break;
case "directConnect":
console.log(
`Received settings for ${settingsKey}, building query...`
);
const { hubs, airDCPPUserSettings } =
ctx.params.settingsPayload;
query = {
...(undefinedPropsInHostname.length ===
0 && {
$set: {
"directConnect.client.host":
host,
},
}),
...(!isNil(hubs) && {
$set: {
"directConnect.client.hubs":
hubs,
},
}),
};
console.log(JSON.stringify(query, null, 4));
break;
return result;
default:
return false;
}
// 2. Set up options, filters
const options = {
upsert: true,
setDefaultsOnInsert: true,
returnDocument: "after",
};
const filter = settingsObjectId
? { _id: settingsObjectId }
: {};
// 3. Execute the mongo query
const result = await Settings.findOneAndUpdate(
filter,
query,
options
);
return result;
} catch (err) {
return err;
}
},
},
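The queries above are assembled with conditional spreads; a distilled sketch of the pattern follows. One caveat worth flagging: in the directConnect case, when both guards hold, the second spread's $set key replaces the first, so the host and hubs updates would need to be merged into a single $set to persist together.

// Distilled pattern: spreading (condition && object) contributes the
// object's keys only when the condition holds; spreading false is a no-op.
function buildQuery(hubs?: string[]) {
    return {
        ...(hubs !== undefined && {
            $set: { "directConnect.client.hubs": hubs },
        }),
    };
}
// buildQuery() yields {}; buildQuery(["adcs://example"]) carries the $set.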
deleteSettings: {

View File

@@ -1,16 +1,14 @@
"use strict";
import { Service, ServiceBroker, ServiceSchema } from "moleculer";
import { createClient } from "redis";
import { Service, ServiceBroker, ServiceSchema, Context } from "moleculer";
import { JobType } from "moleculer-bullmq";
import { createAdapter } from "@socket.io/redis-adapter";
import Session from "../models/session.model";
import { pubClient, subClient } from "../config/redis.config";
const { MoleculerError } = require("moleculer").Errors;
const SocketIOService = require("moleculer-io");
const redisURL = new URL(process.env.REDIS_URI);
// console.log(redisURL.hostname);
const { v4: uuidv4 } = require("uuid");
import AirDCPPSocket from "../shared/airdcpp.socket";
const pubClient = createClient({ url: `redis://${redisURL.hostname}:6379` });
(async () => {
await pubClient.connect();
})();
const subClient = pubClient.duplicate();
export default class SocketService extends Service {
// @ts-ignore
public constructor(
@@ -25,48 +23,12 @@ export default class SocketService extends Service {
port: process.env.PORT || 3001,
io: {
namespaces: {
"/": {
"/automated": {
events: {
call: {
// whitelist: ["math.*", "say.*", "accounts.*", "rooms.*", "io.*"],
},
action: async (data, ack) => {
// write your handler function here.
switch (data.type) {
case "LS_IMPORT":
console.log(
`Received ${data.type} event.`
);
// 1. Send task to queue
await this.broker.call(
"library.newImport",
data.data,
{}
);
break;
case "LS_TOGGLE_IMPORT_QUEUE":
await this.broker.call(
"importqueue.toggleImportQueue",
data.data,
{}
);
break;
case "LS_SINGLE_IMPORT":
console.info(
"AirDC++ finished a download -> "
);
console.log(data);
await this.broker.call(
"library.importDownloadedComic",
{ bundle: data },
{}
);
break;
// uncompress archive events
case "COMICBOOK_EXTRACTION_SUCCESS":
console.log(data);
return data;
}
whitelist: [
"socket.*", // Allow 'search' in the automated namespace
],
},
},
},
@@ -77,12 +39,283 @@ export default class SocketService extends Service {
},
},
hooks: {},
actions: {},
methods: {},
actions: {
resumeSession: async (ctx: Context<{ sessionId: string }>) => {
const { sessionId } = ctx.params;
console.log("Attempting to resume session...");
try {
const sessionRecord = await Session.find({
sessionId,
});
// 1. Check for sessionId's existence, and a match
if (
sessionRecord.length !== 0 &&
sessionRecord[0].sessionId === sessionId
) {
// 2. Find if the queue has active, paused or waiting jobs
const jobs: JobType = await this.broker.call(
"jobqueue.getJobCountsByType",
{}
);
const { active, paused, waiting } = jobs;
if (active > 0 || paused > 0 || waiting > 0) {
// 3. Get job counts
const completedJobCount = await pubClient.get("completedJobCount");
const failedJobCount = await pubClient.get("failedJobCount");
// 4. Send the counts to the active socket.io session
await this.broker.call("socket.broadcast", {
namespace: "/",
event: "RESTORE_JOB_COUNTS_AFTER_SESSION_RESTORATION",
args: [
{
completedJobCount,
failedJobCount,
queueStatus: "running",
},
],
});
}
}
} catch (err) {
throw new MoleculerError(err, 500, "SESSION_ID_NOT_FOUND", {
data: sessionId,
});
}
},
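This resume path pairs with the counter initialization in library.newImport, which resets completedJobCount and failedJobCount at the start of an import. A sketch of the assumed counter lifecycle (the increment happens in job handlers not shown in this diff):

import { pubClient } from "../config/redis.config";

// Illustrative counter lifecycle around an import session.
async function counterLifecycle() {
    await pubClient.set("completedJobCount", 0); // at import start
    await pubClient.incr("completedJobCount"); // as each job completes
    return pubClient.get("completedJobCount"); // read back on resume
}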
setQueueStatus: async (
ctx: Context<{
queueAction: string;
queueStatus: string;
}>
) => {
const { queueAction } = ctx.params;
await this.broker.call("jobqueue.toggle", { action: queueAction }, {});
},
importSingleIssue: async (ctx: Context<{}>) => {
console.info("AirDC++ finished a download -> ");
console.log(ctx.params);
// await this.broker.call(
// "library.importDownloadedComic",
// { bundle: data },
// {}
// );
},
search: {
params: {
query: "object",
config: "object",
},
async handler(ctx) {
const { query, config, namespace } = ctx.params;
const namespacedInstance = this.io.of(namespace || "/");
const ADCPPSocket = new AirDCPPSocket(config);
try {
await ADCPPSocket.connect();
const instance = await ADCPPSocket.post("search", query);
// Send the instance to the client
await namespacedInstance.emit("searchInitiated", {
instance,
});
// Setting up listeners
await ADCPPSocket.addListener(
`search`,
`search_result_added`,
(groupedResult) => {
console.log(JSON.stringify(groupedResult, null, 4));
namespacedInstance.emit("searchResultAdded", groupedResult);
},
instance.id
);
await ADCPPSocket.addListener(
`search`,
`search_result_updated`,
(updatedResult) => {
namespacedInstance.emit("searchResultUpdated", updatedResult);
},
instance.id
);
await ADCPPSocket.addListener(
`search`,
`search_hub_searches_sent`,
async (searchInfo) => {
await this.sleep(5000);
const currentInstance = await ADCPPSocket.get(
`search/${instance.id}`
);
// Send the instance to the client
await namespacedInstance.emit("searchesSent", {
searchInfo,
});
if (currentInstance.result_count === 0) {
console.log("No more search results.");
namespacedInstance.emit("searchComplete", {
message: "No more search results.",
});
}
},
instance.id
);
// Perform the actual search
await ADCPPSocket.post(`search/${instance.id}/hub_search`, query);
} catch (error) {
await namespacedInstance.emit("searchError", error.message);
throw new MoleculerError("Search failed", 500, "SEARCH_FAILED", {
error,
});
} finally {
// await ADCPPSocket.disconnect();
}
},
},
download: {
// params: {
// searchInstanceId: "string",
// resultId: "string",
// comicObjectId: "string",
// name: "string",
// size: "number",
// type: "any", // Define more specific type if possible
// config: "object",
// },
async handler(ctx) {
console.log(ctx.params);
const {
searchInstanceId,
resultId,
config,
comicObjectId,
name,
size,
type,
} = ctx.params;
const ADCPPSocket = new AirDCPPSocket(config);
try {
await ADCPPSocket.connect();
const downloadResult = await ADCPPSocket.post(
`search/${searchInstanceId}/results/${resultId}/download`
);
if (downloadResult && downloadResult.bundle_info) {
// Assume bundle_info is part of the response and contains the necessary details
const bundleDBImportResult = await ctx.call(
"library.applyAirDCPPDownloadMetadata",
{
bundleId: downloadResult.bundle_info.id,
comicObjectId,
name,
size,
type,
}
);
this.logger.info(
"Download and metadata update successful",
bundleDBImportResult
);
this.broker.emit("downloadCompleted", bundleDBImportResult);
return bundleDBImportResult;
} else {
throw new Error(
"Failed to download or missing download result information"
);
}
} catch (error) {
this.broker.emit("downloadError", error.message);
throw new MoleculerError("Download failed", 500, "DOWNLOAD_FAILED", {
error,
});
} finally {
// await ADCPPSocket.disconnect();
}
},
},
listenBundleTick: {
async handler(ctx) {
const { config } = ctx.params;
const ADCPPSocket = new AirDCPPSocket(config);
try {
await ADCPPSocket.connect();
console.log("Connected to AirDCPP successfully.");
ADCPPSocket.addListener(
"queue",
"queue_bundle_tick",
(tickData) => {
console.log("Received tick data: ", tickData);
this.io.emit("bundleTickUpdate", tickData);
},
null
); // Assuming no specific ID is needed here
} catch (error) {
console.error(
"Error connecting to AirDCPP or setting listener:",
error
);
throw error;
}
},
},
},
methods: {
sleep: (ms: number): Promise<void> => {
return new Promise((resolve) => setTimeout(resolve, ms));
},
},
async started() {
this.io.on("connection", (data) =>
console.log("socket.io server initialized.")
);
this.logger.info("Starting Socket Service...");
this.logger.debug("pubClient:", pubClient);
this.logger.debug("subClient:", subClient);
if (!pubClient || !subClient) {
this.logger.error("Redis clients are not initialized!");
throw new Error("Redis clients are not initialized!");
}
// Additional checks or logic if necessary
if (pubClient.status !== "ready") {
await pubClient.connect();
}
if (subClient.status !== "ready") {
await subClient.connect();
}
this.io.on("connection", async (socket) => {
console.log(
`socket.io server connected to client with session ID: ${socket.id}`
);
console.log("Looking up sessionId in Mongo...");
const sessionIdExists = await Session.find({
sessionId: socket.handshake.query.sessionId,
});
// 1. if sessionId isn't found in Mongo, create one and persist it
if (sessionIdExists.length === 0) {
console.log(
`Socket Id ${socket.id} not found in Mongo, creating a new session...`
);
const sessionId = uuidv4();
socket.sessionId = sessionId;
console.log(`Saving session ${sessionId} to Mongo...`);
await Session.create({
sessionId,
socketId: socket.id,
});
socket.emit("sessionInitialized", sessionId);
}
// 2. else, retrieve it from Mongo and "resume" the socket.io connection
else {
console.log(`Found socketId ${socket.id}, no-op.`);
}
});
},
});
}
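The handshake above implies a client that presents its stored sessionId as a query parameter and persists whatever the server issues. A minimal client-side sketch (socket.io-client assumed; port 3001 per the settings above):

import { io } from "socket.io-client";

// Minimal client sketch: send any previously stored sessionId in the
// handshake query, and persist the one the server hands back.
const socket = io("http://localhost:3001", {
    query: { sessionId: localStorage.getItem("sessionId") ?? "" },
});
socket.on("sessionInitialized", (sessionId: string) => {
    localStorage.setItem("sessionId", sessionId);
});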

View File

@@ -0,0 +1,99 @@
"use strict";
import {
Context,
Service,
ServiceBroker,
ServiceSchema,
Errors,
} from "moleculer";
import { DbMixin } from "../mixins/db.mixin";
import Comic from "../models/comic.model";
import BullMqMixin from "moleculer-bullmq";
import { pubClient } from "../config/redis.config";
const { MoleculerError } = require("moleculer").Errors;
export default class TorrentJobsService extends Service {
// @ts-ignore
public constructor(
public broker: ServiceBroker,
schema: ServiceSchema<{}> = { name: "torrentjobs" }
) {
super(broker);
this.parseServiceSchema({
name: "torrentjobs",
mixins: [DbMixin("comics", Comic), BullMqMixin],
settings: {
bullmq: {
client: pubClient,
},
},
hooks: {},
actions: {
getTorrentData: {
queue: true,
rest: "GET /getTorrentData",
handler: async (ctx: Context<{ trigger: string }>) => {
const { trigger } = ctx.params;
console.log(`Received ${trigger} as the trigger...`);
const jobOptions = {
jobId: "retrieveTorrentData",
name: "bossy",
repeat: {
every: 10000, // Repeat every 10000 ms
limit: 100, // Limit to 100 repeats
},
};
const job = await this.localQueue(
ctx,
"fetchTorrentData",
ctx.params,
jobOptions
);
return job;
},
},
fetchTorrentData: {
rest: "GET /fetchTorrentData",
handler: async (
ctx: Context<{
birdName: string;
}>
) => {
const repeatableJob = await this.$resolve(
"torrentjobs"
).getRepeatableJobs();
console.info(repeatableJob);
console.info(
`Scheduled job for fetching torrent data fired.`
);
// 1. query mongo for infohashes
const infoHashes = await this.broker.call(
"library.getInfoHashes",
{}
);
// 2. query qbittorrent to see if they exist
const torrents: any = await this.broker.call(
"qbittorrent.getTorrentRealTimeStats",
{ infoHashes }
);
// 3. Broadcast the real-time stats to connected clients
await this.broker.call("socket.broadcast", {
namespace: "/",
event: "AS_TORRENT_DATA",
args: [
{
torrents,
},
],
});
// 4. TODO: if an infohash is still tracked by qbittorrent, nothing
// needs to change; if it isn't, purge it from Mongo
},
},
},
methods: {},
});
}
}

shared/airdcpp.socket.ts Normal file
View File

@@ -0,0 +1,73 @@
const WebSocket = require("ws");
const { Socket } = require("airdcpp-apisocket");
class AirDCPPSocket {
// Explicitly declare properties
options; // Holds configuration options
socketInstance; // Instance of the AirDCPP Socket
constructor(configuration: any) {
let socketProtocol = configuration.protocol === "https" ? "wss" : "ws";
this.options = {
url: `${socketProtocol}://${configuration.hostname}/api/v1/`,
autoReconnect: true,
reconnectInterval: 5000, // milliseconds
logLevel: "verbose",
ignoredListenerEvents: [
"transfer_statistics",
"hash_statistics",
"hub_counts_updated",
],
username: configuration.username,
password: configuration.password,
};
// Initialize the socket instance using the configured options and WebSocket
this.socketInstance = Socket(this.options, WebSocket);
}
// Method to ensure the socket connection is established if required by the library or implementation logic
async connect() {
// Here we'll check if a connect method exists and call it
if (
this.socketInstance &&
typeof this.socketInstance.connect === "function"
) {
const sessionInformation = await this.socketInstance.connect();
return sessionInformation;
}
}
// Method to ensure the socket is disconnected properly if required by the library or implementation logic
async disconnect() {
// Similarly, check if a disconnect method exists and call it
if (
this.socketInstance &&
typeof this.socketInstance.disconnect === "function"
) {
await this.socketInstance.disconnect();
}
}
// Method to post data to an endpoint
async post(endpoint: any, data: any = {}) {
// Call post on the socket instance, assuming post is a valid method of the socket instance
return await this.socketInstance.post(endpoint, data);
}
async get(endpoint: any, data: any = {}) {
// Call get on the socket instance, assuming get is a valid method of the socket instance
return await this.socketInstance.get(endpoint, data);
}
// Method to add listeners to the socket instance for handling real-time updates or events
async addListener(event: any, handlerName: any, callback: any, id: any) {
// Attach a listener to the socket instance
return await this.socketInstance.addListener(
event,
handlerName,
callback,
id
);
}
}
export default AirDCPPSocket;
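A usage sketch for this wrapper; the endpoint, hostname, and credentials below are placeholders rather than values taken from this changeset:

import AirDCPPSocket from "./airdcpp.socket";

// Illustrative: connect, issue a GET against the AirDC++ web API,
// then disconnect.
async function listHubs() {
    const socket = new AirDCPPSocket({
        protocol: "http",
        hostname: "localhost:5600",
        username: "admin",
        password: "password",
    });
    await socket.connect();
    const hubs = await socket.get("hubs"); // endpoint name is illustrative
    await socket.disconnect();
    return hubs;
}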

View File

@@ -21,7 +21,7 @@ const ALLOWED_IMAGE_FILE_FORMATS = [".jpg", ".jpeg", ".png"];
// Tell FileMagic where to find the magic.mgc file
FileMagic.magicFile = require.resolve("@npcz/magic/dist/magic.mgc");
// We can onlu use MAGIC_PRESERVE_ATIME on operating suystems that support
// We can only use MAGIC_PRESERVE_ATIME on operating systems that support
// it and that includes OS X for example. It's a good practice as we don't
// want to change the last access time because we are just checking the file
// contents type
@@ -108,6 +108,14 @@ export const isValidImageFileExtension = (fileName: string): boolean => {
return includes(ALLOWED_IMAGE_FILE_FORMATS, path.extname(fileName));
};
/**
* This function constructs paths for a target extraction folder and an input file based on extraction
* options and a walked folder.
* @param {IExtractionOptions} extractionOptions - An object containing options for the extraction
* process, such as the target extraction folder.
* @param {IFolderData} walkedFolder - `walkedFolder` is an object that represents a folder that has
* been walked through during a file extraction process.
*/
export const constructPaths = (
extractionOptions: IExtractionOptions,
walkedFolder: IFolderData
@@ -142,7 +150,7 @@ export const getFileConstituents = (filePath: string) => {
};
/**
* Method that infers MIME type from a filepath
* @param {string} filePath
* @returns {Promise} string
*/
@@ -155,6 +163,15 @@ export const getMimeType = async (filePath: string) => {
});
};
/**
* This function creates a directory at a specified path using the fse.ensureDir method and throws an
* error if it fails.
* @param {any} options - The options parameter is an optional object that can be passed to the
* fse.ensureDir method to configure its behavior. It can include properties such as mode, which sets
* the permissions of the directory, and fs, which specifies the file system module to use.
* @param {string} directoryPath - The `directoryPath` parameter is a string that represents the path
* of the directory that needs to be created.
*/
export const createDirectory = async (options: any, directoryPath: string) => {
try {
await fse.ensureDir(directoryPath, options);

View File

@@ -47,6 +47,7 @@ import {
getMimeType,
} from "../utils/file.utils";
import { convertXMLToJSON } from "./xml.utils";
const { MoleculerError } = require("moleculer").Errors;
const fse = require("fs-extra");
const Unrar = require("unrar");
interface RarFile {
@@ -73,7 +74,7 @@ const errors = [];
*/
export const extractComicInfoXMLFromRar = async (
filePath: string,
mimeType: string,
mimeType: string
): Promise<any> => {
try {
// Create the target directory
@@ -209,7 +210,7 @@ export const extractComicInfoXMLFromRar = async (
export const extractComicInfoXMLFromZip = async (
filePath: string,
mimeType: string,
mimeType: string
): Promise<any> => {
try {
// Create the target directory
@@ -254,7 +255,7 @@ export const extractComicInfoXMLFromZip = async (
// Push the first file (cover) to our extraction target
extractionTargets.push(files[0].name);
filesToWriteToDisk.coverFile = path.basename(files[0].name);
if (!isEmpty(comicInfoXMLFileObject)) {
filesToWriteToDisk.comicInfoXML = comicInfoXMLFileObject[0].name;
extractionTargets.push(filesToWriteToDisk.comicInfoXML);
@@ -356,18 +357,26 @@ export const extractFromArchive = async (filePath: string) => {
switch (mimeType) {
case "application/x-7z-compressed; charset=binary":
case "application/zip; charset=binary":
const cbzResult = await extractComicInfoXMLFromZip(filePath, mimeType);
const cbzResult = await extractComicInfoXMLFromZip(
filePath,
mimeType
);
return Object.assign({}, ...cbzResult);
case "application/x-rar; charset=binary":
const cbrResult = await extractComicInfoXMLFromRar(filePath, mimeType);
const cbrResult = await extractComicInfoXMLFromRar(
filePath,
mimeType
);
return Object.assign({}, ...cbrResult);
default:
console.log(
console.error(
"Error inferring filetype for comicinfo.xml extraction."
);
break;
throw new MoleculerError("Cannot infer filetype.", 500, "FILETYPE_INFERENCE_ERROR", {
data: { message: "Cannot infer filetype." },
});
}
};
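Since the default branch now throws instead of silently breaking, callers need a rejection path; a hypothetical caller sketch:

// Hypothetical caller: FILETYPE_INFERENCE_ERROR and archive failures
// now surface as rejections rather than a silent undefined return.
async function importOne(filePath: string) {
    try {
        return await extractFromArchive(filePath);
    } catch (err) {
        console.error("Extraction failed for %s:", filePath, err);
        throw err;
    }
}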