Revert "Merge branch 'master' into getbundles-fix"

This reverts commit 30168844f3, reversing
changes made to 2e60e2e3d5.
This commit is contained in:
2024-10-24 10:59:09 -04:00
parent c9ecbb911a
commit 1d48499c64
14 changed files with 168 additions and 546 deletions

View File

@@ -14,6 +14,7 @@ import { pubClient } from "../config/redis.config";
import path from "path";
const { MoleculerError } = require("moleculer").Errors;
console.log(process.env.REDIS_URI);
export default class JobQueueService extends Service {
public constructor(public broker: ServiceBroker) {
super(broker);
@@ -21,10 +22,9 @@ export default class JobQueueService extends Service {
name: "jobqueue",
hooks: {},
mixins: [DbMixin("comics", Comic), BullMqMixin],
settings: {
bullmq: {
client: pubClient,
client: process.env.REDIS_URI,
},
},
actions: {
@@ -57,24 +57,20 @@ export default class JobQueueService extends Service {
handler: async (
ctx: Context<{ action: string; description: string }>
) => {
try {
const { action, description } = ctx.params;
// Enqueue the job
const job = await this.localQueue(
ctx,
action,
{},
{
priority: 10,
}
);
console.log(`Job ${job.id} enqueued`);
console.log(`${description}`);
const { action, description } = ctx.params;
// Enqueue the job
const job = await this.localQueue(
ctx,
action,
ctx.params,
{
priority: 10,
}
);
console.log(`Job ${job.id} enqueued`);
console.log(`${description}`);
return job.id;
} catch (error) {
console.error("Failed to enqueue job:", error);
}
return job.id;
},
},

View File

@@ -59,11 +59,9 @@ import path from "path";
import { COMICS_DIRECTORY, USERDATA_DIRECTORY } from "../constants/directories";
import AirDCPPSocket from "../shared/airdcpp.socket";
export default class LibraryService extends Service {
public constructor(
public broker: ServiceBroker,
schema: ServiceSchema<{}> = { name: "library" }
) {
console.log(`MONGO -> ${process.env.MONGO_URI}`);
export default class ImportService extends Service {
public constructor(public broker: ServiceBroker) {
super(broker);
this.parseServiceSchema({
name: "library",
@@ -166,52 +164,78 @@ export default class LibraryService extends Service {
},
newImport: {
rest: "POST /newImport",
async handler(ctx) {
const { sessionId } = ctx.params;
// params: {},
async handler(
ctx: Context<{
extractionOptions?: any;
sessionId: string;
}>
) {
try {
// Initialize Redis counters once at the start of the import
await pubClient.set("completedJobCount", 0);
await pubClient.set("failedJobCount", 0);
// Convert klaw to use a promise-based approach for better flow control
const files = await this.getComicFiles(
COMICS_DIRECTORY
);
for (const file of files) {
console.info(
"Found a file at path:",
file.path
);
const comicExists = await Comic.exists({
"rawFileDetails.name": path.basename(
file.path,
path.extname(file.path)
),
});
if (!comicExists) {
// Send the extraction job to the queue
await this.broker.call("jobqueue.enqueue", {
fileObject: {
filePath: file.path,
fileSize: file.stats.size,
},
sessionId,
importType: "new",
action: "enqueue.async",
});
} else {
console.log(
"Comic already exists in the library."
// Get params to be passed to the import jobs
const { sessionId } = ctx.params;
// 1. Walk the Source folder
klaw(path.resolve(COMICS_DIRECTORY))
// 1.1 Filter on .cb* extensions
.pipe(
through2.obj(function (item, enc, next) {
let fileExtension = path.extname(
item.path
);
if (
[".cbz", ".cbr", ".cb7"].includes(
fileExtension
)
) {
this.push(item);
}
next();
})
)
// 1.2 Pipe filtered results to the next step
// Enqueue the job in the queue
.on("data", async (item) => {
console.info(
"Found a file at path: %s",
item.path
);
}
}
console.log("All files traversed.");
let comicExists = await Comic.exists({
"rawFileDetails.name": `${path.basename(
item.path,
path.extname(item.path)
)}`,
});
if (!comicExists) {
// 2.1 Reset the job counters in Redis
await pubClient.set(
"completedJobCount",
0
);
await pubClient.set(
"failedJobCount",
0
);
// 2.2 Send the extraction job to the queue
this.broker.call("jobqueue.enqueue", {
fileObject: {
filePath: item.path,
fileSize: item.stats.size,
},
sessionId,
importType: "new",
action: "enqueue.async",
});
} else {
console.log(
"Comic already exists in the library."
);
}
})
.on("end", () => {
console.log("All files traversed.");
});
} catch (error) {
console.error(
"Error during newImport processing:",
error
);
console.log(error);
}
},
},
@@ -840,35 +864,7 @@ export default class LibraryService extends Service {
},
},
},
methods: {
// Method to walk the directory and filter comic files
getComicFiles: (directory) => {
return new Promise((resolve, reject) => {
const files = [];
klaw(directory)
.pipe(
through2.obj(function (item, enc, next) {
const fileExtension = path.extname(
item.path
);
if (
[".cbz", ".cbr", ".cb7"].includes(
fileExtension
)
) {
this.push(item);
}
next();
})
)
.on("data", (item) => {
files.push(item);
})
.on("end", () => resolve(files))
.on("error", (err) => reject(err));
});
},
},
methods: {},
});
}
}

View File

@@ -1,6 +1,7 @@
"use strict";
import { Service, ServiceBroker, ServiceSchema, Context } from "moleculer";
import { JobType } from "moleculer-bullmq";
import { createClient } from "redis";
import { createAdapter } from "@socket.io/redis-adapter";
import Session from "../models/session.model";
import { pubClient, subClient } from "../config/redis.config";
@@ -273,22 +274,6 @@ export default class SocketService extends Service {
},
},
async started() {
this.logger.info("Starting Socket Service...");
this.logger.debug("pubClient:", pubClient);
this.logger.debug("subClient:", subClient);
if (!pubClient || !subClient) {
this.logger.error("Redis clients are not initialized!");
throw new Error("Redis clients are not initialized!");
}
// Additional checks or logic if necessary
if (pubClient.status !== "ready") {
await pubClient.connect();
}
if (subClient.status !== "ready") {
await subClient.connect();
}
this.io.on("connection", async (socket) => {
console.log(
`socket.io server connected to client with session ID: ${socket.id}`

View File

@@ -9,7 +9,6 @@ import {
import { DbMixin } from "../mixins/db.mixin";
import Comic from "../models/comic.model";
import BullMqMixin from "moleculer-bullmq";
import { pubClient } from "../config/redis.config";
const { MoleculerError } = require("moleculer").Errors;
export default class ImageTransformation extends Service {
@@ -24,7 +23,7 @@ export default class ImageTransformation extends Service {
mixins: [DbMixin("comics", Comic), BullMqMixin],
settings: {
bullmq: {
client: pubClient,
client: process.env.REDIS_URI,
},
},
hooks: {},