Tuesday, August 1, 2017

Data parallel and model parallel distributed training with TensorFlow

Google's TensorFlow is a popular platform for distributed training of machine learning/deep learning applications. In this post, I will present several ways of performing distributed training with TensorFlow, focusing on data parallel and model parallel training. By distributed training, I mean training that runs on multiple devices, especially across multiple nodes/machines.

About data parallelism and model parallelism: 
Distributed training comes into play as a solution to the big data and big model problems: it increases the degree of parallelism. Usually, there are two kinds of parallelism in distributed training with TensorFlow: data parallelism and model parallelism. For more explanation, please refer to this Quora answer: What is the difference between model parallelism and data parallelism?

About parameter server concept:
TensorFlow inherits the abstraction of a "parameter server" or "ps" from its predecessor DistBelief. Other platforms like MXNet and Petuum have the same abstraction. Basically, a parameter server (ps) is a collection of computing units (such as devices, nodes, or application processes) responsible for storing and updating the parameters of a machine learning/deep learning model. As a common way to handle mutable state (model parameters), most distributed machine learning/deep learning platforms support parameter server functionality.

I am assuming you have a basic understanding of TensorFlow, so I will omit the basic concepts. The following shows the ways we can implement distributed training with TensorFlow:
Fig 1  


  • Data Parallelism:
Example 1 (Figure 1):
Each worker node (corresponding to a worker task process) executes the same copy of the compute-intensive part of the TensorFlow computation graph. Model parameters are distributed/divided across multiple PS nodes (corresponding to ps task processes). There is a separate client for each worker task. In each iteration of training, each worker task takes in a batch of training data and performs computation (e.g. computing the gradient in SGD training) with the model parameters fetched from the PS. The resulting gradients from each worker task are aggregated and applied to the PS in either a synchronous or asynchronous fashion.

Some notes: 
1, In this scheme, the PS contains only a single copy of the model. However, that single copy of the model parameters can be split across different nodes.
2, This scheme can achieve both synchronous and asynchronous training.
3, This is a typical data parallelism scenario: in each iteration, each worker performs the same computation logic and has to accommodate the whole set of model parameters, but each worker only needs to hold a subset of the training data for that iteration's computation.
4, There is no application-level data (e.g. parameters or activations) communication among workers.
5, The example employs the between-graph replication training mechanism. A similar scheme is in-graph replication training; "in-graph" means there is only a single client in the training cluster and the copies of the compute-intensive part live within the single computation graph built by that client. In-graph replication is less popular than between-graph replication.
For more information about between-graph replication training, please refer to the tutorial. A minimal code sketch of this scheme is shown below.
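Below is a minimal sketch of between-graph replication using TensorFlow 1.x style APIs. The cluster layout (two ps tasks and two worker tasks), the host names, the toy softmax model, and the random stand-in data are all assumptions made for illustration; this is not the exact code from my github examples.

# A minimal sketch of between-graph replication (TensorFlow 1.x style APIs).
# The cluster layout, host names, toy softmax model and random stand-in data
# are made up for illustration only.
import numpy as np
import tensorflow as tf

cluster = tf.train.ClusterSpec({
    "ps": ["ps0.example.com:2222", "ps1.example.com:2222"],
    "worker": ["worker0.example.com:2222", "worker1.example.com:2222"],
})

# In a real job, every process gets its own job_name/task_index (e.g. via flags).
job_name, task_index = "worker", 0
server = tf.train.Server(cluster, job_name=job_name, task_index=task_index)

if job_name == "ps":
    server.join()  # ps tasks only host and update the variables
else:
    # Variables are placed round-robin on the ps tasks; ops stay on this worker.
    with tf.device(tf.train.replica_device_setter(
            worker_device="/job:worker/task:%d" % task_index, cluster=cluster)):
        x = tf.placeholder(tf.float32, [None, 784])
        y = tf.placeholder(tf.float32, [None, 10])
        w = tf.get_variable("w", [784, 10])
        b = tf.get_variable("b", [10], initializer=tf.zeros_initializer())
        logits = tf.matmul(x, w) + b
        loss = tf.reduce_mean(
            tf.nn.softmax_cross_entropy_with_logits(labels=y, logits=logits))
        global_step = tf.train.get_or_create_global_step()
        train_op = tf.train.GradientDescentOptimizer(0.01).minimize(
            loss, global_step=global_step)

    # Each worker runs its own client and training loop (asynchronous updates by
    # default; synchronous training would use tf.train.SyncReplicasOptimizer).
    hooks = [tf.train.StopAtStepHook(last_step=1000)]
    with tf.train.MonitoredTrainingSession(master=server.target,
                                           is_chief=(task_index == 0),
                                           hooks=hooks) as sess:
        while not sess.should_stop():
            batch_x = np.random.rand(32, 784).astype(np.float32)   # stand-in data
            batch_y = np.eye(10)[np.random.randint(10, size=32)].astype(np.float32)
            sess.run(train_op, feed_dict={x: batch_x, y: batch_y})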

Example 2 (Fig 2)
In example 1, I assumed that each computing node in the cluster contains only CPU devices. With GPU-powered nodes, data parallel distributed training might look like the following example 2 (Fig 2) (you can find this figure here):
Fig 2
Example 2 shows a single node's (with one CPU and two GPUs) view of data parallel distributed training. Each GPU executes a worker task. The difference from the previous example is that in example 2 each node's CPU first aggregates the gradients from all the local GPUs and then applies the aggregated gradients to the PS. Fetching parameters from the PS is also done by each node's CPU, which then copies the parameters to the local GPUs. A minimal sketch of the in-node gradient averaging is shown below.
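The per-node part of this scheme can be sketched as follows (TensorFlow 1.x style). The two-GPU tower setup, the toy model, and the random stand-in batches are assumptions for illustration; in the full distributed scheme the averaged gradients would then be pushed to the PS rather than applied locally.

# A minimal sketch of in-node gradient averaging over two local GPUs
# (TensorFlow 1.x style). The averaged gradients are applied once by the CPU;
# in the distributed setting they would be sent to the ps tasks instead.
import tensorflow as tf

def tower_loss(images, labels):
    # Hypothetical one-layer model; variables are shared across the GPU towers.
    with tf.variable_scope("model", reuse=tf.AUTO_REUSE):
        logits = tf.layers.dense(images, 10)
    return tf.reduce_mean(
        tf.nn.sparse_softmax_cross_entropy_with_logits(labels=labels,
                                                       logits=logits))

opt = tf.train.GradientDescentOptimizer(0.01)
tower_grads = []
for i in range(2):                                   # one tower per local GPU
    with tf.device("/gpu:%d" % i):
        images = tf.random_normal([32, 784])         # stand-in for a data shard
        labels = tf.random_uniform([32], maxval=10, dtype=tf.int32)
        tower_grads.append(opt.compute_gradients(tower_loss(images, labels)))

with tf.device("/cpu:0"):
    # The CPU averages each variable's gradient across the towers ...
    averaged = []
    for grad_and_vars in zip(*tower_grads):          # same variable across towers
        grads = [g for g, _ in grad_and_vars]
        averaged.append((tf.reduce_mean(tf.stack(grads), axis=0),
                         grad_and_vars[0][1]))
    # ... and applies the single averaged update (or pushes it to the PS).
    train_op = opt.apply_gradients(averaged)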

You can find code examples for data parallel distributed training on my github: code examples of distributed training with TensorFlow

  • Model Parallelism
When a big model cannot fit into a single node's memory, model parallel training can be employed to handle it. Model parallel training has two key features:
1, Each worker task is responsible for estimating a different part of the model parameters, so the computation logic in each worker differs from the others.
2, There is application-level data communication between workers.
The following Fig 3 shows a model parallel training example:
Fig 3

 This example shows how to use model parallelism to train a neural network (with three hidden layers) with the back-propagation algorithm. (Here is a wonderful tutorial for understanding the back-propagation algorithm.) There are four computing units (here, four worker nodes). Each worker is responsible for storing and updating a subset of the network parameters. In the forward phase (blue arrows), each worker passes its activations to its successor worker. In the backward phase (red arrows), each worker passes the "error" to its predecessor worker. Once a worker receives the errors from its successor, it is ready to update its corresponding parameters. It's worth mentioning that in this model parallel scheme the workers cannot compute simultaneously: a worker cannot start computing until it receives the necessary inputs, either activations or errors, from its neighbor. A minimal placement sketch is shown below.
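As a minimal sketch (TensorFlow 1.x style), the placement for such a scheme could look like the following. The layer sizes and the four-task cluster are assumptions for illustration; TensorFlow inserts the cross-device transfer of activations (forward) and errors/gradients (backward) automatically.

# A minimal sketch of model-parallel placement: each hidden layer's variables
# and computation are pinned to a different worker task. Layer sizes and the
# 4-task cluster are made up for illustration.
import tensorflow as tf

x = tf.placeholder(tf.float32, [None, 784])
labels = tf.placeholder(tf.int32, [None])

with tf.device("/job:worker/task:0"):
    h1 = tf.layers.dense(x, 512, activation=tf.nn.relu)
with tf.device("/job:worker/task:1"):
    h2 = tf.layers.dense(h1, 256, activation=tf.nn.relu)
with tf.device("/job:worker/task:2"):
    h3 = tf.layers.dense(h2, 128, activation=tf.nn.relu)
with tf.device("/job:worker/task:3"):
    logits = tf.layers.dense(h3, 10)
    loss = tf.reduce_mean(
        tf.nn.sparse_softmax_cross_entropy_with_logits(labels=labels,
                                                       logits=logits))
    # minimize() builds the backward pass; each layer's gradients are computed
    # on the device that owns that layer's variables.
    train_op = tf.train.GradientDescentOptimizer(0.01).minimize(loss)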
This Stackoverflow Question is about a simple implementation of model parallel training in TensorFlow. You may find it useful.




Wednesday, February 24, 2016

Parameter Server and Petuum: Specialized systems for large-scale machine learning algorithms

Parameter Server and Petuum are two distributed computation frameworks for large-scale machine learning tasks. Both were created at CMU. The links for them are: Petuum, Parameter Server. In my previous post (Introduction to Petuum), I already made an introduction to Petuum. In this post, I will introduce Parameter Server. This post is not a short summary of each part of the Parameter Server paper; instead, I will mainly talk about some important aspects of designing this kind of system for large-scale machine learning tasks, with Parameter Server as a good example.
  • 1,  Iterative-convergent algorithms and error-tolerant convergence
A large number of machine learning problems can be solved by iterative-convergent algorithms. By iterative-convergent I mean these algorithms iteratively converge to an optimal state, which may be a global or local optimum. For this kind of algorithm, there are some key properties that we can exploit to facilitate the design and implementation of a distributed computing system. One of the key properties is error-tolerant convergence, which I will illustrate in the following.
  • 2,  Stale Synchronous Parallel (SSP)----a trade-off between efficiency and consistency
It is hard to design a distributed system with both high efficiency and strong consistency guarantees. Most people in the distributed systems area are familiar with the MapReduce framework. It employs the Bulk Synchronous Parallel (BSP) model and provides a strong consistency guarantee. For some problems, such as sorting, strong consistency must be guaranteed. But when using MapReduce to solve some machine learning problems, efficiency suffers because MapReduce's consistency model requires significant communication overhead. Can we sacrifice consistency to gain efficiency in solving large-scale machine learning problems? The answer is: of course!

We can treat the SSP model as a trade-off between a fully synchronous system and a fully asynchronous system. SSP brings inconsistency into a system, but this inconsistency is bounded. For some machine learning algorithms, especially iterative-convergent algorithms, bounded inconsistency does not have a harmful effect, and the correctness of executing these algorithms under SSP is guaranteed. Both Parameter Server and Petuum employ the SSP model (actually, the consistency mechanism of Parameter Server can be configured by users; it can be set to a fully asynchronous or fully synchronous model). Why can these iterative-convergent machine learning algorithms, to some extent, tolerate inconsistency? The reason is that they usually have the capacity to tolerate bounded error. Linear regression is a classic model in machine learning, and we can use the gradient descent algorithm to estimate its parameters. In this post, I take gradient descent as an example to illustrate the property of error-tolerant convergence. (I omit the basics of linear regression and gradient descent; if you are not familiar with them, you can refer to Andrew Ng's online courses.)

In gradient descent, we usually want to minimize a cost function $J(\Theta)$, where $\Theta \in \mathbb{R}^{n}$ is an n-dimensional vector. The gradient descent algorithm updates $\Theta$ iteratively until $J(\Theta)$ reaches its local/global minimum (convergence). In each iteration, $\Theta$ is updated in the following way:
$\Theta :=\Theta-\alpha \cdot \nabla_\Theta  J(\Theta)$             $(1)$
Formula $(1)$ is equivalent to the following update:
$\Theta_j:=\Theta_j-\alpha \cdot \frac{\partial J(\Theta )}{\partial \Theta_j}  \quad j \in \{1,2,\ldots,n\}$      $(2)$
where $\alpha$ is the learning rate and $\frac{\partial J(\Theta )}{\partial \Theta_j}$ is the gradient. Formula $(1)$ can also be written as:
$\Theta :=\Theta+u$                                    $(3)$
where $u=-\alpha \cdot \nabla_\Theta  J(\Theta)$ is the update in each iteration. Since the training data samples for this kind of machine learning algorithm are usually independent and identically distributed, we can partition the whole training data set into several parts and assign each part to a worker machine. Each worker is then responsible for computing a partial update based on its assigned data subset. In a strongly consistent system like MapReduce, if we have $r$ workers, the $r$ sub-updates need to be aggregated in each iteration, and then the parameter $\Theta$ is updated based on the aggregated gradient (the sum of the sub-updates). This means the update of parameter $\Theta$ can only be triggered when all workers finish their computation in that iteration. In this situation, we trade efficiency for strong consistency.

The SSP model employed by Parameter Server and Petuum sacrifices, to some extent, consistency to gain efficiency, while algorithm correctness is still guaranteed. SSP allows faster workers to run a little further ahead of the others, but guarantees that the fastest and slowest workers are no more than $\tau$ apart. In Parameter Server and Petuum, $\tau$ is measured in iterations. So any worker's sub-gradient may trigger an update of parameter $\Theta$ even though some other workers have not finished computing their sub-gradients. This means a faster worker does not have to wait for a slower one, as long as the slower worker is not too far behind (no more than $\tau$ iterations). This process does introduce errors into the iterations, but the errors are bounded. There are theoretical proofs of this in the following papers (to be honest, I am not clear about every detail in the papers). A toy simulation of the SSP staleness bound is shown below.
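To make the bounded staleness concrete, here is a toy, single-process simulation of the SSP condition. All numbers and names are made up; this is not Parameter Server or Petuum code.

# Toy simulation of the SSP condition with staleness bound TAU: a worker may
# advance its iteration only if it is less than TAU ahead of the slowest one.
import random

TAU = 3
NUM_WORKERS = 4
clock = [0] * NUM_WORKERS              # current iteration of each worker

def can_advance(worker):
    # SSP: no worker may be more than TAU iterations ahead of the slowest one.
    return clock[worker] - min(clock) < TAU

for step in range(100):
    w = random.randrange(NUM_WORKERS)  # a randomly chosen worker finishes its work
    if can_advance(w):
        clock[w] += 1                  # its (possibly stale) update is applied
    # else: the worker blocks until the stragglers catch up

print("final clocks:", clock, "spread:", max(clock) - min(clock))
assert max(clock) - min(clock) <= TAU  # the staleness bound always holds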


  • 3,  The architecture of Parameter Server
The previous parts are necessary preparation. Now let's take a look at Parameter Server (PS).
   Figure 1,  the architecture of PS
The PS employs a client-server model.
Server group: stores the globally shared parameters, such as $\Theta$ in the gradient descent algorithm. The nodes in the server group can communicate with each other to replicate and/or migrate parameters for reliability and scaling.
---Server manager node: maintains a consistent view of the server nodes (the servers' metadata).
Worker group: the workers in a group are responsible for one application.
---Task scheduler: within each worker group, it assigns tasks to each worker and monitors their progress. If workers are added or removed, it reschedules unfinished tasks.
Workers communicate only with the server group, not with other workers within their group. Worker groups can either work together on one big task or work individually on separate tasks.

  • 4,  Data structure and communication  in Parameter Server
The shared parameters in PS are represented as a set of (key, value) pairs. In addition, the keys are ordered in order to optimize linear algebra operations. This lets us treat the parameters as (key, value) pairs while endowing them with vector and matrix semantics. For example, in the gradient descent algorithm the parameter $\Theta$ can be seen as a vector with each entry being ($ID$, $\Theta_i$).

The PS supports range push and pull operations. Suppose a key range is denoted by $R$. Parameter Server provides the following two operations:
$u.push(R,dest)$: send all existing entries of $u$ in key range $R$ to the destination.
$u.pull(R,dest)$: read all existing entries of $u$ in key range $R$ from the destination.
A toy illustration of this range-based (key, value) interface is shown below.
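Here is a toy, purely illustrative sketch of the range-based (key, value) idea. It is not the real Parameter Server API; the half-open key-range convention and the additive update are my own assumptions.

# Toy illustration of range-based push/pull on ordered keys.
class ToyKVStore:
    def __init__(self):
        self.table = {}                       # key -> value, keys are ordered ints

    def push(self, key_range, entries):
        lo, hi = key_range                    # half-open range [lo, hi)
        for k, v in entries.items():
            if lo <= k < hi:
                self.table[k] = self.table.get(k, 0.0) + v   # additive update

    def pull(self, key_range):
        lo, hi = key_range
        return {k: v for k, v in self.table.items() if lo <= k < hi}

server = ToyKVStore()
# A worker pushes a partial gradient for parameters with keys in [0, 5) ...
server.push((0, 5), {0: 0.1, 3: -0.2})
# ... and later pulls the current values for the same key range.
print(server.pull((0, 5)))                    # {0: 0.1, 3: -0.2}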

  • 5,  An algorithm example:  Distributed Subgradient Descent


Algorithm 1


The example is a little different from the original gradient descent algorithm (a penalty function $\Omega$ is added to the original cost function). However, the method for estimating the model parameters and the property of error-tolerant convergence are almost the same. A simplified simulation of this worker/server pattern is shown below.
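The following is a simplified, single-process simulation of the worker/server pattern in Algorithm 1. The made-up least-squares problem, the sizes, and the learning rate are assumptions, and the regularizer $\Omega$ is omitted; it is meant only to show the push/aggregate/update loop, not to reproduce the paper's algorithm.

# Simplified simulation of distributed (sub)gradient descent: workers compute
# gradients on their data partitions and "push" them; the server aggregates
# the pushes and updates w. Data and sizes are made up for illustration.
import numpy as np

np.random.seed(0)
n_features, n_workers, lr = 10, 4, 0.1
X = np.random.randn(400, n_features)
y = X @ np.random.randn(n_features) + 0.01 * np.random.randn(400)

shards = np.array_split(np.arange(400), n_workers)   # data partitioned by rows
w = np.zeros(n_features)                             # global parameters on the server

def worker_gradient(w, idx):
    Xp, yp = X[idx], y[idx]
    return Xp.T @ (Xp @ w - yp) / len(idx)            # gradient of 0.5*||Xw - y||^2

for t in range(100):
    grads = [worker_gradient(w, idx) for idx in shards]   # each worker's push
    w -= lr * np.mean(grads, axis=0)                      # server aggregates, updates

print("final loss:", 0.5 * np.mean((X @ w - y) ** 2))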

  • 6,  Task dependence and flexible consistency
In PS, we are able to impose specific task-dependency constraints. By default, tasks are executed in parallel. If we want a task to be executed only after some other tasks have finished, we can set these constraints in PS, and the PS system is responsible for maintaining them. For example, the aggregation logic in $ServerIterate$ of Algorithm 1 updates $w$ only after all worker gradients have been aggregated. This can be implemented by having the updating task depend on the push tasks of all workers. (*A note here*: there are some variants of the gradient descent algorithm; one of them is stochastic gradient descent (SGD). SGD does not scan all the data points in each iteration; instead it relies on only one sample from the data set per iteration. In the case of SGD, we do not need the constraints used in Algorithm 1.)

Instead of providing only one consistency model, PS gives users the flexibility to define the consistency model. Three kinds of consistency models are supported by PS:

  1. Bulk Synchronous Parallel (BSP) 
  2. Stale Synchronous Parallel (SSP)
  3. Asynchronous Model
Actually, BSP and the Asynchronous Model can be seen as special cases of SSP.


  • 7,  Distributed Hash Table in the Server Group

Parameters are stored in the server group. PS provides the following mechanism for fault tolerance and scalability.
The Parameter Server partitions keys much as a conventional distributed hash table does: keys and server node IDs are both inserted into a hash ring. Note that a physical node is inserted multiple times, in the form of virtual nodes, to facilitate load balancing.
Figure 2

As shown in Figure 2, each node manages the key segment from its insertion point to the next insertion point by another node in the anti-clockwise direction. In Figure 2, each server node manages the segment of the same color. The mapping from key segments to nodes is stored in Zookeeper. Each key segment is then replicated to the k anti-clockwise neighbor server nodes for fault tolerance. A toy sketch of consistent hashing with virtual nodes is shown below.
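Below is a toy sketch of consistent hashing with virtual nodes. The hash function, node names, and number of virtual copies are my own assumptions, and for simplicity keys are assigned to the next virtual node in one fixed direction rather than following the paper's exact convention.

# Toy consistent hashing with virtual nodes, mimicking how the server group
# partitions the key space.
import bisect
import hashlib

VIRTUAL_COPIES = 3

def h(s):
    return int(hashlib.md5(s.encode()).hexdigest(), 16)

ring = []                                    # sorted list of (hash, node) points
for node in ["server0", "server1", "server2"]:
    for v in range(VIRTUAL_COPIES):          # each physical node appears 3 times
        ring.append((h("%s#%d" % (node, v)), node))
ring.sort()

def owner(key):
    # A key is owned by the first (virtual) node at or after its hash position.
    pos = bisect.bisect(ring, (h(key),)) % len(ring)
    return ring[pos][1]

for key in ["theta_0", "theta_1", "theta_2", "theta_3"]:
    print(key, "->", owner(key))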
PS also provides an effective mechanism for failure recovery.

Finally, there are some points not discussed in this post, such as message passing and compression, (key, value) pairs with vector clocks, and user-defined filters.



Tuesday, December 1, 2015

Introduction to Petuum: A top-to-bottom designed platform for distributed Machine Learning on Big Data

This post contains some of my thoughts after reading the paper Petuum (A New Platform for Distributed Machine Learning on Big Data). The paper comes from academia (CMU).

  • What's the purpose of creating Petuum?

In the machine learning domain, it is a common situation that an algorithm can easily be applied to a small data set but not to a large-scale one. When facing a large-scale data set, the first thing that comes to mind is parallel processing. In a parallel environment, unexpected problems/obstacles may arise, for example: 1) we need a decent programming model/abstraction/framework that can make the code run in a distributed environment; 2) it is hard to guarantee that an algorithm works correctly in a distributed system. By "correctly", I mean the system can guarantee fault tolerance (for example, even if one or more servers crash, the system can still make the algorithm run as intended). With Petuum, the authors design a platform applicable to a wide range of different ML algorithms at scale. I think the most significant feature of Petuum is that it takes both the system and the algorithm into consideration and proposes the model parallelism concept with theoretical guarantees. This is a new angle and perspective for designing distributed systems for ML on large-scale data sets.

  • How does Petuum do it?
Before introducing Petuum, it is better to know how current distributed systems/platforms solve large-scale ML tasks. We may be familiar with distributed platforms such as MapReduce and Spark. These platforms are not designed specifically for ML on large data; they are designed for general big data processing tasks. Theoretically, we can use these platforms to implement ML algorithms, but some ML algorithms are either hard to implement or suffer from high computation latency on them. To tackle that, specialized programming libraries are added on top of these platforms. For example, Spark has MLlib, which contains several ML algorithms and utilities. One thing I want to emphasize is that these platforms (such as Hadoop MapReduce and Spark) make an effort to either adjust themselves or add new utilities/libraries so as to allow large-scale ML algorithms to be implemented efficiently and correctly on top of them. I call this approach a bottom-to-top solution. Petuum goes in the opposite direction: top to bottom.

The inspiration for Petuum comes from the observation that a number of ML algorithms are optimization-based. For these algorithms, we usually have a model with parameters to be estimated/determined. To figure out proper parameters, we usually set an appropriate loss function $L$ (RMS loss, likelihood, margin) and a stopping criterion (for example, $L$ reaches its minimum). Basically, what such an algorithm does is iteratively update the parameters until the stopping criterion is reached. Given this observation, the authors propose two parallel mechanisms for the Petuum system: data parallelism and model parallelism.

Iterative-Convergent ML Algorithms: the Petuum system is designed for this kind of ML algorithm, which can be written as
$A^{(t)} = F\big(A^{(t-1)}, \Delta_L(A^{(t-1)}, D)\big)$
where $D$ is the data set; the loss $L$ is a fitness function such as RMS loss, likelihood, or margin; $(t)$ denotes the iteration; $A$ is the model state (i.e. parameters and/or latent variables); $\Delta_L$ performs computation on data $D$ and model state $A$, and outputs intermediate results to be aggregated by $F()$. For simplicity, the $L$ in the subscript is omitted in the rest of this post.


Data Parallelism: the data set $D$ is partitioned and assigned to workers; the $p$-th subset $D_p$ is handled by the $p$-th worker. Each worker iteratively computes updates ($\Delta$) by operating on the data partition it has been assigned, and the update outputs from all workers are aggregated via summation. The key to the validity of this "additive update" is the assumption of independent and identically distributed data, which holds for many ML algorithms. The data-parallel update equation is:
$A^{(t)} = F\big(A^{(t-1)}, \sum_{p=1}^{P} \Delta(A^{(t-1)}, D_p)\big)$


Model Parallelism: the model $A$ is partitioned and assigned to workers. Unlike data parallelism, each update takes a scheduling function $S_{p}^{(t-1)}$, which restricts $\Delta()$ to operate on a subset of the model parameters $A$. This process can be represented by:
$A^{(t)} = F\big(A^{(t-1)}, \{\Delta(A^{(t-1)}, S_{p}^{(t-1)}(A^{(t-1)}))\}_{p=1}^{P}\big)$
where $D$ is omitted for simplicity. $S_{p}^{(t-1)}$ outputs a set of indices $\{j_1, j_2, ...\}$ so that $\Delta()$ only performs updates on $A_{j_1}, A_{j_2}, ...$. One thing we should bear in mind is that the $A_j$ are not, in general, independent of each other. In my opinion, the core problem for model parallelism in Petuum is how to implement the scheduling function $S_p()$.
  • Implementing Data and Model parallel Programs--Petuum
Petuum is an implemented system supporting both data-parallel and model-parallel programs. The Petuum system comprises three components: scheduler, workers, and parameter server. The following is the Petuum system structure:
Scheduler: the scheduler enables model parallelism by allowing users to control which model parameters are updated by worker machines. This is done through a user-defined scheduling function $schedule()$ (corresponding to $S_{p}^{(t-1)}$).
Workers: each worker $p$ receives the parameters to be updated from $schedule()$, and then runs the parallel update function $push()$ (corresponding to $\Delta()$) on data $D$. While $push()$ is being executed, the model state $A$ is automatically synchronized with the parameter server via the parameter exchange channel.
Parameter Server: the parameter servers provide global access to the model parameters $A$ via a convenient distributed shared memory. A toy sketch of the schedule()/push() pattern is shown below.
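To make the interface concrete, here is a toy, single-process sketch of the schedule()/push() pattern. The quadratic objective, the block-wise schedule, and all sizes are my own assumptions; this is not the actual Petuum API.

# A toy sketch of the schedule()/push() pattern for model parallelism:
# schedule() picks a subset of parameter indices, each worker push()-es
# updates only for those indices, and the server applies the aggregate.
import numpy as np

np.random.seed(1)
dim, n_workers, lr = 8, 2, 0.1
A = np.zeros(dim)                       # model state held by the parameter server
target = np.random.randn(dim)

def schedule(t):
    # Pick the block of parameter indices to update in iteration t (round-robin).
    block = t % 2
    return list(range(block * dim // 2, (block + 1) * dim // 2))

def push(A, indices):
    # A worker computes updates only for the scheduled indices
    # (negative gradient of the toy objective 0.5*||A - target||^2).
    return {j: -(A[j] - target[j]) for j in indices}

for t in range(200):
    indices = schedule(t)
    updates = [push(A, indices) for _ in range(n_workers)]   # one push per worker
    for j in indices:                   # the server aggregates and applies them
        A[j] += lr * np.mean([u[j] for u in updates])

print("max parameter error:", np.max(np.abs(A - target)))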

The rest of the paper provides theoretical proofs and application examples of Petuum, as well as some experimental results. The main purpose of this post is to introduce Petuum briefly, so I won't go through that part. If you are interested, please refer to the Petuum paper.

Question:
Which approach is better for designing a large-scale distributed system for machine learning algorithms: bottom-to-top, top-to-bottom, or something else?


Wednesday, November 11, 2015

Timely Dataflow Framework for Naiad

The Timely Dataflow Model (TDM) is the computation model for Naiad. The key feature of the timely dataflow model is that it adds loop structure to the traditional stream processing model (such as MillWheel). The following picture shows this structure:


  •  Graph structure


Timely dataflow graphs are directed graphs with the constraint that the vertices are organized into possibly nested loop contexts, with three associated system-provided vertices:
                  ingress vertex,
                  egress vertex,
                  feedback vertex.
Edges entering a loop context must pass through an ingress vertex, and edges leaving a loop context must pass through an egress vertex. Additionally, every cycle in the graph must be contained entirely within some loop context and include at least one feedback vertex that is not nested within any inner loop context.
  • New timestamp
Since a message can enter a loop (iteration), a single integer is not able to track a message's state. For example, in traditional stream processing a message with timestamp t=2 is usually processed before a message with timestamp t=3; with the loop/iteration structure of the timely dataflow model, we cannot guarantee that. So how to handle this? The answer is that TDM employs a new type of logical timestamp which, per the Naiad paper, has the form $(e, \langle c_1, \ldots, c_k \rangle)$, where $e$ is the epoch and there is one loop counter $c_i$ for each of the $k$ nested loop contexts. The ingress, egress, and feedback vertices act only on the timestamps of messages passing through them, adjusting incoming timestamps as follows: an ingress vertex appends a new counter, $(e, \langle c_1, \ldots, c_k \rangle) \mapsto (e, \langle c_1, \ldots, c_k, 0 \rangle)$; an egress vertex removes the innermost counter, $(e, \langle c_1, \ldots, c_k, c_{k+1} \rangle) \mapsto (e, \langle c_1, \ldots, c_k \rangle)$; and a feedback vertex increments the innermost counter, $(e, \langle c_1, \ldots, c_k \rangle) \mapsto (e, \langle c_1, \ldots, c_k + 1 \rangle)$. A toy sketch of these adjustments is shown below.
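A tiny sketch of these three adjustments, representing a timestamp simply as (epoch, tuple of loop counters):

# Toy illustration of how the three system vertices adjust a timestamp.
def ingress(ts):
    e, counters = ts
    return (e, counters + (0,))          # entering a loop: append a new counter

def egress(ts):
    e, counters = ts
    return (e, counters[:-1])            # leaving a loop: drop the innermost counter

def feedback(ts):
    e, counters = ts
    return (e, counters[:-1] + (counters[-1] + 1,))   # next loop iteration

t = (2, ())              # epoch 2, outside any loop
t = ingress(t)           # (2, (0,))
t = feedback(t)          # (2, (1,))
t = egress(t)            # (2, ())
print(t)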
  • Vertex Computation

As in other dataflow models, the main programming element in a timely dataflow program is the vertex. A timely dataflow vertex is a possibly stateful object that sends and receives messages, and requests and receives notifications.
       -- Each vertex v implements two callbacks: v.ONRECV(e: Edge, m: Message, t: Timestamp) and v.ONNOTIFY(t: Timestamp).
       -- A vertex may invoke two system-provided methods in the context of these callbacks: this.SENDBY(e: Edge, m: Message, t: Timestamp) and this.NOTIFYAT(t: Timestamp).


At this point, one thing we must bear in mind is that TDM combines low-latency asynchronous message flow with lightweight synchronous coordination when required.
  • The could-result-in relation (progress tracking protocol)
By now, we know that notification in TDM is synchronous: v.ONNOTIFY(t) cannot be invoked until all messages with timestamps equal to or less than t have been received by vertex v. In the Naiad paper, a new type of timestamp called a Pointstamp is defined to help guarantee the correctness of delivering notifications. A Pointstamp is a pair (t, l) of a timestamp t and a location l (an edge or a vertex). The SENDBY(e,m,t) and NOTIFYAT(t) methods generate new events:

For v.SENDBY(e,m,t) the pointstamp of m is (t,e)
For v.NOTIFYAT(t) the pointstamp of the notification is (t,v)
I think this is the toughest part of the Naiad paper to understand! I'd like to use a concrete example to illustrate the could-result-in relation.
Figure 1
Let's say we have a dataflow graph as shown in Figure 1: A is the ingress vertex, E is the egress vertex, and D is the feedback vertex. Messages in a timely dataflow system flow only along edges, and their timestamps are modified by the ingress, egress, and feedback vertices, as discussed in the previous part of this post. In Figure 1, each outstanding event (SENDBY, NOTIFY) has a timestamp. We can "sort" these events according to a partial order called could-result-in. Please look at Figure 2.
Figure 2
The idea is that a message could result in a notification if there exists a chain of events in the system that starts with delivering that message and ends with a message arriving at the same location as the notification, at the same time or earlier. The directed edges in Figure 2 indicate the could-result-in relation. Since we can "sort" events by the could-result-in relation, there is a well-defined frontier of events that have no existing predecessors (the events within the dashed rectangle are in the frontier), and this frontier marches forward as events are delivered. So the frontier represents the global progress of the system. When an undelivered vertex notification finally makes it into the frontier, that notification is safe to deliver, because by the way we defined the could-result-in order, there is no way the vertex could ever receive a message with an earlier time. That means tracking this frontier is crucial. In Naiad, a scheduler does this: it maintains a set of active pointstamps, which are those that correspond to at least one unprocessed event. For each active pointstamp, the scheduler maintains two counts: an occurrence count and a precursor count. Figure 3 shows the set. A toy frontier computation for a loop-free pipeline is sketched below.
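As a toy illustration of the frontier idea (not Naiad's actual protocol), consider a loop-free pipeline A -> B -> C, where could-result-in reduces to "there is a path and the timestamp is no later"; the events listed are made up, and the timestamp transforms that loops would introduce are ignored.

# Toy frontier computation for a loop-free pipeline A -> B -> C: pointstamp
# (t1, l1) could-result-in (t2, l2) iff there is a path from l1 to l2 and
# t1 <= t2.
NODES = ["A", "B", "C"]                        # the pipeline A -> B -> C

def reachable(src, dst):
    return NODES.index(src) <= NODES.index(dst)

def could_result_in(p, q):
    (t1, l1), (t2, l2) = p, q
    return reachable(l1, l2) and t1 <= t2

# Outstanding events as (timestamp, location) pointstamps.
events = [(1, "A"), (2, "B"), (1, "C"), (3, "A")]

# The frontier = events with no *other* event that could result in them.
frontier = [q for q in events
            if not any(p != q and could_result_in(p, q) for p in events)]
print(frontier)     # notifications at these pointstamps are safe to deliver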

Figure 3

I will continue writing and adding to this post.