{"id":58,"date":"2024-12-10T21:10:00","date_gmt":"2024-12-10T21:10:00","guid":{"rendered":"https:\/\/neuronix.us\/?p=58"},"modified":"2025-01-26T17:25:41","modified_gmt":"2025-01-26T17:25:41","slug":"distributed-training-implementation-in-pytorch-using-ddp-and-horovod","status":"publish","type":"post","link":"https:\/\/neuronix.us\/?p=58","title":{"rendered":"Distributed Training: Implementation in PyTorch Using DDP and Horovod"},"content":{"rendered":"\n<p class=\"wp-block-paragraph\">Distributed training allows deep learning models to scale across multiple GPUs or nodes, accelerating training and enabling the handling of larger models and datasets. Two common frameworks for distributed training in PyTorch are <strong>Distributed Data Parallel (DDP)<\/strong> and <strong>Horovod<\/strong>. Both offer tools for scaling training efficiently, but they operate differently.<\/p>\n\n\n\n<hr class=\"wp-block-separator has-alpha-channel-opacity\"\/>\n\n\n\n<h3 class=\"wp-block-heading\"><strong>1. 
Overview of DDP and Horovod<\/strong><\/h3>\n\n\n\n<figure class=\"wp-block-table\"><table class=\"has-fixed-layout\"><thead><tr><th><strong>Aspect<\/strong><\/th><th><strong>Distributed Data Parallel (DDP)<\/strong><\/th><th><strong>Horovod<\/strong><\/th><\/tr><\/thead><tbody><tr><td><strong>Primary Use Case<\/strong><\/td><td>Native PyTorch distributed training.<\/td><td>Multi-framework distributed training.<\/td><\/tr><tr><td><strong>Communication<\/strong><\/td><td>All-reduce of gradients over a <code>torch.distributed<\/code> backend: NCCL (NVIDIA Collective Communications Library) on GPUs, Gloo on CPUs.<\/td><td>All-reduce of gradients over MPI, Gloo, or NCCL.<\/td><\/tr><tr><td><strong>Ease of Use<\/strong><\/td><td>Requires setting up a process group through PyTorch\u2019s <code>torch.distributed<\/code> package.<\/td><td>Small API: initialize, broadcast the initial weights, wrap the optimizer.<\/td><\/tr><tr><td><strong>Scalability<\/strong><\/td><td>Scales across the GPUs of one node and across nodes.<\/td><td>Scales across nodes, launched through MPI or Gloo.<\/td><\/tr><tr><td><strong>Fault Tolerance<\/strong><\/td><td>None inside DDP itself; restarts and elasticity come from the <code>torchrun<\/code> launcher.<\/td><td>Elastic Horovod lets workers join or leave during training.<\/td><\/tr><\/tbody><\/table><\/figure>\n\n\n\n<hr class=\"wp-block-separator has-alpha-channel-opacity\"\/>\n\n\n\n<h3 class=\"wp-block-heading\"><strong>2. 
Implementation with Distributed Data Parallel (DDP)<\/strong><\/h3>\n\n\n\n<p class=\"wp-block-paragraph\"><strong>DDP<\/strong> is PyTorch\u2019s native approach to data-parallel training: each process holds a replica of the model, and gradients are averaged across processes during the backward pass, on one node or across several.<\/p>\n\n\n\n<h4 class=\"wp-block-heading\"><strong>Steps to Set Up DDP<\/strong><\/h4>\n\n\n\n<ol class=\"wp-block-list\">\n<li><strong>Set Up the Environment<\/strong>:<\/li>\n<\/ol>\n\n\n\n<ul class=\"wp-block-list\">\n<li>Launch one process per GPU, either with <code>torchrun<\/code> (the older <code>torch.distributed.launch<\/code> is deprecated) or with <code>torch.multiprocessing.spawn<\/code>, as in the example below.<\/li>\n\n\n\n<li>Specify the backend (e.g., <code>nccl<\/code> for GPUs).<\/li>\n<\/ul>\n\n\n\n<ol start=\"2\" class=\"wp-block-list\">\n<li><strong>Initialize the Distributed Process Group<\/strong>:<\/li>\n<\/ol>\n\n\n\n<ul class=\"wp-block-list\">\n<li>Each process needs to know its rank and the world size (the total number of processes, typically one per GPU across all nodes).<\/li>\n<\/ul>\n\n\n\n<ol start=\"3\" class=\"wp-block-list\">\n<li><strong>Wrap the Model with DDP<\/strong>:<\/li>\n<\/ol>\n\n\n\n<ul class=\"wp-block-list\">\n<li>Use <code>torch.nn.parallel.DistributedDataParallel<\/code>.<\/li>\n<\/ul>\n\n\n\n<ol start=\"4\" class=\"wp-block-list\">\n<li><strong>Split Data Across GPUs<\/strong>:<\/li>\n<\/ol>\n\n\n\n<ul class=\"wp-block-list\">\n<li>Use <code>torch.utils.data.DistributedSampler<\/code> so that each process trains on a distinct shard of the dataset.<\/li>\n<\/ul>\n\n\n\n<hr class=\"wp-block-separator has-alpha-channel-opacity\"\/>\n\n\n\n<h4 class=\"wp-block-heading\"><strong>Code Example for DDP<\/strong><\/h4>\n\n\n\n<pre class=\"wp-block-code\"><code>import os\nimport torch\nimport torch.distributed as dist\nimport torch.nn as nn\nimport torch.optim as optim\nfrom torch.nn.parallel import DistributedDataParallel as DDP\nfrom torch.utils.data import DataLoader, DistributedSampler, TensorDataset\n\ndef setup(rank, world_size):\n    # init_method='env:\/\/' reads the rendezvous address from these variables.\n    # torchrun sets them, but mp.spawn does not, so provide single-node defaults.\n    os.environ.setdefault('MASTER_ADDR', 'localhost')\n    os.environ.setdefault('MASTER_PORT', '29500')\n\n    # Initialize the process group\n    dist.init_process_group(\n        backend='nccl',\n        init_method='env:\/\/',\n        rank=rank,\n   
     world_size=world_size\n    )\n    # Bind this process to its own GPU\n    torch.cuda.set_device(rank)\n\ndef cleanup():\n    dist.destroy_process_group()\n\ndef train(rank, world_size):\n    setup(rank, world_size)\n\n    # Create a simple dataset; seed first so every rank builds the same data\n    # and the sampler can hand each rank a distinct shard of it\n    torch.manual_seed(0)\n    dataset = TensorDataset(torch.randn(1000, 10), torch.randint(0, 2, (1000,)))\n    sampler = DistributedSampler(dataset, num_replicas=world_size, rank=rank)\n    dataloader = DataLoader(dataset, sampler=sampler, batch_size=32)\n\n    # Create a simple model\n    model = nn.Sequential(\n        nn.Linear(10, 32),\n        nn.ReLU(),\n        nn.Linear(32, 1),\n        nn.Sigmoid()\n    ).cuda(rank)\n\n    # Wrap the model with DDP\n    model = DDP(model, device_ids=&#91;rank])\n\n    # Define loss and optimizer\n    criterion = nn.BCELoss()\n    optimizer = optim.SGD(model.parameters(), lr=0.01)\n\n    # Training loop\n    for epoch in range(5):\n        # Reshuffle the shards so each epoch sees a different ordering\n        sampler.set_epoch(epoch)\n        for inputs, labels in dataloader:\n            inputs, labels = inputs.cuda(rank), labels.float().cuda(rank)\n            optimizer.zero_grad()\n            outputs = model(inputs)\n            # squeeze(1) drops only the feature dimension, so a batch of size 1 keeps its shape\n            loss = criterion(outputs.squeeze(1), labels)\n            loss.backward()\n            optimizer.step()\n\n        # Reports the loss of the last batch in the epoch\n        print(f\"Rank {rank}, Epoch {epoch}, Loss: {loss.item()}\")\n\n    cleanup()\n\nif __name__ == \"__main__\":\n    # One process per visible GPU; spawn passes the rank as the first argument\n    world_size = torch.cuda.device_count()\n    torch.multiprocessing.spawn(train, args=(world_size,), nprocs=world_size, join=True)<\/code><\/pre>\n\n\n\n<p class=\"wp-block-paragraph\"><strong>Key Points<\/strong>:<\/p>\n\n\n\n<ul class=\"wp-block-list\">\n<li><code>torch.distributed.init_process_group<\/code>: Joins the process to the group and selects the communication backend.<\/li>\n\n\n\n<li><code>DistributedDataParallel<\/code>: Averages gradients across processes during <code>backward()<\/code>, so every replica applies the same update.<\/li>\n\n\n\n<li><code>DistributedSampler<\/code>: Gives each process a distinct, equally sized shard of the dataset; <code>set_epoch<\/code> changes the shuffle order each epoch.<\/li>\n<\/ul>\n\n\n\n<hr class=\"wp-block-separator has-alpha-channel-opacity\"\/>\n\n\n\n<h3 class=\"wp-block-heading\"><strong>3. 
Implementation with Horovod<\/strong><\/h3>\n\n\n\n<p class=\"wp-block-paragraph\"><strong>Horovod<\/strong> is an open-source framework for distributed training that works across multiple deep learning frameworks (PyTorch, TensorFlow, etc.) and averages gradients across workers with all-reduce. Scripts are launched with <code>horovodrun<\/code>, for example <code>horovodrun -np 4 python train.py<\/code>, where <code>train.py<\/code> stands for your own script.<\/p>\n\n\n\n<h4 class=\"wp-block-heading\"><strong>Steps to Set Up Horovod<\/strong><\/h4>\n\n\n\n<ol class=\"wp-block-list\">\n<li><strong>Install Horovod<\/strong>:<\/li>\n<\/ol>\n\n\n\n<pre class=\"wp-block-code\"><code>pip install horovod&#91;pytorch]<\/code><\/pre>\n\n\n\n<ol start=\"2\" class=\"wp-block-list\">\n<li><strong>Initialize Horovod<\/strong>:<\/li>\n<\/ol>\n\n\n\n<ul class=\"wp-block-list\">\n<li>Use <code>hvd.init()<\/code> to initialize the environment.<\/li>\n<\/ul>\n\n\n\n<ol start=\"3\" class=\"wp-block-list\">\n<li><strong>Broadcast the Model<\/strong>:<\/li>\n<\/ol>\n\n\n\n<ul class=\"wp-block-list\">\n<li>Ensure all workers start with the same model weights using <code>hvd.broadcast_parameters<\/code>.<\/li>\n<\/ul>\n\n\n\n<ol start=\"4\" class=\"wp-block-list\">\n<li><strong>Wrap the Optimizer<\/strong>:<\/li>\n<\/ol>\n\n\n\n<ul class=\"wp-block-list\">\n<li>Use <code>hvd.DistributedOptimizer<\/code> to synchronize gradients across workers.<\/li>\n<\/ul>\n\n\n\n<hr class=\"wp-block-separator has-alpha-channel-opacity\"\/>\n\n\n\n<h4 class=\"wp-block-heading\"><strong>Code Example for Horovod<\/strong><\/h4>\n\n\n\n<pre class=\"wp-block-code\"><code>import torch\nimport torch.nn as nn\nimport torch.optim as optim\nimport horovod.torch as hvd\nfrom torch.utils.data import DataLoader, DistributedSampler, TensorDataset\n\n# Initialize Horovod and bind this process to its local GPU\nhvd.init()\ntorch.cuda.set_device(hvd.local_rank())\n\n# Create a simple dataset; seed first so every worker builds the same data\ntorch.manual_seed(0)\ndataset = TensorDataset(torch.randn(1000, 10), torch.randint(0, 2, (1000,)))\n\n# Give each worker its own shard; without a sampler every worker\n# would iterate over the full dataset\nsampler = DistributedSampler(dataset, num_replicas=hvd.size(), rank=hvd.rank())\ndataloader = DataLoader(dataset, batch_size=32, sampler=sampler)\n\n# Create a simple model\nmodel = nn.Sequential(\n    nn.Linear(10, 32),\n    nn.ReLU(),\n    nn.Linear(32, 
1),\n    nn.Sigmoid()\n).cuda()\n\n# Broadcast parameters from rank 0 to ensure all workers start with the same model weights\nhvd.broadcast_parameters(model.state_dict(), root_rank=0)\n\n# Create the optimizer and broadcast its state from rank 0 as well\noptimizer = optim.SGD(model.parameters(), lr=0.01)\nhvd.broadcast_optimizer_state(optimizer, root_rank=0)\n\n# Wrap optimizer with Horovod so gradients are averaged across workers\noptimizer = hvd.DistributedOptimizer(optimizer, named_parameters=model.named_parameters())\n\n# Define loss function\ncriterion = nn.BCELoss()\n\n# Training loop\nfor epoch in range(5):\n    # Reshuffle the shards so each epoch sees a different ordering\n    sampler.set_epoch(epoch)\n    for inputs, labels in dataloader:\n        inputs, labels = inputs.cuda(), labels.float().cuda()\n        optimizer.zero_grad()\n        outputs = model(inputs)\n        # squeeze(1) drops only the feature dimension, so a batch of size 1 keeps its shape\n        loss = criterion(outputs.squeeze(1), labels)\n        loss.backward()\n        optimizer.step()\n\n    # Reports the loss of the last batch in the epoch\n    print(f\"Rank {hvd.rank()}, Epoch {epoch}, Loss: {loss.item()}\")<\/code><\/pre>\n\n\n\n<p class=\"wp-block-paragraph\"><strong>Key Points<\/strong>:<\/p>\n\n\n\n<ul class=\"wp-block-list\">\n<li><code>hvd.init()<\/code>: Initializes Horovod for distributed training.<\/li>\n\n\n\n<li><code>hvd.DistributedOptimizer<\/code>: Averages gradients across workers before each optimizer step.<\/li>\n\n\n\n<li><code>hvd.broadcast_parameters<\/code>: Ensures all workers start with the same initial weights.<\/li>\n\n\n\n<li><code>DistributedSampler<\/code>: Shards the dataset by <code>hvd.rank()<\/code> and <code>hvd.size()<\/code> so workers do not repeat each other\u2019s samples.<\/li>\n<\/ul>\n\n\n\n<hr class=\"wp-block-separator has-alpha-channel-opacity\"\/>\n\n\n\n<h3 class=\"wp-block-heading\"><strong>4. 
Comparison of DDP and Horovod<\/strong><\/h3>\n\n\n\n<figure class=\"wp-block-table\"><table class=\"has-fixed-layout\"><thead><tr><th><strong>Aspect<\/strong><\/th><th><strong>DDP<\/strong><\/th><th><strong>Horovod<\/strong><\/th><\/tr><\/thead><tbody><tr><td><strong>Ease of Use<\/strong><\/td><td>Built into PyTorch, but requires setting up a process group.<\/td><td>Separate install; few changes to a single-GPU script.<\/td><\/tr><tr><td><strong>Framework Support<\/strong><\/td><td>PyTorch only.<\/td><td>PyTorch, TensorFlow, MXNet, etc.<\/td><\/tr><tr><td><strong>Communication<\/strong><\/td><td>Gradient all-reduce, typically over NCCL for GPU-to-GPU communication.<\/td><td>Gradient all-reduce over NCCL, MPI, or Gloo.<\/td><\/tr><tr><td><strong>Scalability<\/strong><\/td><td>High; overlaps gradient communication with the backward pass.<\/td><td>High; batches small tensors into fused all-reduce calls (Tensor Fusion).<\/td><\/tr><\/tbody><\/table><\/figure>\n\n\n\n<hr class=\"wp-block-separator has-alpha-channel-opacity\"\/>\n\n\n\n<h3 class=\"wp-block-heading\"><strong>5. Best Practices for Distributed Training<\/strong><\/h3>\n\n\n\n<ol class=\"wp-block-list\">\n<li><strong>Efficient Communication<\/strong>:<\/li>\n<\/ol>\n\n\n\n<ul class=\"wp-block-list\">\n<li>Use NCCL for GPU communication.<\/li>\n\n\n\n<li>Use larger per-GPU batches or gradient accumulation so that each gradient exchange is amortized over more computation.<\/li>\n<\/ul>\n\n\n\n<ol start=\"2\" class=\"wp-block-list\">\n<li><strong>Resource Allocation<\/strong>:<\/li>\n<\/ol>\n\n\n\n<ul class=\"wp-block-list\">\n<li>Ensure all workers have balanced workloads using distributed samplers.<\/li>\n<\/ul>\n\n\n\n<ol start=\"3\" class=\"wp-block-list\">\n<li><strong>Fault Tolerance<\/strong>:<\/li>\n<\/ol>\n\n\n\n<ul class=\"wp-block-list\">\n<li>For multi-node training, plan for node failures: Horovod\u2019s elastic mode can continue when workers are added or lost, while DDP jobs rely on <code>torchrun<\/code> to restart workers. Checkpoint regularly in either case.<\/li>\n<\/ul>\n\n\n\n<ol start=\"4\" class=\"wp-block-list\">\n<li><strong>Monitoring<\/strong>:<\/li>\n<\/ol>\n\n\n\n<ul class=\"wp-block-list\">\n<li>Use tools like NVIDIA\u2019s <strong>Nsight Systems<\/strong> or PyTorch\u2019s <strong>Profiler<\/strong> to monitor training 
performance.<\/li>\n<\/ul>\n\n\n\n<hr class=\"wp-block-separator has-alpha-channel-opacity\"\/>\n\n\n\n<h3 class=\"wp-block-heading\"><strong>Conclusion<\/strong><\/h3>\n\n\n\n<ul class=\"wp-block-list\">\n<li>Use <strong>DDP<\/strong> for PyTorch-only workloads, on a single node or across nodes, when you want to stay within PyTorch\u2019s native tooling.<\/li>\n\n\n\n<li>Use <strong>Horovod<\/strong> when one distributed stack must serve several frameworks, or when you want its elastic training mode for multi-node jobs.<\/li>\n<\/ul>\n","protected":false},"excerpt":{"rendered":"<p>Distributed training allows deep learning models to scale across multiple GPUs or nodes, accelerating training and enabling the handling of larger models and datasets. Two common frameworks for distributed training in PyTorch are Distributed Data Parallel (DDP) and Horovod. Both offer tools for scaling training efficiently, but they operate differently. 1. Overview of DDP and [&hellip;]<\/p>\n","protected":false},"author":2,"featured_media":132,"comment_status":"open","ping_status":"open","sticky":false,"template":"","format":"standard","meta":{"_event_date":"","_event_time":"","_event_location":"","_event_registration_url":"","footnotes":""},"categories":[1],"tags":[],"class_list":["post-58","post","type-post","status-publish","format-standard","has-post-thumbnail","hentry","category-uncategorized"],"_links":{"self":[{"href":"https:\/\/neuronix.us\/index.php?rest_route=\/wp\/v2\/posts\/58","targetHints":{"allow":["GET"]}}],"collection":[{"href":"https:\/\/neuronix.us\/index.php?rest_route=\/wp\/v2\/posts"}],"about":[{"href":"https:\/\/neuronix.us\/index.php?rest_route=\/wp\/v2\/types\/post"}],"author":[{"embeddable":true,"href":"https:\/\/neuronix.us\/index.php?rest_route=\/wp\/v2\/users\/2"}],"replies":[{"embeddable":true,"href":"https:\/\/neuronix.us\/index.php?rest_route=%2Fwp%2Fv2%2Fcomments&post=58"}],"version-history":[{"count":1,"href":"https:\/\/neuronix.us\/index.php?rest_route=\/wp\/
v2\/posts\/58\/revisions"}],"predecessor-version":[{"id":59,"href":"https:\/\/neuronix.us\/index.php?rest_route=\/wp\/v2\/posts\/58\/revisions\/59"}],"wp:featuredmedia":[{"embeddable":true,"href":"https:\/\/neuronix.us\/index.php?rest_route=\/wp\/v2\/media\/132"}],"wp:attachment":[{"href":"https:\/\/neuronix.us\/index.php?rest_route=%2Fwp%2Fv2%2Fmedia&parent=58"}],"wp:term":[{"taxonomy":"category","embeddable":true,"href":"https:\/\/neuronix.us\/index.php?rest_route=%2Fwp%2Fv2%2Fcategories&post=58"},{"taxonomy":"post_tag","embeddable":true,"href":"https:\/\/neuronix.us\/index.php?rest_route=%2Fwp%2Fv2%2Ftags&post=58"}],"curies":[{"name":"wp","href":"https:\/\/api.w.org\/{rel}","templated":true}]}}