How to explain ZeroMQ? Some of us start by saying all the wonderful things it does. It’s sockets on steroids. It’s like mailboxes with routing. It’s fast! Others try to share their moment of enlightenment, that zap-pow-kaboom satori paradigm-shift moment when it all became obvious. Things just become simpler. Complexity goes away. It opens the mind. Others try to explain by comparison. It’s smaller, simpler, but still looks familiar. Personally, I like to remember why we made ZeroMQ at all, because that’s most likely where you, the reader, still are today.
Programming is science dressed up as art because most of us don’t understand the physics of software and it’s rarely, if ever, taught. The physics of software is not algorithms, data structures, languages and abstractions. These are just tools we make, use, throw away. The real physics of software is the physics of people: specifically, our limitations when it comes to complexity, and our desire to work together to solve large problems in pieces. This is the science of programming: make building blocks that people can understand and use easily, and people will work together to solve the very largest problems.
We live in a connected world, and modern software has to navigate this world. So the building blocks for tomorrow’s very largest solutions are connected and massively parallel. It’s not enough for code to be “strong and silent” any more. Code has to talk to code. Code has to be chatty, sociable, well-connected. Code has to run like the human brain, billions of individual neurons firing off messages to each other across trillions of connections, a massively parallel network with no central control, no single point of failure, yet able to solve immensely difficult problems. And it’s no accident that the future of code looks like the human brain, because the endpoints of every network are, at some level, human brains.
If you’ve done any work with threads, protocols, or networks, you’ll realize this is pretty much impossible. It’s a dream. Even connecting a few programs across a few sockets is plain nasty when you start to handle real-life situations. Trillions? The cost would be unimaginable. Connecting computers is so difficult that software and services to do this are a multi-billion dollar business.
So we live in a world where the wiring is years ahead of our ability to use it. We had a software crisis in the 1980s, when leading software engineers like Fred Brooks believed there was no “Silver Bullet” to “promise even one order of magnitude of improvement in productivity, reliability, or simplicity”.
Brooks missed free and open source software, which solved that crisis, enabling us to share knowledge efficiently. Today we face another software crisis, but it’s one we don’t talk about much. Only the largest, richest firms can afford to create connected applications. There is a cloud, but it’s proprietary. Our data and our knowledge are disappearing from our personal computers into clouds that we cannot access and with which we cannot compete. Who owns our social networks? It is like the mainframe-PC revolution in reverse.
We can leave the political philosophy for another book. The point is that while the Internet offers the potential of massively connected code, the reality is that this is out of reach for most of us, and so large interesting problems (in health, education, economics, transport, and so on) remain unsolved because there is no way to connect the code, and thus no way to connect the brains that could work together to solve these problems.
There have been many attempts to solve the challenge of connected code. There are thousands of IETF specifications, each solving part of the puzzle. For application developers, HTTP is perhaps the one solution to have been simple enough to work, but it arguably makes the problem worse by encouraging developers and architects to think in terms of big servers and thin, stupid clients.
So today people are still connecting applications using raw UDP and TCP, proprietary protocols, HTTP, and WebSockets. It remains painful, slow, hard to scale, and essentially centralized. Distributed P2P architectures are mostly for play, not work. How many applications use Skype or BitTorrent to exchange data?
Which brings us back to the science of programming. To fix the world, we needed to do two things. One, to solve the general problem of “how to connect any code to any code, anywhere”. Two, to wrap that up in the simplest possible building blocks that people could understand and use easily.
It sounds ridiculously simple. And maybe it is. That’s kind of the whole point.
We assume you are using at least version 3.2 of ZeroMQ. We assume you are using a Linux box or something similar. We assume you can read C code, more or less, as that’s the default language for the examples. We assume that when we write constants like PUSH or SUBSCRIBE, you can imagine they are really called ZMQ_PUSH or ZMQ_SUBSCRIBE if the programming language needs it.
Next, browse the examples subdirectory. You’ll find examples by language. If there are examples missing in a language you use, you’re encouraged to submit a translation. This is how this text became so useful, thanks to the work of many people. All examples are licensed under MIT/X11.
So let’s start with some code. We start of course with a Hello World example. We’ll make a client and a server. The client sends “Hello” to the server, which replies with “World”. Here’s the server in C, which opens a ZeroMQ socket on port 5555, reads requests on it, and replies with “World” to each request:
hwserver: Hello World server in Common Lisp
;;; -*- Mode:Lisp; Syntax:ANSI-Common-Lisp; -*-
;;;
;;;  Hello World server in Common Lisp
;;;  Binds REP socket to tcp://*:5555
;;;  Expects "Hello" from client, replies with "World"
;;;
;;; Kamil Shakirov <kamils80@gmail.com>
;;;

(defpackage #:zguide.hwserver
  (:nicknames #:hwserver)
  (:use #:cl #:zhelpers)
  (:export #:main))

(in-package :zguide.hwserver)

(defun main ()
  ;; Prepare our context and socket
  (zmq:with-context (context 1)
    (zmq:with-socket (socket context zmq:rep)
      (zmq:bind socket "tcp://*:5555")
      (loop
        (let ((request (make-instance 'zmq:msg)))
          ;; Wait for next request from client
          (zmq:recv socket request)
          (message "Received request: [~A]~%"
                   (zmq:msg-data-as-string request))
          ;; Do some 'work'
          (sleep 1)
          ;; Send reply back to client
          (let ((reply (make-instance 'zmq:msg :data "World")))
            (zmq:send socket reply))))))
  (cleanup))
hwserver: Hello World server in Delphi
program hwserver;
//
//  Hello World server
//  Binds REP socket to tcp://*:5555
//  Expects "Hello" from client, replies with "World"
//  @author Varga Balazs <bb.varga@gmail.com>
//
{$APPTYPE CONSOLE}
{$I zmq.inc}

uses
    SysUtils
  , zmq
  ;

var
  context,
  responder: Pointer;
  request,
  reply: zmq_msg_t;
begin
  context := zmq_init(1);

  //  Socket to talk to clients
  responder := zmq_socket( context, ZMQ_REP );
  zmq_bind( responder, 'tcp://*:5555' );

  while true do
  begin
    //  Wait for next request from client
    zmq_msg_init( request );
    {$ifdef zmq3}
    zmq_recvmsg( responder, request, 0 );
    {$else}
    zmq_recv( responder, request, 0 );
    {$endif}
    Writeln( 'Received Hello' );
    zmq_msg_close( request );

    //  Do some 'work'
    sleep( 1000 );

    //  Send reply back to client
    zmq_msg_init( reply );
    zmq_msg_init_size( reply, 5 );
    Move( 'World', zmq_msg_data( reply )^, 5 );
    {$ifdef zmq3}
    zmq_sendmsg( responder, reply, 0 );
    {$else}
    zmq_send( responder, reply, 0 );
    {$endif}
    zmq_msg_close( reply );
  end;

  //  We never get here but if we did, this would be how we end
  zmq_close( responder );
  zmq_term( context );
end.
hwserver: Hello World server in Felix
//  Hello World server
//  Binds REP socket to tcp://*:5555
//  Expects "Hello" from client, replies with "World"
open ZMQ;

println "hwserver, Felix version";
var context = zmq_init (1);

//  Socket to talk to clients
var responder = context.mk_socket ZMQ_REP;
responder.bind "tcp://*:5555";

var request = #zmq_msg_t;
var reply = #zmq_msg_t;
while true do
  //  Wait for next request from client
  request.init_string "Hello";
  responder.recv_msg request;
  println$ "Received Hello=" + string(request);
  request.close;

  //  Do some 'work'
  Faio::sleep (sys_clock,1.0);

  //  Send reply back to client
  reply.init_size 5.size;
  memcpy (zmq_msg_data reply, c"World".address, 5.size);
  responder.send_msg reply;
  reply.close;
done
hwserver: Hello World server in Go
//
// Hello World Zeromq server
//
// Author: Aaron Raddon github.com/araddon
// Requires: http://github.com/alecthomas/gozmq
//
package main

import (
    "fmt"
    zmq "github.com/alecthomas/gozmq"
    "time"
)

func main() {
    context, _ := zmq.NewContext()
    socket, _ := context.NewSocket(zmq.REP)
    defer context.Close()
    defer socket.Close()
    socket.Bind("tcp://*:5555")

    // Wait for messages
    for {
        msg, _ := socket.Recv(0)
        println("Received ", string(msg))

        // do some fake "work"
        time.Sleep(time.Second)

        // send reply back to client
        reply := fmt.Sprintf("World")
        socket.Send([]byte(reply), 0)
    }
}
hwserver: Hello World server in Haskell
{-# LANGUAGE OverloadedStrings #-}
-- Hello World server

module Main where

import Control.Concurrent
import Control.Monad
import System.ZMQ4.Monadic

main :: IO ()
main = runZMQ $ do
    -- Socket to talk to clients
    responder <- socket Rep
    bind responder "tcp://*:5555"

    forever $ do
        buffer <- receive responder
        liftIO $ do
            putStrLn "Received Hello"
            threadDelay 1000000 -- Do some 'work'
        send responder [] "World"
hwserver: Hello World server in Haxe
package ;

import haxe.io.Bytes;
import neko.Lib;
import neko.Sys;
import org.zeromq.ZMQ;
import org.zeromq.ZMQContext;
import org.zeromq.ZMQException;
import org.zeromq.ZMQSocket;

/**
 * Hello World server in Haxe
 * Binds REP to tcp://*:5556
 * Expects "Hello" from client, replies with "World"
 * Use with HelloWorldClient.hx
 *
 */
class HelloWorldServer
{
    public static function main() {
        var context:ZMQContext = ZMQContext.instance();
        var responder:ZMQSocket = context.socket(ZMQ_REP);

        Lib.println("** HelloWorldServer (see: http://zguide.zeromq.org/page:all#Ask-and-Ye-Shall-Receive)");

        responder.setsockopt(ZMQ_LINGER, 0);
        responder.bind("tcp://*:5556");

        try {
            while (true) {
                // Wait for next request from client
                var request:Bytes = responder.recvMsg();
                trace ("Received request:" + request.toString());

                // Do some work
                Sys.sleep(1);

                // Send reply back to client
                responder.sendMsg(Bytes.ofString("World"));
            }
        } catch (e:ZMQException) {
            trace (e.toString());
        }
        responder.close();
        context.term();
    }
}
hwserver: Hello World server in Java
package guide;

//
//  Hello World server in Java
//  Binds REP socket to tcp://*:5555
//  Expects "Hello" from client, replies with "World"
//

import org.zeromq.SocketType;
import org.zeromq.ZMQ;
import org.zeromq.ZContext;

public class hwserver
{
    public static void main(String[] args) throws Exception
    {
        try (ZContext context = new ZContext()) {
            // Socket to talk to clients
            ZMQ.Socket socket = context.createSocket(SocketType.REP);
            socket.bind("tcp://*:5555");

            while (!Thread.currentThread().isInterrupted()) {
                // Wait for next request from client
                byte[] request = socket.recv(0);
                System.out.println(
                    "Received " + ": [" + new String(request, ZMQ.CHARSET) + "]"
                );

                Thread.sleep(1000); //  Do some 'work'

                // Send reply back to client
                String response = "World";
                socket.send(response.getBytes(ZMQ.CHARSET), 0);
            }
        }
    }
}
hwserver: Hello World server in Julia
#!/usr/bin/env julia

#
# Hello World server in Julia
# Binds REP socket to tcp://*:5555
# Expects "Hello" from client, replies "World"
#

using ZMQ

context = Context()
socket = Socket(context, REP)
ZMQ.bind(socket, "tcp://*:5555")

while true
    # Wait for next request from client
    message = String(ZMQ.recv(socket))
    println("Received request: $message")

    # Do some 'work'
    sleep(1)

    # Send reply back to client
    ZMQ.send(socket, "World")
end

# classy hit men always clean up when finish the job.
ZMQ.close(socket)
ZMQ.close(context)
hwserver: Hello World server in Lua
--
--  Hello World server
--  Binds REP socket to tcp://*:5555
--  Expects "Hello" from client, replies with "World"
--
--  Author: Robert G. Jakabosky <bobby@sharedrealm.com>
--
require"zmq"
require"zhelpers"

local context = zmq.init(1)

--  Socket to talk to clients
local socket = context:socket(zmq.REP)
socket:bind("tcp://*:5555")

while true do
    --  Wait for next request from client
    local request = socket:recv()
    print("Received Hello [" .. request .. "]")

    --  Do some 'work'
    s_sleep(1000)

    --  Send reply back to client
    socket:send("World")
end
--  We never get here but if we did, this would be how we end
socket:close()
context:term()
hwserver: Hello World server in Node.js
// Hello World server
// Binds REP socket to tcp://*:5555
// Expects "Hello" from client, replies with "World"

var zmq = require('zeromq');

// socket to talk to clients
var responder = zmq.socket('rep');

responder.on('message', function(request) {
  console.log("Received request: [", request.toString(), "]");

  // do some 'work'
  setTimeout(function() {
    // send reply back to client.
    responder.send("World");
  }, 1000);
});

responder.bind('tcp://*:5555', function(err) {
  if (err) {
    console.log(err);
  } else {
    console.log("Listening on 5555...");
  }
});

process.on('SIGINT', function() {
  responder.close();
});
hwserver: Hello World server in Objective-C
//
// Hello World server
// Binds REP socket to tcp://*:5555
// Expects "Hello" from client, replies with "World"
//
#import <Foundation/Foundation.h>
#import "ZMQObjc.h"
int main(void)
{
    NSAutoreleasePool *pool = [[NSAutoreleasePool alloc] init];
    ZMQContext *ctx = [[[ZMQContext alloc] initWithIOThreads:1U] autorelease];

    /* Get a socket to talk to clients. */
    static NSString *const kEndpoint = @"tcp://*:5555";
    ZMQSocket *responder = [ctx socketWithType:ZMQ_REP];
    BOOL didBind = [responder bindToEndpoint:kEndpoint];
    if (!didBind) {
        NSLog(@"*** Failed to bind to endpoint [%@].", kEndpoint);
        return EXIT_FAILURE;
    }

    for (;;) {
        /* Create a local pool so that autoreleased objects can be disposed of
         * at the end of each go through the loop.
         * Otherwise, memory usage would continue to rise
         * until the end of the process.
         */
        NSAutoreleasePool *localPool = [[NSAutoreleasePool alloc] init];

        /* Block waiting for next request from client. */
        NSData *request = [responder receiveDataWithFlags:0];
        NSString *text = [[[NSString alloc]
            initWithData:request encoding:NSUTF8StringEncoding] autorelease];
        NSLog(@"Received request: %@", text);

        /* "Work" for a bit. */
        sleep(1);

        /* Send reply back to client. */
        static NSString *const kWorld = @"World";
        const char *replyCString = [kWorld UTF8String];
        const NSUInteger replyLength = [kWorld
            lengthOfBytesUsingEncoding:NSUTF8StringEncoding];
        NSData *reply = [NSData dataWithBytes:replyCString length:replyLength];
        [responder sendData:reply withFlags:0];

        [localPool drain];
    }

    /* Close the socket to avoid blocking in -[ZMQContext terminate]. */
    [responder close];

    /* Dispose of the context and socket. */
    [pool drain];
    return EXIT_SUCCESS;
}
hwserver: Hello World server in Perl
# Hello World server in Perl

use strict;
use warnings;
use v5.10;

use ZMQ::FFI;
use ZMQ::FFI::Constants qw(ZMQ_REP);

# Socket to talk to clients
my $context = ZMQ::FFI->new();
my $responder = $context->socket(ZMQ_REP);
$responder->bind("tcp://*:5555");

while (1) {
    $responder->recv();
    say "Received Hello";
    sleep 1;
    $responder->send("World");
}
hwserver: Hello World server in PHP
<?php
/*
 *  Hello World server
 *  Binds REP socket to tcp://*:5555
 *  Expects "Hello" from client, replies with "World"
 * @author Ian Barber <ian(dot)barber(at)gmail(dot)com>
 */

$context = new ZMQContext(1);

//  Socket to talk to clients
$responder = new ZMQSocket($context, ZMQ::SOCKET_REP);
$responder->bind("tcp://*:5555");

while (true) {
    //  Wait for next request from client
    $request = $responder->recv();
    printf ("Received request: [%s]\n", $request);

    //  Do some 'work'
    sleep (1);

    //  Send reply back to client
    $responder->send("World");
}
hwserver: Hello World server in Python
#
#   Hello World server in Python
#   Binds REP socket to tcp://*:5555
#   Expects b"Hello" from client, replies with b"World"
#

import time
import zmq

context = zmq.Context()
socket = context.socket(zmq.REP)
socket.bind("tcp://*:5555")

while True:
    #  Wait for next request from client
    message = socket.recv()
    print(f"Received request: {message}")

    #  Do some 'work'
    time.sleep(1)

    #  Send reply back to client
    socket.send_string("World")
hwserver: Hello World server in Q
//  Hello World server
//  Binds REP socket to tcp://*:5555
//  Expects "Hello" from client, replies with "World"
\l qzmq.q
ctx:zctx.new[]
//  Socket to talk to clients
responder:zsocket.new[ctx; zmq`REP]
port:zsocket.bind[responder; `$"tcp://*:5555"]
while[1b and not zctx.interrupted[];
    //  Wait for next request from client
    s:zmsg.recv responder;
    //  Do some 'work'
    zclock.sleep 1;
    //  Send reply back to client
    m1:zmsg.new[];
    zmsg.push[m1; zframe.new["World"]];
    zmsg.send[m1; responder]]
//  We never get here but if we did, this would be how we end
zsocket.destroy[ctx; responder]
zctx.destroy[ctx]
hwserver: Hello World server in Racket
#lang racket
#|
   Hello World server in Racket
   Binds REP socket to tcp://*:5555
   Expects "Hello" from client, replies with "World"
|#
(require net/zmq)

(define ctxt (context 1))
(define sock (socket ctxt 'REP))
(socket-bind! sock "tcp://*:5555")

(let loop ()
  (define message (socket-recv! sock))
  (printf "Received request: ~a\n" message)
  (sleep 1)
  (socket-send! sock #"World")
  (loop))

(context-close! ctxt)
hwserver: Hello World server in Ruby
#!/usr/bin/env ruby

# author: Bill Desmarais bill@witsaid.com
# this code is licenced under the MIT/X11 licence.

require 'rubygems'
require 'ffi-rzmq'

context = ZMQ::Context.new(1)

puts "Starting Hello World server..."

# socket to listen for clients
socket = context.socket(ZMQ::REP)
socket.bind("tcp://*:5555")

while true do
  # Wait for next request from client
  request = ''
  rc = socket.recv_string(request)

  puts "Received request. Data: #{request.inspect}"

  # Do some 'work'
  sleep 1

  # Send reply back to client
  socket.send_string("world")
end
hwserver: Hello World server in Scala
//
//  Hello World server in Scala
//  Binds REP socket to tcp://*:5555
//  Expects "Hello" from client, replies with "World"
//
//  author Giovanni Ruggiero
//  email giovanni.ruggiero@gmail.com
//

import org.zeromq.ZMQ
import org.zeromq.ZMQ.{Context,Socket}

object HelloWorldServer {
  def main(args : Array[String]) {
    //  Prepare our context and socket
    val context = ZMQ.context(1)
    val socket = context.socket(ZMQ.REP)
    println ("starting")
    socket.bind ("tcp://*:5555")

    while (true) {
      //  Wait for next request from client
      //  We will wait for a 0-terminated string (C string) from the client,
      //  so that this server also works with The Guide's C and C++ "Hello World" clients
      val request = socket.recv (0)
      //  In order to display the 0-terminated string as a String,
      //  we omit the last byte from request
      println ("Received request: ["
        + new String(request,0,request.length-1)  //  Creates a String from request, minus the last byte
        + "]")

      //  Do some 'work'
      try {
        Thread.sleep (1000)
      } catch {
        case e: InterruptedException => e.printStackTrace()
      }

      //  Send reply back to client
      //  We will send a 0-terminated string (C string) back to the client,
      //  so that this server also works with The Guide's C and C++ "Hello World" clients
      val reply = "World ".getBytes
      reply(reply.length-1) = 0  //  Sets the last byte of the reply to 0
      socket.send(reply, 0)
    }
  }
}
hwserver: Hello World server in Tcl
package require zmq

zmq context context
zmq socket responder context REP
responder bind "tcp://*:5555"

while {1} {
    zmq message request
    responder recv_msg request
    puts "Received [request data]"
    request close

    zmq message reply -data "World @ [clock format [clock seconds]]"
    responder send_msg reply
    reply close
}

responder close
context term
hwserver: Hello World server in OCaml
(** Hello World server *)

open Zmq
open Helpers

let () =
  with_context @@ fun ctx ->
  with_socket ctx Socket.rep @@ fun resp ->
  Socket.bind resp "tcp://*:5555";
  while not !should_exit do
    let s = Socket.recv resp in
    printfn "Received : %S" s;
    sleep_ms 1000; (* Do some 'work' *)
    Socket.send resp "World";
  done
The REQ-REP socket pair is in lockstep. The client issues zmq_send() and then zmq_recv(), in a loop (or once if that’s all it needs). Doing any other sequence (e.g., sending two messages in a row) will result in a return code of -1 from the send or recv call. Similarly, the service issues zmq_recv() and then zmq_send() in that order, as often as it needs to.
ZeroMQ uses C as its reference language and this is the main language we’ll use for examples. If you’re reading this online, the link below the example takes you to translations into other programming languages. Let’s compare the same server in C++:
hwserver: Hello World server in C++
//
//  Hello World server in C++
//  Binds REP socket to tcp://*:5555
//  Expects "Hello" from client, replies with "World"
//
#include <zmq.hpp>
#include <cstring>   // memcpy
#include <string>
#include <iostream>
#ifndef _WIN32
#include <unistd.h>
#else
#include <windows.h>

//  Windows Sleep() takes milliseconds, POSIX sleep() takes seconds
#define sleep(n)    Sleep((n) * 1000)
#endif

int main () {
    //  Prepare our context and socket
    zmq::context_t context (2);
    zmq::socket_t socket (context, zmq::socket_type::rep);
    socket.bind ("tcp://*:5555");

    while (true) {
        zmq::message_t request;

        //  Wait for next request from client
        socket.recv (request, zmq::recv_flags::none);
        std::cout << "Received Hello" << std::endl;

        //  Do some 'work'
        sleep(1);

        //  Send reply back to client
        zmq::message_t reply (5);
        memcpy (reply.data (), "World", 5);
        socket.send (reply, zmq::send_flags::none);
    }
    return 0;
}
You can see that the ZeroMQ API is similar in C and C++. In a language like PHP or Java, we can hide even more and the code becomes even easier to read:
hwserver: Hello World server in PHP
<?php
/*
 *  Hello World server
 *  Binds REP socket to tcp://*:5555
 *  Expects "Hello" from client, replies with "World"
 * @author Ian Barber <ian(dot)barber(at)gmail(dot)com>
 */

$context = new ZMQContext(1);

//  Socket to talk to clients
$responder = new ZMQSocket($context, ZMQ::SOCKET_REP);
$responder->bind("tcp://*:5555");

while (true) {
    //  Wait for next request from client
    $request = $responder->recv();
    printf ("Received request: [%s]\n", $request);

    //  Do some 'work'
    sleep (1);

    //  Send reply back to client
    $responder->send("World");
}
hwserver: Hello World server in Java
package guide;

//
//  Hello World server in Java
//  Binds REP socket to tcp://*:5555
//  Expects "Hello" from client, replies with "World"
//

import org.zeromq.SocketType;
import org.zeromq.ZMQ;
import org.zeromq.ZContext;

public class hwserver
{
    public static void main(String[] args) throws Exception
    {
        try (ZContext context = new ZContext()) {
            // Socket to talk to clients
            ZMQ.Socket socket = context.createSocket(SocketType.REP);
            socket.bind("tcp://*:5555");

            while (!Thread.currentThread().isInterrupted()) {
                // Wait for next request from client
                byte[] request = socket.recv(0);
                System.out.println(
                    "Received " + ": [" + new String(request, ZMQ.CHARSET) + "]"
                );

                Thread.sleep(1000); //  Do some 'work'

                // Send reply back to client
                String response = "World";
                socket.send(response.getBytes(ZMQ.CHARSET), 0);
            }
        }
    }
}
;;; -*- Mode:Lisp; Syntax:ANSI-Common-Lisp; -*-;;;;;; Hello World server in Common Lisp;;; Binds REP socket to tcp://*:5555;;; Expects "Hello" from client, replies with "World";;;;;; Kamil Shakirov <kamils80@gmail.com>;;;
(defpackage#:zguide.hwserver
(:nicknames#:hwserver)
(:use#:cl#:zhelpers)
(:export#:main))
(in-package:zguide.hwserver)
(defunmain ()
;; Prepare our context and socket
(zmq:with-context (context1)
(zmq:with-socket (socketcontextzmq:rep)
(zmq:bindsocket"tcp://*:5555")
(loop
(let ((request (make-instance'zmq:msg)))
;; Wait for next request from client
(zmq:recvsocketrequest)
(message"Received request: [~A]~%"
(zmq:msg-data-as-stringrequest))
;; Do some 'work'
(sleep1)
;; Send reply back to client
(let ((reply (make-instance'zmq:msg:data"World")))
(zmq:sendsocketreply))))))
(cleanup))
hwserver: Hello World server in Delphi
program hwserver;
//
// Hello World server
// Binds REP socket to tcp://*:5555
// Expects "Hello" from client, replies with "World"
// @author Varga Balazs <bb.varga@gmail.com>
//
{$APPTYPE CONSOLE}
{$I zmq.inc}
uses
SysUtils
, zmq
;
var
context,
responder: Pointer;
request,
reply: zmq_msg_t;
begin
context := zmq_init(1);
// Socket to talk to clients
responder := zmq_socket( context, ZMQ_REP );
zmq_bind( responder, 'tcp://*:5555' );
while true do
begin
// Wait for next request from client
zmq_msg_init( request );
{$ifdef zmq3}
zmq_recvmsg( responder, request, 0 );
{$else}
zmq_recv( responder, request, 0 );
{$endif}
Writeln( 'Received Hello' );
zmq_msg_close( request );
// Do some 'work'
sleep( 1000 );
// Send reply back to client
zmq_msg_init( reply );
zmq_msg_init_size( reply, 5 );
Move( 'World', zmq_msg_data( reply )^, 5 );
{$ifdef zmq3}
zmq_sendmsg( responder, reply, 0 );
{$else}
zmq_send( responder, reply, 0 );
{$endif}
zmq_msg_close( reply );
end;
// We never get here but if we did, this would be how we end
zmq_close( responder );
zmq_term( context );
end.
// Hello World server
// Binds REP socket to tcp://*:5555
// Expects "Hello" from client, replies with "World"
open ZMQ;
println "hwserver, Felix version";
var context = zmq_init (1);
// Socket to talk to clients
var responder = context.mk_socket ZMQ_REP;
responder.bind "tcp://*:5555";
var request = #zmq_msg_t;
var reply = #zmq_msg_t;
while true do
// Wait for next request from client
request.init_string "Hello";
responder.recv_msg request;
println$ "Received Hello=" + string(request);
request.close;
// Do some 'work'
Faio::sleep (sys_clock,1.0);
// Send reply back to client
reply.init_size 5.size;
memcpy (zmq_msg_data reply, c"World".address, 5.size);
responder.send_msg reply;
reply.close;
done
hwserver: Hello World server in Go
//
// Hello World Zeromq server
//
// Author: Aaron Raddon github.com/araddon
// Requires: http://github.com/alecthomas/gozmq
//
package main
import (
"fmt"
zmq "github.com/alecthomas/gozmq""time"
)
funcmain() {
context, _ := zmq.NewContext()
socket, _ := context.NewSocket(zmq.REP)
defer context.Close()
defer socket.Close()
socket.Bind("tcp://*:5555")
// Wait for messages
for {
msg, _ := socket.Recv(0)
println("Received ", string(msg))
// do some fake "work"
time.Sleep(time.Second)
// send reply back to client
reply := fmt.Sprintf("World")
socket.Send([]byte(reply), 0)
}
}
hwserver: Hello World server in Haskell
{-# LANGUAGE OverloadedStrings #-}-- Hello World servermoduleMainwhereimportControl.ConcurrentimportControl.MonadimportSystem.ZMQ4.Monadicmain::IO()main= runZMQ $ do-- Socket to talk to clients
responder <- socket Rep
bind responder "tcp://*:5555"
forever $ do
buffer <- receive responder
liftIO $ do
putStrLn "Received Hello"
threadDelay 1000000-- Do some 'work'
send responder []"World"
hwserver: Hello World server in Haxe
package ;
importhaxe.io.Bytes;
importneko.Lib;
importneko.Sys;
importorg.zeromq.ZMQ;
importorg.zeromq.ZMQContext;
importorg.zeromq.ZMQException;
importorg.zeromq.ZMQSocket;
/**
* Hello World server in Haxe
* Binds REP to tcp://*:5556
* Expects "Hello" from client, replies with "World"
* Use with HelloWorldClient.hx
*
*/class HelloWorldServer
{
publicstaticfunctionmain() {
var context:ZMQContext = ZMQContext.instance();
var responder:ZMQSocket = context.socket(ZMQ_REP);
Lib.println("** HelloWorldServer (see: http://zguide.zeromq.org/page:all#Ask-and-Ye-Shall-Receive)");
responder.setsockopt(ZMQ_LINGER, 0);
responder.bind("tcp://*:5556");
try {
while (true) {
// Wait for next request from clientvar request:Bytes = responder.recvMsg();
trace ("Received request:" + request.toString());
// Do some work
Sys.sleep(1);
// Send reply back to client
responder.sendMsg(Bytes.ofString("World"));
}
} catch (e:ZMQException) {
trace (e.toString());
}
responder.close();
context.term();
}
}
hwserver: Hello World server in Java
packageguide;
//
// Hello World server in Java
// Binds REP socket to tcp://*:5555
// Expects "Hello" from client, replies with "World"
//
importorg.zeromq.SocketType;
importorg.zeromq.ZMQ;
importorg.zeromq.ZContext;
publicclasshwserver
{
publicstaticvoidmain(String[] args) throws Exception
{
try (ZContext context = new ZContext()) {
// Socket to talk to clients
ZMQ.Socket socket = context.createSocket(SocketType.REP);
socket.bind("tcp://*:5555");
while (!Thread.currentThread().isInterrupted()) {
byte[] reply = socket.recv(0);
System.out.println(
"Received " + ": [" + new String(reply, ZMQ.CHARSET) + "]"
);
Thread.sleep(1000); // Do some 'work'
String response = "world";
socket.send(response.getBytes(ZMQ.CHARSET), 0);
}
}
}
}
hwserver: Hello World server in Julia
#!/usr/bin/env julia## Hello World server in Julia# Binds REP socket to tcp://*:5555# Expects "Hello" from client, replies "World"#using ZMQ
context = Context()
socket = Socket(context, REP)
ZMQ.bind(socket, "tcp://*:5555")
whiletrue# Wait for next request from client
message = String(ZMQ.recv(socket))
println("Received request: $message")
# Do some 'work'
sleep(1)
# Send reply back to client
ZMQ.send(socket, "World")
end# classy hit men always clean up when finish the job.
ZMQ.close(socket)
ZMQ.close(context)
hwserver: Hello World server in Lua
---- Hello World server-- Binds REP socket to tcp://*:5555-- Expects "Hello" from client, replies with "World"---- Author: Robert G. Jakabosky <bobby@sharedrealm.com>--
require"zmq"
require"zhelpers"local context = zmq.init(1)
-- Socket to talk to clientslocal socket = context:socket(zmq.REP)
socket:bind("tcp://*:5555")
whiletruedo-- Wait for next request from clientlocal request = socket:recv()
print("Received Hello [" .. request .. "]")
-- Do some 'work'
s_sleep(1000)
-- Send reply back to client
socket:send("World")
end-- We never get here but if we did, this would be how we end
socket:close()
context:term()
hwserver: Hello World server in Node.js
// Hello World server
// Binds REP socket to tcp://*:5555
// Expects "Hello" from client, replies with "world"
var zmq = require('zeromq');
// socket to talk to clients
var responder = zmq.socket('rep');
responder.on('message', function(request) {
console.log("Received request: [", request.toString(), "]");
// do some 'work'
setTimeout(function() {
// send reply back to client.
responder.send("World");
}, 1000);
});
responder.bind('tcp://*:5555', function(err) {
if (err) {
console.log(err);
} else {
console.log("Listening on 5555...");
}
});
process.on('SIGINT', function() {
responder.close();
});
hwserver: Hello World server in Objective-C
//
// Hello World server
// Binds REP socket to tcp://*:5555
// Expects "Hello" from client, replies with "World"
//
#import <Foundation/Foundation.h>
#import "ZMQObjc.h"
intmain(void)
{
NSAutoreleasePool *pool = [[NSAutoreleasePool alloc] init];
ZMQContext *ctx = [[[ZMQContext alloc] initWithIOThreads:1U] autorelease];
/* Get a socket to talk to clients. */static NSString *const kEndpoint = @"tcp://*:5555";
ZMQSocket *responder = [ctx socketWithType:ZMQ_REP];
BOOL didBind = [responder bindToEndpoint:kEndpoint];
if (!didBind) {
NSLog(@"*** Failed to bind to endpoint [%@].", kEndpoint);
return EXIT_FAILURE;
}
for (;;) {
/* Create a local pool so that autoreleased objects can be disposed of
* at the end of each go through the loop.
* Otherwise, memory usage would continue to rise
* until the end of the process.
*/
NSAutoreleasePool *localPool = [[NSAutoreleasePool alloc] init];
/* Block waiting for next request from client. */
NSData *request = [responder receiveDataWithFlags:0];
NSString *text = [[[NSString alloc]
initWithData:request encoding:NSUTF8StringEncoding] autorelease];
NSLog(@"Received request: %@", text);
/* "Work" for a bit. */
sleep(1);
/* Send reply back to client. */static NSString *const kWorld = @"World";
constchar *replyCString = [kWorld UTF8String];
const NSUInteger replyLength = [kWorld
lengthOfBytesUsingEncoding:NSUTF8StringEncoding];
NSData *reply = [NSData dataWithBytes:replyCString length:replyLength];
[responder sendData:reply withFlags:0];
[localPool drain];
}
/* Close the socket to avoid blocking in -[ZMQContext terminate]. */
[responder close];
/* Dispose of the context and socket. */
[pool drain];
return EXIT_SUCCESS;
}
# Hello World server in Perlusestrict;
usewarnings;
usev5.10;
useZMQ::FFI;
useZMQ::FFI::Constantsqw(ZMQ_REP);
# Socket to talk to clientsmy$context = ZMQ::FFI->new();
my$responder = $context->socket(ZMQ_REP);
$responder->bind("tcp://*:5555");
while (1) {
$responder->recv();
say "Received Hello";
sleep1;
$responder->send("World");
}
hwserver: Hello World server in PHP
<?php
/*
 * Hello World server
 * Binds REP socket to tcp://*:5555
 * Expects "Hello" from client, replies with "World"
 * @author Ian Barber <ian(dot)barber(at)gmail(dot)com>
 */

$context = new ZMQContext(1);
// Socket to talk to clients
$responder = new ZMQSocket($context, ZMQ::SOCKET_REP);
$responder->bind("tcp://*:5555");
while (true) {
// Wait for next request from client
$request = $responder->recv();
printf ("Received request: [%s]\n", $request);
// Do some 'work'
sleep (1);
// Send reply back to client
$responder->send("World");
}
hwserver: Hello World server in Python
#
#   Hello World server in Python
#   Binds REP socket to tcp://*:5555
#   Expects b"Hello" from client, replies with b"World"
#

import time
import zmq

context = zmq.Context()
socket = context.socket(zmq.REP)
socket.bind("tcp://*:5555")

while True:
    # Wait for next request from client
    message = socket.recv()
    print(f"Received request: {message}")

    # Do some 'work'
    time.sleep(1)

    # Send reply back to client
    socket.send_string("World")
hwserver: Hello World server in Q
// Hello World server
// Binds REP socket to tcp://*:5555
// Expects "Hello" from client, replies with "World"
\l qzmq.q
ctx:zctx.new[]
// Socket to talk to clients
responder:zsocket.new[ctx; zmq`REP]
port:zsocket.bind[responder; `$"tcp://*:5555"]
while[1b and not zctx.interrupted[];
// Wait for next request from client
s:zmsg.recv responder;
// Do some 'work'
zclock.sleep 1;
// Send reply back to client
m1:zmsg.new[];
zmsg.push[m1; zframe.new["World"]];
zmsg.send[m1; responder]]
// We never get here, but if we did, this is how we would end
zsocket.destroy[ctx; responder]
zctx.destroy[ctx]
hwserver: Hello World server in Racket
#lang racket
#|
   Hello World server in Racket
   Binds REP socket to tcp://*:5555
   Expects "Hello" from client, replies with "World"
|#
(require net/zmq)

(define ctxt (context 1))
(define sock (socket ctxt 'REP))
(socket-bind! sock "tcp://*:5555")

(let loop ()
  (define message (socket-recv! sock))
  (printf "Received request: ~a\n" message)
  (sleep 1)
  (socket-send! sock #"World")
  (loop))

(context-close! ctxt)
hwserver: Hello World server in Ruby
#!/usr/bin/env ruby

# author: Bill Desmarais bill@witsaid.com
# this code is licenced under the MIT/X11 licence.

require 'rubygems'
require 'ffi-rzmq'

context = ZMQ::Context.new(1)

puts "Starting Hello World server..."

# socket to listen for clients
socket = context.socket(ZMQ::REP)
socket.bind("tcp://*:5555")

while true do
  # Wait for next request from client
  request = ''
  rc = socket.recv_string(request)

  puts "Received request. Data: #{request.inspect}"

  # Do some 'work'
  sleep 1

  # Send reply back to client
  socket.send_string("world")
end
//
// Hello World server in Scala
// Binds REP socket to tcp://*:5555
// Expects "Hello" from client, replies with "World"
//
// author Giovanni Ruggiero
// email giovanni.ruggiero@gmail.com
//
//
import org.zeromq.ZMQ
import org.zeromq.ZMQ.{Context,Socket}

object HelloWorldServer {
def main(args :Array[String]) {
// Prepare our context and socket
val context =ZMQ.context(1)
val socket = context.socket(ZMQ.REP)
println ("starting")
socket.bind ("tcp://*:5555")
while (true) {
// Wait for next request from client
// We will wait for a 0-terminated string (C string) from the client,
// so that this server also works with The Guide's C and C++ "Hello World" clients
val request = socket.recv (0)
// In order to display the 0-terminated string as a String,
// we omit the last byte from request
println ("Received request: ["
+ new String(request,0,request.length-1) // Creates a String from request, minus the last byte
+ "]")
// Do some 'work'
try {
Thread.sleep (1000)
} catch {
case e:InterruptedException => e.printStackTrace()
}
// Send reply back to client
// We will send a 0-terminated string (C string) back to the client,
// so that this server also works with The Guide's C and C++ "Hello World" clients
val reply ="World ".getBytes
reply(reply.length-1)=0//Sets the last byte of the reply to 0
socket.send(reply, 0)
}
}
}
hwserver: Hello World server in Tcl
package require zmq

zmq context context
zmq socket responder context REP
responder bind "tcp://*:5555"

while {1} {
    zmq message request
    responder recv_msg request
    puts "Received [request data]"
    request close

    zmq message reply -data "World @ [clock format [clock seconds]]"
    responder send_msg reply
    reply close
}

responder close
context term
hwserver: Hello World server in OCaml
(** Hello World server *)

open Zmq
open Helpers

let () =
  with_context @@ fun ctx ->
  with_socket ctx Socket.rep @@ fun resp ->
  Socket.bind resp "tcp://*:5555";
  while not !should_exit do
    let s = Socket.recv resp in
    printfn "Received : %S" s;
    sleep_ms 1000; (* Do some 'work' *)
    Socket.send resp "World";
  done
// Hello World client
// Connects REQ socket to tcp://localhost:5555
// Sends "Hello" to server, expects "World" back
open ZMQ;
println "hwclient, Felix version";
var context = zmq_init 1;
// Socket to talk to server
println "Connecting to hello world server";
var requester = context.mk_socket ZMQ_REQ;
requester.connect "tcp://localhost:5555";
var request = #zmq_msg_t;
var reply = #zmq_msg_t;
for var request_nbr in 0 upto 9 do
request.init_size 5.size;
memcpy (zmq_msg_data request, c"Hello".address, 5.size);
print$ f"Sending Hello %d\n" request_nbr;
requester.send_msg request;
request.close;
reply.init_size 5.size;
requester.recv_msg reply;
println$ f"Received World %d=%S" (request_nbr, reply.string);
reply.close;
done
requester.close;
context.term;
hwclient: Hello World client in Go
//
// Hello World Zeromq Client
//
// Author: Aaron Raddon github.com/araddon
// Requires: http://github.com/alecthomas/gozmq
//
package main
import (
"fmt"
zmq "github.com/alecthomas/gozmq"
)
func main() {
context, _ := zmq.NewContext()
socket, _ := context.NewSocket(zmq.REQ)
defer context.Close()
defer socket.Close()
fmt.Printf("Connecting to hello world server...")
socket.Connect("tcp://localhost:5555")
for i := 0; i < 10; i++ {
// send hello
msg := fmt.Sprintf("Hello %d", i)
socket.Send([]byte(msg), 0)
println("Sending ", msg)
// Wait for reply:
reply, _ := socket.Recv(0)
println("Received ", string(reply))
}
}
hwclient: Hello World client in Haskell
{-# LANGUAGE OverloadedStrings #-}

-- Hello World client

module Main where

import Control.Monad
import System.ZMQ4.Monadic

main :: IO ()
main = runZMQ $ do
    liftIO $ putStrLn "Connecting to hello world server..."

    requester <- socket Req
    connect requester "tcp://localhost:5555"

    forM_ [1..10] $ \i -> do
        liftIO . putStrLn $ "Sending Hello " ++ show i ++ "..."
        send requester [] "Hello"
        _ <- receive requester
        liftIO . putStrLn $ "Received World " ++ show i
hwclient: Hello World client in Haxe
package ;

import haxe.io.Bytes;
import neko.Lib;
import neko.Sys;
import org.zeromq.ZMQ;
import org.zeromq.ZMQContext;
import org.zeromq.ZMQSocket;

/**
 * Hello World client in Haxe.
 * Use with HelloWorldServer.hx and MTServer.hx
 */
class HelloWorldClient
{
    public static function main() {
        var context:ZMQContext = ZMQContext.instance();
        var socket:ZMQSocket = context.socket(ZMQ_REQ);

        Lib.println("** HelloWorldClient (see: http://zguide.zeromq.org/page:all#Ask-and-Ye-Shall-Receive)");

        trace ("Connecting to hello world server...");
        socket.connect ("tcp://localhost:5556");

        // Do 10 requests, waiting each time for a response
        for (i in 0...10) {
            var requestString = "Hello ";

            // Send the message
            trace ("Sending request " + i + " ...");
            socket.sendMsg(Bytes.ofString(requestString));

            // Wait for the reply
            var msg:Bytes = socket.recvMsg();

            trace ("Received reply " + i + ": [" + msg.toString() + "]");
        }

        // Shut down socket and context
        socket.close();
        context.term();
    }
}
hwclient: Hello World client in Java
package guide;
//
// Hello World client in Java
// Connects REQ socket to tcp://localhost:5555
// Sends "Hello" to server, expects "World" back
//
import org.zeromq.SocketType;
import org.zeromq.ZMQ;
import org.zeromq.ZContext;

public class hwclient
{
public static void main(String[] args)
{
try (ZContext context = new ZContext()) {
// Socket to talk to server
System.out.println("Connecting to hello world server");
ZMQ.Socket socket = context.createSocket(SocketType.REQ);
socket.connect("tcp://localhost:5555");
for (int requestNbr = 0; requestNbr != 10; requestNbr++) {
String request = "Hello";
System.out.println("Sending Hello " + requestNbr);
socket.send(request.getBytes(ZMQ.CHARSET), 0);
byte[] reply = socket.recv(0);
System.out.println(
"Received " + new String(reply, ZMQ.CHARSET) + " " +
requestNbr
);
}
}
}
}
hwclient: Hello World client in Julia
#!/usr/bin/env julia

#
# Hello World client in Julia
# Connects REQ socket to tcp://localhost:5555
# Sends "Hello" to server, expects "World" back
#

using ZMQ

context = Context()

# Socket to talk to server
println("Connecting to hello world server...")
socket = Socket(context, REQ)
ZMQ.connect(socket, "tcp://localhost:5555")

for request in 1:10
    println("Sending request $request ...")
    ZMQ.send(socket, "Hello")

    # Get the reply.
    message = String(ZMQ.recv(socket))
    println("Received reply $request [ $message ]")
end

# Making a clean exit.
ZMQ.close(socket)
ZMQ.close(context)
hwclient: Hello World client in Lua
--
--  Hello World client
--  Connects REQ socket to tcp://localhost:5555
--  Sends "Hello" to server, expects "World" back
--
--  Author: Robert G. Jakabosky <bobby@sharedrealm.com>
--
require"zmq"

local context = zmq.init(1)

--  Socket to talk to server
print("Connecting to hello world server...")
local socket = context:socket(zmq.REQ)
socket:connect("tcp://localhost:5555")

for n=1,10 do
    print("Sending Hello " .. n .. " ...")
    socket:send("Hello")

    local reply = socket:recv()
    print("Received World " ..  n .. " [" .. reply .. "]")
end
socket:close()
context:term()
hwclient: Hello World client in Node.js
// Hello World client
// Connects REQ socket to tcp://localhost:5555
// Sends "Hello" to server.
var zmq = require('zeromq');
// socket to talk to server
console.log("Connecting to hello world server...");
var requester = zmq.socket('req');
var x = 0;
requester.on("message", function(reply) {
console.log("Received reply", x, ": [", reply.toString(), ']');
x += 1;
if (x === 10) {
requester.close();
process.exit(0);
}
});
requester.connect("tcp://localhost:5555");
for (var i = 0; i < 10; i++) {
console.log("Sending request", i, '...');
requester.send("Hello");
}
process.on('SIGINT', function() {
requester.close();
});
# Hello World client in Perl

use strict;
use warnings;
use v5.10;

use ZMQ::FFI;
use ZMQ::FFI::Constants qw(ZMQ_REQ);

say "Connecting to hello world server...";

my $context = ZMQ::FFI->new();
my $requestor = $context->socket(ZMQ_REQ);
$requestor->connect("tcp://localhost:5555");

for my $request_nbr (0..9) {
    say "Sending Hello $request_nbr...";
    $requestor->send("Hello");
    $requestor->recv();
    say "Received World $request_nbr";
}
hwclient: Hello World client in PHP
<?php
/*
 * Hello World client
 * Connects REQ socket to tcp://localhost:5555
 * Sends "Hello" to server, expects "World" back
 * @author Ian Barber <ian(dot)barber(at)gmail(dot)com>
 */

$context = new ZMQContext();
// Socket to talk to server
echo "Connecting to hello world server...\n";
$requester = new ZMQSocket($context, ZMQ::SOCKET_REQ);
$requester->connect("tcp://localhost:5555");
for ($request_nbr = 0; $request_nbr != 10; $request_nbr++) {
printf ("Sending request %d...\n", $request_nbr);
$requester->send("Hello");
$reply = $requester->recv();
printf ("Received reply %d: [%s]\n", $request_nbr, $reply);
}
hwclient: Hello World client in Python
#
#   Hello World client in Python
#   Connects REQ socket to tcp://localhost:5555
#   Sends "Hello" to server, expects "World" back
#

import zmq

context = zmq.Context()

# Socket to talk to server
print("Connecting to hello world server...")
socket = context.socket(zmq.REQ)
socket.connect("tcp://localhost:5555")

# Do 10 requests, waiting each time for a response
for request in range(10):
    print(f"Sending request {request} ...")
    socket.send_string("Hello")

    # Get the reply.
    message = socket.recv()
    print(f"Received reply {request} [ {message} ]")
hwclient: Hello World client in Q
// Hello World client
// Connects REQ socket to tcp://localhost:5555
// Sends "Hello" to server, expects "World" back
\l qzmq.q
zclock.log "Connecting to hello world server..."
ctx:zctx.new[]
// Socket to talk to server
requester:zsocket.new[ctx; zmq`REQ]
zsocket.connect[requester; `tcp://127.0.0.1:5555]
do[10; m:zmsg.new[]; zmsg.push[m; f:zframe.new["Hello"]];
zmsg.send[m; requester]; zmsg.dump[zmsg.recv[requester]]]
zsocket.destroy[ctx; requester]
zctx.destroy[ctx]
\\
hwclient: Hello World client in Racket
#lang racket
#|
#  Hello World client in Racket
#  Connects REQ socket to tcp://localhost:5555
#  Sends "Hello" to server, expects "World" back
|#
(require net/zmq)

; Prepare our context and sockets
(define ctxt (context 1))
(define sock (socket ctxt 'REQ))

(printf "Connecting to hello world server...\n")
(socket-connect! sock "tcp://localhost:5555")

; Do 10 requests, waiting each time for a response
(for ([request (in-range 10)])
  (printf "Sending request ~a...\n" request)
  (socket-send! sock #"Hello")

  ; Get the reply.
  (define message (socket-recv! sock))
  (printf "Received reply ~a [~a]\n" request message))

(context-close! ctxt)
hwclient: Hello World client in Ruby
#!/usr/bin/env ruby

require 'rubygems'
require 'ffi-rzmq'

context = ZMQ::Context.new(1)

# Socket to talk to server
puts "Connecting to hello world server..."
requester = context.socket(ZMQ::REQ)
requester.connect("tcp://localhost:5555")

0.upto(9) do |request_nbr|
  puts "Sending request #{request_nbr}..."
  requester.send_string "Hello"

  reply = ''
  rc = requester.recv_string(reply)

  puts "Received reply #{request_nbr}: [#{reply}]"
end
hwclient: Hello World client in Rust
fn main() {
    println!("Connecting to hello world server...");

    let context = zmq::Context::new();
    let requester = context.socket(zmq::REQ).unwrap();

    assert!(requester.connect("tcp://localhost:5555").is_ok());

    for request_nbr in 0..10 {
        let buffer = &mut [0; 10];
        println!("Sending Hello {:?}...", request_nbr);
        requester.send("Hello", 0).unwrap();
        requester.recv_into(buffer, 0).unwrap();
        println!("Received World {:?}", request_nbr);
    }
}
hwclient: Hello World client in Scala
/*
*
* Hello World client in Scala
* Connects REQ socket to tcp://localhost:5555
* Sends "Hello" to server, expects "World" back
*
* @author Giovanni Ruggiero
* @email giovanni.ruggiero@gmail.com
*/
import org.zeromq.ZMQ
import org.zeromq.ZMQ.{Context,Socket}

object HelloWorldClient {
def main(args :Array[String]) {
// Prepare our context and socket
val context =ZMQ.context(1)
val socket = context.socket(ZMQ.REQ)
println("Connecting to hello world server...")
socket.connect ("tcp://localhost:5555")
// Do 10 requests, waiting each time for a response
for (request_nbr <- 1 to 10) {
// Create a "Hello" message.
// Ensure that the last byte of our "Hello" message is 0 because
// our "Hello World" server is expecting a 0-terminated string:
val request ="Hello ".getBytes()
request(request.length-1)=0//Sets the last byte to 0
// Send the message
println("Sending request " + request_nbr + "...")
socket.send(request, 0)
// Get the reply.
val reply = socket.recv(0)
// When displaying reply as a String, omit the last byte because
// our "Hello World" server has sent us a 0-terminated string:
println("Received reply " + request_nbr + ": [" + new String(reply,0,reply.length-1) + "]")
}
}
}
hwclient: Hello World client in Tcl
package require zmq

zmq context context
zmq socket client context REQ
client connect "tcp://localhost:5555"

for {set i 0} {$i < 10} {incr i} {
    zmq message msg -data "Hello @ [clock format [clock seconds]]"
    client send_msg msg
    msg close

    zmq message msg
    client recv_msg msg
    puts "Received [msg data]/[msg size]"
    msg close
}

client close
context term
hwclient: Hello World client in OCaml
(** Hello World client *)

open Zmq
open Helpers

let () =
  printfn "Connecting to hello world server...";
  with_context @@ fun ctx ->
  with_socket ctx Socket.req @@ fun req ->
  Socket.connect req "tcp://localhost:5555";
  for i = 0 to 9 do
    printfn "Sending Hello %d..." i;
    Socket.send req "Hello";
    let answer = Socket.recv req in
    printfn "Received %d : %S" i answer
  done
Now this looks too simple to be realistic, but ZeroMQ sockets have, as we already learned, superpowers. You could throw thousands of clients at this server, all at once, and it would continue to work happily and quickly. For fun, try starting the client and then starting the server, see how it all still works, then think for a second what this means.
Let us explain briefly what these two programs are actually doing. They create a ZeroMQ context to work with, and a socket. Don’t worry what the words mean. You’ll pick it up. The server binds its REP (reply) socket to port 5555. The server waits for a request in a loop, and responds each time with a reply. The client sends a request and reads the reply back from the server.
If you kill the server (Ctrl-C) and restart it, the client won’t recover properly. Recovering from crashing processes isn’t quite that easy. Making a reliable request-reply flow is complex enough that we won’t cover it until Chapter 4 - Reliable Request-Reply Patterns.
There is a lot happening behind the scenes but what matters to us programmers is how short and sweet the code is, and how often it doesn’t crash, even under a heavy load. This is the request-reply pattern, probably the simplest way to use ZeroMQ. It maps to RPC and the classic client/server model.
ZeroMQ doesn’t know anything about the data you send except its size in bytes. That means you are responsible for formatting it safely so that applications can read it back. Doing this for objects and complex data types is a job for specialized libraries like Protocol Buffers. But even for strings, you need to take care.
In C and some other languages, strings are terminated with a null byte. We could send a string like “Hello” with that extra null byte:
zmq_send (requester, "Hello", 6, 0);
However, if you send a string from another language, it probably will not include that null byte. For example, when we send that same string in Python, we do this:
socket.send (b"Hello")
Then what goes onto the wire is a length (one byte for shorter strings) and the string contents as individual characters.
And if you read this from a C program, you will get something that looks like a string, and might by accident act like a string (if by luck the five bytes find themselves followed by an innocently lurking null), but isn’t a proper string. When your client and server don’t agree on the string format, you will get weird results.
When you receive string data from ZeroMQ in C, you simply cannot trust that it’s safely terminated. Every single time you read a string, you should allocate a new buffer with space for an extra byte, copy the string, and terminate it properly with a null.
So let’s establish the rule that ZeroMQ strings are length-specified and are sent on the wire without a trailing null. In the simplest case (and we’ll do this in our examples), a ZeroMQ string maps neatly to a ZeroMQ message frame, which looks like the above figure–a length and some bytes.
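To make the rule concrete, here is a minimal Python sketch of both directions. It does not use the ZeroMQ library at all; it only models the frame contents as a `bytes` value. The helper names `to_frame` and `from_frame` are hypothetical, chosen for this illustration, and dropping a trailing null on receipt is an assumption about how you might tolerate peers that send C-style strings.

```python
def to_frame(text: str) -> bytes:
    """Encode text as frame contents: the bytes only, no trailing null."""
    return text.encode("utf-8")


def from_frame(frame: bytes) -> str:
    """Decode frame contents, dropping one trailing null if a peer sent it."""
    if frame.endswith(b"\x00"):
        frame = frame[:-1]
    return frame.decode("utf-8")


if __name__ == "__main__":
    # A length-specified string: five bytes for "Hello".
    frame = to_frame("Hello")
    print(frame, len(frame))

    # A C peer that sent six bytes, null included, still decodes cleanly.
    c_style = b"Hello\x00"
    print(from_frame(c_style) == from_frame(frame))
```

The length travels with the frame, so the receiver never has to scan for a terminator. The tolerant decode is only a convenience for mixed setups; peers that follow the rule never send the null in the first place.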
Here is what we need to do, in C, to receive a ZeroMQ string and deliver it to the application as a valid C string:
//  Receive ZeroMQ string from socket and convert into C string
//  Chops string at 255 chars, if it's longer
static char *
s_recv (void *socket) {
    char buffer [256];
    int size = zmq_recv (socket, buffer, 255, 0);
    if (size == -1)
        return NULL;
    if (size > 255)
        size = 255;
    buffer [size] = '\0';
    /* use strndup(buffer, sizeof(buffer)-1) in *nix */
    return strdup (buffer);
}
This makes a handy helper function and in the spirit of making things we can reuse profitably, let’s write a similar s_send function that sends strings in the correct ZeroMQ format, and package this into a header file we can reuse.
The result is zhelpers.h, which lets us write sweeter and shorter ZeroMQ applications in C. It is a fairly long source, and only fun for C developers, so read it at leisure.
ZeroMQ does come in several versions and quite often, if you hit a problem, it’ll be something that’s been fixed in a later version. So it’s a useful trick to know exactly what version of ZeroMQ you’re actually linking with.
program version;
//
// Report 0MQ version
// @author Varga Balazs <bb.varga@gmail.com>
//
{$APPTYPE CONSOLE}
uses
SysUtils
, zmqapi
;
var
major,
minor,
patch: Integer;
begin
ZMQVersion( major, minor, patch );
Writeln( Format( 'Current 0MQ version is %d.%d.%d', [major, minor, patch]) );
end.
version: ZeroMQ version reporting in Erlang
#! /usr/bin/env escript
%%
%% Report 0MQ version
%%
main(_) ->
{Maj, Min, Patch} = erlzmq:version(),
io:format("Current 0MQ version is ~b.~b.~b~n", [Maj, Min, Patch]).
version: ZeroMQ version reporting in Elixir
defmodule Version do
@moduledoc """
Generated by erl2ex (http://github.com/dazuma/erl2ex)
From Erlang source: (Unknown source file)
At: 2019-12-20 13:57:38
"""
def main() do
{maj, var_min, patch} = :erlzmq.version()
:io.format('Current 0MQ version is ~b.~b.~b~n', [maj, var_min, patch])
end
end
Version.main
println$ f"Current 0MQ version is %d.%d.%d" #ZeroMQ::zmq_version;
version: ZeroMQ version reporting in Go
//
// 0MQ version example.
//
// Author: Max Riveiro <kavu13@gmail.com>
// Requires: http://github.com/alecthomas/gozmq
//
package main
import (
"fmt"
zmq "github.com/alecthomas/gozmq"
)
func main() {
major, minor, patch := zmq.Version()
fmt.Printf("Current 0MQ version is %d.%d.%d\n", major, minor, patch)
}
version: ZeroMQ version reporting in Haskell
module Main where

import System.ZMQ4 (version)
import Text.Printf (printf)

main :: IO ()
main = do
    (major, minor, patch) <- version
    printf "Current 0MQ version is %d.%d.%d" major minor patch
package guide;

import org.zeromq.ZMQ;

// Report 0MQ version
public class version
{
public static void main(String[] args)
{
String version = ZMQ.getVersionString();
int fullVersion = ZMQ.getFullVersion();
System.out.println(
String.format(
"Version string: %s, Version int: %d", version, fullVersion
)
);
}
}
version: ZeroMQ version reporting in Julia
#!/usr/bin/env julia

using ZMQ

println("Current ZMQ version is $(ZMQ.version)")
version: ZeroMQ version reporting in Lua
--
--  Report 0MQ version
--
--  Author: Robert G. Jakabosky <bobby@sharedrealm.com>
--
require"zmq"

print("Current 0MQ version is " .. table.concat(zmq.version(), '.'))
version: ZeroMQ version reporting in Node.js
// Report 0MQ version in Node.js
var zmq = require('zeromq');
console.log("Current 0MQ version is " + zmq.version);
version: ZeroMQ version reporting in Objective-C
/* Reports the 0MQ version. */
#import "ZMQObjC.h"

int main(void)
{
NSAutoreleasePool *pool = [[NSAutoreleasePool alloc] init];
int major = 0;
int minor = 0;
int patch = 0;
[ZMQContext getZMQVersionMajor:&major minor:&minor patch:&patch];
NSLog(@"Current 0MQ version is %d.%d.%d.", major, minor, patch);
[pool drain];
return EXIT_SUCCESS;
}
# Report 0MQ version in Perl

use strict;
use warnings;
use v5.10;

use ZMQ::FFI;

my ($major, $minor, $patch) = ZMQ::FFI->new->version;
say "Current 0MQ version is $major.$minor.$patch";
version: ZeroMQ version reporting in PHP
<?php
/* Report 0MQ version
 *
 * @author Ian Barber <ian(dot)barber(at)gmail(dot)com>
 */

if (class_exists("ZMQ") && defined("ZMQ::LIBZMQ_VER")) {
    echo ZMQ::LIBZMQ_VER, PHP_EOL;
}
version: ZeroMQ version reporting in Python
# Report 0MQ version
#
# Author: Lev Givon <lev(at)columbia(dot)edu>

import zmq

print(f"Current libzmq version is {zmq.zmq_version()}")
print(f"Current pyzmq version is {zmq.__version__}")
version: ZeroMQ version reporting in Q
// Report 0MQ version
\l qzmq.q
mnp:libzmq.version[]
zclock.log "Current 0MQ version is ","." sv (string mnp)
\\
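Once you know the version, the usual next step is to check it against the release that fixed your problem. The following Python sketch shows one way to do that comparison, assuming a plain `major.minor.patch` string such as the one printed by the Python listing above. The helper names `parse_version` and `is_at_least` are hypothetical, the version numbers are made-up sample values, and a string with a non-numeric suffix would need extra handling that is not shown here.

```python
def parse_version(text: str) -> tuple:
    """Turn 'major.minor.patch' into a tuple of ints for comparison."""
    return tuple(int(part) for part in text.split("."))


def is_at_least(found: str, required: str) -> bool:
    """True if the version we found is the required one or newer."""
    return parse_version(found) >= parse_version(required)


if __name__ == "__main__":
    # Sample values only; substitute the version your program reports.
    found = "4.10.0"
    required = "4.9.0"
    print(f"{found} >= {required}: {is_at_least(found, required)}")
```

Comparing tuples of integers rather than the raw strings matters: as text, "4.10.0" sorts before "4.9.0", which is the wrong answer.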
The second classic pattern is one-way data distribution, in which a server pushes updates to a set of clients. Let’s see an example that pushes out weather updates consisting of a zip code, temperature, and relative humidity. We’ll generate random values, just like the real weather stations do.
Here’s the server. We’ll use port 5556 for this application:
//  Weather update server
//  Binds PUB socket to tcp://*:5556
//  Publishes random weather updates

#include "zhelpers.h"

int main (void)
{
    //  Prepare our context and publisher
    void *context = zmq_ctx_new ();
    void *publisher = zmq_socket (context, ZMQ_PUB);
    int rc = zmq_bind (publisher, "tcp://*:5556");
    assert (rc == 0);

    //  Initialize random number generator
    srandom ((unsigned) time (NULL));
    while (1) {
        //  Get values that will fool the boss
        int zipcode, temperature, relhumidity;
        zipcode     = randof (100000);
        temperature = randof (215) - 80;
        relhumidity = randof (50) + 10;

        //  Send message to all subscribers
        char update [20];
        sprintf (update, "%05d %d %d", zipcode, temperature, relhumidity);
        s_send (publisher, update);
    }
    zmq_close (publisher);
    zmq_ctx_destroy (context);
    return 0;
}
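Each update is just three numbers formatted into one short string: a zero-padded five-digit zip code, a temperature, and a relative humidity. As a rough illustration of that format, the Python sketch below builds an update and splits it back into fields without touching the network. The function names `make_update` and `parse_update` are hypothetical, and the random ranges assume that `randof (n)` in zhelpers.h yields values from 0 to n-1.

```python
import random


def make_update(zipcode: int, temperature: int, relhumidity: int) -> str:
    """Format one update the way the C server's sprintf call does."""
    return f"{zipcode:05d} {temperature} {relhumidity}"


def parse_update(update: str) -> tuple:
    """Split an update back into (zipcode, temperature, relhumidity)."""
    zipcode, temperature, relhumidity = update.split()
    # Keep the zip code as text so leading zeros survive.
    return zipcode, int(temperature), int(relhumidity)


if __name__ == "__main__":
    update = make_update(
        random.randrange(100000),    # zip code: 0 to 99999
        random.randrange(215) - 80,  # temperature: -80 to 134
        random.randrange(50) + 10,   # relative humidity: 10 to 59
    )
    print(update, parse_update(update))
```

With these ranges the longest possible update is 12 characters, which fits comfortably in the 20-byte buffer the C server uses.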
wuserver: Weather update server in C++
//
//  Weather update server in C++
//  Binds PUB socket to tcp://*:5556
//  Publishes random weather updates
//
#include <zmq.hpp>
#include <stdio.h>
#include <stdlib.h>
#include <time.h>

#if (defined (WIN32))
#include <zhelpers.hpp>
#endif

#define within(num) (int) ((float) num * random () / (RAND_MAX + 1.0))

int main () {

    //  Prepare our context and publisher
    zmq::context_t context (1);
    zmq::socket_t publisher (context, zmq::socket_type::pub);
    publisher.bind("tcp://*:5556");
    publisher.bind("ipc://weather.ipc");    // Not usable on Windows.

    //  Initialize random number generator
    srandom ((unsigned) time (NULL));
    while (1) {

        int zipcode, temperature, relhumidity;

        //  Get values that will fool the boss
        zipcode     = within (100000);
        temperature = within (215) - 80;
        relhumidity = within (50) + 10;

        //  Send message to all subscribers
        zmq::message_t message(20);
        snprintf ((char *) message.data(), 20 ,
            "%05d %d %d", zipcode, temperature, relhumidity);
        publisher.send(message, zmq::send_flags::none);

    }
    return 0;
}
//
// Weather update server
// Binds PUB socket to tcp://*:5556
// Publishes random weather updates
//
open ZMQ;
// Prepare our context and publisher
var context = zmq_init 1;
var publisher = context.mk_socket ZMQ_PUB;
publisher.bind "tcp://*:5556";
publisher.bind "ipc://weather.ipc";
while true do
// Get values that will fool the boss
zipcode := #rand % 1000+1000;
temperature := #rand % 80 - 20; // Oztraila mate!
relhumidity := #rand % 50 + 10;
// Send message to all subscribers
update := f"%03d %d %d" (zipcode, temperature, relhumidity);
publisher.send_string update;
done
wuserver: Weather update server in Go
//
// Weather update server
// Binds PUB socket to tcp://*:5556
// Publishes random weather updates
//
package main
import (
"fmt"
zmq "github.com/alecthomas/gozmq"
"math/rand"
"time"
)
func main() {
context, _ := zmq.NewContext()
socket, _ := context.NewSocket(zmq.PUB)
defer context.Close()
defer socket.Close()
socket.Bind("tcp://*:5556")
socket.Bind("ipc://weather.ipc")
// Seed the random number generator
rand.Seed(time.Now().UnixNano())
// loop for a while apparently
for {
// make values that will fool the boss
zipcode := rand.Intn(100000)
temperature := rand.Intn(215) - 80
relhumidity := rand.Intn(50) + 10
msg := fmt.Sprintf("%d %d %d", zipcode, temperature, relhumidity)
// Send message to all subscribers
socket.Send([]byte(msg), 0)
}
}
wuserver: Weather update server in Haskell
{-# LANGUAGE ScopedTypeVariables #-}

-- Weather update server
-- Binds PUB socket to tcp://*:5556
-- Publishes random weather updates

module Main where

import Control.Monad
import qualified Data.ByteString.Char8 as BS
import System.Random
import System.ZMQ4.Monadic
import Text.Printf

main :: IO ()
main = runZMQ $ do
    -- Prepare our publisher
    publisher <- socket Pub
    bind publisher "tcp://*:5556"

    forever $ do
        -- Get values that will fool the boss
        zipcode :: Int <- liftIO $ randomRIO (0, 100000)
        temperature :: Int <- liftIO $ randomRIO (-30, 135)
        relhumidity :: Int <- liftIO $ randomRIO (10, 60)

        -- Send message to all subscribers
        let update = printf "%05d %d %d" zipcode temperature relhumidity
        send publisher [] (BS.pack update)
wuserver: Weather update server in Haxe
package ;

import haxe.io.Bytes;
import neko.Lib;
import org.zeromq.ZMQ;
import org.zeromq.ZMQContext;
import org.zeromq.ZMQSocket;

/**
 * Weather update server in Haxe
 * Binds PUB socket to tcp://*:5556
 * Publishes random weather updates
 *
 * See: http://zguide.zeromq.org/page:all#Getting-the-Message-Out
 *
 * Use with WUClient.hx
 */
class WUServer
{
    public static function main() {
        var context:ZMQContext = ZMQContext.instance();

        Lib.println("** WUServer (see: http://zguide.zeromq.org/page:all#Getting-the-Message-Out)");

        var publisher:ZMQSocket = context.socket(ZMQ_PUB);
        publisher.bind("tcp://127.0.0.1:5556");

        while (true) {
            // Get values that will fool the boss
            var zipcode, temperature, relhumidity;
            zipcode = Std.random(100000) + 1;
            temperature = Std.random(215) - 80 + 1;
            relhumidity = Std.random(50) + 10 + 1;

            // Send message to all subscribers
            var update:String = zipcode + " " + temperature + " " + relhumidity;
            publisher.sendMsg(Bytes.ofString(update));
        }
    }
}
wuserver: Weather update server in Java
package guide;

import java.util.Random;

import org.zeromq.SocketType;
import org.zeromq.ZMQ;
import org.zeromq.ZContext;

//
// Weather update server in Java
// Binds PUB socket to tcp://*:5556
// Publishes random weather updates
//
public class wuserver
{
public static void main(String[] args) throws Exception
{
// Prepare our context and publisher
try (ZContext context = new ZContext()) {
ZMQ.Socket publisher = context.createSocket(SocketType.PUB);
publisher.bind("tcp://*:5556");
publisher.bind("ipc://weather");
// Initialize random number generator
Random srandom = new Random(System.currentTimeMillis());
while (!Thread.currentThread().isInterrupted()) {
// Get values that will fool the boss
int zipcode, temperature, relhumidity;
zipcode = 10000 + srandom.nextInt(10000);
temperature = srandom.nextInt(215) - 80 + 1;
relhumidity = srandom.nextInt(50) + 10 + 1;
// Send message to all subscribers
String update = String.format(
"%05d %d %d", zipcode, temperature, relhumidity
);
publisher.send(update, 0);
}
}
}
}
--
--  Weather update server
--  Binds PUB socket to tcp://*:5556
--  Publishes random weather updates
--
--  Author: Robert G. Jakabosky <bobby@sharedrealm.com>
--
require"zmq"

--  Prepare our context and publisher
local context = zmq.init(1)
local publisher = context:socket(zmq.PUB)
publisher:bind("tcp://*:5556")
publisher:bind("ipc://weather.ipc")

--  Initialize random number generator
math.randomseed(os.time())
while (1) do
    --  Get values that will fool the boss
    local zipcode, temperature, relhumidity
    zipcode     = math.random(0, 99999)
    temperature = math.random(-80, 135)
    relhumidity = math.random(10, 60)

    --  Send message to all subscribers
    publisher:send(string.format("%05d %d %d", zipcode, temperature, relhumidity))
end
publisher:close()
context:term()
wuserver: Weather update server in Node.js
// Weather update server in node.js
// Binds PUB socket to tcp://*:5556
// Publishes random weather updates
var zmq = require('zeromq')
, publisher = zmq.socket('pub');
publisher.bindSync("tcp://*:5556");
publisher.bindSync("ipc://weather.ipc");
function zeropad(num) {
return num.toString().padStart(5, "0");
};
function rand(upper, extra) {
var num = Math.abs(Math.round(Math.random() * upper));
return num + (extra || 0);
};
while (true) {
// Get values that will fool the boss
var zipcode = rand(100000)
, temperature = rand(215, -80)
, relhumidity = rand(50, 10)
, update = `${zeropad(zipcode)} ${temperature} ${relhumidity}`;
publisher.send(update);
}
wuserver: Weather update server in Objective-C
//
// Weather update server
// Binds PUB socket to tcp://*:5556
// Publishes random weather updates
//
#import "ZMQObjC.h"
#import "ZMQHelper.h"
int main(void)
{
NSAutoreleasePool *pool = [[NSAutoreleasePool alloc] init];
// Prepare our context and publisher
ZMQContext *ctx = [[[ZMQContext alloc] initWithIOThreads:1U] autorelease];
ZMQSocket *publisher = [ctx socketWithType:ZMQ_PUB];
[publisher bindToEndpoint:@"tcp://*:5556"];
[publisher bindToEndpoint:@"ipc://weather.ipc"];
// Initialize random number generator
srandom ((unsigned) time (NULL));
for (;;) {
// Get values that will fool the boss
int zipcode, temperature, relhumidity;
zipcode = within (100000);
temperature = within (215) - 80;
relhumidity = within (50) + 10;
// Send message to all subscribers
NSString *update = [NSString stringWithFormat:@"%05d %d %d",
zipcode, temperature, relhumidity];
NSData *data = [update dataUsingEncoding:NSUTF8StringEncoding];
[publisher sendData:data withFlags:0];
}
[publisher close];
[pool drain];
return EXIT_SUCCESS;
}
wuserver: Weather update server in Scala
//
// Weather update server in Scala
// Binds PUB socket to tcp://*:5556
// Publishes random weather updates
//
// author Giovanni Ruggiero
// email giovanni.ruggiero@gmail.com
import java.util.Random
import org.zeromq.ZMQ
object wuserver {
  def main(args : Array[String]) {
    // Prepare our context and publisher
    val context = ZMQ.context(1)
    val publisher = context.socket(ZMQ.PUB)
    publisher.bind("tcp://*:5556")
    // Initialize random number generator
    val srandom = new Random(System.currentTimeMillis())
    while (true) {
      // Get values that will fool the boss
      val zipcode : Integer = srandom.nextInt(100000) + 1
      val temperature : Integer = srandom.nextInt(215) - 80 + 1
      val relhumidity : Integer = srandom.nextInt(50) + 10 + 1
      // Send message to all subscribers
      val update = String.format("%05d %d %d\u0000", zipcode, temperature, relhumidity);
      publisher.send(update.getBytes(), 0)
    }
  }
}
wuserver: Weather update server in Tcl
#
# Weather update server
# Binds PUB socket to tcp://*:5556
# Publishes random weather updates
#
package require zmq
# Prepare our context and publisher
zmq context context
zmq socket publisher context PUB
publisher bind "tcp://*:5556"
publisher bind "ipc://weather.ipc"
# Initialize random number generator
expr {srand([clock seconds])}
while {1} {
    # Get values that will fool the boss
    set zipcode [expr {int(rand()*100000)}]
    set temperature [expr {int(rand()*215)-80}]
    set relhumidity [expr {int(rand()*50)+50}]
    # Send message to all subscribers
    set data [format "%05d %d %d" $zipcode $temperature $relhumidity]
    if {$zipcode eq "10001"} {
        puts $data
    }
    zmq message msg -data $data
    publisher send_msg msg
    msg close
}
publisher close
context term
wuserver: Weather update server in OCaml
(**
Weather update server
Binds PUB socket to tcp://*:5556
Publishes random weather updates
*)
open Zmq
open Helpers
let () =
  with_context @@ fun ctx ->
  with_socket ctx Socket.pub @@ fun pub ->
  Socket.bind pub "tcp://*:5556";
  Socket.bind pub "ipc://weather.ipc";
  Random.self_init ();
  while not !should_exit do
    (* Get values that will fool the boss *)
    let zipcode = Random.int 100_000 in
    let temperature = Random.int 215 - 80 in
    let relhumidity = Random.int 50 + 10 in
    (* Send message to all subscribers *)
    Socket.send pub (Printf.sprintf "%05d %d %d" zipcode temperature relhumidity);
  done
There’s no start and no end to this stream of updates; it’s like a never-ending broadcast.
Here is the client application, which listens to the stream of updates and grabs anything to do with a specified zip code, by default New York City because that’s a great place to start any adventure:
//
// Weather update client
// Connects SUB socket to tcp://localhost:5556
// Collects weather updates and finds avg temp in zipcode
//
open ZMQ;
fun parse_int(s:string,var i:int) = {
var acc = 0;
while s.[i] \in "0123456789" do
acc = acc * 10 + s.[i].ord - "0".char.ord;
++i;
done
return i,acc;
}
fun parse_space(s:string, i:int)=> i+1;
fun parse_weather(s:string) = {
var i = 0;
def i, val zipcode = parse_int (s,i);
i = parse_space(s,i);
def i, val temperature = parse_int (s,i);
i = parse_space(s,i);
def i, val relhumidity= parse_int (s,i);
return zipcode, temperature, relhumidity;
}
var context = zmq_init 1;
// Socket to talk to server
println "Collecting updates from weather server...";
var subscriber = context.mk_socket ZMQ_SUB;
subscriber.connect "tcp://localhost:5556";
// Subscribe to zip codes that begin with the filter, default "1001"
filter := if System::argc > 1 then System::argv 1 else "1001" endif;
subscriber.set_opt$ zmq_subscribe filter;
// Process 100 updates
var total_temp = 0;
for var update_nbr in 0 upto 99 do
s := subscriber.recv_string;
zipcode, temperature, relhumidity := parse_weather s;
total_temp += temperature;
done
println$
f"Average temperature for zipcode '%S' was %d C\n"$
filter, total_temp / update_nbr
;
subscriber.close;
context.term;
wuclient: Weather update client in Go
//
// Weather update client
// Connects SUB socket to tcp://localhost:5556
// Collects weather updates and finds avg temp in zipcode
//
package main
import (
	"fmt"
	zmq "github.com/alecthomas/gozmq"
	"os"
	"strconv"
	"strings"
)
func main() {
	context, _ := zmq.NewContext()
	socket, _ := context.NewSocket(zmq.SUB)
	defer context.Close()
	defer socket.Close()
	var temps []string
	var err error
	var temp int64
	total_temp := 0
	filter := "59937"
	// find zipcode
	if len(os.Args) > 1 { // ./wuclient 85678
		filter = string(os.Args[1])
	}
	// Subscribe to just one zipcode (whitefish MT 59937)
	fmt.Printf("Collecting updates from weather server for %s…\n", filter)
	socket.SetSubscribe(filter)
	socket.Connect("tcp://localhost:5556")
	// Process exactly 100 updates so the average below divides by the right count
	for i := 0; i < 100; i++ {
		// found temperature point
		datapt, _ := socket.Recv(0)
		temps = strings.Split(string(datapt), " ")
		temp, err = strconv.ParseInt(temps[1], 10, 64)
		if err == nil {
			// Valid number: add it to the running total
			total_temp += int(temp)
		}
	}
	fmt.Printf("Average temperature for zipcode %s was %dF \n\n", filter, total_temp/100)
}
wuclient: Weather update client in Haskell
{-# LANGUAGE LambdaCase #-}
{-# LANGUAGE OverloadedStrings #-}
{-# LANGUAGE ScopedTypeVariables #-}
-- Weather update client
-- Connects SUB socket to tcp://localhost:5556
-- Collects weather updates and finds avg temp in zipcode
module Main where
import Control.Monad
import qualified Data.ByteString.Char8 as BS
import System.Environment
import System.ZMQ4.Monadic
import Text.Printf
main :: IO ()
main = runZMQ $ do
    liftIO $ putStrLn "Collecting updates from weather server..."
    -- Socket to talk to server
    subscriber <- socket Sub
    connect subscriber "tcp://localhost:5556"
    -- Subscribe to zipcode, default is NYC, 10001
    filter <- liftIO getArgs >>= \case
        []          -> return "10001 "
        (zipcode:_) -> return (BS.pack zipcode)
    subscribe subscriber filter
    -- Process 100 updates
    temperature <- fmap sum $
        replicateM 100 $ do
            string <- receive subscriber
            let [_, temperature :: Int, _] = map read . words . BS.unpack $ string
            return temperature
    liftIO $
        printf "Average temperature for zipcode '%s' was %dF"
            (BS.unpack filter)
            (temperature `div` 100)
wuclient: Weather update client in Haxe
package ;
import haxe.io.Bytes;
import neko.Lib;
import neko.Sys;
import org.zeromq.ZMQ;
import org.zeromq.ZMQContext;
import org.zeromq.ZMQException;
import org.zeromq.ZMQSocket;
/**
* Weather update client in Haxe
* Connects SUB socket to tcp://localhost:5556
* Collects weather updates and finds average temp in zipcode
*
* Use optional argument to specify zip code (in range 1 to 100000)
*
* See: http://zguide.zeromq.org/page:all#Getting-the-Message-Out
*
* Use with WUServer.hx
*/
class WUClient
{
public static function main() {
var context:ZMQContext = ZMQContext.instance();
Lib.println("** WUClient (see: http://zguide.zeromq.org/page:all#Getting-the-Message-Out)");
// Socket to talk to server
trace ("Collecting updates from weather server...");
var subscriber:ZMQSocket = context.socket(ZMQ_SUB);
subscriber.setsockopt(ZMQ_LINGER, 0); // Don't block when closing socket at end
subscriber.connect("tcp://localhost:5556");
// Subscribe to zipcode, default in NYC, 10001
var filter:String =
if (Sys.args().length > 0) {
Sys.args()[0];
} else {
"10001";
};
try {
subscriber.setsockopt(ZMQ_SUBSCRIBE, Bytes.ofString(filter));
} catch (e:ZMQException) {
trace (e.str());
}
// Process 100 updates
var update_nbr = 0;
var total_temp:Int = 0;
for (update_nbr in 0...100) {
var msg:Bytes = subscriber.recvMsg();
trace (update_nbr+ ". Received: " + msg.toString());
var zipcode, temperature, relhumidity;
var sscanf:Array<String> = msg.toString().split(" ");
zipcode = sscanf[0];
temperature = sscanf[1];
relhumidity = sscanf[2];
total_temp += Std.parseInt(temperature);
}
trace ("Average temperature for zipcode " + filter + " was " + total_temp / 100);
// Close gracefully
subscriber.close();
context.term();
}
}
wuclient: Weather update client in Java
package guide;
import java.util.StringTokenizer;
import org.zeromq.SocketType;
import org.zeromq.ZMQ;
import org.zeromq.ZContext;
//
// Weather update client in Java
// Connects SUB socket to tcp://localhost:5556
// Collects weather updates and finds avg temp in zipcode
//
public class wuclient
{
public static void main(String[] args)
{
try (ZContext context = new ZContext()) {
// Socket to talk to server
System.out.println("Collecting updates from weather server");
ZMQ.Socket subscriber = context.createSocket(SocketType.SUB);
subscriber.connect("tcp://localhost:5556");
// Subscribe to zipcode, default is NYC, 10001
String filter = (args.length > 0) ? args[0] : "10001 ";
subscriber.subscribe(filter.getBytes(ZMQ.CHARSET));
// Process 100 updates
int update_nbr;
long total_temp = 0;
for (update_nbr = 0; update_nbr < 100; update_nbr++) {
// Use trim to remove the trailing '\0' character
String string = subscriber.recvStr(0).trim();
StringTokenizer sscanf = new StringTokenizer(string, " ");
int zipcode = Integer.valueOf(sscanf.nextToken());
int temperature = Integer.valueOf(sscanf.nextToken());
int relhumidity = Integer.valueOf(sscanf.nextToken());
total_temp += temperature;
}
System.out.println(
String.format(
"Average temperature for zipcode '%s' was %d.",
filter,
(int)(total_temp / update_nbr)
)
);
}
}
}
wuclient: Weather update client in Julia
#!/usr/bin/env julia
#
# Weather update client
# Connects SUB socket to tcp://localhost:5556
# Collects weather updates and finds avg temp in zipcode
#
using ZMQ
context = Context()
socket = Socket(context, SUB)
println("Collecting updates from weather server...")
connect(socket, "tcp://localhost:5556")
# Subscribe to zipcode, default is NYC, 10001
# (parse replaces the int() call, which current Julia versions no longer provide)
zip_filter = length(ARGS) > 0 ? parse(Int, ARGS[1]) : 10001
subscribe(socket, string(zip_filter))
# Process 5 updates
update_nbr = 5
total_temp = 0
for update in 1:update_nbr
global total_temp
message = unsafe_string(recv(socket))
zipcode, temperature, relhumidity = split(message)
total_temp += parse(Int, temperature)
end
avg_temp = total_temp / update_nbr
println("Average temperature for zipcode $zip_filter was $(avg_temp)F")
# Making a clean exit.
close(socket)
close(context)
wuclient: Weather update client in Lua
--
-- Weather update client
-- Connects SUB socket to tcp://localhost:5556
-- Collects weather updates and finds avg temp in zipcode
--
-- Author: Robert G. Jakabosky <bobby@sharedrealm.com>
--
require"zmq"
local context = zmq.init(1)
-- Socket to talk to server
print("Collecting updates from weather server...")
local subscriber = context:socket(zmq.SUB)
subscriber:connect(arg[2] or "tcp://localhost:5556")
-- Subscribe to zipcode, default is NYC, 10001
local filter = arg[1] or "10001 "
subscriber:setopt(zmq.SUBSCRIBE, filter)
-- Process 100 updates
local update_nbr = 0
local total_temp = 0
for n=1,100 do
    local message = subscriber:recv()
    local zipcode, temperature, relhumidity = message:match("([%d-]*) ([%d-]*) ([%d-]*)")
    total_temp = total_temp + temperature
    update_nbr = update_nbr + 1
end
print(string.format("Average temperature for zipcode '%s' was %dF, total = %d",
filter, (total_temp / update_nbr), total_temp))
subscriber:close()
context:term()
wuclient: Weather update client in Node.js
// weather update client in node.js
// connects SUB socket to tcp://localhost:5556
// collects weather updates and finds avg temp in zipcode
var zmq = require('zeromq');
console.log("Collecting updates from weather server...");
// Socket to talk to server
var subscriber = zmq.socket('sub');
// Subscribe to zipcode, default is NYC, 10001
var filter = null;
if (process.argv.length > 2) {
filter = process.argv[2];
} else {
filter = "10001";
}
console.log(filter);
subscriber.subscribe(filter);
// process 100 updates
var total_temp = 0
, temps = 0;
subscriber.on('message', function(data) {
var pieces = data.toString().split(" ")
, zipcode = parseInt(pieces[0], 10)
, temperature = parseInt(pieces[1], 10)
, relhumidity = parseInt(pieces[2], 10);
temps += 1;
total_temp += temperature;
if (temps === 100) {
console.log([
"Average temperature for zipcode '",
filter,
"' was ",
(total_temp / temps).toFixed(2),
" F"].join(""));
total_temp = 0;
temps = 0;
}
});
subscriber.connect("tcp://localhost:5556");
wuclient: Weather update client in Objective-C
//
// Weather update client
// Connects SUB socket to tcp://localhost:5556
// Collects weather updates and finds avg temp in zipcode
//
#import "ZMQObjC.h"
int main(int argc, const char *argv[])
{
NSAutoreleasePool *pool = [[NSAutoreleasePool alloc] init];
ZMQContext *ctx = [[[ZMQContext alloc] initWithIOThreads:1U] autorelease];
// Socket to talk to server
ZMQSocket *subscriber = [ctx socketWithType:ZMQ_SUB];
if (![subscriber connectToEndpoint:@"tcp://localhost:5556"]) {
/* ZMQSocket will already have logged the error. */
return EXIT_FAILURE;
}
/* Subscribe to zipcode (defaults to NYC, 10001). */
const char *kNYCZipCode = "10001";
const char *filter = (argc > 1)? argv[1] : kNYCZipCode;
NSData *filterData = [NSData dataWithBytes:filter length:strlen(filter)];
[subscriber setData:filterData forOption:ZMQ_SUBSCRIBE];
/* Write to stdout immediately rather than at each newline.
* This makes the incremental temperatures appear incrementally.
*/
(void)setvbuf(stdout, NULL, _IONBF, BUFSIZ);
/* Process updates. */
NSLog(@"Collecting temperatures for zipcode %s from weather server...", filter);
const int kMaxUpdate = 100;
long total_temp = 0;
for (int update_nbr = 0; update_nbr < kMaxUpdate; ++update_nbr) {
NSAutoreleasePool *pool = [[NSAutoreleasePool alloc] init];
NSData *msg = [subscriber receiveDataWithFlags:0];
const char *string = [msg bytes];
int zipcode = 0, temperature = 0, relhumidity = 0;
(void)sscanf(string, "%d %d %d", &zipcode, &temperature, &relhumidity);
printf("%d ", temperature);
total_temp += temperature;
[pool drain];
}
/* End line of temperatures. */
putchar('\n');
NSLog(@"Average temperature for zipcode '%s' was %ld degF.",
filter, total_temp / kMaxUpdate);
/* [ZMQContext sockets] makes it easy to close all associated sockets. */
[[ctx sockets] makeObjectsPerformSelector:@selector(close)];
[pool drain];
return EXIT_SUCCESS;
}
wuclient: Weather update client in Racket
#lang racket
#|
# Weather update client
# Connects SUB socket to tcp://localhost:5556
# Collects weather updates and finds avg temp in zipcode
|#
(require net/zmq)
; Socket to talk to server
(define ctxt (context 1))
(define sock (socket ctxt 'SUB))
(printf "Collecting updates from weather server...\n")
(socket-connect! sock "tcp://localhost:5556")
; Subscribe to zipcode, default is NYC, 10001
(define filter
  (command-line #:program "wuclient"
                #:args maybe-zip
                (match maybe-zip
                  [(list zipcode) zipcode]
                  [(list) "10001"]
                  [else (error 'wuclient "Incorrect argument list")])))
(set-socket-option! sock 'SUBSCRIBE (string->bytes/utf-8 filter))
; Process 5 updates
(define how-many 5)
(define total-temp
  (for/fold ([tot 0])
            ([i (in-range how-many)])
    (match-define (regexp #rx"([0-9]+) (-?[0-9]+) ([0-9]+)" (list _ zipcode temp humid))
      (socket-recv! sock))
    (+ tot (string->number (bytes->string/utf-8 temp)))))
(printf "Average temperature for zipcode '~a' was ~a\n" filter (/ total-temp how-many))
(context-close! ctxt)
wuclient: Weather update client in Ruby
#!/usr/bin/env ruby
#
# Weather update client in Ruby
# Connects SUB socket to tcp://localhost:5556
# Collects weather updates and finds avg temp in zipcode
#
require 'rubygems'
require 'ffi-rzmq'
COUNT = 100
context = ZMQ::Context.new(1)
# Socket to talk to server
puts "Collecting updates from weather server..."
subscriber = context.socket(ZMQ::SUB)
subscriber.connect("tcp://localhost:5556")
# Subscribe to zipcode, default is NYC, 10001
filter = ARGV.size > 0 ? ARGV[0] : "10001 "
subscriber.setsockopt(ZMQ::SUBSCRIBE, filter)
# Process 100 updates
total_temp = 0
1.upto(COUNT) do |update_nbr|
  s = ''
  subscriber.recv_string(s)#.split.map(&:to_i)
  zipcode, temperature, relhumidity = s.split.map(&:to_i)
  total_temp += temperature
end
puts "Average temperature for zipcode '#{filter}' was #{total_temp / COUNT}F"
wuclient: Weather update client in Rust
use std::env;
fn atoi(s: &str) -> i64 {
    s.parse().unwrap()
}
fn main() {
    println!("Collecting updates from weather server...");
    let context = zmq::Context::new();
    let subscriber = context.socket(zmq::SUB).unwrap();
    assert!(subscriber.connect("tcp://localhost:5556").is_ok());
    let args: Vec<String> = env::args().collect();
    let filter = if args.len() > 1 { &args[1] } else { "10001" };
    assert!(subscriber.set_subscribe(filter.as_bytes()).is_ok());
    let mut total_temp = 0;
    for _ in 0..100 {
        let string = subscriber.recv_string(0).unwrap().unwrap();
        let chks: Vec<i64> = string.split(' ').map(atoi).collect();
        let (_zipcode, temperature, _relhumidity) = (chks[0], chks[1], chks[2]);
        total_temp += temperature;
    }
    println!("Average temperature for zipcode {} was {}F", filter, total_temp / 100);
}
wuclient: Weather update client in Scala
/*
* Weather update client in Scala
* Connects SUB socket to tcp://localhost:5556
* Collects weather updates and finds avg temp in zipcode
*
* @author Giovanni Ruggiero
* @email giovanni.ruggiero@gmail.com
 */
import java.util.StringTokenizer
import org.zeromq.ZMQ
object wuclient {
  def main(args : Array[String]) {
    val context = ZMQ.context(1)
    // Socket to talk to server
    println("Collecting updates from weather server...")
    val subscriber = context.socket(ZMQ.SUB)
    subscriber.connect("tcp://localhost:5556")
    // Subscribe to zipcode, default is NYC, 10001
    val filter = { if (args.length > 0) args(0) else "10001 " }
    subscriber.subscribe(filter.getBytes())
    // Process 100 updates
    val update_nbr = 100
    var total_temp = 0
    for (i <- 1 to update_nbr) {
      // Use trim to remove the trailing '\0' character
      val sscanf = new String(subscriber.recv(0)).trim.split(' ').map(_.toInt)
      val zipcode = sscanf(0)
      val temperature = sscanf(1)
      val relhumidity = sscanf(2)
      total_temp += temperature
    }
    println("Average temperature for zipcode '" + filter + "' was " + (total_temp / update_nbr))
  }
}
wuclient: Weather update client in Tcl
#
# Weather update client
# Connects SUB socket to tcp://localhost:5556
# Collects weather updates and finds avg temp in zipcode
#
package require zmq
# Socket to talk to server
zmq context context
zmq socket subscriber context SUB
subscriber connect "tcp://localhost:5556"
# Subscribe to zipcode, default is NYC, 10001
if {[llength $argv]} {
    set filter [lindex $argv 0]
} else {
    set filter "10001"
}
subscriber setsockopt SUBSCRIBE $filter
# Process 100 updates
set total_temp 0
for {set update_nbr 0} {$update_nbr < 100} {incr update_nbr} {
    zmq message msg
    subscriber recv_msg msg
    lassign [msg data] zipcode temperature relhumidity
    puts [msg data]
    msg close
    incr total_temp $temperature
}
puts "Average temperature for zipcode $filter was [expr {$total_temp/$update_nbr}]F"
subscriber close
context term
wuclient: Weather update client in OCaml
(**
Weather update client
Connects SUB socket to tcp://localhost:5556
Collects weather updates and finds avg temp in zipcode
*)
open Zmq
open Helpers
let () =
  with_context @@ fun ctx ->
  with_socket ctx Socket.sub @@ fun sub ->
  printfn "Collecting updates from weather server...";
  Socket.connect sub "tcp://localhost:5556";
  (* Subscribe to zipcode, default is NYC, 10001 *)
  let filter = match Array.to_list Sys.argv with _::zip::_ -> zip | _ -> "10001 " in
  Socket.subscribe sub filter;
  (* Process 100 updates *)
  let total_temp = ref 0 in
  for i = 0 to pred 100 do
    Scanf.sscanf (Socket.recv sub) "%d %d %d" begin fun _zipcode temperature _relhumidity ->
      total_temp := !total_temp + temperature
    end
  done;
  printfn "Average temperature for zipcode '%s' was %dF" filter (!total_temp / 100)
Note that when you use a SUB socket you must set a subscription using zmq_setsockopt() and SUBSCRIBE, as in this code. If you don’t set any subscription, you won’t get any messages. It’s a common mistake for beginners. The subscriber can set many subscriptions, which are added together. That is, if an update matches ANY subscription, the subscriber receives it. The subscriber can also cancel specific subscriptions. A subscription is often, but not always, a printable string. See zmq_setsockopt() for how this works.
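To make the matching rule concrete, here is a small model in plain Python. It is only a sketch, not ZeroMQ code: the function name `matches` and the sample messages are invented for illustration, and it assumes nothing beyond the documented behaviour that a subscription is a byte prefix and that an empty subscription matches every message.

```python
def matches(subscriptions, message):
    """Model of SUB matching: True if any subscription is a prefix of the message."""
    return any(message.startswith(prefix) for prefix in subscriptions)


if __name__ == "__main__":
    update = b"10001 72 45"
    print(matches([], update))                      # no subscription set: nothing arrives
    print(matches([b"10001 "], update))             # one matching prefix
    print(matches([b"90210 ", b"10001 "], update))  # subscriptions are OR-ed together
    print(matches([b"1001"], b"10012 60 30"))       # a short prefix matches several zip codes
```

The last call shows why some of the clients above subscribe to "10001 " with a trailing space: a prefix shorter than the full zip code field would also match neighbouring zip codes.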
The PUB-SUB socket pair is asynchronous. The client does zmq_recv(), in a loop (or once if that’s all it needs). Trying to send a message to a SUB socket will cause an error. Similarly, the service does zmq_send() as often as it needs to, but must not do zmq_recv() on a PUB socket.
In theory with ZeroMQ sockets, it does not matter which end connects and which end binds. However, in practice there are undocumented differences that I’ll come to later. For now, bind the PUB and connect the SUB, unless your network design makes that impossible.
There is one more important thing to know about PUB-SUB sockets: you do not know precisely when a subscriber starts to get messages. Even if you start a subscriber, wait a while, and then start the publisher, the subscriber will always miss the first messages that the publisher sends. This is because as the subscriber connects to the publisher (something that takes a small but non-zero time), the publisher may already be sending messages out.
This “slow joiner” symptom hits enough people often enough that we’re going to explain it in detail. Remember that ZeroMQ does asynchronous I/O, i.e., in the background. Say you have two nodes doing this, in this order:
Subscriber connects to an endpoint and receives and counts messages.
Publisher binds to an endpoint and immediately sends 1,000 messages.
Then the subscriber will most likely not receive anything. You’ll blink, check that you set a correct filter and try again, and the subscriber will still not receive anything.
Making a TCP connection involves back-and-forth handshaking that takes several milliseconds, depending on your network and the number of hops between peers. In that time, ZeroMQ can send many messages. For the sake of argument, assume it takes 5 msecs to establish a connection, and that the same link can handle 1M messages per second. During the 5 msecs that the subscriber is connecting to the publisher, it takes the publisher only 1 msec to send out those 1K messages.
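The arithmetic in that argument can be checked with a few lines of Python. This is a back-of-the-envelope sketch under the same assumptions as the text (a fixed 5 msec connection time, a steady 1M messages per second, and a publisher that drops whatever it sends before the subscriber is connected); the function names are hypothetical.

```python
def burst_duration_ms(burst, rate_per_sec):
    """Milliseconds the publisher needs to send `burst` messages at a steady rate."""
    return burst * 1000 / rate_per_sec


def messages_lost(burst, rate_per_sec, connect_ms):
    """Messages sent, and therefore dropped, before the subscriber is connected."""
    sent_while_connecting = rate_per_sec * connect_ms // 1000
    return min(burst, sent_while_connecting)


if __name__ == "__main__":
    print(burst_duration_ms(1000, 1_000_000))   # time to send the whole burst
    print(messages_lost(1000, 1_000_000, 5))    # how much of the burst is gone
```

With these numbers the whole burst is over after 1 msec, well inside the 5 msec connection window, so the model predicts that all 1,000 messages are lost.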
In Chapter 2 - Sockets and Patterns we’ll explain how to synchronize a publisher and subscribers so that you don’t start to publish data until the subscribers really are connected and ready. There is a simple and stupid way to delay the publisher, which is to sleep. Don’t do this in a real application, though, because it is extremely fragile as well as inelegant and slow. Use sleeps to prove to yourself what’s happening, and then wait for Chapter 2 - Sockets and Patterns to see how to do this right.
The alternative to synchronization is to simply assume that the published data stream is infinite and has no start and no end. One also assumes that the subscriber doesn’t care what transpired before it started up. This is how we built our weather client example.
So the client subscribes to its chosen zip code and collects 100 updates for that zip code. That means about ten million updates from the server, if zip codes are randomly distributed. You can start the client, and then the server, and the client will keep working. You can stop and restart the server as often as you like, and the client will keep working. When the client has collected its hundred updates, it calculates the average, prints it, and exits.
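The "about ten million" figure follows from the 100,000 possible zip codes the server draws from. The sketch below, with hypothetical function names, gives the closed-form estimate and a scaled-down simulation that counts how many random updates a publisher emits before one chosen zip code has come up the required number of times. It assumes uniformly random zip codes, as the text does.

```python
import random


def expected_published(updates_wanted, zipcode_count):
    """Average number of updates published before `updates_wanted` of them match."""
    return updates_wanted * zipcode_count


def simulate(updates_wanted, zipcode_count, wanted_zip=1, seed=0):
    """Count published updates until `wanted_zip` has appeared `updates_wanted` times."""
    rng = random.Random(seed)
    published = matched = 0
    while matched < updates_wanted:
        published += 1
        if rng.randrange(zipcode_count) == wanted_zip:
            matched += 1
    return published


if __name__ == "__main__":
    print(expected_published(100, 100_000))   # the estimate used in the text
    print(simulate(10, 50))                   # a small run; varies with the seed
```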
Some points about the publish-subscribe (pub-sub) pattern:
A subscriber can connect to more than one publisher, using one connect call each time. Data will then arrive and be interleaved (“fair-queued”) so that no single publisher drowns out the others.
If a publisher has no connected subscribers, then it will simply drop all messages.
If you’re using TCP and a subscriber is slow, messages will queue up on the publisher. We’ll look at how to protect publishers against this using the “high-water mark” later.
From ZeroMQ v3.x, filtering happens at the publisher side when using a connected protocol (tcp:// or ipc://). Using the epgm:// protocol, filtering happens at the subscriber side. In ZeroMQ v2.x, all filtering happened at the subscriber side.
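The fair-queuing in the first point can be pictured as a round-robin over one incoming queue per publisher. The following is a simplified model in plain Python, not ZeroMQ's implementation: it assumes all messages are already waiting in their queues, and the name `fair_queue` is made up for this sketch.

```python
from collections import deque


def fair_queue(*streams):
    """Interleave messages by taking one from each non-empty stream in turn."""
    queues = [deque(stream) for stream in streams]
    delivered = []
    while any(queues):          # an empty deque is falsy
        for q in queues:
            if q:
                delivered.append(q.popleft())
    return delivered


if __name__ == "__main__":
    busy = ["a1", "a2", "a3"]
    quiet = ["b1"]
    print(fair_queue(busy, quiet))
```

In this model the quiet publisher's single message is delivered second, after one message from the busy publisher, rather than waiting behind the busy publisher's whole backlog.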
This is how long it takes to receive and filter 10M messages on my laptop, which is a 2011-era Intel i5, decent but nothing special:
$ time wuclient
Collecting updates from weather server...
Average temperature for zipcode '10001 ' was 28F
real 0m4.470s
user 0m0.000s
sys 0m0.008s
As a final example (you are surely getting tired of juicy code and want to delve back into philological discussions about comparative abstractive norms), let’s do a little supercomputing. Then coffee. Our supercomputing application is a fairly typical parallel processing model. We have:
A ventilator that produces tasks that can be done in parallel
A set of workers that process tasks
A sink that collects results back from the worker processes
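Before looking at the ZeroMQ version, the shape of this model can be sketched with Python threads and in-process queues. This is an analogy only: `queue.Queue` stands in for the PUSH and PULL sockets, the `None` shutdown marker is an addition that the examples in this section do not use, and the name `run_pipeline` is hypothetical.

```python
import queue
import threading


def run_pipeline(workloads, n_workers=3):
    """Fan tasks out to workers and collect every result at the sink."""
    tasks = queue.Queue()     # ventilator -> workers
    results = queue.Queue()   # workers -> sink

    def worker():
        while True:
            msec = tasks.get()
            if msec is None:          # shutdown marker (assumption of this sketch)
                break
            # A real worker would do the work here, e.g. sleep for `msec` milliseconds.
            results.put(msec)

    threads = [threading.Thread(target=worker) for _ in range(n_workers)]
    for t in threads:
        t.start()
    for msec in workloads:            # the ventilator hands out tasks
        tasks.put(msec)
    for _ in threads:                 # one shutdown marker per worker
        tasks.put(None)
    for t in threads:
        t.join()
    # The sink: results arrive in no particular order, so sort them for display.
    return sorted(results.get() for _ in range(len(workloads)))


if __name__ == "__main__":
    print(run_pipeline([5, 1, 3]))
```

Each task is handled by exactly one worker, and the sink sees every result once, which is the property the ventilator, worker, and sink programs rely on.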
In reality, workers run on superfast boxes, perhaps using GPUs (graphics processing units) to do the hard math. Here is the ventilator. It generates 100 tasks, each a message telling the worker to sleep for some number of milliseconds:
// Task ventilator
// Binds PUSH socket to tcp://localhost:5557
// Sends batch of tasks to workers via that socket
#include "zhelpers.h"
int main (void)
{
void *context = zmq_ctx_new ();
// Socket to send messages on
void *sender = zmq_socket (context, ZMQ_PUSH);
zmq_bind (sender, "tcp://*:5557");
// Socket to send start of batch message on
void *sink = zmq_socket (context, ZMQ_PUSH);
zmq_connect (sink, "tcp://localhost:5558");
printf ("Press Enter when the workers are ready: ");
getchar ();
printf ("Sending tasks to workers...\n");
// The first message is "0" and signals start of batch
s_send (sink, "0");
// Initialize random number generator
srandom ((unsigned) time (NULL));
// Send 100 tasks
int task_nbr;
int total_msec = 0; // Total expected cost in msecs
for (task_nbr = 0; task_nbr < 100; task_nbr++) {
int workload;
// Random workload from 1 to 100msecs
workload = randof (100) + 1;
total_msec += workload;
char string [10];
sprintf (string, "%d", workload);
s_send (sender, string);
}
printf ("Total expected cost: %d msec\n", total_msec);
zmq_close (sink);
zmq_close (sender);
zmq_ctx_destroy (context);
return 0;
}
taskvent: Parallel task ventilator in C++
//
// Task ventilator in C++
// Binds PUSH socket to tcp://localhost:5557
// Sends batch of tasks to workers via that socket
//
#include <zmq.hpp>
#include <stdlib.h>
#include <stdio.h>
#include <string.h>
#include <time.h>
#include <unistd.h>
#include <iostream>
#define within(num) (int) ((float) num * random () / (RAND_MAX + 1.0))
int main (int argc, char *argv[])
{
zmq::context_t context (1);
// Socket to send messages on
zmq::socket_t sender(context, ZMQ_PUSH);
sender.bind("tcp://*:5557");
std::cout << "Press Enter when the workers are ready: " << std::endl;
getchar ();
std::cout << "Sending tasks to workers...\n" << std::endl;
// The first message is "0" and signals start of batch
zmq::socket_t sink(context, ZMQ_PUSH);
sink.connect("tcp://localhost:5558");
zmq::message_t message(2);
memcpy(message.data(), "0", 1);
sink.send(message);
// Initialize random number generator
srandom ((unsigned) time (NULL));
// Send 100 tasks
int task_nbr;
int total_msec = 0; // Total expected cost in msecs
for (task_nbr = 0; task_nbr < 100; task_nbr++) {
int workload;
// Random workload from 1 to 100msecs
workload = within (100) + 1;
total_msec += workload;
message.rebuild(10);
memset(message.data(), '\0', 10);
sprintf ((char *) message.data(), "%d", workload);
sender.send(message);
}
std::cout << "Total expected cost: " << total_msec << " msec" << std::endl;
sleep (1); // Give 0MQ time to deliver
return 0;
}
taskvent: Parallel task ventilator in Common Lisp
;;; -*- Mode:Lisp; Syntax:ANSI-Common-Lisp; -*-
;;;
;;; Task ventilator in Common Lisp
;;; Binds PUSH socket to tcp://localhost:5557
;;; Sends batch of tasks to workers via that socket
;;;
;;; Kamil Shakirov <kamils80@gmail.com>
;;;
(defpackage #:zguide.taskvent
  (:nicknames #:taskvent)
  (:use #:cl #:zhelpers)
  (:export #:main))
(in-package :zguide.taskvent)
(defun main ()
  (zmq:with-context (context 1)
    ;; Socket to send messages on
    (zmq:with-socket (sender context zmq:push)
      (zmq:bind sender "tcp://*:5557")
      (message "Press Enter when the workers are ready: ")
      (read-char)
      (message "Sending tasks to workers...~%")
      ;; The first message is "0" and signals start of batch
      (let ((msg (make-instance 'zmq:msg :data "0")))
        (zmq:send sender msg))
      ;; Send 100 tasks
      (let ((total-msec 0))
        (loop :repeat 100 :do
          ;; Random workload from 1 to 100 msecs
          (let ((workload (within 100)))
            (incf total-msec workload)
            (let ((msg (make-instance 'zmq:msg :data (format nil "~D" workload))))
              (zmq:send sender msg))))
        (message "Total expected cost: ~D msec~%" total-msec)
        ;; Give 0MQ time to deliver
        (sleep 1))))
  (cleanup))
taskvent: Parallel task ventilator in Delphi
program taskvent;
//
// Task ventilator
// Binds PUSH socket to tcp://localhost:5557
// Sends batch of tasks to workers via that socket
// @author Varga Balazs <bb.varga@gmail.com>
//
{$APPTYPE CONSOLE}
uses
SysUtils
, zmqapi
;
const
task_count = 100;
var
context: TZMQContext;
sender,
sink: TZMQSocket;
s: String;
i,
total_msec,
workload: Integer;
begin
context := TZMQContext.Create;
// Socket to send messages on
sender := Context.Socket( stPush );
sender.bind( 'tcp://*:5557' );
// Socket to send start of batch message on
sink := Context.Socket( stPush );
sink.connect( 'tcp://localhost:5558' );
Write( 'Press Enter when the workers are ready: ' );
Readln( s );
Writeln( 'Sending tasks to workers...' );
// The first message is "0" and signals start of batch
sink.send( '0' );
// Initialize random number generator
randomize;
// Send 100 tasks
total_msec := 0; // Total expected cost in msecs
for i := 0 to task_count - 1 do
begin
// Random workload from 1 to 100msecs
workload := Random( 100 ) + 1;
total_msec := total_msec + workload;
s := IntToStr( workload );
sender.send( s );
end;
Writeln( Format( 'Total expected cost: %d msec', [total_msec] ) );
sleep(1000); // Give 0MQ time to deliver
sink.Free;
sender.Free;
Context.Free;
end.
taskvent: Parallel task ventilator in Erlang
#! /usr/bin/env escript
%%
%% Task ventilator
%% Binds PUSH socket to tcp://localhost:5557
%% Sends batch of tasks to workers via that socket
%%
main(_Args) ->
application:start(chumak),
{ok, Sender} = chumak:socket(push),
case chumak:bind(Sender, tcp, "*", 5557) of
{ok, _BindPid} ->
io:format("Binding OK with Pid: ~p\n", [Sender]);
{error, Reason} ->
io:format("Connection Failed for this reason: ~p\n", [Reason]);
X ->
io:format("Unhandled reply for bind ~p\n", [X])
end,
{ok, Sink} = chumak:socket(push),
case chumak:connect(Sink, tcp, "localhost", 5558) of
{ok, _ConnectPid} ->
io:format("Connection OK with Pid: ~p\n", [Sink]);
{error, Reason2} ->
io:format("Connection Failed for this reason: ~p\n", [Reason2]);
X2 ->
io:format("Unhandled reply for connect ~p\n", [X2])
end,
{ok, _} = io:fread("Press Enter when workers are ready: ", ""),
io:format("Sending task to workers~n", []),
ok = chumak:send(Sink, <<"0">>),
%% Send 100 tasks
TotalCost = send_tasks(Sender, 100, 0),
io:format("Total expected cost: ~b msec~n", [TotalCost]).
send_tasks(_Sender, 0, TotalCost) -> TotalCost;
send_tasks(Sender, N, TotalCost) when N > 0 ->
Workload = rand:uniform(100) + 1,
ok = chumak:send(Sender, list_to_binary(integer_to_list(Workload))),
send_tasks(Sender, N-1, TotalCost + Workload).
taskvent: Parallel task ventilator in Elixir
defmodule Taskvent do
@moduledoc """
Generated by erl2ex (http://github.com/dazuma/erl2ex)
From Erlang source: (Unknown source file)
At: 2019-12-20 13:57:36
"""
def main() do
{:ok, context} = :erlzmq.context()
{:ok, sender} = :erlzmq.socket(context, :push)
:ok = :erlzmq.bind(sender, 'tcp://*:5557')
{:ok, sink} = :erlzmq.socket(context, :push)
:ok = :erlzmq.connect(sink, 'tcp://localhost:5558')
{:ok, _} = :io.fread('Press Enter when workers are ready: ', [])
:io.format('Sending task to workers~n', [])
:ok = :erlzmq.send(sink, "0")
totalCost = send_tasks(sender, 100, 0)
:io.format('Total expected cost: ~b msec~n', [totalCost])
:ok = :erlzmq.close(sink)
:ok = :erlzmq.close(sender)
:erlzmq.term(context, 1000)
end
def send_tasks(_sender, 0, totalCost) do
totalCost
end
def send_tasks(sender, n, totalCost) when n > 0 do
workload = :random.uniform(100) + 1
:ok = :erlzmq.send(sender, :erlang.list_to_binary(:erlang.integer_to_list(workload)))
send_tasks(sender, n - 1, totalCost + workload)
end
end
Taskvent.main
taskvent: Parallel task ventilator in Felix
//
// Task ventilator
// Binds PUSH socket to tcp://localhost:5557
// Sends batch of tasks to workers via that socket
open ZMQ;
var context = zmq_init 1;
// Socket to send messages on
var sender = context.mk_socket ZMQ_PUSH;
sender.bind "tcp://*:5557";
// Socket to send start of batch message on
var sink = context.mk_socket ZMQ_PUSH;
sink.connect "tcp://localhost:5558";
print ("Press Enter when the workers are ready: "); fflush;
C_hack::ignore (readln stdin);
println "Sending tasks to workers...";
// The first message is "0" and signals start of batch
sink.send_string "0";
// Send 100 tasks
var total_msec = 0; // Total expected cost in msecs
for var task_nbr in 0 upto 99 do
// Random workload from 1 to 100msecs
var workload = #rand % 100 + 1;
total_msec += workload;
var s = f"%d" workload;
sender.send_string s;
done
println$ f"Total expected cost: %d msec" total_msec;
Faio::sleep (sys_clock, 1.0); // Give 0MQ time to deliver
sink.close;
sender.close;
context.term;
taskvent: Parallel task ventilator in Go
//
// Task ventilator
// Binds PUSH socket to tcp://localhost:5557
// Sends batch of tasks to workers via that socket
//
package main
import (
"fmt"
zmq "github.com/alecthomas/gozmq"
"math/rand"
"time"
)
func main() {
context, _ := zmq.NewContext()
defer context.Close()
// Socket to send messages on
sender, _ := context.NewSocket(zmq.PUSH)
defer sender.Close()
sender.Bind("tcp://*:5557")
// Socket to send start of batch message on
sink, _ := context.NewSocket(zmq.PUSH)
defer sink.Close()
sink.Connect("tcp://localhost:5558")
fmt.Print("Press Enter when the workers are ready: ")
var line string
fmt.Scanln(&line)
fmt.Println("Sending tasks to workers…")
sink.Send([]byte("0"), 0)
// Seed the random number generator
rand.Seed(time.Now().UnixNano())
total_msec := 0
for i := 0; i < 100; i++ {
workload := rand.Intn(100) + 1 // Random workload from 1 to 100 msecs
total_msec += workload
msg := fmt.Sprintf("%d", workload)
sender.Send([]byte(msg), 0)
}
fmt.Printf("Total expected cost: %d msec\n", total_msec)
time.Sleep(1e9) // Give 0MQ time to deliver: one second == 1e9 ns
}
taskvent: Parallel task ventilator in Haskell
{-# LANGUAGE OverloadedStrings #-}
{-# LANGUAGE ScopedTypeVariables #-}

-- Task ventilator
-- Binds PUSH socket to tcp://localhost:5557
-- Sends batch of tasks to workers via that socket

module Main where

import Control.Monad
import qualified Data.ByteString.Char8 as BS
import System.ZMQ4.Monadic
import System.Random

main :: IO ()
main = runZMQ $ do
    -- Socket to send messages on
    sender <- socket Push
    bind sender "tcp://*:5557"
    -- Socket to send start of batch message on
    sink <- socket Push
    connect sink "tcp://localhost:5558"
    liftIO $ do
        putStrLn "Press Enter when the workers are ready: "
        _ <- getLine
        putStrLn "Sending tasks to workers..."
    -- The first message is "0" and signals start of batch
    send sink [] "0"
    -- Send 100 tasks
    total_msec <- fmap sum $
        replicateM 100 $ do
            -- Random workload from 1 to 100msecs
            workload :: Int <- liftIO $ randomRIO (1, 100)
            send sender [] $ BS.pack (show workload)
            return workload
    liftIO . putStrLn $ "Total expected cost: " ++ show total_msec ++ " msec"
taskvent: Parallel task ventilator in Haxe
package ;
import haxe.io.Bytes;
import haxe.Stack;
import neko.Lib;
import neko.io.File;
import neko.io.FileInput;
import neko.Sys;
import org.zeromq.ZMQ;
import org.zeromq.ZMQContext;
import org.zeromq.ZMQException;
import org.zeromq.ZMQSocket;
/**
* Task ventilator in Haxe
* Binds PUSH socket to tcp://localhost:5557
* Sends batch of tasks to workers via that socket.
*
* Based on code from: http://zguide.zeromq.org/java:taskvent
*
* Use with TaskWork.hx and TaskSink.hx
*/
class TaskVent
{
public static function main() {
try {
var context:ZMQContext = ZMQContext.instance();
var sender:ZMQSocket = context.socket(ZMQ_PUSH);
Lib.println("** TaskVent (see: http://zguide.zeromq.org/page:all#Divide-and-Conquer)");
sender.bind("tcp://127.0.0.1:5557");
Lib.println("Press Enter when the workers are ready: ");
var f:FileInput = File.stdin();
var str:String = f.readLine();
Lib.println("Sending tasks to workers ...\n");
// The first message is "0" and signals start of batch
sender.sendMsg(Bytes.ofString("0"));
// Send 100 tasks
var totalMsec:Int = 0; // Total expected cost in msec
for (task_nbr in 0 ... 100) {
var workload = Std.random(100) + 1; // Generates 1 to 100 msecs
totalMsec += workload;
Lib.print(workload + ".");
sender.sendMsg(Bytes.ofString(Std.string(workload)));
}
Lib.println("Total expected cost: " + totalMsec + " msec");
// Give 0MQ time to deliver
Sys.sleep(1);
sender.close();
context.term();
} catch (e:ZMQException) {
trace("ZMQException #:" + ZMQ.errNoToErrorType(e.errNo) + ", str:" + e.str());
trace (Stack.toString(Stack.exceptionStack()));
}
}
}
taskvent: Parallel task ventilator in Java
package guide;
import java.util.Random;
import org.zeromq.SocketType;
import org.zeromq.ZMQ;
import org.zeromq.ZContext;
//
// Task ventilator in Java
// Binds PUSH socket to tcp://localhost:5557
// Sends batch of tasks to workers via that socket
//
public class taskvent
{
public static void main(String[] args) throws Exception
{
try (ZContext context = new ZContext()) {
// Socket to send messages on
ZMQ.Socket sender = context.createSocket(SocketType.PUSH);
sender.bind("tcp://*:5557");
// Socket to send start of batch message on
ZMQ.Socket sink = context.createSocket(SocketType.PUSH);
sink.connect("tcp://localhost:5558");
System.out.println("Press Enter when the workers are ready: ");
System.in.read();
System.out.println("Sending tasks to workers\n");
// The first message is "0" and signals start of batch
sink.send("0", 0);
// Initialize random number generator
Random srandom = new Random(System.currentTimeMillis());
// Send 100 tasks
int task_nbr;
int total_msec = 0; // Total expected cost in msecs
for (task_nbr = 0; task_nbr < 100; task_nbr++) {
int workload;
// Random workload from 1 to 100msecs
workload = srandom.nextInt(100) + 1;
total_msec += workload;
System.out.print(workload + ".");
String string = String.format("%d", workload);
sender.send(string, 0);
}
System.out.println("Total expected cost: " + total_msec + " msec");
Thread.sleep(1000); // Give 0MQ time to deliver
}
}
}
taskvent: Parallel task ventilator in Julia
#!/usr/bin/env julia
#
# Task ventilator
# Binds PUSH socket to tcp://localhost:5557
# Sends batch of tasks to workers via that socket
#
using ZMQ
using Random: seed!
context = Context()
sender = Socket(context, PUSH)
bind(sender, "tcp://*:5557")
# Socket with direct access to the sink: used to synchronize start of batch
sink = Socket(context, PUSH)
connect(sink, "tcp://localhost:5558")
println("Press Enter when the workers are ready: ")
_ = readline(stdin)
println("Sending tasks to workers...")
# The first message is "0" and signals start of batch
send(sink, 0x30)
# Initialize random number generator
seed!(1)
# Send 100 tasks
total_msec = 0
for task_nbr in 1:100
global total_msec
# Random workload from 1 to 100 msecs
workload = rand(1:100)
total_msec += workload
send(sender, "$workload")
end
println("Total expected cost: $total_msec msec")
# Give 0MQ time to deliver
sleep(1)
# Making a clean exit.
close(sender)
close(sink)
close(context)
taskvent: Parallel task ventilator in Lua
--
-- Task ventilator
-- Binds PUSH socket to tcp://localhost:5557
-- Sends batch of tasks to workers via that socket
--
-- Author: Robert G. Jakabosky <bobby@sharedrealm.com>
--
require"zmq"
require"zhelpers"
local context = zmq.init(1)
-- Socket to send messages on
local sender = context:socket(zmq.PUSH)
sender:bind("tcp://*:5557")
printf ("Press Enter when the workers are ready: ")
io.read('*l')
printf ("Sending tasks to workers...\n")
-- The first message is "0" and signals start of batch
sender:send("0")
-- Initialize random number generator
math.randomseed(os.time())
-- Send 100 tasks
local task_nbr
local total_msec = 0 -- Total expected cost in msecs
for task_nbr=0,99 do
local workload
-- Random workload from 1 to 100msecs
workload = randof (100) + 1
total_msec = total_msec + workload
local msg = string.format("%d", workload)
sender:send(msg)
end
printf ("Total expected cost: %d msec\n", total_msec)
s_sleep (1000) -- Give 0MQ time to deliver
sender:close()
context:term()
taskvent: Parallel task ventilator in Node.js
// Task ventilator in node.js
// Binds PUSH socket to tcp://localhost:5557
// Sends batch of tasks to workers via that socket.
var zmq = require('zeromq');
process.stdin.resume();
process.stdin.setRawMode(true);
// Socket to send messages on
var sender = zmq.socket('push');
sender.bindSync("tcp://*:5557");
var sink = zmq.socket('push');
sink.connect('tcp://localhost:5558');
var i = 0
, total_msec = 0;
function work() {
console.log("Sending tasks to workers...");
// The first message is "0" and signals start of batch
sink.send("0");
// send 100 tasks
while (i < 100) {
var workload = Math.abs(Math.round(Math.random() * 100)) + 1;
total_msec += workload;
process.stdout.write(workload.toString() + ".");
sender.send(workload.toString());
i++;
}
console.log("Total expected cost:", total_msec, "msec");
sink.close();
sender.close();
process.exit();
};
console.log("Press enter when the workers are ready...");
process.stdin.on("data", function() {
if (i === 0) {
work();
}
process.stdin.pause();
});
taskvent: Parallel task ventilator in Objective-C
/* Task ventilator - sends task batch to workers via PUSH socket. */
#import <Foundation/Foundation.h>
#import "ZMQObjC.h"
#import "ZMQHelper.h"
int main(void)
{
NSAutoreleasePool *pool = [[NSAutoreleasePool alloc] init];
ZMQContext *ctx = [[[ZMQContext alloc] initWithIOThreads:1U] autorelease];
ZMQSocket *sender = [ctx socketWithType:ZMQ_PUSH];
[sender bindToEndpoint:@"tcp://*:5557"];
NSLog(@"Press Enter when the workers are ready: ");
(void)getchar();
NSLog(@"Sending tasks to workers...");
/* Signal batch start with message of "0". */
NSData *signalData = [@"0" dataUsingEncoding:NSUTF8StringEncoding];
[sender sendData:signalData withFlags:0];
/* Initialize random number generator. */
(void)srandom((unsigned)time(NULL));
/* Send kTaskCount tasks. */
unsigned long totalMsec = 0UL;
static const int kTaskCount = 100;
for (int task = 0; task < kTaskCount; ++task) {
/* Random workload from 1 to 100 msec. */
int workload = within(100) + 1;
totalMsec += workload;
NSString *text = [NSString stringWithFormat:@"%d", workload];
NSData *textData = [text dataUsingEncoding:NSUTF8StringEncoding];
[sender sendData:textData withFlags:0];
}
NSLog(@"Total expected cost: %lu ms", totalMsec);
/* Let IOThreads finish sending. */
sleep(1);
[sender close];
[pool drain];
return EXIT_SUCCESS;
}
taskvent: Parallel task ventilator in Perl
# Task ventilator
# Binds PUSH socket to tcp://localhost:5557
# Sends batch of tasks to workers via that socket
use strict;
use warnings;
use v5.10;
use ZMQ::FFI;
use ZMQ::FFI::Constants qw(ZMQ_PUSH);
my $ctx = ZMQ::FFI->new();
# Socket to send messages on
my $sender = $ctx->socket(ZMQ_PUSH);
$sender->bind('tcp://*:5557');
# Socket to send start of batch message on
my $sink = $ctx->socket(ZMQ_PUSH);
$sink->connect('tcp://localhost:5558');
say "Press Enter when the workers are ready: ";
<STDIN>;
say "Sending tasks to workers...";
# The first message is "0" and signals start of batch
$sink->send('0');
# Send 100 tasks
my $total_msec = 0;
my $workload;
for (1..100) {
$workload = int(rand(100) + 1);
$total_msec += $workload;
$sender->send($workload);
}
say "Total expected cost: $total_msec msec";
taskvent: Parallel task ventilator in PHP
<?php
/*
* Task ventilator
* Binds PUSH socket to tcp://localhost:5557
* Sends batch of tasks to workers via that socket
* @author Ian Barber <ian(dot)barber(at)gmail(dot)com>
*/
$context = new ZMQContext();
// Socket to send messages on
$sender = new ZMQSocket($context, ZMQ::SOCKET_PUSH);
$sender->bind("tcp://*:5557");
echo "Press Enter when the workers are ready: ";
$fp = fopen('php://stdin', 'r');
$line = fgets($fp, 512);
fclose($fp);
echo "Sending tasks to workers...", PHP_EOL;
// The first message is "0" and signals start of batch
$sender->send(0);
// Send 100 tasks
$total_msec = 0; // Total expected cost in msecs
for ($task_nbr = 0; $task_nbr < 100; $task_nbr++) {
// Random workload from 1 to 100msecs
$workload = mt_rand(1, 100);
$total_msec += $workload;
$sender->send($workload);
}
printf ("Total expected cost: %d msec\n", $total_msec);
sleep (1); // Give 0MQ time to deliver
taskvent: Parallel task ventilator in Python
# Task ventilator
# Binds PUSH socket to tcp://localhost:5557
# Sends batch of tasks to workers via that socket
#
# Author: Lev Givon <lev(at)columbia(dot)edu>
import zmq
import random
import time
context = zmq.Context()
# Socket to send messages on
sender = context.socket(zmq.PUSH)
sender.bind("tcp://*:5557")
# Socket with direct access to the sink: used to synchronize start of batch
sink = context.socket(zmq.PUSH)
sink.connect("tcp://localhost:5558")
print("Press Enter when the workers are ready: ")
_ = input()
print("Sending tasks to workers...")
# The first message is "0" and signals start of batch
sink.send(b'0')
# Initialize random number generator
random.seed()
# Send 100 tasks
total_msec = 0
for task_nbr in range(100):
    # Random workload from 1 to 100 msecs
    workload = random.randint(1, 100)
    total_msec += workload
    sender.send_string(f"{workload}")
print(f"Total expected cost: {total_msec} msec")
# Give 0MQ time to deliver
time.sleep(1)
taskvent: Parallel task ventilator in Ruby
#!/usr/bin/env ruby
#
# Task ventilator in Ruby
# Binds PUSH socket to tcp://localhost:5557
# Sends batch of tasks to workers via that socket
#
require 'rubygems'
require 'ffi-rzmq'
context = ZMQ::Context.new(1)
# Socket to send messages on
sender = context.socket(ZMQ::PUSH)
sender.bind("tcp://*:5557")
# Socket to send start of batch message on
sink = context.socket(ZMQ::PUSH)
sink.connect("tcp://localhost:5558")
puts "Press enter when the workers are ready..."
$stdin.read(1)
puts "Sending tasks to workers..."
# The first message is "0" and signals start of batch
sink.send_string('0')
# Send 100 tasks
total_msec = 0 # Total expected cost in msecs
100.times do
workload = rand(100) + 1
total_msec += workload
$stdout << "#{workload}."
sender.send_string(workload.to_s)
end
puts "Total expected cost: #{total_msec} msec"
Kernel.sleep(1) # Give 0MQ time to deliver
taskvent: Parallel task ventilator in Rust
use rand::Rng;
use std::io::{self, BufRead};
use std::{thread, time};

fn main() {
    let context = zmq::Context::new();
    let sender = context.socket(zmq::PUSH).unwrap();
    assert!(sender.bind("tcp://*:5557").is_ok());
    let sink = context.socket(zmq::PUSH).unwrap();
    assert!(sink.connect("tcp://localhost:5558").is_ok());
    println!("Press Enter when the workers are ready: ");
    let stdin = io::stdin();
    stdin.lock().lines().next();
    println!("Sending tasks to workers...");
    sink.send("0", 0).unwrap();
    let mut rng = rand::thread_rng();
    let mut total_msec = 0;
    for _ in 0..100 {
        let workload = rng.gen_range(1..101);
        total_msec += workload;
        let string = format!("{}", workload);
        sender.send(&string, 0).unwrap();
    }
    println!("Total expected cost: {} msec", total_msec);
    thread::sleep(time::Duration::from_secs(1));
}
taskvent: Parallel task ventilator in Scala
/*
*
* Task ventilator in Scala
* Binds PUSH socket to tcp://localhost:5557
* Sends batch of tasks to workers via that socket
*
* @author Giovanni Ruggiero
* @email giovanni.ruggiero@gmail.com
*/
import java.util.Random
import org.zeromq.ZMQ
object taskvent {
def main(args : Array[String]) {
val context = ZMQ.context(1)
// Socket to send messages on
val sender = context.socket(ZMQ.PUSH)
sender.bind("tcp://*:5557")
println("Press Enter when the workers are ready: ")
System.in.read()
println("Sending tasks to workers...\n")
// The first message is "0" and signals start of batch
sender.send("0\u0000".getBytes(), 0)
// Initialize random number generator
val srandom = new Random(System.currentTimeMillis())
// Send 100 tasks
var total_msec = 0 // Total expected cost in msecs
for (i <- 1 to 100) {
// Random workload from 1 to 100msecs
val workload = srandom.nextInt(100) + 1
total_msec += workload
print(workload + ".")
val string = String.format("%d\u0000", workload.asInstanceOf[Integer])
sender.send(string.getBytes(), 0)
}
println("Total expected cost: " + total_msec + " msec")
Thread.sleep(1000) // Give 0MQ time to deliver
}
}
taskvent: Parallel task ventilator in Tcl
#
# Task ventilator
# Binds PUSH socket to tcp://localhost:5557
# Sends batch of tasks to workers via that socket
#
package require zmq
zmq context context
zmq socket sender context PUSH
sender bind "tcp://*:5557"
zmq socket sink context PUSH
sink connect "tcp://localhost:5558"
puts -nonewline "Press Enter when the workers are ready: "
flush stdout
gets stdin c
puts "Sending tasks to workers..."
# The first message is "0" and signals start of batch
sink send "0"
# Initialize random number generator
expr {srand([clock seconds])}
# Send 100 tasks
set total_msec 0
for {set task_nbr 0} {$task_nbr < 100} {incr task_nbr} {
    set workload [expr {int(rand()*100)+1}]
    puts -nonewline "$workload."
    incr total_msec $workload
    sender send $workload
}
puts "Total expected cost: $total_msec msec"
after 1000
sink close
sender close
context term
taskvent: Parallel task ventilator in OCaml
(**
Task ventilator
Binds PUSH socket to tcp://localhost:5557
Sends batch of tasks to workers via that socket
*)
open Zmq
open Helpers
let () =
with_context @@ fun ctx ->
(* Socket to send messages on *)
with_socket ctx Socket.push @@ fun sender ->
Socket.bind sender "tcp://*:5557";
(* Socket to send start of batch message on *)
with_socket ctx Socket.push @@ fun sink ->
Socket.connect sink "tcp://localhost:5558";
print_string "Press Enter when the workers are ready: ";
flush stdout;
let _ = input_line stdin in
printfn "Sending tasks to workers...";
(* The first message is "0" and signals start of batch *)
Socket.send sink "0";
(* Initialize random number generator *)
Random.self_init ();
(* Send 100 tasks *)
let total_msec = ref 0 in (* Total expected cost in msecs *)
for i = 0 to pred 100 do
(* Random workload from 1 to 100msecs *)
let workload = Random.int 100 + 1 in
total_msec := !total_msec + workload;
Socket.send sender (string_of_int workload)
done;
printfn "Total expected cost: %d msec" !total_msec
taskwork: Parallel task worker in C
// Task worker
// Connects PULL socket to tcp://localhost:5557
// Collects workloads from ventilator via that socket
// Connects PUSH socket to tcp://localhost:5558
// Sends results to sink via that socket
#include "zhelpers.h"
int main (void)
{
// Socket to receive messages on
void *context = zmq_ctx_new ();
void *receiver = zmq_socket (context, ZMQ_PULL);
zmq_connect (receiver, "tcp://localhost:5557");
// Socket to send messages to
void *sender = zmq_socket (context, ZMQ_PUSH);
zmq_connect (sender, "tcp://localhost:5558");
// Process tasks forever
while (1) {
char *string = s_recv (receiver);
printf ("%s.", string); // Show progress
fflush (stdout);
s_sleep (atoi (string)); // Do the work
free (string);
s_send (sender, ""); // Send results to sink
}
zmq_close (receiver);
zmq_close (sender);
zmq_ctx_destroy (context);
return 0;
}
taskwork: Parallel task worker in C++
//
// Task worker in C++
// Connects PULL socket to tcp://localhost:5557
// Collects workloads from ventilator via that socket
// Connects PUSH socket to tcp://localhost:5558
// Sends results to sink via that socket
//
#include "zhelpers.hpp"
#include <string>
int main (int argc, char *argv[])
{
zmq::context_t context(1);
// Socket to receive messages on
zmq::socket_t receiver(context, ZMQ_PULL);
receiver.connect("tcp://localhost:5557");
// Socket to send messages to
zmq::socket_t sender(context, ZMQ_PUSH);
sender.connect("tcp://localhost:5558");
// Process tasks forever
while (1) {
zmq::message_t message;
int workload; // Workload in msecs
receiver.recv(&message);
std::string smessage(static_cast<char*>(message.data()), message.size());
std::istringstream iss(smessage);
iss >> workload;
// Do the work
s_sleep(workload);
// Send results to sink
message.rebuild();
sender.send(message);
// Simple progress indicator for the viewer
std::cout << "." << std::flush;
}
return 0;
}
taskwork: Parallel task worker in Common Lisp
;;; -*- Mode:Lisp; Syntax:ANSI-Common-Lisp; -*-
;;;
;;; Task worker in Common Lisp
;;; Connects PULL socket to tcp://localhost:5557
;;; Collects workloads from ventilator via that socket
;;; Connects PUSH socket to tcp://localhost:5558
;;; Sends results to sink via that socket
;;;
;;; Kamil Shakirov <kamils80@gmail.com>
;;;
(defpackage #:zguide.taskwork
  (:nicknames #:taskwork)
  (:use #:cl #:zhelpers)
  (:export #:main))
(in-package :zguide.taskwork)
(defun main ()
  (zmq:with-context (context 1)
    ;; Socket to receive messages on
    (zmq:with-socket (receiver context zmq:pull)
      (zmq:connect receiver "tcp://localhost:5557")
      ;; Socket to send messages to
      (zmq:with-socket (sender context zmq:push)
        (zmq:connect sender "tcp://localhost:5558")
        ;; Process tasks forever
        (loop
          (let ((pull-msg (make-instance 'zmq:msg)))
            (zmq:recv receiver pull-msg)
            (let* ((string (zmq:msg-data-as-string pull-msg))
                   (delay (* (parse-integer string) 1000)))
              ;; Simple progress indicator for the viewer
              (message "~A." string)
              ;; Do the work
              (isys:usleep delay)
              ;; Send results to sink
              (let ((push-msg (make-instance 'zmq:msg :data "")))
                (zmq:send sender push-msg))))))))
  (cleanup))
taskwork: Parallel task worker in Delphi
program taskwork;
//
// Task worker
// Connects PULL socket to tcp://localhost:5557
// Collects workloads from ventilator via that socket
// Connects PUSH socket to tcp://localhost:5558
// Sends results to sink via that socket
// @author Varga Balazs <bb.varga@gmail.com>
//
{$APPTYPE CONSOLE}
uses
SysUtils
, zmqapi
;
var
context: TZMQContext;
receiver,
sender: TZMQSocket;
s: Utf8String;
begin
context := TZMQContext.Create;
// Socket to receive messages on
receiver := Context.Socket( stPull );
receiver.connect( 'tcp://localhost:5557' );
// Socket to send messages to
sender := Context.Socket( stPush );
sender.connect( 'tcp://localhost:5558' );
// Process tasks forever
while True do
begin
receiver.recv( s );
// Simple progress indicator for the viewer
Writeln( s );
// Do the work
sleep( StrToInt( s ) );
// Send results to sink
sender.send('');
end;
receiver.Free;
sender.Free;
context.Free;
end.
taskwork: Parallel task worker in Erlang
#! /usr/bin/env escript
%%
%% Task worker
%% Connects PULL socket to tcp://localhost:5557
%% Collects workloads from ventilator via that socket
%% Connects PUSH socket to tcp://localhost:5558
%% Sends results to sink via that socket
%%
main(_) ->
application:start(chumak),
{ok, Receiver} = chumak:socket(pull),
case chumak:connect(Receiver, tcp, "localhost", 5557) of
{ok, _ConnectPid} ->
io:format("Connection OK with Pid: ~p\n", [Receiver]);
{error, Reason} ->
io:format("Connection failed for this reason: ~p\n", [Reason])
end,
{ok, Sender} = chumak:socket(push),
case chumak:connect(Sender, tcp, "localhost", 5558) of
{ok, _ConnectPid1} ->
io:format("Connection OK with Pid: ~p\n", [Sender])
end,
loop(Receiver, Sender).
loop(Receiver, Sender) ->
{ok, Work} = chumak:recv(Receiver),
io:format(" . "),
timer:sleep(list_to_integer(binary_to_list(Work))),
ok = chumak:send(Sender, <<" ">>),
loop(Receiver, Sender).
taskwork: Parallel task worker in Elixir
defmodule Taskwork do
@moduledoc """
Generated by erl2ex (http://github.com/dazuma/erl2ex)
From Erlang source: (Unknown source file)
At: 2019-12-20 13:57:37
"""
def main() do
{:ok, context} = :erlzmq.context()
{:ok, receiver} = :erlzmq.socket(context, :pull)
:ok = :erlzmq.connect(receiver, 'tcp://localhost:5557')
{:ok, sender} = :erlzmq.socket(context, :push)
:ok = :erlzmq.connect(sender, 'tcp://localhost:5558')
loop(receiver, sender)
:ok = :erlzmq.close(receiver)
:ok = :erlzmq.close(sender)
:ok = :erlzmq.term(context)
end
def loop(receiver, sender) do
{:ok, work} = :erlzmq.recv(receiver)
:io.format('.')
:timer.sleep(:erlang.list_to_integer(:erlang.binary_to_list(work)))
:ok = :erlzmq.send(sender, "")
loop(receiver, sender)
end
end
Taskwork.main
taskwork: Parallel task worker in Felix
//
// Task worker
// Connects PULL socket to tcp://localhost:5557
// Collects workloads from ventilator via that socket
// Connects PUSH socket to tcp://localhost:5558
// Sends results to sink via that socket
open ZMQ;
var context = zmq_init (1);
// Socket to receive messages on
var receiver = context.mk_socket ZMQ_PULL;
receiver.connect "tcp://localhost:5557";
// Socket to send messages to
var sender = context.mk_socket ZMQ_PUSH;
sender.connect "tcp://localhost:5558";
// Process tasks forever
while true do
var s = receiver.recv_string;
// Simple progress indicator for the viewer
println s; fflush stdout;
// Do the work
Faio::sleep (sys_clock, atof s/1000.0);
// Send results to sink
sender.send_string "";
done
taskwork: Parallel task worker in Go
//
// Task worker
// Connects PULL socket to tcp://localhost:5557
// Collects workloads from ventilator via that socket
// Connects PUSH socket to tcp://localhost:5558
// Sends results to sink via that socket
//
package main
import (
"fmt"
zmq "github.com/alecthomas/gozmq"
"strconv"
"time"
)
func main() {
context, _ := zmq.NewContext()
defer context.Close()
// Socket to receive messages on
receiver, _ := context.NewSocket(zmq.PULL)
defer receiver.Close()
receiver.Connect("tcp://localhost:5557")
// Socket to send messages to task sink
sender, _ := context.NewSocket(zmq.PUSH)
defer sender.Close()
sender.Connect("tcp://localhost:5558")
// Process tasks forever
for {
msgbytes, _ := receiver.Recv(0)
fmt.Printf("%s.\n", string(msgbytes))
// Do the work
msec, _ := strconv.ParseInt(string(msgbytes), 10, 64)
time.Sleep(time.Duration(msec) * 1e6)
// Send results to sink
sender.Send([]byte(""), 0)
}
}
taskwork: Parallel task worker in Haskell
{-# LANGUAGE OverloadedStrings #-}

-- Task worker
-- Connects PULL socket to tcp://localhost:5557
-- Collects workloads from ventilator via that socket
-- Connects PUSH socket to tcp://localhost:5558
-- Sends results to sink via that socket

module Main where

import Control.Concurrent
import Control.Monad
import Data.Monoid
import qualified Data.ByteString.Char8 as BS
import System.IO
import System.ZMQ4.Monadic

main :: IO ()
main = runZMQ $ do
    -- Socket to receive messages on
    receiver <- socket Pull
    connect receiver "tcp://localhost:5557"
    -- Socket to send messages to
    sender <- socket Push
    connect sender "tcp://localhost:5558"
    -- Process tasks forever
    forever $ do
        string <- receive receiver
        liftIO $ do
            BS.putStr (string <> ".")
            hFlush stdout
            threadDelay $ read (BS.unpack string) * 1000
        send sender [] ""
taskwork: Parallel task worker in Haxe
package ;
import haxe.io.Bytes;
import neko.Lib;
import neko.Sys;
import org.zeromq.ZMQ;
import org.zeromq.ZMQContext;
import org.zeromq.ZMQSocket;
/**
* Task worker in Haxe
* Connects PULL socket to tcp://localhost:5557
* Collects workloads from ventilator via that socket
* Connects PUSH socket to tcp://localhost:5558
* Sends results to sink via that socket
*
* See: http://zguide.zeromq.org/page:all#Divide-and-Conquer
*
* Based on code from: http://zguide.zeromq.org/java:taskwork
*
* Use with TaskVent.hx and TaskSink.hx
*/
class TaskWork
{
public static function main() {
var context:ZMQContext = ZMQContext.instance();
Lib.println("** TaskWork (see: http://zguide.zeromq.org/page:all#Divide-and-Conquer)");
// Socket to receive messages on
var receiver:ZMQSocket = context.socket(ZMQ_PULL);
receiver.connect("tcp://127.0.0.1:5557");
// Socket to send messages to
var sender:ZMQSocket = context.socket(ZMQ_PUSH);
sender.connect("tcp://127.0.0.1:5558");
// Process tasks forever
while (true) {
var msgString = StringTools.trim(receiver.recvMsg().toString());
var sec:Float = Std.parseFloat(msgString) / 1000.0;
Lib.print(msgString + ".");
// Do the work
Sys.sleep(sec);
// Send results to sink
sender.sendMsg(Bytes.ofString(""));
}
}
}
taskwork: Parallel task worker in Java
package guide;
import org.zeromq.SocketType;
import org.zeromq.ZMQ;
import org.zeromq.ZContext;
//
// Task worker in Java
// Connects PULL socket to tcp://localhost:5557
// Collects workloads from ventilator via that socket
// Connects PUSH socket to tcp://localhost:5558
// Sends results to sink via that socket
//
public class taskwork
{
public static void main(String[] args) throws Exception
{
try (ZContext context = new ZContext()) {
// Socket to receive messages on
ZMQ.Socket receiver = context.createSocket(SocketType.PULL);
receiver.connect("tcp://localhost:5557");
// Socket to send messages to
ZMQ.Socket sender = context.createSocket(SocketType.PUSH);
sender.connect("tcp://localhost:5558");
// Process tasks forever
while (!Thread.currentThread().isInterrupted()) {
String string = new String(receiver.recv(0), ZMQ.CHARSET).trim();
long msec = Long.parseLong(string);
// Simple progress indicator for the viewer
System.out.print(string + '.');
System.out.flush();
// Do the work
Thread.sleep(msec);
// Send results to sink
sender.send(ZMQ.MESSAGE_SEPARATOR, 0);
}
}
}
}
taskwork: Parallel task worker in Julia
#!/usr/bin/env julia
#
# Task worker
# Connects PULL socket to tcp://localhost:5557
# Collects workloads from ventilator via that socket
# Connects PUSH socket to tcp://localhost:5558
# Sends results to sink via that socket
#
using ZMQ
context = Context()
# Socket to receive messages on
receiver = Socket(context, PULL)
connect(receiver, "tcp://localhost:5557")
# Socket to send messages to
sender = Socket(context, PUSH)
connect(sender, "tcp://localhost:5558")
# Process tasks forever
while true
s = recv(receiver, String)
# Simple progress indicator for the viewer
write(stdout, ".")
flush(stdout)
# Do the work
sleep(parse(Int, s) * 0.001)
# Send results to sink
send(sender, 0x00)
end
close(sender)
close(receiver)
taskwork: Parallel task worker in Lua
--
-- Task worker
-- Connects PULL socket to tcp://localhost:5557
-- Collects workloads from ventilator via that socket
-- Connects PUSH socket to tcp://localhost:5558
-- Sends results to sink via that socket
--
-- Author: Robert G. Jakabosky <bobby@sharedrealm.com>
--
require"zmq"
require"zhelpers"
local context = zmq.init(1)
-- Socket to receive messages on
local receiver = context:socket(zmq.PULL)
receiver:connect("tcp://localhost:5557")
-- Socket to send messages to
local sender = context:socket(zmq.PUSH)
sender:connect("tcp://localhost:5558")
-- Process tasks forever
while true do
local msg = receiver:recv()
-- Simple progress indicator for the viewer
printf("%s.", msg)
io.stdout:flush()
-- Do the work
s_sleep(tonumber(msg))
-- Send results to sink
sender:send("")
end
receiver:close()
sender:close()
context:term()
taskwork: Parallel task worker in Node.js
// Task worker in node.js
// Connects PULL socket to tcp://localhost:5557
// Collects workloads from ventilator via that socket
// Connects PUSH socket to tcp://localhost:5558
// Sends results to sink via that socket
var zmq = require('zeromq')
, receiver = zmq.socket('pull')
, sender = zmq.socket('push');
receiver.on('message', function(buf) {
var msec = parseInt(buf.toString(), 10);
// simple progress indicator for the viewer
process.stdout.write(buf.toString() + ".");
// do the work
// not a great node sample for zeromq,
// node receives messages while timers run.
setTimeout(function() {
sender.send("");
}, msec);
});
receiver.connect('tcp://localhost:5557');
sender.connect('tcp://localhost:5558');
process.on('SIGINT', function() {
receiver.close();
sender.close();
process.exit();
});
taskwork: Parallel task worker in Objective-C
/* taskwork.m: PULLs workload from tcp://localhost:5557
* PUSHes results to tcp://localhost:5558
*/
#import <Foundation/Foundation.h>
#import "ZMQObjC.h"
#define NSEC_PER_MSEC (1000000)
int main(void)
{
NSAutoreleasePool *pool = [[NSAutoreleasePool alloc] init];
ZMQContext *ctx = [[[ZMQContext alloc] initWithIOThreads:1U] autorelease];
/* (jws/2011-02-05)!!!: Do NOT terminate the endpoint with a final slash.
* If you connect to @"tcp://localhost:5557/", you will get
* Assertion failed: rc == 0 (zmq_connecter.cpp:46)
* instead of a connected socket. Binding works fine, though. */
ZMQSocket *pull = [ctx socketWithType:ZMQ_PULL];
[pull connectToEndpoint:@"tcp://localhost:5557"];
ZMQSocket *push = [ctx socketWithType:ZMQ_PUSH];
[push connectToEndpoint:@"tcp://localhost:5558"];
/* Process tasks forever. */
struct timespec t;
NSData *emptyData = [NSData data];
for (;;) {
NSAutoreleasePool *p = [[NSAutoreleasePool alloc] init];
NSData *d = [pull receiveDataWithFlags:0];
NSString *s = [NSString stringWithUTF8String:[d bytes]];
t.tv_sec = 0;
t.tv_nsec = [s integerValue] * NSEC_PER_MSEC;
printf("%d.", [s intValue]);
fflush(stdout);
/* Do work, then report finished. */
(void)nanosleep(&t, NULL);
[push sendData:emptyData withFlags:0];
[p drain];
}
[ctx closeSockets];
[pool drain];
return EXIT_SUCCESS;
}
taskwork: Parallel task worker in Perl
# Task worker in Perl
# Connects PULL socket to tcp://localhost:5557
# Collects workloads from ventilator via that socket
# Connects PUSH socket to tcp://localhost:5558
# Sends results to sink via that socket
use strict;
use warnings;
$| = 1; # autoflush stdout after each print
use Time::HiRes qw(usleep);
use ZMQ::FFI;
use ZMQ::FFI::Constants qw(ZMQ_PUSH ZMQ_PULL);
my $context = ZMQ::FFI->new();
# Socket to receive messages on
my $receiver = $context->socket(ZMQ_PULL);
$receiver->connect('tcp://localhost:5557');
# Socket to send messages on
my $sender = $context->socket(ZMQ_PUSH);
$sender->connect('tcp://localhost:5558');
# Process tasks forever
my $string;
while (1) {
    $string = $receiver->recv();
    print "$string."; # Show progress
    usleep $string*1000; # Do the work
    $sender->send(""); # Send results to sink
}
taskwork: Parallel task worker in PHP
<?php
/*
* Task worker
* Connects PULL socket to tcp://localhost:5557
* Collects workloads from ventilator via that socket
* Connects PUSH socket to tcp://localhost:5558
* Sends results to sink via that socket
* @author Ian Barber <ian(dot)barber(at)gmail(dot)com>
*/
$context = new ZMQContext();
// Socket to receive messages on
$receiver = new ZMQSocket($context, ZMQ::SOCKET_PULL);
$receiver->connect("tcp://localhost:5557");
// Socket to send messages to
$sender = new ZMQSocket($context, ZMQ::SOCKET_PUSH);
$sender->connect("tcp://localhost:5558");
// Process tasks forever
while (true) {
$string = $receiver->recv();
// Simple progress indicator for the viewer
echo $string, PHP_EOL;
// Do the work
usleep($string * 1000);
// Send results to sink
$sender->send("");
}
taskwork: Parallel task worker in Python
# Task worker
# Connects PULL socket to tcp://localhost:5557
# Collects workloads from ventilator via that socket
# Connects PUSH socket to tcp://localhost:5558
# Sends results to sink via that socket
#
# Author: Lev Givon <lev(at)columbia(dot)edu>
import sys
import time
import zmq
context = zmq.Context()
# Socket to receive messages on
receiver = context.socket(zmq.PULL)
receiver.connect("tcp://localhost:5557")
# Socket to send messages to
sender = context.socket(zmq.PUSH)
sender.connect("tcp://localhost:5558")
# Process tasks forever
while True:
    s = receiver.recv()
    # Simple progress indicator for the viewer
    sys.stdout.write('.')
    sys.stdout.flush()
    # Do the work
    time.sleep(int(s)*0.001)
    # Send results to sink
    sender.send(b'')
taskwork: Parallel task worker in Ruby
#!/usr/bin/env ruby
#
# Task worker in Ruby
# Connects PULL socket to tcp://localhost:5557
# Collects workloads from ventilator via that socket
# Connects PUSH socket to tcp://localhost:5558
# Sends results to sink via that socket
#
require 'rubygems'
require 'ffi-rzmq'
context = ZMQ::Context.new(1)
# Socket to receive messages on
receiver = context.socket(ZMQ::PULL)
receiver.connect("tcp://localhost:5557")
# Socket to send messages to
sender = context.socket(ZMQ::PUSH)
sender.connect("tcp://localhost:5558")
# Process tasks forever
while true
  receiver.recv_string(msec = '')
  # Simple progress indicator for the viewer
  $stdout << "#{msec}."
  $stdout.flush
  # Do the work
  sleep(msec.to_f / 1000)
  # Send results to sink
  sender.send_string("")
end
taskwork: Parallel task worker in Scala
/*
* Task worker in Scala
* Connects PULL socket to tcp://localhost:5557
* Collects workloads from ventilator via that socket
* Connects PUSH socket to tcp://localhost:5558
* Sends results to sink via that socket
*
* @author Giovanni Ruggiero
* @email giovanni.ruggiero@gmail.com
*/
import org.zeromq.ZMQ
object taskwork {
def main(args :Array[String]) {
val context =ZMQ.context(1)
// Socket to receive messages on
val receiver = context.socket(ZMQ.PULL)
receiver.connect("tcp://localhost:5557")
// Socket to send messages to
val sender = context.socket(ZMQ.PUSH)
sender.connect("tcp://localhost:5558")
// Process tasks forever
while (true) {
val string = new String(receiver.recv(0)).trim()
// The workload is already in milliseconds, which is what Thread.sleep expects
val msec = string.toLong
// Simple progress indicator for the viewer
print(string + '.')
System.out.flush()
// Do the work
Thread.sleep(msec)
// Send results to sink
sender.send("".getBytes(), 0)
}
}
}
taskwork: Parallel task worker in Tcl
## Task worker
# Connects PULL socket to tcp://localhost:5557
# Collects workloads from ventilator via that socket
# Connects PUSH socket to tcp://localhost:5558
# Sends results to sink via that socket
#
package require zmq
zmq context context
# Socket to receive messages on
zmq socket receiver context PULL
receiver connect "tcp://localhost:5557"
# Socket to send messages to
zmq socket sender context PUSH
sender connect "tcp://localhost:5558"
# Process tasks forever
while {1} {
    set string [receiver recv]
    # Simple progress indicator for the viewer
    puts -nonewline "$string."
    flush stdout
    # Do the work
    after $string
    # Send result to sink
    sender send "$string"
}
receiver close
sender close
context term
taskwork: Parallel task worker in OCaml
(**
Task worker
Connects PULL socket to tcp://localhost:5557
Collects workloads from ventilator via that socket
Connects PUSH socket to tcp://localhost:5558
Sends results to sink via that socket
*)
open Zmq
open Helpers
let () =
with_context @@ fun ctx ->
(* Socket to receive messages on *)
with_socket ctx Socket.pull @@ fun receiver ->
Socket.connect receiver "tcp://localhost:5557";
(* Socket to send messages to *)
with_socket ctx Socket.push @@ fun sender ->
Socket.connect sender "tcp://localhost:5558";
(* Process tasks forever *)
while true do
let s = Socket.recv receiver in
printfn "%s." s;
sleep_ms @@ int_of_string s; (* Do the work *)
Socket.send sender ""; (* Send results to sink *)
done
Here is the sink application. It collects the 100 tasks, then calculates how long the overall processing took, so we can confirm that the workers really were running in parallel if there is more than one of them:
//
// Task sink
// Binds PULL socket to tcp://localhost:5558
// Collects results from workers via that socket
//
include "std/posix/time";
open ZMQ;
// Prepare our context and socket
var context = zmq_init 1;
var receiver = context.mk_socket ZMQ_PULL;
receiver.bind "tcp://*:5558";
// Wait for start of batch
C_hack::ignore receiver.recv_string;
// Start our clock now
var start_time = #Time::time;
// Process 100 confirmations
for var task_nbr in 0 upto 99 do
C_hack::ignore receiver.recv_string;
if (task_nbr / 10) * 10 == task_nbr do
print ":";
else
print ".";
fflush (stdout);
done
done
// Calculate and report duration of batch
var now = #Time::time;
println$ f"Total elapsed time: %d ms"
((now - start_time)*1000.0).int
;
receiver.close;
context.term;
tasksink: Parallel task sink in Go
//
// Task sink
// Binds PULL socket to tcp://localhost:5558
// Collects results from workers via that socket
//
package main
import (
"fmt"
zmq "github.com/alecthomas/gozmq"
"time"
)
func main() {
context, _ := zmq.NewContext()
defer context.Close()
// Socket to receive messages on
receiver, _ := context.NewSocket(zmq.PULL)
defer receiver.Close()
receiver.Bind("tcp://*:5558")
// Wait for start of batch
msgbytes, _ := receiver.Recv(0)
fmt.Println("Received Start Msg ", string(msgbytes))
// Start our clock now
start_time := time.Now().UnixNano()
// Process 100 confirmations
for i := 0; i < 100; i++ {
msgbytes, _ = receiver.Recv(0)
fmt.Print(".")
}
// Calculate and report duration of batch
te := time.Now().UnixNano()
fmt.Printf("Total elapsed time: %d msec\n", (te-start_time)/1e6)
}
tasksink: Parallel task sink in Haskell
-- Task sink
-- Binds PULL socket to tcp://localhost:5558
-- Collects results from workers via that socket
module Main where
import Control.Monad
import Data.Time.Clock
import System.IO
import System.ZMQ4.Monadic
main :: IO ()
main = runZMQ $ do
    -- Prepare our socket
    receiver <- socket Pull
    bind receiver "tcp://*:5558"
    -- Wait for start of batch
    _ <- receive receiver
    -- Start our clock now
    start_time <- liftIO getCurrentTime
    -- Process 100 confirmations
    liftIO $ hSetBuffering stdout NoBuffering
    forM_ [1..100] $ \i -> do
        _ <- receive receiver
        if i `mod` 10 == 0
            then liftIO $ putStr ":"
            else liftIO $ putStr "."
    -- Calculate and report duration of batch
    end_time <- liftIO getCurrentTime
    liftIO . putStrLn $ "Total elapsed time: " ++ show (diffUTCTime end_time start_time * 1000) ++ " msec"
tasksink: Parallel task sink in Haxe
package ;
import haxe.io.Bytes;
import neko.Lib;
import neko.Sys;
import org.zeromq.ZMQ;
import org.zeromq.ZMQContext;
import org.zeromq.ZMQSocket;
/**
* Task sink in Haxe
* Binds PULL request socket to tcp://localhost:5558
* Collects results from workers via this socket
*
* See: http://zguide.zeromq.org/page:all#Divide-and-Conquer
*
* Based on http://zguide.zeromq.org/java:tasksink
*
* Use with TaskVent.hx and TaskWork.hx
*/
class TaskSink
{
public static function main() {
var context:ZMQContext = ZMQContext.instance();
Lib.println("** TaskSink (see: http://zguide.zeromq.org/page:all#Divide-and-Conquer)");
// Socket to receive messages on
var receiver:ZMQSocket = context.socket(ZMQ_PULL);
receiver.bind("tcp://127.0.0.1:5558");
// Wait for start of batch
var msgString = StringTools.trim(receiver.recvMsg().toString());
// Start our clock now
var tStart = Sys.time();
// Process 100 messages
var task_nbr:Int;
for (task_nbr in 0 ... 100) {
msgString = StringTools.trim(receiver.recvMsg().toString());
if (task_nbr % 10 == 0) {
Lib.println(":"); // Print a ":" every 10 messages
} else {
Lib.print(".");
}
}
// Calculate and report duration of batch
var tEnd = Sys.time();
Lib.println("Total elapsed time: " + Math.ceil((tEnd - tStart) * 1000) + " msec");
receiver.close();
context.term();
}
}
tasksink: Parallel task sink in Java
package guide;
import org.zeromq.SocketType;
import org.zeromq.ZMQ;
import org.zeromq.ZContext;
//
// Task sink in Java
// Binds PULL socket to tcp://localhost:5558
// Collects results from workers via that socket
//
public class tasksink
{
public static void main(String[] args) throws Exception
{
// Prepare our context and socket
try (ZContext context = new ZContext()) {
ZMQ.Socket receiver = context.createSocket(SocketType.PULL);
receiver.bind("tcp://*:5558");
// Wait for start of batch
String string = new String(receiver.recv(0), ZMQ.CHARSET);
// Start our clock now
long tstart = System.currentTimeMillis();
// Process 100 confirmations
int task_nbr;
int total_msec = 0; // Total calculated cost in msecs
for (task_nbr = 0; task_nbr < 100; task_nbr++) {
string = new String(receiver.recv(0), ZMQ.CHARSET).trim();
if ((task_nbr / 10) * 10 == task_nbr) {
System.out.print(":");
}
else {
System.out.print(".");
}
}
// Calculate and report duration of batch
long tend = System.currentTimeMillis();
System.out.println(
"\nTotal elapsed time: " + (tend - tstart) + " msec"
);
}
}
}
tasksink: Parallel task sink in Julia
#!/usr/bin/env julia
#
# Task sink
# Binds PULL socket to tcp://localhost:5558
# Collects results from workers via that socket
#
using ZMQ
using Dates
context = Context()
# Socket to receive messages on
receiver = Socket(context, PULL)
bind(receiver, "tcp://*:5558")
# Wait for start of batch
s = recv(receiver)
# Start our tic toc clock
tstart = now()
# Process 100 confirmations
for task_nbr in 1:100
    s = recv(receiver)
    if task_nbr % 10 == 0
        write(stdout, ":")
    else
        write(stdout, ".")
    end
    flush(stdout)
end
# Calculate and report duration of batch
tend = now()
elapsed = tend - tstart  # subtracting two DateTime values gives a Millisecond period
println("\nTotal elapsed time: $(Dates.value(elapsed)) msec")
# Making a clean exit.
close(receiver)
close(context)
tasksink: Parallel task sink in Lua
--
-- Task sink
-- Binds PULL socket to tcp://localhost:5558
-- Collects results from workers via that socket
--
-- Author: Robert G. Jakabosky <bobby@sharedrealm.com>
--
require"zmq"
require"zhelpers"
local fmod = math.fmod
-- Prepare our context and socket
local context = zmq.init(1)
local receiver = context:socket(zmq.PULL)
receiver:bind("tcp://*:5558")
-- Wait for start of batch
local msg = receiver:recv()
-- Start our clock now
local start_time = s_clock ()
-- Process 100 confirmations
local task_nbr
for task_nbr=0,99 do
local msg = receiver:recv()
if (fmod(task_nbr, 10) == 0) then
printf (":")
else
printf (".")
end
io.stdout:flush()
end
-- Calculate and report duration of batch
printf("Total elapsed time: %d msec\n", (s_clock () - start_time))
receiver:close()
context:term()
tasksink: Parallel task sink in Node.js
// Task sink in node.js
// Binds PULL socket to tcp://localhost:5558
// Collects results from workers via that socket.
var zmq = require('zeromq')
, receiver = zmq.socket('pull');
var started = false
, i = 0
, label = "Total elapsed time";
receiver.on('message', function() {
// wait for start of batch
if (!started) {
console.time(label);
started = true;
// process 100 confirmations
} else {
i += 1;
process.stdout.write(i % 10 === 0 ? ':' : '.');
if (i === 100) {
console.timeEnd(label);
receiver.close();
process.exit();
}
}
});
receiver.bindSync("tcp://*:5558");
tasksink: Parallel task sink in Objective-C
/* tasksink.m: PULLs workers' results from tcp://localhost:5558/. */
/* You can wire up the vent, workers, and sink like so:
* $ ./tasksink &
* $ ./taskwork & # Repeat this as many times as you want workers.
* $ ./taskvent &
*/
#import <Foundation/Foundation.h>
#import "ZMQObjC.h"
#import <sys/time.h>
#define NSEC_PER_MSEC (1000000)
#define MSEC_PER_SEC (1000)
int main(void)
{
NSAutoreleasePool *pool = [[NSAutoreleasePool alloc] init];
/* Prepare context and socket. */
ZMQContext *ctx = [[[ZMQContext alloc] initWithIOThreads:1U] autorelease];
ZMQSocket *pull = [ctx socketWithType:ZMQ_PULL];
[pull bindToEndpoint:@"tcp://*:5558"];
/* Wait for batch start. */
/* Cast result to void because we don't actually care about the value.
* The return value has been autoreleased, so no memory is leaked. */
(void)[pull receiveDataWithFlags:0];
/* Start clock. */
struct timeval tstart, tdiff, tend;
(void)gettimeofday(&tstart, NULL);
/* Process |kTaskCount| confirmations. */
static const int kTaskCount = 100;
for (int task = 0; task < kTaskCount; ++task) {
NSAutoreleasePool *p = [[NSAutoreleasePool alloc] init];
(void)[pull receiveDataWithFlags:0];
BOOL isMultipleOfTen = (0 == (task % 10));
if (isMultipleOfTen) {
fputs(":", stdout);
} else {
fputs(".", stdout);
}
fflush(stdout);
[p drain];
}
fputc('\n', stdout);
/* Stop clock. */
(void)gettimeofday(&tend, NULL);
/* Calculate the difference. */
tdiff.tv_sec = tend.tv_sec - tstart.tv_sec;
tdiff.tv_usec = tend.tv_usec - tstart.tv_usec;
if (tdiff.tv_usec < 0) {
tdiff.tv_sec -= 1;
tdiff.tv_usec += 1000000; /* tv_usec counts microseconds: 1,000,000 per second */
}
/* Convert it to milliseconds (1,000 microseconds per millisecond). */
unsigned long totalMsec = tdiff.tv_sec * MSEC_PER_SEC
+ tdiff.tv_usec / 1000;
NSLog(@"Total elapsed time: %lu ms", totalMsec);
[ctx closeSockets];
[pool drain];
return EXIT_SUCCESS;
}
The average cost of a batch is 5 seconds. When we start 1, 2, or 4 workers we get results like this from the sink:
1 worker: total elapsed time: 5034 msecs.
2 workers: total elapsed time: 2421 msecs.
4 workers: total elapsed time: 1018 msecs.
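One way to read these numbers is as speedup relative to a single worker. The snippet below is nothing more than that arithmetic, applied to the three timings quoted above; the function names are made up for illustration. Each run was presumably a separate batch with its own workloads, so a ratio slightly above the worker count says more about batch-to-batch variation than about the workers.

```python
# Elapsed times reported by the sink in the text above, in milliseconds,
# keyed by the number of workers that were running.
elapsed_msec = {1: 5034, 2: 2421, 4: 1018}


def speedup(workers: int) -> float:
    """How many times faster the batch finished than with one worker."""
    return elapsed_msec[1] / elapsed_msec[workers]


def efficiency(workers: int) -> float:
    """Speedup divided by worker count; 1.0 would be perfectly linear scaling."""
    return speedup(workers) / workers


for workers in sorted(elapsed_msec):
    print(f"{workers} worker(s): speedup {speedup(workers):.2f}x, "
          f"efficiency {efficiency(workers):.2f}")
```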
Let’s look at some aspects of this code in more detail:
The workers connect upstream to the ventilator, and downstream to the sink. This means you can add workers arbitrarily. If the workers bound to their endpoints, you would need (a) more endpoints and (b) to modify the ventilator and/or the sink each time you added a worker. We say that the ventilator and sink are stable parts of our architecture and the workers are dynamic parts of it.
We have to synchronize the start of the batch with all workers being up and running. This is a fairly common gotcha in ZeroMQ and there is no easy solution. The zmq_connect method takes a certain time. So when a set of workers connect to the ventilator, the first one to successfully connect will get a whole load of messages in that short time while the others are also connecting. If you don’t synchronize the start of the batch somehow, the system won’t run in parallel at all. Try removing the wait in the ventilator, and see what happens.
The ventilator’s PUSH socket distributes tasks evenly to workers (assuming they are all connected before the batch starts going out). This is called load balancing and it’s something we’ll look at again in more detail.
The sink’s PULL socket collects results from workers evenly. This is called fair-queuing.
The pipeline pattern also exhibits the “slow joiner” syndrome, leading to accusations that PUSH sockets don’t load balance properly. If you are using PUSH and PULL, and one of your workers gets way more messages than the others, it’s because that PULL socket has joined faster than the others, and grabs a lot of messages before the others manage to connect. If you want proper load balancing, you probably want to look at the load balancing pattern in Chapter 3 - Advanced Request-Reply Patterns.
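To see why a slow joiner skews the distribution, here is a toy model in plain Python. It is not how ZeroMQ is implemented; it only assumes that tasks are handed out round-robin among whichever workers happen to be connected at that moment. The function name `distribute` and the join times are invented for the sketch.

```python
from collections import Counter


def distribute(num_tasks, join_after):
    """Hand out tasks round-robin among the workers connected so far.

    join_after[i] is the number of tasks already sent when worker i connects.
    Returns a Counter mapping worker index to the number of tasks it received.
    """
    if min(join_after) != 0:
        raise ValueError("this toy model needs one worker connected from the start")
    received = Counter()
    for task in range(num_tasks):
        connected = [w for w, joined in enumerate(join_after) if joined <= task]
        received[connected[task % len(connected)]] += 1
    return received


# All three workers are connected before the batch starts.
on_time = distribute(100, [0, 0, 0])
# Workers 1 and 2 connect only after 60 and 80 tasks have gone out.
late = distribute(100, [0, 60, 80])

print("all connected first:", dict(on_time))
print("two slow joiners:   ", dict(late))
```

In the second case the first worker ends up with most of the batch, which is the effect the ventilator avoids by waiting until the workers are up before it starts sending.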
Having seen some examples, you must be eager to start using ZeroMQ in some apps. Before you start that, take a deep breath, chillax, and reflect on some basic advice that will save you much stress and confusion.
Learn ZeroMQ step-by-step. It’s just one simple API, but it hides a world of possibilities. Take the possibilities slowly and master each one.
Write nice code. Ugly code hides problems and makes it hard for others to help you. You might get used to meaningless variable names, but people reading your code won’t. Use names that are real words, that say something other than “I’m too careless to tell you what this variable is really for”. Use consistent indentation and clean layout. Write nice code and your world will be more comfortable.
Test what you make as you make it. When your program doesn’t work, you should know what five lines are to blame. This is especially true when you do ZeroMQ magic, which just won’t work the first few times you try it.
When you find that things don’t work as expected, break your code into pieces, test each one, see which one is not working. ZeroMQ lets you make essentially modular code; use that to your advantage.
Make abstractions (classes, methods, whatever) as you need them. If you copy/paste a lot of code, you’re going to copy/paste errors, too.
ZeroMQ applications always start by creating a context, and then using that for creating sockets. In C, it’s the zmq_ctx_new() call. You should create and use exactly one context in your process. Technically, the context is the container for all sockets in a single process, and acts as the transport for inproc sockets, which are the fastest way to connect threads in one process. If at runtime a process has two contexts, these are like separate ZeroMQ instances. If that’s explicitly what you want, OK, but otherwise remember: call zmq_ctx_new() once at the start of a process, and zmq_ctx_destroy() once at the end.
If you’re using the fork() system call, do zmq_ctx_new() after the fork and at the beginning of the child process code. In general, you want to do interesting (ZeroMQ) stuff in the children, and boring process management in the parent.
Classy programmers share the same motto as classy hit men: always clean up when you finish the job. When you use ZeroMQ in a language like Python, stuff gets automatically freed for you. But when using C, you have to carefully free objects when you’re finished with them or else you get memory leaks, unstable applications, and generally bad karma.
Memory leaks are one thing, but ZeroMQ is quite finicky about how you exit an application. The reasons are technical and painful, but the upshot is that if you leave any sockets open, the zmq_ctx_destroy() function will hang forever. And even if you close all sockets, zmq_ctx_destroy() will by default wait forever if there are pending connects or sends unless you set the LINGER to zero on those sockets before closing them.
The ZeroMQ objects we need to worry about are messages, sockets, and contexts. Luckily it’s quite simple, at least in simple programs:
Use zmq_send() and zmq_recv() when you can, as it avoids the need to work with zmq_msg_t objects.
If you do use zmq_msg_recv(), always release the received message as soon as you’re done with it, by calling zmq_msg_close().
If you are opening and closing a lot of sockets, that’s probably a sign that you need to redesign your application. In some cases socket handles won’t be freed until you destroy the context.
When you exit the program, close your sockets and then call zmq_ctx_destroy(). This destroys the context.
This is at least the case for C development. In a language with automatic object destruction, sockets and contexts will be destroyed as you leave the scope. If you use exceptions you’ll have to do the clean-up in something like a “final” block, the same as for any resource.
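Even in a garbage-collected language it can be worth spelling the clean-up out. Here is a minimal sketch in Python, assuming the pyzmq binding; the function name and the inproc endpoint are made up. It sets LINGER to zero, closes the sockets, and only then terminates the context, all inside a `finally` block so the steps run even if the work raises.

```python
import zmq


def send_and_exit_cleanly(endpoint: str = "inproc://clean-exit-demo") -> bytes:
    """Pass one message through a PUSH/PULL pair, then shut everything down."""
    context = zmq.Context()
    receiver = context.socket(zmq.PULL)
    sender = context.socket(zmq.PUSH)
    try:
        receiver.bind(endpoint)
        sender.connect(endpoint)
        sender.send(b"done")
        return receiver.recv()
    finally:
        for sock in (sender, receiver):
            # Don't wait for unsent messages when closing.
            sock.setsockopt(zmq.LINGER, 0)
            sock.close()
        # With every socket closed, terminating the context returns promptly.
        context.term()
```

Whether zero is the right LINGER value depends on the application: it trades possible message loss at exit for a guarantee that the exit doesn't hang.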
If you’re doing multithreaded work, it gets rather more complex than this. We’ll get to multithreading in the next chapter, but because some of you will, despite warnings, try to run before you can safely walk, below is the quick and dirty guide to making a clean exit in a multithreaded ZeroMQ application.
First, do not try to use the same socket from multiple threads. Please don’t explain why you think this would be excellent fun, just please don’t do it. Next, you need to shut down each socket that has ongoing requests. The proper way is to set a low LINGER value (1 second), and then close the socket. If your language binding doesn’t do this for you automatically when you destroy a context, I’d suggest sending a patch.
Finally, destroy the context. This will cause any blocking receives or polls or sends in attached threads (i.e., which share the same context) to return with an error. Catch that error, then set LINGER on the sockets in that thread, close them, and exit. Do not destroy the same context twice. The zmq_ctx_destroy() call in the main thread will block until all sockets it knows about are safely closed.
Voila! It’s complex and painful enough that any language binding author worth his or her salt will do this automatically and make the socket closing dance unnecessary.
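For the curious, the dance looks roughly like this in Python, assuming the pyzmq binding. The names `worker` and `run` and the endpoint are invented for the sketch. One deliberate difference from the description above: LINGER is set right after the socket is created rather than after the error arrives, which keeps the error path down to a plain close.

```python
import threading

import zmq


def worker(context, ready, log):
    """Block in recv() until the main thread terminates the shared context."""
    sock = context.socket(zmq.PULL)
    sock.setsockopt(zmq.LINGER, 0)       # set up front, before we ever block
    sock.bind("inproc://mt-exit-demo")
    ready.set()
    try:
        sock.recv()                      # nobody sends; we wait for shutdown
    except zmq.ContextTerminated:
        log.append("worker saw ETERM")   # the blocking call returned an error
    finally:
        sock.close()                     # lets the main thread's term() finish


def run():
    context = zmq.Context()
    ready = threading.Event()
    log = []
    thread = threading.Thread(target=worker, args=(context, ready, log))
    thread.start()
    ready.wait()                         # the worker's socket now exists
    context.term()                       # blocks until the worker closes it
    thread.join()
    return log
```

The socket is created, used, and closed entirely inside the worker thread; only the context is shared.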
Now that you’ve seen ZeroMQ in action, let’s go back to the “why”.
Many applications these days consist of components that stretch across some kind of network, either a LAN or the Internet. So many application developers end up doing some kind of messaging. Some developers use message queuing products, but most of the time they do it themselves, using TCP or UDP. These protocols are not hard to use, but there is a great difference between sending a few bytes from A to B, and doing messaging in any kind of reliable way.
Let’s look at the typical problems we face when we start to connect pieces using raw TCP. Any reusable messaging layer would need to solve all or most of these:
How do we handle I/O? Does our application block, or do we handle I/O in the background? This is a key design decision. Blocking I/O creates architectures that do not scale well. But background I/O can be very hard to do right.
How do we handle dynamic components, i.e., pieces that go away temporarily? Do we formally split components into “clients” and “servers” and mandate that servers cannot disappear? What then if we want to connect servers to servers? Do we try to reconnect every few seconds?
How do we represent a message on the wire? How do we frame data so it’s easy to write and read, safe from buffer overflows, efficient for small messages, yet adequate for the very largest videos of dancing cats wearing party hats?
How do we handle messages that we can’t deliver immediately? Particularly, if we’re waiting for a component to come back online? Do we discard messages, put them into a database, or into a memory queue?
Where do we store message queues? What happens if the component reading from a queue is very slow and causes our queues to build up? What’s our strategy then?
How do we handle lost messages? Do we wait for fresh data, request a resend, or do we build some kind of reliability layer that ensures messages cannot be lost? What if that layer itself crashes?
What if we need to use a different network transport, say, multicast instead of TCP unicast? Or IPv6? Do we need to rewrite the applications, or is the transport abstracted in some layer?
How do we route messages? Can we send the same message to multiple peers? Can we send replies back to an original requester?
How do we write an API for another language? Do we re-implement a wire-level protocol or do we repackage a library? If the former, how can we guarantee efficient and stable stacks? If the latter, how can we guarantee interoperability?
How do we represent data so that it can be read between different architectures? Do we enforce a particular encoding for data types? How far is this the job of the messaging system rather than a higher layer?
How do we handle network errors? Do we wait and retry, ignore them silently, or abort?
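To make just one of these questions concrete, here is roughly the minimum you end up writing to frame messages over a raw stream socket. It is a sketch using only Python's standard library; the 4-byte length prefix and the 1 MB cap are arbitrary choices for illustration, not ZeroMQ's wire format.

```python
import socket
import struct

HEADER = struct.Struct("!I")   # 4-byte big-endian length prefix (arbitrary choice)
MAX_FRAME = 1 << 20            # refuse frames over 1 MB to bound memory use


def send_frame(sock, payload: bytes) -> None:
    """Write the length prefix followed by the payload."""
    sock.sendall(HEADER.pack(len(payload)) + payload)


def recv_exact(sock, size: int) -> bytes:
    """Read exactly `size` bytes; a stream may hand them over in pieces."""
    buf = bytearray()
    while len(buf) < size:
        chunk = sock.recv(size - len(buf))
        if not chunk:
            raise ConnectionError("peer closed the connection mid-frame")
        buf.extend(chunk)
    return bytes(buf)


def recv_frame(sock) -> bytes:
    """Read one length-prefixed frame and return its payload."""
    (length,) = HEADER.unpack(recv_exact(sock, HEADER.size))
    if length > MAX_FRAME:
        raise ValueError("frame too large")
    return recv_exact(sock, length)


def demo():
    """Send three frames over a local socket pair and read them back."""
    left, right = socket.socketpair()
    with left, right:
        for payload in (b"hello", b"", b"world"):
            send_frame(left, payload)
        return [recv_frame(right) for _ in range(3)]
```

That covers one bullet, for one transport, with no reconnection, queuing, or routing. The rest of the list is still waiting.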
Take a typical open source project like Hadoop Zookeeper and read the C API code in src/c/src/zookeeper.c. When I read this code, in January 2013, it was 4,200 lines of mystery, and in there is an undocumented client/server network communication protocol. I see it’s efficient because it uses poll instead of select. But really, Zookeeper should be using a generic messaging layer and an explicitly documented wire level protocol. It is incredibly wasteful for teams to be building this particular wheel over and over.
But how to make a reusable messaging layer? Why, when so many projects need this technology, are people still doing it the hard way by driving TCP sockets in their code, and solving the problems in that long list over and over?
It turns out that building reusable messaging systems is really difficult, which is why few FOSS projects ever tried, and why commercial messaging products are complex, expensive, inflexible, and brittle. In 2006, iMatix designed AMQP, which started to give FOSS developers perhaps the first reusable recipe for a messaging system. AMQP works better than many other designs, but remains relatively complex, expensive, and brittle. It takes weeks to learn to use, and months to create stable architectures that don’t crash when things get hairy.
Most messaging projects, like AMQP, that try to solve this long list of problems in a reusable way do so by inventing a new concept, the “broker”, that does addressing, routing, and queuing. This results in a client/server protocol or a set of APIs on top of some undocumented protocol that allows applications to speak to this broker. Brokers are an excellent thing in reducing the complexity of large networks. But adding broker-based messaging to a product like Zookeeper would make it worse, not better. It would mean adding an additional big box, and a new single point of failure. A broker rapidly becomes a bottleneck and a new risk to manage. If the software supports it, we can add a second, third, and fourth broker and make some failover scheme. People do this. It creates more moving pieces, more complexity, and more things to break.
And a broker-centric setup needs its own operations team. You literally need to watch the brokers day and night, and beat them with a stick when they start misbehaving. You need boxes, and you need backup boxes, and you need people to manage those boxes. It is only worth doing for large applications with many moving pieces, built by several teams of people over several years.
So small to medium application developers are trapped. Either they avoid network programming and make monolithic applications that do not scale. Or they jump into network programming and make brittle, complex applications that are hard to maintain. Or they bet on a messaging product, and end up with scalable applications that depend on expensive, easily broken technology. There has been no really good choice, which is maybe why messaging is largely stuck in the last century and stirs strong emotions: negative ones for users, gleeful joy for those selling support and licenses.
What we need is something that does the job of messaging, but does it in such a simple and cheap way that it can work in any application, with close to zero cost. It should be a library which you just link, without any other dependencies. No additional moving pieces, so no additional risk. It should run on any OS and work with any programming language.
And this is ZeroMQ: an efficient, embeddable library that solves most of the problems an application needs to become nicely elastic across a network, without much cost.
Specifically:
It handles I/O asynchronously, in background threads. These communicate with application threads using lock-free data structures, so concurrent ZeroMQ applications need no locks, semaphores, or other wait states.
Components can come and go dynamically and ZeroMQ will automatically reconnect. This means you can start components in any order. You can create “service-oriented architectures” (SOAs) where services can join and leave the network at any time.
It queues messages automatically when needed. It does this intelligently, pushing messages as close as possible to the receiver before queuing them.
It has ways of dealing with over-full queues (called “high water mark”). When a queue is full, ZeroMQ automatically blocks senders, or throws away messages, depending on the kind of messaging you are doing (the so-called “pattern”).
It lets your applications talk to each other over arbitrary transports: TCP, multicast, in-process, inter-process. You don’t need to change your code to use a different transport.
It handles slow/blocked readers safely, using different strategies that depend on the messaging pattern.
It lets you route messages using a variety of patterns such as request-reply and pub-sub. These patterns are how you create the topology, the structure of your network.
It lets you create proxies to queue, forward, or capture messages with a single call. Proxies can reduce the interconnection complexity of a network.
It delivers whole messages exactly as they were sent, using a simple framing on the wire. If you write a 10k message, you will receive a 10k message.
It does not impose any format on messages. They are blobs from zero to gigabytes large. When you want to represent data you choose some other product on top, such as msgpack, Google’s protocol buffers, and others.
It handles network errors intelligently, by retrying automatically in cases where it makes sense.
It reduces your carbon footprint. Doing more with less CPU means your boxes use less power, and you can keep your old boxes in use for longer. Al Gore would love ZeroMQ.
Actually ZeroMQ does rather more than this. It has a subversive effect on how you develop network-capable applications. Superficially, it’s a socket-inspired API on which you do zmq_recv() and zmq_send(). But message processing rapidly becomes the central loop, and your application soon breaks down into a set of message processing tasks. It is elegant and natural. And it scales: each of these tasks maps to a node, and the nodes talk to each other across arbitrary transports. Two nodes in one process (node is a thread), two nodes on one box (node is a process), or two nodes on one network (node is a box)–it’s all the same, with no application code changes.
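Here is a small sketch of the "no application code changes" claim, assuming the pyzmq binding. The function `echo_once` and the endpoint names are made up. The same function runs a request-reply round trip over whatever endpoint it is given; it reads the bound address back through the LAST_ENDPOINT socket option so that a wildcard TCP port works too.

```python
import zmq


def echo_once(endpoint: str, payload: bytes) -> bytes:
    """Send payload from a REQ socket to a REP socket and return the echo."""
    context = zmq.Context()
    server = context.socket(zmq.REP)
    client = context.socket(zmq.REQ)
    try:
        server.bind(endpoint)
        # Ask the socket where it actually bound; this resolves a "*" port.
        client.connect(server.getsockopt_string(zmq.LAST_ENDPOINT))
        client.send(payload)
        server.send(server.recv())       # echo the request back unchanged
        return client.recv()
    finally:
        client.close(linger=0)
        server.close(linger=0)
        context.term()


payload = b"x" * 10000                   # a 10k message, as in the list above
for endpoint in ("inproc://echo-demo", "tcp://127.0.0.1:*"):
    reply = echo_once(endpoint, payload)
    print(endpoint, "->", len(reply), "bytes")
```

Only the endpoint string changes between the in-process and TCP runs.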
As the clients run, we take a look at the active processes using the top command, and we see something like this (on a 4-core box):
PID USER PR NI VIRT RES SHR S %CPU %MEM TIME+ COMMAND
7136 ph 20 0 1040m 959m 1156 R 157 12.0 16:25.47 wuserver
7966 ph 20 0 98608 1804 1372 S 33 0.0 0:03.94 wuclient
7963 ph 20 0 33116 1748 1372 S 14 0.0 0:00.76 wuclient
7965 ph 20 0 33116 1784 1372 S 6 0.0 0:00.47 wuclient
7964 ph 20 0 33116 1788 1372 S 5 0.0 0:00.25 wuclient
7967 ph 20 0 33072 1740 1372 S 5 0.0 0:00.35 wuclient
Let’s think for a second about what is happening here. The weather server has a single socket, and yet here we have it sending data to five clients in parallel. We could have thousands of concurrent clients. The server application doesn’t see them, doesn’t talk to them directly. So the ZeroMQ socket is acting like a little server, silently accepting client requests and shoving data out to them as fast as the network can handle it. And it’s a multithreaded server, squeezing more juice out of your CPU.
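The same "one socket, many peers" behavior is easy to reproduce in a few lines. This sketch assumes the pyzmq binding and uses a REP socket rather than the weather server's PUB socket, only because request-reply gives a deterministic result to check; the function name and endpoint are invented.

```python
import zmq


def serve_many(num_clients: int = 5) -> list:
    """Answer several REQ clients through a single bound REP socket."""
    endpoint = "inproc://many-clients-demo"
    context = zmq.Context()
    server = context.socket(zmq.REP)     # the only socket on the server side
    server.bind(endpoint)
    clients = []
    for _ in range(num_clients):
        client = context.socket(zmq.REQ)
        client.connect(endpoint)
        clients.append(client)
    try:
        for index, client in enumerate(clients):
            client.send_string(f"client {index}")
        for _ in clients:
            # The server never names a peer; the socket routes each reply
            # back to whichever client sent the request just received.
            request = server.recv_string()
            server.send_string("hello " + request)
        return [client.recv_string() for client in clients]
    finally:
        for sock in clients + [server]:
            sock.close(linger=0)
        context.term()
```

The server code contains no per-client state and no accept loop; adding clients only changes the argument.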
These changes don’t impact existing application code directly:
Pub-sub filtering is now done at the publisher side instead of subscriber side. This improves performance significantly in many pub-sub use cases. You can mix v3.2 and v2.1/v2.2 publishers and subscribers safely.
These are the main areas of impact on applications and language bindings:
Changed send/recv methods: zmq_send() and zmq_recv() have a different, simpler interface, and the old functionality is now provided by zmq_msg_send() and zmq_msg_recv(). Symptom: compile errors. Solution: fix up your code.
zmq_send() and zmq_recv(), like zmq_msg_send() and zmq_msg_recv(), return the number of bytes in the message on success (which is zero for an empty message), and -1 on error. In v2.x they always returned zero on success. Symptom: apparent errors when things actually work fine. Solution: test strictly for return code = -1, not non-zero.
zmq_poll() now waits for milliseconds, not microseconds. Symptom: application stops responding (in fact responds 1000 times slower). Solution: use the ZMQ_POLL_MSEC macro defined below, in all zmq_poll calls.
ZMQ_NOBLOCK is now called ZMQ_DONTWAIT. Symptom: compile failures on the ZMQ_NOBLOCK macro.
The ZMQ_HWM socket option is now broken into ZMQ_SNDHWM and ZMQ_RCVHWM. Symptom: compile failures on the ZMQ_HWM macro.
Most but not all zmq_getsockopt() options are now integer values. Symptom: runtime error returns on zmq_setsockopt and zmq_getsockopt.
The ZMQ_SWAP option has been removed. Symptom: compile failures on ZMQ_SWAP. Solution: redesign any code that uses this functionality.
For applications that want to run on both v2.x and v3.2, such as language bindings, our advice is to emulate v3.2 as far as possible. Here are C macro definitions that help your C/C++ code to work across both versions (taken from CZMQ):
Traditional network programming is built on the general assumption that one socket talks to one connection, one peer. There are multicast protocols, but these are exotic. When we assume “one socket = one connection”, we scale our architectures in certain ways. We create threads of logic where each thread works with one socket, one peer. We place intelligence and state in these threads.
In the ZeroMQ universe, sockets are doorways to fast little background communications engines that manage a whole set of connections automagically for you. You can’t see, work with, open, close, or attach state to these connections. Whether you use blocking send or receive, or poll, all you can talk to is the socket, not the connections it manages for you. The connections are private and invisible, and this is the key to ZeroMQ’s scalability.
This is because your code, talking to a socket, can then handle any number of connections across whatever network protocols are around, without change. A messaging pattern sitting in ZeroMQ scales more cheaply than a messaging pattern sitting in your application code.
So the general assumption no longer applies. As you read the code examples, your brain will try to map them to what you know. You will read “socket” and think “ah, that represents a connection to another node”. That is wrong. You will read “thread” and your brain will again think, “ah, a thread represents a connection to another node”, and again your brain will be wrong.
If you’re reading this Guide for the first time, realize that until you actually write ZeroMQ code for a day or two (and maybe three or four days), you may feel confused, especially by how simple ZeroMQ makes things for you. You may try to impose that general assumption on ZeroMQ, and it won’t work. And then you will experience your moment of enlightenment and trust, that zap-pow-kaboom satori paradigm-shift moment when it all becomes clear.