hopefully implement backpressure

Author: May
Date:   2023-06-24 17:34:12 -07:00
parent 81378fc0dc
commit 486fb6912e

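The diff below swaps an eager download loop (every chunk fetched and pushed into the stream up front) for a pull-based Readable: chunks are only fetched inside read(), and pushing continues only while push() returns true. A minimal sketch of that pattern, using illustrative names (chunkStream, urls) that are not part of this codebase:

    // Sketch only: "chunkStream" and "urls" stand in for the attachment URLs the
    // real code collects; this is not the project's implementation.
    import { Readable } from "node:stream"
    import axios from "axios"

    function chunkStream(urls: string[]): Readable {
        let position = 0

        // Fetch one chunk on demand; resolving null signals end-of-stream.
        const getNextChunk = async (): Promise<Buffer | null> => {
            const url = urls[position]
            if (!url) return null
            position++
            const response = await axios.get(url, { responseType: "arraybuffer" })
            return Buffer.from(response.data)
        }

        return new Readable({
            read() {
                getNextChunk().then(async (chunk) => {
                    // push() returns false once the internal buffer reaches
                    // highWaterMark; stop fetching until read() fires again.
                    let keepReading = this.push(chunk)
                    while (keepReading && chunk !== null) {
                        chunk = await getNextChunk()
                        keepReading = this.push(chunk)
                    }
                }).catch((e) => this.destroy(e as Error))
            }
        })
    }

Like the change below, this sketch does not guard against read() being re-entered while a fetch is still in flight; a hardened version would serialize the getNextChunk() calls.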

@@ -275,12 +275,6 @@ export default class Files {
         if (this.files[uploadId]) {
             let file = this.files[uploadId]
-            let dataStream = new Readable({
-                read(){}
-            })
-            resolve(dataStream)
             let
                 scan_msg_begin = 0,
                 scan_msg_end = file.messageids.length-1,
@@ -303,6 +297,8 @@ export default class Files {
             }
+            let attachments: Discord.Attachment[] = [];
             for (let xi = scan_msg_begin; xi < scan_msg_end+1; xi++) {
                 let msg = await this.uploadChannel.messages.fetch(file.messageids[xi]).catch(() => {return null})
@@ -311,25 +307,7 @@ export default class Files {
                 let attach = Array.from(msg.attachments.values())
                 for (let i = (useRanges && xi == scan_msg_begin ? ( scan_files_begin - (xi*10) ) : 0); i < (useRanges && xi == scan_msg_end ? ( scan_files_end - (xi*10) + 1 ) : attach.length); i++) {
-                    let d = await axios.get(
-                        attach[i].url,
-                        {
-                            responseType:"arraybuffer",
-                            headers: {
-                                ...(useRanges ? {
-                                    "Range": `bytes=${i+(xi*10) == scan_files_begin && range && file.chunkSize ? range.start-(scan_files_begin*file.chunkSize) : "0"}-${i+(xi*10) == scan_files_end && range && file.chunkSize ? range.end-(scan_files_end*file.chunkSize) : ""}`
-                                } : {})
-                            }
-                        }
-                    ).catch((e:Error) => {console.error(e)})
-                    if (d) {
-                        dataStream.push(d.data)
-                    } else {
-                        reject({status:500,message:"internal server error"})
-                        dataStream.destroy(new Error("file read error"))
-                        return
-                    }
+                    attachments.push(attach[i])
                 }
@@ -337,7 +315,50 @@ export default class Files {
             }
-            dataStream.push(null)
+            let position = 0;
+            let getNextChunk = async () => {
+                let scanning_chunk = attachments[position]
+                if (!scanning_chunk) {
+                    return null
+                }
+                let d = await axios.get(
+                    scanning_chunk.url,
+                    {
+                        responseType:"arraybuffer",
+                        headers: {
+                            ...(useRanges ? {
+                                "Range": `bytes=${position == 0 && range && file.chunkSize ? range.start-(scan_files_begin*file.chunkSize) : "0"}-${position == attachments.length-1 && range && file.chunkSize ? range.end-(scan_files_end*file.chunkSize) : ""}`
+                            } : {})
+                        }
+                    }
+                ).catch((e:Error) => {console.error(e)})
+                position++;
+                if (d) {
+                    return d.data
+                } else {
+                    reject({status:500,message:"internal server error"})
+                    return "__ERR"
+                }
+            }
+            let dataStream = new Readable({
+                read(){
+                    getNextChunk().then(async (nextChunk) => {
+                        if (nextChunk == "__ERR") {this.destroy(new Error("file read error")); return}
+                        let response = this.push(nextChunk)
+                        while (response) {
+                            response = this.push(await getNextChunk())
+                        }
+                    })
+                }
+            })
+            resolve(dataStream)
         } else {
             reject({status:404,message:"not found"})
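For context on why the push() return value matters, here is a rough consumer-side sketch, assuming an Express handler and an accessor named readFileStream that resolves with the dataStream built above (both names are hypothetical, not taken from this diff). pipe() pauses the source whenever the response socket stops draining, so only a bounded number of attachment requests run ahead of the client:

    // Hypothetical consumer: "files.readFileStream" is an assumed accessor for the
    // promise that resolves with dataStream; the route and types are illustrative.
    import express from "express"
    import type { Readable } from "node:stream"

    declare const files: {
        readFileStream: (uploadId: string, range?: { start: number; end: number }) => Promise<Readable>
    }

    const app = express()

    app.get("/download/:id", (req, res) => {
        files.readFileStream(req.params.id)
            .then((stream) => {
                // pipe() propagates backpressure: while res is not draining, the
                // Readable's read() is not called, so no further chunks are fetched.
                stream.pipe(res)
            })
            .catch((err: { status: number; message: string }) => {
                // Matches the {status, message} rejection shape used in the diff.
                res.status(err.status).send(err.message)
            })
    })

    app.listen(3000)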