Merge pull request #12 from rishighan/getbundles-fix

getBundles Fix
This commit was merged in pull request #12.
This commit is contained in:
2024-10-24 10:50:29 -04:00
committed by GitHub
3 changed files with 1217 additions and 1671 deletions

2732
package-lock.json generated

File diff suppressed because it is too large Load Diff

View File

@@ -57,6 +57,7 @@ const through2 = require("through2");
import klaw from "klaw";
import path from "path";
import { COMICS_DIRECTORY, USERDATA_DIRECTORY } from "../constants/directories";
import AirDCPPSocket from "../shared/airdcpp.socket";
export default class LibraryService extends Service {
public constructor(
@@ -527,7 +528,9 @@ export default class LibraryService extends Service {
params: { id: "string" },
async handler(ctx: Context<{ id: string }>) {
console.log(ctx.params.id);
return await Comic.findById(ctx.params.id);
return await Comic.findById(
new ObjectId(ctx.params.id)
);
},
},
getComicBooksByIds: {
@@ -746,6 +749,48 @@ export default class LibraryService extends Service {
},
},
// This method belongs in library service,
// because bundles can only exist for comics _in the library_
// (wanted or imported)
getBundles: {
rest: "POST /getBundles",
params: {},
handler: async (
ctx: Context<{
comicObjectId: string;
config: any;
}>
) => {
try {
// 1. Get the comic object Id
const { config } = ctx.params;
const comicObject = await Comic.findById(
new ObjectId(ctx.params.comicObjectId)
);
// 2. Init AirDC++
const ADCPPSocket = new AirDCPPSocket(config);
await ADCPPSocket.connect();
// 3. Get the bundles for the comic object
if (comicObject) {
// make the call to get the bundles from AirDC++ using the bundleId
const bundles =
comicObject.acquisition.directconnect.downloads.map(
async (bundle) => {
return await ADCPPSocket.get(
`queue/bundles/${bundle.bundleId}`
);
}
);
return Promise.all(bundles);
}
} catch (error) {
throw new Errors.MoleculerError(
"Couldn't fetch bundles from AirDC++",
500
);
}
},
},
flushDB: {
rest: "POST /flushDB",
params: {},

View File

@@ -61,12 +61,8 @@ export default class SocketService extends Service {
if (active > 0 || paused > 0 || waiting > 0) {
// 3. Get job counts
const completedJobCount = await pubClient.get(
"completedJobCount"
);
const failedJobCount = await pubClient.get(
"failedJobCount"
);
const completedJobCount = await pubClient.get("completedJobCount");
const failedJobCount = await pubClient.get("failedJobCount");
// 4. Send the counts to the active socket.io session
await this.broker.call("socket.broadcast", {
@@ -83,14 +79,9 @@ export default class SocketService extends Service {
}
}
} catch (err) {
throw new MoleculerError(
err,
500,
"SESSION_ID_NOT_FOUND",
{
data: sessionId,
}
);
throw new MoleculerError(err, 500, "SESSION_ID_NOT_FOUND", {
data: sessionId,
});
}
},
@@ -101,11 +92,7 @@ export default class SocketService extends Service {
}>
) => {
const { queueAction } = ctx.params;
await this.broker.call(
"jobqueue.toggle",
{ action: queueAction },
{}
);
await this.broker.call("jobqueue.toggle", { action: queueAction }, {});
},
importSingleIssue: async (ctx: Context<{}>) => {
console.info("AirDC++ finished a download -> ");
@@ -116,7 +103,6 @@ export default class SocketService extends Service {
// {}
// );
},
// AirDCPP Socket actions
search: {
params: {
@@ -124,18 +110,12 @@ export default class SocketService extends Service {
config: "object",
},
async handler(ctx) {
console.log("a, a kanha kanha...");
const { query, config, namespace } = ctx.params;
const namespacedInstance = this.io.of(namespace || "/");
const ADCPPSocket = new AirDCPPSocket(config);
console.log("asdas", ADCPPSocket);
try {
await ADCPPSocket.connect();
const instance = await ADCPPSocket.post(
"search",
query
);
const instance = await ADCPPSocket.post("search", query);
// Send the instance to the client
await namespacedInstance.emit("searchInitiated", {
@@ -147,10 +127,8 @@ export default class SocketService extends Service {
`search`,
`search_result_added`,
(groupedResult) => {
namespacedInstance.emit(
"searchResultAdded",
groupedResult
);
console.log(JSON.stringify(groupedResult, null, 4));
namespacedInstance.emit("searchResultAdded", groupedResult);
},
instance.id
);
@@ -159,10 +137,7 @@ export default class SocketService extends Service {
`search`,
`search_result_updated`,
(updatedResult) => {
namespacedInstance.emit(
"searchResultUpdated",
updatedResult
);
namespacedInstance.emit("searchResultUpdated", updatedResult);
},
instance.id
);
@@ -172,47 +147,30 @@ export default class SocketService extends Service {
`search_hub_searches_sent`,
async (searchInfo) => {
await this.sleep(5000);
const currentInstance =
await ADCPPSocket.get(
`search/${instance.id}`
);
// Send the instance to the client
await namespacedInstance.emit(
"searchesSent",
{
searchInfo,
}
const currentInstance = await ADCPPSocket.get(
`search/${instance.id}`
);
// Send the instance to the client
await namespacedInstance.emit("searchesSent", {
searchInfo,
});
if (currentInstance.result_count === 0) {
console.log("No more search results.");
namespacedInstance.emit(
"searchComplete",
{
message:
"No more search results.",
}
);
namespacedInstance.emit("searchComplete", {
message: "No more search results.",
});
}
},
instance.id
);
// Perform the actual search
await ADCPPSocket.post(
`search/${instance.id}/hub_search`,
query
);
await ADCPPSocket.post(`search/${instance.id}/hub_search`, query);
} catch (error) {
await namespacedInstance.emit(
"searchError",
error.message
);
throw new MoleculerError(
"Search failed",
500,
"SEARCH_FAILED",
{ error }
);
await namespacedInstance.emit("searchError", error.message);
throw new MoleculerError("Search failed", 500, "SEARCH_FAILED", {
error,
});
} finally {
// await ADCPPSocket.disconnect();
}
@@ -263,10 +221,7 @@ export default class SocketService extends Service {
"Download and metadata update successful",
bundleDBImportResult
);
this.broker.emit(
"downloadCompleted",
bundleDBImportResult
);
this.broker.emit("downloadCompleted", bundleDBImportResult);
return bundleDBImportResult;
} else {
throw new Error(
@@ -275,12 +230,9 @@ export default class SocketService extends Service {
}
} catch (error) {
this.broker.emit("downloadError", error.message);
throw new MoleculerError(
"Download failed",
500,
"DOWNLOAD_FAILED",
{ error }
);
throw new MoleculerError("Download failed", 500, "DOWNLOAD_FAILED", {
error,
});
} finally {
// await ADCPPSocket.disconnect();
}
@@ -300,10 +252,7 @@ export default class SocketService extends Service {
"queue",
"queue_bundle_tick",
(tickData) => {
console.log(
"Received tick data: ",
tickData
);
console.log("Received tick data: ", tickData);
this.io.emit("bundleTickUpdate", tickData);
},
null