Breadth-First Search (BFS) & MapReduce

Breadth-First Search (BFS) on MapReduce was roughly introduced in the Distributed Computing Seminar. The graph is stored as a sparse matrix, and the shortest path is found using Map/Reduce as described below:

Finding the Shortest Path: Intuition

- We can define the solution to this problem inductively:
  - DistanceTo(startNode) = 0
  - For all nodes n directly reachable from startNode, DistanceTo(n) = 1
  - For all nodes n reachable from some other set of nodes S, DistanceTo(n) = 1 + min(DistanceTo(m), m ∈ S)
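
For instance, if node n is reachable from m1 with DistanceTo(m1) = 2 and from m2 with DistanceTo(m2) = 4, then DistanceTo(n) = 1 + min(2, 4) = 3.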


From Intuition to Algorithm

- A map task receives a node n as a key, and (D, points-to) as its value
  - D is the distance to the node from the start
  - points-to is a list of nodes reachable from n
  - ∀p ∈ points-to, emit (p, D+1)
- A reduce task gathers the possible distances to a given p and selects the minimum one (see the sketch below)
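
To make the shape of the two tasks concrete, here is a minimal sketch in Hadoop's mapreduce API. It is my own illustration rather than the seminar's code; the record format "n|D|p1;p2;..." and all class names are assumptions, chosen to match the test program later in this post.

import java.io.IOException;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;

public class ShortestPathSketch {

    // Input line: "n|D|p1;p2;..." where D is Integer.MAX_VALUE for "not reached yet".
    public static class BfsMapper extends Mapper<LongWritable, Text, Text, Text> {
        @Override
        protected void map(LongWritable offset, Text line, Context context)
                throws IOException, InterruptedException {
            String[] parts = line.toString().split("[|]");
            String node = parts[0];
            int d = Integer.parseInt(parts[1]);

            // Pass the node's own record through so its adjacency list survives.
            context.write(new Text(node), new Text(parts[1] + "|" + parts[2]));

            // ∀p ∈ points-to, emit (p, D+1) -- but only once this node is reached.
            if (d != Integer.MAX_VALUE) {
                for (String p : parts[2].split(";")) {
                    context.write(new Text(p), new Text((d + 1) + "|"));
                }
            }
        }
    }

    // Gathers the possible distances to each node and selects the minimum one.
    public static class BfsReducer extends Reducer<Text, Text, Text, Text> {
        @Override
        protected void reduce(Text node, Iterable<Text> values, Context context)
                throws IOException, InterruptedException {
            int min = Integer.MAX_VALUE;
            String links = "";
            for (Text v : values) {
                String[] parts = v.toString().split("[|]");
                min = Math.min(min, Integer.parseInt(parts[0]));
                if (parts.length > 1)
                    links = parts[1]; // only the pass-through record has links
            }
            // Same record format as the input, ready for the next iteration.
            context.write(node, new Text(min + "|" + links));
        }
    }
}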

According to the idea above, a map task receives a node n as a key and (D, points-to) as its value. I took this to mean that the "input" had to be the set of all reachable paths from 'start' to each key.

Update: that was my misunderstanding. A set of all reachable paths isn't needed. See page 9, "Adjacency Matrices":

- Another classic graph representation.
- M[i][j] = '1' implies a link from node i to j.
- Naturally encapsulates iteration over nodes

  | 1 2 3 4
--+--------
1 | 0 1 0 1
2 | 1 0 1 1
3 | 0 1 0 0
4 | 1 0 1 0

So, the graph can be represented as the adjacency list below:

1 (2, 4)
2 (1, 3, 4)
3 (2)
4 (1, 3)
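
Deriving that list from the matrix is mechanical, since row i of M simply lists the out-links of node i. A throwaway snippet (my own illustration, not from the slides):

public class AdjList {
    public static void main(String[] args) {
        // The 4x4 adjacency matrix from above; M[i][j] == 1 means node i+1 links to node j+1.
        int[][] M = {
            { 0, 1, 0, 1 },
            { 1, 0, 1, 1 },
            { 0, 1, 0, 0 },
            { 1, 0, 1, 0 },
        };
        for (int i = 0; i < M.length; i++) {
            StringBuilder links = new StringBuilder();
            for (int j = 0; j < M[i].length; j++) {
                if (M[i][j] == 1) {
                    if (links.length() > 0) links.append(", ");
                    links.append(j + 1); // nodes are numbered from 1
                }
            }
            System.out.println((i + 1) + " (" + links + ")"); // prints "1 (2, 4)" etc.
        }
    }
}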

Then, the MR job iterates 'hop' times (= the maximum distance). Let's assume the start node is "1".

First MR job:

1 D=0, points-to=(2, 4)
2 D=inf, points-to=(1, 3, 4)
3 D=inf, points-to=(2)
4 D=inf, points-to=(1, 3)

Result: (2,1), (4,1)

Second MR job:

1 D=0, points-to=(2, 4)
2 D=1, points-to=(1, 3, 4)
3 D=inf, points-to=(2)
4 D=1, points-to=(1, 3)

Result: (2,1), (3,2), (4,1)
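
In a real job chain the maximum distance usually isn't known up front, so instead of hard-coding the 'hop' count, the driver can re-run the pass until nothing changes. A rough sketch, assuming a hypothetical buildBfsJob() helper and a "BFS"/"UPDATED" counter that the reducer would increment whenever it lowers a distance (neither exists in the test code below):

int hop = 0;
boolean changed = true;
while (changed) {
    Job job = buildBfsJob("bfs-pass-" + hop);  // hypothetical job setup helper
    job.waitForCompletion(true);
    // Stop once a full pass lowers no distance.
    changed = job.getCounters().findCounter("BFS", "UPDATED").getValue() > 0;
    hop++;
}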

Here's my test code:

import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;

public class BFS {
    static Map<String, ArrayList<String>> collect = new HashMap<String, ArrayList<String>>();

    public static void main(String[] args) {
        String[] value = {
                // key | distance | points-to (Integer.MAX_VALUE stands for "not reached yet")
                "1|0|2;4",
                "2|" + Integer.MAX_VALUE + "|1;3;4",
                "3|" + Integer.MAX_VALUE + "|2",
                "4|" + Integer.MAX_VALUE + "|1;3",
        };

        mapper(value);
        reducer(collect.entrySet());
    }

    private static void mapper(String[] value) {
        for (int i = 0; i < value.length; i++) {
            String line = value[i];
            String[] keyVal = line.split("[|]");

            String key = keyVal[0];
            String sDist = keyVal[1];
            if (keyVal.length > 2) {
                String[] links = keyVal[2].split(";");
                int dist = Integer.parseInt(sDist);

                // Don't increment the "infinity" sentinel; it propagates as-is.
                if (dist != Integer.MAX_VALUE)
                    dist++;

                // ∀p ∈ points-to, emit (p, D+1).
                for (int x = 0; x < links.length; x++) {
                    if (!links[x].isEmpty()) {
                        ArrayList<String> list;
                        if (collect.containsKey(links[x])) {
                            list = collect.get(links[x]);
                        } else {
                            list = new ArrayList<String>();
                        }

                        list.add(dist + "|");
                        collect.put(links[x], list);
                    }
                }

                // Re-emit the node's own record so its adjacency list survives.
                ArrayList<String> list;
                if (collect.containsKey(key)) {
                    list = collect.get(key);
                } else {
                    list = new ArrayList<String>();
                }
                list.add(sDist + "|" + keyVal[2]);
                collect.put(key, list);
            }
        }
    }

    private static void reducer(Set<Entry<String, ArrayList<String>>> entrySet) {
        for (Map.Entry<String, ArrayList<String>> e : entrySet) {
            Iterator<String> values = e.getValue().iterator();
            int minDist = Integer.MAX_VALUE;
            String linkList = "";

            while (values.hasNext()) {
                // A value is either "D|" (a candidate distance) or "D|p1;p2;..."
                // (the node's own record, carrying its points-to list).
                String[] distLinks = values.next().split("[|]");
                if (distLinks.length > 1)
                    linkList = distLinks[1];

                // Select the minimum of the gathered distances.
                minDist = Math.min(minDist, Integer.parseInt(distLinks[0]));
            }

            System.out.println(e.getKey() + " - D " + minDist + " | " + linkList);
        }
    }
}
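
For reference, one pass over that input prints something like the following (row order may vary because collect is a HashMap; 2147483647 is the Integer.MAX_VALUE "infinity" sentinel):

1 - D 0 | 2;4
2 - D 1 | 1;3;4
3 - D 2147483647 | 2
4 - D 1 | 1;3

Feeding these rows back in (reformatted as key|distance|points-to) is exactly the second MR job traced above, which finally reaches node 3 with D=2.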

14 comments:

  1. Great! Really awesome. I started with Hadoop a few months ago, and now I'm exploring Hama. Cool project. I've had quite a few ideas in the past that would involve huge sparse matrices (millions x millions); I hope this solves my problems.
    Thanks for the project, dude!

  2. I thank you for your time and I hope to hear your advice. ;)

  3. Just a comment: the row URLs and anchor family of the webtable mentioned in the BigTable paper have the same structure as the above. It's the web-link graph, represented as an adjacency matrix.

  4. As far as I understand, this BFS method will have to run O(D) iterations, where D is the diameter of the graph. Consider a graph that is formed like a linked list (a chain). Running BFS on this graph where the source is at the start of the list will force it to run N iterations of map/reduce; that is, N is the diameter of the graph. This makes map/reduce NOT suitable for processing graphs with a large diameter. The same is true for finding shortest paths: they have to run in O(D) map/reduce iterations.

    Do you know any tricks for finding shortest paths that require fewer than O(D) map/reduce iterations?

  5. Yeah, right. It requires O(D) M/R iterations.

    I think there is no better solution. Even if the iterations were reduced, it would cause huge network costs. In other words, the M/R programming model isn't a good fit for graph computing.

    There is Google Pregel for large-scale graph computing, and the BSP package of Apache Hama -- http://blog.udanax.org/2009/12/bsp-package-of-hama-on-hadoop-is-now.html

    I hope this is of some help. Don't hesitate to ask further details.

  6. I saw the Apache Hamburg and Hama presentation slides:

    http://wiki.apache.org/hama/Presentations?action=AttachFile&do=get&target=Hamburg-Hadoop.pdf

    http://wiki.apache.org/hama/BulkSynchronizationParallel?action=AttachFile&do=get&target=Apache_HAMA_BSP.pdf

    They are great! BSP seems to help reduce the number of M/R iterations for BFS by grouping some vertices into a "super node" and processing them in a "super step", so each iteration may expand the frontier by more than one hop.

    I saw partial code in the slides on how to do breadth-first search using Hama. However, it'd be nice to see the full working code, actually try it, and benchmark it against regular (non-BSP) M/R. The BFS wiki page for Hama is not available yet (http://wiki.apache.org/hama/BFS).

    Can you give some clues about the Hama project? How mature is it right now?

    Thanks!

  7. Website: http://incubator.apache.org/hama/

  8. I'm thinking that BSP can be done in plain M/R if there is a way to "group" the values for a certain key range for the Mapper (we know that in the Reducer there is a way to group values). Here is how to simulate BSP in regular M/R:

    Currently, in regular M/R, each map() function takes and processes exactly ONE key-value pair at a time. Now we want to modify the map() function so that it takes SEVERAL key-value pairs (so that they can be processed in "bulk"), thus simulating BSP ;) To group several key-value pairs together, we need a Mapper GROUPING function that tells which key-value pairs belong to one group. The Mapper map function API has to be changed to something like:

    map(
        KeyRangeWritable keyRange,
        List values,
        OutputCollector output,
        Reporter reporter
    )

    The "keyRange" contains a pair of key: start key and end key of this group.
    The "values" contains the actual values for the keys inbetween the start and end of the "keyRange" above.
    The "output" will collect INDIVIDUAL KEY that is emmitted from this map.
    The "reporter" is the same as the old reporter.

    So now each map() processes a group of key-value pairs; thus it's like BSP and can do the bulk processing as in slides #11 and #17 of:

    http://wiki.apache.org/hama/Presentations?action=AttachFile&do=get&target=Hamburg-Hadoop.pdf

    One way to group the key-value pairs is to first sort the input on the key, and then define a Group function (like the Reducer grouping values in SecondarySort) that tells the range of keys for each group. Complex grouping can be done by defining a customized sort function for the key-value pairs and then using a grouping function based on that order.

    So, how does Hama work to provide BSP? Are there any papers/articles about it?

  9. http://blog.udanax.org/2009/07/hamburg-graph-computing-framework-on.html

    This post could be helpful for you. In the M/R case, it has to iterate 3 times. That algorithm can't be implemented on the M/R framework.

    I agree that they are similar on one side but, if you look closely, they differ in the synchronization/combining of data. Each node should know whom it should communicate with.

    >> So, how does Hama works to provide BSP? Is there any paper/articles of it?

    Yes, but we're still in progress. :)

  10. I cannot draw the graph of this example; I believe there is a mistake.
    When "3 (2)", how is "4 (1, 3)" possible? In other words, node 3 can reach node 2, and node 4 can reach nodes 1 and 3. Then why can't node 3 reach node 4?

  11. It seems there's a typo: 4 (1, 2), not 4 (1, 3).

  12. This comment has been removed by the author.

  13. Hi Edward J. Yoon, I'm looking at applying MapReduce to board games like Go, Othello, etc. I want to use MapReduce for Monte Carlo Tree Search and Minimax algorithms in board games such as Gomoku, Go, Connect6... I hope you can help. Could you give me some examples? For example, the Tic-Tac-Toe game?
    Looking forward to your help.
    Thank you very much!

  14. I realize that this content is worth reading. I expect to see more posts from you that impress me just like this one. Good job!!
