From df6652cce9e1156c171c3915794133e5c1de8e57 Mon Sep 17 00:00:00 2001 From: Rishi Ghan Date: Mon, 21 Aug 2023 17:55:08 -0400 Subject: [PATCH] =?UTF-8?q?=F0=9F=90=82=20Queue=20pause/resume=20functiona?= =?UTF-8?q?lity?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- package-lock.json | 105 ++++++++++---------------------- package.json | 3 +- services/jobqueue.service.ts | 114 +++++++++++++++++++---------------- services/socket.service.ts | 33 +++++----- 4 files changed, 109 insertions(+), 146 deletions(-) diff --git a/package-lock.json b/package-lock.json index 16da2e6..42b6679 100644 --- a/package-lock.json +++ b/package-lock.json @@ -57,14 +57,13 @@ "@types/lodash": "^4.14.168", "@typescript-eslint/eslint-plugin": "^5.56.0", "@typescript-eslint/parser": "^5.56.0", - "bull": "^4.10.4", "eslint": "^8.36.0", "eslint-plugin-import": "^2.20.2", "eslint-plugin-prefer-arrow": "^1.2.2", "install": "^0.13.0", "jest": "^29.5.0", "jest-cli": "^29.5.0", - "moleculer-bullmq": "^3.0.0", + "moleculer-bullmq": "github:rishighan/moleculer-bullmq", "moleculer-repl": "^0.7.0", "node-calibre": "^2.1.1", "npm": "^8.4.1", @@ -4139,38 +4138,10 @@ "url": "https://github.com/sponsors/sindresorhus" } }, - "node_modules/bull": { - "version": "4.10.4", - "resolved": "https://registry.npmjs.org/bull/-/bull-4.10.4.tgz", - "integrity": "sha512-o9m/7HjS/Or3vqRd59evBlWCXd9Lp+ALppKseoSKHaykK46SmRjAilX98PgmOz1yeVaurt8D5UtvEt4bUjM3eA==", - "dev": true, - "dependencies": { - "cron-parser": "^4.2.1", - "debuglog": "^1.0.0", - "get-port": "^5.1.1", - "ioredis": "^5.0.0", - "lodash": "^4.17.21", - "msgpackr": "^1.5.2", - "semver": "^7.3.2", - "uuid": "^8.3.0" - }, - "engines": { - "node": ">=12" - } - }, - "node_modules/bull/node_modules/uuid": { - "version": "8.3.2", - "resolved": "https://registry.npmjs.org/uuid/-/uuid-8.3.2.tgz", - "integrity": "sha512-+NYs2QeMWy+GWFOEm9xnn6HCDp0l7QBD7ml8zLUmJ+93Q5NF0NocErnwkTkXVFNiX3/fpC6afS8Dhb/gz7R7eg==", - "dev": true, - "bin": { - "uuid": "dist/bin/uuid" - } - }, "node_modules/bullmq": { - "version": "3.15.8", - "resolved": "https://registry.npmjs.org/bullmq/-/bullmq-3.15.8.tgz", - "integrity": "sha512-k3uimHGhl5svqD7SEak+iI6c5DxeLOaOXzCufI9Ic0ST3nJr69v71TGR4cXCTXdgCff3tLec5HgoBnfyWjgn5A==", + "version": "4.8.0", + "resolved": "https://registry.npmjs.org/bullmq/-/bullmq-4.8.0.tgz", + "integrity": "sha512-M5NPxrzHQ53yeRSH3j52dOu0U6Lssdhumet9CJ9LzTh2GNbhad9VPQunaariEmPmK0zCFF2uf08PVWtRbXnQkQ==", "dev": true, "dependencies": { "cron-parser": "^4.6.0", @@ -4178,7 +4149,8 @@ "ioredis": "^5.3.2", "lodash": "^4.17.21", "msgpackr": "^1.6.2", - "semver": "^7.3.7", + "node-abort-controller": "^3.1.1", + "semver": "^7.5.4", "tslib": "^2.0.0", "uuid": "^9.0.0" } @@ -4693,9 +4665,9 @@ "dev": true }, "node_modules/cron-parser": { - "version": "4.8.1", - "resolved": "https://registry.npmjs.org/cron-parser/-/cron-parser-4.8.1.tgz", - "integrity": "sha512-jbokKWGcyU4gl6jAfX97E1gDpY12DJ1cLJZmoDzaAln/shZ+S3KBFBuA2Q6WeUN4gJf/8klnV1EfvhA2lK5IRQ==", + "version": "4.9.0", + "resolved": "https://registry.npmjs.org/cron-parser/-/cron-parser-4.9.0.tgz", + "integrity": "sha512-p0SaNjrHOnQeR8/VnfGbmg9te2kfyYSQ7Sc/j/6DtPL3JQvKxmjO9TSjNFpujqV3vEYYBvNNvXSxzyksBWAx1Q==", "dev": true, "dependencies": { "luxon": "^3.2.1" @@ -4795,16 +4767,6 @@ "resolved": "https://registry.npmjs.org/ms/-/ms-2.1.2.tgz", "integrity": "sha512-sGkPx+VjMtmA6MX27oA4FBFELFCZZ4S4XqeGOXCv68tT+jb3vk/RyaKWP0PTKyWtmLSM0b+adUTEvbs1PEaH2w==" }, - "node_modules/debuglog": { - "version": "1.0.1", - "resolved": "https://registry.npmjs.org/debuglog/-/debuglog-1.0.1.tgz", - "integrity": "sha512-syBZ+rnAK3EgMsH2aYEOLUW7mZSY9Gb+0wUMCFsZvcmiz+HigA0LOcq/HoQqVuGG+EKykunc7QG2bzrponfaSw==", - "deprecated": "Package no longer supported. Contact Support at https://www.npmjs.com/support for more info.", - "dev": true, - "engines": { - "node": "*" - } - }, "node_modules/decimal.js": { "version": "10.4.3", "resolved": "https://registry.npmjs.org/decimal.js/-/decimal.js-10.4.3.tgz", @@ -6691,18 +6653,6 @@ "node": ">=8.0.0" } }, - "node_modules/get-port": { - "version": "5.1.1", - "resolved": "https://registry.npmjs.org/get-port/-/get-port-5.1.1.tgz", - "integrity": "sha512-g/Q1aTSDOxFpchXC4i8ZWvxA1lnPqx/JHqcpIw0/LX9T8x/GBbi6YnlN5nhaKIFkT8oFsscUKgDJYxfwfS6QsQ==", - "dev": true, - "engines": { - "node": ">=8" - }, - "funding": { - "url": "https://github.com/sponsors/sindresorhus" - } - }, "node_modules/get-stream": { "version": "6.0.1", "resolved": "https://registry.npmjs.org/get-stream/-/get-stream-6.0.1.tgz", @@ -6972,7 +6922,8 @@ "node_modules/hosted-git-info": { "version": "2.8.9", "resolved": "https://registry.npmjs.org/hosted-git-info/-/hosted-git-info-2.8.9.tgz", - "integrity": "sha512-mxIDAb9Lsm6DoOJ7xH+5+X4y1LU/4Hi50L9C5sIswK3JzULS4bwk1FvjdBgvYR4bzT4tuUQiC15FE2f5HbLvYw==" + "integrity": "sha512-mxIDAb9Lsm6DoOJ7xH+5+X4y1LU/4Hi50L9C5sIswK3JzULS4bwk1FvjdBgvYR4bzT4tuUQiC15FE2f5HbLvYw==", + "peer": true }, "node_modules/hpagent": { "version": "1.2.0", @@ -9107,9 +9058,9 @@ } }, "node_modules/luxon": { - "version": "3.3.0", - "resolved": "https://registry.npmjs.org/luxon/-/luxon-3.3.0.tgz", - "integrity": "sha512-An0UCfG/rSiqtAIiBPO0Y9/zAnHUZxAMiCpTd5h2smgsj7GGmcenvrvww2cqNA8/4A5ZrD1gJpHN2mIHZQF+Mg==", + "version": "3.4.0", + "resolved": "https://registry.npmjs.org/luxon/-/luxon-3.4.0.tgz", + "integrity": "sha512-7eDo4Pt7aGhoCheGFIuq4Xa2fJm4ZpmldpGhjTYBNUYNCN6TIEP6v7chwwwt3KRp7YR+rghbfvjyo3V5y9hgBw==", "dev": true, "engines": { "node": ">=12" @@ -9625,11 +9576,11 @@ }, "node_modules/moleculer-bullmq": { "version": "3.0.0", - "resolved": "https://registry.npmjs.org/moleculer-bullmq/-/moleculer-bullmq-3.0.0.tgz", - "integrity": "sha512-Pc6ggRT7kRBO6qaHbVoGoGQfoTPW9lQwEaI/qBdgCQ+II/zCrPl3s4DxPbDX1N1BcphReAPbM7hGQ/EIVv2MiA==", + "resolved": "git+ssh://git@github.com/rishighan/moleculer-bullmq.git#10fba3a88b69884bde2c827810aa2c4bb6772874", "dev": true, + "license": "MIT", "dependencies": { - "bullmq": "^3.5.1" + "bullmq": "^4.8.0" }, "engines": { "node": ">= 10.x.x" @@ -9884,9 +9835,9 @@ "integrity": "sha512-6FlzubTLZG3J2a/NVCAleEhjzq5oxgHyaCU9yYXvcLsvoVaHJq/s5xXI6/XXP6tz7R9xAOtHnSO/tXtF3WRTlA==" }, "node_modules/msgpackr": { - "version": "1.9.5", - "resolved": "https://registry.npmjs.org/msgpackr/-/msgpackr-1.9.5.tgz", - "integrity": "sha512-/IJ3cFSN6Ci3eG2wLhbFEL6GT63yEaoN/R5My2QkV6zro+OJaVRLPlwvxY7EtHYSmDlQpk8stvOQTL2qJFkDRg==", + "version": "1.9.7", + "resolved": "https://registry.npmjs.org/msgpackr/-/msgpackr-1.9.7.tgz", + "integrity": "sha512-baUNaLvKQvVhzfWTNO07njwbZK1Lxjtb0P1JL6/EhXdLTHzR57/mZqqJC39TtQKvOmkJA4pcejS4dbk7BDgLLA==", "dev": true, "optionalDependencies": { "msgpackr-extract": "^3.0.2" @@ -9988,6 +9939,12 @@ "node": ">=10" } }, + "node_modules/node-abort-controller": { + "version": "3.1.1", + "resolved": "https://registry.npmjs.org/node-abort-controller/-/node-abort-controller-3.1.1.tgz", + "integrity": "sha512-AGK2yQKIjRuqnc6VkX2Xj5d+QW8xZ87pa1UK6yA6ouUyuxfHuMP6umE5QK7UmTeOAymo+Zx1Fxiuw9rVx8taHQ==", + "dev": true + }, "node_modules/node-addon-api": { "version": "5.1.0", "resolved": "https://registry.npmjs.org/node-addon-api/-/node-addon-api-5.1.0.tgz", @@ -13463,11 +13420,11 @@ } }, "node_modules/r2-shared-js": { - "version": "1.0.68", - "resolved": "https://registry.npmjs.org/r2-shared-js/-/r2-shared-js-1.0.68.tgz", - "integrity": "sha512-RcamFhorl6/YZGoWzY/fm8nQYBxqnJO5ykTNPF4ROw1yOybTOcIORPcbwCWQLdwZCy0dF+N4I2SeR63ge7V3WQ==", + "version": "1.0.70", + "resolved": "https://registry.npmjs.org/r2-shared-js/-/r2-shared-js-1.0.70.tgz", + "integrity": "sha512-ZvhYyhaJB6VsuURkEbd613szVZm2eGAcsx9883yQ9X5XZpB1KblZEPbT0W8m85syjdryCn2vO9HH88Drd4G83A==", "dependencies": { - "@xmldom/xmldom": "^0.8.8", + "@xmldom/xmldom": "^0.8.10", "debug": "^4.3.4", "fast-deep-equal": "^3.1.3", "he": "^1.2.0", @@ -13478,7 +13435,7 @@ "r2-utils-js": "^1.0.35", "slugify": "^1.6.6", "ta-json-x": "^2.5.3", - "tslib": "^2.6.0", + "tslib": "^2.6.1", "xpath": "^0.0.32", "yazl": "^2.5.1" }, diff --git a/package.json b/package.json index 62d94a8..3f020c1 100644 --- a/package.json +++ b/package.json @@ -23,14 +23,13 @@ "@types/lodash": "^4.14.168", "@typescript-eslint/eslint-plugin": "^5.56.0", "@typescript-eslint/parser": "^5.56.0", - "bull": "^4.10.4", "eslint": "^8.36.0", "eslint-plugin-import": "^2.20.2", "eslint-plugin-prefer-arrow": "^1.2.2", "install": "^0.13.0", "jest": "^29.5.0", "jest-cli": "^29.5.0", - "moleculer-bullmq": "^3.0.0", + "moleculer-bullmq": "github:rishighan/moleculer-bullmq", "moleculer-repl": "^0.7.0", "node-calibre": "^2.1.1", "npm": "^8.4.1", diff --git a/services/jobqueue.service.ts b/services/jobqueue.service.ts index 0a1f190..f3028ed 100644 --- a/services/jobqueue.service.ts +++ b/services/jobqueue.service.ts @@ -23,12 +23,19 @@ export default class JobQueueService extends Service { }, }, actions: { - getJobStatuses: { - rest: "GET /getJobStatuses", - handler: async (ctx: Context<{}>) => { - const foo = await this.getJobStatuses("enqueue.async"); - console.log(foo); - return foo; + toggle: { + rest: "GET /toggle", + handler: async (ctx: Context<{ action: String }>) => { + switch (ctx.params.action) { + case "pause": + this.pause(); + break; + case "resume": + this.resume(); + break; + default: + console.log(`Unknown queue action.`); + } }, }, enqueue: { @@ -36,12 +43,9 @@ export default class JobQueueService extends Service { rest: "/GET enqueue", handler: async (ctx: Context<{}>) => { // Enqueue the job - const job = await this.localQueue( - ctx, - "enqueue.async", - ctx.params, - { priority: 10 } - ); + const job = await this.localQueue(ctx, "enqueue.async", ctx.params, { + priority: 10, + }); console.log(`Job ${job.id} enqueued`); return job.id; @@ -55,17 +59,13 @@ export default class JobQueueService extends Service { }> ) => { try { - console.log( - `Recieved Job ID ${ctx.locals.job.id}, processing...` - ); + console.log(`Recieved Job ID ${ctx.locals.job.id}, processing...`); // 1. De-structure the job params const { fileObject } = ctx.locals.job.data.params; // 2. Extract metadata from the archive - const result = await extractFromArchive( - fileObject.filePath - ); + const result = await extractFromArchive(fileObject.filePath); const { name, filePath, @@ -78,9 +78,7 @@ export default class JobQueueService extends Service { } = result; // 3a. Infer any issue-related metadata from the filename - const { inferredIssueDetails } = refineQuery( - result.name - ); + const { inferredIssueDetails } = refineQuery(result.name); console.log( "Issue metadata inferred: ", JSON.stringify(inferredIssueDetails, null, 2) @@ -120,8 +118,7 @@ export default class JobQueueService extends Service { // "acquisition.directconnect.downloads": [], // mark the metadata source - "acquisition.source.name": - ctx.locals.job.data.params.sourcedFrom, + "acquisition.source.name": ctx.locals.job.data.params.sourcedFrom, }; // 3c. Add the bundleId, if present to the payload @@ -132,13 +129,8 @@ export default class JobQueueService extends Service { // 3d. Add the sourcedMetadata, if present if ( - !isNil( - ctx.locals.job.data.params.sourcedMetadata - ) && - !isUndefined( - ctx.locals.job.data.params.sourcedMetadata - .comicvine - ) + !isNil(ctx.locals.job.data.params.sourcedMetadata) && + !isUndefined(ctx.locals.job.data.params.sourcedMetadata.comicvine) ) { Object.assign( payload.sourcedMetadata, @@ -147,15 +139,11 @@ export default class JobQueueService extends Service { } // 4. write to mongo - const importResult = await this.broker.call( - "library.rawImportToDB", - { - importType: - ctx.locals.job.data.params.importType, - bundleId, - payload, - } - ); + const importResult = await this.broker.call("library.rawImportToDB", { + importType: ctx.locals.job.data.params.importType, + bundleId, + payload, + }); return { data: { importResult, @@ -167,12 +155,9 @@ export default class JobQueueService extends Service { console.error( `An error occurred processing Job ID ${ctx.locals.job.id}` ); - throw new MoleculerError( - error, - 500, - "IMPORT_JOB_ERROR", - { data: ctx.params.socketSessionId } - ); + throw new MoleculerError(error, 500, "IMPORT_JOB_ERROR", { + data: ctx.params.socketSessionId, + }); } }, }, @@ -186,8 +171,8 @@ export default class JobQueueService extends Service { async "enqueue.async.completed"(ctx: Context<{ id: Number }>) { // 1. Fetch the job result using the job Id - const jobState = await this.job(ctx.params.id); - // 2. Incremement the completed job counter + const job = await this.job(ctx.params.id); + // 2. Increment the completed job counter await pubClient.incr("completedJobCount"); // 3. Fetch the completed job count for the final payload to be sent to the client const completedJobCount = await pubClient.get("completedJobCount"); @@ -195,19 +180,26 @@ export default class JobQueueService extends Service { await this.broker.call("socket.broadcast", { namespace: "/", //optional event: "action", - args: [{ type: "LS_COVER_EXTRACTED", completedJobCount, importResult: jobState.returnvalue.data.importResult }], //optional + args: [ + { + type: "LS_COVER_EXTRACTED", + completedJobCount, + importResult: job.returnvalue.data.importResult, + }, + ], }); - // 5. Persist the job results in mongo for posterity + // 5. Persist the job results in mongo for posterity await JobResult.create({ id: ctx.params.id, status: "completed", failedReason: {}, }); + // 6. Purge it from Redis + await job.remove(); console.log(`Job ID ${ctx.params.id} completed.`); }, async "enqueue.async.failed"(ctx) { - console.log("FAILED FAILED FAILED FAILED FAILED") const jobState = await this.job(ctx.params.id); await pubClient.incr("failedJobCount"); const failedJobCount = await pubClient.get("failedJobCount"); @@ -222,10 +214,26 @@ export default class JobQueueService extends Service { await this.broker.call("socket.broadcast", { namespace: "/", //optional event: "action", - args: [{ type: "LS_COVER_EXTRACTION_FAILED", failedJobCount, importResult: jobState }], //optional + args: [ + { + type: "LS_COVER_EXTRACTION_FAILED", + failedJobCount, + importResult: jobState, + }, + ], //optional + }); + }, + async "enqueue.async.drained"(ctx) { + console.log(`Queue drained, all jobs processed.`); + await this.broker.call("socket.broadcast", { + namespace: "/", + event: "action", + args: [ + { + type: "LS_IMPORT_QUEUE_DRAINED", + }, + ], }); - - }, }, }); diff --git a/services/socket.service.ts b/services/socket.service.ts index 5403e47..94e4325 100644 --- a/services/socket.service.ts +++ b/services/socket.service.ts @@ -31,16 +31,17 @@ export default class SocketService extends Service { action: async (data) => { switch (data.type) { case "RESUME_SESSION": - console.log( - "Attempting to resume session..." - ); + console.log("Attempting to resume session..."); try { - const sessionRecord = - await Session.find({ - sessionId: - data.session.sessionId, - }); - if (sessionRecord.length !== 0 && sessionRecord[0].sessionId === data.session.sessionId) { + const sessionRecord = await Session.find({ + sessionId: data.session.sessionId, + }); + // check for sessionId's existence + if ( + sessionRecord.length !== 0 && + sessionRecord[0].sessionId === + data.session.sessionId + ) { this.io.emit("yelaveda", { hagindari: "bhagindari", }); @@ -50,16 +51,16 @@ export default class SocketService extends Service { err, 500, "SESSION_ID_NOT_FOUND", - { data: data.session.sessionId } + { + data: data.session.sessionId, + } ); } break; case "LS_IMPORT": - console.log( - `Recieved ${data.type} event.` - ); + console.log(`Recieved ${data.type} event.`); // 1. Send task to queue await this.broker.call( "library.newImport", @@ -73,15 +74,13 @@ export default class SocketService extends Service { case "LS_TOGGLE_IMPORT_QUEUE": await this.broker.call( - "importqueue.toggleImportQueue", + "jobqueue.toggle", data.data, {} ); break; case "LS_SINGLE_IMPORT": - console.info( - "AirDC++ finished a download -> " - ); + console.info("AirDC++ finished a download -> "); console.log(data); await this.broker.call( "library.importDownloadedComic",