📈 Added real time import stats and stats cache

This commit is contained in:
2026-03-05 12:40:57 -05:00
parent a2f9be71ed
commit cc30dcc14f
4 changed files with 530 additions and 9 deletions

View File

@@ -0,0 +1,117 @@
/**
* GraphQL queries and mutations for import operations
* @module examples/frontend/graphql-queries/importQueries
*/
/**
* Query to get import statistics for a directory
* Shows how many files are already imported vs. new files
*/
export const GET_IMPORT_STATISTICS = `
query GetImportStatistics($directoryPath: String) {
getImportStatistics(directoryPath: $directoryPath) {
success
directory
stats {
totalLocalFiles
alreadyImported
newFiles
percentageImported
}
}
}
`;
/**
* Mutation to start a new full import
* Imports all comics in the directory, skipping already imported files
*/
export const START_NEW_IMPORT = `
mutation StartNewImport($sessionId: String!) {
startNewImport(sessionId: $sessionId) {
success
message
jobsQueued
}
}
`;
/**
* Mutation to start an incremental import
* Only imports new files not already in the database
*/
export const START_INCREMENTAL_IMPORT = `
mutation StartIncrementalImport($sessionId: String!, $directoryPath: String) {
startIncrementalImport(sessionId: $sessionId, directoryPath: $directoryPath) {
success
message
stats {
total
alreadyImported
newFiles
queued
}
}
}
`;
/**
* Example usage with variables
*/
export const exampleUsage = {
// Get import statistics
getStatistics: {
query: GET_IMPORT_STATISTICS,
variables: {
directoryPath: "/comics", // Optional, defaults to COMICS_DIRECTORY
},
},
// Start new full import
startNewImport: {
query: START_NEW_IMPORT,
variables: {
sessionId: `import-${Date.now()}`,
},
},
// Start incremental import
startIncrementalImport: {
query: START_INCREMENTAL_IMPORT,
variables: {
sessionId: `incremental-${Date.now()}`,
directoryPath: "/comics", // Optional
},
},
};
/**
* TypeScript types for the responses
*/
export interface ImportStatistics {
success: boolean;
directory: string;
stats: {
totalLocalFiles: number;
alreadyImported: number;
newFiles: number;
percentageImported: string;
};
}
export interface ImportJobResult {
success: boolean;
message: string;
jobsQueued: number;
}
export interface IncrementalImportResult {
success: boolean;
message: string;
stats: {
total: number;
alreadyImported: number;
newFiles: number;
queued: number;
};
}

View File

@@ -621,6 +621,99 @@ export const resolvers = {
throw new Error("Failed to preview canonical metadata");
}
},
/**
* Get cached import statistics (fast, real-time)
* @async
* @function getCachedImportStatistics
* @param {any} _ - Parent resolver (unused)
* @param {Object} args - Query arguments (none)
* @param {Object} context - GraphQL context with broker
* @returns {Promise<Object>} Cached import statistics
* @throws {Error} If statistics service is unavailable
* @description Retrieves cached import statistics from the API service.
* This is a fast, real-time query that doesn't require filesystem scanning.
*
* @example
* ```graphql
* query {
* getCachedImportStatistics {
* success
* stats {
* totalLocalFiles
* alreadyImported
* newFiles
* percentageImported
* pendingFiles
* }
* lastUpdated
* }
* }
* ```
*/
getCachedImportStatistics: async (
_: any,
args: {},
context: any
) => {
try {
const broker = context?.broker;
if (!broker) {
throw new Error("Broker not available in context");
}
const result = await broker.call("api.getCachedImportStatistics");
return result;
} catch (error) {
console.error("Error fetching cached import statistics:", error);
throw new Error(`Failed to fetch cached import statistics: ${error.message}`);
}
},
/**
* Get job result statistics grouped by session
* @async
* @function getJobResultStatistics
* @param {any} _ - Parent resolver (unused)
* @param {Object} args - Query arguments (none)
* @param {Object} context - GraphQL context with broker
* @returns {Promise<Array>} Array of job result statistics by session
* @throws {Error} If job queue service is unavailable
* @description Retrieves job result statistics grouped by session ID,
* including counts of completed and failed jobs and earliest timestamp.
*
* @example
* ```graphql
* query {
* getJobResultStatistics {
* sessionId
* completedJobs
* failedJobs
* earliestTimestamp
* }
* }
* ```
*/
getJobResultStatistics: async (
_: any,
args: {},
context: any
) => {
try {
const broker = context?.broker;
if (!broker) {
throw new Error("Broker not available in context");
}
const result = await broker.call("jobqueue.getJobResultStatistics");
return result;
} catch (error) {
console.error("Error fetching job result statistics:", error);
throw new Error(`Failed to fetch job result statistics: ${error.message}`);
}
},
},
Mutation: {

View File

@@ -344,6 +344,15 @@ export const typeDefs = gql`
comicId: ID!
preferences: UserPreferencesInput
): CanonicalMetadata
# Get import statistics for a directory
getImportStatistics(directoryPath: String): ImportStatistics!
# Get cached import statistics (fast, real-time)
getCachedImportStatistics: CachedImportStatistics!
# Get job result statistics grouped by session
getJobResultStatistics: [JobResultStatistics!]!
}
# Mutations
@@ -385,6 +394,15 @@ export const typeDefs = gql`
source: MetadataSource!
metadata: String!
): Comic!
# Start a new full import of the comics directory
startNewImport(sessionId: String!): ImportJobResult!
# Start an incremental import (only new files)
startIncrementalImport(
sessionId: String!
directoryPath: String
): IncrementalImportResult!
}
# Input types
@@ -703,4 +721,63 @@ export const typeDefs = gql`
_score: Float
_source: Comic!
}
# Import statistics
type ImportStatistics {
success: Boolean!
directory: String!
stats: ImportStats!
}
type ImportStats {
totalLocalFiles: Int!
alreadyImported: Int!
newFiles: Int!
percentageImported: String!
}
# Cached import statistics (real-time)
type CachedImportStatistics {
success: Boolean!
message: String
stats: CachedImportStats
lastUpdated: String
}
type CachedImportStats {
totalLocalFiles: Int!
alreadyImported: Int!
newFiles: Int!
percentageImported: String!
pendingFiles: Int!
}
# Import job result
type ImportJobResult {
success: Boolean!
message: String!
jobsQueued: Int!
}
# Incremental import result
type IncrementalImportResult {
success: Boolean!
message: String!
stats: IncrementalImportStats!
}
type IncrementalImportStats {
total: Int!
alreadyImported: Int!
newFiles: Int!
queued: Int!
}
# Job result statistics
type JobResultStatistics {
sessionId: String!
completedJobs: Int!
failedJobs: Int!
earliestTimestamp: String!
}
`;

View File

@@ -6,6 +6,18 @@ import ApiGateway from "moleculer-web";
import debounce from "lodash/debounce";
import { IFolderData } from "threetwo-ui-typings";
/**
* Import statistics cache for real-time updates
*/
interface ImportStatisticsCache {
totalLocalFiles: number;
alreadyImported: number;
newFiles: number;
percentageImported: string;
lastUpdated: Date;
pendingFiles: Set<string>; // Files in stabilization period
}
/**
* ApiService exposes REST endpoints and watches the comics directory for changes.
* It uses chokidar to monitor filesystem events and broadcasts them via the Moleculer broker.
@@ -18,6 +30,18 @@ export default class ApiService extends Service {
*/
private fileWatcher?: any;
/**
* Import statistics cache for real-time updates
* @private
*/
private statsCache: ImportStatisticsCache | null = null;
/**
* Debounced function to broadcast statistics updates
* @private
*/
private broadcastStatsUpdate?: ReturnType<typeof debounce>;
/**
* Creates an instance of ApiService.
* @param {ServiceBroker} broker - The Moleculer service broker instance.
@@ -109,7 +133,192 @@ export default class ApiService extends Service {
assets: { folder: "public", options: {} },
},
events: {},
methods: {},
actions: {
/**
* Get cached import statistics (fast, no filesystem scan)
* @returns Cached statistics or null if not initialized
*/
getCachedImportStatistics: {
rest: "GET /cachedImportStatistics",
async handler() {
// If cache not initialized, try to initialize it now
if (!this.statsCache) {
this.logger.info("[Stats Cache] Cache not initialized, initializing now...");
try {
await this.initializeStatsCache();
} catch (error) {
this.logger.error("[Stats Cache] Failed to initialize:", error);
return {
success: false,
message: "Failed to initialize statistics cache",
stats: null,
lastUpdated: null,
};
}
}
// Check again after initialization attempt
if (!this.statsCache) {
return {
success: false,
message: "Statistics cache not initialized yet",
stats: null,
lastUpdated: null,
};
}
return {
success: true,
stats: {
totalLocalFiles: this.statsCache.totalLocalFiles,
alreadyImported: this.statsCache.alreadyImported,
newFiles: this.statsCache.newFiles,
percentageImported: this.statsCache.percentageImported,
pendingFiles: this.statsCache.pendingFiles.size,
},
lastUpdated: this.statsCache.lastUpdated.toISOString(),
};
},
},
/**
* Invalidate statistics cache (force refresh on next request)
*/
invalidateStatsCache: {
async handler() {
this.logger.info("[Stats Cache] Invalidating cache...");
await this.initializeStatsCache();
return { success: true, message: "Cache invalidated and refreshed" };
},
},
},
methods: {
/**
* Initialize statistics cache by fetching current import statistics
* @private
*/
initializeStatsCache: async function() {
try {
this.logger.info("[Stats Cache] Initializing import statistics cache...");
const stats = await this.broker.call("library.getImportStatistics", {});
if (stats && stats.success) {
this.statsCache = {
totalLocalFiles: stats.stats.totalLocalFiles,
alreadyImported: stats.stats.alreadyImported,
newFiles: stats.stats.newFiles,
percentageImported: stats.stats.percentageImported,
lastUpdated: new Date(),
pendingFiles: new Set<string>(),
};
this.logger.info("[Stats Cache] Cache initialized successfully");
}
} catch (error) {
this.logger.error("[Stats Cache] Failed to initialize cache:", error);
}
},
/**
* Update statistics cache when files are added or removed
* @param event - File event type ('add' or 'unlink')
* @param filePath - Path to the file
* @private
*/
updateStatsCache: function(event: string, filePath: string) {
if (!this.statsCache) return;
const fileExtension = path.extname(filePath);
const isComicFile = [".cbz", ".cbr", ".cb7"].includes(fileExtension);
if (!isComicFile) return;
if (event === "add") {
// Add to pending files (in stabilization period)
this.statsCache.pendingFiles.add(filePath);
this.statsCache.totalLocalFiles++;
this.statsCache.newFiles++;
} else if (event === "unlink") {
// Remove from pending if it was there
this.statsCache.pendingFiles.delete(filePath);
this.statsCache.totalLocalFiles--;
// Could be either new or already imported, but we'll decrement newFiles for safety
if (this.statsCache.newFiles > 0) {
this.statsCache.newFiles--;
}
}
// Recalculate percentage
if (this.statsCache.totalLocalFiles > 0) {
const percentage = ((this.statsCache.alreadyImported / this.statsCache.totalLocalFiles) * 100).toFixed(2);
this.statsCache.percentageImported = `${percentage}%`;
} else {
this.statsCache.percentageImported = "0.00%";
}
this.statsCache.lastUpdated = new Date();
// Trigger debounced broadcast
if (this.broadcastStatsUpdate) {
this.broadcastStatsUpdate();
}
},
/**
* Broadcast statistics update via Socket.IO
* @private
*/
broadcastStats: async function() {
if (!this.statsCache) return;
try {
await this.broker.call("socket.broadcast", {
namespace: "/",
event: "IMPORT_STATISTICS_UPDATED",
args: [{
stats: {
totalLocalFiles: this.statsCache.totalLocalFiles,
alreadyImported: this.statsCache.alreadyImported,
newFiles: this.statsCache.newFiles,
percentageImported: this.statsCache.percentageImported,
pendingFiles: this.statsCache.pendingFiles.size,
},
lastUpdated: this.statsCache.lastUpdated.toISOString(),
}],
});
this.logger.debug("[Stats Cache] Broadcasted statistics update");
} catch (error) {
this.logger.error("[Stats Cache] Failed to broadcast statistics:", error);
}
},
/**
* Mark a file as imported (moved from pending to imported)
* @param filePath - Path to the imported file
* @private
*/
markFileAsImported: function(filePath: string) {
if (!this.statsCache) return;
this.statsCache.pendingFiles.delete(filePath);
this.statsCache.alreadyImported++;
if (this.statsCache.newFiles > 0) {
this.statsCache.newFiles--;
}
// Recalculate percentage
if (this.statsCache.totalLocalFiles > 0) {
const percentage = ((this.statsCache.alreadyImported / this.statsCache.totalLocalFiles) * 100).toFixed(2);
this.statsCache.percentageImported = `${percentage}%`;
}
this.statsCache.lastUpdated = new Date();
// Trigger debounced broadcast
if (this.broadcastStatsUpdate) {
this.broadcastStatsUpdate();
}
},
},
started: this.startWatcher,
stopped: this.stopWatcher,
});
@@ -120,7 +329,7 @@ export default class ApiService extends Service {
* Debounces rapid events and logs initial scan completion.
* @private
*/
private startWatcher(): void {
private async startWatcher(): Promise<void> {
const rawDir = process.env.COMICS_DIRECTORY;
if (!rawDir) {
this.logger.error("COMICS_DIRECTORY not set; cannot start watcher");
@@ -133,6 +342,20 @@ export default class ApiService extends Service {
return;
}
// Initialize debounced broadcast function (2 second debounce for statistics updates)
this.broadcastStatsUpdate = debounce(
() => {
this.broadcastStats();
},
2000,
{ leading: false, trailing: true }
);
// Initialize statistics cache (async, but don't block watcher startup)
this.initializeStatsCache().catch(err => {
this.logger.error("[Stats Cache] Failed to initialize on startup:", err);
});
this.fileWatcher = chokidar.watch(watchDir, {
persistent: true,
ignoreInitial: true,
@@ -202,13 +425,19 @@ export default class ApiService extends Service {
stats?: fs.Stats
): Promise<void> {
this.logger.info(`File event [${event}]: ${filePath}`);
// Update statistics cache for add/unlink events
if (event === "add" || event === "unlink") {
this.updateStatsCache(event, filePath);
}
if (event === "add" && stats) {
setTimeout(async () => {
const newStats = await fs.promises.stat(filePath);
if (newStats.mtime.getTime() === stats.mtime.getTime()) {
this.logger.info(`Stable file detected: ${filePath}, importing.`);
try {
try {
const newStats = await fs.promises.stat(filePath);
if (newStats.mtime.getTime() === stats.mtime.getTime()) {
this.logger.info(`Stable file detected: ${filePath}, importing.`);
const folderData: IFolderData[] = await this.broker.call(
"library.walkFolders",
{ basePathToWalk: filePath }
@@ -266,13 +495,18 @@ export default class ApiService extends Service {
});
this.logger.info(`Successfully queued import for: ${filePath}`);
// Mark file as imported in statistics cache
this.markFileAsImported(filePath);
}
} catch (error) {
this.logger.error(`Error importing file ${filePath}:`, error);
}
} catch (error) {
this.logger.error(`Error importing file ${filePath}:`, error);
}
}, 3000);
}
// Broadcast file system event
this.broker.broadcast(event, { path: filePath });
}
}