F: "Wo auf der Welt war die Situation in Ägypten das heißeste Stadtgespräch?" A: "Die Menschen in Großbritannien / London waren ganz aus dem Häuschen, auch der Nahe Osten und die Städte an der Ostküste der USA zeigten mehr Interesse als der Rest der Welt." F: "Woher wissen Sie das?" A: "Nehmen Sie einfach ein paar hunderttausend Twitter-Nachrichten, die 'Ägypten' enthalten, und lassen Sie sie durch einen MapReduce-Job laufen, der die Anzahl der Nachrichten pro Ort zählt, und stellen Sie das auf einer Karte wie dieser dar:"
(geocommons.com/maps/49541) Die Karte zeigt die Anzahl der Twitter-Nachrichten, die das Wort "Ägypten" enthalten und von Orten auf der ganzen Welt stammen. Ein größerer Kreis bedeutet mehr Nachrichten von diesem Ort. Die Nachrichten wurden in einem Zeitraum von fünf Stunden am 28. Januar gesammelt, dem Tag, nachdem das ägyptische Internet lahmgelegt worden war.
In diesem Beitrag zeige ich Ihnen ein Beispiel dafür, was Sie mit halbstrukturierten Daten, leicht verfügbarer Open-Source-Software und etwa einhundert Zeilen zusammengehacktem Code machen können. Dieser Beitrag besteht aus zwei Teilen, einer vom Management genehmigten Zusammenfassung, die keine technischen Details enthält, und einem technischen Teil mit Code und Erläuterungen, wie die Karte erstellt wurde. Wenn Sie den nicht-technischen, wichtigen Teil suchen, der Ihnen hilft zu verstehen, warum Hadoop und MapReduce wichtig sind, halten Sie sich einfach an die grau hinterlegten Absätze. Wenn Sie nur wissen wollen, wie die Map erstellt wurde, können Sie die hervorgehobenen Bereiche auch weglassen. Abrufen von Daten Um alle Nachrichten, die 'Ägypten' enthalten, abzurufen, verwenden Sie die Twitter Streaming API. Es handelt sich dabei um die einfachste verfügbare API. Sie benötigt nur ein einfaches HTTP POST und eine einfache Authentifizierung (kein OAuth erforderlich) und schon beginnt sie mit dem Streaming von Tweets. Um Tweets im JSON-Format in eine Datei zu streamen, tun Sie dies einfach in einer Shell:
[bash]curl -d 'track=egypt' https://stream.twitter.com/1/statuses/filter.json
-u username:password > tweets.json[/bash]
Ersetzen Sie den Benutzernamen und das Passwort durch die eines beliebigen Twitter-Kontos. Lassen Sie das Programm einige Stunden lang laufen und wenn Sie glauben, dass Sie genug Daten haben (überprüfen Sie die Größe der tweets.json Datei), stoppen Sie es einfach mit STRG+C. Anzeigen von Daten Wenn Sie mit einem unbekannten Datensatz arbeiten, sollten Sie ihn zunächst ein wenig durchstöbern, um ein Gefühl dafür zu bekommen, was darin enthalten ist. In diesem Fall bedeutet das, dass wir JSON parsen und es ansprechend darstellen müssen. Gibt es einen besseren Weg, mit JSON zu arbeiten, als JavaScript zu verwenden? Ich verwende NodeJS für diese Art von Aufgaben. Node hat keine eingebaute Bibliothek für das zeilenweise Lesen von Dateien, aber ich habe mir hier eine funktionierende Lösung ausgeliehen. Mit json2.js und den folgenden Zeilen von JavaScript erhalte ich eine Liste aller Stellen in der Datei.
[javascript]
require('./json2.js');
var flr = require('./FileLineReader.js');
var reader = flr.FileLineReader(process.argv[2], 10240);
while (reader.hasNextLine()) {
try {
var line = reader.nextLine();
var tweet = JSON.parse(line);
console.log(tweet.user.location);
} catch(err) {
console.log("Error: " + err);
}
}
[/javascript]
Da sich in meiner Datei eine große Anzahl von Tweets befindet, habe ich beschlossen, eine kleinere Menge von nur 10000 Nachrichten zu erstellen, um sie zu durchsuchen:
[bash]head -n 10000 tweets.json > 10000-tweets.json[/bash]
Mit etwas Piping durch sort, uniq und grep können Sie sich einen guten Überblick über die in Tweets enthaltenen Standortinformationen verschaffen. Hinweis: Wahrscheinlich hätte ich dasselbe mit einer Kombination aus grep, awk und sed erreichen können, aber das JavaScript ist schöner, wenn Sie mit verschiedenen Eigenschaften der Tweet- und Benutzerobjekte herumspielen wollen. Wenn Sie zum Beispiel das Tweet-Objekt auf der Konsole ausgeben, erhalten Sie eine schön formatierte Ansicht aller Felder und Werte. Auf diese Weise habe ich herausgefunden, dass manchmal auch das Zeitzonenfeld nützlich sein kann und dass geo_enabled == false nicht bedeutet, dass das Ortsfeld keine GPS-Koordinaten hat.
Disambiguierung von Twitter-Standorten Standortinformationen in Tweets sind von geringer Qualität. Die meisten Twitter-Nutzer verwenden kein geo-fähiges Gerät, um ihre Nachrichten zu versenden (weniger als 10 % in meiner Stichprobe). Die Nutzer geben oft einen vagen Hinweis auf ihren tatsächlichen Standort in das Standortfeld ein. Leider ist dieses Feld ein Freitextfeld, und die Nutzer neigen dazu, unterschiedliche Qualifikationen für ihren physischen Standort zu verwenden. Sie lassen sich grob in drei Kategorien einteilen:
- Vollständig qualifizierte Beschreibungen von Orten auf der Erde, wie tatsächliche GPS-Koordinaten oder Städtenamen einschließlich Staat/Region und Land, z.B. 'iphone:53.165418,6.781723' und 'New York City, NY, USA'.
- Mehrdeutige Ortsangaben, die auf verschiedene Orte auf der Erde hinweisen könnten. London oder Amsterdam sind Beispiele dafür. Es ist vielleicht nicht offensichtlich, aber es gibt mehr als fünf Orte auf der Welt, die Amsterdam heißen. Dabei sind New Amsterdam (mehrere auf der ganzen Welt) und Nieuw Amsterdam (Surinam) nicht mitgezählt.
- Völlig nutzlose Informationen, die keinen geografischen Standort angeben. Ich habe 'hinter Ihnen!', 'zwischen Himmel und Erde' und 'in der Nähe von Justin Biebers Herz' gesehen. Das sind die schlimmsten.
Was wir jetzt brauchen, ist etwas, das Ortsnamen in Geokoordinaten umwandeln kann. Und wir brauchen etwas, das in der Lage ist, London, London UK und London Great Britain als dasselbe zu erkennen. Um dies zu erreichen, brauchen wir zumindest eine Liste aller Orte auf der Welt mit ihren Geokoordinaten. Ich habe herausgefunden, dass MaxMind eine solche Liste veröffentlicht hierdie nicht nur alle Städte enthält, sondern auch die Einwohnerzahl für jede Stadt (falls bekannt), was für die Disambiguierung interessant sein könnte. Die Anforderungen an den Standortfinder sind in etwa so:
- New York City, NY, USA sollte zu NYC passen und zu nichts anderem.
- Amsterdam sollte mit Amsterdam, NL übereinstimmen und nicht mit einem der anderen Orte auf der Welt, die Amsterdam heißen.
- Amsterdam, CA sollte Amsterdam, Kalifornien, USA entsprechen und nicht Amsterdam NL.
- UK, Vereinigtes Königreich, Großbritannien sollte das gleiche Land sein. Dasselbe gilt für US, USA, Vereinigte Staaten und Amerika.
- 'Hinter Ihnen' und dergleichen sollten überhaupt nicht zu einem Treffer führen.
Da es sich hier um einen einfachen Textabgleich handelt, habe ich beschlossen, eine Lucene basierten Index, um nach Orten zu suchen. Der bei weitem einfachste Weg, Lucene zu verwenden, ist der Download SOLR und verwenden Sie das. Ich habe mir das folgende SOLR-Schema für die Speicherung von Orten ausgedacht:
[xml] <field name="id" type="string" indexed="true" stored="true" required="true" /> <field name="location" type="text_ws" indexed="true" stored="true" multiValued="false" omitNorms="true" /> <field name="city" type="string" indexed="false" stored="true" multiValued="false" /> <field name="country" type="string" indexed="false" stored="true" multiValued="true" /> <field name="state" type="string" indexed="false" stored="true" multiValued="true" /> <field name="lat" type="string" indexed="false" stored="true" multiValued="false" /> <field name="lon" type="string" indexed="false" stored="true" multiValued="false" /> <field name="population" type="int" indexed="false" stored="true" multiValued="false" /> [/xml]
Das Ortsfeld enthält alles, was man über einen Ort wissen muss, also auch den Namen der Stadt, alle möglichen Bezeichnungen für das Land (z.B. UK, Vereinigtes Königreich, Großbritannien, GB) und für US-Städte auch den Bundesstaat, sowohl abgekürzt als auch mit vollem Namen. Dies ist das Feld, auf das ich die Suche stütze. Die anderen Felder werden nicht indiziert, da sie nur für die Nachbearbeitung der Suchergebnisse verwendet werden. Wenn ich SOLR nach einem Ort abfrage, frage ich nach den ersten 200 Ergebnissen. Anschließend wende ich auf die Ergebnisse eine grundlegende Bewertung an, indem ich folgende Schritte durchführe: für jedes Dokument in der Ergebnismenge
wenn die Stadt an dem Ort vorhanden ist, fügen Sie 1 Punkt hinzu wenn das Land an dem Ort vorhanden ist, fügen Sie 1 Punkt hinzu (nur USA) wenn der Staat an dem Ort vorhanden ist, fügen Sie 1 Punkt hinzu wenn diese Stadt die größte Einwohnerzahl der Ergebnismenge hat, fügen Sie 0,5 Punkte hinzu
Wenn kein Ergebnis höher als 1,4 ist, geben Sie nichts zurück (nicht gefunden). Andernfalls verwenden Sie das Ergebnis mit der höchsten Punktzahl. Der Schwellenwert ist erforderlich, da sonst Orte wie 'hinter Ihnen!' mit einer der vielen Städte in Vietnam übereinstimmen, die 'Sie' enthalten. Diese Logik ist die einfachste, die funktionieren wird. Weitere Optimierungen und Verfeinerungen werden zu besseren Ergebnissen führen.
- Sie müssen beim Kauf von Hardware nicht alles über zukünftige Kapazitätsanforderungen wissen.
- Sie können später bei Bedarf einfach Maschinen hinzufügen.
- Commodity-Hardware ist bei vielen Anbietern erhältlich, so dass die Preise immer wettbewerbsfähig sind und das Material in der Regel auf Lager ist.
- Durch den Einsatz vieler Maschinen anstelle einer einzigen gibt es keinen einzigen Ausfallpunkt.
- Viel mehr...
Mit Hadoop und MapReduce können Sie Scale-Out-Szenarien nur mit Open Source und Standard-Hardware umsetzen. Vor allem können Sie dieselbe Software auch problemlos auf Cloud-basierten Maschinen (wie Amazon EC2) einsetzen, was für die kurzfristige Nutzung oder einmalige Datenverarbeitung sehr praktisch ist. Mit Cloud-basierten Maschinen kann ich vorübergehend einen Cluster einrichten, der Gigabytes von Tweets (oder anderen Daten) in wenigen Minuten verarbeiten kann, und ich bezahle nur für die Zeit, in der ich die Maschinen tatsächlich nutze, und nichts weiter. Für die Einrichtung eines solchen Clusters benötige ich weniger als eine Stunde. Dies bietet interessante und reale Möglichkeiten, die es vorher nicht gab.
Da die Datenmengen, die Sie von Twitter erhalten, beträchtlich sein können, habe ich mich entschieden, die Verarbeitung als MapReduce-Job mit Hadoop durchzuführen. So kann ich denselben Code auch später noch verwenden, wenn ich noch größere Mengen an Tweets herunterlade (was ich vorhabe). Ich werde hier nicht auf die Einrichtung von Hadoop-Clustern eingehen, da es zu diesem Thema an anderer Stelle eine Menge großartiger Ressourcen gibt. Ich werde hier nur den eigentlichen Code vorstellen, der für die Verarbeitung der Tweets verwendet wird. Das Erstellen und Ausführen eines MapReduce-Auftrags in Java ist recht einfach. Sie benötigen einen Mapper, einen Reducer und eine Hauptklasse, die den Auftrag erstellt, konfiguriert und absendet. In meinem Fall ist der Mapper derjenige, der die meiste Arbeit macht:
[java]
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
try {
String location = (String) ((Map<String, Object>) ((Map<String, Object>)
new ObjectMapper()
.readValue(value.toString(), Map.class))
.get("user")).get("location");
String[] coords = finder.getLatLon(location.toLowerCase().replaceAll("[\[\]",.;:~`?-_=+|!@#$%^&*()]", " "));
context.write(new Text(coords[0] + "," + coords[1]), new LongWritable(1L));
} catch(Throwable t) {
log.error("ERROR in mapper.", t);
context.getCounter(Counter.TWEET_ERRORS).increment(1L);
}
}
[/java]
Die Reduziererseite des Auftrags muss nur die Zählung vornehmen. Dies ist dem weit verbreiteten MapReduce-Beispiel der Wortzählung sehr ähnlich.
[java]
protected void reduce(Text key, Iterable<LongWritable> values, Context context) throws IOException, InterruptedException {
long count = 0;
for (LongWritable value : values) {
count += value.get();
}
context.write(key, new LongWritable(count));
}
[/java]
Schließlich wird der JobRunner benötigt, um den Job einzurichten und ihn an den Cluster zu übermitteln.
[java]
public class JobRunner extends Configured implements Tool {
@Override
public int run(String[] args) throws Exception {
// Standard boilerplate for creating and running a hadoop job
Configuration config = getConf();
Job job = new Job(config, "Tweet location counter job");
job.setJarByClass(this.getClass());
String input = args[0];
String output = args[1];
job.setJobName("Get tweets from " + input);
// Input is just text files in HDFS
TextInputFormat.setInputPaths(job, new Path(input));
job.setMapperClass(TweetLocationMapper.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(LongWritable.class);
job.setReducerClass(LocationCountingReducer.class);
job.setCombinerClass(LocationCountingReducer.class);
// Output is to the table output format, and we set the table we want
job.setOutputFormatClass(TextOutputFormat.class);
TextOutputFormat.setOutputPath(job, new Path(output));
job.waitForCompletion(true);
return 0;
}
public static void main(String[] args) throws Exception {
try {
int result = ToolRunner.run(new Configuration(), new JobRunner(), args);
System.exit(result);
} catch(Throwable t) {
System.err.println("ERROR: " + t.getMessage());
t.printStackTrace(System.err);
System.exit(1);
}
}
}
[/java]
Wie Sie sehen können, verwende ich die Klasse reducer auch als Combiner für den Auftrag. Es ist oft ratsam, einen Combiner in Ihren MapReduce-Aufträgen zu verwenden. Dadurch wird die Menge der Zwischendaten, die gespeichert und über das Netzwerk zwischen den Knoten verschoben werden müssen, verringert. Oft kann der Reducer auch als Combiner fungieren, so dass er leicht hinzugefügt werden kann. Um den Auftrag auszuführen, benötigen Sie eine .jar-Datei mit Ihren Klassen und allen Abhängigkeiten. Da mein Projekt auf Maven basiert, kann ich diese .jar-Datei ganz einfach mit dem Assembly-Plugin erstellen. Fügen Sie dies einfach in die pom.xml ein:
[xml]
<plugin>
<artifactId>maven-assembly-plugin</artifactId>
<configuration>
<archive>
<manifest>
<mainClass>com.xebia.twitstuff.mapreduce.JobRunner</mainClass>
</manifest>
</archive>
<descriptorRefs>
<descriptorRef>jar-with-dependencies</descriptorRef>
</descriptorRefs>
</configuration>
</plugin>
[/xml]
Und dann tun:
[bash]Ӭmvn clean install assembly:assembly[/bash]
Um den Auftrag auszuführen, kopieren Sie zunächst die Daten auf Ihren Cluster:
[bash]hadoop fs -put tweets.json data/tweets.json[/bash]
Und dann führen Sie den Auftrag aus:
[bash]hadoop jar twitstuff-with-deps.jar data/tweets.json output/tweet-counts[/bash]
Erstellen der Karte Um die Zählungen pro Ort auf einer Karte darzustellen, habe ich geocommons verwendet. Dabei handelt es sich um ein Flash-basiertes Mapping-Tool, mit dem Sie Datensätze in einer Reihe von Formaten hochladen können. Sie können aber auch vorhandene Datensätze verwenden, die von anderen hochgeladen wurden. Da der MapReduce-Auftrag nur die Zählung im Reducer vornimmt (keine sehr schwere Operation) und ein Combiner vorhanden ist, konnte ich die Anzahl der Reducer auf 1 setzen. Das bedeutet, dass der Auftrag nur eine Ausgabedatei erzeugt, die alle eindeutigen Orte und eine Zählung enthält. Ich habe für diesen Auftrag das TextOutputFormat verwendet, so dass die Ausgabe eine tabulatorgetrennte Textdatei sein wird. Geocommons ist bei den Daten, die Sie hochladen, ein wenig clever, so dass nur eine kleine Anpassung erforderlich ist. Ich musste die tabulatorgetrennte Datei in eine CSV-Datei umwandeln und der Datei eine Kopfzeile hinzufügen, in der die Felder in der CSV-Datei benannt werden. Ich habe sie lat, lon und count genannt. Die resultierende Datei sieht also wie folgt aus:
[code]
lat,lon,count
-0.0333333,109.3333333,7
-0.1,109.35,2
-0.1,34.75,1
-0.2166667,-78.5,17
-0.2833333,36.0666667,1
-0.3666667,-52.15,43
etc...
[/code]
Wenn Sie diese Datei auf geocommons hochladen, wird automatisch erkannt, dass die Datei Längen- und Breitengradinformationen enthält, und Sie können diese für die Darstellung der Zählungen auf einer Karte verwenden. Der Rest ist nur ein Assistent und es wird einfach funktionieren (Tipp: Ändern Sie die Bereiche und erhöhen Sie die Anzahl der Ebenen für die Heatmap von 5 auf 7). Das ist alles, was Sie brauchen, um die Karte zu erstellen. Der gesamte Code ist unter github.com/friso/twitstuff verfügbar. (Hinweis: Der Code, den ich dafür geschrieben habe, ist von schlechter Qualität, aber er funktioniert und demonstriert die Aussagen in diesem Beitrag. Es ist ein schneller Hack. Das Repo enthält nicht meine Tweets-Daten und die Weltstädte-Datenbank von MaxMind, aber die können Sie sich leicht selbst besorgen).
Wenn Sie also Fragen haben, die Ihre Datenbank nicht beantworten kann, und Sie Zugang zu Daten haben, die die Antwort enthalten, dann finden Sie sie mit MapReduce heraus! Haftungsausschluss Die für dieses kleine Experiment verwendeten Daten sind unvollständig. Sie decken nur einen Zeitraum von fünf Stunden ab, können also nichts über die tatsächliche Verteilung der Nachrichten über Ägypten aussagen. Das ist wahrscheinlich der Grund, warum die Westküste der USA so ruhig aussieht: Dort haben alle noch geschlafen, als ich die Datenerfassung durchgeführt habe. Dieser Beitrag ist ein Beispiel, um zu zeigen, wie einfach es ist, mit Daten zu arbeiten, und soll keine echte Trendanalyse darstellen.
Verfasst von
Friso van Vollenhoven
Unsere Ideen
Weitere Blogs
Contact



