🔧 Refactoring the import queue process

This commit is contained in:
2022-04-18 15:22:34 -07:00
parent 38a80e41b3
commit 4c071f2fc7
2 changed files with 74 additions and 79 deletions

View File

@@ -34,12 +34,7 @@ SOFTWARE.
"use strict"; "use strict";
import { refineQuery } from "filename-parser"; import { refineQuery } from "filename-parser";
import { import { Context, Service, ServiceBroker, ServiceSchema } from "moleculer";
Context,
Service,
ServiceBroker,
ServiceSchema
} from "moleculer";
import BullMQMixin, { SandboxedJob } from "moleculer-bull"; import BullMQMixin, { SandboxedJob } from "moleculer-bull";
import { DbMixin } from "../mixins/db.mixin"; import { DbMixin } from "../mixins/db.mixin";
import Comic from "../models/comic.model"; import Comic from "../models/comic.model";
@@ -56,18 +51,14 @@ export default class QueueService extends Service {
schema: ServiceSchema<{}> = { name: "importqueue" } schema: ServiceSchema<{}> = { name: "importqueue" }
) { ) {
super(broker); super(broker);
console.log(this.io);
this.parseServiceSchema({ this.parseServiceSchema({
name: "importqueue", name: "importqueue",
mixins: [ mixins: [BullMQMixin(REDIS_URI), DbMixin("comics", Comic)],
BullMQMixin(REDIS_URI),
DbMixin("comics", Comic),
],
settings: {}, settings: {},
hooks: {}, hooks: {},
queues: { queues: {
"process.import": { "process.import": {
concurrency: 30, concurrency: 1,
async process(job: SandboxedJob) { async process(job: SandboxedJob) {
console.info("New job received!", job.data); console.info("New job received!", job.data);
console.info(`Processing queue...`); console.info(`Processing queue...`);
@@ -130,11 +121,12 @@ export default class QueueService extends Service {
}, },
} }
); );
return Promise.resolve({ console.log("JAMA", dbImportResult);
return {
dbImportResult, dbImportResult,
id: job.id, id: job.id,
worker: process.pid, worker: process.pid,
}); };
}, },
}, },
}, },
@@ -195,12 +187,11 @@ export default class QueueService extends Service {
await this.getQueue("process.import").on( await this.getQueue("process.import").on(
"completed", "completed",
async (job, res) => { async (job, res) => {
await this.broker.call('socket.broadcast', { await this.broker.call("socket.broadcast", {
namespace: '/', //optional namespace: "/", //optional
event: "action", event: "action",
args: [{ type: "LS_COVER_EXTRACTED", result: res }], //optional args: [{ type: "LS_COVER_EXTRACTED", result: res }], //optional
});
})
console.info(`Job with the id '${job.id}' completed.`); console.info(`Job with the id '${job.id}' completed.`);
} }
); );

View File

@@ -23,7 +23,7 @@ FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE. SOFTWARE.
*/ */
/* /*
@@ -101,7 +101,7 @@ export default class ImportService extends Service {
klaw(path.resolve(COMICS_DIRECTORY)) klaw(path.resolve(COMICS_DIRECTORY))
// 1.1 Filter on .cb* extensions // 1.1 Filter on .cb* extensions
.pipe( .pipe(
through2.obj(function (item, enc, next) { through2.obj(function(item, enc, next) {
let fileExtension = path.extname(item.path); let fileExtension = path.extname(item.path);
if ( if (
[".cbz", ".cbr", ".cb7"].includes( [".cbz", ".cbr", ".cb7"].includes(
@@ -123,18 +123,20 @@ export default class ImportService extends Service {
let comicExists = await Comic.exists({ let comicExists = await Comic.exists({
"rawFileDetails.name": `${path.basename( "rawFileDetails.name": `${path.basename(
item.path, item.path,
path.extname(item.path), path.extname(item.path)
)}`, )}`,
}); });
if (!comicExists) { if (!comicExists) {
// 2. Send the extraction job to the queue // 2. Send the extraction job to the queue
await broker.call("importqueue.processImport", { await broker.call(
fileObject: { "importqueue.processImport",
filePath: item.path, {
fileSize: item.stats.size, fileObject: {
}, filePath: item.path,
}); fileSize: item.stats.size,
},
}
);
} else { } else {
console.log( console.log(
"Comic already exists in the library." "Comic already exists in the library."
@@ -169,45 +171,34 @@ export default class ImportService extends Service {
}; };
}> }>
) { ) {
let volumeDetails; try {
const comicMetadata = ctx.params; let volumeDetails;
console.log(comicMetadata); const comicMetadata = ctx.params;
// When an issue is added from the search CV feature console.log(JSON.stringify(comicMetadata, null, 4));
if ( // When an issue is added from the search CV feature
comicMetadata.sourcedMetadata.comicvine && if (
!isNil( comicMetadata.sourcedMetadata.comicvine &&
comicMetadata.sourcedMetadata.comicvine.volume !isNil(
) comicMetadata.sourcedMetadata.comicvine
) { .volume
volumeDetails = await this.broker.call( )
"comicvine.getVolumes", ) {
{ volumeDetails = await this.broker.call(
volumeURI: "comicvine.getVolumes",
comicMetadata.sourcedMetadata.comicvine {
.volume.api_detail_url, volumeURI:
} comicMetadata.sourcedMetadata
); .comicvine.volume
comicMetadata.sourcedMetadata.comicvine.volumeInformation = .api_detail_url,
volumeDetails.results; }
} );
return await Comic.create( comicMetadata.sourcedMetadata.comicvine.volumeInformation =
ctx.params, volumeDetails.results;
(error, data) => {
if (data) {
return data;
} else if (error) {
console.log("data", data);
console.log("error", error);
throw new Errors.MoleculerError(
"Failed to import comic book",
400,
"IMS_FAILED_COMIC_BOOK_IMPORT",
error
);
}
} }
); return await Comic.create(ctx.params);
} catch (error) {
console.log(error);
}
}, },
}, },
applyComicVineMetadata: { applyComicVineMetadata: {
@@ -487,9 +478,9 @@ export default class ImportService extends Service {
{ {
$match: { $match: {
"sourcedMetadata.comicvine.volumeInformation": "sourcedMetadata.comicvine.volumeInformation":
{ {
$gt: {}, $gt: {},
}, },
}, },
}, },
{ {
@@ -552,16 +543,29 @@ export default class ImportService extends Service {
.drop() .drop()
.then(async (data) => { .then(async (data) => {
console.info(data); console.info(data);
const coversFolderDeleteResult = fsExtra.emptyDirSync( const coversFolderDeleteResult =
path.resolve(`${USERDATA_DIRECTORY}/covers`) fsExtra.emptyDirSync(
); path.resolve(
const expandedFolderDeleteResult = fsExtra.emptyDirSync( `${USERDATA_DIRECTORY}/covers`
path.resolve( )
`${USERDATA_DIRECTORY}/expanded` );
) const expandedFolderDeleteResult =
); fsExtra.emptyDirSync(
const eSIndicesDeleteResult = await ctx.broker.call("search.deleteElasticSearchIndices", {}); path.resolve(
return { data, coversFolderDeleteResult, expandedFolderDeleteResult, eSIndicesDeleteResult }; `${USERDATA_DIRECTORY}/expanded`
)
);
const eSIndicesDeleteResult =
await ctx.broker.call(
"search.deleteElasticSearchIndices",
{}
);
return {
data,
coversFolderDeleteResult,
expandedFolderDeleteResult,
eSIndicesDeleteResult,
};
}) })
.catch((error) => error); .catch((error) => error);
}, },