// Last Value Caching Proxy in C

// Last value cache
// Uses XPUB subscription messages to re-send data

#include "czmq.h"

//  Last value caching proxy.
//  Subscribes to every topic on the frontend SUB socket, remembers the most
//  recent value seen for each topic, forwards each update to the backend
//  XPUB socket, and re-sends the cached value whenever a subscription
//  message arrives on the backend. Runs until a poll or receive call is
//  interrupted, then releases the context and the cache. Returns 0.
int main (void)
{
    zctx_t *context = zctx_new ();
    void *frontend = zsocket_new (context, ZMQ_SUB);
    //  A wildcard address is only meaningful for bind; connect needs a
    //  concrete host. NOTE(review): assumes the publisher runs locally.
    zsocket_connect (frontend, "tcp://localhost:5557");
    void *backend = zsocket_new (context, ZMQ_XPUB);
    zsocket_bind (backend, "tcp://*:5558");

    //  Subscribe to every single topic from publisher
    zsocket_set_subscribe (frontend, "");

    //  Store last instance of each topic in a cache. Each value is
    //  registered with a free function, so the cache owns its values and
    //  releases them on delete and on destroy.
    zhash_t *cache = zhash_new ();

    //  We route topic updates from frontend to backend, and
    //  we handle subscriptions by sending whatever we cached,
    //  if anything:
    while (true) {
        zmq_pollitem_t items [] = {
            { frontend, 0, ZMQ_POLLIN, 0 },
            { backend, 0, ZMQ_POLLIN, 0 }
        };
        if (zmq_poll (items, 2, 1000 * ZMQ_POLL_MSEC) == -1)
            break;              //  Interrupted

        //  Any new topic data we cache and then forward
        if (items [0].revents & ZMQ_POLLIN) {
            char *topic = zstr_recv (frontend);
            if (!topic)
                break;          //  Interrupted
            char *current = zstr_recv (frontend);
            if (!current) {
                //  Interrupted between frames; do not cache or send NULL
                free (topic);
                break;
            }
            //  Drop any older value (released through its free function;
            //  a no-op when the topic is not cached yet), then hand
            //  ownership of the new value to the cache.
            zhash_delete (cache, topic);
            if (zhash_insert (cache, topic, current) == 0) {
                zhash_freefn (cache, topic, free);
                zstr_sendm (backend, topic);
                zstr_send (backend, current);
            }
            else {
                //  Insert failed, so we still own the value: forward it
                //  and release it ourselves.
                zstr_sendm (backend, topic);
                zstr_send (backend, current);
                free (current);
            }
            free (topic);
        }
        //  When we get a new subscription, we pull data from the cache:
        if (items [1].revents & ZMQ_POLLIN) {
            zframe_t *frame = zframe_recv (backend);
            if (!frame)
                break;
            //  Event is one byte 0=unsub or 1=sub, followed by topic
            byte *event = zframe_data (frame);
            size_t frame_size = zframe_size (frame);
            //  Guard against an empty frame before reading the event byte
            if (frame_size > 0 && event [0] == 1) {
                //  Relies on zmalloc returning zero-filled memory: the
                //  buffer holds the (frame_size - 1)-byte topic plus a
                //  terminating NUL.
                char *topic = zmalloc (frame_size);
                memcpy (topic, event + 1, frame_size - 1);
                printf ("Sending cached topic %s\n", topic);
                //  Value stays owned by the cache; do not free it here
                char *previous = zhash_lookup (cache, topic);
                if (previous) {
                    zstr_sendm (backend, topic);
                    zstr_send (backend, previous);
                }
                free (topic);
            }
            zframe_destroy (&frame);
        }
    }
    zctx_destroy (&context);
    zhash_destroy (&cache);
    return 0;
}