Pages

ØMQ subscriber

We are talking about the pub-sub pattern, as implemented in ØMQ. We have already seen a simple publisher, now we are about to write a client for it.

We need a few information on the publisher before starting writing the code for the subscriber: where it is and what it is sending around. Checking its code we see that we can connect it using the TCP protocol. I assume you are going to run both application on the same machine, so as IP address we use localhost; and the port number is 5556. The message is sent as a C-string of characters (without the terminator) and it is expected to be in a X:Y:Z format, three integers colon separated.

Then we should deciding what our client want actually doing with the stream of data coming from the server. Let's say that we want to get ten messages, extract the second integer from each of them (when at least two integers actually are in it), sum them and display the result to the user.

Knowing that, we can write the subscriber code, that should end up in something like this:
try
{
zmq::context_t context(1); // 1

std::cout << "Collecting updates from publisher" << std::endl;
zmq::socket_t subscriber(context, ZMQ_SUB); // 2
subscriber.connect("tcp://localhost:5556"); // 3

char* filter = "1";
subscriber.setsockopt(ZMQ_SUBSCRIBE, filter, strlen(filter)); // 4

long total = 0;
for(int update_nbr = 0; update_nbr < 10; update_nbr++) // 5
{
zmq::message_t message; // 6
if(subscriber.recv(&message)) // 7
{
std::string data((char*)message.data(), (char*)message.data() + message.size()); // 8
total += getRelevantValue(data); // 9
}
}
std::cout << "Total collected for code " << filter << " is " << total << std::endl;
} // 10
catch(const zmq::error_t& ze)
{
std::cout << "Exception: " << ze.what() << std::endl;
}

1. As usual, first thing is initializing the context. This results in a call to zmq_init().
2. Then we create a socket - under the curtain a call to zmq_socket() is done. The socket type is ZMQ_SUB, stating this is a subscriber, created to work with a publisher.
3. From the socket we try to connect to the server, API function is zmq_connect(), specifying protocol and address.
4. This is an important point. We must specify a filter, setting it as an option for the socket. In this case looks natural, since we want to get only the messages starting with "1", but even in the case we don't actually want any filter we must set it (to an empty string). Here the API function in the background is zmq_setsockopt().
5. Let's get ten messages coming from the publisher.
6. A new message is created, the API function zmq_msg_init() would be called here.
7. The message is filled, by zmq_recv(), with data coming from the server.
8. The raw character array is converted in a std::string, so that it could be actually used by the code.
9. A utility function is called to implement the actual client logic - its example code is shown below.
10. The socket goes out of scope, zmq_close() is called by its dtor, and the same for the context, that it result in a call to zmq_term().

The code below is not related to ØMQ but it is kind of fun (in a twisted way) since it makes use of a couple of boost functionality. The task of this function is splitting its input std:string, converting the second field to integer, and returning it. By default (for instance, if the second element in the string is not an integer) it returns zero:
int getRelevantValue(const std::string& data)
{
std::vector<std::string> vs;
vs.reserve(3); // 1
boost::split(vs, data, boost::is_any_of(":")); // 2

std::cout << "Message received: "; // 3
std::for_each(vs.begin(), vs.end(), [](std::string s)
{
std::cout << s << ' ';
});
std::cout << std::endl;

if(vs.size() > 1)
try { return boost::lexical_cast<int>(vs[1]); } catch(...) {} // 4
return 0; // 5
}

1. Kind of overkilling. Its main sense is documenting that we are expecting the string to be split in three fields.
2. This friendly Boost Algorithm function puts in the first parameter the result of splitting the second parameter using the list of possible delimiters specified in the third parameter.
3. For debug purposes we dump to standard output the resulting vector elements - I usually enjoy coupling the standard algorithm for_each() with a lambda function in a case like that.
4. If we actually have at least two elements, we get the second, cast it to int and return it. Notice that I used the Boost lexical_cast<> construct to do that.
5. If there are less than two elements, or we didn't succeed in casting the second element to int, we return zero, as designed.

The official Z-Guide has much more to tell you if you are interested in such kind of things.

No comments:

Post a Comment