This repository was archived by the owner on Oct 7, 2022. It is now read-only.

Commit b65e9b4

Author: João Filipe Moura

Merge pull request #30 from inloco/hotfix/workers-locking

Hotfix/workers locking

2 parents f002940 + 71664c2

7 files changed (+68 -13 lines)

README.md

Lines changed: 3 additions & 0 deletions
@@ -113,6 +113,9 @@ The exported metrics are:
 - `kafka_consumer_records_consumed_successfully`: number of records consumed successfully by this instance.
 - `kafka_consumer_endpoint_latency_histogram_seconds`: endpoint latency in seconds (insertion to elasticsearch).
 - `kafka_consumer_buffer_full`: indicates whether the app buffer is full (meaning that elasticsearch is not able to keep up with the topic volume).
+- `elasticsearch_events_retried`: number of events that had to be retried when being sent to Elasticsearch.
+- `elasticsearch_document_already_exists`: number of events that failed to insert because they already existed in Elasticsearch.
+- `elasticsearch_bad_request`: number of requests that failed due to malformed events.
 
 ## Development
 
src/elasticsearch/elasticsearch.go

Lines changed: 18 additions & 4 deletions
@@ -5,6 +5,8 @@ import (
 	"crypto/tls"
 	"fmt"
 
+	"github.com/inloco/kafka-elasticsearch-injector/src/metrics"
+
 	"github.com/pkg/errors"
 
 	"net/http"
@@ -29,8 +31,9 @@ type RecordDatabase interface {
 }
 
 type recordDatabase struct {
-	logger log.Logger
-	config Config
+	metricsPublisher metrics.MetricsPublisher
+	logger           log.Logger
+	config           Config
 }
 
 func (d recordDatabase) GetClient() *elastic.Client {
@@ -76,6 +79,7 @@ func (d recordDatabase) Insert(records []*models.ElasticRecord) (*InsertResponse
 	timeout := d.config.BulkTimeout
 	ctx, cancel := context.WithTimeout(context.Background(), timeout)
 	defer cancel()
+
 	res, err := bulkRequest.Do(ctx)
 	if err == elastic.ErrNoClient || errors.Cause(err) == elastic.ErrNoClient {
 		_ = level.Warn(d.logger).Log("message", "no elasticsearch node available", "err", err)
@@ -104,7 +108,12 @@ func (d recordDatabase) Insert(records []*models.ElasticRecord) (*InsertResponse
 		recordMap[rec.ID] = rec
 	}
 	for _, f := range failed {
+		if f.Status == http.StatusBadRequest {
+			d.metricsPublisher.ElasticsearchBadRequests(1)
+			continue
+		}
 		if f.Status == http.StatusConflict {
+			d.metricsPublisher.ElasticsearchConflicts(1)
 			continue
 		}
 		retry = append(retry, recordMap[f.Id])
@@ -117,6 +126,7 @@ func (d recordDatabase) Insert(records []*models.ElasticRecord) (*InsertResponse
 			level.Warn(d.logger).Log("message", "insert failed: elasticsearch is overloaded", "retry_count", len(retry))
 		}
 	}
+	d.metricsPublisher.ElasticsearchRetries(len(retry))
 	return &InsertResponse{alreadyExistsIds, retry, overloaded}, nil
 }
 
@@ -145,6 +155,10 @@ func (d recordDatabase) buildBulkRequest(records []*models.ElasticRecord) (*elas
 	return bulkRequest, nil
 }
 
-func NewDatabase(logger log.Logger, config Config) RecordDatabase {
-	return recordDatabase{logger: logger, config: config}
+func NewDatabase(logger log.Logger, config Config, metrics metrics.MetricsPublisher) RecordDatabase {
+	return recordDatabase{
+		metricsPublisher: metrics,
+		logger:           logger,
+		config:           config,
+	}
 }
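Outside the diff itself, the per-item handling added to Insert reduces to routing each failed bulk item by HTTP status: 400s are counted and dropped (retrying a malformed event cannot succeed), 409s are counted as conflicts and skipped, and everything else is queued for retry. A self-contained sketch, where failedItem and counters are hypothetical stand-ins for elastic's bulk-response items and the MetricsPublisher:

package main

import (
	"fmt"
	"net/http"
)

// failedItem and counters are hypothetical stand-ins for
// elastic.BulkResponseItem and the MetricsPublisher used in the diff.
type failedItem struct {
	Id     string
	Status int
}

type counters struct{ badRequests, conflicts, retries int }

// classify mirrors the loop added to Insert: count 400s and 409s,
// retry everything else.
func classify(failed []failedItem, c *counters) (retry []string) {
	for _, f := range failed {
		switch f.Status {
		case http.StatusBadRequest: // malformed event, drop it
			c.badRequests++
		case http.StatusConflict: // document already exists, skip it
			c.conflicts++
		default: // transient failure, retry it
			retry = append(retry, f.Id)
		}
	}
	c.retries += len(retry)
	return retry
}

func main() {
	c := &counters{}
	retry := classify([]failedItem{{"a", 400}, {"b", 409}, {"c", 503}}, c)
	fmt.Println(retry, *c) // [c] {1 1 1}
}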

src/elasticsearch/elasticsearch_test.go

Lines changed: 3 additions & 1 deletion
@@ -15,6 +15,7 @@ import (
 
 	"github.com/inloco/kafka-elasticsearch-injector/src/kafka/fixtures"
 	"github.com/inloco/kafka-elasticsearch-injector/src/logger_builder"
+	"github.com/inloco/kafka-elasticsearch-injector/src/metrics"
 	"github.com/inloco/kafka-elasticsearch-injector/src/models"
 	"github.com/olivere/elastic/v7"
 	"github.com/stretchr/testify/assert"
@@ -28,7 +29,8 @@ var config = Config{
 	BlacklistedColumns: []string{},
 	BulkTimeout:        10 * time.Second,
 }
-var db = NewDatabase(logger, config)
+
+var db = NewDatabase(logger, config, metrics.NewMetricsPublisher())
 var template = `
 {
 	"template": "my-topic-*",

src/injector/service.go

Lines changed: 1 addition & 1 deletion
@@ -28,7 +28,7 @@ func NewService(logger log.Logger, metrics metrics.MetricsPublisher) Service {
 	return instrumentingMiddleware{
 		metricsPublisher: metrics,
 		next: basicService{
-			store.NewStore(logger),
+			store.NewStore(logger, metrics),
 		},
 	}
 }

src/injector/store/store.go

Lines changed: 4 additions & 2 deletions
@@ -5,6 +5,7 @@ import (
 
 	"github.com/go-kit/kit/log"
 	"github.com/inloco/kafka-elasticsearch-injector/src/elasticsearch"
+	"github.com/inloco/kafka-elasticsearch-injector/src/metrics"
 	"github.com/inloco/kafka-elasticsearch-injector/src/models"
 )
 
@@ -24,6 +25,7 @@ func (s basicStore) Insert(records []*models.Record) error {
 	if err != nil {
 		return err
 	}
+
 	for {
 		res, err := s.db.Insert(elasticRecords)
 		if err != nil {
@@ -45,10 +47,10 @@ func (s basicStore) ReadinessCheck() bool {
 	return s.db.ReadinessCheck()
 }
 
-func NewStore(logger log.Logger) Store {
+func NewStore(logger log.Logger, metricsPublisher metrics.MetricsPublisher) Store {
 	config := elasticsearch.NewConfig()
 	return basicStore{
-		db:      elasticsearch.NewDatabase(logger, config),
+		db:      elasticsearch.NewDatabase(logger, config, metricsPublisher),
 		codec:   elasticsearch.NewCodec(logger, config),
 		backoff: config.Backoff,
 	}

src/kafka/consumer_test.go

Lines changed: 5 additions & 4 deletions
@@ -51,13 +51,14 @@ func (be *fixtureEndpoints) Insert() endpoint.Endpoint {
 }
 
 var (
-	logger = logger_builder.NewLogger("consumer-test")
-	config = elasticsearch.Config{
+	metricsPublisher = metrics.NewMetricsPublisher()
+	logger           = logger_builder.NewLogger("consumer-test")
+	config           = elasticsearch.Config{
 		Host:        "http://localhost:9200",
 		Index:       fixtures.DefaultTopic,
 		BulkTimeout: 10 * time.Second,
 	}
-	db        = elasticsearch.NewDatabase(logger, config)
+	db        = elasticsearch.NewDatabase(logger, config, metricsPublisher)
 	codec     = elasticsearch.NewCodec(logger, config)
 	service   = fixtureService{db, codec}
 	endpoints = &fixtureEndpoints{
@@ -90,7 +91,7 @@ func TestMain(m *testing.M) {
 		BatchSize:             1,
 		MetricsUpdateInterval: 30 * time.Second,
 	}
-	k = NewKafka("localhost:9092", consumer, metrics.NewMetricsPublisher())
+	k = NewKafka("localhost:9092", consumer, metricsPublisher)
 	retCode := m.Run()
 	os.Exit(retCode)
 }
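A likely reason the test now shares one metricsPublisher between the database and NewKafka, rather than calling the constructor twice (my reading, not stated in the commit): go-kit's NewCounterFrom registers each collector with Prometheus's default registry, and registering the same metric name twice panics. A sketch of that failure mode:

package main

import (
	"fmt"

	kitprometheus "github.com/go-kit/kit/metrics/prometheus"
	stdprometheus "github.com/prometheus/client_golang/prometheus"
)

func newCounter() *kitprometheus.Counter {
	// NewCounterFrom calls MustRegister on the default registry, so a
	// second call with the same Name panics with a duplicate-collector
	// registration error.
	return kitprometheus.NewCounterFrom(stdprometheus.CounterOpts{
		Name: "elasticsearch_bad_request",
		Help: "the number of malformed events",
	}, []string{})
}

func main() {
	_ = newCounter()
	defer func() {
		fmt.Println("second registration panicked:", recover() != nil)
	}()
	_ = newCounter() // panics: this name is already registered
}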

src/metrics/metrics.go

Lines changed: 34 additions & 1 deletion
@@ -18,6 +18,9 @@ type metrics struct {
 	recordsConsumed          *kitprometheus.Counter
 	endpointLatencyHistogram *kitprometheus.Summary
 	bufferFullGauge          *kitprometheus.Gauge
+	elasticsearchRetries     *kitprometheus.Counter
+	elasticsearchConflicts   *kitprometheus.Counter
+	elasticsearchBadRequest  *kitprometheus.Counter
 	lock                     sync.RWMutex
 	topicPartitionToOffset   map[string]map[int32]int64
 }
@@ -69,12 +72,27 @@ func (m *metrics) BufferFull(full bool) {
 	m.bufferFullGauge.Set(val)
 }
 
+func (m *metrics) ElasticsearchRetries(count int) {
+	m.elasticsearchRetries.Add(float64(count))
+}
+
+func (m *metrics) ElasticsearchConflicts(count int) {
+	m.elasticsearchConflicts.Add(float64(count))
+}
+
+func (m *metrics) ElasticsearchBadRequests(count int) {
+	m.elasticsearchBadRequest.Add(float64(count))
+}
+
 type MetricsPublisher interface {
 	PublishOffsetMetrics(highWaterMarks map[string]map[int32]int64)
 	UpdateOffset(topic string, partition int32, delay int64)
 	IncrementRecordsConsumed(count int)
 	RecordEndpointLatency(latency float64)
 	BufferFull(full bool)
+	ElasticsearchRetries(count int)
+	ElasticsearchConflicts(count int)
+	ElasticsearchBadRequests(count int)
 }
 
 func NewMetricsPublisher() MetricsPublisher {
@@ -95,13 +113,28 @@ func NewMetricsPublisher() MetricsPublisher {
 		Name: "kafka_consumer_buffer_full",
 		Help: "Kafka consumer boolean indicating if app buffer is full",
 	}, []string{})
+	elasticsearchRetriesCounter := kitprometheus.NewCounterFrom(stdprometheus.CounterOpts{
+		Name: "elasticsearch_events_retryed",
+		Help: "number of events that needed to be retried when sending to Elasticsearch",
+	}, []string{})
+	elasticsearchConflictsCounter := kitprometheus.NewCounterFrom(stdprometheus.CounterOpts{
+		Name: "elasticsearch_document_already_exists",
+		Help: "number of events that failed to insert on elasticsearch because the document already existed",
+	}, []string{})
+	elasticsearchBadRequestCounter := kitprometheus.NewCounterFrom(stdprometheus.CounterOpts{
+		Name: "elasticsearch_bad_request",
+		Help: "the number of malformed events",
+	}, []string{})
 	return &metrics{
 		logger:                   logger,
 		partitionDelay:           partitionDelay,
 		recordsConsumed:          recordsConsumed,
 		endpointLatencyHistogram: endpointLatencySummary,
 		bufferFullGauge:          bufferFullGauge,
 		lock:                     sync.RWMutex{},
-		topicPartitionToOffset:   make(map[string]map[int32]int64),
+		elasticsearchRetries:     elasticsearchRetriesCounter,
+		elasticsearchConflicts:   elasticsearchConflictsCounter,
+		elasticsearchBadRequest:  elasticsearchBadRequestCounter,
+		topicPartitionToOffset:   make(map[string]map[int32]int64),
 	}
 }
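Because MetricsPublisher gained three methods, any hand-written fake of the interface has to grow with it. If a test wants to silence instrumentation entirely, a no-op stub along these lines (hypothetical, not in the repo) still satisfies the enlarged interface:

package metricsfake

import "github.com/inloco/kafka-elasticsearch-injector/src/metrics"

// nopPublisher discards every call; useful where instrumentation is noise.
type nopPublisher struct{}

// Compile-time check that the stub covers the whole interface,
// including the three methods added in this commit.
var _ metrics.MetricsPublisher = nopPublisher{}

func (nopPublisher) PublishOffsetMetrics(map[string]map[int32]int64) {}
func (nopPublisher) UpdateOffset(string, int32, int64)               {}
func (nopPublisher) IncrementRecordsConsumed(int)                    {}
func (nopPublisher) RecordEndpointLatency(float64)                   {}
func (nopPublisher) BufferFull(bool)                                 {}
func (nopPublisher) ElasticsearchRetries(int)                        {}
func (nopPublisher) ElasticsearchConflicts(int)                      {}
func (nopPublisher) ElasticsearchBadRequests(int)                    {}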
