From 7df1c8190abc2ce0ca65201324cea8e25b29961a Mon Sep 17 00:00:00 2001 From: stringsplit <77242831+nbitzz@users.noreply.github.com> Date: Wed, 6 Mar 2024 17:49:23 -0800 Subject: [PATCH] My goddd you konw what that owrks well enough --- config.json | 4 +-- src/server/lib/files.ts | 50 +++++++++++++++++++------- src/server/routes/api/v0/primaryApi.ts | 13 ++++++- 3 files changed, 51 insertions(+), 16 deletions(-) diff --git a/config.json b/config.json index 4b906ad..757067a 100644 --- a/config.json +++ b/config.json @@ -1,9 +1,9 @@ { - "maxDiscordFiles": 500, + "maxDiscordFiles": 1000, "maxDiscordFileSize": 10485760, "targetGuild": "906767804575928390", "targetChannel": "1024080525993971913", - "requestTimeout": 1800000, + "requestTimeout": 3600000, "maxUploadIdLength": 30, "accounts": { "registrationEnabled": true, diff --git a/src/server/lib/files.ts b/src/server/lib/files.ts index 002be9c..9ba91ea 100644 --- a/src/server/lib/files.ts +++ b/src/server/lib/files.ts @@ -108,6 +108,7 @@ export class ReadStream extends Readable { } id: number = Math.random() + aborter?: AbortController constructor(files: Files, pointer: FilePointer, range?: {start: number, end: number}) { super() @@ -140,7 +141,8 @@ export class ReadStream extends Readable { this.msgIdx = this.ranges.scan_msg_begin } - async _read() { + async _read() {/* + console.log("Calling for more data") if (this.busy) return this.busy = true let readyForMore = true @@ -150,7 +152,14 @@ export class ReadStream extends Readable { if (result === undefined) return // stream has been destroyed. nothing left to do... readyForMore = result } - this.busy = false + this.busy = false*/ + this.pushData() + } + + async _destroy(error: Error | null, callback: (error?: Error | null | undefined) => void): Promise { + if (this.aborter) + this.aborter.abort() + callback() } async getNextAttachment() { @@ -166,8 +175,6 @@ export class ReadStream extends Readable { || this.msgIdx > this.ranges.scan_msg_end ) return null - console.log('passing') - let msg = await this.files.api .fetchMessage(this.pointer.messageids[this.msgIdx]) .catch(() => { @@ -175,7 +182,7 @@ export class ReadStream extends Readable { }) if (msg?.attachments) { - let attach = Array.from(msg.attachments.values()) + let attach = msg.attachments this.attachmentBuffer = this.ranges.useRanges ? attach.slice( this.msgIdx == this.ranges.scan_msg_begin @@ -197,12 +204,21 @@ export class ReadStream extends Readable { // (words of a girl paranoid from writing readfilestream) let pushToStream = this.push.bind(this) + let stream = this return function() { - //if (pushing) return + if (pushing) return pushing = true - return reader.read().then(result => { + return reader.read().catch(e => { + // Probably means an AbortError; whatever it is we'll need to abort + if (webStream.locked) reader.releaseLock() + webStream.cancel().catch(e => undefined) + if (!stream.destroyed) stream.destroy() + return e + }).then(result => { + if (result instanceof Error || !result) return result + let pushed if (!result.done) { pushing = false @@ -214,6 +230,7 @@ export class ReadStream extends Readable { } async getNextChunk() { + console.log("Next chunk requested") let scanning_chunk = await this.getNextAttachment() if (!scanning_chunk) return null @@ -236,7 +253,9 @@ export class ReadStream extends Readable { } : {} - let response = await fetch(scanning_chunk.url, {headers}) + this.aborter = new AbortController() + + let response = await fetch(scanning_chunk.url, {headers, signal: this.aborter.signal}) .catch((e: Error) => { console.error(e) return {body: e} @@ -247,7 +266,7 @@ export class ReadStream extends Readable { return response.body } - currentPusher?: (() => Promise<{readyForMore: boolean, streamDone: boolean }> | undefined) + currentPusher?: (() => Promise<{readyForMore: boolean, streamDone: boolean } | void> | undefined) busy: boolean = false async pushData(): Promise { @@ -256,11 +275,11 @@ export class ReadStream extends Readable { // let's make one then if (!this.currentPusher) { let next = await this.getNextChunk() - if (next && !(next instanceof Error)) + if (next && !(next instanceof Error)) { // okay, so we have a new chunk // let's generate a new currentPusher this.currentPusher = await this.getPusherForWebStream(next) - else { + } else { // oops, look like there's an error // or the stream has ended. // let's destroy the stream @@ -273,6 +292,7 @@ export class ReadStream extends Readable { let result = await this.currentPusher() if (result?.streamDone) { + this.aborter = undefined this.currentPusher = undefined return this.pushData() } else return result?.readyForMore @@ -344,9 +364,11 @@ export class UploadStream extends Writable { callback() } - _destroy(error: Error | null, callback: (err?: Error|null) => void) { + aborted: boolean = false + + async _destroy(error: Error | null, callback: (err?: Error|null) => void) { this.error = error || undefined - if (error) this.abort() + await this.abort() callback() } @@ -354,6 +376,8 @@ export class UploadStream extends Writable { * @description Cancel & unlock the file. When destroy() is called with a non-WebError, this is automatically called */ async abort() { + if (this.aborted) return + this.aborted = true console.log("Aborting") if (!this.destroyed) this.destroy() if (this.current) this.current.destroy(this.error) diff --git a/src/server/routes/api/v0/primaryApi.ts b/src/server/routes/api/v0/primaryApi.ts index 974c401..2b5ed29 100644 --- a/src/server/routes/api/v0/primaryApi.ts +++ b/src/server/routes/api/v0/primaryApi.ts @@ -13,6 +13,7 @@ import formidable from "formidable" import { HttpBindings } from "@hono/node-server" import pkg from "../../../../../package.json" assert {type: "json"} import { type StatusCode } from "hono/utils/http-status" +import { EventEmitter } from "node:events" export let primaryApi = new Hono<{ Variables: { account: Accounts.Account @@ -87,7 +88,17 @@ export default function (files: Files) { return files .readFileStream(fileId, range) .then(async (stream) => { - return new Response(Readable.toWeb(stream) as ReadableStream, ctx.res) + let rs = new ReadableStream({ + start(controller) { + stream.once("end", () => controller.close()) + stream.once("error", (err) => controller.error(err)) + }, + cancel(reason) { + stream.destroy(reason instanceof Error ? reason : new Error(reason)) + } + }) + stream.pipe(ctx.env.outgoing) + return new Response(rs, ctx.body(null)) }) .catch((err) => { return ServeError(ctx, err.status, err.message)