Pages

Simple ØMQ TCP server

Here I am going to rewrite the already seen simple TCP echo server using ØMQ instead of ASIO. I am doing it in C++, using its standard wrapper, but I keep an eye on the underlying C code, to understand better what is going on.

Exception Handling

We are using C++ as implementation language, so exception is the preferred way to managed unexpected errors. The C++ ØMQ functions throws a zmq::error_t exception, derived from std::exception in this case. So, our code would be in such a try/catch block:

try
{
   // ...
}
catch(const zmq::error_t& ze)
{
   std::cout << "Exception: " << ze.what() << std::endl;
}

Context

First thing we have to do is creating a context. If we have a look at the source code for zmq::context_t we see that this class is basically a way to ensure that zmq_init() is called at startup and zmq_term() at shutdown. So we could say that its task is implementing RAII, where the acquired resource is the ØMQ context. If we develop a ØMQ application in C language, it should the programmer responsibility to ensure zmq_init() / zmq_term() are called as expected. Using C++ we enforce it by putting these mandatory function in the ctor and dtor of the context class:
zmq::context_t context(1);
The parameter we pass to the context is passed directly to zmq_init() and represents the number of threads in the thread pool used by ØMQ to handle I/O operations.

Socket

The zmq::socket_t class wraps the socket functionality offered by ØMQ. Its ctor calls zmq_socket() with the parameter passed by the user and throws a zmq::error_t in case of error. The dtor call its member function close() that is a wrapper for a call zmq_close() on the current socket. We call it in this way:
zmq::socket_t socket(context, ZMQ_REP);
Meaning that we want to create a new socket of ZMQ_REP type for the current context.

Specifying ZMQ_REP as type we are saying to ØMQ that we want our socket to be used in a request-reply pattern (in the reply role, as one should expect). The reason why we should specify this, is that ØMQ provides an higher lever of abstraction than ASIO, and we are required to provide to ØMQ some information on what we want actually doing, so that ØMQ could know how it should to behave.

Binding a socket

Once a socket is created, it had to be bound to be able to accept connections. The C function for doing that, zmq_bind(), is wrapped in the zmq::socket_t::bind() member function:
socket.bind("tcp://*:50013");
The passed string is the endpoint for the socket. Its first part, here "tcp", specifies the transport protocol that we want to use. The second part, after the separator "://" contains information as required by the actual protocol used. In this case we say that we want our socket to work for any available interface (that is the meaning of the star "*") and then, after the colon, we specify the port that we want to use (50013).

Waiting for request on a socket

If we have a look at the source code for class zmq::message_t, we see that it derives privately from zmq_msg_t and it is a way to enforce by code the requirement of calling zmq_msg_init() on a zmq_msg_t before using it, and then calling zmq_msg_close() on it at the end of its usage.

Once the zmq::message_t is initialized, we can pass it to zmq::socket_t::recv(), like this:
zmq::message_t request;
socket.recv(&request);
As usual, recv() is a wrapper to zmq_recv(), adding error check and exception generation, when something goes wrong.

The call to recv() blocks the current execution flow, waiting a client for filling the message with data. If everything goes fine (that means, no exception) the next step is about checking what we actually have in the received message.

Extracting the received data

In this specific piece of code, we decided that an empty message has to be interpreted by the server as a request of terminating its execution. To check if this is the case, we check the length of the message, calling its member method size():
request.size()
It shouldn't be a big surprise to find out that it just wraps a call to zmq_msg_size().

The message moved around by ØMQ is actually just a bunch of bytes, accessed by the member function data(), that wraps a call to zmq_msg_data(). In this application we can safely assume that the data is actually a string of characters, so we can convert it to a std::string using the ctor that requires as parameters the start and end iterator:
std::string data((char*)request.data(), (char*)request.data() + request.size());

Sending a reply

Building a zmq::message_t should be quite straightforward, it just a matter of setting the size of the data, and copying the actual message, byte by byte, in its reserved area. To actually send it we call:
socket.send(reply);
That actually means calling zmq_send() and ensuring that there is no error.

Here is the resulting code:
try
{
   zmq::context_t context(1);
   zmq::socket_t socket(context, ZMQ_REP);
   socket.bind("tcp://*:50013");
   while(true)
   {
      zmq::message_t request;
      socket.recv(&request);

      if(request.size() == 0)
      {
         std::cout << "Empty message received - shutting down the server" << std::endl;
         break;
      }

      std::string data((char*)request.data(), (char*)request.data() + request.size());
      std::cout << "Received: " << data << std::endl;

      zmq::message_t reply(data.length());
      memcpy(reply.data(), data.c_str(), data.length());
      socket.send(reply);
   }
}
catch(const zmq::error_t& ze)
{
   std::cout << "Exception: " << ze.what() << std::endl;
}

If you are looking for more information on this stuff, you would probably enjoy reading the official Z-Guide.

No comments:

Post a Comment