This blog post describes how Instaclustr built a highly-available monitoring solution for our fleet of Apache Cassandra and Apache Spark clusters, using a combination of Riemann (an event stream processor) and RabbitMQ (a distributed, fault-tolerant message broker).
Riemann is a fantastic stream-based event processing engine, written in Clojure (a JVM language), that serves as a solid foundation for building an extendable, flexible and tailored monitoring solution. It has a vast built-in library of composable functions that can be used to process and filter events, and to forward notable events to services such as PagerDuty and DataDog for further action. Instaclustr has been running Riemann in production for nearly 1.5 years, forming the basis of our service-wide monitoring system that collects, analyses, and reacts to metrics gathered from our entire fleet of deployed Apache Cassandra and Spark clusters. It has allowed us to write concise custom rules that meet our needs — to preempt pending failures, promptly address service outages, and gather and report metrics to our customers via the Instaclustr Console.
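To give a flavour of what a Riemann configuration looks like, here is a minimal sketch — not one of our production rules; the addresses and the rule itself are placeholders — that indexes incoming events and emails on transitions into a critical state:

```clojure
;; riemann.config — a minimal illustrative sketch, not an Instaclustr rule
(def email (mailer {:from "riemann@example.com"}))   ; placeholder address

(let [index (index)]
  (streams
    ;; keep the most recent event per host/service in the index
    index

    ;; email on transitions into a critical state (e.g. a service going down)
    (changed-state {:init "ok"}
      (where (state "critical")
        (email "oncall@example.com")))))
```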
Riemann is not without its limitations. For us, the most glaring omissions are the lack of built-in options for high availability and scaling. Overcoming these omissions was critical to our continued use of Riemann as we grow our customer base.
A traditional Riemann deployment has a single centralised Riemann server with multiple clients connected directly, each feeding Riemann a stream of events. There is no built-in support for fail-over or clustering. Our initial monitoring implementation was based around this centralised approach. Any failure or restart of Riemann left us in the dark for a short but unacceptable period of time. It became obvious that we needed to integrate fail-over support into, or alongside, Riemann to ensure that our monitoring systems were always available.
There is a limit to the number of events that a single Riemann instance can process, capped by available CPU time and network bandwidth. Historically, we hit this limit numerous times. We threw beefier hardware at Riemann, which extended the runway for a while, but the only viable long-term solution was to distribute the load across multiple machines. As of this writing we have over 500 individual Cassandra nodes under management, each sending upwards of 3000 individual metrics every ten seconds or so — approximately 150,000 metrics per second! This throughput put us very close to the limits of a single Riemann instance — any hitch in per-event processing latency resulted in a backlog of unprocessed events that took significant time to clear (during which we were unable to react quickly to events).
Distributing Riemann across multiple instances let us kill two birds with one stone. Running Riemann on a single server wasn’t fault-tolerant, and we were pushing the boundaries of what a single server was capable of. Distributed Riemann gives us the fault-tolerance and scalability required to monitor our ever-growing fleet of Cassandra and Spark clusters.
Distributing Riemann
There were a number of significant challenges that we had to overcome to create a fault-tolerant and scalable Riemann system.
A number of our Riemann streams compare events — we collect a lot of raw, de-normalised data that requires other information to give it context. As an example, we parse /proc/stat 1 on every host and extract the per-CPU user, system, idle, iowait, etc. values and send these as separate events to Riemann. Answering the question “What is the CPU utilisation on a host?” requires combining the values of all the CPU-related events for that host. We also have rules that compare events between hosts, such as “Is this host’s keyspace read latency higher than the Cassandra cluster average?”. Simply distributing events between Riemann instances does not guarantee that events for a particular host or cluster will be processed by the same Riemann instance consistently, ultimately breaking all streams that make this assumption.
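As an illustration of the kind of combination involved, the sketch below derives a per-host CPU utilisation event inside a stream. The service names ("cpu/user", "cpu/idle", and so on), the 10-second window, and the assumption that each metric is a delta are ours for the sake of the example — this is not our actual schema or rule.

```clojure
;; Sketch only: assumes per-CPU-state events named "cpu/user", "cpu/system",
;; "cpu/idle", "cpu/iowait", each carrying a counter delta in :metric.
(where (service #"^cpu/")
  (by :host
    ;; every 10 s, pass the latest event for each CPU service on this host downstream
    (coalesce 10
      (smap (fn [events]
              (let [total (reduce + (keep :metric events))
                    idle  (reduce + (keep :metric
                                          (filter #(= (:service %) "cpu/idle") events)))]
                (assoc (first events)
                       :service "cpu/utilisation"
                       :metric  (if (pos? total)
                                  (double (/ (- total idle) total))
                                  0.0))))
            ;; feed the derived event back through the streams for indexing/alerting
            reinject))))
```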
Additionally, Riemann is stateful. While Riemann can process events in a stateless fashion, a large number of the built-in stream functions, and some of the custom functions we’ve written, remember state between invocations (each stream function is invoked as each event flows through the stream). For example, riemann.streams/throttle 2, which restricts the flow of events passed downstream, is a stateful stream function, as it remembers the timestamp of the last event to flow through it.
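For instance, a quick sketch of throttle in a stream — the rule itself is illustrative, and email is the mailer from the earlier sketch:

```clojure
;; Forward at most 5 critical events per hour downstream; throttle remembers
;; when events last flowed through it, which is what makes it stateful.
(where (state "critical")
  (throttle 5 3600
    (email "oncall@example.com")))
```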
State can also be persisted via the Riemann index, an initially empty event set, keyed by host and service, managed by the active Riemann configuration. The index can be queried, and events can be selectively added and removed with stream functions. Events inserted into the Riemann index can expire after an optionally specified per-event TTL. Once events expire, they are re-injected into the defined streams for further processing. This is useful for things like watchdog timers and maintenance rules.
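A brief sketch of the index mechanics (the intervals and default TTL are illustrative):

```clojure
(periodically-expire 10)            ; scan the index for expired events every 10 s

(let [index (index)]
  (streams
    ;; events arriving without a TTL get a 60 s default before being indexed
    (default :ttl 60 index)

    ;; expired events are re-injected into the streams with state "expired"
    (expired
      #(warn "event expired:" (:host %) (:service %)))))
```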
In order to compare events at the host and cluster level, and to ensure that any stateful components of Riemann exhibit the correct behaviour, we had to ensure that events from nodes in the same cluster were sent consistently to the same Riemann instance, with Riemann instance topology changes being the only exception (i.e., adding or removing Riemann instances).
Instaclustr relies on RabbitMQ to handle all inter-application bi-directional messaging. This includes not only communication between our central services, but also communication between our central services and our custom agents running on every node under management. While Riemann and its event-generating clients traditionally communicate over TCP or UDP sockets, we’ve opted to proxy Riemann communications over RabbitMQ. Clients publish messages containing raw Riemann Protobuf data (typically a riemann.proto Msg structure) to RabbitMQ, which is then forwarded directly to Riemann’s TCP socket by a proxy application at the receiving end.
Proxying gives us a huge amount of flexibility over Riemann’s stock TCP sockets. We route all Riemann communications via a RabbitMQ exchange, which allows us to distribute messages across multiple Riemann instances, grouped by cluster ID. We apply per-message TTL/expiry to the queues to drop old data if processing isn’t keeping up — we don’t care about 10-minute-old data! We get the added security benefits of RabbitMQ’s protocol, including built-in TLS support and authentication. Clients no longer have a direct dependency on Riemann in the form of a TCP socket — instead they connect to our RabbitMQ cluster and the RabbitMQ client library handles automatic fail-over and connection recovery.
RabbitMQ has many features that reduce the work required to build fault-tolerant and scalable applications. The Consistent Hash Exchange plugin, bundled with the standard RabbitMQ distribution (but not enabled by default), is one such feature. The plugin adds a new exchange type, x-consistent-hash, which routes messages to its bound queues based on a consistent hash of each message’s routing key. Queues can be bound to, or unbound from, the exchange at any time (deliberately, or through the death of a client in the case of non-persistent queues) and messages will be re-routed accordingly to other bound queues.
We use a combination of a Consistent Hash Exchange and a per-message routing key equal to a node’s parent cluster ID to ensure that messages from all nodes in a cluster are routed to the same Riemann instance, allowing the aforementioned stateful index and stream functions to work appropriately, and thus permitting our per-node and per-cluster monitoring rules to function.
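The sketch below shows the moving parts, using the RabbitMQ Java client from Clojure. The monitoring exchange name matches the one described later in this post; the hostname, queue name, binding weight and payload are illustrative.

```clojure
(import '(com.rabbitmq.client ConnectionFactory))

(let [conn (.newConnection (doto (ConnectionFactory.)
                             (.setHost "rabbitmq.example.com")))
      ch   (.createChannel conn)]
  ;; durable consistent-hash exchange (the plugin must be enabled first)
  (.exchangeDeclare ch "monitoring" "x-consistent-hash" true)

  ;; for an x-consistent-hash exchange the *binding* key is a weight, not a pattern
  (.queueDeclare ch "riemann-instance-1" true false false nil)
  (.queueBind ch "riemann-instance-1" "monitoring" "10")

  ;; publish with the node's parent cluster ID as the routing key, so every node in
  ;; a cluster hashes to the same queue (and hence the same Riemann instance);
  ;; the empty body stands in for a serialised Riemann Msg
  (.basicPublish ch "monitoring" "cluster-id-goes-here" nil (byte-array 0)))
```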
Watchdog Timers
Watchdog timers are a special case. The common pattern for implementing watchdog timers, as outlined in the Riemann how-to for detecting down services, is as follows (a sketch appears after the list):
- Have the service under watch send a heartbeat event to Riemann periodically, every n seconds. 3
- On receipt of this event, insert it into the Riemann index with an appropriate TTL — the chosen TTL is often a multiple of the heartbeat interval, i.e. n⋅m allows for m missed heartbeats before the event expires.
- On expiry of the heartbeat event in the Riemann index, actions may be performed to react to the watchdog timeout for the service (such as sending an email or a PagerDuty alert).
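A minimal sketch of that pattern, assuming a 30-second heartbeat (n = 30), a tolerance of three missed beats (m = 3), and an illustrative service name:

```clojure
(periodically-expire 10)

(let [index (index)]
  (streams
    ;; arm/reset the watchdog: index each heartbeat with a TTL of n * m = 90 s
    (where (service "cassandra/heartbeat")
      (with :ttl 90 index))

    ;; the heartbeat expires once m consecutive beats have been missed
    (expired
      (where (service "cassandra/heartbeat")
        #(warn "watchdog timeout for" (:host %))))))
```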
One flaw with this strategy is that it requires the service to send an initial heartbeat to initialise, or “arm”, the watchdog timer — and the event has to be inserted into the index for the timeout action to execute. Riemann will never know about the service if it is offline when Riemann starts.
This common approach also fails in a high-availability scenario, where events may be arbitrarily re-routed to an alternate Riemann instance. While the Consistent Hash Exchange tries to consistently route messages to the same queue, a queue topology change may occasionally cause messages for a number of hosts to be re-routed to the queue of a different Riemann instance (the addition of a queue has less impact than the removal of an existing one, such as in the event of a Riemann instance failure). Riemann isn’t natively aware of these topology changes, hence the Riemann instance that was previously responsible for the re-routed hosts may still have watchdog timers running; it will never receive heartbeats for those hosts, so those timers will eventually expire, leading to spurious and invalid timeout actions being executed.
Instaclustr stores the details of every cluster, and its constituent data centres and nodes, in a central Postgres database. At any moment we can answer the question “What nodes should be running?”. We use this information to register running nodes with Riemann. We periodically inform each running Riemann instance of the hosts it is responsible for via registration broadcasts: for each running node, a Riemann message containing a registration event is published to the Consistent Hash Exchange, with a routing key equal to the node’s parent cluster ID. These events are routed to the Riemann instances responsible (or “chosen” by the exchange to be responsible) and, upon receipt, indexed with a reasonably short TTL.
If a registration event for a host expires, all events in the index for that host are expunged (removed silently; not marked as expired; not re-injected into streams for further processing). Expiry of registration events occurs when either the routing topology of the Consistent Hash Exchange changes or the node is no longer considered to be running (such as when the owning user deletes it).
The receipt of a registration event for a host starts a presence watchdog timer by inserting into the index a presence event for the host, with a TTL equal to a multiple of the registration event TTL (to ensure that, if the node becomes unregistered, it is expunged before the presence event expires). Hosts are expected to periodically send presence events to reset this watchdog timer; otherwise timeout actions will be triggered (such as a PagerDuty alert).
We use the indexed registration event as a filter to prevent spurious alerts during fail-over — if Riemann receives an event for a host that isn’t registered (i.e. a registration event for the host isn’t present in the index) then the event is silently dropped.
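A sketch of that filter, querying the index from within a stream; the "node/registered" service name and the helper are illustrative rather than our actual configuration:

```clojure
;; True when an unexpired registration event for the event's host is indexed.
(defn registered? [event]
  (riemann.index/lookup (:index @core) (:host event) "node/registered"))

(let [index (index)]
  (streams
    ;; let registration events through to be indexed; silently drop anything else
    ;; arriving for a host with no current registration
    (where (or (service "node/registered") (registered? event))
      index)))
```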
Registration and presence events solve the watchdog timer problem, and let every Riemann instance know which nodes it is responsible for.
Maintenance Mode
Support for maintenance mode events also required special consideration. Maintenance events are flags or indicators sent to Riemann to temporarily disable the actions that would otherwise result from detected issues, such as sending PagerDuty alerts, in situations where those issues are expected or already known. For example, if Cassandra is purposefully offline on an instance for maintenance, the on-call support staff need not be continuously paged about an offline Cassandra service — which is not only very annoying, but significantly increases noise and hence the likelihood that an actual failure goes unnoticed.
Riemann’s how-to on querying the index from within a stream gives a sample implementation of maintenance mode, which boils down to the following (a sketch follows the list):
- Send an event to Riemann that represents maintenance mode, with a TTL equal to the maintenance window (e.g., 1 hour).
- Index this event upon receipt.
- Before actioning detected issues, query the index for a maintenance mode event, and, if present and un-expired, prevent any further action.
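The sketch below follows the how-to’s pattern; the service name is a placeholder:

```clojure
;; True when an unexpired maintenance-mode event for this host is indexed.
(defn in-maintenance? [event]
  (->> (list 'and
             (list '= 'host (:host event))
             '(= service "maintenance-mode"))
       (riemann.index/search (:index @core))
       first))

(streams
  (where (and (state "critical") (not (in-maintenance? event)))
    ;; stand-in for whatever alerting stream is in use (email, PagerDuty, ...)
    #(info "alerting:" %)))
```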
Our Riemann configuration has extensive, fine-grained maintenance mode support based on a variant of this pattern. We can silence all or specific alerts for a particular node or cluster, or silence a specific alert fleet-wide. The latter is useful for silencing newly deployed, misbehaving rules. We have opted not to have the ability to silence all alerts for all nodes, which could be rather dangerous!
The approach detailed in the Riemann how-to has a downside: maintenance mode events are persisted only in the Riemann index, which isn’t fault tolerant. A crash or restart of Riemann erases all active maintenance events, which, after restart, results in a flood of alerts about already known, under-investigation problems. This is far from ideal, especially in a fault-tolerant system, where failure should be unnoticeable.
Riemann’s index is also not distributed, so running multiple copies of Riemann, each with its own list of maintenance events, would introduce the complexity of keeping every list in sync, lest inconsistencies cause spurious and possibly confusing alerts. In addition, trying to derive the set of active maintenance events across all instances would be complex and error-prone, and might produce confusing and less-than-useful information when Riemann instances hold out-of-sync and potentially conflicting state.
For our distributed Riemann implementation, we opted to store an authoritative list of maintenance events in our central Postgres database and broadcast this list periodically to every running Riemann instance. This solves both the lack of persistence and the lack of a canonical list to query against.
We have our own command-line tool that makes it simple for our support staff to manage maintenance mode events. New events can be created and the set of active, unexpired events can be queried. Initially this tool connected directly to Riemann via TCP to send new events and query for active events; for distributed Riemann it now just inserts (or updates) records in our Postgres database.
The Internals
Our distributed Riemann implementation utilises the following components, a number of which were already present in our architecture:
- Riemann itself, for the event processing.
- RabbitMQ, for fault-tolerant, distributed message routing.
- Postgres, for node/cluster details and maintenance rule persistence.
- node-agent, our own custom metrics collection agent running on every node.
- monitoringd, our Riemann “helper”, because we didn’t want to write everything in Clojure 4.
On the RabbitMQ side, we have a pre-defined, persistent, x-consistent-hash exchange, named monitoring. This only needs to be created and configured once.
Every Riemann instance runs two processes: Riemann itself and our custom monitoringd, which acts as a RabbitMQ ⟷ Riemann proxy. monitoringd binds a queue to the monitoring exchange and forwards each incoming message containing a Riemann Msg Protobuf struct to the local Riemann instance via a TCP socket, acknowledging receipt to RabbitMQ once Riemann has confirmed successful delivery. Manual acknowledgement, combined with prefetch, allows RabbitMQ to handle flow control. We have per-queue TTLs defined, so that if processing is delayed, older, less relevant messages are dropped in favour of fresh messages.
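A sketch of that consumer loop, again using the RabbitMQ Java client from Clojure; the queue name, TTL, prefetch count and the forward-to-riemann helper are illustrative assumptions, not monitoringd’s actual code:

```clojure
(import '(com.rabbitmq.client ConnectionFactory DefaultConsumer))

(defn forward-to-riemann
  "Hypothetical helper: write the raw Msg bytes to the local Riemann TCP socket
  and block until Riemann acknowledges them."
  [^bytes msg-bytes]
  ;; elided for brevity
  )

(let [ch (.createChannel (.newConnection (doto (ConnectionFactory.)
                                           (.setHost "rabbitmq.example.com"))))]
  ;; per-queue TTL: drop messages older than 10 minutes if we fall behind
  (.queueDeclare ch "riemann-instance-1" true false false
                 {"x-message-ttl" (int 600000)})
  (.queueBind ch "riemann-instance-1" "monitoring" "10")
  (.basicQos ch 100)                       ; prefetch lets RabbitMQ handle flow control
  (.basicConsume ch "riemann-instance-1" false
    (proxy [DefaultConsumer] [ch]
      (handleDelivery [tag env props body]
        (forward-to-riemann body)          ; only ack once Riemann has confirmed receipt
        (.basicAck ch (.getDeliveryTag env) false)))))
```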
Registration refresh: Our Postgres database contains the details of all nodes and their parent clusters. Every instance of monitoringd periodically queries Postgres for the details of every running node, including its parent cluster ID. The details of every node are forwarded to Riemann indirectly; they are published via RabbitMQ to the monitoring exchange as registration events, with a routing key equal to the node’s cluster ID. Hence, node registration events will be delivered to the Riemann instance responsible for monitoring that node.
Registration events are indexed by Riemann with a short TTL equal to a multiple of the monitoringd registration refresh interval (this ensures that node registrations don’t expire if message delivery is briefly interrupted). If a registration event does expire, the Riemann index is expunged of all events for that node, preventing spurious alerts caused by future expiring events for that node (such as service watchdog timers). If an event for a node, other than a registration, is received by Riemann and no registration event for that node is present in the index, the incoming event is silently dropped 5.
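A sketch of the expunge-on-expiry rule, using the index protocol’s search and delete functions; the service name is again illustrative:

```clojure
(expired
  (where (service "node/registered")
    (fn [reg]
      ;; remove every indexed event for the now-unregistered host, silently —
      ;; deleted events are never marked expired and are not re-injected
      (doseq [ev (riemann.index/search (:index @core)
                                       (list '= 'host (:host reg)))]
        (riemann.index/delete (:index @core) ev)))))
```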
Metrics collection: Every Instaclustr-managed node runs node-agent, a custom metrics collection agent. Every 20 seconds or so it collects metrics, in the form of Riemann Event Protobuf structs, from a number of sources, ranging from system-level metrics (via /proc/) to Cassandra itself (via JMX). The collected events are bundled into a Riemann Msg struct and serialised to binary using the Riemann Java client Protobuf interface. This binary blob is then published as a RabbitMQ message to the monitoring exchange, with the node’s cluster ID as the message routing key, hence it will be consistently routed to the same Riemann instance for processing (unless the exchange topology changes, e.g. due to Riemann instance fail-over).
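For illustration, serialising a single event with the Riemann client’s Protobuf classes looks roughly like this — the package name varies between client versions, and the helper and its parameters are placeholders:

```clojure
(import '(io.riemann.riemann Proto$Event Proto$Msg))

(defn event-msg-bytes
  "Build a one-event Riemann Msg and serialise it to bytes, ready to be published
  to the monitoring exchange with the node's cluster ID as the routing key."
  ^bytes [^String host ^String service metric]
  (let [event (.. (Proto$Event/newBuilder)
                  (setHost host)
                  (setService service)
                  (setMetricD (double metric))
                  (setTtl (float 60))
                  build)]
    (.toByteArray (.. (Proto$Msg/newBuilder)
                      (addEvents event)
                      build))))
```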
When a node is newly registered, a presence watchdog timer is started with a timeout equal to a multiple of the registration refresh interval. This ensures that, if the node becomes unregistered, it is expunged before the presence event expires. Included in every set of published events is a presence event, which resets the presence watchdog timer. If this watchdog timer expires, we know that a node failed to send metrics and advertise its presence, and requires further investigation.
Maintenance rule refresh: Our Postgres database contains a list of maintenance events and their associated expiry timestamps. Every instance of monitoringd periodically queries Postgres for the list of active maintenance events (i.e., those whose expiry timestamp is later than now). Each maintenance event is sent directly (using the Riemann Java TCP client; not via RabbitMQ) to the local Riemann instance and indexed with a short TTL 6 equal to a multiple of the maintenance rule refresh period.
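A sketch of that direct path with the Riemann Java client (the package name varies by client version; the naming and TTL are illustrative):

```clojure
(import '(io.riemann.riemann.client RiemannClient)
        '(java.util.concurrent TimeUnit))

(defn send-maintenance!
  "Push a single maintenance event straight to the local Riemann TCP socket."
  [^String host ^String alert ttl-secs]
  (with-open [client (doto (RiemannClient/tcp "127.0.0.1" 5555) (.connect))]
    (-> (.event client)
        (.host host)
        (.service (str "maintenance/" alert))   ; illustrative naming scheme
        (.state "active")
        (.ttl (float ttl-secs))
        (.send)
        (.deref 5000 TimeUnit/MILLISECONDS))))
```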
While we run Postgres in an HA configuration, we opted to decouple our monitoring system from possible Postgres outages. monitoringd caches the results of every query it executes against Postgres (such as all nodes or all maintenance events) and, if database connectivity issues arise, operates on cached data until connectivity is restored.
Footnotes
1 proc (5), https://linux.die.net/man/5/proc ↩
2 https://github.com/riemann/riemann/blob/0.2.10/src/riemann/streams.clj#L1060 ↩
3 Sometimes this is called “kicking the dog”. https://en.wikipedia.org/wiki/Watchdog_timer#Watchdog_restart ↩
4 Our engineering team has a strong background in developing Java applications. Riemann and its configuration files are written in Clojure. Initially, Clojure was a complete mystery to a number of us and hence when we needed to extend the functionality of Riemann past that of defining monitoring rules/streams, it was easier to consider Riemann as a black-box and instead build a helper application that would run alongside Riemann. As Clojure is a language that targets the JVM, and has extensive Java ⟷ Clojure inter-op support, it’s most certainly possible to combine Riemann and monitoringd together. ↩
5 Who monitors the monitoring system? We keep a counter of such events and expose it via Riemann’s built-in instrumentation service, which we monitor with DataDog. ↩
6 Rows are never deleted from the maintenance events table (for audit purposes), but they may be updated to change the expiry time (e.g., expiry may be set to now if a maintenance window is to be closed sooner than expected). Using a short TTL and frequent refresh, rather than indexing events with a TTL derived from the expiry timestamp, prevents issues when maintenance events are updated in the database to expire earlier than expected — such events won’t be returned by the next refresh query, and hence would never have their indexed TTL reduced to 0; they would instead keep their previous, longer TTL, resulting in incorrect behaviour. With a short TTL, they simply drop out of the index shortly after the refresh stops returning them. ↩