Wednesday, November 11, 2015

Timely Dataflow Framework for Naiad

The Timely Dataflow Model (TDM) is the computation model for Naiad. Its key feature is that it adds loop structures to the traditional stream processing model (such as MillWheel). The following picture shows the extended structure:


  •  Graph structure


Timely dataflow graphs are directed graphs with the constraint that the vertices are organized into possibly nested loop contexts, with three associated system-provided vertices:
                  ingress vertex, 
                  egress vertex,
                  feed-back vertex
Edges entering a loop context must pass through an ingress vertex, and edges leaving a loop context must pass through an egress vertex. Additionally, every cycle in the graph must be contained entirely within some loop context and include at least one feed-back vertex that is not nested within any inner loop context.
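The cycle rule above can be checked mechanically. Here is a minimal Python sketch, assuming the graph is given as a plain list of (source, target) pairs and that vertex names such as B and C are made up for illustration. It only tests the "every cycle includes a feed-back vertex" part, by removing the feed-back vertices and checking that what remains is acyclic; it does not check loop-context nesting.

```python
from collections import defaultdict, deque


def every_cycle_has_feedback(edges, feedback_vertices):
    """Return True if removing the feed-back vertices leaves an acyclic graph,
    i.e. every cycle passes through at least one feed-back vertex."""
    successors = defaultdict(list)
    indegree = defaultdict(int)
    nodes = set()
    for source, target in edges:
        # Drop every edge that touches a feed-back vertex.
        if source in feedback_vertices or target in feedback_vertices:
            continue
        successors[source].append(target)
        indegree[target] += 1
        nodes.update((source, target))

    # Kahn's algorithm: a graph is acyclic iff all nodes can be peeled off
    # in topological order.
    queue = deque(n for n in nodes if indegree[n] == 0)
    visited = 0
    while queue:
        node = queue.popleft()
        visited += 1
        for nxt in successors[node]:
            indegree[nxt] -= 1
            if indegree[nxt] == 0:
                queue.append(nxt)
    return visited == len(nodes)


# Hypothetical loop: A (ingress) -> B -> C -> E (egress), with C -> D -> B
# closing the cycle through the feed-back vertex D.
loop_edges = [("A", "B"), ("B", "C"), ("C", "D"), ("D", "B"), ("C", "E")]
print(every_cycle_has_feedback(loop_edges, {"D"}))
print(every_cycle_has_feedback(loop_edges, set()))
```

The first call treats D as a feed-back vertex, so the only cycle is broken; the second call declares no feed-back vertex, so the cycle B -> C -> D -> B violates the rule.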
  • New timestamp
Since a message can enter a loop (iteration), a single integer is not enough to track a message's state. For example, in traditional stream processing a message with timestamp t=2 is usually processed before a message with timestamp t=3. With the loop/iteration structure of the timely dataflow model, however, we cannot guarantee that. So how do we handle this? The answer is that TDM employs a new type of logical timestamp, which has the following form:
                  Timestamp: (e, <c1, ..., ck>), where e is the input epoch and c1, ..., ck are loop counters
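We can see that there is one loop counter for each of the k loop contexts. The ingress, egress, and feedback vertices act only on the timestamps of messages passing through them. These vertices adjust incoming timestamps as follows:
                  ingress:  (e, <c1, ..., ck>)       ->  (e, <c1, ..., ck, 0>)
                  egress:   (e, <c1, ..., ck, ck+1>) ->  (e, <c1, ..., ck>)
                  feedback: (e, <c1, ..., ck>)       ->  (e, <c1, ..., ck + 1>)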
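To make the timestamp adjustments concrete, here is a minimal Python sketch. The class and function names are my own and are not Naiad's API. The adjustment rules in the docstrings, and the ordering (epochs compared as integers, loop counters compared lexicographically), follow my reading of the Naiad paper.

```python
from dataclasses import dataclass
from typing import Tuple


@dataclass(frozen=True)
class Timestamp:
    epoch: int                      # e: the input epoch
    counters: Tuple[int, ...] = ()  # <c1, ..., ck>: one counter per enclosing loop


def ingress(t: Timestamp) -> Timestamp:
    """Entering a loop context: append a new loop counter starting at 0."""
    return Timestamp(t.epoch, t.counters + (0,))


def egress(t: Timestamp) -> Timestamp:
    """Leaving a loop context: drop the innermost loop counter."""
    if not t.counters:
        raise ValueError("egress needs at least one loop counter")
    return Timestamp(t.epoch, t.counters[:-1])


def feedback(t: Timestamp) -> Timestamp:
    """Going around the loop once: increment the innermost loop counter."""
    if not t.counters:
        raise ValueError("feedback needs at least one loop counter")
    return Timestamp(t.epoch, t.counters[:-1] + (t.counters[-1] + 1,))


def less_equal(a: Timestamp, b: Timestamp) -> bool:
    """Order for timestamps in the same loop context."""
    # Python compares tuples lexicographically, which is what we need here.
    return a.epoch <= b.epoch and a.counters <= b.counters


outside = Timestamp(epoch=2)
first_pass = ingress(outside)      # epoch 2, counters (0,)
second_pass = feedback(first_pass)  # epoch 2, counters (1,)
assert less_equal(first_pass, second_pass)
assert egress(second_pass) == outside
```

Note how a message from epoch 2 on its second pass through the loop and a message from epoch 3 on its first pass are not ordered by this relation in either direction by epoch alone, which is why a single integer is not enough.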
  • Vertex Computation

As in other dataflow models, the main programming element in a timely dataflow program is the vertex. A timely dataflow vertex is a possibly stateful object that sends and receives messages, and requests and receives notifications.
       -- Each vertex v implements two callbacks: v.ONRECV(e, m, t) and v.ONNOTIFY(t)
       -- A vertex may invoke two system-provided methods in the context of these callbacks: v.SENDBY(e, m, t) and v.NOTIFYAT(t)
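Here is a small Python sketch of what a vertex looks like under this interface. It is loosely modeled on the distinct-count example in the Naiad paper, but everything here is an assumption for illustration: the snake_case method names, the edge names "distinct" and "counts", and the base class, which only records calls instead of being a real runtime.

```python
from collections import defaultdict


class Vertex:
    """Toy stand-in for the system side: it just records what the vertex asks for."""

    def __init__(self):
        self.sent = []       # (edge, message, time) triples passed to send_by
        self.requested = []  # times passed to notify_at

    # System-provided methods a vertex may call from its callbacks.
    def send_by(self, edge, message, time):
        self.sent.append((edge, message, time))

    def notify_at(self, time):
        if time not in self.requested:
            self.requested.append(time)

    # Callbacks that each vertex implements.
    def on_recv(self, edge, message, time):
        raise NotImplementedError

    def on_notify(self, time):
        raise NotImplementedError


class DistinctCount(Vertex):
    """Emits each distinct message right away, and its count once the time is complete."""

    def __init__(self):
        super().__init__()
        self.counts = defaultdict(dict)  # time -> {message: count}

    def on_recv(self, edge, message, time):
        seen = self.counts[time]
        if not seen:
            # First message for this time: ask to be told when the time is done.
            self.notify_at(time)
        if message not in seen:
            seen[message] = 0
            # Asynchronous, low-latency output.
            self.send_by("distinct", message, time)
        seen[message] += 1

    def on_notify(self, time):
        # Synchronous output: no more messages for `time` can arrive.
        for message, count in self.counts.pop(time).items():
            self.send_by("counts", (message, count), time)


v = DistinctCount()
for word in ["a", "b", "a"]:
    v.on_recv("in", word, 1)
v.on_notify(1)
print(v.sent)
```

The vertex uses on_recv for the work that can happen immediately and on_notify for the work that has to wait until a time is complete.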


At this point, one thing we must bear in mind is that TDM combines low-latency asynchronous message flow with lightweight synchronous coordination when required.
  • Could-result-in relation (progress tracking protocol)
By now we know that notification in TDM is synchronous. That means v.ONNOTIFY(t) cannot be invoked until vertex v has received all messages with a time less than or equal to t. The Naiad paper defines a new type of timestamp called a pointstamp to help guarantee that notifications are delivered correctly. The pointstamp is defined as follows:
                  Pointstamp: (t, l), where t is a timestamp and l is a location (an edge or a vertex)
The SENDBY(e,m,t) and NOTIFYAT(t) methods each generate a new event:

For v.SENDBY(e,m,t) the pointstamp of m is (t,e)
For v.NOTIFYAT(t) the pointstamp of the notification is (t,v)
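As a small illustration of these two rules, here is a Python sketch that builds the pointstamp for each kind of event. The EventLog class and the edge and vertex names are hypothetical; timestamps are written as (epoch, loop counters) pairs.

```python
from typing import List, NamedTuple, Tuple


class Pointstamp(NamedTuple):
    time: Tuple[int, Tuple[int, ...]]  # (epoch, loop counters)
    location: str                      # name of an edge or of a vertex


class EventLog:
    """Hypothetical recorder of outstanding events, one pointstamp per event."""

    def __init__(self) -> None:
        self.outstanding: List[Pointstamp] = []

    def send_by(self, edge: str, message: object, time) -> Pointstamp:
        # A sent message lives on the edge that carries it: (t, e).
        p = Pointstamp(time, edge)
        self.outstanding.append(p)
        return p

    def notify_at(self, vertex: str, time) -> Pointstamp:
        # A requested notification lives at the requesting vertex: (t, v).
        p = Pointstamp(time, vertex)
        self.outstanding.append(p)
        return p


log = EventLog()
log.send_by("B->C", "hello", (1, (0,)))
log.notify_at("C", (1, (0,)))
print(log.outstanding)
```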
I think this is the toughest part of the Naiad paper to understand! I'd like to use a concrete example to illustrate the could-result-in relation.
Figure 1
Let's say we have a dataflow graph as shown in Figure 1. A is the ingress vertex, E is the egress vertex, and D is the feed-back vertex. Messages in a timely dataflow system flow only along edges, and their timestamps are modified by ingress, egress, and feedback vertices. We discussed this constraint earlier in this post. In Figure 1, each outstanding event (SENDBY, NOTIFYAT) has a timestamp. We can "sort" these events according to a partial order called could-result-in. Please look at Figure 2.
Figure 2
The idea is that a message could result in a notification if there exists a chain of events in the system that starts with delivering that message and ends with a message arriving at the same location as the notification, at the same time or earlier. The directed edges in Figure 2 show the could-result-in relation.
Since we can "sort" events by the could-result-in relation, there is a well-defined frontier of events that have no existing predecessors (the events within the dashed rectangle are in the frontier). This frontier marches forward as events are delivered, so it represents the global progress of the system. When an undelivered vertex notification finally makes it into the frontier, that notification is safe to deliver: by the way we defined the could-result-in order, there is no way the vertex could ever receive a message with an earlier time.
That means tracking this frontier is crucial. In Naiad, a scheduler does this. The scheduler maintains a set of active pointstamps, which are those that correspond to at least one unprocessed event. For each active pointstamp the scheduler maintains two counts: an occurrence count (how many outstanding events bear that pointstamp) and a precursor count (how many active pointstamps precede it in the could-result-in order). Figure 3 shows the set.

Figure 3
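The bookkeeping can be sketched in a few lines of Python. This is my own simplified reading of the paper, not Naiad's scheduler: it is single-threaded, the class and method names are made up, and the could-result-in test is passed in as a function. To keep the example short, the test used here is for a loop-free pipeline A -> B with plain integer times, so one pointstamp could result in another when its location is not further downstream and its time is not later.

```python
from typing import Callable, Dict, NamedTuple, Set


class Pointstamp(NamedTuple):
    time: int      # simplified: a plain integer, no loop counters
    location: str  # an edge or a vertex


class ProgressTracker:
    def __init__(self, could_result_in: Callable[[Pointstamp, Pointstamp], bool]):
        self.could_result_in = could_result_in
        self.occurrence: Dict[Pointstamp, int] = {}  # outstanding events per pointstamp
        self.precursor: Dict[Pointstamp, int] = {}   # active pointstamps that precede it

    def add(self, p: Pointstamp) -> None:
        """Call for each SENDBY or NOTIFYAT event."""
        if p not in self.occurrence:
            # p becomes active: count its precursors, and become a precursor of others.
            self.precursor[p] = sum(
                1 for q in self.occurrence if self.could_result_in(q, p))
            for q in self.occurrence:
                if self.could_result_in(p, q):
                    self.precursor[q] += 1
            self.occurrence[p] = 0
        self.occurrence[p] += 1

    def remove(self, p: Pointstamp) -> None:
        """Call once the matching ONRECV or ONNOTIFY has run."""
        self.occurrence[p] -= 1
        if self.occurrence[p] == 0:
            # p becomes inactive: it no longer precedes anything.
            del self.occurrence[p]
            del self.precursor[p]
            for q in self.occurrence:
                if self.could_result_in(p, q):
                    self.precursor[q] -= 1

    def frontier(self) -> Set[Pointstamp]:
        return {p for p, count in self.precursor.items() if count == 0}


# Toy could-result-in for the pipeline A -> B (an assumption, no loops).
ORDER = ["A", "A->B", "B"]


def pipeline_could_result_in(p: Pointstamp, q: Pointstamp) -> bool:
    return ORDER.index(p.location) <= ORDER.index(q.location) and p.time <= q.time


tracker = ProgressTracker(pipeline_could_result_in)
message = Pointstamp(1, "A->B")  # a message in flight
notify1 = Pointstamp(1, "B")     # B asked for a notification at time 1
notify2 = Pointstamp(2, "B")     # and at time 2
for p in (message, notify1, notify2):
    tracker.add(p)

assert tracker.frontier() == {message}  # the notifications must wait
tracker.remove(message)                 # the message is delivered
assert tracker.frontier() == {notify1}  # now time 1 is safe to notify
tracker.remove(notify1)
assert tracker.frontier() == {notify2}
```

A notification is delivered only when its pointstamp reaches the frontier, that is, when its precursor count drops to zero.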

.
.
.
I will continue adding to this post.