From cd9ea85b8057740020f1632f0712fe6f918a105c Mon Sep 17 00:00:00 2001 From: Rishi Ghan Date: Sun, 22 Dec 2024 21:59:49 -0500 Subject: [PATCH] =?UTF-8?q?=F0=9F=94=A7=20Fixing=20autodownload=20function?= =?UTF-8?q?ality?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- package.json | 4 +- services/comicprocessor.service.ts | 111 ++++++++++++++++------------- 2 files changed, 62 insertions(+), 53 deletions(-) diff --git a/package.json b/package.json index bb2000e..61087fa 100644 --- a/package.json +++ b/package.json @@ -38,19 +38,19 @@ "eslint-plugin-import": "^2.26.0", "eslint-plugin-jest": "^27.1.6", "jest": "^29.3.1", - "lodash": "^4.17.21", "moleculer-repl": "^0.7.3", "prettier": "^2.8.0", "qbittorrent-api-v2": "^1.2.2", - "socket.io-client": "^4.7.5", "ts-jest": "^29.0.3", "ts-node": "^10.9.1", "typescript": "^4.9.3" }, "dependencies": { + "lodash": "^4.17.21", "@robertklep/qbittorrent": "^1.0.1", "ioredis": "^5.0.0", "kafkajs": "^2.2.4", + "socket.io-client": "^4.7.5", "moleculer": "^0.14.34", "moleculer-web": "^0.10.7", "parse-torrent": "^9.1.5", diff --git a/services/comicprocessor.service.ts b/services/comicprocessor.service.ts index 89a1211..d23d3a9 100644 --- a/services/comicprocessor.service.ts +++ b/services/comicprocessor.service.ts @@ -1,35 +1,31 @@ -"use strict"; -import { Service, ServiceBroker, ServiceSchema } from "moleculer"; -import { Kafka, EachMessagePayload, logLevel } from "kafkajs"; -import io from "socket.io-client"; +import type { EachMessagePayload } from "kafkajs"; +import { Kafka, logLevel } from "kafkajs"; import { isNil, isUndefined } from "lodash"; +import type { ServiceBroker, ServiceSchema } from "moleculer"; +import { Service } from "moleculer"; +import io from "socket.io-client"; import stringSimilarity from "string-similarity-alg"; interface SearchResult { - result: { - id: string; - // Add other relevant fields - }; - search_id: string; - // Add other relevant fields -} - -interface SearchResultPayload { - groupedResult: SearchResult; - updatedResult: SearchResult; - instanceId: string; + groupedResult: { entityId: number; payload: any }; + updatedResult: { entityId: number; payload: any }; } export default class ComicProcessorService extends Service { private kafkaConsumer: any; + private socketIOInstance: any; + private kafkaProducer: any; + private prowlarrResultsMap: Map = new Map(); + private airDCPPSearchResults: Map = new Map(); + private issuesToSearch: any = []; - // @ts-ignore - public constructor( + // @ts-ignore: schema parameter is required by Service constructor + constructor( public broker: ServiceBroker, schema: ServiceSchema<{}> = { name: "comicProcessor" }, ) { @@ -122,10 +118,10 @@ export default class ComicProcessorService extends Service { await this.broker.call("socket.search", { query: dcppSearchQuery, config: { - hostname: "localhost:5600", + hostname: "192.168.1.119:5600", protocol: "http", - username: "user", - password: "pass", + username: "admin", + password: "password", }, namespace: "/automated", }); @@ -157,7 +153,7 @@ export default class ComicProcessorService extends Service { this.logger.error("Error processing job:", error); } }, - produceResultsToKafka: async (query: string, result: any[]) => { + produceResultsToKafka: async (query: string, result: any[]): Promise => { try { /* Match and rank @@ -166,10 +162,12 @@ export default class ComicProcessorService extends Service { this.airDCPPSearchResults, query, ); + console.log(JSON.stringify(result, null, 4)); console.log("majori"); - console.log(result); /* - Kafka messages need to be in a format that can be serialized to JSON, and a Map is not directly serializable in a way that retains its structure, hence why we use Object.fromEntries + Kafka messages need to be in a format that can be serialized to JSON, + and a Map is not directly serializable in a way that retains its structure, + hence why we use Object.fromEntries */ await this.kafkaProducer.send({ topic: "comic-search-results", @@ -247,48 +245,59 @@ export default class ComicProcessorService extends Service { }); // Handle searchResultAdded event - this.socketIOInstance.on("searchResultAdded", ({ result }: SearchResult) => { - console.log(result); + this.socketIOInstance.on("searchResultAdded", (result: SearchResult) => { + const { + groupedResult: { entityId, payload }, + } = result; + this.logger.info( - "AirDC++ Search result added:", - JSON.stringify(result, null, 4), + `AirDC++ Search result added for entityId: ${entityId} - ${payload?.name}`, ); - // if (!this.airDCPPSearchResults.has(instanceId)) { - // this.airDCPPSearchResults.set(instanceId, []); - // } - // if (!isUndefined(groupedResult?.result)) { - // this.airDCPPSearchResults.get(instanceId).push(groupedResult.result); - // } + if (!this.airDCPPSearchResults.has(entityId)) { + this.airDCPPSearchResults.set(entityId, []); + } + if (!isNil(payload)) { + this.airDCPPSearchResults.get(entityId).push(payload); + } + + console.log(typeof entityId, entityId); + console.log(entityId); + console.log( + "Updated airDCPPSearchResults:", + JSON.stringify(Array.from(this.airDCPPSearchResults.entries()), null, 4), + ); + console.log(JSON.stringify(payload, null, 4)); }); // Handle searchResultUpdated event - this.socketIOInstance.on( - "searchResultUpdated", - async ({ updatedResult, instanceId }: SearchResultPayload) => { - const resultsForInstance = this.airDCPPSearchResults.get(instanceId); + this.socketIOInstance.on("searchResultUpdated", (result: SearchResult) => { + const { + updatedResult: { entityId, payload }, + } = result; + const resultsForInstance = this.airDCPPSearchResults.get(entityId); - if (resultsForInstance) { - const toReplaceIndex = resultsForInstance.findIndex( - (element: any) => element.id === updatedResult.result.id, - ); + if (resultsForInstance) { + const toReplaceIndex = resultsForInstance.findIndex((element: any) => { + console.log("search result updated!"); + console.log(JSON.stringify(element, null, 4)); + return element.id === payload.id; + }); - if (toReplaceIndex !== -1) { - // Replace the existing result with the updated result - resultsForInstance[toReplaceIndex] = updatedResult.result; + if (toReplaceIndex !== -1) { + // Replace the existing result with the updated result + resultsForInstance[toReplaceIndex] = payload; - // Optionally, update the map with the modified array - this.airDCPPSearchResults.set(instanceId, resultsForInstance); - } + // Optionally, update the map with the modified array + this.airDCPPSearchResults.set(entityId, resultsForInstance); } - }, - ); + } + }); // Handle searchComplete event this.socketIOInstance.on("searchesSent", async (data: any) => { this.logger.info( `Search complete for query: "${data.searchInfo.query.pattern}"`, ); - await this.produceResultsToKafka(data.searchInfo.query.pattern); }); },