
LRU Queue Device - worker

Before taking care of the LRU queue broker itself, and having already seen what its client does, it is time to look at the other side of the device: the worker threads.

Our device is designed to run a user-defined number of workers. Each worker runs in its own thread, and each thread executes this function:
const char* SK_ADDR_BACKEND = "inproc://backend"; // 1

// ...

void worker(zmq::context_t& context) // 2
{
    std::string id = boost::lexical_cast<std::string>(boost::this_thread::get_id()); // 3
    zmq::socket_t skWorker(context, ZMQ_REQ); // 4
    skWorker.setsockopt(ZMQ_IDENTITY, id.c_str(), id.length());
    skWorker.connect(SK_ADDR_BACKEND);

    std::string receiver;
    int payload = 0;
    while(true)
    {
        zmq::message_t zmReceiver(receiver.length()); // 5
        memcpy(zmReceiver.data(), receiver.c_str(), receiver.length());
        skWorker.send(zmReceiver, ZMQ_SNDMORE); // 5a

        zmq::message_t zmDummy;
        skWorker.send(zmDummy, ZMQ_SNDMORE); // 5b

        zmq::message_t zmOutput(sizeof(int));
        memcpy(zmOutput.data(), &payload, sizeof(int));
        skWorker.send(zmOutput); // 5c

        zmq::message_t zmClientId;
        skWorker.recv(&zmClientId); // 6
        dump(id, zmClientId);

        if(!zmClientId.size()) // 7
        {
            dump(id, "terminating");
            return;
        }
        const char* base = static_cast<const char*>(zmClientId.data());
        receiver = std::string(base, base + zmClientId.size()); // 8

        skWorker.recv(&zmDummy); // 9

        zmq::message_t zmPayload;
        skWorker.recv(&zmPayload); // 10
        if(zmPayload.size() != sizeof(int)) // 11
        {
            dump(id, "bad payload detected");
            return;
        }

        memcpy(&payload, zmPayload.data(), sizeof(int)); // 12
        dump(id, payload);

        boost::this_thread::sleep(boost::posix_time::millisec(payload));
    }
}
1. We are developing a multithreaded ZeroMQ application, where the sockets are connected over the inproc transport. The name I chose for the backend connection between the device and the workers is "backend".
2. As we should remember, the 0MQ context is the only object that can safely be shared among different threads (sockets must not be), and it must be shared when, as in this case, the threads exchange data over inproc: an inproc endpoint is visible only to sockets created from the same context.
3. Usually we don't bother to specify the socket identity, and we let ØMQ assign one. Here it is set explicitly to make the application easier to test, and that's why I used the Boost thread id: it gives each worker a distinct, readable identity.
4. Up to this point, the worker acts just like the client. Both of them use REQ sockets, connected via inproc to a ROUTER socket in the device. The difference is in what happens next. The worker sends a dummy message to let the device know it is up and running, then it waits for a reply, does some work on it, sends an answer back and waits for a new job, until it gets a terminating message.
5. It takes eight lines to send a request to the device, and the first time, as said in (4), it is just a dummy. The trouble is that we are sending a multipart message, and there is no easy way to do it other than taking care of the gory details.
As the first part (5a) we send a character string representing the address of the client that asked for this job. The first time there is no associated client: we are just signalling to the device that we are ready, so we actually send an empty frame.
The second part (5b) is a zero-sized frame acting as a separator, following the same convention ZeroMQ uses to delimit message envelopes.
The third and last part (5c) is the payload, the result of the job performed by the worker. In the first iteration this value is meaningless.
6. Now the worker blocks, waiting to receive a reply on the socket. As the variable name suggests, we expect the first frame to contain the client id, which we'll use to send back an answer in (5a).
7. If the client id is empty, we interpret the message as a termination request, so we stop looping.
8. Otherwise, we convert the raw byte array into a proper C++ string, and we store it in the local variable that will be used in (5a).
9. The next frame is a dummy; we just ignore it.
10. Finally we receive the payload, the real stuff we should work with.
11. This sample application is designed so that only ints are expected here; anything else is considered a catastrophic failure (real production code shouldn't behave like this, as I guess you well know).
12. The bytes in the last frame of the message are copied into an int, which the worker uses to, well, determine how many milliseconds it should sleep. The same value, unchanged, is then sent back to the client in (5c).
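The frame handling described in notes 5 to 12 can be tried out without a running device. The sketch below is not the code from the project: it models a multipart message as a hypothetical `Frames` type (a `std::vector<std::string>`, one string per frame) instead of `zmq::message_t`, and the names `buildReply`, `parseRequest`, `Outcome` and `Parsed` are made up for illustration. It builds the three frames sent in (5a)-(5c) and interprets the received ones as in (7)-(12), copying the int with `memcpy` so that no aligned access to the frame buffer is assumed.

```cpp
#include <cassert>
#include <cstring>
#include <string>
#include <vector>

// Hypothetical model of a multipart message: one std::string per frame.
using Frames = std::vector<std::string>;

// (5a)-(5c): client address, empty separator, raw bytes of the int payload.
Frames buildReply(const std::string& receiver, int payload)
{
    std::string body(sizeof(int), '\0');
    std::memcpy(&body[0], &payload, sizeof(int));
    return Frames{receiver, std::string(), body};
}

enum class Outcome { Job, Terminate, BadPayload };

struct Parsed
{
    Outcome outcome;
    std::string client;
    int payload;
};

// (6)-(12): interpret the frames received by the worker.
Parsed parseRequest(const Frames& frames)
{
    // (7) a missing or empty client id is read as a termination request
    if (frames.empty() || frames[0].empty())
        return Parsed{Outcome::Terminate, std::string(), 0};

    // (9) frames[1] is the separator and is ignored; (11) anything but one int is rejected
    if (frames.size() != 3 || frames[2].size() != sizeof(int))
        return Parsed{Outcome::BadPayload, frames[0], 0};

    // (8) keep the client id, (12) copy the payload bytes into an int
    Parsed result{Outcome::Job, frames[0], 0};
    std::memcpy(&result.payload, frames[2].data(), sizeof(int));
    return result;
}
```

Calling `buildReply("", 0)` yields the "ready" message of the first iteration: an empty address, the empty separator and a meaningless payload.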

The full C++ source code for this example is on GitHub.
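As for the user-defined number of worker threads, a device might start them along these lines. This is a hedged sketch using C++11 `std::thread` in place of Boost.Thread: `SharedContext` is a hypothetical stand-in for `zmq::context_t`, and `threadIdentity`, `workerSketch` and `runWorkers` are illustrative names, not taken from the project. It shows the two points made in notes (2) and (3): a single context object passed by reference to every thread, and an identity string derived from the thread id.

```cpp
#include <cassert>
#include <functional>
#include <mutex>
#include <sstream>
#include <string>
#include <thread>
#include <vector>

// Hypothetical stand-in for zmq::context_t: one object shared by all workers.
struct SharedContext
{
    std::mutex mtx;
    std::vector<std::string> identities; // recorded only to make the sketch observable
};

// Same idea as (3): turn the current thread id into a string identity.
std::string threadIdentity()
{
    std::ostringstream oss;
    oss << std::this_thread::get_id();
    return oss.str();
}

void workerSketch(SharedContext& ctx)
{
    std::string id = threadIdentity();
    // In the real worker, a ZMQ_REQ socket would be created from the shared
    // context here, given this identity, and connected to the backend.
    std::lock_guard<std::mutex> lock(ctx.mtx);
    ctx.identities.push_back(id);
}

void runWorkers(SharedContext& ctx, int nWorkers)
{
    assert(nWorkers >= 0);
    std::vector<std::thread> pool;
    for (int i = 0; i < nWorkers; ++i)
        pool.emplace_back(workerSketch, std::ref(ctx)); // share by reference, never copy
    for (auto& t : pool)
        t.join();
}
```

The `std::ref(ctx)` matters: `std::thread` copies its arguments by default, and a real `zmq::context_t` cannot be copied anyway.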
