Chapter 4 - Reliable Request-Reply Patterns #

Chapter 3 - Advanced Request-Reply Patterns covered advanced uses of ZeroMQ’s request-reply pattern with working examples. This chapter looks at the general question of reliability and builds a set of reliable messaging patterns on top of ZeroMQ’s core request-reply pattern.

In this chapter, we focus heavily on user-space request-reply patterns, reusable models that help you design your own ZeroMQ architectures:

  • The Lazy Pirate pattern: reliable request-reply from the client side
  • The Simple Pirate pattern: reliable request-reply using load balancing
  • The Paranoid Pirate pattern: reliable request-reply with heartbeating
  • The Majordomo pattern: service-oriented reliable queuing
  • The Titanic pattern: disk-based/disconnected reliable queuing
  • The Binary Star pattern: primary-backup server failover
  • The Freelance pattern: brokerless reliable request-reply

What is “Reliability”? #

Most people who speak of “reliability” don’t really know what they mean. We can only define reliability in terms of failure. That is, if we can handle a certain set of well-defined and understood failures, then we are reliable with respect to those failures. No more, no less. So let’s look at the possible causes of failure in a distributed ZeroMQ application, in roughly descending order of probability:

  • Application code is the worst offender. It can crash and exit, freeze and stop responding to input, run too slowly for its input, exhaust all memory, and so on.

  • System code–such as brokers we write using ZeroMQ–can die for the same reasons as application code. System code should be more reliable than application code, but it can still crash and burn, and especially run out of memory if it tries to queue messages for slow clients.

  • Message queues can overflow, typically in system code that has learned to deal brutally with slow clients. When a queue overflows, it starts to discard messages. So we get “lost” messages.

  • Networks can fail (e.g., WiFi gets switched off or goes out of range). ZeroMQ will automatically reconnect in such cases, but in the meantime, messages may get lost.

  • Hardware can fail and take with it all the processes running on that box.

  • Networks can fail in exotic ways, e.g., some ports on a switch may die and those parts of the network become inaccessible.

  • Entire data centers can be struck by lightning, earthquakes, fire, or more mundane power or cooling failures.

To make a software system fully reliable against all of these possible failures is an enormously difficult and expensive job and goes beyond the scope of this book.

Because the first five cases in the above list cover 99.9% of real world requirements outside large companies (according to a highly scientific study I just ran, which also told me that 78% of statistics are made up on the spot, and moreover never to trust a statistic that we didn’t falsify ourselves), that’s what we’ll examine. If you’re a large company with money to spend on the last two cases, contact my company immediately! There’s a large hole behind my beach house waiting to be converted into an executive swimming pool.

Designing Reliability #

So to make things brutally simple, reliability is “keeping things working properly when code freezes or crashes”, a situation we’ll shorten to “dies”. However, the things we want to keep working properly are more complex than just messages. We need to take each core ZeroMQ messaging pattern and see how to make it work (if we can) even when code dies.

Let’s take them one-by-one:

  • Request-reply: if the server dies (while processing a request), the client can figure that out because it won’t get an answer back. Then it can give up in a huff, wait and try again later, find another server, and so on. As for the client dying, we can brush that off as “someone else’s problem” for now.

  • Pub-sub: if the client dies (having gotten some data), the server doesn’t know about it. Pub-sub doesn’t send any information back from client to server. But the client can contact the server out-of-band, e.g., via request-reply, and ask, “please resend everything I missed”. As for the server dying, that’s out of scope here. Subscribers can also self-verify that they’re not running too slowly, and take action (e.g., warn the operator and die) if they are.

  • Pipeline: if a worker dies (while working), the ventilator doesn’t know about it. Pipelines, like the grinding gears of time, only work in one direction. But the downstream collector can detect that one task didn’t get done, and send a message back to the ventilator saying, “hey, resend task 324!” If the ventilator or collector dies, whatever upstream client originally sent the work batch can get tired of waiting and resend the whole lot. It’s not elegant, but system code should really not die often enough to matter.

In this chapter we’ll focus just on request-reply, which is the low-hanging fruit of reliable messaging.

The basic request-reply pattern (a REQ client socket doing a blocking send/receive to a REP server socket) scores low on handling the most common types of failure. If the server crashes while processing the request, the client just hangs forever. If the network loses the request or the reply, the client hangs forever.

Request-reply is still much better than TCP, thanks to ZeroMQ’s ability to reconnect peers silently, to load balance messages, and so on. But it’s still not good enough for real work. The only case where you can really trust the basic request-reply pattern is between two threads in the same process where there’s no network or separate server process to die.

However, with a little extra work, this humble pattern becomes a good basis for real work across a distributed network, and we get a set of reliable request-reply (RRR) patterns that I like to call the Pirate patterns (you’ll eventually get the joke, I hope).

There are, in my experience, roughly three ways to connect clients to servers. Each needs a specific approach to reliability:

  • Multiple clients talking directly to a single server. Use case: a single well-known server to which clients need to talk. Types of failure we aim to handle: server crashes and restarts, and network disconnects.

  • Multiple clients talking to a broker proxy that distributes work to multiple workers. Use case: service-oriented transaction processing. Types of failure we aim to handle: worker crashes and restarts, worker busy looping, worker overload, queue crashes and restarts, and network disconnects.

  • Multiple clients talking to multiple servers with no intermediary proxies. Use case: distributed services such as name resolution. Types of failure we aim to handle: service crashes and restarts, service busy looping, service overload, and network disconnects.

Each of these approaches has its trade-offs and often you’ll mix them. We’ll look at all three in detail.

Client-Side Reliability (Lazy Pirate Pattern) #

We can get very simple reliable request-reply with some changes to the client. We call this the Lazy Pirate pattern. Rather than doing a blocking receive, we:

  • Poll the REQ socket and receive from it only when we’re sure a reply has arrived.
  • Resend the request if no reply has arrived within a timeout period.
  • Abandon the transaction if there is still no reply after several requests.
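
The three steps above can be sketched as a small retry loop. This is a minimal illustration in plain C, not the guide's own client example: `try_request_fn`, `lazy_pirate_request`, and the simulated `flaky_server` are hypothetical names, and `REQUEST_RETRIES` is an assumed budget. The transport is reduced to a callback that reports whether a reply arrived within the timeout. In a real client, that callback would send on the REQ socket, poll it, and close and reopen the socket after a timeout.

```c
#include <assert.h>
#include <stdio.h>

#define REQUEST_RETRIES 3   //  Assumed retry budget before abandoning

//  Hypothetical transport hook: sends one request and waits up to a
//  timeout; returns 1 if a reply arrived, 0 if the wait timed out.
typedef int (try_request_fn) (int sequence, void *state);

//  Returns the number of attempts used, or -1 if we abandoned
int
lazy_pirate_request (try_request_fn *try_request, int sequence, void *state)
{
    assert (try_request);
    int attempt;
    for (attempt = 1; attempt <= REQUEST_RETRIES; attempt++) {
        if (try_request (sequence, state))
            return attempt;             //  Reply arrived in time
        if (attempt < REQUEST_RETRIES)
            printf ("W: no response from server, retrying...\n");
    }
    printf ("E: server seems to be offline, abandoning\n");
    return -1;
}

//  Simulated server for illustration: ignores the next *state requests
int
flaky_server (int sequence, void *state)
{
    int *failures_left = (int *) state;
    (void) sequence;
    if (*failures_left > 0) {
        (*failures_left)--;
        return 0;
    }
    return 1;
}
```

Calling `lazy_pirate_request (flaky_server, 1, &failures)` with `failures` set to 2 succeeds on the third attempt; with a server that never answers, the call gives up after `REQUEST_RETRIES` attempts.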

If you try to use a REQ socket in anything other than a strict send/receive fashion, you’ll get an error (technically, the REQ socket implements a small finite-state machine to enforce the send/receive ping-pong, and so the error code is called “EFSM”). This is slightly annoying when we want to use REQ in a pirate pattern, because we may send several requests before getting a reply.
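
To see why a second send fails, here is a toy model of that send/receive lockstep. It is only a sketch: `req_model_t` and its functions are hypothetical names and not part of libzmq, which signals the same situation by failing the call with `errno` set to `EFSM`.

```c
#include <assert.h>
#include <stdbool.h>

//  Toy model of the REQ lockstep; not part of the ZeroMQ API
typedef struct {
    bool awaiting_reply;    //  True between a send and its matching recv
} req_model_t;

//  Returns 0 on success, -1 if the call is illegal in the current state
int
req_model_send (req_model_t *self)
{
    assert (self);
    if (self->awaiting_reply)
        return -1;          //  Second send without a recv: the "EFSM" case
    self->awaiting_reply = true;
    return 0;
}

int
req_model_recv (req_model_t *self)
{
    assert (self);
    if (!self->awaiting_reply)
        return -1;          //  Recv with no request outstanding
    self->awaiting_reply = false;
    return 0;
}

//  Closing and reopening the socket amounts to resetting the state
void
req_model_reopen (req_model_t *self)
{
    assert (self);
    self->awaiting_reply = false;
}
```

The model makes the cost of a retry visible: after a timeout the socket is still waiting for a reply, so the only way to send again is to start over with a fresh state.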

The pretty good brute force solution is to close and reopen the REQ socket after an error:

Run this together with the matching server:

Figure 47 - The Lazy Pirate Pattern

To run this test case, start the client and the server in two console windows. The server will randomly misbehave after a few messages. You can check the client’s response. Here is typical output from the server:

I: normal request (1)
I: normal request (2)
I: normal request (3)
I: simulating CPU overload
I: normal request (4)
I: simulating a crash

And here is the client’s response:

I: connecting to server...
I: server replied OK (1)
I: server replied OK (2)
I: server replied OK (3)
W: no response from server, retrying...
I: connecting to server...
W: no response from server, retrying...
I: connecting to server...
E: server seems to be offline, abandoning

The client sequences each message and checks that replies come back exactly in order: that no requests or replies are lost, and no replies come back more than once, or out of order. Run the test a few times until you’re convinced that this mechanism actually works. You don’t need sequence numbers in a production application; they just help us trust our design.
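
A check of this kind might look like the following sketch. It assumes the server echoes the request's sequence number back as decimal text; `reply_matches` is a hypothetical helper name, not taken from the example code.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdlib.h>

//  Returns true if the reply text carries the sequence number we sent.
//  Assumes the server echoes the request's sequence number as decimal text.
bool
reply_matches (int sequence, const char *reply)
{
    assert (reply);
    char *end = NULL;
    long value = strtol (reply, &end, 10);
    //  Reject empty or non-numeric replies as well as wrong numbers
    return end != reply && *end == '\0' && value == (long) sequence;
}
```

A client that sees `reply_matches` return false knows it received a stale, duplicated, or malformed reply rather than the answer to its current request.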

The client uses a REQ socket, and does the brute force close/reopen because REQ sockets impose that strict send/receive cycle. You might be tempted to use a DEALER instead, but it would not be a good decision. First, it would mean emulating the secret sauce that REQ does with envelopes (if you’ve forgotten what that is, it’s a good sign you don’t want to have to do it). Second, it would mean potentially getting back replies that you didn’t expect.

Handling failures only at the client works when we have a set of clients talking to a single server. It can handle a server crash, but only if recovery means restarting that same server. If there’s a permanent error, such as a dead power supply on the server hardware, this approach won’t work. Because the application code in servers is usually the biggest source of failures in any architecture, depending on a single server is not a great idea.

So, pros and cons:

  • Pro: simple to understand and implement.
  • Pro: works easily with existing client and server application code.
  • Pro: ZeroMQ automatically retries the actual reconnection until it works.
  • Con: doesn’t failover to backup or alternate servers.

Basic Reliable Queuing (Simple Pirate Pattern) #

Our second approach extends the Lazy Pirate pattern with a queue proxy that lets us talk, transparently, to multiple servers, which we can more accurately call “workers”. We’ll develop this in stages, starting with a minimal working model, the Simple Pirate pattern.

In all these Pirate patterns, workers are stateless. If the application requires some shared state, such as a shared database, we don’t know about it as we design our messaging framework. Having a queue proxy means workers can come and go without clients knowing anything about it. If one worker dies, another takes over. This is a nice, simple topology with only one real weakness, namely the central queue itself, which can become a problem to manage, and a single point of failure.

Figure 48 - The Simple Pirate Pattern

The basis for the queue proxy is the load balancing broker from Chapter 3 - Advanced Request-Reply Patterns. What is the very minimum we need to do to handle dead or blocked workers? Turns out, it’s surprisingly little. We already have a retry mechanism in the client. So using the load balancing pattern will work pretty well. This fits with ZeroMQ’s philosophy that we can extend a peer-to-peer pattern like request-reply by plugging naive proxies in the middle.
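
The heart of that load balancing idea is a FIFO of workers that have signaled ready. Here is a minimal sketch of just that bookkeeping, with integer IDs standing in for worker identity frames. The names `worker_queue_t`, `worker_queue_push`, and `worker_queue_pop`, and the capacity `MAX_WORKERS`, are assumptions made for illustration.

```c
#include <assert.h>

#define MAX_WORKERS 16      //  Arbitrary capacity for this sketch

//  FIFO of worker identities that have signaled "ready"
typedef struct {
    int ids [MAX_WORKERS];
    int head;               //  Index of the longest-waiting worker
    int count;              //  Number of ready workers
} worker_queue_t;

//  A worker signals ready (at startup, or by sending back a reply)
void
worker_queue_push (worker_queue_t *self, int worker_id)
{
    assert (self && self->count < MAX_WORKERS);
    self->ids [(self->head + self->count) % MAX_WORKERS] = worker_id;
    self->count++;
}

//  A client request arrives: hand it to the longest-waiting worker.
//  Returns -1 if no worker is ready, in which case the queue should
//  leave client requests unread until one is.
int
worker_queue_pop (worker_queue_t *self)
{
    assert (self);
    if (self->count == 0)
        return -1;
    int worker_id = self->ids [self->head];
    self->head = (self->head + 1) % MAX_WORKERS;
    self->count--;
    return worker_id;
}
```

A worker that dies while processing a request never pushes itself back, so it drops out of the rotation on its own, and the client's retry lands on whichever worker is next in line.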

We don’t need a special client; we’re still using the Lazy Pirate client. Here is the queue, which is identical to the main task of the load balancing broker:

Here is the worker, which takes the Lazy Pirate server and adapts it for the load balancing pattern (using the REQ “ready” signaling):

To test this, start a handful of workers, a Lazy Pirate client, and the queue, in any order. You’ll see that the workers eventually all crash and burn, and the client retries and then gives up. The queue never stops, and you can restart workers and clients ad nauseam. This model works with any number of clients and workers.

Robust Reliable Queuing (Paranoid Pirate Pattern) #

Figure 49 - The Paranoid Pirate Pattern

The Simple Pirate Queue pattern works pretty well, especially because it’s just a combination of two existing patterns. Still, it does have some weaknesses:

  • It’s not robust in the face of a queue crash and restart. The client will recover, but the workers won’t. While ZeroMQ will reconnect workers’ sockets automatically, as far as the newly started queue is concerned, the workers haven’t signaled ready, so they don’t exist. To fix this, we have to do heartbeating from queue to worker so that the worker can detect when the queue has gone away.

  • The queue does not detect worker failure, so if a worker dies while idle, the queue can’t remove it from its worker queue until the queue sends it a request. The client waits and retries for nothing. It’s not a critical problem, but it’s not nice. To make this work properly, we do heartbeating from worker to queue, so that the queue can detect a lost worker at any stage.

We’ll fix these in a properly pedantic Paranoid Pirate Pattern.

We previously used a REQ socket for the worker. For the Paranoid Pirate worker, we’ll switch to a DEALER socket. This has the advantage of letting us send and receive messages at any time, rather than the lock-step send/receive that REQ imposes. The downside of DEALER is that we have to do our own envelope management (re-read Chapter 3 - Advanced Request-Reply Patterns for background on this concept).

We’re still using the Lazy Pirate client. Here is the Paranoid Pirate queue proxy:

The queue extends the load balancing pattern with heartbeating of workers. Heartbeating is one of those “simple” things that can be difficult to get right. I’ll explain more about that in a second.

Here is the Paranoid Pirate worker:

Some comments about this example:

  • The code includes simulation of failures, as before. This makes it (a) very hard to debug, and (b) dangerous to reuse. When you want to debug this, disable the failure simulation.

  • The worker uses a reconnect strategy similar to the one we designed for the Lazy Pirate client, with two major differences: (a) it does an exponential back-off, and (b) it retries indefinitely (whereas the client retries a few times before reporting a failure).

Try the client, queue, and workers together, for example with a script like this:

ppqueue &
for i in 1 2 3 4; do
    ppworker &
    sleep 1
done
lpclient &

You should see the workers die one-by-one as they simulate a crash, and the client eventually give up. You can stop and restart the queue and both client and workers will reconnect and carry on. And no matter what you do to queues and workers, the client will never get an out-of-order reply: the whole chain either works, or the client abandons.

Heartbeating #

Heartbeating solves the problem of knowing whether a peer is alive or dead. This is not an issue specific to ZeroMQ. TCP has a long timeout (30 minutes or so), which means that it can be impossible to know whether a peer has died, been disconnected, or gone on a weekend to Prague with a case of vodka, a redhead, and a large expense account.

It’s not easy to get heartbeating right. When writing the Paranoid Pirate examples, it took about five hours to get the heartbeating working properly. The rest of the request-reply chain took perhaps ten minutes. It is especially easy to create “false failures”, i.e., when peers decide that they are disconnected because the heartbeats aren’t sent properly.

We’ll look at the three main answers people use for heartbeating with ZeroMQ.

Shrugging It Off #

The most common approach is to do no heartbeating at all and hope for the best. Many if not most ZeroMQ applications do this. ZeroMQ encourages this by hiding peers in many cases. What problems does this approach cause?

  • When we use a ROUTER socket in an application that tracks peers, as peers disconnect and reconnect, the application will leak memory (resources that the application holds for each peer) and get slower and slower.

  • When we use SUB- or DEALER-based data recipients, we can’t tell the difference between good silence (there’s no data) and bad silence (the other end died). When a recipient knows the other side died, it can for example switch over to a backup route.

  • If we use a TCP connection that stays silent for a long while, it will, in some networks, just die. Sending something (technically, a “keep-alive” more than a heartbeat) will keep the connection alive.

One-Way Heartbeats #

A second option is to send a heartbeat message from each node to its peers every second or so. When one node hears nothing from another within some timeout (several seconds, typically), it will treat that peer as dead. Sounds good, right? Sadly, no. This works in some cases but has nasty edge cases in others.

For pub-sub, this does work, and it’s the only model you can use. SUB sockets cannot talk back to PUB sockets, but PUB sockets can happily send “I’m alive” messages to their subscribers.

As an optimization, you can send heartbeats only when there is no real data to send. Furthermore, you can send heartbeats progressively slower and slower, if network activity is an issue (e.g., on mobile networks where activity drains the battery). As long as the recipient can detect a failure (sharp stop in activity), that’s fine.

Here are the typical problems with this design:

  • It can be inaccurate when we send large amounts of data, as heartbeats will be delayed behind that data. If heartbeats are delayed, you can get false timeouts and disconnections due to network congestion. Thus, always treat any incoming data as a heartbeat, whether or not the sender optimizes out heartbeats.

  • While the pub-sub pattern will drop messages for disappeared recipients, PUSH and DEALER sockets will queue them. So if you send heartbeats to a dead peer and it comes back, it will get all the heartbeats you sent, which can be thousands. Whoa, whoa!

  • This design assumes that heartbeat timeouts are the same across the whole network. But that won’t be accurate. Some peers will want very aggressive heartbeating in order to detect faults rapidly. And some will want very relaxed heartbeating, in order to let sleeping networks lie and save power.
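
Two of these problems suggest a shape for the receiving side: count any incoming message as a sign of life, and keep the timeout per peer rather than global. Here is a minimal sketch under those assumptions. `peer_t`, `peer_heard_from`, and `peer_is_alive` are hypothetical names, and times are plain millisecond counters supplied by the caller.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdint.h>

//  Per-peer liveness record; all times are in milliseconds
typedef struct {
    int64_t last_heard_at;  //  When we last received anything from the peer
    int64_t timeout;        //  Silence we tolerate before declaring it dead
} peer_t;

//  Call for every incoming message, heartbeat or real data alike
void
peer_heard_from (peer_t *self, int64_t now)
{
    assert (self);
    self->last_heard_at = now;
}

//  True while the peer has been heard from within its own timeout
bool
peer_is_alive (const peer_t *self, int64_t now)
{
    assert (self);
    return now - self->last_heard_at < self->timeout;
}
```

This does nothing about the second problem, heartbeats piling up in PUSH and DEALER queues for a dead peer; that one needs a different socket choice or a different heartbeat design.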

Ping-Pong Heartbeats #

The third option is to use a ping-pong dialog. One peer sends a ping command to the other, which replies with a pong command. Neither command has any payload. Pings and pongs are not correlated. Because the roles of “client” and “server” are arbitrary in some networks, we usually specify that either peer can in fact send a ping and expect a pong in response. However, because the timeouts depend on network topologies known best to dynamic clients, it is usually the client that pings the server.

This works for all ROUTER-based brokers. The same optimizations we used in the second model make this work even better: treat any incoming data as a pong, and only send a ping when not otherwise sending data.
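
Here is how the pinging side's bookkeeping might look with both optimizations applied. This is a sketch rather than code from any ZeroMQ library: `pinger_t` and its functions are hypothetical names, and the caller supplies the clock in milliseconds.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdint.h>

//  Pinging side's view of one peer; all times are in milliseconds
typedef struct {
    int64_t interval;       //  Idle time after which we send a ping
    int64_t timeout;        //  Silence after which the peer is dead
    int64_t last_sent_at;   //  Last time we sent anything at all
    int64_t last_recv_at;   //  Last time we received anything at all
} pinger_t;

//  Record outgoing traffic; real data postpones the next ping
void
pinger_sent (pinger_t *self, int64_t now)
{
    assert (self);
    self->last_sent_at = now;
}

//  Record incoming traffic; any message counts as a pong
void
pinger_received (pinger_t *self, int64_t now)
{
    assert (self);
    self->last_recv_at = now;
}

//  True if we have been quiet long enough that a ping is needed
bool
pinger_ping_due (const pinger_t *self, int64_t now)
{
    assert (self);
    return now - self->last_sent_at >= self->interval;
}

//  True if the peer has been silent for the whole timeout
bool
pinger_peer_dead (const pinger_t *self, int64_t now)
{
    assert (self);
    return now - self->last_recv_at >= self->timeout;
}
```

Because the interval and timeout live in the pinging peer's own record, each client can choose values that suit its network without the server needing to know them.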

Heartbeating for Paranoid Pirate #

For Paranoid Pirate, we chose the second approach. It might not have been the simplest option: if designing this today, I’d probably try a ping-pong approach instead. However, the principles are similar. The heartbeat messages flow asynchronously in both directions, and either peer can decide the other is “dead” and stop talking to it.

In the worker, this is how we handle heartbeats from the queue:

  • We calculate a liveness, which is how many heartbeats we can still miss before deciding the queue is dead. It starts at three and we decrement it each time we miss a heartbeat.
  • We wait, in the zmq_poll loop, for one second each time, which is our heartbeat interval.
  • If there’s any message from the queue during that time, we reset our liveness to three.
  • If there’s no message during that time, we count down our liveness.
  • If the liveness reaches zero, we consider the queue dead.
  • If the queue is dead, we destroy our socket, create a new one, and reconnect.
  • To avoid opening and closing too many sockets, we wait for a certain interval before reconnecting, and we double the interval each time until it reaches 32 seconds.

And this is how we handle heartbeats to the queue:

  • We calculate when to send the next heartbeat; this is a single variable because we’re talking to one peer, the queue.
  • In the zmq_poll loop, whenever we pass this time, we send a heartbeat to the queue.

Here’s the essential heartbeating code for the worker:


#define HEARTBEAT_LIVENESS  3       //  3-5 is reasonable
#define HEARTBEAT_INTERVAL  1000    //  msecs
#define INTERVAL_INIT       1000    //  Initial reconnect
#define INTERVAL_MAX       32000    //  After exponential backoff

...
//  If liveness hits zero, queue is considered disconnected
size_t liveness = HEARTBEAT_LIVENESS;
size_t interval = INTERVAL_INIT;

//  Send out heartbeats at regular intervals
uint64_t heartbeat_at = zclock_time () + HEARTBEAT_INTERVAL;

while (true) {
    zmq_pollitem_t items [] = { { worker,  0, ZMQ_POLLIN, 0 } };
    int rc = zmq_poll (items, 1, HEARTBEAT_INTERVAL * ZMQ_POLL_MSEC);
    if (rc == -1)
        break;              //  Interrupted

    if (items [0].revents & ZMQ_POLLIN) {
        //  Receive any message from queue
        liveness = HEARTBEAT_LIVENESS;
        interval = INTERVAL_INIT;
    }
    else
    if (--liveness == 0) {
        zclock_sleep (interval);
        if (interval < INTERVAL_MAX)
            interval *= 2;
        zsocket_destroy (ctx, worker);
        ...
        liveness = HEARTBEAT_LIVENESS;
    }
    //  Send heartbeat to queue if it's time
    if (zclock_time () > heartbeat_at) {
        heartbeat_at = zclock_time () + HEARTBEAT_INTERVAL;
        //  Send heartbeat message to queue
    }
}

The queue does the same, but manages an expiration time for each worker.
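
A sketch of that per-worker bookkeeping might look like this. It models only the expiry clock, not the ordering of ready workers, and it uses integer IDs in place of identity frames. `worker_list_t`, `worker_list_refresh`, `worker_list_purge`, and `MAX_WORKERS` are hypothetical names; the two heartbeat constants repeat the values from the worker code above.

```c
#include <assert.h>
#include <stdint.h>

#define HEARTBEAT_LIVENESS  3       //  Missed heartbeats we tolerate
#define HEARTBEAT_INTERVAL  1000    //  msecs
#define MAX_WORKERS         16      //  Arbitrary capacity for this sketch

typedef struct {
    int id;                 //  Stand-in for the worker's identity frame
    int64_t expiry;         //  Time at which we give up on this worker
} worker_t;

typedef struct {
    worker_t workers [MAX_WORKERS];
    int count;
} worker_list_t;

//  Any message from a worker (re)starts its expiry clock
void
worker_list_refresh (worker_list_t *self, int id, int64_t now)
{
    assert (self);
    int64_t expiry = now + HEARTBEAT_INTERVAL * HEARTBEAT_LIVENESS;
    int index;
    for (index = 0; index < self->count; index++) {
        if (self->workers [index].id == id) {
            self->workers [index].expiry = expiry;
            return;
        }
    }
    assert (self->count < MAX_WORKERS);
    self->workers [self->count].id = id;
    self->workers [self->count].expiry = expiry;
    self->count++;
}

//  Drop every worker whose expiry has passed; returns how many were dropped
int
worker_list_purge (worker_list_t *self, int64_t now)
{
    assert (self);
    int kept = 0, index;
    for (index = 0; index < self->count; index++)
        if (self->workers [index].expiry > now)
            self->workers [kept++] = self->workers [index];
    int dropped = self->count - kept;
    self->count = kept;
    return dropped;
}
```

Running the purge once per pass through the poll loop keeps dead workers from receiving requests, which is exactly the weakness of the Simple Pirate queue.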

Here are some tips for your own heartbeating implementation:

  • Use zmq_poll or a reactor as the core of your application’s main task.

  • Start by building the heartbeating between peers, test it by simulating failures, and then build the rest of the message flow. Adding heartbeating afterwards is much trickier.

  • Use simple tracing, i.e., print to console, to get this working. To help you trace the flow of messages between peers, use a dump method such as zmsg offers, and number your messages incrementally so you can see if there are gaps.

  • In a real application, heartbeating must be configurable and usually negotiated with the peer. Some peers will want aggressive heartbeating, as low as 10 msecs. Other peers will be far away and want heartbeating as high as 30 seconds.

  • If you have different heartbeat intervals for different peers, your poll timeout should be the lowest (shortest time) of these. Do not use an infinite timeout.

  • Do heartbeating on the same socket you use for messages, so your heartbeats also act as a keep-alive to stop the network connection from going stale (some firewalls can be unkind to silent connections).
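
The tip about poll timeouts comes down to taking a minimum. As a small sketch, assuming intervals are kept in milliseconds in a plain array (`poll_timeout_msecs` is a hypothetical helper name):

```c
#include <assert.h>
#include <stddef.h>

//  Returns the shortest heartbeat interval (msecs) among our peers, which
//  is the longest we may block in a poll without missing a deadline.
long
poll_timeout_msecs (const long *intervals, size_t count)
{
    assert (intervals && count > 0);
    long shortest = intervals [0];
    size_t index;
    for (index = 1; index < count; index++)
        if (intervals [index] < shortest)
            shortest = intervals [index];
    return shortest;
}
```

With peers at 1000, 10, and 30000 msecs, the poll loop has to wake every 10 msecs, which is the price of supporting one very aggressive peer.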

Contracts and Protocols #

If you’re paying attention, you’ll realize that Paranoid Pirate is not interoperable with Simple Pirate, because of the heartbeats. But how do we define “interoperable”? To guarantee interoperability, we need a kind of contract, an agreement that lets different teams in different times and places write code that is guaranteed to work together. We call this a “protocol”.

It’s fun to experiment without specifications, but that’s not a sensible basis for real applications. What happens if we want to write a worker in another language? Do we have to read code to see how things work? What if we want to change the protocol for some reason? Even a simple protocol will, if it’s successful, evolve and become more complex.

Lack of contracts is a sure sign of a disposable application. So let’s write a contract for this protocol. How do we do that?

There’s a wiki at rfc.zeromq.org that we made especially as a home for public ZeroMQ contracts. To create a new specification, register on the wiki if needed, and follow the instructions. It’s fairly straightforward, though writing technical texts is not everyone’s cup of tea.

It took me about fifteen minutes to draft the new Pirate Pattern Protocol. It’s not a big specification, but it does capture enough to act as the basis for arguments (“your queue isn’t PPP compatible; please fix it!”).

Turning PPP into a real protocol would take more work:

  • There should be a protocol version number in the READY command so that it’s possible to distinguish between different versions of PPP.

  • Right now, READY and HEARTBEAT are not entirely distinct from requests and replies. To make them distinct, we would need a message structure that includes a “message type” part.
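
To make those two points concrete, here is one possible header layout: a protocol tag that carries the version, followed by a one-byte message type. Everything in this sketch is an assumption made for illustration. The tag `PPP01`, the type values, and the function `ppp_parse_header` are hypothetical and are not taken from the PPP specification.

```c
#include <assert.h>
#include <stdbool.h>
#include <string.h>

#define PPP_PROTOCOL "PPP01"        //  Hypothetical name-plus-version tag

//  Hypothetical message types; one value per kind of message
typedef enum {
    PPP_READY = 1,
    PPP_HEARTBEAT = 2,
    PPP_REQUEST = 3,
    PPP_REPLY = 4
} ppp_type_t;

//  Parses a two-part header: a protocol tag, then a one-byte type.
//  Returns true and sets *type if both parts are valid.
bool
ppp_parse_header (const char *protocol, unsigned char type_byte,
                  ppp_type_t *type)
{
    assert (protocol && type);
    if (strcmp (protocol, PPP_PROTOCOL) != 0)
        return false;               //  Unknown protocol or version
    if (type_byte < PPP_READY || type_byte > PPP_REPLY)
        return false;               //  Unknown message type
    *type = (ppp_type_t) type_byte;
    return true;
}
```

With an explicit type on every message, a request body that happens to look like a heartbeat can no longer be mistaken for one, and a peer speaking a different version is rejected at the first message.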

Service-Oriented Reliable Queuing (Majordomo Pattern) #

Figure 50 - The Majordomo Pattern

The nice thing about progress is how fast it happens when lawyers and committees aren’t involved. The one-page MDP specification turns PPP into something more solid. This is how we should design complex architectures: start by writing down the contracts, and only then write software to implement them.

The Majordomo Protocol (MDP) extends and improves on PPP in one interesting way: it adds a “service name” to requests that the client sends, and asks workers to register for specific services. Adding service names turns our Paranoid Pirate queue into a service-oriented broker. The nice thing about MDP is that it came out of working code, a simpler ancestor protocol (PPP), and a precise set of improvements that each solved a clear problem. This made it easy to draft.

To implement Majordomo, we need to write a framework for clients and workers. It’s really not sane to ask every application developer to read the spec and make it work, when they could be using a simpler API that does the work for them.

So while our first contract (MDP itself) defines how the pieces of our distributed architecture talk to each other, our second contract defines how user applications talk to the technical framework we’re going to design.

Majordomo has two halves, a client side and a worker side. Because we’ll write both client and worker applications, we will need two APIs. Here is a sketch for the client API, using a simple object-oriented approach:


mdcli_t *mdcli_new     (char *broker);
void     mdcli_destroy (mdcli_t **self_p);
zmsg_t  *mdcli_send    (mdcli_t *self, char *service, zmsg_t **request_p);

That’s it. We open a session to the broker, send a request message, get a reply message back, and eventually close the connection. Here’s a sketch for the worker API:


mdwrk_t *mdwrk_new     (char *broker, char *service);
void     mdwrk_destroy (mdwrk_t **self_p);
zmsg_t  *mdwrk_recv    (mdwrk_t *self, zmsg_t *reply);

It’s more or less symmetrical, but the worker dialog is a little different. The first time a worker does a recv(), it passes a null reply. Thereafter, it passes the current reply, and gets a new request.

The client and worker APIs were fairly simple to construct because they’re heavily based on the Paranoid Pirate code we already developed. Here is the client API:

Let’s see how the client API looks in action, with an example test program that does 100K request-reply cycles:

And here is the worker API:

Let’s see how the worker API looks in action, with an example test program that implements an echo service:

Here are some things to note about the worker API code:

  • The APIs are single-threaded. This means, for example, that the worker won’t send heartbeats in the background. Happily, this is exactly what we want: if the worker application gets stuck, heartbeats will stop and the broker will stop sending requests to the worker.

  • The worker API doesn’t do an exponential back-off; it’s not worth the extra complexity.

  • The APIs don’t do any error reporting. If something isn’t as expected, they raise an assertion (or exception depending on the language). This is ideal for a reference implementation, so any protocol errors show immediately. For real applications, the API should be robust against invalid messages.

You might wonder why the worker API is manually closing its socket and opening a new one, when ZeroMQ will automatically reconnect a socket if the peer disappears and comes back. Look back at the Simple Pirate and Paranoid Pirate workers to understand. Although ZeroMQ will automatically reconnect workers if the broker dies and comes back up, this isn’t sufficient to re-register the workers with the broker. I know of at least two solutions. The simplest, which we use here, is for the worker to monitor the connection using heartbeats, and if it decides the broker is dead, to close its socket and start afresh with a new socket. The alternative is for the broker to challenge unknown workers when it gets a heartbeat from the worker and ask them to re-register. That would require protocol support.

Now let’s design the Majordomo broker. Its core structure is a set of queues, one per service. We will create these queues as workers appear (we could delete them as workers disappear, but forget that for now because it gets complex). Additionally, we keep a queue of workers per service.
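
As a rough picture of that structure, here is a sketch of one service entry and the step that pairs waiting requests with waiting workers. It is deliberately reduced: plain counters stand in for the two queues, and `service_t` and `service_dispatch` are hypothetical names rather than the broker's actual code.

```c
#include <assert.h>

//  One entry per service name; counters stand in for the real queues
typedef struct {
    const char *name;
    int requests;           //  Client requests waiting for a worker
    int waiting;            //  Idle workers registered for this service
} service_t;

//  Pair waiting requests with waiting workers; returns how many were sent
int
service_dispatch (service_t *self)
{
    assert (self);
    int sent = 0;
    while (self->requests > 0 && self->waiting > 0) {
        self->requests--;   //  Pop the oldest request...
        self->waiting--;    //  ...and the next idle worker
        sent++;
    }
    return sent;
}
```

The broker would run this step whenever either side changes: when a client request arrives for the service, or when a worker for that service becomes ready.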

And here is the broker: