In diesem Tutorial werden wir ein PageRanking für Wikipedia mit Hilfe von Hadoop erstellen. Dies war eine gute praktische Übung, um mit Hadoop zu beginnen. Das PageRanking ist keine neue Sache, aber ein geeigneter Anwendungsfall und viel cooler als ein Wortzähler! Die Wikipedia (en) hat derzeit 3,7 Millionen Artikel und wächst weiter. Jeder Artikel hat viele Links zu anderen Artikeln. Anhand dieser eingehenden und ausgehenden Links können wir feststellen, welche Seite wichtiger ist als andere, was im Grunde genommen die Funktion des PageRankings ist.
PageRanking Larry Page entwickelte 1996 den Algorithmus zur Bestimmung des Seiten-Rankings und baute eine Suchmaschine um ihn herum, die er Google nannte. Heute ist er der CEO von Google, verdient aber nur 1 Dollar im Jahr. Ich werde versuchen, den Algorithmus für das Seitenranking zu erklären und wie wir ihn implementieren. In diesem Beispiel werde ich 4 Seiten verwenden: A, B, C und D eine nicht existierende Seite. Dies ist eine Seite, die noch nicht erstellt wurde, auf die aber von C aus verlinkt wird. In Wikipedia erkennen Sie diese Seiten als rot und unterstrichen. Die Links zwischen den Seiten sind wie folgt:
Der Rang von A ist am höchsten, da er Punkte von B und C erhält. PageRank der Seite A = 'Anteil' des PageRanks der Seiten, die auf A verlinken. Die Formel zur Berechnung der Punkte lautet wie folgt:
Die Formel kann wie folgt vereinfacht werden: PR(A) = (1-d) + d( PR(B) / Cout(B) + ... + PR(C) / Cout(C) ) Das d in der Formel ist der Dämpfungsfaktor, um einen 'zufälligen Surfer' zu simulieren und wird normalerweise auf 0,85 gesetzt. Wenn Sie sich sehr für die Details interessieren, besuchen Sie bitte die Wiki-Seite pageranking oder die Seite pageranking explained. Wenn Sie die Formel auf unser Beispiel anwenden: PageRank von A = 0,15 + 0,85 * ( PageRank(B)/ausgehende Links(B) + PageRank(...)/ausgehender Link(...) ) Berechnung von A mit einem anfänglichen Ranking von 1,0 pro Seite:
Wenn wir den anfänglichen Rangwert 1.0 für A, B und C verwenden, erhalten wir die folgende Ausgabe: Ich habe die Seite D im Ergebnis übersprungen, da sie nicht existiert. A: 1.425 B: 0.15 C: 0.15 Berechnung von A mit dem Ranking von ITERATION-1:
Wenn wir diese Ränge als Eingabe verwenden und erneut berechnen: A: 0.34125 B: 0.15 C: 0.15 Wir sehen, dass der PageRank von Seite A reduziert wird. Der PageRank basiert auf früheren Berechnungen und wird nach mehreren Durchläufen immer genauer. Sie können in Zukunft neue Seiten und neue Links hinzufügen und die neuen Rankings berechnen. Dies ist eines der Werkzeuge, die Suchmaschinen verwenden, um ihren Index zu erstellen. Wir werden dies mit einer Reihe von Wikipedia-Seiten tun.
Hadoop-Einrichtung
In diesem Tutorial werde ich nicht erklären, wie man Hadoop einrichtet, da ich es nicht besser erklären kann als das sehr gute yahoo-hadoop-tutorial und ebiquity-Hadoop-tutorial mit Screenshots. Ich werde die aktuelle stabile Version hadoop 0.20.2 verwenden. Hinweis: Das Eclipse-Plugin hat bei mir nicht funktioniert, ich habe stattdessen die neueste Version verwendet. Ich gehe also davon aus, dass Sie eine Hadoop-Konfiguration mit HDFS und eine Eclipse-Umgebung eingerichtet haben, in der Sie Dateien in den Cluster hochladen und Aufträge gegen Ihre Dateien ausführen können.
Der Plan
Wir werden die Arbeit in drei verschiedene Hadoop-Jobs aufteilen: Parsen, Berechnen und Ordnen. Parsen Sie die große Wiki-XML in Artikel in Hadoop Job 1. In der Hadoop-Mapping-Phase erhalten Sie den Namen des Artikels und seine ausgehenden Links. In der Hadoop-Reduktionsphase erhalten Sie für jede Wikiseite die Links zu anderen Seiten. Speichern Sie die Seite, den anfänglichen Rang und die ausgehenden Links.
Hadoop Job 2 berechnet den neuen PageRank. In der Mapping-Phase ordnen Sie jeden ausgehenden Link der Seite mit seinem Rang und der Gesamtzahl der ausgehenden Links zu. In der Reduktionsphase berechnen Sie den neuen PageRank für die Seiten. Speichern Sie die Seite, den neuen Rang und die ausgehenden Links. Wiederholen Sie diese Schritte, um genauere Ergebnisse zu erhalten.
Hadoop Job 3 mappt den Rang und die Seite Speichern Sie den Rang und die Seite (geordnet nach Rang) Sehen Sie sich die Top 10 Seiten an!
Hadoop-API
Wenn Sie den Code in Ihrer IDE verwenden, werden Sie feststellen, dass viele der Klassen als depriviert markiert sind. In diesem Beispiel verwende ich die alte API vor 0.20.x. Es gibt die neue API (org.hadoop.mapreduce.) und die alte API (org.hadoop.mapred.). Die meisten Beispiele, die ich im Internet gefunden habe, basieren auf der alten API. Deshalb habe ich hier die alte API verwendet. Die Änderungen sind in der neuen hadoop api 0.21 zu finden. Es sollte nicht sehr schwierig sein, auf die neue API zu wechseln.
Hadoop-Aufgabe 1: Parsen der XML-Datei zur Seite mit Links
Werfen wir einen Blick auf die Struktur einer Seite. Eine Seite kann als XML-Datei heruntergeladen werden, indem Sie Special:Export zur URL hinzufügen. Um z.B. die XML-Datei für die Wiki-Seite über Hilversum zu erhalten: Wiki Hilversum
[xml title="Hilversum.xml"]
<mediawiki
xmlns_xsi="https://www.w3.org/2001/XMLSchema-instance"
xsi_schemaLocation="https://www.mediawiki.org/xml/export-0.5/
https://www.mediawiki.org/xml/export-0.5.xsd"
version="0.5" xml_lang="en">
<siteinfo>
<sitename>Wikipedia</sitename>
<base>https://en.wikipedia.org/wiki/Main_Page</base>
<generator>MediaWiki 1.17wmf1</generator>
<case>first-letter</case>
<namespaces>
<namespace key="-2" case="first-letter">Media</namespace>
...
</namespaces>
</siteinfo>
<page>
<title>Hilversum</title>
<id>13686</id>
<revision>
<id>449460543</id>
<timestamp>2011-09-10T06:42:48Z</timestamp>
<contributor>
<username>Archengigi</username>
<id>7283012</id>
</contributor>
<comment>Hilversum vlag.svg</comment>
<text xml_space="preserve" bytes="13996">
... the page latest revision content with [[LINKS]],
links can point to other pages, files, external sites etc...
</text>
</revision>
</page>
</mediawiki>
[/xml]
Es handelt sich um eine recht einfache xml-Struktur mit einigen siteinfo-Metadaten und der Seite mit der letzten Revision. Der Hauptteil, der uns interessiert, befindet sich innerhalb des Titels und der Text-Tags. Laden Sie die xml-Datei herunter und legen Sie sie in Ihrem HDFS im Verzeichnis /user/[hostname]/[user]/wiki/in ab. Wenn Sie den Job ausführen, sehen Sie den Ort, an dem die Datei abgelegt werden soll, so dass Sie die Dateien später, nach dem ersten Lauf, in das richtige Verzeichnis legen können.
11/09/19 12:02:08 INFO jvm.JvmMetrics: Initialisierung von JVM Metrics mit processName=JobTracker, sessionId=
11/09/19 12:02:08 WARN mapred.JobClient: Verwenden Sie GenericOptionsParser zum Parsen der Argumente. Die Anwendungen sollten ein entsprechendes Tool implementieren.
Exception in thread "main" org.apache.hadoop.mapred.InvalidInputException: Eingabepfad existiert nicht: hdfs://192.168.88.128:54310/user/alexanderlaptop/alexander/wiki/in
at org.apache.hadoop.mapred.FileInputFormat.listStatus(FileInputFormat.java:190)
Lassen Sie uns die Klassen in unserem Projekt für Job 1 erstellen. Die erste Klasse, die wir brauchen, ist die Hauptklasse, die wir gegen den Hadoop-Cluster laufen lassen können. Ich habe sie WikiPageRanking genannt. Sie wird später alle Jobs enthalten, aber im Moment enthält sie nur den ersten Job.
Note: you can view and fetch the source from github here abij/hadoop-wiki-pageranking
[java title="WikiPageRanking.java"]
public class WikiPageRanking {
public static void main(String[] args) throws Exception {
WikiPageRanking pageRanking = new WikiPageRanking();
//In and Out dirs in HDFS
pageRanking.runXmlParsing("wiki/in", "wiki/ranking/iter00");
}
public void runXmlParsing(String inputPath, String outputPath) throws IOException {
JobConf conf = new JobConf(WikiPageRanking.class);
FileInputFormat.setInputPaths(conf, new Path(inputPath));
// Mahout class to Parse XML + config
conf.setInputFormat(XmlInputFormat.class);
conf.set(XmlInputFormat.START_TAG_KEY, "<page>");
conf.set(XmlInputFormat.END_TAG_KEY, "</page>");
// Our class to parse links from content.
conf.setMapperClass(WikiPageLinksMapper.class);
FileOutputFormat.setOutputPath(conf, new Path(outputPath));
conf.setOutputFormat(TextOutputFormat.class);
conf.setOutputKeyClass(Text.class);
conf.setOutputValueClass(Text.class);
// Our class to create initial output
conf.setReducerClass(WikiLinksReducer.class);
JobClient.runJob(conf);
}
[/java]
Die Hauptklasse, die gegen Ihren Hadoop-Cluster laufen kann. Wir werden später weitere Jobs hinzufügen. Sie können Ihren Code (Mapper und Reducer) debuggen, wenn Sie das Programm als Debug As...
starten. Die normale InputFormat-Klasse ist das TextInputFormat, das Zeile für Zeile als Werte für die Map liest. Wir möchten, dass Teile der gesamten Xml als Eingabe dienen. Ich habe mich entschieden, das Mahout XmlInputFormat zu verwenden, um eine schöne Eingabe für die Mapper-Schnittstelle zu erhalten. Es zerlegt die Xml in kleine Teile innerhalb des angegebenen Start- und End-Tags
[java title="WikiPageLinksMapper.java" 1="[34-113" 2="2="2="2="2="2="2="2="2="language="class="highlight:""""""""""]"]
public class WikiPageLinksMapper extends MapReduceBase implements Mapper<LongWritable, Text, Text, Text> {
private static final Pattern wikiLinksPattern = Pattern.compile("\[.+?\]");
public void map(LongWritable key, Text value, OutputCollector<Text, Text> output, Reporter reporter) throws IOException {
// Returns String[0] = <title>[TITLE]</title>
// String[1] = <text>[CONTENT]</text>
// !! without the <tags>.
String[] titleAndText = parseTitleAndText(value);
String pageString = titleAndText[0];
Text page = new Text(pageString.replace(' ', '_'));
Matcher matcher = wikiLinksPattern.matcher(titleAndText[1]);
//Loop through the matched links in [CONTENT]
while (matcher.find()) {
String otherPage = matcher.group();
//Filter only wiki pages.
//- some have [[realPage|linkName]], some single [realPage]
//- some link to files or external pages.
//- some link to paragraphs into other pages.
otherPage = getWikiPageFromLink(otherPage);
if(otherPage == null || otherPage.isEmpty())
continue;
// add valid otherPages to the map.
output.collect(page, new Text(otherPage));
}
}
//... the impl of parsePageAndText(..)
//... the impl of getWikiPageFromLink(..)
}
}
[/java]
Die Mapperklasse, die die XML-Brocken in die Tupel key page und value outLinks zerlegt. Bei dieser Implementierung werden alle Links zur Map hinzugefügt, auch wenn sie mehrfach auf der Seite erscheinen.
[java title="WikiLinksReducer.java"]
public class WikiLinksReducer extends MapReduceBase implements Reducer<Text, Text, Text, Text> {
public void reduce(Text key, Iterator<Text> values, OutputCollector<Text, Text> output, Reporter reporter) throws IOException {
String pagerank = "1.0t";
boolean first = true;
while(values.hasNext()){
if(!first) pagerank += ",";
pagerank += values.next().toString();
first = false;
}
output.collect(key, new Text(pagerank));
}
}
[/java]
Die Klasse reducer, die die Seite mit dem anfänglichen PageRank und den ausgehenden Links speichert. Dieses Ausgabeformat wird als Eingabeformat für den nächsten Auftrag verwendet. KeyrankCommaSeparatedList-of-linksOtherPages
Hilversum 1.0 Country,Netherlands,Province,North_Holland,Mayor,Democrats_66,A...
Besorgen Sie sich eine größere Datei! Das
Hadoop Job 2: Berechnen des neuen Page Rank
Dieser Job berechnet das neue Ranking und erzeugt das gleiche Ausgabeformat wie die Eingabe, so dass dieser Job mehrfach ausgeführt werden kann. Wir werden diesen Auftrag nach Auftrag 1 ausführen. Der PageRank wird nach mehreren Durchläufen genauer, daher werden wir den Job einige Male ausführen.
Mapper
Dieser Job hat seine eigenen Mapper- und Reducer-Klassen:
Beispiel-Eingabe:
Seite_A 1.0 Seite_B 1.0 Seite_A Seite_C 1.0 Seite_A,Seite_D
[java title="Mapper"]
public class RankCalculateMapper extends MapReduceBase implements Mapper<LongWritable, Text, Text, Text>{
@Override
public void map(LongWritable key, Text value, OutputCollector<Text, Text> output, Reporter reporter) throws IOException {
int pageTabIndex = value.find("t");
int rankTabIndex = value.find("t", pageTabIndex+1);
String page = Text.decode(value.getBytes(), 0, pageTabIndex);
String pageWithRank = Text.decode(value.getBytes(), 0, rankTabIndex+1);
// Mark page as an Existing page (ignore red wiki-links)
output.collect(new Text(page), new Text("!"));
// Skip pages with no links.
if(rankTabIndex == -1) return;
String links = Text.decode(value.getBytes(), rankTabIndex+1, value.getLength()-(rankTabIndex+1));
String[] allOtherPages = links.split(",");
int totalLinks = allOtherPages.length;
for (String otherPage : allOtherPages){
Text pageRankTotalLinks = new Text(pageWithRank + totalLinks);
output.collect(new Text(otherPage), pageRankTotalLinks);
}
// Put the original links of the page for the reduce output
output.collect(new Text(page), new Text("|"+links));
}
}
[/java]
Einige Links verweisen auf Wikipages, die (noch) nicht existieren. Im Browser sehen Sie sie als rote Links. Im Ergebnis möchte ich die nicht existierenden Seiten überspringen. Ich habe mich dafür entschieden, die Seite mit einer Erklärungsmarkierung zu versehen, um anzuzeigen, dass es sich bei dieser Seite um eine echte Wikiseite handelt. Die reducer-Klasse wird nur diese Seiten verwenden, um die Ausgabe zu erzeugen.
Für jeden Link gibt es eine Ausgabe mit den kombinierten Werten page, rank und totalLink.
Die letzte Ausgabe des Mappers ist die Seite und die ursprünglichen Links. Wir brauchen den Link, damit der Reducer die richtige Ausgabe erzeugen kann.
Beispiel-Ausgabe:
Page_A ! Page_C |Page_A Page_B ! Page_B |Page_A Page_A Page_B 1.0 1 Page_C ! Page_A Page_C 1.0 2 Page_D Page_C 1.0 2
Recuder
Der Reducer erhält den Schlüssel und die nach dem Schlüssel geordneten Werte. In einer verteilten Umgebung wird die Map in Scheiben geschnitten und alle Knoten erhalten einen Anteil. Der Reducer berechnet den neuen pageRank und gibt ihn für die vorhandenen Seiten mit den ursprünglichen Links aus.
Beispieleingabe (sortiert nach Schlüssel):
Page_A ! Page_A Page_C 1.0 2 Page_A Page_B 1.0 1 Page_B ! Page_B |Page_A Page_C ! Page_C |Page_A Page_D Page_C 1.0 2
[java title="Reducer"]
public class RankCalculateReduce extends MapReduceBase implements Reducer<Text, Text, Text, Text> {
private static final float damping = 0.85F;
@Override
public void reduce(Text key, Iterator<Text> values, OutputCollector<Text, Text> out, Reporter reporter) throws IOException {
boolean isExistingWikiPage = false;
String[] split;
float sumShareOtherPageRanks = 0;
String links = "";
String pageWithRank;
// For each otherPage:
// - check control characters
// - calculate pageRank share <rank> / count(<links>)
// - add the share to sumShareOtherPageRanks
while(values.hasNext()){
pageWithRank = values.next().toString();
if(pageWithRank.equals("!")) {
isExistingWikiPage = true;
continue;
}
if(pageWithRank.startsWith("|")){
links = "t"+pageWithRank.substring(1);
continue;
}
split = pageWithRank.split("\t");
float pageRank = Float.valueOf(split[0]);
int countOutLinks = Integer.valueOf(split[1]);
sumShareOtherPageRanks += (pageRank/countOutLinks);
}
if(!isExistingWikiPage) return;
float newRank = damping * sumShareOtherPageRanks + (1-damping);
out.collect(key, new Text(newRank + links));
}
}
[/java]
Die Ausgabe des Reduzierers enthält den neuen PageRank für die vorhandenen Seiten mit den Links auf diesen Seiten.
sample output:
Page_A 1.425
Page_B 0.15 Page_A
Page_C 0.15 Page_A,Page_D
Wir müssen die Hauptklasse so konfigurieren, dass der neue Auftrag ein paar Mal nach dem Xml-Parsing-Auftrag ausgeführt wird. Den letzten Job habe ich vorerst auskommentiert, wir werden ihn im nächsten Absatz erstellen.
[java]
public class WikiPageRanking {
private static NumberFormat nf = new DecimalFormat("00");
public static void main(String[] args) throws Exception {
WikiPageRanking pageRanking = new WikiPageRanking();
//Job 1: Parse XML
pageRanking.runXmlParsing("wiki/in", "wiki/ranking/iter00");
int runs = 0;
for (; runs < 5; runs++) {
//Job 2: Calculate new rank
pageRanking.runRankCalculation("wiki/ranking/iter"+nf.format(runs), "wiki/ranking/iter"+nf.format(runs + 1));
}
//Job 3: Order by rank
//pageRanking.runRankOrdering("wiki/ranking/iter"+nf.format(runs), "wiki/result");
}
public void runXmlParsing(String inputPath, String outputPath) throws IOException {
JobConf conf = new JobConf(WikiPageRanking.class);
conf.set(XmlInputFormat.START_TAG_KEY, "<page>");
conf.set(XmlInputFormat.END_TAG_KEY, "</page>");
// Input / Mapper
FileInputFormat.setInputPaths(conf, new Path(inputPath));
conf.setInputFormat(XmlInputFormat.class);
conf.setMapperClass(WikiPageLinksMapper.class);
// Output / Reducer
FileOutputFormat.setOutputPath(conf, new Path(outputPath));
conf.setOutputFormat(TextOutputFormat.class);
conf.setOutputKeyClass(Text.class);
conf.setOutputValueClass(Text.class);
conf.setReducerClass(WikiLinksReducer.class);
JobClient.runJob(conf);
}
private void runRankCalculation(String inputPath, String outputPath) throws IOException {
JobConf conf = new JobConf(WikiPageRanking.class);
conf.setOutputKeyClass(Text.class);
conf.setOutputValueClass(Text.class);
conf.setInputFormat(TextInputFormat.class);
conf.setOutputFormat(TextOutputFormat.class);
FileInputFormat.setInputPaths(conf, new Path(inputPath));
FileOutputFormat.setOutputPath(conf, new Path(outputPath));
conf.setMapperClass(RankCalculateMapper.class);
conf.setReducerClass(RankCalculateReduce.class);
JobClient.runJob(conf);
}
/*
private void runRankOrdering(String inputPath, String outputPath) throws IOException {
JobConf conf = new JobConf(WikiPageRanking.class);
conf.setOutputKeyClass(FloatWritable.class);
conf.setOutputValueClass(Text.class);
conf.setInputFormat(TextInputFormat.class);
conf.setOutputFormat(TextOutputFormat.class);
FileInputFormat.setInputPaths(conf, new Path(inputPath));
FileOutputFormat.setOutputPath(conf, new Path(outputPath));
conf.setMapperClass(RankingMapper.class);
JobClient.runJob(conf);
}
*/
}
[/java]
Ich habe eine Schleife um die Ausführung von Job 2 hinzugefügt. Beim ersten Durchlauf wird die Eingabe aus wiki/ranking/iter00 übernommen und die Ausgabe in wiki/ranking/iter01 erstellt. Für den nächsten Durchlauf wird das Verzeichnis iter01 als Eingabequelle verwendet. Wenn die Schleife beendet ist, erhält Job 3 das letzte iterXX-Verzeichnis als Eingabe für den letzten Job, die Bestellung.
Job 3: Letzte Ausführung von PageRank bestellen
Dies ist ein einfacher Auftrag, der die Eingabe verwendet, um die Seite und den Rang zu erhalten. Und er ordnet den Schlüssel: Rang dem Wert: Seite zu. Hadoop übernimmt die Sortierung nach Schlüssel für uns. Wir brauchen keinen Reducer zu implementieren. Der Mapper und die Sortierung reichen für unser Ergebnis, die geordnete Liste, aus.
Beispiel-Eingabe:
Seite_A 1.425 Seite_B 0.15 Seite_A Seite_C 0.15 Seite_A,Seite_D
[java]
public class RankingMapper extends MapReduceBase implements Mapper<LongWritable, Text, FloatWritable, Text> {
@Override
public void map(LongWritable key, Text value, OutputCollector<FloatWritable, Text> output, Reporter arg3) throws IOException {
String[] pageAndRank = getPageAndRank(key, value);
float parseFloat = Float.parseFloat(pageAndRank[1]);
Text page = new Text(pageAndRank[0]);
FloatWritable rank = new FloatWritable(parseFloat);
output.collect(rank, page);
}
private String[] getPageAndRank(LongWritable key, Text value) throws CharacterCodingException {
String[] pageAndRank = new String[2];
int tabPageIndex = value.find("t");
int tabRankIndex = value.find("t", tabPageIndex + 1);
// no tab after rank (when there are no links)
int end;
if (tabRankIndex == -1) {
end = value.getLength() - (tabPageIndex + 1);
} else {
end = tabRankIndex - (tabPageIndex + 1);
}
pageAndRank[0] = Text.decode(value.getBytes(), 0, tabPageIndex);
pageAndRank[1] = Text.decode(value.getBytes(), tabPageIndex + 1, end);
return pageAndRank;
}
}
[/java]
Die Sortierung auf dem Schlüssel ist aufsteigend. Am Ende steht also die Seite mit dem höchsten Rang. Vorzugsweise sollte der Auftrag absteigend sortiert werden. Für den Moment ist das Ergebnis geordnet und das ist gut genug. Jetzt können wir Job 3 in der Hauptklasse auskommentieren und alle Jobs zusammen mit dem großen Datensatz ausführen.
Beispiel-Ausgabe:
1.425 Seite_A 0.15 Seite_B 0.15 Seite_C
Ausführen des großen Datensatzes (1 Knoten)
Auf meinem Laptop habe ich eine virtuelle Maschine für die Hadoop-Einrichtung verwendet. Das Parsen der XML-Datei, das 5-fache Berechnen und das Ordnen dauerte insgesamt: Zeit: 15 Minuten Eingabedatei: ~2,3 Gb Jede Rangdatei: 238 Mb Ergebnisdatei: 22 Mb Ich werde Ihnen die Ergebnisse nicht vorenthalten, Sie sollten sie selbst sehen, nachdem Sie einige Daten verarbeitet haben! Es wäre schön, das Programm auf einem Cluster mit mehreren Knoten auszuführen und die Geschwindigkeit, den Lastausgleich und die Ausfallsicherung zu erleben. Das ist etwas für den nächsten Blog. Wenn ich einen größeren Cluster verwendet habe, werde ich den Beitrag aktualisieren. Sie können die Quelldateien auf github ansehen/herunterladen.
Verfasst von

Alexander Bij
Unsere Ideen
Weitere Blogs
Contact



