@@ -605,10 +605,12 @@ impl Container {
605605 ) -> Result < WithOutput < ExecuteResponse > , ExecuteError > {
606606 let ActiveExecution {
607607 task,
608+ stdin_tx,
608609 stdout_rx,
609610 stderr_rx,
610611 } = self . begin_execute ( request) . await ?;
611612
613+ drop ( stdin_tx) ;
612614 WithOutput :: try_absorb ( task, stdout_rx, stderr_rx) . await
613615 }
614616
@@ -636,6 +638,7 @@ impl Container {
636638
637639 let SpawnCargo {
638640 task,
641+ stdin_tx,
639642 stdout_rx,
640643 stderr_rx,
641644 } = self
@@ -660,6 +663,7 @@ impl Container {
660663
661664 Ok ( ActiveExecution {
662665 task,
666+ stdin_tx,
663667 stdout_rx,
664668 stderr_rx,
665669 } )
@@ -707,13 +711,16 @@ impl Container {
707711
708712 let SpawnCargo {
709713 task,
714+ stdin_tx,
710715 stdout_rx,
711716 stderr_rx,
712717 } = self
713718 . spawn_cargo_task ( execute_cargo)
714719 . await
715720 . context ( CouldNotStartCargoSnafu ) ?;
716721
722+ drop ( stdin_tx) ;
723+
717724 let commander = self . commander . clone ( ) ;
718725 let task = async move {
719726 let ExecuteCommandResponse {
@@ -758,41 +765,53 @@ impl Container {
758765 ) -> Result < SpawnCargo , SpawnCargoError > {
759766 use spawn_cargo_error:: * ;
760767
768+ let ( stdin_tx, mut stdin_rx) = mpsc:: channel ( 8 ) ;
761769 let ( stdout_tx, stdout_rx) = mpsc:: channel ( 8 ) ;
762770 let ( stderr_tx, stderr_rx) = mpsc:: channel ( 8 ) ;
763771
764- let mut from_worker_rx = self
772+ let ( to_worker_tx , mut from_worker_rx) = self
765773 . commander
766774 . many ( execute_cargo)
767775 . await
768776 . context ( CouldNotStartCargoSnafu ) ?;
769777
770778 let task = tokio:: spawn ( {
771779 async move {
772- while let Some ( container_msg) = from_worker_rx. recv ( ) . await {
773- trace ! ( "processing {container_msg:?}" ) ;
774-
775- match container_msg {
776- WorkerMessage :: ExecuteCommand ( resp) => {
777- return Ok ( resp) ;
778- }
779- WorkerMessage :: StdoutPacket ( packet) => {
780- stdout_tx. send ( packet) . await . ok ( /* Receiver gone, that's OK */ ) ;
781- }
782- WorkerMessage :: StderrPacket ( packet) => {
783- stderr_tx. send ( packet) . await . ok ( /* Receiver gone, that's OK */ ) ;
784- }
785- _ => return UnexpectedMessageSnafu . fail ( ) ,
780+ loop {
781+ select ! {
782+ Some ( stdin) = stdin_rx. recv( ) => {
783+ let msg = CoordinatorMessage :: StdinPacket ( stdin) ;
784+ trace!( "processing {msg:?}" ) ;
785+ to_worker_tx. send( msg) . await . context( StdinSnafu ) ?;
786+ } ,
787+
788+ Some ( container_msg) = from_worker_rx. recv( ) => {
789+ trace!( "processing {container_msg:?}" ) ;
790+
791+ match container_msg {
792+ WorkerMessage :: ExecuteCommand ( resp) => {
793+ return Ok ( resp) ;
794+ }
795+ WorkerMessage :: StdoutPacket ( packet) => {
796+ stdout_tx. send( packet) . await . ok( /* Receiver gone, that's OK */ ) ;
797+ }
798+ WorkerMessage :: StderrPacket ( packet) => {
799+ stderr_tx. send( packet) . await . ok( /* Receiver gone, that's OK */ ) ;
800+ }
801+ _ => return UnexpectedMessageSnafu . fail( ) ,
802+ }
803+ } ,
804+
805+ else => return UnexpectedEndOfMessagesSnafu . fail( ) ,
786806 }
787807 }
788-
789- UnexpectedEndOfMessagesSnafu . fail ( )
790808 }
791809 . instrument ( trace_span ! ( "cargo task" ) . or_current ( ) )
792810 } ) ;
793811
794812 Ok ( SpawnCargo {
795813 task,
814+ stdin_tx,
796815 stdout_rx,
797816 stderr_rx,
798817 } )
@@ -812,6 +831,7 @@ impl Container {
812831
813832pub struct ActiveExecution {
814833 pub task : BoxFuture < ' static , Result < ExecuteResponse , ExecuteError > > ,
834+ pub stdin_tx : mpsc:: Sender < String > ,
815835 pub stdout_rx : mpsc:: Receiver < String > ,
816836 pub stderr_rx : mpsc:: Receiver < String > ,
817837}
@@ -820,6 +840,7 @@ impl fmt::Debug for ActiveExecution {
820840 fn fmt ( & self , f : & mut fmt:: Formatter < ' _ > ) -> fmt:: Result {
821841 f. debug_struct ( "ActiveExecution" )
822842 . field ( "task" , & "<future>" )
843+ . field ( "stdin_tx" , & self . stdin_tx )
823844 . field ( "stdout_rx" , & self . stdout_rx )
824845 . field ( "stderr_rx" , & self . stderr_rx )
825846 . finish ( )
@@ -900,6 +921,7 @@ pub enum CompileError {
900921
901922struct SpawnCargo {
902923 task : JoinHandle < Result < ExecuteCommandResponse , SpawnCargoError > > ,
924+ stdin_tx : mpsc:: Sender < String > ,
903925 stdout_rx : mpsc:: Receiver < String > ,
904926 stderr_rx : mpsc:: Receiver < String > ,
905927}
@@ -915,6 +937,9 @@ pub enum SpawnCargoError {
915937
916938 #[ snafu( display( "There are no more messages" ) ) ]
917939 UnexpectedEndOfMessages ,
940+
941+ #[ snafu( display( "Unable to send stdin message" ) ) ]
942+ Stdin { source : MultiplexedSenderError } ,
918943}
919944
920945#[ derive( Debug , Clone ) ]
@@ -1173,7 +1198,10 @@ impl Commander {
11731198 }
11741199 }
11751200
1176- async fn many < M > ( & self , message : M ) -> Result < mpsc:: Receiver < WorkerMessage > , CommanderError >
1201+ async fn many < M > (
1202+ & self ,
1203+ message : M ,
1204+ ) -> Result < ( MultiplexedSender , mpsc:: Receiver < WorkerMessage > ) , CommanderError >
11771205 where
11781206 M : Into < CoordinatorMessage > ,
11791207 {
@@ -1190,7 +1218,7 @@ impl Commander {
11901218 . await
11911219 . context ( UnableToStartManySnafu ) ?;
11921220
1193- Ok ( from_worker_rx)
1221+ Ok ( ( to_worker_tx , from_worker_rx) )
11941222 }
11951223}
11961224
@@ -1509,15 +1537,20 @@ mod tests {
15091537 //Coordinator::new_docker().await
15101538 }
15111539
1540+ const ARBITRARY_EXECUTE_REQUEST : ExecuteRequest = ExecuteRequest {
1541+ channel : Channel :: Stable ,
1542+ mode : Mode :: Debug ,
1543+ edition : Edition :: Rust2021 ,
1544+ crate_type : CrateType :: Binary ,
1545+ tests : false ,
1546+ backtrace : false ,
1547+ code : String :: new ( ) ,
1548+ } ;
1549+
15121550 fn new_execute_request ( ) -> ExecuteRequest {
15131551 ExecuteRequest {
1514- channel : Channel :: Stable ,
1515- mode : Mode :: Debug ,
1516- edition : Edition :: Rust2021 ,
1517- crate_type : CrateType :: Binary ,
1518- tests : false ,
1519- backtrace : false ,
15201552 code : r#"fn main() { println!("Hello, coordinator!"); }"# . into ( ) ,
1553+ ..ARBITRARY_EXECUTE_REQUEST
15211554 }
15221555 }
15231556
@@ -1724,6 +1757,51 @@ mod tests {
17241757 Ok ( ( ) )
17251758 }
17261759
1760+ #[ tokio:: test]
1761+ #[ snafu:: report]
1762+ async fn execute_stdin ( ) -> Result < ( ) > {
1763+ let coordinator = new_coordinator ( ) . await ;
1764+
1765+ let request = ExecuteRequest {
1766+ code : r#"
1767+ fn main() {
1768+ let mut input = String::new();
1769+ if std::io::stdin().read_line(&mut input).is_ok() {
1770+ println!("You entered >>>{input:?}<<<");
1771+ }
1772+ }
1773+ "#
1774+ . into ( ) ,
1775+ ..ARBITRARY_EXECUTE_REQUEST
1776+ } ;
1777+
1778+ let ActiveExecution {
1779+ task,
1780+ stdin_tx,
1781+ stdout_rx,
1782+ stderr_rx,
1783+ } = coordinator. begin_execute ( request) . await . unwrap ( ) ;
1784+
1785+ stdin_tx. send ( "this is stdin\n " . into ( ) ) . await . unwrap ( ) ;
1786+ // Purposefully not dropping stdin_tx early -- a user might forget
1787+
1788+ let WithOutput {
1789+ response,
1790+ stdout,
1791+ stderr,
1792+ } = WithOutput :: try_absorb ( task, stdout_rx, stderr_rx)
1793+ . with_timeout ( )
1794+ . await
1795+ . unwrap ( ) ;
1796+
1797+ assert ! ( response. success, "{stderr}" ) ;
1798+ assert_contains ! ( stdout, r#">>>"this is stdin\n"<<<"# ) ;
1799+
1800+ coordinator. shutdown ( ) . await ?;
1801+
1802+ Ok ( ( ) )
1803+ }
1804+
17271805 const HELLO_WORLD_CODE : & str = r#"fn main() { println!("Hello World!"); }"# ;
17281806
17291807 const ARBITRARY_COMPILE_REQUEST : CompileRequest = CompileRequest {
0 commit comments