use std::path::PathBuf; use axum::{ body::Body, extract::{Path, Query, State}, routing::{delete, get, post}, Json, Router, }; use axum_extra::TypedHeader; use futures_util::TryStreamExt; use headers::ContentType; use http::StatusCode; use mime::Mime; use serde::{Deserialize, Serialize}; use sha2::{Digest, Sha256}; use sqlx::query; use tokio::{fs, io}; use tokio_util::io::StreamReader; use tracing::{error, field, info, instrument}; use ulid::Ulid; use uuid::Uuid; use crate::{app::SharedState, error::AppError}; pub fn resource() -> Router { Router::new() .route("/files", post(upload_file)) .route("/files/:file_id", get(get_file_info)) .route("/files/:file_id", delete(delete_file)) .route("/files/:file_id/keys/", post(create_file_key)) .route("/files/:file_id/keys/:key_id", delete(delete_file_key)) } #[derive(Serialize)] struct File { id: Ulid, hash: String, mime: String, keys: Vec, } #[derive(Serialize)] struct NewFile { id: Ulid, hash: String, mime: String, key: Option, } #[derive(Deserialize)] struct UploadFileOptions { #[serde(default)] create_key: bool, } #[instrument(skip_all)] async fn upload_file( State(SharedState { db, config }): State, Query(UploadFileOptions { create_key }): Query, TypedHeader(content_type): TypedHeader, body: Body, ) -> Result, AppError> { let id = Ulid::new(); let path_temp = config.file_temp_dir.join(id.to_string()); let mut hasher = Sha256::new(); { let mut file_temp = fs::File::create(&path_temp).await?; let better_body = body .into_data_stream() .inspect_ok(|b| hasher.update(b)) .map_err(|err| io::Error::new(io::ErrorKind::Other, err)); let mut reader = StreamReader::new(better_body); if let Err(err) = io::copy(&mut reader, &mut file_temp).await { error!( err = field::display(&err), file_path = field::debug(&path_temp), "failed to copy file, removing", ); drop(file_temp); if let Err(err) = fs::remove_file(path_temp).await { error!( err = field::display(err), "failed to remove failed upload file", ); } return Err(err.into()); } } let hash = hasher.finalize(); let hash_hex = hex::encode(hash); let path_hash = PathBuf::from("files").join(&hash_hex); if fs::try_exists(&path_hash).await? { info!(hash = hash_hex, "file already exists"); if let Err(err) = fs::remove_file(&path_temp).await { error!(err = field::display(&err), "failed to remove temp file"); } } else if let Err(err) = fs::rename(&path_temp, &path_hash).await { error!(err = field::display(&err), "failed to move finished file"); if let Err(err) = fs::remove_file(&path_temp).await { error!( err = field::display(&err), "failed to remove file after failed move", ); } return Err(err.into()); } let mime = Into::::into(content_type); let mime_str = mime.to_string(); let mut tx = db.begin().await?; match query!( "INSERT INTO file (id, hash, mime) VALUES ($1, $2, $3) ON CONFLICT DO NOTHING", Uuid::from(id), &hash[..], mime_str, ) .execute(&mut *tx) .await? .rows_affected() { 0 | 1 => {} rows => return Err(AppError::ImpossibleAffectedRows(rows)), } // `ON CONFLICT DO NOTHING RETURNING id` only works when there *isn't* a // conflict let id = query!("SELECT id FROM file WHERE hash = $1", &hash[..]) .fetch_one(&mut *tx) .await? .id .into(); let mut key_opt = None; if create_key { let key = Ulid::new(); key_opt = Some(key); match query!( "INSERT INTO file_key (id, file_id) VALUES ($1, $2)", Uuid::from(key), Uuid::from(id), ) .execute(&mut *tx) .await? .rows_affected() { 1 => {} 0 => return Err(AppError::UlidConflict(key)), rows => return Err(AppError::ImpossibleAffectedRows(rows)), } } tx.commit().await?; Ok(Json(NewFile { id, hash: hash_hex, mime: mime_str, key: key_opt, })) } async fn get_file_info( State(SharedState { db, .. }): State, Path(id): Path, ) -> Result, AppError> { let (file, keys) = tokio::try_join!( query!( "SELECT id, hash, mime FROM file WHERE id = $1", Uuid::from(id), ) .fetch_optional(&db), query!("SELECT id FROM file_key WHERE file_id = $1", Uuid::from(id)).fetch_all(&db), )?; match file { Some(r) => Ok(Json(File { id, hash: hex::encode(r.hash), mime: r.mime, keys: keys.into_iter().map(|r| r.id.into()).collect(), })), None => Err(AppError::FileNotFound(id)), } } #[instrument(skip_all)] async fn delete_file( State(SharedState { db, config }): State, Path(file_id): Path, ) -> Result { let file_hash = query!( "DELETE FROM file WHERE id = $1 RETURNING hash", Uuid::from(file_id) ) .fetch_optional(&db) .await? .ok_or(AppError::FileNotFound(file_id))? .hash; let file_path = config.file_store_dir.join(hex::encode(file_hash)); if let Err(err) = fs::remove_file(file_path).await { error!(err = field::display(err), "failed to remove file"); } Ok(StatusCode::NO_CONTENT) } async fn create_file_key( State(SharedState { db, .. }): State, Path(file_id): Path, ) -> Result<(StatusCode, Json), AppError> { let key_id = Ulid::new(); match query!( "INSERT INTO file_key (id, file_id) VALUES ($1, $2)", Uuid::from(key_id), Uuid::from(file_id), ) .execute(&db) .await? .rows_affected() { 1 => Ok((StatusCode::CREATED, Json(key_id))), rows => Err(AppError::ImpossibleAffectedRows(rows)), } } async fn delete_file_key( State(SharedState { db, .. }): State, Path((file_id, key_id)): Path<(Ulid, Ulid)>, ) -> Result { match query!( "DELETE FROM file_key WHERE id = $1 AND file_id = $2", Uuid::from(key_id), Uuid::from(file_id), ) .execute(&db) .await? .rows_affected() { 1 => Ok(StatusCode::NO_CONTENT), 0 => Err(AppError::FileKeyNotFound(key_id)), rows => Err(AppError::ImpossibleAffectedRows(rows)), } }