🐂 Integrated elasticsearch search for issues in volumes with BullMQ

This commit is contained in:
2022-02-01 09:32:56 -08:00
parent 6bbfcf7603
commit 2c9bd55080
6 changed files with 517 additions and 14 deletions

View File

@@ -34,7 +34,7 @@ SOFTWARE.
"use strict";
import { isNil, map } from "lodash";
import { isNil, isUndefined, map } from "lodash";
import {
Context,
Service,
@@ -55,6 +55,7 @@ import {
import { unrarArchive } from "../utils/uncompression.utils";
import { extractCoverFromFile2 } from "../utils/uncompression.utils";
import { scrapeIssuesFromDOM } from "../utils/scraping.utils";
import axios from "axios";
const ObjectId = require("mongoose").Types.ObjectId;
import fsExtra from "fs-extra";
const through2 = require("through2");
@@ -394,6 +395,7 @@ export default class ImportService extends Service {
{
$group: {
_id: "$sourcedMetadata.comicvine.volume.id",
comicObjectId : { $first: "$_id" },
volumeURI: {
$last: "$sourcedMetadata.comicvine.volume.api_detail_url",
},
@@ -412,8 +414,9 @@ export default class ImportService extends Service {
// 2. Map over the aggregation result and get volume metadata from CV
// 2a. Make a call to comicvine-service
volumesMetadata = map(volumes, async (volume) => {
console.log(volume);
if (!isNil(volume.volumeURI)) {
return await ctx.call("comicvine.getVolumes", {
const volumeMetadata = await ctx.call("comicvine.getVolumes", {
volumeURI: volume.volumeURI,
data: {
format: "json",
@@ -423,12 +426,63 @@ export default class ImportService extends Service {
offset: "0",
},
});
volumeMetadata["comicObjectId"] = volume.comicObjectId;
return volumeMetadata;
}
});
return Promise.all(volumesMetadata);
},
},
getIssuesForSeries: {
rest: "POST /getIssuesForSeries",
params: {},
handler: async (ctx:Context<{ comicObjectID: string }>) => {
// 1. Query mongo to get issues for a given volume
const comicBookDetails: any = await this.broker.call(
"import.getComicBookById",
{ id: ctx.params.comicObjectID }
);
// 2. Query CV and get metadata for them
comicBookDetails.sourcedMetadata.comicvine.volumeInformation.issues.map(
async (issue: any, idx: any) => {
const issueMetadata: any =
await axios.request({
url: `${issue.api_detail_url}?api_key=${process.env.COMICVINE_API_KEY}`,
params: {
resources: "issues",
limit: "100",
format: "json",
},
headers: {
"User-Agent": "ThreeTwo",
},
});
const metadata =
issueMetadata.data.results;
// 2a. Query Mongo with Elastic to see if a match exists for a given issue's name, and issue number
if (
!isUndefined(metadata.volume.name) &&
!isUndefined(metadata.issue_number)
) {
console.log("asdasd", metadata.volume.name);
await ctx.broker.call("libraryqueue.issuesForSeries", { queryObject: {
issueName:
metadata.volume
.name,
issueNumber:
metadata.issue_number,
}});
}
return issueMetadata.data.results;
}
);
}
},
flushDB: {
rest: "POST /flushDB",
params: {},

View File

@@ -31,9 +31,9 @@ SOFTWARE.
* Initial: 2022/01/28 Rishi Ghan
*/
"use strict";
import { isNil, isUndefined } from "lodash";
import {
Context,
Service,
@@ -41,6 +41,7 @@ import {
ServiceSchema,
Errors,
} from "moleculer";
import BullMQMixin from "moleculer-bull";
import { SandboxedJob } from "moleculer-bull";
import { DbMixin } from "../mixins/db.mixin";
@@ -95,6 +96,30 @@ export default class LibraryQueueService extends Service {
});
},
},
"issue.findMatchesInLibrary": {
concurrency: 20,
async process(job: SandboxedJob) {
try {
console.log(
"reached the issuematchinlibrary queue"
);
console.log(job.data);
const matchesInLibrary = await this.broker.call(
"search.searchComic",
{
queryObject: job.data.queryObject,
}
);
console.log(
`Matches in Library: ${matchesInLibrary}`
);
return Promise.all(matchesInLibrary);
} catch (error) {
throw error;
}
},
},
},
actions: {
enqueue: {
@@ -110,6 +135,23 @@ export default class LibraryQueueService extends Service {
});
},
},
issuesForSeries: {
rest: "POST /findIssuesForSeries",
params: {},
handler: async (
ctx: Context<{ queryObject: {
issueName: string,
issueNumber: string,
} }>
) => {
return await this.createJob(
"issue.findMatchesInLibrary",
{
queryObject: ctx.params.queryObject,
}
);
},
},
},
methods: {},
async started(): Promise<any> {
@@ -142,6 +184,35 @@ export default class LibraryQueueService extends Service {
);
}
);
await this.getQueue("issue.findMatchesInLibrary").on(
"failed",
async (job, error) => {
console.error(
`An error occured in 'issue.findMatchesInLibrary' queue on job id '${job.id}': ${error.message}`
);
}
);
await this.getQueue("issue.findMatchesInLibrary").on(
"completed",
async (job, res) => {
client.emit("action", {
type: "LS_COVER_EXTRACTED",
result: res,
});
console.info(
`Job with the id '${job.id}' completed.`
);
}
);
await this.getQueue("issue.findMatchesInLibrary").on(
"stalled",
async (job) => {
console.warn(
`The job with the id '${job} got stalled!`
);
}
);
});
},
});

View File

@@ -18,6 +18,8 @@ const client = new Client({
import { DbMixin } from "../mixins/db.mixin";
import Comic from "../models/comic.model";
import { refineQuery } from "filename-parser";
import { filter } from "lodash";
console.log(client);
@@ -38,15 +40,34 @@ export default class SettingsService extends Service {
searchComic: {
rest: "POST /searchComic",
params: {},
async handler(ctx: Context<{}>) {
Comic.esSearch({
query_string: {
query: "batman",
timeout: 400000,
async handler(
ctx: Context<{ queryObject: {
issueName: string,
issueNumber: string,
} }>
) {
console.log(ctx.params);
return Comic.esSearch({
query: {
match: {
"rawFileDetails.name": {
query: ctx.params.queryObject.issueName,
operator: "or",
fuzziness: "AUTO",
},
},
},
}).then(function (results) {
// results here
console.log(results.body.hits.hits);
results.body.hits.hits.forEach((item) => console.log(item._source))
const foo = results.body.hits.hits.map((hit) => {
const parsedFilename = refineQuery(hit._source.rawFileDetails.name);
if(parsedFilename.searchParams.searchTerms.number === parseInt(ctx.params.queryObject.issueNumber, 10)) {
return hit;
}
});
return filter(foo, null);
});
},
},