🧑🏻‍🔧 Worker Streams

This commit is contained in:
2021-05-10 23:10:22 -07:00
parent 924453b22d
commit 325024afd1
7 changed files with 177 additions and 62 deletions

View File

@@ -16,6 +16,7 @@
"license": "MIT",
"dependencies": {
"@babel/runtime": "^7.13.17",
"@types/node-fetch": "^2.5.10",
"@types/react": "^17.0.3",
"@types/react-dom": "^17.0.2",
"@types/react-redux": "^7.1.16",
@@ -30,11 +31,13 @@
"highland": "^2.13.5",
"imghash": "^0.0.8",
"mongoose": "^5.10.11",
"node-fetch": "^2.6.1",
"oboe": "^2.1.5",
"react": "^17.0.1",
"react-dom": "^17.0.1",
"react-hooks-worker": "^1.0.0",
"sharp": "^0.28.1"
"sharp": "^0.28.1",
"through2": "^4.0.2"
},
"devDependencies": {
"@babel/cli": "^7.13.10",

View File

@@ -1,18 +1,19 @@
import axios from "axios";
import fetch, { Response } from "node-fetch";
import {
IExtractComicBookCoverErrorResponse,
IExtractedComicBookCoverFile,
IExtractionOptions,
IFolderData,
} from "../../server/interfaces/folder.interface";
import { FS_API_BASE_URI } from "../constants/endpoints";
import { API_BASE_URI } from "../constants/endpoints";
export async function walkFolder(path: string): Promise<Array<IFolderData>> {
return axios
.request<Array<IFolderData>>({
url: FS_API_BASE_URI + "walkFolder",
url: API_BASE_URI + "walkFolder",
method: "POST",
params: {
data: {
basePathToWalk: path,
},
transformResponse: (r: string) => JSON.parse(r),
@@ -26,17 +27,16 @@ export async function walkFolder(path: string): Promise<Array<IFolderData>> {
export async function extractCoverFromComicBookArchive(
extractionOptions: IExtractionOptions,
walkedFolders: Array<IFolderData>,
): Promise<
| IExtractedComicBookCoverFile
| IExtractedComicBookCoverFile[]
| IExtractComicBookCoverErrorResponse
> {
return await axios.request({
url: FS_API_BASE_URI + "getComicCovers",
): Promise<Response> {
return await fetch(API_BASE_URI + "getComicCovers", {
method: "POST",
data: {
headers: {
Accept: "application/json",
"Content-Type": "application/json",
},
body: JSON.stringify({
extractionOptions,
walkedFolders,
},
}),
});
}

View File

@@ -1,3 +1,3 @@
export const COMICBOOKINFO_SERVICE_URI =
"http://localhost:6050/api/comicbookinfo/";
export const FS_API_BASE_URI = "http://localhost:8050/api/";
export const API_BASE_URI = "http://localhost:8050/api/";

View File

@@ -4,6 +4,8 @@ import {
} from "../actions/fileops.actions";
import { IExtractedComicBookCoverFile } from "../../server/interfaces/folder.interface";
const ndjsonStream = require("can-ndjson-stream");
import fetch from "node-fetch";
import { API_BASE_URI } from "../constants/endpoints";
export const greet = async (
path: string,
@@ -19,14 +21,23 @@ export const greet = async (
pageLimit: 25,
page: 1,
};
const extractionOptions = {
...targetOptions,
paginationOptions: pagingConfig,
};
const fileObjects = await walkFolder("./comics");
const fo = await extractCoverFromComicBookArchive(
{
...targetOptions,
paginationOptions: pagingConfig,
},
const fetchedResource = await extractCoverFromComicBookArchive(
extractionOptions,
fileObjects,
);
// return JSON.stringify(fo);
const reader = await ndjsonStream(fetchedResource.body).getReader();
reader.read().then(function process({ done, value }) {
if (done) {
console.log("done");
return;
}
return reader.read().then(process);
});
};

View File

@@ -1,46 +1,74 @@
import router from "../router";
import { default as paginate } from "express-paginate";
import { IExtractionOptions } from "../../interfaces/folder.interface";
import { IExtractedComicBookCoverFile, IExtractionOptions } from "../../interfaces/folder.interface";
import { Request, Response } from "express";
import _ from "lodash";
import H from "highland";
import axios from "axios";
import oboe from "oboe";
import fs from "fs";
import { Readable } from "stream";
import through2 from "through2";
const getData = (source) => {
const response: { value: string }[] = [];
for (let index = 0; index < 100; index++) {
response.push({ value: "rishi " + index });
}
return response;
};
router.route("/getComicCovers").post(async (req: Request, res: Response) => {
typeof req.body.extractionOptions === "object"
? req.body.extractionOptions
: {};
console.log(oboe);
oboe({
url: "http://localhost:3000/api/import/getComicCovers",
const foo = await axios({
url: "http://localhost:3853/api/import/getComicCovers",
method: "POST",
body: {
data: {
extractionOptions: req.body.extractionOptions,
walkedFolders: req.body.walkedFolders,
},
headers: {
"Content-Type": "application/json",
"Content-Length": req.body.length,
},
}).node("{name path fileSize}", (data) => {
console.log(data);
return res.sendStatus(200);
});
const stream = new Readable({
objectMode: true,
highWaterMark: 1,
read() {},
});
// We create the stream transform using through2 library..
// We instruct it to handle objects, buffer size and transform function,
// that is, we convert our object to text to be able to send it through the stream response, which does not handle objects..
const ndjsonStream = through2(
{ objectMode: true, highWaterMark: 1 },
(data, enc, cb) => {
cb(null, JSON.stringify(data) + "\n");
},
);
// console.log(ndjsonStream);
// Through pipe we do a double addressing, our reading stream goes through the transformation
// to finally go through the stream response..
stream.pipe(ndjsonStream).pipe(res);
stream.push({ source1: foo.data });
stream.push(null);
});
router.route("/walkFolder").post(async (req: Request, res: Response) => {
const basePathToWalk =
typeof req.body.basePathToWalk === "string" ? req.body.basePathToWalk : "";
axios
const walkedFolders = await axios
.request({
url: "http://localhost:3853/api/import/walkFolders",
method: "POST",
data: {
basePathToWalk,
},
})
.then((data) => data)
.then((data) => data.data)
.catch((error) => error);
return res.json(walkedFolders);
});
export default router;

View File

@@ -1,30 +1,30 @@
Arguments:
/Users/rishi/.nvm/versions/node/v10.16.0/bin/node /Users/rishi/.yarn/bin/yarn.js add @types/can-ndjson-stream -D
/usr/local/Cellar/node/15.11.0/bin/node /usr/local/bin/yarn add @types/fetch --save-dev
PATH:
/Users/rishi/.yarn/bin:/Users/rishi/.config/yarn/global/node_modules/.bin:/Users/rishi/.nvm/versions/node/v10.16.0/bin:/usr/local/bin:/usr/bin:/bin:/usr/sbin:/sbin:/Library/Apple/usr/bin
/usr/local/bin:/usr/bin:/bin:/usr/sbin:/sbin:/Library/Apple/usr/bin
Yarn version:
1.22.5
1.22.10
Node version:
10.16.0
15.11.0
Platform:
darwin x64
Trace:
Error: https://registry.yarnpkg.com/@types%2fcan-ndjson-stream: Not found
at Request.params.callback [as _callback] (/Users/rishi/.yarn/lib/cli.js:66988:18)
at Request.self.callback (/Users/rishi/.yarn/lib/cli.js:140749:22)
at Request.emit (events.js:198:13)
at Request.<anonymous> (/Users/rishi/.yarn/lib/cli.js:141721:10)
at Request.emit (events.js:198:13)
at IncomingMessage.<anonymous> (/Users/rishi/.yarn/lib/cli.js:141643:12)
at Object.onceWrapper (events.js:286:20)
at IncomingMessage.emit (events.js:203:15)
at endReadableNT (_stream_readable.js:1129:12)
at process._tickCallback (internal/process/next_tick.js:63:19)
Error: https://registry.yarnpkg.com/@types%2ffetch: Not found
at Request.params.callback [as _callback] (/usr/local/lib/node_modules/yarn/lib/cli.js:66988:18)
at Request.self.callback (/usr/local/lib/node_modules/yarn/lib/cli.js:140662:22)
at Request.emit (node:events:378:20)
at Request.<anonymous> (/usr/local/lib/node_modules/yarn/lib/cli.js:141634:10)
at Request.emit (node:events:378:20)
at IncomingMessage.<anonymous> (/usr/local/lib/node_modules/yarn/lib/cli.js:141556:12)
at Object.onceWrapper (node:events:484:28)
at IncomingMessage.emit (node:events:390:22)
at endReadableNT (node:internal/streams/readable:1307:12)
at processTicksAndRejections (node:internal/process/task_queues:81:21)
npm manifest:
{
@@ -55,9 +55,12 @@ npm manifest:
"comlink-loader": "^2.0.0",
"express": "^4.17.1",
"express-paginate": "^1.0.2",
"fetch": "^1.1.0",
"fs-extra": "^9.1.0",
"highland": "^2.13.5",
"imghash": "^0.0.8",
"mongoose": "^5.10.11",
"oboe": "^2.1.5",
"react": "^17.0.1",
"react-dom": "^17.0.1",
"react-hooks-worker": "^1.0.0",
@@ -3038,6 +3041,13 @@ Lockfile:
dependencies:
file-uri-to-path "1.0.0"
biskviit@1.0.1:
version "1.0.1"
resolved "https://registry.yarnpkg.com/biskviit/-/biskviit-1.0.1.tgz#037a0cd4b71b9e331fd90a1122de17dc49e420a7"
integrity sha1-A3oM1LcbnjMf2QoRIt4X3EnkIKc=
dependencies:
psl "^1.1.7"
bl@^1.0.0:
version "1.2.3"
resolved "https://registry.yarnpkg.com/bl/-/bl-1.2.3.tgz#1e8dd80142eac80d7158c9dccc047fb620e035e7"
@@ -4670,6 +4680,13 @@ Lockfile:
resolved "https://registry.yarnpkg.com/encodeurl/-/encodeurl-1.0.2.tgz#ad3ff4c86ec2d029322f5a02c3a9a606c95b3f59"
integrity sha1-rT/0yG7C0CkyL1oCw6mmBslbP1k=
encoding@0.1.12:
version "0.1.12"
resolved "https://registry.yarnpkg.com/encoding/-/encoding-0.1.12.tgz#538b66f3ee62cd1ab51ec323829d1f9480c74beb"
integrity sha1-U4tm8+5izRq1HsMjgp0flIDHS+s=
dependencies:
iconv-lite "~0.4.13"
encoding@^0.1.12:
version "0.1.13"
resolved "https://registry.yarnpkg.com/encoding/-/encoding-0.1.13.tgz#56574afdd791f54a8e9b2785c0582a2d26210fa9"
@@ -5426,6 +5443,14 @@ Lockfile:
dependencies:
pend "~1.2.0"
fetch@^1.1.0:
version "1.1.0"
resolved "https://registry.yarnpkg.com/fetch/-/fetch-1.1.0.tgz#0a8279f06be37f9f0ebb567560a30a480da59a2e"
integrity sha1-CoJ58Gvjf58Ou1Z1YKMKSA2lmi4=
dependencies:
biskviit "1.0.1"
encoding "0.1.12"
figures@^1.3.5:
version "1.7.0"
resolved "https://registry.yarnpkg.com/figures/-/figures-1.7.0.tgz#cbe1e3affcf1cd44b80cadfed28dc793a9701d2e"
@@ -6151,6 +6176,13 @@ Lockfile:
resolved "https://registry.yarnpkg.com/he/-/he-1.2.0.tgz#84ae65fa7eafb165fddb61566ae14baf05664f0f"
integrity sha512-F/1DnUGPopORZi0ni+CvrCgHQ5FyEAHRLSApuYWMmrbSwoN2Mn/7k+Gl38gJnR7yyDZk6WLXwiGod1JOWNDKGw==
highland@^2.13.5:
version "2.13.5"
resolved "https://registry.yarnpkg.com/highland/-/highland-2.13.5.tgz#d55cd8ac3f67a00fad79918668d51493222cfcc2"
integrity sha512-dn2flPapIIAa4BtkB2ahjshg8iSJtrJtdhEb9/oiOrS5HMQTR/GuhFpqJ+11YBdtnl3AwWKvbZd1Uxr8uAmA7A==
dependencies:
util-deprecate "^1.0.2"
history@^4.9.0:
version "4.10.1"
resolved "https://registry.yarnpkg.com/history/-/history-4.10.1.tgz#33371a65e3a83b267434e2b3f3b1b4c58aad4cf3"
@@ -6292,6 +6324,11 @@ Lockfile:
statuses ">= 1.5.0 < 2"
toidentifier "1.0.0"
http-https@^1.0.0:
version "1.0.0"
resolved "https://registry.yarnpkg.com/http-https/-/http-https-1.0.0.tgz#2f908dd5f1db4068c058cd6e6d4ce392c913389b"
integrity sha1-L5CN1fHbQGjAWM1ubUzjkskTOJs=
http-parser-js@>=0.5.1:
version "0.5.3"
resolved "https://registry.yarnpkg.com/http-parser-js/-/http-parser-js-0.5.3.tgz#01d2709c79d41698bb01d4decc5e9da4e4a033d9"
@@ -6359,7 +6396,7 @@ Lockfile:
dependencies:
ms "^2.0.0"
iconv-lite@0.4.24:
iconv-lite@0.4.24, iconv-lite@~0.4.13:
version "0.4.24"
resolved "https://registry.yarnpkg.com/iconv-lite/-/iconv-lite-0.4.24.tgz#2022b4b25fbddc21d2f524974a474aafe733908b"
integrity sha512-v3MXnZAcvnywkTUEZomIActle7RXXeedOR31wwl7VlyoXO4Qi9arvSenNQWne1TcRwhCL1HwLI21bEqdpj8/rA==
@@ -9156,6 +9193,13 @@ Lockfile:
es-abstract "^1.18.0-next.1"
has "^1.0.3"
oboe@^2.1.5:
version "2.1.5"
resolved "https://registry.yarnpkg.com/oboe/-/oboe-2.1.5.tgz#5554284c543a2266d7a38f17e073821fbde393cd"
integrity sha1-VVQoTFQ6ImbXo48X4HOCH73jk80=
dependencies:
http-https "^1.0.0"
obuf@^1.0.0, obuf@^1.1.2:
version "1.1.2"
resolved "https://registry.yarnpkg.com/obuf/-/obuf-1.1.2.tgz#09bea3343d41859ebd446292d11c9d4db619084e"
@@ -9935,7 +9979,7 @@ Lockfile:
resolved "https://registry.yarnpkg.com/pseudomap/-/pseudomap-1.0.2.tgz#f052a28da70e618917ef0a8ac34c1ae5a68286b3"
integrity sha1-8FKijacOYYkX7wqKw0wa5aaChrM=
psl@^1.1.28, psl@^1.1.33:
psl@^1.1.28, psl@^1.1.33, psl@^1.1.7:
version "1.8.0"
resolved "https://registry.yarnpkg.com/psl/-/psl-1.8.0.tgz#9326f8bcfb013adcc005fdff056acce020e51c24"
integrity sha512-RIdOzyoavK+hA18OGGWDqUTsCLhtA7IcZ/6NCs4fFJaHBDab+pDDmDIByWFRQJq2Cd7r1OoQxBGKOaztq+hjIQ==

View File

@@ -1755,6 +1755,14 @@
"@types/mongodb" "*"
"@types/node" "*"
"@types/node-fetch@^2.5.10":
version "2.5.10"
resolved "https://registry.yarnpkg.com/@types/node-fetch/-/node-fetch-2.5.10.tgz#9b4d4a0425562f9fcea70b12cb3fcdd946ca8132"
integrity sha512-IpkX0AasN44hgEad0gEF/V6EgR5n69VEqPEgnmoM8GsIGro3PowbWs4tR6IhxUTyPLpOn+fiGG6nrQhcmoCuIQ==
dependencies:
"@types/node" "*"
form-data "^3.0.0"
"@types/node@*":
version "14.14.25"
resolved "https://registry.yarnpkg.com/@types/node/-/node-14.14.25.tgz#15967a7b577ff81383f9b888aa6705d43fbbae93"
@@ -3604,7 +3612,7 @@ columnify@~1.5.4:
strip-ansi "^3.0.0"
wcwidth "^1.0.0"
combined-stream@^1.0.6, combined-stream@~1.0.6:
combined-stream@^1.0.6, combined-stream@^1.0.8, combined-stream@~1.0.6:
version "1.0.8"
resolved "https://registry.yarnpkg.com/combined-stream/-/combined-stream-1.0.8.tgz#c3d45a8b34fd730631a110a8a2520682b31d5a7f"
integrity sha512-FQN4MRfuJeHf7cBbBMJFXhKSDq+2kAArBlmRBvcvFE5BB1HZKXtSFASDhdlz9zOYwxh8lDdnvmMOe/+5cdoEdg==
@@ -5459,6 +5467,15 @@ forever-agent@~0.6.1:
resolved "https://registry.yarnpkg.com/forever-agent/-/forever-agent-0.6.1.tgz#fbc71f0c41adeb37f96c577ad1ed42d8fdacca91"
integrity sha1-+8cfDEGt6zf5bFd60e1C2P2sypE=
form-data@^3.0.0:
version "3.0.1"
resolved "https://registry.yarnpkg.com/form-data/-/form-data-3.0.1.tgz#ebd53791b78356a99af9a300d4282c4d5eb9755f"
integrity sha512-RHkBKtLWUVwd7SqRIvCZMEvAMoGUp0XU+seQiZejj0COz3RI3hWP4sCv3gZWWLjJTd7rGwcsF5eKZGii0r/hbg==
dependencies:
asynckit "^0.4.0"
combined-stream "^1.0.8"
mime-types "^2.1.12"
form-data@~2.3.2:
version "2.3.3"
resolved "https://registry.yarnpkg.com/form-data/-/form-data-2.3.3.tgz#dcce52c05f644f298c6a7ab936bd724ceffbf3a6"
@@ -8542,6 +8559,11 @@ node-addon-api@^3.1.0:
resolved "https://registry.yarnpkg.com/node-addon-api/-/node-addon-api-3.1.0.tgz#98b21931557466c6729e51cb77cd39c965f42239"
integrity sha512-flmrDNB06LIl5lywUz7YlNGZH/5p0M7W28k8hzd9Lshtdh1wshD2Y+U4h9LD6KObOy1f+fEVdgprPrEymjM5uw==
node-fetch@^2.6.1:
version "2.6.1"
resolved "https://registry.yarnpkg.com/node-fetch/-/node-fetch-2.6.1.tgz#045bd323631f76ed2e2b55573394416b639a0052"
integrity sha512-V4aYg89jEoVRxRb2fJdAg8FHvI7cEyYdVAh94HH0UIK8oJxUfkjlDQN9RbMx+bEjP7+ggMiFRprSti032Oipxw==
node-forge@^0.10.0:
version "0.10.0"
resolved "https://registry.yarnpkg.com/node-forge/-/node-forge-0.10.0.tgz#32dea2afb3e9926f02ee5ce8794902691a676bf3"
@@ -10091,6 +10113,15 @@ read@1, read@~1.0.1, read@~1.0.7:
dependencies:
mute-stream "~0.0.4"
readable-stream@3, readable-stream@^3.0.0, readable-stream@^3.0.6, readable-stream@^3.1.1, readable-stream@^3.4.0, readable-stream@^3.6.0:
version "3.6.0"
resolved "https://registry.yarnpkg.com/readable-stream/-/readable-stream-3.6.0.tgz#337bbda3adc0706bd3e024426a286d4b4b2c9198"
integrity sha512-BViHy7LKeTz4oNnkcLJ+lVSL6vpiFeX6/d3oSH8zCW7UxP2onchk+vTGB143xuFjHS3deTgkKoXXymXqymiIdA==
dependencies:
inherits "^2.0.3"
string_decoder "^1.1.1"
util-deprecate "^1.0.1"
readable-stream@^2.0.0, readable-stream@^2.0.1, readable-stream@^2.0.2, readable-stream@^2.0.6, readable-stream@^2.3.0, readable-stream@^2.3.5, readable-stream@~2.3.6:
version "2.3.7"
resolved "https://registry.yarnpkg.com/readable-stream/-/readable-stream-2.3.7.tgz#1eca1cf711aef814c04f62252a36a62f6cb23b57"
@@ -10104,15 +10135,6 @@ readable-stream@^2.0.0, readable-stream@^2.0.1, readable-stream@^2.0.2, readable
string_decoder "~1.1.1"
util-deprecate "~1.0.1"
readable-stream@^3.0.0, readable-stream@^3.0.6, readable-stream@^3.1.1, readable-stream@^3.4.0, readable-stream@^3.6.0:
version "3.6.0"
resolved "https://registry.yarnpkg.com/readable-stream/-/readable-stream-3.6.0.tgz#337bbda3adc0706bd3e024426a286d4b4b2c9198"
integrity sha512-BViHy7LKeTz4oNnkcLJ+lVSL6vpiFeX6/d3oSH8zCW7UxP2onchk+vTGB143xuFjHS3deTgkKoXXymXqymiIdA==
dependencies:
inherits "^2.0.3"
string_decoder "^1.1.1"
util-deprecate "^1.0.1"
readdir-scoped-modules@^1.1.0:
version "1.1.0"
resolved "https://registry.yarnpkg.com/readdir-scoped-modules/-/readdir-scoped-modules-1.1.0.tgz#8d45407b4f870a0dcaebc0e28670d18e74514309"
@@ -11679,6 +11701,13 @@ through2@^2.0.2, through2@^2.0.3:
readable-stream "~2.3.6"
xtend "~4.0.1"
through2@^4.0.2:
version "4.0.2"
resolved "https://registry.yarnpkg.com/through2/-/through2-4.0.2.tgz#a7ce3ac2a7a8b0b966c80e7c49f0484c3b239764"
integrity sha512-iOqSav00cVxEEICeD7TjLB1sueEL+81Wpzp2bY17uZjZN0pWZPuo4suZ/61VujxmqSGFfgOcNuTZ85QJwNZQpw==
dependencies:
readable-stream "3"
through@^2.3.8:
version "2.3.8"
resolved "https://registry.yarnpkg.com/through/-/through-2.3.8.tgz#0dd4c9ffaabc357960b1b724115d7e0e86a2e1f5"