Skip to content

Commit 647609c

Browse files
committed
graph: fix remaining element in datastore after restart
1 parent 41a8e55 commit 647609c

File tree

10 files changed

+172
-82
lines changed

10 files changed

+172
-82
lines changed

analyzer/storage.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -70,7 +70,7 @@ func newGraphBackendFromConfig(etcdClient *etcd.Client) (graph.Backend, error) {
7070
database := config.GetString(configPath + ".database")
7171
username := config.GetString(configPath + ".username")
7272
password := config.GetString(configPath + ".password")
73-
return graph.NewOrientDBBackend(addr, database, username, password)
73+
return graph.NewOrientDBBackend(addr, database, username, password, etcdClient)
7474
default:
7575
return nil, fmt.Errorf("Topology backend driver '%s' not supported", driver)
7676
}

flow/storage/orientdb/orientdb.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -270,7 +270,7 @@ func (c *Storage) SearchRawPackets(fsq filters.SearchQuery, packetFilter *filter
270270
}
271271
}
272272

273-
result, err := c.client.Search(sql)
273+
result, err := c.client.SQL(sql)
274274
if err != nil {
275275
return nil, err
276276
}
@@ -316,7 +316,7 @@ func (c *Storage) SearchMetrics(fsq filters.SearchQuery, metricFilter *filters.F
316316
}
317317
}
318318

319-
result, err := c.client.Search(sql)
319+
result, err := c.client.SQL(sql)
320320
if err != nil {
321321
return nil, err
322322
}

graffiti/graph/elasticsearch.go

Lines changed: 26 additions & 50 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,6 @@ package graph
1919

2020
import (
2121
"encoding/json"
22-
"errors"
2322
"fmt"
2423

2524
"github.com/olivere/elastic"
@@ -113,12 +112,12 @@ var topologyArchiveIndex = es.Index{
113112
RollIndex: true,
114113
}
115114

116-
// ElasticSearchBackend describes a presisent backend based on ElasticSearch
115+
// ElasticSearchBackend describes a persistent backend based on ElasticSearch
117116
type ElasticSearchBackend struct {
118117
Backend
119-
client es.ClientInterface
120-
prevRevision map[Identifier]*rawData
121-
electionService common.MasterElectionService
118+
client es.ClientInterface
119+
prevRevision map[Identifier]*rawData
120+
election common.MasterElection
122121
}
123122

124123
// TimedSearchQuery describes a search query within a time slice and metadata filters
@@ -525,66 +524,43 @@ func (b *ElasticSearchBackend) IsHistorySupported() bool {
525524
return true
526525
}
527526

528-
func (b *ElasticSearchBackend) DelOriginGraph(origin string) error {
529-
tsq := &TimedSearchQuery{
530-
TimeFilter: filters.NewNullFilter("DeletedAt"),
531-
SearchQuery: filters.SearchQuery{
532-
Filter: filters.NewTermStringFilter("Origin", origin),
533-
},
534-
}
527+
func (b *ElasticSearchBackend) flushGraph() error {
528+
logging.GetLogger().Info("Flush graph elements")
535529

536-
var fails bool
537-
for _, e := range b.searchEdges(tsq) {
538-
if err := b.EdgeDeleted(e); err != nil {
539-
fails = true
540-
}
541-
}
542-
for _, n := range b.searchNodes(tsq) {
543-
if err := b.NodeDeleted(n); err != nil {
544-
fails = true
545-
}
546-
}
530+
query := es.FormatFilter(filters.NewNullFilter("DeletedAt"), "")
547531

548-
if fails {
549-
return errors.New("Error while deleting the graph")
550-
}
532+
script := elastic.NewScript("ctx._source.DeletedAt = params.now; ctx._source.ArchivedAt = params.now;")
533+
script.Lang("painless")
534+
script.Params(map[string]interface{}{
535+
"now": TimeUTC().Unix(),
536+
})
551537

552-
return nil
538+
return b.client.UpdateByScript("graph_element", query, script, topologyLiveIndex.Alias(), topologyArchiveIndex.IndexWildcard())
553539
}
554540

555-
func (b *ElasticSearchBackend) DelGraph() error {
556-
tsq := &TimedSearchQuery{
557-
TimeFilter: filters.NewNullFilter("DeletedAt"),
558-
}
559-
560-
var fails bool
561-
for _, e := range b.searchEdges(tsq) {
562-
if err := b.EdgeDeleted(e); err != nil {
563-
fails = true
541+
// OnStarted implements storage client listener interface
542+
func (b *ElasticSearchBackend) OnStarted() {
543+
if b.election != nil && b.election.IsMaster() {
544+
if err := b.flushGraph(); err != nil {
545+
logging.GetLogger().Errorf("Unable to flush graph element: %s", err)
564546
}
565547
}
566-
for _, n := range b.searchNodes(tsq) {
567-
if err := b.NodeDeleted(n); err != nil {
568-
fails = true
569-
}
570-
}
571-
572-
if fails {
573-
return errors.New("Error while deleting the graph")
574-
}
575-
576-
return nil
577548
}
578549

579550
// NewElasticSearchBackendFromClient creates a new graph backend using the given elasticsearch
580551
// client connection
581552
func NewElasticSearchBackendFromClient(client es.ClientInterface, electionService common.MasterElectionService) (*ElasticSearchBackend, error) {
582553
c := &ElasticSearchBackend{
583-
client: client,
584-
prevRevision: make(map[Identifier]*rawData),
585-
electionService: electionService,
554+
client: client,
555+
prevRevision: make(map[Identifier]*rawData),
556+
}
557+
558+
if electionService != nil {
559+
c.election = electionService.NewElection("es-graph-flush")
560+
c.election.StartAndWait()
586561
}
587562

563+
client.AddEventListener(c)
588564
client.Start()
589565

590566
return c, nil

graffiti/graph/elasticsearch_test.go

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@ import (
2727

2828
"github.com/skydive-project/skydive/common"
2929
"github.com/skydive-project/skydive/filters"
30+
"github.com/skydive-project/skydive/storage"
3031
es "github.com/skydive-project/skydive/storage/elasticsearch"
3132
)
3233

@@ -85,12 +86,17 @@ func (f *fakeESClient) Search(typ string, query elastic.Query, fsq filters.Searc
8586
}
8687
func (f *fakeESClient) Start() {
8788
}
89+
func (f *fakeESClient) AddEventListener(l storage.EventListener) {
90+
}
91+
func (f *fakeESClient) UpdateByScript(typ string, query elastic.Query, script *elastic.Script, indices ...string) error {
92+
return nil
93+
}
8894

8995
func newElasticsearchGraph(t *testing.T) (*Graph, *fakeESClient) {
9096
client := &fakeESClient{
9197
indices: make(map[string]*fakeESIndex),
9298
}
93-
b, err := NewElasticSearchBackendFromClient(client)
99+
b, err := NewElasticSearchBackendFromClient(client, nil)
94100
client.searchResult.Hits = &elastic.SearchHits{}
95101

96102
if err != nil {

graffiti/graph/orientdb.go

Lines changed: 47 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -21,8 +21,8 @@ import (
2121
"encoding/json"
2222
"fmt"
2323
"strings"
24-
"time"
2524

25+
"github.com/skydive-project/skydive/common"
2626
"github.com/skydive-project/skydive/filters"
2727
"github.com/skydive-project/skydive/logging"
2828
"github.com/skydive-project/skydive/storage/orientdb"
@@ -31,7 +31,8 @@ import (
3131
// OrientDBBackend describes an OrientDB backend
3232
type OrientDBBackend struct {
3333
Backend
34-
client orientdb.ClientInterface
34+
client orientdb.ClientInterface
35+
election common.MasterElection
3536
}
3637

3738
type eventTime struct {
@@ -90,7 +91,7 @@ func (o *OrientDBBackend) updateTimes(e string, id string, events ...eventTime)
9091
attrs = append(attrs, fmt.Sprintf("%s = %d", event.name, event.t.Unix()))
9192
}
9293
query := fmt.Sprintf("UPDATE %s SET %s WHERE DeletedAt IS NULL AND ArchivedAt IS NULL AND ID = '%s'", e, strings.Join(attrs, ", "), id)
93-
result, err := o.client.Search(query)
94+
result, err := o.client.SQL(query)
9495
if err != nil {
9596
return fmt.Errorf("Error while deleting %s: %s", id, err)
9697
}
@@ -145,7 +146,7 @@ func (o *OrientDBBackend) searchNodes(t Context, where string) []*Node {
145146
query += " ORDER BY UpdatedAt"
146147
}
147148

148-
result, err := o.client.Search(query)
149+
result, err := o.client.SQL(query)
149150
if err != nil {
150151
logging.GetLogger().Errorf("Error while retrieving nodes: %s", err)
151152
return nil
@@ -172,7 +173,7 @@ func (o *OrientDBBackend) searchEdges(t Context, where string) []*Edge {
172173
query += " ORDER BY UpdatedAt"
173174
}
174175

175-
result, err := o.client.Search(query)
176+
result, err := o.client.SQL(query)
176177
if err != nil {
177178
logging.GetLogger().Errorf("Error while retrieving edges: %s", err)
178179
return nil
@@ -320,7 +321,44 @@ func (o *OrientDBBackend) IsHistorySupported() bool {
320321
return true
321322
}
322323

323-
func newOrientDBBackend(client orientdb.ClientInterface) (*OrientDBBackend, error) {
324+
func (o *OrientDBBackend) flushGraph() error {
325+
logging.GetLogger().Info("Flush graph elements")
326+
327+
now := TimeUTC().Unix()
328+
329+
query := fmt.Sprintf("UPDATE Node SET DeletedAt = %d, ArchivedAt = %d WHERE DeletedAt IS NULL", now, now)
330+
if _, err := o.client.SQL(query); err != nil {
331+
return fmt.Errorf("Error while flushing graph: %s", err)
332+
}
333+
334+
query = fmt.Sprintf("UPDATE Edge SET DeletedAt = %d, ArchivedAt = %d WHERE DeletedAt IS NULL", now, now)
335+
if _, err := o.client.SQL(query); err != nil {
336+
return fmt.Errorf("Error while flushing graph: %s", err)
337+
}
338+
339+
return nil
340+
}
341+
342+
// OnStarted implements storage client listener interface
343+
func (o *OrientDBBackend) OnStarted() {
344+
if o.election != nil && o.election.IsMaster() {
345+
o.flushGraph()
346+
}
347+
}
348+
349+
func newOrientDBBackend(client orientdb.ClientInterface, electionService common.MasterElectionService) (*OrientDBBackend, error) {
350+
o := &OrientDBBackend{
351+
client: client,
352+
}
353+
354+
if electionService != nil {
355+
o.election = electionService.NewElection("orientdb-graph-flush")
356+
o.election.StartAndWait()
357+
}
358+
359+
client.AddEventListener(o)
360+
client.Connect()
361+
324362
if _, err := client.GetDocumentClass("Node"); err != nil {
325363
class := orientdb.ClassDefinition{
326364
Name: "Node",
@@ -371,18 +409,16 @@ func newOrientDBBackend(client orientdb.ClientInterface) (*OrientDBBackend, erro
371409
}
372410
}
373411

374-
return &OrientDBBackend{
375-
client: client,
376-
}, nil
412+
return o, nil
377413
}
378414

379415
// NewOrientDBBackend creates a new graph backend and
380416
// connect to an OrientDB instance
381-
func NewOrientDBBackend(addr string, database string, username string, password string) (*OrientDBBackend, error) {
417+
func NewOrientDBBackend(addr string, database string, username string, password string, electionService common.MasterElectionService) (*OrientDBBackend, error) {
382418
client, err := orientdb.NewClient(addr, database, username, password)
383419
if err != nil {
384420
return nil, err
385421
}
386422

387-
return newOrientDBBackend(client)
423+
return newOrientDBBackend(client, electionService)
388424
}

graffiti/graph/orientdb_test.go

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@ import (
2828

2929
"github.com/skydive-project/skydive/common"
3030
"github.com/skydive-project/skydive/filters"
31+
"github.com/skydive-project/skydive/storage"
3132
"github.com/skydive-project/skydive/storage/orientdb"
3233
)
3334

@@ -108,11 +109,12 @@ func (f *fakeOrientDBClient) Query(obj string, query *filters.SearchQuery) (*ori
108109
func (f *fakeOrientDBClient) Connect() error {
109110
return nil
110111
}
112+
func (f *fakeOrientDBClient) AddEventListener(l storage.EventListener) {
113+
}
111114

112115
func newOrientDBGraph(t *testing.T) (*Graph, *fakeOrientDBClient) {
113116
client := &fakeOrientDBClient{}
114-
b, err := newOrientDBBackend(client)
115-
117+
b, err := newOrientDBBackend(client, nil)
116118
if err != nil {
117119
t.Error(err)
118120
}

storage/elasticsearch/client.go

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@ import (
3535
"github.com/skydive-project/skydive/common"
3636
"github.com/skydive-project/skydive/filters"
3737
"github.com/skydive-project/skydive/logging"
38+
"github.com/skydive-project/skydive/storage"
3839
)
3940

4041
const (
@@ -61,6 +62,8 @@ type ClientInterface interface {
6162
BulkDelete(index Index, id string) error
6263
Search(typ string, query elastic.Query, pagination filters.SearchQuery, indices ...string) (*elastic.SearchResult, error)
6364
Start()
65+
AddEventListener(listener storage.EventListener)
66+
UpdateByScript(typ string, query elastic.Query, script *elastic.Script, indices ...string) error
6467
}
6568

6669
// Index defines a Client Index
@@ -84,6 +87,7 @@ type Client struct {
8487
cfg Config
8588
indices map[string]Index
8689
rollService *rollIndexService
90+
listeners []storage.EventListener
8791
}
8892

8993
var (
@@ -220,6 +224,12 @@ func (c *Client) start() error {
220224

221225
logging.GetLogger().Infof("client started for %s", strings.Join(aliases, ", "))
222226

227+
c.RLock()
228+
for _, l := range c.listeners {
229+
l.OnStarted()
230+
}
231+
c.RUnlock()
232+
223233
return nil
224234
}
225235

@@ -337,6 +347,14 @@ func (c *Client) BulkDelete(index Index, id string) error {
337347
return nil
338348
}
339349

350+
// UpdateByScript updates the document using the given script
351+
func (c *Client) UpdateByScript(typ string, query elastic.Query, script *elastic.Script, indices ...string) error {
352+
if _, err := c.esClient.UpdateByQuery(indices...).Type(typ).Query(query).Script(script).Do(context.Background()); err != nil {
353+
return err
354+
}
355+
return nil
356+
}
357+
340358
// Search an object
341359
func (c *Client) Search(typ string, query elastic.Query, opts filters.SearchQuery, indices ...string) (*elastic.SearchResult, error) {
342360
searchQuery := c.esClient.
@@ -392,6 +410,12 @@ func (c *Client) Stop() {
392410

393411
c.esClient.Stop()
394412
}
413+
414+
c.RLock()
415+
for _, l := range c.listeners {
416+
l.OnStarted()
417+
}
418+
c.RUnlock()
395419
}
396420

397421
// Started is the client already started ?
@@ -417,6 +441,13 @@ func (c *Client) GetClient() *elastic.Client {
417441
return c.esClient
418442
}
419443

444+
// AddEventListener add event listener
445+
func (c *Client) AddEventListener(listener storage.EventListener) {
446+
c.Lock()
447+
c.listeners = append(c.listeners, listener)
448+
c.Unlock()
449+
}
450+
420451
// NewClient creates a new ElasticSearch client based on configuration
421452
func NewClient(indices []Index, cfg Config, electionService common.MasterElectionService) (*Client, error) {
422453
url, err := urlFromHost(cfg.ElasticHost)

0 commit comments

Comments
 (0)