Compare commits
79 Commits
mimetype-c
...
airdcpp-au
| Author | SHA1 | Date | |
|---|---|---|---|
| 68907a96a2 | |||
| aff4467b8b | |||
| e7341b553d | |||
| a4ccc78fe8 | |||
| 2247411ac8 | |||
| e61ecb1143 | |||
| e01421f17b | |||
| cc271021e0 | |||
| cc772504ae | |||
| 8dcb17a6a0 | |||
| a06896ffcc | |||
| 03f6623ed0 | |||
| 66f9d63b44 | |||
| a936df3144 | |||
| dc9dabca48 | |||
| 4680fd0875 | |||
| 323548c0ff | |||
| f4563c12c6 | |||
| 1b0cada848 | |||
| 750a74cd9f | |||
| 402ee4d81b | |||
| 1fa35ac0e3 | |||
| 680594e67c | |||
| 5593fcb4a0 | |||
| d7f3d3a7cf | |||
| 94cb95f4bf | |||
| c6651cdd91 | |||
| b35e2140b5 | |||
| f053dcb789 | |||
| aea7a24f76 | |||
| 8f0c2f4302 | |||
| 7dbe2b2701 | |||
| 5b9ef9fbbb | |||
| 4cdb11fcbd | |||
| 78f7c1b595 | |||
| bbd2906ebf | |||
| 1861c2eeed | |||
| f3965437b5 | |||
| 78e0e9f8ce | |||
| c926758db6 | |||
| b2b35aedc0 | |||
| f35e3ccbe0 | |||
| 7b0c0a7420 | |||
| c2bbbf311d | |||
| b8ca03220f | |||
| b87b0c875d | |||
| 11fbaf10db | |||
| 1229feb69c | |||
| 3efdc7c2e2 | |||
| 1fff931941 | |||
| f4e2db5a5f | |||
| 1d7561279b | |||
| 9e47ae0436 | |||
| b1b1cb527b | |||
| cfa09691e8 | |||
| 356b093db9 | |||
| b4b83e5c75 | |||
| c718456adc | |||
| 76d4e6b10f | |||
| bde548695c | |||
| fd4dd1d5c4 | |||
| 5540bb16ec | |||
| 6ee609f2b9 | |||
| 8b584080e2 | |||
| 01975079e3 | |||
| fe9fbe9c3a | |||
| df6652cce9 | |||
| e5fc879b2d | |||
| 625447e7f1 | |||
| fdcf1f7d68 | |||
| 4003f666cf | |||
| 7b855f8cf1 | |||
| 007ce4b66f | |||
| cb84e4893f | |||
| 795ac561c7 | |||
| 175e01dc2d | |||
| 66e0a26c68 | |||
|
|
959d248273 | ||
| 745ec5d774 |
2
.github/workflows/main.yml
vendored
2
.github/workflows/main.yml
vendored
@@ -12,7 +12,7 @@ jobs:
|
||||
steps:
|
||||
- uses: actions/checkout@master
|
||||
- name: Publish to Registry
|
||||
uses: elgohr/Publish-Docker-Github-Action@master
|
||||
uses: elgohr/Publish-Docker-Github-Action@v5
|
||||
with:
|
||||
name: frishi/threetwo-core-service
|
||||
username: ${{ secrets.DOCKER_USERNAME }}
|
||||
|
||||
66
Dockerfile
66
Dockerfile
@@ -1,40 +1,62 @@
|
||||
FROM alpine:3.14
|
||||
# Use a base image with Node.js 22.1.0
|
||||
FROM node:22.1.0
|
||||
|
||||
# Set metadata for contact
|
||||
LABEL maintainer="Rishi Ghan <rishi.ghan@gmail.com>"
|
||||
|
||||
# Show all node logs
|
||||
# Set environment variables
|
||||
ENV NPM_CONFIG_LOGLEVEL warn
|
||||
ENV NODE_ENV=production
|
||||
|
||||
# Set the working directory
|
||||
WORKDIR /core-services
|
||||
|
||||
# Install required packages
|
||||
RUN apt-get update && apt-get install -y \
|
||||
libvips-tools \
|
||||
wget \
|
||||
imagemagick \
|
||||
python3 \
|
||||
xvfb \
|
||||
xz-utils \
|
||||
curl \
|
||||
bash \
|
||||
software-properties-common
|
||||
|
||||
RUN apk add --update \
|
||||
--repository http://nl.alpinelinux.org/alpine/v3.14/main \
|
||||
vips-tools \
|
||||
wget \
|
||||
imagemagick \
|
||||
python3 \
|
||||
unrar \
|
||||
p7zip \
|
||||
nodejs \
|
||||
npm \
|
||||
xvfb \
|
||||
xz
|
||||
# Install p7zip
|
||||
RUN apt-get update && apt-get install -y p7zip
|
||||
|
||||
# Install unrar directly from RARLAB
|
||||
RUN wget https://www.rarlab.com/rar/rarlinux-x64-621.tar.gz \
|
||||
&& tar -zxvf rarlinux-x64-621.tar.gz \
|
||||
&& cp rar/unrar /usr/bin/ \
|
||||
&& rm -rf rarlinux-x64-621.tar.gz rar
|
||||
|
||||
# Clean up package lists
|
||||
RUN rm -rf /var/lib/apt/lists/*
|
||||
|
||||
# Verify Node.js installation
|
||||
RUN node -v && npm -v
|
||||
|
||||
# Copy application configuration files
|
||||
COPY package.json package-lock.json ./
|
||||
COPY moleculer.config.ts ./
|
||||
COPY tsconfig.json ./
|
||||
COPY scripts ./scripts
|
||||
RUN chmod +x ./scripts/*
|
||||
|
||||
RUN npm i
|
||||
# Install Dependncies
|
||||
# Install application dependencies
|
||||
RUN npm install
|
||||
RUN npm install -g typescript ts-node
|
||||
|
||||
# Copy the rest of the application files
|
||||
COPY . .
|
||||
|
||||
# Build and cleanup
|
||||
RUN npm run build \
|
||||
&& npm prune
|
||||
|
||||
# clean up
|
||||
RUN npm prune
|
||||
|
||||
# Expose the application's port
|
||||
EXPOSE 3000
|
||||
# Start server
|
||||
CMD ["npm", "start"]
|
||||
|
||||
# Command to run the application
|
||||
CMD ["npm", "start"]
|
||||
|
||||
26
README.md
26
README.md
@@ -3,21 +3,35 @@
|
||||
This [moleculer-based](https://github.com/moleculerjs/moleculer-web) microservice houses endpoints for the following functions:
|
||||
|
||||
1. Local import of a comic library into mongo (currently supports `cbr` and `cbz` files)
|
||||
2. Metadata extraction from file, `comicinfo.xml`
|
||||
2. Metadata extraction from file, `comicinfo.xml`
|
||||
3. Mongo comic object orchestration
|
||||
4. CRUD operations on `Comic` model
|
||||
5. Helper utils to help with image metadata extraction, file operations and more.
|
||||
|
||||
## Local Development
|
||||
|
||||
1. ~~You need `calibre` in your local path.
|
||||
On `macOS` you can `brew install calibre` and make sure that `ebook-meta` is present on the path~~ Calibre is no longer required as a dependency. Ignore this step.
|
||||
2. You need `mongo` for the data store. on `macOS` you can use [these instructions](https://docs.mongodb.com/manual/tutorial/install-mongodb-on-os-x/) to install it
|
||||
1. You need the following dependencies installed: `mongo`, `elasticsearch` and `redis`
|
||||
2. You also need binaries for `unrar` and `p7zip`
|
||||
3. Clone this repo
|
||||
4. Run `npm i`
|
||||
5. Assuming you installed mongo correctly, run `MONGO_URI=mongodb://localhost:27017/threetwo npm run dev` to start the service
|
||||
5. Assuming you installed the dependencies correctly, run:
|
||||
|
||||
```
|
||||
COMICS_DIRECTORY=<PATH_TO_COMICS_DIRECTORY> \
|
||||
USERDATA_DIRECTORY=<PATH_TO_USERDATA_DIRECTORY> \
|
||||
REDIS_URI=redis://<REDIS_HOST:REDIS_PORT> \
|
||||
ELASTICSEARCH_URI=<ELASTICSEARCH_HOST:ELASTICSEARCH_PORT> \
|
||||
MONGO_URI=mongodb://<MONGO_HOST:MONGO_PORT>/threetwo \
|
||||
UNRAR_BIN_PATH=<UNRAR_BIN_PATH> \
|
||||
SEVENZ_BINARY_PATH=<SEVENZ_BINARY_PATH> \
|
||||
npm run dev
|
||||
```
|
||||
|
||||
to start the service
|
||||
|
||||
6. You should see the service spin up and a list of all the endpoints in the terminal
|
||||
7. The service can be accessed through `http://localhost:3000/api/import/*`
|
||||
7. The service can be accessed through `http://localhost:3000/api/<serviceName>/*`
|
||||
|
||||
## Docker Instructions
|
||||
|
||||
1. Build the image using `docker build . -t frishi/threetwo-import-service`. Give it a hot minute.
|
||||
|
||||
30
config/redis.config.ts
Normal file
30
config/redis.config.ts
Normal file
@@ -0,0 +1,30 @@
|
||||
// Import the Redis library
|
||||
import IORedis from "ioredis";
|
||||
|
||||
// Environment variable for Redis URI
|
||||
const redisURI = process.env.REDIS_URI || "redis://localhost:6379";
|
||||
console.log(`process.env.REDIS_URI is ${process.env.REDIS_URI}`);
|
||||
// Creating the publisher client
|
||||
const pubClient = new IORedis(redisURI);
|
||||
|
||||
// Creating the subscriber client
|
||||
const subClient = new IORedis(redisURI);
|
||||
|
||||
// Handle connection events for the publisher
|
||||
pubClient.on("connect", () => {
|
||||
console.log("Publisher client connected to Redis.");
|
||||
});
|
||||
pubClient.on("error", (err) => {
|
||||
console.error("Publisher client failed to connect to Redis:", err);
|
||||
});
|
||||
|
||||
// Handle connection events for the subscriber
|
||||
subClient.on("connect", () => {
|
||||
console.log("Subscriber client connected to Redis.");
|
||||
});
|
||||
subClient.on("error", (err) => {
|
||||
console.error("Subscriber client failed to connect to Redis:", err);
|
||||
});
|
||||
|
||||
// Export the clients for use in other parts of the application
|
||||
export { pubClient, subClient };
|
||||
103
dependencies.docker-compose.yml
Normal file
103
dependencies.docker-compose.yml
Normal file
@@ -0,0 +1,103 @@
|
||||
services:
|
||||
zoo1:
|
||||
image: confluentinc/cp-zookeeper:7.3.2
|
||||
hostname: zoo1
|
||||
container_name: zoo1
|
||||
ports:
|
||||
- "2181:2181"
|
||||
environment:
|
||||
ZOOKEEPER_CLIENT_PORT: 2181
|
||||
ZOOKEEPER_SERVER_ID: 1
|
||||
ZOOKEEPER_SERVERS: zoo1:2888:3888
|
||||
networks:
|
||||
- kafka-net
|
||||
|
||||
kafka1:
|
||||
image: confluentinc/cp-kafka:7.3.2
|
||||
hostname: kafka1
|
||||
container_name: kafka1
|
||||
ports:
|
||||
- "9092:9092"
|
||||
- "29092:29092"
|
||||
- "9999:9999"
|
||||
environment:
|
||||
KAFKA_ADVERTISED_LISTENERS: INTERNAL://kafka1:19092,EXTERNAL://${DOCKER_HOST_IP:-127.0.0.1}:9092,DOCKER://host.docker.internal:29092
|
||||
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: INTERNAL:PLAINTEXT,EXTERNAL:PLAINTEXT,DOCKER:PLAINTEXT
|
||||
KAFKA_INTER_BROKER_LISTENER_NAME: INTERNAL
|
||||
KAFKA_ZOOKEEPER_CONNECT: "zoo1:2181"
|
||||
KAFKA_BROKER_ID: 1
|
||||
KAFKA_LOG4J_LOGGERS: "kafka.controller=INFO,kafka.producer.async.DefaultEventHandler=INFO,state.change.logger=INFO"
|
||||
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
|
||||
KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
|
||||
KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
|
||||
KAFKA_JMX_PORT: 9999
|
||||
KAFKA_JMX_HOSTNAME: ${DOCKER_HOST_IP:-127.0.0.1}
|
||||
KAFKA_AUTHORIZER_CLASS_NAME: kafka.security.authorizer.AclAuthorizer
|
||||
KAFKA_ALLOW_EVERYONE_IF_NO_ACL_FOUND: "true"
|
||||
depends_on:
|
||||
- zoo1
|
||||
networks:
|
||||
- kafka-net
|
||||
|
||||
kafka-ui:
|
||||
container_name: kafka-ui
|
||||
image: provectuslabs/kafka-ui:latest
|
||||
ports:
|
||||
- 8087:8080
|
||||
environment:
|
||||
DYNAMIC_CONFIG_ENABLED: true
|
||||
volumes:
|
||||
- /Users/rishi/work/config/kafka-ui/config.yml:/etc/kafkaui/dynamic_config.yaml
|
||||
depends_on:
|
||||
- kafka1
|
||||
- zoo1
|
||||
networks:
|
||||
- kafka-net
|
||||
|
||||
db:
|
||||
image: "mongo:latest"
|
||||
container_name: database
|
||||
networks:
|
||||
- kafka-net
|
||||
ports:
|
||||
- "127.0.0.1:27017:27017"
|
||||
volumes:
|
||||
- "mongodb_data:/bitnami/mongodb"
|
||||
|
||||
redis:
|
||||
image: "bitnami/redis:latest"
|
||||
container_name: queue
|
||||
environment:
|
||||
ALLOW_EMPTY_PASSWORD: "yes"
|
||||
networks:
|
||||
- kafka-net
|
||||
ports:
|
||||
- "127.0.0.1:6379:6379"
|
||||
|
||||
elasticsearch:
|
||||
image: docker.elastic.co/elasticsearch/elasticsearch:7.16.2
|
||||
container_name: elasticsearch
|
||||
environment:
|
||||
- "discovery.type=single-node"
|
||||
- "ES_JAVA_OPTS=-Xms512m -Xmx512m"
|
||||
- "xpack.security.enabled=true"
|
||||
- "xpack.security.authc.api_key.enabled=true"
|
||||
- "ELASTIC_PASSWORD=password"
|
||||
ulimits:
|
||||
memlock:
|
||||
soft: -1
|
||||
hard: -1
|
||||
ports:
|
||||
- "127.0.0.1:9200:9200"
|
||||
networks:
|
||||
- kafka-net
|
||||
|
||||
networks:
|
||||
kafka-net:
|
||||
driver: bridge
|
||||
|
||||
volumes:
|
||||
mongodb_data:
|
||||
driver: local
|
||||
elasticsearch:
|
||||
driver: local
|
||||
@@ -3,7 +3,15 @@ LOGGER=true
|
||||
LOGLEVEL=info
|
||||
SERVICEDIR=dist/services
|
||||
|
||||
TRANSPORTER=nats://nats:4222
|
||||
VITE_UNDERLYING_HOST=localhost
|
||||
COMICS_DIRECTORY=/Users/rishi/work/threetwo-core-service/comics
|
||||
USERDATA_DIRECTORY=/Users/rishi/work/threetwo-core-service/userdata
|
||||
REDIS_URI=redis://redis:6379
|
||||
KAFKA_BROKER=kafka1:9092
|
||||
ELASTICSEARCH_URI=http://elasticsearch:9200
|
||||
MONGO_URI=mongodb://db:27017/threetwo
|
||||
UNRAR_BIN_PATH=/opt/homebrew/bin/unrar
|
||||
SEVENZ_BINARY_PATH=/opt/homebrew/bin/7za
|
||||
|
||||
CACHER=Memory
|
||||
|
||||
|
||||
@@ -1,58 +1,125 @@
|
||||
version: "3.3"
|
||||
x-userdata-volume: &userdata-volume
|
||||
type: bind
|
||||
source: ${USERDATA_DIRECTORY}
|
||||
target: /userdata
|
||||
|
||||
x-comics-volume: &comics-volume
|
||||
type: bind
|
||||
source: ${COMICS_DIRECTORY}
|
||||
target: /comics
|
||||
|
||||
services:
|
||||
|
||||
api:
|
||||
core-services:
|
||||
build:
|
||||
context: .
|
||||
image: threetwo-library-service
|
||||
env_file: docker-compose.env
|
||||
environment:
|
||||
SERVICES: api
|
||||
PORT: 3000
|
||||
depends_on:
|
||||
- nats
|
||||
labels:
|
||||
- "traefik.enable=true"
|
||||
- "traefik.http.routers.api-gw.rule=PathPrefix(`/`)"
|
||||
- "traefik.http.services.api-gw.loadbalancer.server.port=3000"
|
||||
networks:
|
||||
- internal
|
||||
|
||||
greeter:
|
||||
build:
|
||||
context: .
|
||||
image: threetwo-library-service
|
||||
env_file: docker-compose.env
|
||||
environment:
|
||||
SERVICES: greeter
|
||||
depends_on:
|
||||
- nats
|
||||
networks:
|
||||
- internal
|
||||
|
||||
nats:
|
||||
image: nats:2
|
||||
networks:
|
||||
- internal
|
||||
|
||||
traefik:
|
||||
image: traefik:v2.1
|
||||
command:
|
||||
- "--api.insecure=true" # Don't do that in production!
|
||||
- "--providers.docker=true"
|
||||
- "--providers.docker.exposedbydefault=false"
|
||||
# context: https://github.com/rishighan/threetwo-core-service.git
|
||||
context: ./
|
||||
dockerfile: Dockerfile
|
||||
image: frishi/threetwo-core-service
|
||||
container_name: core-services
|
||||
ports:
|
||||
- 3000:80
|
||||
- 3001:8080
|
||||
- "3000:3000"
|
||||
- "3001:3001"
|
||||
depends_on:
|
||||
- db
|
||||
- redis
|
||||
- elasticsearch
|
||||
- kafka1
|
||||
- zoo1
|
||||
environment:
|
||||
name: core-services
|
||||
SERVICES: api,library,imagetransformation,opds,search,settings,jobqueue,socket,torrentjobs
|
||||
env_file: docker-compose.env
|
||||
volumes:
|
||||
- /var/run/docker.sock:/var/run/docker.sock:ro
|
||||
- *comics-volume
|
||||
- *userdata-volume
|
||||
networks:
|
||||
- internal
|
||||
- default
|
||||
- proxy
|
||||
|
||||
zoo1:
|
||||
image: confluentinc/cp-zookeeper:7.3.2
|
||||
hostname: zoo1
|
||||
container_name: zoo1
|
||||
ports:
|
||||
- "2181:2181"
|
||||
environment:
|
||||
ZOOKEEPER_CLIENT_PORT: 2181
|
||||
ZOOKEEPER_SERVER_ID: 1
|
||||
ZOOKEEPER_SERVERS: zoo1:2888:3888
|
||||
networks:
|
||||
- proxy
|
||||
|
||||
kafka1:
|
||||
image: confluentinc/cp-kafka:7.3.2
|
||||
hostname: kafka1
|
||||
container_name: kafka1
|
||||
ports:
|
||||
- "9092:9092"
|
||||
- "29092:29092"
|
||||
- "9999:9999"
|
||||
environment:
|
||||
KAFKA_ADVERTISED_LISTENERS: INTERNAL://kafka1:19092,EXTERNAL://${DOCKER_HOST_IP:-127.0.0.1} :9092,DOCKER://host.docker.internal:29092
|
||||
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: INTERNAL:PLAINTEXT,EXTERNAL:PLAINTEXT,DOCKER:PLAINTEXT
|
||||
KAFKA_INTER_BROKER_LISTENER_NAME: INTERNAL
|
||||
KAFKA_ZOOKEEPER_CONNECT: "zoo1:2181"
|
||||
KAFKA_BROKER_ID: 1
|
||||
KAFKA_LOG4J_LOGGERS: "kafka.controller=INFO,kafka.producer.async.DefaultEventHandler=INFO,state. change.logger=INFO"
|
||||
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
|
||||
KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
|
||||
KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
|
||||
KAFKA_JMX_PORT: 9999
|
||||
KAFKA_JMX_HOSTNAME: ${DOCKER_HOST_IP:-127.0.0.1}
|
||||
KAFKA_AUTHORIZER_CLASS_NAME: kafka.security.authorizer.AclAuthorizer
|
||||
KAFKA_ALLOW_EVERYONE_IF_NO_ACL_FOUND: "true"
|
||||
depends_on:
|
||||
- zoo1
|
||||
networks:
|
||||
- proxy
|
||||
|
||||
db:
|
||||
image: "mongo:latest"
|
||||
container_name: database
|
||||
networks:
|
||||
- proxy
|
||||
ports:
|
||||
- "27017:27017"
|
||||
volumes:
|
||||
- "mongodb_data:/bitnami/mongodb"
|
||||
|
||||
redis:
|
||||
image: "bitnami/redis:latest"
|
||||
container_name: redis
|
||||
hostname: redis
|
||||
environment:
|
||||
ALLOW_EMPTY_PASSWORD: "yes"
|
||||
networks:
|
||||
- proxy
|
||||
ports:
|
||||
- "6379:6379"
|
||||
|
||||
elasticsearch:
|
||||
image: docker.elastic.co/elasticsearch/elasticsearch:7.16.2
|
||||
container_name: elasticsearch
|
||||
environment:
|
||||
- "discovery.type=single-node"
|
||||
- "ES_JAVA_OPTS=-Xms512m -Xmx512m"
|
||||
- "xpack.security.enabled=true"
|
||||
- "xpack.security.authc.api_key.enabled=true"
|
||||
- "ELASTIC_PASSWORD=password"
|
||||
ulimits:
|
||||
memlock:
|
||||
soft: -1
|
||||
hard: -1
|
||||
ports:
|
||||
- 9200:9200
|
||||
networks:
|
||||
- proxy
|
||||
|
||||
networks:
|
||||
internal:
|
||||
proxy:
|
||||
external: true
|
||||
|
||||
volumes:
|
||||
data:
|
||||
mongodb_data:
|
||||
driver: local
|
||||
elasticsearch:
|
||||
driver: local
|
||||
|
||||
@@ -2,21 +2,60 @@ const path = require("path");
|
||||
const mkdir = require("mkdirp").sync;
|
||||
const DbService = require("moleculer-db");
|
||||
|
||||
|
||||
export const DbMixin = (collection, model) => {
|
||||
if (process.env.MONGO_URI) {
|
||||
const MongooseAdapter = require("moleculer-db-adapter-mongoose");
|
||||
return {
|
||||
mixins: [DbService],
|
||||
adapter: new MongooseAdapter(process.env.MONGO_URI, {
|
||||
user: process.env.MONGO_INITDB_ROOT_USERNAME,
|
||||
pass: process.env.MONGO_INITDB_ROOT_PASSWORD,
|
||||
keepAlive: true,
|
||||
useUnifiedTopology: true,
|
||||
family: 4,
|
||||
}),
|
||||
model,
|
||||
};
|
||||
if (!process.env.MONGO_URI) {
|
||||
console.log("MONGO_URI not provided, initializing local storage...");
|
||||
mkdir(path.resolve("./data"));
|
||||
return { mixins: [DbService] }; // Handle case where no DB URI is provided
|
||||
}
|
||||
mkdir(path.resolve("./data"));
|
||||
|
||||
const MongooseAdapter = require("moleculer-db-adapter-mongoose");
|
||||
const adapter = new MongooseAdapter(process.env.MONGO_URI, {
|
||||
user: process.env.MONGO_INITDB_ROOT_USERNAME,
|
||||
pass: process.env.MONGO_INITDB_ROOT_PASSWORD,
|
||||
keepAlive: true,
|
||||
useNewUrlParser: true,
|
||||
useUnifiedTopology: true,
|
||||
});
|
||||
|
||||
const connectWithRetry = async (
|
||||
adapter,
|
||||
maxRetries = 5,
|
||||
interval = 5000
|
||||
) => {
|
||||
for (let retry = 0; retry < maxRetries; retry++) {
|
||||
try {
|
||||
await adapter.connect();
|
||||
console.log("MongoDB connected successfully!");
|
||||
return;
|
||||
} catch (err) {
|
||||
console.error("MongoDB connection error:", err);
|
||||
console.log(
|
||||
`Retrying MongoDB connection in ${
|
||||
interval / 1000
|
||||
} seconds...`
|
||||
);
|
||||
await new Promise((resolve) => setTimeout(resolve, interval));
|
||||
}
|
||||
}
|
||||
console.error("Failed to connect to MongoDB after several attempts.");
|
||||
};
|
||||
|
||||
return {
|
||||
mixins: [DbService],
|
||||
adapter,
|
||||
model,
|
||||
collection,
|
||||
async started() {
|
||||
await connectWithRetry(this.adapter);
|
||||
},
|
||||
async stopped() {
|
||||
try {
|
||||
await this.adapter.disconnect();
|
||||
console.log("MongoDB disconnected");
|
||||
} catch (err) {
|
||||
console.error("MongoDB disconnection error:", err);
|
||||
}
|
||||
},
|
||||
};
|
||||
};
|
||||
|
||||
@@ -1,8 +1,7 @@
|
||||
const paginate = require("mongoose-paginate-v2");
|
||||
const { Client } = require("@elastic/elasticsearch");
|
||||
import ComicVineMetadataSchema from "./comicvine.metadata.model";
|
||||
import { mongoosastic } from "mongoosastic-ts";
|
||||
const mongoose = require("mongoose")
|
||||
const mongoose = require("mongoose");
|
||||
import {
|
||||
MongoosasticDocument,
|
||||
MongoosasticModel,
|
||||
@@ -28,6 +27,10 @@ const RawFileDetailsSchema = mongoose.Schema({
|
||||
mimeType: String,
|
||||
containedIn: String,
|
||||
pageCount: Number,
|
||||
archive: {
|
||||
uncompressed: Boolean,
|
||||
expandedPath: String,
|
||||
},
|
||||
cover: {
|
||||
filePath: String,
|
||||
stats: Object,
|
||||
@@ -51,7 +54,38 @@ const DirectConnectBundleSchema = mongoose.Schema({
|
||||
name: String,
|
||||
size: String,
|
||||
type: {},
|
||||
_id: false,
|
||||
});
|
||||
const wantedSchema = mongoose.Schema(
|
||||
{
|
||||
source: { type: String, default: null },
|
||||
markEntireVolumeWanted: Boolean,
|
||||
issues: {
|
||||
type: [
|
||||
{
|
||||
_id: false, // Disable automatic ObjectId creation for each issue
|
||||
id: Number,
|
||||
url: String,
|
||||
image: { type: Array, default: [] },
|
||||
coverDate: String,
|
||||
issueNumber: String,
|
||||
},
|
||||
],
|
||||
default: null,
|
||||
},
|
||||
volume: {
|
||||
type: {
|
||||
_id: false, // Disable automatic ObjectId creation for volume
|
||||
id: Number,
|
||||
url: String,
|
||||
image: { type: Array, default: [] },
|
||||
name: String,
|
||||
},
|
||||
default: null,
|
||||
},
|
||||
},
|
||||
{ _id: false }
|
||||
); // Disable automatic ObjectId creation for the wanted object itself
|
||||
|
||||
const ComicSchema = mongoose.Schema(
|
||||
{
|
||||
@@ -67,18 +101,12 @@ const ComicSchema = mongoose.Schema(
|
||||
},
|
||||
sourcedMetadata: {
|
||||
comicInfo: { type: mongoose.Schema.Types.Mixed, default: {} },
|
||||
comicvine: {
|
||||
type: ComicVineMetadataSchema,
|
||||
es_indexed: true,
|
||||
default: {},
|
||||
},
|
||||
shortboxed: {},
|
||||
comicvine: { type: mongoose.Schema.Types.Mixed, default: {} }, // Set as a freeform object
|
||||
locg: {
|
||||
type: LOCGSchema,
|
||||
es_indexed: true,
|
||||
default: {},
|
||||
},
|
||||
gcd: {},
|
||||
},
|
||||
rawFileDetails: {
|
||||
type: RawFileDetailsSchema,
|
||||
@@ -98,11 +126,9 @@ const ComicSchema = mongoose.Schema(
|
||||
subtitle: { type: String, es_indexed: true },
|
||||
},
|
||||
},
|
||||
wanted: wantedSchema,
|
||||
|
||||
acquisition: {
|
||||
source: {
|
||||
wanted: Boolean,
|
||||
name: String,
|
||||
},
|
||||
release: {},
|
||||
directconnect: {
|
||||
downloads: {
|
||||
@@ -111,12 +137,13 @@ const ComicSchema = mongoose.Schema(
|
||||
default: [],
|
||||
},
|
||||
},
|
||||
torrent: {
|
||||
sourceApplication: String,
|
||||
magnet: String,
|
||||
tracker: String,
|
||||
status: String,
|
||||
},
|
||||
torrent: [
|
||||
{
|
||||
infoHash: String,
|
||||
name: String,
|
||||
announce: [String],
|
||||
},
|
||||
],
|
||||
usenet: {
|
||||
sourceApplication: String,
|
||||
},
|
||||
|
||||
@@ -1,95 +0,0 @@
|
||||
const mongoose = require("mongoose");
|
||||
const Things = mongoose.Schema({
|
||||
_id: false,
|
||||
api_detail_url: String,
|
||||
id: Number,
|
||||
name: String,
|
||||
site_detail_url: String,
|
||||
count: String,
|
||||
});
|
||||
const Issue = mongoose.Schema({
|
||||
_id: false,
|
||||
api_detail_url: String,
|
||||
id: Number,
|
||||
name: String,
|
||||
issue_number: String,
|
||||
});
|
||||
const VolumeInformation = mongoose.Schema({
|
||||
_id: false,
|
||||
aliases: [String],
|
||||
api_detail_url: String,
|
||||
characters: [Things],
|
||||
concepts: [Things],
|
||||
count_of_issues: String,
|
||||
date_added: String,
|
||||
date_last_updated: String,
|
||||
deck: String,
|
||||
description: String,
|
||||
first_issue: Issue,
|
||||
id: Number,
|
||||
image: {
|
||||
icon_url: String,
|
||||
medium_url: String,
|
||||
screen_url: String,
|
||||
screen_large_url: String,
|
||||
small_url: String,
|
||||
super_url: String,
|
||||
thumb_url: String,
|
||||
tiny_url: String,
|
||||
original_url: String,
|
||||
image_tags: String,
|
||||
},
|
||||
issues: [
|
||||
{
|
||||
api_detail_url: String,
|
||||
id: Number,
|
||||
name: String,
|
||||
issue_number: String,
|
||||
site_detail_url: String,
|
||||
},
|
||||
],
|
||||
last_issue: Issue,
|
||||
locations: [Things],
|
||||
name: String,
|
||||
objects: [Things],
|
||||
people: [Things],
|
||||
publisher: {
|
||||
api_detail_url: String,
|
||||
id: Number,
|
||||
name: String,
|
||||
},
|
||||
site_detail_url: String,
|
||||
start_year: String,
|
||||
});
|
||||
|
||||
const ComicVineMetadataSchema = mongoose.Schema({
|
||||
_id: false,
|
||||
aliases: [String],
|
||||
api_detail_url: String,
|
||||
has_staff_review: { type: mongoose.Schema.Types.Mixed },
|
||||
|
||||
cover_date: Date,
|
||||
date_added: String,
|
||||
date_last_updated: String,
|
||||
deck: String,
|
||||
description: String,
|
||||
image: {
|
||||
icon_url: String,
|
||||
medium_url: String,
|
||||
screen_url: String,
|
||||
screen_large_url: String,
|
||||
small_url: String,
|
||||
super_url: String,
|
||||
thumb_url: String,
|
||||
tiny_url: String,
|
||||
original_url: String,
|
||||
image_tags: String,
|
||||
},
|
||||
|
||||
id: Number,
|
||||
name: String,
|
||||
resource_type: String,
|
||||
volumeInformation: VolumeInformation,
|
||||
});
|
||||
|
||||
export default ComicVineMetadataSchema;
|
||||
12
models/jobresult.model.ts
Normal file
12
models/jobresult.model.ts
Normal file
@@ -0,0 +1,12 @@
|
||||
const mongoose = require("mongoose");
|
||||
|
||||
const JobResultScehma = mongoose.Schema({
|
||||
id: Number,
|
||||
status: String,
|
||||
sessionId: String,
|
||||
failedReason: Object,
|
||||
timestamp: Date,
|
||||
});
|
||||
|
||||
const JobResult = mongoose.model("JobResult", JobResultScehma);
|
||||
export default JobResult;
|
||||
9
models/session.model.ts
Normal file
9
models/session.model.ts
Normal file
@@ -0,0 +1,9 @@
|
||||
const mongoose = require("mongoose");
|
||||
|
||||
const SessionScehma = mongoose.Schema({
|
||||
sessionId: String,
|
||||
socketId: String,
|
||||
});
|
||||
|
||||
const Session = mongoose.model("Session", SessionScehma);
|
||||
export default Session;
|
||||
@@ -1,21 +1,34 @@
|
||||
const mongoose = require("mongoose");
|
||||
const paginate = require("mongoose-paginate-v2");
|
||||
|
||||
const HostSchema = mongoose.Schema({
|
||||
_id: false,
|
||||
username: String,
|
||||
password: String,
|
||||
hostname: String,
|
||||
port: String,
|
||||
protocol: String,
|
||||
});
|
||||
const SettingsScehma = mongoose.Schema({
|
||||
directConnect: {
|
||||
client: {
|
||||
host: {
|
||||
username: String,
|
||||
password: String,
|
||||
hostname: String,
|
||||
port: String,
|
||||
protocol: String,
|
||||
},
|
||||
host: HostSchema,
|
||||
airDCPPUserSettings: Object,
|
||||
|
||||
hubs: Array,
|
||||
},
|
||||
},
|
||||
bittorrent: {
|
||||
client: {
|
||||
name: String,
|
||||
host: HostSchema,
|
||||
},
|
||||
},
|
||||
prowlarr: {
|
||||
client: {
|
||||
host: HostSchema,
|
||||
apiKey: String,
|
||||
},
|
||||
},
|
||||
});
|
||||
|
||||
const Settings = mongoose.model("Settings", SettingsScehma);
|
||||
|
||||
@@ -5,6 +5,7 @@ import {
|
||||
MetricRegistry,
|
||||
ServiceBroker,
|
||||
} from "moleculer";
|
||||
const RedisTransporter = require("moleculer").Transporters.Redis;
|
||||
|
||||
/**
|
||||
* Moleculer ServiceBroker configuration file
|
||||
@@ -90,7 +91,7 @@ const brokerConfig: BrokerOptions = {
|
||||
// More info: https://moleculer.services/docs/0.14/networking.html
|
||||
// Note: During the development, you don't need to define it because all services will be loaded locally.
|
||||
// In production you can set it via `TRANSPORTER=nats://localhost:4222` environment variable.
|
||||
transporter: process.env.REDIS_URI || "redis://localhost:6379",
|
||||
transporter: new RedisTransporter(process.env.REDIS_URI),
|
||||
|
||||
// Define a cacher.
|
||||
// More info: https://moleculer.services/docs/0.14/caching.html
|
||||
|
||||
6231
package-lock.json
generated
6231
package-lock.json
generated
File diff suppressed because it is too large
Load Diff
32
package.json
32
package.json
@@ -4,8 +4,8 @@
|
||||
"description": "Endpoints for common operations in ThreeTwo",
|
||||
"scripts": {
|
||||
"build": "tsc --build tsconfig.json",
|
||||
"dev": "ts-node ./node_modules/moleculer/bin/moleculer-runner.js --hot --repl --config moleculer.config.ts services/**/*.service.ts",
|
||||
"start": "moleculer-runner --config dist/moleculer.config.js",
|
||||
"dev": "./scripts/start.sh dev",
|
||||
"start": "npm run build && ./scripts/start.sh prod",
|
||||
"cli": "moleculer connect NATS",
|
||||
"ci": "jest --watch",
|
||||
"test": "jest --coverage",
|
||||
@@ -20,7 +20,6 @@
|
||||
],
|
||||
"author": "Rishi Ghan",
|
||||
"devDependencies": {
|
||||
"@elastic/elasticsearch": "^8.6.0",
|
||||
"@types/lodash": "^4.14.168",
|
||||
"@typescript-eslint/eslint-plugin": "^5.56.0",
|
||||
"@typescript-eslint/parser": "^5.56.0",
|
||||
@@ -28,6 +27,7 @@
|
||||
"eslint-plugin-import": "^2.20.2",
|
||||
"eslint-plugin-prefer-arrow": "^1.2.2",
|
||||
"install": "^0.13.0",
|
||||
"ioredis": "^5.4.1",
|
||||
"jest": "^29.5.0",
|
||||
"jest-cli": "^29.5.0",
|
||||
"moleculer-repl": "^0.7.0",
|
||||
@@ -35,20 +35,22 @@
|
||||
"npm": "^8.4.1",
|
||||
"ts-jest": "^29.0.5",
|
||||
"ts-node": "^10.9.1",
|
||||
"typescript": "^5.0.2"
|
||||
"typescript": "^5.0.2",
|
||||
"uuid": "^9.0.0"
|
||||
},
|
||||
"dependencies": {
|
||||
"@npcz/magic": "^1.3.14",
|
||||
"redis": "^4.6.5",
|
||||
"@socket.io/redis-adapter": "^8.1.0",
|
||||
"@bluelovers/fast-glob": "https://github.com/rishighan/fast-glob-v2-api.git",
|
||||
"@elastic/elasticsearch": "^8.13.1",
|
||||
"@jorgeferrero/stream-to-buffer": "^2.0.6",
|
||||
"@npcz/magic": "^1.3.14",
|
||||
"@root/walk": "^1.1.0",
|
||||
"@socket.io/redis-adapter": "^8.1.0",
|
||||
"@types/jest": "^27.4.1",
|
||||
"@types/mkdirp": "^1.0.0",
|
||||
"@types/node": "^13.9.8",
|
||||
"@types/string-similarity": "^4.0.0",
|
||||
"axios": "^0.25.0",
|
||||
"airdcpp-apisocket": "^2.4.4",
|
||||
"axios": "^1.6.8",
|
||||
"axios-retry": "^3.2.4",
|
||||
"bree": "^7.1.5",
|
||||
"calibre-opds": "^1.0.7",
|
||||
@@ -65,24 +67,24 @@
|
||||
"leven": "^3.1.0",
|
||||
"lodash": "^4.17.21",
|
||||
"mkdirp": "^0.5.5",
|
||||
"moleculer": "^0.14.29",
|
||||
"moleculer-bull": "github:rishighan/moleculer-bull#1.0.0",
|
||||
"moleculer-bullmq": "^3.0.0",
|
||||
"moleculer-db": "^0.8.23",
|
||||
"moleculer-db-adapter-mongoose": "^0.9.2",
|
||||
"moleculer-db-adapter-mongoose": "^0.9.4",
|
||||
"moleculer-io": "^2.2.0",
|
||||
"moleculer-web": "^0.10.5",
|
||||
"moleculer-web": "^0.10.7",
|
||||
"mongoosastic-ts": "^6.0.3",
|
||||
"mongoose": "^6.10.4",
|
||||
"mongoose-paginate-v2": "^1.3.18",
|
||||
"nats": "^1.3.2",
|
||||
"opds-extra": "^3.0.9",
|
||||
"opds-extra": "^3.0.10",
|
||||
"p7zip-threetwo": "^1.0.4",
|
||||
"redis": "^4.6.14",
|
||||
"sanitize-filename-ts": "^1.0.2",
|
||||
"sharp": "^0.30.4",
|
||||
"sharp": "^0.33.3",
|
||||
"threetwo-ui-typings": "^1.0.14",
|
||||
"through2": "^4.0.2",
|
||||
"unrar": "^0.2.0",
|
||||
"xml2js": "^0.4.23"
|
||||
"xml2js": "^0.6.2"
|
||||
},
|
||||
"engines": {
|
||||
"node": ">= 18.x.x"
|
||||
|
||||
26
scripts/start.sh
Executable file
26
scripts/start.sh
Executable file
@@ -0,0 +1,26 @@
|
||||
#!/bin/bash
|
||||
echo "Starting script with mode: $MODE"
|
||||
|
||||
# Extract the host and port from MONGO_URI
|
||||
HOST_PORT=$(echo $MONGO_URI | sed -e 's/mongodb:\/\///' -e 's/\/.*$//')
|
||||
|
||||
# Assuming the script is called from the project root
|
||||
PROJECT_ROOT=$(pwd)
|
||||
echo "Project root: $PROJECT_ROOT"
|
||||
|
||||
CONFIG_PATH="$PROJECT_ROOT/moleculer.config.ts"
|
||||
echo "Configuration path: $CONFIG_PATH"
|
||||
|
||||
# Set the correct path for moleculer-runner based on the mode
|
||||
if [ "$MODE" == "dev" ]; then
|
||||
# For development: use ts-node
|
||||
MOLECULER_RUNNER="ts-node $PROJECT_ROOT/node_modules/moleculer/bin/moleculer-runner.js --hot --repl --config $CONFIG_PATH $PROJECT_ROOT/services/**/*.service.ts"
|
||||
echo "Moleculer Runner for dev: $MOLECULER_RUNNER"
|
||||
else
|
||||
# For production: direct node execution of the compiled JavaScript
|
||||
MOLECULER_RUNNER="moleculer-runner --config $PROJECT_ROOT/dist/moleculer.config.js $PROJECT_ROOT/dist/services/**/*.service.js"
|
||||
echo "Moleculer Runner for prod: $MOLECULER_RUNNER"
|
||||
fi
|
||||
|
||||
# Run wait-for-it, then start the application
|
||||
./scripts/wait-for-it.sh $HOST_PORT -- $MOLECULER_RUNNER
|
||||
190
scripts/wait-for-it.sh
Executable file
190
scripts/wait-for-it.sh
Executable file
@@ -0,0 +1,190 @@
|
||||
#!/usr/bin/env bash
|
||||
# Use this script to test if a given TCP host/port are available
|
||||
|
||||
WAITFORIT_cmdname=${0##*/}
|
||||
if [[ $OSTYPE == 'darwin'* ]]; then
|
||||
if ! command -v gtimeout &> /dev/null
|
||||
then
|
||||
echo "missing gtimeout (`brew install coreutils`)"
|
||||
exit
|
||||
fi
|
||||
alias timeout=gtimeout
|
||||
fi
|
||||
|
||||
echoerr() { if [[ $WAITFORIT_QUIET -ne 1 ]]; then echo "$@" 1>&2; fi }
|
||||
|
||||
usage()
|
||||
{
|
||||
cat << USAGE >&2
|
||||
Usage:
|
||||
$WAITFORIT_cmdname host:port [-s] [-t timeout] [-- command args]
|
||||
-h HOST | --host=HOST Host or IP under test
|
||||
-p PORT | --port=PORT TCP port under test
|
||||
Alternatively, you specify the host and port as host:port
|
||||
-s | --strict Only execute subcommand if the test succeeds
|
||||
-q | --quiet Don't output any status messages
|
||||
-t TIMEOUT | --timeout=TIMEOUT
|
||||
Timeout in seconds, zero for no timeout
|
||||
-- COMMAND ARGS Execute command with args after the test finishes
|
||||
USAGE
|
||||
exit 1
|
||||
}
|
||||
|
||||
wait_for()
|
||||
{
|
||||
if [[ $WAITFORIT_TIMEOUT -gt 0 ]]; then
|
||||
echoerr "$WAITFORIT_cmdname: waiting $WAITFORIT_TIMEOUT seconds for $WAITFORIT_HOST:$WAITFORIT_PORT"
|
||||
else
|
||||
echoerr "$WAITFORIT_cmdname: waiting for $WAITFORIT_HOST:$WAITFORIT_PORT without a timeout"
|
||||
fi
|
||||
WAITFORIT_start_ts=$(date +%s)
|
||||
while :
|
||||
do
|
||||
if [[ $WAITFORIT_ISBUSY -eq 1 ]]; then
|
||||
nc -z $WAITFORIT_HOST $WAITFORIT_PORT
|
||||
WAITFORIT_result=$?
|
||||
else
|
||||
(echo -n > /dev/tcp/$WAITFORIT_HOST/$WAITFORIT_PORT) >/dev/null 2>&1
|
||||
WAITFORIT_result=$?
|
||||
fi
|
||||
if [[ $WAITFORIT_result -eq 0 ]]; then
|
||||
WAITFORIT_end_ts=$(date +%s)
|
||||
echoerr "$WAITFORIT_cmdname: $WAITFORIT_HOST:$WAITFORIT_PORT is available after $((WAITFORIT_end_ts - WAITFORIT_start_ts)) seconds"
|
||||
break
|
||||
fi
|
||||
sleep 1
|
||||
done
|
||||
return $WAITFORIT_result
|
||||
}
|
||||
|
||||
wait_for_wrapper()
|
||||
{
|
||||
# In order to support SIGINT during timeout: http://unix.stackexchange.com/a/57692
|
||||
if [[ $WAITFORIT_QUIET -eq 1 ]]; then
|
||||
timeout $WAITFORIT_BUSYTIMEFLAG $WAITFORIT_TIMEOUT $0 --quiet --child --host=$WAITFORIT_HOST --port=$WAITFORIT_PORT --timeout=$WAITFORIT_TIMEOUT &
|
||||
else
|
||||
timeout $WAITFORIT_BUSYTIMEFLAG $WAITFORIT_TIMEOUT $0 --child --host=$WAITFORIT_HOST --port=$WAITFORIT_PORT --timeout=$WAITFORIT_TIMEOUT &
|
||||
fi
|
||||
WAITFORIT_PID=$!
|
||||
trap "kill -INT -$WAITFORIT_PID" INT
|
||||
wait $WAITFORIT_PID
|
||||
WAITFORIT_RESULT=$?
|
||||
if [[ $WAITFORIT_RESULT -ne 0 ]]; then
|
||||
echoerr "$WAITFORIT_cmdname: timeout occurred after waiting $WAITFORIT_TIMEOUT seconds for $WAITFORIT_HOST:$WAITFORIT_PORT"
|
||||
fi
|
||||
return $WAITFORIT_RESULT
|
||||
}
|
||||
|
||||
# process arguments
|
||||
while [[ $# -gt 0 ]]
|
||||
do
|
||||
case "$1" in
|
||||
*:* )
|
||||
WAITFORIT_hostport=(${1//:/ })
|
||||
WAITFORIT_HOST=${WAITFORIT_hostport[0]}
|
||||
WAITFORIT_PORT=${WAITFORIT_hostport[1]}
|
||||
shift 1
|
||||
;;
|
||||
--child)
|
||||
WAITFORIT_CHILD=1
|
||||
shift 1
|
||||
;;
|
||||
-q | --quiet)
|
||||
WAITFORIT_QUIET=1
|
||||
shift 1
|
||||
;;
|
||||
-s | --strict)
|
||||
WAITFORIT_STRICT=1
|
||||
shift 1
|
||||
;;
|
||||
-h)
|
||||
WAITFORIT_HOST="$2"
|
||||
if [[ $WAITFORIT_HOST == "" ]]; then break; fi
|
||||
shift 2
|
||||
;;
|
||||
--host=*)
|
||||
WAITFORIT_HOST="${1#*=}"
|
||||
shift 1
|
||||
;;
|
||||
-p)
|
||||
WAITFORIT_PORT="$2"
|
||||
if [[ $WAITFORIT_PORT == "" ]]; then break; fi
|
||||
shift 2
|
||||
;;
|
||||
--port=*)
|
||||
WAITFORIT_PORT="${1#*=}"
|
||||
shift 1
|
||||
;;
|
||||
-t)
|
||||
WAITFORIT_TIMEOUT="$2"
|
||||
if [[ $WAITFORIT_TIMEOUT == "" ]]; then break; fi
|
||||
shift 2
|
||||
;;
|
||||
--timeout=*)
|
||||
WAITFORIT_TIMEOUT="${1#*=}"
|
||||
shift 1
|
||||
;;
|
||||
--)
|
||||
shift
|
||||
WAITFORIT_CLI=("$@")
|
||||
break
|
||||
;;
|
||||
--help)
|
||||
usage
|
||||
;;
|
||||
*)
|
||||
echoerr "Unknown argument: $1"
|
||||
usage
|
||||
;;
|
||||
esac
|
||||
done
|
||||
|
||||
if [[ "$WAITFORIT_HOST" == "" || "$WAITFORIT_PORT" == "" ]]; then
|
||||
echoerr "Error: you need to provide a host and port to test."
|
||||
usage
|
||||
fi
|
||||
|
||||
WAITFORIT_TIMEOUT=${WAITFORIT_TIMEOUT:-15}
|
||||
WAITFORIT_STRICT=${WAITFORIT_STRICT:-0}
|
||||
WAITFORIT_CHILD=${WAITFORIT_CHILD:-0}
|
||||
WAITFORIT_QUIET=${WAITFORIT_QUIET:-0}
|
||||
|
||||
# Check to see if timeout is from busybox?
|
||||
WAITFORIT_TIMEOUT_PATH=$(type -p timeout)
|
||||
WAITFORIT_TIMEOUT_PATH=$(realpath $WAITFORIT_TIMEOUT_PATH 2>/dev/null || readlink -f $WAITFORIT_TIMEOUT_PATH)
|
||||
|
||||
WAITFORIT_BUSYTIMEFLAG=""
|
||||
if [[ $WAITFORIT_TIMEOUT_PATH =~ "busybox" ]]; then
|
||||
WAITFORIT_ISBUSY=1
|
||||
# Check if busybox timeout uses -t flag
|
||||
# (recent Alpine versions don't support -t anymore)
|
||||
if timeout &>/dev/stdout | grep -q -e '-t '; then
|
||||
WAITFORIT_BUSYTIMEFLAG="-t"
|
||||
fi
|
||||
else
|
||||
WAITFORIT_ISBUSY=0
|
||||
fi
|
||||
|
||||
if [[ $WAITFORIT_CHILD -gt 0 ]]; then
|
||||
wait_for
|
||||
WAITFORIT_RESULT=$?
|
||||
exit $WAITFORIT_RESULT
|
||||
else
|
||||
if [[ $WAITFORIT_TIMEOUT -gt 0 ]]; then
|
||||
wait_for_wrapper
|
||||
WAITFORIT_RESULT=$?
|
||||
else
|
||||
wait_for
|
||||
WAITFORIT_RESULT=$?
|
||||
fi
|
||||
fi
|
||||
|
||||
if [[ $WAITFORIT_CLI != "" ]]; then
|
||||
if [[ $WAITFORIT_RESULT -ne 0 && $WAITFORIT_STRICT -eq 1 ]]; then
|
||||
echoerr "$WAITFORIT_cmdname: strict mode, refusing to execute subprocess"
|
||||
exit $WAITFORIT_RESULT
|
||||
fi
|
||||
exec "${WAITFORIT_CLI[@]}"
|
||||
else
|
||||
exit $WAITFORIT_RESULT
|
||||
fi
|
||||
156
services/airdcpp.service.ts
Normal file
156
services/airdcpp.service.ts
Normal file
@@ -0,0 +1,156 @@
|
||||
"use strict";
|
||||
import {
|
||||
Context,
|
||||
Service,
|
||||
ServiceBroker,
|
||||
ServiceSchema,
|
||||
Errors,
|
||||
} from "moleculer";
|
||||
import axios from "axios";
|
||||
import AirDCPPSocket from "../shared/airdcpp.socket";
|
||||
|
||||
export default class AirDCPPService extends Service {
|
||||
// @ts-ignore
|
||||
public constructor(
|
||||
public broker: ServiceBroker,
|
||||
schema: ServiceSchema<{}> = { name: "airdcpp" }
|
||||
) {
|
||||
super(broker);
|
||||
this.parseServiceSchema({
|
||||
name: "airdcpp",
|
||||
mixins: [],
|
||||
hooks: {},
|
||||
actions: {
|
||||
initialize: {
|
||||
rest: "POST /initialize",
|
||||
handler: async (
|
||||
ctx: Context<{
|
||||
host: {
|
||||
hostname: string;
|
||||
port: string;
|
||||
protocol: string;
|
||||
username: string;
|
||||
password: string;
|
||||
};
|
||||
}>
|
||||
) => {
|
||||
try {
|
||||
const {
|
||||
host: {
|
||||
hostname,
|
||||
protocol,
|
||||
port,
|
||||
username,
|
||||
password,
|
||||
},
|
||||
} = ctx.params;
|
||||
const airDCPPSocket = new AirDCPPSocket({
|
||||
protocol,
|
||||
hostname: `${hostname}:${port}`,
|
||||
username,
|
||||
password,
|
||||
});
|
||||
return await airDCPPSocket.connect();
|
||||
} catch (err) {
|
||||
console.error(err);
|
||||
}
|
||||
},
|
||||
},
|
||||
getHubs: {
|
||||
rest: "POST /getHubs",
|
||||
timeout: 70000,
|
||||
handler: async (
|
||||
ctx: Context<{
|
||||
host: {
|
||||
hostname: string;
|
||||
port: string;
|
||||
protocol: string;
|
||||
username: string;
|
||||
password: string;
|
||||
};
|
||||
}>
|
||||
) => {
|
||||
const {
|
||||
host: {
|
||||
hostname,
|
||||
port,
|
||||
protocol,
|
||||
username,
|
||||
password,
|
||||
},
|
||||
} = ctx.params;
|
||||
try {
|
||||
const airDCPPSocket = new AirDCPPSocket({
|
||||
protocol,
|
||||
hostname: `${hostname}:${port}`,
|
||||
username,
|
||||
password,
|
||||
});
|
||||
await airDCPPSocket.connect();
|
||||
return await airDCPPSocket.get(`hubs`);
|
||||
} catch (err) {
|
||||
throw err;
|
||||
}
|
||||
},
|
||||
},
|
||||
search: {
|
||||
rest: "POST /search",
|
||||
timeout: 20000,
|
||||
handler: async (
|
||||
ctx: Context<{
|
||||
host: {
|
||||
hostname;
|
||||
port;
|
||||
protocol;
|
||||
username;
|
||||
password;
|
||||
};
|
||||
dcppSearchQuery;
|
||||
}>
|
||||
) => {
|
||||
try {
|
||||
const {
|
||||
host: {
|
||||
hostname,
|
||||
port,
|
||||
protocol,
|
||||
username,
|
||||
password,
|
||||
},
|
||||
dcppSearchQuery,
|
||||
} = ctx.params;
|
||||
const airDCPPSocket = new AirDCPPSocket({
|
||||
protocol,
|
||||
hostname: `${hostname}:${port}`,
|
||||
username,
|
||||
password,
|
||||
});
|
||||
await airDCPPSocket.connect();
|
||||
const searchInstance = await airDCPPSocket.post(
|
||||
`search`
|
||||
);
|
||||
|
||||
// Post the search
|
||||
const searchInfo = await airDCPPSocket.post(
|
||||
`search/${searchInstance.id}/hub_search`,
|
||||
dcppSearchQuery
|
||||
);
|
||||
await this.sleep(10000);
|
||||
const results = await airDCPPSocket.get(
|
||||
`search/${searchInstance.id}/results/0/5`
|
||||
);
|
||||
return results;
|
||||
} catch (err) {
|
||||
throw err;
|
||||
}
|
||||
},
|
||||
},
|
||||
},
|
||||
methods: {
|
||||
sleep: (ms: number) => {
|
||||
return new Promise((resolve) => setTimeout(resolve, ms));
|
||||
},
|
||||
},
|
||||
});
|
||||
}
|
||||
}
|
||||
@@ -7,6 +7,8 @@ import {
|
||||
ServiceSchema,
|
||||
Errors,
|
||||
} from "moleculer";
|
||||
import { DbMixin } from "../mixins/db.mixin";
|
||||
import Comic from "../models/comic.model";
|
||||
import path from "path";
|
||||
import {
|
||||
analyze,
|
||||
@@ -22,16 +24,13 @@ export default class ImageTransformation extends Service {
|
||||
super(broker);
|
||||
this.parseServiceSchema({
|
||||
name: "imagetransformation",
|
||||
mixins: [],
|
||||
mixins: [DbMixin("comics", Comic)],
|
||||
settings: {
|
||||
// Available fields in the responses
|
||||
fields: ["_id", "name", "quantity", "price"],
|
||||
fields: ["_id"],
|
||||
|
||||
// Validator for the `create` & `insert` actions.
|
||||
entityValidator: {
|
||||
name: "string|min:3",
|
||||
price: "number|positive",
|
||||
},
|
||||
entityValidator: {},
|
||||
},
|
||||
hooks: {},
|
||||
actions: {
|
||||
|
||||
@@ -1,291 +0,0 @@
|
||||
/*
|
||||
* MIT License
|
||||
*
|
||||
* Copyright (c) 2022 Rishi Ghan
|
||||
*
|
||||
The MIT License (MIT)
|
||||
|
||||
Copyright (c) 2015 Rishi Ghan
|
||||
|
||||
Permission is hereby granted, free of charge, to any person obtaining a copy
|
||||
of this software and associated documentation files (the "Software"), to deal
|
||||
in the Software without restriction, including without limitation the rights
|
||||
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
|
||||
copies of the Software, and to permit persons to whom the Software is
|
||||
furnished to do so, subject to the following conditions:
|
||||
|
||||
The above copyright notice and this permission notice shall be included in all
|
||||
copies or substantial portions of the Software.
|
||||
|
||||
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
|
||||
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
|
||||
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
|
||||
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
|
||||
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
|
||||
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
|
||||
SOFTWARE.
|
||||
*/
|
||||
|
||||
/*
|
||||
* Revision History:
|
||||
* Initial: 2022/01/28 Rishi Ghan
|
||||
*/
|
||||
|
||||
"use strict";
|
||||
|
||||
import { refineQuery } from "filename-parser";
|
||||
import { isNil, isUndefined } from "lodash";
|
||||
import { Context, Service, ServiceBroker, ServiceSchema } from "moleculer";
|
||||
import BullMQMixin, { SandboxedJob } from "moleculer-bull";
|
||||
import { DbMixin } from "../mixins/db.mixin";
|
||||
import Comic from "../models/comic.model";
|
||||
import {
|
||||
extractFromArchive,
|
||||
uncompressEntireArchive,
|
||||
} from "../utils/uncompression.utils";
|
||||
|
||||
const REDIS_URI = process.env.REDIS_URI || `redis://localhost:6379`;
|
||||
const EventEmitter = require("events");
|
||||
EventEmitter.defaultMaxListeners = 20;
|
||||
|
||||
console.log(`REDIS -> ${REDIS_URI}`);
|
||||
export default class QueueService extends Service {
|
||||
public constructor(
|
||||
public broker: ServiceBroker,
|
||||
schema: ServiceSchema<{}> = { name: "importqueue" }
|
||||
) {
|
||||
super(broker);
|
||||
this.parseServiceSchema({
|
||||
name: "importqueue",
|
||||
mixins: [BullMQMixin(REDIS_URI), DbMixin("comics", Comic)],
|
||||
settings: {
|
||||
bullmq: {
|
||||
maxStalledCount: 0,
|
||||
},
|
||||
},
|
||||
|
||||
hooks: {},
|
||||
queues: {
|
||||
"process.import": {
|
||||
concurrency: 10,
|
||||
async process(job: SandboxedJob) {
|
||||
console.info("New job received!", job.data);
|
||||
console.info(`Processing queue...`);
|
||||
// extract the cover
|
||||
const result = await extractFromArchive(
|
||||
job.data.fileObject.filePath
|
||||
);
|
||||
const {
|
||||
name,
|
||||
filePath,
|
||||
fileSize,
|
||||
extension,
|
||||
mimeType,
|
||||
cover,
|
||||
containedIn,
|
||||
comicInfoJSON,
|
||||
} = result;
|
||||
|
||||
// Infer any issue-related metadata from the filename
|
||||
const { inferredIssueDetails } = refineQuery(
|
||||
result.name
|
||||
);
|
||||
console.log(
|
||||
"Issue metadata inferred: ",
|
||||
JSON.stringify(inferredIssueDetails, null, 2)
|
||||
);
|
||||
// Add the bundleId, if present to the payload
|
||||
let bundleId = null;
|
||||
if (!isNil(job.data.bundleId)) {
|
||||
bundleId = job.data.bundleId;
|
||||
}
|
||||
|
||||
// Orchestrate the payload
|
||||
const payload = {
|
||||
importStatus: {
|
||||
isImported: true,
|
||||
tagged: false,
|
||||
matchedResult: {
|
||||
score: "0",
|
||||
},
|
||||
},
|
||||
rawFileDetails: {
|
||||
name,
|
||||
filePath,
|
||||
fileSize,
|
||||
extension,
|
||||
mimeType,
|
||||
containedIn,
|
||||
cover,
|
||||
},
|
||||
inferredMetadata: {
|
||||
issue: inferredIssueDetails,
|
||||
},
|
||||
sourcedMetadata: {
|
||||
// except for ComicInfo.xml, everything else should be copied over from the
|
||||
// parent comic
|
||||
comicInfo: comicInfoJSON,
|
||||
},
|
||||
// since we already have at least 1 copy
|
||||
// mark it as not wanted by default
|
||||
"acquisition.source.wanted": false,
|
||||
|
||||
// clear out the downloads array
|
||||
// "acquisition.directconnect.downloads": [],
|
||||
|
||||
// mark the metadata source
|
||||
"acquisition.source.name": job.data.sourcedFrom,
|
||||
};
|
||||
|
||||
// Add the sourcedMetadata, if present
|
||||
if (!isNil(job.data.sourcedMetadata) && !isUndefined(job.data.sourcedMetadata.comicvine)) {
|
||||
Object.assign(
|
||||
payload.sourcedMetadata,
|
||||
job.data.sourcedMetadata
|
||||
);
|
||||
}
|
||||
|
||||
// write to mongo
|
||||
const importResult = await this.broker.call(
|
||||
"library.rawImportToDB",
|
||||
{
|
||||
importType: job.data.importType,
|
||||
bundleId,
|
||||
payload,
|
||||
}
|
||||
);
|
||||
return {
|
||||
data: {
|
||||
importResult,
|
||||
},
|
||||
id: job.id,
|
||||
worker: process.pid,
|
||||
};
|
||||
},
|
||||
},
|
||||
"process.uncompressAndResize": {
|
||||
concurrency: 2,
|
||||
async process(job: SandboxedJob) {
|
||||
console.log(`Initiating uncompression job...`);
|
||||
return await uncompressEntireArchive(
|
||||
job.data.filePath,
|
||||
job.data.options
|
||||
);
|
||||
},
|
||||
},
|
||||
},
|
||||
actions: {
|
||||
uncompressResize: {
|
||||
rest: "POST /uncompressResize",
|
||||
params: {},
|
||||
async handler(
|
||||
ctx: Context<{
|
||||
data: { filePath: string; options: any };
|
||||
}>
|
||||
) {
|
||||
return await this.createJob(
|
||||
"process.uncompressAndResize",
|
||||
ctx.params
|
||||
);
|
||||
},
|
||||
},
|
||||
processImport: {
|
||||
rest: "POST /processImport",
|
||||
params: {},
|
||||
async handler(
|
||||
ctx: Context<{
|
||||
fileObject: object;
|
||||
importType: string;
|
||||
bundleId: number;
|
||||
sourcedFrom?: string;
|
||||
sourcedMetadata: object;
|
||||
}>
|
||||
) {
|
||||
return await this.createJob("process.import", {
|
||||
fileObject: ctx.params.fileObject,
|
||||
importType: ctx.params.importType,
|
||||
bundleId: ctx.params.bundleId,
|
||||
sourcedFrom: ctx.params.sourcedFrom,
|
||||
sourcedMetadata: ctx.params.sourcedMetadata,
|
||||
});
|
||||
},
|
||||
},
|
||||
toggleImportQueue: {
|
||||
rest: "POST /pauseImportQueue",
|
||||
params: {},
|
||||
handler: async (ctx: Context<{ action: string }>) => {
|
||||
switch (ctx.params.action) {
|
||||
case "pause":
|
||||
const foo = await this.getQueue(
|
||||
"process.import"
|
||||
).pause();
|
||||
console.log("paused", foo);
|
||||
return foo;
|
||||
case "resume":
|
||||
const soo = await this.getQueue(
|
||||
"process.import"
|
||||
).resume();
|
||||
console.log("resumed", soo);
|
||||
return soo;
|
||||
default:
|
||||
console.log("Unrecognized queue action.");
|
||||
}
|
||||
},
|
||||
},
|
||||
},
|
||||
methods: {},
|
||||
async started(): Promise<any> {
|
||||
await this.getQueue("process.import").on(
|
||||
"failed",
|
||||
async (job, error) => {
|
||||
console.error(
|
||||
`An error occured in 'process.import' queue on job id '${job.id}': ${error.message}`
|
||||
);
|
||||
console.error(job.data);
|
||||
}
|
||||
);
|
||||
await this.getQueue("process.import").on(
|
||||
"completed",
|
||||
async (job, res) => {
|
||||
await this.broker.call("socket.broadcast", {
|
||||
namespace: "/", //optional
|
||||
event: "action",
|
||||
args: [{ type: "LS_COVER_EXTRACTED", result: res }], //optional
|
||||
});
|
||||
console.info(
|
||||
`Import Job with the id '${job.id}' completed.`
|
||||
);
|
||||
}
|
||||
);
|
||||
await this.getQueue("process.import").on(
|
||||
"stalled",
|
||||
async (job) => {
|
||||
console.warn(`Import job '${job.id} stalled!`);
|
||||
console.log(`${JSON.stringify(job, null, 2)}`);
|
||||
console.log(`is stalled.`);
|
||||
}
|
||||
);
|
||||
|
||||
await this.getQueue("process.uncompressAndResize").on(
|
||||
"completed",
|
||||
async (job, res) => {
|
||||
await this.broker.call("socket.broadcast", {
|
||||
namespace: "/",
|
||||
event: "action",
|
||||
args: [
|
||||
{
|
||||
type: "COMICBOOK_EXTRACTION_SUCCESS",
|
||||
result: {
|
||||
files: res,
|
||||
purpose: job.data.options.purpose,
|
||||
},
|
||||
},
|
||||
],
|
||||
});
|
||||
console.info(`Uncompression Job ${job.id} completed.`);
|
||||
}
|
||||
);
|
||||
},
|
||||
});
|
||||
}
|
||||
}
|
||||
448
services/jobqueue.service.ts
Normal file
448
services/jobqueue.service.ts
Normal file
@@ -0,0 +1,448 @@
|
||||
import { Context, Service, ServiceBroker } from "moleculer";
|
||||
import JobResult from "../models/jobresult.model";
|
||||
import { refineQuery } from "filename-parser";
|
||||
import BullMqMixin from "moleculer-bullmq";
|
||||
import { DbMixin } from "../mixins/db.mixin";
|
||||
import Comic from "../models/comic.model";
|
||||
const ObjectId = require("mongoose").Types.ObjectId;
|
||||
import {
|
||||
extractFromArchive,
|
||||
uncompressEntireArchive,
|
||||
} from "../utils/uncompression.utils";
|
||||
import { isNil, isUndefined } from "lodash";
|
||||
import { pubClient } from "../config/redis.config";
|
||||
import path from "path";
|
||||
const { MoleculerError } = require("moleculer").Errors;
|
||||
|
||||
export default class JobQueueService extends Service {
|
||||
public constructor(public broker: ServiceBroker) {
|
||||
super(broker);
|
||||
this.parseServiceSchema({
|
||||
name: "jobqueue",
|
||||
hooks: {},
|
||||
mixins: [DbMixin("comics", Comic), BullMqMixin],
|
||||
|
||||
settings: {
|
||||
bullmq: {
|
||||
client: pubClient,
|
||||
},
|
||||
},
|
||||
actions: {
|
||||
getJobCountsByType: {
|
||||
rest: "GET /getJobCountsByType",
|
||||
handler: async (ctx: Context<{}>) => {
|
||||
console.log(ctx.params);
|
||||
return await this.$resolve("jobqueue").getJobCounts();
|
||||
},
|
||||
},
|
||||
toggle: {
|
||||
rest: "GET /toggle",
|
||||
handler: async (ctx: Context<{ action: String }>) => {
|
||||
switch (ctx.params.action) {
|
||||
case "pause":
|
||||
this.pause();
|
||||
break;
|
||||
case "resume":
|
||||
this.resume();
|
||||
break;
|
||||
default:
|
||||
console.log(`Unknown queue action.`);
|
||||
}
|
||||
},
|
||||
},
|
||||
|
||||
enqueue: {
|
||||
queue: true,
|
||||
rest: "GET /enqueue",
|
||||
handler: async (
|
||||
ctx: Context<{ action: string; description: string }>
|
||||
) => {
|
||||
try {
|
||||
const { action, description } = ctx.params;
|
||||
// Enqueue the job
|
||||
const job = await this.localQueue(
|
||||
ctx,
|
||||
action,
|
||||
{},
|
||||
{
|
||||
priority: 10,
|
||||
}
|
||||
);
|
||||
console.log(`Job ${job.id} enqueued`);
|
||||
console.log(`${description}`);
|
||||
|
||||
return job.id;
|
||||
} catch (error) {
|
||||
console.error("Failed to enqueue job:", error);
|
||||
}
|
||||
},
|
||||
},
|
||||
|
||||
// Comic Book Import Job Queue
|
||||
"enqueue.async": {
|
||||
handler: async (
|
||||
ctx: Context<{
|
||||
sessionId: String;
|
||||
}>
|
||||
) => {
|
||||
try {
|
||||
console.log(
|
||||
`Recieved Job ID ${ctx.locals.job.id}, processing...`
|
||||
);
|
||||
// 1. De-structure the job params
|
||||
const { fileObject } = ctx.locals.job.data.params;
|
||||
|
||||
// 2. Extract metadata from the archive
|
||||
const result = await extractFromArchive(
|
||||
fileObject.filePath
|
||||
);
|
||||
const {
|
||||
name,
|
||||
filePath,
|
||||
fileSize,
|
||||
extension,
|
||||
mimeType,
|
||||
cover,
|
||||
containedIn,
|
||||
comicInfoJSON,
|
||||
} = result;
|
||||
|
||||
// 3a. Infer any issue-related metadata from the filename
|
||||
const { inferredIssueDetails } = refineQuery(
|
||||
result.name
|
||||
);
|
||||
console.log(
|
||||
"Issue metadata inferred: ",
|
||||
JSON.stringify(inferredIssueDetails, null, 2)
|
||||
);
|
||||
|
||||
// 3b. Orchestrate the payload
|
||||
const payload = {
|
||||
importStatus: {
|
||||
isImported: true,
|
||||
tagged: false,
|
||||
matchedResult: {
|
||||
score: "0",
|
||||
},
|
||||
},
|
||||
rawFileDetails: {
|
||||
name,
|
||||
filePath,
|
||||
fileSize,
|
||||
extension,
|
||||
mimeType,
|
||||
containedIn,
|
||||
cover,
|
||||
},
|
||||
inferredMetadata: {
|
||||
issue: inferredIssueDetails,
|
||||
},
|
||||
sourcedMetadata: {
|
||||
// except for ComicInfo.xml, everything else should be copied over from the
|
||||
// parent comic
|
||||
comicInfo: comicInfoJSON,
|
||||
},
|
||||
// since we already have at least 1 copy
|
||||
// mark it as not wanted by default
|
||||
"acquisition.source.wanted": false,
|
||||
|
||||
// clear out the downloads array
|
||||
// "acquisition.directconnect.downloads": [],
|
||||
|
||||
// mark the metadata source
|
||||
"acquisition.source.name":
|
||||
ctx.locals.job.data.params.sourcedFrom,
|
||||
};
|
||||
|
||||
// 3c. Add the bundleId, if present to the payload
|
||||
let bundleId = null;
|
||||
if (!isNil(ctx.locals.job.data.params.bundleId)) {
|
||||
bundleId = ctx.locals.job.data.params.bundleId;
|
||||
}
|
||||
|
||||
// 3d. Add the sourcedMetadata, if present
|
||||
if (
|
||||
!isNil(
|
||||
ctx.locals.job.data.params.sourcedMetadata
|
||||
) &&
|
||||
!isUndefined(
|
||||
ctx.locals.job.data.params.sourcedMetadata
|
||||
.comicvine
|
||||
)
|
||||
) {
|
||||
Object.assign(
|
||||
payload.sourcedMetadata,
|
||||
ctx.locals.job.data.params.sourcedMetadata
|
||||
);
|
||||
}
|
||||
|
||||
// 4. write to mongo
|
||||
const importResult = await this.broker.call(
|
||||
"library.rawImportToDB",
|
||||
{
|
||||
importType:
|
||||
ctx.locals.job.data.params.importType,
|
||||
bundleId,
|
||||
payload,
|
||||
}
|
||||
);
|
||||
return {
|
||||
data: {
|
||||
importResult,
|
||||
},
|
||||
id: ctx.locals.job.id,
|
||||
sessionId: ctx.params.sessionId,
|
||||
};
|
||||
} catch (error) {
|
||||
console.error(
|
||||
`An error occurred processing Job ID ${ctx.locals.job.id}`
|
||||
);
|
||||
throw new MoleculerError(
|
||||
error,
|
||||
500,
|
||||
"IMPORT_JOB_ERROR",
|
||||
{
|
||||
data: ctx.params.sessionId,
|
||||
}
|
||||
);
|
||||
}
|
||||
},
|
||||
},
|
||||
getJobResultStatistics: {
|
||||
rest: "GET /getJobResultStatistics",
|
||||
handler: async (ctx: Context<{}>) => {
|
||||
return await JobResult.aggregate([
|
||||
{
|
||||
$group: {
|
||||
_id: {
|
||||
sessionId: "$sessionId",
|
||||
status: "$status",
|
||||
},
|
||||
earliestTimestamp: {
|
||||
$min: "$timestamp",
|
||||
},
|
||||
count: {
|
||||
$sum: 1,
|
||||
},
|
||||
},
|
||||
},
|
||||
{
|
||||
$group: {
|
||||
_id: "$_id.sessionId",
|
||||
statuses: {
|
||||
$push: {
|
||||
status: "$_id.status",
|
||||
earliestTimestamp:
|
||||
"$earliestTimestamp",
|
||||
count: "$count",
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
{
|
||||
$project: {
|
||||
_id: 0,
|
||||
sessionId: "$_id",
|
||||
completedJobs: {
|
||||
$reduce: {
|
||||
input: "$statuses",
|
||||
initialValue: 0,
|
||||
in: {
|
||||
$sum: [
|
||||
"$$value",
|
||||
{
|
||||
$cond: [
|
||||
{
|
||||
$eq: [
|
||||
"$$this.status",
|
||||
"completed",
|
||||
],
|
||||
},
|
||||
"$$this.count",
|
||||
0,
|
||||
],
|
||||
},
|
||||
],
|
||||
},
|
||||
},
|
||||
},
|
||||
failedJobs: {
|
||||
$reduce: {
|
||||
input: "$statuses",
|
||||
initialValue: 0,
|
||||
in: {
|
||||
$sum: [
|
||||
"$$value",
|
||||
{
|
||||
$cond: [
|
||||
{
|
||||
$eq: [
|
||||
"$$this.status",
|
||||
"failed",
|
||||
],
|
||||
},
|
||||
"$$this.count",
|
||||
0,
|
||||
],
|
||||
},
|
||||
],
|
||||
},
|
||||
},
|
||||
},
|
||||
earliestTimestamp: {
|
||||
$min: "$statuses.earliestTimestamp",
|
||||
},
|
||||
},
|
||||
},
|
||||
]);
|
||||
},
|
||||
},
|
||||
"uncompressFullArchive.async": {
|
||||
rest: "POST /uncompressFullArchive",
|
||||
handler: async (
|
||||
ctx: Context<{
|
||||
filePath: string;
|
||||
comicObjectId: string;
|
||||
options: any;
|
||||
}>
|
||||
) => {
|
||||
console.log(
|
||||
`Recieved Job ID ${JSON.stringify(
|
||||
ctx.locals
|
||||
)}, processing...`
|
||||
);
|
||||
const { filePath, options, comicObjectId } = ctx.params;
|
||||
const comicId = new ObjectId(comicObjectId);
|
||||
// 2. Extract metadata from the archive
|
||||
const result: string[] = await uncompressEntireArchive(
|
||||
filePath,
|
||||
options
|
||||
);
|
||||
if (Array.isArray(result) && result.length !== 0) {
|
||||
// Get the containing directory of the uncompressed archive
|
||||
const directoryPath = path.dirname(result[0]);
|
||||
// Add to mongo object
|
||||
await Comic.findByIdAndUpdate(
|
||||
comicId,
|
||||
{
|
||||
$set: {
|
||||
"rawFileDetails.archive": {
|
||||
uncompressed: true,
|
||||
expandedPath: directoryPath,
|
||||
},
|
||||
},
|
||||
},
|
||||
{ new: true, safe: true, upsert: true }
|
||||
);
|
||||
return result;
|
||||
}
|
||||
},
|
||||
},
|
||||
},
|
||||
|
||||
events: {
|
||||
async "uncompressFullArchive.async.active"(
|
||||
ctx: Context<{ id: number }>
|
||||
) {
|
||||
console.log(
|
||||
`Uncompression Job ID ${ctx.params.id} is set to active.`
|
||||
);
|
||||
},
|
||||
async "uncompressFullArchive.async.completed"(
|
||||
ctx: Context<{ id: number }>
|
||||
) {
|
||||
console.log(
|
||||
`Uncompression Job ID ${ctx.params.id} completed.`
|
||||
);
|
||||
const job = await this.job(ctx.params.id);
|
||||
await this.broker.call("socket.broadcast", {
|
||||
namespace: "/",
|
||||
event: "LS_UNCOMPRESSION_JOB_COMPLETE",
|
||||
args: [
|
||||
{
|
||||
uncompressedArchive: job.returnvalue,
|
||||
},
|
||||
],
|
||||
});
|
||||
return job.returnvalue;
|
||||
},
|
||||
// use the `${QUEUE_NAME}.QUEUE_EVENT` scheme
|
||||
async "enqueue.async.active"(ctx: Context<{ id: Number }>) {
|
||||
console.log(`Job ID ${ctx.params.id} is set to active.`);
|
||||
},
|
||||
async drained(ctx) {
|
||||
console.log("Queue drained.");
|
||||
await this.broker.call("socket.broadcast", {
|
||||
namespace: "/",
|
||||
event: "LS_IMPORT_QUEUE_DRAINED",
|
||||
args: [
|
||||
{
|
||||
message: "drained",
|
||||
},
|
||||
],
|
||||
});
|
||||
},
|
||||
async "enqueue.async.completed"(ctx: Context<{ id: Number }>) {
|
||||
// 1. Fetch the job result using the job Id
|
||||
const job = await this.job(ctx.params.id);
|
||||
// 2. Increment the completed job counter
|
||||
await pubClient.incr("completedJobCount");
|
||||
// 3. Fetch the completed job count for the final payload to be sent to the client
|
||||
const completedJobCount = await pubClient.get(
|
||||
"completedJobCount"
|
||||
);
|
||||
// 4. Emit the LS_COVER_EXTRACTED event with the necessary details
|
||||
await this.broker.call("socket.broadcast", {
|
||||
namespace: "/",
|
||||
event: "LS_COVER_EXTRACTED",
|
||||
args: [
|
||||
{
|
||||
completedJobCount,
|
||||
importResult: job.returnvalue.data.importResult,
|
||||
},
|
||||
],
|
||||
});
|
||||
// 5. Persist the job results in mongo for posterity
|
||||
await JobResult.create({
|
||||
id: ctx.params.id,
|
||||
status: "completed",
|
||||
timestamp: job.timestamp,
|
||||
sessionId: job.returnvalue.sessionId,
|
||||
failedReason: {},
|
||||
});
|
||||
|
||||
console.log(`Job ID ${ctx.params.id} completed.`);
|
||||
},
|
||||
|
||||
async "enqueue.async.failed"(ctx) {
|
||||
const job = await this.job(ctx.params.id);
|
||||
await pubClient.incr("failedJobCount");
|
||||
const failedJobCount = await pubClient.get(
|
||||
"failedJobCount"
|
||||
);
|
||||
|
||||
await JobResult.create({
|
||||
id: ctx.params.id,
|
||||
status: "failed",
|
||||
failedReason: job.failedReason,
|
||||
sessionId: job.data.params.sessionId,
|
||||
timestamp: job.timestamp,
|
||||
});
|
||||
|
||||
// 4. Emit the LS_COVER_EXTRACTION_FAILED event with the necessary details
|
||||
await this.broker.call("socket.broadcast", {
|
||||
namespace: "/",
|
||||
event: "LS_COVER_EXTRACTION_FAILED",
|
||||
args: [
|
||||
{
|
||||
failedJobCount,
|
||||
importResult: job,
|
||||
},
|
||||
],
|
||||
});
|
||||
},
|
||||
},
|
||||
methods: {},
|
||||
});
|
||||
}
|
||||
}
|
||||
@@ -51,31 +51,50 @@ import {
|
||||
IExtractionOptions,
|
||||
} from "threetwo-ui-typings";
|
||||
const ObjectId = require("mongoose").Types.ObjectId;
|
||||
import { pubClient } from "../config/redis.config";
|
||||
import fsExtra from "fs-extra";
|
||||
const through2 = require("through2");
|
||||
import klaw from "klaw";
|
||||
import path from "path";
|
||||
import { COMICS_DIRECTORY, USERDATA_DIRECTORY } from "../constants/directories";
|
||||
|
||||
console.log(`MONGO -> ${process.env.MONGO_URI}`);
|
||||
export default class ImportService extends Service {
|
||||
public constructor(public broker: ServiceBroker) {
|
||||
export default class LibraryService extends Service {
|
||||
public constructor(
|
||||
public broker: ServiceBroker,
|
||||
schema: ServiceSchema<{}> = { name: "library" }
|
||||
) {
|
||||
super(broker);
|
||||
this.parseServiceSchema({
|
||||
name: "library",
|
||||
mixins: [DbMixin("comics", Comic)],
|
||||
hooks: {},
|
||||
actions: {
|
||||
getHealthInformation: {
|
||||
rest: "GET /getHealthInformation",
|
||||
params: {},
|
||||
handler: async (ctx: Context<{}>) => {
|
||||
try {
|
||||
return await ctx.broker.call("$node.services");
|
||||
} catch (error) {
|
||||
return new Error("Service is down.");
|
||||
}
|
||||
},
|
||||
},
|
||||
walkFolders: {
|
||||
rest: "POST /walkFolders",
|
||||
params: {
|
||||
basePathToWalk: "string",
|
||||
},
|
||||
async handler(ctx: Context<{ basePathToWalk: string }>) {
|
||||
params: {},
|
||||
async handler(
|
||||
ctx: Context<{
|
||||
basePathToWalk: string;
|
||||
extensions: string[];
|
||||
}>
|
||||
) {
|
||||
console.log(ctx.params);
|
||||
return await walkFolder(ctx.params.basePathToWalk, [
|
||||
".cbz",
|
||||
".cbr",
|
||||
".cb7",
|
||||
...ctx.params.extensions,
|
||||
]);
|
||||
},
|
||||
},
|
||||
@@ -90,11 +109,18 @@ export default class ImportService extends Service {
|
||||
rest: "POST /uncompressFullArchive",
|
||||
params: {},
|
||||
handler: async (
|
||||
ctx: Context<{ filePath: string; options: any }>
|
||||
ctx: Context<{
|
||||
filePath: string;
|
||||
comicObjectId: string;
|
||||
options: any;
|
||||
}>
|
||||
) => {
|
||||
await broker.call("importqueue.uncompressResize", {
|
||||
this.broker.call("jobqueue.enqueue", {
|
||||
filePath: ctx.params.filePath,
|
||||
comicObjectId: ctx.params.comicObjectId,
|
||||
options: ctx.params.options,
|
||||
action: "uncompressFullArchive.async",
|
||||
description: `Job for uncompressing archive at ${ctx.params.filePath}`,
|
||||
});
|
||||
},
|
||||
},
|
||||
@@ -139,64 +165,54 @@ export default class ImportService extends Service {
|
||||
},
|
||||
newImport: {
|
||||
rest: "POST /newImport",
|
||||
params: {},
|
||||
async handler(
|
||||
ctx: Context<{
|
||||
extractionOptions?: any;
|
||||
}>
|
||||
) {
|
||||
// 1. Walk the Source folder
|
||||
klaw(path.resolve(COMICS_DIRECTORY))
|
||||
// 1.1 Filter on .cb* extensions
|
||||
.pipe(
|
||||
through2.obj(function (item, enc, next) {
|
||||
let fileExtension = path.extname(item.path);
|
||||
if (
|
||||
[".cbz", ".cbr", ".cb7"].includes(
|
||||
fileExtension
|
||||
)
|
||||
) {
|
||||
this.push(item);
|
||||
}
|
||||
next();
|
||||
})
|
||||
)
|
||||
// 1.2 Pipe filtered results to the next step
|
||||
.on("data", async (item) => {
|
||||
async handler(ctx) {
|
||||
const { sessionId } = ctx.params;
|
||||
try {
|
||||
// Initialize Redis counters once at the start of the import
|
||||
await pubClient.set("completedJobCount", 0);
|
||||
await pubClient.set("failedJobCount", 0);
|
||||
|
||||
// Convert klaw to use a promise-based approach for better flow control
|
||||
const files = await this.getComicFiles(
|
||||
process.env.COMICS_DIRECTORY
|
||||
);
|
||||
for (const file of files) {
|
||||
console.info(
|
||||
"Found a file at path: %s",
|
||||
item.path
|
||||
"Found a file at path:",
|
||||
file.path
|
||||
);
|
||||
let comicExists = await Comic.exists({
|
||||
"rawFileDetails.name": `${path.basename(
|
||||
item.path,
|
||||
path.extname(item.path)
|
||||
)}`,
|
||||
const comicExists = await Comic.exists({
|
||||
"rawFileDetails.name": path.basename(
|
||||
file.path,
|
||||
path.extname(file.path)
|
||||
),
|
||||
});
|
||||
if (!comicExists) {
|
||||
// 2. Send the extraction job to the queue
|
||||
await broker.call(
|
||||
"importqueue.processImport",
|
||||
{
|
||||
fileObject: {
|
||||
filePath: item.path,
|
||||
fileSize: item.stats.size,
|
||||
},
|
||||
importType: "new",
|
||||
}
|
||||
);
|
||||
// Send the extraction job to the queue
|
||||
await this.broker.call("jobqueue.enqueue", {
|
||||
fileObject: {
|
||||
filePath: file.path,
|
||||
fileSize: file.stats.size,
|
||||
},
|
||||
sessionId,
|
||||
importType: "new",
|
||||
action: "enqueue.async",
|
||||
});
|
||||
} else {
|
||||
console.log(
|
||||
"Comic already exists in the library."
|
||||
);
|
||||
}
|
||||
})
|
||||
.on("end", () => {
|
||||
console.log("All files traversed.");
|
||||
});
|
||||
}
|
||||
console.log("All files traversed.");
|
||||
} catch (error) {
|
||||
console.error(
|
||||
"Error during newImport processing:",
|
||||
error
|
||||
);
|
||||
}
|
||||
},
|
||||
},
|
||||
|
||||
rawImportToDB: {
|
||||
rest: "POST /rawImportToDB",
|
||||
params: {},
|
||||
@@ -207,10 +223,7 @@ export default class ImportService extends Service {
|
||||
payload: {
|
||||
_id?: string;
|
||||
sourcedMetadata: {
|
||||
comicvine?: {
|
||||
volume: { api_detail_url: string };
|
||||
volumeInformation: {};
|
||||
};
|
||||
comicvine?: any;
|
||||
locg?: {};
|
||||
};
|
||||
inferredMetadata: {
|
||||
@@ -219,11 +232,13 @@ export default class ImportService extends Service {
|
||||
rawFileDetails: {
|
||||
name: string;
|
||||
};
|
||||
wanted: {
|
||||
issues: [];
|
||||
volume: { id: number };
|
||||
source: string;
|
||||
markEntireVolumeWanted: Boolean;
|
||||
};
|
||||
acquisition: {
|
||||
source: {
|
||||
wanted: boolean;
|
||||
name?: string;
|
||||
};
|
||||
directconnect: {
|
||||
downloads: [];
|
||||
};
|
||||
@@ -232,61 +247,139 @@ export default class ImportService extends Service {
|
||||
}>
|
||||
) {
|
||||
try {
|
||||
let volumeDetails;
|
||||
const comicMetadata = ctx.params.payload;
|
||||
// When an issue is added from the search CV feature
|
||||
// we solicit volume information and add that to mongo
|
||||
if (
|
||||
comicMetadata.sourcedMetadata.comicvine &&
|
||||
!isNil(
|
||||
comicMetadata.sourcedMetadata.comicvine
|
||||
.volume
|
||||
)
|
||||
) {
|
||||
volumeDetails = await this.broker.call(
|
||||
"comicvine.getVolumes",
|
||||
{
|
||||
volumeURI:
|
||||
comicMetadata.sourcedMetadata
|
||||
.comicvine.volume
|
||||
.api_detail_url,
|
||||
}
|
||||
);
|
||||
comicMetadata.sourcedMetadata.comicvine.volumeInformation =
|
||||
volumeDetails.results;
|
||||
}
|
||||
console.log(
|
||||
JSON.stringify(ctx.params.payload, null, 4)
|
||||
);
|
||||
const { payload } = ctx.params;
|
||||
const { wanted } = payload;
|
||||
|
||||
console.log("Saving to Mongo...");
|
||||
console.log(
|
||||
`Import type: [${ctx.params.importType}]`
|
||||
);
|
||||
switch (ctx.params.importType) {
|
||||
case "new":
|
||||
return await Comic.create(comicMetadata);
|
||||
case "update":
|
||||
return await Comic.findOneAndUpdate(
|
||||
{
|
||||
"acquisition.directconnect.downloads.bundleId":
|
||||
ctx.params.bundleId,
|
||||
},
|
||||
comicMetadata,
|
||||
{
|
||||
upsert: true,
|
||||
new: true,
|
||||
}
|
||||
);
|
||||
default:
|
||||
return false;
|
||||
|
||||
if (
|
||||
!wanted ||
|
||||
!wanted.volume ||
|
||||
!wanted.volume.id
|
||||
) {
|
||||
console.log(
|
||||
"No valid identifier for upsert. Attempting to create a new document with minimal data..."
|
||||
);
|
||||
const newDocument = new Comic(payload); // Using the entire payload for the new document
|
||||
|
||||
await newDocument.save();
|
||||
return {
|
||||
success: true,
|
||||
message:
|
||||
"New document created due to lack of valid identifiers.",
|
||||
data: newDocument,
|
||||
};
|
||||
}
|
||||
|
||||
let condition = {
|
||||
"wanted.volume.id": wanted.volume.id,
|
||||
};
|
||||
|
||||
let update: any = {
|
||||
// Using 'any' to bypass strict type checks; alternatively, define a more accurate type
|
||||
$set: {
|
||||
rawFileDetails: payload.rawFileDetails,
|
||||
inferredMetadata: payload.inferredMetadata,
|
||||
sourcedMetadata: payload.sourcedMetadata,
|
||||
},
|
||||
$setOnInsert: {
|
||||
"wanted.source": payload.wanted.source,
|
||||
"wanted.markEntireVolumeWanted":
|
||||
payload.wanted.markEntireVolumeWanted,
|
||||
"wanted.volume": payload.wanted.volume,
|
||||
},
|
||||
};
|
||||
|
||||
if (wanted.issues && wanted.issues.length > 0) {
|
||||
update.$addToSet = {
|
||||
"wanted.issues": { $each: wanted.issues },
|
||||
};
|
||||
}
|
||||
|
||||
const options = {
|
||||
upsert: true,
|
||||
new: true,
|
||||
};
|
||||
|
||||
const result = await Comic.findOneAndUpdate(
|
||||
condition,
|
||||
update,
|
||||
options
|
||||
);
|
||||
console.log(
|
||||
"Operation completed. Document updated or inserted:",
|
||||
result
|
||||
);
|
||||
|
||||
return {
|
||||
success: true,
|
||||
message: "Document successfully upserted.",
|
||||
data: result,
|
||||
};
|
||||
} catch (error) {
|
||||
console.log(error);
|
||||
throw new Errors.MoleculerError(
|
||||
"Import failed.",
|
||||
"Operation failed.",
|
||||
500
|
||||
);
|
||||
}
|
||||
},
|
||||
},
|
||||
getComicsMarkedAsWanted: {
|
||||
rest: "GET /getComicsMarkedAsWanted",
|
||||
params: {
|
||||
page: { type: "number", default: 1 },
|
||||
limit: { type: "number", default: 100 },
|
||||
},
|
||||
handler: async (
|
||||
ctx: Context<{ page: number; limit: number }>
|
||||
) => {
|
||||
const { page, limit } = ctx.params;
|
||||
this.logger.info(
|
||||
`Requesting page ${page} with limit ${limit}`
|
||||
);
|
||||
try {
|
||||
const options = {
|
||||
page,
|
||||
limit,
|
||||
lean: true,
|
||||
};
|
||||
|
||||
const result = await Comic.paginate(
|
||||
{
|
||||
wanted: { $exists: true },
|
||||
$or: [
|
||||
{
|
||||
"wanted.markEntireVolumeWanted":
|
||||
true,
|
||||
},
|
||||
{
|
||||
"wanted.issues": {
|
||||
$not: { $size: 0 },
|
||||
},
|
||||
},
|
||||
],
|
||||
},
|
||||
options
|
||||
);
|
||||
|
||||
// Log the raw result from the database
|
||||
this.logger.info(
|
||||
"Paginate result:",
|
||||
JSON.stringify(result, null, 2)
|
||||
);
|
||||
|
||||
return result.docs; // Return just the docs array
|
||||
} catch (error) {
|
||||
this.logger.error("Error finding comics:", error);
|
||||
throw error;
|
||||
}
|
||||
},
|
||||
},
|
||||
|
||||
applyComicVineMetadata: {
|
||||
rest: "POST /applyComicVineMetadata",
|
||||
params: {},
|
||||
@@ -383,6 +476,66 @@ export default class ImportService extends Service {
|
||||
});
|
||||
},
|
||||
},
|
||||
applyTorrentDownloadMetadata: {
|
||||
rest: "POST /applyTorrentDownloadMetadata",
|
||||
handler: async (
|
||||
ctx: Context<{
|
||||
torrentToDownload: any;
|
||||
comicObjectId: String;
|
||||
infoHash: String;
|
||||
name: String;
|
||||
announce: [String];
|
||||
}>
|
||||
) => {
|
||||
const {
|
||||
name,
|
||||
torrentToDownload,
|
||||
comicObjectId,
|
||||
announce,
|
||||
infoHash,
|
||||
} = ctx.params;
|
||||
console.log(JSON.stringify(ctx.params, null, 4));
|
||||
try {
|
||||
return await Comic.findByIdAndUpdate(
|
||||
new ObjectId(comicObjectId),
|
||||
{
|
||||
$push: {
|
||||
"acquisition.torrent": {
|
||||
infoHash,
|
||||
name,
|
||||
announce,
|
||||
},
|
||||
},
|
||||
},
|
||||
{ new: true, safe: true, upsert: true }
|
||||
);
|
||||
} catch (err) {
|
||||
console.log(err);
|
||||
}
|
||||
},
|
||||
},
|
||||
getInfoHashes: {
|
||||
rest: "GET /getInfoHashes",
|
||||
handler: async (ctx: Context<{}>) => {
|
||||
try {
|
||||
return await Comic.aggregate([
|
||||
{
|
||||
$unwind: "$acquisition.torrent",
|
||||
},
|
||||
{
|
||||
$group: {
|
||||
_id: "$_id",
|
||||
infoHashes: {
|
||||
$push: "$acquisition.torrent.infoHash",
|
||||
},
|
||||
},
|
||||
},
|
||||
]);
|
||||
} catch (err) {
|
||||
return err;
|
||||
}
|
||||
},
|
||||
},
|
||||
getComicBooks: {
|
||||
rest: "POST /getComicBooks",
|
||||
params: {},
|
||||
@@ -402,6 +555,7 @@ export default class ImportService extends Service {
|
||||
rest: "POST /getComicBookById",
|
||||
params: { id: "string" },
|
||||
async handler(ctx: Context<{ id: string }>) {
|
||||
console.log(ctx.params.id);
|
||||
return await Comic.findById(ctx.params.id);
|
||||
},
|
||||
},
|
||||
@@ -670,7 +824,35 @@ export default class ImportService extends Service {
|
||||
},
|
||||
},
|
||||
},
|
||||
methods: {},
|
||||
methods: {
|
||||
// Method to walk the directory and filter comic files
|
||||
getComicFiles: (directory) => {
|
||||
return new Promise((resolve, reject) => {
|
||||
const files = [];
|
||||
klaw(directory)
|
||||
.pipe(
|
||||
through2.obj(function (item, enc, next) {
|
||||
const fileExtension = path.extname(
|
||||
item.path
|
||||
);
|
||||
if (
|
||||
[".cbz", ".cbr", ".cb7"].includes(
|
||||
fileExtension
|
||||
)
|
||||
) {
|
||||
this.push(item);
|
||||
}
|
||||
next();
|
||||
})
|
||||
)
|
||||
.on("data", (item) => {
|
||||
files.push(item);
|
||||
})
|
||||
.on("end", () => resolve(files))
|
||||
.on("error", (err) => reject(err));
|
||||
});
|
||||
},
|
||||
},
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
@@ -46,14 +46,15 @@ export default class SettingsService extends Service {
|
||||
.map((item) => JSON.stringify(item))
|
||||
.join("\n");
|
||||
queries += "\n";
|
||||
const { body } = await eSClient.msearch({
|
||||
const { responses } = await eSClient.msearch({
|
||||
body: queries,
|
||||
});
|
||||
body.responses.forEach((match) => {
|
||||
|
||||
responses.forEach((match) => {
|
||||
console.log(match.hits);
|
||||
});
|
||||
|
||||
return body.responses;
|
||||
return responses;
|
||||
},
|
||||
},
|
||||
issue: {
|
||||
@@ -74,9 +75,9 @@ export default class SettingsService extends Service {
|
||||
) => {
|
||||
try {
|
||||
console.log(ctx.params);
|
||||
const { query, pagination } = ctx.params;
|
||||
const { query, pagination, type } = ctx.params;
|
||||
let eSQuery = {};
|
||||
switch (ctx.params.type) {
|
||||
switch (type) {
|
||||
case "all":
|
||||
Object.assign(eSQuery, {
|
||||
match_all: {},
|
||||
@@ -99,12 +100,19 @@ export default class SettingsService extends Service {
|
||||
case "wanted":
|
||||
Object.assign(eSQuery, {
|
||||
bool: {
|
||||
must: {
|
||||
term: {
|
||||
"acquisition.source.wanted":
|
||||
true,
|
||||
should: [
|
||||
{
|
||||
exists: {
|
||||
field: "wanted.issues",
|
||||
},
|
||||
},
|
||||
},
|
||||
{
|
||||
exists: {
|
||||
field: "wanted.volume",
|
||||
},
|
||||
},
|
||||
],
|
||||
minimum_should_match: 1,
|
||||
},
|
||||
});
|
||||
break;
|
||||
|
||||
@@ -8,7 +8,7 @@ import {
|
||||
} from "moleculer";
|
||||
import { DbMixin } from "../mixins/db.mixin";
|
||||
import Settings from "../models/settings.model";
|
||||
import { isEmpty, pickBy, identity, map } from "lodash";
|
||||
import { isEmpty, pickBy, identity, map, isNil } from "lodash";
|
||||
const ObjectId = require("mongoose").Types.ObjectId;
|
||||
|
||||
export default class SettingsService extends Service {
|
||||
@@ -28,12 +28,31 @@ export default class SettingsService extends Service {
|
||||
rest: "GET /getAllSettings",
|
||||
params: {},
|
||||
async handler(ctx: Context<{ settingsKey: string }>) {
|
||||
const settings = await Settings.find({});
|
||||
if (isEmpty(settings)) {
|
||||
const { settingsKey } = ctx.params;
|
||||
|
||||
// Initialize a projection object. Include everything by default.
|
||||
let projection = settingsKey
|
||||
? { _id: 0, [settingsKey]: 1 }
|
||||
: {};
|
||||
|
||||
// Find the settings with the dynamic projection
|
||||
const settings = await Settings.find({}, projection);
|
||||
|
||||
if (settings.length === 0) {
|
||||
return {};
|
||||
}
|
||||
console.log(settings[0]);
|
||||
return settings[0];
|
||||
|
||||
// If settingsKey is provided, return the specific part of the settings.
|
||||
// Otherwise, return the entire settings document.
|
||||
if (settingsKey) {
|
||||
// Check if the specific key exists in the settings document.
|
||||
// Since `settings` is an array, we access the first element.
|
||||
// Then, we use the settingsKey to return only that part of the document.
|
||||
return settings[0][settingsKey] || {};
|
||||
} else {
|
||||
// Return the entire settings document
|
||||
return settings[0];
|
||||
}
|
||||
},
|
||||
},
|
||||
|
||||
@@ -42,44 +61,107 @@ export default class SettingsService extends Service {
|
||||
params: {},
|
||||
async handler(
|
||||
ctx: Context<{
|
||||
settingsPayload: {
|
||||
host: object;
|
||||
airDCPPUserSettings: object;
|
||||
hubs: [];
|
||||
settingsPayload?: {
|
||||
protocol: string;
|
||||
hostname: string;
|
||||
port: string;
|
||||
username: string;
|
||||
password: string;
|
||||
_id?: string;
|
||||
airDCPPUserSettings?: object;
|
||||
hubs?: [];
|
||||
};
|
||||
settingsObjectId: string;
|
||||
settingsObjectId?: string;
|
||||
settingsKey: string;
|
||||
}>
|
||||
) {
|
||||
console.log("varan bhat", ctx.params);
|
||||
const { host, airDCPPUserSettings, hubs } =
|
||||
ctx.params.settingsPayload;
|
||||
let query = {
|
||||
host,
|
||||
airDCPPUserSettings,
|
||||
hubs,
|
||||
};
|
||||
const keysToUpdate = pickBy(query, identity);
|
||||
let updateQuery = {};
|
||||
try {
|
||||
console.log(ctx.params);
|
||||
let query = {};
|
||||
const { settingsKey, settingsObjectId } =
|
||||
ctx.params;
|
||||
const {
|
||||
hostname,
|
||||
protocol,
|
||||
port,
|
||||
username,
|
||||
password,
|
||||
} = ctx.params.settingsPayload;
|
||||
const host = {
|
||||
hostname,
|
||||
protocol,
|
||||
port,
|
||||
username,
|
||||
password,
|
||||
};
|
||||
const undefinedPropsInHostname = Object.values(
|
||||
host
|
||||
).filter((value) => value === undefined);
|
||||
|
||||
map(Object.keys(keysToUpdate), (key) => {
|
||||
updateQuery[`directConnect.client.${key}`] =
|
||||
query[key];
|
||||
});
|
||||
const options = {
|
||||
upsert: true,
|
||||
new: true,
|
||||
setDefaultsOnInsert: true,
|
||||
};
|
||||
const filter = {
|
||||
_id: new ObjectId(ctx.params.settingsObjectId),
|
||||
};
|
||||
const result = Settings.findOneAndUpdate(
|
||||
filter,
|
||||
{ $set: updateQuery },
|
||||
options
|
||||
);
|
||||
// Update, depending what key was passed in params
|
||||
// 1. Construct the update query
|
||||
switch (settingsKey) {
|
||||
case "bittorrent":
|
||||
console.log(
|
||||
`Recieved settings for ${settingsKey}, building query...`
|
||||
);
|
||||
query = {
|
||||
...(undefinedPropsInHostname.length ===
|
||||
0 && {
|
||||
$set: {
|
||||
"bittorrent.client.host": host,
|
||||
},
|
||||
}),
|
||||
};
|
||||
break;
|
||||
case "directConnect":
|
||||
console.log(
|
||||
`Recieved settings for ${settingsKey}, building query...`
|
||||
);
|
||||
const { hubs, airDCPPUserSettings } =
|
||||
ctx.params.settingsPayload;
|
||||
query = {
|
||||
...(undefinedPropsInHostname.length ===
|
||||
0 && {
|
||||
$set: {
|
||||
"directConnect.client.host":
|
||||
host,
|
||||
},
|
||||
}),
|
||||
...(!isNil(hubs) && {
|
||||
$set: {
|
||||
"directConnect.client.hubs":
|
||||
hubs,
|
||||
},
|
||||
}),
|
||||
};
|
||||
console.log(JSON.stringify(query, null, 4));
|
||||
break;
|
||||
|
||||
return result;
|
||||
default:
|
||||
return false;
|
||||
}
|
||||
|
||||
// 2. Set up options, filters
|
||||
const options = {
|
||||
upsert: true,
|
||||
setDefaultsOnInsert: true,
|
||||
returnDocument: "after",
|
||||
};
|
||||
const filter = settingsObjectId
|
||||
? { _id: settingsObjectId }
|
||||
: {};
|
||||
|
||||
// 3. Execute the mongo query
|
||||
const result = await Settings.findOneAndUpdate(
|
||||
filter,
|
||||
query,
|
||||
options
|
||||
);
|
||||
return result;
|
||||
} catch (err) {
|
||||
return err;
|
||||
}
|
||||
},
|
||||
},
|
||||
deleteSettings: {
|
||||
|
||||
@@ -1,16 +1,14 @@
|
||||
"use strict";
|
||||
import { Service, ServiceBroker, ServiceSchema } from "moleculer";
|
||||
import { createClient } from "redis";
|
||||
import { Service, ServiceBroker, ServiceSchema, Context } from "moleculer";
|
||||
import { JobType } from "moleculer-bullmq";
|
||||
import { createAdapter } from "@socket.io/redis-adapter";
|
||||
import Session from "../models/session.model";
|
||||
import { pubClient, subClient } from "../config/redis.config";
|
||||
const { MoleculerError } = require("moleculer").Errors;
|
||||
const SocketIOService = require("moleculer-io");
|
||||
const redisURL = new URL(process.env.REDIS_URI);
|
||||
// console.log(redisURL.hostname);
|
||||
const { v4: uuidv4 } = require("uuid");
|
||||
import AirDCPPSocket from "../shared/airdcpp.socket";
|
||||
|
||||
const pubClient = createClient({ url: `redis://${redisURL.hostname}:6379` });
|
||||
(async () => {
|
||||
await pubClient.connect();
|
||||
})();
|
||||
const subClient = pubClient.duplicate();
|
||||
export default class SocketService extends Service {
|
||||
// @ts-ignore
|
||||
public constructor(
|
||||
@@ -25,48 +23,12 @@ export default class SocketService extends Service {
|
||||
port: process.env.PORT || 3001,
|
||||
io: {
|
||||
namespaces: {
|
||||
"/": {
|
||||
"/automated": {
|
||||
events: {
|
||||
call: {
|
||||
// whitelist: ["math.*", "say.*", "accounts.*", "rooms.*", "io.*"],
|
||||
},
|
||||
action: async (data, ack) => {
|
||||
// write your handler function here.
|
||||
switch (data.type) {
|
||||
case "LS_IMPORT":
|
||||
console.log(
|
||||
`Recieved ${data.type} event.`
|
||||
);
|
||||
// 1. Send task to queue
|
||||
await this.broker.call(
|
||||
"library.newImport",
|
||||
data.data,
|
||||
{}
|
||||
);
|
||||
break;
|
||||
case "LS_TOGGLE_IMPORT_QUEUE":
|
||||
await this.broker.call(
|
||||
"importqueue.toggleImportQueue",
|
||||
data.data,
|
||||
{}
|
||||
);
|
||||
break;
|
||||
case "LS_SINGLE_IMPORT":
|
||||
console.info(
|
||||
"AirDC++ finished a download -> "
|
||||
);
|
||||
console.log(data);
|
||||
await this.broker.call(
|
||||
"library.importDownloadedComic",
|
||||
{ bundle: data },
|
||||
{}
|
||||
);
|
||||
break;
|
||||
// uncompress archive events
|
||||
case "COMICBOOK_EXTRACTION_SUCCESS":
|
||||
console.log(data);
|
||||
return data;
|
||||
}
|
||||
whitelist: [
|
||||
"socket.*", // Allow 'search' in the automated namespace
|
||||
],
|
||||
},
|
||||
},
|
||||
},
|
||||
@@ -77,12 +39,347 @@ export default class SocketService extends Service {
|
||||
},
|
||||
},
|
||||
hooks: {},
|
||||
actions: {},
|
||||
methods: {},
|
||||
actions: {
|
||||
resumeSession: async (ctx: Context<{ sessionId: string }>) => {
|
||||
const { sessionId } = ctx.params;
|
||||
console.log("Attempting to resume session...");
|
||||
try {
|
||||
const sessionRecord = await Session.find({
|
||||
sessionId,
|
||||
});
|
||||
// 1. Check for sessionId's existence, and a match
|
||||
if (
|
||||
sessionRecord.length !== 0 &&
|
||||
sessionRecord[0].sessionId === sessionId
|
||||
) {
|
||||
// 2. Find if the queue has active, paused or waiting jobs
|
||||
const jobs: JobType = await this.broker.call(
|
||||
"jobqueue.getJobCountsByType",
|
||||
{}
|
||||
);
|
||||
const { active, paused, waiting } = jobs;
|
||||
|
||||
if (active > 0 || paused > 0 || waiting > 0) {
|
||||
// 3. Get job counts
|
||||
const completedJobCount = await pubClient.get(
|
||||
"completedJobCount"
|
||||
);
|
||||
const failedJobCount = await pubClient.get(
|
||||
"failedJobCount"
|
||||
);
|
||||
|
||||
// 4. Send the counts to the active socket.io session
|
||||
await this.broker.call("socket.broadcast", {
|
||||
namespace: "/",
|
||||
event: "RESTORE_JOB_COUNTS_AFTER_SESSION_RESTORATION",
|
||||
args: [
|
||||
{
|
||||
completedJobCount,
|
||||
failedJobCount,
|
||||
queueStatus: "running",
|
||||
},
|
||||
],
|
||||
});
|
||||
}
|
||||
}
|
||||
} catch (err) {
|
||||
throw new MoleculerError(
|
||||
err,
|
||||
500,
|
||||
"SESSION_ID_NOT_FOUND",
|
||||
{
|
||||
data: sessionId,
|
||||
}
|
||||
);
|
||||
}
|
||||
},
|
||||
|
||||
setQueueStatus: async (
|
||||
ctx: Context<{
|
||||
queueAction: string;
|
||||
queueStatus: string;
|
||||
}>
|
||||
) => {
|
||||
const { queueAction } = ctx.params;
|
||||
await this.broker.call(
|
||||
"jobqueue.toggle",
|
||||
{ action: queueAction },
|
||||
{}
|
||||
);
|
||||
},
|
||||
importSingleIssue: async (ctx: Context<{}>) => {
|
||||
console.info("AirDC++ finished a download -> ");
|
||||
console.log(ctx.params);
|
||||
// await this.broker.call(
|
||||
// "library.importDownloadedComic",
|
||||
// { bundle: data },
|
||||
// {}
|
||||
// );
|
||||
},
|
||||
// AirDCPP Socket actions
|
||||
|
||||
search: {
|
||||
params: {
|
||||
query: "object",
|
||||
config: "object",
|
||||
namespace: "string",
|
||||
},
|
||||
async handler(ctx) {
|
||||
const { query, config, namespace } = ctx.params;
|
||||
const namespacedInstance = this.io.of(namespace || "/");
|
||||
const ADCPPSocket = new AirDCPPSocket(config);
|
||||
try {
|
||||
await ADCPPSocket.connect();
|
||||
const instance = await ADCPPSocket.post(
|
||||
"search",
|
||||
query
|
||||
);
|
||||
|
||||
// Send the instance to the client
|
||||
await namespacedInstance.emit("searchInitiated", {
|
||||
instance,
|
||||
});
|
||||
|
||||
// Setting up listeners
|
||||
await ADCPPSocket.addListener(
|
||||
`search`,
|
||||
`search_result_added`,
|
||||
(data) => {
|
||||
namespacedInstance.emit(
|
||||
"searchResultAdded",
|
||||
{
|
||||
groupedResult: data,
|
||||
instanceId: instance.id,
|
||||
}
|
||||
);
|
||||
},
|
||||
instance.id
|
||||
);
|
||||
|
||||
await ADCPPSocket.addListener(
|
||||
`search`,
|
||||
`search_result_updated`,
|
||||
(data) => {
|
||||
console.log({
|
||||
updatedResult: data,
|
||||
instanceId: instance.id,
|
||||
});
|
||||
namespacedInstance.emit(
|
||||
"searchResultUpdated",
|
||||
{
|
||||
updatedResult: data,
|
||||
instanceId: instance.id,
|
||||
}
|
||||
);
|
||||
},
|
||||
instance.id
|
||||
);
|
||||
|
||||
await ADCPPSocket.addListener(
|
||||
`search`,
|
||||
`search_hub_searches_sent`,
|
||||
async (searchInfo) => {
|
||||
await this.sleep(5000);
|
||||
const currentInstance =
|
||||
await ADCPPSocket.get(
|
||||
`search/${instance.id}`
|
||||
);
|
||||
console.log(
|
||||
JSON.stringify(currentInstance, null, 4)
|
||||
);
|
||||
// Send the instance to the client
|
||||
await namespacedInstance.emit(
|
||||
"searchesSent",
|
||||
{
|
||||
searchInfo,
|
||||
}
|
||||
);
|
||||
|
||||
if (currentInstance.result_count === 0) {
|
||||
console.log("No more search results.");
|
||||
namespacedInstance.emit(
|
||||
"searchComplete",
|
||||
{
|
||||
message:
|
||||
"No more search results.",
|
||||
currentInstance,
|
||||
}
|
||||
);
|
||||
}
|
||||
},
|
||||
instance.id
|
||||
);
|
||||
|
||||
// Perform the actual search
|
||||
await ADCPPSocket.post(
|
||||
`search/${instance.id}/hub_search`,
|
||||
query
|
||||
);
|
||||
} catch (error) {
|
||||
await namespacedInstance.emit(
|
||||
"searchError",
|
||||
error.message
|
||||
);
|
||||
throw new MoleculerError(
|
||||
"Search failed",
|
||||
500,
|
||||
"SEARCH_FAILED",
|
||||
{ error }
|
||||
);
|
||||
} finally {
|
||||
await ADCPPSocket.disconnect();
|
||||
}
|
||||
},
|
||||
},
|
||||
download: {
|
||||
// params: {
|
||||
// searchInstanceId: "string",
|
||||
// resultId: "string",
|
||||
// comicObjectId: "string",
|
||||
// name: "string",
|
||||
// size: "number",
|
||||
// type: "any", // Define more specific type if possible
|
||||
// config: "object",
|
||||
// },
|
||||
async handler(ctx) {
|
||||
console.log(ctx.params);
|
||||
const {
|
||||
searchInstanceId,
|
||||
resultId,
|
||||
config,
|
||||
comicObjectId,
|
||||
name,
|
||||
size,
|
||||
type,
|
||||
} = ctx.params;
|
||||
const ADCPPSocket = new AirDCPPSocket(config);
|
||||
try {
|
||||
await ADCPPSocket.connect();
|
||||
const downloadResult = await ADCPPSocket.post(
|
||||
`search/${searchInstanceId}/results/${resultId}/download`
|
||||
);
|
||||
|
||||
if (downloadResult && downloadResult.bundle_info) {
|
||||
// Assume bundle_info is part of the response and contains the necessary details
|
||||
const bundleDBImportResult = await ctx.call(
|
||||
"library.applyAirDCPPDownloadMetadata",
|
||||
{
|
||||
bundleId: downloadResult.bundle_info.id,
|
||||
comicObjectId,
|
||||
name,
|
||||
size,
|
||||
type,
|
||||
}
|
||||
);
|
||||
|
||||
this.logger.info(
|
||||
"Download and metadata update successful",
|
||||
bundleDBImportResult
|
||||
);
|
||||
this.broker.emit(
|
||||
"downloadCompleted",
|
||||
bundleDBImportResult
|
||||
);
|
||||
return bundleDBImportResult;
|
||||
} else {
|
||||
throw new Error(
|
||||
"Failed to download or missing download result information"
|
||||
);
|
||||
}
|
||||
} catch (error) {
|
||||
this.broker.emit("downloadError", error.message);
|
||||
throw new MoleculerError(
|
||||
"Download failed",
|
||||
500,
|
||||
"DOWNLOAD_FAILED",
|
||||
{ error }
|
||||
);
|
||||
} finally {
|
||||
// await ADCPPSocket.disconnect();
|
||||
}
|
||||
},
|
||||
},
|
||||
|
||||
listenBundleTick: {
|
||||
async handler(ctx) {
|
||||
const { config } = ctx.params;
|
||||
const ADCPPSocket = new AirDCPPSocket(config);
|
||||
|
||||
try {
|
||||
await ADCPPSocket.connect();
|
||||
console.log("Connected to AirDCPP successfully.");
|
||||
|
||||
ADCPPSocket.addListener(
|
||||
"queue",
|
||||
"queue_bundle_tick",
|
||||
(tickData) => {
|
||||
console.log(
|
||||
"Received tick data: ",
|
||||
tickData
|
||||
);
|
||||
this.io.emit("bundleTickUpdate", tickData);
|
||||
},
|
||||
null
|
||||
); // Assuming no specific ID is needed here
|
||||
} catch (error) {
|
||||
console.error(
|
||||
"Error connecting to AirDCPP or setting listener:",
|
||||
error
|
||||
);
|
||||
throw error;
|
||||
}
|
||||
},
|
||||
},
|
||||
},
|
||||
methods: {
|
||||
sleep: (ms: number): Promise<NodeJS.Timeout> => {
|
||||
return new Promise((resolve) => setTimeout(resolve, ms));
|
||||
},
|
||||
},
|
||||
async started() {
|
||||
this.io.on("connection", (data) =>
|
||||
console.log("socket.io server initialized.")
|
||||
);
|
||||
this.logger.info("Starting Socket Service...");
|
||||
this.logger.debug("pubClient:", pubClient);
|
||||
this.logger.debug("subClient:", subClient);
|
||||
if (!pubClient || !subClient) {
|
||||
this.logger.error("Redis clients are not initialized!");
|
||||
throw new Error("Redis clients are not initialized!");
|
||||
}
|
||||
|
||||
// Additional checks or logic if necessary
|
||||
if (pubClient.status !== "ready") {
|
||||
await pubClient.connect();
|
||||
}
|
||||
|
||||
if (subClient.status !== "ready") {
|
||||
await subClient.connect();
|
||||
}
|
||||
this.io.on("connection", async (socket) => {
|
||||
console.log(
|
||||
`socket.io server connected to client with session ID: ${socket.id}`
|
||||
);
|
||||
console.log("Looking up sessionId in Mongo...");
|
||||
const sessionIdExists = await Session.find({
|
||||
sessionId: socket.handshake.query.sessionId,
|
||||
});
|
||||
// 1. if sessionId isn't found in Mongo, create one and persist it
|
||||
if (sessionIdExists.length === 0) {
|
||||
console.log(
|
||||
`Socket Id ${socket.id} not found in Mongo, creating a new session...`
|
||||
);
|
||||
const sessionId = uuidv4();
|
||||
socket.sessionId = sessionId;
|
||||
console.log(`Saving session ${sessionId} to Mongo...`);
|
||||
await Session.create({
|
||||
sessionId,
|
||||
socketId: socket.id,
|
||||
});
|
||||
socket.emit("sessionInitialized", sessionId);
|
||||
}
|
||||
// 2. else, retrieve it from Mongo and "resume" the socket.io connection
|
||||
else {
|
||||
console.log(`Found socketId ${socket.id}, no-op.`);
|
||||
}
|
||||
});
|
||||
},
|
||||
});
|
||||
}
|
||||
|
||||
99
services/torrentjobs.service.ts
Normal file
99
services/torrentjobs.service.ts
Normal file
@@ -0,0 +1,99 @@
|
||||
"use strict";
|
||||
import {
|
||||
Context,
|
||||
Service,
|
||||
ServiceBroker,
|
||||
ServiceSchema,
|
||||
Errors,
|
||||
} from "moleculer";
|
||||
import { DbMixin } from "../mixins/db.mixin";
|
||||
import Comic from "../models/comic.model";
|
||||
import BullMqMixin from "moleculer-bullmq";
|
||||
import { pubClient } from "../config/redis.config";
|
||||
const { MoleculerError } = require("moleculer").Errors;
|
||||
|
||||
export default class ImageTransformation extends Service {
|
||||
// @ts-ignore
|
||||
public constructor(
|
||||
public broker: ServiceBroker,
|
||||
schema: ServiceSchema<{}> = { name: "torrentjobs" }
|
||||
) {
|
||||
super(broker);
|
||||
this.parseServiceSchema({
|
||||
name: "torrentjobs",
|
||||
mixins: [DbMixin("comics", Comic), BullMqMixin],
|
||||
settings: {
|
||||
bullmq: {
|
||||
client: pubClient,
|
||||
},
|
||||
},
|
||||
hooks: {},
|
||||
actions: {
|
||||
getTorrentData: {
|
||||
queue: true,
|
||||
rest: "GET /getTorrentData",
|
||||
handler: async (ctx: Context<{ trigger: string }>) => {
|
||||
const { trigger } = ctx.params;
|
||||
console.log(`Recieved ${trigger} as the trigger...`);
|
||||
|
||||
const jobOptions = {
|
||||
jobId: "retrieveTorrentData",
|
||||
name: "bossy",
|
||||
repeat: {
|
||||
every: 10000, // Repeat every 10000 ms
|
||||
limit: 100, // Limit to 100 repeats
|
||||
},
|
||||
};
|
||||
|
||||
const job = await this.localQueue(
|
||||
ctx,
|
||||
"fetchTorrentData",
|
||||
ctx.params,
|
||||
jobOptions
|
||||
);
|
||||
return job;
|
||||
},
|
||||
},
|
||||
fetchTorrentData: {
|
||||
rest: "GET /fetchTorrentData",
|
||||
handler: async (
|
||||
ctx: Context<{
|
||||
birdName: String;
|
||||
}>
|
||||
) => {
|
||||
const repeatableJob = await this.$resolve(
|
||||
"torrentjobs"
|
||||
).getRepeatableJobs();
|
||||
console.info(repeatableJob);
|
||||
console.info(
|
||||
`Scheduled job for fetching torrent data fired.`
|
||||
);
|
||||
// 1. query mongo for infohashes
|
||||
const infoHashes = await this.broker.call(
|
||||
"library.getInfoHashes",
|
||||
{}
|
||||
);
|
||||
// 2. query qbittorrent to see if they exist
|
||||
const torrents: any = await this.broker.call(
|
||||
"qbittorrent.getTorrentRealTimeStats",
|
||||
{ infoHashes }
|
||||
);
|
||||
// 4.
|
||||
await this.broker.call("socket.broadcast", {
|
||||
namespace: "/",
|
||||
event: "AS_TORRENT_DATA",
|
||||
args: [
|
||||
{
|
||||
torrents,
|
||||
},
|
||||
],
|
||||
});
|
||||
// 3. If they do, don't do anything
|
||||
// 4. If they don't purge them from mongo
|
||||
},
|
||||
},
|
||||
},
|
||||
methods: {},
|
||||
});
|
||||
}
|
||||
}
|
||||
73
shared/airdcpp.socket.ts
Normal file
73
shared/airdcpp.socket.ts
Normal file
@@ -0,0 +1,73 @@
|
||||
const WebSocket = require("ws");
|
||||
const { Socket } = require("airdcpp-apisocket");
|
||||
|
||||
class AirDCPPSocket {
|
||||
// Explicitly declare properties
|
||||
options; // Holds configuration options
|
||||
socketInstance; // Instance of the AirDCPP Socket
|
||||
|
||||
constructor(configuration: any) {
|
||||
let socketProtocol = configuration.protocol === "https" ? "wss" : "ws";
|
||||
this.options = {
|
||||
url: `${socketProtocol}://${configuration.hostname}/api/v1/`,
|
||||
autoReconnect: true,
|
||||
reconnectInterval: 5000, // milliseconds
|
||||
logLevel: "verbose",
|
||||
ignoredListenerEvents: [
|
||||
"transfer_statistics",
|
||||
"hash_statistics",
|
||||
"hub_counts_updated",
|
||||
],
|
||||
username: configuration.username,
|
||||
password: configuration.password,
|
||||
};
|
||||
// Initialize the socket instance using the configured options and WebSocket
|
||||
this.socketInstance = Socket(this.options, WebSocket);
|
||||
}
|
||||
|
||||
// Method to ensure the socket connection is established if required by the library or implementation logic
|
||||
async connect() {
|
||||
// Here we'll check if a connect method exists and call it
|
||||
if (
|
||||
this.socketInstance &&
|
||||
typeof this.socketInstance.connect === "function"
|
||||
) {
|
||||
const sessionInformation = await this.socketInstance.connect();
|
||||
return sessionInformation;
|
||||
}
|
||||
}
|
||||
|
||||
// Method to ensure the socket is disconnected properly if required by the library or implementation logic
|
||||
async disconnect() {
|
||||
// Similarly, check if a disconnect method exists and call it
|
||||
if (
|
||||
this.socketInstance &&
|
||||
typeof this.socketInstance.disconnect === "function"
|
||||
) {
|
||||
await this.socketInstance.disconnect();
|
||||
}
|
||||
}
|
||||
|
||||
// Method to post data to an endpoint
|
||||
async post(endpoint: any, data: any = {}) {
|
||||
// Call post on the socket instance, assuming post is a valid method of the socket instance
|
||||
return await this.socketInstance.post(endpoint, data);
|
||||
}
|
||||
async get(endpoint: any, data: any = {}) {
|
||||
// Call post on the socket instance, assuming post is a valid method of the socket instance
|
||||
return await this.socketInstance.get(endpoint, data);
|
||||
}
|
||||
|
||||
// Method to add listeners to the socket instance for handling real-time updates or events
|
||||
async addListener(event: any, handlerName: any, callback: any, id: any) {
|
||||
// Attach a listener to the socket instance
|
||||
return await this.socketInstance.addListener(
|
||||
event,
|
||||
handlerName,
|
||||
callback,
|
||||
id
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
export default AirDCPPSocket;
|
||||
@@ -21,7 +21,7 @@ const ALLOWED_IMAGE_FILE_FORMATS = [".jpg", ".jpeg", ".png"];
|
||||
// Tell FileMagic where to find the magic.mgc file
|
||||
FileMagic.magicFile = require.resolve("@npcz/magic/dist/magic.mgc");
|
||||
|
||||
// We can onlu use MAGIC_PRESERVE_ATIME on operating suystems that support
|
||||
// We can only use MAGIC_PRESERVE_ATIME on operating suystems that support
|
||||
// it and that includes OS X for example. It's a good practice as we don't
|
||||
// want to change the last access time because we are just checking the file
|
||||
// contents type
|
||||
@@ -108,6 +108,14 @@ export const isValidImageFileExtension = (fileName: string): boolean => {
|
||||
return includes(ALLOWED_IMAGE_FILE_FORMATS, path.extname(fileName));
|
||||
};
|
||||
|
||||
/**
|
||||
* This function constructs paths for a target extraction folder and an input file based on extraction
|
||||
* options and a walked folder.
|
||||
* @param {IExtractionOptions} extractionOptions - An object containing options for the extraction
|
||||
* process, such as the target extraction folder.
|
||||
* @param {IFolderData} walkedFolder - `walkedFolder` is an object that represents a folder that has
|
||||
* been walked through during a file extraction process. It contains the following properties:
|
||||
*/
|
||||
export const constructPaths = (
|
||||
extractionOptions: IExtractionOptions,
|
||||
walkedFolder: IFolderData
|
||||
@@ -142,7 +150,7 @@ export const getFileConstituents = (filePath: string) => {
|
||||
};
|
||||
|
||||
/**
|
||||
* Method that infers MIME type from a filepath
|
||||
* Method that infers MIME type from a filepath
|
||||
* @param {string} filePath
|
||||
* @returns {Promise} string
|
||||
*/
|
||||
@@ -155,6 +163,15 @@ export const getMimeType = async (filePath: string) => {
|
||||
});
|
||||
};
|
||||
|
||||
/**
|
||||
* This function creates a directory at a specified path using the fse.ensureDir method and throws an
|
||||
* error if it fails.
|
||||
* @param {any} options - The options parameter is an optional object that can be passed to the
|
||||
* fse.ensureDir method to configure its behavior. It can include properties such as mode, which sets
|
||||
* the permissions of the directory, and fs, which specifies the file system module to use.
|
||||
* @param {string} directoryPath - The `directoryPath` parameter is a string that represents the path
|
||||
* of the directory that needs to be created.
|
||||
*/
|
||||
export const createDirectory = async (options: any, directoryPath: string) => {
|
||||
try {
|
||||
await fse.ensureDir(directoryPath, options);
|
||||
|
||||
@@ -47,6 +47,7 @@ import {
|
||||
getMimeType,
|
||||
} from "../utils/file.utils";
|
||||
import { convertXMLToJSON } from "./xml.utils";
|
||||
const { MoleculerError } = require("moleculer").Errors;
|
||||
const fse = require("fs-extra");
|
||||
const Unrar = require("unrar");
|
||||
interface RarFile {
|
||||
@@ -73,7 +74,7 @@ const errors = [];
|
||||
*/
|
||||
export const extractComicInfoXMLFromRar = async (
|
||||
filePath: string,
|
||||
mimeType: string,
|
||||
mimeType: string
|
||||
): Promise<any> => {
|
||||
try {
|
||||
// Create the target directory
|
||||
@@ -209,7 +210,7 @@ export const extractComicInfoXMLFromRar = async (
|
||||
|
||||
export const extractComicInfoXMLFromZip = async (
|
||||
filePath: string,
|
||||
mimeType: string,
|
||||
mimeType: string
|
||||
): Promise<any> => {
|
||||
try {
|
||||
// Create the target directory
|
||||
@@ -254,7 +255,7 @@ export const extractComicInfoXMLFromZip = async (
|
||||
// Push the first file (cover) to our extraction target
|
||||
extractionTargets.push(files[0].name);
|
||||
filesToWriteToDisk.coverFile = path.basename(files[0].name);
|
||||
|
||||
|
||||
if (!isEmpty(comicInfoXMLFileObject)) {
|
||||
filesToWriteToDisk.comicInfoXML = comicInfoXMLFileObject[0].name;
|
||||
extractionTargets.push(filesToWriteToDisk.comicInfoXML);
|
||||
@@ -356,18 +357,26 @@ export const extractFromArchive = async (filePath: string) => {
|
||||
switch (mimeType) {
|
||||
case "application/x-7z-compressed; charset=binary":
|
||||
case "application/zip; charset=binary":
|
||||
const cbzResult = await extractComicInfoXMLFromZip(filePath, mimeType);
|
||||
const cbzResult = await extractComicInfoXMLFromZip(
|
||||
filePath,
|
||||
mimeType
|
||||
);
|
||||
return Object.assign({}, ...cbzResult);
|
||||
|
||||
case "application/x-rar; charset=binary":
|
||||
const cbrResult = await extractComicInfoXMLFromRar(filePath, mimeType);
|
||||
const cbrResult = await extractComicInfoXMLFromRar(
|
||||
filePath,
|
||||
mimeType
|
||||
);
|
||||
return Object.assign({}, ...cbrResult);
|
||||
|
||||
default:
|
||||
console.log(
|
||||
console.error(
|
||||
"Error inferring filetype for comicinfo.xml extraction."
|
||||
);
|
||||
break;
|
||||
throw new MoleculerError({}, 500, "FILETYPE_INFERENCE_ERROR", {
|
||||
data: { message: "Cannot infer filetype." },
|
||||
});
|
||||
}
|
||||
};
|
||||
|
||||
|
||||
Reference in New Issue
Block a user