Implement a distributed vertex program engine. This should run both our own GAS programs and the TinkerPop3 vertex program API.
This can be a callable distributed to each node in the cluster, parameterized by the GAS algorithm. The SPO(C) and O(C)SP indices provide forward and reverse traversal indices (aka out-edges and in-edges respectively). The decomposition of the GAS algorithm should use low-latency communications since we will need to provide efficient support for a large number of in-flight fine-grained operations. This could evolve into an async engine easily enough, and we might even want to build that in from the start since it will improve performance significantly and align closely with our MapGraph goals.
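As a concrete starting point, a minimal sketch of the parameterized GAS program contract that the distributed callable would execute is shown below. The interface and the toy `DegreeCount` instance are illustrative assumptions, not the actual bigdata or TinkerPop3 APIs:

```java
import java.io.Serializable;

// Hypothetical GAS program contract; the real API (ours or TinkerPop3's)
// will differ, this only illustrates the gather/sum/apply/scatter shape.
interface GASProgram<VS, ES, M> extends Serializable {
    /** Gather: combine an edge and its state with the remote vertex state. */
    M gather(VS sourceState, ES edgeState, VS remoteState);
    /** Binary operator used to reduce the gathered values. */
    M sum(M left, M right);
    /** Apply: fold the reduced gather result into the vertex state. */
    VS apply(VS state, M reduced);
    /** Scatter: decide whether to activate the remote vertex. */
    boolean scatter(VS state, ES edgeState, VS remoteState);
}

// Toy instance: count in-edges (gather emits 1 per edge; apply stores the sum).
class DegreeCount implements GASProgram<Integer, Void, Integer> {
    public Integer gather(Integer s, Void e, Integer r) { return 1; }
    public Integer sum(Integer a, Integer b) { return a + b; }
    public Integer apply(Integer state, Integer reduced) { return reduced; }
    public boolean scatter(Integer s, Void e, Integer r) { return false; }
}
```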
We probably need to use the MemStore for the compute state on the nodes to avoid heap pressure (this is also true for the single machine gas engine which currently uses a ConcurrentHashMap).
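To make the heap-pressure point concrete, here is a simplified illustration of keeping fixed-width vertex state off the Java heap in a direct buffer (standing in for the MemStore) rather than in a ConcurrentHashMap of boxed objects. The slot layout and sizing are assumptions:

```java
import java.nio.ByteBuffer;

// Fixed-width per-vertex state in native memory. Direct buffers are
// zero-initialized and live outside the garbage-collected heap.
class OffHeapVertexState {
    private static final int SLOT = 8;    // assume one long of state per vertex
    private final ByteBuffer buf;
    OffHeapVertexState(int nvertices) {
        buf = ByteBuffer.allocateDirect(nvertices * SLOT);
    }
    void set(int localVertexIndex, long state) {
        buf.putLong(localVertexIndex * SLOT, state);
    }
    long get(int localVertexIndex) {
        return buf.getLong(localVertexIndex * SLOT);
    }
}
```

The real MemStore-backed version would handle allocation scopes and variable-width state; this only shows the basic off-heap access pattern.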
Scale-out stores each edge multiple times (3 times for triples, 6 times for quads). The SPO(C) shards correspond to the out-edges. The O(C)SP shards correspond to the in-edges. Some consequences of this design:
- The shards for the in-edges and out-edges are NOT co-located.
- Note: the 2D decomposition currently used by MapGraph would co-locate those shards. So does the 4-store architecture. Either we should plan to change the way in which we handle shard splits in the future to bring the system into alignment with the 2D architecture of MapGraph -or- get clarity about the real target data layout for MapGraph and converge on that (which is not a 2D layout and might actually be closer to what we are already doing).
- Vertex state needs to be mapped onto DS or CS nodes based on something such as the hash of the vertex id. This means that it will basically never be co-located with the edges for the vertex.
- Low-latency, continuous streaming of small messages will be important for the scalability of the computation since edges are essentially always remote.
- We could run the vertex computation on the CS nodes. If we also key-range partition over the CS nodes on hash(S|O), then we could co-locate the computation state for edges.
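The bullets above turn on where hash-placed vertex state lands. A minimal sketch of the placement function, assuming hash(S|O) is simply the hash of the vertex identifier over the node count (the real partitioning function is still to be defined):

```java
// Sketch of placing vertex state by hashing the vertex id over the node
// count. Using Long.hashCode as hash(S|O) is an illustrative assumption.
class VertexPlacement {
    private final int nnodes;
    VertexPlacement(int nnodes) { this.nnodes = nnodes; }
    /** Index of the node covering the state for the given vertex id. */
    int nodeFor(long vertexId) {
        int h = Long.hashCode(vertexId);
        return Math.floorMod(h, nnodes);  // floorMod handles negative hashes
    }
}
```

Because this placement is independent of the key-range sharding of SPO(C)/O(C)SP, a vertex's state will essentially never be co-located with its edges, which is exactly the co-location problem the bullets describe.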
The Gather and Scatter operations need to be decomposed over the DS nodes for the SPO(C) and O(C)SP indices (depending on whether we are reading on the in-edges or out-edges). There are two ways of approaching this, depending on whether we make the DS nodes responsible for the computation (and for storing the computation state) or concentrate the computation on the CS nodes, with the DS nodes only doing reads. Since both CS and DS nodes implement the discoverable IRemoteExecutor service, we could implement under the assumption that the computation runs on the CS nodes while still allowing it to be executed on the DS nodes. There are some tradeoffs that need to be explored for the CS vs DS decision, including:
- If we are allocating CS nodes for bulk load, then they could readily be reused for vertex programs, and the overall system might be more amenable to concurrent vertex program execution since we could simply allocate more CS nodes (although the read demand on the DS nodes would still scale with the number of concurrent vertex programs);
- The DS nodes have local access to the ISPO. This would let us avoid sending the edges for the scatter and gather operations (the DS node would read the remote vertex state from whatever node covers that vertex state). It also co-locates the edge state with the edge, which eliminates NIO for the edge state. These reductions in NIO are probably quite important. Overall, I think that we are better off refactoring the bulk-loader to run on the DS nodes than using the CS nodes to execute GAS algorithms.
- decided: The GAS execution will occur on the DS nodes for better scalability and significantly less NIO.
- decided: The vertex state needs to be hashed across the DS nodes (hash(S|O)) and should be maintained in direct memory buffers to reduce heap pressure.
- decided: The edge state will be 1:1 with the DS node having the in-edge or out-edge shard (and there will be distinct state for the in-edge and the out-edge).
- decided: We need to have queues for the message, vertex, and edge operations to support lots of in-flight messages and asynchronous communications.
- The vertex state, edge-state, and frontier SHOULD be maintained in direct memory buffers to reduce heap pressure.
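The decided bullets call for queues decoupling the message, vertex, and edge operations so that many fine-grained operations can be in flight at once. A minimal per-node sketch using java.util.concurrent, with hypothetical message types and capacities:

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

// Per-node queues decoupling receipt of fine-grained operations from their
// processing. The message shapes and bounds are illustrative assumptions;
// the real messages would carry IVs and algorithm-specific state.
class NodeQueues {
    record VertexOp(long vertexId, long newState) {}
    record EdgeOp(long edgeId, long newEdgeState) {}
    final BlockingQueue<VertexOp> vertexOps = new ArrayBlockingQueue<>(64 * 1024);
    final BlockingQueue<EdgeOp> edgeOps = new ArrayBlockingQueue<>(64 * 1024);
}
```

Bounded queues also give natural back-pressure when a node falls behind on its in-flight work.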
Scatter/Gather: Note: A GAS algorithm may include a scatter phase, gather phase, or both:
- The node covering the source vertex state sees that the vertex is active in the frontier. The node distributes a request over the DS nodes spanning the shards for the in-edges, out-edges, or both for that source vertex.
- The DS reads those edges from the local shards and combines them with the (local) edge state for the computation.
- The DS node reads the remote vertex state (this will nearly always be on a remote node). Note: This is done for both gather and scatter for simplicity, and because sending the edge + edgeState required by a scatter operation nearly always costs more network bandwidth than reading the remote vertex state. The only catch is that we cannot implement atomic scatter updates of the remote vertex unless we send the edge + edgeState to the node covering the remote vertex. If that is required, then we need to change this to use a different pattern for gather vs scatter.
- The node dequeues the remote vertex state. At this point it has on hand the source vertex state, edge, edge state, and remote vertex state:
- Gather: The node computes the gather over the edge, edge state, and remote vertex state and updates the local aggregation of the gather (binary operator).
- Scatter: The node computes the scatter over the edge, edge state, and remote vertex state:
- If the scatter needs to update the edge state, then it modifies the local edge state (this is basically free). Note: If we need the forward and reverse edges to have the same edge state, then we need to enqueue a message with the new edge state for the reverse edge.
- If the scatter needs to enqueue the vertex and/or modify its state, the DS node sends the updated vertex state back to the remote node.
- BSP: Once the gather phase is done, the node covering the state for a vertex will run apply() and then the optional scatter phase.
- ASP: apply() will be triggered periodically based on a work queue of vertices for which updates have been received.
- BSP: There needs to be a global barrier for each iteration and we need to be able to tell when all in-flight messages for the current iteration are done.
- ASP: The algorithm continues to execute speculatively and asynchronously. We need a way to decide that all nodes are converged. Once this decision is reached, we interrupt the execution of the algorithm and report the current result.
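Ignoring the distribution and queuing described in the steps above, the BSP phase ordering (gather over in-edges, barrier, then apply) can be shown as a sequential reference. The adjacency-map layout and the sum combiner are purely illustrative:

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Sequential reference of one BSP round: gather + sum over in-edges, then
// apply(). Reading from 'state' and writing to 'next' models the barrier
// between iterations; toy data layout, not the sharded one described above.
class BspRoundSketch {
    /** One round where newState(v) = sum of the states of v's in-neighbors. */
    static Map<Long, Long> round(Map<Long, List<Long>> inEdges, Map<Long, Long> state) {
        Map<Long, Long> next = new HashMap<>(state);
        for (var e : inEdges.entrySet()) {
            long acc = 0;                                    // gather + sum
            for (long u : e.getValue()) acc += state.getOrDefault(u, 0L);
            next.put(e.getKey(), acc);                       // apply()
        }
        return next;  // the next iteration reads 'next' after the barrier
    }
}
```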
In order to implement BSP (vs ASP) semantics, we would need to include the iteration number in the tuple sent to the node covering the remote vertex and in the results aggregated by the node covering the source vertex.
TODO Both BSP and ASP require us to track the #of in-flight messages so we can decide when all nodes are done. This can be done by the clients using CAS updates (e.g., a striped atomic long class).
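The striped atomic long the TODO mentions exists in the JDK as java.util.concurrent.atomic.LongAdder. A sketch of in-flight tracking built on it, with the caveat that sum() is only a moment-in-time estimate under concurrent updates, so a real termination protocol would still need a quiescence check (e.g., re-reading after the queues drain):

```java
import java.util.concurrent.atomic.LongAdder;

// In-flight message counter using a striped adder to avoid CAS contention
// on a single hot cell. maybeDone() is advisory, not a barrier.
class InFlight {
    private final LongAdder counter = new LongAdder();
    void sent()     { counter.increment(); }   // on enqueue for output
    void received() { counter.decrement(); }   // on completion at the receiver
    boolean maybeDone() { return counter.sum() == 0; }
}
```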
Step 1: Analyze the description above. Reduce to a network flow / message diagram. Compare with network flow / message diagrams for 2D MapGraph and asynchronous MapGraph engines. Compare with BG/Q asynchronous hybrid data layout paper in terms of message and data volume. Look at how we will make termination decisions.
Step 2: Implement BSP/ASP engines based on queuing and lightweight messages with lots of in-flight messages and low per-message overhead to reduce latency as much as possible at the application tier. Look for opportunities to coalesce messages while they are enqueued for output (per target node) and when they are enqueued on the receiver. For Java, we want to use over-subscription of threads to hide latency for both disk access (AP scans for in-edge and out-edge lists) and memory access (for vertex and edge state data stored on the native heap in managed NIO buffers). Those buffers should be scoped to the specific vertex program instance so that when the task is cancelled / completes, all memory is immediately recovered.
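The per-target-node coalescing opportunity in Step 2 can be sketched as follows: multiple pending updates to the same remote vertex are combined with the algorithm's binary operator before the batch is flushed. Using min as the combiner is an illustrative choice (e.g., BFS/SSSP-style programs):

```java
import java.util.HashMap;
import java.util.Map;

// Output-side coalescing buffer for one target node: updates to the same
// remote vertex collapse in place, so fewer messages cross the wire.
class OutputCoalescer {
    private final Map<Long, Long> pending = new HashMap<>(); // vertexId -> value
    void enqueue(long vertexId, long value) {
        pending.merge(vertexId, value, Math::min);  // combine with binary op
    }
    /** Number of distinct messages that would actually be sent on flush. */
    int pendingCount() { return pending.size(); }
    Long valueFor(long vertexId) { return pending.get(vertexId); }
}
```

The same idea applies on the receiver: combining enqueued updates before apply() reduces queue depth and per-message overhead.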
Step 2a: Explore impact of this design in terms of a fully asynchronous MapGraph engine. For example, if we dictionary encode everything (which means that the load is not embarrassingly parallel and also that we need to do dictionary materialization for everything, but we have fixed length 9 byte IVs for everything), then we could directly integrate MapGraph. (We could also not do dictionary encoding when loading data into BG and do it when lifting the data into MapGraph).
Step 3: Test suite for simple graphs (we already have this, but I am not sure yet whether we can reuse the GASEngine test suite in scale-out).
Step 4: Performance testing on large graphs. For example, for BFS we can use the same validation techniques that are used for Graph 500 runs.
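One of the Graph 500 BFS validation checks referenced in Step 4 is that every traversed edge must connect vertices whose BFS levels differ by at most one. A sketch of just that check (the other checks, such as verifying that the parent pointers form a tree rooted at the source, are omitted):

```java
import java.util.List;
import java.util.Map;

// Graph 500-style level-consistency check over a toy edge list; the real
// validation would run against the scale-out indices, not an in-memory list.
class BfsValidation {
    record Edge(long u, long v) {}
    static boolean levelsConsistent(List<Edge> edges, Map<Long, Integer> level) {
        for (Edge e : edges) {
            Integer lu = level.get(e.u()), lv = level.get(e.v());
            if (lu == null || lv == null) continue;  // unreached endpoints
            if (Math.abs(lu - lv) > 1) return false; // would violate BFS order
        }
        return true;
    }
}
```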
See http://www.tinkerpop.com/docs/3.0.0.M1/ (Tinkerpop3)
See http://www.tinkerpop.com/docs/3.0.0.M1/#implementations (Vendor implementation requirements)