Prototype local and cloud flow in C

// Broker peering simulation (part 2)
// Prototypes the request-reply flow

#include "czmq.h"
#define NBR_CLIENTS 10
#define NBR_WORKERS 3
#define WORKER_READY "\001" // Signals worker is ready

// Our own name; in practice this would be configured per node
static char *self;

// The client task does a request-reply dialog using a standard
// synchronous REQ socket:

static void *
client_task (void *args)
{
zctx_t *ctx = zctx_new ();
void *client = zsocket_new (ctx, ZMQ_REQ);
zsocket_connect (client, "ipc://%s-localfe.ipc", self);

while (true) {
// Send request, get reply
zstr_send (client, "HELLO");
char *reply = zstr_recv (client);
if (!reply)
break; // Interrupted
printf ("Client: %s\n", reply);
free (reply);
sleep (1);
}
zctx_destroy (&ctx);
return NULL;
}

// The worker task plugs into the load-balancer using a REQ
// socket:

static void *
worker_task (void *args)
{
zctx_t *ctx = zctx_new ();
void *worker = zsocket_new (ctx, ZMQ_REQ);
zsocket_connect (worker, "ipc://%s-localbe.ipc", self);

// Tell broker we're ready for work
zframe_t *frame = zframe_new (WORKER_READY, 1);
zframe_send (&frame, worker, 0);

// Process messages as they arrive
while (true) {
zmsg_t *msg = zmsg_recv (worker);
if (!msg)
break; // Interrupted

zframe_print (zmsg_last (msg), "Worker: ");
zframe_reset (zmsg_last (msg), "OK", 2);
zmsg_send (&msg, worker);
}
zctx_destroy (&ctx);
return NULL;
}

// The main task begins by setting-up its frontend and backend sockets
// and then starting its client and worker tasks:

int main (int argc, char *argv [])
{
// First argument is this broker's name
// Other arguments are our peers' names
//
if (argc < 2) {
printf ("syntax: peering2 me {you}…\n");
return 0;
}
self = argv [1];
printf ("I: preparing broker at %s…\n", self);
srandom ((unsigned) time (NULL));

zctx_t *ctx = zctx_new ();

// Bind cloud frontend to endpoint
void *cloudfe = zsocket_new (ctx, ZMQ_ROUTER);
zsocket_set_identity (cloudfe, self);
zsocket_bind (cloudfe, "ipc://%s-cloud.ipc", self);

// Connect cloud backend to all peers
void *cloudbe = zsocket_new (ctx, ZMQ_ROUTER);
zsocket_set_identity (cloudbe, self);
int argn;
for (argn = 2; argn < argc; argn++) {
char *peer = argv [argn];
printf ("I: connecting to cloud frontend at '%s'\n", peer);
zsocket_connect (cloudbe, "ipc://%s-cloud.ipc", peer);
}
// Prepare local frontend and backend
void *localfe = zsocket_new (ctx, ZMQ_ROUTER);
zsocket_bind (localfe, "ipc://%s-localfe.ipc", self);
void *localbe = zsocket_new (ctx, ZMQ_ROUTER);
zsocket_bind (localbe, "ipc://%s-localbe.ipc", self);

// Get user to tell us when we can start…
printf ("Press Enter when all brokers are started: ");
getchar ();

// Start local workers
int worker_nbr;
for (worker_nbr = 0; worker_nbr < NBR_WORKERS; worker_nbr++)
zthread_new (worker_task, NULL);

// Start local clients
int client_nbr;
for (client_nbr = 0; client_nbr < NBR_CLIENTS; client_nbr++)
zthread_new (client_task, NULL);

// Interesting part
// Here, we handle the request-reply flow. We're using load-balancing
// to poll workers at all times, and clients only when there are one //
// or more workers available.//

// Least recently used queue of available workers
int capacity = 0;
zlist_t *workers = zlist_new ();

while (true) {
// First, route any waiting replies from workers
zmq_pollitem_t backends [] = {
{ localbe, 0, ZMQ_POLLIN, 0 },
{ cloudbe, 0, ZMQ_POLLIN, 0 }
};
// If we have no workers, wait indefinitely
int rc = zmq_poll (backends, 2,
capacity? 1000 * ZMQ_POLL_MSEC: -1);
if (rc == -1)
break; // Interrupted

// Handle reply from local worker
zmsg_t *msg = NULL;
if (backends [0].revents & ZMQ_POLLIN) {
msg = zmsg_recv (localbe);
if (!msg)
break; // Interrupted
zframe_t *identity = zmsg_unwrap (msg);
zlist_append (workers, identity);
capacity++;

// If it's READY, don't route the message any further
zframe_t *frame = zmsg_first (msg);
if (memcmp (zframe_data (frame), WORKER_READY, 1) == 0)
zmsg_destroy (&msg);
}
// Or handle reply from peer broker
else
if (backends [1].revents & ZMQ_POLLIN) {
msg = zmsg_recv (cloudbe);
if (!msg)
break; // Interrupted
// We don't use peer broker identity for anything
zframe_t *identity = zmsg_unwrap (msg);
zframe_destroy (&identity);
}
// Route reply to cloud if it's addressed to a broker
for (argn = 2; msg && argn < argc; argn++) {
char *data = (char *) zframe_data (zmsg_first (msg));
size_t size = zframe_size (zmsg_first (msg));
if (size == strlen (argv [argn])
&& memcmp (data, argv [argn], size) == 0)
zmsg_send (&msg, cloudfe);
}
// Route reply to client if we still need to
if (msg)
zmsg_send (&msg, localfe);

// Now we route as many client requests as we have worker capacity
// for. We may reroute requests from our local frontend, but not from //
// the cloud frontend. We reroute randomly now, just to test things
// out. In the next version, we'll do this properly by calculating
// cloud capacity://

while (capacity) {
zmq_pollitem_t frontends [] = {
{ localfe, 0, ZMQ_POLLIN, 0 },
{ cloudfe, 0, ZMQ_POLLIN, 0 }
};
rc = zmq_poll (frontends, 2, 0);
assert (rc >= 0);
int reroutable = 0;
// We'll do peer brokers first, to prevent starvation
if (frontends [1].revents & ZMQ_POLLIN) {
msg = zmsg_recv (cloudfe);
reroutable = 0;
}
else
if (frontends [0].revents & ZMQ_POLLIN) {
msg = zmsg_recv (localfe);
reroutable = 1;
}
else
break; // No work, go back to backends

// If reroutable, send to cloud 20% of the time
// Here we'd normally use cloud status information
//
if (reroutable && argc > 2 && randof (5) == 0) {
// Route to random broker peer
int peer = randof (argc - 2) + 2;
zmsg_pushmem (msg, argv [peer], strlen (argv [peer]));
zmsg_send (&msg, cloudbe);
}
else {
zframe_t *frame = (zframe_t *) zlist_pop (workers);
zmsg_wrap (msg, frame);
zmsg_send (&msg, localbe);
capacity--;
}
}
}
// When we're done, clean up properly
while (zlist_size (workers)) {
zframe_t *frame = (zframe_t *) zlist_pop (workers);
zframe_destroy (&frame);
}
zlist_destroy (&workers);
zctx_destroy (&ctx);
return EXIT_SUCCESS;
}