From 60e5b6f61b1037c6523f691581c389424136cad6 Mon Sep 17 00:00:00 2001 From: Rishi Ghan Date: Mon, 3 Jun 2024 17:18:22 -0400 Subject: [PATCH] =?UTF-8?q?=F0=9F=94=A7=20kafka-powered=20autodownload=20l?= =?UTF-8?q?oop?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- services/autodownload.service.ts | 90 ++++++++++++++++++++++-------- services/comicprocessor.service.ts | 83 ++++++++++++++++++--------- 2 files changed, 125 insertions(+), 48 deletions(-) diff --git a/services/autodownload.service.ts b/services/autodownload.service.ts index f1b3289..e3b5690 100644 --- a/services/autodownload.service.ts +++ b/services/autodownload.service.ts @@ -2,6 +2,25 @@ import { Context, Service, ServiceBroker, ServiceSchema, Errors } from "moleculer"; import { Kafka } from "kafkajs"; +interface Comic { + wanted: { + markEntireVolumeWanted?: boolean; + issues?: Array; + volume: { + id: string; + name: string; + }; + }; +} + +interface PaginatedResult { + wantedComics: Comic[]; + total: number; + page: number; + limit: number; + pages: number; +} + export default class AutoDownloadService extends Service { private kafkaProducer: any; @@ -18,35 +37,60 @@ export default class AutoDownloadService extends Service { rest: "POST /searchWantedComics", handler: async (ctx: Context<{}>) => { try { - const wantedComics: any = await this.broker.call( - "library.getComicsMarkedAsWanted", - {}, - ); - this.logger.info("Fetched wanted comics:", wantedComics.length); + let page = 1; + const limit = this.BATCH_SIZE; - for (const comic of wantedComics) { - if (comic.wanted.markEntireVolumeWanted) { - const issues: any = await this.broker.call( - "comicvine.getIssuesForVolume", - { - volumeId: comic.wanted.volume.id, - }, + while (true) { + const result: PaginatedResult = await this.broker.call( + "library.getComicsMarkedAsWanted", + { page, limit }, + ); + + if (!result || !result.wantedComics) { + this.logger.error("Invalid response structure", result); + throw new Errors.MoleculerError( + "Invalid response structure from getComicsMarkedAsWanted", + 500, + "INVALID_RESPONSE_STRUCTURE", ); - for (const issue of issues) { - await this.produceJobToKafka( - comic.wanted.volume.name, - issue, - ); - } - } else if (comic.wanted.issues && comic.wanted.issues.length > 0) { - for (const issue of comic.wanted.issues) { - await this.produceJobToKafka( - comic.wanted.volume?.name, - issue, + } + + this.logger.info( + `Fetched ${result.wantedComics.length} comics from page ${page} of ${result.pages}`, + ); + + for (const comic of result.wantedComics) { + if (comic.wanted.markEntireVolumeWanted) { + const issues: any = await this.broker.call( + "comicvine.getIssuesForVolume", + { + volumeId: comic.wanted.volume.id, + }, ); + for (const issue of issues) { + await this.produceJobToKafka( + comic.wanted.volume.name, + issue, + ); + } + } else if ( + comic.wanted.issues && + comic.wanted.issues.length > 0 + ) { + for (const issue of comic.wanted.issues) { + await this.produceJobToKafka( + comic.wanted.volume?.name, + issue, + ); + } } } + + if (page >= result.pages) break; + page += 1; } + + return { success: true, message: "Processing started." }; } catch (error) { this.logger.error("Error in searchWantedComics:", error); throw new Errors.MoleculerError( diff --git a/services/comicprocessor.service.ts b/services/comicprocessor.service.ts index 18e5372..ff1a8ad 100644 --- a/services/comicprocessor.service.ts +++ b/services/comicprocessor.service.ts @@ -3,7 +3,20 @@ import { Service, ServiceBroker, ServiceSchema } from "moleculer"; import { Kafka, EachMessagePayload, logLevel } from "kafkajs"; import { isUndefined } from "lodash"; import io from "socket.io-client"; +interface SearchResult { + result: { + id: string; + // Add other relevant fields + }; + search_id: string; + // Add other relevant fields +} +interface SearchResultPayload { + groupedResult: SearchResult; + updatedResult: SearchResult; + instanceId: string; +} export default class ComicProcessorService extends Service { private kafkaConsumer: any; private socketIOInstance: any; @@ -136,35 +149,55 @@ export default class ComicProcessorService extends Service { this.logger.info("Socket.IO connected successfully."); }); - this.socketIOInstance.on("searchResultAdded", (data: any) => { - this.logger.info( - "Received search result added:", - JSON.stringify(data, null, 4), - ); - this.airDCPPSearchResults.push(data); - }); - - this.socketIOInstance.on("searchResultUpdated", async (data: any) => { - this.logger.info( - "Received search result update:", - JSON.stringify(data, null, 4), - ); - if ( - !isUndefined(data.result) && - !isUndefined(this.airDCPPSearchResults.result) - ) { - const toReplaceIndex = this.airDCPPSearchResults.findIndex( - (element: any) => { - return element?.result.id === data.result.id; - }, + this.socketIOInstance.on( + "searchResultAdded", + ({ groupedResult, instanceId }: SearchResultPayload) => { + this.logger.info( + "Received search result added:", + JSON.stringify(groupedResult, null, 4), ); - this.airDCPPSearchResults[toReplaceIndex] = data.result; - } - }); + this.airDCPPSearchResults.push({ + groupedResult: groupedResult.result, + instanceId, + }); + }, + ); + + this.socketIOInstance.on( + "searchResultUpdated", + async ({ updatedResult, instanceId }: SearchResultPayload) => { + this.logger.info( + "Received search result update:", + JSON.stringify(updatedResult, null, 4), + ); + if ( + !isUndefined(updatedResult.result) && + !isUndefined(this.airDCPPSearchResults.result) + ) { + const toReplaceIndex = this.airDCPPSearchResults.findIndex( + (element: any) => { + return element?.result.id === updatedResult.result.id; + }, + ); + this.airDCPPSearchResults[toReplaceIndex] = { + result: updatedResult.result, + instanceId, + }; + } + }, + ); this.socketIOInstance.on("searchComplete", async () => { // Ensure results are not empty before producing to Kafka if (this.airDCPPSearchResults.length > 0) { - await this.produceResultsToKafka(this.airDCPPSearchResults, []); + const results = this.airDCPPSearchResults.reduce((acc: any, item: any) => { + const key = item.instanceId; + if (!acc[key]) { + acc[key] = []; + } + acc[key].push(item); + return acc; + }, {}); + await this.produceResultsToKafka(results, []); } else { this.logger.warn( "AirDC++ search results are empty, not producing to Kafka.",