Streaming Architecture:

New Designs Using Apache Kafka and MapR Streams

by Ted Dunning & Ellen Friedman

Geo-Distributed Data Streams

For our final example of how to design stream-based systems, we focus on a specific requirement: geo-distributed replication of data streams. This capability is needed in a wide variety of sectors, including telecommunications, oil and gas exploration, retail, and banking, but we’ve chosen a transportation example—international container shipping—to show you how to plan the data flow for systems that require data to be replicated efficiently across distant locations.

For this example, we focus on how the design would work with MapR Streams because it has special capabilities that make it particularly well suited for this class of use cases. MapR Streams is distinctive in being able to:

  • Handle huge numbers of topics (hundreds of thousands or more with high throughput)
  • Organize a group of topics into a stream, which makes data management much easier since many topics can be managed together
  • Provide uni- and bi-directional replication easily and reliably across geo-distributed data centers

In our shipping example (or examples from any of the other sectors), many different processes in addition to the messaging could be taking place on the same cluster since MapR’s messaging feature is integrated into the data platform. But for simplicity and in order to keep our explanation focused on how the data streams are replicated to distant sites, we will just examine the messaging aspect of the architecture rather than all the analytics and persistence components.

As we work through this IoT transportation example, envision how this type of design would play out in your own projects. You may not have ships and containers, but you may well have data needs that are similar.

Stakeholders

Let’s set the scene: in international container shipping, different stakeholders are interested in using shipping data in different ways. How you would choose to organize data depends on the interests of each stakeholder. What data would be assigned to separate topics? How many topics? Which topics would be grouped together into streams? When would you use geo-distributed stream replication?

There is no single correct design for this container shipping example, but by exploring the implications of a particular set of design choices, you can better decide how you would assign data to topics and streams in your own examples.

In our hypothetical example, here are the players: we have a fictional giant shipping company, Big Blue, that owns a fleet of ships and hundreds of thousands of shipping containers. Big Blue is based out of Los Angeles, but its ships travel to ports all around the world, including Tokyo, Sydney, and Singapore; the latter city is shown in Figure 7-1. Big Blue not only ships cargo in its own containers, but it also leases space for containers owned by other shipping companies or by large manufacturers. Big Blue wants to know where its ships are, which containers have arrived at what ports, and what the environmental conditions are like on board and on shore. Branch offices of Big Blue are located in major ports, and they want to be able to share data among themselves as well as to report back to Big Blue’s headquarters.

Figure 7-1. Singapore is shown here at night. A large portion of the world’s container shipping passes through this port, where huge stacks of containers are monitored via sensor data while they are being loaded onto and off of ships. (Image © Ellen Friedman 2015)

Who else is interested in the shipping-related data? Another major group of stakeholders is the manufacturers who pay to ship goods with Big Blue. Back at their corporate headquarters, the manufacturers want to know what environmental conditions their goods are exposed to during the voyage and during transfers, and to track the progress of those goods to their destination. The recipients who have paid to purchase the goods also want this information, although perhaps in a less fine-grained manner. Port authorities at each stop have their own interests: they are not interested only in Big Blue’s ships and containers but in knowing exactly which ships of the various companies have docked or departed, which containers have been delivered to the docks to be loaded, and which have been unloaded. These are just some of the major players.

Design Goals

Now that we’ve considered the interests of some of the major stakeholders, we can put together a short list of design goals. Again, we have greatly simplified this example so that we can more easily follow the geo-distribution aspect of data and how the messaging layer supports data flow in this design. Here are some of the things our design must do:

  • Reliably ingest, analyze, and retain very large-volume data from continuous events and measurements (including IoT sensor data) with high performance and low latency

  • Decouple consolidation and processing steps at data centers from delivery of data from sources such as sensors

  • Control access by easily managing who can see what data

  • Cope with intermittent transmission of ship-to-shore data while the ship is at sea

  • Efficiently replicate data streams across geo-distributed clusters, including between ports, from port to manufacturing or shipping headquarters, and from ship to shore

Design Choices

With these goals as a guide, we can begin to make design choices. For the simplified scope of our explanation, we’ll follow the data involved in shipping some plastic duck toys from their origin in Tokyo. The headquarters for the manufacturer is also located in Tokyo, and the company engages Big Blue Shipping to transport the toy ducks, as illustrated in Figure 7-2. To demonstrate our design choices, we’ll follow data as a ship arrives in Tokyo (A), takes on a load of toy ducks and other goods, then heads for Singapore (B). Some of the containers of ducks are offloaded in Singapore, where a few will go to local retail outlets and the rest will await transfer to another ship to be taken to London (not shown on our diagram). The rest of the duck-filled containers will continue on the original ship to the port of Sydney (C), along with new containers owned by other companies that were loaded on board in Singapore.

Figure 7-2. The diagram shows data flow for a container shipping company. Data from environmental sensors and tracking sensors on the containers is continuously streamed (black arrows) to an onboard cluster (white square) owned by the shipping company. When the ship arrives in a port, a temporary connection is made (dashed arrow) to stream data from the onboard cluster to an onshore data center cluster (not shown) owned by the shipping company. Streams are also replicated bidirectionally between data centers in different ports (double-headed gray arrows).

Our Design

Big Blue has equipped each of its ships with a small data cluster and a cell network. The onboard cluster continuously collects IoT data from sensors on the various containers as well as some sensors located on the ship itself. Each port also has a data center cluster that belongs to Big Blue. When a ship arrives near a port, it establishes a temporary connection and streams data from its onboard cluster to the cluster onshore at the port.

In our design, we assign a topic to each container (one ship can have up to tens of thousands of containers and therefore as many topics). These per-container topics are managed by putting them into a single stream that is replicated worldwide to all Big Blue facilities. In addition, because Big Blue is also interested in the travel history for each of its ships, we assign each ship a topic and manage these using a single, worldwide Big Blue ships stream. The per-ship topics serve much like an old-fashioned ship’s log. Ship-specific data could be contained in more than one ship-related topic; perhaps data for the ship’s location goes to one topic for that ship while environmental sensor data for the ship goes to another topic, and all are organized into the ships stream along with topics from other ships in the Big Blue fleet.
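The topic layout just described can be sketched in a few lines. This is an illustrative sketch only: MapR Streams addresses a topic as "stream-path:topic-name", but the stream paths, ID formats, and function names below are hypothetical, not Big Blue's (or MapR's) actual naming scheme.

```python
# Hypothetical topic layout for the design above: one topic per container in a
# single worldwide containers stream, and a few topics per ship (location,
# environment, ...) in a single worldwide ships stream.

CONTAINERS_STREAM = "/bigblue/containers"  # assumed stream path
SHIPS_STREAM = "/bigblue/ships"            # assumed stream path

def container_topic(container_id):
    """Each container gets its own topic, giving a continuous per-container
    history across ships, docks, and ports."""
    return f"{CONTAINERS_STREAM}:{container_id}"

def ship_topic(ship_id, kind):
    """Ship data is split across topics per ship, e.g. 'location' for position
    updates and 'environment' for onboard sensor readings."""
    return f"{SHIPS_STREAM}:{ship_id}-{kind}"
```

With this scheme, properties such as time-to-live and replication apply to all per-container topics at once, because they are set on the containers stream rather than topic by topic.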

Follow the Data

What are the implications of this design relative to our design goals and the stakeholders’ needs? We’ll look at just a part of the system, with reference to Figure 7-2. Starting at stage A, our ship has been loaded in Tokyo, and its onboard cluster provides updates to the Tokyo Big Blue cluster with data about the ship (two topics in the ships stream) and about which containers have been loaded onto the ship, some with ducks and some with other goods (one topic per container; updates to thousands of topics in the containers stream). The Tokyo Big Blue cluster reports a subset of this information to the headquarters of the toy manufacturing company (labeled Corporate HQ in Figure 7-2). These updates are also propagated in near–real time to data centers in other ports and to Big Blue headquarters.

As the ship heads for Singapore, sensors on board continue to send messages to topics in the streams on the ship’s cluster. The ship does not normally communicate large amounts of data directly with any of the onshore clusters since satellite data transmission is so expensive. However, when the ship arrives in Singapore, container topics on the Singapore Big Blue cluster have already been updated about which containers were loaded in Tokyo. This was done directly between the Tokyo and Singapore clusters via the geo-distributed streams replication capability of MapR Streams. When the ship arrives in port, it establishes a temporary connection with the Singapore cluster and further updates it with event data collected during the passage from the containers and the ship itself. The geo-distributed streams replication is bidirectional, so this new information is copied back to Tokyo as well as on ahead to Sydney.
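The ship-to-shore pattern above can be simulated with a minimal sketch: the onboard cluster keeps appending sensor events while at sea, and a temporary port connection drains only the events accumulated since the last port call. The class and method names are made up for illustration; in a real deployment the onshore-to-onshore copying would be handled by MapR Streams replication, not hand-written code.

```python
# Minimal simulation of intermittent ship-to-shore delivery. Events are
# persisted locally on the ship's cluster at all times; a shore sync ships
# only the suffix of the log that no onshore cluster has seen yet.

class OnboardCluster:
    def __init__(self):
        self.log = []          # (topic, event) pairs persisted on the ship
        self.delivered = 0     # offset already handed to an onshore cluster

    def record(self, topic, event):
        """Sensors publish continuously, with or without shore connectivity."""
        self.log.append((topic, event))

    def sync_to_shore(self, shore):
        """Called only while docked: deliver the not-yet-delivered suffix to
        the port's cluster (modeled here as a plain dict of topic -> events).
        Returns the number of events delivered."""
        pending = self.log[self.delivered:]
        for topic, event in pending:
            shore.setdefault(topic, []).append(event)
        self.delivered = len(self.log)
        return len(pending)
```

Tracking a delivered offset rather than clearing the log mirrors how a message log lets consumers resume from where they left off after a connection is re-established.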

Some containers are offloaded in Singapore and new ones owned by someone other than Big Blue (depicted by a different color in our figure) are loaded on board. Sensors report which containers were left behind and which new ones were loaded on (updates to their topics for the containers stream or a new topic if the container is newly placed in service). Sensor data also confirms that the remainder of the containers are still safely on board.

Control Who Has Access to Stream Data

Here’s another aspect of how our design meets the design goals. The owners of the new containers may want access to the message data related to their containers, but Big Blue does not want them to have access to all of the data. Fortunately, MapR Streams enables fine-grained control over who has access to data. Access Control Expressions (ACEs) are assigned at the stream level, so you could set up a separate stream for the yellow and red container topics. That way, Big Blue gives each customer access to the topics for its own containers while restricting access to the rest of Big Blue’s streaming data.
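Because permissions attach to a stream rather than to individual topics, one way to realize this (sketched below with hypothetical paths and owner IDs) is to route each customer's container topics into a customer-specific stream, then grant each customer access only to its own stream.

```python
# Sketch of stream-level access partitioning. Big Blue's own containers stay
# in Big Blue's stream; containers owned by a customer get their topics in a
# per-customer stream, so stream-level ACEs restrict what each party can read.
# All paths and IDs here are illustrative.

def stream_for_owner(owner):
    if owner == "bigblue":
        return "/bigblue/containers"       # Big Blue's own containers
    return f"/bigblue/partners/{owner}"    # one stream per customer

def topic_for_container(container_id, owner):
    return f"{stream_for_owner(owner)}:{container_id}"

def readable_streams(principal, owners):
    """A customer can read only its own stream; Big Blue, as the operator,
    can read all of them."""
    if principal == "bigblue":
        return {stream_for_owner(o) for o in owners}
    return {stream_for_owner(principal)}
```

The key design point is that the access boundary and the management boundary coincide: grouping topics by owner makes a single stream-level ACE do the work of thousands of per-topic permissions.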

Back to our ship: next, the ship heads for Sydney. As before, data reaches the next port before the ship does. Data that the ship uploaded to the onshore Singapore cluster will reach the Sydney cluster via MapR Streams replication. This replication is triggered by the updates to topics that took place in Singapore. When the ship arrives in Sydney, a temporary ship-to-shore connection once again is established, and data for events during the passage is delivered to the Sydney cluster.

While the ship is in port, sensors on containers continue to provide a flow of data to the onboard cluster to report their status as some containers are offloaded and some remain on board. This message flow from sensors is how the onboard cluster receives the information that triggers an alert when several containers of toy ducks slip off the back of the ship (stage C in Figure 7-2). This information will be replicated in seconds to the port clusters as well as back to Big Blue headquarters in Los Angeles. The managers will not be happy when they have to send a report to the toy manufacturer to tell them the fate of the lost toy ducks (check your local beaches to see where they end up).1
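The alert at stage C can be sketched as a simple liveness check: every container on the manifest is expected to report regularly, so a container whose most recent message is too old gets flagged. The reporting cadence and threshold below are assumptions for illustration, not values from the text.

```python
# Sketch of a lost-container alert driven by the sensor message flow. A
# container that has missed several consecutive reporting intervals is
# flagged as missing. Both constants are made-up example values.

REPORT_INTERVAL_S = 60           # assumed sensor reporting cadence (seconds)
MISSED_REPORTS_BEFORE_ALERT = 3  # tolerate a couple of dropped messages

def missing_containers(last_seen, manifest, now):
    """last_seen: container_id -> timestamp of its latest sensor message.
    manifest: set of containers that should still be on board.
    Returns the sorted list of containers overdue for a report."""
    cutoff = now - MISSED_REPORTS_BEFORE_ALERT * REPORT_INTERVAL_S
    return sorted(c for c in manifest if last_seen.get(c, 0) < cutoff)
```

Publishing the resulting alert as just another message lets the geo-distributed replication described above carry it to the port clusters and to headquarters without any extra delivery machinery.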

Advantages of Streams-based Geo-Replication

In our “toy example” (pun intended) architecture, our use of a topic per container and topics grouped as a container stream means that the topic provides a continuous history of that particular container, even through its life on different ships or on docks in different ports. The organization into a stream is convenient because time-to-live, geo-distributed replication, and data access control can all be set at the stream level.

The huge number of containers to be tracked by an international shipping company would require the ability to handle up to hundreds of thousands of topics per stream or more. At present, MapR Streams is unusual among messaging technologies in its ability to handle that number of topics. The MapR Streams capability for multi-master geo-distributed replication is also distinctive and beneficial. We chose this particular example because it brings the issues of huge numbers of topics, intermittent network connections, streaming client fail-over, and geo-distribution into the foreground, but these issues exist in many situations without being quite so obvious.

1 Our example is a nod to a real event in which toy ducks, turtles, beavers, and frogs, called “Friendly Floatees,” were lost at sea in the Pacific off a ship that sailed from Hong Kong in 1992. Some arrived in Alaska in late 1992. The most recent Floatee sightings were on UK beaches in 2007. For more information, see https://bit.ly/lost-ducks.