🔧 Replaced RabbitMQ with BullMQ

This commit is contained in:
2021-10-26 15:07:42 -07:00
parent 755ac43820
commit 289b2ec3bd
8 changed files with 858 additions and 296 deletions

View File

@@ -11,6 +11,7 @@ export const DbMixin = (collection, model) => {
user: process.env.MONGO_INITDB_ROOT_USERNAME,
pass: process.env.MONGO_INITDB_ROOT_PASSWORD,
keepAlive: true,
useUnifiedTopology: true,
}),
model,
collection,

881
package-lock.json generated

File diff suppressed because it is too large Load Diff

View File

@@ -41,34 +41,34 @@
"@types/pino": "^6.3.8",
"@types/string-similarity": "^4.0.0",
"7zip-bin": "^5.1.1",
"7zip-min": "^1.4.0",
"chokidar": "^3.5.2",
"amqplib": "^0.7.1",
"fs-extra": "^10.0.0",
"imghash": "^0.0.9",
"jsdom": "^15.2.1",
"leven": "^3.1.0",
"7zip-min": "^1.4.0",
"lodash": "^4.17.21",
"mkdirp": "^0.5.5",
"moleculer": "^0.14.16",
"moleculer-bull": "^0.2.8",
"moleculer-db": "^0.8.13",
"moleculer-db-adapter-mongo": "^0.4.7",
"moleculer-db-adapter-mongoose": "^0.8.9",
"moleculer-web": "^0.9.0",
"moleculer-web": "^0.10.3",
"mongoose": "^5.12.7",
"pino-pretty": "^7.0.0",
"pino": "^6.13.2",
"unrar": "^0.2.0",
"mongoose-paginate-v2": "^1.3.18",
"nats": "^1.3.2",
"node-7z": "^3.0.0",
"node-calibre": "^2.1.1",
"node-unrar-js": "^1.0.2",
"pino": "^6.13.2",
"pino-pretty": "^7.0.0",
"sharp": "^0.28.1",
"socket.io": "^4.1.1",
"socket.io": "^4.3.1",
"socket.io-stream": "^0.5.3",
"threetwo-ui-typings": "^1.0.10",
"typescript": "^3.8.3",
"unrar": "^0.2.0",
"xml2js": "^0.4.23"
},
"engines": {

View File

@@ -1,30 +0,0 @@
import { logger } from "../utils/logger.utils";
//RabbitMQ
const amqp = require("amqplib/callback_api");
const rabbitUrl = process.env.DOCKER_RABBITMQ_CONNECTION_STRING ? process.env.DOCKER_RABBITMQ_CONNECTION_STRING : `amqp://localhost`;
/**
 * Publishes a message onto a RabbitMQ queue.
 *
 * Opens a fresh connection to the broker, asserts the target queue
 * (creating a non-durable queue if it does not exist yet), sends the
 * payload, and schedules the connection to be torn down shortly after.
 *
 * @param queueName - name of the queue to publish to
 * @param data - message payload; converted to a Buffer before sending
 */
export const sendToRabbitMQ = (queueName, data) => {
	amqp.connect(rabbitUrl, (connectError, connection) => {
		if (connectError) {
			throw connectError;
		}
		connection.createChannel((channelError, channel) => {
			if (channelError) {
				throw channelError;
			}
			// Make sure the queue exists before publishing to it.
			channel.assertQueue(queueName, {
				durable: false,
			});
			channel.sendToQueue(queueName, Buffer.from(data));
			logger.info(`${data} sent`);
		});
		// Give the broker a moment to flush the message, then close.
		setTimeout(() => {
			connection.close();
			// process.exit(0);
		}, 500);
	});
};

View File

@@ -5,6 +5,7 @@ import { logger } from "../utils/logger.utils";
import path from "path";
import fs from "fs";
import { IExtractionOptions, IFolderData } from "threetwo-ui-typings";
import IO from "socket.io";
export default class ApiService extends Service {
public constructor(broker: ServiceBroker) {
super(broker);
@@ -15,7 +16,6 @@ export default class ApiService extends Service {
// More info about settings: https://moleculer.services/docs/0.14/moleculer-web.html
settings: {
port: process.env.PORT || 3000,
routes: [
{
path: "/api",
@@ -77,10 +77,67 @@ export default class ApiService extends Service {
options: {},
},
},
events: {},
events: {
"**"(payload, sender, event) {
if (this.io)
this.io.emit("event", {
sender,
event,
payload,
});
},
},
methods: {},
started(): any {
// Socket gateway-ish
// Create a Socket.IO instance, passing it our server
this.io = new IO.Server(3001);
// Add a connect listener
this.io.on("connection", (client) => {
this.logger.info("Client connected via websocket!");
client.on("action", (action, done) => {
console.log(action);
switch (action.type) {
case "LS_IMPORT":
this.broker
.call(
"import.processAndImportToDB",
action.data,
{}
)
.then((res) => {
if (done) done(res);
})
.catch((err) => this.logger.error(err));
break;
}
});
// client.on("call", ({ action, params, opts }, done) => {
// this.logger.info(
// "Received request from client! Action:",
// action,
// ", Params:",
// params
// );
// this.broker
// .call(action, params, opts)
// .then((res) => {
// if (done) done(res);
// })
// .catch((err) => this.logger.error(err));
// });
client.on("disconnect", () => {
this.logger.info("Client disconnected");
});
});
// Filewatcher
const fileWatcher = chokidar.watch(path.resolve("./comics"), {
ignored: /(^|[\/\\])\../, // ignore dotfiles
persistent: true,
@@ -94,38 +151,66 @@ export default class ApiService extends Service {
const fileCopyDelaySeconds = 10;
const checkFileCopyComplete = (path, previousPath) => {
fs.stat(path, async (err, stat) => {
if (err) { throw err; }
if (stat.mtime.getTime() === previousPath.mtime.getTime()) {
logger.info('File copy complete, starting import...');
const walkedFolders: IFolderData = await broker.call("import.walkFolders", { basePathToWalk: path });
if (err) {
throw err;
}
if (
stat.mtime.getTime() ===
previousPath.mtime.getTime()
) {
logger.info(
"File copy complete, starting import..."
);
const walkedFolders: IFolderData =
await broker.call("import.walkFolders", {
basePathToWalk: path,
});
const extractionOptions: IExtractionOptions = {
extractTarget: "cover",
targetExtractionFolder: "./userdata/covers",
extractionMode: "single",
};
await this.broker.call("import.processAndImportToDB", { walkedFolders, extractionOptions });
await this.broker.call(
"import.processAndImportToDB",
{ walkedFolders, extractionOptions }
);
} else {
setTimeout(checkFileCopyComplete, fileCopyDelaySeconds * 1000, path, stat);
setTimeout(
checkFileCopyComplete,
fileCopyDelaySeconds * 1000,
path,
stat
);
}
})
}
});
};
fileWatcher
.on("add", async (path, stats) => {
logger.info("Watcher detected new files.")
logger.info("Watcher detected new files.");
logger.info(
`File ${path} has been added with stats: ${JSON.stringify(
stats
)}`
);
logger.info('File copy started...');
logger.info("File copy started...");
fs.stat(path, function (err, stat) {
if (err) {
logger.error('Error watching file for copy completion. ERR: ' + err.message);
logger.error('Error file not processed. PATH: ' + path);
logger.error(
"Error watching file for copy completion. ERR: " +
err.message
);
logger.error(
"Error file not processed. PATH: " + path
);
throw err;
}
setTimeout(checkFileCopyComplete, fileCopyDelaySeconds * 1000, path, stat);
setTimeout(
checkFileCopyComplete,
fileCopyDelaySeconds * 1000,
path,
stat
);
});
})
.on("change", (path, stats) =>

View File

@@ -13,7 +13,6 @@ import { walkFolder } from "../utils/file.utils";
import { convertXMLToJSON } from "../utils/xml.utils";
import https from "https";
import { logger } from "../utils/logger.utils";
import { sendToRabbitMQ } from "../queue/importQueue";
import {
IExtractComicBookCoverErrorResponse,
IExtractedComicBookCoverFile,
@@ -72,16 +71,12 @@ export default class ImportService extends Service {
},
processAndImportToDB: {
rest: "POST /processAndImportToDB",
bulkhead: {
enabled: true,
concurrency: 50,
maxQueueSize: 100,
},
params: {},
async handler(
ctx: Context<{
extractionOptions: any;
walkedFolders: [
walkedFolders:
{
name: string;
path: string;
@@ -90,16 +85,15 @@ export default class ImportService extends Service {
fileSize: number;
isFile: boolean;
isLink: boolean;
}
];
};
}>
) {
try {
const { extractionOptions, walkedFolders } =
ctx.params;
map(walkedFolders, async (folder, idx) => {
// map(walkedFolders, async (folder, idx) => {
let comicExists = await Comic.exists({
"rawFileDetails.name": `${folder.name}`,
"rawFileDetails.name": `${walkedFolders.name}`,
});
if (!comicExists) {
// 1. Extract cover and cover metadata
@@ -109,7 +103,7 @@ export default class ImportService extends Service {
| IExtractedComicBookCoverFile[] =
await extractCoverFromFile(
extractionOptions,
folder
walkedFolders
);
// 2. Add to mongo
@@ -132,20 +126,13 @@ export default class ImportService extends Service {
},
{}
);
// 3. Send to the queue
sendToRabbitMQ(
"comicBookCovers",
JSON.stringify({
comicBookCoverMetadata,
dbImportResult,
})
);
} else {
logger.info(
`Comic: \"${folder.name}\" already exists in the database`
`Comic: \"${walkedFolders.name}\" already exists in the database`
);
}
});
// });
} catch (error) {
logger.error(
"Error importing comic books",

View File

@@ -0,0 +1,78 @@
"use strict";
import {
Context,
Service,
ServiceBroker,
ServiceSchema,
Errors,
} from "moleculer";
import BullMQMixin from "moleculer-bull";
/**
 * Moleculer service wrapping a BullMQ-backed job queue ("mail.send").
 *
 * Exposes a REST action `POST /enqueue` that pushes a job onto the queue;
 * the queue processor simply logs the job and resolves with its id and
 * the worker pid.
 */
export default class LibraryQueueService extends Service {
	public constructor(
		public broker: ServiceBroker,
		schema: ServiceSchema<{}> = { name: "libraryqueue" }
	) {
		super(broker);
		this.parseServiceSchema(
			Service.mergeSchemas(
				{
					name: "libraryqueue",
					// NOTE(review): Redis endpoint is hard-coded; consider an env var.
					mixins: [BullMQMixin("redis://0.0.0.0:6379")],
					settings: {},
					hooks: {},
					queues: {
						"mail.send": {
							/**
							 * Worker for the "mail.send" queue: logs the incoming
							 * job and resolves with its id and the handling pid.
							 */
							async process(job) {
								this.logger.info("New job received!", job.data);
								this.logger.info(`Processing queue...`);
								// const accounts = await this.broker.call('v1.users.list');
								// this.logger.info(accounts);
								return Promise.resolve({
									id: job.id,
									worker: process.pid,
								});
							},
						},
					},
					actions: {
						enqueue: {
							rest: "POST /enqueue",
							params: {},
							async handler(ctx: Context<{}>) {
								// Attach queue lifecycle listeners exactly once.
								// Previously they were re-registered on every call,
								// leaking duplicate event listeners.
								this.attachQueueListeners();
								const job = await this.createJob("mail.send", {
									blah: "blah",
								});
								// Return the id so REST callers get a useful
								// response (previously resolved to undefined).
								return { jobId: job.id };
							},
						},
					},
					methods: {
						/**
						 * Registers failed/completed/stalled listeners on the
						 * "mail.send" queue, guarded so repeated calls are no-ops.
						 */
						attachQueueListeners() {
							if (this.queueListenersAttached) {
								return;
							}
							this.queueListenersAttached = true;
							const queue = this.getQueue("mail.send");
							queue.on("failed", (job, error) => {
								this.logger.error(
									`An error occured in 'mail.send' queue on job id '${job.id}': ${error.message}`
								);
							});
							queue.on("completed", (job, res) => {
								this.logger.info(
									`Job with the id '${job.id}' completed.`
								);
							});
							queue.on("stalled", (job) => {
								// Fixed: the original template literal had an
								// unclosed quote and interpolated the whole job
								// object instead of its id.
								this.logger.warn(
									`The job with the id '${job.id}' got stalled!`
								);
							});
						},
					},
					async started(): Promise<any> {},
				},
				schema
			)
		);
	}
}

View File

@@ -90,8 +90,8 @@ export const extractCoverFromFile = async (
);
const ebookMetaPath = `${process.env.CALIBRE_EBOOK_META_PATH}` || `ebook-meta`;
result = await calibre.run(
`${ebookMetaPath}`,
[constructedPaths.inputFilePath],
`ebook-meta`,
[path.resolve(constructedPaths.inputFilePath)],
{
getCover: targetCoverImageFilePath,
}