diff --git a/Dockerfile b/Dockerfile index 4c2f202..5c202e8 100644 --- a/Dockerfile +++ b/Dockerfile @@ -42,6 +42,8 @@ RUN node -v && npm -v COPY package.json package-lock.json ./ COPY moleculer.config.ts ./ COPY tsconfig.json ./ +COPY scripts ./scripts +RUN chmod +x ./scripts/* # Install application dependencies RUN npm install @@ -50,9 +52,8 @@ RUN npm install -g typescript ts-node # Copy the rest of the application files COPY . . -# Build and clean up -RUN npm run build \ - && npm prune +# clean up +RUN npm prune # Expose the application's port EXPOSE 3000 diff --git a/config/redis.config.ts b/config/redis.config.ts index 3d6cf0b..be50db6 100644 --- a/config/redis.config.ts +++ b/config/redis.config.ts @@ -1,10 +1,30 @@ -import { createClient } from "redis"; -const redisURL = new URL(process.env.REDIS_URI); +// Import the Redis library +import IORedis from "ioredis"; -const pubClient = createClient({ url: `redis://${redisURL.hostname}:6379` }); -(async () => { - await pubClient.connect(); -})(); -const subClient = pubClient.duplicate(); +// Environment variable for Redis URI +const redisURI = process.env.REDIS_URI || "redis://localhost:6379"; +console.log(`process.env.REDIS_URI is ${process.env.REDIS_URI}`); +// Creating the publisher client +const pubClient = new IORedis(redisURI); -export { subClient, pubClient }; +// Creating the subscriber client +const subClient = new IORedis(redisURI); + +// Handle connection events for the publisher +pubClient.on("connect", () => { + console.log("Publisher client connected to Redis."); +}); +pubClient.on("error", (err) => { + console.error("Publisher client failed to connect to Redis:", err); +}); + +// Handle connection events for the subscriber +subClient.on("connect", () => { + console.log("Subscriber client connected to Redis."); +}); +subClient.on("error", (err) => { + console.error("Subscriber client failed to connect to Redis:", err); +}); + +// Export the clients for use in other parts of the application +export { pubClient, subClient }; diff --git a/docker-compose.env b/docker-compose.env index 68dcb92..6c801ff 100644 --- a/docker-compose.env +++ b/docker-compose.env @@ -3,7 +3,15 @@ LOGGER=true LOGLEVEL=info SERVICEDIR=dist/services -TRANSPORTER=nats://nats:4222 +VITE_UNDERLYING_HOST=localhost +COMICS_DIRECTORY=/Users/rishi/work/threetwo-core-service/comics +USERDATA_DIRECTORY=/Users/rishi/work/threetwo-core-service/userdata +REDIS_URI=redis://redis:6379 +KAFKA_BROKER=kafka1:9092 +ELASTICSEARCH_URI=http://elasticsearch:9200 +MONGO_URI=mongodb://db:27017/threetwo +UNRAR_BIN_PATH=/opt/homebrew/bin/unrar +SEVENZ_BINARY_PATH=/opt/homebrew/bin/7za CACHER=Memory diff --git a/docker-compose.yml b/docker-compose.yml index 692eb3a..d93a7a0 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -1,58 +1,125 @@ -version: "3.3" +x-userdata-volume: &userdata-volume + type: bind + source: ${USERDATA_DIRECTORY} + target: /userdata + +x-comics-volume: &comics-volume + type: bind + source: ${COMICS_DIRECTORY} + target: /comics services: - - api: + core-services: build: - context: . - image: threetwo-library-service - env_file: docker-compose.env - environment: - SERVICES: api - PORT: 3000 - depends_on: - - nats - labels: - - "traefik.enable=true" - - "traefik.http.routers.api-gw.rule=PathPrefix(`/`)" - - "traefik.http.services.api-gw.loadbalancer.server.port=3000" - networks: - - internal - - greeter: - build: - context: . - image: threetwo-library-service - env_file: docker-compose.env - environment: - SERVICES: greeter - depends_on: - - nats - networks: - - internal - - nats: - image: nats:2 - networks: - - internal - - traefik: - image: traefik:v2.1 - command: - - "--api.insecure=true" # Don't do that in production! - - "--providers.docker=true" - - "--providers.docker.exposedbydefault=false" + # context: https://github.com/rishighan/threetwo-core-service.git + context: ./ + dockerfile: Dockerfile + image: frishi/threetwo-core-service + container_name: core-services ports: - - 3000:80 - - 3001:8080 + - "3000:3000" + - "3001:3001" + depends_on: + - db + - redis + - elasticsearch + - kafka1 + - zoo1 + environment: + name: core-services + SERVICES: api,library,imagetransformation,opds,search,settings,jobqueue,socket,torrentjobs + env_file: docker-compose.env volumes: - - /var/run/docker.sock:/var/run/docker.sock:ro + - *comics-volume + - *userdata-volume networks: - - internal - - default + - proxy + + zoo1: + image: confluentinc/cp-zookeeper:7.3.2 + hostname: zoo1 + container_name: zoo1 + ports: + - "2181:2181" + environment: + ZOOKEEPER_CLIENT_PORT: 2181 + ZOOKEEPER_SERVER_ID: 1 + ZOOKEEPER_SERVERS: zoo1:2888:3888 + networks: + - proxy + + kafka1: + image: confluentinc/cp-kafka:7.3.2 + hostname: kafka1 + container_name: kafka1 + ports: + - "9092:9092" + - "29092:29092" + - "9999:9999" + environment: + KAFKA_ADVERTISED_LISTENERS: INTERNAL://kafka1:19092,EXTERNAL://${DOCKER_HOST_IP:-127.0.0.1} :9092,DOCKER://host.docker.internal:29092 + KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: INTERNAL:PLAINTEXT,EXTERNAL:PLAINTEXT,DOCKER:PLAINTEXT + KAFKA_INTER_BROKER_LISTENER_NAME: INTERNAL + KAFKA_ZOOKEEPER_CONNECT: "zoo1:2181" + KAFKA_BROKER_ID: 1 + KAFKA_LOG4J_LOGGERS: "kafka.controller=INFO,kafka.producer.async.DefaultEventHandler=INFO,state. change.logger=INFO" + KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1 + KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1 + KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1 + KAFKA_JMX_PORT: 9999 + KAFKA_JMX_HOSTNAME: ${DOCKER_HOST_IP:-127.0.0.1} + KAFKA_AUTHORIZER_CLASS_NAME: kafka.security.authorizer.AclAuthorizer + KAFKA_ALLOW_EVERYONE_IF_NO_ACL_FOUND: "true" + depends_on: + - zoo1 + networks: + - proxy + + db: + image: "mongo:latest" + container_name: database + networks: + - proxy + ports: + - "27017:27017" + volumes: + - "mongodb_data:/bitnami/mongodb" + + redis: + image: "bitnami/redis:latest" + container_name: redis + hostname: redis + environment: + ALLOW_EMPTY_PASSWORD: "yes" + networks: + - proxy + ports: + - "6379:6379" + + elasticsearch: + image: docker.elastic.co/elasticsearch/elasticsearch:7.16.2 + container_name: elasticsearch + environment: + - "discovery.type=single-node" + - "ES_JAVA_OPTS=-Xms512m -Xmx512m" + - "xpack.security.enabled=true" + - "xpack.security.authc.api_key.enabled=true" + - "ELASTIC_PASSWORD=password" + ulimits: + memlock: + soft: -1 + hard: -1 + ports: + - 9200:9200 + networks: + - proxy networks: - internal: + proxy: + external: true volumes: - data: + mongodb_data: + driver: local + elasticsearch: + driver: local diff --git a/mixins/db.mixin.ts b/mixins/db.mixin.ts index 95542c0..c999dcf 100644 --- a/mixins/db.mixin.ts +++ b/mixins/db.mixin.ts @@ -2,21 +2,60 @@ const path = require("path"); const mkdir = require("mkdirp").sync; const DbService = require("moleculer-db"); - export const DbMixin = (collection, model) => { - if (process.env.MONGO_URI) { - const MongooseAdapter = require("moleculer-db-adapter-mongoose"); - return { - mixins: [DbService], - adapter: new MongooseAdapter(process.env.MONGO_URI, { - user: process.env.MONGO_INITDB_ROOT_USERNAME, - pass: process.env.MONGO_INITDB_ROOT_PASSWORD, - keepAlive: true, - useUnifiedTopology: true, - family: 4, - }), - model, - }; + if (!process.env.MONGO_URI) { + console.log("MONGO_URI not provided, initializing local storage..."); + mkdir(path.resolve("./data")); + return { mixins: [DbService] }; // Handle case where no DB URI is provided } - mkdir(path.resolve("./data")); + + const MongooseAdapter = require("moleculer-db-adapter-mongoose"); + const adapter = new MongooseAdapter(process.env.MONGO_URI, { + user: process.env.MONGO_INITDB_ROOT_USERNAME, + pass: process.env.MONGO_INITDB_ROOT_PASSWORD, + keepAlive: true, + useNewUrlParser: true, + useUnifiedTopology: true, + }); + + const connectWithRetry = async ( + adapter, + maxRetries = 5, + interval = 5000 + ) => { + for (let retry = 0; retry < maxRetries; retry++) { + try { + await adapter.connect(); + console.log("MongoDB connected successfully!"); + return; + } catch (err) { + console.error("MongoDB connection error:", err); + console.log( + `Retrying MongoDB connection in ${ + interval / 1000 + } seconds...` + ); + await new Promise((resolve) => setTimeout(resolve, interval)); + } + } + console.error("Failed to connect to MongoDB after several attempts."); + }; + + return { + mixins: [DbService], + adapter, + model, + collection, + async started() { + await connectWithRetry(this.adapter); + }, + async stopped() { + try { + await this.adapter.disconnect(); + console.log("MongoDB disconnected"); + } catch (err) { + console.error("MongoDB disconnection error:", err); + } + }, + }; }; diff --git a/moleculer.config.ts b/moleculer.config.ts index 69e2ce2..d0a0b79 100644 --- a/moleculer.config.ts +++ b/moleculer.config.ts @@ -5,6 +5,7 @@ import { MetricRegistry, ServiceBroker, } from "moleculer"; +const RedisTransporter = require("moleculer").Transporters.Redis; /** * Moleculer ServiceBroker configuration file @@ -90,7 +91,7 @@ const brokerConfig: BrokerOptions = { // More info: https://moleculer.services/docs/0.14/networking.html // Note: During the development, you don't need to define it because all services will be loaded locally. // In production you can set it via `TRANSPORTER=nats://localhost:4222` environment variable. - transporter: process.env.REDIS_URI || "redis://localhost:6379", + transporter: new RedisTransporter(process.env.REDIS_URI), // Define a cacher. // More info: https://moleculer.services/docs/0.14/caching.html diff --git a/package-lock.json b/package-lock.json index 662c092..6ef7258 100644 --- a/package-lock.json +++ b/package-lock.json @@ -38,16 +38,16 @@ "mkdirp": "^0.5.5", "moleculer-bullmq": "^3.0.0", "moleculer-db": "^0.8.23", - "moleculer-db-adapter-mongoose": "^0.9.2", + "moleculer-db-adapter-mongoose": "^0.9.4", "moleculer-io": "^2.2.0", - "moleculer-web": "^0.10.5", + "moleculer-web": "^0.10.7", "mongoosastic-ts": "^6.0.3", "mongoose": "^6.10.4", "mongoose-paginate-v2": "^1.3.18", "nats": "^1.3.2", "opds-extra": "^3.0.10", "p7zip-threetwo": "^1.0.4", - "redis": "^4.6.5", + "redis": "^4.6.14", "sanitize-filename-ts": "^1.0.2", "sharp": "^0.33.3", "threetwo-ui-typings": "^1.0.14", @@ -63,6 +63,7 @@ "eslint-plugin-import": "^2.20.2", "eslint-plugin-prefer-arrow": "^1.2.2", "install": "^0.13.0", + "ioredis": "^5.4.1", "jest": "^29.5.0", "jest-cli": "^29.5.0", "moleculer-repl": "^0.7.0", diff --git a/package.json b/package.json index cb57c85..e4de15a 100644 --- a/package.json +++ b/package.json @@ -4,8 +4,8 @@ "description": "Endpoints for common operations in ThreeTwo", "scripts": { "build": "tsc --build tsconfig.json", - "dev": "ts-node ./node_modules/moleculer/bin/moleculer-runner.js --hot --repl --config moleculer.config.ts services/**/*.service.ts", - "start": "moleculer-runner --config dist/moleculer.config.js", + "dev": "./scripts/start.sh dev", + "start": "npm run build && ./scripts/start.sh prod", "cli": "moleculer connect NATS", "ci": "jest --watch", "test": "jest --coverage", @@ -27,6 +27,7 @@ "eslint-plugin-import": "^2.20.2", "eslint-plugin-prefer-arrow": "^1.2.2", "install": "^0.13.0", + "ioredis": "^5.4.1", "jest": "^29.5.0", "jest-cli": "^29.5.0", "moleculer-repl": "^0.7.0", @@ -68,16 +69,16 @@ "mkdirp": "^0.5.5", "moleculer-bullmq": "^3.0.0", "moleculer-db": "^0.8.23", - "moleculer-db-adapter-mongoose": "^0.9.2", + "moleculer-db-adapter-mongoose": "^0.9.4", "moleculer-io": "^2.2.0", - "moleculer-web": "^0.10.5", + "moleculer-web": "^0.10.7", "mongoosastic-ts": "^6.0.3", "mongoose": "^6.10.4", "mongoose-paginate-v2": "^1.3.18", "nats": "^1.3.2", "opds-extra": "^3.0.10", "p7zip-threetwo": "^1.0.4", - "redis": "^4.6.5", + "redis": "^4.6.14", "sanitize-filename-ts": "^1.0.2", "sharp": "^0.33.3", "threetwo-ui-typings": "^1.0.14", diff --git a/scripts/start.sh b/scripts/start.sh new file mode 100755 index 0000000..f35cb9b --- /dev/null +++ b/scripts/start.sh @@ -0,0 +1,26 @@ +#!/bin/bash +echo "Starting script with mode: $MODE" + +# Extract the host and port from MONGO_URI +HOST_PORT=$(echo $MONGO_URI | sed -e 's/mongodb:\/\///' -e 's/\/.*$//') + +# Assuming the script is called from the project root +PROJECT_ROOT=$(pwd) +echo "Project root: $PROJECT_ROOT" + +CONFIG_PATH="$PROJECT_ROOT/moleculer.config.ts" +echo "Configuration path: $CONFIG_PATH" + +# Set the correct path for moleculer-runner based on the mode +if [ "$MODE" == "dev" ]; then + # For development: use ts-node + MOLECULER_RUNNER="ts-node $PROJECT_ROOT/node_modules/moleculer/bin/moleculer-runner.js --hot --repl --config $CONFIG_PATH $PROJECT_ROOT/services/**/*.service.ts" + echo "Moleculer Runner for dev: $MOLECULER_RUNNER" +else + # For production: direct node execution of the compiled JavaScript + MOLECULER_RUNNER="moleculer-runner --config $PROJECT_ROOT/dist/moleculer.config.js $PROJECT_ROOT/dist/services/**/*.service.js" + echo "Moleculer Runner for prod: $MOLECULER_RUNNER" +fi + +# Run wait-for-it, then start the application +./scripts/wait-for-it.sh $HOST_PORT -- $MOLECULER_RUNNER diff --git a/scripts/wait-for-it.sh b/scripts/wait-for-it.sh new file mode 100755 index 0000000..c092856 --- /dev/null +++ b/scripts/wait-for-it.sh @@ -0,0 +1,190 @@ +#!/usr/bin/env bash +# Use this script to test if a given TCP host/port are available + +WAITFORIT_cmdname=${0##*/} +if [[ $OSTYPE == 'darwin'* ]]; then + if ! command -v gtimeout &> /dev/null + then + echo "missing gtimeout (`brew install coreutils`)" + exit + fi + alias timeout=gtimeout +fi + +echoerr() { if [[ $WAITFORIT_QUIET -ne 1 ]]; then echo "$@" 1>&2; fi } + +usage() +{ + cat << USAGE >&2 +Usage: + $WAITFORIT_cmdname host:port [-s] [-t timeout] [-- command args] + -h HOST | --host=HOST Host or IP under test + -p PORT | --port=PORT TCP port under test + Alternatively, you specify the host and port as host:port + -s | --strict Only execute subcommand if the test succeeds + -q | --quiet Don't output any status messages + -t TIMEOUT | --timeout=TIMEOUT + Timeout in seconds, zero for no timeout + -- COMMAND ARGS Execute command with args after the test finishes +USAGE + exit 1 +} + +wait_for() +{ + if [[ $WAITFORIT_TIMEOUT -gt 0 ]]; then + echoerr "$WAITFORIT_cmdname: waiting $WAITFORIT_TIMEOUT seconds for $WAITFORIT_HOST:$WAITFORIT_PORT" + else + echoerr "$WAITFORIT_cmdname: waiting for $WAITFORIT_HOST:$WAITFORIT_PORT without a timeout" + fi + WAITFORIT_start_ts=$(date +%s) + while : + do + if [[ $WAITFORIT_ISBUSY -eq 1 ]]; then + nc -z $WAITFORIT_HOST $WAITFORIT_PORT + WAITFORIT_result=$? + else + (echo -n > /dev/tcp/$WAITFORIT_HOST/$WAITFORIT_PORT) >/dev/null 2>&1 + WAITFORIT_result=$? + fi + if [[ $WAITFORIT_result -eq 0 ]]; then + WAITFORIT_end_ts=$(date +%s) + echoerr "$WAITFORIT_cmdname: $WAITFORIT_HOST:$WAITFORIT_PORT is available after $((WAITFORIT_end_ts - WAITFORIT_start_ts)) seconds" + break + fi + sleep 1 + done + return $WAITFORIT_result +} + +wait_for_wrapper() +{ + # In order to support SIGINT during timeout: http://unix.stackexchange.com/a/57692 + if [[ $WAITFORIT_QUIET -eq 1 ]]; then + timeout $WAITFORIT_BUSYTIMEFLAG $WAITFORIT_TIMEOUT $0 --quiet --child --host=$WAITFORIT_HOST --port=$WAITFORIT_PORT --timeout=$WAITFORIT_TIMEOUT & + else + timeout $WAITFORIT_BUSYTIMEFLAG $WAITFORIT_TIMEOUT $0 --child --host=$WAITFORIT_HOST --port=$WAITFORIT_PORT --timeout=$WAITFORIT_TIMEOUT & + fi + WAITFORIT_PID=$! + trap "kill -INT -$WAITFORIT_PID" INT + wait $WAITFORIT_PID + WAITFORIT_RESULT=$? + if [[ $WAITFORIT_RESULT -ne 0 ]]; then + echoerr "$WAITFORIT_cmdname: timeout occurred after waiting $WAITFORIT_TIMEOUT seconds for $WAITFORIT_HOST:$WAITFORIT_PORT" + fi + return $WAITFORIT_RESULT +} + +# process arguments +while [[ $# -gt 0 ]] +do + case "$1" in + *:* ) + WAITFORIT_hostport=(${1//:/ }) + WAITFORIT_HOST=${WAITFORIT_hostport[0]} + WAITFORIT_PORT=${WAITFORIT_hostport[1]} + shift 1 + ;; + --child) + WAITFORIT_CHILD=1 + shift 1 + ;; + -q | --quiet) + WAITFORIT_QUIET=1 + shift 1 + ;; + -s | --strict) + WAITFORIT_STRICT=1 + shift 1 + ;; + -h) + WAITFORIT_HOST="$2" + if [[ $WAITFORIT_HOST == "" ]]; then break; fi + shift 2 + ;; + --host=*) + WAITFORIT_HOST="${1#*=}" + shift 1 + ;; + -p) + WAITFORIT_PORT="$2" + if [[ $WAITFORIT_PORT == "" ]]; then break; fi + shift 2 + ;; + --port=*) + WAITFORIT_PORT="${1#*=}" + shift 1 + ;; + -t) + WAITFORIT_TIMEOUT="$2" + if [[ $WAITFORIT_TIMEOUT == "" ]]; then break; fi + shift 2 + ;; + --timeout=*) + WAITFORIT_TIMEOUT="${1#*=}" + shift 1 + ;; + --) + shift + WAITFORIT_CLI=("$@") + break + ;; + --help) + usage + ;; + *) + echoerr "Unknown argument: $1" + usage + ;; + esac +done + +if [[ "$WAITFORIT_HOST" == "" || "$WAITFORIT_PORT" == "" ]]; then + echoerr "Error: you need to provide a host and port to test." + usage +fi + +WAITFORIT_TIMEOUT=${WAITFORIT_TIMEOUT:-15} +WAITFORIT_STRICT=${WAITFORIT_STRICT:-0} +WAITFORIT_CHILD=${WAITFORIT_CHILD:-0} +WAITFORIT_QUIET=${WAITFORIT_QUIET:-0} + +# Check to see if timeout is from busybox? +WAITFORIT_TIMEOUT_PATH=$(type -p timeout) +WAITFORIT_TIMEOUT_PATH=$(realpath $WAITFORIT_TIMEOUT_PATH 2>/dev/null || readlink -f $WAITFORIT_TIMEOUT_PATH) + +WAITFORIT_BUSYTIMEFLAG="" +if [[ $WAITFORIT_TIMEOUT_PATH =~ "busybox" ]]; then + WAITFORIT_ISBUSY=1 + # Check if busybox timeout uses -t flag + # (recent Alpine versions don't support -t anymore) + if timeout &>/dev/stdout | grep -q -e '-t '; then + WAITFORIT_BUSYTIMEFLAG="-t" + fi +else + WAITFORIT_ISBUSY=0 +fi + +if [[ $WAITFORIT_CHILD -gt 0 ]]; then + wait_for + WAITFORIT_RESULT=$? + exit $WAITFORIT_RESULT +else + if [[ $WAITFORIT_TIMEOUT -gt 0 ]]; then + wait_for_wrapper + WAITFORIT_RESULT=$? + else + wait_for + WAITFORIT_RESULT=$? + fi +fi + +if [[ $WAITFORIT_CLI != "" ]]; then + if [[ $WAITFORIT_RESULT -ne 0 && $WAITFORIT_STRICT -eq 1 ]]; then + echoerr "$WAITFORIT_cmdname: strict mode, refusing to execute subprocess" + exit $WAITFORIT_RESULT + fi + exec "${WAITFORIT_CLI[@]}" +else + exit $WAITFORIT_RESULT +fi diff --git a/services/jobqueue.service.ts b/services/jobqueue.service.ts index 830e18c..d644563 100644 --- a/services/jobqueue.service.ts +++ b/services/jobqueue.service.ts @@ -14,7 +14,6 @@ import { pubClient } from "../config/redis.config"; import path from "path"; const { MoleculerError } = require("moleculer").Errors; -console.log(process.env.REDIS_URI); export default class JobQueueService extends Service { public constructor(public broker: ServiceBroker) { super(broker); @@ -22,9 +21,10 @@ export default class JobQueueService extends Service { name: "jobqueue", hooks: {}, mixins: [DbMixin("comics", Comic), BullMqMixin], + settings: { bullmq: { - client: process.env.REDIS_URI, + client: pubClient, }, }, actions: { @@ -57,20 +57,24 @@ export default class JobQueueService extends Service { handler: async ( ctx: Context<{ action: string; description: string }> ) => { - const { action, description } = ctx.params; - // Enqueue the job - const job = await this.localQueue( - ctx, - action, - ctx.params, - { - priority: 10, - } - ); - console.log(`Job ${job.id} enqueued`); - console.log(`${description}`); + try { + const { action, description } = ctx.params; + // Enqueue the job + const job = await this.localQueue( + ctx, + action, + {}, + { + priority: 10, + } + ); + console.log(`Job ${job.id} enqueued`); + console.log(`${description}`); - return job.id; + return job.id; + } catch (error) { + console.error("Failed to enqueue job:", error); + } }, }, diff --git a/services/library.service.ts b/services/library.service.ts index ecf6c44..efeea48 100644 --- a/services/library.service.ts +++ b/services/library.service.ts @@ -59,9 +59,11 @@ import path from "path"; import { COMICS_DIRECTORY, USERDATA_DIRECTORY } from "../constants/directories"; import AirDCPPSocket from "../shared/airdcpp.socket"; -console.log(`MONGO -> ${process.env.MONGO_URI}`); -export default class ImportService extends Service { - public constructor(public broker: ServiceBroker) { +export default class LibraryService extends Service { + public constructor( + public broker: ServiceBroker, + schema: ServiceSchema<{}> = { name: "library" } + ) { super(broker); this.parseServiceSchema({ name: "library", @@ -164,78 +166,52 @@ export default class ImportService extends Service { }, newImport: { rest: "POST /newImport", - // params: {}, - async handler( - ctx: Context<{ - extractionOptions?: any; - sessionId: string; - }> - ) { + async handler(ctx) { + const { sessionId } = ctx.params; try { - // Get params to be passed to the import jobs - const { sessionId } = ctx.params; - // 1. Walk the Source folder - klaw(path.resolve(COMICS_DIRECTORY)) - // 1.1 Filter on .cb* extensions - .pipe( - through2.obj(function (item, enc, next) { - let fileExtension = path.extname( - item.path - ); - if ( - [".cbz", ".cbr", ".cb7"].includes( - fileExtension - ) - ) { - this.push(item); - } - next(); - }) - ) - // 1.2 Pipe filtered results to the next step - // Enqueue the job in the queue - .on("data", async (item) => { - console.info( - "Found a file at path: %s", - item.path - ); - let comicExists = await Comic.exists({ - "rawFileDetails.name": `${path.basename( - item.path, - path.extname(item.path) - )}`, - }); - if (!comicExists) { - // 2.1 Reset the job counters in Redis - await pubClient.set( - "completedJobCount", - 0 - ); - await pubClient.set( - "failedJobCount", - 0 - ); - // 2.2 Send the extraction job to the queue - this.broker.call("jobqueue.enqueue", { - fileObject: { - filePath: item.path, - fileSize: item.stats.size, - }, - sessionId, - importType: "new", - action: "enqueue.async", - }); - } else { - console.log( - "Comic already exists in the library." - ); - } - }) - .on("end", () => { - console.log("All files traversed."); + // Initialize Redis counters once at the start of the import + await pubClient.set("completedJobCount", 0); + await pubClient.set("failedJobCount", 0); + + // Convert klaw to use a promise-based approach for better flow control + const files = await this.getComicFiles( + COMICS_DIRECTORY + ); + for (const file of files) { + console.info( + "Found a file at path:", + file.path + ); + const comicExists = await Comic.exists({ + "rawFileDetails.name": path.basename( + file.path, + path.extname(file.path) + ), }); + + if (!comicExists) { + // Send the extraction job to the queue + await this.broker.call("jobqueue.enqueue", { + fileObject: { + filePath: file.path, + fileSize: file.stats.size, + }, + sessionId, + importType: "new", + action: "enqueue.async", + }); + } else { + console.log( + "Comic already exists in the library." + ); + } + } + console.log("All files traversed."); } catch (error) { - console.log(error); + console.error( + "Error during newImport processing:", + error + ); } }, }, @@ -864,7 +840,35 @@ export default class ImportService extends Service { }, }, }, - methods: {}, + methods: { + // Method to walk the directory and filter comic files + getComicFiles: (directory) => { + return new Promise((resolve, reject) => { + const files = []; + klaw(directory) + .pipe( + through2.obj(function (item, enc, next) { + const fileExtension = path.extname( + item.path + ); + if ( + [".cbz", ".cbr", ".cb7"].includes( + fileExtension + ) + ) { + this.push(item); + } + next(); + }) + ) + .on("data", (item) => { + files.push(item); + }) + .on("end", () => resolve(files)) + .on("error", (err) => reject(err)); + }); + }, + }, }); } } diff --git a/services/socket.service.ts b/services/socket.service.ts index d7d704d..84c4cfd 100644 --- a/services/socket.service.ts +++ b/services/socket.service.ts @@ -1,7 +1,6 @@ "use strict"; import { Service, ServiceBroker, ServiceSchema, Context } from "moleculer"; import { JobType } from "moleculer-bullmq"; -import { createClient } from "redis"; import { createAdapter } from "@socket.io/redis-adapter"; import Session from "../models/session.model"; import { pubClient, subClient } from "../config/redis.config"; @@ -274,6 +273,22 @@ export default class SocketService extends Service { }, }, async started() { + this.logger.info("Starting Socket Service..."); + this.logger.debug("pubClient:", pubClient); + this.logger.debug("subClient:", subClient); + if (!pubClient || !subClient) { + this.logger.error("Redis clients are not initialized!"); + throw new Error("Redis clients are not initialized!"); + } + + // Additional checks or logic if necessary + if (pubClient.status !== "ready") { + await pubClient.connect(); + } + + if (subClient.status !== "ready") { + await subClient.connect(); + } this.io.on("connection", async (socket) => { console.log( `socket.io server connected to client with session ID: ${socket.id}` diff --git a/services/torrentjobs.service.ts b/services/torrentjobs.service.ts index c9cdf73..737ed3f 100644 --- a/services/torrentjobs.service.ts +++ b/services/torrentjobs.service.ts @@ -9,6 +9,7 @@ import { import { DbMixin } from "../mixins/db.mixin"; import Comic from "../models/comic.model"; import BullMqMixin from "moleculer-bullmq"; +import { pubClient } from "../config/redis.config"; const { MoleculerError } = require("moleculer").Errors; export default class ImageTransformation extends Service { @@ -23,7 +24,7 @@ export default class ImageTransformation extends Service { mixins: [DbMixin("comics", Comic), BullMqMixin], settings: { bullmq: { - client: process.env.REDIS_URI, + client: pubClient, }, }, hooks: {},