Majordomo worker API in PHP

<?php
/* =====================================================================
* mdwrkapi.php
*
* Majordomo Protocol Worker API
* Implements the MDP/Worker spec at http://rfc.zeromq.org/spec:7.
*/

include_once 'zmsg.php';
include_once 'mdp.php';

// Reliability parameters
define("HEARTBEAT_LIVENESS", 3); // 3-5 is reasonable

// Structure of our class
// We access these properties only via class methods

class MDWrk
{
private $ctx; // Our context
private $broker;
private $service;
private $worker; // Socket to broker
private $verbose = false; // Print activity to stdout

// Heartbeat management
private $heartbeat_at; // When to send HEARTBEAT
private $liveness; // How many attempts left
private $heartbeat; // Heartbeat delay, msecs
private $reconnect; // Reconnect delay, msecs

// Internal state
private $expect_reply = 0;

// Return address, if any
private $reply_to;

/**
* Constructor
*
* @param string $broker
* @param string $service
* @param boolean $verbose
*/

public function __construct($broker, $service, $verbose = false)
{
$this->ctx = new ZMQContext();
$this->broker = $broker;
$this->service = $service;
$this->verbose = $verbose;
$this->heartbeat = 2500; // msecs
$this->reconnect = 2500; // msecs

$this->connect_to_broker();
}

/**
* Send message to broker
* If no msg is provided, creates one internally
*
* @param string $command
* @param string $option
* @param Zmsg $msg
*/

public function send_to_broker($command, $option, $msg = null)
{
$msg = $msg ? $msg : new Zmsg();

if ($option) {
$msg->push($option);
}
$msg->push($command);
$msg->push(MDPW_WORKER);
$msg->push("");

if ($this->verbose) {
printf("I: sending %s to broker %s", $command, PHP_EOL);
echo $msg->__toString();
}

$msg->set_socket($this->worker)->send();
}

/**
* Connect or reconnect to broker
*/

public function connect_to_broker()
{
$this->worker = new ZMQSocket($this->ctx, ZMQ::SOCKET_DEALER);
$this->worker->connect($this->broker);
if ($this->verbose) {
printf("I: connecting to broker at %s… %s", $this->broker, PHP_EOL);
}

// Register service with broker
$this->send_to_broker(MDPW_READY, $this->service, NULL);

// If liveness hits zero, queue is considered disconnected
$this->liveness = HEARTBEAT_LIVENESS;
$this->heartbeat_at = microtime(true) + ($this->heartbeat / 1000);
}

/**
* Set heartbeat delay
*
* @param int $heartbeat
*/

public function set_heartbeat($heartbeat)
{
$this->heartbeat = $heartbeat;
}

/**
* Set reconnect delay
*
* @param int $reconnect
*/

public function set_reconnect($reconnect)
{
$this->reconnect = $reconnect;
}

/**
* Send reply, if any, to broker and wait for next request.
*
* @param Zmsg $reply
* @return Zmsg Returns if there is a request to process
*/

public function recv($reply = null)
{
// Format and send the reply if we were provided one
assert ($reply || !$this->expect_reply);
if ($reply) {
$reply->wrap($this->reply_to);
$this->send_to_broker(MDPW_REPLY, NULL, $reply);
}
$this->expect_reply = true;

$read = $write = array();
while (true) {
$poll = new ZMQPoll();
$poll->add($this->worker, ZMQ::POLL_IN);
$events = $poll->poll($read, $write, $this->heartbeat);

if ($events) {
$zmsg = new Zmsg($this->worker);
$zmsg->recv();

if ($this->verbose) {
echo "I: received message from broker:", PHP_EOL;
echo $zmsg->__toString();
}

$this->liveness = HEARTBEAT_LIVENESS;

// Don't try to handle errors, just assert noisily
assert ($zmsg->parts() >= 3);

$zmsg->pop();
$header = $zmsg->pop();
assert($header == MDPW_WORKER);

$command = $zmsg->pop();
if ($command == MDPW_REQUEST) {
// We should pop and save as many addresses as there are
// up to a null part, but for now, just save one…
$this->reply_to = $zmsg->unwrap();

return $zmsg;// We have a request to process
} elseif ($command == MDPW_HEARTBEAT) {
// Do nothing for heartbeats
} elseif ($command == MDPW_DISCONNECT) {
$this->connect_to_broker();
} else {
echo "E: invalid input message", PHP_EOL;
echo $zmsg->__toString();
}
} elseif (--$this->liveness == 0) { // poll ended on timeout, $event being false
if ($this->verbose) {
echo "W: disconnected from broker - retrying…", PHP_EOL;
}
usleep($this->reconnect*1000);
$this->connect_to_broker();
}

// Send HEARTBEAT if it's time
if (microtime(true) > $this->heartbeat_at) {
$this->send_to_broker(MDPW_HEARTBEAT, NULL, NULL);
$this->heartbeat_at = microtime(true) + ($this->heartbeat/1000);
}
}
}
}