🪢 Scaffold for elasticsearch

2021-12-17 19:41:21 -08:00
parent 32ad866c72
commit c316a1e0bc
7 changed files with 13137 additions and 623 deletions


@@ -1,6 +1,17 @@
const mongoose = require("mongoose");
var mexp = require('mongoose-elasticsearch-xp').v7;
const paginate = require("mongoose-paginate-v2");
const { Client } = require("@elastic/elasticsearch");
const eSClient = new Client({
node: "http://ghost:9200",
auth: {
username: "elastic",
password: "password",
},
});
const ComicSchema = mongoose.Schema({
importStatus: {
isImported: Boolean,
@@ -34,7 +45,7 @@ const ComicSchema = mongoose.Schema({
gcd: {},
},
rawFileDetails: {
name: String,
name: { type: String, es_indexed: true },
path: String,
fileSize: Number,
extension: String,
@@ -63,7 +74,9 @@ const ComicSchema = mongoose.Schema({
},
},
}, { timestamps: true});
ComicSchema.plugin(mexp, {
client: eSClient,
});
ComicSchema.plugin(paginate);
const Comic = mongoose.model("Comic", ComicSchema);
export default Comic;
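
A note on the scaffold above (a sketch, not from this diff): fields marked es_indexed are what the mongoose-elasticsearch-xp plugin mirrors into the Elasticsearch node configured in eSClient. A minimal sketch of exercising it once the model is loaded; esCreateMapping and esSynchronize are taken from the plugin's documented statics and are assumptions here, while esSearch and the results.body.hits shape match what search.service.ts below already uses:

// sketch: create the ES mapping, bulk-index existing documents, then run a full-text query
import Comic from "../models/comic.model"; // same import path the services use

async function reindexAndSearch() {
    await Comic.esCreateMapping(); // assumption: pushes the es_indexed fields as an ES mapping
    await Comic.esSynchronize();   // assumption: bulk-indexes documents already in MongoDB
    const results = await Comic.esSearch({
        query_string: { query: "batman" }, // same query shape as search.service.ts
    });
    return results.body.hits.hits.map((hit: any) => hit._source);
}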

package-lock.json (generated, 12554 lines): diff suppressed because it is too large.


@@ -35,13 +35,14 @@
"typescript": "^3.9.10"
},
"dependencies": {
"7zip-bin": "^5.1.1",
"7zip-min": "^1.4.0",
"@elastic/elasticsearch": "^7.15.0",
"@root/walk": "^1.1.0",
"@types/jest": "^25.1.4",
"@types/mkdirp": "^1.0.0",
"@types/node": "^13.9.8",
"@types/string-similarity": "^4.0.0",
"7zip-bin": "^5.1.1",
"7zip-min": "^1.4.0",
"chokidar": "^3.5.2",
"dotenv": "^10.0.0",
"fs-extra": "^10.0.0",
@@ -53,12 +54,14 @@
"lodash": "^4.17.21",
"mkdirp": "^0.5.5",
"moleculer": "^0.14.16",
"moleculer-addons": "github:rishighan/moleculer-addons#master",
"moleculer-bull": "^0.2.8",
"moleculer-db": "^0.8.13",
"moleculer-db-adapter-mongo": "^0.4.7",
"moleculer-db-adapter-mongoose": "^0.8.9",
"moleculer-web": "^0.10.3",
"mongoose": "^5.12.7",
"mongoose-elasticsearch-xp": "^5.8.0",
"mongoose-paginate-v2": "^1.3.18",
"nats": "^1.3.2",
"node-7z": "^3.0.0",


@@ -72,8 +72,8 @@ export default class ApiService extends Service {
},
],
log4XXResponses: false,
logRequestParams: null,
logResponseData: null,
logRequestParams: true,
logResponseData: true,
assets: {
folder: "public",
// Options to `server-static` module


@@ -28,507 +28,450 @@ import path from "path";
import { COMICS_DIRECTORY, USERDATA_DIRECTORY } from "../constants/directories";
export default class ImportService extends Service {
public constructor(
public broker: ServiceBroker,
schema: ServiceSchema<{}> = { name: "import" }
) {
public constructor(public broker: ServiceBroker) {
super(broker);
this.parseServiceSchema(
Service.mergeSchemas(
{
name: "import",
mixins: [DbMixin("comics", Comic)],
settings: {
// Available fields in the responses
fields: ["_id", "name", "quantity", "price"],
// Validator for the `create` & `insert` actions.
entityValidator: {
name: "string|min:3",
price: "number|positive",
},
this.parseServiceSchema({
name: "import",
mixins: [DbMixin("comics", Comic)],
hooks: {},
actions: {
walkFolders: {
rest: "POST /walkFolders",
params: {
basePathToWalk: "string",
},
hooks: {},
actions: {
walkFolders: {
rest: "POST /walkFolders",
params: {
basePathToWalk: "string",
},
async handler(
ctx: Context<{ basePathToWalk: string }>
) {
return await walkFolder(
ctx.params.basePathToWalk,
[".cbz", ".cbr"]
);
},
},
convertXMLToJSON: {
rest: "POST /convertXmlToJson",
params: {},
async handler(ctx: Context<{}>) {
return convertXMLToJSON("lagos");
},
},
newImport: {
rest: "POST /newImport",
params: {},
async handler(
ctx: Context<{
extractionOptions?: any;
}>
) {
// 1. Walk the Source folder
klaw(path.resolve(COMICS_DIRECTORY))
// 1.1 Filter on .cb* extensions
.pipe(
through2.obj(function (
item,
enc,
next
) {
let fileExtension = path.extname(
item.path
);
if (
[
".cbz",
".cbr",
".cb7",
].includes(fileExtension)
) {
this.push(item);
}
next();
})
)
// 1.2 Pipe filtered results to the next step
.on("data", async (item) => {
console.info(
"Found a file at path: %s",
item.path
);
let comicExists = await Comic.exists({
"rawFileDetails.name": `${path.basename(
item.path,
path.extname(item.path)
)}`,
});
if (!comicExists) {
// 2. Send the extraction job to the queue
await broker.call(
"libraryqueue.enqueue",
{
fileObject: {
filePath: item.path,
size: item.stats.size,
},
}
);
} else {
console.log(
"Comic already exists in the library."
);
}
})
.on("end", () => {
console.log("Import process complete.");
});
},
},
nicefyPath: {
rest: "POST /nicefyPath",
params: {},
async handler(
ctx: Context<{
filePath: string;
}>
) {
return explodePath(ctx.params.filePath);
},
},
processAndImportToDB: {
rest: "POST /processAndImportToDB",
params: {},
async handler(
ctx: Context<{
extractionOptions: any;
walkedFolders: {
name: string;
path: string;
extension: string;
containedIn: string;
fileSize: number;
isFile: boolean;
isLink: boolean;
};
}>
) {
try {
const { extractionOptions, walkedFolders } =
ctx.params;
let comicExists = await Comic.exists({
"rawFileDetails.name": `${walkedFolders.name}`,
});
// rough flow of import process
// 1. Walk folder
// 2. For each folder, call extract function
// 3. For each successful extraction, run dbImport
if (!comicExists) {
// 1. Extract cover and cover metadata
let comicBookCoverMetadata:
| IExtractedComicBookCoverFile
| IExtractComicBookCoverErrorResponse
| IExtractedComicBookCoverFile[] = await extractCoverFromFile2(
extractionOptions,
);
// 2. Add to mongo
const dbImportResult =
await this.broker.call(
"import.rawImportToDB",
{
importStatus: {
isImported: true,
tagged: false,
matchedResult: {
score: "0",
},
},
rawFileDetails:
comicBookCoverMetadata,
sourcedMetadata: {
comicvine: {},
},
},
{}
);
return {
comicBookCoverMetadata,
dbImportResult,
};
} else {
console.info(
`Comic: \"${walkedFolders.name}\" already exists in the database`
);
}
} catch (error) {
console.error(
"Error importing comic books",
error
);
}
},
},
rawImportToDB: {
rest: "POST /rawImportToDB",
params: {},
async handler(
ctx: Context<{
sourcedMetadata: {
comicvine: {
volume: { api_detail_url: string };
volumeInformation: {};
};
};
rawFileDetails: {
name: string;
};
}>
) {
let volumeDetails;
const comicMetadata = ctx.params;
if (
comicMetadata.sourcedMetadata.comicvine &&
!isNil(
comicMetadata.sourcedMetadata.comicvine
.volume
)
) {
volumeDetails =
await this.getComicVineVolumeMetadata(
comicMetadata.sourcedMetadata
.comicvine.volume.api_detail_url
);
comicMetadata.sourcedMetadata.comicvine.volumeInformation =
volumeDetails;
}
return new Promise(async (resolve, reject) => {
Comic.create(ctx.params, (error, data) => {
if (data) {
resolve(data);
} else if (error) {
throw new Errors.MoleculerError(
"Failed to import comic book",
400,
"IMS_FAILED_COMIC_BOOK_IMPORT",
data
);
}
});
});
},
},
applyComicVineMetadata: {
rest: "POST /applyComicVineMetadata",
params: {},
async handler(
ctx: Context<{
match: {
volume: { api_detail_url: string };
volumeInformation: object;
};
comicObjectId: string;
}>
) {
// 1. Find mongo object by id
// 2. Import payload into sourcedMetadata.comicvine
const comicObjectId = new ObjectId(
ctx.params.comicObjectId
);
const matchedResult = ctx.params.match;
let volumeDetailsPromise;
if (!isNil(matchedResult.volume)) {
volumeDetailsPromise =
this.getComicVineVolumeMetadata(
matchedResult.volume.api_detail_url
);
}
return new Promise(async (resolve, reject) => {
const volumeDetails =
await volumeDetailsPromise;
matchedResult.volumeInformation =
volumeDetails;
Comic.findByIdAndUpdate(
comicObjectId,
{
sourcedMetadata: {
comicvine: matchedResult,
},
},
{ new: true },
(err, result) => {
if (err) {
console.info(err);
reject(err);
} else {
// 3. Fetch and append volume information
resolve(result);
}
}
);
});
},
},
applyAirDCPPDownloadMetadata: {
rest: "POST /applyAirDCPPDownloadMetadata",
params: {},
async handler(
ctx: Context<{
comicObjectId: string;
resultId: string;
bundleId: string;
directoryIds: [];
searchInstanceId: string;
}>
) {
const comicObjectId = new ObjectId(
ctx.params.comicObjectId
);
return new Promise((resolve, reject) => {
Comic.findByIdAndUpdate(
comicObjectId,
{
$push: {
"acquisition.directconnect": {
resultId:
ctx.params.resultId,
bundleId:
ctx.params.bundleId,
directoryIds:
ctx.params.directoryIds,
searchInstanceId:
ctx.params
.searchInstanceId,
},
},
},
{ new: true, safe: true, upsert: true },
(err, result) => {
if (err) {
reject(err);
} else {
resolve(result);
}
}
);
});
},
},
getComicBooks: {
rest: "POST /getComicBooks",
params: {},
async handler(
ctx: Context<{ paginationOptions: object }>
) {
return await Comic.paginate(
{},
ctx.params.paginationOptions
);
},
},
getComicBookById: {
rest: "POST /getComicBookById",
params: { id: "string" },
async handler(ctx: Context<{ id: string }>) {
return await Comic.findById(ctx.params.id);
},
},
getComicBookGroups: {
rest: "GET /getComicBookGroups",
params: {},
async handler(ctx: Context<{}>) {
let volumesMetadata = [];
// 1. get volumes with issues mapped where issue count > 2
const volumes = await Comic.aggregate([
{
$group: {
_id: "$sourcedMetadata.comicvine.volume.id",
volumeURI: {
$last: "$sourcedMetadata.comicvine.volume.api_detail_url",
},
count: { $sum: 1 },
},
},
{
$match: {
count: { $gte: 2 },
},
},
{ $sort: { updatedAt: -1 } },
{ $skip: 0 },
{ $limit: 5 },
]);
// 2. Map over the aggregation result and get volume metadata from CV
// 2a. Make a call to comicvine-service
volumesMetadata = map(
volumes,
async (volume) => {
if (!isNil(volume.volumeURI)) {
return await ctx.call(
"comicvine.getVolumes",
{
volumeURI: volume.volumeURI,
data: {
format: "json",
fieldList:
"id,name,deck,api_detail_url",
limit: "1",
offset: "0",
},
}
);
}
}
);
return Promise.all(volumesMetadata);
},
},
flushDB: {
rest: "POST /flushDB",
params: {},
async handler(ctx: Context<{}>) {
return await Comic.collection
.drop()
.then((data) => {
console.info(data);
const foo = fsExtra.emptyDirSync(
path.resolve(
`${USERDATA_DIRECTORY}/covers`
)
);
const foo2 = fsExtra.emptyDirSync(
path.resolve(
`${USERDATA_DIRECTORY}/expanded`
)
);
return { data, foo, foo2 };
})
.catch((error) => error);
},
},
scrapeIssueNamesFromDOM: {
rest: "POST /scrapeIssueNamesFromDOM",
params: {},
async handler(ctx: Context<{ html: string }>) {
return scrapeIssuesFromDOM(ctx.params.html);
},
},
unrarArchive: {
rest: "POST /unrarArchive",
params: {},
timeout: 10000,
async handler(
ctx: Context<{
filePath: string;
options: IExtractionOptions;
}>
) {
return await unrarArchive(
ctx.params.filePath,
ctx.params.options
);
},
},
},
methods: {
getComicVineVolumeMetadata: (apiDetailURL) =>
new Promise((resolve, reject) => {
const options = {
headers: {
"User-Agent": "ThreeTwo",
},
};
return https
.get(
`${apiDetailURL}?api_key=${process.env.COMICVINE_API_KEY}&format=json&limit=1&offset=0&field_list=id,name,description,image,first_issue,last_issue,publisher,count_of_issues,character_credits,person_credits,aliases`,
options,
(resp) => {
let data = "";
resp.on("data", (chunk) => {
data += chunk;
});
resp.on("end", () => {
console.info(
data,
"HERE, BITCHES< HERE"
);
const volumeInformation =
JSON.parse(data);
resolve(
volumeInformation.results
);
});
}
)
.on("error", (err) => {
console.info("Error: " + err.message);
reject(err);
});
}),
async handler(ctx: Context<{ basePathToWalk: string }>) {
return await walkFolder(ctx.params.basePathToWalk, [
".cbz",
".cbr",
]);
},
},
schema
)
);
convertXMLToJSON: {
rest: "POST /convertXmlToJson",
params: {},
async handler(ctx: Context<{}>) {
return convertXMLToJSON("lagos");
},
},
newImport: {
rest: "POST /newImport",
params: {},
async handler(
ctx: Context<{
extractionOptions?: any;
}>
) {
// 1. Walk the Source folder
klaw(path.resolve(COMICS_DIRECTORY))
// 1.1 Filter on .cb* extensions
.pipe(
through2.obj(function (item, enc, next) {
let fileExtension = path.extname(item.path);
if (
[".cbz", ".cbr", ".cb7"].includes(
fileExtension
)
) {
this.push(item);
}
next();
})
)
// 1.2 Pipe filtered results to the next step
.on("data", async (item) => {
console.info(
"Found a file at path: %s",
item.path
);
let comicExists = await Comic.exists({
"rawFileDetails.name": `${path.basename(
item.path,
path.extname(item.path)
)}`,
});
if (!comicExists) {
// 2. Send the extraction job to the queue
await broker.call("libraryqueue.enqueue", {
fileObject: {
filePath: item.path,
size: item.stats.size,
},
});
} else {
console.log(
"Comic already exists in the library."
);
}
})
.on("end", () => {
console.log("Import process complete.");
});
},
},
nicefyPath: {
rest: "POST /nicefyPath",
params: {},
async handler(
ctx: Context<{
filePath: string;
}>
) {
return explodePath(ctx.params.filePath);
},
},
processAndImportToDB: {
rest: "POST /processAndImportToDB",
params: {},
async handler(
ctx: Context<{
extractionOptions: any;
walkedFolders: {
name: string;
path: string;
extension: string;
containedIn: string;
fileSize: number;
isFile: boolean;
isLink: boolean;
};
}>
) {
try {
const { extractionOptions, walkedFolders } =
ctx.params;
let comicExists = await Comic.exists({
"rawFileDetails.name": `${walkedFolders.name}`,
});
// rough flow of import process
// 1. Walk folder
// 2. For each folder, call extract function
// 3. For each successful extraction, run dbImport
if (!comicExists) {
// 1. Extract cover and cover metadata
let comicBookCoverMetadata:
| IExtractedComicBookCoverFile
| IExtractComicBookCoverErrorResponse
| IExtractedComicBookCoverFile[] = await extractCoverFromFile2(
extractionOptions
);
// 2. Add to mongo
const dbImportResult = await this.broker.call(
"import.rawImportToDB",
{
importStatus: {
isImported: true,
tagged: false,
matchedResult: {
score: "0",
},
},
rawFileDetails: comicBookCoverMetadata,
sourcedMetadata: {
comicvine: {},
},
},
{}
);
return {
comicBookCoverMetadata,
dbImportResult,
};
} else {
console.info(
`Comic: \"${walkedFolders.name}\" already exists in the database`
);
}
} catch (error) {
console.error("Error importing comic books", error);
}
},
},
rawImportToDB: {
rest: "POST /rawImportToDB",
params: {},
async handler(
ctx: Context<{
sourcedMetadata: {
comicvine: {
volume: { api_detail_url: string };
volumeInformation: {};
};
};
rawFileDetails: {
name: string;
};
}>
) {
let volumeDetails;
const comicMetadata = ctx.params;
if (
comicMetadata.sourcedMetadata.comicvine &&
!isNil(
comicMetadata.sourcedMetadata.comicvine.volume
)
) {
volumeDetails =
await this.getComicVineVolumeMetadata(
comicMetadata.sourcedMetadata.comicvine
.volume.api_detail_url
);
comicMetadata.sourcedMetadata.comicvine.volumeInformation =
volumeDetails;
}
return new Promise(async (resolve, reject) => {
Comic.create(ctx.params, (error, data) => {
if (data) {
resolve(data);
} else if (error) {
throw new Errors.MoleculerError(
"Failed to import comic book",
400,
"IMS_FAILED_COMIC_BOOK_IMPORT",
data
);
}
});
});
},
},
applyComicVineMetadata: {
rest: "POST /applyComicVineMetadata",
params: {},
async handler(
ctx: Context<{
match: {
volume: { api_detail_url: string };
volumeInformation: object;
};
comicObjectId: string;
}>
) {
// 1. Find mongo object by id
// 2. Import payload into sourcedMetadata.comicvine
const comicObjectId = new ObjectId(
ctx.params.comicObjectId
);
const matchedResult = ctx.params.match;
let volumeDetailsPromise;
if (!isNil(matchedResult.volume)) {
volumeDetailsPromise =
this.getComicVineVolumeMetadata(
matchedResult.volume.api_detail_url
);
}
return new Promise(async (resolve, reject) => {
const volumeDetails = await volumeDetailsPromise;
matchedResult.volumeInformation = volumeDetails;
Comic.findByIdAndUpdate(
comicObjectId,
{
sourcedMetadata: {
comicvine: matchedResult,
},
},
{ new: true },
(err, result) => {
if (err) {
console.info(err);
reject(err);
} else {
// 3. Fetch and append volume information
resolve(result);
}
}
);
});
},
},
applyAirDCPPDownloadMetadata: {
rest: "POST /applyAirDCPPDownloadMetadata",
params: {},
async handler(
ctx: Context<{
comicObjectId: string;
resultId: string;
bundleId: string;
directoryIds: [];
searchInstanceId: string;
}>
) {
const comicObjectId = new ObjectId(
ctx.params.comicObjectId
);
return new Promise((resolve, reject) => {
Comic.findByIdAndUpdate(
comicObjectId,
{
$push: {
"acquisition.directconnect": {
resultId: ctx.params.resultId,
bundleId: ctx.params.bundleId,
directoryIds:
ctx.params.directoryIds,
searchInstanceId:
ctx.params.searchInstanceId,
},
},
},
{ new: true, safe: true, upsert: true },
(err, result) => {
if (err) {
reject(err);
} else {
resolve(result);
}
}
);
});
},
},
getComicBooks: {
rest: "POST /getComicBooks",
params: {},
async handler(ctx: Context<{ paginationOptions: object }>) {
return await Comic.paginate(
{},
ctx.params.paginationOptions
);
},
},
getComicBookById: {
rest: "POST /getComicBookById",
params: { id: "string" },
async handler(ctx: Context<{ id: string }>) {
return await Comic.findById(ctx.params.id);
},
},
getComicBookGroups: {
rest: "GET /getComicBookGroups",
params: {},
async handler(ctx: Context<{}>) {
let volumesMetadata = [];
// 1. get volumes with issues mapped where issue count > 2
const volumes = await Comic.aggregate([
{
$group: {
_id: "$sourcedMetadata.comicvine.volume.id",
volumeURI: {
$last: "$sourcedMetadata.comicvine.volume.api_detail_url",
},
count: { $sum: 1 },
},
},
{
$match: {
count: { $gte: 2 },
},
},
{ $sort: { updatedAt: -1 } },
{ $skip: 0 },
{ $limit: 5 },
]);
// 2. Map over the aggregation result and get volume metadata from CV
// 2a. Make a call to comicvine-service
volumesMetadata = map(volumes, async (volume) => {
if (!isNil(volume.volumeURI)) {
return await ctx.call("comicvine.getVolumes", {
volumeURI: volume.volumeURI,
data: {
format: "json",
fieldList:
"id,name,deck,api_detail_url",
limit: "1",
offset: "0",
},
});
}
});
return Promise.all(volumesMetadata);
},
},
flushDB: {
rest: "POST /flushDB",
params: {},
async handler(ctx: Context<{}>) {
return await Comic.collection
.drop()
.then((data) => {
console.info(data);
const foo = fsExtra.emptyDirSync(
path.resolve(`${USERDATA_DIRECTORY}/covers`)
);
const foo2 = fsExtra.emptyDirSync(
path.resolve(
`${USERDATA_DIRECTORY}/expanded`
)
);
return { data, foo, foo2 };
})
.catch((error) => error);
},
},
scrapeIssueNamesFromDOM: {
rest: "POST /scrapeIssueNamesFromDOM",
params: {},
async handler(ctx: Context<{ html: string }>) {
return scrapeIssuesFromDOM(ctx.params.html);
},
},
unrarArchive: {
rest: "POST /unrarArchive",
params: {},
timeout: 10000,
async handler(
ctx: Context<{
filePath: string;
options: IExtractionOptions;
}>
) {
return await unrarArchive(
ctx.params.filePath,
ctx.params.options
);
},
},
},
methods: {
getComicVineVolumeMetadata: (apiDetailURL) =>
new Promise((resolve, reject) => {
const options = {
headers: {
"User-Agent": "ThreeTwo",
},
};
return https
.get(
`${apiDetailURL}?api_key=${process.env.COMICVINE_API_KEY}&format=json&limit=1&offset=0&field_list=id,name,description,image,first_issue,last_issue,publisher,count_of_issues,character_credits,person_credits,aliases`,
options,
(resp) => {
let data = "";
resp.on("data", (chunk) => {
data += chunk;
});
resp.on("end", () => {
console.log(`${apiDetailURL} returned data.`);
const volumeInformation =
JSON.parse(data);
resolve(volumeInformation.results);
});
}
)
.on("error", (err) => {
console.info("Error: " + err.message);
reject(err);
});
}),
},
});
}
}
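
For orientation, a minimal sketch (not from this diff) of driving the import flow above programmatically; it assumes a started ServiceBroker with the "import" service loaded (e.g. via moleculer-runner), and the basePathToWalk value is a placeholder:

// sketch: call the import actions directly through the broker, bypassing the HTTP gateway
import { ServiceBroker } from "moleculer";

export async function runImport(broker: ServiceBroker) {
    // walk an arbitrary folder for .cbz/.cbr files
    const walked = await broker.call("import.walkFolders", {
        basePathToWalk: "/comics", // placeholder path
    });
    console.info("Walked entries:", walked);

    // queue-backed import of everything under COMICS_DIRECTORY
    await broker.call("import.newImport", {});
}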


@@ -15,104 +15,99 @@ import { io } from "./api.service";
const REDIS_URI = process.env.REDIS_URI || `redis://0.0.0.0:6379`;
export default class LibraryQueueService extends Service {
public constructor(
public broker: ServiceBroker,
schema: ServiceSchema<{}> = { name: "libraryqueue" }
) {
public constructor(public broker: ServiceBroker) {
super(broker);
this.parseServiceSchema(
Service.mergeSchemas(
{
name: "libraryqueue",
mixins: [BullMQMixin(REDIS_URI), DbMixin("comics", Comic)],
settings: {},
hooks: {},
queues: {
"process.import": {
concurrency: 30,
async process(job: SandboxedJob) {
console.info("New job received!", job.data);
console.info(`Processing queue...`);
// extract the cover
const result = await extractCoverFromFile2(
job.data.fileObject
);
this.parseServiceSchema({
name: "libraryqueue",
mixins: [BullMQMixin(REDIS_URI), DbMixin("comics", Comic)],
settings: {},
hooks: {},
queues: {
"process.import": {
concurrency: 30,
async process(job: SandboxedJob) {
console.info("New job received!", job.data);
console.info(`Processing queue...`);
// extract the cover
const result = await extractCoverFromFile2(
job.data.fileObject
);
// write to mongo
const dbImportResult = await this.broker.call(
"import.rawImportToDB",
{
importStatus: {
isImported: true,
tagged: false,
matchedResult: {
score: "0",
},
},
rawFileDetails: result,
sourcedMetadata: {
comicvine: {},
},
// write to mongo
const dbImportResult = await this.broker.call(
"import.rawImportToDB",
{
importStatus: {
isImported: true,
tagged: false,
matchedResult: {
score: "0",
},
{}
);
},
rawFileDetails: result,
sourcedMetadata: {
comicvine: {},
},
},
{}
);
return Promise.resolve({
dbImportResult,
id: job.id,
worker: process.pid,
});
},
},
},
actions: {
enqueue: {
rest: "POST /enqueue",
params: {},
async handler(
ctx: Context<{
fileObject: object;
}>
) {
return await this.createJob("process.import", {
fileObject: ctx.params.fileObject,
});
},
},
},
methods: {},
async started(): Promise<any> {
io.on("connection", async (client) => {
await this.getQueue(
"process.import"
).on("failed", async (job, error) => {
console.error(
`An error occurred in 'process.import' queue on job id '${job.id}': ${error.message}`
);
});
await this.getQueue(
"process.import"
).on("completed", async (job, res) => {
client.emit("action", {
type: "LS_COVER_EXTRACTED",
result: res,
});
console.info(
`Job with the id '${job.id}' completed.`
);
});
await this.getQueue(
"process.import"
).on("stalled", async (job) => {
console.warn(
`The job with the id '${job.id}' got stalled!`
);
});
return Promise.resolve({
dbImportResult,
id: job.id,
worker: process.pid,
});
},
},
schema
)
);
},
actions: {
enqueue: {
rest: "POST /enqueue",
params: {},
async handler(
ctx: Context<{
fileObject: object;
}>
) {
return await this.createJob("process.import", {
fileObject: ctx.params.fileObject,
});
},
},
},
methods: {},
async started(): Promise<any> {
io.on("connection", async (client) => {
await this.getQueue("process.import").on(
"failed",
async (job, error) => {
console.error(
`An error occurred in 'process.import' queue on job id '${job.id}': ${error.message}`
);
}
);
await this.getQueue("process.import").on(
"completed",
async (job, res) => {
client.emit("action", {
type: "LS_COVER_EXTRACTED",
result: res,
});
console.info(
`Job with the id '${job.id}' completed.`
);
}
);
await this.getQueue("process.import").on(
"stalled",
async (job) => {
console.warn(
`The job with the id '${job.id}' got stalled!`
);
}
);
});
},
});
}
}
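
The started() hook above relays queue events to connected socket.io clients as "action" messages. A minimal sketch (not from this diff) of a client consuming the LS_COVER_EXTRACTED event; the gateway URL is a placeholder:

// sketch: socket.io client listening for the "action" events emitted by the completed handler
import { io } from "socket.io-client";

const socket = io("http://localhost:3000"); // placeholder gateway URL
socket.on("action", (payload: { type: string; result: unknown }) => {
    if (payload.type === "LS_COVER_EXTRACTED") {
        console.info("Cover extracted:", payload.result);
    }
});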


@@ -0,0 +1,60 @@
"use strict";
import {
Context,
Service,
ServiceBroker,
ServiceSchema,
Errors,
} from "moleculer";
const { Client } = require("@elastic/elasticsearch");
const client = new Client({
node: "http://ghost:9200",
auth: {
username: "elastic",
password: "password",
},
});
import { DbMixin } from "../mixins/db.mixin";
import Comic from "../models/comic.model";
console.log(client);
export default class SettingsService extends Service {
// @ts-ignore
public constructor(
public broker: ServiceBroker,
schema: ServiceSchema<{}> = { name: "search" }
) {
super(broker);
this.parseServiceSchema(
Service.mergeSchemas(
{
name: "search",
mixins: [client, DbMixin("comics", Comic)],
hooks: {},
actions: {
searchComic: {
rest: "POST /searchComic",
params: {},
async handler(ctx: Context<{}>) {
Comic.esSearch({
query_string: {
query: "batman",
},
}).then(function (results) {
// results here
console.log(results.body.hits.hits);
results.body.hits.hits.forEach((item) => console.log(item._source))
});
},
},
},
methods: {},
},
schema
)
);
}
}
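
As scaffolded, the searchComic handler fires Comic.esSearch and logs the hits but neither awaits nor returns them, so POST /searchComic resolves without results. A minimal sketch of a handler body that awaits the same query and returns the hit sources; the optional query parameter is added here for illustration only:

// sketch: same esSearch call as above, but awaited and returned to the caller
async handler(ctx: Context<{ query?: string }>) {
    const results = await Comic.esSearch({
        query_string: { query: ctx.params.query || "batman" }, // "batman" kept from the scaffold
    });
    return results.body.hits.hits.map((hit: any) => hit._source);
},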