Skip to content

Commit 88137ed

Browse files
committed
Neighbors step
Like Descendants, but using edges in any direction (Descendants only uses edges from parent to child, Neighbors uses from parent to child and from child to parent). The different with the Both step, is Neighbors accumulate nodes seen. Example (pseudo-syntax): A -> B -> C G.V(A).Out().Out() return: C. But: G.V(A).Neighbors(2) return: A,B,C The parameters allowed are the same as in Descendants. Example: G.V('foo').Neighbors('RelationType',Within('ownership','foobar'),2) To improve speed and reduce backend load when using persistent backends, a new method, GetNodesFromIDs, is implemented in Graph and Backends. This method only uses one call (or a few in we have hundreds of nodes, see batching) to get all nodes from the backend. Batching is used to avoid hitting the max number of clauses set by ES (is set to the default value of 512).
1 parent e455f7a commit 88137ed

File tree

11 files changed

+778
-0
lines changed

11 files changed

+778
-0
lines changed

analyzer/server.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -333,6 +333,7 @@ func NewServerFromConfig() (*Server, error) {
333333
tr.AddTraversalExtension(ge.NewSocketsTraversalExtension())
334334
tr.AddTraversalExtension(ge.NewDescendantsTraversalExtension())
335335
tr.AddTraversalExtension(ge.NewAscendantsTraversalExtension())
336+
tr.AddTraversalExtension(ge.NewNeighborsTraversalExtension())
336337
tr.AddTraversalExtension(ge.NewNextHopTraversalExtension())
337338
tr.AddTraversalExtension(ge.NewGroupTraversalExtension())
338339

go.mod

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -77,6 +77,7 @@ require (
7777
github.com/spf13/cobra v1.1.1
7878
github.com/spf13/pflag v1.0.5
7979
github.com/spf13/viper v1.7.0
80+
github.com/stretchr/testify v1.6.1 // indirect
8081
github.com/t-yuki/gocover-cobertura v0.0.0-20180217150009-aaee18c8195c
8182
github.com/tebeka/go2xunit v1.4.10
8283
github.com/tebeka/selenium v0.0.0-20170314201507-657e45ec600f

graffiti/graph/cachedbackend.go

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -99,6 +99,17 @@ func (c *CachedBackend) GetNode(i Identifier, t Context) []*Node {
9999
return c.persistent.GetNode(i, t)
100100
}
101101

102+
// GetNodesFromIDs retrieve the list of nodes for the list of identifiers from the cache within a time slice
103+
func (c *CachedBackend) GetNodesFromIDs(i []Identifier, t Context) []*Node {
104+
mode := c.cacheMode.Load()
105+
106+
if t.TimeSlice == nil || mode == CacheOnlyMode || c.persistent == nil {
107+
return c.memory.GetNodesFromIDs(i, t)
108+
}
109+
110+
return c.persistent.GetNodesFromIDs(i, t)
111+
}
112+
102113
// GetNodeEdges retrieve a list of edges from a node within a time slice, matching metadata
103114
func (c *CachedBackend) GetNodeEdges(n *Node, t Context, m ElementMatcher) (edges []*Edge) {
104115
mode := c.cacheMode.Load()

graffiti/graph/elasticsearch.go

Lines changed: 49 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -244,6 +244,44 @@ func (b *ElasticSearchBackend) GetNode(i Identifier, t Context) []*Node {
244244
return nodes
245245
}
246246

247+
// GetNodesFromIDs get the list of nodes for the list of identifiers within a time slice
248+
func (b *ElasticSearchBackend) GetNodesFromIDs(identifiersList []Identifier, t Context) []*Node {
249+
if len(identifiersList) == 0 {
250+
return []*Node{}
251+
}
252+
253+
// ES default max number of clauses is set by default to 1024
254+
// https://www.elastic.co/guide/en/elasticsearch/reference/current/search-settings.html
255+
// Group queries in a maximum of half of the max.
256+
// Other filters (time), will be also in the query.
257+
identifiersBatch := batchIdentifiers(identifiersList, maxClauseCount)
258+
259+
nodes := []*Node{}
260+
261+
for _, idList := range identifiersBatch {
262+
identifiersFilter := []*filters.Filter{}
263+
for _, i := range idList {
264+
identifiersFilter = append(identifiersFilter, filters.NewTermStringFilter("ID", string(i)))
265+
}
266+
identifiersORFilter := filters.NewOrFilter(identifiersFilter...)
267+
268+
nodes = append(nodes, b.searchNodes(&TimedSearchQuery{
269+
SearchQuery: filters.SearchQuery{
270+
Filter: identifiersORFilter,
271+
Sort: true,
272+
SortBy: "Revision",
273+
},
274+
TimeFilter: getTimeFilter(t.TimeSlice),
275+
})...)
276+
}
277+
278+
if len(nodes) > 1 && t.TimePoint {
279+
return []*Node{nodes[len(nodes)-1]}
280+
}
281+
282+
return nodes
283+
}
284+
247285
func (b *ElasticSearchBackend) indexEdge(e *Edge) error {
248286
raw, err := edgeToRaw(e)
249287
if err != nil {
@@ -708,3 +746,14 @@ func batchNodes(items []*Node, batchSize int) [][]*Node {
708746
return batches
709747

710748
}
749+
750+
func batchIdentifiers(items []Identifier, batchSize int) [][]Identifier {
751+
batches := make([][]Identifier, 0, (len(items)+batchSize-1)/batchSize)
752+
753+
for batchSize < len(items) {
754+
items, batches = items[batchSize:], append(batches, items[0:batchSize:batchSize])
755+
}
756+
batches = append(batches, items)
757+
758+
return batches
759+
}

graffiti/graph/graph.go

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -111,6 +111,7 @@ type Backend interface {
111111
NodeAdded(n *Node) error
112112
NodeDeleted(n *Node) error
113113
GetNode(i Identifier, at Context) []*Node
114+
GetNodesFromIDs(i []Identifier, at Context) []*Node
114115
GetNodeEdges(n *Node, at Context, m ElementMatcher) []*Edge
115116
GetNodesEdges(n []*Node, at Context, m ElementMatcher) []*Edge
116117

@@ -1186,6 +1187,14 @@ func (g *Graph) GetNode(i Identifier) *Node {
11861187
return nil
11871188
}
11881189

1190+
// GetNodesFromIDs returns a list of nodes from a list of identifiers
1191+
func (g *Graph) GetNodesFromIDs(i []Identifier) []*Node {
1192+
if len(i) == 0 {
1193+
return []*Node{}
1194+
}
1195+
return g.backend.GetNodesFromIDs(i, g.context)
1196+
}
1197+
11891198
// CreateNode returns a new node not bound to a graph
11901199
func CreateNode(i Identifier, m Metadata, t Time, h string, o string) *Node {
11911200
n := &Node{

graffiti/graph/memory.go

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -138,6 +138,17 @@ func (m *MemoryBackend) GetNode(i Identifier, t Context) []*Node {
138138
return nil
139139
}
140140

141+
// GetNodesFromIDs from the graph backend
142+
func (m *MemoryBackend) GetNodesFromIDs(identifiersList []Identifier, t Context) []*Node {
143+
nodes := []*Node{}
144+
for _, i := range identifiersList {
145+
if n, ok := m.nodes[i]; ok {
146+
nodes = append(nodes, n.Node)
147+
}
148+
}
149+
return nodes
150+
}
151+
141152
// GetNodeEdges returns a list of edges of a node
142153
func (m *MemoryBackend) GetNodeEdges(n *Node, t Context, meta ElementMatcher) []*Edge {
143154
edges := []*Edge{}

graffiti/graph/orientdb.go

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -222,6 +222,23 @@ func (o *OrientDBBackend) GetNode(i Identifier, t Context) (nodes []*Node) {
222222
return o.searchNodes(t, query)
223223
}
224224

225+
func (o *OrientDBBackend) GetNodesFromIDs(identifiersList []Identifier, t Context) (nodes []*Node) {
226+
query := orientdb.FilterToExpression(getTimeFilter(t.TimeSlice), nil)
227+
query += fmt.Sprintf(" AND (")
228+
for i, id := range identifiersList {
229+
if i == len(identifiersList)-1 {
230+
query += fmt.Sprintf(" ID = '%s') ORDER BY Revision", id)
231+
} else {
232+
query += fmt.Sprintf(" ID = '%s' OR", id)
233+
}
234+
}
235+
236+
if t.TimePoint {
237+
query += " DESC LIMIT 1"
238+
}
239+
return o.searchNodes(t, query)
240+
}
241+
225242
// GetNodeEdges returns a list of a node edges within time slice
226243
func (o *OrientDBBackend) GetNodeEdges(n *Node, t Context, m ElementMatcher) (edges []*Edge) {
227244
query := orientdb.FilterToExpression(getTimeFilter(t.TimeSlice), nil)

gremlin/traversal/neighbors.go

Lines changed: 188 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,188 @@
1+
package traversal
2+
3+
import (
4+
"github.com/pkg/errors"
5+
6+
"github.com/skydive-project/skydive/graffiti/filters"
7+
"github.com/skydive-project/skydive/graffiti/graph"
8+
"github.com/skydive-project/skydive/graffiti/graph/traversal"
9+
"github.com/skydive-project/skydive/topology"
10+
)
11+
12+
// NeighborsTraversalExtension describes a new extension to enhance the topology
13+
type NeighborsTraversalExtension struct {
14+
NeighborsToken traversal.Token
15+
}
16+
17+
// NeighborsGremlinTraversalStep navigate the graph starting from a node, following edges
18+
// from parent to child and from child to parent.
19+
// It follows the same sintaxis as Ascendants and Descendants step.
20+
// The behaviour is like Ascendants+Descendants combined.
21+
// If only one param is defined, it is used as depth, eg: G.V('id').Neighbors(4)
22+
// If we have an event number of parameters, they are used as edge filter, and
23+
// depth is defaulted to one, eg.: G.V('id').Neighbors("Type","foo","RelationType","bar")
24+
// If we have an odd, but >1, number of parameters, all but the last one are used as
25+
// edge filters and the last one as depth, eg.: G.V('id').Neighbors("Type","foo","RelationType","bar",3)
26+
type NeighborsGremlinTraversalStep struct {
27+
context traversal.GremlinTraversalContext
28+
maxDepth int64
29+
edgeFilter graph.ElementMatcher
30+
// nextStepOnlyIDs is set to true if the next step only needs node IDs and not the whole node info
31+
nextStepOnlyIDs bool
32+
}
33+
34+
// NewNeighborsTraversalExtension returns a new graph traversal extension
35+
func NewNeighborsTraversalExtension() *NeighborsTraversalExtension {
36+
return &NeighborsTraversalExtension{
37+
NeighborsToken: traversalNeighborsToken,
38+
}
39+
}
40+
41+
// ScanIdent returns an associated graph token
42+
func (e *NeighborsTraversalExtension) ScanIdent(s string) (traversal.Token, bool) {
43+
switch s {
44+
case "NEIGHBORS":
45+
return e.NeighborsToken, true
46+
}
47+
return traversal.IDENT, false
48+
}
49+
50+
// ParseStep parses neighbors step
51+
func (e *NeighborsTraversalExtension) ParseStep(t traversal.Token, p traversal.GremlinTraversalContext) (traversal.GremlinTraversalStep, error) {
52+
switch t {
53+
case e.NeighborsToken:
54+
default:
55+
return nil, nil
56+
}
57+
58+
maxDepth := int64(1)
59+
edgeFilter, _ := topology.OwnershipMetadata().Filter()
60+
61+
switch len(p.Params) {
62+
case 0:
63+
default:
64+
i := len(p.Params) / 2 * 2
65+
filter, err := traversal.ParamsToFilter(filters.BoolFilterOp_OR, p.Params[:i]...)
66+
if err != nil {
67+
return nil, errors.Wrap(err, "Neighbors accepts an optional number of key/value tuples and an optional depth")
68+
}
69+
edgeFilter = filter
70+
71+
if i == len(p.Params) {
72+
break
73+
}
74+
75+
fallthrough
76+
case 1:
77+
depth, ok := p.Params[len(p.Params)-1].(int64)
78+
if !ok {
79+
return nil, errors.New("Neighbors last argument must be the maximum depth specified as an integer")
80+
}
81+
maxDepth = depth
82+
}
83+
84+
return &NeighborsGremlinTraversalStep{context: p, maxDepth: maxDepth, edgeFilter: graph.NewElementFilter(edgeFilter)}, nil
85+
}
86+
87+
// getNeighbors given a list of nodes, get its neighbors nodes for "maxDepth" depth relationships.
88+
// Edges between nodes must fulfill "edgeFilter" filter.
89+
// Nodes passed to this function will always be in the response.
90+
func (d *NeighborsGremlinTraversalStep) getNeighbors(g *graph.Graph, nodes []*graph.Node) []*graph.Node {
91+
// visitedNodes store neighors and avoid visiting twice the same node
92+
visitedNodes := map[graph.Identifier]interface{}{}
93+
94+
// currentDepthNodesIDs slice with the nodes being processed in each depth.
95+
// We use "empty" while procesing the neighbors nodes to avoid extra calls to the backend.
96+
var currentDepthNodesIDs []graph.Identifier
97+
// nextDepthNodes slice were next depth nodes are being stored.
98+
// Initializated with the list of origin nodes where it should start from.
99+
nextDepthNodesIDs := make([]graph.Identifier, 0, len(nodes))
100+
101+
// Mark origin nodes as already visited
102+
// Neighbor step will return also the origin nodes
103+
for _, n := range nodes {
104+
visitedNodes[n.ID] = struct{}{}
105+
nextDepthNodesIDs = append(nextDepthNodesIDs, n.ID)
106+
}
107+
108+
// DFS
109+
// BFS must not be used because could lead to ignore some servers in this case:
110+
// A -> B
111+
// B -> C
112+
// C -> D
113+
// A -> C
114+
// With depth=2, BFS will return A,B,C (C is visited in A->B->C, si ignored in A->C->D)
115+
// DFS will return, the correct, A,B,C,D
116+
for i := 0; i < int(d.maxDepth); i++ {
117+
// Copy values from nextDepthNodes to currentDepthNodes
118+
currentDepthNodesIDs = make([]graph.Identifier, len(nextDepthNodesIDs))
119+
copy(currentDepthNodesIDs, nextDepthNodesIDs)
120+
121+
nextDepthNodesIDs = nextDepthNodesIDs[:0] // Clean slice, keeping capacity
122+
// Get all edges for the list of nodes, filtered by edgeFilter
123+
// Convert the list of node ids to a list of nodes
124+
125+
currentDepthNodes := make([]*graph.Node, 0, len(currentDepthNodesIDs))
126+
for _, nID := range currentDepthNodesIDs {
127+
currentDepthNodes = append(currentDepthNodes, graph.CreateNode(nID, graph.Metadata{}, graph.Unix(0, 0), "", ""))
128+
}
129+
edges := g.GetNodesEdges(currentDepthNodes, d.edgeFilter)
130+
131+
for _, e := range edges {
132+
// Get nodeID of the other side of the edge
133+
// Store neighbors
134+
// We don't know in which side of the edge are the neighbors, so, add both sides if not already visited
135+
_, okParent := visitedNodes[e.Parent]
136+
if !okParent {
137+
visitedNodes[e.Parent] = struct{}{}
138+
// Do not walk nodes already processed
139+
nextDepthNodesIDs = append(nextDepthNodesIDs, e.Parent)
140+
}
141+
_, okChild := visitedNodes[e.Child]
142+
if !okChild {
143+
visitedNodes[e.Child] = struct{}{}
144+
nextDepthNodesIDs = append(nextDepthNodesIDs, e.Child)
145+
}
146+
}
147+
}
148+
149+
// Return "empty" nodes (just with the ID) if the next step only need that info
150+
if d.nextStepOnlyIDs {
151+
ret := make([]*graph.Node, 0, len(visitedNodes))
152+
for nID := range visitedNodes {
153+
ret = append(ret, graph.CreateNode(nID, graph.Metadata{}, graph.Unix(0, 0), "", ""))
154+
}
155+
return ret
156+
}
157+
158+
// Get concurrentl all nodes for the list of neighbors ids
159+
nodesIDs := make([]graph.Identifier, 0, len(visitedNodes))
160+
for n := range visitedNodes {
161+
nodesIDs = append(nodesIDs, n)
162+
}
163+
164+
return g.GetNodesFromIDs(nodesIDs)
165+
}
166+
167+
// Exec Neighbors step
168+
func (d *NeighborsGremlinTraversalStep) Exec(last traversal.GraphTraversalStep) (traversal.GraphTraversalStep, error) {
169+
switch tv := last.(type) {
170+
case *traversal.GraphTraversalV:
171+
tv.GraphTraversal.RLock()
172+
neighbors := d.getNeighbors(tv.GraphTraversal.Graph, tv.GetNodes())
173+
tv.GraphTraversal.RUnlock()
174+
175+
return traversal.NewGraphTraversalV(tv.GraphTraversal, neighbors), nil
176+
}
177+
return nil, traversal.ErrExecutionError
178+
}
179+
180+
// Reduce Neighbors step
181+
func (d *NeighborsGremlinTraversalStep) Reduce(next traversal.GremlinTraversalStep) (traversal.GremlinTraversalStep, error) {
182+
return next, nil
183+
}
184+
185+
// Context Neighbors step
186+
func (d *NeighborsGremlinTraversalStep) Context() *traversal.GremlinTraversalContext {
187+
return &d.context
188+
}

0 commit comments

Comments
 (0)