From 12e46334da1c9278768e0155a20bc7ffa88f2a8c Mon Sep 17 00:00:00 2001 From: Rishi Ghan Date: Tue, 28 May 2024 08:39:46 -0400 Subject: [PATCH] =?UTF-8?q?=F0=9F=AA=B3=20kafka=20for=20handling=20dc++=20?= =?UTF-8?q?download=20jobs?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- package-lock.json | 17 +++ package.json | 3 + services/autodownload.service.ts | 179 +++++++++------------------ services/comicprocessor.service.ts | 188 +++++++++++++++++++++++++++++ 4 files changed, 265 insertions(+), 122 deletions(-) create mode 100644 services/comicprocessor.service.ts diff --git a/package-lock.json b/package-lock.json index c22b043..2a50e64 100644 --- a/package-lock.json +++ b/package-lock.json @@ -10,6 +10,7 @@ "dependencies": { "@robertklep/qbittorrent": "^1.0.1", "ioredis": "^5.0.0", + "kafkajs": "^2.2.4", "moleculer": "^0.14.27", "moleculer-web": "^0.10.5", "parse-torrent": "^9.1.5" @@ -17,6 +18,7 @@ "devDependencies": { "@jest/globals": "^29.3.1", "@types/jest": "^29.2.3", + "@types/lodash": "^4.17.4", "@types/node": "^18.11.9", "@types/parse-torrent": "^5.8.7", "@typescript-eslint/eslint-plugin": "^5.44.0", @@ -31,6 +33,7 @@ "eslint-plugin-import": "^2.26.0", "eslint-plugin-jest": "^27.1.6", "jest": "^29.3.1", + "lodash": "^4.17.21", "moleculer-repl": "^0.7.3", "prettier": "^2.8.0", "qbittorrent-api-v2": "^1.2.2", @@ -1486,6 +1489,12 @@ "integrity": "sha512-dRLjCWHYg4oaA77cxO64oO+7JwCwnIzkZPdrrC71jQmQtlhM556pwKo5bUzqvZndkVbeFLIIi+9TC40JNF5hNQ==", "dev": true }, + "node_modules/@types/lodash": { + "version": "4.17.4", + "resolved": "https://registry.npmjs.org/@types/lodash/-/lodash-4.17.4.tgz", + "integrity": "sha512-wYCP26ZLxaT3R39kiN2+HcJ4kTd3U1waI/cY7ivWYqFP6pW3ZNpvi6Wd6PHZx7T/t8z0vlkXMg3QYLa7DZ/IJQ==", + "dev": true + }, "node_modules/@types/magnet-uri": { "version": "5.1.5", "resolved": "https://registry.npmjs.org/@types/magnet-uri/-/magnet-uri-5.1.5.tgz", @@ -5462,6 +5471,14 @@ "node": ">=6" } }, + "node_modules/kafkajs": { + "version": "2.2.4", + "resolved": "https://registry.npmjs.org/kafkajs/-/kafkajs-2.2.4.tgz", + "integrity": "sha512-j/YeapB1vfPT2iOIUn/vxdyKEuhuY2PxMBvf5JWux6iSaukAccrMtXEY/Lb7OvavDhOWME589bpLrEdnVHjfjA==", + "engines": { + "node": ">=14.0.0" + } + }, "node_modules/keyv": { "version": "4.5.3", "resolved": "https://registry.npmjs.org/keyv/-/keyv-4.5.3.tgz", diff --git a/package.json b/package.json index 7593264..1af15b1 100644 --- a/package.json +++ b/package.json @@ -23,6 +23,7 @@ "devDependencies": { "@jest/globals": "^29.3.1", "@types/jest": "^29.2.3", + "@types/lodash": "^4.17.4", "@types/node": "^18.11.9", "@types/parse-torrent": "^5.8.7", "@typescript-eslint/eslint-plugin": "^5.44.0", @@ -37,6 +38,7 @@ "eslint-plugin-import": "^2.26.0", "eslint-plugin-jest": "^27.1.6", "jest": "^29.3.1", + "lodash": "^4.17.21", "moleculer-repl": "^0.7.3", "prettier": "^2.8.0", "qbittorrent-api-v2": "^1.2.2", @@ -48,6 +50,7 @@ "dependencies": { "@robertklep/qbittorrent": "^1.0.1", "ioredis": "^5.0.0", + "kafkajs": "^2.2.4", "moleculer": "^0.14.27", "moleculer-web": "^0.10.5", "parse-torrent": "^9.1.5" diff --git a/services/autodownload.service.ts b/services/autodownload.service.ts index 102157a..f1b3289 100644 --- a/services/autodownload.service.ts +++ b/services/autodownload.service.ts @@ -1,8 +1,10 @@ "use strict"; import { Context, Service, ServiceBroker, ServiceSchema, Errors } from "moleculer"; -import io from "socket.io-client"; +import { Kafka } from "kafkajs"; export default class AutoDownloadService extends Service { + private kafkaProducer: any; + // @ts-ignore public constructor( public broker: ServiceBroker, @@ -11,141 +13,74 @@ export default class AutoDownloadService extends Service { super(broker); this.parseServiceSchema({ name: "autodownload", - mixins: [], - hooks: {}, actions: { searchWantedComics: { rest: "POST /searchWantedComics", handler: async (ctx: Context<{}>) => { - // 1.iterate through the wanted comic objects, and: - // 1a. Orchestrate all issues from ComicVine if the entire volume is wanted - // 1b. Just the issues in "wanted.issues[]" - const wantedComics: any = await this.broker.call( - "library.getComicsMarkedAsWanted", - {}, - ); + try { + const wantedComics: any = await this.broker.call( + "library.getComicsMarkedAsWanted", + {}, + ); + this.logger.info("Fetched wanted comics:", wantedComics.length); - // 2a. Get the list of hubs from AirDC++ - const data: any = await this.broker.call("settings.getSettings", { - settingsKey: "directConnect", - }); - const { hubs } = data?.client; - console.log("HUBZZZZZ", hubs); - // Iterate through the list of wanted comics - wantedComics.forEach(async (comic: any) => { - let issuesToSearch: any = []; - if (comic.wanted.markEntireVolumeAsWanted) { - // Fetch all issues from ComicVine if the entire volume is wanted - issuesToSearch = await this.broker.call( - "comicvine.getIssuesForVolume", - { - volumeId: comic.wanted.volume.id, - }, - ); - } else if (comic.wanted.issues && comic.wanted.issues.length > 0) { - // 1b. Just the issues in "wanted.issues[]" - issuesToSearch = { - issues: comic.wanted.issues, - volumeName: comic.wanted.volume?.name, - }; + for (const comic of wantedComics) { + if (comic.wanted.markEntireVolumeWanted) { + const issues: any = await this.broker.call( + "comicvine.getIssuesForVolume", + { + volumeId: comic.wanted.volume.id, + }, + ); + for (const issue of issues) { + await this.produceJobToKafka( + comic.wanted.volume.name, + issue, + ); + } + } else if (comic.wanted.issues && comic.wanted.issues.length > 0) { + for (const issue of comic.wanted.issues) { + await this.produceJobToKafka( + comic.wanted.volume?.name, + issue, + ); + } + } } - for (const issue of issuesToSearch.issues) { - // 2. construct the search queries - - // 2b. for AirDC++ search, with the volume name, issueId and cover_date - const { year } = this.parseStringDate(issue.coverDate); - - const dcppSearchQuery = { - query: { - pattern: `${issuesToSearch.volumeName.replace(/#/g, "")} ${ - issue.issueNumber - } ${year}`, - extensions: ["cbz", "cbr", "cb7"], - }, - hub_urls: hubs.map((hub: any) => hub.value), - priority: 5, - }; - // Perform the AirDC++ search - const dcppResults = await this.broker.call("socket.search", { - query: dcppSearchQuery, - config: { - hostname: "localhost:5600", - protocol: "http", - username: "user", - password: "pass", - }, - namespace: "/automated", - }); - this.socketIOInstance.on("searchResultUpdated", (data: any) => { - console.log("Hyaar we go", data); - }); - // const dcppResults = await ctx.call("airdcpp.search", { - // dcppSearchQuery, - // }); - - // 2b. for Prowlarr search, with the volume name, issueId and cover_date - const prowlarrQuery = { - port: "9696", - apiKey: "c4f42e265fb044dc81f7e88bd41c3367", - offset: 0, - categories: [7030], - query: `${issuesToSearch.volumeName} ${issue.issueNumber} ${year}`, - host: "localhost", - limit: 100, - type: "search", - indexerIds: [2], - }; - - // Perform the Prowlarr search - const prowlarrResults = await this.broker.call("prowlarr.search", { - prowlarrQuery, - }); - - // Process results here or after the loop - console.log("DCPP Results: ", dcppResults); - console.log("Prowlarr Results: ", prowlarrResults); - } - }); - }, - }, - determineDownloadChannel: { - rest: "POST /determineDownloadChannel", - handler: async (ctx: Context<{}>) => { - // 1. Parse the incoming search query - // to make sure that it is well-formed - // At the very least, it should have name, year, number - // 2. Choose between download mediums based on user-preference? - // possible choices are: DC++, Torrent - // 3. Perform the search on those media with the aforementioned search query - // 4. Choose a subset of relevant search results, - // and score them - // 5. Download the highest-scoring, relevant result + } catch (error) { + this.logger.error("Error in searchWantedComics:", error); + throw new Errors.MoleculerError( + "Failed to search wanted comics.", + 500, + "SEARCH_WANTED_COMICS_ERROR", + { error }, + ); + } }, }, }, methods: { - parseStringDate: (dateString: string) => { - const date = new Date(dateString); - - // Get the year, month, and day - const year = date.getFullYear(); // 2022 - const month = date.getMonth() + 1; // December is 11 in Date object (0-indexed), so add 1 to make it human-readable - const day = date.getDate(); // 1 - return { year, month, day }; + produceJobToKafka: async (volumeName: string, issue: any) => { + const job = { volumeName, issue }; + await this.kafkaProducer.send({ + topic: "comic-search-jobs", + messages: [{ value: JSON.stringify(job) }], + }); + this.logger.info("Produced job to Kafka:", job); }, }, async started() { - this.socketIOInstance = io("ws://localhost:3001/automated", { - transports: ["websocket"], - withCredentials: true, - }); - this.socketIOInstance.on("connect", (data: any) => { - console.log("connected", data); - }); - - this.socketIOInstance.on("searchResultAdded", (data: any) => { - console.log("Received searchResultUpdated event:", data); + const kafka = new Kafka({ + clientId: "comic-search-service", + brokers: ["localhost:9092"], }); + this.kafkaProducer = kafka.producer(); + await this.kafkaProducer.connect(); + this.logger.info("Kafka producer connected successfully."); + }, + async stopped() { + await this.kafkaProducer.disconnect(); + this.logger.info("Kafka producer disconnected successfully."); }, }); } diff --git a/services/comicprocessor.service.ts b/services/comicprocessor.service.ts new file mode 100644 index 0000000..18e5372 --- /dev/null +++ b/services/comicprocessor.service.ts @@ -0,0 +1,188 @@ +"use strict"; +import { Service, ServiceBroker, ServiceSchema } from "moleculer"; +import { Kafka, EachMessagePayload, logLevel } from "kafkajs"; +import { isUndefined } from "lodash"; +import io from "socket.io-client"; + +export default class ComicProcessorService extends Service { + private kafkaConsumer: any; + private socketIOInstance: any; + private kafkaProducer: any; + private prowlarrResultsMap: Map = new Map(); + private airDCPPSearchResults: Array = []; + + // @ts-ignore + public constructor( + public broker: ServiceBroker, + schema: ServiceSchema<{}> = { name: "comicProcessor" }, + ) { + super(broker); + this.parseServiceSchema({ + name: "comicProcessor", + methods: { + parseStringDate: (dateString: string) => { + const date = new Date(dateString); + return { + year: date.getFullYear(), + month: date.getMonth() + 1, + day: date.getDate(), + }; + }, + processJob: async (job: any) => { + this.logger.info("Processing job:", job); + const { volumeName, issue } = job; + const { year } = this.parseStringDate(issue.cover_date || issue.coverDate); + const settings: any = await this.broker.call("settings.getSettings", { + settingsKey: "directConnect", + }); + const hubs = settings.client.hubs.map((hub: any) => hub.value); + const dcppSearchQuery = { + query: { + pattern: `${volumeName.replace(/#/g, "")} ${ + issue.issue_number || issue.issueNumber + } ${year}`, + extensions: ["cbz", "cbr", "cb7"], + }, + hub_urls: hubs, + priority: 5, + }; + this.logger.info( + "DC++ search query:", + JSON.stringify(dcppSearchQuery, null, 4), + ); + + await this.broker.call("socket.search", { + query: dcppSearchQuery, + config: { + hostname: "localhost:5600", + protocol: "http", + username: "user", + password: "pass", + }, + namespace: "/automated", + }); + + const prowlarrResults = await this.broker.call("prowlarr.search", { + prowlarrQuery: { + port: "9696", + apiKey: "c4f42e265fb044dc81f7e88bd41c3367", + offset: 0, + categories: [7030], + query: `${volumeName} ${issue.issueNumber} ${year}`, + host: "localhost", + limit: 100, + type: "search", + indexerIds: [2], + }, + }); + + this.logger.info( + "Prowlarr search results:", + JSON.stringify(prowlarrResults, null, 4), + ); + // Store prowlarr results in map using unique key + const key = `${volumeName}-${issue.issueNumber}-${year}`; + this.prowlarrResultsMap.set(key, prowlarrResults); + }, + produceResultsToKafka: async (dcppResults: any, prowlarrResults: any) => { + const results = { dcppResults, prowlarrResults }; + await this.kafkaProducer.send({ + topic: "comic-search-results", + messages: [{ value: JSON.stringify(results) }], + }); + this.logger.info( + "Produced results to Kafka:", + JSON.stringify(results, null, 4), + ); + }, + }, + async started() { + const kafka = new Kafka({ + clientId: "comic-processor-service", + brokers: ["localhost:9092"], + logLevel: logLevel.INFO, + }); + this.kafkaConsumer = kafka.consumer({ groupId: "comic-processor-group" }); + this.kafkaProducer = kafka.producer(); + await this.kafkaConsumer.connect(); + await this.kafkaProducer.connect(); + this.logger.info("Kafka consumer and producer connected successfully."); + + await this.kafkaConsumer.subscribe({ + topic: "comic-search-jobs", + fromBeginning: true, + }); + + await this.kafkaConsumer.run({ + eachMessage: async ({ topic, partition, message }: EachMessagePayload) => { + if (message.value) { + const job = JSON.parse(message.value.toString()); + this.logger.info( + "Consumed job from Kafka:", + JSON.stringify(job, null, 4), + ); + await this.processJob(job); + } else { + this.logger.warn("Received message with null value"); + } + }, + }); + + this.socketIOInstance = io("ws://localhost:3001/automated", { + transports: ["websocket"], + withCredentials: true, + }); + this.socketIOInstance.on("connect", () => { + this.logger.info("Socket.IO connected successfully."); + }); + + this.socketIOInstance.on("searchResultAdded", (data: any) => { + this.logger.info( + "Received search result added:", + JSON.stringify(data, null, 4), + ); + this.airDCPPSearchResults.push(data); + }); + + this.socketIOInstance.on("searchResultUpdated", async (data: any) => { + this.logger.info( + "Received search result update:", + JSON.stringify(data, null, 4), + ); + if ( + !isUndefined(data.result) && + !isUndefined(this.airDCPPSearchResults.result) + ) { + const toReplaceIndex = this.airDCPPSearchResults.findIndex( + (element: any) => { + return element?.result.id === data.result.id; + }, + ); + this.airDCPPSearchResults[toReplaceIndex] = data.result; + } + }); + this.socketIOInstance.on("searchComplete", async () => { + // Ensure results are not empty before producing to Kafka + if (this.airDCPPSearchResults.length > 0) { + await this.produceResultsToKafka(this.airDCPPSearchResults, []); + } else { + this.logger.warn( + "AirDC++ search results are empty, not producing to Kafka.", + ); + } + }); + }, + async stopped() { + await this.kafkaConsumer.disconnect(); + await this.kafkaProducer.disconnect(); + this.logger.info("Kafka consumer and producer disconnected successfully."); + + // Close Socket.IO connection + if (this.socketIOInstance) { + this.socketIOInstance.close(); + this.logger.info("Socket.IO disconnected successfully."); + } + }, + }); + } +}