Worker - ØMQ pull/push

This is the second step of our simple divide-and-conquer ØMQ application. Here we see the code for the worker, the process that connects as a client both to the ventilator, where it is the pull side of a push/pull pattern, and to the sink, where it is the push side.

Here is the code:
#include <iostream>
#include <boost/date_time/posix_time/posix_time.hpp>
#include <boost/thread/thread.hpp>
#include <zmq.hpp>

int main()
{
    try
    {
        zmq::context_t context(1);

        zmq::socket_t receiver(context, ZMQ_PULL); // 1
        receiver.connect("tcp://localhost:5557");

        zmq::socket_t sender(context, ZMQ_PUSH); // 2
        sender.connect("tcp://localhost:5558");

        while(true)
        {
            zmq::message_t msgIn;
            if(receiver.recv(&msgIn))
            {
                if(msgIn.size() == sizeof(int)) // 3
                {
                    int delay = *(static_cast<int*>(msgIn.data())); // 4

                    // flush, so that the trace shows up before the sleep
                    std::cout << '[' << delay << ']' << std::flush;
                    boost::this_thread::sleep(boost::posix_time::milliseconds(delay)); // 5
                }
                else // 6
                {
                    std::cout << "[-]";
                }
            }
            else // 7
                std::cout << '!';

            zmq::message_t msgOut(0); // 8
            sender.send(msgOut);
        } // 9
    }
    catch(const zmq::error_t& ze)
    {
        std::cout << "Exception: " << ze.what() << std::endl;
    }
}

1. A pull socket connects this process to the ventilator.
2. A push socket connects this process to the sink.
3. We expect the message to carry an int, so its size should be sizeof(int).
4. If the message has the expected size, we can safely read its payload as an int.
5. We fake the job by sleeping for the number of milliseconds we got from the ventilator.
6. Messages of unexpected size are just discarded.
7. If there is a problem receiving a message, we log a warning.
8. We send an empty message to the sink to signal that the worker has completed its job.
9. End of the loop: the worker goes back to waiting for the next message, and keeps doing so until it is interrupted or an exception is thrown.
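Steps 3, 4 and 6 can also be pulled out of the loop into a small function that needs neither ØMQ nor Boost. What follows is only a sketch: decodeDelay() is a hypothetical helper, not part of the original worker, and it copies the bytes with memcpy instead of casting the pointer, so it makes no assumption on how the buffer is aligned.

```cpp
#include <cassert>
#include <cstddef>
#include <cstring>

// Hypothetical helper, not part of the original worker.
// Returns true and stores the value in 'delay' when the payload has
// exactly the size of an int; returns false and leaves 'delay'
// untouched otherwise.
bool decodeDelay(const void* data, std::size_t size, int& delay)
{
    if (size != sizeof(int))
        return false;               // step 6: unexpected size, discard

    assert(data != nullptr);        // sizeof(int) bytes imply a real buffer

    // Step 4: read the payload as an int, copying it byte by byte.
    std::memcpy(&delay, data, sizeof(int));
    return true;
}
```

In the worker loop this would probably be called as decodeDelay(msgIn.data(), msgIn.size(), delay), printing "[-]" when it returns false.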

This example is basically a rewrite in C++ with Boost of the code you find in the official Z-Guide.
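If you would rather not depend on Boost for the fake job of step 5, the standard library should be enough, assuming a C++11 compiler. Here is a minimal sketch: fakeJob() is a hypothetical name, and treating a negative delay as zero is my own assumption, the original code does not check for it.

```cpp
#include <chrono>
#include <thread>

// Hypothetical stand-in for the Boost sleep of step 5.
// Sleeps for delayMs milliseconds (a negative value is treated as zero)
// and returns the time actually elapsed, in milliseconds.
long long fakeJob(int delayMs)
{
    using namespace std::chrono;

    if (delayMs < 0)
        delayMs = 0;

    const steady_clock::time_point start = steady_clock::now();
    std::this_thread::sleep_for(milliseconds(delayMs));
    const steady_clock::time_point stop = steady_clock::now();

    return duration_cast<milliseconds>(stop - start).count();
}
```

The returned value is there only to make the function easy to check; the worker could simply ignore it.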
