Spinning Apache Kafka® Microservices With Cadence Workflows

In “Spinning your Workflows With Cadence!” we explored Cadence, a new scalable, developer-focused, open source workflow engine. Cadence is a great solution for highly fault-tolerant, stateful workflow management, also known as “orchestration” (based on the idea of a conductor directing an orchestra). However, another popular architectural style for large-scale, message-based distributed systems uses a loosely coupled approach known as “choreography” (based on the idea of dancers interacting with their immediate neighbours). Apache Kafka is a common example: it enables large numbers of microservices to communicate with each other scalably and with low latency, with no central state management required.

However, many enterprises already have a proliferation of Kafka microservices, and modern enterprise systems typically use multiple technologies. More realistically, then, you need to combine both architectural styles and integrate Kafka microservices with Cadence workflows. Rather than having just an orchestra or just dancers, you need to perform a ballet that combines both.

There are a number of potential use cases for combining Kafka and Cadence, but in this blog we’ll focus on the benefits of reusing Kafka consumer-based microservices from Cadence workflows. In other words, how do you send a message from Cadence to Kafka to trigger some processing on the Kafka side, and then asynchronously wait for, and eventually receive, a response? For example, in my Anomalia Machina blog series, I implemented an anomaly detection system as a Kafka consumer.

What if we want to perform an anomaly check as a step in a Cadence workflow?

Sending a message to Kafka just involves using a Kafka producer inside a Cadence activity method (with the right metadata to ensure that a response can be sent and received). However, there are a couple of different approaches to waiting for and obtaining the response. Let’s take a closer look.

1. Sending a Message from Cadence to Kafka

Luckily, sending a message to Kafka is a bit easier than tossing a bottle overboard in the hope that the message inside will be received.

First, prepare your “bottle” (the Kafka producer). I used a producer.properties file, configured as follows for an Instaclustr Managed Kafka service (you’ll need to fill in the cluster-specific details from the Instaclustr console or management API):
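As a minimal sketch, assuming a cluster secured with SASL/SCRAM (an assumption, so check the settings your own cluster actually requires), the file might contain something like the following. Broker addresses, USERNAME and PASSWORD are placeholders. The settings are wrapped in a little Java so they can be parsed with java.util.Properties, just as they would be when the real file is loaded:

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.Properties;

final class ProducerConfigSketch {
    // What producer.properties might contain. Broker addresses and credentials
    // are placeholders; SASL_PLAINTEXT with SCRAM-SHA-256 is an ASSUMED security
    // setup, not necessarily what your cluster uses.
    static final String PRODUCER_PROPERTIES = String.join("\n",
        "bootstrap.servers=BROKER_1:9092,BROKER_2:9092,BROKER_3:9092",
        "key.serializer=org.apache.kafka.common.serialization.StringSerializer",
        "value.serializer=org.apache.kafka.common.serialization.StringSerializer",
        "security.protocol=SASL_PLAINTEXT",
        "sasl.mechanism=SCRAM-SHA-256",
        "sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required "
            + "username=\"USERNAME\" password=\"PASSWORD\";");

    // Parses properties text. With the real file you would pass a FileReader
    // instead, then hand the result to new KafkaProducer<>(props).
    static Properties load(String text) {
        Properties props = new Properties();
        try {
            props.load(new StringReader(text));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return props;
    }
}
```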

 

Then, define a Cadence activity method which sends a message to Kafka as follows:

Note that in the Kafka record we set the destination topic, a null key, and the message value, but we use a Kafka header to pass the workflow id so that the consumer knows where the message came from. A header is a key-value pair, and each Kafka record can have multiple headers. We’ve seen Kafka headers used before, to support OpenTracing.

Why did we use a header for the workflow id rather than, say, the record key?

Well, I’m assuming that we want to reuse existing Kafka microservices, so we want to minimize changes on the Kafka side. Existing Kafka consumers will make assumptions about the record key and value, but the idea behind Kafka headers is that you can add optional metadata that won’t break existing implementations, while those implementations can easily be enhanced to understand the metadata and respond appropriately.
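A minimal sketch of the record-building step follows. To stay self-contained it uses hypothetical stand-ins (OutboundRecord, HeaderEntry and KafkaActivitySketch) rather than the Kafka client library. With the real library, the equivalent calls would be new ProducerRecord<>(topic, null, message) and record.headers().add("Cadence_WFID", workflowIdBytes). How the activity obtains its workflow id from Cadence is left out, because the exact call depends on the client version:

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for a Kafka header: a key plus raw bytes.
final class HeaderEntry {
    final String key;
    final byte[] value;
    HeaderEntry(String key, byte[] value) { this.key = key; this.value = value; }
}

// Hypothetical stand-in for ProducerRecord<String, String>.
final class OutboundRecord {
    final String topic;
    final String key;
    final String value;
    final List<HeaderEntry> headers = new ArrayList<>();

    OutboundRecord(String topic, String key, String value) {
        this.topic = topic;
        this.key = key;
        this.value = value;
    }

    // Returns the last header with this key, decoded as UTF-8, or null if absent.
    String lastHeader(String name) {
        String found = null;
        for (HeaderEntry h : headers) {
            if (h.key.equals(name)) found = new String(h.value, StandardCharsets.UTF_8);
        }
        return found;
    }
}

final class KafkaActivitySketch {
    static final String WORKFLOW_ID_HEADER = "Cadence_WFID";

    // Builds the record the activity would pass to producer.send(...). Key and
    // value are exactly what an existing consumer already expects; the workflow
    // id travels only as optional header metadata.
    static OutboundRecord buildRecord(String topic, String message, String workflowId) {
        OutboundRecord record = new OutboundRecord(topic, null, message);
        record.headers.add(new HeaderEntry(WORKFLOW_ID_HEADER,
                workflowId.getBytes(StandardCharsets.UTF_8)));
        return record;
    }
}
```

Because the key and value are untouched, a consumer that ignores headers processes this record exactly as it did before.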

If all you want to do is send a message to Kafka and then continue with the rest of the workflow (“fire and forget”), then your job is done. In fact, you don’t need to bother sending the workflow id if you don’t expect a response (unless the Kafka consumer needs to know, for some other reason, that the message originated from Cadence). The workflow (using BPMN-like notation) looks like this (with an optional second task, etc.).

However, we assume that our use case involves some processing on the Kafka side in response to the message, producing a result that the workflow must receive before proceeding with other activities. How can we complete the loop?

2. Introducing Cadence Signals

Aldis lamps are used for ship-to-ship signalling at sea. https://commons.wikimedia.org/wiki/File:Seaman_send_Morse_code_signals.jp

To get a response back from Kafka we need something better than messages in bottles. Cadence Signals are modeled on UNIX-like IPC (inter-process communication) signals, which allow messages to be sent to a process or a specific thread to notify it of an event (e.g. “kill -9 <pid>”!). In a recent blog on Apache ZooKeeper™ and Curator, we also came across the use of signals (semaphores, section 3.3).

The documentation says that Cadence Signals:

“… provide a fully asynchronous and durable mechanism for providing data to a running workflow. When a signal is received for a running workflow, Cadence persists the event and the payload in the workflow history. The workflow can then process the signal at any time afterwards without the risk of losing the information. The workflow also has the option to stop execution by blocking on a signal channel.”

In my first Cadence blog, we discovered that every workflow has an interface and an implementation, potentially with multiple methods, but only one method has the @WorkflowMethod annotation, indicating the entry point for the workflow. However, other methods can be annotated with @SignalMethod. For example:

And the implementation of both these methods:

You can have multiple signal methods, but for simplicity this example has just one. Every time an ExampleWorkflow.signalKafkaReply signal is delivered to a workflow instance, the corresponding signal method is invoked. Signal methods never return a value and typically do something as a side effect, in this case setting the message variable to the value of the signal argument.

The final thing to note here is that we’ve implemented the startWorkflow method, which now calls the sendMessageToKafka activity method but then immediately blocks using Workflow.await(). This blocks until the function it receives as a parameter evaluates to true, i.e. until this workflow receives a signal with a non-null message value. Note that, because of the way Cadence is designed, the condition is only evaluated on workflow state changes (i.e. it’s not polling).
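The signal pattern can be sketched in plain Java. This is a stand-in under stated assumptions, not Cadence code. In a real workflow, the interface methods would carry @WorkflowMethod and @SignalMethod, the activity would be called through an activity stub, and the wait would be Workflow.await(() -> message != null). Here, a hypothetical Consumer<String> plays the sendMessageToKafka activity, the String parameter of startWorkflow is an assumed signature, and a Java monitor (wait/notifyAll) models a condition that is re-checked only when state changes:

```java
import java.util.function.Consumer;

// Plain-Java stand-in for the workflow interface. In Cadence, startWorkflow
// would be annotated @WorkflowMethod and signalKafkaReply @SignalMethod.
interface ExampleWorkflow {
    String startWorkflow(String request);
    void signalKafkaReply(String reply);
}

final class ExampleWorkflowSketch implements ExampleWorkflow {
    private final Consumer<String> sendMessageToKafka; // hypothetical activity stand-in
    private String message;                            // set by the signal method

    ExampleWorkflowSketch(Consumer<String> sendMessageToKafka) {
        this.sendMessageToKafka = sendMessageToKafka;
    }

    @Override
    public String startWorkflow(String request) {
        sendMessageToKafka.accept(request);
        synchronized (this) {
            // Models Workflow.await(() -> message != null): the condition is
            // re-checked only when the signal method changes state, not polled.
            while (message == null) {
                try {
                    wait();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    throw new IllegalStateException("interrupted while waiting for signal", e);
                }
            }
            return message;
        }
    }

    @Override
    public synchronized void signalKafkaReply(String reply) {
        message = reply;  // side effect only: signal methods return nothing
        notifyAll();      // wake the waiting workflow method
    }
}
```

Real Cadence workflow code must not use wait/notify or other standard Java blocking primitives; the monitor appears here only so the stand-in runs outside Cadence.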

So how do we use Cadence Signals to integrate with Kafka?

2.1 Using Cadence Signals to Integrate With Kafka: First Approach

Here’s the workflow that we are aiming for using Signals:

Integration using signal

It’s now time to see how the complete pattern works: sending a message to Kafka, waiting for a response, and signalling the correct workflow instance from Kafka. We’ve already covered the first two parts above, so the final piece of the puzzle is how a Kafka consumer can signal a particular workflow instance. In the Kafka producer above, we sent the workflow id in the record header. This enables the Kafka consumer to retrieve this information and

  1.  realize that the record originated from Cadence, and
  2.  send the response back to the correct workflow.

The Kafka consumer needs to be configured to connect both to the Kafka cluster that the record was sent to and to the Cadence cluster that manages the workflow (resulting in KafkaConsumer and WorkflowClient objects). The main polling loop of the Kafka consumer looks like this:
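Here is a minimal sketch of the body of that polling loop, handling one batch of records. So that it runs without the Kafka and Cadence libraries, it uses hypothetical stand-ins: ConsumedRecord for a consumed record and its headers, and a WorkflowSignaller callback in place of client.newWorkflowStub(ExampleWorkflow.class, workflowId).signalKafkaReply(reply). As in the demo, the response is just a string built from the request:

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for a consumed record: value plus header bytes
// (with the real client, record.headers().lastHeader("Cadence_WFID")).
final class ConsumedRecord {
    final String value;
    final Map<String, byte[]> headers;
    ConsumedRecord(String value, Map<String, byte[]> headers) {
        this.value = value;
        this.headers = headers;
    }
}

// Hypothetical hook wrapping the workflow stub's signal method.
interface WorkflowSignaller {
    void signal(String workflowId, String reply);
}

final class SignallingConsumerSketch {
    static final String WORKFLOW_ID_HEADER = "Cadence_WFID";

    // Processes one batch returned by poll(). Records that came from Cadence are
    // answered with a signal; the others take the existing delivery path, and
    // their responses are returned to the caller.
    static List<String> handleBatch(List<ConsumedRecord> batch, WorkflowSignaller signaller) {
        List<String> normalResponses = new ArrayList<>();
        for (ConsumedRecord record : batch) {
            String response = "response to " + record.value;  // demo only: no real processing
            byte[] workflowId = record.headers.get(WORKFLOW_ID_HEADER);
            if (workflowId != null) {
                signaller.signal(new String(workflowId, StandardCharsets.UTF_8), response);
            } else {
                normalResponses.add(response);
            }
        }
        return normalResponses;
    }
}
```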

Note that this is just a demonstration example, so the Kafka consumer doesn’t actually process the request; the response is just a String that includes the request value. If “Cadence_WFID” is found in the header, we first create a new stub to the workflow using WorkflowClient.newWorkflowStub() with ExampleWorkflow.class and the workflow id, and then send the signal by calling the signal method on the stub; otherwise, normal processing and delivery of results can occur. Once the workflow receives the signal, it unblocks and proceeds using the response, continuing with any other activities as required.

 

Note that this example assumes that only one signal will be sent as a reply for each message sent to Kafka. However, if more than one consumer group subscribes to the topic, each group receives its own copy of the message, which could result in multiple signals being sent. For this example it doesn’t matter in practice, as only the first signal is important, but other use cases may need to handle multiple signals correctly.
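One simple way to make a signal handler tolerate duplicate replies is to keep only the first one. The sketch below is plain Java, and FirstReplyWins is a hypothetical name; only the signalKafkaReply method name comes from the example above. Later replies are counted and otherwise ignored:

```java
// Hypothetical signal handler that keeps the first reply and ignores the rest,
// for example when several consumer groups each answer the same request.
final class FirstReplyWins {
    private String message;
    private int ignoredDuplicates;

    void signalKafkaReply(String reply) {
        if (message == null) {
            message = reply;        // the first reply unblocks the workflow
        } else {
            ignoredDuplicates++;    // later replies are dropped
        }
    }

    String message() { return message; }

    int ignoredDuplicates() { return ignoredDuplicates; }
}
```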

Cadence signals are documented here and here.

3. Cadence Activity Completions: Second Approach

Completion of the “join the arch” activity on the Sydney Harbour Bridge. 

https://commons.wikimedia.org/wiki/File:SLNSW_44281_Sydney_ferry_Kubu_passes_near_the_Harbour_Bridge_as_the_lower_chord_is_ready_for_joining.jpg circa 1930

Cadence Asynchronous Activity Completion is an alternative approach to solving the same problem, and is documented here, here and here. In fact, from the documentation, it seems to be a perfect match:

Sometimes an activity lifecycle goes beyond a synchronous method invocation. For example, a request can be put in a queue and later a reply comes and is picked up by a different worker process. The whole request-reply interaction can be modeled as a single Cadence activity.

How is this approach different from signals? The main differences are that blocking now occurs (conceptually) inside the activity method, rather than in the workflow itself, and the completion call is associated with the activity rather than the workflow. However, it’s still “point to point” communication, and works across process boundaries. Here’s what this workflow will look like:

Kafka integration completion

The main workflow implementation is trivial now as we don’t need to wait, or implement a signal method:

 

However, the activity method is slightly more complex as follows:

Instead of the workflow id that we used with signals above, we now get the task token and put it in the Kafka header as the value of the key “Cadence_TASKID”. The message is sent to the Kafka topic with a Kafka producer as before. But then the activity calls Activity.doNotCompleteOnReturn() to indicate that it will not be “completed” when the method returns. The return value is therefore “ignored” (it can be anything you like). What does this mean? Basically, any value returned from the activity method is discarded and is eventually replaced by the value supplied by the completion call.
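The mechanics can be modelled in plain Java with a CompletableFuture per task token. This is a sketch, not Cadence’s implementation. The hypothetical CompletionRegistry stands in for the Cadence service, String task tokens stand in for the real tokens, and a comment marks where the real activity would call Activity.doNotCompleteOnReturn():

```java
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Consumer;

// Hypothetical in-memory registry playing the Cadence service's role: it maps
// task tokens to the activity's still-pending result.
final class CompletionRegistry {
    private final Map<String, CompletableFuture<String>> results = new ConcurrentHashMap<>();

    // The future that the workflow's activity call is effectively waiting on.
    CompletableFuture<String> resultFor(String taskToken) {
        return results.computeIfAbsent(taskToken, t -> new CompletableFuture<>());
    }

    // Stand-in for ActivityCompletionClient.complete(taskToken, result).
    // Returns false for unknown tokens or repeated completions.
    boolean complete(String taskToken, String result) {
        CompletableFuture<String> pending = results.get(taskToken);
        return pending != null && pending.complete(result);
    }
}

// Hypothetical outbound message; taskToken would travel in the Cadence_TASKID header.
final class OutboundMessage {
    final String value;
    final String taskToken;
    OutboundMessage(String value, String taskToken) {
        this.value = value;
        this.taskToken = taskToken;
    }
}

final class CompletionActivitySketch {
    static final String IGNORED = "ignored";

    // Models the activity method: mark the result as pending, send the message
    // with the task token attached, then return a placeholder that is discarded.
    static String sendMessageToKafka(CompletionRegistry registry, String taskToken,
                                     String message, Consumer<OutboundMessage> producer) {
        registry.resultFor(taskToken);  // register first, so a fast reply is not lost in this model
        producer.accept(new OutboundMessage(message, taskToken));
        // The real activity would call Activity.doNotCompleteOnReturn() here.
        return IGNORED;
    }
}
```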

There are also some changes in the Kafka consumer as follows:

First, we need to create an ActivityCompletionClient using the WorkflowClient instance (which we assume has been created with the correct Cadence cluster settings). Then, if the “Cadence_TASKID” key is found in the headers of a Kafka record, we call the complete method with the task token value and a response value. This response value becomes the result of the activity, so the workflow’s pending call to the activity method finally returns it.

Note that we could have a handler in the Kafka consumer for the signal approach as well as the completion approach, and if neither is present in the header then normal processing can occur.
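A consumer that supports both approaches could dispatch on whichever header is present. In the minimal sketch below, CadenceReplies is a hypothetical interface: its signal method would wrap the workflow stub’s signal call, and its complete method would wrap ActivityCompletionClient.complete(...). Checking Cadence_TASKID before Cadence_WFID is an arbitrary choice:

```java
import java.util.Map;

// Hypothetical stand-in for a consumed record with string header values.
final class IncomingRecord {
    final String value;
    final Map<String, String> headers;
    IncomingRecord(String value, Map<String, String> headers) {
        this.value = value;
        this.headers = headers;
    }
}

// Hypothetical hooks for the two ways of replying to Cadence.
interface CadenceReplies {
    void signal(String workflowId, String reply);
    void complete(String taskToken, String reply);
}

final class DispatchingConsumerSketch {
    enum Route { COMPLETION, SIGNAL, NORMAL }

    static Route handle(IncomingRecord record, CadenceReplies cadence) {
        String reply = "response to " + record.value;  // demo processing only
        String taskToken = record.headers.get("Cadence_TASKID");
        if (taskToken != null) {
            cadence.complete(taskToken, reply);        // activity completion approach
            return Route.COMPLETION;
        }
        String workflowId = record.headers.get("Cadence_WFID");
        if (workflowId != null) {
            cadence.signal(workflowId, reply);         // signal approach
            return Route.SIGNAL;
        }
        return Route.NORMAL;                           // existing, non-Cadence processing
    }
}
```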

I was still curious about completions, so I asked our developers for further enlightenment (thanks Kuangda, Matthew, and Tanvir). Here’s a composite explanation: 

Completion is a useful concurrent programming pattern for waiting for an external callback to provide the result (push), instead of polling for the result (pull). The initial return value is ignored because, in the Go client, the activity actually returns a special error (following the Go error-handling pattern) indicating that the result is still pending and will be supplied by the completion callback.

The Cadence Go client documentation makes this clearer. Java has a similar mechanism using Futures and CompletableFutures (see also another blog).

4. Kafka With Cadence: Conclusions

So by the end of this blog we have two solutions for integrating Kafka microservices with Cadence workflows, combining the worlds of choreography and orchestration: Signals and Activity Completions. Both solutions worked correctly. I hope you enjoyed the “ballet”!

The main difference is that Signals work at the level of workflows, whereas Activity Completions are finer-grained and work at the level of tasks/activities. This may be an advantage when correlating more complex workflows that have multiple concurrent tasks/activities communicating with Kafka at the same time.

Note that for the completion approach, the activity timeout governs the timeout for the complete round trip to and from Kafka. For the signal approach, however, separate timeouts can be used for the activity (sending the message to Kafka) and for waiting for the reply from Kafka, which may be more flexible.
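For the signal approach, the wait for the reply can be bounded independently of the activity that sends the message. Recent versions of the Cadence Java client appear to offer a timed variant, Workflow.await(Duration, Supplier<Boolean>), but check the version you use. The plain-Java model below (the hypothetical TimedReplyWait) shows the same idea with a CompletableFuture and returns an empty Optional if no reply arrives in time:

```java
import java.time.Duration;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

final class TimedReplyWait {
    // Waits up to `timeout` for the Kafka reply; this timeout is separate from
    // the timeout of the activity that sent the request.
    static Optional<String> awaitReply(CompletableFuture<String> reply, Duration timeout) {
        try {
            return Optional.of(reply.get(timeout.toMillis(), TimeUnit.MILLISECONDS));
        } catch (TimeoutException e) {
            return Optional.empty();  // no reply in time: retry, compensate or fail
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return Optional.empty();
        } catch (ExecutionException e) {
            throw new IllegalStateException(e.getCause());
        }
    }
}
```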

A final thing to watch out for is that, because signals and activity completions are remote calls, they introduce a delay into the Kafka consumer. The signal approach requires two remote calls, whereas the completion approach uses only one and may therefore be slightly faster. You may need to increase the number of Kafka topic partitions and consumers to provide sufficient throughput.
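As a rough sizing aid, assume that each consumer handles one record at a time, so it can process about 1000 / latencyMs records per second. The number of consumers, and therefore partitions, must then be at least ceil(targetRate × latencyMs / 1000), because each partition is read by at most one consumer in a group. The helper name and the latency figures below are hypothetical; they are not measurements:

```java
final class ConsumerSizing {
    // Minimum consumers (and partitions) needed to sustain targetRecordsPerSec
    // when each record takes perRecordLatencyMs, processed one at a time per consumer.
    static int minConsumers(double targetRecordsPerSec, double perRecordLatencyMs) {
        return (int) Math.ceil(targetRecordsPerSec * perRecordLatencyMs / 1000.0);
    }
}
```

For example, with a hypothetical 10 ms per remote call and a target of 500 records per second, one call per record needs minConsumers(500, 10) = 5 consumers, while two calls per record need minConsumers(500, 20) = 10.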

The demonstration code is available here.

Follow the series: Spinning Your Drones With Cadence

Part 1: Spinning Your Workflows With Cadence!

Part 2: Spinning Apache Kafka® Microservices With Cadence Workflows

Part 3: Spinning your drones with Cadence

Part 4: Architecture, Order and Delivery Workflows

Part 5: Integration Patterns and New Cadence Features