106 Commits

Author SHA1 Message Date
2d9ea15550 🔧 Added canonical metadata related changes 2025-11-17 13:00:11 -05:00
755381021d Additions 2025-10-29 12:25:05 -04:00
a9bfa479c4 🔧 Added graphQL bits 2025-09-23 18:14:35 -04:00
136a7f494f 🐳 Added graphql deps 2025-07-14 11:58:42 -04:00
b332d9d75a 📜 Added JsDoc to methods 2025-06-10 13:55:11 -04:00
a0671ce6d1 👀 Refactoring file watcher code 2025-06-10 13:30:31 -04:00
999af29800 ⬇️ Fixing ADC++ socket download notifications 2025-06-03 21:57:44 -04:00
7313fc4df7 🧦 Changes to socket service to support UI 2025-05-18 20:46:37 -04:00
8b8f470f52 😂 IDK anymore 2025-02-25 16:00:37 -05:00
a2eae27c31 🏗️ Added a builder step 2025-02-25 15:36:44 -05:00
58168b1a9c 💻 Switched back to x86_64 2025-02-25 14:31:45 -05:00
bd62866340 🔧 Yet another fix for sharp 2025-02-25 14:18:44 -05:00
77d21d3046 🗻 Switched to node 21 alpine 2025-02-25 13:45:35 -05:00
030f89b258 🤷🏼 YOLO 2025-02-25 12:22:17 -05:00
a702f724f7 🪓 Arch change 2025-02-24 17:52:03 -05:00
d0b4219aef 🔧 Fixed Dockerfile 2025-02-24 17:29:48 -05:00
09d7fa2772 🪓 Attempting to get sharp installed in the image 2025-02-24 15:20:54 -05:00
b0c56f65c4 🔧 Update for libsharp for arm64 2025-02-24 14:05:07 -05:00
10ff192ce1 Bumped up elasticsearch to 8.17.2 2025-02-20 12:37:36 -05:00
1d48499c64 Revert "Merge branch 'master' into getbundles-fix"
This reverts commit 30168844f3, reversing
changes made to 2e60e2e3d5.
2024-10-24 10:59:09 -04:00
c9ecbb911a Merge pull request #12 from rishighan/getbundles-fix
getBundles Fix
2024-10-24 10:50:29 -04:00
30168844f3 Merge branch 'master' into getbundles-fix 2024-10-24 10:47:31 -04:00
2e60e2e3d5 Added package-lock.json 2024-10-24 10:45:19 -04:00
8254ec2093 ⌫ package.json deleted 2024-10-24 10:41:59 -04:00
7381d03045 🔧 Fixed getBundles endpoint 2024-10-23 23:14:21 -04:00
d7e865f84f 🔧 Prettification 2024-10-23 14:26:24 -04:00
baa5a99855 🔧 Removed indirection for getBundles 2024-10-23 13:42:05 -04:00
68c2dacff4 🔧 getBundles endpoint WIP 2024-10-21 18:04:16 -04:00
55e0ce6d36 🖌️ Formatting changes 2024-10-18 13:19:57 -04:00
4ffad69c44 🔧 Todo to move the method from UI 2024-10-16 18:50:14 -04:00
f9438f2129 🔧 Fixing broken AirDCPP search 2024-09-26 21:33:02 -04:00
2247411ac8 🪳 Added kafka to the docker-compose deps 2024-05-28 08:40:33 -04:00
e61ecb1143 🔧 Refactor for docker-compose 2024-05-24 14:22:59 -04:00
e01421f17b 🐳 Added all other deps 2024-05-23 23:15:03 -04:00
cc271021e0 🐳 Created a deps docker-compose stack 2024-05-19 21:19:15 -04:00
cc772504ae 🔧 Fixed the Redis disconnection issue 2024-05-17 01:26:16 -04:00
8dcb17a6a0 🔧 Reverted 2024-05-16 14:15:09 -04:00
a06896ffcc 🔧 Reverting to nats for transporter needs 2024-05-16 14:03:07 -04:00
03f6623ed0 🔧 Fixes 2024-05-15 21:27:38 -05:00
66f9d63b44 🔧 Debuggin Redis connectivity issue 2024-05-15 16:58:49 -05:00
a936df3144 🪲 Added a console.log 2024-05-15 12:13:50 -05:00
dc9dabca48 🔧 Fixed REDIS_URI 2024-05-15 12:00:51 -05:00
4680fd0875 ⬅️ Reverted changes 2024-05-15 11:47:08 -05:00
323548c0ff 🔧 WIP Dockerfile fixes 2024-05-15 11:32:11 -05:00
f4563c12c6 🔧 Added startup scripts fixing MongoDB timeouts 2024-05-12 23:35:01 -04:00
1b0cada848 🏗️ Added validation to db mixin 2024-05-11 13:31:10 -04:00
750a74cd9f Merge pull request #10 from rishighan/automated-download-loop
Automated download loop
2024-05-10 22:59:08 -04:00
402ee4d81b 🏗️ Updated Dockerfile 2024-05-10 22:55:57 -04:00
1fa35ac0e3 🏗️ Automatic downloads endpoint support 2024-05-09 13:49:26 -04:00
680594e67c 🔧 Added wiring for AirDC++ service 2024-04-23 22:47:32 -05:00
5593fcb4a0 🔧 Added DC++ search and download actions 2024-04-17 21:14:48 -05:00
d7f3d3a7cf 🔧 Modified Comic model 2024-04-16 22:41:33 -05:00
94cb95f4bf 📚 Changes to CV model 2024-04-14 00:25:41 -04:00
c6651cdd91 Merge pull request #9 from rishighan/qbittorrent-settings
🏗️ Added torrent attrs to comic model
2024-03-30 21:40:02 -04:00
b35e2140b5 🧲 Created a dedicated queue for torrent ops 2024-03-29 19:36:16 -04:00
f053dcb789 🧲 Massaging data to be sent to UI 2024-03-27 22:22:40 -05:00
aea7a24f76 🧲 Added a job for deleted torrents clean-up 2024-03-24 17:31:31 -04:00
8f0c2f4302 ⚙️ getAllSettings is parameterized 2024-03-12 16:39:44 -05:00
7dbe2b2701 🏗️ Added torrent attrs to comic model 2024-03-03 12:22:40 -05:00
5b9ef9fbbb Merge pull request #8 from rishighan/qbittorrent-settings
Miscellaneous Settings
2024-01-08 16:42:54 -05:00
4cdb11fcbd Cleaned the console.logs 2024-01-08 16:40:12 -05:00
78f7c1b595 🤐 Added uncompression event 2024-01-07 22:13:02 -05:00
bbd2906ebf 🏗️ Added some archive-related keys to Comic model 2024-01-06 11:17:40 -05:00
1861c2eeed Merge pull request #7 from rishighan/qbittorrent-settings
🌊 Modified settings model schema
2023-12-30 00:52:17 -05:00
f3965437b5 🏗 Added a job for full archive extraction 2023-12-30 00:50:06 -05:00
78e0e9f8ce 🏗️ Refactored the searchIssue method 2023-12-28 22:52:33 -05:00
c926758db6 🏗️ Added a downloads array to bittorent schema 2023-12-20 00:08:38 -05:00
b2b35aedc0 🏗️ Fixed a mongo update query 2023-11-27 02:14:16 -05:00
f35e3ccbe0 Removed useless code 2023-11-15 16:02:07 -06:00
7b0c0a7420 Added the importSingleIssue action 2023-11-15 15:59:27 -06:00
c2bbbf311d 🏗️ Fixed setQueueStatus 2023-11-14 13:24:49 -06:00
b8ca03220f 🏗 Implemented setQueueStatus 2023-11-13 22:01:01 -05:00
b87b0c875d 🏗️ Fleshed out resumeSession event 2023-11-13 21:18:19 -05:00
11fbaf10db 🏗 Wired up the events correctly 2023-11-13 16:41:58 -05:00
1229feb69c 🏗️ Refactor for zustand and tanstack react query support 2023-11-09 10:22:45 -06:00
3efdc7c2e2 ⚙️ Refactored saveSettings endpoint 2023-09-15 15:49:13 -04:00
1fff931941 🌊 Modified settings model schema 2023-09-13 22:09:25 -05:00
f4e2db5a5f 📦 Instruction for paths for unrar and p7zip 2023-09-01 09:44:02 -05:00
1d7561279b 📕 Updated local dev instructions in README 2023-09-01 09:22:02 -05:00
9e47ae0436 Merge pull request #6 from rishighan/migration-to-bullmq
🐂 Migration to moleculer-bullMQ
2023-08-30 13:50:47 -04:00
b1b1cb527b 🔧 Moved moleculer-bullmq to dependencies 2023-08-30 13:16:58 -04:00
cfa09691e8 🔧 Fixed an errant condition
This error was because I checked for active AND prioritized jobs in BullMQ, when none existed, because everything was active, and the socket.io event never fired, causing the browser to be in a bad state and never "resuming" an import even when one was in progress.
2023-08-30 12:21:43 -04:00
356b093db9 🔧 Fixed a dep 2023-08-30 00:08:05 -04:00
b4b83e5c75 🔧 Reworked the jobResults aggregation 2023-08-29 23:58:06 -04:00
c718456adc 🔧 jobResult aggregate query first draft 2023-08-28 23:56:44 -04:00
76d4e6b10f 🔢 Persisting the sessionId in the JobResult 2023-08-27 20:25:04 -04:00
bde548695c 🔧 Refactored the getJobResultStatistics method 2023-08-24 23:45:51 -04:00
fd4dd1d5c4 🥭 Aggregation for jobResult 2023-08-24 23:18:27 -04:00
5540bb16ec ⏱️ Added a timestamp to job results schema 2023-08-24 09:06:38 -05:00
6ee609f2b9 🔧 Refactor and added getJobCounts 2023-08-23 11:47:47 -05:00
8b584080e2 🐂 Queue controls 2023-08-22 22:07:51 -05:00
01975079e3 🐂 Queue drain event 2023-08-22 05:20:24 -04:00
fe9fbe9c3a 🔢 Getting job counts 2023-08-22 00:04:47 -04:00
df6652cce9 🐂 Queue pause/resume functionality 2023-08-21 17:55:08 -04:00
e5fc879b2d 🐂 Added some job counters 2023-08-18 11:39:18 -04:00
625447e7f1 🧊 Added shared Redis config 2023-08-14 22:15:19 -04:00
fdcf1f7d68 🧹 Linted code 2023-08-14 19:45:40 -04:00
4003f666cf 🔧 Tooling for resumable socket.io sessions 2023-07-27 11:09:26 -07:00
7b855f8cf1 🐂 BullMQ support code 2023-07-13 08:02:12 -07:00
007ce4b66f 🏗️ Applying the refactor patc 2023-07-05 23:14:46 -04:00
cb84e4893f 🐂 Migration to moleculer-bullMQ 2023-06-29 14:16:58 -04:00
795ac561c7 🔼 Updated deps 2023-04-19 09:14:22 -04:00
175e01dc2d Added elasticsearch dep 2023-04-09 15:53:36 -04:00
66e0a26c68 Merge pull request #4 from elgohr-update/master 2023-04-04 17:46:31 -04:00
Lars Gohr
959d248273 Updated elgohr/Publish-Docker-Github-Action to a supported version (v5) 2023-03-30 06:52:23 +02:00
745ec5d774 Merge pull request #3 from rishighan/mimetype-check
 MIMEtype check for comic book archives
2023-03-23 23:59:30 -04:00
34 changed files with 8303 additions and 4156 deletions


@@ -2,6 +2,7 @@ node_modules
comics/*
userdata/*
npm-debug.log
logs/*
Dockerfile
.dockerignore
.git


@@ -12,7 +12,7 @@ jobs:
steps:
- uses: actions/checkout@master
- name: Publish to Registry
uses: elgohr/Publish-Docker-Github-Action@master
uses: elgohr/Publish-Docker-Github-Action@v5
with:
name: frishi/threetwo-core-service
username: ${{ secrets.DOCKER_USERNAME }}


@@ -1,3 +0,0 @@
{
"esversion": 10
}

CANONICAL_METADATA_GUIDE.md (new file, 356 lines)

@@ -0,0 +1,356 @@
# Canonical Comic Metadata Model - Implementation Guide
## 🎯 Overview
The canonical metadata model provides a comprehensive system for managing comic book metadata from multiple sources with proper **provenance tracking**, **confidence scoring**, and **conflict resolution**.
## 🏗️ Architecture
### **Core Components:**
1. **📋 Type Definitions** ([`models/canonical-comic.types.ts`](models/canonical-comic.types.ts:1))
2. **🎯 GraphQL Schema** ([`models/graphql/canonical-typedef.ts`](models/graphql/canonical-typedef.ts:1))
3. **🔧 Resolution Engine** ([`utils/metadata-resolver.utils.ts`](utils/metadata-resolver.utils.ts:1))
4. **💾 Database Model** ([`models/canonical-comic.model.ts`](models/canonical-comic.model.ts:1))
5. **⚙️ Service Layer** ([`services/canonical-metadata.service.ts`](services/canonical-metadata.service.ts:1))
---
## 📊 Metadata Sources & Ranking
### **Source Priority (Highest to Lowest):**
```typescript
enum MetadataSourceRank {
USER_MANUAL = 1, // User overrides - highest priority
COMICINFO_XML = 2, // Embedded metadata - high trust
COMICVINE = 3, // ComicVine API - authoritative
METRON = 4, // Metron API - authoritative
GCD = 5, // Grand Comics Database - community
LOCG = 6, // League of Comic Geeks - specialized
LOCAL_FILE = 7 // Filename inference - lowest trust
}
```
### **Confidence Scoring:**
- **User Manual**: 1.0 (100% trusted)
- **ComicInfo.XML**: 0.8-0.95 (based on completeness)
- **ComicVine**: 0.9 (highly reliable API)
- **Metron**: 0.85 (reliable API)
- **GCD**: 0.8 (community-maintained)
- **Local File**: 0.3 (inference-based)
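The scoring above can be read as per-source defaults, with ranged sources (ComicInfo.XML) scaled by field completeness. A minimal sketch of that idea; the names and exact numbers here are illustrative assumptions, not the repo's actual constants:
```typescript
// Illustrative per-source defaults mirroring the table above.
const DEFAULT_CONFIDENCE: Record<string, number> = {
  USER_MANUAL: 1.0,
  COMICINFO_XML: 0.8, // base value; scaled up toward 0.95 by completeness
  COMICVINE: 0.9,
  METRON: 0.85,
  GCD: 0.8,
  LOCAL_FILE: 0.3,
};

// ComicInfo.XML confidence scales between 0.8 and 0.95 with completeness.
function comicInfoConfidence(fieldsPresent: number, fieldsTotal: number): number {
  const completeness = fieldsTotal > 0 ? fieldsPresent / fieldsTotal : 0;
  return 0.8 + 0.15 * completeness;
}
```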
---
## 🔄 Usage Examples
### **1. Import ComicVine Metadata**
```typescript
// REST API
POST /api/canonicalMetadata/importComicVine/60f7b1234567890abcdef123
{
"comicVineData": {
"id": 142857,
"name": "Amazing Spider-Man #1",
"issue_number": "1",
"cover_date": "2023-01-01",
"volume": {
"id": 12345,
"name": "Amazing Spider-Man",
"start_year": 2023,
"publisher": { "name": "Marvel Comics" }
},
"person_credits": [
{ "name": "Dan Slott", "role": "writer" }
]
}
}
```
```typescript
// Service usage
const result = await broker.call('canonicalMetadata.importComicVineMetadata', {
comicId: '60f7b1234567890abcdef123',
comicVineData: comicVineData,
forceUpdate: false
});
```
### **2. Import ComicInfo.XML**
```typescript
POST /api/canonicalMetadata/importComicInfo/60f7b1234567890abcdef123
{
"xmlData": {
"Title": "Amazing Spider-Man",
"Series": "Amazing Spider-Man",
"Number": "1",
"Year": 2023,
"Month": 1,
"Writer": "Dan Slott",
"Penciller": "John Romita Jr",
"Publisher": "Marvel Comics"
}
}
```
### **3. Set Manual Metadata (Highest Priority)**
```typescript
PUT /api/canonicalMetadata/manual/60f7b1234567890abcdef123/title
{
"value": "The Amazing Spider-Man #1",
"confidence": 1.0,
"notes": "User corrected title formatting"
}
```
### **4. Resolve Metadata Conflicts**
```typescript
// Get conflicts
GET /api/canonicalMetadata/conflicts/60f7b1234567890abcdef123
// Resolve by selecting preferred source
POST /api/canonicalMetadata/resolve/60f7b1234567890abcdef123/title
{
"selectedSource": "COMICVINE"
}
```
### **5. Query with Source Filtering**
```graphql
query {
searchComicsByMetadata(
title: "Spider-Man"
sources: [COMICVINE, COMICINFO_XML]
minConfidence: 0.8
) {
resolvedMetadata {
title
series { name volume publisher }
creators { name role }
}
canonicalMetadata {
title {
value
source
confidence
timestamp
sourceUrl
}
}
}
}
```
---
## 🔧 Data Structure
### **Canonical Metadata Storage:**
```typescript
{
"canonicalMetadata": {
"title": [
{
"value": "Amazing Spider-Man #1",
"source": "COMICVINE",
"confidence": 0.9,
"rank": 3,
"timestamp": "2023-01-15T10:00:00Z",
"sourceId": "142857",
"sourceUrl": "https://comicvine.gamespot.com/issue/4000-142857/"
},
{
"value": "Amazing Spider-Man",
"source": "COMICINFO_XML",
"confidence": 0.8,
"rank": 2,
"timestamp": "2023-01-15T09:00:00Z"
}
],
"creators": [
{
"value": [
{ "name": "Dan Slott", "role": "Writer" },
{ "name": "John Romita Jr", "role": "Penciller" }
],
"source": "COMICINFO_XML",
"confidence": 0.85,
"rank": 2,
"timestamp": "2023-01-15T09:00:00Z"
}
]
}
}
```
### **Resolved Metadata (Best Values):**
```typescript
{
"resolvedMetadata": {
"title": "Amazing Spider-Man #1", // From ComicVine (higher confidence)
"series": {
"name": "Amazing Spider-Man",
"volume": 1,
"publisher": "Marvel Comics"
},
"creators": [
{ "name": "Dan Slott", "role": "Writer" },
{ "name": "John Romita Jr", "role": "Penciller" }
],
"lastResolved": "2023-01-15T10:30:00Z",
"resolutionConflicts": [
{
"field": "title",
"conflictingValues": [
{ "value": "Amazing Spider-Man #1", "source": "COMICVINE", "confidence": 0.9 },
{ "value": "Amazing Spider-Man", "source": "COMICINFO_XML", "confidence": 0.8 }
]
}
]
}
}
```
---
## ⚙️ Resolution Strategies
### **Available Strategies:**
```typescript
const strategies = {
// Use source with highest confidence score
highest_confidence: { strategy: 'highest_confidence' },
// Use source with highest rank (USER_MANUAL > COMICINFO_XML > COMICVINE...)
highest_rank: { strategy: 'highest_rank' },
// Use most recently added metadata
most_recent: { strategy: 'most_recent' },
// Prefer user manual entries
user_preference: { strategy: 'user_preference' },
// Attempt to find consensus among sources
consensus: { strategy: 'consensus' }
};
```
### **Custom Strategy:**
```typescript
const customStrategy: MetadataResolutionStrategy = {
strategy: 'highest_rank',
minimumConfidence: 0.7,
allowedSources: [MetadataSource.COMICVINE, MetadataSource.COMICINFO_XML],
fieldSpecificStrategies: {
'creators': { strategy: 'consensus' }, // Merge creators from multiple sources
'title': { strategy: 'highest_confidence' } // Use most confident title
}
};
```
---
## 🚀 Integration Workflow
### **1. Local File Import Process:**
```typescript
// 1. Extract file metadata
const localMetadata = extractLocalMetadata(filePath);
comic.addMetadata('title', inferredTitle, MetadataSource.LOCAL_FILE, 0.3);
// 2. Parse ComicInfo.XML (if exists)
if (comicInfoXML) {
await broker.call('canonicalMetadata.importComicInfoXML', {
comicId: comic._id,
xmlData: comicInfoXML
});
}
// 3. Enhance with external APIs
const comicVineMatch = await searchComicVine(comic.resolvedMetadata.title);
if (comicVineMatch) {
await broker.call('canonicalMetadata.importComicVineMetadata', {
comicId: comic._id,
comicVineData: comicVineMatch
});
}
// 4. Resolve final metadata
await broker.call('canonicalMetadata.reResolveMetadata', {
comicId: comic._id
});
```
### **2. Conflict Resolution Workflow:**
```typescript
// 1. Detect conflicts
const conflicts = await broker.call('canonicalMetadata.getMetadataConflicts', {
comicId: comic._id
});
// 2. Present to user for resolution
if (conflicts.length > 0) {
// Show UI with conflicting values and sources
const userChoice = await presentConflictResolution(conflicts);
// 3. Apply user's resolution
await broker.call('canonicalMetadata.resolveMetadataConflict', {
comicId: comic._id,
field: userChoice.field,
selectedSource: userChoice.source
});
}
```
---
## 📈 Performance Considerations
### **Database Indexes:**
- **Text search**: `resolvedMetadata.title`, `resolvedMetadata.series.name`
- **Unique identification**: `series.name` + `volume` + `issueNumber`
- **Source filtering**: `canonicalMetadata.*.source` + `confidence`
- **Import status**: `importStatus.isImported` + `tagged`
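A hedged sketch of how these indexes could be declared in Mongoose; the field paths follow the storage shape shown earlier, and the schema variable is a stand-in for the actual canonical comic model:
```typescript
// Hypothetical index declarations matching the list above; adjust field
// paths to the real canonical-comic schema.
import mongoose from "mongoose";

const CanonicalComicSchema = new mongoose.Schema({}, { strict: false });

// Text search across resolved title and series name
CanonicalComicSchema.index({
  "resolvedMetadata.title": "text",
  "resolvedMetadata.series.name": "text",
});

// Identification of a specific issue
CanonicalComicSchema.index({
  "resolvedMetadata.series.name": 1,
  "resolvedMetadata.series.volume": 1,
  "resolvedMetadata.issueNumber": 1,
});

// Source filtering with confidence ordering
CanonicalComicSchema.index({
  "canonicalMetadata.title.source": 1,
  "canonicalMetadata.title.confidence": -1,
});

// Import status
CanonicalComicSchema.index({ "importStatus.isImported": 1, tagged: 1 });
```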
### **Optimization Tips:**
- **Batch metadata imports** for large collections
- **Cache resolved metadata** for frequently accessed comics
- **Index on confidence scores** for quality filtering
- **Paginate conflict resolution** for large libraries
---
## 🛡️ Best Practices
### **Data Quality:**
1. **Always validate** external API responses before import
2. **Set appropriate confidence** scores based on source reliability
3. **Preserve original data** in source-specific fields
4. **Log metadata changes** for audit trails
### **Conflict Management:**
1. **Prefer user overrides** for disputed fields
2. **Use consensus** for aggregatable fields (creators, characters)
3. **Maintain provenance** links to original sources
4. **Provide clear UI** for conflict resolution
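For aggregatable fields, consensus can be a simple merge that deduplicates by name and role while keeping the most confident attribution. A hypothetical sketch:
```typescript
// Hypothetical consensus merge for an aggregatable field such as creators.
interface CreatorEntry {
  name: string;
  role: string;
  source: string;
  confidence: number;
}

function mergeCreators(perSourceLists: CreatorEntry[][]): CreatorEntry[] {
  const byKey = new Map<string, CreatorEntry>();
  for (const list of perSourceLists) {
    for (const creator of list) {
      const key = `${creator.name.toLowerCase()}|${creator.role.toLowerCase()}`;
      const existing = byKey.get(key);
      // Keep the attribution from the most confident source.
      if (!existing || creator.confidence > existing.confidence) {
        byKey.set(key, creator);
      }
    }
  }
  return [...byKey.values()];
}
```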
### **Performance:**
1. **Re-resolve metadata** only when sources change
2. **Cache frequently accessed** resolved metadata
3. **Batch operations** for bulk imports
4. **Use appropriate indexes** for common queries
---
This canonical metadata model provides enterprise-grade metadata management with full provenance tracking, confidence scoring, and flexible conflict resolution for comic book collections of any size.


@@ -1,40 +1,104 @@
FROM alpine:3.14
# Use a non-ARM image (x86_64) for Node.js
FROM --platform=linux/amd64 node:21-alpine3.18 AS builder
# Set metadata for contact
LABEL maintainer="Rishi Ghan <rishi.ghan@gmail.com>"
# Show all node logs
ENV NPM_CONFIG_LOGLEVEL warn
# Set environment variables
ENV NPM_CONFIG_LOGLEVEL=warn
ENV NODE_ENV=production
# Set the working directory
WORKDIR /core-services
RUN apk add --update \
--repository http://nl.alpinelinux.org/alpine/v3.14/main \
vips-tools \
# Install required dependencies using apk
RUN apk update && apk add --no-cache \
bash \
wget \
imagemagick \
python3 \
unrar \
p7zip \
nodejs \
npm \
xvfb \
xz
build-base \
g++ \
python3-dev \
p7zip \
curl \
git \
glib \
cairo-dev \
pango-dev \
icu-dev \
pkgconfig
# Install libvips from source
RUN wget https://github.com/libvips/libvips/releases/download/v8.13.0/vips-8.13.0.tar.gz \
&& tar -zxvf vips-8.13.0.tar.gz \
&& cd vips-8.13.0 \
&& ./configure --disable-python \
&& make -j$(nproc) \
&& make install \
&& cd .. \
&& rm -rf vips-8.13.0.tar.gz vips-8.13.0
# Install unrar directly from RARLAB
RUN wget https://www.rarlab.com/rar/rarlinux-x64-621.tar.gz \
&& tar -zxvf rarlinux-x64-621.tar.gz \
&& cp rar/unrar /usr/bin/ \
&& rm -rf rarlinux-x64-621.tar.gz rar
# Verify Node.js installation
RUN node -v && npm -v
# Copy application configuration files
COPY package.json package-lock.json ./
COPY moleculer.config.ts ./
COPY tsconfig.json ./
RUN npm i
# Install dependencies
# Install application dependencies
RUN npm install
# Install sharp with proper platform configuration
RUN npm install --force sharp --platform=linux/amd64
# Install global dependencies
RUN npm install -g typescript ts-node
# Copy the rest of the application files (e.g., source code)
COPY . .
# Build and cleanup
RUN npm run build \
&& npm prune
# Build the app
RUN npm run build
# Final image
FROM --platform=linux/amd64 node:21-alpine3.18
# Set environment variables
ENV NODE_ENV=production
# Set the working directory
WORKDIR /core-services
# Install runtime dependencies
RUN apk update && apk add --no-cache \
bash \
wget \
imagemagick \
python3 \
xvfb \
p7zip \
curl \
git \
glib \
cairo-dev \
pango-dev \
icu-dev \
pkgconfig
# Copy necessary files from the builder image
COPY --from=builder /core-services /core-services
# Expose the application's port
EXPOSE 3000
# Start server
CMD ["npm", "start"]
# Command to run the application (this will now work)
CMD ["npm", "start"]


@@ -0,0 +1,423 @@
# Moleculer Microservices Dependency Analysis
**ThreeTwo Core Service - Comic Book Library Management System**
## System Overview
This **ThreeTwo Core Service** is a sophisticated **comic book library management system** built on Moleculer microservices architecture. The system demonstrates advanced patterns including:
- **Event-driven architecture** with real-time WebSocket communication
- **Asynchronous job processing** with BullMQ for heavy operations
- **Multi-source metadata aggregation** with canonical data resolution
- **Hybrid search** combining MongoDB aggregation and ElasticSearch
- **External system integrations** (P2P, BitTorrent, Comic APIs)
### Technical Stack
- **Framework**: Moleculer.js microservices
- **Node ID**: `threetwo-core-service`
- **Transport**: Redis (`redis://localhost:6379`)
- **Databases**: MongoDB + ElasticSearch
- **Queue System**: BullMQ (Redis-backed)
- **Real-time**: Socket.IO with Redis adapter
- **External APIs**: ComicVine, AirDC++, qBittorrent
## Service Architecture
### Core Services
| Service | File | Role | Dependencies |
|---------|------|------|-------------|
| **API** | [`api.service.ts`](services/api.service.ts) | API Gateway + File System Watcher | → library, jobqueue |
| **Library** | [`library.service.ts`](services/library.service.ts) | Core Comic Library Management | → jobqueue, search, comicvine |
| **JobQueue** | [`jobqueue.service.ts`](services/jobqueue.service.ts) | Asynchronous Job Processing (BullMQ) | → library, socket |
| **Socket** | [`socket.service.ts`](services/socket.service.ts) | Real-time Communication (Socket.IO) | → library, jobqueue |
| **Search** | [`search.service.ts`](services/search.service.ts) | ElasticSearch Integration | ElasticSearch client |
| **GraphQL** | [`graphql.service.ts`](services/graphql.service.ts) | GraphQL API Layer | → search |
### Supporting Services
| Service | File | Role | Dependencies |
|---------|------|------|-------------|
| **AirDC++** | [`airdcpp.service.ts`](services/airdcpp.service.ts) | P2P File Sharing Integration | External AirDC++ client |
| **Settings** | [`settings.service.ts`](services/settings.service.ts) | Configuration Management | MongoDB |
| **Image Transform** | [`imagetransformation.service.ts`](services/imagetransformation.service.ts) | Cover Processing | File system |
| **OPDS** | [`opds.service.ts`](services/opds.service.ts) | Comic Catalog Feeds | File system |
| **Torrent Jobs** | [`torrentjobs.service.ts`](services/torrentjobs.service.ts) | BitTorrent Integration | → library, qbittorrent |
## Service-to-Service Dependencies
### Core Service Interactions
#### 1. API Service → Other Services
```typescript
// File system watcher triggers import
ctx.broker.call("library.walkFolders", { basePathToWalk: filePath })
ctx.broker.call("importqueue.processImport", { fileObject })
```
#### 2. Library Service → Dependencies
```typescript
// Job queue integration
this.broker.call("jobqueue.enqueue", { action: "enqueue.async" })
// Search operations
ctx.broker.call("search.searchComic", { elasticSearchQueries })
ctx.broker.call("search.deleteElasticSearchIndices", {})
// External metadata
ctx.broker.call("comicvine.getVolumes", { volumeURI })
```
#### 3. JobQueue Service → Dependencies
```typescript
// Import processing
this.broker.call("library.importFromJob", { importType, payload })
// Real-time updates
this.broker.call("socket.broadcast", {
namespace: "/",
event: "LS_COVER_EXTRACTED",
args: [{ completedJobCount, importResult }]
})
```
#### 4. Socket Service → Dependencies
```typescript
// Job management
ctx.broker.call("jobqueue.getJobCountsByType", {})
ctx.broker.call("jobqueue.toggle", { action: queueAction })
// Download tracking
ctx.call("library.applyAirDCPPDownloadMetadata", {
bundleId, comicObjectId, name, size, type
})
```
#### 5. GraphQL Service → Search
```typescript
// Wanted comics query
const result = await ctx.broker.call("search.issue", {
query: eSQuery,
pagination: { size: limit, from: offset },
type: "wanted"
})
```
## API Endpoint Mapping
### REST API Routes (`/api/*`)
#### Library Management
- `POST /api/library/walkFolders` → [`library.walkFolders`](services/library.service.ts:82)
- `POST /api/library/newImport` → [`library.newImport`](services/library.service.ts:165) → [`jobqueue.enqueue`](services/library.service.ts:219)
- `POST /api/library/getComicBooks` → [`library.getComicBooks`](services/library.service.ts:535)
- `POST /api/library/getComicBookById` → [`library.getComicBookById`](services/library.service.ts:550)
- `POST /api/library/flushDB` → [`library.flushDB`](services/library.service.ts:818) → [`search.deleteElasticSearchIndices`](services/library.service.ts:839)
- `GET /api/library/libraryStatistics` → [`library.libraryStatistics`](services/library.service.ts:684)
#### Job Management
- `GET /api/jobqueue/getJobCountsByType` → [`jobqueue.getJobCountsByType`](services/jobqueue.service.ts:31)
- `GET /api/jobqueue/toggle` → [`jobqueue.toggle`](services/jobqueue.service.ts:38)
- `GET /api/jobqueue/getJobResultStatistics` → [`jobqueue.getJobResultStatistics`](services/jobqueue.service.ts:214)
#### Search Operations
- `POST /api/search/searchComic` → [`search.searchComic`](services/search.service.ts:28)
- `POST /api/search/searchIssue` → [`search.issue`](services/search.service.ts:60)
- `GET /api/search/deleteElasticSearchIndices` → [`search.deleteElasticSearchIndices`](services/search.service.ts:171)
#### AirDC++ Integration
- `POST /api/airdcpp/initialize` → [`airdcpp.initialize`](services/airdcpp.service.ts:24)
- `POST /api/airdcpp/getHubs` → [`airdcpp.getHubs`](services/airdcpp.service.ts:59)
- `POST /api/airdcpp/search` → [`airdcpp.search`](services/airdcpp.service.ts:96)
#### Image Processing
- `POST /api/imagetransformation/resizeImage` → [`imagetransformation.resize`](services/imagetransformation.service.ts:37)
- `POST /api/imagetransformation/analyze` → [`imagetransformation.analyze`](services/imagetransformation.service.ts:57)
### GraphQL Endpoints
- `POST /graphql` → [`graphql.wantedComics`](services/graphql.service.ts:49) → [`search.issue`](services/graphql.service.ts:77)
### Static File Serving
- `/userdata/*` → Static files from `./userdata`
- `/comics/*` → Static files from `./comics`
- `/logs/*` → Static files from `logs`
## Event-Driven Communication
### Job Queue Events
#### Job Completion Events
```typescript
// Successful import completion
"enqueue.async.completed" socket.broadcast("LS_COVER_EXTRACTED", {
completedJobCount,
importResult: job.returnvalue.data.importResult
})
// Failed import handling
"enqueue.async.failed" socket.broadcast("LS_COVER_EXTRACTION_FAILED", {
failedJobCount,
importResult: job
})
// Queue drained
"drained" socket.broadcast("LS_IMPORT_QUEUE_DRAINED", {
message: "drained"
})
```
#### Archive Processing Events
```typescript
// Archive uncompression completed
"uncompressFullArchive.async.completed" socket.broadcast("LS_UNCOMPRESSION_JOB_COMPLETE", {
uncompressedArchive: job.returnvalue
})
```
### File System Events
```typescript
// File watcher events (debounced 200ms)
fileWatcher.on("add", (path, stats) {
broker.call("library.walkFolders", { basePathToWalk: filePath })
broker.call("importqueue.processImport", { fileObject })
broker.broadcast(event, { path: filePath })
})
```
### WebSocket Events
#### Real-time Search
```typescript
// Search initiation
socket.emit("searchInitiated", { instance })
// Live search results
socket.emit("searchResultAdded", groupedResult)
socket.emit("searchResultUpdated", updatedResult)
socket.emit("searchComplete", { message })
```
#### Download Progress
```typescript
// Download status
broker.emit("downloadCompleted", bundleDBImportResult)
broker.emit("downloadError", error.message)
// Progress tracking
socket.emit("downloadTick", data)
```
## Data Flow Architecture
### 1. Comic Import Processing Flow
```mermaid
graph TD
A[File System Watcher] --> B[library.walkFolders]
B --> C[jobqueue.enqueue]
C --> D[jobqueue.enqueue.async]
D --> E[Archive Extraction]
E --> F[Metadata Processing]
F --> G[Canonical Metadata Creation]
G --> H[library.importFromJob]
H --> I[MongoDB Storage]
I --> J[ElasticSearch Indexing]
J --> K[socket.broadcast LS_COVER_EXTRACTED]
```
### 2. Search & Discovery Flow
```mermaid
graph TD
A[GraphQL/REST Query] --> B[search.issue]
B --> C[ElasticSearch Query]
C --> D[Results Enhancement]
D --> E[Metadata Scoring]
E --> F[Structured Response]
```
### 3. Download Management Flow
```mermaid
graph TD
A[socket search event] --> B[airdcpp.search]
B --> C[Real-time Results]
C --> D[socket download event]
D --> E[library.applyAirDCPPDownloadMetadata]
E --> F[Progress Tracking]
F --> G[Import Pipeline]
```
## Database Dependencies
### MongoDB Collections
| Collection | Model | Used By Services |
|------------|-------|-----------------|
| **comics** | [`Comic`](models/comic.model.ts) | library, search, jobqueue, imagetransformation |
| **settings** | [`Settings`](models/settings.model.ts) | settings |
| **sessions** | [`Session`](models/session.model.ts) | socket |
| **jobresults** | [`JobResult`](models/jobresult.model.ts) | jobqueue |
### ElasticSearch Integration
- **Index**: `comics` - Full-text search with metadata scoring
- **Client**: [`eSClient`](services/search.service.ts:13) from [`comic.model.ts`](models/comic.model.ts)
- **Query Types**: match_all, multi_match, bool queries with field boosting
### Redis Usage
| Purpose | Services | Configuration |
|---------|----------|---------------|
| **Transport** | All services | [`moleculer.config.ts:93`](moleculer.config.ts:93) |
| **Job Queue** | jobqueue | [`jobqueue.service.ts:27`](services/jobqueue.service.ts:27) |
| **Socket.IO Adapter** | socket | [`socket.service.ts:48`](services/socket.service.ts:48) |
| **Job Counters** | jobqueue | [`completedJobCount`](services/jobqueue.service.ts:392), [`failedJobCount`](services/jobqueue.service.ts:422) |
## External System Integrations
### AirDC++ (P2P File Sharing)
```typescript
// Integration wrapper
const ADCPPSocket = new AirDCPPSocket(config)
await ADCPPSocket.connect()
// Search operations
const searchInstance = await ADCPPSocket.post("search")
const searchInfo = await ADCPPSocket.post(`search/${searchInstance.id}/hub_search`, query)
// Download management
const downloadResult = await ADCPPSocket.post(`search/${searchInstanceId}/results/${resultId}/download`)
```
### ComicVine API
```typescript
// Metadata enrichment
const volumeDetails = await this.broker.call("comicvine.getVolumes", {
volumeURI: matchedResult.volume.api_detail_url
})
```
### qBittorrent Client
```typescript
// Torrent monitoring
const torrents = await this.broker.call("qbittorrent.getTorrentRealTimeStats", { infoHashes })
```
## Metadata Management System
### Multi-Source Metadata Aggregation
The system implements sophisticated metadata management with source prioritization:
#### Source Priority Order
1. **ComicInfo.xml** (embedded in archives)
2. **ComicVine API** (external database)
3. **Metron** (comic database)
4. **Grand Comics Database (GCD)**
5. **League of Comic Geeks (LOCG)**
6. **Filename Inference** (fallback)
#### Canonical Metadata Structure
```typescript
const canonical = {
title: findBestValue('title', inferredMetadata.title),
series: {
name: findSeriesValue(['series', 'seriesName', 'name'], inferredMetadata.series),
volume: findBestValue('volume', inferredMetadata.volume || 1),
startYear: findBestValue('startYear', inferredMetadata.issue?.year)
},
issueNumber: findBestValue('issueNumber', inferredMetadata.issue?.number),
publisher: findBestValue('publisher', null),
creators: [], // Combined from all sources
completeness: {
score: calculatedScore,
missingFields: [],
lastCalculated: currentTime
}
}
```
## Performance & Scalability Insights
### Asynchronous Processing
- **Heavy Operations**: Comic import, archive extraction, metadata processing
- **Queue System**: BullMQ with Redis backing for reliability
- **Job Types**: Import processing, archive extraction, torrent monitoring
- **Real-time Updates**: WebSocket progress notifications
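Below is a minimal BullMQ sketch of this producer/worker split; the queue name, job payload, and connection settings are assumptions (the service itself wires this up through moleculer-bullmq):
```typescript
import { Queue, Worker } from "bullmq";

// Assumed Redis settings; the service derives these from REDIS_URI.
const connection = { host: "localhost", port: 6379 };

// Producer: enqueue a heavy import instead of doing it in-request.
const importQueue = new Queue("comic-import", { connection });
void importQueue.add("enqueue.async", { filePath: "/comics/example.cbz" });

// Worker: processes jobs off the queue in the background.
const worker = new Worker(
  "comic-import",
  async (job) => {
    // ...extract archive, parse metadata, persist, index...
    return { importResult: "ok", file: job.data.filePath };
  },
  { connection },
);

// Mirrors the "enqueue.async.completed" → socket.broadcast flow above.
worker.on("completed", (job) => console.log(`job ${job.id} done`, job.returnvalue));
```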
### Search Optimization
- **Dual Storage**: MongoDB (transactional) + ElasticSearch (search)
- **Metadata Scoring**: Canonical metadata with source priority
- **Query Types**: Full-text, field-specific, boolean combinations
- **Caching**: Moleculer built-in memory caching
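A sketch of the kind of boosted bool query this implies, using the Elasticsearch 8 JS client; the node URL, field names, and boost weights are assumptions:
```typescript
import { Client } from "@elastic/elasticsearch";

const eSClient = new Client({ node: "http://localhost:9200" }); // assumed node

async function searchComics(term: string) {
  // Full-text multi_match with field boosting inside a boolean query.
  const result = await eSClient.search({
    index: "comics",
    from: 0,
    size: 25,
    query: {
      bool: {
        must: [
          {
            multi_match: {
              query: term,
              // Hypothetical fields/boosts; title-like fields weighted higher
              fields: ["rawFileDetails.name^3", "sourcedMetadata.comicvine.name"],
            },
          },
        ],
      },
    },
  });
  return result.hits.hits;
}
```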
### External Integration Resilience
- **Timeout Handling**: Custom timeouts for long-running operations
- **Error Propagation**: Structured error responses with context
- **Connection Management**: Reusable connections for external APIs
- **Retry Logic**: Built-in retry policies for failed operations
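In Moleculer these resilience settings live in the broker options; a sketch with assumed values (not the repo's actual moleculer.config.ts):
```typescript
import { BrokerOptions } from "moleculer";

const brokerConfig: Partial<BrokerOptions> = {
  requestTimeout: 10 * 1000, // per-call timeout for service actions
  retryPolicy: {
    enabled: true,
    retries: 3,
    delay: 100, // first retry after 100ms
    maxDelay: 2000, // cap for the backoff
    factor: 2, // exponential backoff
    check: (err: any) => err && !!err.retryable, // retry only retryable errors
  },
};

export default brokerConfig;

// Individual calls can override these, e.g.:
// broker.call("comicvine.getVolumes", params, { timeout: 30_000, retries: 5 });
```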
## Critical Dependency Patterns
### 1. Service Chain Dependencies
- **Import Pipeline**: api → library → jobqueue → socket
- **Search Pipeline**: graphql → search → ElasticSearch
- **Download Pipeline**: socket → airdcpp → library
### 2. Circular Dependencies (Managed)
- **socket ←→ library**: Download coordination and progress updates
- **jobqueue ←→ socket**: Job progress notifications and queue control
### 3. Shared Resource Dependencies
- **MongoDB**: library, search, jobqueue, settings services
- **Redis**: All services (transport) + jobqueue (BullMQ) + socket (adapter)
- **ElasticSearch**: search, graphql services
## Architecture Strengths
### 1. Separation of Concerns
- **API Gateway**: Pure routing and file serving
- **Business Logic**: Centralized in library service
- **Data Access**: Abstracted through DbMixin
- **External Integration**: Isolated in dedicated services
### 2. Event-Driven Design
- **File System Events**: Automatic import triggering
- **Job Lifecycle Events**: Progress tracking and error handling
- **Real-time Communication**: WebSocket event broadcasting
### 3. Robust Metadata Management
- **Multi-Source Aggregation**: ComicVine, ComicInfo.xml, filename inference
- **Canonical Resolution**: Smart metadata merging with source attribution
- **User Curation Support**: Framework for manual metadata override
### 4. Scalability Features
- **Microservices Architecture**: Independent service scaling
- **Asynchronous Processing**: Heavy operations don't block API responses
- **Redis Transport**: Distributed service communication
- **Job Queue**: Reliable background processing with retry logic
## Potential Areas for Improvement
### 1. Service Coupling
- **High Interdependence**: library ←→ jobqueue ←→ socket tight coupling
- **Recommendation**: Event-driven decoupling for some operations
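For example, the library service could emit a domain event instead of calling socket directly, letting subscribers react independently; a sketch with illustrative event and payload names:
```typescript
// Sketch: decouple library → socket by emitting a domain event instead of a
// direct service call. Event and payload names here are illustrative.
import { ServiceSchema } from "moleculer";

const SocketService: ServiceSchema = {
  name: "socket",
  events: {
    // Any service can subscribe; the emitter never needs to know about it.
    "comic.imported"(ctx) {
      this.logger.info("broadcasting import result", ctx.params);
    },
  },
};

export default SocketService;

// Producer side, e.g. inside a library action handler:
// ctx.emit("comic.imported", { comicId, importResult });
```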
### 2. Error Handling
- **Inconsistent Patterns**: Mix of raw errors and MoleculerError usage
- **Recommendation**: Standardized error handling middleware
### 3. Configuration Management
- **Environment Variables**: Direct access vs centralized configuration
- **Recommendation**: Enhanced settings service for runtime configuration
### 4. Testing Strategy
- **Integration Testing**: Complex service interactions need comprehensive testing
- **Recommendation**: Contract testing between services
## Summary
This Moleculer-based architecture demonstrates sophisticated microservices patterns with:
- **11 specialized services** with clear boundaries
- **47 REST endpoints** + GraphQL layer
- **3 WebSocket namespaces** for real-time communication
- **Multi-database architecture** (MongoDB + ElasticSearch)
- **Advanced job processing** with BullMQ
- **External system integration** (P2P, BitTorrent, Comic APIs)
The system successfully manages complex domain requirements while maintaining good separation of concerns and providing excellent user experience through real-time updates and comprehensive metadata management.

README.md (189 lines changed)

@@ -1,24 +1,175 @@
# threetwo-core-service
# ThreeTwo Core Service
This [moleculer-based](https://github.com/moleculerjs/moleculer-web) microservice houses endpoints for the following functions:
**A comprehensive comic book library management system** built as a high-performance Moleculer microservices architecture. ThreeTwo automatically processes comic archives (CBR, CBZ, CB7), extracts metadata, generates thumbnails, and provides powerful search and real-time synchronization capabilities.
1. Local import of a comic library into mongo (currently supports `cbr` and `cbz` files)
2. Metadata extraction from file, `comicinfo.xml`
3. Mongo comic object orchestration
4. CRUD operations on `Comic` model
5. Helper utils to help with image metadata extraction, file operations and more.
## 🎯 What This Service Does
## Local Development
ThreeTwo transforms chaotic comic book collections into intelligently organized, searchable digital libraries by:
1. ~~You need `calibre` in your local path.
On `macOS` you can `brew install calibre` and make sure that `ebook-meta` is present on the path~~ Calibre is no longer required as a dependency. Ignore this step.
2. You need `mongo` for the data store. on `macOS` you can use [these instructions](https://docs.mongodb.com/manual/tutorial/install-mongodb-on-os-x/) to install it
3. Clone this repo
4. Run `npm i`
5. Assuming you installed mongo correctly, run `MONGO_URI=mongodb://localhost:27017/threetwo npm run dev` to start the service
6. You should see the service spin up and a list of all the endpoints in the terminal
7. The service can be accessed through `http://localhost:3000/api/import/*`
## Docker Instructions
- **📚 Automated Library Management** - Monitors directories and automatically imports new comics
- **🧠 Intelligent Metadata Extraction** - Parses ComicInfo.XML and enriches data from external APIs (ComicVine)
- **🔍 Advanced Search** - ElasticSearch-powered multi-field search with confidence scoring
- **📱 Real-time Updates** - Live progress tracking and notifications via Socket.IO
- **🎨 Media Processing** - Automatic thumbnail generation and image optimization
1. Build the image using `docker build . -t frishi/threetwo-import-service`. Give it a hot minute.
2. Run it using `docker run -it frishi/threetwo-import-service`
## 🏗️ Architecture
Built on **Moleculer microservices** with the following core services:
```
API Gateway (REST) ←→ GraphQL API ←→ Socket.IO Hub
Library Service ←→ Search Service ←→ Job Queue Service
MongoDB ←→ Elasticsearch ←→ Redis (Cache/Queue)
```
### **Key Features:**
- **Multi-format Support** - CBR, CBZ, CB7 archive processing
- **Confidence Tracking** - Metadata quality assessment and provenance
- **Job Queue System** - Background processing with BullMQ and Redis
- **Debounced File Watching** - Efficient file system monitoring
- **Batch Operations** - Scalable bulk import handling
- **Real-time Sync** - Live updates across all connected clients
## 🚀 API Interfaces
- **REST API** - `http://localhost:3000/api/` - Traditional HTTP endpoints
- **GraphQL API** - `http://localhost:4000/graphql` - Modern query interface
- **Socket.IO** - Real-time events and progress tracking
- **Static Assets** - Direct access to comic covers and images
## 🛠️ Technology Stack
- **Backend**: Moleculer, Node.js, TypeScript
- **Database**: MongoDB (persistence), Elasticsearch (search), Redis (cache/queue)
- **Processing**: BullMQ (job queues), Sharp (image processing)
- **Communication**: Socket.IO (real-time), GraphQL + REST APIs
## 📋 Prerequisites
You need the following dependencies installed:
- **MongoDB** - Document database for comic metadata
- **Elasticsearch** - Full-text search and analytics
- **Redis** - Caching and job queue backend
- **System Binaries**: `unrar` and `p7zip` for archive extraction
## 🚀 Local Development
1. **Clone and Install**
```bash
git clone <repository-url>
cd threetwo-core-service
npm install
```
2. **Environment Setup**
```bash
COMICS_DIRECTORY=<PATH_TO_COMICS_DIRECTORY> \
USERDATA_DIRECTORY=<PATH_TO_USERDATA_DIRECTORY> \
REDIS_URI=redis://<REDIS_HOST:REDIS_PORT> \
ELASTICSEARCH_URI=<ELASTICSEARCH_HOST:ELASTICSEARCH_PORT> \
MONGO_URI=mongodb://<MONGO_HOST:MONGO_PORT>/threetwo \
UNRAR_BIN_PATH=<UNRAR_BIN_PATH> \
SEVENZ_BINARY_PATH=<SEVENZ_BINARY_PATH> \
npm run dev
```
3. **Service Access**
- **Main API**: `http://localhost:3000/api/<serviceName>/*`
- **GraphQL Playground**: `http://localhost:4000/graphql`
- **Admin Interface**: `http://localhost:3000/` (Moleculer dashboard)
## 🐳 Docker Deployment
```bash
# Build the image
docker build . -t threetwo-core-service
# Run with docker-compose (recommended)
docker-compose up -d
# Or run standalone
docker run -it threetwo-core-service
```
## 📊 Performance Features
- **Smart Debouncing** - 200ms file system event debouncing prevents overload
- **Batch Processing** - Efficient handling of bulk import operations
- **Multi-level Caching** - Memory + Redis caching for optimal performance
- **Job Queues** - Background processing prevents UI blocking
- **Connection Pooling** - Efficient database connection management
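As a concrete illustration of the debouncing point above, a minimal sketch; the watcher library and handler body are assumptions, not the service's actual implementation:
```typescript
import chokidar from "chokidar"; // assumed watcher library
import { debounce } from "lodash";

// Collapse bursts of file-system events into one import per path,
// firing 200ms after the last event seen for that path.
const pending = new Map<string, ReturnType<typeof debounce>>();

const watcher = chokidar.watch(process.env.COMICS_DIRECTORY ?? "./comics", {
  ignoreInitial: true,
});

watcher.on("add", (path) => {
  if (!pending.has(path)) {
    pending.set(
      path,
      debounce((p: string) => {
        // e.g. broker.call("library.walkFolders", { basePathToWalk: p })
        console.log("importing", p);
        pending.delete(p);
      }, 200),
    );
  }
  pending.get(path)!(path);
});
```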
## 🔧 Core Services
| Service | Purpose | Key Features |
|---------|---------|--------------|
| **API Gateway** | REST endpoints + file watching | CORS, rate limiting, static serving |
| **GraphQL** | Modern query interface | Flexible queries, pagination |
| **Library** | Core CRUD operations | Comic management, metadata handling |
| **Search** | ElasticSearch integration | Multi-field search, aggregations |
| **Job Queue** | Background processing | Import jobs, progress tracking |
| **Socket** | Real-time communication | Live updates, session management |
## 📈 Use Cases
- **Personal Collections** - Organize digital comic libraries (hundreds to thousands)
- **Digital Libraries** - Professional-grade comic archive management
- **Developer Integration** - API access for custom comic applications
- **Bulk Processing** - Large-scale comic digitization projects
## 🛡️ Security & Reliability
- **Input Validation** - Comprehensive parameter validation
- **File Type Verification** - Magic number verification for security
- **Error Handling** - Graceful degradation and recovery
- **Health Monitoring** - Service health checks and diagnostics
## 🧩 Recent Enhancements
### Canonical Metadata System
A comprehensive **canonical metadata model** with full provenance tracking has been implemented to unify metadata from multiple sources:
- **Multi-Source Integration**: ComicVine, Metron, GCD, ComicInfo.XML, local files, and user manual entries
- **Source Ranking System**: Prioritized confidence scoring with USER_MANUAL (1) → COMICINFO_XML (2) → COMICVINE (3) → METRON (4) → GCD (5) → LOCG (6) → LOCAL_FILE (7)
- **Conflict Resolution**: Automatic metadata merging with confidence scoring and source attribution
- **Performance Optimized**: Proper indexing, batch processing, and caching strategies
### Complete Service Architecture Analysis
Comprehensive analysis of all **12 Moleculer services** with detailed endpoint documentation:
| Service | Endpoints | Primary Function |
|---------|-----------|------------------|
| [`api`](services/api.service.ts:1) | Gateway | REST API + file watching with 200ms debouncing |
| [`library`](services/library.service.ts:1) | 21 endpoints | Core CRUD operations and metadata management |
| [`search`](services/search.service.ts:1) | 8 endpoints | Elasticsearch integration and multi-search |
| [`jobqueue`](services/jobqueue.service.ts:1) | Queue mgmt | BullMQ job processing with Redis backend |
| [`graphql`](services/graphql.service.ts:1) | GraphQL API | Modern query interface with resolvers |
| [`socket`](services/socket.service.ts:1) | Real-time | Socket.IO communication with session management |
| [`canonicalMetadata`](services/canonical-metadata.service.ts:1) | 6 endpoints | **NEW**: Metadata provenance and conflict resolution |
| `airdcpp` | Integration | AirDC++ connectivity for P2P operations |
| `imagetransformation` | Processing | Image optimization and thumbnail generation |
| `opds` | Protocol | Open Publication Distribution System support |
| `settings` | Configuration | System-wide configuration management |
| `torrentjobs` | Downloads | Torrent-based comic acquisition |
### Performance Optimizations Identified
- **Debouncing**: 200ms file system event debouncing prevents overload
- **Job Queues**: Background processing with BullMQ prevents UI blocking
- **Caching Strategy**: Multi-level caching (Memory + Redis) for optimal performance
- **Batch Operations**: Efficient bulk import handling with pagination
- **Index Optimization**: MongoDB compound indexes for metadata queries
### Files Created
- [`models/canonical-comic.types.ts`](models/canonical-comic.types.ts:1) - TypeScript definitions for canonical metadata
- [`utils/metadata-resolver.utils.ts`](utils/metadata-resolver.utils.ts:1) - Conflict resolution and confidence scoring
- [`models/canonical-comic.model.ts`](models/canonical-comic.model.ts:1) - Mongoose schema with performance indexes
- [`services/canonical-metadata.service.ts`](services/canonical-metadata.service.ts:1) - REST endpoints for metadata import
- [`models/graphql/canonical-typedef.ts`](models/graphql/canonical-typedef.ts:1) - GraphQL schema with backward compatibility
- [`CANONICAL_METADATA_GUIDE.md`](CANONICAL_METADATA_GUIDE.md:1) - Complete implementation guide
---
**ThreeTwo Core Service** provides enterprise-grade comic book library management with modern microservices architecture, real-time capabilities, and intelligent automation.

config/redis.config.ts (new file, 10 lines)

@@ -0,0 +1,10 @@
import { createClient } from "redis";
const redisURL = new URL(process.env.REDIS_URI);
const pubClient = createClient({ url: `redis://${redisURL.hostname}:6379` });
(async () => {
await pubClient.connect();
})();
const subClient = pubClient.duplicate();
export { subClient, pubClient };


@@ -0,0 +1,103 @@
services:
zoo1:
image: confluentinc/cp-zookeeper:7.3.2
hostname: zoo1
container_name: zoo1
ports:
- "2181:2181"
environment:
ZOOKEEPER_CLIENT_PORT: 2181
ZOOKEEPER_SERVER_ID: 1
ZOOKEEPER_SERVERS: zoo1:2888:3888
networks:
- kafka-net
kafka1:
image: confluentinc/cp-kafka:7.3.2
hostname: kafka1
container_name: kafka1
ports:
- "9092:9092"
- "29092:29092"
- "9999:9999"
environment:
KAFKA_ADVERTISED_LISTENERS: INTERNAL://kafka1:19092,EXTERNAL://${DOCKER_HOST_IP:-127.0.0.1}:9092,DOCKER://host.docker.internal:29092
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: INTERNAL:PLAINTEXT,EXTERNAL:PLAINTEXT,DOCKER:PLAINTEXT
KAFKA_INTER_BROKER_LISTENER_NAME: INTERNAL
KAFKA_ZOOKEEPER_CONNECT: "zoo1:2181"
KAFKA_BROKER_ID: 1
KAFKA_LOG4J_LOGGERS: "kafka.controller=INFO,kafka.producer.async.DefaultEventHandler=INFO,state.change.logger=INFO"
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
KAFKA_JMX_PORT: 9999
KAFKA_JMX_HOSTNAME: ${DOCKER_HOST_IP:-127.0.0.1}
KAFKA_AUTHORIZER_CLASS_NAME: kafka.security.authorizer.AclAuthorizer
KAFKA_ALLOW_EVERYONE_IF_NO_ACL_FOUND: "true"
depends_on:
- zoo1
networks:
- kafka-net
kafka-ui:
container_name: kafka-ui
image: provectuslabs/kafka-ui:latest
ports:
- 8087:8080
environment:
DYNAMIC_CONFIG_ENABLED: true
volumes:
- /Users/rishi/work/config/kafka-ui/config.yml:/etc/kafkaui/dynamic_config.yaml
depends_on:
- kafka1
- zoo1
networks:
- kafka-net
db:
image: "mongo:latest"
container_name: database
networks:
- kafka-net
ports:
- "27017:27017"
volumes:
- "mongodb_data:/bitnami/mongodb"
redis:
image: "bitnami/redis:latest"
container_name: queue
environment:
ALLOW_EMPTY_PASSWORD: "yes"
networks:
- kafka-net
ports:
- "6379:6379"
elasticsearch:
image: docker.elastic.co/elasticsearch/elasticsearch:8.17.2
container_name: elasticsearch
environment:
- "discovery.type=single-node"
- "ES_JAVA_OPTS=-Xms512m -Xmx512m"
- "xpack.security.enabled=true"
- "xpack.security.authc.api_key.enabled=true"
- "ELASTIC_PASSWORD=password"
ulimits:
memlock:
soft: -1
hard: -1
ports:
- "9200:9200"
networks:
- kafka-net
networks:
kafka-net:
driver: bridge
volumes:
mongodb_data:
driver: local
elasticsearch:
driver: local

graphql-server.ts (new file, 47 lines)

@@ -0,0 +1,47 @@
import express from "express";
import { ApolloServer } from "@apollo/server";
import { expressMiddleware } from "@as-integrations/express4";
import { typeDefs } from "./models/graphql/typedef";
import { resolvers } from "./models/graphql/resolvers";
import { ServiceBroker } from "moleculer";
import cors from "cors";
// Boot Moleculer broker in parallel
const broker = new ServiceBroker({ transporter: null }); // or your actual transporter config
async function startGraphQLServer() {
const app = express();
const apollo = new ApolloServer({
typeDefs,
resolvers,
});
await apollo.start();
app.use(
"/graphql",
cors(),
express.json(),
expressMiddleware(apollo, {
context: async ({ req }) => ({
authToken: req.headers.authorization || null,
broker,
}),
})
);
const PORT = 4000;
app.listen(PORT, () =>
console.log(`🚀 GraphQL server running at http://localhost:${PORT}/graphql`)
);
}
async function bootstrap() {
await broker.start(); // make sure Moleculer is up
await startGraphQLServer();
}
bootstrap().catch((err) => {
console.error("❌ Failed to start GraphQL server:", err);
process.exit(1);
});


@@ -1,8 +1,7 @@
const paginate = require("mongoose-paginate-v2");
const { Client } = require("@elastic/elasticsearch");
import ComicVineMetadataSchema from "./comicvine.metadata.model";
import { mongoosastic } from "mongoosastic-ts";
const mongoose = require("mongoose")
const mongoose = require("mongoose");
import {
MongoosasticDocument,
MongoosasticModel,
@@ -28,6 +27,10 @@ const RawFileDetailsSchema = mongoose.Schema({
mimeType: String,
containedIn: String,
pageCount: Number,
archive: {
uncompressed: Boolean,
expandedPath: String,
},
cover: {
filePath: String,
stats: Object,
@@ -51,7 +54,38 @@ const DirectConnectBundleSchema = mongoose.Schema({
name: String,
size: String,
type: {},
_id: false,
});
const wantedSchema = mongoose.Schema(
{
source: { type: String, default: null },
markEntireVolumeWanted: Boolean,
issues: {
type: [
{
_id: false, // Disable automatic ObjectId creation for each issue
id: Number,
url: String,
image: { type: Array, default: [] },
coverDate: String,
issueNumber: String,
},
],
default: null,
},
volume: {
type: {
_id: false, // Disable automatic ObjectId creation for volume
id: Number,
url: String,
image: { type: Array, default: [] },
name: String,
},
default: null,
},
},
{ _id: false }
); // Disable automatic ObjectId creation for the wanted object itself
const ComicSchema = mongoose.Schema(
{
@@ -67,18 +101,179 @@ const ComicSchema = mongoose.Schema(
},
sourcedMetadata: {
comicInfo: { type: mongoose.Schema.Types.Mixed, default: {} },
comicvine: {
type: ComicVineMetadataSchema,
es_indexed: true,
default: {},
},
shortboxed: {},
comicvine: { type: mongoose.Schema.Types.Mixed, default: {} },
metron: { type: mongoose.Schema.Types.Mixed, default: {} },
gcd: { type: mongoose.Schema.Types.Mixed, default: {} },
locg: {
type: LOCGSchema,
es_indexed: true,
default: {},
},
gcd: {},
},
// Canonical metadata - user-curated "canonical" values with source attribution
canonicalMetadata: {
// Core identifying information
title: {
value: { type: String, es_indexed: true },
source: {
type: String,
enum: ['comicInfo', 'comicvine', 'metron', 'gcd', 'locg', 'inferred', 'user'],
default: 'inferred'
},
userSelected: { type: Boolean, default: false },
lastModified: { type: Date, default: Date.now }
},
// Series information
series: {
name: {
value: { type: String, es_indexed: true },
source: {
type: String,
enum: ['comicInfo', 'comicvine', 'metron', 'gcd', 'locg', 'inferred', 'user'],
default: 'inferred'
},
userSelected: { type: Boolean, default: false },
lastModified: { type: Date, default: Date.now }
},
volume: {
value: Number,
source: {
type: String,
enum: ['comicInfo', 'comicvine', 'metron', 'gcd', 'locg', 'inferred', 'user'],
default: 'inferred'
},
userSelected: { type: Boolean, default: false },
lastModified: { type: Date, default: Date.now }
},
startYear: {
value: Number,
source: {
type: String,
enum: ['comicInfo', 'comicvine', 'metron', 'gcd', 'locg', 'inferred', 'user'],
default: 'inferred'
},
userSelected: { type: Boolean, default: false },
lastModified: { type: Date, default: Date.now }
}
},
// Issue information
issueNumber: {
value: { type: String, es_indexed: true },
source: {
type: String,
enum: ['comicInfo', 'comicvine', 'metron', 'gcd', 'locg', 'inferred', 'user'],
default: 'inferred'
},
userSelected: { type: Boolean, default: false },
lastModified: { type: Date, default: Date.now }
},
// Publishing information
publisher: {
value: { type: String, es_indexed: true },
source: {
type: String,
enum: ['comicInfo', 'comicvine', 'metron', 'gcd', 'locg', 'inferred', 'user'],
default: 'inferred'
},
userSelected: { type: Boolean, default: false },
lastModified: { type: Date, default: Date.now }
},
publicationDate: {
value: Date,
source: {
type: String,
enum: ['comicInfo', 'comicvine', 'metron', 'gcd', 'locg', 'inferred', 'user'],
default: 'inferred'
},
userSelected: { type: Boolean, default: false },
lastModified: { type: Date, default: Date.now }
},
coverDate: {
value: Date,
source: {
type: String,
enum: ['comicInfo', 'comicvine', 'metron', 'gcd', 'locg', 'inferred', 'user'],
default: 'inferred'
},
userSelected: { type: Boolean, default: false },
lastModified: { type: Date, default: Date.now }
},
// Content information
pageCount: {
value: Number,
source: {
type: String,
enum: ['comicInfo', 'comicvine', 'metron', 'gcd', 'locg', 'inferred', 'user'],
default: 'inferred'
},
userSelected: { type: Boolean, default: false },
lastModified: { type: Date, default: Date.now }
},
summary: {
value: String,
source: {
type: String,
enum: ['comicInfo', 'comicvine', 'metron', 'gcd', 'locg', 'inferred', 'user'],
default: 'inferred'
},
userSelected: { type: Boolean, default: false },
lastModified: { type: Date, default: Date.now }
},
// Creator information - array with source attribution
creators: [{
_id: false,
name: String,
role: String,
source: {
type: String,
enum: ['comicInfo', 'comicvine', 'metron', 'gcd', 'locg', 'inferred', 'user'],
default: 'inferred'
},
userSelected: { type: Boolean, default: false },
lastModified: { type: Date, default: Date.now }
}],
// Character and genre arrays with source tracking
characters: {
values: [String],
source: {
type: String,
enum: ['comicInfo', 'comicvine', 'metron', 'gcd', 'locg', 'inferred', 'user'],
default: 'inferred'
},
userSelected: { type: Boolean, default: false },
lastModified: { type: Date, default: Date.now }
},
genres: {
values: [String],
source: {
type: String,
enum: ['comicInfo', 'comicvine', 'metron', 'gcd', 'locg', 'inferred', 'user'],
default: 'inferred'
},
userSelected: { type: Boolean, default: false },
lastModified: { type: Date, default: Date.now }
},
// Canonical metadata tracking
lastCanonicalUpdate: { type: Date, default: Date.now },
hasUserModifications: { type: Boolean, default: false },
// Quality and completeness tracking
completeness: {
score: { type: Number, min: 0, max: 100, default: 0 },
missingFields: [String],
lastCalculated: { type: Date, default: Date.now }
}
},
rawFileDetails: {
type: RawFileDetailsSchema,
@@ -98,11 +293,9 @@ const ComicSchema = mongoose.Schema(
subtitle: { type: String, es_indexed: true },
},
},
wanted: wantedSchema,
acquisition: {
source: {
wanted: Boolean,
name: String,
},
release: {},
directconnect: {
downloads: {
@@ -111,12 +304,13 @@ const ComicSchema = mongoose.Schema(
default: [],
},
},
torrent: {
sourceApplication: String,
magnet: String,
tracker: String,
status: String,
},
torrent: [
{
infoHash: String,
name: String,
announce: [String],
},
],
usenet: {
sourceApplication: String,
},


@@ -1,95 +0,0 @@
const mongoose = require("mongoose");
const Things = mongoose.Schema({
_id: false,
api_detail_url: String,
id: Number,
name: String,
site_detail_url: String,
count: String,
});
const Issue = mongoose.Schema({
_id: false,
api_detail_url: String,
id: Number,
name: String,
issue_number: String,
});
const VolumeInformation = mongoose.Schema({
_id: false,
aliases: [String],
api_detail_url: String,
characters: [Things],
concepts: [Things],
count_of_issues: String,
date_added: String,
date_last_updated: String,
deck: String,
description: String,
first_issue: Issue,
id: Number,
image: {
icon_url: String,
medium_url: String,
screen_url: String,
screen_large_url: String,
small_url: String,
super_url: String,
thumb_url: String,
tiny_url: String,
original_url: String,
image_tags: String,
},
issues: [
{
api_detail_url: String,
id: Number,
name: String,
issue_number: String,
site_detail_url: String,
},
],
last_issue: Issue,
locations: [Things],
name: String,
objects: [Things],
people: [Things],
publisher: {
api_detail_url: String,
id: Number,
name: String,
},
site_detail_url: String,
start_year: String,
});
const ComicVineMetadataSchema = mongoose.Schema({
_id: false,
aliases: [String],
api_detail_url: String,
has_staff_review: { type: mongoose.Schema.Types.Mixed },
cover_date: Date,
date_added: String,
date_last_updated: String,
deck: String,
description: String,
image: {
icon_url: String,
medium_url: String,
screen_url: String,
screen_large_url: String,
small_url: String,
super_url: String,
thumb_url: String,
tiny_url: String,
original_url: String,
image_tags: String,
},
id: Number,
name: String,
resource_type: String,
volumeInformation: VolumeInformation,
});
export default ComicVineMetadataSchema;

models/graphql/typedef.ts Normal file

@@ -0,0 +1,59 @@
import { gql } from "graphql-tag";
export const typeDefs = gql`
type Query {
comic(id: ID!): Comic
comics(limit: Int = 10): [Comic]
wantedComics(limit: Int = 25, offset: Int = 0): ComicPage!
}
type Comic {
id: ID!
title: String!
volume: Int
issueNumber: String!
publicationDate: String
variant: String
format: String
creators: [Creator!]!
arcs: [String!]
coverUrl: String
filePath: String
pageCount: Int
tags: [String!]
source: String
confidence: ConfidenceMap
provenance: ProvenanceMap
}
type Creator {
name: String!
role: String!
}
type ConfidenceMap {
title: Float
volume: Float
issueNumber: Float
publicationDate: Float
creators: Float
variant: Float
format: Float
}
type ProvenanceMap {
title: String
volume: String
issueNumber: String
publicationDate: String
creators: String
variant: String
format: String
}
type ComicPage {
total: Int!
results: [Comic!]!
}
`;
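
A query shaped by the schema above, as a sketch (the field selection is illustrative; the response shape follows ComicPage):

import { gql } from "graphql-tag";

const WANTED_COMICS_QUERY = gql`
    query WantedComics($limit: Int, $offset: Int) {
        wantedComics(limit: $limit, offset: $offset) {
            total
            results {
                id
                title
                issueNumber
                creators {
                    name
                    role
                }
            }
        }
    }
`;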

models/jobresult.model.ts Normal file

@@ -0,0 +1,12 @@
const mongoose = require("mongoose");
const JobResultSchema = mongoose.Schema({
id: Number,
status: String,
sessionId: String,
failedReason: Object,
timestamp: Date,
});
const JobResult = mongoose.model("JobResult", JobResultSchema);
export default JobResult;

models/session.model.ts Normal file

@@ -0,0 +1,9 @@
const mongoose = require("mongoose");
const SessionSchema = mongoose.Schema({
sessionId: String,
socketId: String,
});
const Session = mongoose.model("Session", SessionSchema);
export default Session;


@@ -1,21 +1,34 @@
const mongoose = require("mongoose");
const paginate = require("mongoose-paginate-v2");
const HostSchema = mongoose.Schema({
_id: false,
username: String,
password: String,
hostname: String,
port: String,
protocol: String,
});
const SettingsSchema = mongoose.Schema({
directConnect: {
client: {
host: {
username: String,
password: String,
hostname: String,
port: String,
protocol: String,
},
host: HostSchema,
airDCPPUserSettings: Object,
hubs: Array,
},
},
bittorrent: {
client: {
name: String,
host: HostSchema,
},
},
prowlarr: {
client: {
host: HostSchema,
apiKey: String,
},
},
});
const Settings = mongoose.model("Settings", SettingsSchema);
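
Since all three clients now share HostSchema, a settings document takes a uniform shape. A sketch with hypothetical values:

const exampleSettings = {
    directConnect: {
        client: {
            host: { username: "user", password: "<password>", hostname: "localhost", port: "5600", protocol: "http" },
            airDCPPUserSettings: {},
            hubs: [],
        },
    },
    bittorrent: {
        client: {
            name: "qbittorrent",
            host: { username: "admin", password: "<password>", hostname: "localhost", port: "8080", protocol: "http" },
        },
    },
    prowlarr: {
        client: {
            host: { username: "", password: "", hostname: "localhost", port: "9696", protocol: "http" },
            apiKey: "<api-key>",
        },
    },
};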

package-lock.json generated

File diff suppressed because it is too large.

@@ -20,10 +20,10 @@
],
"author": "Rishi Ghan",
"devDependencies": {
"@elastic/elasticsearch": "^8.6.0",
"@types/lodash": "^4.14.168",
"@typescript-eslint/eslint-plugin": "^5.56.0",
"@typescript-eslint/parser": "^5.56.0",
"concurrently": "^9.2.0",
"eslint": "^8.36.0",
"eslint-plugin-import": "^2.20.2",
"eslint-plugin-prefer-arrow": "^1.2.2",
@@ -35,28 +35,36 @@
"npm": "^8.4.1",
"ts-jest": "^29.0.5",
"ts-node": "^10.9.1",
"typescript": "^5.0.2"
"typescript": "^5.0.2",
"uuid": "^9.0.0"
},
"dependencies": {
"@npcz/magic": "^1.3.14",
"redis": "^4.6.5",
"@socket.io/redis-adapter": "^8.1.0",
"@apollo/server": "^4.12.2",
"@as-integrations/express4": "^1.1.1",
"@bluelovers/fast-glob": "https://github.com/rishighan/fast-glob-v2-api.git",
"@elastic/elasticsearch": "^8.13.1",
"@jorgeferrero/stream-to-buffer": "^2.0.6",
"@ltv/moleculer-apollo-server-mixin": "^0.1.30",
"@npcz/magic": "^1.3.14",
"@root/walk": "^1.1.0",
"@socket.io/redis-adapter": "^8.1.0",
"@types/jest": "^27.4.1",
"@types/mkdirp": "^1.0.0",
"@types/node": "^13.9.8",
"@types/node": "^24.0.13",
"@types/string-similarity": "^4.0.0",
"axios": "^0.25.0",
"airdcpp-apisocket": "^3.0.0-beta.8",
"axios": "^1.6.8",
"axios-retry": "^3.2.4",
"bree": "^7.1.5",
"calibre-opds": "^1.0.7",
"chokidar": "^3.5.3",
"chokidar": "^4.0.3",
"cors": "^2.8.5",
"delay": "^5.0.0",
"dotenv": "^10.0.0",
"filename-parser": "^1.0.4",
"fs-extra": "^10.0.0",
"graphql": "^16.11.0",
"graphql-tag": "^2.12.6",
"http-response-stream": "^1.0.9",
"image-js": "^0.34.0",
"imghash": "^0.0.9",
@@ -65,27 +73,27 @@
"leven": "^3.1.0",
"lodash": "^4.17.21",
"mkdirp": "^0.5.5",
"moleculer": "^0.14.29",
"moleculer-bull": "github:rishighan/moleculer-bull#1.0.0",
"moleculer-bullmq": "^3.0.0",
"moleculer-db": "^0.8.23",
"moleculer-db-adapter-mongoose": "^0.9.2",
"moleculer-io": "^2.2.0",
"moleculer-web": "^0.10.5",
"moleculer-web": "^0.10.8",
"mongoosastic-ts": "^6.0.3",
"mongoose": "^6.10.4",
"mongoose-paginate-v2": "^1.3.18",
"nats": "^1.3.2",
"opds-extra": "^3.0.9",
"opds-extra": "^3.0.10",
"p7zip-threetwo": "^1.0.4",
"redis": "^4.6.5",
"sanitize-filename-ts": "^1.0.2",
"sharp": "^0.30.4",
"sharp": "^0.33.3",
"threetwo-ui-typings": "^1.0.14",
"through2": "^4.0.2",
"unrar": "^0.2.0",
"xml2js": "^0.4.23"
"xml2js": "^0.6.2"
},
"engines": {
"node": ">= 18.x.x"
"node": ">= 22.x.x"
},
"jest": {
"coverageDirectory": "<rootDir>/coverage",

services/airdcpp.service.ts Normal file

@@ -0,0 +1,156 @@
"use strict";
import {
Context,
Service,
ServiceBroker,
ServiceSchema,
Errors,
} from "moleculer";
import axios from "axios";
import AirDCPPSocket from "../shared/airdcpp.socket";
export default class AirDCPPService extends Service {
// @ts-ignore
public constructor(
public broker: ServiceBroker,
schema: ServiceSchema<{}> = { name: "airdcpp" }
) {
super(broker);
this.parseServiceSchema({
name: "airdcpp",
mixins: [],
hooks: {},
actions: {
initialize: {
rest: "POST /initialize",
handler: async (
ctx: Context<{
host: {
hostname: string;
port: string;
protocol: string;
username: string;
password: string;
};
}>
) => {
try {
const {
host: {
hostname,
protocol,
port,
username,
password,
},
} = ctx.params;
const airDCPPSocket = new AirDCPPSocket({
protocol,
hostname: `${hostname}:${port}`,
username,
password,
});
return await airDCPPSocket.connect();
} catch (err) {
console.error(err);
}
},
},
getHubs: {
rest: "POST /getHubs",
timeout: 70000,
handler: async (
ctx: Context<{
host: {
hostname: string;
port: string;
protocol: string;
username: string;
password: string;
};
}>
) => {
const {
host: {
hostname,
port,
protocol,
username,
password,
},
} = ctx.params;
try {
const airDCPPSocket = new AirDCPPSocket({
protocol,
hostname: `${hostname}:${port}`,
username,
password,
});
await airDCPPSocket.connect();
return await airDCPPSocket.get(`hubs`);
} catch (err) {
throw err;
}
},
},
search: {
rest: "POST /search",
timeout: 20000,
handler: async (
ctx: Context<{
host: {
hostname: string;
port: string;
protocol: string;
username: string;
password: string;
};
dcppSearchQuery: any;
}>
) => {
try {
const {
host: {
hostname,
port,
protocol,
username,
password,
},
dcppSearchQuery,
} = ctx.params;
const airDCPPSocket = new AirDCPPSocket({
protocol,
hostname: `${hostname}:${port}`,
username,
password,
});
await airDCPPSocket.connect();
const searchInstance = await airDCPPSocket.post(
`search`
);
// Post the search
const searchInfo = await airDCPPSocket.post(
`search/${searchInstance.id}/hub_search`,
dcppSearchQuery
);
await this.sleep(10000);
const results = await airDCPPSocket.get(
`search/${searchInstance.id}/results/0/5`
);
return results;
} catch (err) {
throw err;
}
},
},
},
methods: {
sleep: (ms: number) => {
return new Promise((resolve) => setTimeout(resolve, ms));
},
},
});
}
}
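
A sketch of driving the search action from another service; broker is assumed to be a started ServiceBroker, the host values are hypothetical, and the hub_search payload shape is an assumption based on how the handler above forwards it to AirDC++:

const results = await broker.call("airdcpp.search", {
    host: {
        hostname: "airdcpp.local", // hypothetical
        port: "5600",
        protocol: "http",
        username: "user",
        password: "pass",
    },
    dcppSearchQuery: {
        // assumed hub_search body; the handler posts it verbatim
        query: { pattern: "Some Comic 001", extensions: ["cbz", "cbr"] },
        priority: 5,
    },
});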


@@ -1,20 +1,52 @@
import chokidar from "chokidar";
import chokidar, { FSWatcher } from "chokidar";
import fs from "fs";
import { Service, ServiceBroker } from "moleculer";
import ApiGateway from "moleculer-web";
import path from "path";
import { Service, ServiceBroker, ServiceSchema } from "moleculer";
import ApiGateway from "moleculer-web";
import debounce from "lodash/debounce";
import { IFolderData } from "threetwo-ui-typings";
/**
* ApiService exposes REST endpoints and watches the comics directory for changes.
* It uses chokidar to monitor filesystem events and broadcasts them via the Moleculer broker.
* @extends Service
*/
export default class ApiService extends Service {
/**
* The chokidar file system watcher instance.
* @private
*/
private fileWatcher?: FSWatcher;
/**
* Creates an instance of ApiService.
* @param {ServiceBroker} broker - The Moleculer service broker instance.
*/
public constructor(broker: ServiceBroker) {
super(broker);
this.parseServiceSchema({
name: "api",
mixins: [ApiGateway],
// More info about settings: https://moleculer.services/docs/0.14/moleculer-web.html
settings: {
port: process.env.PORT || 3000,
routes: [
{
path: "/graphql",
whitelist: ["graphql.*"],
bodyParsers: {
json: true,
urlencoded: { extended: true },
},
aliases: {
"POST /": "graphql.wantedComics",
},
cors: {
origin: "*",
methods: ["GET", "OPTIONS", "POST"],
allowedHeaders: ["*"],
credentials: false,
},
},
{
path: "/api",
whitelist: ["**"],
@@ -39,18 +71,11 @@ export default class ApiService extends Service {
autoAliases: true,
aliases: {},
callingOptions: {},
bodyParsers: {
json: {
strict: false,
limit: "1MB",
},
urlencoded: {
extended: true,
limit: "1MB",
},
json: { strict: false, limit: "1MB" },
urlencoded: { extended: true, limit: "1MB" },
},
mappingPolicy: "all", // Available values: "all", "restrict"
mappingPolicy: "all",
logging: true,
},
{
@@ -71,120 +96,123 @@ export default class ApiService extends Service {
log4XXResponses: false,
logRequestParams: true,
logResponseData: true,
assets: {
folder: "public",
// Options to `server-static` module
options: {},
},
assets: { folder: "public", options: {} },
},
events: {
},
events: {},
methods: {},
started(): any {
// Filewatcher
const fileWatcher = chokidar.watch(
path.resolve("/comics"),
{
ignored: (filePath) =>
path.extname(filePath) === ".dctmp",
persistent: true,
usePolling: true,
interval: 5000,
ignoreInitial: true,
followSymlinks: true,
atomic: true,
awaitWriteFinish: {
stabilityThreshold: 2000,
pollInterval: 100,
},
}
);
const fileCopyDelaySeconds = 3;
const checkEnd = (path, prev) => {
fs.stat(path, async (err, stat) => {
// Replace error checking with something appropriate for your app.
if (err) throw err;
if (stat.mtime.getTime() === prev.mtime.getTime()) {
console.log("finished");
// Move on: call whatever needs to be called to process the file.
console.log(
"File detected, starting import..."
);
const walkedFolder: IFolderData =
await broker.call("library.walkFolders", {
basePathToWalk: path,
});
await this.broker.call(
"importqueue.processImport",
{
fileObject: {
filePath: path,
fileSize: walkedFolder[0].fileSize,
},
}
);
} else
setTimeout(
checkEnd,
fileCopyDelaySeconds,
path,
stat
);
});
};
fileWatcher
.on("add", (path, stats) => {
console.log("Watcher detected new files.");
console.log(
`File ${path} has been added with stats: ${JSON.stringify(
stats,
null,
2
)}`
);
console.log("File", path, "has been added");
fs.stat(path, function(err, stat) {
// Replace error checking with something appropriate for your app.
if (err) throw err;
setTimeout(
checkEnd,
fileCopyDelaySeconds,
path,
stat
);
});
})
// .once(
// "change",
// (path, stats) =>
// console.log(
// `File ${path} has been changed. Stats: ${JSON.stringify(
// stats,
// null,
// 2
// )}`
// )
// )
.on(
"unlink",
(path) =>
console.log(`File ${path} has been removed`)
)
.on(
"addDir",
(path) =>
console.log(`Directory ${path} has been added`)
);
},
started: this.startWatcher,
stopped: this.stopWatcher,
});
}
/**
* Initializes and starts the chokidar watcher on the COMICS_DIRECTORY.
* Debounces rapid events and logs initial scan completion.
* @private
*/
private startWatcher(): void {
const rawDir = process.env.COMICS_DIRECTORY;
if (!rawDir) {
this.logger.error("COMICS_DIRECTORY not set; cannot start watcher");
return;
}
const watchDir = path.resolve(rawDir);
this.logger.info(`Watching comics folder at: ${watchDir}`);
if (!fs.existsSync(watchDir)) {
this.logger.error(`✖ Comics folder does not exist: ${watchDir}`);
return;
}
this.fileWatcher = chokidar.watch(watchDir, {
persistent: true,
ignoreInitial: true,
followSymlinks: true,
depth: 10,
usePolling: true,
interval: 5000,
atomic: true,
awaitWriteFinish: { stabilityThreshold: 2000, pollInterval: 100 },
ignored: (p) => p.endsWith(".dctmp") || p.includes("/.git/"),
});
/**
* Debounced handler for file system events, batching rapid triggers
* into a 200ms window. Leading and trailing calls invoked.
* @param {string} event - Type of file event (add, change, etc.).
* @param {string} p - Path of the file or directory.
* @param {fs.Stats} [stats] - Optional file stats for add/change events.
*/
const debouncedEvent = debounce(
(event: string, p: string, stats?: fs.Stats) => {
try {
this.handleFileEvent(event, p, stats);
} catch (err) {
this.logger.error(
`Error handling file event [${event}] for ${p}:`,
err
);
}
},
200,
{ leading: true, trailing: true }
);
this.fileWatcher
.on("ready", () => this.logger.info("Initial scan complete."))
.on("error", (err) => this.logger.error("Watcher error:", err))
.on("add", (p, stats) => debouncedEvent("add", p, stats))
.on("change", (p, stats) => debouncedEvent("change", p, stats))
.on("unlink", (p) => debouncedEvent("unlink", p))
.on("addDir", (p) => debouncedEvent("addDir", p))
.on("unlinkDir", (p) => debouncedEvent("unlinkDir", p));
}
/**
* Stops and closes the chokidar watcher, freeing resources.
* @private
*/
private async stopWatcher(): Promise<void> {
if (this.fileWatcher) {
this.logger.info("Stopping file watcher...");
await this.fileWatcher.close();
this.fileWatcher = undefined;
}
}
/**
* Handles a filesystem event by logging and optionally importing new files.
* @param event - The type of chokidar event ('add', 'change', 'unlink', etc.).
* @param filePath - The full path of the file or directory that triggered the event.
* @param stats - Optional fs.Stats data for 'add' or 'change' events.
* @private
*/
private async handleFileEvent(
event: string,
filePath: string,
stats?: fs.Stats
): Promise<void> {
this.logger.info(`File event [${event}]: ${filePath}`);
if (event === "add" && stats) {
setTimeout(async () => {
const newStats = await fs.promises.stat(filePath);
if (newStats.mtime.getTime() === stats.mtime.getTime()) {
this.logger.info(
`Stable file detected: ${filePath}, importing.`
);
const folderData: IFolderData = await this.broker.call(
"library.walkFolders",
{ basePathToWalk: filePath }
);
// this would have to be a call to importDownloadedComic
await this.broker.call("importqueue.processImport", {
fileObject: {
filePath,
fileSize: folderData[0].fileSize,
},
});
}
}, 3000);
}
this.broker.broadcast(event, { path: filePath });
}
}
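
The stability check in handleFileEvent reduces to comparing mtimes across a fixed delay; the same pattern as a standalone sketch:

import fs from "fs";

// Resolves true once a file's mtime is unchanged across the delay window.
const isFileStable = async (filePath: string, delayMs = 3000): Promise<boolean> => {
    const before = await fs.promises.stat(filePath);
    await new Promise((resolve) => setTimeout(resolve, delayMs));
    const after = await fs.promises.stat(filePath);
    return before.mtime.getTime() === after.mtime.getTime();
};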

services/graphql.service.ts Normal file

@@ -0,0 +1,116 @@
// services/graphql.service.ts
import { gql as ApolloMixin } from "@ltv/moleculer-apollo-server-mixin";
import { print } from "graphql";
import { typeDefs } from "../models/graphql/typedef";
import { ServiceSchema } from "moleculer";
/**
* Interface representing the structure of an ElasticSearch result.
*/
interface SearchResult {
hits: {
total: { value: number };
hits: any[];
};
}
/**
* GraphQL Moleculer Service exposing typed resolvers via @ltv/moleculer-apollo-server-mixin.
* Includes resolver for fetching comics marked as "wanted".
*/
const GraphQLService: ServiceSchema = {
name: "graphql",
mixins: [ApolloMixin],
actions: {
/**
* Resolver for fetching comics marked as "wanted" in ElasticSearch.
*
* Queries the `search.issue` Moleculer action using a filtered ES query
* that matches issues or volumes with a `wanted` flag.
*
* @param {number} [limit=25] - Maximum number of results to return.
* @param {number} [offset=0] - Starting index for paginated results.
* @returns {Promise<{ total: number, comics: any[] }>} - Total number of matches and result set.
*
* @example
* query {
* wantedComics(limit: 10, offset: 0) {
* total
* comics {
* _id
* _source {
* title
* }
* }
* }
* }
*/
wantedComics: {
params: {
limit: {
type: "number",
integer: true,
min: 1,
optional: true,
},
offset: {
type: "number",
integer: true,
min: 0,
optional: true,
},
},
async handler(ctx) {
const { limit = 25, offset = 0 } = ctx.params;
const eSQuery = {
bool: {
should: [
{ exists: { field: "wanted.issues" } },
{ exists: { field: "wanted.volume" } },
],
minimum_should_match: 1,
},
};
const result = (await ctx.broker.call("search.issue", {
query: eSQuery,
pagination: { size: limit, from: offset },
type: "wanted",
trigger: "wantedComicsGraphQL",
})) as SearchResult;
return {
data: {
wantedComics: {
total: result?.hits?.total?.value || 0,
comics:
result?.hits?.hits.map((hit) => hit._source) ||
[],
},
},
};
},
},
},
settings: {
apolloServer: {
typeDefs: print(typeDefs), // If typeDefs is AST; remove print if it's raw SDL string
resolvers: {
Query: {
wantedComics: "graphql.wantedComics",
},
},
path: "/graphql",
playground: true,
introspection: true,
context: ({ ctx }: any) => ({
broker: ctx.broker,
}),
},
},
};
export default GraphQLService;
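
Exercising the resolver directly through the broker, as a sketch (assumes a running search service and a started ServiceBroker in scope):

const response = await broker.call("graphql.wantedComics", { limit: 10, offset: 0 });
// response.data.wantedComics -> { total: number, comics: any[] }
console.log(response.data.wantedComics.total);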


@@ -7,6 +7,8 @@ import {
ServiceSchema,
Errors,
} from "moleculer";
import { DbMixin } from "../mixins/db.mixin";
import Comic from "../models/comic.model";
import path from "path";
import {
analyze,
@@ -22,16 +24,13 @@ export default class ImageTransformation extends Service {
super(broker);
this.parseServiceSchema({
name: "imagetransformation",
mixins: [],
mixins: [DbMixin("comics", Comic)],
settings: {
// Available fields in the responses
fields: ["_id", "name", "quantity", "price"],
fields: ["_id"],
// Validator for the `create` & `insert` actions.
entityValidator: {
name: "string|min:3",
price: "number|positive",
},
entityValidator: {},
},
hooks: {},
actions: {


@@ -1,291 +0,0 @@
/*
* MIT License
*
* Copyright (c) 2022 Rishi Ghan
*
The MIT License (MIT)
Copyright (c) 2015 Rishi Ghan
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE.
*/
/*
* Revision History:
* Initial: 2022/01/28 Rishi Ghan
*/
"use strict";
import { refineQuery } from "filename-parser";
import { isNil, isUndefined } from "lodash";
import { Context, Service, ServiceBroker, ServiceSchema } from "moleculer";
import BullMQMixin, { SandboxedJob } from "moleculer-bull";
import { DbMixin } from "../mixins/db.mixin";
import Comic from "../models/comic.model";
import {
extractFromArchive,
uncompressEntireArchive,
} from "../utils/uncompression.utils";
const REDIS_URI = process.env.REDIS_URI || `redis://localhost:6379`;
const EventEmitter = require("events");
EventEmitter.defaultMaxListeners = 20;
console.log(`REDIS -> ${REDIS_URI}`);
export default class QueueService extends Service {
public constructor(
public broker: ServiceBroker,
schema: ServiceSchema<{}> = { name: "importqueue" }
) {
super(broker);
this.parseServiceSchema({
name: "importqueue",
mixins: [BullMQMixin(REDIS_URI), DbMixin("comics", Comic)],
settings: {
bullmq: {
maxStalledCount: 0,
},
},
hooks: {},
queues: {
"process.import": {
concurrency: 10,
async process(job: SandboxedJob) {
console.info("New job received!", job.data);
console.info(`Processing queue...`);
// extract the cover
const result = await extractFromArchive(
job.data.fileObject.filePath
);
const {
name,
filePath,
fileSize,
extension,
mimeType,
cover,
containedIn,
comicInfoJSON,
} = result;
// Infer any issue-related metadata from the filename
const { inferredIssueDetails } = refineQuery(
result.name
);
console.log(
"Issue metadata inferred: ",
JSON.stringify(inferredIssueDetails, null, 2)
);
// Add the bundleId, if present to the payload
let bundleId = null;
if (!isNil(job.data.bundleId)) {
bundleId = job.data.bundleId;
}
// Orchestrate the payload
const payload = {
importStatus: {
isImported: true,
tagged: false,
matchedResult: {
score: "0",
},
},
rawFileDetails: {
name,
filePath,
fileSize,
extension,
mimeType,
containedIn,
cover,
},
inferredMetadata: {
issue: inferredIssueDetails,
},
sourcedMetadata: {
// except for ComicInfo.xml, everything else should be copied over from the
// parent comic
comicInfo: comicInfoJSON,
},
// since we already have at least 1 copy
// mark it as not wanted by default
"acquisition.source.wanted": false,
// clear out the downloads array
// "acquisition.directconnect.downloads": [],
// mark the metadata source
"acquisition.source.name": job.data.sourcedFrom,
};
// Add the sourcedMetadata, if present
if (!isNil(job.data.sourcedMetadata) && !isUndefined(job.data.sourcedMetadata.comicvine)) {
Object.assign(
payload.sourcedMetadata,
job.data.sourcedMetadata
);
}
// write to mongo
const importResult = await this.broker.call(
"library.rawImportToDB",
{
importType: job.data.importType,
bundleId,
payload,
}
);
return {
data: {
importResult,
},
id: job.id,
worker: process.pid,
};
},
},
"process.uncompressAndResize": {
concurrency: 2,
async process(job: SandboxedJob) {
console.log(`Initiating uncompression job...`);
return await uncompressEntireArchive(
job.data.filePath,
job.data.options
);
},
},
},
actions: {
uncompressResize: {
rest: "POST /uncompressResize",
params: {},
async handler(
ctx: Context<{
data: { filePath: string; options: any };
}>
) {
return await this.createJob(
"process.uncompressAndResize",
ctx.params
);
},
},
processImport: {
rest: "POST /processImport",
params: {},
async handler(
ctx: Context<{
fileObject: object;
importType: string;
bundleId: number;
sourcedFrom?: string;
sourcedMetadata: object;
}>
) {
return await this.createJob("process.import", {
fileObject: ctx.params.fileObject,
importType: ctx.params.importType,
bundleId: ctx.params.bundleId,
sourcedFrom: ctx.params.sourcedFrom,
sourcedMetadata: ctx.params.sourcedMetadata,
});
},
},
toggleImportQueue: {
rest: "POST /pauseImportQueue",
params: {},
handler: async (ctx: Context<{ action: string }>) => {
switch (ctx.params.action) {
case "pause":
const foo = await this.getQueue(
"process.import"
).pause();
console.log("paused", foo);
return foo;
case "resume":
const soo = await this.getQueue(
"process.import"
).resume();
console.log("resumed", soo);
return soo;
default:
console.log("Unrecognized queue action.");
}
},
},
},
methods: {},
async started(): Promise<any> {
await this.getQueue("process.import").on(
"failed",
async (job, error) => {
console.error(
`An error occurred in 'process.import' queue on job id '${job.id}': ${error.message}`
);
console.error(job.data);
}
);
await this.getQueue("process.import").on(
"completed",
async (job, res) => {
await this.broker.call("socket.broadcast", {
namespace: "/", //optional
event: "action",
args: [{ type: "LS_COVER_EXTRACTED", result: res }], //optional
});
console.info(
`Import Job with the id '${job.id}' completed.`
);
}
);
await this.getQueue("process.import").on(
"stalled",
async (job) => {
console.warn(`Import job '${job.id}' stalled!`);
console.log(`${JSON.stringify(job, null, 2)}`);
}
);
await this.getQueue("process.uncompressAndResize").on(
"completed",
async (job, res) => {
await this.broker.call("socket.broadcast", {
namespace: "/",
event: "action",
args: [
{
type: "COMICBOOK_EXTRACTION_SUCCESS",
result: {
files: res,
purpose: job.data.options.purpose,
},
},
],
});
console.info(`Uncompression Job ${job.id} completed.`);
}
);
},
});
}
}


@@ -0,0 +1,683 @@
import { Context, Service, ServiceBroker } from "moleculer";
import JobResult from "../models/jobresult.model";
import { refineQuery } from "filename-parser";
import BullMqMixin from "moleculer-bullmq";
import { DbMixin } from "../mixins/db.mixin";
import Comic from "../models/comic.model";
const ObjectId = require("mongoose").Types.ObjectId;
import {
extractFromArchive,
uncompressEntireArchive,
} from "../utils/uncompression.utils";
import { isNil, isUndefined } from "lodash";
import { pubClient } from "../config/redis.config";
import path from "path";
const { MoleculerError } = require("moleculer").Errors;
console.log(process.env.REDIS_URI);
export default class JobQueueService extends Service {
public constructor(public broker: ServiceBroker) {
super(broker);
this.parseServiceSchema({
name: "jobqueue",
hooks: {},
mixins: [DbMixin("comics", Comic), BullMqMixin],
settings: {
bullmq: {
client: process.env.REDIS_URI,
},
},
actions: {
getJobCountsByType: {
rest: "GET /getJobCountsByType",
handler: async (ctx: Context<{}>) => {
console.log(ctx.params);
return await this.$resolve("jobqueue").getJobCounts();
},
},
toggle: {
rest: "GET /toggle",
handler: async (ctx: Context<{ action: string }>) => {
switch (ctx.params.action) {
case "pause":
this.pause();
break;
case "resume":
this.resume();
break;
default:
console.log(`Unknown queue action.`);
}
},
},
enqueue: {
queue: true,
rest: "GET /enqueue",
handler: async (
ctx: Context<{ action: string; description: string }>
) => {
const { action, description } = ctx.params;
// Enqueue the job
const job = await this.localQueue(
ctx,
action,
ctx.params,
{
priority: 10,
}
);
console.log(`Job ${job.id} enqueued`);
console.log(`${description}`);
return job.id;
},
},
// Comic Book Import Job Queue - Enhanced for better metadata handling
"enqueue.async": {
handler: async (
ctx: Context<{
sessionId: string;
}>
) => {
try {
console.log(
`Received Job ID ${ctx.locals.job.id}, processing...`
);
// 1. De-structure the job params
const { fileObject } = ctx.locals.job.data.params;
// 2. Extract metadata from the archive
const result = await extractFromArchive(
fileObject.filePath
);
const {
name,
filePath,
fileSize,
extension,
mimeType,
cover,
containedIn,
comicInfoJSON,
} = result;
// 3a. Infer any issue-related metadata from the filename
const { inferredIssueDetails } = refineQuery(
result.name
);
console.log(
"Issue metadata inferred: ",
JSON.stringify(inferredIssueDetails, null, 2)
);
// 3b. Prepare sourced metadata from various sources
let sourcedMetadata = {
comicInfo: comicInfoJSON || {},
comicvine: {},
metron: {},
gcd: {},
locg: {}
};
// Include any external metadata if provided
if (!isNil(ctx.locals.job.data.params.sourcedMetadata)) {
const providedMetadata = ctx.locals.job.data.params.sourcedMetadata;
sourcedMetadata = {
...sourcedMetadata,
...providedMetadata
};
}
// 3c. Prepare inferred metadata matching Comic model structure
const inferredMetadata = {
series: inferredIssueDetails?.name || "Unknown Series",
issue: {
name: inferredIssueDetails?.name || "Unknown Series",
number: inferredIssueDetails?.number || 1,
subtitle: inferredIssueDetails?.subtitle || "",
year: inferredIssueDetails?.year || new Date().getFullYear().toString()
},
volume: 1, // Default volume since not available in inferredIssueDetails
title: inferredIssueDetails?.name || path.basename(filePath, path.extname(filePath))
};
// 3d. Create canonical metadata - user-curated values with source attribution
const canonicalMetadata = this.createCanonicalMetadata(sourcedMetadata, inferredMetadata);
// 3e. Create comic payload with canonical metadata structure
const comicPayload = {
// File details
rawFileDetails: {
name,
filePath,
fileSize,
extension,
mimeType,
containedIn,
cover,
},
// Enhanced sourced metadata (now supports more sources)
sourcedMetadata,
// Original inferred metadata
inferredMetadata,
// New canonical metadata - user-curated values with source attribution
canonicalMetadata,
// Import status
"acquisition.source.wanted": false,
"acquisition.source.name": ctx.locals.job.data.params.sourcedFrom,
};
// 3f. Add bundleId if present
let bundleId = null;
if (!isNil(ctx.locals.job.data.params.bundleId)) {
bundleId = ctx.locals.job.data.params.bundleId;
}
// 4. Use library service to import with enhanced metadata
const importResult = await this.broker.call(
"library.importFromJob",
{
importType: ctx.locals.job.data.params.importType,
bundleId,
payload: comicPayload,
}
);
return {
data: {
importResult,
},
id: ctx.locals.job.id,
sessionId: ctx.params.sessionId,
};
} catch (error) {
console.error(
`An error occurred processing Job ID ${ctx.locals.job.id}`
);
throw new MoleculerError(
error,
500,
"ENHANCED_IMPORT_JOB_ERROR",
{
data: ctx.params.sessionId,
}
);
}
},
},
getJobResultStatistics: {
rest: "GET /getJobResultStatistics",
handler: async (ctx: Context<{}>) => {
return await JobResult.aggregate([
{
$group: {
_id: {
sessionId: "$sessionId",
status: "$status",
},
earliestTimestamp: {
$min: "$timestamp",
},
count: {
$sum: 1,
},
},
},
{
$group: {
_id: "$_id.sessionId",
statuses: {
$push: {
status: "$_id.status",
earliestTimestamp:
"$earliestTimestamp",
count: "$count",
},
},
},
},
{
$project: {
_id: 0,
sessionId: "$_id",
completedJobs: {
$reduce: {
input: "$statuses",
initialValue: 0,
in: {
$sum: [
"$$value",
{
$cond: [
{
$eq: [
"$$this.status",
"completed",
],
},
"$$this.count",
0,
],
},
],
},
},
},
failedJobs: {
$reduce: {
input: "$statuses",
initialValue: 0,
in: {
$sum: [
"$$value",
{
$cond: [
{
$eq: [
"$$this.status",
"failed",
],
},
"$$this.count",
0,
],
},
],
},
},
},
earliestTimestamp: {
$min: "$statuses.earliestTimestamp",
},
},
},
]);
},
},
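// For reference, a sketch of the aggregation's output shape (values hypothetical):
// [
//   {
//     sessionId: "f3b2c1d0-aaaa-bbbb-cccc-123456789abc",
//     completedJobs: 42,
//     failedJobs: 3,
//     earliestTimestamp: 2025-01-01T00:00:00.000Z
//   }
// ]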
"uncompressFullArchive.async": {
rest: "POST /uncompressFullArchive",
handler: async (
ctx: Context<{
filePath: string;
comicObjectId: string;
options: any;
}>
) => {
console.log(
`Received Job ID ${JSON.stringify(
ctx.locals
)}, processing...`
);
const { filePath, options, comicObjectId } = ctx.params;
const comicId = new ObjectId(comicObjectId);
// 2. Extract metadata from the archive
const result: string[] = await uncompressEntireArchive(
filePath,
options
);
if (Array.isArray(result) && result.length !== 0) {
// Get the containing directory of the uncompressed archive
const directoryPath = path.dirname(result[0]);
// Add to mongo object
await Comic.findByIdAndUpdate(
comicId,
{
$set: {
"rawFileDetails.archive": {
uncompressed: true,
expandedPath: directoryPath,
},
},
},
{ new: true, safe: true, upsert: true }
);
return result;
}
},
},
},
events: {
async "uncompressFullArchive.async.active"(
ctx: Context<{ id: number }>
) {
console.log(
`Uncompression Job ID ${ctx.params.id} is set to active.`
);
},
async "uncompressFullArchive.async.completed"(
ctx: Context<{ id: number }>
) {
console.log(
`Uncompression Job ID ${ctx.params.id} completed.`
);
const job = await this.job(ctx.params.id);
await this.broker.call("socket.broadcast", {
namespace: "/",
event: "LS_UNCOMPRESSION_JOB_COMPLETE",
args: [
{
uncompressedArchive: job.returnvalue,
},
],
});
return job.returnvalue;
},
// use the `${QUEUE_NAME}.QUEUE_EVENT` scheme
async "enqueue.async.active"(ctx: Context<{ id: Number }>) {
console.log(`Job ID ${ctx.params.id} is set to active.`);
},
async drained(ctx) {
console.log("Queue drained.");
await this.broker.call("socket.broadcast", {
namespace: "/",
event: "LS_IMPORT_QUEUE_DRAINED",
args: [
{
message: "drained",
},
],
});
},
async "enqueue.async.completed"(ctx: Context<{ id: Number }>) {
// 1. Fetch the job result using the job Id
const job = await this.job(ctx.params.id);
// 2. Increment the completed job counter
await pubClient.incr("completedJobCount");
// 3. Fetch the completed job count for the final payload to be sent to the client
const completedJobCount = await pubClient.get(
"completedJobCount"
);
// 4. Emit the LS_COVER_EXTRACTED event with the necessary details
await this.broker.call("socket.broadcast", {
namespace: "/",
event: "LS_COVER_EXTRACTED",
args: [
{
completedJobCount,
importResult: job.returnvalue.data.importResult,
},
],
});
// 5. Persist the job results in mongo for posterity
await JobResult.create({
id: ctx.params.id,
status: "completed",
timestamp: job.timestamp,
sessionId: job.returnvalue.sessionId,
failedReason: {},
});
console.log(`Job ID ${ctx.params.id} completed.`);
},
async "enqueue.async.failed"(ctx) {
const job = await this.job(ctx.params.id);
await pubClient.incr("failedJobCount");
const failedJobCount = await pubClient.get(
"failedJobCount"
);
await JobResult.create({
id: ctx.params.id,
status: "failed",
failedReason: job.failedReason,
sessionId: job.data.params.sessionId,
timestamp: job.timestamp,
});
// 4. Emit the LS_COVER_EXTRACTION_FAILED event with the necessary details
await this.broker.call("socket.broadcast", {
namespace: "/",
event: "LS_COVER_EXTRACTION_FAILED",
args: [
{
failedJobCount,
importResult: job,
},
],
});
},
},
methods: {
/**
* Create canonical metadata structure with source attribution for user-driven curation
* @param sourcedMetadata - Metadata from various external sources
* @param inferredMetadata - Metadata inferred from filename/file analysis
*/
createCanonicalMetadata(sourcedMetadata: any, inferredMetadata: any) {
const currentTime = new Date();
// Priority order: comicInfo -> comicvine -> metron -> gcd -> locg -> inferred
const sourcePriority = ['comicInfo', 'comicvine', 'metron', 'gcd', 'locg'];
// Helper function to extract actual value from metadata (handle arrays, etc.)
const extractValue = (value: any) => {
if (Array.isArray(value)) {
return value.length > 0 ? value[0] : null;
}
return value;
};
// Helper function to find the best value and its source
const findBestValue = (fieldName: string, defaultValue: any = null, defaultSource: string = 'inferred') => {
for (const source of sourcePriority) {
const rawValue = sourcedMetadata[source]?.[fieldName];
if (rawValue !== undefined && rawValue !== null && rawValue !== '') {
const extractedValue = extractValue(rawValue);
if (extractedValue !== null && extractedValue !== '') {
return {
value: extractedValue,
source: source,
userSelected: false,
lastModified: currentTime
};
}
}
}
return {
value: defaultValue,
source: defaultSource,
userSelected: false,
lastModified: currentTime
};
};
// Helper function for series-specific field resolution
const findSeriesValue = (fieldNames: string[], defaultValue: any = null) => {
for (const source of sourcePriority) {
const metadata = sourcedMetadata[source];
if (metadata) {
for (const fieldName of fieldNames) {
const rawValue = metadata[fieldName];
if (rawValue !== undefined && rawValue !== null && rawValue !== '') {
const extractedValue = extractValue(rawValue);
if (extractedValue !== null && extractedValue !== '') {
return {
value: extractedValue,
source: source,
userSelected: false,
lastModified: currentTime
};
}
}
}
}
}
return {
value: defaultValue,
source: 'inferred',
userSelected: false,
lastModified: currentTime
};
};
const canonical: any = {
// Core identifying information
title: findBestValue('title', inferredMetadata.title),
// Series information
series: {
name: findSeriesValue(['series', 'seriesName', 'name'], inferredMetadata.series),
volume: findBestValue('volume', inferredMetadata.volume || 1),
startYear: findBestValue('startYear', inferredMetadata.issue?.year ? parseInt(inferredMetadata.issue.year) : new Date().getFullYear())
},
// Issue information
issueNumber: findBestValue('issueNumber', inferredMetadata.issue?.number?.toString() || "1"),
// Publishing information
publisher: findBestValue('publisher', null),
publicationDate: findBestValue('publicationDate', null),
coverDate: findBestValue('coverDate', null),
// Content information
pageCount: findBestValue('pageCount', null),
summary: findBestValue('summary', null),
// Creator information - collect from all sources for richer data
creators: [],
// Character and genre arrays with source tracking
characters: {
values: [],
source: 'inferred',
userSelected: false,
lastModified: currentTime
},
genres: {
values: [],
source: 'inferred',
userSelected: false,
lastModified: currentTime
},
// Canonical metadata tracking
lastCanonicalUpdate: currentTime,
hasUserModifications: false,
// Quality and completeness tracking
completeness: {
score: 0,
missingFields: [],
lastCalculated: currentTime
}
};
// Handle creators - combine from all sources but track source attribution
const allCreators: any[] = [];
for (const source of sourcePriority) {
const metadata = sourcedMetadata[source];
if (metadata?.creators) {
metadata.creators.forEach((creator: any) => {
allCreators.push({
name: extractValue(creator.name),
role: extractValue(creator.role),
source: source,
userSelected: false,
lastModified: currentTime
});
});
} else {
// Handle legacy writer/artist fields
if (metadata?.writer) {
allCreators.push({
name: extractValue(metadata.writer),
role: 'Writer',
source: source,
userSelected: false,
lastModified: currentTime
});
}
if (metadata?.artist) {
allCreators.push({
name: extractValue(metadata.artist),
role: 'Artist',
source: source,
userSelected: false,
lastModified: currentTime
});
}
}
}
canonical.creators = allCreators;
// Handle characters - combine from all sources
const allCharacters = new Set();
let characterSource = 'inferred';
for (const source of sourcePriority) {
if (sourcedMetadata[source]?.characters && sourcedMetadata[source].characters.length > 0) {
sourcedMetadata[source].characters.forEach((char: string) => allCharacters.add(char));
if (characterSource === 'inferred') characterSource = source; // Use the first source found
}
}
canonical.characters = {
values: Array.from(allCharacters),
source: characterSource,
userSelected: false,
lastModified: currentTime
};
// Handle genres - combine from all sources
const allGenres = new Set();
let genreSource = 'inferred';
for (const source of sourcePriority) {
if (sourcedMetadata[source]?.genres && sourcedMetadata[source].genres.length > 0) {
sourcedMetadata[source].genres.forEach((genre: string) => allGenres.add(genre));
if (genreSource === 'inferred') genreSource = source; // Use the first source found
}
}
canonical.genres = {
values: Array.from(allGenres),
source: genreSource,
userSelected: false,
lastModified: currentTime
};
// Calculate completeness score
const requiredFields = ['title', 'series.name', 'issueNumber', 'publisher'];
const optionalFields = ['publicationDate', 'coverDate', 'pageCount', 'summary'];
const missingFields = [];
let filledCount = 0;
// Check required fields
requiredFields.forEach(field => {
const fieldPath = field.split('.');
let value = canonical;
for (const path of fieldPath) {
value = value?.[path];
}
if (value?.value) {
filledCount++;
} else {
missingFields.push(field);
}
});
// Check optional fields
optionalFields.forEach(field => {
if (canonical[field]?.value) {
filledCount++;
}
});
const totalFields = requiredFields.length + optionalFields.length;
canonical.completeness = {
score: Math.round((filledCount / totalFields) * 100),
missingFields: missingFields,
lastCalculated: currentTime
};
return canonical;
}
},
});
}
}
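
A sketch of what createCanonicalMetadata yields for a small input, following the source priority documented in the method (comicInfo beats comicvine, and inferred fills the gaps; the sample data is hypothetical):

// Inside the service, e.g. from a handler:
const canonical = this.createCanonicalMetadata(
    {
        comicInfo: { title: "Saga #1", publisher: "Image Comics", writer: "Brian K. Vaughan" },
        comicvine: { title: "Saga", pageCount: 32 },
        metron: {}, gcd: {}, locg: {},
    },
    { series: "Saga", volume: 1, title: "Saga #1", issue: { number: 1, year: "2012" } }
);
// canonical.title       -> { value: "Saga #1", source: "comicInfo", userSelected: false, ... }
// canonical.pageCount   -> { value: 32, source: "comicvine", ... }
// canonical.creators[0] -> { name: "Brian K. Vaughan", role: "Writer", source: "comicInfo", ... }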


@@ -51,11 +51,13 @@ import {
IExtractionOptions,
} from "threetwo-ui-typings";
const ObjectId = require("mongoose").Types.ObjectId;
import { pubClient } from "../config/redis.config";
import fsExtra from "fs-extra";
const through2 = require("through2");
import klaw from "klaw";
import path from "path";
import { COMICS_DIRECTORY, USERDATA_DIRECTORY } from "../constants/directories";
import AirDCPPSocket from "../shared/airdcpp.socket";
console.log(`MONGO -> ${process.env.MONGO_URI}`);
export default class ImportService extends Service {
@@ -66,16 +68,32 @@ export default class ImportService extends Service {
mixins: [DbMixin("comics", Comic)],
hooks: {},
actions: {
getHealthInformation: {
rest: "GET /getHealthInformation",
params: {},
handler: async (ctx: Context<{}>) => {
try {
return await ctx.broker.call("$node.services");
} catch (error) {
return new Error("Service is down.");
}
},
},
walkFolders: {
rest: "POST /walkFolders",
params: {
basePathToWalk: "string",
},
async handler(ctx: Context<{ basePathToWalk: string }>) {
params: {},
async handler(
ctx: Context<{
basePathToWalk: string;
extensions: string[];
}>
) {
console.log(ctx.params);
return await walkFolder(ctx.params.basePathToWalk, [
".cbz",
".cbr",
".cb7",
...ctx.params.extensions,
]);
},
},
@@ -90,11 +108,18 @@ export default class ImportService extends Service {
rest: "POST /uncompressFullArchive",
params: {},
handler: async (
ctx: Context<{ filePath: string; options: any }>
ctx: Context<{
filePath: string;
comicObjectId: string;
options: any;
}>
) => {
await broker.call("importqueue.uncompressResize", {
this.broker.call("jobqueue.enqueue", {
filePath: ctx.params.filePath,
comicObjectId: ctx.params.comicObjectId,
options: ctx.params.options,
action: "uncompressFullArchive.async",
description: `Job for uncompressing archive at ${ctx.params.filePath}`,
});
},
},
@@ -139,64 +164,81 @@ export default class ImportService extends Service {
},
newImport: {
rest: "POST /newImport",
params: {},
// params: {},
async handler(
ctx: Context<{
extractionOptions?: any;
sessionId: string;
}>
) {
// 1. Walk the Source folder
klaw(path.resolve(COMICS_DIRECTORY))
// 1.1 Filter on .cb* extensions
.pipe(
through2.obj(function (item, enc, next) {
let fileExtension = path.extname(item.path);
if (
[".cbz", ".cbr", ".cb7"].includes(
fileExtension
)
) {
this.push(item);
}
next();
})
)
// 1.2 Pipe filtered results to the next step
.on("data", async (item) => {
console.info(
"Found a file at path: %s",
item.path
);
let comicExists = await Comic.exists({
"rawFileDetails.name": `${path.basename(
item.path,
path.extname(item.path)
)}`,
});
if (!comicExists) {
// 2. Send the extraction job to the queue
await broker.call(
"importqueue.processImport",
{
try {
// Get params to be passed to the import jobs
const { sessionId } = ctx.params;
// 1. Walk the Source folder
klaw(path.resolve(COMICS_DIRECTORY))
// 1.1 Filter on .cb* extensions
.pipe(
through2.obj(function (item, enc, next) {
let fileExtension = path.extname(
item.path
);
if (
[".cbz", ".cbr", ".cb7"].includes(
fileExtension
)
) {
this.push(item);
}
next();
})
)
// 1.2 Pipe filtered results to the next step
// Enqueue the job in the queue
.on("data", async (item) => {
console.info(
"Found a file at path: %s",
item.path
);
let comicExists = await Comic.exists({
"rawFileDetails.name": `${path.basename(
item.path,
path.extname(item.path)
)}`,
});
if (!comicExists) {
// 2.1 Reset the job counters in Redis
await pubClient.set(
"completedJobCount",
0
);
await pubClient.set(
"failedJobCount",
0
);
// 2.2 Send the extraction job to the queue
this.broker.call("jobqueue.enqueue", {
fileObject: {
filePath: item.path,
fileSize: item.stats.size,
},
sessionId,
importType: "new",
}
);
} else {
console.log(
"Comic already exists in the library."
);
}
})
.on("end", () => {
console.log("All files traversed.");
});
action: "enqueue.async",
});
} else {
console.log(
"Comic already exists in the library."
);
}
})
.on("end", () => {
console.log("All files traversed.");
});
} catch (error) {
console.log(error);
}
},
},
rawImportToDB: {
rest: "POST /rawImportToDB",
params: {},
@@ -207,10 +249,7 @@ export default class ImportService extends Service {
payload: {
_id?: string;
sourcedMetadata: {
comicvine?: {
volume: { api_detail_url: string };
volumeInformation: {};
};
comicvine?: any;
locg?: {};
};
inferredMetadata: {
@@ -219,11 +258,13 @@ export default class ImportService extends Service {
rawFileDetails: {
name: string;
};
wanted: {
issues: [];
volume: { id: number };
source: string;
markEntireVolumeWanted: Boolean;
};
acquisition: {
source: {
wanted: boolean;
name?: string;
};
directconnect: {
downloads: [];
};
@@ -232,61 +273,109 @@ export default class ImportService extends Service {
}>
) {
try {
let volumeDetails;
const comicMetadata = ctx.params.payload;
// When an issue is added from the search CV feature
// we solicit volume information and add that to mongo
if (
comicMetadata.sourcedMetadata.comicvine &&
!isNil(
comicMetadata.sourcedMetadata.comicvine
.volume
)
) {
volumeDetails = await this.broker.call(
"comicvine.getVolumes",
{
volumeURI:
comicMetadata.sourcedMetadata
.comicvine.volume
.api_detail_url,
}
);
comicMetadata.sourcedMetadata.comicvine.volumeInformation =
volumeDetails.results;
}
console.log(
JSON.stringify(ctx.params.payload, null, 4)
);
const { payload } = ctx.params;
const { wanted } = payload;
console.log("Saving to Mongo...");
console.log(
`Import type: [${ctx.params.importType}]`
);
switch (ctx.params.importType) {
case "new":
return await Comic.create(comicMetadata);
case "update":
return await Comic.findOneAndUpdate(
{
"acquisition.directconnect.downloads.bundleId":
ctx.params.bundleId,
},
comicMetadata,
{
upsert: true,
new: true,
}
);
default:
return false;
if (
!wanted ||
!wanted.volume ||
!wanted.volume.id
) {
console.log(
"No valid identifier for upsert. Attempting to create a new document with minimal data..."
);
const newDocument = new Comic(payload); // Using the entire payload for the new document
await newDocument.save();
return {
success: true,
message:
"New document created due to lack of valid identifiers.",
data: newDocument,
};
}
let condition = {
"wanted.volume.id": wanted.volume.id,
};
let update: any = {
// Using 'any' to bypass strict type checks; alternatively, define a more accurate type
$set: {
rawFileDetails: payload.rawFileDetails,
inferredMetadata: payload.inferredMetadata,
sourcedMetadata: payload.sourcedMetadata,
},
$setOnInsert: {
"wanted.source": payload.wanted.source,
"wanted.markEntireVolumeWanted":
payload.wanted.markEntireVolumeWanted,
"wanted.volume": payload.wanted.volume,
},
};
if (wanted.issues && wanted.issues.length > 0) {
update.$addToSet = {
"wanted.issues": { $each: wanted.issues },
};
}
const options = {
upsert: true,
new: true,
};
const result = await Comic.findOneAndUpdate(
condition,
update,
options
);
console.log(
"Operation completed. Document updated or inserted:",
result
);
return {
success: true,
message: "Document successfully upserted.",
data: result,
};
} catch (error) {
console.log(error);
throw new Errors.MoleculerError(
"Import failed.",
"Operation failed.",
500
);
}
},
},
getComicsMarkedAsWanted: {
rest: "GET /getComicsMarkedAsWanted",
handler: async (ctx: Context<{}>) => {
try {
// Query to find comics where 'markEntireVolumeWanted' is true or the 'issues' array is not empty
const wantedComics = await Comic.find({
wanted: { $exists: true },
$or: [
{ "wanted.markEntireVolumeWanted": true },
{ "wanted.issues": { $not: { $size: 0 } } },
],
});
console.log(wantedComics); // Output the found comics
return wantedComics;
} catch (error) {
console.error("Error finding comics:", error);
throw error;
}
},
},
applyComicVineMetadata: {
rest: "POST /applyComicVineMetadata",
params: {},
@@ -383,6 +472,66 @@ export default class ImportService extends Service {
});
},
},
applyTorrentDownloadMetadata: {
rest: "POST /applyTorrentDownloadMetadata",
handler: async (
ctx: Context<{
torrentToDownload: any;
comicObjectId: string;
infoHash: string;
name: string;
announce: string[];
}>
) => {
const {
name,
torrentToDownload,
comicObjectId,
announce,
infoHash,
} = ctx.params;
console.log(JSON.stringify(ctx.params, null, 4));
try {
return await Comic.findByIdAndUpdate(
new ObjectId(comicObjectId),
{
$push: {
"acquisition.torrent": {
infoHash,
name,
announce,
},
},
},
{ new: true, safe: true, upsert: true }
);
} catch (err) {
console.log(err);
}
},
},
getInfoHashes: {
rest: "GET /getInfoHashes",
handler: async (ctx: Context<{}>) => {
try {
return await Comic.aggregate([
{
$unwind: "$acquisition.torrent",
},
{
$group: {
_id: "$_id",
infoHashes: {
$push: "$acquisition.torrent.infoHash",
},
},
},
]);
} catch (err) {
return err;
}
},
},
getComicBooks: {
rest: "POST /getComicBooks",
params: {},
@@ -402,7 +551,10 @@ export default class ImportService extends Service {
rest: "POST /getComicBookById",
params: { id: "string" },
async handler(ctx: Context<{ id: string }>) {
return await Comic.findById(ctx.params.id);
console.log(ctx.params.id);
return await Comic.findById(
new ObjectId(ctx.params.id)
);
},
},
getComicBooksByIds: {
@@ -621,6 +773,48 @@ export default class ImportService extends Service {
},
},
// This method belongs in library service,
// because bundles can only exist for comics _in the library_
// (wanted or imported)
getBundles: {
rest: "POST /getBundles",
params: {},
handler: async (
ctx: Context<{
comicObjectId: string;
config: any;
}>
) => {
try {
// 1. Get the comic object Id
const { config } = ctx.params;
const comicObject = await Comic.findById(
new ObjectId(ctx.params.comicObjectId)
);
// 2. Init AirDC++
const ADCPPSocket = new AirDCPPSocket(config);
await ADCPPSocket.connect();
// 3. Get the bundles for the comic object
if (comicObject) {
// make the call to get the bundles from AirDC++ using the bundleId
const bundles =
comicObject.acquisition.directconnect.downloads.map(
async (bundle) => {
return await ADCPPSocket.get(
`queue/bundles/${bundle.bundleId}`
);
}
);
return Promise.all(bundles);
}
} catch (error) {
throw new Errors.MoleculerError(
"Couldn't fetch bundles from AirDC++",
500
);
}
},
},
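// Sketch: fetching AirDC++ bundle statuses for a library comic
// (the ObjectId and host config are hypothetical):
//   await broker.call("library.getBundles", {
//       comicObjectId: "64f0c2a1b2c3d4e5f6a7b8c9",
//       config: {
//           protocol: "http",
//           hostname: "localhost:5600",
//           username: "user",
//           password: "pass",
//       },
//   });
// Each stored download's bundleId is resolved against AirDC++'s queue/bundles endpoint.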
flushDB: {
rest: "POST /flushDB",
params: {},
@@ -669,8 +863,57 @@ export default class ImportService extends Service {
console.log(ctx.params);
},
},
/**
* Enhanced import from job queue - works with enhanced Comic model
*/
importFromJob: {
params: {
importType: "string",
bundleId: { type: "string", optional: true },
payload: "object"
},
async handler(ctx: Context<{
importType: string;
bundleId?: string;
payload: any;
}>) {
try {
const { importType, bundleId, payload } = ctx.params;
console.log(`Importing comic with enhanced metadata processing...`);
// Create comic with enhanced metadata structure
const comic = new Comic({
...payload,
importStatus: {
isImported: true,
tagged: false,
lastProcessed: new Date()
}
});
await comic.save();
console.log(`Successfully imported comic: ${comic._id}`);
console.log(`Resolved metadata: ${JSON.stringify(comic.resolvedMetadata)}`);
return {
success: true,
comic: comic._id,
metadata: {
sources: Object.keys(comic.sourcedMetadata || {}),
resolvedFields: Object.keys(comic.resolvedMetadata || {}),
primarySource: comic.resolvedMetadata?.primarySource || 'inferred'
}
};
} catch (error) {
console.error("Error importing comic:", error);
throw error;
}
}
}
},
methods: {},
methods: {}
});
}
}
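
For reference, a sketch of how the reworked rawImportToDB is driven for a wanted volume (field values hypothetical). With wanted.volume.id present the action upserts on it; without a valid identifier it falls back to creating a new document from the payload:

await broker.call("library.rawImportToDB", {
    importType: "new",
    payload: {
        rawFileDetails: { name: "Saga #1" },
        inferredMetadata: { issue: {} },
        sourcedMetadata: { comicvine: {}, locg: {} },
        wanted: {
            source: "comicvine",
            markEntireVolumeWanted: true,
            volume: { id: 56767 }, // hypothetical ComicVine volume id
            issues: [],
        },
    },
});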


@@ -46,14 +46,15 @@ export default class SettingsService extends Service {
.map((item) => JSON.stringify(item))
.join("\n");
queries += "\n";
const { body } = await eSClient.msearch({
const { responses } = await eSClient.msearch({
body: queries,
});
body.responses.forEach((match) => {
responses.forEach((match) => {
console.log(match.hits);
});
return body.responses;
return responses;
},
},
issue: {
@@ -74,9 +75,9 @@ export default class SettingsService extends Service {
) => {
try {
console.log(ctx.params);
const { query, pagination } = ctx.params;
const { query, pagination, type } = ctx.params;
let eSQuery = {};
switch (ctx.params.type) {
switch (type) {
case "all":
Object.assign(eSQuery, {
match_all: {},
@@ -99,12 +100,19 @@ export default class SettingsService extends Service {
case "wanted":
Object.assign(eSQuery, {
bool: {
must: {
term: {
"acquisition.source.wanted":
true,
should: [
{
exists: {
field: "wanted.issues",
},
},
},
{
exists: {
field: "wanted.volume",
},
},
],
minimum_should_match: 1,
},
});
break;
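// Sketch: driving the "wanted" branch from another service (pagination values hypothetical).
// The action assembles the exists-based ES body above on its own, so callers only pass the type:
//   const hits = await broker.call("search.issue", {
//       type: "wanted",
//       query: {},
//       pagination: { size: 25, from: 0 },
//       trigger: "example",
//   });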


@@ -8,7 +8,7 @@ import {
} from "moleculer";
import { DbMixin } from "../mixins/db.mixin";
import Settings from "../models/settings.model";
import { isEmpty, pickBy, identity, map } from "lodash";
import { isEmpty, pickBy, identity, map, isNil } from "lodash";
const ObjectId = require("mongoose").Types.ObjectId;
export default class SettingsService extends Service {
@@ -24,16 +24,52 @@ export default class SettingsService extends Service {
settings: {},
hooks: {},
actions: {
getEnvironmentVariables: {
rest: "GET /getEnvironmentVariables",
params: {},
handler: async (ctx: Context<{}>) => {
return {
comicsDirectory: process.env.COMICS_DIRECTORY,
userdataDirectory: process.env.USERDATA_DIRECTORY,
redisURI: process.env.REDIS_URI,
elasticsearchURI: process.env.ELASTICSEARCH_URI,
mongoURI: process.env.MONGO_URI,
kafkaBroker: process.env.KAFKA_BROKER,
unrarBinPath: process.env.UNRAR_BIN_PATH,
sevenzBinPath: process.env.SEVENZ_BINARY_PATH,
comicvineAPIKey: process.env.COMICVINE_API_KEY,
}
}
},
getSettings: {
rest: "GET /getAllSettings",
params: {},
async handler(ctx: Context<{ settingsKey: string }>) {
const settings = await Settings.find({});
if (isEmpty(settings)) {
const { settingsKey } = ctx.params;
// Initialize a projection object. Include everything by default.
let projection = settingsKey
? { _id: 0, [settingsKey]: 1 }
: {};
// Find the settings with the dynamic projection
const settings = await Settings.find({}, projection);
if (settings.length === 0) {
return {};
}
console.log(settings[0]);
return settings[0];
// If settingsKey is provided, return the specific part of the settings.
// Otherwise, return the entire settings document.
if (settingsKey) {
// Check if the specific key exists in the settings document.
// Since `settings` is an array, we access the first element.
// Then, we use the settingsKey to return only that part of the document.
return settings[0][settingsKey] || {};
} else {
// Return the entire settings document
return settings[0];
}
},
},
@@ -42,44 +78,107 @@ export default class SettingsService extends Service {
params: {},
async handler(
ctx: Context<{
settingsPayload: {
host: object;
airDCPPUserSettings: object;
hubs: [];
settingsPayload?: {
protocol: string;
hostname: string;
port: string;
username: string;
password: string;
_id?: string;
airDCPPUserSettings?: object;
hubs?: [];
};
settingsObjectId: string;
settingsObjectId?: string;
settingsKey: string;
}>
) {
console.log("varan bhat", ctx.params);
const { host, airDCPPUserSettings, hubs } =
ctx.params.settingsPayload;
let query = {
host,
airDCPPUserSettings,
hubs,
};
const keysToUpdate = pickBy(query, identity);
let updateQuery = {};
try {
console.log(ctx.params);
let query = {};
const { settingsKey, settingsObjectId } =
ctx.params;
const {
hostname,
protocol,
port,
username,
password,
} = ctx.params.settingsPayload;
const host = {
hostname,
protocol,
port,
username,
password,
};
const undefinedPropsInHostname = Object.values(
host
).filter((value) => value === undefined);
map(Object.keys(keysToUpdate), (key) => {
updateQuery[`directConnect.client.${key}`] =
query[key];
});
const options = {
upsert: true,
new: true,
setDefaultsOnInsert: true,
};
const filter = {
_id: new ObjectId(ctx.params.settingsObjectId),
};
const result = Settings.findOneAndUpdate(
filter,
{ $set: updateQuery },
options
);
// Update, depending what key was passed in params
// 1. Construct the update query
switch (settingsKey) {
case "bittorrent":
console.log(
`Received settings for ${settingsKey}, building query...`
);
query = {
...(undefinedPropsInHostname.length ===
0 && {
$set: {
"bittorrent.client.host": host,
},
}),
};
break;
case "directConnect":
console.log(
`Received settings for ${settingsKey}, building query...`
);
const { hubs, airDCPPUserSettings } =
ctx.params.settingsPayload;
query = {
...(undefinedPropsInHostname.length ===
0 && {
$set: {
"directConnect.client.host":
host,
},
}),
...(!isNil(hubs) && {
$set: {
"directConnect.client.hubs":
hubs,
},
}),
};
console.log(JSON.stringify(query, null, 4));
break;
return result;
default:
return false;
}
// 2. Set up options, filters
const options = {
upsert: true,
setDefaultsOnInsert: true,
returnDocument: "after",
};
const filter = settingsObjectId
? { _id: settingsObjectId }
: {};
// 3. Execute the mongo query
const result = await Settings.findOneAndUpdate(
filter,
query,
options
);
return result;
} catch (err) {
return err;
}
},
},
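// Sketch: the two payload shapes the reworked save handler accepts. The action
// name is assumed to be "settings.saveSettings" (not visible in this hunk), and
// all values are hypothetical; the settingsKey switch above drives the query:
//   await broker.call("settings.saveSettings", {
//       settingsKey: "bittorrent",
//       settingsPayload: { protocol: "http", hostname: "localhost", port: "8080",
//                          username: "admin", password: "<password>" },
//   });
//   await broker.call("settings.saveSettings", {
//       settingsKey: "directConnect",
//       settingsPayload: { protocol: "http", hostname: "localhost", port: "5600",
//                          username: "user", password: "<password>", hubs: [] },
//   });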
deleteSettings: {


@@ -1,16 +1,20 @@
"use strict";
import { Service, ServiceBroker, ServiceSchema } from "moleculer";
import { Service, ServiceBroker, ServiceSchema, Context } from "moleculer";
import { JobType } from "moleculer-bullmq";
import { createClient } from "redis";
import { createAdapter } from "@socket.io/redis-adapter";
import Session from "../models/session.model";
import { pubClient, subClient } from "../config/redis.config";
const { MoleculerError } = require("moleculer").Errors;
const SocketIOService = require("moleculer-io");
const redisURL = new URL(process.env.REDIS_URI);
// console.log(redisURL.hostname);
const { v4: uuidv4 } = require("uuid");
import AirDCPPSocket from "../shared/airdcpp.socket";
import type { Socket as IOSocket } from "socket.io";
import { namespace } from "../moleculer.config";
// Context type carrying the Socket.IO socket in meta
type SocketCtx<P> = Context<P, { socket: IOSocket }>;
const pubClient = createClient({ url: `redis://${redisURL.hostname}:6379` });
(async () => {
await pubClient.connect();
})();
const subClient = pubClient.duplicate();
export default class SocketService extends Service {
// @ts-ignore
public constructor(
@@ -25,51 +29,20 @@ export default class SocketService extends Service {
port: process.env.PORT || 3001,
io: {
namespaces: {
"/": {
"/automated": {
events: {
call: {
// whitelist: ["math.*", "say.*", "accounts.*", "rooms.*", "io.*"],
},
action: async (data, ack) => {
// write your handler function here.
switch (data.type) {
case "LS_IMPORT":
console.log(
`Recieved ${data.type} event.`
);
// 1. Send task to queue
await this.broker.call(
"library.newImport",
data.data,
{}
);
break;
case "LS_TOGGLE_IMPORT_QUEUE":
await this.broker.call(
"importqueue.toggleImportQueue",
data.data,
{}
);
break;
case "LS_SINGLE_IMPORT":
console.info(
"AirDC++ finished a download -> "
);
console.log(data);
await this.broker.call(
"library.importDownloadedComic",
{ bundle: data },
{}
);
break;
// uncompress archive events
case "COMICBOOK_EXTRACTION_SUCCESS":
console.log(data);
return data;
}
whitelist: [
"socket.*", // Allow 'search' in the automated namespace
],
},
},
},
"/manual": {
events: {
call: { whitelist: ["socket.*"] },
},
},
},
options: {
adapter: createAdapter(pubClient, subClient),
@@ -77,12 +50,346 @@ export default class SocketService extends Service {
},
},
hooks: {},
actions: {
resumeSession: async (ctx: Context<{ sessionId: string }>) => {
const { sessionId } = ctx.params;
console.log("Attempting to resume session...");
try {
const sessionRecord = await Session.find({
sessionId,
});
// 1. Check for sessionId's existence, and a match
if (
sessionRecord.length !== 0 &&
sessionRecord[0].sessionId === sessionId
) {
// 2. Find if the queue has active, paused or waiting jobs
const jobs: JobType = await this.broker.call(
"jobqueue.getJobCountsByType",
{}
);
const { active, paused, waiting } = jobs;
if (active > 0 || paused > 0 || waiting > 0) {
// 3. Get job counts
const completedJobCount = await pubClient.get(
"completedJobCount"
);
const failedJobCount = await pubClient.get(
"failedJobCount"
);
// 4. Send the counts to the active socket.io session
await this.broker.call("socket.broadcast", {
namespace: "/",
event: "RESTORE_JOB_COUNTS_AFTER_SESSION_RESTORATION",
args: [
{
completedJobCount,
failedJobCount,
queueStatus: "running",
},
],
});
}
}
} catch (err) {
throw new MoleculerError(
err,
500,
"SESSION_ID_NOT_FOUND",
{
data: sessionId,
}
);
}
},
setQueueStatus: async (
ctx: Context<{
queueAction: string;
queueStatus: string;
}>
) => {
const { queueAction } = ctx.params;
await this.broker.call(
"jobqueue.toggle",
{ action: queueAction },
{}
);
},
importSingleIssue: async (ctx: Context<{}>) => {
console.info("AirDC++ finished a download -> ");
console.log(ctx.params);
// await this.broker.call(
// "library.importDownloadedComic",
// { bundle: data },
// {}
// );
},
search: {
params: {
query: "object",
config: "object",
},
async handler(ctx) {
const { query, config, namespace } = ctx.params;
const namespacedInstance = this.io.of(namespace || "/");
const ADCPPSocket = new AirDCPPSocket(config);
try {
await ADCPPSocket.connect();
const instance = await ADCPPSocket.post(
"search",
query
);
// Send the instance to the client
await namespacedInstance.emit("searchInitiated", {
instance,
});
// Setting up listeners
await ADCPPSocket.addListener(
`search`,
`search_result_added`,
(groupedResult) => {
console.log(
JSON.stringify(groupedResult, null, 4)
);
namespacedInstance.emit(
"searchResultAdded",
groupedResult
);
},
instance.id
);
await ADCPPSocket.addListener(
`search`,
`search_result_updated`,
(updatedResult) => {
namespacedInstance.emit(
"searchResultUpdated",
updatedResult
);
},
instance.id
);
await ADCPPSocket.addListener(
`search`,
`search_hub_searches_sent`,
async (searchInfo) => {
await this.sleep(5000);
const currentInstance =
await ADCPPSocket.get(
`search/${instance.id}`
);
// Send the instance to the client
await namespacedInstance.emit(
"searchesSent",
{
searchInfo,
}
);
if (currentInstance.result_count === 0) {
console.log("No more search results.");
namespacedInstance.emit(
"searchComplete",
{
message:
"No more search results.",
}
);
}
},
instance.id
);
// Perform the actual search
await ADCPPSocket.post(
`search/${instance.id}/hub_search`,
query
);
} catch (error) {
await namespacedInstance.emit(
"searchError",
error.message
);
throw new MoleculerError(
"Search failed",
500,
"SEARCH_FAILED",
{
error,
}
);
} finally {
// await ADCPPSocket.disconnect();
}
},
},
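// For reference, a minimal client-side sketch (assumes moleculer-io's standard
// "call" event and the socket.io-client package; the URL and payload shapes
// are placeholders):
//
// import { io } from "socket.io-client";
// const client = io("http://localhost:3001/automated");
// client.emit("call", "socket.search", { query, config, namespace: "/automated" },
//     (err: any, res: any) => console.log(err ?? res));
// client.on("searchResultAdded", (result: any) => console.log(result));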
download: {
// params: {
// searchInstanceId: "string",
// resultId: "string",
// comicObjectId: "string",
// name: "string",
// size: "number",
// type: "any", // Define more specific type if possible
// config: "object",
// },
async handler(ctx) {
console.log(ctx.params);
const {
searchInstanceId,
resultId,
config,
comicObjectId,
name,
size,
type,
} = ctx.params;
const ADCPPSocket = new AirDCPPSocket(config);
try {
await ADCPPSocket.connect();
const downloadResult = await ADCPPSocket.post(
`search/${searchInstanceId}/results/${resultId}/download`
);
if (downloadResult && downloadResult.bundle_info) {
// Assume bundle_info is part of the response and contains the necessary details
const bundleDBImportResult = await ctx.call(
"library.applyAirDCPPDownloadMetadata",
{
bundleId: downloadResult.bundle_info.id,
comicObjectId,
name,
size,
type,
}
);
this.logger.info(
"Download and metadata update successful",
bundleDBImportResult
);
this.broker.emit(
"downloadCompleted",
bundleDBImportResult
);
return bundleDBImportResult;
} else {
throw new Error(
"Failed to download or missing download result information"
);
}
} catch (error) {
this.broker.emit("downloadError", error.message);
throw new MoleculerError(
"Download failed",
500,
"DOWNLOAD_FAILED",
{
error,
}
);
} finally {
// await ADCPPSocket.disconnect();
}
},
},
listenFileProgress: {
params: { config: "object", namespace: "string" },
async handler(
ctx: SocketCtx<{ config: any; namespace: string }>
) {
const { config, namespace } = ctx.params;
const namespacedInstance = this.io.of(namespace || "/");
const ADCPPSocket = new AirDCPPSocket(config);
try {
// Connect once
await ADCPPSocket.connect();
await ADCPPSocket.addListener(
"queue",
"queue_bundle_tick",
async (data) => {
console.log("Received queue_bundle_tick event.");
namespacedInstance.emit("downloadTick", data);
}
);
} catch (error) {
	this.logger.error(
		"Failed to listen for file progress:",
		error
	);
}
},
},
},
methods: {
sleep: (ms: number): Promise<void> => {
return new Promise((resolve) => setTimeout(resolve, ms));
},
handleSocketConnection: async function (socket: any) {
this.logger.info(
`Socket connected with session ID: ${socket.id}`
);
console.log("Looking up sessionId in Mongo...");
const sessionIdExists = await Session.find({
sessionId: socket.handshake.query.sessionId,
});
if (sessionIdExists.length === 0) {
console.log(
`Socket Id ${socket.id} not found in Mongo, creating a new session...`
);
const sessionId = uuidv4();
socket.sessionId = sessionId;
console.log(`Saving session ${sessionId} to Mongo...`);
await Session.create({
sessionId,
socketId: socket.id,
});
socket.emit("sessionInitialized", sessionId);
} else {
console.log(`Found socketId ${socket.id}, no-op.`);
}
},
},
async started() {
this.io.on("connection", (data) =>
console.log("socket.io server initialized.")
);
this.io.of("/manual").on("connection", async (socket) => {
console.log(
`socket.io server connected to /manual namespace`
);
});
this.io.on("connection", async (socket) => {
console.log(
`socket.io server connected to client with session ID: ${socket.id}`
);
console.log("Looking up sessionId in Mongo...");
const sessionIdExists = await Session.find({
sessionId: socket.handshake.query.sessionId,
});
// 1. if sessionId isn't found in Mongo, create one and persist it
if (sessionIdExists.length === 0) {
console.log(
`Socket Id ${socket.id} not found in Mongo, creating a new session...`
);
const sessionId = uuidv4();
socket.sessionId = sessionId;
console.log(`Saving session ${sessionId} to Mongo...`);
await Session.create({
sessionId,
socketId: socket.id,
});
socket.emit("sessionInitialized", sessionId);
}
// 2. else, retrieve it from Mongo and "resume" the socket.io connection
else {
console.log(`Found socketId ${socket.id}, no-op.`);
}
});
},
});
}


@@ -0,0 +1,98 @@
"use strict";
import {
Context,
Service,
ServiceBroker,
ServiceSchema,
Errors,
} from "moleculer";
import { DbMixin } from "../mixins/db.mixin";
import Comic from "../models/comic.model";
import BullMqMixin from "moleculer-bullmq";
const { MoleculerError } = require("moleculer").Errors;
export default class TorrentJobs extends Service {
// @ts-ignore
public constructor(
public broker: ServiceBroker,
schema: ServiceSchema<{}> = { name: "torrentjobs" }
) {
super(broker);
this.parseServiceSchema({
name: "torrentjobs",
mixins: [DbMixin("comics", Comic), BullMqMixin],
settings: {
bullmq: {
client: process.env.REDIS_URI,
},
},
hooks: {},
actions: {
getTorrentData: {
queue: true,
rest: "GET /getTorrentData",
handler: async (ctx: Context<{ trigger: string }>) => {
const { trigger } = ctx.params;
console.log(`Received ${trigger} as the trigger...`);
const jobOptions = {
jobId: "retrieveTorrentData",
name: "bossy",
repeat: {
every: 10000, // Repeat every 10000 ms
limit: 100, // Limit to 100 repeats
},
};
const job = await this.localQueue(
ctx,
"fetchTorrentData",
ctx.params,
jobOptions
);
return job;
},
},
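// A minimal sketch of triggering the repeatable job above, either over REST
// (GET /getTorrentData) or directly via the broker; the trigger value is
// illustrative:
//
// await broker.call("torrentjobs.getTorrentData", { trigger: "manual" });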
fetchTorrentData: {
rest: "GET /fetchTorrentData",
handler: async (
ctx: Context<{
birdName: string;
}>
) => {
const repeatableJob = await this.$resolve(
"torrentjobs"
).getRepeatableJobs();
console.info(repeatableJob);
console.info(
`Scheduled job for fetching torrent data fired.`
);
// 1. query mongo for infohashes
const infoHashes = await this.broker.call(
"library.getInfoHashes",
{}
);
// 2. query qbittorrent to see if they exist
const torrents: any = await this.broker.call(
"qbittorrent.getTorrentRealTimeStats",
{ infoHashes }
);
// 3. Broadcast the real-time torrent stats to connected clients
await this.broker.call("socket.broadcast", {
namespace: "/",
event: "AS_TORRENT_DATA",
args: [
{
torrents,
},
],
});
// 4. TODO: If a hash exists in qBittorrent, do nothing;
//    if it doesn't, purge it from Mongo.
},
},
},
methods: {},
});
}
}

shared/airdcpp.socket.ts Normal file

@@ -0,0 +1,151 @@
import WebSocket from "ws";
/**
* Wrapper around the AirDC++ WebSocket API socket.
* Provides methods to connect, disconnect, and interact with the AirDC++ API.
*/
class AirDCPPSocket {
/**
* Configuration options for the underlying socket.
* @private
*/
private options: {
url: string;
autoReconnect: boolean;
reconnectInterval: number;
logLevel: string;
ignoredListenerEvents: string[];
username: string;
password: string;
};
/**
* Instance of the AirDC++ API socket.
* @private
*/
private socketInstance: any;
/**
* Promise that resolves when the Socket module is loaded
* @private
*/
private socketModulePromise: Promise<any>;
/**
* Constructs a new AirDCPPSocket wrapper.
* @param {{ protocol: string; hostname: string; username: string; password: string }} configuration
* Connection configuration: protocol (ws or wss), hostname, username, and password.
*/
constructor(configuration: {
protocol: string;
hostname: string;
username: string;
password: string;
}) {
const socketProtocol =
configuration.protocol === "https" ? "wss" : "ws";
this.options = {
url: `${socketProtocol}://${configuration.hostname}/api/v1/`,
autoReconnect: true,
reconnectInterval: 5000,
logLevel: "verbose",
ignoredListenerEvents: [
"transfer_statistics",
"hash_statistics",
"hub_counts_updated",
],
username: configuration.username,
password: configuration.password,
};
// Use dynamic import to load the ES module
this.socketModulePromise = import("airdcpp-apisocket").then(module => {
const { Socket } = module;
this.socketInstance = Socket(this.options, WebSocket);
return this.socketInstance;
});
}
/**
* Establishes a connection to the AirDC++ server.
* @async
* @returns {Promise<any>} Session information returned by the server.
*/
async connect(): Promise<any> {
await this.socketModulePromise;
if (
this.socketInstance &&
typeof this.socketInstance.connect === "function"
) {
return await this.socketInstance.connect();
}
return Promise.reject(
new Error("Connect method not available on socket instance")
);
}
/**
* Disconnects from the AirDC++ server.
* @async
* @returns {Promise<void>}
*/
async disconnect(): Promise<void> {
await this.socketModulePromise;
if (
this.socketInstance &&
typeof this.socketInstance.disconnect === "function"
) {
await this.socketInstance.disconnect();
}
}
/**
* Sends a POST request to a specific AirDC++ endpoint.
* @async
* @param {string} endpoint - API endpoint path (e.g., "search").
* @param {object} [data={}] - Payload to send with the request.
* @returns {Promise<any>} Response from the AirDC++ server.
*/
async post(endpoint: string, data: object = {}): Promise<any> {
await this.socketModulePromise;
return await this.socketInstance.post(endpoint, data);
}
/**
* Sends a GET request to a specific AirDC++ endpoint.
* @async
* @param {string} endpoint - API endpoint path (e.g., "search/123").
* @param {object} [data={}] - Query parameters to include.
* @returns {Promise<any>} Response from the AirDC++ server.
*/
async get(endpoint: string, data: object = {}): Promise<any> {
await this.socketModulePromise;
return await this.socketInstance.get(endpoint, data);
}
/**
* Adds an event listener to the AirDC++ socket.
* @async
* @param {string} event - Event group (e.g., "search" or "queue").
* @param {string} handlerName - Specific event within the group (e.g., "search_result_added").
* @param {Function} callback - Callback to invoke when the event occurs.
* @param {string|number} [id] - Optional identifier (e.g., search instance ID).
* @returns {Promise<any>} Listener registration result.
*/
async addListener(
event: string,
handlerName: string,
callback: (...args: any[]) => void,
id?: string | number
): Promise<any> {
await this.socketModulePromise;
return await this.socketInstance.addListener(
event,
handlerName,
callback,
id
);
}
}
export default AirDCPPSocket;
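// A minimal usage sketch (hostname and credentials are placeholders, and the
// hub_search payload shape is an assumption about the AirDC++ Web API):
//
// const socket = new AirDCPPSocket({
//     protocol: "http",
//     hostname: "localhost:5600",
//     username: "admin",
//     password: "password",
// });
// await socket.connect();
// const instance = await socket.post("search");
// await socket.addListener(
//     "search",
//     "search_result_added",
//     (result) => console.log(result),
//     instance.id
// );
// await socket.post(`search/${instance.id}/hub_search`, {
//     query: { pattern: "batman" }, // payload shape is an assumption
// });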


@@ -4,6 +4,7 @@
"esModuleInterop": true,
"noImplicitAny": false,
"removeComments": true,
"allowSyntheticDefaultImports": true,
"preserveConstEnums": true,
"sourceMap": true,
"pretty": true,


@@ -1,24 +0,0 @@
<?xml version="1.0"?>
<ComicInfo xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:xsd="http://www.w3.org/2001/XMLSchema">
<Title>Title of the Book</Title>
<Summary>A description of the book</Summary>
<Number>1</Number>
<Count>3</Count>
<Year>2010</Year>
<Month>4</Month>
<Writer>Author name</Writer>
<Publisher>self</Publisher>
<Genre>educational</Genre>
<BlackAndWhite>No</BlackAndWhite>
<Manga>No</Manga>
<Characters>Superman</Characters>
<PageCount>5</PageCount>
<Pages>
<Page Image="0" Type="FrontCover" ImageSize="139382" ImageWidth="774" ImageHeight="1024" />
<Page Image="2" ImageSize="125736" ImageWidth="797" ImageHeight="1024" />
<Page Image="1" ImageSize="127937" ImageWidth="797" ImageHeight="1024" />
<Page Image="4" ImageSize="160902" ImageWidth="804" ImageHeight="1024" />
<Page Image="3" ImageSize="211181" ImageWidth="804" ImageHeight="1024" />
</Pages>
</ComicInfo>


@@ -21,7 +21,7 @@ const ALLOWED_IMAGE_FILE_FORMATS = [".jpg", ".jpeg", ".png"];
// Tell FileMagic where to find the magic.mgc file
FileMagic.magicFile = require.resolve("@npcz/magic/dist/magic.mgc");
// We can only use MAGIC_PRESERVE_ATIME on operating systems that support
// it and that includes OS X for example. It's a good practice as we don't
// want to change the last access time because we are just checking the file
// contents type
@@ -108,6 +108,14 @@ export const isValidImageFileExtension = (fileName: string): boolean => {
return includes(ALLOWED_IMAGE_FILE_FORMATS, path.extname(fileName));
};
/**
* This function constructs paths for a target extraction folder and an input file based on extraction
* options and a walked folder.
* @param {IExtractionOptions} extractionOptions - An object containing options for the extraction
* process, such as the target extraction folder.
* @param {IFolderData} walkedFolder - `walkedFolder` is an object that represents a folder that has
* been walked through during the file extraction process.
*/
export const constructPaths = (
extractionOptions: IExtractionOptions,
walkedFolder: IFolderData
@@ -142,7 +150,7 @@ export const getFileConstituents = (filePath: string) => {
};
/**
* Method that infers MIME type from a filepath
* @param {string} filePath
* @returns {Promise} string
*/
@@ -155,6 +163,15 @@ export const getMimeType = async (filePath: string) => {
});
};
/**
* This function creates a directory at a specified path using the fse.ensureDir method and throws an
* error if it fails.
* @param {any} options - The options parameter is an optional object that can be passed to the
* fse.ensureDir method to configure its behavior. It can include properties such as mode, which sets
* the permissions of the directory, and fs, which specifies the file system module to use.
* @param {string} directoryPath - The `directoryPath` parameter is a string that represents the path
* of the directory that needs to be created.
*/
export const createDirectory = async (options: any, directoryPath: string) => {
try {
await fse.ensureDir(directoryPath, options);


@@ -47,6 +47,7 @@ import {
getMimeType,
} from "../utils/file.utils";
import { convertXMLToJSON } from "./xml.utils";
const { MoleculerError } = require("moleculer").Errors;
const fse = require("fs-extra");
const Unrar = require("unrar");
interface RarFile {
@@ -73,7 +74,7 @@ const errors = [];
*/
export const extractComicInfoXMLFromRar = async (
filePath: string,
mimeType: string
): Promise<any> => {
try {
// Create the target directory
@@ -209,7 +210,7 @@ export const extractComicInfoXMLFromRar = async (
export const extractComicInfoXMLFromZip = async (
filePath: string,
mimeType: string
): Promise<any> => {
try {
// Create the target directory
@@ -254,7 +255,7 @@ export const extractComicInfoXMLFromZip = async (
// Push the first file (cover) to our extraction target
extractionTargets.push(files[0].name);
filesToWriteToDisk.coverFile = path.basename(files[0].name);
if (!isEmpty(comicInfoXMLFileObject)) {
filesToWriteToDisk.comicInfoXML = comicInfoXMLFileObject[0].name;
extractionTargets.push(filesToWriteToDisk.comicInfoXML);
@@ -356,18 +357,26 @@ export const extractFromArchive = async (filePath: string) => {
switch (mimeType) {
case "application/x-7z-compressed; charset=binary":
case "application/zip; charset=binary":
const cbzResult = await extractComicInfoXMLFromZip(
filePath,
mimeType
);
return Object.assign({}, ...cbzResult);
case "application/x-rar; charset=binary":
const cbrResult = await extractComicInfoXMLFromRar(
filePath,
mimeType
);
return Object.assign({}, ...cbrResult);
default:
console.error(
"Error inferring filetype for comicinfo.xml extraction."
);
throw new MoleculerError(
	"Cannot infer filetype for comicinfo.xml extraction.",
	500,
	"FILETYPE_INFERENCE_ERROR",
	{ data: { message: "Cannot infer filetype." } }
);
}
};