Commit 464da26

[wip]
1 parent 3254366 commit 464da26

3 files changed: +28 -36 lines changed


tests/test_opensearch.py

Lines changed: 4 additions & 4 deletions
@@ -459,7 +459,7 @@ def test_bulk_index_creates_records(
     test_opensearch_client, five_valid_index_libguides_records
 ):
     assert tim_os.bulk_index(
-        test_opensearch_client, "test-index", five_valid_index_libguides_records
+        test_opensearch_client, "test-index", five_valid_index_libguides_records, "index"
     ) == {
         "created": 5,
         "updated": 0,
@@ -474,22 +474,22 @@ def test_bulk_index_updates_records(
 ):
     monkeypatch.setenv("STATUS_UPDATE_INTERVAL", "5")
     assert tim_os.bulk_index(
-        test_opensearch_client, "test-index", five_valid_index_libguides_records
+        test_opensearch_client, "test-index", five_valid_index_libguides_records, "index"
     ) == {
         "created": 0,
         "updated": 5,
         "errors": 0,
         "total": 5,
     }
-    assert "Status update: 5 records indexed so far!" in caplog.text
+    assert "Status update: 5 records processed so far!" in caplog.text
 
 
 @my_vcr.use_cassette("opensearch/bulk_index_record_mapper_parsing_error.yaml")
 def test_bulk_index_logs_mapper_parsing_errors(
     caplog, test_opensearch_client, one_invalid_index_libguides_records
 ):
     assert tim_os.bulk_index(
-        test_opensearch_client, "test-index", one_invalid_index_libguides_records
+        test_opensearch_client, "test-index", one_invalid_index_libguides_records, "index"
     ) == {
         "created": 0,
         "updated": 0,

tim/cli.py

Lines changed: 9 additions & 4 deletions
@@ -308,7 +308,9 @@ def bulk_update(
             action="index",
         )
         try:
-            index_results.update(tim_os.bulk_index(client, index, records_to_index))
+            index_results.update(
+                tim_os.bulk_index(client, index, records_to_index, action="index")
+            )
         except BulkIndexingError as exception:
             logger.info(f"Bulk indexing failed: {exception}")
 
@@ -401,9 +403,10 @@ def bulk_update_embeddings(
             ]
         )
         # ==== END TEMPORARY CODE ====
-
        try:
-            update_results.update(tim_os.bulk_update(client, index, records_to_update))
+            update_results.update(
+                tim_os.bulk_index(client, index, records_to_update, action="update")
+            )
         except BulkIndexingError as exception:
             logger.info(f"Bulk update with embeddings failed: {exception}")
 
@@ -479,7 +482,9 @@ def reindex_source(
             action="index",
         )
         try:
-            index_results.update(tim_os.bulk_index(client, index, records_to_index))
+            index_results.update(
+                tim_os.bulk_index(client, index, records_to_index, action="index")
+            )
         except BulkIndexingError as exception:
             logger.info(f"Bulk indexing failed: {exception}")
 
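With this change, all three CLI commands funnel through the single tim_os.bulk_index helper and differ only in the action they pass. The snippet below is a minimal sketch of that shared calling pattern, not code from the repository: the client, index, and record iterator are stand-ins for the objects the real commands build, and the import path for BulkIndexingError is an assumption rather than something shown in this diff.

# Illustrative sketch of the shared call pattern; names marked below are assumed.
import logging
from typing import Iterator

from opensearchpy import OpenSearch

from tim import opensearch as tim_os
from tim.errors import BulkIndexingError  # assumed location of the error class

logger = logging.getLogger(__name__)


def run_bulk(
    client: OpenSearch, index: str, records: Iterator[dict], action: str
) -> dict[str, int]:
    """Run one bulk load and log (rather than raise) a failed run."""
    results: dict[str, int] = {}
    try:
        # "index" creates or replaces whole documents (bulk_update, reindex_source);
        # "update" patches documents that must already exist (bulk_update_embeddings).
        results.update(tim_os.bulk_index(client, index, records, action=action))
    except BulkIndexingError as exception:
        logger.info(f"Bulk {action} failed: {exception}")
    return results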

tim/opensearch.py

Lines changed: 15 additions & 28 deletions
@@ -359,22 +359,26 @@ def bulk_delete(
     return result
 
 
-def bulk_index(client: OpenSearch, index: str, records: Iterator[dict]) -> dict[str, int]:
+def bulk_index(
+    client: OpenSearch, index: str, records: Iterator[dict], action: str
+) -> dict[str, int]:
     """Indexes records into an existing index using the streaming bulk helper.
 
-    This action function uses the OpenSearch "index" action, which is a
-    combination of create and update: if a record with the same _id exists in the
-    index, it will be updated. If it does not exist, the record will be indexed as a
-    new document.
+    This method uses the OpenSearch "index" and "update" operations.
+    - Setting `action` to "index" will either create or update a record.
+      If a record with the same _id exists in the index, it will be updated;
+      if it does not exist, the record will be added as a new document.
+    - Setting `action` to "update" will update a document only if it exists
+      in the index. Otherwise, an error is raised.
 
-    If an error occurs during record indexing, it will be logged and bulk indexing will
-    continue until all records have been processed.
+    If an error occurs during the operation, it will be logged, and the bulk
+    operation will continue until all records have been processed.
 
     Returns total sums of: records created, records updated, errors, and total records
     processed.
     """
     result = {"created": 0, "updated": 0, "errors": 0, "total": 0}
-    actions = helpers.generate_bulk_actions(index, records, "index")
+    actions = helpers.generate_bulk_actions(index, records, action)
     responses = streaming_bulk(
         client,
         actions,
@@ -400,34 +404,17 @@ def bulk_index(client: OpenSearch, index: str, records: Iterator[dict]) -> dict[
             result["updated"] += 1
         else:
             logger.error(
-                "Something unexpected happened during ingest. Bulk index response: %s",
+                "Something unexpected happened during ingest. "
+                f"Bulk {action} response: %s",
                 json.dumps(response),
             )
             result["errors"] += 1
         result["total"] += 1
         if result["total"] % int(os.getenv("STATUS_UPDATE_INTERVAL", "1000")) == 0:
-            logger.info("Status update: %s records indexed so far!", result["total"])
+            logger.info("Status update: %s records processed so far!", result["total"])
     logger.info("All records ingested, refreshing index.")
     response = client.indices.refresh(
         index=index,
     )
     logger.debug(response)
     return result
-
-
-def bulk_update(
-    client: OpenSearch, index: str, records: Iterator[dict]
-) -> dict[str, int]:
-    result = {"updated": 0, "errors": 0, "total": 0}
-    actions = helpers.generate_bulk_actions(index, records, "update")
-    responses = streaming_bulk(
-        client,
-        actions,
-        max_chunk_bytes=REQUEST_CONFIG["OPENSEARCH_BULK_MAX_CHUNK_BYTES"],
-        raise_on_error=False,
-    )
-    for response in responses:
-        # TODO: Parse different responses from bulk operations
-        pass
-    logger.debug(response)
-    return result
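The consolidated function delegates the per-record payload shape to helpers.generate_bulk_actions, whose implementation is not part of this diff. As a rough guide only: the opensearch-py streaming_bulk helper accepts action dicts keyed by "_op_type", so a generator along the following lines would produce the two behaviors the docstring describes. The function name, the timdex_record_id field, and the exact dict layout are illustrative assumptions, not the project's actual helper.

# Sketch only: assumes generate_bulk_actions emits opensearch-py-style action
# dicts; the real helper in the tim package may differ.
from collections.abc import Iterator


def sketch_generate_bulk_actions(
    index: str, records: Iterator[dict], action: str
) -> Iterator[dict]:
    for record in records:
        doc_id = record["timdex_record_id"]  # illustrative id field
        if action == "index":
            # Create-or-replace: the whole record becomes the document source.
            yield {
                "_op_type": "index",
                "_index": index,
                "_id": doc_id,
                "_source": record,
            }
        elif action == "update":
            # Partial update: only succeeds if a document with this _id already
            # exists; otherwise streaming_bulk reports an error for the record.
            yield {
                "_op_type": "update",
                "_index": index,
                "_id": doc_id,
                "doc": record,
            }
        else:
            raise ValueError(f"Unsupported bulk action: {action}")

Folding the stub bulk_update into bulk_index also retires the unfinished "TODO: Parse different responses" loop, since the existing created/updated/errors tally and the status-update logging now cover both operations.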
