🔧 Refactor for docker-compose
This commit is contained in:
@@ -11,7 +11,6 @@ import {
|
||||
} from "../utils/uncompression.utils";
|
||||
import { isNil, isUndefined } from "lodash";
|
||||
import { pubClient } from "../config/redis.config";
|
||||
import IORedis from 'ioredis';
|
||||
import path from "path";
|
||||
const { MoleculerError } = require("moleculer").Errors;
|
||||
|
||||
@@ -22,9 +21,10 @@ export default class JobQueueService extends Service {
|
||||
name: "jobqueue",
|
||||
hooks: {},
|
||||
mixins: [DbMixin("comics", Comic), BullMqMixin],
|
||||
|
||||
settings: {
|
||||
bullmq: {
|
||||
client: new IORedis(process.env.REDIS_URI, { maxRetriesPerRequest: null }),
|
||||
client: pubClient,
|
||||
},
|
||||
},
|
||||
actions: {
|
||||
@@ -57,20 +57,24 @@ export default class JobQueueService extends Service {
|
||||
handler: async (
|
||||
ctx: Context<{ action: string; description: string }>
|
||||
) => {
|
||||
const { action, description } = ctx.params;
|
||||
// Enqueue the job
|
||||
const job = await this.localQueue(
|
||||
ctx,
|
||||
action,
|
||||
ctx.params,
|
||||
{
|
||||
priority: 10,
|
||||
}
|
||||
);
|
||||
console.log(`Job ${job.id} enqueued`);
|
||||
console.log(`${description}`);
|
||||
try {
|
||||
const { action, description } = ctx.params;
|
||||
// Enqueue the job
|
||||
const job = await this.localQueue(
|
||||
ctx,
|
||||
action,
|
||||
{},
|
||||
{
|
||||
priority: 10,
|
||||
}
|
||||
);
|
||||
console.log(`Job ${job.id} enqueued`);
|
||||
console.log(`${description}`);
|
||||
|
||||
return job.id;
|
||||
return job.id;
|
||||
} catch (error) {
|
||||
console.error("Failed to enqueue job:", error);
|
||||
}
|
||||
},
|
||||
},
|
||||
|
||||
|
||||
@@ -165,78 +165,52 @@ export default class LibraryService extends Service {
|
||||
},
|
||||
newImport: {
|
||||
rest: "POST /newImport",
|
||||
// params: {},
|
||||
async handler(
|
||||
ctx: Context<{
|
||||
extractionOptions?: any;
|
||||
sessionId: string;
|
||||
}>
|
||||
) {
|
||||
async handler(ctx) {
|
||||
const { sessionId } = ctx.params;
|
||||
try {
|
||||
// Get params to be passed to the import jobs
|
||||
const { sessionId } = ctx.params;
|
||||
// 1. Walk the Source folder
|
||||
klaw(path.resolve(COMICS_DIRECTORY))
|
||||
// 1.1 Filter on .cb* extensions
|
||||
.pipe(
|
||||
through2.obj(function (item, enc, next) {
|
||||
let fileExtension = path.extname(
|
||||
item.path
|
||||
);
|
||||
if (
|
||||
[".cbz", ".cbr", ".cb7"].includes(
|
||||
fileExtension
|
||||
)
|
||||
) {
|
||||
this.push(item);
|
||||
}
|
||||
next();
|
||||
})
|
||||
)
|
||||
// 1.2 Pipe filtered results to the next step
|
||||
// Enqueue the job in the queue
|
||||
.on("data", async (item) => {
|
||||
console.info(
|
||||
"Found a file at path: %s",
|
||||
item.path
|
||||
);
|
||||
let comicExists = await Comic.exists({
|
||||
"rawFileDetails.name": `${path.basename(
|
||||
item.path,
|
||||
path.extname(item.path)
|
||||
)}`,
|
||||
});
|
||||
if (!comicExists) {
|
||||
// 2.1 Reset the job counters in Redis
|
||||
await pubClient.set(
|
||||
"completedJobCount",
|
||||
0
|
||||
);
|
||||
await pubClient.set(
|
||||
"failedJobCount",
|
||||
0
|
||||
);
|
||||
// 2.2 Send the extraction job to the queue
|
||||
this.broker.call("jobqueue.enqueue", {
|
||||
fileObject: {
|
||||
filePath: item.path,
|
||||
fileSize: item.stats.size,
|
||||
},
|
||||
sessionId,
|
||||
importType: "new",
|
||||
action: "enqueue.async",
|
||||
});
|
||||
} else {
|
||||
console.log(
|
||||
"Comic already exists in the library."
|
||||
);
|
||||
}
|
||||
})
|
||||
.on("end", () => {
|
||||
console.log("All files traversed.");
|
||||
// Initialize Redis counters once at the start of the import
|
||||
await pubClient.set("completedJobCount", 0);
|
||||
await pubClient.set("failedJobCount", 0);
|
||||
|
||||
// Convert klaw to use a promise-based approach for better flow control
|
||||
const files = await this.getComicFiles(
|
||||
COMICS_DIRECTORY
|
||||
);
|
||||
for (const file of files) {
|
||||
console.info(
|
||||
"Found a file at path:",
|
||||
file.path
|
||||
);
|
||||
const comicExists = await Comic.exists({
|
||||
"rawFileDetails.name": path.basename(
|
||||
file.path,
|
||||
path.extname(file.path)
|
||||
),
|
||||
});
|
||||
|
||||
if (!comicExists) {
|
||||
// Send the extraction job to the queue
|
||||
await this.broker.call("jobqueue.enqueue", {
|
||||
fileObject: {
|
||||
filePath: file.path,
|
||||
fileSize: file.stats.size,
|
||||
},
|
||||
sessionId,
|
||||
importType: "new",
|
||||
action: "enqueue.async",
|
||||
});
|
||||
} else {
|
||||
console.log(
|
||||
"Comic already exists in the library."
|
||||
);
|
||||
}
|
||||
}
|
||||
console.log("All files traversed.");
|
||||
} catch (error) {
|
||||
console.log(error);
|
||||
console.error(
|
||||
"Error during newImport processing:",
|
||||
error
|
||||
);
|
||||
}
|
||||
},
|
||||
},
|
||||
@@ -821,7 +795,35 @@ export default class LibraryService extends Service {
|
||||
},
|
||||
},
|
||||
},
|
||||
methods: {},
|
||||
methods: {
|
||||
// Method to walk the directory and filter comic files
|
||||
getComicFiles: (directory) => {
|
||||
return new Promise((resolve, reject) => {
|
||||
const files = [];
|
||||
klaw(directory)
|
||||
.pipe(
|
||||
through2.obj(function (item, enc, next) {
|
||||
const fileExtension = path.extname(
|
||||
item.path
|
||||
);
|
||||
if (
|
||||
[".cbz", ".cbr", ".cb7"].includes(
|
||||
fileExtension
|
||||
)
|
||||
) {
|
||||
this.push(item);
|
||||
}
|
||||
next();
|
||||
})
|
||||
)
|
||||
.on("data", (item) => {
|
||||
files.push(item);
|
||||
})
|
||||
.on("end", () => resolve(files))
|
||||
.on("error", (err) => reject(err));
|
||||
});
|
||||
},
|
||||
},
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
@@ -10,7 +10,6 @@ import { DbMixin } from "../mixins/db.mixin";
|
||||
import Comic from "../models/comic.model";
|
||||
import BullMqMixin from "moleculer-bullmq";
|
||||
const { MoleculerError } = require("moleculer").Errors;
|
||||
import IORedis from 'ioredis';
|
||||
|
||||
export default class ImageTransformation extends Service {
|
||||
// @ts-ignore
|
||||
@@ -24,7 +23,7 @@ export default class ImageTransformation extends Service {
|
||||
mixins: [DbMixin("comics", Comic), BullMqMixin],
|
||||
settings: {
|
||||
bullmq: {
|
||||
client: new IORedis(process.env.REDIS_URI),
|
||||
client: process.env.REDIS_URI,
|
||||
},
|
||||
},
|
||||
hooks: {},
|
||||
|
||||
Reference in New Issue
Block a user