Skip to content
This repository was archived by the owner on Feb 8, 2021. It is now read-only.

Commit cfa4183

Browse files
committed
replace runv with kata-containers
Signed-off-by: Fupan Li <lifupan@gmail.com>
1 parent 78de507 commit cfa4183

File tree

17 files changed

+1061
-494
lines changed

17 files changed

+1061
-494
lines changed

daemon/pod/container.go

Lines changed: 225 additions & 115 deletions
Large diffs are not rendered by default.

daemon/pod/decommission.go

Lines changed: 33 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -10,10 +10,11 @@ import (
1010
dockertypes "github.com/docker/engine-api/types"
1111

1212
"github.com/hyperhq/hyperd/utils"
13-
"github.com/hyperhq/runv/hypervisor"
13+
vc "github.com/kata-containers/runtime/virtcontainers"
14+
// "github.com/hyperhq/runv/hypervisor"
1415
)
1516

16-
type sandboxOp func(sb *hypervisor.Vm) error
17+
type sandboxOp func(sb *vc.Sandbox) error
1718
type stateValidator func(state PodState) bool
1819

1920
func (p *XPod) DelayDeleteOn() bool {
@@ -37,9 +38,9 @@ func (p *XPod) Stop(graceful int) error {
3738

3839
func (p *XPod) ForceQuit() {
3940
err := p.protectedSandboxOperation(
40-
func(sb *hypervisor.Vm) error {
41-
sb.Kill()
42-
return nil
41+
func(sb *vc.Sandbox) error {
42+
_, err := vc.StopSandbox(sb.ID())
43+
return err
4344
},
4445
time.Second*5,
4546
"kill pod")
@@ -118,8 +119,8 @@ func (p *XPod) Pause() error {
118119
p.statusLock.Unlock()
119120

120121
err := p.protectedSandboxOperation(
121-
func(sb *hypervisor.Vm) error {
122-
return sb.Pause(true)
122+
func(sb *vc.Sandbox) error {
123+
return sb.Pause()
123124
},
124125
time.Second*5,
125126
"pause pod")
@@ -148,8 +149,8 @@ func (p *XPod) UnPause() error {
148149
p.statusLock.Unlock()
149150

150151
err := p.protectedSandboxOperation(
151-
func(sb *hypervisor.Vm) error {
152-
return sb.Pause(false)
152+
func(sb *vc.Sandbox) error {
153+
return sb.Pause()
153154
},
154155
time.Second*5,
155156
"resume pod")
@@ -176,8 +177,9 @@ func (p *XPod) KillContainer(id string, sig int64) error {
176177
}
177178
c.setKill()
178179
return p.protectedSandboxOperation(
179-
func(sb *hypervisor.Vm) error {
180-
return sb.KillContainer(id, syscall.Signal(sig))
180+
func(sb *vc.Sandbox) error {
181+
// return sb.KillContainer(id, syscall.Signal(sig))
182+
return vc.KillContainer(sb.ID(), id, syscall.Signal(sig), true)
181183
},
182184
time.Second*5,
183185
fmt.Sprintf("Kill container %s with %d", id, sig))
@@ -307,7 +309,7 @@ func (p *XPod) RemoveContainer(id string) error {
307309
// protectedSandboxOperation() protect the hypervisor operations, which may
308310
// panic or hang too long time.
309311
func (p *XPod) protectedSandboxOperation(op sandboxOp, timeout time.Duration, comment string) error {
310-
dangerousOp := func(sb *hypervisor.Vm, errChan chan<- error) {
312+
dangerousOp := func(sb *vc.Sandbox, errChan chan<- error) {
311313
defer func() {
312314
err := recover()
313315
if err != nil {
@@ -393,13 +395,13 @@ func (p *XPod) doStopPod(graceful int) error {
393395
}
394396

395397
p.Log(INFO, "stop container success, shutdown sandbox")
396-
result := p.sandbox.Shutdown()
397-
if result.IsSuccess() {
398+
_, err = vc.StopSandbox(p.sandbox.ID())
399+
if err == nil {
398400
p.Log(INFO, "pod is stopped")
399401
return nil
400402
}
401403

402-
err = fmt.Errorf("failed to shuting down: %s", result.Message())
404+
err = fmt.Errorf("failed to shuting down: %s", err)
403405
p.Log(ERROR, err)
404406
return err
405407
}
@@ -448,13 +450,22 @@ func (p *XPod) stopContainers(cList []string, graceful int) error {
448450
}
449451
future.Add(c.Id(), func() error {
450452
var toc <-chan time.Time
453+
var retch = make(chan int32)
454+
451455
if int64(graceful) < 0 {
452456
toc = make(chan time.Time)
453457
} else {
454458
toc = time.After(waitTime)
455459
}
460+
456461
forceKill := graceful == 0
457-
resChan := p.sandbox.WaitProcess(true, []string{c.Id()}, -1)
462+
//
463+
//
464+
go func(retch chan int32, c *Container) {
465+
ret, _ := p.sandbox.WaitProcess(c.Id(), c.Id())
466+
retch <- ret
467+
}(retch, c)
468+
458469
c.Log(DEBUG, "now, stop container")
459470
err := c.terminate(forceKill)
460471
// TODO filter container/process can't find error
@@ -464,20 +475,11 @@ func (p *XPod) stopContainers(cList []string, graceful int) error {
464475
return err
465476
}
466477
}
467-
if resChan == nil {
468-
err := fmt.Errorf("cannot wait container %s", c.Id())
469-
p.Log(ERROR, err)
470-
return err
471-
}
478+
472479
for {
473480
select {
474-
case ex, ok := <-resChan:
475-
if !ok {
476-
err := fmt.Errorf("chan broken while waiting container: %s", c.Id())
477-
p.Log(WARNING, err)
478-
return err
479-
}
480-
p.Log(DEBUG, "container %s stopped (%v)", ex.Id, ex.Code)
481+
case ret := <-retch:
482+
p.Log(DEBUG, "container %s stopped (%d)", c.Id(), ret)
481483
return nil
482484
case <-toc:
483485
if forceKill {
@@ -493,6 +495,7 @@ func (p *XPod) stopContainers(cList []string, graceful int) error {
493495
}
494496
}
495497
return nil
498+
496499
})
497500
}
498501

@@ -532,7 +535,8 @@ func (p *XPod) waitVMStop() {
532535
}
533536
p.statusLock.RUnlock()
534537

535-
_, _ = <-p.sandbox.WaitVm(-1)
538+
monitor, _ := p.sandbox.Monitor()
539+
_ = <-monitor
536540
p.Log(INFO, "got vm exit event")
537541
p.cleanup()
538542
}

daemon/pod/exec.go

Lines changed: 56 additions & 48 deletions
Original file line numberDiff line numberDiff line change
@@ -8,11 +8,9 @@ import (
88
"time"
99

1010
"github.com/docker/docker/pkg/stdcopy"
11-
1211
"github.com/hyperhq/hypercontainer-utils/hlog"
1312
"github.com/hyperhq/hyperd/utils"
14-
"github.com/hyperhq/runv/api"
15-
"github.com/hyperhq/runv/hypervisor"
13+
vc "github.com/kata-containers/runtime/virtcontainers"
1614
)
1715

1816
type Exec struct {
@@ -57,7 +55,7 @@ func (p *XPod) CreateExec(containerId, cmds string, terminal bool) (string, erro
5755
p.statusLock.Lock()
5856
p.execs[execId] = &Exec{
5957
Container: containerId,
60-
Id: execId,
58+
Id: "",
6159
Cmds: command,
6260
Terminal: terminal,
6361
ExitCode: 255,
@@ -85,6 +83,7 @@ type writeCloser struct {
8583
}
8684

8785
func (p *XPod) StartExec(stdin io.ReadCloser, stdout io.WriteCloser, containerId, execId string) error {
86+
8887
c, ok := p.containers[containerId]
8988
if !ok {
9089
err := fmt.Errorf("no container %s available for exec %s", containerId, execId)
@@ -103,7 +102,7 @@ func (p *XPod) StartExec(stdin io.ReadCloser, stdout io.WriteCloser, containerId
103102
}
104103

105104
wReader := &waitClose{ReadCloser: stdin, wait: make(chan bool)}
106-
tty := &hypervisor.TtyIO{
105+
tty := &TtyIO{
107106
Stdin: wReader,
108107
Stdout: stdout,
109108
}
@@ -124,21 +123,39 @@ func (p *XPod) StartExec(stdin io.ReadCloser, stdout io.WriteCloser, containerId
124123
}
125124
}
126125

126+
cmd := vc.Cmd{
127+
Args: es.Cmds,
128+
Envs: c.cmdEnvs([]vc.EnvVar{}),
129+
WorkDir: c.spec.Workdir,
130+
Interactive: es.Terminal,
131+
Detach: !es.Terminal,
132+
User: "0", //set the default user and group
133+
PrimaryGroup: "0",
134+
}
135+
136+
if c.spec.User != nil {
137+
cmd.User = c.spec.User.Name
138+
cmd.PrimaryGroup = c.spec.User.Group
139+
}
140+
141+
_, process, err := p.sandbox.EnterContainer(containerId, cmd)
142+
if err != nil {
143+
err := fmt.Errorf("cannot enter container %s, with err %s", containerId, err)
144+
p.Log(ERROR, err)
145+
return err
146+
}
147+
es.Id = process.Token
148+
127149
go func(es *Exec) {
128-
result := p.sandbox.WaitProcess(false, []string{execId}, -1)
129-
if result == nil {
150+
ret, err := p.sandbox.WaitProcess(containerId, es.Id)
151+
if err == nil {
130152
es.Log(ERROR, "can not wait exec")
131153
return
132154
}
133155

134-
r, ok := <-result
135-
if !ok {
136-
es.Log(ERROR, "waiting exec interrupted")
137-
return
138-
}
156+
es.Log(DEBUG, "exec terminated at %v with code %d", time.Now(), int(ret))
157+
es.ExitCode = uint8(ret)
139158

140-
es.Log(DEBUG, "exec terminated at %v with code %d", r.FinishedAt, r.Code)
141-
es.ExitCode = uint8(r.Code)
142159
select {
143160
case es.finChan <- true:
144161
es.Log(DEBUG, "wake exec stopped chan")
@@ -147,30 +164,17 @@ func (p *XPod) StartExec(stdin io.ReadCloser, stdout io.WriteCloser, containerId
147164
}
148165
}(es)
149166

150-
var envs []string
151-
for e, v := range c.descript.Envs {
152-
envs = append(envs, fmt.Sprintf("%s=%s", e, v))
153-
}
154-
155-
process := &api.Process{
156-
Container: es.Container,
157-
Id: es.Id,
158-
Terminal: es.Terminal,
159-
Args: es.Cmds,
160-
Envs: envs,
161-
Workdir: c.descript.Workdir,
162-
}
163-
164-
if c.descript.UGI != nil {
165-
process.User = c.descript.UGI.User
166-
process.Group = c.descript.UGI.Group
167-
process.AdditionalGroup = c.descript.UGI.AdditionalGroups
167+
cstdin, cstdout, cstderr, err := p.sandbox.IOStream(containerId, es.Id)
168+
if err != nil {
169+
c.Log(ERROR, err)
170+
return err
168171
}
169172

170-
err := p.sandbox.AddProcess(process, tty)
173+
go streamCopy(tty, cstdin, cstdout, cstderr)
171174

172175
<-wReader.wait
173-
return err
176+
177+
return nil
174178
}
175179

176180
func (p *XPod) GetExecExitCode(containerId, execId string) (uint8, error) {
@@ -208,8 +212,8 @@ func (p *XPod) KillExec(execId string, sig int64) error {
208212
}
209213

210214
return p.protectedSandboxOperation(
211-
func(sb *hypervisor.Vm) error {
212-
return sb.SignalProcess(es.Container, es.Id, syscall.Signal(sig))
215+
func(sb *vc.Sandbox) error {
216+
return sb.SignalProcess(es.Container, es.Id, syscall.Signal(sig), true)
213217
},
214218
time.Second*5,
215219
fmt.Sprintf("Kill process %s with %d", es.Id, sig))
@@ -228,16 +232,20 @@ func (p *XPod) CleanupExecs() {
228232
}
229233

230234
func (p *XPod) ExecVM(cmd string, stdin io.ReadCloser, stdout, stderr io.WriteCloser) (int, error) {
231-
wReader := &waitClose{ReadCloser: stdin, wait: make(chan bool)}
232-
tty := &hypervisor.TtyIO{
233-
Stdin: wReader,
234-
Stdout: stdout,
235-
Stderr: stderr,
236-
}
237-
res, err := p.sandbox.HyperstartExec(cmd, tty)
238-
if err != nil {
239-
return res, err
240-
}
241-
<-wReader.wait
242-
return res, err
235+
/*
236+
wReader := &waitClose{ReadCloser: stdin, wait: make(chan bool)}
237+
tty := &hypervisor.TtyIO{
238+
Stdin: wReader,
239+
Stdout: stdout,
240+
Stderr: stderr,
241+
}
242+
243+
res, err := p.sandbox.HyperstartExec(cmd, tty)
244+
if err != nil {
245+
return res, err
246+
}
247+
<-wReader.wait
248+
*/
249+
// return res, err
250+
return 0, nil
243251
}

daemon/pod/networks.go

Lines changed: 8 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -76,11 +76,14 @@ func (inf *Interface) add() error {
7676
inf.Log(ERROR, err)
7777
return err
7878
}
79-
err := inf.p.sandbox.AddNic(inf.descript)
80-
if err != nil {
81-
inf.Log(ERROR, "failed to add NIC: %v", err)
82-
}
83-
return err
79+
/*
80+
err := inf.p.sandbox.AddNic(inf.descript)
81+
if err != nil {
82+
inf.Log(ERROR, "failed to add NIC: %v", err)
83+
}
84+
return err
85+
*/
86+
return nil
8487
}
8588

8689
func (inf *Interface) cleanup() error {

0 commit comments

Comments
 (0)