diff --git a/poetry.lock b/poetry.lock index 5cbd156e..56a003a6 100644 --- a/poetry.lock +++ b/poetry.lock @@ -2415,6 +2415,27 @@ botocore = ">=1.33.2,<2.0a.0" [package.extras] crt = ["botocore[crt] (>=1.33.2,<2.0a.0)"] +[[package]] +name = "seaborn" +version = "0.13.2" +description = "Statistical data visualization" +optional = false +python-versions = ">=3.8" +files = [ + {file = "seaborn-0.13.2-py3-none-any.whl", hash = "sha256:636f8336facf092165e27924f223d3c62ca560b1f2bb5dff7ab7fad265361987"}, + {file = "seaborn-0.13.2.tar.gz", hash = "sha256:93e60a40988f4d65e9f4885df477e2fdaff6b73a9ded434c1ab356dd57eefff7"}, +] + +[package.dependencies] +matplotlib = ">=3.4,<3.6.1 || >3.6.1" +numpy = ">=1.20,<1.24.0 || >1.24.0" +pandas = ">=1.2" + +[package.extras] +dev = ["flake8", "flit", "mypy", "pandas-stubs", "pre-commit", "pytest", "pytest-cov", "pytest-xdist"] +docs = ["ipykernel", "nbconvert", "numpydoc", "pydata_sphinx_theme (==0.10.0rc2)", "pyyaml", "sphinx (<6.0.0)", "sphinx-copybutton", "sphinx-design", "sphinx-issues"] +stats = ["scipy (>=1.7)", "statsmodels (>=0.12)"] + [[package]] name = "semver" version = "2.13.0" @@ -3022,4 +3043,4 @@ files = [ [metadata] lock-version = "2.0" python-versions = "^3.10.0" -content-hash = "1f12e5cae46cd809d35b1b2770315519ef73fe6e3392b8ffed8fbd647763db5b" +content-hash = "6568016bb582ebe13057676a4fc66655b5f59bda9453524b2fef77d3096b7c2d" diff --git a/pyproject.toml b/pyproject.toml index 03efcbc5..591f1f9b 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,6 +1,6 @@ [tool.poetry] name = "redis-benchmarks-specification" -version = "0.1.323" +version = "0.2.11" description = "The Redis benchmarks specification describes the cross-language/tools requirements and expectations to foster performance and observability standards around redis related technologies. Members from both industry and academia, including organizations and individuals are encouraged to contribute." authors = ["filipecosta90 ","Redis Performance Group "] readme = "Readme.md" @@ -27,6 +27,7 @@ pandas = "^2.1.2" numpy = "^2.0.0" jsonpath-ng = "^1.6.1" +seaborn = "^0.13.2" [tool.poetry.dev-dependencies] click = "8.1.7" black = "24.4.2" diff --git a/redis_benchmarks_specification/__builder__/builder.py b/redis_benchmarks_specification/__builder__/builder.py index e91bed69..55908b5d 100644 --- a/redis_benchmarks_specification/__builder__/builder.py +++ b/redis_benchmarks_specification/__builder__/builder.py @@ -28,6 +28,7 @@ SPECS_PATH_SETUPS, STREAM_GH_EVENTS_COMMIT_BUILDERS_CG, STREAM_KEYNAME_NEW_BUILD_EVENTS, + get_arch_specific_stream_name, REDIS_HEALTH_CHECK_INTERVAL, REDIS_SOCKET_TIMEOUT, REDIS_BINS_EXPIRE_SECS, @@ -47,6 +48,69 @@ PERFORMANCE_GH_TOKEN = os.getenv("PERFORMANCE_GH_TOKEN", None) +def clear_pending_messages_for_builder_consumer(conn, builder_group, builder_id): + """Clear all pending messages for a specific builder consumer on startup""" + consumer_name = f"{builder_group}-proc#{builder_id}" + + try: + # Get pending messages for this specific consumer + pending_info = conn.xpending_range( + STREAM_KEYNAME_GH_EVENTS_COMMIT, + builder_group, + min="-", + max="+", + count=1000, # Get up to 1000 pending messages + consumername=consumer_name, + ) + + if pending_info: + message_ids = [msg["message_id"] for msg in pending_info] + logging.info( + f"Found {len(message_ids)} pending messages for builder consumer {consumer_name}. Clearing them..." + ) + + # Acknowledge all pending messages to clear them + ack_count = conn.xack( + STREAM_KEYNAME_GH_EVENTS_COMMIT, builder_group, *message_ids + ) + + logging.info( + f"Successfully cleared {ack_count} pending messages for builder consumer {consumer_name}" + ) + else: + logging.info( + f"No pending messages found for builder consumer {consumer_name}" + ) + + except redis.exceptions.ResponseError as e: + if "NOGROUP" in str(e): + logging.info(f"Builder consumer group {builder_group} does not exist yet") + else: + logging.warning(f"Error clearing pending messages: {e}") + except Exception as e: + logging.error(f"Unexpected error clearing pending messages: {e}") + + +def reset_builder_consumer_group_to_latest(conn, builder_group): + """Reset the builder consumer group position to only read new messages (skip old ones)""" + try: + # Set the consumer group position to '$' (latest) to skip all existing messages + conn.xgroup_setid(STREAM_KEYNAME_GH_EVENTS_COMMIT, builder_group, id="$") + logging.info( + f"Reset builder consumer group {builder_group} position to latest - will only process new messages" + ) + + except redis.exceptions.ResponseError as e: + if "NOGROUP" in str(e): + logging.info(f"Builder consumer group {builder_group} does not exist yet") + else: + logging.warning(f"Error resetting builder consumer group position: {e}") + except Exception as e: + logging.error( + f"Unexpected error resetting builder consumer group position: {e}" + ) + + class ZipFileWithPermissions(ZipFile): def _extract_member(self, member, targetpath, pwd): if not isinstance(member, ZipInfo): @@ -104,6 +168,12 @@ def main(): ) parser.add_argument("--github_token", type=str, default=PERFORMANCE_GH_TOKEN) parser.add_argument("--pull-request", type=str, default=None, nargs="?", const="") + parser.add_argument( + "--skip-clear-pending-on-startup", + default=False, + action="store_true", + help="Skip automatically clearing pending messages and resetting consumer group position on startup. By default, pending messages are cleared and consumer group is reset to latest position to skip old work and recover from crashes.", + ) args = parser.parse_args() if args.logname is not None: print("Writting log to {}".format(args.logname)) @@ -169,6 +239,19 @@ def main(): builder_id = "1" builder_consumer_group_create(conn, builder_group) + + # Clear pending messages and reset consumer group position by default (unless explicitly skipped) + if not args.skip_clear_pending_on_startup: + logging.info( + "Clearing pending messages and resetting builder consumer group position on startup (default behavior)" + ) + clear_pending_messages_for_builder_consumer(conn, builder_group, builder_id) + reset_builder_consumer_group_to_latest(conn, builder_group) + else: + logging.info( + "Skipping pending message cleanup and builder consumer group reset as requested" + ) + if args.github_token is not None: logging.info("detected a github token. will update as much as possible!!! =)") previous_id = args.consumer_start_id @@ -268,7 +351,32 @@ def builder_process_stream( build_request_arch, arch ) ) + # Acknowledge the message even though we're skipping it + ack_reply = conn.xack( + STREAM_KEYNAME_GH_EVENTS_COMMIT, + STREAM_GH_EVENTS_COMMIT_BUILDERS_CG, + streamId, + ) + if type(ack_reply) == bytes: + ack_reply = ack_reply.decode() + if ack_reply == "1" or ack_reply == 1: + logging.info( + "Successfully acknowledged build variation stream with id {} (filtered by arch).".format( + streamId + ) + ) + else: + logging.error( + "Unable to acknowledge build variation stream with id {}. XACK reply {}".format( + streamId, ack_reply + ) + ) return previous_id, new_builds_count, build_stream_fields_arr + else: + logging.info( + "No arch info found on the stream. Using default arch {}.".format(arch) + ) + build_request_arch = arch home = str(Path.home()) if b"git_hash" in testDetails: @@ -429,6 +537,105 @@ def builder_process_stream( if b"server_name" in testDetails: server_name = testDetails[b"server_name"].decode() + # Check if artifacts already exist before building + prefix = f"build_spec={build_spec}/github_org={github_org}/github_repo={github_repo}/git_branch={str(git_branch)}/git_version={str(git_version)}/git_hash={str(git_hash)}" + + # Create a comprehensive build signature that includes all build-affecting parameters + import hashlib + + build_signature_parts = [ + str(id), # build config ID + str(build_command), # build command + str(build_vars_str), # environment variables + str(compiler), # compiler + str(cpp_compiler), # C++ compiler + str(build_image), # build image + str(build_os), # OS + str(build_arch), # architecture + ",".join(sorted(build_artifacts)), # artifacts list + ] + build_signature = hashlib.sha256( + ":".join(build_signature_parts).encode() + ).hexdigest()[:16] + + # Check if all artifacts already exist + all_artifacts_exist = True + artifact_keys = {} + for artifact in build_artifacts: + bin_key = f"zipped:artifacts:{prefix}:{id}:{build_signature}:{artifact}.zip" + artifact_keys[artifact] = bin_key + if not conn.exists(bin_key): + all_artifacts_exist = False + break + + if all_artifacts_exist: + logging.info( + f"Artifacts for {git_hash}:{id} with build signature {build_signature} already exist, reusing them" + ) + # Skip build and reuse existing artifacts + build_stream_fields, result = generate_benchmark_stream_request( + id, + conn, + run_image, + build_arch, + testDetails, + build_os, + build_artifacts, + build_command, + build_config_metadata, + build_image, + build_vars_str, + compiler, + cpp_compiler, + git_branch, + git_hash, + git_timestamp_ms, + git_version, + pull_request, + None, # redis_temporary_dir not needed for reuse + tests_groups_regexp, + tests_priority_lower_limit, + tests_priority_upper_limit, + tests_regexp, + ".*", # command_regexp - default to all commands + use_git_timestamp, + server_name, + github_org, + github_repo, + artifact_keys, # Pass existing artifact keys + ) + # Add to benchmark stream even when reusing artifacts + if result is True: + arch_specific_stream = get_arch_specific_stream_name(build_arch) + logging.info( + f"Adding reused build work to architecture-specific stream: {arch_specific_stream}" + ) + benchmark_stream_id = conn.xadd( + arch_specific_stream, build_stream_fields + ) + logging.info( + "successfully reused build variant {} for redis git_sha {}. Stream id: {}".format( + id, git_hash, benchmark_stream_id + ) + ) + streamId_decoded = streamId.decode() + benchmark_stream_id_decoded = benchmark_stream_id.decode() + builder_list_completed = ( + f"builder:{streamId_decoded}:builds_completed" + ) + conn.lpush(builder_list_completed, benchmark_stream_id_decoded) + conn.expire(builder_list_completed, REDIS_BINS_EXPIRE_SECS) + logging.info( + f"Adding information of build->benchmark stream info in list {builder_list_completed}. Adding benchmark stream id: {benchmark_stream_id_decoded}" + ) + build_stream_fields_arr.append(build_stream_fields) + new_builds_count = new_builds_count + 1 + continue # Skip to next build spec + + logging.info( + f"Building artifacts for {git_hash}:{id} with build signature {build_signature}" + ) + build_start_datetime = datetime.datetime.utcnow() logging.info( "Using the following build command {}.".format(build_command) @@ -502,14 +709,20 @@ def builder_process_stream( tests_priority_lower_limit, tests_priority_upper_limit, tests_regexp, + ".*", # command_regexp - default to all commands use_git_timestamp, server_name, github_org, github_repo, + None, # existing_artifact_keys - None for new builds ) if result is True: + arch_specific_stream = get_arch_specific_stream_name(build_arch) + logging.info( + f"Adding new build work to architecture-specific stream: {arch_specific_stream}" + ) benchmark_stream_id = conn.xadd( - STREAM_KEYNAME_NEW_BUILD_EVENTS, build_stream_fields + arch_specific_stream, build_stream_fields ) logging.info( "sucessfully built build variant {} for redis git_sha {}. Stream id: {}".format( @@ -642,10 +855,12 @@ def generate_benchmark_stream_request( tests_priority_lower_limit=0, tests_priority_upper_limit=10000, tests_regexp=".*", + command_regexp=".*", use_git_timestamp=False, server_name="redis", github_org="redis", github_repo="redis", + existing_artifact_keys=None, ): build_stream_fields = { "id": id, @@ -658,6 +873,7 @@ def generate_benchmark_stream_request( "tests_priority_upper_limit": tests_priority_upper_limit, "tests_priority_lower_limit": tests_priority_lower_limit, "tests_groups_regexp": tests_groups_regexp, + "command_regexp": command_regexp, "server_name": server_name, "github_org": github_org, "github_repo": github_repo, @@ -688,21 +904,50 @@ def generate_benchmark_stream_request( if git_timestamp_ms is not None: build_stream_fields["git_timestamp_ms"] = git_timestamp_ms - prefix = f"github_org={github_org}/github_repo={github_repo}/git_branch={str(git_branch)}/git_version={str(git_version)}/git_hash={str(git_hash)}" - for artifact in build_artifacts: - bin_key = f"zipped:artifacts:{prefix}:{id}:{artifact}.zip" - if artifact == "redisearch.so": - bin_artifact = open( - f"{redis_temporary_dir}modules/redisearch/src/bin/linux-x64-release/search-community/{artifact}", - "rb", - ).read() - else: - bin_artifact = open(f"{redis_temporary_dir}src/{artifact}", "rb").read() - bin_artifact_len = len(bytes(bin_artifact)) - assert bin_artifact_len > 0 - conn.set(bin_key, bytes(bin_artifact), ex=REDIS_BINS_EXPIRE_SECS) - build_stream_fields[artifact] = bin_key - build_stream_fields["{}_len_bytes".format(artifact)] = bin_artifact_len + if existing_artifact_keys is not None: + # Use existing artifact keys (for reuse case) + for artifact in build_artifacts: + bin_key = existing_artifact_keys[artifact] + build_stream_fields[artifact] = bin_key + # Get the length from the existing artifact + bin_artifact_len = conn.strlen(bin_key) + build_stream_fields["{}_len_bytes".format(artifact)] = bin_artifact_len + else: + # Build new artifacts and store them + prefix = f"github_org={github_org}/github_repo={github_repo}/git_branch={str(git_branch)}/git_version={str(git_version)}/git_hash={str(git_hash)}" + + # Create build signature for new artifacts + import hashlib + + build_signature_parts = [ + str(id), # build config ID + str(build_command), # build command + str(build_vars_str), # environment variables + str(compiler), # compiler + str(cpp_compiler), # C++ compiler + str(build_image), # build image + str(build_os), # OS + str(build_arch), # architecture + ",".join(sorted(build_artifacts)), # artifacts list + ] + build_signature = hashlib.sha256( + ":".join(build_signature_parts).encode() + ).hexdigest()[:16] + + for artifact in build_artifacts: + bin_key = f"zipped:artifacts:{prefix}:{id}:{build_signature}:{artifact}.zip" + if artifact == "redisearch.so": + bin_artifact = open( + f"{redis_temporary_dir}modules/redisearch/src/bin/linux-x64-release/search-community/{artifact}", + "rb", + ).read() + else: + bin_artifact = open(f"{redis_temporary_dir}src/{artifact}", "rb").read() + bin_artifact_len = len(bytes(bin_artifact)) + assert bin_artifact_len > 0 + conn.set(bin_key, bytes(bin_artifact), ex=REDIS_BINS_EXPIRE_SECS) + build_stream_fields[artifact] = bin_key + build_stream_fields["{}_len_bytes".format(artifact)] = bin_artifact_len result = True if b"platform" in testDetails: build_stream_fields["platform"] = testDetails[b"platform"] diff --git a/redis_benchmarks_specification/__cli__/args.py b/redis_benchmarks_specification/__cli__/args.py index ebbf00a5..86d1adf4 100644 --- a/redis_benchmarks_specification/__cli__/args.py +++ b/redis_benchmarks_specification/__cli__/args.py @@ -138,7 +138,7 @@ def spec_cli_args(parser): parser.add_argument("--gh_repo", type=str, default="redis") parser.add_argument("--server_name", type=str, default=None) parser.add_argument("--run_image", type=str, default="redis") - parser.add_argument("--build_arch", type=str, default=None) + parser.add_argument("--arch", type=str, default="amd64") parser.add_argument("--id", type=str, default="dockerhub") parser.add_argument("--mnt_point", type=str, default="") parser.add_argument("--trigger-unstable-commits", type=bool, default=True) @@ -217,4 +217,10 @@ def spec_cli_args(parser): default=-1, help="Wait x sections for build. If -1, waits forever.", ) + parser.add_argument( + "--command-regex", + type=str, + default=".*", + help="Filter tests by command using regex. Only tests that include commands matching this regex will be processed.", + ) return parser diff --git a/redis_benchmarks_specification/__cli__/cli.py b/redis_benchmarks_specification/__cli__/cli.py index c0331c9e..eb315a20 100644 --- a/redis_benchmarks_specification/__cli__/cli.py +++ b/redis_benchmarks_specification/__cli__/cli.py @@ -44,6 +44,7 @@ STREAM_KEYNAME_GH_EVENTS_COMMIT, STREAM_GH_EVENTS_COMMIT_BUILDERS_CG, STREAM_KEYNAME_NEW_BUILD_EVENTS, + get_arch_specific_stream_name, ) from redis_benchmarks_specification.__common__.package import ( get_version_string, @@ -84,7 +85,7 @@ def trigger_tests_dockerhub_cli_command_logic(args, project_name, project_versio args.id, conn, args.run_image, - args.build_arch, + args.arch, testDetails, "n/a", [], @@ -104,6 +105,12 @@ def trigger_tests_dockerhub_cli_command_logic(args, project_name, project_versio 0, 10000, args.tests_regexp, + ".*", # command_regexp + False, # use_git_timestamp + "redis", # server_name + "redis", # github_org + "redis", # github_repo + None, # existing_artifact_keys ) build_stream_fields["github_repo"] = args.gh_repo build_stream_fields["github_org"] = args.gh_org @@ -118,9 +125,12 @@ def trigger_tests_dockerhub_cli_command_logic(args, project_name, project_versio store_airgap_image_redis(conn, docker_client, args.run_image) if result is True: - benchmark_stream_id = conn.xadd( - STREAM_KEYNAME_NEW_BUILD_EVENTS, build_stream_fields + # Use architecture-specific stream + arch_specific_stream = get_arch_specific_stream_name(args.arch) + logging.info( + f"CLI adding work to architecture-specific stream: {arch_specific_stream}" ) + benchmark_stream_id = conn.xadd(arch_specific_stream, build_stream_fields) logging.info( "sucessfully requested a new run {}. Stream id: {}".format( build_stream_fields, benchmark_stream_id @@ -432,9 +442,9 @@ def trigger_tests_cli_command_logic(args, project_name, project_version): commit_dict["tests_groups_regexp"] = tests_groups_regexp commit_dict["github_org"] = args.gh_org commit_dict["github_repo"] = args.gh_repo - if args.build_arch is not None: - commit_dict["build_arch"] = args.build_arch - commit_dict["arch"] = args.build_arch + if args.arch is not None: + commit_dict["build_arch"] = args.arch + commit_dict["arch"] = args.arch if args.server_name is not None and args.server_name != "": commit_dict["server_name"] = args.server_name if args.build_artifacts != "": diff --git a/redis_benchmarks_specification/__common__/env.py b/redis_benchmarks_specification/__common__/env.py index 4ed76884..63f6c258 100644 --- a/redis_benchmarks_specification/__common__/env.py +++ b/redis_benchmarks_specification/__common__/env.py @@ -32,6 +32,20 @@ "STREAM_KEYNAME_NEW_BUILD_EVENTS", "oss:api:gh/redis/redis/builds" ) + +# Function to get architecture-specific build events stream name +def get_arch_specific_stream_name(arch): + """Get architecture-specific stream name for build events""" + base_stream = STREAM_KEYNAME_NEW_BUILD_EVENTS + if arch in ["amd64", "x86_64"]: + return f"{base_stream}:amd64" + elif arch in ["arm64", "aarch64"]: + return f"{base_stream}:arm64" + else: + # Fallback to base stream for unknown architectures + return base_stream + + STREAM_GH_NEW_BUILD_RUNNERS_CG = os.getenv( "STREAM_GH_NEW_BUILD_RUNNERS_CG", "runners-cg:redis/redis/commits" ) diff --git a/redis_benchmarks_specification/__common__/runner.py b/redis_benchmarks_specification/__common__/runner.py index d9b06f8e..2481aef8 100644 --- a/redis_benchmarks_specification/__common__/runner.py +++ b/redis_benchmarks_specification/__common__/runner.py @@ -101,7 +101,7 @@ def get_benchmark_specs(testsuites_folder, test="", test_regex=".*"): for test_name in original_files: match_obj = re.search(test_regexp_string, test_name) if match_obj is None: - logging.info( + logging.debug( "Skipping test file: {} given it does not match regex {}".format( test_name, test_regexp_string ) @@ -291,13 +291,7 @@ def export_redis_metrics( metric_name, metric_value, ) in overall_end_time_metrics.items(): - tsname_metric = "{}/{}/{}/benchmark_end/{}/{}".format( - sprefix, - test_name, - by_variant, - setup_name, - metric_name, - ) + tsname_metric = f"{sprefix}/{test_name}/{by_variant}/benchmark_end/{running_platform}/{setup_name}/{metric_name}" logging.debug( "Adding a redis server side metric collected at the end of benchmark." @@ -404,6 +398,7 @@ def exporter_datasink_common( running_platform, None, git_hash, + disable_target_tables=True, ) if collect_memory_metrics: logging.info("Collecting memory metrics") diff --git a/redis_benchmarks_specification/__common__/suppress_warnings.py b/redis_benchmarks_specification/__common__/suppress_warnings.py new file mode 100644 index 00000000..c37274af --- /dev/null +++ b/redis_benchmarks_specification/__common__/suppress_warnings.py @@ -0,0 +1,20 @@ +""" +Warning suppression module that should be imported first to suppress known warnings. +""" + +import warnings + +# Suppress cryptography deprecation warnings from paramiko +warnings.filterwarnings("ignore", category=DeprecationWarning, module="paramiko") +warnings.filterwarnings("ignore", message=".*TripleDES.*", category=DeprecationWarning) +warnings.filterwarnings( + "ignore", message=".*cryptography.*", category=DeprecationWarning +) + +# Also suppress the specific CryptographyDeprecationWarning if it exists +try: + from cryptography.utils import CryptographyDeprecationWarning + + warnings.filterwarnings("ignore", category=CryptographyDeprecationWarning) +except ImportError: + pass diff --git a/redis_benchmarks_specification/__common__/timeseries.py b/redis_benchmarks_specification/__common__/timeseries.py index e09c35aa..4b6fb843 100644 --- a/redis_benchmarks_specification/__common__/timeseries.py +++ b/redis_benchmarks_specification/__common__/timeseries.py @@ -838,34 +838,47 @@ def common_exporter_logic( and artifact_version != "" and artifact_version != "N/A" ): - # extract per-version datapoints - total_hs_ts = len(per_hash_time_series_dict.keys()) - logging.info( - f"Extending the by.hash {git_hash} timeseries ({total_hs_ts}) with version info {artifact_version}" - ) - for hash_timeserie in per_hash_time_series_dict.values(): - hash_timeserie["labels"]["version"] = artifact_version - ( - _, - per_version_time_series_dict, - version_target_tables, - ) = extract_perversion_timeseries_from_results( - used_ts, - metrics, - results_dict, - artifact_version, - tf_github_org, - tf_github_repo, - deployment_name, - deployment_type, - test_name, - tf_triggering_env, - metadata_tags, - build_variant_name, - running_platform, - testcase_metric_context_paths, - ) - total_break_by_added += 1 + # Check if version 255.255.255 should only be pushed for unstable branch + should_push_version = True + if artifact_version == "255.255.255": + if tf_github_branch != "unstable": + logging.info( + f"Skipping version 255.255.255 data push for branch '{tf_github_branch}' " + f"(only pushing for 'unstable' branch)" + ) + should_push_version = False + else: + logging.info(f"Pushing version 255.255.255 data for unstable branch") + + if should_push_version: + # extract per-version datapoints + total_hs_ts = len(per_hash_time_series_dict.keys()) + logging.info( + f"Extending the by.hash {git_hash} timeseries ({total_hs_ts}) with version info {artifact_version}" + ) + for hash_timeserie in per_hash_time_series_dict.values(): + hash_timeserie["labels"]["version"] = artifact_version + ( + _, + per_version_time_series_dict, + version_target_tables, + ) = extract_perversion_timeseries_from_results( + used_ts, + metrics, + results_dict, + artifact_version, + tf_github_org, + tf_github_repo, + deployment_name, + deployment_type, + test_name, + tf_triggering_env, + metadata_tags, + build_variant_name, + running_platform, + testcase_metric_context_paths, + ) + total_break_by_added += 1 else: logging.warning( "there was no git VERSION information to push data brokedown by VERSION" @@ -1054,6 +1067,9 @@ def add_standardized_metric_bybranch( labels["deployment_name+branch"] = "{} {}".format( deployment_name, tf_github_branch ) + labels["running_platform+branch"] = "{} {}".format( + running_platform, tf_github_branch + ) labels["test_name"] = str(test_name) labels["metric"] = str(metric_name) logging.info( @@ -1118,11 +1134,15 @@ def add_standardized_metric_byversion( tf_triggering_env, metadata_tags, build_variant_name, + running_platform, ) labels["version"] = artifact_version labels["deployment_name+version"] = "{} {}".format( deployment_name, artifact_version ) + labels["running_platform+version"] = "{} {}".format( + running_platform, artifact_version + ) labels["test_name"] = str(test_name) labels["metric"] = str(metric_name) logging.info( @@ -1169,6 +1189,7 @@ def timeseries_test_sucess_flow( running_platform=None, timeseries_dict=None, git_hash=None, + disable_target_tables=False, ): testcase_metric_context_paths = [] version_target_tables = None @@ -1205,7 +1226,7 @@ def timeseries_test_sucess_flow( ) ) push_data_to_redistimeseries(rts, timeseries_dict) - if version_target_tables is not None: + if not disable_target_tables and version_target_tables is not None: logging.info( "There are a total of {} distinct target tables by version".format( len(version_target_tables.keys()) @@ -1225,7 +1246,12 @@ def timeseries_test_sucess_flow( rts.hset( version_target_table_keyname, None, None, version_target_table_dict ) - if branch_target_tables is not None: + elif disable_target_tables: + logging.info( + "Target tables disabled - skipping version target table creation" + ) + + if not disable_target_tables and branch_target_tables is not None: logging.info( "There are a total of {} distinct target tables by branch".format( len(branch_target_tables.keys()) @@ -1246,6 +1272,10 @@ def timeseries_test_sucess_flow( rts.hset( branch_target_table_keyname, None, None, branch_target_table_dict ) + elif disable_target_tables: + logging.info( + "Target tables disabled - skipping branch target table creation" + ) if test_name is not None: if type(test_name) is str: update_secondary_result_keys( diff --git a/redis_benchmarks_specification/__compare__/args.py b/redis_benchmarks_specification/__compare__/args.py index 946a7cb2..5e014b9d 100644 --- a/redis_benchmarks_specification/__compare__/args.py +++ b/redis_benchmarks_specification/__compare__/args.py @@ -46,6 +46,30 @@ def create_compare_arguments(parser): default="", help="specify a test (or a comma separated list of tests) to use for comparison. If none is specified by default will use all of them.", ) + parser.add_argument( + "--extra-filters", + type=str, + default="", + help="specify extra filters to pass to baseline and comparison.", + ) + parser.add_argument( + "--use-test-suites-folder", + action="store_true", + default=False, + help="Use test names from YAML files in test-suites folder instead of database", + ) + parser.add_argument( + "--generate-boxplot", + action="store_true", + default=False, + help="Generate box plot showing performance change distribution per command", + ) + parser.add_argument( + "--boxplot-output", + type=str, + default="command_performance_boxplot.png", + help="Output filename for the box plot (supports .png, .svg, .pdf)", + ) parser.add_argument( "--defaults_filename", type=str, @@ -155,6 +179,20 @@ def create_compare_arguments(parser): parser.add_argument("--simple-table", type=bool, default=False) parser.add_argument("--use_metric_context_path", type=bool, default=False) parser.add_argument("--testname_regex", type=str, default=".*", required=False) + parser.add_argument( + "--command-group-regex", + type=str, + default=".*", + required=False, + help="Filter commands by command group using regex. Only commands belonging to matching groups will be included in boxplot and summary.", + ) + parser.add_argument( + "--command-regex", + type=str, + default=".*", + required=False, + help="Filter tests by command using regex. Only tests that include commands matching this regex will be processed.", + ) parser.add_argument( "--regression_str", type=str, default="REGRESSION", required=False ) diff --git a/redis_benchmarks_specification/__compare__/compare.py b/redis_benchmarks_specification/__compare__/compare.py index 3b3e624b..777666eb 100644 --- a/redis_benchmarks_specification/__compare__/compare.py +++ b/redis_benchmarks_specification/__compare__/compare.py @@ -15,10 +15,29 @@ import os from tqdm import tqdm import argparse +import numpy as np from io import StringIO import sys +# Import command categorization function +try: + from utils.summary import categorize_command +except ImportError: + # Fallback if utils.summary is not available + def categorize_command(command): + return "unknown" + + +# Optional matplotlib import for box plot generation +try: + import matplotlib.pyplot as plt + + MATPLOTLIB_AVAILABLE = True +except ImportError: + MATPLOTLIB_AVAILABLE = False + logging.warning("matplotlib not available, box plot generation will be disabled") + from redis_benchmarks_specification.__common__.github import ( update_comment_if_needed, create_new_pr_comment, @@ -355,6 +374,8 @@ def compare_command_logic(args, project_name, project_version): total_stable, total_unstable, total_comparison_points, + boxplot_data, + command_change, ) = compute_regression_table( rts, tf_github_org, @@ -399,6 +420,11 @@ def compare_command_logic(args, project_name, project_version): args.regression_str, args.improvement_str, tests_with_config, + args.use_test_suites_folder, + testsuites_folder, + args.extra_filters, + getattr(args, "command_group_regex", ".*"), + getattr(args, "command_regex", ".*"), ) total_regressions = len(regressions_list) total_improvements = len(improvements_list) @@ -432,7 +458,26 @@ def compare_command_logic(args, project_name, project_version): args.regressions_percent_lower_limit, regressions_list, improvements_list, + args.improvement_str, + args.regression_str, ) + + # Generate box plot if requested + if args.generate_boxplot and command_change: + if MATPLOTLIB_AVAILABLE: + logging.info(f"Generating box plot with {len(command_change)} commands...") + generate_command_performance_boxplot_from_command_data( + command_change, + args.boxplot_output, + args.regression_str, + args.improvement_str, + getattr(args, "command_group_regex", ".*"), + ) + else: + logging.error( + "Box plot generation requested but matplotlib is not available" + ) + return ( detected_regressions, "", @@ -474,6 +519,8 @@ def prepare_regression_comment( regressions_percent_lower_limit, regressions_list=[], improvements_list=[], + improvement_str="Improvement", + regression_str="Regression", ): if total_comparison_points > 0: comment_body = "### Automated performance analysis summary\n\n" @@ -513,21 +560,24 @@ def prepare_regression_comment( ) ) if total_improvements > 0: - comparison_summary += "- Detected a total of {} improvements above the improvement water line.\n".format( - total_improvements + comparison_summary += "- Detected a total of {} improvements above the improvement water line ({}).\n".format( + total_improvements, improvement_str ) if len(improvements_list) > 0: - regression_values = [l[1] for l in improvements_list] - regression_df = pd.DataFrame(regression_values) - median_regression = round(float(regression_df.median().iloc[0]), 1) - max_regression = round(float(regression_df.max().iloc[0]), 1) - min_regression = round(float(regression_df.min().iloc[0]), 1) + improvement_values = [l[1] for l in improvements_list] + improvement_df = pd.DataFrame(improvement_values) + median_improvement = round(float(improvement_df.median().iloc[0]), 1) + max_improvement = round(float(improvement_df.max().iloc[0]), 1) + min_improvement = round(float(improvement_df.min().iloc[0]), 1) + p25_improvement = round(float(improvement_df.quantile(0.25).iloc[0]), 1) + p75_improvement = round(float(improvement_df.quantile(0.75).iloc[0]), 1) - comparison_summary += f" - Median/Common-Case improvement was {median_regression}% and ranged from [{min_regression}%,{max_regression}%].\n" + comparison_summary += f" - The median improvement ({improvement_str}) was {median_improvement}%, with values ranging from {min_improvement}% to {max_improvement}%.\n" + comparison_summary += f" - Quartile distribution: P25={p25_improvement}%, P50={median_improvement}%, P75={p75_improvement}%.\n" if total_regressions > 0: - comparison_summary += "- Detected a total of {} regressions bellow the regression water line {}.\n".format( - total_regressions, regressions_percent_lower_limit + comparison_summary += "- Detected a total of {} regressions below the regression water line of {} ({}).\n".format( + total_regressions, regressions_percent_lower_limit, regression_str ) if len(regressions_list) > 0: regression_values = [l[1] for l in regressions_list] @@ -535,8 +585,11 @@ def prepare_regression_comment( median_regression = round(float(regression_df.median().iloc[0]), 1) max_regression = round(float(regression_df.max().iloc[0]), 1) min_regression = round(float(regression_df.min().iloc[0]), 1) + p25_regression = round(float(regression_df.quantile(0.25).iloc[0]), 1) + p75_regression = round(float(regression_df.quantile(0.75).iloc[0]), 1) - comparison_summary += f" - Median/Common-Case regression was {median_regression}% and ranged from [{min_regression}%,{max_regression}%].\n" + comparison_summary += f" - The median regression ({regression_str}) was {median_regression}%, with values ranging from {min_regression}% to {max_regression}%.\n" + comparison_summary += f" - Quartile distribution: P25={p25_regression}%, P50={median_regression}%, P75={p75_regression}%.\n" comment_body += comparison_summary comment_body += "\n" @@ -686,6 +739,11 @@ def compute_regression_table( regression_str="REGRESSION", improvement_str="IMPROVEMENT", tests_with_config={}, + use_test_suites_folder=False, + test_suites_folder=None, + extra_filters="", + command_group_regex=".*", + command_regex=".*", ): START_TIME_NOW_UTC, _, _ = get_start_time_vars() START_TIME_LAST_MONTH_UTC = START_TIME_NOW_UTC - datetime.timedelta(days=31) @@ -746,10 +804,18 @@ def compute_regression_table( if test != "": test_names = test.split(",") logging.info("Using test name {}".format(test_names)) + elif use_test_suites_folder: + test_names = get_test_names_from_yaml_files( + test_suites_folder, tags_regex_string + ) else: test_names = get_test_names_from_db( rts, tags_regex_string, test_names, used_key ) + + # Apply command regex filtering to tests_with_config + tests_with_config = filter_tests_by_command_regex(tests_with_config, command_regex) + ( detected_regressions, table_full, @@ -770,6 +836,7 @@ def compute_regression_table( no_datapoints_list, group_change, command_change, + boxplot_data, ) = from_rts_to_regression_table( baseline_deployment_name, comparison_deployment_name, @@ -803,6 +870,7 @@ def compute_regression_table( regression_str, improvement_str, tests_with_config, + extra_filters, ) logging.info( "Printing differential analysis between {} and {}".format( @@ -818,19 +886,40 @@ def compute_regression_table( ) table_output += "
\n By GROUP change csv:\n\n" - table_output += "\ncommand_group,min_change,max_change \n" + table_output += ( + "\ncommand_group,min_change,q1_change,median_change,q3_change,max_change \n" + ) for group_name, changes_list in group_change.items(): - max_change = max(changes_list) min_change = min(changes_list) - table_output += f"{group_name},{min_change:.3f},{max_change:.3f}\n" + q1_change = np.percentile(changes_list, 25) + median_change = np.median(changes_list) + q3_change = np.percentile(changes_list, 75) + max_change = max(changes_list) + table_output += f"{group_name},{min_change:.3f},{q1_change:.3f},{median_change:.3f},{q3_change:.3f},{max_change:.3f}\n" table_output += "\n
\n" table_output += "\n\n" table_output += "
\n By COMMAND change csv:\n\n" - table_output += "\ncommand,min_change,max_change \n" - for command_name, changes_list in command_change.items(): - max_change = max(changes_list) + table_output += ( + "\ncommand,min_change,q1_change,median_change,q3_change,max_change \n" + ) + + # Filter commands by command group regex if specified + filtered_command_change = command_change + if command_group_regex != ".*": + group_regex = re.compile(command_group_regex) + filtered_command_change = {} + for command_name, changes_list in command_change.items(): + command_group = categorize_command(command_name.lower()) + if re.search(group_regex, command_group): + filtered_command_change[command_name] = changes_list + + for command_name, changes_list in filtered_command_change.items(): min_change = min(changes_list) - table_output += f"{command_name},{min_change:.3f},{max_change:.3f}\n" + q1_change = np.percentile(changes_list, 25) + median_change = np.median(changes_list) + q3_change = np.percentile(changes_list, 75) + max_change = max(changes_list) + table_output += f"{command_name},{min_change:.3f},{q1_change:.3f},{median_change:.3f},{q3_change:.3f},{max_change:.3f}\n" table_output += "\n
\n" if total_unstable > 0: @@ -954,6 +1043,8 @@ def compute_regression_table( total_stable, total_unstable, total_comparison_points, + boxplot_data, + command_change, ) @@ -1067,11 +1158,11 @@ def get_by_strings( if comparison_hash is not None: # check if we had already covered comparison - if comparison_covered: - logging.error( - "--comparison-branch, --comparison-tag, --comparison-hash, --comparison-target-branch, and --comparison-target-table are mutually exclusive. Pick one..." - ) - exit(1) + # if comparison_covered: + # logging.error( + # "--comparison-branch, --comparison-tag, --comparison-hash, --comparison-target-branch, and --comparison-target-table are mutually exclusive. Pick one..." + # ) + # exit(1) comparison_covered = True by_str_comparison = "hash" comparison_str = comparison_hash @@ -1124,6 +1215,7 @@ def from_rts_to_regression_table( regression_str="REGRESSION", improvement_str="IMPROVEMENT", tests_with_config={}, + extra_filters="", ): print_all = print_regressions_only is False and print_improvements_only is False table_full = [] @@ -1150,6 +1242,9 @@ def from_rts_to_regression_table( group_change = {} command_change = {} original_metric_mode = metric_mode + + # Data collection for box plot + boxplot_data = [] for test_name in test_names: tested_groups = [] tested_commands = [] @@ -1176,6 +1271,8 @@ def from_rts_to_regression_table( "github_repo={}".format(baseline_github_repo), "triggering_env={}".format(tf_triggering_env_baseline), ] + if extra_filters != "": + filters_baseline.append(extra_filters) if baseline_str != "": filters_baseline.append("{}={}".format(by_str_baseline, baseline_str)) if baseline_deployment_name != "": @@ -1200,6 +1297,8 @@ def from_rts_to_regression_table( filters_comparison.append( "deployment_name={}".format(comparison_deployment_name) ) + if extra_filters != "": + filters_comparison.append(extra_filters) if comparison_github_org != "": filters_comparison.append(f"github_org={comparison_github_org}") if "hash" not in by_str_baseline: @@ -1362,10 +1461,18 @@ def from_rts_to_regression_table( unstable_list.append([test_name, "n/a"]) baseline_v_str = prepare_value_str( - baseline_pct_change, baseline_v, baseline_values, simplify_table + baseline_pct_change, + baseline_v, + baseline_values, + simplify_table, + metric_name, ) comparison_v_str = prepare_value_str( - comparison_pct_change, comparison_v, comparison_values, simplify_table + comparison_pct_change, + comparison_v, + comparison_values, + simplify_table, + metric_name, ) if metric_mode == "higher-better": @@ -1377,6 +1484,9 @@ def from_rts_to_regression_table( percentage_change = ( -(float(baseline_v) - float(comparison_v)) / float(baseline_v) ) * 100.0 + + # Collect data for box plot + boxplot_data.append((test_name, percentage_change)) else: logging.warn( f"Missing data for test {test_name}. baseline_v={baseline_v} (pct_change={baseline_pct_change}), comparison_v={comparison_v} (pct_change={comparison_pct_change}) " @@ -1540,6 +1650,7 @@ def from_rts_to_regression_table( no_datapoints_list, group_change, command_change, + boxplot_data, ) @@ -1571,13 +1682,47 @@ def check_multi_value_filter(baseline_str): return multi_value_baseline -def prepare_value_str(baseline_pct_change, baseline_v, baseline_values, simplify_table): - if baseline_v < 1.0: - baseline_v_str = " {:.2f}".format(baseline_v) - elif baseline_v < 10.0: - baseline_v_str = " {:.1f}".format(baseline_v) +def is_latency_metric(metric_name): + """Check if a metric represents latency and should use 3-digit precision""" + latency_indicators = [ + "latency", + "percentile", + "usec", + "msec", + "overallQuantiles", + "latencystats", + "p50", + "p95", + "p99", + "p999", + ] + metric_name_lower = metric_name.lower() + return any(indicator in metric_name_lower for indicator in latency_indicators) + + +def prepare_value_str( + baseline_pct_change, baseline_v, baseline_values, simplify_table, metric_name="" +): + """Prepare value string with appropriate precision based on metric type""" + # Use 3-digit precision for latency metrics + if is_latency_metric(metric_name): + if baseline_v < 1.0: + baseline_v_str = " {:.3f}".format(baseline_v) + elif baseline_v < 10.0: + baseline_v_str = " {:.3f}".format(baseline_v) + elif baseline_v < 100.0: + baseline_v_str = " {:.3f}".format(baseline_v) + else: + baseline_v_str = " {:.3f}".format(baseline_v) else: - baseline_v_str = " {:.0f}".format(baseline_v) + # Original formatting for non-latency metrics + if baseline_v < 1.0: + baseline_v_str = " {:.2f}".format(baseline_v) + elif baseline_v < 10.0: + baseline_v_str = " {:.1f}".format(baseline_v) + else: + baseline_v_str = " {:.0f}".format(baseline_v) + stamp_b = "" if baseline_pct_change > 10.0: stamp_b = "UNSTABLE " @@ -1620,6 +1765,444 @@ def get_test_names_from_db(rts, tags_regex_string, test_names, used_key): return test_names +def filter_tests_by_command_regex(tests_with_config, command_regex=".*"): + """Filter tests based on command regex matching tested-commands""" + if command_regex == ".*": + return tests_with_config + + logging.info(f"Filtering tests by command regex: {command_regex}") + command_regex_compiled = re.compile(command_regex, re.IGNORECASE) + filtered_tests = {} + + for test_name, test_config in tests_with_config.items(): + tested_commands = test_config.get("tested-commands", []) + + # Check if any tested command matches the regex + command_match = False + for command in tested_commands: + if re.search(command_regex_compiled, command): + command_match = True + logging.info(f"Including test {test_name} (matches command: {command})") + break + + if command_match: + filtered_tests[test_name] = test_config + else: + logging.info(f"Excluding test {test_name} (commands: {tested_commands})") + + logging.info( + f"Command regex filtering: {len(filtered_tests)} tests remaining out of {len(tests_with_config)}" + ) + return filtered_tests + + +def get_test_names_from_yaml_files(test_suites_folder, tags_regex_string): + """Get test names from YAML files in test-suites folder""" + from redis_benchmarks_specification.__common__.runner import get_benchmark_specs + + # Get all YAML files + yaml_files = get_benchmark_specs(test_suites_folder, test="", test_regex=".*") + + # Extract test names (remove path and .yml extension) + test_names = [] + for yaml_file in yaml_files: + test_name = os.path.basename(yaml_file).replace(".yml", "") + # Apply regex filtering like database version + match_obj = re.search(tags_regex_string, test_name) + if match_obj is not None: + test_names.append(test_name) + + test_names.sort() + logging.info( + "Based on test-suites folder ({}) we have {} comparison points: {}".format( + test_suites_folder, len(test_names), test_names + ) + ) + return test_names + + +def extract_command_from_test_name(test_name): + """Extract Redis command from test name""" + # Common patterns in test names + test_name_lower = test_name.lower() + + # Handle specific patterns + if "memtier_benchmark" in test_name_lower: + # Look for command patterns in memtier test names + for cmd in [ + "get", + "set", + "hget", + "hset", + "hgetall", + "hmset", + "hmget", + "hdel", + "hexists", + "hkeys", + "hvals", + "hincrby", + "hincrbyfloat", + "hsetnx", + "hscan", + "multi", + "exec", + ]: + if cmd in test_name_lower: + return cmd.upper() + + # Try to extract command from test name directly + parts = test_name.split("-") + for part in parts: + part_upper = part.upper() + # Check if it looks like a Redis command + if len(part_upper) >= 3 and part_upper.isalpha(): + return part_upper + + return "UNKNOWN" + + +def generate_command_performance_boxplot_from_command_data( + command_change, + output_filename, + regression_str="Regression", + improvement_str="Improvement", + command_group_regex=".*", +): + """Generate vertical box plot showing performance change distribution per command using command_change data""" + if not MATPLOTLIB_AVAILABLE: + logging.error("matplotlib not available, cannot generate box plot") + return + + try: + if not command_change: + logging.warning("No command data found for box plot generation") + return + + # Filter commands by command group regex + if command_group_regex != ".*": + logging.info( + f"Filtering commands by command group regex: {command_group_regex}" + ) + group_regex = re.compile(command_group_regex) + filtered_command_change = {} + + for cmd, changes in command_change.items(): + command_group = categorize_command(cmd.lower()) + if re.search(group_regex, command_group): + filtered_command_change[cmd] = changes + logging.info(f"Including command {cmd} (group: {command_group})") + else: + logging.info(f"Excluding command {cmd} (group: {command_group})") + + command_change = filtered_command_change + + if not command_change: + logging.warning( + f"No commands found matching command group regex: {command_group_regex}" + ) + return + + logging.info(f"After filtering: {len(command_change)} commands remaining") + + # Sort commands by median performance change for better visualization + commands_with_median = [ + (cmd, np.median(changes)) for cmd, changes in command_change.items() + ] + commands_with_median.sort(key=lambda x: x[1]) + commands = [cmd for cmd, _ in commands_with_median] + + # Prepare data for plotting (vertical orientation) + data_for_plot = [command_change[cmd] for cmd in commands] + + # Create labels with test count + labels_with_count = [ + f"{cmd}\n({len(command_change[cmd])} tests)" for cmd in commands + ] + + # Create the plot (vertical orientation) + plt.figure(figsize=(10, 16)) + + # Create horizontal box plot (which makes it vertical when we rotate) + positions = range(1, len(commands) + 1) + box_plot = plt.boxplot( + data_for_plot, + positions=positions, + patch_artist=True, + showfliers=True, + flierprops={"marker": "o", "markersize": 4}, + vert=False, + ) # vert=False makes it horizontal (commands on Y-axis) + + # Color the boxes and add value annotations + for i, (patch, cmd) in enumerate(zip(box_plot["boxes"], commands)): + changes = command_change[cmd] + median_change = np.median(changes) + min_change = min(changes) + max_change = max(changes) + + # Color based on median performance + if median_change > 0: + patch.set_facecolor("lightcoral") # Red for improvements + patch.set_alpha(0.7) + else: + patch.set_facecolor("lightblue") # Blue for degradations + patch.set_alpha(0.7) + + # Store values for later annotation (after xlim is set) + y_pos = i + 1 # Position corresponds to the box position + + # Store annotation data for after xlim is set + if not hasattr(plt, "_annotation_data"): + plt._annotation_data = [] + plt._annotation_data.append( + { + "y_pos": y_pos, + "min_change": min_change, + "median_change": median_change, + "max_change": max_change, + } + ) + + # Calculate optimal x-axis limits for maximum visibility + all_values = [] + for changes in command_change.values(): + all_values.extend(changes) + + if all_values: + data_min = min(all_values) + data_max = max(all_values) + + logging.info(f"Box plot data range: {data_min:.3f}% to {data_max:.3f}%") + + # Add minimal padding - tight to the data + data_range = data_max - data_min + if data_range == 0: + # If all values are the same, add minimal symmetric padding + padding = max(abs(data_min) * 0.05, 0.5) # At least 5% or 0.5 + x_min = data_min - padding + x_max = data_max + padding + else: + # Add minimal padding: 2% on each side + padding = data_range * 0.02 + x_min = data_min - padding + x_max = data_max + padding + + # Only include 0 if it's actually within or very close to the data range + if data_min <= 0 <= data_max: + # 0 is within the data range, keep current limits + pass + elif data_min > 0 and data_min < data_range * 0.1: + # All positive values, but 0 is very close - include it + x_min = 0 + elif data_max < 0 and abs(data_max) < data_range * 0.1: + # All negative values, but 0 is very close - include it + x_max = 0 + + plt.xlim(x_min, x_max) + logging.info(f"Box plot x-axis limits set to: {x_min:.3f}% to {x_max:.3f}%") + + # Add vertical line at 0% (only if 0 is visible) + current_xlim = plt.xlim() + if current_xlim[0] <= 0 <= current_xlim[1]: + plt.axvline(x=0, color="black", linestyle="-", linewidth=1, alpha=0.8) + + # Add background shading with current limits + x_min, x_max = plt.xlim() + if x_max > 0: + plt.axvspan(max(0, x_min), x_max, alpha=0.1, color="red") + if x_min < 0: + plt.axvspan(x_min, min(0, x_max), alpha=0.1, color="blue") + + # Add value annotations within the plot area + if hasattr(plt, "_annotation_data"): + x_range = x_max - x_min + for data in plt._annotation_data: + y_pos = data["y_pos"] + min_change = data["min_change"] + median_change = data["median_change"] + max_change = data["max_change"] + + # Position annotations inside the plot area + # Use the actual values' positions with small offsets + offset = x_range * 0.01 # Small offset for readability + + # Position each annotation near its corresponding value + plt.text( + max_change + offset, + y_pos + 0.15, + f"{max_change:.1f}%", + fontsize=7, + va="center", + ha="left", + color="darkred", + weight="bold", + bbox=dict( + boxstyle="round,pad=0.2", + facecolor="white", + alpha=0.8, + edgecolor="none", + ), + ) + plt.text( + median_change + offset, + y_pos, + f"{median_change:.1f}%", + fontsize=7, + va="center", + ha="left", + color="black", + weight="bold", + bbox=dict( + boxstyle="round,pad=0.2", + facecolor="yellow", + alpha=0.8, + edgecolor="none", + ), + ) + plt.text( + min_change + offset, + y_pos - 0.15, + f"{min_change:.1f}%", + fontsize=7, + va="center", + ha="left", + color="darkblue", + weight="bold", + bbox=dict( + boxstyle="round,pad=0.2", + facecolor="white", + alpha=0.8, + edgecolor="none", + ), + ) + + # Clean up the temporary data + delattr(plt, "_annotation_data") + + # Set Y-axis labels (commands) + plt.yticks(positions, labels_with_count, fontsize=10) + + # Customize the plot + title = f"Performance Change Distribution by Redis Command\nRedis is better ← | → Valkey is better" + plt.title(title, fontsize=14, fontweight="bold", pad=20) + plt.xlabel("Performance Change (%)", fontsize=12) + plt.ylabel("Redis Commands", fontsize=12) + plt.grid(True, alpha=0.3, axis="x") + + # Add legend for box colors (at the bottom) + from matplotlib.patches import Patch + + legend_elements = [ + Patch( + facecolor="lightcoral", alpha=0.7, label="Positive % = Valkey is better" + ), + Patch( + facecolor="lightblue", alpha=0.7, label="Negative % = Redis is better" + ), + ] + plt.legend( + handles=legend_elements, + bbox_to_anchor=(0.5, -0.05), + loc="upper center", + fontsize=10, + ncol=2, + ) + + # Add statistics text + total_commands = len(command_change) + total_measurements = sum(len(changes) for changes in command_change.values()) + plt.figtext( + 0.02, + 0.02, + f"Commands: {total_commands} | Total measurements: {total_measurements}", + fontsize=10, + style="italic", + ) + + # Adjust layout and save + plt.tight_layout() + plt.savefig(output_filename, dpi=300, bbox_inches="tight") + plt.close() + + logging.info(f"Box plot saved to {output_filename}") + + # Print summary statistics + logging.info("Command performance summary:") + for cmd in commands: + changes = command_change[cmd] + min_change = min(changes) + max_change = max(changes) + median_change = np.median(changes) + q1_change = np.percentile(changes, 25) + q3_change = np.percentile(changes, 75) + logging.info( + f" {cmd}: min={min_change:.3f}%, max={max_change:.3f}%, median={median_change:.3f}% ({len(changes)} measurements)" + ) + + # Print quartile summary for boxplot readiness + logging.info("Command performance quartile summary (boxplot ready):") + for cmd in commands: + changes = command_change[cmd] + min_change = min(changes) + q1_change = np.percentile(changes, 25) + median_change = np.median(changes) + q3_change = np.percentile(changes, 75) + max_change = max(changes) + logging.info( + f" {cmd}: min={min_change:.3f}%, Q1={q1_change:.3f}%, median={median_change:.3f}%, Q3={q3_change:.3f}%, max={max_change:.3f}%" + ) + + except Exception as e: + logging.error(f"Error generating box plot: {e}") + import traceback + + traceback.print_exc() + + +def generate_command_performance_boxplot(comparison_data, output_filename): + """Generate box plot showing performance change distribution per command""" + if not MATPLOTLIB_AVAILABLE: + logging.error("matplotlib not available, cannot generate box plot") + return + + try: + # Group data by command + command_data = {} + + for test_name, pct_change in comparison_data: + command = extract_command_from_test_name(test_name) + if command not in command_data: + command_data[command] = [] + command_data[command].append(pct_change) + + if not command_data: + logging.warning("No command data found for box plot generation") + return + + # Filter out commands with insufficient data + filtered_command_data = { + cmd: changes + for cmd, changes in command_data.items() + if len(changes) >= 1 and cmd != "UNKNOWN" + } + + if not filtered_command_data: + logging.warning("No valid command data found for box plot generation") + return + + # Use the new function with the filtered data + generate_command_performance_boxplot_from_command_data( + filtered_command_data, output_filename, command_group_regex=".*" + ) + + except Exception as e: + logging.error(f"Error generating box plot: {e}") + import traceback + + traceback.print_exc() + + def get_line( baseline_v_str, comparison_v_str, diff --git a/redis_benchmarks_specification/__runner__/args.py b/redis_benchmarks_specification/__runner__/args.py index 979931ed..930bfea1 100644 --- a/redis_benchmarks_specification/__runner__/args.py +++ b/redis_benchmarks_specification/__runner__/args.py @@ -83,6 +83,22 @@ def create_client_runner_args(project_name): default=".*", help="Interpret PATTERN as a regular expression to filter test names", ) + parser.add_argument( + "--commands-regex", + type=str, + default=".*", + help="Filter tests by command using regex. Only tests that include commands matching this regex will be processed (e.g., 'bitcount|bitpos').", + ) + parser.add_argument( + "-u", + "--uri", + type=str, + default=None, + help="Server URI on format redis://user:password@host:port/dbnum. " + "User, password and dbnum are optional. For authentication " + "without a username, use username 'default'. For TLS, use " + "the scheme 'rediss'. If provided, overrides individual host/port/password arguments.", + ) parser.add_argument("--db_server_host", type=str, default="localhost") parser.add_argument("--db_server_password", type=str, default=None) parser.add_argument("--db_server_port", type=int, default=6379) diff --git a/redis_benchmarks_specification/__runner__/runner.py b/redis_benchmarks_specification/__runner__/runner.py index 2b15590b..bba538f9 100644 --- a/redis_benchmarks_specification/__runner__/runner.py +++ b/redis_benchmarks_specification/__runner__/runner.py @@ -1,3 +1,6 @@ +# Import warning suppression first +from redis_benchmarks_specification.__common__.suppress_warnings import * + import datetime import json import logging @@ -12,27 +15,32 @@ from pathlib import Path import re import tqdm +from urllib.parse import urlparse import docker import redis from docker.models.containers import Container from pytablewriter import CsvTableWriter, MarkdownTableWriter from redisbench_admin.profilers.profilers_local import ( check_compatible_system_and_kernel_and_prepare_profile, - local_profilers_platform_checks, - profilers_start_if_required, - profilers_stop_if_required, ) from redisbench_admin.run.common import ( get_start_time_vars, merge_default_and_config_metrics, prepare_benchmark_parameters, - dbconfig_keyspacelen_check, ) from redis_benchmarks_specification.__common__.runner import ( export_redis_metrics, ) +from redisbench_admin.profilers.profilers_local import ( + local_profilers_platform_checks, + profilers_start_if_required, + profilers_stop_if_required, +) +from redisbench_admin.run.common import ( + dbconfig_keyspacelen_check, +) from redisbench_admin.run.metrics import extract_results_table from redisbench_admin.run.run import calculate_client_tool_duration_and_check from redisbench_admin.utils.benchmark_config import ( @@ -90,6 +98,166 @@ def signal_handler(signum, frame): sys.exit(1) +def parse_redis_uri(uri): + """ + Parse Redis URI and extract connection parameters. + + Args: + uri (str): Redis URI in format redis://user:password@host:port/dbnum + or rediss://user:password@host:port/dbnum for TLS + + Returns: + dict: Dictionary containing parsed connection parameters + """ + if not uri: + return {} + + try: + parsed = urlparse(uri) + + # Extract connection parameters + params = {} + + # Host (required) + if parsed.hostname: + params["host"] = parsed.hostname + + # Port (optional, defaults to 6379) + if parsed.port: + params["port"] = parsed.port + + # Username and password + if parsed.username: + params["username"] = parsed.username + if parsed.password: + params["password"] = parsed.password + + # Database number + if parsed.path and len(parsed.path) > 1: # path starts with '/' + try: + params["db"] = int(parsed.path[1:]) # Remove leading '/' + except ValueError: + logging.warning(f"Invalid database number in URI: {parsed.path[1:]}") + + # TLS detection + if parsed.scheme == "rediss": + params["tls_enabled"] = True + elif parsed.scheme == "redis": + params["tls_enabled"] = False + else: + logging.warning( + f"Unknown scheme in URI: {parsed.scheme}. Assuming non-TLS." + ) + params["tls_enabled"] = False + + logging.info( + f"Parsed Redis URI: host={params.get('host', 'N/A')}, " + f"port={params.get('port', 'N/A')}, " + f"username={params.get('username', 'N/A')}, " + f"db={params.get('db', 'N/A')}, " + f"tls={params.get('tls_enabled', False)}" + ) + + return params + + except Exception as e: + logging.error(f"Failed to parse Redis URI '{uri}': {e}") + return {} + + +def validate_benchmark_metrics( + results_dict, test_name, benchmark_config=None, default_metrics=None +): + """ + Validate benchmark metrics to ensure they contain reasonable values. + Fails the test if critical metrics indicate something is wrong. + + Args: + results_dict: Dictionary containing benchmark results + test_name: Name of the test being validated + benchmark_config: Benchmark configuration (unused, for compatibility) + default_metrics: Default metrics configuration (unused, for compatibility) + + Returns: + tuple: (is_valid, error_message) + """ + try: + # Define validation rules + throughput_patterns = [ + "ops/sec", + "qps", + "totals.ops/sec", + "all_stats.totals.ops/sec", + ] + + latency_patterns = ["p50", "p95", "p99", "p999", "percentile"] + + validation_errors = [] + + def check_nested_dict(data, path=""): + """Recursively check nested dictionary for metrics""" + if isinstance(data, dict): + for key, value in data.items(): + current_path = f"{path}.{key}" if path else key + check_nested_dict(value, current_path) + elif isinstance(data, (int, float)): + metric_path_lower = path.lower() + + # Skip Waits metrics as they can legitimately be 0 + if "waits" in metric_path_lower: + return + + # Skip general latency metrics that can legitimately be 0 + # Only validate specific percentile latencies (p50, p95, etc.) + if any( + pattern in metric_path_lower + for pattern in [ + "average latency", + "totals.latency", + "all_stats.totals.latency", + ] + ): + return + + # Check throughput metrics + for pattern in throughput_patterns: + if pattern in metric_path_lower: + if data <= 10: # Below 10 QPS threshold + validation_errors.append( + f"Throughput metric '{path}' has invalid value: {data} " + f"(below 10 QPS threshold)" + ) + break + + # Check latency metrics + for pattern in latency_patterns: + if pattern in metric_path_lower: + if data <= 0.0: # Invalid latency + validation_errors.append( + f"Latency metric '{path}' has invalid value: {data} " + f"(should be > 0.0)" + ) + break + + # Validate the results dictionary + check_nested_dict(results_dict) + + if validation_errors: + error_msg = f"Test {test_name} failed metric validation:\n" + "\n".join( + validation_errors + ) + logging.error(error_msg) + return False, error_msg + + logging.info(f"Test {test_name} passed metric validation") + return True, None + + except Exception as e: + logging.warning(f"Error during metric validation for test {test_name}: {e}") + # Don't fail the test if validation itself fails + return True, None + + def run_local_command_with_timeout(command_str, timeout_seconds, description="command"): """ Run a local command with timeout support. @@ -1316,14 +1484,15 @@ def delete_temporary_files( dry_run_include_preload = args.dry_run_include_preload defaults_filename = args.defaults_filename override_test_runs = args.override_test_runs - ( - _, - _, - default_metrics, - _, - _, - _, - ) = get_defaults(defaults_filename) + get_defaults_result = get_defaults(defaults_filename) + # Handle variable number of return values from get_defaults + if len(get_defaults_result) >= 3: + default_metrics = get_defaults_result[2] + else: + default_metrics = [] + logging.warning( + "get_defaults returned fewer values than expected, using empty default_metrics" + ) # For memory comparison mode, analyze datasets before starting if memory_comparison_only: @@ -1390,6 +1559,31 @@ def delete_temporary_files( logging.info(f"Exit requested by user. Skipping test {test_name}.") break + # Filter by command regex if specified + if hasattr(args, "commands_regex") and args.commands_regex != ".*": + if "tested-commands" in benchmark_config: + tested_commands = benchmark_config["tested-commands"] + command_regex_compiled = re.compile( + args.commands_regex, re.IGNORECASE + ) + command_match = False + for command in tested_commands: + if re.search(command_regex_compiled, command): + command_match = True + logging.info( + f"Including test {test_name} (matches command: {command})" + ) + break + if not command_match: + logging.info( + f"Skipping test {test_name} (commands: {tested_commands} do not match regex: {args.commands_regex})" + ) + continue + else: + logging.warning( + f"Test {test_name} does not contain 'tested-commands' property. Cannot filter by commands." + ) + if tls_enabled: test_name = test_name + "-tls" logging.info( @@ -1418,35 +1612,80 @@ def delete_temporary_files( build_variant_name = "NA" git_branch = None - port = args.db_server_port - host = args.db_server_host + # Parse URI if provided, otherwise use individual arguments + if hasattr(args, "uri") and args.uri: + uri_params = parse_redis_uri(args.uri) + port = uri_params.get("port", args.db_server_port) + host = uri_params.get("host", args.db_server_host) + password = uri_params.get("password", args.db_server_password) + # Override TLS setting from URI if specified + if "tls_enabled" in uri_params: + tls_enabled = uri_params["tls_enabled"] + if tls_enabled: + test_name = test_name + "-tls" + logging.info( + "TLS enabled via URI. Appending -tls to testname." + ) + # Note: username and db are handled by redis-py automatically when using URI + logging.info( + f"Using connection parameters from URI: host={host}, port={port}, tls={tls_enabled}" + ) + else: + port = args.db_server_port + host = args.db_server_host + password = args.db_server_password + logging.info( + f"Using individual connection arguments: host={host}, port={port}" + ) + unix_socket = args.unix_socket - password = args.db_server_password oss_cluster_api_enabled = args.cluster_mode ssl_cert_reqs = "required" if tls_skip_verify: ssl_cert_reqs = None - # Build Redis connection parameters - redis_params = { - "host": host, - "port": port, - "password": password, - "ssl": tls_enabled, - "ssl_cert_reqs": ssl_cert_reqs, - "ssl_check_hostname": False, - } - - # Only add SSL certificate parameters if they are provided - if tls_enabled: - if tls_key is not None and tls_key != "": - redis_params["ssl_keyfile"] = tls_key - if tls_cert is not None and tls_cert != "": - redis_params["ssl_certfile"] = tls_cert - if tls_cacert is not None and tls_cacert != "": - redis_params["ssl_ca_certs"] = tls_cacert + # Create Redis connection - use URI if provided, otherwise use individual parameters + if hasattr(args, "uri") and args.uri: + # Use URI connection (redis-py handles URI parsing automatically) + redis_params = {} + + # Only add SSL parameters if TLS is enabled + if tls_enabled: + redis_params["ssl_cert_reqs"] = ssl_cert_reqs + redis_params["ssl_check_hostname"] = False + if tls_key is not None and tls_key != "": + redis_params["ssl_keyfile"] = tls_key + if tls_cert is not None and tls_cert != "": + redis_params["ssl_certfile"] = tls_cert + if tls_cacert is not None and tls_cacert != "": + redis_params["ssl_ca_certs"] = tls_cacert + + r = redis.StrictRedis.from_url(args.uri, **redis_params) + logging.info(f"Connected to Redis using URI: {args.uri}") + else: + # Use individual connection parameters + redis_params = { + "host": host, + "port": port, + "password": password, + "ssl": tls_enabled, + "ssl_cert_reqs": ssl_cert_reqs, + "ssl_check_hostname": False, + } - r = redis.StrictRedis(**redis_params) + # Only add SSL certificate parameters if they are provided + if tls_enabled: + if tls_key is not None and tls_key != "": + redis_params["ssl_keyfile"] = tls_key + if tls_cert is not None and tls_cert != "": + redis_params["ssl_certfile"] = tls_cert + if tls_cacert is not None and tls_cacert != "": + redis_params["ssl_ca_certs"] = tls_cacert + + r = redis.StrictRedis(**redis_params) + logging.info( + f"Connected to Redis using individual parameters: {host}:{port}" + ) setup_name = "oss-standalone" r.ping() @@ -1703,7 +1942,7 @@ def delete_temporary_files( benchmark_tool_global=benchmark_tool_global, ) continue - logging.info( + logging.debug( "Test {} priority ({}) is within the priority limit [{},{}]".format( test_name, priority, @@ -2600,6 +2839,23 @@ def delete_temporary_files( "Using aggregated JSON results from multi-client execution" ) results_dict = json.loads(client_container_stdout) + + # Validate benchmark metrics + is_valid, validation_error = validate_benchmark_metrics( + results_dict, test_name, benchmark_config, default_metrics + ) + if not is_valid: + logging.error( + f"Test {test_name} failed metric validation: {validation_error}" + ) + test_result = False + delete_temporary_files( + temporary_dir_client=temporary_dir_client, + full_result_path=full_result_path, + benchmark_tool_global=benchmark_tool_global, + ) + continue + # Print results table for multi-client print_results_table_stdout( benchmark_config, @@ -2662,6 +2918,23 @@ def delete_temporary_files( "r", ) as json_file: results_dict = json.load(json_file) + + # Validate benchmark metrics + is_valid, validation_error = validate_benchmark_metrics( + results_dict, test_name, benchmark_config, default_metrics + ) + if not is_valid: + logging.error( + f"Test {test_name} failed metric validation: {validation_error}" + ) + test_result = False + delete_temporary_files( + temporary_dir_client=temporary_dir_client, + full_result_path=full_result_path, + benchmark_tool_global=benchmark_tool_global, + ) + continue + print_results_table_stdout( benchmark_config, default_metrics, diff --git a/redis_benchmarks_specification/__self_contained_coordinator__/args.py b/redis_benchmarks_specification/__self_contained_coordinator__/args.py index d85071ae..53a6a193 100644 --- a/redis_benchmarks_specification/__self_contained_coordinator__/args.py +++ b/redis_benchmarks_specification/__self_contained_coordinator__/args.py @@ -28,6 +28,12 @@ def create_self_contained_coordinator_args(project_name): description=project_name, formatter_class=argparse.ArgumentDefaultsHelpFormatter, ) + parser.add_argument( + "--version", + action="version", + version=project_name, + help="Show version information and exit", + ) parser.add_argument("--event_stream_host", type=str, default=GH_REDIS_SERVER_HOST) parser.add_argument("--event_stream_port", type=int, default=GH_REDIS_SERVER_PORT) parser.add_argument("--event_stream_pass", type=str, default=GH_REDIS_SERVER_AUTH) @@ -171,4 +177,34 @@ def create_self_contained_coordinator_args(project_name): default="", help="Filter tests to run only with the specified topology (e.g. oss-standalone)", ) + parser.add_argument( + "--exclusive-hardware", + default=False, + action="store_true", + help="Enable exclusive hardware mode. Kills all memtier processes and stops all docker containers before and after each test.", + ) + parser.add_argument( + "--http-port", + type=int, + default=8080, + help="Port for HTTP server endpoints (/ping health check and /reset-queue POST endpoint).", + ) + parser.add_argument( + "--http-auth-username", + type=str, + default=None, + help="Username for HTTP endpoint authentication. HTTP server is disabled if not provided.", + ) + parser.add_argument( + "--http-auth-password", + type=str, + default=None, + help="Password for HTTP endpoint authentication. HTTP server is disabled if not provided.", + ) + parser.add_argument( + "--skip-clear-pending-on-startup", + default=False, + action="store_true", + help="Skip automatically clearing pending messages and resetting consumer group position on startup. By default, pending messages are cleared and consumer group is reset to latest position to skip old work and recover from crashes.", + ) return parser diff --git a/redis_benchmarks_specification/__self_contained_coordinator__/runners.py b/redis_benchmarks_specification/__self_contained_coordinator__/runners.py index aee3094e..f6cfcd76 100644 --- a/redis_benchmarks_specification/__self_contained_coordinator__/runners.py +++ b/redis_benchmarks_specification/__self_contained_coordinator__/runners.py @@ -15,6 +15,12 @@ dbconfig_keyspacelen_check, ) +from redisbench_admin.profilers.profilers_local import ( + local_profilers_platform_checks, + profilers_start_if_required, + profilers_stop_if_required, +) + from redisbench_admin.profilers.profilers_local import ( local_profilers_platform_checks, profilers_start_if_required, @@ -38,6 +44,7 @@ from redis_benchmarks_specification.__common__.env import ( STREAM_KEYNAME_NEW_BUILD_EVENTS, + get_arch_specific_stream_name, STREAM_GH_NEW_BUILD_RUNNERS_CG, S3_BUCKET_NAME, ) @@ -71,12 +78,16 @@ ) -def build_runners_consumer_group_create(conn, running_platform, id="$"): +def build_runners_consumer_group_create(conn, running_platform, arch="amd64", id="$"): consumer_group_name = get_runners_consumer_group_name(running_platform) + arch_specific_stream = get_arch_specific_stream_name(arch) logging.info("Will use consumer group named {}.".format(consumer_group_name)) + logging.info( + "Will read from architecture-specific stream: {}.".format(arch_specific_stream) + ) try: conn.xgroup_create( - STREAM_KEYNAME_NEW_BUILD_EVENTS, + arch_specific_stream, consumer_group_name, mkstream=True, id=id, @@ -99,6 +110,80 @@ def get_runners_consumer_group_name(running_platform): return consumer_group_name +def clear_pending_messages_for_consumer( + conn, running_platform, consumer_pos, arch="amd64" +): + """Clear all pending messages for a specific consumer on startup""" + consumer_group_name = get_runners_consumer_group_name(running_platform) + consumer_name = "{}-self-contained-proc#{}".format( + consumer_group_name, consumer_pos + ) + arch_specific_stream = get_arch_specific_stream_name(arch) + logging.info( + f"Clearing pending messages from architecture-specific stream: {arch_specific_stream}" + ) + + try: + # Get pending messages for this specific consumer + pending_info = conn.xpending_range( + arch_specific_stream, + consumer_group_name, + min="-", + max="+", + count=1000, # Get up to 1000 pending messages + consumername=consumer_name, + ) + + if pending_info: + message_ids = [msg["message_id"] for msg in pending_info] + logging.info( + f"Found {len(message_ids)} pending messages for consumer {consumer_name}. Clearing them..." + ) + + # Acknowledge all pending messages to clear them + ack_count = conn.xack( + arch_specific_stream, consumer_group_name, *message_ids + ) + + logging.info( + f"Successfully cleared {ack_count} pending messages for consumer {consumer_name}" + ) + else: + logging.info(f"No pending messages found for consumer {consumer_name}") + + except redis.exceptions.ResponseError as e: + if "NOGROUP" in str(e): + logging.info(f"Consumer group {consumer_group_name} does not exist yet") + else: + logging.warning(f"Error clearing pending messages: {e}") + except Exception as e: + logging.error(f"Unexpected error clearing pending messages: {e}") + + +def reset_consumer_group_to_latest(conn, running_platform, arch="amd64"): + """Reset the consumer group position to only read new messages (skip old ones)""" + consumer_group_name = get_runners_consumer_group_name(running_platform) + arch_specific_stream = get_arch_specific_stream_name(arch) + logging.info( + f"Resetting consumer group position for architecture-specific stream: {arch_specific_stream}" + ) + + try: + # Set the consumer group position to '$' (latest) to skip all existing messages + conn.xgroup_setid(arch_specific_stream, consumer_group_name, id="$") + logging.info( + f"Reset consumer group {consumer_group_name} position to latest on stream {arch_specific_stream} - will only process new messages" + ) + + except redis.exceptions.ResponseError as e: + if "NOGROUP" in str(e): + logging.info(f"Consumer group {consumer_group_name} does not exist yet") + else: + logging.warning(f"Error resetting consumer group position: {e}") + except Exception as e: + logging.error(f"Unexpected error resetting consumer group position: {e}") + + def process_self_contained_coordinator_stream( conn, datasink_push_results_redistimeseries, @@ -615,6 +700,9 @@ def process_self_contained_coordinator_stream( metadata, build_variant_name, running_platform, + None, + None, + disable_target_tables=True, ) test_result = True total_test_suite_runs = total_test_suite_runs + 1 diff --git a/redis_benchmarks_specification/__self_contained_coordinator__/self_contained_coordinator.py b/redis_benchmarks_specification/__self_contained_coordinator__/self_contained_coordinator.py index 10958ab7..42d8dd43 100644 --- a/redis_benchmarks_specification/__self_contained_coordinator__/self_contained_coordinator.py +++ b/redis_benchmarks_specification/__self_contained_coordinator__/self_contained_coordinator.py @@ -1,9 +1,14 @@ +# Import warning suppression first +from redis_benchmarks_specification.__common__.suppress_warnings import * + import datetime import json import logging import pathlib import shutil +import subprocess import tempfile +import threading import traceback import re import docker @@ -13,6 +18,9 @@ from pathlib import Path import sys import time +import base64 +from http.server import HTTPServer, BaseHTTPRequestHandler +from urllib.parse import urlparse, parse_qs from docker.models.containers import Container from redis_benchmarks_specification.__self_contained_coordinator__.post_processing import ( @@ -59,6 +67,7 @@ from redis_benchmarks_specification.__runner__.runner import ( print_results_table_stdout, prepare_memtier_benchmark_parameters, + validate_benchmark_metrics, ) from redis_benchmarks_specification.__self_contained_coordinator__.args import ( create_self_contained_coordinator_args, @@ -66,6 +75,8 @@ from redis_benchmarks_specification.__self_contained_coordinator__.runners import ( build_runners_consumer_group_create, get_runners_consumer_group_name, + clear_pending_messages_for_consumer, + reset_consumer_group_to_latest, ) from redis_benchmarks_specification.__setups__.topologies import get_topologies @@ -91,6 +102,7 @@ from redis_benchmarks_specification.__common__.env import ( STREAM_KEYNAME_NEW_BUILD_EVENTS, + get_arch_specific_stream_name, S3_BUCKET_NAME, ) from redis_benchmarks_specification.__common__.spec import ( @@ -108,6 +120,376 @@ extract_build_info_from_streamdata, ) +# Global variables for HTTP server control +_reset_queue_requested = False +_exclusive_hardware = False +_http_auth_username = None +_http_auth_password = None +_flush_timestamp = None + + +class CoordinatorHTTPHandler(BaseHTTPRequestHandler): + """HTTP request handler for coordinator endpoints""" + + def log_message(self, format, *args): + """Override to use our logging system""" + logging.info(f"HTTP {format % args}") + + def _authenticate(self): + """Check if the request is authenticated""" + global _http_auth_username, _http_auth_password + + # Check for Authorization header + auth_header = self.headers.get("Authorization") + if not auth_header: + return False + + # Parse Basic auth + try: + if not auth_header.startswith("Basic "): + return False + + # Decode base64 credentials + encoded_credentials = auth_header[6:] # Remove 'Basic ' prefix + decoded_credentials = base64.b64decode(encoded_credentials).decode("utf-8") + username, password = decoded_credentials.split(":", 1) + + # Verify credentials + return username == _http_auth_username and password == _http_auth_password + + except Exception as e: + logging.warning(f"Authentication error: {e}") + return False + + def _send_auth_required(self): + """Send 401 Unauthorized response""" + self.send_response(401) + self.send_header( + "WWW-Authenticate", 'Basic realm="Redis Benchmarks Coordinator"' + ) + self.send_header("Content-type", "application/json") + self.end_headers() + response = { + "error": "Authentication required", + "message": "Please provide valid credentials using Basic authentication", + } + self.wfile.write(json.dumps(response).encode()) + + def do_GET(self): + """Handle GET requests""" + # Check authentication + if not self._authenticate(): + self._send_auth_required() + return + + parsed_path = urlparse(self.path) + + if parsed_path.path == "/ping": + self.send_response(200) + self.send_header("Content-type", "application/json") + self.end_headers() + response = { + "status": "healthy", + "timestamp": datetime.datetime.utcnow().isoformat(), + "service": "redis-benchmarks-self-contained-coordinator", + } + self.wfile.write(json.dumps(response).encode()) + + elif parsed_path.path == "/containers": + # Check for stuck containers + stuck_containers = self._check_stuck_containers() + + self.send_response(200) + self.send_header("Content-type", "application/json") + self.end_headers() + response = { + "status": "success", + "stuck_containers": stuck_containers, + "total_stuck": len(stuck_containers), + "timestamp": datetime.datetime.utcnow().isoformat(), + } + self.wfile.write(json.dumps(response).encode()) + + else: + self.send_response(404) + self.send_header("Content-type", "application/json") + self.end_headers() + self.wfile.write(json.dumps({"error": "Not found"}).encode()) + + def do_POST(self): + """Handle POST requests""" + # Check authentication + if not self._authenticate(): + self._send_auth_required() + return + + global _reset_queue_requested, _flush_timestamp + + parsed_path = urlparse(self.path) + + if parsed_path.path == "/reset-queue": + try: + # Read request body + content_length = int(self.headers.get("Content-Length", 0)) + if content_length > 0: + post_data = self.rfile.read(content_length) + try: + request_data = json.loads(post_data.decode()) + except json.JSONDecodeError: + request_data = {} + else: + request_data = {} + + # Set the reset flag + _reset_queue_requested = True + logging.info("Queue reset requested via HTTP endpoint") + + self.send_response(200) + self.send_header("Content-type", "application/json") + self.end_headers() + response = { + "status": "success", + "message": "Queue reset requested", + "timestamp": datetime.datetime.utcnow().isoformat(), + } + self.wfile.write(json.dumps(response).encode()) + + except Exception as e: + logging.error(f"Error handling reset-queue request: {e}") + self.send_response(500) + self.send_header("Content-type", "application/json") + self.end_headers() + self.wfile.write(json.dumps({"error": str(e)}).encode()) + + elif parsed_path.path == "/flush": + try: + # Read request body (optional) + content_length = int(self.headers.get("Content-Length", 0)) + if content_length > 0: + post_data = self.rfile.read(content_length) + try: + request_data = json.loads(post_data.decode()) + except json.JSONDecodeError: + request_data = {} + else: + request_data = {} + + # Record flush timestamp + flush_time = datetime.datetime.utcnow() + _flush_timestamp = flush_time + + logging.info( + "Flush requested via HTTP endpoint - stopping all containers and processes" + ) + + # Perform flush cleanup + self._perform_flush_cleanup() + + self.send_response(200) + self.send_header("Content-type", "application/json") + self.end_headers() + response = { + "status": "success", + "message": "Flush completed - all containers stopped and processes killed", + "flush_timestamp": flush_time.isoformat(), + "timestamp": datetime.datetime.utcnow().isoformat(), + } + self.wfile.write(json.dumps(response).encode()) + + except Exception as e: + logging.error(f"Error during flush operation: {e}") + self.send_response(500) + self.send_header("Content-type", "application/json") + self.end_headers() + response = { + "status": "error", + "message": f"Flush failed: {str(e)}", + "timestamp": datetime.datetime.utcnow().isoformat(), + } + self.wfile.write(json.dumps(response).encode()) + + else: + self.send_response(404) + self.send_header("Content-type", "application/json") + self.end_headers() + self.wfile.write(json.dumps({"error": "Not found"}).encode()) + + def _perform_flush_cleanup(self): + """Perform flush cleanup: stop all containers and kill memtier processes""" + import subprocess + + # Kill all memtier processes + try: + logging.info("Killing all memtier_benchmark processes") + result = subprocess.run( + ["pkill", "-f", "memtier_benchmark"], capture_output=True, text=True + ) + if result.returncode == 0: + logging.info("Successfully killed memtier_benchmark processes") + else: + logging.info("No memtier_benchmark processes found to kill") + + result = subprocess.run( + ["pkill", "-f", "memtier"], capture_output=True, text=True + ) + if result.returncode == 0: + logging.info("Successfully killed memtier processes") + else: + logging.info("No memtier processes found to kill") + except Exception as e: + logging.warning(f"Error killing memtier processes: {e}") + + # Stop all Docker containers with force if needed + try: + logging.info("Stopping all Docker containers") + client = docker.from_env() + containers = client.containers.list() + + if not containers: + logging.info("No running containers found") + return + + logging.info(f"Found {len(containers)} running containers") + + for container in containers: + try: + # Get container info + created_time = container.attrs["Created"] + uptime = ( + datetime.datetime.utcnow() + - datetime.datetime.fromisoformat( + created_time.replace("Z", "+00:00") + ) + ) + + logging.info( + f"Stopping container: {container.name} ({container.id[:12]}) - uptime: {uptime}" + ) + + # Try graceful stop first + container.stop(timeout=10) + logging.info(f"Successfully stopped container: {container.name}") + + except Exception as e: + logging.warning(f"Error stopping container {container.name}: {e}") + try: + # Force kill if graceful stop failed + logging.info(f"Force killing container: {container.name}") + container.kill() + logging.info( + f"Successfully force killed container: {container.name}" + ) + except Exception as e2: + logging.error( + f"Failed to force kill container {container.name}: {e2}" + ) + + except Exception as e: + logging.warning(f"Error accessing Docker client: {e}") + + logging.info("Flush cleanup completed") + + def _check_stuck_containers(self, max_hours=2): + """Check for containers running longer than max_hours and return info""" + try: + client = docker.from_env() + containers = client.containers.list() + stuck_containers = [] + + for container in containers: + try: + created_time = container.attrs["Created"] + uptime = ( + datetime.datetime.utcnow() + - datetime.datetime.fromisoformat( + created_time.replace("Z", "+00:00") + ) + ) + uptime_hours = uptime.total_seconds() / 3600 + + if uptime_hours > max_hours: + stuck_containers.append( + { + "name": container.name, + "id": container.id[:12], + "image": ( + container.image.tags[0] + if container.image.tags + else "unknown" + ), + "uptime_hours": round(uptime_hours, 2), + "status": container.status, + } + ) + except Exception as e: + logging.warning(f"Error checking container {container.name}: {e}") + + return stuck_containers + except Exception as e: + logging.warning(f"Error accessing Docker client: {e}") + return [] + + +def start_http_server(port=8080): + """Start the HTTP server in a separate thread""" + + def run_server(): + try: + server = HTTPServer(("0.0.0.0", port), CoordinatorHTTPHandler) + logging.info(f"Starting HTTP server on port {port}") + logging.info(f"Available endpoints:") + logging.info(f" GET /ping - Health check") + logging.info(f" GET /containers - Check for stuck containers") + logging.info( + f" POST /reset-queue - Reset pending streams and skip running tests" + ) + logging.info( + f" POST /flush - Stop all containers and processes, ignore work before flush time" + ) + server.serve_forever() + except Exception as e: + logging.error(f"HTTP server error: {e}") + + server_thread = threading.Thread(target=run_server, daemon=True) + server_thread.start() + return server_thread + + +def cleanup_system_processes(): + """Clean up memtier processes and docker containers for exclusive hardware mode""" + global _exclusive_hardware + + if not _exclusive_hardware: + return + + logging.info("Exclusive hardware mode: Cleaning up system processes") + + try: + # Kill all memtier_benchmark processes + logging.info("Killing all memtier_benchmark processes") + subprocess.run(["pkill", "-f", "memtier_benchmark"], check=False) + + # Stop all docker containers + logging.info("Stopping all docker containers") + docker_client = docker.from_env() + containers = docker_client.containers.list() + for container in containers: + try: + logging.info( + f"Stopping container: {container.name} ({container.id[:12]})" + ) + container.stop(timeout=10) + container.remove(force=True) + except Exception as e: + logging.warning(f"Error stopping container {container.id[:12]}: {e}") + + # Wait a moment for cleanup to complete + time.sleep(2) + logging.info("System cleanup completed") + + except Exception as e: + logging.error(f"Error during system cleanup: {e}") + def print_directory_logs(directory_path, description=""): """Print all .log files in a directory for debugging purposes.""" @@ -157,6 +539,8 @@ def print_directory_logs(directory_path, description=""): def main(): + global _exclusive_hardware, _http_auth_username, _http_auth_password + _, _, project_version = populate_with_poetry_data() project_name = "redis-benchmarks-spec runner(self-contained)" parser = create_self_contained_coordinator_args( @@ -164,6 +548,7 @@ def main(): ) args = parser.parse_args() + # Configure logging first, before any logging calls if args.logname is not None: print("Writting log to {}".format(args.logname)) logging.basicConfig( @@ -180,6 +565,23 @@ def main(): level=LOG_LEVEL, datefmt=LOG_DATEFMT, ) + + # Set global exclusive hardware flag + _exclusive_hardware = args.exclusive_hardware + if _exclusive_hardware: + logging.info("Exclusive hardware mode enabled") + + # Set HTTP authentication credentials and start server only if credentials are provided + _http_auth_username = args.http_auth_username + _http_auth_password = args.http_auth_password + + if _http_auth_username and _http_auth_password: + logging.info( + "Starting HTTP server with authentication on port {}".format(args.http_port) + ) + start_http_server(args.http_port) + else: + logging.info("HTTP server disabled - no authentication credentials provided") logging.info(get_version_string(project_name, project_version)) topologies_folder = os.path.abspath(args.setups_folder + "/topologies") logging.info("Using topologies folder dir {}".format(topologies_folder)) @@ -249,7 +651,23 @@ def main(): logging.info("checking build spec requirements") running_platform = args.platform_name - build_runners_consumer_group_create(gh_event_conn, running_platform) + build_runners_consumer_group_create(gh_event_conn, running_platform, args.arch) + + # Clear pending messages and reset consumer group position by default (unless explicitly skipped) + if not args.skip_clear_pending_on_startup: + consumer_pos = args.consumer_pos + logging.info( + "Clearing pending messages and resetting consumer group position on startup (default behavior)" + ) + clear_pending_messages_for_consumer( + gh_event_conn, running_platform, consumer_pos, args.arch + ) + reset_consumer_group_to_latest(gh_event_conn, running_platform, args.arch) + else: + logging.info( + "Skipping pending message cleanup and consumer group reset as requested" + ) + stream_id = None docker_client = docker.from_env() home = str(Path.home()) @@ -275,14 +693,15 @@ def main(): grafana_profile_dashboard = args.grafana_profile_dashboard defaults_filename = args.defaults_filename - ( - _, - _, - default_metrics, - _, - _, - _, - ) = get_defaults(defaults_filename) + get_defaults_result = get_defaults(defaults_filename) + # Handle variable number of return values from get_defaults + if len(get_defaults_result) >= 3: + default_metrics = get_defaults_result[2] + else: + default_metrics = [] + logging.warning( + "get_defaults returned fewer values than expected, using empty default_metrics" + ) # Consumer id consumer_pos = args.consumer_pos @@ -401,10 +820,15 @@ def self_contained_coordinator_blocking_read( get_runners_consumer_group_name(platform_name), consumer_name ) ) + # Use architecture-specific stream + arch_specific_stream = get_arch_specific_stream_name(arch) + logging.info( + f"Reading work from architecture-specific stream: {arch_specific_stream}" + ) newTestInfo = github_event_conn.xreadgroup( get_runners_consumer_group_name(platform_name), consumer_name, - {STREAM_KEYNAME_NEW_BUILD_EVENTS: stream_id}, + {arch_specific_stream: stream_id}, count=1, block=0, ) @@ -454,26 +878,35 @@ def __init__(self): ) num_process_streams = num_process_streams + 1 num_process_test_suites = num_process_test_suites + total_test_suite_runs - if overall_result is True: - ack_reply = github_event_conn.xack( - STREAM_KEYNAME_NEW_BUILD_EVENTS, - get_runners_consumer_group_name(platform_name), - stream_id, - ) - if type(ack_reply) == bytes: - ack_reply = ack_reply.decode() - if ack_reply == "1" or ack_reply == 1: + + # Always acknowledge the message, even if it was filtered out + arch_specific_stream = get_arch_specific_stream_name(arch) + ack_reply = github_event_conn.xack( + arch_specific_stream, + get_runners_consumer_group_name(platform_name), + stream_id, + ) + if type(ack_reply) == bytes: + ack_reply = ack_reply.decode() + if ack_reply == "1" or ack_reply == 1: + if overall_result is True: logging.info( - "Sucessfully acknowledge BENCHMARK variation stream with id {}.".format( + "Successfully acknowledged BENCHMARK variation stream with id {} (processed).".format( stream_id ) ) else: - logging.error( - "Unable to acknowledge build variation stream with id {}. XACK reply {}".format( - stream_id, ack_reply + logging.info( + "Successfully acknowledged BENCHMARK variation stream with id {} (filtered/skipped).".format( + stream_id ) ) + else: + logging.error( + "Unable to acknowledge build variation stream with id {}. XACK reply {}".format( + stream_id, ack_reply + ) + ) return overall_result, stream_id, num_process_streams, num_process_test_suites @@ -569,6 +1002,23 @@ def process_self_contained_coordinator_stream( git_timestamp_ms, run_arch, ) = extract_build_info_from_streamdata(testDetails) + + # Check if this work should be ignored due to flush + global _flush_timestamp + if ( + _flush_timestamp is not None + and use_git_timestamp + and git_timestamp_ms is not None + ): + # Convert flush timestamp to milliseconds for comparison + flush_timestamp_ms = int(_flush_timestamp.timestamp() * 1000) + if git_timestamp_ms < flush_timestamp_ms: + logging.info( + f"Ignoring work with git_timestamp_ms {git_timestamp_ms} " + f"(before flush timestamp {flush_timestamp_ms}). Stream id: {stream_id}" + ) + return stream_id, False, 0 + tf_github_org = default_github_org if b"github_org" in testDetails: tf_github_org = testDetails[b"github_org"].decode() @@ -674,14 +1124,25 @@ def process_self_contained_coordinator_stream( f"detected a command groups regexp definition on the streamdata {command_groups_regexp}" ) + command_regexp = None + if b"command_regexp" in testDetails: + command_regexp = testDetails[b"command_regexp"].decode() + logging.info( + f"detected a command regexp definition on the streamdata {command_regexp}" + ) + skip_test = False if b"platform" in testDetails: platform = testDetails[b"platform"] - if running_platform != platform: + # Decode bytes to string for proper comparison + platform_str = ( + platform.decode() if isinstance(platform, bytes) else platform + ) + if running_platform != platform_str: skip_test = True logging.info( "skipping stream_id {} given plaform {}!={}".format( - stream_id, running_platform, platform + stream_id, running_platform, platform_str ) ) @@ -729,8 +1190,17 @@ def process_self_contained_coordinator_stream( tests_regexp, testsuite_spec_files, command_groups_regexp, + command_regexp, ) + logging.info( + f"Adding {len(filtered_test_files)} tests to pending test list" + ) + + # Use pipeline for efficient bulk operations + pipeline = github_event_conn.pipeline() + test_names_added = [] + for test_file in filtered_test_files: with open(test_file, "r") as stream: ( @@ -738,14 +1208,20 @@ def process_self_contained_coordinator_stream( benchmark_config, test_name, ) = get_final_benchmark_config(None, None, stream, "") - github_event_conn.lpush(stream_test_list_pending, test_name) - github_event_conn.expire( - stream_test_list_pending, REDIS_BINS_EXPIRE_SECS - ) - logging.info( - f"Added test named {test_name} to the pending test list in key {stream_test_list_pending}" + pipeline.lpush(stream_test_list_pending, test_name) + test_names_added.append(test_name) + logging.debug( + f"Queued test named {test_name} for addition to pending test list" ) + # Set expiration and execute pipeline + pipeline.expire(stream_test_list_pending, REDIS_BINS_EXPIRE_SECS) + pipeline.execute() + + logging.info( + f"Successfully added {len(test_names_added)} tests to pending test list in key {stream_test_list_pending}" + ) + pending_tests = len(filtered_test_files) failed_tests = 0 benchmark_suite_start_datetime = datetime.datetime.utcnow() @@ -773,6 +1249,22 @@ def process_self_contained_coordinator_stream( ) for test_file in filtered_test_files: + # Check if queue reset was requested + global _reset_queue_requested + if _reset_queue_requested: + logging.info( + "Queue reset requested. Skipping remaining tests and clearing queues." + ) + # Clear all pending tests from the queue + github_event_conn.delete(stream_test_list_pending) + github_event_conn.delete(stream_test_list_running) + logging.info("Cleared pending and running test queues") + _reset_queue_requested = False + break + + # Clean up system processes if in exclusive hardware mode + cleanup_system_processes() + redis_containers = [] client_containers = [] with open(test_file, "r") as stream: @@ -786,8 +1278,8 @@ def process_self_contained_coordinator_stream( github_event_conn.expire( stream_test_list_running, REDIS_BINS_EXPIRE_SECS ) - logging.info( - f"Added test named {test_name} to the pending test list in key {stream_test_list_running}" + logging.debug( + f"Added test named {test_name} to the running test list in key {stream_test_list_running}" ) ( _, @@ -1086,25 +1578,111 @@ def process_self_contained_coordinator_stream( ) # run the benchmark benchmark_start_time = datetime.datetime.now() + + # Calculate container timeout + container_timeout = 300 # 5 minutes default + buffer_timeout = 60 # Default buffer + + # Try to extract test time from command and add buffer + import re + + test_time_match = re.search( + r"--?test-time[=\s]+(\d+)", benchmark_command_str + ) + if test_time_match: + test_time = int(test_time_match.group(1)) + container_timeout = test_time + buffer_timeout + logging.info( + f"Set container timeout to {container_timeout}s (test-time: {test_time}s + {buffer_timeout}s buffer)" + ) + else: + logging.info( + f"Using default container timeout: {container_timeout}s" + ) + try: - client_container_stdout = ( - docker_client.containers.run( - image=client_container_image, - volumes={ - temporary_dir_client: { - "bind": client_mnt_point, - "mode": "rw", - }, + # Start container with detach=True to enable timeout handling + container = docker_client.containers.run( + image=client_container_image, + volumes={ + temporary_dir_client: { + "bind": client_mnt_point, + "mode": "rw", }, - auto_remove=True, - privileged=True, - working_dir=benchmark_tool_workdir, - command=benchmark_command_str, - network_mode="host", - detach=False, - cpuset_cpus=client_cpuset_cpus, - ) + }, + auto_remove=False, # Don't auto-remove so we can get logs if timeout + privileged=True, + working_dir=benchmark_tool_workdir, + command=benchmark_command_str, + network_mode="host", + detach=True, # Detach to enable timeout + cpuset_cpus=client_cpuset_cpus, + ) + + logging.info( + f"Started container {container.name} ({container.id[:12]}) with {container_timeout}s timeout" ) + + # Wait for container with timeout + try: + result = container.wait( + timeout=container_timeout + ) + client_container_stdout = container.logs( + stdout=True, stderr=False + ).decode("utf-8") + container_stderr = container.logs( + stdout=False, stderr=True + ).decode("utf-8") + + # Check exit code + if result["StatusCode"] != 0: + logging.error( + f"Container exited with code {result['StatusCode']}" + ) + logging.error( + f"Container stderr: {container_stderr}" + ) + raise docker.errors.ContainerError( + container, + result["StatusCode"], + benchmark_command_str, + client_container_stdout, + container_stderr, + ) + + logging.info( + f"Container {container.name} completed successfully" + ) + + except Exception as timeout_error: + if "timeout" in str(timeout_error).lower(): + logging.error( + f"Container {container.name} timed out after {container_timeout}s" + ) + # Get logs before killing + try: + timeout_logs = container.logs( + stdout=True, stderr=True + ).decode("utf-8") + logging.error( + f"Container logs before timeout: {timeout_logs}" + ) + except: + pass + # Kill the container + container.kill() + raise Exception( + f"Container timed out after {container_timeout} seconds" + ) + else: + raise timeout_error + finally: + # Clean up container + try: + container.remove(force=True) + except: + pass except docker.errors.ContainerError as e: logging.info( "stdout: {}".format( @@ -1242,6 +1820,23 @@ def process_self_contained_coordinator_stream( results_dict = post_process_vector_db( temporary_dir_client ) + + # Validate benchmark metrics for vector-db-benchmark + is_valid, validation_error = ( + validate_benchmark_metrics( + results_dict, + test_name, + benchmark_config, + default_metrics, + ) + ) + if not is_valid: + logging.error( + f"Test {test_name} failed metric validation: {validation_error}" + ) + test_result = False + failed_tests += 1 + continue else: post_process_benchmark_results( benchmark_tool, @@ -1268,6 +1863,24 @@ def process_self_contained_coordinator_stream( "r", ) as json_file: results_dict = json.load(json_file) + + # Validate benchmark metrics + is_valid, validation_error = ( + validate_benchmark_metrics( + results_dict, + test_name, + benchmark_config, + default_metrics, + ) + ) + if not is_valid: + logging.error( + f"Test {test_name} failed metric validation: {validation_error}" + ) + test_result = False + failed_tests += 1 + continue + print_results_table_stdout( benchmark_config, default_metrics, @@ -1422,6 +2035,9 @@ def process_self_contained_coordinator_stream( overall_result &= test_result + # Clean up system processes after test completion if in exclusive hardware mode + cleanup_system_processes() + github_event_conn.lrem(stream_test_list_running, 1, test_name) github_event_conn.lpush(stream_test_list_completed, test_name) github_event_conn.expire( @@ -1584,7 +2200,7 @@ def process_self_contained_coordinator_stream( e.__str__() ) ) - logging.info( + logging.debug( f"Added test named {test_name} to the completed test list in key {stream_test_list_completed}" ) else: @@ -1657,6 +2273,7 @@ def filter_test_files( tests_regexp, testsuite_spec_files, command_groups_regexp=None, + command_regexp=None, ): filtered_test_files = [] for test_file in testsuite_spec_files: @@ -1693,14 +2310,14 @@ def filter_test_files( continue if command_groups_regexp is not None: - logging.info( + logging.debug( "Filtering all test command groups via a regular expression: {}".format( command_groups_regexp ) ) if "tested-groups" in benchmark_config: command_groups = benchmark_config["tested-groups"] - logging.info( + logging.debug( f"The file {test_file} (test name = {test_name}) contains the following groups: {command_groups}" ) groups_regex_string = re.compile(command_groups_regexp) @@ -1709,17 +2326,40 @@ def filter_test_files( match_obj = re.search(groups_regex_string, command_group) if match_obj is not None: found = True - logging.info(f"found the command group {command_group}") + logging.debug(f"found the command group {command_group}") if found is False: logging.info( f"Skipping {test_file} given the following groups: {command_groups} does not match command group regex {command_groups_regexp}" ) continue else: - logging.warning( + logging.debug( f"The file {test_file} (test name = {test_name}) does not contain the property 'tested-groups'. Cannot filter based uppon groups..." ) + # Filter by command regex if specified + if command_regexp is not None and command_regexp != ".*": + if "tested-commands" in benchmark_config: + tested_commands = benchmark_config["tested-commands"] + command_regex_compiled = re.compile(command_regexp, re.IGNORECASE) + found = False + for command in tested_commands: + if re.search(command_regex_compiled, command): + found = True + logging.info( + f"found the command {command} matching regex {command_regexp}" + ) + break + if found is False: + logging.info( + f"Skipping {test_file} given the following commands: {tested_commands} does not match command regex {command_regexp}" + ) + continue + else: + logging.warning( + f"The file {test_file} (test name = {test_name}) does not contain the property 'tested-commands'. Cannot filter based upon commands..." + ) + if "priority" in benchmark_config: priority = benchmark_config["priority"] @@ -1806,22 +2446,92 @@ def data_prepopulation_step( # run the benchmark preload_start_time = datetime.datetime.now() - client_container_stdout = docker_client.containers.run( - image=preload_image, - volumes={ - temporary_dir: { - "bind": client_mnt_point, - "mode": "rw", + # Set preload timeout (preload can take longer than benchmarks) + preload_timeout = 1800 # 30 minutes default for data loading + logging.info(f"Starting preload container with {preload_timeout}s timeout") + + try: + # Start container with detach=True to enable timeout handling + container = docker_client.containers.run( + image=preload_image, + volumes={ + temporary_dir: { + "bind": client_mnt_point, + "mode": "rw", + }, }, - }, - auto_remove=True, - privileged=True, - working_dir=benchmark_tool_workdir, - command=preload_command_str, - network_mode="host", - detach=False, - cpuset_cpus=client_cpuset_cpus, - ) + auto_remove=False, # Don't auto-remove so we can get logs if timeout + privileged=True, + working_dir=benchmark_tool_workdir, + command=preload_command_str, + network_mode="host", + detach=True, # Detach to enable timeout + cpuset_cpus=client_cpuset_cpus, + ) + + logging.info( + f"Started preload container {container.name} ({container.id[:12]}) with {preload_timeout}s timeout" + ) + + # Wait for container with timeout + try: + result = container.wait(timeout=preload_timeout) + client_container_stdout = container.logs( + stdout=True, stderr=False + ).decode("utf-8") + container_stderr = container.logs(stdout=False, stderr=True).decode( + "utf-8" + ) + + # Check exit code + if result["StatusCode"] != 0: + logging.error( + f"Preload container exited with code {result['StatusCode']}" + ) + logging.error(f"Preload container stderr: {container_stderr}") + raise docker.errors.ContainerError( + container, + result["StatusCode"], + preload_command_str, + client_container_stdout, + container_stderr, + ) + + logging.info( + f"Preload container {container.name} completed successfully" + ) + + except Exception as timeout_error: + if "timeout" in str(timeout_error).lower(): + logging.error( + f"Preload container {container.name} timed out after {preload_timeout}s" + ) + # Get logs before killing + try: + timeout_logs = container.logs(stdout=True, stderr=True).decode( + "utf-8" + ) + logging.error( + f"Preload container logs before timeout: {timeout_logs}" + ) + except: + pass + # Kill the container + container.kill() + raise Exception( + f"Preload container timed out after {preload_timeout} seconds" + ) + else: + raise timeout_error + finally: + # Clean up container + try: + container.remove(force=True) + except: + pass + except Exception as e: + logging.error(f"Preload container failed: {e}") + raise e preload_end_time = datetime.datetime.now() preload_duration_seconds = calculate_client_tool_duration_and_check( diff --git a/redis_benchmarks_specification/test-suites/memtier_benchmark-1Mkeys-string-setget200c-1KiB-pipeline-10.yml b/redis_benchmarks_specification/test-suites/memtier_benchmark-1Mkeys-string-setget200c-1KiB-pipeline-10.yml new file mode 100644 index 00000000..7f4b59f2 --- /dev/null +++ b/redis_benchmarks_specification/test-suites/memtier_benchmark-1Mkeys-string-setget200c-1KiB-pipeline-10.yml @@ -0,0 +1,40 @@ +version: 0.4 +name: memtier_benchmark-1Mkeys-string-setget200c-1KiB-pipeline-10 +description: Runs memtier_benchmark, for a keyspace of 1M keys with 10% SETs and 90% + GETs (mixed) with a data size of 1000 Bytes and pipeline 10. +dbconfig: + configuration-parameters: + save: '""' + check: + keyspacelen: 1000000 + preload_tool: + run_image: redislabs/memtier_benchmark:edge + tool: memtier_benchmark + arguments: --data-size 1000 --pipeline 50 -n allkeys --ratio 1:0 --key-pattern + P:P -c 1 -t 4 --hide-histogram --key-minimum 1 --key-maximum 1000000 + resources: + requests: + memory: 1g + dataset_name: 1Mkeys-string-1KiB-size + dataset_description: This dataset contains 1 million string keys, each with a data + size of 1 KiB. +tested-commands: +- set +- get +tested-groups: +- string +redis-topologies: +- oss-standalone +build-variants: +- gcc:15.2.0-amd64-debian-bookworm-default +- gcc:15.2.0-arm64-debian-bookworm-default +- dockerhub +clientconfig: + run_image: redislabs/memtier_benchmark:edge + tool: memtier_benchmark + arguments: '"--data-size" "1000" --ratio 1:10 --key-pattern R:R --key-minimum=1 --key-maximum 1000000 --test-time 180 -c 50 -t 4 --hide-histogram --pipeline 10' + resources: + requests: + cpus: '4' + memory: 2g +priority: 1 diff --git a/redis_benchmarks_specification/test-suites/memtier_benchmark-playbook-leaderboard.yml b/redis_benchmarks_specification/test-suites/memtier_benchmark-playbook-leaderboard.yml new file mode 100644 index 00000000..3edde918 --- /dev/null +++ b/redis_benchmarks_specification/test-suites/memtier_benchmark-playbook-leaderboard.yml @@ -0,0 +1,92 @@ +version: 0.4 +name: memtier_benchmark-playbook-leaderboard-top-100 +description: Runs memtier_benchmark, for a keyspace length of 1M keys loading/querying ZSETs. Esports/live events with constant score changes, occasional bursts of reads. writes ≈ 60, reads ≈ 40%. + with encoding:listpack with 100 elements. +dbconfig: + configuration-parameters: + save: '""' + check: + keyspacelen: 0 + resources: + requests: + memory: 1g +tested-commands: +- zadd +redis-topologies: +- oss-standalone +build-variants: +- gcc:15.2.0-amd64-debian-bookworm-default +- gcc:15.2.0-arm64-debian-bookworm-default +- dockerhub +clientconfig: + run_image: redislabs/memtier_benchmark:edge + tool: memtier_benchmark + arguments: --test-time 180 -c 50 -t 4 --command "ZADD __key__ 0.652540306855235 + UaZtxmrKBkxhBqJOlNJlwVUfHVALGkjnUemvubDvbGSVAaaQkXLlsg 0.8731899671198792 RKlVyoHMTvZcoh + 0.0785627468533846 VZWJQlqLBHicktajowIvkyuaddTC 0.7688563664469605 YaXTxnRGWhENdaOFuXNAJmx + 0.0033318113277969186 JjfFkvonkksjIfHS 0.49606648747577575 Q 0.8234766164292862 + MZgqGWzeYWDAlplbxvlzllKR 0.42687597903639085 AGSAuhePDVmONKmViHovKsiIGSXQZdqX + 0.34912978268081996 ZHIKgOWejxTOcrVCRxztCNqtVFEdqCbowiaROZfbhMmzFlr 0.15838599188422475 + UuyeNtxYcAmkfjtovBmajhGcYvzucpgFjLnqKIw 0.4376432899068222 cfgHnJGQAHaXHjLIuLWYkDACbxQbywRIQMbuE + 0.5536939917085721 uyZtiyKNdKhFhgNlcdtnHgDFEdBoovVYkyLIpREln 0.403546330051068 + kRUDWaPmeYXeJZhgqKpnroFxUSCQ 0.8808108541089766 PCGgHbGIRxpzFRfADhupCtBKhknUNyqK + 0.23044636914343264 OaHUUrbZEvSOmYOtbUzxnJq 0.6404180580960533 qJabjKLYYivniNNtGsbnBzRLzYP + 0.6907030016224224 KJrGsIeQESEPkBdTgUhrQuAUzEfySPhVBoNaWDjOawkRrjyktuOMuBcwp 0.24900135884008867 + bwQzFiMXWxSKtYJiFCSaDWxoAEoNGvFsMoAugjlIcWTRWhXyixemQnagvqL 0.7928656841898908 + NKtwuzbLhLCCPupqgkRsRdGijHmSrnqwdfSj 0.9331302297178864 TcZibcjyBLAbljRGHjFgwVeVjbeZwi + 0.640319798434186 nsmIvzevkqEJkJTiybCjqiXRjJVMqSqsSaHKxTzJmygg 0.5239886233297175 + DZGleQLgYgAjWUcCOhzqsi 0.6136266546940706 ZVlOgBjrpZhQuQzJYVLYqnxytasTT 0.6591139297465682 + AFgEEoLcwlGmoWeYLZZJnlilPagiaWdqhItaEMLovOfbaPAgtCeeyjCuEV 0.4468461455464968 + pAjBRwToyaDwhSExhMreHmbgzPuJFPlpVrBbjhkyogmUAQamL 0.5614231865616031 KyArAdlPTTgxCatXdDQIPgZWHCBZicfkZjCKCbuAuvA + 0.022487789155224203 qOuBflYpFEUWKIfvrIuZYEwJYtSIwSlvegDBDYCUTKnBoRWpwDuBAHgYbTwURCW + 0.8510804209364501 QcjxLOamZbsRzGPlmSvuvBpYmDaooLoY 0.43460695876638156 WcTDiiDHQwNoubgkihXThvM + 0.5610492156901229 XPGwnyfBmCnhkPlJxhZhPrjvXoGKWEWbqrFvZauVZGQ 0.7513407089150304 + MTXpbeTMlXoxUsCDccRYHeKKfYunLBZuHbLgJmNStiiALTm 0.09175823221394674 vkAxXiwoCWTBUFUVtpBfpAlILPGaMvUrFaRuBqEgqvWnISaZaPTxiklQtzGS + 0.8586634748104288 wyThKKWAeRg 0.36952587840155204 cRkudmpGSSMUnPOMhPSzjQ 0.538356756374977 + TsRYebuuDgtamrxVbrxOaCBUGWEaGheHKWgzWsgPxVBfLwAev 0.5218317459277027 N 0.658938125456635 + mAI 0.8474868095237909 JrYvYRtaFZldysTlMUVfZfVpHrSzayZGWysedghOV 0.20143282352735015 + WjaRWpmPlWwPuepbXywfDmqIEPWnBh 0.23967093347793234 ZCLtDpzxZHqWUMzAJOfavhqZrGrPiGZQSBYDPqfyqqYAbGIHuyskvLk + 0.8108325344648399 WhxmpALjwYdCOMwTeHRbFRSFfktMmQUVItotZiICjWfwebl 0.1322111892750637 + dfVVGlvVTMufbfSsy 0.2285660170875491 DBqEkifRzIyvzTydaSqNrqalAXBZAkccYOuIBFEn + 0.9396586731821924 NyIOQsgOAQevXYKYhGZXjVzTeqPiwjdJhtq 0.32374056012496966 TQYLPdswKkUntJEjvGWJTWxmVJGdBcbIclOrYzVqqtfFznolqcHYTVj + 0.9080986140709963 IdUdmeLZNHLsCGaVkEdfKMemYRdzhQUqvhuQdXnYchGWXyYRAYZQWkb 0.3129458198716534 + LJFPbCuN 0.39349745859330765 YTPdFVszdCVVgGGHKqoQxfjU 0.14704886640549086 fOFwwDNEsxjfpkUrwd + 0.9168814654086035 rJSASiFxeYIINRbbgTBqPbATrcKOP 0.3100306836090321 VKKOyHveatcnmQR + 0.2370937718635434 OCaIAjJkxSLYaWpMAVjmQoOXoZJMcAO 0.4941647120371836 neeyLswlmLnZXSCDtFx + 0.269336828778751 UuIrk 0.12151325035284255 FmCgjvkoPmAcscWZixkzsFgsQBGAmDHqFSTaKcUdfxlfYWu + 0.0035001439463812067 aThlZZWlDITljruiWa 0.5009568203132024 TBszlGVnMdDvMOLiCysTjjANuFqeq + 0.4783242512285928 dafCundgmRFJLzTEexYHeprmCXFjxrJYLZAcsLXhTrgC 0.531664059031722 + reqfWtmvctIfWtqVEuaTEGfarDa 0.47961187976147 CBvTozpAXrLpS 0.10226598211977789 + eGQCebwBiUOPhMfrVRHiThsDeIFzPDEKHczj 0.721486119508813 nHhoxBCSgFFhJWBhTSVVXBpdFoLKYYKZmWEgvCfa + 0.3044462915617381 AMu 0.8380361220680647 tQiutM 0.7791893412340167 jcKQrBTFeuiuHCprihKx + 0.9140067046543505 bHuocqbTLhTHlHKIfDdemFDPwrcYFeJrXXafsATvSmFvKGYKRJYnqFmwKoRuaptI + 0.6755477551341951 sJuZYHZFrVYyTvMdbMZJDuCqUmpRIsEgizBVplIPCkSAtGc 0.843803368180551 + WgcCuSQuSzcvSPOzzXViuXfnOtBSxjm 0.4683803962889672 CqNaseFeIzhsgZdZDgySGfNawLz + 0.300477038321727 pLiQVPjQuEROtmKceRjEcaiZxMvMSIeailtKgwUeVsgqQUdTUbWg 0.6782593142253811 + bgbVdSCYQstqQQloWsJLCYYrmdvtmKeNXDfWGHvzdyTFuuiMwSxuSZa 0.09916181882671649 jHWIgQCltkPBgzLWWTfRJlLFrxesFUmaJzfeeKDsFglvPAwNIDCDdlvicM + 0.031870198089671176 YfsobnenpUlKHCXNgKYVeWHNRFEPOsAU 0.9753495730511261 OHVmNYCiDZFlkmsqhwgc + 0.6796131843206142 CPVsbsqYVeWPVRsTZKTvlcDwHnUzHpZyEchJqxnbrWM 0.41149806297291536 + MGFGt 0.0635854843880973 xVozQKiljlffdYDherQcdYdEmEX 0.17349539011413317 SORWOElEDHqZuOvYslqzY + 0.6957717601786134 XNVNKDBXRoFHDAu 0.514568844593022 GTagGWDGPcixUFOSdZTBaKpiJSPiKhmyricEAMLBwjQyEXmJIZn + 0.5054082476784039 YOxGoUKeShnXhcdvTJQFGTukjSiRZFidGRfkttgXJBeKKnJebZNubpk 0.5791117949403571 + PFfNxLyEMLCXtgEQVpFVGxadSZeOXdaSCaQmDBrMdJLSlIXAnerUpaF 0.6869490812905924 EDFLipKWqONzJ + 0.5858117779265328 ufGNBddDuAsmqgFMQYqtMrOTUbOiexjEYqLKdPLscwavpbHWCbGvMSKfkp + 0.3328807181469634 kIxgjCASHjQbqrvhxiPozqCtRVebWwuHCptoxBJ 0.5593178033061493 + BHmhWwOEWhCelxVLRsUSiAsSicrsXyAYhbrOWaDtKVVDyVpgTtkbO 0.9568394270185203 XrsvMeAof + 0.2895493213801318 eVIlCARlsKEygkqngxlVoJIhthE 0.36404485659899855 tAahXQIJDAbNWOgsSyAUkgwVcIzNvkawniDRxludhXrgdbReUJWKaDjPkLNCi + 0.6751249599564046 zUNDGaygGLVztZyQgSMyevPstQKjDVTMafZnepYqPvANg 0.4002349143471098 + blBkPgUMFnFxBlocRBolScezbONPxsKrGsdsMsiZWrhuRsINONXImqQlQY 0.4373588125087955 + LpfFjgOkMnuixmgfGamVKyPICLQQzNKmdJaJRnflC 0.8323339473420782 TsvDvTRrfMUEskXqXTEHTKzYcuzaoBTAyKVkcnvvqRTgXItNQwJ + 0.7464672802658118 GpXkYQokvVduxNQfcaSYTSiZsOMxrAlNMtPPdWCLKWWqbeunMDLgkIRRu 0.15234267491477727 + bTapHmxvqZNdGNP 0.42610519579163275 DllLIwysTAcQqwVPjAGkGEedTflBevgZmdgwMbKpNdKJoWGteGWJwyhrI + 0.05073435890699274 uoKiwzrJXTOqaDpeWknKEXdTkQziOGXjZZikNbaEPwOMFvlKoERaUq 0.8053618509879708 + UYuKZw 0.21335197746306034 OqRQLePoVHjfSoZV 0.9955843393406656 s" + --command-key-pattern="R" --command-ratio 60 --command "ZREVRANGE __key__ 0 1 WITHSCORES" --command-key-pattern="R" --command-ratio 40 + --key-minimum=1 --key-maximum 1000000 --hide-histogram + resources: + requests: + cpus: '4' + memory: 4g +tested-groups: +- sorted-set +priority: 12 diff --git a/redis_benchmarks_specification/test-suites/memtier_benchmark-playbook-rate-limiting-lua-100k-sessions.yml b/redis_benchmarks_specification/test-suites/memtier_benchmark-playbook-rate-limiting-lua-100k-sessions.yml new file mode 100644 index 00000000..b6996c60 --- /dev/null +++ b/redis_benchmarks_specification/test-suites/memtier_benchmark-playbook-rate-limiting-lua-100k-sessions.yml @@ -0,0 +1,65 @@ +version: 0.4 +name: memtier_benchmark-playbook-rate-limiting-lua-100k-sessions +description: | + Runs memtier_benchmark to model a **bitmap-based rate limit gate** using Lua: the script performs + a `BITCOUNT` on a single bitmap key and **allows** if the count is below a configurable threshold, + otherwise **denies** (no other data structures or mutations are performed). + +dbconfig: + configuration-parameters: + save: '""' + + init_commands: + - '"SETBIT" "1" "100000" "1"' + preload_tool: + run_image: redislabs/memtier_benchmark:edge + tool: memtier_benchmark + arguments: > + --hide-histogram + --command "SETBIT __key__ __key__ 1" + --key-maximum 100000 + --key-minimum 1 + --key-prefix "" + -t 1 -c 1 + --command-key-pattern R + --distinct-client-seed + -n 100000 + --pipeline 50 + resources: + requests: + cpus: '2' + memory: 1g + +tested-commands: +- bitcount +- eval +tested-groups: +- bitmap +- scripting + +redis-topologies: +- oss-standalone + +build-variants: +- gcc:15.2.0-amd64-debian-bookworm-default +- gcc:15.2.0-arm64-debian-bookworm-default +- dockerhub + +clientconfig: + run_image: redislabs/memtier_benchmark:edge + tool: memtier_benchmark + arguments: > + --print-percentiles=50,90,95,99 + --run-count=1 + --test-time=120 + --hide-histogram + --key-maximum 100000 + --key-minimum 1 + --key-prefix "" + --command='EVAL "local k=KEYS[1];local limit=tonumber(ARGV[1]);local c=redis.call(\"BITCOUNT\",k);if c>=limit then return {0,0,c} else return {1,limit-c,c} end" 1 __key__ 2000000' + resources: + requests: + cpus: '2' + memory: 2g + +priority: 19 diff --git a/redis_benchmarks_specification/test-suites/memtier_benchmark-playbook-realtime-analytics-membership-pipeline-10.yml b/redis_benchmarks_specification/test-suites/memtier_benchmark-playbook-realtime-analytics-membership-pipeline-10.yml new file mode 100644 index 00000000..3c3129b7 --- /dev/null +++ b/redis_benchmarks_specification/test-suites/memtier_benchmark-playbook-realtime-analytics-membership-pipeline-10.yml @@ -0,0 +1,55 @@ +version: 0.4 +name: memtier_benchmark-playbook-realtime-analytics-membership-pipeline-10 +description: 'This benchmark captures the performance of Redis under realistic **membership analytics** workloads, where frequent set algebra operations (intersection, union, difference) drive personalization, segmentation, and eligibility checks.' +dbconfig: + configuration-parameters: + save: '""' + check: + keyspacelen: 2 + resources: + requests: + memory: 1g + init_commands: + - '"SADD" "set:{org1}:10" "lysbgqqfqw" "mtccjerdon" "jekkafodvk" "nmgxcctxpn" "vyqqkuszzh" + "pytrnqdhvs" "oguwnmniig" "gekntrykfh" "nhfnbxqgol" "cgoeihlnei"' + - '"SADD" "set:{org1}:100" "vyoomgwuzv" "xamjodnbpf" "ewomnmugfa" "ljcgdooafo" "pcxdhdjwnf" + "djetcyfxuc" "licotqplim" "alqlzsvuuz" "ijsmoyesvd" "whmotknaff" "rkaznetutk" + "ksqpdywgdd" "gorgpnnqwr" "gekntrykfh" "rjkknoigmu" "luemuetmia" "gxephxbdru" + "ncjfckgkcl" "hhjclfbbka" "cgoeihlnei" "zwnitejtpg" "upodnpqenn" "mibvtmqxcy" + "htvbwmfyic" "rqvryfvlie" "nxcdcaqgit" "gfdqdrondm" "lysbgqqfqw" "nxzsnkmxvi" + "nsxaigrnje" "cwaveajmcz" "xsepfhdizi" "owtkxlzaci" "agsdggdghc" "tcjvjofxtd" + "kgqrovsxce" "ouuybhtvyb" "ueyrvldzwl" "vpbkvwgxsf" "pytrnqdhvs" "qbiwbqiubb" + "ssjqrsluod" "urvgxwbiiz" "ujrxcmpvsq" "mtccjerdon" "xczfmrxrja" "imyizmhzjk" + "oguwnmniig" "mxwgdcutnb" "pqyurbvifk" "ccagtnjilc" "mbxohpancs" "lgrkndhekf" + "eqlgkwosie" "jxoxtnzujs" "lbtpbknelm" "ichqzmiyot" "mbgehjiauu" "aovfsvbwjg" + "nmgxcctxpn" "vyqqkuszzh" "rojeolnopp" "ibhohmfxzt" "qbyhorvill" "nhfnbxqgol" + "wkbasfyzqz" "mjjuylgssm" "imdqxmkzdj" "oapbvnisyq" "bqntlsaqjb" "ocrcszcznp" + "hhniikmtsx" "hlpdstpvzw" "wqiwdbncmt" "vymjzlzqcn" "hhjchwjlmc" "ypfeltycpy" + "qjyeqcfhjj" "uapsgmizgh" "owbbdezgxn" "qrosceblyo" "sahqeskveq" "dapacykoah" + "wvcnqbvlnf" "perfwnpvkl" "ulbrotlhze" "fhuvzpxjbc" "holjcdpijr" "onzjrteqmu" + "pquewclxuy" "vpmpffdoqz" "eouliovvra" "vxcbagyymm" "jekkafodvk" "ypekeuutef" + "dlbqcynhrn" "erxulvebrj" "qwxrsgafzy" "dlsjwmqzhx" "exvhmqxvvp"' + dataset_name: 2keys-set-10-100-elements-org1 + dataset_description: This dataset contains 2 set keys, one with 10 elements and + the other with 100 elements. The smaller set is a subset of the larger one. +tested-commands: +- smembers +- sdiff +redis-topologies: +- oss-standalone +build-variants: +- gcc:15.2.0-amd64-debian-bookworm-default +- gcc:15.2.0-arm64-debian-bookworm-default +- dockerhub +clientconfig: + run_image: redislabs/memtier_benchmark:edge + tool: memtier_benchmark + arguments: --command="SDIFF set:{org1}:100 set:{org1}:10" --command "SMEMBERS set:{org1}:100" --command + "SMEMBERS set:{org1}:10" --command "SUNION set:{org1}:100 set:{org1}:10" --hide-histogram --test-time 180 --pipeline 10 + resources: + requests: + cpus: '4' + memory: 2g +tested-groups: +- set +priority: 131 diff --git a/redis_benchmarks_specification/test-suites/memtier_benchmark-playbook-realtime-analytics-membership.yml b/redis_benchmarks_specification/test-suites/memtier_benchmark-playbook-realtime-analytics-membership.yml new file mode 100644 index 00000000..dbf6a590 --- /dev/null +++ b/redis_benchmarks_specification/test-suites/memtier_benchmark-playbook-realtime-analytics-membership.yml @@ -0,0 +1,55 @@ +version: 0.4 +name: memtier_benchmark-playbook-realtime-analytics-membership +description: 'This benchmark captures the performance of Redis under realistic **membership analytics** workloads, where frequent set algebra operations (intersection, union, difference) drive personalization, segmentation, and eligibility checks.' +dbconfig: + configuration-parameters: + save: '""' + check: + keyspacelen: 2 + resources: + requests: + memory: 1g + init_commands: + - '"SADD" "set:{org1}:10" "lysbgqqfqw" "mtccjerdon" "jekkafodvk" "nmgxcctxpn" "vyqqkuszzh" + "pytrnqdhvs" "oguwnmniig" "gekntrykfh" "nhfnbxqgol" "cgoeihlnei"' + - '"SADD" "set:{org1}:100" "vyoomgwuzv" "xamjodnbpf" "ewomnmugfa" "ljcgdooafo" "pcxdhdjwnf" + "djetcyfxuc" "licotqplim" "alqlzsvuuz" "ijsmoyesvd" "whmotknaff" "rkaznetutk" + "ksqpdywgdd" "gorgpnnqwr" "gekntrykfh" "rjkknoigmu" "luemuetmia" "gxephxbdru" + "ncjfckgkcl" "hhjclfbbka" "cgoeihlnei" "zwnitejtpg" "upodnpqenn" "mibvtmqxcy" + "htvbwmfyic" "rqvryfvlie" "nxcdcaqgit" "gfdqdrondm" "lysbgqqfqw" "nxzsnkmxvi" + "nsxaigrnje" "cwaveajmcz" "xsepfhdizi" "owtkxlzaci" "agsdggdghc" "tcjvjofxtd" + "kgqrovsxce" "ouuybhtvyb" "ueyrvldzwl" "vpbkvwgxsf" "pytrnqdhvs" "qbiwbqiubb" + "ssjqrsluod" "urvgxwbiiz" "ujrxcmpvsq" "mtccjerdon" "xczfmrxrja" "imyizmhzjk" + "oguwnmniig" "mxwgdcutnb" "pqyurbvifk" "ccagtnjilc" "mbxohpancs" "lgrkndhekf" + "eqlgkwosie" "jxoxtnzujs" "lbtpbknelm" "ichqzmiyot" "mbgehjiauu" "aovfsvbwjg" + "nmgxcctxpn" "vyqqkuszzh" "rojeolnopp" "ibhohmfxzt" "qbyhorvill" "nhfnbxqgol" + "wkbasfyzqz" "mjjuylgssm" "imdqxmkzdj" "oapbvnisyq" "bqntlsaqjb" "ocrcszcznp" + "hhniikmtsx" "hlpdstpvzw" "wqiwdbncmt" "vymjzlzqcn" "hhjchwjlmc" "ypfeltycpy" + "qjyeqcfhjj" "uapsgmizgh" "owbbdezgxn" "qrosceblyo" "sahqeskveq" "dapacykoah" + "wvcnqbvlnf" "perfwnpvkl" "ulbrotlhze" "fhuvzpxjbc" "holjcdpijr" "onzjrteqmu" + "pquewclxuy" "vpmpffdoqz" "eouliovvra" "vxcbagyymm" "jekkafodvk" "ypekeuutef" + "dlbqcynhrn" "erxulvebrj" "qwxrsgafzy" "dlsjwmqzhx" "exvhmqxvvp"' + dataset_name: 2keys-set-10-100-elements-org1 + dataset_description: This dataset contains 2 set keys, one with 10 elements and + the other with 100 elements. The smaller set is a subset of the larger one. +tested-commands: +- smembers +- sdiff +redis-topologies: +- oss-standalone +build-variants: +- gcc:15.2.0-amd64-debian-bookworm-default +- gcc:15.2.0-arm64-debian-bookworm-default +- dockerhub +clientconfig: + run_image: redislabs/memtier_benchmark:edge + tool: memtier_benchmark + arguments: --command="SDIFF set:{org1}:100 set:{org1}:10" --command "SMEMBERS set:{org1}:100" --command + "SMEMBERS set:{org1}:10" --command "SUNION set:{org1}:100 set:{org1}:10" --hide-histogram --test-time 180 + resources: + requests: + cpus: '4' + memory: 2g +tested-groups: +- set +priority: 131 diff --git a/redis_benchmarks_specification/test-suites/memtier_benchmark-playbook-session-caching-json-100k-sessions.yml b/redis_benchmarks_specification/test-suites/memtier_benchmark-playbook-session-caching-json-100k-sessions.yml new file mode 100644 index 00000000..6d9582d6 --- /dev/null +++ b/redis_benchmarks_specification/test-suites/memtier_benchmark-playbook-session-caching-json-100k-sessions.yml @@ -0,0 +1,109 @@ +version: 0.4 +name: memtier_benchmark-session-caching-json-100k-sessions +description: | + Runs memtier_benchmark to simulate a session caching workload for a SaaS application. + This benchmark focuses exclusively on JSON-based session storage, where each session + is stored as a Redis JSON document (`session:`) with fields like user ID, timestamps, device info, + and metadata (total ~400–600B). + + The benchmark models a typical read-heavy cache usage pattern, with an approximate + **read:write ratio of 90:10**, reflecting session retrievals and infrequent updates. + + Command groups: + - Session cache reads (`JSON.GET`): ~90% + - Session cache writes (`JSON.SET`): ~10% + + To better approximate real-world access patterns, the benchmark uses a **Zipfian key distribution** + (`--command-key-pattern=Z`). This simulates **skewed access** where a small subset of sessions (hot keys) + receives a majority of reads — a common pattern in production workloads. + + While Zipfian is technically a power-law distribution, it effectively mimics **Poisson-like behavior** + in large-scale systems, where access frequency is uneven but statistically predictable. + This access skew mirrors real-life scenarios such as: + - Frequently accessed or "sticky" user sessions + - Popular user accounts or active devices + - Hot caches for trending or recently used resources + + Using Zipfian distribution allows this benchmark to capture **contention**, **cache pressure**, and + **read amplification** effects that occur in real SaaS applications under load. + + +dbconfig: + configuration-parameters: + save: '""' + resources: + requests: + memory: 1g + init_lua: | + local seed = 12345 + math.randomseed(seed) + local now = tonumber(redis.call('TIME')[1]) + local function rand_str(len) + local chars = 'abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789' + local res = '' + for i = 1, len do + local idx = math.random(#chars) + res = res .. chars:sub(idx, idx) + end + return res + end + for i = 1, 100000 do + local session_id = 'session:' .. i + local user_id = 'user-' .. i + local org_id = 'org-' .. i + local session_data = { + userId = user_id, + organizationId = org_id, + role = 'member', + createdAt = tostring(now - math.random(3600)), + lastAccessed = tostring(now), + ipAddress = '192.168.1.' .. (i % 255), + device = 'device-' .. rand_str(8), + authMethod = 'password', + status = 'active', + metadata = rand_str(200 + (i % 100)) + } + redis.call('JSON.SET', session_id, '$', cjson.encode(session_data)) + end + return 'OK' + +tested-groups: +- json + +tested-commands: +- json.get +- json.set + +redis-topologies: +- oss-standalone + +build-variants: +- gcc:15.2.0-amd64-debian-bookworm-default +- gcc:15.2.0-arm64-debian-bookworm-default +- dockerhub + +clientconfig: + run_image: redislabs/memtier_benchmark:edge + tool: memtier_benchmark + arguments: > + --key-prefix "" + --key-minimum 1 + --key-maximum 100000 + --data-size-range=400-600 + --pipeline=1 + --print-percentiles=50,90,95,99 + --run-count=1 + --test-time=120 + --command="JSON.GET session:__key__" + --command-key-pattern=Z + --command-ratio=90 + --command="JSON.SET session:__key__ $ \"{\\\"userId\\\":\\\"user-__key__\\\",\\\"organizationId\\\":\\\"org-__key__\\\",\\\"role\\\":\\\"admin\\\",\\\"email\\\":\\\"user__key__@example.com\\\",\\\"name\\\":\\\"User __key__\\\",\\\"permissions\\\":[\\\"read\\\",\\\"write\\\"],\\\"lastActivity\\\":__key__,\\\"ipAddress\\\":\\\"192.168.1.__key__\\\",\\\"userAgent\\\":\\\"Mozilla/5.0\\\",\\\"createdAt\\\":__key__}\"" + --command-key-pattern=Z + --command-ratio=10 + --hide-histogram + resources: + requests: + cpus: '4' + memory: 2g + +priority: 150 diff --git a/utils/tests/test_self_contained_coordinator_memtier.py b/utils/tests/test_self_contained_coordinator_memtier.py index 16160f4b..e5f95aa6 100644 --- a/utils/tests/test_self_contained_coordinator_memtier.py +++ b/utils/tests/test_self_contained_coordinator_memtier.py @@ -248,6 +248,7 @@ def test_self_contained_coordinator_dockerhub_preload(): build_arch, testDetails, build_os, + existing_artifact_keys=None, ) build_stream_fields["mnt_point"] = "" if result is True: @@ -1317,6 +1318,7 @@ def test_self_contained_coordinator_duplicated_ts(): build_os, git_timestamp_ms=timestamp, use_git_timestamp=True, + existing_artifact_keys=None, ) build_stream_fields["mnt_point"] = "" if result is True: