🔌 reworked socket.io freeform with moleculer-io
This commit is contained in:
@@ -1,19 +1,16 @@
|
||||
import { Service, ServiceBroker, Context } from "moleculer";
|
||||
import ApiGateway from "moleculer-web";
|
||||
import chokidar from "chokidar";
|
||||
import path from "path";
|
||||
import fs from "fs";
|
||||
import { IExtractionOptions, IFolderData } from "threetwo-ui-typings";
|
||||
import { SocketIOMixin } from "../mixins/socket.io.mixin";
|
||||
import { debounce } from "lodash";
|
||||
export const io = SocketIOMixin();
|
||||
import { Service, ServiceBroker } from "moleculer";
|
||||
import ApiGateway from "moleculer-web";
|
||||
import path from "path";
|
||||
import { IFolderData } from "threetwo-ui-typings";
|
||||
|
||||
export default class ApiService extends Service {
|
||||
public constructor(broker: ServiceBroker) {
|
||||
super(broker);
|
||||
this.parseServiceSchema({
|
||||
name: "api",
|
||||
mixins: [ApiGateway, SocketIOMixin],
|
||||
mixins: [ApiGateway],
|
||||
// More info about settings: https://moleculer.services/docs/0.14/moleculer-web.html
|
||||
settings: {
|
||||
port: process.env.PORT || 3000,
|
||||
@@ -81,148 +78,114 @@ export default class ApiService extends Service {
|
||||
},
|
||||
},
|
||||
events: {
|
||||
"**"(payload, sender, event) {
|
||||
if (io)
|
||||
io.emit("event", {
|
||||
sender,
|
||||
event,
|
||||
payload,
|
||||
});
|
||||
},
|
||||
|
||||
},
|
||||
|
||||
methods: {},
|
||||
started(): any {
|
||||
// Add a connect listener
|
||||
io.on("connection", (client) => {
|
||||
console.log("Client connected via websocket!");
|
||||
|
||||
client.on("action", async (action) => {
|
||||
switch (action.type) {
|
||||
case "LS_IMPORT":
|
||||
// 1. Send task to queue
|
||||
console.log(`Recieved ${action.type} event.`);
|
||||
await this.broker.call(
|
||||
"library.newImport",
|
||||
action.data,
|
||||
{}
|
||||
);
|
||||
break;
|
||||
case "LS_TOGGLE_IMPORT_QUEUE":
|
||||
await this.broker.call(
|
||||
"importqueue.toggleImportQueue",
|
||||
action.data,
|
||||
{}
|
||||
);
|
||||
break;
|
||||
}
|
||||
});
|
||||
// Add a disconnect listener
|
||||
client.on("disconnect", () => {
|
||||
console.log("Client disconnected");
|
||||
});
|
||||
|
||||
// Filewatcher
|
||||
const fileWatcher = chokidar.watch(
|
||||
path.resolve("/comics"),
|
||||
{
|
||||
ignored: (filePath) =>
|
||||
path.extname(filePath) === ".dctmp",
|
||||
persistent: true,
|
||||
usePolling: true,
|
||||
interval: 5000,
|
||||
ignoreInitial: true,
|
||||
followSymlinks: true,
|
||||
atomic: true,
|
||||
awaitWriteFinish: {
|
||||
stabilityThreshold: 2000,
|
||||
pollInterval: 100,
|
||||
},
|
||||
}
|
||||
);
|
||||
const fileCopyDelaySeconds = 3;
|
||||
const checkEnd = (path, prev) => {
|
||||
fs.stat(path, async (err, stat) => {
|
||||
// Filewatcher
|
||||
const fileWatcher = chokidar.watch(
|
||||
path.resolve("/comics"),
|
||||
{
|
||||
ignored: (filePath) =>
|
||||
path.extname(filePath) === ".dctmp",
|
||||
persistent: true,
|
||||
usePolling: true,
|
||||
interval: 5000,
|
||||
ignoreInitial: true,
|
||||
followSymlinks: true,
|
||||
atomic: true,
|
||||
awaitWriteFinish: {
|
||||
stabilityThreshold: 2000,
|
||||
pollInterval: 100,
|
||||
},
|
||||
}
|
||||
);
|
||||
const fileCopyDelaySeconds = 3;
|
||||
const checkEnd = (path, prev) => {
|
||||
fs.stat(path, async (err, stat) => {
|
||||
// Replace error checking with something appropriate for your app.
|
||||
if (err) throw err;
|
||||
if (stat.mtime.getTime() === prev.mtime.getTime()) {
|
||||
console.log("finished");
|
||||
// Move on: call whatever needs to be called to process the file.
|
||||
console.log(
|
||||
"File detected, starting import..."
|
||||
);
|
||||
const walkedFolder: IFolderData =
|
||||
await broker.call("library.walkFolders", {
|
||||
basePathToWalk: path,
|
||||
});
|
||||
await this.broker.call(
|
||||
"importqueue.processImport",
|
||||
{
|
||||
fileObject: {
|
||||
filePath: path,
|
||||
fileSize: walkedFolder[0].fileSize,
|
||||
},
|
||||
}
|
||||
);
|
||||
} else
|
||||
setTimeout(
|
||||
checkEnd,
|
||||
fileCopyDelaySeconds,
|
||||
path,
|
||||
stat
|
||||
);
|
||||
});
|
||||
};
|
||||
|
||||
fileWatcher
|
||||
.once("add", (path, stats) => {
|
||||
console.log("Watcher detected new files.");
|
||||
console.log(
|
||||
`File ${path} has been added with stats: ${JSON.stringify(
|
||||
stats,
|
||||
null,
|
||||
2
|
||||
)}`
|
||||
);
|
||||
|
||||
console.log("File", path, "has been added");
|
||||
|
||||
fs.stat(path, function(err, stat) {
|
||||
// Replace error checking with something appropriate for your app.
|
||||
if (err) throw err;
|
||||
if (stat.mtime.getTime() === prev.mtime.getTime()) {
|
||||
console.log("finished");
|
||||
// Move on: call whatever needs to be called to process the file.
|
||||
console.log(
|
||||
"File detected, starting import..."
|
||||
);
|
||||
const walkedFolder: IFolderData =
|
||||
await broker.call("library.walkFolders", {
|
||||
basePathToWalk: path,
|
||||
});
|
||||
await this.broker.call(
|
||||
"importqueue.processImport",
|
||||
{
|
||||
fileObject: {
|
||||
filePath: path,
|
||||
fileSize: walkedFolder[0].fileSize,
|
||||
},
|
||||
}
|
||||
);
|
||||
} else
|
||||
setTimeout(
|
||||
checkEnd,
|
||||
fileCopyDelaySeconds,
|
||||
path,
|
||||
stat
|
||||
);
|
||||
});
|
||||
};
|
||||
|
||||
fileWatcher
|
||||
.once("add", (path, stats) => {
|
||||
console.log("Watcher detected new files.");
|
||||
console.log(
|
||||
`File ${path} has been added with stats: ${JSON.stringify(
|
||||
stats,
|
||||
null,
|
||||
2
|
||||
)}`
|
||||
setTimeout(
|
||||
checkEnd,
|
||||
fileCopyDelaySeconds,
|
||||
path,
|
||||
stat
|
||||
);
|
||||
});
|
||||
})
|
||||
// .once(
|
||||
// "change",
|
||||
|
||||
console.log("File", path, "has been added");
|
||||
// (path, stats) =>
|
||||
// console.log(
|
||||
// `File ${path} has been changed. Stats: ${JSON.stringify(
|
||||
// stats,
|
||||
// null,
|
||||
// 2
|
||||
// )}`
|
||||
// )
|
||||
// )
|
||||
.once(
|
||||
"unlink",
|
||||
|
||||
fs.stat(path, function (err, stat) {
|
||||
// Replace error checking with something appropriate for your app.
|
||||
if (err) throw err;
|
||||
setTimeout(
|
||||
checkEnd,
|
||||
fileCopyDelaySeconds,
|
||||
path,
|
||||
stat
|
||||
);
|
||||
});
|
||||
})
|
||||
// .once(
|
||||
// "change",
|
||||
(path) =>
|
||||
console.log(`File ${path} has been removed`)
|
||||
)
|
||||
.once(
|
||||
"addDir",
|
||||
|
||||
// (path, stats) =>
|
||||
// console.log(
|
||||
// `File ${path} has been changed. Stats: ${JSON.stringify(
|
||||
// stats,
|
||||
// null,
|
||||
// 2
|
||||
// )}`
|
||||
// )
|
||||
// )
|
||||
.once(
|
||||
"unlink",
|
||||
(path) =>
|
||||
console.log(`Directory ${path} has been added`)
|
||||
);
|
||||
|
||||
(path) =>
|
||||
console.log(`File ${path} has been removed`)
|
||||
)
|
||||
.once(
|
||||
"addDir",
|
||||
|
||||
(path) =>
|
||||
console.log(`Directory ${path} has been added`)
|
||||
);
|
||||
});
|
||||
},
|
||||
});
|
||||
}
|
||||
|
||||
@@ -23,7 +23,7 @@ FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
|
||||
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
|
||||
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
|
||||
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
|
||||
SOFTWARE.
|
||||
SOFTWARE.
|
||||
*/
|
||||
|
||||
/*
|
||||
@@ -33,24 +33,21 @@ SOFTWARE.
|
||||
|
||||
"use strict";
|
||||
|
||||
import { refineQuery } from "filename-parser";
|
||||
import {
|
||||
Context,
|
||||
Service,
|
||||
ServiceBroker,
|
||||
ServiceSchema,
|
||||
Errors,
|
||||
ServiceSchema
|
||||
} from "moleculer";
|
||||
|
||||
import BullMQMixin from "moleculer-bull";
|
||||
import { SandboxedJob } from "moleculer-bull";
|
||||
import BullMQMixin, { SandboxedJob } from "moleculer-bull";
|
||||
import { DbMixin } from "../mixins/db.mixin";
|
||||
import Comic from "../models/comic.model";
|
||||
import { extractFromArchive } from "../utils/uncompression.utils";
|
||||
import { refineQuery } from "filename-parser";
|
||||
import { io } from "./api.service";
|
||||
import { USERDATA_DIRECTORY } from "../constants/directories";
|
||||
import { IExtractedComicBookCoverFile } from "threetwo-ui-typings";
|
||||
|
||||
const REDIS_URI = process.env.REDIS_URI || `redis://localhost:6379`;
|
||||
const EventEmitter = require("events");
|
||||
EventEmitter.defaultMaxListeners = 20;
|
||||
|
||||
console.log(`REDIS -> ${REDIS_URI}`);
|
||||
export default class QueueService extends Service {
|
||||
@@ -59,9 +56,13 @@ export default class QueueService extends Service {
|
||||
schema: ServiceSchema<{}> = { name: "importqueue" }
|
||||
) {
|
||||
super(broker);
|
||||
console.log(this.io);
|
||||
this.parseServiceSchema({
|
||||
name: "importqueue",
|
||||
mixins: [BullMQMixin(REDIS_URI), DbMixin("comics", Comic)],
|
||||
mixins: [
|
||||
BullMQMixin(REDIS_URI),
|
||||
DbMixin("comics", Comic),
|
||||
],
|
||||
settings: {},
|
||||
hooks: {},
|
||||
queues: {
|
||||
@@ -75,7 +76,7 @@ export default class QueueService extends Service {
|
||||
const result = await extractFromArchive(
|
||||
job.data.fileObject.filePath
|
||||
);
|
||||
|
||||
|
||||
const {
|
||||
name,
|
||||
filePath,
|
||||
@@ -177,45 +178,42 @@ export default class QueueService extends Service {
|
||||
unarchiveComicBook: {
|
||||
rest: "POST /unarchiveComicBook",
|
||||
params: {},
|
||||
handler: async (ctx: Context<{}>) => {},
|
||||
handler: async (ctx: Context<{}>) => { },
|
||||
},
|
||||
},
|
||||
methods: {},
|
||||
async started(): Promise<any> {
|
||||
io.on("connection", async (client) => {
|
||||
await this.getQueue("process.import").on(
|
||||
"failed",
|
||||
async (job, error) => {
|
||||
console.error(
|
||||
`An error occured in 'process.import' queue on job id '${job.id}': ${error.message}`
|
||||
);
|
||||
console.error(job.data);
|
||||
}
|
||||
);
|
||||
await this.getQueue("process.import").on(
|
||||
"completed",
|
||||
async (job, res) => {
|
||||
client.emit("action", {
|
||||
type: "LS_COVER_EXTRACTED",
|
||||
result: res,
|
||||
});
|
||||
console.info(
|
||||
`Job with the id '${job.id}' completed.`
|
||||
);
|
||||
}
|
||||
);
|
||||
await this.getQueue("process.import").on(
|
||||
"stalled",
|
||||
async (job) => {
|
||||
console.warn(
|
||||
`The job with the id '${job.id} got stalled!`
|
||||
);
|
||||
console.log(`${JSON.stringify(job, null, 2)}`);
|
||||
console.log(`is stalled.`);
|
||||
}
|
||||
);
|
||||
await this.getQueue("process.import").on(
|
||||
"failed",
|
||||
async (job, error) => {
|
||||
console.error(
|
||||
`An error occured in 'process.import' queue on job id '${job.id}': ${error.message}`
|
||||
);
|
||||
console.error(job.data);
|
||||
}
|
||||
);
|
||||
await this.getQueue("process.import").on(
|
||||
"completed",
|
||||
async (job, res) => {
|
||||
await this.broker.call('socket.broadcast', {
|
||||
namespace: '/', //optional
|
||||
event: "action",
|
||||
args: [{ type: "LS_COVER_EXTRACTED", result: res }], //optional
|
||||
|
||||
});
|
||||
})
|
||||
console.info(`Job with the id '${job.id}' completed.`);
|
||||
}
|
||||
);
|
||||
await this.getQueue("process.import").on(
|
||||
"stalled",
|
||||
async (job) => {
|
||||
console.warn(
|
||||
`The job with the id '${job.id} got stalled!`
|
||||
);
|
||||
console.log(`${JSON.stringify(job, null, 2)}`);
|
||||
console.log(`is stalled.`);
|
||||
}
|
||||
);
|
||||
},
|
||||
});
|
||||
}
|
||||
|
||||
78
services/socket.service.ts
Normal file
78
services/socket.service.ts
Normal file
@@ -0,0 +1,78 @@
|
||||
"use strict";
|
||||
import {
|
||||
Service,
|
||||
ServiceBroker,
|
||||
ServiceSchema
|
||||
} from "moleculer";
|
||||
const redisAdapter = require("socket.io-redis");
|
||||
const SocketIOService = require("moleculer-io");
|
||||
|
||||
export default class SocketService extends Service {
|
||||
// @ts-ignore
|
||||
public constructor(
|
||||
public broker: ServiceBroker,
|
||||
schema: ServiceSchema<{}> = { name: "socket" }
|
||||
) {
|
||||
super(broker);
|
||||
this.parseServiceSchema(
|
||||
Service.mergeSchemas(
|
||||
{
|
||||
name: "socket",
|
||||
mixins: [SocketIOService],
|
||||
settings: {
|
||||
port: process.env.PORT || 3001,
|
||||
io: {
|
||||
namespaces: {
|
||||
"/": {
|
||||
events: {
|
||||
call: {
|
||||
// whitelist: ["math.*", "say.*", "accounts.*", "rooms.*", "io.*"],
|
||||
},
|
||||
action: async (data, ack) => {
|
||||
// write your handler function here.
|
||||
console.log(
|
||||
JSON.stringify(data, null, 2)
|
||||
);
|
||||
|
||||
switch (data.type) {
|
||||
case "LS_IMPORT":
|
||||
console.log(
|
||||
`Recieved ${data.type} event.`
|
||||
);
|
||||
// 1. Send task to queue
|
||||
await this.broker.call(
|
||||
"library.newImport",
|
||||
data.data,
|
||||
{}
|
||||
);
|
||||
break;
|
||||
case "LS_TOGGLE_IMPORT_QUEUE":
|
||||
await this.broker.call(
|
||||
"importqueue.toggleImportQueue",
|
||||
data.data,
|
||||
{}
|
||||
);
|
||||
break;
|
||||
|
||||
}
|
||||
}
|
||||
},
|
||||
},
|
||||
},
|
||||
options: {
|
||||
adapter: redisAdapter({ host: 'localhost', port: 6379 }),
|
||||
},
|
||||
},
|
||||
},
|
||||
hooks: {},
|
||||
actions: {},
|
||||
methods: {},
|
||||
async started() {
|
||||
this.io.on("connection", (data) => console.log("Connected to socket.io server."))
|
||||
}
|
||||
},
|
||||
schema
|
||||
)
|
||||
);
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user