Producer-Consumer Containers


Three types of producer-consumer containers are discussed in this section.

In a producer-consumer relationship, each producer thread must wait until the buffer is not full, deposit its data, and then notify the consumers that the buffer is not empty. Each consumer thread, in turn, must wait until the buffer is not empty, retrieve the data, and then notify the producers that the buffer is not full. The buffer is considered full when the number of unread data entries equals or exceeds the user-specified maximum capacity.

pc_priority_queue

The following example uses a priority-based producer-consumer queue to synchronize threads that produce and consume numbers. Notice that, if there is more than one item in the buffer at a particular instant, the produced numbers are consumed in descending order.

Example <ospace/thread/examples/priqueue1.cpp>
#include <iostream>
#include <string>
#include <ospace/thread.h>

// Toolkit object declared at file scope, ahead of the queue and the threads.
// NOTE(review): presumably initializes the thread toolkit - confirm.
os_thread_toolkit init_thread;

// Producer-consumer priority queue of longs, stored in a vector and
// ordered with less< long >.
typedef os_pc_priority_queue< long, vector< long >, less< long > >
  os_pc_priority_queue_long;

// The single buffer shared by every producer and consumer thread.
os_pc_priority_queue_long pc_priority_queue( 5 ); // Max size 5.


// Thread entry point for a producer.
// args carries the number to produce, packed directly into the pointer value.
// Logs before and after pushing the number onto pc_priority_queue.
// Always returns 0.
void*
producer( void* args )
{
  // Recover the number that main() smuggled through the void* argument.
  long value = (long)args;

  cout << "==> producer thread " << os_this_thread::tid()
       << " waiting to produce" << endl;

  // Producers wait here until the buffer is not full.
  pc_priority_queue.push( value );

  cout << "==> producer thread " << os_this_thread::tid()
       << " produces  " << value << endl;

  return 0;
}


// Thread entry point for a consumer; the argument is unused.
// Logs, pops one number from pc_priority_queue, then logs the number.
// Always returns 0.
void*
consumer( void* )
{
  cout << "<== consumer thread " << os_this_thread::tid()
       << " waiting to consume" << endl;

  // Consumers wait here until the buffer is not empty.
  long value = pc_priority_queue.pop();

  cout << "<== consumer thread " << os_this_thread::tid()
       << " consumes  " << value << endl;

  return 0;
}


// Starts nine producer threads and seven consumer threads around the shared
// priority queue, then waits for all sixteen to finish.
int
main()
{
  // First wave: six producers against a buffer whose maximum size is 5.
  os_thread pt1( producer, (void*)4 );
  os_thread pt2( producer, (void*)2 );
  os_thread pt3( producer, (void*)6 );
  os_thread pt4( producer, (void*)10 );
  os_thread pt5( producer, (void*)8 );
  os_thread pt6( producer, (void*)5 );

  // Pause before any consumer is started.
  os_this_thread::sleep( 5 );

  // Seven consumers.
  int started = 0;
  while ( started < 7 )
    {
    os_thread::create_thread( consumer );
    ++started;
    }

  // Second wave: three more producers.
  os_thread pt7( producer, (void*)1 );
  os_thread pt8( producer, (void*)11 );
  os_thread pt9( producer, (void*)3 );

  // 9 producers + 7 consumers = 16 threads to wait for.
  for ( int pending = 16; pending > 0; --pending )
    os_this_thread::wait_for_any_thread();

  return 0;
}

==> producer thread <0x40015b90, 3, 4> waiting to produce
==> producer thread <0x40015b90, 3, 4> produces  4
==> producer thread <0x400172d8, 4, 4> waiting to produce
==> producer thread <0x400172d8, 4, 4> produces  2
==> producer thread <0x40018a20, 5, 4> waiting to produce
==> producer thread <0x40018a20, 5, 4> produces  6
==> producer thread <0x4001a168, 6, 4> waiting to produce
==> producer thread <0x4001a168, 6, 4> produces  10
==> producer thread <0x4001b8b0, 7, 4> waiting to produce
==> producer thread <0x4001b8b0, 7, 4> produces  8
==> producer thread <0x4001cff8, 8, 4> waiting to produce
<== consumer thread <0x4001e740, 9, 4> waiting to consume
<== consumer thread <0x4001e740, 9, 4> consumes  10
<== consumer thread <0x4001fe88, 10, 4> waiting to consume
<== consumer thread <0x4001fe88, 10, 4> consumes  8
<== consumer thread <0x401ea008, 15, 4> waiting to consume
<== consumer thread <0x401ea008, 15, 4> consumes  6
<== consumer thread <0x400215d0, 11, 4> waiting to consume
<== consumer thread <0x400215d0, 11, 4> consumes  5
==> producer thread <0x401ecad8, 17, 4> waiting to produce
==> producer thread <0x401ecad8, 17, 4> produces  11
<== consumer thread <0x40025ba8, 14, 4> waiting to consume
<== consumer thread <0x40025ba8, 14, 4> consumes  11
<== consumer thread <0x40024460, 13, 4> waiting to consume
<== consumer thread <0x40024460, 13, 4> consumes  4
==> producer thread <0x401eb520, 16, 4> waiting to produce
==> producer thread <0x401eb520, 16, 4> produces  1
==> producer thread <0x401ee220, 18, 4> waiting to produce
==> producer thread <0x401ee220, 18, 4> produces  3
==> producer thread <0x4001cff8, 8, 4> produces  5
<== consumer thread <0x40022d18, 12, 4> waiting to consume
<== consumer thread <0x40022d18, 12, 4> consumes  3

pc_queue

The following example uses a producer-consumer queue to synchronize food producer and food consumer threads. Notice that, if there is more than one item in the buffer at a particular instant, the produced food is consumed in first-in, first-out order.

Example <ospace/thread/examples/pcqueue1.cpp>
#include <iostream>
#include <string>
#include <ospace/thread.h>

// Toolkit object declared at file scope, ahead of the queue and the threads.
// NOTE(review): presumably initializes the thread toolkit - confirm.
os_thread_toolkit init_thread;

// Producer-consumer queue of strings, stored in a deque.
// NOTE(review): the element type is spelled `string` here while producer()
// and consumer() use `os_string` - presumably the same type; confirm.
typedef os_pc_queue< string, deque< string > > os_pc_queue_string;

// The single buffer shared by every producer and consumer thread.
os_pc_queue_string pc_queue( 3 ); // Max size 3.


// Thread entry point for a producer.
// args points to a NUL-terminated C string naming the food to produce.
// Logs before and after pushing the food onto pc_queue.
// Always returns 0.
//
// The leftover "*** producer ***" debug prints were removed: they do not
// appear in the documented output of this example and are absent from the
// otherwise identical pc_stack example.
void*
producer( void* args )
  {
  cout << "==> producer thread " << os_this_thread::tid() 
    << " waiting to produce" << endl;
  os_string food( (const char*)args );
  // Producers wait here until the buffer is not full.
  pc_queue.push( food );
  cout << "==> producer thread " << os_this_thread::tid() 
    << " produces  " << food << endl;
  return 0;
  }


// Thread entry point for a consumer; the argument is unused.
// Logs, pops one food item from pc_queue, then logs the item.
// Always returns 0.
//
// The leftover "*** consumer ***" debug prints were removed: they do not
// appear in the documented output of this example and are absent from the
// otherwise identical pc_stack example.
void*
consumer( void* )
  {
  cout << "<== consumer thread " << os_this_thread::tid() 
    << " waiting to consume" << endl;
  // Consumers wait here until the buffer is not empty.
  os_string food = pc_queue.pop();
  cout << "<== consumer thread " << os_this_thread::tid() 
    << " consumes  " << food << endl;
  return 0;
  }


// Starts seven producer threads and five consumer threads around the shared
// queue, then waits for all twelve to finish.
int
main()
{
  // First wave: four producers against a buffer whose maximum size is 3.
  os_thread pt1( producer, (void*)"bread" );
  os_thread pt2( producer, (void*)"butter" );
  os_thread pt3( producer, (void*)"milk" );
  os_thread pt4( producer, (void*)"candy" );

  // Five consumers.
  os_thread ct1( consumer );
  os_thread ct2( consumer );
  os_thread ct3( consumer );
  os_thread ct4( consumer );
  os_thread ct5( consumer );

  // Give up the processor once before starting the second wave.
  os_this_thread::yield();

  // Second wave: three more producers.
  os_thread pt5( producer, (void*)"soda" );
  os_thread pt6( producer, (void*)"tea" );
  os_thread pt7( producer, (void*)"coffee" );

  // 7 producers + 5 consumers = 12 threads to wait for.
  for ( int pending = 12; pending > 0; --pending )
    os_this_thread::wait_for_any_thread();

  return 0;
}

==> producer thread <0x40015b90, 3, 4> waiting to produce
==> producer thread <0x40015b90, 3, 4> produces  bread
==> producer thread <0x400172d8, 4, 4> waiting to produce
==> producer thread <0x400172d8, 4, 4> produces  butter
<== consumer thread <0x4001e740, 9, 4> waiting to consume
<== consumer thread <0x4001e740, 9, 4> consumes  bread
==> producer thread <0x40025ba8, 14, 4> waiting to produce
==> producer thread <0x40025ba8, 14, 4> produces  coffee
<== consumer thread <0x4001fe88, 10, 4> waiting to consume
<== consumer thread <0x4001fe88, 10, 4> consumes  butter
==> producer thread <0x4001a168, 6, 4> waiting to produce
==> producer thread <0x4001a168, 6, 4> produces  candy
==> producer thread <0x40024460, 13, 4> waiting to produce
==> producer thread <0x40024460, 13, 4> produces  tea
==> producer thread <0x40022d18, 12, 4> waiting to produce
==> producer thread <0x40018a20, 5, 4> waiting to produce
<== consumer thread <0x4001b8b0, 7, 4> waiting to consume
<== consumer thread <0x4001b8b0, 7, 4> consumes  coffee
<== consumer thread <0x4001cff8, 8, 4> waiting to consume
<== consumer thread <0x4001cff8, 8, 4> consumes  candy
<== consumer thread <0x400215d0, 11, 4> waiting to consume
<== consumer thread <0x400215d0, 11, 4> consumes  tea
==> producer thread <0x40022d18, 12, 4> produces  soda
==> producer thread <0x40018a20, 5, 4> produces  milk

pc_stack

The following example uses a producer-consumer stack to synchronize food producer and food consumer threads. Notice that, if there is more than one item in the buffer at a particular instant, the produced food is consumed in last-in, first-out order.

Example <ospace/thread/examples/pcstack1.cpp>
#include <iostream>
#include <string>
#include <ospace/thread.h>

// Toolkit object declared at file scope, ahead of the stack and the threads.
// NOTE(review): presumably initializes the thread toolkit - confirm.
os_thread_toolkit init_thread;

// Producer-consumer stack of strings, stored in a deque.
// NOTE(review): the element type is spelled `string` here while producer()
// and consumer() use `os_string` - presumably the same type; confirm.
typedef os_pc_stack< string, deque< string > > os_pc_stack_string;

// The single buffer shared by every producer and consumer thread.
os_pc_stack_string pc_stack( 3 ); // Max size 3.


void*
producer( void* args )
  {
  cout << "==> producer thread " << os_this_thread::tid() 
    << " waiting to produce" << endl;
  os_string food( (const char*)args );
  pc_stack.push( food );
  cout << "==> producer thread " << os_this_thread::tid() 
    << " produces  " << food << endl;
  return 0;
  }


void*
consumer( void* )
  {
  cout << "<== consumer thread " << os_this_thread::tid() 
    << " waiting to consume" << endl;
  os_string food = pc_stack.pop();
  cout << "<== consumer thread " << os_this_thread::tid() 
    << " consumes  " << food << endl;
  return 0;
  }

// Starts seven producer threads and five consumer threads around the shared
// stack, then waits for all twelve to finish.
int
main()
{
  // First wave: four producers against a buffer whose maximum size is 3.
  os_thread pt1( producer, (void*)"bread" );
  os_thread pt2( producer, (void*)"butter" );
  os_thread pt3( producer, (void*)"milk" );
  os_thread pt4( producer, (void*)"candy" );

  // Five consumers.
  os_thread ct1( consumer );
  os_thread ct2( consumer );
  os_thread ct3( consumer );
  os_thread ct4( consumer );
  os_thread ct5( consumer );

  // Second wave: three more producers.
  os_thread pt5( producer, (void*)"soda" );
  os_thread pt6( producer, (void*)"tea" );
  os_thread pt7( producer, (void*)"coffee" );

  // 7 producers + 5 consumers = 12 threads to wait for.
  for ( int pending = 12; pending > 0; --pending )
    os_this_thread::wait_for_any_thread();

  return 0;
}

==> producer thread <0x40015b90, 3, 4> waiting to produce
==> producer thread <0x40015b90, 3, 4> produces  bread
==> producer thread <0x400172d8, 4, 4> waiting to produce
==> producer thread <0x400172d8, 4, 4> produces  butter
<== consumer thread <0x4001e740, 9, 4> waiting to consume
<== consumer thread <0x4001e740, 9, 4> consumes  butter
==> producer thread <0x40025ba8, 14, 4> waiting to produce
==> producer thread <0x40025ba8, 14, 4> produces  coffee
<== consumer thread <0x4001fe88, 10, 4> waiting to consume
<== consumer thread <0x4001fe88, 10, 4> consumes  coffee
==> producer thread <0x4001a168, 6, 4> waiting to produce
==> producer thread <0x4001a168, 6, 4> produces  candy
==> producer thread <0x40024460, 13, 4> waiting to produce
==> producer thread <0x40024460, 13, 4> produces  tea
==> producer thread <0x40022d18, 12, 4> waiting to produce
==> producer thread <0x40018a20, 5, 4> waiting to produce
<== consumer thread <0x4001b8b0, 7, 4> waiting to consume
<== consumer thread <0x4001b8b0, 7, 4> consumes  tea
<== consumer thread <0x4001cff8, 8, 4> waiting to consume
<== consumer thread <0x4001cff8, 8, 4> consumes  soda
<== consumer thread <0x400215d0, 11, 4> waiting to consume
<== consumer thread <0x400215d0, 11, 4> consumes  milk
==> producer thread <0x40022d18, 12, 4> produces  soda
==> producer thread <0x40018a20, 5, 4> produces  milk

Copyright©1994-2026 Recursion Software LLC
All Rights Reserved - For use by licensed users only.