pthreads stands for POSIX Threads which is the de facto standard threading related API that operating systems need to support for thread managements. Almost all Unix-like operating systems such as Linux, MacOS, Android, FreeBSD support pthreads.

The pthreads interfaces are defined in pthread.h header file in C programming language. It contains function declarations, mappings for threading interfaces, and a set of constants used by those functions.

I’m going to look over some of the pthreads primitives and play them with some simple examples.

Threads

pthread data type

pthread_t is a data type to represent threads:

pthread_t my_thread;

It contains information such as a unique identifier, execution state that’s used by the pthread library.

pthread attributes

The thread-specific attributes can be set through pthread_attr_t data type.

The code below is an example of setting a thread as detachable:

pthread_attr_t my_thread_attr;
pthread_attr_init(&my_thread_attr);
pthread_attr_setdetachstate(&my_thread_attr, PTHREAD_CREATE_DETACHED);
  • pthread_attr_t: a pthread attributes data type
  • pthread_attr_init: initializes the thread attributes object pointed to by attr with default attribute values
  • pthread_attr_setdetachstate: sets the thread to be created in a detached state (default: PTHREAD_CREATE_JOINABLE)
  • PTHREAD_CREATE_DETACHED: a constant value to set threads as detachable

Both pthread_attr_init() and pthread_attr_setdetachstate() return 0 on success or a nonzero integer value otherwise.

Detached threads cannot be joined and do not terminate if the process (or the parent thread) terminates. They only terminate when they’re done execution.

Some other functions to set the attributes:

  • pthread_attr_setschedpolicy(): sets the scheduling policy of the thread
  • pthread_attr_setscope(): sets the contention scope of the thread whether it’s system scope or process scope
  • pthread_attr_setstacksize(): sets the minimum stack size (in bytes) of the thread

Creating threads

pthread provides pthread_create() function for thread creation.

int pthread_create(pthread_t *thread,
                   const pthread_attr_t *attr,
                   void *(*start_routine)(void *),
                   void *arg);
  • thread: pthread data structure
  • attr: pthread attribute data (if it’s NULL, the the thread is created with default attributes)
  • start_routine: a procedure to call
  • arg: arguments to pass into start_routine

The function return 0 on success or a nonzero integer value otherwise.

Joining threads

pthread_join() waits for the thread to terminate.

int pthread_join(pthread_t thread, void **retval);
  • thread: joinable pthread data structure
  • retval: to store the exit status of the target thread (i.e., the value that’s passed into pthread_exit())

Terminating threads

Lastly, pthread_exit() function terminates the calling thread and returns a value to the retval argument of pthread_join()

pthread_exit(0);

This function always succeeds and does not return to the caller.


Here’s the example code that prints out Hello, world! using pthread that competes for resources with all other threads in all processes on the system:

#include <pthread.h>
#include <stdio.h>

void *greet(void *arg)
{
    printf("Hello, world!\n");
    pthread_exit(0);
}

int main(void)
{
    int retval;

    pthread_t my_thread;
    pthread_attr_t my_thread_attr;
    pthread_attr_init(&my_thread_attr);
    pthread_attr_setscope(&my_thread_attr, PTHREAD_SCOPE_SYSTEM);
    pthread_create(&my_thread, &my_thread_attr, greet, NULL);
    pthread_join(my_thread, (void *)&retval);

    printf("my_thread terminates with an exit status of %d\n", retval);

    return 0;
}

The output of the program:

Hello, world!
my_thread terminates with an exit status of 0

Do note that without pthread_join(), the calling thread can terminate before even my_thread executes, and it prints out some garbage value because pthread_exit() couldn’t copy the exit status to retval. For example, the output of the program would look like this:

my_thread terminates with an exit status of -521087904

Mutexes

Just like pthread data types, a pthread mutexe can be intialized with its own specific attributes:

pthread_mutex_t my_mutex;
pthread_mutexattr_t my_mutex_attr;
pthread_mutexattr_init(&my_mutex_attr);
pthread_mutex_init(&my_mutex, &my_mutex_attr);
  • pthread_mutex_t: a data type to represent mutexes:
  • pthread_mutexattr_t: the mutex attributes data type
  • pthread_mutexattr_init: initializes the mutex attributes
  • pthread_mutex_init: initializes a mutex with the specified attributes (if attr is NULL, all attributes are set to the default)

The more succint way to initialize a mutex with the default attributes:

pthread_mutex_t my_mutex = PTHREAD_MUTEX_INITIALIZER;

The locking and unlocking operations in pthreads are explicitly implemented:

int pthread_mutex_lock(pthread_mutex_t *mutex);
int pthread_mutex_unlock(pthread_mutex_t *mutex);

The critical section thereby must be written bewteen the two operations:

pthread_mutex_lock(&my_mutex);
// critical section
pthread_mutex_unlock(&my_mutex);

Explicit unlocking implies that programmers can easily forget to unlock the mutexes. Besides, there’s a risk that wrong mutexes will be unlocked if it’s not implemented carefully. Therefore, unlocking the correct mutex must not be forgotten.

And every mutex must be freed up eventually:

int pthread_mutex_destroy(pthread_mutex_t *mutex);  

Condition Variables

Just like mutexes, a pthread condition variable can be intialized with its own specific attributes:

pthread_cond_t my_cond;
pthread_cond_attr my_cond_attr;
pthread_condattr_init(&my_cond_attr);
pthread_cond_init(&my_cond, &my_cond_attr);
  • pthread_cond_t: a data type to represent condition variables:
  • pthread_cond_attr: the condition variable attributes data type
  • pthread_condattr_init: initializes the condition variable attributes
  • pthread_cond_init: initializes a condition variable with the specified attributes (if attr is NULL, all attributes are set to the default)

The more succint way to initialize a condition variable with the default attributes:

pthread_cond_t my_cond = PTHREAD_COND_INITIALIZER;

To wait (or block) on a condition variable:

int pthread_cond_wait(pthread_cond_t *cond, 
                      pthread_mutex_t *mutex);
int pthread_cond_timedwait(pthread_cond_t *cond,
                           pthread_mutex_t *mutex,
                           const struct timespec *timespec);

pthread_cond_timedwait() provides a way to return an error if timespec passed before cond is signaled or broadcasted.

To unblock a thread(signal) or threads(broadcast) that are waiting on a condition variable:

int pthread_cond_signal(pthread_cond_t *cond);
int pthread_cond_broadcast(pthread_cond_t *cond);

Examples

First off, I’m going to implement a queue using a linked list:

#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>

typedef struct node
{
    int val;
    struct node *next;
} node;

node *head = NULL;
node *tail = NULL;

int enqueue(int val);
int dequeue();
void print_queue();

int main(int argc, char *argv[])
{
    // todo

    return 0;
}

int enqueue(int val)
{
    printf("Adding %i\n", val);

    node *n = malloc(sizeof(node));
    if (n == NULL) // sanity check
        return 1;
    n->val = val;
    n->next = NULL;

    if (head == NULL)
        head = n;
    if (tail == NULL)
        tail = n;

    tail->next = n;
    sleep(1 / 10); // Simulate the delay
    tail = n;

    return 0;
}

int dequeue()
{
    node *tmp = head;
    if (tmp == NULL)
    {
        printf("(The queue is empty. No node to dequeue.)\n");
        return 1;
    }
    printf("Removing %i\n", tmp->val);
    head = tmp->next;

    return 0;
}

void print_queue()
{
    printf("queue: [");
    for (node *tmp = head; tmp != NULL; tmp = tmp->next)
        (tmp->next == NULL) ? (printf("%i", tmp->val)) : (printf("%i -> ", tmp->val));
    printf("]\n");
}

The insertion takes 100 milliseconds to simulate the race condition when multiple threads are concurrently running without any mutex.

For adding nodes into a queue using pthreads, I will use 3 (producer) threads:

#define NUM_PRODUCERS 3

And each producer thread will insert 10 nodes with a value of the index of it:

void *produce(void *param)
{
    int num = *((int *)param);
    for (int i = 0; i < 10; i++)
        enqueue(num);
    return 0;
}

In main function, each producer thread takes its index as the parameter:

int main(int argc, char *argv[])
{
    int producers_nums[NUM_PRODUCERS];
    pthread_t producers[NUM_PRODUCERS];
    int i = 0;

    // Create (NUM_PRODUCERS) producer threads.
    for (i = 0; i < NUM_PRODUCERS; i++)
    {
        producers_nums[i] = i;
        pthread_create(&producers[i], NULL, produce, &producers_nums[i]);
    }
    // Wait for all threads to exit.
    for (i = 0; i < NUM_PRODUCERS; i++)
    {
        pthread_join(producers[i], NULL);
    }

    print_queue();

    return 0;
}

As a result, the length of the queue must be 3 * 10.

But the actual result of the example above does not look as we expected:

Adding 0
...(omitted)
queue: [0 -> 0 -> 1 -> 1 -> 1 -> 2]

(The results may vary every time you run the program.)

The reason being is that all the threads are competing for the same pointer variable - tail:

tail->next = n;
sleep(1 / 10); // Simulate the delay
tail = n;

Now we need to use a mutex to coordinate the threads. To do this, the mutex type must be declared in the global scope to be referenced by all the threads in the program:

pthread_mutex_t m = PTHREAD_MUTEX_INITIALIZER;

Inside produce(), we will wrap around enqueue() with pthread_mutex_lock() and pthread_mutex_unlock():

void *produce(void *param)
{
    int num = *((int *)param);
    for (int i = 0; i < 10; i++)
    {
        pthread_mutex_lock(&m);
        enqueue(num); // critical section
        pthread_mutex_unlock(&m);
    }
    return 0;
}

We can see the 30 nodes in the queue in the end:

Adding 1
...(omitted)
queue: [1 -> 1 -> 1 -> 1 -> 1 -> 1 -> 1 -> 1 -> 0 -> 0 -> 0 -> 0 -> 0 -> 0 -> 0 -> 0 -> 0 -> 0 -> 1 -> 1 -> 2 -> 2 -> 2 -> 2 -> 2 -> 2 -> 2 -> 2 -> 2 -> 2]

Because the order of insertions are scheduled by the operating system, the order of the elements inside the queue is out of our control and varies every time we execute the program.

Lastly, we will limit the size of the queue to 5 and create a consumer thread which will remove(dequeue()) the node from the head when the queue reaches its maximum size.

The diagram below describes the overall picture of the last example:

producers and consumer

The full example code is as follows:

#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <pthread.h>

#define NUM_PRODUCERS 3
#define MAX_QUEUE_SIZE 5

typedef struct node
{
    int val;
    struct node *next;
} node;

node *head = NULL;
node *tail = NULL;

int enqueue(int val);
int dequeue();
void print_queue();

void *produce(void *param);
void *consume(void *param);

int queue_size = 0;
int quit = 0; // to prevent infinite loop of conumser

pthread_mutex_t m = PTHREAD_MUTEX_INITIALIZER;
pthread_cond_t cv_con = PTHREAD_COND_INITIALIZER;
pthread_cond_t cv_prod = PTHREAD_COND_INITIALIZER;

int main(int argc, char *argv[])
{
    int producers_nums[NUM_PRODUCERS];
    pthread_t producers[NUM_PRODUCERS];
    pthread_t consumer; // Declare a consumer thread.
    int i = 0;

    // Create (NUM_PRODUCERS) producer threads.
    for (i = 0; i < NUM_PRODUCERS; i++)
    {
        producers_nums[i] = i;
        pthread_create(&producers[i], NULL, produce, &producers_nums[i]);
    }

    // Create a consumer thread.
    pthread_create(&consumer, NULL, consume, NULL);

    // Wait for all threads to exit.
    for (i = 0; i < NUM_PRODUCERS; i++)
    {
        pthread_join(producers[i], NULL);
    }
    pthread_join(consumer, NULL);

    print_queue();

    return 0;
}

int enqueue(int val)
{
    printf("Adding %i\n", val);

    node *n = malloc(sizeof(node));
    if (n == NULL) // sanity check
        return 1;
    n->val = val;
    n->next = NULL;

    if (head == NULL)
        head = n;
    if (tail == NULL)
        tail = n;

    tail->next = n;
    sleep(1 / 10); // Simulate the delay
    tail = n;

    return 0;
}

int dequeue()
{
    node *tmp = head;
    if (tmp == NULL)
    {
        printf("(The queue is empty. No node to dequeue.)\n");
        return 1;
    }
    printf("Removing %i\n", tmp->val);
    head = tmp->next;

    return 0;
}

void print_queue()
{
    printf("queue: [");
    for (node *tmp = head; tmp != NULL; tmp = tmp->next)
        (tmp->next == NULL) ? (printf("%i", tmp->val)) : (printf("%i -> ", tmp->val));
    printf("]\n");
}

void *produce(void *param)
{
    int num = *((int *)param);
    for (int i = 0; i < 10; i++)
    {
        pthread_mutex_lock(&m);

        if (MAX_QUEUE_SIZE < queue_size)
            exit(1); // underflow error

        while (queue_size == MAX_QUEUE_SIZE)
            pthread_cond_wait(&cv_prod, &m);

        enqueue(num);
        queue_size++;

        pthread_mutex_unlock(&m);
        pthread_cond_signal(&cv_con);
    }

    quit++;

    return 0;
}

void *consume(void *param)
{
    while (quit < NUM_PRODUCERS) // break out the while loop if all producer threads exit
    {
        pthread_mutex_lock(&m);

        if (queue_size < 0)
            exit(1); // overflow error

        while (queue_size == 0)
            pthread_cond_wait(&cv_con, &m);

        while (head != NULL)
            dequeue();
        queue_size--;

        pthread_mutex_unlock(&m);
        pthread_cond_signal(&cv_prod);
    }
    return 0;
}

I deliberately added a global integer variable quit to prevent the inifinite while loop of the consumer thread.

The result:

Adding 0
Adding 0
Adding 0
Adding 0
Adding 0
Removing 0
Removing 0
Removing 0
Removing 0
Removing 0
Adding 0
Adding 2
Adding 2
Adding 2
Adding 2
Removing 0
Removing 2
Removing 2
Removing 2
Removing 2
Adding 2
Adding 2
Adding 0
Adding 0
Adding 0
Removing 2
Removing 2
Removing 0
Removing 0
Removing 0
Adding 1
Adding 1
Adding 1
Adding 1
Adding 1
Removing 1
Removing 1
Removing 1
Removing 1
Removing 1
Adding 0
Adding 1
Adding 1
Adding 1
Adding 1
Removing 0
Removing 1
Removing 1
Removing 1
Removing 1
Adding 1
Adding 2
Adding 2
Adding 2
Adding 2
Removing 1
Removing 2
Removing 2
Removing 2
Removing 2
queue: []

We can confirm that all the elements are consumed and the queue becomes empty in the end.