Files
threetwo-core-service/MOLECULER_DEPENDENCY_ANALYSIS.md

423 lines
16 KiB
Markdown

# Moleculer Microservices Dependency Analysis
**ThreeTwo Core Service - Comic Book Library Management System**
## System Overview
This **ThreeTwo Core Service** is a sophisticated **comic book library management system** built on Moleculer microservices architecture. The system demonstrates advanced patterns including:
- **Event-driven architecture** with real-time WebSocket communication
- **Asynchronous job processing** with BullMQ for heavy operations
- **Multi-source metadata aggregation** with canonical data resolution
- **Hybrid search** combining MongoDB aggregation and ElasticSearch
- **External system integrations** (P2P, BitTorrent, Comic APIs)
### Technical Stack
- **Framework**: Moleculer.js microservices
- **Node ID**: `threetwo-core-service`
- **Transport**: Redis (`redis://localhost:6379`)
- **Databases**: MongoDB + ElasticSearch
- **Queue System**: BullMQ (Redis-backed)
- **Real-time**: Socket.IO with Redis adapter
- **External APIs**: ComicVine, AirDC++, qBittorrent
## Service Architecture
### Core Services
| Service | File | Role | Dependencies |
|---------|------|------|-------------|
| **API** | [`api.service.ts`](services/api.service.ts) | API Gateway + File System Watcher | → library, jobqueue |
| **Library** | [`library.service.ts`](services/library.service.ts) | Core Comic Library Management | → jobqueue, search, comicvine |
| **JobQueue** | [`jobqueue.service.ts`](services/jobqueue.service.ts) | Asynchronous Job Processing (BullMQ) | → library, socket |
| **Socket** | [`socket.service.ts`](services/socket.service.ts) | Real-time Communication (Socket.IO) | → library, jobqueue |
| **Search** | [`search.service.ts`](services/search.service.ts) | ElasticSearch Integration | ElasticSearch client |
| **GraphQL** | [`graphql.service.ts`](services/graphql.service.ts) | GraphQL API Layer | → search |
### Supporting Services
| Service | File | Role | Dependencies |
|---------|------|------|-------------|
| **AirDC++** | [`airdcpp.service.ts`](services/airdcpp.service.ts) | P2P File Sharing Integration | External AirDC++ client |
| **Settings** | [`settings.service.ts`](services/settings.service.ts) | Configuration Management | MongoDB |
| **Image Transform** | [`imagetransformation.service.ts`](services/imagetransformation.service.ts) | Cover Processing | File system |
| **OPDS** | [`opds.service.ts`](services/opds.service.ts) | Comic Catalog Feeds | File system |
| **Torrent Jobs** | [`torrentjobs.service.ts`](services/torrentjobs.service.ts) | BitTorrent Integration | → library, qbittorrent |
## Service-to-Service Dependencies
### Core Service Interactions
#### 1. API Service → Other Services
```typescript
// File system watcher triggers import
ctx.broker.call("library.walkFolders", { basePathToWalk: filePath })
ctx.broker.call("importqueue.processImport", { fileObject })
```
#### 2. Library Service → Dependencies
```typescript
// Job queue integration
this.broker.call("jobqueue.enqueue", { action: "enqueue.async" })
// Search operations
ctx.broker.call("search.searchComic", { elasticSearchQueries })
ctx.broker.call("search.deleteElasticSearchIndices", {})
// External metadata
ctx.broker.call("comicvine.getVolumes", { volumeURI })
```
#### 3. JobQueue Service → Dependencies
```typescript
// Import processing
this.broker.call("library.importFromJob", { importType, payload })
// Real-time updates
this.broker.call("socket.broadcast", {
namespace: "/",
event: "LS_COVER_EXTRACTED",
args: [{ completedJobCount, importResult }]
})
```
#### 4. Socket Service → Dependencies
```typescript
// Job management
ctx.broker.call("jobqueue.getJobCountsByType", {})
ctx.broker.call("jobqueue.toggle", { action: queueAction })
// Download tracking
ctx.call("library.applyAirDCPPDownloadMetadata", {
bundleId, comicObjectId, name, size, type
})
```
#### 5. GraphQL Service → Search
```typescript
// Wanted comics query
const result = await ctx.broker.call("search.issue", {
query: eSQuery,
pagination: { size: limit, from: offset },
type: "wanted"
})
```
## API Endpoint Mapping
### REST API Routes (`/api/*`)
#### Library Management
- `POST /api/library/walkFolders` → [`library.walkFolders`](services/library.service.ts:82)
- `POST /api/library/newImport` → [`library.newImport`](services/library.service.ts:165) → [`jobqueue.enqueue`](services/library.service.ts:219)
- `POST /api/library/getComicBooks` → [`library.getComicBooks`](services/library.service.ts:535)
- `POST /api/library/getComicBookById` → [`library.getComicBookById`](services/library.service.ts:550)
- `POST /api/library/flushDB` → [`library.flushDB`](services/library.service.ts:818) → [`search.deleteElasticSearchIndices`](services/library.service.ts:839)
- `GET /api/library/libraryStatistics` → [`library.libraryStatistics`](services/library.service.ts:684)
#### Job Management
- `GET /api/jobqueue/getJobCountsByType` → [`jobqueue.getJobCountsByType`](services/jobqueue.service.ts:31)
- `GET /api/jobqueue/toggle` → [`jobqueue.toggle`](services/jobqueue.service.ts:38)
- `GET /api/jobqueue/getJobResultStatistics` → [`jobqueue.getJobResultStatistics`](services/jobqueue.service.ts:214)
#### Search Operations
- `POST /api/search/searchComic` → [`search.searchComic`](services/search.service.ts:28)
- `POST /api/search/searchIssue` → [`search.issue`](services/search.service.ts:60)
- `GET /api/search/deleteElasticSearchIndices` → [`search.deleteElasticSearchIndices`](services/search.service.ts:171)
#### AirDC++ Integration
- `POST /api/airdcpp/initialize` → [`airdcpp.initialize`](services/airdcpp.service.ts:24)
- `POST /api/airdcpp/getHubs` → [`airdcpp.getHubs`](services/airdcpp.service.ts:59)
- `POST /api/airdcpp/search` → [`airdcpp.search`](services/airdcpp.service.ts:96)
#### Image Processing
- `POST /api/imagetransformation/resizeImage` → [`imagetransformation.resize`](services/imagetransformation.service.ts:37)
- `POST /api/imagetransformation/analyze` → [`imagetransformation.analyze`](services/imagetransformation.service.ts:57)
### GraphQL Endpoints
- `POST /graphql` → [`graphql.wantedComics`](services/graphql.service.ts:49) → [`search.issue`](services/graphql.service.ts:77)
### Static File Serving
- `/userdata/*` → Static files from `./userdata`
- `/comics/*` → Static files from `./comics`
- `/logs/*` → Static files from `logs`
## Event-Driven Communication
### Job Queue Events
#### Job Completion Events
```typescript
// Successful import completion
"enqueue.async.completed" socket.broadcast("LS_COVER_EXTRACTED", {
completedJobCount,
importResult: job.returnvalue.data.importResult
})
// Failed import handling
"enqueue.async.failed" socket.broadcast("LS_COVER_EXTRACTION_FAILED", {
failedJobCount,
importResult: job
})
// Queue drained
"drained" socket.broadcast("LS_IMPORT_QUEUE_DRAINED", {
message: "drained"
})
```
#### Archive Processing Events
```typescript
// Archive uncompression completed
"uncompressFullArchive.async.completed" socket.broadcast("LS_UNCOMPRESSION_JOB_COMPLETE", {
uncompressedArchive: job.returnvalue
})
```
### File System Events
```typescript
// File watcher events (debounced 200ms)
fileWatcher.on("add", (path, stats) {
broker.call("library.walkFolders", { basePathToWalk: filePath })
broker.call("importqueue.processImport", { fileObject })
broker.broadcast(event, { path: filePath })
})
```
### WebSocket Events
#### Real-time Search
```typescript
// Search initiation
socket.emit("searchInitiated", { instance })
// Live search results
socket.emit("searchResultAdded", groupedResult)
socket.emit("searchResultUpdated", updatedResult)
socket.emit("searchComplete", { message })
```
#### Download Progress
```typescript
// Download status
broker.emit("downloadCompleted", bundleDBImportResult)
broker.emit("downloadError", error.message)
// Progress tracking
socket.emit("downloadTick", data)
```
## Data Flow Architecture
### 1. Comic Import Processing Flow
```mermaid
graph TD
A[File System Watcher] --> B[library.walkFolders]
B --> C[jobqueue.enqueue]
C --> D[jobqueue.enqueue.async]
D --> E[Archive Extraction]
E --> F[Metadata Processing]
F --> G[Canonical Metadata Creation]
G --> H[library.importFromJob]
H --> I[MongoDB Storage]
I --> J[ElasticSearch Indexing]
J --> K[socket.broadcast LS_COVER_EXTRACTED]
```
### 2. Search & Discovery Flow
```mermaid
graph TD
A[GraphQL/REST Query] --> B[search.issue]
B --> C[ElasticSearch Query]
C --> D[Results Enhancement]
D --> E[Metadata Scoring]
E --> F[Structured Response]
```
### 3. Download Management Flow
```mermaid
graph TD
A[socket[search]] --> B[airdcpp.search]
B --> C[Real-time Results]
C --> D[socket[download]]
D --> E[library.applyAirDCPPDownloadMetadata]
E --> F[Progress Tracking]
F --> G[Import Pipeline]
```
## Database Dependencies
### MongoDB Collections
| Collection | Model | Used By Services |
|------------|-------|-----------------|
| **comics** | [`Comic`](models/comic.model.ts) | library, search, jobqueue, imagetransformation |
| **settings** | [`Settings`](models/settings.model.ts) | settings |
| **sessions** | [`Session`](models/session.model.ts) | socket |
| **jobresults** | [`JobResult`](models/jobresult.model.ts) | jobqueue |
### ElasticSearch Integration
- **Index**: `comics` - Full-text search with metadata scoring
- **Client**: [`eSClient`](services/search.service.ts:13) from [`comic.model.ts`](models/comic.model.ts)
- **Query Types**: match_all, multi_match, bool queries with field boosting
### Redis Usage
| Purpose | Services | Configuration |
|---------|----------|---------------|
| **Transport** | All services | [`moleculer.config.ts:93`](moleculer.config.ts:93) |
| **Job Queue** | jobqueue | [`jobqueue.service.ts:27`](services/jobqueue.service.ts:27) |
| **Socket.IO Adapter** | socket | [`socket.service.ts:48`](services/socket.service.ts:48) |
| **Job Counters** | jobqueue | [`completedJobCount`](services/jobqueue.service.ts:392), [`failedJobCount`](services/jobqueue.service.ts:422) |
## External System Integrations
### AirDC++ (P2P File Sharing)
```typescript
// Integration wrapper
const ADCPPSocket = new AirDCPPSocket(config)
await ADCPPSocket.connect()
// Search operations
const searchInstance = await ADCPPSocket.post("search")
const searchInfo = await ADCPPSocket.post(`search/${searchInstance.id}/hub_search`, query)
// Download management
const downloadResult = await ADCPPSocket.post(`search/${searchInstanceId}/results/${resultId}/download`)
```
### ComicVine API
```typescript
// Metadata enrichment
const volumeDetails = await this.broker.call("comicvine.getVolumes", {
volumeURI: matchedResult.volume.api_detail_url
})
```
### qBittorrent Client
```typescript
// Torrent monitoring
const torrents = await this.broker.call("qbittorrent.getTorrentRealTimeStats", { infoHashes })
```
## Metadata Management System
### Multi-Source Metadata Aggregation
The system implements sophisticated metadata management with source prioritization:
#### Source Priority Order
1. **ComicInfo.xml** (embedded in archives)
2. **ComicVine API** (external database)
3. **Metron** (comic database)
4. **Grand Comics Database (GCD)**
5. **League of Comic Geeks (LOCG)**
6. **Filename Inference** (fallback)
#### Canonical Metadata Structure
```typescript
const canonical = {
title: findBestValue('title', inferredMetadata.title),
series: {
name: findSeriesValue(['series', 'seriesName', 'name'], inferredMetadata.series),
volume: findBestValue('volume', inferredMetadata.volume || 1),
startYear: findBestValue('startYear', inferredMetadata.issue?.year)
},
issueNumber: findBestValue('issueNumber', inferredMetadata.issue?.number),
publisher: findBestValue('publisher', null),
creators: [], // Combined from all sources
completeness: {
score: calculatedScore,
missingFields: [],
lastCalculated: currentTime
}
}
```
## Performance & Scalability Insights
### Asynchronous Processing
- **Heavy Operations**: Comic import, archive extraction, metadata processing
- **Queue System**: BullMQ with Redis backing for reliability
- **Job Types**: Import processing, archive extraction, torrent monitoring
- **Real-time Updates**: WebSocket progress notifications
### Search Optimization
- **Dual Storage**: MongoDB (transactional) + ElasticSearch (search)
- **Metadata Scoring**: Canonical metadata with source priority
- **Query Types**: Full-text, field-specific, boolean combinations
- **Caching**: Moleculer built-in memory caching
### External Integration Resilience
- **Timeout Handling**: Custom timeouts for long-running operations
- **Error Propagation**: Structured error responses with context
- **Connection Management**: Reusable connections for external APIs
- **Retry Logic**: Built-in retry policies for failed operations
## Critical Dependency Patterns
### 1. Service Chain Dependencies
- **Import Pipeline**: api → library → jobqueue → socket
- **Search Pipeline**: graphql → search → ElasticSearch
- **Download Pipeline**: socket → airdcpp → library
### 2. Circular Dependencies (Managed)
- **socket ←→ library**: Download coordination and progress updates
- **jobqueue ←→ socket**: Job progress notifications and queue control
### 3. Shared Resource Dependencies
- **MongoDB**: library, search, jobqueue, settings services
- **Redis**: All services (transport) + jobqueue (BullMQ) + socket (adapter)
- **ElasticSearch**: search, graphql services
## Architecture Strengths
### 1. Separation of Concerns
- **API Gateway**: Pure routing and file serving
- **Business Logic**: Centralized in library service
- **Data Access**: Abstracted through DbMixin
- **External Integration**: Isolated in dedicated services
### 2. Event-Driven Design
- **File System Events**: Automatic import triggering
- **Job Lifecycle Events**: Progress tracking and error handling
- **Real-time Communication**: WebSocket event broadcasting
### 3. Robust Metadata Management
- **Multi-Source Aggregation**: ComicVine, ComicInfo.xml, filename inference
- **Canonical Resolution**: Smart metadata merging with source attribution
- **User Curation Support**: Framework for manual metadata override
### 4. Scalability Features
- **Microservices Architecture**: Independent service scaling
- **Asynchronous Processing**: Heavy operations don't block API responses
- **Redis Transport**: Distributed service communication
- **Job Queue**: Reliable background processing with retry logic
## Potential Areas for Improvement
### 1. Service Coupling
- **High Interdependence**: library ←→ jobqueue ←→ socket tight coupling
- **Recommendation**: Event-driven decoupling for some operations
### 2. Error Handling
- **Inconsistent Patterns**: Mix of raw errors and MoleculerError usage
- **Recommendation**: Standardized error handling middleware
### 3. Configuration Management
- **Environment Variables**: Direct access vs centralized configuration
- **Recommendation**: Enhanced settings service for runtime configuration
### 4. Testing Strategy
- **Integration Testing**: Complex service interactions need comprehensive testing
- **Recommendation**: Contract testing between services
## Summary
This Moleculer-based architecture demonstrates sophisticated microservices patterns with:
- **11 specialized services** with clear boundaries
- **47 REST endpoints** + GraphQL layer
- **3 WebSocket namespaces** for real-time communication
- **Multi-database architecture** (MongoDB + ElasticSearch)
- **Advanced job processing** with BullMQ
- **External system integration** (P2P, BitTorrent, Comic APIs)
The system successfully manages complex domain requirements while maintaining good separation of concerns and providing excellent user experience through real-time updates and comprehensive metadata management.