24 Commits

Author SHA1 Message Date
c65eb2c6ec Update docker-image.yml 2025-02-20 21:37:53 -05:00
5b2555aa61 Merge pull request #5 from rishighan/rishighan-patch-1
Update docker-image.yml
2025-02-20 21:26:42 -05:00
6b732f1518 Update docker-image.yml 2025-02-20 21:26:30 -05:00
1900a3ddb8 🔧 Removed workflow file from wrong path 2025-02-20 21:24:13 -05:00
63130b4e82 Create docker-image.yml 2025-02-20 21:23:18 -05:00
f5a2e6505b 🔧 yeeting a docker workflow 2025-02-20 21:18:40 -05:00
6f11e84c1d 🐛 Removed some random letters 2025-02-20 12:36:53 -05:00
cb8e6bb3d6 Merge pull request #4 from rishighan/autodownload-loop
Autodownload loop
2025-02-17 15:42:22 -05:00
c0946e2ce4 🔧 Fixed awaits inside loops 2025-02-17 15:41:38 -05:00
36f08212a0 🔧 Wired the kafka producer back up 2025-01-27 19:28:40 -05:00
cd9ea85b80 🔧 Fixing autodownload functionality 2024-12-22 21:59:49 -05:00
e0954eb3e8 🔧 Fixing the response from searchResultAdded event 2024-11-22 21:12:25 -05:00
651e3ac7bb 🔧 Added an undefined check for the results 2024-11-18 11:07:38 -05:00
4045097eb0 🏗️ Added string-similarity lib 2024-08-14 12:19:55 -04:00
a710211a2c 🔧 Commented code 2024-07-10 14:36:03 -04:00
0430670a01 🏗️ Added structure for the UI notification 2024-07-02 22:09:33 -04:00
ecdc3845cb 🪳 Serialized Map to be compatible with kafka 2024-06-18 00:06:25 -04:00
e6a85e6a39 🏗️ Commented code 2024-06-13 14:09:00 -04:00
55494abdc0 🔧 Fixes to search query builder 2024-06-12 21:41:29 -05:00
60e5b6f61b 🔧 kafka-powered autodownload loop 2024-06-03 17:18:22 -04:00
12e46334da 🪳 kafka for handling dc++ download jobs 2024-05-28 08:39:46 -04:00
6fb1374ce9 🏗️ WIP for automatic downloads endpoint 2024-05-09 13:45:18 -04:00
17ed663823 Added an AutoDownloadService 2024-04-23 22:46:49 -05:00
91199bcf0c Merge pull request #3 from rishighan/qbittorrent-endpoints
Qbittorrent endpoints
2024-03-30 21:38:17 -04:00
8 changed files with 614 additions and 39 deletions

19
.github/workflows/docker-image.yml vendored Normal file
View File

@@ -0,0 +1,19 @@
# CI workflow: builds and publishes the Docker image on pushes/PRs to master.
# NOTE(review): indentation reconstructed to standard GitHub Actions nesting;
# the scraped source had all leading whitespace stripped, which is invalid YAML.
name: Docker Image CI

on:
  push:
    branches: [ master ]
  pull_request:
    branches: [ master ]

jobs:
  build:
    runs-on: ubuntu-latest
    steps:
      # NOTE(review): consider pinning to a released tag (e.g. actions/checkout@v4)
      # instead of the moving `master` ref for reproducible builds.
      - uses: actions/checkout@master
      - name: Publish to Registry
        uses: elgohr/Publish-Docker-Github-Action@v5
        with:
          name: frishi/threetwo-acquisition-service
          username: ${{ secrets.DOCKER_USERNAME }}
          password: ${{ secrets.DOCKER_PASSWORD }}

132
package-lock.json generated
View File

@@ -10,13 +10,16 @@
"dependencies": {
"@robertklep/qbittorrent": "^1.0.1",
"ioredis": "^5.0.0",
"kafkajs": "^2.2.4",
"moleculer": "^0.14.27",
"moleculer-web": "^0.10.5",
"parse-torrent": "^9.1.5"
"parse-torrent": "^9.1.5",
"string-similarity-alg": "^1.3.2"
},
"devDependencies": {
"@jest/globals": "^29.3.1",
"@types/jest": "^29.2.3",
"@types/lodash": "^4.17.4",
"@types/node": "^18.11.9",
"@types/parse-torrent": "^5.8.7",
"@typescript-eslint/eslint-plugin": "^5.44.0",
@@ -31,9 +34,11 @@
"eslint-plugin-import": "^2.26.0",
"eslint-plugin-jest": "^27.1.6",
"jest": "^29.3.1",
"lodash": "^4.17.21",
"moleculer-repl": "^0.7.3",
"prettier": "^2.8.0",
"qbittorrent-api-v2": "^1.2.2",
"socket.io-client": "^4.7.5",
"ts-jest": "^29.0.3",
"ts-node": "^10.9.1",
"typescript": "^4.9.3"
@@ -1359,6 +1364,12 @@
"@sinonjs/commons": "^3.0.0"
}
},
"node_modules/@socket.io/component-emitter": {
"version": "3.1.2",
"resolved": "https://registry.npmjs.org/@socket.io/component-emitter/-/component-emitter-3.1.2.tgz",
"integrity": "sha512-9BCxFwvbGg/RsZK9tjXd8s4UcwR0MWeFQ1XEKIQVVvAGJyINdrqKMcTRyLoK8Rse1GjzLV9cwjWV1olXRWEXVA==",
"dev": true
},
"node_modules/@tsconfig/node10": {
"version": "1.0.9",
"resolved": "https://registry.npmjs.org/@tsconfig/node10/-/node10-1.0.9.tgz",
@@ -1479,6 +1490,12 @@
"integrity": "sha512-dRLjCWHYg4oaA77cxO64oO+7JwCwnIzkZPdrrC71jQmQtlhM556pwKo5bUzqvZndkVbeFLIIi+9TC40JNF5hNQ==",
"dev": true
},
"node_modules/@types/lodash": {
"version": "4.17.4",
"resolved": "https://registry.npmjs.org/@types/lodash/-/lodash-4.17.4.tgz",
"integrity": "sha512-wYCP26ZLxaT3R39kiN2+HcJ4kTd3U1waI/cY7ivWYqFP6pW3ZNpvi6Wd6PHZx7T/t8z0vlkXMg3QYLa7DZ/IJQ==",
"dev": true
},
"node_modules/@types/magnet-uri": {
"version": "5.1.5",
"resolved": "https://registry.npmjs.org/@types/magnet-uri/-/magnet-uri-5.1.5.tgz",
@@ -2340,12 +2357,12 @@
}
},
"node_modules/braces": {
"version": "3.0.2",
"resolved": "https://registry.npmjs.org/braces/-/braces-3.0.2.tgz",
"integrity": "sha512-b8um+L1RzM3WDSzvhm6gIz1yfTbBt6YTlcEKAvsmqCZZFw46z626lVj9j1yEPW33H5H+lBQpZMP1k8l+78Ha0A==",
"version": "3.0.3",
"resolved": "https://registry.npmjs.org/braces/-/braces-3.0.3.tgz",
"integrity": "sha512-yQbXgO/OSZVD2IsiLlro+7Hf6Q18EJrKSEsdoMzKePKXct3gvD8oLcOQdIzGupr5Fj+EDe8gO/lxc1BzfMpxvA==",
"dev": true,
"dependencies": {
"fill-range": "^7.0.1"
"fill-range": "^7.1.1"
},
"engines": {
"node": ">=8"
@@ -3008,6 +3025,28 @@
"node": ">= 0.8"
}
},
"node_modules/engine.io-client": {
"version": "6.5.4",
"resolved": "https://registry.npmjs.org/engine.io-client/-/engine.io-client-6.5.4.tgz",
"integrity": "sha512-GeZeeRjpD2qf49cZQ0Wvh/8NJNfeXkXXcoGh+F77oEAgo9gUHwT1fCRxSNU+YEEaysOJTnsFHmM5oAcPy4ntvQ==",
"dev": true,
"dependencies": {
"@socket.io/component-emitter": "~3.1.0",
"debug": "~4.3.1",
"engine.io-parser": "~5.2.1",
"ws": "~8.17.1",
"xmlhttprequest-ssl": "~2.0.0"
}
},
"node_modules/engine.io-parser": {
"version": "5.2.2",
"resolved": "https://registry.npmjs.org/engine.io-parser/-/engine.io-parser-5.2.2.tgz",
"integrity": "sha512-RcyUFKA93/CXH20l4SoVvzZfrSDMOTUS3bWVpTt2FuFP+XYrL8i8oonHP7WInRyVHXh0n/ORtoeiE1os+8qkSw==",
"dev": true,
"engines": {
"node": ">=10.0.0"
}
},
"node_modules/error-ex": {
"version": "1.3.2",
"resolved": "https://registry.npmjs.org/error-ex/-/error-ex-1.3.2.tgz",
@@ -3833,9 +3872,9 @@
}
},
"node_modules/fill-range": {
"version": "7.0.1",
"resolved": "https://registry.npmjs.org/fill-range/-/fill-range-7.0.1.tgz",
"integrity": "sha512-qOo9F+dMUmC2Lcb4BbVvnKJxTPjCm+RRpe4gDuGrzkL7mEVl/djYSu2OdQ2Pa302N4oqkSg9ir6jaLWJ2USVpQ==",
"version": "7.1.1",
"resolved": "https://registry.npmjs.org/fill-range/-/fill-range-7.1.1.tgz",
"integrity": "sha512-YsGpe3WHLK8ZYi4tWDg2Jy3ebRz2rXowDxnld4bkQB00cc/1Zw9AWnC0i9ztDJitivtQvaI9KaLyKrc+hBW0yg==",
"dev": true,
"dependencies": {
"to-regex-range": "^5.0.1"
@@ -3881,9 +3920,9 @@
"dev": true
},
"node_modules/follow-redirects": {
"version": "1.15.5",
"resolved": "https://registry.npmjs.org/follow-redirects/-/follow-redirects-1.15.5.tgz",
"integrity": "sha512-vSFWUON1B+yAw1VN4xMfxgn5fTUiaOzAJCKBwIIgT/+7CuGy9+r+5gITvP62j3RmaD5Ph65UaERdOSRGUzZtgw==",
"version": "1.15.6",
"resolved": "https://registry.npmjs.org/follow-redirects/-/follow-redirects-1.15.6.tgz",
"integrity": "sha512-wWN62YITEaOpSK584EZXJafH1AGpO8RVgElfkuXbTOrPX4fIfOyEpW/CsiNd8JdYrAoOvafRTOEnvsO++qCqFA==",
"dev": true,
"funding": [
{
@@ -5412,6 +5451,14 @@
"node": ">=6"
}
},
"node_modules/kafkajs": {
"version": "2.2.4",
"resolved": "https://registry.npmjs.org/kafkajs/-/kafkajs-2.2.4.tgz",
"integrity": "sha512-j/YeapB1vfPT2iOIUn/vxdyKEuhuY2PxMBvf5JWux6iSaukAccrMtXEY/Lb7OvavDhOWME589bpLrEdnVHjfjA==",
"engines": {
"node": ">=14.0.0"
}
},
"node_modules/keyv": {
"version": "4.5.3",
"resolved": "https://registry.npmjs.org/keyv/-/keyv-4.5.3.tgz",
@@ -7109,6 +7156,34 @@
"url": "https://github.com/chalk/slice-ansi?sponsor=1"
}
},
"node_modules/socket.io-client": {
"version": "4.7.5",
"resolved": "https://registry.npmjs.org/socket.io-client/-/socket.io-client-4.7.5.tgz",
"integrity": "sha512-sJ/tqHOCe7Z50JCBCXrsY3I2k03iOiUe+tj1OmKeD2lXPiGH/RUCdTZFoqVyN7l1MnpIzPrGtLcijffmeouNlQ==",
"dev": true,
"dependencies": {
"@socket.io/component-emitter": "~3.1.0",
"debug": "~4.3.2",
"engine.io-client": "~6.5.2",
"socket.io-parser": "~4.2.4"
},
"engines": {
"node": ">=10.0.0"
}
},
"node_modules/socket.io-parser": {
"version": "4.2.4",
"resolved": "https://registry.npmjs.org/socket.io-parser/-/socket.io-parser-4.2.4.tgz",
"integrity": "sha512-/GbIKmo8ioc+NIWIhwdecY0ge+qVBSMdgxGygevmdHj24bsfgtCmcUUcQ5ZzcylGFHsN3k4HB4Cgkl96KVnuew==",
"dev": true,
"dependencies": {
"@socket.io/component-emitter": "~3.1.0",
"debug": "~4.3.1"
},
"engines": {
"node": ">=10.0.0"
}
},
"node_modules/source-map": {
"version": "0.6.1",
"resolved": "https://registry.npmjs.org/source-map/-/source-map-0.6.1.tgz",
@@ -7205,6 +7280,11 @@
"node": ">=10"
}
},
"node_modules/string-similarity-alg": {
"version": "1.3.2",
"resolved": "https://registry.npmjs.org/string-similarity-alg/-/string-similarity-alg-1.3.2.tgz",
"integrity": "sha512-M+jTGEJmWLfIg2dawXOifzbkUs/tp8HbeSCXZpNII2oZvU5uexaBFx+NoUBWS3M6VQ2ezJJCMstU8L8gq6YqsQ=="
},
"node_modules/string-width": {
"version": "4.2.3",
"resolved": "https://registry.npmjs.org/string-width/-/string-width-4.2.3.tgz",
@@ -7960,6 +8040,36 @@
"node": "^12.13.0 || ^14.15.0 || >=16.0.0"
}
},
"node_modules/ws": {
"version": "8.17.1",
"resolved": "https://registry.npmjs.org/ws/-/ws-8.17.1.tgz",
"integrity": "sha512-6XQFvXTkbfUOZOKKILFG1PDK2NDQs4azKQl26T0YS5CxqWLgXajbPZ+h4gZekJyRqFU8pvnbAbbs/3TgRPy+GQ==",
"dev": true,
"engines": {
"node": ">=10.0.0"
},
"peerDependencies": {
"bufferutil": "^4.0.1",
"utf-8-validate": ">=5.0.2"
},
"peerDependenciesMeta": {
"bufferutil": {
"optional": true
},
"utf-8-validate": {
"optional": true
}
}
},
"node_modules/xmlhttprequest-ssl": {
"version": "2.0.0",
"resolved": "https://registry.npmjs.org/xmlhttprequest-ssl/-/xmlhttprequest-ssl-2.0.0.tgz",
"integrity": "sha512-QKxVRxiRACQcVuQEYFsI1hhkrMlrXHPegbbd1yn9UHOmRxY+si12nQYzri3vbzt8VdTTRviqcKxcyllFas5z2A==",
"dev": true,
"engines": {
"node": ">=0.4.0"
}
},
"node_modules/y18n": {
"version": "5.0.8",
"resolved": "https://registry.npmjs.org/y18n/-/y18n-5.0.8.tgz",

View File

@@ -23,6 +23,7 @@
"devDependencies": {
"@jest/globals": "^29.3.1",
"@types/jest": "^29.2.3",
"@types/lodash": "^4.17.4",
"@types/node": "^18.11.9",
"@types/parse-torrent": "^5.8.7",
"@typescript-eslint/eslint-plugin": "^5.44.0",
@@ -45,11 +46,15 @@
"typescript": "^4.9.3"
},
"dependencies": {
"lodash": "^4.17.21",
"@robertklep/qbittorrent": "^1.0.1",
"ioredis": "^5.0.0",
"moleculer": "^0.14.27",
"moleculer-web": "^0.10.5",
"parse-torrent": "^9.1.5"
"kafkajs": "^2.2.4",
"socket.io-client": "^4.7.5",
"moleculer": "^0.14.34",
"moleculer-web": "^0.10.7",
"parse-torrent": "^9.1.5",
"string-similarity-alg": "^1.3.2"
},
"engines": {
"node": ">= 16.x.x"

View File

@@ -3,7 +3,7 @@ import { Service, ServiceBroker } from "moleculer";
import ApiGateway from "moleculer-web";
export default class ApiService extends Service {
public constructor(broker: ServiceBroker) {
constructor(broker: ServiceBroker) {
super(broker);
this.parseServiceSchema({
name: "api",

View File

@@ -0,0 +1,114 @@
"use strict";
import { Kafka } from "kafkajs";
import type { Context, ServiceBroker, ServiceSchema } from "moleculer";
import { Errors, Service } from "moleculer";
/**
 * Shape of a "wanted" comic record as consumed by the autodownload service.
 * Produced by the library service's `getComicsMarkedAsWanted` action.
 */
interface Comic {
  wanted: {
    // When true, every issue of the volume should be searched for,
    // not just the ones listed in `issues`.
    markEntireVolumeWanted?: boolean;
    // Explicit list of wanted issues; used when the whole volume isn't wanted.
    issues?: any[];
    volume: {
      id: string;
      name: string;
    };
  };
}
/**
 * Moleculer service that pages through every comic marked as "wanted" in the
 * library and enqueues one Kafka job per comic on the `comic-search-jobs`
 * topic for background search/download processing.
 *
 * Fixes vs. original:
 *  - `stopped()` now guards against an undefined producer (if `started()`
 *    failed before the producer connected, the old code would throw).
 */
export default class AutoDownloadService extends Service {
  private kafkaProducer: any;
  // Page size for paginated fetches of wanted comics; adjust to system capacity.
  private readonly BATCH_SIZE = 100;

  // @ts-ignore
  constructor(
    public broker: ServiceBroker,
    schema: ServiceSchema<{}> = { name: "autodownload" },
  ) {
    super(broker);
    this.parseServiceSchema({
      name: "autodownload",
      actions: {
        /**
         * POST /searchWantedComics
         * Drains all pages of wanted comics and enqueues a Kafka job for each.
         * Returns a simple acknowledgement; the actual searching happens
         * asynchronously in the consumer service.
         * @throws MoleculerError INVALID_RESPONSE_STRUCTURE when the library
         *         service returns a non-array; SEARCH_WANTED_COMICS_ERROR on
         *         any other failure.
         */
        searchWantedComics: {
          rest: "POST /searchWantedComics",
          handler: async (ctx: Context<{}>) => {
            try {
              // Awaits inside the loop are intentional: pages are fetched and
              // enqueued sequentially to avoid flooding the library service
              // and the Kafka broker.
              /* eslint-disable no-await-in-loop */
              let page = 1;
              const limit = this.BATCH_SIZE;
              let comics: Comic[];
              do {
                comics = await this.broker.call(
                  "library.getComicsMarkedAsWanted",
                  { page, limit },
                );
                // Log debugging info
                this.logger.info(
                  "Received comics from getComicsMarkedAsWanted:",
                  JSON.stringify(comics, null, 2),
                );
                if (!Array.isArray(comics)) {
                  this.logger.error(
                    "Invalid response structure",
                    JSON.stringify(comics, null, 2),
                  );
                  throw new Errors.MoleculerError(
                    "Invalid response structure from getComicsMarkedAsWanted",
                    500,
                    "INVALID_RESPONSE_STRUCTURE",
                  );
                }
                this.logger.info(
                  `Fetched ${comics.length} comics from page ${page}`,
                );
                for (const comic of comics) {
                  await this.produceJobToKafka(comic);
                }
                page += 1;
                // A short (or empty) page means we've reached the last page.
              } while (comics.length === limit);
              return {
                success: true,
                message: "Jobs enqueued for background processing.",
              };
            } catch (error) {
              this.logger.error("Error in searchWantedComics:", error);
              throw new Errors.MoleculerError(
                "Failed to search wanted comics.",
                500,
                "SEARCH_WANTED_COMICS_ERROR",
                { error },
              );
            }
          },
        },
      },
      methods: {
        // Serializes one comic into a Kafka message on "comic-search-jobs".
        // Send failures are logged and swallowed deliberately so a single bad
        // job does not abort the whole enqueue loop.
        produceJobToKafka: async (comic: Comic) => {
          const job = { comic };
          try {
            await this.kafkaProducer.send({
              topic: "comic-search-jobs",
              messages: [{ value: JSON.stringify(job) }],
            });
            this.logger.info("Produced job to Kafka:", job);
          } catch (error) {
            this.logger.error("Error producing job to Kafka:", error);
          }
        },
      },
      async started() {
        // NOTE(review): broker address is hard-coded; consider moving to
        // configuration/environment.
        const kafka = new Kafka({
          clientId: "comic-search-service",
          brokers: ["localhost:9092"],
        });
        this.kafkaProducer = kafka.producer();
        await this.kafkaProducer.connect();
        this.logger.info("Kafka producer connected successfully.");
      },
      async stopped() {
        // Guard: started() may have failed before the producer was created,
        // in which case there is nothing to disconnect.
        if (this.kafkaProducer) {
          await this.kafkaProducer.disconnect();
          this.logger.info("Kafka producer disconnected successfully.");
        }
      },
    });
  }
}

View File

@@ -0,0 +1,309 @@
import type { EachMessagePayload } from "kafkajs";
import { Kafka, logLevel } from "kafkajs";
import { isNil, isUndefined } from "lodash";
import type { ServiceBroker, ServiceSchema } from "moleculer";
import { Service } from "moleculer";
import io from "socket.io-client";
import stringSimilarity from "string-similarity-alg";
/**
 * Envelope for AirDC++ search result events arriving over Socket.IO.
 * `groupedResult` accompanies "searchResultAdded" events and `updatedResult`
 * accompanies "searchResultUpdated" events — each handler destructures only
 * the field relevant to its event.
 */
interface SearchResult {
  groupedResult: { entityId: number; payload: any };
  updatedResult: { entityId: number; payload: any };
}
/**
 * Moleculer service that consumes comic-search jobs from Kafka, fans each job
 * out as an AirDC++ (DC++) search via the socket gateway, accumulates the
 * results that stream back over Socket.IO, ranks them by string similarity to
 * the query, and publishes the best match to Kafka plus a UI socket event.
 *
 * Fixes vs. original:
 *  - `produceResultsToKafka` declared a required second parameter that was
 *    never used and never passed by its only call site (the "searchesSent"
 *    handler) — a TS arity error. It is now optional.
 *  - `stopped()` guards each teardown against partially-completed startup.
 *  - Stale comment about Map/`Object.fromEntries` serialization removed.
 *  - Bare `console.log` calls replaced with `this.logger` for consistency.
 */
export default class ComicProcessorService extends Service {
  private kafkaConsumer: any;
  private socketIOInstance: any;
  private kafkaProducer: any;
  // Keyed by "volume-issue-year"; currently unused (Prowlarr path is commented out).
  private prowlarrResultsMap: Map<string, any> = new Map();
  // AirDC++ results accumulated per search instance (entityId).
  private airDCPPSearchResults: Map<number, any[]> = new Map();
  private issuesToSearch: any = [];

  // @ts-ignore: schema parameter is required by Service constructor
  constructor(
    public broker: ServiceBroker,
    schema: ServiceSchema<object> = { name: "comicProcessor" },
  ) {
    super(broker, schema);
    this.parseServiceSchema({
      name: "comicProcessor",
      methods: {
        // Splits a date string into { year, month, day } (month is 1-based).
        // NOTE(review): assumes `dateString` parses via `new Date()` — an
        // unparseable string yields NaN fields; confirm upstream format.
        parseStringDate: (dateString: string) => {
          const date = new Date(dateString);
          return {
            year: date.getFullYear(),
            month: date.getMonth() + 1,
            day: date.getDate(),
          };
        },
        // Scans every accumulated result array and returns the single result
        // whose `name` is most similar (Jaro-Winkler) to `query`, augmented
        // with its similarity score; null when `results` is empty.
        rankSearchResults: async (results: Map<number, any[]>, query: string) => {
          // Find the highest-ranked response based on similarity to the search string
          let highestRankedResult = null;
          let highestSimilarity = -1;
          results.forEach((resultArray) => {
            resultArray.forEach((result) => {
              const similarity = stringSimilarity("jaro-winkler").compare(
                result.name,
                query,
              );
              if (similarity > highestSimilarity) {
                highestSimilarity = similarity;
                highestRankedResult = { ...result, similarity };
              }
            });
          });
          return highestRankedResult;
        },
        // Resolves the issues to search for (whole volume from ComicVine, or
        // the explicit wanted list) and dispatches one DC++ search per issue
        // through the socket gateway. Errors are logged, not rethrown, so the
        // Kafka consumer keeps draining the topic.
        processJob: async (job: any) => {
          try {
            this.logger.info("Processing job:", JSON.stringify(job, null, 2));
            // Get the hub to search on
            const settings: any = await this.broker.call("settings.getSettings", {
              settingsKey: "directConnect",
            });
            const hubs = settings.client.hubs.map((hub: any) => hub.value);
            const { comic } = job;
            const { volume, issues, markEntireVolumeWanted } = comic.wanted;

            // If entire volume is marked as wanted, get their details from CV
            if (markEntireVolumeWanted) {
              this.issuesToSearch = await this.broker.call(
                "comicvine.getIssuesForVolume",
                { volumeId: volume.id },
              );
              this.logger.info(
                `The entire volume with id: ${volume.id} was marked as wanted.`,
              );
              this.logger.info(`Fetched issues for ${volume.id}:`);
              this.logger.info(`${this.issuesToSearch.length} issues to search`);
            } else {
              // Or proceed with `issues` from the wanted object.
              this.issuesToSearch = issues;
            }

            for (const issue of this.issuesToSearch) {
              // Query builder for DC++
              // 1. issue number (currently unused; retained for the
              //    commented-out Prowlarr path below)
              const inferredIssueNumber =
                issue.issueNumber || issue.issue_number || "";
              // 2. year (likewise only used by the Prowlarr path)
              const { year } = this.parseStringDate(issue.coverDate);
              const inferredYear = year || issue.year || "";
              // 3. Orchestrate the query: strip punctuation and collapse
              //    whitespace in the volume name for a cleaner pattern match.
              const dcppSearchQuery = {
                query: {
                  pattern: `${volume.name
                    .replace(/[^\w\s]/g, "")
                    .replace(/\s+/g, " ")
                    .trim()}`,
                  extensions: ["cbz", "cbr", "cb7"],
                },
                hub_urls: hubs,
                priority: 5,
              };
              this.logger.info(
                "DC++ search query:",
                JSON.stringify(dcppSearchQuery, null, 4),
              );
              // NOTE(review): host and credentials are hard-coded — move to
              // settings/secrets before shipping.
              await this.broker.call("socket.search", {
                query: dcppSearchQuery,
                config: {
                  hostname: "192.168.1.119:5600",
                  protocol: "http",
                  username: "admin",
                  password: "password",
                },
                namespace: "/automated",
              });
              // const prowlarrResults = await this.broker.call("prowlarr.search", {
              //   prowlarrQuery: {
              //     port: "9696",
              //     apiKey: "c4f42e265fb044dc81f7e88bd41c3367",
              //     offset: 0,
              //     categories: [7030],
              //     query: `${volume.name} ${issue.issueNumber} ${year}`,
              //     host: "localhost",
              //     limit: 100,
              //     type: "search",
              //     indexerIds: [2],
              //   },
              // });
              //
              // this.logger.info(
              //   "Prowlarr search results:",
              //   JSON.stringify(prowlarrResults, null, 4),
              // );
              // Store prowlarr results in map using unique key
              // const key = `${volume.name}-${issue.issueNumber}-${year}`;
              // this.prowlarrResultsMap.set(key, prowlarrResults);
            }
          } catch (error) {
            this.logger.error("Error processing job:", error);
          }
        },
        // Ranks the accumulated AirDC++ results for `query`, publishes the
        // best match on "comic-search-results", and notifies the UI via the
        // socket gateway. `result` is optional and unused: the only call site
        // (the "searchesSent" handler) passes just the query, and ranking
        // reads this.airDCPPSearchResults directly.
        produceResultsToKafka: async (
          query: string,
          result?: any[],
        ): Promise<void> => {
          try {
            /*
            Match and rank
            */
            const finalResult = await this.rankSearchResults(
              this.airDCPPSearchResults,
              query,
            );
            // finalResult is a plain object (not a Map), so it can be
            // JSON-serialized directly for the Kafka message payload.
            await this.kafkaProducer.send({
              topic: "comic-search-results",
              messages: [
                {
                  value: JSON.stringify(finalResult),
                },
              ],
            });
            this.logger.info(`Produced results to Kafka.`);
            // socket event for UI
            await this.broker.call("socket.broadcast", {
              namespace: "/",
              event: "searchResultsAvailable",
              args: [
                {
                  query,
                  finalResult,
                },
              ],
            });
          } catch (error) {
            this.logger.error("Error producing results to Kafka:", error);
          }
        },
      },
      async started() {
        // NOTE(review): broker address is hard-coded; consider env/config.
        const kafka = new Kafka({
          clientId: "comic-processor-service",
          brokers: ["localhost:9092"],
          logLevel: logLevel.INFO,
        });
        this.kafkaConsumer = kafka.consumer({ groupId: "comic-processor-group" });
        this.kafkaProducer = kafka.producer();

        // Surface consumer lifecycle events in the service log.
        this.kafkaConsumer.on("consumer.crash", (event: any) => {
          this.logger.error("Consumer crash:", event);
        });
        this.kafkaConsumer.on("consumer.connect", () => {
          this.logger.info("Consumer connected");
        });
        this.kafkaConsumer.on("consumer.disconnect", () => {
          this.logger.info("Consumer disconnected");
        });
        this.kafkaConsumer.on("consumer.network.request_timeout", () => {
          this.logger.warn("Consumer network request timeout");
        });

        await this.kafkaConsumer.connect();
        await this.kafkaProducer.connect();
        await this.kafkaConsumer.subscribe({
          topic: "comic-search-jobs",
          fromBeginning: true,
        });
        await this.kafkaConsumer.run({
          eachMessage: async ({ topic, partition, message }: EachMessagePayload) => {
            if (message.value) {
              const job = JSON.parse(message.value.toString());
              await this.processJob(job);
            } else {
              this.logger.warn("Received message with null value");
            }
          },
        });

        // Socket.IO connection to the AirDC++ gateway's /automated namespace,
        // over which search results stream back.
        this.socketIOInstance = io("ws://localhost:3001/automated", {
          transports: ["websocket"],
          withCredentials: true,
        });
        this.socketIOInstance.on("connect", () => {
          this.logger.info("Socket.IO connected successfully.");
        });

        // Handle searchResultAdded event
        this.socketIOInstance.on("searchResultAdded", (result: SearchResult) => {
          const {
            groupedResult: { entityId, payload },
          } = result;
          this.logger.info(
            `AirDC++ Search result added for entityId: ${entityId} - ${payload?.name}`,
          );
          if (!this.airDCPPSearchResults.has(entityId)) {
            this.airDCPPSearchResults.set(entityId, []);
          }
          if (!isNil(payload)) {
            this.airDCPPSearchResults.get(entityId).push(payload);
          }
          this.logger.debug(
            "Updated airDCPPSearchResults:",
            JSON.stringify(Array.from(this.airDCPPSearchResults.entries()), null, 4),
          );
          this.logger.debug(JSON.stringify(payload, null, 4));
        });

        // Handle searchResultUpdated event
        this.socketIOInstance.on("searchResultUpdated", (result: SearchResult) => {
          const {
            updatedResult: { entityId, payload },
          } = result;
          const resultsForInstance = this.airDCPPSearchResults.get(entityId);
          if (resultsForInstance) {
            // NOTE(review): the predicate logs every element it inspects —
            // intentional for debugging, but noisy on large result sets.
            const toReplaceIndex = resultsForInstance.findIndex((element: any) => {
              this.logger.info("search result updated!");
              this.logger.info(JSON.stringify(element, null, 4));
              return element.id === payload.id;
            });
            if (toReplaceIndex !== -1) {
              // Replace the existing result with the updated result
              resultsForInstance[toReplaceIndex] = payload;
              // Optionally, update the map with the modified array
              this.airDCPPSearchResults.set(entityId, resultsForInstance);
            }
          }
        });

        // Handle searchComplete event
        this.socketIOInstance.on("searchesSent", async (data: any) => {
          this.logger.info(
            `Search complete for query: "${data.searchInfo.query.pattern}"`,
          );
          await this.produceResultsToKafka(data.searchInfo.query.pattern);
        });
      },
      async stopped() {
        // Guard each teardown: started() may have failed before these
        // handles were created.
        if (this.kafkaConsumer) {
          await this.kafkaConsumer.disconnect();
        }
        if (this.kafkaProducer) {
          await this.kafkaProducer.disconnect();
        }
        if (this.socketIOInstance) {
          this.socketIOInstance.close();
        }
      },
    });
  }
}

View File

@@ -1,10 +1,10 @@
"use strict";
import { Context, Service, ServiceBroker, ServiceSchema, Errors } from "moleculer";
import axios from "axios";
export default class ProwlarrService extends Service {
// @ts-ignore
public constructor(
constructor(
public broker: ServiceBroker,
schema: ServiceSchema<{}> = { name: "prowlarr" },
) {
@@ -54,37 +54,42 @@ export default class ProwlarrService extends Service {
rest: "GET /search",
handler: async (
ctx: Context<{
host: string;
port: string;
apiKey: string;
query: string;
type: string;
indexerIds: [number];
categories: [number];
limit: number;
offset: number;
prowlarrQuery: {
host: string;
port: string;
apiKey: string;
query: string;
type: string;
indexerIds: [number];
categories: [number];
limit: number;
offset: number;
};
}>,
) => {
const {
indexerIds,
categories,
host,
port,
apiKey,
query,
type,
limit,
offset,
prowlarrQuery: {
indexerIds,
categories,
host,
port,
apiKey,
query,
type,
limit,
offset,
},
} = ctx.params;
const indexer = indexerIds[0] ? indexerIds.length === 1 : indexerIds;
const category = categories[0] ? categories.length === 1 : categories;
const result = await axios({
url: `http://${host}:${port}/api/v1/search`,
method: "GET",
params: {
query,
type,
indexerIds,
categories,
indexer,
category,
limit,
offset,
},

View File

@@ -86,7 +86,6 @@ export default class QBittorrentService extends Service {
getClientInfo: {
rest: "GET /getClientInfo",
handler: async (ctx: Context<{}>) => {
console.log(this.meta.app);
await this.broker.call("qbittorrent.loginWithStoredCredentials", {});
return {
buildInfo: await this.meta.app.buildInfo(),
@@ -212,6 +211,20 @@ export default class QBittorrentService extends Service {
}
},
},
determineDownloadApps: {
rest: "",
handler: async () => {
// 1. Parse the incoming search query
// to make sure that it is well-formed
// At the very least, it should have name, year, number
// 2. Choose between download mediums based on user-preference?
// possible choices are: DC++, Torrent
// 3. Perform the search on those media with the aforementioned search query
// 4. Choose a subset of relevant search results,
// and score them
// 5. Download the highest-scoring, relevant result
},
},
},
methods: {},
async started() {