LRU Queue Device - receiving on the routers

The last big chunk left in the development of our sample LRU queue broker is a couple of private methods that take care of receiving messages on its two ROUTER sockets. If you don't have a clue what I am talking about, you should know that this is a post in a series dedicated to a ZeroMQ device that gets input messages from a few clients and uses a bunch of workers to elaborate a result, which is then sent back to the original client. The worker is chosen by a Least Recently Used algorithm, and the identities of the available workers are stored in a queue. That should explain at least the name of the post. A better introduction is given in a previous post.

The two functions I am about to talk about here are used by the poll() function that is described in the post where the class public interface is discussed.

The private section of this class also defines a bunch of data members:
zmq::context_t context_;
zmq::socket_t backend_;
zmq::socket_t frontend_;
std::queue<std::string> qWorkers_;
boost::thread_group thWorkers_;
boost::thread_group thClients_;
They represent the ZeroMQ context, the two ROUTER sockets, the queue that keeps the worker ids currently available, and the worker and client threads.
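
To make the role of qWorkers_ more concrete, here is a minimal sketch of the LRU policy on its own, with no ZeroMQ involved. The WorkerQueue class and its method names are hypothetical, made up for this illustration, and are not part of the broker class:

```cpp
#include <cassert>
#include <queue>
#include <string>

// Hypothetical stand-in for the broker's qWorkers_ member.
class WorkerQueue
{
public:
    // A worker that signalled it is up, or that just delivered a result,
    // goes to the back of the queue.
    void markAvailable(const std::string& id) { q_.push(id); }

    bool hasAvailable() const { return !q_.empty(); }

    // The worker at the front has been waiting the longest,
    // so it is the least recently used one.
    std::string takeLeastRecentlyUsed()
    {
        assert(!q_.empty()); // front() on an empty std::queue is undefined behaviour
        std::string id = q_.front();
        q_.pop();
        return id;
    }

private:
    std::queue<std::string> q_;
};
```

Since a std::queue is first-in first-out, pushing a worker id each time it becomes free and popping from the front when a job arrives is enough to get the least recently used behaviour.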

Let's see what we do when a message is pending on the backend socket:
void receivingOnBackend()
{
    zmq::message_t zmWorkerId; // 1
    backend_.recv(&zmWorkerId);

    zmq::message_t zmDummy;
    backend_.recv(&zmDummy);

    zmq::message_t zmClientId; // 2
    backend_.recv(&zmClientId);

    backend_.recv(&zmDummy);

    zmq::message_t zmPayload; // 3
    backend_.recv(&zmPayload);

    const char* base = static_cast<const char*>(zmWorkerId.data());
    std::string id(base, base + zmWorkerId.size());
    dump(id, "registering worker");
    qWorkers_.push(id); // 4

    if(zmClientId.size()) // 5
    {
        frontend_.send(zmClientId, ZMQ_SNDMORE); // 6
        frontend_.send(zmDummy, ZMQ_SNDMORE);
        frontend_.send(zmPayload);
    }
}
1. Messages are multipart. The first frame is the worker id. We need to store it somewhere, so that we can later use it to send that worker a new job.
2. Then we have an empty frame, conceptually a separator between "real" frames, followed by a frame that represents the id of the client that originally sent the message elaborated by the worker.
3. Another separator, and finally we have the actual payload: the result of the job done by the worker, which we want to pass back to the client.
4. The first thing we have to do now is add the worker id to our queue, marking it in this way as available for a new task.
5. Actually, there is a special case. The first time a worker sends a message to the device, it is just to signal that it is up, and no real client id (nor payload) is passed. We detect this by checking the size of the client id frame. A zero size means there is nothing more to do.
6. Otherwise, we have to forward to the frontend the message as received from the worker, stripping off the envelope containing the worker address.
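
The six points above can be tried out without any socket by modelling each frame as a std::string. What follows is only a sketch under that assumption: the Frames alias and the handleBackendMessage() function are hypothetical names, and the real method works on zmq::message_t objects instead.

```cpp
#include <cassert>
#include <queue>
#include <string>
#include <vector>

// One string per frame; an empty string stands for an empty frame.
using Frames = std::vector<std::string>;

// Hypothetical helper mirroring receivingOnBackend() on plain strings.
// Input layout: worker id, separator, client id, separator, payload.
// Returns the frames to forward to the frontend, or an empty vector
// when the worker was only signalling that it is up.
Frames handleBackendMessage(const Frames& in, std::queue<std::string>& workers)
{
    assert(in.size() == 5); // this sketch treats any other shape as a programming error

    workers.push(in[0]); // the worker is available again

    if (in[2].empty())
        return Frames(); // no client id: nothing to forward

    // Strip the worker envelope, keep client id, separator and payload.
    return Frames{ in[2], std::string(), in[4] };
}
```

Calling it with a message whose client id frame is empty only registers the worker, while a full message also gives back the three frames meant for the client.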

And this is the frontend socket message management:
void receivingOnFrontend()
{
    zmq::message_t zmIdCli;
    frontend_.recv(&zmIdCli); // 1
    dump("Receiving on frontend from", zmIdCli);

    zmq::message_t zmDummy;
    frontend_.recv(&zmDummy);

    zmq::message_t zmPayload;
    frontend_.recv(&zmPayload); // 2

    std::string idWork = qWorkers_.front(); // 3
    qWorkers_.pop();
    dump(idWork, "selected worker");

    zmq::message_t zmIdWork(idWork.size());
    memcpy(zmIdWork.data(), idWork.c_str(), idWork.size());
    backend_.send(zmIdWork, ZMQ_SNDMORE); // 4
    backend_.send(zmDummy, ZMQ_SNDMORE);
    backend_.send(zmIdCli, ZMQ_SNDMORE);
    backend_.send(zmDummy, ZMQ_SNDMORE);
    backend_.send(zmPayload);

    dump(idWork, "message sent to worker");
}
1. The first frame in the multipart message contains the client id.
2. After a separator frame, we have the payload.
3. We should send that message to a worker, so we get the first worker id from the queue, which represents the least recently used worker. Then we pop it from the queue, since that worker is not available anymore.
4. The first frame in the multipart message that we send through the backend socket is the address of the worker that should receive it, then we send the client id, and finally the payload. Remember to always send an empty frame between one "real" frame and the next.
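
The same routing step can be sketched on plain strings, one per frame. The Frames alias and the routeToWorker() function are hypothetical names used only for this illustration. Note that the sketch adds a guard against an empty worker queue, which the method above does not have. I am assuming here that the caller invokes receivingOnFrontend() only when at least one worker is available, since calling front() on an empty std::queue is undefined behaviour.

```cpp
#include <cassert>
#include <queue>
#include <string>
#include <vector>

// One string per frame; an empty string stands for an empty frame.
using Frames = std::vector<std::string>;

// Hypothetical helper mirroring receivingOnFrontend() on plain strings.
// Input layout: client id, separator, payload.
// Returns false, leaving 'out' untouched, when no worker is available.
bool routeToWorker(const Frames& in, std::queue<std::string>& workers, Frames& out)
{
    assert(in.size() == 3); // this sketch treats any other shape as a programming error

    if (workers.empty())
        return false;

    // The front of the queue is the least recently used worker.
    std::string worker = workers.front();
    workers.pop();

    // Worker address first, then client id and payload,
    // each separated by an empty frame.
    out = Frames{ worker, std::string(), in[0], std::string(), in[2] };
    return true;
}
```

With two workers in the queue, a client request is wrapped in the envelope of the first one, and only the second worker stays available.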
