@@ -64,19 +64,16 @@ fn limit_by_node_id(allowed_nodes: HashSet<NodeId>) -> EventSender {
6464 let ( tx, mut rx) = tokio:: sync:: mpsc:: channel ( 32 ) ;
6565 n0_future:: task:: spawn ( async move {
6666 while let Some ( msg) = rx. recv ( ) . await {
67- match msg {
68- ProviderMessage :: ClientConnected ( msg) => {
69- let node_id = msg. node_id ;
70- let res = if allowed_nodes. contains ( & node_id) {
71- println ! ( "Client connected: {node_id}" ) ;
72- Ok ( ( ) )
73- } else {
74- println ! ( "Client rejected: {node_id}" ) ;
75- Err ( AbortReason :: Permission )
76- } ;
77- msg. tx . send ( res) . await . ok ( ) ;
78- }
79- _ => { }
67+ if let ProviderMessage :: ClientConnected ( msg) = msg {
68+ let node_id = msg. node_id ;
69+ let res = if allowed_nodes. contains ( & node_id) {
70+ println ! ( "Client connected: {node_id}" ) ;
71+ Ok ( ( ) )
72+ } else {
73+ println ! ( "Client rejected: {node_id}" ) ;
74+ Err ( AbortReason :: Permission )
75+ } ;
76+ msg. tx . send ( res) . await . ok ( ) ;
8077 }
8178 }
8279 } ) ;
@@ -93,21 +90,18 @@ fn limit_by_hash(allowed_hashes: HashSet<Hash>) -> EventSender {
9390 let ( tx, mut rx) = tokio:: sync:: mpsc:: channel ( 32 ) ;
9491 n0_future:: task:: spawn ( async move {
9592 while let Some ( msg) = rx. recv ( ) . await {
96- match msg {
97- ProviderMessage :: GetRequestReceived ( msg) => {
98- let res = if !msg. request . ranges . is_blob ( ) {
99- println ! ( "HashSeq request not allowed" ) ;
100- Err ( AbortReason :: Permission )
101- } else if !allowed_hashes. contains ( & msg. request . hash ) {
102- println ! ( "Request for hash {} not allowed" , msg. request. hash) ;
103- Err ( AbortReason :: Permission )
104- } else {
105- println ! ( "Request for hash {} allowed" , msg. request. hash) ;
106- Ok ( ( ) )
107- } ;
108- msg. tx . send ( res) . await . ok ( ) ;
109- }
110- _ => { }
93+ if let ProviderMessage :: GetRequestReceived ( msg) = msg {
94+ let res = if !msg. request . ranges . is_blob ( ) {
95+ println ! ( "HashSeq request not allowed" ) ;
96+ Err ( AbortReason :: Permission )
97+ } else if !allowed_hashes. contains ( & msg. request . hash ) {
98+ println ! ( "Request for hash {} not allowed" , msg. request. hash) ;
99+ Err ( AbortReason :: Permission )
100+ } else {
101+ println ! ( "Request for hash {} allowed" , msg. request. hash) ;
102+ Ok ( ( ) )
103+ } ;
104+ msg. tx . send ( res) . await . ok ( ) ;
111105 }
112106 }
113107 } ) ;
@@ -124,18 +118,15 @@ fn throttle(delay_ms: u64) -> EventSender {
124118 let ( tx, mut rx) = tokio:: sync:: mpsc:: channel ( 32 ) ;
125119 n0_future:: task:: spawn ( async move {
126120 while let Some ( msg) = rx. recv ( ) . await {
127- match msg {
128- ProviderMessage :: Throttle ( msg) => {
129- n0_future:: task:: spawn ( async move {
130- println ! (
131- "Throttling {} {}, {}ms" ,
132- msg. connection_id, msg. request_id, delay_ms
133- ) ;
134- tokio:: time:: sleep ( std:: time:: Duration :: from_millis ( delay_ms) ) . await ;
135- msg. tx . send ( Ok ( ( ) ) ) . await . ok ( ) ;
136- } ) ;
137- }
138- _ => { }
121+ if let ProviderMessage :: Throttle ( msg) = msg {
122+ n0_future:: task:: spawn ( async move {
123+ println ! (
124+ "Throttling {} {}, {}ms" ,
125+ msg. connection_id, msg. request_id, delay_ms
126+ ) ;
127+ tokio:: time:: sleep ( std:: time:: Duration :: from_millis ( delay_ms) ) . await ;
128+ msg. tx . send ( Ok ( ( ) ) ) . await . ok ( ) ;
129+ } ) ;
139130 }
140131 }
141132 } ) ;
@@ -153,40 +144,36 @@ fn limit_max_connections(max_connections: usize) -> EventSender {
153144 n0_future:: task:: spawn ( async move {
154145 let requests = Arc :: new ( AtomicUsize :: new ( 0 ) ) ;
155146 while let Some ( msg) = rx. recv ( ) . await {
156- match msg {
157- ProviderMessage :: GetRequestReceived ( mut msg) => {
158- let connection_id = msg. connection_id ;
159- let request_id = msg. request_id ;
160- let res = requests. fetch_update ( Ordering :: SeqCst , Ordering :: SeqCst , |n| {
161- if n >= max_connections {
162- None
163- } else {
164- Some ( n + 1 )
165- }
166- } ) ;
167- match res {
168- Ok ( n) => {
169- println ! ( "Accepting request {n}, id ({connection_id},{request_id})" ) ;
170- msg. tx . send ( Ok ( ( ) ) ) . await . ok ( ) ;
171- }
172- Err ( _) => {
173- println ! (
174- "Connection limit of {} exceeded, rejecting request" ,
175- max_connections
176- ) ;
177- msg. tx . send ( Err ( AbortReason :: RateLimited ) ) . await . ok ( ) ;
178- continue ;
179- }
147+ if let ProviderMessage :: GetRequestReceived ( mut msg) = msg {
148+ let connection_id = msg. connection_id ;
149+ let request_id = msg. request_id ;
150+ let res = requests. fetch_update ( Ordering :: SeqCst , Ordering :: SeqCst , |n| {
151+ if n >= max_connections {
152+ None
153+ } else {
154+ Some ( n + 1 )
155+ }
156+ } ) ;
157+ match res {
158+ Ok ( n) => {
159+ println ! ( "Accepting request {n}, id ({connection_id},{request_id})" ) ;
160+ msg. tx . send ( Ok ( ( ) ) ) . await . ok ( ) ;
161+ }
162+ Err ( _) => {
163+ println ! (
164+ "Connection limit of {max_connections} exceeded, rejecting request"
165+ ) ;
166+ msg. tx . send ( Err ( AbortReason :: RateLimited ) ) . await . ok ( ) ;
167+ continue ;
180168 }
181- let requests = requests. clone ( ) ;
182- n0_future:: task:: spawn ( async move {
183- // just drain the per request events
184- while let Ok ( Some ( _) ) = msg. rx . recv ( ) . await { }
185- println ! ( "Stopping request, id ({connection_id},{request_id})" ) ;
186- requests. fetch_sub ( 1 , Ordering :: SeqCst ) ;
187- } ) ;
188169 }
189- _ => { }
170+ let requests = requests. clone ( ) ;
171+ n0_future:: task:: spawn ( async move {
172+ // just drain the per request events
173+ while let Ok ( Some ( _) ) = msg. rx . recv ( ) . await { }
174+ println ! ( "Stopping request, id ({connection_id},{request_id})" ) ;
175+ requests. fetch_sub ( 1 , Ordering :: SeqCst ) ;
176+ } ) ;
190177 }
191178 }
192179 } ) ;
@@ -218,7 +205,7 @@ async fn main() -> anyhow::Result<()> {
218205 . bytes_and_stats ( )
219206 . await ?;
220207 println ! ( "Downloaded {} bytes" , data. len( ) ) ;
221- println ! ( "Stats: {:?}" , stats ) ;
208+ println ! ( "Stats: {stats :?}" ) ;
222209 }
223210 Args :: ByNodeId {
224211 paths,
0 commit comments