Skip to content

Commit f16c91c

Browse files
[wip]
1 parent 142f055 commit f16c91c

File tree

2 files changed

+23
-2
lines changed

2 files changed

+23
-2
lines changed

tim/cli.py

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -361,11 +361,14 @@ def bulk_update_embeddings(
361361
td = TIMDEXDataset(location=dataset_path)
362362

363363
# TODO: update TDA to read embeddings
364+
records_to_update = iter()
364365

365366
try:
366-
update_results.update(tim_os.bulk_index(client, index, records_to_index))
367+
update_results.update(tim_os.bulk_update(client, index, records_to_update))
367368
except BulkIndexingError as exception:
368-
logger.info(f"Bulk indexing failed: {exception}")
369+
logger.info(f"Bulk update with embeddings failed: {exception}")
370+
371+
logger.info(f"Bulk update with embeddings complete: {json.dumps(update_results)}")
369372

370373

371374
@main.command()

tim/opensearch.py

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -413,3 +413,21 @@ def bulk_index(client: OpenSearch, index: str, records: Iterator[dict]) -> dict[
413413
)
414414
logger.debug(response)
415415
return result
416+
417+
418+
def bulk_update(
    client: OpenSearch, index: str, records: Iterator[dict]
) -> dict[str, int]:
    """Send `records` to `index` as bulk "update" actions and tally the outcomes.

    Actions are built with `helpers.generate_bulk_actions(index, records, "update")`
    and streamed through `streaming_bulk` with `raise_on_error=False`, so
    per-document failures are reported in the responses rather than raised.

    Args:
        client: OpenSearch client used to issue the bulk requests.
        index: Name of the index whose documents are updated.
        records: Iterator of record dicts to turn into update actions.

    Returns:
        A dict with the keys "updated", "errors" and "total". "total" counts
        every response received; "errors" counts responses flagged as failed;
        "updated" counts successful responses whose reported result is
        "updated". Successful responses with any other result (presumably
        "noop" when nothing changed -- TODO confirm) only count toward "total".
    """
    result = {"updated": 0, "errors": 0, "total": 0}
    actions = helpers.generate_bulk_actions(index, records, "update")
    responses = streaming_bulk(
        client,
        actions,
        max_chunk_bytes=REQUEST_CONFIG["OPENSEARCH_BULK_MAX_CHUNK_BYTES"],
        raise_on_error=False,
    )
    for response in responses:
        # streaming_bulk yields (ok, item) pairs, one per action.
        ok, item = response
        # TODO: Parse different responses from bulk operations
        # NOTE(review): assumes the item is keyed by the "update" op type,
        # matching the action passed to generate_bulk_actions -- confirm.
        if not ok:
            result["errors"] += 1
        elif item.get("update", {}).get("result") == "updated":
            result["updated"] += 1
        result["total"] += 1
        # Logged inside the loop so an empty `records` iterator never
        # references an unbound `response`.
        logger.debug(response)
    return result

0 commit comments

Comments
 (0)