🔧 Fixed awaits inside loops
This commit is contained in:
@@ -1,11 +1,12 @@
|
|||||||
"use strict";
|
"use strict";
|
||||||
import { Context, Service, ServiceBroker, ServiceSchema, Errors } from "moleculer";
|
|
||||||
import { Kafka } from "kafkajs";
|
import { Kafka } from "kafkajs";
|
||||||
|
import type { Context, ServiceBroker, ServiceSchema } from "moleculer";
|
||||||
|
import { Errors, Service } from "moleculer";
|
||||||
|
|
||||||
interface Comic {
|
interface Comic {
|
||||||
wanted: {
|
wanted: {
|
||||||
markEntireVolumeWanted?: boolean;
|
markEntireVolumeWanted?: boolean;
|
||||||
issues?: Array<any>;
|
issues?: any[];
|
||||||
volume: {
|
volume: {
|
||||||
id: string;
|
id: string;
|
||||||
name: string;
|
name: string;
|
||||||
@@ -15,10 +16,11 @@ interface Comic {
|
|||||||
|
|
||||||
export default class AutoDownloadService extends Service {
|
export default class AutoDownloadService extends Service {
|
||||||
private kafkaProducer: any;
|
private kafkaProducer: any;
|
||||||
|
|
||||||
private readonly BATCH_SIZE = 100; // Adjust based on your system capacity
|
private readonly BATCH_SIZE = 100; // Adjust based on your system capacity
|
||||||
|
|
||||||
// @ts-ignore
|
// @ts-ignore
|
||||||
public constructor(
|
constructor(
|
||||||
public broker: ServiceBroker,
|
public broker: ServiceBroker,
|
||||||
schema: ServiceSchema<{}> = { name: "autodownload" },
|
schema: ServiceSchema<{}> = { name: "autodownload" },
|
||||||
) {
|
) {
|
||||||
@@ -30,22 +32,20 @@ export default class AutoDownloadService extends Service {
|
|||||||
rest: "POST /searchWantedComics",
|
rest: "POST /searchWantedComics",
|
||||||
handler: async (ctx: Context<{}>) => {
|
handler: async (ctx: Context<{}>) => {
|
||||||
try {
|
try {
|
||||||
|
/* eslint-disable no-await-in-loop */
|
||||||
let page = 1;
|
let page = 1;
|
||||||
const limit = this.BATCH_SIZE;
|
const limit = this.BATCH_SIZE;
|
||||||
|
let comics: Comic[];
|
||||||
while (true) {
|
do {
|
||||||
const comics: Comic[] = await this.broker.call(
|
comics = await this.broker.call(
|
||||||
"library.getComicsMarkedAsWanted",
|
"library.getComicsMarkedAsWanted",
|
||||||
{ page, limit },
|
{ page, limit },
|
||||||
);
|
);
|
||||||
|
// Log debugging info
|
||||||
// Log the entire result object for debugging
|
|
||||||
this.logger.info(
|
this.logger.info(
|
||||||
"Received comics from getComicsMarkedAsWanted:",
|
"Received comics from getComicsMarkedAsWanted:",
|
||||||
JSON.stringify(comics, null, 2),
|
JSON.stringify(comics, null, 2),
|
||||||
);
|
);
|
||||||
|
|
||||||
// Check if result structure is correct
|
|
||||||
if (!Array.isArray(comics)) {
|
if (!Array.isArray(comics)) {
|
||||||
this.logger.error(
|
this.logger.error(
|
||||||
"Invalid response structure",
|
"Invalid response structure",
|
||||||
@@ -57,19 +57,14 @@ export default class AutoDownloadService extends Service {
|
|||||||
"INVALID_RESPONSE_STRUCTURE",
|
"INVALID_RESPONSE_STRUCTURE",
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
|
|
||||||
this.logger.info(
|
this.logger.info(
|
||||||
`Fetched ${comics.length} comics from page ${page}`,
|
`Fetched ${comics.length} comics from page ${page}`,
|
||||||
);
|
);
|
||||||
|
|
||||||
// Enqueue the jobs in batches
|
|
||||||
for (const comic of comics) {
|
for (const comic of comics) {
|
||||||
await this.produceJobToKafka(comic);
|
await this.produceJobToKafka(comic);
|
||||||
}
|
}
|
||||||
|
|
||||||
if (comics.length < limit) break; // End loop if fewer comics than the limit were fetched
|
|
||||||
page += 1;
|
page += 1;
|
||||||
}
|
} while (comics.length === limit);
|
||||||
|
|
||||||
return {
|
return {
|
||||||
success: true,
|
success: true,
|
||||||
|
|||||||
@@ -162,8 +162,6 @@ export default class ComicProcessorService extends Service {
|
|||||||
this.airDCPPSearchResults,
|
this.airDCPPSearchResults,
|
||||||
query,
|
query,
|
||||||
);
|
);
|
||||||
console.log("Final result:");
|
|
||||||
console.log(JSON.stringify(finalResult, null, 4));
|
|
||||||
/*
|
/*
|
||||||
Kafka messages need to be in a format that can be serialized to JSON,
|
Kafka messages need to be in a format that can be serialized to JSON,
|
||||||
and a Map is not directly serializable in a way that retains its structure,
|
and a Map is not directly serializable in a way that retains its structure,
|
||||||
@@ -177,7 +175,7 @@ export default class ComicProcessorService extends Service {
|
|||||||
},
|
},
|
||||||
],
|
],
|
||||||
});
|
});
|
||||||
console.log(`Produced results to Kafka.`);
|
this.logger.info(`Produced results to Kafka.`);
|
||||||
|
|
||||||
// socket event for UI
|
// socket event for UI
|
||||||
await this.broker.call("socket.broadcast", {
|
await this.broker.call("socket.broadcast", {
|
||||||
@@ -260,8 +258,6 @@ export default class ComicProcessorService extends Service {
|
|||||||
this.airDCPPSearchResults.get(entityId).push(payload);
|
this.airDCPPSearchResults.get(entityId).push(payload);
|
||||||
}
|
}
|
||||||
|
|
||||||
console.log(typeof entityId, entityId);
|
|
||||||
console.log(entityId);
|
|
||||||
console.log(
|
console.log(
|
||||||
"Updated airDCPPSearchResults:",
|
"Updated airDCPPSearchResults:",
|
||||||
JSON.stringify(Array.from(this.airDCPPSearchResults.entries()), null, 4),
|
JSON.stringify(Array.from(this.airDCPPSearchResults.entries()), null, 4),
|
||||||
@@ -278,15 +274,15 @@ export default class ComicProcessorService extends Service {
|
|||||||
|
|
||||||
if (resultsForInstance) {
|
if (resultsForInstance) {
|
||||||
const toReplaceIndex = resultsForInstance.findIndex((element: any) => {
|
const toReplaceIndex = resultsForInstance.findIndex((element: any) => {
|
||||||
console.log("search result updated!");
|
this.logger.info("search result updated!");
|
||||||
console.log(JSON.stringify(element, null, 4));
|
this.logger.info(JSON.stringify(element, null, 4));
|
||||||
return element.id === payload.id;
|
return element.id === payload.id;
|
||||||
});
|
});
|
||||||
|
|
||||||
if (toReplaceIndex !== -1) {
|
if (toReplaceIndex !== -1) {
|
||||||
// Replace the existing result with the updated result
|
// Replace the existing result with the updated result
|
||||||
resultsForInstance[toReplaceIndex] = payload;
|
resultsForInstance[toReplaceIndex] = payload;
|
||||||
|
rty6j
|
||||||
// Optionally, update the map with the modified array
|
// Optionally, update the map with the modified array
|
||||||
this.airDCPPSearchResults.set(entityId, resultsForInstance);
|
this.airDCPPSearchResults.set(entityId, resultsForInstance);
|
||||||
}
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user