33use std:: fmt;
44use std:: marker;
55use std:: ptr;
6+ use std:: sync:: Arc ;
67use std:: time:: Duration ;
78
89use curl_sys;
@@ -29,10 +30,15 @@ use crate::{Error, MultiError};
2930///
3031/// [multi tutorial]: https://curl.haxx.se/libcurl/c/libcurl-multi.html
3132pub struct Multi {
32- raw : * mut curl_sys :: CURLM ,
33+ raw : Arc < RawMulti > ,
3334 data : Box < MultiData > ,
3435}
3536
37+ #[ derive( Debug ) ]
38+ struct RawMulti {
39+ handle : * mut curl_sys:: CURLM ,
40+ }
41+
3642struct MultiData {
3743 socket : Box < dyn FnMut ( Socket , SocketEvents , usize ) + Send > ,
3844 timer : Box < dyn FnMut ( Option < Duration > ) -> bool + Send > ,
@@ -92,6 +98,20 @@ pub struct WaitFd {
9298 inner : curl_sys:: curl_waitfd ,
9399}
94100
101+ /// A handle that can be used to wake up a thread that's blocked in [Multi::poll].
102+ /// The handle can be passed to and used from any thread.
103+ #[ cfg( feature = "poll_7_68_0" ) ]
104+ #[ derive( Debug , Clone ) ]
105+ pub struct MultiWaker {
106+ raw : std:: sync:: Weak < RawMulti > ,
107+ }
108+
109+ #[ cfg( feature = "poll_7_68_0" ) ]
110+ unsafe impl Send for MultiWaker { }
111+
112+ #[ cfg( feature = "poll_7_68_0" ) ]
113+ unsafe impl Sync for MultiWaker { }
114+
95115impl Multi {
96116 /// Creates a new multi session through which multiple HTTP transfers can be
97117 /// initiated.
@@ -101,7 +121,7 @@ impl Multi {
101121 let ptr = curl_sys:: curl_multi_init ( ) ;
102122 assert ! ( !ptr. is_null( ) ) ;
103123 Multi {
104- raw : ptr,
124+ raw : Arc :: new ( RawMulti { handle : ptr } ) ,
105125 data : Box :: new ( MultiData {
106126 socket : Box :: new ( |_, _, _| ( ) ) ,
107127 timer : Box :: new ( |_| true ) ,
@@ -196,7 +216,7 @@ impl Multi {
196216 pub fn assign ( & self , socket : Socket , token : usize ) -> Result < ( ) , MultiError > {
197217 unsafe {
198218 cvt ( curl_sys:: curl_multi_assign (
199- self . raw ,
219+ self . raw . handle ,
200220 socket,
201221 token as * mut _ ,
202222 ) ) ?;
@@ -341,15 +361,15 @@ impl Multi {
341361 }
342362
343363 fn setopt_long ( & mut self , opt : curl_sys:: CURLMoption , val : c_long ) -> Result < ( ) , MultiError > {
344- unsafe { cvt ( curl_sys:: curl_multi_setopt ( self . raw , opt, val) ) }
364+ unsafe { cvt ( curl_sys:: curl_multi_setopt ( self . raw . handle , opt, val) ) }
345365 }
346366
347367 fn setopt_ptr (
348368 & mut self ,
349369 opt : curl_sys:: CURLMoption ,
350370 val : * const c_char ,
351371 ) -> Result < ( ) , MultiError > {
352- unsafe { cvt ( curl_sys:: curl_multi_setopt ( self . raw , opt, val) ) }
372+ unsafe { cvt ( curl_sys:: curl_multi_setopt ( self . raw . handle , opt, val) ) }
353373 }
354374
355375 /// Add an easy handle to a multi session
@@ -377,7 +397,7 @@ impl Multi {
377397 easy. transfer ( ) ;
378398
379399 unsafe {
380- cvt ( curl_sys:: curl_multi_add_handle ( self . raw , easy. raw ( ) ) ) ?;
400+ cvt ( curl_sys:: curl_multi_add_handle ( self . raw . handle , easy. raw ( ) ) ) ?;
381401 }
382402 Ok ( EasyHandle {
383403 easy,
@@ -388,7 +408,7 @@ impl Multi {
388408 /// Same as `add`, but works with the `Easy2` type.
389409 pub fn add2 < H > ( & self , easy : Easy2 < H > ) -> Result < Easy2Handle < H > , MultiError > {
390410 unsafe {
391- cvt ( curl_sys:: curl_multi_add_handle ( self . raw , easy. raw ( ) ) ) ?;
411+ cvt ( curl_sys:: curl_multi_add_handle ( self . raw . handle , easy. raw ( ) ) ) ?;
392412 }
393413 Ok ( Easy2Handle {
394414 easy,
@@ -410,7 +430,7 @@ impl Multi {
410430 pub fn remove ( & self , easy : EasyHandle ) -> Result < Easy , MultiError > {
411431 unsafe {
412432 cvt ( curl_sys:: curl_multi_remove_handle (
413- self . raw ,
433+ self . raw . handle ,
414434 easy. easy . raw ( ) ,
415435 ) ) ?;
416436 }
@@ -421,7 +441,7 @@ impl Multi {
421441 pub fn remove2 < H > ( & self , easy : Easy2Handle < H > ) -> Result < Easy2 < H > , MultiError > {
422442 unsafe {
423443 cvt ( curl_sys:: curl_multi_remove_handle (
424- self . raw ,
444+ self . raw . handle ,
425445 easy. easy . raw ( ) ,
426446 ) ) ?;
427447 }
@@ -445,7 +465,7 @@ impl Multi {
445465 let mut queue = 0 ;
446466 unsafe {
447467 loop {
448- let ptr = curl_sys:: curl_multi_info_read ( self . raw , & mut queue) ;
468+ let ptr = curl_sys:: curl_multi_info_read ( self . raw . handle , & mut queue) ;
449469 if ptr. is_null ( ) {
450470 break ;
451471 }
@@ -479,7 +499,7 @@ impl Multi {
479499 let mut remaining = 0 ;
480500 unsafe {
481501 cvt ( curl_sys:: curl_multi_socket_action (
482- self . raw ,
502+ self . raw . handle ,
483503 socket,
484504 events. bits ,
485505 & mut remaining,
@@ -507,7 +527,7 @@ impl Multi {
507527 let mut remaining = 0 ;
508528 unsafe {
509529 cvt ( curl_sys:: curl_multi_socket_action (
510- self . raw ,
530+ self . raw . handle ,
511531 curl_sys:: CURL_SOCKET_BAD ,
512532 0 ,
513533 & mut remaining,
@@ -536,7 +556,7 @@ impl Multi {
536556 pub fn get_timeout ( & self ) -> Result < Option < Duration > , MultiError > {
537557 let mut ms = 0 ;
538558 unsafe {
539- cvt ( curl_sys:: curl_multi_timeout ( self . raw , & mut ms) ) ?;
559+ cvt ( curl_sys:: curl_multi_timeout ( self . raw . handle , & mut ms) ) ?;
540560 if ms == -1 {
541561 Ok ( None )
542562 } else {
@@ -571,19 +591,72 @@ impl Multi {
571591 /// }
572592 /// ```
573593 pub fn wait ( & self , waitfds : & mut [ WaitFd ] , timeout : Duration ) -> Result < u32 , MultiError > {
574- let timeout_ms = {
575- let secs = timeout. as_secs ( ) ;
576- if secs > ( i32:: max_value ( ) / 1000 ) as u64 {
577- // Duration too large, clamp at maximum value.
578- i32:: max_value ( )
579- } else {
580- secs as i32 * 1000 + timeout. subsec_nanos ( ) as i32 / 1_000_000
581- }
582- } ;
594+ let timeout_ms = Multi :: timeout_i32 ( timeout) ;
583595 unsafe {
584596 let mut ret = 0 ;
585597 cvt ( curl_sys:: curl_multi_wait (
586- self . raw ,
598+ self . raw . handle ,
599+ waitfds. as_mut_ptr ( ) as * mut _ ,
600+ waitfds. len ( ) as u32 ,
601+ timeout_ms,
602+ & mut ret,
603+ ) ) ?;
604+ Ok ( ret as u32 )
605+ }
606+ }
607+
608+ fn timeout_i32 ( timeout : Duration ) -> i32 {
609+ let secs = timeout. as_secs ( ) ;
610+ if secs > ( i32:: MAX / 1000 ) as u64 {
611+ // Duration too large, clamp at maximum value.
612+ i32:: MAX
613+ } else {
614+ secs as i32 * 1000 + timeout. subsec_nanos ( ) as i32 / 1_000_000
615+ }
616+ }
617+
618+ /// Block until activity is detected or a timeout passes.
619+ ///
620+ /// The timeout is used in millisecond-precision. Large durations are
621+ /// clamped at the maximum value curl accepts.
622+ ///
623+ /// The returned integer will contain the number of internal file
624+ /// descriptors on which interesting events occurred.
625+ ///
626+ /// This function is a simpler alternative to using `fdset()` and `select()`
627+ /// and does not suffer from file descriptor limits.
628+ ///
629+ /// While this method is similar to [Multi::wait], with the following
630+ /// distinctions:
631+ /// * If there are no handles added to the multi, poll will honor the
632+ /// provided timeout, while [Multi::wait] returns immediately.
633+ /// * If poll has blocked due to there being no activity on the handles in
634+ /// the Multi, it can be woken up from any thread and at any time before
635+ /// the timeout expires.
636+ ///
637+ /// Requires libcurl 7.66.0 or later.
638+ ///
639+ /// # Example
640+ ///
641+ /// ```
642+ /// use curl::multi::Multi;
643+ /// use std::time::Duration;
644+ ///
645+ /// let m = Multi::new();
646+ ///
647+ /// // Add some Easy handles...
648+ ///
649+ /// while m.perform().unwrap() > 0 {
650+ /// m.poll(&mut [], Duration::from_secs(1)).unwrap();
651+ /// }
652+ /// ```
653+ #[ cfg( feature = "poll_7_68_0" ) ]
654+ pub fn poll ( & self , waitfds : & mut [ WaitFd ] , timeout : Duration ) -> Result < u32 , MultiError > {
655+ let timeout_ms = Multi :: timeout_i32 ( timeout) ;
656+ unsafe {
657+ let mut ret = 0 ;
658+ cvt ( curl_sys:: curl_multi_poll (
659+ self . raw . handle ,
587660 waitfds. as_mut_ptr ( ) as * mut _ ,
588661 waitfds. len ( ) as u32 ,
589662 timeout_ms,
@@ -593,6 +666,13 @@ impl Multi {
593666 }
594667 }
595668
669+ /// Returns a new [MultiWaker] that can be used to wake up a thread that's
670+ /// currently blocked in [Multi::poll].
671+ #[ cfg( feature = "poll_7_68_0" ) ]
672+ pub fn waker ( & self ) -> MultiWaker {
673+ MultiWaker :: new ( Arc :: downgrade ( & self . raw ) )
674+ }
675+
596676 /// Reads/writes available data from each easy handle.
597677 ///
598678 /// This function handles transfers on all the added handles that need
@@ -636,7 +716,7 @@ impl Multi {
636716 pub fn perform ( & self ) -> Result < u32 , MultiError > {
637717 unsafe {
638718 let mut ret = 0 ;
639- cvt ( curl_sys:: curl_multi_perform ( self . raw , & mut ret) ) ?;
719+ cvt ( curl_sys:: curl_multi_perform ( self . raw . handle , & mut ret) ) ?;
640720 Ok ( ret as u32 )
641721 }
642722 }
@@ -684,7 +764,11 @@ impl Multi {
684764 let write = write. map ( |r| r as * mut _ ) . unwrap_or ( ptr:: null_mut ( ) ) ;
685765 let except = except. map ( |r| r as * mut _ ) . unwrap_or ( ptr:: null_mut ( ) ) ;
686766 cvt ( curl_sys:: curl_multi_fdset (
687- self . raw , read, write, except, & mut ret,
767+ self . raw . handle ,
768+ read,
769+ write,
770+ except,
771+ & mut ret,
688772 ) ) ?;
689773 if ret == -1 {
690774 Ok ( None )
@@ -710,11 +794,38 @@ impl Multi {
710794
711795 /// Get a pointer to the raw underlying CURLM handle.
712796 pub fn raw ( & self ) -> * mut curl_sys:: CURLM {
713- self . raw
797+ self . raw . handle
714798 }
799+ }
715800
716- unsafe fn close_impl ( & self ) -> Result < ( ) , MultiError > {
717- cvt ( curl_sys:: curl_multi_cleanup ( self . raw ) )
801+ impl Drop for RawMulti {
802+ fn drop ( & mut self ) {
803+ unsafe {
804+ let _ = cvt ( curl_sys:: curl_multi_cleanup ( self . handle ) ) ;
805+ }
806+ }
807+ }
808+
809+ #[ cfg( feature = "poll_7_68_0" ) ]
810+ impl MultiWaker {
811+ /// Creates a new MultiWaker handle.
812+ fn new ( raw : std:: sync:: Weak < RawMulti > ) -> Self {
813+ Self { raw }
814+ }
815+
816+ /// Wakes up a thread that is blocked in [Multi::poll]. This method can be
817+ /// invoked from any thread.
818+ ///
819+ /// Will return an error if the RawMulti has already been dropped.
820+ ///
821+ /// Requires libcurl 7.68.0 or later.
822+ pub fn wakeup ( & self ) -> Result < ( ) , MultiError > {
823+ if let Some ( raw) = self . raw . upgrade ( ) {
824+ unsafe { cvt ( curl_sys:: curl_multi_wakeup ( raw. handle ) ) }
825+ } else {
826+ // This happens if the RawMulti has already been dropped:
827+ Err ( MultiError :: new ( curl_sys:: CURLM_BAD_HANDLE ) )
828+ }
718829 }
719830}
720831
@@ -732,12 +843,6 @@ impl fmt::Debug for Multi {
732843 }
733844}
734845
735- impl Drop for Multi {
736- fn drop ( & mut self ) {
737- let _ = unsafe { self . close_impl ( ) } ;
738- }
739- }
740-
741846macro_rules! impl_easy_getters {
742847 ( ) => {
743848 impl_easy_getters! {
0 commit comments