
Basic DEALER-REP example

I stripped out every possible detail and wrote a minimal ZeroMQ application that uses a DEALER-REP socket combination. Playing around with it should help to see how this pattern works on its own, before combining it into a more complex structure.

The code is based on ØMQ 2.2 for C++, using my extension of zmq::socket_t (you can find it in an include file on github). The target platform is Windows + Visual C++ 2010. Some STL, Boost, and C++11 features have been used.

The dealer thread creates a few multipart messages, sends them through the socket, and waits for their echo. Each reply thread gets its share of those messages and sends them back through the socket.

There are a few interesting points to notice.

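The dealer sends multipart messages composed of three frames: the sender address, an empty separator, and the actual payload. The reply socket hands just the payload to the application code, which has no access to the address (nor to the separator). When the reply is sent, the socket puts the same address and separator back in front of it, which is why the dealer receives three frames again.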
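To make the framing concrete, here is a small model in plain C++ (no ZeroMQ involved) of what happens to the three frames on the REP side. The names Frames, splitAtDelimiter() and wrapReply() are hypothetical helpers written only for this illustration; they are not part of ZeroMQ or of my socket extension.

```cpp
#include <cassert>
#include <cstddef>
#include <string>
#include <utility>
#include <vector>

// Illustration only: a message is modeled as a plain list of string frames.
typedef std::vector<std::string> Frames;

// Hypothetical helper: splits a message at its first empty frame, mimicking how
// a REP socket separates the envelope from the payload it hands to the caller.
// Returns (envelope including the empty delimiter, payload frames).
std::pair<Frames, Frames> splitAtDelimiter(const Frames& message)
{
    std::size_t pos = 0;
    while (pos < message.size() && !message[pos].empty())
        ++pos;
    assert(pos < message.size()); // the model expects a delimiter frame

    Frames envelope(message.begin(), message.begin() + pos + 1);
    Frames payload(message.begin() + pos + 1, message.end());
    return std::make_pair(envelope, payload);
}

// Hypothetical helper: rebuilds what travels back to the DEALER when the
// application replies, that is the saved envelope followed by the reply.
Frames wrapReply(const Frames& envelope, const Frames& reply)
{
    Frames out(envelope);
    out.insert(out.end(), reply.begin(), reply.end());
    return out;
}
```

Feeding it the frames (id, "", "kk") gives a two-frame envelope and a one-frame payload; wrapping the echoed payload with the saved envelope gives back the original three frames.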

The dealer distributes its outgoing messages among the REP sockets connected to it in round-robin fashion, so we expect each client to get the same share of messages. For the example to run as expected, all the clients must be connected to the server before the first message is sent on the socket; otherwise only the already connected ones would get a share.
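The effect of a late connection can be seen with a toy model of the distribution, again in plain C++ with no ZeroMQ involved. The function roundRobin() is a hypothetical name used only here, and it assumes that the set of connected peers does not change while the messages are being sent.

```cpp
#include <cassert>
#include <cstddef>
#include <vector>

// Hypothetical model: hands out `total` messages one at a time, cycling over
// `connected` peers, and returns how many messages each peer received.
std::vector<std::size_t> roundRobin(std::size_t connected, std::size_t total)
{
    assert(connected > 0); // with no peer there is nobody to send to

    std::vector<std::size_t> received(connected, 0);
    for (std::size_t i = 0; i < total; ++i)
        ++received[i % connected]; // next peer in the cycle gets this message
    return received;
}
```

With three clients and four messages each, the twelve messages are split 4/4/4. If only two clients were connected while all twelve were sent, the model gives 6/6: the third client would get nothing and would hang in its receiving loop.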

The clients and server are created by this code:
boost::thread_group threads;
for(int i = 0; i < nRep; ++i) // 1
    threads.create_thread(std::bind(rep2dealer, nMsg)); // 2
threads.create_thread(std::bind(dealer, nRep, nMsg)); // 3
threads.join_all();
1. nRep is the number of REP clients that we want the application to have. You should ensure that it is at least one, and "not too big", according to what "big" means in your environment.
2. Each REP client runs its own copy of the rep2dealer() function, see below. It expects as input the number of messages that each client should receive. The same consideration as in (1) applies.
3. The server runs the dealer() function. It needs to know how many clients to expect, and how many messages to send to each of them.
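For readers without Boost, the same startup sequence could be sketched with the standard thread library. This is only a sketch under some assumptions: it needs a compiler that provides the C++11 header thread (Visual C++ 2010 does not), runAll() is a hypothetical name, and the client and server are passed in as callables so that the sketch stays independent from ZeroMQ.

```cpp
#include <cassert>
#include <cstddef>
#include <functional>
#include <thread>
#include <vector>

// Hypothetical driver: starts nRep client threads and one server thread, waits
// for all of them, and returns the number of threads that have been joined.
std::size_t runAll(int nRep, int nMsg,
                   const std::function<void(int)>& client,
                   const std::function<void(int, int)>& server)
{
    assert(nRep >= 1 && nMsg >= 1); // same sanity check suggested in note 1

    std::vector<std::thread> threads;
    threads.reserve(static_cast<std::size_t>(nRep) + 1);
    for (int i = 0; i < nRep; ++i)
        threads.emplace_back(client, nMsg);   // one thread per REP client
    threads.emplace_back(server, nRep, nMsg); // the DEALER server thread

    for (std::thread& t : threads)
        t.join();
    return threads.size();
}
```

In the example of this post the call would be runAll(nRep, nMsg, rep2dealer, dealer), playing the role of the thread_group and of its join_all().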

The socket addresses are:
const char* SK_ADDR_CLI = "tcp://localhost:5380";
const char* SK_ADDR_SRV = "tcp://*:5380";
I am not showing here the code for the utility function dumpId(); it just prints to std::cout, locking to protect this shared resource from concurrent access.
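Just to give an idea, a function of that kind could look like the following sketch. It is not the actual dumpId() used in this post: the output format, the formatDump() helper and the use of std::mutex and std::thread (instead of their Boost counterparts) are all assumptions made for the illustration.

```cpp
#include <cassert>
#include <iostream>
#include <mutex>
#include <sstream>
#include <string>
#include <thread>

namespace {
std::mutex dumpMutex; // guards std::cout, which is shared by all the threads
}

// Hypothetical helper: builds "<thread id>: <label>[ <detail>]".
// It is kept apart from the printing so that it can be checked on its own.
std::string formatDump(const char* label, const char* detail = nullptr)
{
    assert(label != nullptr);

    std::ostringstream oss;
    oss << std::this_thread::get_id() << ": " << label;
    if (detail != nullptr)
        oss << ' ' << detail;
    return oss.str();
}

// Hypothetical stand-in for dumpId(): prints one complete line while holding
// the lock, so that lines coming from different threads do not interleave.
void dumpId(const char* label, const char* detail = nullptr)
{
    const std::string line = formatDump(label, detail);
    std::lock_guard<std::mutex> lock(dumpMutex);
    std::cout << line << std::endl;
}
```

The line is built before taking the lock, so that the mutex is held only for the time strictly needed to write on the console.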

Here is the function executed by each client:
void rep2dealer(int nMsg)
{
    dumpId("REP startup");
    zmq::context_t context(1);
    
    zmq::Socket skRep(context, ZMQ_REP); // 1
    skRep.connect(SK_ADDR_CLI);
    dumpId("REP CLI on", SK_ADDR_CLI);

    for(int i = 0; i < nMsg; ++i) // 2
    {
        std::string msg = skRep.recvAsString();
        dumpId("REP for", msg.c_str());
        skRep.send(msg);
    }
}
1. Reply socket, connected to the DEALER one that we will see in the server.
2. On a reply socket we synchronously receive and send messages. Here we loop, receiving a (single-part) message and sending it back, as many times as the number of messages expected for each client. If the dealer did not balance the load fairly, or if for any reason a client received fewer messages than expected, it would hang waiting for more.

And this is the server:
void dealer(int nRep, int nMsg)
{
    dumpId("DEALER startup");
    zmq::context_t context(1);

    zmq::Socket skDealer(context, ZMQ_DEALER); // 1
    skDealer.bind(SK_ADDR_SRV);
    dumpId("DEALER SRV on", SK_ADDR_SRV);

    boost::this_thread::sleep(boost::posix_time::seconds(1)); // 2

    zmq::Frames frames; // 3
    frames.reserve(3);
    frames.push_back(boost::lexical_cast<std::string>(boost::this_thread::get_id()));
    frames.push_back("");
    frames.push_back("");

    for(int i = 0; i < nRep * nMsg; ++i)
    {
        frames[2] += 'k'; // 4
        skDealer.send(frames);
    }
    dumpId("all sent");

    for(int i = 0; i < nRep * nMsg; ++i)
    {
        zmq::Frames input = skDealer.blockingRecv(3); // 5
        dumpId(input[2].c_str()); // 6
    }

    dumpId("DEALER done");
}
1. This is the DEALER socket accepting the connection from the REP client sockets.
2. Before sending messages, we sleep for a second to give all the clients the time to connect, so that each of them gets its own share of messages.
3. Let's prepare the base for the message to be sent. It is a three-part message. The first frame is the address of the sender; it is not interesting here, but I still set it to the thread id. The second frame is empty, acting as a separator. The third frame is the payload. It is initialized empty and modified in the sending loop, to give some sensible testing feedback.
4. The first message has a single 'k' as payload; each following message has one more 'k' than the previous one.
5. Getting the message back from the clients.
6. Dump to standard output the payload.
