To use AIACC-Training to accelerate training, you need to make small modifications to the model code. This topic describes how to adapt models of multiple frameworks for AIACC-Training.

Prerequisites

AIACC-Training is installed. For more information, see Automatically install AIACC-Training and Manually install AIACC-Training.

Background information

AIACC-Training performs synchronous communication. It supports both data parallelism and model parallelism, with data parallelism as the primary training method. To implement model parallelism, the model must be adapted by using the communication interface provided by AIACC-Training. Existing use cases include model parallelism for InsightFace and the combined automatic model parallelism and data parallelism for Megatron-LM.

When you use AIACC-Training to accelerate training, the minimum unit of work is a single GPU. Each GPU is assigned an independent Message Passing Interface (MPI) process. In this scheduling mode, a single machine with multiple GPUs and multiple machines with multiple GPUs deliver the same level of per-GPU performance. All GPUs are ranked equally, and no central node or central GPU exists.
Note When multiple GPUs share one MPI process, training performance is poor. To improve training performance, the scheduling mode assigns each MPI process to exactly one GPU.

AIACC-Training is compatible with most major APIs. For example, AIACC-Training is compatible with Horovod API operations on TensorFlow and PyTorch, and with both Horovod and KVStore API operations on MXNet. This simplifies the use of AIACC-Training.

The following figure shows the basic framework for implementing data parallel acceleration with AIACC-Training.
The procedure to implement data parallel acceleration consists of the following steps:
  1. Divide data.
  2. Set the device ID of the GPU based on local rank.
  3. Run the code on a single GPU to complete forward and backward computing.
  4. Before the gradients are used to update the model, AIACC-Training intervenes to perform fused gradient communication across GPUs.
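The following minimal sketch illustrates these steps with the Perseus Horovod module for PyTorch that is described later in this topic. The model and dataset are hypothetical placeholders; the framework-specific sections below explain each step in detail.
    import torch
    import torch.utils.data.distributed
    import perseus.torch.horovod as hvd

    hvd.init()                                   # one MPI process is started per GPU
    torch.cuda.set_device(hvd.local_rank())      # step 2: bind this process to its GPU

    model = MyModel().cuda()                     # hypothetical model
    train_dataset = MyDataset()                  # hypothetical dataset
    sampler = torch.utils.data.distributed.DistributedSampler(
        train_dataset, num_replicas=hvd.size(), rank=hvd.rank())   # step 1: divide data
    loader = torch.utils.data.DataLoader(train_dataset, batch_size=64, sampler=sampler)

    optimizer = torch.optim.SGD(model.parameters(), lr=0.01 * hvd.size())
    optimizer = hvd.DistributedOptimizer(        # step 4: fused gradient communication
        optimizer, named_parameters=model.named_parameters())
    hvd.broadcast_parameters(model.state_dict(), root_rank=0)

    for data, target in loader:                  # step 3: single-GPU forward and backward
        optimizer.zero_grad()
        loss = torch.nn.functional.cross_entropy(model(data.cuda()), target.cuda())
        loss.backward()
        optimizer.step()                         # gradients are fused and reduced before the update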

Adapt TensorFlow models for AIACC-Training

AIACC-Training is compatible with Horovod API operations for TensorFlow. The following process describes how to adapt a TensorFlow model:
  1. Import the Perseus Horovod module.
    Sample code:
    import perseus.tensorflow.horovod as hvd
  2. Initialize the Perseus Horovod module.
    Note Initialize the module at the beginning of the main function, before you call other API operations.
    Sample code:
    hvd.init()
  3. In most cases, you must increase the learning rate based on the total number of workers: multiply the current learning rate by hvd.size(). For example, if you have two nodes and each node is equipped with eight GPUs, the total number of workers is 16.
    Note You do not need to increase the learning rate of some models such as Bidirectional Encoder Representations from Transformers (BERT).
    Sample code:
    # Horovod: scale learning rate by the number of workers.
    optimizer = tf.train.MomentumOptimizer(
        learning_rate=0.001 * hvd.size(), momentum=0.9)
  4. Use hvd.DistributedOptimizer() to overload the optimizer.
    This method takes a standard TensorFlow optimizer as its input and returns the optimizer overloaded by Perseus Horovod. Sample code:
    # Horovod: add Horovod Distributed Optimizer.
    optimizer = hvd.DistributedOptimizer(optimizer)
  5. Add BroadcastGlobalVariablesHook(0) to the session hooks to broadcast the global variable parameters to all nodes at the beginning of training.
    Before you call train, define the broadcast hook. When you call train, add the hook and divide the number of steps by the total number of workers, which is returned by hvd.size(). Sample code:
        # Horovod: BroadcastGlobalVariablesHook broadcasts initial variable states from
        # rank 0 to all other processes. This is necessary to ensure consistent
        # initialization of all workers when training is started with random weights or
        # restored from a checkpoint.
        bcast_hook = hvd.BroadcastGlobalVariablesHook(0)
    
        # Horovod: adjust number of steps based on number of GPUs.
        mnist_classifier.train(
            input_fn=train_input_fn,
            steps=20000 // hvd.size(),
            hooks=[logging_hook, bcast_hook])
  6. Bind the current process to the corresponding GPU.

    Use local_rank() to obtain the local rank of the current process on the node. For example, on a node that is equipped with eight GPUs, local_rank() returns a value from 0 to 7 for the eight processes. You can bind each process to the corresponding GPU based on the returned value.

    Sample code:
    config = tf.ConfigProto()
    config.gpu_options.allow_growth = True
    config.gpu_options.visible_device_list = str(hvd.local_rank())
  7. Save checkpoints only on the root rank and set the checkpoint directory to None on all other ranks. This ensures that the workers do not overwrite each other's checkpoints and no conflicts occur.
    Sample code:
    checkpoint_dir = './checkpoints' if hvd.rank() == 0 else None
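Putting these steps together, the following minimal sketch shows an adapted Estimator-based training script. The model_fn and train_input_fn functions are hypothetical placeholders, and the learning rate scaling and hvd.DistributedOptimizer() wrapping from Steps 3 and 4 are assumed to be applied inside model_fn.
    import tensorflow as tf
    import perseus.tensorflow.horovod as hvd

    def main(_):
        hvd.init()                                            # Step 2: initialize Perseus Horovod

        # Step 6: bind the current process to its GPU.
        config = tf.ConfigProto()
        config.gpu_options.allow_growth = True
        config.gpu_options.visible_device_list = str(hvd.local_rank())

        # Step 7: save checkpoints only on the root rank.
        checkpoint_dir = './checkpoints' if hvd.rank() == 0 else None

        mnist_classifier = tf.estimator.Estimator(
            model_fn=model_fn,                                # hypothetical model function
            model_dir=checkpoint_dir,
            config=tf.estimator.RunConfig(session_config=config))

        # Step 5: broadcast the initial variables from rank 0 and scale down the step count.
        bcast_hook = hvd.BroadcastGlobalVariablesHook(0)
        mnist_classifier.train(
            input_fn=train_input_fn,                          # hypothetical input function
            steps=20000 // hvd.size(),
            hooks=[bcast_hook])

    if __name__ == '__main__':
        tf.app.run()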

Adapt PyTorch models for AIACC-Training

Complete sample code

AIACC-Training is compatible with Horovod API operations for PyTorch. The following process describes how to adapt the model code of PyTorch:
  1. Import the Perseus Horovod module.
    Sample code:
    import perseus.torch.horovod as hvd
  2. Initialize the Perseus Horovod module.
    Note Initialize the module at the beginning of the main function, before you call other API operations.
    Sample code:
    hvd.init()
  3. Bind the current process to the corresponding GPU.
    Sample code:
    torch.cuda.set_device(hvd.local_rank())
  4. In most cases, you must increase the learning rate based on the total number of workers: multiply the current learning rate by hvd.size(). For example, if you have two nodes and each node is equipped with eight GPUs, the total number of workers is 16.
    Note You do not need to increase the learning rate of some models such as BERT.
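    Sample code (a minimal sketch that assumes an SGD optimizer with a base learning rate of 0.01; the momentum value is a hypothetical choice):
    optimizer = torch.optim.SGD(model.parameters(),
                                lr=0.01 * hvd.size(), momentum=0.9)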
  5. Overload the optimizer.
    Sample code:
    optimizer = hvd.DistributedOptimizer(
                              optimizer, named_parameters=model.named_parameters())
    If you adapt multiple PyTorch models for AIACC-Training at the same time, the named_parameters of the models must be merged. Sample code:
    all_named_parameters = []  
    for name, value in model1.named_parameters():  
        all_named_parameters.append((name, value))  
    for name, value in model2.named_parameters():  
        all_named_parameters.append((name, value))  
    optimizer = hvd.DistributedOptimizer(
                               optimizer, named_parameters=all_named_parameters)
  6. Broadcast global variable parameters to all nodes.
    Sample code:
    hvd.broadcast_parameters(model.state_dict(), root_rank=0)
    hvd.broadcast_optimizer_state(optimizer, root_rank=0)
    If you adapt multiple PyTorch models for AIACC-Training at the same time, the state_dict of the models must be merged. Sample code:
    all_state_dict={}  
    all_state_dict.update(model1.state_dict())  
    all_state_dict.update(model2.state_dict())  
    hvd.broadcast_parameters(all_state_dict, root_rank=0)
  7. Divide data.
    Sample code:
    train_sampler = torch.utils.data.distributed.DistributedSampler(
        train_dataset, num_replicas=hvd.size(), rank=hvd.rank())  
    loader = torch.utils.data.DataLoader(
        train_dataset, batch_size=batch_size, sampler=train_sampler, **kwargs)
  8. Write the program as if it runs on one machine with a single GPU.

    Perseus Horovod launches the program in multiple processes and dispatches the task to the GPUs for distributed training. This achieves the same effect as running on one machine with multiple GPUs.

    Original sample code:
    model = nn.DataParallel(model.cuda())
    Sample code after the modification:
    # Method 1.
    model = nn.DataParallel(model.cuda(), device_ids=[hvd.local_rank()])
    # Method 2. The process was bound to a GPU in Step 3. The cuda() method uses the default GPU bound to the current process.
    model = model.cuda()
  9. Save the checkpoint.
    Save the checkpoint, verbose output, and tensorboardX logs only on process 0 (rank 0) to avoid conflicts with other processes. Sample code:
    save_checkpoint = True if hvd.rank() == 0 else False
    verbose = 1 if hvd.rank() == 0 else 0
    log_writer = tensorboardX.SummaryWriter(log_dir) if hvd.rank() == 0 else None
  10. Load the checkpoint.
    if hvd.rank() == 0:
        checkpoint = torch.load(filepath)
        model.load_state_dict(checkpoint['model'])
        optimizer.load_state_dict(checkpoint['optimizer'])
After you adapt the model code of PyTorch to AIACC-Training, you can execute the training task. Sample command:
  • A single machine that has eight GPUs (eight processes in total)
    mpirun --allow-run-as-root -np 8 -npernode 8 -x NCCL_DEBUG=INFO ./train.sh
  • Four machines that each have eight GPUs (32 processes in total)
    mpirun --allow-run-as-root --bind-to none -np 32 -npernode 8 \
                       -x NCCL_DEBUG=INFO -x PATH -x LD_LIBRARY_PATH \
                       -x PERSEUS_ALLREDUCE_STREAMS=8 \
                       -hostfile mpi_host.txt ./train.sh

Adapt the DDP interface of PyTorch models for AIACC-Training

Complete sample code

This procedure is applicable to PyTorch 1.5.0 or later. You need only to modify the imported packages and the DDP model wrapper to adapt the PyTorch model for AIACC-Training. Perform the following operations:
  1. Modify the imported packages in the sample code.
    Original sample code:
    import torch
    import torch.utils.data.distributed
    Sample code after the modification:
    import perseus
    import perseus.torch.distributed as dist
  2. Modify the model wrapper of the DDP interface in the sample code by adding the perseus. prefix to torch.nn.parallel.DistributedDataParallel.
    Original sample code:
    model = torch.nn.parallel.DistributedDataParallel(model, device_ids=[args.gpu])
    model = torch.nn.parallel.DistributedDataParallel(model)
    Sample code after the modification:
    model = perseus.torch.nn.parallel.DistributedDataParallel(model, device_ids=[args.gpu])
    model = perseus.torch.nn.parallel.DistributedDataParallel(model)
  3. Broadcast the optimizer state to all nodes.
    If you use an optimizer provided by PyTorch, skip this step. If you use a custom optimizer, add the following content to the sample code to synchronize the optimizer state to all nodes.
    if dist.use_perseus:
        dist.broadcast_optimizer_state(optimizer, root_rank=0)
  4. Modify the sample code to configure the data loader.
    Original sample code:
    train_sampler = torch.utils.data.distributed.DistributedSampler(train_dataset)
    Modify the sample code to assign values to the rank and num_replicas parameters:
    train_sampler = torch.utils.data.distributed.DistributedSampler(
        train_dataset, num_replicas=dist.get_world_size(), rank=dist.get_rank())
  5. Start the training script.
    The train.sh training script is used in this example. The following code block shows the contents of the script.
    #!/bin/bash
    python main.py -a resnet50 --dist-url 'tcp://127.0.0.1:88' --dist-backend nccl --world-size 1 --epochs 1 --rank 0 /mnt/imagenet/rawdata -b 64
    main.py is the modified sample training script. Take note of the following items when you modify the script:
    • The modified main.py script must be written as training code for a single machine with a single GPU. The script itself must not spawn multiple processes.
    • Take note of the hyperparameter settings. For example, the batchsize hyperparameter must follow the code logic of one machine with one GPU. Because the script uses the AllReduce algorithm for synchronous training, the effective total_batchsize equals batchsize * dist.get_world_size(), as illustrated in the sketch at the end of this section.
    • Configure the args.distributed parameter based on your business requirements. Sample code:
      if dist.use_perseus:
          args.distributed = True
    • If your training code presets or uses args.gpu, add the following code to set args.gpu to the local rank.
      args.gpu = dist.get_local_rank()
    Run the following mpirun command to start train.sh:
    mpirun --allow-run-as-root --bind-to none -np 32 -npernode 8 \
        -x NCCL_DEBUG=INFO -x PATH -x LD_LIBRARY_PATH \
        -x PERSEUS_ALLREDUCE_STREAMS=8 -hostfile mpi_host.txt ./train.sh
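For reference, the following minimal sketch combines the preceding modifications in a single main.py fragment. The args object, the model, and train_dataset are hypothetical placeholders from the original single-GPU training code.
    import torch
    import torch.utils.data.distributed
    import perseus
    import perseus.torch.distributed as dist

    if dist.use_perseus:
        args.distributed = True
    args.gpu = dist.get_local_rank()               # bind this process to its local GPU

    model = model.cuda(args.gpu)
    model = perseus.torch.nn.parallel.DistributedDataParallel(model, device_ids=[args.gpu])

    optimizer = torch.optim.SGD(model.parameters(), lr=args.lr, momentum=0.9)
    # For a custom optimizer, also broadcast its state:
    # if dist.use_perseus:
    #     dist.broadcast_optimizer_state(optimizer, root_rank=0)

    train_sampler = torch.utils.data.distributed.DistributedSampler(
        train_dataset, num_replicas=dist.get_world_size(), rank=dist.get_rank())
    train_loader = torch.utils.data.DataLoader(
        train_dataset, batch_size=args.batch_size, sampler=train_sampler)
    # Effective total batch size: args.batch_size * dist.get_world_size()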

Adapt MXNet models for AIACC-Training

AIACC-Training is compatible with Horovod and KVStore API operations for MXNet.

The following process describes how to adapt the model code of MXNet to Horovod API operations:
  1. Import the Perseus Horovod module.
    Sample code:
    import perseus.mxnet as hvd    
  2. Initialize the Perseus Horovod module.
    Sample code:
    hvd.init()
  3. Bind the current process to the corresponding GPU. In most cases, you must increase the learning rate based on the total number of workers: multiply the current learning rate by hvd.size().
    Note You do not need to increase the learning rate of some models such as BERT.
    Sample code:
    # rank and size
    rank = hvd.rank()
    num_workers = hvd.size()
    local_rank = hvd.local_rank()
    
    # Horovod: pin GPU to local rank
    context = mx.gpu(local_rank)
  4. Overload the optimizer.
    Sample code:
    opt = mx.optimizer.create(optimizer, **optimizer_params)
    # Horovod: create DistributedTrainer, a subclass of gluon.Trainer
    trainer = hvd.DistributedTrainer(params, opt)
  5. Broadcast global variable parameters to all nodes.
    Sample code:
    # Horovod: fetch and broadcast parameters
    params = net.collect_params()
    if params is not None:
        hvd.broadcast_parameters(params)
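Putting these steps together, the following minimal Gluon-style sketch shows an adapted training loop. The build_net() function, the train_data iterator, and the hyperparameters are hypothetical placeholders.
    import mxnet as mx
    from mxnet import autograd, gluon
    import perseus.mxnet as hvd

    hvd.init()
    context = mx.gpu(hvd.local_rank())              # pin this process to its GPU

    net = build_net()                               # hypothetical model definition
    net.initialize(ctx=context)
    params = net.collect_params()
    hvd.broadcast_parameters(params)                # broadcast the initial parameters

    opt = mx.optimizer.create('sgd', learning_rate=0.01 * hvd.size(), momentum=0.9)
    trainer = hvd.DistributedTrainer(params, opt)   # overload the optimizer
    loss_fn = gluon.loss.SoftmaxCrossEntropyLoss()

    for data, label in train_data:                  # hypothetical data iterator
        data, label = data.as_in_context(context), label.as_in_context(context)
        with autograd.record():
            loss = loss_fn(net(data), label)
        loss.backward()
        trainer.step(data.shape[0])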
AIACC-Training is also compatible with KVStore API operations: the MXNet model code overloads KVStore to accelerate training. When you use AIACC-Training, you cannot use multiple GPUs in a single process as you can in native MXNet; each process must be bound to a single GPU. Therefore, you must modify the ctx settings in the model code. The following code provides an example of how to bind the current process to the GPU that corresponds to kv.local_rank:
    ctx = []
    cvd = os.environ['DEVICES'].strip()
    if 'perseus' in args.kv_store:
        import perseus.mxnet as perseus
        # Bind this process to the single GPU that corresponds to its local rank.
        ctx.append(mx.gpu(kv.local_rank))
The following process describes how to adapt the model code of MXNet to KVStore API operations:
  1. Modify the model code to import the Perseus MXNet module and change the way KVStore is created.
    Original sample code:
    # example/image-classification/common/fit.py
    import time
    import re
    import math
    import mxnet as mx

    def _get_lr_scheduler(args, kv):
        ...

    def fit(args, network, data_loader, **kwargs):
        """
        ...
        data_loader : function that returns the train and val data iterators
        """
        # kvstore
        kv = mx.kvstore.create(args.kv_store)
    Sample code after the modification:
    # example/image-classification/common/fit.py
    import time
    import re
    import math
    import mxnet as mx
    import perseus.mxnet as perseus_kv

    def _get_lr_scheduler(args, kv):
        ...

    def fit(args, network, data_loader, **kwargs):
        """
        ...
        data_loader : function that returns the train and val data iterators
        """
        # kvstore
        kv = perseus_kv.create(args.kv_store) if args.kv_store == 'dist_sync_perseus' else mx.kvstore.create(args.kv_store)
        if args.gc_type != 'none':
            kv.set_gradient_compression({'type': args.gc_type,
                                         'threshold': args.gc_threshold})
  2. Prepare the config.sh configuration script that is required to execute the training task.

    The configuration script ensures that each process is bound to the corresponding GPU when you execute the training task. You must set the MXNET_VISIBLE_DEVICE environment variable based on the MPI rank. This variable provides the GPU device ID that is used by the process, and the GPU device ID is passed as a parameter to the model code to create a ctx.

    Sample code of config.sh:

     #!/bin/sh
     # Map each MPI process to a GPU on its node.
     GPU=$((OMPI_COMM_WORLD_RANK % OMPI_COMM_WORLD_LOCAL_SIZE))
    
    export OMP_NUM_THREADS=4
    
    MXNET_VISIBLE_DEVICE=$GPU python train_imagenet.py \
                                     --network resnet \
                                     --num-layers 50 \
                                     --kv-store dist_sync_perseus \
                                     --gpus $GPU …
  3. Execute the training task.
    No parameter server exists when you use AIACC-Training. Therefore, the training task is executed by running the mpirun command. Sample command that is used to execute a training task on four machines that each have eight GPUs (32 processes in total):
    mpirun --np 32 --npernode 8 --hostfile mpi_host.txt  ./config.sh
    mpi_host.txt specifies the IP addresses of the nodes. It is a common MPI machine file, similar to the host file that native MXNet uses with the ssh launcher. Example content of mpi_host.txt:
    192.168.0.1
    192.168.0.2
    192.168.0.3
    192.168.0.4
    By default, the open source version of MXNet occupies all CPU resources of the system, which consumes extra CPU time during the startup of training tasks and slows down the startup. You can configure the following environment variables to accelerate the startup:
    export MXNET_USE_OPERATOR_TUNING=0;
    export MXNET_USE_NUM_CORES_OPERATOR_TUNING=1
    export OMP_NUM_THREADS=1
  4. After the training starts, each GPU is used by one process, and each process outputs its training results.

In scenarios in which the batch size is small, such as object recognition, the mean and variance calculated by BatchNorm on each GPU deviate significantly, which results in a loss of accuracy. Compared with the original BatchNorm, SyncBatchNorm trades some training performance for improved convergence accuracy.

The following process describes how to use SyncBatchNorm:
  1. Install the patch.
    patch -p1 < perseus-mxnet-sync-bn.patch
  2. Compile the source code.
    make USE_OPENCV=1 USE_BLAS=openblas USE_CUDA=1 USE_CUDA_PATH=/usr/local/cuda USE_CUDNN=1 USE_DIST_KVSTORE=1 USE_NCCL=1 USE_LIBJPEG_TURBO=1 MPI_ROOT=/usr/local -j24 

    SyncBatchNorm is implemented based on the official MXNet code in src/operator/contrib/sync_batch_norm-inl.h. The batch normalization (BN) operator loads libperseus_mxnet.so and calls the communication API operations of Perseus MXNet, so BN is synchronized across GPUs within the operator.

  3. Synchronize BN across GPUs.

    SyncBatchNorm is based on the official MXNet code and is therefore compatible with the original usage. To modify the synchronization mode, change the operator name to PerseusSyncBatchNorm. Examples: mx.gluon.contrib.nn.PerseusSyncBatchNorm(comm_scope=0) and mx.sym.contrib.PerseusSyncBatchNorm(comm_scope=0).

    The local and global modes are supported.
    • Local mode: local average (comm_scope=0). This is the default mode. Each time the mean and variance are calculated in the forward and backward passes, the results are synchronized among the GPUs on the same machine.
    • Global mode: global average (comm_scope=1). Each time the mean and variance are calculated in the forward and backward passes, the results are synchronized among the GPUs on all machines.
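    The following sketch shows a hypothetical Gluon network that uses the synchronized operator after the patch is applied and MXNet is recompiled. The layer sizes are placeholders, and the comm_scope values follow the modes described above.
    import mxnet as mx
    from mxnet.gluon import nn

    net = nn.HybridSequential()
    net.add(nn.Conv2D(channels=64, kernel_size=3, padding=1))
    # Local mode: synchronize BN among the GPUs on the same machine (default).
    net.add(mx.gluon.contrib.nn.PerseusSyncBatchNorm(comm_scope=0))
    net.add(nn.Activation('relu'))
    net.add(nn.Conv2D(channels=128, kernel_size=3, padding=1))
    # Global mode: synchronize BN among the GPUs on all machines.
    net.add(mx.gluon.contrib.nn.PerseusSyncBatchNorm(comm_scope=1))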

Adapt Caffe models for AIACC-Training

AIACC-Training adds fused gradient communication to accelerate model training for Caffe. The following process describes how to adapt the model code of Berkeley Vision and Learning Center (BVLC) Caffe to the AIACC-Training distributed framework:
  1. Install the patch for BVLC Caffe.
    1. Make sure that BVLC Caffe is at commit 99bd99795dcdf0b1d3086a8d67ab1782a8a08383.
    2. Install the patch.
      git apply perseus-dist-1.2.0/patches/bvlc_caffe/1_perseus_distribute_train.patch
      git apply perseus-dist-1.2.0/patches/bvlc_caffe/2_cudnn_batchnorm.patch
      git apply perseus-dist-1.2.0/patches/bvlc_caffe/3_data_augmentation.patch

      1_perseus_distribute_train.patch enables distributed training with AIACC-Training. 2_cudnn_batchnorm.patch enables cuDNN BatchNorm. 3_data_augmentation.patch adds data augmentation to the ImageData layer. If you need only distributed training, install 1_perseus_distribute_train.patch.

  2. Run the following commands to recompile Caffe.
    The binary files are output to $HOME/perseus-caffe-dist.
    cd $HOME/caffe/build
    cmake -DCMAKE_INSTALL_PREFIX=$HOME/perseus-caffe-dist -DBLAS=open -DUSE_PERSEUS=ON -DPERSEUS_LIBRARY_PATH=/root/caffe/libperseus-caffe.so ..
    make all -j16 && make install
  3. Execute distributed training tasks.
    Sample command for two machines that have eight GPUs each (16 processes in total):
    mpirun --allow-run-as-root -np 16 -npernode 8 \
        -machinefile /root/hostfile \
        --mca bind-to none \
        --mca btl_tcp_if_include eth0 \
        --mca orte_keep_fqdn_hostnames t \
        -x NCCL_IB_DISABLE=1 \
        -x NCCL_SOCKET_IFNAME=eth0 \
        -x LD_LIBRARY_PATH \
        -x NCCL_DEBUG=INFO \
        /root/perseus-caffe-dist/bin/caffe train --solver solver.prototxt
    Note The GPU ID does not need to be specified in the caffe train command. AIACC-Training automatically allocates GPU resources based on the number of processes started.