|
28 | 28 | SPECS_PATH_SETUPS, |
29 | 29 | STREAM_GH_EVENTS_COMMIT_BUILDERS_CG, |
30 | 30 | STREAM_KEYNAME_NEW_BUILD_EVENTS, |
| 31 | + get_arch_specific_stream_name, |
31 | 32 | REDIS_HEALTH_CHECK_INTERVAL, |
32 | 33 | REDIS_SOCKET_TIMEOUT, |
33 | 34 | REDIS_BINS_EXPIRE_SECS, |
|
47 | 48 | PERFORMANCE_GH_TOKEN = os.getenv("PERFORMANCE_GH_TOKEN", None) |
48 | 49 |
|
49 | 50 |
|
| 51 | +def clear_pending_messages_for_builder_consumer(conn, builder_group, builder_id): |
| 52 | + """Clear all pending messages for a specific builder consumer on startup""" |
| 53 | + consumer_name = f"{builder_group}-proc#{builder_id}" |
| 54 | + |
| 55 | + try: |
| 56 | + # Get pending messages for this specific consumer |
| 57 | + pending_info = conn.xpending_range( |
| 58 | + STREAM_KEYNAME_GH_EVENTS_COMMIT, |
| 59 | + builder_group, |
| 60 | + min="-", |
| 61 | + max="+", |
| 62 | + count=1000, # Get up to 1000 pending messages |
| 63 | + consumername=consumer_name, |
| 64 | + ) |
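| | + # NOTE: only the first 1000 pending entries are fetched and acknowledged in this single startup pass |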
| 65 | + |
| 66 | + if pending_info: |
| 67 | + message_ids = [msg["message_id"] for msg in pending_info] |
| 68 | + logging.info( |
| 69 | + f"Found {len(message_ids)} pending messages for builder consumer {consumer_name}. Clearing them..." |
| 70 | + ) |
| 71 | + |
| 72 | + # Acknowledge all pending messages to clear them |
| 73 | + ack_count = conn.xack( |
| 74 | + STREAM_KEYNAME_GH_EVENTS_COMMIT, builder_group, *message_ids |
| 75 | + ) |
| 76 | + |
| 77 | + logging.info( |
| 78 | + f"Successfully cleared {ack_count} pending messages for builder consumer {consumer_name}" |
| 79 | + ) |
| 80 | + else: |
| 81 | + logging.info( |
| 82 | + f"No pending messages found for builder consumer {consumer_name}" |
| 83 | + ) |
| 84 | + |
| 85 | + except redis.exceptions.ResponseError as e: |
| 86 | + if "NOGROUP" in str(e): |
| 87 | + logging.info(f"Builder consumer group {builder_group} does not exist yet") |
| 88 | + else: |
| 89 | + logging.warning(f"Error clearing pending messages: {e}") |
| 90 | + except Exception as e: |
| 91 | + logging.error(f"Unexpected error clearing pending messages: {e}") |
| 92 | + |
| 93 | + |
| 94 | +def reset_builder_consumer_group_to_latest(conn, builder_group): |
| 95 | + """Reset the builder consumer group position to only read new messages (skip old ones)""" |
| 96 | + try: |
| 97 | + # Set the consumer group position to '$' (latest) to skip all existing messages |
| 98 | + conn.xgroup_setid(STREAM_KEYNAME_GH_EVENTS_COMMIT, builder_group, id="$") |
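| | + # XGROUP SETID only moves the group's last-delivered-id; entries already in a consumer's pending list are unaffected, which is why pending messages are cleared separately |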
| 99 | + logging.info( |
| 100 | + f"Reset builder consumer group {builder_group} position to latest - will only process new messages" |
| 101 | + ) |
| 102 | + |
| 103 | + except redis.exceptions.ResponseError as e: |
| 104 | + if "NOGROUP" in str(e): |
| 105 | + logging.info(f"Builder consumer group {builder_group} does not exist yet") |
| 106 | + else: |
| 107 | + logging.warning(f"Error resetting builder consumer group position: {e}") |
| 108 | + except Exception as e: |
| 109 | + logging.error( |
| 110 | + f"Unexpected error resetting builder consumer group position: {e}" |
| 111 | + ) |
| 112 | + |
| 113 | + |
50 | 114 | class ZipFileWithPermissions(ZipFile): |
51 | 115 | def _extract_member(self, member, targetpath, pwd): |
52 | 116 | if not isinstance(member, ZipInfo): |
@@ -104,6 +168,12 @@ def main(): |
104 | 168 | ) |
105 | 169 | parser.add_argument("--github_token", type=str, default=PERFORMANCE_GH_TOKEN) |
106 | 170 | parser.add_argument("--pull-request", type=str, default=None, nargs="?", const="") |
| 171 | + parser.add_argument( |
| 172 | + "--skip-clear-pending-on-startup", |
| 173 | + default=False, |
| 174 | + action="store_true", |
| 175 | + help="Skip automatically clearing pending messages and resetting the consumer group position on startup. By default, pending messages are cleared and the consumer group is reset to the latest position so old work is skipped and the builder recovers from crashes.", |
| 176 | + ) |
107 | 177 | args = parser.parse_args() |
108 | 178 | if args.logname is not None: |
109 | 179 | print("Writting log to {}".format(args.logname)) |
@@ -169,6 +239,19 @@ def main(): |
169 | 239 | builder_id = "1" |
170 | 240 |
|
171 | 241 | builder_consumer_group_create(conn, builder_group) |
| 242 | + |
| 243 | + # Clear pending messages and reset consumer group position by default (unless explicitly skipped) |
| 244 | + if not args.skip_clear_pending_on_startup: |
| 245 | + logging.info( |
| 246 | + "Clearing pending messages and resetting builder consumer group position on startup (default behavior)" |
| 247 | + ) |
| 248 | + clear_pending_messages_for_builder_consumer(conn, builder_group, builder_id) |
| 249 | + reset_builder_consumer_group_to_latest(conn, builder_group) |
| 250 | + else: |
| 251 | + logging.info( |
| 252 | + "Skipping pending message cleanup and builder consumer group reset as requested" |
| 253 | + ) |
| 254 | + |
172 | 255 | if args.github_token is not None: |
173 | 256 | logging.info("detected a github token. will update as much as possible!!! =)") |
174 | 257 | previous_id = args.consumer_start_id |
@@ -268,7 +351,32 @@ def builder_process_stream( |
268 | 351 | build_request_arch, arch |
269 | 352 | ) |
270 | 353 | ) |
| 354 | + # Acknowledge the message even though we're skipping it |
| 355 | + ack_reply = conn.xack( |
| 356 | + STREAM_KEYNAME_GH_EVENTS_COMMIT, |
| 357 | + STREAM_GH_EVENTS_COMMIT_BUILDERS_CG, |
| 358 | + streamId, |
| 359 | + ) |
| 360 | + if type(ack_reply) == bytes: |
| 361 | + ack_reply = ack_reply.decode() |
| 362 | + if ack_reply == "1" or ack_reply == 1: |
| 363 | + logging.info( |
| 364 | + "Successfully acknowledged build variation stream with id {} (filtered by arch).".format( |
| 365 | + streamId |
| 366 | + ) |
| 367 | + ) |
| 368 | + else: |
| 369 | + logging.error( |
| 370 | + "Unable to acknowledge build variation stream with id {}. XACK reply {}".format( |
| 371 | + streamId, ack_reply |
| 372 | + ) |
| 373 | + ) |
271 | 374 | return previous_id, new_builds_count, build_stream_fields_arr |
| 375 | + else: |
| 376 | + logging.info( |
| 377 | + "No arch info found on the stream. Using default arch {}.".format(arch) |
| 378 | + ) |
| 379 | + build_request_arch = arch |
272 | 380 |
|
273 | 381 | home = str(Path.home()) |
274 | 382 | if b"git_hash" in testDetails: |
@@ -429,6 +537,105 @@ def builder_process_stream( |
429 | 537 | if b"server_name" in testDetails: |
430 | 538 | server_name = testDetails[b"server_name"].decode() |
431 | 539 |
|
| 540 | + # Check if artifacts already exist before building |
| 541 | + prefix = f"build_spec={build_spec}/github_org={github_org}/github_repo={github_repo}/git_branch={str(git_branch)}/git_version={str(git_version)}/git_hash={str(git_hash)}" |
| 542 | + |
| 543 | + # Create a comprehensive build signature that includes all build-affecting parameters |
| 544 | + import hashlib |
| 545 | + |
| 546 | + build_signature_parts = [ |
| 547 | + str(id), # build config ID |
| 548 | + str(build_command), # build command |
| 549 | + str(build_vars_str), # environment variables |
| 550 | + str(compiler), # compiler |
| 551 | + str(cpp_compiler), # C++ compiler |
| 552 | + str(build_image), # build image |
| 553 | + str(build_os), # OS |
| 554 | + str(build_arch), # architecture |
| 555 | + ",".join(sorted(build_artifacts)), # artifacts list |
| 556 | + ] |
| 557 | + build_signature = hashlib.sha256( |
| 558 | + ":".join(build_signature_parts).encode() |
| 559 | + ).hexdigest()[:16] |
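| | + # The truncated digest becomes part of the artifact key below, so cached binaries are only reused when the build configuration is identical |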
| 560 | + |
| 561 | + # Check if all artifacts already exist |
| 562 | + all_artifacts_exist = True |
| 563 | + artifact_keys = {} |
| 564 | + for artifact in build_artifacts: |
| 565 | + bin_key = f"zipped:artifacts:{prefix}:{id}:{build_signature}:{artifact}.zip" |
| 566 | + artifact_keys[artifact] = bin_key |
| 567 | + if not conn.exists(bin_key): |
| 568 | + all_artifacts_exist = False |
| 569 | + break |
| 570 | + |
| 571 | + if all_artifacts_exist: |
| 572 | + logging.info( |
| 573 | + f"Artifacts for {git_hash}:{id} with build signature {build_signature} already exist, reusing them" |
| 574 | + ) |
| 575 | + # Skip build and reuse existing artifacts |
| 576 | + build_stream_fields, result = generate_benchmark_stream_request( |
| 577 | + id, |
| 578 | + conn, |
| 579 | + run_image, |
| 580 | + build_arch, |
| 581 | + testDetails, |
| 582 | + build_os, |
| 583 | + build_artifacts, |
| 584 | + build_command, |
| 585 | + build_config_metadata, |
| 586 | + build_image, |
| 587 | + build_vars_str, |
| 588 | + compiler, |
| 589 | + cpp_compiler, |
| 590 | + git_branch, |
| 591 | + git_hash, |
| 592 | + git_timestamp_ms, |
| 593 | + git_version, |
| 594 | + pull_request, |
| 595 | + None, # redis_temporary_dir not needed for reuse |
| 596 | + tests_groups_regexp, |
| 597 | + tests_priority_lower_limit, |
| 598 | + tests_priority_upper_limit, |
| 599 | + tests_regexp, |
| 600 | + ".*", # command_regexp - default to all commands |
| 601 | + use_git_timestamp, |
| 602 | + server_name, |
| 603 | + github_org, |
| 604 | + github_repo, |
| 605 | + artifact_keys, # Pass existing artifact keys |
| 606 | + ) |
| 607 | + # Add to benchmark stream even when reusing artifacts |
| 608 | + if result is True: |
| 609 | + arch_specific_stream = get_arch_specific_stream_name(build_arch) |
| 610 | + logging.info( |
| 611 | + f"Adding reused build work to architecture-specific stream: {arch_specific_stream}" |
| 612 | + ) |
| 613 | + benchmark_stream_id = conn.xadd( |
| 614 | + arch_specific_stream, build_stream_fields |
| 615 | + ) |
| 616 | + logging.info( |
| 617 | + "successfully reused build variant {} for redis git_sha {}. Stream id: {}".format( |
| 618 | + id, git_hash, benchmark_stream_id |
| 619 | + ) |
| 620 | + ) |
| 621 | + streamId_decoded = streamId.decode() |
| 622 | + benchmark_stream_id_decoded = benchmark_stream_id.decode() |
| 623 | + builder_list_completed = ( |
| 624 | + f"builder:{streamId_decoded}:builds_completed" |
| 625 | + ) |
| 626 | + conn.lpush(builder_list_completed, benchmark_stream_id_decoded) |
| 627 | + conn.expire(builder_list_completed, REDIS_BINS_EXPIRE_SECS) |
| 628 | + logging.info( |
| 629 | + f"Adding information of build->benchmark stream info in list {builder_list_completed}. Adding benchmark stream id: {benchmark_stream_id_decoded}" |
| 630 | + ) |
| 631 | + build_stream_fields_arr.append(build_stream_fields) |
| 632 | + new_builds_count = new_builds_count + 1 |
| 633 | + continue # Skip to next build spec |
| 634 | + |
| 635 | + logging.info( |
| 636 | + f"Building artifacts for {git_hash}:{id} with build signature {build_signature}" |
| 637 | + ) |
| 638 | + |
432 | 639 | build_start_datetime = datetime.datetime.utcnow() |
433 | 640 | logging.info( |
434 | 641 | "Using the following build command {}.".format(build_command) |
@@ -507,10 +714,15 @@ def builder_process_stream( |
507 | 714 | server_name, |
508 | 715 | github_org, |
509 | 716 | github_repo, |
| 717 | + None, # existing_artifact_keys - None for new builds |
510 | 718 | ) |
511 | 719 | if result is True: |
| 720 | + arch_specific_stream = get_arch_specific_stream_name(build_arch) |
| 721 | + logging.info( |
| 722 | + f"Adding new build work to architecture-specific stream: {arch_specific_stream}" |
| 723 | + ) |
512 | 724 | benchmark_stream_id = conn.xadd( |
513 | | - STREAM_KEYNAME_NEW_BUILD_EVENTS, build_stream_fields |
| 725 | + arch_specific_stream, build_stream_fields |
514 | 726 | ) |
515 | 727 | logging.info( |
516 | 728 | "sucessfully built build variant {} for redis git_sha {}. Stream id: {}".format( |
@@ -648,6 +860,7 @@ def generate_benchmark_stream_request( |
648 | 860 | server_name="redis", |
649 | 861 | github_org="redis", |
650 | 862 | github_repo="redis", |
| 863 | + existing_artifact_keys=None, |
651 | 864 | ): |
652 | 865 | build_stream_fields = { |
653 | 866 | "id": id, |
@@ -691,21 +904,50 @@ def generate_benchmark_stream_request( |
691 | 904 | if git_timestamp_ms is not None: |
692 | 905 | build_stream_fields["git_timestamp_ms"] = git_timestamp_ms |
693 | 906 |
|
694 | | - prefix = f"github_org={github_org}/github_repo={github_repo}/git_branch={str(git_branch)}/git_version={str(git_version)}/git_hash={str(git_hash)}" |
695 | | - for artifact in build_artifacts: |
696 | | - bin_key = f"zipped:artifacts:{prefix}:{id}:{artifact}.zip" |
697 | | - if artifact == "redisearch.so": |
698 | | - bin_artifact = open( |
699 | | - f"{redis_temporary_dir}modules/redisearch/src/bin/linux-x64-release/search-community/{artifact}", |
700 | | - "rb", |
701 | | - ).read() |
702 | | - else: |
703 | | - bin_artifact = open(f"{redis_temporary_dir}src/{artifact}", "rb").read() |
704 | | - bin_artifact_len = len(bytes(bin_artifact)) |
705 | | - assert bin_artifact_len > 0 |
706 | | - conn.set(bin_key, bytes(bin_artifact), ex=REDIS_BINS_EXPIRE_SECS) |
707 | | - build_stream_fields[artifact] = bin_key |
708 | | - build_stream_fields["{}_len_bytes".format(artifact)] = bin_artifact_len |
| 907 | + if existing_artifact_keys is not None: |
| 908 | + # Use existing artifact keys (for reuse case) |
| 909 | + for artifact in build_artifacts: |
| 910 | + bin_key = existing_artifact_keys[artifact] |
| 911 | + build_stream_fields[artifact] = bin_key |
| 912 | + # Get the length from the existing artifact |
| 913 | + bin_artifact_len = conn.strlen(bin_key) |
| 914 | + build_stream_fields["{}_len_bytes".format(artifact)] = bin_artifact_len |
| 915 | + else: |
| 916 | + # Build new artifacts and store them |
| 917 | + prefix = f"github_org={github_org}/github_repo={github_repo}/git_branch={str(git_branch)}/git_version={str(git_version)}/git_hash={str(git_hash)}" |
| 918 | + |
| 919 | + # Create build signature for new artifacts |
| 920 | + import hashlib |
| 921 | + |
| 922 | + build_signature_parts = [ |
| 923 | + str(id), # build config ID |
| 924 | + str(build_command), # build command |
| 925 | + str(build_vars_str), # environment variables |
| 926 | + str(compiler), # compiler |
| 927 | + str(cpp_compiler), # C++ compiler |
| 928 | + str(build_image), # build image |
| 929 | + str(build_os), # OS |
| 930 | + str(build_arch), # architecture |
| 931 | + ",".join(sorted(build_artifacts)), # artifacts list |
| 932 | + ] |
| 933 | + build_signature = hashlib.sha256( |
| 934 | + ":".join(build_signature_parts).encode() |
| 935 | + ).hexdigest()[:16] |
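| | + # Keep this in sync with the signature computed in builder_process_stream so that stored keys match the reuse check |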
| 936 | + |
| 937 | + for artifact in build_artifacts: |
| 938 | + bin_key = f"zipped:artifacts:{prefix}:{id}:{build_signature}:{artifact}.zip" |
| 939 | + if artifact == "redisearch.so": |
| 940 | + bin_artifact = open( |
| 941 | + f"{redis_temporary_dir}modules/redisearch/src/bin/linux-x64-release/search-community/{artifact}", |
| 942 | + "rb", |
| 943 | + ).read() |
| 944 | + else: |
| 945 | + bin_artifact = open(f"{redis_temporary_dir}src/{artifact}", "rb").read() |
| 946 | + bin_artifact_len = len(bytes(bin_artifact)) |
| 947 | + assert bin_artifact_len > 0 |
| 948 | + conn.set(bin_key, bytes(bin_artifact), ex=REDIS_BINS_EXPIRE_SECS) |
| 949 | + build_stream_fields[artifact] = bin_key |
| 950 | + build_stream_fields["{}_len_bytes".format(artifact)] = bin_artifact_len |
709 | 951 | result = True |
710 | 952 | if b"platform" in testDetails: |
711 | 953 | build_stream_fields["platform"] = testDetails[b"platform"] |
|