From ecdc3845cbca0c9792411618a7f154ca1f1b2b67 Mon Sep 17 00:00:00 2001 From: Rishi Ghan Date: Tue, 18 Jun 2024 00:06:25 -0400 Subject: [PATCH] =?UTF-8?q?=F0=9F=AA=B3=20Serialized=20Map=20to=20be=20com?= =?UTF-8?q?patible=20with=20kafka?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- services/comicprocessor.service.ts | 90 +++++++++++++++++------------- 1 file changed, 50 insertions(+), 40 deletions(-) diff --git a/services/comicprocessor.service.ts b/services/comicprocessor.service.ts index fd163b3..28550af 100644 --- a/services/comicprocessor.service.ts +++ b/services/comicprocessor.service.ts @@ -24,7 +24,7 @@ export default class ComicProcessorService extends Service { private socketIOInstance: any; private kafkaProducer: any; private prowlarrResultsMap: Map = new Map(); - private airDCPPSearchResults: Map = new Map(); + private airDCPPSearchResults: Map = new Map(); private issuesToSearch: any = []; // @ts-ignore @@ -84,10 +84,10 @@ export default class ComicProcessorService extends Service { // 3. Orchestrate the query const dcppSearchQuery = { query: { - pattern: `${volume.name.replace( - /#/g, - "", - )} ${inferredIssueNumber} ${inferredYear}`, + pattern: `${volume.name + .replace(/[^\w\s]/g, "") + .replace(/\s+/g, " ") + .trim()}`, extensions: ["cbz", "cbr", "cb7"], }, hub_urls: hubs, @@ -109,51 +109,57 @@ export default class ComicProcessorService extends Service { namespace: "/automated", }); - const prowlarrResults = await this.broker.call("prowlarr.search", { - prowlarrQuery: { - port: "9696", - apiKey: "c4f42e265fb044dc81f7e88bd41c3367", - offset: 0, - categories: [7030], - query: `${volume.name} ${issue.issueNumber} ${year}`, - host: "localhost", - limit: 100, - type: "search", - indexerIds: [2], - }, - }); - - this.logger.info( - "Prowlarr search results:", - JSON.stringify(prowlarrResults, null, 4), - ); + // const prowlarrResults = await this.broker.call("prowlarr.search", { + // prowlarrQuery: { + // port: "9696", + // apiKey: "c4f42e265fb044dc81f7e88bd41c3367", + // offset: 0, + // categories: [7030], + // query: `${volume.name} ${issue.issueNumber} ${year}`, + // host: "localhost", + // limit: 100, + // type: "search", + // indexerIds: [2], + // }, + // }); + // + // this.logger.info( + // "Prowlarr search results:", + // JSON.stringify(prowlarrResults, null, 4), + // ); // Store prowlarr results in map using unique key - const key = `${volume.name}-${issue.issueNumber}-${year}`; - this.prowlarrResultsMap.set(key, prowlarrResults); + // const key = `${volume.name}-${issue.issueNumber}-${year}`; + // this.prowlarrResultsMap.set(key, prowlarrResults); } } catch (error) { this.logger.error("Error processing job:", error); } }, - produceResultsToKafka: async (dcppResults: any, prowlarrResults: any) => { - const results = { dcppResults, prowlarrResults }; + produceResultsToKafka: async () => { try { + /* + Kafka messages need to be in a format that can be serialized to JSON, and a Map is not directly serializable in a way that retains its structure, hence we use Object.fromEntries + */ await this.kafkaProducer.send({ topic: "comic-search-results", - messages: [{ value: JSON.stringify(results) }], + messages: [ + { + value: JSON.stringify( + Object.fromEntries(this.airDCPPSearchResults), + ), + }, + ], }); - this.logger.info( - "Produced results to Kafka:", - JSON.stringify(results, null, 4), - ); + console.log(`Produced results to Kafka.`); + // socket event for UI await this.broker.call("socket.broadcast", { namespace: "/", event: "searchResultsAvailable", args: [ { - dcppResults, + bokya: Object.fromEntries(this.airDCPPSearchResults), }, ], }); @@ -230,26 +236,30 @@ export default class ComicProcessorService extends Service { this.socketIOInstance.on( "searchResultUpdated", async ({ updatedResult, instanceId }: SearchResultPayload) => { - this.logger.info( - "Received search result update:", - JSON.stringify(updatedResult, null, 4), - ); const resultsForInstance = this.airDCPPSearchResults.get(instanceId); + if (resultsForInstance) { const toReplaceIndex = resultsForInstance.findIndex( (element: any) => element.id === updatedResult.result.id, ); + if (toReplaceIndex !== -1) { + // Replace the existing result with the updated result resultsForInstance[toReplaceIndex] = updatedResult.result; + + // Optionally, update the map with the modified array + this.airDCPPSearchResults.set(instanceId, resultsForInstance); } } }, ); // Handle searchComplete event - this.socketIOInstance.on("searchComplete", async (instanceId: string) => { - this.logger.info(`Search complete for instance ID ${instanceId}`); - await this.produceResultsToKafka(instanceId); + this.socketIOInstance.on("searchesSent", async (data: any) => { + this.logger.info( + `Search complete for query: "${data.searchInfo.query.pattern}"`, + ); + await this.produceResultsToKafka(); }); }, async stopped() {