|
1 | 1 | # ruff: noqa: TRY003, EM101 |
| 2 | +import json |
2 | 3 | import logging |
3 | 4 | from datetime import timedelta |
4 | 5 | from time import perf_counter |
5 | 6 |
|
6 | 7 | import rich_click as click |
| 8 | +from timdex_dataset_api import TIMDEXDataset |
7 | 9 |
|
8 | 10 | from tim import errors, helpers |
9 | 11 | from tim import opensearch as tim_os |
10 | 12 | from tim.config import PRIMARY_ALIAS, VALID_SOURCES, configure_logger, configure_sentry |
| 13 | +from tim.errors import BulkIndexingError |
11 | 14 |
|
12 | 15 | logger = logging.getLogger(__name__) |
13 | 16 |
|
|
23 | 26 | }, |
24 | 27 | { |
25 | 28 | "name": "Bulk record processing commands", |
26 | | - "commands": ["bulk-index", "bulk-delete"], |
| 29 | + "commands": ["bulk-index", "bulk-delete", "bulk-update"], |
27 | 30 | }, |
28 | 31 | ] |
29 | 32 | } |
@@ -252,6 +255,7 @@ def promote(ctx: click.Context, index: str, alias: list[str]) -> None: |
252 | 255 | # Bulk record processing commands |
253 | 256 |
|
254 | 257 |
|
| 258 | +# NOTE: FEATURE FLAG: 'bulk_index' supports ETL v1 |
255 | 259 | @main.command() |
256 | 260 | @click.option("-i", "--index", help="Name of the index to bulk index records into.") |
257 | 261 | @click.option( |
@@ -295,6 +299,7 @@ def bulk_index(ctx: click.Context, index: str, source: str, filepath: str) -> No |
295 | 299 | ) |
296 | 300 |
|
297 | 301 |
|
| 302 | +# NOTE: FEATURE FLAG: 'bulk_delete' supports ETL v1 |
298 | 303 | @main.command() |
299 | 304 | @click.option("-i", "--index", help="Name of the index to bulk delete records from.") |
300 | 305 | @click.option( |
@@ -334,3 +339,65 @@ def bulk_delete(ctx: click.Context, index: str, source: str, filepath: str) -> N |
334 | 339 | results["deleted"], |
335 | 340 | results["total"], |
336 | 341 | ) |
| 342 | + |
| 343 | + |
@main.command()
@click.option(
    "-i",
    "--index",
    help="Name of the index on which to perform bulk indexing and deletion.",
)
@click.option(
    "-s",
    "--source",
    type=click.Choice(VALID_SOURCES),
    help="Source whose primary-aliased index to bulk index records into.",
)
@click.option(
    "-d",
    "--run-date",
    help="Run date, formatted as YYYY-MM-DD.",
)
@click.option(
    "-rid",
    "--run-id",
    help="Run ID.",
)
@click.argument("dataset_path", type=click.Path())
@click.pass_context
def bulk_update(
    ctx: click.Context,
    index: str,
    source: str,
    run_date: str,
    run_id: str,
    dataset_path: str,
) -> None:
    """Bulk update records for an index.

    Must provide either the name of an existing index in the cluster or a valid source.
    If source is provided, it will perform indexing and/or deletion of records for
    the primary-aliased index for the source.

    The method will read transformed records from a TIMDEXDataset
    located at dataset_path using the 'timdex-dataset-api' library. The dataset
    is filtered by run date and run ID.

    Logs an error and aborts if the provided index doesn't exist in the cluster.
    """
    # The client is taken from the click context object under the "CLIENT" key.
    search_client = ctx.obj["CLIENT"]

    # From here on, 'index' is whatever the helper returns for the given
    # index/source pair.
    index = helpers.validate_bulk_cli_options(index, source, search_client)

    logger.info(f"Bulk updating records from dataset '{dataset_path}' into '{index}'")

    # Load the dataset restricted to the requested run date and run ID.
    dataset = TIMDEXDataset(location=dataset_path)
    dataset.load(run_date=run_date, run_id=run_id)

    # --- index phase ---
    # Counters start at zero so the summary is still fully populated when
    # indexing fails before returning any results.
    index_results = {"created": 0, "updated": 0, "errors": 0, "total": 0}
    index_records = dataset.read_transformed_records_iter(action="index")
    try:
        index_outcome = tim_os.bulk_index(search_client, index, index_records)
        index_results.update(index_outcome)
    except BulkIndexingError as exception:
        # An indexing failure is logged and not re-raised, so the delete phase
        # below still runs.
        logger.info(f"Bulk indexing failed: {exception}")

    # --- delete phase ---
    # Only the 'timdex_record_id' column is read for rows with action "delete".
    delete_results = {"deleted": 0, "errors": 0, "total": 0}
    delete_records = dataset.read_dicts_iter(
        columns=["timdex_record_id"], action="delete"
    )
    delete_outcome = tim_os.bulk_delete(search_client, index, delete_records)
    delete_results.update(delete_outcome)

    # Emit both sets of counters as a single JSON document in the final log line.
    summary_results = {"index": index_results, "delete": delete_results}
    logger.info(f"Bulk update complete: {json.dumps(summary_results)}")
0 commit comments