🥭 Aggregation for jobResult
This commit is contained in:
@@ -1,7 +1,7 @@
|
|||||||
import { Context, Service, ServiceBroker, ServiceSchema } from "moleculer";
|
import { Context, Service, ServiceBroker } from "moleculer";
|
||||||
import JobResult from "../models/jobresult.model";
|
import JobResult from "../models/jobresult.model";
|
||||||
import { refineQuery } from "filename-parser";
|
import { refineQuery } from "filename-parser";
|
||||||
import BullMqMixin, { BullMQAdapter, Queue } from "moleculer-bullmq";
|
import BullMqMixin from "moleculer-bullmq";
|
||||||
import { extractFromArchive } from "../utils/uncompression.utils";
|
import { extractFromArchive } from "../utils/uncompression.utils";
|
||||||
import { isNil, isUndefined } from "lodash";
|
import { isNil, isUndefined } from "lodash";
|
||||||
import { pubClient } from "../config/redis.config";
|
import { pubClient } from "../config/redis.config";
|
||||||
@@ -12,7 +12,6 @@ console.log(process.env.REDIS_URI);
|
|||||||
export default class JobQueueService extends Service {
|
export default class JobQueueService extends Service {
|
||||||
public constructor(public broker: ServiceBroker) {
|
public constructor(public broker: ServiceBroker) {
|
||||||
super(broker);
|
super(broker);
|
||||||
|
|
||||||
this.parseServiceSchema({
|
this.parseServiceSchema({
|
||||||
name: "jobqueue",
|
name: "jobqueue",
|
||||||
hooks: {},
|
hooks: {},
|
||||||
@@ -168,6 +167,23 @@ export default class JobQueueService extends Service {
|
|||||||
}
|
}
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
|
getJobResultStatistics: {
|
||||||
|
rest: "GET /getJobResultStatistics",
|
||||||
|
handler: async (ctx: Context<{}>) => {
|
||||||
|
const result = await JobResult.aggregate([
|
||||||
|
{
|
||||||
|
$group: {
|
||||||
|
_id: "$_id",
|
||||||
|
jobResults: { $addToSet: "status" }
|
||||||
|
},
|
||||||
|
|
||||||
|
},
|
||||||
|
{ $sort: { timestamp: -1 } },
|
||||||
|
{ $skip: 0 },
|
||||||
|
{ $limit: 5 },
|
||||||
|
])
|
||||||
|
}
|
||||||
|
},
|
||||||
},
|
},
|
||||||
|
|
||||||
events: {
|
events: {
|
||||||
@@ -188,14 +204,13 @@ export default class JobQueueService extends Service {
|
|||||||
async "enqueue.async.completed"(ctx: Context<{ id: Number }>) {
|
async "enqueue.async.completed"(ctx: Context<{ id: Number }>) {
|
||||||
// 1. Fetch the job result using the job Id
|
// 1. Fetch the job result using the job Id
|
||||||
const job = await this.job(ctx.params.id);
|
const job = await this.job(ctx.params.id);
|
||||||
console.log(job);
|
|
||||||
// 2. Increment the completed job counter
|
// 2. Increment the completed job counter
|
||||||
await pubClient.incr("completedJobCount");
|
await pubClient.incr("completedJobCount");
|
||||||
// 3. Fetch the completed job count for the final payload to be sent to the client
|
// 3. Fetch the completed job count for the final payload to be sent to the client
|
||||||
const completedJobCount = await pubClient.get("completedJobCount");
|
const completedJobCount = await pubClient.get("completedJobCount");
|
||||||
// 4. Emit the LS_COVER_EXTRACTED event with the necessary details
|
// 4. Emit the LS_COVER_EXTRACTED event with the necessary details
|
||||||
await this.broker.call("socket.broadcast", {
|
await this.broker.call("socket.broadcast", {
|
||||||
namespace: "/", //optional
|
namespace: "/",
|
||||||
event: "action",
|
event: "action",
|
||||||
args: [
|
args: [
|
||||||
{
|
{
|
||||||
@@ -226,6 +241,7 @@ export default class JobQueueService extends Service {
|
|||||||
id: ctx.params.id,
|
id: ctx.params.id,
|
||||||
status: "failed",
|
status: "failed",
|
||||||
failedReason: job.failedReason,
|
failedReason: job.failedReason,
|
||||||
|
timestamp: job.timestamp,
|
||||||
});
|
});
|
||||||
|
|
||||||
// 4. Emit the LS_COVER_EXTRACTION_FAILED event with the necessary details
|
// 4. Emit the LS_COVER_EXTRACTION_FAILED event with the necessary details
|
||||||
|
|||||||
@@ -175,6 +175,7 @@ export default class ImportService extends Service {
|
|||||||
})
|
})
|
||||||
)
|
)
|
||||||
// 1.2 Pipe filtered results to the next step
|
// 1.2 Pipe filtered results to the next step
|
||||||
|
// Enqueue the job in the queue
|
||||||
.on("data", async (item) => {
|
.on("data", async (item) => {
|
||||||
console.info(
|
console.info(
|
||||||
"Found a file at path: %s",
|
"Found a file at path: %s",
|
||||||
@@ -187,20 +188,10 @@ export default class ImportService extends Service {
|
|||||||
)}`,
|
)}`,
|
||||||
});
|
});
|
||||||
if (!comicExists) {
|
if (!comicExists) {
|
||||||
// 2. Send the extraction job to the queue
|
// 2.1 Reset the job counters in Redis
|
||||||
// await broker.call(
|
|
||||||
// "importqueue.processImport",
|
|
||||||
// {
|
|
||||||
// fileObject: {
|
|
||||||
// filePath: item.path,
|
|
||||||
// fileSize: item.stats.size,
|
|
||||||
// },
|
|
||||||
// importType: "new",
|
|
||||||
// }
|
|
||||||
// );
|
|
||||||
// reset the job counters in Redis
|
|
||||||
await pubClient.set("completedJobCount", 0);
|
await pubClient.set("completedJobCount", 0);
|
||||||
await pubClient.set("failedJobCount", 0);
|
await pubClient.set("failedJobCount", 0);
|
||||||
|
// 2.2 Send the extraction job to the queue
|
||||||
this.broker.call('jobqueue.enqueue', {
|
this.broker.call('jobqueue.enqueue', {
|
||||||
fileObject: {
|
fileObject: {
|
||||||
filePath: item.path,
|
filePath: item.path,
|
||||||
@@ -222,7 +213,6 @@ export default class ImportService extends Service {
|
|||||||
}
|
}
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
|
|
||||||
rawImportToDB: {
|
rawImportToDB: {
|
||||||
rest: "POST /rawImportToDB",
|
rest: "POST /rawImportToDB",
|
||||||
params: {},
|
params: {},
|
||||||
|
|||||||
Reference in New Issue
Block a user