diff --git a/services/autodownload.service.ts b/services/autodownload.service.ts index e3b5690..a568b90 100644 --- a/services/autodownload.service.ts +++ b/services/autodownload.service.ts @@ -13,16 +13,9 @@ interface Comic { }; } -interface PaginatedResult { - wantedComics: Comic[]; - total: number; - page: number; - limit: number; - pages: number; -} - export default class AutoDownloadService extends Service { private kafkaProducer: any; + private readonly BATCH_SIZE = 100; // Adjust based on your system capacity // @ts-ignore public constructor( @@ -41,13 +34,23 @@ export default class AutoDownloadService extends Service { const limit = this.BATCH_SIZE; while (true) { - const result: PaginatedResult = await this.broker.call( + const comics: Comic[] = await this.broker.call( "library.getComicsMarkedAsWanted", { page, limit }, ); - if (!result || !result.wantedComics) { - this.logger.error("Invalid response structure", result); + // Log the entire result object for debugging + this.logger.info( + "Received comics from getComicsMarkedAsWanted:", + JSON.stringify(comics, null, 2), + ); + + // Check if result structure is correct + if (!Array.isArray(comics)) { + this.logger.error( + "Invalid response structure", + JSON.stringify(comics, null, 2), + ); throw new Errors.MoleculerError( "Invalid response structure from getComicsMarkedAsWanted", 500, @@ -56,41 +59,22 @@ export default class AutoDownloadService extends Service { } this.logger.info( - `Fetched ${result.wantedComics.length} comics from page ${page} of ${result.pages}`, + `Fetched ${comics.length} comics from page ${page}`, ); - for (const comic of result.wantedComics) { - if (comic.wanted.markEntireVolumeWanted) { - const issues: any = await this.broker.call( - "comicvine.getIssuesForVolume", - { - volumeId: comic.wanted.volume.id, - }, - ); - for (const issue of issues) { - await this.produceJobToKafka( - comic.wanted.volume.name, - issue, - ); - } - } else if ( - comic.wanted.issues && - 
comic.wanted.issues.length > 0 - ) { - for (const issue of comic.wanted.issues) { - await this.produceJobToKafka( - comic.wanted.volume?.name, - issue, - ); - } - } + // Enqueue the jobs in batches + for (const comic of comics) { + await this.produceJobToKafka(comic); } - if (page >= result.pages) break; + if (comics.length < limit) break; // End loop if fewer comics than the limit were fetched page += 1; } - return { success: true, message: "Processing started." }; + return { + success: true, + message: "Jobs enqueued for background processing.", + }; } catch (error) { this.logger.error("Error in searchWantedComics:", error); throw new Errors.MoleculerError( @@ -104,13 +88,17 @@ export default class AutoDownloadService extends Service { }, }, methods: { - produceJobToKafka: async (volumeName: string, issue: any) => { - const job = { volumeName, issue }; - await this.kafkaProducer.send({ - topic: "comic-search-jobs", - messages: [{ value: JSON.stringify(job) }], - }); - this.logger.info("Produced job to Kafka:", job); + produceJobToKafka: async (comic: Comic) => { + const job = { comic }; + try { + await this.kafkaProducer.send({ + topic: "comic-search-jobs", + messages: [{ value: JSON.stringify(job) }], + }); + this.logger.info("Produced job to Kafka:", job); + } catch (error) { + this.logger.error("Error producing job to Kafka:", error); + } }, }, async started() { diff --git a/services/comicprocessor.service.ts b/services/comicprocessor.service.ts index ff1a8ad..e79e4b7 100644 --- a/services/comicprocessor.service.ts +++ b/services/comicprocessor.service.ts @@ -1,8 +1,9 @@ "use strict"; import { Service, ServiceBroker, ServiceSchema } from "moleculer"; import { Kafka, EachMessagePayload, logLevel } from "kafkajs"; -import { isUndefined } from "lodash"; import io from "socket.io-client"; +import { isUndefined } from "lodash"; + interface SearchResult { result: { id: string; @@ -17,12 +18,14 @@ interface SearchResultPayload { updatedResult: SearchResult; 
instanceId: string; } + export default class ComicProcessorService extends Service { private kafkaConsumer: any; private socketIOInstance: any; private kafkaProducer: any; private prowlarrResultsMap: Map = new Map(); - private airDCPPSearchResults: Array = []; + private airDCPPSearchResults: Map = new Map(); + private issuesToSearch: any = []; // @ts-ignore public constructor( @@ -42,71 +45,118 @@ export default class ComicProcessorService extends Service { }; }, processJob: async (job: any) => { - this.logger.info("Processing job:", job); - const { volumeName, issue } = job; - const { year } = this.parseStringDate(issue.cover_date || issue.coverDate); - const settings: any = await this.broker.call("settings.getSettings", { - settingsKey: "directConnect", - }); - const hubs = settings.client.hubs.map((hub: any) => hub.value); - const dcppSearchQuery = { - query: { - pattern: `${volumeName.replace(/#/g, "")} ${ - issue.issue_number || issue.issueNumber - } ${year}`, - extensions: ["cbz", "cbr", "cb7"], - }, - hub_urls: hubs, - priority: 5, - }; - this.logger.info( - "DC++ search query:", - JSON.stringify(dcppSearchQuery, null, 4), - ); + try { + this.logger.info("Processing job:", JSON.stringify(job, null, 2)); + const { comic } = job; + const { volume, issues, markEntireVolumeWanted } = comic.wanted; - await this.broker.call("socket.search", { - query: dcppSearchQuery, - config: { - hostname: "localhost:5600", - protocol: "http", - username: "user", - password: "pass", - }, - namespace: "/automated", - }); + // If entire volume is marked as wanted, get their details from CV + if (markEntireVolumeWanted) { + this.issuesToSearch = await this.broker.call( + "comicvine.getIssuesForVolume", + { volumeId: volume.id }, + ); + this.logger.info( + `The entire volume with id: ${volume.id} was marked as wanted.`, + ); + this.logger.info(`Fetched issues for ${volume.id}:`); + this.logger.info(`${this.issuesToSearch.length} issues to search`); + } else { + // Or proceed with 
`issues` from the wanted object. + this.issuesToSearch = issues; + } - const prowlarrResults = await this.broker.call("prowlarr.search", { - prowlarrQuery: { - port: "9696", - apiKey: "c4f42e265fb044dc81f7e88bd41c3367", - offset: 0, - categories: [7030], - query: `${volumeName} ${issue.issueNumber} ${year}`, - host: "localhost", - limit: 100, - type: "search", - indexerIds: [2], - }, - }); + for (const issue of this.issuesToSearch) { + // issue number + const inferredIssueNumber = issue.issueNumber + ? issue.issueNumber + : issue.issue_number; + // year + const { year } = this.parseStringDate(issue.coverDate || issue.cover_date); + const inferredYear = year ? year : issue.year; - this.logger.info( - "Prowlarr search results:", - JSON.stringify(prowlarrResults, null, 4), - ); - // Store prowlarr results in map using unique key - const key = `${volumeName}-${issue.issueNumber}-${year}`; - this.prowlarrResultsMap.set(key, prowlarrResults); + const settings: any = await this.broker.call("settings.getSettings", { + settingsKey: "directConnect", + }); + const hubs = settings.client.hubs.map((hub: any) => hub.value); + const dcppSearchQuery = { + query: { + pattern: `${volume.name.replace( + /#/g, + "", + )} ${inferredIssueNumber} ${inferredYear}`, + extensions: ["cbz", "cbr", "cb7"], + }, + hub_urls: hubs, + priority: 5, + }; + this.logger.info( + "DC++ search query:", + JSON.stringify(dcppSearchQuery, null, 4), + ); + + await this.broker.call("socket.search", { + query: dcppSearchQuery, + config: { + hostname: "localhost:5600", + protocol: "http", + username: "user", + password: "pass", + }, + namespace: "/automated", + }); + + const prowlarrResults = await this.broker.call("prowlarr.search", { + prowlarrQuery: { + port: "9696", + apiKey: "c4f42e265fb044dc81f7e88bd41c3367", + offset: 0, + categories: [7030], + query: `${volume.name} ${inferredIssueNumber} ${inferredYear}`, + host: "localhost", + limit: 100, + type: "search", + indexerIds: [2], + }, + }); + + this.logger.info( + "Prowlarr 
search results:", + JSON.stringify(prowlarrResults, null, 4), + ); + + // Store prowlarr results in map using unique key + const key = `${volume.name}-${inferredIssueNumber}-${inferredYear}`; + this.prowlarrResultsMap.set(key, prowlarrResults); + } + } catch (error) { + this.logger.error("Error processing job:", error); + } }, produceResultsToKafka: async (dcppResults: any, prowlarrResults: any) => { const results = { dcppResults, prowlarrResults }; - await this.kafkaProducer.send({ - topic: "comic-search-results", - messages: [{ value: JSON.stringify(results) }], - }); - this.logger.info( - "Produced results to Kafka:", - JSON.stringify(results, null, 4), - ); + try { + await this.kafkaProducer.send({ + topic: "comic-search-results", + messages: [{ value: JSON.stringify(results) }], + }); + this.logger.info( + "Produced results to Kafka:", + JSON.stringify(results, null, 4), + ); + // socket event for UI + await this.broker.call("socket.broadcast", { + namespace: "/", + event: "searchResultsAvailable", + args: [ + { + dcppResults, + }, + ], + }); + } catch (error) { + this.logger.error("Error producing results to Kafka:", error); + } }, }, async started() { @@ -117,9 +167,22 @@ export default class ComicProcessorService extends Service { }); this.kafkaConsumer = kafka.consumer({ groupId: "comic-processor-group" }); this.kafkaProducer = kafka.producer(); + + this.kafkaConsumer.on("consumer.crash", (event: any) => { + this.logger.error("Consumer crash:", event); + }); + this.kafkaConsumer.on("consumer.connect", () => { + this.logger.info("Consumer connected"); + }); + this.kafkaConsumer.on("consumer.disconnect", () => { + this.logger.info("Consumer disconnected"); + }); + this.kafkaConsumer.on("consumer.network.request_timeout", () => { + this.logger.warn("Consumer network request timeout"); + }); + await this.kafkaConsumer.connect(); await this.kafkaProducer.connect(); - this.logger.info("Kafka consumer and producer connected successfully."); await 
this.kafkaConsumer.subscribe({ topic: "comic-search-jobs", @@ -130,10 +193,6 @@ export default class ComicProcessorService extends Service { eachMessage: async ({ topic, partition, message }: EachMessagePayload) => { if (message.value) { const job = JSON.parse(message.value.toString()); - this.logger.info( - "Consumed job from Kafka:", - JSON.stringify(job, null, 4), - ); await this.processJob(job); } else { this.logger.warn("Received message with null value"); @@ -149,6 +208,7 @@ export default class ComicProcessorService extends Service { this.logger.info("Socket.IO connected successfully."); }); + // Handle searchResultAdded event this.socketIOInstance.on( "searchResultAdded", ({ groupedResult, instanceId }: SearchResultPayload) => { @@ -156,13 +216,14 @@ export default class ComicProcessorService extends Service { "Received search result added:", JSON.stringify(groupedResult, null, 4), ); - this.airDCPPSearchResults.push({ - groupedResult: groupedResult.result, - instanceId, - }); + if (!this.airDCPPSearchResults.has(instanceId)) { + this.airDCPPSearchResults.set(instanceId, []); + } + this.airDCPPSearchResults.get(instanceId).push(groupedResult.result); }, ); + // Handle searchResultUpdated event this.socketIOInstance.on( "searchResultUpdated", async ({ updatedResult, instanceId }: SearchResultPayload) => { @@ -170,50 +231,30 @@ export default class ComicProcessorService extends Service { "Received search result update:", JSON.stringify(updatedResult, null, 4), ); - if ( - !isUndefined(updatedResult.result) && - !isUndefined(this.airDCPPSearchResults.result) - ) { - const toReplaceIndex = this.airDCPPSearchResults.findIndex( - (element: any) => { - return element?.result.id === updatedResult.result.id; - }, + const resultsForInstance = this.airDCPPSearchResults.get(instanceId); + if (resultsForInstance) { + const toReplaceIndex = resultsForInstance.findIndex( + (element: any) => element.id === updatedResult.result.id, ); - 
this.airDCPPSearchResults[toReplaceIndex] = { - result: updatedResult.result, - instanceId, - }; + if (toReplaceIndex !== -1) { + resultsForInstance[toReplaceIndex] = updatedResult.result; + } } }, ); - this.socketIOInstance.on("searchComplete", async () => { - // Ensure results are not empty before producing to Kafka - if (this.airDCPPSearchResults.length > 0) { - const results = this.airDCPPSearchResults.reduce((acc: any, item: any) => { - const key = item.instanceId; - if (!acc[key]) { - acc[key] = []; - } - acc[key].push(item); - return acc; - }, {}); - await this.produceResultsToKafka(results, []); - } else { - this.logger.warn( - "AirDC++ search results are empty, not producing to Kafka.", - ); - } + + // Handle searchComplete event + this.socketIOInstance.on("searchComplete", async (instanceId: string) => { + this.logger.info(`Search complete for instance ID ${instanceId}`); + await this.produceResultsToKafka(this.airDCPPSearchResults.get(instanceId) ?? [], []); + }); }); }, async stopped() { await this.kafkaConsumer.disconnect(); await this.kafkaProducer.disconnect(); - this.logger.info("Kafka consumer and producer disconnected successfully."); - // Close Socket.IO connection if (this.socketIOInstance) { this.socketIOInstance.close(); - this.logger.info("Socket.IO disconnected successfully."); } }, });