
In this tutorial we are going to calculate the PageRank of Wikipedia pages with Hadoop. It was a good hands-on exercise to get started with Hadoop. PageRank is not new, but it is a suitable use case and way cooler than a word counter! The English Wikipedia currently has 3.7M articles and is still growing. Each article has many links to other articles. With those incoming and outgoing links we can determine which pages are more important than others, which is basically what PageRank does.

PageRanking
Larry Page came up with the algorithm to determine the page ranking and built a search engine around it in 1996, which he named Google. He is now the CEO of Google, but earns only 1 dollar a year. I will try to explain the PageRank algorithm and how we will implement it.
In this example I will use four pages: A, B, C and D, where D is a non-existing page: a page that has not been created yet, but is linked to from C. In Wikipedia you recognize those pages as red links. The links between the pages are as follows:

Figure: wiki links relationship (B links to A; C links to A and D)

The rank of A is the highest, because it gets points from both B and C.
PageRank of page A = the sum of the ‘shares’ of the PageRank of the pages linking to A.
The formula for calculating the points is as follows:

Figure: PageRank formula

The formula can be simplified to this:
PR(A) = (1-d) + d( PR(B) / Cout(B) + … + PR(C) / Cout(C) )
The d in the formula is the damping factor, which simulates ‘a random surfer’, and is usually set to 0.85. If you are interested in the details, please visit the Wikipedia PageRank page or the PageRank explained page.
If you apply the formula to our example:
PageRank of A = 0.15 + 0.85 * ( PageRank(B)/outgoing links(B) + PageRank(…)/outgoing links(…) )
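The formula is small enough to write down directly. Below is a minimal sketch in plain Java (no Hadoop involved), assuming the ranks and outgoing-link counts of the pages that link to A are already known; the class and method names are my own, for illustration only:

```java
class PageRankFormula {
    // PR(A) = (1 - d) + d * sum(PR(T) / Cout(T)) over every page T linking to A.
    // inRanks[i] is the rank of the i-th linking page, inOutLinks[i] its number of outgoing links.
    static double rank(double d, double[] inRanks, int[] inOutLinks) {
        double sum = 0.0;
        for (int i = 0; i < inRanks.length; i++) {
            sum += inRanks[i] / inOutLinks[i];
        }
        return (1 - d) + d * sum;
    }

    public static void main(String[] args) {
        // A is linked from B (1 outgoing link) and C (2 outgoing links), both with rank 1.0.
        System.out.println(rank(0.85, new double[] {1.0, 1.0}, new int[] {1, 2}));
    }
}
```

Up to floating-point rounding this prints 1.425, the value of A after the first calculation.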
Calculation of A with initial ranking 1.0 per page: PageRank(A) = 0.15 + 0.85 * (1.0/1 + 1.0/2) = 0.15 + 0.85 * 1.5 = 1.425

If we use the initial rank value 1.0 for A, B and C we would have the following output:
I have skipped page D in the result, because it is not an existing page.
A: 1.425
B: 0.15
C: 0.15
Calculation of A with ranking from ITERATION-1: PageRank(A) = 0.15 + 0.85 * (0.15/1 + 0.15/2) = 0.15 + 0.85 * 0.225 = 0.34125

If we use these ranks as input and calculate it again:
A: 0.34125
B: 0.15
C: 0.15
We see that the PageRank of page A has dropped. Each PageRank is based on the previous calculation and gets more accurate after more runs. You can add new pages and new links in the future and calculate the new rankings. This is one of the tools search engines use to create their index. We are going to do this with a set of Wikipedia pages.
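The two iterations above can be replayed with a small in-memory loop. This is a sketch in plain Java, not part of the Hadoop project: the class name, the map-based graph and the use of double instead of float are my own assumptions. Like the Hadoop jobs later on, it ignores links to the non-existing page D and does not pass on the rank of A, which has no outgoing links.

```java
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

class PageRankIterations {
    static final double DAMPING = 0.85;

    // The example graph: B -> A, C -> A and C -> D. D is a red link (not a key).
    static Map<String, List<String>> exampleGraph() {
        Map<String, List<String>> links = new TreeMap<>();
        links.put("A", List.of());
        links.put("B", List.of("A"));
        links.put("C", List.of("A", "D"));
        return links;
    }

    // One iteration: every existing page gets (1 - d) plus d times the
    // rank share (rank / outgoing links) of each page linking to it.
    static Map<String, Double> iterate(Map<String, List<String>> links, Map<String, Double> ranks) {
        Map<String, Double> shares = new TreeMap<>();
        for (String page : links.keySet()) {
            shares.put(page, 0.0);
        }
        for (Map.Entry<String, List<String>> e : links.entrySet()) {
            List<String> out = e.getValue();
            if (out.isEmpty()) continue; // a page without links passes on nothing
            double share = ranks.get(e.getKey()) / out.size();
            for (String target : out) {
                if (shares.containsKey(target)) { // skip red links such as D
                    shares.put(target, shares.get(target) + share);
                }
            }
        }
        Map<String, Double> next = new TreeMap<>();
        for (Map.Entry<String, Double> e : shares.entrySet()) {
            next.put(e.getKey(), (1 - DAMPING) + DAMPING * e.getValue());
        }
        return next;
    }

    // Runs the given number of iterations, starting from rank 1.0 per page.
    static Map<String, Double> run(int iterations) {
        Map<String, List<String>> links = exampleGraph();
        Map<String, Double> ranks = new TreeMap<>();
        for (String page : links.keySet()) {
            ranks.put(page, 1.0);
        }
        for (int i = 0; i < iterations; i++) {
            ranks = iterate(links, ranks);
        }
        return ranks;
    }

    public static void main(String[] args) {
        for (int i = 1; i <= 2; i++) {
            System.out.println("Iteration " + i + ": " + run(i));
        }
    }
}
```

Up to floating-point rounding, main prints A = 1.425 after the first iteration and A = 0.34125 after the second, with B and C at 0.15 both times.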

Hadoop Setup

In this tutorial I will not explain how to set up Hadoop, because I cannot explain it better than the very good yahoo-hadoop-tutorial and the ebiquity-Hadoop-tutorial with screenshots. I will be using the current stable version, Hadoop 0.20.2. Note: the Eclipse plugin didn’t work for me, so I used the latest version instead.
So I assume you have set up a Hadoop configuration with HDFS and an Eclipse environment where you can upload files into the cluster and execute jobs against your files.

The Plan

We will split the work into three different Hadoop jobs: parsing, calculating and ordering.
Hadoop Job 1 parses the big wiki XML into articles.
In the Hadoop map phase, get each article’s name and its outgoing links.
In the Hadoop reduce phase, collect for each wiki page the links to other pages.
Store the page, initial rank and outgoing links.

Hadoop Job 2 will calculate the new pageRank.
In the mapping phase, map each outgoing link to the page with its rank and total outgoing links.
In the reduce phase calculate the new page rank for the pages.
Store the page, new rank and outgoing links.
Repeat these steps for more accurate results.

Hadoop Job 3 will map the rank and page
Store the rank and page (ordered on rank)
See the top 10 pages!


Hadoop API

If you use the code in your IDE, you will notice that lots of the classes are marked as deprecated. In this example I use the old API from before 0.20.x. There is the new API (org.apache.hadoop.mapreduce.*) and the old API (org.apache.hadoop.mapred.*). Most examples I found on the internet were based on the old API, which is why I used it here. The changes can be found in the new Hadoop API 0.21. It should not be very difficult to switch to the new API.

Hadoop Job 1: Parse the XML to Page with Links

Let’s take a look at the structure of a page. A page can be downloaded as an XML file by adding Special:Export to the URL, e.g. to get the XML for the wiki page about Hilversum:
Wiki Hilversum

[xml title="Hilversum.xml"]
<mediawiki xmlns="https://www.mediawiki.org/xml/export-0.5/"
xmlns:xsi="https://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="https://www.mediawiki.org/xml/export-0.5/
https://www.mediawiki.org/xml/export-0.5.xsd"
version="0.5" xml:lang="en">
<siteinfo>
<sitename>Wikipedia</sitename>
<base>https://en.wikipedia.org/wiki/Main_Page</base>
<generator>MediaWiki 1.17wmf1</generator>
<case>first-letter</case>
<namespaces>
<namespace key="-2" case="first-letter">Media</namespace>
...
</namespaces>
</siteinfo>
<page>
<title>Hilversum</title>
<id>13686</id>
<revision>
<id>449460543</id>
<timestamp>2011-09-10T06:42:48Z</timestamp>
<contributor>
<username>Archengigi</username>
<id>7283012</id>
</contributor>
<comment>Hilversum vlag.svg</comment>
<text xml:space="preserve" bytes="13996">
... the page latest revision content with [[LINKS]],
links can point to other pages, files, external sites etc...
</text>
</revision>
</page>
</mediawiki>
[/xml]

It is a fairly simple XML structure with some siteinfo metadata and the page with its latest revision. The parts we are interested in are the title and text tags. Download the XML and place it in your HDFS in the /user/[hostname]/[user]/wiki/in directory. If you run the job before the file is there, the exception shows the location where the file should be placed, so you can also put the files in the correct directory after the first run:

11/09/19 12:02:08 INFO jvm.JvmMetrics: Initializing JVM Metrics with processName=JobTracker, sessionId=
11/09/19 12:02:08 WARN mapred.JobClient: Use GenericOptionsParser for parsing the arguments. Applications should implement Tool for the same.
Exception in thread "main" org.apache.hadoop.mapred.InvalidInputException: Input path does not exist: hdfs://192.168.88.128:54310/user/alexanderlaptop/alexander/wiki/in
at org.apache.hadoop.mapred.FileInputFormat.listStatus(FileInputFormat.java:190)

Let’s create the classes in our project for Job 1. The first class we need is the main class that we can run against the Hadoop cluster. I called it WikiPageRanking. It will contain all the jobs later, but for now it only contains the first job.
Note: you can view and fetch the source from github here <a title="github" href="https://github.com/abij/hadoop-wiki-pageranking">abij/hadoop-wiki-pageranking</a>

[java title="WikiPageRanking.java"]
import java.io.IOException;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.TextOutputFormat;
// XmlInputFormat comes from Mahout; import it from the package of the Mahout version you use.

public class WikiPageRanking {

    public static void main(String[] args) throws Exception {
        WikiPageRanking pageRanking = new WikiPageRanking();
        // In and Out dirs in HDFS
        pageRanking.runXmlParsing("wiki/in", "wiki/ranking/iter00");
    }

    public void runXmlParsing(String inputPath, String outputPath) throws IOException {
        JobConf conf = new JobConf(WikiPageRanking.class);

        FileInputFormat.setInputPaths(conf, new Path(inputPath));
        // Mahout class to parse XML + config
        conf.setInputFormat(XmlInputFormat.class);
        conf.set(XmlInputFormat.START_TAG_KEY, "<page>");
        conf.set(XmlInputFormat.END_TAG_KEY, "</page>");
        // Our class to parse links from content.
        conf.setMapperClass(WikiPageLinksMapper.class);

        FileOutputFormat.setOutputPath(conf, new Path(outputPath));
        conf.setOutputFormat(TextOutputFormat.class);
        conf.setOutputKeyClass(Text.class);
        conf.setOutputValueClass(Text.class);
        // Our class to create initial output
        conf.setReducerClass(WikiLinksReducer.class);

        JobClient.runJob(conf);
    }
}
[/java]

This is the main class that runs against your Hadoop cluster; we will add more jobs later. You can debug your code (Mapper and Reducer) by starting the program with Debug As...
The default InputFormat class is TextInputFormat, which reads the input line by line and passes each line as a value to the map. We want parts of the whole XML to be our input. I chose the Mahout XmlInputFormat to get nice input for the mapper interface. It chops the XML into small parts between the given start and end tags, <page> and </page>. From Hilversum.xml we will get the value between the page tags.
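To get a feel for what the mapper receives, here is a simplified, in-memory stand-in for that chunking, in plain Java. It is not Mahout's XmlInputFormat, which reads byte streams per input split; this sketch only assumes well-formed, non-nested <page> elements held in a String, and the two sample pages in main are made up:

```java
import java.util.ArrayList;
import java.util.List;

class PageChunker {
    // Returns every substring from startTag up to and including endTag,
    // i.e. one chunk per page, similar to the value each map() call gets.
    static List<String> chunks(String xml, String startTag, String endTag) {
        List<String> result = new ArrayList<>();
        int from = 0;
        while (true) {
            int start = xml.indexOf(startTag, from);
            if (start == -1) break;
            int end = xml.indexOf(endTag, start + startTag.length());
            if (end == -1) break; // unterminated element: stop
            int stop = end + endTag.length();
            result.add(xml.substring(start, stop));
            from = stop;
        }
        return result;
    }

    public static void main(String[] args) {
        String xml = "<mediawiki><siteinfo>...</siteinfo>"
                + "<page><title>Hilversum</title><text>[[Netherlands]]</text></page>"
                + "<page><title>Utrecht</title><text>[[Province]]</text></page>"
                + "</mediawiki>";
        for (String page : chunks(xml, "<page>", "</page>")) {
            System.out.println(page);
        }
    }
}
```

The siteinfo block never reaches the mapper, because it lies outside any <page> element.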

[java title="WikiPageLinksMapper.java"]
import java.io.IOException;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reporter;

public class WikiPageLinksMapper extends MapReduceBase implements Mapper<LongWritable, Text, Text, Text> {

    private static final Pattern wikiLinksPattern = Pattern.compile("\\[.+?\\]");

    public void map(LongWritable key, Text value, OutputCollector<Text, Text> output, Reporter reporter) throws IOException {
        // Returns  String[0] = <title>[TITLE]</title>
        //          String[1] = <text>[CONTENT]</text>
        // !! without the <tags>.
        String[] titleAndText = parseTitleAndText(value);

        String pageString = titleAndText[0];
        Text page = new Text(pageString.replace(' ', '_'));

        Matcher matcher = wikiLinksPattern.matcher(titleAndText[1]);

        // Loop through the matched links in [CONTENT]
        while (matcher.find()) {
            String otherPage = matcher.group();
            // Filter only wiki pages.
            // - some have [[realPage|linkName]], some single [realPage]
            // - some link to files or external pages.
            // - some link to paragraphs into other pages.
            otherPage = getWikiPageFromLink(otherPage);
            if (otherPage == null || otherPage.isEmpty())
                continue;

            // add valid otherPages to the map.
            output.collect(page, new Text(otherPage));
        }
    }

    // ... the impl of parseTitleAndText(..)
    // ... the impl of getWikiPageFromLink(..)
}
[/java]

The mapper class parses the chunks of XML into tuples with the page as key and an outgoing link as value. In this implementation all links are added to the map, even if they appear multiple times on the page.
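The body of getWikiPageFromLink(..) is left out above (it is in the GitHub repository). Purely to illustrate the rules listed in the mapper's comments, here is a hypothetical filter in plain Java. The class name and its heuristics (a ':' marks files, categories, interwiki and external links; text after '|' is the label; text after '#' is a section) are my assumptions, and it will not match the repository version exactly:

```java
class WikiLinkFilter {
    // Turns a regex match such as "[[North Holland|province]" into
    // "North_Holland", or returns null when the link should be skipped.
    static String toPageName(String match) {
        String link = match;
        while (link.startsWith("[")) link = link.substring(1);
        while (link.endsWith("]")) link = link.substring(0, link.length() - 1);
        // Heuristic: File:, Category:, http://... and interwiki links contain ':'.
        if (link.contains(":")) return null;
        int pipe = link.indexOf('|');
        if (pipe >= 0) link = link.substring(0, pipe); // drop the link label
        int hash = link.indexOf('#');
        if (hash >= 0) link = link.substring(0, hash); // drop the section
        link = link.trim();
        if (link.isEmpty()) return null; // e.g. [[#History]] points into the same page
        // Same convention as the page titles: spaces become underscores.
        return link.replace(' ', '_');
    }

    public static void main(String[] args) {
        String[] samples = {"[[North Holland|province]", "[[File:Hilversum vlag.svg]", "[[#History]", "[Mayor]"};
        for (String s : samples) {
            System.out.println(s + " -> " + toPageName(s));
        }
    }
}
```

The ':' check is deliberately crude: it also drops the few real article titles that contain a colon.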

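[java title="WikiLinksReducer.java"]
import java.io.IOException;
import java.util.Iterator;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reducer;
import org.apache.hadoop.mapred.Reporter;

public class WikiLinksReducer extends MapReduceBase implements Reducer<Text, Text, Text, Text> {

    @Override
    public void reduce(Text key, Iterator<Text> values, OutputCollector<Text, Text> output, Reporter reporter) throws IOException {
        // Initial rank 1.0, a tab, then all outgoing links separated by commas.
        StringBuilder pagerank = new StringBuilder("1.0\t");
        boolean first = true;
        while (values.hasNext()) {
            if (!first) pagerank.append(',');
            pagerank.append(values.next().toString());
            first = false;
        }
        output.collect(key, new Text(pagerank.toString()));
    }
}
[/java]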

The reducer class stores the page with the initial PageRank and the outgoing links. This output format is used as the input format for the next job: key<tab>rank<tab>comma-separated list of linked pages.
First Run result:

Hilversum 1.0 Country,Netherlands,Province,North_Holland,Mayor,Democrats_66,A...
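Jobs 2 and 3 both have to take this line apart again. As a plain-Java sketch of the format (the PageRecord class and its methods are hypothetical, not part of the project), note that a page without outgoing links has no second tab:

```java
import java.util.Arrays;
import java.util.List;

class PageRecord {
    final String page;
    final float rank;
    final List<String> links;

    PageRecord(String page, float rank, List<String> links) {
        this.page = page;
        this.rank = rank;
        this.links = links;
    }

    // Parses "page<TAB>rank" or "page<TAB>rank<TAB>link1,link2,...".
    static PageRecord parse(String line) {
        String[] parts = line.split("\t", 3);
        List<String> links = parts.length < 3 || parts[2].isEmpty()
                ? List.of()
                : Arrays.asList(parts[2].split(","));
        return new PageRecord(parts[0], Float.parseFloat(parts[1]), links);
    }

    // Writes the record back in the same format; no trailing tab without links.
    String format() {
        return links.isEmpty()
                ? page + "\t" + rank
                : page + "\t" + rank + "\t" + String.join(",", links);
    }

    public static void main(String[] args) {
        PageRecord r = parse("Hilversum\t1.0\tCountry,Netherlands,Province");
        System.out.println(r.page + " " + r.rank + " " + r.links);
    }
}
```

Because parse and format are inverses, a job that reads and writes this format can be chained with itself, which is exactly what Job 2 relies on.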

Get a bigger file! The 500 MB latest Dutch Wikipedia dump is a sufficient start; extracted, the big XML is around 2.3 GB.
Upload the file to your HDFS in the wiki/in folder and remove the old result folder ‘ranking’. Hadoop will throw an exception if you are about to overwrite existing results. It would be a pity if your job ran for 3 days and another job overwrote the results without notice.

Hadoop Job 2: Calculate new Page rank

This job calculates the new ranking and writes its output in the same format as its input, so it can run multiple times. We run this job after Job 1. The PageRank becomes more accurate after multiple runs, so we will execute the job a few times.

Mapper

This job has its own mapper and reducer classes.
sample input:
Page_A 1.0
Page_B 1.0 Page_A
Page_C 1.0 Page_A,Page_D

[java title="Mapper"]
import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reporter;

public class RankCalculateMapper extends MapReduceBase implements Mapper<LongWritable, Text, Text, Text> {

    @Override
    public void map(LongWritable key, Text value, OutputCollector<Text, Text> output, Reporter reporter) throws IOException {
        // Input line: page<TAB>rank[<TAB>link1,link2,...]
        int pageTabIndex = value.find("\t");
        int rankTabIndex = value.find("\t", pageTabIndex + 1);

        String page = Text.decode(value.getBytes(), 0, pageTabIndex);

        // Mark page as an Existing page (ignore red wiki-links)
        output.collect(new Text(page), new Text("!"));

        // Skip pages with no links.
        if (rankTabIndex == -1) return;

        // "page<TAB>rank<TAB>", the prefix of every value emitted below
        String pageWithRank = Text.decode(value.getBytes(), 0, rankTabIndex + 1);
        String links = Text.decode(value.getBytes(), rankTabIndex + 1, value.getLength() - (rankTabIndex + 1));
        String[] allOtherPages = links.split(",");
        int totalLinks = allOtherPages.length;

        for (String otherPage : allOtherPages) {
            Text pageRankTotalLinks = new Text(pageWithRank + totalLinks);
            output.collect(new Text(otherPage), pageRankTotalLinks);
        }

        // Put the original links of the page for the reduce output
        output.collect(new Text(page), new Text("|" + links));
    }
}
[/java]

Some links point to wiki pages that do not exist (yet). In the browser you see them as red links. In the result I want to skip the non-existing pages, so I mark each page with an exclamation mark to indicate that it is an actual wiki page. The reducer class only generates output for these pages.

For each link there is an output with the combined value page, rank and total number of links.
The last output of the mapper is the page and its original links. The reducer needs these links to be able to produce the correct output.
sample output:
Page_A !
Page_B !
Page_A Page_B 1.0 1
Page_B |Page_A
Page_C !
Page_A Page_C 1.0 2
Page_D Page_C 1.0 2
Page_C |Page_A,Page_D
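The emit logic of RankCalculateMapper can be replayed without a cluster. Here is a plain-Java sketch that mirrors the mapper for lines in the page<TAB>rank[<TAB>links] format; the class name and the String[] key/value pairs are my own choices, not Hadoop types. For the three sample input lines it prints the key/value pairs the mapper emits, in emission order:

```java
import java.util.ArrayList;
import java.util.List;

class RankCalculateMapSketch {
    // For "page<TAB>rank[<TAB>links]" emit (page, "!"), then one
    // (link, "page<TAB>rank<TAB>count") per link, then (page, "|links").
    static List<String[]> map(String line) {
        List<String[]> out = new ArrayList<>();
        String[] parts = line.split("\t", 3);
        String page = parts[0];
        out.add(new String[] {page, "!"});
        if (parts.length < 3) return out; // no links: only the existence marker
        String[] links = parts[2].split(",");
        for (String other : links) {
            out.add(new String[] {other, page + "\t" + parts[1] + "\t" + links.length});
        }
        out.add(new String[] {page, "|" + parts[2]});
        return out;
    }

    public static void main(String[] args) {
        String[] input = {"Page_A\t1.0", "Page_B\t1.0\tPage_A", "Page_C\t1.0\tPage_A,Page_D"};
        for (String line : input) {
            for (String[] kv : map(line)) {
                System.out.println(kv[0] + "\t" + kv[1]);
            }
        }
    }
}
```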

Reducer

The reducer receives each key with its values, sorted by key. In a distributed environment the map output is partitioned by key and every reducer node gets a share. The reducer calculates the new PageRank and writes it, together with the original links, to the output for the existing pages.
sample input (sorted on key):
Page_A !
Page_A Page_C 1.0 2
Page_A Page_B 1.0 1
Page_B !
Page_B |Page_A
Page_C !
Page_C |Page_A,Page_D
Page_D Page_C 1.0 2

[java title="Reducer"]
import java.io.IOException;
import java.util.Iterator;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reducer;
import org.apache.hadoop.mapred.Reporter;

public class RankCalculateReduce extends MapReduceBase implements Reducer<Text, Text, Text, Text> {

    private static final float damping = 0.85F;

    @Override
    public void reduce(Text key, Iterator<Text> values, OutputCollector<Text, Text> out, Reporter reporter) throws IOException {
        boolean isExistingWikiPage = false;
        String[] split;
        float sumShareOtherPageRanks = 0;
        String links = "";
        String pageWithRank;

        // For each otherPage:
        // - check control characters
        // - calculate pageRank share <rank> / count(<links>)
        // - add the share to sumShareOtherPageRanks
        while (values.hasNext()) {
            pageWithRank = values.next().toString();

            if (pageWithRank.equals("!")) {
                isExistingWikiPage = true;
                continue;
            }

            if (pageWithRank.startsWith("|")) {
                links = "\t" + pageWithRank.substring(1);
                continue;
            }

            // pageWithRank = otherPage<TAB>rank<TAB>countOutLinks
            split = pageWithRank.split("\\t");
            float pageRank = Float.parseFloat(split[1]);
            int countOutLinks = Integer.parseInt(split[2]);

            sumShareOtherPageRanks += (pageRank / countOutLinks);
        }

        // Red links (no "!" marker) produce no output.
        if (!isExistingWikiPage) return;
        float newRank = damping * sumShareOtherPageRanks + (1 - damping);

        out.collect(key, new Text(newRank + links));
    }
}
[/java]

The output of the reducer contains the new pageRank for the existing pages with the links on those pages.
sample output:
Page_A 1.425
Page_B 0.15 Page_A
Page_C 0.15 Page_A,Page_D
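The reducer can be checked the same way. This plain-Java sketch (names are mine, and it uses double instead of the job's float) applies the reducer logic to the grouped sample input, reading the rank and link count from the second and third tab-separated fields of each value. The rank is rounded to five decimals purely to keep the printed output readable:

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

class RankCalculateReduceSketch {
    static final double DAMPING = 0.85;

    // Returns "rank[<TAB>links]" for an existing page, or null for a red link.
    static String reduce(List<String> values) {
        boolean exists = false;
        String links = "";
        double sum = 0.0;
        for (String value : values) {
            if (value.equals("!")) {
                exists = true;
            } else if (value.startsWith("|")) {
                links = "\t" + value.substring(1);
            } else {
                // value = otherPage<TAB>rank<TAB>outLinkCount
                String[] split = value.split("\t");
                sum += Double.parseDouble(split[1]) / Integer.parseInt(split[2]);
            }
        }
        if (!exists) return null;
        double rank = (1 - DAMPING) + DAMPING * sum;
        return (Math.round(rank * 1e5) / 1e5) + links;
    }

    public static void main(String[] args) {
        Map<String, List<String>> grouped = new LinkedHashMap<>();
        grouped.put("Page_A", List.of("!", "Page_C\t1.0\t2", "Page_B\t1.0\t1"));
        grouped.put("Page_B", List.of("!", "|Page_A"));
        grouped.put("Page_C", List.of("!", "|Page_A,Page_D"));
        grouped.put("Page_D", List.of("Page_C\t1.0\t2"));
        for (Map.Entry<String, List<String>> e : grouped.entrySet()) {
            String result = reduce(e.getValue());
            if (result != null) System.out.println(e.getKey() + "\t" + result);
        }
    }
}
```

It prints the three lines of the sample output above; Page_D is dropped because no "!" marker arrives for it.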

We need to configure the main class so the new job is executed a couple of times after the XML-parsing job. I have commented out the last job for now; we will create it in the next paragraph.

[java]
import java.io.IOException;
import java.text.DecimalFormat;
import java.text.NumberFormat;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.FloatWritable; // used by Job 3
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.TextInputFormat;
import org.apache.hadoop.mapred.TextOutputFormat;
// XmlInputFormat comes from Mahout; import it from the package of the Mahout version you use.

public class WikiPageRanking {

    // Two-digit suffix for the iteration directories: iter00, iter01, ...
    private static NumberFormat nf = new DecimalFormat("00");

    public static void main(String[] args) throws Exception {
        WikiPageRanking pageRanking = new WikiPageRanking();

        // Job 1: Parse XML
        pageRanking.runXmlParsing("wiki/in", "wiki/ranking/iter00");

        int runs = 0;
        for (; runs < 5; runs++) {
            // Job 2: Calculate new rank
            pageRanking.runRankCalculation("wiki/ranking/iter" + nf.format(runs), "wiki/ranking/iter" + nf.format(runs + 1));
        }

        // Job 3: Order by rank
        //pageRanking.runRankOrdering("wiki/ranking/iter" + nf.format(runs), "wiki/result");
    }

    public void runXmlParsing(String inputPath, String outputPath) throws IOException {
        JobConf conf = new JobConf(WikiPageRanking.class);

        conf.set(XmlInputFormat.START_TAG_KEY, "<page>");
        conf.set(XmlInputFormat.END_TAG_KEY, "</page>");

        // Input / Mapper
        FileInputFormat.setInputPaths(conf, new Path(inputPath));
        conf.setInputFormat(XmlInputFormat.class);
        conf.setMapperClass(WikiPageLinksMapper.class);

        // Output / Reducer
        FileOutputFormat.setOutputPath(conf, new Path(outputPath));
        conf.setOutputFormat(TextOutputFormat.class);
        conf.setOutputKeyClass(Text.class);
        conf.setOutputValueClass(Text.class);
        conf.setReducerClass(WikiLinksReducer.class);

        JobClient.runJob(conf);
    }

    private void runRankCalculation(String inputPath, String outputPath) throws IOException {
        JobConf conf = new JobConf(WikiPageRanking.class);

        conf.setOutputKeyClass(Text.class);
        conf.setOutputValueClass(Text.class);

        conf.setInputFormat(TextInputFormat.class);
        conf.setOutputFormat(TextOutputFormat.class);

        FileInputFormat.setInputPaths(conf, new Path(inputPath));
        FileOutputFormat.setOutputPath(conf, new Path(outputPath));

        conf.setMapperClass(RankCalculateMapper.class);
        conf.setReducerClass(RankCalculateReduce.class);

        JobClient.runJob(conf);
    }

    /*
    private void runRankOrdering(String inputPath, String outputPath) throws IOException {
        JobConf conf = new JobConf(WikiPageRanking.class);

        conf.setOutputKeyClass(FloatWritable.class);
        conf.setOutputValueClass(Text.class);
        conf.setInputFormat(TextInputFormat.class);
        conf.setOutputFormat(TextOutputFormat.class);

        FileInputFormat.setInputPaths(conf, new Path(inputPath));
        FileOutputFormat.setOutputPath(conf, new Path(outputPath));

        // No reducer class set: the default identity reducer passes the sorted keys through.
        conf.setMapperClass(RankingMapper.class);

        JobClient.runJob(conf);
    }
    */
}
[/java]

I have added a loop around the execution of Job 2. For the first run it takes its input from wiki/ranking/iter00 and writes its output to wiki/ranking/iter01. For the next run the iter01 directory is the input directory. When the loop is finished, Job 3 gets the last iterXX directory as input for the final job: the ordering.
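The directory chaining is easy to check without a cluster. This plain-Java sketch reuses the DecimalFormat("00") pattern from the main class; the class and method names are hypothetical:

```java
import java.text.DecimalFormat;
import java.text.NumberFormat;
import java.util.ArrayList;
import java.util.List;

class IterationDirs {
    private static final NumberFormat nf = new DecimalFormat("00");

    // Input/output directory pairs for Job 2: the output of run n is the input of run n + 1.
    static List<String[]> job2Dirs(int runs) {
        List<String[]> dirs = new ArrayList<>();
        for (int run = 0; run < runs; run++) {
            dirs.add(new String[] {"wiki/ranking/iter" + nf.format(run), "wiki/ranking/iter" + nf.format(run + 1)});
        }
        return dirs;
    }

    public static void main(String[] args) {
        for (String[] d : job2Dirs(5)) {
            System.out.println(d[0] + " -> " + d[1]);
        }
        // Job 3 reads the last output directory.
        System.out.println("Job 3 input: wiki/ranking/iter" + nf.format(5));
    }
}
```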

Job 3: Order last run on PageRank

This is a simple job that reads the page and rank from the input and maps key: rank to value: page. Hadoop sorts on the key for us, so we don’t need to implement a reducer: the mapper and the sorting are enough for our result, the ordered list.
sample input:
Page_A 1.425
Page_B 0.15 Page_A
Page_C 0.15 Page_A,Page_D

[java]
import java.io.IOException;
import java.nio.charset.CharacterCodingException;

import org.apache.hadoop.io.FloatWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reporter;

public class RankingMapper extends MapReduceBase implements Mapper<LongWritable, Text, FloatWritable, Text> {

    @Override
    public void map(LongWritable key, Text value, OutputCollector<FloatWritable, Text> output, Reporter reporter) throws IOException {
        String[] pageAndRank = getPageAndRank(value);

        float parseFloat = Float.parseFloat(pageAndRank[1]);
        Text page = new Text(pageAndRank[0]);
        FloatWritable rank = new FloatWritable(parseFloat);

        // The rank is the key, so Hadoop sorts the output on rank.
        output.collect(rank, page);
    }

    private String[] getPageAndRank(Text value) throws CharacterCodingException {
        String[] pageAndRank = new String[2];
        int tabPageIndex = value.find("\t");
        int tabRankIndex = value.find("\t", tabPageIndex + 1);

        // no tab after rank (when there are no links)
        int rankLength;
        if (tabRankIndex == -1) {
            rankLength = value.getLength() - (tabPageIndex + 1);
        } else {
            rankLength = tabRankIndex - (tabPageIndex + 1);
        }

        pageAndRank[0] = Text.decode(value.getBytes(), 0, tabPageIndex);
        pageAndRank[1] = Text.decode(value.getBytes(), tabPageIndex + 1, rankLength);
        return pageAndRank;
    }
}
[/java]

The sorting on the key is ascending, so the page with the highest rank ends up at the bottom. Preferably the job should order descending, but for now the result is ordered and that is good enough. Now we can uncomment Job 3 in the main class and execute all jobs together against the big dataset.
sample output:
0.15 Page_B
0.15 Page_C
1.425 Page_A
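Inside Hadoop, one way to get a descending order would be a custom key comparator registered with JobConf.setOutputKeyComparatorClass(..); I have not tried that here. Once the result file has been copied out of HDFS, a plain-Java sketch like the following can also pick the top 10 pages. The class name is hypothetical and it assumes the rank<TAB>page lines written by Job 3:

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

class TopPages {
    // Sorts "rank<TAB>page" lines by rank, highest first (ties by page name),
    // and returns at most n page names.
    static List<String> top(List<String> lines, int n) {
        List<String[]> rows = new ArrayList<>();
        for (String line : lines) {
            rows.add(line.split("\t", 2));
        }
        rows.sort(Comparator.comparingDouble((String[] r) -> Float.parseFloat(r[0]))
                .reversed()
                .thenComparing(r -> r[1]));
        List<String> pages = new ArrayList<>();
        for (int i = 0; i < Math.min(n, rows.size()); i++) {
            pages.add(rows.get(i)[1]);
        }
        return pages;
    }

    public static void main(String[] args) {
        List<String> result = List.of("0.15\tPage_B", "0.15\tPage_C", "1.425\tPage_A");
        System.out.println(top(result, 10));
    }
}
```

For the sample output this prints [Page_A, Page_B, Page_C]. Sorting locally is fine for a 22 MB result file; for much larger results the comparator route inside the job scales better.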

Running the big dataset (1 node)

On my laptop I used a virtual machine for the Hadoop setup. Parsing the XML, calculating the rank 5 times and ordering took in total:
Time: 15 minutes
Input file: ~2.3 GB
Each rank file: 238 MB
Result file: 22 MB
I will not spoil the actual results; you should see them for yourself after some heavy data crunching!
It would be nice to execute it on a cluster with multiple nodes and experience the speed, load balancing and failover. That’s something for the next blog post. When I have used a bigger cluster I will update the post.
You can view/download the source files from github.
