2525COLLECTIONS_INDEX = os .getenv ("STAC_COLLECTIONS_INDEX" , "collections" )
2626ITEMS_INDEX_PREFIX = os .getenv ("STAC_ITEMS_INDEX_PREFIX" , "items_" )
2727
28- DEFAULT_INDICES = f"*,-*kibana*,- { COLLECTIONS_INDEX } "
28+ ITEM_INDICES = f"{ ITEMS_INDEX_PREFIX } * "
2929
3030DEFAULT_SORT = {
3131 "properties.datetime" : {"order" : "desc" },
@@ -150,7 +150,7 @@ def indices(collection_ids: Optional[List[str]]) -> str:
150150 A string of comma-separated index names. If `collection_ids` is None, returns the default indices.
151151 """
152152 if collection_ids is None :
153- return DEFAULT_INDICES
153+ return ITEM_INDICES
154154 else :
155155 return "," .join ([f"{ ITEMS_INDEX_PREFIX } { c .strip ()} " for c in collection_ids ])
156156
@@ -164,7 +164,8 @@ async def create_collection_index() -> None:
164164 client = AsyncElasticsearchSettings ().create_client
165165
166166 await client .indices .create (
167- index = COLLECTIONS_INDEX ,
167+ index = f"{ COLLECTIONS_INDEX } -000001" ,
168+ aliases = {COLLECTIONS_INDEX : {}},
168169 mappings = ES_COLLECTIONS_MAPPINGS ,
169170 ignore = 400 , # ignore 400 already exists code
170171 )
@@ -183,9 +184,11 @@ async def create_item_index(collection_id: str):
183184
184185 """
185186 client = AsyncElasticsearchSettings ().create_client
187+ index_name = index_by_collection_id (collection_id )
186188
187189 await client .indices .create (
188- index = index_by_collection_id (collection_id ),
190+ index = f"{ index_by_collection_id (collection_id )} -000001" ,
191+ aliases = {index_name : {}},
189192 mappings = ES_ITEMS_MAPPINGS ,
190193 settings = ES_ITEMS_SETTINGS ,
191194 ignore = 400 , # ignore 400 already exists code
@@ -201,7 +204,14 @@ async def delete_item_index(collection_id: str):
201204 """
202205 client = AsyncElasticsearchSettings ().create_client
203206
204- await client .indices .delete (index = index_by_collection_id (collection_id ))
207+ name = index_by_collection_id (collection_id )
208+ resolved = await client .indices .resolve_index (name = name )
209+ if "aliases" in resolved and resolved ["aliases" ]:
210+ [alias ] = resolved ["aliases" ]
211+ await client .indices .delete_alias (index = alias ["indices" ], name = alias ["name" ])
212+ await client .indices .delete (index = alias ["indices" ])
213+ else :
214+ await client .indices .delete (index = name )
205215 await client .close ()
206216
207217
@@ -759,14 +769,11 @@ async def bulk_async(
759769 `mk_actions` function is called to generate a list of actions for the bulk insert. If `refresh` is set to True, the
760770 index is refreshed after the bulk insert. The function does not return any value.
761771 """
762- await asyncio .get_event_loop ().run_in_executor (
763- None ,
764- lambda : helpers .bulk (
765- self .sync_client ,
766- mk_actions (collection_id , processed_items ),
767- refresh = refresh ,
768- raise_on_error = False ,
769- ),
772+ await helpers .async_bulk (
773+ self .client ,
774+ mk_actions (collection_id , processed_items ),
775+ refresh = refresh ,
776+ raise_on_error = False ,
770777 )
771778
772779 def bulk_sync (
@@ -797,7 +804,7 @@ def bulk_sync(
797804 async def delete_items (self ) -> None :
798805 """Danger. this is only for tests."""
799806 await self .client .delete_by_query (
800- index = DEFAULT_INDICES ,
807+ index = ITEM_INDICES ,
801808 body = {"query" : {"match_all" : {}}},
802809 wait_for_completion = True ,
803810 )
0 commit comments