Reader Writer Problem - I


The Reader-Writer problem is a classic synchronisation problem in Operating Systems. There are several writers and readers accessing a shared resource such as a data array. The writers fill/update the data in the shared array while the readers use the data but do not modify the shared array. The problem is to ensure that a writer overwrites neither data written by other writers nor any data that the readers have not yet used. There exist many versions of the problem depending on the number of writers and readers as well as the size of the shared array.

In this lesson, we talk about the first and the simplest version of the problem, viz. the single-reader-single-writer problem. There is only a single writer writing into the array and a single reader using the data written into the array. The shared resource is a linear buffer/array and the process terminates when all the cells in the buffer are written by the writer and used by the reader.

What are the synchronisation issues here?

  • A reader MUST NOT use the data from a cell that is not yet written.

The solution requires the use of a condition variable from the pthread library. Two operations are defined on a condition variable.

  1. pthread_cond_wait(pthread_cond_t *cvar, pthread_mutex_t *mutex)
    A thread that calls this function is blocked until another thread sends a signal on the condition variable cvar. The caller must hold mutex when it makes the call. The function releases mutex atomically while the thread is blocked and re-acquires it before returning.
    The variable mutex ensures mutual exclusion so that the test-and-wait sequence is atomic. Without it, a signal sent between the test of the shared condition and the call to wait could be missed. Secondly, a thread that waits on a condition variable is waiting for some shared data to change, and once unblocked it usually reads or modifies that shared data. We should remember that accessing shared data that another thread may modify is always a critical section and the mutex variable is required to ensure exclusive access. Please read the APUE book or search the Internet to understand the use of this variable in this function call.
  2. pthread_cond_signal(pthread_cond_t *cvar)
    This sends a signal on the condition variable cvar. It causes at least one of the threads that are currently blocked in a pthread_cond_wait() function on cvar to be unblocked. The specific thread that will be unblocked cannot be determined. If no thread is blocked on cvar at that moment, the signal has no effect and is not remembered.

The writer thread needs to send a signal whenever it writes into a cell. Similarly, the reader thread needs to check whether there are any newly-written items before continuing its execution.
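
The wait/signal pairing described above can be sketched in isolation before it is applied to the buffer. The following is only a minimal illustration, not the lesson's program: the names demo_mutex, demo_cvar, demo_ready, demo_value, demo_signaller() and demo_wait_for_value() are all hypothetical. One thread publishes a value and signals; the calling thread waits until the value is available.

```c
#include <assert.h>
#include <pthread.h>
#include <stddef.h>

/* Hypothetical names for illustration; none of these appear in the lesson. */
static pthread_mutex_t demo_mutex = PTHREAD_MUTEX_INITIALIZER;
static pthread_cond_t  demo_cvar  = PTHREAD_COND_INITIALIZER;
static int demo_ready = 0;        /* The condition, protected by demo_mutex */
static int demo_value = 0;        /* The data published by the signaller    */

/* Signalling side: publish a value, set the condition, then signal. */
static void *demo_signaller(void *arg)
{
     int value = *(int *)arg;

     pthread_mutex_lock(&demo_mutex);
     demo_value = value;
     demo_ready = 1;
     pthread_cond_signal(&demo_cvar);
     pthread_mutex_unlock(&demo_mutex);
     return NULL;
}

/* Waiting side: start the signaller, block until the condition holds,
   then return the published value (or -1 if no thread could be created). */
int demo_wait_for_value(int value)
{
     pthread_t tid;
     int seen;

     demo_ready = 0;              /* No other thread is running yet */
     if (pthread_create(&tid, NULL, demo_signaller, &value) != 0)
          return -1;

     pthread_mutex_lock(&demo_mutex);
     while (!demo_ready)          /* Re-test every time wait returns */
          pthread_cond_wait(&demo_cvar, &demo_mutex);
     assert(demo_ready);          /* Holds here: the mutex is locked */
     seen = demo_value;
     pthread_mutex_unlock(&demo_mutex);

     pthread_join(tid, NULL);
     return seen;
}
```

Calling demo_wait_for_value(42) from main() should return 42 whichever thread happens to run first: if the signaller runs first, the waiting side finds demo_ready already set and never blocks.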

1. Complete code and explanation


The code to handle the reader-writer problem is split into four sections for easy understanding.

  1. Headers and global variables
  2. The main process and thread creation
  3. The writer thread function
  4. The reader thread function
The code must be compiled along with the pthread library.
gcc -o srsw reader_writer_01.c -lpthread

Two versions of the code are given. The first does not have any synchronisation and illustrates the kind of errors we can get. The second is the correct version that synchronises the reader and writer threads when they access the shared resources.

1.1 Headers and Global Variables

The key in writing multi-threaded programs is to clearly distinguish between shared variables that require synchronisation and those that don't. It is also a great idea to identify which variables are shared between writers and which between readers.

One way of achieving this is by defining structures that bring together such variables. In the header section below, it can be seen that there are two variables which are shared between the reader and the writer.

  1. The common buffer buff[NMAX]; and,
  2. The variable nready which is the number of items that are ready to be verified by the reader.
Therefore, these two are put in a structure called shared.

There are also two variables which are used only by the writer thread.

  1. int nput: The location in the shared buffer where data is to be written
  2. int nval: The value that will be written into the buffer
These two are put in a structure called wshared.

Finally, the two thread functions are void *writer(void *) and void *reader(void *). These two declarations complete the header section. The other variables are largely self-explanatory(!)

//%cflags: -lpthread

#include <stdio.h>
#include <stdlib.h>
#include <pthread.h>

#define MAXTHREADS  20
#define NMAX        2000000

int nitems;                           /* No. of items to write */
struct {
    int buff[NMAX];                   /* Shared buffer         */
    int nready;                       /* No. of items to be verified by reader */
} shared;
/* All variables shared by the writer threads are put in a
   variable called wshared so that we know easily that access
   to these variables may need synchronisation */
struct {
     int nput;
     int nval;
} wshared;

void *writer(void *);             /* Writer thread function */
void *reader(void *);             /* Reader thread function */
int count;                        /* No. items written by each
                                     writer */

1.2 The main() Function

The main() function is relatively straight-forward. It takes two command-line arguments and is executed as

./srsw <no. of threads> <no. of items>
nthreads is not used anywhere in this version of the program. nitems is the number of items to write.

The pthread_create() function from the pthread library is used to create the two threads: the first call with the writer() function and the second with the reader() function. The writer function takes one argument, a pointer to count, through which it reports the number of items written by the writer thread. As there is only one writer, count must be equal to nitems when the writer terminates.

After creating the two threads, main() waits for them to return by calling pthread_join() function twice. Once the threads terminate, the process also terminates.

/* Process creates two threads: one writer and one reader and
   runs them concurrently. */
int main(int argc, char *argv[])
{
     int nthreads;
     pthread_t tid[2];

     if (argc != 3) {                     /* Both arguments are required */
          fprintf(stderr, "Usage: %s <no. of threads> <no. of items>\n",
                  argv[0]);
          exit(1);
     }
     nthreads = atoi(argv[1]);            /* No. of threads to create */
     (void)nthreads;                      /* Not used in this version */
     nitems = atoi(argv[2]);              /* No. of items to write */
     if (nitems < 0 || nitems > NMAX) {   /* buff holds only NMAX items */
          fprintf(stderr, "No. of items must be between 0 and %d\n", NMAX);
          exit(1);
     }
     count = 0;
     if (pthread_create(&tid[0], NULL, writer,
                        (void *)&count) != 0) {
          fprintf(stderr, "Error creating writer thread\n");
          exit(1);
     }                            /* Created writer thread */

     if (pthread_create(&tid[1], NULL, reader,
                        (void *)NULL) != 0) {
          fprintf(stderr, "Error creating reader thread\n");
          exit(1);
     }                            /* Created reader thread */

     pthread_join(tid[0], NULL);  /* Wait for writer to exit */
     printf("Writer thread completed after writing %d items\n", count);
     pthread_join(tid[1], NULL);  /* Wait for reader to exit */

     exit(0);
}
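
A note on the use of atoi(): it cannot report errors, so an argument such as "abc" is silently read as 0. A stricter parser could be built on strtol(). The sketch below is one possible way of doing this; parse_item_count() is a hypothetical helper that is not part of the lesson's program, and its max parameter is assumed to be the buffer capacity (NMAX here).

```c
#include <assert.h>
#include <errno.h>
#include <stdlib.h>

/* Hypothetical helper, not part of the lesson's program.
   Parses text as a base-10 item count in the range 1..max.
   Returns the count on success and -1 on any error. */
int parse_item_count(const char *text, int max)
{
     char *end;
     long value;

     assert(max > 0);
     if (text == NULL || *text == '\0')
          return -1;                    /* Nothing to parse */

     errno = 0;
     value = strtol(text, &end, 10);
     if (errno != 0 || *end != '\0')    /* Overflow, or junk after digits */
          return -1;
     if (value < 1 || value > max)      /* Would not fit in the buffer */
          return -1;
     return (int)value;
}
```

In main(), nitems = parse_item_count(argv[2], NMAX) followed by a test for -1 would then reject both malformed and oversized arguments.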

1.3 The writer() Thread

In the single-writer version, the writer thread is quite simple. Initially, both the shared variables nput = 0 and nval = 0. The writer

  1. Checks if the current value of nput ≥ nitems. If so, it returns.
  2. Otherwise, it writes nval into the nput location and then increments both.
  3. Finally, it increments count by one. Understand fully all the type conversions in that one line of code!

/* Writer thread simply writes items into the shared buffer until 
   nitems are written */
void *writer(void *count)
{
     for ( ; ; ) {
          if (wshared.nput >= nitems) {
               return(NULL);
          }
         /* These three statements simply write the value
            nval into location nput such that buff[i] = i
            for all i */
          shared.buff[wshared.nput] = wshared.nval;
          wshared.nput++;
          wshared.nval++;

          *((int *) count) += 1;   /* Return no. of items written */
     }
}
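
The type conversions in the last statement of the loop may be easier to follow in a smaller setting. The sketch below uses hypothetical names (add_three() and run_add_three()) and is only an illustration of the idiom: the address of an int is passed to the thread as a void *, converted back to an int * inside the thread, and dereferenced to update the caller's variable.

```c
#include <assert.h>
#include <pthread.h>
#include <stddef.h>

/* Hypothetical worker: adds 3 to the int that arg points to. */
static void *add_three(void *arg)
{
     int *counter;
     int k;

     assert(arg != NULL);
     counter = (int *)arg;        /* void * back to the real type */
     for (k = 0; k < 3; k++)
          *counter += 1;          /* Same effect as *((int *) arg) += 1 */
     return NULL;
}

/* Runs the worker on a local counter and returns the final value,
   or -1 if the thread could not be created. */
int run_add_three(int start)
{
     pthread_t tid;
     int counter = start;

     if (pthread_create(&tid, NULL, add_three, &counter) != 0)
          return -1;
     pthread_join(tid, NULL);     /* After the join, counter is safe to read */
     return counter;
}
```

The caller reads counter only after pthread_join() returns, just as main() prints count only after joining the writer thread.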

1.4 The reader() Thread

The reader() function is very simple! It goes through the entire shared buffer and checks if, at each location i, the value written is also i. If not, it outputs an error message for every such mismatched entry.

/* Reader thread checks every cell in the buff array to ensure that
   buff[i] = i. Otherwise, it prints an error giving information
   about the location(s) and value(s) written incorrectly */
void *reader(void *count)
{
     int i;

     for (i=0; i<nitems; i++) {
          if (shared.buff[i] != i)
               printf("Error found: buff[%d] = %d\n", i, shared.buff[i]);
     }
     
     return(NULL);
}

Let us compile and run this program; but, wait a minute, where are the synchronisation primitives? Well, there aren't any yet! Let us see what happens when this program runs without any synchronisation between the writer and reader threads. Run the program 10 times (as below) and carefully check the output.

./srsw 2 5000
  • How many times did the reader() output error messages?
  • How many errors did you find in a single run?
  • How many times did you get no errors?

Repeat the above experiment but with 25000 items. Again, answer the three questions above.

Finally, look carefully at the errors. They will be of two kinds:

  • buff[<num>] = 0
  • buff[i] = i for some i
Here is the big question: why is the second case an error? Can you answer this question by thinking deeply about when the thread is scheduled out?

2. The Synchronised Version!


This is the version where we put in the synchronisation primitives and make the program run correctly: the reader never prints anything! Let us now look at the modifications needed.

  • Condition Variable: cvar. Initially, no signal has been sent on the condition variable. The reader thread waits on it whenever there is nothing new to verify, that is, if the writer thread is not scheduled first or if it has not written anything new into the shared buffer.
  • Associated Mutex Variable: cvmutex. This mutex is needed to make the condition variable work properly. It is used for synchronisation between reader and writer threads because both modify the shared nready variable. Writer thread increments it when it writes a new value in the buffer and the reader thread decrements it when it checks a buffer entry for correctness.

As both of these are really required for the reader to synchronise with the writer, we create a new structure called rshared and wrap the two synchronisation variables within it.

The new header section is given below. Note the initialisation of the two synchronisation variables with PTHREAD_MUTEX_INITIALIZER and PTHREAD_COND_INITIALIZER. These ensure that the mutex variable is initially unlocked and that the condition variable starts with no threads waiting on it.

#include <stdio.h>
#include <stdlib.h>
#include <pthread.h>

#define MAXTHREADS  20
#define NMAX        2000000

int nitems;                           /* No. of items to write */
struct {
    int buff[NMAX];                   /* Shared buffer         */
    int nready;                       /* No. of items to be verified by reader */
} shared;
/* All variables shared by the writer threads are put in a
   variable called wshared so that we know easily that access
   to these variables may need synchronisation */
struct {
     int nput;
     int nval;
} wshared;

struct {
     pthread_mutex_t cvmutex;
     pthread_cond_t cvar;
} rshared = {
     PTHREAD_MUTEX_INITIALIZER, PTHREAD_COND_INITIALIZER
};

void *writer(void *);             /* Writer thread function */
void *reader(void *);             /* Reader thread function */
int count;                        /* No. items written by each
                                     writer */
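
The static initialisers used for rshared work only for variables defined with default attributes. When the synchronisation variables live in dynamically allocated memory, or need non-default attributes, they are initialised at run time instead. The following is a minimal sketch of that alternative, assuming default attributes; struct sync_pair, sync_pair_init() and sync_pair_destroy() are hypothetical names and are not used elsewhere in this lesson.

```c
#include <assert.h>
#include <pthread.h>
#include <stddef.h>

/* Hypothetical stand-in for the lesson's rshared structure. */
struct sync_pair {
     pthread_mutex_t cvmutex;
     pthread_cond_t  cvar;
};

/* Initialise both members at run time with default attributes.
   Returns 0 on success or the pthread error number on failure. */
int sync_pair_init(struct sync_pair *sp)
{
     int rc;

     assert(sp != NULL);
     rc = pthread_mutex_init(&sp->cvmutex, NULL);
     if (rc != 0)
          return rc;
     rc = pthread_cond_init(&sp->cvar, NULL);
     if (rc != 0)
          pthread_mutex_destroy(&sp->cvmutex);   /* Undo the first step */
     return rc;
}

/* Release both members; call only when no thread is using them.
   Returns 0 on success or the first pthread error number seen. */
int sync_pair_destroy(struct sync_pair *sp)
{
     int rc1, rc2;

     assert(sp != NULL);
     rc1 = pthread_cond_destroy(&sp->cvar);
     rc2 = pthread_mutex_destroy(&sp->cvmutex);
     return rc1 != 0 ? rc1 : rc2;
}
```

For the global rshared structure in this lesson the static initialisers are simpler and need no clean-up code.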

2.1 Modified writer()

The writer now needs to tell the reader whenever a new item is available. If all the previously written items are already checked by the reader, shared.nready = 0 and there is a chance that the reader is blocked on the condition variable. Therefore, after writing each item, the writer locks the mutex to get exclusive access and checks if shared.nready == 0. If so, it sends a signal to indicate that a new value is written. In either case, it then increments the shared.nready variable and unlocks the mutex so that the reader can run.

The new code is given below.

void *writer(void *count)
{
     for ( ; ; ) {
          if (wshared.nput >= nitems) {
               return(NULL);
          }
         /* These three statements simply write the value
            nval into location nput such that buff[i] = i
            for all i */
          shared.buff[wshared.nput] = wshared.nval;
          wshared.nput++;
          wshared.nval++;

          pthread_mutex_lock(&rshared.cvmutex);
          if (shared.nready == 0)
               pthread_cond_signal(&rshared.cvar);
          shared.nready++;
          pthread_mutex_unlock(&rshared.cvmutex);
          
          *((int *) count) += 1;   /* Return no. of items written */
     }
}

2.2 The modified reader()

The reader also needs a bit of extra code. First, the reader locks the mutex to get exclusive access and checks if there are any newly written items to read. This is done by testing shared.nready == 0. If so, there are no items to verify and the reader calls pthread_cond_wait() to get blocked until new items are written. The test is made in a while loop rather than an if statement because pthread_cond_wait() is allowed to return even when no signal was sent (a spurious wakeup), so the condition must be re-checked every time the call returns. Once there are items to read, the reader decrements shared.nready and unlocks the mutex so that the writer thread can also run alongside.

Here is the new version.

void *reader(void *count)
{
     int i;

     for (i=0; i<nitems; i++) {
          pthread_mutex_lock(&rshared.cvmutex);
          while (shared.nready == 0)
               pthread_cond_wait(&rshared.cvar, &rshared.cvmutex);
          shared.nready--;
          pthread_mutex_unlock(&rshared.cvmutex);
          
          if (shared.buff[i] != i)
               printf("Error found: buff[%d] = %d\n", i, shared.buff[i]);
     }
     
     return(NULL);
}

Compile and run the code again and see if you get any errors. Understand the code thoroughly before you go to the next version which is naturally more complex!
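
For repeated experiments it can be convenient to wrap the synchronised writer and reader in a function that returns the number of mismatches instead of printing them. The following is only a sketch along those lines and differs from the lesson's program in a few labelled ways: all the demo_* names, DEMO_MAX and run_synchronised() are hypothetical, the buffer is smaller than NMAX, and every cell is pre-filled with -1 so that a cell read too early can never look correct by accident.

```c
#include <assert.h>
#include <pthread.h>
#include <stddef.h>

#define DEMO_MAX 100000           /* Hypothetical; smaller than NMAX */

static int demo_buff[DEMO_MAX];   /* Shared buffer                   */
static int demo_nitems;           /* No. of items to write           */
static int demo_nready;           /* Items written but not yet read  */
static pthread_mutex_t demo_cvmutex = PTHREAD_MUTEX_INITIALIZER;
static pthread_cond_t  demo_cvar    = PTHREAD_COND_INITIALIZER;

/* Writer: writes buff[i] = i and announces each item to the reader. */
static void *demo_writer(void *arg)
{
     int i;

     (void)arg;
     for (i = 0; i < demo_nitems; i++) {
          demo_buff[i] = i;
          pthread_mutex_lock(&demo_cvmutex);
          if (demo_nready == 0)   /* The reader may be blocked */
               pthread_cond_signal(&demo_cvar);
          demo_nready++;
          pthread_mutex_unlock(&demo_cvmutex);
     }
     return NULL;
}

/* Reader: waits for each item, then counts cells with buff[i] != i. */
static void *demo_reader(void *arg)
{
     int *errors = (int *)arg;
     int i;

     for (i = 0; i < demo_nitems; i++) {
          pthread_mutex_lock(&demo_cvmutex);
          while (demo_nready == 0)
               pthread_cond_wait(&demo_cvar, &demo_cvmutex);
          demo_nready--;
          pthread_mutex_unlock(&demo_cvmutex);
          if (demo_buff[i] != i)
               (*errors)++;
     }
     return NULL;
}

/* Runs one writer and one reader over nitems cells. Returns the
   number of mismatches seen by the reader, or -1 on a thread error. */
int run_synchronised(int nitems)
{
     pthread_t wtid, rtid;
     int errors = 0;
     int i;

     assert(nitems >= 0 && nitems <= DEMO_MAX);
     demo_nitems = nitems;
     demo_nready = 0;
     for (i = 0; i < nitems; i++)
          demo_buff[i] = -1;      /* A stale cell can never equal i */

     if (pthread_create(&wtid, NULL, demo_writer, NULL) != 0)
          return -1;
     if (pthread_create(&rtid, NULL, demo_reader, &errors) != 0) {
          pthread_join(wtid, NULL);
          return -1;
     }
     pthread_join(wtid, NULL);
     pthread_join(rtid, NULL);
     return errors;
}
```

With the synchronisation in place, run_synchronised(5000) is expected to return 0 on every run, because the reader consumes cell i only after the writer has incremented the ready count for it.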


THE END