Titanic broker example in Java

import org.zeromq.ZContext;
import org.zeromq.ZFrame;
import org.zeromq.ZMQ;
import org.zeromq.ZMQ.PollItem;
import org.zeromq.ZMQ.Socket;
import org.zeromq.ZMsg;
import org.zeromq.ZThread;
import org.zeromq.ZThread.IAttachedRunnable;
import org.zeromq.ZThread.IDetachedRunnable;

import java.io.BufferedWriter;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.FileOutputStream;
import java.io.FileWriter;
import java.io.IOException;
import java.io.RandomAccessFile;
import java.util.UUID;

public class titanic
{
// Return a new UUID as a printable character string
// Caller must free returned string when finished with it

static String generateUUID()
{
return UUID.randomUUID().toString();
}

private static final String TITANIC_DIR = ".titanic";

// Returns freshly allocated request filename for given UUID
private static String requestFilename(String uuid) {
String filename = String.format("%s/%s.req", TITANIC_DIR, uuid);
return filename;
}

// Returns freshly allocated reply filename for given UUID
private static String replyFilename(String uuid) {
String filename = String.format("%s/%s.rep", TITANIC_DIR, uuid);
return filename;
}

// The titanic.request task waits for requests to this service. It writes
// each request to disk and returns a UUID to the client. The client picks
// up the reply asynchronously using the titanic.reply service:

static class TitanicRequest implements IAttachedRunnable
{

@Override
public void run(Object[] args, ZContext ctx, Socket pipe)
{
mdwrkapi worker = new mdwrkapi(
"tcp://localhost:5555", "titanic.request", false);
ZMsg reply = null;

while (true) {
// Send reply if it's not null
// And then get next request from broker
ZMsg request = worker.receive(reply);
if (request == null)
break; // Interrupted, exit

// Ensure message directory exists
new File(TITANIC_DIR).mkdirs();

// Generate UUID and save message to disk
String uuid = generateUUID();
String filename = requestFilename(uuid);
DataOutputStream file = null;
try {
file = new DataOutputStream(new FileOutputStream(filename));
ZMsg.save(request, file);
} catch (IOException e) {
e.printStackTrace();
} finally {
try {
if (file != null)
file.close();
} catch (IOException e) {
}
}
request.destroy();

// Send UUID through to message queue
reply = new ZMsg();
reply.add(uuid);
reply.send(pipe);

// Now send UUID back to client
// Done by the mdwrk_recv() at the top of the loop
reply = new ZMsg();
reply.add("200");
reply.add(uuid);
}
worker.destroy();
}
}
// The titanic.reply task checks if there's a reply for the specified
// request (by UUID), and returns a 200 (OK), 300 (Pending), or 400
// (Unknown) accordingly:

static class TitanicReply implements IDetachedRunnable
{

@Override
public void run(Object[] args)
{
mdwrkapi worker = new mdwrkapi(
"tcp://localhost:5555", "titanic.reply", false);
ZMsg reply = null;

while (true) {
ZMsg request = worker.receive(reply);
if (request == null)
break; // Interrupted, exit

String uuid = request.popString();
String reqFilename = requestFilename(uuid);
String repFilename = replyFilename(uuid);

if (new File(repFilename).exists()) {
DataInputStream file = null;
try {
file = new DataInputStream(new FileInputStream(repFilename));
reply = ZMsg.load(file);
reply.push("200");
} catch (IOException e) {
e.printStackTrace();
} finally {
try {
if (file != null)
file.close();
} catch (IOException e) {
}
}
}
else {
reply = new ZMsg();
if (new File(reqFilename).exists())
reply.push("300"); //Pending
else
reply.push("400"); //Unknown
}
request.destroy();
}
worker.destroy();
}
}

// The titanic.close task removes any waiting replies for the request
// (specified by UUID). It's idempotent, so it is safe to call more than
// once in a row:
static class TitanicClose implements IDetachedRunnable
{
@Override
public void run(Object[] args)
{
mdwrkapi worker = new mdwrkapi(
"tcp://localhost:5555", "titanic.close", false);
ZMsg reply = null;

while (true) {
ZMsg request = worker.receive(reply);
if (request == null)
break; // Interrupted, exit

String uuid = request.popString();
String req_filename = requestFilename(uuid);
String rep_filename = replyFilename(uuid);
new File(rep_filename).delete();
new File(req_filename).delete();

request.destroy();
reply = new ZMsg();
reply.add("200");
}
worker.destroy();
}
}

// This is the main thread for the Titanic worker. It starts three child
// threads; for the request, reply, and close services. It then dispatches
// requests to workers using a simple brute force disk queue. It receives
// request UUIDs from the titanic.request service, saves these to a disk
// file, and then throws each request at MDP workers until it gets a
// response.

public static void main(String[] args)
{
boolean verbose = (args.length > 0 && "-v".equals(args[0]));

ZContext ctx = new ZContext();

Socket requestPipe = ZThread.fork(ctx, new TitanicRequest());
ZThread.start(new TitanicReply());
ZThread.start(new TitanicClose());

// Main dispatcher loop
while (true) {
// We'll dispatch once per second, if there's no activity
PollItem items [] = { new PollItem(requestPipe, ZMQ.Poller.POLLIN) };
int rc = ZMQ.poll(items, 1, 1000);
if (rc == -1)
break; // Interrupted
if (items [0].isReadable()) {
// Ensure message directory exists
new File(TITANIC_DIR).mkdirs();

// Append UUID to queue, prefixed with '-' for pending
ZMsg msg = ZMsg.recvMsg(requestPipe);
if (msg == null)
break; // Interrupted
String uuid = msg.popString();
BufferedWriter wfile = null;
try {
wfile = new BufferedWriter(new FileWriter(TITANIC_DIR + "/queue", true));
wfile.write("-" + uuid + "\n");
} catch (IOException e) {
e.printStackTrace();
break;
} finally {
try {
if (wfile != null)
wfile.close();
} catch (IOException e) {
}
}
msg.destroy();
}
// Brute force dispatcher
byte[] entry = new byte[37]; //"?……..:….:….:….:…………:";
RandomAccessFile file = null;
try {
file = new RandomAccessFile(TITANIC_DIR + "/queue", "rw");
while (file.read(entry) > 0) {

// UUID is prefixed with '-' if still waiting
if (entry[0] == '-') {
if (verbose)
System.out.printf("I: processing request %s\n", new String(entry, 1, entry.length -1));
if (serviceSuccess(new String(entry, 1, entry.length -1))) {
// Mark queue entry as processed
file.seek(file.getFilePointer() - 37);
file.writeBytes("+");
file.seek(file.getFilePointer() + 36);
}
}
// Skip end of line, LF or CRLF
if (file.readByte() == '\r')
file.readByte();
if (Thread.currentThread().isInterrupted())
break;
}
} catch (FileNotFoundException e) {
} catch (IOException e) {
e.printStackTrace();
} finally {
if (file != null) {
try {
file.close();
} catch (IOException e) {
}
}
}
}
}
// Here, we first check if the requested MDP service is defined or not,
// using a MMI lookup to the Majordomo broker. If the service exists,
// we send a request and wait for a reply using the conventional MDP
// client API. This is not meant to be fast, just very simple:

static boolean serviceSuccess(String uuid)
{
// Load request message, service will be first frame
String filename = requestFilename(uuid);

// If the client already closed request, treat as successful
if (!new File(filename).exists())
return true;

DataInputStream file = null;
ZMsg request;
try {
file = new DataInputStream(new FileInputStream(filename));
request = ZMsg.load(file);
} catch (IOException e) {
e.printStackTrace();
return true;
} finally {
try {
if (file != null)
file.close();
} catch (IOException e) {
}
}
ZFrame service = request.pop();
String serviceName = service.toString();

// Create MDP client session with short timeout
mdcliapi client = new mdcliapi("tcp://localhost:5555", false);
client.setTimeout(1000); // 1 sec
client.setRetries(1); // only 1 retry

// Use MMI protocol to check if service is available
ZMsg mmiRequest = new ZMsg();
mmiRequest.add(service);
ZMsg mmiReply = client.send("mmi.service", mmiRequest);
boolean serviceOK = (mmiReply != null
&& mmiReply.getFirst().toString().equals("200"));

if(mmiReply != null)
mmiReply.destroy();

boolean result = false;
if (serviceOK) {
ZMsg reply = client.send(serviceName, request);
if (reply != null) {
filename = replyFilename(uuid);
DataOutputStream ofile = null;
try {
ofile = new DataOutputStream(new FileOutputStream(filename));
ZMsg.save(reply, ofile);
} catch (IOException e) {
e.printStackTrace();
return true;
} finally {
try {
if (file != null)
file.close();
} catch (IOException e) {
}
}
result = true;
reply.destroy();
}

}
else
request.destroy();

client.destroy();
return result;
}
}