Clone server, Model Five in Python

Clone server Model Five

Author: Min RK <moc.liamg|krnimajneb#moc.liamg|krnimajneb

import logging
import time

import zmq
from zmq.eventloop.ioloop import IOLoop, PeriodicCallback
from zmq.eventloop.zmqstream import ZMQStream

from kvmsg import KVMsg
from zhelpers import dump

# simple struct for routing information for a key-value snapshot
class Route:
def __init__(self, socket, identity, subtree):
self.socket = socket # ROUTER socket to send to
self.identity = identity # Identity of peer who requested state
self.subtree = subtree # Client subtree specification

def send_single(key, kvmsg, route):
"""Send one state snapshot key-value pair to a socket"""
# check front of key against subscription subtree:
if kvmsg.key.startswith(route.subtree):
# Send identity of recipient first
route.socket.send(route.identity, zmq.SNDMORE)

class CloneServer(object):

# Our server is defined by these properties
ctx = None # Context wrapper
kvmap = None # Key-value store
loop = None # IOLoop reactor
port = None # Main port we're working on
sequence = 0 # How many updates we're at
snapshot = None # Handle snapshot requests
publisher = None # Publish updates to clients
collector = None # Collect updates from clients

def __init__(self, port=5556):
self.port = port
self.ctx = zmq.Context()
self.kvmap = {}
self.loop = IOLoop.instance()

# Set up our clone server sockets
self.snapshot = self.ctx.socket(zmq.ROUTER)
self.publisher = self.ctx.socket(zmq.PUB)
self.collector = self.ctx.socket(zmq.PULL)
self.snapshot.bind("tcp://*:%d" % self.port)
self.publisher.bind("tcp://*:%d" % (self.port + 1))
self.collector.bind("tcp://*:%d" % (self.port + 2))

# Wrap sockets in ZMQStreams for IOLoop handlers
self.snapshot = ZMQStream(self.snapshot)
self.publisher = ZMQStream(self.publisher)
self.collector = ZMQStream(self.collector)

# Register our handlers with reactor
self.flush_callback = PeriodicCallback(self.flush_ttl, 1000)

# basic log formatting:
logging.basicConfig(format="%(asctime)s %(message)s", datefmt="%Y-%m-%d %H:%M:%S",

def start(self):
# Run reactor until process interrupted
except KeyboardInterrupt:

def handle_snapshot(self, msg):
"""snapshot requests"""
if len(msg) != 3 or msg[1] != b"ICANHAZ?":
print("E: bad request, aborting")
identity, request, subtree = msg
if subtree:
# Send state snapshot to client
route = Route(self.snapshot, identity, subtree)

# For each entry in kvmap, send kvmsg to client
for k,v in self.kvmap.items():

# Now send END message with sequence number"I: Sending state shapshot=%d" % self.sequence)
self.snapshot.send(identity, zmq.SNDMORE)
kvmsg = KVMsg(self.sequence)
kvmsg.key = b"KTHXBAI"
kvmsg.body = subtree

def handle_collect(self, msg):
"""Collect updates from clients"""
kvmsg = KVMsg.from_msg(msg)
self.sequence += 1
kvmsg.sequence = self.sequence
ttl = float(kvmsg.get(b'ttl', 0))
if ttl:
kvmsg[b'ttl'] = b'%f' % (time.time() + ttl)"I: publishing update=%d", self.sequence)

def flush_ttl(self):
"""Purge ephemeral values that have expired"""
for key,kvmsg in list(self.kvmap.items()):
# used list() to exhaust the iterator before deleting from the dict

def flush_single(self, kvmsg):
"""If key-value pair has expired, delete it and publish the fact
to listening clients."""

ttl = float(kvmsg.get(b'ttl', 0))
if ttl and ttl <= time.time():
kvmsg.body = b""
self.sequence += 1
kvmsg.sequence = self.sequence
del self.kvmap[kvmsg.key]"I: publishing delete=%d", self.sequence)

def main():
clone = CloneServer()

if __name__ == '__main__':