Using Flume Beyond Ingesting Data Streams into Hadoop

0

Apache Flume and Streaming Data:

Apache Flume, as its website mentions – is a distributed, reliable, and available system for efficiently collecting, aggregating and moving large amounts of log data from many different sources to a centralized data store such as Hadoop HDFS. The application of Apache Flume is restricted not only to log data aggregation. It can be used to transport large volumes data streams from any possible data source.

In an organization, we would have a number of data sources that generate a variety of streaming data at high velocity and high volume. Processing streaming data has become very important and essential. For instance:

  • In the banking/financial services domain, monitoring credit card transactions that keep streaming in continuously from various sources like transactions at points-of-sale, ecommerce sites and mobile apps to detect and flag credit card frauds
  • Purchase behaviour from the above type of data streams can also be used by retail industry to push attractive sales/discount campaigns to clients as part of their up-selling or cross-selling promotions.
  • In the IT sector, several network or mail servers of data centres keep generating logs. Processing these will be helpful in identifying slow moving traffic, bottlenecks and locations where additional servers are required to ease or handle the traffic.
  • Capturing web-click streams by ecommerce sites to identify browsing patterns of prospective customers and send promotion campaigns that are pinpointed to their needs based on this data streams.


With Flume, we can not only capture and ingest the streaming data into a data store like Hadoop HDFS but we can also do certain amount of in-flight processing of this data. We will discuss how we can use Apache Flume with a specific attention to on this feature.

Components of Flume:

To process a data stream, we run a Flume agent, which is a long running process. A unit of data flow is referred to as an event. A flume agent hosts the components through which events flow from an external source to the next destination (sink).

To set up a flume agent, we need to write a configuration file specifying the properties of the source, channel and the sink. We can also specify properties of interceptors which gives the ability to modify and/or drop events in-flight.

Once the configuration is written, we can run a Flume agent using the command flume-ng which comes with Flume installation with the syntax given below.

flume-ng agent -n <agent_name> -c conf -f <flume-conf.properties>

Following are the details of each component.

Agent: The Agent receives events from clients or any other agents. Flume-Agent is an independent daemon process, which is installed on each node to collect the events. Any Java Virtual Machine (JVM) that runs flume, consists of sources, sinks, channels and other important components through which events get transferred from one place to another.

Source: The source is the component of an Agent which receives data from the data generators and transfers it to one or more channels in the form of Flume events. The major sources are:

  • NetCat Source
  • Spooling Directory Source
  • Syslog Source

Flume also allows us to specify advanced sources of data streams such as Twitter, Kafka and so on.

Example – Properties of a source in a typical Flume configuration:

# Name the components on this agent

agent1.sources = r1

# Describe/configure the source – SpoolDir

agent1.sources.r1.type = spooldir

agent1.sources.r1.spoolDir = /home/hduser/Desktop/flume/input

Channel: Channels are communication bridges between sources and sinks within an agent. Flume provides support for various channels, like:

  • Memory Channel
  • JDBC Channel
  • Kafka Channel
  • File Channel

Example – Properties of a channel in a typical Flume configuration:

agent1.channels = c1

# Use a channel which buffers events in file

agent1.channels.c1.type = file

agent1.channels.c1.capacity = 1000

agent1.channels.c1.transactionCapacity = 100

agent1.channels.c1.checkpointDir = /home/hduser/Desktop/flume/checkpoint/

agent1.channels.c1.dataDirs = /home/hduser/Desktop/flume/dataDir/

agent1.channels.file.writeFormat = Text

Sinks: Events are transferred to HDFS. Sinks supports multiple file formats like Text, AVRO, JSON, Compressed files. There are multiple sinks available that deliver data to a wide range of destinations, such as :

  • HDFS Sink
  • HBase Sink
  • Logger Sink

Example – Properties of a sink in a typical Flume configuration:

# Describe the HDFS sink

agent1.channels = c1

agent1.sinks = k1

agent1.sinks.k1.type = hdfs

agent1.sinks.k1.channel = c1

agent1.sinks.k1.hdfs.path = /user/hduser/regexpfilter

agent1.sinks.k1.hdfs.fileType= DataStream

agent1.sinks.k1.hdfs.writeFormat = Text

Flume Interceptors:

The events received by sources can be transformed or dropped by interceptors before they are written to the corresponding channels. Interceptors are simple components that sit between a source and the channels. Interceptors are commonly used to analyse events, we can do any filter, aggregation operations in Interceptors.

Types of Interceptors:

Timestamp Interceptor:

It inserts the timestamp into the Flume event headers with the timestamp key. The  timestamp key  is the header that the HDFS Sink uses for bucketing. If the timestamp header is already present, this interceptor will replace it unless the preserve Existing parameter is set to false. To add a timestamp interceptor, use the alias timestamp.

The configuration parameters are:

agent1.sources.source_name.interceptors = Interceptor_timestamp

agent1.sources.source_name.interceptors. Interceptor_timestamp.type =  timestamp

agent1.sources.source_name.interceptors. Interceptor_timestamp.preserveExisting = false

Host Interceptor

The host interceptor inserts the hostname  or IP address of the server on which the agent is running into the Flume event headers. The key to be used in the headers is configurable using the host Header parameter, but defaults to host. We can insert the hostname instead of the IP address, set useIP to false.

The configuration parameters are:

agent1.sources.source_name.interceptors = Interceptor_host

agent1.sources.source_name.interceptors.Interceptor_host.type = host

agent1.sources.source_name.interceptors.Interceptor_host.useIP = false

agent1.sources.source_name.interceptors.Interceptor_host.preserveExisting = true

UUID Interceptor:

UUID abbreviated as Universally Unique Identifier is used to set unique identifier on all events. These events are intercepted and assigned an UUID of 128-bit value. This enables deduplicating documents that may be accidentally duplicated because of replication and redelivery in a Flume network that is designed for high availability and high performance.

The configuration parameters are:

agent1.sources.source_name.interceptors = Interceptor_uuid

agent1.sources.source_name.interceptors. Interceptor_uuid.headerName = eventId

agent1.sources.source_name.interceptors. Interceptor_uuid.prefix = usingFlume

agent1.sources.source_name.interceptors. Interceptor_uuid.preserveExising = false

Regex filtering interceptor:

The previous discussed Interceptors push the entire events to sinks, which may result in huge data in sink. The Regex filtering interceptors can be used to make sure only important events are passed through Flume agents to reduce the volume of data being pushed into HDFS.

The configuration parameters are:

agent1.sources.source_name.interceptors = include exclude

agent1.sources.source_name.interceptors.include.type = regex_filter

agent1.sources.source_name.interceptors.include.regex = .*Pattern1.*

agent1.sources.source_name.interceptors.include.excludeEvents = false

agent1.sources.source_name.interceptors.exclude.type = regex_filter

agent1.sources.source_name.interceptors.exclude.regex = .* Pattern2.*

agent1.sources.source_name.interceptors.exclude.excludeEvents = true

Use Case:

An American Internet Service Provider (ISP) offers internet service to its clients across the globe. It has servers in every region and a huge network traffic in their servers at some point of time, which leads their servers brings down. They want to analyse the traffic congestion on their network using the log data generated by the servers. So that in future they can increase the number of servers in the regions.


The Log files generate the details of IP address, time and date, the web URL, the port number and the browser details.

123.223.223.123 – – [13/May/2016:00:23:50 -0400] “GET /wallpapers/flowers.jpeg HTTP/1.0” 200 1031 “http://www.LinkedIn.com/” “Chrome/68.0 (Macintosh; I; PPC)”

123.123.123.101 – – [13/May/2016:00:23:50 -0400] “GET /wallpapers/flowers.jpeg HTTP/1.0” 200 1031 “http://www.LinkedIn.com/” “Chrome/68.0 (Macintosh; I; PPC)”

123.223.223.123 – – [13/May/2016:00:23:50 -0400] “GET /wallpapers/flowers.jpeg HTTP/1.0” 200 1031 “http://www.LinkedIn.com/” “Chrome/68.0 (Macintosh; I; PPC)”

123.123.123.100 – – [13/May/2016:00:23:48 -0400] “GET /pics/wpaper.gif HTTP/1.0” 200 6248 “http://www.LinkedIn.com/” “Chrome/68.0 (Macintosh; I; PPC)”

123.123.123.112 – – [13/May/2016:00:23:47 -0400] “GET /asctortf/ HTTP/1.0” 200 8130 “http://search.netscape.com/Computers/Data_Formats/Document/Text/RTF” “Chrome/68.0 (Macintosh; I; PPC)”

123.223.223.123 – – [13/May/2016:00:23:48 -0400] “GET /downloads/sample.gif HTTP/1.0” 200 4005 “http://www.nfl.com/news” “Chrome/68.0 (Macintosh; I; PPC)”

How to run the Agents:

Step-1: Start the flume agent.

flume-ng agent –conf conf –conf-file spool_dir.conf –name agent1 -flume.root.logger=INFO,console

Step-2: Simulate the input data stream.

Run the one of the following shell scripts in order to simulate data stream source relevant to each flume agent, from the Linux shell prompt.

To simulate data stream for data source spooldir

$ ./spool_files.sh log.txt

To simulate data stream for data source netcat

$ ./socket_stream.sh log.txt | telnet localhost 7777

To simulate data stream for data source exec for generating a continuously growing file.

$ ./tail_file.sh log.txt

Step-3: To check the output files in the HDFS location

Hadoop fs -ls /user/hduser/regexpfilter

Appendix

Attached are the following files.

Flume Downloads