Espresso Pattern in C

// Espresso Pattern
// This shows how to capture data using a pub-sub proxy

#include "czmq.h"

// The subscriber thread requests messages starting with
// A and B, then reads and counts incoming messages.

static void
subscriber_thread (void *args, zctx_t *ctx, void *pipe)
{
// Subscribe to "A" and "B"
void *subscriber = zsocket_new (ctx, ZMQ_SUB);
zsocket_connect (subscriber, "tcp://localhost:6001");
zsocket_set_subscribe (subscriber, "A");
zsocket_set_subscribe (subscriber, "B");

int count = 0;
while (count < 5) {
char *string = zstr_recv (subscriber);
if (!string)
break; // Interrupted
free (string);
count++;
}
zsocket_destroy (ctx, subscriber);
}

// The publisher sends random messages starting with A-J:

static void
publisher_thread (void *args, zctx_t *ctx, void *pipe)
{
void *publisher = zsocket_new (ctx, ZMQ_PUB);
zsocket_bind (publisher, "tcp://*:6000");

while (!zctx_interrupted) {
char string [10];
sprintf (string, "%c-%05d", randof (10) + 'A', randof (100000));
if (zstr_send (publisher, string) == -1)
break; // Interrupted
zclock_sleep (100); // Wait for 1/10th second
}
}

// The listener receives all messages flowing through the proxy, on its
// pipe. In CZMQ, the pipe is a pair of ZMQ_PAIR sockets that connect
// attached child threads. In other languages your mileage may vary:

static void
listener_thread (void *args, zctx_t *ctx, void *pipe)
{
// Print everything that arrives on pipe
while (true) {
zframe_t *frame = zframe_recv (pipe);
if (!frame)
break; // Interrupted
zframe_print (frame, NULL);
zframe_destroy (&frame);
}
}

// The main task starts the subscriber and publisher, and then sets
// itself up as a listening proxy. The listener runs as a child thread:

int main (void)
{
// Start child threads
zctx_t *ctx = zctx_new ();
zthread_fork (ctx, publisher_thread, NULL);
zthread_fork (ctx, subscriber_thread, NULL);

void *subscriber = zsocket_new (ctx, ZMQ_XSUB);
zsocket_connect (subscriber, "tcp://localhost:6000");
void *publisher = zsocket_new (ctx, ZMQ_XPUB);
zsocket_bind (publisher, "tcp://*:6001");
void *listener = zthread_fork (ctx, listener_thread, NULL);
zmq_proxy (subscriber, publisher, listener);

puts (" interrupted");
// Tell attached threads to exit
zctx_destroy (&ctx);
return 0;
}