Majordomo worker API in Go

// mdwrkapi class - Majordomo Protocol Worker API
// Implements the MDP/Worker spec at http://rfc.zeromq.org/spec:7.
//
// Author: iano <moc.liamg|onai.ylacs#moc.liamg|onai.ylacs>
// Based on C & Python example

package main

import (
zmq "github.com/alecthomas/gozmq"
"log"
"time"
)

type Worker interface {
Close()
Recv([][]byte) [][]byte
}

type mdWorker struct {
broker string
context *zmq.Context
service string
verbose bool
worker *zmq.Socket

heartbeat time.Duration
heartbeatAt time.Time
liveness int
reconnect time.Duration

expectReply bool
replyTo []byte
}

func NewWorker(broker, service string, verbose bool) Worker {
context, _ := zmq.NewContext()
self := &mdWorker{
broker: broker,
context: context,
service: service,
verbose: verbose,
heartbeat: 2500 * time.Millisecond,
liveness: 0,
reconnect: 2500 * time.Millisecond,
}
self.reconnectToBroker()
return self
}

func (self *mdWorker) reconnectToBroker() {
if self.worker != nil {
self.worker.Close()
}
self.worker, _ = self.context.NewSocket(zmq.DEALER)
self.worker.SetLinger(0)
self.worker.Connect(self.broker)
if self.verbose {
log.Printf("I: connecting to broker at %s…\n", self.broker)
}
self.sendToBroker(MDPW_READY, []byte(self.service), nil)
self.liveness = HEARTBEAT_LIVENESS
self.heartbeatAt = time.Now().Add(self.heartbeat)
}

func (self *mdWorker) sendToBroker(command string, option []byte, msg [][]byte) {
if len(option) > 0 {
msg = append([][]byte{option}, msg…)
}

msg = append([][]byte{nil, []byte(MDPW_WORKER), []byte(command)}, msg…)
if self.verbose {
log.Printf("I: sending %X to broker\n", command)
Dump(msg)
}
self.worker.SendMultipart(msg, 0)
}

func (self *mdWorker) Close() {
if self.worker != nil {
self.worker.Close()
}
self.context.Close()
}

func (self *mdWorker) Recv(reply [][]byte) (msg [][]byte) {
// Format and send the reply if we were provided one

if len(reply) == 0 && self.expectReply {
panic("Error reply")
}

if len(reply) > 0 {
if len(self.replyTo) == 0 {
panic("Error replyTo")
}
reply = append([][]byte{self.replyTo, nil}, reply…)
self.sendToBroker(MDPW_REPLY, nil, reply)
}

self.expectReply = true

for {
items := zmq.PollItems{
zmq.PollItem{Socket: self.worker, Events: zmq.POLLIN},
}

_, err := zmq.Poll(items, self.heartbeat)
if err != nil {
panic(err) // Interrupted
}

if item := items[0]; item.REvents&zmq.POLLIN != 0 {
msg, _ = self.worker.RecvMultipart(0)
if self.verbose {
log.Println("I: received message from broker: ")
Dump(msg)
}
self.liveness = HEARTBEAT_LIVENESS
if len(msg) < 3 {
panic("Invalid msg") // Interrupted
}

header := msg[1]
if string(header) != MDPW_WORKER {
panic("Invalid header") // Interrupted
}

switch command := string(msg[2]); command {
case MDPW_REQUEST:
// We should pop and save as many addresses as there are
// up to a null part, but for now, just save one…
self.replyTo = msg[3]
msg = msg[5:]
return
case MDPW_HEARTBEAT:
// do nothin
case MDPW_DISCONNECT:
self.reconnectToBroker()
default:
log.Println("E: invalid input message:")
Dump(msg)
}
} else if self.liveness--; self.liveness <= 0 {
if self.verbose {
log.Println("W: disconnected from broker - retrying…")
}
time.Sleep(self.reconnect)
self.reconnectToBroker()
}

// Send HEARTBEAT if it's time
if self.heartbeatAt.Before(time.Now()) {
self.sendToBroker(MDPW_HEARTBEAT, nil, nil)
self.heartbeatAt = time.Now().Add(self.heartbeat)
}
}

return
}