Chapter 4 - Reliability

In Chapter Three we looked at advanced use of ØMQ's request-reply pattern with worked examples. In this chapter we'll look at the general question of reliability and learn how to build reliable messaging on top of ØMQ's patterns.

Design Discussions

What is "Reliability"?

If you're using tcp, ipc or inproc then messages don't just get lost. It's not as if messages need a GPS to get to their destination. Transports like TCP are explicitly designed to retry and resend, to the point of silliness sometimes. So to understand what reliability means, we have to look at its opposite, namely *failure*. If we can handle a certain set of failures, we are reliable with respect to those failures. No more, no less.

So let's look at the possible causes of failure in a distributed application, in descending order of probability:

  • Application code is the worst offender. It can crash and exit, freeze and stop responding to input, run too slowly for its input, exhaust all memory, etc.
  • System code - like brokers we write using ØMQ - can die. System code should be more reliable than application code but can still crash and burn, and especially run out of memory if it tries to compensate for slow clients.
  • Message queues can overflow, typically in system code that has learned to deal brutally with slow clients. When a queue overflows, it starts to discard messages.
  • Hardware can fail and take with it all the processes running on that box.
  • Networks can fail in exotic ways, e.g. some ports on a switch may die and those parts of the network become inaccessible.
  • Entire data centers can be struck by lightning, earthquakes, fire, or more mundane power or cooling failures.

To make a software system fully reliable against *all* of these possible failures is an enormously difficult and expensive job and goes beyond the scope of this modest guide.

Since the first three or four cases cover 99.9% of real world requirements outside large companies (according to a highly scientific study I just ran), that's what we'll look at. If you're a large company with money to spend on the last two cases, contact me immediately, there's a large hole behind my beach house waiting to be converted into a pool.

The Slow Client Problem

In any high-volume architecture (data or workload distribution), applications need to be able to keep up with incoming data. The problem is that application developers too often don't have the skills to write fast code, or use languages that are inherently slow, or deploy to boxes that can easily run very slowly. Even a fast, well-written client application can appear to run "slowly" if the network is congested, or the application gets temporarily disconnected from the server.

Handling slow clients correctly is delicate. On the one hand, you really don't want to face application developers with an excuse like "sorry, our messaging layer lost your message somewhere". On the other hand, if you allow message queues to build up, especially in publishers that handle many clients, things just break.

ØMQ does two things to handle the slow client problem:

  • It moves messages as rapidly as possible to the client, and queues them there. In all the asynchronous messaging patterns (that is, all except synchronous request-reply), messages are sent to their destination without pause. By itself, this strategy avoids the bulk of queue overflow problems. If the client application runs out of memory, we don't really care.
  • For cases where network congestion or client disconnection stops the sender getting rid of messages, ØMQ offers a "high water mark" that limits the size of a given socket queue. Since each use case has its own needs, ØMQ lets you set this per outgoing socket. When a queue hits the HWM, messages are just dropped. It's brutal, but there is no other sane strategy.
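
Here is a minimal sketch of setting that high water mark on a publisher socket. It assumes libzmq 3.x or later, where the option is ZMQ_SNDHWM (older 2.x releases used a single ZMQ_HWM option); the endpoint and the limit of 1,000 messages are invented for the example.

    #include <zmq.h>

    int main (void)
    {
        void *context = zmq_ctx_new ();
        void *publisher = zmq_socket (context, ZMQ_PUB);

        //  Queue at most 1,000 outgoing messages per subscriber;
        //  beyond that, new messages for that subscriber are dropped
        int hwm = 1000;
        zmq_setsockopt (publisher, ZMQ_SNDHWM, &hwm, sizeof (hwm));

        zmq_bind (publisher, "tcp://*:5556");
        //  ... publish as usual; slow or disconnected subscribers lose
        //  whatever does not fit under the HWM ...
        zmq_close (publisher);
        zmq_ctx_destroy (context);
        return 0;
    }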

Disk-based Reliability

You can, and people do, use spinning rust to store messages. It rather makes a mess of the idea of "performance" but we're usually more comfortable knowing a really important message (such as that transfer of $400M to my Cyprus account) is stored on disk rather than only in memory. Spinning rust only makes sense for some patterns, mainly request-reply. If we get bored in this chapter we'll play with that, but otherwise, just shove really critical messages into a database that all parties can access, and skip ØMQ for those parts of your dialog.

Reliability Patterns

So to make things brutally simple, reliability is "keeping things working properly when code freezes or crashes", a situation we'll shorten to "dies". However the things we want to keep working properly are more complex than just messages. We need to take each ØMQ messaging pattern and see how to make it work (if we can) even when code dies.

Let's take them one by one:

  • Request-reply: if the server dies (while processing a request), the client can figure that out since it won't get an answer back. Then it can give up in a huff, wait and try again later, find another server, etc. As for the client dying, we can brush that off as "someone else's problem" for now.
  • Publish-subscribe: if the client dies (having gotten some data), the server doesn't know about it. Pubsub doesn't send any information back from client to server. But the client can contact the server out-of-band, e.g. via request-reply, and ask, "please resend everything I missed". As for the server dying, that's out of scope for here.
  • Pipeline: if a worker dies (while working), the ventilator doesn't know about it. Pipelines, like pubsub, and the grinding gears of time, only work in one direction. But the downstream collector can detect that one task didn't get done, and send a message back to the ventilator saying, "hey, resend task 324!" If the ventilator or collector dies, then whatever upstream client originally sent the work batch can get tired of waiting and resend the whole lot. It's not elegant, but system code should really not die often enough to matter.

Reliable Request-Reply - the Pirate Pattern

The basic request-reply pattern (a REQ socket talking to a REP socket) fails miserably if the server or network dies, so it's basically unreliable. By default the REQ socket blocks on a receive, meaning the client just waits forever. If there's a problem down the line, too bad. I'd probably only use the basic request-reply pattern between two threads in the same process where there's no network or separate server process to die.

However, with a little extra work it becomes a good basis for real work across a distributed network, and we get a reliable pattern I like to call the "Pirate" pattern. RRR! The minimum Pirate sits just in the client, which needs to:

  • Poll the REQ socket and only receive from it when it's sure a reply has arrived.
  • Resend the request several times if no reply arrives within a timeout period.
  • Abandon the transaction if, after several retries, there is still no reply.
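
Here's a rough sketch of that client-side logic, assuming libzmq 3.x or later; the endpoint, timeout, and retry count are made-up examples. One wrinkle: a REQ socket insists on a strict send/receive cycle, so after a timeout the sketch closes the socket and opens a fresh one before resending.

    #include <zmq.h>
    #include <stdio.h>

    #define REQUEST_TIMEOUT  2500   //  msecs to wait for a reply
    #define REQUEST_RETRIES  3      //  before abandoning the transaction

    //  Helper: (re)create and connect the REQ socket
    static void *
    s_client_socket (void *context)
    {
        void *client = zmq_socket (context, ZMQ_REQ);
        zmq_connect (client, "tcp://localhost:5555");
        int linger = 0;             //  don't hang on close with unsent requests
        zmq_setsockopt (client, ZMQ_LINGER, &linger, sizeof (linger));
        return client;
    }

    int main (void)
    {
        void *context = zmq_ctx_new ();
        void *client = s_client_socket (context);

        int retries_left = REQUEST_RETRIES;
        while (retries_left) {
            zmq_send (client, "HELLO", 5, 0);

            //  Poll, and only receive once a reply has actually arrived
            zmq_pollitem_t items [] = { { client, 0, ZMQ_POLLIN, 0 } };
            zmq_poll (items, 1, REQUEST_TIMEOUT);

            if (items [0].revents & ZMQ_POLLIN) {
                char reply [256];
                if (zmq_recv (client, reply, sizeof (reply), 0) != -1) {
                    printf ("Server replied OK\n");
                    break;          //  success
                }
            }
            //  No reply within the timeout: assume the server died. A REQ
            //  socket can't just resend, so close it and open a fresh one.
            zmq_close (client);
            client = s_client_socket (context);
            if (--retries_left == 0)
                printf ("Server seems to be offline, abandoning\n");
        }
        zmq_close (client);
        zmq_ctx_destroy (context);
        return 0;
    }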

Rather than rely on a single server, which may die and be unavailable for a long period, Pirate works better with a pool of servers. A good model is live-live redundant, i.e. a client can send a request to any server. If one server dies, the others take over. In practice that usually means servers need some common storage, e.g. shared access to a single database. To work with a pool of servers, we need to:

  • Know a list of servers to connect to, rather than a single server. This makes configuration and management more work, but there are answers for that.
  • Choose a server to send a request to. The least-recently-used routing from Chapter 3 is suitable here; it means we won't send a request to a server that is not up and running.
  • Detect a server that dies while processing a request. We do this with a timeout: if the server does not reply within a certain time, we treat it as dead.
  • Track dead servers so we don't send requests to them (again). A simple list will work.

With a pool of servers, we can make the client's retry strategy smarter:

  • If there is just one server in the pool, the client waits with a timeout for the server to reply. If the server does not reply within the timeout, the client will retry a number of times before abandoning.
  • If there are multiple servers in the pool, the client tries each server in succession, but does not retry the same server twice.
  • If a server appears to be really dead (i.e. has not responded for some time), the client removes it from its pool.
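
The bookkeeping behind that strategy can be sketched without any socket code at all: a little table of servers, each flagged alive or dead, and a cursor that walks the live ones in turn (round-robin here, as a rough stand-in for the least-recently-used routing mentioned above). All names and thresholds are illustrative.

    #include <stdio.h>
    #include <string.h>

    #define MAX_SERVERS  10

    typedef struct {
        char endpoint [256];        //  e.g. "tcp://server1:5555"
        int  alive;                 //  0 = considered dead, 1 = usable
        int  failures;              //  consecutive timeouts seen
    } server_t;

    typedef struct {
        server_t servers [MAX_SERVERS];
        int size;
        int current;                //  index of the next server to try
    } server_pool_t;

    //  Return index of the next live server, or -1 if none are left
    static int
    pool_next (server_pool_t *pool)
    {
        for (int checked = 0; checked < pool->size; checked++) {
            int index = (pool->current + checked) % pool->size;
            if (pool->servers [index].alive) {
                pool->current = (index + 1) % pool->size;
                return index;
            }
        }
        return -1;
    }

    //  Call this when a request to a server times out
    //  (a real client would reset 'failures' after a successful reply)
    static void
    pool_mark_failed (server_pool_t *pool, int index)
    {
        if (++pool->servers [index].failures >= 3)
            pool->servers [index].alive = 0;    //  stop sending to it
    }

    int main (void)
    {
        server_pool_t pool = { .size = 2, .current = 0 };
        strcpy (pool.servers [0].endpoint, "tcp://server1:5555");
        strcpy (pool.servers [1].endpoint, "tcp://server2:5555");
        pool.servers [0].alive = 1;
        pool.servers [1].alive = 1;

        int index = pool_next (&pool);
        printf ("Would send request to %s\n", pool.servers [index].endpoint);
        pool_mark_failed (&pool, index);        //  pretend it timed out
        return 0;
    }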

While many Pirate use cases are *idempotent* (i.e. executing the same request more than once is safe), some are not. Examples of an idempotent Pirate include:

  • Stateless task distribution, i.e. a collapsed pipeline where the client is both ventilator and sink, and the servers are stateless workers that compute a reply based purely on the state provided by a request. In such a case it's safe (though inefficient) to execute the same request many times.
  • A name service that translates logical addresses into endpoints to bind or connect to. In such a case it's safe to make the same lookup request many times.

And here are examples of a non-idempotent Pirate pattern:

  • A logging service. One does not want the same log information recorded more than once.
  • Any service that has impact on downstream nodes, e.g. sends on information to other nodes. If that service gets the same request more than once, downstream nodes will get duplicate information.
  • Any service that modifies shared data in some non-idempotent way. E.g. a service that debits a bank account is definitely not idempotent.

When our server is not idempotent, we have to think more carefully about when exactly a server can crash. If it dies when it's idle, or while it's processing a request, that's usually fine. We can use database transactions to make sure a debit and a credit are always done together, if at all. If the server dies while sending its reply, that's a problem, because as far as it's concerned, it's done its work.

If the network dies just as the reply is making its way back to the client, the same situation arises. The client will think the server died, will resend the request, and the server will do the same work twice. Which is not what we want.

We use the standard solution of detecting and rejecting duplicate requests. This means:

  • The client must stamp every request with a unique client identifier and a unique message number.
  • The server, before sending back a reply, stores it using the client id + message number as a key.
  • The server, when getting a request from a given client, first checks if it has a reply for that client id + message number. If so, it does not process the request but just resends the reply.
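
Here's a minimal sketch of that duplicate detection on the server side, using a flat in-memory table; a real server would use a proper hash table and expire old entries. The client id, message numbers, and reply text are invented for the example.

    #include <stdio.h>
    #include <string.h>

    #define MAX_REPLIES  1000

    typedef struct {
        char key [256];             //  "<client id>:<message number>"
        char reply [256];           //  reply we already sent for that key
    } reply_entry_t;

    static reply_entry_t stored_replies [MAX_REPLIES];
    static int stored_count = 0;

    //  Return the stored reply for this request, or NULL if it's new
    static const char *
    find_reply (const char *client_id, int msg_number)
    {
        char key [256];
        snprintf (key, sizeof (key), "%s:%d", client_id, msg_number);
        for (int i = 0; i < stored_count; i++)
            if (strcmp (stored_replies [i].key, key) == 0)
                return stored_replies [i].reply;
        return NULL;
    }

    static void
    store_reply (const char *client_id, int msg_number, const char *reply)
    {
        if (stored_count < MAX_REPLIES) {
            snprintf (stored_replies [stored_count].key,
                sizeof (stored_replies [stored_count].key),
                "%s:%d", client_id, msg_number);
            strcpy (stored_replies [stored_count].reply, reply);
            stored_count++;
        }
    }

    int main (void)
    {
        //  First time we see request 42 from client "A": do the work, store it
        if (!find_reply ("A", 42))
            store_reply ("A", 42, "debit done");

        //  The client resends request 42: don't redo the work, resend the reply
        const char *previous = find_reply ("A", 42);
        printf ("Duplicate request, resending: %s\n", previous);
        return 0;
    }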

The final touch to a robust Pirate pattern is server heartbeating. This means getting the server to say "hello" every so often even when it's not doing any work. The smoothest design is where the client pings the server, which pongs back. We don't need to send heartbeats to a working server, only one that's idle. Knowing when an idle server has died means we don't uselessly send requests to dead servers, which improves response time in those cases.
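
Here's a sketch of the client half of that heartbeat, again assuming libzmq 3.x or later and an invented endpoint: the client only pings once it has been idle for a while, and a missing pong within the timeout marks the server as dead.

    #include <zmq.h>
    #include <stdio.h>
    #include <time.h>

    #define HEARTBEAT_IDLE     5        //  seconds of idleness before we ping
    #define HEARTBEAT_TIMEOUT  1000     //  msecs to wait for the PONG

    int main (void)
    {
        void *context = zmq_ctx_new ();
        void *client = zmq_socket (context, ZMQ_REQ);
        zmq_connect (client, "tcp://localhost:5555");

        time_t last_activity = time (NULL);

        //  In a real client this check sits inside the main work loop;
        //  we show a single pass of it here
        if (time (NULL) - last_activity >= HEARTBEAT_IDLE) {
            zmq_send (client, "PING", 4, 0);
            zmq_pollitem_t items [] = { { client, 0, ZMQ_POLLIN, 0 } };
            zmq_poll (items, 1, HEARTBEAT_TIMEOUT);
            if (items [0].revents & ZMQ_POLLIN) {
                char pong [16];
                zmq_recv (client, pong, sizeof (pong), 0);
                last_activity = time (NULL);    //  server is alive
            }
            else
                printf ("Server looks dead, take it out of the pool\n");
                //  (and, as before, close and reopen the REQ socket)
        }
        zmq_close (client);
        zmq_ctx_destroy (context);
        return 0;
    }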

We'll explore Pirates with running code later. For now, let's continue the design discussion by looking at reliable pubsub.

Reliable Publish-Subscribe - the Clone Pattern

Pubsub is inherently unreliable. In fact it's so easy to lose messages with this pattern that you might wonder why ØMQ bothers to implement it at all. :-) That's a rhetorical question. There are many cases where simplicity and speed are more important than pedantic delivery. In fact this covers perhaps the majority of information distribution in the real world.

However, reliable pubsub is also a useful tool. Let's do as before and define what that 'reliability' means in terms of what can go wrong.

Happens all the time:

  • Subscribers join late, so miss messages the server already sent.
  • Subscriber connections take a non-zero time, and can lose messages during that time.

Happens exceptionally:

  • Subscribers can crash, and restart, and lose whatever data they already received.
  • Subscribers can fetch messages too slowly, so queues build up and then overflow.
  • Networks can become overloaded and drop data (specifically, for PGM).
  • Networks can become too slow, so publisher-side queues overflow.

A lot more can go wrong but these are the typical failures we see in a realistic system. The difficulty in defining 'reliability' now is that we have no idea, at the messaging level, what the application actually does with its data. So we need a generic model that we can implement once, and then use for a wide range of applications.

What we'll design is a simple *shared key-value cache* that stores a set of blobs indexed by unique keys. Don't confuse this with *distributed hash tables*, which solve the wider problem of connecting peers in a distributed network, or with *distributed key-value tables*, which act like non-SQL databases. All we will build is a system that reliably clones some in-memory state from a server to a set of clients. We want to:

  • Let a client join the network at any time, and reliably get the current server state.
  • Let any client update the key-value cache (inserting new key-value pairs, updating existing ones, or deleting them).
  • Reliably propagate changes to all clients, with minimum latency overhead.
  • Handle very large numbers of clients, e.g. tens of thousands or more.

The key aspect of the Clone pattern is that clients talk back to servers, which is more than we do in a simple pub-sub dialog. This is why I use the terms 'server' and 'client' instead of 'publisher' and 'subscriber'. We'll use pubsub as part of the Clone pattern but it is more than that.

When a client joins the network, it subscribes a SUB socket, as we'd expect, to the data stream coming from the server (the publisher). This goes across some pub-sub topology (a multicast bus, perhaps, or a tree of forwarder devices, or direct client-to-server connections).

At some undetermined point, it will start getting messages from the server. Note that we can't predict what the client will receive as its first message. If a zmq_connect(3) call takes 10 msec, and in that time the server has sent 100 messages, the client might start receiving only from around the 100th message, having missed everything before it.

Let's define a message as a key-value pair. The semantics are simple: if the value is provided, it's an insert or update operation. If there is no value, it's a delete operation. The key provides the subscription filter, so clients can treat the cache as a tree, and select whatever branches of the tree they want to hold.
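
One plausible wire format, sketched below, is a two-frame message: the key in its own frame (so it doubles as the subscription filter, and hierarchical keys give you the tree-like branches), and the value in a second frame, with an empty frame meaning delete. This assumes libzmq 3.x or later; the endpoint and key names are invented.

    #include <zmq.h>
    #include <string.h>

    //  Publish one key-value update; value == NULL means "delete this key"
    static void
    send_update (void *publisher, const char *key, const char *value)
    {
        //  Frame 1: the key (also what SUB sockets filter on)
        zmq_send (publisher, key, strlen (key), ZMQ_SNDMORE);
        //  Frame 2: the value, or an empty frame for a delete
        if (value)
            zmq_send (publisher, value, strlen (value), 0);
        else
            zmq_send (publisher, "", 0, 0);
    }

    int main (void)
    {
        void *context = zmq_ctx_new ();
        void *publisher = zmq_socket (context, ZMQ_PUB);
        zmq_bind (publisher, "tcp://*:5556");

        send_update (publisher, "prices/XAU", "1728.50");  //  insert or update
        send_update (publisher, "prices/XAU", NULL);       //  delete

        zmq_close (publisher);
        zmq_ctx_destroy (context);
        return 0;
    }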

The client now connects to the server using a different socket (a REQ socket) and asks for a snapshot of the cache. It tells the server two things: which message it received (which means the server has to number messages), and which branch or branches of the cache it wants. To keep things simple we'll assume that any client has exactly one server that it talks to, and gets its cache from. The server *must* be running; we do not try to solve the question of what happens if the server crashes (that's left as an exercise for you to hurt your brain over).

The server builds a snapshot and sends that to the client's REQ socket. This can take some time, especially if the cache is large. The client continues to receive updates from the server on its SUB socket, which it queues but does not process. We'll assume these updates fit into memory. At some point it gets the snapshot on its REQ socket. It then applies the updates to that snapshot, which gives it a working cache.

You'll perhaps see one difficulty here. If the client asks for a snapshot based on message 100, how does the server provide this? After all, it may have sent out lots of updates in the meantime. We solve this by cheating gracefully. The server just sends its current snapshot, but tells the client what its latest message number is. Say that's 200. The client gets the snapshot, and in its queue, it has messages 100 to 300. It throws out 100 to 200, and starts applying 201 to 300 to the snapshot.
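
The reconciliation rule is just a comparison on sequence numbers, sketched here with the numbers from the example: snapshot at 200, updates 100 to 300 queued, only 201 to 300 applied.

    #include <stdio.h>

    int main (void)
    {
        int snapshot_sequence = 200;    //  sequence the server reported
        int applied = 0;

        //  Updates 100..300 arrived on the SUB socket while we waited
        for (int update = 100; update <= 300; update++) {
            if (update <= snapshot_sequence)
                continue;               //  already included in the snapshot
            //  apply_update (update);  //  hypothetical: apply to the cache
            applied++;
        }
        printf ("Applied %d updates (201..300) on top of snapshot 200\n",
            applied);
        return 0;
    }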

Once the client has happily gotten its cache, it disconnects from the server (destroys that REQ socket), which is not used for anything more.

How does Clone handle updates from clients? There are several options but the simplest seems to be that each client acts as a publisher back to the server, which subscribes. In a TCP network this will mean persistent connections between clients and servers. In a PGM network this will mean using a shared multicast bus that clients write to, and the server listens to.

So the client, at startup, opens a PUB socket and part of its initial request to the server includes the address of that socket, so the server can open a SUB socket and connect back to it.
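
Here's a sketch of the client side of that handshake, assuming libzmq 3.x or later; the endpoints and request text are invented. The client binds its own PUB socket, then tells the server where to connect back as part of its snapshot request.

    #include <zmq.h>
    #include <string.h>

    int main (void)
    {
        void *context = zmq_ctx_new ();

        //  Socket the client will publish its own updates on
        void *updates = zmq_socket (context, ZMQ_PUB);
        zmq_bind (updates, "tcp://*:5557");

        //  Snapshot request also tells the server where to connect back
        //  ("client-host" stands in for whatever address the client is
        //  reachable on)
        void *snapshot = zmq_socket (context, ZMQ_REQ);
        zmq_connect (snapshot, "tcp://localhost:5556");
        const char *request = "SNAPSHOT tcp://client-host:5557";
        zmq_send (snapshot, request, strlen (request), 0);
        //  ... receive the snapshot, then publish updates on 'updates' ...

        zmq_close (snapshot);
        zmq_close (updates);
        zmq_ctx_destroy (context);
        return 0;
    }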

Why don't we allow clients to publish updates directly to other clients? While this would reduce latency, it makes it impossible to sequence messages. Updates *must* pass through the server to make sense to other clients. There's a more subtle second reason. In many applications it's important that updates have a single order, across many clients. Forcing all updates through the server ensures that they have the same order when they finally get to clients.

With unique sequencing, clients can detect the nastier failures - network congestion and queue overflow. If a client discovers that its incoming message stream has a hole, it can take action. It seems sensible that the client contact the server and ask for the missing messages, but in practice that isn't useful. If there are holes, adding more stress to the network will make things worse. All the client can really do is warn its users "Unable to continue", and stop, and not restart until someone has manually checked the cause of the problem.
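
Detecting the hole is simple sequence arithmetic; here's a tiny sketch where the client remembers the last sequence it applied and gives up as soon as an update doesn't follow on.

    #include <stdio.h>
    #include <stdint.h>
    #include <stdlib.h>

    //  Call this for every update the client receives, in arrival order
    static void
    check_sequence (int64_t *last_sequence, int64_t received)
    {
        if (*last_sequence != 0 && received != *last_sequence + 1) {
            fprintf (stderr, "E: gap in update stream (%lld -> %lld), "
                "unable to continue\n",
                (long long) *last_sequence, (long long) received);
            exit (1);       //  stop; a human needs to look at the network
        }
        *last_sequence = received;
    }

    int main (void)
    {
        int64_t last_sequence = 0;
        check_sequence (&last_sequence, 1);
        check_sequence (&last_sequence, 2);
        check_sequence (&last_sequence, 4);     //  missing 3: client stops
        return 0;
    }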

Clone is complex enough in practice that you don't want to implement it directly in your applications. Instead, it makes a good basis for an application server framework, which talks to applications via the key-value table.

(More coming soon…)