From bfaf7bb6647a06a931609d6bfb0a09de58476e4c Mon Sep 17 00:00:00 2001 From: Rishi Ghan Date: Tue, 24 Mar 2026 10:58:09 -0400 Subject: [PATCH] =?UTF-8?q?=F0=9F=94=A8=20Fix=20for=20schema=20stitching?= =?UTF-8?q?=20upon=20init?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- services/graphql.service.ts | 41 ++++++++++++++++++++++++--------- services/importstate.service.ts | 14 +++++++++++ services/jobqueue.service.ts | 25 ++++++++++++++++---- services/library.service.ts | 14 +++++++++-- 4 files changed, 76 insertions(+), 18 deletions(-) diff --git a/services/graphql.service.ts b/services/graphql.service.ts index f4c082b..d541671 100644 --- a/services/graphql.service.ts +++ b/services/graphql.service.ts @@ -107,12 +107,14 @@ async function autoResolveMetadata(broker: any, logger: any, comicId: string, co */ export default { name: "graphql", - + settings: { /** Remote metadata GraphQL endpoint URL */ metadataGraphqlUrl: process.env.METADATA_GRAPHQL_URL || "http://localhost:3080/metadata-graphql", /** Remote acquisition GraphQL endpoint URL */ acquisitionGraphqlUrl: process.env.ACQUISITION_GRAPHQL_URL || "http://localhost:3060/acquisition-graphql", + /** Retry interval in ms for re-stitching remote schemas (0 = disabled) */ + schemaRetryInterval: 5000, }, actions: { @@ -214,16 +216,12 @@ export default { }, }, + methods: { /** - * Service started lifecycle hook - * Creates local schema and attempts to stitch with remote metadata schema. - * Falls back to local-only if remote unavailable. + * Attempt to build/rebuild the stitched schema. + * Returns true if at least one remote schema was stitched. */ - async started() { - this.logger.info("GraphQL service starting..."); - - const localSchema = makeExecutableSchema({ typeDefs, resolvers }); - + async _buildSchema(localSchema: any): Promise { const subschemas: any[] = [{ schema: localSchema }]; // Stitch metadata schema @@ -250,12 +248,33 @@ export default { this.schema = stitchSchemas({ subschemas, mergeTypes: true }); this.logger.info(`✓ Stitched ${subschemas.length} schemas`); this.remoteSchemaAvailable = true; + return true; } else { - this.logger.warn("⚠ FALLING BACK TO LOCAL SCHEMA ONLY"); this.schema = localSchema; this.remoteSchemaAvailable = false; + return false; } - + }, + }, + + /** + * Service started lifecycle hook + * Blocks until remote schemas are stitched, retrying every schemaRetryInterval ms. + */ + async started() { + this.logger.info("GraphQL service starting..."); + + this._localSchema = makeExecutableSchema({ typeDefs, resolvers }); + this.schema = this._localSchema; + this.remoteSchemaAvailable = false; + + while (true) { + const stitched = await this._buildSchema(this._localSchema); + if (stitched) break; + this.logger.warn(`⚠ Remote schemas unavailable — retrying in ${this.settings.schemaRetryInterval}ms`); + await new Promise(resolve => setTimeout(resolve, this.settings.schemaRetryInterval)); + } + this.logger.info("GraphQL service started successfully"); }, diff --git a/services/importstate.service.ts b/services/importstate.service.ts index 053c4cb..40c2e47 100644 --- a/services/importstate.service.ts +++ b/services/importstate.service.ts @@ -324,6 +324,20 @@ export default class ImportStateService extends Service { return Array.from(this.activeSessions.values()); }, }, + + /** + * Force-clear all active sessions (e.g. after flushDB) + */ + clearActiveSessions: { + async handler() { + const cleared = Array.from(this.activeSessions.keys()); + this.activeSessions.clear(); + this.watcherEnabled = true; + this.logger.warn(`[Import State] Force-cleared ${cleared.length} session(s): ${cleared.join(", ")}`); + await this.broker.broadcast("IMPORT_WATCHER_ENABLED", { reason: "sessions cleared" }); + return { cleared }; + }, + }, }, methods: { diff --git a/services/jobqueue.service.ts b/services/jobqueue.service.ts index c735eaf..278e04c 100644 --- a/services/jobqueue.service.ts +++ b/services/jobqueue.service.ts @@ -379,7 +379,20 @@ export default class JobQueueService extends Service { }, ], }); - + + // Complete the active import session now that the queue is empty + try { + const activeSession = await this.broker.call("importstate.getActiveSession"); + if (activeSession) { + await this.broker.call("importstate.completeSession", { + sessionId: activeSession.sessionId, + success: true, + }); + } + } catch (err) { + console.error("Failed to complete import session after queue drained:", err); + } + // Emit final library statistics when queue is drained try { await this.broker.call("socket.broadcastLibraryStatistics", {}); @@ -392,10 +405,11 @@ export default class JobQueueService extends Service { const job = await this.job(ctx.params.id); // 2. Increment the completed job counter await pubClient.incr("completedJobCount"); - // 3. Fetch the completed job count for the final payload to be sent to the client - const completedJobCount = await pubClient.get( - "completedJobCount" - ); + // 3. Fetch the completed and total job counts for the progress payload + const [completedJobCount, totalJobCount] = await Promise.all([ + pubClient.get("completedJobCount"), + pubClient.get("totalJobCount"), + ]); // 4. Emit the LS_COVER_EXTRACTED event with the necessary details await this.broker.call("socket.broadcast", { namespace: "/", @@ -403,6 +417,7 @@ export default class JobQueueService extends Service { args: [ { completedJobCount, + totalJobCount, importResult: job.returnvalue.data.importResult, }, ], diff --git a/services/library.service.ts b/services/library.service.ts index 6a6fc08..cf63303 100644 --- a/services/library.service.ts +++ b/services/library.service.ts @@ -323,6 +323,9 @@ export default class ImportService extends Service { ? ((stats.alreadyImported / stats.total) * 100).toFixed(2) : "0.00"; + // Count all comics in DB (true imported count, regardless of file presence on disk) + const alreadyImported = await Comic.countDocuments({}); + // Count comics marked as missing (in DB but no longer on disk) const missingFiles = await Comic.countDocuments({ "importStatus.isRawFileMissing": true, @@ -333,7 +336,7 @@ export default class ImportService extends Service { directory: resolvedPath, stats: { totalLocalFiles: stats.total, - alreadyImported: stats.alreadyImported, + alreadyImported, newFiles: stats.newFiles, missingFiles, percentageImported: `${percentageImported}%`, @@ -457,11 +460,16 @@ export default class ImportService extends Service { filesQueued: newFiles.length, }, }); - + this.broker.broadcast("LS_INCREMENTAL_IMPORT_PROGRESS", { message: `Queueing ${newFiles.length} new files for import...`, }); + // Reset counters and set total so the UI can show a progress bar + await pubClient.set("completedJobCount", 0); + await pubClient.set("failedJobCount", 0); + await pubClient.set("totalJobCount", newFiles.length); + // Queue all new files for (const file of newFiles) { await this.broker.call("jobqueue.enqueue", { @@ -1369,6 +1377,8 @@ export default class ImportService extends Service { rest: "POST /flushDB", params: {}, handler: async (ctx: Context<{}>) => { + // Clear any stale import sessions so subsequent imports are not blocked + await ctx.broker.call("importstate.clearActiveSessions", {}); return await Comic.collection .drop() .then(async (data) => {