From 4fdada5cc5fc5ce058f384a6c1aff5ec7b8bcdd5 Mon Sep 17 00:00:00 2001 From: stringsplit <77242831+nbitzz@users.noreply.github.com> Date: Tue, 5 Mar 2024 22:52:46 -0800 Subject: [PATCH] i have a hunch hono's streaming api buffers --- package-lock.json | 14 +- package.json | 2 +- src/server/lib/errors.ts | 3 +- src/server/lib/files.ts | 375 +++++++++++++------------ src/server/routes/api/v0/primaryApi.ts | 20 +- 5 files changed, 215 insertions(+), 199 deletions(-) diff --git a/package-lock.json b/package-lock.json index 2a9c518..00663eb 100644 --- a/package-lock.json +++ b/package-lock.json @@ -22,7 +22,7 @@ "dotenv": "^16.0.2", "express": "^4.18.1", "formidable": "^3.5.1", - "hono": "^3.8.3", + "hono": "^4.0.10", "multer": "^1.4.5-lts.1", "node-fetch": "^3.3.2", "nodemailer": "^6.9.3", @@ -1227,9 +1227,9 @@ } }, "node_modules/hono": { - "version": "3.8.3", - "resolved": "https://registry.npmjs.org/hono/-/hono-3.8.3.tgz", - "integrity": "sha512-NLJgUCKKMvijBy+V+U1FQTsNwHk2bD1KGlWJA9+qaCNWgx5St9bhfQwxrpcTGvG2Gi2naemTWCzBavDNXOqO6Q==", + "version": "4.0.10", + "resolved": "https://registry.npmjs.org/hono/-/hono-4.0.10.tgz", + "integrity": "sha512-sq0RFAC3Ij+bkhZu90EGAQnVI1EhohRsjo9BU+BjXLbC71GSy41JjsFqCeg8MRpO2Gdu0A4MXF5licO89tn/rw==", "engines": { "node": ">=16.0.0" } @@ -2828,9 +2828,9 @@ "integrity": "sha512-QFLV0taWQOZtvIRIAdBChesmogZrtuXvVWsFHZTk2SU+anspqZ2vMnoLg7IE1+Uk16N19APic1BuF8bC8c2m5g==" }, "hono": { - "version": "3.8.3", - "resolved": "https://registry.npmjs.org/hono/-/hono-3.8.3.tgz", - "integrity": "sha512-NLJgUCKKMvijBy+V+U1FQTsNwHk2bD1KGlWJA9+qaCNWgx5St9bhfQwxrpcTGvG2Gi2naemTWCzBavDNXOqO6Q==" + "version": "4.0.10", + "resolved": "https://registry.npmjs.org/hono/-/hono-4.0.10.tgz", + "integrity": "sha512-sq0RFAC3Ij+bkhZu90EGAQnVI1EhohRsjo9BU+BjXLbC71GSy41JjsFqCeg8MRpO2Gdu0A4MXF5licO89tn/rw==" }, "http-errors": { "version": "2.0.0", diff --git a/package.json b/package.json index 86dd14f..ac81e1e 100644 --- a/package.json +++ b/package.json @@ -31,7 +31,7 @@ "dotenv": "^16.0.2", "express": "^4.18.1", "formidable": "^3.5.1", - "hono": "^3.8.3", + "hono": "^4.0.10", "multer": "^1.4.5-lts.1", "node-fetch": "^3.3.2", "nodemailer": "^6.9.3", diff --git a/src/server/lib/errors.ts b/src/server/lib/errors.ts index e31c931..f2d3615 100644 --- a/src/server/lib/errors.ts +++ b/src/server/lib/errors.ts @@ -1,5 +1,6 @@ import { readFile } from "fs/promises" import type { Context } from "hono" +import type { StatusCode } from "hono/utils/http-status" let errorPage: string @@ -27,7 +28,7 @@ export default async function ServeError( errorPage .replaceAll("$code", code.toString()) .replaceAll("$text", reason), - code, + code as StatusCode, { "x-backup-status-message": reason, // glitch default nginx configuration } diff --git a/src/server/lib/files.ts b/src/server/lib/files.ts index d740b29..002be9c 100644 --- a/src/server/lib/files.ts +++ b/src/server/lib/files.ts @@ -77,23 +77,6 @@ export interface StatusCodeError { message: string } -async function startPushingWebStream(stream: Readable, webStream: ReadableStream) { - const reader = await webStream.getReader() - let pushing = false // acts as a debounce just in case - // (words of a girl paranoid from writing readfilestream) - - return function() { - if (pushing) return - pushing = true - - return reader.read().then(result => { - if (result.value) - pushing = false - return {readyForMore: result.value ? stream.push(result.value) : false, streamDone: result.done } - }) - } -} - export class WebError extends Error { readonly statusCode: number = 500 @@ -105,6 +88,198 @@ export class WebError extends Error { } +export class ReadStream extends Readable { + + files: Files + pointer: FilePointer + + attachmentBuffer: APIAttachment[] = [] + msgIdx: number = 0 + position: number = 0 + + ranges: { + useRanges: boolean, + byteStart: number, + byteEnd: number + scan_msg_begin: number, + scan_msg_end: number, + scan_files_begin: number, + scan_files_end: number + } + + id: number = Math.random() + + constructor(files: Files, pointer: FilePointer, range?: {start: number, end: number}) { + super() + console.log(this.id, range) + this.files = files + this.pointer = pointer + + let useRanges = + Boolean(range && pointer.chunkSize && pointer.sizeInBytes) + + this.ranges = { + useRanges, + scan_msg_begin: 0, + scan_msg_end: pointer.messageids.length - 1, + scan_files_begin: + useRanges + ? Math.floor(range!.start / pointer.chunkSize!) + : 0, + scan_files_end: + useRanges + ? Math.ceil(range!.end / pointer.chunkSize!) - 1 + : -1, + byteStart: range?.start || 0, + byteEnd: range?.end || 0 + } + + if (useRanges) + this.ranges.scan_msg_begin = Math.floor(this.ranges.scan_files_begin / 10), + this.ranges.scan_msg_end = Math.ceil(this.ranges.scan_files_end / 10)-1, + this.msgIdx = this.ranges.scan_msg_begin + } + + async _read() { + if (this.busy) return + this.busy = true + let readyForMore = true + + while (readyForMore) { + let result = await this.pushData() + if (result === undefined) return // stream has been destroyed. nothing left to do... + readyForMore = result + } + this.busy = false + } + + async getNextAttachment() { + // return first in our attachment buffer + let ret = this.attachmentBuffer.splice(0,1)[0] + if (ret) return ret + + console.log(this.id, this.msgIdx, this.ranges.scan_msg_end, this.pointer.messageids[this.msgIdx]) + + // oh, there's none left. let's fetch a new message, then. + if ( + !this.pointer.messageids[this.msgIdx] + || this.msgIdx > this.ranges.scan_msg_end + ) return null + + console.log('passing') + + let msg = await this.files.api + .fetchMessage(this.pointer.messageids[this.msgIdx]) + .catch(() => { + return null + }) + + if (msg?.attachments) { + let attach = Array.from(msg.attachments.values()) + + this.attachmentBuffer = this.ranges.useRanges ? attach.slice( + this.msgIdx == this.ranges.scan_msg_begin + ? this.ranges.scan_files_begin - this.ranges.scan_msg_begin * 10 + : 0, + this.msgIdx == this.ranges.scan_msg_end + ? this.ranges.scan_files_end - this.ranges.scan_msg_end * 10 + 1 + : attach.length + ) : attach + } + + this.msgIdx++ + return this.attachmentBuffer.splice(0,1)[0] + } + + async getPusherForWebStream(webStream: ReadableStream) { + const reader = await webStream.getReader() + let pushing = false // acts as a debounce just in case + // (words of a girl paranoid from writing readfilestream) + + let pushToStream = this.push.bind(this) + + return function() { + //if (pushing) return + pushing = true + + return reader.read().then(result => { + let pushed + if (!result.done) { + pushing = false + pushed = pushToStream(result.value) + } + return {readyForMore: pushed || false, streamDone: result.done } + }) + } + } + + async getNextChunk() { + let scanning_chunk = await this.getNextAttachment() + if (!scanning_chunk) return null + + let { + byteStart, byteEnd, scan_files_begin, scan_files_end, scan_msg_begin, scan_msg_end + } = this.ranges + + let headers: HeadersInit = + this.ranges.useRanges + ? { + Range: `bytes=${ + this.position == 0 + ? byteStart - scan_files_begin * this.pointer.chunkSize! + : "0" + }-${ + this.attachmentBuffer.length == 0 && this.msgIdx == scan_files_end + ? byteEnd - scan_files_end * this.pointer.chunkSize! + : "" + }`, + } + : {} + + let response = await fetch(scanning_chunk.url, {headers}) + .catch((e: Error) => { + console.error(e) + return {body: e} + }) + + this.position++ + + return response.body + } + + currentPusher?: (() => Promise<{readyForMore: boolean, streamDone: boolean }> | undefined) + busy: boolean = false + + async pushData(): Promise { + + // uh oh, we don't have a currentPusher + // let's make one then + if (!this.currentPusher) { + let next = await this.getNextChunk() + if (next && !(next instanceof Error)) + // okay, so we have a new chunk + // let's generate a new currentPusher + this.currentPusher = await this.getPusherForWebStream(next) + else { + // oops, look like there's an error + // or the stream has ended. + // let's destroy the stream + console.log(this.id, "Ending", next) + if (next) this.destroy(next); else this.push(null) + return + } + } + + let result = await this.currentPusher() + + if (result?.streamDone) { + this.currentPusher = undefined + return this.pushData() + } else return result?.readyForMore + + } +} + export class UploadStream extends Writable { uploadId?: string @@ -351,35 +526,6 @@ export default class Files { .catch(console.error) } - validateUpload(metadata: FileUploadSettings & { size : number, uploadId: string }) { - return multiAssert( - new Map() - .set(!metadata.filename, {status: 400, message: "missing filename"}) - .set(metadata.filename.length > 128, {status: 400, message: "filename too long"}) - .set(!metadata.mime, {status: 400, message: "missing mime type"}) - .set(metadata.mime.length > 128, {status: 400, message: "mime type too long"}) - .set( - metadata.uploadId.match(id_check_regex)?.[0] != metadata.uploadId - || metadata.uploadId.length > this.config.maxUploadIdLength, - { status: 400, message: "invalid file ID" } - ) - .set( - this.files[metadata.uploadId] && - (metadata.owner - ? this.files[metadata.uploadId].owner != metadata.owner - : true), - { status: 403, message: "you don't own this file" } - ) - .set( - this.files[metadata.uploadId]?.reserved, - { - status: 400, - message: "already uploading this file. if your file is stuck in this state, contact an administrator" - } - ) - ) - } - createWriteStream(owner?: string) { return new UploadStream(this, owner) } @@ -435,144 +581,11 @@ export default class Files { async readFileStream( uploadId: string, range?: { start: number; end: number } - ): Promise { + ): Promise { if (this.files[uploadId]) { let file = this.files[uploadId] if (!file.sizeInBytes || !file.chunkSize) await this.update(uploadId) - - let scan_msg_begin = 0, - scan_msg_end = file.messageids.length - 1, - scan_files_begin = 0, - scan_files_end = -1 - - let useRanges = range && file.chunkSize && file.sizeInBytes - - // todo: figure out how to get typesccript to accept useRanges - // i'm too tired to look it up or write whatever it wnats me to do - if (useRanges) { - // Calculate where to start file scans... - - scan_files_begin = Math.floor(range!.start / file.chunkSize!) - scan_files_end = Math.ceil(range!.end / file.chunkSize!) - 1 - - scan_msg_begin = Math.floor(scan_files_begin / 10) - scan_msg_end = Math.ceil(scan_files_end / 10) - } - - let attachments: APIAttachment[] = [] - - let msgIdx = scan_msg_begin - - let getNextAttachment = async () => { - // return first in our attachment buffer - let ret = attachments.splice(0,1)[0] - if (ret) return ret - - // oh, there's none left. let's fetch a new message, then. - if (!file.messageids[msgIdx] || msgIdx > scan_msg_end) return null - let msg = await this.api - .fetchMessage(file.messageids[msgIdx]) - .catch(() => { - return null - }) - - if (msg?.attachments) { - let attach = Array.from(msg.attachments.values()) - - attachments = useRanges ? attach.slice( - msgIdx == scan_msg_begin - ? scan_files_begin - scan_msg_begin * 10 - : 0, - msgIdx == scan_msg_end - ? scan_files_end - scan_msg_end * 10 + 1 - : attach.length - ) : attach - console.log(attachments) - } - - msgIdx++ - return attachments.splice(0,1)[0] - } - - let position = 0 - - let getNextChunk = async () => { - let scanning_chunk = await getNextAttachment() - if (!scanning_chunk) return null - - let headers: HeadersInit = - useRanges - ? { - Range: `bytes=${ - position == 0 - ? range!.start - scan_files_begin * file.chunkSize! - : "0" - }-${ - position == attachments.length - 1 - ? range!.end - scan_files_end * file.chunkSize! - : "" - }`, - } - : {} - - let response = await fetch(scanning_chunk.url, {headers}) - .catch((e: Error) => { - console.error(e) - return {body: e} - }) - - position++ - - return response.body - } - - let currentPusher : (() => Promise<{readyForMore: boolean, streamDone: boolean }> | undefined) | undefined - let busy = false - - let pushWS : (stream: Readable) => Promise = async (stream: Readable) => { - - // uh oh, we don't have a currentPusher - // let's make one then - if (!currentPusher) { - let next = await getNextChunk() - if (next && !(next instanceof Error)) - // okay, so we have a new chunk - // let's generate a new currentPusher - currentPusher = await startPushingWebStream(stream, next) - else { - // oops, look like there's an error - // or the stream has ended. - // let's destroy the stream - if (next) stream.destroy(next); else stream.push(null) - return - } - } - - let result = await currentPusher() - - if (result?.streamDone) currentPusher = undefined; - return result?.streamDone || result?.readyForMore - - } - - let dataStream = new Readable({ - async read() { - - if (busy) return - busy = true - let readyForMore = true - - while (readyForMore) { - let result = await pushWS(this) - if (result === undefined) return // stream has been destroyed. nothing left to do... - readyForMore = result - } - busy = false - - } - }) - - return dataStream + return new ReadStream(this, file, range) } else { throw { status: 404, message: "not found" } } diff --git a/src/server/routes/api/v0/primaryApi.ts b/src/server/routes/api/v0/primaryApi.ts index 183e30c..3e6376e 100644 --- a/src/server/routes/api/v0/primaryApi.ts +++ b/src/server/routes/api/v0/primaryApi.ts @@ -1,6 +1,6 @@ import bodyParser from "body-parser" import { Hono } from "hono" - +import {stream as startHonoStream} from "hono/streaming" import * as Accounts from "../../../lib/accounts.js" import * as auth from "../../../lib/auth.js" import RangeParser, { type Range } from "range-parser" @@ -12,6 +12,7 @@ import {ReadableStream as StreamWebReadable} from "node:stream/web" import formidable from "formidable" import { HttpBindings } from "@hono/node-server" import pkg from "../../../../../package.json" assert {type: "json"} +import { type StatusCode } from "hono/utils/http-status" export let primaryApi = new Hono<{ Variables: { account: Accounts.Account @@ -68,8 +69,6 @@ export default function (files: Files) { } } - console.log(range) - return files .readFileStream(fileId, range) .then(async (stream) => { @@ -85,8 +84,11 @@ export default function (files: Files) { ) } - return ctx.req.method == "HEAD" ? ctx.body(null) : ctx.stream(async (webStream) => { - webStream.pipe(Readable.toWeb(stream.on("error", e => {})) as ReadableStream).catch(e => {}) + return ctx.req.method == "HEAD" ? ctx.body(null) : startHonoStream(ctx, async (webStream) => { + await webStream.pipe(Readable.toWeb(stream) as ReadableStream) + }, async (err, webStream) => { + console.error(err) + await webStream.close() }) }) .catch((err) => { @@ -111,9 +113,9 @@ export default function (files: Files) { errEscalated = true if ("httpCode" in err) - ctx.status(err.httpCode as number) + ctx.status(err.httpCode as StatusCode) else if (err instanceof WebError) - ctx.status(err.statusCode) + ctx.status(err.statusCode as StatusCode) else ctx.status(400) resolve(ctx.body(err.message)) } @@ -207,7 +209,7 @@ export default function (files: Files) { Readable.fromWeb(res.body as StreamWebReadable) .pipe(file) - .on("error", (err) => resolve(ctx.text(err.message, err instanceof WebError ? err.statusCode : 500))) + .on("error", (err) => resolve(ctx.text(err.message, err instanceof WebError ? err.statusCode as StatusCode : 500))) file .setName( @@ -222,7 +224,7 @@ export default function (files: Files) { file.once("finish", () => { file.commit() .then(id => resolve(ctx.text(id!))) - .catch((err) => resolve(ctx.text(err.message, err instanceof WebError ? err.statusCode : 500))) + .catch((err) => resolve(ctx.text(err.message, err instanceof WebError ? err.statusCode as StatusCode : 500))) }) })