🔧 Emitting confirmation events back to the client

This commit is contained in:
2021-12-06 08:43:06 -08:00
parent ecd5d155fc
commit 32ad866c72
2 changed files with 29 additions and 25 deletions

View File

@@ -8,6 +8,7 @@ import { createServer } from "http";
import { Server, Socket } from "socket.io"; import { Server, Socket } from "socket.io";
import { SocketIOMixin } from "../mixins/socket.io.mixin"; import { SocketIOMixin } from "../mixins/socket.io.mixin";
const SOCKET_HOST = process.env.DOCKER_HOST || `localhost`; const SOCKET_HOST = process.env.DOCKER_HOST || `localhost`;
export const io = SocketIOMixin();
export default class ApiService extends Service { export default class ApiService extends Service {
public constructor(broker: ServiceBroker) { public constructor(broker: ServiceBroker) {
super(broker); super(broker);
@@ -81,8 +82,8 @@ export default class ApiService extends Service {
}, },
events: { events: {
"**"(payload, sender, event) { "**"(payload, sender, event) {
if (this.io) if (io)
this.io.emit("event", { io.emit("event", {
sender, sender,
event, event,
payload, payload,
@@ -92,10 +93,9 @@ export default class ApiService extends Service {
methods: {}, methods: {},
started(): any { started(): any {
// Socket gateway-ish
this.io = SocketIOMixin();
// Add a connect listener // Add a connect listener
this.io.on("connection", (client) => { io.on("connection", (client) => {
console.log("Client connected via websocket!"); console.log("Client connected via websocket!");
client.on("action", async (action) => { client.on("action", async (action) => {

View File

@@ -11,6 +11,7 @@ import { SandboxedJob } from "moleculer-bull";
import { DbMixin } from "../mixins/db.mixin"; import { DbMixin } from "../mixins/db.mixin";
import Comic from "../models/comic.model"; import Comic from "../models/comic.model";
import { extractCoverFromFile2 } from "../utils/uncompression.utils"; import { extractCoverFromFile2 } from "../utils/uncompression.utils";
import { io } from "./api.service";
const REDIS_URI = process.env.REDIS_URI || `redis://0.0.0.0:6379`; const REDIS_URI = process.env.REDIS_URI || `redis://0.0.0.0:6379`;
export default class LibraryQueueService extends Service { export default class LibraryQueueService extends Service {
@@ -56,7 +57,6 @@ export default class LibraryQueueService extends Service {
{} {}
); );
return Promise.resolve({ return Promise.resolve({
dbImportResult, dbImportResult,
id: job.id, id: job.id,
@@ -66,7 +66,6 @@ export default class LibraryQueueService extends Service {
}, },
}, },
actions: { actions: {
enqueue: { enqueue: {
rest: "POST /enqueue", rest: "POST /enqueue",
params: {}, params: {},
@@ -83,28 +82,33 @@ export default class LibraryQueueService extends Service {
}, },
methods: {}, methods: {},
async started(): Promise<any> { async started(): Promise<any> {
const failed = await this.getQueue("process.import").on( io.on("connection", async (client) => {
"failed", await this.getQueue(
async (job, error) => { "process.import"
).on("failed", async (job, error) => {
console.error( console.error(
`An error occured in 'process.import' queue on job id '${job.id}': ${error.message}` `An error occured in 'process.import' queue on job id '${job.id}': ${error.message}`
); );
} });
); await this.getQueue(
const completed = await this.getQueue(
"process.import" "process.import"
).on("completed", async (job, res) => { ).on("completed", async (job, res) => {
client.emit("action", {
type: "LS_COVER_EXTRACTED",
result: res,
});
console.info( console.info(
`Job with the id '${job.id}' completed.` `Job with the id '${job.id}' completed.`
); );
}); });
const stalled = await this.getQueue( await this.getQueue(
"process.import" "process.import"
).on("stalled", async (job) => { ).on("stalled", async (job) => {
console.warn( console.warn(
`The job with the id '${job} got stalled!` `The job with the id '${job} got stalled!`
); );
}); });
});
}, },
}, },
schema schema