From 1e29cee1a67ea91d9757b620bee59d8bbdd98162 Mon Sep 17 00:00:00 2001 From: Allen Wang Date: Fri, 7 Nov 2025 07:22:30 -0800 Subject: [PATCH 1/9] adds integration tests to CI --- .github/workflows/gpu_test.yaml | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/.github/workflows/gpu_test.yaml b/.github/workflows/gpu_test.yaml index 71455c122..a89e04225 100644 --- a/.github/workflows/gpu_test.yaml +++ b/.github/workflows/gpu_test.yaml @@ -40,7 +40,8 @@ jobs: - name: Install torchforge run: pip install uv && uv pip install . && uv pip install .[dev] - name: Run unit tests with coverage - # TODO add all tests run: pytest tests/unit_tests --cov=. --cov-report=xml --durations=20 -vv + - name: Run unit tests with coverage + run: pytest tests/integration_tests --cov=. --cov-report=xml --durations=20 -vv - name: Upload Coverage to Codecov uses: codecov/codecov-action@v3 From e74918aa748c51976d483486ef1494aa932d4753 Mon Sep 17 00:00:00 2001 From: Allen Wang Date: Fri, 7 Nov 2025 07:40:07 -0800 Subject: [PATCH 2/9] remove coder test --- tests/integration_tests/test_coder.py | 89 --------------------------- 1 file changed, 89 deletions(-) delete mode 100644 tests/integration_tests/test_coder.py diff --git a/tests/integration_tests/test_coder.py b/tests/integration_tests/test_coder.py deleted file mode 100644 index 45a80ec4d..000000000 --- a/tests/integration_tests/test_coder.py +++ /dev/null @@ -1,89 +0,0 @@ -# Copyright (c) Meta Platforms, Inc. and affiliates. -# All rights reserved. -# -# This source code is licensed under the BSD-style license found in the -# LICENSE file in the root directory of this source tree. - -""" -Integration tests for forge.actors.coder.SandboxedPythonCoder. - -Requires enroot to be installed. - -""" - -import os -import uuid - -import pytest - -from forge.actors.coder import SandboxedPythonCoder - - -@pytest.mark.timeout(30) -@pytest.mark.asyncio -async def test_coder_runs_python(): - """Integration test for SandboxedPythonCoder with real container execution.""" - # Create unique names to avoid test conflicts - unique_id = str(uuid.uuid1()) - container_name = f"test_sandbox_{unique_id}" - image_path = f"/tmp/python_test_{unique_id}.sqsh" - - coder = None - try: - coder = await SandboxedPythonCoder.as_actor( - docker_image="docker://python:3.10", - sqsh_image_path=image_path, - container_name=container_name, - ) - - # Execute code - results, _ = await coder.execute.call_one( - code="print('hello world')", - ) - print("Got results", results) - assert results == "hello world\n" - - finally: - # Clean up resources - if coder: - await SandboxedPythonCoder.shutdown(coder) - - # Clean up the image file - if os.path.exists(image_path): - os.unlink(image_path) - - -@pytest.mark.timeout(30) -@pytest.mark.asyncio -async def test_coder_catches_error(): - """Integration test for SandboxedPythonCoder with real container execution.""" - # Create unique names to avoid test conflicts - unique_id = str(uuid.uuid1()) - container_name = f"test_sandbox_{unique_id}" - image_path = f"/tmp/python_test_{unique_id}.sqsh" - - coder = None - try: - print("starting test") - coder = await SandboxedPythonCoder.as_actor( - docker_image="docker://python:3.10", - sqsh_image_path=image_path, - container_name=container_name, - ) - print("Got coder") - - # Execute code - _, stderr = await coder.execute.call_one( - code="hello world", - ) - print("got stderr", stderr) - assert "SyntaxError" in stderr - - finally: - # Clean up resources - if coder: - await SandboxedPythonCoder.shutdown(coder) - - # Clean up the image file - if os.path.exists(image_path): - os.unlink(image_path) From 7f918a16da06331bd935034500b551d7a0f70dab Mon Sep 17 00:00:00 2001 From: Allen Wang Date: Fri, 7 Nov 2025 09:14:50 -0800 Subject: [PATCH 3/9] adds grpo test, fixes broken tests --- .github/workflows/gpu_test.yaml | 13 +- pyproject.toml | 1 + tests/integration_tests/README.md | 119 ++++++++++++++ tests/integration_tests/conftest.py | 13 ++ .../fixtures/grpo_smoke_test.yaml | 147 ++++++++++++++++++ tests/integration_tests/test_grpo_e2e.py | 102 ++++++++++++ tests/integration_tests/test_policy_update.py | 14 +- 7 files changed, 399 insertions(+), 10 deletions(-) create mode 100644 tests/integration_tests/README.md create mode 100644 tests/integration_tests/fixtures/grpo_smoke_test.yaml create mode 100644 tests/integration_tests/test_grpo_e2e.py diff --git a/.github/workflows/gpu_test.yaml b/.github/workflows/gpu_test.yaml index a89e04225..62a8727f1 100644 --- a/.github/workflows/gpu_test.yaml +++ b/.github/workflows/gpu_test.yaml @@ -40,8 +40,15 @@ jobs: - name: Install torchforge run: pip install uv && uv pip install . && uv pip install .[dev] - name: Run unit tests with coverage - run: pytest tests/unit_tests --cov=. --cov-report=xml --durations=20 -vv - - name: Run unit tests with coverage - run: pytest tests/integration_tests --cov=. --cov-report=xml --durations=20 -vv + run: PYTHONPATH=. pytest tests/unit_tests --cov=. --cov-report=xml --durations=20 -vv + - name: Run integration tests with coverage (each file separately for Monarch isolation) + run: | + # Run each test file in a separate process to avoid Monarch state leakage + # set -e ensures the CI fails if any test file fails (not just the last one) + set -e + for test_file in tests/integration_tests/test_*.py; do + echo "Running $test_file" + PYTHONPATH=. pytest "$test_file" --cov=. --cov-append --cov-report=xml --durations=20 -vv + done - name: Upload Coverage to Codecov uses: codecov/codecov-action@v3 diff --git a/pyproject.toml b/pyproject.toml index 8460b5b78..c684f06de 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -41,6 +41,7 @@ dev = [ "pytest", "pytest-cov", "pytest-timeout", + "pytest-xdist", "tensorboard", "expecttest", "tomli>=1.1.0", diff --git a/tests/integration_tests/README.md b/tests/integration_tests/README.md new file mode 100644 index 000000000..cd1ff84f2 --- /dev/null +++ b/tests/integration_tests/README.md @@ -0,0 +1,119 @@ +# Integration Tests + +This directory contains end-to-end integration tests for Forge components. + +## Running Tests + +### Important: Monarch Cleanup Issues + +Monarch has seen issues in the past with proper cleanup between tests, so **integration tests should NOT be run all together in a single pytest invocation**. Running multiple integration tests in the same process can cause state leakage and test failures. + +### Recommended Approach + +**Run individual test files:** +```bash +PYTHONPATH=. pytest tests/integration_tests/test_grpo_e2e.py -vv +PYTHONPATH=. pytest tests/integration_tests/test_policy_update.py -vv +``` + +**Run all integration tests (each in separate process):** +```bash +for f in tests/integration_tests/test_*.py; do + PYTHONPATH=. pytest "$f" -vv +done +``` + +**Run a specific test:** +```bash +PYTHONPATH=. pytest -s tests/integration_tests/test_grpo_e2e.py::TestGRPOEndToEnd::test_grpo_smoke_test +``` + +## Available Tests + +### `test_grpo_e2e.py` +End-to-end smoke test for the GRPO training loop. Runs the full pipeline (`apps/grpo/main.py`) for 3 training steps with a minimal configuration. + +**What it tests:** +- Actor/service initialization (policy, trainer, reference model, replay buffer, etc.) +- Rollout loop (data loading → generation → reward evaluation) +- Reference model logprob calculation +- Replay buffer batching +- Training step execution +- Weight synchronization (trainer → policy) +- Torchstore operations +- Clean shutdown + +**Config:** `fixtures/grpo_smoke_test.yaml` (minimal model, 1 step, small dataset) + +**Run:** +```bash +PYTHONPATH=. pytest tests/integration_tests/test_grpo_e2e.py -vv +``` + +### `test_policy_update.py` +Tests weight synchronization and sharding between RLTrainer and Policy services. + +**Run with default config:** +```bash +PYTHONPATH=. pytest tests/integration_tests/test_policy_update.py -vv +``` + +**Run with custom config:** +```bash +PYTHONPATH=. pytest -s tests/integration_tests/test_policy_update.py::TestWeightSync::test_sanity_check \ + --config tests/integration_tests/fixtures/qwen3_1_7b_tp.yaml --use_dcp=false +``` + +**Default config:** `fixtures/qwen3_1_7b_no_tp.yaml` + +### `test_vllm_policy_correctness.py` +Validates vLLM policy correctness. + +**Run:** +```bash +PYTHONPATH=. pytest tests/integration_tests/test_vllm_policy_correctness.py -vv +``` + +### `test_titan_fwd_vs_hf_fwd.py` +Compares Titan forward pass with HuggingFace forward pass. + +**Run:** +```bash +PYTHONPATH=. pytest tests/integration_tests/test_titan_fwd_vs_hf_fwd.py -vv +``` + +## Writing New Integration Tests + +When adding new integration tests: + +1. **Keep tests isolated** - Each test file should be runnable independently +2. **Use minimal configs** - Create small configs in `fixtures/` for fast testing +3. **Clean up resources** - Use pytest fixtures to ensure proper cleanup +4. **Skip when needed** - Use `@pytest.mark.skipif` for GPU/resource requirements +5. **Document run commands** - Add docstrings showing how to run the test + +### Example Test Structure + +```python +import pytest +import torch + +requires_cuda = pytest.mark.skipif( + not torch.cuda.is_available(), + reason="CUDA not available", +) + +@pytest.mark.asyncio +@requires_cuda +async def test_my_integration(): + """ + Run with: + PYTHONPATH=. pytest tests/integration_tests/test_my_integration.py -vv + """ + # Your test code here + pass +``` + +## CI Integration + +The CI pipeline (`.github/workflows/gpu_test.yaml`) automatically runs all integration tests, executing each test file in a separate process for isolation. diff --git a/tests/integration_tests/conftest.py b/tests/integration_tests/conftest.py index cf16a8cf0..dc3b0bcf6 100644 --- a/tests/integration_tests/conftest.py +++ b/tests/integration_tests/conftest.py @@ -4,6 +4,19 @@ # This source code is licensed under the BSD-style license found in the # LICENSE file in the root directory of this source tree. +"""Integration tests configuration. + +IMPORTANT: Due to Monarch's cleanup issues, integration tests should be run +separately for proper isolation. Run individual test files like: + + pytest tests/integration_tests/test_grpo_e2e.py -vv + pytest tests/integration_tests/test_policy_update.py -vv + +Or run all tests (each file in separate process): + + for f in tests/integration_tests/test_*.py; do pytest "$f" -vv; done +""" + import argparse import pytest diff --git a/tests/integration_tests/fixtures/grpo_smoke_test.yaml b/tests/integration_tests/fixtures/grpo_smoke_test.yaml new file mode 100644 index 000000000..35da38743 --- /dev/null +++ b/tests/integration_tests/fixtures/grpo_smoke_test.yaml @@ -0,0 +1,147 @@ +# Minimal GRPO configuration for integration testing +# This config is designed to run quickly with minimal resources + +# Global configuration +group_size: 2 # Minimal group size +local_batch_size: 2 # Minimal batch size +max_req_tokens: 128 # Reduced from 1024 +max_res_tokens: 128 # Reduced from 1024 +model: "Qwen/Qwen3-1.7B" +off_by_n: 1 + +# Main loop configuration +rollout_threads: 1 +training_threads: 1 + +# Observability configuration - console only for tests +metric_logging: + console: + logging_mode: global_reduce + +# Dataset configuration - streaming with limited samples +dataset: + path: "openai/gsm8k" + revision: "main" + data_split: "train" + streaming: true # Required by DatasetActor.sample() + model: ${model} + +# Policy configuration +policy: + engine_args: + model: ${model} + tensor_parallel_size: 1 + pipeline_parallel_size: 1 + enforce_eager: true # Eager mode for simpler testing + sampling_params: + n: ${group_size} + max_tokens: ${max_res_tokens} + temperature: 1.0 + top_p: 1.0 + +# Trainer configuration +trainer: + model: + name: qwen3 + flavor: 1.7B + hf_assets_path: hf://${model} + optimizer: + name: AdamW + lr: 1e-5 + eps: 1e-8 + lr_scheduler: + warmup_steps: 1 + training: + local_batch_size: ${local_batch_size} + seq_len: ${sum:${max_req_tokens},${max_res_tokens}} + max_norm: 1.0 + steps: 1 + dtype: bfloat16 + gc_freq: 1 + compile: + enable: false + parallelism: + data_parallel_replicate_degree: 1 + data_parallel_shard_degree: 1 + tensor_parallel_degree: 1 + pipeline_parallel_degree: 1 + context_parallel_degree: 1 + expert_parallel_degree: 1 + disable_loss_parallel: true + checkpoint: + enable: true + folder: /tmp/grpo_test_checkpoint + initial_load_path: hf://${model} + initial_load_in_hf: true + last_save_in_hf: false # Don't save back to HF in tests + interval: 500 + async_mode: "disabled" + activation_checkpoint: + mode: selective + selective_ac_option: op + +# Replay buffer configuration +replay_buffer: + batch_size: ${local_batch_size} + max_policy_age: ${off_by_n} + dp_size: ${trainer.parallelism.data_parallel_shard_degree} + +# Reference model configuration +ref_model: + model: + name: qwen3 + flavor: 1.7B + hf_assets_path: hf://${model} + training: + seq_len: ${trainer.training.seq_len} + dtype: bfloat16 + gc_freq: 1 + compile: + enable: false + parallelism: + data_parallel_replicate_degree: 1 + data_parallel_shard_degree: 1 + tensor_parallel_degree: 1 + pipeline_parallel_degree: 1 + context_parallel_degree: 1 + expert_parallel_degree: 1 + checkpoint: + enable: true + initial_load_path: hf://${model} + initial_load_in_hf: true + +# All resource allocations +services: + policy: + procs: ${policy.engine_args.tensor_parallel_size} + num_replicas: 1 + mesh_name: policy + with_gpus: true + ref_model: + procs: 1 + num_replicas: 1 + mesh_name: ref_model + with_gpus: true + reward_actor: + procs: 1 + num_replicas: 1 + mesh_name: reward_actor + with_gpus: false + +actors: + dataset: + procs: 1 + with_gpus: false + mesh_name: dataset + trainer: + procs: 1 + with_gpus: true + mesh_name: trainer + replay_buffer: + procs: 1 + with_gpus: false + mesh_name: replay_buffer + compute_advantages: + procs: 1 + with_gpus: false + mesh_name: compute_advantages diff --git a/tests/integration_tests/test_grpo_e2e.py b/tests/integration_tests/test_grpo_e2e.py new file mode 100644 index 000000000..c8bb8e074 --- /dev/null +++ b/tests/integration_tests/test_grpo_e2e.py @@ -0,0 +1,102 @@ +# Copyright (c) Meta Platforms, Inc. and affiliates. +# All rights reserved. +# +# This source code is licensed under the BSD-style license found in the +# LICENSE file in the root directory of this source tree. +"""A simple smoke test that runs the GRPO loop for 3 steps. + +Run this with: +PYTHONPATH=. pytest -s tests/integration_tests/test_grpo_e2e.py::test_grpo_smoke_test + +For running all integration tests with proper isolation (due to Monarch cleanup issues): +for f in tests/integration_tests/test_*.py; do pytest "$f" -vv; done + +""" + +import logging +import shutil +from pathlib import Path + +import pytest +import torch + +from forge.util.config import resolve_hf_hub_paths +from omegaconf import DictConfig, OmegaConf + +logger: logging.Logger = logging.getLogger(__name__) +logger.setLevel(logging.INFO) + +requires_cuda = pytest.mark.skipif( + not torch.cuda.is_available(), + reason="CUDA not available", +) + +TEST_CHECKPOINT_DIR = "/tmp/grpo_test_checkpoint" + + +def _load_config(config_path: str) -> DictConfig: + """Load and resolve config from YAML file.""" + cfg = None + try: + cfg = OmegaConf.load(config_path) + except Exception as e: + pytest.fail(f"Failed to load config file {config_path}: {e}") + + assert isinstance(cfg, DictConfig) + cfg = resolve_hf_hub_paths(cfg) + return cfg + + +def _cleanup_checkpoint_dir(): + """Clean up test checkpoint directory.""" + path = Path(TEST_CHECKPOINT_DIR) + if path.exists() and path.is_dir(): + try: + shutil.rmtree(path) + logger.info(f"Successfully removed {TEST_CHECKPOINT_DIR}") + except Exception as e: + logger.error(f"Failed to remove {TEST_CHECKPOINT_DIR}: {e}") + + +class TestGRPOEndToEnd: + """End-to-end integration tests for GRPO training loop.""" + + @pytest.mark.asyncio + @requires_cuda + async def test_grpo_smoke_test(self): + """ + Smoke test for GRPO training loop. + + This test runs the full GRPO pipeline for 3 training steps to verify: + - All actors and services initialize correctly + - Rollout loop generates completions + - Rewards are evaluated + - Reference model computes logprobs + - Replay buffer collects and batches experiences + - Trainer updates weights + - Policy receives weight updates + - Training completes successfully + """ + try: + # Load test config + config_path = "tests/integration_tests/fixtures/grpo_smoke_test.yaml" + cfg = _load_config(config_path) + + logger.info("Starting GRPO smoke test with config:") + logger.info(f" Model: {cfg.model}") + logger.info(f" Group size: {cfg.group_size}") + logger.info(f" Batch size: {cfg.local_batch_size}") + logger.info(f" Training steps: {cfg.trainer.training.steps}") + logger.info( + f" Max req/res tokens: {cfg.max_req_tokens}/{cfg.max_res_tokens}" + ) + + # Import main here to avoid issues with module-level imports + from apps.grpo.main import main + + # Run the main training loop + # This should run for exactly 3 steps and then exit cleanly + await main(cfg) + finally: + # Cleanup + _cleanup_checkpoint_dir() diff --git a/tests/integration_tests/test_policy_update.py b/tests/integration_tests/test_policy_update.py index 202c10686..35edb8c82 100644 --- a/tests/integration_tests/test_policy_update.py +++ b/tests/integration_tests/test_policy_update.py @@ -152,9 +152,9 @@ async def _setup_and_teardown(request): # ---- setup ---- # config_path = request.config.getoption("--config", default=None) if not config_path: - pytest.skip( - "No config file provided. Use --config to specify a YAML config file" - ) + # Use default config if none provided + config_path = "tests/integration_tests/fixtures/qwen3_1_7b_no_tp.yaml" + logger.info(f"No config provided, using default: {config_path}") use_dcp_override = request.config.getoption("--use_dcp") cfg = _load_config(config_path=config_path) @@ -254,10 +254,10 @@ async def test_sanity_check(self, _setup_and_teardown): # Setting everything to zero await rl_trainer.zero_out_model_states.call() await rl_trainer.push_weights.call(policy_version=v1) - await policy.save_model_params.fanout() + await policy._test_save_model_params.fanout() # Sanity check that before update all the tests pass - all_errs = await policy.validate_model_params.fanout( + all_errs = await policy._test_validate_model_params.fanout( _test_validate_params_unchanged ) for errs in all_errs: @@ -265,7 +265,7 @@ async def test_sanity_check(self, _setup_and_teardown): assert not e, f"Validation failed with exception: {e}" await policy.update_weights.fanout(version=v1) - all_errs = await policy.validate_model_params.fanout( + all_errs = await policy._test_validate_model_params.fanout( _test_validate_params_all_zeros ) for errs in all_errs: @@ -274,7 +274,7 @@ async def test_sanity_check(self, _setup_and_teardown): # Reloading v0, getting back original weights await policy.update_weights.fanout(version=v0) - all_errs = await policy.validate_model_params.fanout( + all_errs = await policy._test_validate_model_params.fanout( _test_validate_params_unchanged ) for errs in all_errs: From eb801ab689c3987662cd755d76a98ee598fa7d31 Mon Sep 17 00:00:00 2001 From: Allen Wang Date: Fri, 7 Nov 2025 09:19:43 -0800 Subject: [PATCH 4/9] some comments --- .github/workflows/gpu_test.yaml | 4 +- pyproject.toml | 1 - tests/integration_tests/README.md | 86 ------------------------ tests/integration_tests/conftest.py | 12 ---- tests/integration_tests/test_grpo_e2e.py | 3 - 5 files changed, 2 insertions(+), 104 deletions(-) diff --git a/.github/workflows/gpu_test.yaml b/.github/workflows/gpu_test.yaml index 62a8727f1..b6f3f5f8c 100644 --- a/.github/workflows/gpu_test.yaml +++ b/.github/workflows/gpu_test.yaml @@ -41,11 +41,11 @@ jobs: run: pip install uv && uv pip install . && uv pip install .[dev] - name: Run unit tests with coverage run: PYTHONPATH=. pytest tests/unit_tests --cov=. --cov-report=xml --durations=20 -vv - - name: Run integration tests with coverage (each file separately for Monarch isolation) + - name: Run integration tests with coverage run: | - # Run each test file in a separate process to avoid Monarch state leakage # set -e ensures the CI fails if any test file fails (not just the last one) set -e + # Run each test file in a separate process to avoid Monarch state leakage for test_file in tests/integration_tests/test_*.py; do echo "Running $test_file" PYTHONPATH=. pytest "$test_file" --cov=. --cov-append --cov-report=xml --durations=20 -vv diff --git a/pyproject.toml b/pyproject.toml index c684f06de..8460b5b78 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -41,7 +41,6 @@ dev = [ "pytest", "pytest-cov", "pytest-timeout", - "pytest-xdist", "tensorboard", "expecttest", "tomli>=1.1.0", diff --git a/tests/integration_tests/README.md b/tests/integration_tests/README.md index cd1ff84f2..9054a18c8 100644 --- a/tests/integration_tests/README.md +++ b/tests/integration_tests/README.md @@ -28,92 +28,6 @@ done PYTHONPATH=. pytest -s tests/integration_tests/test_grpo_e2e.py::TestGRPOEndToEnd::test_grpo_smoke_test ``` -## Available Tests - -### `test_grpo_e2e.py` -End-to-end smoke test for the GRPO training loop. Runs the full pipeline (`apps/grpo/main.py`) for 3 training steps with a minimal configuration. - -**What it tests:** -- Actor/service initialization (policy, trainer, reference model, replay buffer, etc.) -- Rollout loop (data loading → generation → reward evaluation) -- Reference model logprob calculation -- Replay buffer batching -- Training step execution -- Weight synchronization (trainer → policy) -- Torchstore operations -- Clean shutdown - -**Config:** `fixtures/grpo_smoke_test.yaml` (minimal model, 1 step, small dataset) - -**Run:** -```bash -PYTHONPATH=. pytest tests/integration_tests/test_grpo_e2e.py -vv -``` - -### `test_policy_update.py` -Tests weight synchronization and sharding between RLTrainer and Policy services. - -**Run with default config:** -```bash -PYTHONPATH=. pytest tests/integration_tests/test_policy_update.py -vv -``` - -**Run with custom config:** -```bash -PYTHONPATH=. pytest -s tests/integration_tests/test_policy_update.py::TestWeightSync::test_sanity_check \ - --config tests/integration_tests/fixtures/qwen3_1_7b_tp.yaml --use_dcp=false -``` - -**Default config:** `fixtures/qwen3_1_7b_no_tp.yaml` - -### `test_vllm_policy_correctness.py` -Validates vLLM policy correctness. - -**Run:** -```bash -PYTHONPATH=. pytest tests/integration_tests/test_vllm_policy_correctness.py -vv -``` - -### `test_titan_fwd_vs_hf_fwd.py` -Compares Titan forward pass with HuggingFace forward pass. - -**Run:** -```bash -PYTHONPATH=. pytest tests/integration_tests/test_titan_fwd_vs_hf_fwd.py -vv -``` - -## Writing New Integration Tests - -When adding new integration tests: - -1. **Keep tests isolated** - Each test file should be runnable independently -2. **Use minimal configs** - Create small configs in `fixtures/` for fast testing -3. **Clean up resources** - Use pytest fixtures to ensure proper cleanup -4. **Skip when needed** - Use `@pytest.mark.skipif` for GPU/resource requirements -5. **Document run commands** - Add docstrings showing how to run the test - -### Example Test Structure - -```python -import pytest -import torch - -requires_cuda = pytest.mark.skipif( - not torch.cuda.is_available(), - reason="CUDA not available", -) - -@pytest.mark.asyncio -@requires_cuda -async def test_my_integration(): - """ - Run with: - PYTHONPATH=. pytest tests/integration_tests/test_my_integration.py -vv - """ - # Your test code here - pass -``` - ## CI Integration The CI pipeline (`.github/workflows/gpu_test.yaml`) automatically runs all integration tests, executing each test file in a separate process for isolation. diff --git a/tests/integration_tests/conftest.py b/tests/integration_tests/conftest.py index dc3b0bcf6..9c3d5a1f1 100644 --- a/tests/integration_tests/conftest.py +++ b/tests/integration_tests/conftest.py @@ -4,18 +4,6 @@ # This source code is licensed under the BSD-style license found in the # LICENSE file in the root directory of this source tree. -"""Integration tests configuration. - -IMPORTANT: Due to Monarch's cleanup issues, integration tests should be run -separately for proper isolation. Run individual test files like: - - pytest tests/integration_tests/test_grpo_e2e.py -vv - pytest tests/integration_tests/test_policy_update.py -vv - -Or run all tests (each file in separate process): - - for f in tests/integration_tests/test_*.py; do pytest "$f" -vv; done -""" import argparse diff --git a/tests/integration_tests/test_grpo_e2e.py b/tests/integration_tests/test_grpo_e2e.py index c8bb8e074..6cbcd2f03 100644 --- a/tests/integration_tests/test_grpo_e2e.py +++ b/tests/integration_tests/test_grpo_e2e.py @@ -8,9 +8,6 @@ Run this with: PYTHONPATH=. pytest -s tests/integration_tests/test_grpo_e2e.py::test_grpo_smoke_test -For running all integration tests with proper isolation (due to Monarch cleanup issues): -for f in tests/integration_tests/test_*.py; do pytest "$f" -vv; done - """ import logging From 9317bbc239e172625aff6b2ec6143ae9a4dfca29 Mon Sep 17 00:00:00 2001 From: Allen Wang Date: Fri, 7 Nov 2025 10:33:23 -0800 Subject: [PATCH 5/9] disable RDMA for ci --- .github/workflows/gpu_test.yaml | 4 +++- tests/integration_tests/test_grpo_e2e.py | 16 ++++++++++++++++ 2 files changed, 19 insertions(+), 1 deletion(-) diff --git a/.github/workflows/gpu_test.yaml b/.github/workflows/gpu_test.yaml index b6f3f5f8c..1880feffa 100644 --- a/.github/workflows/gpu_test.yaml +++ b/.github/workflows/gpu_test.yaml @@ -42,13 +42,15 @@ jobs: - name: Run unit tests with coverage run: PYTHONPATH=. pytest tests/unit_tests --cov=. --cov-report=xml --durations=20 -vv - name: Run integration tests with coverage + env: + TORCHSTORE_RDMA_ENABLED: "0" # Disable RDMA on CI to avoid hangs run: | # set -e ensures the CI fails if any test file fails (not just the last one) set -e # Run each test file in a separate process to avoid Monarch state leakage for test_file in tests/integration_tests/test_*.py; do echo "Running $test_file" - PYTHONPATH=. pytest "$test_file" --cov=. --cov-append --cov-report=xml --durations=20 -vv + PYTHONPATH=. pytest "$test_file" --cov=. --cov-append --cov-report=xml --durations=20 -vvs done - name: Upload Coverage to Codecov uses: codecov/codecov-action@v3 diff --git a/tests/integration_tests/test_grpo_e2e.py b/tests/integration_tests/test_grpo_e2e.py index 6cbcd2f03..8bb538574 100644 --- a/tests/integration_tests/test_grpo_e2e.py +++ b/tests/integration_tests/test_grpo_e2e.py @@ -59,6 +59,7 @@ class TestGRPOEndToEnd: """End-to-end integration tests for GRPO training loop.""" @pytest.mark.asyncio + @pytest.mark.timeout(600) # 10 minute timeout to prevent hanging @requires_cuda async def test_grpo_smoke_test(self): """ @@ -74,6 +75,10 @@ async def test_grpo_smoke_test(self): - Policy receives weight updates - Training completes successfully """ + logger.info("=" * 80) + logger.info("Starting GRPO smoke test") + logger.info("=" * 80) + try: # Load test config config_path = "tests/integration_tests/fixtures/grpo_smoke_test.yaml" @@ -91,9 +96,20 @@ async def test_grpo_smoke_test(self): # Import main here to avoid issues with module-level imports from apps.grpo.main import main + logger.info("Starting main training loop...") # Run the main training loop # This should run for exactly 3 steps and then exit cleanly await main(cfg) + + logger.info("Main training loop completed successfully") + logger.info("GRPO smoke test completed successfully!") + + except Exception as e: + logger.error(f"GRPO smoke test failed with error: {e}") + raise finally: # Cleanup + logger.info("Cleaning up test checkpoint directory...") _cleanup_checkpoint_dir() + logger.info("Cleanup complete") + logger.info("=" * 80) From ed61dbda2d739d31ffa2c4810a0a5d32dbde2741 Mon Sep 17 00:00:00 2001 From: Allen Wang Date: Fri, 7 Nov 2025 10:49:51 -0800 Subject: [PATCH 6/9] ugly proc mesh stop workaround --- tests/integration_tests/test_grpo_e2e.py | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/tests/integration_tests/test_grpo_e2e.py b/tests/integration_tests/test_grpo_e2e.py index 8bb538574..531532d06 100644 --- a/tests/integration_tests/test_grpo_e2e.py +++ b/tests/integration_tests/test_grpo_e2e.py @@ -14,12 +14,19 @@ import shutil from pathlib import Path +import monarch + import pytest import torch from forge.util.config import resolve_hf_hub_paths from omegaconf import DictConfig, OmegaConf +# Temporary workaround - without this, proc_mesh.stop +# will raise an exit code 1 failing all other tests. +monarch.actor.unhandled_fault_hook = lambda failure: None + + logger: logging.Logger = logging.getLogger(__name__) logger.setLevel(logging.INFO) From d20a8cd06dee32470d6f5031a8bdccd51d15aa04 Mon Sep 17 00:00:00 2001 From: Allen Wang Date: Fri, 7 Nov 2025 11:00:34 -0800 Subject: [PATCH 7/9] monarch actor --- tests/integration_tests/test_grpo_e2e.py | 1 + 1 file changed, 1 insertion(+) diff --git a/tests/integration_tests/test_grpo_e2e.py b/tests/integration_tests/test_grpo_e2e.py index 531532d06..5f564ed36 100644 --- a/tests/integration_tests/test_grpo_e2e.py +++ b/tests/integration_tests/test_grpo_e2e.py @@ -15,6 +15,7 @@ from pathlib import Path import monarch +import monarch.actor import pytest import torch From 4d3bb5ffe0985fd7ee3e67fbd7733ed1571bbea7 Mon Sep 17 00:00:00 2001 From: Allen Wang Date: Fri, 7 Nov 2025 11:18:48 -0800 Subject: [PATCH 8/9] disable temp --- .../{test_grpo_e2e.py => disabled_test_grpo_e2e.py} | 1 + 1 file changed, 1 insertion(+) rename tests/integration_tests/{test_grpo_e2e.py => disabled_test_grpo_e2e.py} (99%) diff --git a/tests/integration_tests/test_grpo_e2e.py b/tests/integration_tests/disabled_test_grpo_e2e.py similarity index 99% rename from tests/integration_tests/test_grpo_e2e.py rename to tests/integration_tests/disabled_test_grpo_e2e.py index 5f564ed36..6e35e21ce 100644 --- a/tests/integration_tests/test_grpo_e2e.py +++ b/tests/integration_tests/disabled_test_grpo_e2e.py @@ -8,6 +8,7 @@ Run this with: PYTHONPATH=. pytest -s tests/integration_tests/test_grpo_e2e.py::test_grpo_smoke_test + """ import logging From e73907523195e2f45b33edd5a5c0d7167112a372 Mon Sep 17 00:00:00 2001 From: Allen Wang Date: Fri, 7 Nov 2025 11:39:23 -0800 Subject: [PATCH 9/9] re-enable --- tests/integration_tests/test_grpo_e2e.py | 124 +++++++++++++++++++++++ 1 file changed, 124 insertions(+) create mode 100644 tests/integration_tests/test_grpo_e2e.py diff --git a/tests/integration_tests/test_grpo_e2e.py b/tests/integration_tests/test_grpo_e2e.py new file mode 100644 index 000000000..6e35e21ce --- /dev/null +++ b/tests/integration_tests/test_grpo_e2e.py @@ -0,0 +1,124 @@ +# Copyright (c) Meta Platforms, Inc. and affiliates. +# All rights reserved. +# +# This source code is licensed under the BSD-style license found in the +# LICENSE file in the root directory of this source tree. +"""A simple smoke test that runs the GRPO loop for 3 steps. + +Run this with: +PYTHONPATH=. pytest -s tests/integration_tests/test_grpo_e2e.py::test_grpo_smoke_test + + +""" + +import logging +import shutil +from pathlib import Path + +import monarch +import monarch.actor + +import pytest +import torch + +from forge.util.config import resolve_hf_hub_paths +from omegaconf import DictConfig, OmegaConf + +# Temporary workaround - without this, proc_mesh.stop +# will raise an exit code 1 failing all other tests. +monarch.actor.unhandled_fault_hook = lambda failure: None + + +logger: logging.Logger = logging.getLogger(__name__) +logger.setLevel(logging.INFO) + +requires_cuda = pytest.mark.skipif( + not torch.cuda.is_available(), + reason="CUDA not available", +) + +TEST_CHECKPOINT_DIR = "/tmp/grpo_test_checkpoint" + + +def _load_config(config_path: str) -> DictConfig: + """Load and resolve config from YAML file.""" + cfg = None + try: + cfg = OmegaConf.load(config_path) + except Exception as e: + pytest.fail(f"Failed to load config file {config_path}: {e}") + + assert isinstance(cfg, DictConfig) + cfg = resolve_hf_hub_paths(cfg) + return cfg + + +def _cleanup_checkpoint_dir(): + """Clean up test checkpoint directory.""" + path = Path(TEST_CHECKPOINT_DIR) + if path.exists() and path.is_dir(): + try: + shutil.rmtree(path) + logger.info(f"Successfully removed {TEST_CHECKPOINT_DIR}") + except Exception as e: + logger.error(f"Failed to remove {TEST_CHECKPOINT_DIR}: {e}") + + +class TestGRPOEndToEnd: + """End-to-end integration tests for GRPO training loop.""" + + @pytest.mark.asyncio + @pytest.mark.timeout(600) # 10 minute timeout to prevent hanging + @requires_cuda + async def test_grpo_smoke_test(self): + """ + Smoke test for GRPO training loop. + + This test runs the full GRPO pipeline for 3 training steps to verify: + - All actors and services initialize correctly + - Rollout loop generates completions + - Rewards are evaluated + - Reference model computes logprobs + - Replay buffer collects and batches experiences + - Trainer updates weights + - Policy receives weight updates + - Training completes successfully + """ + logger.info("=" * 80) + logger.info("Starting GRPO smoke test") + logger.info("=" * 80) + + try: + # Load test config + config_path = "tests/integration_tests/fixtures/grpo_smoke_test.yaml" + cfg = _load_config(config_path) + + logger.info("Starting GRPO smoke test with config:") + logger.info(f" Model: {cfg.model}") + logger.info(f" Group size: {cfg.group_size}") + logger.info(f" Batch size: {cfg.local_batch_size}") + logger.info(f" Training steps: {cfg.trainer.training.steps}") + logger.info( + f" Max req/res tokens: {cfg.max_req_tokens}/{cfg.max_res_tokens}" + ) + + # Import main here to avoid issues with module-level imports + from apps.grpo.main import main + + logger.info("Starting main training loop...") + # Run the main training loop + # This should run for exactly 3 steps and then exit cleanly + await main(cfg) + + logger.info("Main training loop completed successfully") + logger.info("GRPO smoke test completed successfully!") + + except Exception as e: + logger.error(f"GRPO smoke test failed with error: {e}") + raise + finally: + # Cleanup + logger.info("Cleaning up test checkpoint directory...") + _cleanup_checkpoint_dir() + logger.info("Cleanup complete") + logger.info("=" * 80)