# Full cluster simulation in Python

# Broker peering simulation (part 3) in Python
# Prototypes the full flow of status and tasks
# While this example runs in a single process, that is just to make
# it easier to start and stop the example. Each thread has its own
# context and conceptually acts as a separate process.
# Author : Min RK
# Contact: benjaminrk(at)gmail(dot)com
import random
import sys
import threading
import time

import zmq


def asbytes(obj):
    """Return *obj* as an ASCII byte string.

    ``bytes`` input is returned unchanged. Anything else is converted with
    ``str()`` and, on Python 3, encoded as ASCII.

    Raises:
        UnicodeEncodeError: if the string form of *obj* is not pure ASCII.
    """
    if isinstance(obj, bytes):
        # str(b"x") on Python 3 would yield the repr "b'x'", not the payload.
        return obj
    s = str(obj)
    if str is not bytes:
        # Python 3
        s = s.encode('ascii')
    return s

def client_task(name, i):
    """Request-reply client using REQ socket.

    Connects a REQ socket to ``ipc://<name>-localfe.ipc`` and a PUSH socket
    to ``ipc://<name>-monitor.ipc``. It then loops forever: sleep a random
    0-5 seconds, send a burst of 0-15 requests, and wait up to 10 seconds
    for each reply.

    Args:
        name: Broker name used to build the ipc endpoint paths.
        i: Index of this client; used only in the socket identity.

    Returns:
        None. The function returns when polling is interrupted or when a
        reply does not arrive within the timeout.
    """
    ctx = zmq.Context()
    client = ctx.socket(zmq.REQ)
    client.identity = (u"Client-%s-%s" % (name, i)).encode('ascii')
    client.connect("ipc://%s-localfe.ipc" % name)
    monitor = ctx.socket(zmq.PUSH)
    monitor.connect("ipc://%s-monitor.ipc" % name)

    poller = zmq.Poller()
    poller.register(client, zmq.POLLIN)
    while True:
        time.sleep(random.randint(0, 5))
        for _ in range(random.randint(0, 15)):
            # send request with random hex ID
            task_id = u"%04X" % random.randint(0, 10000)
            client.send_string(task_id)

            # wait max 10 seconds for a reply, then complain
            try:
                events = dict(poller.poll(10000))
            except zmq.ZMQError:
                return  # interrupted

            if events:
                reply = client.recv_string()
                assert reply == task_id, "expected %s, got %s" % (task_id, reply)
                # Report the completed task to the broker's monitor socket
                monitor.send_string(reply)
            else:
                # No reply in time: report the lost task and stop this client
                monitor.send_string(u"E: CLIENT EXIT - lost task %s" % task_id)
                return

def worker_task(name, i):
    """Worker using REQ socket to do LRU routing.

    Connects a REQ socket to ``ipc://<name>-localbe.ipc``, announces itself
    with a ``READY`` message, then echoes every multipart request back after
    sleeping 0 or 1 seconds.

    Args:
        name: Broker name used to build the ipc endpoint path.
        i: Index of this worker; used only in the socket identity.

    Returns:
        None. The function returns when receiving is interrupted.
    """
    ctx = zmq.Context()
    worker = ctx.socket(zmq.REQ)
    worker.identity = ("Worker-%s-%s" % (name, i)).encode('ascii')
    worker.connect("ipc://%s-localbe.ipc" % name)

    # Tell broker we're ready for work
    worker.send(b"READY")

    # Process messages as they arrive
    while True:
        try:
            msg = worker.recv_multipart()
        except zmq.ZMQError:
            # interrupted
            return
        # Workers are busy for 0/1 seconds
        time.sleep(random.randint(0, 1))
        # A REQ socket must answer before it can receive again; echo the task
        worker.send_multipart(msg)

def main(myself, peers):
    """Run one broker of the peering simulation.

    Binds this broker's cloud frontend, state publisher, local frontend,
    local backend and monitor sockets, and connects the cloud backend and
    state subscriber to every peer. It then starts the worker and client
    threads and routes messages until polling is interrupted.

    Args:
        myself: This broker's name as bytes; used as the ROUTER identity and
            interpolated into the ipc endpoint paths.
        peers: List of peer broker names as bytes.

    Note:
        ``NBR_WORKERS`` and ``NBR_CLIENTS`` are read as module-level
        constants and must be defined elsewhere in this file.
    """
    print("I: preparing broker at %s…" % myself)

    # Prepare our context and sockets
    ctx = zmq.Context()

    # Bind cloud frontend to endpoint
    cloudfe = ctx.socket(zmq.ROUTER)
    cloudfe.setsockopt(zmq.IDENTITY, myself)
    cloudfe.bind("ipc://%s-cloud.ipc" % myself)

    # Bind state backend / publisher to endpoint
    statebe = ctx.socket(zmq.PUB)
    statebe.bind("ipc://%s-state.ipc" % myself)

    # Connect cloud and state backends to all peers
    cloudbe = ctx.socket(zmq.ROUTER)
    statefe = ctx.socket(zmq.SUB)
    statefe.setsockopt(zmq.SUBSCRIBE, b"")
    cloudbe.setsockopt(zmq.IDENTITY, myself)

    for peer in peers:
        print("I: connecting to cloud frontend at %s" % peer)
        cloudbe.connect("ipc://%s-cloud.ipc" % peer)
        print("I: connecting to state backend at %s" % peer)
        statefe.connect("ipc://%s-state.ipc" % peer)

    # Prepare local frontend and backend
    localfe = ctx.socket(zmq.ROUTER)
    localfe.bind("ipc://%s-localfe.ipc" % myself)
    localbe = ctx.socket(zmq.ROUTER)
    localbe.bind("ipc://%s-localbe.ipc" % myself)

    # Prepare monitor socket
    monitor = ctx.socket(zmq.PULL)
    monitor.bind("ipc://%s-monitor.ipc" % myself)

    # Get user to tell us when we can start…
    # raw_input("Press Enter when all brokers are started: ")

    # create workers and clients threads
    for i in range(NBR_WORKERS):
        thread = threading.Thread(target=worker_task, args=(myself, i))
        thread.daemon = True
        thread.start()

    for i in range(NBR_CLIENTS):
        thread_c = threading.Thread(target=client_task, args=(myself, i))
        thread_c.daemon = True
        thread_c.start()

    # Interesting part
    # -------------------------------------------------------------
    # Publish-subscribe flow
    # - Poll statefe and process capacity updates
    # - Each time capacity changes, broadcast new value
    # Request-reply flow
    # - Poll primary and process local/cloud replies
    # - While worker available, route localfe to local or cloud

    local_capacity = 0
    cloud_capacity = 0
    workers = []  # addresses of idle local workers, oldest first

    # setup backend poller
    pollerbe = zmq.Poller()
    pollerbe.register(localbe, zmq.POLLIN)
    pollerbe.register(cloudbe, zmq.POLLIN)
    pollerbe.register(statefe, zmq.POLLIN)
    pollerbe.register(monitor, zmq.POLLIN)

    while True:
        # If we have no workers anyhow, wait indefinitely
        try:
            events = dict(pollerbe.poll(1000 if local_capacity else None))
        except zmq.ZMQError:
            break  # interrupted

        previous = local_capacity
        # Handle reply from local worker
        msg = None
        if localbe in events:
            msg = localbe.recv_multipart()
            (address, _empty), msg = msg[:2], msg[2:]
            # The worker that just spoke is idle again: queue its address
            workers.append(address)
            local_capacity += 1

            # If it's READY, don't route the message any further
            if msg[-1] == b'READY':
                msg = None
        elif cloudbe in events:
            msg = cloudbe.recv_multipart()
            # We don't use peer broker address for anything
            (_address, _empty), msg = msg[:2], msg[2:]

        if msg is not None:
            address = msg[0]
            if address in peers:
                # Route reply to cloud if it's addressed to a broker
                cloudfe.send_multipart(msg)
            else:
                # Route reply to client if we still need to
                localfe.send_multipart(msg)

        # Handle capacity updates
        if statefe in events:
            _peer, s = statefe.recv_multipart()
            cloud_capacity = int(s)

        # handle monitor message
        if monitor in events:
            print(monitor.recv_string())

        # Now route as many clients requests as we can handle
        # - If we have local capacity we poll both localfe and cloudfe
        # - If we have cloud capacity only, we poll just localfe
        # - Route any request locally if we can, else to cloud
        while local_capacity + cloud_capacity:
            secondary = zmq.Poller()
            secondary.register(localfe, zmq.POLLIN)
            if local_capacity:
                secondary.register(cloudfe, zmq.POLLIN)
            events = dict(secondary.poll(0))

            # We'll do peer brokers first, to prevent starvation
            if cloudfe in events:
                msg = cloudfe.recv_multipart()
            elif localfe in events:
                msg = localfe.recv_multipart()
            else:
                break  # No work, go back to backends

            if local_capacity:
                # Hand the request to the longest-idle local worker
                msg = [workers.pop(0), b''] + msg
                localbe.send_multipart(msg)
                local_capacity -= 1
            else:
                # Route to random broker peer
                msg = [random.choice(peers), b''] + msg
                cloudbe.send_multipart(msg)

        # Broadcast our capacity to peers only when it actually changed
        if local_capacity != previous:
            statebe.send_multipart([myself, asbytes(local_capacity)])

if __name__ == '__main__':
    # First argument is this broker's name; any further arguments are peers.
    if len(sys.argv) >= 2:
        myself = asbytes(sys.argv[1])
        main(myself, peers=[asbytes(a) for a in sys.argv[2:]])
    else:
        print("Usage: peering3.py <me> [<peer_1> [... <peer_N>]]")
        sys.exit(1)