@@ -2,6 +2,7 @@ package main
22
33import (
44 "bytes"
5+ "context"
56 "encoding/json"
67 "errors"
78 "io"
@@ -13,7 +14,10 @@ import (
1314 "time"
1415
1516 log "github.com/Sirupsen/logrus"
16- "github.com/fsouza/go-dockerclient"
17+ "github.com/docker/docker/api/types"
18+ "github.com/docker/docker/api/types/events"
19+ "github.com/docker/docker/api/types/filters"
20+ "github.com/docker/docker/client"
1721)
1822
1923// TCPMessage defines what a message that can be
@@ -27,14 +31,10 @@ type TCPMessage struct {
2731}
2832
2933// StatsOptionsEntry is used to collect stats from
30- //mthe Docker daemon
34+ // the Docker daemon
3135type StatsOptionsEntry struct {
32- statsOptions docker.StatsOptions
33- // statsOptions can only store the one-way channel
34- // that's why that extra field is required
35- statsChan chan * docker.Stats
36- // same comment as for statsChan
37- doneChan chan bool
36+ statsChan chan * types.StatsJSON
37+ doneChan chan bool
3838}
3939
4040// ContainerEvent is one kind of Data that can
@@ -54,7 +54,7 @@ type ContainerEvent struct {
5454// Daemon maintains state when the dockercraft daemon is running
5555type Daemon struct {
5656 // Client is an instance of the DockerClient
57- Client * docker .Client
57+ Client * client .Client
5858 // Version is the version of the Docker Daemon
5959 Version string
6060 // BinaryName is the name of the Docker Binary
@@ -63,10 +63,6 @@ type Daemon struct {
6363 // docker daemon through the docker remote api
6464 previousCPUStats map [string ]* CPUStats
6565
66- // containerEvents is a global variable channel
67- // that receives all container events
68- containerEvents chan * docker.APIEvents
69-
7066 // tcpMessages can be used to send bytes to the Lua
7167 // plugin from any go routine.
7268 tcpMessages chan []byte
@@ -94,21 +90,19 @@ type CPUStats struct {
9490// Init initializes a Daemon
9591func (d * Daemon ) Init () error {
9692 var err error
97- d .Client , err = docker . NewClient ( "unix:///var/run/docker.sock" )
93+ d .Client , err = client . NewEnvClient ( )
9894 if err != nil {
9995 return err
10096 }
10197
102- // get the version of the docker remote API
103- env , err := d .Client .Version ( )
98+ // get the version of the remote docker
99+ info , err := d .Client .Info ( context . Background () )
104100 if err != nil {
105- return err
101+ log . Fatal ( err . Error ())
106102 }
107- d .Version = env .Get ("Version" )
108-
103+ d .Version = info .ServerVersion
109104 d .statsOptionsStore = make (map [string ]StatsOptionsEntry )
110105 d .tcpMessages = make (chan []byte )
111- d .containerEvents = make (chan * docker.APIEvents )
112106
113107 return nil
114108}
@@ -138,16 +132,25 @@ func (d *Daemon) Serve() {
138132// Docker daemon and uses callback to transmit them
139133// to LUA scripts.
140134func (d * Daemon ) StartMonitoringEvents () {
141- go func () {
142- err := d .Client .AddEventListener (d .containerEvents )
143- if err != nil {
144- log .Fatal (err )
145- }
146- for {
147- event := <- d .containerEvents
135+ log .Info ("Monitoring Docker Events" )
136+ filters := filters .NewArgs ()
137+ filters .Add ("type" , events .ContainerEventType )
138+ opts := types.EventsOptions {
139+ Filters : filters ,
140+ }
141+
142+ //context.TODO, cancel := context.WithCancel(d.context.TODO)
143+ //defer cancel()
144+ events , errs := d .Client .Events (context .Background (), opts )
145+ for {
146+ select {
147+ case event := <- events :
148+ log .Info ("New Event Received" )
148149 d .eventCallback (event )
150+ case err := <- errs :
151+ log .Fatal (err .Error ())
149152 }
150- }()
153+ }
151154}
152155
153156// handleConn handles a TCP connection
@@ -241,7 +244,7 @@ func (d *Daemon) handleMessage(message []byte) {
241244}
242245
243246// eventCallback receives and handles the docker events
244- func (d * Daemon ) eventCallback (event * docker. APIEvents ) {
247+ func (d * Daemon ) eventCallback (event events. Message ) {
245248
246249 containerEvent , err := d .apiEventToContainerEvent (event )
247250 if err != nil {
@@ -252,7 +255,7 @@ func (d *Daemon) eventCallback(event *docker.APIEvents) {
252255 switch event .Status {
253256
254257 case "create" :
255-
258+ log . Infof ( "Container Create Event received for %s" , containerEvent . ID )
256259 containerEvent .Action = "createContainer"
257260
258261 data , err := containerEventToTCPMsg (containerEvent )
@@ -262,9 +265,9 @@ func (d *Daemon) eventCallback(event *docker.APIEvents) {
262265 }
263266
264267 d .tcpMessages <- append (data , '\n' )
265-
268+ log . Info ( "DONE" )
266269 case "start" :
267-
270+ log . Infof ( "Container Start Event received for %s" , containerEvent . ID )
268271 containerEvent .Action = "startContainer"
269272
270273 data , err := containerEventToTCPMsg (containerEvent )
@@ -276,21 +279,10 @@ func (d *Daemon) eventCallback(event *docker.APIEvents) {
276279 d .tcpMessages <- append (data , '\n' )
277280
278281 d .startStatsMonitoring (containerEvent .ID )
279-
280- case "stop" :
281- // die event is enough
282- // http://docs.docker.com/reference/api/docker_remote_api/#docker-events
283-
284- case "restart" :
285- // start event is enough
286- // http://docs.docker.com/reference/api/docker_remote_api/#docker-events
287-
288- case "kill" :
289- // die event is enough
290- // http://docs.docker.com/reference/api/docker_remote_api/#docker-events
282+ log .Info ("DONE" )
291283
292284 case "die" :
293-
285+ log . Infof ( "Container Die Event received for %s" , containerEvent . ID )
294286 containerEvent .Action = "stopContainer"
295287
296288 data , err := containerEventToTCPMsg (containerEvent )
@@ -302,18 +294,22 @@ func (d *Daemon) eventCallback(event *docker.APIEvents) {
302294 d .tcpMessages <- append (data , '\n' )
303295
304296 d .Lock ()
297+ log .Info ("Removing Container" )
305298 statsOptionsEntry , found := d .statsOptionsStore [containerEvent .ID ]
306299 if found {
300+ log .Info ("Sending Done on channel" )
307301 close (statsOptionsEntry .doneChan )
302+ log .Info ("Deleting the entry from the list" )
308303 delete (d .statsOptionsStore , containerEvent .ID )
309304 }
310305 d .Unlock ()
311306
312307 // enforce 0% display (Cpu & Ram)
313308 d .statCallback (containerEvent .ID , nil )
309+ log .Info ("DONE" )
314310
315311 case "destroy" :
316-
312+ log . Infof ( "Container Destroy Event received for %s" , containerEvent . ID )
317313 containerEvent .Action = "destroyContainer"
318314
319315 data , err := containerEventToTCPMsg (containerEvent )
@@ -322,12 +318,17 @@ func (d *Daemon) eventCallback(event *docker.APIEvents) {
322318 return
323319 }
324320 d .tcpMessages <- append (data , '\n' )
321+ log .Info ("DONE" )
322+
323+ default :
324+ // Ignoring
325+ log .Debug ("Ignoring event: %s" , event .Status )
325326 }
326327}
327328
328329// statCallback receives the stats (cpu & ram) from containers and send them to
329330// the cuberite server
330- func (d * Daemon ) statCallback (id string , stats * docker. Stats , args ... interface {}) {
331+ func (d * Daemon ) statCallback (id string , stats * types. StatsJSON , args ... interface {}) {
331332 containerEvent := ContainerEvent {}
332333 containerEvent .ID = id
333334 containerEvent .Action = "stats"
@@ -339,7 +340,7 @@ func (d *Daemon) statCallback(id string, stats *docker.Stats, args ...interface{
339340 cpuPercent = calculateCPUPercent (preCPUStats , & stats .CPUStats )
340341 }
341342
342- d .previousCPUStats [id ] = & CPUStats {TotalUsage : stats .CPUStats .CPUUsage .TotalUsage , SystemUsage : stats .CPUStats .SystemCPUUsage }
343+ d .previousCPUStats [id ] = & CPUStats {TotalUsage : stats .CPUStats .CPUUsage .TotalUsage , SystemUsage : stats .CPUStats .SystemUsage }
343344
344345 containerEvent .CPU = strconv .FormatFloat (cpuPercent , 'f' , 2 , 64 ) + "%"
345346 containerEvent .RAM = strconv .FormatFloat (memPercent , 'f' , 2 , 64 ) + "%"
@@ -383,7 +384,7 @@ func (d *Daemon) execDockerCmd(args []string) {
383384// listContainers handles and reply to http requests having the path "/containers"
384385func (d * Daemon ) listContainers () {
385386 go func () {
386- containers , err := d .Client .ListContainers (docker. ListContainersOptions {All : true })
387+ containers , err := d .Client .ContainerList ( context . Background (), types. ContainerListOptions {All : true })
387388 if err != nil {
388389 log .Println (err .Error ())
389390 return
@@ -416,9 +417,7 @@ func (d *Daemon) listContainers() {
416417 containerEvent .ImageRepo = imageRepo
417418 containerEvent .ImageTag = imageTag
418419 containerEvent .Name = name
419- // container.Status can be "paused", "Up <time>" or "Exit <code>"
420- parts := strings .SplitN (container .Status , " " , 2 )
421- containerEvent .Running = len (parts ) > 0 && parts [0 ] == "Up"
420+ containerEvent .Running = container .State == "running"
422421
423422 data , err := containerEventToTCPMsg (containerEvent )
424423 if err != nil {
@@ -440,67 +439,69 @@ func (d *Daemon) listContainers() {
440439}
441440
442441func (d * Daemon ) startStatsMonitoring (containerID string ) {
443- // Monitor stats
444- go func () {
445- var statsChan chan * docker.Stats
446- var doneChan chan bool
447-
448- d .Lock ()
449- statsOptionsEntry , found := d .statsOptionsStore [containerID ]
450- if ! found {
451- statsChan = make (chan * docker.Stats )
452- doneChan = make (chan bool )
453- statsOptions := docker.StatsOptions {
454- ID : containerID ,
455- Stats : statsChan ,
456- Stream : true ,
457- Done : doneChan ,
458- Timeout : time .Second * 60 ,
459- }
460- statsOptionsEntry = StatsOptionsEntry {statsOptions , statsChan , doneChan }
461- d .statsOptionsStore [containerID ] = statsOptionsEntry
462- } else {
463- statsChan = statsOptionsEntry .statsChan
464- doneChan = statsOptionsEntry .doneChan
442+ d .Lock ()
443+ statsOptionsEntry , found := d .statsOptionsStore [containerID ]
444+ if ! found {
445+ statsOptionsEntry = StatsOptionsEntry {
446+ make (chan * types.StatsJSON ),
447+ make (chan bool , 1 ),
465448 }
466- d .Unlock ()
449+ d .statsOptionsStore [containerID ] = statsOptionsEntry
450+ }
451+ d .Unlock ()
467452
468- go func () {
469- log .Println ("Start go routine to collect events from" , containerID )
470- for {
471- select {
472- case stats := <- statsChan :
473- //!\\ stats == nil when channel is closed
474- if stats != nil {
475- d .statCallback (containerID , stats )
453+ go func () {
454+ log .Infof ("Start monitoring stats: %s" , containerID )
455+ resp , err := d .Client .ContainerStats (context .Background (), containerID , true )
456+ if err != nil {
457+ log .Printf ("dClient.Stats err: %#v" , err )
458+ }
459+ defer resp .Body .Close ()
460+ dec := json .NewDecoder (resp .Body )
461+ for {
462+ select {
463+ case <- statsOptionsEntry .doneChan :
464+ log .Infof ("Stopping collecting stats for %s" , containerID )
465+ return
466+ default :
467+ v := types.StatsJSON {}
468+ if err := dec .Decode (& v ); err != nil {
469+ dec = json .NewDecoder (io .MultiReader (dec .Buffered (), resp .Body ))
470+ if err != io .EOF {
471+ break
476472 }
477- case <- doneChan :
478- log .Println ("Go routine END" )
479- return
473+ time .Sleep (100 * time .Millisecond )
474+ continue
480475 }
476+ statsOptionsEntry .statsChan <- & v
481477 }
482- }()
478+ }
479+ }()
483480
484- if ! found {
485- log .Println ("Start monitoring events:" , containerID )
486- err := d .Client .Stats (statsOptionsEntry .statsOptions )
487- if err != nil {
488- log .Printf ("dClient.Stats err: %#v" , err )
481+ go func () {
482+ for {
483+ select {
484+ case stats := <- statsOptionsEntry .statsChan :
485+ if stats != nil {
486+ d .statCallback (containerID , stats )
487+ }
488+ case <- statsOptionsEntry .doneChan :
489+ log .Println ("Go routine END" )
490+ return
489491 }
490- log .Println ("Stop monitoring events:" , containerID )
491492 }
492493 }()
493494}
494495
495496// Utility functions
496497
497- func calculateCPUPercent (previousCPUStats * CPUStats , newCPUStats * docker .CPUStats ) float64 {
498+ func calculateCPUPercent (previousCPUStats * CPUStats , newCPUStats * types .CPUStats ) float64 {
498499 var (
499500 cpuPercent = 0.0
500501 // calculate the change for the cpu usage of the container in between readings
501502 cpuDelta = float64 (newCPUStats .CPUUsage .TotalUsage - previousCPUStats .TotalUsage )
502503 // calculate the change for the entire system between readings
503- systemDelta = float64 (newCPUStats .SystemCPUUsage - previousCPUStats .SystemUsage )
504+ systemDelta = float64 (newCPUStats .SystemUsage - previousCPUStats .SystemUsage )
504505 )
505506
506507 if systemDelta > 0.0 && cpuDelta > 0.0 {
@@ -540,18 +541,18 @@ func containerEventToTCPMsg(containerEvent ContainerEvent) ([]byte, error) {
540541 return data , nil
541542}
542543
543- func (d * Daemon ) apiEventToContainerEvent (event * docker. APIEvents ) (ContainerEvent , error ) {
544+ func (d * Daemon ) apiEventToContainerEvent (event events. Message ) (ContainerEvent , error ) {
544545
545546 containerEvent := ContainerEvent {}
546- containerEvent .ID = event .ID
547+ containerEvent .ID = event .Actor . ID
547548
548549 // don't try to inspect container in that case, it's already gone!
549550 if event .Action == "destroy" {
550551 return containerEvent , nil
551552 }
552553
553554 log .Debugf ("apiEventToContainerEvent: %#v\n " , event )
554- container , err := d .Client .InspectContainer ( event .ID )
555+ container , err := d .Client .ContainerInspect ( context . Background (), containerEvent .ID )
555556 if err != nil {
556557 return containerEvent , err
557558 }
0 commit comments