i cant believe it took me this long to realize

This commit is contained in:
May 2023-12-27 13:51:22 -08:00
parent b80ddd26e7
commit 1805c631f1
3 changed files with 79 additions and 231 deletions

View file

@ -78,7 +78,7 @@ program.command("upload")
console.log(`started: ${file}`) console.log(`started: ${file}`)
writable.on("drain", () => { writable.on("drain", () => {
console.log("Drained") console.log("Drained");
}) })
writable.on("finish", () => { writable.on("finish", () => {
@ -95,37 +95,11 @@ program.command("upload")
writable.on("close", () => { writable.on("close", () => {
console.log("Closed.") console.log("Closed.")
}) });
;(await fs.createReadStream(file)).pipe( ;(await fs.createReadStream(file)).pipe(
writable writable
) )
}) })
program.command("memup")
.description("Upload a file to the instance (no stream)")
.argument("<file>", "Path to the file you'd like to upload")
.option("-id, --fileid <id>", 'Custom file ID to use')
.action(async (file, options) => {
await (new Promise<void>(resolve => setTimeout(() => resolve(), 1000)))
if (!(fs.existsSync(file) && (await stat(file)).isFile()))
throw `${file} is not a file`
let buf = fs.readFileSync(file)
let id = files.uploadFile({
filename: basename(file),
mime: "application/octet-stream",
uploadId: options.fileid
}, buf)
console.log(`uploaded: ${await id}`)
})
program.parse() program.parse()

View file

@ -2,6 +2,7 @@ import { REST } from "./DiscordRequests"
import type { APIMessage } from "discord-api-types/v10" import type { APIMessage } from "discord-api-types/v10"
import FormData from "form-data" import FormData from "form-data"
import { Readable } from "node:stream" import { Readable } from "node:stream"
import { Configuration } from "../files"
const EXPIRE_AFTER = 20 * 60 * 1000 const EXPIRE_AFTER = 20 * 60 * 1000
const DISCORD_EPOCH = 1420070400000 const DISCORD_EPOCH = 1420070400000
@ -22,12 +23,14 @@ export class Client {
private readonly token : string private readonly token : string
private readonly rest : REST private readonly rest : REST
private readonly targetChannel : string private readonly targetChannel : string
private readonly config : Configuration
private messageCache : Map<string, MessageCacheObject> = new Map() private messageCache : Map<string, MessageCacheObject> = new Map()
constructor(token: string, targetChannel: string) { constructor(token: string, config: Configuration) {
this.token = token this.token = token
this.rest = new REST(token) this.rest = new REST(token)
this.targetChannel = targetChannel this.targetChannel = config.targetChannel
this.config = config
} }
async fetchMessage(id: string, cache: boolean = true) { async fetchMessage(id: string, cache: boolean = true) {
@ -70,21 +73,56 @@ export class Client {
} }
async send(chunks: (Readable|Buffer)[]) { async send(stream: Readable) {
// make formdata
let fd = new FormData() let bytes_sent = 0
chunks.forEach((v,x) => { let file_number = 0
fd.append(`files[${x}]`, v, { filename: Math.random().toString().slice(2) }) let boundary = "-".repeat(20) + Math.random().toString().slice(2)
let pushBoundary = (stream: Readable) =>
stream.push(`--${boundary}\r\nContent-Disposition: form-data, name="files[${file_number}]"; filename="${Math.random().toString().slice(2)}\r\nContent-Type: application/octet-stream\r\n\r\n"`)
let boundPush = (stream: Readable, chunk: Buffer) => {
let position = 0
while (position < chunk.length) {
let capture = Math.min(
this.config.maxDiscordFileSize - (bytes_sent % this.config.maxDiscordFileSize),
chunk.length
) + 1
stream.push( chunk.subarray(position, capture) )
position += capture, bytes_sent += capture-1
console.log("Chunk progress:", bytes_sent % this.config.maxDiscordFileSize, "B")
if (bytes_sent % this.config.maxDiscordFileSize == 0) {
console.log("Progress is 0. Pushing boundary")
pushBoundary(stream)
}
}
}
let transformed = new Readable({
read(size) {
let result = stream.read(size)
if (result) boundPush(this, result)
}
}) })
pushBoundary(transformed)
let returned = await this.rest.fetch(`/channels/${this.targetChannel}/messages`, { let returned = await this.rest.fetch(`/channels/${this.targetChannel}/messages`, {
method: "POST", method: "POST",
body: fd, body: transformed,
headers: fd.getHeaders() headers: {
"Content-Type": `multipart/form-data; boundary=${boundary}`
}
}) })
let response = (await returned.json() as APIMessage) let response = (await returned.json() as APIMessage)
console.log(JSON.stringify(response, null, 4)) console.log(JSON.stringify(response, null, 4))
return response return response
} }
} }

View file

@ -107,7 +107,7 @@ namespace StreamHelpers {
readonly targetSize: number readonly targetSize: number
filled: number = 0 filled: number = 0
buffer: UploadStream[] = [] current?: Readable
messages: string[] = [] messages: string[] = []
writable?: Writable writable?: Writable
@ -122,48 +122,40 @@ namespace StreamHelpers {
this.targetSize = targetSize this.targetSize = targetSize
} }
private async startMessage(streamCount: number): Promise<UploadStream[] | undefined> { private async startMessage(): Promise<Readable | undefined> {
console.log(`Starting a message with ${streamCount} stream(s)`)
if (!this.newmessage_debounce) return if (!this.newmessage_debounce) return
this.newmessage_debounce = false this.newmessage_debounce = false
let streams = []
let sbuf = this let sbuf = this
let stream = new Readable({
read() {
console.log("Read called. Emitting drain")
sbuf.writable!.emit("drain")
}
})
stream.pause()
// can't think of a better way to do console.log(`Starting a message`)
for (let i = 0; i < streamCount; i++) { this.api.send(stream).then(message => {
streams.push({ this.messages.push(message.id)
uploaded: 0, console.log(`Sent: ${message.id}`)
stream: new Readable({ this.newmessage_debounce = true
read() { })
console.log('FD is reading stream. Emitting drain...')
sbuf.writable!.emit("drain");
}
})
})
}
let message = await this.api.send(streams.map(e => e.stream)); return stream
this.messages.push(message.id)
this.newmessage_debounce = true
return streams
} }
async getNextStream() { async getNextStream() {
console.log("Getting next stream...") console.log("Getting stream...")
if (this.buffer[0]) return this.buffer[0] if (this.current) return this.current
else { else {
// startmessage.... idk // startmessage.... idk
await this.startMessage( this.current = await this.startMessage();
this.messages.length < Math.ceil(this.targetSize/this.files.config.maxDiscordFileSize/10) console.log("current:" + (this.current ? "yes" : "no"))
? 10 return this.current
: Math.ceil(this.targetSize/this.files.config.maxDiscordFileSize) - this.messages.length*10
);
return this.buffer[0]
} }
} }
@ -179,7 +171,7 @@ export default class Files {
constructor(config: Configuration) { constructor(config: Configuration) {
this.config = config this.config = config
this.api = new API(process.env.TOKEN!, config.targetChannel) this.api = new API(process.env.TOKEN!, config)
readFile(this.data_directory+ "/files.json") readFile(this.data_directory+ "/files.json")
.then((buf) => { .then((buf) => {
@ -230,39 +222,14 @@ export default class Files {
let fs_obj = this let fs_obj = this
let wt = new Writable({ let wt = new Writable({
async write(data: Buffer) { write(data: Buffer, encoding, callback) {
console.log("Write to stream attempted") console.log("Write to stream attempted")
let positionInBuf = 0 buf.getNextStream().then(ns => {
while (positionInBuf < data.byteLength) { if (ns) {ns.push(data); callback()} else this.end();
let ns = (await buf.getNextStream().catch(e => { console.log(`pushed... ${ns ? "ns exists" : "ns doesn't exist"}... ${data.byteLength} byte chunk`);
return
return e })
})) as Error | undefined | StreamHelpers.UploadStream },
if (!ns || ns instanceof Error) {
this.destroy(ns)
return
}
let bytesToPush = Math.min(
data.byteLength,
fs_obj.config.maxDiscordFileSize-ns.uploaded
)
ns.stream.push(data.subarray(positionInBuf, positionInBuf + bytesToPush))
ns.uploaded += bytesToPush
buf.filled += bytesToPush
positionInBuf += bytesToPush
if (ns.uploaded == fs_obj.config.maxDiscordFileSize)
buf.buffer.splice(0, 1)[0]?.stream.push(null)
if (buf.filled == buf.targetSize) {
this.destroy()
return
}
}
return false
}
}) })
buf.writable = wt; buf.writable = wt;
@ -271,137 +238,6 @@ export default class Files {
} }
/**
* @description Uploads a new file
* @param metadata Settings for your new upload
* @param buffer Buffer containing file content
* @returns Promise which resolves to the ID of the new file
*/
async uploadFile(
metadata: FileUploadSettings,
buffer: Buffer
): Promise<string | StatusCodeError> {
if (!metadata.filename || !metadata.mime)
throw { status: 400, message: "missing filename/mime" }
let uploadId = (metadata.uploadId || generateFileId()).toString()
if (
(uploadId.match(id_check_regex) || [])[0] != uploadId ||
uploadId.length > this.config.maxUploadIdLength
)
throw { status: 400, message: "invalid id" }
if (
this.files[uploadId] &&
(metadata.owner
? this.files[uploadId].owner != metadata.owner
: true)
)
throw {
status: 400,
message: "you are not the owner of this file id",
}
if (this.files[uploadId] && this.files[uploadId].reserved)
throw {
status: 400,
message:
"already uploading this file. if your file is stuck in this state, contact an administrator",
}
if (metadata.filename.length > 128)
throw { status: 400, message: "name too long" }
if (metadata.mime.length > 128)
throw { status: 400, message: "mime too long" }
// reserve file, hopefully should prevent
// large files breaking
let existingFile = this.files[uploadId]
// save
if (metadata.owner) {
await files.index(metadata.owner, uploadId)
}
// get buffer
if (
buffer.byteLength >=
this.config.maxDiscordFileSize * this.config.maxDiscordFiles
)
throw { status: 400, message: "file too large" }
// generate buffers to upload
let toUpload = []
for (
let i = 0;
i < Math.ceil(buffer.byteLength / this.config.maxDiscordFileSize);
i++
) {
toUpload.push(
buffer.subarray(
i * this.config.maxDiscordFileSize,
Math.min(
buffer.byteLength,
(i + 1) * this.config.maxDiscordFileSize
)
)
)
}
// begin uploading
let uploadGroups = []
for (let i = 0; i < Math.ceil(toUpload.length / 10); i++) {
uploadGroups.push(toUpload.slice(i * 10, (i + 1) * 10))
}
let msgIds = []
for (const uploadGroup of uploadGroups) {
let message = await this.api.send(uploadGroup)
if (message) {
msgIds.push(message.id)
} else {
if (!existingFile) delete this.files[uploadId]
else this.files[uploadId] = existingFile
throw { status: 500, message: "please try again" }
}
}
if (existingFile) this.api.deleteMessages(existingFile.messageids)
const { filename, mime, owner } = metadata
this.files[uploadId] = {
filename,
messageids: msgIds,
mime,
owner,
sizeInBytes: buffer.byteLength,
visibility: existingFile
? existingFile.visibility
: metadata.owner
? Accounts.getFromId(metadata.owner)?.defaultFileVisibility
: undefined,
// so that json.stringify doesnt include tag:undefined
...((existingFile || {}).tag ? { tag: existingFile.tag } : {}),
chunkSize: this.config.maxDiscordFileSize,
}
return this.write().then(_ => uploadId).catch(_ => {
delete this.files[uploadId]
throw { status: 500, message: "failed to save database" }
})
}
// fs // fs
/** /**