almost works

This commit is contained in:
split / May 2024-03-02 01:01:12 -08:00
parent 0a78c197a4
commit daca021522
4 changed files with 85 additions and 42 deletions

View file

@ -50,7 +50,15 @@ program.command("download")
if (fs.existsSync(out) && (await stat(out)).isDirectory()) if (fs.existsSync(out) && (await stat(out)).isDirectory())
out = `${out.replace(/\/+$/, "")}/${fp.filename}` out = `${out.replace(/\/+$/, "")}/${fp.filename}`
;(await files.readFileStream(id)).pipe( let filestream = await files.readFileStream(id)
let prog=0
filestream.on("data", dt => {
prog+=dt.byteLength
console.log(`Downloading ${fp.filename}: ${Math.floor(prog/(fp.sizeInBytes??0)*10000)/100}% (${Math.floor(prog/(1024*1024))}MiB/${Math.floor((fp.sizeInBytes??0)/(1024*1024))}MiB)`)
})
filestream.pipe(
fs.createWriteStream(out) fs.createWriteStream(out)
) )
}) })
@ -71,9 +79,10 @@ program.command("upload")
let writable = files.createWriteStream() let writable = files.createWriteStream()
writable writable
.setName(file) .setName(basename(file))
?.setType("application/octet-stream") ?.setType("application/octet-stream")
?.setUploadId(options.fileId)
if (options.id) writable.setUploadId(options.id)
if (!(writable instanceof Writable)) if (!(writable instanceof Writable))
throw JSON.stringify(writable, null, 3) throw JSON.stringify(writable, null, 3)
@ -84,8 +93,9 @@ program.command("upload")
console.log("Drained"); console.log("Drained");
}) })
writable.on("finish", () => { writable.on("finish", async () => {
console.log("Finished!") console.log("Finished!")
console.log(`ID: ${await writable.commit()}`)
}) })
writable.on("pipe", () => { writable.on("pipe", () => {

View file

@ -1,7 +1,7 @@
import { REST } from "./DiscordRequests.js" import { REST } from "./DiscordRequests.js"
import type { APIMessage } from "discord-api-types/v10" import type { APIMessage } from "discord-api-types/v10"
import FormData from "form-data" import FormData from "form-data"
import { Readable } from "node:stream" import { Transform, type Readable } from "node:stream"
import { Configuration } from "../files.js" import { Configuration } from "../files.js"
const EXPIRE_AFTER = 20 * 60 * 1000 const EXPIRE_AFTER = 20 * 60 * 1000
@ -84,7 +84,13 @@ export class Client {
let boundPush = (stream: Readable, chunk: Buffer) => { let boundPush = (stream: Readable, chunk: Buffer) => {
let position = 0 let position = 0
console.log(`Chunk length ${chunk.byteLength}`) console.log(`Chunk length ${chunk.byteLength}`)
while (position < chunk.byteLength) { while (position < chunk.byteLength) {
if ((bytes_sent % this.config.maxDiscordFileSize) == 0) {
console.log("Progress is 0. Pushing boundary")
pushBoundary(stream)
}
let capture = Math.min( let capture = Math.min(
this.config.maxDiscordFileSize - (bytes_sent % this.config.maxDiscordFileSize) + 1, this.config.maxDiscordFileSize - (bytes_sent % this.config.maxDiscordFileSize) + 1,
chunk.byteLength chunk.byteLength
@ -94,29 +100,24 @@ export class Client {
position += capture, bytes_sent += capture position += capture, bytes_sent += capture
console.log("Chunk progress:", bytes_sent % this.config.maxDiscordFileSize, "B") console.log("Chunk progress:", bytes_sent % this.config.maxDiscordFileSize, "B")
if ((bytes_sent % this.config.maxDiscordFileSize) == 0) {
console.log("Progress is 0. Pushing boundary")
pushBoundary(stream)
}
} }
} }
let transformed = new Readable({ let transformed = new Transform({
read() { transform(chunk, encoding, callback) {
let result = stream.read() boundPush(this, chunk)
if (result) boundPush(this, result) callback()
if (result === null) { },
console.log("Ending") flush(callback) {
this.push(`\n--${boundary}--`) this.push(`\n--${boundary}--`)
this.push(null) callback()
}
} }
}) })
pushBoundary(transformed) //pushBoundary(transformed)
stream.pipe(transformed)
let returned = await this.rest.fetch(`/channels/${this.targetChannel}/messages`, { let returned = await this.rest.fetch(`/channels/${this.targetChannel}/messages`, {
method: "POST", method: "POST",

View file

@ -4,7 +4,7 @@ import crypto from "node:crypto"
import { files } from "./accounts.js" import { files } from "./accounts.js"
import { Client as API } from "./DiscordAPI/index.js" import { Client as API } from "./DiscordAPI/index.js"
import type {APIAttachment} from "discord-api-types/v10" import type {APIAttachment} from "discord-api-types/v10"
import "dotenv" import "dotenv/config"
import * as Accounts from "./accounts.js" import * as Accounts from "./accounts.js"
@ -127,29 +127,46 @@ export class UploadStream extends Writable {
async _write(data: Buffer, encoding: string, callback: () => void) { async _write(data: Buffer, encoding: string, callback: () => void) {
console.log("Write to stream attempted") console.log("Write to stream attempted")
if (filled + data.byteLength > (this.files.config.maxDiscordFileSize*this.files.config.maxDiscordFiles)) if (this.filled + data.byteLength > (this.files.config.maxDiscordFileSize*this.files.config.maxDiscordFiles))
return this.destroy(new WebError(413, "maximum file size exceeded")) return this.destroy(new WebError(413, "maximum file size exceeded"))
// cut up the buffer into message sized chunks // cut up the buffer into message sized chunks
let progress = 0 let position = 0
let readyForMore = false
while (progress < data.byteLength) { while (position < data.byteLength) {
let capture = Math.min( let capture = Math.min(
(this.config.maxDiscordFileSize*10) - (this.filled % (this.config.maxDiscordFileSize*10)) + 1, (this.files.config.maxDiscordFileSize*10) - (this.filled % (this.files.config.maxDiscordFileSize*10)) + 1,
chunk.byteLength data.byteLength
) )
console.log(`Capturing ${capture} bytes, ${chunk.subarray(position, position + capture).byteLength}`) console.log(`Capturing ${capture} bytes from fl, ${data.subarray(position, position + capture).byteLength}`)
let nextStream = await this.getNextStream() if (!this.current) await this.getNextStream()
nextStream.push( chunk.subarray(position, position+capture) ) if (!this.current) {
this.destroy(new Error("getNextStream called during debounce")); return
}
readyForMore = this.current.push( data.subarray(position, position+capture) )
console.log(`pushed ${data.byteLength} byte chunk`); console.log(`pushed ${data.byteLength} byte chunk`);
progress += capture, this.filled += capture position += capture, this.filled += capture
// message is full, so tell the next run to get a new message // message is full, so tell the next run to get a new message
if (this.filled % (this.config.maxDiscordFileSize*10) == 0) if (this.filled % (this.files.config.maxDiscordFileSize*10) == 0) {
this.current!.push(null)
this.current = undefined this.current = undefined
}
} }
if (readyForMore || !this.current) callback()
else this.once("exec-callback", callback)
}
async _final(callback: (error?: Error | null | undefined) => void) {
if (this.current) {
this.current.push(null);
// i probably dnt need this but whateverrr :3
await new Promise((res,rej) => this.once("debounceReleased", res))
}
callback() callback()
} }
@ -174,8 +191,8 @@ export class UploadStream extends Writable {
*/ */
async commit() { async commit() {
if (this.errored) throw this.error if (this.errored) throw this.error
if (!this.closed) { if (!this.writableFinished) {
let err = Error("attempted to commit file without closing the stream") let err = Error("attempted to commit file when the stream was still unfinished")
this.destroy(err); throw err this.destroy(err); throw err
} }
@ -184,9 +201,9 @@ export class UploadStream extends Writable {
if (!this.name) throw new WebError(400, "no filename provided") if (!this.name) throw new WebError(400, "no filename provided")
if (!this.uploadId) this.setUploadId(generateFileId()) if (!this.uploadId) this.setUploadId(generateFileId())
let ogf = this.files.files[this.uploadId] let ogf = this.files.files[this.uploadId!]
this.files.files[this.uploadId] = { this.files.files[this.uploadId!] = {
filename: this.name, filename: this.name,
mime: this.mime, mime: this.mime,
messageids: this.messages, messageids: this.messages,
@ -203,6 +220,9 @@ export class UploadStream extends Writable {
chunkSize: this.files.config.maxDiscordFileSize chunkSize: this.files.config.maxDiscordFileSize
} }
await this.files.write()
return this.uploadId
} }
// exposed methods // exposed methods
@ -214,6 +234,7 @@ export class UploadStream extends Writable {
return this.destroy( new WebError(400, "filename can be a maximum of 512 characters") ) return this.destroy( new WebError(400, "filename can be a maximum of 512 characters") )
this.name = name; this.name = name;
return this
} }
setType(type: string) { setType(type: string) {
@ -223,6 +244,7 @@ export class UploadStream extends Writable {
return this.destroy( new WebError(400, "mime type can be a maximum of 256 characters") ) return this.destroy( new WebError(400, "mime type can be a maximum of 256 characters") )
this.mime = type; this.mime = type;
return this
} }
setUploadId(id: string) { setUploadId(id: string) {
@ -261,8 +283,9 @@ export class UploadStream extends Writable {
let stream = new Readable({ let stream = new Readable({
read() { read() {
console.log("Read called. Emitting drain") // this is stupid but it should work
wrt.emit("drain") console.log("Read called; calling on server to execute callback")
wrt.emit("exec-callback")
} }
}) })
stream.pause() stream.pause()
@ -272,6 +295,7 @@ export class UploadStream extends Writable {
this.messages.push(message.id) this.messages.push(message.id)
console.log(`Sent: ${message.id}`) console.log(`Sent: ${message.id}`)
this.newmessage_debounce = true this.newmessage_debounce = true
this.emit("debounceReleased")
}) })
return stream return stream
@ -280,12 +304,17 @@ export class UploadStream extends Writable {
private async getNextStream() { private async getNextStream() {
console.log("Getting stream...") console.log("Getting stream...")
console.log("current:" + (this.current ? "yes" : "no"))
if (this.current) return this.current if (this.current) return this.current
else { else if (this.newmessage_debounce) {
// startmessage.... idk // startmessage.... idk
this.current = await this.startMessage(); this.current = await this.startMessage();
console.log("current:" + (this.current ? "yes" : "no"))
return this.current return this.current
} else {
return new Promise((resolve, reject) => {
console.log("Waiting for debounce to be released...")
this.once("debounceReleased", async () => resolve(await this.getNextStream()))
})
} }
} }
} }
@ -523,7 +552,7 @@ export default class Files {
let result = await currentPusher() let result = await currentPusher()
if (result?.streamDone) currentPusher = undefined; if (result?.streamDone) currentPusher = undefined;
return result?.readyForMore return result?.streamDone || result?.readyForMore
} }

View file

@ -100,5 +100,8 @@
/* Completeness */ /* Completeness */
// "skipDefaultLibCheck": true, /* Skip type checking .d.ts files that are included with TypeScript. */ // "skipDefaultLibCheck": true, /* Skip type checking .d.ts files that are included with TypeScript. */
"skipLibCheck": true /* Skip type checking all .d.ts files. */ "skipLibCheck": true /* Skip type checking all .d.ts files. */
} },
"references": [
{ "path": "../../tsconfig.json" }
]
} }