👀 Refactoring file watcher code

This commit is contained in:
2025-06-10 13:30:31 -04:00
parent 999af29800
commit a0671ce6d1
6 changed files with 1288 additions and 1191 deletions

2036
package-lock.json generated

File diff suppressed because it is too large Load Diff

View File

@@ -48,12 +48,12 @@
"@types/mkdirp": "^1.0.0", "@types/mkdirp": "^1.0.0",
"@types/node": "^13.9.8", "@types/node": "^13.9.8",
"@types/string-similarity": "^4.0.0", "@types/string-similarity": "^4.0.0",
"airdcpp-apisocket": "^2.4.4", "airdcpp-apisocket": "^3.0.0-beta.8",
"axios": "^1.6.8", "axios": "^1.6.8",
"axios-retry": "^3.2.4", "axios-retry": "^3.2.4",
"bree": "^7.1.5", "bree": "^7.1.5",
"calibre-opds": "^1.0.7", "calibre-opds": "^1.0.7",
"chokidar": "^3.5.3", "chokidar": "^4.0.3",
"delay": "^5.0.0", "delay": "^5.0.0",
"dotenv": "^10.0.0", "dotenv": "^10.0.0",
"filename-parser": "^1.0.4", "filename-parser": "^1.0.4",

View File

@@ -50,7 +50,7 @@ export default class AirDCPPService extends Service {
username, username,
password, password,
}); });
return await airDCPPSocket.connect(); return await airDCPPSocket.connect();
} catch (err) { } catch (err) {
console.error(err); console.error(err);
} }

View File

@@ -1,190 +1,160 @@
import chokidar from "chokidar"; import chokidar, { FSWatcher } from "chokidar";
import fs from "fs"; import fs from "fs";
import { Service, ServiceBroker } from "moleculer";
import ApiGateway from "moleculer-web";
import path from "path"; import path from "path";
import { Service, ServiceBroker, ServiceSchema } from "moleculer";
import ApiGateway from "moleculer-web";
import debounce from "lodash/debounce";
import { IFolderData } from "threetwo-ui-typings"; import { IFolderData } from "threetwo-ui-typings";
/**
* ApiService exposes REST endpoints and watches the comics directory for changes.
* Uses chokidar to watch the directory and broadcasts file events via Moleculer.
*/
export default class ApiService extends Service { export default class ApiService extends Service {
public constructor(broker: ServiceBroker) { public constructor(broker: ServiceBroker) {
super(broker); super(broker);
this.parseServiceSchema({ this.parseServiceSchema({
name: "api", name: "api",
mixins: [ApiGateway], mixins: [ApiGateway],
// More info about settings: https://moleculer.services/docs/0.14/moleculer-web.html // More info about settings: https://moleculer.services/docs/0.14/moleculer-web.html
settings: { settings: {
port: process.env.PORT || 3000, port: process.env.PORT || 3000,
routes: [ routes: [
{ {
path: "/api", path: "/api",
whitelist: ["**"], whitelist: ["**"],
cors: { cors: {
origin: "*", origin: "*",
methods: [ methods: ["GET", "OPTIONS", "POST", "PUT", "DELETE"],
"GET", allowedHeaders: ["*"],
"OPTIONS", exposedHeaders: [],
"POST", credentials: false,
"PUT", maxAge: 3600,
"DELETE", },
], use: [],
allowedHeaders: ["*"], mergeParams: true,
exposedHeaders: [], authentication: false,
credentials: false, authorization: false,
maxAge: 3600, autoAliases: true,
}, aliases: {},
use: [], callingOptions: {},
mergeParams: true,
authentication: false,
authorization: false,
autoAliases: true,
aliases: {},
callingOptions: {},
bodyParsers: { bodyParsers: {
json: { json: { strict: false, limit: "1MB" },
strict: false, urlencoded: { extended: true, limit: "1MB" },
limit: "1MB", },
}, mappingPolicy: "all",
urlencoded: { logging: true,
extended: true, },
limit: "1MB", {
}, path: "/userdata",
}, use: [ApiGateway.serveStatic(path.resolve("./userdata"))],
mappingPolicy: "all", // Available values: "all", "restrict" },
logging: true, {
}, path: "/comics",
{ use: [ApiGateway.serveStatic(path.resolve("./comics"))],
path: "/userdata", },
use: [ {
ApiGateway.serveStatic(path.resolve("./userdata")), path: "/logs",
], use: [ApiGateway.serveStatic("logs")],
}, },
{ ],
path: "/comics", log4XXResponses: false,
use: [ApiGateway.serveStatic(path.resolve("./comics"))], logRequestParams: true,
}, logResponseData: true,
{ assets: { folder: "public", options: {} },
path: "/logs", },
use: [ApiGateway.serveStatic("logs")], events: {},
}, methods: {},
], started: this.startWatcher,
log4XXResponses: false, stopped: this.stopWatcher,
logRequestParams: true, });
logResponseData: true, }
assets: {
folder: "public",
// Options to `server-static` module
options: {},
},
},
events: {
}, /** Active file system watcher instance. */
private fileWatcher?: any;
methods: {}, /**
started(): any { * Starts watching the comics directory with debounced, robust handlers.
// Filewatcher */
const fileWatcher = chokidar.watch( private startWatcher(): void {
path.resolve("/comics"), const watchDir = path.resolve(process.env.COMICS_PATH || "/comics");
{ this.logger.info(`Watching comics folder: ${watchDir}`);
ignored: (filePath) =>
path.extname(filePath) === ".dctmp",
persistent: true,
usePolling: true,
interval: 5000,
ignoreInitial: true,
followSymlinks: true,
atomic: true,
awaitWriteFinish: {
stabilityThreshold: 2000,
pollInterval: 100,
},
}
);
const fileCopyDelaySeconds = 3;
const checkEnd = (path, prev) => {
fs.stat(path, async (err, stat) => {
// Replace error checking with something appropriate for your app.
if (err) throw err;
if (stat.mtime.getTime() === prev.mtime.getTime()) {
console.log("finished");
// Move on: call whatever needs to be called to process the file.
console.log(
"File detected, starting import..."
);
const walkedFolder: IFolderData =
await broker.call("library.walkFolders", {
basePathToWalk: path,
});
await this.broker.call(
"importqueue.processImport",
{
fileObject: {
filePath: path,
fileSize: walkedFolder[0].fileSize,
},
}
);
} else
setTimeout(
checkEnd,
fileCopyDelaySeconds,
path,
stat
);
});
};
fileWatcher this.fileWatcher = chokidar.watch(watchDir, {
.on("add", (path, stats) => { persistent: true,
console.log("Watcher detected new files."); ignoreInitial: true,
console.log( followSymlinks: true,
`File ${path} has been added with stats: ${JSON.stringify( depth: 10,
stats, usePolling: true,
null, interval: 5000,
2 atomic: true,
)}` awaitWriteFinish: { stabilityThreshold: 2000, pollInterval: 100 },
); ignored: (p) => p.endsWith(".dctmp") || p.includes("/.git/"),
});
console.log("File", path, "has been added"); const debouncedEvent = debounce(
(event: string, p: string, stats?: fs.Stats) => {
try {
this.handleFileEvent(event, p, stats);
} catch (err) {
this.logger.error(`Error handling file event [${event}] for ${p}:`, err);
}
},
200,
{ leading: true, trailing: true }
);
fs.stat(path, function(err, stat) { this.fileWatcher
// Replace error checking with something appropriate for your app. .on("ready", () => this.logger.info("Initial scan complete."))
if (err) throw err; .on("error", (err) => this.logger.error("Watcher error:", err))
setTimeout( .on("add", (p, stats) => debouncedEvent("add", p, stats))
checkEnd, .on("change", (p, stats) => debouncedEvent("change", p, stats))
fileCopyDelaySeconds, .on("unlink", (p) => debouncedEvent("unlink", p))
path, .on("addDir", (p) => debouncedEvent("addDir", p))
stat .on("unlinkDir", (p) => debouncedEvent("unlinkDir", p));
); }
});
})
// .once(
// "change",
// (path, stats) => /**
// console.log( * Stops the file watcher and frees resources.
// `File ${path} has been changed. Stats: ${JSON.stringify( */
// stats, private async stopWatcher(): Promise<void> {
// null, if (this.fileWatcher) {
// 2 this.logger.info("Stopping file watcher...");
// )}` await this.fileWatcher.close();
// ) this.fileWatcher = undefined;
// ) }
.on( }
"unlink",
(path) => /**
console.log(`File ${path} has been removed`) * Handles and broadcasts file system events.
) * @param event - Event type (add, change, etc.)
.on( * @param filePath - Path of the file or directory
"addDir", * @param stats - Optional file stats
*/
(path) => private async handleFileEvent(
console.log(`Directory ${path} has been added`) event: string,
); filePath: string,
stats?: fs.Stats
}, ): Promise<void> {
}); this.logger.info(`File event [${event}]: ${filePath}`);
} if (event === "add" && stats) {
// Wait for write to stabilize
setTimeout(async () => {
const newStats = await fs.promises.stat(filePath);
if (newStats.mtime.getTime() === stats.mtime.getTime()) {
this.logger.info(`Stable file detected: ${filePath}, importing.`);
const folderData: IFolderData = await this.broker.call(
"library.walkFolders",
{ basePathToWalk: filePath }
);
await this.broker.call("importqueue.processImport", {
fileObject: { filePath, fileSize: folderData[0].fileSize },
});
}
}, 3000);
}
// Broadcast to other services or clients
this.broker.broadcast(event, { path: filePath });
}
} }

View File

@@ -1,17 +1,49 @@
const WebSocket = require("ws"); import WebSocket from "ws";
const { Socket } = require("airdcpp-apisocket"); // const { Socket } = require("airdcpp-apisocket");
import { Socket } from "airdcpp-apisocket";
/**
* Wrapper around the AirDC++ WebSocket API socket.
* Provides methods to connect, disconnect, and interact with the AirDC++ API.
*/
class AirDCPPSocket { class AirDCPPSocket {
// Explicitly declare properties /**
options; // Holds configuration options * Configuration options for the underlying socket.
socketInstance; // Instance of the AirDCPP Socket * @private
*/
private options: {
url: string;
autoReconnect: boolean;
reconnectInterval: number;
logLevel: string;
ignoredListenerEvents: string[];
username: string;
password: string;
};
constructor(configuration: any) { /**
let socketProtocol = configuration.protocol === "https" ? "wss" : "ws"; * Instance of the AirDC++ API socket.
* @private
*/
private socketInstance: any;
/**
* Constructs a new AirDCPPSocket wrapper.
* @param {{ protocol: string; hostname: string; username: string; password: string }} configuration
* Connection configuration: protocol (ws or wss), hostname, username, and password.
*/
constructor(configuration: {
protocol: string;
hostname: string;
username: string;
password: string;
}) {
const socketProtocol =
configuration.protocol === "https" ? "wss" : "ws";
this.options = { this.options = {
url: `${socketProtocol}://${configuration.hostname}/api/v1/`, url: `${socketProtocol}://${configuration.hostname}/api/v1/`,
autoReconnect: true, autoReconnect: true,
reconnectInterval: 5000, // milliseconds reconnectInterval: 5000,
logLevel: "verbose", logLevel: "verbose",
ignoredListenerEvents: [ ignoredListenerEvents: [
"transfer_statistics", "transfer_statistics",
@@ -21,25 +53,33 @@ class AirDCPPSocket {
username: configuration.username, username: configuration.username,
password: configuration.password, password: configuration.password,
}; };
// Initialize the socket instance using the configured options and WebSocket // Initialize the AirDC++ socket instance
this.socketInstance = Socket(this.options, WebSocket); this.socketInstance = Socket(this.options, WebSocket);
} }
// Method to ensure the socket connection is established if required by the library or implementation logic /**
async connect() { * Establishes a connection to the AirDC++ server.
// Here we'll check if a connect method exists and call it * @async
* @returns {Promise<any>} Session information returned by the server.
*/
async connect(): Promise<any> {
if ( if (
this.socketInstance && this.socketInstance &&
typeof this.socketInstance.connect === "function" typeof this.socketInstance.connect === "function"
) { ) {
const sessionInformation = await this.socketInstance.connect(); return await this.socketInstance.connect();
return sessionInformation;
} }
return Promise.reject(
new Error("Connect method not available on socket instance")
);
} }
// Method to ensure the socket is disconnected properly if required by the library or implementation logic /**
async disconnect() { * Disconnects from the AirDC++ server.
// Similarly, check if a disconnect method exists and call it * @async
* @returns {Promise<void>}
*/
async disconnect(): Promise<void> {
if ( if (
this.socketInstance && this.socketInstance &&
typeof this.socketInstance.disconnect === "function" typeof this.socketInstance.disconnect === "function"
@@ -48,19 +88,43 @@ class AirDCPPSocket {
} }
} }
// Method to post data to an endpoint /**
async post(endpoint: any, data: any = {}) { * Sends a POST request to a specific AirDC++ endpoint.
// Call post on the socket instance, assuming post is a valid method of the socket instance * @async
* @param {string} endpoint - API endpoint path (e.g., "search").
* @param {object} [data={}] - Payload to send with the request.
* @returns {Promise<any>} Response from the AirDC++ server.
*/
async post(endpoint: string, data: object = {}): Promise<any> {
return await this.socketInstance.post(endpoint, data); return await this.socketInstance.post(endpoint, data);
} }
async get(endpoint: any, data: any = {}) {
// Call post on the socket instance, assuming post is a valid method of the socket instance /**
* Sends a GET request to a specific AirDC++ endpoint.
* @async
* @param {string} endpoint - API endpoint path (e.g., "search/123").
* @param {object} [data={}] - Query parameters to include.
* @returns {Promise<any>} Response from the AirDC++ server.
*/
async get(endpoint: string, data: object = {}): Promise<any> {
return await this.socketInstance.get(endpoint, data); return await this.socketInstance.get(endpoint, data);
} }
// Method to add listeners to the socket instance for handling real-time updates or events /**
async addListener(event: any, handlerName: any, callback: any, id?: any) { * Adds an event listener to the AirDC++ socket.
// Attach a listener to the socket instance * @async
* @param {string} event - Event group (e.g., "search" or "queue").
* @param {string} handlerName - Specific event within the group (e.g., "search_result_added").
* @param {Function} callback - Callback to invoke when the event occurs.
* @param {string|number} [id] - Optional identifier (e.g., search instance ID).
* @returns {Promise<any>} Listener registration result.
*/
async addListener(
event: string,
handlerName: string,
callback: (...args: any[]) => void,
id?: string | number
): Promise<any> {
return await this.socketInstance.addListener( return await this.socketInstance.addListener(
event, event,
handlerName, handlerName,

View File

@@ -4,6 +4,7 @@
"esModuleInterop": true, "esModuleInterop": true,
"noImplicitAny": false, "noImplicitAny": false,
"removeComments": true, "removeComments": true,
"allowSyntheticDefaultImports": true,
"preserveConstEnums": true, "preserveConstEnums": true,
"sourceMap": true, "sourceMap": true,
"pretty": true, "pretty": true,