🧲 Created a dedicated queue for torrent ops

This commit is contained in:
2024-03-29 19:36:16 -04:00
parent f053dcb789
commit b35e2140b5
3 changed files with 105 additions and 66 deletions

View File

@@ -55,13 +55,13 @@ export default class JobQueueService extends Service {
queue: true,
rest: "GET /enqueue",
handler: async (
ctx: Context<{ queueName: string; description: string }>
ctx: Context<{ action: string; description: string }>
) => {
const { queueName, description } = ctx.params;
const { action, description } = ctx.params;
// Enqueue the job
const job = await this.localQueue(
ctx,
queueName,
action,
ctx.params,
{
priority: 10,
@@ -73,69 +73,7 @@ export default class JobQueueService extends Service {
return job.id;
},
},
getTorrentData: {
queue: true,
rest: "GET /getTorrentData",
handler: async (ctx: Context<{ trigger: string }>) => {
const { trigger } = ctx.params;
console.log(`Recieved ${trigger} as the trigger...`);
const jobOptions = {
jobId: "retrieveTorrentData",
name: "bossy",
repeat: {
every: 10000, // Repeat every 10000 ms
limit: 100, // Limit to 100 repeats
},
};
const job = await this.localQueue(
ctx,
"fetchTorrentDataJob",
"bird",
jobOptions
);
return job;
},
},
fetchTorrentDataJob: {
rest: "GET /fetchTorrentDataJob",
handler: async (
ctx: Context<{
birdName: String;
}>
) => {
const repeatableJob = await this.$resolve(
"jobqueue"
).getRepeatableJobs();
console.info(repeatableJob);
console.info(
`Scheduled job for fetching torrent data fired.`
);
// 1. query mongo for infohashes
const infoHashes = await this.broker.call(
"library.getInfoHashes",
{}
);
// 2. query qbittorrent to see if they exist
const torrents: any = await this.broker.call(
"qbittorrent.getTorrentRealTimeStats",
{ infoHashes }
);
// 4. Emit the LS_COVER_EXTRACTION_FAILED event with the necessary details
await this.broker.call("socket.broadcast", {
namespace: "/",
event: "AS_TORRENT_DATA",
args: [
{
torrents,
},
],
});
// 3. If they do, don't do anything
// 4. If they don't purge them from mongo
},
},
// Comic Book Import Job Queue
"enqueue.async": {
handler: async (

View File

@@ -222,7 +222,7 @@ export default class ImportService extends Service {
},
sessionId,
importType: "new",
queueName: "enqueue.async",
action: "enqueue.async",
});
} else {
console.log(

View File

@@ -0,0 +1,101 @@
"use strict";
import axios from "axios";
import {
Context,
Service,
ServiceBroker,
ServiceSchema,
Errors,
} from "moleculer";
import { DbMixin } from "../mixins/db.mixin";
import Comic from "../models/comic.model";
const ObjectId = require("mongoose").Types.ObjectId;
import { isNil, isUndefined } from "lodash";
import BullMqMixin from "moleculer-bullmq";
const { MoleculerError } = require("moleculer").Errors;
export default class ImageTransformation extends Service {
// @ts-ignore
public constructor(
public broker: ServiceBroker,
schema: ServiceSchema<{}> = { name: "imagetransformation" }
) {
super(broker);
this.parseServiceSchema({
name: "torrentjobs",
mixins: [DbMixin("comics", Comic), BullMqMixin],
settings: {
bullmq: {
client: process.env.REDIS_URI,
},
},
hooks: {},
actions: {
getTorrentData: {
queue: true,
rest: "GET /getTorrentData",
handler: async (ctx: Context<{ trigger: string }>) => {
const { trigger } = ctx.params;
console.log(`Recieved ${trigger} as the trigger...`);
const jobOptions = {
jobId: "retrieveTorrentData",
name: "bossy",
repeat: {
every: 10000, // Repeat every 10000 ms
limit: 100, // Limit to 100 repeats
},
};
const job = await this.localQueue(
ctx,
"fetchTorrentData",
ctx.params,
jobOptions
);
return job;
},
},
fetchTorrentData: {
rest: "GET /fetchTorrentData",
handler: async (
ctx: Context<{
birdName: String;
}>
) => {
const repeatableJob = await this.$resolve(
"torrentjobs"
).getRepeatableJobs();
console.info(repeatableJob);
console.info(
`Scheduled job for fetching torrent data fired.`
);
// 1. query mongo for infohashes
const infoHashes = await this.broker.call(
"library.getInfoHashes",
{}
);
// 2. query qbittorrent to see if they exist
const torrents: any = await this.broker.call(
"qbittorrent.getTorrentRealTimeStats",
{ infoHashes }
);
// 4. Emit the LS_COVER_EXTRACTION_FAILED event with the necessary details
await this.broker.call("socket.broadcast", {
namespace: "/",
event: "AS_TORRENT_DATA",
args: [
{
torrents,
},
],
});
// 3. If they do, don't do anything
// 4. If they don't purge them from mongo
},
},
},
methods: {},
});
}
}