11package main
22
33import (
4+ "crypto/rand"
5+ "encoding/binary"
46 "fmt"
57 event_rpc "github.com/docker/infrakit/pkg/rpc/event"
68 rpc_server "github.com/docker/infrakit/pkg/rpc/server"
9+ "github.com/docker/infrakit/pkg/spi/application"
710 "github.com/docker/infrakit/pkg/spi/event"
811 testing_event "github.com/docker/infrakit/pkg/testing/event"
912 "github.com/docker/infrakit/pkg/types"
1013 MQTT "github.com/eclipse/paho.mqtt.golang"
1114 "github.com/stretchr/testify/require"
1215 "io/ioutil"
1316 "path/filepath"
17+ "strconv"
1418 "testing"
1519 "time"
1620)
1721
18- var MQTTTESTSERVER string = "tcp://test.mosquitto.org:1883"
22+ var MQTTTESTSERVER = "tcp://iot.eclipse.org:1883"
23+ var EVENTNUM = 5
24+
25+ func TestUnitUpdate (t * testing.T ) {
26+ socketPath := tempSocket ()
27+ stub := func () interface {} { return nil }
28+ m := map [string ]interface {}{}
29+ types .Put (types .PathFromString ("test/instance1" ), stub , m )
30+ types .Put (types .PathFromString ("test/instance2" ), stub , m )
31+ plugin := & testing_event.Plugin {
32+ Publisher : (* testing_event .Publisher )(nil ),
33+ DoList : func (topic types.Path ) ([]string , error ) {
34+ return types .List (topic , m ), nil
35+ },
36+ }
37+ var impl rpc_server.VersionedInterface = event_rpc .PluginServerWithTypes (
38+ map [string ]event.Plugin {
39+ "test" : plugin ,
40+ })
41+ server , err := rpc_server .StartPluginAtPath (socketPath , impl )
42+ require .NoError (t , err )
43+ defer server .Stop ()
44+
45+ //Test ADD operation
46+ require .NoError (t , err )
47+ e := NewEventRepeater (socketPath , "" , "stderr" , false ).(* eventRepeater )
48+ mes := & application.Message {
49+ Op : application .ADD ,
50+ Resource : "event" ,
51+ Data : types .AnyString ("[{\" sourcetopic\" :\" test/instance1\" ,\" sinktopic\" :\" test/sink/instance1\" },{\" sourcetopic\" :\" test/instance2\" ,\" sinktopic\" :\" test/sink/instance2\" }]" ),
52+ }
53+ err = e .Update (mes )
54+ require .NoError (t , err )
55+ require .Equal (t , 2 , len (e .Events ))
56+ require .Equal (t , "test/sink/instance1" , e .Events ["test/instance1" ].SinkTopic )
57+ require .Equal (t , "test/sink/instance2" , e .Events ["test/instance2" ].SinkTopic )
58+
59+ //Test DELETE operation
60+ mes = & application.Message {
61+ Op : application .DELETE ,
62+ Resource : "event" ,
63+ Data : types .AnyString ("[{\" sourcetopic\" :\" test/instance2\" ,\" sinktopic\" :\" \" }]" ),
64+ }
65+ err = e .Update (mes )
66+ require .NoError (t , err )
67+ require .Equal (t , 1 , len (e .Events ))
68+ require .Equal (t , "test/sink/instance1" , e .Events ["test/instance1" ].SinkTopic )
69+ _ , ok := e .Events ["test/instance2" ]
70+ require .Equal (t , false , ok )
71+
72+ //Test UPDATE operation
73+ mes = & application.Message {
74+ Op : application .UPDATE ,
75+ Resource : "event" ,
76+ Data : types .AnyString ("[{\" sourcetopic\" :\" test/instance1\" ,\" sinktopic\" :\" test/event/instance1\" }]" ),
77+ }
78+ err = e .Update (mes )
79+ require .NoError (t , err )
80+ require .Equal (t , "test/event/instance1" , e .Events ["test/instance1" ].SinkTopic )
81+ }
1982
2083func tempSocket () string {
2184 dir , err := ioutil .TempDir ("" , "infrakit-test-" )
@@ -24,24 +87,37 @@ func tempSocket() string {
2487 }
2588 return filepath .Join (dir , "app-impl-test" )
2689}
27- func runEvent (startPub chan struct {}) (* string , rpc_server.Stoppable , error ) {
90+ func runEvent (startPub chan struct {}, tPrefix string ) (string , rpc_server.Stoppable , error ) {
2891 socketPath := tempSocket ()
29- events := 5
3092 publishChan0 := make (chan chan <- * event.Event )
3193 go func () {
3294 publish := <- publishChan0
3395 defer close (publish )
3496 <- startPub
3597 // here we have the channel and ready to go
36- for i := 0 ; i < events ; i ++ {
98+ for i := 0 ; i < EVENTNUM ; i ++ {
3799 <- time .After (50 * time .Millisecond )
38- fmt .Printf ("publish event%d\n " , i )
39100 publish <- event.Event {
40101 Topic : types .PathFromString ("instance/create" ),
41102 ID : fmt .Sprintf ("host-%d" , i ),
42103 }.Init ().WithDataMust ([]int {1 , 2 }).Now ()
43104 }
44105 }()
106+ publishChan1 := make (chan chan <- * event.Event )
107+ go func () {
108+ publish := <- publishChan1
109+ defer close (publish )
110+ <- startPub
111+ // here we have the channel and ready to go
112+ for i := 0 ; i < EVENTNUM ; i ++ {
113+ <- time .After (50 * time .Millisecond )
114+ publish <- event.Event {
115+ Topic : types .PathFromString ("instance/create" ),
116+ ID : fmt .Sprintf ("disk-%d" , i ),
117+ }.Init ().WithDataMust ([]string {"foo" , "bar" }).Now ()
118+ }
119+ }()
120+
45121 m := map [string ]interface {}{}
46122 types .Put (types .PathFromString ("instance/create" ), "instance-create" , m )
47123 plugin0 := & testing_event.Plugin {
@@ -55,61 +131,145 @@ func runEvent(startPub chan struct{}) (*string, rpc_server.Stoppable, error) {
55131 },
56132 },
57133 }
134+ plugin1 := & testing_event.Plugin {
135+ DoList : func (topic types.Path ) ([]string , error ) {
136+ return types .List (topic , m ), nil
137+ },
138+ Publisher : & testing_event.Publisher {
139+ DoPublishOn : func (c chan <- * event.Event ) {
140+ publishChan1 <- c
141+ close (publishChan1 )
142+ },
143+ },
144+ }
58145 var impl rpc_server.VersionedInterface = event_rpc .PluginServerWithTypes (
59146 map [string ]event.Plugin {
60- "iktest" : plugin0 ,
147+ tPrefix + "-compute" : plugin0 ,
148+ tPrefix + "-storage" : plugin1 ,
61149 })
62150 server , err := rpc_server .StartPluginAtPath (socketPath , impl )
63151 if err != nil {
64- return nil , nil , err
152+ return "" , nil , err
65153 }
66- return & socketPath , server , nil
154+ return socketPath , server , nil
67155}
68156
69- func runSub (msgch chan MQTT.Message ) (MQTT.Client , error ) {
157+ func runSub (msgch chan MQTT.Message , tPrefix string ) (MQTT.Client , error ) {
70158 opts := MQTT .NewClientOptions ().AddBroker (MQTTTESTSERVER )
71159 client := MQTT .NewClient (opts )
72160 if token := client .Connect (); token .Wait () && token .Error () != nil {
73161 return nil , token .Error ()
74162 }
75163 subToken := client .Subscribe (
76- "iktest /instance/create" ,
164+ tPrefix + " /instance/create" ,
77165 0 ,
78166 func (client MQTT.Client , msg MQTT.Message ) {
79167 msgch <- msg
80168 })
81- fmt .Printf ("mqtt substart" )
82169 if subToken .Wait () && subToken .Error () != nil {
83170 return nil , subToken .Error ()
84171 }
85172 return client , nil
86173}
87174
88- func TestIntegration (t * testing.T ) {
175+ func TestIntegrationAllowAll (t * testing.T ) {
176+ var n uint64
177+ binary .Read (rand .Reader , binary .LittleEndian , & n )
178+ randString := strconv .FormatUint (n , 36 )
179+ topicPrefix := "ifktest-" + randString
89180 startPub := make (chan struct {})
90- socketPath , erpcsrv , err := runEvent (startPub )
181+ socketPath , erpcsrv , err := runEvent (startPub , topicPrefix )
91182 defer erpcsrv .Stop ()
92183 require .NoError (t , err )
93- mqsubch := make (chan MQTT.Message )
94- mqttClient , err := runSub (mqsubch )
184+ mqsubch0 := make (chan MQTT.Message )
185+ mqttClient0 , err := runSub (mqsubch0 , topicPrefix + "-compute" )
186+ require .NoError (t , err )
187+ mqsubch1 := make (chan MQTT.Message )
188+ mqttClient1 , err := runSub (mqsubch1 , topicPrefix + "-storage" )
95189 require .NoError (t , err )
96- defer mqttClient .Disconnect (250 )
97- app := NewEventRepeater (* socketPath , MQTTTESTSERVER , "mqtt" , true )
190+ defer mqttClient0 .Disconnect (250 )
191+ defer mqttClient1 .Disconnect (250 )
192+ app := NewEventRepeater (socketPath , MQTTTESTSERVER , "mqtt" , true )
98193 defer app .(* eventRepeater ).Stop ()
99194 close (startPub )
100- var subEvent int = 0
195+ var subEvent0 int
196+ var subEvent1 int
101197loop:
102198 for {
103199 select {
104200 case <- time .After (500 * time .Millisecond ):
105201 break loop
106- case sub := <- mqsubch :
107- subany := types .AnyBytes (sub .Payload ())
202+ case sub0 := <- mqsubch0 :
203+ subany := types .AnyBytes (sub0 .Payload ())
204+ subevent := event.Event {}
205+ err := subany .Decode (& subevent )
206+ require .NoError (t , err )
207+ require .Equal (t , fmt .Sprintf ("host-%d" , subEvent0 ), subevent .ID )
208+ subEvent0 ++
209+ case sub1 := <- mqsubch1 :
210+ subany := types .AnyBytes (sub1 .Payload ())
211+ subevent := event.Event {}
212+ err := subany .Decode (& subevent )
213+ require .NoError (t , err )
214+ require .Equal (t , fmt .Sprintf ("disk-%d" , subEvent1 ), subevent .ID )
215+ subEvent1 ++
216+
217+ }
218+ }
219+ require .Equal (t , EVENTNUM , subEvent0 )
220+ require .Equal (t , EVENTNUM , subEvent1 )
221+ }
222+
223+ func TestIntegrationDenyAll (t * testing.T ) {
224+ var n uint64
225+ binary .Read (rand .Reader , binary .LittleEndian , & n )
226+ randString := strconv .FormatUint (n , 36 )
227+ topicPrefix := "ifktest-" + randString
228+ startPub := make (chan struct {})
229+ socketPath , erpcsrv , err := runEvent (startPub , topicPrefix )
230+ defer erpcsrv .Stop ()
231+ require .NoError (t , err )
232+ mqsubch0 := make (chan MQTT.Message )
233+ mqttClient0 , err := runSub (mqsubch0 , topicPrefix + "-compute" )
234+ require .NoError (t , err )
235+ mqsubch1 := make (chan MQTT.Message )
236+ mqttClient1 , err := runSub (mqsubch1 , topicPrefix + "-storage" )
237+ require .NoError (t , err )
238+ defer mqttClient0 .Disconnect (250 )
239+ defer mqttClient1 .Disconnect (250 )
240+ app := NewEventRepeater (socketPath , MQTTTESTSERVER , "mqtt" , false )
241+ defer app .(* eventRepeater ).Stop ()
242+ m := & application.Message {
243+ Op : application .ADD ,
244+ Resource : "event" ,
245+ Data : types .AnyString ("[{\" sourcetopic\" :\" " + topicPrefix + "-compute/instance/create\" ,\" sinktopic\" :\" \" }]" ),
246+ }
247+ err = app .Update (m )
248+ require .NoError (t , err )
249+ close (startPub )
250+ var subEvent0 int
251+ var subEvent1 int
252+ loop:
253+ for {
254+ select {
255+ case <- time .After (500 * time .Millisecond ):
256+ break loop
257+ case sub0 := <- mqsubch0 :
258+ subany := types .AnyBytes (sub0 .Payload ())
259+ subevent := event.Event {}
260+ err := subany .Decode (& subevent )
261+ require .NoError (t , err )
262+ require .Equal (t , fmt .Sprintf ("host-%d" , subEvent0 ), subevent .ID )
263+ subEvent0 ++
264+ case sub1 := <- mqsubch1 :
265+ subany := types .AnyBytes (sub1 .Payload ())
108266 subevent := event.Event {}
109267 err := subany .Decode (& subevent )
110268 require .NoError (t , err )
111- require .Equal (t , subevent . ID , fmt .Sprintf ("host -%d" , subEvent ) )
112- subEvent ++
269+ require .Equal (t , fmt .Sprintf ("disk -%d" , subEvent1 ), subevent . ID )
270+ subEvent1 ++
113271 }
114272 }
273+ require .Equal (t , EVENTNUM , subEvent0 )
274+ require .Equal (t , 0 , subEvent1 )
115275}
0 commit comments