4 Commits

4 changed files with 927 additions and 335 deletions

View File

@@ -60,7 +60,7 @@ services:
networks: networks:
- kafka-net - kafka-net
ports: ports:
- "27017:27017" - "127.0.0.1:27017:27017"
volumes: volumes:
- "mongodb_data:/bitnami/mongodb" - "mongodb_data:/bitnami/mongodb"
@@ -72,7 +72,7 @@ services:
networks: networks:
- kafka-net - kafka-net
ports: ports:
- "6379:6379" - "127.0.0.1:6379:6379"
elasticsearch: elasticsearch:
image: docker.elastic.co/elasticsearch/elasticsearch:7.16.2 image: docker.elastic.co/elasticsearch/elasticsearch:7.16.2
@@ -88,7 +88,7 @@ services:
soft: -1 soft: -1
hard: -1 hard: -1
ports: ports:
- "9200:9200" - "127.0.0.1:9200:9200"
networks: networks:
- kafka-net - kafka-net

package-lock.json (generated) — 1026 changed lines

File diff suppressed because it is too large. [Load Diff]

View File

@@ -57,7 +57,6 @@ const through2 = require("through2");
import klaw from "klaw"; import klaw from "klaw";
import path from "path"; import path from "path";
import { COMICS_DIRECTORY, USERDATA_DIRECTORY } from "../constants/directories"; import { COMICS_DIRECTORY, USERDATA_DIRECTORY } from "../constants/directories";
import AirDCPPSocket from "../shared/airdcpp.socket";
export default class LibraryService extends Service { export default class LibraryService extends Service {
public constructor( public constructor(
@@ -175,7 +174,7 @@ export default class LibraryService extends Service {
// Convert klaw to use a promise-based approach for better flow control // Convert klaw to use a promise-based approach for better flow control
const files = await this.getComicFiles( const files = await this.getComicFiles(
COMICS_DIRECTORY process.env.COMICS_DIRECTORY
); );
for (const file of files) { for (const file of files) {
console.info( console.info(
@@ -188,7 +187,6 @@ export default class LibraryService extends Service {
path.extname(file.path) path.extname(file.path)
), ),
}); });
if (!comicExists) { if (!comicExists) {
// Send the extraction job to the queue // Send the extraction job to the queue
await this.broker.call("jobqueue.enqueue", { await this.broker.call("jobqueue.enqueue", {
@@ -332,21 +330,51 @@ export default class LibraryService extends Service {
}, },
getComicsMarkedAsWanted: { getComicsMarkedAsWanted: {
rest: "GET /getComicsMarkedAsWanted", rest: "GET /getComicsMarkedAsWanted",
handler: async (ctx: Context<{}>) => { params: {
page: { type: "number", default: 1 },
limit: { type: "number", default: 100 },
},
handler: async (
ctx: Context<{ page: number; limit: number }>
) => {
const { page, limit } = ctx.params;
this.logger.info(
`Requesting page ${page} with limit ${limit}`
);
try { try {
// Query to find comics where 'markEntireVolumeAsWanted' is true or 'issues' array is not empty const options = {
const wantedComics = await Comic.find({ page,
wanted: { $exists: true }, limit,
$or: [ lean: true,
{ "wanted.markEntireVolumeWanted": true }, };
{ "wanted.issues": { $not: { $size: 0 } } },
],
});
console.log(wantedComics); // Output the found comics const result = await Comic.paginate(
return wantedComics; {
wanted: { $exists: true },
$or: [
{
"wanted.markEntireVolumeWanted":
true,
},
{
"wanted.issues": {
$not: { $size: 0 },
},
},
],
},
options
);
// Log the raw result from the database
this.logger.info(
"Paginate result:",
JSON.stringify(result, null, 2)
);
return result.docs; // Return just the docs array
} catch (error) { } catch (error) {
console.error("Error finding comics:", error); this.logger.error("Error finding comics:", error);
throw error; throw error;
} }
}, },
@@ -528,9 +556,7 @@ export default class LibraryService extends Service {
params: { id: "string" }, params: { id: "string" },
async handler(ctx: Context<{ id: string }>) { async handler(ctx: Context<{ id: string }>) {
console.log(ctx.params.id); console.log(ctx.params.id);
return await Comic.findById( return await Comic.findById(ctx.params.id);
new ObjectId(ctx.params.id)
);
}, },
}, },
getComicBooksByIds: { getComicBooksByIds: {
@@ -749,48 +775,6 @@ export default class LibraryService extends Service {
}, },
}, },
// This method belongs in library service,
// because bundles can only exist for comics _in the library_
// (wanted or imported)
getBundles: {
rest: "POST /getBundles",
params: {},
handler: async (
ctx: Context<{
comicObjectId: string;
config: any;
}>
) => {
try {
// 1. Get the comic object Id
const { config } = ctx.params;
const comicObject = await Comic.findById(
new ObjectId(ctx.params.comicObjectId)
);
// 2. Init AirDC++
const ADCPPSocket = new AirDCPPSocket(config);
await ADCPPSocket.connect();
// 3. Get the bundles for the comic object
if (comicObject) {
// make the call to get the bundles from AirDC++ using the bundleId
const bundles =
comicObject.acquisition.directconnect.downloads.map(
async (bundle) => {
return await ADCPPSocket.get(
`queue/bundles/${bundle.bundleId}`
);
}
);
return Promise.all(bundles);
}
} catch (error) {
throw new Errors.MoleculerError(
"Couldn't fetch bundles from AirDC++",
500
);
}
},
},
flushDB: { flushDB: {
rest: "POST /flushDB", rest: "POST /flushDB",
params: {}, params: {},

View File

@@ -61,8 +61,12 @@ export default class SocketService extends Service {
if (active > 0 || paused > 0 || waiting > 0) { if (active > 0 || paused > 0 || waiting > 0) {
// 3. Get job counts // 3. Get job counts
const completedJobCount = await pubClient.get("completedJobCount"); const completedJobCount = await pubClient.get(
const failedJobCount = await pubClient.get("failedJobCount"); "completedJobCount"
);
const failedJobCount = await pubClient.get(
"failedJobCount"
);
// 4. Send the counts to the active socket.io session // 4. Send the counts to the active socket.io session
await this.broker.call("socket.broadcast", { await this.broker.call("socket.broadcast", {
@@ -79,9 +83,14 @@ export default class SocketService extends Service {
} }
} }
} catch (err) { } catch (err) {
throw new MoleculerError(err, 500, "SESSION_ID_NOT_FOUND", { throw new MoleculerError(
data: sessionId, err,
}); 500,
"SESSION_ID_NOT_FOUND",
{
data: sessionId,
}
);
} }
}, },
@@ -92,7 +101,11 @@ export default class SocketService extends Service {
}> }>
) => { ) => {
const { queueAction } = ctx.params; const { queueAction } = ctx.params;
await this.broker.call("jobqueue.toggle", { action: queueAction }, {}); await this.broker.call(
"jobqueue.toggle",
{ action: queueAction },
{}
);
}, },
importSingleIssue: async (ctx: Context<{}>) => { importSingleIssue: async (ctx: Context<{}>) => {
console.info("AirDC++ finished a download -> "); console.info("AirDC++ finished a download -> ");
@@ -103,11 +116,13 @@ export default class SocketService extends Service {
// {} // {}
// ); // );
}, },
// AirDCPP Socket actions
search: { search: {
params: { params: {
query: "object", query: "object",
config: "object", config: "object",
namespace: "string",
}, },
async handler(ctx) { async handler(ctx) {
const { query, config, namespace } = ctx.params; const { query, config, namespace } = ctx.params;
@@ -115,7 +130,10 @@ export default class SocketService extends Service {
const ADCPPSocket = new AirDCPPSocket(config); const ADCPPSocket = new AirDCPPSocket(config);
try { try {
await ADCPPSocket.connect(); await ADCPPSocket.connect();
const instance = await ADCPPSocket.post("search", query); const instance = await ADCPPSocket.post(
"search",
query
);
// Send the instance to the client // Send the instance to the client
await namespacedInstance.emit("searchInitiated", { await namespacedInstance.emit("searchInitiated", {
@@ -126,9 +144,14 @@ export default class SocketService extends Service {
await ADCPPSocket.addListener( await ADCPPSocket.addListener(
`search`, `search`,
`search_result_added`, `search_result_added`,
(groupedResult) => { (data) => {
console.log(JSON.stringify(groupedResult, null, 4)); namespacedInstance.emit(
namespacedInstance.emit("searchResultAdded", groupedResult); "searchResultAdded",
{
groupedResult: data,
instanceId: instance.id,
}
);
}, },
instance.id instance.id
); );
@@ -136,8 +159,18 @@ export default class SocketService extends Service {
await ADCPPSocket.addListener( await ADCPPSocket.addListener(
`search`, `search`,
`search_result_updated`, `search_result_updated`,
(updatedResult) => { (data) => {
namespacedInstance.emit("searchResultUpdated", updatedResult); console.log({
updatedResult: data,
instanceId: instance.id,
});
namespacedInstance.emit(
"searchResultUpdated",
{
updatedResult: data,
instanceId: instance.id,
}
);
}, },
instance.id instance.id
); );
@@ -147,32 +180,54 @@ export default class SocketService extends Service {
`search_hub_searches_sent`, `search_hub_searches_sent`,
async (searchInfo) => { async (searchInfo) => {
await this.sleep(5000); await this.sleep(5000);
const currentInstance = await ADCPPSocket.get( const currentInstance =
`search/${instance.id}` await ADCPPSocket.get(
`search/${instance.id}`
);
console.log(
JSON.stringify(currentInstance, null, 4)
); );
// Send the instance to the client // Send the instance to the client
await namespacedInstance.emit("searchesSent", { await namespacedInstance.emit(
searchInfo, "searchesSent",
}); {
searchInfo,
}
);
if (currentInstance.result_count === 0) { if (currentInstance.result_count === 0) {
console.log("No more search results."); console.log("No more search results.");
namespacedInstance.emit("searchComplete", { namespacedInstance.emit(
message: "No more search results.", "searchComplete",
}); {
message:
"No more search results.",
currentInstance,
}
);
} }
}, },
instance.id instance.id
); );
// Perform the actual search // Perform the actual search
await ADCPPSocket.post(`search/${instance.id}/hub_search`, query); await ADCPPSocket.post(
`search/${instance.id}/hub_search`,
query
);
} catch (error) { } catch (error) {
await namespacedInstance.emit("searchError", error.message); await namespacedInstance.emit(
throw new MoleculerError("Search failed", 500, "SEARCH_FAILED", { "searchError",
error, error.message
}); );
throw new MoleculerError(
"Search failed",
500,
"SEARCH_FAILED",
{ error }
);
} finally { } finally {
// await ADCPPSocket.disconnect(); await ADCPPSocket.disconnect();
} }
}, },
}, },
@@ -221,7 +276,10 @@ export default class SocketService extends Service {
"Download and metadata update successful", "Download and metadata update successful",
bundleDBImportResult bundleDBImportResult
); );
this.broker.emit("downloadCompleted", bundleDBImportResult); this.broker.emit(
"downloadCompleted",
bundleDBImportResult
);
return bundleDBImportResult; return bundleDBImportResult;
} else { } else {
throw new Error( throw new Error(
@@ -230,9 +288,12 @@ export default class SocketService extends Service {
} }
} catch (error) { } catch (error) {
this.broker.emit("downloadError", error.message); this.broker.emit("downloadError", error.message);
throw new MoleculerError("Download failed", 500, "DOWNLOAD_FAILED", { throw new MoleculerError(
error, "Download failed",
}); 500,
"DOWNLOAD_FAILED",
{ error }
);
} finally { } finally {
// await ADCPPSocket.disconnect(); // await ADCPPSocket.disconnect();
} }
@@ -252,7 +313,10 @@ export default class SocketService extends Service {
"queue", "queue",
"queue_bundle_tick", "queue_bundle_tick",
(tickData) => { (tickData) => {
console.log("Received tick data: ", tickData); console.log(
"Received tick data: ",
tickData
);
this.io.emit("bundleTickUpdate", tickData); this.io.emit("bundleTickUpdate", tickData);
}, },
null null