1717
1818#[ cfg( unix) ]
1919use std:: os:: unix:: io:: { AsRawFd , FromRawFd , RawFd } ;
20+ use std:: time:: Duration ;
2021
2122use protobuf:: { CodedInputStream , Message } ;
2223use std:: collections:: HashMap ;
@@ -40,6 +41,7 @@ use crate::{MethodHandler, TtrpcContext};
// Defaults used to populate `Server::thread_count_default`, `thread_count_min`
// and `thread_count_max` respectively when a `Server` is built via `Default`.
4041const DEFAULT_WAIT_THREAD_COUNT_DEFAULT : usize = 3 ;
4142const DEFAULT_WAIT_THREAD_COUNT_MIN : usize = 1 ;
4243const DEFAULT_WAIT_THREAD_COUNT_MAX : usize = 5 ;
// Default for `Server::accept_retry_interval`: how long the listener sleeps
// before retrying after `accept` fails with a resource-limit error.
44+ const DEFAULT_ACCEPT_RETRY_INTERVAL : Duration = Duration :: from_secs ( 10 ) ;
4345
4446type MessageSender = Sender < ( MessageHeader , Vec < u8 > ) > ;
4547type MessageReceiver = Receiver < ( MessageHeader , Vec < u8 > ) > ;
@@ -57,6 +59,7 @@ pub struct Server {
5759 thread_count_default : usize ,
5860 thread_count_min : usize ,
5961 thread_count_max : usize ,
62+ accept_retry_interval : Duration ,
6063}
6164
6265struct Connection {
@@ -244,6 +247,7 @@ impl Default for Server {
244247 thread_count_default : DEFAULT_WAIT_THREAD_COUNT_DEFAULT ,
245248 thread_count_min : DEFAULT_WAIT_THREAD_COUNT_MIN ,
246249 thread_count_max : DEFAULT_WAIT_THREAD_COUNT_MAX ,
250+ accept_retry_interval : DEFAULT_ACCEPT_RETRY_INTERVAL ,
247251 }
248252 }
249253}
@@ -305,6 +309,11 @@ impl Server {
305309 self
306310 }
307311
312+ pub fn set_accept_retry_interval ( mut self , interval : Duration ) -> Server {
313+ self . accept_retry_interval = interval;
314+ self
315+ }
316+
308317 pub fn start_listen ( & mut self ) -> Result < ( ) > {
309318 let connections = self . connections . clone ( ) ;
310319
@@ -320,6 +329,7 @@ impl Server {
320329 let min = self . thread_count_min ;
321330 let max = self . thread_count_max ;
322331 let listener_quit_flag = self . listener_quit_flag . clone ( ) ;
332+ let accept_retry_interval = self . accept_retry_interval ;
323333
324334 let reaper_tx = match self . reaper . take ( ) {
325335 None => {
@@ -373,6 +383,14 @@ impl Server {
373383 }
374384 Err ( e) => {
375385 error ! ( "listener accept got {:?}" , e) ;
386+
387+ // Resource limit errors can't be recovered from in a short time,
388+ // and since poll(2) is level-triggered, an uncorrected error would make this loop spin forever.
389+ // So we sleep for a while and wait for the error to be corrected.
390+ if is_resource_limit_error ( e) {
391+ thread:: sleep ( accept_retry_interval) ;
392+ }
393+
376394 continue ;
377395 }
378396 } ;
@@ -597,3 +615,11 @@ fn quit_connection(quit: Arc<AtomicBool>, control_tx: SyncSender<()>) {
597615 . send ( ( ) )
598616 . unwrap_or_else ( |err| debug ! ( "Failed to send {:?}" , err) ) ;
599617}
618+
619+ fn is_resource_limit_error ( e : std:: io:: Error ) -> bool {
620+ if let Some ( err) = e. raw_os_error ( ) {
621+ return [ libc:: EMFILE , libc:: ENFILE , libc:: ENOBUFS , libc:: ENOMEM ] . contains ( & err) ;
622+ }
623+
624+ false
625+ }
0 commit comments