I was recently working on importing Google Analytics data into an Amazon EMR cluster.
Google Analytics was offering the files as Avro, but we wanted Parquet files partitioned by date (we literally have a date field in there). There are multiple reasons to choose one format over the other, but for us it came down to faster analytics thanks to Parquet's columnar format.
Using Spark for the ETL process makes this a piece of cake:
(spark.read.format('com.databricks.spark.avro').load('dataset.avro')
.write.format('parquet')
.partitionBy('date').saveAsTable('tablename'))
Or does it? The first issue is that if one of your columns has a nested schema exceeding 4000 characters, the Hive metastore will not accept it.
If you look around, you'll see this is a long-standing issue, open since October 2015. The blame apparently lies with Oracle (it's always Oracle!).
The good news is that this limit can be changed in the metastore! I'm assuming you're using a Postgres instance as the metastore, but the syntax is similar across the board.
Once you're logged in, type:
ALTER TABLE "COLUMNS_V2" ALTER COLUMN "TYPE_NAME" TYPE VARCHAR(8000);
ALTER TABLE "TABLE_PARAMS" ALTER COLUMN "PARAMS_VALUES" TYPE VARCHAR(8000);
ALTER TABLE "SERDE_PARAMS" ALTER COLUMN "PARAMS_VALUES" TYPE VARCHAR(8000);
ALTER TABLE "SD_PARAMS" ALTER COLUMN "PARAMS_VALUES" TYPE VARCHAR(8000);
At this point you might re-execute the Spark command above. But you'd be surprised by what Spark tells you:
WARN CreateDataSourceTableUtils: Persisting partitioned data source relation tmp.tmp1 into Hive metastore in Spark SQL specific format, which is NOT compatible with Hive.
This is another long-standing issue, and the workaround is to first create the table and then do one of the following:
- Insert into it for a particular partition (this can be accomplished with
INSERT INTO test_partition PARTITION(date=2013) SELECT * FROM test); or
- Write directly to disk and then create the partition in Hive manually (for example:
ALTER TABLE test_partition ADD PARTITION(date=2013)).
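To make the first option concrete, here's a minimal sketch from the Spark side, assuming an unpartitioned source table test and a partitioned target table test_partition (both are created in the steps below); the column list is abbreviated and purely illustrative:

# Static-partition insert for a single date value. The column list must match
# the target table's non-partition columns; the one below is illustrative.
spark.sql("""
    INSERT INTO test_partition PARTITION(date=2013)
    SELECT fullVisitorId, visitorId, visitStartTime, totals
    FROM test
    WHERE date = 2013
""")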
"First create the table" is of course deceptively simple: you need to create a partitioned table which is basically equal to the initial one, save for the date field, that has to be set as the partition column.
Here are the steps I used to get this done:
- Use Spark to read the Avro file and write it unpartitioned somewhere
(spark.read.format('com.databricks.spark.avro').load('dataset.avro')
.write.format('parquet').saveAsTable('test'))
- Use beeline to save the created schema (a Spark-only alternative is sketched below):
beeline -u {jdbc connection string} -e 'DESCRIBE test' > schema.sql
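If you'd rather not leave Spark for this step, a rough equivalent is to dump the output of DESCRIBE to a file from Spark itself (the file name is just an example):

# Rough Spark-side alternative to the beeline call: write the column names and
# types of `test` to a local file as a starting point for the DDL.
with open('schema.sql', 'w') as f:
    for row in spark.sql('DESCRIBE test').collect():
        f.write('{} {}\n'.format(row['col_name'], row['data_type']))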
- Edit the schema.sql file by hand: remove date from the columns and add it as the partition column:
CREATE TABLE test_partition (
  fullVisitorId STRING,
  visitorId INT,
  ...
  visitStartTime INT,
  -- date STRING, note that this is commented out now
  totals STRUCT<...>,
  ...
)
PARTITIONED BY (date STRING) STORED AS PARQUET
- Now you should execute this query in beeline. However, as I could not restart the metastore, and the metastore was still checking at the ORM level that fields were no longer than 4000 characters, I could not do it. After a good hour of searching, I realized I could just execute the query using spark.sql("""YOUR QUERY HERE"""); I had totally forgotten that Spark can bypass the ORM (see the sketch below);
- Now, normally, you could just write directly with .partitionBy('date').mode('append').saveAsTable('test_partition'). However, you cannot use partitionBy together with saveAsTable if the table already exists. And if you remove the partitionBy, Spark assumes that the column coming after date in the schema (totals in the example above) was supposed to be the partition column, which is of course not correct;
- At this point what's left is to use:
# df is the DataFrame read from the Avro file, as in the first step
(df.write.format('parquet')
 .mode('append').partitionBy('date')
 .save('/user/hive/warehouse/database.db/test_partition'))
- Since we've manually written the files, we still need to tell Hive that new partitions are there. Doing that programmatically in Spark is pretty simple (ugly interpolation, sorry!):
for dt in df.select('date').distinct().rdd.map(lambda row: row['date']).collect():
    spark.sql("ALTER TABLE test_partition ADD PARTITION(date='%s')" % dt)
- Done! You can now query the data. Note that only the last two steps are needed when new files come in. They can be automated easily enough with the workflow manager of your choice (see the sketch below)!
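For completeness, here is a minimal sketch of those two recurring steps bundled into a single function, assuming the same table name and warehouse path as above; the function and argument names are just placeholders:

# Sketch: the two recurring steps (write the Parquet files, register the new
# partitions) wrapped into one function. Table name, warehouse path, and the
# example file name are taken from, or modeled on, the snippets above.
def load_new_export(avro_path,
                    table='test_partition',
                    path='/user/hive/warehouse/database.db/test_partition'):
    df = spark.read.format('com.databricks.spark.avro').load(avro_path)
    (df.write.format('parquet')
     .mode('append').partitionBy('date')
     .save(path))
    for dt in df.select('date').distinct().rdd.map(lambda row: row['date']).collect():
        spark.sql("ALTER TABLE %s ADD PARTITION(date='%s')" % (table, dt))

# e.g. load_new_export('new_dataset.avro')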
Let me know what you think, especially if you disagree (I'm @gglanzani on Twitter if you want to reach out!).
Written by
Giovanni Lanzani