Added websockets

This commit is contained in:
G2-Games 2025-01-13 22:31:11 -06:00
parent e6c0f84f4a
commit 3e2e49bb79
4 changed files with 197 additions and 8 deletions

View file

@ -19,6 +19,7 @@ lz4_flex = "0.11.3"
maud = { version = "0.26", features = ["rocket"] }
rand = "0.8.5"
rocket = { version = "0.5", features = ["json"] }
rocket_ws = "0.1.1"
serde = { version = "1.0.213", features = ["derive"] }
serde_with = { version = "3.11.0", features = ["chrono_0_4"] }
toml = "0.8.19"

View file

@ -20,7 +20,7 @@ use chrono::{TimeDelta, Utc};
use database::{Chunkbase, ChunkedInfo, Mmid, MochiFile, Mochibase};
use maud::{html, Markup, PreEscaped};
use rocket::{
data::ToByteUnit, get, post, serde::{json::Json, Serialize}, tokio::{
data::ToByteUnit, futures::{SinkExt as _, StreamExt as _}, get, post, serde::{json::{self, Json}, Serialize}, tokio::{
fs, io::{AsyncSeekExt, AsyncWriteExt}
}, Data, State
};
@ -240,3 +240,112 @@ pub async fn chunked_upload_finish(
Ok(Json(constructed_file))
}
#[get("/upload/websocket?<name>&<size>&<duration>")]
pub async fn websocket_upload(
ws: rocket_ws::WebSocket,
main_db: &State<Arc<RwLock<Mochibase>>>,
chunk_db: &State<Arc<RwLock<Chunkbase>>>,
settings: &State<Settings>,
name: String,
size: u64,
duration: i64, // Duration in seconds
) -> Result<rocket_ws::Channel<'static>, Json<ChunkedResponse>> {
let max_filesize = settings.max_filesize;
let expire_duration = TimeDelta::seconds(duration);
if size > max_filesize {
return Err(Json(ChunkedResponse::failure("File too large")));
}
if settings.duration.restrict_to_allowed
&& !settings
.duration
.allowed
.contains(&expire_duration)
{
return Err(Json(ChunkedResponse::failure("Duration not allowed")));
}
if expire_duration > settings.duration.maximum {
return Err(Json(ChunkedResponse::failure("Duration too large")));
}
let file_info = ChunkedInfo {
name,
size,
expire_duration,
..Default::default()
};
let uuid = chunk_db.write().unwrap().new_file(
file_info,
&settings.temp_dir,
TimeDelta::seconds(30)
).map_err(|e| Json(ChunkedResponse::failure(e.to_string().as_str())))?;
let info = chunk_db.read().unwrap().get_file(&uuid).unwrap().clone();
let chunk_db = Arc::clone(chunk_db);
let main_db = Arc::clone(main_db);
let file_dir = settings.file_dir.clone();
let mut file = fs::File::create(&info.1.path).await.unwrap();
Ok(ws.channel(move |mut stream| Box::pin(async move {
let mut offset = 0;
let mut hasher = blake3::Hasher::new();
while let Some(message) = stream.next().await {
if let Ok(m) = message.as_ref() {
if m.is_empty() {
// We're finished here
break;
}
}
let message = message.unwrap().into_data();
offset += message.len() as u64;
if (offset > info.1.size) | (offset > max_filesize) {
break
}
hasher.update(&message);
stream.send(rocket_ws::Message::Text(json::serde_json::ser::to_string(&offset).unwrap())).await.unwrap();
file.write_all(&message).await.unwrap();
file.flush().await?;
chunk_db.write().unwrap().extend_timeout(&uuid, TimeDelta::seconds(30));
}
let now = Utc::now();
let hash = hasher.finalize();
let new_filename = file_dir.join(hash.to_string());
// If the hash does not exist in the database,
// move the file to the backend, else, delete it
// This also removes it from the chunk database
if main_db.read().unwrap().get_hash(&hash).is_none() {
chunk_db.write().unwrap().move_and_remove_file(&uuid, &new_filename)?;
} else {
chunk_db.write().unwrap().remove_file(&uuid)?;
}
let mmid = Mmid::new_random();
let file_type = file_format::FileFormat::from_file(&new_filename).unwrap();
let constructed_file = MochiFile::new(
mmid.clone(),
info.1.name,
file_type.media_type().to_string(),
hash,
now,
now + info.1.expire_duration,
);
main_db
.write()
.unwrap()
.insert(&mmid, constructed_file.clone());
stream.send(rocket_ws::Message::Text(json::serde_json::ser::to_string(&constructed_file).unwrap())).await?;
Ok(())
})))
}

View file

@ -80,6 +80,7 @@ async fn main() {
confetti_box::chunked_upload_start,
confetti_box::chunked_upload_continue,
confetti_box::chunked_upload_finish,
confetti_box::websocket_upload,
endpoints::server_info,
endpoints::file_info,
endpoints::lookup_mmid,

View file

@ -72,11 +72,19 @@ async function sendFiles(files, duration, maxSize) {
}
let start = performance.now();
for (const file of files) {
console.log("Started upload for", file.name);
// Start the upload and add it to the set of in-progress uploads
const uploadPromise = uploadFile(file, duration, maxSize);
let uploadPromise;
if ('WebSocket' in window && window.WebSocket.CLOSING === 2) {
console.log("Uploading file using Websockets");
uploadPromise = uploadFileWebsocket(file, duration, maxSize);
} else {
console.log("Uploading file using Chunks");
uploadPromise = uploadFileChunked(file, duration, maxSize);
}
inProgressUploads.add(uploadPromise);
// Once an upload finishes, remove it from the set
@ -90,13 +98,15 @@ async function sendFiles(files, duration, maxSize) {
// Wait for any remaining uploads to complete
await Promise.allSettled(inProgressUploads);
let end = performance.now();
console.log(end - start);
wakeLock.release().then(() => {
wakeLock = null;
});
}
async function uploadFile(file, duration, maxSize) {
async function uploadFileChunked(file, duration, maxSize) {
const [linkRow, progressBar, progressText] = await addNewToList(file.name);
if (file.size > maxSize) {
console.error("Provided file is too large", file.size, "bytes; max", maxSize, "bytes");
@ -168,7 +178,64 @@ async function uploadFile(file, duration, maxSize) {
// Finish the request and update the progress box
const result = await fetch("/upload/chunked/" + chunkedResponse.uuid + "?finish");
uploadComplete(result, progressBar, progressText, linkRow);
let responseJson = null;
if (result.status == 200) {
responseJson = await result.json()
}
uploadComplete(responseJson, result.status, progressBar, progressText, linkRow);
}
async function uploadFileWebsocket(file, duration, maxSize) {
const [linkRow, progressBar, progressText] = await addNewToList(file.name);
if (file.size > maxSize) {
console.error("Provided file is too large", file.size, "bytes; max", maxSize, "bytes");
makeErrored(progressBar, progressText, linkRow, TOO_LARGE_TEXT);
return;
} else if (file.size == 0) {
console.error("Provided file has 0 bytes");
makeErrored(progressBar, progressText, linkRow, ZERO_TEXT);
return;
}
// Open the websocket connection
let loc = window.location, new_uri;
if (loc.protocol === "https:") {
new_uri = "wss:";
} else {
new_uri = "ws:";
}
new_uri += "//" + loc.host;
new_uri += loc.pathname + "/upload/websocket?name=" + file.name +"&size=" + file.size + "&duration=" + parseInt(duration);
const socket = new WebSocket(new_uri);
const chunkSize = 10_000_000;
socket.addEventListener("open", (_event) => {
for (let chunk_num = 0; chunk_num < Math.floor(file.size / chunkSize) + 1; chunk_num ++) {
const offset = Math.floor(chunk_num * chunkSize);
const chunk = file.slice(offset, offset + chunkSize);
socket.send(chunk);
}
socket.send("");
});
return new Promise(function(resolve, reject) {
socket.addEventListener("message", (event) => {
const response = JSON.parse(event.data);
if (response.mmid == null) {
const progress = parseInt(response);
uploadProgressWebsocket(progress, progressBar, progressText, file.size);
} else {
// It's so over
socket.close();
uploadComplete(response, 200, progressBar, progressText, linkRow);
resolve();
}
});
});
}
async function addNewToList(origFileName) {
@ -204,12 +271,23 @@ function uploadProgress(progress, progressBar, progressText, progressValues, fil
}
}
async function uploadComplete(response, progressBar, progressText, linkRow) {
if (response.status === 200) {
const responseJson = await response.json();
function uploadProgressWebsocket(bytesFinished, progressBar, progressText, fileSize) {
const progressPercent = Math.floor((bytesFinished / fileSize) * 100);
if (progressPercent == 100) {
progressBar.removeAttribute("value");
progressText.textContent = "⏳";
} else {
progressBar.value = bytesFinished;
progressBar.max = fileSize;
progressText.textContent = progressPercent + "%";
}
}
async function uploadComplete(responseJson, status, progressBar, progressText, linkRow) {
if (status === 200) {
console.log("Successfully uploaded file", responseJson);
makeFinished(progressBar, progressText, linkRow, responseJson);
} else if (response.status === 413) {
} else if (status === 413) {
makeErrored(progressBar, progressText, linkRow, TOO_LARGE_TEXT);
} else {
makeErrored(progressBar, progressText, linkRow, ERROR_TEXT);