Including custom Flume components in Cloudera Manager

18 Feb, 2016

I’m currently working on a Hadoop project using Cloudera’s stack. We’re running a couple Flume jobs to move data around our cluster. Our Flume Metric Details page in Cloudera Manager looked like this:
You could infer from the image that we run a BarSource alongside our FooSource and BazSource and you would be correct. However, it doesn’t show up in Cloudera Manager. Why not?

The FooSource and BazSource are standard source types included with the platform. The BarSource is a subclass of AbstractEventDrivenSource that we wrote ourselves to pull data from a customer-specific system.
How do you get a custom Flume source or sink included on this dashboard? This is not difficult, the secret is simply JMX. There’s very little documentation. At the time of writing, the Flume Developer Guide doesn’t mention JMX at all.
The flume-core package includes JMX MBeans for each of the component types: SourceCounter, ChannelCounter and SinkCounter. If you include the appropriate counter MBean in your custom Flume component, that component will appear in Cloudera Manager. Here’s a simple example of SourceCounter in use:
[sourcecode language=”java” collapse=”false” highlight=”14,21,27,32,39,42,50,57″]
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.FlumeException;
import org.apache.flume.instrumentation.SourceCounter;
import org.apache.flume.source.AbstractEventDrivenSource;
* Demonstrates the use of the {@code SourceCounter} in Flume-NG.
public class DemoSource extends AbstractEventDrivenSource {
private SourceCounter counter;
protected void doConfigure(Context context) throws FlumeException {
// Counter MBeans are created in the configure method, with the component name
// we’ve been provided.
this.counter = new SourceCounter(this.getName());
protected void doStart() throws FlumeException {
// You start the counter in start()
// This example is an event-driven source, so we’ll typically have some sort of
// connection and callback method.
protected void doStop() throws FlumeException {
// Disconnect from the data source…
// …and stop the counter.
* Callback handler for our example data source.
public void onIncomingData(Object dataSourceEvent) {
// Count how many events we receive…
// …do whatever processing it is we do…
Event flumeEvent = convertToFlumeEvent(dataSourceEvent);
// …and count how many are successfully forwarded.
The SourceCounter MBean has a some other metrics that you can increment as needed. The ChannelCounter and SinkCounter MBeans work the same way. Documentation is thin, but the Flume source code can be mined for examples: sources, channels and sinks.

Newest Most Voted
Inline Feedbacks
View all comments
Andrew Phillips
Andrew Phillips
6 years ago

Thanks for digging this up, Barend! Looking at one of the examples (ScribeSource), I notice that the code seems to try to prevent possible repeated initialization of the counter:
if (sourceCounter == null) {
sourceCounter = new SourceCounter(getName());
Do you know if this is required? Have you run into repeated initializations of your source that mess with the counter and the way your source is displayed?

Explore related posts