Skip to content
This repository was archived by the owner on Feb 8, 2021. It is now read-only.

Commit 7ac7317

Browse files
committed
libhyperstart: add & use WaitProcess()
Signed-off-by: Lai Jiangshan <jiangshanlai@gmail.com>
1 parent 652ea59 commit 7ac7317

File tree

6 files changed

+92
-68
lines changed

6 files changed

+92
-68
lines changed

hyperstart/libhyperstart/hyperstart.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,11 +12,11 @@ type Hyperstart interface {
1212
LastStreamSeq() uint64
1313

1414
APIVersion() (uint32, error)
15-
ProcessAsyncEvents() (<-chan hyperstartapi.ProcessAsyncEvent, error)
1615
NewContainer(c *hyperstartapi.Container) (io.WriteCloser, io.ReadCloser, io.ReadCloser, error)
1716
RestoreContainer(c *hyperstartapi.Container) (io.WriteCloser, io.ReadCloser, io.ReadCloser, error)
1817
AddProcess(container string, p *hyperstartapi.Process) (io.WriteCloser, io.ReadCloser, io.ReadCloser, error)
1918
SignalProcess(container, process string, signal syscall.Signal) error
19+
WaitProcess(container, process string) int
2020
TtyWinResize(container, process string, row, col uint16) error
2121

2222
StartSandbox(pod *hyperstartapi.Pod) error

hyperstart/libhyperstart/json.go

Lines changed: 53 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -30,19 +30,20 @@ type pState struct {
3030
stdinPipe streamIn
3131
stdoutPipe io.ReadCloser
3232
stderrPipe io.ReadCloser
33+
exitStatus *int
34+
waitChan chan int
3335
}
3436

3537
type jsonBasedHyperstart struct {
3638
sync.RWMutex
37-
logPrefix string
38-
vmAPIVersion uint32
39-
closed bool
40-
lastStreamSeq uint64
41-
procs map[pKey]*pState
42-
streamOuts map[uint64]streamOut
43-
ctlChan chan *hyperstartCmd
44-
streamChan chan *hyperstartapi.TtyMessage
45-
processAsyncEvents chan hyperstartapi.ProcessAsyncEvent
39+
logPrefix string
40+
vmAPIVersion uint32
41+
closed bool
42+
lastStreamSeq uint64
43+
procs map[pKey]*pState
44+
streamOuts map[uint64]streamOut
45+
ctlChan chan *hyperstartCmd
46+
streamChan chan *hyperstartapi.TtyMessage
4647
}
4748

4849
type hyperstartCmd struct {
@@ -56,13 +57,12 @@ type hyperstartCmd struct {
5657

5758
func NewJsonBasedHyperstart(id, ctlSock, streamSock string, lastStreamSeq uint64, waitReady bool) Hyperstart {
5859
h := &jsonBasedHyperstart{
59-
logPrefix: fmt.Sprintf("SB[%s] ", id),
60-
procs: make(map[pKey]*pState),
61-
lastStreamSeq: lastStreamSeq,
62-
streamOuts: make(map[uint64]streamOut),
63-
ctlChan: make(chan *hyperstartCmd, 128),
64-
streamChan: make(chan *hyperstartapi.TtyMessage, 128),
65-
processAsyncEvents: make(chan hyperstartapi.ProcessAsyncEvent, 16),
60+
logPrefix: fmt.Sprintf("SB[%s] ", id),
61+
procs: make(map[pKey]*pState),
62+
lastStreamSeq: lastStreamSeq,
63+
streamOuts: make(map[uint64]streamOut),
64+
ctlChan: make(chan *hyperstartCmd, 128),
65+
streamChan: make(chan *hyperstartapi.TtyMessage, 128),
6666
}
6767
go handleStreamSock(h, streamSock)
6868
go handleCtlSock(h, ctlSock, waitReady)
@@ -86,8 +86,9 @@ func (h *jsonBasedHyperstart) Close() {
8686
defer h.Unlock()
8787
if !h.closed {
8888
h.Log(TRACE, "close jsonBasedHyperstart")
89-
for pk := range h.procs {
90-
h.processAsyncEvents <- hyperstartapi.ProcessAsyncEvent{Container: pk.c, Process: pk.p, Event: "finished", Status: 255}
89+
for pk, ps := range h.procs {
90+
ps.exitStatus = makeExitStatus(255)
91+
h.handleWaitProcess(pk, ps)
9192
}
9293
h.procs = make(map[pKey]*pState)
9394
for _, out := range h.streamOuts {
@@ -96,7 +97,6 @@ func (h *jsonBasedHyperstart) Close() {
9697
h.streamOuts = make(map[uint64]streamOut)
9798
close(h.ctlChan)
9899
close(h.streamChan)
99-
close(h.processAsyncEvents)
100100
for cmd := range h.ctlChan {
101101
if cmd.Code != hyperstartapi.INIT_ACK && cmd.Code != hyperstartapi.INIT_ERROR {
102102
cmd.result <- fmt.Errorf("hyperstart closed")
@@ -106,10 +106,6 @@ func (h *jsonBasedHyperstart) Close() {
106106
}
107107
}
108108

109-
func (h *jsonBasedHyperstart) ProcessAsyncEvents() (<-chan hyperstartapi.ProcessAsyncEvent, error) {
110-
return h.processAsyncEvents, nil
111-
}
112-
113109
func (h *jsonBasedHyperstart) LastStreamSeq() uint64 {
114110
return h.lastStreamSeq
115111
}
@@ -459,13 +455,22 @@ func handleStreamFromHyperstart(h *jsonBasedHyperstart, conn io.Reader) {
459455
}
460456
}
461457

458+
func makeExitStatus(status int) *int { return &status }
459+
460+
func (h *jsonBasedHyperstart) handleWaitProcess(pk pKey, ps *pState) {
461+
if ps.exitStatus != nil && ps.waitChan != nil {
462+
delete(h.procs, pk)
463+
ps.waitChan <- *ps.exitStatus
464+
}
465+
}
466+
462467
func (h *jsonBasedHyperstart) sendProcessAsyncEvent(pae hyperstartapi.ProcessAsyncEvent) {
463468
h.Lock()
464469
defer h.Unlock()
465470
pk := pKey{c: pae.Container, p: pae.Process}
466-
if _, ok := h.procs[pk]; ok {
467-
delete(h.procs, pk)
468-
h.processAsyncEvents <- pae
471+
if ps, ok := h.procs[pk]; ok {
472+
ps.exitStatus = makeExitStatus(pae.Status)
473+
h.handleWaitProcess(pk, ps)
469474
}
470475
}
471476

@@ -474,8 +479,8 @@ func (h *jsonBasedHyperstart) sendProcessAsyncEvent4242(stdioSeq uint64, code ui
474479
defer h.Unlock()
475480
for pk, ps := range h.procs {
476481
if ps.stdioSeq == stdioSeq {
477-
delete(h.procs, pk)
478-
h.processAsyncEvents <- hyperstartapi.ProcessAsyncEvent{Container: pk.c, Process: pk.p, Event: "finished", Status: int(code)}
482+
ps.exitStatus = makeExitStatus(int(code))
483+
h.handleWaitProcess(pk, ps)
479484
}
480485
}
481486
}
@@ -752,6 +757,26 @@ func (h *jsonBasedHyperstart) SignalProcess(container, process string, signal sy
752757
})
753758
}
754759

760+
// wait the process until exit. like waitpid()
761+
// the state is saved until someone calls WaitProcess() if the process exited earlier
762+
// the non-first call of WaitProcess() after process started MAY fail to find the process if the process exited earlier
763+
func (h *jsonBasedHyperstart) WaitProcess(container, process string) int {
764+
h.Lock()
765+
pk := pKey{c: container, p: process}
766+
if ps, ok := h.procs[pk]; ok {
767+
if ps.waitChan == nil {
768+
ps.waitChan = make(chan int, 1)
769+
h.handleWaitProcess(pk, ps)
770+
}
771+
h.Unlock()
772+
status := <-ps.waitChan
773+
ps.waitChan <- status
774+
return status
775+
}
776+
h.Unlock()
777+
return -1
778+
}
779+
755780
func (h *jsonBasedHyperstart) StartSandbox(pod *hyperstartapi.Pod) error {
756781
return h.hyperstartCommand(hyperstartapi.INIT_STARTPOD, pod)
757782
}

hypervisor/context.go

Lines changed: 0 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -236,31 +236,6 @@ func (ctx *VmContext) LookupBySession(session uint64) string {
236236
return ""
237237
}
238238

239-
func (ctx *VmContext) handleProcessAsyncEvent(pae *hyperstartapi.ProcessAsyncEvent) {
240-
if pae.Event != "finished" {
241-
return
242-
}
243-
if pae.Process == "init" {
244-
ctx.reportProcessFinished(types.E_CONTAINER_FINISHED, &types.ProcessFinished{
245-
Id: pae.Container, Code: uint8(pae.Status), Ack: make(chan bool, 1),
246-
})
247-
ctx.lock.Lock()
248-
if c, ok := ctx.containers[pae.Container]; ok {
249-
c.Log(TRACE, "container finished, unset iostream pipes")
250-
c.stdinPipe = nil
251-
c.stdoutPipe = nil
252-
c.stderrPipe = nil
253-
c.tty = nil
254-
}
255-
ctx.lock.Unlock()
256-
} else {
257-
ctx.DeleteExec(pae.Process)
258-
ctx.reportProcessFinished(types.E_EXEC_FINISHED, &types.ProcessFinished{
259-
Id: pae.Process, Code: uint8(pae.Status), Ack: make(chan bool, 1),
260-
})
261-
}
262-
}
263-
264239
func (ctx *VmContext) Close() {
265240
ctx.closeOnce.Do(func() {
266241
ctx.Log(INFO, "VmContext Close()")

hypervisor/hypervisor.go

Lines changed: 0 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -31,18 +31,6 @@ func (ctx *VmContext) loop() {
3131
ctx.Log(DEBUG, "main event loop exiting")
3232
}
3333

34-
func (ctx *VmContext) handlePAEs() {
35-
ch, err := ctx.hyperstart.ProcessAsyncEvents()
36-
if err == nil {
37-
for e := range ch {
38-
ctx.handleProcessAsyncEvent(&e)
39-
}
40-
}
41-
ctx.hyperstart.Close()
42-
ctx.Log(ERROR, "hyperstart stopped")
43-
ctx.Hub <- &Interrupted{Reason: "hyperstart stopped"}
44-
}
45-
4634
func (ctx *VmContext) watchHyperstart(sendReadyEvent bool) {
4735
timeout := time.AfterFunc(30*time.Second, func() {
4836
if ctx.PauseState == PauseStateUnpaused {
@@ -92,7 +80,6 @@ func (ctx *VmContext) Launch() {
9280
}
9381

9482
go ctx.loop()
95-
go ctx.handlePAEs()
9683
}
9784

9885
func VmAssociate(vmId string, hub chan VmEvent, client chan *types.VmResponse, pack []byte) (*VmContext, error) {
@@ -130,7 +117,6 @@ func VmAssociate(vmId string, hub chan VmEvent, client chan *types.VmResponse, p
130117
//}
131118

132119
go context.watchHyperstart(false)
133-
go context.handlePAEs()
134120
go context.loop()
135121
return context, nil
136122
}

hypervisor/vm.go

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -493,6 +493,13 @@ func (vm *Vm) AddProcess(container, execId string, terminal bool, args []string,
493493
}
494494

495495
go streamCopy(tty, stdinPipe, stdoutPipe, stderrPipe)
496+
go func() {
497+
status := vm.ctx.hyperstart.WaitProcess(container, execId)
498+
vm.ctx.DeleteExec(execId)
499+
vm.ctx.reportProcessFinished(types.E_EXEC_FINISHED, &types.ProcessFinished{
500+
Id: execId, Code: uint8(status), Ack: make(chan bool, 1),
501+
})
502+
}()
496503
return nil
497504
}
498505

hypervisor/vm_states.go

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ import (
99
"time"
1010

1111
"github.com/hyperhq/hypercontainer-utils/hlog"
12+
"github.com/hyperhq/runv/hypervisor/types"
1213
)
1314

1415
// states
@@ -44,6 +45,21 @@ func (ctx *VmContext) newContainer(id string) error {
4445
go streamCopy(c.tty, c.stdinPipe, c.stdoutPipe, c.stderrPipe)
4546
}
4647
ctx.Log(TRACE, "sent INIT_NEWCONTAINER")
48+
go func() {
49+
status := ctx.hyperstart.WaitProcess(id, "init")
50+
ctx.reportProcessFinished(types.E_CONTAINER_FINISHED, &types.ProcessFinished{
51+
Id: id, Code: uint8(status), Ack: make(chan bool, 1),
52+
})
53+
ctx.lock.Lock()
54+
if c, ok := ctx.containers[id]; ok {
55+
c.Log(TRACE, "container finished, unset iostream pipes")
56+
c.stdinPipe = nil
57+
c.stdoutPipe = nil
58+
c.stderrPipe = nil
59+
c.tty = nil
60+
}
61+
ctx.lock.Unlock()
62+
}()
4763
return err
4864
} else {
4965
return fmt.Errorf("container %s not exist", id)
@@ -72,6 +88,21 @@ func (ctx *VmContext) restoreContainer(id string) (alive bool, err error) {
7288
}
7389
return false, nil
7490
}
91+
go func() {
92+
status := ctx.hyperstart.WaitProcess(id, "init")
93+
ctx.reportProcessFinished(types.E_CONTAINER_FINISHED, &types.ProcessFinished{
94+
Id: id, Code: uint8(status), Ack: make(chan bool, 1),
95+
})
96+
ctx.lock.Lock()
97+
if c, ok := ctx.containers[id]; ok {
98+
c.Log(TRACE, "container finished, unset iostream pipes")
99+
c.stdinPipe = nil
100+
c.stdoutPipe = nil
101+
c.stderrPipe = nil
102+
c.tty = nil
103+
}
104+
ctx.lock.Unlock()
105+
}()
75106
return true, nil
76107
}
77108

0 commit comments

Comments
 (0)