Blog

Importieren von partitionierten Google Analytics-Daten in Hive mit Parquet

Giovanni Lanzani

Giovanni Lanzani

Aktualisiert Oktober 21, 2025
4 Minuten

Dieser Artikel ist in dem kostenlosen Magazin "Data Science in Production" erschienen.

Ich habe kürzlich daran gearbeitet, Google Analytics-Daten in einen Amazon EMR-Cluster zu importieren.

Google Analytics bot Dateien im Avro-Format an, aber wir wollten Parquet-Dateien, die nach Datum unterteilt sind (wir haben buchstäblich ein Feld date darin). Es gibt viele Gründe, warum Sie sich für das eine oder das andere entscheiden sollten, aber für uns kam es darauf an, dank des Spaltenformats schnellere Analysen zu erhalten.

Die Verwendung von Spark für den ETL-Prozess macht dies zu einem Kinderspiel:

(spark.read.format('com.databricks.spark.avro').load('dataset.avro')
          .write.format('parquet')
          .partitionBy('date').saveAsTable('tablename'))

Oder doch? Das erste Problem ist, dass der Hive-Metaspeicher ein verschachteltes Schema mit mehr als 4000 Zeichen nicht akzeptiert, wenn eine Ihrer Spalten ein solches aufweist. 1

Wenn Sie sich umsehen, werden Sie feststellen, dass dieses Problem schon seit Oktober 2015 besteht. Der Schuldige ist offenbar Oracle (es ist immer Oracle!).

Die gute Nachricht ist, dass dieses Limit im Metastore geändert werden kann! Ich gehe davon aus, dass Sie eine Postgres-Instanz als Metaspeicher verwenden, aber die Syntax ist überall ähnlich!

Sobald Sie eingeloggt sind, geben Sie2

ALTER TABLE "COLUMNS_V2" ALTER COLUMN "TYPE_NAME"  TYPE VARCHAR(8000);
ALTER TABLE "TABLE_PARAMS" ALTER COLUMN "PARAMS_VALUES"  TYPE VARCHAR(8000);
ALTER TABLE "SERDE_PARAMS" ALTER COLUMN "PARAMS_VALUES"  TYPE VARCHAR(8000);
ALTER TABLE "SD_PARAMS" ALTER COLUMN "PARAMS_VALUES"  TYPE VARCHAR(8000);

An diesem Punkt könnten Sie den obigen Spark-Befehl erneut ausführen. Aber Sie werden überrascht sein, was Spark Ihnen sagt

WARN CreateDataSourceTableUtils: Persisting partitioned data source relation tmp.tmp1 into Hive metastore in Spark SQL specific format, which is NOT compatible with Hive.

Dies ist ein weiteres langjähriges Problem, bei dem die Lösung darin besteht, zunächst die Tabelle zu erstellen und dann eine der folgenden Maßnahmen zu ergreifen:

  • Einfügen für eine bestimmte Partition (dies kann mit INSERT INTO test_partition PARTITION(date=2013) SELECT * FROM test erreicht werden) oder:
  • Schreiben Sie direkt auf die Festplatte und erstellen Sie dann die Partition in Hive manuell (z.B.: ALTER TABLE test_partition ADD PARTITION(date=2013));

"Erstellen Sie zuerst die Tabelle" ist natürlich trügerisch einfach: Sie müssen eine partitionierte Tabelle erstellen, die im Grunde der ursprünglichen Tabelle entspricht, bis auf das Feld date, das als Partitionsspalte festgelegt werden muss.

Hier sind die Schritte, mit denen ich dies erreicht habe:

  • Verwenden Sie Spark, um die Avro-Datei zu lesen und sie unpartitioniert irgendwo hin zu schreiben
(spark.read.format('com.databricks.spark.avro').load('dataset.avro')
          .write.format('parquet').saveAsTable('test'))
  • Verwenden Sie beeline, um das erstellte Schema zu speichern:
beeline -u {jdbc connection string} -e 'DESCRIBE test' > schema.sql
  • Bearbeiten Sie die Datei schema.sql von Hand, entfernen Sie date aus den Spalten und fügen Sie es als Partition hinzu
CREATE TABLE test_partition (
fullVisitorId  STRING,
visitorId      INT,
...
visitStartTime INT
--        date STRING,    note that this is commented out now
totals         STRUCT<...>,
...
)
PARTITION BY (date STRING) STORED AS PARQUET
  • Jetzt sollten Sie diese Abfrage in beeline ausführen. Da ich jedoch den Metastore nicht neu starten konnte und der Metastore auf ORM-Ebene prüfte, ob die Felder nicht länger als 4000 Zeichen waren, konnte ich es nicht tun. Nach einer guten Stunde der Suche dachte ich, dass ich die Abfrage einfach mit ausführen könnte. Ich hatte völlig vergessen, dass Spark den ORM umgehen kann.
  • Normalerweise könnten Sie jetzt einfach direkt mit .partitionBy('date').mode('append').saveAsTable('test_partition) schreiben. Sie können jedoch nicht mit verwenden, wenn die Tabelle bereits existiert. Und wenn Sie partitionBy entfernen, nimmt Spark an, dass field4 (die Spalte, die im obigen Beispiel nach date kam) als Partitionsspalte gedacht war (was natürlich nicht stimmt);
  • An diesem Punkt bleibt nur noch die Verwendung:
(df.write.format('parquet')
   .mode('append').partitionBy('date')
   .save('/user/hive/warehouse/database.db/test_partition'))
  • Da wir die Dateien manuell geschrieben haben, müssen wir Hive noch mitteilen, dass neue Partitionen vorhanden sind. Das ist in Spark ziemlich einfach zu programmieren (hässliche Interpolation, sorry!):
for dt in df.select('date').distinct().rdd.map(lambda row: row['date']).collect():
    spark.sql("ALTER TABLE test_partition ADD PARTITION(date=%s)" % dt)
  • Erledigt! Sie können die Daten nun abfragen. Beachten Sie, dass nur die letzten beiden Schritte erforderlich sind, wenn neue Dateien eingehen. Sie können sie ganz einfach mit dem Workflow-Manager Ihrer Wahl automatisieren!

Lassen Sie mich wissen, was Sie denken, vor allem, wenn Sie anderer Meinung sind (ich bin @gglanzani auf Twitter, wenn Sie mich erreichen wollen!).

Wir stellen ein


  1. In unserem Fall war es die Spalte hits. Einfach Schauen Sie sich die Menge der verschachtelten Felder an!
  2. 8000 ist willkürlich: Machen Sie daraus genug für Ihren Anwendungsfall!

Verfasst von

Giovanni Lanzani

Contact

Let’s discuss how we can support your journey.