Kafka: Building a Real-Time Data Pipeline

Apache Kafka (latest version 0.8.2.1 at the time of writing) is an open-source, distributed publish-subscribe messaging system for data integration, originally developed at LinkedIn and written in Scala. The project aims to collect and deliver high volumes of log data with low latency, handling real-time data feeds through a data pipeline (data in motion from one point to another). The design is heavily influenced by log processing.

>> Kafka major components

Topic – A collection (bucket) of messages of a particular type.
Partition – A topic is split into multiple shards called partitions to balance load (scalability).
Producer – A producer publishes messages to a topic.
Broker – Published messages are stored on a set of servers called brokers.
Consumer – A consumer subscribes to one or more topics and consumes the subscribed messages by pulling data from the brokers.

Figure-1. Kafka Cluster

>> Kafka key features

  • High message throughput (1 million messages per second)
  • It supports both online (continuous) consumption and offline consumption (e.g., once a day) of log data of all types.
  • Kafka runs on a cluster of servers, each of which is called a broker.
  • A topic is split (sharded) into multiple partitions to balance load (scalability). Each broker stores one or more partitions.
  • Partitions are replicated (the replication factor is the number of copies of each partition) for fault tolerance (reliability). One replica of each partition is selected as the leader; all reads and writes go to this leader partition, and the rest are called followers.
  • Messages in the partitions are each assigned a unique (per partition) and sequential id called the offset. See Figure-2.
  • Messages are persisted/retained (durability) for a configurable period of time.
  • A consumer group is a collection of consumer instances. In a group, each partition is consumed by exactly one consumer instance, but one consumer instance can consume from more than one partition. See Figure-3.
  • Kafka employs a pull-based consumption model: consumers pull messages from brokers whenever they want, instead of brokers pushing messages to consumers as in traditional messaging systems.
  • Reading messages from Kafka is very fast thanks to the zero-copy approach (see the sketch after this list).
  • Consistent performance even under varying workloads (performance), with O(1) disk structures that provide constant-time operations. See also “Message persistence” below.
  • Communication between the clients and the servers is done over the TCP protocol.
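
Zero-copy here refers to moving bytes from the OS page cache straight to a network socket without copying them through user space (the sendfile system call on Linux, exposed in Java as FileChannel.transferTo). A minimal sketch of the mechanism, not Kafka's actual broker code; the file name, host, and port are placeholders:

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.channels.FileChannel;
import java.nio.channels.SocketChannel;
import java.nio.file.Paths;
import java.nio.file.StandardOpenOption;

public class ZeroCopySend {
    public static void main(String[] args) throws IOException {
        // Open a log segment file and a socket to a (hypothetical) consumer.
        try (FileChannel log = FileChannel.open(
                     Paths.get("00000000000000000000.log"), StandardOpenOption.READ);
             SocketChannel socket = SocketChannel.open(
                     new InetSocketAddress("localhost", 9092))) {
            long position = 0;
            long remaining = log.size();
            while (remaining > 0) {
                // The kernel moves bytes from the page cache to the socket
                // without copying them into this process's buffers.
                long sent = log.transferTo(position, remaining, socket);
                position += sent;
                remaining -= sent;
            }
        }
    }
}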

Figure-2. Partition offsets: consumers track their position via (offset, partition, topic)

Figure-3. Partition & Consumer groups

>> Kafka key improvements

1. Message persistence

Kafka cannot afford any kind of data loss. It relies heavily on the filesystem for storing and caching messages. There is a general perception that “disks are slow”, which makes people skeptical that a persistent structure can offer competitive performance.

To compensate for this performance divergence, modern operating systems have become increasingly aggressive in their use of main memory for disk caching (page caching). A modern OS will happily divert all free memory to disk caching, with little performance penalty when the memory is reclaimed. All disk reads and writes go through this unified cache.

Thus it is better to use the filesystem and rely on the page cache rather than maintain an in-memory cache in the application. The design, then, is to write data immediately to a persistent log on the filesystem: data is transferred into the kernel's page cache, from where the OS can flush it to disk later.
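
A minimal Java sketch of this write path (illustrative only, not Kafka's implementation): the append returns once the bytes are in the kernel's page cache, and the OS flushes them to the physical disk later; an explicit force() is only needed to override that behaviour:

import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.charset.StandardCharsets;
import java.nio.file.Paths;
import java.nio.file.StandardOpenOption;

public class PageCacheAppend {
    public static void main(String[] args) throws IOException {
        try (FileChannel log = FileChannel.open(Paths.get("messages.log"),
                StandardOpenOption.CREATE, StandardOpenOption.WRITE,
                StandardOpenOption.APPEND)) {
            ByteBuffer msg = ByteBuffer.wrap("hello kafka".getBytes(StandardCharsets.UTF_8));
            // Returns once the bytes are in the kernel's page cache;
            // the OS decides when to flush them to the physical disk.
            log.write(msg);
            // Uncomment to force an immediate flush (fsync) to disk:
            // log.force(true);
        }
    }
}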

2. Consistent performance

Kafka maintains a queue (rather than a B-tree) as its persistent data structure, with O(1) operations that provide constant-time performance even with many terabytes (TB) of stored messages.

Let’s consider a persistent queue data structure. Logging solutions commonly use a queue for simple reads and as a data structure for appending to files. The best part is that the operations on a queue have O(1) time complexity, and there is no inter-dependency between writes and reads, so they can occur simultaneously. It is also more performant, as the performance does not depend on the data size, and we can expand a server simply by adding more low-cost disks.
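
To make the constant-time claim concrete, here is a toy append-only log in Java (an illustration of the idea, not Kafka's storage code): appending assigns the next sequential offset, and reading by offset does not depend on how much data is already stored:

import java.util.ArrayList;
import java.util.List;

// Toy append-only log: each message gets the next sequential offset.
public class AppendOnlyLog {
    private final List<String> messages = new ArrayList<>();

    // O(1): append at the tail and return the assigned offset.
    public synchronized long append(String message) {
        messages.add(message);
        return messages.size() - 1;
    }

    // O(1): read by offset, independent of total log size.
    public synchronized String read(long offset) {
        return messages.get((int) offset);
    }

    public static void main(String[] args) {
        AppendOnlyLog log = new AppendOnlyLog();
        long offset = log.append("first message");
        System.out.println("offset " + offset + " -> " + log.read(offset));
    }
}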

Thus the possibility of having huge disk space without any other side-effects lets Kafka provide a few features distinct from other messaging systems. For instance, rather than deleting a message immediately after consumption, Kafka can retain messages for a longer time, say a week.

3. Message acknowledgement

Keeping track of what has been consumed is, surprisingly, one of the key performance points of a messaging system. Kafka hands this responsibility over to the consumer.

Most messaging systems keep metadata about what messages have been consumed on the broker. That is, as a message is handed out to a consumer, the broker either records that fact locally immediately or waits for an acknowledgement from the consumer. This is a fairly intuitive choice, and indeed for a single-machine server it is not clear where else this state could go. Since the data structures used for storage in many messaging systems scale poorly, this is also a pragmatic choice: because the broker knows what has been consumed, it can delete it immediately, keeping the data size small.

What is perhaps not obvious, is that getting the broker and consumer to come into agreement about what has been consumed is not a trivial problem. If the broker records a message as consumed immediately every time it is handed out over the network, then if the consumer fails to process the message (say because it crashes or the request times out or whatever) that message will be lost. To solve this problem, many messaging systems add an acknowledgement feature which means that messages are only marked as sent not consumed when they are sent; the broker waits for a specific acknowledgement from the consumer to record the message as consumed. This strategy fixes the problem of losing messages, but creates new problems. First of all, if the consumer processes the message but fails before it can send an acknowledgement then the message will be consumed twice. The second problem is around performance, now the broker must keep multiple states about every single message (first to lock it so it is not given out a second time, and then to mark it as permanently consumed so that it can be removed). Tricky problems must be dealt with, like what to do with messages that are sent but never acknowledged.

Kafka handles this differently. A topic is divided into a set of totally ordered partitions, each of which is consumed by exactly one consumer in a group at any given time. This means that the position of a consumer in each partition is just a single integer: the offset of the next message to consume. This makes the state about what has been consumed very small, just one number per partition, and it can be periodically checkpointed (see the sketch below). This makes the equivalent of message acknowledgements very cheap.
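
Conceptually, the consumer's entire acknowledgement state is one integer per partition. A toy sketch of that checkpoint (illustrative only; in Kafka 0.8 the offsets are typically stored in Zookeeper, and in newer releases in a special Kafka topic):

import java.util.HashMap;
import java.util.Map;

// Toy checkpoint: the consumer's full state is one offset per partition.
public class OffsetCheckpoint {
    private final Map<String, Long> nextOffsetByPartition = new HashMap<>();

    // Record that messages before 'nextOffset' have been consumed.
    public void advance(String topicPartition, long nextOffset) {
        nextOffsetByPartition.put(topicPartition, nextOffset);
    }

    // Periodically persist this tiny map instead of per-message acks.
    public void checkpoint() {
        System.out.println("checkpointing: " + nextOffsetByPartition);
    }

    public static void main(String[] args) {
        OffsetCheckpoint cp = new OffsetCheckpoint();
        cp.advance("demo-topic-0", 42);  // consumed through offset 41
        cp.advance("demo-topic-1", 7);
        cp.checkpoint();
    }
}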

There is a side benefit of this decision: a consumer can deliberately rewind to an old offset and re-consume data. This violates the common contract of a messaging system, but turns out to be an essential feature for many consumers. For example, if a bug in the consumer code is discovered after some messages have been consumed, the consumer can re-consume those messages once the bug is fixed, as the sketch below illustrates.
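
A sketch of such a rewind using the Java consumer API from releases after 0.8.2 (shown only to illustrate the idea in one language; the 0.8-era consumers expose the same capability differently). The topic name and broker address are placeholders:

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;

public class RewindConsumer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");  // assumed broker address
        props.put("group.id", "replay-group");
        props.put("key.deserializer",
                  "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer",
                  "org.apache.kafka.common.serialization.StringDeserializer");

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            TopicPartition partition = new TopicPartition("demo-topic", 0);
            consumer.assign(Collections.singletonList(partition));
            // Rewind: jump back to offset 0 and re-consume everything,
            // e.g., after fixing a bug in the processing logic.
            consumer.seek(partition, 0L);
            for (ConsumerRecord<String, String> record :
                    consumer.poll(Duration.ofSeconds(1))) {
                System.out.println(record.offset() + ": " + record.value());
            }
        }
    }
}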

4. Message replication

In the literature, there are two typical approaches of maintaining strongly consistent replicas. Both require one of the replicas to be designated as the leader, to which all writes are issued. The leader is responsible for ordering all incoming writes, and for propagating those writes to other replicas (followers), in the same order.

The first approach is quorum-based. The leader waits until a majority of replicas have received the data before it is considered safe (i.e., committed). On leader failure, a new leader is elected through the coordination of a majority of the followers.

The second approach is the in-sync replica (ISR) approach: the leader waits for “all” in-sync replicas to receive the data before the message is considered committed. When the leader fails, any other in-sync replica can then take over as the new leader.

Kafka selected the second approach for its replication for two primary reasons:

  1. The second approach can tolerate more failures with the same number of replicas. That is, it can tolerate f failures with f+1 replicas, while the first approach often only tolerates f failures with 2f+1 replicas. For example, if there are only 2 replicas, the first approach can’t tolerate any failures (see the sketch after this list).
  2. While the first approach generally has better latency, as it hides the delay from a slow replica, Kafka's replication is designed for a cluster within the same datacenter, so variance due to network delay is small.
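
A small sketch of the failure-tolerance arithmetic behind the two approaches (the method names are mine, for illustration only):

public class ReplicationTolerance {
    // Quorum-based: a majority must survive, so 2f+1 replicas tolerate f failures.
    static int quorumTolerance(int replicas) {
        return (replicas - 1) / 2;
    }

    // ISR-based: any single surviving replica can lead, so f+1 replicas tolerate f failures.
    static int isrTolerance(int replicas) {
        return replicas - 1;
    }

    public static void main(String[] args) {
        for (int replicas = 2; replicas <= 5; replicas++) {
            System.out.printf("replicas=%d  quorum tolerates %d failure(s),"
                            + " ISR tolerates %d failure(s)%n",
                    replicas, quorumTolerance(replicas), isrTolerance(replicas));
        }
    }
}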

Figure-4. A Kafka cluster with 4 brokers, 1 topic and 2 partitions, each with 3 replicas

>> How-to

Writes: Publish a message

First, the client (producer) decides which partition of the topic to publish the message to, based on its partitioner class (see the note below). To publish a message to a partition, the producer must find the partition's leader. If the producer uses Zookeeper, it looks up the leader there and sends the message to it. Otherwise the producer does not need to figure out the leader itself: it connects to any broker in the cluster, and if that broker is not the leader for the partition, the producer fetches the leader info (the “topic metadata”) from the connected broker, reconnects to the correct leader broker, and sends the message.

The write then proceeds as follows:

  1. The leader writes the message to its local log.
  2. Each follower constantly pulls new messages from the leader using a single socket channel, so the follower receives all messages in the same order as they were written on the leader.
  3. The follower writes each received message to its own log and sends an acknowledgement back to the leader.
  4. Once the leader receives acknowledgements from all replicas in the ISR, the message is committed and the leader sends an acknowledgement to the client.

For better performance, each follower sends its acknowledgement after the message is written to memory. So, for each committed message, we guarantee that the message is stored in multiple replicas in memory; there is no guarantee that any replica has persisted the committed message to disk, though. Given that correlated failures are relatively rare, this approach gives a good balance between response time and durability.

Note: The producer decides which particular partition to publish each message to. It may do so in a round-robin fashion, or based on some semantic partitioning function (key based), as the sketch below shows.
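
A minimal producer sketch using the new Java producer API that ships with 0.8.2 (the broker address, topic, and keys are placeholders): sending without a key lets the producer spread messages across partitions, while sending with a key maps all messages with the same key to the same partition:

import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class DemoProducer {
    public static void main(String[] args) {
        Properties props = new Properties();
        // Any broker in the cluster; topic metadata is fetched from it.
        props.put("bootstrap.servers", "localhost:9092");
        props.put("key.serializer",
                  "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer",
                  "org.apache.kafka.common.serialization.StringSerializer");

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            // No key: the producer spreads messages across partitions.
            producer.send(new ProducerRecord<>("demo-topic", "message without key"));
            // With a key: all messages with key "user-42" land in one partition.
            producer.send(new ProducerRecord<>("demo-topic", "user-42", "message with key"));
        }
    }
}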

Reads: Consume a message

For simplicity, reads are always served from the leader. Only messages that are committed are exposed to the reader.
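
A minimal consumer sketch. Note that 0.8.2 bundles a Scala/Zookeeper-based high-level consumer; the Java KafkaConsumer below arrived in a later release and is used here only to keep all examples in one language (the broker address, group id, and topic are placeholders):

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class DemoConsumer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");  // assumed broker address
        props.put("group.id", "demo-group");               // consumer group (see Figure-3)
        props.put("key.deserializer",
                  "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer",
                  "org.apache.kafka.common.serialization.StringDeserializer");

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList("demo-topic"));
            while (true) {
                // Pull model: the consumer fetches committed messages from the
                // partition leaders at its own pace.
                ConsumerRecords<String, String> records =
                        consumer.poll(Duration.ofMillis(500));
                for (ConsumerRecord<String, String> record : records) {
                    System.out.printf("partition=%d offset=%d value=%s%n",
                            record.partition(), record.offset(), record.value());
                }
            }
        }
    }
}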

>> Kafka vs JMS

  1. It’s designed as a distributed system that’s easy to scale out.
  2. It persists messages on disk, and thus supports batch consumers that may be offline as well as online consumers that want messages at low latency.
  3. It offers high throughput (100k+ messages/sec) for both publishing and subscribing.
  4. It supports multiple subscribers and automatically balances the consumers during failure.

>> Kafka Web UI features

  • Be able to easily see which brokers are up
  • Be able to see lists of topics, connected producers, consumer groups, connected consumers
  • Be able to see, for each consumer/partition, its offset, and more importantly, # of bytes unconsumed (== largest offset for partition – current offset)
  • (Wish list) have a graphical view of the offsets
  • (Wish list) be able to clean up consumer state, such as stale claimed partitions

>> Kafka source code

https://git-wip-us.apache.org/repos/asf/kafka.git
