diff --git a/services/api.service.ts b/services/api.service.ts index 931bd36..20c1263 100644 --- a/services/api.service.ts +++ b/services/api.service.ts @@ -8,6 +8,7 @@ import { createServer } from "http"; import { Server, Socket } from "socket.io"; import { SocketIOMixin } from "../mixins/socket.io.mixin"; const SOCKET_HOST = process.env.DOCKER_HOST || `localhost`; +export const io = SocketIOMixin(); export default class ApiService extends Service { public constructor(broker: ServiceBroker) { super(broker); @@ -81,8 +82,8 @@ export default class ApiService extends Service { }, events: { "**"(payload, sender, event) { - if (this.io) - this.io.emit("event", { + if (io) + io.emit("event", { sender, event, payload, @@ -92,10 +93,9 @@ export default class ApiService extends Service { methods: {}, started(): any { - // Socket gateway-ish - this.io = SocketIOMixin(); + // Add a connect listener - this.io.on("connection", (client) => { + io.on("connection", (client) => { console.log("Client connected via websocket!"); client.on("action", async (action) => { diff --git a/services/libraryqueue.service.ts b/services/libraryqueue.service.ts index bc3a6ca..f33f337 100644 --- a/services/libraryqueue.service.ts +++ b/services/libraryqueue.service.ts @@ -11,6 +11,7 @@ import { SandboxedJob } from "moleculer-bull"; import { DbMixin } from "../mixins/db.mixin"; import Comic from "../models/comic.model"; import { extractCoverFromFile2 } from "../utils/uncompression.utils"; +import { io } from "./api.service"; const REDIS_URI = process.env.REDIS_URI || `redis://0.0.0.0:6379`; export default class LibraryQueueService extends Service { @@ -55,7 +56,6 @@ export default class LibraryQueueService extends Service { }, {} ); - return Promise.resolve({ dbImportResult, @@ -66,7 +66,6 @@ export default class LibraryQueueService extends Service { }, }, actions: { - enqueue: { rest: "POST /enqueue", params: {}, @@ -83,27 +82,32 @@ export default class LibraryQueueService extends Service { }, methods: {}, async started(): Promise { - const failed = await this.getQueue("process.import").on( - "failed", - async (job, error) => { + io.on("connection", async (client) => { + await this.getQueue( + "process.import" + ).on("failed", async (job, error) => { console.error( `An error occured in 'process.import' queue on job id '${job.id}': ${error.message}` ); - } - ); - const completed = await this.getQueue( - "process.import" - ).on("completed", async (job, res) => { - console.info( - `Job with the id '${job.id}' completed.` - ); - }); - const stalled = await this.getQueue( - "process.import" - ).on("stalled", async (job) => { - console.warn( - `The job with the id '${job} got stalled!` - ); + }); + await this.getQueue( + "process.import" + ).on("completed", async (job, res) => { + client.emit("action", { + type: "LS_COVER_EXTRACTED", + result: res, + }); + console.info( + `Job with the id '${job.id}' completed.` + ); + }); + await this.getQueue( + "process.import" + ).on("stalled", async (job) => { + console.warn( + `The job with the id '${job} got stalled!` + ); + }); }); }, },