// Suicidal Snail in C

// Suicidal Snail

#include "czmq.h"

// This is our subscriber. It connects to the publisher and subscribes
// to everything. It sleeps for a short time between messages to
// simulate doing too much work. If a message is more than one second
// late, it croaks.

#define MAX_ALLOWED_DELAY 1000 // msecs

//  Subscriber thread body (started via zthread_fork).
//
//  Connects a SUB socket to tcp://localhost:5556 with an empty subscription
//  filter, then loops: receive one message, print it, parse it as a decimal
//  millisecond timestamp, and compare it with the current zclock_time ().
//  When the message is more than MAX_ALLOWED_DELAY msecs old the loop ends.
//  On exit the string "gone and died" is sent on the pipe.
//
//  args  - unused
//  ctx   - context the SUB socket is created in
//  pipe  - socket used to report that this thread has finished

static void
subscriber (void *args, zctx_t *ctx, void *pipe)
{
    (void) args;

    //  Subscribe to everything
    void *sub = zsocket_new (ctx, ZMQ_SUB);
    zsocket_set_subscribe (sub, "");
    zsocket_connect (sub, "tcp://localhost:5556");

    //  Get and process messages
    while (true) {
        char *message = zstr_recv (sub);
        if (!message)
            break;              //  Nothing received (e.g. interrupted); stop

        printf ("%s\n", message);

        //  SCNd64 is the scanf-family conversion for int64_t; the PRId64
        //  macro is only guaranteed to be correct for printf.
        int64_t sent_at = 0;
        int terms = sscanf (message, "%" SCNd64, &sent_at);
        assert (terms == 1);
        (void) terms;           //  Keeps NDEBUG builds warning-free
        free (message);

        //  Suicide snail logic
        if (zclock_time () - sent_at > MAX_ALLOWED_DELAY) {
            fprintf (stderr, "E: subscriber cannot keep up, aborting\n");
            break;
        }
        //  Work for 1 msec plus some random additional time
        zclock_sleep (1 + randof (2));
    }
    zstr_send (pipe, "gone and died");
}

// This is our publisher task. It publishes a time-stamped message to its
// PUB socket every millisecond:

//  Publisher thread body (started via zthread_fork).
//
//  Binds a PUB socket to tcp://*:5556 and loops: format the current
//  zclock_time () as a decimal string, publish it, then poll the pipe
//  without blocking. Any message arriving on the pipe ends the loop;
//  otherwise the thread sleeps 1 msec and repeats.
//
//  args  - unused
//  ctx   - context the PUB socket is created in
//  pipe  - socket polled for a stop message

static void
publisher (void *args, zctx_t *ctx, void *pipe)
{
    (void) args;

    //  Prepare publisher
    void *pub = zsocket_new (ctx, ZMQ_PUB);
    zsocket_bind (pub, "tcp://*:5556");

    while (true) {
        //  Send current clock (msecs) to subscribers.
        //  24 bytes holds any int64_t (sign + 19 digits + NUL); snprintf
        //  keeps the write bounded regardless.
        char stamp [24];
        snprintf (stamp, sizeof stamp, "%" PRId64, zclock_time ());
        zstr_send (pub, stamp);

        //  The content of the pipe message is ignored; its arrival alone
        //  is the stop request.
        char *command = zstr_recv_nowait (pipe);
        if (command) {
            free (command);
            break;
        }
        zclock_sleep (1);       //  1msec wait
    }
}

// The main task simply starts a client and a server, and then
// waits for the client to signal that it has died:

//  Program entry point.
//
//  Creates a context, forks the publisher and subscriber threads, blocks
//  until the subscriber sends a message on its pipe, then sends "break" on
//  the publisher's pipe, sleeps 100 msecs and destroys the context.
//  Always returns 0.

int main (void)
{
    zctx_t *ctx = zctx_new ();
    void *pubpipe = zthread_fork (ctx, publisher, NULL);
    void *subpipe = zthread_fork (ctx, subscriber, NULL);

    //  Block until the subscriber reports in; the message text is not
    //  inspected (free (NULL) is harmless if nothing was received).
    free (zstr_recv (subpipe));

    //  Tell the publisher to stop.
    zstr_send (pubpipe, "break");

    //  NOTE(review): presumably this pause lets the publisher thread read
    //  the stop message and leave its loop before the context goes away.
    zclock_sleep (100);
    zctx_destroy (&ctx);
    return 0;
}