From e78c61b410af4c2eceea4625f1bd1d800b0e26fe Mon Sep 17 00:00:00 2001 From: G2 Date: Sun, 3 Nov 2024 05:47:26 -0600 Subject: [PATCH] Chunked uploads (#2) * Switch database to begin using CBOR, and begin work on chunked uploads * Chunked uploads work, no feedback to client yet * Added poorly implemented progress for chunked uploads * I think these methods are cursed * Improved performance by properly limiting the number of uploads * Properly show errors in file bars again * Improved setup for chunk downloading, added chunk size to config * Ran clippy, added chunk cleaner * Ran `cargo fmt` --- Cargo.toml | 4 +- src/database.rs | 106 ++++++++++++------ src/lib.rs | 285 +++++++++++++++++++++++++++++------------------- src/main.rs | 67 ++++++++++-- src/pages.rs | 2 + src/settings.rs | 13 ++- src/utils.rs | 2 +- web/request.js | 224 ++++++++++++++++++++++--------------- 8 files changed, 456 insertions(+), 247 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 551ba3c..e8740c9 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -4,9 +4,9 @@ version = "0.1.2" edition = "2021" [dependencies] -bincode = { version = "2.0.0-rc.3", features = ["serde"] } blake3 = { version = "1.5.4", features = ["mmap", "rayon", "serde"] } chrono = { version = "0.4.38", features = ["serde"] } +ciborium = "0.2.2" file-format = { version = "0.25.0", features = ["reader"] } log = "0.4" lz4_flex = "0.11.3" @@ -16,7 +16,7 @@ rocket = { version = "0.5", features = ["json"] } serde = { version = "1.0.213", features = ["derive"] } serde_with = { version = "3.11.0", features = ["chrono_0_4"] } toml = "0.8.19" -uuid = { version = "1.11.0", features = ["v4"] } +uuid = { version = "1.11.0", features = ["serde", "v4"] } [profile.production] inherits = "release" diff --git a/src/database.rs b/src/database.rs index 09df022..34f846f 100644 --- a/src/database.rs +++ b/src/database.rs @@ -7,33 +7,32 @@ use std::{ sync::{Arc, RwLock}, }; -use bincode::{config::Configuration, decode_from_std_read, encode_into_std_write, Decode, Encode}; use blake3::Hash; use chrono::{DateTime, TimeDelta, Utc}; +use ciborium::{from_reader, into_writer}; use log::{error, info, warn}; use rand::distributions::{Alphanumeric, DistString}; use rocket::{ + form::{self, FromFormField, ValueField}, serde::{Deserialize, Serialize}, - tokio::{select, sync::mpsc::Receiver, time}, }; use serde_with::{serde_as, DisplayFromStr}; +use uuid::Uuid; -const BINCODE_CFG: Configuration = bincode::config::standard(); - -#[derive(Debug, Clone, Decode, Encode)] +#[derive(Debug, Clone, Deserialize, Serialize)] pub struct Mochibase { path: PathBuf, /// Every hash in the database along with the [`Mmid`]s associated with them - #[bincode(with_serde)] hashes: HashMap>, /// All entries in the database - #[bincode(with_serde)] entries: HashMap, } impl Mochibase { + /// Create a new database initialized with no data, and save it to the + /// provided path pub fn new>(path: &P) -> Result { let output = Self { path: path.as_ref().to_path_buf(), @@ -52,7 +51,7 @@ impl Mochibase { let file = File::open(path)?; let mut lz4_file = lz4_flex::frame::FrameDecoder::new(file); - decode_from_std_read(&mut lz4_file, BINCODE_CFG) + from_reader(&mut lz4_file) .map_err(|e| io::Error::other(format!("failed to open database: {e}"))) } @@ -70,7 +69,7 @@ impl Mochibase { // Create a file and write the LZ4 compressed stream into it let file = File::create(self.path.with_extension("bkp"))?; let mut lz4_file = lz4_flex::frame::FrameEncoder::new(file); - encode_into_std_write(self, &mut lz4_file, BINCODE_CFG) + 
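Note on the serialization change above: the on-disk database moves from bincode to ciborium (CBOR) while keeping the LZ4 frame wrapper. A minimal sketch of that round-trip, using a stand-in `Example` struct rather than the real `Mochibase`:

    use std::{fs::File, io};
    use serde::{Deserialize, Serialize};

    #[derive(Serialize, Deserialize)]
    struct Example { entries: Vec<String> }

    // Write CBOR through an LZ4 frame encoder, mirroring `Mochibase::save`
    fn save(value: &Example, path: &str) -> Result<(), io::Error> {
        let file = File::create(path)?;
        let mut lz4 = lz4_flex::frame::FrameEncoder::new(file);
        ciborium::into_writer(value, &mut lz4)
            .map_err(|e| io::Error::other(format!("failed to save: {e}")))?;
        lz4.try_finish()?;
        Ok(())
    }

    // Read it back, mirroring `Mochibase::open`
    fn open(path: &str) -> Result<Example, io::Error> {
        let file = File::open(path)?;
        let mut lz4 = lz4_flex::frame::FrameDecoder::new(file);
        ciborium::from_reader(&mut lz4)
            .map_err(|e| io::Error::other(format!("failed to open: {e}")))
    }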
into_writer(self, &mut lz4_file) .map_err(|e| io::Error::other(format!("failed to save database: {e}")))?; lz4_file.try_finish()?; @@ -154,7 +153,7 @@ impl Mochibase { /// An entry in the database storing metadata about a file #[serde_as] -#[derive(Debug, Clone, Decode, Encode, Deserialize, Serialize)] +#[derive(Debug, Clone, Deserialize, Serialize)] pub struct MochiFile { /// A unique identifier describing this file mmid: Mmid, @@ -166,16 +165,13 @@ pub struct MochiFile { mime_type: String, /// The Blake3 hash of the file - #[bincode(with_serde)] #[serde_as(as = "DisplayFromStr")] hash: Hash, /// The datetime when the file was uploaded - #[bincode(with_serde)] upload_datetime: DateTime, /// The datetime when the file is set to expire - #[bincode(with_serde)] expiry_datetime: DateTime, } @@ -227,7 +223,7 @@ impl MochiFile { /// Clean the database. Removes files which are past their expiry /// [`chrono::DateTime`]. Also removes files which no longer exist on the disk. -fn clean_database(db: &Arc>, file_path: &Path) { +pub fn clean_database(db: &Arc>, file_path: &Path) { let mut database = db.write().unwrap(); // Add expired entries to the removal list @@ -266,26 +262,9 @@ fn clean_database(db: &Arc>, file_path: &Path) { drop(database); // Just to be sure } -/// A loop to clean the database periodically. -pub async fn clean_loop( - db: Arc>, - file_path: PathBuf, - mut shutdown_signal: Receiver<()>, - interval: TimeDelta, -) { - let mut interval = time::interval(interval.to_std().unwrap()); - - loop { - select! { - _ = interval.tick() => clean_database(&db, &file_path), - _ = shutdown_signal.recv() => break, - }; - } -} - /// A unique identifier for an entry in the database, 8 characters long, /// consists of ASCII alphanumeric characters (`a-z`, `A-Z`, and `0-9`). -#[derive(Debug, PartialEq, Eq, Clone, Hash, Decode, Encode, Deserialize, Serialize)] +#[derive(Debug, PartialEq, Eq, Clone, Hash, Deserialize, Serialize)] pub struct Mmid(String); impl Mmid { @@ -347,3 +326,66 @@ impl std::fmt::Display for Mmid { write!(f, "{}", self.0) } } + +#[rocket::async_trait] +impl<'r> FromFormField<'r> for Mmid { + fn from_value(field: ValueField<'r>) -> form::Result<'r, Self> { + Ok(Self::try_from(field.value).map_err(|_| form::Error::validation("Invalid MMID"))?) 
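The new `FromFormField` impl lets an `Mmid` be accepted directly as a form or query value, with validation happening in `from_value`. A short usage sketch (the route and form struct here are hypothetical, not part of this patch):

    use confetti_box::database::Mmid;
    use rocket::{form::Form, post, FromForm};

    #[derive(FromForm)]
    struct LookupRequest {
        // Rejected with a validation error if it is not a valid 8-character MMID
        mmid: Mmid,
    }

    #[post("/lookup", data = "<req>")]
    fn lookup(req: Form<LookupRequest>) -> String {
        req.mmid.to_string()
    }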
+ } +} + +/// An in-memory database for partially uploaded chunks of files +#[derive(Default, Debug)] +pub struct Chunkbase { + chunks: HashMap, ChunkedInfo)>, +} + +impl Chunkbase { + pub fn chunks(&self) -> &HashMap, ChunkedInfo)> { + &self.chunks + } + + pub fn mut_chunks(&mut self) -> &mut HashMap, ChunkedInfo)> { + &mut self.chunks + } + + /// Delete all temporary chunk files + pub fn delete_all(&mut self) -> Result<(), io::Error> { + for (_timeout, chunk) in self.chunks.values() { + fs::remove_file(&chunk.path)?; + } + + self.chunks.clear(); + + Ok(()) + } + + pub fn delete_timed_out(&mut self) -> Result<(), io::Error> { + let now = Utc::now(); + self.mut_chunks().retain(|_u, (t, c)| { + if *t <= now { + let _ = fs::remove_file(&c.path); + false + } else { + true + } + }); + + Ok(()) + } +} + +/// Information about how to manage partially uploaded chunks of files +#[serde_as] +#[derive(Default, Debug, Clone, Deserialize, Serialize)] +pub struct ChunkedInfo { + pub name: String, + pub size: u64, + #[serde_as(as = "serde_with::DurationSeconds")] + pub expire_duration: TimeDelta, + + #[serde(skip)] + pub path: PathBuf, + #[serde(skip)] + pub offset: u64, +} diff --git a/src/lib.rs b/src/lib.rs index 95bfcbc..36cea50 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -6,22 +6,28 @@ pub mod settings; pub mod strings; pub mod utils; -use std::sync::{Arc, RwLock}; +use std::{ + io::{self, ErrorKind}, + sync::{Arc, RwLock}, +}; -use crate::database::{Mmid, MochiFile, Mochibase}; -use crate::pages::{footer, head}; -use crate::settings::Settings; -use crate::strings::{parse_time_string, to_pretty_time}; -use crate::utils::hash_file; -use chrono::{DateTime, Utc}; +use crate::{ + pages::{footer, head}, + settings::Settings, + strings::to_pretty_time, +}; +use chrono::{TimeDelta, Utc}; +use database::{Chunkbase, ChunkedInfo, Mmid, MochiFile, Mochibase}; use maud::{html, Markup, PreEscaped}; use rocket::{ data::ToByteUnit, - form::Form, - fs::TempFile, get, post, serde::{json::Json, Serialize}, - FromForm, State, + tokio::{ + fs, + io::{AsyncSeekExt, AsyncWriteExt}, + }, + Data, State, }; use uuid::Uuid; @@ -52,7 +58,7 @@ pub fn home(settings: &State) -> Markup { form #uploadForm { // It's stupid how these can't be styled so they're just hidden here... 
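`ChunkedInfo` doubles as the JSON body the client sends when starting a chunked upload: `expire_duration` travels as integer seconds (via `serde_with::DurationSeconds`), while `path` and `offset` are server-side only and fall back to their defaults. A sketch of that wire shape, using serde_json purely for illustration (the real handler goes through Rocket's `Json` extractor):

    use confetti_box::database::ChunkedInfo;

    fn main() {
        let body = r#"{ "name": "photo.png", "size": 1048576, "expire_duration": 3600 }"#;
        let info: ChunkedInfo = serde_json::from_str(body).unwrap();

        assert_eq!(info.size, 1_048_576);
        assert_eq!(info.expire_duration.num_seconds(), 3600);
        assert_eq!(info.offset, 0); // #[serde(skip)] fields use Default
    }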
input #fileDuration type="text" name="duration" minlength="2" - maxlength="7" value=(settings.duration.default.num_seconds().to_string() + "s") style="display:none;"; + maxlength="7" value=(settings.duration.default.num_seconds().to_string()) style="display:none;"; input #fileInput type="file" name="fileUpload" multiple onchange="formSubmit(this.parentNode)" data-max-filesize=(settings.max_filesize) style="display:none;"; } @@ -69,113 +75,172 @@ pub fn home(settings: &State) -> Markup { } } -#[derive(Debug, FromForm)] -pub struct Upload<'r> { - #[field(name = "duration")] - expire_time: String, +#[derive(Serialize, Default)] +pub struct ChunkedResponse { + status: bool, + message: String, - #[field(name = "fileUpload")] - file: TempFile<'r>, -} - -/// Handle a file upload and store it -#[post("/upload", data = "")] -pub async fn handle_upload( - mut file_data: Form>, - db: &State>>, - settings: &State, -) -> Result, std::io::Error> { - let current = Utc::now(); - // Ensure the expiry time is valid, if not return an error - let expire_time = if let Ok(t) = parse_time_string(&file_data.expire_time) { - if settings.duration.restrict_to_allowed && !settings.duration.allowed.contains(&t) { - return Ok(Json(ClientResponse::failure("Duration not allowed"))); - } - - if t > settings.duration.maximum { - return Ok(Json(ClientResponse::failure("Duration larger than max"))); - } - - t - } else { - return Ok(Json(ClientResponse::failure("Duration invalid"))); - }; - - let raw_name = file_data - .file - .raw_name() - .unwrap() - .dangerous_unsafe_unsanitized_raw() - .as_str() - .to_string(); - - // Get temp path for the file - let temp_filename = settings.temp_dir.join(Uuid::new_v4().to_string()); - file_data.file.persist_to(&temp_filename).await?; - - // Get hash and random identifier and expiry - let file_mmid = Mmid::new_random(); - let file_hash = hash_file(&temp_filename).await?; - let expiry = current + expire_time; - - // Process filetype - let file_type = file_format::FileFormat::from_file(&temp_filename)?; - - let constructed_file = MochiFile::new( - file_mmid.clone(), - raw_name, - file_type.media_type().to_string(), - file_hash, - current, - expiry, - ); - - // If the hash does not exist in the database, - // move the file to the backend, else, delete it - if db.read().unwrap().get_hash(&file_hash).is_none() { - std::fs::rename(temp_filename, settings.file_dir.join(file_hash.to_string()))?; - } else { - std::fs::remove_file(temp_filename)?; - } - - db.write() - .unwrap() - .insert(&file_mmid, constructed_file.clone()); - - Ok(Json(ClientResponse { - status: true, - name: constructed_file.name().clone(), - mmid: Some(constructed_file.mmid().clone()), - hash: constructed_file.hash().to_string(), - expires: Some(constructed_file.expiry()), - ..Default::default() - })) -} - -/// A response to the client from the server -#[derive(Serialize, Default, Debug)] -pub struct ClientResponse { - /// Success or failure - pub status: bool, - - pub response: &'static str, - - #[serde(skip_serializing_if = "str::is_empty")] - pub name: String, + /// UUID used for associating the chunk with the final file #[serde(skip_serializing_if = "Option::is_none")] - pub mmid: Option, - #[serde(skip_serializing_if = "str::is_empty")] - pub hash: String, + uuid: Option, + + /// Valid max chunk size in bytes #[serde(skip_serializing_if = "Option::is_none")] - pub expires: Option>, + chunk_size: Option, } -impl ClientResponse { - fn failure(response: &'static str) -> Self { +impl ChunkedResponse { + fn failure(message: 
&str) -> Self { Self { status: false, - response, + message: message.to_string(), ..Default::default() } } } + +/// Start a chunked upload. Response contains all the info you need to continue +/// uploading chunks. +#[post("/upload/chunked", data = "")] +pub async fn chunked_upload_start( + db: &State>>, + settings: &State, + mut file_info: Json, +) -> Result, std::io::Error> { + let uuid = Uuid::new_v4(); + file_info.path = settings.temp_dir.join(uuid.to_string()); + + // Perform some sanity checks + if file_info.size > settings.max_filesize { + return Ok(Json(ChunkedResponse::failure("File too large"))); + } + if settings.duration.restrict_to_allowed + && !settings + .duration + .allowed + .contains(&file_info.expire_duration) + { + return Ok(Json(ChunkedResponse::failure("Duration not allowed"))); + } + if file_info.expire_duration > settings.duration.maximum { + return Ok(Json(ChunkedResponse::failure("Duration too large"))); + } + + fs::File::create_new(&file_info.path).await?; + + db.write().unwrap().mut_chunks().insert( + uuid, + (Utc::now() + TimeDelta::seconds(30), file_info.into_inner()), + ); + + Ok(Json(ChunkedResponse { + status: true, + message: "".into(), + uuid: Some(uuid), + chunk_size: Some(settings.chunk_size), + })) +} + +#[post("/upload/chunked/?", data = "")] +pub async fn chunked_upload_continue( + chunk_db: &State>>, + settings: &State, + data: Data<'_>, + uuid: &str, + offset: u64, +) -> Result<(), io::Error> { + let uuid = Uuid::parse_str(uuid).map_err(io::Error::other)?; + let data_stream = data.open((settings.chunk_size + 100).bytes()); + + let chunked_info = match chunk_db.read().unwrap().chunks().get(&uuid) { + Some(s) => s.clone(), + None => return Err(io::Error::other("Invalid UUID")), + }; + + let mut file = fs::File::options() + .read(true) + .write(true) + .truncate(false) + .open(&chunked_info.1.path) + .await?; + + if offset > chunked_info.1.size { + return Err(io::Error::new( + ErrorKind::InvalidInput, + "The seek position is larger than the file size", + )); + } + + file.seek(io::SeekFrom::Start(offset)).await?; + data_stream.stream_to(&mut file).await?; + file.flush().await?; + let position = file.stream_position().await?; + + if position > chunked_info.1.size { + chunk_db.write().unwrap().mut_chunks().remove(&uuid); + return Err(io::Error::other("File larger than expected")); + } + + Ok(()) +} + +/// Finalize a chunked upload +#[get("/upload/chunked/?finish")] +pub async fn chunked_upload_finish( + main_db: &State>>, + chunk_db: &State>>, + settings: &State, + uuid: &str, +) -> Result, io::Error> { + let now = Utc::now(); + let uuid = Uuid::parse_str(uuid).map_err(io::Error::other)?; + let chunked_info = match chunk_db.read().unwrap().chunks().get(&uuid) { + Some(s) => s.clone(), + None => return Err(io::Error::other("Invalid UUID")), + }; + + // Remove the finished chunk from the db + chunk_db + .write() + .unwrap() + .mut_chunks() + .remove(&uuid) + .unwrap(); + + if !chunked_info.1.path.try_exists().is_ok_and(|e| e) { + return Err(io::Error::other("File does not exist")); + } + + // Get file hash + let mut hasher = blake3::Hasher::new(); + hasher.update_mmap_rayon(&chunked_info.1.path).unwrap(); + let hash = hasher.finalize(); + let new_filename = settings.file_dir.join(hash.to_string()); + + // If the hash does not exist in the database, + // move the file to the backend, else, delete it + if main_db.read().unwrap().get_hash(&hash).is_none() { + std::fs::rename(&chunked_info.1.path, &new_filename).unwrap(); + } else { + 
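The continue endpoint is essentially "seek to the client-supplied offset and stream the chunk in, refusing anything past the declared size". A reduced sketch of that pattern (the `write_chunk` helper is illustrative, not a function in the patch):

    use std::io::{self, SeekFrom};
    use std::path::Path;
    use rocket::tokio::{fs, io::{AsyncSeekExt, AsyncWriteExt}};

    async fn write_chunk(path: &Path, offset: u64, chunk: &[u8], declared_size: u64) -> io::Result<()> {
        if offset > declared_size {
            return Err(io::Error::new(io::ErrorKind::InvalidInput, "offset past declared size"));
        }

        let mut file = fs::File::options().write(true).open(path).await?;
        file.seek(SeekFrom::Start(offset)).await?;
        file.write_all(chunk).await?;
        file.flush().await?;

        // Chunks may arrive out of order, but none may extend past the declared size
        if file.stream_position().await? > declared_size {
            return Err(io::Error::other("file larger than expected"));
        }
        Ok(())
    }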
std::fs::remove_file(&chunked_info.1.path).unwrap(); + } + + let mmid = Mmid::new_random(); + let file_type = file_format::FileFormat::from_file(&new_filename).unwrap(); + + let constructed_file = MochiFile::new( + mmid.clone(), + chunked_info.1.name, + file_type.media_type().to_string(), + hash, + now, + now + chunked_info.1.expire_duration, + ); + + main_db + .write() + .unwrap() + .insert(&mmid, constructed_file.clone()); + + Ok(Json(constructed_file)) +} diff --git a/src/main.rs b/src/main.rs index 860239c..f99bf5d 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,16 +1,21 @@ use std::{ fs, + path::PathBuf, sync::{Arc, RwLock}, }; use chrono::TimeDelta; use confetti_box::{ - database::{clean_loop, Mochibase}, + database::{clean_database, Chunkbase, Mochibase}, endpoints, pages, resources, settings::Settings, }; use log::info; -use rocket::{data::ToByteUnit as _, routes, tokio}; +use rocket::{ + data::ToByteUnit as _, + routes, + tokio::{self, select, sync::broadcast::Receiver, time}, +}; #[rocket::main] async fn main() { @@ -39,14 +44,22 @@ async fn main() { let database = Arc::new(RwLock::new( Mochibase::open_or_new(&config.database_path).expect("Failed to open or create database"), )); + let chunkbase = Arc::new(RwLock::new(Chunkbase::default())); let local_db = database.clone(); + let local_chunk = chunkbase.clone(); - // Start monitoring thread, cleaning the database every 2 minutes - let (shutdown, rx) = tokio::sync::mpsc::channel(1); + let (shutdown, rx) = tokio::sync::broadcast::channel(1); + // Clean the database every 2 minutes tokio::spawn({ let cleaner_db = database.clone(); let file_path = config.file_dir.clone(); - async move { clean_loop(cleaner_db, file_path, rx, TimeDelta::minutes(2)).await } + async move { clean_loop(cleaner_db, file_path, rx).await } + }); + tokio::spawn({ + let cleaner_db = database.clone(); + let file_path = config.file_dir.clone(); + let rx2 = shutdown.subscribe(); + async move { clean_loop(cleaner_db, file_path, rx2).await } }); let rocket = rocket::build() @@ -65,7 +78,9 @@ async fn main() { .mount( config.server.root_path.clone() + "/", routes![ - confetti_box::handle_upload, + confetti_box::chunked_upload_start, + confetti_box::chunked_upload_continue, + confetti_box::chunked_upload_finish, endpoints::server_info, endpoints::file_info, endpoints::lookup_mmid, @@ -74,6 +89,7 @@ async fn main() { ], ) .manage(database) + .manage(chunkbase) .manage(config) .configure(rocket_config) .launch() @@ -83,10 +99,7 @@ async fn main() { rocket.expect("Server failed to shutdown gracefully"); info!("Stopping database cleaning thread..."); - shutdown - .send(()) - .await - .expect("Failed to stop cleaner thread."); + shutdown.send(()).expect("Failed to stop cleaner thread."); info!("Stopping database cleaning thread completed successfully."); info!("Saving database on shutdown..."); @@ -96,4 +109,38 @@ async fn main() { .save() .expect("Failed to save database"); info!("Saving database completed successfully."); + + info!("Deleting chunk data on shutdown..."); + local_chunk + .write() + .unwrap() + .delete_all() + .expect("Failed to delete chunks"); + info!("Deleting chunk data completed successfully."); +} + +/// A loop to clean the database periodically. +pub async fn clean_loop( + main_db: Arc>, + file_path: PathBuf, + mut shutdown_signal: Receiver<()>, +) { + let mut interval = time::interval(TimeDelta::minutes(2).to_std().unwrap()); + loop { + select! 
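main.rs switches the shutdown channel from mpsc to broadcast so a single signal can stop several background tasks (database cleaner and chunk cleaner). A minimal sketch of that fan-out pattern:

    use std::time::Duration;
    use rocket::tokio::{self, select, sync::broadcast, time};

    #[rocket::main]
    async fn main() {
        let (shutdown, _keep_alive) = broadcast::channel::<()>(1);

        for task_id in 0..2 {
            let mut rx = shutdown.subscribe();
            tokio::spawn(async move {
                let mut tick = time::interval(Duration::from_secs(30));
                loop {
                    select! {
                        _ = tick.tick() => println!("task {task_id}: periodic cleanup"),
                        _ = rx.recv() => break,
                    }
                }
            });
        }

        // ... run the server, then signal every subscriber at once
        shutdown.send(()).expect("Failed to stop cleaner tasks");
    }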
{ + _ = interval.tick() => clean_database(&main_db, &file_path), + _ = shutdown_signal.recv() => break, + }; + } +} + +pub async fn clean_chunks(chunk_db: Arc>, mut shutdown_signal: Receiver<()>) { + let mut interval = time::interval(TimeDelta::seconds(30).to_std().unwrap()); + + loop { + select! { + _ = interval.tick() => {let _ = chunk_db.write().unwrap().delete_timed_out();}, + _ = shutdown_signal.recv() => break, + }; + } } diff --git a/src/pages.rs b/src/pages.rs index fce3345..f54e7e6 100644 --- a/src/pages.rs +++ b/src/pages.rs @@ -11,6 +11,8 @@ pub fn head(page_title: &str) -> Markup { title { (page_title) } link rel="icon" type="image/svg+xml" href="/resources/favicon.svg"; link rel="stylesheet" href="/resources/main.css"; + link rel="preload" href="/resources/fonts/Roboto.woff2" as="font" type="font/woff2" crossorigin; + link rel="preload" href="/resources/fonts/FiraCode.woff2" as="font" type="font/woff2" crossorigin; } } diff --git a/src/settings.rs b/src/settings.rs index 6bfd078..2f3592a 100644 --- a/src/settings.rs +++ b/src/settings.rs @@ -17,6 +17,10 @@ pub struct Settings { #[serde(default)] pub max_filesize: u64, + /// Maximum filesize in bytes + #[serde(default)] + pub chunk_size: u64, + /// Is overwiting already uploaded files with the same hash allowed, or is /// this a no-op? #[serde(default)] @@ -48,7 +52,8 @@ pub struct Settings { impl Default for Settings { fn default() -> Self { Self { - max_filesize: 1.megabytes().into(), // 128 MB + max_filesize: 25.megabytes().into(), // 1 MB + chunk_size: 1.megabytes().into(), overwrite: true, duration: DurationSettings::default(), server: ServerSettings::default(), @@ -81,11 +86,11 @@ impl Settings { } pub fn save(&self) -> Result<(), io::Error> { - let mut out_path = self.path.clone(); - out_path.set_extension(".bkp"); - let mut file = File::create(&out_path).expect("Could not save!"); + let out_path = &self.path.with_extension("new"); + let mut file = File::create(out_path)?; file.write_all(&toml::to_string_pretty(self).unwrap().into_bytes())?; + // Overwrite the original DB with fs::rename(out_path, &self.path).unwrap(); Ok(()) diff --git a/src/utils.rs b/src/utils.rs index dfb202b..565d6e0 100644 --- a/src/utils.rs +++ b/src/utils.rs @@ -1,7 +1,7 @@ use blake3::Hash; use std::path::Path; -/// Get the Blake3 hash of a file, without reading it all into memory, and also get the size +/// Get the Blake3 hash of a file, without reading it all into memory pub async fn hash_file>(input: &P) -> Result { let mut hasher = blake3::Hasher::new(); hasher.update_mmap_rayon(input)?; diff --git a/web/request.js b/web/request.js index 5e2629c..bb66aaa 100644 --- a/web/request.js +++ b/web/request.js @@ -10,7 +10,7 @@ async function formSubmit() { const duration = form.elements.duration.value; const maxSize = form.elements.fileUpload.dataset.maxFilesize; - await sendFile(files, duration, maxSize); + await sendFiles(files, duration, maxSize); // Reset the form file data since we've successfully submitted it form.elements.fileUpload.value = ""; @@ -19,6 +19,7 @@ async function formSubmit() { async function dragDropSubmit(evt) { const form = document.getElementById("uploadForm"); const duration = form.elements.duration.value; + const maxSize = form.elements.fileUpload.dataset.maxFilesize; evt.preventDefault(); @@ -38,12 +39,13 @@ async function dragDropSubmit(evt) { }); } - await sendFile(files, duration); + await sendFiles(files, duration, maxSize); } async function pasteSubmit(evt) { const form = document.getElementById("uploadForm"); const 
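The rewritten `Settings::save` writes the serialized TOML to a sibling "<name>.new" file first and only then renames it over the original, so an interrupted save can no longer truncate the live config. The same pattern as a standalone sketch (`save_toml` is an illustrative helper, not in the patch):

    use std::{fs, fs::File, io::{self, Write}, path::Path};

    fn save_toml<T: serde::Serialize>(value: &T, path: &Path) -> Result<(), io::Error> {
        let tmp = path.with_extension("new");

        let mut file = File::create(&tmp)?;
        file.write_all(toml::to_string_pretty(value).unwrap().as_bytes())?;

        // Replace the original only once the new file is fully written
        fs::rename(&tmp, path)?;
        Ok(())
    }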
duration = form.elements.duration.value; + const maxSize = form.elements.fileUpload.dataset.maxFilesize; const files = []; const len = evt.clipboardData.files.length; @@ -52,45 +54,145 @@ async function pasteSubmit(evt) { files.push(file); } - await sendFile(files, duration); + await sendFiles(files, duration, maxSize); } -async function sendFile(files, duration, maxSize) { +async function sendFiles(files, duration, maxSize) { + const inProgressUploads = new Set(); + const concurrencyLimit = 10; + for (const file of files) { - const [linkRow, progressBar, progressText] = addNewToList(file.name); - if (file.size > maxSize) { - makeErrored(progressBar, progressText, linkRow, TOO_LARGE_TEXT); - console.error("Provided file is too large", file.size, "bytes; max", maxSize, "bytes"); - continue; - } else if (file.size == 0) { - makeErrored(progressBar, progressText, linkRow, ZERO_TEXT); - console.error("Provided file has 0 bytes"); - continue; + // Start the upload and add it to the set of in-progress uploads + const uploadPromise = uploadFile(file, duration, maxSize); + inProgressUploads.add(uploadPromise); + + // Once an upload finishes, remove it from the set + uploadPromise.finally(() => inProgressUploads.delete(uploadPromise)); + + // If we reached the concurrency limit, wait for one of the uploads to complete + if (inProgressUploads.size >= concurrencyLimit) { + await Promise.race(inProgressUploads); } + } - const request = new XMLHttpRequest(); - request.open('POST', "./upload", true); + // Wait for any remaining uploads to complete + await Promise.allSettled(inProgressUploads); +} - // Set up event listeners - request.upload.addEventListener('progress', - (p) => {uploadProgress(p, progressBar, progressText, linkRow);}, false); - request.addEventListener('load', - (c) => {uploadComplete(c, progressBar, progressText, linkRow);}, false); - request.addEventListener('error', - (e) => {networkErrorHandler(e, progressBar, progressText, linkRow);}, false); +async function uploadFile(file, duration, maxSize) { + const [linkRow, progressBar, progressText] = await addNewToList(file.name); + if (file.size > maxSize) { + console.error("Provided file is too large", file.size, "bytes; max", maxSize, "bytes"); + makeErrored(progressBar, progressText, linkRow, TOO_LARGE_TEXT); + return; + } else if (file.size == 0) { + console.error("Provided file has 0 bytes"); + makeErrored(progressBar, progressText, linkRow, ZERO_TEXT); + return; + } - linkRow.classList.add("upload_inprogress"); - - // Create and send FormData - try { - const formData = new FormData(); - formData.append("duration", duration); - formData.append("fileUpload", file); - request.send(formData); - } catch (e) { - makeErrored(progressBar, progressText, linkRow, ERROR_TEXT); - console.error("An error occured while uploading", e); + // Get preliminary upload information + let chunkedResponse; + try { + const response = await fetch("/upload/chunked", { + method: "POST", + body: JSON.stringify({ + "name": file.name, + "size": file.size, + "expire_duration": parseInt(duration), + }), + }); + if (!response.ok) { + throw new Error(`Response status: ${response.status}`); } + chunkedResponse = await response.json(); + } catch (error) { + console.error(error); + makeErrored(progressBar, progressText, linkRow, ERROR_TEXT); + } + + // Upload the file in `chunk_size` chunks + const chunkUploads = new Set(); + const progressValues = []; + const concurrencyLimit = 4; + for (let start = 0; start < file.size; start += chunkedResponse.chunk_size) { + const 
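Not part of the patch: the client's "at most N uploads in flight" trick (a Set of pending promises plus Promise.race) has a direct server-language analogue. The sketch below expresses the same bounded-concurrency idea with tokio's JoinSet (available in reasonably recent tokio releases), purely for comparison:

    use rocket::tokio::{self, task::JoinSet, time::{sleep, Duration}};

    #[rocket::main]
    async fn main() {
        const CONCURRENCY_LIMIT: usize = 4;
        let mut in_flight = JoinSet::new();

        for chunk_id in 0..16u32 {
            // Once the limit is reached, wait for one task to finish before adding more
            if in_flight.len() >= CONCURRENCY_LIMIT {
                in_flight.join_next().await;
            }
            in_flight.spawn(async move {
                sleep(Duration::from_millis(50)).await; // stand-in for one chunk upload
                chunk_id
            });
        }

        // Drain whatever is still running
        while in_flight.join_next().await.is_some() {}
    }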
chunk = file.slice(start, start + chunkedResponse.chunk_size) + const url = "/upload/chunked/" + chunkedResponse.uuid + "?offset=" + start; + const ID = progressValues.push(0); + + let upload = new Promise(function (resolve, reject) { + let request = new XMLHttpRequest(); + request.open("POST", url, true); + request.upload.addEventListener('progress', + (p) => {uploadProgress(p, progressBar, progressText, progressValues, file.size, ID);}, true + ); + + request.onload = (e) => { + if (e.target.status >= 200 && e.target.status < 300) { + resolve(request.response); + } else { + reject({status: e.target.status, statusText: request.statusText}); + } + }; + request.onerror = (e) => { + reject({status: e.target.status, statusText: request.statusText}) + }; + request.send(chunk); + }); + + chunkUploads.add(upload); + upload.finally(() => chunkUploads.delete(upload)); + if (chunkUploads.size >= concurrencyLimit) { + await Promise.race(chunkUploads); + } + } + await Promise.allSettled(chunkUploads); + + // Finish the request and update the progress box + const result = await fetch("/upload/chunked/" + chunkedResponse.uuid + "?finish"); + uploadComplete(result, progressBar, progressText, linkRow); +} + +async function addNewToList(origFileName) { + const uploadedFilesDisplay = document.getElementById("uploadedFilesDisplay"); + const linkRow = uploadedFilesDisplay.appendChild(document.createElement("div")); + const fileName = linkRow.appendChild(document.createElement("p")); + const progressBar = linkRow.appendChild(document.createElement("progress")); + const progressTxt = linkRow.appendChild(document.createElement("p")); + + fileName.textContent = origFileName; + fileName.classList.add("file_name"); + progressTxt.classList.add("status"); + progressBar.max="100"; + progressBar.value="0"; + + return [linkRow, progressBar, progressTxt]; +} + +function uploadProgress(progress, progressBar, progressText, progressValues, fileSize, ID) { + if (progress.lengthComputable) { + progressValues[ID] = progress.loaded; + + const progressPercent = Math.floor((progressValues.reduce((a, b) => a + b, 0) / fileSize) * 100); + if (progressPercent == 100) { + progressBar.removeAttribute("value"); + progressText.textContent = "⏳"; + } else { + progressBar.value = progressPercent; + progressText.textContent = progressPercent + "%"; + } + } +} + +async function uploadComplete(response, progressBar, progressText, linkRow) { + if (response.status === 200) { + const responseJson = await response.json(); + console.log("Successfully uploaded file", responseJson); + makeFinished(progressBar, progressText, linkRow, responseJson); + } else if (response.status === 413) { + makeErrored(progressBar, progressText, linkRow, TOO_LARGE_TEXT); + } else { + makeErrored(progressBar, progressText, linkRow, ERROR_TEXT); } } @@ -128,60 +230,6 @@ function makeFinished(progressBar, progressText, linkRow, response) { linkRow.classList.add("upload_done"); } -function networkErrorHandler(err, progressBar, progressText, linkRow) { - makeErrored(progressBar, progressText, linkRow, "A network error occured"); - console.error("A network error occured while uploading", err); -} - -function uploadProgress(progress, progressBar, progressText, _linkRow) { - if (progress.lengthComputable) { - const progressPercent = Math.floor((progress.loaded / progress.total) * 100); - if (progressPercent == 100) { - progressBar.removeAttribute("value"); - progressText.textContent = "⏳"; - } else { - progressBar.value = progressPercent; - progressText.textContent = 
progressPercent + "%"; - } - } -} - -function uploadComplete(response, progressBar, progressText, linkRow) { - let target = response.target; - - if (target.status === 200) { - const response = JSON.parse(target.responseText); - - if (response.status) { - console.log("Successfully uploaded file", response); - makeFinished(progressBar, progressText, linkRow, response); - } else { - console.error("Error uploading", response); - makeErrored(progressBar, progressText, linkRow, response.response); - } - } else if (target.status === 413) { - makeErrored(progressBar, progressText, linkRow, TOO_LARGE_TEXT); - } else { - makeErrored(progressBar, progressText, linkRow, ERROR_TEXT); - } -} - -function addNewToList(origFileName) { - const uploadedFilesDisplay = document.getElementById("uploadedFilesDisplay"); - const linkRow = uploadedFilesDisplay.appendChild(document.createElement("div")); - const fileName = linkRow.appendChild(document.createElement("p")); - const progressBar = linkRow.appendChild(document.createElement("progress")); - const progressTxt = linkRow.appendChild(document.createElement("p")); - - fileName.textContent = origFileName; - fileName.classList.add("file_name"); - progressTxt.classList.add("status"); - progressBar.max="100"; - progressBar.value="0"; - - return [linkRow, progressBar, progressTxt]; -} - async function initEverything() { const durationBox = document.getElementById("durationBox"); const durationButtons = durationBox.getElementsByTagName("button"); @@ -190,7 +238,7 @@ async function initEverything() { if (this.classList.contains("selected")) { return; } - document.getElementById("uploadForm").elements.duration.value = this.dataset.durationSeconds + "s"; + document.getElementById("uploadForm").elements.duration.value = this.dataset.durationSeconds; let selected = this.parentNode.getElementsByClassName("selected"); selected[0].classList.remove("selected"); this.classList.add("selected");
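Taken together, the endpoints in this patch define a three-step protocol: announce the upload, POST each chunk at its byte offset, then finalize. An end-to-end sketch from an arbitrary HTTP client's point of view (reqwest and serde_json are used only for illustration; field names mirror `ChunkedInfo` and `ChunkedResponse`):

    use serde::Deserialize;

    #[derive(Deserialize)]
    struct StartResponse {
        status: bool,
        message: String,
        uuid: Option<String>,
        chunk_size: Option<u64>,
    }

    async fn upload(server: &str, name: &str, data: &[u8]) -> Result<(), Box<dyn std::error::Error>> {
        let client = reqwest::Client::new();

        // 1. Announce the upload: name, total size, and expiry in seconds
        let start: StartResponse = client
            .post(format!("{server}/upload/chunked"))
            .json(&serde_json::json!({
                "name": name,
                "size": data.len() as u64,
                "expire_duration": 3600, // must be an allowed duration on the server
            }))
            .send().await?
            .json().await?;
        if !start.status {
            return Err(start.message.into());
        }
        let uuid = start.uuid.unwrap();
        let chunk_size = start.chunk_size.unwrap() as usize;

        // 2. Send the body in chunk_size pieces, each tagged with its byte offset
        for (i, chunk) in data.chunks(chunk_size).enumerate() {
            let offset = i * chunk_size;
            client
                .post(format!("{server}/upload/chunked/{uuid}?offset={offset}"))
                .body(chunk.to_vec())
                .send().await?
                .error_for_status()?;
        }

        // 3. Finalize: the server hashes the file, moves it into place, and
        //    returns the new MochiFile entry as JSON
        let file_info = client
            .get(format!("{server}/upload/chunked/{uuid}?finish"))
            .send().await?
            .error_for_status()?
            .text().await?;
        println!("uploaded: {file_info}");
        Ok(())
    }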