From ef4e1f75b007ee7c08145e26e08ec6b9ecce4fb0 Mon Sep 17 00:00:00 2001 From: Rishi Ghan Date: Thu, 27 May 2021 10:12:00 -0700 Subject: [PATCH] =?UTF-8?q?=F0=9F=9A=A5=20Socket.io=20integration=20WIP?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- package-lock.json | 42 +++++++++++++++ package.json | 3 ++ services/api.service.ts | 79 +++++++++++++++++++++------ services/import.service.ts | 108 +++++++++++++++++++------------------ 4 files changed, 164 insertions(+), 68 deletions(-) diff --git a/package-lock.json b/package-lock.json index a93b5ef..c5ca4df 100644 --- a/package-lock.json +++ b/package-lock.json @@ -2224,6 +2224,11 @@ "delayed-stream": "~1.0.0" } }, + "component-bind": { + "version": "1.0.0", + "resolved": "https://registry.npmjs.org/component-bind/-/component-bind-1.0.0.tgz", + "integrity": "sha1-AMYIq33Nk4l8AAllGx06jh5zu9E=" + }, "component-emitter": { "version": "1.3.0", "resolved": "https://registry.npmjs.org/component-emitter/-/component-emitter-1.3.0.tgz", @@ -5697,6 +5702,19 @@ "integrity": "sha1-nbe1lJatPzz+8wp1FC0tkwrXJlE=", "dev": true }, + "json-stream-stringify": { + "version": "2.0.3", + "resolved": "https://registry.npmjs.org/json-stream-stringify/-/json-stream-stringify-2.0.3.tgz", + "integrity": "sha512-Ry+1rZE1YVKlCMG1emCiReP/OfZrFrEGZn6TC7NPpIGO9NXf0KEqqBL0flgt2J59EiRPv+CKi7S7v31RAHrdXw==" + }, + "json-streamify": { + "version": "0.1.4", + "resolved": "https://registry.npmjs.org/json-streamify/-/json-streamify-0.1.4.tgz", + "integrity": "sha1-HBgHSnAJv3vS0hRDzEJbmGTNCJ0=", + "requires": { + "traverse": ">=0.2.6" + } + }, "json-stringify-safe": { "version": "5.0.1", "resolved": "https://registry.npmjs.org/json-stringify-safe/-/json-stringify-safe-5.0.1.tgz", @@ -8257,6 +8275,30 @@ "debug": "~4.3.1" } }, + "socket.io-stream": { + "version": "0.9.1", + "resolved": "https://registry.npmjs.org/socket.io-stream/-/socket.io-stream-0.9.1.tgz", + "integrity": "sha1-QhJYMWKIuDrGk7DUPv0J1tQ6upc=", + "requires": { + "component-bind": "~1.0.0", + "debug": "~2.2.0" + }, + "dependencies": { + "debug": { + "version": "2.2.0", + "resolved": "https://registry.npmjs.org/debug/-/debug-2.2.0.tgz", + "integrity": "sha1-+HBX6ZWxofauaklgZkE3vFbwOdo=", + "requires": { + "ms": "0.7.1" + } + }, + "ms": { + "version": "0.7.1", + "resolved": "https://registry.npmjs.org/ms/-/ms-0.7.1.tgz", + "integrity": "sha1-nNE8A62/8ltl7/3nzoZO6VIBcJg=" + } + } + }, "sonic-boom": { "version": "1.4.1", "resolved": "https://registry.npmjs.org/sonic-boom/-/sonic-boom-1.4.1.tgz", diff --git a/package.json b/package.json index a55f3c0..769d70f 100644 --- a/package.json +++ b/package.json @@ -42,6 +42,8 @@ "fs-extra": "^10.0.0", "highland": "^2.13.5", "highland-json": "^1.4.1", + "json-stream-stringify": "^2.0.3", + "json-streamify": "^0.1.4", "lodash": "^4.17.21", "moleculer": "^0.14.0", "moleculer-db": "^0.8.4", @@ -57,6 +59,7 @@ "pino-pretty": "^4.7.1", "sharp": "^0.28.1", "socket.io": "^4.1.1", + "socket.io-stream": "^0.9.1", "stream-chain": "^2.2.4", "stream-json": "^1.7.1", "through2": "^4.0.2", diff --git a/services/api.service.ts b/services/api.service.ts index 9ca84dd..62b2c13 100644 --- a/services/api.service.ts +++ b/services/api.service.ts @@ -7,6 +7,10 @@ import { getCovers, extractArchive } from "../utils/uncompression.utils"; import { map, flatten } from "lodash"; import JSONStream from "JSONStream"; const IO = require("socket.io")(); +const ss = require("socket.io-stream"); +const JsonStreamStringify = require("json-stream-stringify"); +import axios from "axios"; +const { Writable, Readable } = require("stream"); export default class ApiService extends Service { public constructor(broker: ServiceBroker) { @@ -30,7 +34,12 @@ export default class ApiService extends Service { mergeParams: true, autoAliases: true, - aliases: {}, + aliases: { + async "POST getComicCovers"(req, res) { + const { extractionOptions, walkedFolders } = + req.body; + }, + }, // Calling options. More info: https://moleculer.services/docs/0.14/moleculer-web.html#Calling-options callingOptions: {}, @@ -86,21 +95,61 @@ export default class ApiService extends Service { this.io.on("connection", (client) => { this.logger.info("Client connected via websocket!"); - client.on("call", ({ action, params, opts }, done) => { - this.logger.info( - "Received request from client! Action:", - action, - ", Params:", - params - ); + client.on( + "call", + async ({ action, params, opts }, done) => { + this.logger.info( + "Received request from client! Action:", + action, + ", Params:", + params + ); + const { extractionOptions, walkedFolders } = params; + const stream = ss.createStream(); + switch (extractionOptions.extractionMode) { + case "bulk": + map(walkedFolders, async (folder, idx) => { + let foo = await extractArchive( + extractionOptions, + folder + ); - this.broker - .call("import." + action, params, opts) - .then((res) => { - client.emit("comicBookCoverMetadata", res); - }) - .catch((err) => this.logger.error(err)); - }); + let fo = new JsonStreamStringify({ + foo, + }); + + client.emit("comicBookCoverMetadata", { + data: foo, + status: "Done!", + }); + }); + // res.end(); + + case "single": + return await extractArchive( + extractionOptions, + walkedFolders[0] + ); + default: + console.log( + "Unknown extraction mode selected." + ); + return { + message: + "Unknown extraction mode selected.", + errorCode: "90", + data: `${extractionOptions}`, + }; + } + + // this.broker + // .call("import." + action, params, opts) + // .then((resp) => { + // // client.emit("comicBookCoverMetadata", resp); + // }) + // .catch((err) => this.logger.error(err)); + } + ); client.on("disconnect", () => { this.logger.info("Client disconnected"); diff --git a/services/import.service.ts b/services/import.service.ts index 42fe781..f90b5c5 100644 --- a/services/import.service.ts +++ b/services/import.service.ts @@ -11,6 +11,7 @@ import { } from "../utils/uncompression.utils"; import { IExtractionOptions, + IExtractedComicBookCoverFile, IFolderData, } from "../interfaces/folder.interface"; import axios from "axios"; @@ -19,12 +20,14 @@ import through2 from "through2"; import oboe from "oboe"; import H from "highland"; import { stringify } from "highland-json"; +const JsonStreamStringify = require("json-stream-stringify"); const IO = require("socket.io")(); - +const { chain } = require("stream-chain"); const { parser } = require("stream-json"); const { pick } = require("stream-json/filters/Pick"); const { ignore } = require("stream-json/filters/Ignore"); const { streamValues } = require("stream-json/streamers/StreamValues"); +const StreamArray = require("stream-json/streamers/StreamArray"); export default class ProductsService extends Service { // @ts-ignore @@ -63,58 +66,57 @@ export default class ProductsService extends Service { ); }, }, - getComicCovers: { - rest: "POST /getComicCovers", - params: { - extractionOptions: "object", - walkedFolders: "array", - }, - async handler( - ctx: Context<{ - extractionOptions: IExtractionOptions; - walkedFolders: IFolderData[]; - }> - ) { - switch ( - ctx.params.extractionOptions.extractionMode - ) { - case "bulk": - let rs = new Readable(); - const extractedDataPromises = map( - ctx.params.walkedFolders, - async (folder) => { - while (!isUndefined(folder)) { - let foo = - await extractArchive( - ctx.params - .extractionOptions, - folder - ); - console.log("levar", foo); - rs.push(foo); - } - rs.push(null); - } - ); - - case "single": - return await extractArchive( - ctx.params.extractionOptions, - ctx.params.walkedFolders[0] - ); - default: - console.log( - "Unknown extraction mode selected." - ); - return { - message: - "Unknown extraction mode selected.", - errorCode: "90", - data: `${ctx.params.extractionOptions}`, - }; - } - }, - }, + // getComicCovers: { + // rest: "POST /getComicCovers", + // params: { + // extractionOptions: "object", + // walkedFolders: "array", + // }, + // async handler( + // ctx: Context<{ + // extractionOptions: IExtractionOptions; + // walkedFolders: IFolderData[]; + // }> + // ) { + // switch ( + // ctx.params.extractionOptions.extractionMode + // ) { + // case "bulk": + // map( + // ctx.params.walkedFolders, + // async (folder, idx) => { + // let foo = await extractArchive( + // ctx.params + // .extractionOptions, + // folder + // ); + // // console.log("levar", foo); + // let jsonStream = + // new JsonStreamStringify({ + // foo, + // }); + // return jsonStream; + // } + // ); + // + // case "single": + // return await extractArchive( + // ctx.params.extractionOptions, + // ctx.params.walkedFolders[0] + // ); + // default: + // console.log( + // "Unknown extraction mode selected." + // ); + // return { + // message: + // "Unknown extraction mode selected.", + // errorCode: "90", + // data: `${ctx.params.extractionOptions}`, + // }; + // } + // }, + // }, }, methods: {}, },