@@ -17,7 +17,6 @@ limitations under the License.
1717package plugin
1818
1919import (
20- "context"
2120 "fmt"
2221 "io"
2322 "os/exec"
@@ -144,15 +143,15 @@ func readFromReader(reader io.ReadCloser, maxBytes int64) ([]byte, error) {
144143}
145144
146145func (p * Plugin ) run (rule cpmtypes.CustomRule ) (exitStatus cpmtypes.Status , output string ) {
147- var ctx context. Context
148- var cancel context. CancelFunc
146+ isTimeout := false
147+ isHung := false
149148
149+ var timeoutDuration time.Duration
150150 if rule .Timeout != nil && * rule .Timeout < * p .config .PluginGlobalConfig .Timeout {
151- ctx , cancel = context . WithTimeout ( context . Background (), * rule .Timeout )
151+ timeoutDuration = * rule .Timeout
152152 } else {
153- ctx , cancel = context . WithTimeout ( context . Background (), * p .config .PluginGlobalConfig .Timeout )
153+ timeoutDuration = * p .config .PluginGlobalConfig .Timeout
154154 }
155- defer cancel ()
156155
157156 cmd := util .Exec (rule .Path , rule .Args ... )
158157
@@ -171,37 +170,6 @@ func (p *Plugin) run(rule cpmtypes.CustomRule) (exitStatus cpmtypes.Status, outp
171170 return cpmtypes .Unknown , "Error in starting plugin. Please check the error log"
172171 }
173172
174- waitChan := make (chan struct {})
175- defer close (waitChan )
176-
177- var m sync.Mutex
178- timeout := false
179-
180- go func () {
181- select {
182- case <- ctx .Done ():
183- if ctx .Err () == context .Canceled {
184- return
185- }
186- klog .Errorf ("Error in running plugin timeout %q" , rule .Path )
187- if cmd .Process == nil || cmd .Process .Pid == 0 {
188- klog .Errorf ("Error in cmd.Process check %q" , rule .Path )
189- break
190- }
191-
192- m .Lock ()
193- timeout = true
194- m .Unlock ()
195-
196- err := util .Kill (cmd )
197- if err != nil {
198- klog .Errorf ("Error in kill process %d, %v" , cmd .Process .Pid , err )
199- }
200- case <- waitChan :
201- return
202- }
203- }()
204-
205173 var (
206174 wg sync.WaitGroup
207175 stdout []byte
@@ -221,14 +189,46 @@ func (p *Plugin) run(rule cpmtypes.CustomRule) (exitStatus cpmtypes.Status, outp
221189 }()
222190 // This will wait for the reads to complete. If the execution times out, the pipes
223191 // will be closed and the wait group unblocks.
224- wg .Wait ()
192+ // If the timeout is caused by the plugin process or sub-process hung due to GPU device errors or other reasons,
193+ // wg.Wait() will be blocked forever, so we need to add a timeout to the wait group.
194+ waitChan := make (chan struct {})
195+ go func () {
196+ wg .Wait ()
197+ close (waitChan )
198+ }()
199+ select {
200+ case <- waitChan :
201+ // The reads are done.
202+ break
203+ case <- time .After (timeoutDuration ):
204+ klog .Errorf ("Waiting for command output timed out when running plugin %q" , rule .Path )
205+ isTimeout = true
206+ err := util .Kill (cmd )
207+ if err != nil {
208+ klog .Errorf ("Error when killing process %d: %v" , cmd .Process .Pid , err )
209+ } else {
210+ klog .Infof ("Killed process %d successfully" , cmd .Process .Pid )
211+ }
225212
226- if stdoutErr != nil {
213+ // Check if the process is in D state. If it is, the process is hung and can not be killed.
214+ // It also means that the plugin can not report the correct status, instead reports Unknown status.
215+ // On a GPU machine, a plugin with Python script calling pynvml API may hang in D state due to some GPU device errors.
216+ if util .IsProcessInDState (cmd .Process .Pid ) {
217+ klog .Errorf ("Process %d is hung in D state" , cmd .Process .Pid )
218+ isHung = true
219+ }
220+ }
221+
222+ if isHung {
223+ return cpmtypes .Unknown , fmt .Sprintf ("Process is hung when running plugin %s" , rule .Path )
224+ }
225+
226+ if ! isTimeout && stdoutErr != nil {
227227 klog .Errorf ("Error reading stdout for plugin %q: error - %v" , rule .Path , err )
228228 return cpmtypes .Unknown , "Error reading stdout for plugin. Please check the error log"
229229 }
230230
231- if stderrErr != nil {
231+ if ! isTimeout && stderrErr != nil {
232232 klog .Errorf ("Error reading stderr for plugin %q: error - %v" , rule .Path , err )
233233 return cpmtypes .Unknown , "Error reading stderr for plugin. Please check the error log"
234234 }
@@ -240,16 +240,13 @@ func (p *Plugin) run(rule cpmtypes.CustomRule) (exitStatus cpmtypes.Status, outp
240240 }
241241 }
242242
243- // trim suffix useless bytes
244- output = string (stdout )
245- output = strings .TrimSpace (output )
246-
247- m .Lock ()
248- cmdKilled := timeout
249- m .Unlock ()
250-
251- if cmdKilled {
252- output = fmt .Sprintf ("Timeout when running plugin %q: state - %s. output - %q" , rule .Path , cmd .ProcessState .String (), output )
243+ stderrStr := ""
244+ if isTimeout {
245+ output = fmt .Sprintf ("Timeout when running plugin %q: state - %s. output - %q" , rule .Path , cmd .ProcessState .String (), "" )
246+ } else {
247+ // trim suffix useless bytes
248+ output = strings .TrimSpace (string (stdout ))
249+ stderrStr = strings .TrimSpace (string (stderr ))
253250 }
254251
255252 // cut at position max_output_length if stdout is longer than max_output_length bytes
@@ -260,13 +257,13 @@ func (p *Plugin) run(rule cpmtypes.CustomRule) (exitStatus cpmtypes.Status, outp
260257 exitCode := cmd .ProcessState .Sys ().(syscall.WaitStatus ).ExitStatus ()
261258 switch exitCode {
262259 case 0 :
263- logPluginStderr (rule , string ( stderr ) , 3 )
260+ logPluginStderr (rule , stderrStr , 3 )
264261 return cpmtypes .OK , output
265262 case 1 :
266- logPluginStderr (rule , string ( stderr ) , 0 )
263+ logPluginStderr (rule , stderrStr , 0 )
267264 return cpmtypes .NonOK , output
268265 default :
269- logPluginStderr (rule , string ( stderr ) , 0 )
266+ logPluginStderr (rule , stderrStr , 0 )
270267 return cpmtypes .Unknown , output
271268 }
272269}
0 commit comments