🏗️ Refactor for zustand and tanstack react query support

This commit is contained in:
2023-11-09 10:22:45 -06:00
parent 3efdc7c2e2
commit 1229feb69c
2 changed files with 64 additions and 28 deletions

View File

@@ -49,9 +49,14 @@ export default class JobQueueService extends Service {
rest: "/GET enqueue", rest: "/GET enqueue",
handler: async (ctx: Context<{}>) => { handler: async (ctx: Context<{}>) => {
// Enqueue the job // Enqueue the job
const job = await this.localQueue(ctx, "enqueue.async", ctx.params, { const job = await this.localQueue(
priority: 10, ctx,
}); "enqueue.async",
ctx.params,
{
priority: 10,
}
);
console.log(`Job ${job.id} enqueued`); console.log(`Job ${job.id} enqueued`);
return job.id; return job.id;
@@ -65,13 +70,17 @@ export default class JobQueueService extends Service {
}> }>
) => { ) => {
try { try {
console.log(`Recieved Job ID ${ctx.locals.job.id}, processing...`); console.log(
`Recieved Job ID ${ctx.locals.job.id}, processing...`
);
console.log(ctx.params); console.log(ctx.params);
// 1. De-structure the job params // 1. De-structure the job params
const { fileObject } = ctx.locals.job.data.params; const { fileObject } = ctx.locals.job.data.params;
// 2. Extract metadata from the archive // 2. Extract metadata from the archive
const result = await extractFromArchive(fileObject.filePath); const result = await extractFromArchive(
fileObject.filePath
);
const { const {
name, name,
filePath, filePath,
@@ -84,7 +93,9 @@ export default class JobQueueService extends Service {
} = result; } = result;
// 3a. Infer any issue-related metadata from the filename // 3a. Infer any issue-related metadata from the filename
const { inferredIssueDetails } = refineQuery(result.name); const { inferredIssueDetails } = refineQuery(
result.name
);
console.log( console.log(
"Issue metadata inferred: ", "Issue metadata inferred: ",
JSON.stringify(inferredIssueDetails, null, 2) JSON.stringify(inferredIssueDetails, null, 2)
@@ -124,7 +135,8 @@ export default class JobQueueService extends Service {
// "acquisition.directconnect.downloads": [], // "acquisition.directconnect.downloads": [],
// mark the metadata source // mark the metadata source
"acquisition.source.name": ctx.locals.job.data.params.sourcedFrom, "acquisition.source.name":
ctx.locals.job.data.params.sourcedFrom,
}; };
// 3c. Add the bundleId, if present to the payload // 3c. Add the bundleId, if present to the payload
@@ -135,8 +147,13 @@ export default class JobQueueService extends Service {
// 3d. Add the sourcedMetadata, if present // 3d. Add the sourcedMetadata, if present
if ( if (
!isNil(ctx.locals.job.data.params.sourcedMetadata) && !isNil(
!isUndefined(ctx.locals.job.data.params.sourcedMetadata.comicvine) ctx.locals.job.data.params.sourcedMetadata
) &&
!isUndefined(
ctx.locals.job.data.params.sourcedMetadata
.comicvine
)
) { ) {
Object.assign( Object.assign(
payload.sourcedMetadata, payload.sourcedMetadata,
@@ -145,11 +162,15 @@ export default class JobQueueService extends Service {
} }
// 4. write to mongo // 4. write to mongo
const importResult = await this.broker.call("library.rawImportToDB", { const importResult = await this.broker.call(
importType: ctx.locals.job.data.params.importType, "library.rawImportToDB",
bundleId, {
payload, importType:
}); ctx.locals.job.data.params.importType,
bundleId,
payload,
}
);
return { return {
data: { data: {
importResult, importResult,
@@ -161,9 +182,14 @@ export default class JobQueueService extends Service {
console.error( console.error(
`An error occurred processing Job ID ${ctx.locals.job.id}` `An error occurred processing Job ID ${ctx.locals.job.id}`
); );
throw new MoleculerError(error, 500, "IMPORT_JOB_ERROR", { throw new MoleculerError(
data: ctx.params.sessionId, error,
}); 500,
"IMPORT_JOB_ERROR",
{
data: ctx.params.sessionId,
}
);
} }
}, },
}, },
@@ -191,7 +217,8 @@ export default class JobQueueService extends Service {
statuses: { statuses: {
$push: { $push: {
status: "$_id.status", status: "$_id.status",
earliestTimestamp: "$earliestTimestamp", earliestTimestamp:
"$earliestTimestamp",
count: "$count", count: "$count",
}, },
}, },
@@ -211,7 +238,10 @@ export default class JobQueueService extends Service {
{ {
$cond: [ $cond: [
{ {
$eq: ["$$this.status", "completed"], $eq: [
"$$this.status",
"completed",
],
}, },
"$$this.count", "$$this.count",
0, 0,
@@ -231,7 +261,10 @@ export default class JobQueueService extends Service {
{ {
$cond: [ $cond: [
{ {
$eq: ["$$this.status", "failed"], $eq: [
"$$this.status",
"failed",
],
}, },
"$$this.count", "$$this.count",
0, 0,
@@ -260,10 +293,10 @@ export default class JobQueueService extends Service {
console.log("Queue drained."); console.log("Queue drained.");
await this.broker.call("socket.broadcast", { await this.broker.call("socket.broadcast", {
namespace: "/", namespace: "/",
event: "action", event: "LS_IMPORT_QUEUE_DRAINED",
args: [ args: [
{ {
type: "LS_IMPORT_QUEUE_DRAINED", message: "drained",
}, },
], ],
}); });
@@ -274,14 +307,15 @@ export default class JobQueueService extends Service {
// 2. Increment the completed job counter // 2. Increment the completed job counter
await pubClient.incr("completedJobCount"); await pubClient.incr("completedJobCount");
// 3. Fetch the completed job count for the final payload to be sent to the client // 3. Fetch the completed job count for the final payload to be sent to the client
const completedJobCount = await pubClient.get("completedJobCount"); const completedJobCount = await pubClient.get(
"completedJobCount"
);
// 4. Emit the LS_COVER_EXTRACTED event with the necessary details // 4. Emit the LS_COVER_EXTRACTED event with the necessary details
await this.broker.call("socket.broadcast", { await this.broker.call("socket.broadcast", {
namespace: "/", namespace: "/",
event: "action", event: "LS_COVER_EXTRACTED",
args: [ args: [
{ {
type: "LS_COVER_EXTRACTED",
completedJobCount, completedJobCount,
importResult: job.returnvalue.data.importResult, importResult: job.returnvalue.data.importResult,
}, },
@@ -302,7 +336,9 @@ export default class JobQueueService extends Service {
async "enqueue.async.failed"(ctx) { async "enqueue.async.failed"(ctx) {
const job = await this.job(ctx.params.id); const job = await this.job(ctx.params.id);
await pubClient.incr("failedJobCount"); await pubClient.incr("failedJobCount");
const failedJobCount = await pubClient.get("failedJobCount"); const failedJobCount = await pubClient.get(
"failedJobCount"
);
await JobResult.create({ await JobResult.create({
id: ctx.params.id, id: ctx.params.id,
@@ -315,10 +351,9 @@ export default class JobQueueService extends Service {
// 4. Emit the LS_COVER_EXTRACTION_FAILED event with the necessary details // 4. Emit the LS_COVER_EXTRACTION_FAILED event with the necessary details
await this.broker.call("socket.broadcast", { await this.broker.call("socket.broadcast", {
namespace: "/", namespace: "/",
event: "action", event: "LS_COVER_EXTRACTION_FAILED",
args: [ args: [
{ {
type: "LS_COVER_EXTRACTION_FAILED",
failedJobCount, failedJobCount,
importResult: job, importResult: job,
}, },

View File

@@ -57,6 +57,7 @@ export default class SettingsService extends Service {
}> }>
) { ) {
try { try {
console.log(ctx.params);
let query = {}; let query = {};
const { settingsKey, settingsObjectId } = const { settingsKey, settingsObjectId } =
ctx.params; ctx.params;