Pages

Pub/Sub with envelope

A typical use for ZeroMQ multipart messages is to implement the message envelope concept.

A message envelope is a place where we store additional information related to the message but not explicitly part of it, as could be an address.

In the publisher/subscriber pattern context, an envelope, seen as the first frame of the multipart message, has the interesting property of being the only area filtered by the subscriber.

Let see a publisher that sends indefinitely a couple of multipart messages per second:
zmq::context_t context(1);
zmq::socket_t publisher(context, ZMQ_PUB);
publisher.bind("tcp://*:5563");

std::string aEnv("A"); // 1
std::string aCon("This is A message");
std::string bEnv("B");
std::string bCon("This is B message");

while(true)
{
std::cout << '.'; // 2

zmq::message_t ma1((void*)aEnv.c_str(), aEnv.length(), NULL); // 3
zmq::message_t ma2((void*)aCon.c_str(), aCon.length(), NULL);
publisher.send(ma1, ZMQ_SNDMORE); // 4
publisher.send(ma2); // 5

zmq::message_t mb1((void*)bEnv.c_str(), bEnv.length(), NULL);
zmq::message_t mb2((void*)bCon.c_str(), bCon.length(), NULL);
publisher.send(mb1, ZMQ_SNDMORE);
publisher.send(mb2);

boost::this_thread::sleep(boost::posix_time::seconds(1)); // 6
}

1. We are going to send this couple of messages, envelope and content as defined here, all over again.
2. Some feedback to show that the server is actually alive.
3. We use the ZeroMQ zero-copy idiom, the data sent is actually stored in the C++ std::string.
4. The envelope is marked as frame of a multipart message.
5. The content is marked as last frame in a message.
6. Take a sleep, using boost to be more portable.

And here is the C++ code for the subscribers:
zmq::context_t context(1);
zmq::socket_t subscriber(context, ZMQ_SUB);
subscriber.connect("tcp://localhost:5563");
subscriber.setsockopt(ZMQ_SUBSCRIBE, filter, strlen(filter)); // 1

while(true)
{
zmq::message_t envelope;
zmq::message_t content;
subscriber.recv(&envelope); // 2
subscriber.recv(&content); // 3

std::cout << "Envelope: ";
dumpMessage(envelope); // 4
std::cout << " content: ";
dumpMessage(content);
}

1. As a filter is expected a C-string that could be passed to the client as an argument from the command line.
2. The client is pending on its SUB socket, waiting for a message that matches with its filter. Remember that multipart messages are managed atomically, and the filtering works accordingly. So if no match is found in the envelope, the complete multipart message is discarded, as one would expect.
3. We know that the message is made by two frames, so we simply get the second one. Usually we should implement a more robust check on the RCVMORE socket option to ensure that there actually is next frame, before receiving it. See the post on multipart messages for details.
4. Since a message data is not a real C-string, missing the terminating NUL character, we need a utility function to print it, something like this:
void dumpMessage(zmq::message_t& msg)
{
std::for_each((char*)msg.data(), (char*)msg.data() + msg.size(),
[](char c){ std::cout << c;});
std::cout << std::endl;
}

In the Z-Guide you'll find more on this issue and the original C source code on which I have based this example.

No comments:

Post a Comment