🔧 Tooling for resumable socket.io sessions
This commit is contained in:
@@ -1,5 +1,4 @@
|
|||||||
const mongoose = require("mongoose");
|
const mongoose = require("mongoose");
|
||||||
const paginate = require("mongoose-paginate-v2");
|
|
||||||
|
|
||||||
const JobResultScehma = mongoose.Schema({
|
const JobResultScehma = mongoose.Schema({
|
||||||
id: Number,
|
id: Number,
|
||||||
|
|||||||
9
models/session.model.ts
Normal file
9
models/session.model.ts
Normal file
@@ -0,0 +1,9 @@
|
|||||||
|
const mongoose = require("mongoose");
|
||||||
|
|
||||||
|
const SessionScehma = mongoose.Schema({
|
||||||
|
sessionId: String,
|
||||||
|
socketId: String,
|
||||||
|
});
|
||||||
|
|
||||||
|
const Session = mongoose.model("Session", SessionScehma);
|
||||||
|
export default Session;
|
||||||
3994
package-lock.json
generated
3994
package-lock.json
generated
File diff suppressed because it is too large
Load Diff
@@ -36,7 +36,8 @@
|
|||||||
"npm": "^8.4.1",
|
"npm": "^8.4.1",
|
||||||
"ts-jest": "^29.0.5",
|
"ts-jest": "^29.0.5",
|
||||||
"ts-node": "^10.9.1",
|
"ts-node": "^10.9.1",
|
||||||
"typescript": "^5.0.2"
|
"typescript": "^5.0.2",
|
||||||
|
"uuid": "^9.0.0"
|
||||||
},
|
},
|
||||||
"dependencies": {
|
"dependencies": {
|
||||||
"@bluelovers/fast-glob": "https://github.com/rishighan/fast-glob-v2-api.git",
|
"@bluelovers/fast-glob": "https://github.com/rishighan/fast-glob-v2-api.git",
|
||||||
@@ -66,7 +67,6 @@
|
|||||||
"leven": "^3.1.0",
|
"leven": "^3.1.0",
|
||||||
"lodash": "^4.17.21",
|
"lodash": "^4.17.21",
|
||||||
"mkdirp": "^0.5.5",
|
"mkdirp": "^0.5.5",
|
||||||
"moleculer-bull": "github:rishighan/moleculer-bull#1.0.0",
|
|
||||||
"moleculer-db": "^0.8.23",
|
"moleculer-db": "^0.8.23",
|
||||||
"moleculer-db-adapter-mongoose": "^0.9.2",
|
"moleculer-db-adapter-mongoose": "^0.9.2",
|
||||||
"moleculer-io": "^2.2.0",
|
"moleculer-io": "^2.2.0",
|
||||||
|
|||||||
@@ -4,14 +4,14 @@ import {
|
|||||||
ServiceBroker,
|
ServiceBroker,
|
||||||
ServiceSchema,
|
ServiceSchema,
|
||||||
} from "moleculer";
|
} from "moleculer";
|
||||||
const MoleculerError = require("moleculer").Errors;
|
const { MoleculerError } = require("moleculer").Errors;
|
||||||
import JobResult from "../models/jobresult.model";
|
import JobResult from "../models/jobresult.model";
|
||||||
import { refineQuery } from "filename-parser";
|
import { refineQuery } from "filename-parser";
|
||||||
import BullMqMixin, { BullMQAdapter, Queue } from 'moleculer-bullmq';
|
import BullMqMixin, { BullMQAdapter, Queue } from 'moleculer-bullmq';
|
||||||
import { extractFromArchive } from "../utils/uncompression.utils";
|
import { extractFromArchive } from "../utils/uncompression.utils";
|
||||||
import { isNil, isUndefined } from "lodash";
|
import { isNil, isUndefined } from "lodash";
|
||||||
|
|
||||||
|
console.log(process.env.REDIS_URI);
|
||||||
export default class JobQueueService extends Service {
|
export default class JobQueueService extends Service {
|
||||||
public constructor(public broker: ServiceBroker) {
|
public constructor(public broker: ServiceBroker) {
|
||||||
super(broker);
|
super(broker);
|
||||||
@@ -150,7 +150,7 @@ export default class JobQueueService extends Service {
|
|||||||
};
|
};
|
||||||
} catch (error) {
|
} catch (error) {
|
||||||
console.error(`An error occurred processing Job ID ${ctx.locals.job.id}`);
|
console.error(`An error occurred processing Job ID ${ctx.locals.job.id}`);
|
||||||
throw new MoleculerError(error, 500, "IMPORT_JOB_ERROR", ctx.params.socketSessionId);
|
throw new MoleculerError(error, 500, "IMPORT_JOB_ERROR", {data: ctx.params.socketSessionId});
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
},
|
},
|
||||||
|
|||||||
@@ -154,7 +154,6 @@ export default class ImportService extends Service {
|
|||||||
async handler(
|
async handler(
|
||||||
ctx: Context<{
|
ctx: Context<{
|
||||||
extractionOptions?: any;
|
extractionOptions?: any;
|
||||||
socketSessionId: String,
|
|
||||||
}>
|
}>
|
||||||
) {
|
) {
|
||||||
try {
|
try {
|
||||||
@@ -204,7 +203,6 @@ export default class ImportService extends Service {
|
|||||||
fileSize: item.stats.size,
|
fileSize: item.stats.size,
|
||||||
},
|
},
|
||||||
importType: "new",
|
importType: "new",
|
||||||
socketSessionId: ctx.params.socketSessionId,
|
|
||||||
});
|
});
|
||||||
} else {
|
} else {
|
||||||
console.log(
|
console.log(
|
||||||
|
|||||||
@@ -2,8 +2,9 @@
|
|||||||
import { Service, ServiceBroker, ServiceSchema, Context } from "moleculer";
|
import { Service, ServiceBroker, ServiceSchema, Context } from "moleculer";
|
||||||
import { createClient } from "redis";
|
import { createClient } from "redis";
|
||||||
import { createAdapter } from "@socket.io/redis-adapter";
|
import { createAdapter } from "@socket.io/redis-adapter";
|
||||||
|
import Session from "../models/session.model";
|
||||||
const SocketIOService = require("moleculer-io");
|
const SocketIOService = require("moleculer-io");
|
||||||
|
const { v4: uuidv4 } = require("uuid");
|
||||||
const redisURL = new URL(process.env.REDIS_URI);
|
const redisURL = new URL(process.env.REDIS_URI);
|
||||||
const pubClient = createClient({ url: `redis://${redisURL.hostname}:6379` });
|
const pubClient = createClient({ url: `redis://${redisURL.hostname}:6379` });
|
||||||
(async () => {
|
(async () => {
|
||||||
@@ -33,6 +34,11 @@ export default class SocketService extends Service {
|
|||||||
},
|
},
|
||||||
action: async (data) => {
|
action: async (data) => {
|
||||||
switch (data.type) {
|
switch (data.type) {
|
||||||
|
case "RESUME_SESSION":
|
||||||
|
console.log("Attempting to resume session...")
|
||||||
|
|
||||||
|
break;
|
||||||
|
|
||||||
case "LS_IMPORT":
|
case "LS_IMPORT":
|
||||||
console.log(
|
console.log(
|
||||||
`Recieved ${data.type} event.`
|
`Recieved ${data.type} event.`
|
||||||
@@ -85,10 +91,23 @@ export default class SocketService extends Service {
|
|||||||
|
|
||||||
},
|
},
|
||||||
async started() {
|
async started() {
|
||||||
this.io.on("connection", (socket) => {
|
this.io.on("connection", async (socket) => {
|
||||||
console.log(`socket.io server initialized with session ID: ${socket.id}`);
|
console.log(socket);
|
||||||
socket.emit("sessionId", socket.id);
|
console.log(`socket.io server connected to client with session ID: ${socket.id}`);
|
||||||
socketSessionId = socket.id;
|
console.log("Looking up sessionId in Mongo...");
|
||||||
|
const sessionIdExists = await Session.find({ sessionId: socket.handshake.query.sessionId });
|
||||||
|
if(sessionIdExists.length === 0) {
|
||||||
|
console.log(`Socket Id ${socket.id} not found in Mongo, creating a new session...`);
|
||||||
|
const sessionId = uuidv4();
|
||||||
|
socket.sessionId = sessionId;
|
||||||
|
console.log(`Saving session ${sessionId} to Mongo...`);
|
||||||
|
await Session.create({
|
||||||
|
sessionId,
|
||||||
|
socketId: socket.id,
|
||||||
|
});
|
||||||
|
socket.emit("sessionInitialized", sessionId);
|
||||||
|
}
|
||||||
|
|
||||||
});
|
});
|
||||||
},
|
},
|
||||||
});
|
});
|
||||||
|
|||||||
Reference in New Issue
Block a user