diff --git a/package-lock.json b/package-lock.json index 36ab843..f95073a 100644 --- a/package-lock.json +++ b/package-lock.json @@ -10,18 +10,20 @@ "dependencies": { "@robertklep/qbittorrent": "^1.0.1", "ioredis": "^5.0.0", + "kafkajs": "^2.2.4", "moleculer": "^0.14.27", "moleculer-web": "^0.10.5", - "parse-torrent": "^9.1.5" + "parse-torrent": "^9.1.5", + "string-similarity-alg": "^1.3.2" }, "devDependencies": { "@jest/globals": "^29.3.1", "@types/jest": "^29.2.3", + "@types/lodash": "^4.17.4", "@types/node": "^18.11.9", "@types/parse-torrent": "^5.8.7", "@typescript-eslint/eslint-plugin": "^5.44.0", "@typescript-eslint/parser": "^5.44.0", - "airdcpp-apisocket": "^2.4.4", "axios": "^1.5.0", "concurrently": "^7.6.0", "cross-env": "^7.0.3", @@ -32,13 +34,14 @@ "eslint-plugin-import": "^2.26.0", "eslint-plugin-jest": "^27.1.6", "jest": "^29.3.1", + "lodash": "^4.17.21", "moleculer-repl": "^0.7.3", "prettier": "^2.8.0", "qbittorrent-api-v2": "^1.2.2", + "socket.io-client": "^4.7.5", "ts-jest": "^29.0.3", "ts-node": "^10.9.1", - "typescript": "^4.9.3", - "ws": "^8.16.0" + "typescript": "^4.9.3" }, "engines": { "node": ">= 16.x.x" @@ -1361,6 +1364,12 @@ "@sinonjs/commons": "^3.0.0" } }, + "node_modules/@socket.io/component-emitter": { + "version": "3.1.2", + "resolved": "https://registry.npmjs.org/@socket.io/component-emitter/-/component-emitter-3.1.2.tgz", + "integrity": "sha512-9BCxFwvbGg/RsZK9tjXd8s4UcwR0MWeFQ1XEKIQVVvAGJyINdrqKMcTRyLoK8Rse1GjzLV9cwjWV1olXRWEXVA==", + "dev": true + }, "node_modules/@tsconfig/node10": { "version": "1.0.9", "resolved": "https://registry.npmjs.org/@tsconfig/node10/-/node10-1.0.9.tgz", @@ -1481,6 +1490,12 @@ "integrity": "sha512-dRLjCWHYg4oaA77cxO64oO+7JwCwnIzkZPdrrC71jQmQtlhM556pwKo5bUzqvZndkVbeFLIIi+9TC40JNF5hNQ==", "dev": true }, + "node_modules/@types/lodash": { + "version": "4.17.4", + "resolved": "https://registry.npmjs.org/@types/lodash/-/lodash-4.17.4.tgz", + "integrity": "sha512-wYCP26ZLxaT3R39kiN2+HcJ4kTd3U1waI/cY7ivWYqFP6pW3ZNpvi6Wd6PHZx7T/t8z0vlkXMg3QYLa7DZ/IJQ==", + "dev": true + }, "node_modules/@types/magnet-uri": { "version": "5.1.5", "resolved": "https://registry.npmjs.org/@types/magnet-uri/-/magnet-uri-5.1.5.tgz", @@ -1761,19 +1776,6 @@ "node": ">=0.4.0" } }, - "node_modules/airdcpp-apisocket": { - "version": "2.4.4", - "resolved": "https://registry.npmjs.org/airdcpp-apisocket/-/airdcpp-apisocket-2.4.4.tgz", - "integrity": "sha512-Xn0kWSVdLJwPpOoHcdI2wzzfzZW2jTpuyZR2wCNs2UIlZhO+FTwMf3QQfNCt5gYTOld9LaiCEulxFuXDA8qrLA==", - "dev": true, - "dependencies": { - "chalk": "^4.1.2", - "events": "^3.3.0", - "invariant": "^2.2.4", - "is-in-browser": "^2.0.0", - "promise": "^8.1.0" - } - }, "node_modules/ajv": { "version": "6.12.6", "resolved": "https://registry.npmjs.org/ajv/-/ajv-6.12.6.tgz", @@ -2076,12 +2078,6 @@ "url": "https://github.com/sponsors/ljharb" } }, - "node_modules/asap": { - "version": "2.0.6", - "resolved": "https://registry.npmjs.org/asap/-/asap-2.0.6.tgz", - "integrity": "sha512-BSHWgDSAiKs50o2Re8ppvp3seVHXSRM44cdSsT9FfNEUUZLOGWVCsiWaRPWM1Znn+mqZ1OfVZ3z3DWEzSp7hRA==", - "dev": true - }, "node_modules/astral-regex": { "version": "2.0.0", "resolved": "https://registry.npmjs.org/astral-regex/-/astral-regex-2.0.0.tgz", @@ -2361,12 +2357,12 @@ } }, "node_modules/braces": { - "version": "3.0.2", - "resolved": "https://registry.npmjs.org/braces/-/braces-3.0.2.tgz", - "integrity": "sha512-b8um+L1RzM3WDSzvhm6gIz1yfTbBt6YTlcEKAvsmqCZZFw46z626lVj9j1yEPW33H5H+lBQpZMP1k8l+78Ha0A==", + "version": "3.0.3", + "resolved": "https://registry.npmjs.org/braces/-/braces-3.0.3.tgz", + "integrity": "sha512-yQbXgO/OSZVD2IsiLlro+7Hf6Q18EJrKSEsdoMzKePKXct3gvD8oLcOQdIzGupr5Fj+EDe8gO/lxc1BzfMpxvA==", "dev": true, "dependencies": { - "fill-range": "^7.0.1" + "fill-range": "^7.1.1" }, "engines": { "node": ">=8" @@ -3029,6 +3025,28 @@ "node": ">= 0.8" } }, + "node_modules/engine.io-client": { + "version": "6.5.4", + "resolved": "https://registry.npmjs.org/engine.io-client/-/engine.io-client-6.5.4.tgz", + "integrity": "sha512-GeZeeRjpD2qf49cZQ0Wvh/8NJNfeXkXXcoGh+F77oEAgo9gUHwT1fCRxSNU+YEEaysOJTnsFHmM5oAcPy4ntvQ==", + "dev": true, + "dependencies": { + "@socket.io/component-emitter": "~3.1.0", + "debug": "~4.3.1", + "engine.io-parser": "~5.2.1", + "ws": "~8.17.1", + "xmlhttprequest-ssl": "~2.0.0" + } + }, + "node_modules/engine.io-parser": { + "version": "5.2.2", + "resolved": "https://registry.npmjs.org/engine.io-parser/-/engine.io-parser-5.2.2.tgz", + "integrity": "sha512-RcyUFKA93/CXH20l4SoVvzZfrSDMOTUS3bWVpTt2FuFP+XYrL8i8oonHP7WInRyVHXh0n/ORtoeiE1os+8qkSw==", + "dev": true, + "engines": { + "node": ">=10.0.0" + } + }, "node_modules/error-ex": { "version": "1.3.2", "resolved": "https://registry.npmjs.org/error-ex/-/error-ex-1.3.2.tgz", @@ -3715,15 +3733,6 @@ "resolved": "https://registry.npmjs.org/eventemitter2/-/eventemitter2-6.4.9.tgz", "integrity": "sha512-JEPTiaOt9f04oa6NOkc4aH+nVp5I3wEjpHbIPqfgCdD5v5bUzy7xQqwcVO2aDQgOWhI28da57HksMrzK9HlRxg==" }, - "node_modules/events": { - "version": "3.3.0", - "resolved": "https://registry.npmjs.org/events/-/events-3.3.0.tgz", - "integrity": "sha512-mQw+2fkQbALzQ7V0MY0IqdnXNOeTtP4r0lN9z7AAawCXgqea7bDii20AYrIBrFd/Hx0M2Ocz6S111CaFkUcb0Q==", - "dev": true, - "engines": { - "node": ">=0.8.x" - } - }, "node_modules/execa": { "version": "5.1.1", "resolved": "https://registry.npmjs.org/execa/-/execa-5.1.1.tgz", @@ -3863,9 +3872,9 @@ } }, "node_modules/fill-range": { - "version": "7.0.1", - "resolved": "https://registry.npmjs.org/fill-range/-/fill-range-7.0.1.tgz", - "integrity": "sha512-qOo9F+dMUmC2Lcb4BbVvnKJxTPjCm+RRpe4gDuGrzkL7mEVl/djYSu2OdQ2Pa302N4oqkSg9ir6jaLWJ2USVpQ==", + "version": "7.1.1", + "resolved": "https://registry.npmjs.org/fill-range/-/fill-range-7.1.1.tgz", + "integrity": "sha512-YsGpe3WHLK8ZYi4tWDg2Jy3ebRz2rXowDxnld4bkQB00cc/1Zw9AWnC0i9ztDJitivtQvaI9KaLyKrc+hBW0yg==", "dev": true, "dependencies": { "to-regex-range": "^5.0.1" @@ -4417,15 +4426,6 @@ "node": ">= 0.4" } }, - "node_modules/invariant": { - "version": "2.2.4", - "resolved": "https://registry.npmjs.org/invariant/-/invariant-2.2.4.tgz", - "integrity": "sha512-phJfQVBuaJM5raOpJjSfkiD6BpbCE4Ns//LaXl6wGYtUBY83nWS6Rf9tXm2e8VaK60JEjYldbPif/A2B1C2gNA==", - "dev": true, - "dependencies": { - "loose-envify": "^1.0.0" - } - }, "node_modules/ioredis": { "version": "5.3.2", "resolved": "https://registry.npmjs.org/ioredis/-/ioredis-5.3.2.tgz", @@ -4583,12 +4583,6 @@ "node": ">=0.10.0" } }, - "node_modules/is-in-browser": { - "version": "2.0.0", - "resolved": "https://registry.npmjs.org/is-in-browser/-/is-in-browser-2.0.0.tgz", - "integrity": "sha512-/NUv5pqj+krUJalhGpj0lyy+x7vrD9jt1PlAfkoIDEXqE+xZgFJ4FU8e9m99WuHbCqsBZVf+nzvAjNso+SO80A==", - "dev": true - }, "node_modules/is-interactive": { "version": "1.0.0", "resolved": "https://registry.npmjs.org/is-interactive/-/is-interactive-1.0.0.tgz", @@ -5457,6 +5451,14 @@ "node": ">=6" } }, + "node_modules/kafkajs": { + "version": "2.2.4", + "resolved": "https://registry.npmjs.org/kafkajs/-/kafkajs-2.2.4.tgz", + "integrity": "sha512-j/YeapB1vfPT2iOIUn/vxdyKEuhuY2PxMBvf5JWux6iSaukAccrMtXEY/Lb7OvavDhOWME589bpLrEdnVHjfjA==", + "engines": { + "node": ">=14.0.0" + } + }, "node_modules/keyv": { "version": "4.5.3", "resolved": "https://registry.npmjs.org/keyv/-/keyv-4.5.3.tgz", @@ -5566,18 +5568,6 @@ "url": "https://github.com/sponsors/sindresorhus" } }, - "node_modules/loose-envify": { - "version": "1.4.0", - "resolved": "https://registry.npmjs.org/loose-envify/-/loose-envify-1.4.0.tgz", - "integrity": "sha512-lyuxPGr/Wfhrlem2CL/UcnUc1zcqKAImBDzukY7Y5F/yQiNdko6+fRLevlw1HgMySw7f611UIY408EtxRSoK3Q==", - "dev": true, - "dependencies": { - "js-tokens": "^3.0.0 || ^4.0.0" - }, - "bin": { - "loose-envify": "cli.js" - } - }, "node_modules/lru-cache": { "version": "5.1.1", "resolved": "https://registry.npmjs.org/lru-cache/-/lru-cache-5.1.1.tgz", @@ -6542,15 +6532,6 @@ "url": "https://github.com/chalk/ansi-styles?sponsor=1" } }, - "node_modules/promise": { - "version": "8.3.0", - "resolved": "https://registry.npmjs.org/promise/-/promise-8.3.0.tgz", - "integrity": "sha512-rZPNPKTOYVNEEKFaq1HqTgOwZD+4/YHS5ukLzQCypkj+OkYx7iv0mA91lJlpPPZ8vMau3IIGj5Qlwrx+8iiSmg==", - "dev": true, - "dependencies": { - "asap": "~2.0.6" - } - }, "node_modules/prompts": { "version": "2.4.2", "resolved": "https://registry.npmjs.org/prompts/-/prompts-2.4.2.tgz", @@ -7175,6 +7156,34 @@ "url": "https://github.com/chalk/slice-ansi?sponsor=1" } }, + "node_modules/socket.io-client": { + "version": "4.7.5", + "resolved": "https://registry.npmjs.org/socket.io-client/-/socket.io-client-4.7.5.tgz", + "integrity": "sha512-sJ/tqHOCe7Z50JCBCXrsY3I2k03iOiUe+tj1OmKeD2lXPiGH/RUCdTZFoqVyN7l1MnpIzPrGtLcijffmeouNlQ==", + "dev": true, + "dependencies": { + "@socket.io/component-emitter": "~3.1.0", + "debug": "~4.3.2", + "engine.io-client": "~6.5.2", + "socket.io-parser": "~4.2.4" + }, + "engines": { + "node": ">=10.0.0" + } + }, + "node_modules/socket.io-parser": { + "version": "4.2.4", + "resolved": "https://registry.npmjs.org/socket.io-parser/-/socket.io-parser-4.2.4.tgz", + "integrity": "sha512-/GbIKmo8ioc+NIWIhwdecY0ge+qVBSMdgxGygevmdHj24bsfgtCmcUUcQ5ZzcylGFHsN3k4HB4Cgkl96KVnuew==", + "dev": true, + "dependencies": { + "@socket.io/component-emitter": "~3.1.0", + "debug": "~4.3.1" + }, + "engines": { + "node": ">=10.0.0" + } + }, "node_modules/source-map": { "version": "0.6.1", "resolved": "https://registry.npmjs.org/source-map/-/source-map-0.6.1.tgz", @@ -7271,6 +7280,11 @@ "node": ">=10" } }, + "node_modules/string-similarity-alg": { + "version": "1.3.2", + "resolved": "https://registry.npmjs.org/string-similarity-alg/-/string-similarity-alg-1.3.2.tgz", + "integrity": "sha512-M+jTGEJmWLfIg2dawXOifzbkUs/tp8HbeSCXZpNII2oZvU5uexaBFx+NoUBWS3M6VQ2ezJJCMstU8L8gq6YqsQ==" + }, "node_modules/string-width": { "version": "4.2.3", "resolved": "https://registry.npmjs.org/string-width/-/string-width-4.2.3.tgz", @@ -8027,9 +8041,9 @@ } }, "node_modules/ws": { - "version": "8.16.0", - "resolved": "https://registry.npmjs.org/ws/-/ws-8.16.0.tgz", - "integrity": "sha512-HS0c//TP7Ina87TfiPUz1rQzMhHrl/SG2guqRcTOIUYD2q8uhUdNHZYJUaQ8aTGPzCh+c6oawMKW35nFl1dxyQ==", + "version": "8.17.1", + "resolved": "https://registry.npmjs.org/ws/-/ws-8.17.1.tgz", + "integrity": "sha512-6XQFvXTkbfUOZOKKILFG1PDK2NDQs4azKQl26T0YS5CxqWLgXajbPZ+h4gZekJyRqFU8pvnbAbbs/3TgRPy+GQ==", "dev": true, "engines": { "node": ">=10.0.0" @@ -8047,6 +8061,15 @@ } } }, + "node_modules/xmlhttprequest-ssl": { + "version": "2.0.0", + "resolved": "https://registry.npmjs.org/xmlhttprequest-ssl/-/xmlhttprequest-ssl-2.0.0.tgz", + "integrity": "sha512-QKxVRxiRACQcVuQEYFsI1hhkrMlrXHPegbbd1yn9UHOmRxY+si12nQYzri3vbzt8VdTTRviqcKxcyllFas5z2A==", + "dev": true, + "engines": { + "node": ">=0.4.0" + } + }, "node_modules/y18n": { "version": "5.0.8", "resolved": "https://registry.npmjs.org/y18n/-/y18n-5.0.8.tgz", diff --git a/package.json b/package.json index e341c24..61087fa 100644 --- a/package.json +++ b/package.json @@ -23,6 +23,7 @@ "devDependencies": { "@jest/globals": "^29.3.1", "@types/jest": "^29.2.3", + "@types/lodash": "^4.17.4", "@types/node": "^18.11.9", "@types/parse-torrent": "^5.8.7", "@typescript-eslint/eslint-plugin": "^5.44.0", @@ -45,11 +46,15 @@ "typescript": "^4.9.3" }, "dependencies": { + "lodash": "^4.17.21", "@robertklep/qbittorrent": "^1.0.1", "ioredis": "^5.0.0", - "moleculer": "^0.14.27", - "moleculer-web": "^0.10.5", - "parse-torrent": "^9.1.5" + "kafkajs": "^2.2.4", + "socket.io-client": "^4.7.5", + "moleculer": "^0.14.34", + "moleculer-web": "^0.10.7", + "parse-torrent": "^9.1.5", + "string-similarity-alg": "^1.3.2" }, "engines": { "node": ">= 16.x.x" diff --git a/services/autodownload.service.ts b/services/autodownload.service.ts index 4e5753a..76d1c26 100644 --- a/services/autodownload.service.ts +++ b/services/autodownload.service.ts @@ -1,68 +1,114 @@ "use strict"; -import { Context, Service, ServiceBroker, ServiceSchema, Errors } from "moleculer"; -import axios from "axios"; +import { Kafka } from "kafkajs"; +import type { Context, ServiceBroker, ServiceSchema } from "moleculer"; +import { Errors, Service } from "moleculer"; + +interface Comic { + wanted: { + markEntireVolumeWanted?: boolean; + issues?: any[]; + volume: { + id: string; + name: string; + }; + }; +} export default class AutoDownloadService extends Service { + private kafkaProducer: any; + + private readonly BATCH_SIZE = 100; // Adjust based on your system capacity + // @ts-ignore - public constructor( + constructor( public broker: ServiceBroker, schema: ServiceSchema<{}> = { name: "autodownload" }, ) { super(broker); this.parseServiceSchema({ name: "autodownload", - mixins: [], - hooks: {}, actions: { searchWantedComics: { rest: "POST /searchWantedComics", handler: async (ctx: Context<{}>) => { - // 1.iterate through the wanted comic objects, and: - // 1a. Orchestrate all issues from ComicVine if the entire volume is wanted - // 1b. Just the issues in "wanted.issues[]" - const wantedComics: any = await this.broker.call( - "library.getComicsMarkedAsWanted", - {}, - ); - - // Iterate through the list of wanted comics - for (const comic of wantedComics) { - let issuesToSearch: any = []; - - if (comic.wanted.markEntireVolumeAsWanted) { - // 1a. Fetch all issues from ComicVine if the entire volume is wanted - issuesToSearch = await this.broker.call( - "comicvine.getIssuesForVolume", - { - volumeId: comic.wanted.volume.id, - }, + try { + /* eslint-disable no-await-in-loop */ + let page = 1; + const limit = this.BATCH_SIZE; + let comics: Comic[]; + do { + comics = await this.broker.call( + "library.getComicsMarkedAsWanted", + { page, limit }, ); - } else if (comic.wanted.issues && comic.wanted.issues.length > 0) { - // 1b. Just the issues in "wanted.issues[]" - issuesToSearch = comic.wanted.issues; - } - for (const issue of issuesToSearch) { - // construct the search queries - } + // Log debugging info + this.logger.info( + "Received comics from getComicsMarkedAsWanted:", + JSON.stringify(comics, null, 2), + ); + if (!Array.isArray(comics)) { + this.logger.error( + "Invalid response structure", + JSON.stringify(comics, null, 2), + ); + throw new Errors.MoleculerError( + "Invalid response structure from getComicsMarkedAsWanted", + 500, + "INVALID_RESPONSE_STRUCTURE", + ); + } + this.logger.info( + `Fetched ${comics.length} comics from page ${page}`, + ); + for (const comic of comics) { + await this.produceJobToKafka(comic); + } + page += 1; + } while (comics.length === limit); + + return { + success: true, + message: "Jobs enqueued for background processing.", + }; + } catch (error) { + this.logger.error("Error in searchWantedComics:", error); + throw new Errors.MoleculerError( + "Failed to search wanted comics.", + 500, + "SEARCH_WANTED_COMICS_ERROR", + { error }, + ); } }, }, - determineDownloadChannel: { - rest: "POST /determineDownloadChannel", - handler: async (ctx: Context<{}>) => { - // 1. Parse the incoming search query - // to make sure that it is well-formed - // At the very least, it should have name, year, number - // 2. Choose between download mediums based on user-preference? - // possible choices are: DC++, Torrent - // 3. Perform the search on those media with the aforementioned search query - // 4. Choose a subset of relevant search results, - // and score them - // 5. Download the highest-scoring, relevant result - }, + }, + methods: { + produceJobToKafka: async (comic: Comic) => { + const job = { comic }; + try { + await this.kafkaProducer.send({ + topic: "comic-search-jobs", + messages: [{ value: JSON.stringify(job) }], + }); + this.logger.info("Produced job to Kafka:", job); + } catch (error) { + this.logger.error("Error producing job to Kafka:", error); + } }, }, - methods: {}, + async started() { + const kafka = new Kafka({ + clientId: "comic-search-service", + brokers: ["localhost:9092"], + }); + this.kafkaProducer = kafka.producer(); + await this.kafkaProducer.connect(); + this.logger.info("Kafka producer connected successfully."); + }, + async stopped() { + await this.kafkaProducer.disconnect(); + this.logger.info("Kafka producer disconnected successfully."); + }, }); } } diff --git a/services/comicprocessor.service.ts b/services/comicprocessor.service.ts new file mode 100644 index 0000000..39b2c7f --- /dev/null +++ b/services/comicprocessor.service.ts @@ -0,0 +1,310 @@ +import type { EachMessagePayload } from "kafkajs"; +import { Kafka, logLevel } from "kafkajs"; +import { isNil, isUndefined } from "lodash"; +import type { ServiceBroker, ServiceSchema } from "moleculer"; +import { Service } from "moleculer"; +import io from "socket.io-client"; +import stringSimilarity from "string-similarity-alg"; + +interface SearchResult { + groupedResult: { entityId: number; payload: any }; + updatedResult: { entityId: number; payload: any }; +} + +export default class ComicProcessorService extends Service { + private kafkaConsumer: any; + + private socketIOInstance: any; + + private kafkaProducer: any; + + private prowlarrResultsMap: Map = new Map(); + + private airDCPPSearchResults: Map = new Map(); + + private issuesToSearch: any = []; + + // @ts-ignore: schema parameter is required by Service constructor + constructor( + public broker: ServiceBroker, + schema: ServiceSchema = { name: "comicProcessor" }, + ) { + super(broker, schema); + this.parseServiceSchema({ + name: "comicProcessor", + methods: { + parseStringDate: (dateString: string) => { + const date = new Date(dateString); + return { + year: date.getFullYear(), + month: date.getMonth() + 1, + day: date.getDate(), + }; + }, + rankSearchResults: async (results: Map, query: string) => { + // Find the highest-ranked response based on similarity to the search string + let highestRankedResult = null; + let highestSimilarity = -1; + + results.forEach((resultArray) => { + resultArray.forEach((result) => { + const similarity = stringSimilarity("jaro-winkler").compare( + result.name, + query, + ); + if (similarity > highestSimilarity) { + highestSimilarity = similarity; + highestRankedResult = { ...result, similarity }; + } + }); + }); + + return highestRankedResult; + }, + processJob: async (job: any) => { + try { + this.logger.info("Processing job:", JSON.stringify(job, null, 2)); + // Get the hub to search on + const settings: any = await this.broker.call("settings.getSettings", { + settingsKey: "directConnect", + }); + const hubs = settings.client.hubs.map((hub: any) => hub.value); + + const { comic } = job; + const { volume, issues, markEntireVolumeWanted } = comic.wanted; + + // If entire volume is marked as wanted, get their details from CV + if (markEntireVolumeWanted) { + this.issuesToSearch = await this.broker.call( + "comicvine.getIssuesForVolume", + { volumeId: volume.id }, + ); + this.logger.info( + `The entire volume with id: ${volume.id} was marked as wanted.`, + ); + this.logger.info(`Fetched issues for ${volume.id}:`); + this.logger.info(`${this.issuesToSearch.length} issues to search`); + } else { + // Or proceed with `issues` from the wanted object. + this.issuesToSearch = issues; + } + + for (const issue of this.issuesToSearch) { + // Query builder for DC++ + // 1. issue number + const inferredIssueNumber = + issue.issueNumber || issue.issue_number || ""; + // 2. year + const { year } = this.parseStringDate(issue.coverDate); + const inferredYear = year || issue.year || ""; + + // 3. Orchestrate the query + const dcppSearchQuery = { + query: { + pattern: `${volume.name + .replace(/[^\w\s]/g, "") + .replace(/\s+/g, " ") + .trim()}`, + extensions: ["cbz", "cbr", "cb7"], + }, + hub_urls: hubs, + priority: 5, + }; + this.logger.info( + "DC++ search query:", + JSON.stringify(dcppSearchQuery, null, 4), + ); + + await this.broker.call("socket.search", { + query: dcppSearchQuery, + config: { + hostname: "192.168.1.119:5600", + protocol: "http", + username: "admin", + password: "password", + }, + namespace: "/automated", + }); + + // const prowlarrResults = await this.broker.call("prowlarr.search", { + // prowlarrQuery: { + // port: "9696", + // apiKey: "c4f42e265fb044dc81f7e88bd41c3367", + // offset: 0, + // categories: [7030], + // query: `${volume.name} ${issue.issueNumber} ${year}`, + // host: "localhost", + // limit: 100, + // type: "search", + // indexerIds: [2], + // }, + // }); + // + // this.logger.info( + // "Prowlarr search results:", + // JSON.stringify(prowlarrResults, null, 4), + // ); + + // Store prowlarr results in map using unique key + // const key = `${volume.name}-${issue.issueNumber}-${year}`; + // this.prowlarrResultsMap.set(key, prowlarrResults); + } + } catch (error) { + this.logger.error("Error processing job:", error); + } + }, + produceResultsToKafka: async (query: string, result: any[]): Promise => { + try { + /* + Match and rank + */ + const finalResult = await this.rankSearchResults( + this.airDCPPSearchResults, + query, + ); + /* + Kafka messages need to be in a format that can be serialized to JSON, + and a Map is not directly serializable in a way that retains its structure, + hence why we use Object.fromEntries + */ + await this.kafkaProducer.send({ + topic: "comic-search-results", + messages: [ + { + value: JSON.stringify(finalResult), + }, + ], + }); + this.logger.info(`Produced results to Kafka.`); + + // socket event for UI + await this.broker.call("socket.broadcast", { + namespace: "/", + event: "searchResultsAvailable", + args: [ + { + query, + finalResult, + }, + ], + }); + } catch (error) { + this.logger.error("Error producing results to Kafka:", error); + } + }, + }, + async started() { + const kafka = new Kafka({ + clientId: "comic-processor-service", + brokers: ["localhost:9092"], + logLevel: logLevel.INFO, + }); + this.kafkaConsumer = kafka.consumer({ groupId: "comic-processor-group" }); + this.kafkaProducer = kafka.producer(); + + this.kafkaConsumer.on("consumer.crash", (event: any) => { + this.logger.error("Consumer crash:", event); + }); + this.kafkaConsumer.on("consumer.connect", () => { + this.logger.info("Consumer connected"); + }); + this.kafkaConsumer.on("consumer.disconnect", () => { + this.logger.info("Consumer disconnected"); + }); + this.kafkaConsumer.on("consumer.network.request_timeout", () => { + this.logger.warn("Consumer network request timeout"); + }); + + await this.kafkaConsumer.connect(); + await this.kafkaProducer.connect(); + + await this.kafkaConsumer.subscribe({ + topic: "comic-search-jobs", + fromBeginning: true, + }); + + await this.kafkaConsumer.run({ + eachMessage: async ({ topic, partition, message }: EachMessagePayload) => { + if (message.value) { + const job = JSON.parse(message.value.toString()); + await this.processJob(job); + } else { + this.logger.warn("Received message with null value"); + } + }, + }); + + this.socketIOInstance = io("ws://localhost:3001/automated", { + transports: ["websocket"], + withCredentials: true, + }); + this.socketIOInstance.on("connect", () => { + this.logger.info("Socket.IO connected successfully."); + }); + + // Handle searchResultAdded event + this.socketIOInstance.on("searchResultAdded", (result: SearchResult) => { + const { + groupedResult: { entityId, payload }, + } = result; + + this.logger.info( + `AirDC++ Search result added for entityId: ${entityId} - ${payload?.name}`, + ); + if (!this.airDCPPSearchResults.has(entityId)) { + this.airDCPPSearchResults.set(entityId, []); + } + if (!isNil(payload)) { + this.airDCPPSearchResults.get(entityId).push(payload); + } + + console.log( + "Updated airDCPPSearchResults:", + JSON.stringify(Array.from(this.airDCPPSearchResults.entries()), null, 4), + ); + console.log(JSON.stringify(payload, null, 4)); + }); + + // Handle searchResultUpdated event + this.socketIOInstance.on("searchResultUpdated", (result: SearchResult) => { + const { + updatedResult: { entityId, payload }, + } = result; + const resultsForInstance = this.airDCPPSearchResults.get(entityId); + + if (resultsForInstance) { + const toReplaceIndex = resultsForInstance.findIndex((element: any) => { + this.logger.info("search result updated!"); + this.logger.info(JSON.stringify(element, null, 4)); + return element.id === payload.id; + }); + + if (toReplaceIndex !== -1) { + // Replace the existing result with the updated result + resultsForInstance[toReplaceIndex] = payload; +rty6j + // Optionally, update the map with the modified array + this.airDCPPSearchResults.set(entityId, resultsForInstance); + } + } + }); + + // Handle searchComplete event + this.socketIOInstance.on("searchesSent", async (data: any) => { + this.logger.info( + `Search complete for query: "${data.searchInfo.query.pattern}"`, + ); + await this.produceResultsToKafka(data.searchInfo.query.pattern); + }); + }, + async stopped() { + await this.kafkaConsumer.disconnect(); + await this.kafkaProducer.disconnect(); + + if (this.socketIOInstance) { + this.socketIOInstance.close(); + } + }, + }); + } +} diff --git a/services/prowlarr.service.ts b/services/prowlarr.service.ts index 0a1b7a9..b1adc01 100644 --- a/services/prowlarr.service.ts +++ b/services/prowlarr.service.ts @@ -54,28 +54,31 @@ export default class ProwlarrService extends Service { rest: "GET /search", handler: async ( ctx: Context<{ - host: string; - port: string; - apiKey: string; - query: string; - type: string; - indexerIds: [number]; - categories: [number]; - limit: number; - offset: number; + prowlarrQuery: { + host: string; + port: string; + apiKey: string; + query: string; + type: string; + indexerIds: [number]; + categories: [number]; + limit: number; + offset: number; + }; }>, ) => { - console.log(JSON.stringify(ctx.params, null, 2)); const { - indexerIds, - categories, - host, - port, - apiKey, - query, - type, - limit, - offset, + prowlarrQuery: { + indexerIds, + categories, + host, + port, + apiKey, + query, + type, + limit, + offset, + }, } = ctx.params; const indexer = indexerIds[0] ? indexerIds.length === 1 : indexerIds; const category = categories[0] ? categories.length === 1 : categories;