🧦 Changes to socket service to support UI

This commit is contained in:
2025-05-18 20:46:37 -04:00
parent 8b8f470f52
commit 7313fc4df7

View File

@@ -33,6 +33,11 @@ export default class SocketService extends Service {
}, },
}, },
}, },
"/manual": {
events: {
call: { whitelist: ["socket.*"] },
},
},
}, },
options: { options: {
adapter: createAdapter(pubClient, subClient), adapter: createAdapter(pubClient, subClient),
@@ -62,8 +67,12 @@ export default class SocketService extends Service {
if (active > 0 || paused > 0 || waiting > 0) { if (active > 0 || paused > 0 || waiting > 0) {
// 3. Get job counts // 3. Get job counts
const completedJobCount = await pubClient.get("completedJobCount"); const completedJobCount = await pubClient.get(
const failedJobCount = await pubClient.get("failedJobCount"); "completedJobCount"
);
const failedJobCount = await pubClient.get(
"failedJobCount"
);
// 4. Send the counts to the active socket.io session // 4. Send the counts to the active socket.io session
await this.broker.call("socket.broadcast", { await this.broker.call("socket.broadcast", {
@@ -80,9 +89,14 @@ export default class SocketService extends Service {
} }
} }
} catch (err) { } catch (err) {
throw new MoleculerError(err, 500, "SESSION_ID_NOT_FOUND", { throw new MoleculerError(
data: sessionId, err,
}); 500,
"SESSION_ID_NOT_FOUND",
{
data: sessionId,
}
);
} }
}, },
@@ -93,7 +107,11 @@ export default class SocketService extends Service {
}> }>
) => { ) => {
const { queueAction } = ctx.params; const { queueAction } = ctx.params;
await this.broker.call("jobqueue.toggle", { action: queueAction }, {}); await this.broker.call(
"jobqueue.toggle",
{ action: queueAction },
{}
);
}, },
importSingleIssue: async (ctx: Context<{}>) => { importSingleIssue: async (ctx: Context<{}>) => {
console.info("AirDC++ finished a download -> "); console.info("AirDC++ finished a download -> ");
@@ -112,11 +130,15 @@ export default class SocketService extends Service {
}, },
async handler(ctx) { async handler(ctx) {
const { query, config, namespace } = ctx.params; const { query, config, namespace } = ctx.params;
console.log("NAMESPACE", namespace)
const namespacedInstance = this.io.of(namespace || "/"); const namespacedInstance = this.io.of(namespace || "/");
const ADCPPSocket = new AirDCPPSocket(config); const ADCPPSocket = new AirDCPPSocket(config);
try { try {
await ADCPPSocket.connect(); await ADCPPSocket.connect();
const instance = await ADCPPSocket.post("search", query); const instance = await ADCPPSocket.post(
"search",
query
);
// Send the instance to the client // Send the instance to the client
await namespacedInstance.emit("searchInitiated", { await namespacedInstance.emit("searchInitiated", {
@@ -128,8 +150,13 @@ export default class SocketService extends Service {
`search`, `search`,
`search_result_added`, `search_result_added`,
(groupedResult) => { (groupedResult) => {
console.log(JSON.stringify(groupedResult, null, 4)); console.log(
namespacedInstance.emit("searchResultAdded", groupedResult); JSON.stringify(groupedResult, null, 4)
);
namespacedInstance.emit(
"searchResultAdded",
groupedResult
);
}, },
instance.id instance.id
); );
@@ -138,7 +165,10 @@ export default class SocketService extends Service {
`search`, `search`,
`search_result_updated`, `search_result_updated`,
(updatedResult) => { (updatedResult) => {
namespacedInstance.emit("searchResultUpdated", updatedResult); namespacedInstance.emit(
"searchResultUpdated",
updatedResult
);
}, },
instance.id instance.id
); );
@@ -148,30 +178,49 @@ export default class SocketService extends Service {
`search_hub_searches_sent`, `search_hub_searches_sent`,
async (searchInfo) => { async (searchInfo) => {
await this.sleep(5000); await this.sleep(5000);
const currentInstance = await ADCPPSocket.get( const currentInstance =
`search/${instance.id}` await ADCPPSocket.get(
); `search/${instance.id}`
);
// Send the instance to the client // Send the instance to the client
await namespacedInstance.emit("searchesSent", { await namespacedInstance.emit(
searchInfo, "searchesSent",
}); {
searchInfo,
}
);
if (currentInstance.result_count === 0) { if (currentInstance.result_count === 0) {
console.log("No more search results."); console.log("No more search results.");
namespacedInstance.emit("searchComplete", { namespacedInstance.emit(
message: "No more search results.", "searchComplete",
}); {
message:
"No more search results.",
}
);
} }
}, },
instance.id instance.id
); );
// Perform the actual search // Perform the actual search
await ADCPPSocket.post(`search/${instance.id}/hub_search`, query); await ADCPPSocket.post(
`search/${instance.id}/hub_search`,
query
);
} catch (error) { } catch (error) {
await namespacedInstance.emit("searchError", error.message); await namespacedInstance.emit(
throw new MoleculerError("Search failed", 500, "SEARCH_FAILED", { "searchError",
error, error.message
}); );
throw new MoleculerError(
"Search failed",
500,
"SEARCH_FAILED",
{
error,
}
);
} finally { } finally {
// await ADCPPSocket.disconnect(); // await ADCPPSocket.disconnect();
} }
@@ -222,7 +271,10 @@ export default class SocketService extends Service {
"Download and metadata update successful", "Download and metadata update successful",
bundleDBImportResult bundleDBImportResult
); );
this.broker.emit("downloadCompleted", bundleDBImportResult); this.broker.emit(
"downloadCompleted",
bundleDBImportResult
);
return bundleDBImportResult; return bundleDBImportResult;
} else { } else {
throw new Error( throw new Error(
@@ -231,9 +283,14 @@ export default class SocketService extends Service {
} }
} catch (error) { } catch (error) {
this.broker.emit("downloadError", error.message); this.broker.emit("downloadError", error.message);
throw new MoleculerError("Download failed", 500, "DOWNLOAD_FAILED", { throw new MoleculerError(
error, "Download failed",
}); 500,
"DOWNLOAD_FAILED",
{
error,
}
);
} finally { } finally {
// await ADCPPSocket.disconnect(); // await ADCPPSocket.disconnect();
} }
@@ -253,7 +310,10 @@ export default class SocketService extends Service {
"queue", "queue",
"queue_bundle_tick", "queue_bundle_tick",
(tickData) => { (tickData) => {
console.log("Received tick data: ", tickData); console.log(
"Received tick data: ",
tickData
);
this.io.emit("bundleTickUpdate", tickData); this.io.emit("bundleTickUpdate", tickData);
}, },
null null
@@ -272,8 +332,33 @@ export default class SocketService extends Service {
sleep: (ms: number): Promise<NodeJS.Timeout> => { sleep: (ms: number): Promise<NodeJS.Timeout> => {
return new Promise((resolve) => setTimeout(resolve, ms)); return new Promise((resolve) => setTimeout(resolve, ms));
}, },
handleSocketConnection: async function (socket: any) {
this.logger.info(`Socket connected with session ID: ${socket.id}`);
console.log("Looking up sessionId in Mongo...");
const sessionIdExists = await Session.find({
sessionId: socket.handshake.query.sessionId,
});
if (sessionIdExists.length === 0) {
console.log(`Socket Id ${socket.id} not found in Mongo, creating a new session...`);
const sessionId = uuidv4();
socket.sessionId = sessionId;
console.log(`Saving session ${sessionId} to Mongo...`);
await Session.create({
sessionId,
socketId: socket.id,
});
socket.emit("sessionInitialized", sessionId);
} else {
console.log(`Found socketId ${socket.id}, no-op.`);
}
}
}, },
async started() { async started() {
this.io.of("/manual").on("connection", async (socket) => {
console.log(`socket.io server connected to /manual namespace`);
})
this.io.on("connection", async (socket) => { this.io.on("connection", async (socket) => {
console.log( console.log(
`socket.io server connected to client with session ID: ${socket.id}` `socket.io server connected to client with session ID: ${socket.id}`