🐂 Added some job counters
This commit is contained in:
@@ -1,10 +1,12 @@
|
|||||||
import { Context, Service, ServiceBroker, ServiceSchema } from "moleculer";
|
import { Context, Service, ServiceBroker, ServiceSchema } from "moleculer";
|
||||||
const { MoleculerError } = require("moleculer").Errors;
|
|
||||||
import JobResult from "../models/jobresult.model";
|
import JobResult from "../models/jobresult.model";
|
||||||
import { refineQuery } from "filename-parser";
|
import { refineQuery } from "filename-parser";
|
||||||
import BullMqMixin, { BullMQAdapter, Queue } from "moleculer-bullmq";
|
import BullMqMixin, { BullMQAdapter, Queue } from "moleculer-bullmq";
|
||||||
import { extractFromArchive } from "../utils/uncompression.utils";
|
import { extractFromArchive } from "../utils/uncompression.utils";
|
||||||
import { isNil, isUndefined } from "lodash";
|
import { isNil, isUndefined } from "lodash";
|
||||||
|
import { pubClient } from "../config/redis.config";
|
||||||
|
|
||||||
|
const { MoleculerError } = require("moleculer").Errors;
|
||||||
|
|
||||||
console.log(process.env.REDIS_URI);
|
console.log(process.env.REDIS_URI);
|
||||||
export default class JobQueueService extends Service {
|
export default class JobQueueService extends Service {
|
||||||
@@ -183,8 +185,19 @@ export default class JobQueueService extends Service {
|
|||||||
},
|
},
|
||||||
|
|
||||||
async "enqueue.async.completed"(ctx: Context<{ id: Number }>) {
|
async "enqueue.async.completed"(ctx: Context<{ id: Number }>) {
|
||||||
|
// 1. Fetch the job result using the job Id
|
||||||
const jobState = await this.job(ctx.params.id);
|
const jobState = await this.job(ctx.params.id);
|
||||||
|
// 2. Incremement the completed job counter
|
||||||
|
await pubClient.incr("completedJobCount");
|
||||||
|
// 3. Fetch the completed job count for the final payload to be sent to the client
|
||||||
|
const completedJobCount = await pubClient.get("completedJobCount");
|
||||||
|
// 4. Emit the LS_COVER_EXTRACTED event with the necessary details
|
||||||
|
await this.broker.call("socket.broadcast", {
|
||||||
|
namespace: "/", //optional
|
||||||
|
event: "action",
|
||||||
|
args: [{ type: "LS_COVER_EXTRACTED", completedJobCount, importResult: jobState.returnvalue.data.importResult }], //optional
|
||||||
|
});
|
||||||
|
// 5. Persist the job results in mongo for posterity
|
||||||
await JobResult.create({
|
await JobResult.create({
|
||||||
id: ctx.params.id,
|
id: ctx.params.id,
|
||||||
status: "completed",
|
status: "completed",
|
||||||
@@ -194,12 +207,25 @@ export default class JobQueueService extends Service {
|
|||||||
},
|
},
|
||||||
|
|
||||||
async "enqueue.async.failed"(ctx) {
|
async "enqueue.async.failed"(ctx) {
|
||||||
|
console.log("FAILED FAILED FAILED FAILED FAILED")
|
||||||
const jobState = await this.job(ctx.params.id);
|
const jobState = await this.job(ctx.params.id);
|
||||||
|
await pubClient.incr("failedJobCount");
|
||||||
|
const failedJobCount = await pubClient.get("failedJobCount");
|
||||||
|
|
||||||
await JobResult.create({
|
await JobResult.create({
|
||||||
id: ctx.params.id,
|
id: ctx.params.id,
|
||||||
status: "failed",
|
status: "failed",
|
||||||
failedReason: jobState.failedReason,
|
failedReason: jobState.failedReason,
|
||||||
});
|
});
|
||||||
|
|
||||||
|
// 4. Emit the LS_COVER_EXTRACTION_FAILED event with the necessary details
|
||||||
|
await this.broker.call("socket.broadcast", {
|
||||||
|
namespace: "/", //optional
|
||||||
|
event: "action",
|
||||||
|
args: [{ type: "LS_COVER_EXTRACTION_FAILED", failedJobCount, importResult: jobState }], //optional
|
||||||
|
});
|
||||||
|
|
||||||
|
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
});
|
});
|
||||||
|
|||||||
@@ -51,6 +51,7 @@ import {
|
|||||||
IExtractionOptions,
|
IExtractionOptions,
|
||||||
} from "threetwo-ui-typings";
|
} from "threetwo-ui-typings";
|
||||||
const ObjectId = require("mongoose").Types.ObjectId;
|
const ObjectId = require("mongoose").Types.ObjectId;
|
||||||
|
import { pubClient } from "../config/redis.config";
|
||||||
import fsExtra from "fs-extra";
|
import fsExtra from "fs-extra";
|
||||||
const through2 = require("through2");
|
const through2 = require("through2");
|
||||||
import klaw from "klaw";
|
import klaw from "klaw";
|
||||||
@@ -197,6 +198,9 @@ export default class ImportService extends Service {
|
|||||||
// importType: "new",
|
// importType: "new",
|
||||||
// }
|
// }
|
||||||
// );
|
// );
|
||||||
|
// reset the job counters in Redis
|
||||||
|
await pubClient.set("completedJobCount", 0);
|
||||||
|
await pubClient.set("failedJobCount", 0);
|
||||||
this.broker.call('jobqueue.enqueue', {
|
this.broker.call('jobqueue.enqueue', {
|
||||||
fileObject: {
|
fileObject: {
|
||||||
filePath: item.path,
|
filePath: item.path,
|
||||||
|
|||||||
@@ -4,6 +4,7 @@ import { createClient } from "redis";
|
|||||||
import { createAdapter } from "@socket.io/redis-adapter";
|
import { createAdapter } from "@socket.io/redis-adapter";
|
||||||
import Session from "../models/session.model";
|
import Session from "../models/session.model";
|
||||||
import { pubClient, subClient } from "../config/redis.config";
|
import { pubClient, subClient } from "../config/redis.config";
|
||||||
|
const { MoleculerError } = require("moleculer").Errors;
|
||||||
const SocketIOService = require("moleculer-io");
|
const SocketIOService = require("moleculer-io");
|
||||||
const { v4: uuidv4 } = require("uuid");
|
const { v4: uuidv4 } = require("uuid");
|
||||||
|
|
||||||
@@ -33,14 +34,25 @@ export default class SocketService extends Service {
|
|||||||
console.log(
|
console.log(
|
||||||
"Attempting to resume session..."
|
"Attempting to resume session..."
|
||||||
);
|
);
|
||||||
const sessionRecord =
|
try {
|
||||||
await Session.find({
|
const sessionRecord =
|
||||||
sessionId:
|
await Session.find({
|
||||||
data.session.sessionId,
|
sessionId:
|
||||||
});
|
data.session.sessionId,
|
||||||
this.io.emit("yelaveda", {
|
});
|
||||||
hagindari: "bhagindari",
|
if (sessionRecord.length !== 0 && sessionRecord[0].sessionId === data.session.sessionId) {
|
||||||
});
|
this.io.emit("yelaveda", {
|
||||||
|
hagindari: "bhagindari",
|
||||||
|
});
|
||||||
|
}
|
||||||
|
} catch (err) {
|
||||||
|
throw new MoleculerError(
|
||||||
|
err,
|
||||||
|
500,
|
||||||
|
"SESSION_ID_NOT_FOUND",
|
||||||
|
{ data: data.session.sessionId }
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
break;
|
break;
|
||||||
|
|
||||||
|
|||||||
Reference in New Issue
Block a user