🏗 Refactoring the import process WIP
This commit is contained in:
@@ -1,7 +1,6 @@
|
||||
import { Service, ServiceBroker, Context } from "moleculer";
|
||||
import ApiGateway from "moleculer-web";
|
||||
import chokidar from "chokidar";
|
||||
import { logger } from "../utils/logger.utils";
|
||||
import path from "path";
|
||||
import fs from "fs";
|
||||
import { IExtractionOptions, IFolderData } from "threetwo-ui-typings";
|
||||
@@ -107,24 +106,24 @@ export default class ApiService extends Service {
|
||||
});
|
||||
// Add a connect listener
|
||||
this.io.on("connection", (client) => {
|
||||
this.logger.info("Client connected via websocket!");
|
||||
console.log("Client connected via websocket!");
|
||||
|
||||
client.on("action", async (action) => {
|
||||
switch (action.type) {
|
||||
case "LS_IMPORT":
|
||||
// 1. Send task to queue
|
||||
const result = await this.broker.call(
|
||||
"libraryqueue.enqueue",
|
||||
"import.newImport",
|
||||
action.data,
|
||||
{}
|
||||
);
|
||||
|
||||
client.emit("LS_COVER_EXTRACTED", result);
|
||||
break;
|
||||
}
|
||||
});
|
||||
// Add a disconnect listener
|
||||
client.on("disconnect", () => {
|
||||
this.logger.info("Client disconnected");
|
||||
console.log("Client disconnected");
|
||||
});
|
||||
});
|
||||
|
||||
@@ -149,7 +148,8 @@ export default class ApiService extends Service {
|
||||
stat.mtime.getTime() ===
|
||||
previousPath.mtime.getTime()
|
||||
) {
|
||||
logger.info("File detected, starting import...");
|
||||
console.log("File detected, starting import...");
|
||||
// this walking business needs to go, SACURATAYYY, SACURATAYYY!! This dude needs to go.
|
||||
const walkedFolders: IFolderData =
|
||||
await broker.call("import.walkFolders", {
|
||||
basePathToWalk: path,
|
||||
@@ -175,21 +175,21 @@ export default class ApiService extends Service {
|
||||
};
|
||||
fileWatcher
|
||||
.on("add", async (path, stats) => {
|
||||
logger.info("Watcher detected new files.");
|
||||
logger.info(
|
||||
console.log("Watcher detected new files.");
|
||||
console.log(
|
||||
`File ${path} has been added with stats: ${JSON.stringify(
|
||||
stats
|
||||
)}`
|
||||
);
|
||||
|
||||
logger.info("File copy started...");
|
||||
console.log("File copy started...");
|
||||
fs.stat(path, function (err, stat) {
|
||||
if (err) {
|
||||
logger.error(
|
||||
console.log(
|
||||
"Error watching file for copy completion. ERR: " +
|
||||
err.message
|
||||
);
|
||||
logger.error(
|
||||
console.log(
|
||||
"Error file not processed. PATH: " + path
|
||||
);
|
||||
throw err;
|
||||
@@ -203,15 +203,15 @@ export default class ApiService extends Service {
|
||||
});
|
||||
})
|
||||
.on("change", (path, stats) =>
|
||||
logger.info(
|
||||
console.log(
|
||||
`File ${path} has been changed. Stats: ${stats}`
|
||||
)
|
||||
)
|
||||
.on("unlink", (path) =>
|
||||
logger.info(`File ${path} has been removed`)
|
||||
console.log(`File ${path} has been removed`)
|
||||
)
|
||||
.on("addDir", (path) =>
|
||||
logger.info(`Directory ${path} has been added`)
|
||||
console.log(`Directory ${path} has been added`)
|
||||
);
|
||||
},
|
||||
});
|
||||
|
||||
@@ -9,10 +9,9 @@ import {
|
||||
} from "moleculer";
|
||||
import { DbMixin } from "../mixins/db.mixin";
|
||||
import Comic from "../models/comic.model";
|
||||
import { walkFolder } from "../utils/file.utils";
|
||||
import { explodePath, walkFolder } from "../utils/file.utils";
|
||||
import { convertXMLToJSON } from "../utils/xml.utils";
|
||||
import https from "https";
|
||||
import { logger } from "../utils/logger.utils";
|
||||
import {
|
||||
IExtractComicBookCoverErrorResponse,
|
||||
IExtractedComicBookCoverFile,
|
||||
@@ -20,12 +19,14 @@ import {
|
||||
} from "threetwo-ui-typings";
|
||||
import {
|
||||
extractCoverFromFile,
|
||||
extractCoverFromFile2,
|
||||
unrarArchive,
|
||||
} from "../utils/uncompression.utils";
|
||||
import { scrapeIssuesFromDOM } from "../utils/scraping.utils";
|
||||
const ObjectId = require("mongoose").Types.ObjectId;
|
||||
import mongoose from "mongoose";
|
||||
import fsExtra from "fs-extra";
|
||||
const through2 = require("through2");
|
||||
import klaw from "klaw";
|
||||
import path from "path";
|
||||
|
||||
export default class ImportService extends Service {
|
||||
@@ -72,6 +73,84 @@ export default class ImportService extends Service {
|
||||
return convertXMLToJSON("lagos");
|
||||
},
|
||||
},
|
||||
newImport: {
|
||||
rest: "POST /newImport",
|
||||
params: {},
|
||||
async handler(
|
||||
ctx: Context<{
|
||||
extractionOptions?: any;
|
||||
}>
|
||||
) {
|
||||
// 1. Walk the Source folder
|
||||
klaw(path.resolve(process.env.COMICS_DIRECTORY))
|
||||
// 1.1 Filter on .cb* extensions
|
||||
.pipe(
|
||||
through2.obj(function (
|
||||
item,
|
||||
enc,
|
||||
next
|
||||
) {
|
||||
let fileExtension = path.extname(
|
||||
item.path
|
||||
);
|
||||
if (
|
||||
[
|
||||
".cbz",
|
||||
".cbr",
|
||||
".cb7",
|
||||
].includes(fileExtension)
|
||||
) {
|
||||
this.push(item);
|
||||
}
|
||||
|
||||
next();
|
||||
})
|
||||
)
|
||||
// 1.2 Pipe filtered results to the next step
|
||||
.on("data", async (item) => {
|
||||
console.info(
|
||||
"Found a file at path: %s",
|
||||
item.path
|
||||
);
|
||||
let comicExists = await Comic.exists({
|
||||
"rawFileDetails.name": `${path.basename(
|
||||
item.path,
|
||||
path.extname(item.path)
|
||||
)}`,
|
||||
});
|
||||
if (!comicExists) {
|
||||
// 2. Send the extraction job to the queue
|
||||
await broker.call(
|
||||
"libraryqueue.enqueue",
|
||||
{
|
||||
fileObject: {
|
||||
filePath: item.path,
|
||||
size: item.stats.size,
|
||||
},
|
||||
}
|
||||
);
|
||||
} else {
|
||||
console.log(
|
||||
"Comic already exists in the library."
|
||||
);
|
||||
}
|
||||
})
|
||||
.on("end", () => {
|
||||
console.log("Import process complete.");
|
||||
});
|
||||
},
|
||||
},
|
||||
nicefyPath: {
|
||||
rest: "POST /nicefyPath",
|
||||
params: {},
|
||||
async handler(
|
||||
ctx: Context<{
|
||||
filePath: string;
|
||||
}>
|
||||
) {
|
||||
return explodePath(ctx.params.filePath);
|
||||
},
|
||||
},
|
||||
processAndImportToDB: {
|
||||
rest: "POST /processAndImportToDB",
|
||||
|
||||
@@ -96,6 +175,11 @@ export default class ImportService extends Service {
|
||||
let comicExists = await Comic.exists({
|
||||
"rawFileDetails.name": `${walkedFolders.name}`,
|
||||
});
|
||||
// rough flow of import process
|
||||
// 1. Walk folder
|
||||
// 2. For each folder, call extract function
|
||||
// 3. For each successful extraction, run dbImport
|
||||
|
||||
if (!comicExists) {
|
||||
// 1. Extract cover and cover metadata
|
||||
let comicBookCoverMetadata:
|
||||
@@ -132,12 +216,12 @@ export default class ImportService extends Service {
|
||||
dbImportResult,
|
||||
};
|
||||
} else {
|
||||
logger.info(
|
||||
console.info(
|
||||
`Comic: \"${walkedFolders.name}\" already exists in the database`
|
||||
);
|
||||
}
|
||||
} catch (error) {
|
||||
logger.error(
|
||||
console.error(
|
||||
"Error importing comic books",
|
||||
error
|
||||
);
|
||||
@@ -233,7 +317,7 @@ export default class ImportService extends Service {
|
||||
{ new: true },
|
||||
(err, result) => {
|
||||
if (err) {
|
||||
console.log(err);
|
||||
console.info(err);
|
||||
reject(err);
|
||||
} else {
|
||||
// 3. Fetch and append volume information
|
||||
@@ -364,17 +448,17 @@ export default class ImportService extends Service {
|
||||
rest: "POST /flushDB",
|
||||
params: {},
|
||||
async handler(ctx: Context<{}>) {
|
||||
return await mongoose.connection.db
|
||||
.dropCollection("comics")
|
||||
return await Comic.collection
|
||||
.drop()
|
||||
.then((data) => {
|
||||
logger.info(data);
|
||||
console.info(data);
|
||||
const foo = fsExtra.emptyDirSync(
|
||||
path.resolve("./userdata/covers")
|
||||
);
|
||||
const foo2 = fsExtra.emptyDirSync(
|
||||
path.resolve("./userdata/expanded")
|
||||
);
|
||||
return { foo, foo2 };
|
||||
return { data, foo, foo2 };
|
||||
})
|
||||
.catch((error) => error);
|
||||
},
|
||||
@@ -422,7 +506,7 @@ export default class ImportService extends Service {
|
||||
});
|
||||
|
||||
resp.on("end", () => {
|
||||
console.log(
|
||||
console.info(
|
||||
data,
|
||||
"HERE, BITCHES< HERE"
|
||||
);
|
||||
@@ -435,7 +519,7 @@ export default class ImportService extends Service {
|
||||
}
|
||||
)
|
||||
.on("error", (err) => {
|
||||
console.log("Error: " + err.message);
|
||||
console.info("Error: " + err.message);
|
||||
reject(err);
|
||||
});
|
||||
}),
|
||||
|
||||
@@ -7,7 +7,11 @@ import {
|
||||
Errors,
|
||||
} from "moleculer";
|
||||
import BullMQMixin from "moleculer-bull";
|
||||
const REDIS_URI = process.env.REDIS_URI || `redis://0.0.0.0:6379`;
|
||||
import { SandboxedJob } from "moleculer-bull";
|
||||
import { DbMixin } from "../mixins/db.mixin";
|
||||
import Comic from "../models/comic.model";
|
||||
import { extractCoverFromFile2 } from "../utils/uncompression.utils";
|
||||
const REDIS_URI = process.env.REDIS_URI || `redis://0.0.0.0:6379`;
|
||||
|
||||
export default class LibraryQueueService extends Service {
|
||||
public constructor(
|
||||
@@ -15,22 +19,45 @@ export default class LibraryQueueService extends Service {
|
||||
schema: ServiceSchema<{}> = { name: "libraryqueue" }
|
||||
) {
|
||||
super(broker);
|
||||
console.log(this.io);
|
||||
this.parseServiceSchema(
|
||||
Service.mergeSchemas(
|
||||
{
|
||||
name: "libraryqueue",
|
||||
mixins: [BullMQMixin(REDIS_URI)],
|
||||
mixins: [BullMQMixin(REDIS_URI), DbMixin("comics", Comic)],
|
||||
settings: {},
|
||||
hooks: {},
|
||||
queues: {
|
||||
"process.import": {
|
||||
async process(job) {
|
||||
this.logger.info("New job received!", job.data);
|
||||
this.logger.info(`Processing queue...`);
|
||||
const result = await this.broker.call('import.processAndImportToDB', job.data);
|
||||
|
||||
async process(job: SandboxedJob) {
|
||||
console.info("New job received!", job.data);
|
||||
console.info(`Processing queue...`);
|
||||
// extract the cover
|
||||
const result = await extractCoverFromFile2(
|
||||
job.data.fileObject
|
||||
);
|
||||
|
||||
// write to mongo
|
||||
const dbImportResult = await this.broker.call(
|
||||
"import.rawImportToDB",
|
||||
{
|
||||
importStatus: {
|
||||
isImported: true,
|
||||
tagged: false,
|
||||
matchedResult: {
|
||||
score: "0",
|
||||
},
|
||||
},
|
||||
rawFileDetails: result,
|
||||
sourcedMetadata: {
|
||||
comicvine: {},
|
||||
},
|
||||
},
|
||||
{}
|
||||
);
|
||||
|
||||
return Promise.resolve({
|
||||
result,
|
||||
dbImportResult,
|
||||
id: job.id,
|
||||
worker: process.pid,
|
||||
});
|
||||
@@ -41,40 +68,42 @@ export default class LibraryQueueService extends Service {
|
||||
enqueue: {
|
||||
rest: "POST /enqueue",
|
||||
params: {},
|
||||
async handler(ctx: Context<{ extractionOptions: object, walkedFolders: object}>) {
|
||||
async handler(
|
||||
ctx: Context<{
|
||||
fileObject: object;
|
||||
}>
|
||||
) {
|
||||
return await this.createJob("process.import", {
|
||||
extractionOptions: ctx.params.extractionOptions,
|
||||
walkedFolders: ctx.params.walkedFolders,
|
||||
fileObject: ctx.params.fileObject,
|
||||
});
|
||||
|
||||
|
||||
},
|
||||
},
|
||||
},
|
||||
methods: {},
|
||||
async started(): Promise<any> {
|
||||
const failed = await this.getQueue(
|
||||
"process.import"
|
||||
).on("failed", async (job, error) => {
|
||||
this.logger.error(
|
||||
`An error occured in 'mail.send' queue on job id '${job.id}': ${error.message}`
|
||||
);
|
||||
});
|
||||
const completed = await this.getQueue(
|
||||
"process.import"
|
||||
).on("completed", async (job, res) => {
|
||||
this.logger.info(
|
||||
`Job with the id '${job.id}' completed.`
|
||||
);
|
||||
});
|
||||
const stalled = await this.getQueue(
|
||||
"process.import"
|
||||
).on("stalled", async (job) => {
|
||||
this.logger.warn(
|
||||
`The job with the id '${job} got stalled!`
|
||||
);
|
||||
});
|
||||
},
|
||||
const failed = await this.getQueue("process.import").on(
|
||||
"failed",
|
||||
async (job, error) => {
|
||||
console.error(
|
||||
`An error occured in 'process.import' queue on job id '${job.id}': ${error.message}`
|
||||
);
|
||||
}
|
||||
);
|
||||
const completed = await this.getQueue(
|
||||
"process.import"
|
||||
).on("completed", async (job, res) => {
|
||||
console.info(
|
||||
`Job with the id '${job.id}' completed.`
|
||||
);
|
||||
});
|
||||
const stalled = await this.getQueue(
|
||||
"process.import"
|
||||
).on("stalled", async (job) => {
|
||||
console.warn(
|
||||
`The job with the id '${job} got stalled!`
|
||||
);
|
||||
});
|
||||
},
|
||||
},
|
||||
schema
|
||||
)
|
||||
|
||||
Reference in New Issue
Block a user