Chapter Four - Reliable Request-Reply

In Chapter Three we looked at advanced use of ØMQ's request-reply pattern with worked examples. In this chapter we'll look at the general question of reliability and build a set of reliable messaging patterns on top of ØMQ's core request-reply pattern.

In this chapter we focus heavily on user-space 'patterns', which are reusable models that help you design your ØMQ architecture:

  • The Lazy Pirate pattern: reliable request-reply from the client side.
  • The Simple Pirate pattern: reliable request-reply using an LRU queue.
  • The Paranoid Pirate pattern: reliable request-reply with heartbeating.
  • The Majordomo pattern: service-oriented reliable queuing.
  • The Titanic pattern: disk-based / disconnected reliable queuing.
  • The Binary Star pattern: primary-backup server fail-over.
  • The Freelance pattern: brokerless reliable request-reply.

What is "Reliability"?

To understand what 'reliability' means, we have to look at its opposite, namely failure. If we can handle a certain set of failures, we are reliable with respect to those failures. No more, no less. So let's look at the possible causes of failure in a distributed ØMQ application, in roughly descending order of probability:

  • Application code is the worst offender. It can crash and exit, freeze and stop responding to input, run too slowly for its input, exhaust all memory, etc.
  • System code - like brokers we write using ØMQ - can die. System code should be more reliable than application code but can still crash and burn, and especially run out of memory if it tries to compensate for slow clients.
  • Message queues can overflow, typically in system code that has learned to deal brutally with slow clients. When a queue overflows, it starts to discard messages.
  • Networks can fail temporarily, causing intermittent message loss. Such errors are hidden from ØMQ applications, since ØMQ automatically reconnects peers after a network-forced disconnection.
  • Hardware can fail and take with it all the processes running on that box.
  • Networks can fail in exotic ways, e.g. some ports on a switch may die and those parts of the network become inaccessible.
  • Entire data centers can be struck by lightning, earthquakes, fire, or more mundane power or cooling failures.

To make a software system fully reliable against all of these possible failures is an enormously difficult and expensive job and goes beyond the scope of this modest guide.

Since the first five cases cover 99.9% of real world requirements outside large companies (according to a highly scientific study I just ran), that's what we'll look at. If you're a large company with money to spend on the last two cases, contact me immediately, there's a large hole behind my beach house waiting to be converted into a pool.

Designing Reliability

So to make things brutally simple, reliability is "keeping things working properly when code freezes or crashes", a situation we'll shorten to "dies". However the things we want to keep working properly are more complex than just messages. We need to take each core ØMQ messaging pattern and see how to make it work (if we can) even when code dies.

Let's take them one by one:

  • Request-reply: if the server dies (while processing a request), the client can figure that out since it won't get an answer back. Then it can give up in a huff, wait and try again later, find another server, etc. As for the client dying, we can brush that off as "someone else's problem" for now.
  • Publish-subscribe: if the client dies (having gotten some data), the server doesn't know about it. Pubsub doesn't send any information back from client to server. But the client can contact the server out-of-band, e.g. via request-reply, and ask, "please resend everything I missed". As for the server dying, that's out of scope for here. Subscribers can also self-verify that they're not running too slowly, and take action (e.g. warn the operator, and die) if they are.
  • Pipeline: if a worker dies (while working), the ventilator doesn't know about it. Pipelines, like pubsub, and the grinding gears of time, only work in one direction. But the downstream collector can detect that one task didn't get done, and send a message back to the ventilator saying, "hey, resend task 324!" If the ventilator or collector dies, then whatever upstream client originally sent the work batch can get tired of waiting and resend the whole lot. It's not elegant but system code should really not die often enough to matter.

In this chapter we'll focus on request-reply, and we'll cover reliable pub-sub and pipeline in the following chapters.

The basic request-reply pattern (a REQ client socket doing a blocking send/recv to a REP server socket) scores low on handling the most common types of failure. If the server crashes while processing the request, the client just hangs forever. If the network loses the request or the reply, the client hangs forever.

It is a lot better than TCP, thanks to ØMQ's ability to reconnect peers silently, to load-balance messages, and so on. But it's still not good enough for real work. The only use case where you can trust the basic request-reply pattern is between two threads in the same process where there's no network or separate server process to die.

However, with a little extra work this humble pattern becomes a good basis for real work across a distributed network, and we get a set of reliable request-reply patterns I like to call the "Pirate" patterns. RRR!

There are, roughly, three ways to connect clients to servers, each needing a specific approach to reliability:

  • Multiple clients talking directly to a single server. Use case: single well-known server that clients need to talk to. Types of failure we aim to handle: server crashes and restarts, network disconnects.
  • Multiple clients talking to a single queue device that distributes work to multiple servers. Use case: workload distribution to workers. Types of failure we aim to handle: worker crashes and restarts, worker busy looping, worker overload, queue crashes and restarts, network disconnects.
  • Multiple clients talking to multiple servers with no intermediary devices. Use case: distributed services such as name resolution. Types of failure we aim to handle: service crashes and restarts, service busy looping, service overload, network disconnects.

Each of these approaches has its trade-offs, and often you'll mix them. We'll look at all three in detail.

Client-side Reliability (Lazy Pirate Pattern)

We can get very simple reliable request-reply with only some changes in the client. We call this the Lazy Pirate pattern. Rather than doing a blocking receive, we:

  • Poll the REQ socket and only receive from it when we're sure a reply has arrived.
  • Resend the request several times if no reply arrives within a timeout period.
  • Abandon the transaction if there is still no reply after several requests.

Figure 53 - The Lazy Pirate Pattern

If you try to use a REQ socket in anything other than a strict send/receive fashion, you'll get an error (technically, the REQ socket implements a small finite-state machine to enforce the send/receive ping-pong, and so the error code is called "EFSM"). This is slightly annoying when we want to use REQ in a pirate pattern, because we may send several requests before getting a reply. The pretty good brute-force solution is to close and reopen the REQ socket after an error:

--
-- Lazy Pirate client
-- Use zmq_poll to do a safe request-reply
-- To run, start lpserver and then randomly kill/restart it
--
-- Author: Robert G. Jakabosky <bobby@sharedrealm.com>
--

require"zmq"
require"zmq.poller"
require"zhelpers"

local REQUEST_TIMEOUT = 2500 -- msecs, (> 1000!)
local REQUEST_RETRIES = 3 -- Before we abandon

-- Helper function that returns a new configured socket
-- connected to the Hello World server
--
local function s_client_socket(context)
printf ("I: connecting to server…\n")
local client = context:socket(zmq.REQ)
client:connect("tcp://localhost:5555")

-- Configure socket to not wait at close time
client:setopt(zmq.LINGER, 0)
return client
end
s_version_assert (2, 1)
local context = zmq.init(1)
local client = s_client_socket (context)

local sequence = 0
local retries_left = REQUEST_RETRIES
local expect_reply = true

local poller = zmq.poller(1)

local function client_cb()
-- We got a reply from the server, must match sequence
--local reply = assert(client:recv(zmq.NOBLOCK))
local reply = client:recv()
if (tonumber(reply) == sequence) then
printf ("I: server replied OK (%s)\n", reply)
retries_left = REQUEST_RETRIES
expect_reply = false
else
printf ("E: malformed reply from server: %s\n", reply)
end
end
poller:add(client, zmq.POLLIN, client_cb)

while (retries_left > 0) do
sequence = sequence + 1
-- We send a request, then we work to get a reply
local request = string.format("%d", sequence)
client:send(request)
expect_reply = true

while (expect_reply) do
-- Poll socket for a reply, with timeout
local cnt = assert(poller:poll(REQUEST_TIMEOUT * 1000))

-- Check if there was no reply
if (cnt == 0) then
retries_left = retries_left - 1
if (retries_left == 0) then
printf ("E: server seems to be offline, abandoning\n")
break
else
printf ("W: no response from server, retrying…\n")
-- Old socket is confused; close it and open a new one
poller:remove(client)
client:close()
client = s_client_socket (context)
poller:add(client, zmq.POLLIN, client_cb)
-- Send request again, on new socket
client:send(request)
end
end
end
end
client:close()
context:term()

lpclient.lua: Lazy Pirate client

Run this together with the matching server:

--
-- Lazy Pirate server
-- Binds REP socket to tcp://*:5555
-- Like hwserver except:
-- - echoes request as-is
-- - randomly runs slowly, or exits to simulate a crash.
--
-- Author: Robert G. Jakabosky <bobby@sharedrealm.com>
--
require"zmq"
require"zhelpers"

math.randomseed(os.time())

local context = zmq.init(1)
local server = context:socket(zmq.REP)
server:bind("tcp://*:5555")

local cycles = 0
while true do
local request = server:recv()
cycles = cycles + 1

-- Simulate various problems, after a few cycles
if (cycles > 3 and randof (3) == 0) then
printf("I: simulating a crash\n")
break
elseif (cycles > 3 and randof (3) == 0) then
printf("I: simulating CPU overload\n")
s_sleep(2000)
end
printf("I: normal request (%s)\n", request)
s_sleep(1000) -- Do some heavy work
server:send(request)
end
server:close()
context:term()

lpserver.lua: Lazy Pirate server

To run this testcase, start the client and the server in two console windows. The server will randomly misbehave after a few messages. You can check the client's response. Here is a typical output from the server:

I: normal request (1)
I: normal request (2)
I: normal request (3)
I: simulating CPU overload
I: normal request (4)
I: simulating a crash

And here is the client's response:

I: connecting to server...
I: server replied OK (1)
I: server replied OK (2)
I: server replied OK (3)
W: no response from server, retrying...
I: connecting to server...
W: no response from server, retrying...
I: connecting to server...
E: server seems to be offline, abandoning

The client sequences each message and checks that replies come back exactly in order: that no request or reply is lost, and that no reply comes back more than once or out of order. Run the test a few times until you're convinced the mechanism actually works. You don't need sequence numbers in a real application; they just help us trust our design.

The client uses a REQ socket, and does the brute-force close/reopen because REQ sockets impose a strict send/receive cycle. You might be tempted to use a DEALER instead, but it would not be a good decision. First, it would mean emulating the secret sauce that REQ does with envelopes (if you've forgotten what that is, it's a good sign you don't want to have to do it). Second, it would mean potentially getting back replies that you didn't expect.
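
To make this concrete, here is a rough sketch of the extra work a DEALER-based client would take on. It uses the same zmsg helpers as the rest of this chapter, and is only an illustration of the bookkeeping involved, not a recommended design:

-- Hypothetical sketch only: emulating REQ semantics over a DEALER socket
require"zmq"
require"zmsg"
require"zhelpers"

local context = zmq.init(1)
local client = context:socket(zmq.DEALER)
client:connect("tcp://localhost:5555")

-- A REQ socket silently adds an empty delimiter frame; with DEALER we add it ourselves
local sequence = 1
local msg = zmsg.new(string.format("%d", sequence))
msg:push("") -- empty envelope delimiter, as REQ would add
msg:send(client)

-- DEALER never discards stale or duplicate replies, so we must filter them ourselves
local reply = zmsg.recv(client)
reply:pop() -- strip the empty delimiter
if (tonumber(reply:body()) ~= sequence) then
    printf("W: discarding unexpected reply\n")
end

client:close()
context:term()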

Handling failures only at the client works when we have a set of clients talking to a single server. It can handle a server crash, but only if recovery means restarting that same server. If there's a permanent error - e.g. a dead power supply on the server hardware - this approach won't work. Since the application code in servers is usually the biggest source of failures in any architecture, depending on a single server is not a great idea.

So, pros and cons:

  • Pro: simple to understand and implement.
  • Pro: works easily with existing client and server application code.
  • Pro: ØMQ automatically retries the actual reconnection until it works.
  • Con: doesn't do fail-over to backup / alternate servers.

Basic Reliable Queuing (Simple Pirate Pattern)

Our second approach takes the Lazy Pirate pattern and extends it with a queue device that lets us talk, transparently, to multiple servers, which we can more accurately call 'workers'. We'll develop this in stages, starting with a minimal working model, the Simple Pirate pattern.

In all these Pirate patterns, workers are stateless, or have some shared state we don't know about, e.g. a shared database. Having a queue device means workers can come and go without clients knowing anything about it. If one worker dies, another takes over. This is a nice simple topology with only one real weakness, namely the central queue itself, which can become a problem to manage, and a single point of failure.

The basis for the queue device is the least-recently-used (LRU) routing queue from Chapter Three. What is the very minimum we need to do to handle dead or blocked workers? Turns out, it's surprisingly little. We already have a retry mechanism in the client. So using the standard LRU queue will work pretty well. This fits with ØMQ's philosophy that we can extend a peer-to-peer pattern like request-reply by plugging naive devices in the middle.

Figure 54 - The Simple Pirate Pattern

We don't need a special client, we're still using the Lazy Pirate client. Here is the queue, which is exactly a LRU queue, no more or less:

--
-- Simple Pirate queue
-- This is identical to the LRU pattern, with no reliability mechanisms
-- at all. It depends on the client for recovery. Runs forever.
--
-- Author: Robert G. Jakabosky <bobby@sharedrealm.com>
--

require"zmq"
require"zmq.poller"
require"zhelpers"
require"zmsg"

local tremove = table.remove

local MAX_WORKERS = 100

s_version_assert (2, 1)

-- Prepare our context and sockets
local context = zmq.init(1)
local frontend = context:socket(zmq.ROUTER)
local backend = context:socket(zmq.ROUTER)
frontend:bind("tcp://*:5555"); -- For clients
backend:bind("tcp://*:5556"); -- For workers

-- Queue of available workers
local worker_queue = {}
local is_accepting = false

local poller = zmq.poller(2)

local function frontend_cb()
-- Now get next client request, route to next worker
local msg = zmsg.recv (frontend)

-- Dequeue a worker from the queue.
local worker = tremove(worker_queue, 1)

msg:wrap(worker, "")
msg:send(backend)

if (#worker_queue == 0) then
-- stop accepting work from clients, when no workers are available.
poller:remove(frontend)
is_accepting = false
end
end

-- Handle worker activity on backend
poller:add(backend, zmq.POLLIN, function()
local msg = zmsg.recv(backend)
-- Use worker address for LRU routing
worker_queue[#worker_queue + 1] = msg:unwrap()

-- start accepting client requests, if we are not already doing so.
if not is_accepting then
is_accepting = true
poller:add(frontend, zmq.POLLIN, frontend_cb)
end

-- Forward message to client if it's not a READY
if (msg:address() ~= "READY") then
msg:send(frontend)
end
end)

-- start poller's event loop
poller:start()

-- We never exit the main loop

spqueue.lua: Simple Pirate queue

Here is the worker, which takes the Lazy Pirate server and adapts it for the LRU pattern (using the REQ 'ready' signaling):

--
-- Simple Pirate worker
-- Connects REQ socket to tcp://localhost:5556
-- Implements worker part of LRU queueing
--
-- Author: Robert G. Jakabosky <bobby@sharedrealm.com>
--
require"zmq"
require"zmsg"

math.randomseed(os.time())

local context = zmq.init(1)
local worker = context:socket(zmq.REQ)

-- Set random identity to make tracing easier
local identity = string.format("%04X-%04X", randof (0x10000), randof (0x10000))
worker:setopt(zmq.IDENTITY, identity)
worker:connect("tcp://localhost:5556")

-- Tell queue we're ready for work
printf ("I: (%s) worker ready\n", identity)
worker:send("READY")

local cycles = 0
while true do
local msg = zmsg.recv (worker)

-- Simulate various problems, after a few cycles
cycles = cycles + 1
if (cycles > 3 and randof (5) == 0) then
printf ("I: (%s) simulating a crash\n", identity)
break
elseif (cycles > 3 and randof (5) == 0) then
printf ("I: (%s) simulating CPU overload\n", identity)
s_sleep (5000)
end
printf ("I: (%s) normal reply - %s\n",
identity, msg:body())
s_sleep (1000) -- Do some heavy work
msg:send(worker)
end
worker:close()
context:term()

spworker.lua: Simple Pirate worker

To test this, start a handful of workers, a client, and the queue, in any order. You'll see that the workers eventually all crash and burn, and the client retries and then gives up. The queue never stops, and you can restart workers and clients ad nauseam. This model works with any number of clients and workers.

Robust Reliable Queuing (Paranoid Pirate Pattern)

The Simple Pirate Queue pattern works pretty well, especially since it's just a combination of two existing patterns, but it has some weaknesses:

  • It's not robust against a queue crash and restart. The client will recover, but the workers won't. While ØMQ will reconnect workers' sockets automatically, as far as the newly started queue is concerned the workers haven't signaled "READY", so they don't exist. To fix this we have to do heartbeating from queue to worker, so that the worker can detect when the queue has gone away.
  • The queue does not detect worker failure, so if a worker dies while idle, the queue can only remove it from its worker queue by first sending it a request. The client waits and retries for nothing. It's not a critical problem but it's not nice. To make this work properly we do heartbeating from worker to queue, so that the queue can detect a lost worker at any stage.

We'll fix these in a properly pedantic Paranoid Pirate Pattern.

We previously used a REQ socket for the worker. For the Paranoid Pirate worker we'll switch to a DEALER socket. This has the advantage of letting us send and receive messages at any time, rather than the lock-step send/receive that REQ imposes. The downside of DEALER is that we have to do our own envelope management. If you don't know what I mean, please re-read Chapter Three.

Figure 55 - The Paranoid Pirate Pattern

We're still using the Lazy Pirate client. Here is the Paranoid Pirate queue device:

--
-- Paranoid Pirate queue
--
-- Author: Robert G. Jakabosky <bobby@sharedrealm.com>
--
require"zmq"
require"zmq.poller"
require"zmsg"

local MAX_WORKERS = 100
local HEARTBEAT_LIVENESS = 3 -- 3-5 is reasonable
local HEARTBEAT_INTERVAL = 1000 -- msecs

local tremove = table.remove

-- Insert worker at end of queue, reset expiry
-- Worker must not already be in queue
local function s_worker_append(queue, identity)
if queue[identity] then
printf ("E: duplicate worker identity %s", identity)
else
assert (#queue < MAX_WORKERS)
queue[identity] = s_clock() + HEARTBEAT_INTERVAL * HEARTBEAT_LIVENESS
queue[#queue + 1] = identity
end
end
-- Remove worker from queue, if present
local function s_worker_delete(queue, identity)
for i=1,#queue do
if queue[i] == identity then
tremove(queue, i)
break
end
end
queue[identity] = nil
end
-- Reset worker expiry, worker must be present
local function s_worker_refresh(queue, identity)
if queue[identity] then
queue[identity] = s_clock() + HEARTBEAT_INTERVAL * HEARTBEAT_LIVENESS
else
printf("E: worker %s not ready\n", identity)
end
end
-- Pop next available worker off queue, return identity
local function s_worker_dequeue(queue)
assert (#queue > 0)
local identity = tremove(queue, 1)
queue[identity] = nil
return identity
end
-- Look for & kill expired workers
local function s_queue_purge(queue)
local curr_clock = s_clock()
-- Work backwards from end to simplify removal
for i=#queue,1,-1 do
local id = queue[i]
if (curr_clock > queue[id]) then
tremove(queue, i)
queue[id] = nil
end
end
end
s_version_assert (2, 1)

-- Prepare our context and sockets
local context = zmq.init(1)
local frontend = context:socket(zmq.ROUTER)
local backend = context:socket(zmq.ROUTER)
frontend:bind("tcp://*:5555"); -- For clients
backend:bind("tcp://*:5556"); -- For workers

-- Queue of available workers
local queue = {}
local is_accepting = false

-- Send out heartbeats at regular intervals
local heartbeat_at = s_clock() + HEARTBEAT_INTERVAL

local poller = zmq.poller(2)

local function frontend_cb()
-- Now get next client request, route to next worker
local msg = zmsg.recv(frontend)
local identity = s_worker_dequeue (queue)
msg:push(identity)
msg:send(backend)

if (#queue == 0) then
-- stop accepting work from clients, when no workers are available.
poller:remove(frontend)
is_accepting = false
end
end

-- Handle worker activity on backend
poller:add(backend, zmq.POLLIN, function()
local msg = zmsg.recv(backend)
local identity = msg:unwrap()

-- Return reply to client if it's not a control message
if (msg:parts() == 1) then
if (msg:address() == "READY") then
s_worker_delete(queue, identity)
s_worker_append(queue, identity)
elseif (msg:address() == "HEARTBEAT") then
s_worker_refresh(queue, identity)
else
printf("E: invalid message from %s\n", identity)
msg:dump()
end
else
-- reply for client.
msg:send(frontend)
s_worker_append(queue, identity)
end

-- start accepting client requests, if we are not already doing so.
if not is_accepting and #queue > 0 then
is_accepting = true
poller:add(frontend, zmq.POLLIN, frontend_cb)
end
end)

-- start poller's event loop
while true do
local cnt = assert(poller:poll(HEARTBEAT_INTERVAL * 1000))
-- Send heartbeats to idle workers if it's time
if (s_clock() > heartbeat_at) then
for i=1,#queue do
local msg = zmsg.new("HEARTBEAT")
msg:wrap(queue[i], nil)
msg:send(backend)
end
heartbeat_at = s_clock() + HEARTBEAT_INTERVAL
end
s_queue_purge(queue)
end

-- We never exit the main loop
-- But pretend to do the right shutdown anyhow
while (#queue > 0) do
s_worker_dequeue(queue)
end

frontend:close()
backend:close()

ppqueue.lua: Paranoid Pirate queue

The queue extends the LRU pattern with heartbeating of workers. Heartbeating is simple once it works, but quite difficult to invent. I'll explain more about that in a second.

Here is the Paranoid Pirate worker:

--
-- Paranoid Pirate worker
--
-- Author: Robert G. Jakabosky <bobby@sharedrealm.com>
--
require"zmq"
require"zmq.poller"
require"zmsg"

local HEARTBEAT_LIVENESS = 3 -- 3-5 is reasonable
local HEARTBEAT_INTERVAL = 1000 -- msecs
local INTERVAL_INIT = 1000 -- Initial reconnect
local INTERVAL_MAX = 32000 -- After exponential backoff

-- Helper function that returns a new configured socket
-- connected to the Paranoid Pirate queue
--
local identity

local function s_worker_socket (context)
local worker = context:socket(zmq.DEALER)

-- Set random identity to make tracing easier
identity = string.format("%04X-%04X", randof (0x10000), randof (0x10000))
worker:setopt(zmq.IDENTITY, identity)
worker:connect("tcp://localhost:5556")

-- Configure socket to not wait at close time
worker:setopt(zmq.LINGER, 0)

-- Tell queue we're ready for work
printf("I: (%s) worker ready\n", identity)
worker:send("READY")

return worker
end

s_version_assert (2, 1)
math.randomseed(os.time())

local context = zmq.init(1)
local worker = s_worker_socket (context)

-- If liveness hits zero, queue is considered disconnected
local liveness = HEARTBEAT_LIVENESS
local interval = INTERVAL_INIT

-- Send out heartbeats at regular intervals
local heartbeat_at = s_clock () + HEARTBEAT_INTERVAL

local poller = zmq.poller(1)

local is_running = true

local cycles = 0
local function worker_cb()
-- Get message
-- - 3-part envelope + content -> request
-- - 1-part "HEARTBEAT" -> heartbeat
local msg = zmsg.recv (worker)

if (msg:parts() == 3) then
-- Simulate various problems, after a few cycles
cycles = cycles + 1
if (cycles > 3 and randof (5) == 0) then
printf ("I: (%s) simulating a crash\n", identity)
is_running = false
return
elseif (cycles > 3 and randof (5) == 0) then
printf ("I: (%s) simulating CPU overload\n",
identity)
s_sleep (5000)
end
printf ("I: (%s) normal reply - %s\n",
identity, msg:body())
msg:send(worker)
liveness = HEARTBEAT_LIVENESS
s_sleep(1000); -- Do some heavy work
elseif (msg:parts() == 1 and msg:body() == "HEARTBEAT") then
liveness = HEARTBEAT_LIVENESS
else
printf ("E: (%s) invalid message\n", identity)
msg:dump()
end
interval = INTERVAL_INIT
end
poller:add(worker, zmq.POLLIN, worker_cb)

while is_running do
local cnt = assert(poller:poll(HEARTBEAT_INTERVAL * 1000))

if (cnt == 0) then
liveness = liveness - 1
if (liveness == 0) then
printf ("W: (%s) heartbeat failure, can't reach queue\n",
identity)
printf ("W: (%s) reconnecting in %d msec…\n",
identity, interval)
s_sleep (interval)

if (interval < INTERVAL_MAX) then
interval = interval * 2
end
poller:remove(worker)
worker:close()
worker = s_worker_socket (context)
poller:add(worker, zmq.POLLIN, worker_cb)
liveness = HEARTBEAT_LIVENESS
end
end
-- Send heartbeat to queue if it's time
if (s_clock () > heartbeat_at) then
heartbeat_at = s_clock () + HEARTBEAT_INTERVAL
printf("I: (%s) worker heartbeat\n", identity)
worker:send("HEARTBEAT")
end
end
worker:close()
context:term()

ppworker.lua: Paranoid Pirate worker

Some comments about this example:

  • The code includes simulation of failures, as before. This makes it (a) very hard to debug, and (b) dangerous to reuse. When you want to debug this, disable the failure simulation.
  • As for the Paranoid Pirate queue, the heartbeating is quite tricky to get right. See below for a discussion about heartbeating.
  • The worker uses a reconnect strategy similar to the one we designed for the Lazy Pirate client, with two major differences: (a) it does an exponential back-off, and (b) it never abandons.

Try the client, queue, and workers, e.g. using a script like this:

ppqueue &
for i in 1 2 3 4; do
    ppworker &
    sleep 1
done
lpclient &

You should see the workers die, one by one, as they simulate a crash, and the client eventually give up. You can stop and restart the queue and both client and workers will reconnect and carry on. And no matter what you do to queues and workers, the client will never get an out-of-order reply: the whole chain either works, or the client abandons.

Heartbeating

When writing the Paranoid Pirate examples, it took about five hours to get the queue-to-worker heartbeating working properly. The rest of the request-reply chain took perhaps ten minutes. Heartbeating is one of those reliability layers that often causes more trouble than it saves. It is especially easy to create 'false failures', i.e. peers decide that they are disconnected because the heartbeats aren't sent properly.

Some points to consider when understanding and implementing heartbeating:

  • Note that heartbeats are not request-reply. They flow asynchronously in both directions. Either peer can decide the other is 'dead' and stop talking to it.
  • First, get the heartbeating working, and only then add in the rest of the message flow. You should be able to prove the heartbeating works by starting peers in any order, stopping and restarting them, simulating freezes, and so on.
  • When your main loop is based on zmq_poll(3), use a secondary timer to trigger heartbeats. Do not use the poll loop for this, because it will either send too many heartbeats (overloading the network), or too few (causing peers to disconnect). The zhelpers package provides an s_clock() method that returns the current system clock in milliseconds. It's easy to use this to calculate when to send the next heartbeats. Thus, in C:

// Send out heartbeats at regular intervals
uint64_t heartbeat_at = zclock_time () + HEARTBEAT_INTERVAL;

while (1) {
    int rc = zmq_poll (items, 1, HEARTBEAT_INTERVAL * ZMQ_POLL_MSEC);
    …
    // Do this unconditionally, whatever zmq_poll returned
    if (zclock_time () > heartbeat_at) {
        … Send heartbeats to all peers that expect them
        // Set timer for next heartbeat
        heartbeat_at = zclock_time () + HEARTBEAT_INTERVAL;
    }
}

  • Your main poll loop should use the heartbeat interval as its timeout. Obviously, don't use infinity. Anything less will just waste cycles.
  • Use simple tracing, i.e. print to console, to get this working. Some tricks to help you trace the flow of messages between peers: a dump method such as zmsg offers; number messages incrementally so you can see if there are gaps.
  • In a real application, heartbeating must be configurable and usually negotiated with the peer. Some peers will want aggressive heartbeating, as low as 10 msecs. Other peers will be far away and want heartbeating as high as 30 seconds.
  • If you have different heartbeat intervals for different peers, your poll timeout should be the lowest (shortest) of these; see the short sketch after this list.
  • You might be tempted to open a separate socket dialog for heartbeats. This is superficially nice because you can separate different dialogs, e.g. the synchronous request-reply from the asynchronous heartbeating. However it's a bad idea for several reasons. First, if you're sending data you don't need to send heartbeats. Second, sockets may, due to network vagaries, become jammed. You need to know when your main data socket is silent because it's dead, rather than just not busy, so you need heartbeats on that socket. Lastly, two sockets is more complex than one.
  • We're not doing heartbeating from client to queue. We could, but it would add significant complexity for no real benefit.
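
As a small illustration of the last few points, here is a minimal Lua sketch of choosing the poll timeout from per-peer heartbeat settings. The peers table and its fields are hypothetical; they are not part of zhelpers or lua-zmq:

-- Hypothetical per-peer heartbeat intervals, e.g. negotiated when a peer connects
local peers = {
    { name = "local-worker", heartbeat = 1000 },  -- msecs, aggressive
    { name = "remote-site",  heartbeat = 30000 }, -- msecs, far away
}

-- The poll timeout must be the lowest heartbeat interval of any peer
local function poll_timeout(peers)
    local timeout = math.huge
    for _, peer in ipairs(peers) do
        if peer.heartbeat < timeout then
            timeout = peer.heartbeat
        end
    end
    return timeout
end

-- As elsewhere in this chapter, poll() takes microseconds under ØMQ/2.x:
-- local cnt = assert(poller:poll(poll_timeout(peers) * 1000))

The Paranoid Pirate worker above is the degenerate case of this, with a single HEARTBEAT_INTERVAL used as the poll timeout.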

Contracts and Protocols

If you're paying attention you'll realize that Paranoid Pirate is not compatible with Simple Pirate, because of the heartbeats.

In fact what we have here is a protocol that needs writing down. It's fun to experiment without specifications, but that's not a sensible basis for real applications. What happens if we want to write a worker in another language? Do we have to read code to see how things work? What if we want to change the protocol for some reason? The protocol may be simple but it's not obvious, and if it's successful it'll become more complex.

Lack of contracts is a sure sign of a disposable application. So, let's write a contract for this protocol. How do we do that?

  • There's a wiki, at rfc.zeromq.org, that we made especially as a home for public ØMQ contracts.
  • To create a new specification, register, and follow the instructions. It's straightforward, though technical writing is not for everyone.

It took me about fifteen minutes to draft the new Pirate Pattern Protocol (PPP). It's not a big specification but it does capture enough to act as the basis for arguments ("your queue isn't PPP compatible, please fix it!").

Turning PPP into a real protocol would take more work:

  • There should be a protocol version number in the READY command so that it's possible to create new versions of PPP safely.
  • Right now, READY and HEARTBEAT are not entirely distinct from requests and replies. To make them distinct, we would want a message structure that includes a "message type" part.
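
As a purely hypothetical sketch of what that could look like on the wire (the "PPP01" version frame is an invention for illustration; the current protocol sends a bare "READY"), a worker might announce itself with a two-frame command instead of a single frame:

-- Hypothetical only: a versioned READY with an explicit command frame
require"zmq"
require"zmsg"

local context = zmq.init(1)
local worker = context:socket(zmq.DEALER)
worker:connect("tcp://localhost:5556")

local msg = zmsg.new("READY") -- message-type (command) frame
msg:push("PPP01")             -- invented protocol-and-version frame
msg:send(worker)

worker:close()
context:term()

This is essentially the direction the Majordomo Protocol below takes, with a protocol header frame and a separate command frame.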

Service-Oriented Reliable Queuing (Majordomo Pattern)

The nice thing about progress is how fast it happens when lawyers and committees aren't involved. Just a few sentences ago we were dreaming of a better protocol that would fix the world. And here we have it: the Majordomo Protocol (MDP), published at rfc.zeromq.org as spec:7.

This one-page specification takes PPP and turns it into something more solid. This is how we should design complex architectures: start by writing down the contracts, and only then write software to implement them.

The Majordomo Protocol (MDP) extends and improves PPP in one interesting way apart from the two points above. It adds a "service name" to requests that the client sends, and asks workers to register for specific services. The nice thing about MDP is that it came from working code, a simpler protocol, and a precise set of improvements. This made it easy to draft.

Adding service names is a small but significant change that turns our Paranoid Pirate queue into a service-oriented broker.

Figure 56 - The Majordomo Pattern

To implement Majordomo we need to write a framework for clients and workers. It's really not sane to ask every application developer to read the spec and make it work, when they could be using a simpler API built and tested just once.

So, while our first contract (MDP itself) defines how the pieces of our distributed architecture talk to each other, our second contract defines how user applications talk to the technical framework we're going to design.

Majordomo has two halves, a client side and a worker side. Since we'll write both client and worker applications, we will need two APIs. Here is a sketch for the client API, using a simple object-oriented approach. We write this in C, using the style of the ZFL library:

mdcli_t *mdcli_new (char *broker);
void mdcli_destroy (mdcli_t **self_p);
zmsg_t *mdcli_send (mdcli_t *self, char *service, zmsg_t **request_p);

That's it. We open a session to the broker, we send a request message and get a reply message back, and we eventually close the connection. Here's a sketch for the worker API:

mdwrk_t *mdwrk_new (char *broker, char *service);
void mdwrk_destroy (mdwrk_t **self_p);
zmsg_t *mdwrk_recv (mdwrk_t *self, zmsg_t *reply);

It's more or less symmetrical but the worker dialog is a little different. The first time a worker does a recv(), it passes a null reply, thereafter it passes the current reply, and gets a new request.
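
In Lua terms this dialog is essentially what the mdworker example further below does; here is a minimal sketch of it:

require"mdwrkapi"

-- Sketch of the worker dialog: the first recv() passes no reply,
-- each later recv() delivers the previous reply and waits for a new request
local session = mdwrkapi.new("tcp://localhost:5555", "echo")
local reply = nil
while true do
    local request = session:recv(reply)
    if not request then
        break -- worker was interrupted
    end
    reply = request -- application work goes here (this sketch just echoes)
end
session:destroy()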

The client and worker APIs were fairly simple to construct, since they're heavily based on the Paranoid Pirate code we already developed. Here is the client API:

--
-- mdcliapi.lua - Majordomo Protocol Client API
--
-- Author: Robert G. Jakabosky <bobby@sharedrealm.com>
--

local setmetatable = setmetatable

local mdp = require"mdp"

local zmq = require"zmq"
local zpoller = require"zmq.poller"
local zmsg = require"zmsg"
require"zhelpers"

local s_version_assert = s_version_assert

local obj_mt = {}
obj_mt.__index = obj_mt

function obj_mt:set_timeout(timeout)
self.timeout = timeout
end

function obj_mt:set_retries(retries)
self.retries = retries
end

function obj_mt:destroy()
if self.client then self.client:close() end
self.context:term()
end

local function s_mdcli_connect_to_broker(self)
-- close old socket.
if self.client then
self.poller:remove(self.client)
self.client:close()
end
self.client = assert(self.context:socket(zmq.REQ))
assert(self.client:setopt(zmq.LINGER, 0))
assert(self.client:connect(self.broker))
if self.verbose then
s_console("I: connecting to broker at %s…", self.broker)
end
-- add socket to poller
self.poller:add(self.client, zmq.POLLIN, function()
self.got_reply = true
end)
end

--
-- Send request to broker and get reply by hook or crook
-- Returns the reply message or nil if there was no reply.
--
function obj_mt:send(service, request)
-- Prefix request with protocol frames
-- Frame 1: "MDPCxy" (six bytes, MDP/Client x.y)
-- Frame 2: Service name (printable string)
request:push(service)
request:push(mdp.MDPC_CLIENT)
if self.verbose then
s_console("I: send request to '%s' service:", service)
request:dump()
end

local retries = self.retries
while (retries > 0) do
local msg = request:dup()
msg:send(self.client)
self.got_reply = false

while true do
local cnt = assert(self.poller:poll(self.timeout * 1000))
if cnt ~= 0 and self.got_reply then
local msg = zmsg.recv(self.client)
if self.verbose then
s_console("I: received reply:")
msg:dump()
end
assert(msg:parts() >= 3)

local header = msg:pop()
assert(header == mdp.MDPC_CLIENT)
local reply_service = msg:pop()
assert(reply_service == service)
return msg
else
retries = retries - 1
if (retries > 0) then
if self.verbose then
s_console("W: no reply, reconnecting…")
end
-- Reconnect
s_mdcli_connect_to_broker(self)
break -- outer loop will resend request.
else
if self.verbose then
s_console("W: permanent error, abandoning request")
end
return nil -- Giving up
end
end
end
end
end

module(...)

function new(broker, verbose)
s_version_assert (2, 1);
local self = setmetatable({
context = zmq.init(1),
poller = zpoller.new(1),
broker = broker,
verbose = verbose,
timeout = 2500, -- msecs
retries = 3, -- before we abandon
}, obj_mt)

s_mdcli_connect_to_broker(self)
return self
end

setmetatable(_M, { __call = function(self, ...) return new(...) end })

mdcliapi.lua: Majordomo client API

With an example test program that does 100K request-reply cycles:

--
-- Majordomo Protocol client example
-- Uses the mdcli API to hide all MDP aspects
--
-- Author: Robert G. Jakabosky <bobby@sharedrealm.com>
--

require"mdcliapi"
require"zmsg"
require"zhelpers"

local verbose = (arg[1] == "-v")
local session = mdcliapi.new("tcp://localhost:5555", verbose)

local count=1
repeat
local request = zmsg.new("Hello world")
local reply = session:send("echo", request)
if not reply then
break -- Interrupt or failure
end
count = count + 1
until (count == 100000)
printf("%d requests/replies processed\n", count)
session:destroy()

mdclient.lua: Majordomo client application

And here is the worker API:

--
-- mdwrkapi.lua - Majordomo Protocol Worker API
--
-- Author: Robert G. Jakabosky <bobby@sharedrealm.com>
--

local HEARTBEAT_LIVENESS = 3 -- 3-5 is reasonable

local setmetatable = setmetatable

local mdp = require"mdp"

local zmq = require"zmq"
local zpoller = require"zmq.poller"
local zmsg = require"zmsg"
require"zhelpers"

local s_version_assert = s_version_assert

local obj_mt = {}
obj_mt.__index = obj_mt

function obj_mt:set_heartbeat(heartbeat)
self.heartbeat = heartbeat
end

function obj_mt:set_reconnect(reconnect)
self.reconnect = reconnect
end

function obj_mt:destroy()
if self.worker then self.worker:close() end
self.context:term()
end

-- Send message to broker
-- If no msg is provided, create one internally
local function s_mdwrk_send_to_broker(self, command, option, msg)
msg = msg or zmsg.new()

-- Stack protocol envelope to start of message
if option then
msg:push(option)
end
msg:push(command)
msg:push(mdp.MDPW_WORKER)
msg:push("")

if self.verbose then
s_console("I: sending %s to broker", mdp.mdps_commands[command])
msg:dump()
end
msg:send(self.worker)
end

local function s_mdwrk_connect_to_broker(self)
-- close old socket.
if self.worker then
self.poller:remove(self.worker)
self.worker:close()
end
self.worker = assert(self.context:socket(zmq.DEALER))
assert(self.worker:setopt(zmq.LINGER, 0))
assert(self.worker:connect(self.broker))
if self.verbose then
s_console("I: connecting to broker at %s…", self.broker)
end
-- Register service with broker
s_mdwrk_send_to_broker(self, mdp.MDPW_READY, self.service)
-- If liveness hits zero, queue is considered disconnected
self.liveness = HEARTBEAT_LIVENESS
self.heartbeat_at = s_clock() + self.heartbeat
-- add socket to poller
self.poller:add(self.worker, zmq.POLLIN, function()
self.got_msg = true
end)
end

--
-- Send reply, if any, to broker and wait for next request.
--
function obj_mt:recv(reply)
-- Format and send the reply if we are provided one
if reply then
assert(self.reply_to)
reply:wrap(self.reply_to, "")
self.reply_to = nil
s_mdwrk_send_to_broker(self, mdp.MDPW_REPLY, nil, reply)
end
self.expect_reply = true

self.got_msg = false
while true do
local cnt = assert(self.poller:poll(self.heartbeat * 1000))
if cnt ~= 0 and self.got_msg then
self.got_msg = false
local msg = zmsg.recv(self.worker)
if self.verbose then
s_console("I: received message from broker:")
msg:dump()
end
self.liveness = HEARTBEAT_LIVENESS
-- Don't try to handle errors, just assert noisily
assert(msg:parts() >= 3)

local empty = msg:pop()
assert(empty == "")

local header = msg:pop()
assert(header == mdp.MDPW_WORKER)

local command = msg:pop()
if command == mdp.MDPW_REQUEST then
-- We should pop and save as many addresses as there are
-- up to a null part, but for now, just save one…
self.reply_to = msg:unwrap()
return msg -- We have a request to process
elseif command == mdp.MDPW_HEARTBEAT then
-- Do nothing for heartbeats
elseif command == mdp.MDPW_DISCONNECT then
-- dis-connect and re-connect to broker.
s_mdwrk_connect_to_broker(self)
else
s_console("E: invalid input message (%d)", command:byte(1,1))
msg:dump()
end
else
self.liveness = self.liveness - 1
if (self.liveness == 0) then
if self.verbose then
s_console("W: disconnected from broker - retrying…")
end
-- sleep then Reconnect
s_sleep(self.reconnect)
s_mdwrk_connect_to_broker(self)
end

-- Send HEARTBEAT if it's time
if (s_clock() > self.heartbeat_at) then
s_mdwrk_send_to_broker(self, mdp.MDPW_HEARTBEAT)
self.heartbeat_at = s_clock() + self.heartbeat
end
end
end
end

module(...)

function new(broker, service, verbose)
s_version_assert(2, 1);
local self = setmetatable({
context = zmq.init(1),
poller = zpoller.new(1),
broker = broker,
service = service,
verbose = verbose,
heartbeat = 2500, -- msecs
reconnect = 2500, -- msecs
}, obj_mt)

s_mdwrk_connect_to_broker(self)
return self
end

setmetatable(_M, { __call = function(self, ...) return new(...) end })

mdwrkapi.lua: Majordomo worker API

With an example test program that implements an 'echo' service:

--
-- Majordomo Protocol worker example
-- Uses the mdwrk API to hide all MDP aspects
--
-- Author: Robert G. Jakabosky <bobby@sharedrealm.com>
--

require"mdwrkapi"
require"zmsg"

local verbose = (arg[1] == "-v")
local session = mdwrkapi.new("tcp://localhost:5555", "echo", verbose)

local reply
while true do
local request = session:recv(reply)
if not request then
break -- Worker was interrupted
end
reply = request -- Echo is complex… :-)
end
session:destroy()

mdworker.lua: Majordomo worker application

Notes on this code:

  • The APIs are single threaded. This means, for example, that the worker won't send heartbeats in the background. Happily, this is exactly what we want: if the worker application gets stuck, heartbeats will stop and the broker will stop sending requests to the worker.
  • The worker API doesn't do an exponential back-off, it's not worth the extra complexity.
  • The APIs don't do any error reporting. If something isn't as expected, they raise an assertion (or exception depending on the language). This is ideal for a reference implementation, so any protocol errors show immediately. For real applications the API should be robust against invalid messages.

You might wonder why the worker API is manually closing its socket and opening a new one, when ØMQ will automatically reconnect a socket if the peer disappears and comes back. Look back at the Simple Pirate worker, and the Paranoid Pirate worker to understand. While ØMQ will automatically reconnect workers, if the broker dies and comes back up, this isn't sufficient to re-register the workers with the broker. There are at least two solutions I know of. The simplest, which we use here, is that the worker monitors the connection using heartbeats, and if it decides the broker is dead, closes its socket and starts afresh with a new socket. The alternative is for the broker to challenge unknown workers — when it gets a heartbeat from the worker — and ask them to re-register. That would require protocol support.

Let's design the Majordomo broker. Its core structure is a set of queues, one per service. We will create these queues as workers appear (we could delete them as workers disappear but forget that for now, it gets complex). Additionally, we keep a queue of workers per service.

To make the C examples easier to write and read, I've taken the hash and list container classes from the ZFL project, and renamed them as zlist and zhash, as we did with zmsg. In any modern language you can of course use built-in containers.

And here is the broker:

--
-- Majordomo Protocol broker
-- A minimal implementation of http://rfc.zeromq.org/spec:7 and spec:8
--
-- Author: Robert G. Jakabosky <bobby@sharedrealm.com>
--
require"zmq"
require"zmq.poller"
require"zmsg"
require"zhelpers"
require"mdp"

local tremove = table.remove

-- We'd normally pull these from config data

local HEARTBEAT_LIVENESS = 3 -- 3-5 is reasonable
local HEARTBEAT_INTERVAL = 2500 -- msecs
local HEARTBEAT_EXPIRY = HEARTBEAT_INTERVAL * HEARTBEAT_LIVENESS

-- ---------------------------------------------------------------------
-- Constructor for broker object

-- ---------------------------------------------------------------------
-- Broker object's metatable.
local broker_mt = {}
broker_mt.__index = broker_mt

function broker_new(verbose)
local context = zmq.init(1)
-- Initialize broker state
return setmetatable({
context = context,
socket = context:socket(zmq.ROUTER),
verbose = verbose,
services = {},
workers = {},
waiting = {},
heartbeat_at = s_clock() + HEARTBEAT_INTERVAL,
}, broker_mt)
end

-- ---------------------------------------------------------------------
-- Service object
local service_mt = {}
service_mt.__index = service_mt

-- Worker object
local worker_mt = {}
worker_mt.__index = worker_mt

-- helper list remove function
local function zlist_remove(list, item)
for n=#list,1,-1 do
if list[n] == item then
tremove(list, n)
end
end
end

-- ---------------------------------------------------------------------
-- Destructor for broker object

function broker_mt:destroy()
self.socket:close()
self.context:term()
for name, service in pairs(self.services) do
service:destroy()
end
for id, worker in pairs(self.workers) do
worker:destroy()
end
end

-- ---------------------------------------------------------------------
-- Bind broker to endpoint, can call this multiple times
-- We use a single socket for both clients and workers.

function broker_mt:bind(endpoint)
self.socket:bind(endpoint)
s_console("I: MDP broker/0.1.1 is active at %s", endpoint)
end

-- ---------------------------------------------------------------------
-- Delete any idle workers that haven't pinged us in a while.

function broker_mt:purge_workers()
local waiting = self.waiting
for n=1,#waiting do
local worker = waiting[n]
if (worker:expired()) then
if (self.verbose) then
s_console("I: deleting expired worker: %s", worker.identity)
end

self:worker_delete(worker, false)
end
end
end

-- ---------------------------------------------------------------------
-- Locate or create new service entry

function broker_mt:service_require(name)
assert (name)
local service = self.services[name]
if not service then
service = setmetatable({
name = name,
requests = {},
waiting = {},
workers = 0,
}, service_mt)
self.services[name] = service
if (self.verbose) then
s_console("I: received message:")
end
end
return service
end

-- ---------------------------------------------------------------------
-- Destroy service object, called when service is removed from
-- broker.services.

function service_mt:destroy()
end

-- ---------------------------------------------------------------------
-- Dispatch requests to waiting workers as possible

function broker_mt:service_dispatch(service, msg)
assert (service)
local requests = service.requests
if (msg) then -- Queue message if any
requests[#requests + 1] = msg
end

self:purge_workers()
local waiting = service.waiting
while (#waiting > 0 and #requests > 0) do
local worker = tremove(waiting, 1) -- pop worker from service's waiting queue.
zlist_remove(self.waiting, worker) -- also remove worker from broker's waiting queue.
local msg = tremove(requests, 1) -- pop request from service's request queue.
self:worker_send(worker, mdp.MDPW_REQUEST, nil, msg)
end
end

-- ---------------------------------------------------------------------
-- Handle internal service according to 8/MMI specification

function broker_mt:service_internal(service_name, msg)
if (service_name == "mmi.service") then
local name = msg:body()
local service = self.services[name]
if (service and service.workers > 0) then
msg:body_set("200")
else
msg:body_set("404")
end
else
msg:body_set("501")
end

-- Remove & save client return envelope and insert the
-- protocol header and service name, then rewrap envelope.
local client = msg:unwrap()
msg:wrap(mdp.MDPC_CLIENT, service_name)
msg:wrap(client, "")

msg:send(self.socket)
end

-- ---------------------------------------------------------------------
-- Creates worker if necessary

function broker_mt:worker_require(identity)
assert (identity)

-- self.workers is keyed off worker identity
local worker = self.workers[identity]
if (not worker) then
worker = setmetatable({
identity = identity,
expiry = 0,
}, worker_mt)
self.workers[identity] = worker
if (self.verbose) then
s_console("I: registering new worker: %s", identity)
end
end
return worker
end

-- ---------------------------------------------------------------------
-- Deletes worker from all data structures, and destroys worker

function broker_mt:worker_delete(worker, disconnect)
assert (worker)
if (disconnect) then
self:worker_send(worker, mdp.MDPW_DISCONNECT)
end
local service = worker.service
if (service) then
zlist_remove (service.waiting, worker)
service.workers = service.workers - 1
end
zlist_remove (self.waiting, worker)
self.workers[worker.identity] = nil
worker:destroy()
end

-- ---------------------------------------------------------------------
-- Destroy worker object, called when worker is removed from
-- broker.workers.

function worker_mt:destroy(argument)
end

-- ---------------------------------------------------------------------
-- Process message sent to us by a worker

function broker_mt:worker_process(sender, msg)
assert (msg:parts() >= 1) -- At least, command

local command = msg:pop()
local worker_ready = (self.workers[sender] ~= nil)
local worker = self:worker_require(sender)

if (command == mdp.MDPW_READY) then
if (worker_ready) then -- Not first command in session then
self:worker_delete(worker, true)
elseif (sender:sub(1,4) == "mmi.") then -- Reserved service name
self:worker_delete(worker, true)
else
-- Attach worker to service and mark as idle
local service_name = msg:pop()
local service = self:service_require(service_name)
worker.service = service
service.workers = service.workers + 1
self:worker_waiting(worker)
end
elseif (command == mdp.MDPW_REPLY) then
if (worker_ready) then
-- Remove & save client return envelope and insert the
-- protocol header and service name, then rewrap envelope.
local client = msg:unwrap()
msg:wrap(mdp.MDPC_CLIENT, worker.service.name)
msg:wrap(client, "")

msg:send(self.socket)
self:worker_waiting(worker)
else
self:worker_delete(worker, true)
end
elseif (command == mdp.MDPW_HEARTBEAT) then
if (worker_ready) then
worker.expiry = s_clock() + HEARTBEAT_EXPIRY
else
self:worker_delete(worker, true)
end
elseif (command == mdp.MDPW_DISCONNECT) then
self:worker_delete(worker, false)
else
s_console("E: invalid input message (%d)", command:byte(1,1))
msg:dump()
end
end

-- ---------------------------------------------------------------------
-- Send message to worker
-- If pointer to message is provided, sends & destroys that message

function broker_mt:worker_send(worker, command, option, msg)
msg = msg and msg:dup() or zmsg.new()

-- Stack protocol envelope to start of message
if (option) then -- Optional frame after command
msg:push(option)
end
msg:push(command)
msg:push(mdp.MDPW_WORKER)
-- Stack routing envelope to start of message
msg:wrap(worker.identity, "")

if (self.verbose) then
s_console("I: sending %s to worker", mdp.mdps_commands[command])
msg:dump()
end
msg:send(self.socket)
end

-- ---------------------------------------------------------------------
-- This worker is now waiting for work

function broker_mt:worker_waiting(worker)
-- Queue to broker and service waiting lists
self.waiting[#self.waiting + 1] = worker
worker.service.waiting[#worker.service.waiting + 1] = worker
worker.expiry = s_clock() + HEARTBEAT_EXPIRY
self:service_dispatch(worker.service, nil)
end

-- ---------------------------------------------------------------------
-- Return 1 if worker has expired and must be deleted

function worker_mt:expired()
return (self.expiry < s_clock())
end
-- ---------------------------------------------------------------------
-- Process a request coming from a client

function broker_mt:client_process(sender, msg)
assert (msg:parts() >= 2) -- Service name + body

local service_name = msg:pop()
local service = self:service_require(service_name)
-- Set reply return address to client sender
msg:wrap(sender, "")
if (service_name:sub(1,4) == "mmi.") then
self:service_internal(service_name, msg)
else
self:service_dispatch(service, msg)
end
end

-- ---------------------------------------------------------------------
-- Main broker work happens here

local verbose = (arg[1] == "-v")

s_version_assert (2, 1)
s_catch_signals ()
local self = broker_new(verbose)
self:bind("tcp://*:5555")

local poller = zmq.poller.new(1)

-- Process next input message, if any
poller:add(self.socket, zmq.POLLIN, function()
local msg = zmsg.recv(self.socket)
if (self.verbose) then
s_console("I: received message:")
msg:dump()
end
local sender = msg:pop()
local empty = msg:pop()
local header = msg:pop()

if (header == mdp.MDPC_CLIENT) then
self:client_process(sender, msg)
elseif (header == mdp.MDPW_WORKER) then
self:worker_process(sender, msg)
else
s_console("E: invalid message:")
msg:dump()
end
end)

-- Get and process messages forever or until interrupted
while (not s_interrupted) do
local cnt = assert(poller:poll(HEARTBEAT_INTERVAL * 1000))
-- Disconnect and delete any expired workers
-- Send heartbeats to idle workers if needed
if (s_clock() > self.heartbeat_at) then
self:purge_workers()
local waiting = self.waiting
for n=1,#waiting do
local worker = waiting[n]
self:worker_send(worker, mdp.MDPW_HEARTBEAT)
end
self.heartbeat_at = s_clock() + HEARTBEAT_INTERVAL
end
end
if (s_interrupted) then
printf("W: interrupt received, shutting down…\n")
end
self:destroy()

mdbroker.lua: Majordomo broker

This is by far the most complex example we've seen. It's almost 500 lines of code. To write this, and make it fully robust took two days. However this is still a short piece of code for a full service-oriented broker.

Notes on this code:

  • The Majordomo Protocol lets us handle both clients and workers on a single socket. This is nicer for those deploying and managing the broker: it just sits on one ØMQ endpoint rather than the two that most devices need.
  • The broker implements all of MDP/0.1 properly (as far as I know), including disconnection if the broker sends invalid commands, heartbeating, and the rest.
  • It can be extended to run multiple threads, each managing one socket and one set of clients and workers. This could be interesting for segmenting large architectures. The C code is already organized around a broker class to make this trivial.
  • A primary-fail-over or live-live broker reliability model is easy, since the broker essentially has no state except service presence. It's up to clients and workers to choose another broker if their first choice isn't up and running.
  • The examples use 2.5-second heartbeats (HEARTBEAT_INTERVAL is 2500 msecs), mainly to reduce the amount of output when you enable tracing. Realistic values would be lower for most LAN applications. However, any retry has to be slow enough to allow for a service to restart, say 10 seconds at least.

Asynchronous Majordomo Pattern

The way we implemented Majordomo, above, is simple and stupid. The client is just the original Simple Pirate, wrapped up in a sexy API. When I fire up a client, broker, and worker on a test box, it can process 100,000 requests in about 14 seconds. That is partly due to the code, which cheerfully copies message frames around as if CPU cycles were free. But the real problem is that we're doing network round-trips. ØMQ disables Nagle's algorithm, but round-tripping is still slow.

Theory is great in theory, but in practice, practice is better. Let's measure the actual cost of round-tripping with a simple test program. This sends a bunch of messages, first waiting for a reply to each message, and then sending them all at once and reading all the replies back as a batch. Both approaches do the same work, but they give very different results. We mock up a client, broker, and worker:

--
-- Round-trip demonstrator
--
-- While this example runs in a single process, that is just to make
-- it easier to start and stop the example. Each thread has its own
-- context and conceptually acts as a separate process.
--
-- Author: Robert G. Jakabosky <bobby@sharedrealm.com>
--
require"zmq"
require"zmq.threads"
require"zmsg"

local common_code = [[
require"zmq"
require"zmsg"
require"zhelpers"
]]

local client_task = common_code .. [[
local context = zmq.init(1)
local client = context:socket(zmq.DEALER)
client:setopt(zmq.IDENTITY, "C", 1)
client:connect("tcp://localhost:5555")

printf("Setting up test…\n")
s_sleep(100)

local requests
local start

printf("Synchronous round-trip test…\n")
requests = 10000
start = s_clock()
for n=1,requests do
local msg = zmsg.new("HELLO")
msg:send(client)
msg = zmsg.recv(client)
end
printf(" %d calls/second\n",
(1000 * requests) / (s_clock() - start))

printf("Asynchronous round-trip test…\n")
requests = 100000
start = s_clock()
for n=1,requests do
local msg = zmsg.new("HELLO")
msg:send(client)
end
for n=1,requests do
local msg = zmsg.recv(client)
end
printf(" %d calls/second\n",
(1000 * requests) / (s_clock() - start))

client:close()
context:term()
]]

local worker_task = common_code .. [[
local context = zmq.init(1)
local worker = context:socket(zmq.DEALER)
worker:setopt(zmq.IDENTITY, "W", 1)
worker:connect("tcp://localhost:5556")

while true do
local msg = zmsg.recv(worker)
msg:send(worker)
end
worker:close()
context:term()
]]

local broker_task = common_code .. [[
-- Prepare our context and sockets
local context = zmq.init(1)
local frontend = context:socket(zmq.ROUTER)
local backend = context:socket(zmq.ROUTER)
frontend:bind("tcp://*:5555")
backend:bind("tcp://*:5556")

require"zmq.poller"
local poller = zmq.poller(2)
poller:add(frontend, zmq.POLLIN, function()
local msg = zmsg.recv(frontend)
--msg[1] = "W"
msg:pop()
msg:push("W")
msg:send(backend)
end)
poller:add(backend, zmq.POLLIN, function()
local msg = zmsg.recv(backend)
--msg[1] = "C"
msg:pop()
msg:push("C")
msg:send(frontend)
end)
poller:start()
frontend:close()
backend:close()
context:term()
]]

s_version_assert(2, 1)

local client = zmq.threads.runstring(nil, client_task)
assert(client:start())
local worker = zmq.threads.runstring(nil, worker_task)
assert(worker:start(true))
local broker = zmq.threads.runstring(nil, broker_task)
assert(broker:start(true))

assert(client:join())

tripping.lua: Round-trip demonstrator

On my development box, this program says:

Setting up test...
Synchronous round-trip test...
 9057 calls/second
Asynchronous round-trip test...
 173010 calls/second

Note that the client thread does a small pause before starting. This is to get around one of the 'features' of the router socket: if you send a message with the address of a peer that's not yet connected, the message gets discarded. In this example we don't use the LRU mechanism, so without the sleep, if the worker thread is too slow to connect, it'll lose messages, making a mess of our test.

As we see, round-tripping in the simplest case is 20 times slower than the "shove it down the pipe as fast as it'll go" asynchronous approach. Let's see if we can apply this to Majordomo to make it faster.

First, we modify the client API to have separate send and recv methods:

mdcli_t *mdcli_new (char *broker, int verbose);
void mdcli_destroy (mdcli_t **self_p);
int mdcli_send (mdcli_t *self, char *service, zmsg_t **request_p);
zmsg_t *mdcli_recv (mdcli_t *self);

It's literally a few minutes' work to refactor the synchronous client API to become asynchronous:

--
-- mdcliapi2.lua - Majordomo Protocol Client API (async version)
--
-- Author: Robert G. Jakabosky <bobby@sharedrealm.com>
--

local setmetatable = setmetatable

local mdp = require"mdp"

local zmq = require"zmq"
local zpoller = require"zmq.poller"
local zmsg = require"zmsg"
require"zhelpers"

local s_version_assert = s_version_assert

local obj_mt = {}
obj_mt.__index = obj_mt

function obj_mt:set_timeout(timeout)
self.timeout = timeout
end

function obj_mt:destroy()
if self.client then self.client:close() end
self.context:term()
end

local function s_mdcli_connect_to_broker(self)
-- close old socket.
if self.client then
self.poller:remove(self.client)
self.client:close()
end
self.client = assert(self.context:socket(zmq.DEALER))
assert(self.client:setopt(zmq.LINGER, 0))
assert(self.client:connect(self.broker))
if self.verbose then
s_console("I: connecting to broker at %s…", self.broker)
end
-- add socket to poller
self.poller:add(self.client, zmq.POLLIN, function()
self.got_reply = true
end)
end

--
-- Send request to broker and get reply by hook or crook
--
function obj_mt:send(service, request)
-- Prefix request with protocol frames
-- Frame 0: empty (REQ emulation)
-- Frame 1: "MDPCxy" (six bytes, MDP/Client x.y)
-- Frame 2: Service name (printable string)
request:push(service)
request:push(mdp.MDPC_CLIENT)
request:push("")
if self.verbose then
s_console("I: send request to '%s' service:", service)
request:dump()
end
request:send(self.client)
return 0
end

-- Returns the reply message or NULL if there was no reply. Does not
-- attempt to recover from a broker failure, this is not possible
-- without storing all unanswered requests and resending them all…
function obj_mt:recv()
self.got_reply = false

local cnt = assert(self.poller:poll(self.timeout * 1000))
if cnt ~= 0 and self.got_reply then
local msg = zmsg.recv(self.client)
if self.verbose then
s_console("I: received reply:")
msg:dump()
end
assert(msg:parts() >= 3)

local empty = msg:pop()
assert(empty == "")

local header = msg:pop()
assert(header == mdp.MDPC_CLIENT)

return msg
end
if self.verbose then
s_console("W: permanent error, abandoning request")
end
return nil -- Giving up
end

module(...)

function new(broker, verbose)
s_version_assert (2, 1);
local self = setmetatable({
context = zmq.init(1),
poller = zpoller.new(1),
broker = broker,
verbose = verbose,
timeout = 2500, -- msecs
}, obj_mt)

s_mdcli_connect_to_broker(self)
return self
end

setmetatable(_M, { __call = function(self, ...) return new(...) end })

mdcliapi2.lua: Majordomo asynchronous client API

And here's the corresponding client test program:

--
-- Majordomo Protocol client example - asynchronous
-- Uses the mdcli API to hide all MDP aspects
--
-- Author: Robert G. Jakabosky <bobby@sharedrealm.com>
--

require"mdcliapi2"
require"zmsg"
require"zhelpers"

local verbose = (arg[1] == "-v")
local session = mdcliapi2.new("tcp://localhost:5555", verbose)

local count=100000
for n=1,count do
local request = zmsg.new("Hello world")
session:send("echo", request)
end
for n=1,count do
local reply = session:recv()
if not reply then
break -- Interrupted by Ctrl-C
end
end
printf("%d replies received\n", count)
session:destroy()

mdclient2.lua: Majordomo client application

The broker and worker are unchanged, since we've not modified the protocol at all. We see an immediate improvement in performance. Here's the synchronous client chugging through 100K request-reply cycles:

$ time mdclient
100000 requests/replies processed

real    0m14.088s
user    0m1.310s
sys     0m2.670s

And here's the asynchronous client, with a single worker:

$ time mdclient2
100000 replies received

real    0m8.730s
user    0m0.920s
sys     0m1.550s

Twice as fast. Not bad, but let's fire up 10 workers, and see how it handles:

$ time mdclient2
100000 replies received

real    0m3.863s
user    0m0.730s
sys     0m0.470s

It isn't fully asynchronous since workers get their messages on a strict LRU basis. But it will scale better with more workers. On my fast test box, after eight or so workers it doesn't get any faster. Four cores only stretches so far. But we got a 4x improvement in throughput with just a few minutes' work. The broker is still unoptimized. It spends most of its time copying message frames around, instead of doing zero copy, which it could. But we're getting 25K reliable request/reply calls a second, with pretty low effort.

However the asynchronous Majordomo pattern isn't all roses. It has a fundamental weakness, namely that it cannot survive a broker crash without more work. If you look at the mdcliapi2 code you'll see it does not attempt to reconnect after a failure. A proper reconnect would require:

  • That every request is numbered, and every reply has a matching number, which would ideally require a change to the protocol to enforce.
  • That the client API tracks and holds onto all outstanding requests, i.e. for which no reply had yet been received.
  • That in case of fail-over, the client API resends all outstanding requests to the broker.

It's not a deal breaker but it does show that performance often means complexity. Is this worth doing for Majordomo? It depends on your use case. For a name lookup service you call once per session, no. For a web front-end serving thousands of clients, probably yes.
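
To make the requirement concrete, here is a minimal sketch of the client-side bookkeeping such a reconnect would need, kept free of any ØMQ calls: each request is stamped with a sequence number, unanswered requests are held in a table, and everything still outstanding is resent after a failover. The names (pending_t, MAX_PENDING) and the fixed-size table are illustrative only, not part of the mdcliapi2 code.

#include <stdio.h>
#include <string.h>

#define MAX_PENDING 1024

typedef struct {
    int  sequence;              //  Number stamped on the request
    char body [256];            //  Copy of the request body
    int  answered;              //  Set once the matching reply arrives
} pending_t;

static pending_t pending [MAX_PENDING];
static int pending_count = 0;
static int last_sequence = 0;

//  Call instead of a bare send: remember the request, return its number
static int track_request (const char *body)
{
    pending_t *slot = &pending [pending_count++ % MAX_PENDING];
    slot->sequence = ++last_sequence;
    strncpy (slot->body, body, sizeof (slot->body) - 1);
    slot->answered = 0;
    return slot->sequence;      //  Caller stamps this onto the message
}

//  Call when a reply carrying this sequence number comes back
static void mark_answered (int sequence)
{
    for (int i = 0; i < pending_count && i < MAX_PENDING; i++)
        if (pending [i].sequence == sequence)
            pending [i].answered = 1;
}

//  Call after reconnecting to a (possibly different) broker
static void resend_outstanding (void)
{
    for (int i = 0; i < pending_count && i < MAX_PENDING; i++)
        if (!pending [i].answered)
            printf ("resend #%d: %s\n",
                pending [i].sequence, pending [i].body);
}

int main (void)
{
    track_request ("Hello world");
    int second = track_request ("Hello again");
    mark_answered (second);     //  Only the second request was answered
    resend_outstanding ();      //  Would resend request #1 after failover
    return 0;
}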

Service Discovery

So, we have a nice service-oriented broker, but we have no way of knowing whether a particular service is available or not. We know if a request failed, but we don't know why. It is useful to be able to ask the broker, "is the echo service running?" The most obvious way would be to modify our MDP/Client protocol to add commands to ask the broker, "is service X running?" But MDP/Client has the great charm of being simple. Adding service discovery to it would make it as complex as the MDP/Worker protocol.

Another option is to do what email does, and ask that undeliverable requests be returned. This can work well in an asynchronous world but it also adds complexity. We need ways to distinguish returned requests from replies, and to handle these properly.

Let's try to use what we've already built, building on top of MDP instead of modifying it. Service discovery is, itself, a service. It might indeed be one of several management services, such as "disable service X", "provide statistics", and so on. What we want is a general, extensible solution that doesn't affect the protocol or existing applications.

So here's a small RFC - MMI, or the Majordomo Management Interface - that layers this on top of MDP: http://rfc.zeromq.org/spec:8. We already implemented it in the broker, though unless you read the whole thing you probably missed that. Here's how we use the service discovery in an application:

--
-- MMI echo query example
--
-- Author: Robert G. Jakabosky <bobby@sharedrealm.com>
--

require"mdcliapi"
require"zmsg"
require"zhelpers"

local verbose = (arg[1] == "-v")
local session = mdcliapi.new("tcp://localhost:5555", verbose)

-- This is the service we want to look up
local request = zmsg.new("echo")

-- This is the service we send our request to
local reply = session:send("mmi.service", request)

if (reply) then
printf ("Lookup echo service: %s\n", reply:body())
else
printf ("E: no response from broker, make sure it's running\n")
end

session:destroy()

mmiecho.lua: Service discovery over Majordomo

The broker checks the service name, and handles any service starting with "mmi." itself, rather than passing the request on to a worker. Try this with and without a worker running, and you should see the little program report '200' or '404' accordingly. The implementation of MMI in our example broker is pretty weak. For example if a worker disappears, services remain "present". In practice a broker should remove services that have no workers after some configurable timeout.

Idempotent Services

Idempotency is not something to take a pill for. What it means is that it's safe to repeat an operation. Checking the clock is idempotent. Lending one's credit card to one's wife is not. While many client-to-server use cases are idempotent, some are not. Examples of idempotent use cases include:

  • Stateless task distribution, i.e. a pipeline where the servers are stateless workers that compute a reply based purely on the state provided by a request. In such a case it's safe (though inefficient) to execute the same request many times.
  • A name service that translates logical addresses into endpoints to bind or connect to. In such a case it's safe to make the same lookup request many times.

And here are examples of non-idempotent use cases:

  • A logging service. One does not want the same log information recorded more than once.
  • Any service that has impact on downstream nodes, e.g. sends on information to other nodes. If that service gets the same request more than once, downstream nodes will get duplicate information.
  • Any service that modifies shared data in some non-idempotent way. E.g. a service that debits a bank account is definitely not idempotent.

When our server applications are not idempotent, we have to think more carefully about when exactly they might crash. If an application dies when it's idle, or while it's processing a request, that's usually fine. We can use database transactions to make sure a debit and a credit are always done together, if at all. If the server dies while sending its reply, that's a problem, because as far as it's concerned, it's done its work.

If the network dies just as the reply is making its way back to the client, the same problem arises. The client will think the server died, will resend the request, and the server will do the same work twice. Which is not what we want.

We use the fairly standard solution of detecting and rejecting duplicate requests. This means:

  • The client must stamp every request with a unique client identifier and a unique message number.
  • The server, before sending back a reply, stores it using the client id + message number as a key.
  • The server, when getting a request from a given client, first checks if it has a reply for that client id + message number. If so, it does not process the request but just resends the reply.
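
Here is a minimal sketch of that server-side bookkeeping, deliberately free of any messaging code: replies are cached under a "client id:message number" key, and a repeated request gets the cached reply back instead of being processed twice. The fixed-size table and the names used (reply_cache_t, CACHE_SIZE) are illustrative only.

#include <stdio.h>
#include <string.h>

#define CACHE_SIZE 256

typedef struct {
    char key [64];              //  "client-id:message-number"
    char reply [256];           //  Reply we already sent for that key
    int  used;
} reply_cache_t;

static reply_cache_t cache [CACHE_SIZE];

//  Return the cached reply for this key, or NULL if we've not seen it
static const char *lookup_reply (const char *key)
{
    for (int i = 0; i < CACHE_SIZE; i++)
        if (cache [i].used && strcmp (cache [i].key, key) == 0)
            return cache [i].reply;
    return NULL;
}

//  Remember the reply we are about to send, keyed by client id + number
static void store_reply (const char *key, const char *reply)
{
    for (int i = 0; i < CACHE_SIZE; i++)
        if (!cache [i].used) {
            strncpy (cache [i].key, key, sizeof (cache [i].key) - 1);
            strncpy (cache [i].reply, reply, sizeof (cache [i].reply) - 1);
            cache [i].used = 1;
            return;
        }
    //  A real cache would evict the oldest entry here
}

//  What the worker does for every incoming request
static const char *handle_request (const char *client_id, int msg_nbr,
    const char *body)
{
    char key [64];
    snprintf (key, sizeof (key), "%s:%d", client_id, msg_nbr);

    const char *cached = lookup_reply (key);
    if (cached)
        return cached;          //  Duplicate: resend the earlier reply

    //  First time we see this request: do the (non-idempotent) work
    static char reply [256];
    snprintf (reply, sizeof (reply), "done: %s", body);
    store_reply (key, reply);
    return reply;
}

int main (void)
{
    printf ("%s\n", handle_request ("client-1", 1, "debit 100"));
    printf ("%s\n", handle_request ("client-1", 1, "debit 100"));   //  Cached
    return 0;
}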

Disconnected Reliability (Titanic Pattern)

Once you realize that Majordomo is a 'reliable' message broker, you might be tempted to add some spinning rust (that is, ferrous-based hard disk platters). After all, this works for all the enterprise messaging systems. It's such a tempting idea that it's a little sad to have to be negative. But brutal cynicism is one of my specialties. So, some reasons you don't want rust-based brokers sitting in the center of your architecture are:

  • As you've seen, the Lazy Pirate client performs surprisingly well. It works across a whole range of architectures, from direct client-to-server to distributed queue devices. It does tend to assume that workers are stateless and idempotent. But we can work around that limitation without resorting to rust.
  • Rust brings a whole set of problems, from slow performance to additional pieces that you have to manage and repair, and that create 6am panics as they inevitably break at the start of daily operations. The beauty of the Pirate patterns in general is their simplicity. They won't crash. And if you're still worried about the hardware, you can move to a peer-to-peer pattern that has no broker at all. I'll explain later in this chapter.

Having said this, however, there is one sane use case for rust-based reliability, which is an asynchronous disconnected network. It solves a major problem with Pirate, namely that a client has to wait for an answer in real-time. If clients and workers are only sporadically connected (think of email as an analogy), we can't use a stateless network between clients and workers. We have to put state in the middle.

So, here's the Titanic pattern, in which we write messages to disk to ensure they never get lost, no matter how sporadically clients and workers are connected. As we did for service discovery, we're going to layer Titanic on top of Majordomo rather than extend MDP. It's wonderfully lazy because it means we can implement our fire-and-forget reliability in a specialized worker, rather than in the broker. This is excellent for several reasons:

  • It is much easier because we divide and conquer: the broker handles message routing and the worker handles reliability.
  • It lets us mix brokers written in one language with workers written in another.
  • It lets us evolve the fire-and-forget technology independently.

The only downside is that there's an extra network hop between broker and hard disk. This is easily worth it.

There are many ways to make a persistent request-reply architecture. We'll aim for simple and painless. The simplest design I could come up with, after playing with this for a few hours, is Titanic as a "proxy service". That is, it doesn't affect workers at all. If a client wants a reply immediately, it talks directly to a service and hopes the service is available. If a client is happy to wait a while, it talks to Titanic instead and asks, "hey, buddy, would you take care of this for me while I go buy my groceries?"

Figure 57 - The Titanic Pattern

fig57.png

Titanic is thus both a worker, and a client. The dialog between client and Titanic goes along these lines:

  • Client: please accept this request for me. Titanic: OK, done.
  • Client: do you have a reply for me? Titanic: Yes, here it is. Or, no, not yet.
  • Client: ok, you can wipe that request now, it's all happy. Titanic: OK, done.

Whereas the dialog between Titanic and broker and worker goes like this:

  • Titanic: hey, broker, is there an echo service? Broker: uhm, yeah, seems like.
  • Titanic: hey, echo, please handle this for me. Echo: sure, here you are.
  • Titanic: sweeeeet!

You can work through this, and the possible failure scenarios. If a worker crashes while processing a request, Titanic retries, indefinitely. If a reply gets lost somewhere, Titanic will retry. If the request gets processed but the client doesn't get the reply, it will ask again. If Titanic crashes while processing a request, or a reply, the client will try again. As long as requests are fully committed to safe storage, work can't get lost.

The handshaking is pedantic, but can be pipelined, i.e. clients can use the asynchronous Majordomo pattern to do a lot of work and then get the responses later.

We need some way for a client to request its replies. We'll have many clients asking for the same services, and clients disappear and reappear with different identities. So here is a simple, reasonably secure solution:

  • Every request generates a universally unique ID (UUID), which Titanic returns to the client when it's queued the request.
  • When a client asks for a reply, it must specify the UUID for the original request.

This puts some onus on the client to store its request UUIDs safely, but it removes any need for authentication. What alternatives are there?
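
Storing the UUIDs safely needs nothing fancier than a small append-only journal that the client replays when it restarts. Here is a minimal sketch under that assumption; the file name and helper names are illustrative, and a real client would also drop an entry once it has called titanic.close for it.

#include <stdio.h>
#include <string.h>

#define UUID_JOURNAL "titanic_uuids.txt"

//  Remember a UUID as soon as titanic.request returns it
static void journal_add (const char *uuid)
{
    FILE *file = fopen (UUID_JOURNAL, "a");
    if (file) {
        fprintf (file, "%s\n", uuid);
        fclose (file);
    }
}

//  Replay the journal after a restart, one callback per outstanding UUID
static void journal_replay (void (*fetch_reply) (const char *uuid))
{
    FILE *file = fopen (UUID_JOURNAL, "r");
    if (!file)
        return;                 //  Nothing outstanding
    char uuid [64];
    while (fgets (uuid, sizeof (uuid), file)) {
        uuid [strcspn (uuid, "\n")] = 0;
        fetch_reply (uuid);     //  e.g. send a titanic.reply request
    }
    fclose (file);
}

static void print_uuid (const char *uuid)
{
    printf ("would ask titanic.reply for %s\n", uuid);
}

int main (void)
{
    journal_add ("0123456789ABCDEF0123456789ABCDEF");
    journal_replay (print_uuid);
    return 0;
}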

Before we jump off and write yet another formal specification (fun, fun!) let's consider how the client talks to Titanic. One way is to use a single service and send it three different request types. Another way, which seems simpler, is to use three services:

  • titanic.request - store a request message, return a UUID for the request.
  • titanic.reply - fetch a reply, if available, for a given request UUID.
  • titanic.close - confirm that a reply has been stored and processed.

We'll just make a multithreaded worker, which, as we've seen from our multithreading experience with ØMQ, is trivial. However, before jumping into code, let's sketch out what Titanic would look like in terms of ØMQ messages and frames: http://rfc.zeromq.org/spec:9. This is the "Titanic Service Protocol", or TSP.

Using TSP is clearly more work for client applications than accessing a service directly via MDP. Here's the shortest robust 'echo' client example:

// Titanic client example
// Implements client side of http://rfc.zeromq.org/spec:9

// Lets us build this source without creating a library
#include "mdcliapi.c"

// Calls a TSP service
// Returns response if successful (status code 200 OK), else NULL
//

static zmsg_t *
s_service_call (mdcli_t *session, char *service, zmsg_t **request_p)
{
zmsg_t *reply = mdcli_send (session, service, request_p);
if (reply) {
zframe_t *status = zmsg_pop (reply);
if (zframe_streq (status, "200")) {
zframe_destroy (&status);
return reply;
}
else
if (zframe_streq (status, "400")) {
printf ("E: client fatal error, aborting\n");
exit (EXIT_FAILURE);
}
else
if (zframe_streq (status, "500")) {
printf ("E: server fatal error, aborting\n");
exit (EXIT_FAILURE);
}
}
else
exit (EXIT_SUCCESS); // Interrupted or failed

zmsg_destroy (&reply);
return NULL; // Didn't succeed; don't care why not
}

// The main task tests our service call by sending an echo request:

int main (int argc, char *argv [])
{
int verbose = (argc > 1 && streq (argv [1], "-v"));
mdcli_t *session = mdcli_new ("tcp://localhost:5555", verbose);

// 1. Send 'echo' request to Titanic
zmsg_t *request = zmsg_new ();
zmsg_addstr (request, "echo");
zmsg_addstr (request, "Hello world");
zmsg_t *reply = s_service_call (
session, "titanic.request", &request);

zframe_t *uuid = NULL;
if (reply) {
uuid = zmsg_pop (reply);
zmsg_destroy (&reply);
zframe_print (uuid, "I: request UUID ");
}
// 2. Wait until we get a reply
while (!zctx_interrupted) {
zclock_sleep (100);
request = zmsg_new ();
zmsg_add (request, zframe_dup (uuid));
zmsg_t *reply = s_service_call (
session, "titanic.reply", &request);

if (reply) {
char *reply_string = zframe_strdup (zmsg_last (reply));
printf ("Reply: %s\n", reply_string);
free (reply_string);
zmsg_destroy (&reply);

// 3. Close request
request = zmsg_new ();
zmsg_add (request, zframe_dup (uuid));
reply = s_service_call (session, "titanic.close", &request);
zmsg_destroy (&reply);
break;
}
else {
printf ("I: no reply yet, trying again…\n");
zclock_sleep (5000); // Try again in 5 seconds
}
}
zframe_destroy (&uuid);
mdcli_destroy (&session);
return 0;
}

ticlient.c: Titanic client example

Of course this can, and in practice would, be wrapped up in some kind of framework. Real application developers should never see messaging up close; it's a tool for more technically-minded experts to build frameworks and APIs. If we had infinite time to explore this, I'd make a TSP API example, and bring the client application back down to a few lines of code. But it's the same principle as we saw for MDP, so there's no need to be repetitive.
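
To give the flavor, here is a hedged sketch of what such a wrapper might look like: one blocking call that does the request / poll-for-reply / close dance from ticlient.c. The tsp_service_call name is hypothetical and not part of the Guide's code; the body reuses only the mdcli and zmsg calls shown above and skips most error handling.

//  Hypothetical TSP convenience wrapper (illustration only, not part of
//  the Guide's example code). It hides the titanic.request, titanic.reply,
//  and titanic.close exchanges behind a single blocking call.

#include "mdcliapi.c"

static zmsg_t *
tsp_service_call (mdcli_t *session, char *service, zmsg_t **request_p)
{
    //  1. Queue the request with titanic.request, and keep the UUID
    zmsg_pushstr (*request_p, service);
    zmsg_t *reply = mdcli_send (session, "titanic.request", request_p);
    if (!reply)
        return NULL;            //  Interrupted, or broker not reachable
    zframe_t *status = zmsg_pop (reply);    //  Assume "200"; a real
    zframe_destroy (&status);               //  wrapper would check it
    zframe_t *uuid = zmsg_pop (reply);
    zmsg_destroy (&reply);

    //  2. Poll titanic.reply until the answer is ready
    while (!zctx_interrupted) {
        zmsg_t *poll = zmsg_new ();
        zmsg_add (poll, zframe_dup (uuid));
        reply = mdcli_send (session, "titanic.reply", &poll);
        if (reply && zframe_streq (zmsg_first (reply), "200")) {
            zframe_t *code = zmsg_pop (reply);
            zframe_destroy (&code);
            break;              //  'reply' now holds the actual answer
        }
        zmsg_destroy (&reply);
        zclock_sleep (1000);    //  Not ready yet (300), ask again shortly
    }
    //  3. Tell Titanic it can forget this request
    zmsg_t *closing = zmsg_new ();
    zmsg_add (closing, zframe_dup (uuid));
    zmsg_t *ack = mdcli_send (session, "titanic.close", &closing);
    zmsg_destroy (&ack);
    zframe_destroy (&uuid);
    return reply;
}

With something along these lines, the client application shrinks to building a request, calling tsp_service_call (session, "echo", &request), and printing the reply.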

Here's the Titanic implementation. This server handles the three services using three threads, as proposed. It does full persistence to disk using the most brute-force approach possible: one file per message. It's so simple it's scary; the only complex part is that it keeps a separate 'queue' of all requests to avoid reading the directory over and over:

// Titanic service
// Implements server side of http://rfc.zeromq.org/spec:9

// Lets us build this source without creating a library
#include "mdwrkapi.c"
#include "mdcliapi.c"

#include "zfile.h"
#include <uuid/uuid.h>

// Return a new UUID as a printable character string
// Caller must free returned string when finished with it

static char *
s_generate_uuid (void)
{
char hex_char [] = "0123456789ABCDEF";
char *uuidstr = zmalloc (sizeof (uuid_t) * 2 + 1);
uuid_t uuid;
uuid_generate (uuid);
int byte_nbr;
for (byte_nbr = 0; byte_nbr < sizeof (uuid_t); byte_nbr++) {
uuidstr [byte_nbr * 2 + 0] = hex_char [uuid [byte_nbr] >> 4];
uuidstr [byte_nbr * 2 + 1] = hex_char [uuid [byte_nbr] & 15];
}
return uuidstr;
}

// Returns freshly allocated request filename for given UUID

#define TITANIC_DIR ".titanic"

static char *
s_request_filename (char *uuid) {
char *filename = malloc (256);
snprintf (filename, 256, TITANIC_DIR "/%s.req", uuid);
return filename;
}

// Returns freshly allocated reply filename for given UUID

static char *
s_reply_filename (char *uuid) {
char *filename = malloc (256);
snprintf (filename, 256, TITANIC_DIR "/%s.rep", uuid);
return filename;
}

// The titanic.request task waits for requests to this service. It writes
// each request to disk and returns a UUID to the client. The client picks
// up the reply asynchronously using the titanic.reply service:

static void
titanic_request (void *args, zctx_t *ctx, void *pipe)
{
mdwrk_t *worker = mdwrk_new (
"tcp://localhost:5555", "titanic.request", 0);
zmsg_t *reply = NULL;

while (true) {
// Send reply if it's not null
// And then get next request from broker
zmsg_t *request = mdwrk_recv (worker, &reply);
if (!request)
break; // Interrupted, exit

// Ensure message directory exists
zfile_mkdir (TITANIC_DIR);

// Generate UUID and save message to disk
char *uuid = s_generate_uuid ();
char *filename = s_request_filename (uuid);
FILE *file = fopen (filename, "w");
assert (file);
zmsg_save (request, file);
fclose (file);
free (filename);
zmsg_destroy (&request);

// Send UUID through to message queue
reply = zmsg_new ();
zmsg_addstr (reply, uuid);
zmsg_send (&reply, pipe);

// Now send UUID back to client
// Done by the mdwrk_recv() at the top of the loop
reply = zmsg_new ();
zmsg_addstr (reply, "200");
zmsg_addstr (reply, uuid);
free (uuid);
}
mdwrk_destroy (&worker);
}

// The titanic.reply task checks if there's a reply for the specified
// request (by UUID), and returns a 200 (OK), 300 (Pending), or 400
// (Unknown) accordingly:

static void *
titanic_reply (void *context)
{
mdwrk_t *worker = mdwrk_new (
"tcp://localhost:5555", "titanic.reply", 0);
zmsg_t *reply = NULL;

while (true) {
zmsg_t *request = mdwrk_recv (worker, &reply);
if (!request)
break; // Interrupted, exit

char *uuid = zmsg_popstr (request);
char *req_filename = s_request_filename (uuid);
char *rep_filename = s_reply_filename (uuid);
if (zfile_exists (rep_filename)) {
FILE *file = fopen (rep_filename, "r");
assert (file);
reply = zmsg_load (NULL, file);
zmsg_pushstr (reply, "200");
fclose (file);
}
else {
reply = zmsg_new ();
if (zfile_exists (req_filename))
zmsg_pushstr (reply, "300"); //Pending
else
zmsg_pushstr (reply, "400"); //Unknown
}
zmsg_destroy (&request);
free (uuid);
free (req_filename);
free (rep_filename);
}
mdwrk_destroy (&worker);
return 0;
}

// The titanic.close task removes any waiting replies for the request
// (specified by UUID). It's idempotent, so it is safe to call more than
// once in a row:

static void *
titanic_close (void *context)
{
mdwrk_t *worker = mdwrk_new (
"tcp://localhost:5555", "titanic.close", 0);
zmsg_t *reply = NULL;

while (true) {
zmsg_t *request = mdwrk_recv (worker, &reply);
if (!request)
break; // Interrupted, exit

char *uuid = zmsg_popstr (request);
char *req_filename = s_request_filename (uuid);
char *rep_filename = s_reply_filename (uuid);
zfile_delete (req_filename);
zfile_delete (rep_filename);
free (uuid);
free (req_filename);
free (rep_filename);

zmsg_destroy (&request);
reply = zmsg_new ();
zmsg_addstr (reply, "200");
}
mdwrk_destroy (&worker);
return 0;
}

// This is the main thread for the Titanic worker. It starts three child
// threads; for the request, reply, and close services. It then dispatches
// requests to workers using a simple brute force disk queue. It receives
// request UUIDs from the titanic.request service, saves these to a disk
// file, and then throws each request at MDP workers until it gets a
// response.

static int s_service_success (char *uuid);

int main (int argc, char *argv [])
{
int verbose = (argc > 1 && streq (argv [1], "-v"));
zctx_t *ctx = zctx_new ();

void *request_pipe = zthread_fork (ctx, titanic_request, NULL);
zthread_new (titanic_reply, NULL);
zthread_new (titanic_close, NULL);

// Main dispatcher loop
while (true) {
// We'll dispatch once per second, if there's no activity
zmq_pollitem_t items [] = { { request_pipe, 0, ZMQ_POLLIN, 0 } };
int rc = zmq_poll (items, 1, 1000 * ZMQ_POLL_MSEC);
if (rc == -1)
break; // Interrupted
if (items [0].revents & ZMQ_POLLIN) {
// Ensure message directory exists
zfile_mkdir (TITANIC_DIR);

// Append UUID to queue, prefixed with '-' for pending
zmsg_t *msg = zmsg_recv (request_pipe);
if (!msg)
break; // Interrupted
FILE *file = fopen (TITANIC_DIR "/queue", "a");
char *uuid = zmsg_popstr (msg);
fprintf (file, "-%s\n", uuid);
fclose (file);
free (uuid);
zmsg_destroy (&msg);
}
// Brute force dispatcher
char entry [] = "?.......:.......:.......:.......:";
FILE *file = fopen (TITANIC_DIR "/queue", "r+");
while (file && fread (entry, 33, 1, file) == 1) {
// UUID is prefixed with '-' if still waiting
if (entry [0] == '-') {
if (verbose)
printf ("I: processing request %s\n", entry + 1);
if (s_service_success (entry + 1)) {
// Mark queue entry as processed
fseek (file, -33, SEEK_CUR);
fwrite ("+", 1, 1, file);
fseek (file, 32, SEEK_CUR);
}
}
// Skip end of line, LF or CRLF
if (fgetc (file) == '\r')
fgetc (file);
if (zctx_interrupted)
break;
}
if (file)
fclose (file);
}
return 0;
}

// Here, we first check if the requested MDP service is defined or not,
// using a MMI lookup to the Majordomo broker. If the service exists,
// we send a request and wait for a reply using the conventional MDP
// client API. This is not meant to be fast, just very simple:

static int
s_service_success (char *uuid)
{
// Load request message, service will be first frame
char *filename = s_request_filename (uuid);
FILE *file = fopen (filename, "r");
free (filename);

// If the client already closed request, treat as successful
if (!file)
return 1;

zmsg_t *request = zmsg_load (NULL, file);
fclose (file);
zframe_t *service = zmsg_pop (request);
char *service_name = zframe_strdup (service);

// Create MDP client session with short timeout
mdcli_t *client = mdcli_new ("tcp://localhost:5555", false);
mdcli_set_timeout (client, 1000); // 1 sec
mdcli_set_retries (client, 1); // only 1 retry

// Use MMI protocol to check if service is available
zmsg_t *mmi_request = zmsg_new ();
zmsg_add (mmi_request, service);
zmsg_t *mmi_reply = mdcli_send (client, "mmi.service", &mmi_request);
int service_ok = (mmi_reply
&& zframe_streq (zmsg_first (mmi_reply), "200"));
zmsg_destroy (&mmi_reply);

int result = 0;
if (service_ok) {
zmsg_t *reply = mdcli_send (client, service_name, &request);
if (reply) {
filename = s_reply_filename (uuid);
FILE *file = fopen (filename, "w");
assert (file);
zmsg_save (reply, file);
fclose (file);
free (filename);
result = 1;
}
zmsg_destroy (&reply);
}
else
zmsg_destroy (&request);

mdcli_destroy (&client);
free (service_name);
return result;
}

titanic.c: Titanic broker example

To test this, start mdbroker and titanic, then run ticlient. Now start mdworker arbitrarily, and you should see the client getting a response and exiting happily.

Some notes about this code:

  • We use MMI to only send requests to services that appear to be running. This works as well as the MMI implementation in the broker.
  • We use an inproc connection to send new request data from the titanic.request service through to the main dispatcher. This saves the dispatcher from having to scan the disk directory, load all request files, and sort them by date/time.

The important thing about this example is not performance (which is surely terrible, I've not tested it), but how well it implements the reliability contract. To try it, start the mdbroker and titanic programs. Then start the ticlient, and then start the mdworker echo service. You can run all four of these using the '-v' option to do verbose tracing of activity. You can stop and restart any piece except the client and nothing will get lost.

If you want to use Titanic in real cases, you'll rapidly be asking "how do we make this faster?" Here's what I'd do, starting with the example implementation:

  • Use a single disk file for all data, rather than multiple files. Operating systems are usually better at handling a few large files than many smaller ones.
  • Organize that disk file as a circular buffer so that new requests can be written contiguously (with very occasional wraparound). One thread, writing full speed to a disk file can work rapidly.
  • Keep the index in memory and rebuild the index at startup time, from the disk buffer. This saves the extra disk head flutter needed to keep the index fully safe on disk. You would want an fsync after every message, or every N milliseconds if you were prepared to lose the last M messages in case of a system failure.
  • Use a solid-state drive rather than spinning iron oxide platters.
  • Preallocate the entire file, or allocate in large chunks allowing the circular buffer to grow and shrink as needed. This avoids fragmentation and ensures most reads and writes are contiguous.

And so on. What I'd not recommend is storing messages in a database, not even a 'fast' key/value store, unless you really like a specific database and don't have performance worries. You will pay a steep price for the abstraction, 10 to 1000x over a raw disk file.
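
As a rough illustration of the single-file and in-memory-index suggestions above, here is a minimal sketch (no wraparound or preallocation, and names such as LOG_FILE are illustrative): each record is a 4-byte length prefix followed by the message body, the index maps record numbers to file offsets, and it is rebuilt by scanning the file at startup.

#include <stdio.h>
#include <stdlib.h>
#include <stdint.h>

#define LOG_FILE    "titanic.log"
#define MAX_RECORDS 100000

static long index_table [MAX_RECORDS];  //  Record number -> file offset
static int  record_count = 0;

//  Append one message; flush so it survives a process crash
static int log_append (FILE *log, const void *data, uint32_t size)
{
    fseek (log, 0, SEEK_END);
    index_table [record_count] = ftell (log);
    fwrite (&size, sizeof (size), 1, log);
    fwrite (data, size, 1, log);
    fflush (log);               //  A real version would also fsync here
    return record_count++;
}

//  Rebuild the in-memory index by scanning the log at startup
static void log_rebuild_index (FILE *log)
{
    record_count = 0;
    fseek (log, 0, SEEK_SET);
    uint32_t size;
    while (fread (&size, sizeof (size), 1, log) == 1) {
        index_table [record_count++] = ftell (log) - sizeof (size);
        fseek (log, size, SEEK_CUR);
    }
}

//  Fetch one record by number, using the in-memory index
static char *log_fetch (FILE *log, int record)
{
    fseek (log, index_table [record], SEEK_SET);
    uint32_t size = 0;
    fread (&size, sizeof (size), 1, log);
    char *data = malloc (size + 1);
    fread (data, size, 1, log);
    data [size] = 0;
    return data;
}

int main (void)
{
    FILE *log = fopen (LOG_FILE, "a+b");
    log_rebuild_index (log);
    int rec = log_append (log, "Hello world", 11);
    char *body = log_fetch (log, rec);
    printf ("record %d: %s\n", rec, body);
    free (body);
    fclose (log);
    return 0;
}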

If you want to make Titanic even more reliable, you can do this by duplicating requests to a second server, which you'd place in a second location just far enough to survive nuclear attack on your primary location, yet not so far that you get too much latency.

If you want to make Titanic much faster and less reliable, you can store requests and replies purely in memory. This will give you the functionality of a disconnected network, but it won't survive a crash of the Titanic server itself.

High-availability Pair (Binary Star Pattern)

Overview

The Binary Star pattern puts two servers in a primary-backup high-availability pair. At any given time, one of these accepts connections from client applications (it is the "master") and one does not (it is the "slave"). Each server monitors the other. If the master disappears from the network, after a certain time the slave takes over as master.

The Binary Star pattern was developed by Pieter Hintjens and Martin Sustrik for the iMatix OpenAMQ server. We designed it:

  • To provide a straight-forward high-availability solution.
  • To be simple enough to actually understand and use.
  • To fail-over reliably when needed, and only when needed.

Figure 58 - High-availability Pair, Normal Operation

fig58.png

Assuming we have a Binary Star pair running, here are the different scenarios that will result in fail-over happening:

  1. The hardware running the primary server has a fatal problem (power supply explodes, machine catches fire, or someone simply unplugs it by mistake), and disappears. Applications see this, and reconnect to the backup server.
  2. The network segment on which the primary server sits crashes - perhaps a router gets hit by a power spike - and applications start to reconnect to the backup server.
  3. The primary server crashes or is killed by the operator and does not restart automatically.

Figure 59 - High-availability Pair During Failover

fig59.png

Recovery from fail-over works as follows:

  1. The operators restart the primary server and fix whatever problems were causing it to disappear from the network.
  2. The operators stop the backup server, at a moment that will cause minimal disruption to applications.
  3. When applications have reconnected to the primary server, the operators restart the backup server.

Recovery (to using the primary server as master) is a manual operation. Painful experience teaches us that automatic recovery is undesirable. There are several reasons:

  • Failover creates an interruption of service to applications, possibly lasting 10-30 seconds. If there is a real emergency, this is much better than total outage. But if recovery creates a further 10-30 second outage, it is better that this happens off-peak, when users have gone off the network.
  • When there is an emergency, it's a Good Idea to create predictability for those trying to fix things. Automatic recovery creates uncertainty for system admins, who can no longer be sure which server is in charge without double-checking.
  • Last, you can get situations with automatic recovery where networks will fail over, and then recover, and operators are then placed in a difficult position to analyze what happened. There was an interruption of service, but the cause isn't clear.

Having said this, the Binary Star pattern will fail back to the primary server if this is running (again) and the backup server fails. In fact this is how we provoke recovery.

The shutdown process for a Binary Star pair is to either:

  1. Stop the passive server and then stop the active server at any later time, or
  2. Stop both servers in any order but within a few seconds of each other.

Stopping the active and then the passive server with any delay longer than the fail-over timeout will cause applications to disconnect, then reconnect, then disconnect again, which may disturb users.

Detailed Requirements

Binary Star is as simple as it can be, while still working accurately. In fact, the current design is the third complete redesign. We found each of the previous designs to be too complex, trying to do too much, and we stripped out functionality until we came to a design that was understandable and usable, and reliable enough to be worth using.

These are our requirements for a high-availability architecture:

  • The fail-over is meant to provide insurance against catastrophic system failures, such as hardware breakdown, fire, accident, etc. To guard against ordinary server crashes there are simpler ways to recover.
  • Failover time should be under 60 seconds and preferably under 10 seconds.
  • Failover has to happen automatically, whereas recovery must happen manually. We want applications to switch over to the backup server automatically but we do not want them to switch back to the primary server except when the operators have fixed whatever problem there was, and decided that it is a good time to interrupt applications again.
  • The semantics for client applications should be simple and easy for developers to understand. Ideally they should be hidden in the client API.
  • There should be clear instructions for network architects on how to avoid designs that could lead to split brain syndrome in which both servers in a Binary Star pair think they are the master server.
  • There should be no dependencies on the order in which the two servers are started.
  • It must be possible to make planned stops and restarts of either server without stopping client applications (though they may be forced to reconnect).
  • Operators must be able to monitor both servers at all times.
  • It must be possible to connect the two servers using a high-speed dedicated network connection. That is, fail-over synchronization must be able to use a specific IP route.

We make these assumptions:

  • A single backup server provides enough insurance, we don't need multiple levels of backup.
  • The primary and backup servers are equally capable of carrying the application load. We do not attempt to balance load across the servers.
  • There is sufficient budget to cover a fully redundant backup server that does nothing almost all the time.

We don't attempt to cover:

  • The use of an active backup server or load balancing. In a Binary Star pair, the backup server is inactive and does no useful work until the primary server goes off-line.
  • The handling of persistent messages or transactions in any way. We assume a network of unreliable (and probably untrusted) servers or Binary Star pairs.
  • Any automatic exploration of the network. The Binary Star pair is manually and explicitly defined in the network and is known to applications (at least in their configuration data).
  • Replication of state or messages between servers. All server-side state must be recreated by applications when they fail over.

Here is the key terminology we use in Binary Star:

  • Primary - the primary server is the one that is normally 'master'.
  • Backup - the backup server is the one that is normally 'slave', it will become master if and when the primary server disappears from the network, and when client applications ask the backup server to connect.
  • Master - the master server is the one of a Binary Star pair that accepts client connections. There is at most one master server.
  • Slave - the slave server is the one that takes over if the master disappears. Note that when a Binary Star pair is running normally, the primary server is master, and the backup is slave. When a fail-over has happened, the roles are switched.

To configure a Binary Star pair, you need to:

  1. Tell the primary server where the backup server is.
  2. Tell the backup server where the primary server is.
  3. Optionally, tune the fail-over response times, which must be the same for both servers.

The main tuning concern is how frequently you want the servers to check their peering status, and how quickly you want to activate fail-over. In our example, the fail-over timeout value defaults to 2000 msec. If you reduce this, the backup server will take over as master more rapidly but may take over in cases where the primary server could recover. You may for example have wrapped the primary server in a shell script that restarts it if it crashes. In that case the timeout should be higher than the time needed to restart the primary server.

For client applications to work properly with a Binary Star pair, they must:

  1. Know both server addresses.
  2. Try to connect to the primary server, and if that fails, to the backup server.
  3. Detect a failed connection, typically using heartbeating.
  4. Try to reconnect to primary, and then backup, with a delay between retries that is at least as high as the server fail-over timeout.
  5. Recreate all of the state they require on a server.
  6. Retransmit messages lost during a fail-over, if messages need to be reliable.

It's not trivial work, and we'd usually wrap this in an API that hides it from real end-user applications.

These are the main limitations of the Binary Star pattern:

  • A server process cannot be part of more than one Binary Star pair.
  • A primary server can have a single backup server, no more.
  • The backup server cannot do useful work while in slave mode.
  • The backup server must be capable of handling full application loads.
  • Failover configuration cannot be modified at runtime.
  • Client applications must do some work to benefit from fail-over.

Preventing Split-Brain Syndrome

"Split-brain syndrome" is when different parts of a cluster think they are 'master' at the same time. It causes applications to stop seeing each other. Binary Star has an algorithm for detecting and eliminating split brain, based on a three-way decision mechanism (a server will not decide to become master until it gets application connection requests and it cannot see its peer server).

However, it is still possible to (mis)design a network to fool this algorithm. A typical scenario would be a Binary Star pair distributed between two buildings, where each building also has a set of applications, and there is a single network link between both buildings. Breaking this link would create two sets of client applications, each with half of the Binary Star pair, and each fail-over server would become active.

To prevent split-brain situations, we MUST connect Binary Star pairs using a dedicated network link, which can be as simple as plugging them both into the same switch or better, using a cross-over cable directly between two machines.

We must not split a Binary Star architecture into two islands, each with a set of applications. While this may be a common type of network architecture, we'd use federation, not high-availability fail-over, in such cases.

A suitably paranoid network configuration would use two private cluster interconnects, rather than a single one. Further, the network cards used for the cluster would be different to those used for message in/out, and possibly even on different PCI paths on the server hardware. The goal being to separate possible failures in the network from possible failures in the cluster. Network ports have a relatively high failure rate.

Binary Star Implementation

Without further ado, here is a proof-of-concept implementation of the Binary Star server:

// Binary Star server proof-of-concept implementation. This server does no
// real work; it just demonstrates the Binary Star failover model.

#include "czmq.h"

// States we can be in at any point in time
typedef enum {
STATE_PRIMARY = 1, // Primary, waiting for peer to connect
STATE_BACKUP = 2, // Backup, waiting for peer to connect
STATE_ACTIVE = 3, // Active - accepting connections
STATE_PASSIVE = 4 // Passive - not accepting connections
} state_t;

// Events, which start with the states our peer can be in
typedef enum {
PEER_PRIMARY = 1, // HA peer is pending primary
PEER_BACKUP = 2, // HA peer is pending backup
PEER_ACTIVE = 3, // HA peer is active
PEER_PASSIVE = 4, // HA peer is passive
CLIENT_REQUEST = 5 // Client makes request
} event_t;

// Our finite state machine
typedef struct {
state_t state; // Current state
event_t event; // Current event
int64_t peer_expiry; // When peer is considered 'dead'
} bstar_t;

// We send state information this often
// If peer doesn't respond in two heartbeats, it is 'dead'
#define HEARTBEAT 1000
// In msecs

// The heart of the Binary Star design is its finite-state machine (FSM).
// The FSM runs one event at a time. We apply an event to the current state,
// which checks if the event is accepted, and if so, sets a new state:

static bool
s_state_machine (bstar_t *fsm)
{
bool exception = false;

// These are the PRIMARY and BACKUP states; we're waiting to become
// ACTIVE or PASSIVE depending on events we get from our peer:
if (fsm->state == STATE_PRIMARY) {
if (fsm->event == PEER_BACKUP) {
printf ("I: connected to backup (passive), ready active\n");
fsm->state = STATE_ACTIVE;
}
else
if (fsm->event == PEER_ACTIVE) {
printf ("I: connected to backup (active), ready passive\n");
fsm->state = STATE_PASSIVE;
}
// Accept client connections
}
else
if (fsm->state == STATE_BACKUP) {
if (fsm->event == PEER_ACTIVE) {
printf ("I: connected to primary (active), ready passive\n");
fsm->state = STATE_PASSIVE;
}
else
// Reject client connections when acting as backup
if (fsm->event == CLIENT_REQUEST)
exception = true;
}
else
// These are the ACTIVE and PASSIVE states:

if (fsm->state == STATE_ACTIVE) {
if (fsm->event == PEER_ACTIVE) {
// Two actives would mean split-brain
printf ("E: fatal error - dual actives, aborting\n");
exception = true;
}
}
else
// Server is passive
// CLIENT_REQUEST events can trigger failover if peer looks dead
if (fsm->state == STATE_PASSIVE) {
if (fsm->event == PEER_PRIMARY) {
// Peer is restarting - become active, peer will go passive
printf ("I: primary (passive) is restarting, ready active\n");
fsm->state = STATE_ACTIVE;
}
else
if (fsm->event == PEER_BACKUP) {
// Peer is restarting - become active, peer will go passive
printf ("I: backup (passive) is restarting, ready active\n");
fsm->state = STATE_ACTIVE;
}
else
if (fsm->event == PEER_PASSIVE) {
// Two passives would mean cluster would be non-responsive
printf ("E: fatal error - dual passives, aborting\n");
exception = true;
}
else
if (fsm->event == CLIENT_REQUEST) {
// Peer becomes active if timeout has passed
// It's the client request that triggers the failover
assert (fsm->peer_expiry > 0);
if (zclock_time () >= fsm->peer_expiry) {
// If peer is dead, switch to the active state
printf ("I: failover successful, ready active\n");
fsm->state = STATE_ACTIVE;
}
else
// If peer is alive, reject connections
exception = true;
}
}
return exception;
}

// This is our main task. First we bind/connect our sockets with our
// peer and make sure we will get state messages correctly. We use
// three sockets; one to publish state, one to subscribe to state, and
// one for client requests/replies:

int main (int argc, char *argv [])
{
// Arguments can be either of:
// -p primary server, at tcp://localhost:5001
// -b backup server, at tcp://localhost:5002
zctx_t *ctx = zctx_new ();
void *statepub = zsocket_new (ctx, ZMQ_PUB);
void *statesub = zsocket_new (ctx, ZMQ_SUB);
zsocket_set_subscribe (statesub, "");
void *frontend = zsocket_new (ctx, ZMQ_ROUTER);
bstar_t fsm = { 0 };

if (argc == 2 && streq (argv [1], "-p")) {
printf ("I: Primary active, waiting for backup (passive)\n");
zsocket_bind (frontend, "tcp://*:5001");
zsocket_bind (statepub, "tcp://*:5003");
zsocket_connect (statesub, "tcp://localhost:5004");
fsm.state = STATE_PRIMARY;
}
else
if (argc == 2 && streq (argv [1], "-b")) {
printf ("I: Backup passive, waiting for primary (active)\n");
zsocket_bind (frontend, "tcp://*:5002");
zsocket_bind (statepub, "tcp://*:5004");
zsocket_connect (statesub, "tcp://localhost:5003");
fsm.state = STATE_BACKUP;
}
else {
printf ("Usage: bstarsrv { -p | -b }\n");
zctx_destroy (&ctx);
exit (0);
}
// We now process events on our two input sockets, and process these
// events one at a time via our finite-state machine. Our "work" for
// a client request is simply to echo it back:

// Set timer for next outgoing state message
int64_t send_state_at = zclock_time () + HEARTBEAT;
while (!zctx_interrupted) {
zmq_pollitem_t items [] = {
{ frontend, 0, ZMQ_POLLIN, 0 },
{ statesub, 0, ZMQ_POLLIN, 0 }
};
int time_left = (int) ((send_state_at - zclock_time ()));
if (time_left < 0)
time_left = 0;
int rc = zmq_poll (items, 2, time_left * ZMQ_POLL_MSEC);
if (rc == -1)
break; // Context has been shut down

if (items [0].revents & ZMQ_POLLIN) {
// Have a client request
zmsg_t *msg = zmsg_recv (frontend);
fsm.event = CLIENT_REQUEST;
if (s_state_machine (&fsm) == false)
// Answer client by echoing request back
zmsg_send (&msg, frontend);
else
zmsg_destroy (&msg);
}
if (items [1].revents & ZMQ_POLLIN) {
// Have state from our peer, execute as event
char *message = zstr_recv (statesub);
fsm.event = atoi (message);
free (message);
if (s_state_machine (&fsm))
break; // Error, so exit
fsm.peer_expiry = zclock_time () + 2 * HEARTBEAT;
}
// If we timed out, send state to peer
if (zclock_time () >= send_state_at) {
char message [2];
sprintf (message, "%d", fsm.state);
zstr_send (statepub, message);
send_state_at = zclock_time () + HEARTBEAT;
}
}
if (zctx_interrupted)
printf ("W: interrupted\n");

// Shutdown sockets and context
zctx_destroy (&ctx);
return 0;
}

bstarsrv.c: Binary Star server

And here is the client:

// Binary Star client proof-of-concept implementation. This client does no
// real work; it just demonstrates the Binary Star failover model.

#include "czmq.h"
#define REQUEST_TIMEOUT 1000 // msecs
#define SETTLE_DELAY 2000 // Before failing over

int main (void)
{
zctx_t *ctx = zctx_new ();

char *server [] = { "tcp://localhost:5001", "tcp://localhost:5002" };
uint server_nbr = 0;

printf ("I: connecting to server at %s…\n", server [server_nbr]);
void *client = zsocket_new (ctx, ZMQ_REQ);
zsocket_connect (client, server [server_nbr]);

int sequence = 0;
while (!zctx_interrupted) {
// We send a request, then we work to get a reply
char request [10];
sprintf (request, "%d", ++sequence);
zstr_send (client, request);

int expect_reply = 1;
while (expect_reply) {
// Poll socket for a reply, with timeout
zmq_pollitem_t items [] = { { client, 0, ZMQ_POLLIN, 0 } };
int rc = zmq_poll (items, 1, REQUEST_TIMEOUT * ZMQ_POLL_MSEC);
if (rc == -1)
break; // Interrupted

// We use a Lazy Pirate strategy in the client. If there's no
// reply within our timeout, we close the socket and try again.
// In Binary Star, it's the client vote that decides which
// server is primary; the client must therefore try to connect
// to each server in turn:

if (items [0].revents & ZMQ_POLLIN) {
// We got a reply from the server, must match sequence
char *reply = zstr_recv (client);
if (atoi (reply) == sequence) {
printf ("I: server replied OK (%s)\n", reply);
expect_reply = 0;
sleep (1); // One request per second
}
else
printf ("E: bad reply from server: %s\n", reply);
free (reply);
}
else {
printf ("W: no response from server, failing over\n");

// Old socket is confused; close it and open a new one
zsocket_destroy (ctx, client);
server_nbr = (server_nbr + 1) % 2;
zclock_sleep (SETTLE_DELAY);
printf ("I: connecting to server at %s…\n",
server [server_nbr]);
client = zsocket_new (ctx, ZMQ_REQ);
zsocket_connect (client, server [server_nbr]);

// Send request again, on new socket
zstr_send (client, request);
}
}
}
zctx_destroy (&ctx);
return 0;
}

bstarcli.c: Binary Star client

To test Binary Star, start the servers and client in any order:

bstarsrv -p     # Start primary
bstarsrv -b     # Start backup
bstarcli

You can then provoke fail-over by killing the primary server, and recovery by restarting the primary and killing the backup. Note how it's the client vote that triggers fail-over, and recovery.

Binary Star is driven by a finite state machine. States in green accept client requests; states in pink refuse them. Events are the peer state, so "Peer Active" means the other server has told us it's active. "Client Request" means we've received a client request. "Client Vote" means we've received a client request AND our peer has been inactive for two heartbeats.

Figure 60 - Binary Star Finite State Machine

fig60.png

Note that the servers use PUB-SUB sockets for state exchange. No other socket combination will work here. PUSH and DEALER block if there is no peer ready to receive a message. PAIR does not reconnect if the peer disappears and comes back. ROUTER needs the address of the peer before it can send it a message.

Binary Star Reactor

Binary Star is useful and generic enough to package up as a reusable reactor class. The reactor then runs and calls our code whenever it has a message to process. This is much nicer than copying/pasting the Binary Star code into each server where we want that capability. In C we wrap the CZMQ zloop class, though your mileage may vary in other languages. Here is the bstar interface in C:

bstar_t *bstar_new (int primary, char *local, char *remote);
void bstar_destroy (bstar_t **self_p);
zloop_t *bstar_zloop (bstar_t *self);
int bstar_voter (bstar_t *self, char *endpoint, int type,
    zloop_fn handler, void *arg);
void bstar_new_active (bstar_t *self, zloop_fn handler, void *arg);
void bstar_new_passive (bstar_t *self, zloop_fn handler, void *arg);
int bstar_start (bstar_t *self);

And here is the class implementation:

// bstar class - Binary Star reactor

#include "bstar.h"

// States we can be in at any point in time
typedef enum {
STATE_PRIMARY = 1, // Primary, waiting for peer to connect
STATE_BACKUP = 2, // Backup, waiting for peer to connect
STATE_ACTIVE = 3, // Active - accepting connections
STATE_PASSIVE = 4 // Passive - not accepting connections
} state_t;

// Events, which start with the states our peer can be in
typedef enum {
PEER_PRIMARY = 1, // HA peer is pending primary
PEER_BACKUP = 2, // HA peer is pending backup
PEER_ACTIVE = 3, // HA peer is active
PEER_PASSIVE = 4, // HA peer is passive
CLIENT_REQUEST = 5 // Client makes request
} event_t;

// Structure of our class

struct _bstar_t {
zctx_t *ctx; // Our private context
zloop_t *loop; // Reactor loop
void *statepub; // State publisher
void *statesub; // State subscriber
state_t state; // Current state
event_t event; // Current event
int64_t peer_expiry; // When peer is considered 'dead'
zloop_fn *voter_fn; // Voting socket handler
void *voter_arg; // Arguments for voting handler
zloop_fn *active_fn; // Call when become active
void *active_arg; // Arguments for handler
zloop_fn *passive_fn; // Call when become passive
void *passive_arg; // Arguments for handler
};

// The finite-state machine is the same as in the proof-of-concept server.
// To understand this reactor in detail, first read the CZMQ zloop class.

// We send state information every this often
// If peer doesn't respond in two heartbeats, it is 'dead'
#define BSTAR_HEARTBEAT 1000 // In msecs

// Binary Star finite state machine (applies event to state)
// Returns -1 if there was an exception, 0 if event was valid.

static int
s_execute_fsm (bstar_t *self)
{
int rc = 0;
// Primary server is waiting for peer to connect
// Accepts CLIENT_REQUEST events in this state
if (self->state == STATE_PRIMARY) {
if (self->event == PEER_BACKUP) {
zclock_log ("I: connected to backup (passive), ready as active");
self->state = STATE_ACTIVE;
if (self->active_fn)
(self->active_fn) (self->loop, NULL, self->active_arg);
}
else
if (self->event == PEER_ACTIVE) {
zclock_log ("I: connected to backup (active), ready as passive");
self->state = STATE_PASSIVE;
if (self->passive_fn)
(self->passive_fn) (self->loop, NULL, self->passive_arg);
}
else
if (self->event == CLIENT_REQUEST) {
// Allow client requests to turn us into the active if we've
// waited sufficiently long to believe the backup is not
// currently acting as active (i.e., after a failover)
assert (self->peer_expiry > 0);
if (zclock_time () >= self->peer_expiry) {
zclock_log ("I: request from client, ready as active");
self->state = STATE_ACTIVE;
if (self->active_fn)
(self->active_fn) (self->loop, NULL, self->active_arg);
} else
// Don't respond to clients yet - it's possible we're
// performing a failback and the backup is currently active
rc = -1;
}
}
else
// Backup server is waiting for peer to connect
// Rejects CLIENT_REQUEST events in this state
if (self->state == STATE_BACKUP) {
if (self->event == PEER_ACTIVE) {
zclock_log ("I: connected to primary (active), ready as passive");
self->state = STATE_PASSIVE;
if (self->passive_fn)
(self->passive_fn) (self->loop, NULL, self->passive_arg);
}
else
if (self->event == CLIENT_REQUEST)
rc = -1;
}
else
// Server is active
// Accepts CLIENT_REQUEST events in this state
// The only way out of ACTIVE is death
if (self->state == STATE_ACTIVE) {
if (self->event == PEER_ACTIVE) {
// Two actives would mean split-brain
zclock_log ("E: fatal error - dual actives, aborting");
rc = -1;
}
}
else
// Server is passive
// CLIENT_REQUEST events can trigger failover if peer looks dead
if (self->state == STATE_PASSIVE) {
if (self->event == PEER_PRIMARY) {
// Peer is restarting - become active, peer will go passive
zclock_log ("I: primary (passive) is restarting, ready as active");
self->state = STATE_ACTIVE;
}
else
if (self->event == PEER_BACKUP) {
// Peer is restarting - become active, peer will go passive
zclock_log ("I: backup (passive) is restarting, ready as active");
self->state = STATE_ACTIVE;
}
else
if (self->event == PEER_PASSIVE) {
// Two passives would mean cluster would be non-responsive
zclock_log ("E: fatal error - dual passives, aborting");
rc = -1;
}
else
if (self->event == CLIENT_REQUEST) {
// Peer becomes active if timeout has passed
// It's the client request that triggers the failover
assert (self->peer_expiry > 0);
if (zclock_time () >= self->peer_expiry) {
// If peer is dead, switch to the active state
zclock_log ("I: failover successful, ready as active");
self->state = STATE_ACTIVE;
}
else
// If peer is alive, reject connections
rc = -1;
}
// Call state change handler if necessary
if (self->state == STATE_ACTIVE && self->active_fn)
(self->active_fn) (self->loop, NULL, self->active_arg);
}
return rc;
}

static void
s_update_peer_expiry (bstar_t *self)
{
self->peer_expiry = zclock_time () + 2 * BSTAR_HEARTBEAT;
}

// Reactor event handlers…

// Publish our state to peer
int s_send_state (zloop_t *loop, int timer_id, void *arg)
{
bstar_t *self = (bstar_t *) arg;
zstr_sendf (self->statepub, "%d", self->state);
return 0;
}

// Receive state from peer, execute finite state machine
int s_recv_state (zloop_t *loop, zmq_pollitem_t *poller, void *arg)
{
bstar_t *self = (bstar_t *) arg;
char *state = zstr_recv (poller->socket);
if (state) {
self->event = atoi (state);
s_update_peer_expiry (self);
free (state);
}
return s_execute_fsm (self);
}

// Application wants to speak to us, see if it's possible
int s_voter_ready (zloop_t *loop, zmq_pollitem_t *poller, void *arg)
{
bstar_t *self = (bstar_t *) arg;
// If server can accept input now, call appl handler
self->event = CLIENT_REQUEST;
if (s_execute_fsm (self) == 0)
(self->voter_fn) (self->loop, poller, self->voter_arg);
else {
// Destroy waiting message, no-one to read it
zmsg_t *msg = zmsg_recv (poller->socket);
zmsg_destroy (&msg);
}
return 0;
}

// This is the constructor for our bstar class. We have to tell it
// whether we're primary or backup server, as well as our local and
// remote endpoints to bind and connect to:

bstar_t *
bstar_new (int primary, char *local, char *remote)
{
bstar_t
*self;

self = (bstar_t *) zmalloc (sizeof (bstar_t));

// Initialize the Binary Star
self->ctx = zctx_new ();
self->loop = zloop_new ();
self->state = primary? STATE_PRIMARY: STATE_BACKUP;

// Create publisher for state going to peer
self->statepub = zsocket_new (self->ctx, ZMQ_PUB);
zsocket_bind (self->statepub, local);

// Create subscriber for state coming from peer
self->statesub = zsocket_new (self->ctx, ZMQ_SUB);
zsocket_set_subscribe (self->statesub, "");
zsocket_connect (self->statesub, remote);

// Set-up basic reactor events
zloop_timer (self->loop, BSTAR_HEARTBEAT, 0, s_send_state, self);
zmq_pollitem_t poller = { self->statesub, 0, ZMQ_POLLIN };
zloop_poller (self->loop, &poller, s_recv_state, self);
return self;
}

// The destructor shuts down the bstar reactor:

void
bstar_destroy (bstar_t **self_p)
{
assert (self_p);
if (*self_p) {
bstar_t *self = *self_p;
zloop_destroy (&self->loop);
zctx_destroy (&self->ctx);
free (self);
*self_p = NULL;
}
}

// This method returns the underlying zloop reactor, so we can add
// additional timers and readers:

zloop_t *
bstar_zloop (bstar_t *self)
{
return self->loop;
}

// This method registers a client voter socket. Messages received
// on this socket provide the CLIENT_REQUEST events for the Binary Star
// FSM and are passed to the provided application handler. We require
// exactly one voter per bstar instance:

int
bstar_voter (bstar_t *self, char *endpoint, int type, zloop_fn handler,
void *arg)
{
// Hold actual handler+arg so we can call this later
void *socket = zsocket_new (self->ctx, type);
zsocket_bind (socket, endpoint);
assert (!self->voter_fn);
self->voter_fn = handler;
self->voter_arg = arg;
zmq_pollitem_t poller = { socket, 0, ZMQ_POLLIN };
return zloop_poller (self->loop, &poller, s_voter_ready, self);
}

// Register handlers to be called each time there's a state change:

void
bstar_new_active (bstar_t *self, zloop_fn handler, void *arg)
{
assert (!self->active_fn);
self->active_fn = handler;
self->active_arg = arg;
}

void
bstar_new_passive (bstar_t *self, zloop_fn handler, void *arg)
{
assert (!self->passive_fn);
self->passive_fn = handler;
self->passive_arg = arg;
}

// Enable/disable verbose tracing, for debugging:

void bstar_set_verbose (bstar_t *self, bool verbose)
{
zloop_set_verbose (self->loop, verbose);
}

// Finally, start the configured reactor. It will end if any handler
// returns -1 to the reactor, or if the process receives SIGINT or SIGTERM:

int
bstar_start (bstar_t *self)
{
assert (self->voter_fn);
s_update_peer_expiry (self);
return zloop_start (self->loop);
}

bstar.c: Binary Star core class

Which gives us the following short main program for the server:

// Binary Star server, using bstar reactor

// Lets us build this source without creating a library
#include "bstar.c"

// Echo service
int s_echo (zloop_t *loop, zmq_pollitem_t *poller, void *arg)
{
zmsg_t *msg = zmsg_recv (poller->socket);
zmsg_send (&msg, poller->socket);
return 0;
}

int main (int argc, char *argv [])
{
// Arguments can be either of:
// -p primary server, at tcp://localhost:5001
// -b backup server, at tcp://localhost:5002
bstar_t *bstar;
if (argc == 2 && streq (argv [1], "-p")) {
printf ("I: Primary active, waiting for backup (passive)\n");
bstar = bstar_new (BSTAR_PRIMARY,
"tcp://*:5003", "tcp://localhost:5004");
bstar_voter (bstar, "tcp://*:5001", ZMQ_ROUTER, s_echo, NULL);
}
else
if (argc == 2 && streq (argv [1], "-b")) {
printf ("I: Backup passive, waiting for primary (active)\n");
bstar = bstar_new (BSTAR_BACKUP,
"tcp://*:5004", "tcp://localhost:5003");
bstar_voter (bstar, "tcp://*:5002", ZMQ_ROUTER, s_echo, NULL);
}
else {
printf ("Usage: bstarsrvs { -p | -b }\n");
exit (0);
}
bstar_start (bstar);
bstar_destroy (&bstar);
return 0;
}

bstarsrv2.c: Binary Star server, using core class
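
The fail-over logic lives in the client (recall the last limitation listed above). As a minimal, illustrative sketch only, not one of the Guide's listings, a client could talk to this server pair by trying one voter endpoint and reconnecting to the other whenever a request times out. The endpoints match bstarsrv2 above; the timeout and request count are arbitrary choices:

// Binary Star client sketch - NOT one of the Guide's listings
// Tries the primary voter endpoint, fails over to the backup on timeout
#include "czmq.h"
#define REQUEST_TIMEOUT 1000 // msecs

int main (void)
{
    char *servers [] = { "tcp://localhost:5001", "tcp://localhost:5002" };
    int server_nbr = 0;

    zctx_t *ctx = zctx_new ();
    void *client = zsocket_new (ctx, ZMQ_REQ);
    zsocket_connect (client, servers [server_nbr]);

    int sequence;
    for (sequence = 1; sequence <= 10 && !zctx_interrupted; sequence++) {
        zstr_sendf (client, "%d", sequence);
        zmq_pollitem_t items [] = { { client, 0, ZMQ_POLLIN, 0 } };
        zmq_poll (items, 1, REQUEST_TIMEOUT * ZMQ_POLL_MSEC);
        if (items [0].revents & ZMQ_POLLIN) {
            char *reply = zstr_recv (client);
            printf ("I: server replied (%s)\n", reply);
            free (reply);
        }
        else {
            // No reply in time: assume the active server died, fail over.
            // In this sketch we move on to the next request rather than
            // resending the one that timed out.
            printf ("W: no reply from %s, failing over\n", servers [server_nbr]);
            zsocket_destroy (ctx, client); // REQ socket is now stuck mid-cycle
            server_nbr = (server_nbr + 1) % 2;
            client = zsocket_new (ctx, ZMQ_REQ);
            zsocket_connect (client, servers [server_nbr]);
        }
    }
    zctx_destroy (&ctx);
    return 0;
}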

Brokerless Reliability (Freelance Pattern)

topprevnext

It might seem ironic to focus so much on broker-based reliability, when we often explain ØMQ as "brokerless messaging". However in messaging, as in real life, the middleman is both a burden and a benefit. In practice, most messaging architectures benefit from a mix of distributed and brokered messaging. You get the best results when you can decide freely what tradeoffs you want to make. This is why I can drive 10km to a wholesaler to buy five cases of wine for a party, but I can also walk 10 minutes to a corner store to buy one bottle for a dinner. Our highly context-sensitive relative valuations of time, energy, and cost are essential to the real world economy. And they are essential to an optimal message-based architecture.

Which is why ØMQ does not impose a broker-centric architecture, though it gives you the tools to build brokers, aka "devices", and we've built a dozen or so different ones so far, just for practice.

So we'll end this chapter by deconstructing the broker-based reliability we've built so far, and turning it back into a distributed peer-to-peer architecture I call the Freelance pattern. Our use case will be a name resolution service. This is a common problem with ØMQ architectures: how do we know the endpoint to connect to? Hard-coding TCP/IP addresses in code is insanely fragile. Using configuration files creates an administration nightmare. Imagine if you had to hand-configure your web browser, on every PC or mobile phone you used, to realize that "google.com" was "74.125.230.82".

A ØMQ name service (and we'll make a simple implementation) has to:

  • Resolve a logical name into at least a bind endpoint, and a connect endpoint. A realistic name service would provide multiple bind endpoints, and possibly multiple connect endpoints too.
  • Allow us to manage multiple parallel environments, e.g. "test" vs. "production" without modifying code.
  • Be reliable, because if it is unavailable, applications won't be able to connect to the network.

Putting a name service behind a service-oriented Majordomo broker is clever from some points of view. However it's simpler and much less surprising to just expose the name service as a server that clients can connect to directly. If we do this right, the name service becomes the only global network endpoint we need to hard-code in our code or configuration files.
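
To make the goal concrete, here is a purely hypothetical sketch of what a lookup request might carry; the message format is invented for illustration only (the example servers below just echo "OK" and never implement real resolution):

// Name lookup sketch - hypothetical message format, invented for
// illustration; neither the Guide nor FLP defines one
#include "czmq.h"

int main (void)
{
    zmsg_t *request = zmsg_new ();
    zmsg_addstr (request, "LOOKUP");     // Command
    zmsg_addstr (request, "production"); // Environment
    zmsg_addstr (request, "frontend");   // Logical service name
    zmsg_dump (request);                 // Show the three frames
    zmsg_destroy (&request);

    // A reply could carry a status code, a bind endpoint, and a connect
    // endpoint, e.g. ["200"]["tcp://*:5555"]["tcp://192.168.55.10:5555"]
    return 0;
}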

The types of failure we aim to handle are server crashes and restarts, server busy looping, server overload, and network issues. To get reliability, we'll create a pool of name servers so if one crashes or goes away, clients can connect to another, and so on. In practice, two would be enough. But for the example, we'll assume the pool can be any size.

Figure 61 - The Freelance Pattern

fig61.png

In this architecture a large set of clients connect to a small set of servers directly. The servers bind to their respective addresses. It's fundamentally different from a broker-based approach like Majordomo, where workers connect to the broker. For clients, there are a couple of options:

  • Clients could use REQ sockets and the Lazy Pirate pattern. Easy, but would need some additional intelligence to not stupidly reconnect to dead servers over and over.
  • Clients could use DEALER sockets and blast out requests (which will be load balanced to all connected servers) until they get a reply. Brutal, but not elegant.
  • Clients could use ROUTER sockets so they can address specific servers. But how does the client know the identity of the server sockets? Either the server has to ping the client first (complex), or each server has to use a hard-coded, fixed identity known to the client (nasty).

Model One - Simple Retry and Failover

topprevnext

So our menu appears to offer: simple, brutal, complex, or nasty. Let's start with 'simple' and then work out the kinks. We take Lazy Pirate and rewrite it to work with multiple server endpoints. Start the server first, specifying a bind endpoint as argument. Run one or several servers:

--
-- Freelance server - Model 1
-- Trivial echo service
--
-- Author: Robert G. Jakabosky <bobby@sharedrealm.com>
--
require"zmsg"
require"zmq"

if (#arg < 1) then
printf("I: syntax: %s <endpoint>\n", arg[0])
os.exit(0)
end
local context = zmq.init(1)
s_catch_signals()

-- Implement basic echo service
local server = context:socket(zmq.REP)
server:bind(arg[1])
printf("I: echo service is ready at %s\n", arg[1])
while (not s_interrupted) do
local msg, err = zmsg.recv(server)
if err then
print('recv error:', err)
break -- Interrupted
end
msg:send(server)
end
if (s_interrupted) then
printf("W: interrupted\n")
end
server:close()
context:term()

flserver1.lua: Freelance server, Model One

Then start the client, specifying one or more connect endpoints as arguments:

// Freelance client - Model 1
// Uses REQ socket to query one or more services

#include "czmq.h"
#define REQUEST_TIMEOUT 1000
#define MAX_RETRIES 3 // Before we abandon

static zmsg_t *
s_try_request (zctx_t *ctx, char *endpoint, zmsg_t *request)
{
printf ("I: trying echo service at %s…\n", endpoint);
void *client = zsocket_new (ctx, ZMQ_REQ);
zsocket_connect (client, endpoint);

// Send request, wait safely for reply
zmsg_t *msg = zmsg_dup (request);
zmsg_send (&msg, client);
zmq_pollitem_t items [] = { { client, 0, ZMQ_POLLIN, 0 } };
zmq_poll (items, 1, REQUEST_TIMEOUT * ZMQ_POLL_MSEC);
zmsg_t *reply = NULL;
if (items [0].revents & ZMQ_POLLIN)
reply = zmsg_recv (client);

// Close socket in any case, we're done with it now
zsocket_destroy (ctx, client);
return reply;
}

// The client uses a Lazy Pirate strategy if it only has one server to talk
// to. If it has two or more servers to talk to, it will try each server just
// once:

int main (int argc, char *argv [])
{
zctx_t *ctx = zctx_new ();
zmsg_t *request = zmsg_new ();
zmsg_addstr (request, "Hello world");
zmsg_t *reply = NULL;

int endpoints = argc - 1;
if (endpoints == 0)
printf ("I: syntax: %s <endpoint> …\n", argv [0]);
else
if (endpoints == 1) {
// For one endpoint, we retry N times
int retries;
for (retries = 0; retries < MAX_RETRIES; retries++) {
char *endpoint = argv [1];
reply = s_try_request (ctx, endpoint, request);
if (reply)
break; // Successful
printf ("W: no response from %s, retrying…\n", endpoint);
}
}
else {
// For multiple endpoints, try each at most once
int endpoint_nbr;
for (endpoint_nbr = 0; endpoint_nbr < endpoints; endpoint_nbr++) {
char *endpoint = argv [endpoint_nbr + 1];
reply = s_try_request (ctx, endpoint, request);
if (reply)
break; // Successful
printf ("W: no response from %s\n", endpoint);
}
}
if (reply)
printf ("Service is running OK\n");

zmsg_destroy (&request);
zmsg_destroy (&reply);
zctx_destroy (&ctx);
return 0;
}

flclient1.c: Freelance client, Model One

For example:

flserver1 tcp://*:5555 &
flserver1 tcp://*:5556 &
flclient1 tcp://localhost:5555 tcp://localhost:5556

While the basic approach is Lazy Pirate, the client aims to just get one successful reply. It has two techniques, depending on whether you are running a single server, or multiple servers:

  • With a single server, the client will retry several times, exactly as for Lazy Pirate.
  • With multiple servers, the client will try each server at most once, until it's received a reply, or has tried all servers.

This solves the main weakness of Lazy Pirate, namely that it could not do fail-over to backup / alternate servers.

However this design won't work well in a real application. If we're connecting many sockets, and our primary name server is down, we're going to do this painful timeout each time.

Model Two - Brutal Shotgun Massacre

topprevnext

Let's switch our client to using a DEALER socket. Our goal here is to make sure we get a reply back within the shortest possible time, no matter whether the primary server is down or not. Our client takes this approach:

  • We set things up, connecting to all servers.
  • When we have a request, we blast it out as many times as we have servers.
  • We wait for the first reply, and take that.
  • We ignore any other replies.

What will happen in practice is that when all servers are running, ØMQ will distribute the requests so each server gets one request, and sends one reply. When any server is offline, and disconnected, ØMQ will distribute the requests to the remaining servers. So a server may in some cases get the same request more than once.

What's more annoying for the client is that we'll get multiple replies back, but there's no guarantee we'll get a precise number of replies. Requests and replies can get lost (e.g. if the server crashes while processing a request).

So we have to number requests, and ignore any replies that don't match the request number. Our Model One server will work, since it's an echo server, but coincidence is not a great basis for understanding. So we'll make a Model Two server that chews up the message and returns a correctly numbered reply with the content "OK". We'll use messages consisting of two parts, a sequence number and a body.

Start the server once or more, specifying a bind endpoint each time:

--
-- Freelance server - Model 2
-- Does some work, replies OK, with message sequencing
--
-- Author: Robert G. Jakabosky <bobby@sharedrealm.com>
--
require"zmq"
require"zmsg"

if (#arg < 1) then
printf ("I: syntax: %s <endpoint>\n", arg[0])
os.exit (0)
end
local context = zmq.init(1)
s_catch_signals()

local server = context:socket(zmq.REP)
server:bind(arg[1])
printf ("I: service is ready at %s\n", arg[1])
while (not s_interrupted) do
local msg, err = zmsg.recv(server)
if err then
print('recv error:', err)
break -- Interrupted
end
-- Fail nastily if run against wrong client
assert (msg:parts() == 2)

msg:body_set("OK")
msg:send(server)
end
if (s_interrupted) then
printf("W: interrupted\n")
end
server:close()
context:term()

flserver2.lua: Freelance server, Model Two

Then start the client, specifying the connect endpoints as arguments:

// Freelance client - Model 2
// Uses DEALER socket to blast one or more services

#include "czmq.h"

// We design our client API as a class, using the CZMQ style
#ifdef __cplusplus
extern "C" {
#endif

typedef struct _flclient_t flclient_t;
flclient_t *flclient_new (void);
void flclient_destroy (flclient_t **self_p);
void flclient_connect (flclient_t *self, char *endpoint);
zmsg_t *flclient_request (flclient_t *self, zmsg_t **request_p);

#ifdef __cplusplus
}
#endif

// If not a single service replies within this time, give up
#define GLOBAL_TIMEOUT 2500

int main (int argc, char *argv [])
{
if (argc == 1) {
printf ("I: syntax: %s <endpoint> …\n", argv [0]);
return 0;
}
// Create new freelance client object
flclient_t *client = flclient_new ();

// Connect to each endpoint
int argn;
for (argn = 1; argn < argc; argn++)
flclient_connect (client, argv [argn]);

// Send a bunch of name resolution 'requests', measure time
int requests = 10000;
uint64_t start = zclock_time ();
while (requests--) {
zmsg_t *request = zmsg_new ();
zmsg_addstr (request, "random name");
zmsg_t *reply = flclient_request (client, &request);
if (!reply) {
printf ("E: name service not available, aborting\n");
break;
}
zmsg_destroy (&reply);
}
printf ("Average round trip cost: %d usec\n",
(int) (zclock_time () - start) / 10);

flclient_destroy (&client);
return 0;
}

// Here is the flclient class implementation. Each instance has a
// context, a DEALER socket it uses to talk to the servers, a counter
// of how many servers it's connected to, and a request sequence number:

struct _flclient_t {
zctx_t *ctx; // Our context wrapper
void *socket; // DEALER socket talking to servers
size_t servers; // How many servers we have connected to
uint sequence; // Number of requests ever sent
};

// Constructor

flclient_t *
flclient_new (void)
{
flclient_t
*self;

self = (flclient_t *) zmalloc (sizeof (flclient_t));
self->ctx = zctx_new ();
self->socket = zsocket_new (self->ctx, ZMQ_DEALER);
return self;
}

// Destructor

void
flclient_destroy (flclient_t **self_p)
{
assert (self_p);
if (*self_p) {
flclient_t *self = *self_p;
zctx_destroy (&self->ctx);
free (self);
*self_p = NULL;
}
}

// Connect to new server endpoint

void
flclient_connect (flclient_t *self, char *endpoint)
{
assert (self);
zsocket_connect (self->socket, endpoint);
self->servers++;
}

// This method does the hard work. It sends a request to all
// connected servers in parallel (for this to work, all connections
// must be successful and completed by this time). It then waits
// for a single successful reply, and returns that to the caller.
// Any other replies are just dropped:

zmsg_t *
flclient_request (flclient_t *self, zmsg_t **request_p)
{
assert (self);
assert (*request_p);
zmsg_t *request = *request_p;

// Prefix request with sequence number and empty envelope
char sequence_text [10];
sprintf (sequence_text, "%u", ++self->sequence);
zmsg_pushstr (request, sequence_text);
zmsg_pushstr (request, "");

// Blast the request to all connected servers
int server;
for (server = 0; server < self->servers; server++) {
zmsg_t *msg = zmsg_dup (request);
zmsg_send (&msg, self->socket);
}
// Wait for a matching reply to arrive from anywhere
// Since we can poll several times, calculate each one
zmsg_t *reply = NULL;
uint64_t endtime = zclock_time () + GLOBAL_TIMEOUT;
while (zclock_time () < endtime) {
zmq_pollitem_t items [] = { { self->socket, 0, ZMQ_POLLIN, 0 } };
zmq_poll (items, 1, (endtime - zclock_time ()) * ZMQ_POLL_MSEC);
if (items [0].revents & ZMQ_POLLIN) {
// Reply is [empty][sequence][OK]
reply = zmsg_recv (self->socket);
assert (zmsg_size (reply) == 3);
free (zmsg_popstr (reply));
char *sequence = zmsg_popstr (reply);
int sequence_nbr = atoi (sequence);
free (sequence);
if (sequence_nbr == self->sequence)
break;
zmsg_destroy (&reply);
}
}
zmsg_destroy (request_p);
return reply;
}

flclient2.c: Freelance client, Model Two

Some notes on this code:

  • The client is structured as a nice little class-based API that hides the dirty work of creating ØMQ contexts and sockets, and talking to the server. If a shotgun blast to the midriff can be called "talking".
  • The client will abandon the chase if it can't find any responsive server within a few seconds.
  • The client has to create a valid REP envelope, i.e. add an empty message frame to the front of the message.

The client does 10,000 name resolution requests (fake ones, since our server does essentially nothing) and measures the average cost: with 10,000 requests, the elapsed milliseconds divided by 10 gives microseconds per request. On my test box, talking to one server, it's about 60 usec. Talking to three servers, it's about 80 usec.

So pros and cons of our shotgun approach:

  • Pro: it is simple, easy to make and easy to understand.
  • Pro: it does the job of fail-over, and works rapidly, so long as there is at least one server running.
  • Con: it creates redundant network traffic.
  • Con: we can't prioritize our servers, i.e. Primary, then Secondary.
  • Con: the server can do at most one request at a time, period.

Model Three - Complex and Nasty

topprevnext

The shotgun approach seems too good to be true. Let's be scientific and work through all the alternatives. We're going to explore the complex/nasty option, even if it's only to finally realize that we preferred brutal. Ah, the story of my life.

We can solve the main problems of the client by switching to a ROUTER socket. That lets us send requests to specific servers, avoid servers we know are dead, and in general be as smart as we want to make it. We can also solve the main problem of the server (single threadedness) by switching to a ROUTER socket.

But doing ROUTER-to-ROUTER between two anonymous sockets (which haven't set an identity) is not possible. Both sides generate an identity (for the other peer) only when they receive a first message, and thus neither can talk to the other until it has first received a message. The only way out of this conundrum is to cheat, and use hard-coded identities in one direction. The proper way to cheat, in a client server case, is that the client 'knows' the identity of the server. Vice-versa would be insane, on top of complex and nasty. Great attributes for a genocidal dictator, terrible ones for software.

Rather than invent yet another concept to manage, we'll use the connection endpoint as identity. This is a unique string both sides can agree on without more prior knowledge than they already have for the shotgun model. It's a sneaky and effective way to connect two ROUTER sockets.

Remember how ØMQ identities work. The server ROUTER socket sets an identity before it binds its socket. When a client connects, the two sockets do a little handshake to exchange identities before either side sends a real message. The client ROUTER socket, having not set an identity, sends a null identity to the server. The server generates a random UUID for the client, for its own use. The server sends its identity (which we've agreed is going to be an endpoint string) to the client.
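
Here is a minimal sketch of that handshake, my own illustration rather than one of the Guide's listings; the port is arbitrary, and the 100 msec sleep is a crude stand-in for "wait until the connection is established":

// Endpoint-as-identity sketch - illustration only; the real Freelance
// server and client API follow below
#include "czmq.h"

int main (void)
{
    zctx_t *ctx = zctx_new ();

    // Server: set identity to our own connect endpoint, *before* binding
    void *server = zsocket_new (ctx, ZMQ_ROUTER);
    zsocket_set_identity (server, "tcp://localhost:9999");
    zsocket_bind (server, "tcp://*:9999");

    // Client: once connected, route to the server by its endpoint string
    void *client = zsocket_new (ctx, ZMQ_ROUTER);
    zsocket_connect (client, "tcp://localhost:9999");
    zclock_sleep (100); // Crude: allow the identity handshake to finish

    zstr_sendm (client, "tcp://localhost:9999"); // Address frame
    zstr_send (client, "PING");                  // Command frame

    // Server sees [generated client identity][PING]
    zmsg_t *msg = zmsg_recv (server);
    zmsg_dump (msg);
    zmsg_destroy (&msg);

    zctx_destroy (&ctx);
    return 0;
}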

This means our client can route a message to the server (i.e. send on its ROUTER socket, specifying the server endpoint as identity) as soon as the connection is established. That's not immediately after doing a zmq_connect, but some random time thereafter. Herein lies one problem: we don't know when the server will actually be available and complete its connection handshake. If the server is actually online, it could be after a few milliseconds. If the server is down, and the sysadmin is out to lunch, it could be an hour.

There's a small paradox here. We need to know when servers become connected and available for work. In the Freelance pattern, unlike the broker-based patterns we saw earlier in this chapter, servers are silent until spoken to. Thus we can't talk to a server until it's told us it's on-line, which it can't do until we've asked it.

My solution is to mix in a little of the shotgun approach from model 2, meaning we'll fire (harmless) shots at anything we can, and if anything moves, we know it's alive. We're not going to fire real requests, but rather a kind of ping-pong heartbeat.

This brings us to the realm of protocols again, so here's a short spec that defines how a Freelance client and server exchange PING-PONG commands and request-reply commands: the Freelance Protocol (FLP), at http://rfc.zeromq.org/spec:10.

It is short and sweet to implement as a server. Here's our echo server, Model Three, now speaking FLP.

Model Three of the server is just slightly different:

--
-- Freelance server - Model 3
-- Uses a ROUTER-to-ROUTER socket but just one thread
--
-- Author: Robert G. Jakabosky <bobby@sharedrealm.com>
--
require"zmq"
require"zmsg"

local verbose = (arg[1] == "-v")

local context = zmq.init(1)
s_catch_signals ()

-- Prepare server socket with predictable identity
local bind_endpoint = "tcp://*:5555"
local connect_endpoint = "tcp://localhost:5555"
local server = context:socket(zmq.ROUTER)
server:setopt(zmq.IDENTITY, connect_endpoint)
server:bind(bind_endpoint)
printf ("I: service is ready at %s\n", bind_endpoint)

while (not s_interrupted) do
local request = zmsg.recv (server)
local reply = nil
if (not request) then
break -- Interrupted
end
if (verbose) then
request:dump()
end
-- Frame 0: identity of client
-- Frame 1: PING, or client control frame
-- Frame 2: request body
local address = request:pop()
if (request:parts() == 1 and request:body() == "PING") then
reply = zmsg.new ("PONG")
elseif (request:parts() > 1) then
reply = request
request = nil
reply:body_set("OK")
end
reply:push(address)
if (verbose and reply) then
reply:dump()
end
reply:send(server)
end
if (s_interrupted) then
printf ("W: interrupted\n")
end
server:close()
context:term()

flserver3.lua: Freelance server, Model Three

The Freelance client, however, has gotten large. For clarity, it's split into an example application and a class that does the hard work. Here's the top-level application:

// Freelance client - Model 3
// Uses flcliapi class to encapsulate Freelance pattern

// Lets us build this source without creating a library
#include "flcliapi.c"

int main (void)
{
// Create new freelance client object
flcliapi_t *client = flcliapi_new ();

// Connect to several endpoints
flcliapi_connect (client, "tcp://localhost:5555");
flcliapi_connect (client, "tcp://localhost:5556");
flcliapi_connect (client, "tcp://localhost:5557");

// Send a bunch of name resolution 'requests', measure time
int requests = 1000;
uint64_t start = zclock_time ();
while (requests--) {
zmsg_t *request = zmsg_new ();
zmsg_addstr (request, "random name");
zmsg_t *reply = flcliapi_request (client, &request);
if (!reply) {
printf ("E: name service not available, aborting\n");
break;
}
zmsg_destroy (&reply);
}
printf ("Average round trip cost: %d usec\n",
(int) (zclock_time () - start) / 10);

flcliapi_destroy (&client);
return 0;
}

flclient3.c: Freelance client, Model Three

And here, almost as complex and large as the Majordomo broker, is the client API class:

// flcliapi class - Freelance Pattern agent class
// Implements the Freelance Protocol at http://rfc.zeromq.org/spec:10

#include "flcliapi.h"

// If no server replies within this time, abandon request
#define GLOBAL_TIMEOUT 3000 // msecs
// PING interval for servers we think are alive
#define PING_INTERVAL 2000 // msecs
// Server considered dead if silent for this long
#define SERVER_TTL 6000 // msecs

// This API works in two halves, a common pattern for APIs that need to
// run in the background. One half is a frontend object our application
// creates and works with; the other half is a backend "agent" that runs
// in a background thread. The frontend talks to the backend over an
// inproc pipe socket:

// Structure of our frontend class

struct _flcliapi_t {
zctx_t *ctx; // Our context wrapper
void *pipe; // Pipe through to flcliapi agent
};

// This is the thread that handles our real flcliapi class
static void flcliapi_agent (void *args, zctx_t *ctx, void *pipe);

// Constructor

flcliapi_t *
flcliapi_new (void)
{
flcliapi_t
*self;

self = (flcliapi_t *) zmalloc (sizeof (flcliapi_t));
self->ctx = zctx_new ();
self->pipe = zthread_fork (self->ctx, flcliapi_agent, NULL);
return self;
}

// Destructor

void
flcliapi_destroy (flcliapi_t **self_p)
{
assert (self_p);
if (*self_p) {
flcliapi_t *self = *self_p;
zctx_destroy (&self->ctx);
free (self);
*self_p = NULL;
}
}

// To implement the connect method, the frontend object sends a multipart
// message to the backend agent. The first part is a string "CONNECT", and
// the second part is the endpoint. It waits 100msec for the connection to
// come up, which isn't pretty, but saves us from sending all requests to a
// single server, at startup time:

void
flcliapi_connect (flcliapi_t *self, char *endpoint)
{
assert (self);
assert (endpoint);
zmsg_t *msg = zmsg_new ();
zmsg_addstr (msg, "CONNECT");
zmsg_addstr (msg, endpoint);
zmsg_send (&msg, self->pipe);
zclock_sleep (100); // Allow connection to come up
}

// To implement the request method, the frontend object sends a message
// to the backend, specifying a command "REQUEST" and the request message:

zmsg_t *
flcliapi_request (flcliapi_t *self, zmsg_t **request_p)
{
assert (self);
assert (*request_p);

zmsg_pushstr (*request_p, "REQUEST");
zmsg_send (request_p, self->pipe);
zmsg_t *reply = zmsg_recv (self->pipe);
if (reply) {
char *status = zmsg_popstr (reply);
if (streq (status, "FAILED"))
zmsg_destroy (&reply);
free (status);
}
return reply;
}

// Here we see the backend agent. It runs as an attached thread, talking
// to its parent over a pipe socket. It is a fairly complex piece of work
// so we'll break it down into pieces. First, the agent manages a set of
// servers, using our familiar class approach:

// Simple class for one server we talk to

typedef struct {
char *endpoint; // Server identity/endpoint
uint alive; // 1 if known to be alive
int64_t ping_at; // Next ping at this time
int64_t expires; // Expires at this time
} server_t;

server_t *
server_new (char *endpoint)
{
server_t *self = (server_t *) zmalloc (sizeof (server_t));
self->endpoint = strdup (endpoint);
self->alive = 0;
self->ping_at = zclock_time () + PING_INTERVAL;
self->expires = zclock_time () + SERVER_TTL;
return self;
}

void
server_destroy (server_t **self_p)
{
assert (self_p);
if (*self_p) {
server_t *self = *self_p;
free (self->endpoint);
free (self);
*self_p = NULL;
}
}

int
server_ping (const char *key, void *server, void *socket)
{
server_t *self = (server_t *) server;
if (zclock_time () >= self->ping_at) {
zmsg_t *ping = zmsg_new ();
zmsg_addstr (ping, self->endpoint);
zmsg_addstr (ping, "PING");
zmsg_send (&ping, socket);
self->ping_at = zclock_time () + PING_INTERVAL;
}
return 0;
}

int
server_tickless (const char *key, void *server, void *arg)
{
server_t *self = (server_t *) server;
uint64_t *tickless = (uint64_t *) arg;
if (*tickless > self->ping_at)
*tickless = self->ping_at;
return 0;
}

// We build the agent as a class that's capable of processing messages
// coming in from its various sockets:

// Simple class for one background agent

typedef struct {
zctx_t *ctx; // Own context
void *pipe; // Socket to talk back to application
void *router; // Socket to talk to servers
zhash_t *servers; // Servers we've connected to
zlist_t *actives; // Servers we know are alive
uint sequence; // Number of requests ever sent
zmsg_t *request; // Current request if any
zmsg_t *reply; // Current reply if any
int64_t expires; // Timeout for request/reply
} agent_t;

agent_t *
agent_new (zctx_t *ctx, void *pipe)
{
agent_t *self = (agent_t *) zmalloc (sizeof (agent_t));
self->ctx = ctx;
self->pipe = pipe;
self->router = zsocket_new (self->ctx, ZMQ_ROUTER);
self->servers = zhash_new ();
self->actives = zlist_new ();
return self;
}

void
agent_destroy (agent_t **self_p)
{
assert (self_p);
if (*self_p) {
agent_t *self = *self_p;
zhash_destroy (&self->servers);
zlist_destroy (&self->actives);
zmsg_destroy (&self->request);
zmsg_destroy (&self->reply);
free (self);
*self_p = NULL;
}
}

// This method processes one message from our frontend class
// (it's going to be CONNECT or REQUEST):

// Callback when we remove server from agent 'servers' hash table

static void
s_server_free (void *argument)
{
server_t *server = (server_t *) argument;
server_destroy (&server);
}

void
agent_control_message (agent_t *self)
{
zmsg_t *msg = zmsg_recv (self->pipe);
char *command = zmsg_popstr (msg);

if (streq (command, "CONNECT")) {
char *endpoint = zmsg_popstr (msg);
printf ("I: connecting to %s…\n", endpoint);
int rc = zmq_connect (self->router, endpoint);
assert (rc == 0);
server_t *server = server_new (endpoint);
zhash_insert (self->servers, endpoint, server);
zhash_freefn (self->servers, endpoint, s_server_free);
zlist_append (self->actives, server);
server->ping_at = zclock_time () + PING_INTERVAL;
server->expires = zclock_time () + SERVER_TTL;
free (endpoint);
}
else
if (streq (command, "REQUEST")) {
assert (!self->request); // Strict request-reply cycle
// Prefix request with sequence number and empty envelope
char sequence_text [10];
sprintf (sequence_text, "%u", ++self->sequence);
zmsg_pushstr (msg, sequence_text);
// Take ownership of request message
self->request = msg;
msg = NULL;
// Request expires after global timeout
self->expires = zclock_time () + GLOBAL_TIMEOUT;
}
free (command);
zmsg_destroy (&msg);
}

// This method processes one message from a connected
// server:

void
agent_router_message (agent_t *self)
{
zmsg_t *reply = zmsg_recv (self->router);

// Frame 0 is server that replied
char *endpoint = zmsg_popstr (reply);
server_t *server =
(server_t *) zhash_lookup (self->servers, endpoint);
assert (server);
free (endpoint);
if (!server->alive) {
zlist_append (self->actives, server);
server->alive = 1;
}
server->ping_at = zclock_time () + PING_INTERVAL;
server->expires = zclock_time () + SERVER_TTL;

// Frame 1 may be sequence number for reply
char *sequence = zmsg_popstr (reply);
int sequence_nbr = atoi (sequence);
free (sequence);
if (sequence_nbr == self->sequence) {
zmsg_pushstr (reply, "OK");
zmsg_send (&reply, self->pipe);
zmsg_destroy (&self->request);
}
else
zmsg_destroy (&reply);
}

// Finally, here's the agent task itself, which polls its two sockets
// and processes incoming messages:

static void
flcliapi_agent (void *args, zctx_t *ctx, void *pipe)
{
agent_t *self = agent_new (ctx, pipe);

zmq_pollitem_t items [] = {
{ self->pipe, 0, ZMQ_POLLIN, 0 },
{ self->router, 0, ZMQ_POLLIN, 0 }
};
while (!zctx_interrupted) {
// Calculate tickless timer, up to 1 hour
uint64_t tickless = zclock_time () + 1000 * 3600;
if (self->request
&& tickless > self->expires)
tickless = self->expires;
zhash_foreach (self->servers, server_tickless, &tickless);

int rc = zmq_poll (items, 2,
(tickless - zclock_time ()) * ZMQ_POLL_MSEC);
if (rc == -1)
break; // Context has been shut down

if (items [0].revents & ZMQ_POLLIN)
agent_control_message (self);

if (items [1].revents & ZMQ_POLLIN)
agent_router_message (self);

// If we're processing a request, dispatch to next server
if (self->request) {
if (zclock_time () >= self->expires) {
// Request expired, kill it
zstr_send (self->pipe, "FAILED");
zmsg_destroy (&self->request);
}
else {
// Find server to talk to, remove any expired ones
while (zlist_size (self->actives)) {
server_t *server =
(server_t *) zlist_first (self->actives);
if (zclock_time () >= server->expires) {
zlist_pop (self->actives);
server->alive = 0;
}
else {
zmsg_t *request = zmsg_dup (self->request);
zmsg_pushstr (request, server->endpoint);
zmsg_send (&request, self->router);
break;
}
}
}
}
// Disconnect and delete any expired servers
// Send heartbeats to idle servers if needed
zhash_foreach (self->servers, server_ping, self->router);
}
agent_destroy (&self);
}

flcliapi.c: Freelance client API

This API implementation is fairly sophisticated and uses a couple of techniques that we've not seen before:

Asynchronous agent class

The client API consists of two parts, a synchronous 'flcliapi' class that runs in the application thread, and an asynchronous 'agent' class that runs in the background. The flcliapi and agent classes talk to each other over an inproc socket. All ØMQ aspects (such as creating and terminating a context) are hidden in the API. The agent in effect acts like a mini-broker, talking to servers in the background, so that when we make a request, it can make a best effort to reach a server it believes is available.
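
As a minimal sketch of this two-halves arrangement, my own illustration using the same zthread_fork call that the flcliapi constructor uses, the frontend forks an attached agent thread and the two halves simply exchange strings over the inproc pipe:

// Frontend/agent sketch - illustration of the two-halves pattern;
// the strings exchanged here are arbitrary
#include "czmq.h"

// Agent half: runs as an attached thread owned by the context
static void
s_agent (void *args, zctx_t *ctx, void *pipe)
{
    while (true) {
        char *request = zstr_recv (pipe); // Wait for work from frontend
        if (!request)
            break; // Context was terminated
        zstr_sendf (pipe, "done: %s", request);
        free (request);
    }
}

int main (void)
{
    zctx_t *ctx = zctx_new ();
    // Frontend half: fork the agent and keep the inproc pipe to it
    void *pipe = zthread_fork (ctx, s_agent, NULL);
    zstr_send (pipe, "lookup something");
    char *reply = zstr_recv (pipe);
    puts (reply);
    free (reply);
    zctx_destroy (&ctx); // Terminating the context also stops the agent
    return 0;
}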

Tickless poll timer

In previous poll loops we always used a fixed tick interval, e.g. 1 second, which is simple enough but not excellent on power-sensitive clients, such as notebooks or mobile phones, where waking the CPU costs power. For fun, and to help save the planet, the agent uses a 'tickless timer', which calculates the poll delay based on the next timeout we're expecting. A proper implementation would keep an ordered list of timeouts. We just check all timeouts and calculate the poll delay until the next one.
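
Here is a self-contained sketch of the idea, my own illustration with an arbitrary endpoint and a single 2500 msec timeout standing in for the agent's set of ping and expiry times:

// Tickless poll loop sketch - illustration only
#include "czmq.h"

int main (void)
{
    zctx_t *ctx = zctx_new ();
    void *socket = zsocket_new (ctx, ZMQ_SUB);
    zsocket_set_subscribe (socket, "");
    zsocket_connect (socket, "tcp://localhost:6000");

    uint64_t heartbeat_at = zclock_time () + 2500; // Our one pending timeout
    while (!zctx_interrupted) {
        // Sleep exactly until the earliest known timeout, not a fixed tick
        uint64_t tickless = zclock_time () + 1000 * 3600; // At most 1 hour
        if (heartbeat_at < tickless)
            tickless = heartbeat_at;

        int64_t delay = (int64_t) (tickless - zclock_time ());
        if (delay < 0)
            delay = 0; // Timeout already due; poll without waiting
        zmq_pollitem_t items [] = { { socket, 0, ZMQ_POLLIN, 0 } };
        int rc = zmq_poll (items, 1, delay * ZMQ_POLL_MSEC);
        if (rc == -1)
            break; // Context has been shut down

        if (items [0].revents & ZMQ_POLLIN) {
            char *update = zstr_recv (socket);
            free (update);
        }
        if (zclock_time () >= heartbeat_at) {
            // Periodic work goes here; then schedule the next timeout
            heartbeat_at = zclock_time () + 2500;
        }
    }
    zctx_destroy (&ctx);
    return 0;
}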

Conclusion

topprevnext

In this chapter we've seen a variety of reliable request-reply mechanisms, each with certain costs and benefits. The example code is largely ready for real use, though it is not optimized. Of all the different patterns, the two that stand out are the Majordomo pattern, for broker-based reliability, and the Freelance pattern for brokerless reliability.