Compare commits
10 Commits
airdcpp-au
...
getbundles
| Author | SHA1 | Date | |
|---|---|---|---|
| 30168844f3 | |||
| 2e60e2e3d5 | |||
| 8254ec2093 | |||
| 7381d03045 | |||
| d7e865f84f | |||
| baa5a99855 | |||
| 68c2dacff4 | |||
| 55e0ce6d36 | |||
| 4ffad69c44 | |||
| f9438f2129 |
@@ -60,7 +60,7 @@ services:
|
|||||||
networks:
|
networks:
|
||||||
- kafka-net
|
- kafka-net
|
||||||
ports:
|
ports:
|
||||||
- "127.0.0.1:27017:27017"
|
- "27017:27017"
|
||||||
volumes:
|
volumes:
|
||||||
- "mongodb_data:/bitnami/mongodb"
|
- "mongodb_data:/bitnami/mongodb"
|
||||||
|
|
||||||
@@ -72,7 +72,7 @@ services:
|
|||||||
networks:
|
networks:
|
||||||
- kafka-net
|
- kafka-net
|
||||||
ports:
|
ports:
|
||||||
- "127.0.0.1:6379:6379"
|
- "6379:6379"
|
||||||
|
|
||||||
elasticsearch:
|
elasticsearch:
|
||||||
image: docker.elastic.co/elasticsearch/elasticsearch:7.16.2
|
image: docker.elastic.co/elasticsearch/elasticsearch:7.16.2
|
||||||
@@ -88,7 +88,7 @@ services:
|
|||||||
soft: -1
|
soft: -1
|
||||||
hard: -1
|
hard: -1
|
||||||
ports:
|
ports:
|
||||||
- "127.0.0.1:9200:9200"
|
- "9200:9200"
|
||||||
networks:
|
networks:
|
||||||
- kafka-net
|
- kafka-net
|
||||||
|
|
||||||
|
|||||||
1026
package-lock.json
generated
1026
package-lock.json
generated
File diff suppressed because it is too large
Load Diff
@@ -57,6 +57,7 @@ const through2 = require("through2");
|
|||||||
import klaw from "klaw";
|
import klaw from "klaw";
|
||||||
import path from "path";
|
import path from "path";
|
||||||
import { COMICS_DIRECTORY, USERDATA_DIRECTORY } from "../constants/directories";
|
import { COMICS_DIRECTORY, USERDATA_DIRECTORY } from "../constants/directories";
|
||||||
|
import AirDCPPSocket from "../shared/airdcpp.socket";
|
||||||
|
|
||||||
export default class LibraryService extends Service {
|
export default class LibraryService extends Service {
|
||||||
public constructor(
|
public constructor(
|
||||||
@@ -174,7 +175,7 @@ export default class LibraryService extends Service {
|
|||||||
|
|
||||||
// Convert klaw to use a promise-based approach for better flow control
|
// Convert klaw to use a promise-based approach for better flow control
|
||||||
const files = await this.getComicFiles(
|
const files = await this.getComicFiles(
|
||||||
process.env.COMICS_DIRECTORY
|
COMICS_DIRECTORY
|
||||||
);
|
);
|
||||||
for (const file of files) {
|
for (const file of files) {
|
||||||
console.info(
|
console.info(
|
||||||
@@ -187,6 +188,7 @@ export default class LibraryService extends Service {
|
|||||||
path.extname(file.path)
|
path.extname(file.path)
|
||||||
),
|
),
|
||||||
});
|
});
|
||||||
|
|
||||||
if (!comicExists) {
|
if (!comicExists) {
|
||||||
// Send the extraction job to the queue
|
// Send the extraction job to the queue
|
||||||
await this.broker.call("jobqueue.enqueue", {
|
await this.broker.call("jobqueue.enqueue", {
|
||||||
@@ -330,51 +332,21 @@ export default class LibraryService extends Service {
|
|||||||
},
|
},
|
||||||
getComicsMarkedAsWanted: {
|
getComicsMarkedAsWanted: {
|
||||||
rest: "GET /getComicsMarkedAsWanted",
|
rest: "GET /getComicsMarkedAsWanted",
|
||||||
params: {
|
handler: async (ctx: Context<{}>) => {
|
||||||
page: { type: "number", default: 1 },
|
|
||||||
limit: { type: "number", default: 100 },
|
|
||||||
},
|
|
||||||
handler: async (
|
|
||||||
ctx: Context<{ page: number; limit: number }>
|
|
||||||
) => {
|
|
||||||
const { page, limit } = ctx.params;
|
|
||||||
this.logger.info(
|
|
||||||
`Requesting page ${page} with limit ${limit}`
|
|
||||||
);
|
|
||||||
try {
|
try {
|
||||||
const options = {
|
// Query to find comics where 'markEntireVolumeAsWanted' is true or 'issues' array is not empty
|
||||||
page,
|
const wantedComics = await Comic.find({
|
||||||
limit,
|
wanted: { $exists: true },
|
||||||
lean: true,
|
$or: [
|
||||||
};
|
{ "wanted.markEntireVolumeWanted": true },
|
||||||
|
{ "wanted.issues": { $not: { $size: 0 } } },
|
||||||
|
],
|
||||||
|
});
|
||||||
|
|
||||||
const result = await Comic.paginate(
|
console.log(wantedComics); // Output the found comics
|
||||||
{
|
return wantedComics;
|
||||||
wanted: { $exists: true },
|
|
||||||
$or: [
|
|
||||||
{
|
|
||||||
"wanted.markEntireVolumeWanted":
|
|
||||||
true,
|
|
||||||
},
|
|
||||||
{
|
|
||||||
"wanted.issues": {
|
|
||||||
$not: { $size: 0 },
|
|
||||||
},
|
|
||||||
},
|
|
||||||
],
|
|
||||||
},
|
|
||||||
options
|
|
||||||
);
|
|
||||||
|
|
||||||
// Log the raw result from the database
|
|
||||||
this.logger.info(
|
|
||||||
"Paginate result:",
|
|
||||||
JSON.stringify(result, null, 2)
|
|
||||||
);
|
|
||||||
|
|
||||||
return result.docs; // Return just the docs array
|
|
||||||
} catch (error) {
|
} catch (error) {
|
||||||
this.logger.error("Error finding comics:", error);
|
console.error("Error finding comics:", error);
|
||||||
throw error;
|
throw error;
|
||||||
}
|
}
|
||||||
},
|
},
|
||||||
@@ -556,7 +528,9 @@ export default class LibraryService extends Service {
|
|||||||
params: { id: "string" },
|
params: { id: "string" },
|
||||||
async handler(ctx: Context<{ id: string }>) {
|
async handler(ctx: Context<{ id: string }>) {
|
||||||
console.log(ctx.params.id);
|
console.log(ctx.params.id);
|
||||||
return await Comic.findById(ctx.params.id);
|
return await Comic.findById(
|
||||||
|
new ObjectId(ctx.params.id)
|
||||||
|
);
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
getComicBooksByIds: {
|
getComicBooksByIds: {
|
||||||
@@ -775,6 +749,48 @@ export default class LibraryService extends Service {
|
|||||||
},
|
},
|
||||||
},
|
},
|
||||||
|
|
||||||
|
// This method belongs in library service,
|
||||||
|
// because bundles can only exist for comics _in the library_
|
||||||
|
// (wanted or imported)
|
||||||
|
getBundles: {
|
||||||
|
rest: "POST /getBundles",
|
||||||
|
params: {},
|
||||||
|
handler: async (
|
||||||
|
ctx: Context<{
|
||||||
|
comicObjectId: string;
|
||||||
|
config: any;
|
||||||
|
}>
|
||||||
|
) => {
|
||||||
|
try {
|
||||||
|
// 1. Get the comic object Id
|
||||||
|
const { config } = ctx.params;
|
||||||
|
const comicObject = await Comic.findById(
|
||||||
|
new ObjectId(ctx.params.comicObjectId)
|
||||||
|
);
|
||||||
|
// 2. Init AirDC++
|
||||||
|
const ADCPPSocket = new AirDCPPSocket(config);
|
||||||
|
await ADCPPSocket.connect();
|
||||||
|
// 3. Get the bundles for the comic object
|
||||||
|
if (comicObject) {
|
||||||
|
// make the call to get the bundles from AirDC++ using the bundleId
|
||||||
|
const bundles =
|
||||||
|
comicObject.acquisition.directconnect.downloads.map(
|
||||||
|
async (bundle) => {
|
||||||
|
return await ADCPPSocket.get(
|
||||||
|
`queue/bundles/${bundle.bundleId}`
|
||||||
|
);
|
||||||
|
}
|
||||||
|
);
|
||||||
|
return Promise.all(bundles);
|
||||||
|
}
|
||||||
|
} catch (error) {
|
||||||
|
throw new Errors.MoleculerError(
|
||||||
|
"Couldn't fetch bundles from AirDC++",
|
||||||
|
500
|
||||||
|
);
|
||||||
|
}
|
||||||
|
},
|
||||||
|
},
|
||||||
flushDB: {
|
flushDB: {
|
||||||
rest: "POST /flushDB",
|
rest: "POST /flushDB",
|
||||||
params: {},
|
params: {},
|
||||||
|
|||||||
@@ -61,12 +61,8 @@ export default class SocketService extends Service {
|
|||||||
|
|
||||||
if (active > 0 || paused > 0 || waiting > 0) {
|
if (active > 0 || paused > 0 || waiting > 0) {
|
||||||
// 3. Get job counts
|
// 3. Get job counts
|
||||||
const completedJobCount = await pubClient.get(
|
const completedJobCount = await pubClient.get("completedJobCount");
|
||||||
"completedJobCount"
|
const failedJobCount = await pubClient.get("failedJobCount");
|
||||||
);
|
|
||||||
const failedJobCount = await pubClient.get(
|
|
||||||
"failedJobCount"
|
|
||||||
);
|
|
||||||
|
|
||||||
// 4. Send the counts to the active socket.io session
|
// 4. Send the counts to the active socket.io session
|
||||||
await this.broker.call("socket.broadcast", {
|
await this.broker.call("socket.broadcast", {
|
||||||
@@ -83,14 +79,9 @@ export default class SocketService extends Service {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
} catch (err) {
|
} catch (err) {
|
||||||
throw new MoleculerError(
|
throw new MoleculerError(err, 500, "SESSION_ID_NOT_FOUND", {
|
||||||
err,
|
data: sessionId,
|
||||||
500,
|
});
|
||||||
"SESSION_ID_NOT_FOUND",
|
|
||||||
{
|
|
||||||
data: sessionId,
|
|
||||||
}
|
|
||||||
);
|
|
||||||
}
|
}
|
||||||
},
|
},
|
||||||
|
|
||||||
@@ -101,11 +92,7 @@ export default class SocketService extends Service {
|
|||||||
}>
|
}>
|
||||||
) => {
|
) => {
|
||||||
const { queueAction } = ctx.params;
|
const { queueAction } = ctx.params;
|
||||||
await this.broker.call(
|
await this.broker.call("jobqueue.toggle", { action: queueAction }, {});
|
||||||
"jobqueue.toggle",
|
|
||||||
{ action: queueAction },
|
|
||||||
{}
|
|
||||||
);
|
|
||||||
},
|
},
|
||||||
importSingleIssue: async (ctx: Context<{}>) => {
|
importSingleIssue: async (ctx: Context<{}>) => {
|
||||||
console.info("AirDC++ finished a download -> ");
|
console.info("AirDC++ finished a download -> ");
|
||||||
@@ -116,13 +103,11 @@ export default class SocketService extends Service {
|
|||||||
// {}
|
// {}
|
||||||
// );
|
// );
|
||||||
},
|
},
|
||||||
// AirDCPP Socket actions
|
|
||||||
|
|
||||||
search: {
|
search: {
|
||||||
params: {
|
params: {
|
||||||
query: "object",
|
query: "object",
|
||||||
config: "object",
|
config: "object",
|
||||||
namespace: "string",
|
|
||||||
},
|
},
|
||||||
async handler(ctx) {
|
async handler(ctx) {
|
||||||
const { query, config, namespace } = ctx.params;
|
const { query, config, namespace } = ctx.params;
|
||||||
@@ -130,10 +115,7 @@ export default class SocketService extends Service {
|
|||||||
const ADCPPSocket = new AirDCPPSocket(config);
|
const ADCPPSocket = new AirDCPPSocket(config);
|
||||||
try {
|
try {
|
||||||
await ADCPPSocket.connect();
|
await ADCPPSocket.connect();
|
||||||
const instance = await ADCPPSocket.post(
|
const instance = await ADCPPSocket.post("search", query);
|
||||||
"search",
|
|
||||||
query
|
|
||||||
);
|
|
||||||
|
|
||||||
// Send the instance to the client
|
// Send the instance to the client
|
||||||
await namespacedInstance.emit("searchInitiated", {
|
await namespacedInstance.emit("searchInitiated", {
|
||||||
@@ -144,14 +126,9 @@ export default class SocketService extends Service {
|
|||||||
await ADCPPSocket.addListener(
|
await ADCPPSocket.addListener(
|
||||||
`search`,
|
`search`,
|
||||||
`search_result_added`,
|
`search_result_added`,
|
||||||
(data) => {
|
(groupedResult) => {
|
||||||
namespacedInstance.emit(
|
console.log(JSON.stringify(groupedResult, null, 4));
|
||||||
"searchResultAdded",
|
namespacedInstance.emit("searchResultAdded", groupedResult);
|
||||||
{
|
|
||||||
groupedResult: data,
|
|
||||||
instanceId: instance.id,
|
|
||||||
}
|
|
||||||
);
|
|
||||||
},
|
},
|
||||||
instance.id
|
instance.id
|
||||||
);
|
);
|
||||||
@@ -159,18 +136,8 @@ export default class SocketService extends Service {
|
|||||||
await ADCPPSocket.addListener(
|
await ADCPPSocket.addListener(
|
||||||
`search`,
|
`search`,
|
||||||
`search_result_updated`,
|
`search_result_updated`,
|
||||||
(data) => {
|
(updatedResult) => {
|
||||||
console.log({
|
namespacedInstance.emit("searchResultUpdated", updatedResult);
|
||||||
updatedResult: data,
|
|
||||||
instanceId: instance.id,
|
|
||||||
});
|
|
||||||
namespacedInstance.emit(
|
|
||||||
"searchResultUpdated",
|
|
||||||
{
|
|
||||||
updatedResult: data,
|
|
||||||
instanceId: instance.id,
|
|
||||||
}
|
|
||||||
);
|
|
||||||
},
|
},
|
||||||
instance.id
|
instance.id
|
||||||
);
|
);
|
||||||
@@ -180,54 +147,32 @@ export default class SocketService extends Service {
|
|||||||
`search_hub_searches_sent`,
|
`search_hub_searches_sent`,
|
||||||
async (searchInfo) => {
|
async (searchInfo) => {
|
||||||
await this.sleep(5000);
|
await this.sleep(5000);
|
||||||
const currentInstance =
|
const currentInstance = await ADCPPSocket.get(
|
||||||
await ADCPPSocket.get(
|
`search/${instance.id}`
|
||||||
`search/${instance.id}`
|
|
||||||
);
|
|
||||||
console.log(
|
|
||||||
JSON.stringify(currentInstance, null, 4)
|
|
||||||
);
|
);
|
||||||
// Send the instance to the client
|
// Send the instance to the client
|
||||||
await namespacedInstance.emit(
|
await namespacedInstance.emit("searchesSent", {
|
||||||
"searchesSent",
|
searchInfo,
|
||||||
{
|
});
|
||||||
searchInfo,
|
|
||||||
}
|
|
||||||
);
|
|
||||||
|
|
||||||
if (currentInstance.result_count === 0) {
|
if (currentInstance.result_count === 0) {
|
||||||
console.log("No more search results.");
|
console.log("No more search results.");
|
||||||
namespacedInstance.emit(
|
namespacedInstance.emit("searchComplete", {
|
||||||
"searchComplete",
|
message: "No more search results.",
|
||||||
{
|
});
|
||||||
message:
|
|
||||||
"No more search results.",
|
|
||||||
currentInstance,
|
|
||||||
}
|
|
||||||
);
|
|
||||||
}
|
}
|
||||||
},
|
},
|
||||||
instance.id
|
instance.id
|
||||||
);
|
);
|
||||||
|
|
||||||
// Perform the actual search
|
// Perform the actual search
|
||||||
await ADCPPSocket.post(
|
await ADCPPSocket.post(`search/${instance.id}/hub_search`, query);
|
||||||
`search/${instance.id}/hub_search`,
|
|
||||||
query
|
|
||||||
);
|
|
||||||
} catch (error) {
|
} catch (error) {
|
||||||
await namespacedInstance.emit(
|
await namespacedInstance.emit("searchError", error.message);
|
||||||
"searchError",
|
throw new MoleculerError("Search failed", 500, "SEARCH_FAILED", {
|
||||||
error.message
|
error,
|
||||||
);
|
});
|
||||||
throw new MoleculerError(
|
|
||||||
"Search failed",
|
|
||||||
500,
|
|
||||||
"SEARCH_FAILED",
|
|
||||||
{ error }
|
|
||||||
);
|
|
||||||
} finally {
|
} finally {
|
||||||
await ADCPPSocket.disconnect();
|
// await ADCPPSocket.disconnect();
|
||||||
}
|
}
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
@@ -276,10 +221,7 @@ export default class SocketService extends Service {
|
|||||||
"Download and metadata update successful",
|
"Download and metadata update successful",
|
||||||
bundleDBImportResult
|
bundleDBImportResult
|
||||||
);
|
);
|
||||||
this.broker.emit(
|
this.broker.emit("downloadCompleted", bundleDBImportResult);
|
||||||
"downloadCompleted",
|
|
||||||
bundleDBImportResult
|
|
||||||
);
|
|
||||||
return bundleDBImportResult;
|
return bundleDBImportResult;
|
||||||
} else {
|
} else {
|
||||||
throw new Error(
|
throw new Error(
|
||||||
@@ -288,12 +230,9 @@ export default class SocketService extends Service {
|
|||||||
}
|
}
|
||||||
} catch (error) {
|
} catch (error) {
|
||||||
this.broker.emit("downloadError", error.message);
|
this.broker.emit("downloadError", error.message);
|
||||||
throw new MoleculerError(
|
throw new MoleculerError("Download failed", 500, "DOWNLOAD_FAILED", {
|
||||||
"Download failed",
|
error,
|
||||||
500,
|
});
|
||||||
"DOWNLOAD_FAILED",
|
|
||||||
{ error }
|
|
||||||
);
|
|
||||||
} finally {
|
} finally {
|
||||||
// await ADCPPSocket.disconnect();
|
// await ADCPPSocket.disconnect();
|
||||||
}
|
}
|
||||||
@@ -313,10 +252,7 @@ export default class SocketService extends Service {
|
|||||||
"queue",
|
"queue",
|
||||||
"queue_bundle_tick",
|
"queue_bundle_tick",
|
||||||
(tickData) => {
|
(tickData) => {
|
||||||
console.log(
|
console.log("Received tick data: ", tickData);
|
||||||
"Received tick data: ",
|
|
||||||
tickData
|
|
||||||
);
|
|
||||||
this.io.emit("bundleTickUpdate", tickData);
|
this.io.emit("bundleTickUpdate", tickData);
|
||||||
},
|
},
|
||||||
null
|
null
|
||||||
|
|||||||
Reference in New Issue
Block a user