27 Sep, 2011

In this tutorial we are going to create a PageRank index for Wikipedia with the use of Hadoop. This was a good hands-on exercise to get started with Hadoop. Page ranking is not a new thing, but it is a suitable use case and way cooler than a word counter! The English Wikipedia has 3.7M articles at the moment and is still growing. Each article has many links to other articles. With those incoming and outgoing links we can determine which pages are more important than others, which is basically what PageRank does.

PageRanking
Larry Page came up with the algorithm to determine the ranking of a page, built a search engine around it in 1996 and named it Google. He is now the CEO of Google, but only earns 1 dollar a year. I will try to explain the PageRank algorithm and how we will implement it.
In this example I will use 4 pages: A, B, C and D, a non-existing page. This is a page that has not been created yet, but is being linked to from C. In Wikipedia you recognize those pages as red and underlined. The links between the pages are as follows: B links to A, and C links to A and to the non-existing page D (this matches the calculations below).

Rank of A is highest, because it will get points from B and C.
PageRank of page A = ‘share’ of the PageRank of the pages linking to A.
The formula for calculating the points is as follows:

The formula can be simplified to this:
PR(A) = (1-d) + d( PR(B) / Cout(B) + … + PR(C) / Cout(C) )
The d in the formula is the damping factor that simulates ‘a random surfer’ and is usually set to 0.85; Cout(X) is the number of outgoing links of page X. If you are very interested in the details, please visit the wiki PageRank page or the PageRank explained page.
If you apply the formula to our example:
PageRank of A = 0.15 + 0.85 * ( PageRank(B)/outgoing links(B) + PageRank(…)/outgoing link(…) )
Calculation of A with initial ranking 1.0 per page:

If we use the initial rank value 1.0 for A, B and C we would have the following output:
I have skipped page D in the result, because it is not an existing page.
A: 1.425
B: 0.15
C: 0.15
Calculation of A with ranking from ITERATION-1:

If we use these ranks as input and calculate it again:
A: 0.34125
B: 0.15
C: 0.15
We see that the page rank of page A is reduced. The PageRank is based on the previous calculations and becomes more accurate after more runs. You can add new pages and new links in the future and calculate the new rankings. This is one of the tools which search engines use to create their index. We are going to do this with a set of Wikipedia pages.
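The two iterations above can be checked with a few lines of plain Java. The link structure is my reading of the example (B links to A; C links to A and to the non-existing D), and the PageRankExample class is just an illustration, not part of the Hadoop code:

```java
public class PageRankExample {

    // PR(page) = (1 - d) + d * sum( PR(in) / Cout(in) ) over all pages linking to it
    static double rank(double d, double[] inRanks, int[] outCounts) {
        double sum = 0.0;
        for (int i = 0; i < inRanks.length; i++) {
            sum += inRanks[i] / outCounts[i];
        }
        return (1 - d) + d * sum;
    }

    public static void main(String[] args) {
        double d = 0.85;
        // Iteration 1: B and C both start with rank 1.0; B has 1 outgoing link, C has 2
        double a1 = rank(d, new double[]{1.0, 1.0}, new int[]{1, 2}); // ≈ 1.425
        // Iteration 2: use the previous ranks of B and C (0.15 each, no incoming links)
        double a2 = rank(d, new double[]{0.15, 0.15}, new int[]{1, 2}); // ≈ 0.34125
        System.out.println("A after iteration 1: " + a1);
        System.out.println("A after iteration 2: " + a2);
    }
}
```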

In this tutorial I will not explain how to set up Hadoop, because I cannot explain it better than the very good yahoo-hadoop-tutorial and ebiquity-hadoop-tutorial with screenshots. I will be using the current stable version, Hadoop 0.20.2. Note: the Eclipse plugin of that version didn’t work for me; I used the latest version of the plugin instead.
So I assume you have set up a Hadoop configuration with HDFS and an Eclipse environment where you can upload files into the cluster and execute jobs against your files.

The Plan

We will split the work in three different Hadoop jobs: parsing, calculating and ordering.
Hadoop Job 1 will parse the big wiki XML into articles.
In the Hadoop map phase, get the article’s name and its outgoing links.
In the Hadoop reduce phase, get for each wiki page the links to other pages.
Store the page, initial rank and outgoing links.

Hadoop Job 2 will calculate the new pageRank.
In the mapping phase, map each outgoing link to the page with its rank and total outgoing links.
In the reduce phase calculate the new page rank for the pages.
Store the page, new rank and outgoing links.
Repeat these steps for more accurate results.

Hadoop Job 3 will map the rank and page.
Store the rank and page (ordered by rank).
See the top 10 pages!

If you use the code in your IDE, you will notice that lots of the classes are marked as deprecated. In this example I use the old API, from before 0.20.x. There is the new API (org.apache.hadoop.mapreduce) and the old API (org.apache.hadoop.mapred). Most examples I found on the internet were based on the old API, that’s why I used it here. The changes can be found in the new Hadoop API 0.21. It should not be very difficult to change to the new API.

Let’s take a look at the structure of a page. A page can be downloaded as an XML file by adding Special:Export to the URL. E.g. to get the XML for the wiki page about Hilversum:
Wiki Hilversum

[xml title="Hilversum.xml"]
<mediawiki xmlns="https://www.mediawiki.org/xml/export-0.5/"
xmlns:xsi="https://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="https://www.mediawiki.org/xml/export-0.5/
https://www.mediawiki.org/xml/export-0.5.xsd"
version="0.5" xml:lang="en">
<siteinfo>
<sitename>Wikipedia</sitename>
<base>https://en.wikipedia.org/wiki/Main_Page</base>
<generator>MediaWiki 1.17wmf1</generator>
<case>first-letter</case>
<namespaces>
<namespace key="-2" case="first-letter">Media</namespace>
...
</namespaces>
</siteinfo>
<page>
<title>Hilversum</title>
<id>13686</id>
<revision>
<id>449460543</id>
<timestamp>2011-09-10T06:42:48Z</timestamp>
<contributor>
<id>7283012</id>
</contributor>
<comment>Hilversum vlag.svg</comment>
<text xml:space="preserve" bytes="13996">
... the page latest revision content with [[LINKS]],
links can point to other pages, files, external sites etc...
</text>
</revision>
</page>
</mediawiki>
[/xml]

It is a fairly simple XML structure with some siteinfo metadata and the page with its latest revision. The main parts we are interested in are within the title and text tags. Download the XML and place it in your HDFS in the /user/[hostname]/[user]/wiki/in dir. When you run the job, the console output will show the location where the input file is expected, so after the first run you can put the files in the correct directory.

11/09/19 12:02:08 INFO jvm.JvmMetrics: Initializing JVM Metrics with processName=JobTracker, sessionId=
11/09/19 12:02:08 WARN mapred.JobClient: Use GenericOptionsParser for parsing the arguments. Applications should implement Tool for the same.

Let’s create the classes in our project for Job 1. The first class we need is the main class that we can run against the Hadoop cluster. I called it WikiPageRanking. It will contain all the jobs later, but for now it only contains the first job.
Note: you can view and fetch the source from github here <a title="github" href="https://github.com/abij/hadoop-wiki-pageranking">abij/hadoop-wiki-pageranking</a>

[java title="WikiPageRanking.java"]
public class WikiPageRanking {

    public static void main(String[] args) throws Exception {
        WikiPageRanking pageRanking = new WikiPageRanking();
        // In and out dirs in HDFS
        pageRanking.runXmlParsing("wiki/in", "wiki/ranking/iter00");
    }

    public void runXmlParsing(String inputPath, String outputPath) throws IOException {
        JobConf conf = new JobConf(WikiPageRanking.class);
        FileInputFormat.setInputPaths(conf, new Path(inputPath));

        // Mahout class to parse the XML into <page>...</page> chunks
        conf.setInputFormat(XmlInputFormat.class);
        conf.set(XmlInputFormat.START_TAG_KEY, "<page>");
        conf.set(XmlInputFormat.END_TAG_KEY, "</page>");

        // Our class to parse links from the page content
        conf.setMapperClass(WikiPageLinksMapper.class);

        FileOutputFormat.setOutputPath(conf, new Path(outputPath));
        conf.setOutputFormat(TextOutputFormat.class);
        conf.setOutputKeyClass(Text.class);
        conf.setOutputValueClass(Text.class);

        // Our class to create the initial output
        conf.setReducerClass(WikiLinksReducer.class);

        JobClient.runJob(conf);
    }
}
[/java]

This is the main class that you can run against your Hadoop cluster; we will add more jobs to it later. You can debug your code (Mapper and Reducer) when you start the program with Debug As..
The default InputFormat is TextInputFormat, which reads the input line by line as values for the map. We want parts of the whole XML to be our input instead. I chose to use the Mahout XmlInputFormat to get nice input for the mapper interface. It chops the XML into little parts between the given start and end tags <page> and </page>. From the Hilversum.xml we will get the value between the page tags.
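As an illustration of what such an input format produces, here is a naive in-memory version of the chopping. The real XmlInputFormat works on byte streams and input splits; PageChopper is my own sketch:

```java
import java.util.ArrayList;
import java.util.List;

public class PageChopper {

    // Return every chunk between successive startTag...endTag pairs (tags included),
    // the same kind of records XmlInputFormat hands to the mapper.
    static List<String> chop(String xml, String startTag, String endTag) {
        List<String> chunks = new ArrayList<>();
        int from = 0;
        while (true) {
            int start = xml.indexOf(startTag, from);
            if (start < 0) break;
            int end = xml.indexOf(endTag, start);
            if (end < 0) break;
            chunks.add(xml.substring(start, end + endTag.length()));
            from = end + endTag.length();
        }
        return chunks;
    }

    public static void main(String[] args) {
        String xml = "<mediawiki><page><title>A</title></page><page><title>B</title></page></mediawiki>";
        // Two <page> records are found in this tiny document
        System.out.println(chop(xml, "<page>", "</page>"));
    }
}
```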

[java title="WikiPageLinksMapper.java" 1="[34-113" 2="2="2="2="2="2="2="2="2="language="class="highlight:""""""""""]"]
public class WikiPageLinksMapper extends MapReduceBase implements Mapper<LongWritable, Text, Text, Text> {

    private static final Pattern wikiLinksPattern = Pattern.compile("\\[.+?\\]");

    public void map(LongWritable key, Text value, OutputCollector<Text, Text> output, Reporter reporter) throws IOException {
        // Returns String[0] = [TITLE]   (content of the <title> tag)
        //         String[1] = [CONTENT] (content of the <text> tag)
        // !! without the <tags>.
        String[] titleAndText = parseTitleAndText(value);

        String pageString = titleAndText[0];
        Text page = new Text(pageString.replace(' ', '_'));

        // Loop through the matched links in [CONTENT]
        Matcher matcher = wikiLinksPattern.matcher(titleAndText[1]);
        while (matcher.find()) {
            String otherPage = matcher.group();
            // Filter only wiki pages.
            // - some have [[realPage|linkName]], some single [realPage]
            // - some link to files or external pages.
            // - some link to paragraphs into other pages.
            if (otherPage == null || otherPage.isEmpty())
                continue;
            // Add valid otherPages to the map.
            output.collect(page, new Text(otherPage));
        }
    }

    // ... the impl of parseTitleAndText(..)
}
[/java]

This mapper class parses the chunks of XML into (page, outgoing link) tuples. In this implementation all links are added to the map output, even if they appear multiple times on the page.
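To make the filtering comments concrete, here is a small standalone sketch of link extraction and cleaning. The pattern and the clean method are my own illustration (using the double-bracket [[...]] form of MediaWiki links), not the mapper’s actual implementation:

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class LinkParsingExample {

    static final Pattern LINKS = Pattern.compile("\\[\\[.+?\\]\\]");

    // Turn a raw [[...]] match into a page name, or null if it is not a plain wiki page.
    static String clean(String rawLink) {
        String page = rawLink.substring(2, rawLink.length() - 2); // strip [[ and ]]
        int pipe = page.indexOf('|');
        if (pipe >= 0) page = page.substring(0, pipe);  // [[realPage|linkName]]
        if (page.contains(":")) return null;            // files, categories, interwiki links
        int hash = page.indexOf('#');
        if (hash >= 0) page = page.substring(0, hash);  // links to paragraphs in other pages
        return page.replace(' ', '_');
    }

    public static void main(String[] args) {
        String content = "Hilversum is in [[North Holland|North-Holland]], "
                + "see [[File:Map.png]] and [[Netherlands#History]].";
        Matcher m = LINKS.matcher(content);
        while (m.find()) {
            String page = clean(m.group());
            if (page != null) System.out.println(page); // North_Holland, Netherlands
        }
    }
}
```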

[java title="WikiLinksReducer.java"]
public class WikiLinksReducer extends MapReduceBase implements Reducer<Text, Text, Text, Text> {

    public void reduce(Text key, Iterator<Text> values, OutputCollector<Text, Text> output, Reporter reporter) throws IOException {
        String pagerank = "1.0\t";
        boolean first = true;
        while (values.hasNext()) {
            if (!first) pagerank += ",";
            pagerank += values.next().toString();
            first = false;
        }
        output.collect(key, new Text(pagerank));
    }
}
[/java]

The reducer class stores the page with the initial PageRank and the outgoing links. This output format is used as the input format for the next job: page<tab>rank<tab>comma-separated-list-of-outgoing-links.
First Run result:
 Hilversum 1.0 Country,Netherlands,Province,North_Holland,Mayor,Democrats_66,A...
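To make the record layout concrete, here is a tiny standalone snippet that splits such a line back into its parts (the record string is shortened from the sample above):

```java
public class RecordFormatExample {

    // Split one reducer output record: page \t rank \t comma-separated links
    static String[] parts(String record) {
        return record.split("\t");
    }

    public static void main(String[] args) {
        String record = "Hilversum\t1.0\tCountry,Netherlands,Province,North_Holland";
        String[] fields = parts(record);
        String page = fields[0];
        double rank = Double.parseDouble(fields[1]);
        String[] outLinks = fields[2].split(",");
        // prints: Hilversum has rank 1.0 and 4 outgoing links
        System.out.println(page + " has rank " + rank + " and " + outLinks.length + " outgoing links");
    }
}
```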
Now get a bigger file! The 500 MB latest Dutch wiki dump is a sufficient start. Extracted, the big XML is around 2.3 GB.
Upload the file to your HDFS in the wiki/in folder and remove the old result folder ‘ranking’. Hadoop will throw an exception if you are about to overwrite existing results. It would be a pity if your job ran for 3 days and another job overwrote the results without notice.

Hadoop Job 2: Calculate the new PageRank

This job calculates the new ranking and generates the same output format as the input, so it can run multiple times. We will run this job after Job 1. The PageRank becomes more accurate after multiple runs, so we will execute the job a few times.
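The Job 2 code itself is not listed here, so below is a plain-Java sketch of its map/reduce logic, with my own names and no Hadoop dependencies: the map step emits, for every outgoing link, the source page’s rank divided by its total number of links; the reduce step sums those shares per page and applies the damping factor.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class RankCalculationSketch {

    static final double DAMPING = 0.85;

    // Map phase: for each page, emit (target, rankShare) for every outgoing link.
    static Map<String, List<Double>> mapPhase(Map<String, String[]> links, Map<String, Double> ranks) {
        Map<String, List<Double>> emitted = new HashMap<>();
        for (Map.Entry<String, String[]> e : links.entrySet()) {
            double share = ranks.get(e.getKey()) / e.getValue().length;
            for (String target : e.getValue()) {
                emitted.computeIfAbsent(target, k -> new ArrayList<>()).add(share);
            }
        }
        return emitted;
    }

    // Reduce phase: new rank = (1 - d) + d * sum(shares)
    static double reducePhase(List<Double> shares) {
        double sum = 0.0;
        for (double s : shares) sum += s;
        return (1 - DAMPING) + DAMPING * sum;
    }

    public static void main(String[] args) {
        // The A/B/C/D example from the beginning of the post
        Map<String, String[]> links = new HashMap<>();
        links.put("B", new String[]{"A"});
        links.put("C", new String[]{"A", "D"});
        Map<String, Double> ranks = new HashMap<>();
        ranks.put("B", 1.0);
        ranks.put("C", 1.0);

        Map<String, List<Double>> emitted = mapPhase(links, ranks);
        System.out.println("A: " + reducePhase(emitted.get("A"))); // ≈ 1.425, like the hand calculation
    }
}
```

In the real job the reducer must also pass the outgoing links through, so the output keeps the page<tab>rank<tab>links format and can be fed into the next iteration.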

Running the big dataset (1 node)

On my laptop I used a virtual machine for the hadoop setup. The parsing of the XML, calculating 5 times and ordering took in total:
Time: 15 minutes
Input file: ~2.3 GB
Each rank file: 238 MB
Result file: 22 MB
I will not spoil you with the actual results, you should see them for yourself after some heavy data crunching!
It would be nice to execute it on a cluster with multiple nodes and experience the speed, load balancing and failover. That’s something for the next blog. When I have used a bigger cluster I will update this post.