This article is featured in the free magazine "Data Science in Production".
I was recently working on importing Google Analytics data into an Amazon EMR cluster.
Google Analytics was offering the files as Avro, but we wanted Parquet files partitioned by date (we literally have a `date` field in there). There are multiple reasons to choose one format over the other, but for us it came down to faster analytics thanks to Parquet's columnar format.
Using Spark for the ETL process makes this a piece of cake:
(spark.read.format('com.databricks.spark.avro').load('dataset.avro')
.write.format('parquet')
.partitionBy('date').saveAsTable('tablename'))
Or does it? The first issue is that if one of your columns has a nested schema exceeding 4000 characters, the Hive metastore will not accept it.¹
If you look around, you'll see this is a long-standing issue, open since October 2015. Apparently Oracle is to blame (it's always Oracle!).
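If you want to check whether you're affected before going any further, you can measure the length of each column's type description. This is just a quick sketch: it uses Spark's `simpleString()` representation, which is close to (but not exactly) the type name the metastore stores, so treat the numbers as indicative.
df = spark.read.format('com.databricks.spark.avro').load('dataset.avro')
for field in df.schema.fields:
    # simpleString() gives the compact type description, e.g. struct<...>
    type_length = len(field.dataType.simpleString())
    if type_length > 4000:
        print(field.name, type_length)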
The good news is that this limit can be changed in the metastore! I'm assuming you're using a Postgres instance as the metastore, but the syntax is similar across the board. Once you're logged in, type the following²:
ALTER TABLE "COLUMNS_V2" ALTER COLUMN "TYPE_NAME" TYPE VARCHAR(8000);
ALTER TABLE "TABLE_PARAMS" ALTER COLUMN "PARAM_VALUE" TYPE VARCHAR(8000);
ALTER TABLE "SERDE_PARAMS" ALTER COLUMN "PARAM_VALUE" TYPE VARCHAR(8000);
ALTER TABLE "SD_PARAMS" ALTER COLUMN "PARAM_VALUE" TYPE VARCHAR(8000);
At this point you might re-execute the Spark command above. But you'd be surprised by what Spark tells you:
WARN CreateDataSourceTableUtils: Persisting partitioned data source relation `tmp`.`tmp1` into Hive metastore in Spark SQL specific format, which is NOT compatible with Hive.
This is another long-standing issue, where the workaround is to first create the table and then do one of the following:
- insert into it for a particular partition (this can be accomplished with `INSERT INTO test_partition PARTITION(date=2013) SELECT * FROM test`; see the sketch right after this list for a variant that doesn't require enumerating the partitions by hand), or
- write the files directly to disk and then create the partition in Hive manually (for example: `ALTER TABLE test_partition ADD PARTITION(date=2013)`).
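As an aside, the first option can also be driven from Spark using Hive's dynamic partitioning, so you don't have to enumerate every partition value by hand. The sketch below assumes `test_partition` already exists, that `date` is its only partition column, and that `date` ends up as the last column of the SELECT (a requirement for dynamic partition inserts); treat it as a sketch rather than a drop-in command.
# Enable dynamic partitioning for this session (sketch, see assumptions above)
spark.sql("SET hive.exec.dynamic.partition = true")
spark.sql("SET hive.exec.dynamic.partition.mode = nonstrict")
# The partition value is taken from the last column of the SELECT
spark.sql("""
    INSERT INTO TABLE test_partition PARTITION (`date`)
    SELECT * FROM test
""")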
"First create the table" is of course deceptively simple: you need to create a partitioned table which is basically equal to the initial one, save for the date
field, which has to be set as the partition column.
Here are the steps I used to get this done:
- Use Spark to read the Avro file and write it unpartitioned somewhere:
(spark.read.format('com.databricks.spark.avro').load('dataset.avro')
.write.format('parquet').saveAsTable('test'))
- Use beeline to save the created schema:
beeline -u {jdbc connection string} -e 'DESCRIBE test' > schema.sql
- Edit the `schema.sql` file by hand, removing `date` from the columns and adding it as the partition column:
CREATE TABLE test_partition (
  fullVisitorId STRING,
  visitorId INT,
  ...
  visitStartTime INT,
  -- date STRING, note that this is commented out now
  totals STRUCT<...>,
  ...
)
PARTITIONED BY (`date` STRING)
STORED AS PARQUET
- Now you should execute this query in beeline. However, I couldn't: I was not able to restart the metastore, and without a restart it kept checking at the ORM level that fields were no longer than 4000 characters. After a good hour of searching, I realized I could just execute the query with `spark.sql("""YOUR QUERY HERE""")`; I had totally forgotten that Spark could bypass the ORM.
- Now, normally, you could just write directly with `.partitionBy('date').mode('append').saveAsTable('test_partition')`. However, you cannot use `partitionBy` with `saveAsTable` if the table already exists. And if you remove the `partitionBy`, Spark assumes that `field4` (the one that was coming after `date` in the example above) is supposed to be the partition column, which is of course not correct.
- At this point what's left is to use:
(df.write.format('parquet')
.mode('append').partitionBy('date')
.save('/user/hive/warehouse/database.db/test_partition'))
- Since we’ve manually written the files, we still need to tell Hive that new partitions are there. Doing that programmatically in Spark is pretty simple (ugly interpolation, sorry!):
for dt in df.select('date').distinct().rdd.map(lambda row: row['date']).collect():
    spark.sql("ALTER TABLE test_partition ADD PARTITION(`date`='%s')" % dt)
- Done! You can now query the data. Note that only the last two steps are needed when new files come in; they can be automated easily enough with the workflow manager of your choice (see the sketch right below for what that recurring part looks like)!
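Since only the last two steps recur, here's a minimal sketch of what the recurring job could look like, bundling them into one function. The `ingest` name, its `avro_path` parameter, and the default warehouse path are my own placeholders based on the paths used above, so adjust them to your setup.
def ingest(spark, avro_path,
           table='test_partition',
           warehouse_path='/user/hive/warehouse/database.db/test_partition'):
    # Read the new Avro drop and append it, partitioned by date, under the table's location
    df = spark.read.format('com.databricks.spark.avro').load(avro_path)
    (df.write.format('parquet')
       .mode('append').partitionBy('date')
       .save(warehouse_path))
    # Tell Hive about any partitions it doesn't know about yet
    # (IF NOT EXISTS makes re-adding an existing partition a no-op)
    for dt in df.select('date').distinct().rdd.map(lambda row: row['date']).collect():
        spark.sql("ALTER TABLE %s ADD IF NOT EXISTS PARTITION(`date`='%s')" % (table, dt))
Afterwards you can double-check that everything was registered with `spark.sql('SHOW PARTITIONS test_partition').show()`.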
Let me know what you think, especially if you disagree (I’m @gglanzani on Twitter if you want to reach out!).