diff --git a/devdocs/features.md b/devdocs/features.md index a4a7364d4..52e060838 100644 --- a/devdocs/features.md +++ b/devdocs/features.md @@ -12,5 +12,6 @@ | JsonRPC | Go | All | - | Yes | No | N/A | | GraphQL | All but Go | All | All | Yes | No | N/A | | Elasticsearch | All but Go | 7.14+ | /_search, /_msearch, /_bulk, /_doc | Yes | No | N/A | +| Opensearch | All but Go | 3.0.0+ | /_search, /_msearch, /_bulk, /_doc | Yes | No | N/A | | AWS S3 | All but Go | | CreateBucket, DeleteBucket, PutObject, DeleteObject, ListBuckets, ListObjects, GetObject | Yes | No | N/A | | AWS SQS | All but Go | | All | Yes | No | N/A | diff --git a/internal/test/integration/components/elasticsearch/main.py b/internal/test/integration/components/elasticsearch/main.py index 8bc1aa8bb..71f1db7ba 100644 --- a/internal/test/integration/components/elasticsearch/main.py +++ b/internal/test/integration/components/elasticsearch/main.py @@ -1,4 +1,4 @@ -from fastapi import FastAPI, HTTPException +from fastapi import FastAPI, HTTPException, Request import os import uvicorn import requests @@ -8,34 +8,44 @@ app = FastAPI() HEADERS = {'Content-Type': 'application/json'} -ELASTICSEARCH_HOST = "http://elasticsearchserver:9200" @app.get("/health") -async def health(): - HEALTH_URL = ELASTICSEARCH_HOST + "/_cluster/health" - +async def health(request: Request): + host_url = request.query_params.get("host_url") + if host_url is None: + raise HTTPException( + status_code=400, + detail="The 'host_url' query parameter is required." + ) + server_url = host_url + "/_cluster/health" try: - response = requests.get(HEALTH_URL, timeout=5) + response = requests.get(server_url, timeout=5) response.raise_for_status() status = response.json().get("status", "red") if status in ("red","yellow"): raise HTTPException( status_code=503, - detail={"status": "red","message": "Elasticsearch cluster unhealthy"}) - return {"status": status, "message": "Elasticsearch cluster healthy"} + detail={"status": "red","message": "Cluster unhealthy"}) + return {"status": status, "message": "Cluster healthy"} except requests.RequestException as e: raise HTTPException( status_code=503, - detail={"status": "error","message": f"Cannot reach Elasticsearch cluster: {str(e)}"}) + detail={"status": "error","message": f"Cannot reach Cluster: {str(e)}"}) @app.get("/doc") -async def doc(): - ELASTICSEARCH_URL = ELASTICSEARCH_HOST + "/test_index/_doc/1" +async def doc(request: Request): + host_url = request.query_params.get("host_url") + if host_url is None: + raise HTTPException( + status_code=400, + detail="The 'host_url' query parameter is required." + ) + server_url = host_url + "/test_index/_doc/1" try: - response = requests.get(ELASTICSEARCH_URL, headers=HEADERS) + response = requests.get(server_url, headers=HEADERS) except Exception as e: print(json.dumps({"error": str(e)})) @@ -44,8 +54,14 @@ async def doc(): @app.get("/search") -async def search(): - ELASTICSEARCH_URL = ELASTICSEARCH_HOST + "/test_index/_search" +async def search(request: Request): + host_url = request.query_params.get("host_url") + if host_url is None: + raise HTTPException( + status_code=400, + detail="The 'host_url' query parameter is required." + ) + server_url = host_url + "/test_index/_search" query_body = { "query": { "match": { @@ -54,7 +70,7 @@ async def search(): } } try: - response = requests.post(ELASTICSEARCH_URL, json=query_body, headers=HEADERS) + response = requests.post(server_url, json=query_body, headers=HEADERS) except Exception as e: print(json.dumps({"error": str(e)})) @@ -62,8 +78,14 @@ async def search(): return {"status": "OK"} @app.get("/msearch") -async def msearch(): - ELASTICSEARCH_URL = ELASTICSEARCH_HOST + "/_msearch" +async def msearch(request: Request): + host_url = request.query_params.get("host_url") + if host_url is None: + raise HTTPException( + status_code=400, + detail="The 'host_url' query parameter is required." + ) + server_url = host_url + "/_msearch" searches = [ {}, { @@ -83,7 +105,7 @@ async def msearch(): } ] try: - response = requests.post(ELASTICSEARCH_URL, json=searches, headers=HEADERS) + response = requests.post(server_url, json=searches, headers=HEADERS) except Exception as e: print(json.dumps({"error": str(e)})) @@ -92,8 +114,14 @@ async def msearch(): @app.get("/bulk") -async def bulk(): - ELASTICSEARCH_URL = ELASTICSEARCH_HOST + "/_bulk" +async def bulk(request: Request): + host_url = request.query_params.get("host_url") + if host_url is None: + raise HTTPException( + status_code=400, + detail="The 'host_url' query parameter is required." + ) + server_url = host_url + "/_bulk" actions=[ { "index": { @@ -132,7 +160,7 @@ async def bulk(): } ] try: - response = requests.post(ELASTICSEARCH_URL, json=actions, headers=HEADERS) + response = requests.post(server_url, json=actions, headers=HEADERS) except Exception as e: print(json.dumps({"error": str(e)})) diff --git a/internal/test/integration/docker-compose-elasticsearch.yml b/internal/test/integration/docker-compose-elasticsearch.yml index d2df32345..6534b8cde 100644 --- a/internal/test/integration/docker-compose-elasticsearch.yml +++ b/internal/test/integration/docker-compose-elasticsearch.yml @@ -6,7 +6,33 @@ services: - "9200:9200" environment: - discovery.type=single-node - - xpack.security.enabled=false # Only for local testing + - xpack.security.enabled=false + deploy: + resources: + limits: + cpus: '1.5' + memory: 2G + reservations: + cpus: '0.25' + memory: 1G + + opensearchserver: + image: opensearchproject/opensearch:3.3.1@sha256:773b05b026c01e7d520f6b857f486b7312d9290e14f2aebe4630eb80787e9eb3 + container_name: opensearch-server + ports: + - "9201:9200" + environment: + - discovery.type=single-node + - DISABLE_SECURITY_PLUGIN=true + - DISABLE_INSTALL_DEMO_CONFIG=true + deploy: + resources: + limits: + cpus: '1.5' + memory: 2G + reservations: + cpus: '0.25' + memory: 1G testserver: build: @@ -20,6 +46,8 @@ services: condition: service_started elasticsearchserver: condition: service_started + opensearchserver: + condition: service_started obi: build: diff --git a/internal/test/integration/suites_test.go b/internal/test/integration/suites_test.go index 71fbfeaa7..521b684a4 100644 --- a/internal/test/integration/suites_test.go +++ b/internal/test/integration/suites_test.go @@ -518,7 +518,12 @@ func TestSuite_PythonElasticsearch(t *testing.T) { compose.Env = append(compose.Env, `OTEL_EBPF_OPEN_PORT=8080`, `OTEL_EBPF_EXECUTABLE_PATH=`, `TEST_SERVICE_PORTS=8381:8080`) require.NoError(t, compose.Up()) - t.Run("Python Elasticsearch", testPythonElasticsearch) + t.Run("Python Elasticsearch", func(t *testing.T) { + testPythonElasticsearch(t, "elasticsearch") + }) + t.Run("Python Opensearch", func(t *testing.T) { + testPythonElasticsearch(t, "opensearch") + }) require.NoError(t, compose.Close()) } diff --git a/internal/test/integration/test_python_elasticsearchclient.go b/internal/test/integration/test_python_elasticsearchclient.go index 562bbd6ad..a861b1f26 100644 --- a/internal/test/integration/test_python_elasticsearchclient.go +++ b/internal/test/integration/test_python_elasticsearchclient.go @@ -21,28 +21,38 @@ import ( ti "go.opentelemetry.io/obi/pkg/test/integration" ) -func testPythonElasticsearch(t *testing.T) { - url := "http://localhost:8381" - comm := "python3.12" - index := "test_index" +const ( + comm = "python3.12" + testIndex = "test_index" + testServerURL = "http://localhost:8381" +) - waitForTestComponentsRoute(t, url, "/health") - testElasticsearchSearch(t, comm, url, index) +func testPythonElasticsearch(t *testing.T, dbSystemName string) { + var url string + switch dbSystemName { + case "elasticsearch": + url = "http://elasticsearchserver:9200" + case "opensearch": + url = "http://opensearchserver:9200" + } + queryParam := "?host_url=" + url + waitForTestComponentsNoMetrics(t, testServerURL+"/health"+queryParam) + testElasticsearchSearch(t, dbSystemName, queryParam) // populate the server is optional, the elasticsearch request will fail // but we will have the span - testElasticsearchMsearch(t, comm, url) - testElasticsearchBulk(t, comm, url) - testElasticsearchDoc(t, comm, url, index) + testElasticsearchMsearch(t, dbSystemName, queryParam) + testElasticsearchBulk(t, dbSystemName, queryParam) + testElasticsearchDoc(t, dbSystemName, queryParam) } -func testElasticsearchSearch(t *testing.T, comm, url, index string) { +func testElasticsearchSearch(t *testing.T, dbSystemName, queryParam string) { queryText := "{\"query\": {\"match\": {\"name\": \"OBI\"}}}" urlPath := "/search" - ti.DoHTTPGet(t, url+urlPath, 200) - assertElasticsearchOperation(t, comm, "search", queryText, index) + ti.DoHTTPGet(t, testServerURL+urlPath+queryParam, 200) + assertElasticsearchOperation(t, dbSystemName, "search", queryText, testIndex) } -func assertElasticsearchOperation(t *testing.T, comm, op, queryText, index string) { +func assertElasticsearchOperation(t *testing.T, dbSystemName, op, queryText, index string) { params := neturl.Values{} params.Add("service", comm) var operationName string @@ -85,7 +95,7 @@ func assertElasticsearchOperation(t *testing.T, comm, op, queryText, index strin tag, found = jaeger.FindIn(span.Tags, "db.system.name") assert.True(t, found) - assert.Equal(t, "elasticsearch", tag.Value) + assert.Equal(t, dbSystemName, tag.Value) tag, found = jaeger.FindIn(span.Tags, "elasticsearch.node.name") assert.True(t, found) @@ -93,23 +103,23 @@ func assertElasticsearchOperation(t *testing.T, comm, op, queryText, index strin }, test.Interval(100*time.Millisecond)) } -func testElasticsearchMsearch(t *testing.T, comm, url string) { +func testElasticsearchMsearch(t *testing.T, dbSystemName, queryParam string) { queryText := "[{}, {\"query\": {\"match\": {\"message\": \"this is a test\"}}}, {\"index\": \"my-index-000002\"}, {\"query\": {\"match_all\": {}}}]" urlPath := "/msearch" - ti.DoHTTPGet(t, url+urlPath, 200) - assertElasticsearchOperation(t, comm, "msearch", queryText, "") + ti.DoHTTPGet(t, testServerURL+urlPath+queryParam, 200) + assertElasticsearchOperation(t, dbSystemName, "msearch", queryText, "") } -func testElasticsearchBulk(t *testing.T, comm, url string) { +func testElasticsearchBulk(t *testing.T, dbSystemName, queryParam string) { queryText := "[{\"index\": {\"_index\": \"test\", \"_id\": \"1\"}}, {\"field1\": \"value1\"}, {\"delete\": {\"_index\": \"test\", \"_id\": \"2\"}}, {\"create\": {\"_index\": \"test\", \"_id\": \"3\"}}, {\"field1\": \"value3\"}, {\"update\": {\"_id\": \"1\", \"_index\": \"test\"}}, {\"doc\": {\"field2\": \"value2\"}}]" urlPath := "/bulk" - ti.DoHTTPGet(t, url+urlPath, 200) - assertElasticsearchOperation(t, comm, "bulk", queryText, "") + ti.DoHTTPGet(t, testServerURL+urlPath+queryParam, 200) + assertElasticsearchOperation(t, dbSystemName, "bulk", queryText, "") } -func testElasticsearchDoc(t *testing.T, comm, url, index string) { +func testElasticsearchDoc(t *testing.T, dbSystemName, queryParam string) { queryText := "" urlPath := "/doc" - ti.DoHTTPGet(t, url+urlPath, 200) - assertElasticsearchOperation(t, comm, "doc", queryText, index) + ti.DoHTTPGet(t, testServerURL+urlPath+queryParam, 200) + assertElasticsearchOperation(t, dbSystemName, "doc", queryText, testIndex) } diff --git a/pkg/appolly/app/request/span.go b/pkg/appolly/app/request/span.go index abf129ede..d797d204f 100644 --- a/pkg/appolly/app/request/span.go +++ b/pkg/appolly/app/request/span.go @@ -181,6 +181,7 @@ type Elasticsearch struct { NodeName string `json:"nodeName"` DBOperationName string `json:"dbOperationName"` DBQueryText string `json:"dbQueryText"` + DBSystemName string `json:"dbSystemName"` } type AWS struct { @@ -304,6 +305,7 @@ func spanAttributes(s *Span) SpanAttributes { attrs["nodeName"] = s.Elasticsearch.NodeName attrs["dbOperationName"] = s.Elasticsearch.DBOperationName attrs["dbQueryText"] = s.Elasticsearch.DBQueryText + attrs["dbSystemName"] = s.Elasticsearch.DBSystemName } if s.SubType == HTTPSubtypeAWSS3 && s.AWS != nil { s3 := s.AWS.S3 diff --git a/pkg/appolly/app/request/span_getters.go b/pkg/appolly/app/request/span_getters.go index 1ae282454..465269ed6 100644 --- a/pkg/appolly/app/request/span_getters.go +++ b/pkg/appolly/app/request/span_getters.go @@ -106,7 +106,7 @@ func spanOTELGetters(name attr.Name) (attributes.Getter[*Span, attribute.KeyValu return DBSystemName(semconv.DBSystemMongoDB.Value.AsString()) case EventTypeHTTPClient: if span.SubType == HTTPSubtypeElasticsearch { - return DBSystemName(semconv.DBSystemElasticsearch.Value.AsString()) + return DBSystemName(span.Elasticsearch.DBSystemName) } } return DBSystemName("unknown") diff --git a/pkg/ebpf/common/http/elasticsearch.go b/pkg/ebpf/common/http/elasticsearch.go index e6c93cc12..aea658316 100644 --- a/pkg/ebpf/common/http/elasticsearch.go +++ b/pkg/ebpf/common/http/elasticsearch.go @@ -20,6 +20,7 @@ type elasticsearchOperation struct { NodeName string DBQueryText string DBCollectionName string + DBSytemName string } var elasticsearchOperationMethods = map[string]map[string]struct{}{ @@ -37,7 +38,8 @@ var elasticsearchOperationMethods = map[string]map[string]struct{}{ } func ElasticsearchSpan(baseSpan *request.Span, req *http.Request, resp *http.Response) (request.Span, bool) { - if !isElasticsearchResponse(resp) { + dbSystemName := elasticsearchSystemName(resp) + if dbSystemName == "" { return *baseSpan, false } @@ -66,10 +68,20 @@ func ElasticsearchSpan(baseSpan *request.Span, req *http.Request, resp *http.Res DBOperationName: operationName, DBCollectionName: op.DBCollectionName, DBQueryText: op.DBQueryText, + DBSystemName: dbSystemName, } return *baseSpan, true } +func elasticsearchSystemName(resp *http.Response) string { + if isElasticsearchResponse(resp) { + return "elasticsearch" + } else if isOpensearchResponse(resp) { + return "opensearch" + } + return "" +} + func parseElasticsearchRequest(req *http.Request) (elasticsearchOperation, error) { var op elasticsearchOperation reqB, err := io.ReadAll(req.Body) @@ -104,7 +116,16 @@ func isElasticsearchResponse(resp *http.Response) bool { return headerValue == expectedValue } -// extractElasticsearchOperationName is a generic function used to extract the operation name +// isOpensearchResponse checks if X-Opensearch-Version HTTP header is present. +// Note: this header should be present from release 3.0.0 +// https://github.com/opensearch-project/OpenSearch/blob/dc4efa821904cc2d7ea7ef61c0f577d3fc0d8be9/server/src/main/java/org/opensearch/http/DefaultRestChannel.java#L73 +func isOpensearchResponse(resp *http.Response) bool { + headerValue := resp.Header.Get("X-OpenSearch-Version") + expectedValue := "OpenSearch/" + return strings.Contains(headerValue, expectedValue) +} + +// extractOperationName is a generic function used to extract the operation name // that is the endpoint identifier provided in the request // we can have different operations where the name of the operation is found in // the last or second to last part of the url diff --git a/pkg/export/otel/tracesgen/tracesgen.go b/pkg/export/otel/tracesgen/tracesgen.go index 29fe2dddd..f2454deff 100644 --- a/pkg/export/otel/tracesgen/tracesgen.go +++ b/pkg/export/otel/tracesgen/tracesgen.go @@ -356,7 +356,7 @@ func TraceAttributesSelector(span *request.Span, optionalAttrs map[attr.Name]str attrs = append(attrs, request.DBQueryText(span.Elasticsearch.DBQueryText)) } attrs = append(attrs, request.DBOperationName(span.Elasticsearch.DBOperationName)) - attrs = append(attrs, request.DBSystemName(semconv.DBSystemElasticsearch.Value.AsString())) + attrs = append(attrs, request.DBSystemName(span.Elasticsearch.DBSystemName)) attrs = append(attrs, request.ErrorType(span.DBError.ErrorCode)) }