
Anonymous mutex

The application we use to show how an anonymous mutex works is built around a simple cyclic buffer that resides in shared memory and is used by two different processes. An interprocess_mutex, defined in the Boost IPC library, controls access to the shared resource through a scoped_lock.
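Before looking at the real code, the wrap-around logic of such a buffer can be sketched in isolation. This is a minimal single-process sketch, assuming std::mutex and std::lock_guard as stand-ins for interprocess_mutex and scoped_lock; the class name RingLog, its sizes, and the at() accessor are hypothetical and do not appear in the original example.

```cpp
#include <cstddef>
#include <cstdio>
#include <mutex>
#include <string>

// Hypothetical, single-process stand-in for a log kept in a cyclic buffer.
class RingLog
{
private:
  enum { NUM_ITEMS = 4, LINE_SIZE = 32 };
  std::mutex mutex_;                  // stand-in for interprocess_mutex
  char items_[NUM_ITEMS][LINE_SIZE];
  int next_;                          // slot the next push will overwrite
public:
  RingLog() : items_(), next_(0) {}   // items_() zero-fills every slot

  void push(const char* id, int index)
  {
    std::lock_guard<std::mutex> lock(mutex_); // released when push() returns
    std::snprintf(items_[next_], LINE_SIZE, "%s_%d", id, index);
    next_ = (next_ + 1) % NUM_ITEMS;          // wrap around after the last slot
  }

  std::string at(std::size_t slot)
  {
    std::lock_guard<std::mutex> lock(mutex_);
    return items_[slot % NUM_ITEMS];
  }
};
```

With four slots, a fifth push overwrites slot 0 and leaves the other slots untouched, which is the behavior the article's log relies on.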

To create the two processes we run the executable once with no argument, for the master process, and then again with one argument, the name of the secondary process.

So the main function of our application distinguishes between the two cases and calls the appropriate function, using a piece of code like this:
if(argc == 1)
  ip07a();
else
  ip07b(argv[1]);
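The same dispatch can be written as a small function that is easy to try out on its own. This is only a sketch: masterRole(), secondaryRole() and dispatch() are hypothetical stand-ins that report the chosen role as a string instead of running ip07a() or ip07b().

```cpp
#include <string>

// Hypothetical stand-ins for ip07a() and ip07b(): they only name the role.
std::string masterRole() { return "master"; }
std::string secondaryRole(const char* id) { return std::string("secondary ") + id; }

// Same test as in the snippet above: no argument besides the program name
// means master, otherwise argv[1] is the name of the secondary process.
std::string dispatch(int argc, const char* const argv[])
{
  if(argc == 1)
    return masterRole();
  return secondaryRole(argv[1]);
}
```

A real main would forward its own argc and argv to such a function, with the stand-ins replaced by the two functions shown in the complete listing.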
Let's now have a look at the complete code, then we'll say something about the most interesting passages:
#include <cstdio>
#include <iostream>
#include <new>
#include <string>

#include "boost/interprocess/sync/interprocess_mutex.hpp"
#include "boost/interprocess/sync/scoped_lock.hpp"
#include "boost/interprocess/shared_memory_object.hpp"
#include "boost/interprocess/mapped_region.hpp"
#include "boost/thread/thread.hpp"

using namespace boost::interprocess;

namespace
{
const char* MY_SHARED = "MySharedMemory";

class SharedMemoryLog // 1
{
private:
  enum { NUM_ITEMS = 10, LINE_SIZE = 100 };
  boost::interprocess::interprocess_mutex mutex_;

  char items[NUM_ITEMS][LINE_SIZE];
  int curLine_;
  bool done_;
public:
  SharedMemoryLog() : items(), curLine_(0), done_(false) {} // items() zero-fills the buffer, so dump() never prints garbage

  void push_line(const char* id, int index)
  {
    scoped_lock<interprocess_mutex> lock(mutex_);
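    // snprintf cannot write past the end of the line, and keeping curLine_ in range avoids integer overflow
    std::snprintf(items[curLine_], LINE_SIZE, "%s_%d", id, index);
    curLine_ = (curLine_ + 1) % NUM_ITEMS;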
    std::cout << "Inserting item " << id << ' ' << index << std::endl;
  }

  void dump()
  {
    scoped_lock<interprocess_mutex> lock(mutex_);
    for(int i = 0; i < NUM_ITEMS; ++i)
      std::cout << items[i] << std::endl;
  }

  void done()
  {
    scoped_lock<interprocess_mutex> lock(mutex_);
    done_ = true;
  }

  bool isDone()
  {
    scoped_lock<interprocess_mutex> lock(mutex_);
    return done_;
  }
};

class ShMemManager // 2
{
private:
  std::string name_;
  bool create_;
  shared_memory_object shm_;
  mapped_region region_;
  SharedMemoryLog* sml_;

  void remove() { shared_memory_object::remove(name_.c_str()); }
public:
  ShMemManager(const char* name, bool create = true) : name_(name), create_(create)
  {
    if(create_)
    {
      remove();

      shared_memory_object shm(create_only, name_.c_str(), read_write);
      shm.truncate(sizeof(SharedMemoryLog));
      shm_.swap(shm);
    }
    else
    {
      shared_memory_object shm(open_only, name_.c_str(), read_write);
      shm_.swap(shm);
    }

    mapped_region region(shm_, read_write);
    region_.swap(region);
    void* addr = region_.get_address();

    sml_ = create_ ? new (addr) SharedMemoryLog : static_cast<SharedMemoryLog*>(addr); // 3
  }

  ~ShMemManager() { if(create_) remove(); } // only the process that created the shared memory removes it

  SharedMemoryLog* getMemory() { return sml_; }
};
}

void ip07a() // 4
{
  std::cout << "Starting master process ..." << std::endl;

  try
  {
    ShMemManager smm(MY_SHARED);
    SharedMemoryLog* data = smm.getMemory();

    for(int i = 0; i < 7; ++i)
    {
      data->push_line("master", i);
      boost::this_thread::sleep(boost::posix_time::milliseconds(1000));
    }

    std::cout << "Master dumps data:" << std::endl;
    data->dump();

    while(true)
    {
      if(data->isDone())
      {
        std::cout << "Master sees that the other process is done" << std::endl;
        break;
      }

      std::cout << "Master waits for the other process" << std::endl;
      boost::this_thread::sleep(boost::posix_time::milliseconds(1000));
    }

    std::cout << "Master dumps again the data:" << std::endl;
    data->dump();
  }
  catch(interprocess_exception& ex)
  {
    std::cout << ex.what() << std::endl;
    return;
  }

  std::cout << "Master execution completed" << std::endl;
}

void ip07b(const char* id) // 5
{
  std::cout << "Process " << id << " started" << std::endl;

  try
  {
    ShMemManager smm(MY_SHARED, false);
    SharedMemoryLog* data = smm.getMemory();

    for(int i = 0; i < 7; ++i)
    {
      data->push_line(id, i);
      boost::this_thread::sleep(boost::posix_time::milliseconds(1000));
    }
    data->done();

    std::cout << id << " dumps data:" << std::endl;
    data->dump();
  }
  catch(interprocess_exception& ex)
  {
    std::cout << ex.what() << std::endl;
    return;
  }

  std::cout << "Process " << id << " done" << std::endl;
}
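The last line of the ShMemManager constructor relies on placement new, which may be unfamiliar. Here is a minimal single-process sketch of the two branches, assuming an ordinary local buffer in place of the mapped region; Counter, createIn(), openIn() and demo() are hypothetical names used only for illustration.

```cpp
#include <new> // placement new

// Hypothetical stand-in for SharedMemoryLog.
struct Counter
{
  int value;
  Counter() : value(42) {}
};

// Creator side: run the constructor on memory we did not allocate with new.
Counter* createIn(void* addr) { return new (addr) Counter; }

// Opener side: the object already exists, so we only reinterpret the address.
Counter* openIn(void* addr) { return static_cast<Counter*>(addr); }

int demo()
{
  alignas(Counter) unsigned char buffer[sizeof(Counter)]; // raw storage
  Counter* created = createIn(buffer);
  // In the article the second pointer comes from the region mapped by
  // another process; here we simply reopen the same object through a void*.
  Counter* opened = openIn(created);
  opened->value += 1;          // both pointers refer to the same object
  int result = created->value;
  created->~Counter();         // placement new needs an explicit destructor call
  return result;
}
```

The opener never runs the constructor: doing so would reset the state that the creator has already set up.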
1. The class SharedMemoryLog manages concurrent access by the different processes, and its instance lives in shared memory. Notice that every method is protected by a scoped_lock on the mutex owned by the class.
2. The class ShMemManager manages the shared memory. A constructor parameter determines whether it actually creates the shared memory, as the master process does, or just opens memory that already exists, as the secondary process does.
3. The last line of the ShMemManager constructor points sml_ at a SharedMemoryLog object in the shared memory we have just created or opened. In creation mode we must actually run the SharedMemoryLog constructor on that memory. To do that we use the so-called placement new, "new (addr) SharedMemoryLog", which constructs the object at the address we specify. Otherwise we simply static_cast the address to the required type.
4. The function called by the master process. It puts a few lines in the log (slowing itself down with a sleep call), dumps the log, polls once a second until the other process is done, then performs another dump before returning. This polling loop is not very good programming; we should use a condition instead. We'll see how to do that in a later post.
5. The function called by the secondary process. The main differences from the master are that we construct the ShMemManager specifying that we want to open shared memory that already exists, and that we let the master know we are done by calling done(), which sets an internal flag. As already said, this is not a very clean way of working; we'll see how to do better using a condition.
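As a preview of the condition-based approach mentioned in notes 4 and 5, here is a minimal sketch of a flag that can be waited on instead of polled. It assumes std::mutex and std::condition_variable as single-process stand-ins; Boost.Interprocess offers interprocess_condition for the shared-memory case, and the details of using it are left to the later post. The class DoneFlag and its waitUntilDone() method are hypothetical names.

```cpp
#include <condition_variable>
#include <mutex>

// Hypothetical replacement for the done_ flag of SharedMemoryLog.
class DoneFlag
{
private:
  std::mutex mutex_;
  std::condition_variable cond_;
  bool done_;
public:
  DoneFlag() : done_(false) {}

  // Called by the side that finishes its work.
  void done()
  {
    {
      std::lock_guard<std::mutex> lock(mutex_);
      done_ = true;
    }
    cond_.notify_all(); // wake every waiter after the flag has changed
  }

  // Called by the side that has to wait: it sleeps until done() is called
  // and returns at once if the flag is already set.
  void waitUntilDone()
  {
    std::unique_lock<std::mutex> lock(mutex_);
    cond_.wait(lock, [this] { return done_; }); // the predicate guards against spurious wake-ups
  }

  bool isDone()
  {
    std::lock_guard<std::mutex> lock(mutex_);
    return done_;
  }
};
```

With a flag like this, the master's loop around isDone() and sleep would shrink to a single waitUntilDone() call, and the waiting side would consume no CPU time while it is blocked.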

The code is based on an example provided by the Boost Library Documentation.
