Round-trip demonstrator in PHP

<?php

/*
* Round-trip demonstrator
*
* While this example runs in a single process, that is just to make
* it easier to start and stop the example. Each thread has its own
* context and conceptually acts as a separate process.
*
* @author Ian Barber <ian(dot)barber(at)gmail(dot)com>
*/

include 'zmsg.php';

function client_task()
{
$context = new ZMQContext();
$client = new ZMQSocket($context, ZMQ::SOCKET_DEALER);
$client->setSockOpt(ZMQ::SOCKOPT_IDENTITY, "C");
$client->connect("tcp://localhost:5555");

echo "Setting up test…", PHP_EOL;
usleep(10000);

echo "Synchronous round-trip test…", PHP_EOL;
$start = microtime(true);
$text = "HELLO";
for ($requests = 0; $requests < 10000; $requests++) {
$client->send($text);
$msg = $client->recv();
}
printf (" %d calls/second%s",
(1000 * 10000) / (int) ((microtime(true) - $start) * 1000),
PHP_EOL);

echo "Asynchronous round-trip test…", PHP_EOL;
$start = microtime(true);
for ($requests = 0; $requests < 100000; $requests++) {
$client->send($text);
}

for ($requests = 0; $requests < 100000; $requests++) {
$client->recv();
}

printf (" %d calls/second%s",
(1000 * 100000) / (int) ((microtime(true) - $start) * 1000),
PHP_EOL);
}

function worker_task()
{
$context = new ZMQContext();
$worker = new ZMQSocket($context, ZMQ::SOCKET_DEALER);
$worker->setSockOpt(ZMQ::SOCKOPT_IDENTITY, "W");
$worker->connect("tcp://localhost:5556");
while (true) {
$zmsg = new Zmsg($worker);
$zmsg->recv();
$zmsg->send();
}
}

function broker_task()
{
// Prepare our context and sockets
$context = new ZMQContext();
$frontend = new ZMQSocket($context, ZMQ::SOCKET_ROUTER);
$backend = new ZMQSocket($context, ZMQ::SOCKET_ROUTER);
$frontend->bind("tcp://*:5555");
$backend->bind("tcp://*:5556");

// Initialize poll set
$poll = new ZMQPoll();
$poll->add($frontend, ZMQ::POLL_IN);
$poll->add($backend, ZMQ::POLL_IN);
$read = $write = array();
while (true) {
$events = $poll->poll($read, $write);
foreach ($read as $socket) {
$zmsg = new Zmsg($socket);
$zmsg->recv();
if ($socket === $frontend) {
$zmsg->push("W");
$zmsg->set_socket($backend)->send();
} elseif ($socket === $backend) {
$zmsg->pop();
$zmsg->push("C");
$zmsg->set_socket($frontend)->send();
}

}
}
}

$wpid = pcntl_fork();
if ($wpid == 0) {
worker_task();
exit;
}
$bpid = pcntl_fork();
if ($bpid == 0) {
broker_task();
exit;
}
sleep(1);
client_task();
posix_kill($wpid, SIGKILL);
posix_kill($bpid, SIGKILL);