
PUSH-PULL with ZeroMQ 3.1: worker

I am rewriting the divide and conquer example, this time using the raw C interface to ØMQ 3.1. We have already seen the ventilator; now it is the worker's turn. This component is a PULL-PUSH bridge between the ventilator, which starts the execution stream by splitting the original (huge) task into many (tiny) tasks, and the sink, which receives the results from the workers.

The worker is a client of both the ventilator and the sink, and this first simple implementation has a flaw: no one tells the worker when the job is done. It just hangs on the ventilator, waiting for more messages that sadly are not coming. For the moment we will kill it with an interrupt; in the near future we will see a more elegant way of dealing with this issue.
void* context = zmq_init(1);
void* skPull = zmq_socket(context, ZMQ_PULL); // 1
zmq_connect(skPull, "tcp://localhost:5557");

void* skPush = zmq_socket(context, ZMQ_PUSH); // 2
zmq_connect(skPush, "tcp://localhost:5558");

while(true) // 3
{
    int message;
    if(zmq_recv(skPull, &message, sizeof(int), 0) == sizeof(int)) // 4
    {
        std::cout << '[' << message << ']';
        boost::this_thread::sleep(boost::posix_time::milliseconds(message)); // 5
    }
    else
    {
        std::cout << "[-]";
    }

    zmq_send(skPush, NULL, 0, 0); // 6
}
zmq_close(skPull);
zmq_close(skPush);
zmq_term(context);
1. This socket connects in PULL mode to the PUSH server socket defined in the ventilator.
2. A second socket is used to PUSH messages to the sink, which we are going to provide with a PULL server socket.
3. This forever loop is the weak spot of this worker implementation. There is currently no way to break it other than sending an interrupt to the process running this code. Let's keep it this way for the time being.
4. We expect the ventilator to send messages that are at most int-sized; the only zero-sized message should be the first one, the flag that signals that the ventilator is about to start its real job. The zmq_recv() call makes space available for just one int: if a bigger message is detected, it is first truncated and then discarded.
5. The job done by the worker is emulated by this sleep().
6. In this implementation the sink doesn't expect any meaningful information from the worker, just an acknowledgment of the job done, so an empty message is enough.
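
The size check in note 4 can be spelled out a bit more explicitly. What follows is only a sketch, under the assumption that zmq_recv() behaves as documented for the 3.x API: it returns -1 on error, otherwise the number of bytes in the message, and that number can be larger than the buffer when the message has been truncated. The names RecvOutcome and classifyRecv are hypothetical helpers of mine, not part of ØMQ.

```cpp
#include <cassert>
#include <cstddef>

// Hypothetical helper, not part of ZeroMQ: gives a name to each case that
// the return value of zmq_recv(socket, buffer, capacity, flags) can fall into.
enum class RecvOutcome { Error, Empty, Exact, Short, Truncated };

// rc       : the value returned by zmq_recv()
// expected : the size we wanted to receive (sizeof(int) in the worker)
RecvOutcome classifyRecv(int rc, std::size_t expected)
{
    if (rc < 0)
        return RecvOutcome::Error;      // the call failed, errno says why
    if (rc == 0)
        return RecvOutcome::Empty;      // zero-sized message (a flag)

    const std::size_t size = static_cast<std::size_t>(rc);
    if (size == expected)
        return RecvOutcome::Exact;      // a well-formed task
    if (size < expected)
        return RecvOutcome::Short;      // fewer bytes than expected

    // Assumption: the reported size exceeds the buffer, so the data
    // copied into the buffer has been cut and should not be trusted.
    return RecvOutcome::Truncated;
}
```

In the worker above, only the Exact case enters the if branch; all the others end up in the else branch, which prints "[-]".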

Even though the application is not complete (we are still missing the sink), we can already run the ventilator and the worker. We won't get any results, but we can check that everything works fine.
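
Since for now the worker is stopped with an interrupt, it may help to see how a signal handler could turn that interrupt into a clean exit from the loop, so that the zmq_close() and zmq_term() calls after it are actually reached. This is just one possible approach, not the solution this series is heading to, and it deliberately leaves the ØMQ calls out: the loop body is passed in as a generic step. All the names here (g_interrupted, onInterrupt, installInterruptHandler, runUntilInterrupted) are hypothetical. I am also assuming that a blocking zmq_recv() interrupted by a signal returns -1 with errno set to EINTR, as its documentation states, so that the loop gets a chance to test the flag.

```cpp
#include <cassert>
#include <csignal>

// Hypothetical flag set by the handler; sig_atomic_t is the type that
// can safely be written from inside a signal handler.
volatile std::sig_atomic_t g_interrupted = 0;

// The handler does the bare minimum: it only records that a signal arrived.
extern "C" void onInterrupt(int)
{
    g_interrupted = 1;
}

// Route SIGINT (Ctrl-C) and SIGTERM (kill) to the handler.
void installInterruptHandler()
{
    std::signal(SIGINT, onInterrupt);
    std::signal(SIGTERM, onInterrupt);
}

// Replaces while(true): calls step() until a signal has been recorded.
// Returns the number of completed iterations.
template <typename Step>
int runUntilInterrupted(Step step)
{
    int iterations = 0;
    while (!g_interrupted)
    {
        step();         // in the worker: receive, sleep, send
        ++iterations;
    }
    return iterations;  // the caller can now close sockets and context
}
```

In the worker, installInterruptHandler() would be called before creating the sockets, and the body of the forever loop would become the step passed to runUntilInterrupted().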
