@@ -67,20 +67,23 @@ type IOConnectorPair struct {
6767}
6868
6969func (connectorPair * IOConnectorPair ) proxy (
70- proc * vmProc ,
70+ ctx context. Context ,
7171 logger * logrus.Entry ,
7272 timeoutAfterExit time.Duration ,
7373) (ioInitDone <- chan error , ioCopyDone <- chan error ) {
7474 initDone := make (chan error , 2 )
7575 copyDone := make (chan error )
7676
77+ ioCtx , ioCancel := context .WithCancel (context .Background ())
78+
7779 // Start the initialization process. Any synchronous setup made by the connectors will
7880 // be completed after these lines. Async setup will be done once initDone is closed in
7981 // the goroutine below.
80- readerResultCh := connectorPair .ReadConnector (proc . ctx , logger .WithField ("direction" , "read" ))
81- writerResultCh := connectorPair .WriteConnector (proc . ctx , logger .WithField ("direction" , "write" ))
82+ readerResultCh := connectorPair .ReadConnector (ioCtx , logger .WithField ("direction" , "read" ))
83+ writerResultCh := connectorPair .WriteConnector (ioCtx , logger .WithField ("direction" , "write" ))
8284
8385 go func () {
86+ defer ioCancel ()
8487 defer close (copyDone )
8588
8689 var reader io.ReadCloser
@@ -119,7 +122,7 @@ func (connectorPair *IOConnectorPair) proxy(
119122 // If the io streams close on their own before the timeout, the Close calls here
120123 // should just be no-ops.
121124 go func () {
122- <- proc . ctx .Done ()
125+ <- ctx .Done ()
123126 time .AfterFunc (timeoutAfterExit , func () {
124127 logClose (logger , reader , writer )
125128 })
@@ -129,6 +132,7 @@ func (connectorPair *IOConnectorPair) proxy(
129132 defer logger .Debug ("end copying io" )
130133
131134 size , err := io .CopyBuffer (writer , reader , make ([]byte , internal .DefaultBufferSize ))
135+ logger .Debugf ("copied %d" , size )
132136 if err != nil {
133137 if strings .Contains (err .Error (), "use of closed network connection" ) ||
134138 strings .Contains (err .Error (), "file already closed" ) {
@@ -138,7 +142,6 @@ func (connectorPair *IOConnectorPair) proxy(
138142 }
139143 copyDone <- err
140144 }
141- logger .Debugf ("copied %d" , size )
142145 defer logClose (logger , reader , writer )
143146 }()
144147
@@ -174,19 +177,20 @@ func (ioConnectorSet *ioConnectorSet) start(proc *vmProc) (ioInitDone <-chan err
174177 if ioConnectorSet .stdin != nil {
175178 // For Stdin only, provide 0 as the timeout to wait after the proc exits before closing IO streams.
176179 // There's no reason to send stdin data to a proc that's already dead.
177- waitErrs (ioConnectorSet .stdin .proxy (proc , proc .logger .WithField ("stream" , "stdin" ), 0 ))
180+ waitErrs (ioConnectorSet .stdin .proxy (proc .ctx , proc .logger .WithField ("stream" , "stdin" ), 0 ))
181+
178182 } else {
179183 proc .logger .Debug ("skipping proxy io for unset stdin" )
180184 }
181185
182186 if ioConnectorSet .stdout != nil {
183- waitErrs (ioConnectorSet .stdout .proxy (proc , proc .logger .WithField ("stream" , "stdout" ), defaultIOFlushTimeout ))
187+ waitErrs (ioConnectorSet .stdout .proxy (proc . ctx , proc .logger .WithField ("stream" , "stdout" ), defaultIOFlushTimeout ))
184188 } else {
185189 proc .logger .Debug ("skipping proxy io for unset stdout" )
186190 }
187191
188192 if ioConnectorSet .stderr != nil {
189- waitErrs (ioConnectorSet .stderr .proxy (proc , proc .logger .WithField ("stream" , "stderr" ), defaultIOFlushTimeout ))
193+ waitErrs (ioConnectorSet .stderr .proxy (proc . ctx , proc .logger .WithField ("stream" , "stderr" ), defaultIOFlushTimeout ))
190194 } else {
191195 proc .logger .Debug ("skipping proxy io for unset stderr" )
192196 }
0 commit comments