
PUSH-PULL with ZeroMQ 3.1: sink

We have a ventilator that pushes messages to one or (usually) more workers, and workers that pull messages from the ventilator, do some work, and then push the result to the sink. Now we are going to write the code for this last part of the ØMQ 3.1 Divide and Conquer pattern.

The sink is a very simple piece of code. It acts as a PULL server and runs until it has received all the expected messages, then it prints some statistics before terminating:
void* context = zmq_init(1);
void* socket = zmq_socket(context, ZMQ_PULL); // 1
zmq_bind(socket, "tcp://*:5558");

if(zmq_recv(socket, NULL, 0, 0) == 0) // 2
    std::cout << "Ventilator has started to generate jobs" << std::endl;
else
    std::cout << '!';

boost::posix_time::ptime start = boost::posix_time::microsec_clock::local_time(); // 3

for(int i = 0; i < 100; ++i) // 4
{
    if(zmq_recv(socket, NULL, 0, 0) != 0) // 5
        std::cout << '!';
    else
        std::cout << (i%10 == 9 ? ':' : '.');
}
boost::posix_time::ptime end = boost::posix_time::microsec_clock::local_time(); // 6
std::cout << std::endl << "Delta is: " << end - start << std::endl;

zmq_close(socket);
zmq_term(context);
1. The socket is the pulling end of a PUSH-PULL ZeroMQ messaging pattern, and the call to zmq_bind() gives it the server role (over the tcp protocol).
2. This may look a bit weird at first sight, but it is exactly what we need here. We ask for a message, passing no buffer to store it in and a length of zero, because we expect an empty message that needs no storage. If the message received is bigger than expected, or if receiving it fails, an exclamation mark is shown to the user (quite poor error handling, but it suffices here); otherwise we signal that the first dummy message has arrived.
3. The job of the sink is just to measure how long the workers take to run the whole batch, so we store the current time here.
4. This is not very nice. The "one hundred" is set in stone and must match the number of tasks the ventilator generates by splitting the job. It would be better for the ventilator and the sink to synchronize on this number over a connection between them.
5. Again, messages coming from the workers carry no information beyond the fact that they were sent, so we need no buffer to store them.
6. At the end of the loop, we calculate and report the duration of the batch.
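
One way to get rid of the hard-coded task count from point 4 would be to let the start-of-batch message carry the count as decimal text instead of being empty. The following is only a minimal sketch under that assumption: parse_task_count() is a hypothetical helper, not part of the original sink, and the upper bound of one million tasks is arbitrary. It relies on zmq_recv() returning -1 on error, and otherwise the size of the message, which can exceed the buffer length when the message was truncated (check this against the zmq_recv documentation for your version).

```cpp
#include <cassert>
#include <cstddef>
#include <cstdlib>
#include <string>

// Hypothetical helper: interpret the result of
//     int rc = zmq_recv(socket, buf, capacity, 0);
// assuming the start-of-batch message carries the task count as decimal text.
// Returns the task count, or -1 if the message cannot be used.
int parse_task_count(const char* buf, int rc, std::size_t capacity)
{
    assert(buf != NULL);
    if (rc <= 0)                                  // -1: receive error, 0: empty message
        return -1;
    if (static_cast<std::size_t>(rc) > capacity)  // message longer than the buffer: truncated
        return -1;

    // The payload is not NUL-terminated, so copy it into a string first.
    std::string text(buf, static_cast<std::size_t>(rc));
    char* end = NULL;
    long value = std::strtol(text.c_str(), &end, 10);
    if (end == text.c_str() || *end != '\0')      // no digits, or something after the number
        return -1;
    if (value <= 0 || value > 1000000)            // arbitrary sanity bound for this sketch
        return -1;
    return static_cast<int>(value);
}
```

In the sink, the first zmq_recv() would then read into a small char buffer, and the value returned by parse_task_count() would replace the literal 100 in the loop condition. The ventilator would have to be changed accordingly, so that it sends the count in that first message.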

This application is meant to be run on a modern machine, with at least a couple of cores available. On a single-processor, single-core machine, all our work of splitting the job and parallelizing its execution would be just a waste. Otherwise, you can have some fun checking how the execution time changes as the number of workers varies.
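
To get a rough idea of how many workers are worth starting on a given machine, you could ask the standard library for the number of hardware threads. This is just a sketch, assuming a C++11 compiler; suggested_worker_count() is a hypothetical name, and the value it returns is only a hint, not a guarantee of the best setting.

```cpp
#include <cassert>
#include <thread>

// Hypothetical helper: suggest how many workers to start on this machine.
unsigned suggested_worker_count()
{
    // hardware_concurrency() is only a hint and may return 0 when unknown.
    unsigned n = std::thread::hardware_concurrency();
    if (n == 0)
        n = 1;          // fall back to a single worker
    assert(n >= 1);
    return n;
}
```

Running the batch with fewer, as many, and more workers than this number, and comparing the delta printed by the sink each time, is a simple way to see where adding workers stops paying off.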
