34 Commits

Author SHA1 Message Date
1dad0cfd7e 🔧 Fixing autodownload functionality 2024-12-22 22:00:18 -05:00
1d48499c64 Revert "Merge branch 'master' into getbundles-fix"
This reverts commit 30168844f3, reversing
changes made to 2e60e2e3d5.
2024-10-24 10:59:09 -04:00
c9ecbb911a Merge pull request #12 from rishighan/getbundles-fix
getBundles Fix
2024-10-24 10:50:29 -04:00
30168844f3 Merge branch 'master' into getbundles-fix 2024-10-24 10:47:31 -04:00
2e60e2e3d5 Added package-lock.json 2024-10-24 10:45:19 -04:00
8254ec2093 ⌫ package.json deleted 2024-10-24 10:41:59 -04:00
7381d03045 🔧 Fixed getBundles endpoint 2024-10-23 23:14:21 -04:00
d7e865f84f 🔧 Prettification 2024-10-23 14:26:24 -04:00
baa5a99855 🔧 Removed indirection for getBundles 2024-10-23 13:42:05 -04:00
68c2dacff4 🔧 getBundles endpoint WIP 2024-10-21 18:04:16 -04:00
55e0ce6d36 🖌️ Formatting changes 2024-10-18 13:19:57 -04:00
4ffad69c44 🔧 Todo to move the method from UI 2024-10-16 18:50:14 -04:00
f9438f2129 🔧 Fixing broken AirDCPP search 2024-09-26 21:33:02 -04:00
2247411ac8 🪳 Added kafka to the docker-compose deps 2024-05-28 08:40:33 -04:00
e61ecb1143 🔧 Refactor for docker-compose 2024-05-24 14:22:59 -04:00
e01421f17b 🐳 Added all other deps 2024-05-23 23:15:03 -04:00
cc271021e0 🐳 Created a deps docker-compose stack 2024-05-19 21:19:15 -04:00
cc772504ae 🔧 Fixed the Redis disconnection issue 2024-05-17 01:26:16 -04:00
8dcb17a6a0 🔧 Reverted 2024-05-16 14:15:09 -04:00
a06896ffcc 🔧 Reverting to nats for transporter needs 2024-05-16 14:03:07 -04:00
03f6623ed0 🔧 Fixes 2024-05-15 21:27:38 -05:00
66f9d63b44 🔧 Debuggin Redis connectivity issue 2024-05-15 16:58:49 -05:00
a936df3144 🪲 Added a console.log 2024-05-15 12:13:50 -05:00
dc9dabca48 🔧 Fixed REDIS_URI 2024-05-15 12:00:51 -05:00
4680fd0875 ⬅️ Reverted changes 2024-05-15 11:47:08 -05:00
323548c0ff 🔧 WIP Dockerfile fixes 2024-05-15 11:32:11 -05:00
f4563c12c6 🔧 Added startup scripts fixing MongoDB timeouts 2024-05-12 23:35:01 -04:00
1b0cada848 🏗️ Added validation to db mixin 2024-05-11 13:31:10 -04:00
750a74cd9f Merge pull request #10 from rishighan/automated-download-loop
Automated download loop
2024-05-10 22:59:08 -04:00
402ee4d81b 🏗️ Updated Dockerfile 2024-05-10 22:55:57 -04:00
1fa35ac0e3 🏗️ Automatic downloads endpoint support 2024-05-09 13:49:26 -04:00
680594e67c 🔧 Added wiring for AirDC++ service 2024-04-23 22:47:32 -05:00
5593fcb4a0 🔧 Added DC++ search and download actions 2024-04-17 21:14:48 -05:00
d7f3d3a7cf 🔧 Modified Comic model 2024-04-16 22:41:33 -05:00
11 changed files with 2182 additions and 1683 deletions

View File

@@ -1,40 +1,61 @@
FROM alpine:3.14
# Use a base image with Node.js 22.1.0
FROM node:22.1.0
# Set metadata for contact
LABEL maintainer="Rishi Ghan <rishi.ghan@gmail.com>"
# Show all node logs
# Set environment variables
ENV NPM_CONFIG_LOGLEVEL warn
ENV NODE_ENV=production
# Set the working directory
WORKDIR /core-services
# Install required packages
RUN apt-get update && apt-get install -y \
libvips-tools \
wget \
imagemagick \
python3 \
xvfb \
xz-utils \
curl \
bash \
software-properties-common
RUN apk add --update \
--repository http://nl.alpinelinux.org/alpine/v3.14/main \
vips-tools \
wget \
imagemagick \
python3 \
unrar \
p7zip \
nodejs \
npm \
xvfb \
xz
# Install p7zip
RUN apt-get update && apt-get install -y p7zip
# Install unrar directly from RARLAB
RUN wget https://www.rarlab.com/rar/rarlinux-x64-621.tar.gz \
&& tar -zxvf rarlinux-x64-621.tar.gz \
&& cp rar/unrar /usr/bin/ \
&& rm -rf rarlinux-x64-621.tar.gz rar
# Clean up package lists
RUN rm -rf /var/lib/apt/lists/*
# Verify Node.js installation
RUN node -v && npm -v
# Copy application configuration files
COPY package.json package-lock.json ./
COPY moleculer.config.ts ./
COPY tsconfig.json ./
RUN npm i
# Install Dependncies
# Install application dependencies
RUN npm install
RUN npm install -g typescript ts-node
# Copy the rest of the application files
COPY . .
# Build and cleanup
# Build and clean up
RUN npm run build \
&& npm prune
&& npm prune
# Expose the application's port
EXPOSE 3000
# Start server
CMD ["npm", "start"]
# Command to run the application
CMD ["npm", "start"]

View File

@@ -0,0 +1,103 @@
services:
zoo1:
image: confluentinc/cp-zookeeper:7.3.2
hostname: zoo1
container_name: zoo1
ports:
- "2181:2181"
environment:
ZOOKEEPER_CLIENT_PORT: 2181
ZOOKEEPER_SERVER_ID: 1
ZOOKEEPER_SERVERS: zoo1:2888:3888
networks:
- kafka-net
kafka1:
image: confluentinc/cp-kafka:7.3.2
hostname: kafka1
container_name: kafka1
ports:
- "9092:9092"
- "29092:29092"
- "9999:9999"
environment:
KAFKA_ADVERTISED_LISTENERS: INTERNAL://kafka1:19092,EXTERNAL://${DOCKER_HOST_IP:-127.0.0.1}:9092,DOCKER://host.docker.internal:29092
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: INTERNAL:PLAINTEXT,EXTERNAL:PLAINTEXT,DOCKER:PLAINTEXT
KAFKA_INTER_BROKER_LISTENER_NAME: INTERNAL
KAFKA_ZOOKEEPER_CONNECT: "zoo1:2181"
KAFKA_BROKER_ID: 1
KAFKA_LOG4J_LOGGERS: "kafka.controller=INFO,kafka.producer.async.DefaultEventHandler=INFO,state.change.logger=INFO"
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
KAFKA_JMX_PORT: 9999
KAFKA_JMX_HOSTNAME: ${DOCKER_HOST_IP:-127.0.0.1}
KAFKA_AUTHORIZER_CLASS_NAME: kafka.security.authorizer.AclAuthorizer
KAFKA_ALLOW_EVERYONE_IF_NO_ACL_FOUND: "true"
depends_on:
- zoo1
networks:
- kafka-net
kafka-ui:
container_name: kafka-ui
image: provectuslabs/kafka-ui:latest
ports:
- 8087:8080
environment:
DYNAMIC_CONFIG_ENABLED: true
volumes:
- /Users/rishi/work/config/kafka-ui/config.yml:/etc/kafkaui/dynamic_config.yaml
depends_on:
- kafka1
- zoo1
networks:
- kafka-net
db:
image: "mongo:latest"
container_name: database
networks:
- kafka-net
ports:
- "27017:27017"
volumes:
- "mongodb_data:/bitnami/mongodb"
redis:
image: "bitnami/redis:latest"
container_name: queue
environment:
ALLOW_EMPTY_PASSWORD: "yes"
networks:
- kafka-net
ports:
- "6379:6379"
elasticsearch:
image: docker.elastic.co/elasticsearch/elasticsearch:7.16.2
container_name: elasticsearch
environment:
- "discovery.type=single-node"
- "ES_JAVA_OPTS=-Xms512m -Xmx512m"
- "xpack.security.enabled=true"
- "xpack.security.authc.api_key.enabled=true"
- "ELASTIC_PASSWORD=password"
ulimits:
memlock:
soft: -1
hard: -1
ports:
- "9200:9200"
networks:
- kafka-net
networks:
kafka-net:
driver: bridge
volumes:
mongodb_data:
driver: local
elasticsearch:
driver: local

View File

@@ -54,30 +54,38 @@ const DirectConnectBundleSchema = mongoose.Schema({
name: String,
size: String,
type: {},
_id: false,
});
const wantedSchema = new mongoose.Schema({
source: { type: String, default: null },
markEntireVolumeWanted: Boolean,
issues: {
type: [
{
const wantedSchema = mongoose.Schema(
{
source: { type: String, default: null },
markEntireVolumeWanted: Boolean,
issues: {
type: [
{
_id: false, // Disable automatic ObjectId creation for each issue
id: Number,
url: String,
image: { type: Array, default: [] },
coverDate: String,
issueNumber: String,
},
],
default: null,
},
volume: {
type: {
_id: false, // Disable automatic ObjectId creation for volume
id: Number,
url: String,
image: { type: Array, default: [] },
name: String,
},
],
default: null, // Set default to null for issues
},
volume: {
type: {
id: Number,
url: String,
image: { type: Array, default: [] },
name: String,
default: null,
},
default: null, // Set default to null for volume
},
});
{ _id: false }
); // Disable automatic ObjectId creation for the wanted object itself
const ComicSchema = mongoose.Schema(
{
@@ -93,18 +101,12 @@ const ComicSchema = mongoose.Schema(
},
sourcedMetadata: {
comicInfo: { type: mongoose.Schema.Types.Mixed, default: {} },
comicvine: {
type: Object,
es_indexed: true,
default: {},
},
shortboxed: {},
comicvine: { type: mongoose.Schema.Types.Mixed, default: {} }, // Set as a freeform object
locg: {
type: LOCGSchema,
es_indexed: true,
default: {},
},
gcd: {},
},
rawFileDetails: {
type: RawFileDetailsSchema,

3030
package-lock.json generated

File diff suppressed because it is too large Load Diff

View File

@@ -39,7 +39,7 @@
},
"dependencies": {
"@bluelovers/fast-glob": "https://github.com/rishighan/fast-glob-v2-api.git",
"@elastic/elasticsearch": "^8.6.0",
"@elastic/elasticsearch": "^8.13.1",
"@jorgeferrero/stream-to-buffer": "^2.0.6",
"@npcz/magic": "^1.3.14",
"@root/walk": "^1.1.0",
@@ -48,7 +48,8 @@
"@types/mkdirp": "^1.0.0",
"@types/node": "^13.9.8",
"@types/string-similarity": "^4.0.0",
"axios": "^0.25.0",
"airdcpp-apisocket": "^2.4.4",
"axios": "^1.6.8",
"axios-retry": "^3.2.4",
"bree": "^7.1.5",
"calibre-opds": "^1.0.7",
@@ -74,15 +75,15 @@
"mongoose": "^6.10.4",
"mongoose-paginate-v2": "^1.3.18",
"nats": "^1.3.2",
"opds-extra": "^3.0.9",
"opds-extra": "^3.0.10",
"p7zip-threetwo": "^1.0.4",
"redis": "^4.6.5",
"sanitize-filename-ts": "^1.0.2",
"sharp": "^0.30.4",
"sharp": "^0.33.3",
"threetwo-ui-typings": "^1.0.14",
"through2": "^4.0.2",
"unrar": "^0.2.0",
"xml2js": "^0.4.23"
"xml2js": "^0.6.2"
},
"engines": {
"node": ">= 18.x.x"

156
services/airdcpp.service.ts Normal file
View File

@@ -0,0 +1,156 @@
"use strict";
import {
Context,
Service,
ServiceBroker,
ServiceSchema,
Errors,
} from "moleculer";
import axios from "axios";
import AirDCPPSocket from "../shared/airdcpp.socket";
export default class AirDCPPService extends Service {
// @ts-ignore
public constructor(
public broker: ServiceBroker,
schema: ServiceSchema<{}> = { name: "airdcpp" }
) {
super(broker);
this.parseServiceSchema({
name: "airdcpp",
mixins: [],
hooks: {},
actions: {
initialize: {
rest: "POST /initialize",
handler: async (
ctx: Context<{
host: {
hostname: string;
port: string;
protocol: string;
username: string;
password: string;
};
}>
) => {
try {
const {
host: {
hostname,
protocol,
port,
username,
password,
},
} = ctx.params;
const airDCPPSocket = new AirDCPPSocket({
protocol,
hostname: `${hostname}:${port}`,
username,
password,
});
return await airDCPPSocket.connect();
} catch (err) {
console.error(err);
}
},
},
getHubs: {
rest: "POST /getHubs",
timeout: 70000,
handler: async (
ctx: Context<{
host: {
hostname: string;
port: string;
protocol: string;
username: string;
password: string;
};
}>
) => {
const {
host: {
hostname,
port,
protocol,
username,
password,
},
} = ctx.params;
try {
const airDCPPSocket = new AirDCPPSocket({
protocol,
hostname: `${hostname}:${port}`,
username,
password,
});
await airDCPPSocket.connect();
return await airDCPPSocket.get(`hubs`);
} catch (err) {
throw err;
}
},
},
search: {
rest: "POST /search",
timeout: 20000,
handler: async (
ctx: Context<{
host: {
hostname;
port;
protocol;
username;
password;
};
dcppSearchQuery;
}>
) => {
try {
const {
host: {
hostname,
port,
protocol,
username,
password,
},
dcppSearchQuery,
} = ctx.params;
const airDCPPSocket = new AirDCPPSocket({
protocol,
hostname: `${hostname}:${port}`,
username,
password,
});
await airDCPPSocket.connect();
const searchInstance = await airDCPPSocket.post(
`search`
);
// Post the search
const searchInfo = await airDCPPSocket.post(
`search/${searchInstance.id}/hub_search`,
dcppSearchQuery
);
await this.sleep(10000);
const results = await airDCPPSocket.get(
`search/${searchInstance.id}/results/0/5`
);
return results;
} catch (err) {
throw err;
}
},
},
},
methods: {
sleep: (ms: number) => {
return new Promise((resolve) => setTimeout(resolve, ms));
},
},
});
}
}

View File

@@ -57,6 +57,7 @@ const through2 = require("through2");
import klaw from "klaw";
import path from "path";
import { COMICS_DIRECTORY, USERDATA_DIRECTORY } from "../constants/directories";
import AirDCPPSocket from "../shared/airdcpp.socket";
console.log(`MONGO -> ${process.env.MONGO_URI}`);
export default class ImportService extends Service {
@@ -248,10 +249,7 @@ export default class ImportService extends Service {
payload: {
_id?: string;
sourcedMetadata: {
comicvine?: {
volume: { api_detail_url: string };
volumeInformation: {};
};
comicvine?: any;
locg?: {};
};
inferredMetadata: {
@@ -262,7 +260,7 @@ export default class ImportService extends Service {
};
wanted: {
issues: [];
volume: {};
volume: { id: number };
source: string;
markEntireVolumeWanted: Boolean;
};
@@ -275,41 +273,109 @@ export default class ImportService extends Service {
}>
) {
try {
let volumeDetails;
const comicMetadata = ctx.params.payload;
console.log(
JSON.stringify(ctx.params.payload, null, 4)
);
const { payload } = ctx.params;
const { wanted } = payload;
console.log("Saving to Mongo...");
console.log(
`Import type: [${ctx.params.importType}]`
);
switch (ctx.params.importType) {
case "new":
console.log(comicMetadata);
return await Comic.create(comicMetadata);
case "update":
return await Comic.findOneAndUpdate(
{
"acquisition.directconnect.downloads.bundleId":
ctx.params.bundleId,
},
comicMetadata,
{
upsert: true,
new: true,
}
);
default:
return false;
if (
!wanted ||
!wanted.volume ||
!wanted.volume.id
) {
console.log(
"No valid identifier for upsert. Attempting to create a new document with minimal data..."
);
const newDocument = new Comic(payload); // Using the entire payload for the new document
await newDocument.save();
return {
success: true,
message:
"New document created due to lack of valid identifiers.",
data: newDocument,
};
}
let condition = {
"wanted.volume.id": wanted.volume.id,
};
let update: any = {
// Using 'any' to bypass strict type checks; alternatively, define a more accurate type
$set: {
rawFileDetails: payload.rawFileDetails,
inferredMetadata: payload.inferredMetadata,
sourcedMetadata: payload.sourcedMetadata,
},
$setOnInsert: {
"wanted.source": payload.wanted.source,
"wanted.markEntireVolumeWanted":
payload.wanted.markEntireVolumeWanted,
"wanted.volume": payload.wanted.volume,
},
};
if (wanted.issues && wanted.issues.length > 0) {
update.$addToSet = {
"wanted.issues": { $each: wanted.issues },
};
}
const options = {
upsert: true,
new: true,
};
const result = await Comic.findOneAndUpdate(
condition,
update,
options
);
console.log(
"Operation completed. Document updated or inserted:",
result
);
return {
success: true,
message: "Document successfully upserted.",
data: result,
};
} catch (error) {
console.log(error);
throw new Errors.MoleculerError(
"Import failed.",
"Operation failed.",
500
);
}
},
},
getComicsMarkedAsWanted: {
rest: "GET /getComicsMarkedAsWanted",
handler: async (ctx: Context<{}>) => {
try {
// Query to find comics where 'markEntireVolumeAsWanted' is true or 'issues' array is not empty
const wantedComics = await Comic.find({
wanted: { $exists: true },
$or: [
{ "wanted.markEntireVolumeWanted": true },
{ "wanted.issues": { $not: { $size: 0 } } },
],
});
console.log(wantedComics); // Output the found comics
return wantedComics;
} catch (error) {
console.error("Error finding comics:", error);
throw error;
}
},
},
applyComicVineMetadata: {
rest: "POST /applyComicVineMetadata",
params: {},
@@ -486,7 +552,9 @@ export default class ImportService extends Service {
params: { id: "string" },
async handler(ctx: Context<{ id: string }>) {
console.log(ctx.params.id);
return await Comic.findById(ctx.params.id);
return await Comic.findById(
new ObjectId(ctx.params.id)
);
},
},
getComicBooksByIds: {
@@ -705,6 +773,48 @@ export default class ImportService extends Service {
},
},
// This method belongs in library service,
// because bundles can only exist for comics _in the library_
// (wanted or imported)
getBundles: {
rest: "POST /getBundles",
params: {},
handler: async (
ctx: Context<{
comicObjectId: string;
config: any;
}>
) => {
try {
// 1. Get the comic object Id
const { config } = ctx.params;
const comicObject = await Comic.findById(
new ObjectId(ctx.params.comicObjectId)
);
// 2. Init AirDC++
const ADCPPSocket = new AirDCPPSocket(config);
await ADCPPSocket.connect();
// 3. Get the bundles for the comic object
if (comicObject) {
// make the call to get the bundles from AirDC++ using the bundleId
const bundles =
comicObject.acquisition.directconnect.downloads.map(
async (bundle) => {
return await ADCPPSocket.get(
`queue/bundles/${bundle.bundleId}`
);
}
);
return Promise.all(bundles);
}
} catch (error) {
throw new Errors.MoleculerError(
"Couldn't fetch bundles from AirDC++",
500
);
}
},
},
flushDB: {
rest: "POST /flushDB",
params: {},

View File

@@ -76,6 +76,7 @@ export default class SettingsService extends Service {
}>
) {
try {
console.log(ctx.params);
let query = {};
const { settingsKey, settingsObjectId } =
ctx.params;

View File

@@ -8,6 +8,7 @@ import { pubClient, subClient } from "../config/redis.config";
const { MoleculerError } = require("moleculer").Errors;
const SocketIOService = require("moleculer-io");
const { v4: uuidv4 } = require("uuid");
import AirDCPPSocket from "../shared/airdcpp.socket";
export default class SocketService extends Service {
// @ts-ignore
@@ -23,10 +24,12 @@ export default class SocketService extends Service {
port: process.env.PORT || 3001,
io: {
namespaces: {
"/": {
"/automated": {
events: {
call: {
whitelist: ["socket.*"],
whitelist: [
"socket.*", // Allow 'search' in the automated namespace
],
},
},
},
@@ -59,12 +62,8 @@ export default class SocketService extends Service {
if (active > 0 || paused > 0 || waiting > 0) {
// 3. Get job counts
const completedJobCount = await pubClient.get(
"completedJobCount"
);
const failedJobCount = await pubClient.get(
"failedJobCount"
);
const completedJobCount = await pubClient.get("completedJobCount");
const failedJobCount = await pubClient.get("failedJobCount");
// 4. Send the counts to the active socket.io session
await this.broker.call("socket.broadcast", {
@@ -81,14 +80,9 @@ export default class SocketService extends Service {
}
}
} catch (err) {
throw new MoleculerError(
err,
500,
"SESSION_ID_NOT_FOUND",
{
data: sessionId,
}
);
throw new MoleculerError(err, 500, "SESSION_ID_NOT_FOUND", {
data: sessionId,
});
}
},
@@ -99,11 +93,7 @@ export default class SocketService extends Service {
}>
) => {
const { queueAction } = ctx.params;
await this.broker.call(
"jobqueue.toggle",
{ action: queueAction },
{}
);
await this.broker.call("jobqueue.toggle", { action: queueAction }, {});
},
importSingleIssue: async (ctx: Context<{}>) => {
console.info("AirDC++ finished a download -> ");
@@ -114,8 +104,178 @@ export default class SocketService extends Service {
// {}
// );
},
search: {
params: {
query: "object",
config: "object",
},
async handler(ctx) {
const { query, config, namespace } = ctx.params;
const namespacedInstance = this.io.of(namespace || "/");
const ADCPPSocket = new AirDCPPSocket(config);
try {
await ADCPPSocket.connect();
const instance = await ADCPPSocket.post("search", query);
// Send the instance to the client
await namespacedInstance.emit("searchInitiated", {
instance,
});
// Setting up listeners
await ADCPPSocket.addListener(
`search`,
`search_result_added`,
(groupedResult, entityId: number) => {
namespacedInstance.emit("searchResultAdded", {
groupedResult: { entityId, payload: groupedResult?.result },
});
},
instance.id
);
await ADCPPSocket.addListener(
`search`,
`search_result_updated`,
(updatedResult, entityId: number) => {
namespacedInstance.emit("searchResultUpdated", {
updatedResult: { entityId, payload: updatedResult?.result },
});
},
instance.id
);
await ADCPPSocket.addListener(
`search`,
`search_hub_searches_sent`,
async (searchInfo) => {
await this.sleep(5000);
const currentInstance = await ADCPPSocket.get(
`search/${instance.id}`
);
// Send the instance to the client
await namespacedInstance.emit("searchesSent", {
searchInfo,
});
if (currentInstance.result_count === 0) {
console.log("No more search results.");
namespacedInstance.emit("searchComplete", {
message: "No more search results.",
});
}
},
instance.id
);
// Perform the actual search
await ADCPPSocket.post(`search/${instance.id}/hub_search`, query);
} catch (error) {
await namespacedInstance.emit("searchError", error.message);
throw new MoleculerError("Search failed", 500, "SEARCH_FAILED", {
error,
});
} finally {
// await ADCPPSocket.disconnect();
}
},
},
download: {
// params: {
// searchInstanceId: "string",
// resultId: "string",
// comicObjectId: "string",
// name: "string",
// size: "number",
// type: "any", // Define more specific type if possible
// config: "object",
// },
async handler(ctx) {
console.log(ctx.params);
const {
searchInstanceId,
resultId,
config,
comicObjectId,
name,
size,
type,
} = ctx.params;
const ADCPPSocket = new AirDCPPSocket(config);
try {
await ADCPPSocket.connect();
const downloadResult = await ADCPPSocket.post(
`search/${searchInstanceId}/results/${resultId}/download`
);
if (downloadResult && downloadResult.bundle_info) {
// Assume bundle_info is part of the response and contains the necessary details
const bundleDBImportResult = await ctx.call(
"library.applyAirDCPPDownloadMetadata",
{
bundleId: downloadResult.bundle_info.id,
comicObjectId,
name,
size,
type,
}
);
this.logger.info(
"Download and metadata update successful",
bundleDBImportResult
);
this.broker.emit("downloadCompleted", bundleDBImportResult);
return bundleDBImportResult;
} else {
throw new Error(
"Failed to download or missing download result information"
);
}
} catch (error) {
this.broker.emit("downloadError", error.message);
throw new MoleculerError("Download failed", 500, "DOWNLOAD_FAILED", {
error,
});
} finally {
// await ADCPPSocket.disconnect();
}
},
},
listenBundleTick: {
async handler(ctx) {
const { config } = ctx.params;
const ADCPPSocket = new AirDCPPSocket(config);
try {
await ADCPPSocket.connect();
console.log("Connected to AirDCPP successfully.");
ADCPPSocket.addListener(
"queue",
"queue_bundle_tick",
(tickData) => {
console.log("Received tick data: ", tickData);
this.io.emit("bundleTickUpdate", tickData);
},
null
); // Assuming no specific ID is needed here
} catch (error) {
console.error(
"Error connecting to AirDCPP or setting listener:",
error
);
throw error;
}
},
},
},
methods: {
sleep: (ms: number): Promise<NodeJS.Timeout> => {
return new Promise((resolve) => setTimeout(resolve, ms));
},
},
methods: {},
async started() {
this.io.on("connection", async (socket) => {
console.log(

View File

@@ -15,7 +15,7 @@ export default class ImageTransformation extends Service {
// @ts-ignore
public constructor(
public broker: ServiceBroker,
schema: ServiceSchema<{}> = { name: "imagetransformation" }
schema: ServiceSchema<{}> = { name: "torrentjobs" }
) {
super(broker);
this.parseServiceSchema({
@@ -77,7 +77,7 @@ export default class ImageTransformation extends Service {
"qbittorrent.getTorrentRealTimeStats",
{ infoHashes }
);
// 4. Emit the LS_COVER_EXTRACTION_FAILED event with the necessary details
// 4.
await this.broker.call("socket.broadcast", {
namespace: "/",
event: "AS_TORRENT_DATA",

73
shared/airdcpp.socket.ts Normal file
View File

@@ -0,0 +1,73 @@
const WebSocket = require("ws");
const { Socket } = require("airdcpp-apisocket");
class AirDCPPSocket {
// Explicitly declare properties
options; // Holds configuration options
socketInstance; // Instance of the AirDCPP Socket
constructor(configuration: any) {
let socketProtocol = configuration.protocol === "https" ? "wss" : "ws";
this.options = {
url: `${socketProtocol}://${configuration.hostname}/api/v1/`,
autoReconnect: true,
reconnectInterval: 5000, // milliseconds
logLevel: "verbose",
ignoredListenerEvents: [
"transfer_statistics",
"hash_statistics",
"hub_counts_updated",
],
username: configuration.username,
password: configuration.password,
};
// Initialize the socket instance using the configured options and WebSocket
this.socketInstance = Socket(this.options, WebSocket);
}
// Method to ensure the socket connection is established if required by the library or implementation logic
async connect() {
// Here we'll check if a connect method exists and call it
if (
this.socketInstance &&
typeof this.socketInstance.connect === "function"
) {
const sessionInformation = await this.socketInstance.connect();
return sessionInformation;
}
}
// Method to ensure the socket is disconnected properly if required by the library or implementation logic
async disconnect() {
// Similarly, check if a disconnect method exists and call it
if (
this.socketInstance &&
typeof this.socketInstance.disconnect === "function"
) {
await this.socketInstance.disconnect();
}
}
// Method to post data to an endpoint
async post(endpoint: any, data: any = {}) {
// Call post on the socket instance, assuming post is a valid method of the socket instance
return await this.socketInstance.post(endpoint, data);
}
async get(endpoint: any, data: any = {}) {
// Call post on the socket instance, assuming post is a valid method of the socket instance
return await this.socketInstance.get(endpoint, data);
}
// Method to add listeners to the socket instance for handling real-time updates or events
async addListener(event: any, handlerName: any, callback: any, id: any) {
// Attach a listener to the socket instance
return await this.socketInstance.addListener(
event,
handlerName,
callback,
id
);
}
}
export default AirDCPPSocket;