Skip to content

Commit f41386f

Browse files
[wip]
1 parent 36fc1f8 commit f41386f

File tree

3 files changed

+17
-28
lines changed

3 files changed

+17
-28
lines changed

tests/test_opensearch.py

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -459,7 +459,7 @@ def test_bulk_index_creates_records(
459459
test_opensearch_client, five_valid_index_libguides_records
460460
):
461461
assert tim_os.bulk_index(
462-
test_opensearch_client, "test-index", five_valid_index_libguides_records, "index"
462+
test_opensearch_client, "test-index", five_valid_index_libguides_records
463463
) == {
464464
"created": 5,
465465
"updated": 0,
@@ -474,22 +474,22 @@ def test_bulk_index_updates_records(
474474
):
475475
monkeypatch.setenv("STATUS_UPDATE_INTERVAL", "5")
476476
assert tim_os.bulk_index(
477-
test_opensearch_client, "test-index", five_valid_index_libguides_records, "index"
477+
test_opensearch_client, "test-index", five_valid_index_libguides_records
478478
) == {
479479
"created": 0,
480480
"updated": 5,
481481
"errors": 0,
482482
"total": 5,
483483
}
484-
assert "Status update: 5 records processed so far!" in caplog.text
484+
assert "Status update: 5 records indexed so far!" in caplog.text
485485

486486

487487
@my_vcr.use_cassette("opensearch/bulk_index_record_mapper_parsing_error.yaml")
488488
def test_bulk_index_logs_mapper_parsing_errors(
489489
caplog, test_opensearch_client, one_invalid_index_libguides_records
490490
):
491491
assert tim_os.bulk_index(
492-
test_opensearch_client, "test-index", one_invalid_index_libguides_records, "index"
492+
test_opensearch_client, "test-index", one_invalid_index_libguides_records
493493
) == {
494494
"created": 0,
495495
"updated": 0,

tim/cli.py

Lines changed: 3 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -308,9 +308,7 @@ def bulk_update(
308308
action="index",
309309
)
310310
try:
311-
index_results.update(
312-
tim_os.bulk_index(client, index, records_to_index, action="index")
313-
)
311+
index_results.update(tim_os.bulk_index(client, index, records_to_index))
314312
except BulkIndexingError as exception:
315313
logger.info(f"Bulk indexing failed: {exception}")
316314

@@ -404,9 +402,7 @@ def bulk_update_embeddings(
404402
)
405403
# ==== END TEMPORARY CODE ====
406404
try:
407-
update_results.update(
408-
tim_os.bulk_index(client, index, records_to_update, action="update")
409-
)
405+
update_results.update(tim_os.bulk_index(client, index, records_to_update))
410406
except BulkIndexingError as exception:
411407
logger.info(f"Bulk update with embeddings failed: {exception}")
412408

@@ -482,9 +478,7 @@ def reindex_source(
482478
action="index",
483479
)
484480
try:
485-
index_results.update(
486-
tim_os.bulk_index(client, index, records_to_index, action="index")
487-
)
481+
index_results.update(tim_os.bulk_index(client, index, records_to_index))
488482
except BulkIndexingError as exception:
489483
logger.info(f"Bulk indexing failed: {exception}")
490484

tim/opensearch.py

Lines changed: 10 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -359,26 +359,22 @@ def bulk_delete(
359359
return result
360360

361361

362-
def bulk_index(
363-
client: OpenSearch, index: str, records: Iterator[dict], action: str
364-
) -> dict[str, int]:
362+
def bulk_index(client: OpenSearch, index: str, records: Iterator[dict]) -> dict[str, int]:
365363
"""Indexes records into an existing index using the streaming bulk helper.
366364
367-
This method uses the OpenSearch "index" and "update" operations.
368-
- Setting `action` to "index" will either create or update a record.
369-
If a record with the same _id exists in the index, it will be updated;
370-
if it does not exist, the record will be added as a new document.
371-
- Setting `action` to "update" will update a document only if it exists
372-
in the index. Otherwise, an error is raised.
365+
This action function uses the OpenSearch "index" action, which is a
366+
combination of create and update: if a record with the same _id exists in the
367+
index, it will be updated. If it does not exist, the record will be indexed as a
368+
new document.
373369
374-
If an error occurs during the operation, it will be logged, and the bulk
375-
operation will continue until all records have been processed.
370+
If an error occurs during record indexing, it will be logged and bulk indexing will
371+
continue until all records have been processed.
376372
377373
Returns total sums of: records created, records updated, errors, and total records
378374
processed.
379375
"""
380376
result = {"created": 0, "updated": 0, "errors": 0, "total": 0}
381-
actions = helpers.generate_bulk_actions(index, records, action)
377+
actions = helpers.generate_bulk_actions(index, records, "index")
382378
responses = streaming_bulk(
383379
client,
384380
actions,
@@ -404,14 +400,13 @@ def bulk_index(
404400
result["updated"] += 1
405401
else:
406402
logger.error(
407-
"Something unexpected happened during ingest. "
408-
f"Bulk {action} response: %s",
403+
"Something unexpected happened during ingest. Bulk index response: %s",
409404
json.dumps(response),
410405
)
411406
result["errors"] += 1
412407
result["total"] += 1
413408
if result["total"] % int(os.getenv("STATUS_UPDATE_INTERVAL", "1000")) == 0:
414-
logger.info("Status update: %s records processed so far!", result["total"])
409+
logger.info("Status update: %s records indexed so far!", result["total"])
415410
logger.info("All records ingested, refreshing index.")
416411
response = client.indices.refresh(
417412
index=index,

0 commit comments

Comments
 (0)