1313// limitations under the License.
1414
1515use std:: ops:: Add ;
16- use std:: sync:: Arc ;
1716use std:: time:: Duration ;
1817use std:: time:: UNIX_EPOCH ;
1918
@@ -22,6 +21,7 @@ use common_base::base::unescape_for_key;
2221use common_exception:: ErrorCode ;
2322use common_exception:: Result ;
2423use common_meta_api:: KVApi ;
24+ use common_meta_store:: MetaStore ;
2525use common_meta_types:: KVMeta ;
2626use common_meta_types:: MatchSeq ;
2727use common_meta_types:: NodeInfo ;
@@ -35,14 +35,14 @@ use crate::cluster::ClusterApi;
3535pub static CLUSTER_API_KEY_PREFIX : & str = "__fd_clusters" ;
3636
3737pub struct ClusterMgr {
38- kv_api : Arc < dyn KVApi > ,
38+ metastore : MetaStore ,
3939 lift_time : Duration ,
4040 cluster_prefix : String ,
4141}
4242
4343impl ClusterMgr {
4444 pub fn create (
45- kv_api : Arc < dyn KVApi > ,
45+ metastore : MetaStore ,
4646 tenant : & str ,
4747 cluster_id : & str ,
4848 lift_time : Duration ,
@@ -54,7 +54,7 @@ impl ClusterMgr {
5454 }
5555
5656 Ok ( ClusterMgr {
57- kv_api ,
57+ metastore ,
5858 lift_time,
5959 cluster_prefix : format ! (
6060 "{}/{}/{}/databend_query" ,
@@ -87,7 +87,7 @@ impl ClusterApi for ClusterMgr {
8787 let value = Operation :: Update ( serde_json:: to_vec ( & node) ?) ;
8888 let node_key = format ! ( "{}/{}" , self . cluster_prefix, escape_for_key( & node. id) ?) ;
8989 let upsert_node = self
90- . kv_api
90+ . metastore
9191 . upsert_kv ( UpsertKVReq :: new ( & node_key, seq, value, meta) ) ;
9292
9393 let res = upsert_node. await ?. added_or_else ( |v| {
@@ -101,7 +101,7 @@ impl ClusterApi for ClusterMgr {
101101 }
102102
103103 async fn get_nodes ( & self ) -> Result < Vec < NodeInfo > > {
104- let values = self . kv_api . prefix_list_kv ( & self . cluster_prefix ) . await ?;
104+ let values = self . metastore . prefix_list_kv ( & self . cluster_prefix ) . await ?;
105105
106106 let mut nodes_info = Vec :: with_capacity ( values. len ( ) ) ;
107107 for ( node_key, value) in values {
@@ -116,7 +116,7 @@ impl ClusterApi for ClusterMgr {
116116
117117 async fn drop_node ( & self , node_id : String , seq : Option < u64 > ) -> Result < ( ) > {
118118 let node_key = format ! ( "{}/{}" , self . cluster_prefix, escape_for_key( & node_id) ?) ;
119- let upsert_node = self . kv_api . upsert_kv ( UpsertKVReq :: new (
119+ let upsert_node = self . metastore . upsert_kv ( UpsertKVReq :: new (
120120 & node_key,
121121 seq. into ( ) ,
122122 Operation :: Delete ,
@@ -145,7 +145,7 @@ impl ClusterApi for ClusterMgr {
145145 } ;
146146
147147 let upsert_meta =
148- self . kv_api
148+ self . metastore
149149 . upsert_kv ( UpsertKVReq :: new ( & node_key, seq, Operation :: AsIs , meta) ) ;
150150
151151 match upsert_meta. await ? {
@@ -157,4 +157,8 @@ impl ClusterApi for ClusterMgr {
157157 UpsertKVReply { .. } => self . add_node ( node. clone ( ) ) . await ,
158158 }
159159 }
160+
161+ async fn get_local_addr ( & self ) -> Result < Option < String > > {
162+ Ok ( self . metastore . get_local_addr ( ) . await ?)
163+ }
160164}
0 commit comments