Asynchronous Implementation of Deep Q-Network


In this post I share my personal experience implementing a basic deep reinforcement learning algorithm asynchronously, along with some of the difficulties I encountered.

Deep Q-Network

Deep Q-Network (DQN) is a basic reinforcement learning algorithm that is able to play Atari games with visual input. Its training pipeline is shown below:
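As a rough sketch of the core of that pipeline, the snippet below shows the two pieces every DQN training step relies on: a replay buffer that stores transitions, and the one-step Q-learning target computed from the target network's outputs. The names `ReplayBuffer` and `td_target`, the discount factor, and the hard-coded Q-values are illustrative assumptions, not code from my implementation.

```python
import random
from collections import deque

GAMMA = 0.99  # discount factor (assumed value for illustration)


def td_target(reward, next_q_values, done, gamma=GAMMA):
    """One-step Q-learning target.

    Returns r for a terminal transition, otherwise
    r + gamma * max over actions of the target network's Q(s', a').
    """
    if done:
        return reward
    return reward + gamma * max(next_q_values)


class ReplayBuffer:
    """Fixed-size store of (state, action, reward, next_state, done) tuples."""

    def __init__(self, capacity):
        self.buffer = deque(maxlen=capacity)

    def add(self, transition):
        self.buffer.append(transition)

    def sample(self, batch_size):
        # Uniform sampling breaks the correlation between consecutive frames.
        return random.sample(list(self.buffer), batch_size)


# Hypothetical usage: states are placeholders, Q-values are made up.
buffer = ReplayBuffer(capacity=1000)
for step in range(10):
    buffer.add((step, 0, 1.0, step + 1, step == 9))

batch = buffer.sample(4)
# In a real agent, next_q_values would come from the target network.
targets = [td_target(r, [0.5, 2.0], done) for (_, _, r, _, done) in batch]
```

The network is then trained to move its prediction Q(s, a) toward each target, and the target network is periodically refreshed from the online network.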

Asynchronous DQN

Sometimes we want to speed up training by making it asynchronous. The basic idea is to create multiple processes that share one network and train it simultaneously. The training pipeline of asynchronous DQN is shown below:
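The idea of several workers updating one shared set of parameters can be illustrated with a toy example. This is only a sketch under simplifying assumptions: threads stand in for processes, a single number `w` stands in for the network, and a quadratic loss stands in for the DQN loss. All names and constants are made up for the illustration.

```python
import threading

TARGET = 3.0            # minimum of the toy loss (w - TARGET) ** 2
LEARNING_RATE = 0.05
STEPS_PER_WORKER = 200
NUM_WORKERS = 4

# Stands in for the network parameters held by the parameter server.
shared = {"w": 0.0}


def worker():
    for _ in range(STEPS_PER_WORKER):
        w = shared["w"]                       # pull the current parameter
        grad = 2.0 * (w - TARGET)             # gradient of the toy loss
        # Push the update without locking: by the time it is applied,
        # another worker may already have changed the parameter, so the
        # gradient can be slightly stale.
        shared["w"] -= LEARNING_RATE * grad


def train_async():
    threads = [threading.Thread(target=worker) for _ in range(NUM_WORKERS)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    return shared["w"]


final_w = train_async()
print("final parameter:", final_w)
```

Even though no worker waits for the others, the shared parameter still ends up very close to `TARGET`, which is the behaviour asynchronous training relies on.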

Asynchronous Implementation in TensorFlow

In TensorFlow, asynchronous training can be implemented with Distributed TensorFlow. I will not go into the details here, since the Distributed TensorFlow documentation already covers the basics. Instead, I would like to talk about some common problems I have encountered.

Parameter Server Hanging

This is probably the first problem one will encounter when following the tutorial example: the parameter server process (ps) never exits, even after all worker processes (worker) have finished their tasks. The cause of this problem is the following code:

if job_name == "ps":
  server.join()

When server.join() is executed, ps blocks forever. To solve this problem, each worker should notify ps when it has finished its task, and ps should exit once all workers have finished. Following this question, we can modify the code accordingly:

if job_name == "ps":
  # Parameter server.
  with tf.device("/job:ps/task:" + str(task_index)):
    queue = tf.FIFOQueue(cluster.num_tasks("worker"), tf.int32, shared_name = "done_queue" + str(task_index))
  # Close the parameter server when all queues from workers have been filled.
  with tf.Session(server.target) as sess:
    for i in range(cluster.num_tasks("worker")):
      sess.run(queue.dequeue())
elif job_name == "worker":
  # The logic part of a worker.
  ...
  
  # Execute the following code when a worker finished its job.
  queues = []
  # Create a shared queue on the worker which is visible on the parameter server.
  for i in range(cluster.num_tasks("ps")):
    with tf.device("/job:ps/task:" + str(i)):
      queue = tf.FIFOQueue(cluster.num_tasks("worker"), tf.int32, shared_name = "done_queue" + str(i))
      queues.append(queue)
  # Notify all parameter servers that the current worker has finished the task.
  with tf.Session(server.target) as sess:
    for i in range(cluster.num_tasks("ps")):
      sess.run(queues[i].enqueue(task_index))
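
The same handshake can be tried out without a cluster. The sketch below is a stand-in, not TensorFlow code: it uses Python's standard `queue` and `threading` modules, with one thread playing the parameter server and three threads playing workers. The worker count and names are assumptions for illustration.

```python
import queue
import threading

NUM_WORKERS = 3
done_queue = queue.Queue()   # plays the role of the shared "done_queue"
finished = []                # task indices received by the parameter server


def worker(task_index):
    # ... the worker's training logic would run here ...
    done_queue.put(task_index)   # notify the parameter server on completion


def parameter_server():
    # Instead of blocking forever, wait for exactly one token per worker.
    for _ in range(NUM_WORKERS):
        finished.append(done_queue.get())


ps = threading.Thread(target=parameter_server)
ps.start()

workers = [threading.Thread(target=worker, args=(i,)) for i in range(NUM_WORKERS)]
for w in workers:
    w.start()
for w in workers:
    w.join()

ps.join(timeout=5)
print("parameter server exited:", not ps.is_alive())
```

The parameter server returns as soon as it has collected one token from every worker, which is exactly what the dequeue loop in the TensorFlow version does.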

Memory Allocation of GPU

When I first ran my code on a GPU server, the server reported that it could not allocate additional memory for the other worker processes. My original code looked like this:

# Note: this version DOES NOT work.
cluster = tf.train.ClusterSpec(...)
server = tf.train.Server(cluster, job_name = job_name, task_index = task_index)
if job_name == "ps":
  # Do something for parameter server.
  ...
elif job_name == "worker":
  # Worker.
  with tf.device(tf.train.replica_device_setter(worker_device = "/job:worker/task:" + str(task_index), cluster = cluster)):
    # Build your network model here.
    ...
  
  # GPU configuration.
  gpu_options = tf.GPUOptions(per_process_gpu_memory_fraction = PER_PROCESS_GPU_MEMORY_FRACTION)
  config = tf.ConfigProto(gpu_options = gpu_options)
  
  with tf.train.MonitoredTrainingSession(
    master = server.target,
    is_chief = (task_index == 0),
    config = config
    ) as sess:
      # Logic part of the worker.
      ...

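It turns out that the GPU configuration must be passed to tf.train.Server() rather than to the session, presumably because the server sets up the GPU when it starts, so a configuration given later to the session comes too late. The correct version is: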

# GPU configuration.
gpu_options = tf.GPUOptions(per_process_gpu_memory_fraction = PER_PROCESS_GPU_MEMORY_FRACTION)
config = tf.ConfigProto(gpu_options = gpu_options)

cluster = tf.train.ClusterSpec(cluster_dict)
server = tf.train.Server(cluster, job_name = job_name, task_index = task_index, config = config)
if job_name == "ps":
  # Do something for parameter server.
  ...
elif job_name == "worker":
  # Worker.
  with tf.device(tf.train.replica_device_setter(worker_device = "/job:worker/task:" + str(task_index), cluster = cluster)):
    # Build your network model here.
    ...
  
  with tf.train.MonitoredTrainingSession(
    master = server.target,
    is_chief = (task_index == 0)
    ) as sess:
      # Logic part of the worker.
      ...
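
How large PER_PROCESS_GPU_MEMORY_FRACTION can be depends on how many processes share the GPU. The helper below is a hypothetical sketch of that arithmetic: it splits the memory evenly and keeps some headroom free. The function name and the 10% headroom are my assumptions, not values from the experiments.

```python
def per_process_fraction(num_processes, headroom=0.1):
    """Evenly split GPU memory among processes, leaving `headroom` unused.

    `headroom` is an assumed safety margin (fraction of total memory).
    """
    if num_processes < 1:
        raise ValueError("num_processes must be at least 1")
    if not 0.0 <= headroom < 1.0:
        raise ValueError("headroom must be in [0, 1)")
    return (1.0 - headroom) / num_processes


# Example: 10 processes sharing one GPU.
PER_PROCESS_GPU_MEMORY_FRACTION = per_process_fraction(10)
print(PER_PROCESS_GPU_MEMORY_FRACTION)
```

If the parameter server also runs on the same GPU, it should be counted as one of the processes.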

Releasing GPU Memory

When a worker finishes its task, it does not release its allocated GPU memory by default. To solve this problem, we need a small modification to the code:

if job_name == "ps":
  # Parameter server.
  ...
elif job_name == "worker":
  # Worker.
  ...
  
  # Release memory when a worker is finished.
  tf.contrib.keras.backend.clear_session()

All we need is to call tf.contrib.keras.backend.clear_session() at the end of the worker so that it releases its GPU memory.

Saving with Monitored Session

I ran into a problem when I tried to save the model parameters in the usual way:

# Note: this version DOES NOT work.
with tf.train.MonitoredTrainingSession(...) as sess:
  # Logic part of the worker.
  ...
  
  # Save the network parameters.
  saver = tf.train.Saver(var_list = ...)
  saver.save(sess, file_name)

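It turns out that the object returned by tf.train.MonitoredTrainingSession() is a wrapper around the real session, and saver.save() does not accept the wrapper. The workaround I found is to pass the underlying session, which can be reached through sess._sess._sess._sess._sess. Note that these are private attributes, so the chain may differ between TensorFlow versions: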

with tf.train.MonitoredTrainingSession(...) as sess:
  # Logic part of the worker.
  ...
  
  # Save the network parameters.
  saver = tf.train.Saver(var_list = ...)
  saver.save(sess._sess._sess._sess._sess, file_name)
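
Because the number of wrapper layers is an implementation detail, a small helper that follows the `_sess` chain until it ends is a little less brittle than hard-coding four levels. This is a sketch: `unwrap_session` is a hypothetical helper, it assumes the innermost session has no `_sess` attribute of its own, and the classes below are stand-ins so the example runs without TensorFlow.

```python
def unwrap_session(sess):
    """Follow the private `_sess` chain down to the innermost session."""
    while getattr(sess, "_sess", None) is not None:
        sess = sess._sess
    return sess


# Stand-in classes that mimic nested session wrappers.
class RawSession:
    pass


class Wrapper:
    def __init__(self, inner):
        self._sess = inner


raw = RawSession()
wrapped = Wrapper(Wrapper(Wrapper(Wrapper(raw))))   # four layers, as above

print(unwrap_session(wrapped) is raw)
```

With such a helper, the save call would read saver.save(unwrap_session(sess), file_name).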

Performance

Next I compare the performance of the standard and asynchronous implementations of DQN. In both experiments, the agent is trained to play the Atari game “Pong”, a simple environment that can be solved easily by exploiting the weakness of the computer-controlled opponent.

Learning Curves

Fig. 1: Learning curve of standard DQN in the game Pong.

Fig. 2: Learning curve of asynchronous DQN in the game Pong.

Fig. 1 and Fig. 2 show the episodic reward during training (higher is better). The agent learns slightly faster with the standard implementation. This is probably because in the standard implementation the gradients are always computed from up-to-date parameters, while in the asynchronous implementation a worker's gradients may be applied after other workers have already changed the shared network.

Training Time

Both experiments were run on a g3.4xlarge instance on Amazon Web Services. Ten worker processes were created for the asynchronous implementation. The training times are shown in the following table (lower is better).

Variant        Time (s)
Standard       12812
Asynchronous   5523


This shows that the asynchronous implementation is more than twice as fast, finishing in about 43% of the time of the standard implementation.
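
The numbers in the table can be turned into a speed-up and a rough per-worker efficiency. The times and the worker count come from the experiment above; treating "speed-up divided by number of workers" as efficiency is a simplification I am assuming here for illustration.

```python
standard_time_s = 12812   # from the table above
async_time_s = 5523       # from the table above
num_workers = 10          # worker processes in the asynchronous run

speedup = standard_time_s / async_time_s
efficiency = speedup / num_workers   # 1.0 would mean perfect linear scaling

print(f"speed-up: {speedup:.2f}x, parallel efficiency: {efficiency:.0%}")
# prints "speed-up: 2.32x, parallel efficiency: 23%"
```

So the ten workers are far from a tenfold speed-up, which is expected when they all share a single machine and a single set of parameters.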

Conclusion

In this post I shared my personal experience with asynchronous training using Distributed TensorFlow. Even though the experiments were conducted on a reinforcement learning task (DQN), I believe that similar results should apply to other deep learning tasks: an asynchronous implementation gives a large speed boost in wall-clock time in exchange for somewhat slower convergence.
