Skip to content
This repository was archived by the owner on Oct 7, 2022. It is now read-only.

Commit 3c421b4

Browse files
authored
Merge pull request #41 from inloco/feat/elasticsearch-index-prefix
feat(es): allow client to add common prefixes to created ES indices
2 parents 9821220 + eb892e0 commit 3c421b4

File tree

4 files changed

+22
-15
lines changed

4 files changed

+22
-15
lines changed

Makefile

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -6,8 +6,8 @@ compile-binary:
66

77
docker/build: compile-binary
88
GOOS=linux GOARCH=386 go build -o bin/producer util/producer/producer.go
9-
docker build --rm=false -t "inlocomedia/kafka-elasticsearch-injector:local" -f cmd/Dockerfile .
10-
docker build --rm=false -t "inlocomedia/kafka-elasticsearch-injector:producer-local" -f util/producer/Dockerfile .
9+
docker build --rm=false -t "quay.io/inloco/kafka-elasticsearch-injector:local" -f cmd/Dockerfile .
10+
docker build --rm=false -t "quay.io/inloco/kafka-elasticsearch-injector:producer-local" -f util/producer/Dockerfile .
1111

1212
docker/run:
1313
docker-compose up -d zookeeper kafka schema-registry elasticsearch kibana

README.md

Lines changed: 10 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -14,28 +14,29 @@ To create new injectors for your topics, you should create a new kubernetes depl
1414
### Configuration variables
1515
- `KAFKA_ADDRESS` Kafka url. **REQUIRED**
1616
- `SCHEMA_REGISTRY_URL` Schema registry url with port and protocol. **REQUIRED**
17-
- `KAFKA_TOPICS` Comma separated list of kafka topics to subscribe **REQUIRED**
17+
- `KAFKA_TOPICS` Comma separated list of Kafka topics to subscribe to **REQUIRED**
1818
- `KAFKA_CONSUMER_GROUP` Consumer group id, should be unique across the cluster. Please be careful with this variable **REQUIRED**
1919
- `ELASTICSEARCH_HOST` Elasticsearch url with port and protocol. **REQUIRED**
20-
- `ES_INDEX` Elasticsearch index prefix to write records to(actual index is followed by the record's timestamp to avoid very large indexes). Defaults to topic name. **OPTIONAL**
20+
- `ES_INDEX` Elasticsearch index to write records to (actual index is followed by the record's timestamp to avoid very large indexes). Defaults to topic name. **OPTIONAL**
21+
- `ES_INDEX_PREFIX` Prefix that will be added to every Elasticsearch index. Defaults to an empty string. **OPTIONAL**
2122
- `PROBES_PORT` Kubernetes probes port. Set to any available port. **REQUIRED**
2223
- `K8S_LIVENESS_ROUTE` Kubernetes route for liveness check. **REQUIRED**
2324
- `K8S_READINESS_ROUTE` Kubernetes route for readiness check. **REQUIRED**
2425
- `ELASTICSEARCH_USER` Elasticsearch user. **OPTIONAL**
2526
- `ELASTICSEARCH_PASSWORD` Elasticsearch password. **OPTIONAL**
26-
- `ELASTICSEARCH_SCHEME` scheme to be used when connecting to elasticsearch(http or https). Defaults to http. **OPTIONAL**
27-
- `ELASTICSEARCH_IGNORE_CERT` if set to "true", ignores certificates when connecting to a secure elasticsearch cluster. Defaults to false. **OPTIONAL**
27+
- `ELASTICSEARCH_SCHEME` scheme to be used when connecting to Elasticsearch (http or https). Defaults to http. **OPTIONAL**
28+
- `ELASTICSEARCH_IGNORE_CERT` if set to "true", ignores certificates when connecting to a secure Elasticsearch cluster. Defaults to false. **OPTIONAL**
2829
- `ELASTICSEARCH_DISABLE_SNIFFING` if set to "true", the client will not sniff Elasticsearch nodes during the node discovery process. Defaults to false. **OPTIONAL**
2930
- `KAFKA_CONSUMER_CONCURRENCY` Number of parallel goroutines working as a consumer. Default value is 1 **OPTIONAL**
30-
- `KAFKA_CONSUMER_BATCH_SIZE` Number of records to accumulate before sending them to elasticsearch(for each goroutine). Default value is 100 **OPTIONAL**
31+
- `KAFKA_CONSUMER_BATCH_SIZE` Number of records to accumulate before sending them to Elasticsearch (for each goroutine). Default value is 100 **OPTIONAL**
3132
- `ES_INDEX_COLUMN` Record field to append to index name. Ex: to create one ES index per campaign, use "campaign_id" here **OPTIONAL**
32-
- `ES_BLACKLISTED_COLUMNS` Comma separated list of record fields to filter before sending to elasticsearch. Defaults to empty string. **OPTIONAL**
33+
- `ES_BLACKLISTED_COLUMNS` Comma separated list of record fields to filter before sending to Elasticsearch. Defaults to empty string. **OPTIONAL**
3334
- `ES_DOC_ID_COLUMN` Record field to be the document ID of Elasticsearch. Defaults to "kafkaRecordPartition:kafkaRecordOffset". **OPTIONAL**
3435
- `LOG_LEVEL` Determines the log level for the app. Should be set to DEBUG, WARN, NONE or INFO. Defaults to INFO. **OPTIONAL**
3536
- `METRICS_PORT` Port to export app metrics **REQUIRED**
36-
- `ES_BULK_TIMEOUT` Timeout for elasticsearch bulk writes in the format of golang's `time.ParseDuration`. Default value is 1s **OPTIONAL**
37-
- `ES_BULK_BACKOFF` Constant backoff when elasticsearch is overloaded. in the format of golang's `time.ParseDuration`. Default value is 1s **OPTIONAL**
38-
- `ES_TIME_SUFFIX` Indicates what time unit to append to index names on elasticsearch. Supported values are `day` and `hour`. Default value is `day` **OPTIONAL**
37+
- `ES_BULK_TIMEOUT` Timeout for Elasticsearch bulk writes in the format of golang's `time.ParseDuration`. Default value is 1s **OPTIONAL**
38+
- `ES_BULK_BACKOFF` Constant backoff when Elasticsearch is overloaded, in the format of golang's `time.ParseDuration`. Default value is 1s **OPTIONAL**
39+
- `ES_TIME_SUFFIX` Indicates what time unit to append to index names on Elasticsearch. Supported values are `day` and `hour`. Default value is `day` **OPTIONAL**
3940
- `KAFKA_CONSUMER_RECORD_TYPE` Kafka record type. Should be set to "avro" or "json". Defaults to avro. **OPTIONAL**
4041
- `KAFKA_CONSUMER_METRICS_UPDATE_INTERVAL` The interval which the app updates the exported metrics in the format of golang's `time.ParseDuration`. Defaults to 30s. **OPTIONAL**
4142

src/elasticsearch/codec.go

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -48,9 +48,9 @@ func (c basicCodec) EncodeElasticRecords(records []*models.Record) ([]*models.El
4848
}
4949

5050
func (c basicCodec) getDatabaseIndex(record *models.Record) (string, error) {
51-
indexPrefix := c.config.Index
52-
if indexPrefix == "" {
53-
indexPrefix = record.Topic
51+
indexName := c.config.Index
52+
if indexName == "" {
53+
indexName = record.Topic
5454
}
5555

5656
indexColumn := c.config.IndexColumn
@@ -67,7 +67,11 @@ func (c basicCodec) getDatabaseIndex(record *models.Record) (string, error) {
6767
indexSuffix = newIndexSuffix
6868
}
6969

70-
return fmt.Sprintf("%s-%s", indexPrefix, indexSuffix), nil
70+
return fmt.Sprintf(
71+
"%s%s-%s",
72+
c.config.IndexPrefix,
73+
indexName,
74+
indexSuffix), nil
7175
}
7276

7377
func (c basicCodec) getDatabaseDocID(record *models.Record) (string, error) {

src/elasticsearch/config.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ type Config struct {
2121
IgnoreCertificate bool
2222
Scheme string
2323
Index string
24+
IndexPrefix string
2425
IndexColumn string
2526
DocIDColumn string
2627
BlacklistedColumns []string
@@ -85,6 +86,7 @@ func NewConfig() Config {
8586
IgnoreCertificate: ignoreCert,
8687
Scheme: scheme,
8788
Index: os.Getenv("ES_INDEX"),
89+
IndexPrefix: os.Getenv("ES_INDEX_PREFIX"),
8890
IndexColumn: os.Getenv("ES_INDEX_COLUMN"),
8991
DocIDColumn: os.Getenv("ES_DOC_ID_COLUMN"),
9092
BlacklistedColumns: strings.Split(os.Getenv("ES_BLACKLISTED_COLUMNS"), ","),

0 commit comments

Comments
 (0)