Why combine these two different things?
Hadoop is great for crunching data, but the end results in flat files don’t present well to the customer, and it’s hard to visualize your network data in Excel.
Neo4J is perfect for working with our networked data, and we use it a lot when visualizing our different sets of data.
So we prepare our dataset with Hadoop and import it into Neo4J, the graph database, so we can query and visualize the data.
We have a lot of different ways we want to look at our dataset, so every few days we tend to create a new extract of the data with some new properties to look at.
This blog is about how we combined Hadoop and Neo4J and describes the phases we went through in our search for the optimal solution.
So this is how we started.
Phase I:
– Use Hive to prepare the data. For those of you not familiar with the Hadoop ecosystem, Hive is a tool that lets you write SQL queries which are transformed into map/reduce jobs. We use it to create a nodes table and an edges table out of our data.
The end result of this series of queries is two sets of files which we can copy from our Hadoop cluster to our local machine.
The nodes table/file looks something like this:
NodeId Property1 Property2 PropertyN
AAA nameOfA amountOfA someAThing
BBB nameOfB amountOfB someBThing
CCC nameOfC amountOfC someCThing
DDD nameOfD amountOfD someDThing
The edges table/file looks something like this:
FromNodeId ToNodeId EdgeProperty1 EdgePropertyN
AAA BBB someDate1 someNumber1
AAA DDD someDate2 someNumber2
BBB DDD someDate3 someNumber3
CCC BBB someDate4 someNumber4
DDD BBB someDate5 someNumber5
DDD CCC someDate6 someNumber6
– To load these sets into Neo4J we use the BatchInserter.
At the time we built our first importer, the version of Neo4J was 1.6-ish. So we wrote some code and started the import.
The dataset we’re talking about has some 30 million nodes with 9 properties each and about 650 million edges with 4 properties each.
[code lang="java"]
import java.io.BufferedReader;
import java.io.InputStreamReader;

import org.neo4j.kernel.impl.batchinsert.BatchInserter;
import org.neo4j.kernel.impl.batchinsert.BatchInserterImpl;

BatchInserter db = new BatchInserterImpl(<outputPath>, <config>);

// Our own node ids are a contiguous range of ints, so a simple array maps them to Neo4J node ids
long[] idCache = new long[<nrOfNodes>];

// Nodes
BufferedReader reader = new BufferedReader(new InputStreamReader(<InputStreamThingy>), 100 * 1024 * 1024);
String line;
while ((line = reader.readLine()) != null) {
    String[] parts = line.split("\t");
    int myOwnId = Integer.parseInt(parts[0]);
    // some property magic goes here
    idCache[myOwnId] = db.createNode(<propertiesMap>);
}
reader.close();

// Edges
reader = new BufferedReader(new InputStreamReader(<InputStreamThingyforEdges>), 100 * 1024 * 1024);
while ((line = reader.readLine()) != null) {
    String[] parts = line.split("\t");
    int fromNodeOwnId = Integer.parseInt(parts[0]);
    int toNodeOwnId = Integer.parseInt(parts[1]);
    // some property magic goes here
    db.createRelationship(idCache[fromNodeOwnId], idCache[toNodeOwnId], <RelationshipType>, <propertiesMap>);
}
reader.close();

// Shut down the inserter so everything is flushed to disk
db.shutdown();
[/code]
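A note on the placeholders: the <config> passed to the BatchInserterImpl is just a map of Neo4J settings, and the “property magic” builds a properties map from the remaining tab-separated columns. Here is a rough sketch of what that could look like; the memory-mapping values and property names are illustrative (the names follow the example nodes table above), not our actual settings.
[code lang="java"]
import java.util.HashMap;
import java.util.Map;

// Hypothetical batch inserter config; tune the mapped-memory values to your own RAM
Map<String, String> config = new HashMap<String, String>();
config.put("neostore.nodestore.db.mapped_memory", "1G");
config.put("neostore.relationshipstore.db.mapped_memory", "8G");
config.put("neostore.propertystore.db.mapped_memory", "2G");
config.put("neostore.propertystore.db.strings.mapped_memory", "2G");

// "Property magic": turn the remaining columns of the split line (parts, see the loop above)
// into a properties map for createNode / createRelationship
Map<String, Object> properties = new HashMap<String, Object>();
properties.put("name", parts[1]);
properties.put("amount", Long.parseLong(parts[2]));
properties.put("someThing", parts[3]);
[/code]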
– We import these nodes and edges on our desktop machine with 16 GB of RAM, which takes a full 20 hours to complete.
Phase II:
So we need to speed things up a little. In this phase we drop the step where we copy the nodes and edges files from our Hadoop cluster to our local machine, and instead read them straight from the cluster.
– Still use hive to prepare the data
– Make the importer read the files from the cluster directly (no copy needed anymore)
[code lang="java"]
import java.io.BufferedReader;
import java.io.DataInputStream;
import java.io.InputStreamReader;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

FileSystem fs = FileSystem.get(<hadoop configuration>);
// Hive writes its output as a set of part files, so we glob and loop over all of them
FileStatus[] files = fs.globStatus(new Path(<pathToNodesTableOrFile>));
for (FileStatus file : files) {
    DataInputStream dis = new DataInputStream(fs.open(file.getPath()));
    BufferedReader reader = new BufferedReader(new InputStreamReader(dis), 100 * 1024 * 1024);
    String line;
    while ((line = reader.readLine()) != null) {
        // same node/edge handling as in Phase I
    }
    reader.close();
}
[/code]
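For reference, the <hadoop configuration> placeholder is just a plain Hadoop Configuration object pointed at the cluster. A minimal sketch; the config file locations below are an assumption and depend on your own cluster setup:
[code lang="java"]
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

// Load the cluster's client configuration (paths are hypothetical)
Configuration conf = new Configuration();
conf.addResource(new Path("/etc/hadoop/conf/core-site.xml"));
conf.addResource(new Path("/etc/hadoop/conf/hdfs-site.xml"));

// This FileSystem now talks to HDFS instead of the local disk
FileSystem fs = FileSystem.get(conf);
[/code]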
– Run the importer on one of the worker nodes of the Hadoop cluster, which has 32 GB of RAM (make sure no Hadoop processes are running, so we can use the full 32 GB).
This takes a full 16 hours to complete, and we still need to get the data from that machine to the machine where we run the Neo4J database (about 2 hours for roughly 80 GB).
Phase III:
We achieved very little improvement in the total time it takes to create the full Neo4J database, so we needed to try something more.
On the mailing list there were rumors that the current version of Neo4J (1.8) had some major performance improvements in the batch inserter, so we upgraded the importer code from Neo4J 1.6 to 1.8.
The only change in our own code was in the way we create the BatchInserter class. The other optimizations are inside the Neo4J code, mainly in the way the paging subsystem in the PersistenceWindowPool works.
[code lang="java"]
import org.neo4j.unsafe.batchinsert.BatchInserter;
import org.neo4j.unsafe.batchinsert.BatchInserters;

BatchInserter db = BatchInserters.inserter(<outputPath>, <config>);
[/code]
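To make that change concrete, here is a minimal, self-contained sketch of the 1.8-style API end to end. The store path, relationship type and property values are made up for illustration; the actual node and edge loops stay exactly as in Phase I.
[code lang="java"]
import java.util.HashMap;
import java.util.Map;

import org.neo4j.graphdb.DynamicRelationshipType;
import org.neo4j.graphdb.RelationshipType;
import org.neo4j.unsafe.batchinsert.BatchInserter;
import org.neo4j.unsafe.batchinsert.BatchInserters;

// Store path, relationship type name and property values are hypothetical
Map<String, String> config = new HashMap<String, String>();
BatchInserter db = BatchInserters.inserter("/data/neo4j/graph.db", config);
RelationshipType connectsTo = DynamicRelationshipType.withName("CONNECTS_TO");

Map<String, Object> props = new HashMap<String, Object>();
props.put("name", "nameOfA");
long a = db.createNode(props);
long b = db.createNode(props);
db.createRelationship(a, b, connectsTo, new HashMap<String, Object>());

// Shutting down flushes everything to disk; without it the store is unusable
db.shutdown();
[/code]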
– Still use Hive to prepare the data
– Run the importer on one of the worker nodes of the Hadoop cluster.
This takes about 3 hours to complete, and we still need to get the data from that machine to the machine where we run the Neo4J database (about 2 hours for roughly 80 GB).
This is where we are now. But we still have something up our sleeves.
While we were waiting for these imports to complete, we decided to see whether we could make the batch inserter work in a distributed way, so we could use our 16-node Hadoop cluster to create the initial Neo4J database faster.
In the next blog I will go into the details of that quest.