
Commit dffc213

Add megatron_ray_fault_tolerant example with comprehensive fault tolerance
- Implements PPO-style training with Megatron and Ray
- Features automatic actor recovery from failures
- Includes backup actor pool for seamless replacement
- Supports DP, TP, PP, and CP parallelism
- Distributed checkpoint saving/loading
- Process group re-initialization after failures
- Added comprehensive documentation in README files
1 parent 812c876 commit dffc213

18 files changed, +4074 −0 lines

README.md

Lines changed: 102 additions & 0 deletions
@@ -0,0 +1,102 @@
# Examples

This repository contains examples for deploying and running distributed applications.

## Job Examples

### 1. Hello World Job
**Directory:** `01_job_hello_world/`

A simple "Hello World" example demonstrating how to submit and run basic jobs.

### 2. Image Processing
**Directory:** `image_processing/`

Process large-scale image datasets using Ray Data. This example demonstrates processing the ReLAION-2B dataset with over 2 billion rows.

### 3. Megatron + Ray Fault Tolerant Training
**Directory:** `megatron_ray_fault_tolerant/`

Implements PPO-style distributed training with Megatron and Ray, featuring comprehensive fault tolerance:
- Automatic actor recovery from failures
- Backup actor groups for seamless replacement
- Distributed checkpoint saving/loading
- Process group re-initialization after failures
- Support for tensor, pipeline, data, and context parallelism

## Service Examples

### 1. Hello World Service
**Directory:** `02_service_hello_world/`

A simple service deployment example demonstrating the basics of Ray Serve.

### 2. Deploy Llama 3.1 8B
**Directory:** `03_deploy_llama_3_8b/`

Deploy the Llama 3.1 8B model using Ray Serve and vLLM with autoscaling.

### 3. Deploy Llama 3.1 70B
**Directory:** `deploy_llama_3_1_70b/`

Deploy the larger Llama 3.1 70B model with an optimized serving configuration.

### 4. Tensor Parallel Serving
**Directory:** `serve_tensor_parallel/`

Demonstrates tensor parallelism for serving large language models across multiple GPUs.

### 5. FastVideo Generation
**Directory:** `video_generation_with_fastvideo/`

Deploy a video generation service using the FastVideo framework.

## Reinforcement Learning Examples

### SkyRL
**Directory:** `skyrl/`

Reinforcement learning training example using Ray and distributed computing.

## Getting Started

Most examples include their own README with specific instructions. In general, you'll need to:

1. Install the Anyscale CLI:
```bash
pip install -U anyscale
anyscale login
```

2. Navigate to the example directory:
```bash
cd <example_directory>
```

3. Deploy the service or submit the job:
```bash
# For services
anyscale service deploy -f service.yaml

# For jobs
anyscale job submit -f job.yaml
```

## Requirements

- Anyscale account and CLI access
- Appropriate cloud credentials configured
- GPU resources for the ML/LLM examples

## Contributing

When adding a new example:
1. Create a descriptive directory name
2. Include a README.md with setup and usage instructions
3. Add appropriate YAML configuration files
4. Update this main README with your example

## License

See individual example directories for specific licensing information.
Lines changed: 1 addition & 0 deletions
@@ -0,0 +1 @@
__pycache__/
Lines changed: 20 additions & 0 deletions
@@ -0,0 +1,20 @@
repos:
  - repo: https://github.com/astral-sh/ruff-pre-commit
    rev: v0.11.9
    hooks:
      - id: ruff
        args: [ --fix, --exit-non-zero-on-fix ]
        exclude: (^(skyagent)/.*)$

  # Black needs to be run after ruff with --fix
  - repo: https://github.com/psf/black
    rev: 24.10.0
    hooks:
      - id: black
        exclude: (^(skyagent)/.*)$

  # Detect secrets and sensitive information
  - repo: https://github.com/gitleaks/gitleaks
    rev: v8.24.2
    hooks:
      - id: gitleaks
Lines changed: 34 additions & 0 deletions
@@ -0,0 +1,34 @@
FROM anyscale/ray:2.51.0-slim-py312-cu128

RUN sudo apt-get update -y && sudo apt-get install -y wget kmod libxml2 build-essential libnuma-dev

# The CUDA compiler here is needed for DeepSpeed
RUN wget https://developer.download.nvidia.com/compute/cuda/12.8.0/local_installers/cuda_12.8.0_570.86.10_linux.run \
    && sudo sh cuda_12.8.0_570.86.10_linux.run --silent --toolkit && rm -rf cuda_12.8.0_570.86.10_linux.run

RUN curl -LsSf https://astral.sh/uv/0.9.4/install.sh | sh
RUN echo "export RAY_RUNTIME_ENV_HOOK=ray._private.runtime_env.uv_runtime_env_hook.hook" >> /home/ray/.bashrc

RUN sudo apt-get update \
    && sudo apt-get install -y openssh-server iputils-ping net-tools iproute2 traceroute netcat \
    libopenexr-dev libxi-dev libglfw3-dev libglew-dev libomp-dev libxinerama-dev libxcursor-dev tzdata \
    && sudo apt-get clean && sudo rm -rf /var/lib/apt/lists/*

RUN sudo apt update && sudo apt install --fix-broken && sudo apt install -y default-jre-headless openjdk-8-jdk \
    && sudo apt-get clean \
    && sudo rm -rf /var/lib/apt/lists/*

# ---------- PyTorch + cuDNN + Transformer Engine ----------
RUN pip install --no-cache-dir "torch==2.7.1" "nvidia-cudnn-cu12>=9.3" && \
    CUDNN_PATH="$(python -c 'import inspect, nvidia.cudnn as c, os; print(os.path.dirname(inspect.getfile(c)))')" && \
    sudo mkdir -p /opt && sudo ln -sfn "$CUDNN_PATH" /opt/cudnn && \
    echo "/opt/cudnn/lib" | sudo tee /etc/ld.so.conf.d/cudnn.conf >/dev/null && sudo ldconfig

ENV CUDNN_PATH=/opt/cudnn
ENV CPATH=${CUDNN_PATH}/include:${CPATH}
ENV LD_LIBRARY_PATH=${CUDNN_PATH}/lib:${LD_LIBRARY_PATH}

RUN pip install --no-cache-dir --no-build-isolation "transformer_engine[pytorch]==2.5.0"
# --------------------
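As a quick sanity check of the built image (not part of the Dockerfile itself), the stack it is meant to provide — CUDA-enabled PyTorch, cuDNN on the library path, and Transformer Engine — can be probed with a few imports:

```python
# Run inside the built container to confirm the ML stack resolves correctly.
import torch
import transformer_engine.pytorch as te  # noqa: F401  (import succeeds only if TE is installed)

print("torch:", torch.__version__)
print("CUDA available:", torch.cuda.is_available())
print("cuDNN version:", torch.backends.cudnn.version())
```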
Lines changed: 191 additions & 0 deletions
@@ -0,0 +1,191 @@
# Megatron + Ray Fault Tolerant Training

This example implements PPO-style distributed training using Megatron and Ray with comprehensive fault tolerance. The system automatically recovers from actor failures during training by using backup actors and re-initializing process groups.

## Key Features

### Fault Tolerance Mechanisms

1. **Actor Health Monitoring**: Continuously monitors the health of distributed training actors
2. **Backup Actor Pool**: Pre-allocated backup actors ready to replace failed workers
3. **Automatic Recovery**: Seamlessly recovers from failures by:
   - Detecting dead actors
   - Destroying old process groups
   - Replacing failed actors with backup actors
   - Re-initializing process groups with the new world size
   - Reloading model and optimizer state from checkpoints
4. **Distributed Checkpointing**: Implements efficient sharded checkpoint saving/loading using Megatron's distributed checkpointing
5. **Process Group Management**: Handles NCCL process group initialization, destruction, and re-initialization

### Parallelism Support

- **Data Parallelism (DP)**: Distributes training data across multiple GPUs
- **Tensor Parallelism (TP)**: Splits model tensors across GPUs
- **Pipeline Parallelism (PP)**: Distributes model layers across GPUs
- **Context Parallelism (CP)**: Enables sequence parallelism for long contexts
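The four dimensions above multiply together: for a fixed GPU count, the data-parallel size is whatever remains after tensor, pipeline, and context parallelism are assigned. A minimal sketch of that arithmetic (illustrative names, not code from this example):

```python
def data_parallel_size(world_size: int, tp: int = 1, pp: int = 1, cp: int = 1) -> int:
    """Derive the data-parallel degree from the total GPU count.

    Megatron requires world_size to be divisible by tp * pp * cp;
    the remaining factor becomes the data-parallel size.
    """
    model_parallel = tp * pp * cp
    if world_size % model_parallel != 0:
        raise ValueError(
            f"world_size={world_size} is not divisible by tp*pp*cp={model_parallel}"
        )
    return world_size // model_parallel


# Example: 4 GPUs with TP=2, PP=1, CP=1 leaves DP=2.
assert data_parallel_size(world_size=4, tp=2) == 2
```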
### Advanced Training Features

- **PPO Training**: Implements Proximal Policy Optimization with micro-batch accumulation
- **Mixed Precision**: Supports BF16 training for improved performance
- **Gradient Accumulation**: Handles micro-batches with automatic gradient accumulation
- **Distributed Optimizer**: Uses Megatron's distributed optimizer for memory efficiency
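For reference, the quantity a PPO training step accumulates over micro-batches is the clipped surrogate loss. The snippet below is a generic, self-contained illustration of that loss, not the implementation in `megatron_actor.py`:

```python
import torch


def ppo_policy_loss(
    logprobs: torch.Tensor,      # log-probs under the current policy
    old_logprobs: torch.Tensor,  # log-probs under the behavior policy
    advantages: torch.Tensor,    # estimated advantages
    clip_eps: float = 0.2,
) -> torch.Tensor:
    """Standard clipped-surrogate PPO loss, averaged over the micro-batch."""
    ratio = torch.exp(logprobs - old_logprobs)
    unclipped = ratio * advantages
    clipped = torch.clamp(ratio, 1.0 - clip_eps, 1.0 + clip_eps) * advantages
    return -torch.min(unclipped, clipped).mean()
```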
## Architecture

### Core Components

1. **MegatronActor** (`megatron_actor.py`):
   - Individual training actor wrapping Megatron models
   - Handles model initialization, forward/backward passes, and checkpointing
   - Supports dynamic process group re-initialization

2. **MegatronActorGroup** (`megatron_actor.py`):
   - Manages a group of distributed actors
   - Implements the fault recovery logic
   - Coordinates distributed training operations

3. **Dispatch System** (`dispatch.py`):
   - **MeshDispatch**: Distributes data across the device mesh (DP, SP, TP, PP)
   - **PassThroughDispatch**: Broadcasts the same data/commands to all actors
   - Handles data sharding and result collection

4. **Training Batch** (`training_batch.py`):
   - Defines input/output batch structures for PPO training
   - Supports chunking and concatenation for distributed operations

5. **Checkpoint I/O** (`file_io.py`):
   - Cloud-aware file I/O supporting S3, GCS, and local storage
   - Efficient checkpoint upload/download with parallel transfers
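The two dispatch modes differ in how a batch reaches the actors: mesh dispatch shards the batch so each data-parallel group sees a distinct chunk, while pass-through dispatch sends the same payload to every actor. A simplified sketch of the idea, using illustrative names rather than the actual `dispatch.py` API:

```python
from typing import Any, Sequence


def mesh_dispatch(batch: Sequence[Any], dp_size: int) -> list[list[Any]]:
    """Shard a batch into dp_size contiguous chunks, one per data-parallel group."""
    chunk = (len(batch) + dp_size - 1) // dp_size
    return [list(batch[i * chunk:(i + 1) * chunk]) for i in range(dp_size)]


def pass_through_dispatch(payload: Any, num_actors: int) -> list[Any]:
    """Send the identical payload (e.g. a command) to every actor."""
    return [payload] * num_actors


# 8 samples over 4 data-parallel groups -> 2 samples each.
shards = mesh_dispatch(list(range(8)), dp_size=4)
assert [len(s) for s in shards] == [2, 2, 2, 2]
```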
## Getting Started

### Quick Start

```bash
uv run --isolated main.py
```

This will:
1. Create a placement group with worker and backup GPUs
2. Initialize the actor group and model
3. Run a training step
4. Save a checkpoint
5. Simulate a failure by killing actors (see the sketch below)
6. Recover from the failure using backup actors
7. Resume training after recovery
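Step 5 is what exercises the fault-tolerance path: the script deliberately kills Ray actors and then runs recovery. A self-contained toy version of that pattern using standard Ray APIs (`Worker` here is a stand-in, not this example's actor class):

```python
import ray


@ray.remote
class Worker:
    def ping(self) -> str:
        return "ok"


ray.init()
workers = [Worker.remote() for _ in range(4)]

# Simulate a failure: kill one actor and do not let Ray restart it,
# mirroring step 5 of the quick-start flow.
ray.kill(workers[0], no_restart=True)

# The killed actor raises RayActorError on any subsequent method call.
try:
    ray.get(workers[0].ping.remote())
except ray.exceptions.RayActorError:
    print("worker 0 is dead; a backup actor would be promoted here")
```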
### Configuration

Edit the `Config` class in `main.py` to customize:

```python
@dataclass
class Config:
    model: str = "Qwen/Qwen3-0.6B"  # HuggingFace model name
    num_nodes: int = 1
    num_gpus_per_node: int = 4
    num_spare_gpus: int = 4  # Backup actors for fault tolerance
    mini_batch_size: int = 16
    micro_train_batch_size_per_gpu: int = 2

    # Megatron parallelism settings
    megatron_config: MegatronConfig = field(default_factory=MegatronConfig)
```
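One relationship worth spelling out: with these defaults, each optimizer step accumulates gradients over several micro-batches per GPU. Assuming `mini_batch_size` is a global batch split across the data-parallel groups (an assumption about this example, not something stated in the config itself), the arithmetic looks like:

```python
mini_batch_size = 16                  # global batch per optimizer step (assumed)
micro_train_batch_size_per_gpu = 2
data_parallel_size = 4                # e.g. 4 GPUs with TP = PP = CP = 1

samples_per_gpu = mini_batch_size // data_parallel_size               # 4
grad_accum_steps = samples_per_gpu // micro_train_batch_size_per_gpu  # 2
print(grad_accum_steps)  # each GPU runs 2 micro-batches before the optimizer step
```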
### Megatron Parallelism Configuration

```python
@dataclass
class MegatronConfig:
    tensor_model_parallel_size: int = 1    # TP degree
    pipeline_model_parallel_size: int = 1  # PP degree
    context_parallel_size: int = 1         # CP degree
    expert_model_parallel_size: int = 1    # For MoE models
```
## Fault Recovery Workflow

1. **Training Phase**:
   - Actors perform distributed training using Megatron
   - Periodic checkpoints are saved to cloud storage

2. **Failure Detection**:
   - The system detects actor failures via health checks
   - Identifies the affected data parallel groups

3. **Recovery Process** (sketched below):
   - Destroy old process groups on healthy actors
   - Pop backup actors from the backup pool
   - Insert backup actors at the failed ranks
   - Update the world size and reassign ranks
   - Re-initialize process groups with the new configuration
   - Reload model/optimizer state from the checkpoint

4. **Resume Training**:
   - Continue training with the recovered actor group
   - No loss of training progress beyond the last checkpoint
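A pure-Python sketch of the rank-reassignment step in the recovery process above. The data structures are illustrative (the real logic lives in `MegatronActorGroup`), but it shows the bookkeeping: failed ranks are filled from the backup pool, and if the pool runs out the world shrinks and surviving actors are re-ranked contiguously.

```python
def rebuild_ranks(actors: dict[int, str], failed: set[int], backups: list[str]) -> dict[int, str]:
    """Return a new contiguous rank -> actor mapping after replacing failures.

    `actors` maps old rank -> actor id; `failed` is the set of dead ranks;
    `backups` is a pool of spare actor ids consumed as needed.
    """
    survivors = []
    for rank in sorted(actors):
        if rank in failed:
            if backups:
                survivors.append(backups.pop(0))  # promote a backup into this slot
            # else: drop the rank entirely -> smaller world size
        else:
            survivors.append(actors[rank])
    # Reassign contiguous ranks 0..new_world_size-1 for process group re-init.
    return {new_rank: actor for new_rank, actor in enumerate(survivors)}


old = {0: "actor-a", 1: "actor-b", 2: "actor-c", 3: "actor-d"}
new = rebuild_ranks(old, failed={2}, backups=["backup-x"])
assert new == {0: "actor-a", 1: "actor-b", 2: "backup-x", 3: "actor-d"}
```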
## Advanced Usage

### Custom Dispatch Types

Register custom dispatch strategies:

```python
from dispatch import register_dispatch_type, Dispatch


class CustomDispatch(Dispatch):
    # Implement the dispatch, collect, and validate methods
    pass


register_dispatch_type("custom", CustomDispatch)
```

### CPU Offloading (Experimental)

For faster recovery, offload model/optimizer state to CPU memory:

```python
# Before a failure
ray.get(actor_group.async_run_ray_method("pass_through", "offload_to_cpu"))

# After recovery, on healthy actors
ray.get(actor_group.async_run_ray_method("pass_through", "backload_to_gpu"))
```

## Dependencies

See `pyproject.toml` for the full dependency list. Key dependencies:
- Ray for distributed orchestration
- Megatron-Core for model parallelism
- PyTorch with CUDA support
- Transformers for model loading
- vLLM and related libraries

## Running on Anyscale

Submit the job using:

```bash
anyscale job submit -f job.yaml
```

The job configuration in `job.yaml` specifies:
- Container image with dependencies
- GPU instance types (g6e.12xlarge with 4x L4)
- Resource limits and scaling
- Environment variables for NCCL configuration

## Limitations and Future Work

- Virtual pipeline parallelism is not yet supported
- CPU offloading optimization is in progress
- Async checkpoint saving is planned for future releases

## References

- [Megatron-LM](https://github.com/NVIDIA/Megatron-LM)
- [Ray Documentation](https://docs.ray.io/)
- [Anyscale Platform](https://docs.anyscale.com/)
