Dieser Artikel ist in dem kostenlosen Magazin "Data Science in Production" erschienen.
Ich habe kürzlich daran gearbeitet, Google Analytics-Daten in einen Amazon EMR-Cluster zu importieren.
Google Analytics bot Dateien im Avro-Format an, aber wir wollten Parquet-Dateien, die nach Datum unterteilt sind (wir haben buchstäblich ein Feld date darin). Es gibt viele Gründe, warum Sie sich für das eine oder das andere entscheiden sollten, aber für uns kam es darauf an, dank des Spaltenformats schnellere Analysen zu erhalten.
Die Verwendung von Spark für den ETL-Prozess macht dies zu einem Kinderspiel:
(spark.read.format('com.databricks.spark.avro').load('dataset.avro')
.write.format('parquet')
.partitionBy('date').saveAsTable('tablename'))
Oder doch? Das erste Problem ist, dass der Hive-Metaspeicher ein verschachteltes Schema mit mehr als 4000 Zeichen nicht akzeptiert, wenn eine Ihrer Spalten ein solches aufweist. 1
Wenn Sie sich umsehen, werden Sie feststellen, dass dieses Problem schon seit Oktober 2015 besteht. Der Schuldige ist offenbar Oracle (es ist immer Oracle!).
Die gute Nachricht ist, dass dieses Limit im Metastore geändert werden kann! Ich gehe davon aus, dass Sie eine Postgres-Instanz als Metaspeicher verwenden, aber die Syntax ist überall ähnlich!
Sobald Sie eingeloggt sind, geben Sie2
ALTER TABLE "COLUMNS_V2" ALTER COLUMN "TYPE_NAME" TYPE VARCHAR(8000);
ALTER TABLE "TABLE_PARAMS" ALTER COLUMN "PARAMS_VALUES" TYPE VARCHAR(8000);
ALTER TABLE "SERDE_PARAMS" ALTER COLUMN "PARAMS_VALUES" TYPE VARCHAR(8000);
ALTER TABLE "SD_PARAMS" ALTER COLUMN "PARAMS_VALUES" TYPE VARCHAR(8000);
An diesem Punkt könnten Sie den obigen Spark-Befehl erneut ausführen. Aber Sie werden überrascht sein, was Spark Ihnen sagt
WARN CreateDataSourceTableUtils: Persisting partitioned data source relation tmp.tmp1 into Hive metastore in Spark SQL specific format, which is NOT compatible with Hive.
Dies ist ein weiteres langjähriges Problem, bei dem die Lösung darin besteht, zunächst die Tabelle zu erstellen und dann eine der folgenden Maßnahmen zu ergreifen:
- Einfügen für eine bestimmte Partition (dies kann mit
INSERT INTO test_partition PARTITION(date=2013) SELECT * FROM testerreicht werden) oder: - Schreiben Sie direkt auf die Festplatte und erstellen Sie dann die Partition in Hive manuell (z.B.:
ALTER TABLE test_partition ADD PARTITION(date=2013));
"Erstellen Sie zuerst die Tabelle" ist natürlich trügerisch einfach: Sie müssen eine partitionierte Tabelle erstellen, die im Grunde der ursprünglichen Tabelle entspricht, bis auf das Feld date, das als Partitionsspalte festgelegt werden muss.
Hier sind die Schritte, mit denen ich dies erreicht habe:
- Verwenden Sie Spark, um die Avro-Datei zu lesen und sie unpartitioniert irgendwo hin zu schreiben
(spark.read.format('com.databricks.spark.avro').load('dataset.avro')
.write.format('parquet').saveAsTable('test'))
- Verwenden Sie beeline, um das erstellte Schema zu speichern:
beeline -u {jdbc connection string} -e 'DESCRIBE test' > schema.sql
- Bearbeiten Sie die Datei
schema.sqlvon Hand, entfernen Siedateaus den Spalten und fügen Sie es als Partition hinzu
CREATE TABLE test_partition (
fullVisitorId STRING,
visitorId INT,
...
visitStartTime INT
-- date STRING, note that this is commented out now
totals STRUCT<...>,
...
)
PARTITION BY (date STRING) STORED AS PARQUET
- Jetzt sollten Sie diese Abfrage in beeline ausführen. Da ich jedoch den Metastore nicht neu starten konnte und der Metastore auf ORM-Ebene prüfte, ob die Felder nicht länger als 4000 Zeichen waren, konnte ich es nicht tun. Nach einer guten Stunde der Suche dachte ich, dass ich die Abfrage einfach mit
ausführen könnte. Ich hatte völlig vergessen, dass Spark den ORM umgehen kann. - Normalerweise könnten Sie jetzt einfach direkt mit
.partitionBy('date').mode('append').saveAsTable('test_partition)schreiben. Sie können jedoch nichtmit verwenden, wenn die Tabelle bereits existiert. Und wenn Sie partitionByentfernen, nimmt Spark an, dassfield4(die Spalte, die im obigen Beispiel nachdatekam) als Partitionsspalte gedacht war (was natürlich nicht stimmt); - An diesem Punkt bleibt nur noch die Verwendung:
(df.write.format('parquet')
.mode('append').partitionBy('date')
.save('/user/hive/warehouse/database.db/test_partition'))
- Da wir die Dateien manuell geschrieben haben, müssen wir Hive noch mitteilen, dass neue Partitionen vorhanden sind. Das ist in Spark ziemlich einfach zu programmieren (hässliche Interpolation, sorry!):
for dt in df.select('date').distinct().rdd.map(lambda row: row['date']).collect():
spark.sql("ALTER TABLE test_partition ADD PARTITION(date=%s)" % dt)
- Erledigt! Sie können die Daten nun abfragen. Beachten Sie, dass nur die letzten beiden Schritte erforderlich sind, wenn neue Dateien eingehen. Sie können sie ganz einfach mit dem Workflow-Manager Ihrer Wahl automatisieren!
Lassen Sie mich wissen, was Sie denken, vor allem, wenn Sie anderer Meinung sind (ich bin @gglanzani auf Twitter, wenn Sie mich erreichen wollen!).
- In unserem Fall war es die Spalte
hits. Einfach Schauen Sie sich die Menge der verschachtelten Felder an! - 8000 ist willkürlich: Machen Sie daraus genug für Ihren Anwendungsfall!
Verfasst von
Giovanni Lanzani
Unsere Ideen
Weitere Blogs
Contact



