In Chapter 2: Stream-based Architecture, we established that at the heart of the revolution in design for streaming architectures is the capability for message passing that meets particular fundamental requirements for these large-scale systems. We recommended two technologies that are a good fit for the needed capabilities: Apache Kafka and MapR Streams. In this chapter, we examine in some detail Kafka, a pioneer in this style of messaging.
Apache Kafka started life as an engineering project at LinkedIn that was intended to bring order to the way that data moved between services. Most of the services at LinkedIn were originally designed to make heavy use of a relational database and to use remote method invocation (RMI) between Java processes where communication was necessary.
Unfortunately, both of these choices made it very difficult to deal with the rapid expansion of both the number of services and the amount of data being moved. Whenever one service needed to communicate with another, an adapter had to be developed and maintained. Moreover, each adapter tended to make the modification of both sender and receiver more difficult since every pair of communicating services effectively exposed a bit of the implementation of each to the other. The result was that it was incredibly difficult to update systems. Just as important, it was very difficult to move as much information between services as was needed.
Systems like SOAP, CORBA, or Java’s RMI have long been based on the assumption that strict version control and strict type-safety were key to interprocess communication, but the experience of the LinkedIn team and many others over the last decade or so is very different. The crux of the problem that the LinkedIn team experienced is that as services start to communicate, if they use any form of strong typing contract on the interaction (such as a strictly versioned API, or a database schema), then each service acts as a bit of an anchor on further development or modification of the other service. Before long, the anchors can come to outweigh the services. These dependencies become a burden.
Realizing their predicament, the engineers at LinkedIn decided that they needed a consistent mechanism for communication between services that avoided the creeping problems they were facing. It was clear that the communication had to be asynchronous and message-based. In order to further decouple senders and receivers of messages, it was deemed important to have all messages persisted. This requirement for persistence combined with the required throughput, however, made conventional messaging systems infeasible in exactly the way that we saw in the previous chapter on micro-services.
Kafka adopted many of the basic ideas and much of the design of conventional message queues. Producers send messages to a message queue (or topic) that is identified by a topic name. Consumers read messages from topics and can arrange to be notified when any of a number of topics to which they subscribe have new messages.
There are, however, some key differences in how Kafka is built compared with older messaging systems. Key technical innovations have allowed Kafka to solve the problems of building a feasible message-passing layer for a large-scale service architecture.
Key technical innovations of Kafka:
Messages in a topic partition are acknowledged strictly in order. This eliminated the need to track acknowledgements on a per-message, per-listener basis and allowed a reader’s operations to be very similar to reading a file.
Messages are retained for a configured time regardless of who has read them. This eliminated the requirement to track when readers have finished with particular messages by allowing the retention time to be set so long that readers are almost certain to be finished with messages before they are deleted.
Consumers keep track of their own read offsets. While the offsets for committed messages are stored by Kafka (using Apache Zookeeper), the offsets of different consumers are independent of one another. Applications can even manage their offsets outside of Kafka entirely.
The technical impact of these innovations is that Kafka can write messages to a file system. The files are written sequentially as messages are produced, and they are read sequentially as messages are consumed. These design decisions mean that nonsequential reading or writing of files by a Kafka message broker is very, very rare, and that lets Kafka handle messages at very high speeds.
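To make the file-like model concrete, here is a minimal sketch in Python. It is a toy in-memory stand-in for a single topic partition, not Kafka code, and the names PartitionLog and Reader are hypothetical. The log only ever appends and serves sequential reads, and each reader carries its own offset, so the log keeps no per-reader state at all.

```python
class PartitionLog:
    """Toy in-memory stand-in for one topic partition (illustrative only)."""

    def __init__(self):
        self._messages = []

    def append(self, message):
        """Append at the end and return the offset assigned to the message."""
        self._messages.append(message)
        return len(self._messages) - 1

    def read(self, offset, max_messages):
        """Read sequentially starting at offset, much like reading a file."""
        return self._messages[offset:offset + max_messages]


class Reader:
    """Each reader tracks only its own offset; the log tracks nothing per reader."""

    def __init__(self, log, offset=0):
        self.log = log
        self.offset = offset

    def poll(self, max_messages=10):
        batch = self.log.read(self.offset, max_messages)
        self.offset += len(batch)   # reading advances the read point
        return batch


log = PartitionLog()
for i in range(5):
    log.append(f"m{i}")

# Two independent readers move through the same log at different speeds.
fast, slow = Reader(log), Reader(log)
fast_batch = fast.poll(max_messages=5)
slow_batch = slow.poll(max_messages=2)
```

Because neither reader changes the log, adding a third reader later costs the log nothing, which is the property the innovations above are meant to buy.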
Before talking in detail about how to use Kafka, it is helpful to settle a bit on the players in a Kafka system and the roles they play. Overall, Kafka presents a particularly simple model to users, but the nomenclature may be a bit unfamiliar.
Think of these terms from the viewpoint of Kafka itself. Producers send messages to a Kafka broker, which is one server in a Kafka cluster. These messages can be read by consumers. This general arrangement is illustrated in Figure 4-1.
A Kafka broker is responsible for taking care of messages in transit. Messages contain bytes in an application-defined serialized format and are associated by the producer with a topic, which is a high-level abstraction for grouping messages.
The use of a topic helps the consumers of messages find messages of interest without having to read lots of uninteresting messages.
The broker stores and forwards messages for many topics, and messages can be sent to a single topic by multiple producers. The producer will buffer a number of messages before actually sending them to the Kafka broker. The degree to which messages are buffered before sending can be controlled by the producer by limiting either the number of messages to buffer or the time that messages are allowed to linger before being sent.
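The two buffering limits can be sketched as follows. This is a simplified model written for illustration, not the Kafka producer itself: the class BufferingSender, its parameters, and the explicit now argument (used instead of a real clock so the behavior is repeatable) are all assumptions.

```python
class BufferingSender:
    """Toy producer-side buffer that flushes on a count limit or a linger limit."""

    def __init__(self, max_messages, linger_seconds, transmit):
        self.max_messages = max_messages
        self.linger_seconds = linger_seconds
        self.transmit = transmit      # callable that receives a list of messages
        self._buffer = []
        self._oldest = None           # arrival time of the oldest buffered message

    def send(self, message, now):
        if not self._buffer:
            self._oldest = now
        self._buffer.append(message)
        self._maybe_flush(now)

    def tick(self, now):
        """Called periodically so that lingering messages are eventually sent."""
        self._maybe_flush(now)

    def _maybe_flush(self, now):
        if not self._buffer:
            return
        full = len(self._buffer) >= self.max_messages
        lingered = now - self._oldest >= self.linger_seconds
        if full or lingered:
            self.transmit(self._buffer)
            self._buffer = []


batches = []
sender = BufferingSender(max_messages=3, linger_seconds=0.010, transmit=batches.append)
sender.send("a", now=0.000)
sender.send("b", now=0.001)
sender.send("c", now=0.002)   # third message fills the buffer, so one batch goes out
sender.send("d", now=0.003)
sender.tick(now=0.020)        # "d" has lingered past the limit, so it goes out alone
```

Raising either limit trades latency for larger batches, which is the tuning knob the producer exposes.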
A consumer ultimately reads these messages. In the simplest case, all messages sent to a topic are read by a single consumer in the order that the broker received them. If it is necessary to have higher throughput in consuming messages, it is possible to divide a topic into multiple partitions. When this is done, the consumer can use multiple threads that may even be spread across multiple processes to read the messages from a topic, but the ordering of messages in a topic will only be preserved within a single partition, and the number of threads cannot be larger than the number of partitions. The producer can control the assignment of messages to partitions directly by specifying a partition or indirectly by specifying a key whose hash determines the partition.
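A short sketch may help show why a key preserves ordering for related messages. The function choose_partition below is hypothetical, and it uses CRC32 only because it is a stable hash available in the Python standard library; the hash that Kafka itself applies to keys is different.

```python
import zlib
from collections import defaultdict


def choose_partition(num_partitions, key=None, partition=None):
    """Pick a partition explicitly, or implicitly from a hash of the key."""
    if partition is not None:
        if not 0 <= partition < num_partitions:
            raise ValueError("partition out of range")
        return partition
    if key is None:
        raise ValueError("need either a key or an explicit partition")
    # Stable hash chosen for illustration; not the hash Kafka uses.
    return zlib.crc32(key) % num_partitions


NUM_PARTITIONS = 4
topic = defaultdict(list)   # partition number -> messages in arrival order
events = [
    (b"user-1", "login"),
    (b"user-2", "login"),
    (b"user-1", "click"),
    (b"user-1", "logout"),
]
for key, value in events:
    topic[choose_partition(NUM_PARTITIONS, key=key)].append((key, value))

# All messages for one key land in one partition, so their order is kept.
user1_partition = choose_partition(NUM_PARTITIONS, key=b"user-1")
user1_events = [v for k, v in topic[user1_partition] if k == b"user-1"]
```

Messages with different keys may land in different partitions, and no ordering is implied between those.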
Importantly, messages in a topic partition are ordered, and there is no provision for consumers to acknowledge individual messages out of order. A consumer has a read point that determines which message will be read next. Reading messages advances that read point automatically, but the read point can also be explicitly set to the start of any specific message, to the earliest message the broker has, or to the end of the latest message that the broker has. Even so, the messages after the read point are read in order until the read point is explicitly repositioned. This file-like API is very different from the traditional sort of message queuing API in which messages can be read and acknowledged in any order.
Another major difference between Kafka and traditional messaging systems is that persistence of messages is unconditional. Moreover, messages are retained or discarded in the order they were received according to the retention policy of the topic, without any regard paid to whether particular consumers have consumed the message yet.
The one exception is that when old messages in a topic are about to be deleted, they can be compacted instead. With compaction, a message is retained if no later message has been received with the same key, but deleted otherwise. The purpose of compaction is to allow a topic to store updates to a key-value database without unbounded growth. When a single key has been updated many times, only the latest update matters, since any previous update would have been overwritten by the last update (at least). With compaction, a topic cannot grow much larger than the table being updated, and since all access to the topic is in time order, very simple methods can be used to store the topic.
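The compaction rule is simple enough to state in a few lines. The sketch below is an illustration of the rule only, applied to an in-memory list of (key, value) pairs; the function name compact is hypothetical and nothing here reflects how a broker carries out compaction on disk.

```python
def compact(log):
    """Keep only the latest record for each key, preserving log order.

    log is a list of (key, value) tuples, oldest first.
    """
    last_index = {key: i for i, (key, _) in enumerate(log)}
    return [record for i, record in enumerate(log) if last_index[record[0]] == i]


updates = [("a", 1), ("b", 2), ("a", 3), ("c", 4), ("b", 5)]
compacted = compact(updates)

# Replaying either log into a table gives the same final state.
table_from_full_log = dict(updates)
table_from_compacted = dict(compacted)
```

The compacted log holds one record per key, so it can never be much larger than the table it describes.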
Kafka’s APIs have undergone a significant evolution over time. Originally, the APIs were very bare-bones, with significant complexity forced back onto the programs using Kafka. Subsequently, separate low- and high-level APIs were developed, but there was never a clear separation between the two, and it was common to need to use both high- and low-level APIs to accomplish fairly standard tasks.
With the recent 0.9 release of Kafka, the low- and high-level APIs have been merged into a single coherent API that simplifies how clients need to be written. This makes using Kafka considerably more intuitive. All new applications should use the 0.9 APIs (or later versions as they are released) if at all possible.
With Kafka, all messages to consumers are sent via the broker by using a KafkaProducer. As they are sent, messages are assigned to topics by the sending process. Within topics, messages are assigned to partitions either explicitly or implicitly via the hashcode of the key associated with the message.
Each instance of a KafkaProducer represents a separate connection to the Kafka broker, but there is typically little or no speed advantage to having multiple instances. Each KafkaProducer is thread-safe, so no special consideration is needed when using a single instance in multiple threads.
All messages in Kafka are sent asynchronously; that is, the messages are not actually sent over the network to the broker until some time after they are given to the KafkaProducer to send. Instead, they are buffered until the buffer fills (batch.size in the producer configuration), or until a specified time period has passed (linger.ms in the producer configuration). By default, the buffer size and timeout are set quite low, which can impair throughput, but tends to give fairly good latency.
When data is sent from the producer to Kafka, there are differing degrees of durability guarantees that are possible via different configurations. These are controlled primarily by the acks setting (in the producer configuration) and the min.insync.replicas setting (in the topic-level configuration). At the lowest level, a message only needs to be sent before being acknowledged. Slightly better than this, you can require that a message be acknowledged by at least one broker. At the highest level, all brokers holding up-to-date replicas of a topic must acknowledge the receipt of a message.
Generally, we would strongly recommend starting with acks=all, together with min.insync.replicas set to 2. The result is that all acknowledged messages will be on all of the up-to-date copies of a topic, and there will always be at least two such brokers for all acknowledged messages. This policy is similar to the policy used in the MapR file system and guarantees that if at least one up-to-date replica survives, no data loss will occur. Further, no single node or disk failure will cause loss of acknowledged messages.
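The interaction between acks and min.insync.replicas can be summarized with a deliberately simplified model. The function write_outcome below is hypothetical and ignores timing, retries, and leader changes; it only records how many copies of a message are confirmed at the moment the producer receives its acknowledgement.

```python
def write_outcome(acks, in_sync_replicas, min_insync_replicas):
    """Return (accepted, confirmed_copies) for one message in a simplified model.

    in_sync_replicas counts the leader as one of the replicas.
    """
    if acks == "0":
        return True, 0    # the producer does not wait, so no copy is confirmed
    if acks == "1":
        return True, 1    # only the leader has confirmed the write
    if acks == "all":
        if in_sync_replicas < min_insync_replicas:
            return False, 0   # too few up-to-date replicas, so the write is refused
        return True, in_sync_replicas
    raise ValueError(f"unknown acks value: {acks!r}")


healthy = write_outcome("all", in_sync_replicas=3, min_insync_replicas=2)
degraded = write_outcome("all", in_sync_replicas=1, min_insync_replicas=2)
leader_only = write_outcome("1", in_sync_replicas=3, min_insync_replicas=2)
```

In this model, an acknowledged write under acks=all with a minimum of two in-sync replicas always has at least two confirmed copies, which is why a single node or disk failure cannot lose it.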
There are a number of performance-related properties that can be manipulated for a KafkaProducer. Mostly, the defaults for these are adjusted for good latency, but very moderate changes to just a few parameters can substantially improve performance. In particular, it helps to buffer more records (controlled by batch.size) and to wait a short bit before sending those buffered records off to the broker (controlled by linger.ms). For throughput-sensitive applications, increasing these parameters to 1 MB and 10 milliseconds, respectively, has a substantial impact on performance. For instance, in a small benchmark, 4 seconds were used in creating 1 million records without sending them, and it took 24 seconds to create and send them using the default parameters. Changing linger.ms as recommended here decreased the runtime to about 8 seconds. Once the 4 seconds of record creation are subtracted, the time spent sending dropped from about 20 seconds to about 4 seconds, for about a 5x improvement in throughput. Increasing the parameters well beyond these settings had essentially no effect. Your results with real applications will differ, but it is clear that more buffering and just a bit more lingering have a substantial impact.
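The arithmetic behind the 5x figure is easy to check. The sketch below simply restates the benchmark numbers from the text and writes the two suggested settings as a plain Python dictionary; the dictionary is not tied to any particular client library.

```python
# Producer settings suggested in the text for throughput-sensitive applications.
throughput_settings = {
    "batch.size": 1024 * 1024,   # 1 MB, expressed in bytes
    "linger.ms": 10,             # 10 milliseconds
}

# Benchmark figures quoted in the text, in seconds, for 1 million records.
create_only_s = 4      # creating the records without sending them
default_total_s = 24   # creating and sending with default settings
tuned_total_s = 8      # creating and sending after tuning

# Subtract the creation cost to isolate the time spent sending.
default_send_s = default_total_s - create_only_s
tuned_send_s = tuned_total_s - create_only_s

send_speedup = default_send_s / tuned_send_s
end_to_end_speedup = default_total_s / tuned_total_s
```

The sending time improves by a factor of 5, while the end-to-end runtime, which still includes record creation, improves by a factor of 3.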
Once you have a KafkaProducer object, you can send messages to the broker using the send and flush methods. The send method simply copies the message to an internal buffer, which is automatically sent to the broker according to the policies applied to the KafkaProducer or when flush is called explicitly. Note that because the send method is completely asynchronous, there is no way for it to return the result of trying to send a message to the broker. The send method does, however, return a future that you can wait for, and there is a version that allows a callback to be passed with the message being sent. Both the returned future and the callback can be used to determine whether a message was sent successfully.
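The pattern of an asynchronous send that reports its outcome through a future or a callback can be sketched with the Python standard library. ToyProducer below is a hypothetical stand-in written for this illustration and is not the Kafka client; the deliver function plays the role of the broker and is likewise an assumption.

```python
from concurrent.futures import Future


class ToyProducer:
    """Buffers messages on send and resolves their futures on flush."""

    def __init__(self, deliver):
        self._deliver = deliver   # callable(message) -> offset, may raise
        self._pending = []

    def send(self, message, callback=None):
        future = Future()
        if callback is not None:
            future.add_done_callback(callback)
        self._pending.append((message, future))
        return future             # returns immediately; nothing is sent yet

    def flush(self):
        pending, self._pending = self._pending, []
        for message, future in pending:
            try:
                future.set_result(self._deliver(message))
            except Exception as error:
                future.set_exception(error)


stored = []


def deliver(message):
    if message == "bad":
        raise IOError("broker unavailable")
    stored.append(message)
    return len(stored) - 1


outcomes = []


def on_done(future):
    outcomes.append("failed" if future.exception() else "ok")


producer = ToyProducer(deliver)
first = producer.send("hello", callback=on_done)
second = producer.send("bad", callback=on_done)
sent_before_flush = first.done()
producer.flush()
```

The caller can either block on the returned future or let the callback record the result, and in both cases a failure surfaces after the call to send has already returned.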
As we mentioned earlier, each message sent to the broker winds up in one of the partitions of the topic that it is sent to. The producer that sends the message gets to decide how the partition is chosen based on which version of the send method it uses. The partition can be specified directly and explicitly by providing the partition number, or it can be chosen implicitly, either from the hashcode of a key value or by round-robin assignment to partitions.
When you are sending lots of data to the broker, it will likely not help to call flush, since the data is going to be flushed very shortly in any case due to the number of messages being sent. On the other hand, when you are sending very few messages, it may improve latency a bit to explicitly call flush. Calling flush also helps if you need to know that messages have arrived at the broker before sending other messages.
The GitHub project contains a simple message producer in the
In many ways, the consumer side of Kafka is trickier to code well than the producer side. The major areas that people have trouble with are the concept of consumer groups as they relate to partitioning of topics, the question of which messages have been processed by a failed process, and how consumer configurations can have surprising effects on the level of throughput that a consumer can achieve.
The point of consumer groups is to allow a controllable mixture between universal broadcast of all messages (which is helpful for adding new kinds of message consumers) and designating a single handler for each message (which is helpful for implementing parallelism in processing messages). Kafka uses the concept of a consumer group to mediate between these two extremes. All messages go to all consumer groups who subscribe to a topic, but within a consumer group only one consumer handles each message.
In addition, rather than allowing complete flexibility about which messages are sent to which consumers, Kafka requires that a topic be divided into partitions at the point of production. Thus, the producer of a message decides which topic and which partition a message is sent to, but the consumer group designates exactly which consumers handle which partitions of a topic.
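The division of labor between producers and consumer groups can be sketched as follows. The round-robin assignment used here is one simple policy chosen for illustration, and the function names, group names, and consumer names are all hypothetical.

```python
def assign_partitions(partitions, consumers):
    """Assign each partition to exactly one consumer in a group, round-robin."""
    assignment = {consumer: [] for consumer in consumers}
    for i, partition in enumerate(sorted(partitions)):
        assignment[consumers[i % len(consumers)]].append(partition)
    return assignment


def handlers_for(message_partition, groups, partitions):
    """For one message, return the single consumer in each group that handles it."""
    result = {}
    for group, consumers in groups.items():
        for consumer, owned in assign_partitions(partitions, consumers).items():
            if message_partition in owned:
                result[group] = consumer
    return result


partitions = [0, 1, 2, 3]
groups = {"billing": ["b1", "b2"], "audit": ["a1"]}

# A message written to partition 2 reaches every group, but one consumer per group.
handlers = handlers_for(2, groups, partitions)

# With more consumers than partitions, the extra consumer has nothing to read.
crowded = assign_partitions([0, 1], ["c1", "c2", "c3"])
```

The second example shows why the useful parallelism of a group is capped by the number of partitions in the topic.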
The relationship of producers, topics, partitions, consumer groups, and consumers is illustrated in Figure 4-2.
While Kafka provides ordering guarantees for messages within a single topic partition, applications should be careful about depending too much on transaction ordering. In Figure 4-2, for instance, messages m1 and m2 might be sent from producer1 on the same partition as used by producer2 to send messages m3 and m4. If this happens, m1 will always arrive at whichever consumer receives it before m2 arrives, but it is very hard to make any statement about whether m1 arrives before or after m3. It is best to code very defensively in such situations. If both producers are running on the same node, this should be easy, but if they are not, then it may be worthwhile investing in very high-precision clock synchronization.
One of the best defenses against confused ordering is a high-precision clock.
Another major confusion that new users have when writing a consumer has to do with the idea that messages may have been consumed, but not yet acknowledged by the consumer to have been consumed. The confusion comes from the fact that individual consumers keep track of the offset of the next message to be read, but they don’t write that offset back to Kafka very often because it can be expensive for the broker to keep track of these updates. Kafka uses the terms “current offset” and “committed offset” to describe these two concepts. If a consumer continues to process messages in an orderly fashion and ultimately exits in a well-behaved manner by committing the current offset, then there won’t be any problems. The fact that the current offset is ahead of the committed offset much of the time just won’t matter.
On the other hand, if a consumer crashes without committing its offset back to the broker, then when another consumer starts processing the same partition, it will start from the previously committed offset, thus running the risk of processing messages twice (if the offset is committed after processing a batch of messages) or not at all (if the offset is committed before processing a batch of messages). Since it is generally impossible to make the processing of messages and the committing of the offset into a single atomic operation, there is pretty much always going to be a risk of not processing all messages exactly once.
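The two failure modes can be reproduced with a small simulation. The function run_with_crash below is a hypothetical model written for this illustration: it handles one batch starting at the committed offset and crashes partway through, with the commit placed either before or after the processing.

```python
def run_with_crash(messages, committed, commit_first, crash_after):
    """Process one batch from the committed offset, crashing partway through.

    Returns (processed_messages, committed_offset_after_crash).
    crash_after is how many messages of the batch are handled before the crash.
    """
    batch = messages[committed:]
    processed = []
    if commit_first:
        committed += len(batch)      # offset committed before any work is done
    for message in batch[:crash_after]:
        processed.append(message)
    # The crash happens here; with commit_first=False the commit never runs.
    return processed, committed


messages = ["m0", "m1", "m2", "m3"]

# Commit after processing: the restart re-reads from offset 0, so work repeats.
done_late, offset_late = run_with_crash(messages, 0, commit_first=False, crash_after=2)
duplicates = [m for m in messages[offset_late:] if m in done_late]

# Commit before processing: the restart resumes past the batch, so work is skipped.
done_early, offset_early = run_with_crash(messages, 0, commit_first=True, crash_after=2)
lost = [m for m in messages[:offset_early] if m not in done_early]
```

Committing late gives at-least-once behavior and committing early gives at-most-once behavior; which is preferable depends on whether the application tolerates duplicates or gaps better.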
The final confusion that strikes newcomers is the way a consumer decides what to read.
Default parameters that define how the consumer decides what to read and when to read it are tuned in Kafka 0.9 for low latency on topics that do not have a large number of messages per second. This tuning can lead to problems like the one shown in Figure 4-3, where throughput is high for a short time, but then crashes for several seconds.
The key to dealing with this kind of instability is to set receive.buffer.bytes to a large enough value so that processing can proceed at a sustained high rate. It is also common to need to increase the value of max.partition.fetch.bytes. Figure 4-4 shows how much better things can be when these consumer configuration values are set appropriately.
One potentially confusing aspect of the most recent release is that Kafka retains an older API for consumers that is a melding of a very old API and a newer API that did not completely replace the functionality of the old API. This results in a confused programming model. More importantly, the old API exposed access to the implementation details of Kafka in ways that seriously hampered efforts to improve performance and made it particularly difficult to improve reliability and security.
The older API has been retained in the 0.9 release, but should not be used for new applications. Instead, the newer style of interaction, often called the 0.9 API, should be used. The sample programs we wrote are good examples of the basics of how to use the 0.9 API.
Kafka comes with a variety of utility programs that help administer a Kafka cluster as well as some basic diagnostics. These utility programs can be used to start broker processes, create topics, move partitions or topics around in a cluster, and inject or display messages in a topic.
Two areas of particular importance have to do with (a) balancing of partitions and loading across a cluster and (b) the mirroring of topics to other clusters.
Kafka has a particularly simple model in which each partition in a topic resides in its entirety on a broker, possibly with several other brokers acting as replicas of that partition. Partitions cannot be split across machines. This can become a serious problem because Kafka does not automatically move partitions to balance the amount of load or space on brokers. Likewise, if you add new nodes to a cluster, you have to migrate data to these new nodes manually. This can be a tricky and labor-intensive task since it is difficult to estimate which partitions are likely to grow or cause high loads.
Mirroring of data between Kafka clusters can be done using a utility called MirrorMaker. When it is run, MirrorMaker starts a number of threads that each subscribe to the topics found in a Kafka cluster. Multiple MirrorMaker processes can be run on different machines to get the benefits of higher parallelism and failure tolerance. Each MirrorMaker thread consists of a consumer that reads from the Kafka source cluster and a producer that writes to the destination cluster. There can be multiple source clusters. MirrorMaker makes use of consumer groups to allow traffic to be balanced across threads and servers.
There are some things that you should watch out for with Kafka mirroring, however.
With Kafka, there is no inherent connection between the source and the destination clusters other than the fact that messages that are written into a mirrored topic in the source cluster will be read from that topic by MirrorMaker and then sent to the same topic in the destination cluster. Message offsets and consumer read offsets will not be preserved, and the mirror cannot be used as a fallback for consumers in a disaster recovery situation.
For the same reasons, mirroring of any topic in Kafka has to be set up as a tree rather than as a general graph. The major issue is that if there are any cycles in the replication pattern, the amount of traffic will grow exponentially because duplicates are not detected during mirroring.
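Because a cycle is so damaging, it can be worth checking a planned mirroring layout before deploying it. The sketch below is a generic depth-first cycle check over a description of which cluster mirrors to which; the function has_cycle and the cluster names are hypothetical, and nothing here is part of MirrorMaker.

```python
def has_cycle(links):
    """Detect a cycle in a mirroring plan.

    links maps a source cluster name to the list of clusters it mirrors to.
    """
    WHITE, GREY, BLACK = 0, 1, 2   # unvisited, on the current path, finished
    state = {}

    def visit(node):
        state[node] = GREY
        for target in links.get(node, []):
            target_state = state.get(target, WHITE)
            if target_state == GREY:
                return True        # reached a cluster already on the current path
            if target_state == WHITE and visit(target):
                return True
        state[node] = BLACK
        return False

    return any(state.get(node, WHITE) == WHITE and visit(node) for node in list(links))


tree_plan = {"nyc": ["london", "tokyo"], "london": ["sydney"]}
looped_plan = {"nyc": ["london"], "london": ["nyc"]}

tree_is_safe = not has_cycle(tree_plan)
loop_detected = has_cycle(looped_plan)
```

In the looped plan every message written in either cluster would be copied back and forth without end, which is the runaway traffic growth described above.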
Kafka has made a huge difference in how easy it is to realize the promise of streaming architectures, but it is still a work in progress. Kafka is hampered in some respects by the fact that it is building from a very low foundation with respect to features such as data replication, fault tolerance, and geo-replication. This situation makes progress on some capabilities challenging.
That said, Kafka has made huge progress in a short time and clearly fills an important role, at least partially. Kafka is particularly good on first impression, and understandably so. You can download Kafka, check out a sample program, and have a sample application running in well under 10 minutes (I just checked).
Kafka also scales very well, simplifies the design of large systems, and is relatively easy to use, especially with the 0.9 API. It’s no wonder that Kafka has a large and growing community of users. What’s not to like?
There are a fair number of significant issues that can crop up with Kafka as it goes into production at scale. Some of these issues are inherent in the current design of Kafka and are unlikely to change in the near future. Others are more likely to get better over time.
The number of topics that can be handled by a Kafka cluster has a soft limit that starts to degrade operations at around a thousand topics. These problems are likely substantially tied to the fundamental implementation decisions that underpin how Kafka works. In particular, as the number of topics increases, the amount of random I/O that is imposed on the broker increases dramatically because each topic partition write is essentially a separate file append operation. This becomes more and more problematic as the number of partitions increases and is very difficult to fix without Kafka taking over the scheduling of I/O. Just above the current limits on number of partitions, there are likely other limits waiting, some fairly serious. In particular, the number of file descriptors that a single process can open is typically limited.
In order to be better prepared for production settings, Kafka needs to solve most of the problems inherent in file system design. That could well take several years of persistent effort to fix.
Keep in mind that the practical impact of having a limited number of topics varies by application and design. Most applications using Kafka so far have little trouble with the limit. The limitation may be more of an issue when Kafka is used as a multi-tenant resource since different users are likely to each want to use a fair number of topics themselves.
A limit on the number of topics also constrains the solution space that you can use and prevents Kafka from being a primary or archival store for many applications. For instance, if you are an electrical utility doing smart metering, it would be plausible to design a system in which each meter has a separate topic. The question of such a design is moot with Kafka, however, because having millions to hundreds of millions of topics is just not plausible.
Partition replicas in Kafka must each fit on a single machine and cannot be split across multiple machines. As partitions grow, it is expected that some machine in your Kafka cluster will have the bad luck of having multiple large partitions assigned to it. Kafka doesn’t have any automated mechanism for moving these partitions around, however, so you have to manage this yourself. Monitoring disk space, diagnosing which partitions are causing the problem, and then determining a good place to move the partition are all manual management tasks that can’t be ignored for a production Kafka cluster.
This sort of management can work when clusters are relatively small and data is small with respect to available space, but it can also become completely unmanageable if your traffic is subject to rapid growth or you don’t have top-flight system administrators. Even worse, any instability in this respect or perceived competition for resources can cause DevOps teams to switch to using their own Kafka clusters, thus defeating the entire point of streaming data as an infrastructural resource. The requirement that every replica of a partition fit entirely on the broker holding it also makes Kafka unattractive for long-term archiving of data.
Fixing this will require a number of fundamental revamps to Kafka’s architecture, but it is plausible that the basic implementation of Kafka could be changed so that the files that make up a partition are not constrained to being on a single machine. Doing this well would mean that the brokers in a cluster would have to maintain knowledge not only about where the master broker for a partition is, but also about where every small segment of every topic partition is located and which broker is serving as master for the current segment.
Doing this housekeeping is plausible, but it would require substantial effort, and there are a number of subtle points in handling this well. The issue of load and space balancing is handled differently with MapR Streams technology, as we’ll describe in Chapter 5: MapR Streams.
Kafka doesn’t have a favored solution to the problem of serializing data structures. This means that different developers tend to pick different serialization methods. It is crucial to successful use of Kafka that there be consistency in how messages are serialized between services that need to communicate.
The general argument against having a favored serialization convention is typically based on worries about performance of any general mechanism and about not having the benefit of future developments. In the context of a single organization, the arguments often boil down to issues of traditional usage. If there is a strong culture and established expertise that already supports one serialization framework, it is hard to imagine that a different serialization framework would be better enough to justify having a second framework.
On the other hand, having all serialization be external to Kafka forces data to be copied at least one more time than necessary. This can substantially impair performance of the messaging system.
Regardless of whether Kafka should have a strong convention in favor of one serialization system or another, it is imperative that each organization have a strong preference to avoid a messaging Babel. The gotcha isn’t so much a system issue as a social one.
Establishing a strong convention early pays huge dividends later.
The mirroring system used in Kafka is very simple—and for many enterprise applications, a bit too simple. By simply forwarding messages to the mirror cluster, the offsets in the source cluster become useless in the destination. This means that producers and consumers cannot fail over from one cluster to a mirror. This ability to fail over is often considered required table stakes in enterprise systems and not having it may completely preclude getting the benefits of Kafka.
Kafka’s mirroring design could cause similar headaches in the design of a chain of replication. Because the source cluster and the mirror don’t really know anything about each other, any cycle in the replication of messages from cluster to cluster will cause messages to be mirrored over and over. Disaster will follow in short order.
Mirroring without cycles and without splits means that the replication pattern is a tree, and that is a very brittle design. Practically by definition, losing any link in a tree causes a partition. Worse, because of the ad hoc nature of mirroring in Kafka, it is essentially impossible to route around a lost step in a replication chain. This means that while data may be mostly preserved in these situations, it may not be clear which data is preserved.
The lack of cycles in mirroring patterns in Kafka also means that mirroring cannot be used to create multi-master systems in which you can pick any mirror to update. Multi-master replication is often considered a critical requirement in enterprise settings. Mirroring is handled very differently in MapR Streams, and a comparison may be useful.
Kafka has broken new ground as an early and innovative solution for streaming architecture. Kafka satisfies many of the requirements for high-throughput, single-data-center messaging in support of microservice architectures. The API introduced in the 0.9 release is easy to use. Kafka does, however, require significant amounts of care and watering to manually manage storage space and distribution.
Given the strength of this design, there is naturally widespread interest in Kafka and Kafka-esque approaches. For multi-data center deployment, you may find that Kafka has significant issues. In those situations, keep in mind that programs written with the Kafka 0.9 API also run on MapR Streams. That flexibility in the Kafka API may provide a solution for the issues associated with running Kafka in a geo-distributed setting.