From 486fb6912e14a9138d4891b01f9beef06e99db73 Mon Sep 17 00:00:00 2001 From: stringsplit <77242831+nbitzz@users.noreply.github.com> Date: Sat, 24 Jun 2023 17:34:12 -0700 Subject: [PATCH] hopefully implement backpressure --- src/server/lib/files.ts | 75 ++++++++++++++++++++++++++--------------- 1 file changed, 48 insertions(+), 27 deletions(-) diff --git a/src/server/lib/files.ts b/src/server/lib/files.ts index ba1d138..63011c5 100644 --- a/src/server/lib/files.ts +++ b/src/server/lib/files.ts @@ -275,12 +275,6 @@ export default class Files { if (this.files[uploadId]) { let file = this.files[uploadId] - let dataStream = new Readable({ - read(){} - }) - - resolve(dataStream) - let scan_msg_begin = 0, scan_msg_end = file.messageids.length-1, @@ -302,7 +296,9 @@ export default class Files { scan_msg_end = Math.ceil(scan_files_end / 10) } - + + let attachments: Discord.Attachment[] = []; + for (let xi = scan_msg_begin; xi < scan_msg_end+1; xi++) { let msg = await this.uploadChannel.messages.fetch(file.messageids[xi]).catch(() => {return null}) @@ -311,25 +307,7 @@ export default class Files { let attach = Array.from(msg.attachments.values()) for (let i = (useRanges && xi == scan_msg_begin ? ( scan_files_begin - (xi*10) ) : 0); i < (useRanges && xi == scan_msg_end ? ( scan_files_end - (xi*10) + 1 ) : attach.length); i++) { - let d = await axios.get( - attach[i].url, - { - responseType:"arraybuffer", - headers: { - ...(useRanges ? { - "Range": `bytes=${i+(xi*10) == scan_files_begin && range && file.chunkSize ? range.start-(scan_files_begin*file.chunkSize) : "0"}-${i+(xi*10) == scan_files_end && range && file.chunkSize ? range.end-(scan_files_end*file.chunkSize) : ""}` - } : {}) - } - } - ).catch((e:Error) => {console.error(e)}) - - if (d) { - dataStream.push(d.data) - } else { - reject({status:500,message:"internal server error"}) - dataStream.destroy(new Error("file read error")) - return - } + attachments.push(attach[i]) } @@ -337,7 +315,50 @@ export default class Files { } - dataStream.push(null) + let position = 0; + + let getNextChunk = async () => { + let scanning_chunk = attachments[position] + if (!scanning_chunk) { + return null + } + + let d = await axios.get( + scanning_chunk.url, + { + responseType:"arraybuffer", + headers: { + ...(useRanges ? { + "Range": `bytes=${position == 0 && range && file.chunkSize ? range.start-(scan_files_begin*file.chunkSize) : "0"}-${position == attachments.length-1 && range && file.chunkSize ? range.end-(scan_files_end*file.chunkSize) : ""}` + } : {}) + } + } + ).catch((e:Error) => {console.error(e)}) + + position++; + + if (d) { + return d.data + } else { + reject({status:500,message:"internal server error"}) + return "__ERR" + } + } + + let dataStream = new Readable({ + read(){ + getNextChunk().then(async (nextChunk) => { + if (nextChunk == "__ERR") {this.destroy(new Error("file read error")); return} + let response = this.push(nextChunk) + + while (response) { + response = this.push(await getNextChunk()) + } + }) + } + }) + + resolve(dataStream) } else { reject({status:404,message:"not found"})