From 4f3250f655d08f028c2b2746aa30d0bfbb97b211 Mon Sep 17 00:00:00 2001 From: G2-Games Date: Thu, 7 Nov 2024 04:17:29 -0600 Subject: [PATCH] Fixed chunk database and cleaning --- src/database.rs | 70 +++++++++++++++++++++++++++++++++++++++++------- src/endpoints.rs | 7 +---- src/lib.rs | 58 +++++++++++++-------------------------- src/main.rs | 5 ++-- 4 files changed, 82 insertions(+), 58 deletions(-) diff --git a/src/database.rs b/src/database.rs index 51445ae..4124a08 100644 --- a/src/database.rs +++ b/src/database.rs @@ -14,7 +14,7 @@ use log::{error, info, warn}; use rand::distributions::{Alphanumeric, DistString}; use rocket::{ form::{self, FromFormField, ValueField}, - serde::{Deserialize, Serialize}, + serde::{Deserialize, Serialize}, tokio, }; use serde_with::{serde_as, DisplayFromStr}; use uuid::Uuid; @@ -341,14 +341,6 @@ pub struct Chunkbase { } impl Chunkbase { - pub fn chunks(&self) -> &HashMap, ChunkedInfo)> { - &self.chunks - } - - pub fn mut_chunks(&mut self) -> &mut HashMap, ChunkedInfo)> { - &mut self.chunks - } - /// Delete all temporary chunk files pub fn delete_all(&mut self) -> Result<(), io::Error> { for (_timeout, chunk) in self.chunks.values() { @@ -362,7 +354,7 @@ impl Chunkbase { pub fn delete_timed_out(&mut self) -> Result<(), io::Error> { let now = Utc::now(); - self.mut_chunks().retain(|_u, (t, c)| { + self.chunks.retain(|_u, (t, c)| { if *t <= now { let _ = fs::remove_file(&c.path); false @@ -373,6 +365,64 @@ impl Chunkbase { Ok(()) } + + pub fn new_file>(&mut self, mut info: ChunkedInfo, temp_dir: &P, timeout: TimeDelta) -> Result { + let uuid = Uuid::new_v4(); + let expire = Utc::now() + timeout; + info.path = temp_dir.as_ref().join(uuid.to_string()); + + self.chunks.insert(uuid, (expire, info.clone())); + + fs::File::create_new(&info.path)?; + + Ok(uuid) + } + + pub fn get_file(&self, uuid: &Uuid) -> Option<&(DateTime, ChunkedInfo)> { + self.chunks.get(&uuid) + } + + pub fn remove_file(&mut self, uuid: &Uuid) -> Result { + let item = match self.chunks.remove(uuid) { + Some(i) => i, + None => return Ok(false), + }; + + fs::remove_file(item.1.path)?; + + Ok(true) + } + + pub fn move_and_remove_file>(&mut self, uuid: &Uuid, new_location: &P) -> Result { + let item = match self.chunks.remove(uuid) { + Some(i) => i, + None => return Ok(false), + }; + + fs::rename(item.1.path, new_location)?; + + Ok(true) + } + + pub fn extend_timeout(&mut self, uuid: &Uuid, timeout: TimeDelta) -> bool { + let item = match self.chunks.get_mut(uuid) { + Some(i) => i, + None => return false, + }; + + item.0 = Utc::now() + timeout; + + true + } + + pub fn add_recieved_chunk(&mut self, uuid: &Uuid, chunk: u64) -> bool { + let item = match self.chunks.get_mut(uuid) { + Some(i) => i, + None => return false, + }; + + item.1.recieved_chunks.insert(chunk) + } } /// Information about how to manage partially uploaded chunks of files diff --git a/src/endpoints.rs b/src/endpoints.rs index fba217b..8803edf 100644 --- a/src/endpoints.rs +++ b/src/endpoints.rs @@ -4,12 +4,7 @@ use std::{ }; use rocket::{ - get, - http::ContentType, - response::Redirect, - serde::{self, json::Json}, - tokio::fs::File, - uri, State, + get, http::ContentType, response::{self, Redirect, Responder, Response}, serde::{self, json::Json}, tokio::fs::File, uri, Request, State }; use serde::Serialize; diff --git a/src/lib.rs b/src/lib.rs index 192aa7c..c66be3a 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -20,14 +20,10 @@ use chrono::{TimeDelta, Utc}; use database::{Chunkbase, ChunkedInfo, Mmid, MochiFile, Mochibase}; use maud::{html, Markup, PreEscaped}; use rocket::{ - data::ToByteUnit, - get, post, - serde::{json::Json, Serialize}, - tokio::{ + data::ToByteUnit, get, post, serde::{json::Json, Serialize}, tokio::{ fs, io::{AsyncSeekExt, AsyncWriteExt}, - }, - Data, State, + }, Data, State }; use uuid::Uuid; @@ -105,11 +101,8 @@ impl ChunkedResponse { pub async fn chunked_upload_start( db: &State>>, settings: &State, - mut file_info: Json, + file_info: Json, ) -> Result, std::io::Error> { - let uuid = Uuid::new_v4(); - file_info.path = settings.temp_dir.join(uuid.to_string()); - // Perform some sanity checks if file_info.size > settings.max_filesize { return Ok(Json(ChunkedResponse::failure("File too large"))); @@ -126,12 +119,11 @@ pub async fn chunked_upload_start( return Ok(Json(ChunkedResponse::failure("Duration too large"))); } - fs::File::create_new(&file_info.path).await?; - - db.write().unwrap().mut_chunks().insert( - uuid, - (Utc::now() + TimeDelta::seconds(30), file_info.into_inner()), - ); + let uuid = db.write().unwrap().new_file( + file_info.0, + &settings.temp_dir, + TimeDelta::seconds(30) + )?; Ok(Json(ChunkedResponse { status: true, @@ -152,10 +144,11 @@ pub async fn chunked_upload_continue( let uuid = Uuid::parse_str(uuid).map_err(io::Error::other)?; let data_stream = data.open((settings.chunk_size + 100).bytes()); - let chunked_info = match chunk_db.read().unwrap().chunks().get(&uuid) { + let chunked_info = match chunk_db.read().unwrap().get_file(&uuid) { Some(s) => s.clone(), None => return Err(io::Error::other("Invalid UUID")), }; + if chunked_info.1.recieved_chunks.contains(&chunk) { return Err(io::Error::new(ErrorKind::Other, "Chunk already uploaded")); } @@ -181,22 +174,16 @@ pub async fn chunked_upload_continue( let position = file.stream_position().await?; if written > settings.chunk_size { - chunk_db.write().unwrap().mut_chunks().remove(&uuid); + chunk_db.write().unwrap().remove_file(&uuid)?; return Err(io::Error::other("Wrote more than one chunk")); } if position > chunked_info.1.size { - chunk_db.write().unwrap().mut_chunks().remove(&uuid); + chunk_db.write().unwrap().remove_file(&uuid)?; return Err(io::Error::other("File larger than expected")); } - chunk_db - .write() - .unwrap() - .mut_chunks() - .get_mut(&uuid) - .unwrap() - .1 - .recieved_chunks - .insert(chunk); + + chunk_db.write().unwrap().add_recieved_chunk(&uuid, chunk); + chunk_db.write().unwrap().extend_timeout(&uuid, TimeDelta::seconds(30)); Ok(()) } @@ -211,19 +198,11 @@ pub async fn chunked_upload_finish( ) -> Result, io::Error> { let now = Utc::now(); let uuid = Uuid::parse_str(uuid).map_err(io::Error::other)?; - let chunked_info = match chunk_db.read().unwrap().chunks().get(&uuid) { + let chunked_info = match chunk_db.read().unwrap().get_file(&uuid) { Some(s) => s.clone(), None => return Err(io::Error::other("Invalid UUID")), }; - // Remove the finished chunk from the db - chunk_db - .write() - .unwrap() - .mut_chunks() - .remove(&uuid) - .unwrap(); - if !chunked_info.1.path.try_exists().is_ok_and(|e| e) { return Err(io::Error::other("File does not exist")); } @@ -236,10 +215,11 @@ pub async fn chunked_upload_finish( // If the hash does not exist in the database, // move the file to the backend, else, delete it + // This also removes it from the chunk database if main_db.read().unwrap().get_hash(&hash).is_none() { - std::fs::rename(&chunked_info.1.path, &new_filename).unwrap(); + chunk_db.write().unwrap().move_and_remove_file(&uuid, &new_filename)?; } else { - std::fs::remove_file(&chunked_info.1.path).unwrap(); + chunk_db.write().unwrap().remove_file(&uuid)?; } let mmid = Mmid::new_random(); diff --git a/src/main.rs b/src/main.rs index 608ded3..d625628 100644 --- a/src/main.rs +++ b/src/main.rs @@ -56,10 +56,9 @@ async fn main() { async move { clean_loop(cleaner_db, file_path, rx).await } }); tokio::spawn({ - let cleaner_db = database.clone(); - let file_path = config.file_dir.clone(); + let chunk_db = local_chunk.clone(); let rx2 = shutdown.subscribe(); - async move { clean_loop(cleaner_db, file_path, rx2).await } + async move { clean_chunks(chunk_db, rx2).await } }); let rocket = rocket::build()