Skip to content

Commit e455f7a

Browse files
committed
Improve subgraph performance with persistent backends
Do just one call to get all the edges for the given nodes, instead of one call for each node. This new call has been added as a new method to Graph and Backends. GetNodesEdges, return the list with all edges for a list of nodes. Batching is used to avoid hitting the max number of clauses set by ES (is set to the default value of 512).
1 parent d1b35f8 commit e455f7a

File tree

6 files changed

+120
-8
lines changed

6 files changed

+120
-8
lines changed

graffiti/graph/cachedbackend.go

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -110,6 +110,17 @@ func (c *CachedBackend) GetNodeEdges(n *Node, t Context, m ElementMatcher) (edge
110110
return c.persistent.GetNodeEdges(n, t, m)
111111
}
112112

113+
// GetNodesEdges return the list of all edges for a list of nodes within time slice, matching metadata
114+
func (c *CachedBackend) GetNodesEdges(n []*Node, t Context, m ElementMatcher) (edges []*Edge) {
115+
mode := c.cacheMode.Load()
116+
117+
if t.TimeSlice == nil || mode == CacheOnlyMode || c.persistent == nil {
118+
return c.memory.GetNodesEdges(n, t, m)
119+
}
120+
121+
return c.persistent.GetNodesEdges(n, t, m)
122+
}
123+
113124
// EdgeAdded add an edge in the cache
114125
func (c *CachedBackend) EdgeAdded(e *Edge) error {
115126
mode := c.cacheMode.Load()

graffiti/graph/elasticsearch.go

Lines changed: 61 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -87,6 +87,8 @@ const graphElementMapping = `
8787
const (
8888
nodeType = "node"
8989
edgeType = "edge"
90+
// maxClauseCount limit the number of clauses in one query to ES
91+
maxClauseCount = 512
9092
)
9193

9294
// ElasticSearchBackend describes a persistent backend based on ElasticSearch
@@ -506,6 +508,53 @@ func (b *ElasticSearchBackend) GetNodeEdges(n *Node, t Context, m ElementMatcher
506508
return
507509
}
508510

511+
// GetNodesEdges return the list of all edges for a list of nodes within time slice
512+
func (b *ElasticSearchBackend) GetNodesEdges(nodeList []*Node, t Context, m ElementMatcher) (edges []*Edge) {
513+
if len(nodeList) == 0 {
514+
return []*Edge{}
515+
}
516+
517+
// See comment at GetNodesFromIDs
518+
// As we are adding two operations per item, make small batches
519+
nodesBatch := batchNodes(nodeList, maxClauseCount/2)
520+
521+
for _, nList := range nodesBatch {
522+
var filter *filters.Filter
523+
if m != nil {
524+
f, err := m.Filter()
525+
if err != nil {
526+
return []*Edge{}
527+
}
528+
filter = f
529+
}
530+
531+
var searchQuery filters.SearchQuery
532+
if !t.TimePoint {
533+
searchQuery = filters.SearchQuery{Sort: true, SortBy: "UpdatedAt"}
534+
}
535+
536+
nodesFilter := []*filters.Filter{}
537+
for _, n := range nList {
538+
nodesFilter = append(nodesFilter, filters.NewTermStringFilter("Parent", string(n.ID)))
539+
nodesFilter = append(nodesFilter, filters.NewTermStringFilter("Child", string(n.ID)))
540+
}
541+
searchQuery.Filter = filters.NewOrFilter(nodesFilter...)
542+
543+
edges = append(edges, b.searchEdges(&TimedSearchQuery{
544+
SearchQuery: searchQuery,
545+
TimeFilter: getTimeFilter(t.TimeSlice),
546+
ElementFilter: filter,
547+
})...)
548+
549+
}
550+
551+
if len(edges) > 1 && t.TimePoint {
552+
edges = dedupEdges(edges)
553+
}
554+
555+
return
556+
}
557+
509558
// IsHistorySupported returns that this backend does support history
510559
func (b *ElasticSearchBackend) IsHistorySupported() bool {
511560
return true
@@ -647,3 +696,15 @@ func NewElasticSearchBackendFromConfig(cfg es.Config, extraDynamicTemplates map[
647696

648697
return newElasticSearchBackendFromClient(client, cfg.IndexPrefix, liveIndex, archiveIndex, logger), nil
649698
}
699+
700+
func batchNodes(items []*Node, batchSize int) [][]*Node {
701+
batches := make([][]*Node, 0, (len(items)+batchSize-1)/batchSize)
702+
703+
for batchSize < len(items) {
704+
items, batches = items[batchSize:], append(batches, items[0:batchSize:batchSize])
705+
}
706+
batches = append(batches, items)
707+
708+
return batches
709+
710+
}

graffiti/graph/graph.go

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -112,6 +112,7 @@ type Backend interface {
112112
NodeDeleted(n *Node) error
113113
GetNode(i Identifier, at Context) []*Node
114114
GetNodeEdges(n *Node, at Context, m ElementMatcher) []*Edge
115+
GetNodesEdges(n []*Node, at Context, m ElementMatcher) []*Edge
115116

116117
EdgeAdded(e *Edge) error
117118
EdgeDeleted(e *Edge) error
@@ -1365,6 +1366,14 @@ func (g *Graph) GetNodeEdges(n *Node, m ElementMatcher) []*Edge {
13651366
return g.backend.GetNodeEdges(n, g.context, m)
13661367
}
13671368

1369+
// GetNodesEdges returns the list with all edges for a list of nodes
1370+
func (g *Graph) GetNodesEdges(n []*Node, m ElementMatcher) []*Edge {
1371+
if len(n) == 0 {
1372+
return []*Edge{}
1373+
}
1374+
return g.backend.GetNodesEdges(n, g.context, m)
1375+
}
1376+
13681377
func (g *Graph) String() string {
13691378
j, _ := json.Marshal(g)
13701379
return string(j)

graffiti/graph/memory.go

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -153,6 +153,22 @@ func (m *MemoryBackend) GetNodeEdges(n *Node, t Context, meta ElementMatcher) []
153153
return edges
154154
}
155155

156+
// GetNodesEdges returns the list of edges for a list of nodes
157+
func (m *MemoryBackend) GetNodesEdges(nodeList []*Node, t Context, meta ElementMatcher) []*Edge {
158+
edges := []*Edge{}
159+
for _, n := range nodeList {
160+
if n, ok := m.nodes[n.ID]; ok {
161+
for _, e := range n.edges {
162+
if e.MatchMetadata(meta) {
163+
edges = append(edges, e.Edge)
164+
}
165+
}
166+
}
167+
}
168+
169+
return edges
170+
}
171+
156172
// EdgeDeleted in the graph backend
157173
func (m *MemoryBackend) EdgeDeleted(e *Edge) error {
158174
if _, ok := m.edges[e.ID]; !ok {

graffiti/graph/orientdb.go

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -232,6 +232,23 @@ func (o *OrientDBBackend) GetNodeEdges(n *Node, t Context, m ElementMatcher) (ed
232232
return o.searchEdges(t, query)
233233
}
234234

235+
// GetNodesEdges returns a list of a node edges within time slice
236+
func (o *OrientDBBackend) GetNodesEdges(nodeList []*Node, t Context, m ElementMatcher) (edges []*Edge) {
237+
query := orientdb.FilterToExpression(getTimeFilter(t.TimeSlice), nil)
238+
query += fmt.Sprintf(" AND (")
239+
for i, n := range nodeList {
240+
if i == len(nodeList)-1 {
241+
query += fmt.Sprintf(" Parent = '%s' OR Child = '%s')", n.ID, n.ID)
242+
} else {
243+
query += fmt.Sprintf(" Parent = '%s' OR Child = '%s' OR", n.ID, n.ID)
244+
}
245+
}
246+
if matcherQuery := matcherToOrientDBSelectString(m); matcherQuery != "" {
247+
query += " AND " + matcherQuery
248+
}
249+
return o.searchEdges(t, query)
250+
}
251+
235252
func (o *OrientDBBackend) createEdge(e *Edge) error {
236253
fromQuery := fmt.Sprintf("SELECT FROM Node WHERE DeletedAt IS NULL AND ArchivedAt IS NULL AND ID = '%s'", e.Parent)
237254
toQuery := fmt.Sprintf("SELECT FROM Node WHERE DeletedAt IS NULL AND ArchivedAt IS NULL AND ID = '%s'", e.Child)

graffiti/graph/traversal/traversal.go

Lines changed: 6 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1415,14 +1415,12 @@ func (tv *GraphTraversalV) SubGraph(ctx StepContext, s ...interface{}) *GraphTra
14151415

14161416
// then insert edges, ignore edge insert error since one of the linked node couldn't be part
14171417
// of the SubGraph
1418-
for _, n := range tv.nodes {
1419-
edges := tv.GraphTraversal.Graph.GetNodeEdges(n, nil)
1420-
for _, e := range edges {
1421-
switch err := memory.EdgeAdded(e); err {
1422-
case nil, graph.ErrParentNotFound, graph.ErrChildNotFound, graph.ErrEdgeConflict:
1423-
default:
1424-
return &GraphTraversal{error: fmt.Errorf("Error while adding edge to SubGraph: %s", err)}
1425-
}
1418+
edges := tv.GraphTraversal.Graph.GetNodesEdges(tv.nodes, nil)
1419+
for _, e := range edges {
1420+
switch err := memory.EdgeAdded(e); err {
1421+
case nil, graph.ErrParentNotFound, graph.ErrChildNotFound, graph.ErrEdgeConflict:
1422+
default:
1423+
return &GraphTraversal{error: fmt.Errorf("Error while adding edge to SubGraph: %s", err)}
14261424
}
14271425
}
14281426

0 commit comments

Comments
 (0)