🔨 Fix for schema stitching upon init
This commit is contained in:
@@ -107,12 +107,14 @@ async function autoResolveMetadata(broker: any, logger: any, comicId: string, co
|
|||||||
*/
|
*/
|
||||||
export default {
|
export default {
|
||||||
name: "graphql",
|
name: "graphql",
|
||||||
|
|
||||||
settings: {
|
settings: {
|
||||||
/** Remote metadata GraphQL endpoint URL */
|
/** Remote metadata GraphQL endpoint URL */
|
||||||
metadataGraphqlUrl: process.env.METADATA_GRAPHQL_URL || "http://localhost:3080/metadata-graphql",
|
metadataGraphqlUrl: process.env.METADATA_GRAPHQL_URL || "http://localhost:3080/metadata-graphql",
|
||||||
/** Remote acquisition GraphQL endpoint URL */
|
/** Remote acquisition GraphQL endpoint URL */
|
||||||
acquisitionGraphqlUrl: process.env.ACQUISITION_GRAPHQL_URL || "http://localhost:3060/acquisition-graphql",
|
acquisitionGraphqlUrl: process.env.ACQUISITION_GRAPHQL_URL || "http://localhost:3060/acquisition-graphql",
|
||||||
|
/** Retry interval in ms for re-stitching remote schemas (0 = disabled) */
|
||||||
|
schemaRetryInterval: 5000,
|
||||||
},
|
},
|
||||||
|
|
||||||
actions: {
|
actions: {
|
||||||
@@ -214,16 +216,12 @@ export default {
|
|||||||
},
|
},
|
||||||
},
|
},
|
||||||
|
|
||||||
|
methods: {
|
||||||
/**
|
/**
|
||||||
* Service started lifecycle hook
|
* Attempt to build/rebuild the stitched schema.
|
||||||
* Creates local schema and attempts to stitch with remote metadata schema.
|
* Returns true if at least one remote schema was stitched.
|
||||||
* Falls back to local-only if remote unavailable.
|
|
||||||
*/
|
*/
|
||||||
async started() {
|
async _buildSchema(localSchema: any): Promise<boolean> {
|
||||||
this.logger.info("GraphQL service starting...");
|
|
||||||
|
|
||||||
const localSchema = makeExecutableSchema({ typeDefs, resolvers });
|
|
||||||
|
|
||||||
const subschemas: any[] = [{ schema: localSchema }];
|
const subschemas: any[] = [{ schema: localSchema }];
|
||||||
|
|
||||||
// Stitch metadata schema
|
// Stitch metadata schema
|
||||||
@@ -250,12 +248,33 @@ export default {
|
|||||||
this.schema = stitchSchemas({ subschemas, mergeTypes: true });
|
this.schema = stitchSchemas({ subschemas, mergeTypes: true });
|
||||||
this.logger.info(`✓ Stitched ${subschemas.length} schemas`);
|
this.logger.info(`✓ Stitched ${subschemas.length} schemas`);
|
||||||
this.remoteSchemaAvailable = true;
|
this.remoteSchemaAvailable = true;
|
||||||
|
return true;
|
||||||
} else {
|
} else {
|
||||||
this.logger.warn("⚠ FALLING BACK TO LOCAL SCHEMA ONLY");
|
|
||||||
this.schema = localSchema;
|
this.schema = localSchema;
|
||||||
this.remoteSchemaAvailable = false;
|
this.remoteSchemaAvailable = false;
|
||||||
|
return false;
|
||||||
}
|
}
|
||||||
|
},
|
||||||
|
},
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Service started lifecycle hook
|
||||||
|
* Blocks until remote schemas are stitched, retrying every schemaRetryInterval ms.
|
||||||
|
*/
|
||||||
|
async started() {
|
||||||
|
this.logger.info("GraphQL service starting...");
|
||||||
|
|
||||||
|
this._localSchema = makeExecutableSchema({ typeDefs, resolvers });
|
||||||
|
this.schema = this._localSchema;
|
||||||
|
this.remoteSchemaAvailable = false;
|
||||||
|
|
||||||
|
while (true) {
|
||||||
|
const stitched = await this._buildSchema(this._localSchema);
|
||||||
|
if (stitched) break;
|
||||||
|
this.logger.warn(`⚠ Remote schemas unavailable — retrying in ${this.settings.schemaRetryInterval}ms`);
|
||||||
|
await new Promise(resolve => setTimeout(resolve, this.settings.schemaRetryInterval));
|
||||||
|
}
|
||||||
|
|
||||||
this.logger.info("GraphQL service started successfully");
|
this.logger.info("GraphQL service started successfully");
|
||||||
},
|
},
|
||||||
|
|
||||||
|
|||||||
@@ -324,6 +324,20 @@ export default class ImportStateService extends Service {
|
|||||||
return Array.from(this.activeSessions.values());
|
return Array.from(this.activeSessions.values());
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Force-clear all active sessions (e.g. after flushDB)
|
||||||
|
*/
|
||||||
|
clearActiveSessions: {
|
||||||
|
async handler() {
|
||||||
|
const cleared = Array.from(this.activeSessions.keys());
|
||||||
|
this.activeSessions.clear();
|
||||||
|
this.watcherEnabled = true;
|
||||||
|
this.logger.warn(`[Import State] Force-cleared ${cleared.length} session(s): ${cleared.join(", ")}`);
|
||||||
|
await this.broker.broadcast("IMPORT_WATCHER_ENABLED", { reason: "sessions cleared" });
|
||||||
|
return { cleared };
|
||||||
|
},
|
||||||
|
},
|
||||||
},
|
},
|
||||||
|
|
||||||
methods: {
|
methods: {
|
||||||
|
|||||||
@@ -379,7 +379,20 @@ export default class JobQueueService extends Service {
|
|||||||
},
|
},
|
||||||
],
|
],
|
||||||
});
|
});
|
||||||
|
|
||||||
|
// Complete the active import session now that the queue is empty
|
||||||
|
try {
|
||||||
|
const activeSession = await this.broker.call("importstate.getActiveSession");
|
||||||
|
if (activeSession) {
|
||||||
|
await this.broker.call("importstate.completeSession", {
|
||||||
|
sessionId: activeSession.sessionId,
|
||||||
|
success: true,
|
||||||
|
});
|
||||||
|
}
|
||||||
|
} catch (err) {
|
||||||
|
console.error("Failed to complete import session after queue drained:", err);
|
||||||
|
}
|
||||||
|
|
||||||
// Emit final library statistics when queue is drained
|
// Emit final library statistics when queue is drained
|
||||||
try {
|
try {
|
||||||
await this.broker.call("socket.broadcastLibraryStatistics", {});
|
await this.broker.call("socket.broadcastLibraryStatistics", {});
|
||||||
@@ -392,10 +405,11 @@ export default class JobQueueService extends Service {
|
|||||||
const job = await this.job(ctx.params.id);
|
const job = await this.job(ctx.params.id);
|
||||||
// 2. Increment the completed job counter
|
// 2. Increment the completed job counter
|
||||||
await pubClient.incr("completedJobCount");
|
await pubClient.incr("completedJobCount");
|
||||||
// 3. Fetch the completed job count for the final payload to be sent to the client
|
// 3. Fetch the completed and total job counts for the progress payload
|
||||||
const completedJobCount = await pubClient.get(
|
const [completedJobCount, totalJobCount] = await Promise.all([
|
||||||
"completedJobCount"
|
pubClient.get("completedJobCount"),
|
||||||
);
|
pubClient.get("totalJobCount"),
|
||||||
|
]);
|
||||||
// 4. Emit the LS_COVER_EXTRACTED event with the necessary details
|
// 4. Emit the LS_COVER_EXTRACTED event with the necessary details
|
||||||
await this.broker.call("socket.broadcast", {
|
await this.broker.call("socket.broadcast", {
|
||||||
namespace: "/",
|
namespace: "/",
|
||||||
@@ -403,6 +417,7 @@ export default class JobQueueService extends Service {
|
|||||||
args: [
|
args: [
|
||||||
{
|
{
|
||||||
completedJobCount,
|
completedJobCount,
|
||||||
|
totalJobCount,
|
||||||
importResult: job.returnvalue.data.importResult,
|
importResult: job.returnvalue.data.importResult,
|
||||||
},
|
},
|
||||||
],
|
],
|
||||||
|
|||||||
@@ -323,6 +323,9 @@ export default class ImportService extends Service {
|
|||||||
? ((stats.alreadyImported / stats.total) * 100).toFixed(2)
|
? ((stats.alreadyImported / stats.total) * 100).toFixed(2)
|
||||||
: "0.00";
|
: "0.00";
|
||||||
|
|
||||||
|
// Count all comics in DB (true imported count, regardless of file presence on disk)
|
||||||
|
const alreadyImported = await Comic.countDocuments({});
|
||||||
|
|
||||||
// Count comics marked as missing (in DB but no longer on disk)
|
// Count comics marked as missing (in DB but no longer on disk)
|
||||||
const missingFiles = await Comic.countDocuments({
|
const missingFiles = await Comic.countDocuments({
|
||||||
"importStatus.isRawFileMissing": true,
|
"importStatus.isRawFileMissing": true,
|
||||||
@@ -333,7 +336,7 @@ export default class ImportService extends Service {
|
|||||||
directory: resolvedPath,
|
directory: resolvedPath,
|
||||||
stats: {
|
stats: {
|
||||||
totalLocalFiles: stats.total,
|
totalLocalFiles: stats.total,
|
||||||
alreadyImported: stats.alreadyImported,
|
alreadyImported,
|
||||||
newFiles: stats.newFiles,
|
newFiles: stats.newFiles,
|
||||||
missingFiles,
|
missingFiles,
|
||||||
percentageImported: `${percentageImported}%`,
|
percentageImported: `${percentageImported}%`,
|
||||||
@@ -457,11 +460,16 @@ export default class ImportService extends Service {
|
|||||||
filesQueued: newFiles.length,
|
filesQueued: newFiles.length,
|
||||||
},
|
},
|
||||||
});
|
});
|
||||||
|
|
||||||
this.broker.broadcast("LS_INCREMENTAL_IMPORT_PROGRESS", {
|
this.broker.broadcast("LS_INCREMENTAL_IMPORT_PROGRESS", {
|
||||||
message: `Queueing ${newFiles.length} new files for import...`,
|
message: `Queueing ${newFiles.length} new files for import...`,
|
||||||
});
|
});
|
||||||
|
|
||||||
|
// Reset counters and set total so the UI can show a progress bar
|
||||||
|
await pubClient.set("completedJobCount", 0);
|
||||||
|
await pubClient.set("failedJobCount", 0);
|
||||||
|
await pubClient.set("totalJobCount", newFiles.length);
|
||||||
|
|
||||||
// Queue all new files
|
// Queue all new files
|
||||||
for (const file of newFiles) {
|
for (const file of newFiles) {
|
||||||
await this.broker.call("jobqueue.enqueue", {
|
await this.broker.call("jobqueue.enqueue", {
|
||||||
@@ -1369,6 +1377,8 @@ export default class ImportService extends Service {
|
|||||||
rest: "POST /flushDB",
|
rest: "POST /flushDB",
|
||||||
params: {},
|
params: {},
|
||||||
handler: async (ctx: Context<{}>) => {
|
handler: async (ctx: Context<{}>) => {
|
||||||
|
// Clear any stale import sessions so subsequent imports are not blocked
|
||||||
|
await ctx.broker.call("importstate.clearActiveSessions", {});
|
||||||
return await Comic.collection
|
return await Comic.collection
|
||||||
.drop()
|
.drop()
|
||||||
.then(async (data) => {
|
.then(async (data) => {
|
||||||
|
|||||||
Reference in New Issue
Block a user