Tuesday, December 1, 2015

Introduction to Petuum: A top to bottom designed platform for distributed Machine Learning on Big data

This post includes some of my thought after reading the paper: Petuum (A New Platform for
Distributed Machine Learning on Big Data). This paper comes from academic environment (CMU).

  • What's the purpose of creating Petuum?

In machine learning domain, it's a common case that an algorithm can be easily applied to a small date set but can not be applied to a large scale data set. When facing large scale data set, the first thing that comes to mind is parallel process. In a parallel environment, unexpected problems/obstacles may arise. The follows are some obstacles: 1), need a decent programming model/abstraction/framework which can make the codes run in distributed environment. 2),  hard to guarantee that an algorithm can work correctly in a distributed system. By saying "correctly", I mean the system can guarantee fault tolerance (for example, even if one or some servers crash, the system has the capacity of making algorithm run as intended). In Petuum, they design a platform applicable to a wide range of different ML algorithms at scale. I think the most significant feature of Petuum is that it takes both System and Algorithm into consideration and proposes the Model Parallelism concept with Theory guarantee. This is a new angle and perspective for design distributed system for ML on large scale data set.

  • How does Petuum do?
Before introducing Petuum, it's better to know how current distributed systems/platforms solve large scale ML tasks. We may be familiar with some distributed platforms such as MapReduce, Spark. These platforms are not designed specifically for ML on large data. They are designed for general big data process tasks. Theoretically, we can use these platforms to implement ML algorithm, however some ML algorithms are either hard to be implemented or suffering high latency of computation with these platforms. To tackle that, some specific programming libraries are added on the top of these platforms. For example, Spark has MLlib which contains several ML algorithms and utilities. One thing I want to emphasize is that these platforms (such as Hadoop-MapReduce, Spark) take effort to either adjust themselves or add new utilities/libraries so as to allow large scale ML algorithms to be implemented efficiently and correctly on them. I call this approach From Bottom to Top solution. But Petuum goes in an opposite direction: From Top to Bottom.

The inspiration of Petuum comes from an observation that a number of ML algorithms are optimization-based. For these algorithms, we usually have an model with parameters to be estimated/determined. To figure out proper parameters, we usually set a appropriate function $L$ (RMS loss, likelihood, margin) and a stop criteria (for example, $L$ gets its minimum). Basically, what an algorithm does is it iteratively updates parameters until stop criteria is reached. Given this observation, they proposed two parallel mechanisms: Data Parallelism and Model Parallelism for Petuum system.

Iterative-Convergent ML Algorithm: Petuum system is designed for this kind of ML algorithm. The follows are representation of Iterative-Convergent ML Algorithm:
$D$ is data set; loss $L$ is a fitness function such as RMS loss, likelihood, margin; $(t)$ denotes iteration. $A$ is model state (i.e. parameters and/or latent variables). $\Delta _L$ performs computation on data $D$ and model state A, and outputs intermediate results to be aggregated by $F()$. For simplicity, $L$ in the subscript is omitted in the rest of this post. 


Data Parallelism: data set $D$ is partitioned and assigned to workers. The $p$-th subset $D_p$ is handled by $p$-th worker.  Each worker can iteratively update ($\Delta$) by operating on the data partition it has been assigned. All updated outputs from every workers can be aggregated via summation. The key to the validity of this "additive update" is the notion of "independent and identically distributed data set", which is assumed for many ML algorithm. The data-parallel update equation is:


Model Parallelism: the model $A$ is partitioned and assigned to workers. Unlike data-parallelism, each update takes a scheduling function $S_{P}^{(t-1)}$, which restricts $ \Delta\left (  \right )$ to operate on a subset of the model parameters $A$. This process can be presented by:
Where $D$ is omitted for simplicity. $S_{P}^{(t-1)}$ outputs a set of indices $\{j_1, j_2, ...\}$ so that $ \Delta\left (  \right )$ only performs updates on $A_{j_1}, A_{j_2}, ...$. One thing we should bear in mind is that $A_j$ are not, in general, independent of each other.  In my opinion, the core problem for model parallelism of Petuum is how to implement a schedule function $S_p()$.
  • Implementing Data and Model parallel Programs--Petuum
Petuum is an implemented system allowing data-parallel and model-parallel. The Petuum system comprises three components: scheduler, workers, and parameter server. The following is Petuum system structure:
Scheduler: The scheduler system enables model-parallelism by allowing users to control which model parameters are updated by worker machines. This is performed through a user-defined scheduling function $schedule( )$ (corresponding to $S_{P}^{(t-1)}$)
Workers: Each worker $p$ receives parameters to be updated from $schedule( )$, and then runs parallelism update functions $push( )$ (corresponding to $ \Delta\left (  \right )$) on data $D$. While $push( )$ is being executed, the model state $A$ is automatically synchronized with the parameter server via the parameter exchange channel.
Parameter Server: The parameter servers provide global access to model parameters $A$ via a convenient distributed shared memory.  

The rest of this paper provides some theory proof and application examples of Petuum as well as some experimental results. The main purpose of this post is to introduce Petuum briefly, so I won't go through that part. If you are interested, please refer to Petuum paper.

Question?
Which way is proper in designing a large-scale distributed system for machine learning algorithm? From bottom to up OR from up to bottom OR others?













    

Wednesday, November 11, 2015

Time Dataflow Framwork for Naiad

Timely Dataflow model (TDM)is the computation model for Naiad. The key feature of timely dataflow model is that it adds loop structure into traditional stream process model (such as Millwheel). The follows is a picture to show the advanced structure,


  •  Graph structure


Time dataflow graph are directed graphs with the constraint that the vertices are organized into possibly nested loop contexts, with three associated system-provided vertices.
                  ingress vertex, 
                  egress vertex,
                  feed-back vertex
Edges entering a loop context must pass through an ingress vertex  and edges leaving a loop context must pass through an egress vertex.  Additionally, every cycle in the graph must be contained entirely
within some loop context, and include at least one feed-back vertex  that is not nested within any inner loop contexts.
  • New timestamp
Since the message can entry a loop (iteration). A single integer is not able to track a message's state. For example in traditional stream process a message with timestamp t=2 usually proceeds before a message with timestamp t=3. However when facing loop/iteration structure in timely dataflow model we can not guarantee that. So how to handle this? The answer is that TDM employs a new type of logical timestamp it has the form as follows:
We can see that there is one loop counter for each of the k loop contexts.  The ingress, egress, and feedback vertices act only on the timestamps of messages passing through them. The vertices adjust incoming timestamps as follows:
  • Vertex Computation

As in other dataflow models, the main programming element in a timely dataflow program is the vertex. A timely dataflow vertex is a possibly stateful object that sends and receives messages, and requests and receives notifications.
       -- A vertex may invoke two system-provided methods in the context of these callbacks:
       -- Each vertex v implements two callbacks:


At this point one thing we must bear in mind it that: TDM combine low-latency asynchronous message flow with lightweight synchronous coordination when required.
  • could-result-in relation relation (Process tracking protocol)
By now, we already know notification in TDF is synchronous. That means v.ONNOTIFY(t) can not be invoked until all messages associated with time equal to and less than t are received by vertex v. In  Naiad paper, they define a new type of timestamp called Pointstamp to help guarantee the correctness of delivering notifications. The Pointstamp is defined as follows:
The SENDBY(e,m,t) and NOTIFYAT method generate new event:

For v.SENDBY(e,m,t) the pointstamp of m is (t,e)
For v.NOTIFYAT(t) the pointstamp of the notification is (t,v)
I think this is the most tough part to understand in Naiad paper! I 'd like to use a concrete example to illustrate the could-result-in relation.
Figure 1
Let's say we have a data flow graph as showed in Figure 1. A is ingress vertex, E is egress vertex, D is feed-back vertex. Messages in a timely dataflow system flow only along edges, and their timestamps are modified by ingress, egress, and feedback vertices. we have discussed this constrain in previous part in this blog. In figure 1, each outstanding event (SENDBY,NOTIFY) have a timestamp. We can "sort" these events according a partial order called could-result-in. Please look at Figure 2.
Figure 2
The idea is a message could result in the notification if there exist a chain of events in the system that starts with delivering that message and ends with a message arriving at the same location as a notification at the same time or earlier. The directed edges in Figure 2 can suggest the could-result-in relation. Since we can "sort" events by could-result-in relation, that means there is a well-defined frontier of the events that have no existing predecessors (events within dashed rectangle are in the frontier). And this frontier march forward as events are delivered. So the frontier represents the global progress in a system. When a undelivered vertex notification finally makes it into frontier, that notification is safe to deliver. Because by the way we defined could-result-in order, there is no way that the vertex could ever receive a message with earlier time. That means tracking this frontier is crucial. In Naiad, a scheduler does this thing. A scheduler maintains a set of active events(pointstamp), which are those that correspond to at least one unprocessed event. For each active pointstamp the scheduler maintains two counts: occurrence count and precursor count. Figure 3 show the set.

Figure 3

.
.
.
I will continue writing/add something for this blog.