"use strict";
import { Service, ServiceBroker, ServiceSchema, Context } from "moleculer";
import { JobType } from "moleculer-bullmq";
import { createClient } from "redis";
import { createAdapter } from "@socket.io/redis-adapter";
import Session from "../models/session.model";
import { pubClient, subClient } from "../config/redis.config";
const { MoleculerError } = require("moleculer").Errors;
const SocketIOService = require("moleculer-io");
const { v4: uuidv4 } = require("uuid");
import AirDCPPSocket from "../shared/airdcpp.socket";
import type { Socket as IOSocket } from "socket.io";
import { namespace } from "../moleculer.config";

// Context type carrying the Socket.IO socket in meta.
// NOTE(review): the meta key is assumed to be `socket` — confirm against the
// moleculer-io version in use.
type SocketCtx<P = unknown> = Context<P, { socket: IOSocket }>;

/**
 * Extracts a printable message from a value caught in a `catch` clause.
 * Handles `Error` instances as well as plain objects that carry a `message`
 * field; anything else is stringified.
 */
const errorMessage = (err: unknown): string => {
	if (typeof err === "object" && err !== null && "message" in err) {
		return String((err as { message: unknown }).message);
	}
	return String(err);
};

/**
 * Moleculer service exposing a Socket.IO gateway (via the `moleculer-io` mixin).
 *
 * Responsibilities visible in this file:
 *  - session bootstrap for newly connected sockets (persisted through `Session`),
 *  - relaying job-queue state after a session is resumed,
 *  - proxying AirDC++ search / download / queue-tick traffic to Socket.IO clients,
 *  - forwarding file-watcher events (`add`, `unlink`, ...) to Socket.IO clients.
 */
export default class SocketService extends Service {
	// @ts-ignore
	public constructor(
		public broker: ServiceBroker,
		schema: ServiceSchema<{}> = { name: "socket" }
	) {
		super(broker);
		this.parseServiceSchema({
			name: "socket",
			mixins: [SocketIOService],
			settings: {
				port: process.env.PORT || 3001,
				io: {
					namespaces: {
						"/automated": {
							events: {
								call: {
									whitelist: [
										"socket.*", // Allow 'search' in the automated namespace
									],
								},
							},
						},
						"/manual": {
							events: {
								call: { whitelist: ["socket.*"] },
							},
						},
					},
					options: {
						adapter: createAdapter(pubClient, subClient),
					},
				},
			},
			hooks: {},
			actions: {
				/**
				 * Resume a previously created session.
				 *
				 * If a session record matching `sessionId` exists and the job queue
				 * reports active, paused or waiting jobs, the stored completed/failed
				 * counters are broadcast to clients as
				 * `RESTORE_JOB_COUNTS_AFTER_SESSION_RESTORATION`.
				 *
				 * @throws MoleculerError (500, `SESSION_ID_NOT_FOUND`) when any step fails.
				 */
				resumeSession: async (ctx: Context<{ sessionId: string }>) => {
					const { sessionId } = ctx.params;
					console.log("Attempting to resume session...");
					try {
						const sessionRecord = await Session.find({ sessionId });
						// 1. Check for sessionId's existence, and a match
						if (
							sessionRecord.length === 0 ||
							sessionRecord[0].sessionId !== sessionId
						) {
							return;
						}

						// 2. Find if the queue has active, paused or waiting jobs
						const jobs: JobType = await this.broker.call(
							"jobqueue.getJobCountsByType",
							{}
						);
						const { active, paused, waiting } = jobs;
						if (!(active > 0 || paused > 0 || waiting > 0)) {
							return;
						}

						// 3. Get job counts (independent reads, so fetch them together)
						const [completedJobCount, failedJobCount] =
							await Promise.all([
								pubClient.get("completedJobCount"),
								pubClient.get("failedJobCount"),
							]);

						// 4. Send the counts to the active socket.io session
						await this.broker.call("socket.broadcast", {
							namespace: "/",
							event: "RESTORE_JOB_COUNTS_AFTER_SESSION_RESTORATION",
							args: [
								{
									completedJobCount,
									failedJobCount,
									queueStatus: "running",
								},
							],
						});
					} catch (err) {
						// MoleculerError expects a string message, not the error object.
						throw new MoleculerError(
							errorMessage(err),
							500,
							"SESSION_ID_NOT_FOUND",
							{
								data: sessionId,
							}
						);
					}
				},

				/**
				 * Forward a queue action to `jobqueue.toggle`.
				 * Only `queueAction` is used; `queueStatus` is accepted but ignored.
				 */
				setQueueStatus: async (
					ctx: Context<{
						queueAction: string;
						queueStatus: string;
					}>
				) => {
					const { queueAction } = ctx.params;
					await this.broker.call(
						"jobqueue.toggle",
						{ action: queueAction },
						{}
					);
				},

				/**
				 * Currently only logs the received params; the actual import call is
				 * disabled (kept below for reference).
				 */
				importSingleIssue: async (ctx: Context<{}>) => {
					console.info("AirDC++ finished a download -> ");
					console.log(ctx.params);
					// await this.broker.call(
					// 	"library.importDownloadedComic",
					// 	{ bundle: data },
					// 	{}
					// );
				},

				/**
				 * Run an AirDC++ search and stream its progress to a Socket.IO namespace.
				 *
				 * Emits on the namespace given by `ctx.params.namespace` (default "/"):
				 * `searchInitiated`, `searchResultAdded`, `searchResultUpdated`,
				 * `searchesSent`, `searchComplete` and, on failure, `searchError`.
				 *
				 * @throws MoleculerError (500, `SEARCH_FAILED`) when the search cannot be started.
				 */
				search: {
					params: {
						query: "object",
						config: "object",
					},
					async handler(ctx) {
						const { query, config, namespace } = ctx.params;
						const namespacedInstance = this.io.of(namespace || "/");
						const ADCPPSocket = new AirDCPPSocket(config);
						try {
							await ADCPPSocket.connect();
							const instance = await ADCPPSocket.post(
								"search",
								query
							);

							// Send the instance to the client
							await namespacedInstance.emit("searchInitiated", {
								instance,
							});

							// Setting up listeners
							await ADCPPSocket.addListener(
								`search`,
								`search_result_added`,
								(groupedResult) => {
									console.log(
										JSON.stringify(groupedResult, null, 4)
									);
									namespacedInstance.emit(
										"searchResultAdded",
										groupedResult
									);
								},
								instance.id
							);

							await ADCPPSocket.addListener(
								`search`,
								`search_result_updated`,
								(updatedResult) => {
									namespacedInstance.emit(
										"searchResultUpdated",
										updatedResult
									);
								},
								instance.id
							);

							await ADCPPSocket.addListener(
								`search`,
								`search_hub_searches_sent`,
								async (searchInfo) => {
									// Wait before re-reading the instance so results have
									// time to arrive.
									await this.sleep(5000);
									const currentInstance =
										await ADCPPSocket.get(
											`search/${instance.id}`
										);
									// Send the instance to the client
									await namespacedInstance.emit(
										"searchesSent",
										{
											searchInfo,
										}
									);
									if (currentInstance.result_count === 0) {
										console.log("No more search results.");
										namespacedInstance.emit(
											"searchComplete",
											{
												message:
													"No more search results.",
											}
										);
									}
								},
								instance.id
							);

							// Perform the actual search
							await ADCPPSocket.post(
								`search/${instance.id}/hub_search`,
								query
							);
							// NOTE(review): the AirDC++ socket is intentionally left open;
							// the listeners above deliver results after this handler returns.
						} catch (error) {
							await namespacedInstance.emit(
								"searchError",
								errorMessage(error)
							);
							throw new MoleculerError(
								"Search failed",
								500,
								"SEARCH_FAILED",
								{
									error,
								}
							);
						}
					},
				},

				/**
				 * Ask AirDC++ to download a search result and, when the response carries
				 * `bundle_info`, record the download metadata through
				 * `library.applyAirDCPPDownloadMetadata`.
				 *
				 * Emits broker events `downloadCompleted` / `downloadError`.
				 *
				 * @returns the result of `library.applyAirDCPPDownloadMetadata`.
				 * @throws MoleculerError (500, `DOWNLOAD_FAILED`) on any failure.
				 */
				download: {
					// params: {
					// 	searchInstanceId: "string",
					// 	resultId: "string",
					// 	comicObjectId: "string",
					// 	name: "string",
					// 	size: "number",
					// 	type: "any", // Define more specific type if possible
					// 	config: "object",
					// },
					async handler(ctx) {
						console.log(ctx.params);
						const {
							searchInstanceId,
							resultId,
							config,
							comicObjectId,
							name,
							size,
							type,
						} = ctx.params;

						const ADCPPSocket = new AirDCPPSocket(config);
						try {
							await ADCPPSocket.connect();
							const downloadResult = await ADCPPSocket.post(
								`search/${searchInstanceId}/results/${resultId}/download`
							);

							if (!downloadResult || !downloadResult.bundle_info) {
								throw new Error(
									"Failed to download or missing download result information"
								);
							}

							// Assume bundle_info is part of the response and contains the necessary details
							const bundleDBImportResult = await ctx.call(
								"library.applyAirDCPPDownloadMetadata",
								{
									bundleId: downloadResult.bundle_info.id,
									comicObjectId,
									name,
									size,
									type,
								}
							);

							this.logger.info(
								"Download and metadata update successful",
								bundleDBImportResult
							);
							this.broker.emit(
								"downloadCompleted",
								bundleDBImportResult
							);
							return bundleDBImportResult;
						} catch (error) {
							this.broker.emit(
								"downloadError",
								errorMessage(error)
							);
							throw new MoleculerError(
								"Download failed",
								500,
								"DOWNLOAD_FAILED",
								{
									error,
								}
							);
						}
						// NOTE(review): the AirDC++ socket opened here is never closed
						// (the original disconnect call was commented out) — confirm
						// whether it should be disconnected once the request completes.
					},
				},

				/**
				 * Compute and broadcast current library statistics to all connected Socket.IO clients.
				 * Called after every filesystem event (add, unlink, etc.) to keep the UI in sync.
				 * Emits a single `LS_LIBRARY_STATS` event with totalLocalFiles, alreadyImported,
				 * newFiles, missingFiles, and percentageImported.
				 *
				 * Failures are logged and swallowed so watcher events are never blocked.
				 */
				broadcastLibraryStatistics: async (
					ctx: Context<{ directoryPath?: string }>
				) => {
					try {
						const result: any = await this.broker.call(
							"library.getImportStatistics",
							{
								directoryPath: ctx.params?.directoryPath,
							}
						);
						await this.broker.call("socket.broadcast", {
							namespace: "/",
							event: "LS_LIBRARY_STATS",
							args: [result],
						});
					} catch (err) {
						this.logger.error(
							"[Socket] broadcastLibraryStatistics failed:",
							err
						);
					}
				},

				/**
				 * Subscribe to AirDC++ `queue_bundle_tick` events and relay each one to
				 * the given Socket.IO namespace as `downloadTick`.
				 *
				 * Best-effort: connection or subscription failures are logged, not thrown.
				 */
				listenFileProgress: {
					params: { config: "object", namespace: "string" },
					async handler(
						ctx: SocketCtx<{ config: any; namespace: string }>
					) {
						const { config, namespace } = ctx.params;
						const namespacedInstance = this.io.of(namespace || "/");
						const ADCPPSocket = new AirDCPPSocket(config);
						try {
							// Connect once
							await ADCPPSocket.connect();
							await ADCPPSocket.addListener(
								"queue",
								"queue_bundle_tick",
								async (data) => {
									namespacedInstance.emit(
										"downloadTick",
										data
									);
								}
							);
						} catch (err) {
							// Previously swallowed silently; surface the failure in the logs.
							this.logger.error(
								"[Socket] listenFileProgress failed:",
								err
							);
						}
					},
				},
			},
			events: {
				// File watcher events - forward to Socket.IO clients
				async "add"(ctx: Context<{ path: string }>) {
					await this.forwardWatcherEvent({
						description: "File added",
						event: "LS_FILE_ADDED",
						path: ctx.params.path,
						statsContext: "file add",
					});
				},

				async "unlink"(ctx: Context<{ path: string }>) {
					await this.forwardWatcherEvent({
						description: "File removed",
						event: "LS_FILE_REMOVED",
						path: ctx.params.path,
						statsContext: "file remove",
					});
				},

				async "addDir"(ctx: Context<{ path: string }>) {
					await this.forwardWatcherEvent({
						description: "Directory added",
						event: "LS_DIRECTORY_ADDED",
						path: ctx.params.path,
						statsContext: "directory add",
					});
				},

				async "unlinkDir"(ctx: Context<{ path: string }>) {
					await this.forwardWatcherEvent({
						description: "Directory removed",
						event: "LS_DIRECTORY_REMOVED",
						path: ctx.params.path,
						statsContext: "directory remove",
					});
				},

				// Content changes do not alter library statistics, so no refresh here.
				async "change"(ctx: Context<{ path: string }>) {
					await this.forwardWatcherEvent({
						description: "File changed",
						event: "LS_FILE_CHANGED",
						path: ctx.params.path,
					});
				},
			},
			methods: {
				/** Resolve after `ms` milliseconds. */
				sleep: (ms: number): Promise<void> => {
					return new Promise((resolve) => setTimeout(resolve, ms));
				},

				/**
				 * Log a new socket connection and make sure it has a persisted session.
				 */
				handleSocketConnection: async function (socket: any) {
					this.logger.info(
						`Socket connected with session ID: ${socket.id}`
					);
					await this.ensureSession(socket);
				},

				/**
				 * Look up the session id supplied in the socket handshake query. When no
				 * matching record exists, create a fresh session id, persist it together
				 * with the socket id, and send it to the client via `sessionInitialized`.
				 * When a record exists nothing is changed.
				 */
				async ensureSession(socket: any) {
					console.log("Looking up sessionId in Mongo...");
					const existingSessions = await Session.find({
						sessionId: socket.handshake.query.sessionId,
					});

					if (existingSessions.length !== 0) {
						console.log(`Found socketId ${socket.id}, no-op.`);
						return;
					}

					console.log(
						`Socket Id ${socket.id} not found in Mongo, creating a new session...`
					);
					const sessionId = uuidv4();
					socket.sessionId = sessionId;
					console.log(`Saving session ${sessionId} to Mongo...`);
					await Session.create({
						sessionId,
						socketId: socket.id,
					});
					socket.emit("sessionInitialized", sessionId);
				},

				/**
				 * Broadcast a file-watcher event to Socket.IO clients on "/" and, when
				 * `statsContext` is given, trigger a library-statistics refresh.
				 * A failed refresh is logged and does not fail the event handler.
				 */
				async forwardWatcherEvent(opts: {
					description: string;
					event: string;
					path: string;
					statsContext?: string;
				}) {
					const { description, event, path, statsContext } = opts;
					console.log(`[File Watcher] ${description}: ${path}`);
					await this.broker.call("socket.broadcast", {
						namespace: "/",
						event,
						args: [
							{
								path,
								timestamp: new Date().toISOString(),
							},
						],
					});

					if (statsContext === undefined) {
						return;
					}
					try {
						await this.broker.call(
							"socket.broadcastLibraryStatistics",
							{}
						);
					} catch (err) {
						console.error(
							`Failed to emit library statistics after ${statsContext}:`,
							err
						);
					}
				},
			},

			/**
			 * Lifecycle hook: register Socket.IO connection listeners once the
			 * service has started.
			 */
			async started() {
				this.io.of("/manual").on("connection", () => {
					console.log(
						`socket.io server connected to /manual namespace`
					);
				});

				this.io.on("connection", async (socket: any) => {
					console.log(
						`socket.io server connected to client with session ID: ${socket.id}`
					);
					try {
						// Create a session if the handshake did not carry a known one;
						// otherwise this is a no-op ("resume").
						await this.ensureSession(socket);
					} catch (err) {
						// The listener is fire-and-forget: without this catch a Mongo
						// failure would surface as an unhandled promise rejection.
						this.logger.error(
							`[Socket] Failed to initialise session for socket ${socket.id}:`,
							err
						);
					}
				});
			},
		});
	}
}