• Apache Kafka
  • Technical
“Kongo” Part 2: Exploring Apache Kafka application architecture: Event Types and Loose Coupling

This is the second post in our series exploring designing and developing and example IOT application with Apache Kafka to illustrate typical design and implementation considerations and patterns. In the previous blog, we introduced our Instaclustr “Kongo” IoT Logistics Streaming Demo Application. The code for Version 1 of the Kongo application was designed as an initial stand-alone tightly-coupled prototype to demonstrate the functional behaviour. It won’t cope with any failures and won’t scale beyond a single server. In this post, our goal is to re-engineer the Kongo application to introduce Apache Kafka so that it is more reliable and scalable.

In order to get Kongo ready for Kafka we decided to make an intermediate design change to make it more loosely coupled. This involved (1) adding explicit event types, and (2) adding an Event Bus to introduce topics, and allow objects to publish, subscribe and consume events to the topics. See Version 2 of the Kongo application for the code details.

1. Event Types

Kongo has two main types of events: sensor events and RFID events. Sensor events are produced by warehouses and trucks and represent measurements of metrics at a particular time and place. RFID events are produced by RFID readers when goods are moved through a warehouse dock to and from trucks and are either UNLOAD or LOAD events. We added classes for RFIDLoadEvent and RIFDUnloadEvent which are initially just containers for the data including <time, warehouseKey, goodsKey, truckKey>. Both classes have the same fields but there is a difference in semantics (i.e. the truck is either loaded or unloaded).

We also added a Sensor event class with <time, warehouse or truck location, metric, value> fields. The simulation loop (Simulate.java) was changed so that explicit RFID Load and Unload and Sensor event objects are now created. But what should we do with them?

2. Loose-Coupling

An earlier attempt at a loosely-coupled communication system    (Source: Wikimedia)

To get the application ready for Kafka we need to make it loosely coupled in a way that will hopefully make it easier to be distributed across multiple processes and servers.  As an initial step towards this, we decided to introduce a publish-subscribe eventing pattern. Typically this means creating 1 or more event queues, defining what will happen when each type of event is received, setting up subscriptions to topics, and sending events to the correct event queues. We used the Google Guava EventBus to implement this. EventBus allows publish-subscribe-style communication between components (but isn’t a general pub-sub inter-process mechanism).

2.1 Sensor Events: One or Many Topics?

We tackled the sensor events first. The initial design was for a single sensor topic with all the sensor events (from warehouses and trucks) posted to it. E.g. in the simulate loop:

Sensor sensor = new Sensor(time, "SENSOR WAREHOUSE", warehouseKey, "temp", value);

sensorTopic.post(sensor);

In the original prototype code, there is an inefficient and tightly coupled call to a method to find all the Goods at the location of the sensor and check all the rules. Introducing loose coupling means that the sensor event producers don’t need to have any knowledge of what happens to the events anymore. But something does!

The initial loosely-coupled design registered all Goods objects with the topic when the Goods were first created. I.e.

EventBus sensorTopic = new EventBus(“sensorTopic”);

for (Goods goods: allGoods.values())

topic.register(goods);

The EventBus register method registers listener methods on the object passed in the argument. Listener methods must have a @Subscribe annotation and are specific to event types that will be posted. For example, the listener method for the Goods objects looks like this:

// subscribe Goods object listener to Sensor events

@Subscribe
public void sensorEvent(Sensor sensor)
{

System.out.println("GOT SENSOR EVENT! Object=" +  tag + ", event=" + sensor.toStr());
if (goodsInLocation(event.tag)

String v = violatedSensorCatRules(sensor);
// etc
}

This worked, but had the obvious problem that every sensor event was sent to every Goods object, which then had to check if it could ignore the event or not (based on location).  Here’s a diagram showing this “many-to-many” pattern:


This won’t scale to millions of Goods objects. One of the expected benefits of pub-sub is to enable consumers to subscribe to only a subset of events that are relevant to them. A simple improvement was therefore to have multiple EventBuses (topics), one topic per location, and then subscribe Goods to only the topic that they are located at. Here’s the code for publishing sensor events to the correct topic. For the full code see Github.

// Find the topic corresponding to the warehouse location

// and post the sensor event to only that topic

EventBus topic = topics.get(warehousekey);

Sensor sensor = new Sensor(time, "SENSOR WAREHOUSE", warehousekey, "temp", value);
topic.post(sensor);

This refinement worked well, and is highly scalable, even with increasing numbers of topics (e.g. 1000 warehouses + 2000 trucks = 3000 topics) and Goods objects (millions). Here’s the diagram for this pattern:

In theory there is a further refinement based on the observation that not all Goods objects care about every sensor metric (although eventually the code might not really be loosely coupled anymore). It may therefore be possible to further refine the subscriptions by metric type.

This isn’t yet a complete solution as Goods objects move around, and we need to run the co-location rules to check ifGoods of different categories are permitted to travel in the same truck. As they change locations, the subscriptions must be dynamically updated. We haven’t implemented RFID event topics and handlers yet, and as this is where theGoods movement events come from, this is obviously the next step.

2.2 RFID Events

The design for RFID events was to have an EventBus/topic for each of load and unload events, with the handlers defined in the RFIDLoadEvent and RFIDUnloadEvent classes (in hindsight 2 topics weren’t really necessary given that we had different RFID event types):

rfidLoadTopic = new EventBus("load");
rfidUnloadTopic = new EventBus("unload");
RFIDLoadEvent loadHandler = new RFIDLoadEvent();
rfidLoadTopic.register(loadHandler);
RFIDUnloadEvent unloadHandler = new RFIDUnloadEvent();
rfidUnloadTopic.register(unloadHandler);

What should the handlers do for each event type? For RIFDUnloadEvent the logic is as follows:

RFIDUnloadEvent: Move theGoods object from a truck to a warehouse

  1. Get the key of theGoods to unload from the event.
  2. Find the goods object given theGoods key.
  3. Find the location topic that theGoods key is currently registered with (truck).
  4. Unregister the goods from that topic.
  5. Find the location topic of the warehouse.
  6. Register the goods object with that topic.

What this sequence of steps achieves is that theGoods object will stop receiving sensor events from the truck location, and start receiving sensor events at the warehouse location.

The RFIDLoadEvent handler is slightly more complex as it must also deal with co-locatedGoods rules checking:

RFIDLoadEvent: Move theGoods object from a warehouse to a truck, and check for co-location rules violations

  1. Get the key of theGoods to load from the event.
  2. Find the goods object given theGoods key.
  3. Find the location topic that theGoods key is currently registered with (warehouse).
  4. Unregister the goods from that topic.
  5. Find the location topic of the truck.
  6. Create colocatedCheckEvent <time, goods, truck>.
  7. Post this event to the truck location topic.
  8. Register the goods object with truck location topic.

Steps 1-6 are similar to the step for the Unload events, but the remaining 2 steps are critical and are used to construct a new event type (colocatedCheckEvent) and post it to the truck location. This results in all the Goods objects that have already been loaded onto the truck checking their rules to see if they are happy with the new Goods object to be loaded or not.

The Goods class has a new method handler, colocatedRulesEvent(ColocatedCheckEvent event), which gets theGoods object to be loaded, and checks the co-location rules between “this” object and the object to be loaded. Note that we are now sending two event types to sensor topics, and each type has a different handler.

I initially (accidently) tried a simpler solution which just re-used the sensor location topics and added the RFID event handlers to theGoods objects. This also worked, but required each object to check if it was the intended recipient of the load/unload event. In practice it is inefficient as everyGoods object except the one being moved would unnecessarily receive the event and then just ignore it.

What’s still missing? We probably need to do something with the rules violations (sensor and co-location). The obvious thing would be to have a violation topic and publish the violation events in that.

Did these design changes make it easier to migrate the Kongo application to Kafka? Find out next blog.

A grammatical note: Goods is both plural and singular!

That makes sense as using Good for a singular Goods (which I have caught myself doing) could cause confusion, being an ethical judgement, in contrast to “Bad”).

https://www.wordhippo.com/what-is/the-singular-of/goods.html

xfgdfh