@@ -2,6 +2,7 @@ package template
22
33import (
44 "fmt"
5+ "time"
56
67 "github.com/docker/infrakit/pkg/discovery"
78 metadata_rpc "github.com/docker/infrakit/pkg/rpc/metadata"
@@ -20,64 +21,179 @@ func MetadataFunc(discovery func() discovery.Plugins) func(string, ...interface{
2021
2122 return func (path string , optionalValue ... interface {}) (interface {}, error ) {
2223
23- if plugins == nil {
24- return nil , fmt .Errorf ("no plugin discovery:%s" , path )
24+ switch len (optionalValue ) {
25+ case 0 : // read
26+ return doGetSet (plugins , path , optionalValue ... )
27+ case 1 : // set
28+ return doGetSet (plugins , path , optionalValue ... )
29+ case 2 : // a retry time + timeout is specified for a read
30+ retry , err := duration (optionalValue [0 ])
31+ if err != nil {
32+ return nil , err
33+ }
34+ timeout , err := duration (optionalValue [1 ])
35+ if err != nil {
36+ return nil , err
37+ }
38+ return doBlockingGet (plugins , path , retry , timeout )
39+ case 3 : // a retry time + timeout is specified for a read + bool to return error
40+ retry , err := duration (optionalValue [0 ])
41+ if err != nil {
42+ return nil , err
43+ }
44+ timeout , err := duration (optionalValue [1 ])
45+ if err != nil {
46+ return nil , err
47+ }
48+ errOnTimeout , is := optionalValue [2 ].(bool )
49+ if ! is {
50+ return nil , fmt .Errorf ("must be boolean %v" , optionalValue [2 ])
51+ }
52+ v , err := doBlockingGet (plugins , path , retry , timeout )
53+ if errOnTimeout {
54+ return v , err
55+ }
56+ return v , nil
2557 }
58+ return template .VoidValue , fmt .Errorf ("wrong number of args" )
59+ }
60+ }
2661
27- mpath := types .PathFromString (path )
28- first := mpath .Index (0 )
29- if first == nil {
30- return nil , fmt .Errorf ("unknown plugin from path: %s" , path )
31- }
62+ func duration (v interface {}) (time.Duration , error ) {
63+ switch v := v .(type ) {
64+ case time.Duration :
65+ return v , nil
66+ case types.Duration :
67+ return v .Duration (), nil
68+ case []byte :
69+ return time .ParseDuration (string (v ))
70+ case string :
71+ return time .ParseDuration (string (v ))
72+ case int64 :
73+ return time .Duration (int64 (v )), nil
74+ case uint :
75+ return time .Duration (int64 (v )), nil
76+ case uint64 :
77+ return time .Duration (int64 (v )), nil
78+ case int :
79+ return time .Duration (int64 (v )), nil
80+ }
81+ return 0 , fmt .Errorf ("cannot convert to duration: %v" , v )
82+ }
83+
84+ // blocking get from metadata. block the same go routine / thread until timeout or value is available
85+ func doBlockingGet (plugins func () discovery.Plugins , path string , retry , timeout time.Duration ) (interface {}, error ) {
86+
87+ if plugins == nil {
88+ return nil , fmt .Errorf ("no plugin discovery:%s" , path )
89+ }
90+
91+ mpath := types .PathFromString (path )
92+ first := mpath .Index (0 )
93+ if first == nil {
94+ return nil , fmt .Errorf ("unknown plugin from path: %s" , path )
95+ }
96+
97+ lookup , err := plugins ().List ()
98+ endpoint , has := lookup [* first ]
99+ if ! has {
100+ return false , nil // Don't return error. Just return false for non-existence
101+ } else if mpath .Len () == 1 {
102+ return true , nil // This is a test for availability of the plugin
103+ }
104+
105+ metadataPlugin , err := metadata_rpc .NewClient (endpoint .Address )
106+ if err != nil {
107+ return nil , fmt .Errorf ("cannot connect to plugin: %s" , * first )
108+ }
109+
110+ key := mpath .Shift (1 )
111+ var value interface {}
112+
113+ expiry := time .Now ().Add (timeout )
114+
115+ for i := 0 ; ; i ++ {
32116
33- lookup , err := plugins ().List ()
34- endpoint , has := lookup [* first ]
35- if ! has {
36- return false , nil // Don't return error. Just return false for non-existence
37- } else if mpath .Len () == 1 {
38- return true , nil // This is a test for availability of the plugin
117+ any , err := metadataPlugin .Get (key )
118+ if err == nil && any != nil {
119+ err = any .Decode (& value )
120+ if err != nil {
121+ return any .String (), err // note the type changed to string in error return
122+ }
123+ return value , err
39124 }
40125
41- metadataPlugin , err := metadata_rpc .NewClient (endpoint .Address )
42- if err != nil {
43- return nil , fmt .Errorf ("cannot connect to plugin: %s" , * first )
126+ if i > 0 && time .Now ().After (expiry ) {
127+ break
44128 }
45129
46- key := mpath .Shift (1 )
47- var value interface {}
48- any , err := metadataPlugin .Get (key )
49- if err != nil {
50- return nil , err
130+ if retry > 0 {
131+ <- time .After (retry )
51132 }
133+ }
134+ return value , fmt .Errorf ("expired waiting" )
135+ }
136+
137+ func doGetSet (plugins func () discovery.Plugins , path string , optionalValue ... interface {}) (interface {}, error ) {
138+ if plugins == nil {
139+ return nil , fmt .Errorf ("no plugin discovery:%s" , path )
140+ }
141+
142+ mpath := types .PathFromString (path )
143+ first := mpath .Index (0 )
144+ if first == nil {
145+ return nil , fmt .Errorf ("unknown plugin from path: %s" , path )
146+ }
147+
148+ lookup , err := plugins ().List ()
149+ endpoint , has := lookup [* first ]
150+ if ! has {
151+ return false , nil // Don't return error. Just return false for non-existence
152+ } else if mpath .Len () == 1 {
153+ return true , nil // This is a test for availability of the plugin
154+ }
52155
156+ metadataPlugin , err := metadata_rpc .NewClient (endpoint .Address )
157+ if err != nil {
158+ return nil , fmt .Errorf ("cannot connect to plugin: %s" , * first )
159+ }
160+
161+ key := mpath .Shift (1 )
162+ var value interface {}
163+ any , err := metadataPlugin .Get (key )
164+ if err != nil {
165+ return nil , err
166+ }
167+
168+ if any != nil {
53169 err = any .Decode (& value )
54170 if err != nil {
55171 return any .String (), err // note the type changed to string in error return
56172 }
173+ }
57174
58- // Update case: return value is the version before this successful commit.
59- if len (optionalValue ) == 1 {
175+ // Update case: return value is the version before this successful commit.
176+ if len (optionalValue ) == 1 {
60177
61- any , err := types .AnyValue (optionalValue [0 ])
62- if err != nil {
63- return value , err
64- }
65-
66- // update it
67- updatablePlugin , is := metadataPlugin .(metadata.Updatable )
68- if ! is {
69- return value , fmt .Errorf ("value is read-only" )
70- }
71- _ , proposed , cas , err := updatablePlugin .Changes ([]metadata.Change {
72- {
73- Path : key ,
74- Value : any ,
75- },
76- })
77- err = updatablePlugin .Commit (proposed , cas )
78- return template .VoidValue , err
178+ any , err := types .AnyValue (optionalValue [0 ])
179+ if err != nil {
180+ return value , err
79181 }
80182
81- return value , err
183+ // update it
184+ updatablePlugin , is := metadataPlugin .(metadata.Updatable )
185+ if ! is {
186+ return value , fmt .Errorf ("value is read-only" )
187+ }
188+ _ , proposed , cas , err := updatablePlugin .Changes ([]metadata.Change {
189+ {
190+ Path : key ,
191+ Value : any ,
192+ },
193+ })
194+ err = updatablePlugin .Commit (proposed , cas )
195+ return template .VoidValue , err
82196 }
197+
198+ return value , err
83199}
0 commit comments