Chapter 4 - Reliable Request-Reply Patterns #
Chapter 3 - Advanced Request-Reply Patterns covered advanced uses of ZeroMQ’s request-reply pattern with working examples. This chapter looks at the general question of reliability and builds a set of reliable messaging patterns on top of ZeroMQ’s core request-reply pattern.
In this chapter, we focus heavily on user-space request-reply patterns, reusable models that help you design your own ZeroMQ architectures:
- The Lazy Pirate pattern: reliable request-reply from the client side
- The Simple Pirate pattern: reliable request-reply using load balancing
- The Paranoid Pirate pattern: reliable request-reply with heartbeating
- The Majordomo pattern: service-oriented reliable queuing
- The Titanic pattern: disk-based/disconnected reliable queuing
- The Binary Star pattern: primary-backup server failover
- The Freelance pattern: brokerless reliable request-reply
What is “Reliability”? #
Most people who speak of “reliability” don’t really know what they mean. We can only define reliability in terms of failure. That is, if we can handle a certain set of well-defined and understood failures, then we are reliable with respect to those failures. No more, no less. So let’s look at the possible causes of failure in a distributed ZeroMQ application, in roughly descending order of probability:
- Application code is the worst offender. It can crash and exit, freeze and stop responding to input, run too slowly for its input, exhaust all memory, and so on.
- System code, such as brokers we write using ZeroMQ, can die for the same reasons as application code. System code should be more reliable than application code, but it can still crash and burn, and especially run out of memory if it tries to queue messages for slow clients.
- Message queues can overflow, typically in system code that has learned to deal brutally with slow clients. When a queue overflows, it starts to discard messages. So we get "lost" messages.
- Networks can fail (e.g., WiFi gets switched off or goes out of range). ZeroMQ will automatically reconnect in such cases, but in the meantime, messages may get lost.
- Hardware can fail and take with it all the processes running on that box.
- Networks can fail in exotic ways, e.g., some ports on a switch may die and those parts of the network become inaccessible.
- Entire data centers can be struck by lightning, earthquakes, fire, or more mundane power or cooling failures.
To make a software system fully reliable against all of these possible failures is an enormously difficult and expensive job and goes beyond the scope of this book.
Because the first five cases in the above list cover 99.9% of real world requirements outside large companies (according to a highly scientific study I just ran, which also told me that 78% of statistics are made up on the spot, and moreover never to trust a statistic that we didn’t falsify ourselves), that’s what we’ll examine. If you’re a large company with money to spend on the last two cases, contact my company immediately! There’s a large hole behind my beach house waiting to be converted into an executive swimming pool.
Designing Reliability #
So to make things brutally simple, reliability is “keeping things working properly when code freezes or crashes”, a situation we’ll shorten to “dies”. However, the things we want to keep working properly are more complex than just messages. We need to take each core ZeroMQ messaging pattern and see how to make it work (if we can) even when code dies.
Let’s take them one-by-one:
- Request-reply: if the server dies (while processing a request), the client can figure that out because it won't get an answer back. Then it can give up in a huff, wait and try again later, find another server, and so on. As for the client dying, we can brush that off as "someone else's problem" for now.
- Pub-sub: if the client dies (having gotten some data), the server doesn't know about it. Pub-sub doesn't send any information back from client to server. But the client can contact the server out-of-band, e.g., via request-reply, and ask, "please resend everything I missed". As for the server dying, that's out of scope for here. Subscribers can also self-verify that they're not running too slowly, and take action (e.g., warn the operator and die) if they are.
- Pipeline: if a worker dies (while working), the ventilator doesn't know about it. Pipelines, like the grinding gears of time, only work in one direction. But the downstream collector can detect that one task didn't get done, and send a message back to the ventilator saying, "hey, resend task 324!" If the ventilator or collector dies, whatever upstream client originally sent the work batch can get tired of waiting and resend the whole lot. It's not elegant, but system code should really not die often enough to matter.
In this chapter we’ll focus just on request-reply, which is the low-hanging fruit of reliable messaging.
The basic request-reply pattern (a REQ client socket doing a blocking send/receive to a REP server socket) scores low on handling the most common types of failure. If the server crashes while processing the request, the client just hangs forever. If the network loses the request or the reply, the client hangs forever.
Request-reply is still much better than TCP, thanks to ZeroMQ’s ability to reconnect peers silently, to load balance messages, and so on. But it’s still not good enough for real work. The only case where you can really trust the basic request-reply pattern is between two threads in the same process where there’s no network or separate server process to die.
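To see what that means in practice, here is the naive client reduced to its essentials. This is just a sketch in Python with pyzmq (the endpoint is a placeholder, not one of this chapter's examples): if the server dies after we send, the receive simply never returns.

import zmq

context = zmq.Context()
client = context.socket(zmq.REQ)
client.connect("tcp://localhost:5555")   # placeholder endpoint

client.send(b"Hello")
# If the server crashed while processing the request, or the network
# dropped the request or the reply, this call blocks forever:
reply = client.recv()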
However, with a little extra work, this humble pattern becomes a good basis for real work across a distributed network, and we get a set of reliable request-reply (RRR) patterns that I like to call the Pirate patterns (you’ll eventually get the joke, I hope).
There are, in my experience, roughly three ways to connect clients to servers. Each needs a specific approach to reliability:
- Multiple clients talking directly to a single server. Use case: a single well-known server to which clients need to talk. Types of failure we aim to handle: server crashes and restarts, and network disconnects.
- Multiple clients talking to a broker proxy that distributes work to multiple workers. Use case: service-oriented transaction processing. Types of failure we aim to handle: worker crashes and restarts, worker busy looping, worker overload, queue crashes and restarts, and network disconnects.
- Multiple clients talking to multiple servers with no intermediary proxies. Use case: distributed services such as name resolution. Types of failure we aim to handle: service crashes and restarts, service busy looping, service overload, and network disconnects.
Each of these approaches has its trade-offs and often you’ll mix them. We’ll look at all three in detail.
Client-Side Reliability (Lazy Pirate Pattern) #
We can get very simple reliable request-reply with some changes to the client. We call this the Lazy Pirate pattern. Rather than doing a blocking receive, we:
- Poll the REQ socket and receive from it only when we're sure a reply has arrived.
- Resend a request, if no reply has arrived within a timeout period.
- Abandon the transaction if there is still no reply after several requests.
If you try to use a REQ socket in anything other than a strict send/receive fashion, you’ll get an error (technically, the REQ socket implements a small finite-state machine to enforce the send/receive ping-pong, and so the error code is called “EFSM”). This is slightly annoying when we want to use REQ in a pirate pattern, because we may send several requests before getting a reply.
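You can see the EFSM behavior with a few lines of pyzmq (a minimal sketch; the endpoint is a placeholder). The second send, issued before any receive, is rejected by the REQ socket's state machine:

import zmq

context = zmq.Context()
client = context.socket(zmq.REQ)
client.connect("tcp://localhost:5555")   # placeholder endpoint

client.send(b"request 1")
try:
    client.send(b"request 2")            # second send with no receive in between
except zmq.ZMQError as e:
    print(e.errno == zmq.EFSM)           # True: REQ enforces send/receive alternation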
The pretty good brute force solution is to close and reopen the REQ socket after an error:
lpclient: Lazy Pirate client in Ada
lpclient: Lazy Pirate client in Basic
lpclient: Lazy Pirate client in C
//  Lazy Pirate client
//  Use zmq_poll to do a safe request-reply
//  To run, start lpserver and then randomly kill/restart it

#include <czmq.h>

#define REQUEST_TIMEOUT     2500    //  msecs, (> 1000!)
#define REQUEST_RETRIES     3       //  Before we abandon
#define SERVER_ENDPOINT     "tcp://localhost:5555"

int main (void)
{
    printf ("I: connecting to server...\n");
    zsock_t *client = zsock_new_req (SERVER_ENDPOINT);
    assert (client);

    int sequence = 0;
    int retries_left = REQUEST_RETRIES;
    while (retries_left && !zsys_interrupted) {
        //  We send a request, then we work to get a reply
        char request [10];
        sprintf (request, "%d", ++sequence);
        zstr_send (client, request);

        int expect_reply = 1;
        while (expect_reply) {
            //  Poll socket for a reply, with timeout
            zmq_pollitem_t items [] = {
                { zsock_resolve (client), 0, ZMQ_POLLIN, 0 } };
            int rc = zmq_poll (items, 1, REQUEST_TIMEOUT * ZMQ_POLL_MSEC);
            if (rc == -1)
                break;          //  Interrupted

            //  Here we process a server reply and exit our loop if the
            //  reply is valid. If we didn't get a reply we close the
            //  client socket, open it again and resend the request. We
            //  try a number of times before finally abandoning:
            if (items [0].revents & ZMQ_POLLIN) {
                //  We got a reply from the server, must match sequence
                char *reply = zstr_recv (client);
                if (!reply)
                    break;      //  Interrupted
                if (atoi (reply) == sequence) {
                    printf ("I: server replied OK (%s)\n", reply);
                    retries_left = REQUEST_RETRIES;
                    expect_reply = 0;
                }
                else
                    printf ("E: malformed reply from server: %s\n", reply);
                free (reply);
            }
            else
            if (--retries_left == 0) {
                printf ("E: server seems to be offline, abandoning\n");
                break;
            }
            else {
                printf ("W: no response from server, retrying...\n");
                //  Old socket is confused; close it and open a new one
                zsock_destroy (&client);
                printf ("I: reconnecting to server...\n");
                client = zsock_new_req (SERVER_ENDPOINT);
                //  Send request again, on new socket
                zstr_send (client, request);
            }
        }
    }
    zsock_destroy (&client);
    return 0;
}
lpclient: Lazy Pirate client in C++
//
// Lazy Pirate client
// Use zmq_poll to do a safe request-reply
// To run, start piserver and then randomly kill/restart it
//
#include "zhelpers.hpp"
#include <sstream>
#define REQUEST_TIMEOUT 2500 // msecs, (> 1000!)
#define REQUEST_RETRIES 3 // Before we abandon
// Helper function that returns a new configured socket
// connected to the Hello World server
//
static zmq::socket_t * s_client_socket (zmq::context_t & context) {
std::cout << "I: connecting to server..." << std::endl;
zmq::socket_t * client = new zmq::socket_t (context, ZMQ_REQ);
client->connect ("tcp://localhost:5555");
// Configure socket to not wait at close time
int linger = 0;
client->setsockopt (ZMQ_LINGER, &linger, sizeof (linger));
return client;
}
int main () {
zmq::context_t context (1);
zmq::socket_t * client = s_client_socket (context);
int sequence = 0;
int retries_left = REQUEST_RETRIES;
while (retries_left) {
std::stringstream request;
request << ++sequence;
s_send (*client, request.str());
sleep (1);
bool expect_reply = true;
while (expect_reply) {
// Poll socket for a reply, with timeout
zmq::pollitem_t items[] = {
{ *client, 0, ZMQ_POLLIN, 0 } };
zmq::poll (&items[0], 1, REQUEST_TIMEOUT);
// If we got a reply, process it
if (items[0].revents & ZMQ_POLLIN) {
// We got a reply from the server, must match sequence
std::string reply = s_recv (*client);
if (atoi (reply.c_str ()) == sequence) {
std::cout << "I: server replied OK (" << reply << ")" << std::endl;
retries_left = REQUEST_RETRIES;
expect_reply = false;
}
else {
std::cout << "E: malformed reply from server: " << reply << std::endl;
}
}
else
if (--retries_left == 0) {
std::cout << "E: server seems to be offline, abandoning" << std::endl;
expect_reply = false;
break;
}
else {
std::cout << "W: no response from server, retrying..." << std::endl;
// Old socket will be confused; close it and open a new one
delete client;
client = s_client_socket (context);
// Send request again, on new socket
s_send (*client, request.str());
}
}
}
delete client;
return 0;
}
lpclient: Lazy Pirate client in C#
lpclient: Lazy Pirate client in CL
lpclient: Lazy Pirate client in Delphi
program lpclient;
//
// Lazy Pirate client
// Use zmq_poll to do a safe request-reply
// To run, start lpserver and then randomly kill/restart it
// @author Varga Balazs <bb.varga@gmail.com>
//
{$APPTYPE CONSOLE}
uses
SysUtils
, zmqapi
;
const
REQUEST_TIMEOUT = 2500; // msecs, (> 1000!)
REQUEST_RETRIES = 3; // Before we abandon
SERVER_ENDPOINT = 'tcp://localhost:5555';
var
ctx: TZMQContext;
client: TZMQSocket;
sequence,
retries_left,
expect_reply: Integer;
request,
reply: Utf8String;
poller: TZMQPoller;
begin
ctx := TZMQContext.create;
Writeln( 'I: connecting to server...' );
client := ctx.Socket( stReq );
client.Linger := 0;
client.connect( SERVER_ENDPOINT );
poller := TZMQPoller.Create( true );
poller.Register( client, [pePollIn] );
sequence := 0;
retries_left := REQUEST_RETRIES;
while ( retries_left > 0 ) and not ctx.Terminated do
try
// We send a request, then we work to get a reply
inc( sequence );
request := Format( '%d', [sequence] );
client.send( request );
expect_reply := 1;
while ( expect_reply > 0 ) do
begin
// Poll socket for a reply, with timeout
poller.poll( REQUEST_TIMEOUT );
// Here we process a server reply and exit our loop if the
// reply is valid. If we didn't get a reply we close the client
// socket and resend the request. We try a number of times
// before finally abandoning:
if pePollIn in poller.PollItem[0].revents then
begin
// We got a reply from the server, must match sequence
client.recv( reply );
if StrToInt( reply ) = sequence then
begin
Writeln( Format( 'I: server replied OK (%s)', [reply] ) );
retries_left := REQUEST_RETRIES;
expect_reply := 0;
end else
Writeln( Format( 'E: malformed reply from server: %s', [ reply ] ) );
end else
begin
dec( retries_left );
if retries_left = 0 then
begin
Writeln( 'E: server seems to be offline, abandoning' );
break;
end else
begin
Writeln( 'W: no response from server, retrying...' );
// Old socket is confused; close it and open a new one
poller.Deregister( client, [pePollIn] );
client.Free;
Writeln( 'I: reconnecting to server...' );
client := ctx.Socket( stReq );
client.Linger := 0;
client.connect( SERVER_ENDPOINT );
poller.Register( client, [pePollIn] );
// Send request again, on new socket
client.send( request );
end;
end;
end;
except
end;
poller.Free;
ctx.Free;
end.
lpclient: Lazy Pirate client in Erlang
lpclient: Lazy Pirate client in Elixir
lpclient: Lazy Pirate client in F#
lpclient: Lazy Pirate client in Felix
lpclient: Lazy Pirate client in Go
// Lazy Pirate client
// Use zmq_poll to do a safe request-reply
// To run, start lpserver and then randomly kill/restart it
//
// Author: iano <scaly.iano@gmail.com>
// Based on C example
package main
import (
"fmt"
zmq "github.com/alecthomas/gozmq"
"strconv"
"time"
)
const (
REQUEST_TIMEOUT = time.Duration(2500) * time.Millisecond
REQUEST_RETRIES = 3
SERVER_ENDPOINT = "tcp://localhost:5555"
)
func main() {
context, _ := zmq.NewContext()
defer context.Close()
fmt.Println("I: Connecting to server...")
client, _ := context.NewSocket(zmq.REQ)
client.Connect(SERVER_ENDPOINT)
for sequence, retriesLeft := 1, REQUEST_RETRIES; retriesLeft > 0; sequence++ {
fmt.Printf("I: Sending (%d)\n", sequence)
client.Send([]byte(strconv.Itoa(sequence)), 0)
for expectReply := true; expectReply; {
// Poll socket for a reply, with timeout
items := zmq.PollItems{
zmq.PollItem{Socket: client, Events: zmq.POLLIN},
}
if _, err := zmq.Poll(items, REQUEST_TIMEOUT); err != nil {
panic(err) // Interrupted
}
// .split process server reply
// Here we process a server reply and exit our loop if the
// reply is valid. If we didn't get a reply we close the client
// socket and resend the request. We try a number of times
// before finally abandoning:
if item := items[0]; item.REvents&zmq.POLLIN != 0 {
// We got a reply from the server, must match sequence
reply, err := item.Socket.Recv(0)
if err != nil {
panic(err) // Interrupted
}
if replyInt, err := strconv.Atoi(string(reply)); replyInt == sequence && err == nil {
fmt.Printf("I: Server replied OK (%s)\n", reply)
retriesLeft = REQUEST_RETRIES
expectReply = false
} else {
fmt.Printf("E: Malformed reply from server: %s", reply)
}
} else if retriesLeft--; retriesLeft == 0 {
fmt.Println("E: Server seems to be offline, abandoning")
client.SetLinger(0)
client.Close()
break
} else {
fmt.Println("W: No response from server, retrying...")
// Old socket is confused; close it and open a new one
client.SetLinger(0)
client.Close()
client, _ = context.NewSocket(zmq.REQ)
client.Connect(SERVER_ENDPOINT)
fmt.Printf("I: Resending (%d)\n", sequence)
// Send request again, on new socket
client.Send([]byte(strconv.Itoa(sequence)), 0)
}
}
}
}
lpclient: Lazy Pirate client in Haskell
{--
Lazy Pirate client in Haskell
--}
module Main where
import System.ZMQ4.Monadic
import System.Random (randomRIO)
import System.Exit (exitSuccess)
import Control.Monad (forever, when)
import Control.Concurrent (threadDelay)
import Data.ByteString.Char8 (pack, unpack)
requestRetries = 3
requestTimeout_ms = 2500
serverEndpoint = "tcp://localhost:5555"
main :: IO ()
main =
runZMQ $ do
liftIO $ putStrLn "I: Connecting to server"
client <- socket Req
connect client serverEndpoint
sendServer 1 requestRetries client
sendServer :: Int -> Int -> Socket z Req -> ZMQ z ()
sendServer _ 0 _ = return ()
sendServer seq retries client = do
send client [] (pack $ show seq)
pollServer seq retries client
pollServer :: Int -> Int -> Socket z Req -> ZMQ z ()
pollServer seq retries client = do
[evts] <- poll requestTimeout_ms [Sock client [In] Nothing]
if In `elem` evts
then do
reply <- receive client
if (read . unpack $ reply) == seq
then do
liftIO $ putStrLn $ "I: Server replied OK " ++ (unpack reply)
sendServer (seq+1) requestRetries client
else do
liftIO $ putStrLn $ "E: malformed reply from server: " ++ (unpack reply)
pollServer seq retries client
else
if retries == 0
then liftIO $ putStrLn "E: Server seems to be offline, abandoning" >> exitSuccess
else do
liftIO $ putStrLn $ "W: No response from server, retrying..."
client' <- socket Req
connect client' serverEndpoint
send client' [] (pack $ show seq)
pollServer seq (retries-1) client'
lpclient: Lazy Pirate client in Haxe
package ;
import haxe.Stack;
import neko.Lib;
import org.zeromq.ZContext;
import org.zeromq.ZFrame;
import org.zeromq.ZMQ;
import org.zeromq.ZMQException;
import org.zeromq.ZMQPoller;
import org.zeromq.ZSocket;
/**
* Lazy Pirate client
* Use zmq_poll to do a safe request-reply
* To run, start lpserver and then randomly kill / restart it.
*
* @see http://zguide.zeromq.org/page:all#Client-side-Reliability-Lazy-Pirate-Pattern
*/
class LPClient
{
private static inline var REQUEST_TIMEOUT = 2500; // msecs, (> 1000!)
private static inline var REQUEST_RETRIES = 3; // Before we abandon
private static inline var SERVER_ENDPOINT = "tcp://localhost:5555";
public static function main() {
Lib.println("** LPClient (see: http://zguide.zeromq.org/page:all#Client-side-Reliability-Lazy-Pirate-Pattern)");
var ctx:ZContext = new ZContext();
Lib.println("I: connecting to server ...");
var client = ctx.createSocket(ZMQ_REQ);
if (client == null)
return;
client.connect(SERVER_ENDPOINT);
var sequence = 0;
var retries_left = REQUEST_RETRIES;
var poller = new ZMQPoller();
while (retries_left > 0 && !ZMQ.isInterrupted()) {
// We send a request, then we work to get a reply
var request = Std.string(++sequence);
ZFrame.newStringFrame(request).send(client);
var expect_reply = true;
while (expect_reply) {
poller.registerSocket(client, ZMQ.ZMQ_POLLIN());
// Poll socket for a reply, with timeout
try {
var res = poller.poll(REQUEST_TIMEOUT * 1000);
} catch (e:ZMQException) {
trace("ZMQException #:" + e.errNo + ", str:" + e.str());
trace (Stack.toString(Stack.exceptionStack()));
ctx.destroy();
return;
}
// If we got a reply, process it
if (poller.pollin(1)) {
// We got a reply from the server, must match sequence
var replyFrame = ZFrame.recvFrame(client);
if (replyFrame == null)
break; // Interrupted
if (Std.parseInt(replyFrame.toString()) == sequence) {
Lib.println("I: server replied OK (" + sequence + ")");
retries_left = REQUEST_RETRIES;
expect_reply = false;
} else
Lib.println("E: malformed reply from server: " + replyFrame.toString());
replyFrame.destroy();
} else if (--retries_left == 0) {
Lib.println("E: server seems to be offline, abandoning");
break;
} else {
Lib.println("W: no response from server, retrying...");
// Old socket is confused, close it and open a new one
ctx.destroySocket(client);
Lib.println("I: reconnecting to server...");
client = ctx.createSocket(ZMQ_REQ);
client.connect(SERVER_ENDPOINT);
// Send request again, on new socket
ZFrame.newStringFrame(request).send(client);
}
poller.unregisterAllSockets();
}
}
ctx.destroy();
}
}
lpclient: Lazy Pirate client in Java
package guide;
import org.zeromq.SocketType;
import org.zeromq.ZContext;
import org.zeromq.ZMQ;
import org.zeromq.ZMQ.Poller;
import org.zeromq.ZMQ.Socket;
//
// Lazy Pirate client
// Use zmq_poll to do a safe request-reply
// To run, start lpserver and then randomly kill/restart it
//
public class lpclient
{
private final static int REQUEST_TIMEOUT = 2500; // msecs, (> 1000!)
private final static int REQUEST_RETRIES = 3; // Before we abandon
private final static String SERVER_ENDPOINT = "tcp://localhost:5555";
public static void main(String[] argv)
{
try (ZContext ctx = new ZContext()) {
System.out.println("I: connecting to server");
Socket client = ctx.createSocket(SocketType.REQ);
assert (client != null);
client.connect(SERVER_ENDPOINT);
Poller poller = ctx.createPoller(1);
poller.register(client, Poller.POLLIN);
int sequence = 0;
int retriesLeft = REQUEST_RETRIES;
while (retriesLeft > 0 && !Thread.currentThread().isInterrupted()) {
// We send a request, then we work to get a reply
String request = String.format("%d", ++sequence);
client.send(request);
int expect_reply = 1;
while (expect_reply > 0) {
// Poll socket for a reply, with timeout
int rc = poller.poll(REQUEST_TIMEOUT);
if (rc == -1)
break; // Interrupted
// Here we process a server reply and exit our loop if the
// reply is valid. If we didn't get a reply we close the client
// socket and resend the request. We try a number of times
// before finally abandoning:
if (poller.pollin(0)) {
// We got a reply from the server, must match
// getSequence
String reply = client.recvStr();
if (reply == null)
break; // Interrupted
if (Integer.parseInt(reply) == sequence) {
System.out.printf(
"I: server replied OK (%s)\n", reply
);
retriesLeft = REQUEST_RETRIES;
expect_reply = 0;
}
else System.out.printf(
"E: malformed reply from server: %s\n", reply
);
}
else if (--retriesLeft == 0) {
System.out.println(
"E: server seems to be offline, abandoning\n"
);
break;
}
else {
System.out.println(
"W: no response from server, retrying\n"
);
// Old socket is confused; close it and open a new one
poller.unregister(client);
ctx.destroySocket(client);
System.out.println("I: reconnecting to server\n");
client = ctx.createSocket(SocketType.REQ);
client.connect(SERVER_ENDPOINT);
poller.register(client, Poller.POLLIN);
// Send request again, on new socket
client.send(request);
}
}
}
}
}
}
lpclient: Lazy Pirate client in Julia
lpclient: Lazy Pirate client in Lua
--
-- Lazy Pirate client
-- Use zmq_poll to do a safe request-reply
-- To run, start lpserver and then randomly kill/restart it
--
-- Author: Robert G. Jakabosky <bobby@sharedrealm.com>
--
require"zmq"
require"zmq.poller"
require"zhelpers"
local REQUEST_TIMEOUT = 2500 -- msecs, (> 1000!)
local REQUEST_RETRIES = 3 -- Before we abandon
-- Helper function that returns a new configured socket
-- connected to the Hello World server
--
local function s_client_socket(context)
printf ("I: connecting to server...\n")
local client = context:socket(zmq.REQ)
client:connect("tcp://localhost:5555")
-- Configure socket to not wait at close time
client:setopt(zmq.LINGER, 0)
return client
end
s_version_assert (2, 1)
local context = zmq.init(1)
local client = s_client_socket (context)
local sequence = 0
local retries_left = REQUEST_RETRIES
local expect_reply = true
local poller = zmq.poller(1)
local function client_cb()
-- We got a reply from the server, must match sequence
--local reply = assert(client:recv(zmq.NOBLOCK))
local reply = client:recv()
if (tonumber(reply) == sequence) then
printf ("I: server replied OK (%s)\n", reply)
retries_left = REQUEST_RETRIES
expect_reply = false
else
printf ("E: malformed reply from server: %s\n", reply)
end
end
poller:add(client, zmq.POLLIN, client_cb)
while (retries_left > 0) do
sequence = sequence + 1
-- We send a request, then we work to get a reply
local request = string.format("%d", sequence)
client:send(request)
expect_reply = true
while (expect_reply) do
-- Poll socket for a reply, with timeout
local cnt = assert(poller:poll(REQUEST_TIMEOUT * 1000))
-- Check if there was no reply
if (cnt == 0) then
retries_left = retries_left - 1
if (retries_left == 0) then
printf ("E: server seems to be offline, abandoning\n")
break
else
printf ("W: no response from server, retrying...\n")
-- Old socket is confused; close it and open a new one
poller:remove(client)
client:close()
client = s_client_socket (context)
poller:add(client, zmq.POLLIN, client_cb)
-- Send request again, on new socket
client:send(request)
end
end
end
end
client:close()
context:term()
lpclient: Lazy Pirate client in Node.js
lpclient: Lazy Pirate client in Objective-C
lpclient: Lazy Pirate client in ooc
lpclient: Lazy Pirate client in Perl
# Lazy Pirate client in Perl
# Use poll to do a safe request-reply
# To run, start lpserver.pl then randomly kill/restart it
use strict;
use warnings;
use v5.10;
use ZMQ::FFI;
use ZMQ::FFI::Constants qw(ZMQ_REQ);
use EV;
my $REQUEST_TIMEOUT = 2500; # msecs
my $REQUEST_RETRIES = 3; # Before we abandon
my $SERVER_ENDPOINT = 'tcp://localhost:5555';
my $ctx = ZMQ::FFI->new();
say 'I: connecting to server...';
my $client = $ctx->socket(ZMQ_REQ);
$client->connect($SERVER_ENDPOINT);
my $sequence = 0;
my $retries_left = $REQUEST_RETRIES;
REQUEST_LOOP:
while ($retries_left) {
# We send a request, then we work to get a reply
my $request = ++$sequence;
$client->send($request);
my $expect_reply = 1;
RETRY_LOOP:
while ($expect_reply) {
# Poll socket for a reply, with timeout
EV::once $client->get_fd, EV::READ, $REQUEST_TIMEOUT / 1000, sub {
my ($revents) = @_;
# Here we process a server reply and exit our loop if the
# reply is valid. If we didn't get a reply we close the client
# socket and resend the request. We try a number of times
# before finally abandoning:
if ($revents == EV::READ) {
while ($client->has_pollin) {
# We got a reply from the server, must match sequence
my $reply = $client->recv();
if ($reply == $sequence) {
say "I: server replied OK ($reply)";
$retries_left = $REQUEST_RETRIES;
$expect_reply = 0;
}
else {
say "E: malformed reply from server: $reply";
}
}
}
elsif (--$retries_left == 0) {
say 'E: server seems to be offline, abandoning';
}
else {
say "W: no response from server, retrying...";
# Old socket is confused; close it and open a new one
$client->close;
say "reconnecting to server...";
$client = $ctx->socket(ZMQ_REQ);
$client->connect($SERVER_ENDPOINT);
# Send request again, on new socket
$client->send($request);
}
};
last RETRY_LOOP if $retries_left == 0;
EV::run;
}
}
lpclient: Lazy Pirate client in PHP
<?php
/*
* Lazy Pirate client
* Use zmq_poll to do a safe request-reply
* To run, start lpserver and then randomly kill/restart it
*
* @author Ian Barber <ian(dot)barber(at)gmail(dot)com>
*/
define("REQUEST_TIMEOUT", 2500); // msecs, (> 1000!)
define("REQUEST_RETRIES", 3); // Before we abandon
/*
* Helper function that returns a new configured socket
* connected to the Hello World server
*/
function client_socket(ZMQContext $context)
{
echo "I: connecting to server...", PHP_EOL;
$client = new ZMQSocket($context,ZMQ::SOCKET_REQ);
$client->connect("tcp://localhost:5555");
// Configure socket to not wait at close time
$client->setSockOpt(ZMQ::SOCKOPT_LINGER, 0);
return $client;
}
$context = new ZMQContext();
$client = client_socket($context);
$sequence = 0;
$retries_left = REQUEST_RETRIES;
$read = $write = array();
while ($retries_left) {
// We send a request, then we work to get a reply
$client->send(++$sequence);
$expect_reply = true;
while ($expect_reply) {
// Poll socket for a reply, with timeout
$poll = new ZMQPoll();
$poll->add($client, ZMQ::POLL_IN);
$events = $poll->poll($read, $write, REQUEST_TIMEOUT);
// If we got a reply, process it
if ($events > 0) {
// We got a reply from the server, must match sequence
$reply = $client->recv();
if (intval($reply) == $sequence) {
printf ("I: server replied OK (%s)%s", $reply, PHP_EOL);
$retries_left = REQUEST_RETRIES;
$expect_reply = false;
} else {
printf ("E: malformed reply from server: %s%s", $reply, PHP_EOL);
}
} elseif (--$retries_left == 0) {
echo "E: server seems to be offline, abandoning", PHP_EOL;
break;
} else {
echo "W: no response from server, retrying...", PHP_EOL;
// Old socket will be confused; close it and open a new one
$client = client_socket($context);
// Send request again, on new socket
$client->send($sequence);
}
}
}
lpclient: Lazy Pirate client in Python
#
# Lazy Pirate client
# Use zmq_poll to do a safe request-reply
# To run, start lpserver and then randomly kill/restart it
#
# Author: Daniel Lundin <dln(at)eintr(dot)org>
#
import itertools
import logging
import sys
import zmq
logging.basicConfig(format="%(levelname)s: %(message)s", level=logging.INFO)
REQUEST_TIMEOUT = 2500
REQUEST_RETRIES = 3
SERVER_ENDPOINT = "tcp://localhost:5555"
context = zmq.Context()
logging.info("Connecting to server…")
client = context.socket(zmq.REQ)
client.connect(SERVER_ENDPOINT)
for sequence in itertools.count():
request = str(sequence).encode()
logging.info("Sending (%s)", request)
client.send(request)
retries_left = REQUEST_RETRIES
while True:
if (client.poll(REQUEST_TIMEOUT) & zmq.POLLIN) != 0:
reply = client.recv()
if int(reply) == sequence:
logging.info("Server replied OK (%s)", reply)
retries_left = REQUEST_RETRIES
break
else:
logging.error("Malformed reply from server: %s", reply)
continue
retries_left -= 1
logging.warning("No response from server")
# Socket is confused. Close and remove it.
client.setsockopt(zmq.LINGER, 0)
client.close()
if retries_left == 0:
logging.error("Server seems to be offline, abandoning")
sys.exit()
logging.info("Reconnecting to server…")
# Create new connection
client = context.socket(zmq.REQ)
client.connect(SERVER_ENDPOINT)
logging.info("Resending (%s)", request)
client.send(request)
lpclient: Lazy Pirate client in Q
lpclient: Lazy Pirate client in Racket
lpclient: Lazy Pirate client in Ruby
#!/usr/bin/env ruby
# Author: Han Holl <han.holl@pobox.com>
require 'rubygems'
require 'ffi-rzmq'
class LPClient
def initialize(connect, retries = nil, timeout = nil)
@connect = connect
@retries = (retries || 3).to_i
@timeout = (timeout || 10).to_i
@ctx = ZMQ::Context.new(1)
client_sock
at_exit do
@socket.close
end
end
def client_sock
@socket = @ctx.socket(ZMQ::REQ)
@socket.setsockopt(ZMQ::LINGER, 0)
@socket.connect(@connect)
end
def send(message)
@retries.times do |tries|
raise("Send: #{message} failed") unless @socket.send(message)
if ZMQ.select( [@socket], nil, nil, @timeout)
yield @socket.recv
return
else
@socket.close
client_sock
end
end
raise 'Server down'
end
end
if $0 == __FILE__
server = LPClient.new(ARGV[0] || "tcp://localhost:5555", ARGV[1], ARGV[2])
count = 0
loop do
request = "#{count}"
count += 1
server.send(request) do |reply|
if reply == request
puts("I: server replied OK (#{reply})")
else
puts("E: malformed reply from server: #{reply}")
end
end
end
puts 'success'
end
lpclient: Lazy Pirate client in Rust
use std::convert::TryInto;
const REQUEST_TIMEOUT: i64 = 2500;
const REQUEST_RETRIES: usize = 3;
const SERVER_ENDPOINT: &'static str = "tcp://localhost:5555";
fn connect(context: &zmq::Context) -> zmq::Socket {
let socket = context.socket(zmq::REQ).unwrap();
socket.set_linger(0).unwrap();
socket.connect(SERVER_ENDPOINT).unwrap();
socket
}
fn reconnect(context: &zmq::Context, socket: zmq::Socket) -> zmq::Socket {
drop(socket);
connect(context)
}
fn main() {
let mut retries_left = REQUEST_RETRIES;
let context = zmq::Context::new();
let mut socket = connect(&context);
let mut i = 0i64;
loop {
let value = i.to_le_bytes();
i += 1;
while retries_left > 0 {
retries_left -= 1;
let request = zmq::Message::from(&value.as_slice());
println!("Sending request {request:?}");
socket.send(request, 0).unwrap();
if socket.poll(zmq::POLLIN, REQUEST_TIMEOUT).unwrap() != 0 {
let reply = socket.recv_msg(0).unwrap();
let reply_content =
TryInto::<[u8; 8]>::try_into(&*reply).and_then(|x| Ok(i64::from_le_bytes(x)));
if let Ok(i) = reply_content {
println!("Server replied OK {i}");
retries_left = REQUEST_RETRIES;
break;
} else {
println!("Malformed message: {reply:?}");
continue;
}
}
println!("No response from Server");
println!("Reconnecting server...");
socket = reconnect(&context, socket);
println!("Resending Request");
}
if retries_left == 0 {
println!("Server is offline. Goodbye...");
return;
}
}
}
lpclient: Lazy Pirate client in Scala
lpclient: Lazy Pirate client in Tcl
#
# Lazy Pirate client
# Use zmq_poll to do a safe request-reply
# To run, start lpserver and then randomly kill/restart it
#
package require zmq
set REQUEST_TIMEOUT 2500 ;# msecs, (> 1000!)
set REQUEST_RETRIES 3 ;# Before we abandon
set SERVER_ENDPOINT "tcp://localhost:5555"
zmq context context
puts "I: connecting to server..."
zmq socket client context REQ
client connect $SERVER_ENDPOINT
set sequence 0
set retries_left $REQUEST_RETRIES
while {$retries_left} {
# We send a request, then we work to get a reply
client send [incr sequence]
set expect_reply 1
while {$expect_reply} {
# Poll socket for a reply, with timeout
set rpoll_set [zmq poll {{client {POLLIN}}} $REQUEST_TIMEOUT]
# If we got a reply, process it
if {[llength $rpoll_set] && [lindex $rpoll_set 0 0] eq "client"} {
# We got a reply from the server, must match sequence
set reply [client recv]
if {$reply eq $sequence} {
puts "I: server replied OK ($reply)"
set retries_left $REQUEST_RETRIES
set expect_reply 0
} else {
puts "E: malformed reply from server: $reply"
}
} elseif {[incr retries_left -1] <= 0} {
puts "E: server seems to be offline, abandoning"
set retries_left 0
break
} else {
puts "W: no response from server, retrying..."
# Old socket is confused; close it and open a new one
client close
puts "I: connecting to server..."
zmq socket client context REQ
client connect $SERVER_ENDPOINT
# Send request again, on new socket
client send $sequence
}
}
}
client close
context term
lpclient: Lazy Pirate client in OCaml
Run this together with the matching server:
lpserver: Lazy Pirate server in Ada
lpserver: Lazy Pirate server in Basic
lpserver: Lazy Pirate server in C
// Lazy Pirate server
// Binds REP socket to tcp://*:5555
// Like hwserver except:
// - echoes request as-is
// - randomly runs slowly, or exits to simulate a crash.
#include "zhelpers.h"
#include <unistd.h>
int main (void)
{
srandom ((unsigned) time (NULL));
void *context = zmq_ctx_new ();
void *server = zmq_socket (context, ZMQ_REP);
zmq_bind (server, "tcp://*:5555");
int cycles = 0;
while (1) {
char *request = s_recv (server);
cycles++;
// Simulate various problems, after a few cycles
if (cycles > 3 && randof (3) == 0) {
printf ("I: simulating a crash\n");
break;
}
else
if (cycles > 3 && randof (3) == 0) {
printf ("I: simulating CPU overload\n");
sleep (2);
}
printf ("I: normal request (%s)\n", request);
sleep (1); // Do some heavy work
s_send (server, request);
free (request);
}
zmq_close (server);
zmq_ctx_destroy (context);
return 0;
}
lpserver: Lazy Pirate server in C++
//
// Lazy Pirate server
// Binds REP socket to tcp://*:5555
// Like hwserver except:
// - echoes request as-is
// - randomly runs slowly, or exits to simulate a crash.
//
#include "zhelpers.hpp"
int main ()
{
srandom ((unsigned) time (NULL));
zmq::context_t context(1);
zmq::socket_t server(context, ZMQ_REP);
server.bind("tcp://*:5555");
int cycles = 0;
while (1) {
std::string request = s_recv (server);
cycles++;
// Simulate various problems, after a few cycles
if (cycles > 3 && within (3) == 0) {
std::cout << "I: simulating a crash" << std::endl;
break;
}
else
if (cycles > 3 && within (3) == 0) {
std::cout << "I: simulating CPU overload" << std::endl;
sleep (2);
}
std::cout << "I: normal request (" << request << ")" << std::endl;
sleep (1); // Do some heavy work
s_send (server, request);
}
return 0;
}
lpserver: Lazy Pirate server in C#
lpserver: Lazy Pirate server in CL
lpserver: Lazy Pirate server in Delphi
program lpserver;
//
// Lazy Pirate server
// Binds REP socket to tcp://*:5555
// Like hwserver except:
// - echoes request as-is
// - randomly runs slowly, or exits to simulate a crash.
// @author Varga Balazs <bb.varga@gmail.com>
//
{$APPTYPE CONSOLE}
uses
SysUtils
, zmqapi
;
var
context: TZMQContext;
server: TZMQSocket;
cycles: Integer;
request: Utf8String;
begin
Randomize;
context := TZMQContext.create;
server := context.socket( stRep );
server.bind( 'tcp://*:5555' );
cycles := 0;
while not context.Terminated do
try
server.recv( request );
inc( cycles );
// Simulate various problems, after a few cycles
if ( cycles > 3 ) and ( random(3) = 0) then
begin
Writeln( 'I: simulating a crash' );
break;
end else
if ( cycles > 3 ) and ( random(3) = 0 ) then
begin
Writeln( 'I: simulating CPU overload' );
sleep(2000);
end;
Writeln( Format( 'I: normal request (%s)', [request] ) );
sleep (1000); // Do some heavy work
server.send( request );
except
end;
context.Free;
end.
lpserver: Lazy Pirate server in Erlang
lpserver: Lazy Pirate server in Elixir
lpserver: Lazy Pirate server in F#
lpserver: Lazy Pirate server in Felix
lpserver: Lazy Pirate server in Go
// Lazy Pirate server
// Binds REP socket to tcp://*:5555
// Like hwserver except:
// - echoes request as-is
// - randomly runs slowly, or exits to simulate a crash.
//
// Author: iano <scaly.iano@gmail.com>
// Based on C example
package main
import (
"fmt"
zmq "github.com/alecthomas/gozmq"
"math/rand"
"time"
)
const (
SERVER_ENDPOINT = "tcp://*:5555"
)
func main() {
src := rand.NewSource(time.Now().UnixNano())
random := rand.New(src)
context, _ := zmq.NewContext()
defer context.Close()
server, _ := context.NewSocket(zmq.REP)
defer server.Close()
server.Bind(SERVER_ENDPOINT)
for cycles := 1; ; cycles++ {
request, _ := server.Recv(0)
// Simulate various problems, after a few cycles
if cycles > 3 {
switch r := random.Intn(3); r {
case 0:
fmt.Println("I: Simulating a crash")
return
case 1:
fmt.Println("I: simulating CPU overload")
time.Sleep(2 * time.Second)
}
}
fmt.Printf("I: normal request (%s)\n", request)
time.Sleep(1 * time.Second) // Do some heavy work
server.Send(request, 0)
}
}
lpserver: Lazy Pirate server in Haskell
{--
Lazy Pirate server in Haskell
--}
module Main where
import System.ZMQ4.Monadic
import System.Random (randomRIO)
import System.Exit (exitSuccess)
import Control.Monad (forever, when)
import Control.Concurrent (threadDelay)
import Data.ByteString.Char8 (pack, unpack)
main :: IO ()
main =
runZMQ $ do
server <- socket Rep
bind server "tcp://*:5555"
sendClient 0 server
sendClient :: Int -> Socket z Rep -> ZMQ z ()
sendClient cycles server = do
req <- receive server
chance <- liftIO $ randomRIO (0::Int, 3)
when (cycles > 3 && chance == 0) $ do
liftIO crash
chance' <- liftIO $ randomRIO (0::Int, 3)
when (cycles > 3 && chance' == 0) $ do
liftIO overload
liftIO $ putStrLn $ "I: normal request " ++ (unpack req)
liftIO $ threadDelay $ 1 * 1000 * 1000
send server [] req
sendClient (cycles+1) server
crash = do
putStrLn "I: Simulating a crash"
exitSuccess
overload = do
putStrLn "I: Simulating CPU overload"
threadDelay $ 2 * 1000 * 1000
lpserver: Lazy Pirate server in Haxe
package ;
import neko.Lib;
import neko.Sys;
import org.zeromq.ZContext;
import org.zeromq.ZFrame;
import org.zeromq.ZMQ;
/**
* Lazy Pirate server
* Binds REP socket to tcp://*:5555
* Like HWServer except:
* - echoes request as-is
* - randomly runs slowly, or exits to simulate a crash.
*
* @see http://zguide.zeromq.org/page:all#Client-side-Reliability-Lazy-Pirate-Pattern
*
*/
class LPServer
{
public static function main() {
Lib.println("** LPServer (see: http://zguide.zeromq.org/page:all#Client-side-Reliability-Lazy-Pirate-Pattern)");
var ctx = new ZContext();
var server = ctx.createSocket(ZMQ_REP);
server.bind("tcp://*:5555");
var cycles = 0;
while (true) {
var requestFrame = ZFrame.recvFrame(server);
cycles++;
// Simulate various problems, after a few cycles
if (cycles > 3 && ZHelpers.randof(3) == 0) {
Lib.println("I: simulating a crash");
break;
}
else if (cycles > 3 && ZHelpers.randof(3) == 0) {
Lib.println("I: simulating CPU overload");
Sys.sleep(2.0);
}
Lib.println("I: normal request (" + requestFrame.toString() + ")");
Sys.sleep(1.0); // Do some heavy work
requestFrame.send(server);
requestFrame.destroy();
}
server.close();
ctx.destroy();
}
}
lpserver: Lazy Pirate server in Java
package guide;
import java.util.Random;
import org.zeromq.SocketType;
import org.zeromq.ZMQ;
import org.zeromq.ZMQ.Socket;
import org.zeromq.ZContext;
//
// Lazy Pirate server
// Binds REP socket to tcp://*:5555
// Like hwserver except:
// - echoes request as-is
// - randomly runs slowly, or exits to simulate a crash.
//
public class lpserver
{
public static void main(String[] argv) throws Exception
{
Random rand = new Random(System.nanoTime());
try (ZContext context = new ZContext()) {
Socket server = context.createSocket(SocketType.REP);
server.bind("tcp://*:5555");
int cycles = 0;
while (true) {
String request = server.recvStr();
cycles++;
// Simulate various problems, after a few cycles
if (cycles > 3 && rand.nextInt(3) == 0) {
System.out.println("I: simulating a crash");
break;
}
else if (cycles > 3 && rand.nextInt(3) == 0) {
System.out.println("I: simulating CPU overload");
Thread.sleep(2000);
}
System.out.printf("I: normal request (%s)\n", request);
Thread.sleep(1000); // Do some heavy work
server.send(request);
}
}
}
}
lpserver: Lazy Pirate server in Julia
lpserver: Lazy Pirate server in Lua
--
-- Lazy Pirate server
-- Binds REP socket to tcp://*:5555
-- Like hwserver except:
-- - echoes request as-is
-- - randomly runs slowly, or exits to simulate a crash.
--
-- Author: Robert G. Jakabosky <bobby@sharedrealm.com>
--
require"zmq"
require"zhelpers"
math.randomseed(os.time())
local context = zmq.init(1)
local server = context:socket(zmq.REP)
server:bind("tcp://*:5555")
local cycles = 0
while true do
local request = server:recv()
cycles = cycles + 1
-- Simulate various problems, after a few cycles
if (cycles > 3 and randof (3) == 0) then
printf("I: simulating a crash\n")
break
elseif (cycles > 3 and randof (3) == 0) then
printf("I: simulating CPU overload\n")
s_sleep(2000)
end
printf("I: normal request (%s)\n", request)
s_sleep(1000) -- Do some heavy work
server:send(request)
end
server:close()
context:term()
lpserver: Lazy Pirate server in Node.js
lpserver: Lazy Pirate server in Objective-C
lpserver: Lazy Pirate server in ooc
lpserver: Lazy Pirate server in Perl
# Lazy Pirate server in Perl
# Binds REP socket to tcp://*:5555
# Like hwserver except:
# - echoes request as-is
# - randomly runs slowly, or exits to simulate a crash.
use strict;
use warnings;
use v5.10;
use ZMQ::FFI;
use ZMQ::FFI::Constants qw(ZMQ_REP);
my $context = ZMQ::FFI->new();
my $server = $context->socket(ZMQ_REP);
$server->bind('tcp://*:5555');
my $cycles = 0;
SERVER_LOOP:
while (1) {
my $request = $server->recv();
$cycles++;
# Simulate various problems, after a few cycles
if ($cycles > 3 && int(rand(3)) == 0) {
say "I: simulating a crash";
last SERVER_LOOP;
}
elsif ($cycles > 3 && int(rand(3)) == 0) {
say "I: simulating CPU overload";
sleep 2;
}
say "I: normal request ($request)";
sleep 1; # Do some heavy work
$server->send($request);
}
lpserver: Lazy Pirate server in PHP
<?php
/*
* Lazy Pirate server
* Binds REP socket to tcp://*:5555
* Like hwserver except:
* - echoes request as-is
* - randomly runs slowly, or exits to simulate a crash.
*
* @author Ian Barber <ian(dot)barber(at)gmail(dot)com>
*/
$context = new ZMQContext();
$server = new ZMQSocket($context, ZMQ::SOCKET_REP);
$server->bind("tcp://*:5555");
$cycles = 0;
while (true) {
$request = $server->recv();
$cycles++;
// Simulate various problems, after a few cycles
if ($cycles > 3 && rand(0, 3) == 0) {
echo "I: simulating a crash", PHP_EOL;
break;
} elseif ($cycles > 3 && rand(0, 3) == 0) {
echo "I: simulating CPU overload", PHP_EOL;
sleep(5);
}
printf ("I: normal request (%s)%s", $request, PHP_EOL);
sleep(1); // Do some heavy work
$server->send($request);
}
lpserver: Lazy Pirate server in Python
#
# Lazy Pirate server
# Binds REP socket to tcp://*:5555
# Like hwserver except:
# - echoes request as-is
# - randomly runs slowly, or exits to simulate a crash.
#
# Author: Daniel Lundin <dln(at)eintr(dot)org>
#
from random import randint
import itertools
import logging
import time
import zmq
logging.basicConfig(format="%(levelname)s: %(message)s", level=logging.INFO)
context = zmq.Context()
server = context.socket(zmq.REP)
server.bind("tcp://*:5555")
for cycles in itertools.count():
request = server.recv()
# Simulate various problems, after a few cycles
if cycles > 3 and randint(0, 3) == 0:
logging.info("Simulating a crash")
break
elif cycles > 3 and randint(0, 3) == 0:
logging.info("Simulating CPU overload")
time.sleep(2)
logging.info("Normal request (%s)", request)
time.sleep(1) # Do some heavy work
server.send(request)
lpserver: Lazy Pirate server in Q
lpserver: Lazy Pirate server in Racket
lpserver: Lazy Pirate server in Ruby
#!/usr/bin/env ruby
# Author: Han Holl <han.holl@pobox.com>
require 'rubygems'
require 'zmq'
class LPServer
def initialize(connect)
@ctx = ZMQ::Context.new(1)
@socket = @ctx.socket(ZMQ::REP)
@socket.bind(connect)
end
def run
begin
loop do
rsl = yield @socket.recv
@socket.send rsl
end
ensure
@socket.close
@ctx.close
end
end
end
if $0 == __FILE__
cycles = 0
srand
LPServer.new(ARGV[0] || "tcp://*:5555").run do |request|
cycles += 1
if cycles > 3
if rand(3) == 0
puts "I: simulating a crash"
break
elsif rand(3) == 0
puts "I: simulating CPU overload"
sleep(3)
end
end
puts "I: normal request (#{request})"
sleep(1)
request
end
end
lpserver: Lazy Pirate server in Rust
use rand::{thread_rng, Rng};
use std::time::Duration;
fn main() {
let context = zmq::Context::new();
let server = context.socket(zmq::REP).unwrap();
server.bind("tcp://*:5555").unwrap();
let mut i = 0;
loop {
i += 1;
let request = server.recv_msg(0).unwrap();
println!("Got Request: {request:?}");
server.send(request, 0).unwrap();
std::thread::sleep(Duration::from_secs(1));
if (i > 3) && (thread_rng().gen_range(0..3) == 0) {
// simulate a crash
println!("Oh no! Server crashed.");
break;
}
if (i > 3) && (thread_rng().gen_range(0..3) == 0) {
// simulate overload
println!("Server is busy.");
std::thread::sleep(Duration::from_secs(2));
}
}
}
lpserver: Lazy Pirate server in Scala
import org.zeromq.ZMQ;
import java.util.Random;
/*
* Lazy Pirate server
* @author Zac Li
* @email zac.li.cmu@gmail.com
*/
object lpserver {
  def main(args: Array[String]): Unit = {
    val rand = new Random(System.nanoTime())
    val context = ZMQ.context(1)
    val server = context.socket(ZMQ.REP)
    server.bind("tcp://*:5555")
    var cycles = 0
    var running = true
    while (running) {
      val request = server.recvStr()
      cycles += 1
      // Simulate various problems, after a few cycles
      if (cycles > 3 && rand.nextInt(3) == 0) {
        println("I: simulating a crash")
        running = false
      } else {
        if (cycles > 3 && rand.nextInt(3) == 0) {
          println("I: simulating CPU overload")
          Thread.sleep(2000)
        }
        printf("I: normal request (%s)\n", request)
        Thread.sleep(1000) // Do some heavy work
        server.send(request)
      }
    }
    server.close()
    context.term()
  }
}
lpserver: Lazy Pirate server in Tcl
#
# Lazy Pirate server
# Binds REP socket to tcp://*:5555
# Like hwserver except:
# - echoes request as-is
# - randomly runs slowly, or exits to simulate a crash.
#
package require zmq
expr {srand([pid])}
zmq context context
zmq socket server context REP
server bind "tcp://*:5555"
set cycles 0
while {1} {
set request [server recv]
incr cycles
# Simulate various problems, after a few cycles
if {$cycles > 3 && int(rand()*3) == 0} {
puts "I: simulating a crash"
break;
} elseif {$cycles > 3 && int(rand()*3) == 0} {
puts "I: simulating CPU overload"
after 2000
}
puts "I: normal request ($request)"
after 1000 ;# Do some heavy work
server send $request
}
server close
context term
lpserver: Lazy Pirate server in OCaml
To run this test case, start the client and the server in two console windows. The server will randomly misbehave after a few messages. You can check the client’s response. Here is typical output from the server:
I: normal request (1)
I: normal request (2)
I: normal request (3)
I: simulating CPU overload
I: normal request (4)
I: simulating a crash
And here is the client’s response:
I: connecting to server...
I: server replied OK (1)
I: server replied OK (2)
I: server replied OK (3)
W: no response from server, retrying...
I: connecting to server...
W: no response from server, retrying...
I: connecting to server...
E: server seems to be offline, abandoning
The client sequences each message and checks that replies come back exactly in order: that no requests or replies are lost, and no replies come back more than once, or out of order. Run the test a few times until you’re convinced that this mechanism actually works. You don’t need sequence numbers in a production application; they just help us trust our design.
The client uses a REQ socket, and does the brute force close/reopen because REQ sockets impose that strict send/receive cycle. You might be tempted to use a DEALER instead, but it would not be a good decision. First, it would mean emulating the secret sauce that REQ does with envelopes (if you’ve forgotten what that is, it’s a good sign you don’t want to have to do it). Second, it would mean potentially getting back replies that you didn’t expect.
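To make that concrete, here is roughly what the DEALER route would involve. This is a hedged sketch in Python with pyzmq, not one of the guide's examples: you have to add the empty delimiter frame yourself, and you have to be ready to receive, and discard, replies to requests you already abandoned.

import zmq

context = zmq.Context()
client = context.socket(zmq.DEALER)
client.connect("tcp://localhost:5555")    # placeholder endpoint

sequence = 1
# A DEALER doesn't add the empty delimiter frame that REQ adds, so we
# must prepend it ourselves to stay compatible with a REP or ROUTER peer:
client.send_multipart([b"", str(sequence).encode()])

# A DEALER doesn't enforce strict send/receive alternation either, so a
# late reply to an earlier, abandoned request can still arrive; we have
# to match on the sequence number and drop anything unexpected:
while True:
    empty, reply = client.recv_multipart()
    if int(reply) == sequence:
        break    # the reply we were waiting for
    # else: a stale reply from a retried request; ignore it and keep waiting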
Handling failures only at the client works when we have a set of clients talking to a single server. It can handle a server crash, but only if recovery means restarting that same server. If there’s a permanent error, such as a dead power supply on the server hardware, this approach won’t work. Because the application code in servers is usually the biggest source of failures in any architecture, depending on a single server is not a great idea.
So, pros and cons:
- Pro: simple to understand and implement.
- Pro: works easily with existing client and server application code.
- Pro: ZeroMQ automatically retries the actual reconnection until it works.
- Con: doesn’t failover to backup or alternate servers.
Basic Reliable Queuing (Simple Pirate Pattern) #
Our second approach extends the Lazy Pirate pattern with a queue proxy that lets us talk, transparently, to multiple servers, which we can more accurately call “workers”. We’ll develop this in stages, starting with a minimal working model, the Simple Pirate pattern.
In all these Pirate patterns, workers are stateless. If the application requires some shared state, such as a shared database, we don’t know about it as we design our messaging framework. Having a queue proxy means workers can come and go without clients knowing anything about it. If one worker dies, another takes over. This is a nice, simple topology with only one real weakness, namely the central queue itself, which can become a problem to manage, and a single point of failure.
The basis for the queue proxy is the load balancing broker from Chapter 3 - Advanced Request-Reply Patterns. What is the very minimum we need to do to handle dead or blocked workers? Turns out, it’s surprisingly little. We already have a retry mechanism in the client. So using the load balancing pattern will work pretty well. This fits with ZeroMQ’s philosophy that we can extend a peer-to-peer pattern like request-reply by plugging naive proxies in the middle.
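To give a feel for how little the worker side needs, here is a sketch of a worker for this queue, in Python with pyzmq. The endpoint and the "\001" READY signal mirror the queue code below; treat it as an illustration rather than the guide's worker code.

import zmq

context = zmq.Context()
worker = context.socket(zmq.REQ)
worker.connect("tcp://localhost:5556")   # the queue's worker-facing endpoint

# Announce availability; the queue treats "\001" as a READY signal
worker.send(b"\x01")

while True:
    # The queue routes a client request (with its envelope) to us...
    msg = worker.recv_multipart()
    # ...we do the work (here we just echo the message back), and the
    # reply both answers the client and marks us as available again
    worker.send_multipart(msg)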
We don’t need a special client; we’re still using the Lazy Pirate client. Here is the queue, which is identical to the main task of the load balancing broker:
spqueue: Simple Pirate queue in Ada
spqueue: Simple Pirate queue in Basic
spqueue: Simple Pirate queue in C
// Simple Pirate broker
// This is identical to load-balancing pattern, with no reliability
// mechanisms. It depends on the client for recovery. Runs forever.
#include "czmq.h"
#define WORKER_READY "\001" // Signals worker is ready
int main (void)
{
zctx_t *ctx = zctx_new ();
void *frontend = zsocket_new (ctx, ZMQ_ROUTER);
void *backend = zsocket_new (ctx, ZMQ_ROUTER);
zsocket_bind (frontend, "tcp://*:5555"); // For clients
zsocket_bind (backend, "tcp://*:5556"); // For workers
// Queue of available workers
zlist_t *workers = zlist_new ();
// The body of this example is exactly the same as lbbroker2.
// .skip
while (true) {
zmq_pollitem_t items [] = {
{ backend, 0, ZMQ_POLLIN, 0 },
{ frontend, 0, ZMQ_POLLIN, 0 }
};
// Poll frontend only if we have available workers
int rc = zmq_poll (items, zlist_size (workers)? 2: 1, -1);
if (rc == -1)
break; // Interrupted
// Handle worker activity on backend
if (items [0].revents & ZMQ_POLLIN) {
// Use worker identity for load-balancing
zmsg_t *msg = zmsg_recv (backend);
if (!msg)
break; // Interrupted
zframe_t *identity = zmsg_unwrap (msg);
zlist_append (workers, identity);
// Forward message to client if it's not a READY
zframe_t *frame = zmsg_first (msg);
if (memcmp (zframe_data (frame), WORKER_READY, 1) == 0)
zmsg_destroy (&msg);
else
zmsg_send (&msg, frontend);
}
if (items [1].revents & ZMQ_POLLIN) {
// Get client request, route to first available worker
zmsg_t *msg = zmsg_recv (frontend);
if (msg) {
zmsg_wrap (msg, (zframe_t *) zlist_pop (workers));
zmsg_send (&msg, backend);
}
}
}
// When we're done, clean up properly
while (zlist_size (workers)) {
zframe_t *frame = (zframe_t *) zlist_pop (workers);
zframe_destroy (&frame);
}
zlist_destroy (&workers);
zctx_destroy (&ctx);
return 0;
// .until
}
spqueue: Simple Pirate queue in C++
//
// Simple Pirate queue
// This is identical to the LRU pattern, with no reliability mechanisms
// at all. It depends on the client for recovery. Runs forever.
//
// Andreas Hoelzlwimmer <andreas.hoelzlwimmer@fh-hagenberg.at
#include "zmsg.hpp"
#include <queue>
#define MAX_WORKERS 100
int main (void)
{
s_version_assert (2, 1);
// Prepare our context and sockets
zmq::context_t context(1);
zmq::socket_t frontend (context, ZMQ_ROUTER);
zmq::socket_t backend (context, ZMQ_ROUTER);
frontend.bind("tcp://*:5555"); // For clients
backend.bind("tcp://*:5556"); // For workers
// Queue of available workers
std::queue<std::string> worker_queue;
while (1) {
zmq::pollitem_t items [] = {
{ backend, 0, ZMQ_POLLIN, 0 },
{ frontend, 0, ZMQ_POLLIN, 0 }
};
// Poll frontend only if we have available workers
if (worker_queue.size())
zmq::poll (items, 2, -1);
else
zmq::poll (items, 1, -1);
// Handle worker activity on backend
if (items [0].revents & ZMQ_POLLIN) {
zmsg zm(backend);
//zmsg_t *zmsg = zmsg_recv (backend);
// Use worker address for LRU routing
assert (worker_queue.size() < MAX_WORKERS);
worker_queue.push(zm.unwrap());
// Return reply to client if it's not a READY
if (strcmp (zm.address(), "READY") == 0)
zm.clear();
else
zm.send (frontend);
}
if (items [1].revents & ZMQ_POLLIN) {
// Now get next client request, route to next worker
zmsg zm(frontend);
// REQ socket in worker needs an envelope delimiter
zm.wrap(worker_queue.front().c_str(), "");
zm.send(backend);
// Dequeue and drop the next worker address
worker_queue.pop();
}
}
// We never exit the main loop
return 0;
}
spqueue: Simple Pirate queue in C#
spqueue: Simple Pirate queue in CL
spqueue: Simple Pirate queue in Delphi
program spqueue;
//
// Simple Pirate broker
// This is identical to load-balancing pattern, with no reliability
// mechanisms. It depends on the client for recovery. Runs forever.
// @author Varga Balazs <bb.varga@gmail.com>
//
{$APPTYPE CONSOLE}
uses
SysUtils
, zmqapi
;
const
WORKER_READY = '\001'; // Signals worker is ready
var
ctx: TZMQContext;
frontend,
backend: TZMQSocket;
workers: TZMQMsg;
poller: TZMQPoller;
pc: Integer;
msg: TZMQMsg;
identity,
frame: TZMQFrame;
begin
ctx := TZMQContext.create;
frontend := ctx.Socket( stRouter );
backend := ctx.Socket( stRouter );
frontend.bind( 'tcp://*:5555' ); // For clients
backend.bind( 'tcp://*:5556' ); // For workers
// Queue of available workers
workers := TZMQMsg.create;
poller := TZMQPoller.Create( true );
poller.Register( backend, [pePollIn] );
poller.Register( frontend, [pePollIn] );
// The body of this example is exactly the same as lbbroker2.
while not ctx.Terminated do
try
// Poll frontend only if we have available workers
if workers.size > 0 then
pc := 2
else
pc := 1;
poller.poll( 1000, pc );
// Handle worker activity on backend
if pePollIn in poller.PollItem[0].revents then
begin
// Use worker identity for load-balancing
backend.recv( msg );
identity := msg.unwrap;
workers.add( identity );
// Forward message to client if it's not a READY
frame := msg.first;
if frame.asUtf8String = WORKER_READY then
begin
msg.Free;
msg := nil;
end else
frontend.send( msg );
end;
if pePollIn in poller.PollItem[1].revents then
begin
// Get client request, route to first available worker
frontend.recv( msg );
msg.wrap( workers.pop );
backend.send( msg );
end;
except
end;
workers.Free;
ctx.Free;
end.
spqueue: Simple Pirate queue in Go
// Simple Pirate broker
// This is identical to load-balancing pattern, with no reliability
// mechanisms. It depends on the client for recovery. Runs forever.
//
// Author: iano <scaly.iano@gmail.com>
// Based on C & Python example
package main
import (
zmq "github.com/alecthomas/gozmq"
)
const LRU_READY = "\001"
func main() {
context, _ := zmq.NewContext()
defer context.Close()
frontend, _ := context.NewSocket(zmq.ROUTER)
defer frontend.Close()
frontend.Bind("tcp://*:5555") // For clients
backend, _ := context.NewSocket(zmq.ROUTER)
defer backend.Close()
backend.Bind("tcp://*:5556") // For workers
// Queue of available workers
workers := make([][]byte, 0, 0)
for {
items := zmq.PollItems{
zmq.PollItem{Socket: backend, Events: zmq.POLLIN},
zmq.PollItem{Socket: frontend, Events: zmq.POLLIN},
}
// Poll frontend only if we have available workers
if len(workers) > 0 {
zmq.Poll(items, -1)
} else {
zmq.Poll(items[:1], -1)
}
// Handle worker activity on backend
if items[0].REvents&zmq.POLLIN != 0 {
// Use worker identity for load-balancing
msg, err := backend.RecvMultipart(0)
if err != nil {
panic(err) // Interrupted
}
address := msg[0]
workers = append(workers, address)
// Forward message to client if it's not a READY
if reply := msg[2:]; string(reply[0]) != LRU_READY {
frontend.SendMultipart(reply, 0)
}
}
if items[1].REvents&zmq.POLLIN != 0 {
// Get client request, route to first available worker
msg, err := frontend.RecvMultipart(0)
if err != nil {
panic(err) // Interrupted
}
last := workers[len(workers)-1]
workers = workers[:len(workers)-1]
request := append([][]byte{last, nil}, msg...)
backend.SendMultipart(request, 0)
}
}
}
spqueue: Simple Pirate queue in Haskell
{--
Simple Pirate queue in Haskell
--}
module Main where
import System.ZMQ4.Monadic
import Control.Concurrent (threadDelay)
import Control.Applicative ((<$>))
import Control.Monad (when)
import Data.ByteString.Char8 (pack, unpack, empty)
import Data.List (intercalate)
type SockID = String
workerReady = "\001"
main :: IO ()
main =
runZMQ $ do
frontend <- socket Router
bind frontend "tcp://*:5555"
backend <- socket Router
bind backend "tcp://*:5556"
pollPeers frontend backend []
pollPeers :: Socket z Router -> Socket z Router -> [SockID] -> ZMQ z ()
pollPeers frontend backend workers = do
let toPoll = getPollList workers
evts <- poll 0 toPoll
workers' <- getBackend backend frontend evts workers
workers'' <- getFrontend frontend backend evts workers'
pollPeers frontend backend workers''
where getPollList [] = [Sock backend [In] Nothing]
getPollList _ = [Sock backend [In] Nothing, Sock frontend [In] Nothing]
getBackend :: Socket z Router -> Socket z Router ->
[[Event]] -> [SockID] -> ZMQ z ([SockID])
getBackend backend frontend evts workers =
if (In `elem` (evts !! 0))
then do
wkrID <- receive backend
id <- (receive backend >> receive backend)
msg <- (receive backend >> receive backend)
when ((unpack msg) /= workerReady) $ do
liftIO $ putStrLn $ "I: sending backend - " ++ (unpack msg)
send frontend [SendMore] id
send frontend [SendMore] empty
send frontend [] msg
return $ (unpack wkrID):workers
else return workers
getFrontend :: Socket z Router -> Socket z Router ->
[[Event]] -> [SockID] -> ZMQ z [SockID]
getFrontend frontend backend evts workers =
if (length evts > 1 && In `elem` (evts !! 1))
then do
id <- receive frontend
msg <- (receive frontend >> receive frontend)
liftIO $ putStrLn $ "I: msg on frontend - " ++ (unpack msg)
let wkrID = head workers
send backend [SendMore] (pack wkrID)
send backend [SendMore] empty
send backend [SendMore] id
send backend [SendMore] empty
send backend [] msg
return $ tail workers
else return workers
spqueue: Simple Pirate queue in Haxe
package ;
import haxe.Stack;
import neko.Lib;
import org.zeromq.ZFrame;
import org.zeromq.ZContext;
import org.zeromq.ZMQSocket;
import org.zeromq.ZMQPoller;
import org.zeromq.ZMQ;
import org.zeromq.ZMsg;
import org.zeromq.ZMQException;
/**
* Simple Pirate queue
* This is identical to the LRU pattern, with no reliability mechanisms
* at all. It depends on the client for recovery. Runs forever.
*
* @see http://zguide.zeromq.org/page:all#Basic-Reliable-Queuing-Simple-Pirate-Pattern
*/
class SPQueue
{
// Signals workers are ready
private static inline var LRU_READY:String = String.fromCharCode(1);
public static function main() {
Lib.println("** SPQueue (see: http://zguide.zeromq.org/page:all#Basic-Reliable-Queuing-Simple-Pirate-Pattern)");
// Prepare our context and sockets
var context:ZContext = new ZContext();
var frontend:ZMQSocket = context.createSocket(ZMQ_ROUTER);
var backend:ZMQSocket = context.createSocket(ZMQ_ROUTER);
frontend.bind("tcp://*:5555"); // For clients
backend.bind("tcp://*:5556"); // For workers
// Queue of available workers
var workerQueue:List<ZFrame> = new List<ZFrame>();
var poller:ZMQPoller = new ZMQPoller();
poller.registerSocket(backend, ZMQ.ZMQ_POLLIN());
while (true) {
poller.unregisterSocket(frontend);
if (workerQueue.length > 0) {
// Only poll frontend if there is at least 1 worker ready to do work
poller.registerSocket(frontend, ZMQ.ZMQ_POLLIN());
}
try {
poller.poll( -1 );
} catch (e:ZMQException) {
if (ZMQ.isInterrupted())
break;
trace("ZMQException #:" + e.errNo + ", str:" + e.str());
trace (Stack.toString(Stack.exceptionStack()));
}
if (poller.pollin(1)) {
// Use worker address for LRU routing
var msg = ZMsg.recvMsg(backend);
if (msg == null)
break; // Interrupted
var address = msg.unwrap();
workerQueue.add(address);
// Forward message to client if it's not a READY
var frame = msg.first();
if (frame.streq(LRU_READY))
msg.destroy();
else
msg.send(frontend);
}
if (poller.pollin(2)) {
// Get client request, route to first available worker
var msg = ZMsg.recvMsg(frontend);
if (msg != null) {
msg.wrap(workerQueue.pop());
msg.send(backend);
}
}
}
// When we're done, clean up properly
for (f in workerQueue) {
f.destroy();
}
context.destroy();
}
}
spqueue: Simple Pirate queue in Java
package guide;
import java.util.ArrayList;
import org.zeromq.*;
import org.zeromq.ZMQ.Poller;
import org.zeromq.ZMQ.Socket;
//
// Simple Pirate queue
// This is identical to load-balancing pattern, with no reliability mechanisms
// at all. It depends on the client for recovery. Runs forever.
//
public class spqueue
{
private final static String WORKER_READY = "\001"; // Signals worker is ready
public static void main(String[] args)
{
try (ZContext ctx = new ZContext()) {
Socket frontend = ctx.createSocket(SocketType.ROUTER);
Socket backend = ctx.createSocket(SocketType.ROUTER);
frontend.bind("tcp://*:5555"); // For clients
backend.bind("tcp://*:5556"); // For workers
// Queue of available workers
ArrayList<ZFrame> workers = new ArrayList<ZFrame>();
Poller poller = ctx.createPoller(2);
poller.register(backend, Poller.POLLIN);
poller.register(frontend, Poller.POLLIN);
// The body of this example is exactly the same as lruqueue2.
while (true) {
boolean workersAvailable = workers.size() > 0;
int rc = poller.poll(-1);
// Poll frontend only if we have available workers
if (rc == -1)
break; // Interrupted
// Handle worker activity on backend
if (poller.pollin(0)) {
// Use worker address for LRU routing
ZMsg msg = ZMsg.recvMsg(backend);
if (msg == null)
break; // Interrupted
ZFrame address = msg.unwrap();
workers.add(address);
// Forward message to client if it's not a READY
ZFrame frame = msg.getFirst();
if (new String(frame.getData(), ZMQ.CHARSET).equals(WORKER_READY))
msg.destroy();
else msg.send(frontend);
}
if (workersAvailable && poller.pollin(1)) {
// Get client request, route to first available worker
ZMsg msg = ZMsg.recvMsg(frontend);
if (msg != null) {
msg.wrap(workers.remove(0));
msg.send(backend);
}
}
}
// When we're done, clean up properly
while (workers.size() > 0) {
ZFrame frame = workers.remove(0);
frame.destroy();
}
workers.clear();
}
}
}
spqueue: Simple Pirate queue in Lua
--
-- Simple Pirate queue
-- This is identical to the LRU pattern, with no reliability mechanisms
-- at all. It depends on the client for recovery. Runs forever.
--
-- Author: Robert G. Jakabosky <bobby@sharedrealm.com>
--
require"zmq"
require"zmq.poller"
require"zhelpers"
require"zmsg"
local tremove = table.remove
local MAX_WORKERS = 100
s_version_assert (2, 1)
-- Prepare our context and sockets
local context = zmq.init(1)
local frontend = context:socket(zmq.ROUTER)
local backend = context:socket(zmq.ROUTER)
frontend:bind("tcp://*:5555"); -- For clients
backend:bind("tcp://*:5556"); -- For workers
-- Queue of available workers
local worker_queue = {}
local is_accepting = false
local poller = zmq.poller(2)
local function frontend_cb()
-- Now get next client request, route to next worker
local msg = zmsg.recv (frontend)
-- Dequeue a worker from the queue.
local worker = tremove(worker_queue, 1)
msg:wrap(worker, "")
msg:send(backend)
if (#worker_queue == 0) then
-- stop accepting work from clients, when no workers are available.
poller:remove(frontend)
is_accepting = false
end
end
-- Handle worker activity on backend
poller:add(backend, zmq.POLLIN, function()
local msg = zmsg.recv(backend)
-- Use worker address for LRU routing
worker_queue[#worker_queue + 1] = msg:unwrap()
-- start accepting client requests, if we are not already doing so.
if not is_accepting then
is_accepting = true
poller:add(frontend, zmq.POLLIN, frontend_cb)
end
-- Forward message to client if it's not a READY
if (msg:address() ~= "READY") then
msg:send(frontend)
end
end)
-- start poller's event loop
poller:start()
-- We never exit the main loop
spqueue: Simple Pirate queue in PHP
<?php
/*
* Simple Pirate queue
* This is identical to the LRU pattern, with no reliability mechanisms
* at all. It depends on the client for recovery. Runs forever.
*
* @author Ian Barber <ian(dot)barber(at)gmail(dot)com>
*/
include 'zmsg.php';
define("MAX_WORKERS", 100);
// Prepare our context and sockets
$context = new ZMQContext();
$frontend = $context->getSocket(ZMQ::SOCKET_ROUTER);
$backend = $context->getSocket(ZMQ::SOCKET_ROUTER);
$frontend->bind("tcp://*:5555"); // For clients
$backend->bind("tcp://*:5556"); // For workers
// Queue of available workers
$available_workers = 0;
$worker_queue = array();
$read = $write = array();
while (true) {
$poll = new ZMQPoll();
$poll->add($backend, ZMQ::POLL_IN);
// Poll frontend only if we have available workers
if ($available_workers) {
$poll->add($frontend, ZMQ::POLL_IN);
}
$events = $poll->poll($read, $write);
foreach ($read as $socket) {
$zmsg = new Zmsg($socket);
$zmsg->recv();
// Handle worker activity on backend
if ($socket === $backend) {
// Use worker address for LRU routing
assert($available_workers < MAX_WORKERS);
array_push($worker_queue, $zmsg->unwrap());
$available_workers++;
// Return reply to client if it's not a READY
if ($zmsg->address() != "READY") {
$zmsg->set_socket($frontend)->send();
}
} elseif ($socket === $frontend) {
// Now get next client request, route to next worker
// REQ socket in worker needs an envelope delimiter
// Dequeue and drop the next worker address
$zmsg->wrap(array_shift($worker_queue), "");
$zmsg->set_socket($backend)->send();
$available_workers--;
}
}
// We never exit the main loop
}
spqueue: Simple Pirate queue in Python
#
# Simple Pirate queue
# This is identical to the LRU pattern, with no reliability mechanisms
# at all. It depends on the client for recovery. Runs forever.
#
# Author: Daniel Lundin <dln(at)eintr(dot)org>
#
import zmq
LRU_READY = b"\x01"
context = zmq.Context(1)
frontend = context.socket(zmq.ROUTER) # ROUTER
backend = context.socket(zmq.ROUTER) # ROUTER
frontend.bind("tcp://*:5555") # For clients
backend.bind("tcp://*:5556") # For workers
poll_workers = zmq.Poller()
poll_workers.register(backend, zmq.POLLIN)
poll_both = zmq.Poller()
poll_both.register(frontend, zmq.POLLIN)
poll_both.register(backend, zmq.POLLIN)
workers = []
while True:
if workers:
socks = dict(poll_both.poll())
else:
socks = dict(poll_workers.poll())
# Handle worker activity on backend
if socks.get(backend) == zmq.POLLIN:
# Use worker address for LRU routing
msg = backend.recv_multipart()
if not msg:
break
address = msg[0]
workers.append(address)
# Everything after the second (delimiter) frame is reply
reply = msg[2:]
# Forward message to client if it's not a READY
if reply[0] != LRU_READY:
frontend.send_multipart(reply)
if socks.get(frontend) == zmq.POLLIN:
# Get client request, route to first available worker
msg = frontend.recv_multipart()
request = [workers.pop(0), ''.encode()] + msg
backend.send_multipart(request)
spqueue: Simple Pirate queue in Tcl
#
# Simple Pirate queue
# This is identical to the LRU pattern, with no reliability mechanisms
# at all. It depends on the client for recovery. Runs forever.
#
package require zmq
set LRU_READY "READY" ;# Signals worker is ready
# Prepare our context and sockets
zmq context context
zmq socket frontend context ROUTER
zmq socket backend context ROUTER
frontend bind "tcp://*:5555" ;# For clients
backend bind "tcp://*:5556" ;# For workers
# Queue of available workers
set workers {}
while {1} {
if {[llength $workers]} {
set poll_set [list [list backend [list POLLIN]] [list frontend [list POLLIN]]]
} else {
set poll_set [list [list backend [list POLLIN]]]
}
set rpoll_set [zmq poll $poll_set -1]
foreach rpoll $rpoll_set {
switch [lindex $rpoll 0] {
backend {
# Use worker address for LRU routing
set msg [zmsg recv backend]
set address [zmsg unwrap msg]
lappend workers $address
# Forward message to client if it's not a READY
if {[lindex $msg 0] ne $LRU_READY} {
zmsg send frontend $msg
}
}
frontend {
# Get client request, route to first available worker
set msg [zmsg recv frontend]
set workers [lassign $workers worker]
set msg [zmsg wrap $msg $worker]
zmsg send backend $msg
}
}
}
}
frontend close
backend close
context term
Here is the worker, which takes the Lazy Pirate server and adapts it for the load balancing pattern (using the REQ “ready” signaling):
spworker: Simple Pirate worker in C
// Simple Pirate worker
// Connects REQ socket to tcp://*:5556
// Implements worker part of load-balancing
#include "czmq.h"
#define WORKER_READY "\001" // Signals worker is ready
int main (void)
{
zctx_t *ctx = zctx_new ();
void *worker = zsocket_new (ctx, ZMQ_REQ);
// Set random identity to make tracing easier
srandom ((unsigned) time (NULL));
char identity [10];
sprintf (identity, "%04X-%04X", randof (0x10000), randof (0x10000));
zmq_setsockopt (worker, ZMQ_IDENTITY, identity, strlen (identity));
zsocket_connect (worker, "tcp://localhost:5556");
// Tell broker we're ready for work
printf ("I: (%s) worker ready\n", identity);
zframe_t *frame = zframe_new (WORKER_READY, 1);
zframe_send (&frame, worker, 0);
int cycles = 0;
while (true) {
zmsg_t *msg = zmsg_recv (worker);
if (!msg)
break; // Interrupted
// Simulate various problems, after a few cycles
cycles++;
if (cycles > 3 && randof (5) == 0) {
printf ("I: (%s) simulating a crash\n", identity);
zmsg_destroy (&msg);
break;
}
else
if (cycles > 3 && randof (5) == 0) {
printf ("I: (%s) simulating CPU overload\n", identity);
sleep (3);
if (zctx_interrupted)
break;
}
printf ("I: (%s) normal reply\n", identity);
sleep (1); // Do some heavy work
zmsg_send (&msg, worker);
}
zctx_destroy (&ctx);
return 0;
}
spworker: Simple Pirate worker in C++
//
// Simple Pirate worker
// Connects REQ socket to tcp://*:5556
// Implements worker part of LRU queueing
//
// Andreas Hoelzlwimmer <andreas.hoelzlwimmer@fh-hagenberg.at>
#include "zmsg.hpp"
int main (void)
{
srandom ((unsigned) time (NULL));
zmq::context_t context(1);
zmq::socket_t worker(context, ZMQ_REQ);
// Set random identity to make tracing easier
std::string identity = s_set_id(worker);
worker.connect("tcp://localhost:5556");
// Tell queue we're ready for work
std::cout << "I: (" << identity << ") worker ready" << std::endl;
s_send (worker, std::string("READY"));
int cycles = 0;
while (1) {
zmsg zm (worker);
// Simulate various problems, after a few cycles
cycles++;
if (cycles > 3 && within (5) == 0) {
std::cout << "I: (" << identity << ") simulating a crash" << std::endl;
zm.clear ();
break;
}
else
if (cycles > 3 && within (5) == 0) {
std::cout << "I: (" << identity << ") simulating CPU overload" << std::endl;
sleep (5);
}
std::cout << "I: (" << identity << ") normal reply - " << zm.body () << std::endl;
sleep (1); // Do some heavy work
zm.send(worker);
}
return 0;
}
spworker: Simple Pirate worker in Delphi
program spworker;
//
// Simple Pirate worker
// Connects REQ socket to tcp://*:5556
// Implements worker part of load-balancing
// @author Varga Balazs <bb.varga@gmail.com>
//
{$APPTYPE CONSOLE}
uses
SysUtils
, zmqapi
, zhelpers
;
const
WORKER_READY = '\001'; // Signals worker is ready
var
ctx: TZMQContext;
worker: TZMQSocket;
identity: String;
frame: TZMQFrame;
cycles: Integer;
msg: TZMQMsg;
begin
ctx := TZMQContext.create;
worker := ctx.Socket( stReq );
// Set random identity to make tracing easier
identity := s_random( 8 );
worker.Identity := identity;
worker.connect( 'tcp://localhost:5556' );
// Tell broker we're ready for work
Writeln( Format( 'I: (%s) worker ready', [identity] ) );
frame := TZMQFrame.create;
frame.asUtf8String := WORKER_READY;
worker.send( frame );
cycles := 0;
while not ctx.Terminated do
try
worker.recv( msg );
// Simulate various problems, after a few cycles
Inc( cycles );
if ((cycles > 3) and (random(5) = 0)) then
begin
Writeln( Format( 'I: (%s) simulating a crash', [identity] ) );
msg.Free;
msg := nil;
break;
end else
if ( (cycles > 3) and (random(5) = 0) ) then
begin
Writeln( Format( 'I: (%s) simulating CPU overload', [identity] ));
sleep (3000);
end;
Writeln( Format('I: (%s) normal reply', [identity]) );
sleep(1000); // Do some heavy work
worker.send( msg );
except
end;
ctx.Free;
end.
spworker: Simple Pirate worker in Go
// Simple Pirate worker
// Connects REQ socket to tcp://*:5556
// Implements worker part of load-balancing
//
// Author: iano <scaly.iano@gmail.com>
// Based on C & Python example
package main
import (
"fmt"
zmq "github.com/alecthomas/gozmq"
"math/rand"
"time"
)
const LRU_READY = "\001"
func main() {
context, _ := zmq.NewContext()
defer context.Close()
worker, _ := context.NewSocket(zmq.REQ)
defer worker.Close()
// Set random identity to make tracing easier
src := rand.NewSource(time.Now().UnixNano())
random := rand.New(src)
identity := fmt.Sprintf("%04X-%04X", random.Intn(0x10000), random.Intn(0x10000))
worker.SetIdentity(identity)
worker.Connect("tcp://localhost:5556")
// Tell broker we're ready for work
fmt.Printf("I: (%s) worker ready\n", identity)
worker.Send([]byte(LRU_READY), 0)
for cycles := 1; ; cycles++ {
msg, err := worker.RecvMultipart(0)
if err != nil {
panic(err) // Interrupted
}
if cycles > 3 {
switch r := random.Intn(5); r {
case 0:
fmt.Printf("I: (%s) simulating a crash\n", identity)
return
case 1:
fmt.Printf("I: (%s) simulating CPU overload\n", identity)
time.Sleep(3 * time.Second)
}
}
fmt.Printf("I: (%s) normal reply\n", identity)
time.Sleep(1 * time.Second) // Do some heavy work
worker.SendMultipart(msg, 0)
}
}
spworker: Simple Pirate worker in Haskell
{--
Simple Pirate worker in Haskell
--}
module Main where
import System.ZMQ4.Monadic
import ZHelpers
import System.Random (randomRIO)
import System.Exit (exitSuccess)
import Control.Monad (when)
import Control.Concurrent (threadDelay)
import Control.Applicative ((<$>))
import Data.ByteString.Char8 (pack, unpack, empty)
workerReady = "\001"
main :: IO ()
main =
runZMQ $ do
worker <- socket Req
setRandomIdentity worker
connect worker "tcp://localhost:5556"
id <- identity worker
liftIO $ putStrLn $ "I: Worker ready " ++ unpack id
send worker [SendMore] empty
send worker [SendMore] empty
send worker [] (pack workerReady)
sendRequests worker 1
sendRequests :: Socket z Req -> Int -> ZMQ z ()
sendRequests worker cycles = do
clID <- receive worker
msg <- (receive worker >> receive worker)
chance <- liftIO $ randomRIO (0::Int, 5)
id <- identity worker
if cycles > 3 && chance == 0
then do
liftIO $ putStrLn $ "I: Simulating a crash " ++ unpack id
liftIO $ exitSuccess
else do
chance' <- liftIO $ randomRIO (0::Int, 5)
when (cycles > 3 && chance' == 0) $ do
liftIO $ putStrLn $ "I: Simulating overload " ++ unpack id
liftIO $ threadDelay $ 3 * 1000 * 1000
liftIO $ putStrLn $ "I: Normal reply " ++ unpack id
liftIO $ threadDelay $ 1 * 1000 * 1000
send worker [SendMore] clID
send worker [SendMore] (pack "")
send worker [] msg
sendRequests worker (cycles+1)
spworker: Simple Pirate worker in Haxe
package ;
import neko.Lib;
import neko.Sys;
import org.zeromq.ZContext;
import org.zeromq.ZFrame;
import org.zeromq.ZMQ;
import org.zeromq.ZMQSocket;
import org.zeromq.ZMsg;
/**
* Simple Pirate worker
* Connects REQ socket to tcp://*:5556
* Implements worker part of LRU queuing
* @see http://zguide.zeromq.org/page:all#Basic-Reliable-Queuing-Simple-Pirate-Pattern
*/
class SPWorker
{
// Signals workers are ready
private static inline var LRU_READY:String = String.fromCharCode(1);
public static function main() {
Lib.println("** SPWorker (see: http://zguide.zeromq.org/page:all#Basic-Reliable-Queuing-Simple-Pirate-Pattern)");
var ctx = new ZContext();
var worker = ctx.createSocket(ZMQ_REQ);
// Set random identity to make tracing easier
var identity = ZHelpers.setID(worker);
worker.connect("tcp://localhost:5556");
// Tell broker we're ready for work
Lib.println("I: (" + identity + ") worker ready");
ZFrame.newStringFrame(LRU_READY).send(worker);
var cycles = 0;
while (true) {
var msg = ZMsg.recvMsg(worker);
if (msg == null)
break; // Interrupted
cycles++;
// Simulate various problems, after a few cycles
if (cycles > 3 && ZHelpers.randof(5) == 0) {
Lib.println("I: simulating a crash");
break;
}
else if (cycles > 3 && ZHelpers.randof(5) == 0) {
Lib.println("I: simulating CPU overload");
Sys.sleep(3.0);
if (ZMQ.isInterrupted())
break;
}
Lib.println("I: ("+identity+") normal reply");
Sys.sleep(1.0); // Do some heavy work
msg.send(worker);
}
ctx.destroy();
}
}
spworker: Simple Pirate worker in Java
package guide;
import java.util.Random;
import org.zeromq.*;
import org.zeromq.ZMQ.Socket;
//
// Simple Pirate worker
// Connects REQ socket to tcp://*:5556
// Implements worker part of load-balancing queueing
//
public class spworker
{
private final static String WORKER_READY = "\001"; // Signals worker is ready
public static void main(String[] args) throws Exception
{
try (ZContext ctx = new ZContext()) {
Socket worker = ctx.createSocket(SocketType.REQ);
// Set random identity to make tracing easier
Random rand = new Random(System.nanoTime());
String identity = String.format(
"%04X-%04X", rand.nextInt(0x10000), rand.nextInt(0x10000)
);
worker.setIdentity(identity.getBytes(ZMQ.CHARSET));
worker.connect("tcp://localhost:5556");
// Tell broker we're ready for work
System.out.printf("I: (%s) worker ready\n", identity);
ZFrame frame = new ZFrame(WORKER_READY);
frame.send(worker, 0);
int cycles = 0;
while (true) {
ZMsg msg = ZMsg.recvMsg(worker);
if (msg == null)
break; // Interrupted
// Simulate various problems, after a few cycles
cycles++;
if (cycles > 3 && rand.nextInt(5) == 0) {
System.out.printf("I: (%s) simulating a crash\n", identity);
msg.destroy();
break;
}
else if (cycles > 3 && rand.nextInt(5) == 0) {
System.out.printf(
"I: (%s) simulating CPU overload\n", identity
);
Thread.sleep(3000);
}
System.out.printf("I: (%s) normal reply\n", identity);
Thread.sleep(1000); // Do some heavy work
msg.send(worker);
}
}
}
}
spworker: Simple Pirate worker in Lua
--
-- Simple Pirate worker
-- Connects REQ socket to tcp://*:5556
-- Implements worker part of LRU queueing
--
-- Author: Robert G. Jakabosky <bobby@sharedrealm.com>
--
require"zmq"
require"zmsg"
math.randomseed(os.time())
local context = zmq.init(1)
local worker = context:socket(zmq.REQ)
-- Set random identity to make tracing easier
local identity = string.format("%04X-%04X", randof (0x10000), randof (0x10000))
worker:setopt(zmq.IDENTITY, identity)
worker:connect("tcp://localhost:5556")
-- Tell queue we're ready for work
printf ("I: (%s) worker ready\n", identity)
worker:send("READY")
local cycles = 0
while true do
local msg = zmsg.recv (worker)
-- Simulate various problems, after a few cycles
cycles = cycles + 1
if (cycles > 3 and randof (5) == 0) then
printf ("I: (%s) simulating a crash\n", identity)
break
elseif (cycles > 3 and randof (5) == 0) then
printf ("I: (%s) simulating CPU overload\n", identity)
s_sleep (5000)
end
printf ("I: (%s) normal reply - %s\n",
identity, msg:body())
s_sleep (1000) -- Do some heavy work
msg:send(worker)
end
worker:close()
context:term()
spworker: Simple Pirate worker in PHP
<?php
/*
* Simple Pirate worker
* Connects REQ socket to tcp://*:5556
* Implements worker part of LRU queueing
*
* @author Ian Barber <ian(dot)barber(at)gmail(dot)com>
*/
include 'zmsg.php';
$context = new ZMQContext();
$worker = new ZMQSocket($context, ZMQ::SOCKET_REQ);
// Set random identity to make tracing easier
$identity = sprintf ("%04X-%04X", rand(0, 0x10000), rand(0, 0x10000));
$worker->setSockOpt(ZMQ::SOCKOPT_IDENTITY, $identity);
$worker->connect("tcp://localhost:5556");
// Tell queue we're ready for work
printf ("I: (%s) worker ready%s", $identity, PHP_EOL);
$worker->send("READY");
$cycles = 0;
while (true) {
$zmsg = new Zmsg($worker);
$zmsg->recv();
$cycles++;
// Simulate various problems, after a few cycles
if ($cycles > 3 && rand(0, 3) == 0) {
printf ("I: (%s) simulating a crash%s", $identity, PHP_EOL);
break;
} elseif ($cycles > 3 && rand(0, 3) == 0) {
printf ("I: (%s) simulating CPU overload%s", $identity, PHP_EOL);
sleep(5);
}
printf ("I: (%s) normal reply - %s%s", $identity, $zmsg->body(), PHP_EOL);
sleep(1); // Do some heavy work
$zmsg->send();
}
spworker: Simple Pirate worker in Python
#
# Simple Pirate worker
# Connects REQ socket to tcp://*:5556
# Implements worker part of LRU queueing
#
# Author: Daniel Lundin <dln(at)eintr(dot)org>
from random import randint
import time
import zmq
LRU_READY = "\x01"
context = zmq.Context(1)
worker = context.socket(zmq.REQ)
identity = "%04X-%04X" % (randint(0, 0x10000), randint(0,0x10000))
worker.setsockopt_string(zmq.IDENTITY, identity)
worker.connect("tcp://localhost:5556")
print("I: (%s) worker ready" % identity)
worker.send_string(LRU_READY)
cycles = 0
while True:
msg = worker.recv_multipart()
if not msg:
break
cycles += 1
if cycles > 3 and randint(0, 5) == 0:
print("I: (%s) simulating a crash" % identity)
break
elif cycles>3 and randint(0, 5) == 0:
print("I: (%s) simulating CPU overload" % identity)
time.sleep(3)
print("I: (%s) normal reply" % identity)
time.sleep(1) # Do some heavy work
worker.send_multipart(msg)
spworker: Simple Pirate worker in Tcl
#
# Simple Pirate worker
# Connects REQ socket to tcp://*:5556
# Implements worker part of LRU queueing
#
package require zmq
set LRU_READY "READY" ;# Signals worker is ready
expr {srand([pid])}
zmq context context
zmq socket worker context REQ
# Set random identity to make tracing easier
set identity [format "%04X-%04X" [expr {int(rand()*0x10000)}] [expr {int(rand()*0x10000)}]]
worker setsockopt IDENTITY $identity
worker connect "tcp://localhost:5556"
# Tell broker we're ready for work
puts "I: ($identity) worker ready"
worker send $LRU_READY
set cycles 0
while {1} {
set msg [zmsg recv worker]
# Simulate various problems, after a few cycles
incr cycles
if {$cycles > 3 && [expr {int(rand()*5)}] == 0} {
puts "I: ($identity) simulating a crash"
break
} elseif {$cycles > 3 && [expr {int(rand()*5)}] == 0} {
puts "I: ($identity) simulating CPU overload"
after 3000
}
puts "I: ($identity) normal reply"
after 1000 ;# Do some heavy work
zmsg send worker $msg
}
worker close
context term
To test this, start a handful of workers, a Lazy Pirate client, and the queue, in any order. You’ll see that the workers eventually all crash and burn, and the client retries and then gives up. The queue never stops, and you can restart workers and clients ad nauseam. This model works with any number of clients and workers.
Robust Reliable Queuing (Paranoid Pirate Pattern) #
The Simple Pirate Queue pattern works pretty well, especially because it’s just a combination of two existing patterns. Still, it does have some weaknesses:
-
It’s not robust in the face of a queue crash and restart. The client will recover, but the workers won’t. While ZeroMQ will reconnect workers’ sockets automatically, as far as the newly started queue is concerned, the workers haven’t signaled ready, so don’t exist. To fix this, we have to do heartbeating from queue to worker so that the worker can detect when the queue has gone away.
-
The queue does not detect worker failure, so if a worker dies while idle, the queue can’t remove it from its worker queue until the queue sends it a request. The client waits and retries for nothing. It’s not a critical problem, but it’s not nice. To make this work properly, we do heartbeating from worker to queue, so that the queue can detect a lost worker at any stage.
We’ll fix these in a properly pedantic Paranoid Pirate Pattern.
We previously used a REQ socket for the worker. For the Paranoid Pirate worker, we’ll switch to a DEALER socket. This has the advantage of letting us send and receive messages at any time, rather than the lock-step send/receive that REQ imposes. The downside of DEALER is that we have to do our own envelope management (re-read Chapter 3 - Advanced Request-Reply Patterns for background on this concept).
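Concretely, a REQ socket adds the empty delimiter frame to every request and strips it from every reply behind the scenes; with DEALER we do that bookkeeping ourselves. A rough czmq sketch of the idea (the "Hello" payload and the worker socket are placeholders, not part of the examples that follow):
//  Rough sketch: emulating REQ's envelope handling on a DEALER socket.
//  'worker' is assumed to be a connected ZMQ_DEALER socket.
zmsg_t *request = zmsg_new ();
zmsg_pushstr (request, "Hello");        //  Request body
zmsg_pushstr (request, "");             //  Empty delimiter frame, as REQ would add
zmsg_send (&request, worker);

zmsg_t *reply = zmsg_recv (worker);
free (zmsg_popstr (reply));             //  Strip the empty delimiter frame again
//  ... remaining frames are the reply body ...
zmsg_destroy (&reply);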
We’re still using the Lazy Pirate client. Here is the Paranoid Pirate queue proxy:
ppqueue: Paranoid Pirate queue in C
// Paranoid Pirate queue
#include "czmq.h"
#define HEARTBEAT_LIVENESS 3 // 3-5 is reasonable
#define HEARTBEAT_INTERVAL 1000 // msecs
// Paranoid Pirate Protocol constants
#define PPP_READY "\001" // Signals worker is ready
#define PPP_HEARTBEAT "\002" // Signals worker heartbeat
// .split worker class structure
// Here we define the worker class; a structure and a set of functions that
// act as constructor, destructor, and methods on worker objects:
typedef struct {
zframe_t *identity; // Identity of worker
char *id_string; // Printable identity
int64_t expiry; // Expires at this time
} worker_t;
// Construct new worker
static worker_t *
s_worker_new (zframe_t *identity)
{
worker_t *self = (worker_t *) zmalloc (sizeof (worker_t));
self->identity = identity;
self->id_string = zframe_strhex (identity);
self->expiry = zclock_time ()
+ HEARTBEAT_INTERVAL * HEARTBEAT_LIVENESS;
return self;
}
// Destroy specified worker object, including identity frame.
static void
s_worker_destroy (worker_t **self_p)
{
assert (self_p);
if (*self_p) {
worker_t *self = *self_p;
zframe_destroy (&self->identity);
free (self->id_string);
free (self);
*self_p = NULL;
}
}
// .split worker ready method
// The ready method puts a worker to the end of the ready list:
static void
s_worker_ready (worker_t *self, zlist_t *workers)
{
worker_t *worker = (worker_t *) zlist_first (workers);
while (worker) {
if (streq (self->id_string, worker->id_string)) {
zlist_remove (workers, worker);
s_worker_destroy (&worker);
break;
}
worker = (worker_t *) zlist_next (workers);
}
zlist_append (workers, self);
}
// .split get next available worker
// The next method returns the next available worker identity:
static zframe_t *
s_workers_next (zlist_t *workers)
{
worker_t *worker = zlist_pop (workers);
assert (worker);
zframe_t *frame = worker->identity;
worker->identity = NULL;
s_worker_destroy (&worker);
return frame;
}
// .split purge expired workers
// The purge method looks for and kills expired workers. We hold workers
// from oldest to most recent, so we stop at the first alive worker:
static void
s_workers_purge (zlist_t *workers)
{
worker_t *worker = (worker_t *) zlist_first (workers);
while (worker) {
if (zclock_time () < worker->expiry)
break; // Worker is alive, we're done here
zlist_remove (workers, worker);
s_worker_destroy (&worker);
worker = (worker_t *) zlist_first (workers);
}
}
// .split main task
// The main task is a load-balancer with heartbeating on workers so we
// can detect crashed or blocked worker tasks:
int main (void)
{
zctx_t *ctx = zctx_new ();
void *frontend = zsocket_new (ctx, ZMQ_ROUTER);
void *backend = zsocket_new (ctx, ZMQ_ROUTER);
zsocket_bind (frontend, "tcp://*:5555"); // For clients
zsocket_bind (backend, "tcp://*:5556"); // For workers
// List of available workers
zlist_t *workers = zlist_new ();
// Send out heartbeats at regular intervals
uint64_t heartbeat_at = zclock_time () + HEARTBEAT_INTERVAL;
while (true) {
zmq_pollitem_t items [] = {
{ backend, 0, ZMQ_POLLIN, 0 },
{ frontend, 0, ZMQ_POLLIN, 0 }
};
// Poll frontend only if we have available workers
int rc = zmq_poll (items, zlist_size (workers)? 2: 1,
HEARTBEAT_INTERVAL * ZMQ_POLL_MSEC);
if (rc == -1)
break; // Interrupted
// Handle worker activity on backend
if (items [0].revents & ZMQ_POLLIN) {
// Use worker identity for load-balancing
zmsg_t *msg = zmsg_recv (backend);
if (!msg)
break; // Interrupted
// Any sign of life from worker means it's ready
zframe_t *identity = zmsg_unwrap (msg);
worker_t *worker = s_worker_new (identity);
s_worker_ready (worker, workers);
// Validate control message, or return reply to client
if (zmsg_size (msg) == 1) {
zframe_t *frame = zmsg_first (msg);
if (memcmp (zframe_data (frame), PPP_READY, 1)
&& memcmp (zframe_data (frame), PPP_HEARTBEAT, 1)) {
printf ("E: invalid message from worker");
zmsg_dump (msg);
}
zmsg_destroy (&msg);
}
else
zmsg_send (&msg, frontend);
}
if (items [1].revents & ZMQ_POLLIN) {
// Now get next client request, route to next worker
zmsg_t *msg = zmsg_recv (frontend);
if (!msg)
break; // Interrupted
zframe_t *identity = s_workers_next (workers);
zmsg_prepend (msg, &identity);
zmsg_send (&msg, backend);
}
// .split handle heartbeating
// We handle heartbeating after any socket activity. First, we send
// heartbeats to any idle workers if it's time. Then, we purge any
// dead workers:
if (zclock_time () >= heartbeat_at) {
worker_t *worker = (worker_t *) zlist_first (workers);
while (worker) {
zframe_send (&worker->identity, backend,
ZFRAME_REUSE + ZFRAME_MORE);
zframe_t *frame = zframe_new (PPP_HEARTBEAT, 1);
zframe_send (&frame, backend, 0);
worker = (worker_t *) zlist_next (workers);
}
heartbeat_at = zclock_time () + HEARTBEAT_INTERVAL;
}
s_workers_purge (workers);
}
// When we're done, clean up properly
while (zlist_size (workers)) {
worker_t *worker = (worker_t *) zlist_pop (workers);
s_worker_destroy (&worker);
}
zlist_destroy (&workers);
zctx_destroy (&ctx);
return 0;
}
ppqueue: Paranoid Pirate queue in C++
//
// Paranoid Pirate queue
//
// Andreas Hoelzlwimmer <andreas.hoelzlwimmer@fh-hagenberg.at>
//
#include "zmsg.hpp"
#include <stdint.h>
#include <vector>
#define HEARTBEAT_LIVENESS 3 // 3-5 is reasonable
#define HEARTBEAT_INTERVAL 1000 // msecs
// This defines one active worker in our worker queue
typedef struct {
std::string identity; // Address of worker
int64_t expiry; // Expires at this time
} worker_t;
// Insert worker at end of queue, reset expiry
// Worker must not already be in queue
static void
s_worker_append (std::vector<worker_t> &queue, std::string &identity)
{
bool found = false;
for (std::vector<worker_t>::iterator it = queue.begin(); it < queue.end(); it++) {
if (it->identity.compare(identity) == 0) {
std::cout << "E: duplicate worker identity " << identity.c_str() << std::endl;
found = true;
break;
}
}
if (!found) {
worker_t worker;
worker.identity = identity;
worker.expiry = s_clock() + HEARTBEAT_INTERVAL * HEARTBEAT_LIVENESS;
queue.push_back(worker);
}
}
// Remove worker from queue, if present
static void
s_worker_delete (std::vector<worker_t> &queue, std::string &identity)
{
for (std::vector<worker_t>::iterator it = queue.begin(); it < queue.end(); it++) {
if (it->identity.compare(identity) == 0) {
it = queue.erase(it);
break;
}
}
}
// Reset worker expiry, worker must be present
static void
s_worker_refresh (std::vector<worker_t> &queue, std::string &identity)
{
bool found = false;
for (std::vector<worker_t>::iterator it = queue.begin(); it < queue.end(); it++) {
if (it->identity.compare(identity) == 0) {
it->expiry = s_clock ()
+ HEARTBEAT_INTERVAL * HEARTBEAT_LIVENESS;
found = true;
break;
}
}
if (!found) {
std::cout << "E: worker " << identity << " not ready" << std::endl;
}
}
// Pop next available worker off queue, return identity
static std::string
s_worker_dequeue (std::vector<worker_t> &queue)
{
assert (queue.size());
std::string identity = queue[0].identity;
queue.erase(queue.begin());
return identity;
}
// Look for & kill expired workers
static void
s_queue_purge (std::vector<worker_t> &queue)
{
int64_t clock = s_clock();
for (std::vector<worker_t>::iterator it = queue.begin(); it < queue.end(); it++) {
if (clock > it->expiry) {
it = queue.erase(it)-1;
}
}
}
int main (void)
{
s_version_assert (4, 0);
// Prepare our context and sockets
zmq::context_t context(1);
zmq::socket_t frontend(context, ZMQ_ROUTER);
zmq::socket_t backend (context, ZMQ_ROUTER);
frontend.bind("tcp://*:5555"); // For clients
backend.bind ("tcp://*:5556"); // For workers
// Queue of available workers
std::vector<worker_t> queue;
// Send out heartbeats at regular intervals
int64_t heartbeat_at = s_clock () + HEARTBEAT_INTERVAL;
while (1) {
zmq::pollitem_t items [] = {
{ backend, 0, ZMQ_POLLIN, 0 },
{ frontend, 0, ZMQ_POLLIN, 0 }
};
// Poll frontend only if we have available workers
if (queue.size()) {
zmq::poll (items, 2, HEARTBEAT_INTERVAL);
} else {
zmq::poll (items, 1, HEARTBEAT_INTERVAL);
}
// Handle worker activity on backend
if (items [0].revents & ZMQ_POLLIN) {
zmsg msg (backend);
std::string identity(msg.unwrap ());
// Return reply to client if it's not a control message
if (msg.parts () == 1) {
if (strcmp (msg.address (), "READY") == 0) {
s_worker_delete (queue, identity);
s_worker_append (queue, identity);
}
else {
if (strcmp (msg.address (), "HEARTBEAT") == 0) {
s_worker_refresh (queue, identity);
} else {
std::cout << "E: invalid message from " << identity << std::endl;
msg.dump ();
}
}
}
else {
msg.send (frontend);
s_worker_append (queue, identity);
}
}
if (items [1].revents & ZMQ_POLLIN) {
// Now get next client request, route to next worker
zmsg msg (frontend);
std::string identity = std::string(s_worker_dequeue (queue));
msg.push_front((char*)identity.c_str());
msg.send (backend);
}
// Send heartbeats to idle workers if it's time
if (s_clock () > heartbeat_at) {
for (std::vector<worker_t>::iterator it = queue.begin(); it < queue.end(); it++) {
zmsg msg ("HEARTBEAT");
msg.wrap (it->identity.c_str(), NULL);
msg.send (backend);
}
heartbeat_at = s_clock () + HEARTBEAT_INTERVAL;
}
s_queue_purge(queue);
}
// We never exit the main loop
// But pretend to do the right shutdown anyhow
queue.clear();
return 0;
}
ppqueue: Paranoid Pirate queue in Go
// Paranoid Pirate queue
//
// Author: iano <scaly.iano@gmail.com>
// Based on C & Python example
package main
import (
"container/list"
"fmt"
zmq "github.com/alecthomas/gozmq"
"time"
)
const (
HEARTBEAT_LIVENESS = 3           // 3-5 is reasonable
HEARTBEAT_INTERVAL = time.Second // time.Duration
// Paranoid Pirate Protocol constants
PPP_READY = "\001" // Signals worker is ready
PPP_HEARTBEAT = "\002" // Signals worker heartbeat
)
type PPWorker struct {
address []byte // Address of worker
expiry time.Time // Expires at this time
}
func NewPPWorker(address []byte) *PPWorker {
return &PPWorker{
address: address,
expiry: time.Now().Add(HEARTBEAT_LIVENESS * HEARTBEAT_INTERVAL),
}
}
type WorkerQueue struct {
queue *list.List
}
func NewWorkerQueue() *WorkerQueue {
return &WorkerQueue{
queue: list.New(),
}
}
func (workers *WorkerQueue) Len() int {
return workers.queue.Len()
}
func (workers *WorkerQueue) Next() []byte {
elem := workers.queue.Back()
worker, _ := elem.Value.(*PPWorker)
workers.queue.Remove(elem)
return worker.address
}
func (workers *WorkerQueue) Ready(worker *PPWorker) {
for elem := workers.queue.Front(); elem != nil; elem = elem.Next() {
if w, _ := elem.Value.(*PPWorker); string(w.address) == string(worker.address) {
workers.queue.Remove(elem)
break
}
}
workers.queue.PushBack(worker)
}
func (workers *WorkerQueue) Purge() {
now := time.Now()
for elem := workers.queue.Front(); elem != nil; elem = workers.queue.Front() {
if w, _ := elem.Value.(*PPWorker); w.expiry.After(now) {
break
}
workers.queue.Remove(elem)
}
}
func main() {
context, _ := zmq.NewContext()
defer context.Close()
frontend, _ := context.NewSocket(zmq.ROUTER)
defer frontend.Close()
frontend.Bind("tcp://*:5555") // For clients
backend, _ := context.NewSocket(zmq.ROUTER)
defer backend.Close()
backend.Bind("tcp://*:5556") // For workers
workers := NewWorkerQueue()
heartbeatAt := time.Now().Add(HEARTBEAT_INTERVAL)
for {
items := zmq.PollItems{
zmq.PollItem{Socket: backend, Events: zmq.POLLIN},
zmq.PollItem{Socket: frontend, Events: zmq.POLLIN},
}
// Poll frontend only if we have available workers
if workers.Len() > 0 {
zmq.Poll(items, HEARTBEAT_INTERVAL)
} else {
zmq.Poll(items[:1], HEARTBEAT_INTERVAL)
}
// Handle worker activity on backend
if items[0].REvents&zmq.POLLIN != 0 {
frames, err := backend.RecvMultipart(0)
if err != nil {
panic(err) // Interrupted
}
address := frames[0]
workers.Ready(NewPPWorker(address))
// Validate control message, or return reply to client
if msg := frames[1:]; len(msg) == 1 {
switch status := string(msg[0]); status {
case PPP_READY:
fmt.Println("I: PPWorker ready")
case PPP_HEARTBEAT:
fmt.Println("I: PPWorker heartbeat")
default:
fmt.Println("E: Invalid message from worker: ", msg)
}
} else {
frontend.SendMultipart(msg, 0)
}
}
if items[1].REvents&zmq.POLLIN != 0 {
// Now get next client request, route to next worker
frames, err := frontend.RecvMultipart(0)
if err != nil {
panic(err)
}
frames = append([][]byte{workers.Next()}, frames...)
backend.SendMultipart(frames, 0)
}
// .split handle heartbeating
// We handle heartbeating after any socket activity. First we send
// heartbeats to any idle workers if it's time. Then we purge any
// dead workers:
if heartbeatAt.Before(time.Now()) {
for elem := workers.queue.Front(); elem != nil; elem = elem.Next() {
w, _ := elem.Value.(*PPWorker)
msg := [][]byte{w.address, []byte(PPP_HEARTBEAT)}
backend.SendMultipart(msg, 0)
}
heartbeatAt = time.Now().Add(HEARTBEAT_INTERVAL)
}
workers.Purge()
}
}
ppqueue: Paranoid Pirate queue in Haskell
{-
Paranoid Pirate Pattern queue in Haskell.
Uses heartbeating to detect crashed or blocked workers.
-}
module Main where
import System.ZMQ4.Monadic
import ZHelpers
import Control.Monad (when, forM_)
import Control.Applicative ((<$>))
import System.IO (hSetEncoding, stdout, utf8)
import Data.ByteString.Char8 (pack, unpack, empty)
import qualified Data.List.NonEmpty as N
type SockID = String
data Worker = Worker {
sockID :: SockID
, expiry :: Integer
} deriving (Show)
heartbeatLiveness = 3
heartbeatInterval_ms = 1000
pppReady = "\001"
pppHeartbeat = "\002"
main :: IO ()
main =
runZMQ $ do
frontend <- socket Router
bind frontend "tcp://*:5555"
backend <- socket Router
bind backend "tcp://*:5556"
liftIO $ hSetEncoding stdout utf8
heartbeat_at <- liftIO $ nextHeartbeatTime_ms heartbeatInterval_ms
pollPeers frontend backend [] heartbeat_at
createWorker :: SockID -> IO Worker
createWorker id = do
currTime <- currentTime_ms
let expiry = currTime + heartbeatInterval_ms * heartbeatLiveness
return (Worker id expiry)
pollPeers :: Socket z Router -> Socket z Router -> [Worker] -> Integer -> ZMQ z ()
pollPeers frontend backend workers heartbeat_at = do
let toPoll = getPollList workers
evts <- poll (fromInteger heartbeatInterval_ms) toPoll
workers' <- getBackend backend frontend evts workers
workers'' <- getFrontend frontend backend evts workers'
newHeartbeatAt <- heartbeat backend workers'' heartbeat_at
workersPurged <- purge workers''
pollPeers frontend backend workersPurged newHeartbeatAt
where getPollList [] = [Sock backend [In] Nothing]
getPollList _ = [Sock backend [In] Nothing, Sock frontend [In] Nothing]
getBackend :: Socket z Router -> Socket z Router ->
[[Event]] -> [Worker] -> ZMQ z ([Worker])
getBackend backend frontend evts workers =
if (In `elem` (evts !! 0))
then do
frame <- receiveMulti backend
let wkrID = frame !! 0
msg = frame !! 1
if ((length frame) == 2) -- PPP message
then when (unpack msg `notElem` [pppReady, pppHeartbeat]) $ do
liftIO $ putStrLn $ "E: Invalid message from worker " ++ (unpack msg)
else do -- Route the message to the client
liftIO $ putStrLn "I: Sending normal message to client"
let id = frame !! 1
msg = frame !! 3
send frontend [SendMore] id
send frontend [SendMore] empty
send frontend [] msg
newWorker <- liftIO $ createWorker $ unpack wkrID
return $ workers ++ [newWorker]
else return workers
getFrontend :: Socket z Router -> Socket z Router ->
[[Event]] -> [Worker] -> ZMQ z ([Worker])
getFrontend frontend backend evts workers =
if (length evts > 1 && In `elem` (evts !! 1))
then do -- Route message to workers
frame <- receiveMulti frontend
let wkrID = sockID . head $ workers
send backend [SendMore] (pack wkrID)
send backend [SendMore] empty
sendMulti backend (N.fromList frame)
return $ tail workers
else return workers
heartbeat :: Socket z Router -> [Worker] -> Integer -> ZMQ z Integer
heartbeat backend workers heartbeat_at = do
currTime <- liftIO currentTime_ms
if (currTime >= heartbeat_at)
then do
forM_ workers (\worker -> do
send backend [SendMore] (pack $ sockID worker)
send backend [SendMore] empty
send backend [] (pack pppHeartbeat)
liftIO $ putStrLn $ "I: sending heartbeat to '" ++ (sockID worker) ++ "'")
liftIO $ nextHeartbeatTime_ms heartbeatInterval_ms
else return heartbeat_at
purge :: [Worker] -> ZMQ z ([Worker])
purge workers = do
currTime <- liftIO currentTime_ms
return $ filter (\wkr -> expiry wkr > currTime) workers
ppqueue: Paranoid Pirate queue in Haxe
package ;
import haxe.Stack;
import neko.Lib;
import org.zeromq.ZFrame;
import org.zeromq.ZContext;
import org.zeromq.ZMQSocket;
import org.zeromq.ZMQPoller;
import org.zeromq.ZMQ;
import org.zeromq.ZMsg;
import org.zeromq.ZMQException;
/**
* Paranoid Pirate Queue
*
* @see http://zguide.zeromq.org/page:all#Robust-Reliable-Queuing-Paranoid-Pirate-Pattern
*
* Author: rsmith (at) rsbatechnology (dot) co (dot) uk
*/
class PPQueue
{
private static inline var HEARTBEAT_LIVENESS = 3;
private static inline var HEARTBEAT_INTERVAL = 1000; // msecs
private static inline var PPP_READY = String.fromCharCode(1);
private static inline var PPP_HEARTBEAT = String.fromCharCode(2);
public static function main() {
Lib.println("** PPQueue (see: http://zguide.zeromq.org/page:all#Robust-Reliable-Queuing-Paranoid-Pirate-Pattern)");
// Prepare our context and sockets
var context:ZContext = new ZContext();
var frontend:ZMQSocket = context.createSocket(ZMQ_ROUTER);
var backend:ZMQSocket = context.createSocket(ZMQ_ROUTER);
frontend.bind("tcp://*:5555"); // For clients
backend.bind("tcp://*:5556"); // For workerQueue
// Queue of available workerQueue
var workerQueue = new WorkerQueue(HEARTBEAT_LIVENESS, HEARTBEAT_INTERVAL);
// Send out heartbeats at regular intervals
var heartbeatAt = Date.now().getTime() + HEARTBEAT_INTERVAL;
var poller = new ZMQPoller();
while (true) {
poller.unregisterAllSockets();
poller.registerSocket(backend, ZMQ.ZMQ_POLLIN());
// Only poll frontend clients if we have at least one worker to do stuff
if (workerQueue.size() > 0) {
poller.registerSocket(frontend, ZMQ.ZMQ_POLLIN());
}
try {
poller.poll(HEARTBEAT_INTERVAL * 1000);
} catch (e:ZMQException) {
if (ZMQ.isInterrupted())
break;
trace("ZMQException #:" + e.errNo + ", str:" + e.str());
trace (Stack.toString(Stack.exceptionStack()));
}
// Handle worker activity
if (poller.pollin(1)) {
// use worker addressFrame for LRU routing
var msg = ZMsg.recvMsg(backend);
if (msg == null)
break; // Interrupted
// Any sign of life from worker means it's ready
var addressFrame = msg.unwrap();
var identity = addressFrame.toString();
// Validate control message, or return reply to client
if (msg.size() == 1) {
var frame = msg.first();
if (frame.streq(PPP_READY)) {
workerQueue.delete(identity);
workerQueue.append(addressFrame,identity);
} else if (frame.streq(PPP_HEARTBEAT)) {
workerQueue.refresh(identity);
} else {
Lib.println("E: invalid message from worker");
Lib.println(msg.toString());
}
msg.destroy();
} else {
msg.send(frontend);
workerQueue.append(addressFrame, identity);
}
}
if (poller.pollin(2)) {
// Now get next client request, route to next worker
var msg = ZMsg.recvMsg(frontend);
if (msg == null)
break; // Interrupted
var worker = workerQueue.dequeue();
msg.push(worker.addressFrame.duplicate());
msg.send(backend);
}
// Send heartbeats to idle workerQueue if it's time
if (Date.now().getTime() >= heartbeatAt) {
for ( w in workerQueue) {
var msg = new ZMsg();
msg.add(w.addressFrame.duplicate()); // Add a duplicate of the stored worker addressFrame frame,
// to prevent w.addressFrame ZFrame object from being destroyed when msg is sent
msg.addString(PPP_HEARTBEAT);
msg.send(backend);
}
heartbeatAt = Date.now().getTime() + HEARTBEAT_INTERVAL;
}
workerQueue.purge();
}
// When we're done, clean up properly
context.destroy();
}
}
typedef WorkerT = {
addressFrame:ZFrame,
identity:String,
expiry:Float // in msecs since 1 Jan 1970
};
/**
* Internal class managing a queue of workerQueue
*/
private class WorkerQueue {
// Stores hash of worker heartbeat expiries, keyed by worker identity
private var queue:List<WorkerT>;
private var heartbeatLiveness:Int;
private var heartbeatInterval:Int;
/**
* Constructor
* @param liveness
* @param interval
*/
public function new(liveness:Int, interval:Int) {
queue = new List<WorkerT>();
heartbeatLiveness = liveness;
heartbeatInterval = interval;
}
// Implement Iterable typedef signature
public function iterator():Iterator<WorkerT> {
return queue.iterator();
}
/**
* Insert worker at end of queue, reset expiry
* Worker must not already be in queue
* @param identity
*/
public function append(addressFrame:ZFrame,identity:String) {
if (get(identity) != null)
Lib.println("E: duplicate worker identity " + identity);
else
queue.add({addressFrame:addressFrame, identity:identity, expiry:generateExpiry()});
}
/**
* Remove worker from queue, if present
* @param identity
*/
public function delete(identity:String) {
var w = get(identity);
if (w != null) {
queue.remove(w);
}
}
public function refresh(identity:String) {
var w = get(identity);
if (w == null)
Lib.println("E: worker " + identity + " not ready");
else
w.expiry = generateExpiry();
}
/**
* Pop next worker off queue, return WorkerT
* @param identity
*/
public function dequeue():WorkerT {
return queue.pop();
}
/**
* Look for & kill expired workerQueue
*/
public function purge() {
for (w in queue) {
if (Date.now().getTime() > w.expiry) {
queue.remove(w);
}
}
}
/**
* Return the size of this worker Queue
* @return
*/
public function size():Int {
return queue.length;
}
/**
* Returns a WorkerT anon object if exists in the queue, else null
* @param identity
* @return
*/
private function get(identity:String):WorkerT {
for (w in queue) {
if (w.identity == identity)
return w;
}
return null; // nothing found
}
private inline function generateExpiry():Float {
return Date.now().getTime() + heartbeatInterval * heartbeatLiveness;
}
}
ppqueue: Paranoid Pirate queue in Java
package guide;
import java.util.ArrayList;
import java.util.Iterator;
import org.zeromq.*;
import org.zeromq.ZMQ.Poller;
import org.zeromq.ZMQ.Socket;
//
// Paranoid Pirate queue
//
public class ppqueue
{
private final static int HEARTBEAT_LIVENESS = 3; // 3-5 is reasonable
private final static int HEARTBEAT_INTERVAL = 1000; // msecs
// Paranoid Pirate Protocol constants
private final static String PPP_READY = "\001"; // Signals worker is ready
private final static String PPP_HEARTBEAT = "\002"; // Signals worker heartbeat
// Here we define the worker class; a structure and a set of functions that
// act as constructor, destructor, and methods on worker objects:
private static class Worker
{
ZFrame address; // Address of worker
String identity; // Printable identity
long expiry; // Expires at this time
protected Worker(ZFrame address)
{
this.address = address;
identity = new String(address.getData(), ZMQ.CHARSET);
expiry = System.currentTimeMillis() + HEARTBEAT_INTERVAL * HEARTBEAT_LIVENESS;
}
// The ready method puts a worker to the end of the ready list:
protected void ready(ArrayList<Worker> workers)
{
Iterator<Worker> it = workers.iterator();
while (it.hasNext()) {
Worker worker = it.next();
if (identity.equals(worker.identity)) {
it.remove();
break;
}
}
workers.add(this);
}
// The next method returns the next available worker address:
protected static ZFrame next(ArrayList<Worker> workers)
{
Worker worker = workers.remove(0);
assert (worker != null);
ZFrame frame = worker.address;
return frame;
}
// The purge method looks for and kills expired workers. We hold workers
// from oldest to most recent, so we stop at the first alive worker:
protected static void purge(ArrayList<Worker> workers)
{
Iterator<Worker> it = workers.iterator();
while (it.hasNext()) {
Worker worker = it.next();
if (System.currentTimeMillis() < worker.expiry) {
break;
}
it.remove();
}
}
};
// The main task is an LRU queue with heartbeating on workers so we can
// detect crashed or blocked worker tasks:
public static void main(String[] args)
{
try (ZContext ctx = new ZContext()) {
Socket frontend = ctx.createSocket(SocketType.ROUTER);
Socket backend = ctx.createSocket(SocketType.ROUTER);
frontend.bind("tcp://*:5555"); // For clients
backend.bind("tcp://*:5556"); // For workers
// List of available workers
ArrayList<Worker> workers = new ArrayList<Worker>();
// Send out heartbeats at regular intervals
long heartbeat_at = System.currentTimeMillis() + HEARTBEAT_INTERVAL;
Poller poller = ctx.createPoller(2);
poller.register(backend, Poller.POLLIN);
poller.register(frontend, Poller.POLLIN);
while (true) {
boolean workersAvailable = workers.size() > 0;
int rc = poller.poll(HEARTBEAT_INTERVAL);
if (rc == -1)
break; // Interrupted
// Handle worker activity on backend
if (poller.pollin(0)) {
// Use worker address for LRU routing
ZMsg msg = ZMsg.recvMsg(backend);
if (msg == null)
break; // Interrupted
// Any sign of life from worker means it's ready
ZFrame address = msg.unwrap();
Worker worker = new Worker(address);
worker.ready(workers);
// Validate control message, or return reply to client
if (msg.size() == 1) {
ZFrame frame = msg.getFirst();
String data = new String(frame.getData(), ZMQ.CHARSET);
if (!data.equals(PPP_READY) &&
!data.equals(PPP_HEARTBEAT)) {
System.out.println(
"E: invalid message from worker"
);
msg.dump(System.out);
}
msg.destroy();
}
else msg.send(frontend);
}
if (workersAvailable && poller.pollin(1)) {
// Now get next client request, route to next worker
ZMsg msg = ZMsg.recvMsg(frontend);
if (msg == null)
break; // Interrupted
msg.push(Worker.next(workers));
msg.send(backend);
}
// We handle heartbeating after any socket activity. First we
// send heartbeats to any idle workers if it's time. Then we
// purge any dead workers:
if (System.currentTimeMillis() >= heartbeat_at) {
for (Worker worker : workers) {
worker.address.send(
backend, ZFrame.REUSE + ZFrame.MORE
);
ZFrame frame = new ZFrame(PPP_HEARTBEAT);
frame.send(backend, 0);
}
long now = System.currentTimeMillis();
heartbeat_at = now + HEARTBEAT_INTERVAL;
}
Worker.purge(workers);
}
// When we're done, clean up properly
workers.clear();
}
}
}
ppqueue: Paranoid Pirate queue in Julia
ppqueue: Paranoid Pirate queue in Lua
--
-- Paranoid Pirate queue
--
-- Author: Robert G. Jakabosky <bobby@sharedrealm.com>
--
require"zmq"
require"zmq.poller"
require"zmsg"
local MAX_WORKERS = 100
local HEARTBEAT_LIVENESS = 3 -- 3-5 is reasonable
local HEARTBEAT_INTERVAL = 1000 -- msecs
local tremove = table.remove
-- Insert worker at end of queue, reset expiry
-- Worker must not already be in queue
local function s_worker_append(queue, identity)
if queue[identity] then
printf ("E: duplicate worker identity %s", identity)
else
assert (#queue < MAX_WORKERS)
queue[identity] = s_clock() + HEARTBEAT_INTERVAL * HEARTBEAT_LIVENESS
queue[#queue + 1] = identity
end
end
-- Remove worker from queue, if present
local function s_worker_delete(queue, identity)
for i=1,#queue do
if queue[i] == identity then
tremove(queue, i)
break
end
end
queue[identity] = nil
end
-- Reset worker expiry, worker must be present
local function s_worker_refresh(queue, identity)
if queue[identity] then
queue[identity] = s_clock() + HEARTBEAT_INTERVAL * HEARTBEAT_LIVENESS
else
printf("E: worker %s not ready\n", identity)
end
end
-- Pop next available worker off queue, return identity
local function s_worker_dequeue(queue)
assert (#queue > 0)
local identity = tremove(queue, 1)
queue[identity] = nil
return identity
end
-- Look for & kill expired workers
local function s_queue_purge(queue)
local curr_clock = s_clock()
-- Work backwards from end to simplify removal
for i=#queue,1,-1 do
local id = queue[i]
if (curr_clock > queue[id]) then
tremove(queue, i)
queue[id] = nil
end
end
end
s_version_assert (2, 1)
-- Prepare our context and sockets
local context = zmq.init(1)
local frontend = context:socket(zmq.ROUTER)
local backend = context:socket(zmq.ROUTER)
frontend:bind("tcp://*:5555"); -- For clients
backend:bind("tcp://*:5556"); -- For workers
-- Queue of available workers
local queue = {}
local is_accepting = false
-- Send out heartbeats at regular intervals
local heartbeat_at = s_clock() + HEARTBEAT_INTERVAL
local poller = zmq.poller(2)
local function frontend_cb()
-- Now get next client request, route to next worker
local msg = zmsg.recv(frontend)
local identity = s_worker_dequeue (queue)
msg:push(identity)
msg:send(backend)
if (#queue == 0) then
-- stop accepting work from clients, when no workers are available.
poller:remove(frontend)
is_accepting = false
end
end
-- Handle worker activity on backend
poller:add(backend, zmq.POLLIN, function()
local msg = zmsg.recv(backend)
local identity = msg:unwrap()
-- Return reply to client if it's not a control message
if (msg:parts() == 1) then
if (msg:address() == "READY") then
s_worker_delete(queue, identity)
s_worker_append(queue, identity)
elseif (msg:address() == "HEARTBEAT") then
s_worker_refresh(queue, identity)
else
printf("E: invalid message from %s\n", identity)
msg:dump()
end
else
-- reply for client.
msg:send(frontend)
s_worker_append(queue, identity)
end
-- start accepting client requests, if we are not already doing so.
if not is_accepting and #queue > 0 then
is_accepting = true
poller:add(frontend, zmq.POLLIN, frontend_cb)
end
end)
-- start poller's event loop
while true do
local cnt = assert(poller:poll(HEARTBEAT_INTERVAL * 1000))
-- Send heartbeats to idle workers if it's time
if (s_clock() > heartbeat_at) then
for i=1,#queue do
local msg = zmsg.new("HEARTBEAT")
msg:wrap(queue[i], nil)
msg:send(backend)
end
heartbeat_at = s_clock() + HEARTBEAT_INTERVAL
end
s_queue_purge(queue)
end
-- We never exit the main loop
-- But pretend to do the right shutdown anyhow
while (#queue > 0) do
s_worker_dequeue(queue)
end
frontend:close()
backend:close()
ppqueue: Paranoid Pirate queue in Node.js
ppqueue: Paranoid Pirate queue in Objective-C
ppqueue: Paranoid Pirate queue in ooc
ppqueue: Paranoid Pirate queue in Perl
ppqueue: Paranoid Pirate queue in PHP
<?php
/*
* Paranoid Pirate queue
*
* @author Ian Barber <ian(dot)barber(at)gmail(dot)com>
*/
include 'zmsg.php';
define("MAX_WORKERS", 100);
define("HEARTBEAT_LIVENESS", 3); // 3-5 is reasonable
define("HEARTBEAT_INTERVAL", 1); // secs
class Queue_T implements Iterator
{
private $queue = array();
/* Iterator functions */
public function rewind() { return reset($this->queue); }
public function valid() { return current($this->queue); }
public function key() { return key($this->queue); }
public function next() { return next($this->queue); }
public function current() { return current($this->queue); }
/*
* Insert worker at end of queue, reset expiry
* Worker must not already be in queue
*/
public function s_worker_append($identity)
{
if (isset($this->queue[$identity])) {
printf ("E: duplicate worker identity %s", $identity);
} else {
$this->queue[$identity] = microtime(true) + HEARTBEAT_INTERVAL * HEARTBEAT_LIVENESS;
}
}
/*
* Remove worker from queue, if present
*/
public function s_worker_delete($identity)
{
unset($this->queue[$identity]);
}
/*
* Reset worker expiry, worker must be present
*/
public function s_worker_refresh($identity)
{
if (!isset($this->queue[$identity])) {
printf ("E: worker %s not ready\n", $identity);
} else {
$this->queue[$identity] = microtime(true) + HEARTBEAT_INTERVAL * HEARTBEAT_LIVENESS;
}
}
/*
* Pop next available worker off queue, return identity
*/
public function s_worker_dequeue()
{
reset($this->queue);
$identity = key($this->queue);
unset($this->queue[$identity]);
return $identity;
}
/*
* Look for & kill expired workers
*/
public function s_queue_purge()
{
foreach ($this->queue as $id => $expiry) {
if (microtime(true) > $expiry) {
unset($this->queue[$id]);
}
}
}
/*
* Return the size of the queue
*/
public function size()
{
return count($this->queue);
}
}
// Prepare our context and sockets
$context = new ZMQContext();
$frontend = new ZMQSocket($context, ZMQ::SOCKET_ROUTER);
$backend = new ZMQSocket($context, ZMQ::SOCKET_ROUTER);
$frontend->bind("tcp://*:5555"); // For clients
$backend->bind("tcp://*:5556"); // For workers
$read = $write = array();
// Queue of available workers
$queue = new Queue_T();
// Send out heartbeats at regular intervals
$heartbeat_at = microtime(true) + HEARTBEAT_INTERVAL;
while (true) {
$poll = new ZMQPoll();
$poll->add($backend, ZMQ::POLL_IN);
// Poll frontend only if we have available workers
if ($queue->size()) {
$poll->add($frontend, ZMQ::POLL_IN);
}
$events = $poll->poll($read, $write, HEARTBEAT_INTERVAL * 1000 ); // milliseconds
if ($events > 0) {
foreach ($read as $socket) {
$zmsg = new Zmsg($socket);
$zmsg->recv();
// Handle worker activity on backend
if ($socket === $backend) {
$identity = $zmsg->unwrap();
// Return reply to client if it's not a control message
if ($zmsg->parts() == 1) {
if ($zmsg->address() == "READY") {
$queue->s_worker_delete($identity);
$queue->s_worker_append($identity);
} elseif ($zmsg->address() == 'HEARTBEAT') {
$queue->s_worker_refresh($identity);
} else {
printf ("E: invalid message from %s%s%s", $identity, PHP_EOL, $zmsg->__toString());
}
} else {
$zmsg->set_socket($frontend)->send();
$queue->s_worker_append($identity);
}
} else {
// Now get next client request, route to next worker
$identity = $queue->s_worker_dequeue();
$zmsg->wrap($identity);
$zmsg->set_socket($backend)->send();
}
            }
        }
        // Send heartbeats to idle workers if it's time
        if (microtime(true) > $heartbeat_at) {
            foreach ($queue as $id => $expiry) {
                $zmsg = new Zmsg($backend);
                $zmsg->body_set("HEARTBEAT");
                $zmsg->wrap($id, NULL);
                $zmsg->send();
            }
            $heartbeat_at = microtime(true) + HEARTBEAT_INTERVAL;
        }
        $queue->s_queue_purge();
}
ppqueue: Paranoid Pirate queue in Python
#
## Paranoid Pirate queue
#
# Author: Daniel Lundin <dln(at)eintr(dot)org>
#
from collections import OrderedDict
import time
import zmq
HEARTBEAT_LIVENESS = 3 # 3..5 is reasonable
HEARTBEAT_INTERVAL = 1.0 # Seconds
# Paranoid Pirate Protocol constants
PPP_READY = b"\x01" # Signals worker is ready
PPP_HEARTBEAT = b"\x02" # Signals worker heartbeat
class Worker(object):
def __init__(self, address):
self.address = address
self.expiry = time.time() + HEARTBEAT_INTERVAL * HEARTBEAT_LIVENESS
class WorkerQueue(object):
def __init__(self):
self.queue = OrderedDict()
def ready(self, worker):
self.queue.pop(worker.address, None)
self.queue[worker.address] = worker
def purge(self):
"""Look for & kill expired workers."""
t = time.time()
expired = []
for address, worker in self.queue.items():
if t > worker.expiry: # Worker expired
expired.append(address)
for address in expired:
print("W: Idle worker expired: %s" % address)
self.queue.pop(address, None)
def next(self):
address, worker = self.queue.popitem(False)
return address
context = zmq.Context(1)
frontend = context.socket(zmq.ROUTER) # ROUTER
backend = context.socket(zmq.ROUTER) # ROUTER
frontend.bind("tcp://*:5555") # For clients
backend.bind("tcp://*:5556") # For workers
poll_workers = zmq.Poller()
poll_workers.register(backend, zmq.POLLIN)
poll_both = zmq.Poller()
poll_both.register(frontend, zmq.POLLIN)
poll_both.register(backend, zmq.POLLIN)
workers = WorkerQueue()
heartbeat_at = time.time() + HEARTBEAT_INTERVAL
while True:
if len(workers.queue) > 0:
poller = poll_both
else:
poller = poll_workers
socks = dict(poller.poll(HEARTBEAT_INTERVAL * 1000))
# Handle worker activity on backend
if socks.get(backend) == zmq.POLLIN:
# Use worker address for LRU routing
frames = backend.recv_multipart()
if not frames:
break
address = frames[0]
workers.ready(Worker(address))
# Validate control message, or return reply to client
msg = frames[1:]
if len(msg) == 1:
if msg[0] not in (PPP_READY, PPP_HEARTBEAT):
print("E: Invalid message from worker: %s" % msg)
else:
frontend.send_multipart(msg)
# Send heartbeats to idle workers if it's time
if time.time() >= heartbeat_at:
for worker in workers.queue:
msg = [worker, PPP_HEARTBEAT]
backend.send_multipart(msg)
heartbeat_at = time.time() + HEARTBEAT_INTERVAL
if socks.get(frontend) == zmq.POLLIN:
frames = frontend.recv_multipart()
if not frames:
break
frames.insert(0, workers.next())
backend.send_multipart(frames)
workers.purge()
ppqueue: Paranoid Pirate queue in Q
ppqueue: Paranoid Pirate queue in Racket
ppqueue: Paranoid Pirate queue in Ruby
ppqueue: Paranoid Pirate queue in Rust
ppqueue: Paranoid Pirate queue in Scala
ppqueue: Paranoid Pirate queue in Tcl
#
# Paranoid Pirate queue
#
package require zmq
set HEARTBEAT_LIVENESS 3 ;# 3-5 is reasonable
set HEARTBEAT_INTERVAL 1 ;# secs
# Paranoid Pirate Protocol constants
set PPP_READY "READY" ;# Signals worker is ready
set PPP_HEARTBEAT "HEARTBEAT" ;# Signals worker heartbeat
# This defines one active worker in our worker list
# dict with keys address, identity and expiry
# Construct new worker
proc s_worker_new {address} {
global HEARTBEAT_LIVENESS HEARTBEAT_INTERVAL
return [dict create address $address identity $address expiry [expr {[clock seconds] + $HEARTBEAT_INTERVAL * $HEARTBEAT_LIVENESS}]]
}
# Worker is ready, remove if on list and move to end
proc s_worker_ready {self workersnm} {
upvar $workersnm workers
set nworkers {}
foreach worker $workers {
if {[dict get $self identity] ne [dict get $worker identity]} {
lappend nworkers $worker
}
}
lappend nworkers $self
set workers $nworkers
}
# Return next available worker address
proc s_workers_next {workersnm} {
upvar $workersnm workers
set workers [lassign $workers worker]
return [dict get $worker address]
}
# Look for & kill expired workers. Workers are oldest to most recent,
# so we stop at the first alive worker.
proc s_workers_purge {workersnm} {
upvar $workersnm workers
set nworkers {}
foreach worker $workers {
if {[clock seconds] < [dict get $worker expiry]} {
# Worker is alive
lappend nworkers $worker
}
}
set workers $nworkers
}
set ctx [zmq context context]
zmq socket frontend $ctx ROUTER
zmq socket backend $ctx ROUTER
frontend bind "tcp://*:5555" ;# For clients
backend bind "tcp://*:5556" ;# For workers
# List of available workers
set workers {}
# Send out heartbeats at regular intervals
set heartbeat_at [expr {[clock seconds] + $HEARTBEAT_INTERVAL}]
while {1} {
if {[llength $workers]} {
set poll_set [list [list backend [list POLLIN]] [list frontend [list POLLIN]]]
} else {
set poll_set [list [list backend [list POLLIN]]]
}
set rpoll_set [zmq poll $poll_set $HEARTBEAT_INTERVAL]
foreach rpoll $rpoll_set {
switch [lindex $rpoll 0] {
backend {
# Handle worker activity on backend
# Use worker address for LRU routing
set msg [zmsg recv backend]
# Any sign of life from worker means it's ready
set address [zmsg unwrap msg]
set worker [s_worker_new $address]
s_worker_ready $worker workers
# Validate control message, or return reply to client
if {[llength $msg] == 1} {
if {[lindex $msg 0] ne $PPP_READY && [lindex $msg 0] ne $PPP_HEARTBEAT} {
puts "E: invalid message from worker"
zmsg dump $msg
}
} else {
zmsg send frontend $msg
}
}
frontend {
# Now get next client request, route to next worker
set msg [zmsg recv frontend]
set msg [zmsg push $msg [s_workers_next workers]]
zmsg send backend $msg
}
}
}
# Send heartbeats to idle workers if it's time
if {[clock seconds] >= $heartbeat_at} {
puts "I: heartbeat ([llength $workers])"
foreach worker $workers {
backend sendmore [dict get $worker address]
backend send $PPP_HEARTBEAT
}
set heartbeat_at [expr {[clock seconds] + $HEARTBEAT_INTERVAL}]
}
s_workers_purge workers
}
frontend close
backend close
$ctx term
ppqueue: Paranoid Pirate queue in OCaml
The queue extends the load balancing pattern with heartbeating of workers. Heartbeating is one of those “simple” things that can be difficult to get right. I’ll explain more about that in a second.
Here is the Paranoid Pirate worker:
ppworker: Paranoid Pirate worker in Ada
ppworker: Paranoid Pirate worker in Basic
ppworker: Paranoid Pirate worker in C
// Paranoid Pirate worker
#include "czmq.h"
#define HEARTBEAT_LIVENESS 3 // 3-5 is reasonable
#define HEARTBEAT_INTERVAL 1000 // msecs
#define INTERVAL_INIT 1000 // Initial reconnect
#define INTERVAL_MAX 32000 // After exponential backoff
// Paranoid Pirate Protocol constants
#define PPP_READY "\001" // Signals worker is ready
#define PPP_HEARTBEAT "\002" // Signals worker heartbeat
// Helper function that returns a new configured socket
// connected to the Paranoid Pirate queue
static void *
s_worker_socket (zctx_t *ctx) {
void *worker = zsocket_new (ctx, ZMQ_DEALER);
zsocket_connect (worker, "tcp://localhost:5556");
// Tell queue we're ready for work
printf ("I: worker ready\n");
zframe_t *frame = zframe_new (PPP_READY, 1);
zframe_send (&frame, worker, 0);
return worker;
}
// .split main task
// We have a single task that implements the worker side of the
// Paranoid Pirate Protocol (PPP). The interesting parts here are
// the heartbeating, which lets the worker detect if the queue has
// died, and vice versa:
int main (void)
{
zctx_t *ctx = zctx_new ();
void *worker = s_worker_socket (ctx);
// If liveness hits zero, queue is considered disconnected
size_t liveness = HEARTBEAT_LIVENESS;
size_t interval = INTERVAL_INIT;
// Send out heartbeats at regular intervals
uint64_t heartbeat_at = zclock_time () + HEARTBEAT_INTERVAL;
srandom ((unsigned) time (NULL));
int cycles = 0;
while (true) {
zmq_pollitem_t items [] = { { worker, 0, ZMQ_POLLIN, 0 } };
int rc = zmq_poll (items, 1, HEARTBEAT_INTERVAL * ZMQ_POLL_MSEC);
if (rc == -1)
break; // Interrupted
if (items [0].revents & ZMQ_POLLIN) {
// Get message
// - 3-part envelope + content -> request
// - 1-part HEARTBEAT -> heartbeat
zmsg_t *msg = zmsg_recv (worker);
if (!msg)
break; // Interrupted
// .split simulating problems
// To test the robustness of the queue implementation we
// simulate various typical problems, such as the worker
// crashing or running very slowly. We do this after a few
// cycles so that the architecture can get up and running
// first:
if (zmsg_size (msg) == 3) {
cycles++;
if (cycles > 3 && randof (5) == 0) {
printf ("I: simulating a crash\n");
zmsg_destroy (&msg);
break;
}
else
if (cycles > 3 && randof (5) == 0) {
printf ("I: simulating CPU overload\n");
sleep (3);
if (zctx_interrupted)
break;
}
printf ("I: normal reply\n");
zmsg_send (&msg, worker);
liveness = HEARTBEAT_LIVENESS;
sleep (1); // Do some heavy work
if (zctx_interrupted)
break;
}
else
// .split handle heartbeats
// When we get a heartbeat message from the queue, it means the
// queue was (recently) alive, so we must reset our liveness
// indicator:
if (zmsg_size (msg) == 1) {
zframe_t *frame = zmsg_first (msg);
if (memcmp (zframe_data (frame), PPP_HEARTBEAT, 1) == 0)
liveness = HEARTBEAT_LIVENESS;
else {
printf ("E: invalid message\n");
zmsg_dump (msg);
}
zmsg_destroy (&msg);
}
else {
printf ("E: invalid message\n");
zmsg_dump (msg);
}
interval = INTERVAL_INIT;
}
else
// .split detecting a dead queue
// If the queue hasn't sent us heartbeats in a while, destroy the
// socket and reconnect. This is the simplest most brutal way of
// discarding any messages we might have sent in the meantime:
if (--liveness == 0) {
printf ("W: heartbeat failure, can't reach queue\n");
printf ("W: reconnecting in %zd msec...\n", interval);
zclock_sleep (interval);
if (interval < INTERVAL_MAX)
interval *= 2;
zsocket_destroy (ctx, worker);
worker = s_worker_socket (ctx);
liveness = HEARTBEAT_LIVENESS;
}
// Send heartbeat to queue if it's time
if (zclock_time () > heartbeat_at) {
heartbeat_at = zclock_time () + HEARTBEAT_INTERVAL;
printf ("I: worker heartbeat\n");
zframe_t *frame = zframe_new (PPP_HEARTBEAT, 1);
zframe_send (&frame, worker, 0);
}
}
zctx_destroy (&ctx);
return 0;
}
ppworker: Paranoid Pirate worker in C++
//
// Paranoid Pirate worker
//
//
// Andreas Hoelzlwimmer <andreas.hoelzlwimmer@fh-hagenberg.at>
//
#include "zmsg.hpp"
#include <iomanip>
#define HEARTBEAT_LIVENESS 3 // 3-5 is reasonable
#define HEARTBEAT_INTERVAL 1000 // msecs
#define INTERVAL_INIT 1000 // Initial reconnect
#define INTERVAL_MAX 32000 // After exponential backoff
// Helper function that returns a new configured socket
// connected to the Paranoid Pirate queue
//
std::string identity;
static zmq::socket_t *
s_worker_socket (zmq::context_t &context) {
zmq::socket_t * worker = new zmq::socket_t(context, ZMQ_DEALER);
// Set random identity to make tracing easier
identity = s_set_id(*worker);
worker->connect ("tcp://localhost:5556");
// Configure socket to not wait at close time
int linger = 0;
worker->setsockopt (ZMQ_LINGER, &linger, sizeof (linger));
// Tell queue we're ready for work
std::cout << "I: (" << identity << ") worker ready" << std::endl;
s_send (*worker, std::string("READY"));
return worker;
}
int main (void)
{
s_version_assert (4, 0);
srandom ((unsigned) time (NULL));
zmq::context_t context (1);
zmq::socket_t * worker = s_worker_socket (context);
// If liveness hits zero, queue is considered disconnected
size_t liveness = HEARTBEAT_LIVENESS;
size_t interval = INTERVAL_INIT;
// Send out heartbeats at regular intervals
int64_t heartbeat_at = s_clock () + HEARTBEAT_INTERVAL;
int cycles = 0;
while (1) {
zmq::pollitem_t items[] = {
{static_cast<void*>(*worker), 0, ZMQ_POLLIN, 0 } };
zmq::poll (items, 1, HEARTBEAT_INTERVAL);
if (items [0].revents & ZMQ_POLLIN) {
// Get message
// - 3-part envelope + content -> request
// - 1-part "HEARTBEAT" -> heartbeat
zmsg msg (*worker);
if (msg.parts () == 3) {
// Simulate various problems, after a few cycles
cycles++;
if (cycles > 3 && within (5) == 0) {
std::cout << "I: (" << identity << ") simulating a crash" << std::endl;
msg.clear ();
break;
}
else {
if (cycles > 3 && within (5) == 0) {
std::cout << "I: (" << identity << ") simulating CPU overload" << std::endl;
sleep (5);
}
}
std::cout << "I: (" << identity << ") normal reply - " << msg.body() << std::endl;
msg.send (*worker);
liveness = HEARTBEAT_LIVENESS;
sleep (1); // Do some heavy work
}
else {
if (msg.parts () == 1
&& strcmp (msg.body (), "HEARTBEAT") == 0) {
liveness = HEARTBEAT_LIVENESS;
}
else {
std::cout << "E: (" << identity << ") invalid message" << std::endl;
msg.dump ();
}
}
interval = INTERVAL_INIT;
}
else
if (--liveness == 0) {
std::cout << "W: (" << identity << ") heartbeat failure, can't reach queue" << std::endl;
std::cout << "W: (" << identity << ") reconnecting in " << interval << " msec..." << std::endl;
s_sleep (interval);
if (interval < INTERVAL_MAX) {
interval *= 2;
}
delete worker;
worker = s_worker_socket (context);
liveness = HEARTBEAT_LIVENESS;
}
// Send heartbeat to queue if it's time
if (s_clock () > heartbeat_at) {
heartbeat_at = s_clock () + HEARTBEAT_INTERVAL;
std::cout << "I: (" << identity << ") worker heartbeat" << std::endl;
s_send (*worker, std::string("HEARTBEAT"));
}
}
delete worker;
return 0;
}
ppworker: Paranoid Pirate worker in C#
ppworker: Paranoid Pirate worker in CL
ppworker: Paranoid Pirate worker in Delphi
ppworker: Paranoid Pirate worker in Erlang
ppworker: Paranoid Pirate worker in Elixir
ppworker: Paranoid Pirate worker in F#
ppworker: Paranoid Pirate worker in Felix
ppworker: Paranoid Pirate worker in Go
// Paranoid Pirate worker
//
// Author: iano <scaly.iano@gmail.com>
// Based on C & Python example
package main
import (
"fmt"
zmq "github.com/alecthomas/gozmq"
"math/rand"
"time"
)
const (
    HEARTBEAT_LIVENESS = 3                // 3-5 is reasonable
    HEARTBEAT_INTERVAL = time.Second      // time.Duration
INTERVAL_INIT = time.Second // Initial reconnect
INTERVAL_MAX = 32 * time.Second // After exponential backoff
// Paranoid Pirate Protocol constants
PPP_READY = "\001" // Signals worker is ready
PPP_HEARTBEAT = "\002" // Signals worker heartbeat
)
// Helper function that returns a new configured socket
// connected to the Paranoid Pirate queue
func WorkerSocket(context *zmq.Context) *zmq.Socket {
worker, _ := context.NewSocket(zmq.DEALER)
worker.Connect("tcp://localhost:5556")
// Tell queue we're ready for work
fmt.Println("I: worker ready")
worker.Send([]byte(PPP_READY), 0)
return worker
}
// .split main task
// We have a single task, which implements the worker side of the
// Paranoid Pirate Protocol (PPP). The interesting parts here are
// the heartbeating, which lets the worker detect if the queue has
// died, and vice-versa:
func main() {
src := rand.NewSource(time.Now().UnixNano())
random := rand.New(src)
context, _ := zmq.NewContext()
defer context.Close()
worker := WorkerSocket(context)
liveness := HEARTBEAT_LIVENESS
interval := INTERVAL_INIT
heartbeatAt := time.Now().Add(HEARTBEAT_INTERVAL)
cycles := 0
for {
items := zmq.PollItems{
zmq.PollItem{Socket: worker, Events: zmq.POLLIN},
}
zmq.Poll(items, HEARTBEAT_INTERVAL)
if items[0].REvents&zmq.POLLIN != 0 {
frames, err := worker.RecvMultipart(0)
if err != nil {
panic(err)
}
if len(frames) == 3 {
cycles++
if cycles > 3 {
switch r := random.Intn(5); r {
case 0:
fmt.Println("I: Simulating a crash")
worker.Close()
return
case 1:
fmt.Println("I: Simulating CPU overload")
time.Sleep(3 * time.Second)
}
}
fmt.Println("I: Normal reply")
worker.SendMultipart(frames, 0)
liveness = HEARTBEAT_LIVENESS
time.Sleep(1 * time.Second)
} else if len(frames) == 1 && string(frames[0]) == PPP_HEARTBEAT {
fmt.Println("I: Queue heartbeat")
liveness = HEARTBEAT_LIVENESS
} else {
fmt.Println("E: Invalid message")
}
interval = INTERVAL_INIT
} else if liveness--; liveness == 0 {
fmt.Println("W: Heartbeat failure, can't reach queue")
fmt.Printf("W: Reconnecting in %ds...\n", interval/time.Second)
time.Sleep(interval)
if interval < INTERVAL_MAX {
interval *= 2
}
worker.Close()
worker = WorkerSocket(context)
liveness = HEARTBEAT_LIVENESS
}
if heartbeatAt.Before(time.Now()) {
heartbeatAt = time.Now().Add(HEARTBEAT_INTERVAL)
worker.Send([]byte(PPP_HEARTBEAT), 0)
}
}
}
ppworker: Paranoid Pirate worker in Haskell
{-
Paranoid Pirate worker in Haskell.
Uses heartbeating to detect crashed queue.
-}
module Main where
import System.ZMQ4.Monadic
import ZHelpers
import System.Random (randomRIO)
import System.Exit (exitSuccess)
import System.IO (hSetEncoding, stdout, utf8)
import Control.Monad (when)
import Control.Concurrent (threadDelay)
import Data.ByteString.Char8 (pack, unpack, empty)
heartbeatLiveness = 3
heartbeatInterval_ms = 1000 :: Integer
reconnectIntervalInit = 1000
reconnectIntervalLimit = 32000
pppReady = pack "\001"
pppHeartbeat = pack "\002"
createWorkerSocket :: ZMQ z (Socket z Dealer)
createWorkerSocket = do
worker <- socket Dealer
connect worker "tcp://localhost:5556"
liftIO $ putStrLn "I: worker ready"
send worker [] pppReady
return worker
main :: IO ()
main =
runZMQ $ do
worker <- createWorkerSocket
heartbeatAt <- liftIO $ nextHeartbeatTime_ms heartbeatInterval_ms
liftIO $ hSetEncoding stdout utf8
pollWorker worker heartbeatAt heartbeatLiveness reconnectIntervalInit 0
pollWorker :: Socket z Dealer -> Integer -> Integer -> Integer -> Int -> ZMQ z ()
pollWorker worker heartbeat liveness reconnectInterval cycles = do
[evts] <- poll (fromInteger heartbeatInterval_ms) [Sock worker [In] Nothing]
if In `elem` evts
then do
frame <- receiveMulti worker
if length frame == 4 -- Handle normal message
then do
chance <- liftIO $ randomRIO (0::Int, 5)
if cycles > 3 && chance == 0
then do
liftIO $ putStrLn "I: Simulating a crash"
liftIO $ exitSuccess
else do
chance' <- liftIO $ randomRIO (0::Int, 5)
when (cycles > 3 && chance' == 0) $ do
liftIO $ putStrLn "I: Simulating CPU overload"
liftIO $ threadDelay $ 3 * 1000 * 1000
let id = frame !! 1
info = frame !! 3
liftIO $ putStrLn "I: Normal reply"
send worker [SendMore] id
send worker [SendMore] empty
send worker [] info
liftIO $ threadDelay $ 1 * 1000 * 1000
pollWorker worker heartbeat heartbeatLiveness reconnectIntervalInit (cycles+1)
else if length frame == 2 -- Handle heartbeat or eventual invalid message
then do
let msg = frame !! 1
if msg == pppHeartbeat
then pollWorker worker heartbeat heartbeatLiveness reconnectIntervalInit cycles
else do
liftIO $ putStrLn "E: invalid message"
pollWorker worker heartbeat liveness reconnectIntervalInit cycles
else do
liftIO $ putStrLn "E: invalid message"
pollWorker worker heartbeat liveness reconnectIntervalInit cycles
else do
when (liveness == 0) $ do -- Try to reconnect
liftIO $ putStrLn "W: heartbeat failure, can't reach queue"
liftIO $ putStrLn $ "W: reconnecting in " ++ (show reconnectInterval) ++ " msec..."
liftIO $ threadDelay $ (fromInteger reconnectInterval) * 1000
worker' <- createWorkerSocket
if (reconnectInterval < reconnectIntervalLimit)
then pollWorker worker' heartbeat heartbeatLiveness (reconnectInterval * 2) cycles
else pollWorker worker' heartbeat heartbeatLiveness reconnectInterval cycles
currTime <- liftIO $ currentTime_ms -- Send heartbeat
when (currTime > heartbeat) $ do
liftIO $ putStrLn "I: worker heartbeat"
send worker [] pppHeartbeat
newHeartbeat <- liftIO $ nextHeartbeatTime_ms heartbeat
pollWorker worker newHeartbeat (liveness-1) reconnectInterval cycles
pollWorker worker heartbeat (liveness-1) reconnectInterval cycles
ppworker: Paranoid Pirate worker in Haxe
package ;
import haxe.Stack;
import neko.Lib;
import neko.Sys;
import org.zeromq.ZContext;
import org.zeromq.ZFrame;
import org.zeromq.ZMQ;
import org.zeromq.ZMQPoller;
import org.zeromq.ZMQSocket;
import org.zeromq.ZMQException;
import org.zeromq.ZMsg;
import org.zeromq.ZSocket;
/**
* Paranoid Pirate worker
*
* @see http://zguide.zeromq.org/page:all#Robust-Reliable-Queuing-Paranoid-Pirate-Pattern
*
* Author: rsmith (at) rsbatechnology (dot) co (dot) uk
*/
class PPWorker
{
private static inline var HEARTBEAT_LIVENESS = 3;
private static inline var HEARTBEAT_INTERVAL = 1000; // msecs
private static inline var INTERVAL_INIT = 1000; // Initial reconnect
private static inline var INTERVAL_MAX = 32000; // After exponential backoff
private static inline var PPP_READY = String.fromCharCode(1);
private static inline var PPP_HEARTBEAT = String.fromCharCode(2);
/**
* Helper function that returns a new configured socket
* connected to the Paranoid Pirate queue
* @param ctx
* @return
*/
private static function workerSocket(ctx:ZContext):ZMQSocket {
var worker = ctx.createSocket(ZMQ_DEALER);
worker.connect("tcp://localhost:5556");
// Tell queue we're ready for work
Lib.println("I: worker ready");
ZFrame.newStringFrame(PPP_READY).send(worker);
return worker;
}
public static function main() {
Lib.println("** PPWorker (see: http://zguide.zeromq.org/page:all#Robust-Reliable-Queuing-Paranoid-Pirate-Pattern)");
var ctx = new ZContext();
var worker = workerSocket(ctx);
// If liveness hits zero, queue is considered disconnected
var liveness = HEARTBEAT_LIVENESS;
var interval = INTERVAL_INIT;
// Send out heartbeats at regular intervals
var heartbeatAt = Date.now().getTime() + HEARTBEAT_INTERVAL;
var cycles = 0;
var poller = new ZMQPoller();
poller.registerSocket(worker, ZMQ.ZMQ_POLLIN());
while (true) {
try {
poller.poll(HEARTBEAT_INTERVAL * 1000);
} catch (e:ZMQException) {
if (ZMQ.isInterrupted())
break;
trace("ZMQException #:" + e.errNo + ", str:" + e.str());
trace (Stack.toString(Stack.exceptionStack()));
}
if (poller.pollin(1)) {
// Get message
// - 3-part envelope + content -> request
// - 1-part HEARTBEAT -> heartbeat
var msg = ZMsg.recvMsg(worker);
if (msg == null)
break; // Interrupted
if (msg.size() == 3) {
// Simulate various problems, after a few cycles
cycles++;
if (cycles > 3 && ZHelpers.randof(5) == 0) {
Lib.println("I: simulating a crash");
msg.destroy();
break;
} else if (cycles > 3 && ZHelpers.randof(5) == 0) {
Lib.println("I: simulating CPU overload");
Sys.sleep(3.0);
if (ZMQ.isInterrupted())
break;
}
Lib.println("I: normal reply");
msg.send(worker);
liveness = HEARTBEAT_LIVENESS;
Sys.sleep(1.0); // Do some heavy work
if (ZMQ.isInterrupted())
break;
} else if (msg.size() == 1) {
var frame = msg.first();
if (frame.streq(PPP_HEARTBEAT))
liveness = HEARTBEAT_LIVENESS;
else {
Lib.println("E: invalid message");
Lib.println(msg.toString());
}
msg.destroy();
} else {
Lib.println("E: invalid message");
Lib.println(msg.toString());
}
interval = INTERVAL_INIT;
} else if (--liveness == 0) {
Lib.println("W: heartbeat failure, can't reach queue");
Lib.println("W: reconnecting in " + interval + " msec...");
Sys.sleep(interval / 1000.0);
if (interval < INTERVAL_MAX)
interval *= 2;
ctx.destroySocket(worker);
worker = workerSocket(ctx);
poller.unregisterAllSockets();
poller.registerSocket(worker, ZMQ.ZMQ_POLLIN());
liveness = HEARTBEAT_LIVENESS;
}
// Send heartbeat to queue if it's time
if (Date.now().getTime() > heartbeatAt) {
heartbeatAt = Date.now().getTime() + HEARTBEAT_INTERVAL;
Lib.println("I: worker heartbeat");
ZFrame.newStringFrame(PPP_HEARTBEAT).send(worker);
}
}
ctx.destroy();
}
}
ppworker: Paranoid Pirate worker in Java
package guide;
import java.util.Random;
import org.zeromq.*;
import org.zeromq.ZMQ.Poller;
import org.zeromq.ZMQ.Socket;
//
// Paranoid Pirate worker
//
public class ppworker
{
private final static int HEARTBEAT_LIVENESS = 3; // 3-5 is reasonable
private final static int HEARTBEAT_INTERVAL = 1000; // msecs
private final static int INTERVAL_INIT = 1000; // Initial reconnect
private final static int INTERVAL_MAX = 32000; // After exponential backoff
// Paranoid Pirate Protocol constants
private final static String PPP_READY = "\001"; // Signals worker is ready
private final static String PPP_HEARTBEAT = "\002"; // Signals worker heartbeat
// Helper function that returns a new configured socket
// connected to the Paranoid Pirate queue
private static Socket worker_socket(ZContext ctx)
{
Socket worker = ctx.createSocket(SocketType.DEALER);
worker.connect("tcp://localhost:5556");
// Tell queue we're ready for work
System.out.println("I: worker ready\n");
ZFrame frame = new ZFrame(PPP_READY);
frame.send(worker, 0);
return worker;
}
// We have a single task, which implements the worker side of the
// Paranoid Pirate Protocol (PPP). The interesting parts here are
// the heartbeating, which lets the worker detect if the queue has
// died, and vice-versa:
public static void main(String[] args)
{
try (ZContext ctx = new ZContext()) {
Socket worker = worker_socket(ctx);
Poller poller = ctx.createPoller(1);
poller.register(worker, Poller.POLLIN);
// If liveness hits zero, queue is considered disconnected
int liveness = HEARTBEAT_LIVENESS;
int interval = INTERVAL_INIT;
// Send out heartbeats at regular intervals
long heartbeat_at = System.currentTimeMillis() + HEARTBEAT_INTERVAL;
Random rand = new Random(System.nanoTime());
int cycles = 0;
while (true) {
int rc = poller.poll(HEARTBEAT_INTERVAL);
if (rc == -1)
break; // Interrupted
if (poller.pollin(0)) {
// Get message
// - 3-part envelope + content -> request
// - 1-part HEARTBEAT -> heartbeat
ZMsg msg = ZMsg.recvMsg(worker);
if (msg == null)
break; // Interrupted
// To test the robustness of the queue implementation we
// simulate various typical problems, such as the worker
// crashing, or running very slowly. We do this after a few
// cycles so that the architecture can get up and running
// first:
if (msg.size() == 3) {
cycles++;
if (cycles > 3 && rand.nextInt(5) == 0) {
System.out.println("I: simulating a crash\n");
msg.destroy();
msg = null;
break;
}
else if (cycles > 3 && rand.nextInt(5) == 0) {
System.out.println("I: simulating CPU overload\n");
try {
Thread.sleep(3000);
}
catch (InterruptedException e) {
break;
}
}
System.out.println("I: normal reply\n");
msg.send(worker);
liveness = HEARTBEAT_LIVENESS;
try {
Thread.sleep(1000);
}
catch (InterruptedException e) {
break;
} // Do some heavy work
}
else
// When we get a heartbeat message from the queue, it
// means the queue was (recently) alive, so reset our
// liveness indicator:
if (msg.size() == 1) {
ZFrame frame = msg.getFirst();
String frameData = new String(
frame.getData(), ZMQ.CHARSET
);
if (PPP_HEARTBEAT.equals(frameData))
liveness = HEARTBEAT_LIVENESS;
else {
System.out.println("E: invalid message\n");
msg.dump(System.out);
}
msg.destroy();
}
else {
System.out.println("E: invalid message\n");
msg.dump(System.out);
}
interval = INTERVAL_INIT;
}
else
// If the queue hasn't sent us heartbeats in a while,
// destroy the socket and reconnect. This is the simplest
// most brutal way of discarding any messages we might have
// sent in the meantime.
if (--liveness == 0) {
System.out.println(
"W: heartbeat failure, can't reach queue\n"
);
System.out.printf(
    "W: reconnecting in %d msec...\n", interval
);
try {
Thread.sleep(interval);
}
catch (InterruptedException e) {
e.printStackTrace();
}
if (interval < INTERVAL_MAX)
interval *= 2;
ctx.destroySocket(worker);
worker = worker_socket(ctx);
liveness = HEARTBEAT_LIVENESS;
}
// Send heartbeat to queue if it's time
if (System.currentTimeMillis() > heartbeat_at) {
long now = System.currentTimeMillis();
heartbeat_at = now + HEARTBEAT_INTERVAL;
System.out.println("I: worker heartbeat\n");
ZFrame frame = new ZFrame(PPP_HEARTBEAT);
frame.send(worker, 0);
}
}
}
}
}
ppworker: Paranoid Pirate worker in Julia
ppworker: Paranoid Pirate worker in Lua
--
-- Paranoid Pirate worker
--
-- Author: Robert G. Jakabosky <bobby@sharedrealm.com>
--
require"zmq"
require"zmq.poller"
require"zmsg"
local HEARTBEAT_LIVENESS = 3 -- 3-5 is reasonable
local HEARTBEAT_INTERVAL = 1000 -- msecs
local INTERVAL_INIT = 1000 -- Initial reconnect
local INTERVAL_MAX = 32000 -- After exponential backoff
-- Helper function that returns a new configured socket
-- connected to the Paranoid Pirate queue
--
local identity
local function s_worker_socket (context)
local worker = context:socket(zmq.DEALER)
-- Set random identity to make tracing easier
identity = string.format("%04X-%04X", randof (0x10000), randof (0x10000))
worker:setopt(zmq.IDENTITY, identity)
worker:connect("tcp://localhost:5556")
-- Configure socket to not wait at close time
worker:setopt(zmq.LINGER, 0)
-- Tell queue we're ready for work
printf("I: (%s) worker ready\n", identity)
worker:send("READY")
return worker
end
s_version_assert (2, 1)
math.randomseed(os.time())
local context = zmq.init(1)
local worker = s_worker_socket (context)
-- If liveness hits zero, queue is considered disconnected
local liveness = HEARTBEAT_LIVENESS
local interval = INTERVAL_INIT
-- Send out heartbeats at regular intervals
local heartbeat_at = s_clock () + HEARTBEAT_INTERVAL
local poller = zmq.poller(1)
local is_running = true
local cycles = 0
local function worker_cb()
-- Get message
-- - 3-part envelope + content -> request
-- - 1-part "HEARTBEAT" -> heartbeat
local msg = zmsg.recv (worker)
if (msg:parts() == 3) then
-- Simulate various problems, after a few cycles
cycles = cycles + 1
if (cycles > 3 and randof (5) == 0) then
printf ("I: (%s) simulating a crash\n", identity)
is_running = false
return
elseif (cycles > 3 and randof (5) == 0) then
printf ("I: (%s) simulating CPU overload\n",
identity)
s_sleep (5000)
end
printf ("I: (%s) normal reply - %s\n",
identity, msg:body())
msg:send(worker)
liveness = HEARTBEAT_LIVENESS
s_sleep(1000); -- Do some heavy work
elseif (msg:parts() == 1 and msg:body() == "HEARTBEAT") then
liveness = HEARTBEAT_LIVENESS
else
printf ("E: (%s) invalid message\n", identity)
msg:dump()
end
interval = INTERVAL_INIT
end
poller:add(worker, zmq.POLLIN, worker_cb)
while is_running do
local cnt = assert(poller:poll(HEARTBEAT_INTERVAL * 1000))
if (cnt == 0) then
liveness = liveness - 1
if (liveness == 0) then
printf ("W: (%s) heartbeat failure, can't reach queue\n",
identity)
printf ("W: (%s) reconnecting in %d msec...\n",
identity, interval)
s_sleep (interval)
if (interval < INTERVAL_MAX) then
interval = interval * 2
end
poller:remove(worker)
worker:close()
worker = s_worker_socket (context)
poller:add(worker, zmq.POLLIN, worker_cb)
liveness = HEARTBEAT_LIVENESS
end
end
-- Send heartbeat to queue if it's time
if (s_clock () > heartbeat_at) then
heartbeat_at = s_clock () + HEARTBEAT_INTERVAL
printf("I: (%s) worker heartbeat\n", identity)
worker:send("HEARTBEAT")
end
end
worker:close()
context:term()
ppworker: Paranoid Pirate worker in Node.js
ppworker: Paranoid Pirate worker in Objective-C
ppworker: Paranoid Pirate worker in ooc
ppworker: Paranoid Pirate worker in Perl
ppworker: Paranoid Pirate worker in PHP
<?php
/*
* Paranoid Pirate worker
*
* @author Ian Barber <ian(dot)barber(at)gmail(dot)com>
*/
include 'zmsg.php';
define("HEARTBEAT_LIVENESS", 3); // 3-5 is reasonable
define("HEARTBEAT_INTERVAL", 1); // secs
define("INTERVAL_INIT", 1000); // Initial reconnect
define("INTERVAL_MAX", 32000); // After exponential backoff
/*
* Helper function that returns a new configured socket
 * connected to the Paranoid Pirate queue
*/
function s_worker_socket($context)
{
$worker = new ZMQSocket($context, ZMQ::SOCKET_DEALER);
// Set random identity to make tracing easier
$identity = sprintf ("%04X-%04X", rand(0, 0x10000), rand(0, 0x10000));
$worker->setSockOpt(ZMQ::SOCKOPT_IDENTITY, $identity);
$worker->connect("tcp://localhost:5556");
// Configure socket to not wait at close time
$worker->setSockOpt(ZMQ::SOCKOPT_LINGER, 0);
// Tell queue we're ready for work
printf ("I: (%s) worker ready%s", $identity, PHP_EOL);
$worker->send("READY");
return array($worker, $identity);
}
$context = new ZMQContext();
list($worker, $identity) = s_worker_socket($context);
// If liveness hits zero, queue is considered disconnected
$liveness = HEARTBEAT_LIVENESS;
$interval = INTERVAL_INIT;
// Send out heartbeats at regular intervals
$heartbeat_at = microtime(true) + HEARTBEAT_INTERVAL;
$read = $write = array();
$cycles = 0;
while (true) {
$poll = new ZMQPoll();
$poll->add($worker, ZMQ::POLL_IN);
$events = $poll->poll($read, $write, HEARTBEAT_INTERVAL * 1000);
if ($events) {
// Get message
// - 3-part envelope + content -> request
// - 1-part "HEARTBEAT" -> heartbeat
$zmsg = new Zmsg($worker);
$zmsg->recv();
if ($zmsg->parts() == 3) {
// Simulate various problems, after a few cycles
$cycles++;
if ($cycles > 3 && rand(0, 5) == 0) {
printf ("I: (%s) simulating a crash%s", $identity, PHP_EOL);
break;
} elseif ($cycles > 3 && rand(0, 5) == 0) {
printf ("I: (%s) simulating CPU overload%s", $identity, PHP_EOL);
sleep(5);
}
printf ("I: (%s) normal reply - %s%s", $identity, $zmsg->body(), PHP_EOL);
$zmsg->send();
$liveness = HEARTBEAT_LIVENESS;
sleep(1); // Do some heavy work
} elseif ($zmsg->parts() == 1 && $zmsg->body() == 'HEARTBEAT') {
$liveness = HEARTBEAT_LIVENESS;
} else {
printf ("E: (%s) invalid message%s%s", $identity, PHP_EOL, $zmsg->__toString());
}
$interval = INTERVAL_INIT;
} elseif (--$liveness == 0) {
printf ("W: (%s) heartbeat failure, can't reach queue%s", $identity, PHP_EOL);
printf ("W: (%s) reconnecting in %d msec...%s", $identity, $interval, PHP_EOL);
usleep ($interval * 1000);
if ($interval < INTERVAL_MAX) {
$interval *= 2;
}
list($worker, $identity) = s_worker_socket ($context);
$liveness = HEARTBEAT_LIVENESS;
}
// Send heartbeat to queue if it's time
if (microtime(true) > $heartbeat_at) {
$heartbeat_at = microtime(true) + HEARTBEAT_INTERVAL;
printf ("I: (%s) worker heartbeat%s", $identity, PHP_EOL);
$worker->send("HEARTBEAT");
}
}
ppworker: Paranoid Pirate worker in Python
#
## Paranoid Pirate worker
#
# Author: Daniel Lundin <dln(at)eintr(dot)org>
#
from random import randint
import time
import zmq
HEARTBEAT_LIVENESS = 3
HEARTBEAT_INTERVAL = 1
INTERVAL_INIT = 1
INTERVAL_MAX = 32
# Paranoid Pirate Protocol constants
PPP_READY = b"\x01" # Signals worker is ready
PPP_HEARTBEAT = b"\x02" # Signals worker heartbeat
def worker_socket(context, poller):
"""Helper function that returns a new configured socket
connected to the Paranoid Pirate queue"""
worker = context.socket(zmq.DEALER) # DEALER
identity = b"%04X-%04X" % (randint(0, 0x10000), randint(0, 0x10000))
worker.setsockopt(zmq.IDENTITY, identity)
poller.register(worker, zmq.POLLIN)
worker.connect("tcp://localhost:5556")
worker.send(PPP_READY)
return worker
context = zmq.Context(1)
poller = zmq.Poller()
liveness = HEARTBEAT_LIVENESS
interval = INTERVAL_INIT
heartbeat_at = time.time() + HEARTBEAT_INTERVAL
worker = worker_socket(context, poller)
cycles = 0
while True:
socks = dict(poller.poll(HEARTBEAT_INTERVAL * 1000))
# Handle worker activity on backend
if socks.get(worker) == zmq.POLLIN:
# Get message
# - 3-part envelope + content -> request
# - 1-part HEARTBEAT -> heartbeat
frames = worker.recv_multipart()
if not frames:
break # Interrupted
if len(frames) == 3:
# Simulate various problems, after a few cycles
cycles += 1
if cycles > 3 and randint(0, 5) == 0:
print("I: Simulating a crash")
break
if cycles > 3 and randint(0, 5) == 0:
print("I: Simulating CPU overload")
time.sleep(3)
print("I: Normal reply")
worker.send_multipart(frames)
liveness = HEARTBEAT_LIVENESS
time.sleep(1) # Do some heavy work
elif len(frames) == 1 and frames[0] == PPP_HEARTBEAT:
print("I: Queue heartbeat")
liveness = HEARTBEAT_LIVENESS
else:
print("E: Invalid message: %s" % frames)
interval = INTERVAL_INIT
else:
liveness -= 1
if liveness == 0:
print("W: Heartbeat failure, can't reach queue")
print("W: Reconnecting in %0.2fs..." % interval)
time.sleep(interval)
if interval < INTERVAL_MAX:
interval *= 2
poller.unregister(worker)
worker.setsockopt(zmq.LINGER, 0)
worker.close()
worker = worker_socket(context, poller)
liveness = HEARTBEAT_LIVENESS
if time.time() > heartbeat_at:
heartbeat_at = time.time() + HEARTBEAT_INTERVAL
print("I: Worker heartbeat")
worker.send(PPP_HEARTBEAT)
ppworker: Paranoid Pirate worker in Q
ppworker: Paranoid Pirate worker in Racket
ppworker: Paranoid Pirate worker in Ruby
ppworker: Paranoid Pirate worker in Rust
ppworker: Paranoid Pirate worker in Scala
ppworker: Paranoid Pirate worker in Tcl
#
# Paranoid Pirate worker
#
package require zmq
set HEARTBEAT_LIVENESS 3 ;# 3-5 is reasonable
set HEARTBEAT_INTERVAL 1000 ;# msecs
set INTERVAL_INIT 1000 ;# Initial reconnect
set INTERVAL_MAX 32000 ;# After exponential backoff
# Paranoid Pirate Protocol constants
set PPP_READY "READY" ;# Signals worker is ready
set PPP_HEARTBEAT "HEARTBEAT" ;# Signals worker heartbeat
expr {srand([pid])}
# Helper function that returns a new configured socket
# connected to the Paranoid Pirate queue
proc s_worker_socket {ctx} {
global PPP_READY
set worker [zmq socket worker $ctx DEALER]
$worker connect "tcp://localhost:5556"
# Tell queue we're ready for work
puts "I: worker ready"
$worker send $PPP_READY
return $worker
}
set ctx [zmq context context]
set worker [s_worker_socket $ctx]
# If liveness hits zero, queue is considered disconnected
set liveness $HEARTBEAT_LIVENESS
set interval $INTERVAL_INIT
# Send out heartbeats at regular intervals
set heartbeat_at [expr {[clock seconds] + $HEARTBEAT_INTERVAL}]
set cycles 0
while {1} {
set poll_set [list [list $worker [list POLLIN]]]
set rpoll_set [zmq poll $poll_set $HEARTBEAT_INTERVAL]
if {[llength $rpoll_set] && "POLLIN" in [lindex $rpoll_set 0 1]} {
# Get message
# - 3-part envelope + content -> request
# - 1-part HEARTBEAT -> heartbeat
set msg [zmsg recv $worker]
if {[llength $msg] == 3} {
# Simulate various problems, after a few cycles
incr cycles
if {$cycles > 3 && [expr {int(rand()*5)}] == 0} {
puts "I: simulating a crash"
break
} elseif {$cycles > 3 && [expr {int(rand()*5)}] == 0} {
puts "I: simulating CPU overload"
after 3000
}
puts "I: normal reply"
zmsg send $worker $msg
set liveness $HEARTBEAT_LIVENESS
after 1000 ;# Do some heavy work
} elseif {[llength $msg] == 1} {
if {[lindex $msg 0] eq $PPP_HEARTBEAT} {
puts "I: heartbeat"
set liveness $HEARTBEAT_LIVENESS
} else {
puts "E: invalid message"
zmsg dump $msg
}
} else {
puts "E: invalid message"
zmsg dump $msg
}
set interval $INTERVAL_INIT
} elseif {[incr liveness -1] == 0} {
puts "W: heartbeat failure, can't reach queue"
puts "W: reconnecting in $interval msec..."
after $interval
if {$interval < $INTERVAL_MAX} {
set interval [expr {$interval * 2}]
}
$worker close
set worker [s_worker_socket $ctx]
set liveness $HEARTBEAT_LIVENESS
}
# Send heartbeat to queue if it's time
if {[clock seconds] > $heartbeat_at} {
set heartbeat_at [expr {[clock seconds] + $HEARTBEAT_INTERVAL}]
puts "I: worker heartbeat"
$worker send $PPP_HEARTBEAT
}
}
$worker close
$ctx term
ppworker: Paranoid Pirate worker in OCaml
Some comments about this example:
-
The code includes simulation of failures, as before. This makes it (a) very hard to debug, and (b) dangerous to reuse. When you want to debug this, disable the failure simulation.
-
The worker uses a reconnect strategy similar to the one we designed for the Lazy Pirate client, with two major differences: (a) it does an exponential back-off, and (b) it retries indefinitely (whereas the client retries a few times before reporting a failure).
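To make the back-off concrete, here is a minimal Python sketch of just that reconnect logic, with everything else stripped away (the endpoint and constants are borrowed from the examples above; this is an illustration, not another full worker):
import time
import zmq

INTERVAL_INIT = 1       # Seconds before the first reconnect attempt
INTERVAL_MAX = 32       # Cap for the exponential back-off
HEARTBEAT_LIVENESS = 3

def connect(ctx):
    # Fresh DEALER socket; whatever was queued on the old one is discarded
    s = ctx.socket(zmq.DEALER)
    s.connect("tcp://localhost:5556")
    return s

ctx = zmq.Context()
worker = connect(ctx)
liveness = HEARTBEAT_LIVENESS
interval = INTERVAL_INIT
while True:
    if worker.poll(1000):              # Any traffic at all counts as a sign of life
        worker.recv_multipart()
        liveness = HEARTBEAT_LIVENESS
        interval = INTERVAL_INIT       # Reset the back-off on contact
    else:
        liveness -= 1
        if liveness == 0:              # Queue looks dead: wait, double the wait, reconnect
            time.sleep(interval)
            interval = min(interval * 2, INTERVAL_MAX)
            worker.close(linger=0)
            worker = connect(ctx)
            liveness = HEARTBEAT_LIVENESS
Unlike the Lazy Pirate client, this loop never gives up; it just keeps widening the gap between attempts up to the 32-second ceiling.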
Try the client, queue, and workers together, for example with a script like this:
ppqueue &
for i in 1 2 3 4; do
ppworker &
sleep 1
done
lpclient &
You should see the workers die one-by-one as they simulate a crash, and the client eventually give up. You can stop and restart the queue and both client and workers will reconnect and carry on. And no matter what you do to queues and workers, the client will never get an out-of-order reply: the whole chain either works, or the client abandons.
Heartbeating #
Heartbeating solves the problem of knowing whether a peer is alive or dead. This is not an issue specific to ZeroMQ. TCP has a long timeout (30 minutes or so), which means that it can be impossible to know whether a peer has died, been disconnected, or gone on a weekend to Prague with a case of vodka, a redhead, and a large expense account.
It’s not easy to get heartbeating right. When writing the Paranoid Pirate examples, it took about five hours to get the heartbeating working properly. The rest of the request-reply chain took perhaps ten minutes. It is especially easy to create “false failures”, i.e., when peers decide that they are disconnected because the heartbeats aren’t sent properly.
We’ll look at the three main answers people use for heartbeating with ZeroMQ.
Shrugging It Off #
The most common approach is to do no heartbeating at all and hope for the best. Many if not most ZeroMQ applications do this. ZeroMQ encourages this by hiding peers in many cases. What problems does this approach cause?
-
When we use a ROUTER socket in an application that tracks peers, as peers disconnect and reconnect, the application will leak memory (resources that the application holds for each peer) and get slower and slower.
-
When we use SUB- or DEALER-based data recipients, we can’t tell the difference between good silence (there’s no data) and bad silence (the other end died). When a recipient knows the other side died, it can for example switch over to a backup route.
-
If we use a TCP connection that stays silent for a long while, it will, in some networks, just die. Sending something (technically a “keep-alive” rather than a heartbeat) will keep the connection alive.
One-Way Heartbeats #
A second option is to send a heartbeat message from each node to its peers every second or so. When one node hears nothing from another within some timeout (several seconds, typically), it will treat that peer as dead. Sounds good, right? Sadly, no. This works in some cases but has nasty edge cases in others.
For pub-sub, this does work, and it’s the only model you can use. SUB sockets cannot talk back to PUB sockets, but PUB sockets can happily send “I’m alive” messages to their subscribers.
As an optimization, you can send heartbeats only when there is no real data to send. Furthermore, you can send heartbeats progressively slower and slower, if network activity is an issue (e.g., on mobile networks where activity drains the battery). As long as the recipient can detect a failure (sharp stop in activity), that’s fine.
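One way to implement the first optimization, sketched in Python: route all outgoing traffic through one helper so that any real message counts as a heartbeat, and only send an explicit heartbeat after a full interval of silence. The \x02 heartbeat byte follows the PPP convention used above, and the socket is assumed to exist already:
import time

HEARTBEAT_INTERVAL = 1.0         # Seconds of silence before we send an explicit heartbeat
last_sent = time.monotonic()     # Refreshed by every outgoing message

def send(socket, frames):
    # Send real data; any outgoing traffic doubles as a heartbeat
    global last_sent
    socket.send_multipart(frames)
    last_sent = time.monotonic()

def maybe_heartbeat(socket, heartbeat=b"\x02"):
    # Call once per loop pass: only heartbeat if we have been silent for a while
    global last_sent
    if time.monotonic() - last_sent >= HEARTBEAT_INTERVAL:
        socket.send(heartbeat)
        last_sent = time.monotonic()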
Here are the typical problems with this design:
-
It can be inaccurate when we send large amounts of data, as heartbeats will be delayed behind that data. If heartbeats are delayed, you can get false timeouts and disconnections due to network congestion. Thus, always treat any incoming data as a heartbeat, whether or not the sender optimizes out heartbeats.
-
While the pub-sub pattern will drop messages for disappeared recipients, PUSH and DEALER sockets will queue them. So if you send heartbeats to a dead peer and it comes back, it will get all the heartbeats you sent, which can be thousands. Whoa, whoa!
-
This design assumes that heartbeat timeouts are the same across the whole network. But that won’t be accurate. Some peers will want very aggressive heartbeating in order to detect faults rapidly. And some will want very relaxed heartbeating, in order to let sleeping networks lie and save power.
Ping-Pong Heartbeats #
The third option is to use a ping-pong dialog. One peer sends a ping command to the other, which replies with a pong command. Neither command has any payload. Pings and pongs are not correlated. Because the roles of “client” and “server” are arbitrary in some networks, we usually specify that either peer can in fact send a ping and expect a pong in response. However, because the timeouts depend on network topologies known best to dynamic clients, it is usually the client that pings the server.
This works for all ROUTER-based brokers. The same optimizations we used in the second model make this work even better: treat any incoming data as a pong, and only send a ping when not otherwise sending data.
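Here is a minimal Python sketch of the pinging side, assuming a ROUTER-based server on an invented endpoint; the PING and PONG command bytes are made up for this example, as PPP itself defines neither:
import time
import zmq

PING, PONG = b"\x04", b"\x05"    # Arbitrary command bytes for this sketch
PING_INTERVAL = 1.0              # Seconds between pings when otherwise idle
SERVER_TIMEOUT = 3 * PING_INTERVAL

ctx = zmq.Context()
client = ctx.socket(zmq.DEALER)
client.connect("tcp://localhost:5555")   # Assumed ROUTER-based server

last_seen = time.monotonic()
ping_at = time.monotonic() + PING_INTERVAL
while True:
    if client.poll(int(PING_INTERVAL * 1000)):
        frames = client.recv_multipart()
        last_seen = time.monotonic()     # Any traffic, pong or data, counts as a pong
        if frames[-1] == PING:           # Either peer may ping; answer with a pong
            client.send(PONG)
    if time.monotonic() >= ping_at:      # Ping only when we're otherwise idle
        client.send(PING)
        ping_at = time.monotonic() + PING_INTERVAL
    if time.monotonic() - last_seen > SERVER_TIMEOUT:
        print("W: server looks dead")
        break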
Heartbeating for Paranoid Pirate #
For Paranoid Pirate, we chose the second approach. It might not have been the simplest option: if designing this today, I’d probably try a ping-pong approach instead. However, the principles are similar. The heartbeat messages flow asynchronously in both directions, and either peer can decide the other is “dead” and stop talking to it.
In the worker, this is how we handle heartbeats from the queue:
- We calculate a liveness, which is how many heartbeats we can still miss before deciding the queue is dead. It starts at three and we decrement it each time we miss a heartbeat.
- We wait, in the zmq_poll loop, for one second each time, which is our heartbeat interval.
- If there’s any message from the queue during that time, we reset our liveness to three.
- If there’s no message during that time, we count down our liveness.
- If the liveness reaches zero, we consider the queue dead.
- If the queue is dead, we destroy our socket, create a new one, and reconnect.
- To avoid opening and closing too many sockets, we wait for a certain interval before reconnecting, and we double the interval each time until it reaches 32 seconds.
And this is how we handle heartbeats to the queue:
- We calculate when to send the next heartbeat; this is a single variable because we’re talking to one peer, the queue.
- In the zmq_poll loop, whenever we pass this time, we send a heartbeat to the queue.
Here’s the essential heartbeating code for the worker:
#define HEARTBEAT_LIVENESS 3 // 3-5 is reasonable
#define HEARTBEAT_INTERVAL 1000 // msecs
#define INTERVAL_INIT 1000 // Initial reconnect
#define INTERVAL_MAX 32000 // After exponential backoff
...
// If liveness hits zero, queue is considered disconnected
size_t liveness = HEARTBEAT_LIVENESS;
size_t interval = INTERVAL_INIT;
// Send out heartbeats at regular intervals
uint64_t heartbeat_at = zclock_time () + HEARTBEAT_INTERVAL;
while (true) {
zmq_pollitem_t items [] = { { worker, 0, ZMQ_POLLIN, 0 } };
int rc = zmq_poll (items, 1, HEARTBEAT_INTERVAL * ZMQ_POLL_MSEC);
if (items [0].revents & ZMQ_POLLIN) {
// Receive any message from queue
liveness = HEARTBEAT_LIVENESS;
interval = INTERVAL_INIT;
}
else
if (--liveness == 0) {
zclock_sleep (interval);
if (interval < INTERVAL_MAX)
interval *= 2;
zsocket_destroy (ctx, worker);
...
liveness = HEARTBEAT_LIVENESS;
}
// Send heartbeat to queue if it's time
if (zclock_time () > heartbeat_at) {
heartbeat_at = zclock_time () + HEARTBEAT_INTERVAL;
// Send heartbeat message to queue
}
}
The queue does the same, but manages an expiration time for each worker.
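In essence, the queue side reduces to this (a Python condensation of the queue code shown earlier, nothing new): every message from a worker pushes that worker's deadline forward, and a purge pass drops anyone whose deadline has passed.
import time

HEARTBEAT_LIVENESS = 3
HEARTBEAT_INTERVAL = 1.0   # Seconds

expiry = {}                # Worker address -> time after which we consider it dead

def worker_alive(address):
    # Any message from a worker (reply, READY, or heartbeat) resets its deadline
    expiry[address] = time.time() + HEARTBEAT_INTERVAL * HEARTBEAT_LIVENESS

def purge():
    # Called once per poll loop: forget workers that have stopped heartbeating
    now = time.time()
    for address in [a for a, t in expiry.items() if t < now]:
        del expiry[address]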
Here are some tips for your own heartbeating implementation:
-
Use zmq_poll or a reactor as the core of your application’s main task.
-
Start by building the heartbeating between peers, test it by simulating failures, and then build the rest of the message flow. Adding heartbeating afterwards is much trickier.
-
Use simple tracing, i.e., print to console, to get this working. To help you trace the flow of messages between peers, use a dump method such as zmsg offers, and number your messages incrementally so you can see if there are gaps.
-
In a real application, heartbeating must be configurable and usually negotiated with the peer. Some peers will want aggressive heartbeating, as low as 10 msecs. Other peers will be far away and want heartbeating as high as 30 seconds.
-
If you have different heartbeat intervals for different peers, your poll timeout should be the lowest (shortest time) of these. Do not use an infinite timeout.
-
Do heartbeating on the same socket you use for messages, so your heartbeats also act as a keep-alive to stop the network connection from going stale (some firewalls can be unkind to silent connections).
Contracts and Protocols #
If you’re paying attention, you’ll realize that Paranoid Pirate is not interoperable with Simple Pirate, because of the heartbeats. But how do we define “interoperable”? To guarantee interoperability, we need a kind of contract, an agreement that lets different teams in different times and places write code that is guaranteed to work together. We call this a “protocol”.
It’s fun to experiment without specifications, but that’s not a sensible basis for real applications. What happens if we want to write a worker in another language? Do we have to read code to see how things work? What if we want to change the protocol for some reason? Even a simple protocol will, if it’s successful, evolve and become more complex.
Lack of contracts is a sure sign of a disposable application. So let’s write a contract for this protocol. How do we do that?
There’s a wiki at rfc.zeromq.org that we made especially as a home for public ZeroMQ contracts. To create a new specification, register on the wiki if needed, and follow the instructions. It’s fairly straightforward, though writing technical texts is not everyone’s cup of tea.
It took me about fifteen minutes to draft the new Pirate Pattern Protocol. It’s not a big specification, but it does capture enough to act as the basis for arguments (“your queue isn’t PPP compatible; please fix it!”).
Turning PPP into a real protocol would take more work:
-
There should be a protocol version number in the READY command so that it’s possible to distinguish between different versions of PPP.
-
Right now, READY and HEARTBEAT are not entirely distinct from requests and replies. To make them distinct, we would need a message structure that includes a “message type” part.
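For example, a hypothetical evolution of the framing might prefix every message with a protocol/version frame and a message-type frame; none of this is part of PPP as drafted, it simply illustrates the two points above:
# Hypothetical framing only: PPP as specified does not define these fields
PROTOCOL = b"PPP02"                                # Protocol name plus version, sent first
READY = [PROTOCOL, b"\x01"]                        # Typed control message
HEARTBEAT = [PROTOCOL, b"\x02"]                    # Typed control message
REQUEST = [PROTOCOL, b"\x03", b"...payload..."]    # Typed application message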
Service-Oriented Reliable Queuing (Majordomo Pattern) #
The nice thing about progress is how fast it happens when lawyers and committees aren’t involved. The one-page MDP specification turns PPP into something more solid. This is how we should design complex architectures: start by writing down the contracts, and only then write software to implement them.
The Majordomo Protocol (MDP) extends and improves on PPP in one interesting way: it adds a “service name” to requests that the client sends, and asks workers to register for specific services. Adding service names turns our Paranoid Pirate queue into a service-oriented broker. The nice thing about MDP is that it came out of working code, a simpler ancestor protocol (PPP), and a precise set of improvements that each solved a clear problem. This made it easy to draft.
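Concretely, a client request in MDP travels as a multipart message along these lines (the same framing you'll see again in the API code below):
Frame 1: "MDPCxy" (six bytes, MDP/Client x.y)
Frame 2: Service name (printable string)
Frames 3+: Request body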
To implement Majordomo, we need to write a framework for clients and workers. It’s really not sane to ask every application developer to read the spec and make it work, when they could be using a simpler API that does the work for them.
So while our first contract (MDP itself) defines how the pieces of our distributed architecture talk to each other, our second contract defines how user applications talk to the technical framework we’re going to design.
Majordomo has two halves, a client side and a worker side. Because we’ll write both client and worker applications, we will need two APIs. Here is a sketch for the client API, using a simple object-oriented approach:
mdcli_t *mdcli_new (char *broker);
void mdcli_destroy (mdcli_t **self_p);
zmsg_t *mdcli_send (mdcli_t *self, char *service, zmsg_t **request_p);
That’s it. We open a session to the broker, send a request message, get a reply message back, and eventually close the connection. Here’s a sketch for the worker API:
mdwrk_t *mdwrk_new (char *broker, char *service);
void mdwrk_destroy (mdwrk_t **self_p);
zmsg_t *mdwrk_recv (mdwrk_t *self, zmsg_t *reply);
It’s more or less symmetrical, but the worker dialog is a little different. The first time a worker does a recv(), it passes a null reply. Thereafter, it passes the current reply, and gets a new request.
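For example, an echo worker's whole main loop comes down to a few lines. This is just a sketch written against the worker API as implemented below (where recv takes the address of the reply message, and verbose is whatever flag your application uses):
mdwrk_t *session = mdwrk_new ("tcp://localhost:5555", "echo", verbose);
zmsg_t *reply = NULL;
while (true) {
    zmsg_t *request = mdwrk_recv (session, &reply);
    if (request == NULL)
        break;              //  Worker was interrupted
    reply = request;        //  Echo service: the reply is the request
}
mdwrk_destroy (&session);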
The client and worker APIs were fairly simple to construct because they’re heavily based on the Paranoid Pirate code we already developed. Here is the client API:
mdcliapi: Majordomo client API in C
// mdcliapi class - Majordomo Protocol Client API
// Implements the MDP/Client spec at http://rfc.zeromq.org/spec:7.
#include "mdcliapi.h"
// Structure of our class
// We access these properties only via class methods
struct _mdcli_t {
zctx_t *ctx; // Our context
char *broker;
void *client; // Socket to broker
int verbose; // Print activity to stdout
int timeout; // Request timeout
int retries; // Request retries
};
// Connect or reconnect to broker
void s_mdcli_connect_to_broker (mdcli_t *self)
{
if (self->client)
zsocket_destroy (self->ctx, self->client);
self->client = zsocket_new (self->ctx, ZMQ_REQ);
zmq_connect (self->client, self->broker);
if (self->verbose)
zclock_log ("I: connecting to broker at %s...", self->broker);
}
// .split constructor and destructor
// Here we have the constructor and destructor for our class:
// Constructor
mdcli_t *
mdcli_new (char *broker, int verbose)
{
assert (broker);
mdcli_t *self = (mdcli_t *) zmalloc (sizeof (mdcli_t));
self->ctx = zctx_new ();
self->broker = strdup (broker);
self->verbose = verbose;
self->timeout = 2500; // msecs
self->retries = 3; // Before we abandon
s_mdcli_connect_to_broker (self);
return self;
}
// Destructor
void
mdcli_destroy (mdcli_t **self_p)
{
assert (self_p);
if (*self_p) {
mdcli_t *self = *self_p;
zctx_destroy (&self->ctx);
free (self->broker);
free (self);
*self_p = NULL;
}
}
// .split configure retry behavior
// These are the class methods. We can set the request timeout and number
// of retry attempts before sending requests:
// Set request timeout
void
mdcli_set_timeout (mdcli_t *self, int timeout)
{
assert (self);
self->timeout = timeout;
}
// Set request retries
void
mdcli_set_retries (mdcli_t *self, int retries)
{
assert (self);
self->retries = retries;
}
// .split send request and wait for reply
// Here is the {{send}} method. It sends a request to the broker and gets
// a reply even if it has to retry several times. It takes ownership of
// the request message, and destroys it when sent. It returns the reply
// message, or NULL if there was no reply after multiple attempts:
zmsg_t *
mdcli_send (mdcli_t *self, char *service, zmsg_t **request_p)
{
assert (self);
assert (request_p);
zmsg_t *request = *request_p;
// Prefix request with protocol frames
// Frame 1: "MDPCxy" (six bytes, MDP/Client x.y)
// Frame 2: Service name (printable string)
zmsg_pushstr (request, service);
zmsg_pushstr (request, MDPC_CLIENT);
if (self->verbose) {
zclock_log ("I: send request to '%s' service:", service);
zmsg_dump (request);
}
int retries_left = self->retries;
while (retries_left && !zctx_interrupted) {
zmsg_t *msg = zmsg_dup (request);
zmsg_send (&msg, self->client);
zmq_pollitem_t items [] = {
{ self->client, 0, ZMQ_POLLIN, 0 }
};
// .split body of send
// On any blocking call, {{libzmq}} will return -1 if there was
// an error; we could in theory check for different error codes,
// but in practice it's OK to assume it was {{EINTR}} (Ctrl-C):
int rc = zmq_poll (items, 1, self->timeout * ZMQ_POLL_MSEC);
if (rc == -1)
break; // Interrupted
// If we got a reply, process it
if (items [0].revents & ZMQ_POLLIN) {
zmsg_t *msg = zmsg_recv (self->client);
if (self->verbose) {
zclock_log ("I: received reply:");
zmsg_dump (msg);
}
// We would handle malformed replies better in real code
assert (zmsg_size (msg) >= 3);
zframe_t *header = zmsg_pop (msg);
assert (zframe_streq (header, MDPC_CLIENT));
zframe_destroy (&header);
zframe_t *reply_service = zmsg_pop (msg);
assert (zframe_streq (reply_service, service));
zframe_destroy (&reply_service);
zmsg_destroy (&request);
return msg; // Success
}
else
if (--retries_left) {
if (self->verbose)
zclock_log ("W: no reply, reconnecting...");
s_mdcli_connect_to_broker (self);
}
else {
if (self->verbose)
zclock_log ("W: permanent error, abandoning");
break; // Give up
}
}
if (zctx_interrupted)
printf ("W: interrupt received, killing client...\n");
zmsg_destroy (&request);
return NULL;
}
mdcliapi: Majordomo client API in C++
#ifndef __MDCLIAPI_HPP_INCLUDED__
#define __MDCLIAPI_HPP_INCLUDED__
//#include "mdcliapi.h"
#include "zmsg.hpp"
#include "mdp.h"
class mdcli {
public:
// ---------------------------------------------------------------------
// Constructor
mdcli (std::string broker, int verbose): m_broker(broker), m_verbose(verbose)
{
assert (broker.size()!=0);
s_version_assert (4, 0);
m_context = new zmq::context_t(1);
s_catch_signals ();
connect_to_broker ();
}
// Destructor
virtual
~mdcli ()
{
delete m_client;
delete m_context;
}
// ---------------------------------------------------------------------
// Connect or reconnect to broker
void connect_to_broker ()
{
if (m_client) {
delete m_client;
}
m_client = new zmq::socket_t (*m_context, ZMQ_REQ);
s_set_id(*m_client);
int linger = 0;
m_client->setsockopt(ZMQ_LINGER, &linger, sizeof (linger));
m_client->connect (m_broker.c_str());
if (m_verbose) {
s_console ("I: connecting to broker at %s...", m_broker.c_str());
}
}
// ---------------------------------------------------------------------
// Set request timeout
void
set_timeout (int timeout)
{
m_timeout = timeout;
}
// ---------------------------------------------------------------------
// Set request retries
void
set_retries (int retries)
{
m_retries = retries;
}
// ---------------------------------------------------------------------
// Send request to broker and get reply by hook or crook
// Takes ownership of request message and destroys it when sent.
// Returns the reply message or NULL if there was no reply.
zmsg *
send (std::string service, zmsg *&request_p)
{
assert (request_p);
zmsg *request = request_p;
// Prefix request with protocol frames
// Frame 1: "MDPCxy" (six bytes, MDP/Client x.y)
// Frame 2: Service name (printable string)
request->push_front(service.c_str());
request->push_front(k_mdp_client.data());
if (m_verbose) {
s_console ("I: send request to '%s' service:", service.c_str());
request->dump();
}
int retries_left = m_retries;
while (retries_left && !s_interrupted) {
zmsg * msg = new zmsg(*request);
msg->send(*m_client);
while (!s_interrupted) {
// Poll socket for a reply, with timeout
zmq::pollitem_t items [] = {
{ *m_client, 0, ZMQ_POLLIN, 0 } };
zmq::poll (items, 1, m_timeout);
// If we got a reply, process it
if (items [0].revents & ZMQ_POLLIN) {
zmsg * recv_msg = new zmsg(*m_client);
if (m_verbose) {
s_console ("I: received reply:");
recv_msg->dump ();
}
// Don't try to handle errors, just assert noisily
assert (recv_msg->parts () >= 3);
ustring header = recv_msg->pop_front();
assert (header.compare((unsigned char *)k_mdp_client.data()) == 0);
ustring reply_service = recv_msg->pop_front();
assert (reply_service.compare((unsigned char *)service.c_str()) == 0);
delete request;
return recv_msg; // Success
}
else {
if (--retries_left) {
if (m_verbose) {
s_console ("W: no reply, reconnecting...");
}
// Reconnect, and resend message
connect_to_broker ();
zmsg msg (*request);
msg.send (*m_client);
}
else {
if (m_verbose) {
s_console ("W: permanent error, abandoning request");
}
break; // Give up
}
}
}
}
if (s_interrupted) {
std::cout << "W: interrupt received, killing client..." << std::endl;
}
delete request;
return 0;
}
private:
const std::string m_broker;
zmq::context_t * m_context;
zmq::socket_t * m_client{nullptr}; // Socket to broker
const int m_verbose; // Print activity to stdout
int m_timeout{2500}; // Request timeout
int m_retries{3}; // Request retries
};
#endif
mdcliapi: Majordomo client API in Go
// mdcliapi class - Majordomo Protocol Client API
// Implements the MDP/Client spec at http://rfc.zeromq.org/spec:7
//
// Author: iano <scaly.iano@gmail.com>
// Based on C & Python example
package main
import (
zmq "github.com/alecthomas/gozmq"
"log"
"time"
)
type Client interface {
Close()
Send([]byte, [][]byte) [][]byte
}
type mdClient struct {
broker string
client *zmq.Socket
context *zmq.Context
retries int
timeout time.Duration
verbose bool
}
func NewClient(broker string, verbose bool) Client {
context, _ := zmq.NewContext()
self := &mdClient{
broker: broker,
context: context,
retries: 3,
timeout: 2500 * time.Millisecond,
verbose: verbose,
}
self.reconnect()
return self
}
func (self *mdClient) reconnect() {
if self.client != nil {
self.client.Close()
}
self.client, _ = self.context.NewSocket(zmq.REQ)
self.client.SetLinger(0)
self.client.Connect(self.broker)
if self.verbose {
log.Printf("I: connecting to broker at %s...\n", self.broker)
}
}
func (self *mdClient) Close() {
if self.client != nil {
self.client.Close()
}
self.context.Close()
}
func (self *mdClient) Send(service []byte, request [][]byte) (reply [][]byte) {
// Prefix request with protocol frames
// Frame 1: "MDPCxy" (six bytes, MDP/Client x.y)
// Frame 2: Service name (printable string)
frame := append([][]byte{[]byte(MDPC_CLIENT), service}, request...)
if self.verbose {
log.Printf("I: send request to '%s' service:", service)
Dump(request)
}
for retries := self.retries; retries > 0; {
self.client.SendMultipart(frame, 0)
items := zmq.PollItems{
zmq.PollItem{Socket: self.client, Events: zmq.POLLIN},
}
_, err := zmq.Poll(items, self.timeout)
if err != nil {
panic(err) // Interrupted
}
if item := items[0]; item.REvents&zmq.POLLIN != 0 {
msg, _ := self.client.RecvMultipart(0)
if self.verbose {
log.Println("I: received reply: ")
Dump(msg)
}
// We would handle malformed replies better in real code
if len(msg) < 3 {
panic("Error msg len")
}
header := msg[0]
if string(header) != MDPC_CLIENT {
panic("Error header")
}
replyService := msg[1]
if string(service) != string(replyService) {
panic("Error reply service")
}
reply = msg[2:]
break
} else if retries--; retries > 0 {
if self.verbose {
log.Println("W: no reply, reconnecting...")
}
self.reconnect()
} else {
if self.verbose {
log.Println("W: permanent error, abandoning")
}
break
}
}
return
}
mdcliapi: Majordomo client API in Haxe
package ;
import haxe.Stack;
import neko.Lib;
import org.zeromq.ZContext;
import org.zeromq.ZFrame;
import org.zeromq.ZMQ;
import org.zeromq.ZMQSocket;
import org.zeromq.ZMsg;
import org.zeromq.ZMQPoller;
import org.zeromq.ZMQException;
import MDP;
/**
* Majordomo Protocol Client API
 * Implements the MDP/Client spec at http://rfc.zeromq.org/spec:7
*/
class MDCliAPI
{
/** Request timeout (in msec) */
public var timeout:Int;
/** Request #retries */
public var retries:Int;
// Private instance fields
/** Our context */
private var ctx:ZContext;
/** Connection string to reach broker */
private var broker:String;
/** Socket to broker */
private var client:ZMQSocket;
/** Print activity to stdout */
private var verbose:Bool;
/** Logger function used in verbose mode */
private var log:Dynamic->Void;
/**
* Constructor
* @param broker
* @param verbose
*/
public function new(broker:String, ?verbose:Bool=false, ?logger:Dynamic->Void) {
ctx = new ZContext();
this.broker = broker;
this.verbose = verbose;
this.timeout = 2500; // msecs
this.retries = 3; // before we abandon
if (logger != null)
log = logger;
else
log = Lib.println;
connectToBroker();
}
/**
* Connect or reconnect to broker
*/
public function connectToBroker() {
if (client != null)
client.close();
client = ctx.createSocket(ZMQ_REQ);
client.setsockopt(ZMQ_LINGER, 0);
client.connect(broker);
if (verbose)
log("I: client connecting to broker at " + broker + "...");
}
/**
* Destructor
*/
public function destroy() {
ctx.destroy();
}
/**
* Send request to broker and get reply by hook or crook.
* Takes ownership of request message and destroys it when sent.
* Returns the reply message or NULL if there was no reply after #retries
* @param service
* @param request
* @return
*/
public function send(service:String, request:ZMsg):ZMsg {
// Prefix request with MDP protocol frames
// Frame 1: "MDPCxy" (six bytes, MDP/Client)
// Frame 2: Service name (printable string)
request.push(ZFrame.newStringFrame(service));
request.push(ZFrame.newStringFrame(MDP.MDPC_CLIENT));
if (verbose) {
log("I: send request to '" + service + "' service:");
log(request.toString());
}
var retries_left = retries;
var poller = new ZMQPoller();
while (retries_left > 0 && !ZMQ.isInterrupted()) {
// We send a request, then we work to get a reply
var msg = request.duplicate();
msg.send(client);
var expect_reply = true;
while (expect_reply) {
poller.registerSocket(client, ZMQ.ZMQ_POLLIN());
// Poll socket for a reply, with timeout
try {
var res = poller.poll(timeout * 1000);
} catch (e:ZMQException) {
trace("ZMQException #:" + e.errNo + ", str:" + e.str());
trace (Stack.toString(Stack.exceptionStack()));
ctx.destroy();
return null;
}
// If we got a reply, process it
if (poller.pollin(1)) {
// We got a reply from the server, must match sequence
var msg = ZMsg.recvMsg(client);
if (msg == null)
break; // Interrupted
if (verbose)
log("I: received reply:" + msg.toString());
if (msg.size() < 3)
break; // Don't try to handle errors
var header = msg.pop();
if (!header.streq(MDP.MDPC_CLIENT))
break; // Assert
header.destroy();
var reply_service = msg.pop();
if (!reply_service.streq(service))
break; // Assert
reply_service.destroy();
request.destroy();
return msg; // Success
} else if (--retries_left > 0) {
if (verbose)
log("W: no reply, reconnecting...");
// Reconnect , and resend message
connectToBroker();
msg = request.duplicate();
msg.send(client);
} else {
if (verbose)
log("E: permanent error, abandoning");
break;
}
poller.unregisterAllSockets();
}
}
return null;
}
}
mdcliapi: Majordomo client API in Java
package guide;
import java.util.Formatter;
import org.zeromq.*;
/**
 * Majordomo Protocol Client API, Java version. Implements the MDP/Client spec at
 * http://rfc.zeromq.org/spec:7.
*
*/
public class mdcliapi
{
private String broker;
private ZContext ctx;
private ZMQ.Socket client;
private long timeout = 2500;
private int retries = 3;
private boolean verbose;
private Formatter log = new Formatter(System.out);
public long getTimeout()
{
return timeout;
}
public void setTimeout(long timeout)
{
this.timeout = timeout;
}
public int getRetries()
{
return retries;
}
public void setRetries(int retries)
{
this.retries = retries;
}
public mdcliapi(String broker, boolean verbose)
{
this.broker = broker;
this.verbose = verbose;
ctx = new ZContext();
reconnectToBroker();
}
/**
* Connect or reconnect to broker
*/
void reconnectToBroker()
{
if (client != null) {
ctx.destroySocket(client);
}
client = ctx.createSocket(SocketType.REQ);
client.connect(broker);
if (verbose)
log.format("I: connecting to broker at %s\n", broker);
}
/**
* Send request to broker and get reply by hook or crook Takes ownership of
* request message and destroys it when sent. Returns the reply message or
* NULL if there was no reply.
*
* @param service
* @param request
* @return
*/
public ZMsg send(String service, ZMsg request)
{
request.push(new ZFrame(service));
request.push(MDP.C_CLIENT.newFrame());
if (verbose) {
log.format("I: send request to '%s' service: \n", service);
request.dump(log.out());
}
ZMsg reply = null;
int retriesLeft = retries;
while (retriesLeft > 0 && !Thread.currentThread().isInterrupted()) {
request.duplicate().send(client);
// Poll socket for a reply, with timeout
ZMQ.Poller items = ctx.createPoller(1);
items.register(client, ZMQ.Poller.POLLIN);
if (items.poll(timeout) == -1)
break; // Interrupted
if (items.pollin(0)) {
ZMsg msg = ZMsg.recvMsg(client);
if (verbose) {
log.format("I: received reply: \n");
msg.dump(log.out());
}
// Don't try to handle errors, just assert noisily
assert (msg.size() >= 3);
ZFrame header = msg.pop();
assert (MDP.C_CLIENT.equals(header.toString()));
header.destroy();
ZFrame replyService = msg.pop();
assert (service.equals(replyService.toString()));
replyService.destroy();
reply = msg;
break;
}
else {
items.unregister(client);
if (--retriesLeft == 0) {
log.format("W: permanent error, abandoning\n");
break;
}
log.format("W: no reply, reconnecting\n");
reconnectToBroker();
}
items.close();
}
request.destroy();
return reply;
}
public void destroy()
{
ctx.destroy();
}
}
mdcliapi: Majordomo client API in Lua
--
-- mdcliapi.lua - Majordomo Protocol Client API
--
-- Author: Robert G. Jakabosky <bobby@sharedrealm.com>
--
local setmetatable = setmetatable
local mdp = require"mdp"
local zmq = require"zmq"
local zpoller = require"zmq.poller"
local zmsg = require"zmsg"
require"zhelpers"
local s_version_assert = s_version_assert
local obj_mt = {}
obj_mt.__index = obj_mt
function obj_mt:set_timeout(timeout)
self.timeout = timeout
end
function obj_mt:set_retries(retries)
self.retries = retries
end
function obj_mt:destroy()
if self.client then self.client:close() end
self.context:term()
end
local function s_mdcli_connect_to_broker(self)
-- close old socket.
if self.client then
self.poller:remove(self.client)
self.client:close()
end
self.client = assert(self.context:socket(zmq.REQ))
assert(self.client:setopt(zmq.LINGER, 0))
assert(self.client:connect(self.broker))
if self.verbose then
s_console("I: connecting to broker at %s...", self.broker)
end
-- add socket to poller
self.poller:add(self.client, zmq.POLLIN, function()
self.got_reply = true
end)
end
--
-- Send request to broker and get reply by hook or crook
-- Returns the reply message or nil if there was no reply.
--
function obj_mt:send(service, request)
-- Prefix request with protocol frames
-- Frame 1: "MDPCxy" (six bytes, MDP/Client x.y)
-- Frame 2: Service name (printable string)
request:push(service)
request:push(mdp.MDPC_CLIENT)
if self.verbose then
s_console("I: send request to '%s' service:", service)
request:dump()
end
local retries = self.retries
while (retries > 0) do
local msg = request:dup()
msg:send(self.client)
self.got_reply = false
while true do
local cnt = assert(self.poller:poll(self.timeout * 1000))
if cnt ~= 0 and self.got_reply then
local msg = zmsg.recv(self.client)
if self.verbose then
s_console("I: received reply:")
msg:dump()
end
assert(msg:parts() >= 3)
local header = msg:pop()
assert(header == mdp.MDPC_CLIENT)
local reply_service = msg:pop()
assert(reply_service == service)
return msg
else
retries = retries - 1
if (retries > 0) then
if self.verbose then
s_console("W: no reply, reconnecting...")
end
-- Reconnect
s_mdcli_connect_to_broker(self)
break -- outer loop will resend request.
else
if self.verbose then
s_console("W: permanent error, abandoning request")
end
return nil -- Giving up
end
end
end
end
end
module(...)
function new(broker, verbose)
s_version_assert (2, 1);
local self = setmetatable({
context = zmq.init(1),
poller = zpoller.new(1),
broker = broker,
verbose = verbose,
timeout = 2500, -- msecs
retries = 3, -- before we abandon
}, obj_mt)
s_mdcli_connect_to_broker(self)
return self
end
setmetatable(_M, { __call = function(self, ...) return new(...) end })
mdcliapi: Majordomo client API in PHP
<?php
/* =====================================================================
 * mdcliapi.php
 *
 * Majordomo Protocol Client API
 * Implements the MDP/Client spec at http://rfc.zeromq.org/spec:7.
*/
include_once 'zmsg.php';
include_once 'mdp.php';
class MDCli
{
// Structure of our class
// We access these properties only via class methods
private $broker;
private $context;
private $client; // Socket to broker
private $verbose; // Print activity to stdout
private $timeout; // Request timeout
private $retries; // Request retries
/**
* Constructor
*
* @param string $broker
* @param boolean $verbose
*/
public function __construct($broker, $verbose = false)
{
$this->broker = $broker;
$this->context = new ZMQContext();
$this->verbose = $verbose;
$this->timeout = 2500; // msecs
$this->retries = 3; // Before we abandon
$this->connect_to_broker();
}
/**
* Connect or reconnect to broker
*/
protected function connect_to_broker()
{
if ($this->client) {
unset($this->client);
}
$this->client = new ZMQSocket($this->context, ZMQ::SOCKET_REQ);
$this->client->setSockOpt(ZMQ::SOCKOPT_LINGER, 0);
$this->client->connect($this->broker);
if ($this->verbose) {
printf("I: connecting to broker at %s...", $this->broker);
}
}
/**
* Set request timeout
*
* @param int $timeout (msecs)
*/
public function set_timeout($timeout)
{
$this->timeout = $timeout;
}
/**
* Set request retries
*
* @param int $retries
*/
public function set_retries($retries)
{
$this->retries = $retries;
}
/**
* Send request to broker and get reply by hook or crook
* Takes ownership of request message and destroys it when sent.
* Returns the reply message or NULL if there was no reply.
*
* @param string $service
* @param Zmsg $request
* @param string $client
* @return Zmsg
*/
public function send($service, Zmsg $request)
{
// Prefix request with protocol frames
// Frame 1: "MDPCxy" (six bytes, MDP/Client x.y)
// Frame 2: Service name (printable string)
$request->push($service);
$request->push(MDPC_CLIENT);
if ($this->verbose) {
printf ("I: send request to '%s' service:", $service);
echo $request->__toString();
}
$retries_left = $this->retries;
$read = $write = array();
while ($retries_left) {
$request->set_socket($this->client)->send();
// Poll socket for a reply, with timeout
$poll = new ZMQPoll();
$poll->add($this->client, ZMQ::POLL_IN);
$events = $poll->poll($read, $write, $this->timeout);
// If we got a reply, process it
if ($events) {
$request->recv();
if ($this->verbose) {
echo "I: received reply:", $request->__toString(), PHP_EOL;
}
// Don't try to handle errors, just assert noisily
assert ($request->parts() >= 3);
$header = $request->pop();
assert($header == MDPC_CLIENT);
$reply_service = $request->pop();
assert($reply_service == $service);
return $request; // Success
} elseif ($retries_left--) {
if ($this->verbose) {
echo "W: no reply, reconnecting...", PHP_EOL;
}
// Reconnect, and resend message
$this->connect_to_broker();
$request->send();
} else {
echo "W: permanent error, abandoning request", PHP_EOL;
break; // Give up
}
}
}
}
mdcliapi: Majordomo client API in Python
"""Majordomo Protocol Client API, Python version.
Implements the MDP/Client spec at http://rfc.zeromq.org/spec:7.
Author: Min RK <benjaminrk@gmail.com>
Based on Java example by Arkadiusz Orzechowski
"""
import logging
import zmq
import MDP
from zhelpers import dump
class MajorDomoClient(object):
"""Majordomo Protocol Client API, Python version.
Implements the MDP/Client spec at http://rfc.zeromq.org/spec:7.
"""
broker = None
ctx = None
client = None
poller = None
timeout = 2500
retries = 3
verbose = False
def __init__(self, broker, verbose=False):
self.broker = broker
self.verbose = verbose
self.ctx = zmq.Context()
self.poller = zmq.Poller()
logging.basicConfig(format="%(asctime)s %(message)s", datefmt="%Y-%m-%d %H:%M:%S",
level=logging.INFO)
self.reconnect_to_broker()
def reconnect_to_broker(self):
"""Connect or reconnect to broker"""
if self.client:
self.poller.unregister(self.client)
self.client.close()
self.client = self.ctx.socket(zmq.REQ)
self.client.linger = 0
self.client.connect(self.broker)
self.poller.register(self.client, zmq.POLLIN)
if self.verbose:
logging.info("I: connecting to broker at %s...", self.broker)
def send(self, service, request):
"""Send request to broker and get reply by hook or crook.
Takes ownership of request message and destroys it when sent.
Returns the reply message or None if there was no reply.
"""
if not isinstance(request, list):
request = [request]
request = [MDP.C_CLIENT, service] + request
if self.verbose:
logging.warn("I: send request to '%s' service: ", service)
dump(request)
reply = None
retries = self.retries
while retries > 0:
self.client.send_multipart(request)
try:
items = self.poller.poll(self.timeout)
except KeyboardInterrupt:
break # interrupted
if items:
msg = self.client.recv_multipart()
if self.verbose:
logging.info("I: received reply:")
dump(msg)
# Don't try to handle errors, just assert noisily
assert len(msg) >= 3
header = msg.pop(0)
assert MDP.C_CLIENT == header
reply_service = msg.pop(0)
assert service == reply_service
reply = msg
break
else:
if retries:
logging.warn("W: no reply, reconnecting...")
self.reconnect_to_broker()
else:
logging.warn("W: permanent error, abandoning")
break
retries -= 1
return reply
def destroy(self):
self.ctx.destroy()
mdcliapi: Majordomo client API in Tcl
# Majordomo Protocol Client API, Tcl version.
# Implements the MDP/Client spec at http://rfc.zeromq.org/spec:7.
package require TclOO
package require zmq
package require mdp
package provide MDClient 1.0
oo::class create MDClient {
variable context broker verbose timeout retries client
constructor {ibroker {iverbose 0}} {
set context [zmq context mdcli_context_[::mdp::contextid]]
set broker $ibroker
set verbose $iverbose
set timeout 2500
set retries 3
set client ""
my connect_to_broker
}
destructor {
$client close
$context term
}
method connect_to_broker {} {
if {[string length $client]} {
$client close
}
set client [zmq socket mdcli_socket_[::mdp::socketid] $context REQ]
$client connect $broker
if {$verbose} {
puts "I: connecting to broker at $broker..."
}
}
method set_timeout {itimeout} {
set timeout $itimeout
}
method set_retries {iretries} {
set retries $iretries
}
# Send request to broker and get reply by hook or crook
# Takes ownership of request message and destroys it when sent.
# Returns the reply message or NULL if there was no reply.
method send {service request} {
# Prefix request with protocol frames
# Frame 1: "MDPCxy" (six bytes, MDP/Client x.y)
# Frame 2: Service name (printable string)
set request [zmsg push $request $service]
set request [zmsg push $request $mdp::MDPC_CLIENT]
if {$verbose} {
puts "I: send request to '$service' service:"
puts [join [zmsg dump $request] \n]
}
set retries_left $retries
while {$retries_left} {
set msg $request
zmsg send $client $msg
# Poll socket for a reply, with timeout
set poll_set [list [list $client [list POLLIN]]]
set rpoll_set [zmq poll $poll_set $timeout]
# If we got a reply, process it
if {[llength $rpoll_set] && "POLLIN" in [lindex $rpoll_set 0 1]} {
set msg [zmsg recv $client]
if {$verbose} {
puts "I: received reply:"
puts [join [zmsg dump $msg] \n]
}
# Don't try to handle errors, just assert noisily
if {[llength $msg] < 3} {
error "message size < 3"
}
set header [zmsg pop msg]
if {$header ne $mdp::MDPC_CLIENT} {
error "unexpected header"
}
set reply_service [zmsg pop msg]
if {$reply_service ne $service} {
error "unexpected service"
}
return $msg
} elseif {[incr retries_left -1]} {
if {$verbose} {
puts "W: no reply, reconnecting..."
}
# Reconnect socket
my connect_to_broker
} else {
if {$verbose} {
puts "W: permanent error, abandoning"
}
break ;# Give up
}
}
return {}
}
}
Let’s see how the client API looks in action, with an example test program that does 100K request-reply cycles:
mdclient: Majordomo client application in C
// Majordomo Protocol client example
// Uses the mdcli API to hide all MDP aspects
// Lets us build this source without creating a library
#include "mdcliapi.c"
int main (int argc, char *argv [])
{
int verbose = (argc > 1 && streq (argv [1], "-v"));
mdcli_t *session = mdcli_new ("tcp://localhost:5555", verbose);
int count;
for (count = 0; count < 100000; count++) {
zmsg_t *request = zmsg_new ();
zmsg_pushstr (request, "Hello world");
zmsg_t *reply = mdcli_send (session, "echo", &request);
if (reply)
zmsg_destroy (&reply);
else
break; // Interrupt or failure
}
printf ("%d requests/replies processed\n", count);
mdcli_destroy (&session);
return 0;
}
mdclient: Majordomo client application in C++
//
// Majordomo Protocol client example
// Uses the mdcli API to hide all MDP aspects
//
// Lets us 'build mdclient' and 'build all'
//
// Andreas Hoelzlwimmer <andreas.hoelzlwimmer@fh-hagenberg.at>
//
#include "mdcliapi.hpp"
int main (int argc, char *argv [])
{
int verbose = (argc > 1 && strcmp (argv [1], "-v") == 0);
mdcli session ("tcp://localhost:5555", verbose);
int count;
for (count = 0; count < 100000; count++) {
zmsg * request = new zmsg("Hello world");
zmsg * reply = session.send ("echo", request);
if (reply) {
delete reply;
} else {
break; // Interrupt or failure
}
}
std::cout << count << " requests/replies processed" << std::endl;
return 0;
}
mdclient: Majordomo client application in Go
// Majordomo Protocol client example
// Uses the mdcli API to hide all MDP aspects
//
// To run this example, you may need to run multiple *.go files as below
// go run mdp.go zhelpers.go mdcliapi.go mdclient.go [-v]
//
// Author: iano <scaly.iano@gmail.com>
package main
import (
"fmt"
"os"
)
func main() {
verbose := len(os.Args) >= 2 && os.Args[1] == "-v"
client := NewClient("tcp://localhost:5555", verbose)
defer client.Close()
count := 0
for ; count < 1e5; count++ {
request := [][]byte{[]byte("Hello world")}
reply := client.Send([]byte("echo"), request)
if len(reply) == 0 {
break
}
}
fmt.Printf("%d requests/replies processed\n", count)
}
mdclient: Majordomo client application in Haskell
import MDClientAPI
import ZHelpers
import System.Environment
import Control.Monad (forM_, when)
import Data.ByteString.Char8 (unpack, pack)
main :: IO ()
main = do
args <- getArgs
when (length args /= 1) $
error "usage: mdclient <is_verbose(True|False)>"
let isVerbose = read (args !! 0) :: Bool
withMDCli "tcp://localhost:5555" isVerbose $ \api ->
forM_ [0..10000] $ \i -> do
mdSend api "echo" [pack "Hello world"]
mdclient: Majordomo client application in Haxe
package ;
import neko.Lib;
import neko.Sys;
import org.zeromq.ZMsg;
/**
* Majordomo Protocol client example
* Uses the MDPCli API to hide all MDP aspects
*
* @author Richard J Smith
*/
class MDClient
{
public static function main() {
var argArr = Sys.args();
var verbose = (argArr.length > 1 && argArr[argArr.length - 1] == "-v");
var session = new MDCliAPI("tcp://localhost:5555", verbose);
var count = 0;
for (i in 0 ... 100000) {
var request = new ZMsg();
request.pushString("Hello world: "+i);
var reply = session.send("echo", request);
if (reply != null)
reply.destroy();
else
break; // Interrupt or failure
count++;
}
Lib.println(count + " requests/replies processed");
session.destroy();
}
}
mdclient: Majordomo client application in Java
package guide;
import org.zeromq.ZMsg;
/**
* Majordomo Protocol client example. Uses the mdcli API to hide all MDP aspects
*/
public class mdclient
{
public static void main(String[] args)
{
boolean verbose = (args.length > 0 && "-v".equals(args[0]));
mdcliapi clientSession = new mdcliapi("tcp://localhost:5555", verbose);
int count;
for (count = 0; count < 100000; count++) {
ZMsg request = new ZMsg();
request.addString("Hello world");
ZMsg reply = clientSession.send("echo", request);
if (reply != null)
reply.destroy();
else break; // Interrupt or failure
}
System.out.printf("%d requests/replies processed\n", count);
clientSession.destroy();
}
}
mdclient: Majordomo client application in Lua
--
-- Majordomo Protocol client example
-- Uses the mdcli API to hide all MDP aspects
--
-- Author: Robert G. Jakabosky <bobby@sharedrealm.com>
--
require"mdcliapi"
require"zmsg"
require"zhelpers"
local verbose = (arg[1] == "-v")
local session = mdcliapi.new("tcp://localhost:5555", verbose)
local count=1
repeat
local request = zmsg.new("Hello world")
local reply = session:send("echo", request)
if not reply then
break -- Interrupt or failure
end
count = count + 1
until (count == 100000)
printf("%d requests/replies processed\n", count)
session:destroy()
mdclient: Majordomo client application in PHP
<?php
/*
* Majordomo Protocol client example
* Uses the mdcli API to hide all MDP aspects
*
* @author Ian Barber <ian(dot)barber(at)gmail(dot)com>
*/
include_once 'mdcliapi.php';
$verbose = $_SERVER['argc'] > 1 && $_SERVER['argv'][1] == '-v';
$session = new MDCli("tcp://localhost:5555", $verbose);
for ($count = 0; $count < 100000; $count++) {
$request = new Zmsg();
$request->body_set("Hello world");
$reply = $session->send("echo", $request);
if (!$reply) {
break; // Interrupt or failure
}
}
printf ("%d requests/replies processed", $count);
echo PHP_EOL;
mdclient: Majordomo client application in Python
"""
Majordomo Protocol client example. Uses the mdcli API to hide all MDP aspects
Author : Min RK <benjaminrk@gmail.com>
"""
import sys
from mdcliapi import MajorDomoClient
def main():
verbose = '-v' in sys.argv
client = MajorDomoClient("tcp://localhost:5555", verbose)
count = 0
while count < 100000:
request = b"Hello world"
try:
reply = client.send(b"echo", request)
except KeyboardInterrupt:
break
else:
# also break on failure to reply:
if reply is None:
break
count += 1
print ("%i requests/replies processed" % count)
if __name__ == '__main__':
main()
mdclient: Majordomo client application in Tcl
#
# Majordomo Protocol client example
# Uses the mdcli API to hide all MDP aspects
#
lappend auto_path .
package require MDClient 1.0
set verbose 0
foreach {k v} $argv {
if {$k eq "-v"} { set verbose 1 }
}
set session [MDClient new "tcp://localhost:5555" $verbose]
for {set count 0} {$count < 10000} {incr count} {
set request [list "Hello world"]
set reply [$session send "echo" $request]
if {[llength $reply] == 0} {
break ;# Interrupt or failure
}
}
puts "$count requests/replies processed"
$session destroy
And here is the worker API:
mdwrkapi: Majordomo worker API in C
// mdwrkapi class - Majordomo Protocol Worker API
// Implements the MDP/Worker spec at http://rfc.zeromq.org/spec:7.
#include "mdwrkapi.h"
// Reliability parameters
#define HEARTBEAT_LIVENESS 3 // 3-5 is reasonable
// .split worker class structure
// This is the structure of a worker API instance. We use a pseudo-OO
// approach in a lot of the C examples, as well as the CZMQ binding:
// Structure of our class
// We access these properties only via class methods
struct _mdwrk_t {
zctx_t *ctx; // Our context
char *broker;
char *service;
void *worker; // Socket to broker
int verbose; // Print activity to stdout
// Heartbeat management
uint64_t heartbeat_at; // When to send HEARTBEAT
size_t liveness; // How many attempts left
int heartbeat; // Heartbeat delay, msecs
int reconnect; // Reconnect delay, msecs
int expect_reply; // Zero only at start
zframe_t *reply_to; // Return identity, if any
};
// .split utility functions
// We have two utility functions: one to send a message to the broker and
// one to (re)connect to the broker:
// Send message to broker
// If no msg is provided, creates one internally
static void
s_mdwrk_send_to_broker (mdwrk_t *self, char *command, char *option,
zmsg_t *msg)
{
msg = msg? zmsg_dup (msg): zmsg_new ();
// Stack protocol envelope to start of message
if (option)
zmsg_pushstr (msg, option);
zmsg_pushstr (msg, command);
zmsg_pushstr (msg, MDPW_WORKER);
zmsg_pushstr (msg, "");
if (self->verbose) {
zclock_log ("I: sending %s to broker",
mdps_commands [(int) *command]);
zmsg_dump (msg);
}
zmsg_send (&msg, self->worker);
}
// Connect or reconnect to broker
void s_mdwrk_connect_to_broker (mdwrk_t *self)
{
if (self->worker)
zsocket_destroy (self->ctx, self->worker);
self->worker = zsocket_new (self->ctx, ZMQ_DEALER);
zmq_connect (self->worker, self->broker);
if (self->verbose)
zclock_log ("I: connecting to broker at %s...", self->broker);
// Register service with broker
s_mdwrk_send_to_broker (self, MDPW_READY, self->service, NULL);
// If liveness hits zero, queue is considered disconnected
self->liveness = HEARTBEAT_LIVENESS;
self->heartbeat_at = zclock_time () + self->heartbeat;
}
// .split constructor and destructor
// Here we have the constructor and destructor for our mdwrk class:
// Constructor
mdwrk_t *
mdwrk_new (char *broker, char *service, int verbose)
{
assert (broker);
assert (service);
mdwrk_t *self = (mdwrk_t *) zmalloc (sizeof (mdwrk_t));
self->ctx = zctx_new ();
self->broker = strdup (broker);
self->service = strdup (service);
self->verbose = verbose;
self->heartbeat = 2500; // msecs
self->reconnect = 2500; // msecs
s_mdwrk_connect_to_broker (self);
return self;
}
// Destructor
void
mdwrk_destroy (mdwrk_t **self_p)
{
assert (self_p);
if (*self_p) {
mdwrk_t *self = *self_p;
zctx_destroy (&self->ctx);
free (self->broker);
free (self->service);
free (self);
*self_p = NULL;
}
}
// .split configure worker
// We provide two methods to configure the worker API. You can set the
// heartbeat interval and retries to match the expected network performance.
// Set heartbeat delay
void
mdwrk_set_heartbeat (mdwrk_t *self, int heartbeat)
{
self->heartbeat = heartbeat;
}
// Set reconnect delay
void
mdwrk_set_reconnect (mdwrk_t *self, int reconnect)
{
self->reconnect = reconnect;
}
// .split recv method
// This is the {{recv}} method; it's a little misnamed because it first sends
// any reply and then waits for a new request. If you have a better name
// for this, let me know.
// Send reply, if any, to broker and wait for next request.
zmsg_t *
mdwrk_recv (mdwrk_t *self, zmsg_t **reply_p)
{
// Format and send the reply if we were provided one
assert (reply_p);
zmsg_t *reply = *reply_p;
assert (reply || !self->expect_reply);
if (reply) {
assert (self->reply_to);
zmsg_wrap (reply, self->reply_to);
s_mdwrk_send_to_broker (self, MDPW_REPLY, NULL, reply);
zmsg_destroy (reply_p);
}
self->expect_reply = 1;
while (true) {
zmq_pollitem_t items [] = {
{ self->worker, 0, ZMQ_POLLIN, 0 } };
int rc = zmq_poll (items, 1, self->heartbeat * ZMQ_POLL_MSEC);
if (rc == -1)
break; // Interrupted
if (items [0].revents & ZMQ_POLLIN) {
zmsg_t *msg = zmsg_recv (self->worker);
if (!msg)
break; // Interrupted
if (self->verbose) {
zclock_log ("I: received message from broker:");
zmsg_dump (msg);
}
self->liveness = HEARTBEAT_LIVENESS;
// Don't try to handle errors, just assert noisily
assert (zmsg_size (msg) >= 3);
zframe_t *empty = zmsg_pop (msg);
assert (zframe_streq (empty, ""));
zframe_destroy (&empty);
zframe_t *header = zmsg_pop (msg);
assert (zframe_streq (header, MDPW_WORKER));
zframe_destroy (&header);
zframe_t *command = zmsg_pop (msg);
if (zframe_streq (command, MDPW_REQUEST)) {
// We should pop and save as many addresses as there are
// up to a null part, but for now, just save one...
self->reply_to = zmsg_unwrap (msg);
zframe_destroy (&command);
// .split process message
// Here is where we actually have a message to process; we
// return it to the caller application:
return msg; // We have a request to process
}
else
if (zframe_streq (command, MDPW_HEARTBEAT))
; // Do nothing for heartbeats
else
if (zframe_streq (command, MDPW_DISCONNECT))
s_mdwrk_connect_to_broker (self);
else {
zclock_log ("E: invalid input message");
zmsg_dump (msg);
}
zframe_destroy (&command);
zmsg_destroy (&msg);
}
else
if (--self->liveness == 0) {
if (self->verbose)
zclock_log ("W: disconnected from broker - retrying...");
zclock_sleep (self->reconnect);
s_mdwrk_connect_to_broker (self);
}
// Send HEARTBEAT if it's time
if (zclock_time () > self->heartbeat_at) {
s_mdwrk_send_to_broker (self, MDPW_HEARTBEAT, NULL, NULL);
self->heartbeat_at = zclock_time () + self->heartbeat;
}
}
if (zctx_interrupted)
printf ("W: interrupt received, killing worker...\n");
return NULL;
}
mdwrkapi: Majordomo worker API in C++
#ifndef __MDWRKAPI_HPP_INCLUDED__
#define __MDWRKAPI_HPP_INCLUDED__
#include "zmsg.hpp"
#include "mdp.h"
// Reliability parameters
// Structure of our class
// We access these properties only via class methods
class mdwrk {
public:
// ---------------------------------------------------------------------
// Constructor
mdwrk (std::string broker, std::string service, int verbose): m_broker(broker), m_service(service),m_verbose(verbose)
{
s_version_assert (4, 0);
m_context = new zmq::context_t (1);
s_catch_signals ();
connect_to_broker ();
}
// ---------------------------------------------------------------------
// Destructor
virtual
~mdwrk ()
{
delete m_worker;
delete m_context;
}
// ---------------------------------------------------------------------
// Send message to broker
// If no _msg is provided, creates one internally
void send_to_broker(const char *command, std::string option, zmsg *_msg)
{
zmsg *msg = _msg? new zmsg(*_msg): new zmsg ();
// Stack protocol envelope to start of message
if (!option.empty()) {
msg->push_front (option.c_str());
}
msg->push_front (command);
msg->push_front (k_mdpw_worker.data());
msg->push_front ("");
if (m_verbose) {
s_console ("I: sending %s to broker",
mdps_commands [(int) *command].data());
msg->dump ();
}
msg->send (*m_worker);
delete msg;
}
// ---------------------------------------------------------------------
// Connect or reconnect to broker
void connect_to_broker ()
{
if (m_worker) {
delete m_worker;
}
m_worker = new zmq::socket_t (*m_context, ZMQ_DEALER);
int linger = 0;
m_worker->setsockopt (ZMQ_LINGER, &linger, sizeof (linger));
s_set_id(*m_worker);
m_worker->connect (m_broker.c_str());
if (m_verbose)
s_console ("I: connecting to broker at %s...", m_broker.c_str());
// Register service with broker
send_to_broker (k_mdpw_ready.data(), m_service, NULL);
// If liveness hits zero, queue is considered disconnected
m_liveness = n_heartbeat_liveness;
m_heartbeat_at = s_clock () + m_heartbeat;
}
// ---------------------------------------------------------------------
// Set heartbeat delay
void
set_heartbeat (int heartbeat)
{
m_heartbeat = heartbeat;
}
// ---------------------------------------------------------------------
// Set reconnect delay
void
set_reconnect (int reconnect)
{
m_reconnect = reconnect;
}
// ---------------------------------------------------------------------
// Send reply, if any, to broker and wait for next request.
zmsg *
recv (zmsg *&reply_p)
{
// Format and send the reply if we were provided one
zmsg *reply = reply_p;
assert (reply || !m_expect_reply);
if (reply) {
assert (m_reply_to.size()!=0);
reply->wrap (m_reply_to.c_str(), "");
m_reply_to = "";
send_to_broker (k_mdpw_reply.data(), "", reply);
delete reply_p;
reply_p = 0;
}
m_expect_reply = true;
while (!s_interrupted) {
zmq::pollitem_t items[] = {
{ *m_worker, 0, ZMQ_POLLIN, 0 } };
zmq::poll (items, 1, m_heartbeat);
if (items[0].revents & ZMQ_POLLIN) {
zmsg *msg = new zmsg(*m_worker);
if (m_verbose) {
s_console ("I: received message from broker:");
msg->dump ();
}
m_liveness = n_heartbeat_liveness;
// Don't try to handle errors, just assert noisily
assert (msg->parts () >= 3);
ustring empty = msg->pop_front ();
assert (empty.compare((unsigned char *)"") == 0);
//assert (strcmp (empty, "") == 0);
//free (empty);
ustring header = msg->pop_front ();
assert (header.compare((unsigned char *)k_mdpw_worker.data()) == 0);
//free (header);
std::string command =(char*) msg->pop_front ().c_str();
if (command.compare (k_mdpw_request.data()) == 0) {
// We should pop and save as many addresses as there are
// up to a null part, but for now, just save one...
m_reply_to = msg->unwrap ();
return msg; // We have a request to process
}
else if (command.compare (k_mdpw_heartbeat.data()) == 0) {
// Do nothing for heartbeats
}
else if (command.compare (k_mdpw_disconnect.data()) == 0) {
connect_to_broker ();
}
else {
s_console ("E: invalid input message (%d)",
(int) *(command.c_str()));
msg->dump ();
}
delete msg;
}
else
if (--m_liveness == 0) {
if (m_verbose) {
s_console ("W: disconnected from broker - retrying...");
}
s_sleep (m_reconnect);
connect_to_broker ();
}
// Send HEARTBEAT if it's time
if (s_clock () >= m_heartbeat_at) {
send_to_broker (k_mdpw_heartbeat.data(), "", NULL);
m_heartbeat_at += m_heartbeat;
}
}
if (s_interrupted)
printf ("W: interrupt received, killing worker...\n");
return NULL;
}
private:
static constexpr uint32_t n_heartbeat_liveness = 3;// 3-5 is reasonable
const std::string m_broker;
const std::string m_service;
zmq::context_t *m_context;
zmq::socket_t *m_worker{}; // Socket to broker
const int m_verbose; // Print activity to stdout
// Heartbeat management
int64_t m_heartbeat_at; // When to send HEARTBEAT
size_t m_liveness; // How many attempts left
int m_heartbeat{2500}; // Heartbeat delay, msecs
int m_reconnect{2500}; // Reconnect delay, msecs
// Internal state
bool m_expect_reply{false}; // Zero only at start
// Return address, if any
std::string m_reply_to;
};
#endif
mdwrkapi: Majordomo worker API in Go
// mdwrkapi class - Majordomo Protocol Worker API
// Implements the MDP/Worker spec at http://rfc.zeromq.org/spec:7.
//
// Author: iano <scaly.iano@gmail.com>
// Based on C & Python example
package main
import (
zmq "github.com/alecthomas/gozmq"
"log"
"time"
)
type Worker interface {
Close()
Recv([][]byte) [][]byte
}
type mdWorker struct {
broker string
context *zmq.Context
service string
verbose bool
worker *zmq.Socket
heartbeat time.Duration
heartbeatAt time.Time
liveness int
reconnect time.Duration
expectReply bool
replyTo []byte
}
func NewWorker(broker, service string, verbose bool) Worker {
context, _ := zmq.NewContext()
self := &mdWorker{
broker: broker,
context: context,
service: service,
verbose: verbose,
heartbeat: 2500 * time.Millisecond,
liveness: 0,
reconnect: 2500 * time.Millisecond,
}
self.reconnectToBroker()
return self
}
func (self *mdWorker) reconnectToBroker() {
if self.worker != nil {
self.worker.Close()
}
self.worker, _ = self.context.NewSocket(zmq.DEALER)
self.worker.SetLinger(0)
self.worker.Connect(self.broker)
if self.verbose {
log.Printf("I: connecting to broker at %s...\n", self.broker)
}
self.sendToBroker(MDPW_READY, []byte(self.service), nil)
self.liveness = HEARTBEAT_LIVENESS
self.heartbeatAt = time.Now().Add(self.heartbeat)
}
func (self *mdWorker) sendToBroker(command string, option []byte, msg [][]byte) {
if len(option) > 0 {
msg = append([][]byte{option}, msg...)
}
msg = append([][]byte{nil, []byte(MDPW_WORKER), []byte(command)}, msg...)
if self.verbose {
log.Printf("I: sending %X to broker\n", command)
Dump(msg)
}
self.worker.SendMultipart(msg, 0)
}
func (self *mdWorker) Close() {
if self.worker != nil {
self.worker.Close()
}
self.context.Close()
}
func (self *mdWorker) Recv(reply [][]byte) (msg [][]byte) {
// Format and send the reply if we were provided one
if len(reply) == 0 && self.expectReply {
panic("Error reply")
}
if len(reply) > 0 {
if len(self.replyTo) == 0 {
panic("Error replyTo")
}
reply = append([][]byte{self.replyTo, nil}, reply...)
self.sendToBroker(MDPW_REPLY, nil, reply)
}
self.expectReply = true
for {
items := zmq.PollItems{
zmq.PollItem{Socket: self.worker, Events: zmq.POLLIN},
}
_, err := zmq.Poll(items, self.heartbeat)
if err != nil {
panic(err) // Interrupted
}
if item := items[0]; item.REvents&zmq.POLLIN != 0 {
msg, _ = self.worker.RecvMultipart(0)
if self.verbose {
log.Println("I: received message from broker: ")
Dump(msg)
}
self.liveness = HEARTBEAT_LIVENESS
if len(msg) < 3 {
panic("Invalid msg") // Interrupted
}
header := msg[1]
if string(header) != MDPW_WORKER {
panic("Invalid header") // Interrupted
}
switch command := string(msg[2]); command {
case MDPW_REQUEST:
// We should pop and save as many addresses as there are
// up to a null part, but for now, just save one...
self.replyTo = msg[3]
msg = msg[5:]
return
case MDPW_HEARTBEAT:
// Do nothing for heartbeats
case MDPW_DISCONNECT:
self.reconnectToBroker()
default:
log.Println("E: invalid input message:")
Dump(msg)
}
} else if self.liveness--; self.liveness <= 0 {
if self.verbose {
log.Println("W: disconnected from broker - retrying...")
}
time.Sleep(self.reconnect)
self.reconnectToBroker()
}
// Send HEARTBEAT if it's time
if self.heartbeatAt.Before(time.Now()) {
self.sendToBroker(MDPW_HEARTBEAT, nil, nil)
self.heartbeatAt = time.Now().Add(self.heartbeat)
}
}
return
}
mdwrkapi: Majordomo worker API in Haxe
package ;
import MDP;
import haxe.Stack;
import neko.Lib;
import neko.Sys;
import org.zeromq.ZContext;
import org.zeromq.ZFrame;
import org.zeromq.ZMQ;
import org.zeromq.ZMQSocket;
import org.zeromq.ZMsg;
import org.zeromq.ZMQPoller;
import org.zeromq.ZMQException;
/**
* Majordomo Protocol Worker API
* Implements the MDP/Worker spec at http://rfc.zeromq.org/spec:7
*/
class MDWrkAPI
{
/** When to send HEARTBEAT */
public var heartbeatAt:Float;
/** Reconnect delay, in msecs */
public var reconnect:Int;
// Private instance fields
/** Our context */
private var ctx:ZContext;
/** Connection string to broker */
private var broker:String;
/** Socket to broker */
private var worker:ZMQSocket;
/** Defines service string provided by this worker */
private var service:String;
/** Print activity to stdout */
private var verbose:Bool;
/** Logger function used in verbose mode */
private var log:Dynamic->Void;
/** How many attempts left */
private var liveness:Int;
/** Heartbeat delay, in msecs */
private var heartbeat:Int;
/** Internal state */
private var expect_reply:Bool;
/** Return address frame, if any */
private var replyTo:ZFrame;
/** Reliability parameters */
private static inline var HEARTBEAT_LIVENESS = 3;
/**
* Constructor
* @param broker
* @param service
* @param ?verbose
* @param ?logger
*/
public function new(broker:String, service:String, ?verbose:Bool = false, ?logger:Dynamic->Void) {
ctx = new ZContext();
this.broker = broker;
this.service = service;
this.verbose = verbose;
this.heartbeat = 2500; // msecs
this.reconnect = 2500; // msecs
if (logger != null)
log = logger;
else
log = neko.Lib.println;
expect_reply = false;
connectToBroker();
}
/**
* Connect or reconnect to broker
*/
public function connectToBroker() {
if (worker != null)
worker.close();
worker = ctx.createSocket(ZMQ_DEALER);
worker.setsockopt(ZMQ_LINGER, 0);
worker.connect(broker);
if (verbose)
log("I: worker connecting to broker at " + broker + "...");
sendToBroker(MDP.MDPW_READY, service);
// If liveness hits zero, queue is considered disconnected
liveness = HEARTBEAT_LIVENESS;
heartbeatAt = Date.now().getTime() + heartbeat;
}
/**
* Destructor
*/
public function destroy() {
ctx.destroy();
}
/**
* Send message to broker
* If no msg is provided, creates one internally
*
* @param command
* @param option
* @param ?msg
*/
public function sendToBroker(command:String, option:String, ?msg:ZMsg) {
var _msg:ZMsg = { if (msg != null) msg.duplicate(); else new ZMsg(); }
// Stack protocol envelope to start of message
if (option != null) {
_msg.pushString(option);
}
_msg.pushString(command);
_msg.pushString(MDP.MDPW_WORKER);
_msg.pushString("");
if (verbose)
log("I: sending " + MDP.MDPS_COMMANDS[command.charCodeAt(0)] + " to broker " + _msg.toString());
_msg.send(worker);
}
/**
* Send reply, if any, to broker and wait for next request
*
 * @param reply Message to send back. Destroyed if send is successful
* @return ZMsg Returns if there is a request to process
*/
public function recv(?reply:ZMsg):ZMsg {
if (reply == null && expect_reply)
return null;
if (reply != null) {
if (replyTo == null)
return null; // Cannot send if we dont know which client to reply to
reply.wrap(replyTo);
sendToBroker(MDP.MDPW_REPLY, null, reply);
reply.destroy();
}
expect_reply = true;
var poller = new ZMQPoller();
poller.registerSocket(worker, ZMQ.ZMQ_POLLIN());
while (true) {
// Poll socket for a reply, with timeout
try {
var res = poller.poll(heartbeat * 1000);
} catch (e:ZMQException) {
if (!ZMQ.isInterrupted()) {
trace("ZMQException #:" + e.errNo + ", str:" + e.str());
trace (Stack.toString(Stack.exceptionStack()));
} else {
Lib.println("W: interrupt received, killing worker...");
}
ctx.destroy();
return null;
}
if (poller.pollin(1)) {
var msg = ZMsg.recvMsg(worker);
if (msg == null)
break; // Interrupted
if (verbose)
log("I: received message from broker:" + msg.toString());
liveness = HEARTBEAT_LIVENESS;
if (msg.size() < 2)
return null; // Don't handle errors, just quit quietly
msg.pop();
var header = msg.pop();
if (!header.streq(MDP.MDPW_WORKER))
return null; // Don't handle errors, just quit quietly
header.destroy();
var command = msg.pop();
if (command.streq(MDP.MDPW_REQUEST)) {
// We should pop and save as many addresses as there are
// up to a null part, but for now, just save one...
replyTo = msg.unwrap();
command.destroy();
return msg; // We have a request to process
} else if (command.streq(MDP.MDPW_HEARTBEAT)) {
// Do nothing for heartbeats
} else if (command.streq(MDP.MDPW_DISCONNECT)) {
connectToBroker();
poller.unregisterAllSockets();
poller.registerSocket(worker, ZMQ.ZMQ_POLLIN());
} else {
log("E: invalid input message:" + msg.toString());
}
command.destroy();
msg.destroy();
} else if (--liveness == 0) {
if (verbose)
log("W: disconnected from broker - retrying...");
Sys.sleep(reconnect / 1000);
connectToBroker();
poller.unregisterAllSockets();
poller.registerSocket(worker, ZMQ.ZMQ_POLLIN());
}
// Send HEARTBEAT if it's time
if (Date.now().getTime() > heartbeatAt) {
sendToBroker(MDP.MDPW_HEARTBEAT, null, null);
heartbeatAt = Date.now().getTime() + heartbeat;
}
}
return null;
}
}
mdwrkapi: Majordomo worker API in Java
package guide;
import java.util.Formatter;
import org.zeromq.*;
/**
 * Majordomo Protocol Worker API, Java version. Implements the MDP/Worker spec at
* http://rfc.zeromq.org/spec:7.
*/
public class mdwrkapi
{
private static final int HEARTBEAT_LIVENESS = 3; // 3-5 is reasonable
private String broker;
private ZContext ctx;
private String service;
private ZMQ.Socket worker; // Socket to broker
private long heartbeatAt; // When to send HEARTBEAT
private int liveness; // How many attempts left
private int heartbeat = 2500; // Heartbeat delay, msecs
private int reconnect = 2500; // Reconnect delay, msecs
// Internal state
private boolean expectReply = false; // false only at start
private long timeout = 2500;
private boolean verbose; // Print activity to stdout
private Formatter log = new Formatter(System.out);
// Return address, if any
private ZFrame replyTo;
public mdwrkapi(String broker, String service, boolean verbose)
{
assert (broker != null);
assert (service != null);
this.broker = broker;
this.service = service;
this.verbose = verbose;
ctx = new ZContext();
reconnectToBroker();
}
/**
* Send message to broker. If no msg is provided, creates one internally
*
* @param command
* @param option
* @param msg
*/
void sendToBroker(MDP command, String option, ZMsg msg)
{
msg = msg != null ? msg.duplicate() : new ZMsg();
// Stack protocol envelope to start of message
if (option != null)
msg.addFirst(new ZFrame(option));
msg.addFirst(command.newFrame());
msg.addFirst(MDP.W_WORKER.newFrame());
msg.addFirst(new ZFrame(ZMQ.MESSAGE_SEPARATOR));
if (verbose) {
log.format("I: sending %s to broker\n", command);
msg.dump(log.out());
}
msg.send(worker);
}
/**
* Connect or reconnect to broker
*/
void reconnectToBroker()
{
if (worker != null) {
ctx.destroySocket(worker);
}
worker = ctx.createSocket(SocketType.DEALER);
worker.connect(broker);
if (verbose)
log.format("I: connecting to broker at %s\n", broker);
// Register service with broker
sendToBroker(MDP.W_READY, service, null);
// If liveness hits zero, queue is considered disconnected
liveness = HEARTBEAT_LIVENESS;
heartbeatAt = System.currentTimeMillis() + heartbeat;
}
/**
* Send reply, if any, to broker and wait for next request.
*/
public ZMsg receive(ZMsg reply)
{
// Format and send the reply if we were provided one
assert (reply != null || !expectReply);
if (reply != null) {
assert (replyTo != null);
reply.wrap(replyTo);
sendToBroker(MDP.W_REPLY, null, reply);
reply.destroy();
}
expectReply = true;
while (!Thread.currentThread().isInterrupted()) {
// Poll socket for a reply, with timeout
ZMQ.Poller items = ctx.createPoller(1);
items.register(worker, ZMQ.Poller.POLLIN);
if (items.poll(timeout) == -1)
break; // Interrupted
if (items.pollin(0)) {
ZMsg msg = ZMsg.recvMsg(worker);
if (msg == null)
break; // Interrupted
if (verbose) {
log.format("I: received message from broker: \n");
msg.dump(log.out());
}
liveness = HEARTBEAT_LIVENESS;
// Don't try to handle errors, just assert noisily
assert (msg != null && msg.size() >= 3);
ZFrame empty = msg.pop();
assert (empty.getData().length == 0);
empty.destroy();
ZFrame header = msg.pop();
assert (MDP.W_WORKER.frameEquals(header));
header.destroy();
ZFrame command = msg.pop();
if (MDP.W_REQUEST.frameEquals(command)) {
// We should pop and save as many addresses as there are
// up to a null part, but for now, just save one
replyTo = msg.unwrap();
command.destroy();
return msg; // We have a request to process
}
else if (MDP.W_HEARTBEAT.frameEquals(command)) {
// Do nothing for heartbeats
}
else if (MDP.W_DISCONNECT.frameEquals(command)) {
reconnectToBroker();
}
else {
log.format("E: invalid input message: \n");
msg.dump(log.out());
}
command.destroy();
msg.destroy();
}
else if (--liveness == 0) {
if (verbose)
log.format("W: disconnected from broker - retrying\n");
try {
Thread.sleep(reconnect);
}
catch (InterruptedException e) {
Thread.currentThread().interrupt(); // Restore the
// interrupted status
break;
}
reconnectToBroker();
}
// Send HEARTBEAT if it's time
if (System.currentTimeMillis() > heartbeatAt) {
sendToBroker(MDP.W_HEARTBEAT, null, null);
heartbeatAt = System.currentTimeMillis() + heartbeat;
}
items.close();
}
if (Thread.currentThread().isInterrupted())
log.format("W: interrupt received, killing worker\n");
return null;
}
public void destroy()
{
ctx.destroy();
}
// ============== getters and setters =================
public int getHeartbeat()
{
return heartbeat;
}
public void setHeartbeat(int heartbeat)
{
this.heartbeat = heartbeat;
}
public int getReconnect()
{
return reconnect;
}
public void setReconnect(int reconnect)
{
this.reconnect = reconnect;
}
}
mdwrkapi: Majordomo worker API in Julia
mdwrkapi: Majordomo worker API in Lua
--
-- mdwrkapi.lua - Majordomo Protocol Worker API
--
-- Author: Robert G. Jakabosky <bobby@sharedrealm.com>
--
local HEARTBEAT_LIVENESS = 3 -- 3-5 is reasonable
local setmetatable = setmetatable
local mdp = require"mdp"
local zmq = require"zmq"
local zpoller = require"zmq.poller"
local zmsg = require"zmsg"
require"zhelpers"
local s_version_assert = s_version_assert
local obj_mt = {}
obj_mt.__index = obj_mt
function obj_mt:set_heartbeat(heartbeat)
self.heartbeat = heartbeat
end
function obj_mt:set_reconnect(reconnect)
self.reconnect = reconnect
end
function obj_mt:destroy()
if self.worker then self.worker:close() end
self.context:term()
end
-- Send message to broker
-- If no msg is provided, create one internally
local function s_mdwrk_send_to_broker(self, command, option, msg)
msg = msg or zmsg.new()
-- Stack protocol envelope to start of message
if option then
msg:push(option)
end
msg:push(command)
msg:push(mdp.MDPW_WORKER)
msg:push("")
if self.verbose then
s_console("I: sending %s to broker", mdp.mdps_commands[command])
msg:dump()
end
msg:send(self.worker)
end
local function s_mdwrk_connect_to_broker(self)
-- close old socket.
if self.worker then
self.poller:remove(self.worker)
self.worker:close()
end
self.worker = assert(self.context:socket(zmq.DEALER))
assert(self.worker:setopt(zmq.LINGER, 0))
assert(self.worker:connect(self.broker))
if self.verbose then
s_console("I: connecting to broker at %s...", self.broker)
end
-- Register service with broker
s_mdwrk_send_to_broker(self, mdp.MDPW_READY, self.service)
-- If liveness hits zero, queue is considered disconnected
self.liveness = HEARTBEAT_LIVENESS
self.heartbeat_at = s_clock() + self.heartbeat
-- add socket to poller
self.poller:add(self.worker, zmq.POLLIN, function()
self.got_msg = true
end)
end
--
-- Send reply, if any, to broker and wait for next request.
--
function obj_mt:recv(reply)
-- Format and send the reply if we are provided one
if reply then
assert(self.reply_to)
reply:wrap(self.reply_to, "")
self.reply_to = nil
s_mdwrk_send_to_broker(self, mdp.MDPW_REPLY, nil, reply)
end
self.expect_reply = true
self.got_msg = false
while true do
local cnt = assert(self.poller:poll(self.heartbeat * 1000))
if cnt ~= 0 and self.got_msg then
self.got_msg = false
local msg = zmsg.recv(self.worker)
if self.verbose then
s_console("I: received message from broker:")
msg:dump()
end
self.liveness = HEARTBEAT_LIVENESS
-- Don't try to handle errors, just assert noisily
assert(msg:parts() >= 3)
local empty = msg:pop()
assert(empty == "")
local header = msg:pop()
assert(header == mdp.MDPW_WORKER)
local command = msg:pop()
if command == mdp.MDPW_REQUEST then
-- We should pop and save as many addresses as there are
-- up to a null part, but for now, just save one...
self.reply_to = msg:unwrap()
return msg -- We have a request to process
elseif command == mdp.MDPW_HEARTBEAT then
-- Do nothing for heartbeats
elseif command == mdp.MDPW_DISCONNECT then
-- dis-connect and re-connect to broker.
s_mdwrk_connect_to_broker(self)
else
s_console("E: invalid input message (%d)", command:byte(1,1))
msg:dump()
end
else
self.liveness = self.liveness - 1
if (self.liveness == 0) then
if self.verbose then
s_console("W: disconnected from broker - retrying...")
end
-- sleep then Reconnect
s_sleep(self.reconnect)
s_mdwrk_connect_to_broker(self)
end
-- Send HEARTBEAT if it's time
if (s_clock() > self.heartbeat_at) then
s_mdwrk_send_to_broker(self, mdp.MDPW_HEARTBEAT)
self.heartbeat_at = s_clock() + self.heartbeat
end
end
end
end
module(...)
function new(broker, service, verbose)
s_version_assert(2, 1);
local self = setmetatable({
context = zmq.init(1),
poller = zpoller.new(1),
broker = broker,
service = service,
verbose = verbose,
heartbeat = 2500, -- msecs
reconnect = 2500, -- msecs
}, obj_mt)
s_mdwrk_connect_to_broker(self)
return self
end
setmetatable(_M, { __call = function(self, ...) return new(...) end })
mdwrkapi: Majordomo worker API in Node.js
mdwrkapi: Majordomo worker API in Objective-C
mdwrkapi: Majordomo worker API in ooc
mdwrkapi: Majordomo worker API in Perl
mdwrkapi: Majordomo worker API in PHP
<?php
/* =====================================================================
* mdwrkapi.php
*
* Majordomo Protocol Worker API
* Implements the MDP/Worker spec at http://rfc.zeromq.org/spec:7.
*/
include_once 'zmsg.php';
include_once 'mdp.php';
// Reliability parameters
define("HEARTBEAT_LIVENESS", 3); // 3-5 is reasonable
// Structure of our class
// We access these properties only via class methods
class MDWrk
{
private $ctx; // Our context
private $broker;
private $service;
private $worker; // Socket to broker
private $verbose = false; // Print activity to stdout
// Heartbeat management
private $heartbeat_at; // When to send HEARTBEAT
private $liveness; // How many attempts left
private $heartbeat; // Heartbeat delay, msecs
private $reconnect; // Reconnect delay, msecs
// Internal state
private $expect_reply = 0;
// Return address, if any
private $reply_to;
/**
* Constructor
*
* @param string $broker
* @param string $service
* @param boolean $verbose
*/
public function __construct($broker, $service, $verbose = false)
{
$this->ctx = new ZMQContext();
$this->broker = $broker;
$this->service = $service;
$this->verbose = $verbose;
$this->heartbeat = 2500; // msecs
$this->reconnect = 2500; // msecs
$this->connect_to_broker();
}
/**
* Send message to broker
* If no msg is provided, creates one internally
*
* @param string $command
* @param string $option
* @param Zmsg $msg
*/
public function send_to_broker($command, $option, $msg = null)
{
$msg = $msg ? $msg : new Zmsg();
if ($option) {
$msg->push($option);
}
$msg->push($command);
$msg->push(MDPW_WORKER);
$msg->push("");
if ($this->verbose) {
printf("I: sending %s to broker %s", $command, PHP_EOL);
echo $msg->__toString();
}
$msg->set_socket($this->worker)->send();
}
/**
* Connect or reconnect to broker
*/
public function connect_to_broker()
{
$this->worker = new ZMQSocket($this->ctx, ZMQ::SOCKET_DEALER);
$this->worker->connect($this->broker);
if ($this->verbose) {
printf("I: connecting to broker at %s... %s", $this->broker, PHP_EOL);
}
// Register service with broker
$this->send_to_broker(MDPW_READY, $this->service, NULL);
// If liveness hits zero, queue is considered disconnected
$this->liveness = HEARTBEAT_LIVENESS;
$this->heartbeat_at = microtime(true) + ($this->heartbeat / 1000);
}
/**
* Set heartbeat delay
*
* @param int $heartbeat
*/
public function set_heartbeat($heartbeat)
{
$this->heartbeat = $heartbeat;
}
/**
* Set reconnect delay
*
* @param int $reconnect
*/
public function set_reconnect($reconnect)
{
$this->reconnect = $reconnect;
}
/**
* Send reply, if any, to broker and wait for next request.
*
* @param Zmsg $reply
* @return Zmsg Returns if there is a request to process
*/
public function recv($reply = null)
{
// Format and send the reply if we were provided one
assert ($reply || !$this->expect_reply);
if ($reply) {
$reply->wrap($this->reply_to);
$this->send_to_broker(MDPW_REPLY, NULL, $reply);
}
$this->expect_reply = true;
$read = $write = array();
while (true) {
$poll = new ZMQPoll();
$poll->add($this->worker, ZMQ::POLL_IN);
$events = $poll->poll($read, $write, $this->heartbeat);
if ($events) {
$zmsg = new Zmsg($this->worker);
$zmsg->recv();
if ($this->verbose) {
echo "I: received message from broker:", PHP_EOL;
echo $zmsg->__toString();
}
$this->liveness = HEARTBEAT_LIVENESS;
// Don't try to handle errors, just assert noisily
assert ($zmsg->parts() >= 3);
$zmsg->pop();
$header = $zmsg->pop();
assert($header == MDPW_WORKER);
$command = $zmsg->pop();
if ($command == MDPW_REQUEST) {
// We should pop and save as many addresses as there are
// up to a null part, but for now, just save one...
$this->reply_to = $zmsg->unwrap();
return $zmsg;// We have a request to process
} elseif ($command == MDPW_HEARTBEAT) {
// Do nothing for heartbeats
} elseif ($command == MDPW_DISCONNECT) {
$this->connect_to_broker();
} else {
echo "E: invalid input message", PHP_EOL;
echo $zmsg->__toString();
}
} elseif (--$this->liveness == 0) { // poll ended on timeout, $events being 0
if ($this->verbose) {
echo "W: disconnected from broker - retrying...", PHP_EOL;
}
usleep($this->reconnect*1000);
$this->connect_to_broker();
}
// Send HEARTBEAT if it's time
if (microtime(true) > $this->heartbeat_at) {
$this->send_to_broker(MDPW_HEARTBEAT, NULL, NULL);
$this->heartbeat_at = microtime(true) + ($this->heartbeat/1000);
}
}
}
}
mdwrkapi: Majordomo worker API in Python
"""Majordomo Protocol Worker API, Python version
Implements the MDP/Worker spec at http://rfc.zeromq.org/spec:7.
Author: Min RK <benjaminrk@gmail.com>
Based on Java example by Arkadiusz Orzechowski
"""
import logging
import time
import zmq
from zhelpers import dump
# MajorDomo protocol constants:
import MDP
class MajorDomoWorker(object):
"""Majordomo Protocol Worker API, Python version
Implements the MDP/Worker spec at http://rfc.zeromq.org/spec:7.
"""
HEARTBEAT_LIVENESS = 3 # 3-5 is reasonable
broker = None
ctx = None
service = None
worker = None # Socket to broker
heartbeat_at = 0 # When to send HEARTBEAT (relative to time.time(), so in seconds)
liveness = 0 # How many attempts left
heartbeat = 2500 # Heartbeat delay, msecs
reconnect = 2500 # Reconnect delay, msecs
# Internal state
expect_reply = False # False only at start
timeout = 2500 # poller timeout
verbose = False # Print activity to stdout
# Return address, if any
reply_to = None
def __init__(self, broker, service, verbose=False):
self.broker = broker
self.service = service
self.verbose = verbose
self.ctx = zmq.Context()
self.poller = zmq.Poller()
logging.basicConfig(format="%(asctime)s %(message)s",
datefmt="%Y-%m-%d %H:%M:%S",
level=logging.INFO)
self.reconnect_to_broker()
def reconnect_to_broker(self):
"""Connect or reconnect to broker"""
if self.worker:
self.poller.unregister(self.worker)
self.worker.close()
self.worker = self.ctx.socket(zmq.DEALER)
self.worker.linger = 0
self.worker.connect(self.broker)
self.poller.register(self.worker, zmq.POLLIN)
if self.verbose:
logging.info("I: connecting to broker at %s...", self.broker)
# Register service with broker
self.send_to_broker(MDP.W_READY, self.service, [])
# If liveness hits zero, queue is considered disconnected
self.liveness = self.HEARTBEAT_LIVENESS
self.heartbeat_at = time.time() + 1e-3 * self.heartbeat
def send_to_broker(self, command, option=None, msg=None):
"""Send message to broker.
If no msg is provided, creates one internally
"""
if msg is None:
msg = []
elif not isinstance(msg, list):
msg = [msg]
if option:
msg = [option] + msg
msg = [b'', MDP.W_WORKER, command] + msg
if self.verbose:
logging.info("I: sending %s to broker", command)
dump(msg)
self.worker.send_multipart(msg)
def recv(self, reply=None):
"""Send reply, if any, to broker and wait for next request."""
# Format and send the reply if we were provided one
assert reply is not None or not self.expect_reply
if reply is not None:
assert self.reply_to is not None
reply = [self.reply_to, b''] + reply
self.send_to_broker(MDP.W_REPLY, msg=reply)
self.expect_reply = True
while True:
# Poll socket for a reply, with timeout
try:
items = self.poller.poll(self.timeout)
except KeyboardInterrupt:
break # Interrupted
if items:
msg = self.worker.recv_multipart()
if self.verbose:
logging.info("I: received message from broker: ")
dump(msg)
self.liveness = self.HEARTBEAT_LIVENESS
# Don't try to handle errors, just assert noisily
assert len(msg) >= 3
empty = msg.pop(0)
assert empty == b''
header = msg.pop(0)
assert header == MDP.W_WORKER
command = msg.pop(0)
if command == MDP.W_REQUEST:
# We should pop and save as many addresses as there are
# up to a null part, but for now, just save one...
self.reply_to = msg.pop(0)
# pop empty
empty = msg.pop(0)
assert empty == b''
return msg # We have a request to process
elif command == MDP.W_HEARTBEAT:
# Do nothing for heartbeats
pass
elif command == MDP.W_DISCONNECT:
self.reconnect_to_broker()
else :
logging.error("E: invalid input message: ")
dump(msg)
else:
self.liveness -= 1
if self.liveness == 0:
if self.verbose:
logging.warn("W: disconnected from broker - retrying...")
try:
time.sleep(1e-3*self.reconnect)
except KeyboardInterrupt:
break
self.reconnect_to_broker()
# Send HEARTBEAT if it's time
if time.time() > self.heartbeat_at:
self.send_to_broker(MDP.W_HEARTBEAT)
self.heartbeat_at = time.time() + 1e-3*self.heartbeat
logging.warn("W: interrupt received, killing worker...")
return None
def destroy(self):
# context.destroy depends on pyzmq >= 2.1.10
self.ctx.destroy(0)
mdwrkapi: Majordomo worker API in Q
mdwrkapi: Majordomo worker API in Racket
mdwrkapi: Majordomo worker API in Ruby
#!/usr/bin/env ruby
# Majordomo Protocol Worker API, Ruby version
#
# Implements the MDP/Worker spec at http://rfc.zeromq.org/spec:7.
#
# Author: Tom van Leeuwen <tom@vleeuwen.eu>
# Based on Python example by Min RK
require 'ffi-rzmq'
require './mdp.rb'
class MajorDomoWorker
HEARTBEAT_LIVENESS = 3 # 3-5 is reasonable
def initialize broker, service
@broker = broker
@service = service
@context = ZMQ::Context.new(1)
@poller = ZMQ::Poller.new
@worker = nil # Socket to broker
@heartbeat_at = 0 # When to send HEARTBEAT (compared against Time.now, so in seconds)
@liveness = 0 # How many attempts left
@timeout = 2500
@heartbeat = 2500 # Heartbeat delay, msecs
@reconnect = 2500 # Reconnect delay, msecs
@expect_reply = false # false only at start
@reply_to = nil
reconnect_to_broker
end
def recv reply
if reply and @reply_to
reply = reply.is_a?(Array) ? [@reply_to, ''].concat(reply) : [@reply_to, '', reply]
send_to_broker MDP::W_REPLY, nil, reply
end
@expect_reply = true
loop do
items = @poller.poll(@timeout)
if items
messages = []
@worker.recv_strings messages
@liveness = HEARTBEAT_LIVENESS
messages.shift # empty
if messages.shift != MDP::W_WORKER
puts "E: Header is not MDP::WORKER"
end
command = messages.shift
case command
when MDP::W_REQUEST
# We should pop and save as many addresses as there are
# up to a null part, but for now, just save one...
@reply_to = messages.shift
messages.shift # empty
return messages # We have a request to process
when MDP::W_HEARTBEAT
# do nothing
when MDP::W_DISCONNECT
reconnect_to_broker
else
end
else
@liveness -= 1
if @liveness == 0
sleep 0.001*@reconnect
reconnect_to_broker
end
end
if Time.now > @heartbeat_at
send_to_broker MDP::W_HEARTBEAT
@heartbeat_at = Time.now + 0.001 * @heartbeat
end
end
end
def reconnect_to_broker
if @worker
@poller.deregister @worker, ZMQ::POLLIN
@worker.close
end
@worker = @context.socket ZMQ::DEALER
@worker.setsockopt ZMQ::LINGER, 0
@worker.connect @broker
@poller.register @worker, ZMQ::POLLIN
send_to_broker(MDP::W_READY, @service, [])
@liveness = HEARTBEAT_LIVENESS
@heartbeat_at = Time.now + 0.001 * @heartbeat
end
def send_to_broker command, option=nil, message=nil
# if no message is provided, create one internally
if message.nil?
message = []
elsif not message.is_a?(Array)
message = [message]
end
message = [option].concat message if option
message = ['', MDP::W_WORKER, command].concat message
@worker.send_strings message
end
end
mdwrkapi: Majordomo worker API in Rust
mdwrkapi: Majordomo worker API in Scala
mdwrkapi: Majordomo worker API in Tcl
# Majordomo Protocol Worker API, Tcl version.
# Implements the MDP/Worker spec at http://rfc.zeromq.org/spec:7.
package require TclOO
package require zmq
package require mdp
package provide MDWorker 1.0
oo::class create MDWorker {
variable context broker service worker verbose heartbeat_at liveness heartbeat reconnect expect_reply reply_to
constructor {ibroker iservice {iverbose}} {
set context [zmq context mdwrk_context_[::mdp::contextid]]
set broker $ibroker
set service $iservice
set verbose $iverbose
set heartbeat 2500
set reconnect 2500
set expect_reply 0
set reply_to ""
set worker ""
my connect_to_broker
}
destructor {
$worker close
$context term
}
# Send message to broker
method send_to_broker {command option msg} {
# Stack protocol envelope to start of message
if {[string length $option]} {
set msg [zmsg push $msg $option]
}
set msg [zmsg push $msg $::mdp::MDPW_COMMAND($command)]
set msg [zmsg push $msg $::mdp::MDPW_WORKER]
set msg [zmsg push $msg ""]
if {$verbose} {
puts "I: sending $command to broker"
puts [join [zmsg dump $msg] \n]
}
zmsg send $worker $msg
}
# Connect or reconnect to broker
method connect_to_broker {} {
if {[string length $worker]} {
$worker close
}
set worker [zmq socket mdwrk_socket_[::mdp::socketid] $context DEALER]
$worker connect $broker
if {$verbose} {
puts "I: connecting to broker at $broker..."
}
# Register service with broker
my send_to_broker READY $service {}
# If liveness hits zero, queue is considered disconnected
set liveness $::mdp::HEARTBEAT_LIVENESS
set heartbeat_at [expr {[clock milliseconds] + $heartbeat}]
}
# Set heartbeat delay
method set_heartbeat {iheartbeat} {
set heartbeat $iheartbeat
}
# Set reconnect delay
method set_reconnect {ireconnect} {
set reconnect $ireconnect
}
# Send reply, if any, to broker and wait for next request.
method recv {reply} {
# Format and send the reply if we were provided one
if {!([string length $reply] || !$expect_reply)} {
error "reply expected"
}
if {[string length $reply]} {
if {![string length $reply_to]} {
error "no reply_to found"
}
set reply [zmsg wrap $reply $reply_to]
my send_to_broker REPLY {} $reply
}
set expect_reply 1
while {1} {
set poll_set [list [list $worker [list POLLIN]]]
set rpoll_set [zmq poll $poll_set $heartbeat]
if {[llength $rpoll_set] && "POLLIN" in [lindex $rpoll_set 0 1]} {
set msg [zmsg recv $worker]
if {$verbose} {
puts "I: received message from broker:"
puts [join [zmsg dump $msg] \n]
}
set liveness $::mdp::HEARTBEAT_LIVENESS
# Don't try to handle errors, just assert noisily
if {[llength $msg] < 3} {
error "invalid message size"
}
set empty [zmsg pop msg]
if {[string length $empty]} {
error "expected empty frame"
}
set header [zmsg pop msg]
if {$header ne $mdp::MDPW_WORKER} {
error "unexpected header"
}
set command [zmsg pop msg]
if {$command eq $::mdp::MDPW_COMMAND(REQUEST)} {
# We should pop and save as many addresses as there are
# up to a null part, but for now, just save one…
set reply_to [zmsg unwrap msg]
return $msg ;# We have a request to process
} elseif {$command eq $mdp::MDPW_COMMAND(HEARTBEAT)} {
;# Do nothing for heartbeats
} elseif {$command eq $mdp::MDPW_COMMAND(DISCONNECT)} {
my connect_to_broker
} else {
puts "E: invalid input message"
puts [join [zmsg dump $msg] \n]
}
} elseif {[incr liveness -1] == 0} {
if {$verbose} {
puts "W: disconnected from broker - retrying..."
}
after $reconnect
my connect_to_broker
}
# Send HEARTBEAT if it's time
if {[clock milliseconds] > $heartbeat_at} {
my send_to_broker HEARTBEAT {} {}
set heartbeat_at [expr {[clock milliseconds] + $heartbeat}]
}
}
}
}
mdwrkapi: Majordomo worker API in OCaml
Let’s see how the worker API looks in action, with an example test program that implements an echo service:
mdworker: Majordomo worker application in Ada
mdworker: Majordomo worker application in Basic
mdworker: Majordomo worker application in C
// Majordomo Protocol worker example
// Uses the mdwrk API to hide all MDP aspects
// Lets us build this source without creating a library
#include "mdwrkapi.c"
int main (int argc, char *argv [])
{
int verbose = (argc > 1 && streq (argv [1], "-v"));
mdwrk_t *session = mdwrk_new (
"tcp://localhost:5555", "echo", verbose);
zmsg_t *reply = NULL;
while (true) {
zmsg_t *request = mdwrk_recv (session, &reply);
if (request == NULL)
break; // Worker was interrupted
reply = request; // Echo is complex... :-)
}
mdwrk_destroy (&session);
return 0;
}
mdworker: Majordomo worker application in C++
//
// Majordomo Protocol worker example
// Uses the mdwrk API to hide all MDP aspects
//
// Lets us 'build mdworker' and 'build all'
//
// Andreas Hoelzlwimmer <andreas.hoelzlwimmer@fh-hagenberg.at>
//
#include "mdwrkapi.hpp"
int main (int argc, char *argv [])
{
int verbose = (argc > 1 && strcmp (argv [1], "-v") == 0);
mdwrk session ("tcp://localhost:5555", "echo", verbose);
zmsg *reply = 0;
while (1) {
zmsg *request = session.recv (reply);
if (request == 0) {
break; // Worker was interrupted
}
reply = request; // Echo is complex... :-)
}
return 0;
}
mdworker: Majordomo worker application in C#
mdworker: Majordomo worker application in CL
mdworker: Majordomo worker application in Delphi
mdworker: Majordomo worker application in Erlang
mdworker: Majordomo worker application in Elixir
mdworker: Majordomo worker application in F#
mdworker: Majordomo worker application in Felix
mdworker: Majordomo worker application in Go
// Majordomo Protocol worker example
// Uses the mdwrk API to hide all MDP aspects
//
// To run this example, you may need to run multiple *.go files as below
// go run mdp.go zhelpers.go mdwrkapi.go mdworker.go [-v]
//
// Author: iano <scaly.iano@gmail.com>
package main
import (
"os"
)
func main() {
verbose := len(os.Args) >= 2 && os.Args[1] == "-v"
worker := NewWorker("tcp://localhost:5555", "echo", verbose)
for reply := [][]byte{}; ; {
request := worker.Recv(reply)
if len(request) == 0 {
break
}
reply = request
}
}
mdworker: Majordomo worker application in Haskell
import MDWorkerAPI
import ZHelpers
import System.Environment (getArgs)
import System.IO (hSetBuffering, stdout, BufferMode(NoBuffering))
import Control.Monad (forever, mapM_, when)
import Data.ByteString.Char8 (unpack, empty, ByteString(..))
main :: IO ()
main = do
args <- getArgs
when (length args /= 1) $
error "usage: mdworker <isVerbose(True|False)>"
let isVerbose = read (args !! 0) :: Bool
hSetBuffering stdout NoBuffering
withMDWorker "tcp://localhost:5555" "echo" isVerbose $ \session ->
doEcho session [empty]
where doEcho session reply = do
request <- mdwkrExchange session reply
doEcho (fst request) (snd request)
mdworker: Majordomo worker application in Haxe
package ;
import neko.Lib;
import neko.Sys;
import org.zeromq.ZMsg;
/**
* Majordomo Protocol worker example
* Uses the MDWrk API to hide all MDP aspects
* @author Richard J Smith
*/
class MDWorker
{
public static function main() {
Lib.println("** MDWorker (see: http://zguide.zeromq.org/page:all#Service-Oriented-Reliable-Queuing-Majordomo-Pattern)");
var argArr = Sys.args();
var verbose = (argArr.length > 1 && argArr[argArr.length - 1] == "-v");
var session = new MDWrkAPI("tcp://localhost:5555", "echo", verbose);
var reply:ZMsg = null;
while (true) {
var request = session.recv(reply);
if (request == null)
break; // Interrupted
reply = request;
}
session.destroy();
}
}
mdworker: Majordomo worker application in Java
package guide;
import org.zeromq.ZMsg;
/**
* Majordomo Protocol worker example. Uses the mdwrk API to hide all MDP aspects
*
*/
public class mdworker
{
/**
* @param args
*/
public static void main(String[] args)
{
boolean verbose = (args.length > 0 && "-v".equals(args[0]));
mdwrkapi workerSession = new mdwrkapi("tcp://localhost:5555", "echo", verbose);
ZMsg reply = null;
while (!Thread.currentThread().isInterrupted()) {
ZMsg request = workerSession.receive(reply);
if (request == null)
break; //Interrupted
reply = request; // Echo is complex :-)
}
workerSession.destroy();
}
}
mdworker: Majordomo worker application in Julia
mdworker: Majordomo worker application in Lua
--
-- Majordomo Protocol worker example
-- Uses the mdwrk API to hide all MDP aspects
--
-- Author: Robert G. Jakabosky <bobby@sharedrealm.com>
--
require"mdwrkapi"
require"zmsg"
local verbose = (arg[1] == "-v")
local session = mdwrkapi.new("tcp://localhost:5555", "echo", verbose)
local reply
while true do
local request = session:recv(reply)
if not request then
break -- Worker was interrupted
end
reply = request -- Echo is complex... :-)
end
session:destroy()
mdworker: Majordomo worker application in Node.js
mdworker: Majordomo worker application in Objective-C
mdworker: Majordomo worker application in ooc
mdworker: Majordomo worker application in Perl
mdworker: Majordomo worker application in PHP
<?php
/*
* Majordomo Protocol worker example
* Uses the mdwrk API to hide all MDP aspects
*
* @author Ian Barber <ian(dot)barber(at)gmail(dot)com>
*/
include_once 'mdwrkapi.php';
$verbose = $_SERVER['argc'] > 1 && $_SERVER['argv'][1] == "-v";
$mdwrk = new Mdwrk("tcp://localhost:5555", "echo", $verbose);
$reply = null;
while (true) {
$request = $mdwrk->recv($reply);
$reply = $request; // Echo is complex... :-)
}
mdworker: Majordomo worker application in Python
"""Majordomo Protocol worker example.
Uses the mdwrk API to hide all MDP aspects
Author: Min RK <benjaminrk@gmail.com>
"""
import sys
from mdwrkapi import MajorDomoWorker
def main():
verbose = '-v' in sys.argv
worker = MajorDomoWorker("tcp://localhost:5555", b"echo", verbose)
reply = None
while True:
request = worker.recv(reply)
if request is None:
break # Worker was interrupted
reply = request # Echo is complex... :-)
if __name__ == '__main__':
main()
mdworker: Majordomo worker application in Q
mdworker: Majordomo worker application in Racket
mdworker: Majordomo worker application in Ruby
#!/usr/bin/env ruby
# Majordomo Protocol worker example.
#
# Author: Tom van Leeuwen <tom@vleeuwen.eu>
require './mdwrkapi.rb'
worker = MajorDomoWorker.new('tcp://localhost:5555', 'echo')
reply = nil
loop do
request = worker.recv reply
reply = request # Echo is complex... :-)
end
mdworker: Majordomo worker application in Rust
mdworker: Majordomo worker application in Scala
mdworker: Majordomo worker application in Tcl
#
# Majordomo Protocol worker example
# Uses the mdwrk API to hide all MDP aspects
#
lappend auto_path .
package require MDWorker 1.0
set verbose 0
foreach {k v} $argv {
if {$k eq "-v"} { set verbose 1 }
}
set session [MDWorker new "tcp://localhost:5555" "echo" $verbose]
set reply {}
while {1} {
set request [$session recv $reply]
if {[llength $request] == 0} {
break ;# Worker was interrupted
}
set reply [list "$request @ [clock format [clock seconds]] from $session"] ;# Echo is complex… :-)
}
$session destroy
mdworker: Majordomo worker application in OCaml
Here are some things to note about the worker API code:
-
The APIs are single-threaded. This means, for example, that the worker won’t send heartbeats in the background. Happily, this is exactly what we want: if the worker application gets stuck, heartbeats will stop and the broker will stop sending requests to the worker.
-
The worker API doesn’t do an exponential back-off; it’s not worth the extra complexity.
-
The APIs don’t do any error reporting. If something isn’t as expected, they raise an assertion (or exception depending on the language). This is ideal for a reference implementation, so any protocol errors show immediately. For real applications, the API should be robust against invalid messages.
You might wonder why the worker API is manually closing its socket and opening a new one, when ZeroMQ will automatically reconnect a socket if the peer disappears and comes back. Look back at the Simple Pirate and Paranoid Pirate workers to understand. Although ZeroMQ will automatically reconnect workers if the broker dies and comes back up, this isn’t sufficient to re-register the workers with the broker. I know of at least two solutions. The simplest, which we use here, is for the worker to monitor the connection using heartbeats, and if it decides the broker is dead, to close its socket and start afresh with a new socket. The alternative is for the broker to challenge unknown workers when it gets a heartbeat from the worker and ask them to re-register. That would require protocol support.
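To make the approach we use here concrete, below is a minimal standalone sketch of just the liveness countdown and socket reset, distilled from the worker code above. It is not the mdwrk API itself: it omits the MDP framing, the READY registration, and the worker's own outgoing heartbeats, and the endpoint and constants are simply the ones used throughout this chapter. Any traffic from the broker resets the counter; when it hits zero, the worker pauses, throws away its socket, and connects a fresh one:
//  Liveness countdown and reconnect, distilled from the worker API (sketch only)
#include "czmq.h"
#define HEARTBEAT_LIVENESS  3       //  3-5 is reasonable
#define HEARTBEAT_INTERVAL  2500    //  msecs
#define RECONNECT_INTERVAL  2500    //  msecs
int main (void)
{
    zctx_t *ctx = zctx_new ();
    void *worker = zsocket_new (ctx, ZMQ_DEALER);
    zsocket_connect (worker, "tcp://localhost:5555");
    size_t liveness = HEARTBEAT_LIVENESS;
    while (!zctx_interrupted) {
        zmq_pollitem_t items [] = { { worker, 0, ZMQ_POLLIN, 0 } };
        int rc = zmq_poll (items, 1, HEARTBEAT_INTERVAL * ZMQ_POLL_MSEC);
        if (rc == -1)
            break;              //  Interrupted
        if (items [0].revents & ZMQ_POLLIN) {
            //  Any message from the broker (request or heartbeat) proves it's alive
            zmsg_t *msg = zmsg_recv (worker);
            if (!msg)
                break;          //  Interrupted
            liveness = HEARTBEAT_LIVENESS;
            zmsg_destroy (&msg);
        }
        else
        if (--liveness == 0) {
            //  Broker looks dead: wait, then start afresh with a new socket
            //  (a real worker re-sends READY for its service at this point)
            zclock_sleep (RECONNECT_INTERVAL);
            zsocket_destroy (ctx, worker);
            worker = zsocket_new (ctx, ZMQ_DEALER);
            zsocket_connect (worker, "tcp://localhost:5555");
            liveness = HEARTBEAT_LIVENESS;
        }
    }
    zctx_destroy (&ctx);
    return 0;
}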
Now let’s design the Majordomo broker. Its core structure is a set of queues, one per service. We will create these queues as workers appear (we could delete them as workers disappear, but forget that for now because it gets complex). Additionally, we keep a queue of workers per service.
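Before we dive into the full listing, here is the broker's state boiled down to rough C terms. This is only a simplified cut of the structures you will see in the code below (the field names follow that code), not a separate implementation:
//  Simplified view of the broker state (full definitions appear below)
#include "czmq.h"
typedef struct {
    zlist_t *requests;          //  Queue of pending client requests
    zlist_t *waiting;           //  Queue of idle workers for this service
} service_t;
typedef struct {
    zhash_t *services;          //  Service name -> service, created on demand
    zhash_t *workers;           //  Worker identity -> worker state
    zlist_t *waiting;           //  All idle workers, oldest first
} broker_t;
//  Dispatching is then just: while a service has both a queued request and
//  an idle worker, pop one of each and forward the request to that worker.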
And here is the broker:
mdbroker: Majordomo broker in Ada
mdbroker: Majordomo broker in Basic
mdbroker: Majordomo broker in C
// Majordomo Protocol broker
// A minimal C implementation of the Majordomo Protocol as defined in
// http://rfc.zeromq.org/spec:7 and http://rfc.zeromq.org/spec:8.
#include "czmq.h"
#include "mdp.h"
// We'd normally pull these from config data
#define HEARTBEAT_LIVENESS 3 // 3-5 is reasonable
#define HEARTBEAT_INTERVAL 2500 // msecs
#define HEARTBEAT_EXPIRY HEARTBEAT_INTERVAL * HEARTBEAT_LIVENESS
// .split broker class structure
// The broker class defines a single broker instance:
typedef struct {
zctx_t *ctx; // Our context
void *socket; // Socket for clients & workers
int verbose; // Print activity to stdout
char *endpoint; // Broker binds to this endpoint
zhash_t *services; // Hash of known services
zhash_t *workers; // Hash of known workers
zlist_t *waiting; // List of waiting workers
uint64_t heartbeat_at; // When to send HEARTBEAT
} broker_t;
static broker_t *
s_broker_new (int verbose);
static void
s_broker_destroy (broker_t **self_p);
static void
s_broker_bind (broker_t *self, char *endpoint);
static void
s_broker_worker_msg (broker_t *self, zframe_t *sender, zmsg_t *msg);
static void
s_broker_client_msg (broker_t *self, zframe_t *sender, zmsg_t *msg);
static void
s_broker_purge (broker_t *self);
// .split service class structure
// The service class defines a single service instance:
typedef struct {
broker_t *broker; // Broker instance
char *name; // Service name
zlist_t *requests; // List of client requests
zlist_t *waiting; // List of waiting workers
size_t workers; // How many workers we have
} service_t;
static service_t *
s_service_require (broker_t *self, zframe_t *service_frame);
static void
s_service_destroy (void *argument);
static void
s_service_dispatch (service_t *service, zmsg_t *msg);
// .split worker class structure
// The worker class defines a single worker, idle or active:
typedef struct {
broker_t *broker; // Broker instance
char *id_string; // Identity of worker as string
zframe_t *identity; // Identity frame for routing
service_t *service; // Owning service, if known
int64_t expiry; // When worker expires, if no heartbeat
} worker_t;
static worker_t *
s_worker_require (broker_t *self, zframe_t *identity);
static void
s_worker_delete (worker_t *self, int disconnect);
static void
s_worker_destroy (void *argument);
static void
s_worker_send (worker_t *self, char *command, char *option,
zmsg_t *msg);
static void
s_worker_waiting (worker_t *self);
// .split broker constructor and destructor
// Here are the constructor and destructor for the broker:
static broker_t *
s_broker_new (int verbose)
{
broker_t *self = (broker_t *) zmalloc (sizeof (broker_t));
// Initialize broker state
self->ctx = zctx_new ();
self->socket = zsocket_new (self->ctx, ZMQ_ROUTER);
self->verbose = verbose;
self->services = zhash_new ();
self->workers = zhash_new ();
self->waiting = zlist_new ();
self->heartbeat_at = zclock_time () + HEARTBEAT_INTERVAL;
return self;
}
static void
s_broker_destroy (broker_t **self_p)
{
assert (self_p);
if (*self_p) {
broker_t *self = *self_p;
zctx_destroy (&self->ctx);
zhash_destroy (&self->services);
zhash_destroy (&self->workers);
zlist_destroy (&self->waiting);
free (self);
*self_p = NULL;
}
}
// .split broker bind method
// This method binds the broker instance to an endpoint. We can call
// this multiple times. Note that MDP uses a single socket for both clients
// and workers:
void
s_broker_bind (broker_t *self, char *endpoint)
{
zsocket_bind (self->socket, endpoint);
zclock_log ("I: MDP broker/0.2.0 is active at %s", endpoint);
}
// .split broker worker_msg method
// This method processes one READY, REPLY, HEARTBEAT, or
// DISCONNECT message sent to the broker by a worker:
static void
s_broker_worker_msg (broker_t *self, zframe_t *sender, zmsg_t *msg)
{
assert (zmsg_size (msg) >= 1); // At least, command
zframe_t *command = zmsg_pop (msg);
char *id_string = zframe_strhex (sender);
int worker_ready = (zhash_lookup (self->workers, id_string) != NULL);
free (id_string);
worker_t *worker = s_worker_require (self, sender);
if (zframe_streq (command, MDPW_READY)) {
if (worker_ready) // Not first command in session
s_worker_delete (worker, 1);
else
if (zframe_size (sender) >= 4 // Reserved service name
&& memcmp (zframe_data (sender), "mmi.", 4) == 0)
s_worker_delete (worker, 1);
else {
// Attach worker to service and mark as idle
zframe_t *service_frame = zmsg_pop (msg);
worker->service = s_service_require (self, service_frame);
worker->service->workers++;
s_worker_waiting (worker);
zframe_destroy (&service_frame);
}
}
else
if (zframe_streq (command, MDPW_REPLY)) {
if (worker_ready) {
// Remove and save client return envelope and insert the
// protocol header and service name, then rewrap envelope.
zframe_t *client = zmsg_unwrap (msg);
zmsg_pushstr (msg, worker->service->name);
zmsg_pushstr (msg, MDPC_CLIENT);
zmsg_wrap (msg, client);
zmsg_send (&msg, self->socket);
s_worker_waiting (worker);
}
else
s_worker_delete (worker, 1);
}
else
if (zframe_streq (command, MDPW_HEARTBEAT)) {
if (worker_ready)
worker->expiry = zclock_time () + HEARTBEAT_EXPIRY;
else
s_worker_delete (worker, 1);
}
else
if (zframe_streq (command, MDPW_DISCONNECT))
s_worker_delete (worker, 0);
else {
zclock_log ("E: invalid input message");
zmsg_dump (msg);
}
zframe_destroy (&command);
zmsg_destroy (&msg);
}
// .split broker client_msg method
// Process a request coming from a client. We implement MMI requests
// directly here (at present, we implement only the mmi.service request):
static void
s_broker_client_msg (broker_t *self, zframe_t *sender, zmsg_t *msg)
{
assert (zmsg_size (msg) >= 2); // Service name + body
zframe_t *service_frame = zmsg_pop (msg);
service_t *service = s_service_require (self, service_frame);
// Set reply return identity to client sender
zmsg_wrap (msg, zframe_dup (sender));
// If we got a MMI service request, process that internally
if (zframe_size (service_frame) >= 4
&& memcmp (zframe_data (service_frame), "mmi.", 4) == 0) {
char *return_code;
if (zframe_streq (service_frame, "mmi.service")) {
char *name = zframe_strdup (zmsg_last (msg));
service_t *service =
(service_t *) zhash_lookup (self->services, name);
return_code = service && service->workers? "200": "404";
free (name);
}
else
return_code = "501";
zframe_reset (zmsg_last (msg), return_code, strlen (return_code));
// Remove & save client return envelope and insert the
// protocol header and service name, then rewrap envelope.
zframe_t *client = zmsg_unwrap (msg);
zmsg_prepend (msg, &service_frame);
zmsg_pushstr (msg, MDPC_CLIENT);
zmsg_wrap (msg, client);
zmsg_send (&msg, self->socket);
}
else
// Else dispatch the message to the requested service
s_service_dispatch (service, msg);
zframe_destroy (&service_frame);
}
// .split broker purge method
// This method deletes any idle workers that haven't pinged us in a
// while. We hold workers from oldest to most recent so we can stop
// scanning whenever we find a live worker. This means we'll mainly stop
// at the first worker, which is essential when we have large numbers of
// workers (we call this method in our critical path):
static void
s_broker_purge (broker_t *self)
{
worker_t *worker = (worker_t *) zlist_first (self->waiting);
while (worker) {
if (zclock_time () < worker->expiry)
break; // Worker is alive, we're done here
if (self->verbose)
zclock_log ("I: deleting expired worker: %s",
worker->id_string);
s_worker_delete (worker, 0);
worker = (worker_t *) zlist_first (self->waiting);
}
}
// .split service methods
// Here is the implementation of the methods that work on a service:
// Lazy constructor that locates a service by name or creates a new
// service if there is no service already with that name.
static service_t *
s_service_require (broker_t *self, zframe_t *service_frame)
{
assert (service_frame);
char *name = zframe_strdup (service_frame);
service_t *service =
(service_t *) zhash_lookup (self->services, name);
if (service == NULL) {
service = (service_t *) zmalloc (sizeof (service_t));
service->broker = self;
service->name = name;
service->requests = zlist_new ();
service->waiting = zlist_new ();
zhash_insert (self->services, name, service);
zhash_freefn (self->services, name, s_service_destroy);
if (self->verbose)
zclock_log ("I: added service: %s", name);
}
else
free (name);
return service;
}
// Service destructor is called automatically whenever the service is
// removed from broker->services.
static void
s_service_destroy (void *argument)
{
service_t *service = (service_t *) argument;
while (zlist_size (service->requests)) {
zmsg_t *msg = zlist_pop (service->requests);
zmsg_destroy (&msg);
}
zlist_destroy (&service->requests);
zlist_destroy (&service->waiting);
free (service->name);
free (service);
}
// .split service dispatch method
// This method sends requests to waiting workers:
static void
s_service_dispatch (service_t *self, zmsg_t *msg)
{
assert (self);
if (msg) // Queue message if any
zlist_append (self->requests, msg);
s_broker_purge (self->broker);
while (zlist_size (self->waiting) && zlist_size (self->requests)) {
worker_t *worker = zlist_pop (self->waiting);
zlist_remove (self->broker->waiting, worker);
zmsg_t *msg = zlist_pop (self->requests);
s_worker_send (worker, MDPW_REQUEST, NULL, msg);
zmsg_destroy (&msg);
}
}
// .split worker methods
// Here is the implementation of the methods that work on a worker:
// Lazy constructor that locates a worker by identity, or creates a new
// worker if there is no worker already with that identity.
static worker_t *
s_worker_require (broker_t *self, zframe_t *identity)
{
assert (identity);
// self->workers is keyed off worker identity
char *id_string = zframe_strhex (identity);
worker_t *worker =
(worker_t *) zhash_lookup (self->workers, id_string);
if (worker == NULL) {
worker = (worker_t *) zmalloc (sizeof (worker_t));
worker->broker = self;
worker->id_string = id_string;
worker->identity = zframe_dup (identity);
zhash_insert (self->workers, id_string, worker);
zhash_freefn (self->workers, id_string, s_worker_destroy);
if (self->verbose)
zclock_log ("I: registering new worker: %s", id_string);
}
else
free (id_string);
return worker;
}
// This method deletes the current worker.
static void
s_worker_delete (worker_t *self, int disconnect)
{
assert (self);
if (disconnect)
s_worker_send (self, MDPW_DISCONNECT, NULL, NULL);
if (self->service) {
zlist_remove (self->service->waiting, self);
self->service->workers--;
}
zlist_remove (self->broker->waiting, self);
// This implicitly calls s_worker_destroy
zhash_delete (self->broker->workers, self->id_string);
}
// Worker destructor is called automatically whenever the worker is
// removed from broker->workers.
static void
s_worker_destroy (void *argument)
{
worker_t *self = (worker_t *) argument;
zframe_destroy (&self->identity);
free (self->id_string);
free (self);
}
// .split worker send method
// This method formats and sends a command to a worker. The caller may
// also provide a command option, and a message payload:
static void
s_worker_send (worker_t *self, char *command, char *option, zmsg_t *msg)
{
msg = msg? zmsg_dup (msg): zmsg_new ();
// Stack protocol envelope to start of message
if (option)
zmsg_pushstr (msg, option);
zmsg_pushstr (msg, command);
zmsg_pushstr (msg, MDPW_WORKER);
// Stack routing envelope to start of message
zmsg_wrap (msg, zframe_dup (self->identity));
if (self->broker->verbose) {
zclock_log ("I: sending %s to worker",
mdps_commands [(int) *command]);
zmsg_dump (msg);
}
zmsg_send (&msg, self->broker->socket);
}
// This worker is now waiting for work
static void
s_worker_waiting (worker_t *self)
{
// Queue to broker and service waiting lists
assert (self->broker);
zlist_append (self->broker->waiting, self);
zlist_append (self->service->waiting, self);
self->expiry = zclock_time () + HEARTBEAT_EXPIRY;
s_service_dispatch (self->service, NULL);
}
// .split main task
// Finally, here is the main task. We create a new broker instance and
// then process messages on the broker socket:
int main (int argc, char *argv [])
{
int verbose = (argc > 1 && streq (argv [1], "-v"));
broker_t *self = s_broker_new (verbose);
s_broker_bind (self, "tcp://*:5555");
// Get and process messages forever or until interrupted
while (true) {
zmq_pollitem_t items [] = {
{ self->socket, 0, ZMQ_POLLIN, 0 } };
int rc = zmq_poll (items, 1, HEARTBEAT_INTERVAL * ZMQ_POLL_MSEC);
if (rc == -1)
break; // Interrupted
// Process next input message, if any
if (items [0].revents & ZMQ_POLLIN) {
zmsg_t *msg = zmsg_recv (self->socket);
if (!msg)
break; // Interrupted
if (self->verbose) {
zclock_log ("I: received message:");
zmsg_dump (msg);
}
zframe_t *sender = zmsg_pop (msg);
zframe_t *empty = zmsg_pop (msg);
zframe_t *header = zmsg_pop (msg);
if (zframe_streq (header, MDPC_CLIENT))
s_broker_client_msg (self, sender, msg);
else
if (zframe_streq (header, MDPW_WORKER))
s_broker_worker_msg (self, sender, msg);
else {
zclock_log ("E: invalid message:");
zmsg_dump (msg);
zmsg_destroy (&msg);
}
zframe_destroy (&sender);
zframe_destroy (&empty);
zframe_destroy (&header);
}
// Disconnect and delete any expired workers
// Send heartbeats to idle workers if needed
if (zclock_time () > self->heartbeat_at) {
s_broker_purge (self);
worker_t *worker = (worker_t *) zlist_first (self->waiting);
while (worker) {
s_worker_send (worker, MDPW_HEARTBEAT, NULL, NULL);
worker = (worker_t *) zlist_next (self->waiting);
}
self->heartbeat_at = zclock_time () + HEARTBEAT_INTERVAL;
}
}
if (zctx_interrupted)
printf ("W: interrupt received, shutting down...\n");
s_broker_destroy (&self);
return 0;
}
mdbroker: Majordomo broker in C++
//
// Majordomo Protocol broker
// A minimal implementation of http://rfc.zeromq.org/spec:7 and spec:8
//
// Andreas Hoelzlwimmer <andreas.hoelzlwimmer@fh-hagenberg.at>
//
#include "zmsg.hpp"
#include "mdp.h"
#include <map>
#include <set>
#include <deque>
#include <list>
// We'd normally pull these from config data
static constexpr uint32_t n_heartbeat_liveness = 3;
static constexpr uint32_t n_heartbeat_interval = 2500; // msecs
static constexpr uint32_t n_heartbeat_expiry = n_heartbeat_interval * n_heartbeat_liveness; // msecs
struct service;
// This defines one worker, idle or active
struct worker
{
std::string m_identity; // Address of worker
service * m_service; // Owning service, if known
int64_t m_expiry; // Expires at unless heartbeat
worker(std::string identity, service * service = nullptr, int64_t expiry = 0): m_identity(identity), m_service(service), m_expiry(expiry) {}
};
// This defines a single service
struct service
{
~service ()
{
for(size_t i = 0; i < m_requests.size(); i++) {
delete m_requests[i];
}
}
const std::string m_name; // Service name
std::deque<zmsg*> m_requests; // List of client requests
std::list<worker*> m_waiting; // List of waiting workers
size_t m_workers; // How many workers we have
service(std::string name): m_name(name) {}
};
// This defines a single broker
class broker {
public:
// ---------------------------------------------------------------------
// Constructor for broker object
broker (int verbose):m_verbose(verbose)
{
// Initialize broker state
m_context = new zmq::context_t(1);
m_socket = new zmq::socket_t(*m_context, ZMQ_ROUTER);
}
// ---------------------------------------------------------------------
// Destructor for broker object
virtual
~broker ()
{
while (! m_services.empty())
{
delete m_services.begin()->second;
m_services.erase(m_services.begin());
}
while (! m_workers.empty())
{
delete m_workers.begin()->second;
m_workers.erase(m_workers.begin());
}
}
// ---------------------------------------------------------------------
// Bind broker to endpoint, can call this multiple times
// We use a single socket for both clients and workers.
void
bind (std::string endpoint)
{
m_endpoint = endpoint;
m_socket->bind(m_endpoint.c_str());
s_console ("I: MDP broker/0.1.1 is active at %s", endpoint.c_str());
}
private:
// ---------------------------------------------------------------------
// Delete any idle workers that haven't pinged us in a while.
void
purge_workers ()
{
std::deque<worker*> toCull;
int64_t now = s_clock();
for (auto wrk = m_waiting.begin(); wrk != m_waiting.end(); ++wrk)
{
if ((*wrk)->m_expiry <= now)
toCull.push_back(*wrk);
}
for (auto wrk = toCull.begin(); wrk != toCull.end(); ++wrk)
{
if (m_verbose) {
s_console ("I: deleting expired worker: %s",
(*wrk)->m_identity.c_str());
}
worker_delete(*wrk, 0);
}
}
// ---------------------------------------------------------------------
// Locate or create new service entry
service *
service_require (std::string name)
{
assert (!name.empty());
if (m_services.count(name)) {
return m_services.at(name);
}
service * srv = new service(name);
m_services.insert(std::pair{name, srv});
if (m_verbose) {
s_console("I: added service: %s", name.c_str());
}
return srv;
}
// ---------------------------------------------------------------------
// Dispatch requests to waiting workers as possible
void
service_dispatch (service *srv, zmsg *msg)
{
assert (srv);
if (msg) { // Queue message if any
srv->m_requests.push_back(msg);
}
purge_workers ();
while (! srv->m_waiting.empty() && ! srv->m_requests.empty())
{
// Choose the most recently seen idle worker; others might be about to expire
auto wrk = srv->m_waiting.begin();
auto next = wrk;
for (++next; next != srv->m_waiting.end(); ++next)
{
if ((*next)->m_expiry > (*wrk)->m_expiry)
wrk = next;
}
zmsg *msg = srv->m_requests.front();
srv->m_requests.pop_front();
worker_send (*wrk, k_mdpw_request.data(), "", msg);
m_waiting.erase(*wrk);
srv->m_waiting.erase(wrk);
delete msg;
}
}
// ---------------------------------------------------------------------
// Handle internal service according to 8/MMI specification
void
service_internal (std::string service_name, zmsg *msg)
{
if (service_name.compare("mmi.service") == 0) {
// service *srv = m_services[msg->body()]; // Dangerous! Silently add key with default value
service *srv = m_services.count(msg->body()) ? m_services.at(msg->body()) : nullptr;
if (srv && srv->m_workers) {
msg->body_set("200");
} else {
msg->body_set("404");
}
} else {
msg->body_set("501");
}
// Remove & save client return envelope and insert the
// protocol header and service name, then rewrap envelope.
std::string client = msg->unwrap();
msg->wrap(k_mdp_client.data(), service_name.c_str());
msg->wrap(client.c_str(), "");
msg->send (*m_socket);
delete msg;
}
// ---------------------------------------------------------------------
// Creates worker if necessary
worker *
worker_require (std::string identity)
{
assert (!identity.empty());
// self->workers is keyed off worker identity
if (m_workers.count(identity)) {
return m_workers.at(identity);
} else {
worker *wrk = new worker(identity);
m_workers.insert(std::make_pair(identity, wrk));
if (m_verbose) {
s_console ("I: registering new worker: %s", identity.c_str());
}
return wrk;
}
}
// ---------------------------------------------------------------------
// Deletes worker from all data structures, and destroys worker
void
worker_delete (worker *&wrk, int disconnect)
{
assert (wrk);
if (disconnect) {
worker_send (wrk, k_mdpw_disconnect.data(), "", NULL);
}
if (wrk->m_service) {
for(auto it = wrk->m_service->m_waiting.begin();
it != wrk->m_service->m_waiting.end();) {
if (*it == wrk) {
it = wrk->m_service->m_waiting.erase(it);
}
else {
++it;
}
}
wrk->m_service->m_workers--;
}
m_waiting.erase(wrk);
// This implicitly calls the worker destructor
m_workers.erase(wrk->m_identity);
delete wrk;
}
// ---------------------------------------------------------------------
// Process message sent to us by a worker
void
worker_process (std::string sender, zmsg *msg)
{
assert (msg && msg->parts() >= 1); // At least, command
std::string command = (char *)msg->pop_front().c_str();
bool worker_ready = m_workers.count(sender)>0;
worker *wrk = worker_require (sender);
if (command.compare (k_mdpw_ready.data()) == 0) {
if (worker_ready) { // Not first command in session
worker_delete (wrk, 1);
}
else {
if (sender.size() >= 4 // Reserved service name
&& sender.compare(0, 4, "mmi.") == 0) {
worker_delete (wrk, 1);
} else {
// Attach worker to service and mark as idle
std::string service_name = (char *) msg->pop_front ().c_str();
wrk->m_service = service_require (service_name);
wrk->m_service->m_workers++;
worker_waiting (wrk);
}
}
} else {
if (command.compare (k_mdpw_reply.data()) == 0) {
if (worker_ready) {
// Remove & save client return envelope and insert the
// protocol header and service name, then rewrap envelope.
std::string client = msg->unwrap ();
msg->wrap (k_mdp_client.data(), wrk->m_service->m_name.c_str());
msg->wrap (client.c_str(), "");
msg->send (*m_socket);
worker_waiting (wrk);
}
else {
worker_delete (wrk, 1);
}
} else {
if (command.compare (k_mdpw_heartbeat.data()) == 0) {
if (worker_ready) {
wrk->m_expiry = s_clock () + n_heartbeat_expiry;
} else {
worker_delete (wrk, 1);
}
} else {
if (command.compare (k_mdpw_disconnect.data()) == 0) {
worker_delete (wrk, 0);
} else {
s_console ("E: invalid input message (%d)", (int) *command.c_str());
msg->dump ();
}
}
}
}
delete msg;
}
// ---------------------------------------------------------------------
// Send message to worker
// If pointer to message is provided, sends that message
void
worker_send (worker *worker,
const char *command, std::string option, zmsg *msg)
{
msg = (msg ? new zmsg(*msg) : new zmsg ());
// Stack protocol envelope to start of message
if (option.size()>0) { // Optional frame after command
msg->push_front (option.c_str());
}
msg->push_front (command);
msg->push_front (k_mdpw_worker.data());
// Stack routing envelope to start of message
msg->wrap(worker->m_identity.c_str(), "");
if (m_verbose) {
s_console ("I: sending %s to worker",
mdps_commands [(int) *command].data());
msg->dump ();
}
msg->send (*m_socket);
delete msg;
}
// ---------------------------------------------------------------------
// This worker is now waiting for work
void
worker_waiting (worker *worker)
{
assert (worker);
// Queue to broker and service waiting lists
m_waiting.insert(worker);
worker->m_service->m_waiting.push_back(worker);
worker->m_expiry = s_clock () + n_heartbeat_expiry;
// Attempt to process outstanding requests
service_dispatch (worker->m_service, 0);
}
// ---------------------------------------------------------------------
// Process a request coming from a client
void
client_process (std::string sender, zmsg *msg)
{
assert (msg && msg->parts () >= 2); // Service name + body
std::string service_name =(char *) msg->pop_front().c_str();
service *srv = service_require (service_name);
// Set reply return address to client sender
msg->wrap (sender.c_str(), "");
if (service_name.length() >= 4
&& service_name.compare(0, 4, "mmi.") == 0) {
service_internal (service_name, msg);
} else {
service_dispatch (srv, msg);
}
}
public:
// Get and process messages forever or until interrupted
void
start_brokering() {
int64_t now = s_clock();
int64_t heartbeat_at = now + n_heartbeat_interval;
while (!s_interrupted) {
zmq::pollitem_t items [] = {
{ *m_socket, 0, ZMQ_POLLIN, 0} };
int64_t timeout = heartbeat_at - now;
if (timeout < 0)
timeout = 0;
zmq::poll (items, 1, (long)timeout);
// Process next input message, if any
if (items [0].revents & ZMQ_POLLIN) {
zmsg *msg = new zmsg(*m_socket);
if (m_verbose) {
s_console ("I: received message:");
msg->dump ();
}
std::string sender = (char*)msg->pop_front ().c_str();
msg->pop_front (); //empty message
std::string header = (char*)msg->pop_front ().c_str();
if (header.compare(k_mdp_client.data()) == 0) {
client_process (sender, msg);
}
else if (header.compare(k_mdpw_worker.data()) == 0) {
worker_process (sender, msg);
}
else {
s_console ("E: invalid message:");
msg->dump ();
delete msg;
}
}
// Disconnect and delete any expired workers
// Send heartbeats to idle workers if needed
now = s_clock();
if (now >= heartbeat_at) {
purge_workers ();
for (auto it = m_waiting.begin();
it != m_waiting.end() && (*it)!=0; it++) {
worker_send (*it, k_mdpw_heartbeat.data(), "", NULL);
}
heartbeat_at += n_heartbeat_interval;
now = s_clock();
}
}
}
private:
zmq::context_t * m_context; // 0MQ context
zmq::socket_t * m_socket; // Socket for clients & workers
const int m_verbose; // Print activity to stdout
std::string m_endpoint; // Broker binds to this endpoint
std::map<std::string, service*> m_services; // Hash of known services
std::map<std::string, worker*> m_workers; // Hash of known workers
std::set<worker*> m_waiting; // List of waiting workers
};
// ---------------------------------------------------------------------
// Main broker work happens here
int main (int argc, char *argv [])
{
int verbose = (argc > 1 && strcmp (argv [1], "-v") == 0);
s_version_assert (4, 0);
s_catch_signals ();
broker brk(verbose);
brk.bind ("tcp://*:5555");
brk.start_brokering();
if (s_interrupted)
printf ("W: interrupt received, shutting down...\n");
return 0;
}
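Before moving on to the Go port, it can be useful to poke at the broker from the outside. The sketch below is not one of the official examples; it uses a bare gozmq REQ socket to ask the broker's built-in mmi.service whether a service is currently registered. The "MDPC01" header is the MDP/0.1 client protocol header (the MDPC_CLIENT constant, presumably defined alongside the worker constants in mdp.go), the endpoint matches the tcp://*:5555 bind above, and the "echo" service name is purely illustrative.

    // mmi_query: ask the Majordomo broker whether a service is registered.
    // Illustrative sketch only; not part of the zguide examples.
    package main

    import (
        "fmt"
        zmq "github.com/alecthomas/gozmq"
    )

    func main() {
        context, _ := zmq.NewContext()
        defer context.Close()

        // A plain REQ socket is enough for a one-shot MMI query
        client, _ := context.NewSocket(zmq.REQ)
        defer client.Close()
        client.Connect("tcp://localhost:5555")

        // 7/MDP client request: protocol header, service name, body frames
        request := [][]byte{
            []byte("MDPC01"),      // MDPC_CLIENT protocol header
            []byte("mmi.service"), // broker-internal service discovery
            []byte("echo"),        // service we are asking about (hypothetical)
        }
        client.SendMultipart(request, 0)

        // Reply: protocol header, service name, then "200", "404", or "501"
        reply, _ := client.RecvMultipart(0)
        if len(reply) >= 3 {
            fmt.Printf("Lookup of 'echo': %s\n", string(reply[2]))
        }
    }

A plain REQ socket works here because the MMI query is answered synchronously by the broker itself; a production client would wrap the exchange in Lazy Pirate style timeouts and retries.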
mdbroker: Majordomo broker in C#
mdbroker: Majordomo broker in CL
mdbroker: Majordomo broker in Delphi
mdbroker: Majordomo broker in Erlang
mdbroker: Majordomo broker in Elixir
mdbroker: Majordomo broker in F#
mdbroker: Majordomo broker in Felix
mdbroker: Majordomo broker in Go
// Majordomo Protocol broker
// A minimal Go implementation of the Majordomo Protocol as defined in
// http://rfc.zeromq.org/spec:7 and http://rfc.zeromq.org/spec:8.
//
// To run this example, you may need to run multiple *.go files as below
// go run mdp.go zhelpers.go mdbroker.go [-v]
//
// Author: iano <scaly.iano@gmail.com>
// Based on C & Python example
package main
import (
"encoding/hex"
zmq "github.com/alecthomas/gozmq"
"log"
"os"
"time"
)
const (
INTERNAL_SERVICE_PREFIX = "mmi."
HEARTBEAT_INTERVAL = 2500 * time.Millisecond
HEARTBEAT_EXPIRY = HEARTBEAT_INTERVAL * HEARTBEAT_LIVENESS
)
type Broker interface {
Close()
Run()
}
type mdbWorker struct {
identity string // Hex Identity of worker
address []byte // Address to route to
expiry time.Time // Expires at this point, unless heartbeat
service *mdService // Owning service, if known
}
type mdService struct {
broker Broker
name string
requests [][][]byte // List of client requests
waiting *ZList // List of waiting workers
}
type mdBroker struct {
context *zmq.Context // Context
heartbeatAt time.Time // When to send HEARTBEAT
services map[string]*mdService // Known services
socket *zmq.Socket // Socket for clients & workers
waiting *ZList // Idle workers
workers map[string]*mdbWorker // Known workers
verbose bool // Print activity to stdout
}
func NewBroker(endpoint string, verbose bool) Broker {
context, _ := zmq.NewContext()
socket, _ := context.NewSocket(zmq.ROUTER)
socket.SetLinger(0)
socket.Bind(endpoint)
log.Printf("I: MDP broker/0.1.1 is active at %s\n", endpoint)
return &mdBroker{
context: context,
heartbeatAt: time.Now().Add(HEARTBEAT_INTERVAL),
services: make(map[string]*mdService),
socket: socket,
waiting: NewList(),
workers: make(map[string]*mdbWorker),
verbose: verbose,
}
}
// Deletes worker from all data structures, and deletes worker.
func (self *mdBroker) deleteWorker(worker *mdbWorker, disconnect bool) {
if worker == nil {
panic("Nil worker")
}
if disconnect {
self.sendToWorker(worker, MDPW_DISCONNECT, nil, nil)
}
if worker.service != nil {
worker.service.waiting.Delete(worker)
}
self.waiting.Delete(worker)
delete(self.workers, worker.identity)
}
// Dispatch requests to waiting workers as possible
func (self *mdBroker) dispatch(service *mdService, msg [][]byte) {
if service == nil {
panic("Nil service")
}
// Queue message if any
if len(msg) != 0 {
service.requests = append(service.requests, msg)
}
self.purgeWorkers()
for service.waiting.Len() > 0 && len(service.requests) > 0 {
msg, service.requests = service.requests[0], service.requests[1:]
elem := service.waiting.Pop()
self.waiting.Remove(elem)
worker, _ := elem.Value.(*mdbWorker)
self.sendToWorker(worker, MDPW_REQUEST, nil, msg)
}
}
// Process a request coming from a client.
func (self *mdBroker) processClient(sender []byte, msg [][]byte) {
// Service name + body
if len(msg) < 2 {
panic("Invalid msg")
}
service := msg[0]
// Set reply return address to client sender
msg = append([][]byte{sender, nil}, msg[1:]...)
if len(service) >= 4 && string(service[:4]) == INTERNAL_SERVICE_PREFIX {
self.serviceInternal(service, msg)
} else {
self.dispatch(self.requireService(string(service)), msg)
}
}
// Process message sent to us by a worker.
func (self *mdBroker) processWorker(sender []byte, msg [][]byte) {
// At least, command
if len(msg) < 1 {
panic("Invalid msg")
}
command, msg := msg[0], msg[1:]
identity := hex.EncodeToString(sender)
worker, workerReady := self.workers[identity]
if !workerReady {
worker = &mdbWorker{
identity: identity,
address: sender,
expiry: time.Now().Add(HEARTBEAT_EXPIRY),
}
self.workers[identity] = worker
if self.verbose {
log.Printf("I: registering new worker: %s\n", identity)
}
}
switch string(command) {
case MDPW_READY:
// At least, a service name
if len(msg) < 1 {
panic("Invalid msg")
}
service := msg[0]
// Not first command in session or Reserved service name
if workerReady || (len(service) >= 4 && string(service[:4]) == INTERNAL_SERVICE_PREFIX) {
self.deleteWorker(worker, true)
} else {
// Attach worker to service and mark as idle
worker.service = self.requireService(string(service))
self.workerWaiting(worker)
}
case MDPW_REPLY:
if workerReady {
// Remove & save client return envelope and insert the
// protocol header and service name, then rewrap envelope.
client := msg[0]
msg = append([][]byte{client, nil, []byte(MDPC_CLIENT), []byte(worker.service.name)}, msg[2:]...)
self.socket.SendMultipart(msg, 0)
self.workerWaiting(worker)
} else {
self.deleteWorker(worker, true)
}
case MDPW_HEARTBEAT:
if workerReady {
worker.expiry = time.Now().Add(HEARTBEAT_EXPIRY)
} else {
self.deleteWorker(worker, true)
}
case MDPW_DISCONNECT:
self.deleteWorker(worker, false)
default:
log.Println("E: invalid message:")
Dump(msg)
}
}
// Look for & kill expired workers.
// Workers are oldest to most recent, so we stop at the first alive worker.
func (self *mdBroker) purgeWorkers() {
now := time.Now()
for elem := self.waiting.Front(); elem != nil; elem = self.waiting.Front() {
worker, _ := elem.Value.(*mdbWorker)
if worker.expiry.After(now) {
break
}
self.deleteWorker(worker, false)
}
}
// Locates the service (creates if necessary).
func (self *mdBroker) requireService(name string) *mdService {
if len(name) == 0 {
panic("Invalid service name")
}
service, ok := self.services[name]
if !ok {
service = &mdService{
name: name,
waiting: NewList(),
}
self.services[name] = service
}
return service
}
// Send message to worker.
// If message is provided, sends that message.
func (self *mdBroker) sendToWorker(worker *mdbWorker, command string, option []byte, msg [][]byte) {
// Stack protocol envelope (header, command, optional frame) and routing envelope to start of message
if len(option) > 0 {
msg = append([][]byte{option}, msg...)
}
msg = append([][]byte{worker.address, nil, []byte(MDPW_WORKER), []byte(command)}, msg...)
if self.verbose {
log.Printf("I: sending %X to worker\n", command)
Dump(msg)
}
self.socket.SendMultipart(msg, 0)
}
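// For illustration only (not in the original source): after sendToWorker
// builds a REQUEST, the frames handed to the ROUTER socket look like this,
// assuming the MDPW_WORKER header is "MDPW01" as in MDP/0.1:
//   Frame 0:  worker address        (routing envelope, added above)
//   Frame 1:  empty delimiter
//   Frame 2:  "MDPW01"              (MDPW_WORKER protocol header)
//   Frame 3:  command, e.g. MDPW_REQUEST
//   Frame 4:  client address        (reply envelope set in processClient)
//   Frame 5:  empty delimiter
//   Frame 6+: request body
// The ROUTER socket consumes Frame 0 to pick the destination, so the worker
// receives everything from the empty delimiter onwards.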
// Handle internal service according to 8/MMI specification
func (self *mdBroker) serviceInternal(service []byte, msg [][]byte) {
returncode := "501&