Chunked uploads (#2)

* Switch database to begin using CBOR, and begin work on chunked uploads

* Chunked uploads work, no feedback to client yet

* Added poorly implemented progress for chunked uploads

* I think these methods are cursed

* Improved performance by properly limiting the number of uploads

* Properly show errors in file bars again

* Improved setup for chunk downloading, added chunk size to config

* Ran clippy, added chunk cleaner

* Ran `cargo fmt`
G2 2024-11-03 05:47:26 -06:00 committed by GitHub
parent 3892975fc2
commit e78c61b410
8 changed files with 456 additions and 247 deletions

View file (Cargo.toml)

@@ -4,9 +4,9 @@ version = "0.1.2"
 edition = "2021"
 
 [dependencies]
-bincode = { version = "2.0.0-rc.3", features = ["serde"] }
 blake3 = { version = "1.5.4", features = ["mmap", "rayon", "serde"] }
 chrono = { version = "0.4.38", features = ["serde"] }
+ciborium = "0.2.2"
 file-format = { version = "0.25.0", features = ["reader"] }
 log = "0.4"
 lz4_flex = "0.11.3"
@@ -16,7 +16,7 @@ rocket = { version = "0.5", features = ["json"] }
 serde = { version = "1.0.213", features = ["derive"] }
 serde_with = { version = "3.11.0", features = ["chrono_0_4"] }
 toml = "0.8.19"
-uuid = { version = "1.11.0", features = ["v4"] }
+uuid = { version = "1.11.0", features = ["serde", "v4"] }
 
 [profile.production]
 inherits = "release"
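
The dependency change above (bincode out, ciborium in) moves the database's on-disk format from bincode to CBOR while keeping the LZ4 frame wrapper. A minimal sketch of that round trip, assuming ciborium 0.2 and lz4_flex 0.11 as pinned in the manifest; the Example struct and "example.db" file name are illustrative, not part of the project:

    use std::fs::File;

    use serde::{Deserialize, Serialize};

    // Illustrative stand-in for the real database struct.
    #[derive(Debug, Serialize, Deserialize)]
    struct Example {
        name: String,
        size: u64,
    }

    fn save(value: &Example) -> Result<(), std::io::Error> {
        // Same shape as the save path below: CBOR-encode, then LZ4 frame-compress.
        let file = File::create("example.db")?;
        let mut lz4 = lz4_flex::frame::FrameEncoder::new(file);
        ciborium::into_writer(value, &mut lz4)
            .map_err(|e| std::io::Error::other(format!("failed to save: {e}")))?;
        lz4.try_finish()?;
        Ok(())
    }

    fn load() -> Result<Example, std::io::Error> {
        // Mirror of the open path: decompress the LZ4 frame, then decode CBOR.
        let file = File::open("example.db")?;
        let mut lz4 = lz4_flex::frame::FrameDecoder::new(file);
        ciborium::from_reader(&mut lz4)
            .map_err(|e| std::io::Error::other(format!("failed to open: {e}")))
    }

    fn main() -> Result<(), std::io::Error> {
        save(&Example { name: "demo".into(), size: 42 })?;
        println!("{:?}", load()?);
        Ok(())
    }

Because CBOR is serde-native and self-describing, the structs in the next file can drop their bincode-specific #[bincode(with_serde)] attributes.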

View file (src/database.rs)

@@ -7,33 +7,32 @@ use std::{
     sync::{Arc, RwLock},
 };
 
-use bincode::{config::Configuration, decode_from_std_read, encode_into_std_write, Decode, Encode};
 use blake3::Hash;
 use chrono::{DateTime, TimeDelta, Utc};
+use ciborium::{from_reader, into_writer};
 use log::{error, info, warn};
 use rand::distributions::{Alphanumeric, DistString};
 use rocket::{
+    form::{self, FromFormField, ValueField},
     serde::{Deserialize, Serialize},
-    tokio::{select, sync::mpsc::Receiver, time},
 };
 use serde_with::{serde_as, DisplayFromStr};
+use uuid::Uuid;
 
-const BINCODE_CFG: Configuration = bincode::config::standard();
-
-#[derive(Debug, Clone, Decode, Encode)]
+#[derive(Debug, Clone, Deserialize, Serialize)]
 pub struct Mochibase {
     path: PathBuf,
 
     /// Every hash in the database along with the [`Mmid`]s associated with them
-    #[bincode(with_serde)]
     hashes: HashMap<Hash, HashSet<Mmid>>,
 
     /// All entries in the database
-    #[bincode(with_serde)]
     entries: HashMap<Mmid, MochiFile>,
 }
 
 impl Mochibase {
+    /// Create a new database initialized with no data, and save it to the
+    /// provided path
     pub fn new<P: AsRef<Path>>(path: &P) -> Result<Self, io::Error> {
         let output = Self {
             path: path.as_ref().to_path_buf(),
@@ -52,7 +51,7 @@ impl Mochibase {
         let file = File::open(path)?;
         let mut lz4_file = lz4_flex::frame::FrameDecoder::new(file);
 
-        decode_from_std_read(&mut lz4_file, BINCODE_CFG)
+        from_reader(&mut lz4_file)
            .map_err(|e| io::Error::other(format!("failed to open database: {e}")))
     }
 
@@ -70,7 +69,7 @@ impl Mochibase {
         // Create a file and write the LZ4 compressed stream into it
         let file = File::create(self.path.with_extension("bkp"))?;
         let mut lz4_file = lz4_flex::frame::FrameEncoder::new(file);
-        encode_into_std_write(self, &mut lz4_file, BINCODE_CFG)
+        into_writer(self, &mut lz4_file)
             .map_err(|e| io::Error::other(format!("failed to save database: {e}")))?;
 
         lz4_file.try_finish()?;
@@ -154,7 +153,7 @@ impl Mochibase {
 /// An entry in the database storing metadata about a file
 #[serde_as]
-#[derive(Debug, Clone, Decode, Encode, Deserialize, Serialize)]
+#[derive(Debug, Clone, Deserialize, Serialize)]
 pub struct MochiFile {
     /// A unique identifier describing this file
     mmid: Mmid,
@@ -166,16 +165,13 @@ pub struct MochiFile {
     mime_type: String,
 
     /// The Blake3 hash of the file
-    #[bincode(with_serde)]
     #[serde_as(as = "DisplayFromStr")]
     hash: Hash,
 
     /// The datetime when the file was uploaded
-    #[bincode(with_serde)]
     upload_datetime: DateTime<Utc>,
 
     /// The datetime when the file is set to expire
-    #[bincode(with_serde)]
     expiry_datetime: DateTime<Utc>,
 }
 
@@ -227,7 +223,7 @@ impl MochiFile {
 /// Clean the database. Removes files which are past their expiry
 /// [`chrono::DateTime`]. Also removes files which no longer exist on the disk.
-fn clean_database(db: &Arc<RwLock<Mochibase>>, file_path: &Path) {
+pub fn clean_database(db: &Arc<RwLock<Mochibase>>, file_path: &Path) {
     let mut database = db.write().unwrap();
 
     // Add expired entries to the removal list
@@ -266,26 +262,9 @@ fn clean_database(db: &Arc<RwLock<Mochibase>>, file_path: &Path) {
     drop(database); // Just to be sure
 }
 
-/// A loop to clean the database periodically.
-pub async fn clean_loop(
-    db: Arc<RwLock<Mochibase>>,
-    file_path: PathBuf,
-    mut shutdown_signal: Receiver<()>,
-    interval: TimeDelta,
-) {
-    let mut interval = time::interval(interval.to_std().unwrap());
-
-    loop {
-        select! {
-            _ = interval.tick() => clean_database(&db, &file_path),
-            _ = shutdown_signal.recv() => break,
-        };
-    }
-}
-
 /// A unique identifier for an entry in the database, 8 characters long,
 /// consists of ASCII alphanumeric characters (`a-z`, `A-Z`, and `0-9`).
-#[derive(Debug, PartialEq, Eq, Clone, Hash, Decode, Encode, Deserialize, Serialize)]
+#[derive(Debug, PartialEq, Eq, Clone, Hash, Deserialize, Serialize)]
 pub struct Mmid(String);
 
 impl Mmid {
@@ -347,3 +326,66 @@ impl std::fmt::Display for Mmid {
         write!(f, "{}", self.0)
     }
 }
+
+#[rocket::async_trait]
+impl<'r> FromFormField<'r> for Mmid {
+    fn from_value(field: ValueField<'r>) -> form::Result<'r, Self> {
+        Ok(Self::try_from(field.value).map_err(|_| form::Error::validation("Invalid MMID"))?)
+    }
+}
+
+/// An in-memory database for partially uploaded chunks of files
+#[derive(Default, Debug)]
+pub struct Chunkbase {
+    chunks: HashMap<Uuid, (DateTime<Utc>, ChunkedInfo)>,
+}
+
+impl Chunkbase {
+    pub fn chunks(&self) -> &HashMap<Uuid, (DateTime<Utc>, ChunkedInfo)> {
+        &self.chunks
+    }
+
+    pub fn mut_chunks(&mut self) -> &mut HashMap<Uuid, (DateTime<Utc>, ChunkedInfo)> {
+        &mut self.chunks
+    }
+
+    /// Delete all temporary chunk files
+    pub fn delete_all(&mut self) -> Result<(), io::Error> {
+        for (_timeout, chunk) in self.chunks.values() {
+            fs::remove_file(&chunk.path)?;
+        }
+
+        self.chunks.clear();
+
+        Ok(())
+    }
+
+    pub fn delete_timed_out(&mut self) -> Result<(), io::Error> {
+        let now = Utc::now();
+        self.mut_chunks().retain(|_u, (t, c)| {
+            if *t <= now {
+                let _ = fs::remove_file(&c.path);
+                false
+            } else {
+                true
+            }
+        });
+
+        Ok(())
+    }
+}
+
+/// Information about how to manage partially uploaded chunks of files
+#[serde_as]
+#[derive(Default, Debug, Clone, Deserialize, Serialize)]
+pub struct ChunkedInfo {
+    pub name: String,
+    pub size: u64,
+    #[serde_as(as = "serde_with::DurationSeconds<i64>")]
+    pub expire_duration: TimeDelta,
+
+    #[serde(skip)]
+    pub path: PathBuf,
+    #[serde(skip)]
+    pub offset: u64,
+}
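
One detail of ChunkedInfo above worth calling out: expire_duration crosses the wire as plain integer seconds via serde_with::DurationSeconds<i64>, while path and offset are #[serde(skip)], so clients can neither see nor set them. A small sketch of that wire format; the struct is repeated so the example stands alone, and serde_json is assumed purely for the illustration (it is not a dependency of this crate):

    use std::path::PathBuf;

    use chrono::TimeDelta;
    use serde::{Deserialize, Serialize};
    use serde_with::serde_as;

    // Mirror of the ChunkedInfo fields above.
    #[serde_as]
    #[derive(Default, Debug, Clone, Deserialize, Serialize)]
    pub struct ChunkedInfo {
        pub name: String,
        pub size: u64,
        #[serde_as(as = "serde_with::DurationSeconds<i64>")]
        pub expire_duration: TimeDelta,
        #[serde(skip)]
        pub path: PathBuf,
        #[serde(skip)]
        pub offset: u64,
    }

    fn main() {
        // This is the JSON body a client sends to start a chunked upload.
        let info: ChunkedInfo =
            serde_json::from_str(r#"{"name":"photo.png","size":1048576,"expire_duration":3600}"#)
                .unwrap();
        assert_eq!(info.expire_duration, TimeDelta::seconds(3600));
        assert_eq!(info.path, PathBuf::new()); // skipped fields fall back to Default
    }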

View file (src/lib.rs)

@@ -6,22 +6,28 @@ pub mod settings;
 pub mod strings;
 pub mod utils;
 
-use std::sync::{Arc, RwLock};
+use std::{
+    io::{self, ErrorKind},
+    sync::{Arc, RwLock},
+};
 
-use crate::database::{Mmid, MochiFile, Mochibase};
-use crate::pages::{footer, head};
-use crate::settings::Settings;
-use crate::strings::{parse_time_string, to_pretty_time};
-use crate::utils::hash_file;
-use chrono::{DateTime, Utc};
+use crate::{
+    pages::{footer, head},
+    settings::Settings,
+    strings::to_pretty_time,
+};
+use chrono::{TimeDelta, Utc};
+use database::{Chunkbase, ChunkedInfo, Mmid, MochiFile, Mochibase};
 use maud::{html, Markup, PreEscaped};
 use rocket::{
     data::ToByteUnit,
-    form::Form,
-    fs::TempFile,
     get, post,
     serde::{json::Json, Serialize},
-    FromForm, State,
+    tokio::{
+        fs,
+        io::{AsyncSeekExt, AsyncWriteExt},
+    },
+    Data, State,
 };
 use uuid::Uuid;
@@ -52,7 +58,7 @@ pub fn home(settings: &State<Settings>) -> Markup {
             form #uploadForm {
                 // It's stupid how these can't be styled so they're just hidden here...
                 input #fileDuration type="text" name="duration" minlength="2"
-                maxlength="7" value=(settings.duration.default.num_seconds().to_string() + "s") style="display:none;";
+                maxlength="7" value=(settings.duration.default.num_seconds().to_string()) style="display:none;";
                 input #fileInput type="file" name="fileUpload" multiple
                 onchange="formSubmit(this.parentNode)" data-max-filesize=(settings.max_filesize) style="display:none;";
             }
@@ -69,113 +75,172 @@
     }
 }
 
-#[derive(Debug, FromForm)]
-pub struct Upload<'r> {
-    #[field(name = "duration")]
-    expire_time: String,
-
-    #[field(name = "fileUpload")]
-    file: TempFile<'r>,
-}
-
-/// Handle a file upload and store it
-#[post("/upload", data = "<file_data>")]
-pub async fn handle_upload(
-    mut file_data: Form<Upload<'_>>,
-    db: &State<Arc<RwLock<Mochibase>>>,
-    settings: &State<Settings>,
-) -> Result<Json<ClientResponse>, std::io::Error> {
-    let current = Utc::now();
-    // Ensure the expiry time is valid, if not return an error
-    let expire_time = if let Ok(t) = parse_time_string(&file_data.expire_time) {
-        if settings.duration.restrict_to_allowed && !settings.duration.allowed.contains(&t) {
-            return Ok(Json(ClientResponse::failure("Duration not allowed")));
-        }
-
-        if t > settings.duration.maximum {
-            return Ok(Json(ClientResponse::failure("Duration larger than max")));
-        }
-
-        t
-    } else {
-        return Ok(Json(ClientResponse::failure("Duration invalid")));
-    };
-
-    let raw_name = file_data
-        .file
-        .raw_name()
-        .unwrap()
-        .dangerous_unsafe_unsanitized_raw()
-        .as_str()
-        .to_string();
-
-    // Get temp path for the file
-    let temp_filename = settings.temp_dir.join(Uuid::new_v4().to_string());
-    file_data.file.persist_to(&temp_filename).await?;
-
-    // Get hash and random identifier and expiry
-    let file_mmid = Mmid::new_random();
-    let file_hash = hash_file(&temp_filename).await?;
-    let expiry = current + expire_time;
-
-    // Process filetype
-    let file_type = file_format::FileFormat::from_file(&temp_filename)?;
-
-    let constructed_file = MochiFile::new(
-        file_mmid.clone(),
-        raw_name,
-        file_type.media_type().to_string(),
-        file_hash,
-        current,
-        expiry,
-    );
-
-    // If the hash does not exist in the database,
-    // move the file to the backend, else, delete it
-    if db.read().unwrap().get_hash(&file_hash).is_none() {
-        std::fs::rename(temp_filename, settings.file_dir.join(file_hash.to_string()))?;
-    } else {
-        std::fs::remove_file(temp_filename)?;
-    }
-
-    db.write()
-        .unwrap()
-        .insert(&file_mmid, constructed_file.clone());
-
-    Ok(Json(ClientResponse {
-        status: true,
-        name: constructed_file.name().clone(),
-        mmid: Some(constructed_file.mmid().clone()),
-        hash: constructed_file.hash().to_string(),
-        expires: Some(constructed_file.expiry()),
-        ..Default::default()
-    }))
-}
-
-/// A response to the client from the server
-#[derive(Serialize, Default, Debug)]
-pub struct ClientResponse {
-    /// Success or failure
-    pub status: bool,
-
-    pub response: &'static str,
-
-    #[serde(skip_serializing_if = "str::is_empty")]
-    pub name: String,
-    #[serde(skip_serializing_if = "Option::is_none")]
-    pub mmid: Option<Mmid>,
-    #[serde(skip_serializing_if = "str::is_empty")]
-    pub hash: String,
-    #[serde(skip_serializing_if = "Option::is_none")]
-    pub expires: Option<DateTime<Utc>>,
-}
-
-impl ClientResponse {
-    fn failure(response: &'static str) -> Self {
-        Self {
-            status: false,
-            response,
-            ..Default::default()
-        }
-    }
+#[derive(Serialize, Default)]
+pub struct ChunkedResponse {
+    status: bool,
+    message: String,
+
+    /// UUID used for associating the chunk with the final file
+    #[serde(skip_serializing_if = "Option::is_none")]
+    uuid: Option<Uuid>,
+
+    /// Valid max chunk size in bytes
+    #[serde(skip_serializing_if = "Option::is_none")]
+    chunk_size: Option<u64>,
+}
+
+impl ChunkedResponse {
+    fn failure(message: &str) -> Self {
+        Self {
+            status: false,
+            message: message.to_string(),
+            ..Default::default()
+        }
+    }
+}
+
+/// Start a chunked upload. Response contains all the info you need to continue
+/// uploading chunks.
+#[post("/upload/chunked", data = "<file_info>")]
+pub async fn chunked_upload_start(
+    db: &State<Arc<RwLock<Chunkbase>>>,
+    settings: &State<Settings>,
+    mut file_info: Json<ChunkedInfo>,
+) -> Result<Json<ChunkedResponse>, std::io::Error> {
+    let uuid = Uuid::new_v4();
+    file_info.path = settings.temp_dir.join(uuid.to_string());
+
+    // Perform some sanity checks
+    if file_info.size > settings.max_filesize {
+        return Ok(Json(ChunkedResponse::failure("File too large")));
+    }
+
+    if settings.duration.restrict_to_allowed
+        && !settings
+            .duration
+            .allowed
+            .contains(&file_info.expire_duration)
+    {
+        return Ok(Json(ChunkedResponse::failure("Duration not allowed")));
+    }
+
+    if file_info.expire_duration > settings.duration.maximum {
+        return Ok(Json(ChunkedResponse::failure("Duration too large")));
+    }
+
+    fs::File::create_new(&file_info.path).await?;
+
+    db.write().unwrap().mut_chunks().insert(
+        uuid,
+        (Utc::now() + TimeDelta::seconds(30), file_info.into_inner()),
+    );
+
+    Ok(Json(ChunkedResponse {
+        status: true,
+        message: "".into(),
+        uuid: Some(uuid),
+        chunk_size: Some(settings.chunk_size),
+    }))
+}
+
+#[post("/upload/chunked/<uuid>?<offset>", data = "<data>")]
+pub async fn chunked_upload_continue(
+    chunk_db: &State<Arc<RwLock<Chunkbase>>>,
+    settings: &State<Settings>,
+    data: Data<'_>,
+    uuid: &str,
+    offset: u64,
+) -> Result<(), io::Error> {
+    let uuid = Uuid::parse_str(uuid).map_err(io::Error::other)?;
+    let data_stream = data.open((settings.chunk_size + 100).bytes());
+
+    let chunked_info = match chunk_db.read().unwrap().chunks().get(&uuid) {
+        Some(s) => s.clone(),
+        None => return Err(io::Error::other("Invalid UUID")),
+    };
+
+    let mut file = fs::File::options()
+        .read(true)
+        .write(true)
+        .truncate(false)
+        .open(&chunked_info.1.path)
+        .await?;
+
+    if offset > chunked_info.1.size {
+        return Err(io::Error::new(
+            ErrorKind::InvalidInput,
+            "The seek position is larger than the file size",
+        ));
+    }
+
+    file.seek(io::SeekFrom::Start(offset)).await?;
+    data_stream.stream_to(&mut file).await?;
+    file.flush().await?;
+    let position = file.stream_position().await?;
+
+    if position > chunked_info.1.size {
+        chunk_db.write().unwrap().mut_chunks().remove(&uuid);
+
+        return Err(io::Error::other("File larger than expected"));
+    }
+
+    Ok(())
+}
+
+/// Finalize a chunked upload
+#[get("/upload/chunked/<uuid>?finish")]
+pub async fn chunked_upload_finish(
+    main_db: &State<Arc<RwLock<Mochibase>>>,
+    chunk_db: &State<Arc<RwLock<Chunkbase>>>,
+    settings: &State<Settings>,
+    uuid: &str,
+) -> Result<Json<MochiFile>, io::Error> {
+    let now = Utc::now();
+    let uuid = Uuid::parse_str(uuid).map_err(io::Error::other)?;
+    let chunked_info = match chunk_db.read().unwrap().chunks().get(&uuid) {
+        Some(s) => s.clone(),
+        None => return Err(io::Error::other("Invalid UUID")),
+    };
+
+    // Remove the finished chunk from the db
+    chunk_db
+        .write()
+        .unwrap()
+        .mut_chunks()
+        .remove(&uuid)
+        .unwrap();
+
+    if !chunked_info.1.path.try_exists().is_ok_and(|e| e) {
+        return Err(io::Error::other("File does not exist"));
+    }
+
+    // Get file hash
+    let mut hasher = blake3::Hasher::new();
+    hasher.update_mmap_rayon(&chunked_info.1.path).unwrap();
+    let hash = hasher.finalize();
+    let new_filename = settings.file_dir.join(hash.to_string());
+
+    // If the hash does not exist in the database,
+    // move the file to the backend, else, delete it
+    if main_db.read().unwrap().get_hash(&hash).is_none() {
+        std::fs::rename(&chunked_info.1.path, &new_filename).unwrap();
+    } else {
+        std::fs::remove_file(&chunked_info.1.path).unwrap();
+    }
+
+    let mmid = Mmid::new_random();
+    let file_type = file_format::FileFormat::from_file(&new_filename).unwrap();
+
+    let constructed_file = MochiFile::new(
+        mmid.clone(),
+        chunked_info.1.name,
+        file_type.media_type().to_string(),
+        hash,
+        now,
+        now + chunked_info.1.expire_duration,
+    );
+
+    main_db
+        .write()
+        .unwrap()
+        .insert(&mmid, constructed_file.clone());
+
+    Ok(Json(constructed_file))
 }
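
Taken together, the three handlers above define the upload protocol: POST the file metadata to /upload/chunked to get back a UUID and the server's chunk size, POST each slice to /upload/chunked/<uuid>?offset=<n>, then GET /upload/chunked/<uuid>?finish so the server hashes the assembled file and returns its MochiFile record. Note that the chunk entry is registered with a 30-second expiry, so an upload has to keep moving or the cleaner will drop it. A hedged client-side sketch in Rust, assuming reqwest (blocking and json features) and serde_json, neither of which is part of this project; names and the localhost URL are illustrative:

    use std::{error::Error, fs};

    // Subset of the ChunkedResponse fields a client needs.
    #[derive(serde::Deserialize)]
    struct StartResponse {
        uuid: Option<uuid::Uuid>,
        chunk_size: Option<u64>,
    }

    fn upload(base: &str, path: &str) -> Result<(), Box<dyn Error>> {
        let client = reqwest::blocking::Client::new();
        let bytes = fs::read(path)?;

        // 1. Announce the upload and learn the UUID + server chunk size.
        let start: StartResponse = client
            .post(format!("{base}/upload/chunked"))
            .json(&serde_json::json!({
                "name": "example.bin",
                "size": bytes.len() as u64,
                "expire_duration": 3600,
            }))
            .send()?
            .json()?;
        let uuid = start.uuid.ok_or("upload rejected")?;
        let chunk_size = start.chunk_size.ok_or("no chunk size")? as usize;

        // 2. Send each slice at its byte offset; chunks may arrive in any order.
        for (i, chunk) in bytes.chunks(chunk_size).enumerate() {
            client
                .post(format!("{base}/upload/chunked/{uuid}?offset={}", i * chunk_size))
                .body(chunk.to_vec())
                .send()?
                .error_for_status()?;
        }

        // 3. Finalize: the server hashes the file and returns the MochiFile record.
        let record = client
            .get(format!("{base}/upload/chunked/{uuid}?finish"))
            .send()?
            .error_for_status()?
            .text()?;
        println!("{record}");
        Ok(())
    }

    fn main() {
        if let Err(e) = upload("http://localhost:8000", "example.bin") {
            eprintln!("upload failed: {e}");
        }
    }

Because every chunk carries its own offset, chunk order does not matter, which is what lets the JavaScript client further down fire several chunk requests concurrently.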

View file (src/main.rs)

@@ -1,16 +1,21 @@
 use std::{
     fs,
+    path::PathBuf,
     sync::{Arc, RwLock},
 };
 
 use chrono::TimeDelta;
 use confetti_box::{
-    database::{clean_loop, Mochibase},
+    database::{clean_database, Chunkbase, Mochibase},
     endpoints, pages, resources,
     settings::Settings,
 };
 use log::info;
-use rocket::{data::ToByteUnit as _, routes, tokio};
+use rocket::{
+    data::ToByteUnit as _,
+    routes,
+    tokio::{self, select, sync::broadcast::Receiver, time},
+};
@@ -39,14 +44,22 @@ async fn main() {
     let database = Arc::new(RwLock::new(
         Mochibase::open_or_new(&config.database_path).expect("Failed to open or create database"),
     ));
+    let chunkbase = Arc::new(RwLock::new(Chunkbase::default()));
     let local_db = database.clone();
+    let local_chunk = chunkbase.clone();
 
-    // Start monitoring thread, cleaning the database every 2 minutes
-    let (shutdown, rx) = tokio::sync::mpsc::channel(1);
+    let (shutdown, rx) = tokio::sync::broadcast::channel(1);
+    // Clean the database every 2 minutes
     tokio::spawn({
         let cleaner_db = database.clone();
         let file_path = config.file_dir.clone();
-        async move { clean_loop(cleaner_db, file_path, rx, TimeDelta::minutes(2)).await }
+        async move { clean_loop(cleaner_db, file_path, rx).await }
+    });
+    tokio::spawn({
+        let cleaner_db = database.clone();
+        let file_path = config.file_dir.clone();
+        let rx2 = shutdown.subscribe();
+        async move { clean_loop(cleaner_db, file_path, rx2).await }
     });
 
     let rocket = rocket::build()
@@ -65,7 +78,9 @@ async fn main() {
         .mount(
             config.server.root_path.clone() + "/",
             routes![
-                confetti_box::handle_upload,
+                confetti_box::chunked_upload_start,
+                confetti_box::chunked_upload_continue,
+                confetti_box::chunked_upload_finish,
                 endpoints::server_info,
                 endpoints::file_info,
                 endpoints::lookup_mmid,
@@ -74,6 +89,7 @@ async fn main() {
             ],
         )
         .manage(database)
+        .manage(chunkbase)
        .manage(config)
        .configure(rocket_config)
        .launch()
@@ -83,10 +99,7 @@ async fn main() {
     rocket.expect("Server failed to shutdown gracefully");
 
     info!("Stopping database cleaning thread...");
-    shutdown
-        .send(())
-        .await
-        .expect("Failed to stop cleaner thread.");
+    shutdown.send(()).expect("Failed to stop cleaner thread.");
     info!("Stopping database cleaning thread completed successfully.");
 
     info!("Saving database on shutdown...");
@@ -96,4 +109,38 @@ async fn main() {
         .save()
         .expect("Failed to save database");
     info!("Saving database completed successfully.");
+
+    info!("Deleting chunk data on shutdown...");
+    local_chunk
+        .write()
+        .unwrap()
+        .delete_all()
+        .expect("Failed to delete chunks");
+    info!("Deleting chunk data completed successfully.");
+}
+
+/// A loop to clean the database periodically.
+pub async fn clean_loop(
+    main_db: Arc<RwLock<Mochibase>>,
+    file_path: PathBuf,
+    mut shutdown_signal: Receiver<()>,
+) {
+    let mut interval = time::interval(TimeDelta::minutes(2).to_std().unwrap());
+
+    loop {
+        select! {
+            _ = interval.tick() => clean_database(&main_db, &file_path),
+            _ = shutdown_signal.recv() => break,
+        };
+    }
+}
+
+pub async fn clean_chunks(chunk_db: Arc<RwLock<Chunkbase>>, mut shutdown_signal: Receiver<()>) {
+    let mut interval = time::interval(TimeDelta::seconds(30).to_std().unwrap());
+
+    loop {
+        select! {
+            _ = interval.tick() => {let _ = chunk_db.write().unwrap().delete_timed_out();},
+            _ = shutdown_signal.recv() => break,
+        };
+    }
 }
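
The shutdown channel above switched from mpsc to broadcast because there are now two periodic cleaners (database and chunks) and each needs its own receiver; Sender::subscribe() hands out as many as required, and a single send() reaches them all. An isolated sketch of that pattern, assuming the tokio crate with its timer and macro features; task names are illustrative:

    use std::time::Duration;
    use tokio::{select, sync::broadcast, time};

    #[tokio::main]
    async fn main() {
        // One sender, many receivers: each background task subscribes for its own copy.
        let (shutdown, _keep) = broadcast::channel::<()>(1);

        for name in ["database cleaner", "chunk cleaner"] {
            let mut rx = shutdown.subscribe();
            tokio::spawn(async move {
                let mut interval = time::interval(Duration::from_secs(30));
                loop {
                    select! {
                        _ = interval.tick() => println!("{name}: tick"),
                        _ = rx.recv() => break,
                    }
                }
            });
        }

        time::sleep(Duration::from_secs(1)).await;
        // A single send() notifies every live receiver.
        shutdown.send(()).expect("no receivers left");
    }

With the old mpsc channel only one task could own the receiving end; broadcast removes that restriction.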

View file (src/pages.rs)

@@ -11,6 +11,8 @@ pub fn head(page_title: &str) -> Markup {
         title { (page_title) }
         link rel="icon" type="image/svg+xml" href="/resources/favicon.svg";
         link rel="stylesheet" href="/resources/main.css";
+        link rel="preload" href="/resources/fonts/Roboto.woff2" as="font" type="font/woff2" crossorigin;
+        link rel="preload" href="/resources/fonts/FiraCode.woff2" as="font" type="font/woff2" crossorigin;
     }
 }

View file (src/settings.rs)

@@ -17,6 +17,10 @@ pub struct Settings {
     #[serde(default)]
     pub max_filesize: u64,
 
+    /// Maximum filesize in bytes
+    #[serde(default)]
+    pub chunk_size: u64,
+
     /// Is overwiting already uploaded files with the same hash allowed, or is
     /// this a no-op?
     #[serde(default)]
@@ -48,7 +52,8 @@ pub struct Settings {
 impl Default for Settings {
     fn default() -> Self {
         Self {
-            max_filesize: 1.megabytes().into(), // 128 MB
+            max_filesize: 25.megabytes().into(), // 1 MB
+            chunk_size: 1.megabytes().into(),
             overwrite: true,
             duration: DurationSettings::default(),
             server: ServerSettings::default(),
@@ -81,11 +86,11 @@ impl Settings {
     }
 
     pub fn save(&self) -> Result<(), io::Error> {
-        let mut out_path = self.path.clone();
-        out_path.set_extension(".bkp");
-        let mut file = File::create(&out_path).expect("Could not save!");
+        let out_path = &self.path.with_extension("new");
+        let mut file = File::create(out_path)?;
         file.write_all(&toml::to_string_pretty(self).unwrap().into_bytes())?;
 
+        // Overwrite the original DB with
         fs::rename(out_path, &self.path).unwrap();
 
         Ok(())
View file (src/utils.rs)

@@ -1,7 +1,7 @@
 use blake3::Hash;
 use std::path::Path;
 
-/// Get the Blake3 hash of a file, without reading it all into memory, and also get the size
+/// Get the Blake3 hash of a file, without reading it all into memory
 pub async fn hash_file<P: AsRef<Path>>(input: &P) -> Result<Hash, std::io::Error> {
     let mut hasher = blake3::Hasher::new();
     hasher.update_mmap_rayon(input)?;

View file (client upload script, JavaScript)

@@ -10,7 +10,7 @@ async function formSubmit() {
     const duration = form.elements.duration.value;
     const maxSize = form.elements.fileUpload.dataset.maxFilesize;
 
-    await sendFile(files, duration, maxSize);
+    await sendFiles(files, duration, maxSize);
 
     // Reset the form file data since we've successfully submitted it
     form.elements.fileUpload.value = "";
@@ -19,6 +19,7 @@ async function formSubmit() {
 async function dragDropSubmit(evt) {
     const form = document.getElementById("uploadForm");
     const duration = form.elements.duration.value;
+    const maxSize = form.elements.fileUpload.dataset.maxFilesize;
 
     evt.preventDefault();
@@ -38,12 +39,13 @@ async function dragDropSubmit(evt) {
         });
     }
 
-    await sendFile(files, duration);
+    await sendFiles(files, duration, maxSize);
 }
 
 async function pasteSubmit(evt) {
     const form = document.getElementById("uploadForm");
     const duration = form.elements.duration.value;
+    const maxSize = form.elements.fileUpload.dataset.maxFilesize;
 
     const files = [];
     const len = evt.clipboardData.files.length;
@@ -52,45 +54,145 @@ async function pasteSubmit(evt) {
         files.push(file);
     }
 
-    await sendFile(files, duration);
+    await sendFiles(files, duration, maxSize);
 }
 
-async function sendFile(files, duration, maxSize) {
-    for (const file of files) {
-        const [linkRow, progressBar, progressText] = addNewToList(file.name);
-
-        if (file.size > maxSize) {
-            makeErrored(progressBar, progressText, linkRow, TOO_LARGE_TEXT);
-            console.error("Provided file is too large", file.size, "bytes; max", maxSize, "bytes");
-            continue;
-        } else if (file.size == 0) {
-            makeErrored(progressBar, progressText, linkRow, ZERO_TEXT);
-            console.error("Provided file has 0 bytes");
-            continue;
-        }
-
-        const request = new XMLHttpRequest();
-        request.open('POST', "./upload", true);
-
-        // Set up event listeners
-        request.upload.addEventListener('progress',
-            (p) => {uploadProgress(p, progressBar, progressText, linkRow);}, false);
-        request.addEventListener('load',
-            (c) => {uploadComplete(c, progressBar, progressText, linkRow);}, false);
-        request.addEventListener('error',
-            (e) => {networkErrorHandler(e, progressBar, progressText, linkRow);}, false);
-
-        linkRow.classList.add("upload_inprogress");
-
-        // Create and send FormData
-        try {
-            const formData = new FormData();
-            formData.append("duration", duration);
-            formData.append("fileUpload", file);
-            request.send(formData);
-        } catch (e) {
-            makeErrored(progressBar, progressText, linkRow, ERROR_TEXT);
-            console.error("An error occured while uploading", e);
-        }
-    }
+async function sendFiles(files, duration, maxSize) {
+    const inProgressUploads = new Set();
+    const concurrencyLimit = 10;
+
+    for (const file of files) {
+        // Start the upload and add it to the set of in-progress uploads
+        const uploadPromise = uploadFile(file, duration, maxSize);
+        inProgressUploads.add(uploadPromise);
+
+        // Once an upload finishes, remove it from the set
+        uploadPromise.finally(() => inProgressUploads.delete(uploadPromise));
+
+        // If we reached the concurrency limit, wait for one of the uploads to complete
+        if (inProgressUploads.size >= concurrencyLimit) {
+            await Promise.race(inProgressUploads);
+        }
+    }
+
+    // Wait for any remaining uploads to complete
+    await Promise.allSettled(inProgressUploads);
+}
+
+async function uploadFile(file, duration, maxSize) {
+    const [linkRow, progressBar, progressText] = await addNewToList(file.name);
+
+    if (file.size > maxSize) {
+        console.error("Provided file is too large", file.size, "bytes; max", maxSize, "bytes");
+        makeErrored(progressBar, progressText, linkRow, TOO_LARGE_TEXT);
+        return;
+    } else if (file.size == 0) {
+        console.error("Provided file has 0 bytes");
+        makeErrored(progressBar, progressText, linkRow, ZERO_TEXT);
+        return;
+    }
+
+    // Get preliminary upload information
+    let chunkedResponse;
+    try {
+        const response = await fetch("/upload/chunked", {
+            method: "POST",
+            body: JSON.stringify({
+                "name": file.name,
+                "size": file.size,
+                "expire_duration": parseInt(duration),
+            }),
+        });
+        if (!response.ok) {
+            throw new Error(`Response status: ${response.status}`);
+        }
+        chunkedResponse = await response.json();
+    } catch (error) {
+        console.error(error);
+        makeErrored(progressBar, progressText, linkRow, ERROR_TEXT);
+    }
+
+    // Upload the file in `chunk_size` chunks
+    const chunkUploads = new Set();
+    const progressValues = [];
+    const concurrencyLimit = 4;
+    for (let start = 0; start < file.size; start += chunkedResponse.chunk_size) {
+        const chunk = file.slice(start, start + chunkedResponse.chunk_size)
+        const url = "/upload/chunked/" + chunkedResponse.uuid + "?offset=" + start;
+        const ID = progressValues.push(0);
+
+        let upload = new Promise(function (resolve, reject) {
+            let request = new XMLHttpRequest();
+            request.open("POST", url, true);
+            request.upload.addEventListener('progress',
+                (p) => {uploadProgress(p, progressBar, progressText, progressValues, file.size, ID);}, true
+            );
+            request.onload = (e) => {
+                if (e.target.status >= 200 && e.target.status < 300) {
+                    resolve(request.response);
+                } else {
+                    reject({status: e.target.status, statusText: request.statusText});
+                }
+            };
+            request.onerror = (e) => {
+                reject({status: e.target.status, statusText: request.statusText})
+            };
+            request.send(chunk);
+        });
+
+        chunkUploads.add(upload);
+        upload.finally(() => chunkUploads.delete(upload));
+        if (chunkUploads.size >= concurrencyLimit) {
+            await Promise.race(chunkUploads);
+        }
+    }
+    await Promise.allSettled(chunkUploads);
+
+    // Finish the request and update the progress box
+    const result = await fetch("/upload/chunked/" + chunkedResponse.uuid + "?finish");
+    uploadComplete(result, progressBar, progressText, linkRow);
+}
+
+async function addNewToList(origFileName) {
+    const uploadedFilesDisplay = document.getElementById("uploadedFilesDisplay");
+    const linkRow = uploadedFilesDisplay.appendChild(document.createElement("div"));
+    const fileName = linkRow.appendChild(document.createElement("p"));
+    const progressBar = linkRow.appendChild(document.createElement("progress"));
+    const progressTxt = linkRow.appendChild(document.createElement("p"));
+
+    fileName.textContent = origFileName;
+    fileName.classList.add("file_name");
+    progressTxt.classList.add("status");
+    progressBar.max="100";
+    progressBar.value="0";
+
+    return [linkRow, progressBar, progressTxt];
+}
+
+function uploadProgress(progress, progressBar, progressText, progressValues, fileSize, ID) {
+    if (progress.lengthComputable) {
+        progressValues[ID] = progress.loaded;
+        const progressPercent = Math.floor((progressValues.reduce((a, b) => a + b, 0) / fileSize) * 100);
+        if (progressPercent == 100) {
+            progressBar.removeAttribute("value");
+            progressText.textContent = "⏳";
+        } else {
+            progressBar.value = progressPercent;
+            progressText.textContent = progressPercent + "%";
+        }
+    }
+}
+
+async function uploadComplete(response, progressBar, progressText, linkRow) {
+    if (response.status === 200) {
+        const responseJson = await response.json();
+        console.log("Successfully uploaded file", responseJson);
+        makeFinished(progressBar, progressText, linkRow, responseJson);
+    } else if (response.status === 413) {
+        makeErrored(progressBar, progressText, linkRow, TOO_LARGE_TEXT);
+    } else {
+        makeErrored(progressBar, progressText, linkRow, ERROR_TEXT);
     }
 }
@@ -128,60 +230,6 @@ function makeFinished(progressBar, progressText, linkRow, response) {
     linkRow.classList.add("upload_done");
 }
 
-function networkErrorHandler(err, progressBar, progressText, linkRow) {
-    makeErrored(progressBar, progressText, linkRow, "A network error occured");
-    console.error("A network error occured while uploading", err);
-}
-
-function uploadProgress(progress, progressBar, progressText, _linkRow) {
-    if (progress.lengthComputable) {
-        const progressPercent = Math.floor((progress.loaded / progress.total) * 100);
-        if (progressPercent == 100) {
-            progressBar.removeAttribute("value");
-            progressText.textContent = "⏳";
-        } else {
-            progressBar.value = progressPercent;
-            progressText.textContent = progressPercent + "%";
-        }
-    }
-}
-
-function uploadComplete(response, progressBar, progressText, linkRow) {
-    let target = response.target;
-
-    if (target.status === 200) {
-        const response = JSON.parse(target.responseText);
-
-        if (response.status) {
-            console.log("Successfully uploaded file", response);
-            makeFinished(progressBar, progressText, linkRow, response);
-        } else {
-            console.error("Error uploading", response);
-            makeErrored(progressBar, progressText, linkRow, response.response);
-        }
-    } else if (target.status === 413) {
-        makeErrored(progressBar, progressText, linkRow, TOO_LARGE_TEXT);
-    } else {
-        makeErrored(progressBar, progressText, linkRow, ERROR_TEXT);
-    }
-}
-
-function addNewToList(origFileName) {
-    const uploadedFilesDisplay = document.getElementById("uploadedFilesDisplay");
-    const linkRow = uploadedFilesDisplay.appendChild(document.createElement("div"));
-    const fileName = linkRow.appendChild(document.createElement("p"));
-    const progressBar = linkRow.appendChild(document.createElement("progress"));
-    const progressTxt = linkRow.appendChild(document.createElement("p"));
-
-    fileName.textContent = origFileName;
-    fileName.classList.add("file_name");
-    progressTxt.classList.add("status");
-    progressBar.max="100";
-    progressBar.value="0";
-
-    return [linkRow, progressBar, progressTxt];
-}
-
 async function initEverything() {
     const durationBox = document.getElementById("durationBox");
     const durationButtons = durationBox.getElementsByTagName("button");
@@ -190,7 +238,7 @@ async function initEverything() {
             if (this.classList.contains("selected")) {
                 return;
             }
-            document.getElementById("uploadForm").elements.duration.value = this.dataset.durationSeconds + "s";
+            document.getElementById("uploadForm").elements.duration.value = this.dataset.durationSeconds;
             let selected = this.parentNode.getElementsByClassName("selected");
             selected[0].classList.remove("selected");
             this.classList.add("selected");
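
The rewritten client script caps concurrency with a Set of in-flight promises and Promise.race, once across whole files (limit 10) and again across the chunks of a single file (limit 4). For comparison only, the same bounded-concurrency idea expressed in Rust with the futures crate's for_each_concurrent; this is an analogy, not code from the project:

    use futures::{stream, StreamExt};

    #[tokio::main]
    async fn main() {
        // Stand-ins for the real upload queue.
        let files: Vec<String> = (1..=20).map(|i| format!("file{i}.bin")).collect();

        // At most 10 "uploads" run at once, mirroring the Set + Promise.race limit above.
        stream::iter(files)
            .for_each_concurrent(10, |name| async move {
                // Placeholder for uploadFile(); the sleep simulates network time.
                tokio::time::sleep(std::time::Duration::from_millis(50)).await;
                println!("uploaded {name}");
            })
            .await;
    }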