🔧 Added DC++ search and download actions

This commit is contained in:
2024-04-17 21:14:48 -05:00
parent d7f3d3a7cf
commit 5593fcb4a0
4 changed files with 936 additions and 350 deletions

1008
package-lock.json generated

File diff suppressed because it is too large Load Diff

View File

@@ -39,7 +39,7 @@
},
"dependencies": {
"@bluelovers/fast-glob": "https://github.com/rishighan/fast-glob-v2-api.git",
"@elastic/elasticsearch": "^8.6.0",
"@elastic/elasticsearch": "^8.13.1",
"@jorgeferrero/stream-to-buffer": "^2.0.6",
"@npcz/magic": "^1.3.14",
"@root/walk": "^1.1.0",
@@ -48,7 +48,8 @@
"@types/mkdirp": "^1.0.0",
"@types/node": "^13.9.8",
"@types/string-similarity": "^4.0.0",
"axios": "^0.25.0",
"airdcpp-apisocket": "^2.4.4",
"axios": "^1.6.8",
"axios-retry": "^3.2.4",
"bree": "^7.1.5",
"calibre-opds": "^1.0.7",
@@ -74,15 +75,15 @@
"mongoose": "^6.10.4",
"mongoose-paginate-v2": "^1.3.18",
"nats": "^1.3.2",
"opds-extra": "^3.0.9",
"opds-extra": "^3.0.10",
"p7zip-threetwo": "^1.0.4",
"redis": "^4.6.5",
"sanitize-filename-ts": "^1.0.2",
"sharp": "^0.30.4",
"sharp": "^0.33.3",
"threetwo-ui-typings": "^1.0.14",
"through2": "^4.0.2",
"unrar": "^0.2.0",
"xml2js": "^0.4.23"
"xml2js": "^0.6.2"
},
"engines": {
"node": ">= 18.x.x"

View File

@@ -8,6 +8,7 @@ import { pubClient, subClient } from "../config/redis.config";
const { MoleculerError } = require("moleculer").Errors;
const SocketIOService = require("moleculer-io");
const { v4: uuidv4 } = require("uuid");
import AirDCPPSocket from "../shared/airdcpp.socket";
export default class SocketService extends Service {
// @ts-ignore
@@ -114,8 +115,200 @@ export default class SocketService extends Service {
// {}
// );
},
// AirDCPP Socket actions
search: {
params: {
query: "object",
config: "object",
},
async handler(ctx) {
const { query, config } = ctx.params;
const ADCPPSocket = new AirDCPPSocket(config);
try {
await ADCPPSocket.connect();
const instance = await ADCPPSocket.post(
"search",
query
);
// Send the instance to the client
await this.io.emit("searchInitiated", {
instance,
});
// Setting up listeners
await ADCPPSocket.addListener(
`search`,
`search_result_added`,
(groupedResult) => {
this.io.emit(
"searchResultAdded",
groupedResult
);
},
instance.id
);
await ADCPPSocket.addListener(
`search`,
`search_result_updated`,
(updatedResult) => {
console.log("hi", updatedResult);
this.io.emit(
"searchResultUpdated",
updatedResult
);
},
instance.id
);
await ADCPPSocket.addListener(
`search`,
`search_hub_searches_sent`,
async (searchInfo) => {
await this.sleep(5000);
const currentInstance =
await ADCPPSocket.get(
`search/${instance.id}`
);
// Send the instance to the client
await this.io.emit("searchesSent", {
searchInfo,
});
if (currentInstance.result_count === 0) {
console.log("No more search results.");
this.io.emit("searchComplete", {
message: "No more search results.",
});
}
},
instance.id
);
// Perform the actual search
await ADCPPSocket.post(
`search/${instance.id}/hub_search`,
query
);
} catch (error) {
await this.io.emit("searchError", error.message);
throw new MoleculerError(
"Search failed",
500,
"SEARCH_FAILED",
{ error }
);
} finally {
// await ADCPPSocket.disconnect();
}
},
},
download: {
// params: {
// searchInstanceId: "string",
// resultId: "string",
// comicObjectId: "string",
// name: "string",
// size: "number",
// type: "any", // Define more specific type if possible
// config: "object",
// },
async handler(ctx) {
console.log(ctx.params);
const {
searchInstanceId,
resultId,
config,
comicObjectId,
name,
size,
type,
} = ctx.params;
const ADCPPSocket = new AirDCPPSocket(config);
try {
await ADCPPSocket.connect();
const downloadResult = await ADCPPSocket.post(
`search/${searchInstanceId}/results/${resultId}/download`
);
if (downloadResult && downloadResult.bundle_info) {
// Assume bundle_info is part of the response and contains the necessary details
const bundleDBImportResult = await ctx.call(
"library.applyAirDCPPDownloadMetadata",
{
bundleId: downloadResult.bundle_info.id,
comicObjectId,
name,
size,
type,
}
);
this.logger.info(
"Download and metadata update successful",
bundleDBImportResult
);
this.broker.emit(
"downloadCompleted",
bundleDBImportResult
);
return bundleDBImportResult;
} else {
throw new Error(
"Failed to download or missing download result information"
);
}
} catch (error) {
this.broker.emit("downloadError", error.message);
throw new MoleculerError(
"Download failed",
500,
"DOWNLOAD_FAILED",
{ error }
);
} finally {
// await ADCPPSocket.disconnect();
}
},
},
listenBundleTick: {
async handler(ctx) {
const { config } = ctx.params;
const ADCPPSocket = new AirDCPPSocket(config);
try {
await ADCPPSocket.connect();
console.log("Connected to AirDCPP successfully.");
ADCPPSocket.addListener(
"queue",
"queue_bundle_tick",
(tickData) => {
console.log(
"Received tick data: ",
tickData
);
this.io.emit("bundleTickUpdate", tickData);
},
null
); // Assuming no specific ID is needed here
} catch (error) {
console.error(
"Error connecting to AirDCPP or setting listener:",
error
);
throw error;
}
},
},
},
methods: {
sleep: (ms: number): Promise<NodeJS.Timeout> => {
return new Promise((resolve) => setTimeout(resolve, ms));
},
},
methods: {},
async started() {
this.io.on("connection", async (socket) => {
console.log(

72
shared/airdcpp.socket.ts Normal file
View File

@@ -0,0 +1,72 @@
const WebSocket = require("ws");
const { Socket } = require("airdcpp-apisocket");
class AirDCPPSocket {
// Explicitly declare properties
options; // Holds configuration options
socketInstance; // Instance of the AirDCPP Socket
constructor(configuration: any) {
let socketProtocol = configuration.protocol === "https" ? "wss" : "ws";
this.options = {
url: `${socketProtocol}://${configuration.hostname}/api/v1/`,
autoReconnect: true,
reconnectInterval: 5000, // milliseconds
logLevel: "verbose",
ignoredListenerEvents: [
"transfer_statistics",
"hash_statistics",
"hub_counts_updated",
],
username: configuration.username,
password: configuration.password,
};
// Initialize the socket instance using the configured options and WebSocket
this.socketInstance = Socket(this.options, WebSocket);
}
// Method to ensure the socket connection is established if required by the library or implementation logic
async connect() {
// Here we'll check if a connect method exists and call it
if (
this.socketInstance &&
typeof this.socketInstance.connect === "function"
) {
await this.socketInstance.connect();
}
}
// Method to ensure the socket is disconnected properly if required by the library or implementation logic
async disconnect() {
// Similarly, check if a disconnect method exists and call it
if (
this.socketInstance &&
typeof this.socketInstance.disconnect === "function"
) {
await this.socketInstance.disconnect();
}
}
// Method to post data to an endpoint
async post(endpoint: any, data: any = {}) {
// Call post on the socket instance, assuming post is a valid method of the socket instance
return await this.socketInstance.post(endpoint, data);
}
async get(endpoint: any, data: any = {}) {
// Call post on the socket instance, assuming post is a valid method of the socket instance
return await this.socketInstance.get(endpoint, data);
}
// Method to add listeners to the socket instance for handling real-time updates or events
async addListener(event: any, handlerName: any, callback: any, id: any) {
// Attach a listener to the socket instance
return await this.socketInstance.addListener(
event,
handlerName,
callback,
id
);
}
}
export default AirDCPPSocket;