Oh that's actually done

This commit is contained in:
May 2024-03-04 20:48:53 -08:00
parent 41dc623a28
commit 7d622a481b
5 changed files with 60 additions and 123 deletions

View file

@ -59,7 +59,10 @@ export class Client {
// Remove bulk deletable messages // Remove bulk deletable messages
let bulkDeletable = ids.filter(e => convertSnowflakeToDate(e).valueOf() < 2 * 7 * 24 * 60 * 60 * 1000) let bulkDeletable = ids.filter(e => convertSnowflakeToDate(e).valueOf() < 2 * 7 * 24 * 60 * 60 * 1000)
await this.rest.fetch(`/channels/${this.targetChannel}/messages/bulk-delete`, {method: "POST",body: JSON.stringify({messages: bulkDeletable})}) await this.rest.fetch(`/channels/${this.targetChannel}/messages/bulk-delete`, {
method: "POST",
body: JSON.stringify({messages: bulkDeletable})
})
bulkDeletable.forEach(Map.prototype.delete.bind(this.messageCache)) bulkDeletable.forEach(Map.prototype.delete.bind(this.messageCache))
// everything else, we can do manually... // everything else, we can do manually...
@ -116,6 +119,9 @@ export class Client {
} }
}) })
let controller = new AbortController()
stream.on("error", _ => controller.abort())
//pushBoundary(transformed) //pushBoundary(transformed)
stream.pipe(transformed) stream.pipe(transformed)
@ -124,7 +130,8 @@ export class Client {
body: transformed, body: transformed,
headers: { headers: {
"Content-Type": `multipart/form-data; boundary=${boundary}` "Content-Type": `multipart/form-data; boundary=${boundary}`
} },
signal: controller.signal
}) })

View file

@ -170,18 +170,17 @@ export class UploadStream extends Writable {
callback() callback()
} }
_destroy(error: Error | null) { _destroy(error: Error | null, callback: (err?: Error|null) => void) {
this.error = error || undefined this.error = error || undefined
this.abort() if (error) this.abort()
/* callback()
if (error instanceof WebError) return // destroyed by self
if (error) return // destroyed externally...*/
} }
/** /**
* @description Cancel & unlock the file. When destroy() is called with a non-WebError, this is automatically called * @description Cancel & unlock the file. When destroy() is called with a non-WebError, this is automatically called
*/ */
async abort() { async abort() {
console.log("Aborting")
if (!this.destroyed) this.destroy() if (!this.destroyed) this.destroy()
if (this.current) this.current.destroy(this.error) if (this.current) this.current.destroy(this.error)
await this.files.api.deleteMessages(this.messages) await this.files.api.deleteMessages(this.messages)
@ -202,8 +201,15 @@ export class UploadStream extends Writable {
} }
// Perform checks // Perform checks
if (!this.mime) throw new WebError(400, "no mime provided") if (!this.mime) {
if (!this.name) throw new WebError(400, "no filename provided") this.abort()
throw new WebError(400, "no mime provided")
}
if (!this.name) {
this.abort()
throw new WebError(400, "no filename provided")
}
if (!this.uploadId) this.setUploadId(generateFileId()) if (!this.uploadId) this.setUploadId(generateFileId())
let ogf = this.files.files[this.uploadId!] let ogf = this.files.files[this.uploadId!]
@ -301,6 +307,8 @@ export class UploadStream extends Writable {
console.log(`Sent: ${message.id}`) console.log(`Sent: ${message.id}`)
this.newmessage_debounce = true this.newmessage_debounce = true
this.emit("debounceReleased") this.emit("debounceReleased")
}).catch(e => {
if (!this.errored) this.destroy(e)
}) })
return stream return stream

View file

@ -1,81 +0,0 @@
import { Transform, Readable } from "node:stream";
import { TransformCallback } from "stream";
let content_disposition_matcher = /\s*([^=;]+)(?:=(?:"((?:\\"|[^"])*)"|([^;]*))?;?|;?)/g // probably a bad regex but IDC
/**
* @description Checks if a chunk can be completed by something else (ex. a boundary)
* @param chunk Chunk to perform check on
* @param cmp Chunk to check whether or not something is completable with
* @returns Whether or not this chunk could be completed by cmp
*/
function endChk(chunk: Buffer, cmp: Buffer) {
for (let i = cmp.byteLength-1; i > 0; i--)
if (chunk.subarray(-(i-1)).equals(cmp.subarray(0,i)))
return true
return false
}
export type Headers = {
["content-disposition"]?: Record<string, boolean|string>,
["content-type"]?: string
}
export class Field extends Readable {
headers: Headers = {}
constructor(unparsedHeaders: string) {
super()
this.headers = Object.fromEntries(
unparsedHeaders.split("\r\n")
.map(e => [e.split(":")[0].trim(), e.split(":").slice(1).join(":").trim()])
)
if (this.headers["content-disposition"])
this.headers["content-disposition"] = Object.fromEntries(Array.from(
(this.headers["content-disposition"] as unknown as string)
.matchAll(content_disposition_matcher)).map(e => [e[1], e[2] ? e[2] : true]))
}
_read(size: number): void {
this.emit("hungry")
}
collect(maxSize: number = 0) {
return new Promise<Buffer>((res,rej) => {
let bufs: Buffer[] = []
this.on("data", (data) => {
if (maxSize && bufs.reduce((cur, acc) => cur+acc.byteLength, 0) > maxSize)
this.destroy(new Error("went above collect()'s maxSize"))
bufs.push(data)
})
this.on("end", () => res(Buffer.concat(bufs)))
this.on("error", (err) => rej(err))
})
}
}
export default class FormDataParser extends Transform {
readableObjectMode = true
readonly boundary: string
private workingMemory: Buffer | undefined
private workingField: Field | undefined
constructor(boundary: string) {
super()
this.boundary = boundary
}
_transform(_chunk: any, encoding: BufferEncoding, callback: TransformCallback): void {
let chunk = this.workingMemory ? Buffer.concat([this.workingMemory, _chunk]) : _chunk
}
}

View file

@ -7,7 +7,6 @@ import RangeParser, { type Range } from "range-parser"
import ServeError from "../../../lib/errors.js" import ServeError from "../../../lib/errors.js"
import Files, { WebError } from "../../../lib/files.js" import Files, { WebError } from "../../../lib/files.js"
import { getAccount, requiresPermissions } from "../../../lib/middleware.js" import { getAccount, requiresPermissions } from "../../../lib/middleware.js"
import FormDataParser, { Field } from "../../../lib/formdata.js"
import {Readable} from "node:stream" import {Readable} from "node:stream"
import {ReadableStream as StreamWebReadable} from "node:stream/web" import {ReadableStream as StreamWebReadable} from "node:stream/web"
import formidable from "formidable" import formidable from "formidable"
@ -102,7 +101,20 @@ export default function (files: Files) {
requiresPermissions("upload"), requiresPermissions("upload"),
(ctx) => { return new Promise((resolve,reject) => { (ctx) => { return new Promise((resolve,reject) => {
ctx.env.incoming.removeAllListeners("data") // remove hono's buffering ctx.env.incoming.removeAllListeners("data") // remove hono's buffering
console.log('awawa')
let errEscalated = false
function escalate(err:Error) {
if (errEscalated) return
errEscalated = true
if ("httpCode" in err)
ctx.status(err.httpCode as number)
else if (err instanceof WebError)
ctx.status(err.statusCode)
else ctx.status(400)
resolve(ctx.body(err.message))
}
let acc = ctx.get("account") as Accounts.Account | undefined let acc = ctx.get("account") as Accounts.Account | undefined
if (!ctx.req.header("Content-Type")?.startsWith("multipart/form-data")) { if (!ctx.req.header("Content-Type")?.startsWith("multipart/form-data")) {
@ -116,7 +128,6 @@ export default function (files: Files) {
resolve(ctx.body("[err] body must be supplied")) resolve(ctx.body("[err] body must be supplied"))
return return
} }
console.log('awawawawa')
let file = files.createWriteStream(acc?.id) let file = files.createWriteStream(acc?.id)
let parser = formidable({ let parser = formidable({
@ -126,50 +137,44 @@ export default function (files: Files) {
}) })
parser.onPart = function(part) { parser.onPart = function(part) {
console.log(part) if (!part.originalFilename || !part.mimetype) {
if (part.originalFilename == "" || !part.mimetype) { parser._handlePart(part)
parser._handlePart(part); return return
} }
// lol // lol
if (part.name == "file") { if (part.name == "file") {
file.on("drain", () => { file.setName(part.originalFilename || "")
ctx.env.incoming.resume() file.setType(part.mimetype || "")
})
part.addListener("data", (data: Buffer) => { file.on("drain", () => ctx.env.incoming.resume())
file.on("error", (err) => part.emit("error", err))
part.on("data", (data: Buffer) => {
if (!file.write(data)) if (!file.write(data))
ctx.env.incoming.pause() ctx.env.incoming.pause()
}) })
part.on("end", () => file.end())
} }
} }
parser.on("field", (k,v) => { parser.on("field", (k,v) => {
console.log(k,v)
if (k == "uploadId") if (k == "uploadId")
file.setUploadId(v) file.setUploadId(v)
}) })
parser.parse(ctx.env.incoming).catch(e => console.log(e)) parser.parse(ctx.env.incoming).catch(e => console.error(e))
console.log("Parsing")
parser.on('error', (err) => { parser.on('error', (err) => {
if ("httpCode" in err) escalate(err)
ctx.status(err.httpCode) if (!file.destroyed) file.destroy(err)
else ctx.status(400)
resolve(ctx.body(err.message))
}) })
file.on("error", escalate)
file.on("error", (err) => { file.on("finish", async () => {
if (err instanceof WebError) if (!ctx.env.incoming.readableEnded) await new Promise(res => ctx.env.incoming.once("end", res))
ctx.status(err.statusCode) file.commit()
resolve(ctx.body(err?.message)) .then(id => resolve(ctx.body(id!)))
}) .catch(escalate)
file.on("finish", () => {
file.commit().then(id => resolve(ctx.body(id!))).catch((err) => {
if (err instanceof WebError)
ctx.status(err.statusCode)
resolve(ctx.body(err?.message))
})
}) })
})} })}

View file

@ -60,7 +60,7 @@
let handle_fetch_promise = (x,prom) => { let handle_fetch_promise = (x,prom) => {
return prom.then(async (res) => { return prom.then(async (res) => {
let txt = await res.text() let txt = await res.text()
if (txt.startsWith("[err]")) uploads[x].uploadStatus.error = txt; if (!res.ok) uploads[x].uploadStatus.error = txt;
else { else {
uploads[x].uploadStatus.fileId = txt; uploads[x].uploadStatus.fileId = txt;
@ -84,12 +84,10 @@
switch(v.type) { switch(v.type) {
case "upload": case "upload":
let fd = new FormData() let fd = new FormData()
if (v.params.uploadId) fd.append("uploadId", v.params.uploadId)
fd.append("file",v.file) fd.append("file",v.file)
return handle_fetch_promise(x,fetch("/upload",{ return handle_fetch_promise(x,fetch("/upload",{
headers: {
"monofile-params": JSON.stringify(v.params)
},
method: "POST", method: "POST",
body: fd body: fd
})) }))