i have a hunch hono's streaming api buffers

May committed 2024-03-05 22:52:46 -08:00
parent 3bedc21d05
commit 4fdada5cc5
5 changed files with 215 additions and 199 deletions

package-lock.json (generated): 14 lines changed

@@ -22,7 +22,7 @@
         "dotenv": "^16.0.2",
         "express": "^4.18.1",
         "formidable": "^3.5.1",
-        "hono": "^3.8.3",
+        "hono": "^4.0.10",
         "multer": "^1.4.5-lts.1",
         "node-fetch": "^3.3.2",
         "nodemailer": "^6.9.3",
@@ -1227,9 +1227,9 @@
       }
     },
     "node_modules/hono": {
-      "version": "3.8.3",
-      "resolved": "https://registry.npmjs.org/hono/-/hono-3.8.3.tgz",
-      "integrity": "sha512-NLJgUCKKMvijBy+V+U1FQTsNwHk2bD1KGlWJA9+qaCNWgx5St9bhfQwxrpcTGvG2Gi2naemTWCzBavDNXOqO6Q==",
+      "version": "4.0.10",
+      "resolved": "https://registry.npmjs.org/hono/-/hono-4.0.10.tgz",
+      "integrity": "sha512-sq0RFAC3Ij+bkhZu90EGAQnVI1EhohRsjo9BU+BjXLbC71GSy41JjsFqCeg8MRpO2Gdu0A4MXF5licO89tn/rw==",
       "engines": {
         "node": ">=16.0.0"
       }
@@ -2828,9 +2828,9 @@
       "integrity": "sha512-QFLV0taWQOZtvIRIAdBChesmogZrtuXvVWsFHZTk2SU+anspqZ2vMnoLg7IE1+Uk16N19APic1BuF8bC8c2m5g=="
     },
     "hono": {
-      "version": "3.8.3",
-      "resolved": "https://registry.npmjs.org/hono/-/hono-3.8.3.tgz",
-      "integrity": "sha512-NLJgUCKKMvijBy+V+U1FQTsNwHk2bD1KGlWJA9+qaCNWgx5St9bhfQwxrpcTGvG2Gi2naemTWCzBavDNXOqO6Q=="
+      "version": "4.0.10",
+      "resolved": "https://registry.npmjs.org/hono/-/hono-4.0.10.tgz",
+      "integrity": "sha512-sq0RFAC3Ij+bkhZu90EGAQnVI1EhohRsjo9BU+BjXLbC71GSy41JjsFqCeg8MRpO2Gdu0A4MXF5licO89tn/rw=="
     },
     "http-errors": {
       "version": "2.0.0",

package.json

@@ -31,7 +31,7 @@
     "dotenv": "^16.0.2",
     "express": "^4.18.1",
     "formidable": "^3.5.1",
-    "hono": "^3.8.3",
+    "hono": "^4.0.10",
     "multer": "^1.4.5-lts.1",
     "node-fetch": "^3.3.2",
     "nodemailer": "^6.9.3",

ServeError module (filename not captured)

@@ -1,5 +1,6 @@
 import { readFile } from "fs/promises"
 import type { Context } from "hono"
+import type { StatusCode } from "hono/utils/http-status"

 let errorPage: string
@@ -27,7 +28,7 @@ export default async function ServeError(
         errorPage
             .replaceAll("$code", code.toString())
             .replaceAll("$text", reason),
-        code,
+        code as StatusCode,
         {
             "x-backup-status-message": reason, // glitch default nginx configuration
         }
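The `code as StatusCode` cast is needed because Hono v4 types the status argument of `Context.body()` as the `StatusCode` union rather than a plain `number`. A minimal sketch of the same pattern outside the diff; the `sendError` helper and its arguments are illustrative, not from the source:

import type { Context } from "hono"
import type { StatusCode } from "hono/utils/http-status"

// Illustrative helper: `code` arrives as a plain number at runtime,
// so it is cast to Hono's StatusCode union before ctx.body() accepts it.
function sendError(ctx: Context, code: number, reason: string) {
    return ctx.body(reason, code as StatusCode, {
        "x-backup-status-message": reason,
    })
}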

Files library module (filename not captured)

@@ -77,23 +77,6 @@ export interface StatusCodeError {
     message: string
 }

-async function startPushingWebStream(stream: Readable, webStream: ReadableStream) {
-    const reader = await webStream.getReader()
-    let pushing = false // acts as a debounce just in case
-                        // (words of a girl paranoid from writing readfilestream)
-    return function() {
-        if (pushing) return
-        pushing = true
-        return reader.read().then(result => {
-            if (result.value)
-                pushing = false
-            return {readyForMore: result.value ? stream.push(result.value) : false, streamDone: result.done }
-        })
-    }
-}
-
 export class WebError extends Error {
     readonly statusCode: number = 500
@@ -105,6 +88,198 @@ export class WebError extends Error {
 }

+export class ReadStream extends Readable {
+    files: Files
+    pointer: FilePointer
+
+    attachmentBuffer: APIAttachment[] = []
+    msgIdx: number = 0
+    position: number = 0
+
+    ranges: {
+        useRanges: boolean,
+        byteStart: number,
+        byteEnd: number,
+        scan_msg_begin: number,
+        scan_msg_end: number,
+        scan_files_begin: number,
+        scan_files_end: number
+    }
+
+    id: number = Math.random()
+
+    constructor(files: Files, pointer: FilePointer, range?: {start: number, end: number}) {
+        super()
+        console.log(this.id, range)
+        this.files = files
+        this.pointer = pointer
+
+        let useRanges =
+            Boolean(range && pointer.chunkSize && pointer.sizeInBytes)
+
+        this.ranges = {
+            useRanges,
+            scan_msg_begin: 0,
+            scan_msg_end: pointer.messageids.length - 1,
+            scan_files_begin:
+                useRanges
+                    ? Math.floor(range!.start / pointer.chunkSize!)
+                    : 0,
+            scan_files_end:
+                useRanges
+                    ? Math.ceil(range!.end / pointer.chunkSize!) - 1
+                    : -1,
+            byteStart: range?.start || 0,
+            byteEnd: range?.end || 0
+        }
+
+        if (useRanges)
+            this.ranges.scan_msg_begin = Math.floor(this.ranges.scan_files_begin / 10),
+            this.ranges.scan_msg_end = Math.ceil(this.ranges.scan_files_end / 10)-1,
+            this.msgIdx = this.ranges.scan_msg_begin
+    }
+
+    async _read() {
+        if (this.busy) return
+        this.busy = true
+        let readyForMore = true
+        while (readyForMore) {
+            let result = await this.pushData()
+            if (result === undefined) return // stream has been destroyed. nothing left to do...
+            readyForMore = result
+        }
+        this.busy = false
+    }
+
+    async getNextAttachment() {
+        // return first in our attachment buffer
+        let ret = this.attachmentBuffer.splice(0,1)[0]
+        if (ret) return ret
+
+        console.log(this.id, this.msgIdx, this.ranges.scan_msg_end, this.pointer.messageids[this.msgIdx])
+        // oh, there's none left. let's fetch a new message, then.
+        if (
+            !this.pointer.messageids[this.msgIdx]
+            || this.msgIdx > this.ranges.scan_msg_end
+        ) return null
+        console.log('passing')
+        let msg = await this.files.api
+            .fetchMessage(this.pointer.messageids[this.msgIdx])
+            .catch(() => {
+                return null
+            })
+
+        if (msg?.attachments) {
+            let attach = Array.from(msg.attachments.values())
+            this.attachmentBuffer = this.ranges.useRanges ? attach.slice(
+                this.msgIdx == this.ranges.scan_msg_begin
+                    ? this.ranges.scan_files_begin - this.ranges.scan_msg_begin * 10
+                    : 0,
+                this.msgIdx == this.ranges.scan_msg_end
+                    ? this.ranges.scan_files_end - this.ranges.scan_msg_end * 10 + 1
+                    : attach.length
+            ) : attach
+        }
+
+        this.msgIdx++
+        return this.attachmentBuffer.splice(0,1)[0]
+    }
+
+    async getPusherForWebStream(webStream: ReadableStream) {
+        const reader = await webStream.getReader()
+        let pushing = false // acts as a debounce just in case
+                            // (words of a girl paranoid from writing readfilestream)
+        let pushToStream = this.push.bind(this)
+        return function() {
+            //if (pushing) return
+            pushing = true
+            return reader.read().then(result => {
+                let pushed
+                if (!result.done) {
+                    pushing = false
+                    pushed = pushToStream(result.value)
+                }
+                return {readyForMore: pushed || false, streamDone: result.done }
+            })
+        }
+    }
+
+    async getNextChunk() {
+        let scanning_chunk = await this.getNextAttachment()
+        if (!scanning_chunk) return null
+
+        let {
+            byteStart, byteEnd, scan_files_begin, scan_files_end, scan_msg_begin, scan_msg_end
+        } = this.ranges
+
+        let headers: HeadersInit =
+            this.ranges.useRanges
+                ? {
+                    Range: `bytes=${
+                        this.position == 0
+                            ? byteStart - scan_files_begin * this.pointer.chunkSize!
+                            : "0"
+                    }-${
+                        this.attachmentBuffer.length == 0 && this.msgIdx == scan_files_end
+                            ? byteEnd - scan_files_end * this.pointer.chunkSize!
+                            : ""
+                    }`,
+                }
+                : {}
+
+        let response = await fetch(scanning_chunk.url, {headers})
+            .catch((e: Error) => {
+                console.error(e)
+                return {body: e}
+            })
+
+        this.position++
+        return response.body
+    }
+
+    currentPusher?: (() => Promise<{readyForMore: boolean, streamDone: boolean }> | undefined)
+    busy: boolean = false
+
+    async pushData(): Promise<boolean | undefined> {
+        // uh oh, we don't have a currentPusher
+        // let's make one then
+        if (!this.currentPusher) {
+            let next = await this.getNextChunk()
+            if (next && !(next instanceof Error))
+                // okay, so we have a new chunk
+                // let's generate a new currentPusher
+                this.currentPusher = await this.getPusherForWebStream(next)
+            else {
+                // oops, look like there's an error
+                // or the stream has ended.
+                // let's destroy the stream
+                console.log(this.id, "Ending", next)
+                if (next) this.destroy(next); else this.push(null)
+                return
+            }
+        }
+
+        let result = await this.currentPusher()
+        if (result?.streamDone) {
+            this.currentPusher = undefined
+            return this.pushData()
+        } else return result?.readyForMore
+    }
+}
+
 export class UploadStream extends Writable {
     uploadId?: string
@@ -351,35 +526,6 @@ export default class Files {
             .catch(console.error)
     }

-    validateUpload(metadata: FileUploadSettings & { size : number, uploadId: string }) {
-        return multiAssert(
-            new Map()
-                .set(!metadata.filename, {status: 400, message: "missing filename"})
-                .set(metadata.filename.length > 128, {status: 400, message: "filename too long"})
-                .set(!metadata.mime, {status: 400, message: "missing mime type"})
-                .set(metadata.mime.length > 128, {status: 400, message: "mime type too long"})
-                .set(
-                    metadata.uploadId.match(id_check_regex)?.[0] != metadata.uploadId
-                        || metadata.uploadId.length > this.config.maxUploadIdLength,
-                    { status: 400, message: "invalid file ID" }
-                )
-                .set(
-                    this.files[metadata.uploadId] &&
-                        (metadata.owner
-                            ? this.files[metadata.uploadId].owner != metadata.owner
-                            : true),
-                    { status: 403, message: "you don't own this file" }
-                )
-                .set(
-                    this.files[metadata.uploadId]?.reserved,
-                    {
-                        status: 400,
-                        message: "already uploading this file. if your file is stuck in this state, contact an administrator"
-                    }
-                )
-        )
-    }
-
     createWriteStream(owner?: string) {
         return new UploadStream(this, owner)
     }
@@ -435,144 +581,11 @@ export default class Files {
     async readFileStream(
         uploadId: string,
         range?: { start: number; end: number }
-    ): Promise<Readable> {
+    ): Promise<ReadStream> {
         if (this.files[uploadId]) {
             let file = this.files[uploadId]
             if (!file.sizeInBytes || !file.chunkSize) await this.update(uploadId)
+            return new ReadStream(this, file, range)
-
-            let scan_msg_begin = 0,
-                scan_msg_end = file.messageids.length - 1,
-                scan_files_begin = 0,
-                scan_files_end = -1
-
-            let useRanges = range && file.chunkSize && file.sizeInBytes
-
-            // todo: figure out how to get typesccript to accept useRanges
-            // i'm too tired to look it up or write whatever it wnats me to do
-            if (useRanges) {
-                // Calculate where to start file scans...
-                scan_files_begin = Math.floor(range!.start / file.chunkSize!)
-                scan_files_end = Math.ceil(range!.end / file.chunkSize!) - 1
-                scan_msg_begin = Math.floor(scan_files_begin / 10)
-                scan_msg_end = Math.ceil(scan_files_end / 10)
-            }
-
-            let attachments: APIAttachment[] = []
-            let msgIdx = scan_msg_begin
-            let getNextAttachment = async () => {
-                // return first in our attachment buffer
-                let ret = attachments.splice(0,1)[0]
-                if (ret) return ret
-
-                // oh, there's none left. let's fetch a new message, then.
-                if (!file.messageids[msgIdx] || msgIdx > scan_msg_end) return null
-                let msg = await this.api
-                    .fetchMessage(file.messageids[msgIdx])
-                    .catch(() => {
-                        return null
-                    })
-
-                if (msg?.attachments) {
-                    let attach = Array.from(msg.attachments.values())
-                    attachments = useRanges ? attach.slice(
-                        msgIdx == scan_msg_begin
-                            ? scan_files_begin - scan_msg_begin * 10
-                            : 0,
-                        msgIdx == scan_msg_end
-                            ? scan_files_end - scan_msg_end * 10 + 1
-                            : attach.length
-                    ) : attach
-                    console.log(attachments)
-                }
-
-                msgIdx++
-                return attachments.splice(0,1)[0]
-            }
-
-            let position = 0
-            let getNextChunk = async () => {
-                let scanning_chunk = await getNextAttachment()
-                if (!scanning_chunk) return null
-
-                let headers: HeadersInit =
-                    useRanges
-                        ? {
-                            Range: `bytes=${
-                                position == 0
-                                    ? range!.start - scan_files_begin * file.chunkSize!
-                                    : "0"
-                            }-${
-                                position == attachments.length - 1
-                                    ? range!.end - scan_files_end * file.chunkSize!
-                                    : ""
-                            }`,
-                        }
-                        : {}
-
-                let response = await fetch(scanning_chunk.url, {headers})
-                    .catch((e: Error) => {
-                        console.error(e)
-                        return {body: e}
-                    })
-
-                position++
-                return response.body
-            }
-
-            let currentPusher : (() => Promise<{readyForMore: boolean, streamDone: boolean }> | undefined) | undefined
-            let busy = false
-
-            let pushWS : (stream: Readable) => Promise<boolean | undefined> = async (stream: Readable) => {
-                // uh oh, we don't have a currentPusher
-                // let's make one then
-                if (!currentPusher) {
-                    let next = await getNextChunk()
-                    if (next && !(next instanceof Error))
-                        // okay, so we have a new chunk
-                        // let's generate a new currentPusher
-                        currentPusher = await startPushingWebStream(stream, next)
-                    else {
-                        // oops, look like there's an error
-                        // or the stream has ended.
-                        // let's destroy the stream
-                        if (next) stream.destroy(next); else stream.push(null)
-                        return
-                    }
-                }
-
-                let result = await currentPusher()
-                if (result?.streamDone) currentPusher = undefined;
-                return result?.streamDone || result?.readyForMore
-            }
-
-            let dataStream = new Readable({
-                async read() {
-                    if (busy) return
-                    busy = true
-                    let readyForMore = true
-                    while (readyForMore) {
-                        let result = await pushWS(this)
-                        if (result === undefined) return // stream has been destroyed. nothing left to do...
-                        readyForMore = result
-                    }
-                    busy = false
-                }
-            })
-
-            return dataStream
         } else {
             throw { status: 404, message: "not found" }
         }
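The new ReadStream class carries over the scan-window arithmetic from the removed inline implementation: each attachment holds up to chunkSize bytes and each Discord message holds up to 10 attachments, so a byte range is mapped first to attachment indices and then to message indices. A standalone sketch of that mapping, assuming those two constants; the function name and the example numbers are illustrative, not from the source:

// chunkSize: bytes per attachment; 10 attachments per message (as assumed in the diff).
function scanWindow(range: { start: number; end: number }, chunkSize: number) {
    const scan_files_begin = Math.floor(range.start / chunkSize) // first attachment touched
    const scan_files_end = Math.ceil(range.end / chunkSize) - 1  // last attachment touched
    const scan_msg_begin = Math.floor(scan_files_begin / 10)     // first message to fetch
    const scan_msg_end = Math.ceil(scan_files_end / 10) - 1      // last message to fetch (the new constructor's formula)
    return { scan_files_begin, scan_files_end, scan_msg_begin, scan_msg_end }
}

// With an 8 MiB chunkSize, bytes 20 MiB through 30 MiB land in attachments 2..3, both inside message 0:
// scanWindow({ start: 20 * 2 ** 20, end: 30 * 2 ** 20 }, 8 * 2 ** 20)
// => { scan_files_begin: 2, scan_files_end: 3, scan_msg_begin: 0, scan_msg_end: 0 }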

Primary API routes module (filename not captured)

@@ -1,6 +1,6 @@
 import bodyParser from "body-parser"
 import { Hono } from "hono"
+import {stream as startHonoStream} from "hono/streaming"
 import * as Accounts from "../../../lib/accounts.js"
 import * as auth from "../../../lib/auth.js"
 import RangeParser, { type Range } from "range-parser"
@@ -12,6 +12,7 @@ import {ReadableStream as StreamWebReadable} from "node:stream/web"
 import formidable from "formidable"
 import { HttpBindings } from "@hono/node-server"
 import pkg from "../../../../../package.json" assert {type: "json"}
+import { type StatusCode } from "hono/utils/http-status"

 export let primaryApi = new Hono<{
     Variables: {
         account: Accounts.Account
@@ -68,8 +69,6 @@ export default function (files: Files) {
                 }
             }

-            console.log(range)
-
             return files
                 .readFileStream(fileId, range)
                 .then(async (stream) => {
@@ -85,8 +84,11 @@
                         )
                     }

-                    return ctx.req.method == "HEAD" ? ctx.body(null) : ctx.stream(async (webStream) => {
-                        webStream.pipe(Readable.toWeb(stream.on("error", e => {})) as ReadableStream).catch(e => {})
+                    return ctx.req.method == "HEAD" ? ctx.body(null) : startHonoStream(ctx, async (webStream) => {
+                        await webStream.pipe(Readable.toWeb(stream) as ReadableStream)
+                    }, async (err, webStream) => {
+                        console.error(err)
+                        await webStream.close()
                     })
                 })
                 .catch((err) => {
@@ -111,9 +113,9 @@
                     errEscalated = true
                     if ("httpCode" in err)
-                        ctx.status(err.httpCode as number)
+                        ctx.status(err.httpCode as StatusCode)
                     else if (err instanceof WebError)
-                        ctx.status(err.statusCode)
+                        ctx.status(err.statusCode as StatusCode)
                     else ctx.status(400)
                     resolve(ctx.body(err.message))
                 }
@@ -207,7 +209,7 @@
             Readable.fromWeb(res.body as StreamWebReadable)
                 .pipe(file)
-                .on("error", (err) => resolve(ctx.text(err.message, err instanceof WebError ? err.statusCode : 500)))
+                .on("error", (err) => resolve(ctx.text(err.message, err instanceof WebError ? err.statusCode as StatusCode : 500)))

             file
                 .setName(
@@ -222,7 +224,7 @@
             file.once("finish", () => {
                 file.commit()
                     .then(id => resolve(ctx.text(id!)))
-                    .catch((err) => resolve(ctx.text(err.message, err instanceof WebError ? err.statusCode : 500)))
+                    .catch((err) => resolve(ctx.text(err.message, err instanceof WebError ? err.statusCode as StatusCode : 500)))
             })
         })
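The route now builds its response with the stream() helper from hono/streaming (imported above as startHonoStream) instead of the old ctx.stream() method, and passes the helper's error callback as a third argument, mirroring the diff. A minimal standalone sketch of that helper, assuming hono ^4.0.10 on @hono/node-server; the /demo route and its source stream are illustrative, not the project's code:

import { Hono } from "hono"
import { stream } from "hono/streaming"
import { Readable } from "node:stream"

const app = new Hono()

app.get("/demo", (ctx) =>
    // stream() writes the response body from the first callback;
    // the second callback runs if the body stream errors.
    stream(ctx, async (body) => {
        const source = Readable.from(["hello, ", "world\n"]) // stand-in for the file ReadStream
        await body.pipe(Readable.toWeb(source) as ReadableStream)
    }, async (err, body) => {
        console.error(err)
        await body.close()
    })
)

export default app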