Skip to content
This repository was archived by the owner on Feb 8, 2021. It is now read-only.

Commit ec0599f

Browse files
authored
Merge pull request #486 from laijs/WaitProcess
libhyperstart: add WaitProcess() which also waits streamout closed
2 parents 05c709c + 4b1f940 commit ec0599f

File tree

6 files changed

+107
-72
lines changed

6 files changed

+107
-72
lines changed

hyperstart/libhyperstart/hyperstart.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,11 +12,11 @@ type Hyperstart interface {
1212
LastStreamSeq() uint64
1313

1414
APIVersion() (uint32, error)
15-
ProcessAsyncEvents() (<-chan hyperstartapi.ProcessAsyncEvent, error)
1615
NewContainer(c *hyperstartapi.Container) (io.WriteCloser, io.ReadCloser, io.ReadCloser, error)
1716
RestoreContainer(c *hyperstartapi.Container) (io.WriteCloser, io.ReadCloser, io.ReadCloser, error)
1817
AddProcess(container string, p *hyperstartapi.Process) (io.WriteCloser, io.ReadCloser, io.ReadCloser, error)
1918
SignalProcess(container, process string, signal syscall.Signal) error
19+
WaitProcess(container, process string) int
2020
TtyWinResize(container, process string, row, col uint16) error
2121

2222
StartSandbox(pod *hyperstartapi.Pod) error

hyperstart/libhyperstart/json.go

Lines changed: 68 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -30,19 +30,21 @@ type pState struct {
3030
stdinPipe streamIn
3131
stdoutPipe io.ReadCloser
3232
stderrPipe io.ReadCloser
33+
exitStatus *int
34+
outClosed bool
35+
waitChan chan int
3336
}
3437

3538
type jsonBasedHyperstart struct {
3639
sync.RWMutex
37-
logPrefix string
38-
vmAPIVersion uint32
39-
closed bool
40-
lastStreamSeq uint64
41-
procs map[pKey]*pState
42-
streamOuts map[uint64]streamOut
43-
ctlChan chan *hyperstartCmd
44-
streamChan chan *hyperstartapi.TtyMessage
45-
processAsyncEvents chan hyperstartapi.ProcessAsyncEvent
40+
logPrefix string
41+
vmAPIVersion uint32
42+
closed bool
43+
lastStreamSeq uint64
44+
procs map[pKey]*pState
45+
streamOuts map[uint64]streamOut
46+
ctlChan chan *hyperstartCmd
47+
streamChan chan *hyperstartapi.TtyMessage
4648
}
4749

4850
type hyperstartCmd struct {
@@ -56,13 +58,12 @@ type hyperstartCmd struct {
5658

5759
func NewJsonBasedHyperstart(id, ctlSock, streamSock string, lastStreamSeq uint64, waitReady bool) Hyperstart {
5860
h := &jsonBasedHyperstart{
59-
logPrefix: fmt.Sprintf("SB[%s] ", id),
60-
procs: make(map[pKey]*pState),
61-
lastStreamSeq: lastStreamSeq,
62-
streamOuts: make(map[uint64]streamOut),
63-
ctlChan: make(chan *hyperstartCmd, 128),
64-
streamChan: make(chan *hyperstartapi.TtyMessage, 128),
65-
processAsyncEvents: make(chan hyperstartapi.ProcessAsyncEvent, 16),
61+
logPrefix: fmt.Sprintf("SB[%s] ", id),
62+
procs: make(map[pKey]*pState),
63+
lastStreamSeq: lastStreamSeq,
64+
streamOuts: make(map[uint64]streamOut),
65+
ctlChan: make(chan *hyperstartCmd, 128),
66+
streamChan: make(chan *hyperstartapi.TtyMessage, 128),
6667
}
6768
go handleStreamSock(h, streamSock)
6869
go handleCtlSock(h, ctlSock, waitReady)
@@ -86,17 +87,18 @@ func (h *jsonBasedHyperstart) Close() {
8687
defer h.Unlock()
8788
if !h.closed {
8889
h.Log(TRACE, "close jsonBasedHyperstart")
89-
for pk := range h.procs {
90-
h.processAsyncEvents <- hyperstartapi.ProcessAsyncEvent{Container: pk.c, Process: pk.p, Event: "finished", Status: 255}
91-
}
92-
h.procs = make(map[pKey]*pState)
9390
for _, out := range h.streamOuts {
9491
out.Close()
9592
}
9693
h.streamOuts = make(map[uint64]streamOut)
94+
for pk, ps := range h.procs {
95+
ps.outClosed = true
96+
ps.exitStatus = makeExitStatus(255)
97+
h.handleWaitProcess(pk, ps)
98+
}
99+
h.procs = make(map[pKey]*pState)
97100
close(h.ctlChan)
98101
close(h.streamChan)
99-
close(h.processAsyncEvents)
100102
for cmd := range h.ctlChan {
101103
if cmd.Code != hyperstartapi.INIT_ACK && cmd.Code != hyperstartapi.INIT_ERROR {
102104
cmd.result <- fmt.Errorf("hyperstart closed")
@@ -106,10 +108,6 @@ func (h *jsonBasedHyperstart) Close() {
106108
}
107109
}
108110

109-
func (h *jsonBasedHyperstart) ProcessAsyncEvents() (<-chan hyperstartapi.ProcessAsyncEvent, error) {
110-
return h.processAsyncEvents, nil
111-
}
112-
113111
func (h *jsonBasedHyperstart) LastStreamSeq() uint64 {
114112
return h.lastStreamSeq
115113
}
@@ -458,13 +456,22 @@ func handleStreamFromHyperstart(h *jsonBasedHyperstart, conn io.Reader) {
458456
}
459457
}
460458

459+
func makeExitStatus(status int) *int { return &status }
460+
461+
func (h *jsonBasedHyperstart) handleWaitProcess(pk pKey, ps *pState) {
462+
if ps.exitStatus != nil && ps.waitChan != nil && ps.outClosed {
463+
delete(h.procs, pk)
464+
ps.waitChan <- *ps.exitStatus
465+
}
466+
}
467+
461468
func (h *jsonBasedHyperstart) sendProcessAsyncEvent(pae hyperstartapi.ProcessAsyncEvent) {
462469
h.Lock()
463470
defer h.Unlock()
464471
pk := pKey{c: pae.Container, p: pae.Process}
465-
if _, ok := h.procs[pk]; ok {
466-
delete(h.procs, pk)
467-
h.processAsyncEvents <- pae
472+
if ps, ok := h.procs[pk]; ok {
473+
ps.exitStatus = makeExitStatus(pae.Status)
474+
h.handleWaitProcess(pk, ps)
468475
}
469476
}
470477

@@ -473,8 +480,8 @@ func (h *jsonBasedHyperstart) sendProcessAsyncEvent4242(stdioSeq uint64, code ui
473480
defer h.Unlock()
474481
for pk, ps := range h.procs {
475482
if ps.stdioSeq == stdioSeq {
476-
delete(h.procs, pk)
477-
h.processAsyncEvents <- hyperstartapi.ProcessAsyncEvent{Container: pk.c, Process: pk.p, Event: "finished", Status: int(code)}
483+
ps.exitStatus = makeExitStatus(int(code))
484+
h.handleWaitProcess(pk, ps)
478485
}
479486
}
480487
}
@@ -486,8 +493,17 @@ func (h *jsonBasedHyperstart) removeStreamOut(seq uint64) {
486493
// doesn't send eof of the stderr back, so we also remove stderr seq here
487494
if out, ok := h.streamOuts[seq]; ok {
488495
delete(h.streamOuts, seq)
489-
if seq == out.ps.stdioSeq && out.ps.stderrSeq > 0 {
490-
delete(h.streamOuts, out.ps.stderrSeq)
496+
if seq == out.ps.stdioSeq {
497+
if out.ps.stderrSeq > 0 {
498+
h.streamOuts[out.ps.stderrSeq].Close()
499+
delete(h.streamOuts, out.ps.stderrSeq)
500+
}
501+
for pk, ps := range h.procs {
502+
if ps.stdioSeq == seq {
503+
ps.outClosed = true
504+
h.handleWaitProcess(pk, ps)
505+
}
506+
}
491507
}
492508
}
493509
}
@@ -751,6 +767,26 @@ func (h *jsonBasedHyperstart) SignalProcess(container, process string, signal sy
751767
})
752768
}
753769

770+
// wait the process until exit. like waitpid()
771+
// the state is saved until someone calls WaitProcess() if the process exited earlier
772+
// the non-first call of WaitProcess() after process started MAY fail to find the process if the process exited earlier
773+
func (h *jsonBasedHyperstart) WaitProcess(container, process string) int {
774+
h.Lock()
775+
pk := pKey{c: container, p: process}
776+
if ps, ok := h.procs[pk]; ok {
777+
if ps.waitChan == nil {
778+
ps.waitChan = make(chan int, 1)
779+
h.handleWaitProcess(pk, ps)
780+
}
781+
h.Unlock()
782+
status := <-ps.waitChan
783+
ps.waitChan <- status
784+
return status
785+
}
786+
h.Unlock()
787+
return -1
788+
}
789+
754790
func (h *jsonBasedHyperstart) StartSandbox(pod *hyperstartapi.Pod) error {
755791
return h.hyperstartCommand(hyperstartapi.INIT_STARTPOD, pod)
756792
}

hypervisor/context.go

Lines changed: 0 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -236,31 +236,6 @@ func (ctx *VmContext) LookupBySession(session uint64) string {
236236
return ""
237237
}
238238

239-
func (ctx *VmContext) handleProcessAsyncEvent(pae *hyperstartapi.ProcessAsyncEvent) {
240-
if pae.Event != "finished" {
241-
return
242-
}
243-
if pae.Process == "init" {
244-
ctx.reportProcessFinished(types.E_CONTAINER_FINISHED, &types.ProcessFinished{
245-
Id: pae.Container, Code: uint8(pae.Status), Ack: make(chan bool, 1),
246-
})
247-
ctx.lock.Lock()
248-
if c, ok := ctx.containers[pae.Container]; ok {
249-
c.Log(TRACE, "container finished, unset iostream pipes")
250-
c.stdinPipe = nil
251-
c.stdoutPipe = nil
252-
c.stderrPipe = nil
253-
c.tty = nil
254-
}
255-
ctx.lock.Unlock()
256-
} else {
257-
ctx.DeleteExec(pae.Process)
258-
ctx.reportProcessFinished(types.E_EXEC_FINISHED, &types.ProcessFinished{
259-
Id: pae.Process, Code: uint8(pae.Status), Ack: make(chan bool, 1),
260-
})
261-
}
262-
}
263-
264239
func (ctx *VmContext) Close() {
265240
ctx.closeOnce.Do(func() {
266241
ctx.Log(INFO, "VmContext Close()")

hypervisor/hypervisor.go

Lines changed: 0 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -31,18 +31,6 @@ func (ctx *VmContext) loop() {
3131
ctx.Log(DEBUG, "main event loop exiting")
3232
}
3333

34-
func (ctx *VmContext) handlePAEs() {
35-
ch, err := ctx.hyperstart.ProcessAsyncEvents()
36-
if err == nil {
37-
for e := range ch {
38-
ctx.handleProcessAsyncEvent(&e)
39-
}
40-
}
41-
ctx.hyperstart.Close()
42-
ctx.Log(ERROR, "hyperstart stopped")
43-
ctx.Hub <- &Interrupted{Reason: "hyperstart stopped"}
44-
}
45-
4634
func (ctx *VmContext) watchHyperstart(sendReadyEvent bool) {
4735
timeout := time.AfterFunc(60*time.Second, func() {
4836
if ctx.PauseState == PauseStateUnpaused {
@@ -92,7 +80,6 @@ func (ctx *VmContext) Launch() {
9280
}
9381

9482
go ctx.loop()
95-
go ctx.handlePAEs()
9683
}
9784

9885
func VmAssociate(vmId string, hub chan VmEvent, client chan *types.VmResponse, pack []byte) (*VmContext, error) {
@@ -130,7 +117,6 @@ func VmAssociate(vmId string, hub chan VmEvent, client chan *types.VmResponse, p
130117
//}
131118

132119
go context.watchHyperstart(false)
133-
go context.handlePAEs()
134120
go context.loop()
135121
return context, nil
136122
}

hypervisor/vm.go

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -493,6 +493,13 @@ func (vm *Vm) AddProcess(container, execId string, terminal bool, args []string,
493493
}
494494

495495
go streamCopy(tty, stdinPipe, stdoutPipe, stderrPipe)
496+
go func() {
497+
status := vm.ctx.hyperstart.WaitProcess(container, execId)
498+
vm.ctx.DeleteExec(execId)
499+
vm.ctx.reportProcessFinished(types.E_EXEC_FINISHED, &types.ProcessFinished{
500+
Id: execId, Code: uint8(status), Ack: make(chan bool, 1),
501+
})
502+
}()
496503
return nil
497504
}
498505

hypervisor/vm_states.go

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ import (
99
"time"
1010

1111
"github.com/hyperhq/hypercontainer-utils/hlog"
12+
"github.com/hyperhq/runv/hypervisor/types"
1213
)
1314

1415
// states
@@ -44,6 +45,21 @@ func (ctx *VmContext) newContainer(id string) error {
4445
go streamCopy(c.tty, c.stdinPipe, c.stdoutPipe, c.stderrPipe)
4546
}
4647
ctx.Log(TRACE, "sent INIT_NEWCONTAINER")
48+
go func() {
49+
status := ctx.hyperstart.WaitProcess(id, "init")
50+
ctx.reportProcessFinished(types.E_CONTAINER_FINISHED, &types.ProcessFinished{
51+
Id: id, Code: uint8(status), Ack: make(chan bool, 1),
52+
})
53+
ctx.lock.Lock()
54+
if c, ok := ctx.containers[id]; ok {
55+
c.Log(TRACE, "container finished, unset iostream pipes")
56+
c.stdinPipe = nil
57+
c.stdoutPipe = nil
58+
c.stderrPipe = nil
59+
c.tty = nil
60+
}
61+
ctx.lock.Unlock()
62+
}()
4763
return err
4864
} else {
4965
return fmt.Errorf("container %s not exist", id)
@@ -72,6 +88,21 @@ func (ctx *VmContext) restoreContainer(id string) (alive bool, err error) {
7288
}
7389
return false, nil
7490
}
91+
go func() {
92+
status := ctx.hyperstart.WaitProcess(id, "init")
93+
ctx.reportProcessFinished(types.E_CONTAINER_FINISHED, &types.ProcessFinished{
94+
Id: id, Code: uint8(status), Ack: make(chan bool, 1),
95+
})
96+
ctx.lock.Lock()
97+
if c, ok := ctx.containers[id]; ok {
98+
c.Log(TRACE, "container finished, unset iostream pipes")
99+
c.stdinPipe = nil
100+
c.stdoutPipe = nil
101+
c.stderrPipe = nil
102+
c.tty = nil
103+
}
104+
ctx.lock.Unlock()
105+
}()
75106
return true, nil
76107
}
77108

0 commit comments

Comments
 (0)