
LRU Pattern - client

Let's implement the Least Recently Used (LRU) routing pattern for ZeroMQ 2.2.0 (just released) in C++ on Windows, compiled with MSVC. The ZGuide gives the rationale behind this code, along with the C implementation that I used as a guideline for this post. Besides porting it to C++, I have adapted it to Windows (there is no IPC support on this platform) and made some minor changes that, I guess, make this example even more interesting.

It is a multithreaded application in which the client owns a ROUTER socket that talks to a bunch of REQ sockets, one for each worker thread. Each REQ sends the ROUTER a message containing its own id when it is ready to process a new message; the ROUTER uses that id to reply to that specific REQ socket, which is now known to be available. Here I talk about the client part of the application, but you can already have a look at the entire source code on GitHub.
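To make the routing idea concrete without involving any socket, here is a minimal sketch of the bookkeeping behind it. It is not taken from the ZGuide or from my ZeroMQ code: the LruDispatcher class and its methods are hypothetical names, and plain strings stand in for the REQ socket ids. A worker that announces itself goes to the back of a queue, and the next task always goes to the worker at the front, that is, the one that has been idle longest.

```cpp
#include <cassert>
#include <deque>
#include <string>

// Hypothetical stand-in for the ROUTER side of the pattern: it only
// remembers which workers said they are ready, in arrival order.
class LruDispatcher
{
private:
    std::deque<std::string> ready_; // front = worker that has been waiting longest

public:
    // A worker (a REQ socket in the real code) announces it is available.
    void workerReady(const std::string& id) { ready_.push_back(id); }

    bool hasWorker() const { return !ready_.empty(); }

    // Remove and return the id of the worker that gets the next task.
    std::string dispatch()
    {
        assert(!ready_.empty()); // the real ROUTER would block in recv() instead
        std::string id = ready_.front();
        ready_.pop_front();
        return id;
    }
};
```

If workers "A" and "B" announce themselves in that order, the first dispatch() picks "A"; when "A" reports ready again it queues up behind "B". In the real application no explicit queue is needed on our side, since the pending ready messages waiting on the ROUTER socket play that role.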

The main function takes as a parameter the number of workers that we want to run. It creates a router socket and the worker threads, each of them with its own REQ socket on the same context, lets the router send a few messages to the workers, and then terminates them, collecting along the way the number of messages processed by each thread:
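#include <functional> // std::bind, std::ref
#include <iostream>
#include <boost/thread.hpp>
#include <zmq.hpp>

void lru(int nWorkers)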
{
    zmq::context_t context(1);
    MyRouter router(context); // 1

    boost::thread_group threads; // 2
    for(int i = 0; i < nWorkers; ++i)
        threads.create_thread(std::bind(worker, std::ref(context))); // 3

    for(int i = 0; i < nWorkers * 10; ++i)
        router.sendMessage(); // 4

    int processed = 0;
    for(int i = 0; i < nWorkers; ++i)
        processed += router.terminateWorker(); // 5

    threads.join_all(); // 6
    std::cout << "Number of processed messages: " << processed << std::endl;
}
1. See below for the MyRouter class; for the moment, think of it as a wrapper around a ØMQ ROUTER socket.
2. The Boost thread_group makes it easy to manage a group of threads, like the one we want to have here.
3. Each thread runs the function worker(), which we'll talk about in a later post, and receives a reference to the 0MQ context.
4. Send an average of ten messages to each 0MQ REQ socket.
5. Send a terminator to each worker; terminateWorker() returns the number of messages processed by that worker.
6. Join all the threads, give feedback to the user, and terminate.
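If Boost is not at hand, the same create-then-join structure can be sketched with the standard library alone, assuming a C++11 compiler. Everything here is hypothetical and only for illustration: SharedContext stands in for zmq::context_t, and sketchWorker() for the real worker function. The point is the std::ref, which passes the shared object to each thread by reference instead of copying it.

```cpp
#include <atomic>
#include <cassert>
#include <functional>
#include <thread>
#include <vector>

// Hypothetical stand-in for zmq::context_t: one object shared by all threads.
struct SharedContext
{
    std::atomic<int> started{0};
};

// Hypothetical worker: the real one would open a REQ socket on the context.
void sketchWorker(SharedContext& context)
{
    ++context.started; // atomic increment, safe from several threads
}

// Start nWorkers threads on the same context, wait for all of them,
// and return how many workers actually ran.
int runWorkers(int nWorkers)
{
    assert(nWorkers >= 0);
    SharedContext context;
    std::vector<std::thread> threads;
    for (int i = 0; i < nWorkers; ++i)
        threads.emplace_back(sketchWorker, std::ref(context)); // by reference, not by copy

    for (std::thread& t : threads) // counterpart of thread_group::join_all()
        t.join();
    return context.started.load();
}
```

Calling runWorkers(4) starts four threads on the same SharedContext and returns only after all of them have completed.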

The MyRouter class relies on a constant and a helper class:
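#include <cstring> // memcpy
#include <string>
#include <boost/random/mersenne_twister.hpp>
#include <boost/random/uniform_int_distribution.hpp>
#include <zmq.hpp>

const char* SOCK_ADDR = "inproc://workers"; // 1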

class RandomTimeGenerator // 2
{
private:
    boost::random::mt19937 generator_;
    boost::random::uniform_int_distribution<> random_;
public:
    RandomTimeGenerator(int low, int high) : random_(low, high) {}
    int getValue() { return random_(generator_); }
};

class MyRouter
{
private:
    zmq::socket_t client_;
    RandomTimeGenerator rtg_;

public:
    MyRouter(zmq::context_t& context) : client_(context, ZMQ_ROUTER), rtg_(1, 1000) // 3
    {
        client_.bind(SOCK_ADDR);
    }

    void sendMessage() // 4
    {
        zmq::message_t zmAddress;
        client_.recv(&zmAddress);

        zmq::message_t zmDummy1;
        client_.recv(&zmDummy1);
        zmq::message_t zmDummy2;
        client_.recv(&zmDummy2);

        client_.send(zmAddress, ZMQ_SNDMORE);

        zmq::message_t zmEmpty; // 5
        client_.send(zmEmpty, ZMQ_SNDMORE);

        int value = rtg_.getValue();
        zmq::message_t zmPayload(sizeof(int));
        memcpy(zmPayload.data(), &value, sizeof(int));
        client_.send(zmPayload);
    }

    int terminateWorker() // 6
    {
        zmq::message_t zmAddress;
        client_.recv(&zmAddress);
        zmq::message_t zmDummy;
        client_.recv(&zmDummy);

        zmq::message_t zmPayload;
        client_.recv(&zmPayload);
        std::string id((char*)zmAddress.data(), (char*)zmAddress.data() + zmAddress.size());
        int acknowledged = *(int*)zmPayload.data();
        dumpMessage(id, acknowledged); // 7

        client_.send(zmAddress, ZMQ_SNDMORE); // 8

        zmq::message_t zmEmpty;
        client_.send(zmEmpty, ZMQ_SNDMORE);
        client_.send(zmEmpty);

        return acknowledged;
    }
};
1. The address used by the sockets for the inproc connections.
2. A class that generates a random int, used to simulate a task of random duration executed by the worker. The generator is a Boost Mersenne twister, and the distribution is a uniform integer one.
3. The ctor takes the ZeroMQ context that should be used to create the 0MQ ROUTER socket. It also initializes the random generator, so that it generates a series of numbers in the interval [1, 1000], and binds the client socket to the inproc address specified above.
4. The core of this class. First, we wait for a worker's ZeroMQ REQ socket to state that it is available. We care only about the first frame it has sent to us, which contains the address of the REQ socket. The other two parts are discarded, but the first is sent back as the first frame of our reply, so that ZeroMQ can route it to the original sender.
5. The second frame of the reply is empty, and in the third one we place an int returned by the random generator. Notice the ZMQ_SNDMORE flag on the first two frames, which tells ZeroMQ that the three sends are three parts of a single message.
6. The last message received from each 0MQ REQ is handled differently from all the previous ones. The first frame is sent back as in (4), but we also use the third frame, the actual payload of this multipart message, from which we extract an integer that represents the number of messages received by that worker.
7. This function just prints the passed parameters to the standard console. But we are in a multithreaded context, and std::cout is a shared resource, so we should be careful.
8. We send a last message to the current worker: an empty message that has to be interpreted as a terminator.
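Two details from the notes above can be sketched in isolation, with no ZeroMQ involved. This is not the code in my repository: readInt(), formatMessage(), dumpMessageSketch() and g_coutMutex are hypothetical names, and std::mutex assumes a C++11 compiler. The first function extracts the int payload from a raw frame while checking its size, and the other two show one possible way of being careful with std::cout, by building the whole line first and printing it under a lock.

```cpp
#include <cassert>
#include <cstring>
#include <iostream>
#include <mutex>
#include <sstream>
#include <string>

// Copy an int out of a raw frame buffer. Unlike *(int*)data, memcpy makes
// no alignment assumption; a frame of the wrong size is rejected.
bool readInt(const void* data, std::size_t size, int& out)
{
    assert(data != nullptr || size == 0);
    if (size != sizeof(int))
        return false;
    std::memcpy(&out, data, sizeof(int));
    return true;
}

// Build the complete output line before touching the shared stream.
std::string formatMessage(const std::string& id, int value)
{
    std::ostringstream oss;
    oss << id << ": " << value;
    return oss.str();
}

std::mutex g_coutMutex; // hypothetical: one mutex guarding std::cout

// Print one line at a time, so output from different threads
// does not interleave in the middle of a line.
void dumpMessageSketch(const std::string& id, int value)
{
    const std::string line = formatMessage(id, value);
    std::lock_guard<std::mutex> lock(g_coutMutex);
    std::cout << line << std::endl;
}
```

The lock helps only if every thread that writes to std::cout goes through the same mutex; a stray direct write elsewhere can still end up in the middle of a line.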
