Skip to content

Commit c0dfd6e

Browse files
authored
Merge pull request #1669 from safchain/debug-weird-node-not-found
Fix graph replication
2 parents bf87f6a + 647609c commit c0dfd6e

35 files changed

+617
-422
lines changed

analyzer/server.go

Lines changed: 14 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,7 @@ import (
4747
usertopology "github.com/skydive-project/skydive/topology/enhancers"
4848
"github.com/skydive-project/skydive/topology/probes/netlink"
4949
"github.com/skydive-project/skydive/ui"
50+
"github.com/skydive-project/skydive/websocket"
5051
ws "github.com/skydive-project/skydive/websocket"
5152
)
5253

@@ -251,8 +252,20 @@ func NewServerFromConfig() (*Server, error) {
251252

252253
uiServer.RegisterLoginRoute(apiAuthBackend)
253254

255+
peers, err := config.GetAnalyzerServiceAddresses()
256+
if err != nil {
257+
return nil, fmt.Errorf("Unable to get the analyzers list: %s", err)
258+
}
259+
260+
opts := websocket.ServerOpts{
261+
WriteCompression: true,
262+
QueueSize: 10000,
263+
PingDelay: 2 * time.Second,
264+
PongTimeout: 5 * time.Second,
265+
}
266+
254267
clusterAuthOptions := ClusterAuthenticationOpts()
255-
hub, err := hub.NewHub(hserver, g, cached, apiAuthBackend, clusterAuthBackend, clusterAuthOptions, "/ws/agent/topology", true, 10000, 2*time.Second, 5*time.Second)
268+
hub, err := hub.NewHub(hserver, g, cached, apiAuthBackend, clusterAuthBackend, clusterAuthOptions, "/ws/agent/topology", peers, opts)
256269
if err != nil {
257270
return nil, err
258271
}

analyzer/storage.go

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,7 @@ import (
3030
es "github.com/skydive-project/skydive/storage/elasticsearch"
3131
)
3232

33-
// NewESConfig returns a new elasticsearch configution for the given backend name
33+
// NewESConfig returns a new elasticsearch configuration for the given backend name
3434
func NewESConfig(name ...string) es.Config {
3535
cfg := es.Config{}
3636

@@ -63,13 +63,14 @@ func newGraphBackendFromConfig(etcdClient *etcd.Client) (graph.Backend, error) {
6363
cfg := NewESConfig(backend)
6464
return graph.NewElasticSearchBackendFromConfig(cfg, etcdClient)
6565
case "memory":
66-
return graph.NewMemoryBackend()
66+
// cached memory will be used
67+
return nil, nil
6768
case "orientdb":
6869
addr := config.GetString(configPath + ".addr")
6970
database := config.GetString(configPath + ".database")
7071
username := config.GetString(configPath + ".username")
7172
password := config.GetString(configPath + ".password")
72-
return graph.NewOrientDBBackend(addr, database, username, password)
73+
return graph.NewOrientDBBackend(addr, database, username, password, etcdClient)
7374
default:
7475
return nil, fmt.Errorf("Topology backend driver '%s' not supported", driver)
7576
}

cmd/client/topology.go

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -26,8 +26,8 @@ import (
2626

2727
"github.com/skydive-project/skydive/common"
2828
"github.com/skydive-project/skydive/config"
29-
"github.com/skydive-project/skydive/graffiti/graph"
3029
"github.com/skydive-project/skydive/graffiti/hub"
30+
gws "github.com/skydive-project/skydive/graffiti/websocket"
3131
"github.com/skydive-project/skydive/websocket"
3232
"github.com/spf13/cobra"
3333
)
@@ -91,7 +91,7 @@ var TopologyImport = &cobra.Command{
9191
exitOnError(err)
9292
}
9393

94-
syncMsg := []*graph.SyncMsg{}
94+
syncMsg := []*gws.SyncMsg{}
9595
if err := json.Unmarshal(content, &syncMsg); err != nil {
9696
exitOnError(err)
9797
}
@@ -101,14 +101,14 @@ var TopologyImport = &cobra.Command{
101101
}
102102

103103
for _, node := range syncMsg[0].Nodes {
104-
msg := websocket.NewStructMessage(graph.Namespace, graph.NodeAddedMsgType, node)
104+
msg := gws.NewStructMessage(gws.NodeAddedMsgType, node)
105105
if err := client.SendMessage(msg); err != nil {
106106
exitOnError(fmt.Errorf("Failed to send message: %s", err))
107107
}
108108
}
109109

110110
for _, edge := range syncMsg[0].Edges {
111-
msg := websocket.NewStructMessage(graph.Namespace, graph.EdgeAddedMsgType, edge)
111+
msg := gws.NewStructMessage(gws.EdgeAddedMsgType, edge)
112112
if err := client.SendMessage(msg); err != nil {
113113
exitOnError(fmt.Errorf("Failed to send message: %s", err))
114114
}

config/websocket.go

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -44,10 +44,14 @@ func NewWSClient(clientType common.ServiceType, url *url.URL, opts websocket.Cli
4444

4545
// NewWSServer creates a Server based on the configuration
4646
func NewWSServer(server *shttp.Server, endpoint string, authBackend shttp.AuthenticationBackend) *websocket.Server {
47-
queueSize := GetInt("http.ws.queue_size")
48-
writeCompression := GetBool("http.ws.enable_write_compression")
4947
pingDelay := time.Duration(GetInt("http.ws.ping_delay")) * time.Second
50-
pongTimeout := time.Duration(GetInt("http.ws.pong_timeout"))*time.Second + pingDelay
5148

52-
return websocket.NewServer(server, endpoint, authBackend, writeCompression, queueSize, pingDelay, pongTimeout)
49+
opts := websocket.ServerOpts{
50+
WriteCompression: GetBool("http.ws.enable_write_compression"),
51+
QueueSize: GetInt("http.ws.queue_size"),
52+
PingDelay: pingDelay,
53+
PongTimeout: time.Duration(GetInt("http.ws.pong_timeout"))*time.Second + pingDelay,
54+
}
55+
56+
return websocket.NewServer(server, endpoint, authBackend, opts)
5357
}

flow/storage/orientdb/orientdb.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -270,7 +270,7 @@ func (c *Storage) SearchRawPackets(fsq filters.SearchQuery, packetFilter *filter
270270
}
271271
}
272272

273-
result, err := c.client.Search(sql)
273+
result, err := c.client.SQL(sql)
274274
if err != nil {
275275
return nil, err
276276
}
@@ -316,7 +316,7 @@ func (c *Storage) SearchMetrics(fsq filters.SearchQuery, metricFilter *filters.F
316316
}
317317
}
318318

319-
result, err := c.client.Search(sql)
319+
result, err := c.client.SQL(sql)
320320
if err != nil {
321321
return nil, err
322322
}

graffiti/cmd/hub.go

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@ import (
3131
ge "github.com/skydive-project/skydive/gremlin/traversal"
3232
shttp "github.com/skydive-project/skydive/http"
3333
"github.com/skydive-project/skydive/logging"
34+
"github.com/skydive-project/skydive/websocket"
3435
"github.com/spf13/cobra"
3536
)
3637

@@ -91,7 +92,7 @@ var HubCmd = &cobra.Command{
9192

9293
authBackend := shttp.NewNoAuthenticationBackend()
9394

94-
// declare all extension available throught API and filtering
95+
// declare all extension available through API and filtering
9596
tr := traversal.NewGremlinTraversalParser()
9697
tr.AddTraversalExtension(ge.NewDescendantsTraversalExtension())
9798

@@ -101,7 +102,14 @@ var HubCmd = &cobra.Command{
101102
}
102103
api.RegisterTopologyAPI(httpServer, g, tr, authBackend)
103104

104-
hub, err := hub.NewHub(httpServer, g, cached, authBackend, authBackend, nil, "/ws/pod", writeCompression, queueSize, time.Second*time.Duration(pingDelay), time.Second*time.Duration(pongTimeout))
105+
serverOpts := websocket.ServerOpts{
106+
WriteCompression: writeCompression,
107+
QueueSize: queueSize,
108+
PingDelay: time.Second * time.Duration(pingDelay),
109+
PongTimeout: time.Second * time.Duration(pongTimeout),
110+
}
111+
112+
hub, err := hub.NewHub(httpServer, g, cached, authBackend, authBackend, nil, "/ws/pod", nil, serverOpts)
105113
if err != nil {
106114
logging.GetLogger().Error(err)
107115
os.Exit(1)

0 commit comments

Comments
 (0)