Kafka: Gearing up as an Improved Platform for High Velocity Data

With the new additions of Kafka Connect and Kafka Streams in its recent releases, Kafka is gearing up to become a more comprehensive platform for handling high velocity data streams. A brief overview of Kafka is in order before we get into the new features.

Kafka Overview:

Apache Kafka was developed at LinkedIn and open sourced in 2011. It is a distributed streaming platform that can:

  • Publish and subscribe to streams of records, similar to a message queue or enterprise messaging system
  • Store streams of records in a fault-tolerant, durable way
  • Process streams of records as they occur

Kafka runs on a cluster of one or more servers (termed brokers) that can even span multiple data centres. The Kafka cluster stores streams of records (messages) in categories called topics. Each record consists of a key, a value and a timestamp. The data streaming through a topic is divided into partitions, and the partitions are distributed across the brokers of the Kafka cluster. The stored logs are retained for a configurable period. Multiple consumers can subscribe to the data written into a topic. A Kafka cluster can be depicted as below.
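To make the idea of partitions more concrete, here is a minimal sketch of one way a record key could be mapped to a partition: hash the key and take the result modulo the number of partitions. The class name PartitionSketch and the use of String.hashCode() are assumptions made for this illustration; Kafka's own default partitioner uses a different hash function (murmur2) on the serialized key.

```java
// Illustrative sketch only: this is not Kafka's actual partitioner.
final class PartitionSketch {

    // Map a record key to a partition number in the range [0, numPartitions).
    static int partitionFor(String key, int numPartitions) {
        if (numPartitions <= 0) {
            throw new IllegalArgumentException("numPartitions must be positive");
        }
        // Clear the sign bit so that the result is never negative.
        int positiveHash = key.hashCode() & 0x7fffffff;
        return positiveHash % numPartitions;
    }

    public static void main(String[] args) {
        String[] keys = {"user-1", "user-2", "user-3", "user-1"};
        for (String key : keys) {
            System.out.println(key + " -> partition " + partitionFor(key, 3));
        }
    }
}
```

Because the mapping depends only on the key, records with the same key always land in the same partition, which is what keeps them in order relative to each other.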

Kafka uses Zookeeper, which is a coordination service for distributed applications. Zookeeper keeps track of the brokers: it maintains which brokers are alive and available and provides failure notifications. It also supports leader election: if a broker goes down, a new leader for each partition that the broker was leading is chosen automatically from the available brokers that hold replicas of that partition.
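The leader election described above can be pictured with a small sketch. It is a simplified model, not Kafka's implementation: the class LeaderElectionSketch and its method are hypothetical names, and the rule used (pick the first replica that is in sync and on a live broker) is an assumption chosen for illustration.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Illustrative sketch only: a simplified model of choosing a partition leader.
final class LeaderElectionSketch {

    // Return the first replica that is both in sync and on a live broker,
    // or -1 if no replica qualifies.
    static int electLeader(List<Integer> replicas, Set<Integer> inSyncReplicas, Set<Integer> liveBrokers) {
        for (int brokerId : replicas) {
            if (inSyncReplicas.contains(brokerId) && liveBrokers.contains(brokerId)) {
                return brokerId;
            }
        }
        return -1;
    }

    public static void main(String[] args) {
        List<Integer> replicas = Arrays.asList(0, 1, 2);
        Set<Integer> inSync = new HashSet<>(Arrays.asList(0, 1, 2));
        // Broker 0, the current leader, has gone down.
        Set<Integer> live = new HashSet<>(Arrays.asList(1, 2));
        System.out.println("New leader: broker " + electLeader(replicas, inSync, live));
    }
}
```

With a replication factor of 1, as in the topic we create later, there is no other replica to fall back on, so the partition stays unavailable until its broker returns.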

Also, Zookeeper itself is a distributed application that typically runs on 3, 5 or 7 servers. If the leading Zookeeper server goes down for any reason, another server is automatically elected as the leader. This makes Kafka highly fault-tolerant.
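The preference for an odd number of Zookeeper servers follows from simple arithmetic: the ensemble keeps working only while a majority of its servers is up. The sketch below works this out for a few ensemble sizes; the class and method names are made up for this example.

```java
// Illustrative sketch only: majority arithmetic for a Zookeeper ensemble.
final class QuorumSketch {

    // Smallest number of servers that forms a majority of the ensemble.
    static int quorumSize(int ensembleSize) {
        return ensembleSize / 2 + 1;
    }

    // Number of servers that can fail while a majority is still running.
    static int toleratedFailures(int ensembleSize) {
        return ensembleSize - quorumSize(ensembleSize);
    }

    public static void main(String[] args) {
        for (int servers : new int[] {3, 4, 5, 7}) {
            System.out.println(servers + " servers: quorum " + quorumSize(servers)
                    + ", tolerates " + toleratedFailures(servers) + " failure(s)");
        }
    }
}
```

An ensemble of 4 tolerates only one failure, the same as an ensemble of 3, so the extra server adds nothing to fault tolerance.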

With this background, we can now run Kafka and do some basic operations on it. If you are using Jigsaw Virtual Machine-2018, please note that Kafka 0.11 is already installed on it. If you need to install it, you can follow the instructions given on Kafka’s web site (http://kafka.apache.org/quickstart) and get started in a few steps.

We first need to start the Zookeeper service with the following command in a terminal window:

zookeeper-server-start.sh <Kafka_Install_Dir>/config/zookeeper.properties

Next, we can start the Kafka server using the command below in another window:

kafka-server-start.sh <Kafka_Install_Dir>/config/server.properties

Now we can create a topic by giving the following command in another window:

kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic firstTopic

We would see the response:

Created topic "firstTopic".

The following command lists the topics created so far.

kafka-topics.sh --zookeeper localhost:2181 --list

And the command below gives the description of the topic firstTopic.

kafka-topics.sh --zookeeper localhost:2181 --describe --topic firstTopic

Kafka would respond with:

Topic:firstTopic PartitionCount:1 ReplicationFactor:1   Configs:

Topic: firstTopic     Partition: 0    Leader: 0  Replicas: 0     Isr: 0

Now that we have created a topic, we can use a producer to publish data into the topic and a consumer to consume the data. The Kafka installation comes with a console-producer and a console-consumer. The console-producer takes data from the standard input device, i.e. the keyboard, and publishes it to a given topic. The console-consumer consumes the data from a given topic and displays it on the standard output device, i.e. the screen.

We can run the console-producer script on a terminal as below:

kafka-console-producer.sh --broker-list localhost:9092 --topic firstTopic

And enter some sample lines:

>Hello world! This is the first message.

>And here is another message.

Let us now open another terminal and run console-consumer:

kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic firstTopic --from-beginning

We will see that the lines published by the console-producer to our topic are consumed and displayed on the terminal by console-consumer.
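The role of the --from-beginning option can be understood by picturing a topic partition as an append-only log in which every record has an offset. The sketch below is an in-memory stand-in written for this article, not Kafka code; the class TopicLogSketch and its methods are hypothetical. Reading from offset 0 corresponds to --from-beginning, while a console-consumer started without that option begins at the end of the log and sees only records published afterwards.

```java
import java.util.ArrayList;
import java.util.List;

// Illustrative sketch only: an in-memory stand-in for a single topic partition.
final class TopicLogSketch {
    private final List<String> records = new ArrayList<>();

    // Append a record to the end of the log and return its offset.
    long append(String value) {
        records.add(value);
        return records.size() - 1L;
    }

    // Offset that the next appended record will receive.
    long endOffset() {
        return records.size();
    }

    // Return a copy of all records from the given offset onwards.
    // Reading never removes records, so many consumers can read the same data.
    List<String> readFrom(long offset) {
        if (offset < 0 || offset > records.size()) {
            throw new IllegalArgumentException("offset out of range: " + offset);
        }
        return new ArrayList<>(records.subList((int) offset, records.size()));
    }

    public static void main(String[] args) {
        TopicLogSketch firstTopic = new TopicLogSketch();
        firstTopic.append("Hello world! This is the first message.");
        firstTopic.append("And here is another message.");

        // Like --from-beginning: start at offset 0 and see everything.
        System.out.println(firstTopic.readFrom(0));
        // Like the default: start at the end and see only later records.
        System.out.println(firstTopic.readFrom(firstTopic.endOffset()));
    }
}
```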

Kafka Connect:

Using Kafka as above to capture data coming in from standard input and write it to standard output is a good start. However, we often need to capture data streaming from different sources such as a server log, a port or a database. Similarly, we may need to write data into a data store such as a file or a database. Instead of writing our own code for requirements like these, we can use Kafka Connect to import data into Kafka and export data from it. It is an extensible tool that runs connectors, which implement the custom logic for interacting with many external systems.

Kafka Connect was introduced in Kafka 0.9 in late 2015, and improvements were added in versions 0.10.1 and 0.10.2, released in late 2016 and early 2017 respectively.

Kafka Connect makes it easy to quickly define connectors that move large collections of data into and out of Kafka. It can ingest entire databases or collect metrics from application servers into Kafka topics, making the data available for stream processing with low latency. An export job can deliver data from Kafka topics into secondary storage and query systems or into batch systems for offline analysis.

To copy data between Kafka and another system, we instantiate Kafka connectors for the systems we want to pull data from or push data to. Source Connectors import data from another system into Kafka (for example, from a log file or a database) and Sink Connectors export data from Kafka (for example, the contents of a Kafka topic to an external file or to HDFS). The way Kafka Connect functions can be depicted as below.

Kafka Connect currently supports two modes of execution: standalone (single process) and distributed.

In the standalone mode, all work is performed in a single process. This configuration is simpler to set up and get started with and may be useful in situations where only one worker is required (for example, collecting log files). Distributed mode has the benefit of fault tolerance.

Let us start a standalone process with a source connector that takes the data from a log file (test.log) and publishes it to a topic in Kafka (connect-test), and a sink connector that takes the data from the topic and writes it into a text file (sink.out). We first need to specify the properties file for the worker (the process that runs the connectors and the data import/export tasks) and then the properties files for the two connectors. We can then start the process with the following command. If the properties files are not in the current directory, you need to give the full path of the files.

connect-standalone.sh worker.properties connect-file-source.properties connect-file-sink.properties

After giving the command, we can add a couple of lines into the file test.log as follows.

echo "Hello world! This is the first message." > test.log

echo "And here is another message." >> test.log

And now, if we check the file sink.out, we will see that these lines are present in it. So, these lines are picked up by the Source Connector from test.log file, published to the Kafka topic connect-test, then consumed by Sink Connector and written to the file sink.out.
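The flow from source file to topic to sink file can be imitated in a few lines of plain Java. This is only a sketch of the data flow under simplifying assumptions: the class FilePipelineSketch is hypothetical, an in-memory list stands in for the topic connect-test, and the copy runs once, whereas real connectors keep running and track how far they have read.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Illustrative sketch only: imitates the source -> topic -> sink flow without Kafka.
final class FilePipelineSketch {

    // "Source connector": publish every line of the source file to the topic.
    static void runSource(Path sourceFile, List<String> topic) throws IOException {
        topic.addAll(Files.readAllLines(sourceFile, StandardCharsets.UTF_8));
    }

    // "Sink connector": write every record of the topic to the sink file, one per line.
    static void runSink(List<String> topic, Path sinkFile) throws IOException {
        Files.write(sinkFile, topic, StandardCharsets.UTF_8);
    }

    // Write the lines to a temporary test.log, run source then sink,
    // and return the contents of sink.out.
    static List<String> roundTrip(List<String> lines) {
        try {
            Path dir = Files.createTempDirectory("connect-sketch");
            Path source = dir.resolve("test.log");
            Path sink = dir.resolve("sink.out");
            Files.write(source, lines, StandardCharsets.UTF_8);

            List<String> topic = new ArrayList<>(); // stands in for the topic connect-test
            runSource(source, topic);
            runSink(topic, sink);
            return Files.readAllLines(sink, StandardCharsets.UTF_8);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        List<String> copied = roundTrip(Arrays.asList(
                "Hello world! This is the first message.",
                "And here is another message."));
        for (String line : copied) {
            System.out.println(line);
        }
    }
}
```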

The worker properties file specifies the configuration parameters like the Kafka brokers to connect to, serialization format and frequency of committing the offset.

bootstrap.servers=localhost:9092

key.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=true
value.converter=org.apache.kafka.connect.json.JsonConverter
value.converter.schemas.enable=true

internal.key.converter=org.apache.kafka.connect.json.JsonConverter
internal.value.converter=org.apache.kafka.connect.json.JsonConverter
internal.key.converter.schemas.enable=false
internal.value.converter.schemas.enable=false

# this config is only for standalone workers
offset.storage.file.filename=/tmp/connect.offsets
offset.flush.interval.ms=10000
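The effect of the schemas.enable settings becomes visible if we read the topic connect-test with the console-consumer: with value.converter.schemas.enable=true, the JsonConverter wraps each record in an envelope with a "schema" field and a "payload" field. The sketch below builds such an envelope by hand for a string record, just to show its shape. It is not the converter's code; the class JsonEnvelopeSketch is a hypothetical name and the escaping is deliberately minimal.

```java
// Illustrative sketch only: hand-builds the JSON shape for a string record.
final class JsonEnvelopeSketch {

    // Quote a string as JSON. Only backslashes and double quotes are escaped here.
    static String quote(String text) {
        return "\"" + text.replace("\\", "\\\\").replace("\"", "\\\"") + "\"";
    }

    // With schemas enabled, the payload is wrapped in a schema/payload envelope;
    // with schemas disabled, only the JSON payload itself is produced.
    static String serialize(String payload, boolean schemasEnabled) {
        if (!schemasEnabled) {
            return quote(payload);
        }
        return "{\"schema\":{\"type\":\"string\",\"optional\":false},\"payload\":"
                + quote(payload) + "}";
    }

    public static void main(String[] args) {
        String line = "Hello world! This is the first message.";
        System.out.println(serialize(line, true));
        System.out.println(serialize(line, false));
    }
}
```

The envelope makes every record self-describing at the cost of repeating the schema in each message, so setting schemas.enable to false is a reasonable choice when the consumers only need the plain payload.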

The other two properties files specify the configuration of the source and sink connectors respectively as shown below.

 

name=local-file-source
connector.class=FileStreamSource
tasks.max=1
file=test.log
topic=connect-test

name=local-file-sink
connector.class=FileStreamSink
tasks.max=1
file=sink.out
topics=connect-test

Connector classes (libraries) are available for a number of sources and sinks such as JDBC, HDFS, HBase, Cassandra, Amazon S3 and so on. An extensive list is made available by Confluent Inc. on their web site at: https://www.confluent.io/product/connectors/

Kafka Streams:

Kafka Streams is a client library for building applications and microservices. It enables us to process data streams coming into a Kafka cluster and store the output back into the Kafka cluster. It combines the simplicity of writing and deploying standard Java and Scala applications on the client side with the benefits of Kafka’s server-side cluster technology.

Kafka Streams was introduced in Kafka 0.10, released in 2016, and matured considerably in Kafka 0.11, released in mid-2017. It can be a serious competitor to Apache Spark Streaming and to other stream processing tools such as Flink or NiFi. It is still new and evolving, though we can expect newer versions to maintain backward compatibility of the APIs.

In a typical scenario, data streams into a Kafka topic from data sources in an organization. It needs to be processed in real time and stored in target systems. A Kafka Streams-based application plays the role on the right-hand side as shown above and publishes the processed or transformed data into an output topic.

Let us now look at the Word Count application to understand how a Kafka Streams application works and how to write one. Source code for the word count application is available at: http://kafka.apache.org/20/documentation/streams/. Note that the code is written for Java 8 and uses lambda expressions. It also uses the StreamsBuilder API, which was introduced in Kafka 1.0, so the application should be built against a Kafka Streams library of version 1.0 or later. The core of the application consists of the following lines, which come after setting properties such as the name of the application, the bootstrap server name and port, and the serialization/deserialization parameters.

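// Build the processing topology
StreamsBuilder builder = new StreamsBuilder();
// Read the input topic as a stream of text lines
KStream<String, String> textLines = builder.stream("TextLinesTopic");
KTable<String, Long> wordCounts = textLines
    // Lowercase each line and split it into words
    .flatMapValues(textLine -> Arrays.asList(textLine.toLowerCase().split("\\W+")))
    // Re-key each record by the word itself
    .groupBy((key, word) -> word)
    // Count the occurrences of each word, backed by the state store "counts-store"
    .count(Materialized.<String, Long, KeyValueStore<Bytes, byte[]>>as("counts-store"));
// Write the running counts to the output topic
wordCounts.toStream().to("WordsWithCountsTopic", Produced.with(Serdes.String(), Serdes.Long()));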

In the code we are:

  • Getting the data stream from the topic TextLinesTopic
  • Performing the following transformations on the records received:
      • Convert the message to lowercase
      • Split it into words on runs of non-word characters (the regular expression "\\W+")
      • Flatten the array of strings so that each word is available as a record
      • Group by the word itself so that the records are of the form (word1, (word1, word1)), (word2, (word2, word2, word2))…
      • Aggregate the groups with count
  • Writing the result as a stream into the output topic WordsWithCountsTopic
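The same transformation steps can be tried out without a Kafka cluster by applying them to a fixed list of lines with the standard Java streams API. The sketch below is such a stand-in, not Kafka Streams code: the class WordCountSketch is a hypothetical name, the filter for empty strings is an addition of this sketch, and the result is a single final count rather than the continuously updated counts a Kafka Streams application emits.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.function.Function;
import java.util.stream.Collectors;

// Illustrative sketch only: word count over a fixed list, using java.util.stream.
final class WordCountSketch {

    static Map<String, Long> countWords(List<String> textLines) {
        return textLines.stream()
                // Lowercase each line, split it on non-word characters and flatten the words.
                .flatMap(line -> Arrays.stream(line.toLowerCase().split("\\W+")))
                // split() can yield an empty first element when a line starts with punctuation.
                .filter(word -> !word.isEmpty())
                // Group by the word itself and count the members of each group.
                .collect(Collectors.groupingBy(Function.identity(), TreeMap::new, Collectors.counting()));
    }

    public static void main(String[] args) {
        Map<String, Long> counts = countWords(Arrays.asList(
                "Hello world! This is the first message.",
                "And here is another message."));
        counts.forEach((word, count) -> System.out.println(word + " " + count));
    }
}
```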

We can download the code, build the JAR file with a build tool like Maven and run it from an IDE such as IntelliJ or directly with the java -jar command. We can also run it using the kafka-run-class.sh script that comes with the Kafka installation files.

Before running the application, make sure Zookeeper and the Kafka server are running. Then:

  • Create the input topic TextLinesTopic
  • Create the final output topic WordsWithCountsTopic
  • Run the application as mentioned above
  • Run kafka-console-producer, giving the input topic name as the parameter, and enter a couple of lines, which will be published to the topic
  • Run kafka-console-consumer, giving the output topic as the parameter and passing the deserializer properties for the key and value of the records as additional parameters

We will now see that the word counts from the input lines are written to the output topic.

While the application is running, if we list the topics on Kafka using the kafka-topics.sh --list command, we will notice two intermediate topics created internally by the Kafka Streams application. One of them is for repartitioning the stream, as we are doing a group-by operation on a new key. The other one is the changelog of the state store that backs the count aggregation.

Also, note that Kafka Streams offers a high-level DSL with the most common data transformation operations, such as map, filter, join and aggregations, out of the box, besides the lower-level Processor API. These transformations include stateless operations such as filter as well as stateful operations such as aggregation and join.

Another very significant and useful aspect of Kafka Streams is that it provides two data abstractions: KStream and KTable. A KStream is an abstraction of a record stream, where each record represents a self-contained unit of data in the unbounded data set. Using the table analogy, data records in a record stream are always interpreted as an “INSERT”, i.e. they add more entries to an append-only dataset, as no record replaces an existing row with the same key. A KTable is an abstraction of a changelog stream, where each data record represents an update. More specifically, the value in a data record is interpreted as an “UPDATE” of the last value for the same record key, if any (if a corresponding key doesn’t exist yet, the update is considered an INSERT).
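The difference between the two interpretations is easy to see when the same sequence of keyed records is read both ways. The sketch below does this with plain Java collections; it does not use the Kafka Streams classes, and the class StreamTableSketch and the sample records are made up for this illustration.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Illustrative sketch only: the same records read as a record stream and as a changelog.
final class StreamTableSketch {

    // Record stream view: every record is kept, like an INSERT.
    static List<String> asStream(String[][] records) {
        List<String> rows = new ArrayList<>();
        for (String[] record : records) {
            rows.add(record[0] + "=" + record[1]);
        }
        return rows;
    }

    // Changelog view: a later record replaces the value for the same key, like an UPDATE.
    static Map<String, String> asTable(String[][] records) {
        Map<String, String> table = new LinkedHashMap<>();
        for (String[] record : records) {
            table.put(record[0], record[1]);
        }
        return table;
    }

    public static void main(String[] args) {
        String[][] records = {{"alice", "1"}, {"bob", "5"}, {"alice", "3"}};
        System.out.println("As a stream: " + asStream(records));
        System.out.println("As a table:  " + asTable(records));
    }
}
```

Read as a stream, the three records stay three records; read as a table, the second record for alice overwrites the first, leaving one row per key.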

With the provision of these built-in tools, Kafka Streams enables us to write clean and efficient data streaming applications. It processes streams one record at a time, which makes very low latency possible, unlike systems such as Spark Streaming that collect stream records into micro-batches before processing them.
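A rough feel for the latency difference can be had from a very simple model: in a micro-batch system a record waits until the end of the current batch interval before processing starts, while a record-at-a-time system can start as soon as the record arrives. The sketch below computes that waiting time. It is a simplified assumption for illustration only, with hypothetical names, and it ignores processing time, scheduling and network delays.

```java
// Illustrative sketch only: a simplified model of waiting time before processing starts.
final class BatchDelaySketch {

    // Time a record waits when batches are cut at every multiple of batchIntervalMs.
    static long microBatchWaitMs(long arrivalMs, long batchIntervalMs) {
        if (arrivalMs < 0 || batchIntervalMs <= 0) {
            throw new IllegalArgumentException("arrivalMs must be >= 0 and batchIntervalMs > 0");
        }
        long sinceLastBoundary = arrivalMs % batchIntervalMs;
        return sinceLastBoundary == 0 ? 0 : batchIntervalMs - sinceLastBoundary;
    }

    // In this model, record-at-a-time processing starts on arrival.
    static long perRecordWaitMs(long arrivalMs) {
        return 0;
    }

    public static void main(String[] args) {
        long batchIntervalMs = 1000;
        for (long arrivalMs : new long[] {120, 640, 1001}) {
            System.out.println("arrival at " + arrivalMs + " ms: micro-batch waits "
                    + microBatchWaitMs(arrivalMs, batchIntervalMs) + " ms, per-record waits "
                    + perRecordWaitMs(arrivalMs) + " ms");
        }
    }
}
```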

Further, several resources are available for getting a better handle on Kafka Streams. The following GitHub repository by Confluent Inc. has source code in Java and Scala for good use case scenarios, including the Word Count application with slight changes to the code shown above.

https://github.com/confluentinc/kafka-streams-examples/blob/5.0.0-post/src/main/java/io/confluent/examples/streams/WordCountLambdaExample.java

In Conclusion:

As mentioned at the outset, with the introduction of Kafka Connect and the Kafka Streams library, Kafka has become a very comprehensive data streaming platform that enables efficient application development. The following picture shows typical usage of data streaming applications on Kafka with these new tools.

 

Lastly, as mentioned earlier, Jigsaw Virtual Machine-2018 has Kafka 0.11 already installed, along with Maven and Java 8. So you can build the Kafka Streams word count application and other applications in order to get more familiar with Kafka Streams and be able to develop custom applications.