@@ -22,18 +22,10 @@ use std::io::BufReader;
2222use std:: io:: Lines ;
2323use std:: path:: Path ;
2424use std:: str:: FromStr ;
25- use std:: sync:: Arc ;
26- use std:: sync:: Mutex ;
2725
2826use databend_common_meta_raft_store:: config:: RaftConfig ;
29- use databend_common_meta_raft_store:: key_spaces:: RaftStoreEntry ;
30- use databend_common_meta_raft_store:: key_spaces:: SMEntry ;
3127use databend_common_meta_raft_store:: ondisk:: DataVersion ;
3228use databend_common_meta_raft_store:: raft_log_v004;
33- use databend_common_meta_raft_store:: sm_v003:: adapter:: SnapshotUpgradeV002ToV003 ;
34- use databend_common_meta_raft_store:: sm_v003:: write_entry:: WriteEntry ;
35- use databend_common_meta_raft_store:: sm_v003:: SnapshotStoreV003 ;
36- use databend_common_meta_raft_store:: state_machine:: MetaSnapshotId ;
3729use databend_common_meta_sled_store:: init_get_sled_db;
3830use databend_common_meta_sled_store:: openraft:: storage:: RaftLogStorageExt ;
3931use databend_common_meta_sled_store:: openraft:: RaftSnapshotBuilder ;
@@ -45,7 +37,6 @@ use databend_common_meta_types::raft_types::LogId;
4537use databend_common_meta_types:: raft_types:: Membership ;
4638use databend_common_meta_types:: raft_types:: NodeId ;
4739use databend_common_meta_types:: raft_types:: StoredMembership ;
48- use databend_common_meta_types:: sys_data:: SysData ;
4940use databend_common_meta_types:: Cmd ;
5041use databend_common_meta_types:: Endpoint ;
5142use databend_common_meta_types:: LogEntry ;
@@ -104,116 +95,15 @@ async fn import_lines<B: BufRead + 'static>(
10495 please use an older version databend-metactl to import from V001"
10596 ) ) ;
10697 }
107- DataVersion :: V002 => import_v002 ( raft_config, it) . await ?,
108- DataVersion :: V003 => import_v003 ( raft_config, it) . await ?,
98+ // v002 v003 v004 share the same exported data format.
99+ DataVersion :: V002 => crate :: import_v004:: import_v004 ( raft_config, it) . await ?,
100+ DataVersion :: V003 => crate :: import_v004:: import_v004 ( raft_config, it) . await ?,
109101 DataVersion :: V004 => crate :: import_v004:: import_v004 ( raft_config, it) . await ?,
110102 } ;
111103
112104 Ok ( max_log_id)
113105}
114106
115- /// Import serialized lines for `DataVersion::V002`
116- ///
117- /// While importing, the max log id is also returned.
118- ///
119- /// It write logs and related entries to sled trees, and state_machine entries to a snapshot.
120- async fn import_v002 (
121- raft_config : RaftConfig ,
122- lines : impl IntoIterator < Item = Result < String , io:: Error > > ,
123- ) -> anyhow:: Result < Option < LogId > > {
124- // v002 and v003 share the same exported data format.
125- import_v003 ( raft_config, lines) . await
126- }
127-
128- /// Import serialized lines for `DataVersion::V003`
129- ///
130- /// While importing, the max log id is also returned.
131- ///
132- /// It write logs and related entries to sled trees, and state_machine entries to a snapshot.
133- async fn import_v003 (
134- raft_config : RaftConfig ,
135- lines : impl IntoIterator < Item = Result < String , io:: Error > > ,
136- ) -> anyhow:: Result < Option < LogId > > {
137- let db = init_get_sled_db ( raft_config. raft_dir . clone ( ) , 1024 * 1024 * 1024 ) ;
138-
139- let mut n = 0 ;
140- let mut max_log_id: Option < LogId > = None ;
141- let mut trees = BTreeMap :: new ( ) ;
142-
143- let sys_data = Arc :: new ( Mutex :: new ( SysData :: default ( ) ) ) ;
144-
145- let snapshot_store = SnapshotStoreV003 :: new ( raft_config) ;
146- let writer = snapshot_store. new_writer ( ) ?;
147- let ( tx, join_handle) = writer. spawn_writer_thread ( "import_v003" ) ;
148-
149- let mut converter = SnapshotUpgradeV002ToV003 {
150- sys_data : sys_data. clone ( ) ,
151- } ;
152-
153- for line in lines {
154- let l = line?;
155- let ( tree_name, kv_entry) : ( String , RaftStoreEntry ) = serde_json:: from_str ( & l) ?;
156-
157- if tree_name. starts_with ( "state_machine/" ) {
158- // Write to snapshot
159- let sm_entry: SMEntry = kv_entry. try_into ( ) . map_err ( |err_str| {
160- anyhow:: anyhow!( "Failed to convert RaftStoreEntry to SMEntry: {}" , err_str)
161- } ) ?;
162-
163- let kv = converter. sm_entry_to_rotbl_kv ( sm_entry) ?;
164- if let Some ( kv) = kv {
165- tx. send ( WriteEntry :: Data ( kv) ) . await ?;
166- }
167- } else {
168- // Write to sled tree
169- if !trees. contains_key ( & tree_name) {
170- let tree = db. open_tree ( & tree_name) ?;
171- trees. insert ( tree_name. clone ( ) , tree) ;
172- }
173-
174- let tree = trees. get ( & tree_name) . unwrap ( ) ;
175-
176- let ( k, v) = RaftStoreEntry :: serialize ( & kv_entry) ?;
177-
178- tree. insert ( k, v) ?;
179-
180- if let RaftStoreEntry :: Logs { key : _, value } = kv_entry {
181- max_log_id = std:: cmp:: max ( max_log_id, Some ( value. log_id ) ) ;
182- } ;
183- }
184-
185- n += 1 ;
186- }
187-
188- for tree in trees. values ( ) {
189- tree. flush ( ) ?;
190- }
191-
192- let s = {
193- let r = sys_data. lock ( ) . unwrap ( ) ;
194- r. clone ( )
195- } ;
196-
197- tx. send ( WriteEntry :: Finish ( s) ) . await ?;
198- let temp_snapshot_data = join_handle. await ??;
199-
200- let last_applied = {
201- let r = sys_data. lock ( ) . unwrap ( ) ;
202- * r. last_applied_ref ( )
203- } ;
204- let snapshot_id = MetaSnapshotId :: new_with_epoch ( last_applied) ;
205- let db = temp_snapshot_data. move_to_final_path ( snapshot_id. to_string ( ) ) ?;
206-
207- eprintln ! (
208- "Imported {} records, snapshot: {}; snapshot_path: {}; snapshot_stat: {}" ,
209- n,
210- snapshot_id,
211- db. path( ) ,
212- db. stat( )
213- ) ;
214- Ok ( max_log_id)
215- }
216-
217107/// Read every line from stdin or restore file, deserialize it into tree_name, key and value.
218108/// Insert them into sled db and flush.
219109///
0 commit comments