Chapter Three - Advanced Request-Reply Patterns

In Chapter Two we worked through the basics of using ØMQ by developing a series of small applications, each time exploring new aspects of ØMQ. We'll continue this approach in this chapter, as we explore advanced patterns built on top of ØMQ's core request-reply pattern.

We'll cover:

  • How to create and use message envelopes for request-reply.
  • How to use the REQ, REP, DEALER, and ROUTER sockets.
  • How to set manual reply addresses using identities.
  • How to do custom random scatter routing.
  • How to do custom least-recently used routing.
  • How to build a higher-level message class.
  • How to build a basic request-reply broker.
  • How to choose good names for sockets.
  • How to simulate a cluster of clients and workers.
  • How to build a scalable cloud of request-reply clusters.
  • How to use pipeline sockets for monitoring threads.

Request-Reply Envelopes

In the request-reply pattern, the envelope holds the return address for replies. It is how a ØMQ network with no state can create round-trip request-reply dialogs.

You don't in fact need to understand how request-reply envelopes work to use them for common cases. When you use REQ and REP, your sockets build and use envelopes automatically. When you write a device (as we covered in the last chapter), you just need to read and write all the parts of a message. ØMQ implements envelopes using multipart data, so if you copy multipart data safely, you implicitly copy envelopes too.
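To make that concrete, here is a minimal sketch of the frame-copying loop a device needs, using the same lua-zmq socket calls as the examples in this chapter. The 'frontend' and 'backend' sockets are placeholders you would create and bind or connect yourself, and the sketch assumes the binding exposes the RCVMORE option via getopt:

while true do
    repeat
        local frame = frontend:recv()
        local more = (frontend:getopt(zmq.RCVMORE) == 1)
        backend:send(frame, more and zmq.SNDMORE or 0)
    until not more
end

Because every frame gets copied, envelope frames included, the reply address survives the hop.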

However, getting under the hood and playing with request-reply envelopes is necessary for advanced request-reply work. It's time to explain how ROUTER works, in terms of envelopes:

  • When you receive a message from a ROUTER socket, it shoves a brown paper envelope around the message and scribbles on it with indelible ink, "This came from Lucy". Then it gives that to you. That is, the ROUTER socket gives you what came off the wire, wrapped up in an envelope with the reply address on it.
  • When you send a message to a ROUTER socket, it rips off that brown paper envelope, tries to read its own handwriting, and if it knows who "Lucy" is, sends the contents back to Lucy. That is the reverse process of receiving a message.

If you leave the brown envelope alone, and then pass that message to another ROUTER socket (e.g. by sending to a DEALER connected to a ROUTER), the second ROUTER socket will in turn stick another brown envelope on it, and scribble the name of that DEALER on it.

The whole point of this is that each ROUTER knows how to send replies back to the right place. All you need to do, in your application, is respect the brown envelopes. Now the REP socket makes sense. It carefully slices open the brown envelopes, one by one, keeps them safely aside, and gives you (the application code that owns the REP socket) the original message. When you send the reply, it re-wraps the reply in the brown paper envelopes, so it can hand the resulting brown package back to the ROUTER sockets down the chain.

Which lets you insert ROUTER-DEALER devices into a request-reply pattern like this:

[REQ] <--> [REP]
[REQ] <--> [ROUTER--DEALER] <--> [REP]
[REQ] <--> [ROUTER--DEALER] <--> [ROUTER--DEALER] <--> [REP]
...etc.

If you connect a REQ socket to a ROUTER socket, and send one request message, you will get a message that consists of three frames: a reply address, an empty message frame, and the 'real' message.

Figure 26 - Single-hop Request-reply Envelope

Breaking this down:

  • The data in frame 3 is what the sending application sends to the REQ socket.
  • The empty message frame in frame 2 is prepended by the REQ socket when it sends the message to the ROUTER socket.
  • The reply address in frame 1 is prepended by the ROUTER before it passes the message to the receiving application.
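If you read those three frames off the ROUTER socket by hand, it looks something like this minimal sketch, using the same lua-zmq calls as the examples below ('frontend' here is a placeholder for a ROUTER socket with a connected REQ peer):

local reply_address = frontend:recv()   -- frame 1: identity of the REQ peer
local empty = frontend:recv()           -- frame 2: empty delimiter, added by REQ
assert (#empty == 0)
local request = frontend:recv()         -- frame 3: the 'real' message

To reply, you send the same frames back in the same order, marking every frame except the last with zmq.SNDMORE.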

Now if we extend this with a chain of devices, we get envelope on envelope, with the newest envelope always stuck at the beginning of the stack.

Figure 27 - Multihop Request-reply Envelope

Here now is a more detailed explanation of the four socket types we use for request-reply patterns:

  • DEALER just load-balances (deals out) the messages you send to all connected peers, and fair-queues (deals in) the messages it receives. It is exactly like a PUSH and PULL socket combined.
  • REQ prepends an empty message frame to every message you send, and removes the empty message frame from each message you receive. It then works like DEALER (and in fact is built on DEALER) except it also imposes a strict send / receive cycle.
  • ROUTER prepends an envelope with reply address to each message it receives, before passing it to the application. It also chops off the envelope (the first message frame) from each message it sends, and uses that reply address to decide which peer the message should go to.
  • REP stores all the message frames up to the first empty message frame when you receive a message, and passes the rest (the data) to your application. When you send a reply, REP prepends the saved envelopes to the message and sends it back using the same semantics as ROUTER (and in fact REP is built on top of ROUTER), but matching REQ, it imposes a strict receive / send cycle.

REP requires that the envelopes end with an empty message frame. If you're not using REQ at the other end of the chain then you must add the empty message frame yourself.
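For example, a DEALER talking directly to a REP socket has to fake the delimiter that REQ would normally add. A minimal sketch, assuming a 'dealer' socket already connected to the REP peer:

-- Send a request: add the empty delimiter frame by hand
dealer:send("", zmq.SNDMORE)
dealer:send("Hello")

-- The reply comes back with the delimiter still attached
local empty = dealer:recv()
assert (#empty == 0)
local reply = dealer:recv()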

So the obvious question about ROUTER is, where does it get the reply addresses from? And the obvious answer is, it uses the socket's identity. As we already learned, if a socket does not set an identity, the ROUTER socket generates an identity that it can associate with the connection to that socket.

Figure 28 - ROUTER Invents a UUID

When we set our own identity on a socket, this gets passed to the ROUTER socket, which passes it to the application as part of the envelope for each message that comes in.

Figure 29 - ROUTER Uses Identity If It Knows It

Let's observe the above two cases in practice. This program dumps the contents of the message frames that a ROUTER socket receives from two REQ sockets, one not using identities, and one using the identity 'PEER2':

--
-- Demonstrate identities as used by the request-reply pattern. Run this
-- program by itself. Note that the utility functions s_ are provided by
-- the zhelpers module. It gets boring for everyone to keep repeating this code.
--
-- Author: Robert G. Jakabosky <bobby@sharedrealm.com>
--
require"zmq"
require"zhelpers"

local context = zmq.init(1)

local sink = context:socket(zmq.ROUTER)
sink:bind("inproc://example")

-- First allow 0MQ to set the identity
local anonymous = context:socket(zmq.REQ)
anonymous:connect("inproc://example")
anonymous:send("ROUTER uses a generated UUID")
s_dump(sink)

-- Then set the identity ourselves
local identified = context:socket(zmq.REQ)
identified:setopt(zmq.IDENTITY, "PEER2")
identified:connect("inproc://example")
identified:send("ROUTER socket uses REQ's socket identity")
s_dump(sink)

sink:close()
anonymous:close()
identified:close()
context:term()

identity.lua: Identity check

Here is what the dump function prints:

----------------------------------------
[017] 00314F043F46C441E28DD0AC54BE8DA727
[000]
[028] ROUTER uses a generated UUID
----------------------------------------
[005] PEER2
[000]
[040] ROUTER socket uses REQ's socket identity

Custom Request-Reply Routing

We already saw that ROUTER uses the message envelope to decide which client to route a reply back to. Now let me express that in another way: ROUTER will route messages asynchronously to any peer connected to it, if you provide the correct routing address via a properly constructed envelope.

So ROUTER is really a fully controllable router. We'll dig into this magic in detail.

But first, and because we're going to go off-road into some rough and possibly illegal terrain now, let's look closer at REQ and REP. Few people know this, but despite their kindergarten approach to messaging, REQ and REP are actually colorful characters:

  • REQ is a mama socket, doesn't listen but always expects an answer. Mamas are strictly synchronous and if you use them they are always the 'request' end of a chain.
  • REP is a papa socket, always answers, but never starts a conversation. Papas are strictly synchronous and if you use them, they are always the 'reply' end of a chain.

The thing about Mama sockets is, as we all learned as kids, you can't speak until spoken to. Mamas do not have the simple open-mindedness of papas, nor the ambiguous "sure, whatever" shrugged-shoulder aloofness of a dealer. So to speak to a mama socket, you have to get the mama socket to talk to you first. The good part is mamas don't care if you reply now, or much later. Just bring a good sob story and a bag of laundry.

Papa sockets on the other hand are strong and silent, and pedantic. They do just one thing, which is to give you an answer to whatever you ask, perfectly framed and precise. Don't expect a papa socket to be chatty, or to pass a message on to someone else, this is just not going to happen.

While we usually think of request-reply as a to-and-fro pattern, in fact it can be fully asynchronous, as long as we understand that any mamas or papas will be at the end of a chain, never in the middle of it, and always synchronous. All we need to know is the address of the peer we want to talk to, and then we can send it messages asynchronously, via a router. The router is the one and only ØMQ socket type capable of being told "send this message to X" where X is the address of a connected peer.

These are the ways we can know the address to send a message to, and you'll see most of these used in the examples of custom request-reply routing:

  • By default, a peer has a null identity and the router will generate a UUID and use that to refer to the connection when it delivers you each incoming message from that peer.
  • If the peer socket set an identity, the router will give that identity when it delivers an incoming request envelope from that peer.
  • Peers with explicit identities can send them via some other mechanism, e.g. via some other sockets.
  • Peers can have prior knowledge of each others' identities, e.g. via configuration files or some other magic.

There are at least three routing patterns, one for each of the socket types we can easily connect to a router:

  • Router-to-dealer.
  • Router-to-mama (REQ).
  • Router-to-papa (REP).

In each of these cases we have total control over how we route messages, but the different patterns cover different use-cases and message flows. Let's break it down over the next sections with examples of different routing algorithms.

But first some warnings about custom routing:

  • This goes against a fairly solid ØMQ rule: delegate peer addressing to the socket. The only reason we do it is because ØMQ lacks a wide range of routing algorithms.
  • Future versions of ØMQ will probably do some of the routing we're going to build here. That means the code we design now may break, or become redundant in the future.
  • While the built-in routing has certain guarantees of scalability, such as being friendly to devices, custom routing doesn't. You will need to make your own devices.

So overall, custom routing is more expensive and more fragile than delegating this to ØMQ. Only do it if you need it. Having said that, let's jump in, the water's great!

Router-to-Dealer Routing

The router-to-dealer pattern is the simplest. You connect one router to many dealers, and then distribute messages to the dealers using any algorithm you like. The dealers can be sinks (process the messages without any response), proxies (send the messages on to other nodes), or services (send back replies).

If you expect the dealer to reply, there should only be one router talking to it. Dealers have no idea how to reply to a specific peer, so if they have multiple peers, they will load-balance between them, which would be weird. If the dealer is a sink, any number of routers can talk to it.

What kind of routing can you do with a router-to-dealer pattern? If the dealers talk back to the router, e.g. telling the router when they've finished a task, you can use that knowledge to route depending on how fast a dealer is. Since both router and dealer are asynchronous, it can get a little tricky. You'd need to use zmq_poll(3) at least.

We'll make an example where the dealers don't talk back, they're pure sinks. Our routing algorithm will be a weighted random scatter: we have two dealers and we send twice as many messages to one as to the other.

Figure 30 - Router-to-Dealer Custom Routing

Here's code that shows how this works:

--
-- Custom routing Router to Dealer
--
-- While this example runs in a single process, that is just to make
-- it easier to start and stop the example. Each thread has its own
-- context and conceptually acts as a separate process.
--
-- Author: Robert G. Jakabosky <bobby@sharedrealm.com>
--
require"zmq"
require"zmq.threads"
require"zhelpers"

local pre_code = [[
local zmq = require"zmq"
require"zhelpers"
--local threads = require"zmq.threads"
--local context = threads.get_parent_ctx()
]]

-- We have two workers, here we copy the code, normally these would
-- run on different boxes…
--
local worker_task_a = pre_code .. [[
local context = zmq.init(1)
local worker = context:socket(zmq.DEALER)
worker:setopt(zmq.IDENTITY, "A")
worker:connect("ipc://routing.ipc")

local total = 0
while true do
    -- We receive one part, with the workload
    local request = worker:recv()
    local finished = (request == "END")

    if (finished) then
        printf ("A received: %d\n", total)
        break
    end
    total = total + 1
end
worker:close()
context:term()
]]

local worker_task_b = pre_code .. [[
local context = zmq.init(1)
local worker = context:socket(zmq.DEALER)
worker:setopt(zmq.IDENTITY, "B")
worker:connect("ipc://routing.ipc")

local total = 0
while true do
    -- We receive one part, with the workload
    local request = worker:recv()
    local finished = (request == "END")

    if (finished) then
        printf ("B received: %d\n", total)
        break
    end
    total = total + 1
end
worker:close()
context:term()
]]

s_version_assert (2, 1)
local context = zmq.init(1)

local client = context:socket(zmq.ROUTER)
client:bind("ipc://routing.ipc")

local task_a = zmq.threads.runstring(context, worker_task_a)
task_a:start()

local task_b = zmq.threads.runstring(context, worker_task_b)
task_b:start()

-- Wait for threads to connect, since otherwise the messages
-- we send won't be routable.
s_sleep (1000)

-- Send 10 tasks scattered to A twice as often as B
math.randomseed(os.time())
for n=1,10 do
    -- Send two message parts, first the address…
    if (randof (3) > 0) then
        client:send("A", zmq.SNDMORE)
    else
        client:send("B", zmq.SNDMORE)
    end

    -- And then the workload
    client:send("This is the workload")
end
client:send("A", zmq.SNDMORE)
client:send("END")

client:send("B", zmq.SNDMORE)
client:send("END")

client:close()
context:term()

assert(task_a:join())
assert(task_b:join())

rtdealer.lua: Router-to-dealer

Some comments on this code:

  • The router doesn't know when the dealers are ready, and it would be distracting for our example to add in the signaling to do that. So the router just sleeps for one second (the s_sleep (1000) call) after starting the dealer threads. Without this sleep, the router will send out messages that can't be routed, and ØMQ will discard them.
  • Note that this behavior is specific to ROUTER sockets. PUB sockets will also discard messages if there are no subscribers, but all other socket types will queue sent messages until there's a peer to receive them.

To route to a dealer, we create an envelope consisting of just an identity frame (we don't need a null separator).

Figure 31 - Routing Envelope for Dealer

The router socket removes the first frame, and sends the second frame, which the dealer gets as-is. When the dealer sends a message to the router, it sends one frame. The router prepends the dealer's address and gives us back a similar envelope in two parts.
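In code, the full round trip looks something like this sketch, borrowing the 'client' ROUTER and the DEALER with identity "A" from the example above:

-- Router to dealer: identity frame, then the payload
client:send("A", zmq.SNDMORE)
client:send("This is the workload")

-- Dealer to router: the dealer sends a single frame...
worker:send("I am done")

-- ...and the router reads it back as [identity][payload]
local identity = client:recv()   -- "A"
local reply = client:recv()      -- "I am done"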

Something to note: if you use an invalid address, the router discards the message silently. There is not much else it can do usefully. In normal cases this either means the peer has gone away, or that there is a programming error somewhere and you're using a bogus address. In any case you cannot ever assume a message will be routed successfully until and unless you get a reply of some sort from the destination node. We'll come to creating reliable patterns later on.

Dealers in fact work exactly like PUSH and PULL combined. It's however illegal and pointless to connect PULL or PUSH to a request-reply socket.

Least-Recently Used Routing (LRU Pattern)

Like we said, mamas (REQ sockets, if you really insist on it) don't listen to you, and if you try to speak out of turn they'll ignore you. You have to wait for them to say something, then you can give a sarcastic answer. This is very useful for routing because it means we can keep a bunch of mamas waiting for answers. In effect, mamas tell us when they're ready.

You can connect one router to many mamas, and distribute messages as you would to dealers. Mamas will usually want to reply, but they will let you have the last word. However it's one thing at a time:

  • Mama speaks to router
  • Router replies to mama
  • Mama speaks to router
  • Router replies to mama
  • etc.

Like dealers, mamas can only talk to one router and since mamas always start by talking to the router, you should never connect one mama to more than one router unless you are doing sneaky stuff like multi-pathway redundant routing. I'm not even going to explain that now, and hopefully the jargon is complex enough to stop you trying this until you need it.

Figure 32 - Router to Mama Custom Routing

What kind of routing can you do with a router-to-mama pattern? Probably the most obvious is "least-recently-used" (LRU), where we always route to the mama that's been waiting longest. Here is an example that does LRU routing to a set of mamas:

--
-- Custom routing Router to Mama (ROUTER to REQ)
--
-- While this example runs in a single process, that is just to make
-- it easier to start and stop the example. Each thread has its own
-- context and conceptually acts as a separate process.
--
-- Author: Robert G. Jakabosky <bobby@sharedrealm.com>
--
require"zmq"
require"zmq.threads"
require"zhelpers"

NBR_WORKERS = 10

local pre_code = [[
local identity, seed = ...
local zmq = require"zmq"
require"zhelpers"
math.randomseed(seed)
]]

local worker_task = pre_code .. [[
local context = zmq.init(1)
local worker = context:socket(zmq.REQ)

-- We use a string identity for ease here
worker:setopt(zmq.IDENTITY, identity)
worker:connect("ipc://routing.ipc")

local total = 0
while true do
    -- Tell the router we're ready for work
    worker:send("ready")

    -- Get workload from router, until finished
    local workload = worker:recv()
    local finished = (workload == "END")

    if (finished) then
        printf ("Processed: %d tasks\n", total)
        break
    end
    total = total + 1

    -- Do some random work
    s_sleep (randof (1000) + 1)
end
worker:close()
context:term()
]]

s_version_assert (2, 1)
local context = zmq.init(1)

local client = context:socket(zmq.ROUTER)
client:bind("ipc://routing.ipc")
math.randomseed(os.time())

local workers = {}
for n=1,NBR_WORKERS do
    local identity = string.format("%04X-%04X", randof (0x10000), randof (0x10000))
    local seed = os.time() + math.random()
    workers[n] = zmq.threads.runstring(context, worker_task, identity, seed)
    workers[n]:start()
end
for n=1,(NBR_WORKERS * 10) do
    -- LRU worker is next waiting in queue
    local address = client:recv()
    local empty = client:recv()
    local ready = client:recv()

    client:send(address, zmq.SNDMORE)
    client:send("", zmq.SNDMORE)
    client:send("This is the workload")
end
-- Now ask mamas to shut down and report their results
for n=1,NBR_WORKERS do
    local address = client:recv()
    local empty = client:recv()
    local ready = client:recv()

    client:send(address, zmq.SNDMORE)
    client:send("", zmq.SNDMORE)
    client:send("END")
end

for n=1,NBR_WORKERS do
    assert(workers[n]:join())
end

client:close()
context:term()

rtmama.lua: Router-to-mama

For this example the LRU doesn't need any particular data structures above what ØMQ gives us (message queues) because we don't need to synchronize the workers with anything. A more realistic LRU algorithm would have to collect workers as they become ready into a queue, and then use this queue when routing client requests. We'll do this in a later example.
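The queue itself can be as simple as a Lua table used FIFO-style; here is a minimal sketch (table.remove on index 1 pops the oldest entry, i.e. the least-recently used worker):

local worker_queue = {}

-- A worker signaled it is ready: append its address
worker_queue[#worker_queue + 1] = worker_addr

-- A client request arrived: pop the least-recently used worker
local next_worker = table.remove(worker_queue, 1)

This is exactly the structure the broker example later in this chapter uses.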

To prove that the LRU is working as expected, the mamas print the total tasks they each did. Since the mamas do random work, and we're not load balancing, we expect each mama to do approximately the same amount but with random variation. And that is indeed what we see:

Processed: 8 tasks
Processed: 8 tasks
Processed: 11 tasks
Processed: 7 tasks
Processed: 9 tasks
Processed: 11 tasks
Processed: 14 tasks
Processed: 11 tasks
Processed: 11 tasks
Processed: 10 tasks

Some comments on this code:

  • We don't need any settle time, since the mamas explicitly tell the router when they are ready.
  • We're generating our own identities here, as printable random hex strings (the same trick as the s_set_id function in zhelpers.h). That's just to make our life a little simpler. In a realistic application the mamas would be fully anonymous and then you'd call zmq_recv(3) and zmq_send(3) directly instead of the zhelpers s_recv() and s_send() functions, which can only handle strings.
  • If you copy and paste example code without understanding it, you deserve what you get. It's like watching Spiderman leap off the roof and then trying that yourself.

To route to a mama, we must create a mama-friendly envelope consisting of an address plus an empty message frame.

Figure 33 - Routing Envelope for Mama (REQ)

Address-based Routing

Papas are, if we care about them at all, only there to answer questions. And to pay the bills, fix the car when mama drives it into the garage wall, put up shelves, and walk the dog when it's raining. But apart from that, papas are only there to answer questions.

In a classic request-reply pattern a router wouldn't talk to a papa socket at all, but rather would get a dealer to do the job for it. That's what dealers are for: to pass questions on to random papas and come back with their answers. Routers are generally more comfortable talking to mamas. OK, dear reader, you may stop the psychoanalysis. These are analogies, not life stories.

It's worth remembering with ØMQ that the classic patterns are the ones that work best, that the beaten path is there for a reason, and that when we go off-road we take the risk of falling off cliffs and getting eaten by zombies. Having said that, let's plug a router into a papa and see what the heck emerges.

The special thing about papas, all joking aside, is actually two things:

  • One, they are strictly lockstep request-reply.
  • Two, they accept an envelope stack of any size and will return that intact.

In the normal request-reply pattern, papas are anonymous and replaceable (wow, these analogies are scary), but we're learning about custom routing. So, in our use-case we have reason to send a request to papa A rather than papa B. This is essential if you want to keep some kind of a conversation going between you, at one end of a large network, and a papa sitting somewhere far away.

A core philosophy of ØMQ is that the edges are smart and many, and the middle is vast and dumb. This does mean the edges can address each other, and this also means we want to know how to reach a given papa. Doing routing across multiple hops is something we'll look at later but for now we'll look just at the final step: a router talking to a specific papa.

Figure 34 - Router-to-Papa Custom Routing

This example shows a very specific chain of events:

  • The client has a message that it expects to route back (via another router) to some node. The message has two addresses (a stack), an empty part, and a body.
  • The client passes that to the router but specifies a papa address first.
  • The router removes the papa address, uses that to decide which papa to send the message to.
  • The papa receives the addresses, empty part, and body.
  • It removes the addresses, saves them, and passes the body to the worker.
  • The worker sends a reply back to the papa.
  • The papa recreates the envelope stack and sends that back with the worker's reply to the router.
  • The router prepends the papa's address and provides that to the client along with the rest of the address stack, empty part, and the body.

It's complex but worth working through until you understand it. Just remember a papa is garbage in, garbage out.

--
-- Custom routing Router to Papa (ROUTER to REP)
--
-- Author: Robert G. Jakabosky <bobby@sharedrealm.com>
--
require"zmq"
require"zhelpers"

-- We will do this all in one thread to emphasize the sequence
-- of events…

local context = zmq.init(1)

local client = context:socket(zmq.ROUTER)
client:bind("ipc://routing.ipc")

local worker = context:socket(zmq.REP)
worker:setopt(zmq.IDENTITY, "A")
worker:connect("ipc://routing.ipc")

-- Wait for the worker to connect so that when we send a message
-- with routing envelope, it will actually match the worker…
s_sleep (1000)

-- Send papa address, address stack, empty part, and request
client:send("A", zmq.SNDMORE)
client:send("address 3", zmq.SNDMORE)
client:send("address 2", zmq.SNDMORE)
client:send("address 1", zmq.SNDMORE)
client:send("", zmq.SNDMORE)
client:send("This is the workload")

-- Worker should get just the workload
s_dump (worker)

-- We don't play with envelopes in the worker
worker:send("This is the reply")

-- Now dump what we got off the ROUTER socket…
s_dump (client)

client:close()
worker:close()
context:term()

rtpapa.lua: Router-to-papa

Run this program and it should show you this:

----------------------------------------
[020] This is the workload
----------------------------------------
[001] A
[009] address 3
[009] address 2
[009] address 1
[000]
[017] This is the reply

Some comments on this code:

  • In reality we'd have the papa and router in separate nodes. This example does it all in one thread because it makes the sequence of events really clear.
  • zmq_connect(3) doesn't happen instantly. When the papa socket connects to the router, that takes a certain time and happens in the background. In a realistic application the router wouldn't even know the papa existed until there had been some previous dialog. In our toy example we just sleep for one second to make sure the connection's done. If you remove the sleep, the papa socket won't get the message. (Try it.)
  • We're routing using the papa's identity. Just to convince yourself this really is happening, try sending to a wrong address, like "B". The papa won't get the message.
  • The s_dump and other utility functions (in the C code) come from the zhelpers.h header file. It becomes clear that we do the same work over and over on sockets, and there are interesting layers we can build on top of the ØMQ API. We'll come back to this later when we make a real application rather than these toy examples.

To route to a papa, we must create a papa-friendly envelope.

Figure 35 - Routing Envelope for Papa aka REP

A Request-Reply Message Broker

We'll recap the knowledge we have so far about doing weird stuff with ØMQ message envelopes, and build the core of a generic custom routing queue device that we can properly call a message broker. Sorry for all the buzzwords. What we'll make is a queue device that connects a bunch of clients to a bunch of workers, and lets you use any routing algorithm you want. The algorithm we'll implement is least-recently used, since it's the most obvious use-case apart from load-balancing.

To start with, let's look back at the classic request-reply pattern and then see how it extends over a larger and larger service-oriented network. The basic pattern just has one client talking to a few workers.

Figure 36 - Basic Request-reply

This extends to multiple papas, but if we want to handle multiple mamas as well we need a device in the middle, which normally consists of a router and a dealer back to back, connected by a classic ZMQ_QUEUE device that just copies message frames between the two sockets as fast as it can.

Figure 37 - Stretched Request-reply

The key here is that the router stores the originating mama address in the request envelope, the dealer and papas don't touch that, and so the router knows which mama to send the reply back to. Papas are anonymous and not addressed in this pattern, all papas are assumed to provide the same service.

In the above design, we're using the built-in load balancing routing that the dealer socket provides. However, we want our broker to use a least-recently used algorithm, so we take the router-to-mama pattern we learned, and apply that.

Figure 38 - Stretched Request-reply with LRU

Our broker - a router-to-router LRU queue - can't simply copy message frames blindly. Here is the code; it's fairly complex, but the core logic is reusable in any request-reply broker that wants to do LRU routing:

--
-- Least-recently used (LRU) queue device
-- Clients and workers are shown here in-process
--
-- While this example runs in a single process, that is just to make
-- it easier to start and stop the example. Each thread has its own
-- context and conceptually acts as a separate process.
--
-- Author: Robert G. Jakabosky <bobby@sharedrealm.com>
--

require"zmq"
require"zmq.threads"
require"zmq.poller"
require"zhelpers"

local tremove = table.remove

local NBR_CLIENTS = 10
local NBR_WORKERS = 3

local pre_code = [[
local identity, seed = ...
local zmq = require"zmq"
require"zhelpers"
math.randomseed(seed)
]]

-- Basic request-reply client using REQ socket
-- Since s_send and s_recv can't handle 0MQ binary identities we
-- set a printable text identity to allow routing.
--
local client_task = pre_code .. [[
local context = zmq.init(1)
local client = context:socket(zmq.REQ)
client:setopt(zmq.IDENTITY, identity) -- Set a printable identity
client:connect("ipc://frontend.ipc")

-- Send request, get reply
client:send("HELLO")
local reply = client:recv()
printf ("Client: %s\n", reply)

client:close()
context:term()
]]

-- Worker using REQ socket to do LRU routing
-- Since s_send and s_recv can't handle 0MQ binary identities we
-- set a printable text identity to allow routing.
--
local worker_task = pre_code .. [[
local context = zmq.init(1)
local worker = context:socket(zmq.REQ)
worker:setopt(zmq.IDENTITY, identity) -- Set a printable identity
worker:connect("ipc://backend.ipc")

-- Tell broker we're ready for work
worker:send("READY")

while true do
    -- Read and save all frames until we get an empty frame
    -- In this example there is only 1 but it could be more
    local address = worker:recv()
    local empty = worker:recv()
    assert (#empty == 0)

    -- Get request, send reply
    local request = worker:recv()
    printf ("Worker: %s\n", request)

    worker:send(address, zmq.SNDMORE)
    worker:send("", zmq.SNDMORE)
    worker:send("OK")
end
worker:close()
context:term()
]]

s_version_assert (2, 1)

-- Prepare our context and sockets
local context = zmq.init(1)
local frontend = context:socket(zmq.ROUTER)
local backend = context:socket(zmq.ROUTER)
frontend:bind("ipc://frontend.ipc")
backend:bind("ipc://backend.ipc")

local clients = {}
for n=1,NBR_CLIENTS do
    local identity = string.format("%04X-%04X", randof (0x10000), randof (0x10000))
    local seed = os.time() + math.random()
    clients[n] = zmq.threads.runstring(context, client_task, identity, seed)
    clients[n]:start()
end
local workers = {}
for n=1,NBR_WORKERS do
    local identity = string.format("%04X-%04X", randof (0x10000), randof (0x10000))
    local seed = os.time() + math.random()
    workers[n] = zmq.threads.runstring(context, worker_task, identity, seed)
    workers[n]:start(true)
end

-- Logic of LRU loop
-- - Poll backend always, frontend only if 1+ worker ready
-- - If worker replies, queue worker as ready and forward reply
-- to client if necessary
-- - If client requests, pop next worker and send request to it

-- Queue of available workers
local worker_queue = {}

local is_accepting = false
local max_requests = #clients

local poller = zmq.poller(2)

local function frontend_cb()
    -- Now get next client request, route to LRU worker
    -- Client request is [address][empty][request]
    local client_addr = frontend:recv()
    local empty = frontend:recv()
    assert (#empty == 0)

    local request = frontend:recv()

    -- Dequeue a worker from the queue.
    local worker = tremove(worker_queue, 1)

    backend:send(worker, zmq.SNDMORE)
    backend:send("", zmq.SNDMORE)
    backend:send(client_addr, zmq.SNDMORE)
    backend:send("", zmq.SNDMORE)
    backend:send(request)

    if (#worker_queue == 0) then
        -- stop accepting work from clients, when no workers are available.
        poller:remove(frontend)
        is_accepting = false
    end
end

poller:add(backend, zmq.POLLIN, function()
    -- Queue worker address for LRU routing
    local worker_addr = backend:recv()
    worker_queue[#worker_queue + 1] = worker_addr

    -- start accepting client requests, if we are not already doing so.
    if not is_accepting then
        is_accepting = true
        poller:add(frontend, zmq.POLLIN, frontend_cb)
    end

    -- Second frame is empty
    local empty = backend:recv()
    assert (#empty == 0)

    -- Third frame is READY or else a client reply address
    local client_addr = backend:recv()

    -- If client reply, send rest back to frontend
    if (client_addr ~= "READY") then
        empty = backend:recv()
        assert (#empty == 0)

        local reply = backend:recv()
        frontend:send(client_addr, zmq.SNDMORE)
        frontend:send("", zmq.SNDMORE)
        frontend:send(reply)

        max_requests = max_requests - 1
        if (max_requests == 0) then
            poller:stop() -- Exit after N messages
        end
    end
end)

-- start poller's event loop
poller:start()

frontend:close()
backend:close()
context:term()

for n=1,NBR_CLIENTS do
    assert(clients[n]:join())
end
-- workers are detached, we don't need to join with them.

lruqueue.lua: LRU queue broker

The difficult part of this program is (a) the envelopes that each socket reads and writes, and (b) the LRU algorithm. We'll take these in turn, starting with the message envelope formats.

First, recall that a mama REQ socket always prepends an empty part (the envelope delimiter) on sending and removes this empty part on reception. The reason for this isn't important; it's just part of the 'normal' request-reply pattern. What we care about here is just keeping mama happy by doing precisely what she needs. Second, the router always adds an envelope with the address of whomever the message came from.

We can now walk through a full request-reply chain from client to worker and back. In the code we set the identity of client and worker sockets to make it easier to print the message frames if we want to. Let's assume the client's identity is "CLIENT" and the worker's identity is "WORKER". The client sends a single frame with the message.

Figure 39 - Message that Client Sends

What the queue gets, when reading off the router frontend socket, are three frames consisting of the sender address, empty frame delimiter, and the data part.

Figure 40 - Message Coming in on Frontend

The broker sends this to the worker, prefixed by the address of the worker, taken from the LRU queue, plus an additional empty part to keep the mama at the other end happy.

Figure 41 - Message Sent to Backend

This complex envelope stack gets chewed up first by the backend router socket, which removes the first frame. Then the mama socket in the worker removes the empty part, and provides the rest to the worker.

Figure 42 - Message Delivered to Worker

Which is exactly the same as what the queue received on its frontend router socket. The worker has to save the envelope (which is all the parts up to and including the empty message frame) and then it can do what's needed with the data part.

On the return path the messages are the same as when they come in, i.e. the backend socket gives the queue a message in five parts, and the queue sends the frontend socket a message in three parts, and the client gets a message in one part.
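Using the assumed identities "CLIENT" and "WORKER" from above, we can trace the frames at each hop in the same dump notation as earlier (a sketch, with "HELLO" as the request and "OK" as the reply):

Client sends:             [005] HELLO
Frontend delivers:        [006] CLIENT  [000]  [005] HELLO
Queue sends to backend:   [006] WORKER  [000]  [006] CLIENT  [000]  [005] HELLO
Worker receives:          [006] CLIENT  [000]  [005] HELLO
Backend delivers reply:   [006] WORKER  [000]  [006] CLIENT  [000]  [002] OK
Queue sends to frontend:  [006] CLIENT  [000]  [002] OK
Client receives:          [002] OK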

Now let's look at the LRU algorithm. It requires that both clients and workers use mama sockets, and that workers correctly store and replay the envelope on messages they get. The algorithm is:

  • Create a pollset which polls the backend always, and the frontend only if there are one or more workers available.
  • Poll for activity with infinite timeout.
  • If there is activity on the backend, we either have a "ready" message or a reply for a client. In either case we store the worker address (the first part) on our LRU queue, and if the rest is a client reply we send it back to that client via the frontend.
  • If there is activity on the frontend, we take the client request, pop the next worker (which is the least-recently used), and send the request to the backend. This means sending the worker address, empty part, and then the three parts of the client request.

You should now see that you can reuse and extend the LRU algorithm with variations based on the information the worker provides in its initial "ready" message. For example, workers might start up and do a performance self-test, then tell the broker how fast they are. The broker can then choose the fastest available worker rather than LRU or round-robin.
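As a hedged sketch of that variation (the 'speed' score and the helper names here are illustrative assumptions, not part of the broker above), the only change is how we store and pop ready workers:

-- Queue of ready workers, each with its self-reported speed score
local worker_queue = {}

-- Called when a worker's "ready" message arrives carrying a speed score
local function worker_ready (address, speed)
    worker_queue[#worker_queue + 1] = { address = address, speed = speed }
end

-- Pop the fastest available worker (plain LRU would just pop index 1)
local function pop_fastest ()
    local best = 1
    for n = 2, #worker_queue do
        if worker_queue[n].speed > worker_queue[best].speed then
            best = n
        end
    end
    return table.remove(worker_queue, best).address
end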

A High-Level API for ØMQ

Reading and writing multipart messages using the native ØMQ API is like eating a bowl of hot noodle soup, with fried chicken and extra vegetables, using a toothpick. Look at the core of the worker thread from our LRU queue broker:

while (1) {
    // Read and save all frames until we get an empty frame
    // In this example there is only 1 but it could be more
    char *address = s_recv (worker);
    char *empty = s_recv (worker);
    assert (*empty == 0);
    free (empty);

    // Get request, send reply
    char *request = s_recv (worker);
    printf ("Worker: %s\n", request);
    free (request);

    s_sendmore (worker, address);
    s_sendmore (worker, "");
    s_send (worker, "OK");
    free (address);
}

That code isn't even reusable, because it can only handle one envelope. And this code already does some wrapping around the ØMQ API. If we used the libzmq API directly this is what we'd have to write:

while (1) {
    // Read and save all frames until we get an empty frame
    // In this example there is only 1 but it could be more
    zmq_msg_t address;
    zmq_msg_init (&address);
    zmq_msg_recv (&address, worker, 0);

    zmq_msg_t empty;
    zmq_msg_init (&empty);
    zmq_msg_recv (&empty, worker, 0);

    // Get request, send reply
    zmq_msg_t payload;
    zmq_msg_init (&payload);
    zmq_msg_recv (&payload, worker, 0);

    int char_nbr;
    printf ("Worker: ");
    for (char_nbr = 0; char_nbr < zmq_msg_size (&payload); char_nbr++)
        printf ("%c", ((char *) zmq_msg_data (&payload)) [char_nbr]);
    printf ("\n");

    // Release the received payload before reusing the message for the reply
    zmq_msg_close (&payload);
    zmq_msg_init_size (&payload, 2);
    memcpy (zmq_msg_data (&payload), "OK", 2);

    zmq_msg_send (&address, worker, ZMQ_SNDMORE);
    zmq_msg_close (&address);
    zmq_msg_send (&empty, worker, ZMQ_SNDMORE);
    zmq_msg_close (&empty);
    zmq_msg_send (&payload, worker, 0);
    zmq_msg_close (&payload);
}

What we want is an API that lets us receive and send an entire message in one shot, including all envelopes. One that lets us do what we want with the absolute least lines of code. The ØMQ core API itself doesn't aim to do this, but nothing prevents us making layers on top, and part of learning to use ØMQ intelligently is to do exactly that.

Making a good message API is fairly difficult, especially if we want to avoid copying data around too much. We have a problem of terminology: ØMQ uses "message" to describe both multipart messages, and individual parts of a message. We have a problem of semantics: sometimes it's natural to see message content as printable string data, sometimes as binary blobs.

So one solution is to use three concepts: string (already the basis for s_send and s_recv), frame (a message frame), and message (a list of one or more frames). Here is the worker code, rewritten onto an API using these concepts:

while (1) {
    zmsg_t *zmsg = zmsg_recv (worker);
    zframe_print (zmsg_last (zmsg), "Worker: ");
    zframe_reset (zmsg_last (zmsg), "OK", 2);
    zmsg_send (&zmsg, worker);
}

Replacing over twenty lines of code with four is a good deal, especially since the results are easy to read and understand. We can continue this process for other aspects of working with ØMQ. Let's make a wishlist of things we would like in a higher-level API:

  • Automatic handling of sockets. I find it really annoying to have to close sockets manually, and to have to explicitly define the linger timeout in some but not all cases. It'd be great to have a way to close sockets automatically when I close the context.
  • Portable thread management. Every non-trivial ØMQ application uses threads, but POSIX threads aren't portable. So a decent high-level API should hide this under a portable layer.
  • Portable clocks. Even getting the time to a millisecond resolution, or sleeping for some milliseconds, is not portable. Realistic ØMQ applications need portable clocks, so our API should provide them.
  • A reactor to replace zmq_poll(3). The poll loop is simple but clumsy. Writing a lot of these, we end up doing the same work over and over: calculating timers, and calling code when sockets are ready. A simple reactor with socket readers, and timers, would save a lot of repeated work.
  • Proper handling of Ctrl-C. We already saw how to catch an interrupt. It would be useful if this happened in all applications.

Turning this wishlist into reality gives us CZMQ, a high-level C API for ØMQ. This high-level binding in fact developed out of earlier versions of the Guide. It combines nicer semantics for working with ØMQ with some portability layers, and (importantly for C but less for other languages) containers like hashes and lists.

Here is the LRU queue broker rewritten with a higher-level message class (in this Lua version, the zmsg class plays the role that CZMQ's zmsg plays in C):

--
-- Least-recently used (LRU) queue device
-- Demonstrates use of the msg class
--
-- While this example runs in a single process, that is just to make
-- it easier to start and stop the example. Each thread has its own
-- context and conceptually acts as a separate process.
--
-- Author: Robert G. Jakabosky <bobby@sharedrealm.com>
--

require"zmq"
require"zmq.threads"
require"zmq.poller"
require"zmsg"

local tremove = table.remove

local NBR_CLIENTS = 10
local NBR_WORKERS = 3

local pre_code = [[
local identity, seed = ...
local zmq = require"zmq"
local zmsg = require"zmsg"
require"zhelpers"
math.randomseed(seed)
]]

-- Basic request-reply client using REQ socket
--
local client_task = pre_code .. [[
local context = zmq.init(1)
local client = context:socket(zmq.REQ)
client:setopt(zmq.IDENTITY, identity) -- Set a printable identity
client:connect("ipc://frontend.ipc")

-- Send request, get reply
client:send("HELLO")
local reply = client:recv()
printf ("Client: %s\n", reply)

client:close()
context:term()
]]

-- Worker using REQ socket to do LRU routing
--
local worker_task = pre_code .. [[
local context = zmq.init(1)
local worker = context:socket(zmq.REQ)
worker:setopt(zmq.IDENTITY, identity) -- Set a printable identity
worker:connect("ipc://backend.ipc")

-- Tell broker we're ready for work
worker:send("READY")

while true do
    local msg = zmsg.recv (worker)
    printf ("Worker: %s\n", msg:body())
    msg:body_set("OK")
    msg:send(worker)
end
worker:close()
context:term()
]]

s_version_assert (2, 1)

-- Prepare our context and sockets
local context = zmq.init(1)
local frontend = context:socket(zmq.ROUTER)
local backend = context:socket(zmq.ROUTER)
frontend:bind("ipc://frontend.ipc")
backend:bind("ipc://backend.ipc")

local clients = {}
for n=1,NBR_CLIENTS do
    local identity = string.format("%04X-%04X", randof (0x10000), randof (0x10000))
    local seed = os.time() + math.random()
    clients[n] = zmq.threads.runstring(context, client_task, identity, seed)
    clients[n]:start()
end
local workers = {}
for n=1,NBR_WORKERS do
    local identity = string.format("%04X-%04X", randof (0x10000), randof (0x10000))
    local seed = os.time() + math.random()
    workers[n] = zmq.threads.runstring(context, worker_task, identity, seed)
    workers[n]:start(true)
end

-- Logic of LRU loop
-- - Poll backend always, frontend only if 1+ worker ready
-- - If worker replies, queue worker as ready and forward reply
-- to client if necessary
-- - If client requests, pop next worker and send request to it

-- Queue of available workers
local worker_queue = {}

local is_accepting = false
local max_requests = #clients

local poller = zmq.poller(2)

local function frontend_cb()
    -- Now get next client request, route to next worker
    local msg = zmsg.recv (frontend)

    -- Dequeue a worker from the queue.
    local worker = tremove(worker_queue, 1)

    msg:wrap(worker, "")
    msg:send(backend)

    if (#worker_queue == 0) then
        -- stop accepting work from clients, when no workers are available.
        poller:remove(frontend)
        is_accepting = false
    end
end

poller:add(backend, zmq.POLLIN, function()
    local msg = zmsg.recv(backend)
    -- Use worker address for LRU routing
    worker_queue[#worker_queue + 1] = msg:unwrap()

    -- start accepting client requests, if we are not already doing so.
    if not is_accepting then
        is_accepting = true
        poller:add(frontend, zmq.POLLIN, frontend_cb)
    end

    -- Forward message to client if it's not a READY
    if (msg:address() ~= "READY") then
        msg:send(frontend)

        max_requests = max_requests - 1
        if (max_requests == 0) then
            poller:stop() -- Exit after N messages
        end
    end
end)

-- start poller's event loop
poller:start()

frontend:close()
backend:close()
context:term()

for n=1,NBR_CLIENTS do
    assert(clients[n]:join())
end
-- workers are detached, we don't need to join with them.

lruqueue2.lua: LRU queue broker using CZMQ

One thing CZMQ provides is clean interrupt handling. This means that Ctrl-C will cause any blocking ØMQ call to exit with a return code -1 and errno set to EINTR. The CZMQ message recv methods will return NULL in such cases. So, you can cleanly exit a loop like this:

while (1) {
    zstr_send (client, "HELLO");
    char *reply = zstr_recv (client);
    if (!reply)
        break; // Interrupted
    printf ("Client: %s\n", reply);
    free (reply);
    sleep (1);
}

Or, if you're doing zmq_poll, test on the return code:

int rc = zmq_poll (items, zlist_size (workers)? 2: 1, -1);
if (rc == -1)
    break; // Interrupted

The previous example still uses zmq_poll(3). So how about reactors? The CZMQ zloop reactor is simple but functional. It lets you:

  • Set a reader on any socket, i.e. code that is called whenever the socket has input.
  • Cancel a reader on a socket.
  • Set a timer that goes off once or multiple times at specific intervals.

zloop of course uses zmq_poll(3) internally. It rebuilds its poll set each time you add or remove readers, and it calculates the poll timeout to match the next timer. Then, it calls the reader and timer handlers for each socket and timer that needs attention.
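As a rough sketch of that bookkeeping (this is not the real zloop code; the timers table and the s_clock millisecond helper are illustrative assumptions), each cycle boils down to:

local timers = {}   -- each entry: { at = deadline_in_ms, handler = fn }

-- Compute the poll timeout that matches the next timer
local function next_timeout ()
    local timeout = -1                -- -1 means block forever
    local now = s_clock ()            -- assumed millisecond clock helper
    for _, timer in ipairs(timers) do
        local delta = math.max(timer.at - now, 0)
        if timeout < 0 or delta < timeout then
            timeout = delta
        end
    end
    return timeout
end

-- One reactor cycle: poll the readers, then fire any expired timers
local function reactor_cycle (poller)
    local timeout = next_timeout()
    if timeout >= 0 then
        timeout = timeout * 1000      -- lua-zmq timeouts are in microseconds
    end
    poller:poll(timeout)
    local now = s_clock ()
    for n = #timers, 1, -1 do
        if timers[n].at <= now then
            local timer = table.remove(timers, n)
            timer.handler()
        end
    end
end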

When we use a reactor pattern, our code turns inside out. The main logic looks like this:

zloop_t *reactor = zloop_new ();
zmq_pollitem_t poller = { self->backend, 0, ZMQ_POLLIN };
zloop_poller (reactor, &poller, s_handle_backend, self);
zloop_start (reactor);
zloop_destroy (&reactor);

The actual handling of messages sits inside dedicated functions or methods. You may not like the style; it's a matter of taste. What it does help with is mixing timers and socket activity. In the rest of this text we'll use zmq_poll(3) in simpler cases, and zloop in more complex examples.

Here is the LRU queue broker rewritten once again, this time to use zloop:

//
// Least-recently used (LRU) queue device
// Demonstrates use of the CZMQ API and reactor style
//
// The client and worker tasks are identical to those in the previous example.

#include "czmq.h"

#define NBR_CLIENTS 10
#define NBR_WORKERS 3
#define LRU_READY "\001" // Signals worker is ready

// Basic request-reply client using REQ socket
//

static void *
client_task (void *args)
{
    zctx_t *ctx = zctx_new ();
    void *client = zsocket_new (ctx, ZMQ_REQ);
    zsocket_connect (client, "ipc://frontend.ipc");

    // Send request, get reply
    while (true) {
        zstr_send (client, "HELLO");
        char *reply = zstr_recv (client);
        if (!reply)
            break;
        printf ("Client: %s\n", reply);
        free (reply);
        sleep (1);
    }
    zctx_destroy (&ctx);
    return NULL;
}

// Worker using REQ socket to do LRU routing
//

static void *
worker_task (void *args)
{
    zctx_t *ctx = zctx_new ();
    void *worker = zsocket_new (ctx, ZMQ_REQ);
    zsocket_connect (worker, "ipc://backend.ipc");

    // Tell broker we're ready for work
    zframe_t *frame = zframe_new (LRU_READY, 1);
    zframe_send (&frame, worker, 0);

    // Process messages as they arrive
    while (true) {
        zmsg_t *msg = zmsg_recv (worker);
        if (!msg)
            break; // Interrupted
        //zframe_print (zmsg_last (msg), "Worker: ");
        zframe_reset (zmsg_last (msg), "OK", 2);
        zmsg_send (&msg, worker);
    }
    zctx_destroy (&ctx);
    return NULL;
}

// Our LRU queue structure, passed to reactor handlers
typedef struct {
    void *frontend;     // Listen to clients
    void *backend;      // Listen to workers
    zlist_t *workers;   // List of ready workers
} lruqueue_t;

// In the reactor design, each time a message arrives on a socket, the
// reactor passes it to a handler function. We have two handlers; one
// for the frontend, one for the backend:

// Handle input from client, on frontend
int s_handle_frontend (zloop_t *loop, zmq_pollitem_t *poller, void *arg)
{
    lruqueue_t *self = (lruqueue_t *) arg;
    zmsg_t *msg = zmsg_recv (self->frontend);
    if (msg) {
        zmsg_wrap (msg, (zframe_t *) zlist_pop (self->workers));
        zmsg_send (&msg, self->backend);

        // Cancel reader on frontend if we went from 1 to 0 workers
        if (zlist_size (self->workers) == 0) {
            zmq_pollitem_t poller = { self->frontend, 0, ZMQ_POLLIN };
            zloop_poller_end (loop, &poller);
        }
    }
    return 0;
}

// Handle input from worker, on backend
int s_handle_backend (zloop_t *loop, zmq_pollitem_t *poller, void *arg)
{
    // Use worker address for LRU routing
    lruqueue_t *self = (lruqueue_t *) arg;
    zmsg_t *msg = zmsg_recv (self->backend);
    if (msg) {
        zframe_t *address = zmsg_unwrap (msg);
        zlist_append (self->workers, address);

        // Enable reader on frontend if we went from 0 to 1 workers
        if (zlist_size (self->workers) == 1) {
            zmq_pollitem_t poller = { self->frontend, 0, ZMQ_POLLIN };
            zloop_poller (loop, &poller, s_handle_frontend, self);
        }
        // Forward message to client if it's not a READY
        zframe_t *frame = zmsg_first (msg);
        if (memcmp (zframe_data (frame), LRU_READY, 1) == 0)
            zmsg_destroy (&msg);
        else
            zmsg_send (&msg, self->frontend);
    }
    return 0;
}

// And the main task now sets up child tasks, then starts its reactor.
// If you press Ctrl-C, the reactor exits and the main task shuts down.
// Since the reactor is a CZMQ class, this example may not translate
// into all languages equally well.

int main (void)
{
    zctx_t *ctx = zctx_new ();
    lruqueue_t *self = (lruqueue_t *) zmalloc (sizeof (lruqueue_t));
    self->frontend = zsocket_new (ctx, ZMQ_ROUTER);
    self->backend = zsocket_new (ctx, ZMQ_ROUTER);
    zsocket_bind (self->frontend, "ipc://frontend.ipc");
    zsocket_bind (self->backend, "ipc://backend.ipc");

    int client_nbr;
    for (client_nbr = 0; client_nbr < NBR_CLIENTS; client_nbr++)
        zthread_new (client_task, NULL);
    int worker_nbr;
    for (worker_nbr = 0; worker_nbr < NBR_WORKERS; worker_nbr++)
        zthread_new (worker_task, NULL);

    // Queue of available workers
    self->workers = zlist_new ();

    // Prepare reactor and fire it up
    zloop_t *reactor = zloop_new ();
    zmq_pollitem_t poller = { self->backend, 0, ZMQ_POLLIN };
    zloop_poller (reactor, &poller, s_handle_backend, self);
    zloop_start (reactor);
    zloop_destroy (&reactor);

    // When we're done, clean up properly
    while (zlist_size (self->workers)) {
        zframe_t *frame = (zframe_t *) zlist_pop (self->workers);
        zframe_destroy (&frame);
    }
    zlist_destroy (&self->workers);
    zctx_destroy (&ctx);
    free (self);
    return 0;
}

lruqueue3.c: LRU queue broker using zloop

Getting applications to properly shut down when you send them Ctrl-C can be tricky. If you use the zctx class it'll automatically set up signal handling, but your code still has to cooperate. You must break any loop if zmq_poll returns -1 or if any of the recv methods (zstr_recv, zframe_recv, zmsg_recv) return NULL. If you have nested loops, it can be useful to make the outer ones conditional on !zctx_interrupted.

Asynchronous Client-Server

In the router-to-dealer example we saw a 1-to-N use case where one client talks asynchronously to multiple workers. We can turn this upside-down to get a very useful N-to-1 architecture where various clients talk to a single server, and do this asynchronously.

Figure 43 - Asynchronous Client-Server

Here's how it works:

  • Clients connect to the server and send requests.
  • For each request, the server sends 0 to N replies.
  • Clients can send multiple requests without waiting for a reply.
  • Servers can send multiple replies without waiting for new requests.

Here's code that shows how this works:

--
-- Asynchronous client-to-server (DEALER to ROUTER)
--
-- While this example runs in a single process, that is just to make
-- it easier to start and stop the example. Each task has its own
-- context and conceptually acts as a separate process.
--
-- Author: Robert G. Jakabosky <bobby@sharedrealm.com>
--

require"zmq"
require"zmq.threads"
require"zmsg"
require"zhelpers"

local NBR_CLIENTS = 3

-- ---------------------------------------------------------------------
-- This is our client task
-- It connects to the server, and then sends a request once per second
-- It collects responses as they arrive, and it prints them out. We will
-- run several client tasks in parallel, each with a different random ID.

local client_task = [[
local identity, seed = ...
local zmq = require"zmq"
require"zmq.poller"
require"zmq.threads"
local zmsg = require"zmsg"
require"zhelpers"
math.randomseed(seed)

local context = zmq.init(1)
local client = context:socket(zmq.DEALER)

-- Generate printable identity for the client
client:setopt(zmq.IDENTITY, identity)
client:connect("tcp://localhost:5570")

local poller = zmq.poller(2)

poller:add(client, zmq.POLLIN, function()
    local msg = zmsg.recv (client)
    printf ("%s: %s\n", identity, msg:body())
end)
local request_nbr = 0
while true do
    -- Tick once per second, pulling in arriving messages
    for centitick=1,100 do
        poller:poll(10000)
    end
    local msg = zmsg.new()
    request_nbr = request_nbr + 1
    msg:body_fmt("request #%d", request_nbr)
    msg:send(client)
end
-- Clean up and end task properly
client:close()
context:term()
]]

-- ---------------------------------------------------------------------
-- This is our server task
-- It uses the multithreaded server model to deal requests out to a pool
-- of workers and route replies back to clients. One worker can handle
-- one request at a time but one client can talk to multiple workers at
-- once.

local server_task = [[
local server_worker = ...
local zmq = require"zmq"
require"zmq.poller"
require"zmq.threads"
local zmsg = require"zmsg"
require"zhelpers"
math.randomseed(os.time())

local context = zmq.init(1)

-- Frontend socket talks to clients over TCP
local frontend = context:socket(zmq.ROUTER)
frontend:bind("tcp://*:5570")

-- Backend socket talks to workers over inproc
local backend = context:socket(zmq.DEALER)
backend:bind("inproc://backend")

-- Launch pool of worker threads, precise number is not critical
local workers = {}
for n=1,5 do
    local seed = os.time() + math.random()
    workers[n] = zmq.threads.runstring(context, server_worker, seed)
    workers[n]:start()
end
-- Connect backend to frontend via a queue device
-- We could do this:
-- zmq.device(zmq.QUEUE, frontend, backend)
-- But doing it ourselves means we can debug this more easily

local poller = zmq.poller(2)

poller:add(frontend, zmq.POLLIN, function()
    local msg = zmsg.recv (frontend)
    --print ("Request from client:")
    --msg:dump()
    msg:send(backend)
end)
poller:add(backend, zmq.POLLIN, function()
    local msg = zmsg.recv (backend)
    --print ("Reply from worker:")
    --msg:dump()
    msg:send(frontend)
end)
-- Switch messages between frontend and backend
poller:start()

for n=1,5 do
    assert(workers[n]:join())
end
frontend:close()
backend:close()
context:term()
]]

-- Accept a request and reply with the same text a random number of
-- times, with random delays between replies.
--
local server_worker = [[
local seed = ...
local zmq = require"zmq"
require"zmq.threads"
local zmsg = require"zmsg"
require"zhelpers"
math.randomseed(seed)
local threads = require"zmq.threads"
local context = threads.get_parent_ctx()

local worker = context:socket(zmq.DEALER)
worker:connect("inproc://backend")

while true do
-- The DEALER socket gives us the address envelope and message
local msg = zmsg.recv (worker)
assert (msg:parts() == 2)

-- Send 0..4 replies back
local replies = randof (5)
for reply=1,replies do
-- Sleep for some fraction of a second
s_sleep (randof (1000) + 1)
local dup = msg:dup()
dup:send(worker)
end
end
worker:close()
]]

-- This main thread simply starts several clients, and a server, and then
-- waits for the server to finish.
--

s_version_assert (2, 1)

local clients = {}
for n=1,NBR_CLIENTS do
local identity = string.format("%04X", randof (0x10000))
local seed = os.time() + math.random()
clients[n] = zmq.threads.runstring(nil, client_task, identity, seed)
clients[n]:start()
end

local server = zmq.threads.runstring(nil, server_task, server_worker)
assert(server:start())
assert(server:join())

asyncsrv.lua: Asynchronous client-server

Just run that example by itself. Like other multi-task examples, it runs in a single process but each task has its own context and conceptually acts as a separate process. You will see three clients (each with a random ID), printing out the replies they get from the server. Look carefully and you'll see each client task gets 0 or more replies per request.

Some comments on this code:

  • The clients send a request once per second, and get zero or more replies back. To make this work using zmq_poll(3), we can't simply poll with a 1-second timeout, or we'd end up sending a new request only one second after we received the last reply. So we poll at a high frequency (100 times at 1/100th of a second per poll), which is approximately accurate. This means the server could use requests as a form of heartbeat, i.e. detecting when clients are present or disconnected.
  • The server uses a pool of worker threads, each processing one request synchronously. It connects these to its frontend socket using an internal queue. To help debug this, the code implements its own queue device logic. You can uncomment the msg:dump() calls to get debugging output.

Figure 44 - Detail of Asynchronous Server

fig44.png

Note that we're doing a DEALER-to-ROUTER dialog between client and server, but internally, between the server main thread and workers, we're doing DEALER-to-DEALER. If the workers were strictly synchronous, we'd use REP. But because we want to send multiple replies, we need an async socket. We don't want to route replies: they always go to the single server thread that sent us the request.

Let's think about the routing envelope. The client sends a simple message. The server thread receives a two-part message (real message prefixed by client identity). We have two possible designs for the server-to-worker interface:

  • Workers get unaddressed messages, and we manage the connections from server thread to worker threads explicitly, using a ROUTER socket as backend. This would require that workers start by telling the server they exist, after which the server can route requests to workers and track which client is 'connected' to which worker. This is the LRU pattern we already covered.
  • Workers get addressed messages, and they return addressed replies. This requires that workers can properly decode and recode envelopes but it doesn't need any other mechanisms.

The second design is much simpler, so that's what we use:

     client          server       frontend       worker
   [ DEALER ]<---->[ ROUTER <----> DEALER <----> DEALER ]
             1 part         2 parts       2 parts

When you build servers that maintain stateful conversations with clients, you will run into a classic problem. If the server keeps some state per client, and clients keep coming and going, eventually it will run out of resources. Even if the same clients keep connecting, if you're using default identities, each connection will look like a new one.

We cheat in the above example by keeping state only for a very short time (the time it takes a worker to process a request) and then throwing away the state. But that's not practical for many cases.

To properly manage client state in a stateful asynchronous server you must:

  • Do heartbeating from client to server. In our example we send a request once per second, which can reliably be used as a heartbeat.
  • Store state using the client identity (whether generated or explicit) as key.
  • Detect a stopped heartbeat. If there's no request from a client within, say, two seconds, the server can detect this and destroy any state it's holding for that client.
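
To make that concrete, here is a minimal sketch of such state management in plain Lua. This is hypothetical code, not part of the example above: handle_request() stands in for whatever work the server actually does, and os.time() (one-second resolution) is accurate enough for a two-second timeout.

-- Hypothetical sketch: per-client state with heartbeat expiry.
-- Uses the client identity as key and treats every request as a
-- heartbeat. handle_request() is assumed, not defined here.
local HEARTBEAT_TTL = 2          -- seconds without a request
local clients = {}               -- identity -> { last_seen, state }

local function on_request(identity, msg)
    -- Any request doubles as a heartbeat for that client
    local entry = clients[identity] or { state = {} }
    entry.last_seen = os.time()
    clients[identity] = entry
    handle_request(entry.state, msg)
end

local function purge_idle_clients()
    -- Call this once per second from the main poll loop
    local now = os.time()
    for identity, entry in pairs(clients) do
        if now - entry.last_seen > HEARTBEAT_TTL then
            clients[identity] = nil   -- destroy state for silent clients
        end
    end
end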

Worked Example: Inter-Broker Routing

Let's take everything we've seen so far, and scale things up. Our best client calls us urgently and asks for a design of a large cloud computing facility. He has this vision of a cloud that spans many data centers, each a cluster of clients and workers, and that works together as a whole.

Because we're smart enough to know that practice always beats theory, we propose to make a working simulation using ØMQ. Our client, eager to lock down the budget before his own boss changes his mind, and having read great things about ØMQ on Twitter, agrees.

Establishing the Details

Several espressos later, we want to jump into writing code but a little voice tells us to get more details before making a sensational solution to entirely the wrong problem. "What kind of work is the cloud doing?", we ask. The client explains:

  • Workers run on various kinds of hardware, but they are all able to handle any task. There are several hundred workers per cluster, and as many as a dozen clusters in total.
  • Clients create tasks for workers. Each task is an independent unit of work and all the client wants is to find an available worker, and send it the task, as soon as possible. There will be a lot of clients and they'll come and go arbitrarily.
  • The real difficulty is to be able to add and remove clusters at any time. A cluster can leave or join the cloud instantly, bringing all its workers and clients with it.
  • If there are no workers in their own cluster, clients' tasks will go off to other available workers in the cloud.
  • Clients send out one task at a time, waiting for a reply. If they don't get an answer within X seconds they'll just send out the task again. This ain't our concern, the client API does it already.
  • Workers process one task at a time, they are very simple beasts. If they crash, they get restarted by whatever script started them.

So we double check to make sure that we understood this correctly:

  • "There will be some kind of super-duper network interconnect between clusters, right?", we ask. The client says, "Yes, of course, we're not idiots."
  • "What kind of volumes are we talking about?", we ask. The client replies, "Up to a thousand clients per cluster, each doing max. ten requests per second. Requests are small, and replies are also small, no more than 1K bytes each."

So we do a little calculation and see that this will work nicely over plain TCP. 2,500 clients x 10/second x 1,000 bytes x 2 directions = 50MB/sec or 400Mb/sec, not a problem for a 1Gb network.

It's a straightforward problem that requires no exotic hardware or protocols, just some clever routing algorithms and careful design. We start by designing one cluster (one data center) and then we figure out how to connect clusters together.

Architecture of a Single Cluster

Workers and clients are synchronous. We want to use the LRU pattern to route tasks to workers. Workers are all identical, our facility has no notion of different services. Workers are anonymous, clients never address them directly. We make no attempt here to provide guaranteed delivery, retry, etc.

For reasons we already looked at, clients and workers won't speak to each other directly; that would make it impossible to add or remove nodes dynamically. So our basic model consists of the request-reply message broker we saw earlier.

Figure 45 - Cluster Architecture

fig45.png

Scaling to Multiple Clusters

Now we scale this out to more than one cluster. Each cluster has a set of clients and workers, and a broker that joins these together:

Figure 46 - Multiple Clusters

fig46.png

The question is: how do we get the clients of each cluster talking to the workers of the other cluster? There are a few possibilities, each with pros and cons:

  • Clients could connect directly to both brokers. The advantage is that we don't need to modify brokers or workers. But clients get more complex and become aware of the overall topology. If we want to add, e.g., a third or fourth cluster, all the clients are affected. In effect we'd have to move routing and failover logic into the clients, and that's not nice.
  • Workers might connect directly to both brokers. But REQ workers can't do that; they can only reply to one broker. We might use REP workers, but REP doesn't give us customizable broker-to-worker routing like LRU, only its built-in load balancing. That's a fail; if we want to distribute work to idle workers, we precisely need LRU. One solution would be to use ROUTER sockets for the worker nodes. Let's label this "Idea #1".
  • Brokers could connect to each other. This looks neatest because it creates the fewest additional connections. We can't add clusters on the fly but that is probably out of scope. Now clients and workers remain ignorant of the real network topology, and brokers tell each other when they have spare capacity. Let's label this "Idea #2".

Let's explore Idea #1. In this model we have workers connecting to both brokers and accepting jobs from either.

Figure 47 - Idea 1 - Cross-connected Workers

fig47.png

It looks feasible. However, it doesn't provide what we wanted, which was that clients get local workers if possible, and remote workers only if that's better than waiting. Also, workers will signal "ready" to both brokers and can get two jobs at once, while other workers remain idle. This design fails because, again, we're putting routing logic at the edges.

So, idea #2 then. We interconnect the brokers and don't touch the clients or workers, which are the ordinary REQ sockets we're used to.

Figure 48 - Idea 2 - Brokers Talking to Each Other

fig48.png

This design is appealing because the problem is solved in one place, invisible to the rest of the world. Basically, brokers open secret channels to each other and whisper, like camel traders, "Hey, I've got some spare capacity, if you have too many clients give me a shout and we'll deal".

It is in effect just a more sophisticated routing algorithm: brokers become subcontractors for each other. Other things to like about this design, even before we play with real code:

  • It treats the common case (clients and workers on the same cluster) as default and does extra work for the exceptional case (shuffling jobs between clusters).
  • It lets us use different message flows for the different types of work. That means we can handle them differently, e.g. using different types of network connection.
  • It feels like it would scale smoothly. Interconnecting three or more brokers doesn't get over-complex. If we find this to be a problem, it's easy to solve by adding a super-broker.

We'll now make a worked example. We'll pack an entire cluster into one process. That is obviously not realistic, but it makes it simple to simulate, and the simulation can accurately scale to real processes. This is the beauty of ØMQ: you can design at the micro-level and scale that up to the macro-level. Threads become processes, processes become boxes, and the patterns and logic remain the same. Each of our 'cluster' processes contains client threads, worker threads, and a broker thread.

We know the basic model well by now:

  • The client (REQ) threads create workloads and pass them to the broker (ROUTER).
  • The worker (REQ) threads process workloads and return the results to the broker (ROUTER).
  • The broker queues and distributes workloads using the LRU routing model.

Federation vs. Peering

There are several possible ways to interconnect brokers. What we want is to be able to tell other brokers, "we have capacity", and then receive multiple tasks. We also need to be able to tell other brokers "stop, we're full". It doesn't need to be perfect: sometimes we may accept jobs we can't process immediately, then we'll do them as soon as possible.

The simplest interconnect is federation in which brokers simulate clients and workers for each other. We would do this by connecting our frontend to the other broker's backend socket. Note that it is legal to both bind a socket to an endpoint and connect it to other endpoints.

Figure 49 - Cross-connected Brokers in Federation Model

fig49.png

This would give us simple logic in both brokers and a reasonably good mechanism: when there are no clients, tell the other broker 'ready', and accept one job from it. The problem is that it's too simple for this job. A federated broker can handle only one task at a time. If the broker emulates a lock-step client and worker, it is by definition going to be lock-step, and if it has lots of available workers they won't be used. Our brokers need to be connected in a fully asynchronous fashion.

The federation model is perfect for other kinds of routing, especially service-oriented architectures (SOAs), which route by service name and proximity rather than by LRU, load balancing, or random scatter. So don't dismiss it as useless; it's just not right for least-recently-used routing and cluster load balancing.

So instead of federation, let's look at a peering approach in which brokers are explicitly aware of each other and talk over privileged channels. Let's break this down, assuming we want to interconnect N brokers. Each broker has (N - 1) peers, and all brokers are using exactly the same code and logic. There are two distinct flows of information between brokers:

  • Each broker needs to tell its peers how many workers it has available at any time. This can be fairly simple information, just a quantity that is updated regularly. The obvious (and correct) socket pattern for this is publish-subscribe. So every broker opens a PUB socket and publishes state information on that, and every broker also opens a SUB socket and connects that to the PUB socket of every other broker, to get state information from its peers.
  • Each broker needs a way to delegate tasks to a peer and get replies back, asynchronously. We'll do this using ROUTER-to-ROUTER sockets (see the sketch after this list); no other combination works. Each broker has two such sockets: one for tasks it receives, and one for tasks it delegates. If we didn't use two sockets, it would be more work to know whether we were reading a request or a reply each time. That would mean adding more information to the message envelope.

And there is also the flow of information between a broker and its local clients and workers.
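
Since ROUTER-to-ROUTER is a combination we haven't used before, here is a minimal sketch of the mechanics, written against the same lua-zmq API as the other examples. The endpoint name is made up for the demo; setting identities and then addressing a peer by prefixing its identity is exactly what the peering brokers below do with msg:wrap().

-- Hypothetical sketch: two ROUTER sockets talking to each other.
require"zmq"
require"zhelpers"
local context = zmq.init(1)

local broker_a = context:socket(zmq.ROUTER)
broker_a:setopt(zmq.IDENTITY, "A")
broker_a:bind("ipc://router-demo.ipc")

local broker_b = context:socket(zmq.ROUTER)
broker_b:setopt(zmq.IDENTITY, "B")
broker_b:connect("ipc://router-demo.ipc")
s_sleep (100)  -- give the connection time to complete

-- B routes to A by using A's identity as the address frame
broker_b:send("A", zmq.SNDMORE)
broker_b:send("job-1")

-- A receives the message prefixed with B's identity, so it can reply
local from = broker_a:recv()   -- "B"
local body = broker_a:recv()   -- "job-1"
printf ("%s sent us: %s\n", from, body)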

The Naming Ceremony

Three flows x two sockets for each flow = six sockets that we have to manage in the broker. Choosing good names is vital to keeping a multi-socket juggling act reasonably coherent in our minds. Sockets do something and what they do should form the basis for their names. It's about being able to read the code several weeks later on a cold Monday morning before coffee, and not feeling pain.

Let's do a shamanistic naming ceremony for the sockets. The three flows are:

  • A local request-reply flow between the broker and its clients and workers.
  • A cloud request-reply flow between the broker and its peer brokers.
  • A state flow between the broker and its peer brokers.

Finding meaningful names that are all the same length means our code will align beautifully. It may seem irrelevant, but such attention to detail turns ordinary code into something more like art.

For each flow the broker has two sockets that we can orthogonally call the "frontend" and "backend". We've used these names quite often. A frontend receives information or tasks. A backend sends those out to other peers. The conceptual flow is from front to back (with replies going in the opposite direction from back to front).

So all the code we write for this tutorial will use these socket names:

  • localfe and localbe for the local flow.
  • cloudfe and cloudbe for the cloud flow.
  • statefe and statebe for the state flow.

For our transport we'll use ipc for everything. This has the advantage of working like tcp in terms of connectivity (i.e. it's a disconnected transport, unlike inproc), yet we don't need IP addresses or DNS names, which would be a pain here. Instead, we will use ipc endpoints called something-local, something-cloud, and something-state, where something is the name of our simulated cluster.

You may be thinking that this is a lot of work for some names. Why not call them s1, s2, s3, s4, etc.? The answer is that if your brain is not a perfect machine, you need a lot of help when reading code, and we'll see that these names do help. It is a lot easier to remember "three flows, two directions" than "six different sockets".

Figure 50 - Broker Socket Arrangement

fig50.png

Note that we connect the cloudbe in each broker to the cloudfe in every other broker, and likewise we connect the statebe in each broker to the statefe in every other broker.

Prototyping the State Flow

Since each socket flow has its own little traps for the unwary, we will test them in real code one by one, rather than try to throw the whole lot into code in one go. When we're happy with each flow, we can put them together into a full program. We'll start with the state flow.

Figure 51 - The State Flow

fig51.png

Here is how this works in code:

--
-- Broker peering simulation (part 1)
-- Prototypes the state flow
--
-- Author: Robert G. Jakabosky <bobby@sharedrealm.com>
--
require"zmq"
require"zmq.poller"
require"zmsg"

-- First argument is this broker's name
-- Other arguments are our peers' names
--
if (#arg < 1) then
printf ("syntax: peering1 me doyouend…\n")
os.exit(-1)
end
local self = arg[1]
printf ("I: preparing broker at %s…\n", self)
math.randomseed(os.time())

-- Prepare our context and sockets
local context = zmq.init(1)

-- Bind statebe to endpoint
local statebe = context:socket(zmq.PUB)
local endpoint = string.format("ipc://%s-state.ipc", self)
assert(statebe:bind(endpoint))

-- Connect statefe to all peers
local statefe = context:socket(zmq.SUB)
statefe:setopt(zmq.SUBSCRIBE, "", 0)

for n=2,#arg do
local peer = arg[n]
printf ("I: connecting to state backend at '%s'\n", peer)
local endpoint = string.format("ipc://%s-state.ipc", peer)
assert(statefe:connect(endpoint))
end

local poller = zmq.poller(1)
-- Send out status messages to peers, and collect from peers
-- The zmq_poll timeout defines our own heartbeating
--
poller:add(statefe, zmq.POLLIN, function()
local msg = zmsg.recv (statefe)
printf ("%s - %s workers free\n",
msg:address(), msg:body())
end)
while true do
-- Poll for activity, or 1 second timeout
local count = assert(poller:poll(1000000))

-- If there was no activity, it's time to send our state
if count == 0 then
-- Send random value for worker availability
local msg = zmsg.new()
msg:body_fmt("%d", randof (10))
-- We stick our own address onto the envelope
msg:wrap(self, nil)
msg:send(statebe)
end
end
-- We never get here but clean up anyhow
statebe:close()
statefe:close()
context:term()

peering1.lua: Prototype state flow

Notes about this code:

  • Each broker has an identity that we use to construct ipc endpoint names. A real broker would need to work with TCP and a more sophisticated configuration scheme. We'll look at such schemes later in this book but for now, using generated ipc names lets us ignore the problem of where to get TCP/IP addresses or names from.
  • We use a zmq_poll(3) loop as the core of the program. This processes incoming messages and sends out state messages. We send a state message only if we did not get any incoming messages and we waited for a second. If we send out a state message each time we get one in, we'll get message storms.
  • We use a two-part pubsub message consisting of sender address and data. Note that we will need to know the address of the publisher in order to send it tasks, and the only way is to send this explicitly as a part of the message.
  • We don't set identities on subscribers, because if we did then we'd get out of date state information when connecting to running brokers.
  • We don't set a HWM on the publisher, but if we were using ØMQ/2.x that would be a wise idea.

We can build this little program and run it three times to simulate three clusters. Let's call them DC1, DC2, and DC3 (the names are arbitrary). We run these three commands, each in a separate window:

peering1 DC1 DC2 DC3  #  Start DC1 and connect to DC2 and DC3
peering1 DC2 DC1 DC3  #  Start DC2 and connect to DC1 and DC3
peering1 DC3 DC1 DC2  #  Start DC3 and connect to DC1 and DC2

You'll see each cluster report the state of its peers, and after a few seconds they will all happily be printing random numbers once per second. Try this and satisfy yourself that the three brokers all match up and synchronize to per-second state updates.

In real life we'd not send out state messages at regular intervals but rather whenever we had a state change, i.e. whenever a worker becomes available or unavailable. That may seem like a lot of traffic but state messages are small and we've established that the inter-cluster connections are super-fast.

If we wanted to send state messages at precise intervals we'd create a child thread and open the statebe socket in that thread. We'd then send irregular state updates to that child thread from our main thread, and allow the child thread to conflate them into regular outgoing messages. This is more work than we need here.
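
For completeness, here is a hypothetical sketch of that design, reusing the zmq.threads pattern from the asynchronous server example. The inproc endpoint name is invented, and a real version would also need clean shutdown handling.

-- Hypothetical sketch: a child thread that conflates irregular state
-- updates into at most one outgoing message per second.
local conflater = [[
    local self = ...
    local zmq = require"zmq"
    require"zmq.poller"
    require"zhelpers"
    local threads = require"zmq.threads"
    local context = threads.get_parent_ctx()

    -- PAIR socket brings updates in from the main thread
    local updates = context:socket(zmq.PAIR)
    updates:connect("inproc://state-updates")

    local statebe = context:socket(zmq.PUB)
    statebe:bind(string.format("ipc://%s-state.ipc", self))

    local latest
    local poller = zmq.poller(1)
    poller:add(updates, zmq.POLLIN, function()
        latest = updates:recv()    -- keep only the newest value
    end)
    while true do
        -- Drain pending updates, then publish the latest state once
        while poller:poll(0) > 0 do end
        if latest then
            statebe:send(latest)
        end
        s_sleep (1000)
    end
]]

-- In the main thread: bind the PAIR socket, start the child, and
-- send it a message whenever local capacity changes.
local updates = context:socket(zmq.PAIR)
updates:bind("inproc://state-updates")
local child = zmq.threads.runstring(context, conflater, self)
child:start()
-- ... later, on each change:
-- updates:send(tostring(local_capacity))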

Prototyping the Local and Cloud Flows

Let's now prototype the flow of tasks over the local and cloud sockets. This code pulls requests from clients and then distributes them to local workers and cloud peers on a random basis.

Figure 52 - The Flow of Tasks

fig52.png

Before we jump into the code, which is getting a little complex, let's sketch the core routing logic and break it down into a simple but robust design.

We need two queues, one for requests from local clients and one for requests from cloud clients. One option would be to pull messages off the local and cloud frontends, and pump these onto their respective queues. But this is kind of pointless because ØMQ sockets are queues already. So let's use the ØMQ socket buffers as queues.

This was the technique we used in the LRU queue broker, and it worked nicely. We only read from the two frontends when there is somewhere to send the requests. We can always read from the backends, since they give us replies to route back. As long as the backends aren't talking to us, there's no point in even looking at the frontends.

So our main loop becomes:

  • Poll the backends for activity. When we get a message, it may be "READY" from a worker or it may be a reply. If it's a reply, route back via the local or cloud frontend.
  • If a worker replied, it became available, so we queue it and count it.
  • While there are workers available, take a request, if any, from either frontend and route to a local worker, or randomly, a cloud peer.

Randomly sending tasks to a peer broker rather than a worker simulates work distribution across the cluster. It's dumb but that is fine for this stage.

We use broker identities to route messages between brokers. Each broker has a name, which we provide on the command line in this simple prototype. As long as these names don't overlap with the ØMQ-generated UUIDs used for client nodes, we can figure out whether to route a reply back to a client or to a broker.

Here is how this works in code. The interesting part starts around the comment "Interesting part".

--
-- Broker peering simulation (part 2)
-- Prototypes the request-reply flow
--
-- While this example runs in a single process, that is just to make
-- it easier to start and stop the example. Each thread has its own
-- context and conceptually acts as a separate process.
--
-- Author: Robert G. Jakabosky <bobby@sharedrealm.com>
--

require"zmq"
require"zmq.poller"
require"zmq.threads"
require"zmsg"

local tremove = table.remove

local NBR_CLIENTS = 10
local NBR_WORKERS = 3

local pre_code = [[
local self, seed = ...
local zmq = require"zmq"
local zmsg = require"zmsg"
require"zhelpers"
math.randomseed(seed)
local context = zmq.init(1)

]]

-- Request-reply client using REQ socket
--
local client_task = pre_code .. [[
local client = context:socket(zmq.REQ)
local endpoint = string.format("ipc://%s-localfe.ipc", self)
assert(client:connect(endpoint))

while true do
-- Send request, get reply
local msg = zmsg.new ("HELLO")
msg:send(client)
msg = zmsg.recv (client)
printf ("I: client status: %s\n", msg:body())
end
-- We never get here but if we did, this is how we'd exit cleanly
client:close()
context:term()
]]

-- Worker using REQ socket to do LRU routing
--
local worker_task = pre_code .. [[
local worker = context:socket(zmq.REQ)
local endpoint = string.format("ipc://%s-localbe.ipc", self)
assert(worker:connect(endpoint))

-- Tell broker we're ready for work
local msg = zmsg.new ("READY")
msg:send(worker)

while true do
msg = zmsg.recv (worker)
-- Do some 'work'
s_sleep (1000)
msg:body_fmt("OK - %04x", randof (0x10000))
msg:send(worker)
end
-- We never get here but if we did, this is how we'd exit cleanly
worker:close()
context:term()
]]

-- First argument is this broker's name
-- Other arguments are our peers' names
--
s_version_assert (2, 1)
if (#arg < 1) then
printf ("syntax: peering2 me doyouend…\n")
os.exit(-1)
end
-- Our own name; in practice this'd be configured per node
local self = arg[1]
printf ("I: preparing broker at %s…\n", self)
math.randomseed(os.time())

-- Prepare our context and sockets
local context = zmq.init(1)

-- Bind cloud frontend to endpoint
local cloudfe = context:socket(zmq.ROUTER)
local endpoint = string.format("ipc://%s-cloud.ipc", self)
cloudfe:setopt(zmq.IDENTITY, self)
assert(cloudfe:bind(endpoint))

-- Connect cloud backend to all peers
local cloudbe = context:socket(zmq.ROUTER)
cloudbe:setopt(zmq.IDENTITY, self)

local peers = {}
for n=2,#arg do
local peer = arg[n]
-- add peer name to peers list.
peers[#peers + 1] = peer
peers[peer] = true -- map peer's name to 'true' for fast lookup
printf ("I: connecting to cloud frontend at '%s'\n", peer)
local endpoint = string.format("ipc://%s-cloud.ipc", peer)
assert(cloudbe:connect(endpoint))
end
-- Prepare local frontend and backend
local localfe = context:socket(zmq.ROUTER)
local endpoint = string.format("ipc://%s-localfe.ipc", self)
assert(localfe:bind(endpoint))

local localbe = context:socket(zmq.ROUTER)
local endpoint = string.format("ipc://%s-localbe.ipc", self)
assert(localbe:bind(endpoint))

-- Get user to tell us when we can start…
printf ("Press Enter when all brokers are started: ")
io.read('*l')

-- Start local workers
local workers = {}
for n=1,NBR_WORKERS do
local seed = os.time() + math.random()
workers[n] = zmq.threads.runstring(nil, worker_task, self, seed)
workers[n]:start(true)
end
-- Start local clients
local clients = {}
for n=1,NBR_CLIENTS do
local seed = os.time() + math.random()
clients[n] = zmq.threads.runstring(nil, client_task, self, seed)
clients[n]:start(true)
end

-- Interesting part
-- -------------------------------------------------------------
-- Request-reply flow
-- - Poll backends and process local/cloud replies
-- - While worker available, route localfe to local or cloud

-- Queue of available workers
local worker_queue = {}
local backends = zmq.poller(2)

local function send_reply(msg)
local address = msg:address()
-- Route reply to cloud if it's addressed to a broker
if peers[address] then
msg:send(cloudfe) -- reply is for a peer.
else
msg:send(localfe) -- reply is for a local client.
end
end

backends:add(localbe, zmq.POLLIN, function()
local msg = zmsg.recv(localbe)

-- Use worker address for LRU routing
worker_queue[#worker_queue + 1] = msg:unwrap()
-- if reply is not "READY" then route reply back to client.
if (msg:address() ~= "READY") then
send_reply(msg)
end
end)

backends:add(cloudbe, zmq.POLLIN, function()
local msg = zmsg.recv(cloudbe)
-- We don't use peer broker address for anything
msg:unwrap()
-- send reply back to client.
send_reply(msg)
end)

local frontends = zmq.poller(2)
local localfe_ready = false
local cloudfe_ready = false

frontends:add(localfe, zmq.POLLIN, function() localfe_ready = true end)
frontends:add(cloudfe, zmq.POLLIN, function() cloudfe_ready = true end)

while true do
-- If we have no workers anyhow, wait indefinitely
local timeout = (#worker_queue > 0) and 1000000 or -1
local rc = backends:poll(timeout)
assert (rc >= 0)

-- Now route as many clients requests as we can handle
--
while (#worker_queue > 0) do
local rc = frontends:poll(0)
assert (rc >= 0)
local reroutable = false
local msg
-- We'll do peer brokers first, to prevent starvation
if (cloudfe_ready) then
cloudfe_ready = false -- reset flag
msg = zmsg.recv (cloudfe)
reroutable = false
elseif (localfe_ready) then
localfe_ready = false -- reset flag
msg = zmsg.recv (localfe)
reroutable = true
else
break; -- No work, go back to backends
end

-- If reroutable, send to cloud 20% of the time
-- Here we'd normally use cloud status information
--
local percent = randof (5)
if (reroutable and #peers > 0 and percent == 0) then
-- Route to random broker peer
local random_peer = randof (#peers) + 1
msg:wrap(peers[random_peer], nil)
msg:send(cloudbe)
else
-- Dequeue the next worker address and route the request to it
local worker = tremove(worker_queue, 1)
msg:wrap(worker, "")
msg:send(localbe)
end
end
end
-- We never get here but clean up anyhow
localbe:close()
cloudbe:close()
localfe:close()
cloudfe:close()
context:term()

peering2.lua: Prototype local and cloud flow

Run this by, for instance, starting two instances of the broker in two windows:

peering2 me you
peering2 you me

Some comments on this code:

  • Using the zmsg class makes life much easier, and our code much shorter. It's obviously an abstraction that works, and which should form part of your toolbox as a ØMQ programmer.
  • Since we're not getting any state information from peers, we naively assume they are running. The code prompts you to confirm when you've started all the brokers. In the real case we'd not send anything to brokers that had not told us they exist.

You can satisfy yourself that the code works by watching it run forever. If there were any misrouted messages, clients would end up blocking, and the brokers would stop printing trace information. You can prove that by killing either of the brokers. The other broker tries to send requests to the cloud, and one by one its clients block, waiting for an answer.

Putting it All Together

Let's put this together into a single package. As before, we'll run an entire cluster as one process. We're going to take the two previous examples and merge them into one properly working design that lets you simulate any number of clusters.

This code is the size of both previous prototypes together, at 270 LoC. That's pretty good for a simulation of a cluster that includes clients and workers and cloud workload distribution. Here is the code:

--
-- Broker peering simulation (part 3)
-- Prototypes the full flow of status and tasks
--
-- While this example runs in a single process, that is just to make
-- it easier to start and stop the example. Each thread has its own
-- context and conceptually acts as a separate process.
--
-- Author: Robert G. Jakabosky <bobby@sharedrealm.com>
--

require"zmq"
require"zmq.poller"
require"zmq.threads"
require"zmsg"

local tremove = table.remove

local NBR_CLIENTS = 10
local NBR_WORKERS = 5

local pre_code = [[
local self, seed = ...
local zmq = require"zmq"
local zmsg = require"zmsg"
require"zhelpers"
math.randomseed(seed)
local context = zmq.init(1)

]]

-- Request-reply client using REQ socket
-- To simulate load, clients issue a burst of requests and then
-- sleep for a random period.
--
local client_task = pre_code .. [[
require"zmq.poller"

local client = context:socket(zmq.REQ)
local endpoint = string.format("ipc://%s-localfe.ipc", self)
assert(client:connect(endpoint))

local monitor = context:socket(zmq.PUSH)
local endpoint = string.format("ipc://%s-monitor.ipc", self)
assert(monitor:connect(endpoint))

local poller = zmq.poller(1)
local task_id = nil

poller:add(client, zmq.POLLIN, function()
local msg = zmsg.recv (client)
-- Worker is supposed to answer us with our task id
assert (msg:body() == task_id)
-- mark task as processed.
task_id = nil
end)
local is_running = true
while is_running do
s_sleep (randof (5) * 1000)

local burst = randof (15)
while (burst > 0) do
burst = burst - 1
-- Send request with random hex ID
task_id = string.format("%04X", randof (0x10000))
local msg = zmsg.new(task_id)
msg:send(client)

-- Wait max ten seconds for a reply, then complain
local rc = poller:poll(10 * 1000000)
assert (rc >= 0)

if task_id then
local msg = zmsg.new()
msg:body_fmt(
"E: CLIENT EXIT - lost task %s", task_id)
msg:send(monitor)
-- exit event loop
is_running = false
break
end
end
end
-- We never get here but if we did, this is how we'd exit cleanly
client:close()
monitor:close()
context:term()
]]

-- Worker using REQ socket to do LRU routing
--
local worker_task = pre_code .. [[
local worker = context:socket(zmq.REQ)
local endpoint = string.format("ipc://%s-localbe.ipc", self)
assert(worker:connect(endpoint))

-- Tell broker we're ready for work
local msg = zmsg.new ("READY")
msg:send(worker)

while true do
-- Workers are busy for 0/1/2 seconds
msg = zmsg.recv (worker)
s_sleep (randof (2) * 1000)
msg:send(worker)
end
-- We never get here but if we did, this is how we'd exit cleanly
worker:close()
context:term()
]]

-- First argument is this broker's name
-- Other arguments are our peers' names
--
s_version_assert (2, 1)
if (#arg < 1) then
printf ("syntax: peering3 me doyouend…\n")
os.exit(-1)
end
-- Our own name; in practice this'd be configured per node
local self = arg[1]
printf ("I: preparing broker at %s…\n", self)
math.randomseed(os.time())

-- Prepare our context and sockets
local context = zmq.init(1)

-- Bind cloud frontend to endpoint
local cloudfe = context:socket(zmq.ROUTER)
local endpoint = string.format("ipc://%s-cloud.ipc", self)
cloudfe:setopt(zmq.IDENTITY, self)
assert(cloudfe:bind(endpoint))

-- Bind state backend / publisher to endpoint
local statebe = context:socket(zmq.PUB)
local endpoint = string.format("ipc://%s-state.ipc", self)
assert(statebe:bind(endpoint))

-- Connect cloud backend to all peers
local cloudbe = context:socket(zmq.ROUTER)
cloudbe:setopt(zmq.IDENTITY, self)

for n=2,#arg do
local peer = arg[n]
printf ("I: connecting to cloud frontend at '%s'\n", peer)
local endpoint = string.format("ipc://%s-cloud.ipc", peer)
assert(cloudbe:connect(endpoint))
end
-- Connect statefe to all peers
local statefe = context:socket(zmq.SUB)
statefe:setopt(zmq.SUBSCRIBE, "", 0)

local peers = {}
for n=2,#arg do
local peer = arg[n]
-- add peer name to peers list.
peers[#peers + 1] = peer
peers[peer] = 0 -- set peer's initial capacity to zero.
printf ("I: connecting to state backend at '%s'\n", peer)
local endpoint = string.format("ipc://%s-state.ipc", peer)
assert(statefe:connect(endpoint))
end
-- Prepare local frontend and backend
local localfe = context:socket(zmq.ROUTER)
local endpoint = string.format("ipc://%s-localfe.ipc", self)
assert(localfe:bind(endpoint))

local localbe = context:socket(zmq.ROUTER)
local endpoint = string.format("ipc://%s-localbe.ipc", self)
assert(localbe:bind(endpoint))

-- Prepare monitor socket
local monitor = context:socket(zmq.PULL)
local endpoint = string.format("ipc://%s-monitor.ipc", self)
assert(monitor:bind(endpoint))

-- Start local workers
local workers = {}
for n=1,NBR_WORKERS do
local seed = os.time() + math.random()
workers[n] = zmq.threads.runstring(nil, worker_task, self, seed)
workers[n]:start(true)
end
-- Start local clients
local clients = {}
for n=1,NBR_CLIENTS do
local seed = os.time() + math.random()
clients[n] = zmq.threads.runstring(nil, client_task, self, seed)
clients[n]:start(true)
end

-- Interesting part
-- -------------------------------------------------------------
-- Publish-subscribe flow
-- - Poll statefe and process capacity updates
-- - Each time capacity changes, broadcast new value
-- Request-reply flow
-- - Poll primary and process local/cloud replies
-- - While worker available, route localfe to local or cloud

-- Queue of available workers
local local_capacity = 0
local cloud_capacity = 0
local worker_queue = {}
local backends = zmq.poller(2)

local function send_reply(msg)
local address = msg:address()
-- Route reply to cloud if it's addressed to a broker
if peers[address] then
msg:send(cloudfe) -- reply is for a peer.
else
msg:send(localfe) -- reply is for a local client.
end
end

backends:add(localbe, zmq.POLLIN, function()
local msg = zmsg.recv(localbe)

-- Use worker address for LRU routing
local_capacity = local_capacity + 1
worker_queue[local_capacity] = msg:unwrap()
-- if reply is not "READY" then route reply back to client.
if (msg:address() ~= "READY") then
send_reply(msg)
end
end)

backends:add(cloudbe, zmq.POLLIN, function()
local msg = zmsg.recv(cloudbe)

-- We don't use peer broker address for anything
msg:unwrap()
-- send reply back to client.
send_reply(msg)
end)

backends:add(statefe, zmq.POLLIN, function()
local msg = zmsg.recv (statefe)
-- TODO: track capacity for each peer
cloud_capacity = tonumber(msg:body())
end)

backends:add(monitor, zmq.POLLIN, function()
local msg = zmsg.recv (monitor)
printf("%s\n", msg:body())
end)

local frontends = zmq.poller(2)
local localfe_ready = false
local cloudfe_ready = false

frontends:add(localfe, zmq.POLLIN, function() localfe_ready = true end)
frontends:add(cloudfe, zmq.POLLIN, function() cloudfe_ready = true end)

local MAX_BACKEND_REPLIES = 20

while true do
-- If we have no workers anyhow, wait indefinitely
local timeout = (local_capacity > 0) and 1000000 or -1
local rc, err = backends:poll(timeout)
assert (rc >= 0, err)

-- Track if capacity changes during this iteration
local previous = local_capacity

-- Now route as many clients requests as we can handle
-- - If we have local capacity we poll both localfe and cloudfe
-- - If we have cloud capacity only, we poll just localfe
-- - Route any request locally if we can, else to cloud
--
while ((local_capacity + cloud_capacity) > 0) do
local rc, err = frontends:poll(0)
assert (rc >= 0, err)

if (localfe_ready) then
localfe_ready = false
msg = zmsg.recv (localfe)
elseif (cloudfe_ready and local_capacity > 0) then
cloudfe_ready = false
-- We have local capacity, so poll the cloud frontend for work
msg = zmsg.recv (cloudfe)
else
break; -- No work, go back to primary
end

if (local_capacity > 0) then
-- Dequeue the next worker address and route the request to it
local worker = tremove(worker_queue, 1)
local_capacity = local_capacity - 1
msg:wrap(worker, "")
msg:send(localbe)
else
-- Route to random broker peer
printf ("I: route request %s to cloud…\n",
msg:body())
local random_peer = randof (#peers) + 1
msg:wrap(peers[random_peer], nil)
msg:send(cloudbe)
end
end
if (local_capacity ~= previous) then
-- Broadcast new capacity
local msg = zmsg.new()
-- TODO: send our name with capacity.
msg:body_fmt("%d", local_capacity)
-- We stick our own address onto the envelope
msg:wrap(self, nil)
msg:send(statebe)
end
end
-- We never get here but clean up anyhow
localbe:close()
cloudbe:close()
localfe:close()
cloudfe:close()
statefe:close()
monitor:close()
context:term()

peering3.lua: Full cluster simulation

It's a non-trivial program and took about a day to get working. These are the highlights:

  • The client threads detect and report a failed request. They do this by polling for a response and if none arrives after a while (10 seconds), printing an error message.
  • Client threads don't print directly, but instead send a message to a 'monitor' socket (PUSH) that the main loop collects (PULL) and prints off. This is the first case we've seen of using ØMQ sockets for monitoring and logging; this is a big use case we'll come back to later.
  • Clients simulate varying loads to push the cluster to 100% capacity at random moments, so that tasks get shifted over to the cloud. The number of clients and workers, and the delays in the client and worker threads, control this. Feel free to play with them to see if you can make a more realistic simulation.
  • The main loop uses two pollsets. It could in fact use three: information, backends, and frontends. As in the earlier prototype, there is no point in taking a frontend message if there is no backend capacity.

These are some of the problems that hit during development of this program:

  • Clients would freeze, due to requests or replies getting lost somewhere. Recall that a ØMQ ROUTER socket drops messages it can't route. The first tactic here was to modify the client thread to detect and report such problems. Second, I put msg:dump() calls after every recv() and before every send() in the main loop, until it was clear what the problems were.
  • The main loop was mistakenly reading from more than one ready socket. This caused the first message to be lost. Fixed that by reading only from the first ready socket.
  • The zmsg class was not properly encoding UUIDs as C strings. This caused UUIDs that contain 0 bytes to be corrupted. Fixed by modifying zmsg to encode UUIDs as printable hex strings.

This simulation does not detect the disappearance of a cloud peer. If you start several peers and stop one, and it was broadcasting capacity to the others, they will continue to send it work even though it's gone. You can try this, and you will get clients that complain of lost requests. The solution is twofold: first, only keep capacity information for a short time, so that if a peer does disappear, its capacity quickly drops to zero. Second, add reliability to the request-reply chain. We'll look at reliability in the next chapter.
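
As a sketch of the first fix (which also covers the per-peer capacity TODO left in the code above), we could timestamp every state update and treat a silent peer as having zero capacity. This is hypothetical code, not part of peering3; it replaces the statefe handler shown earlier, and os.time() is used for simplicity, so the timeout resolution is one second.

-- Hypothetical sketch: expire peer capacity so a dead peer stops
-- receiving work. Replaces the single cloud_capacity variable with a
-- per-peer table, as the TODO in peering3 suggests.
local PEER_TTL = 2               -- seconds without a state update
local peer_state = {}            -- name -> { capacity, last_seen }

backends:add(statefe, zmq.POLLIN, function()
    local msg = zmsg.recv (statefe)
    peer_state[msg:address()] = {
        capacity = tonumber(msg:body()),
        last_seen = os.time(),
    }
end)

-- Recompute cloud capacity at the top of each main-loop iteration
local function cloud_capacity_now()
    local total, now = 0, os.time()
    for name, state in pairs(peer_state) do
        if now - state.last_seen > PEER_TTL then
            state.capacity = 0   -- peer went quiet; assume it is gone
        end
        total = total + state.capacity
    end
    return total
end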