Apache Hadoop (CDH 5) Flume: a single-node Flume deployment (telnet example)
In this tutorial, we'll set up a Flume agent and start it. Then, in a separate terminal, we'll send an event to the agent via telnet. As a final step, we'll verify that the agent indeed receives the message from telnet.
Apache Flume is a continuous data ingestion system. It can efficiently collect, aggregate, and move large amounts of log data. It has a simple, extensible data model that allows for online analytic applications.
Picture source: Apache Flume (https://flume.apache.org/).
Flume was originally designed by Cloudera engineers as a log aggregation system, and has since evolved to handle any type of streaming event data.
"A Flume event is defined as a unit of data flow having a byte payload and an optional set of string attributes. A Flume agent is a (JVM) process that hosts the components through which events flow from an external source to the next destination (hop)."
"A Flume source consumes events delivered to it by an external source like a web server. The external source sends events to Flume in a format that is recognized by the target Flume source. For example, an Avro Flume source can be used to receive Avro events from Avro clients or other Flume agents in the flow that send events from an Avro sink. A similar flow can be defined using a Thrift Flume Source to receive events from a Thrift Sink or a Flume Thrift Rpc Client or Thrift clients written in any language generated from the Flume thrift protocol.When a Flume source receives an event, it stores it into one or more channels. The channel is a passive store that keeps the event until it's consumed by a Flume sink. The file channel is one example - it is backed by the local filesystem. The sink removes the event from the channel and puts it into an external repository like HDFS (via Flume HDFS sink) or forwards it to the Flume source of the next Flume agent (next hop) in the flow. The source and sink within the given agent run asynchronously with the events staged in the channel."
- from https://flume.apache.org/FlumeUserGuide.html
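To make the "next hop" idea concrete, here is a minimal configuration fragment sketching the handoff between two agents. The agent names, hostname, and port below are assumptions made up for illustration, not part of the tutorial's setup:

# Hypothetical next-hop handoff: agent1's Avro sink forwards events
# to agent2's Avro source on the next machine in the flow.
agent1.sinks.avroOut.type = avro
agent1.sinks.avroOut.hostname = agent2.example.com
agent1.sinks.avroOut.port = 4141

agent2.sources.avroIn.type = avro
agent2.sources.avroIn.bind = 0.0.0.0
agent2.sources.avroIn.port = 4141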
Flume (blue) - Kafka (red):
Here is a good reference for Apache Kafka for Beginners.
The client operates at the point of origin of events and delivers them to a Flume agent. Clients typically operate in the process space of the application they are consuming data from. Flume currently supports Avro, log4j, syslog, and HTTP POST (with a JSON body) as ways to transfer data from an external source. Additionally, there's an ExecSource that can consume the output of a local process as input to Flume.
- from https://flume.apache.org/FlumeDeveloperGuide.html
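For instance, an exec source can tail a local log file into Flume. A minimal sketch, assuming a hypothetical agent name (tailAgent) and log path:

# Hypothetical exec source: tail a local log file into a memory channel,
# then print each event with a logger sink.
tailAgent.sources = tailSrc
tailAgent.channels = memCh
tailAgent.sinks = logSink

tailAgent.sources.tailSrc.type = exec
tailAgent.sources.tailSrc.command = tail -F /var/log/app.log
tailAgent.sources.tailSrc.channels = memCh

tailAgent.channels.memCh.type = memory

tailAgent.sinks.logSink.type = logger
tailAgent.sinks.logSink.channel = memCh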
[cloudera@quickstart Flume]$ flume-ng version
Flume 1.5.0-cdh5.3.0
Source code repository: https://git-wip-us.apache.org/repos/asf/flume.git
Revision: cc2139f386f7fccc9a6e105e2026228af58c6e9f
Compiled by jenkins on Tue Dec 16 20:25:18 PST 2014
From source with checksum 0b02653a07c9e96af03ce2189b8d51c3
The "ng" in "flume-ng" means "next generation"
Flume 1.x provides a template configuration file for flume.conf called conf/flume-conf.properties.template and a template for flume-env.sh called conf/flume-env.sh.template.
[root@quickstart etc]# pwd
/etc
[root@quickstart etc]# tree flume*
Copy the Flume template property file conf/flume-conf.properties.template to conf/flume.conf, then edit it as appropriate.
$ sudo cp conf/flume-conf.properties.template conf/flume.conf
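Optionally, the environment template mentioned above can be copied the same way if you need to set JVM options (e.g., heap size) for the agent:

$ sudo cp conf/flume-env.sh.template conf/flume-env.sh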
This is where we define our sources, sinks, and channels, and the flow within an agent. By default, the properties file is configured to work out of the box using a sequence generator source, a logger sink, and a memory channel (as shipped with the Cloudera QuickStart VM).
A sample of conf/flume.conf:
# The configuration file needs to define the sources,
# the channels and the sinks.
# Sources, channels and sinks are defined per agent,
# in this case called 'agent'
agent.sources = seqGenSrc
agent.channels = memoryChannel
agent.sinks = loggerSink

# For each one of the sources, the type is defined
agent.sources.seqGenSrc.type = seq

# The channel can be defined as follows.
agent.sources.seqGenSrc.channels = memoryChannel

# Each sink's type must be defined
agent.sinks.loggerSink.type = logger

# Specify the channel the sink should use
agent.sinks.loggerSink.channel = memoryChannel

# Each channel's type is defined.
agent.channels.memoryChannel.type = memory

# Other config values specific to each type of channel (sink or source)
# can be defined as well
# In this case, it specifies the capacity of the memory channel
agent.channels.memoryChannel.capacity = 100
"Flume agent configuration is stored in a local configuration file. This is a text file that follows the Java properties file format. Configurations for one or more agents can be specified in the same configuration file. The configuration file includes properties of each source, sink and channel in an agent and how they are wired together to form data flows."
"Each component (source, sink or channel) in the flow has a name, type, and set of properties that are specific to the type and instantiation. For example, an Avro source needs a hostname (or IP address) and a port number to receive data from. A memory channel can have max queue size ("capacity"), and an HDFS sink needs to know the file system URI, path to create files, frequency of file rotation ("hdfs.rollInterval") etc. All such attributes of a component needs to be set in the properties file of the hosting Flume agent."
"The agent needs to know what individual components to load and how they are connected in order to constitute the flow. This is done by listing the names of each of the sources, sinks and channels in the agent, and then specifying the connecting channel for each sink and source. For example, an agent flows events from an Avro source called avroWeb to HDFS sink hdfs-cluster1 via a file channel called file-channel. The configuration file will contain names of these components and file-channel as a shared channel for both avroWeb source and hdfs-cluster1 sink."
- from https://flume.apache.org/FlumeUserGuide.html
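Putting those two ideas together, a sketch of the flow described above might look like the following. The component names come from the quoted example; the bind address, port, HDFS path, and roll interval are illustrative assumptions:

# Components the agent should load
agent_foo.sources = avroWeb
agent_foo.channels = file-channel
agent_foo.sinks = hdfs-cluster1

# Type-specific properties for each component
agent_foo.sources.avroWeb.type = avro
agent_foo.sources.avroWeb.bind = 0.0.0.0
agent_foo.sources.avroWeb.port = 4141

agent_foo.channels.file-channel.type = file

agent_foo.sinks.hdfs-cluster1.type = hdfs
agent_foo.sinks.hdfs-cluster1.hdfs.path = hdfs://quickstart.cloudera:8020/flume/avroWeb
agent_foo.sinks.hdfs-cluster1.hdfs.rollInterval = 30

# file-channel is shared: the source writes into it, the sink drains it
agent_foo.sources.avroWeb.channels = file-channel
agent_foo.sinks.hdfs-cluster1.channel = file-channel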
We can start a Flume agent using flume-ng, which is located in the bin directory of the Flume distribution:
[root@quickstart conf]# which flume-ng
/usr/bin/flume-ng
We need to specify the agent name, the config directory, and the config file on the command line:
$ /usr/bin/flume-ng agent -n $agent_name -c conf -f conf/flume.conf
Then, the agent will start running the sources and sinks configured in the given properties file.
Here is an example configuration file which describes a single-node Flume deployment. The configuration below lets a user generate events and subsequently logs them to the console. As we can see, it defines a single agent named a1.
# example.conf: A single-node Flume configuration

# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# Describe/configure the source
a1.sources.r1.type = netcat
a1.sources.r1.bind = localhost
a1.sources.r1.port = 44444

# Describe the sink
a1.sinks.k1.type = logger

# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
Notations: source, sink, channel.
a1 has a source that listens for data on port 44444, a channel that buffers event data in memory, and a sink that logs event data to the console. The configuration file names the various components, then describes their types and configuration parameters. A given configuration file might define several named agents; when a given Flume process is launched a flag is passed telling it which named agent to manifest.
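For example, if example.conf also defined a second agent (say a2, a hypothetical name), the same file could drive a separate Flume process simply by changing the --name flag:

$ /usr/bin/flume-ng agent --conf conf --conf-file example.conf --name a2 -Dflume.root.logger=INFO,console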
With the example.conf, we can start Flume:
$ /usr/bin/flume-ng agent --conf conf --conf-file example.conf --name a1 -Dflume.root.logger=INFO,console
...
15/04/03 09:33:41 INFO node.PollingPropertiesFileConfigurationProvider: Configuration provider starting
15/04/03 09:33:41 INFO node.PollingPropertiesFileConfigurationProvider: Reloading configuration file:example.conf
15/04/03 09:33:41 INFO conf.FlumeConfiguration: Added sinks: k1 Agent: a1
15/04/03 09:33:41 INFO conf.FlumeConfiguration: Processing:k1
15/04/03 09:33:41 INFO conf.FlumeConfiguration: Processing:k1
15/04/03 09:33:41 INFO conf.FlumeConfiguration: Post-validation flume configuration contains configuration for agents: [a1]
15/04/03 09:33:41 INFO node.AbstractConfigurationProvider: Creating channels
15/04/03 09:33:42 INFO channel.DefaultChannelFactory: Creating instance of channel c1 type memory
15/04/03 09:33:42 INFO node.AbstractConfigurationProvider: Created channel c1
15/04/03 09:33:42 INFO source.DefaultSourceFactory: Creating instance of source r1, type netcat
15/04/03 09:33:42 INFO sink.DefaultSinkFactory: Creating instance of sink: k1, type: logger
15/04/03 09:33:42 INFO node.AbstractConfigurationProvider: Channel c1 connected to [r1, k1]
15/04/03 09:33:42 INFO node.Application: Starting new configuration:{ sourceRunners:{r1=EventDrivenSourceRunner: { source:org.apache.flume.source.NetcatSource{name:r1,state:IDLE} }} sinkRunners:{k1=SinkRunner: { policy:org.apache.flume.sink.DefaultSinkProcessor@14874647 counterGroup:{ name:null counters:{} } }} channels:{c1=org.apache.flume.channel.MemoryChannel{name: c1}} }
15/04/03 09:33:42 INFO node.Application: Starting Channel c1
15/04/03 09:33:42 INFO instrumentation.MonitoredCounterGroup: Monitored counter group for type: CHANNEL, name: c1: Successfully registered new MBean.
15/04/03 09:33:42 INFO instrumentation.MonitoredCounterGroup: Component type: CHANNEL, name: c1 started
15/04/03 09:33:42 INFO node.Application: Starting Sink k1
15/04/03 09:33:42 INFO node.Application: Starting Source r1
From a separate terminal, we can then telnet to port 44444 and send Flume an event:
[cloudera@quickstart ~]$ telnet localhost 44444
Trying 127.0.0.1...
Connected to localhost.
Escape character is '^]'.
Hello Flume!
OK
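If telnet is not installed, any client that writes a newline-terminated line to the port should work with the netcat source. For example, assuming nc is available on the VM:

$ echo "Hello Flume!" | nc localhost 44444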
The original Flume terminal will output the event in a log message. Note that the body is printed as raw hex bytes followed by a printable rendering; the trailing 0D is the carriage return that telnet appends:
15/04/03 09:33:42 INFO source.NetcatSource: Source starting
15/04/03 09:33:42 INFO source.NetcatSource: Created serverSocket:sun.nio.ch.ServerSocketChannelImpl[/127.0.0.1:44444]
15/04/03 09:42:12 INFO sink.LoggerSink: Event: { headers:{} body: 48 65 6C 6C 6F 20 46 6C 75 6D 65 21 0D Hello Flume!. }