Client side reliable REQ-REP

We can't assume that any software is 100% reliable (actually, we can't assume that for anything in the world), and ZeroMQ connections are designed to be fast more than reliable, so we usually want to add some fallback mechanism to cope with the most common issues.

The ZGuide shows a few patterns we could implement to improve reliability. The Lazy Pirate pattern, as they call it, is the first of them. It is very simple, but it could be enough. Here I rewrite that example in C++, using zmq::Socket, my subclass of zmq::socket_t as defined in the original 0MQ version 2.x C++ binding.

The point is that the REQ client sends a sequence number (along with the payload, not shown in the example). The server replies by sending it back to the client (usually along with some feedback, again not in this example), so that the client can check that everything worked fine. If the client gets no confirmation, we assume the message we sent was lost, so we resend it. That is a bit more complicated to do than to say, since the REQ-REP protocol is strictly synchronous. Sending a second REQ before getting a REP results in an EFSM error. An easy, brutal but effective way to overcome this issue is to close and reopen the socket before resending the message.
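
The strict alternation can be pictured with a toy model. This is only a minimal sketch that makes no ZeroMQ calls at all: ReqModel and its methods are hypothetical names standing in for a REQ socket, and the exception stands in for the EFSM error.

```cpp
#include <cassert>
#include <stdexcept>

// Toy model of the REQ side of REQ-REP: send and recv must strictly alternate.
// ReqModel is a hypothetical stand-in, not a ZeroMQ class.
class ReqModel
{
    bool awaitingReply_;
public:
    ReqModel() : awaitingReply_(false) {}

    bool canSend() const { return !awaitingReply_; }

    void send()
    {
        if(awaitingReply_)
            throw std::logic_error("EFSM: send() while a reply is pending");
        awaitingReply_ = true;
    }

    void recv()
    {
        if(!awaitingReply_)
            throw std::logic_error("EFSM: recv() with no request sent");
        awaitingReply_ = false;
    }

    // Models closing the socket and opening a fresh one.
    void reopen() { awaitingReply_ = false; }
};

// Returns true if sending twice in a row is rejected by the model.
bool secondSendFails()
{
    ReqModel sk;
    sk.send();
    try { sk.send(); }
    catch(const std::logic_error&) { return true; }
    return false;
}

// Returns true if a resend succeeds once the model socket has been reopened.
bool resendAfterReopenWorks()
{
    ReqModel sk;
    sk.send();
    assert(!sk.canSend()); // a reply is pending, a plain resend would fail
    sk.reopen();
    try { sk.send(); }
    catch(const std::logic_error&) { return false; }
    return true;
}
```

Both helper functions return true in this model: the second send is refused, and reopening puts the socket back in a state where sending is allowed.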

To keep testing easy, I put both client and server in the same process. This is the code in the main thread that spawns the two threads:
boost::thread_group threads;
threads.create_thread(std::bind(client, timeout, retries)); // 1
threads.create_thread(server); // 2
threads.join_all();
1. This starts the client, passing it a couple of ints: the timeout, in milliseconds, and the number of retries that the client should attempt before assuming that the server is down. The balance between these numbers has to be chosen carefully, since the behavior of the system is very sensitive to them. In this example I used 2500 and 3, as suggested by the ZGuide, then I played around with them to see what happened. I'd suggest you do the same.
2. This starts the server.
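
To get a feeling for how the two values interact, it may help to compute how long the client can stay busy on a single request before giving up. The sketch below assumes the client logic shown later in this post (one fixed pause after the send, then up to "retries" polls of at most "timeout" milliseconds each) and ignores the time spent reopening the socket. worstCaseWaitMs is a hypothetical helper, not part of the original code.

```cpp
#include <cassert>

// Rough upper bound, in milliseconds, on the time the client spends on one
// request before abandoning it: one fixed pause after the send, then
// `retries` polls that each wait at most `timeoutMs`.
// Hypothetical helper; the names are chosen for illustration only.
long worstCaseWaitMs(long pauseMs, long timeoutMs, int retries)
{
    assert(pauseMs >= 0 && timeoutMs >= 0 && retries >= 0);
    return pauseMs + timeoutMs * retries;
}
```

With the values used here, worstCaseWaitMs(1000, 2500, 3) gives 1000 + 3 * 2500 = 8500, so a dead server is detected after about eight and a half seconds.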

A few constants used by the code:
const char* SK_ADDR_CLI = "tcp://localhost:5555"; // 1
const char* SK_ADDR_SRV = "tcp://*:5555"; // 2

const int BASE_TIMEOUT = 1000; // 3
1. The client socket connects to this address.
2. The address the server socket is bound to.
3. I set the standard timeout for the application to 1 second (expressed here in milliseconds, as you have surely guessed).

Server

It is very simple. Even simpler than the original one:
void server()
{
    dumpId("Starting server"); // 1

    zmq::context_t context(1);
    zmq::Socket skServer(context, ZMQ_REP);
    skServer.bind(SK_ADDR_SRV);

    for(int i = 1; i < 10; ++i) // 2
    {
        int message = skServer.recvAsInt(); // 3

        if(i % 4 == 0) // 4
        {
            dumpId("CPU overload");
            boost::this_thread::sleep(boost::posix_time::millisec(2 * BASE_TIMEOUT));
        }
        else
            dumpId("Normal request", message);

        boost::this_thread::sleep(boost::posix_time::millisec(BASE_TIMEOUT));
        skServer.send(message);
    }

    dumpId("Terminating, as for a server crash");
}
1. I paid for the simplification of running both client and server in the same process by having to care about concurrency issues on std::cout. The dumpId() function locks a mutex so that the output doesn't get garbled by competing threads.
2. This toy server just does a recv/send a bunch of times before terminating, simulating a crash.
3. Socket::recvAsInt() receives a message on the socket, converts it to an int, and returns it to the caller.
4. Every fourth request, a CPU overload is simulated by sleeping for a while.
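
The dumpId() helper is not listed in this post. A minimal sketch of the idea, assuming a single global mutex and the standard std::mutex (the full source may well do it differently), could look like this. The names dumpLine, formatLine and dumpMutex are hypothetical.

```cpp
#include <cassert>
#include <iostream>
#include <mutex>
#include <sstream>
#include <string>
#include <thread>

std::mutex dumpMutex; // shared by every thread that prints

// Builds the whole line up front: thread id, message, value.
std::string formatLine(const std::string& message, int value)
{
    std::ostringstream line;
    line << std::this_thread::get_id() << ": " << message << ' ' << value << '\n';
    return line.str();
}

// Writes one complete line while holding the lock, so that lines coming
// from different threads cannot interleave.
void dumpLine(std::ostream& out, const std::string& message, int value)
{
    assert(out.good()); // the stream is expected to be usable
    std::lock_guard<std::mutex> lock(dumpMutex);
    out << formatLine(message, value);
}
```

A call such as dumpLine(std::cout, "Normal request", 42) would then be safe from both the client and the server thread, as long as nobody else writes to std::cout without taking the same lock.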

The Lazy Pirate Client

To keep the client code a bit more readable, I decided to use a utility class tailored to the needs of this example. Actually, I should say I am not completely happy with it. To get a more satisfactory result I would have had to modify the original zmq::socket_t, and I am not sure that would be a smart move at this moment. Maybe in the future.

Anyway, here is my current result:
class LazyPirateClient
{
private:
    zmq::context_t& context_;
    std::unique_ptr<zmq::Socket> sk_; // 1

    int sent_; // 2

    void reset() // 3
    {
        sk_.reset(new zmq::Socket(context_, ZMQ_REQ));
        sk_->connect(SK_ADDR_CLI);

        int linger = 0;
        sk_->setsockopt(ZMQ_LINGER, &linger, sizeof(linger));
    }
public:
    LazyPirateClient(zmq::context_t& context) : context_(context), sent_(-1) // 4
    {
        reset();
    }

    bool checkedRecv() // 5
    {
        int value = sk_->recvAsInt();
        if(value == sent_)
            return true;

        dumpId("Unexpected reply from server", value);
        return false;
    }

    zmq::Socket* operator()() { return sk_.get(); } // 6

    void send(int value) // 7
    {
        sk_->send(value);
        sent_ = value;
    }

    void resend() // 8
    {
        reset();
        send(sent_);
    }
};
1. Given a limitation of the original zmq::socket_t class, where the underlying raw socket pointer is stored in its private section, I had to think of an alternative way to close and reopen the lazy pirate's underlying socket. Holding the socket through a unique_ptr lets me destroy it and create a new one.
2. The last sequence number sent to the server, remembered so that we can compare it with the received reply.
3. Creates a new REQ socket and connects it to the REP server socket. The linger option is set to zero, meaning that we want pending messages to be discarded immediately when the socket is closed.
4. The ctor relies on the reset() defined above.
5. We don't care much about what we get back from the server, besides checking that the sequence number matches. This method does just that. It uses the Socket::recvAsInt() utility function, which extracts an int from the received message.
6. Utility function to get the underlying pointer to zmq::Socket.
7. It sends the value (as an int, thanks to an explicit overload of send() defined in zmq::Socket), and stores it so that it can be checked by (5).
8. As explained before, we can't simply resend a message, or we would get an EFSM error. So we reset the socket, deleting the old one and creating a new one, before sending the last value again.
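
One detail of reset() may be worth spelling out: in a call like sk_.reset(new zmq::Socket(...)) the new object is constructed first, and only then is the old one destroyed. The sketch below shows this with a hypothetical FakeSocket that just counts live instances. It makes no ZeroMQ calls, and reopen() and peakWhileReopening() are names made up for the illustration.

```cpp
#include <cassert>
#include <memory>

// Hypothetical stand-in for a socket wrapper: it only tracks how many
// instances are alive and the highest number alive at the same time.
struct FakeSocket
{
    static int live;
    static int peak;

    FakeSocket() { ++live; if(live > peak) peak = live; }
    ~FakeSocket() { --live; }

    FakeSocket(const FakeSocket&) = delete;
    FakeSocket& operator=(const FakeSocket&) = delete;
};
int FakeSocket::live = 0;
int FakeSocket::peak = 0;

// Mirrors the shape of LazyPirateClient::reset(): replace the owned object.
void reopen(std::unique_ptr<FakeSocket>& sk)
{
    sk.reset(new FakeSocket); // new object built, then the old one destroyed
}

// Opens a socket, reopens it `times` times, and returns the peak number of
// sockets that were alive together.
int peakWhileReopening(int times)
{
    assert(times >= 0);
    FakeSocket::live = 0;
    FakeSocket::peak = 0;
    {
        std::unique_ptr<FakeSocket> sk(new FakeSocket);
        for(int i = 0; i < times; ++i)
            reopen(sk);
    }
    return FakeSocket::peak;
}
```

In this model the peak is 2 as soon as there is at least one reopen, so for a short moment two sockets exist. If closing the old one first matters, calling reset() with no argument before creating the new socket would do that.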

Client

This is the function that the client thread runs:
void client(int timeout, int retries)
{
    dumpId("Starting client");

    zmq::context_t context(1);
    LazyPirateClient skClient(context);

    for(int sequence = 1; sequence < 100; ++sequence) // 1
    {
        skClient.send(sequence); // 2
        boost::this_thread::sleep(boost::posix_time::millisec(BASE_TIMEOUT));

        bool confirmed = false;
        zmq::pollitem_t items[] = { { *skClient(), 0, ZMQ_POLLIN, 0 } }; // 3
        for(int cycle = 0; cycle < retries; ++cycle)
        {
            zmq::poll(&items[0], 1, timeout * 1000); // 4

            if(items[0].revents & ZMQ_POLLIN) // 5
            {
                if(skClient.checkedRecv()) // 6
                {
                    dumpId("Server is synchronized", sequence);
                    confirmed = true;
                    break;
                }
            }
            else // 7
            {
                dumpId("Retrying", cycle);
                skClient.resend();
                items[0].socket = *skClient(); // 8
            }
        }
        if(!confirmed) // 9
        {
            dumpId("No answer from server, abandoning.");
            break;
        }
    }
}
1. Loop for some time. Actually, when the server goes down the client shuts down on its own, and this is what is going to happen almost immediately.
2. Send the sequence number to the server. Usually we want to send more interesting stuff, too.
3. Prepare to poll on the client socket for incoming messages.
4. Wait for the period specified by the user. The timeout is multiplied by 1000 because zmq::poll() in 0MQ 2.x expects microseconds.
5. We actually have something in input.
6. The sequence check succeeded, so give the user some feedback and go on.
7. Nothing was received in the interval. Whatever happened, we simply resend the last message.
8. Remember that we want to poll on the new socket.
9. No confirmation was received from the server, so the client assumes that something very bad happened out there, and shuts down.
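
Stripped of the ZeroMQ calls, the control flow of the retry loop can be sketched as follows. Here tryOnce is a hypothetical callable standing in for "poll, then checkedRecv()" (it returns true when a matching reply arrived in time), and resend stands in for skClient.resend(). This is only a model of the loop above: unlike the original, it treats a mismatched reply like a timeout. FlakyPeer is a made-up fake server used to exercise it.

```cpp
#include <cassert>
#include <functional>

// Returns the zero-based cycle in which the reply was confirmed,
// or -1 if every cycle failed and the client should give up.
int waitForReply(int retries,
                 const std::function<bool()>& tryOnce,
                 const std::function<void()>& resend)
{
    assert(retries >= 0);
    for(int cycle = 0; cycle < retries; ++cycle)
    {
        if(tryOnce())
            return cycle; // confirmed, stop retrying
        resend();         // nothing arrived: reopen the socket and send again
    }
    return -1;            // no confirmation after all the retries
}

// Hypothetical fake server that ignores the first `lost` attempts.
struct FlakyPeer
{
    int lost;
    int resends;

    explicit FlakyPeer(int lostCount) : lost(lostCount), resends(0) {}

    bool tryOnce() { return lost-- <= 0; }
    void resend() { ++resends; }
};
```

With three retries, a peer that loses the first two attempts is confirmed in cycle 2 after two resends, while a peer that never answers makes waitForReply() return -1 after three resends.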

Full C++ source code is on github.
