I’m currently working on a Hadoop project using Cloudera’s stack. We’re running a couple of Flume jobs to move data around our cluster. Our Flume Metric Details page in Cloudera Manager looked like this:

[Screenshot: the Flume Metric Details page in Cloudera Manager, listing FooSource and BazSource]

You could infer from the image that we run a BarSource alongside our FooSource and BazSource, and you would be correct. However, the BarSource doesn’t show up in Cloudera Manager. Why not?

The FooSource and BazSource are standard source types included with the platform. The BarSource is a subclass of AbstractEventDrivenSource that we wrote ourselves to pull data from a customer-specific system.

How do you get a custom Flume source or sink included on this dashboard? It’s not difficult: the secret is simply JMX. There’s very little documentation on this, though. At the time of writing, the Flume Developer Guide doesn’t mention JMX at all.

The flume-ng-core package includes JMX MBeans for each of the main component types: SourceCounter, ChannelCounter and SinkCounter. If your custom Flume component creates and starts the appropriate counter MBean, that component will appear in Cloudera Manager. Here’s a simple example of SourceCounter in use:

[sourcecode language="java" collapse="false" highlight="14,21,27,32,39,42,50,57"]
package com.xebia.blog.flume;

import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.FlumeException;
import org.apache.flume.instrumentation.SourceCounter;
import org.apache.flume.source.AbstractEventDrivenSource;

/**
 * Demonstrates the use of the {@code SourceCounter} in Flume-NG.
 */
public class DemoSource extends AbstractEventDrivenSource {

  private SourceCounter counter;

  @Override
  protected void doConfigure(Context context) throws FlumeException {

    // Counter MBeans are created in the configure method, with the component name
    // we’ve been provided.
    this.counter = new SourceCounter(this.getName());
  }

  @Override
  protected void doStart() throws FlumeException {
    // Start the counter when the source starts; this also registers its MBean.
    this.counter.start();

    // This example is an event-driven source, so we’ll typically have some sort of
    // connection and callback method.
    connectToDataSourceWithCallback(this);
    this.counter.setOpenConnectionCount(1);
  }

  @Override
  protected void doStop() throws FlumeException {
    // Disconnect from the data source…
    disconnect();
    this.counter.setOpenConnectionCount(0);

    // …and stop the counter.
    this.counter.stop();
  }

  /**
   * Callback handler for our example data source.
   */
  public void onIncomingData(Object dataSourceEvent) {
    // Count how many events we receive…
    this.counter.incrementEventReceivedCount();

    // …do whatever processing it is we do…
    Event flumeEvent = convertToFlumeEvent(dataSourceEvent);

    // …and count how many are successfully forwarded.
    getChannelProcessor().processEvent(flumeEvent);
    this.counter.incrementEventAcceptedCount();
  }

  // Hypothetical placeholders for the integration with the external system;
  // replace these with calls to your own data source's client library.
  private void connectToDataSourceWithCallback(DemoSource callback) {
  }

  private void disconnect() {
  }

  private Event convertToFlumeEvent(Object dataSourceEvent) {
    throw new UnsupportedOperationException("Convert dataSourceEvent to a Flume Event here");
  }
}
[/sourcecode]
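To check whether a custom component’s counter was actually registered, you can ask the JVM’s MBean server directly using the standard javax.management API. Below is a minimal sketch, not part of Flume: the class name FlumeMBeans and its componentNames method are hypothetical, and it assumes Flume registers its counters under names of the form org.apache.flume.&lt;kind&gt;:type=&lt;component name&gt; (for example org.apache.flume.source:type=BarSource), which matches the naming used by Flume’s MonitoredCounterGroup. Verify that against the Flume version you run.

[sourcecode language="java" collapse="false"]
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import javax.management.MBeanServer;
import javax.management.MalformedObjectNameException;
import javax.management.ObjectName;

/** Hypothetical helper: lists Flume components of one kind that registered a counter MBean. */
public final class FlumeMBeans {

  private FlumeMBeans() {}

  /**
   * Returns the sorted component names found under the assumed
   * "org.apache.flume.<kind>:type=<component name>" naming scheme.
   *
   * @param kind lower-case component kind, e.g. "source", "channel" or "sink"
   */
  public static List<String> componentNames(MBeanServer server, String kind)
      throws MalformedObjectNameException {
    // Match every MBean in the domain for this component kind.
    ObjectName pattern = new ObjectName("org.apache.flume." + kind + ":*");
    List<String> names = new ArrayList<>();
    for (ObjectName name : server.queryNames(pattern, null)) {
      // The component name is carried in the "type" key property.
      String component = name.getKeyProperty("type");
      if (component != null) {
        names.add(component);
      }
    }
    Collections.sort(names);
    return names;
  }
}
[/sourcecode]

Called from inside the agent’s JVM as FlumeMBeans.componentNames(ManagementFactory.getPlatformMBeanServer(), "source"), it should list every source whose counter has been started. A custom source missing from that list has no counter registered, which would explain why it is also missing from Cloudera Manager.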

The SourceCounter MBean has some other metrics that you can increment as needed. The ChannelCounter and SinkCounter MBeans work the same way. Documentation is thin, but Flume’s own sources, channels and sinks can be mined for examples in the Flume source code.
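To illustrate the SinkCounter case, here is a minimal sketch of a custom sink, assuming the same flume-ng-core API as the source above. The class name DemoSink and its deliver() method are hypothetical placeholders; the point is where the counter is created, started, stopped and incremented around Flume’s usual take-inside-a-transaction loop.

[sourcecode language="java" collapse="false"]
import org.apache.flume.Channel;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.EventDeliveryException;
import org.apache.flume.Transaction;
import org.apache.flume.conf.Configurable;
import org.apache.flume.instrumentation.SinkCounter;
import org.apache.flume.sink.AbstractSink;

/**
 * Sketch of a custom sink that reports its activity through a SinkCounter.
 */
public class DemoSink extends AbstractSink implements Configurable {

  private SinkCounter counter;

  @Override
  public void configure(Context context) {
    // As with the source, create the counter with the component's name.
    if (this.counter == null) {
      this.counter = new SinkCounter(this.getName());
    }
  }

  @Override
  public synchronized void start() {
    // Starting the counter registers its MBean.
    this.counter.start();
    super.start();
  }

  @Override
  public synchronized void stop() {
    super.stop();
    this.counter.stop();
  }

  @Override
  public Status process() throws EventDeliveryException {
    Channel channel = getChannel();
    Transaction transaction = channel.getTransaction();
    transaction.begin();
    try {
      Event event = channel.take();
      if (event == null) {
        // Nothing to drain: record an empty batch and ask Flume to back off.
        this.counter.incrementBatchEmptyCount();
        transaction.commit();
        return Status.BACKOFF;
      }
      this.counter.incrementEventDrainAttemptCount();
      deliver(event);
      transaction.commit();
      this.counter.incrementEventDrainSuccessCount();
      return Status.READY;
    } catch (Exception e) {
      transaction.rollback();
      throw new EventDeliveryException("Failed to deliver event", e);
    } finally {
      transaction.close();
    }
  }

  // Hypothetical placeholder: write the event to your destination system here.
  private void deliver(Event event) {
  }
}
[/sourcecode]

Only the drain attempt and drain success counts are touched here; SinkCounter offers more (connection and batch counts), and which ones are worth maintaining depends on what your sink actually does.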