🚥 Socket.io integration WIP

This commit is contained in:
2021-05-27 10:12:00 -07:00
parent 1fb44c31f0
commit ef4e1f75b0
4 changed files with 164 additions and 68 deletions

42
package-lock.json generated
View File

@@ -2224,6 +2224,11 @@
"delayed-stream": "~1.0.0" "delayed-stream": "~1.0.0"
} }
}, },
"component-bind": {
"version": "1.0.0",
"resolved": "https://registry.npmjs.org/component-bind/-/component-bind-1.0.0.tgz",
"integrity": "sha1-AMYIq33Nk4l8AAllGx06jh5zu9E="
},
"component-emitter": { "component-emitter": {
"version": "1.3.0", "version": "1.3.0",
"resolved": "https://registry.npmjs.org/component-emitter/-/component-emitter-1.3.0.tgz", "resolved": "https://registry.npmjs.org/component-emitter/-/component-emitter-1.3.0.tgz",
@@ -5697,6 +5702,19 @@
"integrity": "sha1-nbe1lJatPzz+8wp1FC0tkwrXJlE=", "integrity": "sha1-nbe1lJatPzz+8wp1FC0tkwrXJlE=",
"dev": true "dev": true
}, },
"json-stream-stringify": {
"version": "2.0.3",
"resolved": "https://registry.npmjs.org/json-stream-stringify/-/json-stream-stringify-2.0.3.tgz",
"integrity": "sha512-Ry+1rZE1YVKlCMG1emCiReP/OfZrFrEGZn6TC7NPpIGO9NXf0KEqqBL0flgt2J59EiRPv+CKi7S7v31RAHrdXw=="
},
"json-streamify": {
"version": "0.1.4",
"resolved": "https://registry.npmjs.org/json-streamify/-/json-streamify-0.1.4.tgz",
"integrity": "sha1-HBgHSnAJv3vS0hRDzEJbmGTNCJ0=",
"requires": {
"traverse": ">=0.2.6"
}
},
"json-stringify-safe": { "json-stringify-safe": {
"version": "5.0.1", "version": "5.0.1",
"resolved": "https://registry.npmjs.org/json-stringify-safe/-/json-stringify-safe-5.0.1.tgz", "resolved": "https://registry.npmjs.org/json-stringify-safe/-/json-stringify-safe-5.0.1.tgz",
@@ -8257,6 +8275,30 @@
"debug": "~4.3.1" "debug": "~4.3.1"
} }
}, },
"socket.io-stream": {
"version": "0.9.1",
"resolved": "https://registry.npmjs.org/socket.io-stream/-/socket.io-stream-0.9.1.tgz",
"integrity": "sha1-QhJYMWKIuDrGk7DUPv0J1tQ6upc=",
"requires": {
"component-bind": "~1.0.0",
"debug": "~2.2.0"
},
"dependencies": {
"debug": {
"version": "2.2.0",
"resolved": "https://registry.npmjs.org/debug/-/debug-2.2.0.tgz",
"integrity": "sha1-+HBX6ZWxofauaklgZkE3vFbwOdo=",
"requires": {
"ms": "0.7.1"
}
},
"ms": {
"version": "0.7.1",
"resolved": "https://registry.npmjs.org/ms/-/ms-0.7.1.tgz",
"integrity": "sha1-nNE8A62/8ltl7/3nzoZO6VIBcJg="
}
}
},
"sonic-boom": { "sonic-boom": {
"version": "1.4.1", "version": "1.4.1",
"resolved": "https://registry.npmjs.org/sonic-boom/-/sonic-boom-1.4.1.tgz", "resolved": "https://registry.npmjs.org/sonic-boom/-/sonic-boom-1.4.1.tgz",

View File

@@ -42,6 +42,8 @@
"fs-extra": "^10.0.0", "fs-extra": "^10.0.0",
"highland": "^2.13.5", "highland": "^2.13.5",
"highland-json": "^1.4.1", "highland-json": "^1.4.1",
"json-stream-stringify": "^2.0.3",
"json-streamify": "^0.1.4",
"lodash": "^4.17.21", "lodash": "^4.17.21",
"moleculer": "^0.14.0", "moleculer": "^0.14.0",
"moleculer-db": "^0.8.4", "moleculer-db": "^0.8.4",
@@ -57,6 +59,7 @@
"pino-pretty": "^4.7.1", "pino-pretty": "^4.7.1",
"sharp": "^0.28.1", "sharp": "^0.28.1",
"socket.io": "^4.1.1", "socket.io": "^4.1.1",
"socket.io-stream": "^0.9.1",
"stream-chain": "^2.2.4", "stream-chain": "^2.2.4",
"stream-json": "^1.7.1", "stream-json": "^1.7.1",
"through2": "^4.0.2", "through2": "^4.0.2",

View File

@@ -7,6 +7,10 @@ import { getCovers, extractArchive } from "../utils/uncompression.utils";
import { map, flatten } from "lodash"; import { map, flatten } from "lodash";
import JSONStream from "JSONStream"; import JSONStream from "JSONStream";
const IO = require("socket.io")(); const IO = require("socket.io")();
const ss = require("socket.io-stream");
const JsonStreamStringify = require("json-stream-stringify");
import axios from "axios";
const { Writable, Readable } = require("stream");
export default class ApiService extends Service { export default class ApiService extends Service {
public constructor(broker: ServiceBroker) { public constructor(broker: ServiceBroker) {
@@ -30,7 +34,12 @@ export default class ApiService extends Service {
mergeParams: true, mergeParams: true,
autoAliases: true, autoAliases: true,
aliases: {}, aliases: {
async "POST getComicCovers"(req, res) {
const { extractionOptions, walkedFolders } =
req.body;
},
},
// Calling options. More info: https://moleculer.services/docs/0.14/moleculer-web.html#Calling-options // Calling options. More info: https://moleculer.services/docs/0.14/moleculer-web.html#Calling-options
callingOptions: {}, callingOptions: {},
@@ -86,21 +95,61 @@ export default class ApiService extends Service {
this.io.on("connection", (client) => { this.io.on("connection", (client) => {
this.logger.info("Client connected via websocket!"); this.logger.info("Client connected via websocket!");
client.on("call", ({ action, params, opts }, done) => { client.on(
this.logger.info( "call",
"Received request from client! Action:", async ({ action, params, opts }, done) => {
action, this.logger.info(
", Params:", "Received request from client! Action:",
params action,
); ", Params:",
params
);
const { extractionOptions, walkedFolders } = params;
const stream = ss.createStream();
switch (extractionOptions.extractionMode) {
case "bulk":
map(walkedFolders, async (folder, idx) => {
let foo = await extractArchive(
extractionOptions,
folder
);
this.broker let fo = new JsonStreamStringify({
.call("import." + action, params, opts) foo,
.then((res) => { });
client.emit("comicBookCoverMetadata", res);
}) client.emit("comicBookCoverMetadata", {
.catch((err) => this.logger.error(err)); data: foo,
}); status: "Done!",
});
});
// res.end();
case "single":
return await extractArchive(
extractionOptions,
walkedFolders[0]
);
default:
console.log(
"Unknown extraction mode selected."
);
return {
message:
"Unknown extraction mode selected.",
errorCode: "90",
data: `${extractionOptions}`,
};
}
// this.broker
// .call("import." + action, params, opts)
// .then((resp) => {
// // client.emit("comicBookCoverMetadata", resp);
// })
// .catch((err) => this.logger.error(err));
}
);
client.on("disconnect", () => { client.on("disconnect", () => {
this.logger.info("Client disconnected"); this.logger.info("Client disconnected");

View File

@@ -11,6 +11,7 @@ import {
} from "../utils/uncompression.utils"; } from "../utils/uncompression.utils";
import { import {
IExtractionOptions, IExtractionOptions,
IExtractedComicBookCoverFile,
IFolderData, IFolderData,
} from "../interfaces/folder.interface"; } from "../interfaces/folder.interface";
import axios from "axios"; import axios from "axios";
@@ -19,12 +20,14 @@ import through2 from "through2";
import oboe from "oboe"; import oboe from "oboe";
import H from "highland"; import H from "highland";
import { stringify } from "highland-json"; import { stringify } from "highland-json";
const JsonStreamStringify = require("json-stream-stringify");
const IO = require("socket.io")(); const IO = require("socket.io")();
const { chain } = require("stream-chain");
const { parser } = require("stream-json"); const { parser } = require("stream-json");
const { pick } = require("stream-json/filters/Pick"); const { pick } = require("stream-json/filters/Pick");
const { ignore } = require("stream-json/filters/Ignore"); const { ignore } = require("stream-json/filters/Ignore");
const { streamValues } = require("stream-json/streamers/StreamValues"); const { streamValues } = require("stream-json/streamers/StreamValues");
const StreamArray = require("stream-json/streamers/StreamArray");
export default class ProductsService extends Service { export default class ProductsService extends Service {
// @ts-ignore // @ts-ignore
@@ -63,58 +66,57 @@ export default class ProductsService extends Service {
); );
}, },
}, },
getComicCovers: { // getComicCovers: {
rest: "POST /getComicCovers", // rest: "POST /getComicCovers",
params: { // params: {
extractionOptions: "object", // extractionOptions: "object",
walkedFolders: "array", // walkedFolders: "array",
}, // },
async handler( // async handler(
ctx: Context<{ // ctx: Context<{
extractionOptions: IExtractionOptions; // extractionOptions: IExtractionOptions;
walkedFolders: IFolderData[]; // walkedFolders: IFolderData[];
}> // }>
) { // ) {
switch ( // switch (
ctx.params.extractionOptions.extractionMode // ctx.params.extractionOptions.extractionMode
) { // ) {
case "bulk": // case "bulk":
let rs = new Readable(); // map(
const extractedDataPromises = map( // ctx.params.walkedFolders,
ctx.params.walkedFolders, // async (folder, idx) => {
async (folder) => { // let foo = await extractArchive(
while (!isUndefined(folder)) { // ctx.params
let foo = // .extractionOptions,
await extractArchive( // folder
ctx.params // );
.extractionOptions, // // console.log("levar", foo);
folder // let jsonStream =
); // new JsonStreamStringify({
console.log("levar", foo); // foo,
rs.push(foo); // });
} // return jsonStream;
rs.push(null); // }
} // );
); //
// case "single":
case "single": // return await extractArchive(
return await extractArchive( // ctx.params.extractionOptions,
ctx.params.extractionOptions, // ctx.params.walkedFolders[0]
ctx.params.walkedFolders[0] // );
); // default:
default: // console.log(
console.log( // "Unknown extraction mode selected."
"Unknown extraction mode selected." // );
); // return {
return { // message:
message: // "Unknown extraction mode selected.",
"Unknown extraction mode selected.", // errorCode: "90",
errorCode: "90", // data: `${ctx.params.extractionOptions}`,
data: `${ctx.params.extractionOptions}`, // };
}; // }
} // },
}, // },
},
}, },
methods: {}, methods: {},
}, },