118 lines
3.1 KiB
TypeScript
118 lines
3.1 KiB
TypeScript
import type { Producer } from "kafkajs";
|
|
import { Kafka } from "kafkajs";
|
|
import type { Context, ServiceBroker } from "moleculer";
|
|
import { Errors, Service } from "moleculer";
|
|
|
|
/**
 * Minimal shape of a single comic issue, as referenced from
 * `Comic.wanted.issues`.
 */
interface Issue {
  /** Identifier of the issue (the system that assigns it is not visible in this file). */
  id: string;
  /** Issue number — presumably its position within the volume; confirm against the library service. */
  number: number;
}
|
|
|
|
/**
 * Comic record as consumed by this service. Only the `wanted` sub-document
 * is modeled here; any other fields on the real record are not described.
 */
interface Comic {
  wanted: {
    /** Volume the wanted entry belongs to. */
    volume: {
      id: string;
      name: string;
    };
    /** Individual issues attached to the wanted entry, when present. */
    issues?: Issue[];
    /**
     * Optional flag; the name suggests the whole volume is wanted rather than
     * specific issues — exact semantics are defined outside this file.
     */
    markEntireVolumeWanted?: boolean;
  };
}
|
|
|
|
/**
 * Moleculer service (`autodownload`) that pages through the comics returned
 * by `library.getComicsMarkedAsWanted` and publishes one Kafka message per
 * comic to the `comic-search-jobs` topic.
 *
 * A kafkajs producer is created and connected in the `started` lifecycle
 * hook and disconnected in `stopped`.
 */
export default class AutoDownloadService extends Service {
  /** kafkajs producer; assigned in the `started` lifecycle hook. */
  private kafkaProducer!: Producer;

  /** Page size requested from `library.getComicsMarkedAsWanted`. */
  private readonly BATCH_SIZE = 100;

  /**
   * Registers the service schema (actions, methods, lifecycle hooks) on the
   * given broker.
   */
  // @ts-ignore -- Moleculer requires this constructor signature for service instantiation
  constructor(broker: ServiceBroker) {
    super(broker);
    this.parseServiceSchema({
      name: "autodownload",
      actions: {
        /**
         * Fetches wanted comics page by page and produces one Kafka job per
         * comic. Individual Kafka failures do not abort the sweep; they are
         * counted and reported in the `failed` field of the response.
         *
         * Throws a `MoleculerError` (`SEARCH_WANTED_COMICS_ERROR`, 500) when
         * the library call fails or returns a non-array payload.
         */
        searchWantedComics: {
          rest: "POST /searchWantedComics",
          // eslint-disable-next-line @typescript-eslint/no-unused-vars
          handler: async (ctx: Context<Record<string, never>>) => {
            try {
              // Pages must be fetched sequentially: whether another page
              // exists is only known after the current one has been read.
              /* eslint-disable no-await-in-loop */
              const limit = this.BATCH_SIZE;
              let page = 1;
              let enqueued = 0;
              let failed = 0;
              let comics: Comic[];
              do {
                comics = await this.broker.call(
                  "library.getComicsMarkedAsWanted",
                  { page, limit },
                );
                // Full payload dump is debugging output; keep it out of the
                // info-level log stream.
                this.logger.debug(
                  "Received comics from getComicsMarkedAsWanted:",
                  JSON.stringify(comics, null, 2),
                );
                if (!Array.isArray(comics)) {
                  this.logger.error(
                    "Invalid response structure",
                    JSON.stringify(comics, null, 2),
                  );
                  throw new Errors.MoleculerError(
                    "Invalid response structure from getComicsMarkedAsWanted",
                    500,
                    "INVALID_RESPONSE_STRUCTURE",
                  );
                }
                this.logger.info(
                  `Fetched ${comics.length} comics from page ${page}`,
                );
                for (const comic of comics) {
                  const delivered: boolean =
                    await this.produceJobToKafka(comic);
                  if (delivered) {
                    enqueued += 1;
                  } else {
                    failed += 1;
                  }
                }
                page += 1;
                // A short (or empty) page means there is nothing left to fetch.
              } while (comics.length === limit);
              /* eslint-enable no-await-in-loop */

              if (failed > 0) {
                this.logger.warn(
                  `${failed} of ${enqueued + failed} jobs could not be produced to Kafka`,
                );
              }

              return {
                success: true,
                message: "Jobs enqueued for background processing.",
                enqueued,
                failed,
              };
            } catch (error) {
              this.logger.error("Error in searchWantedComics:", error);
              throw new Errors.MoleculerError(
                "Failed to search wanted comics.",
                500,
                "SEARCH_WANTED_COMICS_ERROR",
                {
                  error,
                  // Error#message is non-enumerable and disappears when
                  // `error` is JSON-serialized, so carry it explicitly.
                  reason:
                    error instanceof Error ? error.message : String(error),
                },
              );
            }
          },
        },
      },
      methods: {
        /**
         * Publishes `{ comic }` as a single JSON message to the
         * `comic-search-jobs` topic.
         *
         * Best-effort: a send failure is logged and reported through the
         * return value instead of being thrown.
         *
         * @returns `true` when the message was sent, `false` when sending failed.
         */
        produceJobToKafka: async (comic: Comic): Promise<boolean> => {
          const job = { comic };
          try {
            await this.kafkaProducer.send({
              topic: "comic-search-jobs",
              messages: [{ value: JSON.stringify(job) }],
            });
            this.logger.info("Produced job to Kafka:", job);
            return true;
          } catch (error) {
            this.logger.error("Error producing job to Kafka:", error);
            return false;
          }
        },
      },
      /** Creates the Kafka client and connects the producer. */
      async started() {
        const kafka = new Kafka({
          clientId: "comic-search-service",
          brokers: [process.env.KAFKA_BROKER_URI || "localhost:9092"],
        });
        this.kafkaProducer = kafka.producer();
        await this.kafkaProducer.connect();
        this.logger.info("Kafka producer connected successfully.");
      },
      /** Disconnects the Kafka producer, if one was created. */
      async stopped() {
        // `started` may have thrown before assigning the producer (or never
        // run); in that case there is nothing to disconnect.
        if (!this.kafkaProducer) {
          return;
        }
        await this.kafkaProducer.disconnect();
        this.logger.info("Kafka producer disconnected successfully.");
      },
    });
  }
}
|