Nehmen wir an, Sie haben eine tabulatorgetrennte Datei im HDFS, die nur eindeutige Entitäten enthält, eine pro Zeile, und Sie möchten jeder eine eindeutige ID zuweisen, die zudem monoton ansteigend sein muss. Für N Datensätze sollten die IDs also 0..N-1 sein. Wenn die Datei klein ist, werden Sie wahrscheinlich etwas ganz Ähnliches machen wie hier:
hadoop fs -cat /einige-datei.tsv | cat -n | hadoop fs -put - /irgendeine-datei-mit-ids.tsv
(wenn es durch Kommata getrennt ist oder etwas anderes, werden Sie wahrscheinlich etwas awk einfügen, um das zu beheben)
Und wir sind fertig! Bei großen Dateien ist dies jedoch problematisch, da alle Daten einen einzigen Prozess auf einem einzigen Rechner durchlaufen müssen. Aufgrund der mangelnden Parallelität in diesem Fall wird die Festplatte oder das Netzwerk zum Engpass. In diesem Beitrag zeigen wir Ihnen eine verteilte Methode, die in Hadoop MapReduce implementiert ist.
Verteilte Zeilennummerierung
Wir haben einige Anforderungen:
- Die Datensätze befinden sich in einer Datei auf HDFS und sind zeilenweise getrennt (1 Datensatz pro Zeile)
- Die IDs müssen von 0 bis N-1 reichen, wobei N die Anzahl der Datensätze ist.
- Ich möchte die Vorteile der Parallelität nutzen (also keinen Job mit einem einzigen Reducer)
Wir brauchen also wirklich ein verteiltes 'cat -n'. Wer hätte gedacht, dass das komplizierter ist als ein verteiltes 'grep'? Wie auch immer, hier ist eine Lösung:
- Lassen Sie die Daten durch einen Mapper laufen und lassen Sie den Mapper jeden Datensatz AS IS ausgeben
- Lassen Sie jeden Mapper verfolgen, wie viele Datensätze er an jeden Reducer sendet.
- Lassen Sie in der Bereinigungsmethode jeden Mapper einen speziellen Datensatz für jeden Reducer ausgeben, der die Gesamtzahl der Datensätze enthält, die er an den Reducer-Slot VOR diesem gesendet hat.
- Verwenden Sie einen speziellen Partitionierer, der normale Datensätze auf der Grundlage einer Hash-Funktion partitioniert (die den Mappern ebenfalls bekannt sein muss, damit die Zählungen korrekt sind) und der die speziellen Zählerdatensätze auf der Grundlage eines Partitionsfeldes partitioniert, das im Wert kodiert ist.
- Lassen Sie das Framework die Zähler-Schlüssel+Wert-Paare vor den Datensätzen sortieren, indem Sie einen speziellen Sortierkomparator verwenden.
- Lassen Sie das Framework alle Schlüssel+Wert-Paare gruppieren, so dass jeder Reduzierer nur einen Reduzierungsaufruf erhält, indem ein spezieller Gruppierungskomparator verwendet wird (der immer 0 zurückgibt).
- Lassen Sie jeden Reducer die Zähldatensätze abrufen und die Zählungen von jedem Mapper, den er erhält, akkumulieren
- Lassen Sie jeden Reducer jeden Datensatz AS IS ausgeben, wobei eine Zeilen-ID vorangestellt wird, die mit der in Schritt 7 berechneten ID-Sequenz beginnt.
Der Trick besteht also darin, die Mapping-Phase zu nutzen, um genügend globales Wissen über den Datensatz aufzubauen, um genau zu wissen, wie viele Datensätze es gibt und wie sie auf die Reduzierer aufgeteilt sind. Das Framework wird als eine Art riesige Synchronisationsbarriere verwendet und jeder Reducer weiß dann genau, wie viele Datensätze seinem Teil der Daten vorausgegangen sind.
Beispiel
Hier ist ein illustriertes / pseudocodiertes Beispiel für den gesamten Prozess. Der Auftrag in diesem Beispiel besteht aus zwei Mappern und drei Reduzierern. Die Partitionierung basiert auf einer fiktiven Hash-Funktion, die die Ergebnisse in Klammern angibt. Jeder Mapper führt ein Array von Zählern, um zu verfolgen, wie viele Datensätze er an jeden Reducer gesendet hat. Beachten Sie, dass sowohl der Mapper als auch der Partitionierer über die Hash-Funktion Bescheid wissen müssen, damit sie funktionieren.
Die Eingabedatei ist diese:
ABC (hash == 0) DEF (hash == 1) GHI (hash == 2) JKL (hash == 0) MNO (hash == 1) ------ split ------ (hier oben geht es zu Mapper 0, unten zu Mapper 1) PQR (hash == 2) STU (hash == 0) VWX (hash == 1) YZ0 (hash == 1)
Der Auftrag wird wie folgt ausgeführt:
Mapper 0: Einrichtung: Zähler initialisieren = [0, 0, 0] Karte: Eingabe "ABC" ==> "ABC" ausgeben, Zähler[0] inkrementieren Eingabe "DEF" ==> "DEF" ausgeben, Zähler[1] inkrementieren Eingabe "GHI" ==> "GHI" ausgeben, Zähler[2] inkrementieren Eingabe "JKL" ==> "JKL" ausgeben, Zähler[0] inkrementieren Eingabe "MNO" ==> "MNO" ausgeben, Zähler[1] erhöhen //jetzt Zähler == [2, 2, 1] Aufräumen: Zähler für Partition 0 ausgeben: keine Zähler für Partition 1 ausgeben: zähler[0] = 2 Zähler für Partition 2 ausgeben: Zähler[0] + Zähler[1] = 4 Mapper 1: Einrichtung: Zähler initialisieren = [0, 0, 0] Karte: Eingabe "PQR" ==> "PQR" ausgeben, Zähler[2] inkrementieren Eingabe "STU" ==> "STU" ausgeben, Zähler[0] inkrementieren Eingabe "VWX" ==> "VWX" ausgeben, Zähler[1] inkrementieren Eingabe "YZ0" ==> YZ0" ausgeben, Zähler[1] inkrementieren //jetzt Zähler == [1, 2, 1] Aufräumen: Zähler für Partition 0 ausgeben: keine Zähler für Partition 1 ausgeben: zähler[0] = 1 Zähler für Partition 2 ausgeben: Zähler[0] + Zähler[1] = 3 ========================= STARTEN SORTIEREN / MISCHEN / ZUSAMMENFÜHREN ========================= Das Framework sortiert alle Schlüssel+Wert-Paare anhand der Schlüssel. Wir verwenden ein spezielles Komparator, der nur dafür sorgt, dass die Zähler oberhalb des tatsächlichen Zeilen. Außerdem verwenden wir einen speziellen Gruppierungskomparator, der nur eine Gruppe pro Reducer, so dass jede Reduktionsmethode nur einmal aufgerufen wird, wobei die Zähler sortiert oben und alle Datensätze danach. Scrollen Sie nach unten für die die tatsächliche Umsetzung dieser. ========================= ENDE SORTIEREN / MISCHEN / ZUSAMMENFÜHREN =========================== Untersetzer 0: reduzieren: initialisieren offset = 0 Eingabe "ABC" ==> Offset" ausgebenABC", Offset inkrementieren //gibt 0 aus ABC input "JKL" ==> emit "offset JKL", Offset inkrementieren //gibt 1 aus ABC, und so weiter... input "STU" ==> emit "offset STU", Offset inkrementieren //der zuletzt gesendete Offset ist ab hier 2 Reducer 1: reduce: initialize offset = 0 input counter with value 2 ==> offset = offset + 2 input counter with value 1 ==> offset = offset + 1 //now offset == 3 input "DEF" ==> emit "offset DEF", Offset erhöhen //erzeugt 3 DEF input "MNO" ==> emit "offset MNO", Offset inkrementieren //emittiert 4 MNO, und so weiter... input "VWX" ==> emit "offset VWX", Offset inkrementieren input "XY0" ==> emit "offset XY0", Offset inkrementieren //der zuletzt gesendete Offset ist ab hier 6 Untersetzer 2: reduzieren: initialisieren offset = 0 Eingangszähler mit Wert 4 ==> Versatz = Versatz + 4 Eingangszähler mit Wert 3 ==> Versatz = Versatz + 3 //jetzt Versatz == 7 Eingabe "GHI" ==> Offset" ausgeben GHI", Offset inkrementieren //emits 7GHI input "PQR" ==> emit "offset PQR", offset inkrementieren //emits 8PQR
Das Ergebnis wird dies sein:
0 ABC 1 JKL 2 STU 3 DEF 4 MNO 5 VWX 6 XY0 7 GHI 8 PQR
Beachten Sie, dass die Reihenfolge der Datensätze bei diesem Vorgang nicht beibehalten wurde. Der Auftrag ordnet die Datensätze willkürlich neu an. Wenn das Ergebnis in irgendeiner Weise sortiert werden muss, gibt es die Lösung, die Daten vorzuprüfen und eine Partitionierung in totaler Reihenfolge vorzunehmen (genau wie im Terasort-Beispiel). Damit dies funktioniert, müssen die Mapper auch die Partitionsgrenzen kennen, die der Partitionierer für die Gesamtordnung verwendet.
Code
Es gibt zwei Quelldateien: RowNumberJob.java und RowNumberWritable.java. Die erste ist der Job (mit Mapper- und Reducer-Implementierungen) und die zweite ist die beschreibbare Implementierung, die für Zwischenwerte und den Partitionierer verwendet wird. Im Folgenden finden Sie einen kurzen Kommentar zu einigen der beweglichen Teile.
Implementierung
Die Implementierung dieses Auftrags besteht aus drei besonderen Teilen (außer der Implementierung von Mapper und Reducer):
- Ein benutzerdefinierter Partitionierer
- Ein benutzerdefinierter Gruppierungsvergleicher
- Eine benutzerdefinierte beschreibbare Datei zur Verwendung als Wert in den Zwischendaten
Partitionierer
In Hadoop kann der Partitionierer seine Partitionierungsentscheidung sowohl auf den Schlüssel als auch auf den Wert stützen. Wir machen uns diese Tatsache zunutze, indem wir die Partitionierung anhand des Wertes vornehmen. Wenn der Wert ein Zählerdatensatz ist, betrachten wir die vorgegebene Partition (die im Mapper-Code festgelegt wurde). Wenn der Wert ein tatsächlicher Datensatz ist, partitionieren wir auf der Grundlage des Hashwerts des Datensatzes. Die Hash-Logik wird ebenfalls aus dem Partitionierer exportiert (als Methode), so dass der Mapper-Code dieselbe Logik verwenden kann, um zu bestimmen, welche Zähler inkrementiert werden sollen.
Sortieren
Wenn Sie gut aufgepasst haben, wissen Sie, dass ich mich auf die Sortierung verlasse, um sicherzustellen, dass die Zählereinträge oben stehen. Um dies zu gewährleisten, müssen wir nichts Besonderes implementieren. Wir geben einfach die Zählerdatensätze mit einem Schlüssel aus, der vor dem Schlüssel sortiert wird, den wir für die Wertedatensätze verwenden. In diesem Fall verwenden wir ein standardmäßiges ByteWritable mit den Werten 'T' für Zähler und 'W' für Wert (die Niederländer werden das verstehen).
Gruppierung Vergleicher
Da wir die Zählerdatensätze nur einmal vom Mapper ausgeben, erhalten wir sie auch nur einmal auf der Seite des Reduzierers. Jeder Reduzierer gibt nur das aus, was er an tatsächlichen Datensätzen erhält. Es macht also keinen Sinn, Dinge zu gruppieren. Denken Sie daran, dass wir den Schlüssel verwenden, um zwischen den Zählern und den tatsächlichen Datensätzen zu unterscheiden (das ist notwendig, um die Zähler nach oben zu sortieren). Wenn wir keinen benutzerdefinierten Gruppierungskomparator erstellen würden, würden die Zähler und die tatsächlichen Werte als zwei verschiedene Gruppen in der Reduzierfunktion erscheinen. Um alles in eine Gruppe zu bekommen, implementieren wir einen Komparator, der immer 0 zurückgibt (was bedeutet, dass alles gleich ist).
Leistung
Wir lesen alle Daten, schreiben sie in einer anderen Reihenfolge zurück, schieben den größten Teil davon durch das Netzwerk und lesen und schreiben sie dann noch einmal. Das ist eine Menge Aufwand, nur um eine Zeilennummerierung durchzuführen. Ist es das wirklich wert? Sehen Sie sich diese Ergebnisse an:
Eingabegröße 1.45GB ==> Katze gewinnt
Pipe durch Kat: 0m23s MapReduce: 0m33s (50 Mapper, 3 Reducer)
Eingabegröße 145GB ==> MapReduce gewinnt
Pipe durch Kat: 44m03s MapReduce: 7m36s (2633 Mapper, 80 Reducer)
Dieser Test wurde auf einem 16-Knoten-Cluster mit 12 Festplatten/Knoten, 32 GB RAM und 1-Gb-NICs durchgeführt. Abgesehen von einigen Grundlagen wurden keine ausgefallenen Einstellungen vorgenommen. Ein- und Ausgabe waren unkomprimiert, Zwischendaten wurden mit dem Snappy Codec komprimiert. Der Pipe-Through-Cat-Test wurde vom Master-Knoten des Clusters aus durchgeführt und war durch die Verwendung jeweils nur einer einzigen Festplatte begrenzt.
Unsere Ideen
Weitere Blogs
Contact



