Producer-Consumer Containers |
Three types of producer-consumer containers are discussed in this section.
In a producer-consumer relationship, producer threads must wait until the buffer is not full, deposit its data, and then notify the consumers that the buffer is not empty, and consumer threads must wait until the buffer is not empty, retrieve the data, and then notify the producers that the buffer is not full. When the number of unread data entries equals or exceeds the user-specified maximum capacity, the buffer is considered full.
The following example uses a priority based producer consumer queue synchronize threads that produce and consume numbers. Notice that, if there is more than one item in the buffer at a particular instance, the produced numbers are consumed in a descending order.
#include <iostream>
#include <string>
#include <ospace/thread.h>
os_thread_toolkit init_thread;
typedef os_pc_priority_queue
<
long,
vector< long >,
less< long >
> os_pc_priority_queue_long;
os_pc_priority_queue_long pc_priority_queue( 5 ); // Max size 5.
void*
producer( void* args )
{
cout << "==> producer thread " << os_this_thread::tid()
<< " waiting to produce" << endl;
long number = (long)args;
pc_priority_queue.push( number );
cout << "==> producer thread " << os_this_thread::tid()
<< " produces " << number << endl;
return 0;
}
void*
consumer( void* )
{
cout << "<== consumer thread " << os_this_thread::tid()
<< " waiting to consume" << endl;
long number = pc_priority_queue.pop();
cout << "<== consumer thread " << os_this_thread::tid()
<< " consumes " << number << endl;
return 0;
}
int
main()
{
// Producers.
os_thread pt1( producer, (void*)4 );
os_thread pt2( producer, (void*)2 );
os_thread pt3( producer, (void*)6 );
os_thread pt4( producer, (void*)10 );
os_thread pt5( producer, (void*)8 );
os_thread pt6( producer, (void*)5 );
os_this_thread::sleep( 5 );
// Consumers.
for ( int i = 0; i < 7; ++i )
os_thread::create_thread( consumer );
os_thread pt7( producer, (void*)1 );
os_thread pt8( producer, (void*)11 );
os_thread pt9( producer, (void*)3 );
// Wait for the above 16 threads to complete.
for ( int j = 0; j < 16; ++j )
os_this_thread::wait_for_any_thread();
return 0;
}
==> producer thread <0x40015b90, 3, 4> waiting to produce
==> producer thread <0x40015b90, 3, 4> produces 4
==> producer thread <0x400172d8, 4, 4> waiting to produce
==> producer thread <0x400172d8, 4, 4> produces 2
==> producer thread <0x40018a20, 5, 4> waiting to produce
==> producer thread <0x40018a20, 5, 4> produces 6
==> producer thread <0x4001a168, 6, 4> waiting to produce
==> producer thread <0x4001a168, 6, 4> produces 10
==> producer thread <0x4001b8b0, 7, 4> waiting to produce
==> producer thread <0x4001b8b0, 7, 4> produces 8
==> producer thread <0x4001cff8, 8, 4> waiting to produce
<== consumer thread <0x4001e740, 9, 4> waiting to consume
<== consumer thread <0x4001e740, 9, 4> consumes 10
<== consumer thread <0x4001fe88, 10, 4> waiting to consume
<== consumer thread <0x4001fe88, 10, 4> consumes 8
<== consumer thread <0x401ea008, 15, 4> waiting to consume
<== consumer thread <0x401ea008, 15, 4> consumes 6
<== consumer thread <0x400215d0, 11, 4> waiting to consume
<== consumer thread <0x400215d0, 11, 4> consumes 5
==> producer thread <0x401ecad8, 17, 4> waiting to produce
==> producer thread <0x401ecad8, 17, 4> produces 11
<== consumer thread <0x40025ba8, 14, 4> waiting to consume
<== consumer thread <0x40025ba8, 14, 4> consumes 11
<== consumer thread <0x40024460, 13, 4> waiting to consume
<== consumer thread <0x40024460, 13, 4> consumes 4
==> producer thread <0x401eb520, 16, 4> waiting to produce
==> producer thread <0x401eb520, 16, 4> produces 1
==> producer thread <0x401ee220, 18, 4> waiting to produce
==> producer thread <0x401ee220, 18, 4> produces 3
==> producer thread <0x4001cff8, 8, 4> produces 5
<== consumer thread <0x40022d18, 12, 4> waiting to consume
<== consumer thread <0x40022d18, 12, 4> consumes 3
The following example uses a producer consumer queue to synchronize food producer and food consumer threads. Notice that, if there is more than one item in the buffer at a particular instance, the produced food is consumed in a first in first out order.
#include <iostream>
#include <string>
#include <ospace/thread.h>
os_thread_toolkit init_thread;
typedef os_pc_queue
<
string,
deque< string >
> os_pc_queue_string;
os_pc_queue_string pc_queue( 3 ); // Max size 3.
void*
producer( void* args )
{
cout << "==> producer thread " << os_this_thread::tid()
<< " waiting to produce" << endl;
os_string food( (const char*)args );
cout << "*** producer ***" << os_this_thread::tid() << endl;
pc_queue.push( food );
cout << "*** producer ***" << os_this_thread::tid() << endl;
cout << "==> producer thread " << os_this_thread::tid()
<< " produces " << food << endl;
return 0;
}
void*
consumer( void* )
{
cout << "<== consumer thread " << os_this_thread::tid()
<< " waiting to consume" << endl;
cout << "*** consumer ***" << os_this_thread::tid() << endl;
os_string food = pc_queue.pop();
cout << "*** consumer ***" << os_this_thread::tid() << endl;
cout << "<== consumer thread " << os_this_thread::tid()
<< " consumes " << food << endl;
return 0;
}
int
main()
{
// Producers.
os_thread pt1( producer, (void*)"bread" );
os_thread pt2( producer, (void*)"butter" );
os_thread pt3( producer, (void*)"milk" );
os_thread pt4( producer, (void*)"candy" );
// Consumers.
os_thread ct1( consumer );
os_thread ct2( consumer );
os_thread ct3( consumer );
os_thread ct4( consumer );
os_thread ct5( consumer );
os_this_thread::yield();
// More producers.
os_thread pt5( producer, (void*)"soda" );
os_thread pt6( producer, (void*)"tea" );
os_thread pt7( producer, (void*)"coffee" );
// Wait for the above 12 threads to complete.
for ( int i = 0; i < 12; ++i )
os_this_thread::wait_for_any_thread();
return 0;
}
==> producer thread <0x40015b90, 3, 4> waiting to produce
==> producer thread <0x40015b90, 3, 4> produces bread
==> producer thread <0x400172d8, 4, 4> waiting to produce
==> producer thread <0x400172d8, 4, 4> produces butter
<== consumer thread <0x4001e740, 9, 4> waiting to consume
<== consumer thread <0x4001e740, 9, 4> consumes bread
==> producer thread <0x40025ba8, 14, 4> waiting to produce
==> producer thread <0x40025ba8, 14, 4> produces coffee
<== consumer thread <0x4001fe88, 10, 4> waiting to consume
<== consumer thread <0x4001fe88, 10, 4> consumes butter
==> producer thread <0x4001a168, 6, 4> waiting to produce
==> producer thread <0x4001a168, 6, 4> produces candy
==> producer thread <0x40024460, 13, 4> waiting to produce
==> producer thread <0x40024460, 13, 4> produces tea
==> producer thread <0x40022d18, 12, 4> waiting to produce
==> producer thread <0x40018a20, 5, 4> waiting to produce
<== consumer thread <0x4001b8b0, 7, 4> waiting to consume
<== consumer thread <0x4001b8b0, 7, 4> consumes coffee
<== consumer thread <0x4001cff8, 8, 4> waiting to consume
<== consumer thread <0x4001cff8, 8, 4> consumes candy
<== consumer thread <0x400215d0, 11, 4> waiting to consume
<== consumer thread <0x400215d0, 11, 4> consumes tea
==> producer thread <0x40022d18, 12, 4> produces soda
==> producer thread <0x40018a20, 5, 4> produces milk
The following example uses a producer consumer stack to synchronize food producer and food consumer threads. Notice that, if there is more than one item in the buffer at a particular instance, the produced food is consumed in a last in first out order.
#include <iostream>
#include <string>
#include <ospace/thread.h>
os_thread_toolkit init_thread;
typedef os_pc_stack
<
string,
deque< string >
> os_pc_stack_string;
os_pc_stack_string pc_stack( 3 ); // Max size 3.
void*
producer( void* args )
{
cout << "==> producer thread " << os_this_thread::tid()
<< " waiting to produce" << endl;
os_string food( (const char*)args );
pc_stack.push( food );
cout << "==> producer thread " << os_this_thread::tid()
<< " produces " << food << endl;
return 0;
}
void*
consumer( void* )
{
cout << "<== consumer thread " << os_this_thread::tid()
<< " waiting to consume" << endl;
os_string food = pc_stack.pop();
cout << "<== consumer thread " << os_this_thread::tid()
<< " consumes " << food << endl;
return 0;
}
int
main()
{
// Producers.
os_thread pt1( producer, (void*)"bread" );
os_thread pt2( producer, (void*)"butter" );
os_thread pt3( producer, (void*)"milk" );
os_thread pt4( producer, (void*)"candy" );
// Consumers.
os_thread ct1( consumer );
os_thread ct2( consumer );
os_thread ct3( consumer );
os_thread ct4( consumer );
os_thread ct5( consumer );
// More producers.
os_thread pt5( producer, (void*)"soda" );
os_thread pt6( producer, (void*)"tea" );
os_thread pt7( producer, (void*)"coffee" );
// Wait for the above 12 threads to complete.
for ( int i = 0; i < 12; ++i )
os_this_thread::wait_for_any_thread();
return 0;
}
==> producer thread <0x40015b90, 3, 4> waiting to produce
==> producer thread <0x40015b90, 3, 4> produces bread
==> producer thread <0x400172d8, 4, 4> waiting to produce
==> producer thread <0x400172d8, 4, 4> produces butter
<== consumer thread <0x4001e740, 9, 4> waiting to consume
<== consumer thread <0x4001e740, 9, 4> consumes butter
==> producer thread <0x40025ba8, 14, 4> waiting to produce
==> producer thread <0x40025ba8, 14, 4> produces coffee
<== consumer thread <0x4001fe88, 10, 4> waiting to consume
<== consumer thread <0x4001fe88, 10, 4> consumes coffee
==> producer thread <0x4001a168, 6, 4> waiting to produce
==> producer thread <0x4001a168, 6, 4> produces candy
==> producer thread <0x40024460, 13, 4> waiting to produce
==> producer thread <0x40024460, 13, 4> produces tea
==> producer thread <0x40022d18, 12, 4> waiting to produce
==> producer thread <0x40018a20, 5, 4> waiting to produce
<== consumer thread <0x4001b8b0, 7, 4> waiting to consume
<== consumer thread <0x4001b8b0, 7, 4> consumes tea
<== consumer thread <0x4001cff8, 8, 4> waiting to consume
<== consumer thread <0x4001cff8, 8, 4> consumes soda
<== consumer thread <0x400215d0, 11, 4> waiting to consume
<== consumer thread <0x400215d0, 11, 4> consumes milk
==> producer thread <0x40022d18, 12, 4> produces soda
==> producer thread <0x40018a20, 5, 4> produces milk
Copyright©1994-2026 Recursion
Software LLC
All Rights Reserved - For use by licensed users only.