Fixed chunk database and cleaning

This commit is contained in:
G2-Games 2024-11-07 04:17:29 -06:00
parent 9fea95010f
commit 4f3250f655
4 changed files with 82 additions and 58 deletions

View file

@ -14,7 +14,7 @@ use log::{error, info, warn};
use rand::distributions::{Alphanumeric, DistString}; use rand::distributions::{Alphanumeric, DistString};
use rocket::{ use rocket::{
form::{self, FromFormField, ValueField}, form::{self, FromFormField, ValueField},
serde::{Deserialize, Serialize}, serde::{Deserialize, Serialize}, tokio,
}; };
use serde_with::{serde_as, DisplayFromStr}; use serde_with::{serde_as, DisplayFromStr};
use uuid::Uuid; use uuid::Uuid;
@ -341,14 +341,6 @@ pub struct Chunkbase {
} }
impl Chunkbase { impl Chunkbase {
pub fn chunks(&self) -> &HashMap<Uuid, (DateTime<Utc>, ChunkedInfo)> {
&self.chunks
}
pub fn mut_chunks(&mut self) -> &mut HashMap<Uuid, (DateTime<Utc>, ChunkedInfo)> {
&mut self.chunks
}
/// Delete all temporary chunk files /// Delete all temporary chunk files
pub fn delete_all(&mut self) -> Result<(), io::Error> { pub fn delete_all(&mut self) -> Result<(), io::Error> {
for (_timeout, chunk) in self.chunks.values() { for (_timeout, chunk) in self.chunks.values() {
@ -362,7 +354,7 @@ impl Chunkbase {
pub fn delete_timed_out(&mut self) -> Result<(), io::Error> { pub fn delete_timed_out(&mut self) -> Result<(), io::Error> {
let now = Utc::now(); let now = Utc::now();
self.mut_chunks().retain(|_u, (t, c)| { self.chunks.retain(|_u, (t, c)| {
if *t <= now { if *t <= now {
let _ = fs::remove_file(&c.path); let _ = fs::remove_file(&c.path);
false false
@ -373,6 +365,64 @@ impl Chunkbase {
Ok(()) Ok(())
} }
pub fn new_file<P: AsRef<Path>>(&mut self, mut info: ChunkedInfo, temp_dir: &P, timeout: TimeDelta) -> Result<Uuid, io::Error> {
let uuid = Uuid::new_v4();
let expire = Utc::now() + timeout;
info.path = temp_dir.as_ref().join(uuid.to_string());
self.chunks.insert(uuid, (expire, info.clone()));
fs::File::create_new(&info.path)?;
Ok(uuid)
}
pub fn get_file(&self, uuid: &Uuid) -> Option<&(DateTime<Utc>, ChunkedInfo)> {
self.chunks.get(&uuid)
}
pub fn remove_file(&mut self, uuid: &Uuid) -> Result<bool, io::Error> {
let item = match self.chunks.remove(uuid) {
Some(i) => i,
None => return Ok(false),
};
fs::remove_file(item.1.path)?;
Ok(true)
}
pub fn move_and_remove_file<P: AsRef<Path>>(&mut self, uuid: &Uuid, new_location: &P) -> Result<bool, io::Error> {
let item = match self.chunks.remove(uuid) {
Some(i) => i,
None => return Ok(false),
};
fs::rename(item.1.path, new_location)?;
Ok(true)
}
pub fn extend_timeout(&mut self, uuid: &Uuid, timeout: TimeDelta) -> bool {
let item = match self.chunks.get_mut(uuid) {
Some(i) => i,
None => return false,
};
item.0 = Utc::now() + timeout;
true
}
pub fn add_recieved_chunk(&mut self, uuid: &Uuid, chunk: u64) -> bool {
let item = match self.chunks.get_mut(uuid) {
Some(i) => i,
None => return false,
};
item.1.recieved_chunks.insert(chunk)
}
} }
/// Information about how to manage partially uploaded chunks of files /// Information about how to manage partially uploaded chunks of files

View file

@ -4,12 +4,7 @@ use std::{
}; };
use rocket::{ use rocket::{
get, get, http::ContentType, response::{self, Redirect, Responder, Response}, serde::{self, json::Json}, tokio::fs::File, uri, Request, State
http::ContentType,
response::Redirect,
serde::{self, json::Json},
tokio::fs::File,
uri, State,
}; };
use serde::Serialize; use serde::Serialize;

View file

@ -20,14 +20,10 @@ use chrono::{TimeDelta, Utc};
use database::{Chunkbase, ChunkedInfo, Mmid, MochiFile, Mochibase}; use database::{Chunkbase, ChunkedInfo, Mmid, MochiFile, Mochibase};
use maud::{html, Markup, PreEscaped}; use maud::{html, Markup, PreEscaped};
use rocket::{ use rocket::{
data::ToByteUnit, data::ToByteUnit, get, post, serde::{json::Json, Serialize}, tokio::{
get, post,
serde::{json::Json, Serialize},
tokio::{
fs, fs,
io::{AsyncSeekExt, AsyncWriteExt}, io::{AsyncSeekExt, AsyncWriteExt},
}, }, Data, State
Data, State,
}; };
use uuid::Uuid; use uuid::Uuid;
@ -105,11 +101,8 @@ impl ChunkedResponse {
pub async fn chunked_upload_start( pub async fn chunked_upload_start(
db: &State<Arc<RwLock<Chunkbase>>>, db: &State<Arc<RwLock<Chunkbase>>>,
settings: &State<Settings>, settings: &State<Settings>,
mut file_info: Json<ChunkedInfo>, file_info: Json<ChunkedInfo>,
) -> Result<Json<ChunkedResponse>, std::io::Error> { ) -> Result<Json<ChunkedResponse>, std::io::Error> {
let uuid = Uuid::new_v4();
file_info.path = settings.temp_dir.join(uuid.to_string());
// Perform some sanity checks // Perform some sanity checks
if file_info.size > settings.max_filesize { if file_info.size > settings.max_filesize {
return Ok(Json(ChunkedResponse::failure("File too large"))); return Ok(Json(ChunkedResponse::failure("File too large")));
@ -126,12 +119,11 @@ pub async fn chunked_upload_start(
return Ok(Json(ChunkedResponse::failure("Duration too large"))); return Ok(Json(ChunkedResponse::failure("Duration too large")));
} }
fs::File::create_new(&file_info.path).await?; let uuid = db.write().unwrap().new_file(
file_info.0,
db.write().unwrap().mut_chunks().insert( &settings.temp_dir,
uuid, TimeDelta::seconds(30)
(Utc::now() + TimeDelta::seconds(30), file_info.into_inner()), )?;
);
Ok(Json(ChunkedResponse { Ok(Json(ChunkedResponse {
status: true, status: true,
@ -152,10 +144,11 @@ pub async fn chunked_upload_continue(
let uuid = Uuid::parse_str(uuid).map_err(io::Error::other)?; let uuid = Uuid::parse_str(uuid).map_err(io::Error::other)?;
let data_stream = data.open((settings.chunk_size + 100).bytes()); let data_stream = data.open((settings.chunk_size + 100).bytes());
let chunked_info = match chunk_db.read().unwrap().chunks().get(&uuid) { let chunked_info = match chunk_db.read().unwrap().get_file(&uuid) {
Some(s) => s.clone(), Some(s) => s.clone(),
None => return Err(io::Error::other("Invalid UUID")), None => return Err(io::Error::other("Invalid UUID")),
}; };
if chunked_info.1.recieved_chunks.contains(&chunk) { if chunked_info.1.recieved_chunks.contains(&chunk) {
return Err(io::Error::new(ErrorKind::Other, "Chunk already uploaded")); return Err(io::Error::new(ErrorKind::Other, "Chunk already uploaded"));
} }
@ -181,22 +174,16 @@ pub async fn chunked_upload_continue(
let position = file.stream_position().await?; let position = file.stream_position().await?;
if written > settings.chunk_size { if written > settings.chunk_size {
chunk_db.write().unwrap().mut_chunks().remove(&uuid); chunk_db.write().unwrap().remove_file(&uuid)?;
return Err(io::Error::other("Wrote more than one chunk")); return Err(io::Error::other("Wrote more than one chunk"));
} }
if position > chunked_info.1.size { if position > chunked_info.1.size {
chunk_db.write().unwrap().mut_chunks().remove(&uuid); chunk_db.write().unwrap().remove_file(&uuid)?;
return Err(io::Error::other("File larger than expected")); return Err(io::Error::other("File larger than expected"));
} }
chunk_db
.write() chunk_db.write().unwrap().add_recieved_chunk(&uuid, chunk);
.unwrap() chunk_db.write().unwrap().extend_timeout(&uuid, TimeDelta::seconds(30));
.mut_chunks()
.get_mut(&uuid)
.unwrap()
.1
.recieved_chunks
.insert(chunk);
Ok(()) Ok(())
} }
@ -211,19 +198,11 @@ pub async fn chunked_upload_finish(
) -> Result<Json<MochiFile>, io::Error> { ) -> Result<Json<MochiFile>, io::Error> {
let now = Utc::now(); let now = Utc::now();
let uuid = Uuid::parse_str(uuid).map_err(io::Error::other)?; let uuid = Uuid::parse_str(uuid).map_err(io::Error::other)?;
let chunked_info = match chunk_db.read().unwrap().chunks().get(&uuid) { let chunked_info = match chunk_db.read().unwrap().get_file(&uuid) {
Some(s) => s.clone(), Some(s) => s.clone(),
None => return Err(io::Error::other("Invalid UUID")), None => return Err(io::Error::other("Invalid UUID")),
}; };
// Remove the finished chunk from the db
chunk_db
.write()
.unwrap()
.mut_chunks()
.remove(&uuid)
.unwrap();
if !chunked_info.1.path.try_exists().is_ok_and(|e| e) { if !chunked_info.1.path.try_exists().is_ok_and(|e| e) {
return Err(io::Error::other("File does not exist")); return Err(io::Error::other("File does not exist"));
} }
@ -236,10 +215,11 @@ pub async fn chunked_upload_finish(
// If the hash does not exist in the database, // If the hash does not exist in the database,
// move the file to the backend, else, delete it // move the file to the backend, else, delete it
// This also removes it from the chunk database
if main_db.read().unwrap().get_hash(&hash).is_none() { if main_db.read().unwrap().get_hash(&hash).is_none() {
std::fs::rename(&chunked_info.1.path, &new_filename).unwrap(); chunk_db.write().unwrap().move_and_remove_file(&uuid, &new_filename)?;
} else { } else {
std::fs::remove_file(&chunked_info.1.path).unwrap(); chunk_db.write().unwrap().remove_file(&uuid)?;
} }
let mmid = Mmid::new_random(); let mmid = Mmid::new_random();

View file

@ -56,10 +56,9 @@ async fn main() {
async move { clean_loop(cleaner_db, file_path, rx).await } async move { clean_loop(cleaner_db, file_path, rx).await }
}); });
tokio::spawn({ tokio::spawn({
let cleaner_db = database.clone(); let chunk_db = local_chunk.clone();
let file_path = config.file_dir.clone();
let rx2 = shutdown.subscribe(); let rx2 = shutdown.subscribe();
async move { clean_loop(cleaner_db, file_path, rx2).await } async move { clean_chunks(chunk_db, rx2).await }
}); });
let rocket = rocket::build() let rocket = rocket::build()