From 3e2e49bb79c349494578c3b7663b206294e502f0 Mon Sep 17 00:00:00 2001
From: G2-Games
Date: Mon, 13 Jan 2025 22:31:11 -0600
Subject: [PATCH] Add websocket uploads

Add a websocket upload route backed by rocket_ws. The client opens a
websocket at /upload/websocket?name=...&size=...&duration=..., streams
the file as binary messages, and sends an empty message to mark the end
of the upload. After every chunk the server replies with the byte
offset received so far as JSON; on completion it replies with the
finished file's info. The web client takes this path when the browser
has a native websocket implementation and falls back to chunked
uploads otherwise.
---
 confetti-box/Cargo.toml     |   1 +
 confetti-box/src/lib.rs     | 111 +++++++++++++++++++++++++++++++++++-
 confetti-box/src/main.rs    |   1 +
 confetti-box/web/request.js |  92 +++++++++++++++++++++++++++---
 4 files changed, 197 insertions(+), 8 deletions(-)

diff --git a/confetti-box/Cargo.toml b/confetti-box/Cargo.toml
index fede6bb..d36d70e 100644
--- a/confetti-box/Cargo.toml
+++ b/confetti-box/Cargo.toml
@@ -19,6 +19,7 @@ lz4_flex = "0.11.3"
 maud = { version = "0.26", features = ["rocket"] }
 rand = "0.8.5"
 rocket = { version = "0.5", features = ["json"] }
+rocket_ws = "0.1.1"
 serde = { version = "1.0.213", features = ["derive"] }
 serde_with = { version = "3.11.0", features = ["chrono_0_4"] }
 toml = "0.8.19"
diff --git a/confetti-box/src/lib.rs b/confetti-box/src/lib.rs
index fa87eb2..7a4e1c4 100644
--- a/confetti-box/src/lib.rs
+++ b/confetti-box/src/lib.rs
@@ -20,7 +20,7 @@ use chrono::{TimeDelta, Utc};
 use database::{Chunkbase, ChunkedInfo, Mmid, MochiFile, Mochibase};
 use maud::{html, Markup, PreEscaped};
 use rocket::{
-    data::ToByteUnit, get, post, serde::{json::Json, Serialize}, tokio::{
+    data::ToByteUnit, futures::{SinkExt as _, StreamExt as _}, get, post, serde::{json::{self, Json}, Serialize}, tokio::{
         fs, io::{AsyncSeekExt, AsyncWriteExt}
     }, Data, State
 };
@@ -240,3 +240,112 @@ pub async fn chunked_upload_finish(

     Ok(Json(constructed_file))
 }
+
+/// Upload a file over a websocket connection, acknowledging each chunk
+/// with the number of bytes received so far
+#[get("/upload/websocket?<name>&<size>&<duration>")]
+pub async fn websocket_upload(
+    ws: rocket_ws::WebSocket,
+    main_db: &State<Arc<RwLock<Mochibase>>>,
+    chunk_db: &State<Arc<RwLock<Chunkbase>>>,
+    settings: &State<Settings>,
+    name: String,
+    size: u64,
+    duration: i64, // Duration in seconds
+) -> Result<rocket_ws::Channel<'static>, Json<ChunkedResponse>> {
+    let max_filesize = settings.max_filesize;
+    let expire_duration = TimeDelta::seconds(duration);
+    if size > max_filesize {
+        return Err(Json(ChunkedResponse::failure("File too large")));
+    }
+    if settings.duration.restrict_to_allowed
+        && !settings.duration.allowed.contains(&expire_duration)
+    {
+        return Err(Json(ChunkedResponse::failure("Duration not allowed")));
+    }
+    if expire_duration > settings.duration.maximum {
+        return Err(Json(ChunkedResponse::failure("Duration too large")));
+    }
+
+    let file_info = ChunkedInfo {
+        name,
+        size,
+        expire_duration,
+        ..Default::default()
+    };
+
+    let uuid = chunk_db.write().unwrap().new_file(
+        file_info,
+        &settings.temp_dir,
+        TimeDelta::seconds(30)
+    ).map_err(|e| Json(ChunkedResponse::failure(e.to_string().as_str())))?;
+    let info = chunk_db.read().unwrap().get_file(&uuid).unwrap().clone();
+
+    let chunk_db = Arc::clone(chunk_db);
+    let main_db = Arc::clone(main_db);
+    let file_dir = settings.file_dir.clone();
+    let mut file = fs::File::create(&info.1.path)
+        .await
+        .map_err(|e| Json(ChunkedResponse::failure(e.to_string().as_str())))?;
+
+    Ok(ws.channel(move |mut stream| Box::pin(async move {
+        let mut offset = 0;
+        let mut hasher = blake3::Hasher::new();
+        while let Some(message) = stream.next().await {
+            if let Ok(m) = message.as_ref() {
+                if m.is_empty() {
+                    // An empty message marks the end of the upload
+                    break;
+                }
+            }
+
+            // A client disconnect mid-stream ends the upload early
+            let message = match message {
+                Ok(m) => m.into_data(),
+                Err(_) => break,
+            };
+            offset += message.len() as u64;
+            if offset > info.1.size || offset > max_filesize {
+                break;
+            }
+
+            hasher.update(&message);
+
+            // Acknowledge the chunk by reporting the bytes received so far
+            stream.send(rocket_ws::Message::Text(json::serde_json::ser::to_string(&offset).unwrap())).await?;
+
+            file.write_all(&message).await?;
+            file.flush().await?;
+
+            chunk_db.write().unwrap().extend_timeout(&uuid, TimeDelta::seconds(30));
+        }
+
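+        // The upload stream has ended; finalize by hashing the received
+        // bytes and deciding whether this file is new or a duplicate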
+        let now = Utc::now();
+        let hash = hasher.finalize();
+        let new_filename = file_dir.join(hash.to_string());
+
+        // If the hash is not yet in the main database, move the file into
+        // the backend; otherwise delete the duplicate. Both paths remove
+        // the entry from the chunk database.
+        if main_db.read().unwrap().get_hash(&hash).is_none() {
+            chunk_db.write().unwrap().move_and_remove_file(&uuid, &new_filename)?;
+        } else {
+            chunk_db.write().unwrap().remove_file(&uuid)?;
+        }
+
+        let mmid = Mmid::new_random();
+        let file_type = file_format::FileFormat::from_file(&new_filename).unwrap();
+
+        let constructed_file = MochiFile::new(
+            mmid.clone(),
+            info.1.name,
+            file_type.media_type().to_string(),
+            hash,
+            now,
+            now + info.1.expire_duration,
+        );
+
+        main_db
+            .write()
+            .unwrap()
+            .insert(&mmid, constructed_file.clone());
+
+        // Send the completed file's info to the client as the final message
+        stream.send(rocket_ws::Message::Text(json::serde_json::ser::to_string(&constructed_file).unwrap())).await?;
+
+        Ok(())
+    })))
+}
diff --git a/confetti-box/src/main.rs b/confetti-box/src/main.rs
index d625628..4ff148c 100644
--- a/confetti-box/src/main.rs
+++ b/confetti-box/src/main.rs
@@ -80,6 +80,7 @@ async fn main() {
             confetti_box::chunked_upload_start,
             confetti_box::chunked_upload_continue,
             confetti_box::chunked_upload_finish,
+            confetti_box::websocket_upload,
             endpoints::server_info,
             endpoints::file_info,
             endpoints::lookup_mmid,
diff --git a/confetti-box/web/request.js b/confetti-box/web/request.js
index 0b75b37..0b3c9d2 100644
--- a/confetti-box/web/request.js
+++ b/confetti-box/web/request.js
@@ -72,11 +72,19 @@ async function sendFiles(files, duration, maxSize) {
     }
 
+    let start = performance.now();
     for (const file of files) {
         console.log("Started upload for", file.name);
 
         // Start the upload and add it to the set of in-progress uploads
-        const uploadPromise = uploadFile(file, duration, maxSize);
+        let uploadPromise;
+        // Prefer websockets when the browser has a native implementation
+        if ('WebSocket' in window && window.WebSocket.CLOSING === 2) {
+            console.log("Uploading file using Websockets");
+            uploadPromise = uploadFileWebsocket(file, duration, maxSize);
+        } else {
+            console.log("Uploading file using Chunks");
+            uploadPromise = uploadFileChunked(file, duration, maxSize);
+        }
         inProgressUploads.add(uploadPromise);
 
         // Once an upload finishes, remove it from the set
@@ -90,13 +98,15 @@ async function sendFiles(files, duration, maxSize) {
 
     // Wait for any remaining uploads to complete
     await Promise.allSettled(inProgressUploads);
+    let end = performance.now();
+    console.log("All uploads finished in", end - start, "ms");
 
     wakeLock.release().then(() => {
         wakeLock = null;
     });
 }
 
-async function uploadFile(file, duration, maxSize) {
+async function uploadFileChunked(file, duration, maxSize) {
     const [linkRow, progressBar, progressText] = await addNewToList(file.name);
     if (file.size > maxSize) {
         console.error("Provided file is too large", file.size, "bytes; max", maxSize, "bytes");
@@ -168,7 +178,64 @@ async function uploadFile(file, duration, maxSize) {
 
     // Finish the request and update the progress box
     const result = await fetch("/upload/chunked/" + chunkedResponse.uuid + "?finish");
-    uploadComplete(result, progressBar, progressText, linkRow);
+    let responseJson = null;
+    if (result.status === 200) {
+        responseJson = await result.json();
+    }
+    uploadComplete(responseJson, result.status, progressBar, progressText, linkRow);
+}
+
+async function uploadFileWebsocket(file, duration, maxSize) {
+    const [linkRow, progressBar, progressText] = await addNewToList(file.name);
+    if (file.size > maxSize) {
+        console.error("Provided file is too large", file.size, "bytes; max", maxSize, "bytes");
+        makeErrored(progressBar, progressText, linkRow, TOO_LARGE_TEXT);
+        return;
+    } else if (file.size === 0) {
+        console.error("Provided file has 0 bytes");
+        makeErrored(progressBar, progressText, linkRow, ZERO_TEXT);
+        return;
+    }
+
+    // Open the websocket connection
+    let loc = window.location, new_uri;
+    if (loc.protocol === "https:") {
+        new_uri = "wss:";
+    } else {
+        new_uri = "ws:";
+    }
+    new_uri += "//" + loc.host;
+    new_uri += loc.pathname + "/upload/websocket?name=" + encodeURIComponent(file.name)
+        + "&size=" + file.size + "&duration=" + parseInt(duration);
+    const socket = new WebSocket(new_uri);
+
+    // Send the file in 10 MB slices; the server acknowledges each one
+    const chunkSize = 10_000_000;
+    socket.addEventListener("open", (_event) => {
+        for (let chunk_num = 0; chunk_num < Math.ceil(file.size / chunkSize); chunk_num++) {
+            const offset = chunk_num * chunkSize;
+            const chunk = file.slice(offset, offset + chunkSize);
+
+            socket.send(chunk);
+        }
+
+        // An empty message tells the server the upload is complete
+        socket.send("");
+    });
+
+    return new Promise(function(resolve, reject) {
+        socket.addEventListener("error", (event) => {
+            makeErrored(progressBar, progressText, linkRow, ERROR_TEXT);
+            reject(event);
+        });
+
+        socket.addEventListener("message", (event) => {
+            const response = JSON.parse(event.data);
+            if (response.mmid == null) {
+                // Progress update: the server reports bytes received so far
+                const progress = parseInt(response);
+                uploadProgressWebsocket(progress, progressBar, progressText, file.size);
+            } else {
+                // Final message: the server sent the completed file info
+                socket.close();
+
+                uploadComplete(response, 200, progressBar, progressText, linkRow);
+                resolve();
+            }
+        });
+    });
 }
 
 async function addNewToList(origFileName) {
@@ -204,12 +271,23 @@ function uploadProgress(progress, progressBar, progressText, progressValues, fil
     }
 }
 
-async function uploadComplete(response, progressBar, progressText, linkRow) {
-    if (response.status === 200) {
-        const responseJson = await response.json();
+function uploadProgressWebsocket(bytesFinished, progressBar, progressText, fileSize) {
+    const progressPercent = Math.floor((bytesFinished / fileSize) * 100);
+    if (progressPercent === 100) {
+        progressBar.removeAttribute("value");
+        progressText.textContent = "⏳";
+    } else {
+        progressBar.value = bytesFinished;
+        progressBar.max = fileSize;
+        progressText.textContent = progressPercent + "%";
+    }
+}
+
+async function uploadComplete(responseJson, status, progressBar, progressText, linkRow) {
+    if (status === 200) {
         console.log("Successfully uploaded file", responseJson);
         makeFinished(progressBar, progressText, linkRow, responseJson);
-    } else if (response.status === 413) {
+    } else if (status === 413) {
         makeErrored(progressBar, progressText, linkRow, TOO_LARGE_TEXT);
     } else {
         makeErrored(progressBar, progressText, linkRow, ERROR_TEXT);
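
-- 
Testing notes (not part of the patch): a minimal sketch of the wire
protocol this adds, runnable from a browser console. The host and port,
file name, and duration below are assumptions for illustration only;
the endpoint path and message framing come from the diff above.

    // Hypothetical manual test; assumes the server listens on
    // localhost:8000 and allows a 3600-second expiry duration.
    const data = new Blob(["hello, confetti-box!"]);
    const socket = new WebSocket(
        "ws://localhost:8000/upload/websocket?name=hello.txt"
        + "&size=" + data.size + "&duration=3600"
    );
    socket.addEventListener("open", () => {
        socket.send(data); // binary chunk(s), up to 10 MB each
        socket.send("");   // empty message marks the end of the upload
    });
    socket.addEventListener("message", (event) => {
        const response = JSON.parse(event.data);
        if (response.mmid == null) {
            console.log("bytes received:", response); // progress ack
        } else {
            console.log("upload complete:", response); // final file info
            socket.close();
        }
    });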