Blog
Einbindung benutzerdefinierter Flume-Komponenten in Cloudera Manager

Ich arbeite derzeit an einem Hadoop-Projekt mit dem Stack von Cloudera. Wir lassen ein paar Flume-Jobs laufen, um Daten in unserem Cluster zu bewegen. Unsere Seite mit den Flume-Metrikdetails im Cloudera Manager sieht so aus:
Sie könnten aus dem Bild schließen, dass wir neben unserem FooSource und BazSource auch ein BarSource betreiben, und Sie hätten Recht. Allerdings wird sie im Cloudera Manager nicht angezeigt. Und warum nicht?
FooSource und BazSource sind Standardquellentypen, die in der Plattform enthalten sind. Der BarSource ist eine Unterklasse von AbstractEventDrivenSource die wir selbst geschrieben haben, um Daten aus einem kundenspezifischen System zu ziehen.
Wie können Sie eine benutzerdefinierte Flume-Quelle oder -Senke in dieses Dashboard aufnehmen? Das ist nicht schwer, das Geheimnis ist einfach JMX. Leider ist die Dokumentation ein wenig dünn. Zum Zeitpunkt der Erstellung dieses Artikels wird JMX im Flume Developer Guide überhaupt nicht erwähnt.
Das Paket flume-core enthält JMX MBeans für jeden der Komponententypen: SourceCounter, ChannelCounter und SinkCounter. Wenn Sie die entsprechende Zähler-MBean in Ihre benutzerdefinierte Flume-Komponente aufnehmen, wird diese Komponente im Cloudera Manager angezeigt. Hier ist ein einfaches Beispiel für SourceCounter im Einsatz:
Paket com.xebia.blog.flume; importieren org.apache.flume.Context; importieren org.apache.flume.Event; importieren org.apache.flume.FlumeException; importieren org.apache.flume.instrumentation.SourceCounter; importieren org.apache.flume.source.AbstractEventDrivenSource; /** * Demonstriert die Verwendung des {@code SourceCounter}` in Flume-NG. */ öffentlich Klasse DemoSource erweitert AbstractEventDrivenSource { privat QuellZähler Zähler; @Override geschützt void doConfigure(Kontext Kontext) wirft FlumeException { // Zähler-MBeans werden in der configure-Methode mit dem Komponentennamen erstellt, den wir erhalten haben. diese.Zähler = neu QuellZähler(diese.getName()); } @Override geschützt void doStart() wirft FlumeException { // Sie starten den Zähler in start() diese.Zähler.starten(); // Bei diesem Beispiel handelt es sich um eine ereignisgesteuerte Quelle, so dass wir in der Regel eine Art von Verbindung und Rückrufmethode haben. connectToDataSourceWithCallback(diese); diese.Zähler.setOpenConnectionCount(1); } @Override geschützt void doStop() wirft FlumeException { // Trennen Sie die Verbindung mit der Datenquelle... Trennen Sie die Verbindung(); diese.Zähler.setOpenConnectionCount(0); // ...und stoppen Sie den Zähler. diese.Zähler.stoppen(); } /** * Callback-Handler für unsere Beispiel-Datenquelle. */ öffentlich void onIncomingData(Objekt dataSourceEvent) { // Zählen Sie, wie viele Ereignisse wir erhalten... diese.Zähler.incrementEventReceivedCount(); // ...tun, was auch immer wir tun... Veranstaltung flumeEvent = convertToFlumeEvent(dataSourceEvent); // ...und zählen, wie viele erfolgreich weitergeleitet wurden. getChannelProcessor().processEvent(flumeEvent); diese.Zähler.incrementEventAcceptedCount(); } }
Die MBean SourceCounter hat einige andere Metriken, die Sie nach Bedarf erhöhen können. Die MBeans ChannelCounter und SinkCounter funktionieren auf die gleiche Weise. Anstelle einer vollständigen Dokumentation können Sie den Flume-Quellcode nach Beispielen durchsuchen: Quellen, Kanäle und Senken.
Dieser Artikel wurde ursprünglich auf dem Xebia Blog veröffentlicht.
Verfasst von
Barend Garvelink
Unsere Ideen
Weitere Blogs
Contact



