Pages

Anonymous condition

Let's write an example to show how to use anonymous conditions in a multiprocess application using the boost interprocess (IPC) library. We write a producer/consumer that manage the exchange of message through a buffer built in the shared memory.

Two conditions are used to let the two process communicate to each other the change of buffer state.

The main of the application performs the selection between the two functions that specify if the process is actually the producer or the consumer. The producer, identified by an argument, should be launched first:
if(argc == 2)
ip09a(argv[1]);
else
ip09b();

And here is the actual code:

#include <iostream>
#include <cstdio>
#include <cstring>
#include <string>

#include "boost/interprocess/sync/interprocess_mutex.hpp"
#include "boost/interprocess/sync/interprocess_condition.hpp"
#include "boost/interprocess/sync/scoped_lock.hpp"
#include "boost/interprocess/shared_memory_object.hpp"
#include "boost/interprocess/mapped_region.hpp"

using namespace boost::interprocess;

namespace
{
const char* MY_SHARED = "MySharedMemory";

class SharedMessage // 1.
{
private:
enum { SIZE = 100 };
char message_[SIZE];

interprocess_mutex mutex_;
bool pending_;
bool terminated_;

interprocess_condition cEmpty_;
interprocess_condition cFull_;

public:
SharedMessage() : pending_(false), terminated_(false) {}

void sendMessage(const char* id, int i) // 2.
{
scoped_lock<interprocess_mutex> lock(mutex_);
if(pending_)
cFull_.wait(lock);

std::sprintf(message_, "%s_%d", id, i);
pending_ = true;
cEmpty_.notify_one();
}

void terminate() // 3.
{
scoped_lock<interprocess_mutex> lock(mutex_);
if(pending_)
cFull_.wait(lock);

terminated_ = true;
pending_ = true;

cEmpty_.notify_one();
}

bool readMessage(std::string& buffer) // 4.
{
scoped_lock<interprocess_mutex> lock(mutex_);
if(!pending_)
cEmpty_.wait(lock);

if(terminated_)
return false; // no message read

buffer = message_;
pending_ = false;
cFull_.notify_one();

return true;
}
};

class SMManager // 5.
{
private:
std::string name_;
shared_memory_object shm_;
mapped_region region_;
SharedMessage* sm_;

void remove() { shared_memory_object::remove(name_.c_str()); }
public:
SMManager(const char* name, bool create) : name_(name)
{
if(create)
{
remove();

shared_memory_object shm(create_only, name, read_write);
shm.truncate(sizeof(SharedMessage));
shm_.swap(shm);
}
else
{
shared_memory_object shm(open_only, name, read_write);
shm_.swap(shm);
}

mapped_region region(shm_, read_write);
region_.swap(region);

void* addr = region_.get_address();
sm_ = create ? new (addr) SharedMessage : static_cast<SharedMessage*>(addr);
}

~SMManager() { remove(); }

SharedMessage* getSharedMessage() { return sm_; }
};
}

void ip09a(const char* id) // 6.
{
std::cout << "Starting producer process" << std::endl;

try
{
SMManager smm(MY_SHARED, true);
SharedMessage* pSM = smm.getSharedMessage();

for(int i = 0; i < 7; ++i)
pSM->sendMessage(id, i);
pSM->terminate();
}
catch(interprocess_exception &ex)
{
std::cout << ex.what() << std::endl;
return;
}

std::cout << "Execution completed" << std::endl;
}

void ip09b() // 7.
{
std::cout << "Starting consumer " << std::endl;

try
{
SMManager smm(MY_SHARED, false);
SharedMessage* pSM = smm.getSharedMessage();

std::string message;
while(pSM->readMessage(message))
std::cout << "Message received: " << message << std::endl;
}
catch(interprocess_exception &ex)
{
std::cout << ex.what() << std::endl;
return;
}

std::cout << "Execution completed" << std::endl;
}

1. the SharedMessage class manages the buffer in the shared memory. Basically we have a mutex to rule the access to the shared resources and a couple of conditions to let the processes to communicate between them the change of status. A boolean, pending_, is used to keep track of the current status of the buffer and another one, terminated_, to signal the consumer when the producer has completed its operations.
2. SharedMessage.sendMessage() is used by the producer to put a message in the shared memory for the consumer. A scoped lock is created, than we check for the status variable, if there is already a message in the buffer, the process waits on the lock through the condition variable cFull_. After acquiring the rightful access, we put the message in the shared memory, set the status boolean, and then notify to the condition cEmpty that, well, message is not empty anymore.
3. SharedMessage.terminate() puts a different message in the shared memory, the one saying that the producer is done with producing messages. Instead of putting a string in the buffer we just change the boolean terminated_ setting it to true.
4. SharedMessage.readMessage() is used by the consumer to read the message stored by the producer in the shared memory, but before reading the buffer we check if actually the producer has ended its message production, looking the terminated_ flag. Notice that the condition variable usage mirrows the one of the sendMessage() function.
5. the SMManager class takes care of the shared memory management. Its constructor has a paramenter, create, that let us use it for the producer, that actually allocates the shared memory, and the consumer, that just accesses it. The shared memory is associated to a SharedMessage object, the producer use the new placement to call the SharedMessage constructor without allocating memory, we just use the shared memory that we already have available, the consumer just cast the memory to a pointer to SharedMessage.
6. This is the function used to implement the producer functionality. It creates a SMManager object - to allocate shared memory and create a SharedMessage object - and then sends a few messages.
7. Here we see the logic of the consumer. It creates a SMManager object - to gain access to the shared memory allocated by the producer, seeing it as a SharedMessage object - and then reads messages until it finds that the producer has completed its job.

The code is based on an example provided by the Boost Library Documentation.

No comments:

Post a Comment