diff --git a/services/socket.service.ts b/services/socket.service.ts index d7d704d..04bc726 100644 --- a/services/socket.service.ts +++ b/services/socket.service.ts @@ -33,6 +33,11 @@ export default class SocketService extends Service { }, }, }, + "/manual": { + events: { + call: { whitelist: ["socket.*"] }, + }, + }, }, options: { adapter: createAdapter(pubClient, subClient), @@ -62,8 +67,12 @@ export default class SocketService extends Service { if (active > 0 || paused > 0 || waiting > 0) { // 3. Get job counts - const completedJobCount = await pubClient.get("completedJobCount"); - const failedJobCount = await pubClient.get("failedJobCount"); + const completedJobCount = await pubClient.get( + "completedJobCount" + ); + const failedJobCount = await pubClient.get( + "failedJobCount" + ); // 4. Send the counts to the active socket.io session await this.broker.call("socket.broadcast", { @@ -80,9 +89,14 @@ export default class SocketService extends Service { } } } catch (err) { - throw new MoleculerError(err, 500, "SESSION_ID_NOT_FOUND", { - data: sessionId, - }); + throw new MoleculerError( + err, + 500, + "SESSION_ID_NOT_FOUND", + { + data: sessionId, + } + ); } }, @@ -93,7 +107,11 @@ export default class SocketService extends Service { }> ) => { const { queueAction } = ctx.params; - await this.broker.call("jobqueue.toggle", { action: queueAction }, {}); + await this.broker.call( + "jobqueue.toggle", + { action: queueAction }, + {} + ); }, importSingleIssue: async (ctx: Context<{}>) => { console.info("AirDC++ finished a download -> "); @@ -112,11 +130,15 @@ export default class SocketService extends Service { }, async handler(ctx) { const { query, config, namespace } = ctx.params; + console.log("NAMESPACE", namespace) const namespacedInstance = this.io.of(namespace || "/"); const ADCPPSocket = new AirDCPPSocket(config); try { await ADCPPSocket.connect(); - const instance = await ADCPPSocket.post("search", query); + const instance = await ADCPPSocket.post( + "search", + query + ); // Send the instance to the client await namespacedInstance.emit("searchInitiated", { @@ -128,8 +150,13 @@ export default class SocketService extends Service { `search`, `search_result_added`, (groupedResult) => { - console.log(JSON.stringify(groupedResult, null, 4)); - namespacedInstance.emit("searchResultAdded", groupedResult); + console.log( + JSON.stringify(groupedResult, null, 4) + ); + namespacedInstance.emit( + "searchResultAdded", + groupedResult + ); }, instance.id ); @@ -138,7 +165,10 @@ export default class SocketService extends Service { `search`, `search_result_updated`, (updatedResult) => { - namespacedInstance.emit("searchResultUpdated", updatedResult); + namespacedInstance.emit( + "searchResultUpdated", + updatedResult + ); }, instance.id ); @@ -148,30 +178,49 @@ export default class SocketService extends Service { `search_hub_searches_sent`, async (searchInfo) => { await this.sleep(5000); - const currentInstance = await ADCPPSocket.get( - `search/${instance.id}` - ); + const currentInstance = + await ADCPPSocket.get( + `search/${instance.id}` + ); // Send the instance to the client - await namespacedInstance.emit("searchesSent", { - searchInfo, - }); + await namespacedInstance.emit( + "searchesSent", + { + searchInfo, + } + ); if (currentInstance.result_count === 0) { console.log("No more search results."); - namespacedInstance.emit("searchComplete", { - message: "No more search results.", - }); + namespacedInstance.emit( + "searchComplete", + { + message: + "No more search results.", + } + ); } }, instance.id ); // Perform the actual search - await ADCPPSocket.post(`search/${instance.id}/hub_search`, query); + await ADCPPSocket.post( + `search/${instance.id}/hub_search`, + query + ); } catch (error) { - await namespacedInstance.emit("searchError", error.message); - throw new MoleculerError("Search failed", 500, "SEARCH_FAILED", { - error, - }); + await namespacedInstance.emit( + "searchError", + error.message + ); + throw new MoleculerError( + "Search failed", + 500, + "SEARCH_FAILED", + { + error, + } + ); } finally { // await ADCPPSocket.disconnect(); } @@ -222,7 +271,10 @@ export default class SocketService extends Service { "Download and metadata update successful", bundleDBImportResult ); - this.broker.emit("downloadCompleted", bundleDBImportResult); + this.broker.emit( + "downloadCompleted", + bundleDBImportResult + ); return bundleDBImportResult; } else { throw new Error( @@ -231,9 +283,14 @@ export default class SocketService extends Service { } } catch (error) { this.broker.emit("downloadError", error.message); - throw new MoleculerError("Download failed", 500, "DOWNLOAD_FAILED", { - error, - }); + throw new MoleculerError( + "Download failed", + 500, + "DOWNLOAD_FAILED", + { + error, + } + ); } finally { // await ADCPPSocket.disconnect(); } @@ -253,7 +310,10 @@ export default class SocketService extends Service { "queue", "queue_bundle_tick", (tickData) => { - console.log("Received tick data: ", tickData); + console.log( + "Received tick data: ", + tickData + ); this.io.emit("bundleTickUpdate", tickData); }, null @@ -272,8 +332,33 @@ export default class SocketService extends Service { sleep: (ms: number): Promise => { return new Promise((resolve) => setTimeout(resolve, ms)); }, + handleSocketConnection: async function (socket: any) { + this.logger.info(`Socket connected with session ID: ${socket.id}`); + console.log("Looking up sessionId in Mongo..."); + + const sessionIdExists = await Session.find({ + sessionId: socket.handshake.query.sessionId, + }); + + if (sessionIdExists.length === 0) { + console.log(`Socket Id ${socket.id} not found in Mongo, creating a new session...`); + const sessionId = uuidv4(); + socket.sessionId = sessionId; + console.log(`Saving session ${sessionId} to Mongo...`); + await Session.create({ + sessionId, + socketId: socket.id, + }); + socket.emit("sessionInitialized", sessionId); + } else { + console.log(`Found socketId ${socket.id}, no-op.`); + } + } }, async started() { + this.io.of("/manual").on("connection", async (socket) => { + console.log(`socket.io server connected to /manual namespace`); + }) this.io.on("connection", async (socket) => { console.log( `socket.io server connected to client with session ID: ${socket.id}`