Chapter Five

Chapter Five - Advanced Publish-Subscribe

topprevnext

In Chapters Three and Four we looked at advanced use of ØMQ's request-reply pattern. If you managed to digest all that, congratulations. In this chapter we'll focus on publish-subscribe, and extend ØMQ's core pub-sub pattern with higher-level patterns for performance, reliability, state distribution, and security.

We'll cover:

  • How to handle too-slow subscribers (the Suicidal Snail pattern).
  • How to design high-speed subscribers (the Black Box pattern).
  • How to build a shared key-value cache (the Clone pattern).

Slow Subscriber Detection (Suicidal Snail Pattern)

topprevnext

A common problem you will hit when using the pub-sub pattern in real life is the slow subscriber. In an ideal world, we stream data at full speed from publishers to subscribers. In reality, subscriber applications are often written in interpreted languages, or just do a lot of work, or are just badly written, to the extent that they can't keep up with publishers.

How do we handle a slow subscriber? The ideal fix is to make the subscriber faster, but that might take work and time. Some of the classic strategies for handling a slow subscriber are:

  • Queue messages on the publisher. This is what Gmail does when I don't read my email for a couple of hours. But in high-volume messaging, pushing queues upstream has the thrilling but unprofitable result of making publishers run out of memory and crash. Especially if there are lots of subscribers and it's not possible to flush to disk for performance reasons.
  • Queue messages on the subscriber. This is much better, and it's what ØMQ does by default if the network can keep up with things. If anyone's going to run out of memory and crash, it'll be the subscriber rather than the publisher, which is fair. This is perfect for "peaky" streams where a subscriber can't keep up for a while, but can catch up when the stream slows down. However it's no answer to a subscriber that's simply too slow in general.
  • Stop queuing new messages after a while. This is what Gmail does when my mailbox overflows its 7.554GB, no 7.555GB of space. New messages just get rejected or dropped. This is a great strategy from the perspective of the publisher, and it's what ØMQ does when the publisher sets a high water mark or HWM. However it still doesn't help us fix the slow subscriber. Now we just get gaps in our message stream.
  • Punish slow subscribers with disconnect. This is what Hotmail does when I don't login for two weeks, which is why I'm on my fifteenth Hotmail account. It's a nice brutal strategy that forces subscribers to sit up and pay attention, and would be ideal, but ØMQ doesn't do this, and there's no way to layer it on top since subscribers are invisible to publisher applications.

None of these classic strategies fit. So we need to get creative. Rather than disconnect the publisher, let's convince the subscriber to kill itself. This is the Suicidal Snail pattern. When a subscriber detects that it's running too slowly (where "too slowly" is presumably a configured option that really means "so slowly that if you ever get here, shout really loudly because I need to know, so I can fix this!"), it croaks and dies.

How can a subscriber detect this? One way would be to sequence messages (number them in order), and use a HWM at the publisher. Now, if the subscriber detects a gap (i.e. the numbering isn't consecutive), it knows something is wrong. We then tune the HWM to the "croak and die if you hit this" level.

There are two problems with this solution. One, if we have many publishers, how do we sequence messages? The solution is to give each publisher a unique ID and add that to the sequencing. Second, if subscribers use ZMQ_SUBSCRIBE filters, they will get gaps by definition. Our precious sequencing will be for nothing.

Some use-cases won't use filters, and sequencing will work for them. But a more general solution is that the publisher timestamps each message. When a subscriber gets a message it checks the time, and if the difference is more than, say, one second, it does the "croak and die" thing. Possibly firing off a squawk to some operator console first.

The Suicide Snail pattern works especially when subscribers have their own clients and service-level agreements and need to guarantee certain maximum latencies. Aborting a subscriber may not seem like a constructive way to guarantee a maximum latency, but it's the assertion model. Abort today, and the problem will be fixed. Allow late data to flow downstream, and the problem may cause wider damage and take longer to appear on the radar.

So here is a minimal example of a Suicidal Snail:

<?php
/* Suicidal Snail
*
* @author Ian Barber <ian(dot)barber(at)gmail(dot)com>
*/

/* ---------------------------------------------------------------------
* This is our subscriber
* It connects to the publisher and subscribes to everything. It
* sleeps for a short time between messages to simulate doing too
* much work. If a message is more than 1 second late, it croaks.
*/

define("MAX_ALLOWED_DELAY", 100); // msecs

function subscriber()
{
$context = new ZMQContext();

// Subscribe to everything
$subscriber = new ZMQSocket($context, ZMQ::SOCKET_SUB);
$subscriber->connect("tcp://localhost:5556");
$subscriber->setSockOpt(ZMQ::SOCKOPT_SUBSCRIBE, "");

// Get and process messages
while (true) {
$clock = $subscriber->recv();
// Suicide snail logic
if (microtime(true)*100 - $clock*100 > MAX_ALLOWED_DELAY) {
echo "E: subscriber cannot keep up, aborting", PHP_EOL;
break;
}

// Work for 1 msec plus some random additional time
usleep(1000 + rand(0, 1000));
}
}

/* ---------------------------------------------------------------------
* This is our server task
* It publishes a time-stamped message to its pub socket every 1ms.
*/

function publisher()
{
$context = new ZMQContext();

// Prepare publisher
$publisher = new ZMQSocket($context, ZMQ::SOCKET_PUB);
$publisher->bind("tcp://*:5556");

while (true) {
// Send current clock (msecs) to subscribers
$publisher->send(microtime(true));
usleep(1000); // 1msec wait
}
}

/*
* This main thread simply starts a client, and a server, and then
* waits for the client to croak.
*/

$pid = pcntl_fork();
if ($pid == 0) {
publisher();
exit();
}

$pid = pcntl_fork();
if ($pid == 0) {
subscriber();
exit();
}

suisnail.php: Suicidal Snail

Notes about this example:

  • The message here consists simply of the current system clock as a number of milliseconds. In a realistic application you'd have at least a message header with the timestamp, and a message body with data.
  • The example has subscriber and publisher in a single process, as two threads. In reality they would be separate processes. Using threads is just convenient for the demonstration.

High-speed Subscribers (Black Box Pattern)

topprevnext

A common use-case for pub-sub is distributing large data streams. For example, 'market data' coming from stock exchanges. A typical set-up would have a publisher connected to a stock exchange, taking price quotes, and sending them out to a number of subscribers. If there are a handful of subscribers, we could use TCP. If we have a larger number of subscribers, we'd probably use reliable multicast, i.e. pgm.

Let's imagine our feed has an average of 100,000 100-byte messages a second. That's a typical rate, after filtering market data we don't need to send on to subscribers. Now we decide to record a day's data (maybe 250 GB in 8 hours), and then replay it to a simulation network, i.e. a small group of subscribers. While 100K messages a second is easy for a ØMQ application, we want to replay much faster.

So we set-up our architecture with a bunch of boxes, one for the publisher, and one for each subscriber. These are well-specified boxes, eight cores, twelve for the publisher. (If you're reading this in 2015, which is when the Guide is scheduled to be finished, please add a zero to those numbers.)

And as we pump data into our subscribers, we notice two things:

  1. When we do even the slightest amount of work with a message, it slows down our subscriber to the point where it can't catch up with the publisher again.
  2. We're hitting a ceiling, at both publisher and subscriber, to around say 6M messages a second, even after careful optimization and TCP tuning.

The first thing we have to do is break our subscriber into a multithreaded design so that we can do work with messages in one set of threads, while reading messages in another. Typically we don't want to process every message the same way. Rather, the subscriber will filter some messages, perhaps by prefix key. When a message matches some criteria, the subscriber will call a worker to deal with it. In ØMQ terms this means sending the message to a worker thread.

So the subscriber looks something like a queue device. We could use various sockets to connect the subscriber and workers. If we assume one-way traffic, and workers that are all identical, we can use PUSH and PULL, and delegate all the routing work to ØMQ. This is the simplest and fastest approach.

Figure 62 - The Simple Black Box Pattern

fig62.png

The subscriber talks to the publisher over TCP or PGM. The subscriber talks to its workers, which are all in the same process, over inproc.

Now to break that ceiling. What happens is that the subscriber thread hits 100% of CPU, and since it is one thread, it cannot use more than one core. A single thread will always hit a ceiling, be it at 2M, 6M, or more messages per second. We want to split the work across multiple threads that can run in parallel.

The approach used by many high-performance products, which works here, is sharding, meaning we split the work into parallel and independent streams. E.g. half of the topic keys are in one stream, half in another. We could use many streams, but performance won't scale unless we have free cores.

So let's see how to shard into two streams:

Figure 63 - Mad Black Box Pattern

fig63.png

With two streams, working at full speed, we would configure ØMQ as follows:

  • Two I/O threads, rather than one.
  • Two network interfaces (NIC), one per subscriber.
  • Each I/O thread bound to a specific NIC.
  • Two subscriber threads, bound to specific cores.
  • Two SUB sockets, one per subscriber thread.
  • The remaining cores assigned to worker threads.
  • Worker threads connected to both subscriber PUSH sockets.

With ideally, no more threads in our architecture than we had cores. Once we create more threads than cores, we get contention between threads, and diminishing returns. There would be no benefit, for example, in creating more I/O threads.

A Shared Key-Value Cache (Clone Pattern)

topprevnext

Pub-sub is like a radio broadcast, you miss everything before you join, and then how much information you get depends on the quality of your reception. Surprisingly, for engineers who are used to aiming for "perfection", this model is useful and wide-spread, because it maps perfectly to real-world distribution of information. Think of Facebook and Twitter, the BBC World Service, and the sports results.

However, there are also a whole lot of cases where more reliable pub-sub would be valuable, if we could do it. As we did for request-reply, let's define ''reliability' in terms of what can go wrong. Here are the classic problems with pub-sub:

  • Subscribers join late, so miss messages the server already sent.
  • Subscriber connections are slow, and can lose messages during that time.
  • Subscribers go away, and lose messages while they are away.

Less often, we see problems like these:

  • Subscribers can crash, and restart, and lose whatever data they already received.
  • Subscribers can fetch messages too slowly, so queues build up and then overflow.
  • Networks can become overloaded and drop data (specifically, for PGM).
  • Networks can become too slow, so publisher-side queues overflow, and publishers crash.

A lot more can go wrong but these are the typical failures we see in a realistic system.

We've already solved some of these, such as the slow subscriber, which we handle with the Suicidal Snail pattern. But for the rest, it would be nice to have a generic, reusable framework for reliable pub-sub.

The difficulty is that we have no idea what our target applications actually want to do with their data. Do they filter it, and process only a subset of messages? Do they log the data somewhere for later reuse? Do they distribute the data further to workers? There are dozens of plausible scenarios, and each will have its own ideas about what reliability means and how much it's worth in terms of effort and performance.

So we'll build an abstraction that we can implement once, and then reuse for many applications. This abstraction is a shared key-value cache, which stores a set of blobs indexed by unique keys.

Don't confuse this with distributed hash tables, which solve the wider problem of connecting peers in a distributed network, or with distributed key-value tables, which act like non-SQL databases. All we will build is a system that reliably clones some in-memory state from a server to a set of clients. We want to:

  • Let a client join the network at any time, and reliably get the current server state.
  • Let any client update the key-value cache (inserting new key-value pairs, updating existing ones, or deleting them).
  • Reliably propagate changes to all clients, and do this with minimum latency overhead.
  • Handle very large numbers of clients, e.g. tens of thousands or more.

The key aspect of the Clone pattern is that clients talk back to servers, which is more than we do in a simple pub-sub dialog. This is why I use the terms 'server' and 'client' instead of 'publisher' and 'subscriber'. We'll use pub-sub as the core of Clone but it is a bit more than that.

Distributing Key-Value Updates

topprevnext

We'll develop Clone in stages, solving one problem at a time. First, let's look at how to distribute key-value updates from a server to a set of clients. We'll take our weather server from Chapter One and refactor it to send messages as key-value pairs. We'll modify our client to store these in a hash table.

Figure 64 - Simplest Clone Model

fig64.png

This is the server:

// Clone server Model One

#include "kvsimple.c"

int main (void)
{
// Prepare our context and publisher socket
zctx_t *ctx = zctx_new ();
void *publisher = zsocket_new (ctx, ZMQ_PUB);
zsocket_bind (publisher, "tcp://*:5556");
zclock_sleep (200);

zhash_t *kvmap = zhash_new ();
int64_t sequence = 0;
srandom ((unsigned) time (NULL));

while (!zctx_interrupted) {
// Distribute as key-value message
kvmsg_t *kvmsg = kvmsg_new (++sequence);
kvmsg_fmt_key (kvmsg, "%d", randof (10000));
kvmsg_fmt_body (kvmsg, "%d", randof (1000000));
kvmsg_send (kvmsg, publisher);
kvmsg_store (&kvmsg, kvmap);
}
printf (" Interrupted\n%d messages out\n", (int) sequence);
zhash_destroy (&kvmap);
zctx_destroy (&ctx);
return 0;
}

clonesrv1.c: Clone server, Model One

And here is the client:

// Clone client Model One

#include "kvsimple.c"

int main (void)
{
// Prepare our context and updates socket
zctx_t *ctx = zctx_new ();
void *updates = zsocket_new (ctx, ZMQ_SUB);
zsocket_set_subscribe (updates, "");
zsocket_connect (updates, "tcp://localhost:5556");

zhash_t *kvmap = zhash_new ();
int64_t sequence = 0;

while (true) {
kvmsg_t *kvmsg = kvmsg_recv (updates);
if (!kvmsg)
break; // Interrupted
kvmsg_store (&kvmsg, kvmap);
sequence++;
}
printf (" Interrupted\n%d messages in\n", (int) sequence);
zhash_destroy (&kvmap);
zctx_destroy (&ctx);
return 0;
}

clonecli1.c: Clone client, Model One

Some notes about this code:

  • All the hard work is done in a kvmsg class. This class works with key-value message objects, which are multipart ØMQ messages structured as three frames: a key (a ØMQ string), a sequence number (64-bit value, in network byte order), and a binary body (holds everything else).
  • The server generates messages with a randomized 4-digit key, which lets us simulate a large but not enormous hash table (10K entries).
  • The server does a 200 millisecond pause after binding its socket. This is to prevent "slow joiner syndrome" where the subscriber loses messages as it connects to the server's socket. We'll remove that in later models.
  • We'll use the terms 'publisher' and 'subscriber' in the code to refer to sockets. This will help later when we have multiple sockets doing different things.

Here is the kvmsg class, in the simplest form that works for now:

// kvsimple class - key-value message class for example applications

#include "kvsimple.h"
#include "zlist.h"

// Keys are short strings
#define KVMSG_KEY_MAX 255

// Message is formatted on wire as 3 frames:
// frame 0: key (0MQ string)
// frame 1: sequence (8 bytes, network order)
// frame 2: body (blob)
#define FRAME_KEY 0
#define FRAME_SEQ 1
#define FRAME_BODY 2
#define KVMSG_FRAMES 3

// The kvmsg class holds a single key-value message consisting of a
// list of 0 or more frames:

struct _kvmsg {
// Presence indicators for each frame
int present [KVMSG_FRAMES];
// Corresponding 0MQ message frames, if any
zmq_msg_t frame [KVMSG_FRAMES];
// Key, copied into safe C string
char key [KVMSG_KEY_MAX + 1];
};

// Here are the constructor and destructor for the class:

// Constructor, takes a sequence number for the new kvmsg instance:
kvmsg_t *
kvmsg_new (int64_t sequence)
{
kvmsg_t
*self;

self = (kvmsg_t *) zmalloc (sizeof (kvmsg_t));
kvmsg_set_sequence (self, sequence);
return self;
}

// zhash_free_fn callback helper that does the low level destruction:
void
kvmsg_free (void *ptr)
{
if (ptr) {
kvmsg_t *self = (kvmsg_t *) ptr;
// Destroy message frames if any
int frame_nbr;
for (frame_nbr = 0; frame_nbr < KVMSG_FRAMES; frame_nbr++)
if (self->present [frame_nbr])
zmq_msg_close (&self->frame [frame_nbr]);

// Free object itself
free (self);
}
}

// Destructor
void
kvmsg_destroy (kvmsg_t **self_p)
{
assert (self_p);
if (*self_p) {
kvmsg_free (*self_p);
*self_p = NULL;
}
}

// This method reads a key-value message from socket, and returns a new
// kvmsg instance:

kvmsg_t *
kvmsg_recv (void *socket)
{
assert (socket);
kvmsg_t *self = kvmsg_new (0);

// Read all frames off the wire, reject if bogus
int frame_nbr;
for (frame_nbr = 0; frame_nbr < KVMSG_FRAMES; frame_nbr++) {
if (self->present [frame_nbr])
zmq_msg_close (&self->frame [frame_nbr]);
zmq_msg_init (&self->frame [frame_nbr]);
self->present [frame_nbr] = 1;
if (zmq_msg_recv (&self->frame [frame_nbr], socket, 0) == -1) {
kvmsg_destroy (&self);
break;
}
// Verify multipart framing
int rcvmore = (frame_nbr < KVMSG_FRAMES - 1)? 1: 0;
if (zsocket_rcvmore (socket) != rcvmore) {
kvmsg_destroy (&self);
break;
}
}
return self;
}

// This method sends a multiframe key-value message to a socket:

void
kvmsg_send (kvmsg_t *self, void *socket)
{
assert (self);
assert (socket);

int frame_nbr;
for (frame_nbr = 0; frame_nbr < KVMSG_FRAMES; frame_nbr++) {
zmq_msg_t copy;
zmq_msg_init (&copy);
if (self->present [frame_nbr])
zmq_msg_copy (&copy, &self->frame [frame_nbr]);
zmq_msg_send (&copy, socket,
(frame_nbr < KVMSG_FRAMES - 1)? ZMQ_SNDMORE: 0);
zmq_msg_close (&copy);
}
}

// These methods let the caller get and set the message key, as a
// fixed string and as a printf formatted string:

char *
kvmsg_key (kvmsg_t *self)
{
assert (self);
if (self->present [FRAME_KEY]) {
if (!*self->key) {
size_t size = zmq_msg_size (&self->frame [FRAME_KEY]);
if (size > KVMSG_KEY_MAX)
size = KVMSG_KEY_MAX;
memcpy (self->key,
zmq_msg_data (&self->frame [FRAME_KEY]), size);
self->key [size] = 0;
}
return self->key;
}
else
return NULL;
}

void
kvmsg_set_key (kvmsg_t *self, char *key)
{
assert (self);
zmq_msg_t *msg = &self->frame [FRAME_KEY];
if (self->present [FRAME_KEY])
zmq_msg_close (msg);
zmq_msg_init_size (msg, strlen (key));
memcpy (zmq_msg_data (msg), key, strlen (key));
self->present [FRAME_KEY] = 1;
}

void
kvmsg_fmt_key (kvmsg_t *self, char *format, …)
{
char value [KVMSG_KEY_MAX + 1];
va_list args;

assert (self);
va_start (args, format);
vsnprintf (value, KVMSG_KEY_MAX, format, args);
va_end (args);
kvmsg_set_key (self, value);
}

// These two methods let the caller get and set the message sequence number:

int64_t
kvmsg_sequence (kvmsg_t *self)
{
assert (self);
if (self->present [FRAME_SEQ]) {
assert (zmq_msg_size (&self->frame [FRAME_SEQ]) == 8);
byte *source = zmq_msg_data (&self->frame [FRAME_SEQ]);
int64_t sequence = ((int64_t) (source [0]) << 56)
+ ((int64_t) (source [1]) << 48)
+ ((int64_t) (source [2]) << 40)
+ ((int64_t) (source [3]) << 32)
+ ((int64_t) (source [4]) << 24)
+ ((int64_t) (source [5]) << 16)
+ ((int64_t) (source [6]) << 8)
+ (int64_t) (source [7]);
return sequence;
}
else
return 0;
}

void
kvmsg_set_sequence (kvmsg_t *self, int64_t sequence)
{
assert (self);
zmq_msg_t *msg = &self->frame [FRAME_SEQ];
if (self->present [FRAME_SEQ])
zmq_msg_close (msg);
zmq_msg_init_size (msg, 8);

byte *source = zmq_msg_data (msg);
source [0] = (byte) ((sequence >> 56) & 255);
source [1] = (byte) ((sequence >> 48) & 255);
source [2] = (byte) ((sequence >> 40) & 255);
source [3] = (byte) ((sequence >> 32) & 255);
source [4] = (byte) ((sequence >> 24) & 255);
source [5] = (byte) ((sequence >> 16) & 255);
source [6] = (byte) ((sequence >> 8) & 255);
source [7] = (byte) ((sequence) & 255);

self->present [FRAME_SEQ] = 1;
}

// These methods let the caller get and set the message body as a
// fixed string and as a printf formatted string:

byte *
kvmsg_body (kvmsg_t *self)
{
assert (self);
if (self->present [FRAME_BODY])
return (byte *) zmq_msg_data (&self->frame [FRAME_BODY]);
else
return NULL;
}

void
kvmsg_set_body (kvmsg_t *self, byte *body, size_t size)
{
assert (self);
zmq_msg_t *msg = &self->frame [FRAME_BODY];
if (self->present [FRAME_BODY])
zmq_msg_close (msg);
self->present [FRAME_BODY] = 1;
zmq_msg_init_size (msg, size);
memcpy (zmq_msg_data (msg), body, size);
}

void
kvmsg_fmt_body (kvmsg_t *self, char *format, …)
{
char value [255 + 1];
va_list args;

assert (self);
va_start (args, format);
vsnprintf (value, 255, format, args);
va_end (args);
kvmsg_set_body (self, (byte *) value, strlen (value));
}

// This method returns the body size of the most recently read message,
// if any exists:

size_t
kvmsg_size (kvmsg_t *self)
{
assert (self);
if (self->present [FRAME_BODY])
return zmq_msg_size (&self->frame [FRAME_BODY]);
else
return 0;
}

// This method stores the key-value message into a hash map, unless
// the key and value are both null. It nullifies the kvmsg reference
// so that the object is owned by the hash map, not the caller:

void
kvmsg_store (kvmsg_t **self_p, zhash_t *hash)
{
assert (self_p);
if (*self_p) {
kvmsg_t *self = *self_p;
assert (self);
if (self->present [FRAME_KEY]
&& self->present [FRAME_BODY]) {
zhash_update (hash, kvmsg_key (self), self);
zhash_freefn (hash, kvmsg_key (self), kvmsg_free);
}
*self_p = NULL;
}
}

// This method prints the key-value message to stderr for
// debugging and tracing:

void
kvmsg_dump (kvmsg_t *self)
{
if (self) {
if (!self) {
fprintf (stderr, "NULL");
return;
}
size_t size = kvmsg_size (self);
byte *body = kvmsg_body (self);
fprintf (stderr, "[seq:%" PRId64 "]", kvmsg_sequence (self));
fprintf (stderr, "[key:%s]", kvmsg_key (self));
fprintf (stderr, "[size:%zd] ", size);
int char_nbr;
for (char_nbr = 0; char_nbr < size; char_nbr++)
fprintf (stderr, "%02X", body [char_nbr]);
fprintf (stderr, "\n");
}
else
fprintf (stderr, "NULL message\n");
}

// It's good practice to have a self-test method that tests the class; this
// also shows how it's used in applications:

int
kvmsg_test (int verbose)
{
kvmsg_t
*kvmsg;

printf (" * kvmsg: ");

// Prepare our context and sockets
zctx_t *ctx = zctx_new ();
void *output = zsocket_new (ctx, ZMQ_DEALER);
int rc = zmq_bind (output, "ipc://kvmsg_selftest.ipc");
assert (rc == 0);
void *input = zsocket_new (ctx, ZMQ_DEALER);
rc = zmq_connect (input, "ipc://kvmsg_selftest.ipc");
assert (rc == 0);

zhash_t *kvmap = zhash_new ();

// Test send and receive of simple message
kvmsg = kvmsg_new (1);
kvmsg_set_key (kvmsg, "key");
kvmsg_set_body (kvmsg, (byte *) "body", 4);
if (verbose)
kvmsg_dump (kvmsg);
kvmsg_send (kvmsg, output);
kvmsg_store (&kvmsg, kvmap);

kvmsg = kvmsg_recv (input);
if (verbose)
kvmsg_dump (kvmsg);
assert (streq (kvmsg_key (kvmsg), "key"));
kvmsg_store (&kvmsg, kvmap);

// Shutdown and destroy all objects
zhash_destroy (&kvmap);
zctx_destroy (&ctx);

printf ("OK\n");
return 0;
}

kvsimple.c: Key-value message class

We'll make a more sophisticated kvmsg class later, for using in real applications.

Both the server and client maintain hash tables, but this first model only works properly if we start all clients before the server, and the clients never crash. That's not 'reliability'.

Getting a Snapshot

topprevnext

In order to allow a late (or recovering) client to catch up with a server it has to get a snapshot of the server's state. Just as we've reduced "message" to mean "a sequenced key-value pair", we can reduce "state" to mean "a hash table". To get the server state, a client opens a REQ socket and asks for it explicitly.

Figure 65 - State Replication

fig65.png

To make this work, we have to solve the timing problem. Getting a state snapshot will take a certain time, possibly fairly long if the snapshot is large. We need to correctly apply updates to the snapshot. But the server won't know when to start sending us updates. One way would be to start subscribing, get a first update, and then ask for "state for update N". This would require the server storing one snapshot for each update, which isn't practical.

So we will do the synchronization in the client, as follows:

  • The client first subscribes to updates and then makes a state request. This guarantees that the state is going to be newer than the oldest update it has.
  • The client waits for the server to reply with state, and meanwhile queues all updates. It does this simply by not reading them: ØMQ keeps them queued on the socket queue, since we don't set a HWM.
  • When the client receives its state update, it begins once again to read updates. However it discards any updates that are older than the state update. So if the state update includes updates up to 200, the client will discard updates up to 201.
  • The client then applies updates to its own state snapshot.

It's a simple model that exploits ØMQ's own internal queues. Here's the server:

// Clone server - Model Two

// Lets us build this source without creating a library
#include "kvsimple.c"

static int s_send_single (const char *key, void *data, void *args);
static void state_manager (void *args, zctx_t *ctx, void *pipe);

int main (void)
{
// Prepare our context and sockets
zctx_t *ctx = zctx_new ();
void *publisher = zsocket_new (ctx, ZMQ_PUB);
zsocket_bind (publisher, "tcp://*:5557");

int64_t sequence = 0;
srandom ((unsigned) time (NULL));

// Start state manager and wait for synchronization signal
void *updates = zthread_fork (ctx, state_manager, NULL);
free (zstr_recv (updates));

while (!zctx_interrupted) {
// Distribute as key-value message
kvmsg_t *kvmsg = kvmsg_new (++sequence);
kvmsg_fmt_key (kvmsg, "%d", randof (10000));
kvmsg_fmt_body (kvmsg, "%d", randof (1000000));
kvmsg_send (kvmsg, publisher);
kvmsg_send (kvmsg, updates);
kvmsg_destroy (&kvmsg);
}
printf (" Interrupted\n%d messages out\n", (int) sequence);
zctx_destroy (&ctx);
return 0;
}

// Routing information for a key-value snapshot
typedef struct {
void *socket; // ROUTER socket to send to
zframe_t *identity; // Identity of peer who requested state
} kvroute_t;

// Send one state snapshot key-value pair to a socket
// Hash item data is our kvmsg object, ready to send

static int
s_send_single (const char *key, void *data, void *args)
{
kvroute_t *kvroute = (kvroute_t *) args;
// Send identity of recipient first
zframe_send (&kvroute->identity,
kvroute->socket, ZFRAME_MORE + ZFRAME_REUSE);
kvmsg_t *kvmsg = (kvmsg_t *) data;
kvmsg_send (kvmsg, kvroute->socket);
return 0;
}

// The state manager task maintains the state and handles requests from
// clients for snapshots:

static void
state_manager (void *args, zctx_t *ctx, void *pipe)
{
zhash_t *kvmap = zhash_new ();

zstr_send (pipe, "READY");
void *snapshot = zsocket_new (ctx, ZMQ_ROUTER);
zsocket_bind (snapshot, "tcp://*:5556");

zmq_pollitem_t items [] = {
{ pipe, 0, ZMQ_POLLIN, 0 },
{ snapshot, 0, ZMQ_POLLIN, 0 }
};
int64_t sequence = 0; // Current snapshot version number
while (!zctx_interrupted) {
int rc = zmq_poll (items, 2, -1);
if (rc == -1 && errno == ETERM)
break; // Context has been shut down

// Apply state update from main thread
if (items [0].revents & ZMQ_POLLIN) {
kvmsg_t *kvmsg = kvmsg_recv (pipe);
if (!kvmsg)
break; // Interrupted
sequence = kvmsg_sequence (kvmsg);
kvmsg_store (&kvmsg, kvmap);
}
// Execute state snapshot request
if (items [1].revents & ZMQ_POLLIN) {
zframe_t *identity = zframe_recv (snapshot);
if (!identity)
break; // Interrupted

// Request is in second frame of message
char *request = zstr_recv (snapshot);
if (streq (request, "ICANHAZ?"))
free (request);
else {
printf ("E: bad request, aborting\n");
break;
}
// Send state snapshot to client
kvroute_t routing = { snapshot, identity };

// For each entry in kvmap, send kvmsg to client
zhash_foreach (kvmap, s_send_single, &routing);

// Now send END message with sequence number
printf ("Sending state shapshot=%d\n", (int) sequence);
zframe_send (&identity, snapshot, ZFRAME_MORE);
kvmsg_t *kvmsg = kvmsg_new (sequence);
kvmsg_set_key (kvmsg, "KTHXBAI");
kvmsg_set_body (kvmsg, (byte *) "", 0);
kvmsg_send (kvmsg, snapshot);
kvmsg_destroy (&kvmsg);
}
}
zhash_destroy (&kvmap);
}

clonesrv2.c: Clone server, Model Two

And here is the client:

// Clone client - Model Two

// Lets us build this source without creating a library
#include "kvsimple.c"

int main (void)
{
// Prepare our context and subscriber
zctx_t *ctx = zctx_new ();
void *snapshot = zsocket_new (ctx, ZMQ_DEALER);
zsocket_connect (snapshot, "tcp://localhost:5556");
void *subscriber = zsocket_new (ctx, ZMQ_SUB);
zsocket_set_subscribe (subscriber, "");
zsocket_connect (subscriber, "tcp://localhost:5557");

zhash_t *kvmap = zhash_new ();

// Get state snapshot
int64_t sequence = 0;
zstr_send (snapshot, "ICANHAZ?");
while (true) {
kvmsg_t *kvmsg = kvmsg_recv (snapshot);
if (!kvmsg)
break; // Interrupted
if (streq (kvmsg_key (kvmsg), "KTHXBAI")) {
sequence = kvmsg_sequence (kvmsg);
printf ("Received snapshot=%d\n", (int) sequence);
kvmsg_destroy (&kvmsg);
break; // Done
}
kvmsg_store (&kvmsg, kvmap);
}
// Now apply pending updates, discard out-of-sequence messages
while (!zctx_interrupted) {
kvmsg_t *kvmsg = kvmsg_recv (subscriber);
if (!kvmsg)
break; // Interrupted
if (kvmsg_sequence (kvmsg) > sequence) {
sequence = kvmsg_sequence (kvmsg);
kvmsg_store (&kvmsg, kvmap);
}
else
kvmsg_destroy (&kvmsg);
}
zhash_destroy (&kvmap);
zctx_destroy (&ctx);
return 0;
}

clonecli2.c: Clone client, Model Two

Some notes about this code:

  • The server uses two threads, for simpler design. One thread produces random updates, and the second thread handles state. The two communicate across PAIR sockets. You might like to use SUB sockets but you'd hit the "slow joiner" problem where the subscriber would randomly miss some messages while connecting. PAIR sockets let us explicitly synchronize the two threads.
  • We set a HWM on the updates socket pair, since hash table insertions are relatively slow. Without this, the server runs out of memory. On inproc connections, the real HWM is the sum of the HWM of both sockets, so we set the HWM on each socket.
  • The client is really simple. In C, under 60 lines of code. A lot of the heavy lifting is done in the kvmsg class, but still, the basic Clone pattern is easier to implement than it seemed at first.
  • We don't use anything fancy for serializing the state. The hash table holds a set of kvmsg objects, and the server sends these, as a batch of messages, to the client requesting state. If multiple clients request state at once, each will get a different snapshot.
  • We assume that the client has exactly one server to talk to. The server must be running; we do not try to solve the question of what happens if the server crashes.

Right now, these two programs don't do anything real, but they correctly synchronize state. It's a neat example of how to mix different patterns: PAIR-over-inproc, PUB-SUB, and ROUTER-DEALER.

Republishing Updates

topprevnext

In our second model, changes to the key-value cache came from the server itself. This is a centralized model, useful for example if we have a central configuration file we want to distribute, with local caching on each node. A more interesting model takes updates from clients, not the server. The server thus becomes a stateless broker. This gives us some benefits:

  • We're less worried about the reliability of the server. If it crashes, we can start a new instance, and feed it new values.
  • We can use the key-value cache to share knowledge between dynamic peers.

Updates from clients go via a PUSH-PULL socket flow from client to server.

Figure 66 - Republishing Updates

fig66.png

Why don't we allow clients to publish updates directly to other clients? While this would reduce latency, it makes it impossible to assign ascending unique sequence numbers to messages. The server can do this. There's a more subtle second reason. In many applications it's important that updates have a single order, across many clients. Forcing all updates through the server ensures that they have the same order when they finally get to clients.

With unique sequencing, clients can detect the nastier failures - network congestion and queue overflow. If a client discovers that its incoming message stream has a hole, it can take action. It seems sensible that the client contact the server and ask for the missing messages, but in practice that isn't useful. If there are holes, they're caused by network stress, and adding more stress to the network will make things worse. All the client can really do is warn its users "Unable to continue", and stop, and not restart until someone has manually checked the cause of the problem.

We'll now generate state updates in the client. Here's the server:

// Clone server - Model Three

// Lets us build this source without creating a library
#include "kvsimple.c"

// Routing information for a key-value snapshot
typedef struct {
void *socket; // ROUTER socket to send to
zframe_t *identity; // Identity of peer who requested state
} kvroute_t;

// Send one state snapshot key-value pair to a socket
// Hash item data is our kvmsg object, ready to send

static int
s_send_single (const char *key, void *data, void *args)
{
kvroute_t *kvroute = (kvroute_t *) args;
// Send identity of recipient first
zframe_send (&kvroute->identity,
kvroute->socket, ZFRAME_MORE + ZFRAME_REUSE);
kvmsg_t *kvmsg = (kvmsg_t *) data;
kvmsg_send (kvmsg, kvroute->socket);
return 0;
}

int main (void)
{
// Prepare our context and sockets
zctx_t *ctx = zctx_new ();
void *snapshot = zsocket_new (ctx, ZMQ_ROUTER);
zsocket_bind (snapshot, "tcp://*:5556");
void *publisher = zsocket_new (ctx, ZMQ_PUB);
zsocket_bind (publisher, "tcp://*:5557");
void *collector = zsocket_new (ctx, ZMQ_PULL);
zsocket_bind (collector, "tcp://*:5558");

// The body of the main task collects updates from clients and
// publishes them back out to clients:

int64_t sequence = 0;
zhash_t *kvmap = zhash_new ();

zmq_pollitem_t items [] = {
{ collector, 0, ZMQ_POLLIN, 0 },
{ snapshot, 0, ZMQ_POLLIN, 0 }
};
while (!zctx_interrupted) {
int rc = zmq_poll (items, 2, 1000 * ZMQ_POLL_MSEC);

// Apply state update sent from client
if (items [0].revents & ZMQ_POLLIN) {
kvmsg_t *kvmsg = kvmsg_recv (collector);
if (!kvmsg)
break; // Interrupted
kvmsg_set_sequence (kvmsg, ++sequence);
kvmsg_send (kvmsg, publisher);
kvmsg_store (&kvmsg, kvmap);
printf ("I: publishing update %5d\n", (int) sequence);
}
// Execute state snapshot request
if (items [1].revents & ZMQ_POLLIN) {
zframe_t *identity = zframe_recv (snapshot);
if (!identity)
break; // Interrupted

// Request is in second frame of message
char *request = zstr_recv (snapshot);
if (streq (request, "ICANHAZ?"))
free (request);
else {
printf ("E: bad request, aborting\n");
break;
}
// Send state snapshot to client
kvroute_t routing = { snapshot, identity };

// For each entry in kvmap, send kvmsg to client
zhash_foreach (kvmap, s_send_single, &routing);

// Now send END message with sequence number
printf ("I: sending shapshot=%d\n", (int) sequence);
zframe_send (&identity, snapshot, ZFRAME_MORE);
kvmsg_t *kvmsg = kvmsg_new (sequence);
kvmsg_set_key (kvmsg, "KTHXBAI");
kvmsg_set_body (kvmsg, (byte *) "", 0);
kvmsg_send (kvmsg, snapshot);
kvmsg_destroy (&kvmsg);
}
}
printf (" Interrupted\n%d messages handled\n", (int) sequence);
zhash_destroy (&kvmap);
zctx_destroy (&ctx);

return 0;
}

clonesrv3.c: Clone server, Model Three

And here is the client:

// Clone client - Model Three

// Lets us build this source without creating a library
#include "kvsimple.c"

int main (void)
{
// Prepare our context and subscriber
zctx_t *ctx = zctx_new ();
void *snapshot = zsocket_new (ctx, ZMQ_DEALER);
zsocket_connect (snapshot, "tcp://localhost:5556");
void *subscriber = zsocket_new (ctx, ZMQ_SUB);
zsocket_set_subscribe (subscriber, "");
zsocket_connect (subscriber, "tcp://localhost:5557");
void *publisher = zsocket_new (ctx, ZMQ_PUSH);
zsocket_connect (publisher, "tcp://localhost:5558");

zhash_t *kvmap = zhash_new ();
srandom ((unsigned) time (NULL));

// We first request a state snapshot:
int64_t sequence = 0;
zstr_send (snapshot, "ICANHAZ?");
while (true) {
kvmsg_t *kvmsg = kvmsg_recv (snapshot);
if (!kvmsg)
break; // Interrupted
if (streq (kvmsg_key (kvmsg), "KTHXBAI")) {
sequence = kvmsg_sequence (kvmsg);
printf ("I: received snapshot=%d\n", (int) sequence);
kvmsg_destroy (&kvmsg);
break; // Done
}
kvmsg_store (&kvmsg, kvmap);
}
// Now we wait for updates from the server and every so often, we
// send a random key-value update to the server:

int64_t alarm = zclock_time () + 1000;
while (!zctx_interrupted) {
zmq_pollitem_t items [] = { { subscriber, 0, ZMQ_POLLIN, 0 } };
int tickless = (int) ((alarm - zclock_time ()));
if (tickless < 0)
tickless = 0;
int rc = zmq_poll (items, 1, tickless * ZMQ_POLL_MSEC);
if (rc == -1)
break; // Context has been shut down

if (items [0].revents & ZMQ_POLLIN) {
kvmsg_t *kvmsg = kvmsg_recv (subscriber);
if (!kvmsg)
break; // Interrupted

// Discard out-of-sequence kvmsgs, incl. heartbeats
if (kvmsg_sequence (kvmsg) > sequence) {
sequence = kvmsg_sequence (kvmsg);
kvmsg_store (&kvmsg, kvmap);
printf ("I: received update=%d\n", (int) sequence);
}
else
kvmsg_destroy (&kvmsg);
}
// If we timed out, generate a random kvmsg
if (zclock_time () >= alarm) {
kvmsg_t *kvmsg = kvmsg_new (0);
kvmsg_fmt_key (kvmsg, "%d", randof (10000));
kvmsg_fmt_body (kvmsg, "%d", randof (1000000));
kvmsg_send (kvmsg, publisher);
kvmsg_destroy (&kvmsg);
alarm = zclock_time () + 1000;
}
}
printf (" Interrupted\n%d messages in\n", (int) sequence);
zhash_destroy (&kvmap);
zctx_destroy (&ctx);
return 0;
}

clonecli3.c: Clone client, Model Three

Some notes about this code:

  • The server has collapsed to a single task. It manages a PULL socket for incoming updates, a ROUTER socket for state requests, and a PUB socket for outgoing updates.
  • The client uses a simple tickless timer to send a random update to the server once a second. In a real implementation we would drive updates from application code.

Clone Subtrees

topprevnext

A realistic key-value cache will get large, and clients will usually be interested only in parts of the cache. Working with a subtree is fairly simple. The client has to tell the server the subtree when it makes a state request, and it has to specify the same subtree when it subscribes to updates.

There are a couple of common syntaxes for trees. One is the "path hierarchy", and another is the "topic tree". These look like:

  • Path hierarchy: "/some/list/of/paths"
  • Topic tree: "some.list.of.topics"

We'll use the path hierarchy, and extend our client and server so that a client can work with a single subtree. Working with multiple subtrees is not much more difficult, we won't do that here but it's a trivial extension.

Here's the server, a small variation on Model Three:

// Clone server - Model Four

// Lets us build this source without creating a library
#include "kvsimple.c"

// Routing information for a key-value snapshot
typedef struct {
void *socket; // ROUTER socket to send to
zframe_t *identity; // Identity of peer who requested state
char *subtree; // Client subtree specification
} kvroute_t;

// Send one state snapshot key-value pair to a socket
// Hash item data is our kvmsg object, ready to send

static int
s_send_single (const char *key, void *data, void *args)
{
kvroute_t *kvroute = (kvroute_t *) args;
kvmsg_t *kvmsg = (kvmsg_t *) data;
if (strlen (kvroute->subtree) <= strlen (kvmsg_key (kvmsg))
&& memcmp (kvroute->subtree,
kvmsg_key (kvmsg), strlen (kvroute->subtree)) == 0) {
// Send identity of recipient first
zframe_send (&kvroute->identity,
kvroute->socket, ZFRAME_MORE + ZFRAME_REUSE);
kvmsg_send (kvmsg, kvroute->socket);
}
return 0;
}

// The main task is identical to clonesrv3 except for where it
// handles subtrees.

int main (void)
{
// Prepare our context and sockets
zctx_t *ctx = zctx_new ();
void *snapshot = zsocket_new (ctx, ZMQ_ROUTER);
zsocket_bind (snapshot, "tcp://*:5556");
void *publisher = zsocket_new (ctx, ZMQ_PUB);
zsocket_bind (publisher, "tcp://*:5557");
void *collector = zsocket_new (ctx, ZMQ_PULL);
zsocket_bind (collector, "tcp://*:5558");

int64_t sequence = 0;
zhash_t *kvmap = zhash_new ();

zmq_pollitem_t items [] = {
{ collector, 0, ZMQ_POLLIN, 0 },
{ snapshot, 0, ZMQ_POLLIN, 0 }
};
while (!zctx_interrupted) {
int rc = zmq_poll (items, 2, 1000 * ZMQ_POLL_MSEC);

// Apply state update sent from client
if (items [0].revents & ZMQ_POLLIN) {
kvmsg_t *kvmsg = kvmsg_recv (collector);
if (!kvmsg)
break; // Interrupted
kvmsg_set_sequence (kvmsg, ++sequence);
kvmsg_send (kvmsg, publisher);
kvmsg_store (&kvmsg, kvmap);
printf ("I: publishing update %5d\n", (int) sequence);
}
// Execute state snapshot request
if (items [1].revents & ZMQ_POLLIN) {
zframe_t *identity = zframe_recv (snapshot);
if (!identity)
break; // Interrupted

// Request is in second frame of message
char *request = zstr_recv (snapshot);
char *subtree = NULL;
if (streq (request, "ICANHAZ?")) {
free (request);
subtree = zstr_recv (snapshot);
}
else {
printf ("E: bad request, aborting\n");
break;
}
// Send state snapshot to client
kvroute_t routing = { snapshot, identity, subtree };

// For each entry in kvmap, send kvmsg to client
zhash_foreach (kvmap, s_send_single, &routing);

// Now send END message with sequence number
printf ("I: sending shapshot=%d\n", (int) sequence);
zframe_send (&identity, snapshot, ZFRAME_MORE);
kvmsg_t *kvmsg = kvmsg_new (sequence);
kvmsg_set_key (kvmsg, "KTHXBAI");
kvmsg_set_body (kvmsg, (byte *) subtree, 0);
kvmsg_send (kvmsg, snapshot);
kvmsg_destroy (&kvmsg);
free (subtree);
}
}
printf (" Interrupted\n%d messages handled\n", (int) sequence);
zhash_destroy (&kvmap);
zctx_destroy (&ctx);

return 0;
}

clonesrv4.c: Clone server, Model Four

And here is the client:

// Clone client - Model Four

// Lets us build this source without creating a library
#include "kvsimple.c"

// This client is identical to clonecli3 except for where we
// handles subtrees.
#define SUBTREE "/client/"

int main (void)
{
// Prepare our context and subscriber
zctx_t *ctx = zctx_new ();
void *snapshot = zsocket_new (ctx, ZMQ_DEALER);
zsocket_connect (snapshot, "tcp://localhost:5556");
void *subscriber = zsocket_new (ctx, ZMQ_SUB);
zsocket_set_subscribe (subscriber, "");
zsocket_connect (subscriber, "tcp://localhost:5557");
zsocket_set_subscribe (subscriber, SUBTREE);
void *publisher = zsocket_new (ctx, ZMQ_PUSH);
zsocket_connect (publisher, "tcp://localhost:5558");

zhash_t *kvmap = zhash_new ();
srandom ((unsigned) time (NULL));

// We first request a state snapshot:
int64_t sequence = 0;
zstr_sendm (snapshot, "ICANHAZ?");
zstr_send (snapshot, SUBTREE);
while (true) {
kvmsg_t *kvmsg = kvmsg_recv (snapshot);
if (!kvmsg)
break; // Interrupted
if (streq (kvmsg_key (kvmsg), "KTHXBAI")) {
sequence = kvmsg_sequence (kvmsg);
printf ("I: received snapshot=%d\n", (int) sequence);
kvmsg_destroy (&kvmsg);
break; // Done
}
kvmsg_store (&kvmsg, kvmap);
}
int64_t alarm = zclock_time () + 1000;
while (!zctx_interrupted) {
zmq_pollitem_t items [] = { { subscriber, 0, ZMQ_POLLIN, 0 } };
int tickless = (int) ((alarm - zclock_time ()));
if (tickless < 0)
tickless = 0;
int rc = zmq_poll (items, 1, tickless * ZMQ_POLL_MSEC);
if (rc == -1)
break; // Context has been shut down

if (items [0].revents & ZMQ_POLLIN) {
kvmsg_t *kvmsg = kvmsg_recv (subscriber);
if (!kvmsg)
break; // Interrupted

// Discard out-of-sequence kvmsgs, incl. heartbeats
if (kvmsg_sequence (kvmsg) > sequence) {
sequence = kvmsg_sequence (kvmsg);
kvmsg_store (&kvmsg, kvmap);
printf ("I: received update=%d\n", (int) sequence);
}
else
kvmsg_destroy (&kvmsg);
}
// If we timed out, generate a random kvmsg
if (zclock_time () >= alarm) {
kvmsg_t *kvmsg = kvmsg_new (0);
kvmsg_fmt_key (kvmsg, "%s%d", SUBTREE, randof (10000));
kvmsg_fmt_body (kvmsg, "%d", randof (1000000));
kvmsg_send (kvmsg, publisher);
kvmsg_destroy (&kvmsg);
alarm = zclock_time () + 1000;
}
}
printf (" Interrupted\n%d messages in\n", (int) sequence);
zhash_destroy (&kvmap);
zctx_destroy (&ctx);
return 0;
}

clonecli4.c: Clone client, Model Four

Ephemeral Values

topprevnext

An ephemeral value is one that expires dynamically. If you think of Clone being used for a DNS-like service, then ephemeral values would let you do dynamic DNS. A node joins the network, publishes its address, and refreshes this regularly. If the node dies, its address eventually gets removed.

The usual abstraction for ephemeral values is to attach them to a "session", and delete them when the session ends. In Clone, sessions would be defined by clients, and would end if the client died.

The simpler alternative to using sessions is to define every ephemeral value with a "time to live" that tells the server when to expire the value. Clients then refresh values, and if they don't, the values expire.

I'm going to implement that simpler model because we don't know yet that it's worth making a more complex one. The difference is really in performance. If clients have a handful of ephemeral values, it's fine to set a TTL on each one. If clients use masses of ephemeral values, it's more efficient to attach them to sessions, and expire them in bulk.

First off, we need a way to encode the TTL in the key-value message. We could add a frame. The problem with using frames for properties is that each time we want to add a new property, we have to change the structure of our kvmsg class. It breaks compatibility. So let's add a 'properties' frame to the message, and code to let us get and put property values.

Next, we need a way to say, "delete this value". Up to now servers and clients have always blindly inserted or updated new values into their hash table. We'll say that if the value is empty, that means "delete this key".

Here's a more complete version of the kvmsg class, which implements a 'properties' frame (and adds a UUID frame, which we'll need later on). It also handles empty values by deleting the key from the hash, if necessary:

// kvmsg class - key-value message class for example applications

#include "kvmsg.h"
#include <uuid/uuid.h>
#include "zlist.h"

// Keys are short strings
#define KVMSG_KEY_MAX 255

// Message is formatted on wire as 5 frames:
// frame 0: key (0MQ string)
// frame 1: sequence (8 bytes, network order)
// frame 2: uuid (blob, 16 bytes)
// frame 3: properties (0MQ string)
// frame 4: body (blob)
#define FRAME_KEY 0
#define FRAME_SEQ 1
#define FRAME_UUID 2
#define FRAME_PROPS 3
#define FRAME_BODY 4
#define KVMSG_FRAMES 5

// Structure of our class
struct _kvmsg {
// Presence indicators for each frame
int present [KVMSG_FRAMES];
// Corresponding 0MQ message frames, if any
zmq_msg_t frame [KVMSG_FRAMES];
// Key, copied into safe C string
char key [KVMSG_KEY_MAX + 1];
// List of properties, as name=value strings
zlist_t *props;
size_t props_size;
};

// These two helpers serialize a list of properties to and from a
// message frame:

static void
s_encode_props (kvmsg_t *self)
{
zmq_msg_t *msg = &self->frame [FRAME_PROPS];
if (self->present [FRAME_PROPS])
zmq_msg_close (msg);

zmq_msg_init_size (msg, self->props_size);
char *prop = zlist_first (self->props);
char *dest = (char *) zmq_msg_data (msg);
while (prop) {
strcpy (dest, prop);
dest += strlen (prop);
*dest++ = '\n';
prop = zlist_next (self->props);
}
self->present [FRAME_PROPS] = 1;
}

static void
s_decode_props (kvmsg_t *self)
{
zmq_msg_t *msg = &self->frame [FRAME_PROPS];
self->props_size = 0;
while (zlist_size (self->props))
free (zlist_pop (self->props));

size_t remainder = zmq_msg_size (msg);
char *prop = (char *) zmq_msg_data (msg);
char *eoln = memchr (prop, '\n', remainder);
while (eoln) {
*eoln = 0;
zlist_append (self->props, strdup (prop));
self->props_size += strlen (prop) + 1;
remainder -= strlen (prop) + 1;
prop = eoln + 1;
eoln = memchr (prop, '\n', remainder);
}
}

// Here are the constructor and destructor for the class:

// Constructor, takes a sequence number for the new kvmsg instance:
kvmsg_t *
kvmsg_new (int64_t sequence)
{
kvmsg_t
*self;

self = (kvmsg_t *) zmalloc (sizeof (kvmsg_t));
self->props = zlist_new ();
kvmsg_set_sequence (self, sequence);
return self;
}

// zhash_free_fn callback helper that does the low level destruction:
void
kvmsg_free (void *ptr)
{
if (ptr) {
kvmsg_t *self = (kvmsg_t *) ptr;
// Destroy message frames if any
int frame_nbr;
for (frame_nbr = 0; frame_nbr < KVMSG_FRAMES; frame_nbr++)
if (self->present [frame_nbr])
zmq_msg_close (&self->frame [frame_nbr]);

// Destroy property list
while (zlist_size (self->props))
free (zlist_pop (self->props));
zlist_destroy (&self->props);

// Free object itself
free (self);
}
}

// Destructor
void
kvmsg_destroy (kvmsg_t **self_p)
{
assert (self_p);
if (*self_p) {
kvmsg_free (*self_p);
*self_p = NULL;
}
}

// This method reads a key-value message from the socket and returns a
// new kvmsg instance:

kvmsg_t *
kvmsg_recv (void *socket)
{
// This method is almost unchanged from kvsimple
assert (socket);
kvmsg_t *self = kvmsg_new (0);

// Read all frames off the wire, reject if bogus
int frame_nbr;
for (frame_nbr = 0; frame_nbr < KVMSG_FRAMES; frame_nbr++) {
if (self->present [frame_nbr])
zmq_msg_close (&self->frame [frame_nbr]);
zmq_msg_init (&self->frame [frame_nbr]);
self->present [frame_nbr] = 1;
if (zmq_msg_recv (&self->frame [frame_nbr], socket, 0) == -1) {
kvmsg_destroy (&self);
break;
}
// Verify multipart framing
int rcvmore = (frame_nbr < KVMSG_FRAMES - 1)? 1: 0;
if (zsocket_rcvmore (socket) != rcvmore) {
kvmsg_destroy (&self);
break;
}
}
if (self)
s_decode_props (self);
return self;
}

// Send key-value message to socket; any empty frames are sent as such.
void
kvmsg_send (kvmsg_t *self, void *socket)
{
assert (self);
assert (socket);

s_encode_props (self);
// The rest of the method is unchanged from kvsimple
int frame_nbr;
for (frame_nbr = 0; frame_nbr < KVMSG_FRAMES; frame_nbr++) {
zmq_msg_t copy;
zmq_msg_init (&copy);
if (self->present [frame_nbr])
zmq_msg_copy (&copy, &self->frame [frame_nbr]);
zmq_msg_send (&copy, socket,
(frame_nbr < KVMSG_FRAMES - 1)? ZMQ_SNDMORE: 0);
zmq_msg_close (&copy);
}
}

// This method duplicates a kvmsg instance, returns the new instance:

kvmsg_t *
kvmsg_dup (kvmsg_t *self)
{
kvmsg_t *kvmsg = kvmsg_new (0);
int frame_nbr;
for (frame_nbr = 0; frame_nbr < KVMSG_FRAMES; frame_nbr++) {
if (self->present [frame_nbr]) {
zmq_msg_t *src = &self->frame [frame_nbr];
zmq_msg_t *dst = &kvmsg->frame [frame_nbr];
zmq_msg_init_size (dst, zmq_msg_size (src));
memcpy (zmq_msg_data (dst),
zmq_msg_data (src), zmq_msg_size (src));
kvmsg->present [frame_nbr] = 1;
}
}
kvmsg->props_size = zlist_size (self->props);
char *prop = (char *) zlist_first (self->props);
while (prop) {
zlist_append (kvmsg->props, strdup (prop));
prop = (char *) zlist_next (self->props);
}
return kvmsg;
}

// The key, sequence, body, and size methods are the same as in kvsimple.

// Return key from last read message, if any, else NULL
char *
kvmsg_key (kvmsg_t *self)
{
assert (self);
if (self->present [FRAME_KEY]) {
if (!*self->key) {
size_t size = zmq_msg_size (&self->frame [FRAME_KEY]);
if (size > KVMSG_KEY_MAX)
size = KVMSG_KEY_MAX;
memcpy (self->key,
zmq_msg_data (&self->frame [FRAME_KEY]), size);
self->key [size] = 0;
}
return self->key;
}
else
return NULL;
}

// Set message key as provided
void
kvmsg_set_key (kvmsg_t *self, char *key)
{
assert (self);
zmq_msg_t *msg = &self->frame [FRAME_KEY];
if (self->present [FRAME_KEY])
zmq_msg_close (msg);
zmq_msg_init_size (msg, strlen (key));
memcpy (zmq_msg_data (msg), key, strlen (key));
self->present [FRAME_KEY] = 1;
}

// Set message key using printf format
void
kvmsg_fmt_key (kvmsg_t *self, char *format, …)
{
char value [KVMSG_KEY_MAX + 1];
va_list args;

assert (self);
va_start (args, format);
vsnprintf (value, KVMSG_KEY_MAX, format, args);
va_end (args);
kvmsg_set_key (self, value);
}

// Return sequence nbr from last read message, if any
int64_t
kvmsg_sequence (kvmsg_t *self)
{
assert (self);
if (self->present [FRAME_SEQ]) {
assert (zmq_msg_size (&self->frame [FRAME_SEQ]) == 8);
byte *source = zmq_msg_data (&self->frame [FRAME_SEQ]);
int64_t sequence = ((int64_t) (source [0]) << 56)
+ ((int64_t) (source [1]) << 48)
+ ((int64_t) (source [2]) << 40)
+ ((int64_t) (source [3]) << 32)
+ ((int64_t) (source [4]) << 24)
+ ((int64_t) (source [5]) << 16)
+ ((int64_t) (source [6]) << 8)
+ (int64_t) (source [7]);
return sequence;
}
else
return 0;
}

// Set message sequence number
void
kvmsg_set_sequence (kvmsg_t *self, int64_t sequence)
{
assert (self);
zmq_msg_t *msg = &self->frame [FRAME_SEQ];
if (self->present [FRAME_SEQ])
zmq_msg_close (msg);
zmq_msg_init_size (msg, 8);

byte *source = zmq_msg_data (msg);
source [0] = (byte) ((sequence >> 56) & 255);
source [1] = (byte) ((sequence >> 48) & 255);
source [2] = (byte) ((sequence >> 40) & 255);
source [3] = (byte) ((sequence >> 32) & 255);
source [4] = (byte) ((sequence >> 24) & 255);
source [5] = (byte) ((sequence >> 16) & 255);
source [6] = (byte) ((sequence >> 8) & 255);
source [7] = (byte) ((sequence) & 255);

self->present [FRAME_SEQ] = 1;
}

// Return body from last read message, if any, else NULL
byte *
kvmsg_body (kvmsg_t *self)
{
assert (self);
if (self->present [FRAME_BODY])
return (byte *) zmq_msg_data (&self->frame [FRAME_BODY]);
else
return NULL;
}

// Set message body
void
kvmsg_set_body (kvmsg_t *self, byte *body, size_t size)
{
assert (self);
zmq_msg_t *msg = &self->frame [FRAME_BODY];
if (self->present [FRAME_BODY])
zmq_msg_close (msg);
self->present [FRAME_BODY] = 1;
zmq_msg_init_size (msg, size);
memcpy (zmq_msg_data (msg), body, size);
}

// Set message body using printf format
void
kvmsg_fmt_body (kvmsg_t *self, char *format, …)
{
char value [255 + 1];
va_list args;

assert (self);
va_start (args, format);
vsnprintf (value, 255, format, args);
va_end (args);
kvmsg_set_body (self, (byte *) value, strlen (value));
}

// Return body size from last read message, if any, else zero
size_t
kvmsg_size (kvmsg_t *self)
{
assert (self);
if (self->present [FRAME_BODY])
return zmq_msg_size (&self->frame [FRAME_BODY]);
else
return 0;
}

// These methods get and set the UUID for the key-value message:

byte *
kvmsg_uuid (kvmsg_t *self)
{
assert (self);
if (self->present [FRAME_UUID]
&& zmq_msg_size (&self->frame [FRAME_UUID]) == sizeof (uuid_t))
return (byte *) zmq_msg_data (&self->frame [FRAME_UUID]);
else
return NULL;
}

// Sets the UUID to a randomly generated value
void
kvmsg_set_uuid (kvmsg_t *self)
{
assert (self);
zmq_msg_t *msg = &self->frame [FRAME_UUID];
uuid_t uuid;
uuid_generate (uuid);
if (self->present [FRAME_UUID])
zmq_msg_close (msg);
zmq_msg_init_size (msg, sizeof (uuid));
memcpy (zmq_msg_data (msg), uuid, sizeof (uuid));
self->present [FRAME_UUID] = 1;
}

// These methods get and set a specified message property:

// Get message property, return "" if no such property is defined.
char *
kvmsg_get_prop (kvmsg_t *self, char *name)
{
assert (strchr (name, '=') == NULL);
char *prop = zlist_first (self->props);
size_t namelen = strlen (name);
while (prop) {
if (strlen (prop) > namelen
&& memcmp (prop, name, namelen) == 0
&& prop [namelen] == '=')
return prop + namelen + 1;
prop = zlist_next (self->props);
}
return "";
}

// Set message property. Property name cannot contain '='. Max length of
// value is 255 chars.

void
kvmsg_set_prop (kvmsg_t *self, char *name, char *format, …)
{
assert (strchr (name, '=') == NULL);

char value [255 + 1];
va_list args;
assert (self);
va_start (args, format);
vsnprintf (value, 255, format, args);
va_end (args);

// Allocate name=value string
char *prop = malloc (strlen (name) + strlen (value) + 2);

// Remove existing property if any
sprintf (prop, "%s=", name);
char *existing = zlist_first (self->props);
while (existing) {
if (memcmp (prop, existing, strlen (prop)) == 0) {
self->props_size -= strlen (existing) + 1;
zlist_remove (self->props, existing);
free (existing);
break;
}
existing = zlist_next (self->props);
}
// Add new name=value property string
strcat (prop, value);
zlist_append (self->props, prop);
self->props_size += strlen (prop) + 1;
}

// This method stores the key-value message into a hash map, unless
// the key and value are both null. It nullifies the kvmsg reference
// so that the object is owned by the hash map, not the caller:

void
kvmsg_store (kvmsg_t **self_p, zhash_t *hash)
{
assert (self_p);
if (*self_p) {
kvmsg_t *self = *self_p;
assert (self);
if (kvmsg_size (self)) {
if (self->present [FRAME_KEY]
&& self->present [FRAME_BODY]) {
zhash_update (hash, kvmsg_key (self), self);
zhash_freefn (hash, kvmsg_key (self), kvmsg_free);
}
}
else
zhash_delete (hash, kvmsg_key (self));

*self_p = NULL;
}
}

// This method extends the kvsimple implementation with support for
// message properties:

void
kvmsg_dump (kvmsg_t *self)
{
if (self) {
if (!self) {
fprintf (stderr, "NULL");
return;
}
size_t size = kvmsg_size (self);
byte *body = kvmsg_body (self);
fprintf (stderr, "[seq:%" PRId64 "]", kvmsg_sequence (self));
fprintf (stderr, "[key:%s]", kvmsg_key (self));
fprintf (stderr, "[size:%zd] ", size);
if (zlist_size (self->props)) {
fprintf (stderr, "[");
char *prop = zlist_first (self->props);
while (prop) {
fprintf (stderr, "%s;", prop);
prop = zlist_next (self->props);
}
fprintf (stderr, "]");
}
int char_nbr;
for (char_nbr = 0; char_nbr < size; char_nbr++)
fprintf (stderr, "%02X", body [char_nbr]);
fprintf (stderr, "\n");
}
else
fprintf (stderr, "NULL message\n");
}

// This method is the same as in kvsimple with added support
// for the uuid and property features of kvmsg:

int
kvmsg_test (int verbose)
{
kvmsg_t
*kvmsg;

printf (" * kvmsg: ");

// Prepare our context and sockets
zctx_t *ctx = zctx_new ();
void *output = zsocket_new (ctx, ZMQ_DEALER);
int rc = zmq_bind (output, "ipc://kvmsg_selftest.ipc");
assert (rc == 0);
void *input = zsocket_new (ctx, ZMQ_DEALER);
rc = zmq_connect (input, "ipc://kvmsg_selftest.ipc");
assert (rc == 0);

zhash_t *kvmap = zhash_new ();

// Test send and receive of simple message
kvmsg = kvmsg_new (1);
kvmsg_set_key (kvmsg, "key");
kvmsg_set_uuid (kvmsg);
kvmsg_set_body (kvmsg, (byte *) "body", 4);
if (verbose)
kvmsg_dump (kvmsg);
kvmsg_send (kvmsg, output);
kvmsg_store (&kvmsg, kvmap);

kvmsg = kvmsg_recv (input);
if (verbose)
kvmsg_dump (kvmsg);
assert (streq (kvmsg_key (kvmsg), "key"));
kvmsg_store (&kvmsg, kvmap);

// Test send and receive of message with properties
kvmsg = kvmsg_new (2);
kvmsg_set_prop (kvmsg, "prop1", "value1");
kvmsg_set_prop (kvmsg, "prop2", "value1");
kvmsg_set_prop (kvmsg, "prop2", "value2");
kvmsg_set_key (kvmsg, "key");
kvmsg_set_uuid (kvmsg);
kvmsg_set_body (kvmsg, (byte *) "body", 4);
assert (streq (kvmsg_get_prop (kvmsg, "prop2"), "value2"));
if (verbose)
kvmsg_dump (kvmsg);
kvmsg_send (kvmsg, output);
kvmsg_destroy (&kvmsg);

kvmsg = kvmsg_recv (input);
if (verbose)
kvmsg_dump (kvmsg);
assert (streq (kvmsg_key (kvmsg), "key"));
assert (streq (kvmsg_get_prop (kvmsg, "prop2"), "value2"));
kvmsg_destroy (&kvmsg);
// Shutdown and destroy all objects
zhash_destroy (&kvmap);
zctx_destroy (&ctx);

printf ("OK\n");
return 0;
}

kvmsg.c: Key-value message class - full

The Model Five client is almost identical to Model Four. Diff is your friend. It uses the full kvmsg class instead of kvsimple, and sets a randomized 'ttl' property (measured in seconds) on each message:

// Create a new Binary Star instance, using local (bind) and
// remote (connect) endpoints to set-up the server peering.

bstar_t *bstar_new (int primary, char *local, char *remote);

// Destroy a Binary Star instance
void bstar_destroy (bstar_t **self_p);

// Return underlying zloop reactor, for timer and reader
// registration and cancelation.

zloop_t *bstar_zloop (bstar_t *self);

// Register voting reader
int bstar_voter (bstar_t *self, char *endpoint, int type,
zloop_fn handler, void *arg);

// Register main state change handlers
void bstar_new_master (bstar_t *self, zloop_fn handler, void *arg);
void bstar_new_slave (bstar_t *self, zloop_fn handler, void *arg);

// Start the reactor, ends if a callback function returns -1, or the
// process received SIGINT or SIGTERM.

int bstar_start (bstar_t *self);

The Model Five server has totally changed. Instead of a poll loop, we're now using a reactor. This just makes it simpler to mix timers and socket events. Unfortunately in C the reactor style is more verbose. Your mileage will vary in other languages. But reactors seem to be a better way of building more complex ØMQ applications. Here's the server:

// Clone server - Model Five

// Lets us build this source without creating a library
#include "kvmsg.c"

// zloop reactor handlers
static int s_snapshots (zloop_t *loop, zmq_pollitem_t *poller, void *args);
static int s_collector (zloop_t *loop, zmq_pollitem_t *poller, void *args);
static int s_flush_ttl (zloop_t *loop, int timer_id, void *args);

// Our server is defined by these properties
typedef struct {
zctx_t *ctx; // Context wrapper
zhash_t *kvmap; // Key-value store
zloop_t *loop; // zloop reactor
int port; // Main port we're working on
int64_t sequence; // How many updates we're at
void *snapshot; // Handle snapshot requests
void *publisher; // Publish updates to clients
void *collector; // Collect updates from clients
} clonesrv_t;

int main (void)
{
clonesrv_t *self = (clonesrv_t *) zmalloc (sizeof (clonesrv_t));
self->port = 5556;
self->ctx = zctx_new ();
self->kvmap = zhash_new ();
self->loop = zloop_new ();
zloop_set_verbose (self->loop, false);

// Set up our clone server sockets
self->snapshot = zsocket_new (self->ctx, ZMQ_ROUTER);
zsocket_bind (self->snapshot, "tcp://*:%d", self->port);
self->publisher = zsocket_new (self->ctx, ZMQ_PUB);
zsocket_bind (self->publisher, "tcp://*:%d", self->port + 1);
self->collector = zsocket_new (self->ctx, ZMQ_PULL);
zsocket_bind (self->collector, "tcp://*:%d", self->port + 2);

// Register our handlers with reactor
zmq_pollitem_t poller = { 0, 0, ZMQ_POLLIN };
poller.socket = self->snapshot;
zloop_poller (self->loop, &poller, s_snapshots, self);
poller.socket = self->collector;
zloop_poller (self->loop, &poller, s_collector, self);
zloop_timer (self->loop, 1000, 0, s_flush_ttl, self);

// Run reactor until process interrupted
zloop_start (self->loop);

zloop_destroy (&self->loop);
zhash_destroy (&self->kvmap);
zctx_destroy (&self->ctx);
free (self);
return 0;
}

// We handle ICANHAZ? requests by sending snapshot data to the
// client that requested it:

// Routing information for a key-value snapshot
typedef struct {
void *socket; // ROUTER socket to send to
zframe_t *identity; // Identity of peer who requested state
char *subtree; // Client subtree specification
} kvroute_t;

// We call this function for each key-value pair in our hash table
static int
s_send_single (const char *key, void *data, void *args)
{
kvroute_t *kvroute = (kvroute_t *) args;
kvmsg_t *kvmsg = (kvmsg_t *) data;
if (strlen (kvroute->subtree) <= strlen (kvmsg_key (kvmsg))
&& memcmp (kvroute->subtree,
kvmsg_key (kvmsg), strlen (kvroute->subtree)) == 0) {
zframe_send (&kvroute->identity, // Choose recipient
kvroute->socket, ZFRAME_MORE + ZFRAME_REUSE);
kvmsg_send (kvmsg, kvroute->socket);
}
return 0;
}

// This is the reactor handler for the snapshot socket; it accepts
// just the ICANHAZ? request and replies with a state snapshot ending
// with a KTHXBAI message:

static int
s_snapshots (zloop_t *loop, zmq_pollitem_t *poller, void *args)
{
clonesrv_t *self = (clonesrv_t *) args;

zframe_t *identity = zframe_recv (poller->socket);
if (identity) {
// Request is in second frame of message
char *request = zstr_recv (poller->socket);
char *subtree = NULL;
if (streq (request, "ICANHAZ?")) {
free (request);
subtree = zstr_recv (poller->socket);
}
else
printf ("E: bad request, aborting\n");

if (subtree) {
// Send state socket to client
kvroute_t routing = { poller->socket, identity, subtree };
zhash_foreach (self->kvmap, s_send_single, &routing);

// Now send END message with sequence number
zclock_log ("I: sending shapshot=%d", (int) self->sequence);
zframe_send (&identity, poller->socket, ZFRAME_MORE);
kvmsg_t *kvmsg = kvmsg_new (self->sequence);
kvmsg_set_key (kvmsg, "KTHXBAI");
kvmsg_set_body (kvmsg, (byte *) subtree, 0);
kvmsg_send (kvmsg, poller->socket);
kvmsg_destroy (&kvmsg);
free (subtree);
}
zframe_destroy(&identity);
}
return 0;
}

// We store each update with a new sequence number, and if necessary, a
// time-to-live. We publish updates immediately on our publisher socket:

static int
s_collector (zloop_t *loop, zmq_pollitem_t *poller, void *args)
{
clonesrv_t *self = (clonesrv_t *) args;

kvmsg_t *kvmsg = kvmsg_recv (poller->socket);
if (kvmsg) {
kvmsg_set_sequence (kvmsg, ++self->sequence);
kvmsg_send (kvmsg, self->publisher);
int ttl = atoi (kvmsg_get_prop (kvmsg, "ttl"));
if (ttl)
kvmsg_set_prop (kvmsg, "ttl",
"%" PRId64, zclock_time () + ttl * 1000);
kvmsg_store (&kvmsg, self->kvmap);
zclock_log ("I: publishing update=%d", (int) self->sequence);
}
return 0;
}

// At regular intervals, we flush ephemeral values that have expired. This
// could be slow on very large data sets:

// If key-value pair has expired, delete it and publish the
// fact to listening clients.

static int
s_flush_single (const char *key, void *data, void *args)
{
clonesrv_t *self = (clonesrv_t *) args;

kvmsg_t *kvmsg = (kvmsg_t *) data;
int64_t ttl;
sscanf (kvmsg_get_prop (kvmsg, "ttl"), "%" PRId64, &ttl);
if (ttl && zclock_time () >= ttl) {
kvmsg_set_sequence (kvmsg, ++self->sequence);
kvmsg_set_body (kvmsg, (byte *) "", 0);
kvmsg_send (kvmsg, self->publisher);
kvmsg_store (&kvmsg, self->kvmap);
zclock_log ("I: publishing delete=%d", (int) self->sequence);
}
return 0;
}

static int
s_flush_ttl (zloop_t *loop, int timer_id, void *args)
{
clonesrv_t *self = (clonesrv_t *) args;
if (self->kvmap)
zhash_foreach (self->kvmap, s_flush_single, args);
return 0;
}

clonesrv5.c: Clone server, Model Five

Clone Server Reliability

topprevnext

Clone models one to five are relatively simple. We're now going to get into unpleasantly complex territory here that has me getting up for another espresso. You should appreciate that making "reliable" messaging is complex enough that you always need to ask, "do we actually need this?" before jumping into it. If you can get away with unreliable, or "good enough" reliability, you can make a huge win in terms of cost and complexity. Sure, you may lose some data now and then. It is often a good trade-off. Having said, that, and (sips) since the espresso is really good, let's jump in!

As you play with model three, you'll stop and restart the server. It might look like it recovers, but of course it's applying updates to an empty state, instead of the proper current state. Any new client joining the network will get just the latest updates, instead of all of them. So let's work out a design for making Clone work despite server failures.

Let's list the failures we want to be able to handle:

  • Clone server process crashes and is automatically or manually restarted. The process loses its state and has to get it back from somewhere.
  • Clone server machine dies and is off-line for a significant time. Clients have to switch to an alternate server somewhere.
  • Clone server process or machine gets disconnected from the network, e.g. a switch dies. It may come back at some point, but in the meantime clients need an alternate server.

Our first step is to add a second server. We can use the Binary Star pattern from Chapter four to organize these into primary and backup. Binary Star is a reactor, so it's useful that we already refactored the last server model into a reactor style.

We need to ensure that updates are not lost if the primary server crashes. The simplest technique is to send them to both servers.

The backup server can then act as a client, and keep its state synchronized by receiving updates as all clients do. It'll also get new updates from clients. It can't yet store these in its hash table, but it can hold onto them for a while.

So, Model Six introduces these changes over Model Five:

  • We use a pub-sub flow instead of a push-pull flow for client updates (to the servers). The reasons: push sockets will block if there is no recipient, and they round-robin, so we'd need to open two of them. We'll bind the servers' SUB sockets and connect the clients' PUB sockets to them. This takes care of fanning out from one client to two servers.
  • We add heartbeats to server updates (to clients), so that a client can detect when the primary server has died. It can then switch over to the backup server.
  • We connect the two servers using the Binary Star bstar reactor class. Binary Star relies on the clients to 'vote' by making an explicit request to the server they consider "master". We'll use snapshot requests for this.
  • We make all update messages uniquely identifiable by adding a UUID field. The client generates this, and the server propagates it back on re-published updates.
  • The slave server keeps a "pending list" of updates that it has received from clients, but not yet from the master server. Or, updates it's received from the master, but not yet clients. The list is ordered from oldest to newest, so that it is easy to remove updates off the head.

It's useful to design the client logic as a finite state machine. The client cycles through three states:

  • The client opens and connects its sockets, and then requests a snapshot from the first server. To avoid request storms, it will ask any given server only twice. One request might get lost, that'd be bad luck. Two getting lost would be carelessness.
  • The client waits for a reply (snapshot data) from the current server, and if it gets it, it stores it. If there is no reply within some timeout, it fails over to the next server.
  • When the client has gotten its snapshot, it waits for and processes updates. Again, if it doesn't hear anything from the server within some timeout, it fails over to the next server.

The client loops forever. It's quite likely during startup or fail-over that some clients may be trying to talk to the primary server while others are trying to talk to the backup server. The Binary Star state machine handles this, hopefully accurately. (One of the joys of making designs like this is we cannot prove they are right, we can only prove them wrong. So it's like a guy falling off a tall building. So far, so good… so far, so good…)

Figure 67 - Clone Client Finite State Machine

fig67.png

Fail-over happens as follows:

  • The client detects that primary server is no longer sending heartbeats, so has died. The client connects to the backup server and requests a new state snapshot.
  • The backup server starts to receive snapshot requests from clients, and detects that primary server has gone, so takes over as primary.
  • The backup server applies its pending list to its own hash table, and then starts to process state snapshot requests.

When the primary server comes back on-line, it will:

  • Start up as slave server, and connect to the backup server as a Clone client.
  • Start to receive updates from clients, via its SUB socket.

We make some assumptions:

  • That at least one server will keep running. If both servers crash, we lose all server state and there's no way to recover it.
  • That multiple clients do not update the same hash keys, at the same time. Client updates will arrive at the two servers in a different order. So, the backup server may apply updates from its pending list in a different order than the primary server would or did. Updates from one client will always arrive in the same order on both servers, so that is safe.

So the architecture for our high-availability server pair using the Binary Star pattern has two servers and a set of clients that talk to both servers.

Figure 68 - High-availability Clone Server Pair

fig68.png

As a first step to building this, we're going to refactor the client as a reusable class. This is partly for fun (writing asynchronous classes with ØMQ is like an exercise in elegance), but mainly because we want Clone to be really easy to plug-in to random applications. Since resilience depends on clients behaving correctly, it's much easier to guarantee this when there's a reusable client API. When we start to handle fail-over in clients, it does get a little complex (imagine mixing a Freelance client with a Clone client). So, reusability ahoy!

My usual design approach is to first design an API that feels right, then to implement that. So, we start by taking the clone client, and rewriting it to sit on top of some presumed class API called clone. Turning random code into an API means defining a reasonably stable and abstract contract with applications. For example, in Model Five, the client opened three separate sockets to the server, using endpoints that were hard-coded in the source. We could make an API with three methods, like this:

kvmsg_set_prop (kvmsg, "ttl", "%d", randof (30));

But this is both verbose and fragile. It's not a good idea to expose the internals of a design to applications. Today, we use three sockets. Tomorrow, two, or four. Do we really want to go and change every application that uses the clone class? So to hide these sausage factory details, we make a small abstraction, like this:

// Specify endpoints for each socket we need
clone_subscribe (clone, "tcp://localhost:5556");
clone_snapshot (clone, "tcp://localhost:5557");
clone_updates (clone, "tcp://localhost:5558");

// Times two, since we have two servers
clone_subscribe (clone, "tcp://localhost:5566");
clone_snapshot (clone, "tcp://localhost:5567");
clone_updates (clone, "tcp://localhost:5568");

Which has the advantage of simplicity (one server sits at one endpoint) but has an impact on our internal design. We now need to somehow turn that single endpoint into three endpoints. One way would be to bake the knowledge "client and server talk over three consecutive ports" into our client-server protocol. Another way would be to get the two missing endpoints from the server. We'll take the simplest way, which is:

  • The server state router (ROUTER) is at port P.
  • The server updates publisher (PUB) is at port P + 1.
  • The server updates subscriber (SUB) is at port P + 2.

The clone class has the same structure as the flcliapi class from Chapter Four. It consists of two parts:

  • An asynchronous clone agent that runs in a background thread. The agent handles all network I/O, talking to servers in real-time, no matter what the application is doing.
  • A synchronous 'clone' class which runs in the caller's thread. When you create a clone object, that automatically launches an agent thread, and when you destroy a clone object, it kills the agent thread.

The frontend class talks to the agent class over an inproc 'pipe' socket. In C, the CZMQ thread layer creates this pipe automatically for us as it starts an "attached thread". This is a natural pattern for multithreading over ØMQ.

Without ØMQ, this kind of asynchronous class design would be weeks of really hard work. With ØMQ, it was a day or two of work. The results are kind of complex, given the simplicity of the Clone protocol it's actually running. There are some reasons for this. We could turn this into a reactor, but that'd make it harder to use in applications. So the API looks a bit like a key-value table that magically talks to some servers:

// Specify primary and backup servers
clone_connect (clone, "tcp://localhost:5551");
clone_connect (clone, "tcp://localhost:5561");

So here is Model Six of the clone client, which has now become just a thin shell using the clone class:

// Clone client Model Six

// Lets us build this source without creating a library
#include "clone.c"
#define SUBTREE "/client/"

int main (void)
{
// Create distributed hash instance
clone_t *clone = clone_new ();

// Specify configuration
clone_subtree (clone, SUBTREE);
clone_connect (clone, "tcp://localhost", "5556");
clone_connect (clone, "tcp://localhost", "5566");

// Set random tuples into the distributed hash
while (!zctx_interrupted) {
// Set random value, check it was stored
char key [255];
char value [10];
sprintf (key, "%s%d", SUBTREE, randof (10000));
sprintf (value, "%d", randof (1000000));
clone_set (clone, key, value, randof (30));
sleep (1);
}
clone_destroy (&clone);
return 0;
}

clonecli6.c: Clone client, Model Six

And here is the actual clone class implementation:

// clone class - Clone client API stack (multithreaded)

#include "clone.h"
// If no server replies within this time, abandon request
#define GLOBAL_TIMEOUT 4000 // msecs

// =====================================================================
// Synchronous part, works in our application thread

// Structure of our class

struct _clone_t {
zctx_t *ctx; // Our context wrapper
void *pipe; // Pipe through to clone agent
};

// This is the thread that handles our real clone class
static void clone_agent (void *args, zctx_t *ctx, void *pipe);

// Here are the constructor and destructor for the clone class. Note that
// we create a context specifically for the pipe that connects our
// frontend to the backend agent:

clone_t *
clone_new (void)
{
clone_t
*self;

self = (clone_t *) zmalloc (sizeof (clone_t));
self->ctx = zctx_new ();
self->pipe = zthread_fork (self->ctx, clone_agent, NULL);
return self;
}

void
clone_destroy (clone_t **self_p)
{
assert (self_p);
if (*self_p) {
clone_t *self = *self_p;
zctx_destroy (&self->ctx);
free (self);
*self_p = NULL;
}
}

// Specify subtree for snapshot and updates, which we must do before
// connecting to a server as the subtree specification is sent as the
// first command to the server. Sends a [SUBTREE][subtree] command to
// the agent:

void clone_subtree (clone_t *self, char *subtree)
{
assert (self);
zmsg_t *msg = zmsg_new ();
zmsg_addstr (msg, "SUBTREE");
zmsg_addstr (msg, subtree);
zmsg_send (&msg, self->pipe);
}

// Connect to a new server endpoint. We can connect to at most two
// servers. Sends [CONNECT][endpoint][service] to the agent:

void
clone_connect (clone_t *self, char *address, char *service)
{
assert (self);
zmsg_t *msg = zmsg_new ();
zmsg_addstr (msg, "CONNECT");
zmsg_addstr (msg, address);
zmsg_addstr (msg, service);
zmsg_send (&msg, self->pipe);
}

// Set a new value in the shared hashmap. Sends a [SET][key][value][ttl]
// command through to the agent which does the actual work:

void
clone_set (clone_t *self, char *key, char *value, int ttl)
{
char ttlstr [10];
sprintf (ttlstr, "%d", ttl);

assert (self);
zmsg_t *msg = zmsg_new ();
zmsg_addstr (msg, "SET");
zmsg_addstr (msg, key);
zmsg_addstr (msg, value);
zmsg_addstr (msg, ttlstr);
zmsg_send (&msg, self->pipe);
}

// Look up value in distributed hash table. Sends [GET][key] to the agent and
// waits for a value response. If there is no value available, will eventually
// return NULL:

char *
clone_get (clone_t *self, char *key)
{
assert (self);
assert (key);
zmsg_t *msg = zmsg_new ();
zmsg_addstr (msg, "GET");
zmsg_addstr (msg, key);
zmsg_send (&msg, self->pipe);

zmsg_t *reply = zmsg_recv (self->pipe);
if (reply) {
char *value = zmsg_popstr (reply);
zmsg_destroy (&reply);
return value;
}
return NULL;
}

// The backend agent manages a set of servers, which we implement using
// our simple class model:

typedef struct {
char *address; // Server address
int port; // Server port
void *snapshot; // Snapshot socket
void *subscriber; // Incoming updates
uint64_t expiry; // When server expires
uint requests; // How many snapshot requests made?
} server_t;

static server_t *
server_new (zctx_t *ctx, char *address, int port, char *subtree)
{
server_t *self = (server_t *) zmalloc (sizeof (server_t));

zclock_log ("I: adding server %s:%d…", address, port);
self->address = strdup (address);
self->port = port;

self->snapshot = zsocket_new (ctx, ZMQ_DEALER);
zsocket_connect (self->snapshot, "%s:%d", address, port);
self->subscriber = zsocket_new (ctx, ZMQ_SUB);
zsocket_connect (self->subscriber, "%s:%d", address, port + 1);
zsocket_set_subscribe (self->subscriber, subtree);
zsocket_set_subscribe (self->subscriber, "HUGZ");
return self;
}

static void
server_destroy (server_t **self_p)
{
assert (self_p);
if (*self_p) {
server_t *self = *self_p;
free (self->address);
free (self);
*self_p = NULL;
}
}

// Here is the implementation of the backend agent itself:

// Number of servers to which we will talk to
#define SERVER_MAX 2

// Server considered dead if silent for this long
#define SERVER_TTL 5000 // msecs

// States we can be in
#define STATE_INITIAL 0
// Before asking server for state
#define STATE_SYNCING 1 // Getting state from server
#define STATE_ACTIVE 2 // Getting new updates from server

typedef struct {
zctx_t *ctx; // Context wrapper
void *pipe; // Pipe back to application
zhash_t *kvmap; // Actual key/value table
char *subtree; // Subtree specification, if any
server_t *server [SERVER_MAX];
uint nbr_servers; // 0 to SERVER_MAX
uint state; // Current state
uint cur_server; // If active, server 0 or 1
int64_t sequence; // Last kvmsg processed
void *publisher; // Outgoing updates
} agent_t;

static agent_t *
agent_new (zctx_t *ctx, void *pipe)
{
agent_t *self = (agent_t *) zmalloc (sizeof (agent_t));
self->ctx = ctx;
self->pipe = pipe;
self->kvmap = zhash_new ();
self->subtree = strdup ("");
self->state = STATE_INITIAL;
self->publisher = zsocket_new (self->ctx, ZMQ_PUB);
return self;
}

static void
agent_destroy (agent_t **self_p)
{
assert (self_p);
if (*self_p) {
agent_t *self = *self_p;
int server_nbr;
for (server_nbr = 0; server_nbr < self->nbr_servers; server_nbr++)
server_destroy (&self->server [server_nbr]);
zhash_destroy (&self->kvmap);
free (self->subtree);
free (self);
*self_p = NULL;
}
}

// Here we handle the different control messages from the frontend;
// SUBTREE, CONNECT, SET, and GET:

static int
agent_control_message (agent_t *self)
{
zmsg_t *msg = zmsg_recv (self->pipe);
char *command = zmsg_popstr (msg);
if (command == NULL)
return -1; // Interrupted

if (streq (command, "SUBTREE")) {
free (self->subtree);
self->subtree = zmsg_popstr (msg);
}
else
if (streq (command, "CONNECT")) {
char *address = zmsg_popstr (msg);
char *service = zmsg_popstr (msg);
if (self->nbr_servers < SERVER_MAX) {
self->server [self->nbr_servers++] = server_new (
self->ctx, address, atoi (service), self->subtree);
// We broadcast updates to all known servers
zsocket_connect (self->publisher, "%s:%d",
address, atoi (service) + 2);
}
else
zclock_log ("E: too many servers (max. %d)", SERVER_MAX);
free (address);
free (service);
}
else
// When we set a property, we push the new key-value pair onto
// all our connected servers:
if (streq (command, "SET")) {
char *key = zmsg_popstr (msg);
char *value = zmsg_popstr (msg);
char *ttl = zmsg_popstr (msg);

// Send key-value pair on to server
kvmsg_t *kvmsg = kvmsg_new (0);
kvmsg_set_key (kvmsg, key);
kvmsg_set_uuid (kvmsg);
kvmsg_fmt_body (kvmsg, "%s", value);
kvmsg_set_prop (kvmsg, "ttl", ttl);
kvmsg_send (kvmsg, self->publisher);
kvmsg_store (&kvmsg, self->kvmap);
free (key);
free (value);
free (ttl);
}
else
if (streq (command, "GET")) {
char *key = zmsg_popstr (msg);
kvmsg_t *kvmsg = (kvmsg_t *) zhash_lookup (self->kvmap, key);
byte *value = kvmsg? kvmsg_body (kvmsg): NULL;
if (value)
zmq_send (self->pipe, value, kvmsg_size (kvmsg), 0);
else
zstr_send (self->pipe, "");
free (key);
}
free (command);
zmsg_destroy (&msg);
return 0;
}

// The asynchronous agent manages a server pool and handles the
// request-reply dialog when the application asks for it:

static void
clone_agent (void *args, zctx_t *ctx, void *pipe)
{
agent_t *self = agent_new (ctx, pipe);

while (true) {
zmq_pollitem_t poll_set [] = {
{ pipe, 0, ZMQ_POLLIN, 0 },
{ 0, 0, ZMQ_POLLIN, 0 }
};
int poll_timer = -1;
int poll_size = 2;
server_t *server = self->server [self->cur_server];
switch (self->state) {
case STATE_INITIAL:
// In this state we ask the server for a snapshot,
// if we have a server to talk to…
if (self->nbr_servers > 0) {
zclock_log ("I: waiting for server at %s:%d…",
server->address, server->port);
if (server->requests < 2) {
zstr_sendm (server->snapshot, "ICANHAZ?");
zstr_send (server->snapshot, self->subtree);
server->requests++;
}
server->expiry = zclock_time () + SERVER_TTL;
self->state = STATE_SYNCING;
poll_set [1].socket = server->snapshot;
}
else
poll_size = 1;
break;

case STATE_SYNCING:
// In this state we read from snapshot and we expect
// the server to respond, else we fail over.
poll_set [1].socket = server->snapshot;
break;

case STATE_ACTIVE:
// In this state we read from subscriber and we expect
// the server to give HUGZ, else we fail over.
poll_set [1].socket = server->subscriber;
break;
}
if (server) {
poll_timer = (server->expiry - zclock_time ())
* ZMQ_POLL_MSEC;
if (poll_timer < 0)
poll_timer = 0;
}
// We're ready to process incoming messages; if nothing at all
// comes from our server within the timeout, that means the
// server is dead:

int rc = zmq_poll (poll_set, poll_size, poll_timer);
if (rc == -1)
break; // Context has been shut down

if (poll_set [0].revents & ZMQ_POLLIN) {
if (agent_control_message (self))
break; // Interrupted
}
else
if (poll_set [1].revents & ZMQ_POLLIN) {
kvmsg_t *kvmsg = kvmsg_recv (poll_set [1].socket);
if (!kvmsg)
break; // Interrupted

// Anything from server resets its expiry time
server->expiry = zclock_time () + SERVER_TTL;
if (self->state == STATE_SYNCING) {
// Store in snapshot until we're finished
server->requests = 0;
if (streq (kvmsg_key (kvmsg), "KTHXBAI")) {
self->sequence = kvmsg_sequence (kvmsg);
self->state = STATE_ACTIVE;
zclock_log ("I: received from %s:%d snapshot=%d",
server->address, server->port,
(int) self->sequence);
kvmsg_destroy (&kvmsg);
}
else
kvmsg_store (&kvmsg, self->kvmap);
}
else
if (self->state == STATE_ACTIVE) {
// Discard out-of-sequence updates, incl. HUGZ
if (kvmsg_sequence (kvmsg) > self->sequence) {
self->sequence = kvmsg_sequence (kvmsg);
kvmsg_store (&kvmsg, self->kvmap);
zclock_log ("I: received from %s:%d update=%d",
server->address, server->port,
(int) self->sequence);
}
else
kvmsg_destroy (&kvmsg);
}
}
else {
// Server has died, failover to next
zclock_log ("I: server at %s:%d didn't give HUGZ",
server->address, server->port);
self->cur_server = (self->cur_server + 1) % self->nbr_servers;
self->state = STATE_INITIAL;
}
}
agent_destroy (&self);
}

clone.c: Clone class

Finally, here is the sixth and last model of the clone server:

// Clone server Model Six

// Lets us build this source without creating a library
#include "bstar.c"
#include "kvmsg.c"

// We define a set of reactor handlers and our server object structure:

// Bstar reactor handlers
static int
s_snapshots (zloop_t *loop, zmq_pollitem_t *poller, void *args);
static int
s_collector (zloop_t *loop, zmq_pollitem_t *poller, void *args);
static int
s_flush_ttl (zloop_t *loop, int timer_id, void *args);
static int
s_send_hugz (zloop_t *loop, int timer_id, void *args);
static int
s_new_active (zloop_t *loop, zmq_pollitem_t *poller, void *args);
static int
s_new_passive (zloop_t *loop, zmq_pollitem_t *poller, void *args);
static int
s_subscriber (zloop_t *loop, zmq_pollitem_t *poller, void *args);

// Our server is defined by these properties
typedef struct {
zctx_t *ctx; // Context wrapper
zhash_t *kvmap; // Key-value store
bstar_t *bstar; // Bstar reactor core
int64_t sequence; // How many updates we're at
int port; // Main port we're working on
int peer; // Main port of our peer
void *publisher; // Publish updates and hugz
void *collector; // Collect updates from clients
void *subscriber; // Get updates from peer
zlist_t *pending; // Pending updates from clients
bool primary; // true if we're primary
bool active; // true if we're active
bool passive; // true if we're passive
} clonesrv_t;

// The main task parses the command line to decide whether to start
// as a primary or backup server. We're using the Binary Star pattern
// for reliability. This interconnects the two servers so they can
// agree on which one is primary and which one is backup. To allow the
// two servers to run on the same box, we use different ports for
// primary and backup. Ports 5003/5004 are used to interconnect the
// servers. Ports 5556/5566 are used to receive voting events (snapshot
// requests in the clone pattern). Ports 5557/5567 are used by the
// publisher, and ports 5558/5568 are used by the collector:

int main (int argc, char *argv [])
{
clonesrv_t *self = (clonesrv_t *) zmalloc (sizeof (clonesrv_t));
if (argc == 2 && streq (argv [1], "-p")) {
zclock_log ("I: primary active, waiting for backup (passive)");
self->bstar = bstar_new (BSTAR_PRIMARY, "tcp://*:5003",
"tcp://localhost:5004");
bstar_voter (self->bstar, "tcp://*:5556",
ZMQ_ROUTER, s_snapshots, self);
self->port = 5556;
self->peer = 5566;
self->primary = true;
}
else
if (argc == 2 && streq (argv [1], "-b")) {
zclock_log ("I: backup passive, waiting for primary (active)");
self->bstar = bstar_new (BSTAR_BACKUP, "tcp://*:5004",
"tcp://localhost:5003");
bstar_voter (self->bstar, "tcp://*:5566",
ZMQ_ROUTER, s_snapshots, self);
self->port = 5566;
self->peer = 5556;
self->primary = false;
}
else {
printf ("Usage: clonesrv6 { -p | -b }\n");
free (self);
exit (0);
}
// Primary server will become first active
if (self->primary)
self->kvmap = zhash_new ();

self->ctx = zctx_new ();
self->pending = zlist_new ();
bstar_set_verbose (self->bstar, true);

// Set up our clone server sockets
self->publisher = zsocket_new (self->ctx, ZMQ_PUB);
self->collector = zsocket_new (self->ctx, ZMQ_SUB);
zsocket_set_subscribe (self->collector, "");
zsocket_bind (self->publisher, "tcp://*:%d", self->port + 1);
zsocket_bind (self->collector, "tcp://*:%d", self->port + 2);

// Set up our own clone client interface to peer
self->subscriber = zsocket_new (self->ctx, ZMQ_SUB);
zsocket_set_subscribe (self->subscriber, "");
zsocket_connect (self->subscriber,
"tcp://localhost:%d", self->peer + 1);

// After we've setup our sockets, we register our binary star
// event handlers, and then start the bstar reactor. This finishes
// when the user presses Ctrl-C or when the process receives a SIGINT
// interrupt:

// Register state change handlers
bstar_new_active (self->bstar, s_new_active, self);
bstar_new_passive (self->bstar, s_new_passive, self);

// Register our other handlers with the bstar reactor
zmq_pollitem_t poller = { self->collector, 0, ZMQ_POLLIN };
zloop_poller (bstar_zloop (self->bstar), &poller, s_collector, self);
zloop_timer (bstar_zloop (self->bstar), 1000, 0, s_flush_ttl, self);
zloop_timer (bstar_zloop (self->bstar), 1000, 0, s_send_hugz, self);

// Start the bstar reactor
bstar_start (self->bstar);

// Interrupted, so shut down
while (zlist_size (self->pending)) {
kvmsg_t *kvmsg = (kvmsg_t *) zlist_pop (self->pending);
kvmsg_destroy (&kvmsg);
}
zlist_destroy (&self->pending);
bstar_destroy (&self->bstar);
zhash_destroy (&self->kvmap);
zctx_destroy (&self->ctx);
free (self);

return 0;
}

// We handle ICANHAZ? requests exactly as in the clonesrv5 example.

// Routing information for a key-value snapshot
typedef struct {
void *socket; // ROUTER socket to send to
zframe_t *identity; // Identity of peer who requested state
char *subtree; // Client subtree specification
} kvroute_t;

// Send one state snapshot key-value pair to a socket
// Hash item data is our kvmsg object, ready to send

static int
s_send_single (const char *key, void *data, void *args)
{
kvroute_t *kvroute = (kvroute_t *) args;
kvmsg_t *kvmsg = (kvmsg_t *) data;
if (strlen (kvroute->subtree) <= strlen (kvmsg_key (kvmsg))
&& memcmp (kvroute->subtree,
kvmsg_key (kvmsg), strlen (kvroute->subtree)) == 0) {
zframe_send (&kvroute->identity, // Choose recipient
kvroute->socket, ZFRAME_MORE + ZFRAME_REUSE);
kvmsg_send (kvmsg, kvroute->socket);
}
return 0;
}

static int
s_snapshots (zloop_t *loop, zmq_pollitem_t *poller, void *args)
{
clonesrv_t *self = (clonesrv_t *) args;

zframe_t *identity = zframe_recv (poller->socket);
if (identity) {
// Request is in second frame of message
char *request = zstr_recv (poller->socket);
char *subtree = NULL;
if (streq (request, "ICANHAZ?")) {
free (request);
subtree = zstr_recv (poller->socket);
}
else
printf ("E: bad request, aborting\n");

if (subtree) {
// Send state socket to client
kvroute_t routing = { poller->socket, identity, subtree };
zhash_foreach (self->kvmap, s_send_single, &routing);

// Now send END message with sequence number
zclock_log ("I: sending shapshot=%d", (int) self->sequence);
zframe_send (&identity, poller->socket, ZFRAME_MORE);
kvmsg_t *kvmsg = kvmsg_new (self->sequence);
kvmsg_set_key (kvmsg, "KTHXBAI");
kvmsg_set_body (kvmsg, (byte *) subtree, 0);
kvmsg_send (kvmsg, poller->socket);
kvmsg_destroy (&kvmsg);
free (subtree);
}
zframe_destroy(&identity);
}
return 0;
}

// The collector is more complex than in the clonesrv5 example because the
// way it processes updates depends on whether we're active or passive.
// The active applies them immediately to its kvmap, whereas the passive
// queues them as pending:

// If message was already on pending list, remove it and return true,
// else return false.

static int
s_was_pending (clonesrv_t *self, kvmsg_t *kvmsg)
{
kvmsg_t *held = (kvmsg_t *) zlist_first (self->pending);
while (held) {
if (memcmp (kvmsg_uuid (kvmsg),
kvmsg_uuid (held), sizeof (uuid_t)) == 0) {
zlist_remove (self->pending, held);
return true;
}
held = (kvmsg_t *) zlist_next (self->pending);
}
return false;
}

static int
s_collector (zloop_t *loop, zmq_pollitem_t *poller, void *args)
{
clonesrv_t *self = (clonesrv_t *) args;

kvmsg_t *kvmsg = kvmsg_recv (poller->socket);
if (kvmsg) {
if (self->active) {
kvmsg_set_sequence (kvmsg, ++self->sequence);
kvmsg_send (kvmsg, self->publisher);
int ttl = atoi (kvmsg_get_prop (kvmsg, "ttl"));
if (ttl)
kvmsg_set_prop (kvmsg, "ttl",
"%" PRId64, zclock_time () + ttl * 1000);
kvmsg_store (&kvmsg, self->kvmap);
zclock_log ("I: publishing update=%d", (int) self->sequence);
}
else {
// If we already got message from active, drop it, else
// hold on pending list
if (s_was_pending (self, kvmsg))
kvmsg_destroy (&kvmsg);
else
zlist_append (self->pending, kvmsg);
}
}
return 0;
}

// We purge ephemeral values using exactly the same code as in
// the previous clonesrv5 example.
// If key-value pair has expired, delete it and publish the
// fact to listening clients.

static int
s_flush_single (const char *key, void *data, void *args)
{
clonesrv_t *self = (clonesrv_t *) args;

kvmsg_t *kvmsg = (kvmsg_t *) data;
int64_t ttl;
sscanf (kvmsg_get_prop (kvmsg, "ttl"), "%" PRId64, &ttl);
if (ttl && zclock_time () >= ttl) {
kvmsg_set_sequence (kvmsg, ++self->sequence);
kvmsg_set_body (kvmsg, (byte *) "", 0);
kvmsg_send (kvmsg, self->publisher);
kvmsg_store (&kvmsg, self->kvmap);
zclock_log ("I: publishing delete=%d", (int) self->sequence);
}
return 0;
}

static int
s_flush_ttl (zloop_t *loop, int timer_id, void *args)
{
clonesrv_t *self = (clonesrv_t *) args;
if (self->kvmap)
zhash_foreach (self->kvmap, s_flush_single, args);
return 0;
}

// We send a HUGZ message once a second to all subscribers so that they
// can detect if our server dies. They'll then switch over to the backup
// server, which will become active:

static int
s_send_hugz (zloop_t *loop, int timer_id, void *args)
{
clonesrv_t *self = (clonesrv_t *) args;

kvmsg_t *kvmsg = kvmsg_new (self->sequence);
kvmsg_set_key (kvmsg, "HUGZ");
kvmsg_set_body (kvmsg, (byte *) "", 0);
kvmsg_send (kvmsg, self->publisher);
kvmsg_destroy (&kvmsg);

return 0;
}

// When we switch from passive to active, we apply our pending list so that
// our kvmap is up-to-date. When we switch to passive, we wipe our kvmap
// and grab a new snapshot from the active server:

static int
s_new_active (zloop_t *loop, zmq_pollitem_t *unused, void *args)
{
clonesrv_t *self = (clonesrv_t *) args;

self->active = true;
self->passive = false;

// Stop subscribing to updates
zmq_pollitem_t poller = { self->subscriber, 0, ZMQ_POLLIN };
zloop_poller_end (bstar_zloop (self->bstar), &poller);

// Apply pending list to own hash table
while (zlist_size (self->pending)) {
kvmsg_t *kvmsg = (kvmsg_t *) zlist_pop (self->pending);
kvmsg_set_sequence (kvmsg, ++self->sequence);
kvmsg_send (kvmsg, self->publisher);
kvmsg_store (&kvmsg, self->kvmap);
zclock_log ("I: publishing pending=%d", (int) self->sequence);
}
return 0;
}

static int
s_new_passive (zloop_t *loop, zmq_pollitem_t *unused, void *args)
{
clonesrv_t *self = (clonesrv_t *) args;

zhash_destroy (&self->kvmap);
self->active = false;
self->passive = true;

// Start subscribing to updates
zmq_pollitem_t poller = { self->subscriber, 0, ZMQ_POLLIN };
zloop_poller (bstar_zloop (self->bstar), &poller, s_subscriber, self);

return 0;
}

// When we get an update, we create a new kvmap if necessary, and then
// add our update to our kvmap. We're always passive in this case:

static int
s_subscriber (zloop_t *loop, zmq_pollitem_t *poller, void *args)
{
clonesrv_t *self = (clonesrv_t *) args;
// Get state snapshot if necessary
if (self->kvmap == NULL) {
self->kvmap = zhash_new ();
void *snapshot = zsocket_new (self->ctx, ZMQ_DEALER);
zsocket_connect (snapshot, "tcp://localhost:%d", self->peer);
zclock_log ("I: asking for snapshot from: tcp://localhost:%d",
self->peer);
zstr_sendm (snapshot, "ICANHAZ?");
zstr_send (snapshot, ""); // blank subtree to get all
while (true) {
kvmsg_t *kvmsg = kvmsg_recv (snapshot);
if (!kvmsg)
break; // Interrupted
if (streq (kvmsg_key (kvmsg), "KTHXBAI")) {
self->sequence = kvmsg_sequence (kvmsg);
kvmsg_destroy (&kvmsg);
break; // Done
}
kvmsg_store (&kvmsg, self->kvmap);
}
zclock_log ("I: received snapshot=%d", (int) self->sequence);
zsocket_destroy (self->ctx, snapshot);
}
// Find and remove update off pending list
kvmsg_t *kvmsg = kvmsg_recv (poller->socket);
if (!kvmsg)
return 0;

if (strneq (kvmsg_key (kvmsg), "HUGZ")) {
if (!s_was_pending (self, kvmsg)) {
// If active update came before client update, flip it
// around, store active update (with sequence) on pending
// list and use to clear client update when it comes later
zlist_append (self->pending, kvmsg_dup (kvmsg));
}
// If update is more recent than our kvmap, apply it
if (kvmsg_sequence (kvmsg) > self->sequence) {
self->sequence = kvmsg_sequence (kvmsg);
kvmsg_store (&kvmsg, self->kvmap);
zclock_log ("I: received update=%d", (int) self->sequence);
}
else
kvmsg_destroy (&kvmsg);
}
else
kvmsg_destroy (&kvmsg);

return 0;
}

clonesrv6.c: Clone server, Model Six

This main program is only a few hundred lines of code, but it took some time to get working. To be accurate, building Model Six took about a full week of "sweet god, this is just too complex for the Guide" hacking. We've assembled pretty much everything and the kitchen sink into this small application. We have fail-over, ephemeral values, subtrees, and so on. What surprised me was that the upfront design was pretty accurate. But the details of writing and debugging so many socket flows is something special. Here's how I made this work:

  • By using reactors (bstar, on top of zloop), which remove a lot of grunt-work from the code, and leave what remains simpler and more obvious. The whole server runs as one thread, so there's no inter-thread weirdness going on. Just pass a structure pointer ('self') around to all handlers, which can do their thing happily. One nice side-effect of using reactors is that code, being less tightly integrated into a poll loop, is much easier to reuse. Large chunks of Model Six are taken from Model Five.
  • By building it piece by piece, and getting each piece working properly before going onto the next one. Since there are four or five main socket flows, that meant quite a lot of debugging and testing. I debug just by printing stuff to the console (e.g. dumping messages). There's no sense in actually opening a debugger for this kind of work.
  • By always testing under Valgrind, so that I'm sure there are no memory leaks. In C this is a major concern, you can't delegate to some garbage collector. Using proper and consistent abstractions like kvmsg and CZMQ helps enormously.

I'm sure the code still has flaws which kind readers will spend weekends debugging and fixing for me. I'm happy enough with this model to use it as the basis for real applications.

To test the sixth model, start the primary server and backup server, and a set of clients, in any order. Then kill and restart one of the servers, randomly, and keep doing this. If the design and code is accurate, clients will continue to get the same stream of updates from whatever server is currently master.

Clone Protocol Specification

topprevnext

After this much work to build reliable pub-sub, we want some guarantee that we can safely build applications to exploit the work. A good start is to write-up the protocol. This lets us make implementations in other languages and lets us improve the design on paper, rather than hands-deep in code.

Here, then, is the Clustered Hashmap Protocol, which "defines a cluster-wide key-value hashmap, and mechanisms for sharing this across a set of clients. CHP allows clients to work with subtrees of the hashmap, to update values, and to define ephemeral values."

(More coming soon…)