@@ -3,37 +3,59 @@ use std::{fmt, path::PathBuf, time::Duration};
33use opendal:: { services:: S3 , Operator } ;
44use tokio:: fs:: File ;
55
6- use crate :: { appstate :: AppState , config:: Config , errors:: AtomicServerResult } ;
6+ use crate :: { config:: { Config , Opts } , errors:: AtomicServerResult } ;
77
8+ #[ derive( Clone , Debug ) ]
89pub enum FileStore {
9- S3 ,
10- FS ,
10+ S3 ( S3Config ) ,
11+ FS ( FSConfig ) ,
1112}
1213
14+ #[ derive( Clone , Debug ) ]
15+ pub struct S3Config {
16+ bucket : String ,
17+ path : String ,
18+ endpoint : Option < String > ,
19+ region : Option < String > ,
20+ }
21+
22+ #[ derive( Clone , Debug ) ]
23+ pub struct FSConfig { }
24+
1325impl FileStore {
1426 const S3_PREFIX : & ' static str = "s3:" ;
1527 const FS_PREFIX : & ' static str = "fs:" ;
1628
17- pub fn get_config_file_store ( config : & Config ) -> FileStore {
18- if config. opts . s3_bucket . is_some ( ) {
19- FileStore :: S3
29+ pub fn init_fs_from_config ( _opts : & Opts ) -> FileStore {
30+ FileStore :: FS ( FSConfig { } )
31+ }
32+
33+ pub fn init_from_config ( opts : & Opts , fs_file_store : FileStore ) -> FileStore {
34+ if opts. s3_bucket . is_some ( ) {
35+ let config = S3Config {
36+ bucket : opts. s3_bucket . clone ( ) . unwrap ( ) ,
37+ endpoint : opts. s3_endpoint . clone ( ) ,
38+ region : opts. s3_region . clone ( ) ,
39+ path : opts. s3_path . clone ( ) . unwrap_or ( "uploads" . to_string ( ) ) ,
40+ } ;
41+ FileStore :: S3 ( config)
2042 } else {
21- FileStore :: FS
43+ fs_file_store
2244 }
2345 }
2446
25- pub fn get_subject_file_store ( subject : & str ) -> FileStore {
47+ pub fn get_subject_file_store < ' a > ( config : & ' a Config , subject : & str ) -> & ' a FileStore {
2648 if subject. contains ( Self :: S3_PREFIX ) {
27- FileStore :: S3
49+ & config . file_store
2850 } else {
29- FileStore :: FS
51+ & config . fs_file_store
3052 }
3153 }
3254
3355 pub fn prefix ( & self ) -> & str {
3456 match self {
35- Self :: S3 => Self :: S3_PREFIX ,
36- Self :: FS => Self :: FS_PREFIX ,
57+ Self :: S3 ( _ ) => Self :: S3_PREFIX ,
58+ Self :: FS ( _ ) => Self :: FS_PREFIX ,
3759 }
3860 }
3961
@@ -49,92 +71,52 @@ impl fmt::Display for FileStore {
4971}
5072
5173pub async fn s3_upload_object (
52- appstate : & AppState ,
74+ file_store : & FileStore ,
5375 file_id : & str ,
5476 file_path : & PathBuf ,
5577) -> AtomicServerResult < ( ) > {
5678 let mut builder = S3 :: default ( ) ;
57- let bucket = appstate
58- . config
59- . opts
60- . s3_bucket
61- . as_ref ( )
62- . ok_or ( "uploading to s3 but no s3 bucket configured" ) ?;
63- builder. bucket ( bucket) ;
64- appstate
65- . config
66- . opts
67- . s3_region
68- . as_ref ( )
69- . map ( |r| builder. region ( & r) ) ;
70- appstate
71- . config
72- . opts
73- . s3_endpoint
74- . as_ref ( )
75- . map ( |e| builder. endpoint ( & e) ) ;
76-
77- let default_path = & "uploads" . to_string ( ) ;
78- let path = appstate
79- . config
80- . opts
81- . s3_path
82- . as_ref ( )
83- . unwrap_or ( default_path) ;
79+
80+ if let FileStore :: S3 ( config) = file_store {
81+ builder. bucket ( & config. bucket ) ;
82+ builder. root ( & config. path ) ;
83+ config. region . as_ref ( ) . map ( |r| builder. region ( & r) ) ;
84+ config. endpoint . as_ref ( ) . map ( |e| builder. endpoint ( & e) ) ;
85+ } else {
86+ return Err ( "Uploading to S3 but no S3 config provided" . into ( ) ) ;
87+ }
8488
8589 let op: Operator = Operator :: new ( builder) ?. finish ( ) ;
8690
8791 let mut tmp_file = File :: open ( file_path) . await ?;
8892 let length = tmp_file. metadata ( ) . await ?. len ( ) ;
8993
90- let s3_path = format ! ( "{}/{}" , path, & file_id) ;
91- let mut w = op. writer_with ( & s3_path) . content_length ( length) . await ?;
94+ let mut w = op. writer_with ( & file_id) . content_length ( length) . await ?;
9295 tokio:: io:: copy ( & mut tmp_file, & mut w) . await ?;
9396 w. close ( ) . await ?;
9497 Ok ( ( ) )
9598}
9699
97100pub async fn get_s3_signed_url (
98- appstate : & AppState ,
101+ file_store : & FileStore ,
99102 duration : Duration ,
100103 file_id : & str ,
101104) -> AtomicServerResult < String > {
102105 let mut builder = S3 :: default ( ) ;
103106
104- let bucket = appstate
105- . config
106- . opts
107- . s3_bucket
108- . as_ref ( )
109- . ok_or ( "s3 file found but no s3 bucket" ) ?;
110- builder. bucket ( bucket) ;
111-
112- appstate
113- . config
114- . opts
115- . s3_region
116- . as_ref ( )
117- . map ( |r| builder. region ( & r) ) ;
118-
119- appstate
120- . config
121- . opts
122- . s3_endpoint
123- . as_ref ( )
124- . map ( |e| builder. endpoint ( & e) ) ;
125-
126- let default_path = & "uploads" . to_string ( ) ;
127- let path = appstate
128- . config
129- . opts
130- . s3_path
131- . as_ref ( )
132- . unwrap_or ( default_path) ;
107+ if let FileStore :: S3 ( config) = file_store {
108+ builder. bucket ( & config. bucket ) ;
109+ builder. root ( & config. path ) ;
110+ config. region . as_ref ( ) . map ( |r| builder. region ( & r) ) ;
111+ config. endpoint . as_ref ( ) . map ( |e| builder. endpoint ( & e) ) ;
112+ } else {
113+ return Err ( "Downloading from S3 but no S3 config provided" . into ( ) ) ;
114+ }
133115
134116 let op: Operator = Operator :: new ( builder) ?. finish ( ) ;
135117
136118 let uri = op
137- . presign_read ( & format ! ( "{}/{}" , & path , file_id) , duration)
119+ . presign_read ( file_id, duration)
138120 . await ?
139121 . uri ( )
140122 . to_string ( ) ;
0 commit comments