ØMQ - The Guide

By Pieter Hintjens

With thanks to the hundred or so people who contributed examples in two dozen programming languages, who helped with suggestions and fixes, and who kept pushing for more examples of how to connect your code.

Thanks to Bill Desmarais, Brian Dorsey, Daniel Lin, Eric Desgranges, Gonzalo Diethelm, Guido Goldstein, Hunter Ford, Kamil Shakirov, Martin Sustrik, Mike Castleman, Naveen Chawla, Nicola Peduzzi, Oliver Smith, Olivier Chamoux, Peter Alexander, Pierre Rouleau, Randy Dryburgh, John Unwin, Alex Thomas, Mihail Minkov, Jeremy Avnet, Michael Compton, Kamil Kisiel, Mark Kharitonov, Guillaume Aubert, Ian Barber, Mike Sheridan, Faruk Akgul, Oleg Sidorov, Lev Givon, Allister MacLeod, Alexander D'Archangel, Andreas Hoelzlwimmer, Han Holl, Robert G. Jakabosky, Felipe Cruz, Marcus McCurdy, Mikhail Kulemin, Dr. Gergő Érdi, Pavel Zhukov, Alexander Else, Giovanni Ruggiero, Rick "Technoweenie", Daniel Lundin, Dave Hoover, Simon Jefford, Benjamin Peterson, Justin Case, Devon Weller, Richard Smith, Alexander Morland, Wadim Grasza, Michael Jakl, Uwe Dauernheim, Sebastian Nowicki, Simone Deponti, Aaron Raddon, Dan Colish, Markus Schirp, Benoit Larroque, Jonathan Palardy, Isaiah Peng, Arkadiusz Orzechowski, Umut Aydin, Matthew Horsfall, Jeremy W. Sherman, Eric Pugh, Tyler Sellon, John E. Vincent, Pavel Mitin, Min RK, Igor Wiedler, Olof Åkesson, Patrick Lucas, Heow Goodman, Senthil Palanisami, John Gallagher, Tomas Roos, Stephen McQuay, Erik Allik, Arnaud Cogoluègnes, Rob Gagnon, Dan Williams, Edward Smith, James Tucker, Kristian Kristensen, Vadim Shalts, Martin Trojer, Tom van Leeuwen, Pandya Hiten, Harm Aarts, Marc Harter, Iskren Ivov Chernev, Jay Han, Sonia Hamilton, and Zed Shaw.

Thanks to Stathis Sideris for Ditaa, which I used for the diagrams.

Please use the issue tracker for all comments and errata. This version covers the latest stable release of ØMQ (2.2.x) and was published on Wed 8 August, 2012. If you are using ØMQ/3.1 some of the examples and explanations won't be accurate.

The Guide is originally in C, but also in PHP, Python, Lua, and Haxe. We've also translated most of the examples into C++, C#, CL, Erlang, F#, Felix, Haskell, Java, Objective-C, Ruby, Ada, Basic, Clojure, Go, Haxe, Node.js, ooc, Perl, and Scala.

Chapter One - Basic Stuff

Fixing the World

How to explain ØMQ? Some of us start by saying all the wonderful things it does. It's sockets on steroids. It's like mailboxes with routing. It's fast! Others try to share their moment of enlightenment, that zap-pow-kaboom satori paradigm-shift moment when it all became obvious. Things just become simpler. Complexity goes away. It opens the mind. Others try to explain by comparison. It's smaller, simpler, but still looks familiar. Personally, I like to remember why we made ØMQ at all, because that's most likely where you, the reader, still are today.

Programming is a science dressed up as art, because most of us don't understand the physics of software, and it's rarely if ever taught. The physics of software is not algorithms, data structures, languages and abstractions. These are just tools we make, use, throw away. The real physics of software is the physics of people.

Specifically, our limitations when it comes to complexity, and our desire to work together to solve large problems in pieces. This is the science of programming: make building blocks that people can understand and use easily, and people will work together to solve the very largest problems.

We live in a connected world, and modern software has to navigate this world. So the building blocks for tomorrow's very largest solutions are connected and massively parallel. It's not enough for code to be "strong and silent" any more. Code has to talk to code. Code has to be chatty, sociable, well-connected. Code has to run like the human brain, trillions of individual neurons firing off messages to each other, a massively parallel network with no central control, no single point of failure, yet able to solve immensely difficult problems. And it's no accident that the future of code looks like the human brain, because the endpoints of every network are, at some level, human brains.

If you've done any work with threads, protocols, or networks, you'll realize this is pretty much impossible. It's a dream. Even connecting a few programs across a few sockets is plain nasty, when you start to handle real life situations. Trillions? The cost would be unimaginable. Connecting computers is so difficult that software and services to do this is a multi-billion dollar business.

So we live in a world where the wiring is years ahead of our ability to use it. We had a software crisis in the 1980s, when leading software engineers like Fred Brooks believed there was no "Silver Bullet" to "promise even one order of magnitude of improvement in productivity, reliability, or simplicity".

Brooks missed free and open source software, which solved that crisis, enabling us to share knowledge efficiently. Today we face another software crisis, but it's one we don't talk about much. Only the largest, richest firms can afford to create connected applications. There is a cloud, but it's proprietary. Our data, our knowledge is disappearing from our personal computers into clouds that we cannot access, cannot compete with. Who owns our social networks? It is like the mainframe-PC revolution in reverse.

We can leave the political philosophy for another book. The point is that while the Internet offers the potential of massively connected code, the reality is that this is out of reach for most of us, and so, large interesting problems (in health, education, economics, transport, and so on) remain unsolved because there is no way to connect the code, and thus no way to connect the brains that could work together to solve these problems.

There have been many attempts to solve the challenge of connected software. There are thousands of IETF specifications, each solving part of the puzzle. For application developers, HTTP is perhaps the one solution to have been simple enough to work, but it arguably makes the problem worse, by encouraging developers and architects to think in terms of big servers and thin, stupid clients.

So today people are still connecting applications using raw UDP and TCP, proprietary protocols, HTTP, WebSockets. It remains painful, slow, hard to scale, and essentially centralized. Distributed P2P architectures are mostly for play, not work. How many applications use Skype or Bittorrent to exchange data?

Which brings us back to the science of programming. To fix the world, we needed to do two things. One, to solve the general problem of "how to connect any code to any code, anywhere". Two, to wrap that up in the simplest possible building blocks that people could understand and use easily.

It sounds ridiculously simple. And maybe it is. That's kind of the whole point.

ØMQ in a Hundred Words

ØMQ (ZeroMQ, 0MQ, zmq) looks like an embeddable networking library but acts like a concurrency framework. It gives you sockets that carry whole messages across various transports like in-process, inter-process, TCP, and multicast. You can connect sockets N-to-N with patterns like fanout, pub-sub, task distribution, and request-reply. It's fast enough to be the fabric for clustered products. Its asynchronous I/O model gives you scalable multicore applications, built as asynchronous message-processing tasks. It has a score of language APIs and runs on most operating systems. ØMQ is from iMatix and is LGPL open source.

Some Assumptions

We assume you are using the latest stable release of ØMQ. We assume you are using a Linux box or something similar. We assume you can read C code, more or less, that's the default language for the examples. We assume that when we write constants like PUSH or SUBSCRIBE you can imagine they are really called ZMQ_PUSH or ZMQ_SUBSCRIBE if the programming language needs it.

Getting the Examples

The Guide examples live in the Guide's git repository. The simplest way to get all the examples is to clone this repository:

git clone --depth=1 git://github.com/imatix/zguide.git

And then browse the examples subdirectory. You'll find examples by language. If there are examples missing in a language you use, you're encouraged to submit a translation. This is how the Guide became so useful, thanks to the work of many people.

All examples are licensed under MIT/X11, unless otherwise specified in the source code.

Ask and Ye Shall Receive

So let's start with some code. We start of course with a Hello World example. We'll make a client and a server. The client sends "Hello" to the server, which replies with "World". Here's the server in C, which opens a ØMQ socket on port 5555, reads requests on it, and replies with "World" to each request:

// Hello World server

#include <zmq.h>
#include <stdio.h>
#include <unistd.h>
#include <string.h>
#include <assert.h>

int main (void)
{
    // Socket to talk to clients
    void *context = zmq_ctx_new ();
    void *responder = zmq_socket (context, ZMQ_REP);
    int rc = zmq_bind (responder, "tcp://*:5555");
    assert (rc == 0);

    while (1) {
        char buffer [10];
        zmq_recv (responder, buffer, 10, 0);
        printf ("Received Hello\n");
        sleep (1);          // Do some 'work'
        zmq_send (responder, "World", 5, 0);
    }
    return 0;
}

hwserver.c: Hello World server

Figure 1 - Request-Reply

fig1.png

The REQ-REP socket pair is lockstep. The client does zmq_send(3) and then zmq_recv(3), in a loop (or once if that's all it needs). Doing any other sequence (e.g. sending two messages in a row) will result in a return code of -1 from the send or recv call. Similarly the service does zmq_recv(3) and then zmq_send(3) in that order, and as often as it needs to.
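
To see what "lockstep" means in practice, here is a tiny sketch (not one of the Guide's official examples) that deliberately breaks the rule; the second send on the REQ socket fails with -1 and errno set to EFSM:

// Demonstrate the REQ send-recv lockstep (sketch)
#include <zmq.h>
#include <errno.h>
#include <assert.h>
#include <stdio.h>

int main (void)
{
    void *context = zmq_ctx_new ();
    void *requester = zmq_socket (context, ZMQ_REQ);
    zmq_connect (requester, "tcp://localhost:5555");

    int rc = zmq_send (requester, "Hello", 5, 0);
    assert (rc == 5);
    // A second send without an intervening recv violates the pattern
    rc = zmq_send (requester, "Hello again", 11, 0);
    assert (rc == -1 && errno == EFSM);
    printf ("Second send refused, as expected: %s\n", zmq_strerror (errno));

    zmq_close (requester);
    zmq_ctx_destroy (context);
    return 0;
}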

ØMQ uses C as its reference language and this is the main language we'll use for examples. If you're reading this on-line, the link below the example takes you to translations into other programming languages. Let's compare the same server in C++:

//
// Hello World server in C++
// Binds REP socket to tcp://*:5555
// Expects "Hello" from client, replies with "World"
//

#include <zmq.hpp>
#include <string>
#include <cstring>
#include <iostream>
#ifndef _WIN32
#include <unistd.h>
#else
#include <windows.h>

#define sleep(n) Sleep(n)
#endif

int main () {
    // Prepare our context and socket
    zmq::context_t context (1);
    zmq::socket_t socket (context, ZMQ_REP);
    socket.bind ("tcp://*:5555");

    while (true) {
        zmq::message_t request;

        // Wait for next request from client
        socket.recv (&request);
        std::cout << "Received Hello" << std::endl;

        // Do some 'work'
        sleep(1);

        // Send reply back to client
        zmq::message_t reply (5);
        memcpy (reply.data (), "World", 5);
        socket.send (reply);
    }
    return 0;
}

hwserver.cpp: Hello World server

You can see that the ØMQ API is similar in C and C++. In a language like PHP, we can hide even more and the code becomes even easier to read:

<?php
/*
* Hello World server
* Binds REP socket to tcp://*:5555
* Expects "Hello" from client, replies with "World"
* @author Ian Barber <ian(dot)barber(at)gmail(dot)com>
*/

$context = new ZMQContext(1);

// Socket to talk to clients
$responder = new ZMQSocket($context, ZMQ::SOCKET_REP);
$responder->bind("tcp://*:5555");

while (true) {
    // Wait for next request from client
    $request = $responder->recv();
    printf ("Received request: [%s]\n", $request);

    // Do some 'work'
    sleep (1);

    // Send reply back to client
    $responder->send("World");
}

hwserver.php: Hello World server

Here's the client code (click the link below the source to look at, or contribute a translation in your favorite programming language):

// Hello World client
#include <zmq.h>
#include <string.h>
#include <stdio.h>
#include <unistd.h>

int main (void)
{
    printf ("Connecting to hello world server…\n");
    void *context = zmq_ctx_new ();
    void *requester = zmq_socket (context, ZMQ_REQ);
    zmq_connect (requester, "tcp://localhost:5555");

    int request_nbr;
    for (request_nbr = 0; request_nbr != 10; request_nbr++) {
        char buffer [10];
        printf ("Sending Hello %d…\n", request_nbr);
        zmq_send (requester, "Hello", 5, 0);
        zmq_recv (requester, buffer, 10, 0);
        printf ("Received World %d\n", request_nbr);
    }
    zmq_close (requester);
    zmq_ctx_destroy (context);
    return 0;
}

hwclient.c: Hello World client

Now this looks too simple to be realistic, but a ØMQ socket is what you get when you take a normal TCP socket, inject it with a mix of radioactive isotopes stolen from a secret Soviet atomic research project, bombard it with 1950-era cosmic rays, and put it into the hands of a drug-addled comic book author with a badly-disguised fetish for bulging muscles clad in spandex. Yes, ØMQ sockets are the world-saving superheroes of the networking world.

Figure 2 - A terrible accident…

fig2.png

You could literally throw thousands of clients at this server, all at once, and it would continue to work happily and quickly. For fun, try starting the client and then starting the server, see how it all still works, then think for a second what this means.

Let me explain briefly what these two programs are actually doing. They create a ØMQ context to work with, and a socket. Don't worry what the words mean. You'll pick it up. The server binds its REP (reply) socket to port 5555. The server waits for a request, in a loop, and responds each time with a reply. The client sends a request and reads the reply back from the server.

If you kill the server (Ctrl-C) and restart it, the client won't recover properly. Recovering from crashing processes isn't quite that easy. Making a reliable request-reply flow is complex enough that I won't cover it until Chapter Four.

There is a lot happening behind the scenes but what matters to us programmers is how short and sweet the code is, and how often it doesn't crash, even under heavy load. This is the request-reply pattern, probably the simplest way to use ØMQ. It maps to RPC and the classic client-server model.

A Minor Note on Strings

ØMQ doesn't know anything about the data you send except its size in bytes. That means you are responsible for formatting it safely so that applications can read it back. Doing this for objects and complex data types is a job for specialized libraries like Protocol Buffers. But even for strings you need to take care.

In C and some other languages, strings are terminated with a null byte. We could send a string like "Hello" with that extra null byte:

zmq_send (requester, "Hello", 6, 0);

However if you send a string from another language it probably will not include that null byte. For example, when we send that same string in Python, we do this:

socket.send ("Hello")

Then what goes onto the wire is a length (one byte for shorter strings) and the string contents, as individual characters.

Figure 3 - A ØMQ string

fig3.png

And if you read this from a C program, you will get something that looks like a string, and might by accident act like a string (if by luck the five bytes find themselves followed by an innocently lurking null), but isn't a proper string. Which means that if your client and server don't agree on the string format, you will get weird results.

When you receive string data from ØMQ, in C, you simply cannot trust that it's safely terminated. Every single time you read a string you should allocate a new buffer with space for an extra byte, copy the string, and terminate it properly with a null.

So let's establish the rule that ØMQ strings are length-specified, and are sent on the wire without a trailing null. In the simplest case (and we'll do this in our examples) a ØMQ string maps neatly to a ØMQ message frame, which looks like the above figure, a length and some bytes.

Here is what we need to do, in C, to receive a ØMQ string and deliver it to the application as a valid C string:

// Receive ZeroMQ string from socket and convert into C string
// Chops string at 255 chars, if it's longer

static char *
s_recv (void *socket) {
    char buffer [256];
    int size = zmq_recv (socket, buffer, 255, 0);
    if (size == -1)
        return NULL;
    if (size > 255)
        size = 255;
    buffer [size] = 0;
    return strdup (buffer);
}

This makes a very handy helper function and in the spirit of making things we can reuse profitably, let's write a similar 's_send' function that sends strings in the correct ØMQ format, and package this into a header file we can reuse.
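
For example, a bare-bones 's_send' boils down to something like this sketch:

// Convert C string to 0MQ string and send to socket (sketch)
static int
s_send (void *socket, char *string) {
    return zmq_send (socket, string, strlen (string), 0);
}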

The result is zhelpers.h, which lets us write sweeter and shorter ØMQ applications in C. It is a fairly long source, and only fun for C developers, so read it at leisure.

Version Reporting

ØMQ does come in several versions and quite often, if you hit a problem, it'll be something that's been fixed in a later version. So it's a useful trick to know exactly what version of ØMQ you're actually linking with. Here is a tiny program that does that:

// Report 0MQ version

#include <zmq.h>
#include <stdio.h>

int main (void)
{
    int major, minor, patch;
    zmq_version (&major, &minor, &patch);
    printf ("Current 0MQ version is %d.%d.%d\n", major, minor, patch);
    return 0;
}

version.c: ØMQ version reporting

Getting the Message Out

The second classic pattern is one-way data distribution, in which a server pushes updates to a set of clients. Let's see an example that pushes out weather updates consisting of a zip code, temperature, and relative humidity. We'll generate random values, just like the real weather stations do.

Here's the server. We'll use port 5556 for this application:

package ;
import haxe.io.Bytes;
import neko.Lib;
import org.zeromq.ZMQ;
import org.zeromq.ZMQContext;
import org.zeromq.ZMQSocket;

/**
* Weather update server in Haxe
* Binds PUB socket to tcp://*:5556
* Publishes random weather updates
*
* See: http://zguide.zeromq.org/page:all#Getting-the-Message-Out
*
* Use with WUClient.hx
*/

class WUServer
{

    public static function main() {
        var context:ZMQContext = ZMQContext.instance();

        Lib.println("** WUServer (see: http://zguide.zeromq.org/page:all#Getting-the-Message-Out)");

        var publisher:ZMQSocket = context.socket(ZMQ_PUB);
        publisher.bind("tcp://127.0.0.1:5556");

        while (true) {
            // Get values that will fool the boss
            var zipcode, temperature, relhumidity;
            zipcode = Std.random(100000) + 1;
            temperature = Std.random(215) - 80 + 1;
            relhumidity = Std.random(50) + 10 + 1;

            // Send message to all subscribers
            var update:String = zipcode + " " + temperature + " " + relhumidity;
            publisher.sendMsg(Bytes.ofString(update));
        }
    }
}

wuserver.hx: Weather update server
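
If you'd rather follow along in C, the same publisher boils down to something like this sketch (a rough equivalent for illustration, not the Guide's official C example):

// Weather update server, rough C sketch
// Binds PUB socket to tcp://*:5556 and publishes random updates
#include <zmq.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <time.h>
#include <assert.h>

int main (void)
{
    void *context = zmq_ctx_new ();
    void *publisher = zmq_socket (context, ZMQ_PUB);
    int rc = zmq_bind (publisher, "tcp://*:5556");
    assert (rc == 0);

    srand ((unsigned) time (NULL));
    while (1) {
        // Get values that will fool the boss
        int zipcode     = rand () % 100000;
        int temperature = rand () % 215 - 80;
        int relhumidity = rand () % 50 + 10;

        // Send the update to all subscribers as a single string frame
        char update [24];
        sprintf (update, "%05d %d %d", zipcode, temperature, relhumidity);
        zmq_send (publisher, update, strlen (update), 0);
    }
    zmq_close (publisher);
    zmq_ctx_destroy (context);
    return 0;
}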

There's no start, and no end to this stream of updates, it's like a never ending broadcast.

Figure 4 - Publish-Subscribe

fig4.png

Here is the client application, which listens to the stream of updates and grabs anything to do with a specified zip code, by default New York City because that's a great place to start any adventure:

package ;
import haxe.io.Bytes;
import neko.Lib;
import neko.Sys;
import org.zeromq.ZMQ;
import org.zeromq.ZMQContext;
import org.zeromq.ZMQException;
import org.zeromq.ZMQSocket;

/**
* Weather update client in Haxe
* Connects SUB socket to tcp://localhost:5556
* Collects weather updates and finds average temp in zipcode
*
* Use optional argument to specify zip code (in range 1 to 100000)
*
* See: http://zguide.zeromq.org/page:all#Getting-the-Message-Out
*
* Use with WUServer.hx
*/

class WUClient
{

    public static function main() {
        var context:ZMQContext = ZMQContext.instance();

        Lib.println("** WUClient (see: http://zguide.zeromq.org/page:all#Getting-the-Message-Out)");

        // Socket to talk to server
        trace ("Collecting updates from weather server…");
        var subscriber:ZMQSocket = context.socket(ZMQ_SUB);
        subscriber.setsockopt(ZMQ_LINGER, 0);    // Don't block when closing socket at end

        subscriber.connect("tcp://localhost:5556");

        // Subscribe to zipcode, default is NYC, 10001
        var filter:String =
            if (Sys.args().length > 0) {
                Sys.args()[0];
            } else {
                "10001";
            };

        try {
            subscriber.setsockopt(ZMQ_SUBSCRIBE, Bytes.ofString(filter));
        } catch (e:ZMQException) {
            trace (e.str());
        }

        // Process 100 updates
        var update_nbr = 0;
        var total_temp:Int = 0;
        for (update_nbr in 0...100) {
            var msg:Bytes = subscriber.recvMsg();
            trace (update_nbr + ". Received: " + msg.toString());

            var zipcode, temperature, relhumidity;

            var sscanf:Array<String> = msg.toString().split(" ");
            zipcode = sscanf[0];
            temperature = sscanf[1];
            relhumidity = sscanf[2];
            total_temp += Std.parseInt(temperature);
        }
        trace ("Average temperature for zipcode " + filter + " was " + total_temp / 100);

        // Close gracefully
        subscriber.close();
        context.term();
    }
}

wuclient.hx: Weather update client

Note that when you use a SUB socket you must set a subscription using zmq_setsockopt(3) and SUBSCRIBE, as in this code. If you don't set any subscription, you won't get any messages. It's a common mistake for beginners. The subscriber can set many subscriptions, which are added together. That is, if an update matches ANY subscription, the subscriber receives it. The subscriber can also unsubscribe specific subscriptions. Subscriptions are length-specified blobs. See zmq_setsockopt(3) for how this works.
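
In C, setting a subscription is a single zmq_setsockopt(3) call. A minimal sketch, assuming 'subscriber' is a connected SUB socket:

// Subscribe to updates for one zip code (sketch)
const char *filter = "10001";
int rc = zmq_setsockopt (subscriber, ZMQ_SUBSCRIBE, filter, strlen (filter));
assert (rc == 0);
// To receive everything instead, subscribe to the empty prefix:
// zmq_setsockopt (subscriber, ZMQ_SUBSCRIBE, "", 0);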

The PUB-SUB socket pair is asynchronous. The client does zmq_recv(3), in a loop (or once if that's all it needs). Trying to send a message to a SUB socket will cause an error. Similarly the service does zmq_send(3) as often as it needs to, but must not do zmq_recv(3) on a PUB socket.

In theory with ØMQ sockets, it does not matter which end connects, and which end binds. However with PUB-SUB sockets, if you bind the SUB socket and connect the PUB socket, the SUB socket may receive old messages, i.e. messages sent before the SUB started up. This is an artifact of the way bind/connect works. It's best to bind the PUB and connect the SUB, if you can.

There is one more important thing to know about PUB-SUB sockets: you do not know precisely when a subscriber starts to get messages. Even if you start a subscriber, wait a while, and then start the publisher, the subscriber will always miss the first messages that the publisher sends. This is because as the subscriber connects to the publisher (something that takes a small but non-zero time), the publisher may already be sending messages out.

This "slow joiner" symptom hits enough people, often enough, that I'm going to explain it in detail. Remember that ØMQ does asynchronous I/O, i.e. in the background. Say you have two nodes doing this, in this order:

  • Subscriber connects to an endpoint and receives and counts messages.
  • Publisher binds to an endpoint and immediately sends 1,000 messages.

Then the subscriber will most likely not receive anything. You'll blink, check that you set a correct filter, and try again, and the subscriber will still not receive anything.

Making a TCP connection involves to and fro handshaking that takes several milliseconds depending on your network and the number of hops between peers. In that time, ØMQ can send very many messages. For sake of argument assume it takes 5 msecs to establish a connection, and that same link can handle 1M messages per second. During the 5 msecs that the subscriber is connecting to the publisher, it takes the publisher only 1 msec to send out those 1K messages.

In Chapter Two I'll explain how to synchronize a publisher and subscribers so that you don't start to publish data until the subscriber(s) really are connected and ready. There is a simple and stupid way to delay the publisher, which is to sleep. I'd never do this in a real application though, it is extremely fragile as well as inelegant and slow. Use sleeps to prove to yourself what's happening, and then wait for Chapter 2 to see how to do this right.

The alternative to synchronization is to simply assume that the published data stream is infinite and has no start, and no end. This is how we built our weather client example.

So the client subscribes to its chosen zip code and collects a hundred updates for that zip code. That means about ten million updates from the server, if zip codes are randomly distributed. You can start the client, and then the server, and the client will keep working. You can stop and restart the server as often as you like, and the client will keep working. When the client has collected its hundred updates, it calculates the average, prints it, and exits.

Some points about the publish-subscribe pattern:

  • A subscriber can in fact connect to more than one publisher, using one 'connect' call each time. Data will then arrive and be interleaved so that no single publisher drowns out the others (see the sketch after this list).
  • If a publisher has no connected subscribers, then it will simply drop all messages.
  • If you're using TCP, and a subscriber is slow, messages will queue up on the publisher. We'll look at how to protect publishers against this, using the "high-water mark" later.
  • In the current versions of ØMQ, filtering happens at the subscriber side, not the publisher side. This means, over TCP, that a publisher will send all messages to all subscribers, which will then drop messages they don't want.
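
To make the first point above concrete, here is a minimal sketch of one SUB socket connected to two publishers (the endpoints are just examples, and 'context' is assumed to exist already):

// One SUB socket, several publishers (sketch)
void *subscriber = zmq_socket (context, ZMQ_SUB);
zmq_connect (subscriber, "tcp://localhost:5556");
zmq_connect (subscriber, "tcp://localhost:5566");
zmq_setsockopt (subscriber, ZMQ_SUBSCRIBE, "10001", 5);
// Updates from both publishers now arrive, interleaved, on this one socket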

This is how long it takes to receive and filter 10M messages on my box, which is an Intel 4 core Q8300, fast but nothing special:

ph@ws200901:~/work/git/ØMQGuide/examples/c$ time wuclient
Collecting updates from weather server...
Average temperature for zipcode '10001 ' was 18F

real    0m5.939s
user    0m1.590s
sys     0m2.290s

Divide and Conquer

As a final example (you are surely getting tired of juicy code and want to delve back into philological discussions about comparative abstractive norms), let's do a little supercomputing. Then coffee. Our supercomputing application is a fairly typical parallel processing model:

  • We have a ventilator that produces tasks that can be done in parallel.
  • We have a set of workers that process tasks.
  • We have a sink that collects results back from the worker processes.

In reality, workers run on superfast boxes, perhaps using GPUs (graphic processing units) to do the hard maths. Here is the ventilator. It generates 100 tasks, each is a message telling the worker to sleep for some number of milliseconds:

package ;

import haxe.io.Bytes;
import haxe.Stack;
import neko.Lib;
import neko.io.File;
import neko.io.FileInput;
import neko.Sys;
import org.zeromq.ZMQ;
import org.zeromq.ZMQContext;
import org.zeromq.ZMQException;
import org.zeromq.ZMQSocket;

/**
* Task ventilator in Haxe
* Binds PUSH socket to tcp://localhost:5557
* Sends batch of tasks to workers via that socket.
*
* Based on code from: http://zguide.zeromq.org/java:taskvent
*
* Use with TaskWork.hx and TaskSink.hx
*/

class TaskVent
{

    public static function main() {

        try {
            var context:ZMQContext = ZMQContext.instance();
            var sender:ZMQSocket = context.socket(ZMQ_PUSH);

            Lib.println("** TaskVent (see: http://zguide.zeromq.org/page:all#Divide-and-Conquer)");

            sender.bind("tcp://127.0.0.1:5557");

            Lib.println("Press Enter when the workers are ready: ");
            var f:FileInput = File.stdin();
            var str:String = f.readLine();
            Lib.println("Sending tasks to workers …\n");

            // The first message is "0" and signals start of batch
            sender.sendMsg(Bytes.ofString("0"));

            // Send 100 tasks
            var totalMsec:Int = 0;    // Total expected cost in msec
            for (task_nbr in 0...100) {
                var workload = Std.random(100) + 1;    // Generates 1 to 100 msecs
                totalMsec += workload;
                Lib.print(workload + ".");
                sender.sendMsg(Bytes.ofString(Std.string(workload)));
            }
            Lib.println("Total expected cost: " + totalMsec + " msec");

            // Give 0MQ time to deliver
            Sys.sleep(1);

            sender.close();
            context.term();
        } catch (e:ZMQException) {
            trace("ZMQException #:" + ZMQ.errNoToErrorType(e.errNo) + ", str:" + e.str());
            trace (Stack.toString(Stack.exceptionStack()));
        }
    }
}

taskvent.hx: Parallel task ventilator

Figure 5 - Parallel Pipeline

fig5.png

Here is the worker application. It receives a message, sleeps for that number of milliseconds, then signals that it's finished:

package ;

import haxe.io.Bytes;
import neko.Lib;
import neko.Sys;
import org.zeromq.ZMQ;
import org.zeromq.ZMQContext;
import org.zeromq.ZMQSocket;

/**
* Task worker in Haxe
* Connects PULL socket to tcp://localhost:5557
* Collects workloads from ventilator via that socket
* Connects PUSH socket to tcp://localhost:5558
* Sends results to sink via that socket
*
* See: http://zguide.zeromq.org/page:all#Divide-and-Conquer
*
* Based on code from: http://zguide.zeromq.org/java:taskwork
*
* Use with TaskVent.hx and TaskSink.hx
*/

class TaskWork
{

    public static function main() {
        var context:ZMQContext = ZMQContext.instance();

        Lib.println("** TaskWork (see: http://zguide.zeromq.org/page:all#Divide-and-Conquer)");

        // Socket to receive messages on
        var receiver:ZMQSocket = context.socket(ZMQ_PULL);
        receiver.connect("tcp://127.0.0.1:5557");

        // Socket to send messages to
        var sender:ZMQSocket = context.socket(ZMQ_PUSH);
        sender.connect("tcp://127.0.0.1:5558");

        // Process tasks forever
        while (true) {
            var msgString = StringTools.trim(receiver.recvMsg().toString());
            var sec:Float = Std.parseFloat(msgString) / 1000.0;
            Lib.print(msgString + ".");

            // Do the work
            Sys.sleep(sec);

            // Send results to sink
            sender.sendMsg(Bytes.ofString(""));
        }
    }
}

taskwork.hx: Parallel task worker

Here is the sink application. It collects the 100 tasks, then calculates how long the overall processing took, so we can confirm that the workers really were running in parallel, if there are more than one of them:

package ;

import haxe.io.Bytes;
import neko.Lib;
import neko.Sys;
import org.zeromq.ZMQ;
import org.zeromq.ZMQContext;
import org.zeromq.ZMQSocket;

/**
* Task sink in Haxe
* Binds PULL request socket to tcp://localhost:5558
* Collects results from workers via this socket
*
* See: http://zguide.zeromq.org/page:all#Divide-and-Conquer
*
* Based on http://zguide.zeromq.org/java:tasksink
*
* Use with TaskVent.hx and TaskWork.hx
*/

class TaskSink
{

    public static function main() {
        var context:ZMQContext = ZMQContext.instance();

        Lib.println("** TaskSink (see: http://zguide.zeromq.org/page:all#Divide-and-Conquer)");

        // Socket to receive messages on
        var receiver:ZMQSocket = context.socket(ZMQ_PULL);
        receiver.bind("tcp://127.0.0.1:5558");

        // Wait for start of batch
        var msgString = StringTools.trim(receiver.recvMsg().toString());

        // Start our clock now
        var tStart = Sys.time();

        // Process 100 messages
        var task_nbr:Int;
        for (task_nbr in 0...100) {
            msgString = StringTools.trim(receiver.recvMsg().toString());
            if (task_nbr % 10 == 0) {
                Lib.println(":");    // Print a ":" every 10 messages
            } else {
                Lib.print(".");
            }
        }
        // Calculate and report duration of batch
        var tEnd = Sys.time();
        Lib.println("Total elapsed time: " + Math.ceil((tEnd - tStart) * 1000) + " msec");

        receiver.close();
        context.term();
    }
}

tasksink.hx: Parallel task sink

The average cost of a batch is 5 seconds. When we start 1, 2, 4 workers we get results like this from the sink:

#   1 worker
Total elapsed time: 5034 msec
#   2 workers
Total elapsed time: 2421 msec
#   4 workers
Total elapsed time: 1018 msec

Let's look at some aspects of this code in more detail:

  • The workers connect upstream to the ventilator, and downstream to the sink. This means you can add workers arbitrarily. If the workers bound to their endpoints, you would need (a) more endpoints and (b) to modify the ventilator and/or the sink each time you added a worker. We say that the ventilator and sink are 'stable' parts of our architecture and the workers are 'dynamic' parts of it.
  • We have to synchronize the start of the batch with all workers being up and running. This is a fairly common gotcha in ØMQ and there is no easy solution. The 'connect' method takes a certain time. So when a set of workers connect to the ventilator, the first one to successfully connect will get a whole load of messages in that short time while the others are also connecting. If you don't synchronize the start of the batch somehow, the system won't run in parallel at all. Try removing the wait, and see.
  • The ventilator's PUSH socket distributes tasks to workers (assuming they are all connected before the batch starts going out) evenly. This is called load-balancing and it's something we'll look at again in more detail.
  • The sink's PULL socket collects results from workers evenly. This is called fair-queuing.

Figure 6 - Fair Queuing

fig6.png

The pipeline pattern also exhibits the "slow joiner" syndrome, leading to accusations that PUSH sockets don't load balance properly. If you are using PUSH and PULL, and one of your workers gets way more messages than the others, it's because that PULL socket has joined faster than the others, and grabs a lot of messages before the others manage to connect.

Programming with ØMQ

Having seen some examples, you're eager to start using ØMQ in some apps. Before you start that, take a deep breath, chillax, and reflect on some basic advice that will save you stress and confusion.

  • Learn ØMQ step by step. It's just one simple API but it hides a world of possibilities. Take the possibilities slowly, master each one.
  • Write nice code. Ugly code hides problems and makes it hard for others to help you. You might get used to meaningless variable names, but people reading your code won't. Use names that are real words, that say something other than "I'm too careless to tell you what this variable is really for". Use consistent indentation, clean layout. Write nice code and your world will be more comfortable.
  • Test what you make as you make it. When your program doesn't work, you should know what five lines are to blame. This is especially true when you do ØMQ magic, which just won't work the first few times you try it.
  • When you find that things don't work as expected, break your code into pieces, test each one, see which one is not working. ØMQ lets you make essentially modular code, use that to your advantage.
  • Make abstractions (classes, methods, whatever) as you need them. If you copy/paste a lot of code you're going to copy/paste errors too.

To illustrate, here is a fragment of code someone asked me to help fix:

//  NOTE: do NOT reuse this example code!
static char *topic_str = "msg.x|";

void* pub_worker(void* arg){
    void *ctx = arg;
    assert(ctx);

    void *qskt = zmq_socket(ctx, ZMQ_REP);
    assert(qskt);

    int rc = zmq_connect(qskt, "inproc://querys");
    assert(rc == 0);

    void *pubskt = zmq_socket(ctx, ZMQ_PUB);
    assert(pubskt);

    rc = zmq_bind(pubskt, "inproc://publish");
    assert(rc == 0);

    uint8_t cmd;
    uint32_t nb;
    zmq_msg_t topic_msg, cmd_msg, nb_msg, resp_msg;

    zmq_msg_init_data(&topic_msg, topic_str, strlen(topic_str) , NULL, NULL);

    fprintf(stdout,"WORKER: ready to receive messages\n");
    //  NOTE: do NOT reuse this example code, It's broken.
    //  e.g. topic_msg will be invalid the second time through
    while (1){
    zmq_send(pubskt, &topic_msg, ZMQ_SNDMORE);

    zmq_msg_init(&cmd_msg);
    zmq_recv(qskt, &cmd_msg, 0);
    memcpy(&cmd, zmq_msg_data(&cmd_msg), sizeof(uint8_t));
    zmq_send(pubskt, &cmd_msg, ZMQ_SNDMORE);
    zmq_msg_close(&cmd_msg);

    fprintf(stdout, "received cmd %u\n", cmd);

    zmq_msg_init(&nb_msg);
    zmq_recv(qskt, &nb_msg, 0);
    memcpy(&nb, zmq_msg_data(&nb_msg), sizeof(uint32_t));
    zmq_send(pubskt, &nb_msg, 0);
    zmq_msg_close(&nb_msg);

    fprintf(stdout, "received nb %u\n", nb);

    zmq_msg_init_size(&resp_msg, sizeof(uint8_t));
    memset(zmq_msg_data(&resp_msg), 0, sizeof(uint8_t));
    zmq_send(qskt, &resp_msg, 0);
    zmq_msg_close(&resp_msg);

    }
    return NULL;
}

This is what I rewrote it to, as part of finding the bug:

// NOTE: do NOT reuse this example code!
static char *topic_str = "msg.x|";

void* pub_worker(void* arg){
    void *ctx = arg;
    assert(ctx);

    void *qskt = zmq_socket(ctx, ZMQ_REP);
    assert(qskt);

    int rc = zmq_connect(qskt, "inproc://querys");
    assert(rc == 0);

    void *pubskt = zmq_socket(ctx, ZMQ_PUB);
    assert(pubskt);

    rc = zmq_bind(pubskt, "inproc://publish");
    assert(rc == 0);

    uint8_t cmd;
    uint32_t nb;
    zmq_msg_t topic_msg, cmd_msg, nb_msg, resp_msg;

    zmq_msg_init_data(&topic_msg, topic_str, strlen(topic_str), NULL, NULL);

    fprintf(stdout, "WORKER: ready to receive messages\n");
    // NOTE: do NOT reuse this example code, it's broken.
    // e.g. topic_msg will be invalid the second time through
    while (1) {
        zmq_msg_send(pubskt, &topic_msg, ZMQ_SNDMORE);

        zmq_msg_init(&cmd_msg);
        zmq_msg_recv(qskt, &cmd_msg, 0);
        memcpy(&cmd, zmq_msg_data(&cmd_msg), sizeof(uint8_t));
        zmq_msg_send(pubskt, &cmd_msg, ZMQ_SNDMORE);
        zmq_msg_close(&cmd_msg);

        fprintf(stdout, "received cmd %u\n", cmd);

        zmq_msg_init(&nb_msg);
        zmq_msg_recv(qskt, &nb_msg, 0);
        memcpy(&nb, zmq_msg_data(&nb_msg), sizeof(uint32_t));
        zmq_msg_send(pubskt, &nb_msg, 0);
        zmq_msg_close(&nb_msg);

        fprintf(stdout, "received nb %u\n", nb);

        zmq_msg_init_size(&resp_msg, sizeof(uint8_t));
        memset(zmq_msg_data(&resp_msg), 0, sizeof(uint8_t));
        zmq_msg_send(qskt, &resp_msg, 0);
        zmq_msg_close(&resp_msg);
    }
    return NULL;
}

In the end, the problem was that the application was passing sockets between threads, which crashed weirdly. It became legal behavior in ØMQ/2.1, but remains dangerous unless you use a "full memory barrier", and it's something we advise against doing.

Getting the Context Right

ØMQ applications always start by creating a context, and then using that for creating sockets. In C, it's the zmq_init(3) call. You should create and use exactly one context in your process. Technically, the context is the container for all sockets in a single process, and acts as the transport for inproc sockets, which are the fastest way to connect threads in one process. If at runtime a process has two contexts, these are like separate ØMQ instances. If that's explicitly what you want, OK, but otherwise remember:

Do one zmq_init(3) at the start of your main line code, and one zmq_term(3) at the end.

If you're using the fork() system call, each process needs its own context. If you do zmq_init(3) in the main process before calling fork(), the child processes get their own contexts. In general you want to do the interesting stuff in the child processes, and just manage these from the parent process.

Making a Clean Exit

Classy programmers share the same motto as classy hit men: always clean-up when you finish the job. When you use ØMQ in a language like Python, stuff gets automatically freed for you. But when using C you have to carefully free objects when you're finished with them, or you get memory leaks, unstable applications, and generally bad karma.

Memory leaks are one thing, but ØMQ is quite finicky about how you exit an application. The reasons are technical and painful but the upshot is that if you leave any sockets open, the zmq_term(3) function will hang forever. And even if you close all sockets, zmq_term(3) will by default wait forever if there are pending connects or sends. Unless you set the LINGER to zero on those sockets before closing them.
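
In C that looks like this sketch, where 'socket' is whatever socket you are about to close and 'context' the context it belongs to:

// Discard pending outbound messages instead of lingering on close (sketch)
int linger = 0;
zmq_setsockopt (socket, ZMQ_LINGER, &linger, sizeof (linger));
zmq_close (socket);
zmq_term (context);     // Now returns promptly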

The ØMQ objects we need to worry about are messages, sockets, and contexts. Luckily it's quite simple, at least in simple programs:

  • Always close a message the moment you are done with it, using zmq_msg_close(3).
  • If you are opening and closing a lot of sockets, that's probably a sign you need to redesign your application.
  • When you exit the program, close your sockets and then call zmq_term(3). This destroys the context.

If you're doing multithreaded work, it gets rather more complex than this. We'll get to multithreading in the next chapter, but because some of you will, despite warnings, try to run before you can safely walk, below is the quick and dirty guide to making a clean exit in a multithreaded ØMQ application.

First, do not try to use the same socket from multiple threads. No, don't explain why you think this would be excellent fun, just please don't do it. Next, set LINGER to zero on all sockets, close them, and terminate the context in the main thread. This will cause any blocking receives or polls or sends in attached threads (i.e. threads that share the same context) to return with an error. Catch that error, then set LINGER, close the sockets in that thread, and exit. Do not terminate the same context twice. The zmq_term in the main thread will block until all sockets it knows about are safely closed.
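
In an attached thread, catching that error looks roughly like this sketch, where 'worker' is a socket created in that thread (ETERM is the error a blocking call returns once the context is terminated):

// Worker loop that exits cleanly when the main thread terminates the context (sketch)
while (1) {
    char buffer [256];
    int size = zmq_recv (worker, buffer, 255, 0);
    if (size == -1 && errno == ETERM)
        break;              // Context is going away; time to clean up
    // ... process the message ...
}
int linger = 0;
zmq_setsockopt (worker, ZMQ_LINGER, &linger, sizeof (linger));
zmq_close (worker);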

Voila! It's complex and painful enough that any language binding author worth his or her salt will do this automatically and make the socket closing dance unnecessary.

Why We Needed ØMQ

Now that you've seen ØMQ in action, let's go back to the "why".

Many applications these days consist of components that stretch across some kind of network, either a LAN or the Internet. So many application developers end up doing some kind of messaging. Some developers use message queuing products, but most of the time they do it themselves, using TCP or UDP. These protocols are not hard to use, but there is a great difference between sending a few bytes from A to B, and doing messaging in any kind of reliable way.

Let's look at the typical problems we face when we start to connect pieces using raw TCP. Any reusable messaging layer would need to solve all or most of these:

  • How do we handle I/O? Does our application block, or do we handle I/O in the background? This is a key design decision. Blocking I/O creates architectures that do not scale well. But background I/O can be very hard to do right.
  • How do we handle dynamic components, i.e. pieces that go away temporarily? Do we formally split components into "clients" and "servers" and mandate that servers cannot disappear? What then if we want to connect servers to servers? Do we try to reconnect every few seconds?
  • How do we represent a message on the wire? How do we frame data so it's easy to write and read, safe from buffer overflows, efficient for small messages, yet adequate for the very largest videos of dancing cats wearing party hats?
  • How do we handle messages that we can't deliver immediately? Particularly, if we're waiting for a component to come back on-line? Do we discard messages, put them into a database, or into a memory queue?
  • Where do we store message queues? What happens if the component reading from a queue is very slow, and causes our queues to build up? What's our strategy then?
  • How do we handle lost messages? Do we wait for fresh data, request a resend, or do we build some kind of reliability layer that ensures messages cannot be lost? What if that layer itself crashes?
  • What if we need to use a different network transport. Say, multicast instead of TCP unicast? Or IPv6? Do we need to rewrite the applications, or is the transport abstracted in some layer?
  • How do we route messages? Can we send the same message to multiple peers? Can we send replies back to an original requester?
  • How do we write an API for another language? Do we re-implement a wire-level protocol or do we repackage a library? If the former, how can we guarantee efficient and stable stacks? If the latter, how can we guarantee interoperability?
  • How do we represent data so that it can be read between different architectures? Do we enforce a particular encoding for data types? How far is this the job of the messaging system rather than a higher layer?
  • How do we handle network errors? Do we wait and retry, ignore them silently, or abort?

Take a typical open source project like Hadoop Zookeeper and read the C API code in src/c/src/zookeeper.c. It's 3,200 lines of mystery and in there is an undocumented, client-server network communication protocol. I see it's efficient because it uses poll() instead of select(). But really, Zookeeper should be using a generic messaging layer and an explicitly documented wire level protocol. It is incredibly wasteful for teams to be building this particular wheel over and over.

Figure 7 - Messaging as it Starts

fig7.png

But how to make a reusable messaging layer? Why, when so many projects need this technology, are people still doing it the hard way, by driving TCP sockets in their code, and solving the problems in that long list, over and over?

It turns out that building reusable messaging systems is really difficult, which is why few FOSS projects ever tried, and why commercial messaging products are complex, expensive, inflexible, and brittle. In 2006 iMatix designed AMQP which started to give FOSS developers perhaps the first reusable recipe for a messaging system. AMQP works better than many other designs but remains relatively complex, expensive, and brittle. It takes weeks to learn to use, and months to create stable architectures that don't crash when things get hairy.

Most messaging projects, like AMQP, that try to solve this long list of problems in a reusable way do so by inventing a new concept, the "broker", that does addressing, routing, and queuing. This results in a client-server protocol or a set of APIs on top of some undocumented protocol, that let applications speak to this broker. Brokers are an excellent thing in reducing the complexity of large networks. But adding broker-based messaging to a product like Zookeeper would make it worse, not better. It would mean adding an additional big box, and a new single point of failure. A broker rapidly becomes a bottleneck and a new risk to manage. If the software supports it, we can add a second, third, fourth broker and make some fail-over scheme. People do this. It creates more moving pieces, more complexity, more things to break.

And a broker-centric set-up needs its own operations team. You literally need to watch the brokers day and night, and beat them with a stick when they start misbehaving. You need boxes, and you need backup boxes, and you need people to manage those boxes. It is only worth doing for large applications with many moving pieces, built by several teams of people, over several years.

So small to medium application developers are trapped. Either they avoid network programming, and make monolithic applications that do not scale. Or they jump into network programming and make brittle, complex applications that are hard to maintain. Or they bet on a messaging product, and end up with scalable applications that depend on expensive, easily broken technology. There has been no really good choice, which is maybe why messaging is largely stuck in the last century and stirs strong emotions. Negative ones for users, gleeful joy for those selling support and licenses.

Figure 8 - Messaging as it Becomes

fig8.png

What we need is something that does the job of messaging but does it in such a simple and cheap way that it can work in any application, with close to zero cost. It should be a library that you just link with, without any other dependencies. No additional moving pieces, so no additional risk. It should run on any OS and work with any programming language.

And this is ØMQ: an efficient, embeddable library that solves most of the problems an application needs to become nicely elastic across a network, without much cost.

Specifically:

  • It handles I/O asynchronously, in background threads. These communicate with application threads using lock-free data structures, so ØMQ applications need no locks, semaphores, or other wait states.
  • Components can come and go dynamically and ØMQ will automatically reconnect. This means you can start components in any order. You can create "service-oriented architectures" (SOAs) where services can join and leave the network at any time.
  • It queues messages automatically when needed. It does this intelligently, pushing messages as close as possible to the receiver before queuing them.
  • It has ways of dealing with over-full queues (called "high water mark"). When a queue is full, ØMQ automatically blocks senders, or throws away messages, depending on the kind of messaging you are doing (the so-called "pattern").
  • It lets your applications talk to each other over arbitrary transports: TCP, multicast, in-process, inter-process. You don't need to change your code to use a different transport.
  • It handles slow/blocked readers safely, using different strategies that depend on the messaging pattern.
  • It lets you route messages using a variety of patterns such as request-reply and publish-subscribe. These patterns are how you create the topology, the structure of your network.
  • It lets you place pattern-extending "devices" (small brokers) in the network when you need to reduce the complexity of interconnecting many pieces.
  • It delivers whole messages exactly as they were sent, using a simple framing on the wire. If you write a 10k message, you will receive a 10k message.
  • It does not impose any format on messages. They are blobs of zero to gigabytes large. When you want to represent data you choose some other product on top, such as Google's protocol buffers, XDR, and others.
  • It handles network errors intelligently. Sometimes it retries, sometimes it tells you an operation failed.
  • It reduces your carbon footprint. Doing more with less CPU means your boxes use less power, and you can keep your old boxes in use for longer. Al Gore would love ØMQ.

Actually ØMQ does rather more than this. It has a subversive effect on how you develop network-capable applications. Superficially it's just a socket API on which you do zmq_recv(3) and zmq_send(3). But message processing rapidly becomes the central loop, and your application soon breaks down into a set of message processing tasks. It is elegant and natural. And it scales: each of these tasks maps to a node, and the nodes talk to each other across arbitrary transports. Two nodes in one process (node is a thread), two nodes on one box (node is a process), or two nodes on one network (node is a box). With no application code changes.

Socket Scalability

Let's see ØMQ's scalability in action. Here is a shell script that starts the weather server and then a bunch of clients in parallel:

wuserver &
wuclient 12345 &
wuclient 23456 &
wuclient 34567 &
wuclient 45678 &
wuclient 56789 &

As the clients run, we take a look at the active processes using 'top', and we see something like (on a 4-core box):

  PID USER      PR  NI  VIRT  RES  SHR S %CPU %MEM    TIME+  COMMAND
 7136 ph        20   0 1040m 959m 1156 R  157 12.0  16:25.47 wuserver
 7966 ph        20   0 98608 1804 1372 S   33  0.0   0:03.94 wuclient
 7963 ph        20   0 33116 1748 1372 S   14  0.0   0:00.76 wuclient
 7965 ph        20   0 33116 1784 1372 S    6  0.0   0:00.47 wuclient
 7964 ph        20   0 33116 1788 1372 S    5  0.0   0:00.25 wuclient
 7967 ph        20   0 33072 1740 1372 S    5  0.0   0:00.35 wuclient

Let's think for a second about what is happening here. The weather server has a single socket, and yet here we have it sending data to five clients in parallel. We could have thousands of concurrent clients. The server application doesn't see them, doesn't talk to them directly.

Missing Message Problem Solver

As you start to program with ØMQ you will come across one problem more than once: you lose messages that you expect to receive. Here is a basic problem solver that walks through the most common causes for this. Don't worry if some of the terminology is unfamiliar still, it'll become clearer in the next chapters.

Figure 9 - Missing Message Problem Solver

fig9.png

If you're using ØMQ in a context where failures are expensive, then you want to plan properly. First, build prototypes that let you learn and test the different aspects of your design. Stress them until they break, so that you know exactly how strong your designs are. Second, invest in testing. This means building test frameworks, ensuring you have access to realistic setups with sufficient computer power, and getting time or help to actually test seriously. Ideally, one team writes the code, a second team tries to break it. Lastly, do get your organization to contact iMatix to discuss how we can help to make sure things work properly, and can be fixed rapidly if they break.

In short: if you have not proven an architecture works in realistic conditions, it will most likely break at the worst possible moment.

Warning - Unstable Paradigms!

Traditional network programming is built on the general assumption that one socket talks to one connection, one peer. There are multicast protocols but they are exotic. When we assume "one socket = one connection", we scale our architectures in certain ways. We create threads of logic where each thread works with one socket, one peer. We place intelligence and state in these threads.

In the ØMQ universe, sockets are clever multithreaded applications that manage a whole set of connections automagically for you. You can't see, work with, open, close, or attach state to these connections. Whether you use blocking send or receive, or poll, all you can talk to is the socket, not the connections it manages for you. The connections are private and invisible, and this is the key to ØMQ's scalability.

Because your code, talking to a socket, can then handle any number of connections across whatever network protocols are around, without change. A messaging pattern sitting in ØMQ can scale more cheaply than a messaging pattern sitting in your application code.

So the general assumption no longer applies. As you read the code examples, your brain will try to map them to what you know. You will read "socket" and think "ah, that represents a connection to another node". That is wrong. You will read "thread" and your brain will again think, "ah, a thread represents a connection to another node", and again your brain will be wrong.

If you're reading this Guide for the first time, realize that until you actually write ØMQ code for a day or two (and maybe three or four days), you may feel confused, especially by how simple ØMQ makes things for you, and you may try to impose that general assumption on ØMQ, and it won't work. And then you will experience your moment of enlightenment and trust, that zap-pow-kaboom satori paradigm-shift moment when it all becomes clear.

Chapter Two - Intermediate Stuff

In Chapter One we took ØMQ for a drive, with some basic examples of the main ØMQ patterns: request-reply, publish-subscribe, and pipeline. In this chapter we're going to get our hands dirty and start to learn how to use these tools in real programs.

We'll cover:

  • How to create and work with ØMQ sockets.
  • How to send and receive messages on sockets.
  • How to build your apps around ØMQ's asynchronous I/O model.
  • How to handle multiple sockets in one thread.
  • How to handle fatal and non-fatal errors properly.
  • How to handle interrupt signals like Ctrl-C.
  • How to shutdown a ØMQ application cleanly.
  • How to check a ØMQ application for memory leaks.
  • How to send and receive multipart messages.
  • How to forward messages across networks.
  • How to build a simple message queuing broker.
  • How to write multithreaded applications with ØMQ.
  • How to use ØMQ to signal between threads.
  • How to use ØMQ to coordinate a network of nodes.
  • How to create and use message envelopes for publish-subscribe.
  • Using the high-water mark (HWM) to protect against memory overflows.

The Zen of Zero

The Ø in ØMQ is all about tradeoffs. On the one hand this strange name lowers ØMQ's visibility on Google and Twitter. On the other hand it annoys the heck out of some Danish folk who write us things like "ØMG røtfl", and "Ø is not a funny looking zero!" and "Rødgrød med Fløde!", which is apparently an insult that means "may your neighbours be the direct descendants of Grendel!" Seems like a fair trade.

Originally the zero in ØMQ was meant as "zero broker" and (as close to) "zero latency" (as possible). In the meantime it has come to cover different goals: zero administration, zero cost, zero waste. More generally, "zero" refers to the culture of minimalism that permeates the project. We add power by removing complexity rather than exposing new functionality.

The Socket API

To be perfectly honest, ØMQ does a kind of switch-and-bait on you. Which we don't apologize for, it's for your own good and hurts us more than it hurts you. It presents a familiar BSD socket API but that hides a bunch of message-processing machines that will slowly fix your world-view about how to design and write distributed software.

Sockets are the de-facto standard API for network programming, as well as being useful for stopping your eyes from falling onto your cheeks. One thing that makes ØMQ especially tasty to developers is that it uses a standard socket API. Kudos to Martin Sustrik for pulling this idea off. It turns "Message Oriented Middleware", a phrase guaranteed to send the whole room off to Catatonia, into "Extra Spicy Sockets!" which leaves us with a strange craving for pizza, and a desire to know more.

Like a nice pepperoni pizza, ØMQ sockets are easy to digest. Sockets have a life in four parts, just like BSD sockets:

  • Creating and destroying sockets, which go together to form a karmic circle of socket life (see zmq_socket(3), zmq_close(3)).
  • Configuring sockets by setting options on them, and checking them if necessary (see zmq_setsockopt(3), zmq_getsockopt(3)).
  • Plugging sockets onto the network topology by creating ØMQ connections to and from them (see zmq_bind(3), zmq_connect(3)).
  • Using the sockets to carry data by writing and reading messages on them (see zmq_send(3), zmq_recv(3)).

Which looks like this, in C:

static void *
worker_thread (void *arg) {
    void *context = arg;
    void *worker = zmq_socket (context, ZMQ_REP);
    assert (worker);
    int rc;
    rc = zmq_connect (worker, "ipc://worker");
    assert (rc == 0);

    void *broadcast = zmq_socket (context, ZMQ_PUB);
    assert (broadcast);
    rc = zmq_bind (broadcast, "ipc://publish");
    assert (rc == 0);

    while (1) {
        char *part1 = s_recv (worker);
        char *part2 = s_recv (worker);
        printf ("Worker got [%s][%s]\n", part1, part2);
        s_sendmore (broadcast, "msg");
        s_sendmore (broadcast, part1);
        s_send (broadcast, part2);
        free (part1);
        free (part2);

        s_send (worker, "OK");
    }
    return NULL;
}

Note that sockets are always void pointers, and messages (which we'll come to very soon) are structures. So in C you pass sockets as-such, but you pass addresses of messages in all functions that work with messages, like zmq_send(3) and zmq_recv(3). As a mnemonic, realize that "in ØMQ all your sockets are belong to us", but messages are things you actually own in your code.
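To make that concrete, here is a minimal illustrative fragment (not from the examples; the endpoint is just a placeholder) showing the calling conventions: the socket goes in as a plain void pointer, the message always by address:

void *context = zmq_ctx_new ();
void *socket = zmq_socket (context, ZMQ_REQ);   //  A socket is just a void pointer
zmq_connect (socket, "tcp://localhost:5555");

zmq_msg_t message;                              //  A message is a structure you own
zmq_msg_init_size (&message, 5);
memcpy (zmq_msg_data (&message), "Hello", 5);
zmq_msg_send (&message, socket, 0);             //  Socket as-is, message by address
zmq_msg_close (&message);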

Creating, destroying, and configuring sockets works as you'd expect for any object. But remember that ØMQ is an asynchronous, elastic fabric. This has some impact on how we plug sockets into the network topology, and how we use the sockets after that.

Plugging Sockets Into the Topology

topprevnext

To create a connection between two nodes you use zmq_bind(3) in one node, and zmq_connect(3) in the other. As a general rule of thumb, the node which does zmq_bind(3) is a "server", sitting on a well-known network address, and the node which does zmq_connect(3) is a "client", with unknown or arbitrary network addresses. Thus we say that we "bind a socket to an endpoint" and "connect a socket to an endpoint", the endpoint being that well-known network address.
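As a minimal sketch (assuming a context already exists, and using a placeholder endpoint), the server side binds and the client side connects to the same endpoint:

//  The "server" node, sitting on a well-known address
void *responder = zmq_socket (context, ZMQ_REP);
zmq_bind (responder, "tcp://*:5555");

//  The "client" node, wherever it happens to run
void *requester = zmq_socket (context, ZMQ_REQ);
zmq_connect (requester, "tcp://localhost:5555");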

ØMQ connections are somewhat different from old-fashioned TCP connections. The main notable differences are:

  • They exist when a client does zmq_connect(3) to an endpoint, whether or not a server has already done zmq_bind(3) to that endpoint.
  • They are asynchronous, and have queues that magically exist where and when needed.
  • They may express a certain "messaging pattern", according to the type of socket used at each end.
  • One socket may have many outgoing and many incoming connections.
  • There is no zmq_accept() method. When a socket is bound to an endpoint it automatically starts accepting connections.
  • Your application code cannot work with these connections directly; they are encapsulated under the socket.

Many architectures follow some kind of client-server model, where the server is the component that is most stable, and the clients are the components that are most dynamic, i.e. they come and go the most. There are sometimes issues of addressing: servers will be visible to clients, but not necessarily vice-versa. So mostly it's obvious which node should be doing zmq_bind(3) (the server) and which should be doing zmq_connect(3) (the client). It also depends on the kind of sockets you're using, with some exceptions for unusual network architectures. We'll look at socket types later.

Now, imagine we start the client before we start the server. In traditional networking we get a big red Fail flag. But ØMQ lets us start and stop pieces arbitrarily. As soon as the client node does zmq_connect(3) the connection exists and that node can start to write messages to the socket. At some stage (hopefully before messages queue up so much that they start to get discarded, or the client blocks), the server comes alive, does a zmq_bind(3) and ØMQ starts to deliver messages.

A server node can bind to many endpoints and it can do this using a single socket. This means it will accept connections across different transports:

zmq_bind (socket, "tcp://*:5555");
zmq_bind (socket, "tcp://*:9999");
zmq_bind (socket, "ipc://myserver.ipc");

You cannot bind to the same endpoint twice; that will cause an exception.

Each time a client node does a zmq_connect(3) to any of these endpoints, the server node's socket gets another connection. There is no inherent limit to how many connections a socket can have. A client node can also connect to many endpoints using a single socket.
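For example, the client-side mirror of the server shown above might connect one socket to all three endpoints (the hostnames here are just placeholders):

zmq_connect (socket, "tcp://server-a:5555");
zmq_connect (socket, "tcp://server-b:9999");
zmq_connect (socket, "ipc://myserver.ipc");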

In most cases, which node acts as client, and which as server, is about network topology rather than message flow. However, there are cases (resending when connections are broken) where the same socket type will behave differently if it's a server or if it's a client.

What this means is that you should always think in terms of "servers" as stable parts of your topology, with more-or-less fixed endpoint addresses, and "clients" as dynamic parts that come and go. Then, design your application around this model. The chances that it will "just work" are much better like that.

Sockets have types. The socket type defines the semantics of the socket, its policies for routing messages inwards and outwards, queuing, etc. You can connect certain types of socket together, e.g. a publisher socket and a subscriber socket. Sockets work together in "messaging patterns". We'll look at this in more detail later.

It's the ability to connect sockets in these different ways that gives ØMQ its basic power as a message queuing system. There are layers on top of this, such as devices and topic routing, which we'll get to later. But essentially, with ØMQ you define your network architecture by plugging pieces together like a child's construction toy.

Using Sockets to Carry Data

topprevnext

To send and receive messages you use the zmq_send(3) and zmq_recv(3) methods. The names are conventional but ØMQ's I/O model is different enough from the TCP model that you will need time to get your head around it.

Figure 10 - TCP sockets are 1 to 1

fig10.png

Let's look at the main differences between TCP sockets and ØMQ sockets when it comes to carrying data:

  • ØMQ sockets carry messages, rather than bytes (as in TCP) or frames (as in UDP). A message is a length-specified blob of binary data. We'll come to messages shortly, their design is optimized for performance and thus somewhat tricky to understand.
  • ØMQ sockets do their I/O in a background thread. This means that messages arrive in a local input queue, and are sent from a local output queue, no matter what your application is busy doing. These are configurable memory queues, by the way.
  • ØMQ sockets can, depending on the socket type, be connected to (or from, it's the same) many other sockets. Where TCP emulates a one-to-one phone call, ØMQ implements one-to-many (like a radio broadcast), many-to-many (like a post office), many-to-one (like a mail box), and even one-to-one.
  • ØMQ sockets can send to many endpoints (creating a fan-out model), or receive from many endpoints (creating a fan-in model).

Figure 11 - ØMQ Sockets are N to N

fig11.png

So writing a message to a socket may send the message to one or many other places at once, and conversely, one socket will collect messages from all connections sending messages to it. The zmq_recv(3) method uses a fair-queuing algorithm so each sender gets an even chance.

The zmq_send(3) method does not actually send the message to the socket connection(s). It queues the message so that the I/O thread can send it asynchronously. It does not block except in some exception cases. So the message is not necessarily sent when zmq_send(3) returns to your application. If you created a message using zmq_msg_init_data(3) you cannot reuse the data or free it, otherwise the I/O thread will rapidly find itself writing overwritten or unallocated garbage. This is a common mistake for beginners. We'll see a little later how to properly work with messages.

Unicast Transports

topprevnext

ØMQ provides a set of unicast transports (inproc, ipc, and tcp) and multicast transports (epgm, pgm). Multicast is an advanced technique that we'll come to later. Don't even start using it unless you know that your fanout ratios will make 1-to-N unicast impossible.

For most common cases, use tcp, which is a disconnected TCP transport. It is elastic, portable, and fast enough for most cases. We call this 'disconnected' because ØMQ's tcp transport doesn't require that the endpoint exists before you connect to it. Clients and servers can connect and bind at any time, can go and come back, and it remains transparent to applications.

The inter-process transport, ipc, is like tcp except that it is abstracted from the LAN, so you don't need to specify IP addresses or domain names. This makes it better for some purposes, and we use it quite often in the examples in this book. ØMQ's ipc transport is disconnected, like tcp. It has one limitation: it does not work on Windows. This may be fixed in future versions of ØMQ. By convention we use endpoint names with an ".ipc" extension to avoid potential conflict with other file names. On UNIX systems, if you use ipc endpoints you need to create these with appropriate permissions otherwise they may not be shareable between processes running under different user ids. You must also make sure all processes can access the files, e.g. by running in the same working directory.

The inter-thread transport, inproc, is a connected signaling transport. It is much faster than tcp or ipc. This transport has a specific limitation compared to ipc and tcp: you must do bind before connect. This is something future versions of ØMQ may fix, but at present it defines how you use inproc sockets: we create and bind one socket, then start the child threads, which create and connect the other sockets.
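A minimal sketch of that ordering, assuming a hypothetical worker_routine thread function and a placeholder inproc name:

//  In the main thread: create and bind the inproc socket first...
void *pipe = zmq_socket (context, ZMQ_PAIR);
zmq_bind (pipe, "inproc://pipe");

//  ...and only then start the child thread, which connects
pthread_t thread;
pthread_create (&thread, NULL, worker_routine, context);

//  Inside worker_routine, sharing the same context:
//      void *pipe = zmq_socket (context, ZMQ_PAIR);
//      zmq_connect (pipe, "inproc://pipe");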

ØMQ is Not a Neutral Carrier

topprevnext

A common question that newcomers to ØMQ ask (it's one I asked myself) is something like, "how do I write an XYZ server in ØMQ?" For example, "how do I write an HTTP server in ØMQ?"

The implication is that if we use normal sockets to carry HTTP requests and responses, we should be able to use ØMQ sockets to do the same, only much faster and better.

Sadly the answer is "this is not how it works". ØMQ is not a neutral carrier, it imposes a framing on the transport protocols it uses. This framing is not compatible with existing protocols, which tend to use their own framing. For example, compare an HTTP request, and a ØMQ request, both over TCP/IP.

Figure 12 - HTTP On the Wire

fig12.png

The HTTP request uses CR-LF as its simplest framing delimiter, whereas ØMQ uses a length-specified frame.

Figure 13 - ØMQ On the Wire

fig13.png

So you could write an HTTP-like protocol using ØMQ, using for example the request-reply socket pattern. But it would not be HTTP.

There is however a good answer to the question, "How can I make profitable use of ØMQ when making my new XYZ server?" You need to implement whatever protocol you want to speak in any case, but you can connect that protocol server (which can be extremely thin) to a ØMQ backend that does the real work. The beautiful part here is that you can then extend your backend with code in any language, running locally or remotely, as you wish. Zed Shaw's Mongrel2 web server is a great example of such an architecture.

I/O Threads

topprevnext

We said that ØMQ does I/O in a background thread. One I/O thread (for all sockets) is sufficient for all but the most extreme applications. This is the magic '1' that we use when creating a context, meaning "use one I/O thread":

void *context = zmq_init (1);

There is a major difference between a ØMQ application and a conventional networked application, which is that you don't create one socket per connection. One socket handles all incoming and outgoing connections for a particular point of work. E.g. when you publish to a thousand subscribers, it's via one socket. When you distribute work among twenty services, it's via one socket. When you collect data from a thousand web applications, it's via one socket.

This has a fundamental impact on how you write applications. A traditional networked application has one process or one thread per remote connection, and that process or thread handles one socket. ØMQ lets you collapse this entire structure into a single thread, and then break it up as necessary for scaling.

Core Messaging Patterns

topprevnext

Underneath the brown paper wrapping of ØMQ's socket API lies the world of messaging patterns. If you have a background in enterprise messaging, these will be vaguely familiar. But to most ØMQ newcomers they come as a surprise, because we're so used to the TCP paradigm where a socket represents another node.

Let's recap briefly what ØMQ does for you. It delivers blobs of data (messages) to nodes, quickly and efficiently. You can map nodes to threads, processes, or boxes. It gives your applications a single socket API to work with, no matter what the actual transport (like in-process, inter-process, TCP, or multicast). It automatically reconnects to peers as they come and go. It queues messages at both sender and receiver, as needed. It manages these queues carefully to ensure processes don't run out of memory, overflowing to disk when appropriate. It handles socket errors. It does all I/O in background threads. It uses lock-free techniques for talking between nodes, so there are never locks, waits, semaphores, or deadlocks.

But cutting through that, it routes and queues messages according to precise recipes called patterns. It is these patterns that provide ØMQ's intelligence. They encapsulate our hard-earned experience of the best ways to distribute data and work. ØMQ's patterns are hard-coded but future versions may allow user-definable patterns.

ØMQ patterns are implemented by pairs of sockets with matching types. In other words, to understand ØMQ patterns you need to understand socket types and how they work together. Mostly this just takes learning; there is little that is obvious at this level.

The built-in core ØMQ patterns are:

  • Request-reply, which connects a set of clients to a set of services. This is a remote procedure call and task distribution pattern.
  • Publish-subscribe, which connects a set of publishers to a set of subscribers. This is a data distribution pattern.
  • Pipeline, which connects nodes in a fan-out/fan-in pattern that can have multiple steps and loops. This is a parallel task distribution and collection pattern.

We looked at each of these in the first chapter. There's one more pattern that people tend to try to use when they still think of ØMQ in terms of traditional TCP sockets:

  • Exclusive pair, which connects two sockets in an exclusive pair. This is a low-level pattern for specific, advanced use-cases. We'll see an example at the end of this chapter.

The zmq_socket(3) man page is fairly clear about the patterns; it's worth reading several times until it starts to make sense. We'll look at each pattern and the use cases it covers.

These are the socket combinations that are valid for a connect-bind pair (either side can bind):

  • PUB and SUB
  • REQ and REP
  • REQ and ROUTER
  • DEALER and REP
  • DEALER and ROUTER
  • DEALER and DEALER
  • ROUTER and ROUTER
  • PUSH and PULL
  • PAIR and PAIR

Any other combination will produce undocumented and unreliable results and future versions of ØMQ will probably return errors if you try them. You can and will of course bridge other socket types via code, i.e. read from one socket type and write to another.
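Bridging is nothing more exotic than a loop that reads on one socket and writes on the other. A minimal single-frame sketch (socket names are placeholders, and multipart handling is ignored for brevity; we cover that later in this chapter), assuming a PULL socket called receiver and a PUB socket called publisher:

while (1) {
    zmq_msg_t message;
    zmq_msg_init (&message);
    zmq_msg_recv (&message, receiver, 0);       //  Read from the PULL side
    zmq_msg_send (&message, publisher, 0);      //  Re-publish on the PUB side
    zmq_msg_close (&message);
}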

High-level Messaging Patterns

topprevnext

These four core patterns are cooked into ØMQ. They are part of the ØMQ API, implemented in the core C++ library, and guaranteed to be available in all fine retail stores. If one day the Linux kernel includes ØMQ, for example, these patterns would be there.

On top, we add high-level patterns. We build these high-level patterns on top of ØMQ and implement them in whatever language we're using for our application. They are not part of the core library, do not come with the ØMQ package, and exist in their own space, as part of the ØMQ community.

One of the things we aim to provide in this Guide is a set of such high-level patterns, ranging from small (how to handle messages sanely) to large (how to make a reliable publish-subscribe architecture).

Working with Messages

topprevnext

On the wire, ØMQ messages are blobs of any size from zero upwards, fitting in memory. You do your own serialization using Google Protocol Buffers, XDR, JSON, or whatever else your applications need to speak. It's wise to choose a data representation that is portable and fast, but you can make your own decisions about trade-offs.

In memory, ØMQ messages are zmq_msg_t structures (or classes depending on your language). Here are the basic ground rules for using ØMQ messages in C:

  • You create and pass around zmq_msg_t objects, not blocks of data.
  • To write a message from new data, you use zmq_msg_init_size(3) to create a message and at the same time allocate a block of data of some size. You then fill that data using memcpy(3), and pass the message to zmq_send(3).
  • To release (not destroy) a message you call zmq_msg_close(3). This drops a reference, and eventually ØMQ will destroy the message.

Here is a typical chunk of code working with messages, which should be familiar if you have been paying attention. This is from the zhelpers.h file we use in all the examples:

// Receive 0MQ string from socket and convert into C string
static char *
s_recv (void *socket) {
    zmq_msg_t message;
    zmq_msg_init (&message);
    int size = zmq_msg_recv (&message, socket, 0);
    if (size == -1)
        return NULL;
    char *string = malloc (size + 1);
    memcpy (string, zmq_msg_data (&message), size);
    zmq_msg_close (&message);
    string [size] = 0;
    return (string);
}

// Convert C string to 0MQ string and send to socket
static int
s_send (void *socket, char *string) {
    zmq_msg_t message;
    zmq_msg_init_size (&message, strlen (string));
    memcpy (zmq_msg_data (&message), string, strlen (string));
    int size = zmq_msg_send (&message, socket, 0);
    zmq_msg_close (&message);
    return (size);
}

You can easily extend this code to send and receive blobs of arbitrary length.

Note that when you have passed a message to zmq_send(3), ØMQ will clear the message, i.e., set its size to zero. You cannot send the same message twice, and you cannot access the message data after sending it.

If you want to send the same message more than once, create a second message, initialize it using zmq_msg_init(3) and then use zmq_msg_copy(3) to create a copy of the first message. This does not copy the data but the reference. You can then send the message twice (or more, if you create more copies) and the message will only be finally destroyed when the last copy is sent or closed.
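Here is a minimal sketch of that copy-and-send-twice technique (the socket name is a placeholder):

zmq_msg_t original, copy;
zmq_msg_init_size (&original, 5);
memcpy (zmq_msg_data (&original), "Hello", 5);

zmq_msg_init (&copy);
zmq_msg_copy (&copy, &original);        //  Copies the reference, not the data

zmq_msg_send (&copy, socket, 0);        //  Send the copy first...
zmq_msg_send (&original, socket, 0);    //  ...then the original; the data is freed after the last send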

ØMQ also supports multipart messages, which let you handle a list of blobs as a single message. This is widely used in real applications and we'll look at that later in this chapter and in Chapter Three.

Some other things that are worth knowing about messages:

  • ØMQ sends and receives them atomically, i.e. you get a whole message, or you don't get it at all.
  • ØMQ does not send a message right away but at some indeterminate later time.
  • You can send zero-length messages, e.g. for sending a signal from one thread to another.
  • A message must fit in memory. If you want to send files of arbitrary sizes, you should break them into pieces and send each piece as a separate message.
  • You must call zmq_msg_close(3) when finished with a message, in languages that don't automatically destroy objects when a scope closes.

And to be necessarily repetitive, do not use zmq_msg_init_data(3), yet. This is a zero-copy method and guaranteed to create trouble for you. There are far more important things to learn about ØMQ before you start to worry about shaving off microseconds.

Handling Multiple Sockets

topprevnext

In all the examples so far, the main loop has been:

  1. wait for message on socket
  2. process message
  3. repeat

What if we want to read from multiple sockets at the same time? The simplest way is to connect one socket to multiple endpoints and get ØMQ to do the fan-in for us. This is legal if the remote endpoints are in the same pattern, but it would be illegal to, for example, connect a PULL socket to a PUB endpoint. Fun, but illegal. If you start mixing patterns you break future scalability.

The right way is to use zmq_poll(3). An even better way might be to wrap zmq_poll(3) in a framework that turns it into a nice event-driven reactor, but it's significantly more work than we want to cover here.

Let's start with a dirty hack, partly for the fun of not doing it right, but mainly because it lets me show you how to do non-blocking socket reads. Here is a simple example of reading from two sockets using non-blocking reads. This rather confused program acts both as a subscriber to weather updates, and a worker for parallel tasks:

// Reading from multiple sockets
// This version uses a simple recv loop

#include "zhelpers.h"

int main (void)
{
    // Connect to task ventilator
    void *context = zmq_ctx_new ();
    void *receiver = zmq_socket (context, ZMQ_PULL);
    zmq_connect (receiver, "tcp://localhost:5557");

    // Connect to weather server
    void *subscriber = zmq_socket (context, ZMQ_SUB);
    zmq_connect (subscriber, "tcp://localhost:5556");
    zmq_setsockopt (subscriber, ZMQ_SUBSCRIBE, "10001 ", 6);

    // Process messages from both sockets
    // We prioritize traffic from the task ventilator
    while (1) {
        char msg [256];
        while (1) {
            int size = zmq_recv (receiver, msg, 255, ZMQ_DONTWAIT);
            if (size != -1) {
                // Process task
            }
            else
                break;
        }
        while (1) {
            int size = zmq_recv (subscriber, msg, 255, ZMQ_DONTWAIT);
            if (size != -1) {
                // Process weather update
            }
            else
                break;
        }
        // No activity, so sleep for 1 msec
        s_sleep (1);
    }
    zmq_close (receiver);
    zmq_close (subscriber);
    zmq_ctx_destroy (context);
    return 0;
}

msreader.c: Multiple socket reader

The cost of this approach is some additional latency on the first message (the sleep at the end of the loop, when there are no waiting messages to process). This would be a problem in applications where sub-millisecond latency was vital. Also, you need to check the documentation for nanosleep() or whatever function you use to make sure it does not busy-loop.

You can treat the sockets fairly by reading first from one, then the second rather than prioritizing them as we did in this example. This is called "fair-queuing", something that ØMQ does automatically when one socket receives messages from more than one source.

Now let's see the same little senseless application done right, using zmq_poll(3):

// Reading from multiple sockets
// This version uses zmq_poll()

#include "zhelpers.h"

int main (void)
{
    // Connect to task ventilator
    void *context = zmq_ctx_new ();
    void *receiver = zmq_socket (context, ZMQ_PULL);
    zmq_connect (receiver, "tcp://localhost:5557");

    // Connect to weather server
    void *subscriber = zmq_socket (context, ZMQ_SUB);
    zmq_connect (subscriber, "tcp://localhost:5556");
    zmq_setsockopt (subscriber, ZMQ_SUBSCRIBE, "10001 ", 6);

    // Process messages from both sockets
    while (1) {
        char msg [256];
        zmq_pollitem_t items [] = {
            { receiver, 0, ZMQ_POLLIN, 0 },
            { subscriber, 0, ZMQ_POLLIN, 0 }
        };
        zmq_poll (items, 2, -1);
        if (items [0].revents & ZMQ_POLLIN) {
            int size = zmq_recv (receiver, msg, 255, 0);
            if (size != -1) {
                // Process task
            }
        }
        if (items [1].revents & ZMQ_POLLIN) {
            int size = zmq_recv (subscriber, msg, 255, 0);
            if (size != -1) {
                // Process weather update
            }
        }
    }
    // We never get here, but clean up anyhow
    zmq_close (receiver);
    zmq_close (subscriber);
    zmq_ctx_destroy (context);
    return 0;
}

mspoller.c: Multiple socket poller

Handling Errors and ETERM

topprevnext

ØMQ's error handling philosophy is a mix of fail-fast and resilience. Processes, we believe, should be as vulnerable as possible to internal errors, and as robust as possible against external attacks and errors. To give an analogy, a living cell will self-destruct if it detects a single internal error, yet it will resist attack from the outside by all means possible. Assertions, which pepper the ØMQ code, are absolutely vital to robust code, they just have to be on the right side of the cellular wall. And there should be such a wall. If it is unclear whether a fault is internal or external, that is a design flaw that needs to be fixed.

In C, assertions stop the application immediately with an error. In other languages you may get exceptions or halts.

When ØMQ detects an external fault it returns an error to the calling code. In some rare cases it drops messages silently, if there is no obvious strategy for recovering from the error. In a few places ØMQ still asserts on external faults, but these are considered bugs.

In most of the C examples we've seen so far there's been no error handling. Real code should do error handling on every single ØMQ call. If you're using a language binding other than C, the binding may handle errors for you. In C you do need to do this yourself. There are some simple rules, starting with POSIX conventions:

  • Methods that create objects will return NULL in case they fail.
  • Other methods will return 0 on success and other values (mostly -1) on an exceptional condition (usually failure).

There are two main exceptional conditions that you may want to handle as non-fatal:

  • When a thread calls zmq_recv(3) with the NOBLOCK option and there is no waiting data. ØMQ will return -1 and set errno to EAGAIN.
  • When a thread calls zmq_term(3) and other threads are doing blocking work. The zmq_term(3) call closes the context and all blocking calls exit with -1, and errno set to ETERM.
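For example, here is a minimal sketch (using the ZMQ_DONTWAIT flag, as the earlier examples do, and a placeholder socket) that treats both of these conditions as non-fatal:

zmq_msg_t message;
zmq_msg_init (&message);
if (zmq_msg_recv (&message, socket, ZMQ_DONTWAIT) == -1) {
    if (errno == EAGAIN) {
        //  No message waiting right now; try again later
    }
    else
    if (errno == ETERM) {
        //  The context was terminated; clean up and exit the thread
    }
    else
        assert (0);     //  In this sketch, anything else is a real error
}
zmq_msg_close (&message);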

What this boils down to is that in most cases you can use assertions on ØMQ calls, like this, in C:

void *context = zmq_init (1);
assert (context);
void *socket = zmq_socket (context, ZMQ_REP);
assert (socket);
int rc = zmq_bind (socket, "tcp://*:5555");
assert (rc == 0);

In the first version of this code I put the assert() call around the function. Not a good idea, since an optimized build will turn all assert() macros into no-ops and happily skip those function calls. Use a return code, and assert the return code.

Let's see how to shut down a process cleanly. We'll take the parallel pipeline example from the previous section. If we've started a whole lot of workers in the background, we now want to kill them when the batch is finished. Let's do this by sending a kill message to the workers. The best place to do this is the sink, since it really knows when the batch is done.

How do we connect the sink to the workers? The PUSH/PULL sockets are one-way only. The standard ØMQ answer is: create a new socket flow for each type of problem you need to solve. We'll use a publish-subscribe model to send kill messages to the workers:

  • The sink creates a PUB socket on a new endpoint.
  • Workers connect their input socket to this endpoint.
  • When the sink detects the end of the batch it sends a kill to its PUB socket.
  • When a worker detects this kill message, it exits.

It doesn't take much new code in the sink:

void *controller = zmq_socket (context, ZMQ_PUB);
zmq_bind (controller, "tcp://*:5559");
...
// Send kill signal to workers
s_send (controller, "KILL");

Figure 14 - Parallel Pipeline with Kill Signaling

fig14.png

Here is the worker process, which manages two sockets (a PULL socket getting tasks, and a SUB socket getting control commands) using the zmq_poll(3) technique we saw earlier:

package ;

import haxe.io.Bytes;
import neko.Lib;
import neko.Sys;
import org.zeromq.ZMQ;
import org.zeromq.ZMQContext;
import org.zeromq.ZMQPoller;
import org.zeromq.ZMQSocket;

/**
* Parallel Task worker with kill signalling in Haxe
* Connects PULL socket to tcp://localhost:5557
* Collects workloads from ventilator via that socket
* Connects PUSH socket to tcp://localhost:5558
* Sends results to sink via that socket
*
* See: http://zguide.zeromq.org/page:all#Handling-Errors-and-ETERM
*
* Based on code from: http://zguide.zeromq.org/java:taskwork2
*/

class TaskWork2
{

public static function main() {
var context:ZMQContext = ZMQContext.instance();

Lib.println("** TaskWork2 (see: http://zguide.zeromq.org/page:all#Handling-Errors-and-ETERM)");

// Socket to receive messages on
var receiver:ZMQSocket = context.socket(ZMQ_PULL);
receiver.connect("tcp://127.0.0.1:5557");

// Socket to send messages to
var sender:ZMQSocket = context.socket(ZMQ_PUSH);
sender.connect("tcp://127.0.0.1:5558");

// Socket to receive controller messages from
var controller:ZMQSocket = context.socket(ZMQ_SUB);
controller.connect("tcp://127.0.0.1:5559");
controller.setsockopt(ZMQ_SUBSCRIBE, Bytes.ofString(""));

var items:ZMQPoller = context.poller();
items.registerSocket(receiver, ZMQ.ZMQ_POLLIN());
items.registerSocket(controller, ZMQ.ZMQ_POLLIN());

var msgString:String;

// Process tasks forever
while (true) {
var numSocks = items.poll();
if (items.pollin(1)) {
// receiver socket has events
msgString = StringTools.trim(receiver.recvMsg().toString());
var sec:Float = Std.parseFloat(msgString) / 1000.0;
Lib.print(msgString + ".");

// Do the work
Sys.sleep(sec);

// Send results to sink
sender.sendMsg(Bytes.ofString(""));
}
if (items.pollin(2)) {
break; // Exit loop
}
}
receiver.close();
sender.close();
controller.close();
context.term();
}
}

taskwork2.hx: Parallel task worker with kill signaling

Here is the modified sink application. When it's finished collecting results it broadcasts a KILL message to all workers:

package ;

import haxe.io.Bytes;
import neko.Lib;
import neko.Sys;
import org.zeromq.ZMQ;
import org.zeromq.ZMQContext;
import org.zeromq.ZMQSocket;

/**
* Parallel Task sink with kill signalling in Haxe
* Binds PULL request socket to tcp://localhost:5558
* Collects results from workers via this socket
*
* See: http://zguide.zeromq.org/page:all#Handling-Errors-and-ETERM
*
* Based on http://zguide.zeromq.org/cs:tasksink2
*
* Use with TaskVent.hx and TaskWork2.hx
*/

class TaskSink2
{

public static function main() {
var context:ZMQContext = ZMQContext.instance();

Lib.println("** TaskSink2 (see: http://zguide.zeromq.org/page:all#Handling-Errors-and-ETERM)");

// Socket to receive messages on
var receiver:ZMQSocket = context.socket(ZMQ_PULL);
receiver.bind("tcp://127.0.0.1:5558");

// Socket to send control messages to workers
var controller:ZMQSocket = context.socket(ZMQ_PUB);
controller.bind("tcp://127.0.0.1:5559");

// Wait for start of batch
var msgString = StringTools.trim(receiver.recvMsg().toString());

// Start our clock now
var tStart = Sys.time();

// Process 100 messages
var task_nbr:Int;
for (task_nbr in 0...100) {
msgString = StringTools.trim(receiver.recvMsg().toString());
if (task_nbr % 10 == 0) {
Lib.println(":"); // Print a ":" every 10 messages
} else {
Lib.print(".");
}
}

// Calculate and report duration of batch
var tEnd = Sys.time();
Lib.println("Total elapsed time: " + Math.ceil((tEnd - tStart) * 1000) + " msec");

// Send kill signal to workers
controller.sendMsg(Bytes.ofString("KILL"));
Sys.sleep(1.0); // Give 0MQ time to deliver

// Shut down
receiver.close();
controller.close();
context.term();
}
}

tasksink2.hx: Parallel task sink with kill signaling

Handling Interrupt Signals

topprevnext

Realistic applications need to shutdown cleanly when interrupted with Ctrl-C or another signal such as SIGTERM. By default, these simply kill the process, meaning messages won't be flushed, files won't be closed cleanly, etc.

Here is how we handle a signal in various languages:

package ;

import haxe.io.Bytes;
import haxe.Stack;
import neko.Lib;
import org.zeromq.ZMQ;
import org.zeromq.ZMQContext;
import org.zeromq.ZMQSocket;
import org.zeromq.ZMQException;

/**
* Signal Handling
*
* Call
*/

class Interrupt
{

public static function main() {
var context:ZMQContext = ZMQContext.instance();
var receiver:ZMQSocket = context.socket(ZMQ_REP);
receiver.bind("tcp://127.0.0.1:5559");

Lib.println("** Interrupt (see: http://zguide.zeromq.org/page:all#Handling-Interrupt-Signals)");

ZMQ.catchSignals();

Lib.println ("\nPress Ctrl+C");

while (true) {
// Blocking read, will exit only on an interrupt (Ctrl+C)

try {
var msg:Bytes = receiver.recvMsg();
} catch (e:ZMQException) {
if (ZMQ.isInterrupted()) {
trace ("W: interrupt received, killing server …\n");
break;
}

// Handle other errors
trace("ZMQException #:" + e.errNo + ", str:" + e.str());
trace (Stack.toString(Stack.exceptionStack()));
}
}
// Close up gracefully
receiver.close();
context.term();


}
}

interrupt.hx: Handling Ctrl-C cleanly

The program provides s_catch_signals(), which traps Ctrl-C (SIGINT) and SIGTERM. When either of these signals arrives, the s_catch_signals() handler sets the global variable s_interrupted. Your application will not die automatically; you now have to explicitly check for an interrupt and handle it properly. Here's how:

  • Call s_catch_signals() (copy this from interrupt.c) at the start of your main code. This sets-up the signal handling.
  • If your code is blocking in zmq_recv(3), zmq_poll(3), or zmq_send(3), when a signal arrives, the call will return with EINTR.
  • Wrappers like s_recv() return NULL if they are interrupted.
  • So, your application checks for an EINTR return code, a NULL return, and/or s_interrupted.
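For reference, the handler provided by interrupt.c is just a few lines of standard POSIX signal handling, roughly like this (a sketch; it needs <signal.h>, and the real interrupt.c in the examples is the authoritative version):

static int s_interrupted = 0;
static void s_signal_handler (int signal_value)
{
    s_interrupted = 1;
}

static void s_catch_signals (void)
{
    struct sigaction action;
    action.sa_handler = s_signal_handler;
    action.sa_flags = 0;
    sigemptyset (&action.sa_mask);
    sigaction (SIGINT, &action, NULL);
    sigaction (SIGTERM, &action, NULL);
}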

Here is a typical code fragment:

s_catch_signals ();
client = zmq_socket (...);
while (!s_interrupted) {
    char *message = s_recv (client);
    if (!message)
        break;          //  Ctrl-C used
}
zmq_close (client);

If you call s_catch_signals() and don't test for interrupts, then your application will become immune to Ctrl-C and SIGTERM, which may be useful, but is usually not.

Detecting Memory Leaks

topprevnext

Any long-running application has to manage memory correctly, or eventually it'll use up all available memory and crash. If you use a language that handles this automatically for you, congratulations. If you program in C or C++ or any other language where you're responsible for memory management, here's a short tutorial on using valgrind, which among other things will report on any leaks your programs have.

  • To install valgrind, e.g. on Ubuntu or Debian: sudo apt-get install valgrind.
  • By default, ØMQ will cause valgrind to complain a lot. To remove these warnings, create a file valgrind.supp that contains this:
{
   <socketcall_sendto>
   Memcheck:Param
   socketcall.sendto(msg)
   fun:send
   ...
}
{
   <socketcall_sendto>
   Memcheck:Param
   socketcall.send(msg)
   fun:send
   ...
}
  • Fix your applications to exit cleanly after Ctrl-C. For any application that exits by itself, that's not needed, but for long-running applications (like devices), this is essential, otherwise valgrind will complain about all currently allocated memory.
  • Build your application with -DDEBUG, if it's not your default setting. That ensures valgrind can tell you exactly where memory is being leaked.
  • Finally, run valgrind thus:
valgrind --tool=memcheck --leak-check=full --suppressions=valgrind.supp someprog

And after fixing any errors it reported, you should get the pleasant message:

==30536== ERROR SUMMARY: 0 errors from 0 contexts...

Multipart Messages

topprevnext

ØMQ lets us compose a message out of several frames, giving us a 'multipart message'. Realistic applications use multipart messages heavily, especially to make "envelopes". We'll look at them later. What we'll learn now is simply how to safely (but blindly) read and write multipart messages because otherwise the devices we write won't work with applications that use multipart messages.

When you work with multipart messages, each part is a zmq_msg item. E.g. if you are sending a message with five parts, you must construct, send, and destroy five zmq_msg items. You can do this in advance (and store the zmq_msg items in an array or structure), or as you send them, one by one.

Here is how we send the frames in a multipart message, setting the ZMQ_SNDMORE flag on every frame except the last:

zmq_msg_send (&message, socket, ZMQ_SNDMORE);
...
zmq_msg_send (&message, socket, ZMQ_SNDMORE);
...
zmq_msg_send (&message, socket, 0);

Here is how we receive and process all the parts in a message, be it single part or multipart (we receive each frame into a message object):

while (1) {
    zmq_msg_t message;
    zmq_msg_init (&message);
    zmq_msg_recv (&message, socket, 0);
    // Process the message frame
    zmq_msg_close (&message);
    int more;
    size_t more_size = sizeof (more);
    zmq_getsockopt (socket, ZMQ_RCVMORE, &more, &more_size);
    if (!more)
        break; // Last message frame
}

Some things to know about multipart messages:

  • When you send a multipart message, none of the parts are actually sent until you send the final part.
  • If you are using zmq_poll(3), when you receive the first part of a message, all the rest has also arrived.
  • You will receive all parts of a message, or none at all.
  • Each part of a message is a separate zmq_msg item.
  • You will receive all parts of a message whether or not you check the RCVMORE option.
  • On sending, ØMQ queues the message frames in memory until it has the final frame, then sends them all.
  • There is no way to cancel a partially sent message, except by closing the socket.

Intermediates and Devices

topprevnext

Any connected set hits a complexity curve as the number of set members increases. A small number of members can all know about each other, but as the set gets larger the cost to each member of knowing all other interesting members grows linearly, and the overall cost of fully connecting the members grows quadratically. The solution is to break sets into smaller ones, and use intermediates to connect the sets.

This pattern is extremely common in the real world and is why our societies and economies are filled with intermediaries who have no other real function than to reduce the complexity and scaling costs of larger networks. Intermediaries are typically called wholesalers, distributors, managers, etc.

A ØMQ network, like any other, cannot grow beyond a certain size without needing intermediaries. In ØMQ, we call these "devices". When we use ØMQ we usually start building our applications as a set of nodes on a network with the nodes talking to each other, without intermediaries.

Figure 15 - Small-scale ØMQ Application

fig15.png

And then we extend the application across a wider network, placing devices in specific places and scaling up the number of nodes.

Figure 16 - Larger-scale ØMQ Application

fig16.png

ØMQ devices generally connect a set of 'frontend' sockets to a set of 'backend' sockets, though there are no strict design rules. They ideally run with no state, so that it becomes possible to stretch applications over as many intermediates as needed. You can run them as threads within a process, or as stand-alone processes. ØMQ provides some very basic devices but you will in practice develop your own.

ØMQ devices can do intermediation of addresses, services, queues, or any other abstraction you care to define above the message and socket layers. Different messaging patterns have different complexity issues and need different kinds of intermediation. For example, request-reply works well with queue and service abstractions, while publish-subscribe works well with streams or topics.

What's interesting about ØMQ as compared to traditional centralized brokers is that you can place devices precisely where you need them, and they can do the optimal intermediation.

A Publish-Subscribe Proxy Server

topprevnext

It is a common requirement to extend a publish-subscribe architecture over more than one network segment or transport. Perhaps there are a group of subscribers sitting at a remote location. Perhaps we want to publish to local subscribers via multicast, and to remote subscribers via TCP.

We're going to write a simple proxy server that sits in between a publisher and a set of subscribers, bridging two networks. This is perhaps the simplest case of a useful device. The device has two sockets, a frontend facing the internal network, where the weather server is sitting, and a backend facing subscribers on the external network. It subscribes to the weather service on the frontend socket, and republishes its data on the backend socket:

package ;

import haxe.io.Bytes;
import haxe.Stack;

import neko.Lib;

import org.zeromq.ZMQ;
import org.zeromq.ZMQContext;
import org.zeromq.ZMQSocket;
import org.zeromq.ZMQException;

/**
* Weather proxy device.
*
* See: http://zguide.zeromq.org/page:all#A-Publish-Subscribe-Proxy-Server
*
* Use with WUClient and WUServer
*/

class WUProxy
{

public static function main() {
var context:ZMQContext = ZMQContext.instance();
Lib.println("** WUProxy (see: http://zguide.zeromq.org/page:all#A-Publish-Subscribe-Proxy-Server)");

// This is where the weather service sits
var frontend:ZMQSocket = context.socket(ZMQ_SUB);
frontend.connect("tcp://localhost:5556");

// This is our public endpoint for subscribers
var backend:ZMQSocket = context.socket(ZMQ_PUB);
backend.bind("tcp://10.1.1.0:8100");

// Subscribe on everything
frontend.setsockopt(ZMQ_SUBSCRIBE, Bytes.ofString(""));

var more = false;
var msgBytes:Bytes;

ZMQ.catchSignals();

var stopped = false;
while (!stopped) {
try {
msgBytes = frontend.recvMsg();
more = frontend.hasReceiveMore();

// proxy it
backend.sendMsg(msgBytes, { if (more) SNDMORE else null; } );
// Keep looping; the proxy runs until it is interrupted
} catch (e:ZMQException) {
if (ZMQ.isInterrupted()) {
stopped = true;
} else {
// Handle other errors
trace("ZMQException #:" + e.errNo + ", str:" + e.str());
trace (Stack.toString(Stack.exceptionStack()));
}
}
}
frontend.close();
backend.close();
context.term();
}
}

wuproxy.hx: Weather update proxy

We call this a proxy because it acts as a subscriber to publishers, and acts as a publisher to subscribers. That means you can slot this device into an existing network without affecting it (of course the new subscribers need to know to speak to the proxy).

Figure 17 - Forwarder Proxy Device

fig17.png

Note that this application is multipart safe. It correctly detects multipart messages and sends them as it reads them. If we did not set the SNDMORE option on outgoing multipart data, the final recipient would get a corrupted message. You should always make your devices multipart safe so that there is no risk they will corrupt the data they switch.

A Request-Reply Broker

topprevnext

Let's explore how to solve a problem of scale by writing a little message queuing broker in ØMQ. We'll look at the request-reply pattern for this case.

In the Hello World client-server application we have one client that talks to one service. However in real cases we usually need to allow multiple services as well as multiple clients. This lets us scale up the power of the service (many threads or processes or boxes rather than just one). The only constraint is that services must be stateless, all state being in the request or in some shared storage such as a database.

There are two ways to connect multiple clients to multiple servers. The brute-force way is to connect each client socket to multiple service endpoints. One client socket can connect to multiple service sockets, and the REQ socket will then load-balance requests among these services. Let's say you connect a client socket to three service endpoints, A, B, and C. The client makes requests R1, R2, R3, R4. R1 and R4 go to service A, R2 goes to B, and R3 goes to service C.

Figure 18 - Load-balancing of Requests

fig18.png

This design lets you add more clients cheaply. You can also add more services. Each client will load-balance its requests to the services. But each client has to know the service topology. If you have 100 clients and then you decide to add three more services, you need to reconfigure and restart 100 clients in order for the clients to know about the three new services.

That's clearly not the kind of thing we want to be doing at 3am when our supercomputing cluster has run out of resources and we desperately need to add a couple of hundred new service nodes. Too many stable pieces are like liquid concrete: knowledge is distributed and the more stable pieces you have, the more effort it is to change the topology. What we want is something sitting in between clients and services that centralizes all knowledge of the topology. Ideally, we should be able to add and remove services or clients at any time without touching any other part of the topology.

So we'll write a little message queuing broker that gives us this flexibility. The broker binds to two endpoints, a frontend for clients and a backend for services. It then uses zmq_poll(3) to monitor these two sockets for activity and when it has some, it shuttles messages between its two sockets. It doesn't actually manage any queues explicitly — ØMQ does that automatically on each socket.

When you use REQ to talk to REP you get a strictly synchronous request-reply dialog. The client sends a request, the service reads the request and sends a reply. The client then reads the reply. If either the client or the service try to do anything else (e.g. sending two requests in a row without waiting for a response) they will get an error.
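To make that lockstep concrete, here is a minimal sketch using the s_send()/s_recv() helpers (requester is a placeholder for a connected REQ socket); a second send before reading the reply is rejected by the library:

//  Legal: send, then recv, then send again
s_send (requester, "Request 1");
char *reply = s_recv (requester);
free (reply);

s_send (requester, "Request 2");
//  s_send (requester, "Request 3");  //  Would fail: the socket is not in a sendable state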

But our broker has to be non-blocking. Obviously we can use zmq_poll(3) to wait for activity on either socket, but we can't use REP and REQ.

Luckily there are two sockets called DEALER and ROUTER that let you do non-blocking request-response. These sockets used to be called XREQ and XREP, and you may see these names in old code. The old names suggested that XREQ was an "extended REQ" and XREP was an "extended REP" but that's inaccurate. You'll see in Chapter Three how DEALER and ROUTER sockets let you build all kinds of asynchronous request-reply flows.

Now, we're just going to see how DEALER and ROUTER let us extend REQ-REP across a device, that is, our little broker.

In this simple stretched request-reply pattern, REQ talks to ROUTER and DEALER talks to REP. In between the DEALER and ROUTER we have to have code (like our broker) that pulls messages off the one socket and shoves them onto the other.

Figure 19 - Extended Request-reply

fig19.png

The request-reply broker binds to two endpoints, one for clients to connect to (the frontend socket) and one for services to connect to (the backend). To test this broker, you will want to change your services so they connect to the backend socket. Here are a client and service that show what I mean:

package ;

import neko.Lib;
import haxe.io.Bytes;
import org.zeromq.ZMQ;
import org.zeromq.ZMQContext;
import org.zeromq.ZMQSocket;

/**
* Hello World Client
* Connects REQ socket to tcp://localhost:5559
* Sends "Hello" to server, expects "World" back
*
* See: http://zguide.zeromq.org/page:all#A-Request-Reply-Broker
*
* Use with RrServer and RrBroker
*/

class RrClient
{

public static function main() {
var context:ZMQContext = ZMQContext.instance();

Lib.println("** RrClient (see: http://zguide.zeromq.org/page:all#A-Request-Reply-Broker)");

var requester:ZMQSocket = context.socket(ZMQ_REQ);
requester.connect ("tcp://localhost:5559");

Lib.println ("Launch and connect client.");

// Do 10 requests, waiting each time for a response
for (i in 0...10) {
var requestString = "Hello ";
// Send the message
requester.sendMsg(Bytes.ofString(requestString));

// Wait for the reply
var msg:Bytes = requester.recvMsg();

Lib.println("Received reply " + i + ": [" + msg.toString() + "]");

}

// Shut down socket and context
requester.close();
context.term();
}
}

rrclient.hx: Request-reply client

Here is the service:

package ;

import haxe.io.Bytes;
import haxe.Stack;
import neko.Lib;
import neko.Sys;
import org.zeromq.ZMQ;
import org.zeromq.ZMQContext;
import org.zeromq.ZMQException;
import org.zeromq.ZMQSocket;

/**
* Hello World server in Haxe
* Connects REP socket to tcp://localhost:5560
* Expects "Hello" from client, replies with "World"
* Use with RrClient.hx and RrBroker.hx
*
*/

class RrServer
{

public static function main() {

var context:ZMQContext = ZMQContext.instance();

Lib.println("** RrServer (see: http://zguide.zeromq.org/page:all#A-Request-Reply-Broker)");

// Socket to talk to clients
var responder:ZMQSocket = context.socket(ZMQ_REP);
responder.connect("tcp://localhost:5560");

Lib.println("Launch and connect server.");

ZMQ.catchSignals();

while (true) {

try {
// Wait for next request from client
var request:Bytes = responder.recvMsg();

trace ("Received request:" + request.toString());

// Do some work
Sys.sleep(1);

// Send reply back to client
responder.sendMsg(Bytes.ofString("World"));
} catch (e:ZMQException) {
if (ZMQ.isInterrupted()) {
break;
}
// Handle other errors
trace("ZMQException #:" + e.errNo + ", str:" + e.str());
trace (Stack.toString(Stack.exceptionStack()));
}

}
responder.close();
context.term();

}

}

rrserver.hx: Request-reply service

And here is the broker. You will see that it's multipart safe:

package ;
import haxe.io.Bytes;
import haxe.Stack;
import neko.Lib;
import org.zeromq.ZMQ;
import org.zeromq.ZMQContext;
import org.zeromq.ZMQPoller;
import org.zeromq.ZMQSocket;
import org.zeromq.ZMQException;

/**
* Simple request-reply broker
*
* Use with RrClient.hx and RrServer.hx
*/

class RrBroker
{

public static function main() {
var context:ZMQContext = ZMQContext.instance();

Lib.println("** RrBroker (see: http://zguide.zeromq.org/page:all#A-Request-Reply-Broker)");

var frontend:ZMQSocket = context.socket(ZMQ_ROUTER);
var backend:ZMQSocket = context.socket(ZMQ_DEALER);
frontend.bind("tcp://*:5559");
backend.bind("tcp://*:5560");

Lib.println("Launch and connect broker.");

// Initialise poll set
var items:ZMQPoller = context.poller();
items.registerSocket(frontend, ZMQ.ZMQ_POLLIN());
items.registerSocket(backend, ZMQ.ZMQ_POLLIN());

var more = false;
var msgBytes:Bytes;

ZMQ.catchSignals();

while (true) {
try {
items.poll();
if (items.pollin(1)) {
while (true) {
// receive message
msgBytes = frontend.recvMsg();
more = frontend.hasReceiveMore();
// broker it to backend
backend.sendMsg(msgBytes, { if (more) SNDMORE else null; } );
if (!more) break;
}
}

if (items.pollin(2)) {
while (true) {
// receive message
msgBytes = backend.recvMsg();
more = backend.hasReceiveMore();
// broker it to frontend
frontend.sendMsg(msgBytes, { if (more) SNDMORE else null; } );
if (!more) break;
}
}
} catch (e:ZMQException) {
if (ZMQ.isInterrupted()) {
break;
}
// Handle other errors
trace("ZMQException #:" + e.errNo + ", str:" + e.str());
trace (Stack.toString(Stack.exceptionStack()));
}
}
frontend.close();
backend.close();
context.term();
}
}

rrbroker.hx: Request-reply broker

Using a request-reply broker makes your client-server architectures easier to scale since clients don't see services, and services don't see clients. The only stable node is the device in the middle.

Figure 20 - Request-reply Broker

fig20.png

Built-in Devices

topprevnext

ØMQ provides some built-in devices, though most advanced users write their own devices. The built-in devices are:

  • QUEUE, which is like the request-reply broker.
  • FORWARDER, which is like the pub-sub proxy server.
  • STREAMER, which is like FORWARDER but for pipeline flows.

To start a device, you call zmq_device(3) and pass it two sockets, one for the frontend and one for the backend:

zmq_device (ZMQ_QUEUE, frontend, backend);

Which if you start a QUEUE device is exactly like plugging the main body of the request-reply broker into your code at that spot. You need to create the sockets, bind or connect them, and possibly configure them, before calling zmq_device(3). It is trivial to do. Here is the request-reply broker re-written to call QUEUE and rebadged as an expensive-sounding "message queue" (people have charged houses for code that did less):

package ;

import org.zeromq.ZMQ;
import org.zeromq.ZMQSocket;
import org.zeromq.ZMQDevice;
import org.zeromq.ZContext;
import neko.Lib;

/**
* Simple message queuing broker
* Same as request-reply broker but using QUEUE device
* See: http://zguide.zeromq.org/page:all#Built-in-Devices
*
* Use with RrClient and RrServer
*/

class MsgQueue
{

public static function main() {
var context:ZContext = new ZContext();
Lib.println("** MsgQueue (see: http://zguide.zeromq.org/page:all#Built-in-Devices)");

// Socket facing clients
var frontend:ZMQSocket = context.createSocket(ZMQ_ROUTER);
frontend.bind("tcp://*:5559");

// Socket facing services
var backend:ZMQSocket = context.createSocket(ZMQ_DEALER);
backend.bind("tcp://*:5560");

// Start built-in device
var device = new ZMQDevice(ZMQ_QUEUE, frontend, backend);

// We never get here
context.destroy();

}
}

msgqueue.hx: Message queue broker

The built-in devices do proper error handling, whereas the examples we have shown don't. Since you can configure the sockets as you need to, before starting the device, it's worth using the built-in devices when you can.

If you're like most ØMQ users, at this stage your mind is starting to think, "what kind of evil stuff can I do if I plug random socket types into devices?" The short answer is: don't do it. You can mix socket types but the results are going to be weird. So stick to using ROUTER/DEALER for queue devices, SUB/PUB for forwarders and PULL/PUSH for streamers.

When you start to need other combinations, it's time to write your own devices.

Multithreading with ØMQ

topprevnext

ØMQ is perhaps the nicest way ever to write multithreaded (MT) applications. Whereas ØMQ sockets require some readjustment if you are used to traditional sockets, ØMQ multithreading will take everything you know about writing MT applications, throw it into a heap in the garden, pour gasoline over it, and set it alight. It's a rare book that deserves burning, but most books on concurrent programming do.

To make utterly perfect MT programs (and I mean that literally) we don't need mutexes, locks, or any other form of inter-thread communication except messages sent across ØMQ sockets.

By "perfect" MT programs I mean code that's easy to write and understand, that works with one technology in any language and on any operating system, and that scales across any number of CPUs with zero wait states and no point of diminishing returns.

If you've spent years learning tricks to make your MT code work at all, let alone rapidly, with locks and semaphores and critical sections, you will be disgusted when you realize it was all for nothing. If there's one lesson we've learned from 30+ years of concurrent programming it is: just don't share state. It's like two drunkards trying to share a beer. It doesn't matter if they're good buddies. Sooner or later they're going to get into a fight. And the more drunkards you add to the pavement, the more they fight each other over the beer. The tragic majority of MT applications look like drunken bar fights.

The list of weird problems that you need to fight as you write classic shared-state MT code would be hilarious if it didn't translate directly into stress and risk, as code that seems to work suddenly fails under pressure. Here is a list of "11 Likely Problems In Your Multithreaded Code" from a large firm with world-beating experience in buggy code: forgotten synchronization, incorrect granularity, read and write tearing, lock-free reordering, lock convoys, two-step dance, and priority inversion.

Yeah, we also counted seven, not eleven. That's not the point though. The point is, do you really want that code running the power grid or stock market to start getting two-step lock convoys at 3pm on a busy Thursday? Who cares what the terms actually mean. This is not what turned us on to programming, fighting ever more complex side-effects with ever more complex hacks.

Some widely used metaphors, despite being the basis for billion-dollar industries, are fundamentally broken, and shared state concurrency is one of them. Code that wants to scale without limit does it like the Internet does, by sending messages and sharing nothing except a common contempt for broken programming metaphors.

You should follow some rules to write happy multithreaded code with ØMQ:

  • You MUST NOT access the same data from multiple threads. Using classic MT techniques like mutexes is an anti-pattern in ØMQ applications. The only exception to this is a ØMQ context object, which is threadsafe.
  • You MUST create a ØMQ context for your process, and pass that to all threads that you want to connect via inproc sockets.
  • You MAY treat threads as separate tasks, with their own context, but these threads cannot communicate over inproc. However they will be easier to break into standalone processes afterwards.
  • You MUST NOT share ØMQ sockets between threads. ØMQ sockets are not threadsafe. Technically it's possible to do this, but it demands semaphores, locks, or mutexes. This will make your application slow and fragile. The only place where it's remotely sane to share sockets between threads is in language bindings that need to do magic like garbage collection on sockets.

If you need to start more than one device in an application, for example, you will want to run each in its own thread. It is easy to make the error of creating the device sockets in one thread, and then passing the sockets to the device in another thread. This may appear to work but will fail randomly. Remember: Do not use or close sockets except in the thread that created them.

If you follow these rules, you can quite easily split threads into separate processes, when you need to. Application logic can sit in threads, processes, boxes: whatever your scale needs.

ØMQ uses native OS threads rather than virtual "green" threads. The advantage is that you don't need to learn any new threading API, and that ØMQ threads map cleanly to your operating system. You can use standard tools like Intel's ThreadChecker to see what your application is doing. The disadvantages are that your code, when it for instance starts new threads, won't be portable, and that if you have a huge number of threads (thousands), some operating systems will get stressed.

Let's see how this works in practice. We'll turn our old Hello World server into something more capable. The original server was a single thread. If the work per request is low, that's fine: one ØMQ thread can run at full speed on a CPU core, with no waits, doing an awful lot of work. But realistic servers have to do non-trivial work per request. A single core may not be enough when 10,000 clients hit the server all at once. So a realistic server must start multiple worker threads. It then accepts requests as fast as it can, and distributes these to its worker threads. The worker threads grind through the work, and eventually send their replies back.

You can of course do all this using a queue device and external worker processes, but often it's easier to start one process that gobbles up sixteen cores, than sixteen processes, each gobbling up one core. Further, running workers as threads will cut out a network hop, latency, and network traffic.

The MT version of the Hello World service basically collapses the queue device and workers into a single process:

package ;

import haxe.io.Bytes;
import haxe.Stack;
import neko.Lib;
import neko.Sys;
#if !php
import neko.vm.Thread;
#end
import org.zeromq.ZMQ;
import org.zeromq.ZMQContext;
import org.zeromq.ZMQPoller;
import org.zeromq.ZMQSocket;
import org.zeromq.ZMQException;

/**
* Multithreaded Hello World Server
*
* See: http://zguide.zeromq.org/page:all#Multithreading-with-MQ
* Use with HelloWorldClient.hx
*
*/

class MTServer
{

static function worker() {
var context:ZMQContext = ZMQContext.instance();

// Socket to talk to dispatcher
var responder:ZMQSocket = context.socket(ZMQ_REP);
#if (neko || cpp)
responder.connect("inproc://workers");
#elseif php
responder.connect("ipc://workers.ipc");
#end
ZMQ.catchSignals();

while (true) {

try {
// Wait for next request from client
var request:Bytes = responder.recvMsg();

trace ("Received request:" + request.toString());

// Do some work
Sys.sleep(1);

// Send reply back to client
responder.sendMsg(Bytes.ofString("World"));
} catch (e:ZMQException) {
if (ZMQ.isInterrupted()) {
break;
}
trace (e.toString());
}
}
responder.close();
return null;
}

/**
* Implements a request/reply QUEUE broker device
* Returns if poll is interrupted
* @param ctx
* @param frontend
* @param backend
*/

static function queueDevice(ctx:ZMQContext, frontend:ZMQSocket, backend:ZMQSocket) {

// Initialise pollset
var poller:ZMQPoller = ctx.poller();
poller.registerSocket(frontend, ZMQ.ZMQ_POLLIN());
poller.registerSocket(backend, ZMQ.ZMQ_POLLIN());

ZMQ.catchSignals();

while (true) {
try {
poller.poll();
if (poller.pollin(1)) {
var more:Bool = true;
while (more) {
// Receive message
var msg = frontend.recvMsg();
more = frontend.hasReceiveMore();

// Broker it
backend.sendMsg(msg, { if (more) SNDMORE else null; } );
}
}

if (poller.pollin(2)) {
var more:Bool = true;
while (more) {
// Receive message
var msg = backend.recvMsg();
more = backend.hasReceiveMore();

// Broker it
frontend.sendMsg(msg, { if (more) SNDMORE else null; } );
}
}
} catch (e:ZMQException) {
if (ZMQ.isInterrupted()) {
break;
}
// Handle other errors
trace("ZMQException #:" + e.errNo + ", str:" + e.str());
trace (Stack.toString(Stack.exceptionStack()));

}

}

}
public static function main() {
var context:ZMQContext = ZMQContext.instance();

Lib.println ("** MTServer (see: http://zguide.zeromq.org/page:all#Multithreading-with-MQ)");

// Socket to talk to clients
var clients:ZMQSocket = context.socket(ZMQ_ROUTER);
clients.bind ("tcp://*:5556");

// Socket to talk to workers
var workers:ZMQSocket = context.socket(ZMQ_DEALER);

#if (neko || cpp)
workers.bind ("inproc://workers");

// Launch worker thread pool
var workerThreads:List<Thread> = new List<Thread>();
for (thread_nbr in 0...5) {
workerThreads.add(Thread.create(worker));
}
#elseif php
workers.bind ("ipc://workers.ipc");

// Launch pool of worker processes, due to php's lack of thread support
// See: https://github.com/imatix/zguide/blob/master/examples/PHP/mtserver.php
for (thread_nbr in 0...5) {
untyped __php__('
$pid = pcntl_fork();
if ($pid == 0) {
// Running in child process
worker();
exit();
}'
);
}
#end
// Invoke request / reply broker (aka QUEUE device) to connect clients to workers
queueDevice(context, clients, workers);

// Close up shop
clients.close();
workers.close();
context.term();
}
}

mtserver.hx: Multithreaded service

All the code should be recognizable to you by now. How it works:

  • The server starts a set of worker threads. Each worker thread creates a REP socket and then processes requests on this socket. Worker threads are just like single-threaded servers. The only differences are the transport (inproc instead of tcp), and the bind-connect direction.
  • The server creates a ROUTER socket to talk to clients and binds this to its external interface (over tcp).
  • The server creates a DEALER socket to talk to the workers and binds this to its internal interface (over inproc).
  • The server starts a QUEUE device that connects the two sockets. The QUEUE device keeps a single queue for incoming requests, and distributes those out to workers. It also routes replies back to their origin.

Note that creating threads is not portable in most programming languages. The POSIX library is pthreads, but on Windows you have to use a different API. We'll see in Chapter Three how to wrap this in a portable API.

Here the 'work' is just a one-second pause. We could do anything in the workers, including talking to other nodes. This is what the MT server looks like in terms of ØMQ sockets and nodes. Note how the request-reply chain is REQ-ROUTER-queue-DEALER-REP.

Figure 21 - Multithreaded Server

fig21.png

Signaling between Threads

When you start making multithreaded applications with ØMQ, you'll hit the question of how to coordinate your threads. Though you might be tempted to insert 'sleep' statements, or use multithreading techniques such as semaphores or mutexes, the only mechanism that you should use is ØMQ messages. Remember the story of The Drunkards and the Beer Bottle.

Let's make three threads that signal each other when they are ready. In this example we use PAIR sockets over the inproc transport:

package ;

import haxe.io.Bytes;
#if !php
import neko.vm.Thread;
#end
import neko.Lib;

import org.zeromq.ZMQ;
import org.zeromq.ZMQContext;
import org.zeromq.ZMQSocket;

/**
* Multi-threaded relay in haXe
*
*/

class MTRelay
{

static function step1() {
var context:ZMQContext = ZMQContext.instance();

// Connect to step2 and tell it we are ready
var xmitter:ZMQSocket = context.socket(ZMQ_PAIR);
#if (neko || cpp)
xmitter.connect("inproc://step2");
#elseif php
xmitter.connect("ipc://step2.ipc");
#end
xmitter.sendMsg(Bytes.ofString("READY"));
xmitter.close();
}

static function step2() {
var context:ZMQContext = ZMQContext.instance();

// Bind inproc socket before starting step 1
var receiver:ZMQSocket = context.socket(ZMQ_PAIR);
#if (neko || cpp)
receiver.bind("inproc://step2");
Thread.create(step1);
#elseif php
receiver.bind("ipc://step2.ipc");
untyped __php__('
$pid = pcntl_fork();
if($pid == 0) {
step1();
exit();
}'
);
#end
// Wait for signal and pass it on
var msgBytes = receiver.recvMsg();
receiver.close();

// Connect to step3 and tell it we are ready
var xmitter:ZMQSocket = context.socket(ZMQ_PAIR);
#if (neko || cpp)
xmitter.connect("inproc://step3");
#elseif php
xmitter.connect("ipc://step3.ipc");
#end
xmitter.sendMsg(Bytes.ofString("READY"));
xmitter.close();
}

public static function main() {
var context:ZMQContext = ZMQContext.instance();

Lib.println ("** MTRelay (see: http://zguide.zeromq.org/page:all#Signaling-between-Threads)");

// This main thread represents Step 3

// Bind to inproc: endpoint then start upstream thread
var receiver:ZMQSocket = context.socket(ZMQ_PAIR);
#if (neko || cpp)
receiver.bind("inproc://step3");

// Step2 relays the signal to step 3
Thread.create(step2);
#elseif php
// Use child processes instead of Threads
receiver.bind("ipc://step3.ipc");
// Step2 relays the signal to step 3
untyped __php__('
$pid = pcntl_fork();
if ($pid == 0) {
step2();
exit();
}'
);

#end
// Wait for signal
var msgBytes = receiver.recvMsg();
receiver.close();

trace ("Test successful!");
context.term();
}
}

mtrelay.hx: Multithreaded relay

Figure 22 - The Relay Race

fig22.png

This is a classic pattern for multithreading with ØMQ:

  1. Two threads communicate over inproc, using a shared context.
  2. The parent thread creates one socket, binds it to an inproc:// endpoint, and then starts the child thread, passing the context to it.
  3. The child thread creates the second socket, connects it to that inproc:// endpoint, and then signals to the parent thread that it's ready.

Note that multithreading code using this pattern is not scalable out to processes. If you use inproc and socket pairs, you are building a tightly-bound application. Do this when low latency is really vital. For all normal apps, use one context per thread, and ipc or tcp. Then you can easily break your threads out to separate processes, or boxes, as needed.

This is the first time we've shown an example using PAIR sockets. Why use PAIR? Other socket combinations might seem to work but they all have side-effects that could interfere with signaling:

  • You can use PUSH for the sender and PULL for the receiver. This looks simple and will work, but remember that PUSH will load-balance messages to all available receivers. If you by accident start two receivers (e.g. you already have one running and you start a second), you'll "lose" half of your signals. PAIR has the advantage of refusing more than one connection; the pair is exclusive.
  • You can use DEALER for the sender and ROUTER for the receiver. ROUTER however wraps your message in an "envelope", meaning your zero-size signal turns into a multipart message. If you don't care about the data, and treat anything as a valid signal, and if you don't read more than once from the socket, that won't matter. If however you decide to send real data, you will suddenly find ROUTER providing you with "wrong" messages. DEALER also load-balances, giving the same risk as PUSH.
  • You can use PUB for the sender and SUB for the receiver. This will correctly deliver your messages exactly as you sent them and PUB does not load-balance as PUSH or DEALER do. However you need to configure the subscriber with an empty subscription, which is annoying. Worse, the reliability of the PUB-SUB link is timing dependent and messages can get lost if the SUB socket is connecting while the PUB socket is sending its messages.

For these reasons, PAIR makes the best choice for coordination between pairs of threads.

Node Coordination

When you want to coordinate nodes, PAIR sockets won't work well any more. This is one of the few areas where the strategies for threads and nodes are different. Principally nodes come and go whereas threads are stable. PAIR sockets do not automatically reconnect if the remote node goes away and comes back.

The second significant difference between threads and nodes is that you typically have a fixed number of threads but a more variable number of nodes. Let's take one of our earlier scenarios (the weather server and clients) and use node coordination to ensure that subscribers don't lose data when starting up.

This is how the application will work:

  • The publisher knows in advance how many subscribers it expects. This is just a magic number it gets from somewhere.
  • The publisher starts up and waits for all subscribers to connect. This is the node coordination part. Each subscriber subscribes and then tells the publisher it's ready via another socket.
  • When the publisher has all subscribers connected, it starts to publish data.

In this case we'll use a REQ-REP socket flow to synchronize subscribers and publisher. Here is the publisher:

package ;
import haxe.io.Bytes;
import neko.Lib;
import org.zeromq.ZMQ;
import org.zeromq.ZMQContext;
import org.zeromq.ZMQSocket;

/**
* Synchronised publisher
*
* See: http://zguide.zeromq.org/page:all#Node-Coordination
*
* Use with SyncSub.hx
*/

class SyncPub
{
static inline var SUBSCRIBERS_EXPECTED = 10;

public static function main() {
var context:ZMQContext = ZMQContext.instance();
Lib.println("** SyncPub (see: http://zguide.zeromq.org/page:all#Node-Coordination)");

// Socket to talk to clients
var publisher:ZMQSocket = context.socket(ZMQ_PUB);
publisher.bind("tcp://*:5561");

// Socket to receive signals
var syncService:ZMQSocket = context.socket(ZMQ_REP);
syncService.bind("tcp://*:5562");

// get synchronisation from subscribers
var subscribers = 0;
while (subscribers < SUBSCRIBERS_EXPECTED) {
// wait for synchronisation request
var msgBytes = syncService.recvMsg();

// send synchronisation reply
syncService.sendMsg(Bytes.ofString(""));
subscribers++;
}

// Now broadcast exactly 1m updates followed by END
for (update_nbr in 0...1000000) {
publisher.sendMsg(Bytes.ofString("Rhubarb"));
}
publisher.sendMsg(Bytes.ofString("END"));

publisher.close();
syncService.close();
context.term();
}
}

syncpub.hx: Synchronized publisher

Figure 23 - Pub-Sub Synchronization

fig23.png

And here is the subscriber:

package ;

import neko.Lib;
import haxe.io.Bytes;
import neko.Sys;
import org.zeromq.ZMQ;
import org.zeromq.ZMQContext;
import org.zeromq.ZMQSocket;

/**
* Synchronised subscriber
*
* See: http://zguide.zeromq.org/page:all#Node-Coordination
*
* Use with SyncPub.hx
*/

class SyncSub
{

public static function main() {
var context:ZMQContext = ZMQContext.instance();

Lib.println("** SyncSub (see: http://zguide.zeromq.org/page:all#Node-Coordination)");

// First connect our subscriber socket
var subscriber:ZMQSocket = context.socket(ZMQ_SUB);
subscriber.connect("tcp://127.0.0.1:5561");
subscriber.setsockopt(ZMQ_SUBSCRIBE, Bytes.ofString(""));

// 0MQ is so fast, we need to wait a little while
Sys.sleep(1.0);

// Second, synchronise with publisher
var syncClient:ZMQSocket = context.socket(ZMQ_REQ);
syncClient.connect("tcp://127.0.0.1:5562");

// Send a synchronisation request
syncClient.sendMsg(Bytes.ofString(""));

// Wait for a synchronisation reply
var msgBytes:Bytes = syncClient.recvMsg();

// Third, get our updates and report how many we got
var update_nbr = 0;
while (true) {
msgBytes = subscriber.recvMsg();
if (msgBytes.toString() == "END") {
break;
}
msgBytes = null;
update_nbr++;
}
Lib.println("Received " + update_nbr + " updates\n");

subscriber.close();
syncClient.close();
context.term();
}
}

syncsub.hx: Synchronized subscriber

This Linux shell script will start ten subscribers and then the publisher:

echo "Starting subscribers..."
for a in 1 2 3 4 5 6 7 8 9 10; do
    syncsub &
done
echo "Starting publisher..."
syncpub

Which gives us this satisfying output:

Starting subscribers...
Starting publisher...
Received 1000000 updates
Received 1000000 updates
Received 1000000 updates
Received 1000000 updates
Received 1000000 updates
Received 1000000 updates
Received 1000000 updates
Received 1000000 updates
Received 1000000 updates
Received 1000000 updates

We can't assume that the SUB connect will be finished by the time the REQ/REP dialog is complete. There are no guarantees that outbound connects will finish in any order whatsoever, if you're using any transport except inproc. So, the example does a brute-force sleep of one second between subscribing, and sending the REQ/REP synchronization.

A more robust model could be (see the sketch after this list):

  • Publisher opens PUB socket and starts sending "Hello" messages (not data).
  • Subscribers connect SUB socket and when they receive a Hello message they tell the publisher via a REQ/REP socket pair.
  • When the publisher has had all the necessary confirmations, it starts to send real data.
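
Here is a minimal sketch of the publisher side of that handshake. It's in C for brevity rather than Haxe, assumes the ØMQ/2.x API plus the s_send/s_recv helpers from zhelpers.h, and uses illustrative endpoints and counts; treat it as a starting point, not a finished program:

//  Sketch only: keep saying "Hello" until every expected subscriber
//  has confirmed over the REQ/REP sync socket
#include "zhelpers.h"

int main (void)
{
    void *context = zmq_init (1);
    void *publisher = zmq_socket (context, ZMQ_PUB);
    zmq_bind (publisher, "tcp://*:5561");
    void *syncservice = zmq_socket (context, ZMQ_REP);
    zmq_bind (syncservice, "tcp://*:5562");

    int subscribers = 0;
    while (subscribers < 10) {
        s_send (publisher, "Hello");        //  Not real data yet
        zmq_pollitem_t items [] = { { syncservice, 0, ZMQ_POLLIN, 0 } };
        zmq_poll (items, 1, 100000);        //  100 msec; 2.x timeouts are in usec
        if (items [0].revents & ZMQ_POLLIN) {
            free (s_recv (syncservice));    //  A subscriber saw a Hello
            s_send (syncservice, "");       //  Acknowledge it
            subscribers++;
        }
    }
    //  Now it is safe to start sending real data
    zmq_close (publisher);
    zmq_close (syncservice);
    zmq_term (context);
    return 0;
}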

Zero Copy

We teased you in Chapter One, when you were still a ØMQ newbie, about zero-copy. If you survived this far, you are probably ready to use zero-copy. However, remember that there are many roads to Hell, and premature optimization is not the most enjoyable nor profitable one, by far. In English, trying to do zero-copy properly while your architecture is not perfect is a waste of time and will make things worse, not better.

ØMQ's message API lets you send and receive messages directly from and to application buffers without copying data. Given that ØMQ sends messages in the background, zero-copy needs some extra sauce.

To do zero-copy you use zmq_msg_init_data(3) to create a message that refers to a block of data already allocated on the heap with malloc(), and then you pass that to zmq_send(3). When you create the message you also pass a function that ØMQ will call to free the block of data, when it has finished sending the message. This is the simplest example, assuming 'buffer' is a block of 1000 bytes allocated on the heap:

void my_free (void *data, void *hint) {
    free (data);
}
//  Send message from buffer, which we allocate and ØMQ will free for us
zmq_msg_t message;
zmq_msg_init_data (&message, buffer, 1000, my_free, NULL);
zmq_send (socket, &message, 0);

There is no way to do zero-copy on receive: ØMQ delivers you a buffer that you can store as long as you wish but it will not write data directly into application buffers.

On writing, ØMQ's multipart messages work nicely together with zero-copy. In traditional messaging you need to marshal different buffers together into one buffer that you can send. That means copying data. With ØMQ, you can send multiple buffers coming from different sources as individual message frames. We send each field as a length-delimited frame. To the application it looks like a series of send and recv calls. But internally the multiple parts get written to the network and read back with single system calls, so it's very efficient.
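
As a sketch of how that can combine with zero-copy in the ØMQ/2.x C API (the function and field names here are illustrative, not part of any real program), each field becomes its own frame, sent with ZMQ_SNDMORE on every frame except the last:

#include <stdlib.h>
#include <zmq.h>

//  ØMQ calls this on each buffer once it has finished sending it
static void my_free (void *data, void *hint) {
    free (data);
}

//  Send two separately allocated buffers as two frames of one message,
//  without copying either of them
static void send_two_fields (void *socket,
    void *key, size_t key_size, void *body, size_t body_size)
{
    zmq_msg_t frame;
    zmq_msg_init_data (&frame, key, key_size, my_free, NULL);
    zmq_send (socket, &frame, ZMQ_SNDMORE);     //  More frames follow
    zmq_msg_init_data (&frame, body, body_size, my_free, NULL);
    zmq_send (socket, &frame, 0);               //  Last frame
}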

Pub-sub Message Envelopes

We've looked briefly at multipart messages. Let's now look at their main use-case, which is message envelopes. An envelope is a way of safely packaging up data with an address, without touching the data itself.

In the pub-sub pattern, the envelope at least holds the subscription key for filtering but you can also add the sender identity in the envelope.

If you want to use pub-sub envelopes, you make them yourself. It's optional, and in previous pub-sub examples we didn't do this. Using a pub-sub envelope is a little more work for simple cases but it's cleaner especially for real cases, where the key and the data are naturally separate things. It's also faster, if you are writing the data directly from an application buffer.

Here is what a publish-subscribe message with an envelope looks like:

Figure 24 - Pub-sub Envelope with Separate Key

fig24.png

Recall that pub-sub matches messages based on the prefix. Putting the key into a separate frame makes the matching very obvious, since there is no chance an application will accidentally match on part of the data.

Here is a minimalist example of how pub-sub envelopes look in code. This publisher sends messages of two types, A and B. The envelope holds the message type:

package ;
import haxe.io.Bytes;
import neko.Lib;
import neko.Sys;
import org.zeromq.ZMQ;
import org.zeromq.ZMQContext;
import org.zeromq.ZMQException;
import org.zeromq.ZMQSocket;

/**
* Pubsub envelope publisher
*
* See: http://zguide.zeromq.org/page:all#Pub-sub-Message-Envelopes
*
* Use with PSEnvSub
*/

class PSEnvPub
{

public static function main() {
var context:ZMQContext = ZMQContext.instance();

Lib.println("** PSEnvPub (see: http://zguide.zeromq.org/page:all#Pub-sub-Message-Envelopes)");

var publisher:ZMQSocket = context.socket(ZMQ_PUB);
publisher.bind("tcp://*:5563");

ZMQ.catchSignals();

while (true) {
publisher.sendMsg(Bytes.ofString("A"), SNDMORE);
publisher.sendMsg(Bytes.ofString("We don't want to see this"));
publisher.sendMsg(Bytes.ofString("B"), SNDMORE);
publisher.sendMsg(Bytes.ofString("We would like to see this"));
Sys.sleep(1.0);
}
// We never get here but clean up anyhow
publisher.close();
context.term();
}
}

psenvpub.hx: Pub-sub envelope publisher

The subscriber only wants messages of type B:

package ;
import haxe.io.Bytes;
import neko.Lib;
import org.zeromq.ZMQ;
import org.zeromq.ZMQContext;
import org.zeromq.ZMQSocket;

/**
* Pubsub envelope subscriber
*
* See: http://zguide.zeromq.org/page:all#Pub-sub-Message-Envelopes
*
* Use with PSEnvPub
*/

class PSEnvSub
{

public static function main() {
var context:ZMQContext = ZMQContext.instance();

Lib.println("** PSEnvSub (see: http://zguide.zeromq.org/page:all#Pub-sub-Message-Envelopes)");

var subscriber:ZMQSocket = context.socket(ZMQ_SUB);
subscriber.connect("tcp://127.0.0.1:5563");
subscriber.setsockopt(ZMQ_SUBSCRIBE, Bytes.ofString("B"));

while (true) {
var msgAddress:Bytes = subscriber.recvMsg();
// Read message contents
var msgContent:Bytes = subscriber.recvMsg();
trace (msgAddress.toString() + " " + msgContent.toString() + "\n");
}
// We never get here but clean up anyway
subscriber.close();
context.term();
}
}

psenvsub.hx: Pub-sub envelope subscriber

When you run the two programs, the subscriber should show you this:

[B] We would like to see this
[B] We would like to see this
[B] We would like to see this
[B] We would like to see this
...

This example shows that the subscription filter rejects or accepts the entire multipart message (key plus data). You won't get part of a multipart message, ever.

If you subscribe to multiple publishers and you want to know their identity so that you can send them data via another socket (and this is a fairly typical use-case), you create a three-part message:

Figure 25 - Pub-sub Envelope with Sender Address

fig25.png
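
As a rough sketch of what the publisher would send in that case (using the s_sendmore/s_send string helpers from zhelpers.h; the key and identity values are purely illustrative):

//  Sketch: a three-part pub-sub message - key, sender identity, data
s_sendmore (publisher, "B");                 //  Frame 1: subscription key
s_sendmore (publisher, "publisher-1");       //  Frame 2: sender identity
s_send (publisher, "We would like to see this");  //  Frame 3: the data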

High Water Marks

When you can send messages rapidly from process to process, you soon discover that memory is a precious resource, and one that's trivially filled up. A few seconds delay somewhere in a process can turn into a backlog that blows up a server, unless you understand the problem and take precautions.

The problem is this: if you have process A sending messages to process B, which suddenly gets very busy (garbage collection, CPU overload, whatever), then what happens to the messages that process A wants to send? Some will sit in B's network buffers. Some will sit on the Ethernet wire itself. Some will sit in A's network buffers. And the rest will accumulate in A's memory. If you don't take some precaution, A can easily run out of memory and crash. It is a consistent, classic problem with message brokers.

What are the answers? One is to pass the problem upstream. A is getting the messages from somewhere else. So tell that process, "stop!" And so on. This is called "flow control". It sounds great, but what if you're sending out a Twitter feed? Do you tell the whole world to stop tweeting while B gets its act together?

Flow control works in some cases but in others, the transport layer can't tell the application layer "stop" any more than a subway system can tell a large business, "please keep your staff at work another half an hour, I'm too busy".

The answer for messaging is to set limits on the size of buffers, and then when we reach those limits, take some sensible action. In most cases (not for a subway system, though), the answer is to throw away messages. In a few others, it's to wait.

ØMQ uses the concept of "high water mark" or HWM to define the capacity of its internal pipes. Each connection out of a socket or into a socket has its own pipe, and HWM capacity.

In ØMQ/2.x the HWM was set to infinite by default. In ØMQ/3.x it's set to 1,000 by default, which is more sensible. If you're using ØMQ/2.x you should always set a HWM on your sockets, be it 1,000 to match ØMQ/3.x or another figure that takes into account your message sizes.
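
For example, on ØMQ/2.x the HWM is a socket option you set before binding or connecting. A minimal C sketch, assuming 'socket' is a socket you have just created:

//  Sketch: cap this socket's pipes at 1,000 messages (the 2.x option type is uint64_t)
uint64_t hwm = 1000;
zmq_setsockopt (socket, ZMQ_HWM, &hwm, sizeof (hwm));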

The high water mark affects both the transmit and receive buffers of a single socket. Some sockets (PUB, PUSH) only have transmit buffers. Some (SUB, PULL, REQ, REP) only have receive buffers. Some (DEALER, ROUTER, PAIR) have both transmit and receive buffers.

When your socket reaches its high-water mark, it will either block or drop data depending on the socket type. PUB sockets will drop data if they reach their high-water mark, while other socket types will block.

Over the inproc transport, the sender and receiver share the same buffers, so the real HWM is the sum of the HWM set by both sides. This means in effect that if one side does not set a HWM, there is no limit to the buffer size.

A Bare Necessity

ØMQ is like a box of pieces that plug together, the only limitation being your imagination and sobriety.

The scalable elastic architecture you get should be an eye-opener. You might need a coffee or two first. Don't make the mistake I made once and buy exotic German coffee labeled Entkoffeiniert. That does not mean "Delicious". Scalable elastic architectures are not a new idea - flow-based programming and languages like Erlang already worked like this - but ØMQ makes it easier to use than ever before.

As Gonzo Diethelm said, 'My gut feeling is summarized in this sentence: "if ØMQ didn't exist, it would be necessary to invent it". Meaning that I ran into ØMQ after years of brain-background processing, and it made instant sense… ØMQ simply seems to me a "bare necessity" nowadays.'

Chapter Three - Advanced Request-Reply Patterns

In Chapter Two we worked through the basics of using ØMQ by developing a series of small applications, each time exploring new aspects of ØMQ. We'll continue this approach in this chapter, as we explore advanced patterns built on top of ØMQ's core request-reply pattern.

We'll cover:

  • How to create and use message envelopes for request-reply.
  • How to use the REQ, REP, DEALER, and ROUTER sockets.
  • How to set manual reply addresses using identities.
  • How to do custom random scatter routing.
  • How to do custom least-recently used routing.
  • How to build a higher-level message class.
  • How to build a basic request-reply broker.
  • How to choose good names for sockets.
  • How to simulate a cluster of clients and workers.
  • How to build a scalable cloud of request-reply clusters.
  • How to use pipeline sockets for monitoring threads.

Request-Reply Envelopes

In the request-reply pattern, the envelope holds the return address for replies. It is how a ØMQ network with no state can create round-trip request-reply dialogs.

You don't in fact need to understand how request-reply envelopes work to use them for common cases. When you use REQ and REP, your sockets build and use envelopes automatically. When you write a device, and we covered this in the last chapter, you just need to read and write all the parts of a message. ØMQ implements envelopes using multipart data, so if you copy multipart data safely, you implicitly copy envelopes too.

However, getting under the hood and playing with request-reply envelopes is necessary for advanced request-reply work. It's time to explain how ROUTER works, in terms of envelopes:

  • When you receive a message from a ROUTER socket, it shoves a brown paper envelope around the message and scribbles on it with indelible ink, "This came from Lucy". Then it gives that to you. That is, the ROUTER socket gives you what came off the wire, wrapped up in an envelope with the reply address on it.
  • When you send a message to a ROUTER socket, it rips off that brown paper envelope, tries to read its own handwriting, and if it knows who "Lucy" is, sends the contents back to Lucy. That is the reverse process of receiving a message.

If you leave the brown envelope alone, and then pass that message to another ROUTER socket (e.g. by sending to a DEALER connected to a ROUTER), the second ROUTER socket will in turn stick another brown envelope on it, and scribble the name of that DEALER on it.

The whole point of this is that each ROUTER knows how to send replies back to the right place. All you need to do, in your application, is respect the brown envelopes. Now the REP socket makes sense. It carefully slices open the brown envelopes, one by one, keeps them safely aside, and gives you (the application code that owns the REP socket) the original message. When you send the reply, it re-wraps the reply in the brown paper envelopes, so it can hand the resulting brown package back to the ROUTER sockets down the chain.

Which lets you insert ROUTER-DEALER devices into a request-reply pattern like this:

[REQ] <--> [REP]
[REQ] <--> [ROUTER--DEALER] <--> [REP]
[REQ] <--> [ROUTER--DEALER] <--> [ROUTER--DEALER] <--> [REP]
...etc.

If you connect a REQ socket to a ROUTER socket, and send one request message, you will get a message that consists of three frames: a reply address, an empty message frame, and the 'real' message.

Figure 26 - Single-hop Request-reply Envelope

fig26.png

Breaking this down:

  • The data in frame 3 is what the sending application sends to the REQ socket.
  • The empty message frame in frame 2 is prepended by the REQ socket when it sends the message to the ROUTER socket.
  • The reply address in frame 1 is prepended by the ROUTER before it passes the message to the receiving application.

Now if we extend this with a chain of devices, we get envelope on envelope, with the newest envelope always stuck at the beginning of the stack.

Figure 27 - Multihop Request-reply Envelope

fig27.png

Here now is a more detailed explanation of the four socket types we use for request-reply patterns:

  • DEALER just load-balances (deals out) the messages you send to all connected peers, and fair-queues (deals in) the messages it receives. It is exactly like a PUSH and PULL socket combined.
  • REQ prepends an empty message frame to every message you send, and removes the empty message frame from each message you receive. It then works like DEALER (and in fact is built on DEALER) except it also imposes a strict send / receive cycle.
  • ROUTER prepends an envelope with reply address to each message it receives, before passing it to the application. It also chops off the envelope (the first message frame) from each message it sends, and uses that reply address to decide which peer the message should go to.
  • REP stores all the message frames up to the first empty message frame when you receive a message, and passes the rest (the data) to your application. When you send a reply, REP prepends the saved envelopes to the message and sends it back using the same semantics as ROUTER (and in fact REP is built on top of ROUTER), but, matching REQ, it imposes a strict receive / send cycle.

REP requires that the envelopes end with an empty message frame. If you're not using REQ at the other end of the chain then you must add the empty message frame yourself.
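
For instance, if a DEALER talks directly to a REP socket, the DEALER side must add and strip that empty delimiter itself. A minimal sketch, again with the zhelpers.h string helpers and an illustrative socket name:

//  Sketch: DEALER talking to REP - we manage the empty delimiter ourselves
s_sendmore (dealer, "");            //  Empty delimiter frame
s_send (dealer, "Hello");           //  Request body

char *empty = s_recv (dealer);      //  The delimiter comes back first
free (empty);
char *reply = s_recv (dealer);      //  Then the reply body
free (reply);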

So the obvious question about ROUTER is, where does it get the reply addresses from? And the obvious answer is, it uses the socket's identity. As we already learned, if a socket does not set an identity, the ROUTER socket generates an identity that it can associate with the connection to that socket.

Figure 28 - ROUTER Invents a UUID

fig28.png

When we set our own identity on a socket, this gets passed to the ROUTER socket, which passes it to the application as part of the envelope for each message that comes in.

Figure 29 - ROUTER Uses Identity If It Knows It

fig29.png

Let's observe the above two cases in practice. This program dumps the contents of the message frames that a ROUTER socket receives from two REQ sockets, one relying on a generated identity, and one setting its own identity:

package ;

import ZHelpers;

import neko.Lib;
import neko.Sys;
import haxe.io.Bytes;
import org.zeromq.ZMQ;
import org.zeromq.ZContext;
import org.zeromq.ZFrame;
import org.zeromq.ZMQSocket;

/**
* Demonstrate identities as used by the request-reply pattern. Run this
* program by itself.
*/

class Identity
{

public static function main() {
var context:ZContext = new ZContext();
Lib.println("** Identity (see: http://zguide.zeromq.org/page:all#Request-Reply-Envelopes)");

// Socket facing clients
var sink:ZMQSocket = context.createSocket(ZMQ_ROUTER);
sink.bind("inproc://example");

// First allow 0MQ to set the identity
var anonymous:ZMQSocket = context.createSocket(ZMQ_REQ);
anonymous.connect("inproc://example");
anonymous.sendMsg(Bytes.ofString("ROUTER uses a generated 5 byte identity"));
ZHelpers.dump(sink);

// Then set the identity ourselves
var identified:ZMQSocket = context.createSocket(ZMQ_REQ);
identified.setsockopt(ZMQ_IDENTITY, Bytes.ofString("PEER2"));
identified.connect("inproc://example");
identified.sendMsg(Bytes.ofString("ROUTER socket uses REQ's socket identity"));
ZHelpers.dump(sink);

context.destroy();

}

}

identity.hx: Identity check

Here is what the dump function prints:

----------------------------------------
[017] 00314F043F46C441E28DD0AC54BE8DA727
[000]
[026] ROUTER uses a generated UUID
----------------------------------------
[005] Hello
[000]
[038] ROUTER socket uses REQ's socket identity

Custom Request-Reply Routing

We already saw that ROUTER uses the message envelope to decide which client to route a reply back to. Now let me express that in another way: ROUTER will route messages asynchronously to any peer connected to it, if you provide the correct routing address via a properly constructed envelope.

So ROUTER is really a fully controllable router. We'll dig into this magic in detail.

But first, and because we're going to go off-road into some rough and possibly illegal terrain now, let's look closer at REQ and REP. Few people know this, but despite their kindergarten approach to messaging, REQ and REP are actually colorful characters:

  • REQ is a mama socket, doesn't listen but always expects an answer. Mamas are strictly synchronous and if you use them they are always the 'request' end of a chain.
  • REP is a papa socket, always answers, but never starts a conversation. Papas are strictly synchronous and if you use them, they are always the 'reply' end of a chain.

The thing about Mama sockets is, as we all learned as kids, you can't speak until spoken to. Mamas do not have the simple open-mindedness of papas, nor the ambiguous "sure, whatever" shrugged-shoulder aloofness of a dealer. So to speak to a mama socket, you have to get the mama socket to talk to you first. The good part is mamas don't care if you reply now, or much later. Just bring a good sob story and a bag of laundry.

Papa sockets on the other hand are strong and silent, and pedantic. They do just one thing, which is to give you an answer to whatever you ask, perfectly framed and precise. Don't expect a papa socket to be chatty, or to pass a message on to someone else, this is just not going to happen.

While we usually think of request-reply as a to-and-fro pattern, in fact it can be fully asynchronous, as long as we understand that any mamas or papas will be at the end of a chain, never in the middle of it, and always synchronous. All we need to know is the address of the peer we want to talk to, and then we can send it messages asynchronously, via a router. The router is the one and only ØMQ socket type capable of being told "send this message to X" where X is the address of a connected peer.
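
In code, "send this message to X" is just a two-frame send where the first frame carries the peer's address. Here is a sketch for a DEALER peer (zhelpers.h helpers again; a REQ or REP peer would also need the empty delimiter frame we'll meet below):

//  Sketch: explicit addressing through a ROUTER socket
s_sendmore (router, "X");               //  Address frame: the target peer
s_send (router, "do this please");      //  Body, delivered only to peer "X"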

These are the ways we can know the address to send a message to, and you'll see most of these used in the examples of custom request-reply routing:

  • By default, a peer has a null identity and the router will generate a UUID and use that to refer to the connection when it delivers you each incoming message from that peer.
  • If the peer socket set an identity, the router will give that identity when it delivers an incoming request envelope from that peer.
  • Peers with explicit identities can send them via some other mechanism, e.g. via some other sockets.
  • Peers can have prior knowledge of each others' identities, e.g. via configuration files or some other magic.

There are at least three routing patterns, one for each of the socket types we can easily connect to a router:

  • Router-to-dealer.
  • Router-to-mama (REQ).
  • Router-to-papa (REP).

In each of these cases we have total control over how we route messages, but the different patterns cover different use-cases and message flows. Let's break it down over the next sections with examples of different routing algorithms.

But first some warnings about custom routing:

  • This goes against a fairly solid ØMQ rule: delegate peer addressing to the socket. The only reason we do it is because ØMQ lacks a wide range of routing algorithms.
  • Future versions of ØMQ will probably do some of the routing we're going to build here. That means the code we design now may break, or become redundant in the future.
  • While the built-in routing has certain guarantees of scalability, such as being friendly to devices, custom routing doesn't. You will need to make your own devices.

So overall, custom routing is more expensive and more fragile than delegating this to ØMQ. Only do it if you need it. Having said that, let's jump in, the water's great!

Router-to-Dealer Routing

The router-to-dealer pattern is the simplest. You connect one router to many dealers, and then distribute messages to the dealers using any algorithm you like. The dealers can be sinks (process the messages without any response), proxies (send the messages on to other nodes), or services (send back replies).

If you expect the dealer to reply, there should only be one router talking to it. Dealers have no idea how to reply to a specific peer, so if they have multiple peers, they will load-balance between them, which would be weird. If the dealer is a sink, any number of routers can talk to it.

What kind of routing can you do with a router-to-dealer pattern? If the dealers talk back to the router, e.g. telling the router when they finished a task, you can use that knowledge to route depending on how fast a dealer is. Since both router and dealer are asynchronous, it can get a little tricky. You'd need to use zmq_poll(3) at least.

We'll make an example where the dealers don't talk back, they're pure sinks. Our routing algorithm will be a weighted random scatter: we have two dealers and we send twice as many messages to one as to the other.

Figure 30 - Router-to-Dealer Custom Routing

fig30.png

Here's code that shows how this works:

package ;

import haxe.io.Bytes;
import neko.Lib;
import neko.Sys;
#if (neko || cpp)
import neko.Random;
import neko.vm.Thread;
#end
import org.zeromq.ZFrame;
import org.zeromq.ZMQ;
import org.zeromq.ZContext;
import org.zeromq.ZMQSocket;

/**
* Custom routing Router to Dealer
*
* While this example runs in a single process, that is just to make
* it easier to start and stop the example. Each thread has its own
* context and conceptually acts as a separate process.
*
* See: http://zguide.zeromq.org/page:all#Router-to-Dealer-Routing
*/

class RTDealer
{

public static function workerTask(id:String) {
var context:ZContext = new ZContext();
var worker:ZMQSocket = context.createSocket(ZMQ_DEALER);
worker.setsockopt(ZMQ_IDENTITY, Bytes.ofString(id));
worker.connect("ipc:///tmp/routing.ipc");

var total = 0;
while (true) {
// We receive one part, with the workload
var request:ZFrame = ZFrame.recvFrame(worker);
if (request == null) break;
if (request.streq("END")) {
Lib.println(id + " received: " + total);
break;
}
total++;
}
context.destroy();
}

public static function main() {

Lib.println("** RTDealer (see: http://zguide.zeromq.org/page:all#Router-to-Dealer-Routing)");

// Implementation note: Had to move php forking before main thread ZMQ Context creation to
// get the main thread to receive messages from the child processes.
#if php
// For PHP, use processes, not threads
forkWorkerTasks();
#else
var workerA = Thread.create(callback(workerTask, "A"));
var workerB = Thread.create(callback(workerTask, "B"));
#end

var context:ZContext = new ZContext();
var client:ZMQSocket = context.createSocket(ZMQ_ROUTER);
// Implementation note: Had to add the /tmp prefix to get this to work on Linux Ubuntu 10
client.bind("ipc:///tmp/routing.ipc");

// Wait for threads to connect, since otherwise the messages
// we send won't be routable.
Sys.sleep(1);

// Send 10 tasks scattered to A twice as often as B
var workload = ZFrame.newStringFrame("This is the workload");
var address:ZFrame;
#if !php
var rnd = new Random();
rnd.setSeed(Date.now().getSeconds());
#end
for (task_nbr in 0...10) {
// Send two message parts, first the address…
var randNumber:Int;
#if php
randNumber = untyped __php__('rand(0, 2)');
#else
randNumber = rnd.int(2);
#end
if (randNumber > 0)
address = ZFrame.newStringFrame("A");
else
address = ZFrame.newStringFrame("B");

address.send(client, ZFrame.ZFRAME_MORE);

// And then the workload
workload.send(client, ZFrame.ZFRAME_REUSE);
}

ZFrame.newStringFrame("A").send(client, ZFrame.ZFRAME_MORE);
ZFrame.newStringFrame("END").send(client);

ZFrame.newStringFrame("B").send(client, ZFrame.ZFRAME_MORE);
ZFrame.newStringFrame("END").send(client);

workload.destroy();
context.destroy();

}

#if php
private static inline function forkWorkerTasks() {
untyped __php__('
$pid = pcntl_fork();
if ($pid == 0) {
RTDealer::workerTask("A");
exit();
}'
);
untyped __php__('
$pid = pcntl_fork();
if ($pid == 0) {
RTDealer::workerTask("B");
exit();
}'
);
return;
}
#end

}

rtdealer.hx: Router-to-dealer

Some comments on this code:

  • The router doesn't know when the dealers are ready, and it would be distracting for our example to add in the signaling to do that. So the router just does a "sleep (1)" after starting the dealer threads. Without this sleep, the router will send out messages that can't be routed, and ØMQ will discard them.
  • Note that this behavior is specific to ROUTER sockets. PUB sockets will also discard messages if there are no subscribers, but all other socket types will queue sent messages until there's a peer to receive them.

To route to a dealer, we create an envelope consisting of just an identity frame (we don't need a null separator).

Figure 31 - Routing Envelope for Dealer

fig31.png

The router socket removes the first frame, and sends the second frame, which the dealer gets as-is. When the dealer sends a message to the router, it sends one frame. The router prepends the dealer's address and gives us back a similar envelope in two parts.

Something to note: if you use an invalid address, the router discards the message silently. There is not much else it can do usefully. In normal cases this either means the peer has gone away, or that there is a programming error somewhere and you're using a bogus address. In any case you cannot ever assume a message will be routed successfully until and unless you get a reply of some sort from the destination node. We'll come to creating reliable patterns later on.

Dealers in fact work exactly like PUSH and PULL combined. It's however illegal and pointless to connect PULL or PUSH to a request-reply socket.

Least-Recently Used Routing (LRU Pattern)

Like we said, mamas (REQ sockets, if you really insist on it) don't listen to you, and if you try to speak out of turn they'll ignore you. You have to wait for them to say something, then you can give a sarcastic answer. This is very useful for routing because it means we can keep a bunch of mamas waiting for answers. In effect, mamas tell us when they're ready.

You can connect one router to many mamas, and distribute messages as you would to dealers. Mamas will usually want to reply, but they will let you have the last word. However it's one thing at a time:

  • Mama speaks to router
  • Router replies to mama
  • Mama speaks to router
  • Router replies to mama
  • etc.

Like dealers, mamas can only talk to one router and since mamas always start by talking to the router, you should never connect one mama to more than one router unless you are doing sneaky stuff like multi-pathway redundant routing. I'm not even going to explain that now, and hopefully the jargon is complex enough to stop you trying this until you need it.

Figure 32 - Router to Mama Custom Routing

fig32.png

What kind of routing can you do with a router-to-mama pattern? Probably the most obvious is "least-recently-used" (LRU), where we always route to the mama that's been waiting longest. Here is an example that does LRU routing to a set of mamas:

package ;

import haxe.io.Bytes;
import neko.Lib;
import neko.Sys;
#if (neko || cpp)
import neko.vm.Thread;
#end
import org.zeromq.ZFrame;
import org.zeromq.ZMQ;
import org.zeromq.ZContext;
import org.zeromq.ZMQSocket;

import ZHelpers;

/**
* Custom routing Router to Mama (ROUTER to REQ)
*
* While this example runs in a single process (for cpp & neko), that is just
* to make it easier to start and stop the example. Each thread has its own
* context and conceptually acts as a separate process.
*
* See: http://zguide.zeromq.org/page:all#Least-Recently-Used-Routing-LRU-Pattern
*/

class RTMama
{

private static inline var NBR_WORKERS = 10;

public static function workerTask() {
var context:ZContext = new ZContext();
var worker:ZMQSocket = context.createSocket(ZMQ_REQ);

// Use a random string identity for ease here
var id = ZHelpers.setID(worker);
worker.connect("ipc:///tmp/routing.ipc");

var total = 0;
while (true) {
// Tell the router we are ready
ZFrame.newStringFrame("ready").send(worker);
// Get workload from router, until finished
var workload:ZFrame = ZFrame.recvFrame(worker);
if (workload == null) break;
if (workload.streq("END")) {
Lib.println("Processed: " + total + " tasks");
break;
}
total++;

// Do some random work
Sys.sleep((ZHelpers.randof(1000) + 1) / 1000.0);
}
context.destroy();
}

public static function main() {

Lib.println("** RTMama (see: http://zguide.zeromq.org/page:all#Least-Recently-Used-Routing-LRU-Pattern)");

// Implementation note: Had to move php forking before main thread ZMQ Context creation to
// get the main thread to receive messages from the child processes.
for (worker_nbr in 0...NBR_WORKERS) {
#if php
forkWorkerTask();
#else
Thread.create(workerTask);
#end
}

var context:ZContext = new ZContext();
var client:ZMQSocket = context.createSocket(ZMQ_ROUTER);
// Implementation note: Had to add the /tmp prefix to get this to work on Linux Ubuntu 10
client.bind("ipc:///tmp/routing.ipc");


Sys.sleep(1);
for (task_nbr in 0...NBR_WORKERS * 10) {
// LRU worker is next waiting in queue
var address:ZFrame = ZFrame.recvFrame(client);
var empty:ZFrame = ZFrame.recvFrame(client);
var ready:ZFrame = ZFrame.recvFrame(client);

address.send(client, ZFrame.ZFRAME_MORE);
ZFrame.newStringFrame("").send(client, ZFrame.ZFRAME_MORE);
ZFrame.newStringFrame("This is the workload").send(client);
}
// Now ask mamas to shut down and report their results
for (worker_nbr in 0...NBR_WORKERS) {
var address:ZFrame = ZFrame.recvFrame(client);
var empty:ZFrame = ZFrame.recvFrame(client);
var ready:ZFrame = ZFrame.recvFrame(client);

address.send(client, ZFrame.ZFRAME_MORE);
ZFrame.newStringFrame("").send(client, ZFrame.ZFRAME_MORE);
ZFrame.newStringFrame("END").send(client);
}
context.destroy();
}

#if php
private static inline function forkWorkerTask() {
untyped __php__('
$pid = pcntl_fork();
if ($pid == 0) {
RTMama::workerTask();
exit();
}'
);
return;
}
#end

}

rtmama.hx: Router-to-mama

For this example the LRU doesn't need any particular data structures above what ØMQ gives us (message queues) because we don't need to synchronize the workers with anything. A more realistic LRU algorithm would have to collect workers as they become ready, into a queue, and then use this queue when routing client requests. We'll do this in a later example.

To prove that the LRU is working as expected, the mamas print the total tasks they each did. Since the mamas do random work, and we're not load balancing, we expect each mama to do approximately the same amount but with random variation. And that is indeed what we see:

Processed: 8 tasks
Processed: 8 tasks
Processed: 11 tasks
Processed: 7 tasks
Processed: 9 tasks
Processed: 11 tasks
Processed: 14 tasks
Processed: 11 tasks
Processed: 11 tasks
Processed: 10 tasks

Some comments on this code:

  • We don't need any settle time, since the mamas explicitly tell the router when they are ready.
  • We're generating our own identities here, as printable strings, using the zhelpers.h s_set_id function. That's just to make our life a little simpler. In a realistic application the mamas would be fully anonymous and then you'd call zmq_recv(3) and zmq_send(3) directly instead of the zhelpers s_recv() and s_send() functions, which can only handle strings.
  • If you copy and paste example code without understanding it, you deserve what you get. It's like watching Spiderman leap off the roof and then trying that yourself.

To route to a mama, we must create a mama-friendly envelope consisting of an address plus an empty message frame.

Figure 33 - Routing Envelope for Mama (REQ)

fig33.png

Address-based Routing

Papas are, if we care about them at all, only there to answer questions. And to pay the bills, fix the car when mama drives it into the garage wall, put up shelves, and walk the dog when it's raining. But apart from that, papas are only there to answer questions.

In a classic request-reply pattern a router wouldn't talk to a papa socket at all, but rather would get a dealer to do the job for it. That's what dealers are for: to pass questions onto random papas and come back with their answers. Routers are generally more comfortable talking to mamas. OK, dear reader, you may stop the psychoanalysis. These are analogies, not life stories.

It's worth remembering with ØMQ that the classic patterns are the ones that work best, that the beaten path is there for a reason, and that when we go off-road we take the risk of falling off cliffs and getting eaten by zombies. Having said that, let's plug a router into a papa and see what the heck emerges.

The special thing about papas, all joking aside, is actually two things:

  • One, they are strictly lockstep request-reply.
  • Two, they accept an envelope stack of any size and will return that intact.

In the normal request-reply pattern, papas are anonymous and replaceable (wow, these analogies are scary), but we're learning about custom routing. So, in our use-case we have reason to send a request to papa A rather than papa B. This is essential if you want to keep some kind of a conversation going between you, at one end of a large network, and a papa sitting somewhere far away.

A core philosophy of ØMQ is that the edges are smart and many, and the middle is vast and dumb. This does mean the edges can address each other, and this also means we want to know how to reach a given papa. Doing routing across multiple hops is something we'll look at later but for now we'll look just at the final step: a router talking to a specific papa.

Figure 34 - Router-to-Papa Custom Routing

fig34.png

This example shows a very specific chain of events:

  • The client has a message that it expects to route back (via another router) to some node. The message has two addresses (a stack), an empty part, and a body.
  • The client passes that to the router but specifies a papa address first.
  • The router removes the papa address, uses that to decide which papa to send the message to.
  • The papa receives the addresses, empty part, and body.
  • It removes the addresses, saves them, and passes the body to the worker.
  • The worker sends a reply back to the papa.
  • The papa recreates the envelope stack and sends that back with the worker's reply to the router.
  • The router prepends the papa's address and provides that to the client along with the rest of the address stack, empty part, and the body.

It's complex but worth working through until you understand it. Just remember a papa is garbage in, garbage out.

package ;
import haxe.io.Bytes;
import neko.Lib;
import neko.Sys;
import org.zeromq.ZContext;
import org.zeromq.ZMQ;
import org.zeromq.ZMQSocket;
import org.zeromq.ZFrame;

/**
* Custom routing Router to Papa (ROUTER to REP)
*
* We will do this all in one thread to emphasize the sequence
* of events…
*
* See: http://zguide.zeromq.org/page:all#Address-based-Routing
*/

class RTPapa
{

public static function main() {
Lib.println("** RTPapa (see: http://zguide.zeromq.org/page:all#Address-based-Routing)");

var context:ZContext = new ZContext();
var client:ZMQSocket = context.createSocket(ZMQ_ROUTER);
client.bind("ipc:///tmp/routing.ipc");
var worker:ZMQSocket = context.createSocket(ZMQ_REP);
worker.setsockopt(ZMQ_IDENTITY, Bytes.ofString("A"));
worker.connect("ipc:///tmp/routing.ipc");

// Wait for the worker to connect so that when we send a message
// with a routing envelope, it will actually match the worker…
Sys.sleep(1);

// Send papa address, address stack, empty part, and request
ZFrame.newStringFrame("A").send(client, ZFrame.ZFRAME_MORE);
ZFrame.newStringFrame("address 3").send(client, ZFrame.ZFRAME_MORE);
ZFrame.newStringFrame("address 2").send(client, ZFrame.ZFRAME_MORE);
ZFrame.newStringFrame("address 1").send(client, ZFrame.ZFRAME_MORE);
ZFrame.newStringFrame("").send(client, ZFrame.ZFRAME_MORE);
ZFrame.newStringFrame("This is the workload").send(client);

// Worker should just get the workload
ZHelpers.dump(worker);

// We don't play with the envelopes in the worker
ZFrame.newStringFrame("This is the reply").send(worker);

// Now we dump what we got off the ROUTER socket
ZHelpers.dump(client);

context.destroy();
}
}

rtpapa.hx: Router-to-papa

Run this program and it should show you this:

----------------------------------------
[020] This is the workload
----------------------------------------
[001] A
[009] address 3
[009] address 2
[009] address 1
[000]
[017] This is the reply

Some comments on this code:

  • In reality we'd have the papa and router in separate nodes. This example does it all in one thread because it makes the sequence of events really clear.
  • zmq_connect(3) doesn't happen instantly. When the papa socket connects to the router, that takes a certain time and happens in the background. In a realistic application the router wouldn't even know the papa existed until there had been some previous dialog. In our toy example we'll just sleep (1); to make sure the connection's done. If you remove the sleep, the papa socket won't get the message. (Try it.)
  • We're routing using the papa's identity. Just to convince yourself this really is happening, try sending to a wrong address, like "B". The papa won't get the message.
  • The s_dump and other utility functions (in the C code) come from the zhelpers.h header file. It becomes clear that we do the same work over and over on sockets, and there are interesting layers we can build on top of the ØMQ API. We'll come back to this later when we make a real application rather than these toy examples.

To route to a papa, we must create a papa-friendly envelope.

Figure 35 - Routing Envelope for Papa aka REP

fig35.png

A Request-Reply Message Broker

We'll recap the knowledge we have so far about doing weird stuff with ØMQ message envelopes, and build the core of a generic custom routing queue device that we can properly call a message broker. Sorry for all the buzzwords. What we'll make is a queue device that connects a bunch of clients to a bunch of workers, and lets you use any routing algorithm you want. What we'll do is least-recently used, since it's the most obvious use-case apart from load-balancing.

To start with, let's look back at the classic request-reply pattern and then see how it extends over a larger and larger service-oriented network. The basic pattern just has one client talking to a few workers.

Figure 36 - Basic Request-reply

fig36.png

This extends to multiple papas, but if we want to handle multiple mamas as well we need a device in the middle, which normally consists of a router and a dealer back to back, connected by a classic ZMQ_QUEUE device that just copies message frames between the two sockets as fast as it can.

Figure 37 - Stretched Request-reply

fig37.png

The key here is that the router stores the originating mama address in the request envelope, the dealer and papas don't touch that, and so the router knows which mama to send the reply back to. Papas are anonymous and not addressed in this pattern; all papas are assumed to provide the same service.
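
In C, such a middle device is tiny: two sockets and one call to the built-in ØMQ/2.x device runner. A rough sketch, assuming an existing context (the tcp endpoints are placeholders):

void *frontend = zmq_socket (context, ZMQ_ROUTER);  // Mamas (clients) connect here
void *backend = zmq_socket (context, ZMQ_DEALER);   // Papas (workers) connect here
zmq_bind (frontend, "tcp://*:5559");
zmq_bind (backend, "tcp://*:5560");
// Copy message frames between the two sockets as fast as possible
zmq_device (ZMQ_QUEUE, frontend, backend);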

In the above design, we're using the built-in load balancing routing that the dealer socket provides. However, we want our broker to use a least-recently used algorithm, so we take the router-to-mama pattern we learned, and apply that.

Figure 38 - Stretched Request-reply with LRU

fig38.png

Our broker - a router-to-router LRU queue - can't simply copy message frames blindly. Here is the code; it's fairly complex, but the core logic is reusable in any request-reply broker that wants to do LRU routing:

package ;
import haxe.io.Bytes;
import neko.Lib;
#if (neko || cpp)
import neko.vm.Thread;
#end
import haxe.Stack;
import org.zeromq.ZContext;
import org.zeromq.ZMQ;
import org.zeromq.ZMQContext;
import org.zeromq.ZMQException;
import org.zeromq.ZMQPoller;
import org.zeromq.ZMQSocket;

/**
* Least-recently used (LRU) queue device
* Clients and workers are shown here in-process
*
* While this example runs in a single process, that is just to make
* it easier to start and stop the example. Each thread has its own
* context and conceptually acts as a separate process.
*
* NB: LRUQueue deliberately uses the lower-level ZMQxxx.hx classes.
* See LRUQueue2 for a cleaner implementation using the Zxxx.hx classes, modelled on czmq
*
* See: http://zguide.zeromq.org/page:all#A-Request-Reply-Message-Broker
*/

class LRUQueue
{

private static inline var NBR_CLIENTS = 10;
private static inline var NBR_WORKERS = 3;

/**
* Basic request-reply client using REQ socket.
*/

public static function clientTask() {
var context:ZContext = new ZContext();
var client:ZMQSocket = context.createSocket(ZMQ_REQ);
var id = ZHelpers.setID(client);
client.connect("ipc:///tmp/frontend.ipc");

// Send request, receive reply
client.sendMsg(Bytes.ofString("HELLO"));
var reply = client.recvMsg();
Lib.println("Client "+id+": " + reply.toString());

context.destroy();
}

/**
* Worker using REQ socket to do LRU routing.
*/

public static function workerTask() {
var context:ZContext = new ZContext();
var worker:ZMQSocket = context.createSocket(ZMQ_REQ);
var id = ZHelpers.setID(worker);
worker.connect("ipc:///tmp/backend.ipc");

// Tell broker we're ready to do work
worker.sendMsg(Bytes.ofString("READY"));

while (true) {
// Read and save all frames until we get an empty frame
// In this example, there is only 1 but it could be more.
var address = worker.recvMsg();
var empty = worker.recvMsg();

// Get request, send reply
var request = worker.recvMsg();
Lib.println("Worker "+id+": " + request.toString());

worker.sendMsg(address, SNDMORE);
worker.sendMsg(empty, SNDMORE);
worker.sendMsg(Bytes.ofString("OK"));
}

context.destroy();
}

public static function main() {
Lib.println("** LRUQueue (see: http://zguide.zeromq.org/page:all#A-Request-Reply-Message-Broker)");
var client_nbr:Int = 0, worker_nbr:Int;

#if php
// PHP appears to require tasks to be forked before main process creates ZMQ context
for (client_nbr in 0 ... NBR_CLIENTS) {
forkClientTask();
}
for (worker_nbr in 0 ... NBR_WORKERS) {
forkWorkerTask();
}
#end
// Prepare our context and sockets
var context:ZContext = new ZContext();
var frontend:ZMQSocket = context.createSocket(ZMQ_ROUTER);
var backend:ZMQSocket = context.createSocket(ZMQ_ROUTER);
frontend.bind("ipc:///tmp/frontend.ipc");
backend.bind("ipc:///tmp/backend.ipc");
#if !php
// Non-PHP targets require threads to be created after main thread has set up ZMQ Context
for (client_nbr in 0 ... NBR_CLIENTS) {
Thread.create(clientTask);
}
for (worker_nbr in 0 ... NBR_WORKERS) {
Thread.create(workerTask);
}
#end

// Logic of LRU loop:
// - Poll backend always, frontend only if 1 or more workers are ready
// - If worker replies, queue worker as ready and forward reply
// to client if necessary.
// - If client requests, pop next worker and send request to it.

// Queue of available workers
var workerQueue:List<String> = new List<String>();

var poller:ZMQPoller = new ZMQPoller();
poller.registerSocket(backend, ZMQ.ZMQ_POLLIN());

client_nbr = NBR_CLIENTS;
while (true) {
poller.unregisterSocket(frontend);
if (workerQueue.length > 0) {
// Only poll frontend if there is at least 1 worker ready to do work
poller.registerSocket(frontend, ZMQ.ZMQ_POLLIN());
}

try {
poller.poll( -1 );
} catch (e:ZMQException) {
trace("ZMQException #:" + e.errNo + ", str:" + e.str());
trace (Stack.toString(Stack.exceptionStack()));
}
// Handle worker activity on backend
if (poller.pollin(1)) {
// Queue worker address for LRU routing
var workerAddr = backend.recvMsg();
if (workerQueue.length < NBR_WORKERS)
workerQueue.add(workerAddr.toString());

// Second frame is empty
var empty = backend.recvMsg();

// Third frame is READY or else a client reply address
var clientAddr = backend.recvMsg();

// If client reply, send rest back to frontend
if (clientAddr.toString() != "READY") {
empty = backend.recvMsg();
var reply = backend.recvMsg();
frontend.sendMsg(clientAddr, SNDMORE);
frontend.sendMsg(Bytes.ofString(""), SNDMORE);
frontend.sendMsg(reply);
if (--client_nbr == 0)
break; // Exit after NBR_CLIENTS messages
}
}

if (poller.pollin(2)) {
// Now get next client request, route to LRU worker
// Client request is [address][empty][request]
var clientAddr = frontend.recvMsg();
var empty = frontend.recvMsg();
var request = frontend.recvMsg();

backend.sendMsg(Bytes.ofString(workerQueue.pop()), SNDMORE);
backend.sendMsg(Bytes.ofString(""), SNDMORE);
backend.sendMsg(clientAddr, SNDMORE);
backend.sendMsg(Bytes.ofString(""), SNDMORE);
backend.sendMsg(request);

}
}

context.destroy();
}

#if php
private static inline function forkWorkerTask() {
untyped __php__('
$pid = pcntl_fork();
if ($pid == 0) {
LRUQueue::workerTask();
exit();
}'
);
return;
}

private static inline function forkClientTask() {
untyped __php__('
$pid = pcntl_fork();
if ($pid == 0) {
LRUQueue::clientTask();
exit();
}'
);
return;
}
#end

}

lruqueue.hx: LRU queue broker

The difficult part of this program is (a) the envelopes that each socket reads and writes, and (b) the LRU algorithm. We'll take these in turn, starting with the message envelope formats.

First, recall that a mama REQ socket always puts an empty part (the envelope delimiter) on sending and removes this empty part on reception. The reason for this isn't important; it's just part of the 'normal' request-reply pattern. What we care about here is just keeping mama happy by doing precisely what she needs. Second, the router always adds an envelope with the address of whomever the message came from.
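
To make that concrete: when a mama sends a single frame, the router on the other side reads three. A minimal C sketch of reading them with the s_recv helper from zhelpers.h (frontend is assumed to be the router socket):

char *address = s_recv (frontend);  // Identity frame, added by the router
char *empty = s_recv (frontend);    // Empty delimiter, added by the mama REQ socket
char *request = s_recv (frontend);  // The data the client actually sent
printf ("[%s] %s\n", address, request);
free (address);
free (empty);
free (request);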

We can now walk through a full request-reply chain from client to worker and back. In the code we set the identity of client and worker sockets to make it easier to print the message frames if we want to. Let's assume the client's identity is "CLIENT" and the worker's identity is "WORKER". The client sends a single frame with the message.

Figure 39 - Message that Client Sends

fig39.png

What the queue gets, when reading off the router frontend socket, are three frames consisting of the sender address, empty frame delimiter, and the data part.

Figure 40 - Message Coming in on Frontend

fig40.png

The broker sends this to the worker, prefixed by the address of the worker, taken from the LRU queue, plus an additional empty part to keep the mama at the other end happy.

Figure 41 - Message Sent to Backend

fig41.png

This complex envelope stack gets chewed up first by the backend router socket, which removes the first frame. Then the mama socket in the worker removes the empty part, and provides the rest to the worker.

Figure 42 - Message Delivered to Worker

fig42.png

Which is exactly the same as what the queue received on its frontend router socket. The worker has to save the envelope (which is all the parts up to and including the empty message frame) and then it can do what's needed with the data part.

On the return path the messages are the same as when they come in, i.e. the backend socket gives the queue a message in five parts, and the queue sends the frontend socket a message in three parts, and the client gets a message in one part.

Now let's look at the LRU algorithm. It requires that both clients and workers use mama sockets, and that workers correctly store and replay the envelope on messages they get. The algorithm is:

  • Create a pollset which polls the backend always, and the frontend only if there are one or more workers available.
  • Poll for activity with infinite timeout.
  • If there is activity on the backend, we either have a "ready" message or a reply for a client. In either case we store the worker address (the first part) on our LRU queue, and if the rest is a client reply we send it back to that client via the frontend.
  • If there is activity on the frontend, we take the client request, pop the next worker (which is the least-recently used), and send the request to the backend. This means sending the worker address, empty part, and then the three parts of the client request.

You should now see that you can reuse and extend the LRU algorithm with variations based on the information the worker provides in its initial "ready" message. For example, workers might start up and do a performance self-test, then tell the broker how fast they are. The broker can then choose the fastest available worker rather than LRU or round-robin.
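
For reference, the core of that loop in C looks roughly like this (a sketch: frontend and backend are the two router sockets, workers is a CZMQ zlist of ready worker addresses, and the snippet sits inside the broker's while loop):

zmq_pollitem_t items [] = {
    { backend, 0, ZMQ_POLLIN, 0 },
    { frontend, 0, ZMQ_POLLIN, 0 }
};
// Poll the frontend only if we have at least one worker ready to do work
int rc = zmq_poll (items, zlist_size (workers)? 2: 1, -1);
if (rc == -1)
    break;              // Interrupted
if (items [0].revents & ZMQ_POLLIN) {
    // Worker is ready, or has a reply: queue its address, route any reply back
}
if (items [1].revents & ZMQ_POLLIN) {
    // Client request: pop the least-recently used worker and forward to it
}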

A High-Level API for ØMQ

topprevnext

Reading and writing multipart messages using the native ØMQ API is like eating a bowl of hot noodle soup, with fried chicken and extra vegetables, using a toothpick. Look at the core of the worker thread from our LRU queue broker:

while (1) {
    // Read and save all frames until we get an empty frame
    // In this example there is only 1 but it could be more
    char *address = s_recv (worker);
    char *empty = s_recv (worker);
    assert (*empty == 0);
    free (empty);

    // Get request, send reply
    char *request = s_recv (worker);
    printf ("Worker: %s\n", request);
    free (request);

    s_sendmore (worker, address);
    s_sendmore (worker, "");
    s_send (worker, "OK");
    free (address);
}

That code isn't even reusable, because it can only handle one envelope. And this code already does some wrapping around the ØMQ API. If we used the libzmq API directly, this is what we'd have to write:

while (1) {
    // Read and save all frames until we get an empty frame
    // In this example there is only 1 but it could be more
    zmq_msg_t address;
    zmq_msg_init (&address);
    zmq_msg_recv (&address, worker, 0);

    zmq_msg_t empty;
    zmq_msg_init (&empty);
    zmq_msg_recv (&empty, worker, 0);

    // Get request, send reply
    zmq_msg_t payload;
    zmq_msg_init (&payload);
    zmq_msg_recv (&payload, worker, 0);

    int char_nbr;
    printf ("Worker: ");
    for (char_nbr = 0; char_nbr < zmq_msg_size (&payload); char_nbr++)
        printf ("%c", *((char *) zmq_msg_data (&payload) + char_nbr));
    printf ("\n");

    // Release the received payload before reusing the message for the reply
    zmq_msg_close (&payload);
    zmq_msg_init_size (&payload, 2);
    memcpy (zmq_msg_data (&payload), "OK", 2);

    zmq_msg_send (&address, worker, ZMQ_SNDMORE);
    zmq_msg_close (&address);
    zmq_msg_send (&empty, worker, ZMQ_SNDMORE);
    zmq_msg_close (&empty);
    zmq_msg_send (&payload, worker, 0);
    zmq_msg_close (&payload);
}

What we want is an API that lets us receive and send an entire message in one shot, including all envelopes. One that lets us do what we want with the absolute least lines of code. The ØMQ core API itself doesn't aim to do this, but nothing prevents us making layers on top, and part of learning to use ØMQ intelligently is to do exactly that.

Making a good message API is fairly difficult, especially if we want to avoid copying data around too much. We have a problem of terminology: ØMQ uses "message" to describe both multipart messages and the individual parts of a message. We have a problem of semantics: sometimes it's natural to see message content as printable string data, sometimes as binary blobs.

So one solution is to use three concepts: string (already the basis for s_send and s_recv), frame (a message frame), and message (a list of one or more frames). Here is the worker code, rewritten onto an API using these concepts:

while (1) {
    zmsg_t *zmsg = zmsg_recv (worker);
    zframe_print (zmsg_last (zmsg), "Worker: ");
    zframe_reset (zmsg_last (zmsg), "OK", 2);
    zmsg_send (&zmsg, worker);
}

Replacing 22 lines of code with four is a good deal, especially since the results are easy to read and understand. We can continue this process for other aspects of working with ØMQ. Let's make a wishlist of things we would like in a higher-level API:

  • Automatic handling of sockets. I find it really annoying to have to close sockets manually, and to have to explicitly define the linger timeout in some but not all cases. It'd be great to have a way to close sockets automatically when I close the context.
  • Portable thread management. Every non-trivial ØMQ application uses threads, but POSIX threads aren't portable. So a decent high-level API should hide this under a portable layer.
  • Portable clocks. Even getting the time to a millisecond resolution, or sleeping for some milliseconds, is not portable. Realistic ØMQ applications need portable clocks, so our API should provide them.
  • A reactor to replace zmq_poll(3). The poll loop is simple but clumsy. Writing a lot of these, we end up doing the same work over and over: calculating timers, and calling code when sockets are ready. A simple reactor with socket readers, and timers, would save a lot of repeated work.
  • Proper handling of Ctrl-C. We already saw how to catch an interrupt. It would be useful if this happened in all applications.

Turning this wishlist into reality gives us CZMQ, a high-level C API for ØMQ. This high-level binding in fact developed out of earlier versions of the Guide. It combines nicer semantics for working with ØMQ with some portability layers, and (importantly for C but less for other languages) containers like hashes and lists.
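
To give a rough flavour of what that buys us, here is a small sketch using CZMQ 1.x calls (the REP socket and endpoint are placeholders; the point is the automatic socket cleanup and the portable clock calls):

#include "czmq.h"

int main (void)
{
    zctx_t *ctx = zctx_new ();                  // Also installs Ctrl-C handling
    void *worker = zsocket_new (ctx, ZMQ_REP);  // No explicit zmq_close () needed later
    zsocket_bind (worker, "tcp://*:5555");      // Placeholder endpoint

    int64_t start = zclock_time ();             // Portable millisecond clock
    zclock_sleep (100);                         // Portable millisecond sleep
    printf ("elapsed: %d msec\n", (int) (zclock_time () - start));

    zctx_destroy (&ctx);                        // Sets linger and closes every socket
    return 0;
}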

Here is the LRU queue broker rewritten to use CZMQ:

package ;
import haxe.io.Bytes;
import neko.Lib;
import neko.Sys;
import org.zeromq.ZFrame;
import org.zeromq.ZMsg;
#if (neko || cpp)
import neko.vm.Thread;
#end
import haxe.Stack;
import org.zeromq.ZContext;
import org.zeromq.ZSocket;
using org.zeromq.ZSocket;
import org.zeromq.ZMQ;
import org.zeromq.ZMQException;
import org.zeromq.ZMQPoller;
import org.zeromq.ZMQSocket;

/**
* Least-recently used (LRU) queue device
* Clients and workers are shown here in-process
*
* While this example runs in a single process, that is just to make
* it easier to start and stop the example. Each thread has its own
* context and conceptually acts as a separate process.
*
* See: http://zguide.zeromq.org/page:all#A-High-Level-API-for-MQ
*/

class LRUQueue2
{

private static inline var NBR_CLIENTS = 10;
private static inline var NBR_WORKERS = 3;

// Signals workers are ready
private static inline var LRU_READY:String = String.fromCharCode(1);

private static inline var WORKER_DONE:Bytes = Bytes.ofString("OK");

/**
* Basic request-reply client using REQ socket.
*/

public static function clientTask() {
var context:ZContext = new ZContext();
var client:ZMQSocket = context.createSocket(ZMQ_REQ);
var id = ZHelpers.setID(client);
client.connectEndpoint("ipc", "/tmp/frontend.ipc");

while (true) {
ZFrame.newStringFrame("HELLO").send(client);
var reply = ZFrame.recvFrame(client);
if (reply == null) {
break;
}
Lib.println("Client "+id+": " + reply.toString());
Sys.sleep(1);
}

context.destroy();
}

/**
* Worker using REQ socket to do LRU routing.
*/

public static function workerTask() {
var context:ZContext = new ZContext();
var worker:ZMQSocket = context.createSocket(ZMQ_REQ);
var id = ZHelpers.setID(worker);
worker.connectEndpoint("ipc", "/tmp/backend.ipc");

// Tell broker we're ready to do work
ZFrame.newStringFrame(LRU_READY).send(worker);

// Process messages as they arrive
while (true) {
var msg:ZMsg = ZMsg.recvMsg(worker);

if (msg == null) {
break;
}
// Lib.println("Worker " + id + " received " + msg.toString());
msg.last().reset(WORKER_DONE);
msg.send(worker);
}
context.destroy();
}

public static function main() {
Lib.println("** LRUQueue2 (see: http://zguide.zeromq.org/page:all#A-High-Level-API-for-MQ)");

#if php
// PHP appears to require tasks to be forked before main process creates ZMQ context
for (client_nbr in 0 ... NBR_CLIENTS) {
forkClientTask();
}
for (worker_nbr in 0 ... NBR_WORKERS) {
forkWorkerTask();
}
#end
// Prepare our context and sockets
var context:ZContext = new ZContext();
var frontend:ZMQSocket = context.createSocket(ZMQ_ROUTER);
var backend:ZMQSocket = context.createSocket(ZMQ_ROUTER);
frontend.bindEndpoint("ipc", "/tmp/frontend.ipc");
backend.bindEndpoint("ipc", "/tmp/backend.ipc");
#if !php
// Non-PHP targets require threads to be created after main thread has set up ZMQ Context
for (client_nbr in 0 ... NBR_CLIENTS) {
Thread.create(clientTask);
}
for (worker_nbr in 0 ... NBR_WORKERS) {
Thread.create(workerTask);
}
#end

// Logic of LRU loop:
// - Poll backend always, frontend only if 1 or more workers are ready
// - If worker replies, queue worker as ready and forward reply
// to client if necessary.
// - If client requests, pop next worker and send request to it.

// Queue of available workers
var workerQueue:List<ZFrame> = new List<ZFrame>();

var poller:ZMQPoller = new ZMQPoller();
poller.registerSocket(backend, ZMQ.ZMQ_POLLIN());

while (true) {
poller.unregisterSocket(frontend);
if (workerQueue.length > 0) {
// Only poll frontend if there is at least 1 worker ready to do work
poller.registerSocket(frontend, ZMQ.ZMQ_POLLIN());
}

try {
poller.poll( -1 );
} catch (e:ZMQException) {
if (ZMQ.isInterrupted()) {
break; // Interrupted or terminated
}
trace("ZMQException #:" + e.errNo + ", str:" + e.str());
trace (Stack.toString(Stack.exceptionStack()));
}
// Handle worker activity on backend
if (poller.pollin(1)) {
// Use worker address for LRU routing
var msg:ZMsg = ZMsg.recvMsg(backend);
if (msg == null) {
break;
}
var workerAddr = msg.unwrap();
if (workerQueue.length < NBR_WORKERS)
workerQueue.add(workerAddr);

// Third frame is READY or else a client reply address
var frame = msg.first();

// If client reply, send rest back to frontend
if (frame.toString() == LRU_READY) {
msg.destroy();
} else {
msg.send(frontend);
}
}

if (poller.pollin(2)) {
// get client request, route to first available worker
var msg = ZMsg.recvMsg(frontend);
if (msg != null) {
msg.wrap(workerQueue.pop());
msg.send(backend);
}
}
}
// When we're done, clean up properly
for (f in workerQueue) {
f.destroy();
}
context.destroy();
}

#if php
private static inline function forkWorkerTask() {
untyped __php__('
$pid = pcntl_fork();
if ($pid == 0) {
LRUQueue2::workerTask();
exit();
}'
);
return;
}

private static inline function forkClientTask() {
untyped __php__('
$pid = pcntl_fork();
if ($pid == 0) {
LRUQueue2::clientTask();
exit();
}'
);
return;
}
#end

}

lruqueue2.hx: LRU queue broker using CZMQ

One thing CZMQ provides is clean interrupt handling. This means that Ctrl-C will cause any blocking ØMQ call to exit with a return code -1 and errno set to EINTR. The CZMQ message recv methods will return NULL in such cases. So, you can cleanly exit a loop like this:

while (1) {
    zstr_send (client, "HELLO");
    char *reply = zstr_recv (client);
    if (!reply)
        break;              // Interrupted
    printf ("Client: %s\n", reply);
    free (reply);
    sleep (1);
}

Or, if you're doing zmq_poll, test on the return code:

int rc = zmq_poll (items, zlist_size (workers)? 2: 1, -1);
if (rc == -1)
    break;              // Interrupted

The previous example still uses zmq_poll(3). So how about reactors? The CZMQ zloop reactor is simple but functional. It lets you:

  • Set a reader on any socket, i.e. code that is called whenever the socket has input.
  • Cancel a reader on a socket.
  • Set a timer that goes off once or multiple times at specific intervals.

zloop of course uses zmq_poll(3) internally. It rebuilds its poll set each time you add or remove readers, and it calculates the poll timeout to match the next timer. Then, it calls the reader and timer handlers for each socket and timer that needs attention.
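
For instance, a once-per-second timer looks roughly like this with the CZMQ 1.x zloop API (the handler name and the five-tick limit are just for this sketch; zloop passes a NULL poll item to timer handlers):

#include "czmq.h"

// Timer handler; returning -1 stops the reactor
static int
s_tick (zloop_t *loop, zmq_pollitem_t *item, void *arg)
{
    int *count = (int *) arg;
    printf ("tick %d\n", ++(*count));
    return *count < 5? 0: -1;
}

int main (void)
{
    int count = 0;
    zloop_t *reactor = zloop_new ();
    zloop_timer (reactor, 1000, 0, s_tick, &count);  // Every 1000 msec; 0 = repeat forever
    zloop_start (reactor);      // Runs until a handler returns -1 or we're interrupted
    zloop_destroy (&reactor);
    return 0;
}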

When we use a reactor pattern, our code turns inside out. The main logic looks like this:

// handle_backend plays the same role as handleBackEnd in the Haxe version below
zloop_t *reactor = zloop_new ();
zmq_pollitem_t poller = { backend, 0, ZMQ_POLLIN };
zloop_poller (reactor, &poller, handle_backend, NULL);
zloop_start (reactor);
zloop_destroy (&reactor);

The actual handling of messages sits inside dedicated functions or methods. You may not like the style; it's a matter of taste. What it does help with is mixing timers and socket activity. In the rest of this text we'll use zmq_poll(3) in simpler cases, and zloop in more complex examples.

Here is the LRU queue broker rewritten once again, this time to use zloop:

package ;
import haxe.io.Bytes;
import neko.Lib;
import neko.Sys;
import org.zeromq.ZFrame;
import org.zeromq.ZLoop;
import org.zeromq.ZMsg;
#if (neko || cpp)
import neko.vm.Thread;
#end
import haxe.Stack;
import org.zeromq.ZContext;
import org.zeromq.ZSocket;
using org.zeromq.ZSocket;
import org.zeromq.ZMQ;
import org.zeromq.ZMQException;
import org.zeromq.ZMQPoller;
import org.zeromq.ZMQSocket;

/**
* Least-recently used (LRU) queue device 3
* Demonstrates use of Zxxxx.hx API and reactor style using the ZLoop class.
*
* While this example runs in a single process, that is just to make
* it easier to start and stop the example. Each thread has its own
* context and conceptually acts as a separate process.
*
* See: http://zguide.zeromq.org/page:all#A-High-Level-API-for-MQ
*/

class LRUQueue3
{

private static inline var NBR_CLIENTS = 10;
private static inline var NBR_WORKERS = 3;

// Signals workers are ready
private static inline var LRU_READY:String = String.fromCharCode(1);

private static inline var WORKER_DONE:Bytes = Bytes.ofString("OK");

/**
* Basic request-reply client using REQ socket.
*/

public static function clientTask() {
var context:ZContext = new ZContext();
var client:ZMQSocket = context.createSocket(ZMQ_REQ);
var id = ZHelpers.setID(client);
client.connectEndpoint("ipc", "/tmp/frontend.ipc");

while (true) {
ZFrame.newStringFrame("HELLO").send(client);
var reply = ZFrame.recvFrame(client);
if (reply == null) {
break;
}
Lib.println("Client "+id+": " + reply.toString());
Sys.sleep(1);
}

context.destroy();
}

/**
* Worker using REQ socket to do LRU routing.
*/

public static function workerTask() {
var context:ZContext = new ZContext();
var worker:ZMQSocket = context.createSocket(ZMQ_REQ);
var id = ZHelpers.setID(worker);
worker.connectEndpoint("ipc", "/tmp/backend.ipc");

// Tell broker we're ready to do work
ZFrame.newStringFrame(LRU_READY).send(worker);

// Process messages as they arrive
while (true) {
var msg:ZMsg = ZMsg.recvMsg(worker);

if (msg == null) {
break;
}
// Lib.println("Worker " + id + " received " + msg.toString());
msg.last().reset(WORKER_DONE);
msg.send(worker);
}
context.destroy();
}

// Hold information about our LRU Queue structure
private static var frontend:ZMQSocket;
private static var backend:ZMQSocket;
private static var workerQueue:List<ZFrame>;

/**
* Handle input from client, on frontend
* @param loop
* @param socket
* @return
*/

private static function handleFrontEnd(loop:ZLoop, socket:ZMQSocket):Int {
var msg = ZMsg.recvMsg(frontend);
if (msg != null) {
msg.wrap(workerQueue.pop());
msg.send(backend);
// Cancel reader on frontend if we went from 1 to 0 workers
if (workerQueue.length == 0)
loop.unregisterPoller({socket:frontend,event:ZMQ.ZMQ_POLLIN()});
}
return 0;
}

/**
* Handle input from worker on backend
* @param loop
* @param socket
* @return
*/

private static function handleBackEnd(loop:ZLoop, socket:ZMQSocket):Int {
var msg:ZMsg = ZMsg.recvMsg(backend);
if (msg != null) {
var address = msg.unwrap();
workerQueue.add(address);
if (workerQueue.length == 1)
loop.registerPoller( { socket:frontend, event:ZMQ.ZMQ_POLLIN() }, handleFrontEnd);
// Forward message to client if it is not a READY
var frame = msg.first();
if (frame.streq(LRU_READY))
msg.destroy();
else
msg.send(frontend);
}
return 0;
}

public static function main() {
Lib.println("** LRUQueue3 (see: http://zguide.zeromq.org/page:all#A-High-Level-API-for-MQ)");

#if php
// PHP appears to require tasks to be forked before main process creates ZMQ context
for (client_nbr in 0 ... NBR_CLIENTS) {
forkClientTask();
}
for (worker_nbr in 0 ... NBR_WORKERS) {
forkWorkerTask();
}
#end
// Prepare our context and sockets
var context:ZContext = new ZContext();
frontend = context.createSocket(ZMQ_ROUTER);
backend = context.createSocket(ZMQ_ROUTER);
frontend.bindEndpoint("ipc", "/tmp/frontend.ipc");
backend.bindEndpoint("ipc", "/tmp/backend.ipc");
#if !php
// Non-PHP targets require threads to be created after main thread has set up ZMQ Context
for (client_nbr in 0 ... NBR_CLIENTS) {
Thread.create(clientTask);
}
for (worker_nbr in 0 ... NBR_WORKERS) {
Thread.create(workerTask);
}
#end

// Logic of LRU loop:
// - Poll backend always, frontend only if 1 or more workers are ready
// - If worker replies, queue worker as ready and forward reply
// to client if necessary.
// - If client requests, pop next worker and send request to it.

// Initialise queue of available workers
workerQueue = new List<ZFrame>();

// Prepare reactor and fire it up
var reactor:ZLoop = new ZLoop();
reactor.registerPoller( { socket:backend, event:ZMQ.ZMQ_POLLIN() }, handleBackEnd);
reactor.start();
reactor.destroy();

// When we're done, clean up properly
for (f in workerQueue) {
f.destroy();
}
context.destroy();
}

#if php
private static inline function forkWorkerTask() {
untyped __php__('
$pid = pcntl_fork();
if ($pid == 0) {
LRUQueue3::workerTask();
exit();
}'
);
return;
}

private static inline function forkClientTask() {
untyped __php__('
$pid = pcntl_fork();
if ($pid == 0) {
LRUQueue3::clientTask();
exit();
}'
);
return;
}
#end

}

lruqueue3.hx: LRU queue broker using zloop

Getting applications to properly shut down when you send them Ctrl-C can be tricky. If you use the zctx class it'll automatically set up signal handling, but your code still has to cooperate. You must break any loop if zmq_poll returns -1 or if any of the recv methods (zstr_recv, zframe_recv, zmsg_recv) return NULL. If you have nested loops, it can be useful to make the outer ones conditional on !zctx_interrupted.
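
For example, a worker loop that cooperates with that signal handling might look roughly like this (worker is an assumed socket; zctx_interrupted is the global flag that zctx sets on Ctrl-C):

while (!zctx_interrupted) {
    zmsg_t *msg = zmsg_recv (worker);
    if (!msg)
        break;              // Interrupted, or the context was terminated
    // … do the real work here …
    zmsg_destroy (&msg);
}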

Asynchronous Client-Server

topprevnext

In the router-to-dealer example we saw a 1-to-N use case where one client talks asynchronously to multiple workers. We can turn this upside-down to get a very useful N-to-1 architecture where various clients talk to a single server, and do this asynchronously.

Figure 43 - Asynchronous Client-Server

fig43.png

Here's how it works:

  • Clients connect to the server and send requests.
  • For each request, the server sends 0 to N replies.
  • Clients can send multiple requests without waiting for a reply.
  • Servers can send multiple replies without waiting for new requests.

Here's code that shows how this works:

package ;
import neko.Lib;
import org.zeromq.ZMQException;
#if !php
import neko.Random;
import neko.vm.Thread;
#end
import neko.Sys;
import org.zeromq.ZContext;
import org.zeromq.ZFrame;
import org.zeromq.ZMQPoller;
import org.zeromq.ZMQSocket;
import org.zeromq.ZMQ;
import org.zeromq.ZMsg;

/**
* Asynchronous client-server (DEALER to ROUTER)
*
* While this example runs in a single process, that is just to make
* it easier to start and stop the example. Each thread has its own
* context and conceptually acts as a separate process.
*
* See: http://zguide.zeromq.org/page:all#Asynchronous-Client-Server
*/

class ASyncSrv
{

#if php
private static inline var internalServerEndpoint:String = "ipc:///tmp/backend";
#else
private static inline var internalServerEndpoint:String = "inproc://backend";
#end
/**
* This is our client task
* It connects to the server, and then sends a request once per second
* It collects responses as they arrive, and it prints them out. We will
* run several client tasks in parallel, each with a different random ID.
*/

public static function clientTask(context:ZContext) {
var client:ZMQSocket = context.createSocket(ZMQ_DEALER);

// Set random identity to make tracing easier
var id = ZHelpers.setID(client);
client.connect("tcp://localhost:5570");

//trace ("Started client " + id);

var poller = new ZMQPoller();
poller.registerSocket(client, ZMQ.ZMQ_POLLIN());
var request_nbr = 0;

while (true) {
for (centitick in 0 ... 100) {
try {
poller.poll(10000); // Poll for 10ms
} catch (e:ZMQException) {
if (ZMQ.isInterrupted())
break;
trace (e.toString());
break;
}
if (poller.pollin(1)) {
var msg:ZMsg = ZMsg.recvMsg(client);
Lib.println("Client: " + id + " received:" + msg.last().toString());
msg.destroy();
}
}
if (poller == null)
break; // Interrupted
ZMsg.newStringMsg("request #" + ++request_nbr).send(client);
}
context.destroy();
}

/**
* Accept a request and reply with the same text a random number of
* times, with random delays between replies.
*/

public static function serverWorker(context:ZContext) {
var worker:ZMQSocket = context.createSocket(ZMQ_DEALER);
worker.connect(internalServerEndpoint);

while (true) {
// The DEALER socket gives us the address envelope and message
var msg = ZMsg.recvMsg(worker);
var address:ZFrame = msg.pop();
var content:ZFrame = msg.pop();
//trace ("Got request from " + address.toString());
if (content == null)
break;
msg.destroy();

// Send 0…4 replies back
#if php
var replies = untyped __php__('rand(0, 4)');
#else
var replies = new Random().int(4);
#end
for (reply in 0 ... replies) {
// Sleep for some fraction of a second
#if php
Sys.sleep((untyped __php__('rand(0, 1000)') + 1) / 1000);
#else
Sys.sleep(new Random().float() + 0.001);
#end
address.send(worker, ZFrame.ZFRAME_MORE + ZFrame.ZFRAME_REUSE);
content.send(worker, ZFrame.ZFRAME_REUSE);
}
address.destroy();
content.destroy();
}
}

/**
* This is our server task
* It uses the multithreaded server model to deal requests out to a pool
* of workers and route replies back to clients. One worker can handle
* one request at a time but one client can talk to multiple workers at
* once.
*/

public static function serverTask(context:ZContext) {

#if php
for (thread_nbr in 0 ... 5) {
forkServerWorker(context);
}
#end
// Frontend socket talks to clients over TCP
var frontend = context.createSocket(ZMQ_ROUTER);
frontend.bind("tcp://*:5570");

// Backend socket talks to workers over inproc
var backend = context.createSocket(ZMQ_DEALER);
backend.bind(internalServerEndpoint);

// Launch pool of worker threads, precise number is not critical
#if !php
for (thread_nbr in 0 ... 5) {
Thread.create(callback(serverWorker,context));
}
#end
// Connect backend to frontend via queue device
// We could do this via:
// new ZMQDevice(ZMQ_QUEUE, frontend, backend);
// but doing it ourselves means we can debug this more easily

// Switch messages between frontend and backend
var poller:ZMQPoller = new ZMQPoller();
poller.registerSocket(frontend, ZMQ.ZMQ_POLLIN());
poller.registerSocket(backend, ZMQ.ZMQ_POLLIN());

while (true) {
try {
poller.poll( -1);
} catch (e:ZMQException) {
if (ZMQ.isInterrupted())
break;
trace (e.toString());
break;
}
if (poller.pollin(1)) {
var msg = ZMsg.recvMsg(frontend);
//trace("Request from client:"+msg.toString());
msg.send(backend);
}
if (poller.pollin(2)) {
var msg = ZMsg.recvMsg(backend);
//trace ("Reply from worker:" + msg.toString());//

msg.send(frontend);
}
}
context.destroy();
}

public static function main() {
Lib.println("** ASyncSrv (see: http://zguide.zeromq.org/page:all#Asynchronous-Client-Server)");

var context = new ZContext();

#if php
forkClientTask(context);
forkClientTask(context);
forkClientTask(context);
forkServerTask(context);
#else
Thread.create(callback(clientTask, context));
Thread.create(callback(clientTask, context));
Thread.create(callback(clientTask, context));
Thread.create(callback(serverTask, context));
#end

// Run for 5 seconds then quit
Sys.sleep(5);
context.destroy();
}

#if php
private static inline function forkServerWorker(context:ZContext) {
untyped __php__('
$pid = pcntl_fork();
if ($pid == 0) {
ASyncSrv::serverWorker($context);
exit();
}'
);
return;
}

private static inline function forkClientTask(context:ZContext) {
untyped __php__('
$pid = pcntl_fork();
if ($pid == 0) {
ASyncSrv::clientTask($context);
exit();
}'
);
return;
}

private static inline function forkServerTask(context:ZContext) {
untyped __php__('
$pid = pcntl_fork();
if ($pid == 0) {
ASyncSrv::serverTask($context);
exit();
}'
);
return;
}

#end

}

asyncsrv.hx: Asynchronous client-server

Just run that example by itself. Like other multi-task examples, it runs in a single process but each task has its own context and conceptually acts as a separate process. You will see three clients (each with a random ID), printing out the replies they get from the server. Look carefully and you'll see each client task gets 0 or more replies per request.

Some comments on this code:

  • The clients send a request once per second, and get zero or more replies back. To make this work using zmq_poll(3), we can't simply poll with a 1-second timeout, or we'd end up sending a new request only one second after we received the last reply. So we poll at a high frequency (100 times at 1/100th of a second per poll), which is approximately accurate. This means the server could use requests as a form of heartbeat, i.e. detecting when clients are present or disconnected.
  • The server uses a pool of worker threads, each processing one request synchronously. It connects these to its frontend socket using an internal queue. To help debug this, the code implements its own queue device logic. In the C code, you can uncomment the zmsg_dump() calls to get debugging output.

Figure 44 - Detail of Asynchronous Server

fig44.png

Note that we're doing a dealer-to-router dialog between client and server, but internally between the server main thread and workers we're doing dealer-to-dealer. If the workers were strictly synchronous, we'd use REP. But since we want to send multiple replies, we need an async socket. We do not want to route replies; they always go to the single server thread that sent us the request.

Let's think about the routing envelope. The client sends a simple message. The server thread receives a two-part message (real message prefixed by client identity). We have two possible designs for the server-to-worker interface:

  • Workers get unaddressed messages, and we manage the connections from server thread to worker threads explicitly using a router socket as backend. This would require that workers start by telling the server they exist, which can then route requests to workers and track which client is 'connected' to which worker. This is the LRU pattern we already covered.
  • Workers get addressed messages, and they return addressed replies. This requires that workers can properly decode and recode envelopes but it doesn't need any other mechanisms.

The second design is much simpler, so that's what we use:

     client          server       frontend       worker
   [ DEALER ]<---->[ ROUTER <----> DEALER <----> DEALER ]
             1 part         2 parts       2 parts

When you build servers that maintain stateful conversations with clients, you will run into a classic problem. If the server keeps some state per client, and clients keep coming and going, eventually it will run out of resources. Even if the same clients keep connecting, if you're using default identities, each connection will look like a new one.

We cheat in the above example by keeping state only for a very short time (the time it takes a worker to process a request) and then throwing away the state. But that's not practical for many cases.

To properly manage client state in a stateful asynchronous server, you must do three things (sketched in code after this list):

  • Do heartbeating from client to server. In our example we send a request once per second, which can reliably be used as a heartbeat.
  • Store state using the client identity (whether generated or explicit) as key.
  • Detect a stopped heartbeat. If there's no request from a client within, say, two seconds, the server can detect this and destroy any state it's holding for that client.
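
Here is a rough C sketch of that bookkeeping, using a CZMQ zhash keyed by client identity (clients is an assumed zhash, identity an assumed string; the two-second expiry matches the heartbeat above):

typedef struct {
    int64_t expires;        // Expiry time in msec (last heartbeat + 2000)
    // … whatever other per-client state the server keeps …
} client_t;

// On every request (i.e. heartbeat) from a client:
client_t *client = zhash_lookup (clients, identity);
if (!client) {
    client = (client_t *) zmalloc (sizeof (client_t));
    zhash_insert (clients, identity, client);
    zhash_freefn (clients, identity, free);   // Free the state when the entry goes
}
client->expires = zclock_time () + 2000;

// Periodically (e.g. from a timer), reap clients whose heartbeat has stopped:
zlist_t *keys = zhash_keys (clients);
char *key = (char *) zlist_first (keys);
while (key) {
    client = zhash_lookup (clients, key);
    if (client && zclock_time () > client->expires)
        zhash_delete (clients, key);          // Calls the free function we registered
    key = (char *) zlist_next (keys);
}
zlist_destroy (&keys);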

Worked Example: Inter-Broker Routing

topprevnext

Let's take everything we've seen so far, and scale things up. Our best client calls us urgently and asks for a design of a large cloud computing facility. He has this vision of a cloud that spans many data centers, each a cluster of clients and workers, and that works together as a whole.

Because we're smart enough to know that practice always beats theory, we propose to make a working simulation using ØMQ. Our client, eager to lock down the budget before his own boss changes his mind, and having read great things about ØMQ on Twitter, agrees.

Establishing the Details

topprevnext

Several espressos later, we want to jump into writing code but a little voice tells us to get more details before making a sensational solution to entirely the wrong problem. "What kind of work is the cloud doing?", we ask. The client explains:

  • Workers run on various kinds of hardware, but they are all able to handle any task. There are several hundred workers per cluster, and as many as a dozen clusters in total.
  • Clients create tasks for workers. Each task is an independent unit of work and all the client wants is to find an available worker, and send it the task, as soon as possible. There will be a lot of clients and they'll come and go arbitrarily.
  • The real difficulty is to be able to add and remove clusters at any time. A cluster can leave or join the cloud instantly, bringing all its workers and clients with it.
  • If there are no workers in their own cluster, clients' tasks will go off to other available workers in the cloud.
  • Clients send out one task at a time, waiting for a reply. If they don't get an answer within X seconds they'll just send out the task again. This ain't our concern, the client API does it already.
  • Workers process one task at a time, they are very simple beasts. If they crash, they get restarted by whatever script started them.

So we double check to make sure that we understood this correctly:

  • "There will be some kind of super-duper network interconnect between clusters, right?", we ask. The client says, "Yes, of course, we're not idiots."
  • "What kind of volumes are we talking about?", we ask. The client replies, "Up to a thousand clients per cluster, each doing max. ten requests per second. Requests are small, and replies are also small, no more than 1K bytes each."

So we do a little calculation and see that this will work nicely over plain TCP. 2,500 clients x 10/second x 1,000 bytes x 2 directions = 50MB/sec or 400Mb/sec, not a problem for a 1Gb network.

It's a straightforward problem that requires no exotic hardware or protocols, just some clever routing algorithms and careful design. We start by designing one cluster (one data center) and then we figure out how to connect clusters together.

Architecture of a Single Cluster

topprevnext

Workers and clients are synchronous. We want to use the LRU pattern to route tasks to workers. Workers are all identical, our facility has no notion of different services. Workers are anonymous, clients never address them directly. We make no attempt here to provide guaranteed delivery, retry, etc.

For reasons we already looked at, clients and workers won't speak to each other directly: doing so would make it impossible to add or remove nodes dynamically. So our basic model consists of the request-reply message broker we saw earlier.

Figure 45 - Cluster Architecture

fig45.png

Scaling to Multiple Clusters

topprevnext

Now we scale this out to more than one cluster. Each cluster has a set of clients and workers, and a broker that joins these together:

Figure 46 - Multiple Clusters

fig46.png

The question is: how do we get the clients of each cluster talking to the workers of the other cluster? There are a few possibilities, each with pros and cons:

  • Clients could connect directly to both brokers. The advantage is that we don't need to modify brokers or workers. But clients get more complex and become aware of the overall topology. If we want to add, e.g., a third or fourth cluster, all the clients are affected. In effect we have to move routing and fail-over logic into the clients, and that's not nice.
  • Workers might connect directly to both brokers. But mama workers can't do that; they can only reply to one broker. We might use papas, but papas don't give us customizable broker-to-worker routing like LRU, only the built-in load balancing. That's a fail: if we want to distribute work to idle workers, we precisely need LRU. One solution would be to use router sockets for the worker nodes. Let's label this "Idea #1".
  • Brokers could connect to each other. This looks neatest because it creates the fewest additional connections. We can't add clusters on the fly but that is probably out of scope. Now clients and workers remain ignorant of the real network topology, and brokers tell each other when they have spare capacity. Let's label this "Idea #2".

Let's explore Idea #1. In this model we have workers connecting to both brokers and accepting jobs from either.

Figure 47 - Idea 1 - Cross-connected Workers

fig47.png

It looks feasible. However it doesn't provide what we wanted, which was that clients get local workers if possible and remote workers only if it's better than waiting. Also workers will signal "ready" to both brokers and can get two jobs at once, while other workers remain idle. It seems this design fails because again we're putting routing logic at the edges.

So idea #2 then. We interconnect the brokers and don't touch the clients or workers, which are mamas like we're used to.

Figure 48 - Idea 2 - Brokers Talking to Each Other

fig48.png

This design is appealing because the problem is solved in one place, invisible to the rest of the world. Basically, brokers open secret channels to each other and whisper, like camel traders, "Hey, I've got some spare capacity, if you have too many clients give me a shout and we'll deal".

It is in effect just a more sophisticated routing algorithm: brokers become subcontractors for each other. Other things to like about this design, even before we play with real code:

  • It treats the common case (clients and workers on the same cluster) as default and does extra work for the exceptional case (shuffling jobs between clusters).
  • It lets us use different message flows for the different types of work. That means we can handle them differently, e.g. using different types of network connection.
  • It feels like it would scale smoothly. Interconnecting three or more brokers doesn't get over-complex. If we find this to be a problem, it's easy to solve by adding a super-broker.

We'll now make a worked example. We'll pack an entire cluster into one process. That is obviously not realistic, but it makes it simple to simulate, and the simulation can accurately scale to real processes. This is the beauty of ØMQ: you can design at the micro level and scale that up to the macro level. Threads become processes, then become boxes, while the patterns and logic remain the same. Each of our 'cluster' processes contains client threads, worker threads, and a broker thread.

We know the basic model well by now:

  • The mama client (REQ) threads create workloads and pass them to the broker (ROUTER).
  • The mama worker (REQ) threads process workloads and return the results to the broker (ROUTER).
  • The broker queues and distributes workloads using the LRU routing model.

Federation vs. Peering

topprevnext

There are several possible ways to interconnect brokers. What we want is to be able to tell other brokers, "we have capacity", and then receive multiple tasks. We also need to be able to tell other brokers "stop, we're full". It doesn't need to be perfect: sometimes we may accept jobs we can't process immediately, then we'll do them as soon as possible.

The simplest interconnect is federation in which brokers simulate clients and workers for each other. We would do this by connecting our frontend to the other broker's backend socket. Note that it is legal to both bind a socket to an endpoint and connect it to other endpoints.
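
That last point is worth seeing once in code: a single socket can both bind and connect (a minimal sketch; context and the ipc endpoints are placeholders):

void *frontend = zmq_socket (context, ZMQ_ROUTER);
zmq_bind (frontend, "ipc:///tmp/dc1-frontend.ipc");    // Our own clients connect to us here
zmq_connect (frontend, "ipc:///tmp/dc2-backend.ipc");  // … and we also connect out to a peer broker
zmq_connect (frontend, "ipc:///tmp/dc3-backend.ipc");  // … or to several peers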

Figure 49 - Cross-connected Brokers in Federation Model

fig49.png

This would give us simple logic in both brokers and a reasonably good mechanism: when there are no clients, tell the other broker 'ready', and accept one job from it. The problem is that it is too simple for our needs. A federated broker would be able to handle only one task at a time. If the broker emulates a lock-step client and worker, it is by definition also going to be lock-step, and if it has lots of available workers they won't be used. Our brokers need to be connected in a fully asynchronous fashion.

The federation model is perfect for other kinds of routing, especially service-oriented architectures or SOAs (which route by service name and proximity rather than LRU or load-balancing or random scatter). So don't dismiss it as useless, it's just not right for least-recently used and cluster load-balancing.

So instead of federation, let's look at a peering approach in which brokers are explicitly aware of each other and talk over privileged channels. Let's break this down, assuming we want to interconnect N brokers. Each broker has (N - 1) peers, and all brokers are using exactly the same code and logic. There are two distinct flows of information between brokers:

  • Each broker needs to tell its peers how many workers it has available at any time. This can be fairly simple information, just a quantity that is updated regularly. The obvious (and correct) socket pattern for this is publish-subscribe. So every broker opens a PUB socket and publishes state information on that, and every broker also opens a SUB socket and connects that to the PUB socket of every other broker, to get state information from its peers.
  • Each broker needs a way to delegate tasks to a peer and get replies back, asynchronously. We'll do this using router/router (ROUTER/ROUTER) sockets, no other combination works. Each broker has two such sockets: one for tasks it receives, one for tasks it delegates. If we didn't use two sockets it would be more work to know whether we were reading a request or a reply each time. That would mean adding more information to the message envelope.

And there is also the flow of information between a broker and its local clients and workers.

The Naming Ceremony

topprevnext

Three flows x two sockets for each flow = six sockets that we have to manage in the broker. Choosing good names is vital to keeping a multi-socket juggling act reasonably coherent in our minds. Sockets do something and what they do should form the basis for their names. It's about being able to read the code several weeks later on a cold Monday morning before coffee, and not feeling pain.

Let's do a shamanistic naming ceremony for the sockets. The three flows are:

  • A local request-reply flow between the broker and its clients and workers.
  • A cloud request-reply flow between the broker and its peer brokers.
  • A state flow between the broker and its peer brokers.

Finding meaningful names that are all the same length means our code will align beautifully. It may seem irrelevant, but such attention to detail turns ordinary code into something more like art.

For each flow the broker has two sockets that we can orthogonally call the "frontend" and "backend". We've used these names quite often. A frontend receives information or tasks. A backend sends those out to other peers. The conceptual flow is from front to back (with replies going in the opposite direction from back to front).

So all the code we write for this tutorial will use these socket names:

  • localfe and localbe for the local flow.
  • cloudfe and cloudbe for the cloud flow.
  • statefe and statebe for the state flow.

For our transport we'll use ipc for everything. This has the advantage of working like tcp in terms of connectivity (i.e. it's a disconnected transport, unlike inproc), yet we don't need IP addresses or DNS names, which would be a pain here. Instead, we will use ipc endpoints called something-local, something-cloud, and something-state, where something is the name of our simulated cluster.

You may be thinking that this is a lot of work for some names. Why not call them s1, s2, s3, s4, etc.? The answer is that if your brain is not a perfect machine, you need a lot of help when reading code, and we'll see that these names do help. It is a lot easier to remember "three flows, two directions" than "six different sockets".

Figure 50 - Broker Socket Arrangement

fig50.png

Note that we connect the cloudbe in each broker to the cloudfe in every other broker, and likewise we connect the statebe in each broker to the statefe in every other broker.

Prototyping the State Flow

topprevnext

Since each socket flow has its own little traps for the unwary, we will test them in real code one by one, rather than try to throw the whole lot into code in one go. When we're happy with each flow, we can put them together into a full program. We'll start with the state flow.

Figure 51 - The State Flow

fig51.png

Here is how this works in code:

package ;
import haxe.io.Bytes;
import neko.Lib;
import neko.Sys;
import org.zeromq.ZContext;
import org.zeromq.ZFrame;
import org.zeromq.ZMQ;
import org.zeromq.ZMQException;
import org.zeromq.ZMQPoller;
import org.zeromq.ZMsg;
import org.zeromq.ZSocket;

/**
* Broker peering simulation (part 1)
* Prototypes the state flow.
*
* NB: If running from Run.hx, set ARG_OFFSET to 1
* If running directly, set ARG_OFFSET to 0
*/

class Peering1
{
private static inline var ARG_OFFSET = 1;

public static function main() {
Lib.println("** Peering1 (see: http://zguide.zeromq.org/page:all#Prototyping-the-State-Flow)");

// First argument is this broker's name
// Other arguments are our peers' names
if (Sys.args().length < 2+ARG_OFFSET) {
Lib.println("syntax: ./Peering1 me {you} …");
return;
}

var self = Sys.args()[0+ARG_OFFSET];
Lib.println("I: preparing broker at " + self + " …");

// Prepare our context and sockets
var ctx = new ZContext();
var statebe = ctx.createSocket(ZMQ_PUB);
statebe.bind("ipc:///tmp/" + self + "-state.ipc");

// Connect statefe to all peers
var statefe = ctx.createSocket(ZMQ_SUB);
statefe.setsockopt(ZMQ_SUBSCRIBE, Bytes.ofString(""));
for (argn in 1 + ARG_OFFSET ... Sys.args().length) {
var peer = Sys.args()[argn];
Lib.println("I: connecting to state backend at '" + peer + "'");
statefe.connect("ipc:///tmp/" + peer + "-state.ipc");
}
// Send out status messages to peers, and collect from peers
// The ZMQPoller timeout defines our own heartbeating
//
var poller = new ZMQPoller();
while (true) {
// Initialise poll set
poller.registerSocket(statefe, ZMQ.ZMQ_POLLIN());
try {
// Poll for activity, or 1 second timeout
var res = poller.poll(1000 * 1000);
} catch (e:ZMQException) {
if (ZMQ.isInterrupted())
break;
trace (e.toString());
return;
}
// Handle incoming status messages
if (poller.pollin(1)) {
var msg = ZMsg.recvMsg(statefe);
var peerNameFrame = msg.first();
var availableFrame = msg.last();
Lib.println(peerNameFrame.toString() + " - " + availableFrame.toString() + " workers free");
} else {
// Send random value for worker availability
// We stick our own address onto the envelope
var msg:ZMsg = new ZMsg();
msg.addString(self);
msg.addString(Std.string(ZHelpers.randof(10)));
msg.send(statebe);
}
}

ctx.destroy();
}
}

peering1.hx: Prototype state flow

Notes about this code:

  • Each broker has an identity that we use to construct ipc endpoint names. A real broker would need to work with TCP and a more sophisticated configuration scheme. We'll look at such schemes later in this book but for now, using generated ipc names lets us ignore the problem of where to get TCP/IP addresses or names from.
  • We use a zmq_poll(3) loop as the core of the program. This processes incoming messages and sends out state messages. We send a state message only if we did not get any incoming messages and we waited for a second. If we send out a state message each time we get one in, we'll get message storms.
  • We use a two-part pubsub message consisting of sender address and data. Note that we will need to know the address of the publisher in order to send it tasks, and the only way is to send this explicitly as a part of the message.
  • We don't set identities on subscribers, because if we did then we'd get out of date state information when connecting to running brokers.
  • We don't set a HWM on the publisher, but if we were using ØMQ/2.x that would be a wise idea.

We can build this little program and run it three times to simulate three clusters. Let's call them DC1, DC2, and DC3 (the names are arbitrary). We run these three commands, each in a separate window:

peering1 DC1 DC2 DC3  #  Start DC1 and connect to DC2 and DC3
peering1 DC2 DC1 DC3  #  Start DC2 and connect to DC1 and DC3
peering1 DC3 DC1 DC2  #  Start DC3 and connect to DC1 and DC2

You'll see each cluster report the state of its peers, and after a few seconds they will all happily be printing random numbers once per second. Try this and satisfy yourself that the three brokers all match up and synchronize to per-second state updates.

In real life we'd not send out state messages at regular intervals but rather whenever we had a state change, i.e. whenever a worker becomes available or unavailable. That may seem like a lot of traffic but state messages are small and we've established that the inter-cluster connections are super-fast.

If we wanted to send state messages at precise intervals we'd create a child thread and open the statebe socket in that thread. We'd then send irregular state updates to that child thread from our main thread, and allow the child thread to conflate them into regular outgoing messages. This is more work than we need here.

Prototyping the Local and Cloud Flows

topprevnext

Let's now prototype the flow of tasks via the local and cloud sockets. This code pulls requests from clients and then distributes them to local workers and cloud peers on a random basis.

Figure 52 - The Flow of Tasks

fig52.png

Before we jump into the code, which is getting a little complex, let's sketch the core routing logic and break it down into a simple but robust design.

We need two queues, one for requests from local clients and one for requests from cloud clients. One option would be to pull messages off the local and cloud frontends, and pump these onto their respective queues. But this is kind of pointless because ØMQ sockets are queues already. So let's use the ØMQ socket buffers as queues.

This was the technique we used in the LRU queue broker, and it worked nicely. We only read from the two frontends when there is somewhere to send the requests. We can always read from the backends, since they give us replies to route back. As long as the backends aren't talking to us, there's no point in even looking at the frontends.

So our main loop becomes:

  • Poll the backends for activity. When we get a message, it may be "READY" from a worker or it may be a reply. If it's a reply, route back via the local or cloud frontend.
  • If a worker replied, it became available, so we queue it and count it.
  • While there are workers available, take a request, if any, from either frontend and route it to a local worker or, randomly, to a cloud peer.

Randomly sending tasks to a peer broker rather than a worker simulates work distribution across the cluster. It's dumb but that is fine for this stage.

We use broker identities to route messages between brokers. Each broker has a name, which we provide on the command line in this simple prototype. As long as these names don't overlap with the ØMQ-generated UUIDs used for client nodes, we can figure out whether to route a reply back to a client or to a broker.
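
A rough C sketch of that check, in the style of the CZMQ examples (msg is a reply just read off a backend socket; localfe and cloudfe are the two frontend sockets, and the peer names are assumed to come from the command line, starting at argv[2]):

// The first frame of a reply is its destination: either one of our own
// clients (a generated identity) or the name of a peer broker
zframe_t *address = zmsg_first (msg);

int is_broker = 0;
int argn;
for (argn = 2; argn < argc; argn++)
    if (zframe_streq (address, argv [argn]))
        is_broker = 1;

if (is_broker)
    zmsg_send (&msg, cloudfe);    // Reply travels on to the peer broker
else
    zmsg_send (&msg, localfe);    // Reply goes to one of our own clients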

Here is how this works in code. The interesting part starts around the comment "Interesting part".

package ;
import org.zeromq.ZMQException;
import ZHelpers;
import haxe.io.Bytes;
import neko.Lib;
import neko.Sys;
import neko.io.File;
import neko.io.FileInput;
#if (neko || cpp)
import neko.vm.Thread;
#end
import org.zeromq.ZContext;
import org.zeromq.ZMQ;
import org.zeromq.ZMQPoller;
import org.zeromq.ZMQSocket;
import org.zeromq.ZMsg;
import org.zeromq.ZFrame;

/**
* Broker peering simulation (part 2)
* Prototypes the request-reply flow
*
* While this example runs in a single process (for cpp & neko) and forked processes (for php), that is just
* to make it easier to start and stop the example. Each thread has its own
* context and conceptually acts as a separate process.
*
* See: http://zguide.zeromq.org/page:all#Prototyping-the-Local-and-Cloud-Flows
*
* NB: If running from Run.hx, set ARG_OFFSET to 1
* If running directly, set ARG_OFFSET to 0
*/

class Peering2
{

private static inline var NBR_CLIENTS = 10;
private static inline var NBR_WORKERS = 3;
private static inline var LRU_READY:String = String.fromCharCode(1); // Signals workers are ready
private static inline var WORKER_DONE = "OK";

// Our own name; in practice this would be configured per node
private static var self:String;

private static inline var ARG_OFFSET = 1;

/**
* Request - reply client using REQ socket
*/

private static function clientTask() {
var ctx = new ZContext();
var client = ctx.createSocket(ZMQ_REQ);
client.connect("ipc:///tmp/" + self + "-localfe.ipc");

while (true) {
ZFrame.newStringFrame("HELLO").send(client);
var reply = ZFrame.recvFrame(client);
if (reply == null) {
break;
}
Lib.println("Client: " + reply.toString());
Sys.sleep(1);
}
ctx.destroy();
}

/**
* Worker using REQ socket to do LRU routing
*/

public static function workerTask() {
var context:ZContext = new ZContext();
var worker:ZMQSocket = context.createSocket(ZMQ_REQ);
worker.connect("ipc:///tmp/"+self+"-localbe.ipc");

// Tell broker we're ready to do work
ZFrame.newStringFrame(LRU_READY).send(worker);

// Process messages as they arrive
while (true) {
var msg:ZMsg = ZMsg.recvMsg(worker);
if (msg == null) {
break;
}
Lib.println("Worker received " + msg.last().toString());
msg.last().reset(Bytes.ofString(WORKER_DONE));
msg.send(worker);
}
context.destroy();
}

public static function main() {
Lib.println("** Peering2 (see: http://zguide.zeromq.org/page:all#Prototyping-the-Local-and-Cloud-Flows)");

// First argument is this broker's name
// Other arguments are our peers' names
if (Sys.args().length < 2+ARG_OFFSET) {
Lib.println("syntax: ./Peering2 me {you} …");
return;
}

self = Sys.args()[0 + ARG_OFFSET];

#if php
// Start local workers
for (worker_nbr in 0 ... NBR_WORKERS) {
forkWorkerTask();
}

// Start local clients
for (client_nbr in 0 ... NBR_CLIENTS) {
forkClientTask();
}
#end

Lib.println("I: preparing broker at " + self + " …");

// Prepare our context and sockets
var ctx = new ZContext();
var endpoint:String;

// Bind cloud frontend to endpoint
var cloudfe = ctx.createSocket(ZMQ_ROUTER);
cloudfe.setsockopt(ZMQ_IDENTITY, Bytes.ofString(self));
cloudfe.bind("ipc:///tmp/" + self + "-cloud.ipc");

// Connect cloud backend to all peers
var cloudbe = ctx.createSocket(ZMQ_ROUTER);
cloudbe.setsockopt(ZMQ_IDENTITY, Bytes.ofString(self));
for (argn in 1 + ARG_OFFSET ... Sys.args().length) {
var peer = Sys.args()[argn];
Lib.println("I: connecting to cloud frontend at '" + peer + "'");
cloudbe.connect("ipc:///tmp/" + peer + "-cloud.ipc");
}
// Prepare local frontend and backend
var localfe = ctx.createSocket(ZMQ_ROUTER);
localfe.bind("ipc:///tmp/" + self + "-localfe.ipc");
var localbe = ctx.createSocket(ZMQ_ROUTER);
localbe.bind("ipc:///tmp/" + self + "-localbe.ipc");

// Get user to tell us when we can start…
Lib.println("Press Enter when all brokers are started: ");
var f:FileInput = File.stdin();
var str:String = f.readLine();

#if !php
// Start local workers
for (worker_nbr in 0 ... NBR_WORKERS) {
Thread.create(workerTask);
}

// Start local clients
for (client_nbr in 0 ... NBR_CLIENTS) {
Thread.create(clientTask);
}
#end

// Interesting part
// -------------------------------------------------------------
// Request-reply flow
// - Poll backends and process local/cloud replies
// - While worker available, route localfe to local or cloud

// Queue of available workers
var capacity = 0;
var workerQueue:List<ZFrame> = new List<ZFrame>();
var backend = new ZMQPoller();
backend.registerSocket(localbe, ZMQ.ZMQ_POLLIN());
backend.registerSocket(cloudbe, ZMQ.ZMQ_POLLIN());
var frontend = new ZMQPoller();
frontend.registerSocket(localfe, ZMQ.ZMQ_POLLIN());
frontend.registerSocket(cloudfe, ZMQ.ZMQ_POLLIN());

while (true) {
var ret = 0;

try {
// If we have no workers anyhow, wait indefinitely
ret = backend.poll( {
if (capacity > 0) 1000 * 1000 else -1; } );
} catch (e:ZMQException) {
if (ZMQ.isInterrupted()) {
break;
}
trace (e.toString());
return;
}

var msg:ZMsg = null;

// Handle reply from local worker
if (backend.pollin(1)) {
msg = ZMsg.recvMsg(localbe);
if (msg == null)
break; // Interrupted
var address = msg.unwrap();
workerQueue.add(address);
capacity++;

// If it's READY, don't route the message any further
var frame = msg.first();
if (frame.streq(LRU_READY))
msg.destroy();
}
// Or handle reply from peer broker
else if (backend.pollin(2)) {
msg = ZMsg.recvMsg(cloudbe);
if (msg == null)
break;
// We don't use peer broker address for anything
var address = msg.unwrap();
}
// Route reply to cloud if it's addressed to a broker
if (msg != null && !msg.isEmpty()) {
for (argv in 1 + ARG_OFFSET ... Sys.args().length) {
if (!msg.isEmpty() && msg.first().streq(Sys.args()[argv])) {
msg.send(cloudfe);
}
}
}
// Route reply to client if we still need to
if (msg != null && !msg.isEmpty()) {
msg.send(localfe);
}
// Now route as many client requests as we can handle
while (capacity > 0) {
try {
ret = frontend.poll(0);
} catch (e:ZMQException) {
if (ZMQ.isInterrupted())
break;
trace (e.toString());
return;
}
var reroutable = 0;
// We'll do peer brokers first, to prevent starvation
if (frontend.pollin(2)) {
msg = ZMsg.recvMsg(cloudfe);
reroutable = 0;
} else if (frontend.pollin(1)){
msg = ZMsg.recvMsg(localfe);
reroutable = 1;
} else
break; // No work, go back to the backends

// If reroutable, send to cloud 20% of the time
// Here we'd normally use cloud status information
//
if (reroutable > 0 && Sys.args().length > 1 + ARG_OFFSET && ZHelpers.randof(5) == 0) {
// Route to random broker peer
var randomPeer = ZHelpers.randof(Sys.args().length - (2 + ARG_OFFSET)) + (1 + ARG_OFFSET);
trace ("Routing to peer#"+randomPeer+":" + Sys.args()[randomPeer]);
msg.wrap(ZFrame.newStringFrame(Sys.args()[randomPeer]));
msg.send(cloudbe);
} else {
msg.wrap(workerQueue.pop());
msg.send(localbe);
capacity--;
}
}
}
// When we're done, clean up properly
ctx.destroy();

}

#if php
private static inline function forkClientTask() {
untyped __php__('
$pid = pcntl_fork();
if ($pid == 0) {
Peering2::clientTask();
exit();
}'
);
return;
}

private static inline function forkWorkerTask() {
untyped __php__('
$pid = pcntl_fork();
if ($pid == 0) {
Peering2::workerTask();
exit();
}'
);
return;
}

#end
}

peering2.hx: Prototype local and cloud flow

Run this by, for instance, starting two instances of the broker in two windows:

peering2 me you
peering2 you me

Some comments on this code:

  • Using the zmsg class makes life much easier, and our code much shorter. It's obviously an abstraction that works, and which should form part of your toolbox as a ØMQ programmer.
  • Since we're not getting any state information from peers, we naively assume they are running. The code prompts you to confirm when you've started all the brokers. In the real case we'd not send anything to brokers who had not told us they exist.

You can satisfy yourself that the code works by watching it run forever. If there were any misrouted messages, clients would end up blocking, and the brokers would stop printing trace information. You can prove that by killing either of the brokers. The other broker tries to send requests to the cloud, and one by one its clients block, waiting for an answer.

Putting it All Together


Let's put this together into a single package. As before, we'll run an entire cluster as one process. We're going to take the two previous examples and merge them into one properly working design that lets you simulate any number of clusters.

This code is the size of both previous prototypes together, at 270 LoC. That's pretty good for a simulation of a cluster that includes clients and workers and cloud workload distribution. Here is the code:

package ;
import org.zeromq.ZMQException;
import ZHelpers;
import haxe.io.Bytes;
import haxe.Stack;
import neko.Lib;
import neko.Sys;
#if (neko || cpp)
import neko.vm.Thread;
#end
import org.zeromq.ZContext;
import org.zeromq.ZMQ;
import org.zeromq.ZMQPoller;
import org.zeromq.ZMQSocket;
import org.zeromq.ZMsg;
import org.zeromq.ZFrame;

/**
* Broker peering simulation (part 3)
* Prototypes the full flow of status and tasks
*
* While this example runs in a single process (for cpp & neko) and forked processes (for php), that is just
* to make it easier to start and stop the example. Each thread has its own
* context and conceptually acts as a separate process.
*
* See: http://zguide.zeromq.org/page:all#Putting-it-All-Together
*
* NB: If running from Run.hx, set ARG_OFFSET to 1
* If running directly, set ARG_OFFSET to 0
*/

class Peering3
{

private static inline var NBR_CLIENTS = 10;
private static inline var NBR_WORKERS = 3;
private static inline var LRU_READY:String = String.fromCharCode(1); // Signals workers are ready

// Our own name; in practice this would be configured per node
private static var self:String;

private static inline var ARG_OFFSET = 1;

/**
* Request - reply client using REQ socket
* To simulate load, clients issue a burst of requests and then
* sleep for a random period.
*/

private static function clientTask() {
var ctx = new ZContext();
var client = ctx.createSocket(ZMQ_REQ);
client.connect("ipc:///tmp/" + self + "-localfe.ipc");
var monitor = ctx.createSocket(ZMQ_PUSH);
monitor.connect("ipc:///tmp/" + self + "-monitor.ipc");

var poller = new ZMQPoller();
poller.registerSocket(client, ZMQ.ZMQ_POLLIN());

while (true) {
Sys.sleep(ZHelpers.randof(5));
var burst = ZHelpers.randof(14);
for (i in 0 ... burst) {
var taskID = StringTools.hex(ZHelpers.randof(0x10000), 4);
// Send request with random hex ID
Lib.println("Client send task " + taskID);
try {
ZFrame.newStringFrame(taskID).send(client);
} catch (e:ZMQException) {
trace("ZMQException #:" + ZMQ.errNoToErrorType(e.errNo) + ", str:" + e.str());
trace (Stack.toString(Stack.exceptionStack()));
return; // quit
} catch (e:Dynamic) {
trace (e);
}

// Wait max ten seconds for a reply, then complain
try {
poller.poll(10 * 1000 * 1000);
} catch (e:ZMQException) {
if (ZMQ.isInterrupted())
break;
trace("ZMQException #:" + ZMQ.errNoToErrorType(e.errNo) + ", str:" + e.str());
trace (Stack.toString(Stack.exceptionStack()));
return; // quit
}
if (poller.pollin(1)) {
var reply = ZFrame.recvFrame(client);
if (reply == null)
break;
// Worker is supposed to answer us with our task id
if (!reply.streq(taskID)) {
Lib.println("E: Returned task ID:" + reply.toString() + " does not match requested taskID:" + taskID);
break;
}
} else {
ZMsg.newStringMsg("E: CLIENT EXIT - lost task " + taskID).send(monitor);
}
}
}
ctx.destroy();
}

/**
* Worker using REQ socket to do LRU routing
*/

public static function workerTask() {
var context:ZContext = new ZContext();
var worker:ZMQSocket = context.createSocket(ZMQ_REQ);
worker.connect("ipc:///tmp/"+self+"-localbe.ipc");

// Tell broker we're ready to do work
ZFrame.newStringFrame(LRU_READY).send(worker);

// Process messages as they arrive
while (true) {
try {
var msg:ZMsg = ZMsg.recvMsg(worker);
if (msg == null) {
context.destroy();
return;
}
Lib.println("Worker received " + msg.last().toString());
// Workers are busy for 0 / 1/ 2 seconds
Sys.sleep(ZHelpers.randof(2));
msg.send(worker);
} catch (e:ZMQException) {
trace("ZMQException #:" + ZMQ.errNoToErrorType(e.errNo) + ", str:" + e.str());
trace (Stack.toString(Stack.exceptionStack()));
}
}
context.destroy();
}

public static function main() {
Lib.println("** Peering3 (see: http://zguide.zeromq.org/page:all#Putting-it-All-Together)");

// First argument is this broker's name
// Other arguments are our peers' names
if (Sys.args().length < 2+ARG_OFFSET) {
Lib.println("syntax: ./Peering3 me {you} …");
return;
}

self = Sys.args()[0 + ARG_OFFSET];

#if php
// Start local workers
for (worker_nbr in 0 ... NBR_WORKERS) {
forkWorkerTask();
}

// Start local clients
for (client_nbr in 0 ... NBR_CLIENTS) {
forkClientTask();
}
#end

Lib.println("I: preparing broker at " + self + " …");

// Prepare our context and sockets
var ctx = new ZContext();
var endpoint:String;

// Bind cloud frontend to endpoint
var cloudfe = ctx.createSocket(ZMQ_ROUTER);
cloudfe.setsockopt(ZMQ_IDENTITY, Bytes.ofString(self));
cloudfe.bind("ipc:///tmp/" + self + "-cloud.ipc");

// Bind state backend / publisher to endpoint
var statebe = ctx.createSocket(ZMQ_PUB);
statebe.bind("ipc:///tmp/" + self + "-state.ipc");

// Connect cloud backend to all peers
var cloudbe = ctx.createSocket(ZMQ_ROUTER);
cloudbe.setsockopt(ZMQ_IDENTITY, Bytes.ofString(self));
for (argn in 1 + ARG_OFFSET ... Sys.args().length) {
var peer = Sys.args()[argn];
Lib.println("I: connecting to cloud frontend at '" + peer + "'");
cloudbe.connect("ipc:///tmp/" + peer + "-cloud.ipc");
}

// Connect statefe to all peers
var statefe = ctx.createSocket(ZMQ_SUB);
statefe.setsockopt(ZMQ_SUBSCRIBE, Bytes.ofString(""));
for (argn in 1 + ARG_OFFSET ... Sys.args().length) {
var peer = Sys.args()[argn];
Lib.println("I: connecting to state backend at '" + peer + "'");
statefe.connect("ipc:///tmp/" + peer + "-state.ipc");
}
// Prepare local frontend and backend
var localfe = ctx.createSocket(ZMQ_ROUTER);
localfe.bind("ipc:///tmp/" + self + "-localfe.ipc");
var localbe = ctx.createSocket(ZMQ_ROUTER);
localbe.bind("ipc:///tmp/" + self + "-localbe.ipc");

// Prepare monitor socket
var monitor = ctx.createSocket(ZMQ_PULL);
monitor.bind("ipc:///tmp/" + self + "-monitor.ipc");

#if !php
// Start local workers
for (worker_nbr in 0 ... NBR_WORKERS) {
Thread.create(workerTask);
}

// Start local clients
for (client_nbr in 0 ... NBR_CLIENTS) {
Thread.create(clientTask);
}
#end

// Interesting part
// -------------------------------------------------------------
// Publish-subscribe flow
// - Poll statefe and process capacity updates
// - Each time capacity changes, broadcast new value
// Request-reply flow
// - Poll primary and process local/cloud replies
// - While worker available, route localfe to local or cloud

// Queue of available workers
var localCapacity = 0;
var cloudCapacity = 0;
var workerQueue:List<ZFrame> = new List<ZFrame>();

var primary = new ZMQPoller();
primary.registerSocket(localbe, ZMQ.ZMQ_POLLIN());
primary.registerSocket(cloudbe, ZMQ.ZMQ_POLLIN());
primary.registerSocket(statefe, ZMQ.ZMQ_POLLIN());
primary.registerSocket(monitor, ZMQ.ZMQ_POLLIN());


while (true) {
trace ("**Start main loop iteration");
var ret = 0;

try {
// If we have no workers anyhow, wait indefinitely
ret = primary.poll( {
if (localCapacity > 0) 1000 * 1000 else -1; } );
} catch (e:ZMQException) {
if (ZMQ.isInterrupted()) {
break;
}
trace("ZMQException #:" + ZMQ.errNoToErrorType(e.errNo) + ", str:" + e.str());
trace (Stack.toString(Stack.exceptionStack()));
return;
}

// Track if capacity changes in this iteration
var previous = localCapacity;

var msg:ZMsg = null;

// Handle reply from local worker
if (primary.pollin(1)) {
msg = ZMsg.recvMsg(localbe);
if (msg == null)
break; // Interrupted
var address = msg.unwrap();
workerQueue.add(address);
localCapacity++;

// If it's READY, don't route the message any further
var frame = msg.first();
if (frame.streq(LRU_READY))
msg.destroy();
}
// Or handle reply from peer broker
else if (primary.pollin(2)) {
msg = ZMsg.recvMsg(cloudbe);
if (msg == null)
break;
// We don't use peer broker address for anything
var address = msg.unwrap();
}
// Route reply to cloud if it's addressed to a broker
if (msg != null && !msg.isEmpty()) {
for (argv in 1 + ARG_OFFSET ... Sys.args().length) {
if (!msg.isEmpty() && msg.first().streq(Sys.args()[argv])) {
trace ("Route reply to peer:" + Sys.args()[argv]);
msg.send(cloudfe);
}
}
}
// Route reply to client if we still need to
if (msg != null && !msg.isEmpty()) {
msg.send(localfe);
}

// Handle capacity updates
if (primary.pollin(3)) {
try {
var msg = ZMsg.recvMsg(statefe);
trace ("State msg received:" + msg.toString());
var availableFrame = msg.last();
cloudCapacity = Std.parseInt(availableFrame.data.toString());
} catch (e:ZMQException) {
trace("ZMQException #:" + ZMQ.errNoToErrorType(e.errNo) + ", str:" + e.str());
trace (Stack.toString(Stack.exceptionStack()));
} catch (e:Dynamic) {
trace (e);
}
}
// Handle monitor message
if (primary.pollin(4)) {
try {
var status = ZMsg.recvMsg(monitor);
Lib.println(status.first().data.toString());
return;
} catch (e:ZMQException) {
trace("ZMQException #:" + ZMQ.errNoToErrorType(e.errNo) + ", str:" + e.str());
trace (Stack.toString(Stack.exceptionStack()));
} catch (e:Dynamic) {
trace (e);
}
}

trace ("** Polling secondary sockets");
// Now route as many client requests as we can handle
// - If we have local capacity we poll both localfe and cloudfe
// - If we have cloud capacity only, we poll just localfe
// - Route any request locally if we can, else to cloud
//
while (localCapacity + cloudCapacity > 0) {
trace (" ** polling secondary, with total capacity:" + Std.string(localCapacity + cloudCapacity));
var secondary = new ZMQPoller();
secondary.registerSocket(localfe, ZMQ.ZMQ_POLLIN());

if (localCapacity > 0) {
secondary.registerSocket(cloudfe, ZMQ.ZMQ_POLLIN());
}
try {
ret = secondary.poll(0);
} catch (e:ZMQException) {
if (ZMQ.isInterrupted())
break;
trace("ZMQException #:" + ZMQ.errNoToErrorType(e.errNo) + ", str:" + e.str());
trace (Stack.toString(Stack.exceptionStack()));
return;
}
// We take requests from local clients first, then from peer brokers
trace (" ** Secondary poll completed");
if (secondary.pollin(1)) {
trace (" ** About to receive from localfe");
msg = ZMsg.recvMsg(localfe);
trace (msg.toString());
} else if (secondary.pollin(2)) {
trace (" ** About to receive from cloudfe");
msg = ZMsg.recvMsg(cloudfe);
trace (msg.toString());
} else {
trace (" ** No requests, go back to primary");
break; // No work, go back to the primary
}
if (localCapacity > 0) {
var frame = workerQueue.pop();
msg.wrap(frame);
msg.send(localbe);
localCapacity--;
} else {
// Route to random broker peer
var randomPeer = ZHelpers.randof(Sys.args().length - (2 + ARG_OFFSET)) + (1 + ARG_OFFSET);
trace ("Routing to peer#"+randomPeer+":" + Sys.args()[randomPeer]);
msg.wrap(ZFrame.newStringFrame(Sys.args()[randomPeer]));
msg.send(cloudbe);
}
}

trace ("Updating status :"+ Std.string(localCapacity != previous));
if (localCapacity != previous) {
// We stick our own address onto the envelope
msg = new ZMsg();
msg.add(ZFrame.newStringFrame(Std.string(localCapacity)));
msg.wrap(ZFrame.newStringFrame(self));
trace ("Updating status:" + msg.toString());
msg.send(statebe);
}
}

// When we're done, clean up properly
ctx.destroy();

}

#if php
private static inline function forkClientTask() {
untyped __php__('
$pid = pcntl_fork();
if ($pid == 0) {
Peering3::clientTask();
exit();
}'
);
return;
}

private static inline function forkWorkerTask() {
untyped __php__('
$pid = pcntl_fork();
if ($pid == 0) {
Peering3::workerTask();
exit();
}'
);
return;
}

#end
}

peering3.hx: Full cluster simulation

It's a non-trivial program and took about a day to get working. These are the highlights:

  • The client threads detect and report a failed request. They do this by polling for a response and if none arrives after a while (10 seconds), printing an error message.
  • Client threads don't print directly, but instead send a message to a 'monitor' socket (PUSH) that the main loop collects (PULL) and prints off. This is the first case we've seen of using ØMQ sockets for monitoring and logging; this is a big use case we'll come back to later.
  • Clients simulate varying loads so that the cluster hits 100% capacity at random moments and tasks get shifted over to the cloud. The number of clients and workers, and the delays in the client and worker threads, control this. Feel free to play with these values and see if you can make a more realistic simulation.
  • The main loop uses two pollsets. It could in fact use three: information, backends, and frontends. As in the earlier prototype, there is no point in taking a frontend message if there is no backend capacity.

These are some of the problems that hit during development of this program:

  • Clients would freeze, due to requests or replies getting lost somewhere. Recall that the ØMQ ROUTER socket drops messages it can't route. The first tactic was to modify the client thread to detect and report such problems. The second was to put zmsg_dump() calls after every recv() and before every send() in the main loop, until the cause was clear.
  • The main loop was mistakenly reading from more than one ready socket. This caused the first message to be lost. Fixed that by reading only from the first ready socket.
  • The zmsg class was not properly encoding UUIDs as C strings. This caused UUIDs that contain 0 bytes to be corrupted. Fixed by modifying zmsg to encode UUIDs as printable hex strings (see the sketch below).
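To make "printable hex strings" concrete, here is a tiny standalone helper in the same spirit; it is an illustration, not the actual zmsg fix:

// Hypothetical helper: render an identity frame's raw bytes as a printable
// hex string, so that UUIDs containing zero bytes survive being treated as text.
import haxe.io.Bytes;

class HexId {
    public static function encode(data:Bytes):String {
        var s = new StringBuf();
        for (i in 0 ... data.length)
            s.add(StringTools.hex(data.get(i), 2));
        return s.toString();
    }

    public static function main() {
        var id = Bytes.ofString("ab");       // pretend this is a binary UUID
        neko.Lib.println(HexId.encode(id));  // prints "6162"
    }
}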

This simulation does not detect disappearance of a cloud peer. If you start several peers and stop one, and it was broadcasting capacity to the others, they will continue to send it work even if it's gone. You can try this, and you will get clients that complain of lost requests. The solution is twofold: first, only keep the capacity information for a short time so that if a peer does disappear, its capacity is quickly set to 'zero'. Second, add reliability to the request-reply chain. We'll look at reliability in the next chapter.
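The first half of that fix can be sketched with the same style of bookkeeping the Paranoid Pirate queue uses later in this chapter: a list of records, each with an expiry time. The STATE_TTL value and the record shape below are assumptions for illustration, not part of the example above:

// Sketch: expire peer capacity that hasn't been refreshed recently.
typedef PeerStateT = {
    name:String,
    capacity:Int,
    expiry:Float            // msecs since 1 Jan 1970, as with WorkerT later on
};

class PeerStates {
    static inline var STATE_TTL = 5000;   // msecs; assumed value for this sketch

    var peers:List<PeerStateT>;
    public function new() { peers = new List<PeerStateT>(); }

    // Call this whenever a state message arrives on statefe
    public function update(name:String, capacity:Int) {
        for (p in peers)
            if (p.name == name) peers.remove(p);
        peers.add({ name:name, capacity:capacity,
                    expiry:Date.now().getTime() + STATE_TTL });
    }

    // Cloud capacity right now: peers we haven't heard from recently count as zero
    public function cloudCapacity():Int {
        var total = 0;
        for (p in peers)
            if (Date.now().getTime() <= p.expiry)
                total += p.capacity;
        return total;
    }
}

The broker's main loop would then call update() for every message on statefe and use cloudCapacity() wherever it currently reads the raw cloudCapacity counter. The second half of the fix, adding reliability to the request-reply chain itself, is the subject of the next chapter.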

Chapter Four - Reliable Request-Reply


In Chapter Three we looked at advanced use of ØMQ's request-reply pattern with worked examples. In this chapter we'll look at the general question of reliability and build a set of reliable messaging patterns on top of ØMQ's core request-reply pattern.

In this chapter we focus heavily on user-space 'patterns', which are reusable models that help you design your ØMQ architecture:

  • The Lazy Pirate pattern: reliable request-reply from the client side.
  • The Simple Pirate pattern: reliable request-reply using a LRU queue.
  • The Paranoid Pirate pattern: reliable request-reply with heartbeating.
  • The Majordomo pattern: service-oriented reliable queuing.
  • The Titanic pattern: disk-based / disconnected reliable queuing.
  • The Binary Star pattern: primary-backup server fail-over.
  • The Freelance pattern: brokerless reliable request-reply.

What is "Reliability"?


To understand what 'reliability' means, we have to look at its opposite, namely failure. If we can handle a certain set of failures, we are reliable with respect to those failures. No more, no less. So let's look at the possible causes of failure in a distributed ØMQ application, in roughly descending order of probability:

  • Application code is the worst offender. It can crash and exit, freeze and stop responding to input, run too slowly for its input, exhaust all memory, etc.
  • System code - like brokers we write using ØMQ - can die. System code should be more reliable than application code but can still crash and burn, and especially run out of memory if it tries to compensate for slow clients.
  • Message queues can overflow, typically in system code that has learned to deal brutally with slow clients. When a queue overflows, it starts to discard messages.
  • Networks can fail temporarily, causing intermittent message loss. Such failures are hidden from ØMQ applications, since ØMQ automatically reconnects peers after a network-forced disconnection.
  • Hardware can fail and take with it all the processes running on that box.
  • Networks can fail in exotic ways, e.g. some ports on a switch may die and those parts of the network become inaccessible.
  • Entire data centers can be struck by lightning, earthquakes, fire, or more mundane power or cooling failures.

To make a software system fully reliable against all of these possible failures is an enormously difficult and expensive job and goes beyond the scope of this modest guide.

Since the first five cases cover 99.9% of real world requirements outside large companies (according to a highly scientific study I just ran), that's what we'll look at. If you're a large company with money to spend on the last two cases, contact me immediately, there's a large hole behind my beach house waiting to be converted into a pool.

Designing Reliability


So to make things brutally simple, reliability is "keeping things working properly when code freezes or crashes", a situation we'll shorten to "dies". However the things we want to keep working properly are more complex than just messages. We need to take each core ØMQ messaging pattern and see how to make it work (if we can) even when code dies.

Let's take them one by one:

  • Request-reply: if the server dies (while processing a request), the client can figure that out since it won't get an answer back. Then it can give up in a huff, wait and try again later, find another server, etc. As for the client dying, we can brush that off as "someone else's problem" for now.
  • Publish-subscribe: if the client dies (having gotten some data), the server doesn't know about it. Pubsub doesn't send any information back from client to server. But the client can contact the server out-of-band, e.g. via request-reply, and ask, "please resend everything I missed". As for the server dying, that's out of scope for here. Subscribers can also self-verify that they're not running too slowly, and take action (e.g. warn the operator, and die) if they are.
  • Pipeline: if a worker dies (while working), the ventilator doesn't know about it. Pipelines, like pubsub, and the grinding gears of time, only work in one direction. But the downstream collector can detect that one task didn't get done, and send a message back to the ventilator saying, "hey, resend task 324!" If the ventilator or collector dies, then whatever upstream client originally sent the work batch can get tired of waiting and resend the whole lot. It's not elegant but system code should really not die often enough to matter.

In this chapter we'll focus on request-reply, and we'll cover reliable pub-sub and pipeline in the following chapters.

The basic request-reply pattern (a REQ client socket doing a blocking send/recv to a REP server socket) scores low on handling the most common types of failure. If the server crashes while processing the request, the client just hangs forever. If the network loses the request or the reply, the client hangs forever.

It is a lot better than TCP, thanks to ØMQ's ability to reconnect peers silently, to load-balance messages, and so on. But it's still not good enough for real work. The only use case where you can trust the basic request-reply pattern is between two threads in the same process where there's no network or separate server process to die.

However, with a little extra work this humble pattern becomes a good basis for real work across a distributed network, and we get a set of reliable request-reply patterns I like to call the "Pirate" patterns. RRR!

There are, roughly, three ways to connect clients to servers, each needing a specific approach to reliability:

  • Multiple clients talking directly to a single server. Use case: single well-known server that clients need to talk to. Types of failure we aim to handle: server crashes and restarts, network disconnects.
  • Multiple clients talking to a single queue device that distributes work to multiple servers. Use case: workload distribution to workers. Types of failure we aim to handle: worker crashes and restarts, worker busy looping, worker overload, queue crashes and restarts, network disconnects.
  • Multiple clients talking to multiple servers with no intermediary devices. Use case: distributed services such as name resolution. Types of failure we aim to handle: service crashes and restarts, service busy looping, service overload, network disconnects.

Each of these approaches has its trade-offs, and often you'll mix them. We'll look at all three in detail.

Client-side Reliability (Lazy Pirate Pattern)


We can get very simple reliable request-reply with only some changes in the client. We call this the Lazy Pirate pattern. Rather than doing a blocking receive, we:

  • Poll the REQ socket and only receive from it when we're sure a reply has arrived.
  • Resend the request several times, if no reply has arrived within a timeout period.
  • Abandon the transaction if, after several requests, there is still no reply.

Figure 53 - The Lazy Pirate Pattern

fig53.png

If you try to use a REQ socket in anything other than a strict send-recv fashion, you'll get an error (technically, the REQ socket implements a small finite-state machine to enforce the send-recv ping-pong, and so the error code is called "EFSM"). This is slightly annoying when we want to use REQ in a pirate pattern, because we may send several requests before getting a reply. The pretty good brute-force solution is to close and reopen the REQ socket after an error:

package ;
import haxe.Stack;
import neko.Lib;
import org.zeromq.ZContext;
import org.zeromq.ZFrame;
import org.zeromq.ZMQ;
import org.zeromq.ZMQException;
import org.zeromq.ZMQPoller;
import org.zeromq.ZSocket;

/**
* Lazy Pirate client
* Use zmq_poll to do a safe request-reply
* To run, start lpserver and then randomly kill / restart it.
*
* @see http://zguide.zeromq.org/page:all#Client-side-Reliability-Lazy-Pirate-Pattern
*/

class LPClient
{

private static inline var REQUEST_TIMEOUT = 2500; // msecs, (> 1000!)
private static inline var REQUEST_RETRIES = 3; // Before we abandon
private static inline var SERVER_ENDPOINT = "tcp://localhost:5555";

public static function main() {
Lib.println("** LPClient (see: http://zguide.zeromq.org/page:all#Client-side-Reliability-Lazy-Pirate-Pattern)");
var ctx:ZContext = new ZContext();
Lib.println("I: connecting to server …");
var client = ctx.createSocket(ZMQ_REQ);
if (client == null)
return;
client.connect(SERVER_ENDPOINT);

var sequence = 0;
var retries_left = REQUEST_RETRIES;

var poller = new ZMQPoller();

while (retries_left > 0 && !ZMQ.isInterrupted()) {
// We send a request, then we work to get a reply
var request = Std.string(++sequence);
ZFrame.newStringFrame(request).send(client);

var expect_reply = true;
while (expect_reply) {
poller.registerSocket(client, ZMQ.ZMQ_POLLIN());
// Poll socket for a reply, with timeout
try {
var res = poller.poll(REQUEST_TIMEOUT * 1000);
} catch (e:ZMQException) {
trace("ZMQException #:" + e.errNo + ", str:" + e.str());
trace (Stack.toString(Stack.exceptionStack()));
ctx.destroy();
return;
}
// If we got a reply, process it
if (poller.pollin(1)) {
// We got a reply from the server, must match sequence
var replyFrame = ZFrame.recvFrame(client);
if (replyFrame == null)
break; // Interrupted
if (Std.parseInt(replyFrame.toString()) == sequence) {
Lib.println("I: server replied OK (" + sequence + ")");
retries_left = REQUEST_RETRIES;
expect_reply = false;
} else
Lib.println("E: malformed reply from server: " + replyFrame.toString());
replyFrame.destroy();
} else if (--retries_left == 0) {
Lib.println("E: server seems to be offline, abandoning");
break;
} else {
Lib.println("W: no response from server, retrying…");
// Old socket is confused, close it and open a new one
ctx.destroySocket(client);
Lib.println("I: reconnecting to server…");
client = ctx.createSocket(ZMQ_REQ);
client.connect(SERVER_ENDPOINT);
// Send request again, on new socket
ZFrame.newStringFrame(request).send(client);
}
poller.unregisterAllSockets();
}
}
ctx.destroy();
}
}

lpclient.hx: Lazy Pirate client

Run this together with the matching server:

package ;
import neko.Lib;
import neko.Sys;
import org.zeromq.ZContext;
import org.zeromq.ZFrame;
import org.zeromq.ZMQ;

/**
* Lazy Pirate server
* Binds REP socket to tcp://*:5555
* Like HWServer except:
* - echoes request as-is
* - randomly runs slowly, or exits to simulate a crash.
*
* @see http://zguide.zeromq.org/page:all#Client-side-Reliability-Lazy-Pirate-Pattern
*
*/

class LPServer
{

public static function main() {
Lib.println("** LPServer (see: http://zguide.zeromq.org/page:all#Client-side-Reliability-Lazy-Pirate-Pattern)");
var ctx = new ZContext();
var server = ctx.createSocket(ZMQ_REP);
server.bind("tcp://*:5555");

var cycles = 0;
while (true) {
var requestFrame = ZFrame.recvFrame(server);
cycles++;

// Simulate various problems, after a few cycles
if (cycles > 3 && ZHelpers.randof(3) == 0) {
Lib.println("I: simulating a crash");
break;
}
else if (cycles > 3 && ZHelpers.randof(3) == 0) {
Lib.println("I: simulating CPU overload");
Sys.sleep(2.0);
}
Lib.println("I: normal request (" + requestFrame.toString() + ")");
Sys.sleep(1.0); // Do some heavy work
requestFrame.send(server);
requestFrame.destroy();

}
server.close();
ctx.destroy();
}
}

lpserver.hx: Lazy Pirate server

To run this test case, start the client and the server in two console windows. The server will randomly misbehave after a few messages, and you can watch how the client responds. Here is typical output from the server:

I: normal request (1)
I: normal request (2)
I: normal request (3)
I: simulating CPU overload
I: normal request (4)
I: simulating a crash

And here is the client's response:

I: connecting to server...
I: server replied OK (1)
I: server replied OK (2)
I: server replied OK (3)
W: no response from server, retrying...
I: connecting to server...
W: no response from server, retrying...
I: connecting to server...
E: server seems to be offline, abandoning

The client sequences each message, and checks that replies come back exactly in order: that no requests or replies are lost, and no replies come back more than once, or out of order. Run the test a few times until you're convinced this mechanism actually works. You don't need sequence numbers in reality, they just help us trust our design.

The client uses a REQ socket, and does the brute-force close/reopen because REQ sockets impose a strict send/receive cycle. You might be tempted to use a DEALER instead, but it would not be a good decision. First, it would mean emulating the secret sauce that REQ does with envelopes (if you've forgotten what that is, it's a good sign you don't want to have to do it). Second, it would mean potentially getting back replies that you didn't expect.
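To see why, here is a hedged sketch of what the DEALER route would look like, reusing ctx, SERVER_ENDPOINT, and the sequence counter from the Lazy Pirate client above. It is not the approach this pattern takes, only a picture of what you would be signing up for:

// Sketch only: a DEALER socket makes us manage the REQ envelope ourselves.
// (ctx, SERVER_ENDPOINT and sequence come from the Lazy Pirate client above.)
var dealer = ctx.createSocket(ZMQ_DEALER);
dealer.connect(SERVER_ENDPOINT);

// Sending: we must add the empty delimiter frame that REQ normally adds for us
var request = new ZMsg();
request.addString(Std.string(sequence));      // request body
request.push(ZFrame.newStringFrame(""));      // empty delimiter frame
request.send(dealer);

// Receiving: strip the delimiter, then decide whether we even want this reply.
// Unlike REQ, DEALER will happily deliver late replies from abandoned retries,
// so the body has to be checked against the sequence we currently expect.
var reply = ZMsg.recvMsg(dealer);
reply.unwrap();                               // pops the empty delimiter frame
if (Std.parseInt(reply.first().toString()) != sequence)
    reply.destroy();                          // stale reply from an earlier retry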

Handling failures only at the client works when we have a set of clients talking to a single server. It can handle a server crash, but only if recovery means restarting that same server. If there's a permanent error - e.g. a dead power supply on the server hardware - this approach won't work. Since the application code in servers is usually the biggest source of failures in any architecture, depending on a single server is not a great idea.

So, pros and cons:

  • Pro: simple to understand and implement.
  • Pro: works easily with existing client and server application code.
  • Pro: ØMQ automatically retries the actual reconnection until it works.
  • Con: doesn't do fail-over to backup / alternate servers.

Basic Reliable Queuing (Simple Pirate Pattern)


Our second approach takes the Lazy Pirate pattern and extends it with a queue device that lets us talk, transparently, to multiple servers, which we can more accurately call 'workers'. We'll develop this in stages, starting with a minimal working model, the Simple Pirate pattern.

In all these Pirate patterns, workers are stateless, or have some shared state we don't know about, e.g. a shared database. Having a queue device means workers can come and go without clients knowing anything about it. If one worker dies, another takes over. This is a nice simple topology with only one real weakness, namely the central queue itself, which can become a problem to manage, and a single point of failure.

The basis for the queue device is the least-recently-used (LRU) routing queue from Chapter Three. What is the very minimum we need to do to handle dead or blocked workers? Turns out, it's surprisingly little. We already have a retry mechanism in the client. So using the standard LRU queue will work pretty well. This fits with ØMQ's philosophy that we can extend a peer-to-peer pattern like request-reply by plugging naive devices in the middle.

Figure 54 - The Simple Pirate Pattern

fig54.png

We don't need a special client; we're still using the Lazy Pirate client. Here is the queue, which is exactly a LRU queue, no more or less:

package ;
import haxe.Stack;
import neko.Lib;
import org.zeromq.ZFrame;
import org.zeromq.ZContext;
import org.zeromq.ZMQSocket;
import org.zeromq.ZMQPoller;
import org.zeromq.ZMQ;
import org.zeromq.ZMsg;
import org.zeromq.ZMQException;

/**
* Simple Pirate queue
* This is identical to the LRU pattern, with no reliability mechanisms
* at all. It depends on the client for recovery. Runs forever.
*
* @see http://zguide.zeromq.org/page:all#Basic-Reliable-Queuing-Simple-Pirate-Pattern
*/

class SPQueue
{

// Signals workers are ready
private static inline var LRU_READY:String = String.fromCharCode(1);

public static function main() {
Lib.println("** SPQueue (see: http://zguide.zeromq.org/page:all#Basic-Reliable-Queuing-Simple-Pirate-Pattern)");

// Prepare our context and sockets
var context:ZContext = new ZContext();
var frontend:ZMQSocket = context.createSocket(ZMQ_ROUTER);
var backend:ZMQSocket = context.createSocket(ZMQ_ROUTER);
frontend.bind("tcp://*:5555"); // For clients
backend.bind("tcp://*:5556"); // For workers

// Queue of available workers
var workerQueue:List<ZFrame> = new List<ZFrame>();

var poller:ZMQPoller = new ZMQPoller();
poller.registerSocket(backend, ZMQ.ZMQ_POLLIN());

while (true) {
poller.unregisterSocket(frontend);
if (workerQueue.length > 0) {
// Only poll frontend if there is at least 1 worker ready to do work
poller.registerSocket(frontend, ZMQ.ZMQ_POLLIN());
}

try {
poller.poll( -1 );
} catch (e:ZMQException) {
if (ZMQ.isInterrupted())
break;
trace("ZMQException #:" + e.errNo + ", str:" + e.str());
trace (Stack.toString(Stack.exceptionStack()));
}
if (poller.pollin(1)) {
// Use worker address for LRU routing
var msg = ZMsg.recvMsg(backend);
if (msg == null)
break; // Interrupted
var address = msg.unwrap();
workerQueue.add(address);

// Forward message to client if it's not a READY
var frame = msg.first();
if (frame.streq(LRU_READY))
msg.destroy();
else
msg.send(frontend);
}
if (poller.pollin(2)) {
// Get client request, route to first available worker
var msg = ZMsg.recvMsg(frontend);
if (msg != null) {
msg.wrap(workerQueue.pop());
msg.send(backend);
}
}
}
// When we're done, clean up properly
for (f in workerQueue) {
f.destroy();
}
context.destroy();
}
}

spqueue.hx: Simple Pirate queue

Here is the worker, which takes the Lazy Pirate server and adapts it for the LRU pattern (using the REQ 'ready' signaling):

package ;
import neko.Lib;
import neko.Sys;
import org.zeromq.ZContext;
import org.zeromq.ZFrame;
import org.zeromq.ZMQ;
import org.zeromq.ZMQSocket;
import org.zeromq.ZMsg;

/**
* Simple Pirate worker
* Connects REQ socket to tcp://localhost:5556
* Implements worker part of LRU queuing
* @see http://zguide.zeromq.org/page:all#Basic-Reliable-Queuing-Simple-Pirate-Pattern
*/

class SPWorker
{

// Signals workers are ready
private static inline var LRU_READY:String = String.fromCharCode(1);

public static function main() {
Lib.println("** SPWorker (see: http://zguide.zeromq.org/page:all#Basic-Reliable-Queuing-Simple-Pirate-Pattern)");

var ctx = new ZContext();
var worker = ctx.createSocket(ZMQ_REQ);

// Set random identity to make tracing easier
var identity = ZHelpers.setID(worker);
worker.connect("tcp://localhost:5556");

// Tell broker we're ready for work
Lib.println("I: (" + identity + ") worker ready");
ZFrame.newStringFrame(LRU_READY).send(worker);

var cycles = 0;
while (true) {
var msg = ZMsg.recvMsg(worker);
if (msg == null)
break; // Interrupted
cycles++;

// Simulate various problems, after a few cycles
if (cycles > 3 && ZHelpers.randof(5) == 0) {
Lib.println("I: simulating a crash");
break;
}
else if (cycles > 3 && ZHelpers.randof(5) == 0) {
Lib.println("I: simulating CPU overload");
Sys.sleep(3.0);
if (ZMQ.isInterrupted())
break;
}
Lib.println("I: ("+identity+") normal reply");
Sys.sleep(1.0); // Do some heavy work
msg.send(worker);
}
ctx.destroy();

}
}

spworker.hx: Simple Pirate worker

To test this, start a handful of workers, a client, and the queue, in any order. You'll see that the workers eventually all crash and burn, and the client retries and then gives up. The queue never stops, and you can restart workers and clients ad nauseam. This model works with any number of clients and workers.

Robust Reliable Queuing (Paranoid Pirate Pattern)


The Simple Pirate Queue pattern works pretty well, especially since it's just a combination of two existing patterns, but it has some weaknesses:

  • It's not robust against a queue crash and restart. The client will recover, but the workers won't. While ØMQ will reconnect workers' sockets automatically, as far as the newly started queue is concerned, the workers haven't signaled "READY", so don't exist. To fix this we have to do heartbeating from queue to worker, so that the worker can detect when the queue has gone away.
  • The queue does not detect worker failure, so if a worker dies while idle, the queue can only remove it from its worker queue by first sending it a request. The client waits and retries for nothing. It's not a critical problem but it's not nice. To make this work properly we do heartbeating from worker to queue, so that the queue can detect a lost worker at any stage.

We'll fix these in a properly pedantic Paranoid Pirate Pattern.

We previously used a REQ socket for the worker. For the Paranoid Pirate worker we'll switch to a DEALER socket. This has the advantage of letting us send and receive messages at any time, rather than the lock-step send/receive that REQ imposes. The downside of DEALER is that we have to do our own envelope management. If you don't know what I mean, please re-read Chapter Three.

Figure 55 - The Paranoid Pirate Pattern

fig55.png

We're still using the Lazy Pirate client. Here is the Paranoid Pirate queue device:

package ;
import haxe.Stack;
import neko.Lib;
import org.zeromq.ZFrame;
import org.zeromq.ZContext;
import org.zeromq.ZMQSocket;
import org.zeromq.ZMQPoller;
import org.zeromq.ZMQ;
import org.zeromq.ZMsg;
import org.zeromq.ZMQException;

/**
* Paranoid Pirate Queue
*
* @see http://zguide.zeromq.org/page:all#Robust-Reliable-Queuing-Paranoid-Pirate-Pattern
*
* Author: rsmith (at) rsbatechnology (dot) co (dot) uk
*/

class PPQueue
{
private static inline var HEARTBEAT_LIVENESS = 3;
private static inline var HEARTBEAT_INTERVAL = 1000; // msecs

private static inline var PPP_READY = String.fromCharCode(1);
private static inline var PPP_HEARTBEAT = String.fromCharCode(2);

public static function main() {
Lib.println("** PPQueue (see: http://zguide.zeromq.org/page:all#Robust-Reliable-Queuing-Paranoid-Pirate-Pattern)");

// Prepare our context and sockets
var context:ZContext = new ZContext();
var frontend:ZMQSocket = context.createSocket(ZMQ_ROUTER);
var backend:ZMQSocket = context.createSocket(ZMQ_ROUTER);
frontend.bind("tcp://*:5555"); // For clients
backend.bind("tcp://*:5556"); // For workerQueue

// Queue of available workers
var workerQueue = new WorkerQueue(HEARTBEAT_LIVENESS, HEARTBEAT_INTERVAL);

// Send out heartbeats at regular intervals
var heartbeatAt = Date.now().getTime() + HEARTBEAT_INTERVAL;

var poller = new ZMQPoller();

while (true) {
poller.unregisterAllSockets();
poller.registerSocket(backend, ZMQ.ZMQ_POLLIN());
// Only poll frontend clients if we have at least one worker to do stuff
if (workerQueue.size() > 0) {
poller.registerSocket(frontend, ZMQ.ZMQ_POLLIN());
}
try {
poller.poll(HEARTBEAT_INTERVAL * 1000);
} catch (e:ZMQException) {
if (ZMQ.isInterrupted())
break;
trace("ZMQException #:" + e.errNo + ", str:" + e.str());
trace (Stack.toString(Stack.exceptionStack()));
}

// Handle worker activity
if (poller.pollin(1)) {
// use worker addressFrame for LRU routing
var msg = ZMsg.recvMsg(backend);
if (msg == null)
break; // Interrupted

// Any sign of life from worker means it's ready
var addressFrame = msg.unwrap();
var identity = addressFrame.toString();

// Validate control message, or return reply to client
if (msg.size() == 1) {
var frame = msg.first();
if (frame.streq(PPP_READY)) {
workerQueue.delete(identity);
workerQueue.append(addressFrame,identity);
} else if (frame.streq(PPP_HEARTBEAT)) {
workerQueue.refresh(identity);
} else {
Lib.println("E: invalid message from worker");
Lib.println(msg.toString());
}
msg.destroy();
} else {
msg.send(frontend);
workerQueue.append(addressFrame, identity);
}
}

if (poller.pollin(2)) {
// Now get next client request, route to next worker
var msg = ZMsg.recvMsg(frontend);
if (msg == null)
break; // Interrupted
var worker = workerQueue.dequeue();
msg.push(worker.addressFrame.duplicate());
msg.send(backend);
}

// Send heartbeats to idle workers if it's time
if (Date.now().getTime() >= heartbeatAt) {
for ( w in workerQueue) {
var msg = new ZMsg();
msg.add(w.addressFrame.duplicate()); // Add a duplicate of the stored worker addressFrame frame,
// to prevent w.addressFrame ZFrame object from being destroyed when msg is sent
msg.addString(PPP_HEARTBEAT);
msg.send(backend);
}
heartbeatAt = Date.now().getTime() + HEARTBEAT_INTERVAL;
}
workerQueue.purge();
}
// When we're done, clean up properly
context.destroy();

}
}

typedef WorkerT = {
addressFrame:ZFrame,
identity:String,
expiry:Float // in msecs since 1 Jan 1970
};

/**
* Internal class managing a queue of workers
*/

private class WorkerQueue {

// Stores a list of worker records (address, identity, heartbeat expiry)
private var queue:List<WorkerT>;

private var heartbeatLiveness:Int;
private var heartbeatInterval:Int;

/**
* Constructor
* @param liveness
* @param interval
*/

public function new(liveness:Int, interval:Int) {
queue = new List<WorkerT>();
heartbeatLiveness = liveness;
heartbeatInterval = interval;
}

// Implement Iterable typedef signature
public function iterator():Iterator<WorkerT> {
return queue.iterator();
}

/**
* Insert worker at end of queue, reset expiry
* Worker must not already be in queue
* @param identity
*/

public function append(addressFrame:ZFrame,identity:String) {
if (get(identity) != null)
Lib.println("E: duplicate worker identity " + identity);
else
queue.add({addressFrame:addressFrame, identity:identity, expiry:generateExpiry()});
}

/**
* Remove worker from queue, if present
* @param identity
*/

public function delete(identity:String) {
var w = get(identity);
if (w != null) {
queue.remove(w);
}
}

public function refresh(identity:String) {
var w = get(identity);
if (w == null)
Lib.println("E: worker " + identity + " not ready");
else
w.expiry = generateExpiry();
}

/**
* Pop next worker off queue, return WorkerT
* @return the next available worker, or null if the queue is empty
*/

public function dequeue():WorkerT {
return queue.pop();
}

/**
* Look for & kill expired workerQueue
*/

public function purge() {
for (w in queue) {
if (Date.now().getTime() > w.expiry) {
queue.remove(w);
}
}
}

/**
* Return the size of this worker Queue
* @return
*/

public function size():Int {
return queue.length;
}

/**
* Returns a WorkerT anon object if exists in the queue, else null
* @param identity
* @return
*/

private function get(identity:String):WorkerT {
for (w in queue) {
if (w.identity == identity)
return w;
}
return null; // nothing found
}

private inline function generateExpiry():Float {
return Date.now().getTime() + heartbeatInterval * heartbeatLiveness;
}
}

ppqueue.hx: Paranoid Pirate queue

The queue extends the LRU pattern with heartbeating of workers. Heartbeating is simple once it works, but quite difficult to invent. I'll explain more about that in a second.

Here is the Paranoid Pirate worker:

package ;
import haxe.Stack;
import neko.Lib;
import neko.Sys;
import org.zeromq.ZContext;
import org.zeromq.ZFrame;
import org.zeromq.ZMQ;
import org.zeromq.ZMQPoller;
import org.zeromq.ZMQSocket;
import org.zeromq.ZMQException;
import org.zeromq.ZMsg;
import org.zeromq.ZSocket;

/**
* Paranoid Pirate worker
*
* @see http://zguide.zeromq.org/page:all#Robust-Reliable-Queuing-Paranoid-Pirate-Pattern
*
* Author: rsmith (at) rsbatechnology (dot) co (dot) uk
*/

class PPWorker
{
private static inline var HEARTBEAT_LIVENESS = 3;
private static inline var HEARTBEAT_INTERVAL = 1000; // msecs
private static inline var INTERVAL_INIT = 1000; // Initial reconnect
private static inline var INTERVAL_MAX = 32000; // After exponential backoff

private static inline var PPP_READY = String.fromCharCode(1);
private static inline var PPP_HEARTBEAT = String.fromCharCode(2);

/**
* Helper function that returns a new configured socket
* connected to the Paranoid Pirate queue
* @param ctx
* @return
*/

private static function workerSocket(ctx:ZContext):ZMQSocket {
var worker = ctx.createSocket(ZMQ_DEALER);
worker.connect("tcp://localhost:5556");

// Tell queue we're ready for work
Lib.println("I: worker ready");
ZFrame.newStringFrame(PPP_READY).send(worker);

return worker;
}

public static function main() {
Lib.println("** PPWorker (see: http://zguide.zeromq.org/page:all#Robust-Reliable-Queuing-Paranoid-Pirate-Pattern)");

var ctx = new ZContext();
var worker = workerSocket(ctx);

// If liveness hits zero, queue is considered disconnected
var liveness = HEARTBEAT_LIVENESS;
var interval = INTERVAL_INIT;

// Send out heartbeats at regular intervals
var heartbeatAt = Date.now().getTime() + HEARTBEAT_INTERVAL;

var cycles = 0;
var poller = new ZMQPoller();
poller.registerSocket(worker, ZMQ.ZMQ_POLLIN());

while (true) {
try {
poller.poll(HEARTBEAT_INTERVAL * 1000);
} catch (e:ZMQException) {
if (ZMQ.isInterrupted())
break;
trace("ZMQException #:" + e.errNo + ", str:" + e.str());
trace (Stack.toString(Stack.exceptionStack()));
}
if (poller.pollin(1)) {
// Get message
// - 3-part envelope + content -> request
// - 1-part HEARTBEAT -> heartbeat
var msg = ZMsg.recvMsg(worker);
if (msg == null)
break; // Interrupted
if (msg.size() == 3) {
// Simulate various problems, after a few cycles
cycles++;
if (cycles > 3 && ZHelpers.randof(5) == 0) {
Lib.println("I: simulating a crash");
msg.destroy();
break;
} else if (cycles > 3 && ZHelpers.randof(5) == 0) {
Lib.println("I: simulating CPU overload");
Sys.sleep(3.0);
if (ZMQ.isInterrupted())
break;
}
Lib.println("I: normal reply");
msg.send(worker);
liveness = HEARTBEAT_LIVENESS;
Sys.sleep(1.0); // Do some heavy work
if (ZMQ.isInterrupted())
break;
} else if (msg.size() == 1) {
var frame = msg.first();
if (frame.streq(PPP_HEARTBEAT))
liveness = HEARTBEAT_LIVENESS;
else {
Lib.println("E: invalid message");
Lib.println(msg.toString());
}
msg.destroy();
} else {
Lib.println("E: invalid message");
Lib.println(msg.toString());
}
interval = INTERVAL_INIT;
} else if (--liveness == 0) {
Lib.println("W: heartbeat failure, can't reach queue");
Lib.println("W: reconnecting in " + interval + " msec…");
Sys.sleep(interval / 1000.0);

if (interval < INTERVAL_MAX)
interval *= 2;
ctx.destroySocket(worker);