Interessieren Sie sich für Ihre beliebtesten Produkte, die wichtigsten Beiträge zu Ihren KPIs oder ähnliche Fragen zu Ihren Daten? Wahrscheinlich ja, denn es ist sehr üblich, dass man sich solche Einblicke in Analysen, Berichte und Dashboards wünscht.
Die Beantwortung von Fragen wie diesen mit der Apache Arrow DataFusion Abfrage-Engine ist nach Optimierungen an diesem Open-Source-Projekt [1] [2] [3] [4] jetzt viel schneller und benötigt weniger Speicher. Die Optimierungen werden in der nächsten Version von DataFusion (13) verfügbar sein.
Eine SQL-Abfrage in diesem "TopK"-Formular sieht wie folgt aus:
SELECT
o_orderkey,
o_totalprice
FROM
orders
ORDER BY
o_totalprice DESC
LIMIT 10
Die Abfrage gibt die Aufträge mit dem höchsten Gesamtpreis zurück. Dies führt zu folgendem Ergebnis (basierend auf der Auftragstabelle aus dem TPCH-Benchmark)
+------------+--------------+
| o_orderkey | o_totalprice |
+------------+--------------+
| 39394405 | 558822.56 |
| 3578692 | 558702.81 |
| 42290181 | 558289.17 |
| 2745894 | 557664.53 |
| 36667107 | 550142.18 |
| 43906817 | 549431.65 |
| 21213895 | 549380.08 |
| 49667013 | 542546.72 |
| 2199712 | 542154.01 |
| 30671170 | 541620.62 |
+------------+--------------+
Um die Ergebnisse für die Abfrage zu erhalten, müssen Sie die Daten sortieren (ORDER BY) und dann die ersten 10 (LIMIT 10) Ergebnisse nehmen. Und das ist genau das, was Datafusion vorher gemacht hat!
Schauen wir uns an, was die vorherige Version von DataFusion macht, indem wir uns einen (vereinfachten) Ausführungsplan einer einfachen Abfrage ansehen. Dazu verwenden wir eine EXPLAIN Anweisung.
| GlobalLimitExec: skip=0, fetch=10
| SortExec: [o_totalprice@4 DESC]
| CoalescePartitionsExec
| ParquetExec: limit=None, partitions=[..], projection=[..]
Sie können den Plan von unten nach oben lesen:
- ParquetExec: Durchsucht die vollständige (Parquet-)Tabelle mit allen benötigten Spalten. limit=None bedeutet, dass es keine Begrenzung gibt: Es müssen alle Daten gelesen werden, um die Ergebnisse zu erzeugen.
- CoalescePartitionsExec: alle Partitionen werden zu einer einzigen Partition zusammengefasst. Wenn Sie dies nicht tun, führt das Sortieren einzelner Partitionen nur zu sortierten Ergebnissen innerhalb jeder Partition.
- SortExec: Sortiert alle Eingabedaten nach der Reihenfolge des Ausdrucks. Dieser Operator sortiert und verwaltet alle Eingabedaten und sortiert / führt die einzelnen Stapel zusammen. Die Ausgabe sind vollständig sortierte Daten. Wenn die Daten nicht in den Speicher passen, gibt dieser Operator die Daten während der Verarbeitung auf die Festplatte aus.
- GlobalLimitExec: beschränkt die Ausgabe auf die Ausgabe der Abrufe, in diesem Fall die ersten 10.
Wenn Sie sich den Plan ansehen, gibt es zwei wichtige Optimierungsmöglichkeiten:
- Paralleles Sortieren. DataFusion kann zunächst einzelne Partitionen parallel sortieren. Danach sollten die sortierten Partitionen zusammengeführt werden, um eine vollständig sortierte Ausgabe zu erhalten. Glücklicherweise gibt es dafür in DataFusion bereits einen Operator namens
SortPreservingMergeExec . Dieser nimmt eine Reihe von Partitionen mit (einzeln) sortierten Daten auf und gibt eine Partition mit sortierten Daten aus. - Schieben Sie die Grenze zu SortExec. Mit der Information, dass nur 10 Elemente benötigt werden, können wir den Arbeitsaufwand für die Sortierung der Eingabedaten und die Speichernutzung reduzieren. Da wir wissen, dass wir nur 10 Elemente benötigen, kann das Programm die effizientere Teilsortierung verwenden und Elemente jenseits der Top 10 direkt verwerfen. Das bedeutet, dass wir, anstatt Stapel mit jeweils 8192 (die Standardstapelgröße) sortierten Objekten zu puffern, nur Stapel mit 10 Objekten (819,2 mal kleiner) aufbewahren und so einen Großteil der weiteren Verarbeitung und des Speicherbedarfs einsparen. Wir können damit auch die Zusammenführung der sortierten Stapel optimieren, indem wir die Informationen nutzen, um effizienter zu sortieren und die ersten 10 Elemente zu überspringen. Die Ausgabe von SortExec enthält jetzt nur noch 10 Elemente, was den Arbeitsaufwand in SortPreservingMergeExec einschränkt. Bei wirklich großen Tabellen bedeutet dies auch, dass wir in den meisten Fällen vermeiden, die Daten auf der Festplatte zu speichern, was noch mehr Zeit spart! In Ballista, der verteilten Abfrage-Engine, die auf DataFusion basiert, sind die Einsparungen sogar noch größer, da das Umschichten großer Datensätze vermieden wird, was oft der langsamste Teil der verteilten Abfrageausführung ist.
Ok, und wie sieht der Plan nach diesen Optimierungen aus?
| GlobalLimitExec: skip=0, fetch=10
| SortPreservingMergeExec: [o_totalprice@0 DESC]
| SortExec: [o_totalprice@4 DESC] fetch=10
| ParquetExec: limit=None, partitions=[..], projection=[..]
Was ist jetzt anders? CoalescePartitionsExec gibt es nicht mehr, was bedeutet, dass Sortierungen vollständig parallel ausgeführt werden können. SortExec wurde ein Fetch-Parameter hinzugefügt, der einen Großteil der Arbeit und des Speichers spart. Ein SortPreservingMergeExec wurde hinzugefügt, um die sortierten Partitionen in eine Partition zusammenzuführen, wobei die sortierten Partitionen zu einer vollständig sortierten Ausgabe zusammengeführt werden.
Sind wir in Bezug auf die Ausführungszeit besser geworden? Lassen Sie es uns herausfinden!
Zuerst führen wir es in DataFusion 12 aus:
SELECT o_orderkey, o_totalprice FROM orders ORDER BY o_totalprice DESC
LIMIT 10;
+------------+--------------+
| o_orderkey | o_totalprice |
+------------+--------------+
| 39394405 | 558822.56 |
| 3578692 | 558702.81 |
| 42290181 | 558289.17 |
| 2745894 | 557664.53 |
| 36667107 | 550142.18 |
| 43906817 | 549431.65 |
| 21213895 | 549380.08 |
| 49667013 | 542546.72 |
| 2199712 | 542154.01 |
| 30671170 | 541620.62 |
+------------+--------------+
10 rows in set. Query took 1.460 seconds.
Führen Sie nun die gleiche Abfrage in einer Version nach den Änderungen aus:
SELECT o_orderkey, o_totalprice FROM orders ORDER BY o_totalprice DESC
LIMIT 10;
+------------+--------------+
| o_orderkey | o_totalprice |
+------------+--------------+
| 39394405 | 558822.56 |
| 3578692 | 558702.81 |
| 42290181 | 558289.17 |
| 2745894 | 557664.53 |
| 36667107 | 550142.18 |
| 43906817 | 549431.65 |
| 21213895 | 549380.08 |
| 49667013 | 542546.72 |
| 2199712 | 542154.01 |
| 30671170 | 541620.62 |
+------------+--------------+
10 rows in set. Query took 0.093 seconds.
Die Ausführungszeit verbesserte sich von etwa 1,5 Sekunden auf eine fast sofortige Fertigstellung. Sie läuft mehr als 15 Mal so schnell! Außerdem verbraucht die neue Version nur noch wenig Speicherplatz, vorher waren es etwa 3 GB. Die genaue Beschleunigung, die Sie für andere Szenarien erwarten können, hängt von der Abfrage und den Eingabedaten ab, aber Sie können mit großen Beschleunigungen für ähnliche Abfragen rechnen, die ORDER BY mit LIMIT enthalten.
DataFusion Projekt
Sind Sie daran interessiert, DataFusion auszuprobieren oder haben Sie Interesse an dem Open-Source-Projekt? Schauen Sie sich das Projekt an und werden Sie Mitglied der Community.
Verfasst von
Daniël Heres
Contact
