Key-value message class: full in Python


kvmsg - key-value message class for example applications

Author: Min RK <moc.liamg|krnimajneb#moc.liamg|krnimajneb>


import struct # for packing integers
import sys
from uuid import uuid4

import zmq
# zmq.jsonapi ensures bytes, instead of unicode:

def encode_properties(properties_dict):
prop_s = b""
for key, value in properties_dict.items():
prop_s += b"%s=%s\n" % (key, value)
return prop_s

def decode_properties(prop_s):
prop = {}
line_array = prop_s.split(b"\n")

for line in line_array:
key, value = line.split(b"=")
prop[key] = value
except ValueError as e:
#Catch empty line

return prop

class KVMsg(object):
Message is formatted on wire as 5 frames:
frame 0: key (0MQ string)
frame 1: sequence (8 bytes, network order)
frame 2: uuid (blob, 16 bytes)
frame 3: properties (0MQ string)
frame 4: body (blob)

key = None
sequence = 0
properties = None
body = None

def __init__(self, sequence, uuid=None, key=None, properties=None, body=None):
assert isinstance(sequence, int)
self.sequence = sequence
if uuid is None:
uuid = uuid4().bytes
self.uuid = uuid
self.key = key = {} if properties is None else properties
self.body = body

# dictionary access maps to properties:
def __getitem__(self, k):

def __setitem__(self, k, v):[k] = v

def get(self, k, default=None):
return, default)

def store(self, dikt):
"""Store me in a dict if I have anything to store
else delete me from the dict."""

if self.key is not None and self.body is not None:
dikt[self.key] = self
elif self.key in dikt:
del dikt[self.key]

def send(self, socket):
"""Send key-value message to socket; any empty frames are sent as such."""
key = b'' if self.key is None else self.key
seq_s = struct.pack('!q', self.sequence)
body = b'' if self.body is None else self.body
prop_s = encode_properties(
socket.send_multipart([ key, seq_s, self.uuid, prop_s, body ])

def recv(cls, socket):
"""Reads key-value message from socket, returns new kvmsg instance."""
return cls.from_msg(socket.recv_multipart())

def from_msg(cls, msg):
"""Construct key-value message from a multipart message"""
key, seq_s, uuid, prop_s, body = msg
key = key if key else None
seq = struct.unpack('!q',seq_s)[0]
body = body if body else None
prop = decode_properties(prop_s)
return cls(seq, uuid=uuid, key=key, properties=prop, body=body)

def __repr__(self):
if self.body is None:
size = 0
size = len(self.body)
data = repr(self.body)

mstr = "[seq:{seq}][key:{key}][size:{size}][props:{props}][data:{data}]".format(
# uuid=hexlify(self.uuid),
return mstr

def dump(self):
print("@<@@"[[/span]], [[span style="color:#008000"]]str[[/span]]([[span style="color:#008000"]]self[[/span]]), [[span style="color:#BA2121"]]"@@>@", file=sys.stderr)
# ---------------------------------------------------------------------
# Runs self test of class

def test_kvmsg (verbose):
print(" * kvmsg: ", end='')

# Prepare our context and sockets
ctx = zmq.Context()
output = ctx.socket(zmq.DEALER)
input = ctx.socket(zmq.DEALER)

kvmap = {}
# Test send and receive of simple message
kvmsg = KVMsg(1)
kvmsg.key = b"key"
kvmsg.body = b"body"
if verbose:

kvmsg2 = KVMsg.recv(input)
if verbose:
assert kvmsg2.key == b"key"

assert len(kvmap) == 1 # shouldn't be different

# test send/recv with properties:
kvmsg = KVMsg(2, key=b"key", body=b"body")
kvmsg[b"prop1"] = b"value1"
kvmsg[b"prop2"] = b"value2"
kvmsg[b"prop3"] = b"value3"
assert kvmsg[b"prop1"] == b"value1"
if verbose:
kvmsg2 = KVMsg.recv(input)
if verbose:
# ensure properties were preserved
assert kvmsg2.key == kvmsg.key
assert kvmsg2.body == kvmsg.body
assert ==
assert kvmsg2[b"prop2"] == kvmsg[b"prop2"]


if __name__ == '__main__':
test_kvmsg('-v' in sys.argv)