@@ -5,6 +5,8 @@ use slog::{error, warn, Logger};
55use mithril_common:: { logging:: LoggerExtensions , StdResult } ;
66use tokio:: { select, sync:: watch:: Receiver } ;
77
8+ use crate :: MetricsService ;
9+
810use super :: { CertifierService , SignatureConsumer } ;
911
1012/// A signature processor which receives signature and processes them.
@@ -24,6 +26,7 @@ pub struct SequentialSignatureProcessor {
2426 certifier : Arc < dyn CertifierService > ,
2527 stop_rx : Receiver < ( ) > ,
2628 logger : Logger ,
29+ metrics_service : Arc < MetricsService > ,
2730}
2831
2932impl SequentialSignatureProcessor {
@@ -33,12 +36,14 @@ impl SequentialSignatureProcessor {
3336 certifier : Arc < dyn CertifierService > ,
3437 stop_rx : Receiver < ( ) > ,
3538 logger : Logger ,
39+ metrics_service : Arc < MetricsService > ,
3640 ) -> Self {
3741 Self {
3842 consumer,
3943 certifier,
4044 stop_rx,
4145 logger : logger. new_with_component_name :: < Self > ( ) ,
46+ metrics_service,
4247 }
4348 }
4449}
@@ -55,6 +60,11 @@ impl SignatureProcessor for SequentialSignatureProcessor {
5560 . await
5661 {
5762 error ! ( self . logger, "Error dispatching single signature" ; "error" => ?e) ;
63+ } else {
64+ let origin_network = self . consumer . get_origin_tag ( ) ;
65+ self . metrics_service
66+ . get_signature_registration_total_received_since_startup ( )
67+ . increment ( & [ & origin_network] ) ;
5868 }
5969 }
6070 }
@@ -107,24 +117,29 @@ mod tests {
107117 #[ tokio:: test]
108118 async fn processor_process_signatures_succeeds ( ) {
109119 let logger = TestLogger :: stdout ( ) ;
120+ let single_signatures = vec ! [
121+ (
122+ fake_data:: single_signature( vec![ 1 , 2 , 3 ] ) ,
123+ SignedEntityType :: MithrilStakeDistribution ( Epoch ( 1 ) ) ,
124+ ) ,
125+ (
126+ fake_data:: single_signature( vec![ 4 , 5 , 6 ] ) ,
127+ SignedEntityType :: MithrilStakeDistribution ( Epoch ( 2 ) ) ,
128+ ) ,
129+ ] ;
130+ let single_signatures_length = single_signatures. len ( ) ;
131+ let network_origin = "test_network" ;
110132 let mock_consumer = {
111133 let mut mock_consumer = MockSignatureConsumer :: new ( ) ;
112134 mock_consumer
113135 . expect_get_signatures ( )
114- . returning ( || {
115- Ok ( vec ! [
116- (
117- fake_data:: single_signature( vec![ 1 , 2 , 3 ] ) ,
118- SignedEntityType :: MithrilStakeDistribution ( Epoch ( 1 ) ) ,
119- ) ,
120- (
121- fake_data:: single_signature( vec![ 4 , 5 , 6 ] ) ,
122- SignedEntityType :: MithrilStakeDistribution ( Epoch ( 2 ) ) ,
123- ) ,
124- ] )
125- } )
136+ . returning ( move || Ok ( single_signatures. clone ( ) ) )
126137 . times ( 1 ) ;
127138 mock_consumer
139+ . expect_get_origin_tag ( )
140+ . returning ( || network_origin. to_string ( ) )
141+ . times ( single_signatures_length) ;
142+ mock_consumer
128143 } ;
129144 let mock_certifier = {
130145 let mut mock_certifier = MockCertifierService :: new ( ) ;
@@ -144,21 +159,33 @@ mod tests {
144159 )
145160 . returning ( |_, _| Ok ( SignatureRegistrationStatus :: Registered ) )
146161 . times ( 1 ) ;
147-
148162 mock_certifier
149163 } ;
150164 let ( _stop_tx, stop_rx) = channel ( ( ) ) ;
165+ let metrics_service = MetricsService :: new ( TestLogger :: stdout ( ) ) . unwrap ( ) ;
166+ let initial_counter_value = metrics_service
167+ . get_signature_registration_total_received_since_startup ( )
168+ . get ( & [ network_origin] ) ;
169+ let metrics_service = Arc :: new ( metrics_service) ;
151170 let processor = SequentialSignatureProcessor :: new (
152171 Arc :: new ( mock_consumer) ,
153172 Arc :: new ( mock_certifier) ,
154173 stop_rx,
155174 logger,
175+ metrics_service. clone ( ) ,
156176 ) ;
157177
158178 processor
159179 . process_signatures ( )
160180 . await
161181 . expect ( "Failed to process signatures" ) ;
182+
183+ assert_eq ! (
184+ initial_counter_value + single_signatures_length as u32 ,
185+ metrics_service
186+ . get_signature_registration_total_received_since_startup( )
187+ . get( & [ network_origin] )
188+ )
162189 }
163190
164191 #[ tokio:: test]
@@ -185,11 +212,13 @@ mod tests {
185212 mock_certifier
186213 } ;
187214 let ( stop_tx, stop_rx) = channel ( ( ) ) ;
215+ let metrics_service = MetricsService :: new ( TestLogger :: stdout ( ) ) . unwrap ( ) ;
188216 let processor = SequentialSignatureProcessor :: new (
189217 Arc :: new ( fake_consumer) ,
190218 Arc :: new ( mock_certifier) ,
191219 stop_rx,
192220 logger,
221+ Arc :: new ( metrics_service) ,
193222 ) ;
194223
195224 tokio:: select!(
0 commit comments