🔨 Fixes for import statuses

This commit is contained in:
2026-03-05 21:29:38 -05:00
parent 8138e0fe4f
commit c7d3d46bcf
8 changed files with 277 additions and 372 deletions

View File

@@ -95,7 +95,18 @@ export const resolvers = {
}
) => {
try {
const result = await Comic.paginate(predicate, paginationOptions);
// Parse predicate if it's a JSON string (from scalar type)
let parsedPredicate = predicate;
if (typeof predicate === 'string') {
try {
parsedPredicate = JSON.parse(predicate);
} catch (parseError) {
console.error("Error parsing predicate JSON:", parseError);
throw new Error("Invalid predicate format: must be valid JSON");
}
}
const result = await Comic.paginate(parsedPredicate, paginationOptions);
return result;
} catch (error) {
console.error("Error fetching comic books:", error);
@@ -674,54 +685,6 @@ export const resolvers = {
}
},
/**
* Get cached import statistics (fast, real-time)
* @async
* @function getCachedImportStatistics
* @param {any} _ - Parent resolver (unused)
* @param {Object} args - Query arguments (none)
* @param {Object} context - GraphQL context with broker
* @returns {Promise<Object>} Cached import statistics
* @throws {Error} If statistics service is unavailable
* @description Retrieves cached import statistics from the API service.
* This is a fast, real-time query that doesn't require filesystem scanning.
*
* @example
* ```graphql
* query {
* getCachedImportStatistics {
* success
* stats {
* totalLocalFiles
* alreadyImported
* newFiles
* percentageImported
* pendingFiles
* }
* lastUpdated
* }
* }
* ```
*/
getCachedImportStatistics: async (
_: any,
args: {},
context: any
) => {
try {
const broker = context?.broker;
if (!broker) {
throw new Error("Broker not available in context");
}
const result = await broker.call("api.getCachedImportStatistics");
return result;
} catch (error) {
console.error("Error fetching cached import statistics:", error);
throw new Error(`Failed to fetch cached import statistics: ${error.message}`);
}
},
/**
* Get job result statistics grouped by session
@@ -810,6 +773,7 @@ export const resolvers = {
}
const session = await broker.call("importstate.getActiveSession");
console.log("[GraphQL] getActiveImportSession result:", session ? `Session ${session.sessionId} (${session.type}, ${session.status})` : "No active session");
return session;
} catch (error) {
console.error("Error fetching active import session:", error);
@@ -1554,6 +1518,55 @@ export const resolvers = {
throw new Error(`Failed to start incremental import: ${error.message}`);
}
},
/**
* Force complete a stuck import session
* @async
* @function forceCompleteSession
* @param {any} _ - Parent resolver (unused)
* @param {Object} args - Arguments
* @param {string} args.sessionId - Session ID to force complete
* @param {any} context - GraphQL context with broker
* @returns {Promise<Object>} Result with success status and message
* @throws {Error} If broker is unavailable or session completion fails
*
* @example
* ```graphql
* mutation {
* forceCompleteSession(sessionId: "d7c5043f-5438-4076-9480-2782267899b6") {
* success
* message
* }
* }
* ```
*/
forceCompleteSession: async (
_: any,
{ sessionId }: { sessionId: string },
context: any
) => {
try {
const broker = context?.broker;
if (!broker) {
throw new Error("Broker not available in context");
}
// Force complete the session (mark as failed since it was stuck)
await broker.call("importstate.completeSession", {
sessionId,
success: false,
});
return {
success: true,
message: `Session ${sessionId} has been force completed and removed from active sessions`,
};
} catch (error) {
console.error("Error force completing session:", error);
throw new Error(`Failed to force complete session: ${error.message}`);
}
},
},
/**

View File

@@ -348,9 +348,6 @@ export const typeDefs = gql`
# Get import statistics for a directory
getImportStatistics(directoryPath: String): ImportStatistics!
# Get cached import statistics (fast, real-time)
getCachedImportStatistics: CachedImportStatistics!
# Get job result statistics grouped by session
getJobResultStatistics: [JobResultStatistics!]!
@@ -406,6 +403,9 @@ export const typeDefs = gql`
sessionId: String!
directoryPath: String
): IncrementalImportResult!
# Force complete a stuck import session
forceCompleteSession(sessionId: String!): ForceCompleteResult!
}
# Input types
@@ -739,22 +739,6 @@ export const typeDefs = gql`
percentageImported: String!
}
# Cached import statistics (real-time)
type CachedImportStatistics {
success: Boolean!
message: String
stats: CachedImportStats
lastUpdated: String
}
type CachedImportStats {
totalLocalFiles: Int!
alreadyImported: Int!
newFiles: Int!
percentageImported: String!
pendingFiles: Int!
}
# Import job result
type ImportJobResult {
success: Boolean!
@@ -776,6 +760,12 @@ export const typeDefs = gql`
queued: Int!
}
# Force complete session result
type ForceCompleteResult {
success: Boolean!
message: String!
}
# Job result statistics
type JobResultStatistics {
sessionId: String!

View File

@@ -6,18 +6,6 @@ import ApiGateway from "moleculer-web";
import debounce from "lodash/debounce";
import { IFolderData } from "threetwo-ui-typings";
/**
* Import statistics cache for real-time updates
*/
interface ImportStatisticsCache {
totalLocalFiles: number;
alreadyImported: number;
newFiles: number;
percentageImported: string;
lastUpdated: Date;
pendingFiles: Set<string>; // Files in stabilization period
}
/**
* ApiService exposes REST endpoints and watches the comics directory for changes.
* It uses chokidar to monitor filesystem events and broadcasts them via the Moleculer broker.
@@ -30,18 +18,6 @@ export default class ApiService extends Service {
*/
private fileWatcher?: any;
/**
* Import statistics cache for real-time updates
* @private
*/
private statsCache: ImportStatisticsCache | null = null;
/**
* Debounced function to broadcast statistics updates
* @private
*/
private broadcastStatsUpdate?: ReturnType<typeof debounce>;
/**
* Creates an instance of ApiService.
* @param {ServiceBroker} broker - The Moleculer service broker instance.
@@ -134,61 +110,6 @@ export default class ApiService extends Service {
assets: { folder: "public", options: {} },
},
events: {
/**
* Listen for import session completion to refresh statistics
*/
"IMPORT_SESSION_COMPLETED": {
async handler(ctx: Context<{
sessionId: string;
type: string;
success: boolean;
stats: any;
}>) {
const { sessionId, type, success } = ctx.params;
this.logger.info(
`[Stats Cache] Import session completed: ${sessionId} (${type}, success: ${success})`
);
// Invalidate and refresh statistics cache
await this.actions.invalidateStatsCache();
},
},
/**
* Listen for import progress to update cache incrementally
*/
"IMPORT_PROGRESS": {
async handler(ctx: Context<{
sessionId: string;
stats: any;
}>) {
// Update cache with current progress
if (this.statsCache) {
const { stats } = ctx.params;
// Update alreadyImported count based on files succeeded
if (stats.filesSucceeded) {
this.statsCache.alreadyImported += 1;
this.statsCache.newFiles = Math.max(0, this.statsCache.newFiles - 1);
// Recalculate percentage
if (this.statsCache.totalLocalFiles > 0) {
const percentage = (
(this.statsCache.alreadyImported / this.statsCache.totalLocalFiles) * 100
).toFixed(2);
this.statsCache.percentageImported = `${percentage}%`;
}
this.statsCache.lastUpdated = new Date();
// Trigger debounced broadcast
if (this.broadcastStatsUpdate) {
this.broadcastStatsUpdate();
}
}
}
},
},
/**
* Listen for watcher disable events
*/
@@ -230,192 +151,8 @@ export default class ApiService extends Service {
},
},
},
actions: {
/**
* Get cached import statistics (fast, no filesystem scan)
* @returns Cached statistics or null if not initialized
*/
getCachedImportStatistics: {
rest: "GET /cachedImportStatistics",
async handler() {
// If cache not initialized, try to initialize it now
if (!this.statsCache) {
this.logger.info("[Stats Cache] Cache not initialized, initializing now...");
try {
await this.initializeStatsCache();
} catch (error) {
this.logger.error("[Stats Cache] Failed to initialize:", error);
return {
success: false,
message: "Failed to initialize statistics cache",
stats: null,
lastUpdated: null,
};
}
}
// Check again after initialization attempt
if (!this.statsCache) {
return {
success: false,
message: "Statistics cache not initialized yet",
stats: null,
lastUpdated: null,
};
}
return {
success: true,
stats: {
totalLocalFiles: this.statsCache.totalLocalFiles,
alreadyImported: this.statsCache.alreadyImported,
newFiles: this.statsCache.newFiles,
percentageImported: this.statsCache.percentageImported,
pendingFiles: this.statsCache.pendingFiles.size,
},
lastUpdated: this.statsCache.lastUpdated.toISOString(),
};
},
},
/**
* Invalidate statistics cache (force refresh on next request)
*/
invalidateStatsCache: {
async handler() {
this.logger.info("[Stats Cache] Invalidating cache...");
await this.initializeStatsCache();
return { success: true, message: "Cache invalidated and refreshed" };
},
},
},
methods: {
/**
* Initialize statistics cache by fetching current import statistics
* @private
*/
initializeStatsCache: async function() {
try {
this.logger.info("[Stats Cache] Initializing import statistics cache...");
const stats = await this.broker.call("library.getImportStatistics", {});
if (stats && stats.success) {
this.statsCache = {
totalLocalFiles: stats.stats.totalLocalFiles,
alreadyImported: stats.stats.alreadyImported,
newFiles: stats.stats.newFiles,
percentageImported: stats.stats.percentageImported,
lastUpdated: new Date(),
pendingFiles: new Set<string>(),
};
this.logger.info("[Stats Cache] Cache initialized successfully");
}
} catch (error) {
this.logger.error("[Stats Cache] Failed to initialize cache:", error);
}
},
/**
* Update statistics cache when files are added or removed
* @param event - File event type ('add' or 'unlink')
* @param filePath - Path to the file
* @private
*/
updateStatsCache: function(event: string, filePath: string) {
if (!this.statsCache) return;
const fileExtension = path.extname(filePath);
const isComicFile = [".cbz", ".cbr", ".cb7"].includes(fileExtension);
if (!isComicFile) return;
if (event === "add") {
// Add to pending files (in stabilization period)
this.statsCache.pendingFiles.add(filePath);
this.statsCache.totalLocalFiles++;
this.statsCache.newFiles++;
} else if (event === "unlink") {
// Remove from pending if it was there
this.statsCache.pendingFiles.delete(filePath);
this.statsCache.totalLocalFiles--;
// Could be either new or already imported, but we'll decrement newFiles for safety
if (this.statsCache.newFiles > 0) {
this.statsCache.newFiles--;
}
}
// Recalculate percentage
if (this.statsCache.totalLocalFiles > 0) {
const percentage = ((this.statsCache.alreadyImported / this.statsCache.totalLocalFiles) * 100).toFixed(2);
this.statsCache.percentageImported = `${percentage}%`;
} else {
this.statsCache.percentageImported = "0.00%";
}
this.statsCache.lastUpdated = new Date();
// Trigger debounced broadcast
if (this.broadcastStatsUpdate) {
this.broadcastStatsUpdate();
}
},
/**
* Broadcast statistics update via Socket.IO
* @private
*/
broadcastStats: async function() {
if (!this.statsCache) return;
try {
await this.broker.call("socket.broadcast", {
namespace: "/",
event: "IMPORT_STATISTICS_UPDATED",
args: [{
stats: {
totalLocalFiles: this.statsCache.totalLocalFiles,
alreadyImported: this.statsCache.alreadyImported,
newFiles: this.statsCache.newFiles,
percentageImported: this.statsCache.percentageImported,
pendingFiles: this.statsCache.pendingFiles.size,
},
lastUpdated: this.statsCache.lastUpdated.toISOString(),
}],
});
this.logger.debug("[Stats Cache] Broadcasted statistics update");
} catch (error) {
this.logger.error("[Stats Cache] Failed to broadcast statistics:", error);
}
},
/**
* Mark a file as imported (moved from pending to imported)
* @param filePath - Path to the imported file
* @private
*/
markFileAsImported: function(filePath: string) {
if (!this.statsCache) return;
this.statsCache.pendingFiles.delete(filePath);
this.statsCache.alreadyImported++;
if (this.statsCache.newFiles > 0) {
this.statsCache.newFiles--;
}
// Recalculate percentage
if (this.statsCache.totalLocalFiles > 0) {
const percentage = ((this.statsCache.alreadyImported / this.statsCache.totalLocalFiles) * 100).toFixed(2);
this.statsCache.percentageImported = `${percentage}%`;
}
this.statsCache.lastUpdated = new Date();
// Trigger debounced broadcast
if (this.broadcastStatsUpdate) {
this.broadcastStatsUpdate();
}
},
},
actions: {},
methods: {},
started: this.startWatcher,
stopped: this.stopWatcher,
});
@@ -439,20 +176,6 @@ export default class ApiService extends Service {
return;
}
// Initialize debounced broadcast function (2 second debounce for statistics updates)
this.broadcastStatsUpdate = debounce(
() => {
this.broadcastStats();
},
2000,
{ leading: false, trailing: true }
);
// Initialize statistics cache (async, but don't block watcher startup)
this.initializeStatsCache().catch(err => {
this.logger.error("[Stats Cache] Failed to initialize on startup:", err);
});
this.fileWatcher = chokidar.watch(watchDir, {
persistent: true,
ignoreInitial: true,
@@ -534,11 +257,6 @@ export default class ApiService extends Service {
}
}
// Update statistics cache for add/unlink events
if (event === "add" || event === "unlink") {
this.updateStatsCache(event, filePath);
}
if (event === "add" && stats) {
setTimeout(async () => {
try {
@@ -620,9 +338,6 @@ export default class ApiService extends Service {
this.logger.info(`Successfully imported: ${filePath}`);
// Mark file as imported in statistics cache
this.markFileAsImported(filePath);
// Complete watcher session
await this.broker.call("importstate.completeSession", {
sessionId,

View File

@@ -237,7 +237,26 @@ export default class ImportStateService extends Service {
*/
getActiveSession: {
async handler() {
return this.getActiveSession();
const session = this.getActiveSession();
if (session) {
// Format session for GraphQL response
return {
sessionId: session.sessionId,
type: session.type,
status: session.status,
startedAt: session.startedAt.toISOString(),
completedAt: session.completedAt?.toISOString() || null,
stats: {
totalFiles: session.stats.totalFiles,
filesQueued: session.stats.filesQueued,
filesProcessed: session.stats.filesProcessed,
filesSucceeded: session.stats.filesSucceeded,
filesFailed: session.stats.filesFailed,
},
directoryPath: session.directoryPath || null,
};
}
return null;
},
},
@@ -339,9 +358,13 @@ export default class ImportStateService extends Service {
started: async () => {
this.logger.info("[Import State] Service started");
// Clean up any stale sessions from Redis on startup
const keys = await pubClient.keys("import:session:*");
this.logger.info(`[Import State] Found ${keys.length} session keys in Redis`);
// Auto-complete stuck sessions every 5 minutes
setInterval(() => {
for (const [id, session] of this.activeSessions.entries()) {
const age = Date.now() - session.startedAt.getTime();
if (age > 30 * 60 * 1000 && session.stats.filesProcessed === 0) this.actions.completeSession({ sessionId: id, success: false });
}
}, 5 * 60 * 1000);
},
});
}

View File

@@ -379,6 +379,13 @@ export default class JobQueueService extends Service {
},
],
});
// Emit final library statistics when queue is drained
try {
await this.broker.call("socket.broadcastLibraryStatistics", {});
} catch (err) {
console.error("Failed to emit library statistics after queue drained:", err);
}
},
async "enqueue.async.completed"(ctx: Context<{ id: Number }>) {
// 1. Fetch the job result using the job Id
@@ -410,6 +417,13 @@ export default class JobQueueService extends Service {
});
console.log(`Job ID ${ctx.params.id} completed.`);
// 6. Emit updated library statistics after each import
try {
await this.broker.call("socket.broadcastLibraryStatistics", {});
} catch (err) {
console.error("Failed to emit library statistics after import:", err);
}
},
async "enqueue.async.failed"(ctx) {

View File

@@ -253,6 +253,15 @@ export default class ImportService extends Service {
sessionId,
status: "active",
});
// Emit library statistics after scanning
try {
await this.broker.call("socket.broadcastLibraryStatistics", {
directoryPath: resolvedPath,
});
} catch (err) {
console.error("Failed to emit library statistics:", err);
}
});
} catch (error) {
console.log(error);
@@ -466,12 +475,30 @@ export default class ImportService extends Service {
sessionId,
status: "active",
});
// Emit library statistics after queueing
try {
await this.broker.call("socket.broadcastLibraryStatistics", {
directoryPath: resolvedPath,
});
} catch (err) {
console.error("Failed to emit library statistics:", err);
}
} else {
// No files to import, complete immediately
await this.broker.call("importstate.completeSession", {
sessionId,
success: true,
});
// Emit library statistics even when no new files
try {
await this.broker.call("socket.broadcastLibraryStatistics", {
directoryPath: resolvedPath,
});
} catch (err) {
console.error("Failed to emit library statistics:", err);
}
}
// Emit completion event (queueing complete, not import complete)
@@ -1210,6 +1237,11 @@ export default class ImportService extends Service {
"search.deleteElasticSearchIndices",
{}
);
// Invalidate statistics cache after flushing database
console.info("Invalidating statistics cache after flushDB...");
await ctx.broker.call("api.invalidateStatsCache");
return {
data,
coversFolderDeleteResult,

View File

@@ -326,6 +326,106 @@ export default class SocketService extends Service {
},
},
},
events: {
// File watcher events - forward to Socket.IO clients
async "add"(ctx: Context<{ path: string }>) {
console.log(`[File Watcher] File added: ${ctx.params.path}`);
await this.broker.call("socket.broadcast", {
namespace: "/",
event: "LS_FILE_ADDED",
args: [
{
path: ctx.params.path,
timestamp: new Date().toISOString(),
},
],
});
// Emit updated statistics after file addition
try {
await this.broker.call("socket.broadcastLibraryStatistics", {});
} catch (err) {
console.error("Failed to emit library statistics after file add:", err);
}
},
async "unlink"(ctx: Context<{ path: string }>) {
console.log(`[File Watcher] File removed: ${ctx.params.path}`);
await this.broker.call("socket.broadcast", {
namespace: "/",
event: "LS_FILE_REMOVED",
args: [
{
path: ctx.params.path,
timestamp: new Date().toISOString(),
},
],
});
// Emit updated statistics after file removal
try {
await this.broker.call("socket.broadcastLibraryStatistics", {});
} catch (err) {
console.error("Failed to emit library statistics after file remove:", err);
}
},
async "addDir"(ctx: Context<{ path: string }>) {
console.log(`[File Watcher] Directory added: ${ctx.params.path}`);
await this.broker.call("socket.broadcast", {
namespace: "/",
event: "LS_DIRECTORY_ADDED",
args: [
{
path: ctx.params.path,
timestamp: new Date().toISOString(),
},
],
});
// Emit updated statistics after directory addition
try {
await this.broker.call("socket.broadcastLibraryStatistics", {});
} catch (err) {
console.error("Failed to emit library statistics after directory add:", err);
}
},
async "unlinkDir"(ctx: Context<{ path: string }>) {
console.log(`[File Watcher] Directory removed: ${ctx.params.path}`);
await this.broker.call("socket.broadcast", {
namespace: "/",
event: "LS_DIRECTORY_REMOVED",
args: [
{
path: ctx.params.path,
timestamp: new Date().toISOString(),
},
],
});
// Emit updated statistics after directory removal
try {
await this.broker.call("socket.broadcastLibraryStatistics", {});
} catch (err) {
console.error("Failed to emit library statistics after directory remove:", err);
}
},
async "change"(ctx: Context<{ path: string }>) {
console.log(`[File Watcher] File changed: ${ctx.params.path}`);
await this.broker.call("socket.broadcast", {
namespace: "/",
event: "LS_FILE_CHANGED",
args: [
{
path: ctx.params.path,
timestamp: new Date().toISOString(),
},
],
});
},
},
methods: {
sleep: (ms: number): Promise<NodeJS.Timeout> => {
return new Promise((resolve) => setTimeout(resolve, ms));

View File

@@ -141,25 +141,43 @@ export async function getImportStatistics(localFilePaths: string[]): Promise<{
}> {
console.log(`[Import Stats] Checking ${localFilePaths.length} files against database...`);
// Normalize all paths upfront
const normalizedPaths = localFilePaths.map((p) => path.normalize(p));
// Extract file names (without extension) from paths
// This matches how comics are stored in the database (rawFileDetails.name)
const fileNameToPath = new Map<string, string>();
const fileNames: string[] = [];
// Use batch query instead of fetching all comics
// This is much faster for large libraries
for (const filePath of localFilePaths) {
const fileName = path.basename(filePath, path.extname(filePath));
fileNames.push(fileName);
fileNameToPath.set(fileName, filePath);
}
console.log(`[Import Stats] Extracted ${fileNames.length} file names from paths`);
// Query by file name (matches how comics are checked during import)
const importedComics = await Comic.find(
{
"rawFileDetails.filePath": { $in: normalizedPaths },
"rawFileDetails.name": { $in: fileNames },
},
{ "rawFileDetails.filePath": 1, _id: 0 }
{ "rawFileDetails.name": 1, "rawFileDetails.filePath": 1, _id: 0 }
).lean();
// Build Set of imported paths
const importedPaths = new Set<string>(
importedComics
.map((c: any) => c.rawFileDetails?.filePath)
.filter(Boolean)
.map((p: string) => path.normalize(p))
);
console.log(`[Import Stats] Found ${importedComics.length} matching comics in database`);
// Build Set of imported paths based on name matching
const importedPaths = new Set<string>();
const importedNames = new Set<string>();
for (const comic of importedComics) {
if (comic.rawFileDetails?.name) {
importedNames.add(comic.rawFileDetails.name);
// Map back to the local file path
const localPath = fileNameToPath.get(comic.rawFileDetails.name);
if (localPath) {
importedPaths.add(localPath);
}
}
}
const alreadyImported = importedPaths.size;
const newFiles = localFilePaths.length - alreadyImported;