🪳 Serialized Map to be compatible with kafka

This commit is contained in:
2024-06-18 00:06:25 -04:00
parent e6a85e6a39
commit ecdc3845cb

View File

@@ -24,7 +24,7 @@ export default class ComicProcessorService extends Service {
private socketIOInstance: any;
private kafkaProducer: any;
private prowlarrResultsMap: Map<string, any> = new Map();
private airDCPPSearchResults: Map<string, any[]> = new Map();
private airDCPPSearchResults: Map<number, any[]> = new Map();
private issuesToSearch: any = [];
// @ts-ignore
@@ -84,10 +84,10 @@ export default class ComicProcessorService extends Service {
// 3. Orchestrate the query
const dcppSearchQuery = {
query: {
pattern: `${volume.name.replace(
/#/g,
"",
)} ${inferredIssueNumber} ${inferredYear}`,
pattern: `${volume.name
.replace(/[^\w\s]/g, "")
.replace(/\s+/g, " ")
.trim()}`,
extensions: ["cbz", "cbr", "cb7"],
},
hub_urls: hubs,
@@ -109,51 +109,57 @@ export default class ComicProcessorService extends Service {
namespace: "/automated",
});
const prowlarrResults = await this.broker.call("prowlarr.search", {
prowlarrQuery: {
port: "9696",
apiKey: "c4f42e265fb044dc81f7e88bd41c3367",
offset: 0,
categories: [7030],
query: `${volume.name} ${issue.issueNumber} ${year}`,
host: "localhost",
limit: 100,
type: "search",
indexerIds: [2],
},
});
this.logger.info(
"Prowlarr search results:",
JSON.stringify(prowlarrResults, null, 4),
);
// const prowlarrResults = await this.broker.call("prowlarr.search", {
// prowlarrQuery: {
// port: "9696",
// apiKey: "c4f42e265fb044dc81f7e88bd41c3367",
// offset: 0,
// categories: [7030],
// query: `${volume.name} ${issue.issueNumber} ${year}`,
// host: "localhost",
// limit: 100,
// type: "search",
// indexerIds: [2],
// },
// });
//
// this.logger.info(
// "Prowlarr search results:",
// JSON.stringify(prowlarrResults, null, 4),
// );
// Store prowlarr results in map using unique key
const key = `${volume.name}-${issue.issueNumber}-${year}`;
this.prowlarrResultsMap.set(key, prowlarrResults);
// const key = `${volume.name}-${issue.issueNumber}-${year}`;
// this.prowlarrResultsMap.set(key, prowlarrResults);
}
} catch (error) {
this.logger.error("Error processing job:", error);
}
},
produceResultsToKafka: async (dcppResults: any, prowlarrResults: any) => {
const results = { dcppResults, prowlarrResults };
produceResultsToKafka: async () => {
try {
/*
Kafka messages need to be in a format that can be serialized to JSON, and a Map is not directly serializable in a way that retains its structure, hence we use Object.fromEntries
*/
await this.kafkaProducer.send({
topic: "comic-search-results",
messages: [{ value: JSON.stringify(results) }],
messages: [
{
value: JSON.stringify(
Object.fromEntries(this.airDCPPSearchResults),
),
},
],
});
this.logger.info(
"Produced results to Kafka:",
JSON.stringify(results, null, 4),
);
console.log(`Produced results to Kafka.`);
// socket event for UI
await this.broker.call("socket.broadcast", {
namespace: "/",
event: "searchResultsAvailable",
args: [
{
dcppResults,
bokya: Object.fromEntries(this.airDCPPSearchResults),
},
],
});
@@ -230,26 +236,30 @@ export default class ComicProcessorService extends Service {
this.socketIOInstance.on(
"searchResultUpdated",
async ({ updatedResult, instanceId }: SearchResultPayload) => {
this.logger.info(
"Received search result update:",
JSON.stringify(updatedResult, null, 4),
);
const resultsForInstance = this.airDCPPSearchResults.get(instanceId);
if (resultsForInstance) {
const toReplaceIndex = resultsForInstance.findIndex(
(element: any) => element.id === updatedResult.result.id,
);
if (toReplaceIndex !== -1) {
// Replace the existing result with the updated result
resultsForInstance[toReplaceIndex] = updatedResult.result;
// Optionally, update the map with the modified array
this.airDCPPSearchResults.set(instanceId, resultsForInstance);
}
}
},
);
// Handle searchComplete event
this.socketIOInstance.on("searchComplete", async (instanceId: string) => {
this.logger.info(`Search complete for instance ID ${instanceId}`);
await this.produceResultsToKafka(instanceId);
this.socketIOInstance.on("searchesSent", async (data: any) => {
this.logger.info(
`Search complete for query: "${data.searchInfo.query.pattern}"`,
);
await this.produceResultsToKafka();
});
},
async stopped() {