Compare commits
4 Commits
ux-refinem
...
airdcpp-au
| Author | SHA1 | Date | |
|---|---|---|---|
| 68907a96a2 | |||
| aff4467b8b | |||
| e7341b553d | |||
| a4ccc78fe8 |
@@ -42,6 +42,8 @@ RUN node -v && npm -v
|
||||
COPY package.json package-lock.json ./
|
||||
COPY moleculer.config.ts ./
|
||||
COPY tsconfig.json ./
|
||||
COPY scripts ./scripts
|
||||
RUN chmod +x ./scripts/*
|
||||
|
||||
# Install application dependencies
|
||||
RUN npm install
|
||||
@@ -50,9 +52,8 @@ RUN npm install -g typescript ts-node
|
||||
# Copy the rest of the application files
|
||||
COPY . .
|
||||
|
||||
# Build and clean up
|
||||
RUN npm run build \
|
||||
&& npm prune
|
||||
# clean up
|
||||
RUN npm prune
|
||||
|
||||
# Expose the application's port
|
||||
EXPOSE 3000
|
||||
|
||||
@@ -1,10 +1,30 @@
|
||||
import { createClient } from "redis";
|
||||
const redisURL = new URL(process.env.REDIS_URI);
|
||||
// Import the Redis library
|
||||
import IORedis from "ioredis";
|
||||
|
||||
const pubClient = createClient({ url: `redis://${redisURL.hostname}:6379` });
|
||||
(async () => {
|
||||
await pubClient.connect();
|
||||
})();
|
||||
const subClient = pubClient.duplicate();
|
||||
// Environment variable for Redis URI
|
||||
const redisURI = process.env.REDIS_URI || "redis://localhost:6379";
|
||||
console.log(`process.env.REDIS_URI is ${process.env.REDIS_URI}`);
|
||||
// Creating the publisher client
|
||||
const pubClient = new IORedis(redisURI);
|
||||
|
||||
export { subClient, pubClient };
|
||||
// Creating the subscriber client
|
||||
const subClient = new IORedis(redisURI);
|
||||
|
||||
// Handle connection events for the publisher
|
||||
pubClient.on("connect", () => {
|
||||
console.log("Publisher client connected to Redis.");
|
||||
});
|
||||
pubClient.on("error", (err) => {
|
||||
console.error("Publisher client failed to connect to Redis:", err);
|
||||
});
|
||||
|
||||
// Handle connection events for the subscriber
|
||||
subClient.on("connect", () => {
|
||||
console.log("Subscriber client connected to Redis.");
|
||||
});
|
||||
subClient.on("error", (err) => {
|
||||
console.error("Subscriber client failed to connect to Redis:", err);
|
||||
});
|
||||
|
||||
// Export the clients for use in other parts of the application
|
||||
export { pubClient, subClient };
|
||||
|
||||
@@ -60,7 +60,7 @@ services:
|
||||
networks:
|
||||
- kafka-net
|
||||
ports:
|
||||
- "27017:27017"
|
||||
- "127.0.0.1:27017:27017"
|
||||
volumes:
|
||||
- "mongodb_data:/bitnami/mongodb"
|
||||
|
||||
@@ -72,7 +72,7 @@ services:
|
||||
networks:
|
||||
- kafka-net
|
||||
ports:
|
||||
- "6379:6379"
|
||||
- "127.0.0.1:6379:6379"
|
||||
|
||||
elasticsearch:
|
||||
image: docker.elastic.co/elasticsearch/elasticsearch:7.16.2
|
||||
@@ -88,7 +88,7 @@ services:
|
||||
soft: -1
|
||||
hard: -1
|
||||
ports:
|
||||
- "9200:9200"
|
||||
- "127.0.0.1:9200:9200"
|
||||
networks:
|
||||
- kafka-net
|
||||
|
||||
|
||||
@@ -3,7 +3,15 @@ LOGGER=true
|
||||
LOGLEVEL=info
|
||||
SERVICEDIR=dist/services
|
||||
|
||||
TRANSPORTER=nats://nats:4222
|
||||
VITE_UNDERLYING_HOST=localhost
|
||||
COMICS_DIRECTORY=/Users/rishi/work/threetwo-core-service/comics
|
||||
USERDATA_DIRECTORY=/Users/rishi/work/threetwo-core-service/userdata
|
||||
REDIS_URI=redis://redis:6379
|
||||
KAFKA_BROKER=kafka1:9092
|
||||
ELASTICSEARCH_URI=http://elasticsearch:9200
|
||||
MONGO_URI=mongodb://db:27017/threetwo
|
||||
UNRAR_BIN_PATH=/opt/homebrew/bin/unrar
|
||||
SEVENZ_BINARY_PATH=/opt/homebrew/bin/7za
|
||||
|
||||
CACHER=Memory
|
||||
|
||||
|
||||
@@ -1,58 +1,125 @@
|
||||
version: "3.3"
|
||||
x-userdata-volume: &userdata-volume
|
||||
type: bind
|
||||
source: ${USERDATA_DIRECTORY}
|
||||
target: /userdata
|
||||
|
||||
x-comics-volume: &comics-volume
|
||||
type: bind
|
||||
source: ${COMICS_DIRECTORY}
|
||||
target: /comics
|
||||
|
||||
services:
|
||||
|
||||
api:
|
||||
core-services:
|
||||
build:
|
||||
context: .
|
||||
image: threetwo-library-service
|
||||
env_file: docker-compose.env
|
||||
environment:
|
||||
SERVICES: api
|
||||
PORT: 3000
|
||||
depends_on:
|
||||
- nats
|
||||
labels:
|
||||
- "traefik.enable=true"
|
||||
- "traefik.http.routers.api-gw.rule=PathPrefix(`/`)"
|
||||
- "traefik.http.services.api-gw.loadbalancer.server.port=3000"
|
||||
networks:
|
||||
- internal
|
||||
|
||||
greeter:
|
||||
build:
|
||||
context: .
|
||||
image: threetwo-library-service
|
||||
env_file: docker-compose.env
|
||||
environment:
|
||||
SERVICES: greeter
|
||||
depends_on:
|
||||
- nats
|
||||
networks:
|
||||
- internal
|
||||
|
||||
nats:
|
||||
image: nats:2
|
||||
networks:
|
||||
- internal
|
||||
|
||||
traefik:
|
||||
image: traefik:v2.1
|
||||
command:
|
||||
- "--api.insecure=true" # Don't do that in production!
|
||||
- "--providers.docker=true"
|
||||
- "--providers.docker.exposedbydefault=false"
|
||||
# context: https://github.com/rishighan/threetwo-core-service.git
|
||||
context: ./
|
||||
dockerfile: Dockerfile
|
||||
image: frishi/threetwo-core-service
|
||||
container_name: core-services
|
||||
ports:
|
||||
- 3000:80
|
||||
- 3001:8080
|
||||
- "3000:3000"
|
||||
- "3001:3001"
|
||||
depends_on:
|
||||
- db
|
||||
- redis
|
||||
- elasticsearch
|
||||
- kafka1
|
||||
- zoo1
|
||||
environment:
|
||||
name: core-services
|
||||
SERVICES: api,library,imagetransformation,opds,search,settings,jobqueue,socket,torrentjobs
|
||||
env_file: docker-compose.env
|
||||
volumes:
|
||||
- /var/run/docker.sock:/var/run/docker.sock:ro
|
||||
- *comics-volume
|
||||
- *userdata-volume
|
||||
networks:
|
||||
- internal
|
||||
- default
|
||||
- proxy
|
||||
|
||||
zoo1:
|
||||
image: confluentinc/cp-zookeeper:7.3.2
|
||||
hostname: zoo1
|
||||
container_name: zoo1
|
||||
ports:
|
||||
- "2181:2181"
|
||||
environment:
|
||||
ZOOKEEPER_CLIENT_PORT: 2181
|
||||
ZOOKEEPER_SERVER_ID: 1
|
||||
ZOOKEEPER_SERVERS: zoo1:2888:3888
|
||||
networks:
|
||||
- proxy
|
||||
|
||||
kafka1:
|
||||
image: confluentinc/cp-kafka:7.3.2
|
||||
hostname: kafka1
|
||||
container_name: kafka1
|
||||
ports:
|
||||
- "9092:9092"
|
||||
- "29092:29092"
|
||||
- "9999:9999"
|
||||
environment:
|
||||
KAFKA_ADVERTISED_LISTENERS: INTERNAL://kafka1:19092,EXTERNAL://${DOCKER_HOST_IP:-127.0.0.1} :9092,DOCKER://host.docker.internal:29092
|
||||
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: INTERNAL:PLAINTEXT,EXTERNAL:PLAINTEXT,DOCKER:PLAINTEXT
|
||||
KAFKA_INTER_BROKER_LISTENER_NAME: INTERNAL
|
||||
KAFKA_ZOOKEEPER_CONNECT: "zoo1:2181"
|
||||
KAFKA_BROKER_ID: 1
|
||||
KAFKA_LOG4J_LOGGERS: "kafka.controller=INFO,kafka.producer.async.DefaultEventHandler=INFO,state. change.logger=INFO"
|
||||
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
|
||||
KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
|
||||
KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
|
||||
KAFKA_JMX_PORT: 9999
|
||||
KAFKA_JMX_HOSTNAME: ${DOCKER_HOST_IP:-127.0.0.1}
|
||||
KAFKA_AUTHORIZER_CLASS_NAME: kafka.security.authorizer.AclAuthorizer
|
||||
KAFKA_ALLOW_EVERYONE_IF_NO_ACL_FOUND: "true"
|
||||
depends_on:
|
||||
- zoo1
|
||||
networks:
|
||||
- proxy
|
||||
|
||||
db:
|
||||
image: "mongo:latest"
|
||||
container_name: database
|
||||
networks:
|
||||
- proxy
|
||||
ports:
|
||||
- "27017:27017"
|
||||
volumes:
|
||||
- "mongodb_data:/bitnami/mongodb"
|
||||
|
||||
redis:
|
||||
image: "bitnami/redis:latest"
|
||||
container_name: redis
|
||||
hostname: redis
|
||||
environment:
|
||||
ALLOW_EMPTY_PASSWORD: "yes"
|
||||
networks:
|
||||
- proxy
|
||||
ports:
|
||||
- "6379:6379"
|
||||
|
||||
elasticsearch:
|
||||
image: docker.elastic.co/elasticsearch/elasticsearch:7.16.2
|
||||
container_name: elasticsearch
|
||||
environment:
|
||||
- "discovery.type=single-node"
|
||||
- "ES_JAVA_OPTS=-Xms512m -Xmx512m"
|
||||
- "xpack.security.enabled=true"
|
||||
- "xpack.security.authc.api_key.enabled=true"
|
||||
- "ELASTIC_PASSWORD=password"
|
||||
ulimits:
|
||||
memlock:
|
||||
soft: -1
|
||||
hard: -1
|
||||
ports:
|
||||
- 9200:9200
|
||||
networks:
|
||||
- proxy
|
||||
|
||||
networks:
|
||||
internal:
|
||||
proxy:
|
||||
external: true
|
||||
|
||||
volumes:
|
||||
data:
|
||||
mongodb_data:
|
||||
driver: local
|
||||
elasticsearch:
|
||||
driver: local
|
||||
|
||||
@@ -2,21 +2,60 @@ const path = require("path");
|
||||
const mkdir = require("mkdirp").sync;
|
||||
const DbService = require("moleculer-db");
|
||||
|
||||
|
||||
export const DbMixin = (collection, model) => {
|
||||
if (process.env.MONGO_URI) {
|
||||
const MongooseAdapter = require("moleculer-db-adapter-mongoose");
|
||||
return {
|
||||
mixins: [DbService],
|
||||
adapter: new MongooseAdapter(process.env.MONGO_URI, {
|
||||
user: process.env.MONGO_INITDB_ROOT_USERNAME,
|
||||
pass: process.env.MONGO_INITDB_ROOT_PASSWORD,
|
||||
keepAlive: true,
|
||||
useUnifiedTopology: true,
|
||||
family: 4,
|
||||
}),
|
||||
model,
|
||||
};
|
||||
if (!process.env.MONGO_URI) {
|
||||
console.log("MONGO_URI not provided, initializing local storage...");
|
||||
mkdir(path.resolve("./data"));
|
||||
return { mixins: [DbService] }; // Handle case where no DB URI is provided
|
||||
}
|
||||
mkdir(path.resolve("./data"));
|
||||
|
||||
const MongooseAdapter = require("moleculer-db-adapter-mongoose");
|
||||
const adapter = new MongooseAdapter(process.env.MONGO_URI, {
|
||||
user: process.env.MONGO_INITDB_ROOT_USERNAME,
|
||||
pass: process.env.MONGO_INITDB_ROOT_PASSWORD,
|
||||
keepAlive: true,
|
||||
useNewUrlParser: true,
|
||||
useUnifiedTopology: true,
|
||||
});
|
||||
|
||||
const connectWithRetry = async (
|
||||
adapter,
|
||||
maxRetries = 5,
|
||||
interval = 5000
|
||||
) => {
|
||||
for (let retry = 0; retry < maxRetries; retry++) {
|
||||
try {
|
||||
await adapter.connect();
|
||||
console.log("MongoDB connected successfully!");
|
||||
return;
|
||||
} catch (err) {
|
||||
console.error("MongoDB connection error:", err);
|
||||
console.log(
|
||||
`Retrying MongoDB connection in ${
|
||||
interval / 1000
|
||||
} seconds...`
|
||||
);
|
||||
await new Promise((resolve) => setTimeout(resolve, interval));
|
||||
}
|
||||
}
|
||||
console.error("Failed to connect to MongoDB after several attempts.");
|
||||
};
|
||||
|
||||
return {
|
||||
mixins: [DbService],
|
||||
adapter,
|
||||
model,
|
||||
collection,
|
||||
async started() {
|
||||
await connectWithRetry(this.adapter);
|
||||
},
|
||||
async stopped() {
|
||||
try {
|
||||
await this.adapter.disconnect();
|
||||
console.log("MongoDB disconnected");
|
||||
} catch (err) {
|
||||
console.error("MongoDB disconnection error:", err);
|
||||
}
|
||||
},
|
||||
};
|
||||
};
|
||||
|
||||
@@ -5,6 +5,7 @@ import {
|
||||
MetricRegistry,
|
||||
ServiceBroker,
|
||||
} from "moleculer";
|
||||
const RedisTransporter = require("moleculer").Transporters.Redis;
|
||||
|
||||
/**
|
||||
* Moleculer ServiceBroker configuration file
|
||||
@@ -90,7 +91,7 @@ const brokerConfig: BrokerOptions = {
|
||||
// More info: https://moleculer.services/docs/0.14/networking.html
|
||||
// Note: During the development, you don't need to define it because all services will be loaded locally.
|
||||
// In production you can set it via `TRANSPORTER=nats://localhost:4222` environment variable.
|
||||
transporter: process.env.REDIS_URI || "redis://localhost:6379",
|
||||
transporter: new RedisTransporter(process.env.REDIS_URI),
|
||||
|
||||
// Define a cacher.
|
||||
// More info: https://moleculer.services/docs/0.14/caching.html
|
||||
|
||||
1033
package-lock.json
generated
1033
package-lock.json
generated
File diff suppressed because it is too large
Load Diff
11
package.json
11
package.json
@@ -4,8 +4,8 @@
|
||||
"description": "Endpoints for common operations in ThreeTwo",
|
||||
"scripts": {
|
||||
"build": "tsc --build tsconfig.json",
|
||||
"dev": "ts-node ./node_modules/moleculer/bin/moleculer-runner.js --hot --repl --config moleculer.config.ts services/**/*.service.ts",
|
||||
"start": "moleculer-runner --config dist/moleculer.config.js",
|
||||
"dev": "./scripts/start.sh dev",
|
||||
"start": "npm run build && ./scripts/start.sh prod",
|
||||
"cli": "moleculer connect NATS",
|
||||
"ci": "jest --watch",
|
||||
"test": "jest --coverage",
|
||||
@@ -27,6 +27,7 @@
|
||||
"eslint-plugin-import": "^2.20.2",
|
||||
"eslint-plugin-prefer-arrow": "^1.2.2",
|
||||
"install": "^0.13.0",
|
||||
"ioredis": "^5.4.1",
|
||||
"jest": "^29.5.0",
|
||||
"jest-cli": "^29.5.0",
|
||||
"moleculer-repl": "^0.7.0",
|
||||
@@ -68,16 +69,16 @@
|
||||
"mkdirp": "^0.5.5",
|
||||
"moleculer-bullmq": "^3.0.0",
|
||||
"moleculer-db": "^0.8.23",
|
||||
"moleculer-db-adapter-mongoose": "^0.9.2",
|
||||
"moleculer-db-adapter-mongoose": "^0.9.4",
|
||||
"moleculer-io": "^2.2.0",
|
||||
"moleculer-web": "^0.10.5",
|
||||
"moleculer-web": "^0.10.7",
|
||||
"mongoosastic-ts": "^6.0.3",
|
||||
"mongoose": "^6.10.4",
|
||||
"mongoose-paginate-v2": "^1.3.18",
|
||||
"nats": "^1.3.2",
|
||||
"opds-extra": "^3.0.10",
|
||||
"p7zip-threetwo": "^1.0.4",
|
||||
"redis": "^4.6.5",
|
||||
"redis": "^4.6.14",
|
||||
"sanitize-filename-ts": "^1.0.2",
|
||||
"sharp": "^0.33.3",
|
||||
"threetwo-ui-typings": "^1.0.14",
|
||||
|
||||
26
scripts/start.sh
Executable file
26
scripts/start.sh
Executable file
@@ -0,0 +1,26 @@
|
||||
#!/bin/bash
|
||||
echo "Starting script with mode: $MODE"
|
||||
|
||||
# Extract the host and port from MONGO_URI
|
||||
HOST_PORT=$(echo $MONGO_URI | sed -e 's/mongodb:\/\///' -e 's/\/.*$//')
|
||||
|
||||
# Assuming the script is called from the project root
|
||||
PROJECT_ROOT=$(pwd)
|
||||
echo "Project root: $PROJECT_ROOT"
|
||||
|
||||
CONFIG_PATH="$PROJECT_ROOT/moleculer.config.ts"
|
||||
echo "Configuration path: $CONFIG_PATH"
|
||||
|
||||
# Set the correct path for moleculer-runner based on the mode
|
||||
if [ "$MODE" == "dev" ]; then
|
||||
# For development: use ts-node
|
||||
MOLECULER_RUNNER="ts-node $PROJECT_ROOT/node_modules/moleculer/bin/moleculer-runner.js --hot --repl --config $CONFIG_PATH $PROJECT_ROOT/services/**/*.service.ts"
|
||||
echo "Moleculer Runner for dev: $MOLECULER_RUNNER"
|
||||
else
|
||||
# For production: direct node execution of the compiled JavaScript
|
||||
MOLECULER_RUNNER="moleculer-runner --config $PROJECT_ROOT/dist/moleculer.config.js $PROJECT_ROOT/dist/services/**/*.service.js"
|
||||
echo "Moleculer Runner for prod: $MOLECULER_RUNNER"
|
||||
fi
|
||||
|
||||
# Run wait-for-it, then start the application
|
||||
./scripts/wait-for-it.sh $HOST_PORT -- $MOLECULER_RUNNER
|
||||
190
scripts/wait-for-it.sh
Executable file
190
scripts/wait-for-it.sh
Executable file
@@ -0,0 +1,190 @@
|
||||
#!/usr/bin/env bash
|
||||
# Use this script to test if a given TCP host/port are available
|
||||
|
||||
WAITFORIT_cmdname=${0##*/}
|
||||
if [[ $OSTYPE == 'darwin'* ]]; then
|
||||
if ! command -v gtimeout &> /dev/null
|
||||
then
|
||||
echo "missing gtimeout (`brew install coreutils`)"
|
||||
exit
|
||||
fi
|
||||
alias timeout=gtimeout
|
||||
fi
|
||||
|
||||
echoerr() { if [[ $WAITFORIT_QUIET -ne 1 ]]; then echo "$@" 1>&2; fi }
|
||||
|
||||
usage()
|
||||
{
|
||||
cat << USAGE >&2
|
||||
Usage:
|
||||
$WAITFORIT_cmdname host:port [-s] [-t timeout] [-- command args]
|
||||
-h HOST | --host=HOST Host or IP under test
|
||||
-p PORT | --port=PORT TCP port under test
|
||||
Alternatively, you specify the host and port as host:port
|
||||
-s | --strict Only execute subcommand if the test succeeds
|
||||
-q | --quiet Don't output any status messages
|
||||
-t TIMEOUT | --timeout=TIMEOUT
|
||||
Timeout in seconds, zero for no timeout
|
||||
-- COMMAND ARGS Execute command with args after the test finishes
|
||||
USAGE
|
||||
exit 1
|
||||
}
|
||||
|
||||
wait_for()
|
||||
{
|
||||
if [[ $WAITFORIT_TIMEOUT -gt 0 ]]; then
|
||||
echoerr "$WAITFORIT_cmdname: waiting $WAITFORIT_TIMEOUT seconds for $WAITFORIT_HOST:$WAITFORIT_PORT"
|
||||
else
|
||||
echoerr "$WAITFORIT_cmdname: waiting for $WAITFORIT_HOST:$WAITFORIT_PORT without a timeout"
|
||||
fi
|
||||
WAITFORIT_start_ts=$(date +%s)
|
||||
while :
|
||||
do
|
||||
if [[ $WAITFORIT_ISBUSY -eq 1 ]]; then
|
||||
nc -z $WAITFORIT_HOST $WAITFORIT_PORT
|
||||
WAITFORIT_result=$?
|
||||
else
|
||||
(echo -n > /dev/tcp/$WAITFORIT_HOST/$WAITFORIT_PORT) >/dev/null 2>&1
|
||||
WAITFORIT_result=$?
|
||||
fi
|
||||
if [[ $WAITFORIT_result -eq 0 ]]; then
|
||||
WAITFORIT_end_ts=$(date +%s)
|
||||
echoerr "$WAITFORIT_cmdname: $WAITFORIT_HOST:$WAITFORIT_PORT is available after $((WAITFORIT_end_ts - WAITFORIT_start_ts)) seconds"
|
||||
break
|
||||
fi
|
||||
sleep 1
|
||||
done
|
||||
return $WAITFORIT_result
|
||||
}
|
||||
|
||||
wait_for_wrapper()
|
||||
{
|
||||
# In order to support SIGINT during timeout: http://unix.stackexchange.com/a/57692
|
||||
if [[ $WAITFORIT_QUIET -eq 1 ]]; then
|
||||
timeout $WAITFORIT_BUSYTIMEFLAG $WAITFORIT_TIMEOUT $0 --quiet --child --host=$WAITFORIT_HOST --port=$WAITFORIT_PORT --timeout=$WAITFORIT_TIMEOUT &
|
||||
else
|
||||
timeout $WAITFORIT_BUSYTIMEFLAG $WAITFORIT_TIMEOUT $0 --child --host=$WAITFORIT_HOST --port=$WAITFORIT_PORT --timeout=$WAITFORIT_TIMEOUT &
|
||||
fi
|
||||
WAITFORIT_PID=$!
|
||||
trap "kill -INT -$WAITFORIT_PID" INT
|
||||
wait $WAITFORIT_PID
|
||||
WAITFORIT_RESULT=$?
|
||||
if [[ $WAITFORIT_RESULT -ne 0 ]]; then
|
||||
echoerr "$WAITFORIT_cmdname: timeout occurred after waiting $WAITFORIT_TIMEOUT seconds for $WAITFORIT_HOST:$WAITFORIT_PORT"
|
||||
fi
|
||||
return $WAITFORIT_RESULT
|
||||
}
|
||||
|
||||
# process arguments
|
||||
while [[ $# -gt 0 ]]
|
||||
do
|
||||
case "$1" in
|
||||
*:* )
|
||||
WAITFORIT_hostport=(${1//:/ })
|
||||
WAITFORIT_HOST=${WAITFORIT_hostport[0]}
|
||||
WAITFORIT_PORT=${WAITFORIT_hostport[1]}
|
||||
shift 1
|
||||
;;
|
||||
--child)
|
||||
WAITFORIT_CHILD=1
|
||||
shift 1
|
||||
;;
|
||||
-q | --quiet)
|
||||
WAITFORIT_QUIET=1
|
||||
shift 1
|
||||
;;
|
||||
-s | --strict)
|
||||
WAITFORIT_STRICT=1
|
||||
shift 1
|
||||
;;
|
||||
-h)
|
||||
WAITFORIT_HOST="$2"
|
||||
if [[ $WAITFORIT_HOST == "" ]]; then break; fi
|
||||
shift 2
|
||||
;;
|
||||
--host=*)
|
||||
WAITFORIT_HOST="${1#*=}"
|
||||
shift 1
|
||||
;;
|
||||
-p)
|
||||
WAITFORIT_PORT="$2"
|
||||
if [[ $WAITFORIT_PORT == "" ]]; then break; fi
|
||||
shift 2
|
||||
;;
|
||||
--port=*)
|
||||
WAITFORIT_PORT="${1#*=}"
|
||||
shift 1
|
||||
;;
|
||||
-t)
|
||||
WAITFORIT_TIMEOUT="$2"
|
||||
if [[ $WAITFORIT_TIMEOUT == "" ]]; then break; fi
|
||||
shift 2
|
||||
;;
|
||||
--timeout=*)
|
||||
WAITFORIT_TIMEOUT="${1#*=}"
|
||||
shift 1
|
||||
;;
|
||||
--)
|
||||
shift
|
||||
WAITFORIT_CLI=("$@")
|
||||
break
|
||||
;;
|
||||
--help)
|
||||
usage
|
||||
;;
|
||||
*)
|
||||
echoerr "Unknown argument: $1"
|
||||
usage
|
||||
;;
|
||||
esac
|
||||
done
|
||||
|
||||
if [[ "$WAITFORIT_HOST" == "" || "$WAITFORIT_PORT" == "" ]]; then
|
||||
echoerr "Error: you need to provide a host and port to test."
|
||||
usage
|
||||
fi
|
||||
|
||||
WAITFORIT_TIMEOUT=${WAITFORIT_TIMEOUT:-15}
|
||||
WAITFORIT_STRICT=${WAITFORIT_STRICT:-0}
|
||||
WAITFORIT_CHILD=${WAITFORIT_CHILD:-0}
|
||||
WAITFORIT_QUIET=${WAITFORIT_QUIET:-0}
|
||||
|
||||
# Check to see if timeout is from busybox?
|
||||
WAITFORIT_TIMEOUT_PATH=$(type -p timeout)
|
||||
WAITFORIT_TIMEOUT_PATH=$(realpath $WAITFORIT_TIMEOUT_PATH 2>/dev/null || readlink -f $WAITFORIT_TIMEOUT_PATH)
|
||||
|
||||
WAITFORIT_BUSYTIMEFLAG=""
|
||||
if [[ $WAITFORIT_TIMEOUT_PATH =~ "busybox" ]]; then
|
||||
WAITFORIT_ISBUSY=1
|
||||
# Check if busybox timeout uses -t flag
|
||||
# (recent Alpine versions don't support -t anymore)
|
||||
if timeout &>/dev/stdout | grep -q -e '-t '; then
|
||||
WAITFORIT_BUSYTIMEFLAG="-t"
|
||||
fi
|
||||
else
|
||||
WAITFORIT_ISBUSY=0
|
||||
fi
|
||||
|
||||
if [[ $WAITFORIT_CHILD -gt 0 ]]; then
|
||||
wait_for
|
||||
WAITFORIT_RESULT=$?
|
||||
exit $WAITFORIT_RESULT
|
||||
else
|
||||
if [[ $WAITFORIT_TIMEOUT -gt 0 ]]; then
|
||||
wait_for_wrapper
|
||||
WAITFORIT_RESULT=$?
|
||||
else
|
||||
wait_for
|
||||
WAITFORIT_RESULT=$?
|
||||
fi
|
||||
fi
|
||||
|
||||
if [[ $WAITFORIT_CLI != "" ]]; then
|
||||
if [[ $WAITFORIT_RESULT -ne 0 && $WAITFORIT_STRICT -eq 1 ]]; then
|
||||
echoerr "$WAITFORIT_cmdname: strict mode, refusing to execute subprocess"
|
||||
exit $WAITFORIT_RESULT
|
||||
fi
|
||||
exec "${WAITFORIT_CLI[@]}"
|
||||
else
|
||||
exit $WAITFORIT_RESULT
|
||||
fi
|
||||
@@ -14,7 +14,6 @@ import { pubClient } from "../config/redis.config";
|
||||
import path from "path";
|
||||
const { MoleculerError } = require("moleculer").Errors;
|
||||
|
||||
console.log(process.env.REDIS_URI);
|
||||
export default class JobQueueService extends Service {
|
||||
public constructor(public broker: ServiceBroker) {
|
||||
super(broker);
|
||||
@@ -22,9 +21,10 @@ export default class JobQueueService extends Service {
|
||||
name: "jobqueue",
|
||||
hooks: {},
|
||||
mixins: [DbMixin("comics", Comic), BullMqMixin],
|
||||
|
||||
settings: {
|
||||
bullmq: {
|
||||
client: process.env.REDIS_URI,
|
||||
client: pubClient,
|
||||
},
|
||||
},
|
||||
actions: {
|
||||
@@ -57,20 +57,24 @@ export default class JobQueueService extends Service {
|
||||
handler: async (
|
||||
ctx: Context<{ action: string; description: string }>
|
||||
) => {
|
||||
const { action, description } = ctx.params;
|
||||
// Enqueue the job
|
||||
const job = await this.localQueue(
|
||||
ctx,
|
||||
action,
|
||||
ctx.params,
|
||||
{
|
||||
priority: 10,
|
||||
}
|
||||
);
|
||||
console.log(`Job ${job.id} enqueued`);
|
||||
console.log(`${description}`);
|
||||
try {
|
||||
const { action, description } = ctx.params;
|
||||
// Enqueue the job
|
||||
const job = await this.localQueue(
|
||||
ctx,
|
||||
action,
|
||||
{},
|
||||
{
|
||||
priority: 10,
|
||||
}
|
||||
);
|
||||
console.log(`Job ${job.id} enqueued`);
|
||||
console.log(`${description}`);
|
||||
|
||||
return job.id;
|
||||
return job.id;
|
||||
} catch (error) {
|
||||
console.error("Failed to enqueue job:", error);
|
||||
}
|
||||
},
|
||||
},
|
||||
|
||||
|
||||
@@ -57,11 +57,12 @@ const through2 = require("through2");
|
||||
import klaw from "klaw";
|
||||
import path from "path";
|
||||
import { COMICS_DIRECTORY, USERDATA_DIRECTORY } from "../constants/directories";
|
||||
import AirDCPPSocket from "../shared/airdcpp.socket";
|
||||
|
||||
console.log(`MONGO -> ${process.env.MONGO_URI}`);
|
||||
export default class ImportService extends Service {
|
||||
public constructor(public broker: ServiceBroker) {
|
||||
export default class LibraryService extends Service {
|
||||
public constructor(
|
||||
public broker: ServiceBroker,
|
||||
schema: ServiceSchema<{}> = { name: "library" }
|
||||
) {
|
||||
super(broker);
|
||||
this.parseServiceSchema({
|
||||
name: "library",
|
||||
@@ -164,78 +165,51 @@ export default class ImportService extends Service {
|
||||
},
|
||||
newImport: {
|
||||
rest: "POST /newImport",
|
||||
// params: {},
|
||||
async handler(
|
||||
ctx: Context<{
|
||||
extractionOptions?: any;
|
||||
sessionId: string;
|
||||
}>
|
||||
) {
|
||||
async handler(ctx) {
|
||||
const { sessionId } = ctx.params;
|
||||
try {
|
||||
// Get params to be passed to the import jobs
|
||||
const { sessionId } = ctx.params;
|
||||
// 1. Walk the Source folder
|
||||
klaw(path.resolve(COMICS_DIRECTORY))
|
||||
// 1.1 Filter on .cb* extensions
|
||||
.pipe(
|
||||
through2.obj(function (item, enc, next) {
|
||||
let fileExtension = path.extname(
|
||||
item.path
|
||||
);
|
||||
if (
|
||||
[".cbz", ".cbr", ".cb7"].includes(
|
||||
fileExtension
|
||||
)
|
||||
) {
|
||||
this.push(item);
|
||||
}
|
||||
next();
|
||||
})
|
||||
)
|
||||
// 1.2 Pipe filtered results to the next step
|
||||
// Enqueue the job in the queue
|
||||
.on("data", async (item) => {
|
||||
console.info(
|
||||
"Found a file at path: %s",
|
||||
item.path
|
||||
);
|
||||
let comicExists = await Comic.exists({
|
||||
"rawFileDetails.name": `${path.basename(
|
||||
item.path,
|
||||
path.extname(item.path)
|
||||
)}`,
|
||||
});
|
||||
if (!comicExists) {
|
||||
// 2.1 Reset the job counters in Redis
|
||||
await pubClient.set(
|
||||
"completedJobCount",
|
||||
0
|
||||
);
|
||||
await pubClient.set(
|
||||
"failedJobCount",
|
||||
0
|
||||
);
|
||||
// 2.2 Send the extraction job to the queue
|
||||
this.broker.call("jobqueue.enqueue", {
|
||||
fileObject: {
|
||||
filePath: item.path,
|
||||
fileSize: item.stats.size,
|
||||
},
|
||||
sessionId,
|
||||
importType: "new",
|
||||
action: "enqueue.async",
|
||||
});
|
||||
} else {
|
||||
console.log(
|
||||
"Comic already exists in the library."
|
||||
);
|
||||
}
|
||||
})
|
||||
.on("end", () => {
|
||||
console.log("All files traversed.");
|
||||
// Initialize Redis counters once at the start of the import
|
||||
await pubClient.set("completedJobCount", 0);
|
||||
await pubClient.set("failedJobCount", 0);
|
||||
|
||||
// Convert klaw to use a promise-based approach for better flow control
|
||||
const files = await this.getComicFiles(
|
||||
process.env.COMICS_DIRECTORY
|
||||
);
|
||||
for (const file of files) {
|
||||
console.info(
|
||||
"Found a file at path:",
|
||||
file.path
|
||||
);
|
||||
const comicExists = await Comic.exists({
|
||||
"rawFileDetails.name": path.basename(
|
||||
file.path,
|
||||
path.extname(file.path)
|
||||
),
|
||||
});
|
||||
if (!comicExists) {
|
||||
// Send the extraction job to the queue
|
||||
await this.broker.call("jobqueue.enqueue", {
|
||||
fileObject: {
|
||||
filePath: file.path,
|
||||
fileSize: file.stats.size,
|
||||
},
|
||||
sessionId,
|
||||
importType: "new",
|
||||
action: "enqueue.async",
|
||||
});
|
||||
} else {
|
||||
console.log(
|
||||
"Comic already exists in the library."
|
||||
);
|
||||
}
|
||||
}
|
||||
console.log("All files traversed.");
|
||||
} catch (error) {
|
||||
console.log(error);
|
||||
console.error(
|
||||
"Error during newImport processing:",
|
||||
error
|
||||
);
|
||||
}
|
||||
},
|
||||
},
|
||||
@@ -356,21 +330,51 @@ export default class ImportService extends Service {
|
||||
},
|
||||
getComicsMarkedAsWanted: {
|
||||
rest: "GET /getComicsMarkedAsWanted",
|
||||
handler: async (ctx: Context<{}>) => {
|
||||
params: {
|
||||
page: { type: "number", default: 1 },
|
||||
limit: { type: "number", default: 100 },
|
||||
},
|
||||
handler: async (
|
||||
ctx: Context<{ page: number; limit: number }>
|
||||
) => {
|
||||
const { page, limit } = ctx.params;
|
||||
this.logger.info(
|
||||
`Requesting page ${page} with limit ${limit}`
|
||||
);
|
||||
try {
|
||||
// Query to find comics where 'markEntireVolumeAsWanted' is true or 'issues' array is not empty
|
||||
const wantedComics = await Comic.find({
|
||||
wanted: { $exists: true },
|
||||
$or: [
|
||||
{ "wanted.markEntireVolumeWanted": true },
|
||||
{ "wanted.issues": { $not: { $size: 0 } } },
|
||||
],
|
||||
});
|
||||
const options = {
|
||||
page,
|
||||
limit,
|
||||
lean: true,
|
||||
};
|
||||
|
||||
console.log(wantedComics); // Output the found comics
|
||||
return wantedComics;
|
||||
const result = await Comic.paginate(
|
||||
{
|
||||
wanted: { $exists: true },
|
||||
$or: [
|
||||
{
|
||||
"wanted.markEntireVolumeWanted":
|
||||
true,
|
||||
},
|
||||
{
|
||||
"wanted.issues": {
|
||||
$not: { $size: 0 },
|
||||
},
|
||||
},
|
||||
],
|
||||
},
|
||||
options
|
||||
);
|
||||
|
||||
// Log the raw result from the database
|
||||
this.logger.info(
|
||||
"Paginate result:",
|
||||
JSON.stringify(result, null, 2)
|
||||
);
|
||||
|
||||
return result.docs; // Return just the docs array
|
||||
} catch (error) {
|
||||
console.error("Error finding comics:", error);
|
||||
this.logger.error("Error finding comics:", error);
|
||||
throw error;
|
||||
}
|
||||
},
|
||||
@@ -552,9 +556,7 @@ export default class ImportService extends Service {
|
||||
params: { id: "string" },
|
||||
async handler(ctx: Context<{ id: string }>) {
|
||||
console.log(ctx.params.id);
|
||||
return await Comic.findById(
|
||||
new ObjectId(ctx.params.id)
|
||||
);
|
||||
return await Comic.findById(ctx.params.id);
|
||||
},
|
||||
},
|
||||
getComicBooksByIds: {
|
||||
@@ -773,48 +775,6 @@ export default class ImportService extends Service {
|
||||
},
|
||||
},
|
||||
|
||||
// This method belongs in library service,
|
||||
// because bundles can only exist for comics _in the library_
|
||||
// (wanted or imported)
|
||||
getBundles: {
|
||||
rest: "POST /getBundles",
|
||||
params: {},
|
||||
handler: async (
|
||||
ctx: Context<{
|
||||
comicObjectId: string;
|
||||
config: any;
|
||||
}>
|
||||
) => {
|
||||
try {
|
||||
// 1. Get the comic object Id
|
||||
const { config } = ctx.params;
|
||||
const comicObject = await Comic.findById(
|
||||
new ObjectId(ctx.params.comicObjectId)
|
||||
);
|
||||
// 2. Init AirDC++
|
||||
const ADCPPSocket = new AirDCPPSocket(config);
|
||||
await ADCPPSocket.connect();
|
||||
// 3. Get the bundles for the comic object
|
||||
if (comicObject) {
|
||||
// make the call to get the bundles from AirDC++ using the bundleId
|
||||
const bundles =
|
||||
comicObject.acquisition.directconnect.downloads.map(
|
||||
async (bundle) => {
|
||||
return await ADCPPSocket.get(
|
||||
`queue/bundles/${bundle.bundleId}`
|
||||
);
|
||||
}
|
||||
);
|
||||
return Promise.all(bundles);
|
||||
}
|
||||
} catch (error) {
|
||||
throw new Errors.MoleculerError(
|
||||
"Couldn't fetch bundles from AirDC++",
|
||||
500
|
||||
);
|
||||
}
|
||||
},
|
||||
},
|
||||
flushDB: {
|
||||
rest: "POST /flushDB",
|
||||
params: {},
|
||||
@@ -864,7 +824,35 @@ export default class ImportService extends Service {
|
||||
},
|
||||
},
|
||||
},
|
||||
methods: {},
|
||||
methods: {
|
||||
// Method to walk the directory and filter comic files
|
||||
getComicFiles: (directory) => {
|
||||
return new Promise((resolve, reject) => {
|
||||
const files = [];
|
||||
klaw(directory)
|
||||
.pipe(
|
||||
through2.obj(function (item, enc, next) {
|
||||
const fileExtension = path.extname(
|
||||
item.path
|
||||
);
|
||||
if (
|
||||
[".cbz", ".cbr", ".cb7"].includes(
|
||||
fileExtension
|
||||
)
|
||||
) {
|
||||
this.push(item);
|
||||
}
|
||||
next();
|
||||
})
|
||||
)
|
||||
.on("data", (item) => {
|
||||
files.push(item);
|
||||
})
|
||||
.on("end", () => resolve(files))
|
||||
.on("error", (err) => reject(err));
|
||||
});
|
||||
},
|
||||
},
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1,7 +1,6 @@
|
||||
"use strict";
|
||||
import { Service, ServiceBroker, ServiceSchema, Context } from "moleculer";
|
||||
import { JobType } from "moleculer-bullmq";
|
||||
import { createClient } from "redis";
|
||||
import { createAdapter } from "@socket.io/redis-adapter";
|
||||
import Session from "../models/session.model";
|
||||
import { pubClient, subClient } from "../config/redis.config";
|
||||
@@ -62,8 +61,12 @@ export default class SocketService extends Service {
|
||||
|
||||
if (active > 0 || paused > 0 || waiting > 0) {
|
||||
// 3. Get job counts
|
||||
const completedJobCount = await pubClient.get("completedJobCount");
|
||||
const failedJobCount = await pubClient.get("failedJobCount");
|
||||
const completedJobCount = await pubClient.get(
|
||||
"completedJobCount"
|
||||
);
|
||||
const failedJobCount = await pubClient.get(
|
||||
"failedJobCount"
|
||||
);
|
||||
|
||||
// 4. Send the counts to the active socket.io session
|
||||
await this.broker.call("socket.broadcast", {
|
||||
@@ -80,9 +83,14 @@ export default class SocketService extends Service {
|
||||
}
|
||||
}
|
||||
} catch (err) {
|
||||
throw new MoleculerError(err, 500, "SESSION_ID_NOT_FOUND", {
|
||||
data: sessionId,
|
||||
});
|
||||
throw new MoleculerError(
|
||||
err,
|
||||
500,
|
||||
"SESSION_ID_NOT_FOUND",
|
||||
{
|
||||
data: sessionId,
|
||||
}
|
||||
);
|
||||
}
|
||||
},
|
||||
|
||||
@@ -93,7 +101,11 @@ export default class SocketService extends Service {
|
||||
}>
|
||||
) => {
|
||||
const { queueAction } = ctx.params;
|
||||
await this.broker.call("jobqueue.toggle", { action: queueAction }, {});
|
||||
await this.broker.call(
|
||||
"jobqueue.toggle",
|
||||
{ action: queueAction },
|
||||
{}
|
||||
);
|
||||
},
|
||||
importSingleIssue: async (ctx: Context<{}>) => {
|
||||
console.info("AirDC++ finished a download -> ");
|
||||
@@ -104,11 +116,13 @@ export default class SocketService extends Service {
|
||||
// {}
|
||||
// );
|
||||
},
|
||||
// AirDCPP Socket actions
|
||||
|
||||
search: {
|
||||
params: {
|
||||
query: "object",
|
||||
config: "object",
|
||||
namespace: "string",
|
||||
},
|
||||
async handler(ctx) {
|
||||
const { query, config, namespace } = ctx.params;
|
||||
@@ -116,7 +130,10 @@ export default class SocketService extends Service {
|
||||
const ADCPPSocket = new AirDCPPSocket(config);
|
||||
try {
|
||||
await ADCPPSocket.connect();
|
||||
const instance = await ADCPPSocket.post("search", query);
|
||||
const instance = await ADCPPSocket.post(
|
||||
"search",
|
||||
query
|
||||
);
|
||||
|
||||
// Send the instance to the client
|
||||
await namespacedInstance.emit("searchInitiated", {
|
||||
@@ -127,10 +144,14 @@ export default class SocketService extends Service {
|
||||
await ADCPPSocket.addListener(
|
||||
`search`,
|
||||
`search_result_added`,
|
||||
(groupedResult, entityId: number) => {
|
||||
namespacedInstance.emit("searchResultAdded", {
|
||||
groupedResult: { entityId, payload: groupedResult?.result },
|
||||
});
|
||||
(data) => {
|
||||
namespacedInstance.emit(
|
||||
"searchResultAdded",
|
||||
{
|
||||
groupedResult: data,
|
||||
instanceId: instance.id,
|
||||
}
|
||||
);
|
||||
},
|
||||
instance.id
|
||||
);
|
||||
@@ -138,10 +159,18 @@ export default class SocketService extends Service {
|
||||
await ADCPPSocket.addListener(
|
||||
`search`,
|
||||
`search_result_updated`,
|
||||
(updatedResult, entityId: number) => {
|
||||
namespacedInstance.emit("searchResultUpdated", {
|
||||
updatedResult: { entityId, payload: updatedResult?.result },
|
||||
(data) => {
|
||||
console.log({
|
||||
updatedResult: data,
|
||||
instanceId: instance.id,
|
||||
});
|
||||
namespacedInstance.emit(
|
||||
"searchResultUpdated",
|
||||
{
|
||||
updatedResult: data,
|
||||
instanceId: instance.id,
|
||||
}
|
||||
);
|
||||
},
|
||||
instance.id
|
||||
);
|
||||
@@ -151,32 +180,54 @@ export default class SocketService extends Service {
|
||||
`search_hub_searches_sent`,
|
||||
async (searchInfo) => {
|
||||
await this.sleep(5000);
|
||||
const currentInstance = await ADCPPSocket.get(
|
||||
`search/${instance.id}`
|
||||
const currentInstance =
|
||||
await ADCPPSocket.get(
|
||||
`search/${instance.id}`
|
||||
);
|
||||
console.log(
|
||||
JSON.stringify(currentInstance, null, 4)
|
||||
);
|
||||
// Send the instance to the client
|
||||
await namespacedInstance.emit("searchesSent", {
|
||||
searchInfo,
|
||||
});
|
||||
await namespacedInstance.emit(
|
||||
"searchesSent",
|
||||
{
|
||||
searchInfo,
|
||||
}
|
||||
);
|
||||
|
||||
if (currentInstance.result_count === 0) {
|
||||
console.log("No more search results.");
|
||||
namespacedInstance.emit("searchComplete", {
|
||||
message: "No more search results.",
|
||||
});
|
||||
namespacedInstance.emit(
|
||||
"searchComplete",
|
||||
{
|
||||
message:
|
||||
"No more search results.",
|
||||
currentInstance,
|
||||
}
|
||||
);
|
||||
}
|
||||
},
|
||||
instance.id
|
||||
);
|
||||
|
||||
// Perform the actual search
|
||||
await ADCPPSocket.post(`search/${instance.id}/hub_search`, query);
|
||||
await ADCPPSocket.post(
|
||||
`search/${instance.id}/hub_search`,
|
||||
query
|
||||
);
|
||||
} catch (error) {
|
||||
await namespacedInstance.emit("searchError", error.message);
|
||||
throw new MoleculerError("Search failed", 500, "SEARCH_FAILED", {
|
||||
error,
|
||||
});
|
||||
await namespacedInstance.emit(
|
||||
"searchError",
|
||||
error.message
|
||||
);
|
||||
throw new MoleculerError(
|
||||
"Search failed",
|
||||
500,
|
||||
"SEARCH_FAILED",
|
||||
{ error }
|
||||
);
|
||||
} finally {
|
||||
// await ADCPPSocket.disconnect();
|
||||
await ADCPPSocket.disconnect();
|
||||
}
|
||||
},
|
||||
},
|
||||
@@ -225,7 +276,10 @@ export default class SocketService extends Service {
|
||||
"Download and metadata update successful",
|
||||
bundleDBImportResult
|
||||
);
|
||||
this.broker.emit("downloadCompleted", bundleDBImportResult);
|
||||
this.broker.emit(
|
||||
"downloadCompleted",
|
||||
bundleDBImportResult
|
||||
);
|
||||
return bundleDBImportResult;
|
||||
} else {
|
||||
throw new Error(
|
||||
@@ -234,9 +288,12 @@ export default class SocketService extends Service {
|
||||
}
|
||||
} catch (error) {
|
||||
this.broker.emit("downloadError", error.message);
|
||||
throw new MoleculerError("Download failed", 500, "DOWNLOAD_FAILED", {
|
||||
error,
|
||||
});
|
||||
throw new MoleculerError(
|
||||
"Download failed",
|
||||
500,
|
||||
"DOWNLOAD_FAILED",
|
||||
{ error }
|
||||
);
|
||||
} finally {
|
||||
// await ADCPPSocket.disconnect();
|
||||
}
|
||||
@@ -256,7 +313,10 @@ export default class SocketService extends Service {
|
||||
"queue",
|
||||
"queue_bundle_tick",
|
||||
(tickData) => {
|
||||
console.log("Received tick data: ", tickData);
|
||||
console.log(
|
||||
"Received tick data: ",
|
||||
tickData
|
||||
);
|
||||
this.io.emit("bundleTickUpdate", tickData);
|
||||
},
|
||||
null
|
||||
@@ -277,6 +337,22 @@ export default class SocketService extends Service {
|
||||
},
|
||||
},
|
||||
async started() {
|
||||
this.logger.info("Starting Socket Service...");
|
||||
this.logger.debug("pubClient:", pubClient);
|
||||
this.logger.debug("subClient:", subClient);
|
||||
if (!pubClient || !subClient) {
|
||||
this.logger.error("Redis clients are not initialized!");
|
||||
throw new Error("Redis clients are not initialized!");
|
||||
}
|
||||
|
||||
// Additional checks or logic if necessary
|
||||
if (pubClient.status !== "ready") {
|
||||
await pubClient.connect();
|
||||
}
|
||||
|
||||
if (subClient.status !== "ready") {
|
||||
await subClient.connect();
|
||||
}
|
||||
this.io.on("connection", async (socket) => {
|
||||
console.log(
|
||||
`socket.io server connected to client with session ID: ${socket.id}`
|
||||
|
||||
@@ -9,6 +9,7 @@ import {
|
||||
import { DbMixin } from "../mixins/db.mixin";
|
||||
import Comic from "../models/comic.model";
|
||||
import BullMqMixin from "moleculer-bullmq";
|
||||
import { pubClient } from "../config/redis.config";
|
||||
const { MoleculerError } = require("moleculer").Errors;
|
||||
|
||||
export default class ImageTransformation extends Service {
|
||||
@@ -23,7 +24,7 @@ export default class ImageTransformation extends Service {
|
||||
mixins: [DbMixin("comics", Comic), BullMqMixin],
|
||||
settings: {
|
||||
bullmq: {
|
||||
client: process.env.REDIS_URI,
|
||||
client: pubClient,
|
||||
},
|
||||
},
|
||||
hooks: {},
|
||||
|
||||
Reference in New Issue
Block a user