This repository was archived by the owner on Feb 8, 2021. It is now read-only.

Commit 7f03f99

Ensure the VM is running when a VM operation is called
Part of the fix for hyperhq/hyperd#598 (pod creation rollback). Changes include:

- change the context lock from Mutex to RWMutex
- move ctx.current access inside the lock
- ensure VM operations check the running state

Signed-off-by: Wang Xu <gnawux@gmail.com>
1 parent b01db84 commit 7f03f99

File tree: 3 files changed, +76 -92 lines
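The core pattern the commit introduces is sketched below as a minimal, self-contained Go program. The names mirror the diff (IsRunning, StateRunning, a read/write lock guarding current), but miniContext and its fields are simplified stand-ins, not the real VmContext: read-only checks take the read lock, while mutating operations take the write lock and re-check the state before acting.

```go
package main

import (
	"fmt"
	"sync"
)

type vmState int

const (
	StateInit vmState = iota
	StateRunning
	StateTerminating
)

// miniContext is a simplified stand-in for VmContext: just the lock and the state.
type miniContext struct {
	lock    sync.RWMutex
	current vmState
}

// IsRunning mirrors the helper added in context.go: read-lock, compare, unlock.
func (ctx *miniContext) IsRunning() bool {
	ctx.lock.RLock()
	defer ctx.lock.RUnlock()
	return ctx.current == StateRunning
}

// addVolume mirrors the guarded write path: take the write lock and
// refuse the operation when the sandbox is not running.
func (ctx *miniContext) addVolume(name string) error {
	ctx.lock.Lock()
	defer ctx.lock.Unlock()

	if ctx.current != StateRunning {
		return fmt.Errorf("vm not running, cannot add volume %s", name)
	}
	// ... the real code would program the device here ...
	return nil
}

func main() {
	ctx := &miniContext{current: StateInit}
	fmt.Println(ctx.IsRunning())     // false
	fmt.Println(ctx.addVolume("v0")) // error: vm not running

	ctx.lock.Lock()
	ctx.current = StateRunning
	ctx.lock.Unlock()

	fmt.Println(ctx.IsRunning())     // true
	fmt.Println(ctx.addVolume("v0")) // <nil>
}
```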

hypervisor/context.go

Lines changed: 45 additions & 7 deletions
@@ -70,7 +70,7 @@ type VmContext struct {
 
 	logPrefix string
 
-	lock      sync.Mutex //protect update of context
+	lock      sync.RWMutex //protect update of context
 	idLock    sync.Mutex
 	pauseLock sync.Mutex
 	closeOnce sync.Once
@@ -157,8 +157,8 @@ func InitContext(id string, hub chan VmEvent, client chan *types.VmResponse, dc
 // no handler associated with the context. VmEvent handling happens in a
 // separate goroutine, so this is thread-safe and asynchronous.
 func (ctx *VmContext) SendVmEvent(ev VmEvent) error {
-	ctx.lock.Lock()
-	defer ctx.lock.Unlock()
+	ctx.lock.RLock()
+	defer ctx.lock.RUnlock()
 
 	if ctx.handler == nil {
 		return fmt.Errorf("VmContext(%s): event handler already shutdown.", ctx.Id)
@@ -202,8 +202,8 @@ func (ctx *VmContext) NextPciAddr() int {
 }
 
 func (ctx *VmContext) LookupExecBySession(session uint64) string {
-	ctx.lock.Lock()
-	defer ctx.lock.Unlock()
+	ctx.lock.RLock()
+	defer ctx.lock.RUnlock()
 
 	for id, exec := range ctx.vmExec {
 		if exec.Process.Stdio == session {
@@ -223,8 +223,8 @@ func (ctx *VmContext) DeleteExec(id string) {
 }
 
 func (ctx *VmContext) LookupBySession(session uint64) string {
-	ctx.lock.Lock()
-	defer ctx.lock.Unlock()
+	ctx.lock.RLock()
+	defer ctx.lock.RUnlock()
 
 	for id, c := range ctx.containers {
 		if c.process.Stdio == session {
@@ -281,6 +281,14 @@ func (ctx *VmContext) Become(handler stateHandler, desc string) {
 	ctx.Log(DEBUG, "state change from %s to '%s'", orig, desc)
 }
 
+func (ctx *VmContext) IsRunning() bool {
+	var running bool
+	ctx.lock.RLock()
+	running = ctx.current == StateRunning
+	ctx.lock.RUnlock()
+	return running
+}
+
 // User API
 func (ctx *VmContext) SetNetworkEnvironment(net *api.SandboxConfig) {
 	ctx.lock.Lock()
@@ -298,13 +306,23 @@ func (ctx *VmContext) AddInterface(inf *api.InterfaceDescription, result chan ap
 	ctx.lock.Lock()
 	defer ctx.lock.Unlock()
 
+	if ctx.current != StateRunning {
+		ctx.Log(DEBUG, "add interface %s during %v", inf.Id, ctx.current)
+		result <- NewNotReadyError(ctx.Id)
+	}
+
 	ctx.networks.addInterface(inf, result)
 }
 
 func (ctx *VmContext) RemoveInterface(id string, result chan api.Result) {
 	ctx.lock.Lock()
 	defer ctx.lock.Unlock()
 
+	if ctx.current != StateRunning {
+		ctx.Log(DEBUG, "remove interface %s during %v", id, ctx.current)
+		result <- api.NewResultBase(id, true, "pod not running")
+	}
+
 	ctx.networks.removeInterface(id, result)
}
 
@@ -328,6 +346,11 @@ func (ctx *VmContext) AddContainer(c *api.ContainerDescription, result chan api.
 	ctx.lock.Lock()
 	defer ctx.lock.Unlock()
 
+	if ctx.current != StateRunning {
+		ctx.Log(DEBUG, "add container %s during %v", c.Id, ctx.current)
+		result <- NewNotReadyError(ctx.Id)
+	}
+
 	if ctx.LogLevel(TRACE) {
 		ctx.Log(TRACE, "add container %#v", c)
 	}
@@ -393,6 +416,11 @@ func (ctx *VmContext) RemoveContainer(id string, result chan<- api.Result) {
 	ctx.lock.Lock()
 	defer ctx.lock.Unlock()
 
+	if ctx.current != StateRunning {
+		ctx.Log(DEBUG, "remove container %s during %v", id, ctx.current)
+		result <- api.NewResultBase(id, true, "pod not running")
+	}
+
 	cc, ok := ctx.containers[id]
 	if !ok {
 		ctx.Log(WARNING, "container %s not exist", id)
@@ -417,6 +445,11 @@ func (ctx *VmContext) AddVolume(vol *api.VolumeDescription, result chan api.Resu
 	ctx.lock.Lock()
 	defer ctx.lock.Unlock()
 
+	if ctx.current != StateRunning {
+		ctx.Log(DEBUG, "add volume %s during %v", vol.Name, ctx.current)
+		result <- NewNotReadyError(ctx.Id)
+	}
+
 	if _, ok := ctx.volumes[vol.Name]; ok {
 		estr := fmt.Sprintf("duplicate volume %s", vol.Name)
 		ctx.Log(WARNING, estr)
@@ -441,6 +474,11 @@ func (ctx *VmContext) RemoveVolume(name string, result chan<- api.Result) {
 	ctx.lock.Lock()
 	defer ctx.lock.Unlock()
 
+	if ctx.current != StateRunning {
+		ctx.Log(DEBUG, "remove container %s during %v", name, ctx.current)
+		result <- api.NewResultBase(name, true, "pod not running")
+	}
+
 	disk, ok := ctx.volumes[name]
 	if !ok {
 		ctx.Log(WARNING, "volume %s not exist", name)

hypervisor/vm.go

Lines changed: 15 additions & 85 deletions
@@ -143,7 +143,7 @@ func (vm *Vm) WaitResponse(match matchResponse, timeout int) chan error {
 }
 
 func (vm *Vm) ReleaseVm() error {
-	if vm.ctx.current != StateRunning {
+	if !vm.ctx.IsRunning() {
 		return nil
 	}
 
@@ -226,58 +226,6 @@ func (vm *Vm) WaitProcess(isContainer bool, ids []string, timeout int) <-chan *a
 	return result
 }
 
-//func (vm *Vm) handlePodEvent(mypod *PodStatus) {
-//	glog.V(1).Infof("hyperHandlePodEvent pod %s, vm %s", mypod.Id, vm.Id)
-//
-//	Status, err := vm.GetResponseChan()
-//	if err != nil {
-//		return
-//	}
-//	defer vm.ReleaseResponseChan(Status)
-//
-//	exit := false
-//	mypod.Wg.Add(1)
-//	for {
-//		Response, ok := <-Status
-//		if !ok {
-//			break
-//		}
-//
-//		switch Response.Code {
-//		case types.E_CONTAINER_FINISHED:
-//			ps, ok := Response.Data.(*types.ProcessFinished)
-//			if ok {
-//				mypod.SetOneContainerStatus(ps.Id, ps.Code)
-//				close(ps.Ack)
-//			}
-//		case types.E_EXEC_FINISHED:
-//			ps, ok := Response.Data.(*types.ProcessFinished)
-//			if ok {
-//				mypod.SetExecStatus(ps.Id, ps.Code)
-//				close(ps.Ack)
-//			}
-//		case types.E_VM_SHUTDOWN: // vm exited, sucessful or not
-//			if mypod.Status == types.S_POD_RUNNING { // not received finished pod before
-//				mypod.Status = types.S_POD_FAILED
-//				mypod.FinishedAt = time.Now().Format("2006-01-02T15:04:05Z")
-//				mypod.SetContainerStatus(types.S_POD_FAILED)
-//			}
-//			mypod.Vm = ""
-//			exit = true
-//		}
-//
-//		if mypod.Handler != nil {
-//			mypod.Handler.Handle(Response, mypod.Handler.Data, mypod, vm)
-//		}
-//
-//		if exit {
-//			vm.clients = nil
-//			break
-//		}
-//	}
-//	mypod.Wg.Done()
-//}
-
 func (vm *Vm) InitSandbox(config *api.SandboxConfig) {
 	vm.ctx.SetNetworkEnvironment(config)
 	vm.ctx.startPod()
@@ -299,7 +247,7 @@ func (vm *Vm) WaitInit() api.Result {
 }
 
 func (vm *Vm) Shutdown() api.Result {
-	if vm.ctx.current != StateRunning {
+	if !vm.ctx.IsRunning() {
 		return api.NewResultBase(vm.Id, false, "not in running state")
 	}
 
@@ -352,10 +300,6 @@ func (vm *Vm) AddRoute() error {
 }
 
 func (vm *Vm) AddNic(info *api.InterfaceDescription) error {
-	if vm.ctx.current != StateRunning {
-		return NewNotReadyError(vm.Id)
-	}
-
 	client := make(chan api.Result, 1)
 	vm.ctx.AddInterface(info, client)
 
@@ -375,10 +319,6 @@ func (vm *Vm) AddNic(info *api.InterfaceDescription) error {
 }
 
 func (vm *Vm) DeleteNic(id string) error {
-	if vm.ctx.current != StateRunning {
-		return NewNotReadyError(vm.Id)
-	}
-
 	client := make(chan api.Result, 1)
 	vm.ctx.RemoveInterface(id, client)
 
@@ -403,7 +343,7 @@ func (vm *Vm) SetCpus(cpus int) error {
 		return nil
 	}
 
-	if vm.ctx.current != StateRunning {
+	if !vm.ctx.IsRunning() {
 		return NewNotReadyError(vm.Id)
 	}
 
@@ -420,7 +360,7 @@ func (vm *Vm) AddMem(totalMem int) error {
 	}
 
 	size := totalMem - vm.Mem
-	if vm.ctx.current != StateRunning {
+	if !vm.ctx.IsRunning() {
 		return NewNotReadyError(vm.Id)
 	}
 
@@ -525,6 +465,10 @@ func (vm *Vm) Exec(container, execId, cmd string, terminal bool, tty *TtyIO) err
 }
 
 func (vm *Vm) AddProcess(container, execId string, terminal bool, args []string, env []string, workdir string, tty *TtyIO) error {
+	if !vm.ctx.IsRunning() {
+		return NewNotReadyError(vm.Id)
+	}
+
 	envs := []hyperstartapi.EnvironmentVar{}
 
 	for _, v := range env {
@@ -553,21 +497,12 @@ func (vm *Vm) AddProcess(container, execId string, terminal bool, args []string,
 }
 
 func (vm *Vm) AddVolume(vol *api.VolumeDescription) api.Result {
-	if vm.ctx.current != StateRunning {
-		vm.Log(ERROR, "VM is not ready for insert volume %#v", vol)
-		return NewNotReadyError(vm.Id)
-	}
-
 	result := make(chan api.Result, 1)
 	vm.ctx.AddVolume(vol, result)
 	return <-result
 }
 
 func (vm *Vm) AddContainer(c *api.ContainerDescription) api.Result {
-	if vm.ctx.current != StateRunning {
-		return NewNotReadyError(vm.Id)
-	}
-
 	result := make(chan api.Result, 1)
 	vm.ctx.AddContainer(c, result)
 	return <-result
@@ -631,10 +566,6 @@ func (vm *Vm) batchWaitResult(names []string, op waitResultOp) (bool, map[string
 }
 
 func (vm *Vm) StartContainer(id string) error {
-	if vm.ctx.current != StateRunning {
-		return NewNotReadyError(vm.Id)
-	}
-
 	err := vm.ctx.newContainer(id)
 	if err != nil {
 		return fmt.Errorf("Create new container failed: %v", err)
@@ -652,10 +583,6 @@ func (vm *Vm) Tty(containerId, execId string, row, column int) error {
 }
 
 func (vm *Vm) Attach(tty *TtyIO, container string) error {
-	if vm.ctx.current != StateRunning {
-		return NewNotReadyError(vm.Id)
-	}
-
 	cmd := &AttachCommand{
 		Streams:   tty,
 		Container: container,
@@ -666,7 +593,7 @@ func (vm *Vm) Attach(tty *TtyIO, container string) error {
 func (vm *Vm) Stats() *types.PodStats {
 	ctx := vm.ctx
 
-	if ctx.current != StateRunning {
+	if !vm.ctx.IsRunning() {
 		vm.ctx.Log(WARNING, "could not get stats from non-running pod")
 		return nil
 	}
@@ -681,7 +608,7 @@ func (vm *Vm) Stats() *types.PodStats {
 
 func (vm *Vm) Pause(pause bool) error {
 	ctx := vm.ctx
-	if ctx.current != StateRunning {
+	if !vm.ctx.IsRunning() {
 		return NewNotReadyError(vm.Id)
 	}
 
@@ -713,10 +640,13 @@ func (vm *Vm) Pause(pause bool) error {
 
 func (vm *Vm) Save(path string) error {
 	ctx := vm.ctx
+	if !vm.ctx.IsRunning() {
+		return NewNotReadyError(vm.Id)
+	}
 
 	ctx.pauseLock.Lock()
 	defer ctx.pauseLock.Unlock()
-	if ctx.current != StateRunning || ctx.PauseState != PauseStatePaused {
+	if ctx.PauseState != PauseStatePaused {
 		return NewNotReadyError(vm.Id)
 	}
 
@@ -726,7 +656,7 @@ func (vm *Vm) Save(path string) error {
 func (vm *Vm) GetIPAddrs() []string {
 	ips := []string{}
 
-	if vm.ctx.current != StateRunning {
+	if !vm.ctx.IsRunning() {
 		vm.Log(ERROR, "get pod ip failed: %v", NewNotReadyError(vm.Id))
 		return ips
 	}
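With the helper in place, the wrappers above either call IsRunning() or drop their local check entirely and rely on the guarded context method. The motivation for "move ctx.current access inside lock" is that the old unguarded reads of ctx.current raced with concurrent state transitions made under the context lock. A tiny illustrative sketch (not project code) that the Go race detector will typically flag when run with go run -race:

```go
package main

import (
	"sync"
	"time"
)

type ctxT struct {
	lock    sync.RWMutex
	current int
}

func main() {
	c := &ctxT{}

	// A state transition taking the write lock, as the context does on state change.
	go func() {
		c.lock.Lock()
		c.current = 1
		c.lock.Unlock()
	}()

	// Old style: reading current without the lock races with the writer above.
	_ = c.current == 1

	// New style: the read-locked check has a happens-before edge with the writer.
	c.lock.RLock()
	_ = c.current == 1
	c.lock.RUnlock()

	time.Sleep(10 * time.Millisecond) // let the goroutine finish before exit
}
```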

hypervisor/vm_states.go

Lines changed: 16 additions & 0 deletions
@@ -30,6 +30,11 @@ func (ctx *VmContext) newContainer(id string) error {
 	ctx.lock.Lock()
 	defer ctx.lock.Unlock()
 
+	if ctx.current != StateRunning {
+		ctx.Log(DEBUG, "start container %s during %v", id, ctx.current)
+		return NewNotReadyError(ctx.Id)
+	}
+
 	c, ok := ctx.containers[id]
 	if ok {
 		ctx.Log(TRACE, "start sending INIT_NEWCONTAINER")
@@ -48,6 +53,12 @@ func (ctx *VmContext) newContainer(id string) error {
 func (ctx *VmContext) restoreContainer(id string) (alive bool, err error) {
 	ctx.lock.Lock()
 	defer ctx.lock.Unlock()
+
+	if ctx.current != StateRunning {
+		ctx.Log(DEBUG, "start container %s during %v", id, ctx.current)
+		return false, NewNotReadyError(ctx.Id)
+	}
+
 	c, ok := ctx.containers[id]
 	if !ok {
 		return false, fmt.Errorf("try to associate a container not exist in sandbox")
@@ -77,6 +88,11 @@ func (ctx *VmContext) attachCmd(cmd *AttachCommand) error {
 	ctx.lock.Lock()
 	defer ctx.lock.Unlock()
 
+	if ctx.current != StateRunning {
+		ctx.Log(DEBUG, "attach container %s during %v", cmd.Container, ctx.current)
+		return NewNotReadyError(ctx.Id)
+	}
+
 	c, ok := ctx.containers[cmd.Container]
 	if !ok {
 		estr := fmt.Sprintf("cannot find container %s to attach", cmd.Container)

0 commit comments
