Skip to content

Commit eb4af08

Browse files
Merge branch 'main' into add-opensearch
Update with changes from main
2 parents 6a9b666 + 0cad96c commit eb4af08

File tree

16 files changed

+412
-45
lines changed

16 files changed

+412
-45
lines changed

.github/workflows/cicd.yml

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@ jobs:
1616
services:
1717

1818
elasticsearch_8_svc:
19-
image: docker.elastic.co/elasticsearch/elasticsearch:8.10.4
19+
image: docker.elastic.co/elasticsearch/elasticsearch:8.11.0
2020
env:
2121
cluster.name: stac-cluster
2222
node.name: es01
@@ -58,7 +58,7 @@ jobs:
5858

5959
# Setup Python (faster than using Python container)
6060
- name: Setup Python
61-
uses: actions/setup-python@v4
61+
uses: actions/setup-python@v5
6262
with:
6363
python-version: ${{ matrix.python-version }}
6464
- name: Lint code

CHANGELOG.md

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,10 +9,17 @@ and this project adheres to [Semantic Versioning](http://semver.org/spec/v2.0.0.
99

1010
### Added
1111

12+
- Advanced comparison (LIKE, IN, BETWEEN) operators to the Filter extension [#178](https://github.com/stac-utils/stac-fastapi-elasticsearch/pull/178)
13+
1214
### Changed
1315

16+
- Elasticsearch drivers from 7.17.9 to 8.11.0 [#169](https://github.com/stac-utils/stac-fastapi-elasticsearch/pull/169)
17+
1418
### Fixed
1519

20+
- Exclude unset fields in search response [#166](https://github.com/stac-utils/stac-fastapi-elasticsearch/issues/166)
21+
- Upgrade stac-fastapi to v2.4.9 [#172](https://github.com/stac-utils/stac-fastapi-elasticsearch/pull/172)
22+
- Set correct default filter-lang for GET /search requests [#179](https://github.com/stac-utils/stac-fastapi-elasticsearch/issues/179)
1623

1724
## [v1.0.0]
1825

docker-compose.yml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -59,7 +59,7 @@ services:
5959

6060
elasticsearch:
6161
container_name: es-container
62-
image: docker.elastic.co/elasticsearch/elasticsearch:${ELASTICSEARCH_VERSION:-8.10.4}
62+
image: docker.elastic.co/elasticsearch/elasticsearch:${ELASTICSEARCH_VERSION:-8.11.0}
6363
environment:
6464
ES_JAVA_OPTS: -Xms512m -Xmx1g
6565
volumes:

stac_fastapi/elasticsearch/setup.py

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -10,11 +10,11 @@
1010
"attrs",
1111
"pydantic[dotenv]<2",
1212
"stac_pydantic==2.0.*",
13-
"stac-fastapi.types==2.4.8",
14-
"stac-fastapi.api==2.4.8",
15-
"stac-fastapi.extensions==2.4.8",
16-
"elasticsearch[async]==7.17.9",
17-
"elasticsearch-dsl==7.4.1",
13+
"stac-fastapi.types==2.4.9",
14+
"stac-fastapi.api==2.4.9",
15+
"stac-fastapi.extensions==2.4.9",
16+
"elasticsearch[async]==8.11.0",
17+
"elasticsearch-dsl==8.11.0",
1818
"pystac[validation]",
1919
"uvicorn",
2020
"orjson",

stac_fastapi/elasticsearch/stac_fastapi/elasticsearch/app.py

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,11 @@
2424
settings = ElasticsearchSettings()
2525
session = Session.create_from_settings(settings)
2626

27+
filter_extension = FilterExtension(client=EsAsyncBaseFiltersClient())
28+
filter_extension.conformance_classes.append(
29+
"http://www.opengis.net/spec/cql2/1.0/conf/advanced-comparison-operators"
30+
)
31+
2732
extensions = [
2833
TransactionExtension(client=TransactionsClient(session=session), settings=settings),
2934
BulkTransactionExtension(client=BulkTransactionsClient(session=session)),
@@ -32,7 +37,7 @@
3237
SortExtension(),
3338
TokenPaginationExtension(),
3439
ContextExtension(),
35-
FilterExtension(client=EsAsyncBaseFiltersClient()),
40+
filter_extension,
3641
]
3742

3843
post_request_model = create_post_request_model(extensions)

stac_fastapi/elasticsearch/stac_fastapi/elasticsearch/config.py

Lines changed: 24 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -1,30 +1,43 @@
11
"""API configuration."""
22
import os
3+
import ssl
34
from typing import Any, Dict, Set
45

56
from elasticsearch import AsyncElasticsearch, Elasticsearch # type: ignore
67
from stac_fastapi.types.config import ApiSettings
78

89

910
def _es_config() -> Dict[str, Any]:
11+
# Determine the scheme (http or https)
12+
use_ssl = os.getenv("ES_USE_SSL", "true").lower() == "true"
13+
scheme = "https" if use_ssl else "http"
14+
15+
# Configure the hosts parameter with the correct scheme
16+
hosts = [f"{scheme}://{os.getenv('ES_HOST')}:{os.getenv('ES_PORT')}"]
17+
18+
# Initialize the configuration dictionary
1019
config = {
11-
"hosts": [{"host": os.getenv("ES_HOST"), "port": os.getenv("ES_PORT")}],
20+
"hosts": hosts,
1221
"headers": {"accept": "application/vnd.elasticsearch+json; compatible-with=7"},
13-
"use_ssl": True,
14-
"verify_certs": True,
1522
}
1623

17-
if (u := os.getenv("ES_USER")) and (p := os.getenv("ES_PASS")):
18-
config["http_auth"] = (u, p)
24+
# Return early when not using SSL so no SSL settings are added (NOTE: this also skips the authentication block below)
25+
if not use_ssl:
26+
return config
1927

20-
if (v := os.getenv("ES_USE_SSL")) and v == "false":
21-
config["use_ssl"] = False
28+
# Include SSL settings if using https
29+
config["ssl_version"] = ssl.TLSVersion.TLSv1_3 # type: ignore
30+
config["verify_certs"] = os.getenv("ES_VERIFY_CERTS", "true").lower() != "false" # type: ignore
2231

23-
if (v := os.getenv("ES_VERIFY_CERTS")) and v == "false":
24-
config["verify_certs"] = False
32+
# Include CA Certificates if verifying certs
33+
if config["verify_certs"]:
34+
config["ca_certs"] = os.getenv(
35+
"CURL_CA_BUNDLE", "/etc/ssl/certs/ca-certificates.crt"
36+
)
2537

26-
if v := os.getenv("CURL_CA_BUNDLE"):
27-
config["ca_certs"] = v
38+
# Handle authentication
39+
if (u := os.getenv("ES_USER")) and (p := os.getenv("ES_PASS")):
40+
config["http_auth"] = (u, p)
2841

2942
return config
3043

stac_fastapi/elasticsearch/stac_fastapi/elasticsearch/core.py

Lines changed: 18 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626
from stac_fastapi.elasticsearch.session import Session
2727
from stac_fastapi.extensions.third_party.bulk_transactions import (
2828
BaseBulkTransactionsClient,
29+
BulkTransactionMethod,
2930
Items,
3031
)
3132
from stac_fastapi.types import stac as stac_types
@@ -379,12 +380,12 @@ async def get_search(
379380
base_args["sortby"] = sort_param
380381

381382
if filter:
382-
if filter_lang == "cql2-text":
383+
if filter_lang == "cql2-json":
383384
base_args["filter-lang"] = "cql2-json"
384-
base_args["filter"] = orjson.loads(to_cql2(parse_cql2_text(filter)))
385+
base_args["filter"] = orjson.loads(unquote_plus(filter))
385386
else:
386387
base_args["filter-lang"] = "cql2-json"
387-
base_args["filter"] = orjson.loads(unquote_plus(filter))
388+
base_args["filter"] = orjson.loads(to_cql2(parse_cql2_text(filter)))
388389

389390
if fields:
390391
includes = set()
@@ -509,7 +510,9 @@ async def post_search(
509510
filter_kwargs = search_request.fields.filter_fields
510511

511512
items = [
512-
orjson.loads(stac_pydantic.Item(**feat).json(**filter_kwargs))
513+
orjson.loads(
514+
stac_pydantic.Item(**feat).json(**filter_kwargs, exclude_unset=True)
515+
)
513516
for feat in items
514517
]
515518

@@ -566,7 +569,7 @@ async def create_item(
566569
if item["type"] == "FeatureCollection":
567570
bulk_client = BulkTransactionsClient()
568571
processed_items = [
569-
bulk_client.preprocess_item(item, base_url) for item in item["features"] # type: ignore
572+
bulk_client.preprocess_item(item, base_url, BulkTransactionMethod.INSERT) for item in item["features"] # type: ignore
570573
]
571574

572575
await self.database.bulk_async(
@@ -716,17 +719,23 @@ def __attrs_post_init__(self):
716719
settings = ElasticsearchSettings()
717720
self.client = settings.create_client
718721

719-
def preprocess_item(self, item: stac_types.Item, base_url) -> stac_types.Item:
722+
def preprocess_item(
723+
self, item: stac_types.Item, base_url, method: BulkTransactionMethod
724+
) -> stac_types.Item:
720725
"""Preprocess an item to match the data model.
721726
722727
Args:
723728
item: The item to preprocess.
724729
base_url: The base URL of the request.
730+
method: The bulk transaction method.
725731
726732
Returns:
727733
The preprocessed item.
728734
"""
729-
return self.database.sync_prep_create_item(item=item, base_url=base_url)
735+
exist_ok = method == BulkTransactionMethod.UPSERT
736+
return self.database.sync_prep_create_item(
737+
item=item, base_url=base_url, exist_ok=exist_ok
738+
)
730739

731740
@overrides
732741
def bulk_item_insert(
@@ -749,7 +758,8 @@ def bulk_item_insert(
749758
base_url = ""
750759

751760
processed_items = [
752-
self.preprocess_item(item, base_url) for item in items.items.values()
761+
self.preprocess_item(item, base_url, items.method)
762+
for item in items.items.values()
753763
]
754764

755765
# not a great way to get the collection_id-- should be part of the method signature

stac_fastapi/elasticsearch/stac_fastapi/elasticsearch/database_elasticsearch/database_logic.py

Lines changed: 18 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -171,18 +171,19 @@ def indices(collection_ids: Optional[List[str]]) -> str:
171171

172172

173173
async def create_collection_index() -> None:
174-
"""Create the index for Collections in Elasticsearch.
174+
"""
175+
Create the index for Collections.
176+
177+
Returns:
178+
None
175179
176-
This function creates the Elasticsearch index for the `Collections` with the predefined mapping.
177-
If the index already exists, the function ignores the error and continues execution.
178180
"""
179181
client = AsyncElasticsearchSettings().create_client
180182

181-
await client.indices.create(
183+
await client.options(ignore_status=400).indices.create(
182184
index=f"{COLLECTIONS_INDEX}-000001",
183185
aliases={COLLECTIONS_INDEX: {}},
184186
mappings=ES_COLLECTIONS_MAPPINGS,
185-
ignore=400, # ignore 400 already exists code
186187
)
187188
await client.close()
188189

@@ -201,12 +202,11 @@ async def create_item_index(collection_id: str):
201202
client = AsyncElasticsearchSettings().create_client
202203
index_name = index_by_collection_id(collection_id)
203204

204-
await client.indices.create(
205+
await client.options(ignore_status=400).indices.create(
205206
index=f"{index_by_collection_id(collection_id)}-000001",
206207
aliases={index_name: {}},
207208
mappings=ES_ITEMS_MAPPINGS,
208209
settings=ES_ITEMS_SETTINGS,
209-
ignore=400, # ignore 400 already exists code
210210
)
211211
await client.close()
212212

@@ -577,13 +577,16 @@ async def check_collection_exists(self, collection_id: str):
577577
if not await self.client.exists(index=COLLECTIONS_INDEX, id=collection_id):
578578
raise NotFoundError(f"Collection {collection_id} does not exist")
579579

580-
async def prep_create_item(self, item: Item, base_url: str) -> Item:
580+
async def prep_create_item(
581+
self, item: Item, base_url: str, exist_ok: bool = False
582+
) -> Item:
581583
"""
582584
Preps an item for insertion into the database.
583585
584586
Args:
585587
item (Item): The item to be prepped for insertion.
586588
base_url (str): The base URL used to create the item's self URL.
589+
exist_ok (bool): Indicates whether the item can exist already.
587590
588591
Returns:
589592
Item: The prepped item.
@@ -594,7 +597,7 @@ async def prep_create_item(self, item: Item, base_url: str) -> Item:
594597
"""
595598
await self.check_collection_exists(collection_id=item["collection"])
596599

597-
if await self.client.exists(
600+
if not exist_ok and await self.client.exists(
598601
index=index_by_collection_id(item["collection"]),
599602
id=mk_item_id(item["id"], item["collection"]),
600603
):
@@ -604,17 +607,20 @@ async def prep_create_item(self, item: Item, base_url: str) -> Item:
604607

605608
return self.item_serializer.stac_to_db(item, base_url)
606609

607-
def sync_prep_create_item(self, item: Item, base_url: str) -> Item:
610+
def sync_prep_create_item(
611+
self, item: Item, base_url: str, exist_ok: bool = False
612+
) -> Item:
608613
"""
609614
Prepare an item for insertion into the database.
610615
611616
This method performs pre-insertion preparation on the given `item`,
612617
such as checking if the collection the item belongs to exists,
613-
and verifying that an item with the same ID does not already exist in the database.
618+
and optionally verifying that an item with the same ID does not already exist in the database.
614619
615620
Args:
616621
item (Item): The item to be inserted into the database.
617622
base_url (str): The base URL used for constructing URLs for the item.
623+
exist_ok (bool): Indicates whether the item can exist already.
618624
619625
Returns:
620626
Item: The item after preparation is done.
@@ -628,7 +634,7 @@ def sync_prep_create_item(self, item: Item, base_url: str) -> Item:
628634
if not self.sync_client.exists(index=COLLECTIONS_INDEX, id=collection_id):
629635
raise NotFoundError(f"Collection {collection_id} does not exist")
630636

631-
if self.sync_client.exists(
637+
if not exist_ok and self.sync_client.exists(
632638
index=index_by_collection_id(collection_id),
633639
id=mk_item_id(item_id, collection_id),
634640
):

0 commit comments

Comments
 (0)