Stub CLI command for updating existing docs with embeddings #370
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -17,6 +17,7 @@ | |
| from tim.errors import ( | ||
| AliasNotFoundError, | ||
| BulkIndexingError, | ||
| BulkOperationError, | ||
| IndexExistsError, | ||
| IndexNotFoundError, | ||
| ) | ||
|
|
@@ -370,6 +371,10 @@ def bulk_index(client: OpenSearch, index: str, records: Iterator[dict]) -> dict[ | |
| If an error occurs during record indexing, it will be logged and bulk indexing will | ||
| continue until all records have been processed. | ||
|
|
||
| NOTE: The update performed by the "index" action results in a full replacement of the | ||
| document in OpenSearch. If a partial record is provided, this will result in a new | ||
| document in OpenSearch containing only the fields provided in the partial record. | ||
|
|
||
| Returns total sums of: records created, records updated, errors, and total records | ||
| processed. | ||
| """ | ||
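The NOTE above contrasts full replacement under the "index" action with the field-level behavior of "update". As a rough illustration only, the sketch below mimics both with a plain in-memory dict; no OpenSearch client is involved, `apply_index` and `apply_update` are hypothetical names, and the shallow merge is a simplification of how a real partial update behaves.

```python
# In-memory illustration only; this does not call OpenSearch.
# apply_index and apply_update are hypothetical helpers, not part of tim.


def apply_index(store: dict, doc_id: str, record: dict) -> str:
    """Mimic the "index" action: the stored document is replaced wholesale."""
    result = "updated" if doc_id in store else "created"
    store[doc_id] = dict(record)
    return result


def apply_update(store: dict, doc_id: str, partial: dict) -> str:
    """Mimic the "update" action: merge fields, fail if the document is missing."""
    if doc_id not in store:
        raise KeyError(f"document '{doc_id}' does not exist")
    # Shallow merge (assumption): only the provided fields change.
    store[doc_id] = {**store[doc_id], **partial}
    return "updated"


store = {"abc": {"title": "Old title", "subject": "maps"}}

# A partial record sent as an update keeps the existing fields.
apply_update(store, "abc", {"embedding": [0.1, 0.2]})
merged = dict(store["abc"])

# The same partial record sent as an index action drops everything else.
apply_index(store, "abc", {"embedding": [0.1, 0.2]})
replaced = dict(store["abc"])
```

Here `merged` still carries `title` and `subject` alongside `embedding`, while `replaced` contains only `embedding`, which is the data-loss case the NOTE warns about.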
|
|
@@ -413,3 +418,54 @@ def bulk_index(client: OpenSearch, index: str, records: Iterator[dict]) -> dict[ | |
| ) | ||
| logger.debug(response) | ||
| return result | ||
|
|
||
|
|
||
| def bulk_update( | ||
|
Contributor
Commenting here broadly: I think the addition of this helper was a great idea. As discussed off-PR, I think it's the right call to introduce code duplication now and get things working, and then we could optionally take a second pass and consider combining
Contributor
Author
@ghukill Should I go add a ticket in the USE backlog or create a GH issue? 🤔
Contributor
Let's try a GH issue! It doesn't feel at all required, just an optional code hygiene and maintenance consideration. If we don't touch it, no problem, but we can get a feel for whether GH issues are valuable + remembered. |
||
| client: OpenSearch, index: str, records: Iterator[dict] | ||
| ) -> dict[str, int]: | ||
| """Updates existing documents in the index using the streaming bulk helper. | ||
|
|
||
| This method uses the OpenSearch "update" action, which updates existing documents | ||
| and returns an error if the document does not exist. The "update" action can accept | ||
| a full or partial record and will only update the corresponding fields in the | ||
| document. | ||
|
|
||
| Returns total sums of: records updated, errors, and total records | ||
| processed. | ||
| """ | ||
| result = {"updated": 0, "errors": 0, "total": 0} | ||
| actions = helpers.generate_bulk_actions(index, records, "update") | ||
| responses = streaming_bulk( | ||
| client, | ||
| actions, | ||
| max_chunk_bytes=REQUEST_CONFIG["OPENSEARCH_BULK_MAX_CHUNK_BYTES"], | ||
| raise_on_error=False, | ||
| ) | ||
| for response in responses: | ||
| if response[0] is False: | ||
| error = response[1]["update"]["error"] | ||
| record = response[1]["update"]["_id"] | ||
| if error["type"] == "mapper_parsing_exception": | ||
| logger.error( | ||
| "Error updating record '%s'. Details: %s", | ||
| record, | ||
| json.dumps(error), | ||
| ) | ||
| result["errors"] += 1 | ||
| else: | ||
| raise BulkOperationError( | ||
| "update", record, index, json.dumps(error) # noqa: EM101 | ||
| ) | ||
| elif response[1]["update"].get("result") == "updated": | ||
| result["updated"] += 1 | ||
| else: | ||
| logger.error( | ||
| "Something unexpected happened during update. Bulk update response: %s", | ||
| json.dumps(response), | ||
| ) | ||
| result["errors"] += 1 | ||
| result["total"] += 1 | ||
| if result["total"] % int(os.getenv("STATUS_UPDATE_INTERVAL", "1000")) == 0: | ||
| logger.info("Status update: %s records updated so far!", result["total"]) | ||
| logger.debug(response) | ||
| return result | ||
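To make the counting rules in `bulk_update` easier to check without a cluster, here is a minimal sketch that applies the same branching to hand-written `(ok, item)` tuples of the shape the loop above reads. `tally_update_responses` is a hypothetical name, the fake responses are invented for illustration, and `RuntimeError` stands in for `BulkOperationError`.

```python
import json
import logging
from collections.abc import Iterable

logger = logging.getLogger(__name__)


def tally_update_responses(responses: Iterable[tuple[bool, dict]]) -> dict[str, int]:
    """Count updated and errored records from (ok, item) bulk response tuples."""
    result = {"updated": 0, "errors": 0, "total": 0}
    for ok, item in responses:
        details = item["update"]
        if not ok:
            error = details["error"]
            if error["type"] != "mapper_parsing_exception":
                # Stand-in for BulkOperationError (assumption): abort the run.
                raise RuntimeError(
                    f"update failed for '{details['_id']}': {json.dumps(error)}"
                )
            logger.error(
                "Error updating record '%s'. Details: %s",
                details["_id"],
                json.dumps(error),
            )
            result["errors"] += 1
        elif details.get("result") == "updated":
            result["updated"] += 1
        else:
            # Any other successful result is counted as an error, as in the diff.
            result["errors"] += 1
        result["total"] += 1
    return result


# Invented responses: one success, one mapping error, one non-"updated" result.
fake_responses = [
    (True, {"update": {"_id": "a", "result": "updated"}}),
    (
        False,
        {
            "update": {
                "_id": "b",
                "error": {"type": "mapper_parsing_exception", "reason": "bad field"},
            }
        },
    ),
    (True, {"update": {"_id": "c", "result": "noop"}}),
]
summary = tally_update_responses(fake_responses)
```

One thing the sketch surfaces: a successful response whose result is anything other than "updated" (for example "noop", which OpenSearch may report when an update changes nothing) lands in the error count. That may be intended, but it is worth confirming.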