🔧 Fixing autodownload functionality

This commit is contained in:
2024-12-22 21:59:49 -05:00
parent e0954eb3e8
commit cd9ea85b80
2 changed files with 62 additions and 53 deletions

View File

@@ -38,19 +38,19 @@
"eslint-plugin-import": "^2.26.0", "eslint-plugin-import": "^2.26.0",
"eslint-plugin-jest": "^27.1.6", "eslint-plugin-jest": "^27.1.6",
"jest": "^29.3.1", "jest": "^29.3.1",
"lodash": "^4.17.21",
"moleculer-repl": "^0.7.3", "moleculer-repl": "^0.7.3",
"prettier": "^2.8.0", "prettier": "^2.8.0",
"qbittorrent-api-v2": "^1.2.2", "qbittorrent-api-v2": "^1.2.2",
"socket.io-client": "^4.7.5",
"ts-jest": "^29.0.3", "ts-jest": "^29.0.3",
"ts-node": "^10.9.1", "ts-node": "^10.9.1",
"typescript": "^4.9.3" "typescript": "^4.9.3"
}, },
"dependencies": { "dependencies": {
"lodash": "^4.17.21",
"@robertklep/qbittorrent": "^1.0.1", "@robertklep/qbittorrent": "^1.0.1",
"ioredis": "^5.0.0", "ioredis": "^5.0.0",
"kafkajs": "^2.2.4", "kafkajs": "^2.2.4",
"socket.io-client": "^4.7.5",
"moleculer": "^0.14.34", "moleculer": "^0.14.34",
"moleculer-web": "^0.10.7", "moleculer-web": "^0.10.7",
"parse-torrent": "^9.1.5", "parse-torrent": "^9.1.5",

View File

@@ -1,35 +1,31 @@
"use strict"; import type { EachMessagePayload } from "kafkajs";
import { Service, ServiceBroker, ServiceSchema } from "moleculer"; import { Kafka, logLevel } from "kafkajs";
import { Kafka, EachMessagePayload, logLevel } from "kafkajs";
import io from "socket.io-client";
import { isNil, isUndefined } from "lodash"; import { isNil, isUndefined } from "lodash";
import type { ServiceBroker, ServiceSchema } from "moleculer";
import { Service } from "moleculer";
import io from "socket.io-client";
import stringSimilarity from "string-similarity-alg"; import stringSimilarity from "string-similarity-alg";
interface SearchResult { interface SearchResult {
result: { groupedResult: { entityId: number; payload: any };
id: string; updatedResult: { entityId: number; payload: any };
// Add other relevant fields
};
search_id: string;
// Add other relevant fields
}
interface SearchResultPayload {
groupedResult: SearchResult;
updatedResult: SearchResult;
instanceId: string;
} }
export default class ComicProcessorService extends Service { export default class ComicProcessorService extends Service {
private kafkaConsumer: any; private kafkaConsumer: any;
private socketIOInstance: any; private socketIOInstance: any;
private kafkaProducer: any; private kafkaProducer: any;
private prowlarrResultsMap: Map<string, any> = new Map(); private prowlarrResultsMap: Map<string, any> = new Map();
private airDCPPSearchResults: Map<number, any[]> = new Map(); private airDCPPSearchResults: Map<number, any[]> = new Map();
private issuesToSearch: any = []; private issuesToSearch: any = [];
// @ts-ignore // @ts-ignore: schema parameter is required by Service constructor
public constructor( constructor(
public broker: ServiceBroker, public broker: ServiceBroker,
schema: ServiceSchema<{}> = { name: "comicProcessor" }, schema: ServiceSchema<{}> = { name: "comicProcessor" },
) { ) {
@@ -122,10 +118,10 @@ export default class ComicProcessorService extends Service {
await this.broker.call("socket.search", { await this.broker.call("socket.search", {
query: dcppSearchQuery, query: dcppSearchQuery,
config: { config: {
hostname: "localhost:5600", hostname: "192.168.1.119:5600",
protocol: "http", protocol: "http",
username: "user", username: "admin",
password: "pass", password: "password",
}, },
namespace: "/automated", namespace: "/automated",
}); });
@@ -157,7 +153,7 @@ export default class ComicProcessorService extends Service {
this.logger.error("Error processing job:", error); this.logger.error("Error processing job:", error);
} }
}, },
produceResultsToKafka: async (query: string, result: any[]) => { produceResultsToKafka: async (query: string, result: any[]): Promise<void> => {
try { try {
/* /*
Match and rank Match and rank
@@ -166,10 +162,12 @@ export default class ComicProcessorService extends Service {
this.airDCPPSearchResults, this.airDCPPSearchResults,
query, query,
); );
console.log(JSON.stringify(result, null, 4));
console.log("majori"); console.log("majori");
console.log(result);
/* /*
Kafka messages need to be in a format that can be serialized to JSON, and a Map is not directly serializable in a way that retains its structure, hence why we use Object.fromEntries Kafka messages need to be in a format that can be serialized to JSON,
and a Map is not directly serializable in a way that retains its structure,
hence why we use Object.fromEntries
*/ */
await this.kafkaProducer.send({ await this.kafkaProducer.send({
topic: "comic-search-results", topic: "comic-search-results",
@@ -247,48 +245,59 @@ export default class ComicProcessorService extends Service {
}); });
// Handle searchResultAdded event // Handle searchResultAdded event
this.socketIOInstance.on("searchResultAdded", ({ result }: SearchResult) => { this.socketIOInstance.on("searchResultAdded", (result: SearchResult) => {
console.log(result); const {
groupedResult: { entityId, payload },
} = result;
this.logger.info( this.logger.info(
"AirDC++ Search result added:", `AirDC++ Search result added for entityId: ${entityId} - ${payload?.name}`,
JSON.stringify(result, null, 4),
); );
// if (!this.airDCPPSearchResults.has(instanceId)) { if (!this.airDCPPSearchResults.has(entityId)) {
// this.airDCPPSearchResults.set(instanceId, []); this.airDCPPSearchResults.set(entityId, []);
// } }
// if (!isUndefined(groupedResult?.result)) { if (!isNil(payload)) {
// this.airDCPPSearchResults.get(instanceId).push(groupedResult.result); this.airDCPPSearchResults.get(entityId).push(payload);
// } }
console.log(typeof entityId, entityId);
console.log(entityId);
console.log(
"Updated airDCPPSearchResults:",
JSON.stringify(Array.from(this.airDCPPSearchResults.entries()), null, 4),
);
console.log(JSON.stringify(payload, null, 4));
}); });
// Handle searchResultUpdated event // Handle searchResultUpdated event
this.socketIOInstance.on( this.socketIOInstance.on("searchResultUpdated", (result: SearchResult) => {
"searchResultUpdated", const {
async ({ updatedResult, instanceId }: SearchResultPayload) => { updatedResult: { entityId, payload },
const resultsForInstance = this.airDCPPSearchResults.get(instanceId); } = result;
const resultsForInstance = this.airDCPPSearchResults.get(entityId);
if (resultsForInstance) { if (resultsForInstance) {
const toReplaceIndex = resultsForInstance.findIndex( const toReplaceIndex = resultsForInstance.findIndex((element: any) => {
(element: any) => element.id === updatedResult.result.id, console.log("search result updated!");
); console.log(JSON.stringify(element, null, 4));
return element.id === payload.id;
});
if (toReplaceIndex !== -1) { if (toReplaceIndex !== -1) {
// Replace the existing result with the updated result // Replace the existing result with the updated result
resultsForInstance[toReplaceIndex] = updatedResult.result; resultsForInstance[toReplaceIndex] = payload;
// Optionally, update the map with the modified array // Optionally, update the map with the modified array
this.airDCPPSearchResults.set(instanceId, resultsForInstance); this.airDCPPSearchResults.set(entityId, resultsForInstance);
} }
} }
}, });
);
// Handle searchComplete event // Handle searchComplete event
this.socketIOInstance.on("searchesSent", async (data: any) => { this.socketIOInstance.on("searchesSent", async (data: any) => {
this.logger.info( this.logger.info(
`Search complete for query: "${data.searchInfo.query.pattern}"`, `Search complete for query: "${data.searchInfo.query.pattern}"`,
); );
await this.produceResultsToKafka(data.searchInfo.query.pattern); await this.produceResultsToKafka(data.searchInfo.query.pattern);
}); });
}, },