From c5040e1b79c986daafb3ee019a97f4cfd7a3bd4b Mon Sep 17 00:00:00 2001 From: stringsplit <77242831+nbitzz@users.noreply.github.com> Date: Fri, 15 Dec 2023 19:08:42 +0000 Subject: [PATCH] hwo tf am i gonna add backpressure to this i just realized --- src/server/lib/files.ts | 25 ++++++++++++++++++------- 1 file changed, 18 insertions(+), 7 deletions(-) diff --git a/src/server/lib/files.ts b/src/server/lib/files.ts index 56cc9b8..d267241 100644 --- a/src/server/lib/files.ts +++ b/src/server/lib/files.ts @@ -107,6 +107,9 @@ namespace StreamHelpers { readonly targetSize: number filled: number = 0 buffer: UploadStream[] = [] + messages: string[] = [] + + private newmessage_debounce : boolean = true api: API @@ -115,7 +118,10 @@ namespace StreamHelpers { this.targetSize = targetSize } - private startMessage(streamCount: number): UploadStream[] { + private async startMessage(streamCount: number): Promise { + + if (!this.newmessage_debounce) return + this.newmessage_debounce = false let streams = [] @@ -123,11 +129,13 @@ namespace StreamHelpers { for (let i = 0; i < streamCount; i++) { streams.push({ uploaded: 0, - stream: new Readable() + stream: new Readable({}) }) } - this.api.send(streams.map(e => e.stream)); + let message = await this.api.send(streams.map(e => e.stream)); + this.messages.push(message.id) + this.newmessage_debounce = true return streams @@ -205,9 +213,12 @@ export default class Files { async write(data: Buffer) { let positionInBuf = 0 while (positionInBuf < data.byteLength) { - let ns = (await buf.getNextStream()) - if (!ns) { - this.destroy() + let ns = (await buf.getNextStream().catch(e => { + + return e + })) + if (!ns || ns instanceof Error) { + this.destroy(ns) return } @@ -222,7 +233,7 @@ export default class Files { positionInBuf += bytesToPush if (ns.uploaded == fs_obj.config.maxDiscordFileSize) - buf.buffer.splice(0, 1) + buf.buffer.splice(0, 1)[0]?.stream.destroy() if (buf.filled == buf.targetSize) { this.destroy()