Horovod is the main choice for users to perform distributed training in TensorFlow. AIACC-Training 1.5 is compatible with Horovod API operations and can accelerate distributed training in TensorFlow. This topic describes how to use AIACC-Training for TensorFlow. This topic also provides answers to some commonly asked questions about AIACC-Training for TensorFlow.

Adapt to Horovod API operations

This section describes how to call Horovod API operations to perform distributed training in TensorFlow and how to adapt your original training code to AIACC-Training.

AIACC-Training for TensorFlow supports Horovod API operations. You can call Horovod API operations to perform distributed training in the same way as you use Horovod to perform distributed training. If you have used Horovod to perform distributed training, you need to only change the code in the import module to the following code:
import perseus.tensorflow.horovod as hvd

If your training code is not written to perform distributed training, you can perform the following operations to upgrade your code to the code for distributed training that is compatible with Horovod API operations.

  1. In the first part of the main() function, run the following command to initialize the Perseus Horovod module.
    Note Make sure that you run the command before you call other Perseus API operations.
    hvd.init()
  2. Split training data.
    To perform parallel training, you must make sure that a different set of data is used in each process. Horovod allows you to split data used for parallel training in manual or automatic mode. You can use the shard() function provided by TensorFlow in tf.data.Dataset to split a dataset into shards in automatic mode. You can also use the shard() function together with the size(), local_rank(), or rank() function to perform custom data splitting. Sample code:
    # The dataset used in each machine is the same.
    dataset = tf.dataset.shard(hvd.size(), hvd.rank())
    
    # The dataset is split into shards and a different shard is used in each machine.
    dataset = tf.dataset.shard(hvd.size(), hvd.local_rank())
    Functions:
    • size(): returns the number of GPUs used in the training. For example, if you used two machines and each machine was configured with eight GPUs, the return value is 16. You can also use this function to adjust the learning rate and the batch number when you save checkpoints.
    • local_rank(): returns the numerical identifier of the current GPU on the machine. For example, if you used two machines and each machine was configured with eight GPUs, the return value ranges from 0 to 7. You can bind a process to a GPU based on the return value of the function.
    • rank(): returns the numerical identifier of the current GPU among all GPUs on all machines. For example, if you used four machines and each machine was configured with eight GPUs, the return value ranges from 0 to 31.
    Important
    • If you want to invoke the shuffle() and repeat() functions for a dataset, you must make sure that the shard() function is invoked before the two functions. This ensures the accuracy of the algorithm and the stability of the performance.
    • You cannot split the evaluation or test dataset. Otherwise, the evaluation results of each process become inconsistent.
  3. In most cases, the number of training steps and warmup steps must be divided by hvd.size(), and the learning rate must be multiplied by hvd.size(). hvd.size() is the total number of processes.
    Note You do not need to increase the learning rate of specific models, such as the Bidirectional Encoder Representations from Transformers (BERT) model. You can determine whether to increase the learning rate based on the training convergence results.
    step = step // hvd.size()
    learning_rate = learning_rate * hvd.size()
  4. Reload the optimizer.

    You can invoke the DistributedOptimizer() function to reload the optimizer of your original machine. Sample code:

    # original optimizer
    optimizer = tf.train.AdamOptimizer(learning_rate)
    
    # AIACC-Training: wrap the Distributed Optimizer.
    optimizer = hvd.DistributedOptimizer(optimizer)
    DistributedOptimizer(TensorFlow_Optimizer): reloads the optimizer so that the optimizer can be used by AIACC-Training. The input parameter is the first optimizer parameter in the preceding code. The output parameter is the second optimizer parameter in the preceding code.
  5. Bind the current process to a GPU.
    • TensorFlow 1.x
      config = tf.ConfigProto()
      config.gpu_options.allow_growth= True
      config.gpu_options.visible_device_list =str(hvd.local_rank())
      You must pass config to a function based on the method that you want to use:
      • If you want to use tf.Session.run(), you must pass config to the function when the system initializes tf.Session.
        with tf.Session(config=config) as sess:
          sess.run(...)
      • If you want to use Estimator, you must encapsulate tf.estimator.RunConfig(session_config=config) and pass config to the function when the system initializes Estimator.
         mnist_classifier = tf.estimator.Estimator(
                model_fn=cnn_model_fn, model_dir=model_dir,
                config=tf.estimator.RunConfig(session_config=config))
      • If you want to use Keras, you must encapsulate and pass config to the function.
        from tensorflow.keras import backend as K
        K.set_session(tf.Session(config=config))
    • TensorFlow 2.x
      gpus = tf.config.experimental.list_physical_devices('GPU')
      for gpu in gpus:
          tf.config.experimental.set_memory_growth(gpu, True)
      if gpus:
          tf.config.experimental.set_visible_devices(gpus[hvd.local_rank()], 'GPU')
  6. Before you start the training, synchronize the initial status of all training processes. You can adapt the code based on the method that you want to use:
    • If you want to use the traditional method tf.Session.run(), run tf.Session.run(hvd.broadcast_global_variables(0)).
    • If you want to use Keras, add hvd.keras.callbacks.BroadcastGlobalVariablesCallback(0) to the callbacks of the training.
    • If you want to use Estimator, add hvd.BroadcastGlobalVariablesHook(0) to the session hook.
  7. Save the checkpoint or the model only on the root rank and set other ranks to none.
    Note This prevents that processes overwrite each other when the system saves the model.
    checkpoint_dir = './checkpoints' if hvd.rank() == 0 else None
  8. Start perseusrun to perform distributed training for your training script. In this example, two machines are used and each machine is configured with eight GPUs. Sample code:
    perseusrun -np 16 -H {IP1:8},{IP2:8} python xxx.py
    In the preceding command, IP specifies the IP address of your internal network.
    Note For more information about startup methods, see Startup commands.

Example

AIACC-Training provides the sample adapted code in the directory of its software package for different training methods, such as tf.Session.run(), Keras, Estimator, and tf2 eager. You can implement the following operations to perform training.

  1. Go to the directory in which the sample adapted code is stored.
    cd `echo $(python -c "import perseus; print(perseus)") | cut -d\' -f 4 | sed "s/\_\_init\_\_\.py//"`examples/
  2. Start distributed training.
    The following sample code shows how to start training for a Mixed National Institute of Standards and Technology (MNIST) model based on Keras. In this example, one machine that is configured with two GPUs is used.
    perseusrun -np 2 -H localhost:2 python tensorflow_keras_mnist.py

FAQ

How do I handle out of memory errors that occur during training?

You can perform the following operations to troubleshoot potential errors:
  • During startup, run nvidia-smi to check whether the occupied GPU memory of each GPU increases evenly, whether a process is bound to a GPU, and whether the GPU memory occupied by processes is close to each other.
  • If multiple processes are bound to a GPU, check whether config.gpu_options.visible_device_list is set to a valid value.
  • If no available GPU memory can be used, add config.allow_growth = True to the command.
  • If Accelerated Linear Algebra (XLA) is used, add config.gpu_options.per_process_gpu_memory_fraction = 0.9 or config.gpu_options.per_process_gpu_memory_fraction = 0.8 to the command.

How do I quickly determine whether performance bottlenecks are caused by gradient communication?

You can comment out DistributedOptimizer(opt) in the adapted code to prevent gradient communication. You can further locate the causes of performance bottlenecks, such as data I/O and CPU pre-processing.

What are the considerations when I split a dataset into shards?

When you use AIACC-Training, a set of training code is started by multiple processes. You must split the dataset into multiple shards to ensure that each shard is handled by and trained in a different process. You can use the shard() function provided by TensorFlow in tf.data.Dataset to split a dataset into shards in automatic mode. You can also use the shard() function together with the size(), local_rank(), or rank() function to perform custom data splitting. Sample code:
dataset = tf.dataset.shard(hvd.size(), hvd.rank())
Important If you want to invoke the shuffle() and repeat() functions for the dataset, you must make sure that the shard() function is invoked before the two functions. This ensures the accuracy of the algorithm and the stability of the performance. Otherwise, pre-processing is affected and performance is compromised.