🔧 Fixes to search query builder

2024-06-12 21:41:29 -05:00
parent 60e5b6f61b
commit 55494abdc0
2 changed files with 177 additions and 148 deletions
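The "search query builder" referred to above assembles a plain "volume issue year" string from whichever fields a wanted issue happens to carry. A rough standalone sketch of that assembly follows; the IssueLike shape and the date handling are illustrative stand-ins (the service itself uses its own parseStringDate helper), not code from this commit:

// Hypothetical issue shape; field names mirror the mixed ComicVine/local data the service sees.
interface IssueLike {
    issueNumber?: string;
    issue_number?: string;
    coverDate?: string;
    year?: string;
}

// Build the search pattern "<volume name without '#'> <issue number> <year>".
function buildSearchPattern(volumeName: string, issue: IssueLike): string {
    const issueNumber = issue.issueNumber ?? issue.issue_number ?? "";
    // Assumption: derive the year directly from the cover date when present.
    const year = issue.coverDate ? new Date(issue.coverDate).getFullYear() : issue.year ?? "";
    return `${volumeName.replace(/#/g, "")} ${issueNumber} ${year}`.trim();
}

// Example: buildSearchPattern("Saga #1", { issue_number: "12", coverDate: "2013-06-19" })
// returns "Saga 1 12 2013" — only the "#" character is stripped, not the digits.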

View File

@@ -13,16 +13,9 @@ interface Comic {
     };
 }
-interface PaginatedResult {
-    wantedComics: Comic[];
-    total: number;
-    page: number;
-    limit: number;
-    pages: number;
-}
 export default class AutoDownloadService extends Service {
     private kafkaProducer: any;
     private readonly BATCH_SIZE = 100; // Adjust based on your system capacity
     // @ts-ignore
     public constructor(
@@ -41,13 +34,23 @@ export default class AutoDownloadService extends Service {
     const limit = this.BATCH_SIZE;
     while (true) {
-        const result: PaginatedResult = await this.broker.call(
+        const comics: Comic[] = await this.broker.call(
             "library.getComicsMarkedAsWanted",
             { page, limit },
         );
-        if (!result || !result.wantedComics) {
-            this.logger.error("Invalid response structure", result);
+        // Log the entire result object for debugging
+        this.logger.info(
+            "Received comics from getComicsMarkedAsWanted:",
+            JSON.stringify(comics, null, 2),
+        );
+        // Check if result structure is correct
+        if (!Array.isArray(comics)) {
+            this.logger.error(
+                "Invalid response structure",
+                JSON.stringify(comics, null, 2),
+            );
             throw new Errors.MoleculerError(
                 "Invalid response structure from getComicsMarkedAsWanted",
                 500,
@@ -56,41 +59,22 @@ export default class AutoDownloadService extends Service {
         }
         this.logger.info(
-            `Fetched ${result.wantedComics.length} comics from page ${page} of ${result.pages}`,
+            `Fetched ${comics.length} comics from page ${page}`,
         );
-        for (const comic of result.wantedComics) {
-            if (comic.wanted.markEntireVolumeWanted) {
-                const issues: any = await this.broker.call(
-                    "comicvine.getIssuesForVolume",
-                    {
-                        volumeId: comic.wanted.volume.id,
-                    },
-                );
-                for (const issue of issues) {
-                    await this.produceJobToKafka(
-                        comic.wanted.volume.name,
-                        issue,
-                    );
-                }
-            } else if (
-                comic.wanted.issues &&
-                comic.wanted.issues.length > 0
-            ) {
-                for (const issue of comic.wanted.issues) {
-                    await this.produceJobToKafka(
-                        comic.wanted.volume?.name,
-                        issue,
-                    );
-                }
-            }
+        // Enqueue the jobs in batches
+        for (const comic of comics) {
+            await this.produceJobToKafka(comic);
         }
-        if (page >= result.pages) break;
+        if (comics.length < limit) break; // End loop if fewer comics than the limit were fetched
         page += 1;
     }
-    return { success: true, message: "Processing started." };
+    return {
+        success: true,
+        message: "Jobs enqueued for background processing.",
+    };
 } catch (error) {
     this.logger.error("Error in searchWantedComics:", error);
     throw new Errors.MoleculerError(
@@ -104,13 +88,17 @@ export default class AutoDownloadService extends Service {
         },
     },
     methods: {
-        produceJobToKafka: async (volumeName: string, issue: any) => {
-            const job = { volumeName, issue };
-            await this.kafkaProducer.send({
-                topic: "comic-search-jobs",
-                messages: [{ value: JSON.stringify(job) }],
-            });
-            this.logger.info("Produced job to Kafka:", job);
+        produceJobToKafka: async (comic: Comic) => {
+            const job = { comic };
+            try {
+                await this.kafkaProducer.send({
+                    topic: "comic-search-jobs",
+                    messages: [{ value: JSON.stringify(job) }],
+                });
+                this.logger.info("Produced job to Kafka:", job);
+            } catch (error) {
+                this.logger.error("Error producing job to Kafka:", error);
+            }
         },
     },
     async started() {

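The first file's change, stripped of the Moleculer wiring, is a paginate-and-produce loop: fetch one page of wanted comics at a time, push one Kafka message per comic, and stop when a page comes back short. A minimal sketch of that pattern, assuming kafkajs and a broker on localhost:9092; fetchPage and the Comic type are placeholders standing in for the library.getComicsMarkedAsWanted call, not code from this commit:

import { Kafka } from "kafkajs";

// Stand-in for the real wanted-comic document returned by the library service.
type Comic = Record<string, unknown>;

// fetchPage stands in for broker.call("library.getComicsMarkedAsWanted", { page, limit }).
export async function enqueueWantedComics(
    fetchPage: (page: number, limit: number) => Promise<Comic[]>,
    limit = 100,
): Promise<void> {
    const kafka = new Kafka({ clientId: "auto-download", brokers: ["localhost:9092"] });
    const producer = kafka.producer();
    await producer.connect();
    try {
        let page = 1;
        while (true) {
            const comics = await fetchPage(page, limit);
            // Defensive check mirroring the service: bail out on an unexpected shape.
            if (!Array.isArray(comics)) {
                throw new Error("Invalid response structure");
            }
            // One message per wanted comic; the consumer does the heavy search work.
            for (const comic of comics) {
                await producer.send({
                    topic: "comic-search-jobs",
                    messages: [{ value: JSON.stringify({ comic }) }],
                });
            }
            // A short page means the collection has been drained.
            if (comics.length < limit) break;
            page += 1;
        }
    } finally {
        await producer.disconnect();
    }
}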
View File

@@ -1,8 +1,9 @@
 "use strict";
 import { Service, ServiceBroker, ServiceSchema } from "moleculer";
 import { Kafka, EachMessagePayload, logLevel } from "kafkajs";
-import { isUndefined } from "lodash";
 import io from "socket.io-client";
+import { isUndefined } from "lodash";
 interface SearchResult {
     result: {
         id: string;
@@ -17,12 +18,14 @@ interface SearchResultPayload {
     updatedResult: SearchResult;
     instanceId: string;
 }
 export default class ComicProcessorService extends Service {
     private kafkaConsumer: any;
     private socketIOInstance: any;
     private kafkaProducer: any;
     private prowlarrResultsMap: Map<string, any> = new Map();
-    private airDCPPSearchResults: Array<any> = [];
+    private airDCPPSearchResults: Map<string, any[]> = new Map();
+    private issuesToSearch: any = [];
     // @ts-ignore
     public constructor(
@@ -42,71 +45,118 @@ export default class ComicProcessorService extends Service {
         };
     },
     processJob: async (job: any) => {
-        this.logger.info("Processing job:", job);
-        const { volumeName, issue } = job;
-        const { year } = this.parseStringDate(issue.cover_date || issue.coverDate);
-        const settings: any = await this.broker.call("settings.getSettings", {
-            settingsKey: "directConnect",
-        });
-        const hubs = settings.client.hubs.map((hub: any) => hub.value);
-        const dcppSearchQuery = {
-            query: {
-                pattern: `${volumeName.replace(/#/g, "")} ${
-                    issue.issue_number || issue.issueNumber
-                } ${year}`,
-                extensions: ["cbz", "cbr", "cb7"],
-            },
-            hub_urls: hubs,
-            priority: 5,
-        };
-        this.logger.info(
-            "DC++ search query:",
-            JSON.stringify(dcppSearchQuery, null, 4),
-        );
-        await this.broker.call("socket.search", {
-            query: dcppSearchQuery,
-            config: {
-                hostname: "localhost:5600",
-                protocol: "http",
-                username: "user",
-                password: "pass",
-            },
-            namespace: "/automated",
-        });
-        const prowlarrResults = await this.broker.call("prowlarr.search", {
-            prowlarrQuery: {
-                port: "9696",
-                apiKey: "c4f42e265fb044dc81f7e88bd41c3367",
-                offset: 0,
-                categories: [7030],
-                query: `${volumeName} ${issue.issueNumber} ${year}`,
-                host: "localhost",
-                limit: 100,
-                type: "search",
-                indexerIds: [2],
-            },
-        });
-        this.logger.info(
-            "Prowlarr search results:",
-            JSON.stringify(prowlarrResults, null, 4),
-        );
-        // Store prowlarr results in map using unique key
-        const key = `${volumeName}-${issue.issueNumber}-${year}`;
-        this.prowlarrResultsMap.set(key, prowlarrResults);
+        try {
+            this.logger.info("Processing job:", JSON.stringify(job, null, 2));
+            const { comic } = job;
+            const { volume, issues, markEntireVolumeWanted } = comic.wanted;
+            // If entire volume is marked as wanted, get their details from CV
+            if (markEntireVolumeWanted) {
+                this.issuesToSearch = await this.broker.call(
+                    "comicvine.getIssuesForVolume",
+                    { volumeId: volume.id },
+                );
+                this.logger.info(
+                    `The entire volume with id: ${volume.id} was marked as wanted.`,
+                );
+                this.logger.info(`Fetched issues for ${volume.id}:`);
+                this.logger.info(`${this.issuesToSearch.length} issues to search`);
+            } else {
+                // Or proceed with `issues` from the wanted object.
+                this.issuesToSearch = issues;
+            }
+            for (const issue of this.issuesToSearch) {
+                // issue number
+                const inferredIssueNumber = issue.issueNumber
+                    ? issue.issueNumber
+                    : issue.issue_number;
+                // year
+                const { year } = this.parseStringDate(issue.coverDate);
+                const inferredYear = year ? issue?.coverDate : issue.year;
+                const settings: any = await this.broker.call("settings.getSettings", {
+                    settingsKey: "directConnect",
+                });
+                const hubs = settings.client.hubs.map((hub: any) => hub.value);
+                const dcppSearchQuery = {
+                    query: {
+                        pattern: `${volume.name.replace(
+                            /#/g,
+                            "",
+                        )} ${inferredIssueNumber} ${inferredYear}`,
+                        extensions: ["cbz", "cbr", "cb7"],
+                    },
+                    hub_urls: hubs,
+                    priority: 5,
+                };
+                this.logger.info(
+                    "DC++ search query:",
+                    JSON.stringify(dcppSearchQuery, null, 4),
+                );
+                await this.broker.call("socket.search", {
+                    query: dcppSearchQuery,
+                    config: {
+                        hostname: "localhost:5600",
+                        protocol: "http",
+                        username: "user",
+                        password: "pass",
+                    },
+                    namespace: "/automated",
+                });
+                const prowlarrResults = await this.broker.call("prowlarr.search", {
+                    prowlarrQuery: {
+                        port: "9696",
+                        apiKey: "c4f42e265fb044dc81f7e88bd41c3367",
+                        offset: 0,
+                        categories: [7030],
+                        query: `${volume.name} ${issue.issueNumber} ${year}`,
+                        host: "localhost",
+                        limit: 100,
+                        type: "search",
+                        indexerIds: [2],
+                    },
+                });
+                this.logger.info(
+                    "Prowlarr search results:",
+                    JSON.stringify(prowlarrResults, null, 4),
+                );
+                // Store prowlarr results in map using unique key
+                const key = `${volume.name}-${issue.issueNumber}-${year}`;
+                this.prowlarrResultsMap.set(key, prowlarrResults);
+            }
+        } catch (error) {
+            this.logger.error("Error processing job:", error);
+        }
     },
     produceResultsToKafka: async (dcppResults: any, prowlarrResults: any) => {
         const results = { dcppResults, prowlarrResults };
-        await this.kafkaProducer.send({
-            topic: "comic-search-results",
-            messages: [{ value: JSON.stringify(results) }],
-        });
-        this.logger.info(
-            "Produced results to Kafka:",
-            JSON.stringify(results, null, 4),
-        );
+        try {
+            await this.kafkaProducer.send({
+                topic: "comic-search-results",
+                messages: [{ value: JSON.stringify(results) }],
+            });
+            this.logger.info(
+                "Produced results to Kafka:",
+                JSON.stringify(results, null, 4),
+            );
+            // socket event for UI
+            await this.broker.call("socket.broadcast", {
+                namespace: "/",
+                event: "searchResultsAvailable",
+                args: [
+                    {
+                        dcppResults,
+                    },
+                ],
+            });
+        } catch (error) {
+            this.logger.error("Error producing results to Kafka:", error);
+        }
     },
 },
 async started() {
@@ -117,9 +167,22 @@ export default class ComicProcessorService extends Service {
     });
     this.kafkaConsumer = kafka.consumer({ groupId: "comic-processor-group" });
     this.kafkaProducer = kafka.producer();
+    this.kafkaConsumer.on("consumer.crash", (event: any) => {
+        this.logger.error("Consumer crash:", event);
+    });
+    this.kafkaConsumer.on("consumer.connect", () => {
+        this.logger.info("Consumer connected");
+    });
+    this.kafkaConsumer.on("consumer.disconnect", () => {
+        this.logger.info("Consumer disconnected");
+    });
+    this.kafkaConsumer.on("consumer.network.request_timeout", () => {
+        this.logger.warn("Consumer network request timeout");
+    });
     await this.kafkaConsumer.connect();
     await this.kafkaProducer.connect();
+    this.logger.info("Kafka consumer and producer connected successfully.");
     await this.kafkaConsumer.subscribe({
         topic: "comic-search-jobs",
@@ -130,10 +193,6 @@ export default class ComicProcessorService extends Service {
     eachMessage: async ({ topic, partition, message }: EachMessagePayload) => {
         if (message.value) {
             const job = JSON.parse(message.value.toString());
-            this.logger.info(
-                "Consumed job from Kafka:",
-                JSON.stringify(job, null, 4),
-            );
             await this.processJob(job);
         } else {
             this.logger.warn("Received message with null value");
@@ -149,6 +208,7 @@ export default class ComicProcessorService extends Service {
     this.logger.info("Socket.IO connected successfully.");
 });
+// Handle searchResultAdded event
 this.socketIOInstance.on(
     "searchResultAdded",
     ({ groupedResult, instanceId }: SearchResultPayload) => {
@@ -156,13 +216,14 @@ export default class ComicProcessorService extends Service {
             "Received search result added:",
             JSON.stringify(groupedResult, null, 4),
         );
-        this.airDCPPSearchResults.push({
-            groupedResult: groupedResult.result,
-            instanceId,
-        });
+        if (!this.airDCPPSearchResults.has(instanceId)) {
+            this.airDCPPSearchResults.set(instanceId, []);
+        }
+        this.airDCPPSearchResults.get(instanceId).push(groupedResult.result);
     },
 );
+// Handle searchResultUpdated event
 this.socketIOInstance.on(
     "searchResultUpdated",
     async ({ updatedResult, instanceId }: SearchResultPayload) => {
@@ -170,50 +231,30 @@ export default class ComicProcessorService extends Service {
                 "Received search result update:",
                 JSON.stringify(updatedResult, null, 4),
             );
-            if (
-                !isUndefined(updatedResult.result) &&
-                !isUndefined(this.airDCPPSearchResults.result)
-            ) {
-                const toReplaceIndex = this.airDCPPSearchResults.findIndex(
-                    (element: any) => {
-                        return element?.result.id === updatedResult.result.id;
-                    },
-                );
-                this.airDCPPSearchResults[toReplaceIndex] = {
-                    result: updatedResult.result,
-                    instanceId,
-                };
-            }
+            const resultsForInstance = this.airDCPPSearchResults.get(instanceId);
+            if (resultsForInstance) {
+                const toReplaceIndex = resultsForInstance.findIndex(
+                    (element: any) => element.id === updatedResult.result.id,
+                );
+                if (toReplaceIndex !== -1) {
+                    resultsForInstance[toReplaceIndex] = updatedResult.result;
+                }
+            }
         },
     );
-    this.socketIOInstance.on("searchComplete", async () => {
-        // Ensure results are not empty before producing to Kafka
-        if (this.airDCPPSearchResults.length > 0) {
-            const results = this.airDCPPSearchResults.reduce((acc: any, item: any) => {
-                const key = item.instanceId;
-                if (!acc[key]) {
-                    acc[key] = [];
-                }
-                acc[key].push(item);
-                return acc;
-            }, {});
-            await this.produceResultsToKafka(results, []);
-        } else {
-            this.logger.warn(
-                "AirDC++ search results are empty, not producing to Kafka.",
-            );
-        }
+    // Handle searchComplete event
+    this.socketIOInstance.on("searchComplete", async (instanceId: string) => {
+        this.logger.info(`Search complete for instance ID ${instanceId}`);
+        await this.produceResultsToKafka(instanceId);
     });
 },
 async stopped() {
     await this.kafkaConsumer.disconnect();
     await this.kafkaProducer.disconnect();
+    this.logger.info("Kafka consumer and producer disconnected successfully.");
+    // Close Socket.IO connection
     if (this.socketIOInstance) {
         this.socketIOInstance.close();
+        this.logger.info("Socket.IO disconnected successfully.");
     }
 },
 });
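
The second file's main structural change is keying AirDC++ results by search instance instead of keeping one flat array. A minimal sketch of that bookkeeping, detached from Socket.IO and Kafka; the Result shape, the sink callback, and the delete-on-flush step are illustrative assumptions, not code from this commit:

// Hypothetical result shape; real payloads arrive from AirDC++ over Socket.IO.
interface Result {
    id: string;
    [key: string]: unknown;
}

const resultsByInstance: Map<string, Result[]> = new Map();

// searchResultAdded: append the result under its search instance id.
function addResult(instanceId: string, result: Result): void {
    if (!resultsByInstance.has(instanceId)) {
        resultsByInstance.set(instanceId, []);
    }
    resultsByInstance.get(instanceId)!.push(result);
}

// searchResultUpdated: replace the matching result in place, if it was seen before.
function updateResult(instanceId: string, updated: Result): void {
    const results = resultsByInstance.get(instanceId);
    if (!results) return;
    const index = results.findIndex((r) => r.id === updated.id);
    if (index !== -1) {
        results[index] = updated;
    }
}

// searchComplete: hand the finished batch to a downstream sink (Kafka in the service)
// and drop it from the map so memory does not grow across searches.
async function flushResults(
    instanceId: string,
    sink: (results: Result[]) => Promise<void>,
): Promise<void> {
    const results = resultsByInstance.get(instanceId) ?? [];
    await sink(results);
    resultsByInstance.delete(instanceId);
}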