From 6fb1374ce9d9808ba46905a2fd9947ca94c68623 Mon Sep 17 00:00:00 2001 From: Rishi Ghan Date: Thu, 9 May 2024 13:45:18 -0400 Subject: [PATCH 01/14] =?UTF-8?q?=F0=9F=8F=97=EF=B8=8F=20WIP=20for=20autom?= =?UTF-8?q?atic=20downloads=20endpoint?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- package-lock.json | 168 +++++++++++++++---------------- package.json | 1 + services/autodownload.service.ts | 102 +++++++++++++++++-- services/prowlarr.service.ts | 41 ++++---- 4 files changed, 200 insertions(+), 112 deletions(-) diff --git a/package-lock.json b/package-lock.json index 36ab843..c22b043 100644 --- a/package-lock.json +++ b/package-lock.json @@ -21,7 +21,6 @@ "@types/parse-torrent": "^5.8.7", "@typescript-eslint/eslint-plugin": "^5.44.0", "@typescript-eslint/parser": "^5.44.0", - "airdcpp-apisocket": "^2.4.4", "axios": "^1.5.0", "concurrently": "^7.6.0", "cross-env": "^7.0.3", @@ -35,10 +34,10 @@ "moleculer-repl": "^0.7.3", "prettier": "^2.8.0", "qbittorrent-api-v2": "^1.2.2", + "socket.io-client": "^4.7.5", "ts-jest": "^29.0.3", "ts-node": "^10.9.1", - "typescript": "^4.9.3", - "ws": "^8.16.0" + "typescript": "^4.9.3" }, "engines": { "node": ">= 16.x.x" @@ -1361,6 +1360,12 @@ "@sinonjs/commons": "^3.0.0" } }, + "node_modules/@socket.io/component-emitter": { + "version": "3.1.2", + "resolved": "https://registry.npmjs.org/@socket.io/component-emitter/-/component-emitter-3.1.2.tgz", + "integrity": "sha512-9BCxFwvbGg/RsZK9tjXd8s4UcwR0MWeFQ1XEKIQVVvAGJyINdrqKMcTRyLoK8Rse1GjzLV9cwjWV1olXRWEXVA==", + "dev": true + }, "node_modules/@tsconfig/node10": { "version": "1.0.9", "resolved": "https://registry.npmjs.org/@tsconfig/node10/-/node10-1.0.9.tgz", @@ -1761,19 +1766,6 @@ "node": ">=0.4.0" } }, - "node_modules/airdcpp-apisocket": { - "version": "2.4.4", - "resolved": "https://registry.npmjs.org/airdcpp-apisocket/-/airdcpp-apisocket-2.4.4.tgz", - "integrity": "sha512-Xn0kWSVdLJwPpOoHcdI2wzzfzZW2jTpuyZR2wCNs2UIlZhO+FTwMf3QQfNCt5gYTOld9LaiCEulxFuXDA8qrLA==", - "dev": true, - "dependencies": { - "chalk": "^4.1.2", - "events": "^3.3.0", - "invariant": "^2.2.4", - "is-in-browser": "^2.0.0", - "promise": "^8.1.0" - } - }, "node_modules/ajv": { "version": "6.12.6", "resolved": "https://registry.npmjs.org/ajv/-/ajv-6.12.6.tgz", @@ -2076,12 +2068,6 @@ "url": "https://github.com/sponsors/ljharb" } }, - "node_modules/asap": { - "version": "2.0.6", - "resolved": "https://registry.npmjs.org/asap/-/asap-2.0.6.tgz", - "integrity": "sha512-BSHWgDSAiKs50o2Re8ppvp3seVHXSRM44cdSsT9FfNEUUZLOGWVCsiWaRPWM1Znn+mqZ1OfVZ3z3DWEzSp7hRA==", - "dev": true - }, "node_modules/astral-regex": { "version": "2.0.0", "resolved": "https://registry.npmjs.org/astral-regex/-/astral-regex-2.0.0.tgz", @@ -3029,6 +3015,49 @@ "node": ">= 0.8" } }, + "node_modules/engine.io-client": { + "version": "6.5.3", + "resolved": "https://registry.npmjs.org/engine.io-client/-/engine.io-client-6.5.3.tgz", + "integrity": "sha512-9Z0qLB0NIisTRt1DZ/8U2k12RJn8yls/nXMZLn+/N8hANT3TcYjKFKcwbw5zFQiN4NTde3TSY9zb79e1ij6j9Q==", + "dev": true, + "dependencies": { + "@socket.io/component-emitter": "~3.1.0", + "debug": "~4.3.1", + "engine.io-parser": "~5.2.1", + "ws": "~8.11.0", + "xmlhttprequest-ssl": "~2.0.0" + } + }, + "node_modules/engine.io-client/node_modules/ws": { + "version": "8.11.0", + "resolved": "https://registry.npmjs.org/ws/-/ws-8.11.0.tgz", + "integrity": "sha512-HPG3wQd9sNQoT9xHyNCXoDUa+Xw/VevmY9FoHyQ+g+rrMn4j6FB4np7Z0OhdTgjx6MgQLK7jwSy1YecU1+4Asg==", + "dev": true, + "engines": { + "node": ">=10.0.0" + }, + "peerDependencies": { + "bufferutil": "^4.0.1", + "utf-8-validate": "^5.0.2" + }, + "peerDependenciesMeta": { + "bufferutil": { + "optional": true + }, + "utf-8-validate": { + "optional": true + } + } + }, + "node_modules/engine.io-parser": { + "version": "5.2.2", + "resolved": "https://registry.npmjs.org/engine.io-parser/-/engine.io-parser-5.2.2.tgz", + "integrity": "sha512-RcyUFKA93/CXH20l4SoVvzZfrSDMOTUS3bWVpTt2FuFP+XYrL8i8oonHP7WInRyVHXh0n/ORtoeiE1os+8qkSw==", + "dev": true, + "engines": { + "node": ">=10.0.0" + } + }, "node_modules/error-ex": { "version": "1.3.2", "resolved": "https://registry.npmjs.org/error-ex/-/error-ex-1.3.2.tgz", @@ -3715,15 +3744,6 @@ "resolved": "https://registry.npmjs.org/eventemitter2/-/eventemitter2-6.4.9.tgz", "integrity": "sha512-JEPTiaOt9f04oa6NOkc4aH+nVp5I3wEjpHbIPqfgCdD5v5bUzy7xQqwcVO2aDQgOWhI28da57HksMrzK9HlRxg==" }, - "node_modules/events": { - "version": "3.3.0", - "resolved": "https://registry.npmjs.org/events/-/events-3.3.0.tgz", - "integrity": "sha512-mQw+2fkQbALzQ7V0MY0IqdnXNOeTtP4r0lN9z7AAawCXgqea7bDii20AYrIBrFd/Hx0M2Ocz6S111CaFkUcb0Q==", - "dev": true, - "engines": { - "node": ">=0.8.x" - } - }, "node_modules/execa": { "version": "5.1.1", "resolved": "https://registry.npmjs.org/execa/-/execa-5.1.1.tgz", @@ -4417,15 +4437,6 @@ "node": ">= 0.4" } }, - "node_modules/invariant": { - "version": "2.2.4", - "resolved": "https://registry.npmjs.org/invariant/-/invariant-2.2.4.tgz", - "integrity": "sha512-phJfQVBuaJM5raOpJjSfkiD6BpbCE4Ns//LaXl6wGYtUBY83nWS6Rf9tXm2e8VaK60JEjYldbPif/A2B1C2gNA==", - "dev": true, - "dependencies": { - "loose-envify": "^1.0.0" - } - }, "node_modules/ioredis": { "version": "5.3.2", "resolved": "https://registry.npmjs.org/ioredis/-/ioredis-5.3.2.tgz", @@ -4583,12 +4594,6 @@ "node": ">=0.10.0" } }, - "node_modules/is-in-browser": { - "version": "2.0.0", - "resolved": "https://registry.npmjs.org/is-in-browser/-/is-in-browser-2.0.0.tgz", - "integrity": "sha512-/NUv5pqj+krUJalhGpj0lyy+x7vrD9jt1PlAfkoIDEXqE+xZgFJ4FU8e9m99WuHbCqsBZVf+nzvAjNso+SO80A==", - "dev": true - }, "node_modules/is-interactive": { "version": "1.0.0", "resolved": "https://registry.npmjs.org/is-interactive/-/is-interactive-1.0.0.tgz", @@ -5566,18 +5571,6 @@ "url": "https://github.com/sponsors/sindresorhus" } }, - "node_modules/loose-envify": { - "version": "1.4.0", - "resolved": "https://registry.npmjs.org/loose-envify/-/loose-envify-1.4.0.tgz", - "integrity": "sha512-lyuxPGr/Wfhrlem2CL/UcnUc1zcqKAImBDzukY7Y5F/yQiNdko6+fRLevlw1HgMySw7f611UIY408EtxRSoK3Q==", - "dev": true, - "dependencies": { - "js-tokens": "^3.0.0 || ^4.0.0" - }, - "bin": { - "loose-envify": "cli.js" - } - }, "node_modules/lru-cache": { "version": "5.1.1", "resolved": "https://registry.npmjs.org/lru-cache/-/lru-cache-5.1.1.tgz", @@ -6542,15 +6535,6 @@ "url": "https://github.com/chalk/ansi-styles?sponsor=1" } }, - "node_modules/promise": { - "version": "8.3.0", - "resolved": "https://registry.npmjs.org/promise/-/promise-8.3.0.tgz", - "integrity": "sha512-rZPNPKTOYVNEEKFaq1HqTgOwZD+4/YHS5ukLzQCypkj+OkYx7iv0mA91lJlpPPZ8vMau3IIGj5Qlwrx+8iiSmg==", - "dev": true, - "dependencies": { - "asap": "~2.0.6" - } - }, "node_modules/prompts": { "version": "2.4.2", "resolved": "https://registry.npmjs.org/prompts/-/prompts-2.4.2.tgz", @@ -7175,6 +7159,34 @@ "url": "https://github.com/chalk/slice-ansi?sponsor=1" } }, + "node_modules/socket.io-client": { + "version": "4.7.5", + "resolved": "https://registry.npmjs.org/socket.io-client/-/socket.io-client-4.7.5.tgz", + "integrity": "sha512-sJ/tqHOCe7Z50JCBCXrsY3I2k03iOiUe+tj1OmKeD2lXPiGH/RUCdTZFoqVyN7l1MnpIzPrGtLcijffmeouNlQ==", + "dev": true, + "dependencies": { + "@socket.io/component-emitter": "~3.1.0", + "debug": "~4.3.2", + "engine.io-client": "~6.5.2", + "socket.io-parser": "~4.2.4" + }, + "engines": { + "node": ">=10.0.0" + } + }, + "node_modules/socket.io-parser": { + "version": "4.2.4", + "resolved": "https://registry.npmjs.org/socket.io-parser/-/socket.io-parser-4.2.4.tgz", + "integrity": "sha512-/GbIKmo8ioc+NIWIhwdecY0ge+qVBSMdgxGygevmdHj24bsfgtCmcUUcQ5ZzcylGFHsN3k4HB4Cgkl96KVnuew==", + "dev": true, + "dependencies": { + "@socket.io/component-emitter": "~3.1.0", + "debug": "~4.3.1" + }, + "engines": { + "node": ">=10.0.0" + } + }, "node_modules/source-map": { "version": "0.6.1", "resolved": "https://registry.npmjs.org/source-map/-/source-map-0.6.1.tgz", @@ -8026,25 +8038,13 @@ "node": "^12.13.0 || ^14.15.0 || >=16.0.0" } }, - "node_modules/ws": { - "version": "8.16.0", - "resolved": "https://registry.npmjs.org/ws/-/ws-8.16.0.tgz", - "integrity": "sha512-HS0c//TP7Ina87TfiPUz1rQzMhHrl/SG2guqRcTOIUYD2q8uhUdNHZYJUaQ8aTGPzCh+c6oawMKW35nFl1dxyQ==", + "node_modules/xmlhttprequest-ssl": { + "version": "2.0.0", + "resolved": "https://registry.npmjs.org/xmlhttprequest-ssl/-/xmlhttprequest-ssl-2.0.0.tgz", + "integrity": "sha512-QKxVRxiRACQcVuQEYFsI1hhkrMlrXHPegbbd1yn9UHOmRxY+si12nQYzri3vbzt8VdTTRviqcKxcyllFas5z2A==", "dev": true, "engines": { - "node": ">=10.0.0" - }, - "peerDependencies": { - "bufferutil": "^4.0.1", - "utf-8-validate": ">=5.0.2" - }, - "peerDependenciesMeta": { - "bufferutil": { - "optional": true - }, - "utf-8-validate": { - "optional": true - } + "node": ">=0.4.0" } }, "node_modules/y18n": { diff --git a/package.json b/package.json index e341c24..7593264 100644 --- a/package.json +++ b/package.json @@ -40,6 +40,7 @@ "moleculer-repl": "^0.7.3", "prettier": "^2.8.0", "qbittorrent-api-v2": "^1.2.2", + "socket.io-client": "^4.7.5", "ts-jest": "^29.0.3", "ts-node": "^10.9.1", "typescript": "^4.9.3" diff --git a/services/autodownload.service.ts b/services/autodownload.service.ts index 4e5753a..102157a 100644 --- a/services/autodownload.service.ts +++ b/services/autodownload.service.ts @@ -1,6 +1,6 @@ "use strict"; import { Context, Service, ServiceBroker, ServiceSchema, Errors } from "moleculer"; -import axios from "axios"; +import io from "socket.io-client"; export default class AutoDownloadService extends Service { // @ts-ignore @@ -25,12 +25,17 @@ export default class AutoDownloadService extends Service { {}, ); + // 2a. Get the list of hubs from AirDC++ + const data: any = await this.broker.call("settings.getSettings", { + settingsKey: "directConnect", + }); + const { hubs } = data?.client; + console.log("HUBZZZZZ", hubs); // Iterate through the list of wanted comics - for (const comic of wantedComics) { + wantedComics.forEach(async (comic: any) => { let issuesToSearch: any = []; - if (comic.wanted.markEntireVolumeAsWanted) { - // 1a. Fetch all issues from ComicVine if the entire volume is wanted + // Fetch all issues from ComicVine if the entire volume is wanted issuesToSearch = await this.broker.call( "comicvine.getIssuesForVolume", { @@ -39,12 +44,68 @@ export default class AutoDownloadService extends Service { ); } else if (comic.wanted.issues && comic.wanted.issues.length > 0) { // 1b. Just the issues in "wanted.issues[]" - issuesToSearch = comic.wanted.issues; + issuesToSearch = { + issues: comic.wanted.issues, + volumeName: comic.wanted.volume?.name, + }; } - for (const issue of issuesToSearch) { - // construct the search queries + for (const issue of issuesToSearch.issues) { + // 2. construct the search queries + + // 2b. for AirDC++ search, with the volume name, issueId and cover_date + const { year } = this.parseStringDate(issue.coverDate); + + const dcppSearchQuery = { + query: { + pattern: `${issuesToSearch.volumeName.replace(/#/g, "")} ${ + issue.issueNumber + } ${year}`, + extensions: ["cbz", "cbr", "cb7"], + }, + hub_urls: hubs.map((hub: any) => hub.value), + priority: 5, + }; + // Perform the AirDC++ search + const dcppResults = await this.broker.call("socket.search", { + query: dcppSearchQuery, + config: { + hostname: "localhost:5600", + protocol: "http", + username: "user", + password: "pass", + }, + namespace: "/automated", + }); + this.socketIOInstance.on("searchResultUpdated", (data: any) => { + console.log("Hyaar we go", data); + }); + // const dcppResults = await ctx.call("airdcpp.search", { + // dcppSearchQuery, + // }); + + // 2b. for Prowlarr search, with the volume name, issueId and cover_date + const prowlarrQuery = { + port: "9696", + apiKey: "c4f42e265fb044dc81f7e88bd41c3367", + offset: 0, + categories: [7030], + query: `${issuesToSearch.volumeName} ${issue.issueNumber} ${year}`, + host: "localhost", + limit: 100, + type: "search", + indexerIds: [2], + }; + + // Perform the Prowlarr search + const prowlarrResults = await this.broker.call("prowlarr.search", { + prowlarrQuery, + }); + + // Process results here or after the loop + console.log("DCPP Results: ", dcppResults); + console.log("Prowlarr Results: ", prowlarrResults); } - } + }); }, }, determineDownloadChannel: { @@ -62,7 +123,30 @@ export default class AutoDownloadService extends Service { }, }, }, - methods: {}, + methods: { + parseStringDate: (dateString: string) => { + const date = new Date(dateString); + + // Get the year, month, and day + const year = date.getFullYear(); // 2022 + const month = date.getMonth() + 1; // December is 11 in Date object (0-indexed), so add 1 to make it human-readable + const day = date.getDate(); // 1 + return { year, month, day }; + }, + }, + async started() { + this.socketIOInstance = io("ws://localhost:3001/automated", { + transports: ["websocket"], + withCredentials: true, + }); + this.socketIOInstance.on("connect", (data: any) => { + console.log("connected", data); + }); + + this.socketIOInstance.on("searchResultAdded", (data: any) => { + console.log("Received searchResultUpdated event:", data); + }); + }, }); } } diff --git a/services/prowlarr.service.ts b/services/prowlarr.service.ts index 0a1b7a9..b1adc01 100644 --- a/services/prowlarr.service.ts +++ b/services/prowlarr.service.ts @@ -54,28 +54,31 @@ export default class ProwlarrService extends Service { rest: "GET /search", handler: async ( ctx: Context<{ - host: string; - port: string; - apiKey: string; - query: string; - type: string; - indexerIds: [number]; - categories: [number]; - limit: number; - offset: number; + prowlarrQuery: { + host: string; + port: string; + apiKey: string; + query: string; + type: string; + indexerIds: [number]; + categories: [number]; + limit: number; + offset: number; + }; }>, ) => { - console.log(JSON.stringify(ctx.params, null, 2)); const { - indexerIds, - categories, - host, - port, - apiKey, - query, - type, - limit, - offset, + prowlarrQuery: { + indexerIds, + categories, + host, + port, + apiKey, + query, + type, + limit, + offset, + }, } = ctx.params; const indexer = indexerIds[0] ? indexerIds.length === 1 : indexerIds; const category = categories[0] ? categories.length === 1 : categories; From 12e46334da1c9278768e0155a20bc7ffa88f2a8c Mon Sep 17 00:00:00 2001 From: Rishi Ghan Date: Tue, 28 May 2024 08:39:46 -0400 Subject: [PATCH 02/14] =?UTF-8?q?=F0=9F=AA=B3=20kafka=20for=20handling=20d?= =?UTF-8?q?c++=20download=20jobs?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- package-lock.json | 17 +++ package.json | 3 + services/autodownload.service.ts | 179 +++++++++------------------ services/comicprocessor.service.ts | 188 +++++++++++++++++++++++++++++ 4 files changed, 265 insertions(+), 122 deletions(-) create mode 100644 services/comicprocessor.service.ts diff --git a/package-lock.json b/package-lock.json index c22b043..2a50e64 100644 --- a/package-lock.json +++ b/package-lock.json @@ -10,6 +10,7 @@ "dependencies": { "@robertklep/qbittorrent": "^1.0.1", "ioredis": "^5.0.0", + "kafkajs": "^2.2.4", "moleculer": "^0.14.27", "moleculer-web": "^0.10.5", "parse-torrent": "^9.1.5" @@ -17,6 +18,7 @@ "devDependencies": { "@jest/globals": "^29.3.1", "@types/jest": "^29.2.3", + "@types/lodash": "^4.17.4", "@types/node": "^18.11.9", "@types/parse-torrent": "^5.8.7", "@typescript-eslint/eslint-plugin": "^5.44.0", @@ -31,6 +33,7 @@ "eslint-plugin-import": "^2.26.0", "eslint-plugin-jest": "^27.1.6", "jest": "^29.3.1", + "lodash": "^4.17.21", "moleculer-repl": "^0.7.3", "prettier": "^2.8.0", "qbittorrent-api-v2": "^1.2.2", @@ -1486,6 +1489,12 @@ "integrity": "sha512-dRLjCWHYg4oaA77cxO64oO+7JwCwnIzkZPdrrC71jQmQtlhM556pwKo5bUzqvZndkVbeFLIIi+9TC40JNF5hNQ==", "dev": true }, + "node_modules/@types/lodash": { + "version": "4.17.4", + "resolved": "https://registry.npmjs.org/@types/lodash/-/lodash-4.17.4.tgz", + "integrity": "sha512-wYCP26ZLxaT3R39kiN2+HcJ4kTd3U1waI/cY7ivWYqFP6pW3ZNpvi6Wd6PHZx7T/t8z0vlkXMg3QYLa7DZ/IJQ==", + "dev": true + }, "node_modules/@types/magnet-uri": { "version": "5.1.5", "resolved": "https://registry.npmjs.org/@types/magnet-uri/-/magnet-uri-5.1.5.tgz", @@ -5462,6 +5471,14 @@ "node": ">=6" } }, + "node_modules/kafkajs": { + "version": "2.2.4", + "resolved": "https://registry.npmjs.org/kafkajs/-/kafkajs-2.2.4.tgz", + "integrity": "sha512-j/YeapB1vfPT2iOIUn/vxdyKEuhuY2PxMBvf5JWux6iSaukAccrMtXEY/Lb7OvavDhOWME589bpLrEdnVHjfjA==", + "engines": { + "node": ">=14.0.0" + } + }, "node_modules/keyv": { "version": "4.5.3", "resolved": "https://registry.npmjs.org/keyv/-/keyv-4.5.3.tgz", diff --git a/package.json b/package.json index 7593264..1af15b1 100644 --- a/package.json +++ b/package.json @@ -23,6 +23,7 @@ "devDependencies": { "@jest/globals": "^29.3.1", "@types/jest": "^29.2.3", + "@types/lodash": "^4.17.4", "@types/node": "^18.11.9", "@types/parse-torrent": "^5.8.7", "@typescript-eslint/eslint-plugin": "^5.44.0", @@ -37,6 +38,7 @@ "eslint-plugin-import": "^2.26.0", "eslint-plugin-jest": "^27.1.6", "jest": "^29.3.1", + "lodash": "^4.17.21", "moleculer-repl": "^0.7.3", "prettier": "^2.8.0", "qbittorrent-api-v2": "^1.2.2", @@ -48,6 +50,7 @@ "dependencies": { "@robertklep/qbittorrent": "^1.0.1", "ioredis": "^5.0.0", + "kafkajs": "^2.2.4", "moleculer": "^0.14.27", "moleculer-web": "^0.10.5", "parse-torrent": "^9.1.5" diff --git a/services/autodownload.service.ts b/services/autodownload.service.ts index 102157a..f1b3289 100644 --- a/services/autodownload.service.ts +++ b/services/autodownload.service.ts @@ -1,8 +1,10 @@ "use strict"; import { Context, Service, ServiceBroker, ServiceSchema, Errors } from "moleculer"; -import io from "socket.io-client"; +import { Kafka } from "kafkajs"; export default class AutoDownloadService extends Service { + private kafkaProducer: any; + // @ts-ignore public constructor( public broker: ServiceBroker, @@ -11,141 +13,74 @@ export default class AutoDownloadService extends Service { super(broker); this.parseServiceSchema({ name: "autodownload", - mixins: [], - hooks: {}, actions: { searchWantedComics: { rest: "POST /searchWantedComics", handler: async (ctx: Context<{}>) => { - // 1.iterate through the wanted comic objects, and: - // 1a. Orchestrate all issues from ComicVine if the entire volume is wanted - // 1b. Just the issues in "wanted.issues[]" - const wantedComics: any = await this.broker.call( - "library.getComicsMarkedAsWanted", - {}, - ); + try { + const wantedComics: any = await this.broker.call( + "library.getComicsMarkedAsWanted", + {}, + ); + this.logger.info("Fetched wanted comics:", wantedComics.length); - // 2a. Get the list of hubs from AirDC++ - const data: any = await this.broker.call("settings.getSettings", { - settingsKey: "directConnect", - }); - const { hubs } = data?.client; - console.log("HUBZZZZZ", hubs); - // Iterate through the list of wanted comics - wantedComics.forEach(async (comic: any) => { - let issuesToSearch: any = []; - if (comic.wanted.markEntireVolumeAsWanted) { - // Fetch all issues from ComicVine if the entire volume is wanted - issuesToSearch = await this.broker.call( - "comicvine.getIssuesForVolume", - { - volumeId: comic.wanted.volume.id, - }, - ); - } else if (comic.wanted.issues && comic.wanted.issues.length > 0) { - // 1b. Just the issues in "wanted.issues[]" - issuesToSearch = { - issues: comic.wanted.issues, - volumeName: comic.wanted.volume?.name, - }; + for (const comic of wantedComics) { + if (comic.wanted.markEntireVolumeWanted) { + const issues: any = await this.broker.call( + "comicvine.getIssuesForVolume", + { + volumeId: comic.wanted.volume.id, + }, + ); + for (const issue of issues) { + await this.produceJobToKafka( + comic.wanted.volume.name, + issue, + ); + } + } else if (comic.wanted.issues && comic.wanted.issues.length > 0) { + for (const issue of comic.wanted.issues) { + await this.produceJobToKafka( + comic.wanted.volume?.name, + issue, + ); + } + } } - for (const issue of issuesToSearch.issues) { - // 2. construct the search queries - - // 2b. for AirDC++ search, with the volume name, issueId and cover_date - const { year } = this.parseStringDate(issue.coverDate); - - const dcppSearchQuery = { - query: { - pattern: `${issuesToSearch.volumeName.replace(/#/g, "")} ${ - issue.issueNumber - } ${year}`, - extensions: ["cbz", "cbr", "cb7"], - }, - hub_urls: hubs.map((hub: any) => hub.value), - priority: 5, - }; - // Perform the AirDC++ search - const dcppResults = await this.broker.call("socket.search", { - query: dcppSearchQuery, - config: { - hostname: "localhost:5600", - protocol: "http", - username: "user", - password: "pass", - }, - namespace: "/automated", - }); - this.socketIOInstance.on("searchResultUpdated", (data: any) => { - console.log("Hyaar we go", data); - }); - // const dcppResults = await ctx.call("airdcpp.search", { - // dcppSearchQuery, - // }); - - // 2b. for Prowlarr search, with the volume name, issueId and cover_date - const prowlarrQuery = { - port: "9696", - apiKey: "c4f42e265fb044dc81f7e88bd41c3367", - offset: 0, - categories: [7030], - query: `${issuesToSearch.volumeName} ${issue.issueNumber} ${year}`, - host: "localhost", - limit: 100, - type: "search", - indexerIds: [2], - }; - - // Perform the Prowlarr search - const prowlarrResults = await this.broker.call("prowlarr.search", { - prowlarrQuery, - }); - - // Process results here or after the loop - console.log("DCPP Results: ", dcppResults); - console.log("Prowlarr Results: ", prowlarrResults); - } - }); - }, - }, - determineDownloadChannel: { - rest: "POST /determineDownloadChannel", - handler: async (ctx: Context<{}>) => { - // 1. Parse the incoming search query - // to make sure that it is well-formed - // At the very least, it should have name, year, number - // 2. Choose between download mediums based on user-preference? - // possible choices are: DC++, Torrent - // 3. Perform the search on those media with the aforementioned search query - // 4. Choose a subset of relevant search results, - // and score them - // 5. Download the highest-scoring, relevant result + } catch (error) { + this.logger.error("Error in searchWantedComics:", error); + throw new Errors.MoleculerError( + "Failed to search wanted comics.", + 500, + "SEARCH_WANTED_COMICS_ERROR", + { error }, + ); + } }, }, }, methods: { - parseStringDate: (dateString: string) => { - const date = new Date(dateString); - - // Get the year, month, and day - const year = date.getFullYear(); // 2022 - const month = date.getMonth() + 1; // December is 11 in Date object (0-indexed), so add 1 to make it human-readable - const day = date.getDate(); // 1 - return { year, month, day }; + produceJobToKafka: async (volumeName: string, issue: any) => { + const job = { volumeName, issue }; + await this.kafkaProducer.send({ + topic: "comic-search-jobs", + messages: [{ value: JSON.stringify(job) }], + }); + this.logger.info("Produced job to Kafka:", job); }, }, async started() { - this.socketIOInstance = io("ws://localhost:3001/automated", { - transports: ["websocket"], - withCredentials: true, - }); - this.socketIOInstance.on("connect", (data: any) => { - console.log("connected", data); - }); - - this.socketIOInstance.on("searchResultAdded", (data: any) => { - console.log("Received searchResultUpdated event:", data); + const kafka = new Kafka({ + clientId: "comic-search-service", + brokers: ["localhost:9092"], }); + this.kafkaProducer = kafka.producer(); + await this.kafkaProducer.connect(); + this.logger.info("Kafka producer connected successfully."); + }, + async stopped() { + await this.kafkaProducer.disconnect(); + this.logger.info("Kafka producer disconnected successfully."); }, }); } diff --git a/services/comicprocessor.service.ts b/services/comicprocessor.service.ts new file mode 100644 index 0000000..18e5372 --- /dev/null +++ b/services/comicprocessor.service.ts @@ -0,0 +1,188 @@ +"use strict"; +import { Service, ServiceBroker, ServiceSchema } from "moleculer"; +import { Kafka, EachMessagePayload, logLevel } from "kafkajs"; +import { isUndefined } from "lodash"; +import io from "socket.io-client"; + +export default class ComicProcessorService extends Service { + private kafkaConsumer: any; + private socketIOInstance: any; + private kafkaProducer: any; + private prowlarrResultsMap: Map = new Map(); + private airDCPPSearchResults: Array = []; + + // @ts-ignore + public constructor( + public broker: ServiceBroker, + schema: ServiceSchema<{}> = { name: "comicProcessor" }, + ) { + super(broker); + this.parseServiceSchema({ + name: "comicProcessor", + methods: { + parseStringDate: (dateString: string) => { + const date = new Date(dateString); + return { + year: date.getFullYear(), + month: date.getMonth() + 1, + day: date.getDate(), + }; + }, + processJob: async (job: any) => { + this.logger.info("Processing job:", job); + const { volumeName, issue } = job; + const { year } = this.parseStringDate(issue.cover_date || issue.coverDate); + const settings: any = await this.broker.call("settings.getSettings", { + settingsKey: "directConnect", + }); + const hubs = settings.client.hubs.map((hub: any) => hub.value); + const dcppSearchQuery = { + query: { + pattern: `${volumeName.replace(/#/g, "")} ${ + issue.issue_number || issue.issueNumber + } ${year}`, + extensions: ["cbz", "cbr", "cb7"], + }, + hub_urls: hubs, + priority: 5, + }; + this.logger.info( + "DC++ search query:", + JSON.stringify(dcppSearchQuery, null, 4), + ); + + await this.broker.call("socket.search", { + query: dcppSearchQuery, + config: { + hostname: "localhost:5600", + protocol: "http", + username: "user", + password: "pass", + }, + namespace: "/automated", + }); + + const prowlarrResults = await this.broker.call("prowlarr.search", { + prowlarrQuery: { + port: "9696", + apiKey: "c4f42e265fb044dc81f7e88bd41c3367", + offset: 0, + categories: [7030], + query: `${volumeName} ${issue.issueNumber} ${year}`, + host: "localhost", + limit: 100, + type: "search", + indexerIds: [2], + }, + }); + + this.logger.info( + "Prowlarr search results:", + JSON.stringify(prowlarrResults, null, 4), + ); + // Store prowlarr results in map using unique key + const key = `${volumeName}-${issue.issueNumber}-${year}`; + this.prowlarrResultsMap.set(key, prowlarrResults); + }, + produceResultsToKafka: async (dcppResults: any, prowlarrResults: any) => { + const results = { dcppResults, prowlarrResults }; + await this.kafkaProducer.send({ + topic: "comic-search-results", + messages: [{ value: JSON.stringify(results) }], + }); + this.logger.info( + "Produced results to Kafka:", + JSON.stringify(results, null, 4), + ); + }, + }, + async started() { + const kafka = new Kafka({ + clientId: "comic-processor-service", + brokers: ["localhost:9092"], + logLevel: logLevel.INFO, + }); + this.kafkaConsumer = kafka.consumer({ groupId: "comic-processor-group" }); + this.kafkaProducer = kafka.producer(); + await this.kafkaConsumer.connect(); + await this.kafkaProducer.connect(); + this.logger.info("Kafka consumer and producer connected successfully."); + + await this.kafkaConsumer.subscribe({ + topic: "comic-search-jobs", + fromBeginning: true, + }); + + await this.kafkaConsumer.run({ + eachMessage: async ({ topic, partition, message }: EachMessagePayload) => { + if (message.value) { + const job = JSON.parse(message.value.toString()); + this.logger.info( + "Consumed job from Kafka:", + JSON.stringify(job, null, 4), + ); + await this.processJob(job); + } else { + this.logger.warn("Received message with null value"); + } + }, + }); + + this.socketIOInstance = io("ws://localhost:3001/automated", { + transports: ["websocket"], + withCredentials: true, + }); + this.socketIOInstance.on("connect", () => { + this.logger.info("Socket.IO connected successfully."); + }); + + this.socketIOInstance.on("searchResultAdded", (data: any) => { + this.logger.info( + "Received search result added:", + JSON.stringify(data, null, 4), + ); + this.airDCPPSearchResults.push(data); + }); + + this.socketIOInstance.on("searchResultUpdated", async (data: any) => { + this.logger.info( + "Received search result update:", + JSON.stringify(data, null, 4), + ); + if ( + !isUndefined(data.result) && + !isUndefined(this.airDCPPSearchResults.result) + ) { + const toReplaceIndex = this.airDCPPSearchResults.findIndex( + (element: any) => { + return element?.result.id === data.result.id; + }, + ); + this.airDCPPSearchResults[toReplaceIndex] = data.result; + } + }); + this.socketIOInstance.on("searchComplete", async () => { + // Ensure results are not empty before producing to Kafka + if (this.airDCPPSearchResults.length > 0) { + await this.produceResultsToKafka(this.airDCPPSearchResults, []); + } else { + this.logger.warn( + "AirDC++ search results are empty, not producing to Kafka.", + ); + } + }); + }, + async stopped() { + await this.kafkaConsumer.disconnect(); + await this.kafkaProducer.disconnect(); + this.logger.info("Kafka consumer and producer disconnected successfully."); + + // Close Socket.IO connection + if (this.socketIOInstance) { + this.socketIOInstance.close(); + this.logger.info("Socket.IO disconnected successfully."); + } + }, + }); + } +} From 60e5b6f61b1037c6523f691581c389424136cad6 Mon Sep 17 00:00:00 2001 From: Rishi Ghan Date: Mon, 3 Jun 2024 17:18:22 -0400 Subject: [PATCH 03/14] =?UTF-8?q?=F0=9F=94=A7=20kafka-powered=20autodownlo?= =?UTF-8?q?ad=20loop?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- services/autodownload.service.ts | 90 ++++++++++++++++++++++-------- services/comicprocessor.service.ts | 83 ++++++++++++++++++--------- 2 files changed, 125 insertions(+), 48 deletions(-) diff --git a/services/autodownload.service.ts b/services/autodownload.service.ts index f1b3289..e3b5690 100644 --- a/services/autodownload.service.ts +++ b/services/autodownload.service.ts @@ -2,6 +2,25 @@ import { Context, Service, ServiceBroker, ServiceSchema, Errors } from "moleculer"; import { Kafka } from "kafkajs"; +interface Comic { + wanted: { + markEntireVolumeWanted?: boolean; + issues?: Array; + volume: { + id: string; + name: string; + }; + }; +} + +interface PaginatedResult { + wantedComics: Comic[]; + total: number; + page: number; + limit: number; + pages: number; +} + export default class AutoDownloadService extends Service { private kafkaProducer: any; @@ -18,35 +37,60 @@ export default class AutoDownloadService extends Service { rest: "POST /searchWantedComics", handler: async (ctx: Context<{}>) => { try { - const wantedComics: any = await this.broker.call( - "library.getComicsMarkedAsWanted", - {}, - ); - this.logger.info("Fetched wanted comics:", wantedComics.length); + let page = 1; + const limit = this.BATCH_SIZE; - for (const comic of wantedComics) { - if (comic.wanted.markEntireVolumeWanted) { - const issues: any = await this.broker.call( - "comicvine.getIssuesForVolume", - { - volumeId: comic.wanted.volume.id, - }, + while (true) { + const result: PaginatedResult = await this.broker.call( + "library.getComicsMarkedAsWanted", + { page, limit }, + ); + + if (!result || !result.wantedComics) { + this.logger.error("Invalid response structure", result); + throw new Errors.MoleculerError( + "Invalid response structure from getComicsMarkedAsWanted", + 500, + "INVALID_RESPONSE_STRUCTURE", ); - for (const issue of issues) { - await this.produceJobToKafka( - comic.wanted.volume.name, - issue, - ); - } - } else if (comic.wanted.issues && comic.wanted.issues.length > 0) { - for (const issue of comic.wanted.issues) { - await this.produceJobToKafka( - comic.wanted.volume?.name, - issue, + } + + this.logger.info( + `Fetched ${result.wantedComics.length} comics from page ${page} of ${result.pages}`, + ); + + for (const comic of result.wantedComics) { + if (comic.wanted.markEntireVolumeWanted) { + const issues: any = await this.broker.call( + "comicvine.getIssuesForVolume", + { + volumeId: comic.wanted.volume.id, + }, ); + for (const issue of issues) { + await this.produceJobToKafka( + comic.wanted.volume.name, + issue, + ); + } + } else if ( + comic.wanted.issues && + comic.wanted.issues.length > 0 + ) { + for (const issue of comic.wanted.issues) { + await this.produceJobToKafka( + comic.wanted.volume?.name, + issue, + ); + } } } + + if (page >= result.pages) break; + page += 1; } + + return { success: true, message: "Processing started." }; } catch (error) { this.logger.error("Error in searchWantedComics:", error); throw new Errors.MoleculerError( diff --git a/services/comicprocessor.service.ts b/services/comicprocessor.service.ts index 18e5372..ff1a8ad 100644 --- a/services/comicprocessor.service.ts +++ b/services/comicprocessor.service.ts @@ -3,7 +3,20 @@ import { Service, ServiceBroker, ServiceSchema } from "moleculer"; import { Kafka, EachMessagePayload, logLevel } from "kafkajs"; import { isUndefined } from "lodash"; import io from "socket.io-client"; +interface SearchResult { + result: { + id: string; + // Add other relevant fields + }; + search_id: string; + // Add other relevant fields +} +interface SearchResultPayload { + groupedResult: SearchResult; + updatedResult: SearchResult; + instanceId: string; +} export default class ComicProcessorService extends Service { private kafkaConsumer: any; private socketIOInstance: any; @@ -136,35 +149,55 @@ export default class ComicProcessorService extends Service { this.logger.info("Socket.IO connected successfully."); }); - this.socketIOInstance.on("searchResultAdded", (data: any) => { - this.logger.info( - "Received search result added:", - JSON.stringify(data, null, 4), - ); - this.airDCPPSearchResults.push(data); - }); - - this.socketIOInstance.on("searchResultUpdated", async (data: any) => { - this.logger.info( - "Received search result update:", - JSON.stringify(data, null, 4), - ); - if ( - !isUndefined(data.result) && - !isUndefined(this.airDCPPSearchResults.result) - ) { - const toReplaceIndex = this.airDCPPSearchResults.findIndex( - (element: any) => { - return element?.result.id === data.result.id; - }, + this.socketIOInstance.on( + "searchResultAdded", + ({ groupedResult, instanceId }: SearchResultPayload) => { + this.logger.info( + "Received search result added:", + JSON.stringify(groupedResult, null, 4), ); - this.airDCPPSearchResults[toReplaceIndex] = data.result; - } - }); + this.airDCPPSearchResults.push({ + groupedResult: groupedResult.result, + instanceId, + }); + }, + ); + + this.socketIOInstance.on( + "searchResultUpdated", + async ({ updatedResult, instanceId }: SearchResultPayload) => { + this.logger.info( + "Received search result update:", + JSON.stringify(updatedResult, null, 4), + ); + if ( + !isUndefined(updatedResult.result) && + !isUndefined(this.airDCPPSearchResults.result) + ) { + const toReplaceIndex = this.airDCPPSearchResults.findIndex( + (element: any) => { + return element?.result.id === updatedResult.result.id; + }, + ); + this.airDCPPSearchResults[toReplaceIndex] = { + result: updatedResult.result, + instanceId, + }; + } + }, + ); this.socketIOInstance.on("searchComplete", async () => { // Ensure results are not empty before producing to Kafka if (this.airDCPPSearchResults.length > 0) { - await this.produceResultsToKafka(this.airDCPPSearchResults, []); + const results = this.airDCPPSearchResults.reduce((acc: any, item: any) => { + const key = item.instanceId; + if (!acc[key]) { + acc[key] = []; + } + acc[key].push(item); + return acc; + }, {}); + await this.produceResultsToKafka(results, []); } else { this.logger.warn( "AirDC++ search results are empty, not producing to Kafka.", From 55494abdc0d8a3ebd92824ceb04b59751edffa51 Mon Sep 17 00:00:00 2001 From: Rishi Ghan Date: Wed, 12 Jun 2024 21:41:29 -0500 Subject: [PATCH 04/14] =?UTF-8?q?=F0=9F=94=A7=20Fixes=20to=20search=20quer?= =?UTF-8?q?y=20builder?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- services/autodownload.service.ts | 80 ++++------ services/comicprocessor.service.ts | 245 +++++++++++++++++------------ 2 files changed, 177 insertions(+), 148 deletions(-) diff --git a/services/autodownload.service.ts b/services/autodownload.service.ts index e3b5690..a568b90 100644 --- a/services/autodownload.service.ts +++ b/services/autodownload.service.ts @@ -13,16 +13,9 @@ interface Comic { }; } -interface PaginatedResult { - wantedComics: Comic[]; - total: number; - page: number; - limit: number; - pages: number; -} - export default class AutoDownloadService extends Service { private kafkaProducer: any; + private readonly BATCH_SIZE = 100; // Adjust based on your system capacity // @ts-ignore public constructor( @@ -41,13 +34,23 @@ export default class AutoDownloadService extends Service { const limit = this.BATCH_SIZE; while (true) { - const result: PaginatedResult = await this.broker.call( + const comics: Comic[] = await this.broker.call( "library.getComicsMarkedAsWanted", { page, limit }, ); - if (!result || !result.wantedComics) { - this.logger.error("Invalid response structure", result); + // Log the entire result object for debugging + this.logger.info( + "Received comics from getComicsMarkedAsWanted:", + JSON.stringify(comics, null, 2), + ); + + // Check if result structure is correct + if (!Array.isArray(comics)) { + this.logger.error( + "Invalid response structure", + JSON.stringify(comics, null, 2), + ); throw new Errors.MoleculerError( "Invalid response structure from getComicsMarkedAsWanted", 500, @@ -56,41 +59,22 @@ export default class AutoDownloadService extends Service { } this.logger.info( - `Fetched ${result.wantedComics.length} comics from page ${page} of ${result.pages}`, + `Fetched ${comics.length} comics from page ${page}`, ); - for (const comic of result.wantedComics) { - if (comic.wanted.markEntireVolumeWanted) { - const issues: any = await this.broker.call( - "comicvine.getIssuesForVolume", - { - volumeId: comic.wanted.volume.id, - }, - ); - for (const issue of issues) { - await this.produceJobToKafka( - comic.wanted.volume.name, - issue, - ); - } - } else if ( - comic.wanted.issues && - comic.wanted.issues.length > 0 - ) { - for (const issue of comic.wanted.issues) { - await this.produceJobToKafka( - comic.wanted.volume?.name, - issue, - ); - } - } + // Enqueue the jobs in batches + for (const comic of comics) { + await this.produceJobToKafka(comic); } - if (page >= result.pages) break; + if (comics.length < limit) break; // End loop if fewer comics than the limit were fetched page += 1; } - return { success: true, message: "Processing started." }; + return { + success: true, + message: "Jobs enqueued for background processing.", + }; } catch (error) { this.logger.error("Error in searchWantedComics:", error); throw new Errors.MoleculerError( @@ -104,13 +88,17 @@ export default class AutoDownloadService extends Service { }, }, methods: { - produceJobToKafka: async (volumeName: string, issue: any) => { - const job = { volumeName, issue }; - await this.kafkaProducer.send({ - topic: "comic-search-jobs", - messages: [{ value: JSON.stringify(job) }], - }); - this.logger.info("Produced job to Kafka:", job); + produceJobToKafka: async (comic: Comic) => { + const job = { comic }; + try { + await this.kafkaProducer.send({ + topic: "comic-search-jobs", + messages: [{ value: JSON.stringify(job) }], + }); + this.logger.info("Produced job to Kafka:", job); + } catch (error) { + this.logger.error("Error producing job to Kafka:", error); + } }, }, async started() { diff --git a/services/comicprocessor.service.ts b/services/comicprocessor.service.ts index ff1a8ad..e79e4b7 100644 --- a/services/comicprocessor.service.ts +++ b/services/comicprocessor.service.ts @@ -1,8 +1,9 @@ "use strict"; import { Service, ServiceBroker, ServiceSchema } from "moleculer"; import { Kafka, EachMessagePayload, logLevel } from "kafkajs"; -import { isUndefined } from "lodash"; import io from "socket.io-client"; +import { isUndefined } from "lodash"; + interface SearchResult { result: { id: string; @@ -17,12 +18,14 @@ interface SearchResultPayload { updatedResult: SearchResult; instanceId: string; } + export default class ComicProcessorService extends Service { private kafkaConsumer: any; private socketIOInstance: any; private kafkaProducer: any; private prowlarrResultsMap: Map = new Map(); - private airDCPPSearchResults: Array = []; + private airDCPPSearchResults: Map = new Map(); + private issuesToSearch: any = []; // @ts-ignore public constructor( @@ -42,71 +45,118 @@ export default class ComicProcessorService extends Service { }; }, processJob: async (job: any) => { - this.logger.info("Processing job:", job); - const { volumeName, issue } = job; - const { year } = this.parseStringDate(issue.cover_date || issue.coverDate); - const settings: any = await this.broker.call("settings.getSettings", { - settingsKey: "directConnect", - }); - const hubs = settings.client.hubs.map((hub: any) => hub.value); - const dcppSearchQuery = { - query: { - pattern: `${volumeName.replace(/#/g, "")} ${ - issue.issue_number || issue.issueNumber - } ${year}`, - extensions: ["cbz", "cbr", "cb7"], - }, - hub_urls: hubs, - priority: 5, - }; - this.logger.info( - "DC++ search query:", - JSON.stringify(dcppSearchQuery, null, 4), - ); + try { + this.logger.info("Processing job:", JSON.stringify(job, null, 2)); + const { comic } = job; + const { volume, issues, markEntireVolumeWanted } = comic.wanted; - await this.broker.call("socket.search", { - query: dcppSearchQuery, - config: { - hostname: "localhost:5600", - protocol: "http", - username: "user", - password: "pass", - }, - namespace: "/automated", - }); + // If entire volume is marked as wanted, get their details from CV + if (markEntireVolumeWanted) { + this.issuesToSearch = await this.broker.call( + "comicvine.getIssuesForVolume", + { volumeId: volume.id }, + ); + this.logger.info( + `The entire volume with id: ${volume.id} was marked as wanted.`, + ); + this.logger.info(`Fetched issues for ${volume.id}:`); + this.logger.info(`${this.issuesToSearch.length} issues to search`); + } else { + // Or proceed with `issues` from the wanted object. + this.issuesToSearch = issues; + } - const prowlarrResults = await this.broker.call("prowlarr.search", { - prowlarrQuery: { - port: "9696", - apiKey: "c4f42e265fb044dc81f7e88bd41c3367", - offset: 0, - categories: [7030], - query: `${volumeName} ${issue.issueNumber} ${year}`, - host: "localhost", - limit: 100, - type: "search", - indexerIds: [2], - }, - }); + for (const issue of this.issuesToSearch) { + // issue number + const inferredIssueNumber = issue.issueNumber + ? issue.issueNumber + : issue.issue_number; + // year + const { year } = this.parseStringDate(issue.coverDate); + const inferredYear = year ? issue?.coverDate : issue.year; - this.logger.info( - "Prowlarr search results:", - JSON.stringify(prowlarrResults, null, 4), - ); - // Store prowlarr results in map using unique key - const key = `${volumeName}-${issue.issueNumber}-${year}`; - this.prowlarrResultsMap.set(key, prowlarrResults); + const settings: any = await this.broker.call("settings.getSettings", { + settingsKey: "directConnect", + }); + const hubs = settings.client.hubs.map((hub: any) => hub.value); + const dcppSearchQuery = { + query: { + pattern: `${volume.name.replace( + /#/g, + "", + )} ${inferredIssueNumber} ${inferredYear}`, + extensions: ["cbz", "cbr", "cb7"], + }, + hub_urls: hubs, + priority: 5, + }; + this.logger.info( + "DC++ search query:", + JSON.stringify(dcppSearchQuery, null, 4), + ); + + await this.broker.call("socket.search", { + query: dcppSearchQuery, + config: { + hostname: "localhost:5600", + protocol: "http", + username: "user", + password: "pass", + }, + namespace: "/automated", + }); + + const prowlarrResults = await this.broker.call("prowlarr.search", { + prowlarrQuery: { + port: "9696", + apiKey: "c4f42e265fb044dc81f7e88bd41c3367", + offset: 0, + categories: [7030], + query: `${volume.name} ${issue.issueNumber} ${year}`, + host: "localhost", + limit: 100, + type: "search", + indexerIds: [2], + }, + }); + + this.logger.info( + "Prowlarr search results:", + JSON.stringify(prowlarrResults, null, 4), + ); + + // Store prowlarr results in map using unique key + const key = `${volume.name}-${issue.issueNumber}-${year}`; + this.prowlarrResultsMap.set(key, prowlarrResults); + } + } catch (error) { + this.logger.error("Error processing job:", error); + } }, produceResultsToKafka: async (dcppResults: any, prowlarrResults: any) => { const results = { dcppResults, prowlarrResults }; - await this.kafkaProducer.send({ - topic: "comic-search-results", - messages: [{ value: JSON.stringify(results) }], - }); - this.logger.info( - "Produced results to Kafka:", - JSON.stringify(results, null, 4), - ); + try { + await this.kafkaProducer.send({ + topic: "comic-search-results", + messages: [{ value: JSON.stringify(results) }], + }); + this.logger.info( + "Produced results to Kafka:", + JSON.stringify(results, null, 4), + ); + // socket event for UI + await this.broker.call("socket.broadcast", { + namespace: "/", + event: "searchResultsAvailable", + args: [ + { + dcppResults, + }, + ], + }); + } catch (error) { + this.logger.error("Error producing results to Kafka:", error); + } }, }, async started() { @@ -117,9 +167,22 @@ export default class ComicProcessorService extends Service { }); this.kafkaConsumer = kafka.consumer({ groupId: "comic-processor-group" }); this.kafkaProducer = kafka.producer(); + + this.kafkaConsumer.on("consumer.crash", (event: any) => { + this.logger.error("Consumer crash:", event); + }); + this.kafkaConsumer.on("consumer.connect", () => { + this.logger.info("Consumer connected"); + }); + this.kafkaConsumer.on("consumer.disconnect", () => { + this.logger.info("Consumer disconnected"); + }); + this.kafkaConsumer.on("consumer.network.request_timeout", () => { + this.logger.warn("Consumer network request timeout"); + }); + await this.kafkaConsumer.connect(); await this.kafkaProducer.connect(); - this.logger.info("Kafka consumer and producer connected successfully."); await this.kafkaConsumer.subscribe({ topic: "comic-search-jobs", @@ -130,10 +193,6 @@ export default class ComicProcessorService extends Service { eachMessage: async ({ topic, partition, message }: EachMessagePayload) => { if (message.value) { const job = JSON.parse(message.value.toString()); - this.logger.info( - "Consumed job from Kafka:", - JSON.stringify(job, null, 4), - ); await this.processJob(job); } else { this.logger.warn("Received message with null value"); @@ -149,6 +208,7 @@ export default class ComicProcessorService extends Service { this.logger.info("Socket.IO connected successfully."); }); + // Handle searchResultAdded event this.socketIOInstance.on( "searchResultAdded", ({ groupedResult, instanceId }: SearchResultPayload) => { @@ -156,13 +216,14 @@ export default class ComicProcessorService extends Service { "Received search result added:", JSON.stringify(groupedResult, null, 4), ); - this.airDCPPSearchResults.push({ - groupedResult: groupedResult.result, - instanceId, - }); + if (!this.airDCPPSearchResults.has(instanceId)) { + this.airDCPPSearchResults.set(instanceId, []); + } + this.airDCPPSearchResults.get(instanceId).push(groupedResult.result); }, ); + // Handle searchResultUpdated event this.socketIOInstance.on( "searchResultUpdated", async ({ updatedResult, instanceId }: SearchResultPayload) => { @@ -170,50 +231,30 @@ export default class ComicProcessorService extends Service { "Received search result update:", JSON.stringify(updatedResult, null, 4), ); - if ( - !isUndefined(updatedResult.result) && - !isUndefined(this.airDCPPSearchResults.result) - ) { - const toReplaceIndex = this.airDCPPSearchResults.findIndex( - (element: any) => { - return element?.result.id === updatedResult.result.id; - }, + const resultsForInstance = this.airDCPPSearchResults.get(instanceId); + if (resultsForInstance) { + const toReplaceIndex = resultsForInstance.findIndex( + (element: any) => element.id === updatedResult.result.id, ); - this.airDCPPSearchResults[toReplaceIndex] = { - result: updatedResult.result, - instanceId, - }; + if (toReplaceIndex !== -1) { + resultsForInstance[toReplaceIndex] = updatedResult.result; + } } }, ); - this.socketIOInstance.on("searchComplete", async () => { - // Ensure results are not empty before producing to Kafka - if (this.airDCPPSearchResults.length > 0) { - const results = this.airDCPPSearchResults.reduce((acc: any, item: any) => { - const key = item.instanceId; - if (!acc[key]) { - acc[key] = []; - } - acc[key].push(item); - return acc; - }, {}); - await this.produceResultsToKafka(results, []); - } else { - this.logger.warn( - "AirDC++ search results are empty, not producing to Kafka.", - ); - } + + // Handle searchComplete event + this.socketIOInstance.on("searchComplete", async (instanceId: string) => { + this.logger.info(`Search complete for instance ID ${instanceId}`); + await this.produceResultsToKafka(instanceId); }); }, async stopped() { await this.kafkaConsumer.disconnect(); await this.kafkaProducer.disconnect(); - this.logger.info("Kafka consumer and producer disconnected successfully."); - // Close Socket.IO connection if (this.socketIOInstance) { this.socketIOInstance.close(); - this.logger.info("Socket.IO disconnected successfully."); } }, }); From e6a85e6a397a00122020ebd33774cc6ab9b644bd Mon Sep 17 00:00:00 2001 From: Rishi Ghan Date: Thu, 13 Jun 2024 14:09:00 -0400 Subject: [PATCH 05/14] =?UTF-8?q?=F0=9F=8F=97=EF=B8=8F=20Commented=20code?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- services/comicprocessor.service.ts | 23 +++++++++++++---------- 1 file changed, 13 insertions(+), 10 deletions(-) diff --git a/services/comicprocessor.service.ts b/services/comicprocessor.service.ts index e79e4b7..fd163b3 100644 --- a/services/comicprocessor.service.ts +++ b/services/comicprocessor.service.ts @@ -47,6 +47,12 @@ export default class ComicProcessorService extends Service { processJob: async (job: any) => { try { this.logger.info("Processing job:", JSON.stringify(job, null, 2)); + // Get the hub to search on + const settings: any = await this.broker.call("settings.getSettings", { + settingsKey: "directConnect", + }); + const hubs = settings.client.hubs.map((hub: any) => hub.value); + const { comic } = job; const { volume, issues, markEntireVolumeWanted } = comic.wanted; @@ -67,18 +73,15 @@ export default class ComicProcessorService extends Service { } for (const issue of this.issuesToSearch) { - // issue number - const inferredIssueNumber = issue.issueNumber - ? issue.issueNumber - : issue.issue_number; - // year + // Query builder for DC++ + // 1. issue number + const inferredIssueNumber = + issue.issueNumber || issue.issue_number || ""; + // 2. year const { year } = this.parseStringDate(issue.coverDate); - const inferredYear = year ? issue?.coverDate : issue.year; + const inferredYear = year || issue.year || ""; - const settings: any = await this.broker.call("settings.getSettings", { - settingsKey: "directConnect", - }); - const hubs = settings.client.hubs.map((hub: any) => hub.value); + // 3. Orchestrate the query const dcppSearchQuery = { query: { pattern: `${volume.name.replace( From ecdc3845cbca0c9792411618a7f154ca1f1b2b67 Mon Sep 17 00:00:00 2001 From: Rishi Ghan Date: Tue, 18 Jun 2024 00:06:25 -0400 Subject: [PATCH 06/14] =?UTF-8?q?=F0=9F=AA=B3=20Serialized=20Map=20to=20be?= =?UTF-8?q?=20compatible=20with=20kafka?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- services/comicprocessor.service.ts | 90 +++++++++++++++++------------- 1 file changed, 50 insertions(+), 40 deletions(-) diff --git a/services/comicprocessor.service.ts b/services/comicprocessor.service.ts index fd163b3..28550af 100644 --- a/services/comicprocessor.service.ts +++ b/services/comicprocessor.service.ts @@ -24,7 +24,7 @@ export default class ComicProcessorService extends Service { private socketIOInstance: any; private kafkaProducer: any; private prowlarrResultsMap: Map = new Map(); - private airDCPPSearchResults: Map = new Map(); + private airDCPPSearchResults: Map = new Map(); private issuesToSearch: any = []; // @ts-ignore @@ -84,10 +84,10 @@ export default class ComicProcessorService extends Service { // 3. Orchestrate the query const dcppSearchQuery = { query: { - pattern: `${volume.name.replace( - /#/g, - "", - )} ${inferredIssueNumber} ${inferredYear}`, + pattern: `${volume.name + .replace(/[^\w\s]/g, "") + .replace(/\s+/g, " ") + .trim()}`, extensions: ["cbz", "cbr", "cb7"], }, hub_urls: hubs, @@ -109,51 +109,57 @@ export default class ComicProcessorService extends Service { namespace: "/automated", }); - const prowlarrResults = await this.broker.call("prowlarr.search", { - prowlarrQuery: { - port: "9696", - apiKey: "c4f42e265fb044dc81f7e88bd41c3367", - offset: 0, - categories: [7030], - query: `${volume.name} ${issue.issueNumber} ${year}`, - host: "localhost", - limit: 100, - type: "search", - indexerIds: [2], - }, - }); - - this.logger.info( - "Prowlarr search results:", - JSON.stringify(prowlarrResults, null, 4), - ); + // const prowlarrResults = await this.broker.call("prowlarr.search", { + // prowlarrQuery: { + // port: "9696", + // apiKey: "c4f42e265fb044dc81f7e88bd41c3367", + // offset: 0, + // categories: [7030], + // query: `${volume.name} ${issue.issueNumber} ${year}`, + // host: "localhost", + // limit: 100, + // type: "search", + // indexerIds: [2], + // }, + // }); + // + // this.logger.info( + // "Prowlarr search results:", + // JSON.stringify(prowlarrResults, null, 4), + // ); // Store prowlarr results in map using unique key - const key = `${volume.name}-${issue.issueNumber}-${year}`; - this.prowlarrResultsMap.set(key, prowlarrResults); + // const key = `${volume.name}-${issue.issueNumber}-${year}`; + // this.prowlarrResultsMap.set(key, prowlarrResults); } } catch (error) { this.logger.error("Error processing job:", error); } }, - produceResultsToKafka: async (dcppResults: any, prowlarrResults: any) => { - const results = { dcppResults, prowlarrResults }; + produceResultsToKafka: async () => { try { + /* + Kafka messages need to be in a format that can be serialized to JSON, and a Map is not directly serializable in a way that retains its structure, hence we use Object.fromEntries + */ await this.kafkaProducer.send({ topic: "comic-search-results", - messages: [{ value: JSON.stringify(results) }], + messages: [ + { + value: JSON.stringify( + Object.fromEntries(this.airDCPPSearchResults), + ), + }, + ], }); - this.logger.info( - "Produced results to Kafka:", - JSON.stringify(results, null, 4), - ); + console.log(`Produced results to Kafka.`); + // socket event for UI await this.broker.call("socket.broadcast", { namespace: "/", event: "searchResultsAvailable", args: [ { - dcppResults, + bokya: Object.fromEntries(this.airDCPPSearchResults), }, ], }); @@ -230,26 +236,30 @@ export default class ComicProcessorService extends Service { this.socketIOInstance.on( "searchResultUpdated", async ({ updatedResult, instanceId }: SearchResultPayload) => { - this.logger.info( - "Received search result update:", - JSON.stringify(updatedResult, null, 4), - ); const resultsForInstance = this.airDCPPSearchResults.get(instanceId); + if (resultsForInstance) { const toReplaceIndex = resultsForInstance.findIndex( (element: any) => element.id === updatedResult.result.id, ); + if (toReplaceIndex !== -1) { + // Replace the existing result with the updated result resultsForInstance[toReplaceIndex] = updatedResult.result; + + // Optionally, update the map with the modified array + this.airDCPPSearchResults.set(instanceId, resultsForInstance); } } }, ); // Handle searchComplete event - this.socketIOInstance.on("searchComplete", async (instanceId: string) => { - this.logger.info(`Search complete for instance ID ${instanceId}`); - await this.produceResultsToKafka(instanceId); + this.socketIOInstance.on("searchesSent", async (data: any) => { + this.logger.info( + `Search complete for query: "${data.searchInfo.query.pattern}"`, + ); + await this.produceResultsToKafka(); }); }, async stopped() { From 0430670a01c2fb209a358facc7182e1d66267615 Mon Sep 17 00:00:00 2001 From: Rishi Ghan Date: Tue, 2 Jul 2024 22:09:33 -0400 Subject: [PATCH 07/14] =?UTF-8?q?=F0=9F=8F=97=EF=B8=8F=20Added=20structure?= =?UTF-8?q?=20for=20the=20UI=20notification?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- services/comicprocessor.service.ts | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/services/comicprocessor.service.ts b/services/comicprocessor.service.ts index 28550af..32a188c 100644 --- a/services/comicprocessor.service.ts +++ b/services/comicprocessor.service.ts @@ -136,10 +136,10 @@ export default class ComicProcessorService extends Service { this.logger.error("Error processing job:", error); } }, - produceResultsToKafka: async () => { + produceResultsToKafka: async (query: string) => { try { /* - Kafka messages need to be in a format that can be serialized to JSON, and a Map is not directly serializable in a way that retains its structure, hence we use Object.fromEntries + Kafka messages need to be in a format that can be serialized to JSON, and a Map is not directly serializable in a way that retains its structure, hence why we use Object.fromEntries */ await this.kafkaProducer.send({ topic: "comic-search-results", @@ -159,7 +159,8 @@ export default class ComicProcessorService extends Service { event: "searchResultsAvailable", args: [ { - bokya: Object.fromEntries(this.airDCPPSearchResults), + query, + results: Object.fromEntries(this.airDCPPSearchResults), }, ], }); @@ -259,7 +260,7 @@ export default class ComicProcessorService extends Service { this.logger.info( `Search complete for query: "${data.searchInfo.query.pattern}"`, ); - await this.produceResultsToKafka(); + await this.produceResultsToKafka(data.searchInfo.query.pattern); }); }, async stopped() { From a710211a2c83df3697521bb91a61601b38ac3114 Mon Sep 17 00:00:00 2001 From: Rishi Ghan Date: Wed, 10 Jul 2024 14:36:03 -0400 Subject: [PATCH 08/14] =?UTF-8?q?=F0=9F=94=A7=20Commented=20code?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- services/comicprocessor.service.ts | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/services/comicprocessor.service.ts b/services/comicprocessor.service.ts index 32a188c..8b82c25 100644 --- a/services/comicprocessor.service.ts +++ b/services/comicprocessor.service.ts @@ -44,6 +44,7 @@ export default class ComicProcessorService extends Service { day: date.getDate(), }; }, + rankSearchResults: (results, query: string) => {}, processJob: async (job: any) => { try { this.logger.info("Processing job:", JSON.stringify(job, null, 2)); @@ -138,6 +139,10 @@ export default class ComicProcessorService extends Service { }, produceResultsToKafka: async (query: string) => { try { + /* + Match and rank + */ + /* Kafka messages need to be in a format that can be serialized to JSON, and a Map is not directly serializable in a way that retains its structure, hence why we use Object.fromEntries */ From 4045097eb0d19ece4c67812c0d710de32de60785 Mon Sep 17 00:00:00 2001 From: Rishi Ghan Date: Wed, 14 Aug 2024 12:19:55 -0400 Subject: [PATCH 09/14] =?UTF-8?q?=F0=9F=8F=97=EF=B8=8F=20Added=20string-si?= =?UTF-8?q?milarity=20lib?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- package-lock.json | 72 ++++++++++++++++-------------- package.json | 7 +-- services/comicprocessor.service.ts | 38 +++++++++++++--- 3 files changed, 74 insertions(+), 43 deletions(-) diff --git a/package-lock.json b/package-lock.json index 2a50e64..f95073a 100644 --- a/package-lock.json +++ b/package-lock.json @@ -13,7 +13,8 @@ "kafkajs": "^2.2.4", "moleculer": "^0.14.27", "moleculer-web": "^0.10.5", - "parse-torrent": "^9.1.5" + "parse-torrent": "^9.1.5", + "string-similarity-alg": "^1.3.2" }, "devDependencies": { "@jest/globals": "^29.3.1", @@ -2356,12 +2357,12 @@ } }, "node_modules/braces": { - "version": "3.0.2", - "resolved": "https://registry.npmjs.org/braces/-/braces-3.0.2.tgz", - "integrity": "sha512-b8um+L1RzM3WDSzvhm6gIz1yfTbBt6YTlcEKAvsmqCZZFw46z626lVj9j1yEPW33H5H+lBQpZMP1k8l+78Ha0A==", + "version": "3.0.3", + "resolved": "https://registry.npmjs.org/braces/-/braces-3.0.3.tgz", + "integrity": "sha512-yQbXgO/OSZVD2IsiLlro+7Hf6Q18EJrKSEsdoMzKePKXct3gvD8oLcOQdIzGupr5Fj+EDe8gO/lxc1BzfMpxvA==", "dev": true, "dependencies": { - "fill-range": "^7.0.1" + "fill-range": "^7.1.1" }, "engines": { "node": ">=8" @@ -3025,39 +3026,18 @@ } }, "node_modules/engine.io-client": { - "version": "6.5.3", - "resolved": "https://registry.npmjs.org/engine.io-client/-/engine.io-client-6.5.3.tgz", - "integrity": "sha512-9Z0qLB0NIisTRt1DZ/8U2k12RJn8yls/nXMZLn+/N8hANT3TcYjKFKcwbw5zFQiN4NTde3TSY9zb79e1ij6j9Q==", + "version": "6.5.4", + "resolved": "https://registry.npmjs.org/engine.io-client/-/engine.io-client-6.5.4.tgz", + "integrity": "sha512-GeZeeRjpD2qf49cZQ0Wvh/8NJNfeXkXXcoGh+F77oEAgo9gUHwT1fCRxSNU+YEEaysOJTnsFHmM5oAcPy4ntvQ==", "dev": true, "dependencies": { "@socket.io/component-emitter": "~3.1.0", "debug": "~4.3.1", "engine.io-parser": "~5.2.1", - "ws": "~8.11.0", + "ws": "~8.17.1", "xmlhttprequest-ssl": "~2.0.0" } }, - "node_modules/engine.io-client/node_modules/ws": { - "version": "8.11.0", - "resolved": "https://registry.npmjs.org/ws/-/ws-8.11.0.tgz", - "integrity": "sha512-HPG3wQd9sNQoT9xHyNCXoDUa+Xw/VevmY9FoHyQ+g+rrMn4j6FB4np7Z0OhdTgjx6MgQLK7jwSy1YecU1+4Asg==", - "dev": true, - "engines": { - "node": ">=10.0.0" - }, - "peerDependencies": { - "bufferutil": "^4.0.1", - "utf-8-validate": "^5.0.2" - }, - "peerDependenciesMeta": { - "bufferutil": { - "optional": true - }, - "utf-8-validate": { - "optional": true - } - } - }, "node_modules/engine.io-parser": { "version": "5.2.2", "resolved": "https://registry.npmjs.org/engine.io-parser/-/engine.io-parser-5.2.2.tgz", @@ -3892,9 +3872,9 @@ } }, "node_modules/fill-range": { - "version": "7.0.1", - "resolved": "https://registry.npmjs.org/fill-range/-/fill-range-7.0.1.tgz", - "integrity": "sha512-qOo9F+dMUmC2Lcb4BbVvnKJxTPjCm+RRpe4gDuGrzkL7mEVl/djYSu2OdQ2Pa302N4oqkSg9ir6jaLWJ2USVpQ==", + "version": "7.1.1", + "resolved": "https://registry.npmjs.org/fill-range/-/fill-range-7.1.1.tgz", + "integrity": "sha512-YsGpe3WHLK8ZYi4tWDg2Jy3ebRz2rXowDxnld4bkQB00cc/1Zw9AWnC0i9ztDJitivtQvaI9KaLyKrc+hBW0yg==", "dev": true, "dependencies": { "to-regex-range": "^5.0.1" @@ -7300,6 +7280,11 @@ "node": ">=10" } }, + "node_modules/string-similarity-alg": { + "version": "1.3.2", + "resolved": "https://registry.npmjs.org/string-similarity-alg/-/string-similarity-alg-1.3.2.tgz", + "integrity": "sha512-M+jTGEJmWLfIg2dawXOifzbkUs/tp8HbeSCXZpNII2oZvU5uexaBFx+NoUBWS3M6VQ2ezJJCMstU8L8gq6YqsQ==" + }, "node_modules/string-width": { "version": "4.2.3", "resolved": "https://registry.npmjs.org/string-width/-/string-width-4.2.3.tgz", @@ -8055,6 +8040,27 @@ "node": "^12.13.0 || ^14.15.0 || >=16.0.0" } }, + "node_modules/ws": { + "version": "8.17.1", + "resolved": "https://registry.npmjs.org/ws/-/ws-8.17.1.tgz", + "integrity": "sha512-6XQFvXTkbfUOZOKKILFG1PDK2NDQs4azKQl26T0YS5CxqWLgXajbPZ+h4gZekJyRqFU8pvnbAbbs/3TgRPy+GQ==", + "dev": true, + "engines": { + "node": ">=10.0.0" + }, + "peerDependencies": { + "bufferutil": "^4.0.1", + "utf-8-validate": ">=5.0.2" + }, + "peerDependenciesMeta": { + "bufferutil": { + "optional": true + }, + "utf-8-validate": { + "optional": true + } + } + }, "node_modules/xmlhttprequest-ssl": { "version": "2.0.0", "resolved": "https://registry.npmjs.org/xmlhttprequest-ssl/-/xmlhttprequest-ssl-2.0.0.tgz", diff --git a/package.json b/package.json index 1af15b1..bb2000e 100644 --- a/package.json +++ b/package.json @@ -51,9 +51,10 @@ "@robertklep/qbittorrent": "^1.0.1", "ioredis": "^5.0.0", "kafkajs": "^2.2.4", - "moleculer": "^0.14.27", - "moleculer-web": "^0.10.5", - "parse-torrent": "^9.1.5" + "moleculer": "^0.14.34", + "moleculer-web": "^0.10.7", + "parse-torrent": "^9.1.5", + "string-similarity-alg": "^1.3.2" }, "engines": { "node": ">= 16.x.x" diff --git a/services/comicprocessor.service.ts b/services/comicprocessor.service.ts index 8b82c25..a53fab2 100644 --- a/services/comicprocessor.service.ts +++ b/services/comicprocessor.service.ts @@ -3,6 +3,7 @@ import { Service, ServiceBroker, ServiceSchema } from "moleculer"; import { Kafka, EachMessagePayload, logLevel } from "kafkajs"; import io from "socket.io-client"; import { isUndefined } from "lodash"; +import stringSimilarity from "string-similarity-alg"; interface SearchResult { result: { @@ -44,7 +45,26 @@ export default class ComicProcessorService extends Service { day: date.getDate(), }; }, - rankSearchResults: (results, query: string) => {}, + rankSearchResults: async (results: Map, query: string) => { + // Find the highest-ranked response based on similarity to the search string + let highestRankedResult = null; + let highestSimilarity = -1; + + results.forEach((resultArray) => { + resultArray.forEach((result) => { + const similarity = stringSimilarity("jaro-winkler").compare( + result.name, + query, + ); + if (similarity > highestSimilarity) { + highestSimilarity = similarity; + highestRankedResult = { ...result, similarity }; + } + }); + }); + + return highestRankedResult; + }, processJob: async (job: any) => { try { this.logger.info("Processing job:", JSON.stringify(job, null, 2)); @@ -137,12 +157,17 @@ export default class ComicProcessorService extends Service { this.logger.error("Error processing job:", error); } }, - produceResultsToKafka: async (query: string) => { + produceResultsToKafka: async (query: string, result: any[]) => { try { /* Match and rank */ - + const result = await this.rankSearchResults( + this.airDCPPSearchResults, + query, + ); + console.log("majori"); + console.log(result); /* Kafka messages need to be in a format that can be serialized to JSON, and a Map is not directly serializable in a way that retains its structure, hence why we use Object.fromEntries */ @@ -150,9 +175,7 @@ export default class ComicProcessorService extends Service { topic: "comic-search-results", messages: [ { - value: JSON.stringify( - Object.fromEntries(this.airDCPPSearchResults), - ), + value: JSON.stringify(result), }, ], }); @@ -165,7 +188,7 @@ export default class ComicProcessorService extends Service { args: [ { query, - results: Object.fromEntries(this.airDCPPSearchResults), + result, }, ], }); @@ -265,6 +288,7 @@ export default class ComicProcessorService extends Service { this.logger.info( `Search complete for query: "${data.searchInfo.query.pattern}"`, ); + await this.produceResultsToKafka(data.searchInfo.query.pattern); }); }, From 651e3ac7bb800f2ad5d26adf7a0883c0c2f46fe0 Mon Sep 17 00:00:00 2001 From: Rishi Ghan Date: Mon, 18 Nov 2024 11:07:38 -0500 Subject: [PATCH 10/14] =?UTF-8?q?=F0=9F=94=A7=20Added=20an=20undefined=20c?= =?UTF-8?q?heck=20for=20the=20results?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- services/comicprocessor.service.ts | 12 +++++++----- 1 file changed, 7 insertions(+), 5 deletions(-) diff --git a/services/comicprocessor.service.ts b/services/comicprocessor.service.ts index a53fab2..5abf17d 100644 --- a/services/comicprocessor.service.ts +++ b/services/comicprocessor.service.ts @@ -2,7 +2,7 @@ import { Service, ServiceBroker, ServiceSchema } from "moleculer"; import { Kafka, EachMessagePayload, logLevel } from "kafkajs"; import io from "socket.io-client"; -import { isUndefined } from "lodash"; +import { isNil, isUndefined } from "lodash"; import stringSimilarity from "string-similarity-alg"; interface SearchResult { @@ -122,10 +122,10 @@ export default class ComicProcessorService extends Service { await this.broker.call("socket.search", { query: dcppSearchQuery, config: { - hostname: "localhost:5600", + hostname: "192.168.1.119:5600", protocol: "http", - username: "user", - password: "pass", + username: "admin", + password: "password", }, namespace: "/automated", }); @@ -257,7 +257,9 @@ export default class ComicProcessorService extends Service { if (!this.airDCPPSearchResults.has(instanceId)) { this.airDCPPSearchResults.set(instanceId, []); } - this.airDCPPSearchResults.get(instanceId).push(groupedResult.result); + if (!isUndefined(groupedResult?.result)) { + this.airDCPPSearchResults.get(instanceId).push(groupedResult.result); + } }, ); From e0954eb3e884f15e313c06a4b31a03bd64e001c1 Mon Sep 17 00:00:00 2001 From: Rishi Ghan Date: Fri, 22 Nov 2024 21:12:25 -0500 Subject: [PATCH 11/14] =?UTF-8?q?=F0=9F=94=A7=20Fixing=20the=20response=20?= =?UTF-8?q?from=20searchResultAdded=20event?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- services/comicprocessor.service.ts | 34 ++++++++++++++---------------- 1 file changed, 16 insertions(+), 18 deletions(-) diff --git a/services/comicprocessor.service.ts b/services/comicprocessor.service.ts index 5abf17d..89a1211 100644 --- a/services/comicprocessor.service.ts +++ b/services/comicprocessor.service.ts @@ -122,10 +122,10 @@ export default class ComicProcessorService extends Service { await this.broker.call("socket.search", { query: dcppSearchQuery, config: { - hostname: "192.168.1.119:5600", + hostname: "localhost:5600", protocol: "http", - username: "admin", - password: "password", + username: "user", + password: "pass", }, namespace: "/automated", }); @@ -247,21 +247,19 @@ export default class ComicProcessorService extends Service { }); // Handle searchResultAdded event - this.socketIOInstance.on( - "searchResultAdded", - ({ groupedResult, instanceId }: SearchResultPayload) => { - this.logger.info( - "Received search result added:", - JSON.stringify(groupedResult, null, 4), - ); - if (!this.airDCPPSearchResults.has(instanceId)) { - this.airDCPPSearchResults.set(instanceId, []); - } - if (!isUndefined(groupedResult?.result)) { - this.airDCPPSearchResults.get(instanceId).push(groupedResult.result); - } - }, - ); + this.socketIOInstance.on("searchResultAdded", ({ result }: SearchResult) => { + console.log(result); + this.logger.info( + "AirDC++ Search result added:", + JSON.stringify(result, null, 4), + ); + // if (!this.airDCPPSearchResults.has(instanceId)) { + // this.airDCPPSearchResults.set(instanceId, []); + // } + // if (!isUndefined(groupedResult?.result)) { + // this.airDCPPSearchResults.get(instanceId).push(groupedResult.result); + // } + }); // Handle searchResultUpdated event this.socketIOInstance.on( From cd9ea85b8057740020f1632f0712fe6f918a105c Mon Sep 17 00:00:00 2001 From: Rishi Ghan Date: Sun, 22 Dec 2024 21:59:49 -0500 Subject: [PATCH 12/14] =?UTF-8?q?=F0=9F=94=A7=20Fixing=20autodownload=20fu?= =?UTF-8?q?nctionality?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- package.json | 4 +- services/comicprocessor.service.ts | 111 ++++++++++++++++------------- 2 files changed, 62 insertions(+), 53 deletions(-) diff --git a/package.json b/package.json index bb2000e..61087fa 100644 --- a/package.json +++ b/package.json @@ -38,19 +38,19 @@ "eslint-plugin-import": "^2.26.0", "eslint-plugin-jest": "^27.1.6", "jest": "^29.3.1", - "lodash": "^4.17.21", "moleculer-repl": "^0.7.3", "prettier": "^2.8.0", "qbittorrent-api-v2": "^1.2.2", - "socket.io-client": "^4.7.5", "ts-jest": "^29.0.3", "ts-node": "^10.9.1", "typescript": "^4.9.3" }, "dependencies": { + "lodash": "^4.17.21", "@robertklep/qbittorrent": "^1.0.1", "ioredis": "^5.0.0", "kafkajs": "^2.2.4", + "socket.io-client": "^4.7.5", "moleculer": "^0.14.34", "moleculer-web": "^0.10.7", "parse-torrent": "^9.1.5", diff --git a/services/comicprocessor.service.ts b/services/comicprocessor.service.ts index 89a1211..d23d3a9 100644 --- a/services/comicprocessor.service.ts +++ b/services/comicprocessor.service.ts @@ -1,35 +1,31 @@ -"use strict"; -import { Service, ServiceBroker, ServiceSchema } from "moleculer"; -import { Kafka, EachMessagePayload, logLevel } from "kafkajs"; -import io from "socket.io-client"; +import type { EachMessagePayload } from "kafkajs"; +import { Kafka, logLevel } from "kafkajs"; import { isNil, isUndefined } from "lodash"; +import type { ServiceBroker, ServiceSchema } from "moleculer"; +import { Service } from "moleculer"; +import io from "socket.io-client"; import stringSimilarity from "string-similarity-alg"; interface SearchResult { - result: { - id: string; - // Add other relevant fields - }; - search_id: string; - // Add other relevant fields -} - -interface SearchResultPayload { - groupedResult: SearchResult; - updatedResult: SearchResult; - instanceId: string; + groupedResult: { entityId: number; payload: any }; + updatedResult: { entityId: number; payload: any }; } export default class ComicProcessorService extends Service { private kafkaConsumer: any; + private socketIOInstance: any; + private kafkaProducer: any; + private prowlarrResultsMap: Map = new Map(); + private airDCPPSearchResults: Map = new Map(); + private issuesToSearch: any = []; - // @ts-ignore - public constructor( + // @ts-ignore: schema parameter is required by Service constructor + constructor( public broker: ServiceBroker, schema: ServiceSchema<{}> = { name: "comicProcessor" }, ) { @@ -122,10 +118,10 @@ export default class ComicProcessorService extends Service { await this.broker.call("socket.search", { query: dcppSearchQuery, config: { - hostname: "localhost:5600", + hostname: "192.168.1.119:5600", protocol: "http", - username: "user", - password: "pass", + username: "admin", + password: "password", }, namespace: "/automated", }); @@ -157,7 +153,7 @@ export default class ComicProcessorService extends Service { this.logger.error("Error processing job:", error); } }, - produceResultsToKafka: async (query: string, result: any[]) => { + produceResultsToKafka: async (query: string, result: any[]): Promise => { try { /* Match and rank @@ -166,10 +162,12 @@ export default class ComicProcessorService extends Service { this.airDCPPSearchResults, query, ); + console.log(JSON.stringify(result, null, 4)); console.log("majori"); - console.log(result); /* - Kafka messages need to be in a format that can be serialized to JSON, and a Map is not directly serializable in a way that retains its structure, hence why we use Object.fromEntries + Kafka messages need to be in a format that can be serialized to JSON, + and a Map is not directly serializable in a way that retains its structure, + hence why we use Object.fromEntries */ await this.kafkaProducer.send({ topic: "comic-search-results", @@ -247,48 +245,59 @@ export default class ComicProcessorService extends Service { }); // Handle searchResultAdded event - this.socketIOInstance.on("searchResultAdded", ({ result }: SearchResult) => { - console.log(result); + this.socketIOInstance.on("searchResultAdded", (result: SearchResult) => { + const { + groupedResult: { entityId, payload }, + } = result; + this.logger.info( - "AirDC++ Search result added:", - JSON.stringify(result, null, 4), + `AirDC++ Search result added for entityId: ${entityId} - ${payload?.name}`, ); - // if (!this.airDCPPSearchResults.has(instanceId)) { - // this.airDCPPSearchResults.set(instanceId, []); - // } - // if (!isUndefined(groupedResult?.result)) { - // this.airDCPPSearchResults.get(instanceId).push(groupedResult.result); - // } + if (!this.airDCPPSearchResults.has(entityId)) { + this.airDCPPSearchResults.set(entityId, []); + } + if (!isNil(payload)) { + this.airDCPPSearchResults.get(entityId).push(payload); + } + + console.log(typeof entityId, entityId); + console.log(entityId); + console.log( + "Updated airDCPPSearchResults:", + JSON.stringify(Array.from(this.airDCPPSearchResults.entries()), null, 4), + ); + console.log(JSON.stringify(payload, null, 4)); }); // Handle searchResultUpdated event - this.socketIOInstance.on( - "searchResultUpdated", - async ({ updatedResult, instanceId }: SearchResultPayload) => { - const resultsForInstance = this.airDCPPSearchResults.get(instanceId); + this.socketIOInstance.on("searchResultUpdated", (result: SearchResult) => { + const { + updatedResult: { entityId, payload }, + } = result; + const resultsForInstance = this.airDCPPSearchResults.get(entityId); - if (resultsForInstance) { - const toReplaceIndex = resultsForInstance.findIndex( - (element: any) => element.id === updatedResult.result.id, - ); + if (resultsForInstance) { + const toReplaceIndex = resultsForInstance.findIndex((element: any) => { + console.log("search result updated!"); + console.log(JSON.stringify(element, null, 4)); + return element.id === payload.id; + }); - if (toReplaceIndex !== -1) { - // Replace the existing result with the updated result - resultsForInstance[toReplaceIndex] = updatedResult.result; + if (toReplaceIndex !== -1) { + // Replace the existing result with the updated result + resultsForInstance[toReplaceIndex] = payload; - // Optionally, update the map with the modified array - this.airDCPPSearchResults.set(instanceId, resultsForInstance); - } + // Optionally, update the map with the modified array + this.airDCPPSearchResults.set(entityId, resultsForInstance); } - }, - ); + } + }); // Handle searchComplete event this.socketIOInstance.on("searchesSent", async (data: any) => { this.logger.info( `Search complete for query: "${data.searchInfo.query.pattern}"`, ); - await this.produceResultsToKafka(data.searchInfo.query.pattern); }); }, From 36f08212a0b07a2d97fe39479a47da87fcce59b7 Mon Sep 17 00:00:00 2001 From: Rishi Ghan Date: Mon, 27 Jan 2025 19:28:40 -0500 Subject: [PATCH 13/14] =?UTF-8?q?=F0=9F=94=A7=20Wired=20the=20kafka=20prod?= =?UTF-8?q?ucer=20back=20up?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- services/comicprocessor.service.ts | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/services/comicprocessor.service.ts b/services/comicprocessor.service.ts index d23d3a9..2fbbaa5 100644 --- a/services/comicprocessor.service.ts +++ b/services/comicprocessor.service.ts @@ -27,9 +27,9 @@ export default class ComicProcessorService extends Service { // @ts-ignore: schema parameter is required by Service constructor constructor( public broker: ServiceBroker, - schema: ServiceSchema<{}> = { name: "comicProcessor" }, + schema: ServiceSchema = { name: "comicProcessor" }, ) { - super(broker); + super(broker, schema); this.parseServiceSchema({ name: "comicProcessor", methods: { @@ -158,12 +158,12 @@ export default class ComicProcessorService extends Service { /* Match and rank */ - const result = await this.rankSearchResults( + const finalResult = await this.rankSearchResults( this.airDCPPSearchResults, query, ); - console.log(JSON.stringify(result, null, 4)); - console.log("majori"); + console.log("Final result:"); + console.log(JSON.stringify(finalResult, null, 4)); /* Kafka messages need to be in a format that can be serialized to JSON, and a Map is not directly serializable in a way that retains its structure, @@ -173,7 +173,7 @@ export default class ComicProcessorService extends Service { topic: "comic-search-results", messages: [ { - value: JSON.stringify(result), + value: JSON.stringify(finalResult), }, ], }); @@ -186,7 +186,7 @@ export default class ComicProcessorService extends Service { args: [ { query, - result, + finalResult, }, ], }); From c0946e2ce43f258b5cea1bb1849c521751a288ef Mon Sep 17 00:00:00 2001 From: Rishi Ghan Date: Mon, 17 Feb 2025 15:41:38 -0500 Subject: [PATCH 14/14] =?UTF-8?q?=F0=9F=94=A7=20Fixed=20awaits=20inside=20?= =?UTF-8?q?loops?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- services/autodownload.service.ts | 27 +++++++++++---------------- services/comicprocessor.service.ts | 12 ++++-------- 2 files changed, 15 insertions(+), 24 deletions(-) diff --git a/services/autodownload.service.ts b/services/autodownload.service.ts index a568b90..76d1c26 100644 --- a/services/autodownload.service.ts +++ b/services/autodownload.service.ts @@ -1,11 +1,12 @@ "use strict"; -import { Context, Service, ServiceBroker, ServiceSchema, Errors } from "moleculer"; import { Kafka } from "kafkajs"; +import type { Context, ServiceBroker, ServiceSchema } from "moleculer"; +import { Errors, Service } from "moleculer"; interface Comic { wanted: { markEntireVolumeWanted?: boolean; - issues?: Array; + issues?: any[]; volume: { id: string; name: string; @@ -15,10 +16,11 @@ interface Comic { export default class AutoDownloadService extends Service { private kafkaProducer: any; + private readonly BATCH_SIZE = 100; // Adjust based on your system capacity // @ts-ignore - public constructor( + constructor( public broker: ServiceBroker, schema: ServiceSchema<{}> = { name: "autodownload" }, ) { @@ -30,22 +32,20 @@ export default class AutoDownloadService extends Service { rest: "POST /searchWantedComics", handler: async (ctx: Context<{}>) => { try { + /* eslint-disable no-await-in-loop */ let page = 1; const limit = this.BATCH_SIZE; - - while (true) { - const comics: Comic[] = await this.broker.call( + let comics: Comic[]; + do { + comics = await this.broker.call( "library.getComicsMarkedAsWanted", { page, limit }, ); - - // Log the entire result object for debugging + // Log debugging info this.logger.info( "Received comics from getComicsMarkedAsWanted:", JSON.stringify(comics, null, 2), ); - - // Check if result structure is correct if (!Array.isArray(comics)) { this.logger.error( "Invalid response structure", @@ -57,19 +57,14 @@ export default class AutoDownloadService extends Service { "INVALID_RESPONSE_STRUCTURE", ); } - this.logger.info( `Fetched ${comics.length} comics from page ${page}`, ); - - // Enqueue the jobs in batches for (const comic of comics) { await this.produceJobToKafka(comic); } - - if (comics.length < limit) break; // End loop if fewer comics than the limit were fetched page += 1; - } + } while (comics.length === limit); return { success: true, diff --git a/services/comicprocessor.service.ts b/services/comicprocessor.service.ts index 2fbbaa5..39b2c7f 100644 --- a/services/comicprocessor.service.ts +++ b/services/comicprocessor.service.ts @@ -162,8 +162,6 @@ export default class ComicProcessorService extends Service { this.airDCPPSearchResults, query, ); - console.log("Final result:"); - console.log(JSON.stringify(finalResult, null, 4)); /* Kafka messages need to be in a format that can be serialized to JSON, and a Map is not directly serializable in a way that retains its structure, @@ -177,7 +175,7 @@ export default class ComicProcessorService extends Service { }, ], }); - console.log(`Produced results to Kafka.`); + this.logger.info(`Produced results to Kafka.`); // socket event for UI await this.broker.call("socket.broadcast", { @@ -260,8 +258,6 @@ export default class ComicProcessorService extends Service { this.airDCPPSearchResults.get(entityId).push(payload); } - console.log(typeof entityId, entityId); - console.log(entityId); console.log( "Updated airDCPPSearchResults:", JSON.stringify(Array.from(this.airDCPPSearchResults.entries()), null, 4), @@ -278,15 +274,15 @@ export default class ComicProcessorService extends Service { if (resultsForInstance) { const toReplaceIndex = resultsForInstance.findIndex((element: any) => { - console.log("search result updated!"); - console.log(JSON.stringify(element, null, 4)); + this.logger.info("search result updated!"); + this.logger.info(JSON.stringify(element, null, 4)); return element.id === payload.id; }); if (toReplaceIndex !== -1) { // Replace the existing result with the updated result resultsForInstance[toReplaceIndex] = payload; - +rty6j // Optionally, update the map with the modified array this.airDCPPSearchResults.set(entityId, resultsForInstance); }