From 1d48499c6491d295a5d6f8bb7e4b818a048f9960 Mon Sep 17 00:00:00 2001 From: Rishi Ghan Date: Thu, 24 Oct 2024 10:59:09 -0400 Subject: [PATCH] Revert "Merge branch 'master' into getbundles-fix" This reverts commit 30168844f32ec2aeb8696cea7ae87c27ac03bab1, reversing changes made to 2e60e2e3d53e4e562bf22270484700076247bd60. --- Dockerfile | 7 +- config/redis.config.ts | 36 ++---- docker-compose.env | 10 +- docker-compose.yml | 151 +++++++------------------ mixins/db.mixin.ts | 69 +++--------- moleculer.config.ts | 3 +- package-lock.json | 7 +- package.json | 11 +- scripts/start.sh | 26 ----- scripts/wait-for-it.sh | 190 -------------------------------- services/jobqueue.service.ts | 34 +++--- services/library.service.ts | 150 ++++++++++++------------- services/socket.service.ts | 17 +-- services/torrentjobs.service.ts | 3 +- 14 files changed, 168 insertions(+), 546 deletions(-) delete mode 100755 scripts/start.sh delete mode 100755 scripts/wait-for-it.sh diff --git a/Dockerfile b/Dockerfile index 5c202e8..4c2f202 100644 --- a/Dockerfile +++ b/Dockerfile @@ -42,8 +42,6 @@ RUN node -v && npm -v COPY package.json package-lock.json ./ COPY moleculer.config.ts ./ COPY tsconfig.json ./ -COPY scripts ./scripts -RUN chmod +x ./scripts/* # Install application dependencies RUN npm install @@ -52,8 +50,9 @@ RUN npm install -g typescript ts-node # Copy the rest of the application files COPY . . -# clean up -RUN npm prune +# Build and clean up +RUN npm run build \ + && npm prune # Expose the application's port EXPOSE 3000 diff --git a/config/redis.config.ts b/config/redis.config.ts index be50db6..3d6cf0b 100644 --- a/config/redis.config.ts +++ b/config/redis.config.ts @@ -1,30 +1,10 @@ -// Import the Redis library -import IORedis from "ioredis"; +import { createClient } from "redis"; +const redisURL = new URL(process.env.REDIS_URI); -// Environment variable for Redis URI -const redisURI = process.env.REDIS_URI || "redis://localhost:6379"; -console.log(`process.env.REDIS_URI is ${process.env.REDIS_URI}`); -// Creating the publisher client -const pubClient = new IORedis(redisURI); +const pubClient = createClient({ url: `redis://${redisURL.hostname}:6379` }); +(async () => { + await pubClient.connect(); +})(); +const subClient = pubClient.duplicate(); -// Creating the subscriber client -const subClient = new IORedis(redisURI); - -// Handle connection events for the publisher -pubClient.on("connect", () => { - console.log("Publisher client connected to Redis."); -}); -pubClient.on("error", (err) => { - console.error("Publisher client failed to connect to Redis:", err); -}); - -// Handle connection events for the subscriber -subClient.on("connect", () => { - console.log("Subscriber client connected to Redis."); -}); -subClient.on("error", (err) => { - console.error("Subscriber client failed to connect to Redis:", err); -}); - -// Export the clients for use in other parts of the application -export { pubClient, subClient }; +export { subClient, pubClient }; diff --git a/docker-compose.env b/docker-compose.env index 6c801ff..68dcb92 100644 --- a/docker-compose.env +++ b/docker-compose.env @@ -3,15 +3,7 @@ LOGGER=true LOGLEVEL=info SERVICEDIR=dist/services -VITE_UNDERLYING_HOST=localhost -COMICS_DIRECTORY=/Users/rishi/work/threetwo-core-service/comics -USERDATA_DIRECTORY=/Users/rishi/work/threetwo-core-service/userdata -REDIS_URI=redis://redis:6379 -KAFKA_BROKER=kafka1:9092 -ELASTICSEARCH_URI=http://elasticsearch:9200 -MONGO_URI=mongodb://db:27017/threetwo -UNRAR_BIN_PATH=/opt/homebrew/bin/unrar -SEVENZ_BINARY_PATH=/opt/homebrew/bin/7za +TRANSPORTER=nats://nats:4222 CACHER=Memory diff --git a/docker-compose.yml b/docker-compose.yml index d93a7a0..692eb3a 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -1,125 +1,58 @@ -x-userdata-volume: &userdata-volume - type: bind - source: ${USERDATA_DIRECTORY} - target: /userdata - -x-comics-volume: &comics-volume - type: bind - source: ${COMICS_DIRECTORY} - target: /comics +version: "3.3" services: - core-services: + + api: build: - # context: https://github.com/rishighan/threetwo-core-service.git - context: ./ - dockerfile: Dockerfile - image: frishi/threetwo-core-service - container_name: core-services - ports: - - "3000:3000" - - "3001:3001" - depends_on: - - db - - redis - - elasticsearch - - kafka1 - - zoo1 - environment: - name: core-services - SERVICES: api,library,imagetransformation,opds,search,settings,jobqueue,socket,torrentjobs + context: . + image: threetwo-library-service env_file: docker-compose.env - volumes: - - *comics-volume - - *userdata-volume - networks: - - proxy - - zoo1: - image: confluentinc/cp-zookeeper:7.3.2 - hostname: zoo1 - container_name: zoo1 - ports: - - "2181:2181" environment: - ZOOKEEPER_CLIENT_PORT: 2181 - ZOOKEEPER_SERVER_ID: 1 - ZOOKEEPER_SERVERS: zoo1:2888:3888 - networks: - - proxy - - kafka1: - image: confluentinc/cp-kafka:7.3.2 - hostname: kafka1 - container_name: kafka1 - ports: - - "9092:9092" - - "29092:29092" - - "9999:9999" - environment: - KAFKA_ADVERTISED_LISTENERS: INTERNAL://kafka1:19092,EXTERNAL://${DOCKER_HOST_IP:-127.0.0.1} :9092,DOCKER://host.docker.internal:29092 - KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: INTERNAL:PLAINTEXT,EXTERNAL:PLAINTEXT,DOCKER:PLAINTEXT - KAFKA_INTER_BROKER_LISTENER_NAME: INTERNAL - KAFKA_ZOOKEEPER_CONNECT: "zoo1:2181" - KAFKA_BROKER_ID: 1 - KAFKA_LOG4J_LOGGERS: "kafka.controller=INFO,kafka.producer.async.DefaultEventHandler=INFO,state. change.logger=INFO" - KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1 - KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1 - KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1 - KAFKA_JMX_PORT: 9999 - KAFKA_JMX_HOSTNAME: ${DOCKER_HOST_IP:-127.0.0.1} - KAFKA_AUTHORIZER_CLASS_NAME: kafka.security.authorizer.AclAuthorizer - KAFKA_ALLOW_EVERYONE_IF_NO_ACL_FOUND: "true" + SERVICES: api + PORT: 3000 depends_on: - - zoo1 + - nats + labels: + - "traefik.enable=true" + - "traefik.http.routers.api-gw.rule=PathPrefix(`/`)" + - "traefik.http.services.api-gw.loadbalancer.server.port=3000" networks: - - proxy + - internal - db: - image: "mongo:latest" - container_name: database + greeter: + build: + context: . + image: threetwo-library-service + env_file: docker-compose.env + environment: + SERVICES: greeter + depends_on: + - nats networks: - - proxy + - internal + + nats: + image: nats:2 + networks: + - internal + + traefik: + image: traefik:v2.1 + command: + - "--api.insecure=true" # Don't do that in production! + - "--providers.docker=true" + - "--providers.docker.exposedbydefault=false" ports: - - "27017:27017" + - 3000:80 + - 3001:8080 volumes: - - "mongodb_data:/bitnami/mongodb" - - redis: - image: "bitnami/redis:latest" - container_name: redis - hostname: redis - environment: - ALLOW_EMPTY_PASSWORD: "yes" + - /var/run/docker.sock:/var/run/docker.sock:ro networks: - - proxy - ports: - - "6379:6379" - - elasticsearch: - image: docker.elastic.co/elasticsearch/elasticsearch:7.16.2 - container_name: elasticsearch - environment: - - "discovery.type=single-node" - - "ES_JAVA_OPTS=-Xms512m -Xmx512m" - - "xpack.security.enabled=true" - - "xpack.security.authc.api_key.enabled=true" - - "ELASTIC_PASSWORD=password" - ulimits: - memlock: - soft: -1 - hard: -1 - ports: - - 9200:9200 - networks: - - proxy + - internal + - default networks: - proxy: - external: true + internal: volumes: - mongodb_data: - driver: local - elasticsearch: - driver: local + data: diff --git a/mixins/db.mixin.ts b/mixins/db.mixin.ts index c999dcf..95542c0 100644 --- a/mixins/db.mixin.ts +++ b/mixins/db.mixin.ts @@ -2,60 +2,21 @@ const path = require("path"); const mkdir = require("mkdirp").sync; const DbService = require("moleculer-db"); + export const DbMixin = (collection, model) => { - if (!process.env.MONGO_URI) { - console.log("MONGO_URI not provided, initializing local storage..."); - mkdir(path.resolve("./data")); - return { mixins: [DbService] }; // Handle case where no DB URI is provided + if (process.env.MONGO_URI) { + const MongooseAdapter = require("moleculer-db-adapter-mongoose"); + return { + mixins: [DbService], + adapter: new MongooseAdapter(process.env.MONGO_URI, { + user: process.env.MONGO_INITDB_ROOT_USERNAME, + pass: process.env.MONGO_INITDB_ROOT_PASSWORD, + keepAlive: true, + useUnifiedTopology: true, + family: 4, + }), + model, + }; } - - const MongooseAdapter = require("moleculer-db-adapter-mongoose"); - const adapter = new MongooseAdapter(process.env.MONGO_URI, { - user: process.env.MONGO_INITDB_ROOT_USERNAME, - pass: process.env.MONGO_INITDB_ROOT_PASSWORD, - keepAlive: true, - useNewUrlParser: true, - useUnifiedTopology: true, - }); - - const connectWithRetry = async ( - adapter, - maxRetries = 5, - interval = 5000 - ) => { - for (let retry = 0; retry < maxRetries; retry++) { - try { - await adapter.connect(); - console.log("MongoDB connected successfully!"); - return; - } catch (err) { - console.error("MongoDB connection error:", err); - console.log( - `Retrying MongoDB connection in ${ - interval / 1000 - } seconds...` - ); - await new Promise((resolve) => setTimeout(resolve, interval)); - } - } - console.error("Failed to connect to MongoDB after several attempts."); - }; - - return { - mixins: [DbService], - adapter, - model, - collection, - async started() { - await connectWithRetry(this.adapter); - }, - async stopped() { - try { - await this.adapter.disconnect(); - console.log("MongoDB disconnected"); - } catch (err) { - console.error("MongoDB disconnection error:", err); - } - }, - }; + mkdir(path.resolve("./data")); }; diff --git a/moleculer.config.ts b/moleculer.config.ts index d0a0b79..69e2ce2 100644 --- a/moleculer.config.ts +++ b/moleculer.config.ts @@ -5,7 +5,6 @@ import { MetricRegistry, ServiceBroker, } from "moleculer"; -const RedisTransporter = require("moleculer").Transporters.Redis; /** * Moleculer ServiceBroker configuration file @@ -91,7 +90,7 @@ const brokerConfig: BrokerOptions = { // More info: https://moleculer.services/docs/0.14/networking.html // Note: During the development, you don't need to define it because all services will be loaded locally. // In production you can set it via `TRANSPORTER=nats://localhost:4222` environment variable. - transporter: new RedisTransporter(process.env.REDIS_URI), + transporter: process.env.REDIS_URI || "redis://localhost:6379", // Define a cacher. // More info: https://moleculer.services/docs/0.14/caching.html diff --git a/package-lock.json b/package-lock.json index 6ef7258..662c092 100644 --- a/package-lock.json +++ b/package-lock.json @@ -38,16 +38,16 @@ "mkdirp": "^0.5.5", "moleculer-bullmq": "^3.0.0", "moleculer-db": "^0.8.23", - "moleculer-db-adapter-mongoose": "^0.9.4", + "moleculer-db-adapter-mongoose": "^0.9.2", "moleculer-io": "^2.2.0", - "moleculer-web": "^0.10.7", + "moleculer-web": "^0.10.5", "mongoosastic-ts": "^6.0.3", "mongoose": "^6.10.4", "mongoose-paginate-v2": "^1.3.18", "nats": "^1.3.2", "opds-extra": "^3.0.10", "p7zip-threetwo": "^1.0.4", - "redis": "^4.6.14", + "redis": "^4.6.5", "sanitize-filename-ts": "^1.0.2", "sharp": "^0.33.3", "threetwo-ui-typings": "^1.0.14", @@ -63,7 +63,6 @@ "eslint-plugin-import": "^2.20.2", "eslint-plugin-prefer-arrow": "^1.2.2", "install": "^0.13.0", - "ioredis": "^5.4.1", "jest": "^29.5.0", "jest-cli": "^29.5.0", "moleculer-repl": "^0.7.0", diff --git a/package.json b/package.json index e4de15a..cb57c85 100644 --- a/package.json +++ b/package.json @@ -4,8 +4,8 @@ "description": "Endpoints for common operations in ThreeTwo", "scripts": { "build": "tsc --build tsconfig.json", - "dev": "./scripts/start.sh dev", - "start": "npm run build && ./scripts/start.sh prod", + "dev": "ts-node ./node_modules/moleculer/bin/moleculer-runner.js --hot --repl --config moleculer.config.ts services/**/*.service.ts", + "start": "moleculer-runner --config dist/moleculer.config.js", "cli": "moleculer connect NATS", "ci": "jest --watch", "test": "jest --coverage", @@ -27,7 +27,6 @@ "eslint-plugin-import": "^2.20.2", "eslint-plugin-prefer-arrow": "^1.2.2", "install": "^0.13.0", - "ioredis": "^5.4.1", "jest": "^29.5.0", "jest-cli": "^29.5.0", "moleculer-repl": "^0.7.0", @@ -69,16 +68,16 @@ "mkdirp": "^0.5.5", "moleculer-bullmq": "^3.0.0", "moleculer-db": "^0.8.23", - "moleculer-db-adapter-mongoose": "^0.9.4", + "moleculer-db-adapter-mongoose": "^0.9.2", "moleculer-io": "^2.2.0", - "moleculer-web": "^0.10.7", + "moleculer-web": "^0.10.5", "mongoosastic-ts": "^6.0.3", "mongoose": "^6.10.4", "mongoose-paginate-v2": "^1.3.18", "nats": "^1.3.2", "opds-extra": "^3.0.10", "p7zip-threetwo": "^1.0.4", - "redis": "^4.6.14", + "redis": "^4.6.5", "sanitize-filename-ts": "^1.0.2", "sharp": "^0.33.3", "threetwo-ui-typings": "^1.0.14", diff --git a/scripts/start.sh b/scripts/start.sh deleted file mode 100755 index f35cb9b..0000000 --- a/scripts/start.sh +++ /dev/null @@ -1,26 +0,0 @@ -#!/bin/bash -echo "Starting script with mode: $MODE" - -# Extract the host and port from MONGO_URI -HOST_PORT=$(echo $MONGO_URI | sed -e 's/mongodb:\/\///' -e 's/\/.*$//') - -# Assuming the script is called from the project root -PROJECT_ROOT=$(pwd) -echo "Project root: $PROJECT_ROOT" - -CONFIG_PATH="$PROJECT_ROOT/moleculer.config.ts" -echo "Configuration path: $CONFIG_PATH" - -# Set the correct path for moleculer-runner based on the mode -if [ "$MODE" == "dev" ]; then - # For development: use ts-node - MOLECULER_RUNNER="ts-node $PROJECT_ROOT/node_modules/moleculer/bin/moleculer-runner.js --hot --repl --config $CONFIG_PATH $PROJECT_ROOT/services/**/*.service.ts" - echo "Moleculer Runner for dev: $MOLECULER_RUNNER" -else - # For production: direct node execution of the compiled JavaScript - MOLECULER_RUNNER="moleculer-runner --config $PROJECT_ROOT/dist/moleculer.config.js $PROJECT_ROOT/dist/services/**/*.service.js" - echo "Moleculer Runner for prod: $MOLECULER_RUNNER" -fi - -# Run wait-for-it, then start the application -./scripts/wait-for-it.sh $HOST_PORT -- $MOLECULER_RUNNER diff --git a/scripts/wait-for-it.sh b/scripts/wait-for-it.sh deleted file mode 100755 index c092856..0000000 --- a/scripts/wait-for-it.sh +++ /dev/null @@ -1,190 +0,0 @@ -#!/usr/bin/env bash -# Use this script to test if a given TCP host/port are available - -WAITFORIT_cmdname=${0##*/} -if [[ $OSTYPE == 'darwin'* ]]; then - if ! command -v gtimeout &> /dev/null - then - echo "missing gtimeout (`brew install coreutils`)" - exit - fi - alias timeout=gtimeout -fi - -echoerr() { if [[ $WAITFORIT_QUIET -ne 1 ]]; then echo "$@" 1>&2; fi } - -usage() -{ - cat << USAGE >&2 -Usage: - $WAITFORIT_cmdname host:port [-s] [-t timeout] [-- command args] - -h HOST | --host=HOST Host or IP under test - -p PORT | --port=PORT TCP port under test - Alternatively, you specify the host and port as host:port - -s | --strict Only execute subcommand if the test succeeds - -q | --quiet Don't output any status messages - -t TIMEOUT | --timeout=TIMEOUT - Timeout in seconds, zero for no timeout - -- COMMAND ARGS Execute command with args after the test finishes -USAGE - exit 1 -} - -wait_for() -{ - if [[ $WAITFORIT_TIMEOUT -gt 0 ]]; then - echoerr "$WAITFORIT_cmdname: waiting $WAITFORIT_TIMEOUT seconds for $WAITFORIT_HOST:$WAITFORIT_PORT" - else - echoerr "$WAITFORIT_cmdname: waiting for $WAITFORIT_HOST:$WAITFORIT_PORT without a timeout" - fi - WAITFORIT_start_ts=$(date +%s) - while : - do - if [[ $WAITFORIT_ISBUSY -eq 1 ]]; then - nc -z $WAITFORIT_HOST $WAITFORIT_PORT - WAITFORIT_result=$? - else - (echo -n > /dev/tcp/$WAITFORIT_HOST/$WAITFORIT_PORT) >/dev/null 2>&1 - WAITFORIT_result=$? - fi - if [[ $WAITFORIT_result -eq 0 ]]; then - WAITFORIT_end_ts=$(date +%s) - echoerr "$WAITFORIT_cmdname: $WAITFORIT_HOST:$WAITFORIT_PORT is available after $((WAITFORIT_end_ts - WAITFORIT_start_ts)) seconds" - break - fi - sleep 1 - done - return $WAITFORIT_result -} - -wait_for_wrapper() -{ - # In order to support SIGINT during timeout: http://unix.stackexchange.com/a/57692 - if [[ $WAITFORIT_QUIET -eq 1 ]]; then - timeout $WAITFORIT_BUSYTIMEFLAG $WAITFORIT_TIMEOUT $0 --quiet --child --host=$WAITFORIT_HOST --port=$WAITFORIT_PORT --timeout=$WAITFORIT_TIMEOUT & - else - timeout $WAITFORIT_BUSYTIMEFLAG $WAITFORIT_TIMEOUT $0 --child --host=$WAITFORIT_HOST --port=$WAITFORIT_PORT --timeout=$WAITFORIT_TIMEOUT & - fi - WAITFORIT_PID=$! - trap "kill -INT -$WAITFORIT_PID" INT - wait $WAITFORIT_PID - WAITFORIT_RESULT=$? - if [[ $WAITFORIT_RESULT -ne 0 ]]; then - echoerr "$WAITFORIT_cmdname: timeout occurred after waiting $WAITFORIT_TIMEOUT seconds for $WAITFORIT_HOST:$WAITFORIT_PORT" - fi - return $WAITFORIT_RESULT -} - -# process arguments -while [[ $# -gt 0 ]] -do - case "$1" in - *:* ) - WAITFORIT_hostport=(${1//:/ }) - WAITFORIT_HOST=${WAITFORIT_hostport[0]} - WAITFORIT_PORT=${WAITFORIT_hostport[1]} - shift 1 - ;; - --child) - WAITFORIT_CHILD=1 - shift 1 - ;; - -q | --quiet) - WAITFORIT_QUIET=1 - shift 1 - ;; - -s | --strict) - WAITFORIT_STRICT=1 - shift 1 - ;; - -h) - WAITFORIT_HOST="$2" - if [[ $WAITFORIT_HOST == "" ]]; then break; fi - shift 2 - ;; - --host=*) - WAITFORIT_HOST="${1#*=}" - shift 1 - ;; - -p) - WAITFORIT_PORT="$2" - if [[ $WAITFORIT_PORT == "" ]]; then break; fi - shift 2 - ;; - --port=*) - WAITFORIT_PORT="${1#*=}" - shift 1 - ;; - -t) - WAITFORIT_TIMEOUT="$2" - if [[ $WAITFORIT_TIMEOUT == "" ]]; then break; fi - shift 2 - ;; - --timeout=*) - WAITFORIT_TIMEOUT="${1#*=}" - shift 1 - ;; - --) - shift - WAITFORIT_CLI=("$@") - break - ;; - --help) - usage - ;; - *) - echoerr "Unknown argument: $1" - usage - ;; - esac -done - -if [[ "$WAITFORIT_HOST" == "" || "$WAITFORIT_PORT" == "" ]]; then - echoerr "Error: you need to provide a host and port to test." - usage -fi - -WAITFORIT_TIMEOUT=${WAITFORIT_TIMEOUT:-15} -WAITFORIT_STRICT=${WAITFORIT_STRICT:-0} -WAITFORIT_CHILD=${WAITFORIT_CHILD:-0} -WAITFORIT_QUIET=${WAITFORIT_QUIET:-0} - -# Check to see if timeout is from busybox? -WAITFORIT_TIMEOUT_PATH=$(type -p timeout) -WAITFORIT_TIMEOUT_PATH=$(realpath $WAITFORIT_TIMEOUT_PATH 2>/dev/null || readlink -f $WAITFORIT_TIMEOUT_PATH) - -WAITFORIT_BUSYTIMEFLAG="" -if [[ $WAITFORIT_TIMEOUT_PATH =~ "busybox" ]]; then - WAITFORIT_ISBUSY=1 - # Check if busybox timeout uses -t flag - # (recent Alpine versions don't support -t anymore) - if timeout &>/dev/stdout | grep -q -e '-t '; then - WAITFORIT_BUSYTIMEFLAG="-t" - fi -else - WAITFORIT_ISBUSY=0 -fi - -if [[ $WAITFORIT_CHILD -gt 0 ]]; then - wait_for - WAITFORIT_RESULT=$? - exit $WAITFORIT_RESULT -else - if [[ $WAITFORIT_TIMEOUT -gt 0 ]]; then - wait_for_wrapper - WAITFORIT_RESULT=$? - else - wait_for - WAITFORIT_RESULT=$? - fi -fi - -if [[ $WAITFORIT_CLI != "" ]]; then - if [[ $WAITFORIT_RESULT -ne 0 && $WAITFORIT_STRICT -eq 1 ]]; then - echoerr "$WAITFORIT_cmdname: strict mode, refusing to execute subprocess" - exit $WAITFORIT_RESULT - fi - exec "${WAITFORIT_CLI[@]}" -else - exit $WAITFORIT_RESULT -fi diff --git a/services/jobqueue.service.ts b/services/jobqueue.service.ts index d644563..830e18c 100644 --- a/services/jobqueue.service.ts +++ b/services/jobqueue.service.ts @@ -14,6 +14,7 @@ import { pubClient } from "../config/redis.config"; import path from "path"; const { MoleculerError } = require("moleculer").Errors; +console.log(process.env.REDIS_URI); export default class JobQueueService extends Service { public constructor(public broker: ServiceBroker) { super(broker); @@ -21,10 +22,9 @@ export default class JobQueueService extends Service { name: "jobqueue", hooks: {}, mixins: [DbMixin("comics", Comic), BullMqMixin], - settings: { bullmq: { - client: pubClient, + client: process.env.REDIS_URI, }, }, actions: { @@ -57,24 +57,20 @@ export default class JobQueueService extends Service { handler: async ( ctx: Context<{ action: string; description: string }> ) => { - try { - const { action, description } = ctx.params; - // Enqueue the job - const job = await this.localQueue( - ctx, - action, - {}, - { - priority: 10, - } - ); - console.log(`Job ${job.id} enqueued`); - console.log(`${description}`); + const { action, description } = ctx.params; + // Enqueue the job + const job = await this.localQueue( + ctx, + action, + ctx.params, + { + priority: 10, + } + ); + console.log(`Job ${job.id} enqueued`); + console.log(`${description}`); - return job.id; - } catch (error) { - console.error("Failed to enqueue job:", error); - } + return job.id; }, }, diff --git a/services/library.service.ts b/services/library.service.ts index efeea48..ecf6c44 100644 --- a/services/library.service.ts +++ b/services/library.service.ts @@ -59,11 +59,9 @@ import path from "path"; import { COMICS_DIRECTORY, USERDATA_DIRECTORY } from "../constants/directories"; import AirDCPPSocket from "../shared/airdcpp.socket"; -export default class LibraryService extends Service { - public constructor( - public broker: ServiceBroker, - schema: ServiceSchema<{}> = { name: "library" } - ) { +console.log(`MONGO -> ${process.env.MONGO_URI}`); +export default class ImportService extends Service { + public constructor(public broker: ServiceBroker) { super(broker); this.parseServiceSchema({ name: "library", @@ -166,52 +164,78 @@ export default class LibraryService extends Service { }, newImport: { rest: "POST /newImport", - async handler(ctx) { - const { sessionId } = ctx.params; + // params: {}, + async handler( + ctx: Context<{ + extractionOptions?: any; + sessionId: string; + }> + ) { try { - // Initialize Redis counters once at the start of the import - await pubClient.set("completedJobCount", 0); - await pubClient.set("failedJobCount", 0); - - // Convert klaw to use a promise-based approach for better flow control - const files = await this.getComicFiles( - COMICS_DIRECTORY - ); - for (const file of files) { - console.info( - "Found a file at path:", - file.path - ); - const comicExists = await Comic.exists({ - "rawFileDetails.name": path.basename( - file.path, - path.extname(file.path) - ), - }); - - if (!comicExists) { - // Send the extraction job to the queue - await this.broker.call("jobqueue.enqueue", { - fileObject: { - filePath: file.path, - fileSize: file.stats.size, - }, - sessionId, - importType: "new", - action: "enqueue.async", - }); - } else { - console.log( - "Comic already exists in the library." + // Get params to be passed to the import jobs + const { sessionId } = ctx.params; + // 1. Walk the Source folder + klaw(path.resolve(COMICS_DIRECTORY)) + // 1.1 Filter on .cb* extensions + .pipe( + through2.obj(function (item, enc, next) { + let fileExtension = path.extname( + item.path + ); + if ( + [".cbz", ".cbr", ".cb7"].includes( + fileExtension + ) + ) { + this.push(item); + } + next(); + }) + ) + // 1.2 Pipe filtered results to the next step + // Enqueue the job in the queue + .on("data", async (item) => { + console.info( + "Found a file at path: %s", + item.path ); - } - } - console.log("All files traversed."); + let comicExists = await Comic.exists({ + "rawFileDetails.name": `${path.basename( + item.path, + path.extname(item.path) + )}`, + }); + if (!comicExists) { + // 2.1 Reset the job counters in Redis + await pubClient.set( + "completedJobCount", + 0 + ); + await pubClient.set( + "failedJobCount", + 0 + ); + // 2.2 Send the extraction job to the queue + this.broker.call("jobqueue.enqueue", { + fileObject: { + filePath: item.path, + fileSize: item.stats.size, + }, + sessionId, + importType: "new", + action: "enqueue.async", + }); + } else { + console.log( + "Comic already exists in the library." + ); + } + }) + .on("end", () => { + console.log("All files traversed."); + }); } catch (error) { - console.error( - "Error during newImport processing:", - error - ); + console.log(error); } }, }, @@ -840,35 +864,7 @@ export default class LibraryService extends Service { }, }, }, - methods: { - // Method to walk the directory and filter comic files - getComicFiles: (directory) => { - return new Promise((resolve, reject) => { - const files = []; - klaw(directory) - .pipe( - through2.obj(function (item, enc, next) { - const fileExtension = path.extname( - item.path - ); - if ( - [".cbz", ".cbr", ".cb7"].includes( - fileExtension - ) - ) { - this.push(item); - } - next(); - }) - ) - .on("data", (item) => { - files.push(item); - }) - .on("end", () => resolve(files)) - .on("error", (err) => reject(err)); - }); - }, - }, + methods: {}, }); } } diff --git a/services/socket.service.ts b/services/socket.service.ts index 84c4cfd..d7d704d 100644 --- a/services/socket.service.ts +++ b/services/socket.service.ts @@ -1,6 +1,7 @@ "use strict"; import { Service, ServiceBroker, ServiceSchema, Context } from "moleculer"; import { JobType } from "moleculer-bullmq"; +import { createClient } from "redis"; import { createAdapter } from "@socket.io/redis-adapter"; import Session from "../models/session.model"; import { pubClient, subClient } from "../config/redis.config"; @@ -273,22 +274,6 @@ export default class SocketService extends Service { }, }, async started() { - this.logger.info("Starting Socket Service..."); - this.logger.debug("pubClient:", pubClient); - this.logger.debug("subClient:", subClient); - if (!pubClient || !subClient) { - this.logger.error("Redis clients are not initialized!"); - throw new Error("Redis clients are not initialized!"); - } - - // Additional checks or logic if necessary - if (pubClient.status !== "ready") { - await pubClient.connect(); - } - - if (subClient.status !== "ready") { - await subClient.connect(); - } this.io.on("connection", async (socket) => { console.log( `socket.io server connected to client with session ID: ${socket.id}` diff --git a/services/torrentjobs.service.ts b/services/torrentjobs.service.ts index 737ed3f..c9cdf73 100644 --- a/services/torrentjobs.service.ts +++ b/services/torrentjobs.service.ts @@ -9,7 +9,6 @@ import { import { DbMixin } from "../mixins/db.mixin"; import Comic from "../models/comic.model"; import BullMqMixin from "moleculer-bullmq"; -import { pubClient } from "../config/redis.config"; const { MoleculerError } = require("moleculer").Errors; export default class ImageTransformation extends Service { @@ -24,7 +23,7 @@ export default class ImageTransformation extends Service { mixins: [DbMixin("comics", Comic), BullMqMixin], settings: { bullmq: { - client: pubClient, + client: process.env.REDIS_URI, }, }, hooks: {},