diff --git a/src/server/lib/DiscordAPI/index.ts b/src/server/lib/DiscordAPI/index.ts index 5dd29e5..cccfe0c 100644 --- a/src/server/lib/DiscordAPI/index.ts +++ b/src/server/lib/DiscordAPI/index.ts @@ -59,7 +59,10 @@ export class Client { // Remove bulk deletable messages let bulkDeletable = ids.filter(e => convertSnowflakeToDate(e).valueOf() < 2 * 7 * 24 * 60 * 60 * 1000) - await this.rest.fetch(`/channels/${this.targetChannel}/messages/bulk-delete`, {method: "POST",body: JSON.stringify({messages: bulkDeletable})}) + await this.rest.fetch(`/channels/${this.targetChannel}/messages/bulk-delete`, { + method: "POST", + body: JSON.stringify({messages: bulkDeletable}) + }) bulkDeletable.forEach(Map.prototype.delete.bind(this.messageCache)) // everything else, we can do manually... @@ -116,6 +119,9 @@ export class Client { } }) + let controller = new AbortController() + stream.on("error", _ => controller.abort()) + //pushBoundary(transformed) stream.pipe(transformed) @@ -124,7 +130,8 @@ export class Client { body: transformed, headers: { "Content-Type": `multipart/form-data; boundary=${boundary}` - } + }, + signal: controller.signal }) diff --git a/src/server/lib/files.ts b/src/server/lib/files.ts index 07ab3bd..9b02469 100644 --- a/src/server/lib/files.ts +++ b/src/server/lib/files.ts @@ -170,18 +170,17 @@ export class UploadStream extends Writable { callback() } - _destroy(error: Error | null) { + _destroy(error: Error | null, callback: (err?: Error|null) => void) { this.error = error || undefined - this.abort() - /* - if (error instanceof WebError) return // destroyed by self - if (error) return // destroyed externally...*/ + if (error) this.abort() + callback() } /** * @description Cancel & unlock the file. When destroy() is called with a non-WebError, this is automatically called */ async abort() { + console.log("Aborting") if (!this.destroyed) this.destroy() if (this.current) this.current.destroy(this.error) await this.files.api.deleteMessages(this.messages) @@ -202,8 +201,15 @@ export class UploadStream extends Writable { } // Perform checks - if (!this.mime) throw new WebError(400, "no mime provided") - if (!this.name) throw new WebError(400, "no filename provided") + if (!this.mime) { + this.abort() + throw new WebError(400, "no mime provided") + } + if (!this.name) { + this.abort() + throw new WebError(400, "no filename provided") + } + if (!this.uploadId) this.setUploadId(generateFileId()) let ogf = this.files.files[this.uploadId!] @@ -301,6 +307,8 @@ export class UploadStream extends Writable { console.log(`Sent: ${message.id}`) this.newmessage_debounce = true this.emit("debounceReleased") + }).catch(e => { + if (!this.errored) this.destroy(e) }) return stream diff --git a/src/server/lib/formdata.ts b/src/server/lib/formdata.ts deleted file mode 100644 index 57e6e68..0000000 --- a/src/server/lib/formdata.ts +++ /dev/null @@ -1,81 +0,0 @@ -import { Transform, Readable } from "node:stream"; -import { TransformCallback } from "stream"; - -let content_disposition_matcher = /\s*([^=;]+)(?:=(?:"((?:\\"|[^"])*)"|([^;]*))?;?|;?)/g // probably a bad regex but IDC - -/** - * @description Checks if a chunk can be completed by something else (ex. a boundary) - * @param chunk Chunk to perform check on - * @param cmp Chunk to check whether or not something is completable with - * @returns Whether or not this chunk could be completed by cmp - */ -function endChk(chunk: Buffer, cmp: Buffer) { - for (let i = cmp.byteLength-1; i > 0; i--) - if (chunk.subarray(-(i-1)).equals(cmp.subarray(0,i))) - return true - return false -} - -export type Headers = { - ["content-disposition"]?: Record, - ["content-type"]?: string -} - -export class Field extends Readable { - - headers: Headers = {} - - constructor(unparsedHeaders: string) { - super() - this.headers = Object.fromEntries( - unparsedHeaders.split("\r\n") - .map(e => [e.split(":")[0].trim(), e.split(":").slice(1).join(":").trim()]) - ) - - if (this.headers["content-disposition"]) - this.headers["content-disposition"] = Object.fromEntries(Array.from( - (this.headers["content-disposition"] as unknown as string) - .matchAll(content_disposition_matcher)).map(e => [e[1], e[2] ? e[2] : true])) - } - - _read(size: number): void { - this.emit("hungry") - } - - collect(maxSize: number = 0) { - return new Promise((res,rej) => { - let bufs: Buffer[] = [] - - this.on("data", (data) => { - if (maxSize && bufs.reduce((cur, acc) => cur+acc.byteLength, 0) > maxSize) - this.destroy(new Error("went above collect()'s maxSize")) - bufs.push(data) - }) - - this.on("end", () => res(Buffer.concat(bufs))) - this.on("error", (err) => rej(err)) - }) - } - -} - -export default class FormDataParser extends Transform { - - readableObjectMode = true - - readonly boundary: string - private workingMemory: Buffer | undefined - private workingField: Field | undefined - - constructor(boundary: string) { - super() - this.boundary = boundary - } - - _transform(_chunk: any, encoding: BufferEncoding, callback: TransformCallback): void { - - let chunk = this.workingMemory ? Buffer.concat([this.workingMemory, _chunk]) : _chunk - - } - -} \ No newline at end of file diff --git a/src/server/routes/api/v0/primaryApi.ts b/src/server/routes/api/v0/primaryApi.ts index 6c7c3b1..b3ecfaf 100644 --- a/src/server/routes/api/v0/primaryApi.ts +++ b/src/server/routes/api/v0/primaryApi.ts @@ -7,7 +7,6 @@ import RangeParser, { type Range } from "range-parser" import ServeError from "../../../lib/errors.js" import Files, { WebError } from "../../../lib/files.js" import { getAccount, requiresPermissions } from "../../../lib/middleware.js" -import FormDataParser, { Field } from "../../../lib/formdata.js" import {Readable} from "node:stream" import {ReadableStream as StreamWebReadable} from "node:stream/web" import formidable from "formidable" @@ -102,7 +101,20 @@ export default function (files: Files) { requiresPermissions("upload"), (ctx) => { return new Promise((resolve,reject) => { ctx.env.incoming.removeAllListeners("data") // remove hono's buffering - console.log('awawa') + + let errEscalated = false + function escalate(err:Error) { + if (errEscalated) return + errEscalated = true + + if ("httpCode" in err) + ctx.status(err.httpCode as number) + else if (err instanceof WebError) + ctx.status(err.statusCode) + else ctx.status(400) + resolve(ctx.body(err.message)) + } + let acc = ctx.get("account") as Accounts.Account | undefined if (!ctx.req.header("Content-Type")?.startsWith("multipart/form-data")) { @@ -116,7 +128,6 @@ export default function (files: Files) { resolve(ctx.body("[err] body must be supplied")) return } - console.log('awawawawa') let file = files.createWriteStream(acc?.id) let parser = formidable({ @@ -126,50 +137,44 @@ export default function (files: Files) { }) parser.onPart = function(part) { - console.log(part) - if (part.originalFilename == "" || !part.mimetype) { - parser._handlePart(part); return + if (!part.originalFilename || !part.mimetype) { + parser._handlePart(part) + return } // lol if (part.name == "file") { - file.on("drain", () => { - ctx.env.incoming.resume() - }) - part.addListener("data", (data: Buffer) => { + file.setName(part.originalFilename || "") + file.setType(part.mimetype || "") + + file.on("drain", () => ctx.env.incoming.resume()) + file.on("error", (err) => part.emit("error", err)) + + part.on("data", (data: Buffer) => { if (!file.write(data)) ctx.env.incoming.pause() }) + part.on("end", () => file.end()) } } parser.on("field", (k,v) => { - console.log(k,v) if (k == "uploadId") file.setUploadId(v) }) - parser.parse(ctx.env.incoming).catch(e => console.log(e)) - console.log("Parsing") + parser.parse(ctx.env.incoming).catch(e => console.error(e)) parser.on('error', (err) => { - if ("httpCode" in err) - ctx.status(err.httpCode) - else ctx.status(400) - resolve(ctx.body(err.message)) + escalate(err) + if (!file.destroyed) file.destroy(err) }) + file.on("error", escalate) - file.on("error", (err) => { - if (err instanceof WebError) - ctx.status(err.statusCode) - resolve(ctx.body(err?.message)) - }) - - file.on("finish", () => { - file.commit().then(id => resolve(ctx.body(id!))).catch((err) => { - if (err instanceof WebError) - ctx.status(err.statusCode) - resolve(ctx.body(err?.message)) - }) + file.on("finish", async () => { + if (!ctx.env.incoming.readableEnded) await new Promise(res => ctx.env.incoming.once("end", res)) + file.commit() + .then(id => resolve(ctx.body(id!))) + .catch(escalate) }) })} diff --git a/src/svelte/elem/UploadWindow.svelte b/src/svelte/elem/UploadWindow.svelte index 0b2b136..4af300f 100644 --- a/src/svelte/elem/UploadWindow.svelte +++ b/src/svelte/elem/UploadWindow.svelte @@ -60,7 +60,7 @@ let handle_fetch_promise = (x,prom) => { return prom.then(async (res) => { let txt = await res.text() - if (txt.startsWith("[err]")) uploads[x].uploadStatus.error = txt; + if (!res.ok) uploads[x].uploadStatus.error = txt; else { uploads[x].uploadStatus.fileId = txt; @@ -84,12 +84,10 @@ switch(v.type) { case "upload": let fd = new FormData() + if (v.params.uploadId) fd.append("uploadId", v.params.uploadId) fd.append("file",v.file) return handle_fetch_promise(x,fetch("/upload",{ - headers: { - "monofile-params": JSON.stringify(v.params) - }, method: "POST", body: fd }))