Skip to content

Commit d9403f9

Browse files
metamejoepio
authored andcommitted
S3 Support #543
added s3 config vars s3 uploads working prefix file-id with store type cargo format download building download working possibly fix issue with : refactor files logic into files module static str lovelies clean up log changelog and readme updates use tokio streams in upload check bucket exists before unwrap Store config in FileStore cargo fmt consolidate download fs path logic move file stores to appstate no unwrap refactor finish refactor tests passing back to nicer interface documentation move test to tests.rs readme and dep fix
1 parent 23994ad commit d9403f9

File tree

13 files changed

+1734
-890
lines changed

13 files changed

+1734
-890
lines changed

Cargo.lock

Lines changed: 1383 additions & 853 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

README.md

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,7 @@ _Status: alpha. [Breaking changes](CHANGELOG.md) are expected until 1.0._
3939
- 📖 **Pagination, sorting and filtering** queries using [Atomic Collections](https://docs.atomicdata.dev/schema/collections.html).
4040
- 🔐 **Authorization** (read / write permissions) and Hierarchical structures powered by [Atomic Hierarchy](https://docs.atomicdata.dev/hierarchy.html)
4141
- 📲 **Invite and sharing system** with [Atomic Invites](https://docs.atomicdata.dev/invitations.html)
42-
- 📂 **File management**: Upload, download and preview attachments.
42+
- 📂 **File management**: Upload, download and preview attachments with support for using S3 as file storage backend.
4343
- 🖥️ **Desktop app**: Easy desktop installation, with status bar icon, powered by [tauri](https://github.com/tauri-apps/tauri/).
4444
- 📚 **Libraries**: [Javascript / Typescript](https://www.npmjs.com/package/@tomic/lib), [React](https://www.npmjs.com/package/@tomic/react), [Svelte](https://www.npmjs.com/package/@tomic/svelte)
4545

@@ -51,6 +51,7 @@ https://user-images.githubusercontent.com/2183313/139728539-d69b899f-6f9b-44cb-a
5151

5252
Check out the [documentation](http://docs.atomicdata.dev/atomicserver/intro.html) for installation instructions, API docs, and more.
5353

54+
### Configuring S3 for File Storage
5455
## Contribute
5556

5657
Issues and PRs are welcome!

docs/src/atomicserver/installation.md

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -106,6 +106,20 @@ ATOMIC_HTTPS=false
106106
ATOMIC_SERVER_URL=https://example.com
107107
```
108108

109+
### Configuring S3 for File Storage
110+
You can configure atomic-server to use S3 (and compatible services) for file storage via environment variables or command line arguments when starting atomic-server.
111+
112+
Credentials can either be found in the standard location for AWS credentials on your OS (e.g. `~/.aws/credentials` on UNIX systems) or by using the environment variables `AWS_ACCESS_KEY_ID` and `AWS_SECRET_ACCESS_KEY`.
113+
114+
Available configuration values:
115+
116+
- bucket: `--s3-bucket="my-bucket-name"` or env var `ATOMIC_S3_BUCKET` (required)
117+
- region: `--s3-region="us-east-2"` or env var `ATOMIC_S3_REGION`
118+
- endpoint: `--s3-endpoint="https://s3.us-east-2.amazonaws.com"` or env var `ATOMIC_S3_ENDPOINT`
119+
0 path: `--s3-path="atomic_uploads"` or env var `ATOMIC_S3_PATH`
120+
121+
For example, the above configuration would uploads files to `s3://my-bucket-name/atomic_uploads/` in the `us-east-2` region.
122+
109123
## Using `systemd` to run Atomic-Server as a service
110124

111125
In Linux operating systems, you can use `systemd` to manage running processes.

server/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@ dialoguer = "0.10"
3333
directories = ">= 2, < 5"
3434
dotenv = "0.15"
3535
futures = "0.3"
36+
opendal = "0.39.0"
3637
percent-encoding = "2.2.0"
3738
regex = "1"
3839
rio_api = "0.7"

server/src/appstate.rs

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
//! App state, which is accessible from handlers
22
use crate::{
3-
commit_monitor::CommitMonitor, config::Config, errors::AtomicServerResult, search::SearchState,
3+
commit_monitor::CommitMonitor, config::Config, errors::AtomicServerResult, files::FileStore,
4+
search::SearchState,
45
};
56
use atomic_lib::{
67
agents::{generate_public_key, Agent},
@@ -23,6 +24,10 @@ pub struct AppState {
2324
/// The Actix Address of the CommitMonitor, which should receive updates when a commit is applied
2425
pub commit_monitor: actix::Addr<CommitMonitor>,
2526
pub search_state: SearchState,
27+
/// stores config values and the active FileStore type, e.g. FS or S3
28+
pub file_store: FileStore,
29+
/// stores config values for FS filestore regardless of active file store as fallback
30+
pub fs_file_store: FileStore,
2631
}
2732

2833
/// Creates the AppState (the server's context available in Handlers).
@@ -102,11 +107,18 @@ pub fn init(config: Config) -> AtomicServerResult<AppState> {
102107
crate::search::add_all_resources(&search_state, &store)?
103108
}
104109

110+
tracing::info!("Initializing file stores");
111+
// Initialize file stores
112+
let fs_file_store = FileStore::init_fs_from_config(&config);
113+
let file_store = FileStore::init_from_config(&config, fs_file_store.clone());
114+
105115
Ok(AppState {
106116
store,
107117
config,
108118
commit_monitor,
109119
search_state,
120+
file_store,
121+
fs_file_store,
110122
})
111123
}
112124

server/src/bin.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ mod commit_monitor;
88
pub mod config;
99
mod content_types;
1010
mod errors;
11+
mod files;
1112
mod handlers;
1213
mod helpers;
1314
#[cfg(feature = "https")]

server/src/config.rs

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -74,6 +74,22 @@ pub struct Opts {
7474
#[clap(long, env = "ATOMIC_DATA_DIR")]
7575
pub data_dir: Option<PathBuf>,
7676

77+
/// bucket name from s3-compatible storage service
78+
#[clap(long, env = "ATOMIC_S3_BUCKET")]
79+
pub s3_bucket: Option<String>,
80+
81+
/// region for s3-compatible storage service, defaults to "us-east-1"
82+
#[clap(long, env = "ATOMIC_S3_REGION")]
83+
pub s3_region: Option<String>,
84+
85+
/// endpoint for s3-compatible storage service, defaults to "https://s3.amazonaws.com"
86+
#[clap(long, env = "ATOMIC_S3_ENDPOINT")]
87+
pub s3_endpoint: Option<String>,
88+
89+
/// path where s3 uploads will be stored
90+
#[clap(long, env = "ATOMIC_S3_PATH")]
91+
pub s3_path: Option<String>,
92+
7793
/// CAUTION: Skip authentication checks, making all data publicly readable. Improves performance.
7894
#[clap(long, env = "ATOMIC_PUBLIC_MODE")]
7995
pub public_mode: bool,

server/src/errors.rs

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -179,3 +179,13 @@ impl From<actix_web::Error> for AtomicServerError {
179179
}
180180
}
181181
}
182+
183+
impl From<opendal::Error> for AtomicServerError {
184+
fn from(error: opendal::Error) -> Self {
185+
AtomicServerError {
186+
message: error.to_string(),
187+
error_type: AppErrorType::Other,
188+
error_resource: None,
189+
}
190+
}
191+
}

server/src/files.rs

Lines changed: 174 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,174 @@
1+
use std::{fmt, fs, io::Write, path::PathBuf, time::Duration};
2+
3+
use actix_multipart::Field;
4+
use futures::StreamExt;
5+
use opendal::{services::S3, Operator};
6+
7+
use crate::{appstate::AppState, config::Config, errors::AtomicServerResult};
8+
9+
#[derive(Clone, Debug, PartialEq)]
10+
pub enum FileStore {
11+
S3(S3Config),
12+
FS(FSConfig),
13+
}
14+
15+
#[derive(Clone, Debug, PartialEq)]
16+
pub struct S3Config {
17+
pub bucket: String,
18+
pub path: String,
19+
pub endpoint: Option<String>,
20+
pub region: Option<String>,
21+
}
22+
23+
#[derive(Clone, Debug, PartialEq)]
24+
pub struct FSConfig {
25+
pub path: PathBuf,
26+
}
27+
28+
impl FileStore {
29+
const S3_PREFIX: &'static str = "s3:";
30+
const FS_PREFIX: &'static str = "fs:";
31+
32+
pub fn init_fs_from_config(config: &Config) -> FileStore {
33+
FileStore::FS(FSConfig {
34+
path: config.uploads_path.clone(),
35+
})
36+
}
37+
38+
pub fn init_from_config(config: &Config, fs_file_store: FileStore) -> FileStore {
39+
let opts = &config.opts;
40+
if let Some(bucket) = &opts.s3_bucket {
41+
let config = S3Config {
42+
bucket: bucket.clone(),
43+
endpoint: opts.s3_endpoint.clone(),
44+
region: opts.s3_region.clone(),
45+
path: opts.s3_path.clone().unwrap_or("uploads".to_string()),
46+
};
47+
FileStore::S3(config)
48+
} else {
49+
fs_file_store
50+
}
51+
}
52+
53+
pub fn get_subject_file_store<'a>(appstate: &'a AppState, subject: &str) -> &'a FileStore {
54+
if subject.contains(Self::S3_PREFIX) {
55+
&appstate.file_store
56+
} else {
57+
&appstate.fs_file_store
58+
}
59+
}
60+
61+
pub fn get_fs_file_path(&self, file_id: &str) -> AtomicServerResult<PathBuf> {
62+
if let FileStore::FS(config) = self {
63+
let fs_file_id = file_id.strip_prefix(Self::FS_PREFIX).unwrap_or(file_id);
64+
let mut file_path = config.path.clone();
65+
file_path.push(fs_file_id.to_string());
66+
Ok(file_path)
67+
} else {
68+
Err("Wrong FileStore passed to get_fs_file_path".into())
69+
}
70+
}
71+
72+
pub fn prefix(&self) -> &str {
73+
match self {
74+
Self::S3(_) => Self::S3_PREFIX,
75+
Self::FS(_) => Self::FS_PREFIX,
76+
}
77+
}
78+
79+
pub fn encoded(&self) -> String {
80+
urlencoding::encode(self.prefix()).into_owned()
81+
}
82+
83+
pub async fn upload_file(&self, file_id: &str, field: Field) -> AtomicServerResult<i64> {
84+
match self {
85+
FileStore::S3(_) => s3_upload(self, &file_id, field).await,
86+
FileStore::FS(config) => fs_upload(self, &config, &file_id, field).await,
87+
}
88+
}
89+
}
90+
91+
impl fmt::Display for FileStore {
92+
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
93+
write!(f, "{}", self.prefix())
94+
}
95+
}
96+
97+
async fn fs_upload(
98+
file_store: &FileStore,
99+
config: &FSConfig,
100+
file_id: &str,
101+
mut field: Field,
102+
) -> AtomicServerResult<i64> {
103+
std::fs::create_dir_all(config.path.clone())?;
104+
105+
let mut file = fs::File::create(file_store.get_fs_file_path(file_id)?)?;
106+
107+
let byte_count: i64 = file
108+
.metadata()?
109+
.len()
110+
.try_into()
111+
.map_err(|_e| "Too large")?;
112+
113+
// Field in turn is stream of *Bytes* object
114+
while let Some(chunk) = field.next().await {
115+
let data = chunk.map_err(|e| format!("Error while reading multipart data. {}", e))?;
116+
// TODO: Update a SHA256 hash here for checksum
117+
file.write_all(&data)?;
118+
}
119+
120+
Ok(byte_count)
121+
}
122+
123+
async fn s3_upload(
124+
file_store: &FileStore,
125+
file_id: &str,
126+
mut field: Field,
127+
) -> AtomicServerResult<i64> {
128+
let mut builder = S3::default();
129+
130+
if let FileStore::S3(config) = file_store {
131+
builder.bucket(&config.bucket);
132+
builder.root(&config.path);
133+
config.region.as_ref().map(|r| builder.region(&r));
134+
config.endpoint.as_ref().map(|e| builder.endpoint(&e));
135+
} else {
136+
return Err("Uploading to S3 but no S3 config provided".into());
137+
}
138+
139+
let op: Operator = Operator::new(builder)?.finish();
140+
let mut w = op.writer(file_id).await?;
141+
let mut len = 0;
142+
while let Some(chunk) = field.next().await {
143+
let data = chunk.map_err(|e| format!("Error while reading multipart data. {}", e))?;
144+
len = len + data.len();
145+
w.write(data).await?;
146+
}
147+
148+
let byte_length: i64 = len.try_into().map_err(|_e| "Too large")?;
149+
w.close().await?;
150+
Ok(byte_length)
151+
}
152+
153+
pub async fn get_s3_signed_url(
154+
file_store: &FileStore,
155+
duration: Duration,
156+
file_id: &str,
157+
) -> AtomicServerResult<String> {
158+
let mut builder = S3::default();
159+
160+
if let FileStore::S3(config) = file_store {
161+
builder.bucket(&config.bucket);
162+
builder.root(&config.path);
163+
config.region.as_ref().map(|r| builder.region(&r));
164+
config.endpoint.as_ref().map(|e| builder.endpoint(&e));
165+
} else {
166+
return Err("Downloading from S3 but no S3 config provided".into());
167+
}
168+
169+
let op: Operator = Operator::new(builder)?.finish();
170+
171+
let uri = op.presign_read(file_id, duration).await?.uri().to_string();
172+
173+
Ok(uri)
174+
}

server/src/handlers/download.rs

Lines changed: 37 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,15 @@
1+
use std::time::Duration;
2+
13
use actix_files::NamedFile;
2-
use actix_web::{web, HttpRequest, HttpResponse};
3-
use atomic_lib::{urls, Resource, Storelike};
4+
use actix_web::{web, HttpRequest, HttpResponse, Responder};
5+
use atomic_lib::{urls, Storelike};
46

5-
use crate::{appstate::AppState, errors::AtomicServerResult, helpers::get_client_agent};
7+
use crate::{
8+
appstate::AppState,
9+
errors::AtomicServerResult,
10+
files::{self, FileStore},
11+
helpers::get_client_agent,
12+
};
613

714
/// Downloads the File of the Resource that matches the same URL minus the `/download` path.
815
#[tracing::instrument(skip(appstate, req))]
@@ -26,20 +33,39 @@ pub async fn handle_download(
2633

2734
let for_agent = get_client_agent(headers, &appstate, subject.clone())?;
2835
tracing::info!("handle_download: {}", subject);
29-
let resource = store.get_resource_extended(&subject, false, &for_agent)?;
30-
download_file_handler_partial(&resource, &req, &appstate)
36+
let file_store = FileStore::get_subject_file_store(&appstate, &subject);
37+
let encoded = subject.replace(file_store.prefix(), &file_store.encoded());
38+
let resource = store.get_resource_extended(&encoded, false, &for_agent)?;
39+
let file_id = resource
40+
.get(urls::INTERNAL_ID)
41+
.map_err(|e| format!("Internal ID of file could not be resolved. {}", e))?
42+
.to_string();
43+
44+
if let FileStore::S3(_) = file_store {
45+
signed_url_redirect_handler(file_id.as_str(), &req, &appstate).await
46+
} else {
47+
download_file_handler_partial(file_id.as_str(), &req, &appstate)
48+
}
3149
}
3250

3351
pub fn download_file_handler_partial(
34-
resource: &Resource,
52+
file_id: &str,
3553
req: &HttpRequest,
3654
appstate: &AppState,
3755
) -> AtomicServerResult<HttpResponse> {
38-
let file_name = resource
39-
.get(urls::INTERNAL_ID)
40-
.map_err(|e| format!("Internal ID of file could not be resolved. {}", e))?;
41-
let mut file_path = appstate.config.uploads_path.clone();
42-
file_path.push(file_name.to_string());
56+
let file_path = appstate.fs_file_store.get_fs_file_path(file_id)?;
4357
let file = NamedFile::open(file_path)?;
4458
Ok(file.into_response(req))
4559
}
60+
61+
async fn signed_url_redirect_handler(
62+
file_id: &str,
63+
req: &HttpRequest,
64+
appstate: &AppState,
65+
) -> AtomicServerResult<HttpResponse> {
66+
let signed_url =
67+
files::get_s3_signed_url(&appstate.file_store, Duration::from_secs(3600), file_id).await?;
68+
Ok(web::Redirect::to(signed_url)
69+
.respond_to(req)
70+
.map_into_boxed_body())
71+
}

0 commit comments

Comments
 (0)