1- use std:: { fmt, path:: PathBuf , time:: Duration } ;
1+ use std:: { fmt, fs , io :: Write , path:: PathBuf , time:: Duration } ;
22
33use actix_multipart:: Field ;
44use futures:: StreamExt ;
55use opendal:: { services:: S3 , Operator } ;
6- use tokio:: fs:: File ;
76
87use crate :: { appstate:: AppState , config:: Config , errors:: AtomicServerResult } ;
98
@@ -80,6 +79,13 @@ impl FileStore {
8079 pub fn encoded ( & self ) -> String {
8180 urlencoding:: encode ( self . prefix ( ) ) . into_owned ( )
8281 }
82+
83+ pub async fn upload_file ( & self , file_id : & str , field : Field ) -> AtomicServerResult < i64 > {
84+ match self {
85+ FileStore :: S3 ( _) => s3_upload ( self , & file_id, field) . await ,
86+ FileStore :: FS ( config) => fs_upload ( self , & config, & file_id, field) . await ,
87+ }
88+ }
8389}
8490
8591impl fmt:: Display for FileStore {
@@ -88,41 +94,37 @@ impl fmt::Display for FileStore {
8894 }
8995}
9096
91- pub async fn s3_upload (
92- file_store : & FileStore ,
97+ async fn fs_upload (
98+ file_store : & FileStore ,
99+ config : & FSConfig ,
93100 file_id : & str ,
94101 mut field : Field ,
95102) -> AtomicServerResult < i64 > {
96- let mut builder = S3 :: default ( ) ;
103+ std :: fs :: create_dir_all ( config . path . clone ( ) ) ? ;
97104
98- if let FileStore :: S3 ( config) = file_store {
99- builder. bucket ( & config. bucket ) ;
100- builder. root ( & config. path ) ;
101- config. region . as_ref ( ) . map ( |r| builder. region ( & r) ) ;
102- config. endpoint . as_ref ( ) . map ( |e| builder. endpoint ( & e) ) ;
103- } else {
104- return Err ( "Uploading to S3 but no S3 config provided" . into ( ) ) ;
105- }
105+ let mut file = fs:: File :: create ( file_store. get_fs_file_path ( file_id) ?) ?;
106106
107- let op: Operator = Operator :: new ( builder) ?. finish ( ) ;
108- let mut w = op. writer ( file_id) . await ?;
109- let mut len = 0 ;
107+ let byte_count: i64 = file
108+ . metadata ( ) ?
109+ . len ( )
110+ . try_into ( )
111+ . map_err ( |_e| "Too large" ) ?;
112+
113+ // Field in turn is stream of *Bytes* object
110114 while let Some ( chunk) = field. next ( ) . await {
111115 let data = chunk. map_err ( |e| format ! ( "Error while reading multipart data. {}" , e) ) ?;
112- len = len + data . len ( ) ;
113- w . write ( data) . await ?;
116+ // TODO: Update a SHA256 hash here for checksum
117+ file . write_all ( & data) ?;
114118 }
115119
116- let byte_length: i64 = len. try_into ( ) . map_err ( |_e| "Too large" ) ?;
117- w. close ( ) . await ?;
118- Ok ( byte_length)
120+ Ok ( byte_count)
119121}
120122
121- pub async fn s3_upload_object (
123+ async fn s3_upload (
122124 file_store : & FileStore ,
123125 file_id : & str ,
124- file_path : & PathBuf ,
125- ) -> AtomicServerResult < ( ) > {
126+ mut field : Field ,
127+ ) -> AtomicServerResult < i64 > {
126128 let mut builder = S3 :: default ( ) ;
127129
128130 if let FileStore :: S3 ( config) = file_store {
@@ -135,14 +137,17 @@ pub async fn s3_upload_object(
135137 }
136138
137139 let op: Operator = Operator :: new ( builder) ?. finish ( ) ;
140+ let mut w = op. writer ( file_id) . await ?;
141+ let mut len = 0 ;
142+ while let Some ( chunk) = field. next ( ) . await {
143+ let data = chunk. map_err ( |e| format ! ( "Error while reading multipart data. {}" , e) ) ?;
144+ len = len + data. len ( ) ;
145+ w. write ( data) . await ?;
146+ }
138147
139- let mut tmp_file = File :: open ( file_path) . await ?;
140- let length = tmp_file. metadata ( ) . await ?. len ( ) ;
141-
142- let mut w = op. writer_with ( & file_id) . content_length ( length) . await ?;
143- tokio:: io:: copy ( & mut tmp_file, & mut w) . await ?;
148+ let byte_length: i64 = len. try_into ( ) . map_err ( |_e| "Too large" ) ?;
144149 w. close ( ) . await ?;
145- Ok ( ( ) )
150+ Ok ( byte_length )
146151}
147152
148153pub async fn get_s3_signed_url (
0 commit comments