Pages

Router to router

Using the ZeroMQ router to router socket combination is kind of tricky. I guess that ZMQ_ROUTER socket has been designed thinking to the request-router messaging pattern, where the router is playing as server, and where each client sends an initial message to the server through its REQ socket, identifying itself, so that the router can use that identity when sending its reply.

Implementing the REQ-ROUTER pattern is straightforward. REQ sends its identity to ROUTER, ROUTER uses that identity as part of its reply to REQ.

I had to think a bit more to find out how to write a router to router example.

I have router server and a configurable number of router clients. I want each client sending a single empty message to the server, and then ending. That looks pretty easy, but we should consider that the client needs to know the server identity to send it a message. It is not enough knowing its tcp address.

We risk to fall in an infinite recursion. To send messages we need the recipient identity, but to get the that identity, the should send it to us, but it needs our identity to do that!

A way to get out of this impasse is remembering that a ZeroMQ identity could be explicitly set. So we can avoid a bootstrap paradox predefining the identities to be used by the clients, or the one used by the server.

Now, there are two choices. Here I show a first solution, where the client identities are predefined. In the next post I'll show the case where the server identity is passed around.

The code is written for ZeroMQ 2.2, Windows+MSVC2010, C++11 with STL and Boost, but could easily ported to a different platform.

It is single process 0MQ application. A real application would almost certainly multiprocess, but since I use the tcp protocol for the sockets, it should be easy to refactor this example to work in that way.

By the way, these are the addresses I use for the sockets:
const char* SK_ADDR_CLI = "tcp://localhost:5380";
const char* SK_ADDR_SRV = "tcp://*:5380";
The main procedure knows about the client identities, and starts the clients and the server:
const char* clients[] = { "ClientA", "ClientB", "ClientC" }; // 1
int nClients = sizeof(clients) / sizeof(const char*);

boost::thread_group threads;
for(int i =0; i < nClients; ++i)
    threads.create_thread(std::bind(rrClientC, clients[i])); // 2

threads.create_thread(std::bind(rrServerC, clients, nClients)); // 3
threads.join_all();
1. We have three clients, and we want them having these identities.
2. Start the clients, each in a new thread, on the rrClientC() function, passing the i-th client identity, that has to be assigned to its socket.
3. Start the server on rrServerC(), passing the client addresses, and the number of clients.

Here is the client code:
void rrClientC(const char* id)
{
    dumpId("client startup"); // 1
    zmq::context_t context(1);

    zmq::Socket skRouter(context, ZMQ_ROUTER, id); // 2
    skRouter.connect(SK_ADDR_CLI);
    dumpId(SK_ADDR_CLI, id);

    zmq::Frames input = skRouter.blockingRecv(2); // 3
    dumpId("client received", input[1].c_str());

    skRouter.send(input[0], id); // 4
    dumpId("client shutdown");
}
1. Being in a multithread context, we can't use a shared resource, like std::cout is, without protecting its access by mutex/lock, this is what dumpId() does.
2. Create a ROUTER socket assigning to it the specified identity, then connect it to the server socket.
3. Wait for a multipart message in two frames. We are interested just in the first frame, that contains the server id.
4. Send a two-frames message to the server, the first one is the mandatory recipient id, the second is the payload, here the client id, for testing purpose.

And this is the server:
void rrServerC(const char* ids[], int nClients) // 1
{
    dumpId("server startup");
    zmq::context_t context(1);

    zmq::Socket skRouter(context, ZMQ_ROUTER); // 2
    skRouter.bind(SK_ADDR_SRV);

    dumpId("server ready on", SK_ADDR_SRV);
    boost::this_thread::sleep(boost::posix_time::seconds(1)); // 3

    for(int i =0; i < nClients; ++i)
    {
        dumpId("server send to", ids[i]);
        skRouter.send(ids[i], "hello"); // 4
    }

    for(int i =0; i < nClients; ++i)
    {
        zmq::Frames in = skRouter.blockingRecv(2); // 5
        dumpId("receiving from", in[1].c_str());
    }
    dumpId("server shutdown");
}
1. The server needs to know id and numbers of the clients.
2. The server socket id is not important, we can keep the one generated by 0MQ.
3. This is a key point. Before start sending messages through the socket, we should ensure all the clients are already connected. Here I just wait for a while, giving time to the clients to connect. For production code we should think to a most robust solution.
4. Send an "hello" message to each client. First frame is the id of the recipient client.
5. Receive a message from each client. Second frame is the payload.

The complete C++ source code for this and for the next post example is on github.

No comments:

Post a Comment