@@ -103,25 +103,30 @@ impl MithrilInfrastructure {
103103 let relay_signer_registration_mode = & config. relay_signer_registration_mode ;
104104 let relay_signature_registration_mode = & config. relay_signature_registration_mode ;
105105
106- let aggregators =
107- Self :: start_aggregators ( config, aggregator_cardano_nodes, chain_observer_type) . await ?;
108- let aggregator_endpoints = aggregators
106+ let ( leader_aggregator, follower_aggregators) =
107+ Self :: prepare_aggregators ( config, aggregator_cardano_nodes, chain_observer_type)
108+ . await ?;
109+
110+ Self :: register_startup_era ( & leader_aggregator, config) . await ?;
111+ leader_aggregator. serve ( ) . await ?;
112+
113+ let follower_aggregator_endpoints = follower_aggregators
109114 . iter ( )
110115 . map ( |aggregator| aggregator. endpoint ( ) )
111116 . collect :: < Vec < _ > > ( ) ;
112- let leader_aggregator_endpoint = aggregator_endpoints[ 0 ] . to_owned ( ) ;
113117
114118 let ( relay_aggregators, relay_signers, relay_passives) = Self :: start_relays (
115119 config,
116- & aggregator_endpoints,
120+ leader_aggregator. endpoint ( ) ,
121+ & follower_aggregator_endpoints,
117122 & signer_party_ids,
118123 relay_signer_registration_mode. to_owned ( ) ,
119124 relay_signature_registration_mode. to_owned ( ) ,
120125 ) ?;
121126
122127 let signers = Self :: start_signers (
123128 config,
124- leader_aggregator_endpoint ,
129+ leader_aggregator . endpoint ( ) ,
125130 signer_cardano_nodes,
126131 & relay_signers,
127132 )
@@ -132,11 +137,14 @@ impl MithrilInfrastructure {
132137 CardanoNetwork :: DevNet ( DEVNET_MAGIC_ID ) ,
133138 ) ) ;
134139
140+ let mut all_aggregators = vec ! [ leader_aggregator] ;
141+ all_aggregators. extend ( follower_aggregators) ;
142+
135143 Ok ( Self {
136144 bin_dir : config. bin_dir . to_path_buf ( ) ,
137145 artifacts_dir : config. artifacts_dir . to_path_buf ( ) ,
138146 devnet : config. devnet . clone ( ) ,
139- aggregators,
147+ aggregators : all_aggregators ,
140148 signers,
141149 relay_aggregators,
142150 relay_signers,
@@ -176,90 +184,110 @@ impl MithrilInfrastructure {
176184 + 1 ;
177185 if self . era_reader_adapter == "cardano-chain" {
178186 let devnet = self . devnet . clone ( ) ;
179- assertions:: register_era_marker ( self . aggregator ( 0 ) , & devnet, next_era, next_era_epoch)
180- . await ?;
187+ assertions:: register_era_marker (
188+ self . leader_aggregator ( ) ,
189+ & devnet,
190+ next_era,
191+ next_era_epoch,
192+ )
193+ . await ?;
181194 }
182195 let mut current_era = self . current_era . write ( ) . await ;
183196 * current_era = next_era. to_owned ( ) ;
184197
185198 Ok ( ( ) )
186199 }
187200
188- async fn start_aggregators (
201+ async fn prepare_aggregators (
189202 config : & MithrilInfrastructureConfig ,
190- pool_nodes : & [ FullNode ] ,
203+ full_nodes : & [ FullNode ] ,
191204 chain_observer_type : & str ,
192- ) -> StdResult < Vec < Aggregator > > {
193- let mut aggregators = vec ! [ ] ;
194- let mut leader_aggregator_endpoint: Option < String > = None ;
195- for ( index, full_node) in pool_nodes. iter ( ) . enumerate ( ) {
196- let aggregator_name = Aggregator :: name_suffix ( index) ;
197- let aggregator_artifacts_dir = config
198- . artifacts_dir
199- . join ( format ! ( "mithril-aggregator-{aggregator_name}" ) ) ;
200- let aggregator_store_dir =
201- config. store_dir . join ( format ! ( "aggregator-{aggregator_name}" ) ) ;
202- let aggregator = Aggregator :: new ( & AggregatorConfig {
203- index,
204- name : & aggregator_name,
205- server_port : config. server_port + index as u64 ,
205+ ) -> StdResult < ( Aggregator , Vec < Aggregator > ) > {
206+ let [ leader_node, follower_nodes @ ..] = full_nodes else {
207+ panic ! ( "Can't prepare Aggregators: No full nodes found" ) ;
208+ } ;
209+ let leader_aggregator =
210+ Self :: prepare_aggregator ( 0 , leader_node, config, chain_observer_type, None ) . await ?;
211+
212+ let mut follower_aggregators = vec ! [ ] ;
213+ for ( index, full_node) in follower_nodes. iter ( ) . enumerate ( ) {
214+ let aggregator = Self :: prepare_aggregator (
215+ index + 1 ,
206216 full_node,
207- cardano_cli_path : & config. devnet . cardano_cli_path ( ) ,
208- work_dir : & config. work_dir ,
209- store_dir : & aggregator_store_dir,
210- artifacts_dir : & aggregator_artifacts_dir,
211- bin_dir : & config. bin_dir ,
212- cardano_node_version : & config. cardano_node_version ,
213- mithril_run_interval : config. mithril_run_interval ,
214- mithril_era : & config. mithril_era ,
215- mithril_era_reader_adapter : & config. mithril_era_reader_adapter ,
216- mithril_era_marker_address : & config. devnet . mithril_era_marker_address ( ) ?,
217- signed_entity_types : & config. signed_entity_types ,
217+ config,
218218 chain_observer_type,
219- leader_aggregator_endpoint : & leader_aggregator_endpoint. clone ( ) ,
220- } ) ?;
221-
222- aggregator
223- . set_protocol_parameters ( & ProtocolParameters {
224- k : 70 ,
225- m : 105 ,
226- phi_f : 0.95 ,
227- } )
228- . await ;
229-
230- if leader_aggregator_endpoint. is_none ( )
231- && config. has_leader_follower_signer_registration ( )
232- {
233- leader_aggregator_endpoint = Some ( aggregator. endpoint ( ) ) ;
234- }
235-
236- aggregators. push ( aggregator) ;
219+ Some ( leader_aggregator. endpoint ( ) ) ,
220+ )
221+ . await ?;
222+ follower_aggregators. push ( aggregator) ;
237223 }
238224
239- Self :: register_startup_era ( & aggregators[ 0 ] , config) . await ?;
240-
241- for aggregator in & aggregators {
242- aggregator. serve ( ) . await ?;
243- }
225+ Ok ( ( leader_aggregator, follower_aggregators) )
226+ }
244227
245- Ok ( aggregators)
228+ async fn prepare_aggregator (
229+ index : usize ,
230+ full_node : & FullNode ,
231+ config : & MithrilInfrastructureConfig ,
232+ chain_observer_type : & str ,
233+ leader_aggregator_endpoint : Option < String > ,
234+ ) -> StdResult < Aggregator > {
235+ let aggregator_name = Aggregator :: name_suffix ( index) ;
236+ let aggregator_artifacts_dir = config
237+ . artifacts_dir
238+ . join ( format ! ( "mithril-aggregator-{aggregator_name}" ) ) ;
239+ let aggregator_store_dir = config. store_dir . join ( format ! ( "aggregator-{aggregator_name}" ) ) ;
240+ let aggregator = Aggregator :: new ( & AggregatorConfig {
241+ index,
242+ name : & aggregator_name,
243+ server_port : config. server_port + index as u64 ,
244+ full_node,
245+ cardano_cli_path : & config. devnet . cardano_cli_path ( ) ,
246+ work_dir : & config. work_dir ,
247+ store_dir : & aggregator_store_dir,
248+ artifacts_dir : & aggregator_artifacts_dir,
249+ bin_dir : & config. bin_dir ,
250+ cardano_node_version : & config. cardano_node_version ,
251+ mithril_run_interval : config. mithril_run_interval ,
252+ mithril_era : & config. mithril_era ,
253+ mithril_era_reader_adapter : & config. mithril_era_reader_adapter ,
254+ mithril_era_marker_address : & config. devnet . mithril_era_marker_address ( ) ?,
255+ signed_entity_types : & config. signed_entity_types ,
256+ chain_observer_type,
257+ leader_aggregator_endpoint : & leader_aggregator_endpoint,
258+ } ) ?;
259+
260+ aggregator
261+ . set_protocol_parameters ( & ProtocolParameters {
262+ k : 70 ,
263+ m : 105 ,
264+ phi_f : 0.95 ,
265+ } )
266+ . await ;
267+
268+ Ok ( aggregator)
246269 }
247270
248271 fn start_relays (
249272 config : & MithrilInfrastructureConfig ,
250- aggregator_endpoints : & [ String ] ,
273+ leader_aggregator_endpoint : String ,
274+ follower_aggregator_endpoints : & [ String ] ,
251275 signers_party_ids : & [ PartyId ] ,
252276 relay_signer_registration_mode : String ,
253277 relay_signature_registration_mode : String ,
254278 ) -> StdResult < ( Vec < RelayAggregator > , Vec < RelaySigner > , Vec < RelayPassive > ) > {
255279 if !config. use_relays {
256280 return Ok ( ( vec ! [ ] , vec ! [ ] , vec ! [ ] ) ) ;
257281 }
282+ let aggregator_endpoints = [
283+ vec ! [ leader_aggregator_endpoint. clone( ) ] ,
284+ follower_aggregator_endpoints. to_vec ( ) ,
285+ ]
286+ . concat ( ) ;
258287
259288 let mut relay_aggregators: Vec < RelayAggregator > = vec ! [ ] ;
260289 let mut relay_signers: Vec < RelaySigner > = vec ! [ ] ;
261290 let mut relay_passives: Vec < RelayPassive > = vec ! [ ] ;
262- let leader_aggregator_endpoint = & aggregator_endpoints[ 0 ] ;
263291
264292 info ! ( "Starting the Mithril infrastructure in P2P mode (experimental)" ) ;
265293
@@ -287,7 +315,7 @@ impl MithrilInfrastructure {
287315 dial_to : bootstrap_peer_addr. clone ( ) ,
288316 relay_signer_registration_mode : relay_signer_registration_mode. clone ( ) ,
289317 relay_signature_registration_mode : relay_signature_registration_mode. clone ( ) ,
290- aggregator_endpoint : leader_aggregator_endpoint,
318+ aggregator_endpoint : & leader_aggregator_endpoint,
291319 party_id : party_id. clone ( ) ,
292320 work_dir : & config. work_dir ,
293321 bin_dir : & config. bin_dir ,
@@ -377,7 +405,7 @@ impl MithrilInfrastructure {
377405 signer. stop ( ) . await ?;
378406 }
379407
380- for aggregator in & self . aggregators {
408+ for aggregator in self . aggregators ( ) {
381409 aggregator. stop ( ) . await ?;
382410 }
383411
@@ -396,6 +424,18 @@ impl MithrilInfrastructure {
396424 & self . aggregators [ index]
397425 }
398426
427+ pub fn leader_aggregator ( & self ) -> & Aggregator {
428+ & self . aggregators [ 0 ]
429+ }
430+
431+ pub fn follower_aggregators ( & self ) -> & [ Aggregator ] {
432+ & self . aggregators [ 1 ..]
433+ }
434+
435+ pub fn follower_aggregator ( & self , index : usize ) -> & Aggregator {
436+ & self . aggregators [ index + 1 ]
437+ }
438+
399439 pub fn signers ( & self ) -> & [ Signer ] {
400440 & self . signers
401441 }
0 commit comments