1616 *
1717 */
1818
19- use std:: collections:: HashSet ;
19+ use std:: collections:: { HashMap , HashSet } ;
2020
2121use actix_web:: { http:: header:: ContentType , Error } ;
2222use chrono:: Utc ;
2323use datafusion:: error:: DataFusionError ;
2424use http:: StatusCode ;
2525use itertools:: Itertools ;
2626use once_cell:: sync:: Lazy ;
27+ use relative_path:: RelativePathBuf ;
2728use serde:: { Deserialize , Serialize } ;
2829use serde_json:: Error as SerdeError ;
2930use tokio:: sync:: RwLock ;
30- use tracing:: { error, trace , warn } ;
31+ use tracing:: error;
3132
3233use crate :: {
33- handlers:: http:: rbac:: RBACError ,
34+ handlers:: http:: {
35+ rbac:: RBACError ,
36+ users:: { CORRELATION_DIR , USERS_ROOT_DIR } ,
37+ } ,
3438 option:: CONFIG ,
3539 query:: QUERY_SESSION ,
3640 rbac:: { map:: SessionKey , Users } ,
@@ -39,167 +43,208 @@ use crate::{
3943 utils:: { get_hash, user_auth_for_query} ,
4044} ;
4145
42- pub static CORRELATIONS : Lazy < Correlation > = Lazy :: new ( Correlation :: default) ;
46+ pub static CORRELATIONS : Lazy < Correlations > = Lazy :: new ( Correlations :: default) ;
4347
44- #[ derive( Debug , Default ) ]
45- pub struct Correlation ( RwLock < Vec < CorrelationConfig > > ) ;
48+ type CorrelationMap = HashMap < CorrelationId , CorrelationConfig > ;
4649
47- impl Correlation {
48- //load correlations from storage
50+ #[ derive( Debug , Default , derive_more:: Deref ) ]
51+ pub struct Correlations ( RwLock < CorrelationMap > ) ;
52+
53+ impl Correlations {
54+ // Load correlations from storage
4955 pub async fn load ( & self ) -> anyhow:: Result < ( ) > {
5056 let store = CONFIG . storage ( ) . get_object_store ( ) ;
5157 let all_correlations = store. get_all_correlations ( ) . await . unwrap_or_default ( ) ;
5258
53- let correlations: Vec < CorrelationConfig > = all_correlations
54- . into_iter ( )
55- . flat_map ( |( _, correlations_bytes) | correlations_bytes)
56- . filter_map ( |correlation| {
57- serde_json:: from_slice ( & correlation)
58- . inspect_err ( |e| {
59- error ! ( "Unable to load correlation: {e}" ) ;
60- } )
61- . ok ( )
62- } )
63- . collect ( ) ;
59+ for correlations_bytes in all_correlations. values ( ) . flatten ( ) {
60+ let correlation = match serde_json:: from_slice :: < CorrelationConfig > ( correlations_bytes)
61+ {
62+ Ok ( c) => c,
63+ Err ( e) => {
64+ error ! ( "Unable to load correlation file : {e}" ) ;
65+ continue ;
66+ }
67+ } ;
68+
69+ self . write ( )
70+ . await
71+ . insert ( correlation. id . to_owned ( ) , correlation) ;
72+ }
6473
65- let mut s = self . 0 . write ( ) . await ;
66- s. extend ( correlations) ;
6774 Ok ( ( ) )
6875 }
6976
70- pub async fn list_correlations_for_user (
77+ pub async fn list_correlations (
7178 & self ,
7279 session_key : & SessionKey ,
73- user_id : & str ,
7480 ) -> Result < Vec < CorrelationConfig > , CorrelationError > {
75- let correlations = self . 0 . read ( ) . await . iter ( ) . cloned ( ) . collect_vec ( ) ;
76-
7781 let mut user_correlations = vec ! [ ] ;
7882 let permissions = Users . get_permissions ( session_key) ;
7983
80- for c in correlations {
81- let tables = & c
84+ for correlation in self . read ( ) . await . values ( ) {
85+ let tables = & correlation
8286 . table_configs
8387 . iter ( )
8488 . map ( |t| t. table_name . clone ( ) )
8589 . collect_vec ( ) ;
86- if user_auth_for_query ( & permissions, tables) . is_ok ( ) && c . user_id == user_id {
87- user_correlations. push ( c ) ;
90+ if user_auth_for_query ( & permissions, tables) . is_ok ( ) {
91+ user_correlations. push ( correlation . clone ( ) ) ;
8892 }
8993 }
94+
9095 Ok ( user_correlations)
9196 }
9297
9398 pub async fn get_correlation (
9499 & self ,
95100 correlation_id : & str ,
96- user_id : & str ,
97101 ) -> Result < CorrelationConfig , CorrelationError > {
98- let read = self . 0 . read ( ) . await ;
99- let correlation = read
100- . iter ( )
101- . find ( |c| c. id == correlation_id && c. user_id == user_id)
102- . cloned ( ) ;
103-
104- correlation. ok_or_else ( || {
105- CorrelationError :: AnyhowError ( anyhow:: Error :: msg ( format ! (
106- "Unable to find correlation with ID- {correlation_id}"
107- ) ) )
108- } )
102+ self . read ( )
103+ . await
104+ . get ( correlation_id)
105+ . cloned ( )
106+ . ok_or_else ( || {
107+ CorrelationError :: AnyhowError ( anyhow:: Error :: msg ( format ! (
108+ "Unable to find correlation with ID- {correlation_id}"
109+ ) ) )
110+ } )
109111 }
110112
111- pub async fn update ( & self , correlation : & CorrelationConfig ) -> Result < ( ) , CorrelationError > {
112- // save to memory
113- let mut s = self . 0 . write ( ) . await ;
114- s. retain ( |c| c. id != correlation. id ) ;
115- s. push ( correlation. clone ( ) ) ;
116- Ok ( ( ) )
113+ /// Create correlation associated with the user
114+ pub async fn create (
115+ & self ,
116+ mut correlation : CorrelationConfig ,
117+ session_key : & SessionKey ,
118+ ) -> Result < CorrelationConfig , CorrelationError > {
119+ correlation. id = get_hash ( Utc :: now ( ) . timestamp_micros ( ) . to_string ( ) . as_str ( ) ) ;
120+ correlation. validate ( session_key) . await ?;
121+
122+ // Update in storage
123+ let correlation_bytes = serde_json:: to_vec ( & correlation) ?. into ( ) ;
124+ let path = correlation. path ( ) ;
125+ CONFIG
126+ . storage ( )
127+ . get_object_store ( )
128+ . put_object ( & path, correlation_bytes)
129+ . await ?;
130+
131+ // Update in memory
132+ self . write ( ) . await . insert (
133+ correlation. id . to_owned ( ) ,
134+ correlation. clone ( ) ,
135+ ) ;
136+
137+ Ok ( correlation)
117138 }
118139
119- pub async fn delete ( & self , correlation_id : & str ) -> Result < ( ) , CorrelationError > {
120- // now delete from memory
121- let read_access = self . 0 . read ( ) . await ;
140+ /// Update existing correlation for the user and with the same ID
141+ pub async fn update (
142+ & self ,
143+ mut updated_correlation : CorrelationConfig ,
144+ session_key : & SessionKey ,
145+ ) -> Result < CorrelationConfig , CorrelationError > {
146+ // validate whether user has access to this correlation object or not
147+ let correlation = self . get_correlation ( & updated_correlation. id ) . await ?;
148+ if correlation. user_id != updated_correlation. user_id {
149+ return Err ( CorrelationError :: AnyhowError ( anyhow:: Error :: msg ( format ! (
150+ r#"User "{}" isn't authorized to update correlation with ID - {}"# ,
151+ updated_correlation. user_id, correlation. id
152+ ) ) ) ) ;
153+ }
122154
123- let index = read_access
124- . iter ( )
125- . enumerate ( )
126- . find ( |( _, c) | c. id == correlation_id)
127- . to_owned ( ) ;
128-
129- if let Some ( ( index, _) ) = index {
130- // drop the read access in order to get exclusive write access
131- drop ( read_access) ;
132- self . 0 . write ( ) . await . remove ( index) ;
133- trace ! ( "removed correlation from memory" ) ;
134- } else {
135- warn ! ( "Correlation ID- {correlation_id} not found in memory!" ) ;
155+ correlation. validate ( session_key) . await ?;
156+ updated_correlation. update ( correlation) ;
157+
158+ // Update in storage
159+ let correlation_bytes = serde_json:: to_vec ( & updated_correlation) ?. into ( ) ;
160+ let path = updated_correlation. path ( ) ;
161+ CONFIG
162+ . storage ( )
163+ . get_object_store ( )
164+ . put_object ( & path, correlation_bytes)
165+ . await ?;
166+
167+ // Update in memory
168+ self . write ( ) . await . insert (
169+ updated_correlation. id . to_owned ( ) ,
170+ updated_correlation. clone ( ) ,
171+ ) ;
172+
173+ Ok ( updated_correlation)
174+ }
175+
176+ /// Delete correlation from memory and storage
177+ pub async fn delete (
178+ & self ,
179+ correlation_id : & str ,
180+ user_id : & str ,
181+ ) -> Result < ( ) , CorrelationError > {
182+ let correlation = CORRELATIONS . get_correlation ( correlation_id) . await ?;
183+ if correlation. user_id != user_id {
184+ return Err ( CorrelationError :: AnyhowError ( anyhow:: Error :: msg ( format ! (
185+ r#"User "{user_id}" isn't authorized to delete correlation with ID - {correlation_id}"#
186+ ) ) ) ) ;
136187 }
188+
189+ // Delete from memory
190+ self . write ( ) . await . remove ( & correlation. id ) ;
191+
192+ // Delete from storage
193+ let path = correlation. path ( ) ;
194+ CONFIG
195+ . storage ( )
196+ . get_object_store ( )
197+ . delete_object ( & path)
198+ . await ?;
199+
137200 Ok ( ( ) )
138201 }
139202}
140203
141- #[ derive( Debug , Clone , Serialize , Deserialize ) ]
204+ #[ derive( Debug , Clone , Default , Serialize , Deserialize ) ]
142205#[ serde( rename_all = "camelCase" ) ]
143206pub enum CorrelationVersion {
207+ #[ default]
144208 V1 ,
145209}
146210
211+ type CorrelationId = String ;
212+ type UserId = String ;
213+
147214#[ derive( Debug , Clone , Serialize , Deserialize ) ]
148215#[ serde( rename_all = "camelCase" ) ]
149216pub struct CorrelationConfig {
217+ #[ serde( default ) ]
150218 pub version : CorrelationVersion ,
151219 pub title : String ,
152- pub id : String ,
153- pub user_id : String ,
220+ #[ serde( default ) ]
221+ pub id : CorrelationId ,
222+ #[ serde( default ) ]
223+ pub user_id : UserId ,
154224 pub table_configs : Vec < TableConfig > ,
155225 pub join_config : JoinConfig ,
156226 pub filter : Option < FilterQuery > ,
157227 pub start_time : Option < String > ,
158228 pub end_time : Option < String > ,
159229}
160230
161- impl CorrelationConfig { }
162-
163- #[ derive( Debug , Clone , Serialize , Deserialize ) ]
164- #[ serde( rename_all = "camelCase" ) ]
165- pub struct CorrelationRequest {
166- pub title : String ,
167- pub table_configs : Vec < TableConfig > ,
168- pub join_config : JoinConfig ,
169- pub filter : Option < FilterQuery > ,
170- pub start_time : Option < String > ,
171- pub end_time : Option < String > ,
172- }
173-
174- impl From < CorrelationRequest > for CorrelationConfig {
175- fn from ( val : CorrelationRequest ) -> Self {
176- Self {
177- version : CorrelationVersion :: V1 ,
178- title : val. title ,
179- id : get_hash ( Utc :: now ( ) . timestamp_micros ( ) . to_string ( ) . as_str ( ) ) ,
180- user_id : String :: default ( ) ,
181- table_configs : val. table_configs ,
182- join_config : val. join_config ,
183- filter : val. filter ,
184- start_time : val. start_time ,
185- end_time : val. end_time ,
186- }
231+ impl CorrelationConfig {
232+ pub fn path ( & self ) -> RelativePathBuf {
233+ RelativePathBuf :: from_iter ( [
234+ USERS_ROOT_DIR ,
235+ & self . user_id ,
236+ CORRELATION_DIR ,
237+ & format ! ( "{}.json" , self . id) ,
238+ ] )
187239 }
188- }
189240
190- impl CorrelationRequest {
191- pub fn generate_correlation_config ( self , id : String , user_id : String ) -> CorrelationConfig {
192- CorrelationConfig {
193- version : CorrelationVersion :: V1 ,
194- title : self . title ,
195- id,
196- user_id,
197- table_configs : self . table_configs ,
198- join_config : self . join_config ,
199- filter : self . filter ,
200- start_time : self . start_time ,
201- end_time : self . end_time ,
202- }
241+ pub fn update ( & mut self , update : Self ) {
242+ self . title = update. title ;
243+ self . table_configs = update. table_configs ;
244+ self . join_config = update. join_config ;
245+ self . filter = update. filter ;
246+ self . start_time = update. start_time ;
247+ self . end_time = update. end_time ;
203248 }
204249
205250 /// This function will validate the TableConfigs, JoinConfig, and user auth
0 commit comments