Files
threetwo-core-service/MOLECULER_DEPENDENCY_ANALYSIS.md

16 KiB

Moleculer Microservices Dependency Analysis

ThreeTwo Core Service - Comic Book Library Management System

System Overview

This ThreeTwo Core Service is a sophisticated comic book library management system built on Moleculer microservices architecture. The system demonstrates advanced patterns including:

  • Event-driven architecture with real-time WebSocket communication
  • Asynchronous job processing with BullMQ for heavy operations
  • Multi-source metadata aggregation with canonical data resolution
  • Hybrid search combining MongoDB aggregation and ElasticSearch
  • External system integrations (P2P, BitTorrent, Comic APIs)

Technical Stack

  • Framework: Moleculer.js microservices
  • Node ID: threetwo-core-service
  • Transport: Redis (redis://localhost:6379)
  • Databases: MongoDB + ElasticSearch
  • Queue System: BullMQ (Redis-backed)
  • Real-time: Socket.IO with Redis adapter
  • External APIs: ComicVine, AirDC++, qBittorrent

Service Architecture

Core Services

Service File Role Dependencies
API api.service.ts API Gateway + File System Watcher → library, jobqueue
Library library.service.ts Core Comic Library Management → jobqueue, search, comicvine
JobQueue jobqueue.service.ts Asynchronous Job Processing (BullMQ) → library, socket
Socket socket.service.ts Real-time Communication (Socket.IO) → library, jobqueue
Search search.service.ts ElasticSearch Integration ElasticSearch client
GraphQL graphql.service.ts GraphQL API Layer → search

Supporting Services

Service File Role Dependencies
AirDC++ airdcpp.service.ts P2P File Sharing Integration External AirDC++ client
Settings settings.service.ts Configuration Management MongoDB
Image Transform imagetransformation.service.ts Cover Processing File system
OPDS opds.service.ts Comic Catalog Feeds File system
Torrent Jobs torrentjobs.service.ts BitTorrent Integration → library, qbittorrent

Service-to-Service Dependencies

Core Service Interactions

1. API Service → Other Services

// File system watcher triggers import
ctx.broker.call("library.walkFolders", { basePathToWalk: filePath })
ctx.broker.call("importqueue.processImport", { fileObject })

2. Library Service → Dependencies

// Job queue integration
this.broker.call("jobqueue.enqueue", { action: "enqueue.async" })

// Search operations
ctx.broker.call("search.searchComic", { elasticSearchQueries })
ctx.broker.call("search.deleteElasticSearchIndices", {})

// External metadata
ctx.broker.call("comicvine.getVolumes", { volumeURI })

3. JobQueue Service → Dependencies

// Import processing
this.broker.call("library.importFromJob", { importType, payload })

// Real-time updates
this.broker.call("socket.broadcast", {
    namespace: "/",
    event: "LS_COVER_EXTRACTED",
    args: [{ completedJobCount, importResult }]
})

4. Socket Service → Dependencies

// Job management
ctx.broker.call("jobqueue.getJobCountsByType", {})
ctx.broker.call("jobqueue.toggle", { action: queueAction })

// Download tracking
ctx.call("library.applyAirDCPPDownloadMetadata", {
    bundleId, comicObjectId, name, size, type
})
// Wanted comics query
const result = await ctx.broker.call("search.issue", {
    query: eSQuery,
    pagination: { size: limit, from: offset },
    type: "wanted"
})

API Endpoint Mapping

REST API Routes (/api/*)

Library Management

Job Management

Search Operations

AirDC++ Integration

Image Processing

GraphQL Endpoints

Static File Serving

  • /userdata/* → Static files from ./userdata
  • /comics/* → Static files from ./comics
  • /logs/* → Static files from logs

Event-Driven Communication

Job Queue Events

Job Completion Events

// Successful import completion
"enqueue.async.completed"  socket.broadcast("LS_COVER_EXTRACTED", {
    completedJobCount,
    importResult: job.returnvalue.data.importResult
})

// Failed import handling
"enqueue.async.failed"  socket.broadcast("LS_COVER_EXTRACTION_FAILED", {
    failedJobCount,
    importResult: job
})

// Queue drained
"drained"  socket.broadcast("LS_IMPORT_QUEUE_DRAINED", {
    message: "drained"
})

Archive Processing Events

// Archive uncompression completed
"uncompressFullArchive.async.completed"  socket.broadcast("LS_UNCOMPRESSION_JOB_COMPLETE", {
    uncompressedArchive: job.returnvalue
})

File System Events

// File watcher events (debounced 200ms)
fileWatcher.on("add", (path, stats)  {
    broker.call("library.walkFolders", { basePathToWalk: filePath })
    broker.call("importqueue.processImport", { fileObject })
    broker.broadcast(event, { path: filePath })
})

WebSocket Events

// Search initiation
socket.emit("searchInitiated", { instance })

// Live search results
socket.emit("searchResultAdded", groupedResult)
socket.emit("searchResultUpdated", updatedResult)
socket.emit("searchComplete", { message })

Download Progress

// Download status
broker.emit("downloadCompleted", bundleDBImportResult)
broker.emit("downloadError", error.message)

// Progress tracking
socket.emit("downloadTick", data)

Data Flow Architecture

1. Comic Import Processing Flow

graph TD
    A[File System Watcher] --> B[library.walkFolders]
    B --> C[jobqueue.enqueue]
    C --> D[jobqueue.enqueue.async]
    D --> E[Archive Extraction]
    E --> F[Metadata Processing]
    F --> G[Canonical Metadata Creation]
    G --> H[library.importFromJob]
    H --> I[MongoDB Storage]
    I --> J[ElasticSearch Indexing]
    J --> K[socket.broadcast LS_COVER_EXTRACTED]

2. Search & Discovery Flow

graph TD
    A[GraphQL/REST Query] --> B[search.issue]
    B --> C[ElasticSearch Query]
    C --> D[Results Enhancement]
    D --> E[Metadata Scoring]
    E --> F[Structured Response]

3. Download Management Flow

graph TD
    A[socket[search]] --> B[airdcpp.search]
    B --> C[Real-time Results]
    C --> D[socket[download]]
    D --> E[library.applyAirDCPPDownloadMetadata]
    E --> F[Progress Tracking]
    F --> G[Import Pipeline]

Database Dependencies

MongoDB Collections

Collection Model Used By Services
comics Comic library, search, jobqueue, imagetransformation
settings Settings settings
sessions Session socket
jobresults JobResult jobqueue

ElasticSearch Integration

  • Index: comics - Full-text search with metadata scoring
  • Client: eSClient from comic.model.ts
  • Query Types: match_all, multi_match, bool queries with field boosting

Redis Usage

Purpose Services Configuration
Transport All services moleculer.config.ts:93
Job Queue jobqueue jobqueue.service.ts:27
Socket.IO Adapter socket socket.service.ts:48
Job Counters jobqueue completedJobCount, failedJobCount

External System Integrations

AirDC++ (P2P File Sharing)

// Integration wrapper
const ADCPPSocket = new AirDCPPSocket(config)
await ADCPPSocket.connect()

// Search operations
const searchInstance = await ADCPPSocket.post("search")
const searchInfo = await ADCPPSocket.post(`search/${searchInstance.id}/hub_search`, query)

// Download management
const downloadResult = await ADCPPSocket.post(`search/${searchInstanceId}/results/${resultId}/download`)

ComicVine API

// Metadata enrichment
const volumeDetails = await this.broker.call("comicvine.getVolumes", {
    volumeURI: matchedResult.volume.api_detail_url
})

qBittorrent Client

// Torrent monitoring
const torrents = await this.broker.call("qbittorrent.getTorrentRealTimeStats", { infoHashes })

Metadata Management System

Multi-Source Metadata Aggregation

The system implements sophisticated metadata management with source prioritization:

Source Priority Order

  1. ComicInfo.xml (embedded in archives)
  2. ComicVine API (external database)
  3. Metron (comic database)
  4. Grand Comics Database (GCD)
  5. League of Comic Geeks (LOCG)
  6. Filename Inference (fallback)

Canonical Metadata Structure

const canonical = {
    title: findBestValue('title', inferredMetadata.title),
    series: {
        name: findSeriesValue(['series', 'seriesName', 'name'], inferredMetadata.series),
        volume: findBestValue('volume', inferredMetadata.volume || 1),
        startYear: findBestValue('startYear', inferredMetadata.issue?.year)
    },
    issueNumber: findBestValue('issueNumber', inferredMetadata.issue?.number),
    publisher: findBestValue('publisher', null),
    creators: [], // Combined from all sources
    completeness: {
        score: calculatedScore,
        missingFields: [],
        lastCalculated: currentTime
    }
}

Performance & Scalability Insights

Asynchronous Processing

  • Heavy Operations: Comic import, archive extraction, metadata processing
  • Queue System: BullMQ with Redis backing for reliability
  • Job Types: Import processing, archive extraction, torrent monitoring
  • Real-time Updates: WebSocket progress notifications

Search Optimization

  • Dual Storage: MongoDB (transactional) + ElasticSearch (search)
  • Metadata Scoring: Canonical metadata with source priority
  • Query Types: Full-text, field-specific, boolean combinations
  • Caching: Moleculer built-in memory caching

External Integration Resilience

  • Timeout Handling: Custom timeouts for long-running operations
  • Error Propagation: Structured error responses with context
  • Connection Management: Reusable connections for external APIs
  • Retry Logic: Built-in retry policies for failed operations

Critical Dependency Patterns

1. Service Chain Dependencies

  • Import Pipeline: api → library → jobqueue → socket
  • Search Pipeline: graphql → search → ElasticSearch
  • Download Pipeline: socket → airdcpp → library

2. Circular Dependencies (Managed)

  • socket ←→ library: Download coordination and progress updates
  • jobqueue ←→ socket: Job progress notifications and queue control

3. Shared Resource Dependencies

  • MongoDB: library, search, jobqueue, settings services
  • Redis: All services (transport) + jobqueue (BullMQ) + socket (adapter)
  • ElasticSearch: search, graphql services

Architecture Strengths

1. Separation of Concerns

  • API Gateway: Pure routing and file serving
  • Business Logic: Centralized in library service
  • Data Access: Abstracted through DbMixin
  • External Integration: Isolated in dedicated services

2. Event-Driven Design

  • File System Events: Automatic import triggering
  • Job Lifecycle Events: Progress tracking and error handling
  • Real-time Communication: WebSocket event broadcasting

3. Robust Metadata Management

  • Multi-Source Aggregation: ComicVine, ComicInfo.xml, filename inference
  • Canonical Resolution: Smart metadata merging with source attribution
  • User Curation Support: Framework for manual metadata override

4. Scalability Features

  • Microservices Architecture: Independent service scaling
  • Asynchronous Processing: Heavy operations don't block API responses
  • Redis Transport: Distributed service communication
  • Job Queue: Reliable background processing with retry logic

Potential Areas for Improvement

1. Service Coupling

  • High Interdependence: library ←→ jobqueue ←→ socket tight coupling
  • Recommendation: Event-driven decoupling for some operations

2. Error Handling

  • Inconsistent Patterns: Mix of raw errors and MoleculerError usage
  • Recommendation: Standardized error handling middleware

3. Configuration Management

  • Environment Variables: Direct access vs centralized configuration
  • Recommendation: Enhanced settings service for runtime configuration

4. Testing Strategy

  • Integration Testing: Complex service interactions need comprehensive testing
  • Recommendation: Contract testing between services

Summary

This Moleculer-based architecture demonstrates sophisticated microservices patterns with:

  • 11 specialized services with clear boundaries
  • 47 REST endpoints + GraphQL layer
  • 3 WebSocket namespaces for real-time communication
  • Multi-database architecture (MongoDB + ElasticSearch)
  • Advanced job processing with BullMQ
  • External system integration (P2P, BitTorrent, Comic APIs)

The system successfully manages complex domain requirements while maintaining good separation of concerns and providing excellent user experience through real-time updates and comprehensive metadata management.