readFileStream now respects backpressure more

This commit is contained in:
May 2023-12-23 15:10:55 -08:00
parent 7d18a60589
commit fd8e143e3b

View file

@ -106,9 +106,8 @@ async function startPushingWebStream(stream: Readable, webStream: ReadableStream
return reader.read().then(result => { return reader.read().then(result => {
if (result.value) if (result.value)
stream.push(result.value) pushing = false
pushing = false return {readyForMore: result.value ? stream.push(result.value) : false, streamDone: result.done }
return result.done
}) })
} }
} }
@ -544,7 +543,7 @@ export default class Files {
let d = await fetch(scanning_chunk.url, {headers}) let d = await fetch(scanning_chunk.url, {headers})
.catch((e: Error) => { .catch((e: Error) => {
console.error(e) console.error(e)
return {body: "__ERR"} return {body: e}
}) })
position++ position++
@ -552,35 +551,50 @@ export default class Files {
return d.body return d.body
} }
let ord: number[] = [] let currentPusher : (() => Promise<{readyForMore: boolean, streamDone: boolean }> | undefined) | undefined
// hopefully this regulates it? let busy = false
let lastChunkSent = true
let pushWS : (stream: Readable) => Promise<boolean | undefined> = async (stream: Readable) => {
// uh oh, we don't have a currentPusher
// let's make one then
if (!currentPusher) {
let next = await getNextChunk()
if (next && !(next instanceof Error))
// okay, so we have a new chunk
// let's generate a new currentPusher
currentPusher = await startPushingWebStream(stream, next)
else {
// oops, look like there's an error
// or the stream has ended.
// let's destroy the stream
stream.destroy(next || undefined)
return
}
}
let result = await currentPusher()
if (result?.streamDone) currentPusher = undefined;
return result?.readyForMore
}
let dataStream = new Readable({ let dataStream = new Readable({
read() { async read() {
if (!lastChunkSent) return
lastChunkSent = false
getNextChunk().then(async (nextChunk) => {
if (typeof nextChunk == "string") {
this.destroy(new Error("file read error"))
return
}
if (!nextChunk) return // EOF if (busy) return
busy = true
let readyForMore = true
let response = await pushWebStream(this, nextChunk) while (readyForMore) {
let result = await pushWS(this)
if (result === undefined) return // stream has been destroyed. nothing left to do...
while (response) { readyForMore = result
let nextChunk = await getNextChunk() }
// idk why this line was below but i moved it on top busy = false
// hopefully it wasn't for some other weird reason
if (!nextChunk || typeof nextChunk == "string") return }
response = await pushWebStream(this, nextChunk)
}
lastChunkSent = true
})
},
}) })
return dataStream return dataStream