My goddd you konw what that owrks well enough

This commit is contained in:
split / May 2024-03-06 17:49:23 -08:00
parent 70eace9de4
commit 7df1c8190a
3 changed files with 51 additions and 16 deletions

View file

@ -1,9 +1,9 @@
{ {
"maxDiscordFiles": 500, "maxDiscordFiles": 1000,
"maxDiscordFileSize": 10485760, "maxDiscordFileSize": 10485760,
"targetGuild": "906767804575928390", "targetGuild": "906767804575928390",
"targetChannel": "1024080525993971913", "targetChannel": "1024080525993971913",
"requestTimeout": 1800000, "requestTimeout": 3600000,
"maxUploadIdLength": 30, "maxUploadIdLength": 30,
"accounts": { "accounts": {
"registrationEnabled": true, "registrationEnabled": true,

View file

@ -108,6 +108,7 @@ export class ReadStream extends Readable {
} }
id: number = Math.random() id: number = Math.random()
aborter?: AbortController
constructor(files: Files, pointer: FilePointer, range?: {start: number, end: number}) { constructor(files: Files, pointer: FilePointer, range?: {start: number, end: number}) {
super() super()
@ -140,7 +141,8 @@ export class ReadStream extends Readable {
this.msgIdx = this.ranges.scan_msg_begin this.msgIdx = this.ranges.scan_msg_begin
} }
async _read() { async _read() {/*
console.log("Calling for more data")
if (this.busy) return if (this.busy) return
this.busy = true this.busy = true
let readyForMore = true let readyForMore = true
@ -150,7 +152,14 @@ export class ReadStream extends Readable {
if (result === undefined) return // stream has been destroyed. nothing left to do... if (result === undefined) return // stream has been destroyed. nothing left to do...
readyForMore = result readyForMore = result
} }
this.busy = false this.busy = false*/
this.pushData()
}
async _destroy(error: Error | null, callback: (error?: Error | null | undefined) => void): Promise<void> {
if (this.aborter)
this.aborter.abort()
callback()
} }
async getNextAttachment() { async getNextAttachment() {
@ -166,8 +175,6 @@ export class ReadStream extends Readable {
|| this.msgIdx > this.ranges.scan_msg_end || this.msgIdx > this.ranges.scan_msg_end
) return null ) return null
console.log('passing')
let msg = await this.files.api let msg = await this.files.api
.fetchMessage(this.pointer.messageids[this.msgIdx]) .fetchMessage(this.pointer.messageids[this.msgIdx])
.catch(() => { .catch(() => {
@ -175,7 +182,7 @@ export class ReadStream extends Readable {
}) })
if (msg?.attachments) { if (msg?.attachments) {
let attach = Array.from(msg.attachments.values()) let attach = msg.attachments
this.attachmentBuffer = this.ranges.useRanges ? attach.slice( this.attachmentBuffer = this.ranges.useRanges ? attach.slice(
this.msgIdx == this.ranges.scan_msg_begin this.msgIdx == this.ranges.scan_msg_begin
@ -197,12 +204,21 @@ export class ReadStream extends Readable {
// (words of a girl paranoid from writing readfilestream) // (words of a girl paranoid from writing readfilestream)
let pushToStream = this.push.bind(this) let pushToStream = this.push.bind(this)
let stream = this
return function() { return function() {
//if (pushing) return if (pushing) return
pushing = true pushing = true
return reader.read().then(result => { return reader.read().catch(e => {
// Probably means an AbortError; whatever it is we'll need to abort
if (webStream.locked) reader.releaseLock()
webStream.cancel().catch(e => undefined)
if (!stream.destroyed) stream.destroy()
return e
}).then(result => {
if (result instanceof Error || !result) return result
let pushed let pushed
if (!result.done) { if (!result.done) {
pushing = false pushing = false
@ -214,6 +230,7 @@ export class ReadStream extends Readable {
} }
async getNextChunk() { async getNextChunk() {
console.log("Next chunk requested")
let scanning_chunk = await this.getNextAttachment() let scanning_chunk = await this.getNextAttachment()
if (!scanning_chunk) return null if (!scanning_chunk) return null
@ -236,7 +253,9 @@ export class ReadStream extends Readable {
} }
: {} : {}
let response = await fetch(scanning_chunk.url, {headers}) this.aborter = new AbortController()
let response = await fetch(scanning_chunk.url, {headers, signal: this.aborter.signal})
.catch((e: Error) => { .catch((e: Error) => {
console.error(e) console.error(e)
return {body: e} return {body: e}
@ -247,7 +266,7 @@ export class ReadStream extends Readable {
return response.body return response.body
} }
currentPusher?: (() => Promise<{readyForMore: boolean, streamDone: boolean }> | undefined) currentPusher?: (() => Promise<{readyForMore: boolean, streamDone: boolean } | void> | undefined)
busy: boolean = false busy: boolean = false
async pushData(): Promise<boolean | undefined> { async pushData(): Promise<boolean | undefined> {
@ -256,11 +275,11 @@ export class ReadStream extends Readable {
// let's make one then // let's make one then
if (!this.currentPusher) { if (!this.currentPusher) {
let next = await this.getNextChunk() let next = await this.getNextChunk()
if (next && !(next instanceof Error)) if (next && !(next instanceof Error)) {
// okay, so we have a new chunk // okay, so we have a new chunk
// let's generate a new currentPusher // let's generate a new currentPusher
this.currentPusher = await this.getPusherForWebStream(next) this.currentPusher = await this.getPusherForWebStream(next)
else { } else {
// oops, look like there's an error // oops, look like there's an error
// or the stream has ended. // or the stream has ended.
// let's destroy the stream // let's destroy the stream
@ -273,6 +292,7 @@ export class ReadStream extends Readable {
let result = await this.currentPusher() let result = await this.currentPusher()
if (result?.streamDone) { if (result?.streamDone) {
this.aborter = undefined
this.currentPusher = undefined this.currentPusher = undefined
return this.pushData() return this.pushData()
} else return result?.readyForMore } else return result?.readyForMore
@ -344,9 +364,11 @@ export class UploadStream extends Writable {
callback() callback()
} }
_destroy(error: Error | null, callback: (err?: Error|null) => void) { aborted: boolean = false
async _destroy(error: Error | null, callback: (err?: Error|null) => void) {
this.error = error || undefined this.error = error || undefined
if (error) this.abort() await this.abort()
callback() callback()
} }
@ -354,6 +376,8 @@ export class UploadStream extends Writable {
* @description Cancel & unlock the file. When destroy() is called with a non-WebError, this is automatically called * @description Cancel & unlock the file. When destroy() is called with a non-WebError, this is automatically called
*/ */
async abort() { async abort() {
if (this.aborted) return
this.aborted = true
console.log("Aborting") console.log("Aborting")
if (!this.destroyed) this.destroy() if (!this.destroyed) this.destroy()
if (this.current) this.current.destroy(this.error) if (this.current) this.current.destroy(this.error)

View file

@ -13,6 +13,7 @@ import formidable from "formidable"
import { HttpBindings } from "@hono/node-server" import { HttpBindings } from "@hono/node-server"
import pkg from "../../../../../package.json" assert {type: "json"} import pkg from "../../../../../package.json" assert {type: "json"}
import { type StatusCode } from "hono/utils/http-status" import { type StatusCode } from "hono/utils/http-status"
import { EventEmitter } from "node:events"
export let primaryApi = new Hono<{ export let primaryApi = new Hono<{
Variables: { Variables: {
account: Accounts.Account account: Accounts.Account
@ -87,7 +88,17 @@ export default function (files: Files) {
return files return files
.readFileStream(fileId, range) .readFileStream(fileId, range)
.then(async (stream) => { .then(async (stream) => {
return new Response(Readable.toWeb(stream) as ReadableStream, ctx.res) let rs = new ReadableStream({
start(controller) {
stream.once("end", () => controller.close())
stream.once("error", (err) => controller.error(err))
},
cancel(reason) {
stream.destroy(reason instanceof Error ? reason : new Error(reason))
}
})
stream.pipe(ctx.env.outgoing)
return new Response(rs, ctx.body(null))
}) })
.catch((err) => { .catch((err) => {
return ServeError(ctx, err.status, err.message) return ServeError(ctx, err.status, err.message)