Skip to content

Commit 173cf7f

Browse files
metamejoepio
authored and committed
refactor
1 parent 2bf63af commit 173cf7f

File tree

2 files changed

+73
-37
lines changed

2 files changed

+73
-37
lines changed

server/src/files.rs

Lines changed: 33 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,7 @@
11
use std::{fmt, path::PathBuf, time::Duration};
22

3+
use actix_multipart::Field;
4+
use futures::StreamExt;
35
use opendal::{services::S3, Operator};
46
use tokio::fs::File;
57

@@ -21,7 +23,7 @@ pub struct S3Config {
2123

2224
#[derive(Clone, Debug)]
2325
pub struct FSConfig {
24-
path: PathBuf,
26+
pub path: PathBuf,
2527
}
2628

2729
impl FileStore {
@@ -86,6 +88,36 @@ impl fmt::Display for FileStore {
8688
}
8789
}
8890

91+
pub async fn s3_upload(
92+
file_store:&FileStore,
93+
file_id: &str,
94+
mut field: Field,
95+
) -> AtomicServerResult<i64> {
96+
let mut builder = S3::default();
97+
98+
if let FileStore::S3(config) = file_store {
99+
builder.bucket(&config.bucket);
100+
builder.root(&config.path);
101+
config.region.as_ref().map(|r| builder.region(&r));
102+
config.endpoint.as_ref().map(|e| builder.endpoint(&e));
103+
} else {
104+
return Err("Uploading to S3 but no S3 config provided".into());
105+
}
106+
107+
let op: Operator = Operator::new(builder)?.finish();
108+
let mut w = op.writer(file_id).await?;
109+
let mut len = 0;
110+
while let Some(chunk) = field.next().await {
111+
let data = chunk.map_err(|e| format!("Error while reading multipart data. {}", e))?;
112+
len = len + data.len();
113+
w.write(data).await?;
114+
}
115+
116+
let byte_length: i64 = len.try_into().map_err(|_e| "Too large")?;
117+
w.close().await?;
118+
Ok(byte_length)
119+
}
120+
89121
pub async fn s3_upload_object(
90122
file_store: &FileStore,
91123
file_id: &str,

server/src/handlers/upload.rs

Lines changed: 40 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -1,22 +1,18 @@
1-
use std::{
2-
ffi::OsStr,
3-
fs::{self, File},
4-
io::Write,
5-
path::Path,
6-
};
1+
use std::{ffi::OsStr, fs::File, io::Write, path::Path};
72

8-
use actix_multipart::Multipart;
3+
use actix_multipart::{Multipart, Field};
94
use actix_web::{web, HttpResponse};
105
use atomic_lib::{
116
commit::CommitResponse, hierarchy::check_write, urls, utils::now, Resource, Storelike, Value,
127
};
8+
139
use futures::{StreamExt, TryStreamExt};
1410
use serde::Deserialize;
1511

1612
use crate::{
1713
appstate::AppState,
1814
errors::AtomicServerResult,
19-
files::{self, FileStore},
15+
files::{self, FSConfig, FileStore},
2016
helpers::get_client_agent,
2117
};
2218

@@ -54,43 +50,24 @@ pub async fn upload_handler(
5450
let mut created_resources: Vec<Resource> = Vec::new();
5551
let mut commit_responses: Vec<CommitResponse> = Vec::new();
5652

57-
while let Ok(Some(mut field)) = body.try_next().await {
53+
while let Ok(Some(field)) = body.try_next().await {
5854
let content_type = field.content_disposition().clone();
5955
let filename = content_type.get_filename().ok_or("Filename is missing")?;
6056

61-
std::fs::create_dir_all(&appstate.config.uploads_path)?;
62-
63-
let fs_file_id = format!(
64-
"{}-{}",
57+
let file_store = &appstate.file_store;
58+
let file_id = format!(
59+
"{}{}-{}",
60+
file_store.prefix(),
6561
now(),
6662
sanitize_filename::sanitize(filename)
6763
// Spacebars lead to very annoying bugs in browsers
6864
.replace(' ', "-")
6965
);
7066

71-
let mut file_path = appstate.config.uploads_path.clone();
72-
file_path.push(&fs_file_id);
73-
let mut file = File::create(&file_path)?;
74-
75-
// Field in turn is stream of *Bytes* object
76-
while let Some(chunk) = field.next().await {
77-
let data = chunk.map_err(|e| format!("Error while reading multipart data. {}", e))?;
78-
// TODO: Update a SHA256 hash here for checksum
79-
file.write_all(&data)?;
80-
}
81-
82-
let byte_count: i64 = file
83-
.metadata()?
84-
.len()
85-
.try_into()
86-
.map_err(|_e| "Too large")?;
87-
88-
let file_store = &appstate.file_store;
89-
let file_id = format!("{}{}", file_store.prefix(), &fs_file_id);
90-
if let FileStore::S3(_) = file_store {
91-
files::s3_upload_object(&file_store, &file_id, &file_path).await?;
92-
fs::remove_file(&file_path)?;
93-
}
67+
let byte_count = match file_store {
68+
FileStore::S3(_) => files::s3_upload(&file_store, &file_id, field).await?,
69+
FileStore::FS(config) => fs_upload(&file_store, &config, &file_id, field).await?,
70+
};
9471

9572
let subject_path = format!("files/{}", urlencoding::encode(&file_id));
9673
let new_subject = format!("{}/{}", store.get_server_url(), subject_path);
@@ -132,6 +109,33 @@ pub async fn upload_handler(
132109
)?))
133110
}
134111

112+
113+
async fn fs_upload(
114+
file_store: &FileStore,
115+
config: &FSConfig,
116+
file_id: &str,
117+
mut field: Field,
118+
) -> AtomicServerResult<i64> {
119+
std::fs::create_dir_all(config.path.clone())?;
120+
121+
let mut file = File::create(file_store.get_fs_file_path(file_id)?)?;
122+
123+
let byte_count: i64 = file
124+
.metadata()?
125+
.len()
126+
.try_into()
127+
.map_err(|_e| "Too large")?;
128+
129+
// Field in turn is stream of *Bytes* object
130+
while let Some(chunk) = field.next().await {
131+
let data = chunk.map_err(|e| format!("Error while reading multipart data. {}", e))?;
132+
// TODO: Update a SHA256 hash here for checksum
133+
file.write_all(&data)?;
134+
}
135+
136+
Ok(byte_count)
137+
}
138+
135139
fn guess_mime_for_filename(filename: &str) -> String {
136140
if let Some(ext) = get_extension_from_filename(filename) {
137141
actix_files::file_extension_to_mime(ext).to_string()

0 commit comments

Comments
 (0)