🔍 Missing file watching logic for chokidar
This commit is contained in:
@@ -304,6 +304,7 @@ const ComicSchema = mongoose.Schema(
|
||||
importStatus: {
|
||||
isImported: Boolean,
|
||||
tagged: Boolean,
|
||||
isRawFileMissing: { type: Boolean, default: false },
|
||||
matchedResult: {
|
||||
score: String,
|
||||
},
|
||||
|
||||
@@ -736,6 +736,7 @@ export const typeDefs = gql`
|
||||
totalLocalFiles: Int!
|
||||
alreadyImported: Int!
|
||||
newFiles: Int!
|
||||
missingFiles: Int!
|
||||
percentageImported: String!
|
||||
}
|
||||
|
||||
|
||||
2134
package-lock.json
generated
2134
package-lock.json
generated
File diff suppressed because it is too large
Load Diff
@@ -4,7 +4,6 @@ import path from "path";
|
||||
import { Service, ServiceBroker, ServiceSchema, Context } from "moleculer";
|
||||
import ApiGateway from "moleculer-web";
|
||||
import debounce from "lodash/debounce";
|
||||
import { IFolderData } from "threetwo-ui-typings";
|
||||
|
||||
/**
|
||||
* ApiService exposes REST endpoints and watches the comics directory for changes.
|
||||
@@ -18,6 +17,12 @@ export default class ApiService extends Service {
|
||||
*/
|
||||
private fileWatcher?: any;
|
||||
|
||||
/**
|
||||
* Per-path debounced handlers for add/change events, keyed by file path.
|
||||
* @private
|
||||
*/
|
||||
private debouncedHandlers: Map<string, ReturnType<typeof debounce>> = new Map();
|
||||
|
||||
/**
|
||||
* Creates an instance of ApiService.
|
||||
* @param {ServiceBroker} broker - The Moleculer service broker instance.
|
||||
@@ -189,35 +194,38 @@ export default class ApiService extends Service {
|
||||
});
|
||||
|
||||
/**
|
||||
* Debounced handler for file system events, batching rapid triggers
|
||||
* into a 200ms window. Leading and trailing calls invoked.
|
||||
* @param {string} event - Type of file event (add, change, etc.).
|
||||
* @param {string} p - Path of the file or directory.
|
||||
* @param {fs.Stats} [stats] - Optional file stats for add/change events.
|
||||
* Returns a debounced handler for a specific path, creating one if needed.
|
||||
* Debouncing per-path prevents duplicate events for the same file while
|
||||
* ensuring each distinct path is always processed.
|
||||
*/
|
||||
const debouncedEvent = debounce(
|
||||
(event: string, p: string, stats?: fs.Stats) => {
|
||||
try {
|
||||
this.handleFileEvent(event, p, stats);
|
||||
} catch (err) {
|
||||
this.logger.error(
|
||||
`Error handling file event [${event}] for ${p}:`,
|
||||
err
|
||||
);
|
||||
}
|
||||
},
|
||||
200,
|
||||
{ leading: true, trailing: true }
|
||||
);
|
||||
const getDebouncedForPath = (p: string) => {
|
||||
if (!this.debouncedHandlers.has(p)) {
|
||||
const fn = debounce(
|
||||
(event: string, filePath: string, stats?: fs.Stats) => {
|
||||
this.debouncedHandlers.delete(filePath);
|
||||
try {
|
||||
this.handleFileEvent(event, filePath, stats);
|
||||
} catch (err) {
|
||||
this.logger.error(`Error handling file event [${event}] for ${filePath}:`, err);
|
||||
}
|
||||
},
|
||||
200,
|
||||
{ leading: true, trailing: true }
|
||||
);
|
||||
this.debouncedHandlers.set(p, fn);
|
||||
}
|
||||
return this.debouncedHandlers.get(p)!;
|
||||
};
|
||||
|
||||
this.fileWatcher
|
||||
.on("ready", () => this.logger.info("Initial scan complete."))
|
||||
.on("error", (err) => this.logger.error("Watcher error:", err))
|
||||
.on("add", (p, stats) => debouncedEvent("add", p, stats))
|
||||
.on("change", (p, stats) => debouncedEvent("change", p, stats))
|
||||
.on("unlink", (p) => debouncedEvent("unlink", p))
|
||||
.on("addDir", (p) => debouncedEvent("addDir", p))
|
||||
.on("unlinkDir", (p) => debouncedEvent("unlinkDir", p));
|
||||
.on("add", (p, stats) => getDebouncedForPath(p)("add", p, stats))
|
||||
.on("change", (p, stats) => getDebouncedForPath(p)("change", p, stats))
|
||||
// unlink/unlinkDir fire once per path — handle immediately, no debounce needed
|
||||
.on("unlink", (p) => this.handleFileEvent("unlink", p))
|
||||
.on("addDir", (p) => getDebouncedForPath(p)("addDir", p))
|
||||
.on("unlinkDir", (p) => this.handleFileEvent("unlinkDir", p));
|
||||
}
|
||||
|
||||
/**
|
||||
@@ -246,128 +254,52 @@ export default class ApiService extends Service {
|
||||
): Promise<void> {
|
||||
this.logger.info(`File event [${event}]: ${filePath}`);
|
||||
|
||||
// Check if watcher should process files (not during manual imports)
|
||||
if (event === "add") {
|
||||
const watcherState: any = await this.broker.call("importstate.isWatcherEnabled");
|
||||
if (!watcherState.enabled) {
|
||||
this.logger.info(
|
||||
`[Watcher] Skipping file ${filePath} - manual import in progress (${watcherState.activeSession?.sessionId})`
|
||||
);
|
||||
return;
|
||||
}
|
||||
// Handle file/directory removal — mark affected comics as missing and notify frontend
|
||||
if (event === "unlink" || event === "unlinkDir") {
|
||||
try {
|
||||
const result: any = await this.broker.call("library.markFileAsMissing", { filePath });
|
||||
if (result.marked > 0) {
|
||||
await this.broker.call("socket.broadcast", {
|
||||
namespace: "/",
|
||||
event: "LS_FILES_MISSING",
|
||||
args: [{
|
||||
missingComics: result.missingComics,
|
||||
triggerPath: filePath,
|
||||
count: result.marked,
|
||||
}],
|
||||
});
|
||||
this.logger.info(`[Watcher] Marked ${result.marked} comic(s) as missing for path: ${filePath}`);
|
||||
}
|
||||
} catch (err) {
|
||||
this.logger.error(`[Watcher] Failed to mark comics missing for ${filePath}:`, err);
|
||||
}
|
||||
return;
|
||||
}
|
||||
|
||||
|
||||
if (event === "add" && stats) {
|
||||
setTimeout(async () => {
|
||||
try {
|
||||
// Double-check watcher is still enabled
|
||||
const watcherState: any = await this.broker.call("importstate.isWatcherEnabled");
|
||||
if (!watcherState.enabled) {
|
||||
this.logger.info(
|
||||
`[Watcher] Skipping delayed import for ${filePath} - manual import started`
|
||||
);
|
||||
return;
|
||||
}
|
||||
|
||||
const newStats = await fs.promises.stat(filePath);
|
||||
if (newStats.mtime.getTime() === stats.mtime.getTime()) {
|
||||
this.logger.info(`Stable file detected: ${filePath}, importing.`);
|
||||
|
||||
// Create a watcher session for this file
|
||||
const sessionId = `watcher-${Date.now()}`;
|
||||
await this.broker.call("importstate.startSession", {
|
||||
sessionId,
|
||||
type: "watcher",
|
||||
});
|
||||
|
||||
const folderData: IFolderData[] = await this.broker.call(
|
||||
"library.walkFolders",
|
||||
{ basePathToWalk: filePath }
|
||||
);
|
||||
|
||||
if (folderData && folderData.length > 0) {
|
||||
const fileData = folderData[0];
|
||||
const fileName = path.basename(filePath, path.extname(filePath));
|
||||
const extension = path.extname(filePath);
|
||||
|
||||
// Determine mimeType based on extension
|
||||
let mimeType = "application/octet-stream";
|
||||
if (extension === ".cbz") {
|
||||
mimeType = "application/zip; charset=binary";
|
||||
} else if (extension === ".cbr") {
|
||||
mimeType = "application/x-rar-compressed; charset=binary";
|
||||
}
|
||||
|
||||
// Prepare payload for rawImportToDB
|
||||
const payload = {
|
||||
rawFileDetails: {
|
||||
name: fileName,
|
||||
filePath: filePath,
|
||||
fileSize: fileData.fileSize,
|
||||
extension: extension,
|
||||
mimeType: mimeType,
|
||||
},
|
||||
inferredMetadata: {
|
||||
issue: {
|
||||
name: fileName,
|
||||
number: 0,
|
||||
},
|
||||
},
|
||||
sourcedMetadata: {
|
||||
comicInfo: null,
|
||||
},
|
||||
importStatus: {
|
||||
isImported: true,
|
||||
tagged: false,
|
||||
matchedResult: {
|
||||
score: "0",
|
||||
},
|
||||
},
|
||||
acquisition: {
|
||||
source: {
|
||||
wanted: false,
|
||||
},
|
||||
},
|
||||
};
|
||||
|
||||
// Call the library service to import the comic
|
||||
const result: any = await this.broker.call("library.rawImportToDB", {
|
||||
importType: "new",
|
||||
payload: payload,
|
||||
});
|
||||
|
||||
this.logger.info(`Successfully imported: ${filePath}`);
|
||||
|
||||
// Complete watcher session
|
||||
await this.broker.call("importstate.completeSession", {
|
||||
sessionId,
|
||||
success: result.success,
|
||||
});
|
||||
} else {
|
||||
// Complete session even if no folder data
|
||||
await this.broker.call("importstate.completeSession", {
|
||||
sessionId,
|
||||
success: false,
|
||||
});
|
||||
}
|
||||
}
|
||||
} catch (error) {
|
||||
this.logger.error(`Error importing file ${filePath}:`, error);
|
||||
// Try to complete session on error
|
||||
try {
|
||||
const sessionId = `watcher-${Date.now()}`;
|
||||
await this.broker.call("importstate.completeSession", {
|
||||
sessionId,
|
||||
success: false,
|
||||
});
|
||||
} catch (e) {
|
||||
// Ignore session completion errors
|
||||
}
|
||||
}
|
||||
try {
|
||||
const newStats = await fs.promises.stat(filePath);
|
||||
if (newStats.mtime.getTime() === stats.mtime.getTime()) {
|
||||
this.logger.info(`[Watcher] Stable file detected: ${filePath}`);
|
||||
|
||||
// Clear missing flag if this file was previously marked absent
|
||||
await this.broker.call("library.clearFileMissingFlag", { filePath });
|
||||
|
||||
await this.broker.call("socket.broadcast", {
|
||||
namespace: "/",
|
||||
event: "LS_FILE_DETECTED",
|
||||
args: [{
|
||||
filePath,
|
||||
fileSize: newStats.size,
|
||||
extension: path.extname(filePath),
|
||||
}],
|
||||
});
|
||||
}
|
||||
} catch (error) {
|
||||
this.logger.error(`[Watcher] Error handling detected file ${filePath}:`, error);
|
||||
}
|
||||
}, 3000);
|
||||
}
|
||||
|
||||
// Broadcast file system event
|
||||
this.broker.broadcast(event, { path: filePath });
|
||||
}
|
||||
}
|
||||
|
||||
@@ -16,6 +16,7 @@ interface ImportSession {
|
||||
type: "full" | "incremental" | "watcher";
|
||||
status: "starting" | "scanning" | "queueing" | "active" | "completed" | "failed";
|
||||
startedAt: Date;
|
||||
lastActivityAt: Date;
|
||||
completedAt?: Date;
|
||||
stats: {
|
||||
totalFiles: number;
|
||||
@@ -75,6 +76,7 @@ export default class ImportStateService extends Service {
|
||||
type,
|
||||
status: "starting",
|
||||
startedAt: new Date(),
|
||||
lastActivityAt: new Date(),
|
||||
stats: {
|
||||
totalFiles: 0,
|
||||
filesQueued: 0,
|
||||
@@ -212,8 +214,6 @@ export default class ImportStateService extends Service {
|
||||
// Remove from active sessions
|
||||
this.activeSessions.delete(sessionId);
|
||||
|
||||
// Trigger statistics refresh
|
||||
await this.broker.call("api.invalidateStatsCache");
|
||||
|
||||
return session;
|
||||
},
|
||||
@@ -292,6 +292,7 @@ export default class ImportStateService extends Service {
|
||||
}
|
||||
|
||||
session.stats.filesProcessed++;
|
||||
session.lastActivityAt = new Date();
|
||||
if (success) {
|
||||
session.stats.filesSucceeded++;
|
||||
} else {
|
||||
@@ -360,9 +361,13 @@ export default class ImportStateService extends Service {
|
||||
this.logger.info("[Import State] Service started");
|
||||
// Auto-complete stuck sessions every 5 minutes
|
||||
setInterval(() => {
|
||||
const IDLE_TIMEOUT = 10 * 60 * 1000; // 10 minutes without activity
|
||||
for (const [id, session] of this.activeSessions.entries()) {
|
||||
const age = Date.now() - session.startedAt.getTime();
|
||||
if (age > 30 * 60 * 1000 && session.stats.filesProcessed === 0) this.actions.completeSession({ sessionId: id, success: false });
|
||||
const idleMs = Date.now() - session.lastActivityAt.getTime();
|
||||
if (idleMs > IDLE_TIMEOUT) {
|
||||
this.logger.warn(`[Import State] Auto-expiring stuck session ${id} (idle ${Math.round(idleMs / 60000)}m)`);
|
||||
this.actions.completeSession({ sessionId: id, success: false });
|
||||
}
|
||||
}
|
||||
}, 5 * 60 * 1000);
|
||||
},
|
||||
|
||||
@@ -323,6 +323,11 @@ export default class ImportService extends Service {
|
||||
? ((stats.alreadyImported / stats.total) * 100).toFixed(2)
|
||||
: "0.00";
|
||||
|
||||
// Count comics marked as missing (in DB but no longer on disk)
|
||||
const missingFiles = await Comic.countDocuments({
|
||||
"importStatus.isRawFileMissing": true,
|
||||
});
|
||||
|
||||
return {
|
||||
success: true,
|
||||
directory: resolvedPath,
|
||||
@@ -330,6 +335,7 @@ export default class ImportService extends Service {
|
||||
totalLocalFiles: stats.total,
|
||||
alreadyImported: stats.alreadyImported,
|
||||
newFiles: stats.newFiles,
|
||||
missingFiles,
|
||||
percentageImported: `${percentageImported}%`,
|
||||
},
|
||||
};
|
||||
@@ -751,7 +757,83 @@ export default class ImportService extends Service {
|
||||
}
|
||||
},
|
||||
},
|
||||
|
||||
markFileAsMissing: {
|
||||
rest: "POST /markFileAsMissing",
|
||||
params: {
|
||||
filePath: "string",
|
||||
},
|
||||
async handler(ctx: Context<{ filePath: string }>) {
|
||||
const { filePath } = ctx.params;
|
||||
|
||||
// Prefix-regex match: covers both single file and entire directory subtree
|
||||
const escapedPath = filePath.replace(/[.*+?^${}()|[\]\\]/g, "\\$&");
|
||||
const pathRegex = new RegExp(`^${escapedPath}`);
|
||||
|
||||
const affectedComics = await Comic.find(
|
||||
{ "rawFileDetails.filePath": pathRegex },
|
||||
{
|
||||
_id: 1,
|
||||
"rawFileDetails.name": 1,
|
||||
"rawFileDetails.filePath": 1,
|
||||
"rawFileDetails.cover": 1,
|
||||
"inferredMetadata.issue.name": 1,
|
||||
"inferredMetadata.issue.number": 1,
|
||||
}
|
||||
).lean();
|
||||
|
||||
if (affectedComics.length === 0) {
|
||||
return { marked: 0, missingComics: [] };
|
||||
}
|
||||
|
||||
const affectedIds = affectedComics.map((c: any) => c._id);
|
||||
await Comic.updateMany(
|
||||
{ _id: { $in: affectedIds } },
|
||||
{ $set: { "importStatus.isRawFileMissing": true } }
|
||||
);
|
||||
|
||||
return {
|
||||
marked: affectedComics.length,
|
||||
missingComics: affectedComics,
|
||||
};
|
||||
},
|
||||
},
|
||||
|
||||
clearFileMissingFlag: {
|
||||
rest: "POST /clearFileMissingFlag",
|
||||
params: {
|
||||
filePath: "string",
|
||||
},
|
||||
async handler(ctx: Context<{ filePath: string }>) {
|
||||
const { filePath } = ctx.params;
|
||||
|
||||
// First try exact path match
|
||||
const byPath = await Comic.findOneAndUpdate(
|
||||
{ "rawFileDetails.filePath": filePath },
|
||||
{ $set: { "importStatus.isRawFileMissing": false } }
|
||||
);
|
||||
|
||||
if (!byPath) {
|
||||
// File was moved — match by filename and update the stored path too
|
||||
const fileName = path.basename(filePath, path.extname(filePath));
|
||||
await Comic.findOneAndUpdate(
|
||||
{
|
||||
"rawFileDetails.name": fileName,
|
||||
"importStatus.isRawFileMissing": true,
|
||||
},
|
||||
{
|
||||
$set: {
|
||||
"importStatus.isRawFileMissing": false,
|
||||
"rawFileDetails.filePath": filePath,
|
||||
},
|
||||
}
|
||||
);
|
||||
}
|
||||
},
|
||||
},
|
||||
|
||||
getComicsMarkedAsWanted: {
|
||||
|
||||
rest: "GET /getComicsMarkedAsWanted",
|
||||
handler: async (ctx: Context<{}>) => {
|
||||
try {
|
||||
@@ -1238,9 +1320,6 @@ export default class ImportService extends Service {
|
||||
{}
|
||||
);
|
||||
|
||||
// Invalidate statistics cache after flushing database
|
||||
console.info("Invalidating statistics cache after flushDB...");
|
||||
await ctx.broker.call("api.invalidateStatsCache");
|
||||
|
||||
return {
|
||||
data,
|
||||
|
||||
@@ -301,6 +301,27 @@ export default class SocketService extends Service {
|
||||
},
|
||||
},
|
||||
|
||||
/**
|
||||
* Compute and broadcast current library statistics to all connected Socket.IO clients.
|
||||
* Called after every filesystem event (add, unlink, etc.) to keep the UI in sync.
|
||||
* Emits a single `LS_LIBRARY_STATS` event with totalLocalFiles, alreadyImported,
|
||||
* newFiles, missingFiles, and percentageImported.
|
||||
*/
|
||||
broadcastLibraryStatistics: async (ctx: Context<{ directoryPath?: string }>) => {
|
||||
try {
|
||||
const result: any = await this.broker.call("library.getImportStatistics", {
|
||||
directoryPath: ctx.params?.directoryPath,
|
||||
});
|
||||
await this.broker.call("socket.broadcast", {
|
||||
namespace: "/",
|
||||
event: "LS_LIBRARY_STATS",
|
||||
args: [result],
|
||||
});
|
||||
} catch (err) {
|
||||
this.logger.error("[Socket] broadcastLibraryStatistics failed:", err);
|
||||
}
|
||||
},
|
||||
|
||||
listenFileProgress: {
|
||||
params: { config: "object", namespace: "string" },
|
||||
async handler(
|
||||
|
||||
Reference in New Issue
Block a user