From 16d9b50f21952fa2bc1264a141edd0ee9c3fc822 Mon Sep 17 00:00:00 2001 From: Rishi Ghan Date: Tue, 14 Sep 2021 23:13:34 -0700 Subject: [PATCH] =?UTF-8?q?=E2=9C=8F=EF=B8=8F=20Formatting?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- queue/consumer.ts | 35 ------------ queue/importQueue.ts | 4 -- services/api.service.ts | 13 +---- services/import.service.ts | 114 ++++++++++++++++++++----------------- utils/logger.utils.ts | 4 +- 5 files changed, 66 insertions(+), 104 deletions(-) delete mode 100644 queue/consumer.ts diff --git a/queue/consumer.ts b/queue/consumer.ts deleted file mode 100644 index 085dc5a..0000000 --- a/queue/consumer.ts +++ /dev/null @@ -1,35 +0,0 @@ -const amqp = require("amqplib/callback_api"); -export const connectQueue = (socketConnection) => { - amqp.connect("amqp://localhost", (error0, connection) => { - if (error0) { - throw error0; - } - connection.createChannel((error1, channel) => { - if (error1) { - throw error1; - } - const queue = "comicBookCovers"; - - channel.assertQueue(queue, { - durable: false, - }); - - console.log( - " [*] Waiting for comic book cover data in %s. To exit press CTRL+C", - queue - ); - - channel.consume( - queue, - (data) => { - console.log("data", data); - //Socket Trigger All Clients - socketConnection.emit("coverExtracted", JSON.parse(data.content.toString())); - }, - { - noAck: true, - } - ); - }); - }); -}; diff --git a/queue/importQueue.ts b/queue/importQueue.ts index 8df6987..a45ec54 100644 --- a/queue/importQueue.ts +++ b/queue/importQueue.ts @@ -1,8 +1,6 @@ import { logger } from "../utils/logger.utils"; - //RabbitMQ const amqp = require("amqplib/callback_api"); - const rabbitUrl = "amqp://localhost"; export const sendRabbitMQ = (queueName, data) => { @@ -16,7 +14,6 @@ export const sendRabbitMQ = (queueName, data) => { if (error1) { throw error1; } - channel.prefetch(1); const queue = queueName; // Checks for “queueName (updateStock)” queue. If it doesn’t exist, then it creates one. channel.assertQueue(queue, { @@ -31,4 +28,3 @@ export const sendRabbitMQ = (queueName, data) => { }, 500); }); }; -module.exports = sendRabbitMQ; diff --git a/services/api.service.ts b/services/api.service.ts index 591b8f0..b2aee75 100644 --- a/services/api.service.ts +++ b/services/api.service.ts @@ -1,7 +1,5 @@ import { Service, ServiceBroker, Context } from "moleculer"; import ApiGateway from "moleculer-web"; -import { connectQueue } from "../queue/consumer"; -const IO = require("socket.io")(); export default class ApiService extends Service { public constructor(broker: ServiceBroker) { super(broker); @@ -71,20 +69,11 @@ export default class ApiService extends Service { }, }, events: { - "**"(payload, sender, event) { - if (this.io) - this.io.emit("event", { - sender, - event, - payload, - }); - }, + }, methods: {}, started(): any { - this.io = IO.listen(this.server); - connectQueue(this.io); }, }); } diff --git a/services/import.service.ts b/services/import.service.ts index 45dab03..4de3c66 100644 --- a/services/import.service.ts +++ b/services/import.service.ts @@ -13,7 +13,7 @@ import { walkFolder } from "../utils/file.utils"; import { convertXMLToJSON } from "../utils/xml.utils"; import https from "https"; import { logger } from "../utils/logger.utils"; -const rabbitmq = require("../queue/importQueue"); +import { sendRabbitMQ } from "../queue/importQueue"; import { IExtractComicBookCoverErrorResponse, IExtractedComicBookCoverFile, @@ -66,6 +66,11 @@ export default class ImportService extends Service { }, importComicsToDb: { rest: "POST /importComicsToDB", + bulkhead: { + enabled: true, + concurrency: 50, + maxQueueSize: 100, + }, params: {}, async handler( ctx: Context<{ @@ -82,57 +87,64 @@ export default class ImportService extends Service { ]; }> ) { - const { extractionOptions, walkedFolders } = - ctx.params; - map(walkedFolders, async (folder, idx) => { - let comicExists = await Comic.exists({ - "rawFileDetails.name": `${folder.name}`, - }); - if (!comicExists) { - let comicBookCoverMetadata: - | IExtractedComicBookCoverFile - | IExtractComicBookCoverErrorResponse - | IExtractedComicBookCoverFile[] = await extractCoverFromFile( - extractionOptions, - folder - ); + try { + const { extractionOptions, walkedFolders } = + ctx.params; + map(walkedFolders, async (folder, idx) => { + let comicExists = await Comic.exists({ + "rawFileDetails.name": `${folder.name}`, + }); + if (!comicExists) { + // 1. Extract cover and cover metadata + let comicBookCoverMetadata: + | IExtractedComicBookCoverFile + | IExtractComicBookCoverErrorResponse + | IExtractedComicBookCoverFile[] = + await extractCoverFromFile( + extractionOptions, + folder + ); - // const dbImportResult = - // await this.broker.call( - // "import.rawImportToDB", - // { - // importStatus: { - // isImported: true, - // tagged: false, - // matchedResult: { - // score: "0", - // }, - // }, - // rawFileDetails: - // comicBookCoverMetadata, - // sourcedMetadata: { - // comicvine: {}, - // }, - // }, - // {} - // ); - rabbitmq( - "comicBookCovers", - JSON.stringify({ - comicBookCoverMetadata, - }) - ); - } else { - logger.info( - `Comic: \"${folder.name}\" already exists in the database` - ); - rabbitmq("comicBookCovers", - JSON.stringify({ - name: folder.name, - }) - ); - } - }); + // 2. Add to mongo + const dbImportResult = + await this.broker.call( + "import.rawImportToDB", + { + importStatus: { + isImported: true, + tagged: false, + matchedResult: { + score: "0", + }, + }, + rawFileDetails: + comicBookCoverMetadata, + sourcedMetadata: { + comicvine: {}, + }, + }, + {} + ); + // 3. Send to the queue + sendRabbitMQ( + "comicBookCovers", + JSON.stringify({ + comicBookCoverMetadata, + dbImportResult, + }) + ); + } else { + logger.info( + `Comic: \"${folder.name}\" already exists in the database` + ); + } + }); + } catch (error) { + logger.error( + "Error importing comic books", + error + ); + } }, }, rawImportToDB: { diff --git a/utils/logger.utils.ts b/utils/logger.utils.ts index d47c965..f8209c1 100644 --- a/utils/logger.utils.ts +++ b/utils/logger.utils.ts @@ -2,12 +2,12 @@ import { default as Pino } from "pino"; import { default as pinopretty } from "pino-pretty"; export const logger = Pino({ - name: "threetwo!", + name: "Threetwo!", prettyPrint: { colorize: true }, // crlf: false, // errorLikeObjectKeys: ["err", "error"], // errorProps: "", - // levelFirst: false, // --levelFirst + // levelFirst: false, messageKey: "msg", // --messageKey levelKey: "level", // --levelKey // messageFormat: false, // --messageFormat