Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions devdocs/features.md
Original file line number Diff line number Diff line change
Expand Up @@ -12,5 +12,6 @@
| JsonRPC | Go | All | - | Yes | No | N/A |
| GraphQL | All but Go | All | All | Yes | No | N/A |
| Elasticsearch | All but Go | 7.14+ | /_search, /_msearch, /_bulk, /_doc | Yes | No | N/A |
| Opensearch | All but Go | 3.0.0+ | /_search, /_msearch, /_bulk, /_doc | Yes | No | N/A |
| AWS S3 | All but Go | | CreateBucket, DeleteBucket, PutObject, DeleteObject, ListBuckets, ListObjects, GetObject | Yes | No | N/A |
| AWS SQS | All but Go | | All | Yes | No | N/A |
70 changes: 49 additions & 21 deletions internal/test/integration/components/elasticsearch/main.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
from fastapi import FastAPI, HTTPException
from fastapi import FastAPI, HTTPException, Request
import os
import uvicorn
import requests
Expand All @@ -8,34 +8,44 @@

app = FastAPI()
HEADERS = {'Content-Type': 'application/json'}
ELASTICSEARCH_HOST = "http://elasticsearchserver:9200"

@app.get("/health")
async def health():
HEALTH_URL = ELASTICSEARCH_HOST + "/_cluster/health"

async def health(request: Request):
host_url = request.query_params.get("host_url")
if host_url is None:
raise HTTPException(
status_code=400,
detail="The 'host_url' query parameter is required."
)
server_url = host_url + "/_cluster/health"
try:
response = requests.get(HEALTH_URL, timeout=5)
response = requests.get(server_url, timeout=5)
response.raise_for_status()
status = response.json().get("status", "red")

if status in ("red","yellow"):
raise HTTPException(
status_code=503,
detail={"status": "red","message": "Elasticsearch cluster unhealthy"})
return {"status": status, "message": "Elasticsearch cluster healthy"}
detail={"status": "red","message": "Cluster unhealthy"})
return {"status": status, "message": "Cluster healthy"}

except requests.RequestException as e:
raise HTTPException(
status_code=503,
detail={"status": "error","message": f"Cannot reach Elasticsearch cluster: {str(e)}"})
detail={"status": "error","message": f"Cannot reach Cluster: {str(e)}"})

@app.get("/doc")
async def doc():
ELASTICSEARCH_URL = ELASTICSEARCH_HOST + "/test_index/_doc/1"
async def doc(request: Request):
host_url = request.query_params.get("host_url")
if host_url is None:
raise HTTPException(
status_code=400,
detail="The 'host_url' query parameter is required."
)
server_url = host_url + "/test_index/_doc/1"

try:
response = requests.get(ELASTICSEARCH_URL, headers=HEADERS)
response = requests.get(server_url, headers=HEADERS)

except Exception as e:
print(json.dumps({"error": str(e)}))
Expand All @@ -44,8 +54,14 @@ async def doc():


@app.get("/search")
async def search():
ELASTICSEARCH_URL = ELASTICSEARCH_HOST + "/test_index/_search"
async def search(request: Request):
host_url = request.query_params.get("host_url")
if host_url is None:
raise HTTPException(
status_code=400,
detail="The 'host_url' query parameter is required."
)
server_url = host_url + "/test_index/_search"
query_body = {
"query": {
"match": {
Expand All @@ -54,16 +70,22 @@ async def search():
}
}
try:
response = requests.post(ELASTICSEARCH_URL, json=query_body, headers=HEADERS)
response = requests.post(server_url, json=query_body, headers=HEADERS)

except Exception as e:
print(json.dumps({"error": str(e)}))
sys.exit(1)
return {"status": "OK"}

@app.get("/msearch")
async def msearch():
ELASTICSEARCH_URL = ELASTICSEARCH_HOST + "/_msearch"
async def msearch(request: Request):
host_url = request.query_params.get("host_url")
if host_url is None:
raise HTTPException(
status_code=400,
detail="The 'host_url' query parameter is required."
)
server_url = host_url + "/_msearch"
searches = [
{},
{
Expand All @@ -83,7 +105,7 @@ async def msearch():
}
]
try:
response = requests.post(ELASTICSEARCH_URL, json=searches, headers=HEADERS)
response = requests.post(server_url, json=searches, headers=HEADERS)

except Exception as e:
print(json.dumps({"error": str(e)}))
Expand All @@ -92,8 +114,14 @@ async def msearch():


@app.get("/bulk")
async def bulk():
ELASTICSEARCH_URL = ELASTICSEARCH_HOST + "/_bulk"
async def bulk(request: Request):
host_url = request.query_params.get("host_url")
if host_url is None:
raise HTTPException(
status_code=400,
detail="The 'host_url' query parameter is required."
)
server_url = host_url + "/_bulk"
actions=[
{
"index": {
Expand Down Expand Up @@ -132,7 +160,7 @@ async def bulk():
}
]
try:
response = requests.post(ELASTICSEARCH_URL, json=actions, headers=HEADERS)
response = requests.post(server_url, json=actions, headers=HEADERS)

except Exception as e:
print(json.dumps({"error": str(e)}))
Expand Down
30 changes: 29 additions & 1 deletion internal/test/integration/docker-compose-elasticsearch.yml
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,33 @@ services:
- "9200:9200"
environment:
- discovery.type=single-node
- xpack.security.enabled=false # Only for local testing
- xpack.security.enabled=false
deploy:
resources:
limits:
cpus: '1.5'
memory: 2G
reservations:
cpus: '0.25'
memory: 1G

opensearchserver:
image: opensearchproject/opensearch:3.3.1@sha256:773b05b026c01e7d520f6b857f486b7312d9290e14f2aebe4630eb80787e9eb3
container_name: opensearch-server
ports:
- "9201:9200"
environment:
- discovery.type=single-node
- DISABLE_SECURITY_PLUGIN=true
- DISABLE_INSTALL_DEMO_CONFIG=true
deploy:
resources:
limits:
cpus: '1.5'
memory: 2G
reservations:
cpus: '0.25'
memory: 1G

testserver:
build:
Expand All @@ -20,6 +46,8 @@ services:
condition: service_started
elasticsearchserver:
condition: service_started
opensearchserver:
condition: service_started

obi:
build:
Expand Down
7 changes: 6 additions & 1 deletion internal/test/integration/suites_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -518,7 +518,12 @@ func TestSuite_PythonElasticsearch(t *testing.T) {

compose.Env = append(compose.Env, `OTEL_EBPF_OPEN_PORT=8080`, `OTEL_EBPF_EXECUTABLE_PATH=`, `TEST_SERVICE_PORTS=8381:8080`)
require.NoError(t, compose.Up())
t.Run("Python Elasticsearch", testPythonElasticsearch)
t.Run("Python Elasticsearch", func(t *testing.T) {
testPythonElasticsearch(t, "elasticsearch")
})
t.Run("Python Opensearch", func(t *testing.T) {
testPythonElasticsearch(t, "opensearch")
})
require.NoError(t, compose.Close())
}

Expand Down
56 changes: 33 additions & 23 deletions internal/test/integration/test_python_elasticsearchclient.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,28 +21,38 @@ import (
ti "go.opentelemetry.io/obi/pkg/test/integration"
)

func testPythonElasticsearch(t *testing.T) {
url := "http://localhost:8381"
comm := "python3.12"
index := "test_index"
const (
comm = "python3.12"
testIndex = "test_index"
testServerURL = "http://localhost:8381"
)

waitForTestComponentsRoute(t, url, "/health")
testElasticsearchSearch(t, comm, url, index)
func testPythonElasticsearch(t *testing.T, dbSystemName string) {
var url string
switch dbSystemName {
case "elasticsearch":
url = "http://elasticsearchserver:9200"
case "opensearch":
url = "http://opensearchserver:9200"
}
queryParam := "?host_url=" + url
waitForTestComponentsNoMetrics(t, testServerURL+"/health"+queryParam)
testElasticsearchSearch(t, dbSystemName, queryParam)
// populate the server is optional, the elasticsearch request will fail
// but we will have the span
testElasticsearchMsearch(t, comm, url)
testElasticsearchBulk(t, comm, url)
testElasticsearchDoc(t, comm, url, index)
testElasticsearchMsearch(t, dbSystemName, queryParam)
testElasticsearchBulk(t, dbSystemName, queryParam)
testElasticsearchDoc(t, dbSystemName, queryParam)
}

func testElasticsearchSearch(t *testing.T, comm, url, index string) {
func testElasticsearchSearch(t *testing.T, dbSystemName, queryParam string) {
queryText := "{\"query\": {\"match\": {\"name\": \"OBI\"}}}"
urlPath := "/search"
ti.DoHTTPGet(t, url+urlPath, 200)
assertElasticsearchOperation(t, comm, "search", queryText, index)
ti.DoHTTPGet(t, testServerURL+urlPath+queryParam, 200)
assertElasticsearchOperation(t, dbSystemName, "search", queryText, testIndex)
}

func assertElasticsearchOperation(t *testing.T, comm, op, queryText, index string) {
func assertElasticsearchOperation(t *testing.T, dbSystemName, op, queryText, index string) {
params := neturl.Values{}
params.Add("service", comm)
var operationName string
Expand Down Expand Up @@ -85,31 +95,31 @@ func assertElasticsearchOperation(t *testing.T, comm, op, queryText, index strin

tag, found = jaeger.FindIn(span.Tags, "db.system.name")
assert.True(t, found)
assert.Equal(t, "elasticsearch", tag.Value)
assert.Equal(t, dbSystemName, tag.Value)

tag, found = jaeger.FindIn(span.Tags, "elasticsearch.node.name")
assert.True(t, found)
assert.Empty(t, tag.Value)
}, test.Interval(100*time.Millisecond))
}

func testElasticsearchMsearch(t *testing.T, comm, url string) {
func testElasticsearchMsearch(t *testing.T, dbSystemName, queryParam string) {
queryText := "[{}, {\"query\": {\"match\": {\"message\": \"this is a test\"}}}, {\"index\": \"my-index-000002\"}, {\"query\": {\"match_all\": {}}}]"
urlPath := "/msearch"
ti.DoHTTPGet(t, url+urlPath, 200)
assertElasticsearchOperation(t, comm, "msearch", queryText, "")
ti.DoHTTPGet(t, testServerURL+urlPath+queryParam, 200)
assertElasticsearchOperation(t, dbSystemName, "msearch", queryText, "")
}

func testElasticsearchBulk(t *testing.T, comm, url string) {
func testElasticsearchBulk(t *testing.T, dbSystemName, queryParam string) {
queryText := "[{\"index\": {\"_index\": \"test\", \"_id\": \"1\"}}, {\"field1\": \"value1\"}, {\"delete\": {\"_index\": \"test\", \"_id\": \"2\"}}, {\"create\": {\"_index\": \"test\", \"_id\": \"3\"}}, {\"field1\": \"value3\"}, {\"update\": {\"_id\": \"1\", \"_index\": \"test\"}}, {\"doc\": {\"field2\": \"value2\"}}]"
urlPath := "/bulk"
ti.DoHTTPGet(t, url+urlPath, 200)
assertElasticsearchOperation(t, comm, "bulk", queryText, "")
ti.DoHTTPGet(t, testServerURL+urlPath+queryParam, 200)
assertElasticsearchOperation(t, dbSystemName, "bulk", queryText, "")
}

func testElasticsearchDoc(t *testing.T, comm, url, index string) {
func testElasticsearchDoc(t *testing.T, dbSystemName, queryParam string) {
queryText := ""
urlPath := "/doc"
ti.DoHTTPGet(t, url+urlPath, 200)
assertElasticsearchOperation(t, comm, "doc", queryText, index)
ti.DoHTTPGet(t, testServerURL+urlPath+queryParam, 200)
assertElasticsearchOperation(t, dbSystemName, "doc", queryText, testIndex)
}
2 changes: 2 additions & 0 deletions pkg/appolly/app/request/span.go
Original file line number Diff line number Diff line change
Expand Up @@ -181,6 +181,7 @@ type Elasticsearch struct {
NodeName string `json:"nodeName"`
DBOperationName string `json:"dbOperationName"`
DBQueryText string `json:"dbQueryText"`
DBSystemName string `json:"dbSystemName"`
}

type AWS struct {
Expand Down Expand Up @@ -304,6 +305,7 @@ func spanAttributes(s *Span) SpanAttributes {
attrs["nodeName"] = s.Elasticsearch.NodeName
attrs["dbOperationName"] = s.Elasticsearch.DBOperationName
attrs["dbQueryText"] = s.Elasticsearch.DBQueryText
attrs["dbSystemName"] = s.Elasticsearch.DBSystemName
}
if s.SubType == HTTPSubtypeAWSS3 && s.AWS != nil {
s3 := s.AWS.S3
Expand Down
2 changes: 1 addition & 1 deletion pkg/appolly/app/request/span_getters.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ func spanOTELGetters(name attr.Name) (attributes.Getter[*Span, attribute.KeyValu
return DBSystemName(semconv.DBSystemMongoDB.Value.AsString())
case EventTypeHTTPClient:
if span.SubType == HTTPSubtypeElasticsearch {
return DBSystemName(semconv.DBSystemElasticsearch.Value.AsString())
return DBSystemName(span.Elasticsearch.DBSystemName)
}
}
return DBSystemName("unknown")
Expand Down
25 changes: 23 additions & 2 deletions pkg/ebpf/common/http/elasticsearch.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ type elasticsearchOperation struct {
NodeName string
DBQueryText string
DBCollectionName string
DBSytemName string
}

var elasticsearchOperationMethods = map[string]map[string]struct{}{
Expand All @@ -37,7 +38,8 @@ var elasticsearchOperationMethods = map[string]map[string]struct{}{
}

func ElasticsearchSpan(baseSpan *request.Span, req *http.Request, resp *http.Response) (request.Span, bool) {
if !isElasticsearchResponse(resp) {
dbSystemName := elasticsearchSystemName(resp)
if dbSystemName == "" {
return *baseSpan, false
}

Expand Down Expand Up @@ -66,10 +68,20 @@ func ElasticsearchSpan(baseSpan *request.Span, req *http.Request, resp *http.Res
DBOperationName: operationName,
DBCollectionName: op.DBCollectionName,
DBQueryText: op.DBQueryText,
DBSystemName: dbSystemName,
}
return *baseSpan, true
}

func elasticsearchSystemName(resp *http.Response) string {
if isElasticsearchResponse(resp) {
return "elasticsearch"
} else if isOpensearchResponse(resp) {
return "opensearch"
}
return ""
}

func parseElasticsearchRequest(req *http.Request) (elasticsearchOperation, error) {
var op elasticsearchOperation
reqB, err := io.ReadAll(req.Body)
Expand Down Expand Up @@ -104,7 +116,16 @@ func isElasticsearchResponse(resp *http.Response) bool {
return headerValue == expectedValue
}

// extractElasticsearchOperationName is a generic function used to extract the operation name
// isOpensearchResponse checks if X-Opensearch-Version HTTP header is present.
// Note: this header should be present from release 3.0.0
// https://github.com/opensearch-project/OpenSearch/blob/dc4efa821904cc2d7ea7ef61c0f577d3fc0d8be9/server/src/main/java/org/opensearch/http/DefaultRestChannel.java#L73
func isOpensearchResponse(resp *http.Response) bool {
headerValue := resp.Header.Get("X-OpenSearch-Version")
expectedValue := "OpenSearch/"
return strings.Contains(headerValue, expectedValue)
}

// extractOperationName is a generic function used to extract the operation name
// that is the endpoint identifier provided in the request
// we can have different operations where the name of the operation is found in
// the last or second to last part of the url
Expand Down
2 changes: 1 addition & 1 deletion pkg/export/otel/tracesgen/tracesgen.go
Original file line number Diff line number Diff line change
Expand Up @@ -356,7 +356,7 @@ func TraceAttributesSelector(span *request.Span, optionalAttrs map[attr.Name]str
attrs = append(attrs, request.DBQueryText(span.Elasticsearch.DBQueryText))
}
attrs = append(attrs, request.DBOperationName(span.Elasticsearch.DBOperationName))
attrs = append(attrs, request.DBSystemName(semconv.DBSystemElasticsearch.Value.AsString()))
attrs = append(attrs, request.DBSystemName(span.Elasticsearch.DBSystemName))
attrs = append(attrs, request.ErrorType(span.DBError.ErrorCode))
}

Expand Down
Loading