Friso van Vollenhoven, 18 Apr 2011
Disambiguating Twitter locations

Location information in tweets is of low quality. Most Twitter users do not use a geo-enabled device to send their messages (less than 10% in my sample set). Users will often provide only a vague hint of their actual location in the location field. Unfortunately, this field is free form text and people tend to use different qualifications for their physical locations. These fall roughly into three categories:
Now what we need is something that can turn names of places into geo coordinates, and it needs to be able to tell that London, London UK and London Great Britain are the same thing. To achieve this, at the very least, we need a list of all the places in the world with their geo coordinates. I found that MaxMind publishes such a list here, which contains not only all cities but also the population of each city (where known), which could be useful for disambiguation. The requirements for the location finder are roughly this:
Because this is all basic text matching, I decided to use a Lucene-based index to look up locations. By far the easiest way to use Lucene is to just download SOLR and use that. I came up with the following SOLR schema for storing locations:

[xml]
<field name="id" type="string" indexed="true" stored="true" required="true" />
<field name="location" type="text_ws" indexed="true" stored="true" multiValued="false" omitNorms="true" />
<field name="city" type="string" indexed="false" stored="true" multiValued="false" />
<field name="country" type="string" indexed="false" stored="true" multiValued="true" />
<field name="state" type="string" indexed="false" stored="true" multiValued="true" />
<field name="lat" type="string" indexed="false" stored="true" multiValued="false" />
<field name="lon" type="string" indexed="false" stored="true" multiValued="false" />
<field name="population" type="int" indexed="false" stored="true" multiValued="false" />
[/xml]

The location field contains everything there is to know about a place: the city's name, all possible ways to name the country (e.g. UK, United Kingdom, Great Britain, GB), and for US cities also the state, both abbreviated and in full. This is the field I base the search on. The other fields are not indexed, because they are only used for post-processing of the search results. When querying SOLR for a location, I ask it for the first 200 results. Afterwards I apply some basic scoring to the results, using the following steps. For each document in the result set:
- if the city is present in the location, add 1 point
- if the country is present in the location, add 1 point
- (US only) if the state is present in the location, add 1 point
- if this city has the largest population of the result set, add 0.5 point
If no result scores higher than 1.4, return nothing (not found); otherwise use the result with the highest score. The threshold is required because otherwise locations like 'behind you!' would match one of the many cities containing 'you' that exist in Vietnam. This logic is the simplest thing that works; further tweaking and refinement would give better results.
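To make this concrete, here is a minimal sketch of what such a lookup-and-score step could look like using SolrJ. The field names come from the schema above, and the 200-result limit, the scoring rules and the 1.4 threshold come from the text; the LocationFinder class and its getLatLon signature are my own reconstruction, matching how the finder is used in the mapper below.

[java]
import java.util.Collection;

import org.apache.solr.client.solrj.SolrQuery;
import org.apache.solr.client.solrj.SolrServer;
import org.apache.solr.common.SolrDocument;
import org.apache.solr.common.SolrDocumentList;

// Sketch only: looks up a free form (lowercased) location string and returns {lat, lon}.
public class LocationFinder {

    private final SolrServer solr;

    public LocationFinder(SolrServer solr) {
        this.solr = solr;
    }

    public String[] getLatLon(String location) throws Exception {
        // Ask SOLR for the first 200 candidate cities.
        SolrQuery query = new SolrQuery(location);
        query.setRows(200);
        SolrDocumentList docs = solr.query(query).getResults();

        // Determine the largest population in the result set (used for the 0.5 point bonus).
        int maxPopulation = 0;
        for (SolrDocument doc : docs) {
            Integer population = (Integer) doc.getFieldValue("population");
            if (population != null && population > maxPopulation) {
                maxPopulation = population;
            }
        }

        // Score each candidate with the simple rules described above.
        SolrDocument best = null;
        double bestScore = 0.0;
        for (SolrDocument doc : docs) {
            double score = 0.0;
            if (contains(location, doc.getFieldValue("city"))) score += 1.0;
            if (containsAny(location, doc.getFieldValues("country"))) score += 1.0;
            if (containsAny(location, doc.getFieldValues("state"))) score += 1.0;
            Integer population = (Integer) doc.getFieldValue("population");
            if (population != null && population == maxPopulation && maxPopulation > 0) score += 0.5;
            if (score > bestScore) {
                bestScore = score;
                best = doc;
            }
        }

        // Apply the threshold: anything scoring 1.4 or less counts as not found.
        if (best == null || bestScore <= 1.4) {
            return null;
        }
        return new String[] { (String) best.getFieldValue("lat"), (String) best.getFieldValue("lon") };
    }

    private boolean contains(String location, Object value) {
        return value != null && location.contains(value.toString().toLowerCase());
    }

    private boolean containsAny(String location, Collection<Object> values) {
        if (values == null) {
            return false;
        }
        for (Object value : values) {
            if (contains(location, value)) {
                return true;
            }
        }
        return false;
    }
}
[/java]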
Because the amount of data you get from Twitter can be substantial, I chose to do the processing work as a MapReduce job using Hadoop. This also allows me to use the same code later on, when I download even greater amounts of tweets (which I plan to do). I will not discuss Hadoop cluster setup here, as there are plenty of great resources on that subject elsewhere; I will just present the actual code used to process the tweets.

Creating and running a MapReduce job in Java is quite easy. What you need is a mapper, a reducer and a main class that creates, configures and submits the job. In my case, the mapper does most of the work:

[java]
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
  try {
    // Parse the tweet JSON and pull the free form location field from the user object
    String location = (String) ((Map<String, Object>) ((Map<String, Object>) new ObjectMapper()
        .readValue(value.toString(), Map.class))
        .get("user")).get("location");

    // Strip punctuation, look up the location and emit "lat,lon" with a count of one
    String[] coords = finder.getLatLon(location.toLowerCase().replaceAll("[\\[\\]\",.;:~`?\\-_=+|!@#$%^&*()]", " "));
    context.write(new Text(coords[0] + "," + coords[1]), new LongWritable(1L));
  } catch (Throwable t) {
    log.error("ERROR in mapper.", t);
    context.getCounter(Counter.TWEET_ERRORS).increment(1L);
  }
}
[/java]

The reducer side of the job only has to do the counting. This is very similar to the widely used word counting MapReduce example.

[java]
protected void reduce(Text key, Iterable<LongWritable> values, Context context) throws IOException, InterruptedException {
  long count = 0;
  for (LongWritable value : values) {
    count += value.get();
  }
  context.write(key, new LongWritable(count));
}
[/java]

Finally, the JobRunner is required to do the boilerplate of setting up the job and submitting it to the cluster.

[java]
public class JobRunner extends Configured implements Tool {

  @Override
  public int run(String[] args) throws Exception {
    // Standard boilerplate for creating and running a Hadoop job
    Configuration config = getConf();
    Job job = new Job(config, "Tweet location counter job");
    job.setJarByClass(this.getClass());

    String input = args[0];
    String output = args[1];
    job.setJobName("Get tweets from " + input);

    // Input is just text files in HDFS
    TextInputFormat.setInputPaths(job, new Path(input));

    job.setMapperClass(TweetLocationMapper.class);
    job.setMapOutputKeyClass(Text.class);
    job.setMapOutputValueClass(LongWritable.class);
    job.setReducerClass(LocationCountingReducer.class);
    job.setCombinerClass(LocationCountingReducer.class);

    // Output is tab delimited text files, written to the given path
    job.setOutputFormatClass(TextOutputFormat.class);
    TextOutputFormat.setOutputPath(job, new Path(output));

    job.waitForCompletion(true);
    return 0;
  }

  public static void main(String[] args) throws Exception {
    try {
      int result = ToolRunner.run(new Configuration(), new JobRunner(), args);
      System.exit(result);
    } catch (Throwable t) {
      System.err.println("ERROR: " + t.getMessage());
      t.printStackTrace(System.err);
      System.exit(1);
    }
  }
}
[/java]

As you can see, I use the reducer class as the combiner for the job as well. It is often advisable to use a combiner in your MapReduce jobs, because it decreases the amount of intermediate data that has to be stored and moved over the network between nodes. Often the reducer can double as the combiner, so it is easy to add. To run the job, you need a .jar file with your classes and all dependencies. Since my project is Maven based, I can easily create this .jar using the assembly plugin.
Just put this in the pom.xml:

[xml]
<plugin>
  <artifactId>maven-assembly-plugin</artifactId>
  <configuration>
    <archive>
      <manifest>
        <mainClass>com.xebia.twitstuff.mapreduce.JobRunner</mainClass>
      </manifest>
    </archive>
    <descriptorRefs>
      <descriptorRef>jar-with-dependencies</descriptorRef>
    </descriptorRefs>
  </configuration>
</plugin>
[/xml]

And then do:

[bash]mvn clean install assembly:assembly[/bash]

Now, to run the job, first copy the data onto your cluster:

[bash]hadoop fs -put tweets.json data/tweets.json[/bash]

And then run the job:

[bash]hadoop jar twitstuff-with-deps.jar data/tweets.json output/tweet-counts[/bash]

Creating the map

To plot the counts per location on a map I used geocommons. It is a Flash based mapping tool that lets you upload datasets in a number of formats; you can also use existing datasets uploaded by others. Because the MapReduce job only does counting in the reducer (not a very heavy operation) and there is a combiner in place, I could set the number of reducers to 1. This means the job produces a single output file, containing every unique location and its count. I used the TextOutputFormat for this job, so the output is a tab delimited text file.

Geocommons is a bit clever about the data you upload, so only a little tweaking is required. I had to convert the tab delimited file to CSV and add a header line on top of the file, naming the fields in the CSV. I named them lat, lon and count. The resulting file looks like this:

[code]
lat,lon,count
-0.0333333,109.3333333,7
-0.1,109.35,2
-0.1,34.75,1
-0.2166667,-78.5,17
-0.2833333,36.0666667,1
-0.3666667,-52.15,43
etc...
[/code]

When you upload this to geocommons, it automatically figures out that the file contains latitude and longitude information and lets you use that for plotting the counts on a map. The rest is just a wizard and it will just work (hint: do some tweaking with the ranges and increase the number of levels for the heat map from 5 to 7). That is all it takes to create the map.

All code is available here. (Note: the code that I wrote for this is a quick hack of poor quality, but it works and demonstrates the statements in this post. The repo does not include my tweets data or the world cities database from MaxMind, but you can easily get those yourself.)

Conclusion

Working with datasets is fun and easy. With a limited amount of code and some cleverness you can plot things on maps and extract averages, aggregates, counts, locations and trends from millions or billions of records. You can do this without buying supercomputers or spending big on infrastructure; a credit card and an internet connection will get you hours of use of a cluster heavy enough to process gigabytes in minutes, for roughly the price of a decent restaurant meal for two. So, if you have questions that your database cannot answer and you have access to data that contains the answer, MapReduce it and find out!

Disclaimer

The data used for this little experiment is incomplete. It covers only a five hour period, so it cannot say anything about the actual distribution of messages about Egypt. This is probably why the US West coast looks all quiet: everyone over there was still asleep when I ran the data collection. This post is an example of how easy it is to work with data and is not meant to represent a real trending analysis.