@@ -3,7 +3,6 @@ use futures::{
33 stream:: BoxStream ,
44 Future , FutureExt , Stream , StreamExt ,
55} ;
6- use once_cell:: sync:: Lazy ;
76use serde:: Deserialize ;
87use snafu:: prelude:: * ;
98use std:: {
@@ -20,7 +19,7 @@ use tokio::{
2019 join,
2120 process:: { Child , ChildStdin , ChildStdout , Command } ,
2221 select,
23- sync:: { mpsc, oneshot, OnceCell } ,
22+ sync:: { mpsc, oneshot, OnceCell , OwnedSemaphorePermit , Semaphore } ,
2423 task:: { JoinHandle , JoinSet } ,
2524 time:: { self , MissedTickBehavior } ,
2625} ;
@@ -821,6 +820,97 @@ enum DemultiplexCommand {
821820 ListenOnce ( JobId , oneshot:: Sender < WorkerMessage > ) ,
822821}
823822
823+ #[ derive( Debug , Copy , Clone ) ]
824+ pub struct CoordinatorId {
825+ start : u64 ,
826+ id : u64 ,
827+ }
828+
829+ /// Enforces a limited number of concurrent `Coordinator`s.
830+ #[ derive( Debug ) ]
831+ pub struct CoordinatorFactory {
832+ semaphore : Arc < Semaphore > ,
833+
834+ start : u64 ,
835+ id : AtomicU64 ,
836+ }
837+
838+ impl CoordinatorFactory {
839+ pub fn new ( maximum : usize ) -> Self {
840+ let semaphore = Arc :: new ( Semaphore :: new ( maximum) ) ;
841+
842+ let now = std:: time:: SystemTime :: now ( ) ;
843+ let start = now
844+ . duration_since ( std:: time:: UNIX_EPOCH )
845+ . unwrap_or_default ( )
846+ . as_secs ( ) ;
847+
848+ let id = AtomicU64 :: new ( 0 ) ;
849+
850+ Self {
851+ semaphore,
852+ start,
853+ id,
854+ }
855+ }
856+
857+ fn next_id ( & self ) -> CoordinatorId {
858+ let start = self . start ;
859+ let id = self . id . fetch_add ( 1 , Ordering :: SeqCst ) ;
860+
861+ CoordinatorId { start, id }
862+ }
863+
864+ pub async fn build < B > ( & self ) -> LimitedCoordinator < B >
865+ where
866+ B : Backend + From < CoordinatorId > ,
867+ {
868+ let semaphore = self . semaphore . clone ( ) ;
869+ let permit = semaphore
870+ . acquire_owned ( )
871+ . await
872+ . expect ( "Unable to acquire permit" ) ;
873+
874+ let id = self . next_id ( ) ;
875+ let backend = B :: from ( id) ;
876+
877+ let coordinator = Coordinator :: new ( backend) ;
878+
879+ LimitedCoordinator {
880+ coordinator,
881+ _permit : permit,
882+ }
883+ }
884+ }
885+
886+ pub struct LimitedCoordinator < T > {
887+ coordinator : Coordinator < T > ,
888+ _permit : OwnedSemaphorePermit ,
889+ }
890+
891+ impl < T > LimitedCoordinator < T >
892+ where
893+ T : Backend ,
894+ {
895+ pub async fn shutdown ( self ) -> Result < T > {
896+ self . coordinator . shutdown ( ) . await
897+ }
898+ }
899+
900+ impl < T > ops:: Deref for LimitedCoordinator < T > {
901+ type Target = Coordinator < T > ;
902+
903+ fn deref ( & self ) -> & Self :: Target {
904+ & self . coordinator
905+ }
906+ }
907+
908+ impl < T > ops:: DerefMut for LimitedCoordinator < T > {
909+ fn deref_mut ( & mut self ) -> & mut Self :: Target {
910+ & mut self . coordinator
911+ }
912+ }
913+
824914#[ derive( Debug ) ]
825915pub struct Coordinator < B > {
826916 backend : B ,
@@ -844,7 +934,7 @@ impl<B> Coordinator<B>
844934where
845935 B : Backend ,
846936{
847- pub async fn new ( backend : B ) -> Self {
937+ pub fn new ( backend : B ) -> Self {
848938 let token = CancellationToken :: new ( ) ;
849939
850940 Self {
@@ -1089,12 +1179,6 @@ where
10891179 }
10901180}
10911181
1092- impl Coordinator < DockerBackend > {
1093- pub async fn new_docker ( ) -> Self {
1094- Self :: new ( DockerBackend ( ( ) ) ) . await
1095- }
1096- }
1097-
10981182#[ derive( Debug ) ]
10991183struct Container {
11001184 task : JoinHandle < Result < ( ) > > ,
@@ -2521,24 +2605,26 @@ fn basic_secure_docker_command() -> Command {
25212605 )
25222606}
25232607
2524- static DOCKER_BACKEND_START : Lazy < u64 > = Lazy :: new ( || {
2525- use std:: time;
2526-
2527- let now = time:: SystemTime :: now ( ) ;
2528- now. duration_since ( time:: UNIX_EPOCH )
2529- . unwrap_or_default ( )
2530- . as_secs ( )
2531- } ) ;
2532-
2533- static DOCKER_BACKEND_ID : AtomicU64 = AtomicU64 :: new ( 0 ) ;
2608+ pub struct DockerBackend {
2609+ id : CoordinatorId ,
2610+ instance : AtomicU64 ,
2611+ }
25342612
2535- pub struct DockerBackend ( ( ) ) ;
2613+ impl From < CoordinatorId > for DockerBackend {
2614+ fn from ( id : CoordinatorId ) -> Self {
2615+ Self {
2616+ id,
2617+ instance : Default :: default ( ) ,
2618+ }
2619+ }
2620+ }
25362621
25372622impl DockerBackend {
25382623 fn next_name ( & self ) -> String {
2539- let start = * DOCKER_BACKEND_START ;
2540- let id = DOCKER_BACKEND_ID . fetch_add ( 1 , Ordering :: SeqCst ) ;
2541- format ! ( "playground-{start}-{id}" )
2624+ let CoordinatorId { start, id } = self . id ;
2625+ let instance = self . instance . fetch_add ( 1 , Ordering :: SeqCst ) ;
2626+
2627+ format ! ( "playground-{start}-{id}-{instance}" )
25422628 }
25432629}
25442630
@@ -2697,14 +2783,10 @@ fn spawn_io_queue(stdin: ChildStdin, stdout: ChildStdout, token: CancellationTok
26972783#[ cfg( test) ]
26982784mod tests {
26992785 use assertables:: * ;
2700- use futures:: {
2701- future:: { join, try_join_all} ,
2702- Future , FutureExt ,
2703- } ;
2786+ use futures:: future:: { join, try_join_all} ;
27042787 use once_cell:: sync:: Lazy ;
2705- use std:: { env, sync:: Once , time :: Duration } ;
2788+ use std:: { env, sync:: Once } ;
27062789 use tempdir:: TempDir ;
2707- use tokio:: sync:: { OwnedSemaphorePermit , Semaphore } ;
27082790
27092791 use super :: * ;
27102792
@@ -2726,8 +2808,8 @@ mod tests {
27262808 project_dir : TempDir ,
27272809 }
27282810
2729- impl TestBackend {
2730- fn new ( ) -> Self {
2811+ impl From < CoordinatorId > for TestBackend {
2812+ fn from ( _id : CoordinatorId ) -> Self {
27312813 static COMPILE_WORKER_ONCE : Once = Once :: new ( ) ;
27322814
27332815 COMPILE_WORKER_ONCE . call_once ( || {
@@ -2781,63 +2863,18 @@ mod tests {
27812863 . unwrap_or ( 5 )
27822864 } ) ;
27832865
2784- static CONCURRENT_TEST_SEMAPHORE : Lazy < Arc < Semaphore > > =
2785- Lazy :: new ( || Arc :: new ( Semaphore :: new ( * MAX_CONCURRENT_TESTS ) ) ) ;
2786-
2787- struct RestrictedCoordinator < T > {
2788- _permit : OwnedSemaphorePermit ,
2789- coordinator : Coordinator < T > ,
2790- }
2791-
2792- impl < T > RestrictedCoordinator < T >
2793- where
2794- T : Backend ,
2795- {
2796- async fn with < F , Fut > ( f : F ) -> Self
2797- where
2798- F : FnOnce ( ) -> Fut ,
2799- Fut : Future < Output = Coordinator < T > > ,
2800- {
2801- let semaphore = CONCURRENT_TEST_SEMAPHORE . clone ( ) ;
2802- let permit = semaphore
2803- . acquire_owned ( )
2804- . await
2805- . expect ( "Unable to acquire permit" ) ;
2806- let coordinator = f ( ) . await ;
2807- Self {
2808- _permit : permit,
2809- coordinator,
2810- }
2811- }
2812-
2813- async fn shutdown ( self ) -> super :: Result < T , super :: Error > {
2814- self . coordinator . shutdown ( ) . await
2815- }
2816- }
2817-
2818- impl < T > ops:: Deref for RestrictedCoordinator < T > {
2819- type Target = Coordinator < T > ;
2820-
2821- fn deref ( & self ) -> & Self :: Target {
2822- & self . coordinator
2823- }
2824- }
2825-
2826- impl < T > ops:: DerefMut for RestrictedCoordinator < T > {
2827- fn deref_mut ( & mut self ) -> & mut Self :: Target {
2828- & mut self . coordinator
2829- }
2830- }
2866+ static TEST_COORDINATOR_FACTORY : Lazy < CoordinatorFactory > =
2867+ Lazy :: new ( || CoordinatorFactory :: new ( * MAX_CONCURRENT_TESTS ) ) ;
28312868
2832- async fn new_coordinator_test ( ) -> RestrictedCoordinator < impl Backend > {
2833- RestrictedCoordinator :: with ( || Coordinator :: new ( TestBackend :: new ( ) ) ) . await
2869+ async fn new_coordinator_test ( ) -> LimitedCoordinator < TestBackend > {
2870+ TEST_COORDINATOR_FACTORY . build ( ) . await
28342871 }
28352872
2836- async fn new_coordinator_docker ( ) -> RestrictedCoordinator < impl Backend > {
2837- RestrictedCoordinator :: with ( || Coordinator :: new_docker ( ) ) . await
2873+ async fn new_coordinator_docker ( ) -> LimitedCoordinator < DockerBackend > {
2874+ TEST_COORDINATOR_FACTORY . build ( ) . await
28382875 }
28392876
2840- async fn new_coordinator ( ) -> RestrictedCoordinator < impl Backend > {
2877+ async fn new_coordinator ( ) -> LimitedCoordinator < impl Backend > {
28412878 #[ cfg( not( force_docker) ) ]
28422879 {
28432880 new_coordinator_test ( ) . await
0 commit comments