Q: “Where in the world was the situation in Egypt the hottest talk of the town?”
A: “People in UK / London were all over it, also the Middle East and US east coast cities show more interest than the rest of the world.”
Q: “How do you know?”
A: “Just take a couple hundred thousand Twitter messages containing ‘egypt’ and run them through a MapReduce job that counts the number of messages per location and plot that on a map like this:”
(geocommons.com/maps/49541)
The map shows the number of Twitter messages containing the word ‘egypt’ that originated from locations around the world. A larger circle means more messages from that place. The messages were gathered during a five hour period on january 28, the day after the Egyptian internet was crippled.
In this post I will present an example of what you can do using semi-structured data, readily available open source software, and about one hundred lines of hacked together code. This post has two parts, a management approved summary that contains no technical details and a technical part with code and explanation on how the map was created. If you are looking for the non-technical, important part that helps you understand why Hadoop and MapReduce matter, just stick to the paragraphs highlighted with a gray background. Equally, if you just want to know how the map was generated, skip the highlighted areas.
Getting data
To get all messages containing ‘egypt’, use the Twitter streaming API. It is the simplest API available; it requires just a simple HTTP POST and basic authentication (no OAuth required) and it will happily start streaming tweets. To stream tweets in JSON format to a file, just do this in a shell:
[bash]curl -d 'track=egypt' https://stream.twitter.com/1/statuses/filter.json \
-u username:password > tweets.json[/bash]
Substitute the username and password with those of any Twitter account. Leave this running for a few hours and when you think you have enough data (check the tweets.json file size), just stop it using CTRL+C.
Viewing data
The first thing that you want to do when working with a unknown dataset is just browsing through it a bit to get a feel for what is in there. In this case, it means we have to parse JSON and display it nicely. What better way to work with JSON than to use JavaScript? I use NodeJS for this kind of thing. Node doesn’t have a built in library for line by line reading of files, but I borrowed a working solution here. Using json2.js and the following lines of JavaScript, I get a list of all locations in the file.
[javascript]
require('./json2.js');
var flr = require('./FileLineReader.js');
var reader = flr.FileLineReader(process.argv[2], 10240);
while (reader.hasNextLine()) {
try {
var line = reader.nextLine();
var tweet = JSON.parse(line);
console.log(tweet.user.location);
} catch(err) {
console.log("Error: " + err);
}
}
[/javascript]
Because there is a large number of tweets in my file, I decided to create a smaller set of only 10000 messages just for browsing and looking:
[bash]head -n 10000 tweets.json > 10000-tweets.json[/bash]
With some piping through sort, uniq and grep, you can get a good idea about the location information provided in tweets.
Note: quite probably, I could have achieved the same using a combination of grep, awk and sed, but the JavaScript is nicer if you want to play around with different properties of the tweet and user objects. For example printing out the tweet object to console gives you a nicely formatted view of all fields and values. This way, I found out that sometimes the timezone field can also be useful and that geo_enabled == false does not mean that the location field does not have GPS coordinates.
Unfortunately the location field in Twitter is free form text, so something like ‘at work’ is a valid location for Twitter. Normally, when you deal with substantial amounts of data, you’d put everything in a database and query that database later on. However, with unstructured data, this is not always helpful. For the use case of generating the above map, we need to turn descriptions of locations into actual geo coordinates. For other use cases, you might need to have the actual text as it was entered by the user (e.g. to analyze location description by humans) or you could need to have all the locations as formatted text (e.g. Amsterdam, The Netherlands or New York City, NY, USA). Depending on what the use case is, you need to process the locations differently. Because of this, a database does not help; there is no way to know what to store in advance. You need to process the source data in a different way for each different query you have. So what would help is a way to process large amounts of data quickly. That’s what Hadoop MapReduce does.
Disambiguating Twitter locations Location information in tweets is of low quality. Most Twitter users do not use a geo-enabled device to send their messages (less than 10% in my sample set). Users will often provide some vague hint to their actual location in the location field. Unfortunately this field is free form text and people tend to use different qualifications for their physical locations. They fall roughly into three categories:
- Fully qualified descriptions of places on earth, like actual GPS coordinates or city names including state/region and country, e.g. ‘iphone:53.165418,6.781723’ and ‘New York City, NY, USA’.
- Ambiguous location information that could point to several places on earth. London or Amsterdam are examples of this. It might not be obvious, but there are more than five places called Amsterdam in the world. This is not counting New Amsterdam (several around the world) or Nieuw Amsterdam (Suriname).
- Totally useless information not denoting any geo location. I have seen ‘behind you!’, ‘between heaven and earth’ and ‘close to Justin Bieber’s heart’. Those are the worst.
Now what we need is something that can turn names of places into geo coordinates. And we need it to be able to tell that London, London UK and London Great Britain are the same thing. To achieve this, at the very least, we need a list of all the places in the world with their geo coordinates. I found that MaxMind publishes such a list here, which not only contains all cities but also the population for each city (if known), which could be interesting for disambiguation. The requirements for the location finder are roughly this:
- New York City, NY, USA should match NYC and nothing else.
- Amsterdam should match Amsterdam, NL and not any of the other places called Amsterdam around the world.
- Amsterdam, CA should match Amsterdam, California, USA and not Amsterdam NL.
- UK, United Kingdom, Great Britain should be the same country. Same for US, USA, United States and America.
- ‘Behind you’ and the like should not result in a match at all.
Because this is all basic text matching, I decided to use a Lucene based index to lookup locations. By far the easiest way to use Lucene is to just download SOLR and use that. I came up with the following SOLR schema for storing locations:
[xml] <field name="id" type="string" indexed="true" stored="true" required="true" /> <field name="location" type="text_ws" indexed="true" stored="true" multiValued="false" omitNorms="true" /> <field name="city" type="string" indexed="false" stored="true" multiValued="false" /> <field name="country" type="string" indexed="false" stored="true" multiValued="true" /> <field name="state" type="string" indexed="false" stored="true" multiValued="true" /> <field name="lat" type="string" indexed="false" stored="true" multiValued="false" /> <field name="lon" type="string" indexed="false" stored="true" multiValued="false" /> <field name="population" type="int" indexed="false" stored="true" multiValued="false" /> [/xml]
The location field contains everything there is to know about a place, so that includes the city’s name, all possible ways to name the country (e.g. UK, United Kingdom, Great Britain, GB), and for US cities also the state both abbreviated and the full name. This is the field I base the search on. The other fields are not indexed, because they’re only used for post-processing of search results. When querying SOLR for a location, I ask it for the first 200 results. Afterwards I apply some basic scoring to the results, using the following steps: for each document in the result set
if the city is present in the location, add 1 point
if the country is present in the location add 1 point
(US only) if the state is present in the location add 1 point
it this city has the largest population of the result set, add 0.5 point
If no results scored higher than 1.4, return nothing (not found). Otherwise use the result with the highest score. The threshold is required, because otherwise locations like ‘behind you!’ will match one of the many cities containing ‘you’ that exist in Vietnam. This logic is the simplest thing that will work. Further tweaking and refinement will give better results.
When working with substantial volumes of data, one approach is to get hold of a very fast, very heavy machine and use that. This is called scaling up. However, more recently, a lot of smart people are doing things differently, using standard commodity hardware and just using more than one machine when more capacity is required. This is called scaling out. MapReduce is all about scaling out. This has numerous benefits:
- You do not have to know all about future capacity requirements when purchasing hardware.
- You can just add machines later on, when required.
- Commodity hardware is widely available from a lot of suppliers, so pricing is always competitive and supplies are usually in stock.
- By using many machines instead of one, there is no single point of failure.
- Much more…
With Hadoop and MapReduce, you can deploy scale out scenarios using only open source and standard hardware. Above all, you can also easily deploy the same software to cloud based machines (like Amazon EC2) which is handy for short term usage or one time data crunching efforts.
Using cloud based machines, I can temporarily setup a cluster that is capable of processing gigabytes of tweets (or other data) in minutes and just pay for the time I actually use the machines and nothing more. Setting up a cluster to do this takes me less than an hour. This provides interesting and real opportunities that didn’t exist before.
Because the amount of data you get from Twitter can be substantial, I chose to do the processing work as a MapReduce job using Hadoop. This will also allow me to use the same code later on, when I download even greater amounts of tweets (which I plan to do). I will not discuss Hadoop cluster setup here, as there are a lot of great resources on this subject elsewhere. Here I will just present the actual code used to process the tweets. Creating and running a MapReduce job in Java is quite easy. What you need is a mapper, a reducer and a main class that creates, configures and submits the job. In my case, the mapper is the one doing most of the work:
[java]
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
try {
String location = (String) ((Map<String, Object>) ((Map<String, Object>)
new ObjectMapper()
.readValue(value.toString(), Map.class))
.get("user")).get("location");
String[] coords = finder.getLatLon(location.toLowerCase().replaceAll("[\\[\\]\",.;:~`?-_=+|!@#$%^&*()]", " "));
context.write(new Text(coords[0] + "," + coords[1]), new LongWritable(1L));
} catch(Throwable t) {
log.error("ERROR in mapper.", t);
context.getCounter(Counter.TWEET_ERRORS).increment(1L);
}
}
[/java]
The reducer side of the job only has to do the counting. This is very similar to the widely used word counting MapReduce example.
[java]
protected void reduce(Text key, Iterable<LongWritable> values, Context context) throws IOException, InterruptedException {
long count = 0;
for (LongWritable value : values) {
count += value.get();
}
context.write(key, new LongWritable(count));
}
[/java]
Finally, the JobRunner is required to do the boiler plate of setting up the job and submitting it to the cluster.
[java]
public class JobRunner extends Configured implements Tool {
@Override
public int run(String[] args) throws Exception {
// Standard boilerplate for creating and running a hadoop job
Configuration config = getConf();
Job job = new Job(config, "Tweet location counter job");
job.setJarByClass(this.getClass());
String input = args[0];
String output = args[1];
job.setJobName("Get tweets from " + input);
// Input is just text files in HDFS
TextInputFormat.setInputPaths(job, new Path(input));
job.setMapperClass(TweetLocationMapper.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(LongWritable.class);
job.setReducerClass(LocationCountingReducer.class);
job.setCombinerClass(LocationCountingReducer.class);
// Output is to the table output format, and we set the table we want
job.setOutputFormatClass(TextOutputFormat.class);
TextOutputFormat.setOutputPath(job, new Path(output));
job.waitForCompletion(true);
return 0;
}
public static void main(String[] args) throws Exception {
try {
int result = ToolRunner.run(new Configuration(), new JobRunner(), args);
System.exit(result);
} catch(Throwable t) {
System.err.println("ERROR: " + t.getMessage());
t.printStackTrace(System.err);
System.exit(1);
}
}
}
[/java]
As you can see, I use the reducer class also as combiner for the job. It is often advisable to use a combiner in your MapReduce jobs. It will decrease the amount of intermediate data that has to be stored and moved over the network between nodes. Often the reducer can double as combiner, so it is easy to add. To run the job, you need a .jar file with your classes and all dependencies. Since my project is Maven based, I can easily create this .jar using the assembly plugin. Just put this in the pom.xml:
[xml]
<plugin>
<artifactId>maven-assembly-plugin</artifactId>
<configuration>
<archive>
<manifest>
<mainClass>com.xebia.twitstuff.mapreduce.JobRunner</mainClass>
</manifest>
</archive>
<descriptorRefs>
<descriptorRef>jar-with-dependencies</descriptorRef>
</descriptorRefs>
</configuration>
</plugin>
[/xml]
And then do:
[bash]Ӭmvn clean install assembly:assembly[/bash]
Now, to run the job, first copy the data onto your cluster:
[bash]hadoop fs -put tweets.json data/tweets.json[/bash]
And then run the job:
[bash]hadoop jar twitstuff-with-deps.jar data/tweets.json output/tweet-counts[/bash]
Creating the map To plot the counts per location on a map I used geocommons. It is a Flash based mapping tool that allows you to upload datasets in a number of formats but you can also use existing datasets uploaded by others. Because the MapReduce job does only counting in the reducer (not a very heavy operation) and there is a combiner in place, I could set the number of reducers to 1. This means that there is only one output file coming from the job, which will contain all unique locations and a count. I used the TextOutputFormat for this job, so the output will be a tab delimited text file. Geocommons is a bit clever about data that you upload, so only a little bit of tweaking is required. I needed to take the tab delimited file and change it to CSV and I needed to add a header line on top of the file, naming the fields in the CSV. I named them lat, lon and count. So the resulting file looks like this:
[code]
lat,lon,count
-0.0333333,109.3333333,7
-0.1,109.35,2
-0.1,34.75,1
-0.2166667,-78.5,17
-0.2833333,36.0666667,1
-0.3666667,-52.15,43
etc...
[/code]
When you upload this to geocommons, it will automatically figure out that there is latitude and longitude information in the file and let you use that for plotting the counts on a map. The rest is just a wizard and it will just work (hint: do some tweaking with the ranges and increase the number of levels for the heat map from 5 to 7). That is all it takes to create the map. All code is available at github.com/friso/twitstuff. (Note: the code that I wrote to do this is of poor quality, but it works and demonstrates the statements in this post. It is a quick hack. The repo does not include my tweets data and the world cities database from MaxMind but you can easily get those yourself).
Working with datasets is fun and easy. With a limited amount of code and some cleverness you can plot things on maps, extract averages, aggregates, counts, locations and trends from millions or billions of records. You can do this without buying supercomputers or spending big on infrastructure; a credit card and an internet connection will get you hours of using a cluster heavy enough to process gigabytes in minutes for roughly the price of a decent restaurant meal for two.
So, if you have questions that your database cannot answer and you have access to data that contains the answer, MapReduce it and find out!
Disclaimer
The data used for this little experiment is incomplete. It covers only a five hour period, so it cannot say anything about the actual distribution of messages about Egypt. This is probably why the US West coast looks all quiet: everyone over there was still asleep when I ran the data collection. This post is an example to show how easy it is to work with data and is not meant to represent a real trending analysis.