Skip to content

Commit 01212a0

Browse files
committed
feat: wip Sagittarius definition clients
1 parent aae74da commit 01212a0

File tree

5 files changed

+314
-0
lines changed

5 files changed

+314
-0
lines changed
Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,15 @@
1+
use std::str::FromStr;
2+
use tonic::metadata::{MetadataMap, MetadataValue};
3+
4+
pub fn get_authorization_metadata(token: &str) -> MetadataMap {
5+
let metadata_value = MetadataValue::from_str(token).unwrap_or_else(|error| {
6+
panic!(
7+
"An error occurred trying to convert runtime_token into metadata: {}",
8+
error
9+
);
10+
});
11+
12+
let mut map = MetadataMap::new();
13+
map.insert("authorization", metadata_value);
14+
map
15+
}
Lines changed: 54 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,54 @@
1+
use crate::command::push::auth::get_authorization_metadata;
2+
use tonic::{Extensions, Request, transport::Channel};
3+
use tucana::sagittarius::{
4+
DataTypeUpdateRequest as SagittariusDataTypeUpdateRequest,
5+
data_type_service_client::DataTypeServiceClient,
6+
};
7+
use tucana::shared::DefinitionDataType;
8+
9+
pub struct SagittariusDataTypeServiceClient {
10+
client: DataTypeServiceClient<Channel>,
11+
token: String,
12+
}
13+
14+
impl SagittariusDataTypeServiceClient {
15+
pub async fn new(sagittarius_url: String, token: String) -> Self {
16+
let client = match DataTypeServiceClient::connect(sagittarius_url).await {
17+
Ok(client) => {
18+
log::info!("Successfully connected to Sagittarius DataType Endpoint!");
19+
client
20+
}
21+
Err(err) => panic!(
22+
"Failed to connect to Sagittarius (DataType Endpoint): {:?}",
23+
err
24+
),
25+
};
26+
27+
Self { client, token }
28+
}
29+
30+
pub async fn update_data_types(
31+
&mut self,
32+
data_types: Vec<DefinitionDataType>,
33+
) {
34+
let request = Request::from_parts(
35+
get_authorization_metadata(&self.token),
36+
Extensions::new(),
37+
SagittariusDataTypeUpdateRequest {
38+
data_types,
39+
},
40+
);
41+
42+
match self.client.update(request).await {
43+
Ok(response) => {
44+
log::info!(
45+
"Successfully transferred data types. Did Sagittarius updated them? {:?}",
46+
&response
47+
);
48+
}
49+
Err(err) => {
50+
log::error!("Failed to update DataTypes: {:?}", err);
51+
}
52+
};
53+
}
54+
}
Lines changed: 54 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,54 @@
1+
use tonic::Extensions;
2+
use tonic::Request;
3+
use tonic::transport::Channel;
4+
use tucana::sagittarius::FlowTypeUpdateRequest as SagittariusFlowTypeUpdateRequest;
5+
use tucana::sagittarius::flow_type_service_client::FlowTypeServiceClient;
6+
use tucana::shared::FlowType;
7+
use crate::command::push::auth::get_authorization_metadata;
8+
9+
pub struct SagittariusFlowTypeServiceClient {
10+
client: FlowTypeServiceClient<Channel>,
11+
token: String,
12+
}
13+
14+
impl SagittariusFlowTypeServiceClient {
15+
pub async fn new(sagittarius_url: String, token: String) -> Self {
16+
let client = match FlowTypeServiceClient::connect(sagittarius_url).await {
17+
Ok(client) => {
18+
log::info!("Successfully connected to Sagittarius FlowType Endpoint!");
19+
client
20+
}
21+
Err(err) => panic!(
22+
"Failed to connect to Sagittarius (FlowType Endpoint): {:?}",
23+
err
24+
),
25+
};
26+
27+
Self { client, token }
28+
}
29+
30+
pub async fn update_flow_types(
31+
&mut self,
32+
flow_types: Vec<FlowType>,
33+
) {
34+
let request = Request::from_parts(
35+
get_authorization_metadata(&self.token),
36+
Extensions::new(),
37+
SagittariusFlowTypeUpdateRequest {
38+
flow_types,
39+
},
40+
);
41+
42+
match self.client.update(request).await {
43+
Ok(response) => {
44+
log::info!(
45+
"Successfully transferred FlowTypes. Did Sagittarius updated them? {:?}",
46+
&response
47+
);
48+
}
49+
Err(err) => {
50+
log::error!("Failed to update FlowTypes: {:?}", err);
51+
}
52+
};
53+
}
54+
}
Lines changed: 54 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,54 @@
1+
use tonic::Extensions;
2+
use tonic::Request;
3+
use tonic::transport::Channel;
4+
use tucana::sagittarius::RuntimeFunctionDefinitionUpdateRequest as SagittariusRuntimeFunctionUpdateRequest;
5+
use tucana::sagittarius::runtime_function_definition_service_client::RuntimeFunctionDefinitionServiceClient;
6+
use tucana::shared::RuntimeFunctionDefinition;
7+
use crate::command::push::auth::get_authorization_metadata;
8+
9+
pub struct SagittariusRuntimeFunctionServiceClient {
10+
client: RuntimeFunctionDefinitionServiceClient<Channel>,
11+
token: String,
12+
}
13+
14+
impl SagittariusRuntimeFunctionServiceClient {
15+
pub async fn new(sagittarius_url: String, token: String) -> Self {
16+
let client = match RuntimeFunctionDefinitionServiceClient::connect(sagittarius_url).await {
17+
Ok(client) => {
18+
log::info!("Successfully connected to Sagittarius RuntimeFunction Endpoint!");
19+
client
20+
}
21+
Err(err) => panic!(
22+
"Failed to connect to Sagittarius (RuntimeFunction Endpoint): {:?}",
23+
err
24+
),
25+
};
26+
27+
Self { client, token }
28+
}
29+
30+
pub async fn update_runtime_function_definitions(
31+
&mut self,
32+
runtime_functions: Vec<RuntimeFunctionDefinition>,
33+
) {
34+
let request = Request::from_parts(
35+
get_authorization_metadata(&self.token),
36+
Extensions::new(),
37+
SagittariusRuntimeFunctionUpdateRequest {
38+
runtime_functions,
39+
},
40+
);
41+
42+
match self.client.update(request).await {
43+
Ok(response) => {
44+
log::info!(
45+
"Successfully transferred RuntimeFunctions. Did Sagittarius updated them? {:?}",
46+
&response
47+
);
48+
}
49+
Err(err) => {
50+
log::error!("Failed to update RuntimeFunctions: {:?}", err);
51+
}
52+
};
53+
}
54+
}

crates/cli/src/command/push/mod.rs

Lines changed: 137 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,137 @@
1+
use crate::analyser::core::Analyser;
2+
use crate::command::push::data_type_client_impl::SagittariusDataTypeServiceClient;
3+
use crate::command::push::flow_type_client_impl::SagittariusFlowTypeServiceClient;
4+
use crate::command::push::function_client_impl::SagittariusRuntimeFunctionServiceClient;
5+
use crate::formatter::{default, info};
6+
use notify::event::ModifyKind;
7+
use notify::{EventKind, RecursiveMode, Watcher, recommended_watcher};
8+
use std::sync::mpsc::channel;
9+
use std::time::{Duration, Instant};
10+
11+
mod auth;
12+
mod data_type_client_impl;
13+
mod flow_type_client_impl;
14+
mod function_client_impl;
15+
16+
pub async fn push(token: String, url: String, path: Option<String>) {
17+
let dir_path = path.unwrap_or_else(|| "./definitions".to_string());
18+
19+
info(format!("Watching directory: {dir_path}"));
20+
info(String::from("Press Ctrl+C to stop watching..."));
21+
22+
{
23+
Analyser::new(dir_path.as_str()).report(false);
24+
}
25+
26+
// Set up file watcher
27+
let (tx, rx) = channel();
28+
let mut watcher = recommended_watcher(tx).unwrap();
29+
watcher
30+
.watch(std::path::Path::new(&dir_path), RecursiveMode::Recursive)
31+
.unwrap();
32+
33+
let mut last_run = Instant::now();
34+
35+
let mut data_type_client =
36+
SagittariusDataTypeServiceClient::new(url.clone(), token.clone()).await;
37+
let mut flow_type_client =
38+
SagittariusFlowTypeServiceClient::new(url.clone(), token.clone()).await;
39+
let mut function_client = SagittariusRuntimeFunctionServiceClient::new(url, token).await;
40+
41+
loop {
42+
if let Ok(Ok(event)) = rx.recv() {
43+
match event.kind {
44+
EventKind::Modify(modify) => {
45+
if let ModifyKind::Data(_) = modify
46+
&& last_run.elapsed() > Duration::from_millis(500)
47+
{
48+
default(String::from(
49+
"\n\n\n--------------------------------------------------------------------------\n\n",
50+
));
51+
info(String::from("Change detected! Regenerating report..."));
52+
let mut analyzer = Analyser::new(dir_path.as_str());
53+
54+
// No errors when reporter is empty!
55+
if analyzer.reporter.is_empty() {
56+
data_type_client
57+
.update_data_types(
58+
analyzer
59+
.data_types
60+
.iter()
61+
.map(|d| d.definition_data_type.clone())
62+
.collect(),
63+
)
64+
.await;
65+
flow_type_client
66+
.update_flow_types(
67+
analyzer
68+
.flow_types
69+
.iter()
70+
.map(|d| d.flow_type.clone())
71+
.collect(),
72+
)
73+
.await;
74+
function_client
75+
.update_runtime_function_definitions(
76+
analyzer
77+
.functions
78+
.iter()
79+
.map(|d| d.function.clone())
80+
.collect(),
81+
)
82+
.await;
83+
}
84+
85+
analyzer.report(false);
86+
87+
last_run = Instant::now();
88+
}
89+
}
90+
EventKind::Remove(_) => {
91+
if last_run.elapsed() > Duration::from_millis(500) {
92+
default(String::from(
93+
"\n\n\n--------------------------------------------------------------------------\n\n",
94+
));
95+
info(String::from("Change detected! Regenerating report..."));
96+
let mut analyzer = Analyser::new(dir_path.as_str());
97+
98+
// No errors when reporter is empty!
99+
if analyzer.reporter.is_empty() {
100+
data_type_client
101+
.update_data_types(
102+
analyzer
103+
.data_types
104+
.iter()
105+
.map(|d| d.definition_data_type.clone())
106+
.collect(),
107+
)
108+
.await;
109+
flow_type_client
110+
.update_flow_types(
111+
analyzer
112+
.flow_types
113+
.iter()
114+
.map(|d| d.flow_type.clone())
115+
.collect(),
116+
)
117+
.await;
118+
function_client
119+
.update_runtime_function_definitions(
120+
analyzer
121+
.functions
122+
.iter()
123+
.map(|d| d.function.clone())
124+
.collect(),
125+
)
126+
.await;
127+
}
128+
129+
analyzer.report(false);
130+
last_run = Instant::now();
131+
}
132+
}
133+
_ => {}
134+
}
135+
}
136+
}
137+
}

0 commit comments

Comments
 (0)